po_hi_gqueue.c 19.8 KB
Newer Older
1
2
3
4
5
6
7
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more informations, please visit http://ocarina.enst.fr
 *
8
 * Copyright (C) 2010-2011, European Space Agency (ESA).
9
10
11
12
13
14
15
16
17
 */

#include <po_hi_config.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_gqueue.h>
18
#include <po_hi_utils.h>
19
20
21
22
23
24
25
26
/* Headers from PolyORB-HI-C */

#include <deployment.h>
#include <activity.h>
#include <request.h>
/* Headers from the generated code */

#include <string.h>
27

28
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
29
#include <pthread.h>
30
31
32
33
34
#elif defined(RTEMS_PURE)
#include <rtems.h>
#include <inttypes.h>
#include <po_hi_time.h>
#define __PO_HI_DEFAULT_PRIORITY RTEMS_NO_PRIORITY
35
36
37
#elif defined (XENO_NATIVE)
#include <native/cond.h>
#include <native/mutex.h>
38
39
40
#endif


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

#define __PO_HI_GQUEUE_OUT_PORT constant_out_identifier 
/* give a default value to the out port */

__po_hi_port_t*        __po_hi_gqueues[__PO_HI_NB_TASKS];
__po_hi_int8_t         __po_hi_gqueues_nb_ports[__PO_HI_NB_TASKS];
__po_hi_int8_t*        __po_hi_gqueues_sizes[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_used_size[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_offsets[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_woffsets[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_n_destinations[__PO_HI_NB_TASKS];
__po_hi_port_t**       __po_hi_gqueues_destinations[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_total_fifo_size[__PO_HI_NB_TASKS];
__po_hi_request_t*     __po_hi_gqueues_most_recent_values[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_first[__PO_HI_NB_TASKS];

__po_hi_uint8_t        __po_hi_gqueues_global_size[__PO_HI_NB_TASKS];
__po_hi_local_port_t*  __po_hi_gqueues_global_history[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_global_history_offset[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_global_history_woffset[__PO_HI_NB_TASKS];

__po_hi_uint8_t*       __po_hi_gqueues_port_is_empty[__PO_HI_NB_TASKS];
__po_hi_uint8_t        __po_hi_gqueues_queue_is_empty[__PO_HI_NB_TASKS];
__po_hi_uint8_t        __po_hi_gqueues_n_empty[__PO_HI_NB_TASKS];

66
#if defined (RTEMS_POSIX) || defined (POSIX) || defined (XENO_POSIX)
67
68
69
70
pthread_mutex_t        __po_hi_gqueues_mutexes[__PO_HI_NB_TASKS];
pthread_cond_t         __po_hi_gqueues_conds[__PO_HI_NB_TASKS];
pthread_mutexattr_t    __po_hi_gqueues_mutexes_attr[__PO_HI_NB_TASKS];
pthread_condattr_t     __po_hi_gqueues_conds_attr[__PO_HI_NB_TASKS];
71
#elif defined (RTEMS_PURE)
72
73
rtems_id                __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id                __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
74
75
76
#elif defined (XENO_NATIVE)
RT_MUTEX                __po_hi_gqueues_mutexes[__PO_HI_NB_TASKS];
RT_COND                 __po_hi_gqueues_conds[__PO_HI_NB_TASKS];
77
#endif
78

79
80
81
82
83
84
85
86
87
88
89
90
91
92
void __po_hi_gqueue_init (__po_hi_task_id       id,
                          __po_hi_uint8_t       nb_ports,
                          __po_hi_port_t        queue[],
                          __po_hi_int8_t        sizes[],
                          __po_hi_uint8_t       first[],
                          __po_hi_uint8_t       offsets[],
                          __po_hi_uint8_t       woffsets[],
                          __po_hi_uint8_t       n_dest[],
                          __po_hi_port_t*       destinations[],
                          __po_hi_uint8_t       used_size[],
                          __po_hi_local_port_t  history[],
                          __po_hi_request_t     recent[],
                          __po_hi_uint8_t       empties[],
                          __po_hi_uint16_t      total_fifo_size)
93
{
94
95
96
97
   __po_hi_uint8_t      tmp;
   __po_hi_uint16_t     off;
   __po_hi_request_t*   request;

98
#if defined (RTEMS_PURE)
99
   rtems_status_code    ret;
100
101
#elif defined (XENO_NATIVE)
   int                  ret;
102
#endif
103

104
105
   __po_hi_gqueues_global_history_woffset[id] = 0;
   __po_hi_gqueues_global_history_offset[id] = 0;
106

107
108
109
110
111
   __po_hi_gqueues_n_empty[id] = nb_ports;
   __po_hi_gqueues[id] = queue;
   __po_hi_gqueues_most_recent_values[id] = recent;
   __po_hi_gqueues_global_history[id] = history;
   __po_hi_gqueues_woffsets[id] = woffsets;
112

113
   __po_hi_gqueues_port_is_empty[id] = empties;
114

115
116
117
118
   __po_hi_gqueues_nb_ports[id] = nb_ports; 
   __po_hi_gqueues_sizes[id] = sizes;
   __po_hi_gqueues_first[id] = first;
   __po_hi_gqueues_used_size[id] = used_size;
119

120
121
122
123
   __po_hi_gqueues_offsets[id] = offsets;
   __po_hi_gqueues_n_destinations[id] = n_dest;
   __po_hi_gqueues_destinations[id] = destinations;
   __po_hi_gqueues_total_fifo_size[id] = total_fifo_size;
124

125
   __po_hi_gqueues_queue_is_empty[id] = 1;
126

127
#if defined (RTEMS_POSIX) || defined (POSIX) || defined (XENO_POSIX)
128
129
   pthread_mutexattr_init (&__po_hi_gqueues_mutexes_attr[id]);
   pthread_condattr_init (&__po_hi_gqueues_conds_attr[id]);
130
#if defined (POSIX) || defined (XENO_POSIX)
131
   pthread_mutexattr_setpshared(&__po_hi_gqueues_mutexes_attr[id],PTHREAD_PROCESS_SHARED); 
132
#endif
133
134
   pthread_mutex_init (&__po_hi_gqueues_mutexes[id], &__po_hi_gqueues_mutexes_attr[id]);
   pthread_cond_init (&__po_hi_gqueues_conds[id], &__po_hi_gqueues_conds_attr[id]);
135
136
137
#endif

#ifdef RTEMS_PURE
julien.delange's avatar
julien.delange committed
138
   __PO_HI_DEBUG_INFO ("[GQUEUE] Create semaphore for queue of task %d\n", id);
139
140
141
   ret = rtems_semaphore_create (rtems_build_name ('G', 'S', 'E' , 'A' + (char) id), 1, RTEMS_BINARY_SEMAPHORE, __PO_HI_DEFAULT_PRIORITY, &(__po_hi_gqueues_semaphores[id]));
   if (ret != RTEMS_SUCCESSFUL)
   {
julien.delange's avatar
julien.delange committed
142
      __PO_HI_DEBUG_WARNING ("[GQUEUE] Cannot create semaphore, error code=%d\n", ret);
143
144
   }

julien.delange's avatar
julien.delange committed
145
   __PO_HI_DEBUG_INFO ("[GQUEUE] Create barrier for queue of task %d\n", id);
146
147
148
   ret = rtems_barrier_create (rtems_build_name ('G', 'S', 'I' , 'A' + (char) id),RTEMS_BARRIER_AUTOMATIC_RELEASE , 10, &(__po_hi_gqueues_barriers[id]));
   if (ret != RTEMS_SUCCESSFUL)
   {
julien.delange's avatar
julien.delange committed
149
      __PO_HI_DEBUG_WARNING ("[GQUEUE] Cannot create barrier, error code=%d\n", ret);
150
151
   }
#endif
152

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#ifdef XENO_NATIVE
   ret = rt_mutex_create (&__po_hi_gqueues_mutexes[id], NULL);

   if (ret != 0)
   {
      __PO_HI_DEBUG_WARNING ("[GQUEUE] Cannot create mutex code=%d\n", ret);
   }

   ret = rt_cond_create (&__po_hi_gqueues_conds[id], NULL);

   if (ret != 0)
   {
      __PO_HI_DEBUG_WARNING ("[GQUEUE] Cannot create cond code=%d\n", ret);
   }
#endif

169
   off = 0;
170

171
172
   for (tmp=0;tmp<nb_ports;tmp++)
   {
173
      __po_hi_gqueues_used_size[id][tmp] = 0;
174

175
      if ( (sizes[tmp] != __PO_HI_GQUEUE_FIFO_INDATA) 
176
177
178
179
180
181
182
183
            && (sizes[tmp] != __PO_HI_GQUEUE_FIFO_OUT))
      {
         __po_hi_gqueues_first[id][tmp]=off;
         off += __po_hi_gqueues_sizes[id][tmp];
         __po_hi_gqueues_offsets[id][tmp] = 0;
         __po_hi_gqueues_woffsets[id][tmp] = 0;
         __po_hi_gqueues_port_is_empty[id][tmp] = 1;
      }
184
185
186
187

      /* Set invalid all recent values */
      request = (__po_hi_request_t*)&__po_hi_gqueues_most_recent_values[id][tmp];
      request->port = __PO_HI_GQUEUE_INVALID_PORT;
188
   }
189
190

#ifdef __PO_HI_DEBUG
191
192
193
   __DEBUGMSG("Initialize global queue for task-id %d ... ", id);
   for (tmp=0;tmp<nb_ports;tmp++)
   {
194
      __DEBUGMSG("port %d (used_size=%d,first=%d) ", 
195
196
197
198
199
            tmp, 
            __po_hi_gqueues_used_size[id][tmp],
            __po_hi_gqueues_first[id][tmp]);
   }
   __DEBUGMSG(" ... done\n");
200
201
202
203
204
#endif 
}


void __po_hi_gqueue_store_out (__po_hi_task_id id, 
205
206
                               __po_hi_local_port_t port, 
                               __po_hi_request_t* request)
207
{
208
   __po_hi_request_t* ptr;
209

210
211
   request->port = __PO_HI_GQUEUE_OUT_PORT;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
212
   memcpy (ptr, request, sizeof (__po_hi_request_t));
julien.delange's avatar
julien.delange committed
213
   __PO_HI_DEBUG_DEBUG ("__po_hi_gqueue_store_out() from task %d on port %d\n", id, port);
214
215
}

216
217


218
__po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id, 
219
220
                                         __po_hi_local_port_t port, 
                                         __po_hi_request_t* request)
221
{
222
   __po_hi_request_t* ptr;
223
224
   __po_hi_request_t* tmp;
   __po_hi_uint32_t   size;
225
226
227
#ifdef RTEMS_PURE
   rtems_status_code ret;
#endif
228
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
229
#ifdef __PO_HI_DEBUG
230
231
   if (ptr == NULL)
   {
232
      __DEBUGMSG ("__po_hi_gqueue_store_in : NULL POINTER\n");
233
   }
234
235
#endif

236
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
237
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
238
239
#elif defined (XENO_NATIVE)
   rt_mutex_acquire (&__po_hi_gqueues_mutexes[id], TM_INFINITE);
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
#elif defined (RTEMS_PURE)
   /*
  ret = rtems_barrier_wait (__po_hi_gqueues_barriers[id], RTEMS_WAIT);

rtems_id                __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id                __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
  */
   __DEBUGMSG ("[GQUEUE] Try to obtain semaphore for queue of task %d\n", id);
   ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
   if (ret != RTEMS_SUCCESSFUL)
   {
      __DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
   }

   __DEBUGMSG ("[GQUEUE] Semaphore got\n");
#endif
256
257
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
258
      memcpy(ptr,request,sizeof(*request));
259
260
261
   }
   else
   {
262
#ifdef __PO_HI_DEBUG
263
      __DEBUGMSG ("[GQUEUE] Received  message for task %d, port %d\n", id, port);
264
265
#endif
      if (__po_hi_gqueues_used_size[id][port] == __po_hi_gqueues_sizes[id][port])
266
      {
267
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
268
         pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
269
270
#elif defined (XENO_NATIVE)
   rt_mutex_release (&__po_hi_gqueues_mutexes[id]);
271
272
273
274
#elif defined (RTEMS_PURE)
         ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
         if (ret != RTEMS_SUCCESSFUL)
         {
julien.delange's avatar
julien.delange committed
275
            __PO_HI_DEBUG_CRITICAL ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
276
277
         }
#endif
julien.delange's avatar
julien.delange committed
278
         __PO_HI_DEBUG_CRITICAL ("[GQUEUE] QUEUE FULL, task-id=%d, port=%d", id, port);
279
280
         return __PO_HI_ERROR_QUEUE_FULL;
      }
281

282
283
      tmp = (__po_hi_request_t*) &__po_hi_gqueues[id][port];
      size = __po_hi_gqueues_woffsets[id][port] + __po_hi_gqueues_first[id][port];
284

jdelange's avatar
jdelange committed
285
      tmp = tmp + size;
286
287

      memcpy (tmp , request, sizeof (__po_hi_request_t));
jdelange's avatar
jdelange committed
288

289
290
291
292
      __po_hi_gqueues_woffsets[id][port] =  (__po_hi_gqueues_woffsets[id][port] + 1 ) % __po_hi_gqueues_sizes[id][port];

      __po_hi_gqueues_used_size[id][port]++;

293
294
      __PO_HI_INSTRUMENTATION_VCD_WRITE("r%d p%d.%d\n", __po_hi_gqueues_used_size[id][port], id, port); 

295
296
297
298
      __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_woffset[id]] = port;
      __po_hi_gqueues_global_history_woffset[id] = (__po_hi_gqueues_global_history_woffset[id] + 1 ) % __po_hi_gqueues_total_fifo_size[id];

      if (__po_hi_gqueues_port_is_empty[id][port] == 1)
299
300
301
302
      {
         __po_hi_gqueues_port_is_empty[id][port] = 0;
         __po_hi_gqueues_n_empty[id]--;
      }
303
      __po_hi_gqueues_queue_is_empty[id] = 0;
304
   }
305

306
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
307
308
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   pthread_cond_broadcast (&__po_hi_gqueues_conds[id]);
309
310
311
#elif defined (XENO_NATIVE)
   rt_mutex_release (&__po_hi_gqueues_mutexes[id]);
   rt_cond_broadcast (&__po_hi_gqueues_conds[id]);
312
313
314
315
#elif defined (RTEMS_PURE)
   ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
   if (ret != RTEMS_SUCCESSFUL)
   {
julien.delange's avatar
julien.delange committed
316
      __PO_HI_DEBUG_CRITICAL ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
317
318
   }
#endif
319

320
   return __PO_HI_SUCCESS;
321
322
}

323
324
void __po_hi_gqueue_wait_for_incoming_event (__po_hi_task_id id, 
                                             __po_hi_local_port_t* port)
325
{
326
327
328
329
330
#ifdef RTEMS_PURE
   rtems_status_code ret;
#endif


331
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
332
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
333
334
#elif defined (XENO_NATIVE)
   rt_mutex_acquire (&__po_hi_gqueues_mutexes[id], TM_INFINITE);
335
336
337
338
339
340
341
342
343
344
345
346
347
348
#elif defined (RTEMS_PURE)
   /*
  ret = rtems_barrier_wait (__po_hi_gqueues_barriers[id], RTEMS_WAIT);

rtems_id                __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id                __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
  */
   ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
  if (ret != RTEMS_SUCCESSFUL)
  {
     __DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
  }
#endif

349
350
   while(__po_hi_gqueues_queue_is_empty[id] == 1)
   {
351
352

      __PO_HI_INSTRUMENTATION_VCD_WRITE("0t%d\n", id); 
353
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
354

355
      pthread_cond_wait (&__po_hi_gqueues_conds[id],
356
            &__po_hi_gqueues_mutexes[id]);
357
358
#elif defined (XENO_NATIVE)
   rt_cond_wait (&__po_hi_gqueues_conds[id], &__po_hi_gqueues_mutexes[id], TM_INFINITE);
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
#elif defined (RTEMS_PURE)
      ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
      if (ret != RTEMS_SUCCESSFUL)
      {
         __DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
      }
      rtems_task_wake_after( RTEMS_YIELD_PROCESSOR );
      ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
      if (ret != RTEMS_SUCCESSFUL)
      {
         __DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
      }

#endif

374
      __PO_HI_INSTRUMENTATION_VCD_WRITE("1t%d\n", id); 
375
376
   }
   *port = __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_offset[id]];
377
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
378
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
379
380
#elif defined (XENO_NATIVE)
   rt_mutex_release (&__po_hi_gqueues_mutexes[id]);
381
382
383
384
385
386
387
388
#elif defined (RTEMS_PURE)
   ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
   if (ret != RTEMS_SUCCESSFUL)
   {
      __DEBUGMSG ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
   }
#endif

389
390
391
392
}

int __po_hi_gqueue_get_count( __po_hi_task_id id, __po_hi_local_port_t port)
{
393
394
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
395
      return 1; /* data port are always of size 1 */
396
397
398
   }
   else
   {
399
      return (__po_hi_gqueues_used_size[id][port]);
400
   }
401
402
}

403
404
405
int __po_hi_gqueue_get_value (__po_hi_task_id      id, 
                              __po_hi_local_port_t port, 
                              __po_hi_request_t*   request)
406
{
407
   __po_hi_request_t* ptr;
408
409
410
411
412
#ifdef RTEMS_PURE
   rtems_status_code ret;
#endif


413
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
414
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
415
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
416
417
#elif defined (XENO_NATIVE)
   rt_mutex_acquire (&__po_hi_gqueues_mutexes[id], TM_INFINITE);
418
419
420
421
422
423
424
425
426
427
428
429
430
431
#elif defined (RTEMS_PURE)
   /*
  ret = rtems_barrier_wait (__po_hi_gqueues_barriers[id], RTEMS_WAIT);

rtems_id                __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id                __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
  */
   ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
  if (ret != RTEMS_SUCCESSFUL)
  {
     __DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
  }
#endif

432
433
434
435
436
437
438

   /*
    * If the port is an event port, with no value queued, then we block
    * the thread.
    */
   if (__po_hi_gqueues_sizes[id][port] != __PO_HI_GQUEUE_FIFO_INDATA)
   {
439
      while (__po_hi_gqueues_port_is_empty[id][port] == 1)
440
      {
441
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
442
443
         pthread_cond_wait (&__po_hi_gqueues_conds[id],
               &__po_hi_gqueues_mutexes[id]);
444
445
#elif defined (XENO_NATIVE)
   rt_cond_wait (&__po_hi_gqueues_conds[id], &__po_hi_gqueues_mutexes[id], TM_INFINITE);
446
447
448
#elif defined (RTEMS_PURE)
         rtems_task_wake_after( RTEMS_YIELD_PROCESSOR );
#endif
449
450
451
452
453
      }
   }

   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
454
      memcpy (request, ptr, sizeof (__po_hi_request_t));
455
456
457
   }
   else
   {
458
459
      ptr = ((__po_hi_request_t *) &__po_hi_gqueues[id][port]) +  __po_hi_gqueues_first[id][port] + __po_hi_gqueues_offsets[id][port]; 
      memcpy (request, ptr, sizeof (__po_hi_request_t));
460
   }
461
462
    
   
julien.delange's avatar
julien.delange committed
463
   __PO_HI_DEBUG_INFO ("[GQUEUE] Task %d get a value on port %d\n", id, port);
464

465
466
467
   /*
    * As this part of the code is now considered as stable, we don't print debug output
    *
468

469
   __DEBUGMSG ("RECEIVED vars in gqueue: |");
470
471
472
   {
         int s;
         int i;
473
         uint8_t* tmp;
474
475
         tmp = (unsigned int*) &request->vars;
         s = sizeof (request->vars);
476
         for (i = 0 ; i < s ; i++)
477
478
479
480
481
482
483
         {
            printf("%x", *tmp);
            tmp++;
            fflush (stdout);
         }
   }
   __DEBUGMSG ("|\n");
484
#endif
485
*/
486

487
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
488
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
489
490
#elif defined (XENO_NATIVE)
   rt_mutex_release (&__po_hi_gqueues_mutexes[id]);
491
492
493
494
495
496
497
498
#elif defined (RTEMS_PURE)
   ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
   if (ret != RTEMS_SUCCESSFUL)
   {
      __DEBUGMSG ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
   }
#endif

499
   return 0;
500
501
}

502
int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
503
{
504
505
506
507
508
#ifdef RTEMS_PURE
   rtems_status_code ret;
#endif


509
510
   /* incomplete semantics, should discriminate and report whether
      there is a next value or not */
511

512
513
514
   /* XXX change and use assert ? */
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
515
      return 1;
516
   }
517

518
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
519
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
520
521
#elif defined (XENO_NATIVE)
   rt_mutex_acquire (&__po_hi_gqueues_mutexes[id], TM_INFINITE);
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
#elif defined (RTEMS_PURE)
   /*
  ret = rtems_barrier_wait (__po_hi_gqueues_barriers[id], RTEMS_WAIT);

rtems_id                __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id                __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
  */
   ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
  if (ret != RTEMS_SUCCESSFUL)
  {
     __DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
  }
#endif


537
538
539
   __po_hi_gqueues_offsets[id][port] = 
      (__po_hi_gqueues_offsets[id][port] + 1) 
      % __po_hi_gqueues_sizes[id][port];
540

541
   __po_hi_gqueues_used_size[id][port]--;
542

543
544
   __PO_HI_INSTRUMENTATION_VCD_WRITE("r%d p%d.%d\n", __po_hi_gqueues_used_size[id][port], id, port); 

545
546
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
547
548
      __po_hi_gqueues_n_empty[id]++;
      __po_hi_gqueues_port_is_empty[id][port] = 1;
549
550
551
552
   }

   if (__po_hi_gqueues_n_empty[id] == __po_hi_gqueues_nb_ports[id])
   {
553
      __po_hi_gqueues_queue_is_empty[id] = 1;
554
   }
555

556
557
558
   __po_hi_gqueues_global_history_offset[id] = 
      (__po_hi_gqueues_global_history_offset[id] + 1) 
      % __po_hi_gqueues_total_fifo_size[id];
559

560
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
561
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
562
563
#elif defined (XENO_NATIVE)
   rt_mutex_release (&__po_hi_gqueues_mutexes[id]);
564
565
566
567
568
569
570
571
#elif defined (RTEMS_PURE)
   ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
   if (ret != RTEMS_SUCCESSFUL)
   {
      __DEBUGMSG ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
   }
#endif

572
   return __PO_HI_SUCCESS;
573
}
574

575
__po_hi_request_t*  __po_hi_gqueue_get_most_recent_value (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
576
577
578
579
580
581
582
583
584
585
586
587
588
{
   return (&__po_hi_gqueues_most_recent_values[task_id][local_port]);
}

uint8_t __po_hi_gqueue_get_destinations_number (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
{
      return (__po_hi_gqueues_n_destinations[task_id][local_port]);
}

__po_hi_port_t __po_hi_gqueue_get_destination (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port, const uint8_t destination_number)
{
      return (__po_hi_gqueues_destinations[task_id][local_port][destination_number]);
}
589
590