/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more information, please visit http://taste.tuxfamily.org/wiki
 *
 * Copyright (C) 2010-2018 ESA & ISAE.
 */

/* Headers from PolyORB-HI-C */
#include <po_hi_config.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_gqueue.h>
#include <po_hi_protected.h>
#include <po_hi_semaphore.h>

#include <po_hi_utils.h>

/* Headers from the generated code */
#include <deployment.h>
#include <activity.h>
#include <request.h>

#include <string.h>
#include <assert.h>
#include <stdlib.h>

#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
#include <pthread.h>
#elif defined(__PO_HI_RTEMS_CLASSIC_API)
#include <rtems.h>
#include <inttypes.h>
#include <po_hi_time.h>
#define __PO_HI_DEFAULT_PRIORITY RTEMS_NO_PRIORITY
#elif defined (XENO_NATIVE)
#include <native/cond.h>
#include <native/mutex.h>
#endif

#if defined (MONITORING) /* Headers from run-time verification */
#include <stdio.h> /* printf() used by the monitoring traces */
#include <trace_manager.h>
#endif

#define __PO_HI_GQUEUE_OUT_PORT constant_out_identifier
50
51
52
/* give a default value to the out port */

__po_hi_port_t*        __po_hi_gqueues[__PO_HI_NB_TASKS];
53
54
55
56
57
58
__po_hi_port_id_t      __po_hi_gqueues_nb_ports[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_sizes[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_used_size[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_offsets[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_woffsets[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_n_destinations[__PO_HI_NB_TASKS];
59
__po_hi_port_t**       __po_hi_gqueues_destinations[__PO_HI_NB_TASKS];
60
__po_hi_uint32_t       __po_hi_gqueues_total_fifo_size[__PO_HI_NB_TASKS];
61
__po_hi_request_t*     __po_hi_gqueues_most_recent_values[__PO_HI_NB_TASKS];
62
__po_hi_port_id_t*     __po_hi_gqueues_first[__PO_HI_NB_TASKS];
63

64
65
66
67
__po_hi_port_id_t       __po_hi_gqueues_global_size[__PO_HI_NB_TASKS];
__po_hi_local_port_t*   __po_hi_gqueues_global_history[__PO_HI_NB_TASKS];
__po_hi_uint32_t        __po_hi_gqueues_global_history_offset[__PO_HI_NB_TASKS];
__po_hi_uint32_t        __po_hi_gqueues_global_history_woffset[__PO_HI_NB_TASKS];
68

69
70
71
__po_hi_port_id_t*       __po_hi_gqueues_port_is_empty[__PO_HI_NB_TASKS];
__po_hi_port_id_t        __po_hi_gqueues_queue_is_empty[__PO_HI_NB_TASKS];
__po_hi_port_id_t        __po_hi_gqueues_n_empty[__PO_HI_NB_TASKS];
72

73
74
75
76

__po_hi_sem_t            __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];


77

78
void __po_hi_gqueue_init (__po_hi_task_id       id,
79
                          __po_hi_port_id_t     nb_ports,
80
                          __po_hi_port_t        queue[],
81
82
83
84
85
                          __po_hi_port_id_t     sizes[],
                          __po_hi_port_id_t     first[],
                          __po_hi_port_id_t     offsets[],
                          __po_hi_port_id_t     woffsets[],
                          __po_hi_port_id_t     n_dest[],
86
                          __po_hi_port_t*       destinations[],
87
                          __po_hi_port_id_t     used_size[],
88
89
                          __po_hi_local_port_t  history[],
                          __po_hi_request_t     recent[],
90
91
                          __po_hi_port_id_t     empties[],
                          __po_hi_uint32_t      total_fifo_size)
92
{
93
94
   __po_hi_port_id_t      tmp;
   __po_hi_uint32_t     off; /* XXX May overflow for large value .. */
95

96
97
   __po_hi_gqueues_global_history_woffset[id] = 0;
   __po_hi_gqueues_global_history_offset[id] = 0;
98

99
100
101
102
103
   __po_hi_gqueues_n_empty[id] = nb_ports;
   __po_hi_gqueues[id] = queue;
   __po_hi_gqueues_most_recent_values[id] = recent;
   __po_hi_gqueues_global_history[id] = history;
   __po_hi_gqueues_woffsets[id] = woffsets;
104

105
   __po_hi_gqueues_port_is_empty[id] = empties;
106

yoogx's avatar
yoogx committed
107
   __po_hi_gqueues_nb_ports[id] = nb_ports;
108
109
110
   __po_hi_gqueues_sizes[id] = sizes;
   __po_hi_gqueues_first[id] = first;
   __po_hi_gqueues_used_size[id] = used_size;
111

jdelange's avatar
jdelange committed
112
113
114
115
   __po_hi_gqueues_offsets[id]            = offsets;
   __po_hi_gqueues_n_destinations[id]     = n_dest;
   __po_hi_gqueues_destinations[id]       = destinations;
   __po_hi_gqueues_total_fifo_size[id]    = total_fifo_size;
116

117
   __po_hi_gqueues_queue_is_empty[id] = 1;
118
119


Antonia Francis's avatar
Antonia Francis committed
120
  /* Using the semaphore API to initialize the semaphore_gqueue array */
121
122
123
  int res = __po_hi_sem_init_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_INIT %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
124

jdelange's avatar
jdelange committed
125

126
127
128
   off = 0;
   for (tmp=0;tmp<nb_ports;tmp++)
   {
129
      __po_hi_gqueues_used_size[id][tmp] = 0;
130

yoogx's avatar
yoogx committed
131
      if ( (sizes[tmp] != __PO_HI_GQUEUE_FIFO_INDATA)
132
133
134
135
136
137
138
139
            && (sizes[tmp] != __PO_HI_GQUEUE_FIFO_OUT))
      {
         __po_hi_gqueues_first[id][tmp]=off;
         off += __po_hi_gqueues_sizes[id][tmp];
         __po_hi_gqueues_offsets[id][tmp] = 0;
         __po_hi_gqueues_woffsets[id][tmp] = 0;
         __po_hi_gqueues_port_is_empty[id][tmp] = 1;
      }
140

141
      /* Set invalid all recent values */
142
      __po_hi_request_t* request = (__po_hi_request_t*)&__po_hi_gqueues_most_recent_values[id][tmp];
143
      request->port = __PO_HI_GQUEUE_INVALID_PORT;
144
   }
145
146

#ifdef __PO_HI_DEBUG
147
148
149
   __DEBUGMSG("Initialize global queue for task-id %d ... ", id);
   for (tmp=0;tmp<nb_ports;tmp++)
   {
yoogx's avatar
yoogx committed
150
151
      __DEBUGMSG("port %d (used_size=%d,first=%d) ",
            tmp,
152
153
154
155
            __po_hi_gqueues_used_size[id][tmp],
            __po_hi_gqueues_first[id][tmp]);
   }
   __DEBUGMSG(" ... done\n");
yoogx's avatar
yoogx committed
156
#endif
157
158
}

159

yoogx's avatar
yoogx committed
160
161
void __po_hi_gqueue_store_out (__po_hi_task_id id,
                               __po_hi_local_port_t port,
162
                               __po_hi_request_t* request)
163
{
164
   __po_hi_request_t* ptr;
165

166
167
   request->port = __PO_HI_GQUEUE_OUT_PORT;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
168
   memcpy (ptr, request, sizeof (__po_hi_request_t));
julien.delange's avatar
julien.delange committed
169
   __PO_HI_DEBUG_DEBUG ("__po_hi_gqueue_store_out() from task %d on port %d\n", id, port);
yoogx's avatar
yoogx committed
170
171
172
173
174
175

#if defined (MONITORING)
printf("record_event");
   record_event(ANY, STORE_OUT, id, invalid_port_t, invalid_port_t, port, invalid_local_port_t, request);
#endif

176
177
}

178
__po_hi_port_id_t __po_hi_gqueue_store_in (__po_hi_task_id id,
yoogx's avatar
yoogx committed
179
                                         __po_hi_local_port_t port,
180
                                         __po_hi_request_t* request)
181
{
182
   __po_hi_request_t* ptr;
183
   __po_hi_request_t* tmp;
184

185

186
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
187
#ifdef __PO_HI_DEBUG
188
189
   if (ptr == NULL)
   {
190
      __DEBUGMSG ("__po_hi_gqueue_store_in : NULL POINTER\n");
191
   }
192
193
#endif

Antonia Francis's avatar
Antonia Francis committed
194
195

  /* Locking only a mutex */
196
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
Antonia Francis's avatar
Antonia Francis committed
197
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT on task %d result = %d\n", id, result);
198
  assert(result == __PO_HI_SUCCESS);
yoogx's avatar
yoogx committed
199

Antonia Francis's avatar
Antonia Francis committed
200

201
202
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
yoogx's avatar
yoogx committed
203
     memcpy(ptr,request,sizeof(*request));
204
205
206
   }
   else
   {
yoogx's avatar
yoogx committed
207
208
     __DEBUGMSG ("[GQUEUE] Received  message for task %d, port %d\n", id, port);

209
      if (__po_hi_gqueues_used_size[id][port] == __po_hi_gqueues_sizes[id][port])
210
      {
Antonia Francis's avatar
Antonia Francis committed
211
        /* Releasing only a mutex */
212
213
214
        int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
        __DEBUGMSG("GQUEUE_SEM_MTUEX_RELEASE %d %d\n", id, res);
        assert(res == __PO_HI_SUCCESS);
yoogx's avatar
yoogx committed
215

Antonia Francis's avatar
Antonia Francis committed
216

yoogx's avatar
yoogx committed
217
        __PO_HI_DEBUG_CRITICAL ("[GQUEUE] QUEUE FULL, task-id=%d, port=%d\n", id, port);
jdelange's avatar
jdelange committed
218

yoogx's avatar
yoogx committed
219
220
        __DEBUGMSG ("[GQUEUE] Semaphore released (id=%d)\n", id);
        return __PO_HI_ERROR_QUEUE_FULL;
221
      }
222

223
      __po_hi_uint32_t   size;
224
225
      tmp = (__po_hi_request_t*) &__po_hi_gqueues[id][port];
      size = __po_hi_gqueues_woffsets[id][port] + __po_hi_gqueues_first[id][port];
226

jdelange's avatar
jdelange committed
227
      tmp = tmp + size;
228
229

      memcpy (tmp , request, sizeof (__po_hi_request_t));
jdelange's avatar
jdelange committed
230

231
232
233
      __po_hi_gqueues_woffsets[id][port] =  (__po_hi_gqueues_woffsets[id][port] + 1 ) % __po_hi_gqueues_sizes[id][port];

      __po_hi_gqueues_used_size[id][port]++;
yoogx's avatar
yoogx committed
234
      __PO_HI_INSTRUMENTATION_VCD_WRITE("r%d p%d.%d\n", __po_hi_gqueues_used_size[id][port], id, port);
235

236
237
238
239
      __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_woffset[id]] = port;
      __po_hi_gqueues_global_history_woffset[id] = (__po_hi_gqueues_global_history_woffset[id] + 1 ) % __po_hi_gqueues_total_fifo_size[id];

      if (__po_hi_gqueues_port_is_empty[id][port] == 1)
240
241
242
243
      {
         __po_hi_gqueues_port_is_empty[id][port] = 0;
         __po_hi_gqueues_n_empty[id]--;
      }
244
      __po_hi_gqueues_queue_is_empty[id] = 0;
245
   }
Antonia Francis's avatar
Antonia Francis committed
246
  /* Releasing a complete semaphore */
247
248
249
  int rel = __po_hi_sem_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_RELEASE %d %d\n", id, rel);
  assert(rel == __PO_HI_SUCCESS);
250

251
252
  __DEBUGMSG ("[GQUEUE] store_in completed\n");
  return __PO_HI_SUCCESS;
253
254
}

255

yoogx's avatar
yoogx committed
256
void __po_hi_gqueue_wait_for_incoming_event (__po_hi_task_id id,
257
                                             __po_hi_local_port_t* port)
258
{
Antonia Francis's avatar
Antonia Francis committed
259
  /* Locking only the mutex of the semaphore */
260
261
262
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
  assert(result == __PO_HI_SUCCESS);
263

yoogx's avatar
yoogx committed
264
265
266
  while(__po_hi_gqueues_queue_is_empty[id] == 1)
    {
      __PO_HI_INSTRUMENTATION_VCD_WRITE("0t%d\n", id);
267

Antonia Francis's avatar
Antonia Francis committed
268
    /* Telling the semaphore to wait with putting its condvar on wait mode */
269
270
271
    int res_sem =  __po_hi_sem_wait_gqueue(__po_hi_gqueues_semaphores,id);
    __DEBUGMSG("GQUEUE_SEM_WAIT %d %d\n", id, result);
    assert(res_sem == __PO_HI_SUCCESS);
yoogx's avatar
yoogx committed
272
273
      __PO_HI_INSTRUMENTATION_VCD_WRITE("1t%d\n", id);
    }
jdelange's avatar
jdelange committed
274

yoogx's avatar
yoogx committed
275
  *port = __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_offset[id]];
jdelange's avatar
jdelange committed
276

yoogx's avatar
yoogx committed
277
278
279
280
281
#if defined (MONITORING)
  printf("record_event");
  record_event(SPORADIC, WAIT_FOR, id, invalid_port_t, invalid_port_t, *port, invalid_local_port_t, NULL);

  /** Releasing only the mutex of the semaphore*/
282

283
284
285
  int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MTUEX_RELEASE %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
286

287
288
}

289

290
291
/*
 * Return the number of values available on port `port` of task `id`:
 * always 1 for a data port, the number of queued requests otherwise.
 *
 * The counter is read without taking the task semaphore.
 */
int __po_hi_gqueue_get_count( __po_hi_task_id id, __po_hi_local_port_t port)
{
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
      return 1; /* data port are always of size 1 */
   }
   else
   {
      return (__po_hi_gqueues_used_size[id][port]);
   }
}

