po_hi_gqueue.c 14.8 KB
Newer Older
1
2
3
4
5
/*
 * This is part of the PolyORB-HI-C distribution, a minimal
 * middleware written for code generated from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more information, please visit http://taste.tuxfamily.org/wiki
 *
 * Copyright (C) 2010-2018 ESA & ISAE.
 */

#include <po_hi_config.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_gqueue.h>
18
19
20
#include <po_hi_protected.h>
#include <po_hi_semaphore.h>

21
#include <po_hi_utils.h>
22
23
24
25
26
27
28
29
/* Headers from PolyORB-HI-C */

#include <deployment.h>
#include <activity.h>
#include <request.h>
/* Headers from the generated code */

#include <string.h>
30
31
#include <assert.h>
#include <stdlib.h>
32

33
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
34
#include <pthread.h>
35
#elif defined(__PO_HI_RTEMS_CLASSIC_API)
36
37
38
39
#include <rtems.h>
#include <inttypes.h>
#include <po_hi_time.h>
#define __PO_HI_DEFAULT_PRIORITY RTEMS_NO_PRIORITY
40
41
42
#elif defined (XENO_NATIVE)
#include <native/cond.h>
#include <native/mutex.h>
43
44
#endif

45
#if defined (MONITORING) /* Headers from run-time verification */
yoogx's avatar
yoogx committed
46
#include <trace_manager.h>
47
#endif
48

yoogx's avatar
yoogx committed
49
#define __PO_HI_GQUEUE_OUT_PORT constant_out_identifier
50
51
52
/* give a default value to the out port */