int __po_hi_gqueue_get_value (__po_hi_task_id      id,
                              __po_hi_local_port_t port,
305
                              __po_hi_request_t*   request)
306
{
307
   __po_hi_request_t* ptr;
308
#ifdef __PO_HI_RTEMS_CLASSIC_API
309
310
311
   rtems_status_code ret;
#endif

jdelange's avatar
jdelange committed
312
313
314
315
#ifdef _WIN32
   DWORD ret;
#endif

Antonia Francis's avatar
Antonia Francis committed
316

317
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
318

Antonia Francis's avatar
Antonia Francis committed
319
 /* Locking only the mutex of the semaphore */
320
321
322
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
  assert(result == __PO_HI_SUCCESS);
323

324
325
326
327
328
329
   /*
    * If the port is an event port, with no value queued, then we block
    * the thread.
    */
   if (__po_hi_gqueues_sizes[id][port] != __PO_HI_GQUEUE_FIFO_INDATA)
   {
330
      while (__po_hi_gqueues_port_is_empty[id][port] == 1)
331
      {
Antonia Francis's avatar
Antonia Francis committed
332
        /* Telling the semaphore to wait with putting its condvar on wait mode */
333
334
335
        int res_sem =  __po_hi_sem_wait_gqueue(__po_hi_gqueues_semaphores,id);
        __DEBUGMSG("GQUEUE_SEM_WAIT %d %d\n", id, result);
        assert(res_sem == __PO_HI_SUCCESS);
336
337
338
      }
   }

339
#if defined (MONITORING)
yoogx's avatar
yoogx committed
340
341
printf("record_event");
   record_event(ANY, GET_VALUE, id, invalid_port_t, invalid_port_t, port, invalid_local_port_t , request);
342
343
#endif

344
345
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
346
      memcpy (request, ptr, sizeof (__po_hi_request_t));
347
      //update_runtime (id, port, ptr);
348
349
350
   }
   else
   {
yoogx's avatar
yoogx committed
351
      ptr = ((__po_hi_request_t *) &__po_hi_gqueues[id][port]) +  __po_hi_gqueues_first[id][port] + __po_hi_gqueues_offsets[id][port];
352
      memcpy (request, ptr, sizeof (__po_hi_request_t));
353
   }
yoogx's avatar
yoogx committed
354

julien.delange's avatar
julien.delange committed
355
   __PO_HI_DEBUG_INFO ("[GQUEUE] Task %d get a value on port %d\n", id, port);
356

Antonia Francis's avatar
Antonia Francis committed
357
/* Releasing only the mutex of the semaphore*/
358
359
360
  int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_RELEASE %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
361

362
   return 0;
363
364
}

365
/*
 * Consume the request at the read position of port `port` of task `id`:
 * the port read offset and the global history read offset are advanced,
 * and the emptiness flags are updated.
 *
 * Returns 1 without doing anything for a data port, __PO_HI_SUCCESS
 * otherwise.
 *
 * NOTE(review): the used-size counter is decremented unconditionally, so
 * this looks like it expects to be called only when a request is queued
 * on the port (e.g. after __po_hi_gqueue_get_value()) - confirm against
 * the callers.
 */
int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
{
   /* incomplete semantics, should discriminate and report whether
      there is a next value or not */

   /* XXX change and use assert ? */
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
      return 1;
   }

   /* Locking a mutex */
   int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
   __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
   assert(result == __PO_HI_SUCCESS);

   /* The port FIFO is a ring: wrap the read offset on its size */
   __po_hi_gqueues_offsets[id][port] =
      (__po_hi_gqueues_offsets[id][port] + 1)
      % __po_hi_gqueues_sizes[id][port];

   __po_hi_gqueues_used_size[id][port]--;

   __PO_HI_INSTRUMENTATION_VCD_WRITE("r%d p%d.%d\n", __po_hi_gqueues_used_size[id][port], id, port);

   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
      __po_hi_gqueues_n_empty[id]++;
      __po_hi_gqueues_port_is_empty[id][port] = 1;
   }

   /* The task queue is empty once every port is flagged empty */
   if (__po_hi_gqueues_n_empty[id] == __po_hi_gqueues_nb_ports[id])
   {
      __po_hi_gqueues_queue_is_empty[id] = 1;
   }

   __po_hi_gqueues_global_history_offset[id] =
      (__po_hi_gqueues_global_history_offset[id] + 1)
      % __po_hi_gqueues_total_fifo_size[id];

   /* Releasing a mutex */
   int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
   __DEBUGMSG("GQUEUE_SEM_MUTEX_RELEASE %d %d\n", id, res);
   assert(res == __PO_HI_SUCCESS);

   return __PO_HI_SUCCESS;
}