__po_hi_port_t*        __po_hi_gqueues[__PO_HI_NB_TASKS];
53
54
55
56
57
58
__po_hi_port_id_t      __po_hi_gqueues_nb_ports[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_sizes[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_used_size[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_offsets[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_woffsets[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_n_destinations[__PO_HI_NB_TASKS];
59
__po_hi_port_t**       __po_hi_gqueues_destinations[__PO_HI_NB_TASKS];
60
__po_hi_uint32_t       __po_hi_gqueues_total_fifo_size[__PO_HI_NB_TASKS];
61
__po_hi_request_t*     __po_hi_gqueues_most_recent_values[__PO_HI_NB_TASKS];
62
__po_hi_port_id_t*     __po_hi_gqueues_first[__PO_HI_NB_TASKS];
63

64
65
66
67
__po_hi_port_id_t       __po_hi_gqueues_global_size[__PO_HI_NB_TASKS];
__po_hi_local_port_t*   __po_hi_gqueues_global_history[__PO_HI_NB_TASKS];
__po_hi_uint32_t        __po_hi_gqueues_global_history_offset[__PO_HI_NB_TASKS];
__po_hi_uint32_t        __po_hi_gqueues_global_history_woffset[__PO_HI_NB_TASKS];
68

69
70
71
__po_hi_port_id_t*       __po_hi_gqueues_port_is_empty[__PO_HI_NB_TASKS];
__po_hi_port_id_t        __po_hi_gqueues_queue_is_empty[__PO_HI_NB_TASKS];
__po_hi_port_id_t        __po_hi_gqueues_n_empty[__PO_HI_NB_TASKS];
72

73
74
75
76

__po_hi_sem_t            __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];


77

78
void __po_hi_gqueue_init (__po_hi_task_id       id,
79
                          __po_hi_port_id_t     nb_ports,
80
                          __po_hi_port_t        queue[],
81
82
83
84
85
                          __po_hi_port_id_t     sizes[],
                          __po_hi_port_id_t     first[],
                          __po_hi_port_id_t     offsets[],
                          __po_hi_port_id_t     woffsets[],
                          __po_hi_port_id_t     n_dest[],
86
                          __po_hi_port_t*       destinations[],
87
                          __po_hi_port_id_t     used_size[],
88
89
                          __po_hi_local_port_t  history[],
                          __po_hi_request_t     recent[],
90
91
                          __po_hi_port_id_t     empties[],
                          __po_hi_uint32_t      total_fifo_size)
92
{
93
94
   __po_hi_port_id_t      tmp;
   __po_hi_uint32_t     off; /* XXX May overflow for large value .. */
95

96
97
   __po_hi_gqueues_global_history_woffset[id] = 0;
   __po_hi_gqueues_global_history_offset[id] = 0;
98

99
100
101
102
103
   __po_hi_gqueues_n_empty[id] = nb_ports;
   __po_hi_gqueues[id] = queue;
   __po_hi_gqueues_most_recent_values[id] = recent;
   __po_hi_gqueues_global_history[id] = history;
   __po_hi_gqueues_woffsets[id] = woffsets;
104

105
   __po_hi_gqueues_port_is_empty[id] = empties;
106

yoogx's avatar
yoogx committed
107
   __po_hi_gqueues_nb_ports[id] = nb_ports;
108
109
110
   __po_hi_gqueues_sizes[id] = sizes;
   __po_hi_gqueues_first[id] = first;
   __po_hi_gqueues_used_size[id] = used_size;
111

jdelange's avatar
jdelange committed
112
113
114
115
   __po_hi_gqueues_offsets[id]            = offsets;
   __po_hi_gqueues_n_destinations[id]     = n_dest;
   __po_hi_gqueues_destinations[id]       = destinations;
   __po_hi_gqueues_total_fifo_size[id]    = total_fifo_size;
116

117
   __po_hi_gqueues_queue_is_empty[id] = 1;
118
119


Antonia Francis's avatar
Antonia Francis committed
120
  /* Using the semaphore API to initialize the semaphore_gqueue array */
121
122
123
  int res = __po_hi_sem_init_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_INIT %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
124

jdelange's avatar
jdelange committed
125

126
127
128
   off = 0;
   for (tmp=0;tmp<nb_ports;tmp++)
   {
129
      __po_hi_gqueues_used_size[id][tmp] = 0;
130

yoogx's avatar
yoogx committed
131
      if ( (sizes[tmp] != __PO_HI_GQUEUE_FIFO_INDATA)
132
133
134
135
136
137
138
139
            && (sizes[tmp] != __PO_HI_GQUEUE_FIFO_OUT))
      {
         __po_hi_gqueues_first[id][tmp]=off;
         off += __po_hi_gqueues_sizes[id][tmp];
         __po_hi_gqueues_offsets[id][tmp] = 0;
         __po_hi_gqueues_woffsets[id][tmp] = 0;
         __po_hi_gqueues_port_is_empty[id][tmp] = 1;
      }
140

141
      /* Set invalid all recent values */
142
      __po_hi_request_t* request = (__po_hi_request_t*)&__po_hi_gqueues_most_recent_values[id][tmp];
143
      request->port = __PO_HI_GQUEUE_INVALID_PORT;
144
   }
145
146

#ifdef __PO_HI_DEBUG
147
148
149
   __DEBUGMSG("Initialize global queue for task-id %d ... ", id);
   for (tmp=0;tmp<nb_ports;tmp++)
   {
yoogx's avatar
yoogx committed
150
151
      __DEBUGMSG("port %d (used_size=%d,first=%d) ",
            tmp,
152
153
154
155
            __po_hi_gqueues_used_size[id][tmp],
            __po_hi_gqueues_first[id][tmp]);
   }
   __DEBUGMSG(" ... done\n");
yoogx's avatar
yoogx committed
156
#endif
157
158
}

159

yoogx's avatar
yoogx committed
160
161
void __po_hi_gqueue_store_out (__po_hi_task_id id,
                               __po_hi_local_port_t port,
162
                               __po_hi_request_t* request)
163
{
164
   __po_hi_request_t* ptr;
165

166
167
   request->port = __PO_HI_GQUEUE_OUT_PORT;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
168
   memcpy (ptr, request, sizeof (__po_hi_request_t));
julien.delange's avatar
julien.delange committed
169
   __PO_HI_DEBUG_DEBUG ("__po_hi_gqueue_store_out() from task %d on port %d\n", id, port);
yoogx's avatar
yoogx committed
170
171
172
173
174

#if defined (MONITORING)
   record_event(ANY, STORE_OUT, id, invalid_port_t, invalid_port_t, port, invalid_local_port_t, request);
#endif

175
176
}

177
__po_hi_port_id_t __po_hi_gqueue_store_in (__po_hi_task_id id,
yoogx's avatar
yoogx committed
178
                                         __po_hi_local_port_t port,
179
                                         __po_hi_request_t* request)
180
{
181
   __po_hi_request_t* ptr;
182
   __po_hi_request_t* tmp;
183

184

185
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
186
#ifdef __PO_HI_DEBUG
187
188
   if (ptr == NULL)
   {
189
      __DEBUGMSG ("__po_hi_gqueue_store_in : NULL POINTER\n");
190
   }
191
192
#endif

Antonia Francis's avatar
Antonia Francis committed
193
194

  /* Locking only a mutex */
195
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
Antonia Francis's avatar
Antonia Francis committed
196
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT on task %d result = %d\n", id, result);
197
  assert(result == __PO_HI_SUCCESS);
yoogx's avatar
yoogx committed
198

Antonia Francis's avatar
Antonia Francis committed
199

200
   if (__po_hi_gqueue_get_port_size(id,port) == __PO_HI_GQUEUE_FIFO_INDATA)
201
   {
yoogx's avatar
yoogx committed
202
     memcpy(ptr,request,sizeof(*request));
203
204
205
   }
   else
   {
yoogx's avatar
yoogx committed
206
207
     __DEBUGMSG ("[GQUEUE] Received  message for task %d, port %d\n", id, port);

208
      if (__po_hi_gqueue_used_size(id,port) == __po_hi_gqueue_get_port_size(id,port))
209
      {
Antonia Francis's avatar
Antonia Francis committed
210
        /* Releasing only a mutex */
211
212
213
        int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
        __DEBUGMSG("GQUEUE_SEM_MTUEX_RELEASE %d %d\n", id, res);
        assert(res == __PO_HI_SUCCESS);
yoogx's avatar
yoogx committed
214

Antonia Francis's avatar
Antonia Francis committed
215

yoogx's avatar
yoogx committed
216
        __PO_HI_DEBUG_CRITICAL ("[GQUEUE] QUEUE FULL, task-id=%d, port=%d\n", id, port);
jdelange's avatar
jdelange committed
217

yoogx's avatar
yoogx committed
218
219
        __DEBUGMSG ("[GQUEUE] Semaphore released (id=%d)\n", id);
        return __PO_HI_ERROR_QUEUE_FULL;
220
      }
221

222
      __po_hi_uint32_t   size;
223
224
      tmp = (__po_hi_request_t*) &__po_hi_gqueues[id][port];
      size = __po_hi_gqueues_woffsets[id][port] + __po_hi_gqueues_first[id][port];
225

jdelange's avatar
jdelange committed
226
      tmp = tmp + size;
227
228

      memcpy (tmp , request, sizeof (__po_hi_request_t));
jdelange's avatar
jdelange committed
229

230
231
232
      __po_hi_gqueues_woffsets[id][port] =  (__po_hi_gqueues_woffsets[id][port] + 1 ) % __po_hi_gqueues_sizes[id][port];

      __po_hi_gqueues_used_size[id][port]++;
233
      __PO_HI_INSTRUMENTATION_VCD_WRITE("r%d p%d.%d\n", __po_hi_gqueue_used_size(id,port), id, port);
234

235
236
237
238
      __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_woffset[id]] = port;
      __po_hi_gqueues_global_history_woffset[id] = (__po_hi_gqueues_global_history_woffset[id] + 1 ) % __po_hi_gqueues_total_fifo_size[id];

      if (__po_hi_gqueues_port_is_empty[id][port] == 1)
239
240
241
242
      {
         __po_hi_gqueues_port_is_empty[id][port] = 0;
         __po_hi_gqueues_n_empty[id]--;
      }
243
      __po_hi_gqueues_queue_is_empty[id] = 0;
244
   }
Antonia Francis's avatar
Antonia Francis committed
245
  /* Releasing a complete semaphore */
246
247
248
  int rel = __po_hi_sem_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_RELEASE %d %d\n", id, rel);
  assert(rel == __PO_HI_SUCCESS);
249

250
251
  __DEBUGMSG ("[GQUEUE] store_in completed\n");
  return __PO_HI_SUCCESS;
252
253
}

254

yoogx's avatar
yoogx committed
255
void __po_hi_gqueue_wait_for_incoming_event (__po_hi_task_id id,
256
                                             __po_hi_local_port_t* port)
257
{
Antonia Francis's avatar
Antonia Francis committed
258
  /* Locking only the mutex of the semaphore */
259
260
261
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
  assert(result == __PO_HI_SUCCESS);
262

263
  while(po_hi_gqueues_queue_is_empty(id) == 1)
yoogx's avatar
yoogx committed
264
265
    {
      __PO_HI_INSTRUMENTATION_VCD_WRITE("0t%d\n", id);
266

Antonia Francis's avatar
Antonia Francis committed
267
    /* Telling the semaphore to wait with putting its condvar on wait mode */
268
    int res_sem =  __po_hi_sem_wait_gqueue(__po_hi_gqueues_semaphores,id);
Antonia Francis's avatar
Antonia Francis committed
269
    __DEBUGMSG("GQUEUE_SEM_WAIT %d %d\n", id, res_sem);
270
    assert(res_sem == __PO_HI_SUCCESS);
Antonia Francis's avatar
Antonia Francis committed
271
    __PO_HI_INSTRUMENTATION_VCD_WRITE("1t%d\n", id);
yoogx's avatar
yoogx committed
272
    }
jdelange's avatar
jdelange committed
273

yoogx's avatar
yoogx committed
274
  *port = __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_offset[id]];
jdelange's avatar
jdelange committed
275

yoogx's avatar
yoogx committed
276
277
#if defined (MONITORING)
  record_event(SPORADIC, WAIT_FOR, id, invalid_port_t, invalid_port_t, *port, invalid_local_port_t, NULL);
Antonia Francis's avatar
Antonia Francis committed
278
279
#endif
  
yoogx's avatar
yoogx committed
280
  /** Releasing only the mutex of the semaphore*/
281

282
283
284
  int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MTUEX_RELEASE %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
285

286
287
}

288

289
290
/*
 * Number of values available on port `port` of task `id`: always 1 for
 * a data port (size __PO_HI_GQUEUE_FIFO_INDATA), otherwise the number of
 * requests currently recorded as queued on the port.
 */
int __po_hi_gqueue_get_count( __po_hi_task_id id, __po_hi_local_port_t port)
{
   if (__po_hi_gqueue_get_port_size(id,port) != __PO_HI_GQUEUE_FIFO_INDATA)
   {
      return (__po_hi_gqueue_used_size(id,port));
   }

   return 1; /* data port are always of size 1 */
}

yoogx's avatar
yoogx committed
301
302
int __po_hi_gqueue_get_value (__po_hi_task_id      id,
                              __po_hi_local_port_t port,
303
                              __po_hi_request_t*   request)
304
{
305
   __po_hi_request_t* ptr;
306
#ifdef __PO_HI_RTEMS_CLASSIC_API
307
308
309
   rtems_status_code ret;
#endif

jdelange's avatar
jdelange committed
310
311
312
313
#ifdef _WIN32
   DWORD ret;
#endif

Antonia Francis's avatar
Antonia Francis committed
314

315
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
316

Antonia Francis's avatar
Antonia Francis committed
317
 /* Locking only the mutex of the semaphore */
318
319
320
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
  assert(result == __PO_HI_SUCCESS);
321

322
323
324
325
326
327
328
329
330
   /*
    * If the port is an OUTPUT, with no value queued, the function returns
    * nothing.
    */
   if (__po_hi_gqueue_get_port_size(id,port) == 0)
   {
    __DEBUGMSG("THE PORT IS AN OUTPUT, REQUEST NOT SET UP");
    return 0;
   }
331
332
333
334
   /*
    * If the port is an event port, with no value queued, then we block
    * the thread.
    */
335
   if (__po_hi_gqueue_get_port_size(id,port) != __PO_HI_GQUEUE_FIFO_INDATA)
336
   {
337
      while (__po_hi_gqueues_port_is_empty[id][port] == 1)
338
      {
Antonia Francis's avatar
Antonia Francis committed
339
        /* Telling the semaphore to wait with putting its condvar on wait mode */
340
341
342
        int res_sem =  __po_hi_sem_wait_gqueue(__po_hi_gqueues_semaphores,id);
        __DEBUGMSG("GQUEUE_SEM_WAIT %d %d\n", id, result);
        assert(res_sem == __PO_HI_SUCCESS);
343
344
      }
   }
345
346
347
348
   /*
    * The request is set up
    */
   if (__po_hi_gqueue_used_size(id,port) == 0)
349
   {
350
      memcpy (request, ptr, sizeof (__po_hi_request_t));
351
      //update_runtime (id, port, ptr);
352
353
354
   }
   else
   {
yoogx's avatar
yoogx committed
355
      ptr = ((__po_hi_request_t *) &__po_hi_gqueues[id][port]) +  __po_hi_gqueues_first[id][port] + __po_hi_gqueues_offsets[id][port];
356
      memcpy (request, ptr, sizeof (__po_hi_request_t));
357
   }
yoogx's avatar
yoogx committed
358

359
360
361
362
#if defined (MONITORING)
   record_event(ANY, GET_VALUE, id, invalid_port_t, invalid_port_t, port, invalid_local_port_t , request);
#endif

julien.delange's avatar
julien.delange committed
363
   __PO_HI_DEBUG_INFO ("[GQUEUE] Task %d get a value on port %d\n", id, port);
364

Antonia Francis's avatar
Antonia Francis committed
365
/* Releasing only the mutex of the semaphore*/
366
367
368
  int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_RELEASE %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
369

370
   return 0;
371
372
}

373
/*
 * Drop the oldest queued request of port `port` of task `id`.
 *
 * For a data port (size __PO_HI_GQUEUE_FIFO_INDATA) nothing is done and
 * 1 is returned. Otherwise, under the mutex of the task semaphore, the
 * read offset of the port and the history read offset of the task are
 * advanced, the number of queued requests is decremented and the
 * emptiness flags are refreshed.
 *
 * When no request is recorded as queued on the port the state is left
 * untouched: decrementing the counter would underflow it, advancing the
 * offsets would desynchronise them from the writer, and the modulo below
 * would divide by zero for a port of size 0.
 *
 * Incomplete semantics: the return value does not tell the caller
 * whether a value was actually dropped. Returns __PO_HI_SUCCESS for
 * every port that is not a data port.
 */
int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
{
   /* XXX change and use assert ? */
   if (__po_hi_gqueue_get_port_size(id,port) == __PO_HI_GQUEUE_FIFO_INDATA)
   {
      return 1;
   }

   /* Locking a mutex */
   int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
   __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
   assert(result == __PO_HI_SUCCESS);

   /* The emptiness test is done under the lock so that it cannot race
      with a concurrent __po_hi_gqueue_store_in(). */
   if (__po_hi_gqueue_used_size(id,port) != 0)
   {
      __po_hi_gqueues_offsets[id][port] =
         (__po_hi_gqueues_offsets[id][port] + 1)
         % __po_hi_gqueues_sizes[id][port];

      __po_hi_gqueues_used_size[id][port]--;

      __PO_HI_INSTRUMENTATION_VCD_WRITE("r%d p%d.%d\n", __po_hi_gqueue_used_size(id,port), id, port);

      if (__po_hi_gqueue_used_size(id,port) == 0)
      {
         __po_hi_gqueues_n_empty[id]++;
         __po_hi_gqueues_port_is_empty[id][port] = 1;
      }

      if (__po_hi_gqueues_n_empty[id] == __po_hi_gqueues_nb_ports[id])
      {
         __po_hi_gqueues_queue_is_empty[id] = 1;
      }

      __po_hi_gqueues_global_history_offset[id] =
         (__po_hi_gqueues_global_history_offset[id] + 1)
         % __po_hi_gqueues_total_fifo_size[id];
   }

   /* Releasing a mutex */
   int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
   __DEBUGMSG("GQUEUE_SEM_MUTEX_RELEASE %d %d\n", id, res);
   assert(res == __PO_HI_SUCCESS);

   return __PO_HI_SUCCESS;
}
424

425
__po_hi_request_t*  __po_hi_gqueue_get_most_recent_value (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
426
{
427
  return (&__po_hi_gqueues_most_recent_values[task_id][local_port]);
428
429
}

430
__po_hi_port_id_t __po_hi_gqueue_get_destinations_number (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
431
{
432
  return (__po_hi_gqueues_n_destinations[task_id][local_port]);
433
434
435
436
}

__po_hi_port_t __po_hi_gqueue_get_destination (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port, const uint8_t destination_number)
{
437
  return (__po_hi_gqueues_destinations[task_id][local_port][destination_number]);
438
}
439

440
/*
 * Size registered for port `port` of task `id`, as supplied to
 * __po_hi_gqueue_init().
 */
__po_hi_port_id_t __po_hi_gqueue_get_port_size( __po_hi_task_id id, __po_hi_local_port_t port)
{
   const __po_hi_port_id_t* const task_sizes = __po_hi_gqueues_sizes[id];

   return task_sizes[port];
}

445
/*
 * Number of requests currently recorded as queued on port `port` of
 * task `id`. The counter is read without taking the task semaphore.
 */
__po_hi_port_id_t __po_hi_gqueue_used_size( __po_hi_task_id id, __po_hi_local_port_t port)
{
   const __po_hi_port_id_t* const task_used = __po_hi_gqueues_used_size[id];

   return task_used[port];
}

450
/*
 * Current value of the "queue is empty" flag of task `id` (1 when the
 * queue is flagged as empty). The flag is read without taking the task
 * semaphore.
 */
__po_hi_port_id_t po_hi_gqueues_queue_is_empty( __po_hi_task_id id)
{
   const __po_hi_port_id_t empty_flag = __po_hi_gqueues_queue_is_empty[id];

   return empty_flag;
}