__po_hi_request_t*  __po_hi_gqueue_get_most_recent_value (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
418
{
419
  return (&__po_hi_gqueues_most_recent_values[task_id][local_port]);
420
421
422
423
}

uint8_t __po_hi_gqueue_get_destinations_number (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
{
424
  return (__po_hi_gqueues_n_destinations[task_id][local_port]);
425
426
427
428
}

__po_hi_port_t __po_hi_gqueue_get_destination (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port, const uint8_t destination_number)
{
429
  return (__po_hi_gqueues_destinations[task_id][local_port][destination_number]);
430
}
431

432
/*
 * Return the size recorded for port `port` of task `id`, i.e. the FIFO
 * size or one of the __PO_HI_GQUEUE_FIFO_* markers given at init time.
 */
__po_hi_port_id_t __po_hi_gqueue_get_port_size( __po_hi_task_id id, __po_hi_local_port_t port)
{
   return __po_hi_gqueues_sizes[id][port];
}

/*
 * Return the number of requests currently queued on port `port` of task
 * `id`. The counter is read without taking the task semaphore.
 */
__po_hi_port_id_t __po_hi_gqueue_used_size( __po_hi_task_id id, __po_hi_local_port_t port)
{
   return __po_hi_gqueues_used_size[id][port];
}

/*
 * Return the "queue is empty" flag of task `id`: 1 when no port of the
 * task holds a queued request, 0 otherwise. The flag is read without
 * taking the task semaphore.
 */
__po_hi_port_id_t po_hi_gqueues_queue_is_empty( __po_hi_task_id id)
{
   return __po_hi_gqueues_queue_is_empty[id];
}

__po_hi_request_t*
448
449
__po_hi_gqueues_get_request(__po_hi_task_id id, __po_hi_local_port_t port)

450
 {
451
452
453
454
  __po_hi_request_t* request ;
  __po_hi_request_t* ptr ;
  request = calloc(1,sizeof(__po_hi_request_t));
  ptr = &__po_hi_gqueues_most_recent_values[id][port];
455
456
457
458
459
460
461
462
463
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
      memcpy (request, ptr, sizeof (__po_hi_request_t));
      //update_runtime (id, port, ptr);
   }
   else
   {
      ptr = ((__po_hi_request_t *) &__po_hi_gqueues[id][port]) +  __po_hi_gqueues_first[id][port] + __po_hi_gqueues_offsets[id][port];
      memcpy (request, ptr, sizeof (__po_hi_request_t));
464
   }	return request;
465
}