po_hi_gqueue.c 14.8 KB
Newer Older
1 2 3 4 5
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
yoogx's avatar
yoogx committed
6
 * For more informations, please visit http://taste.tuxfamily.org/wiki
7
 *
yoogx's avatar
yoogx committed
8
 * Copyright (C) 2010-2018 ESA & ISAE.
9 10 11 12 13 14 15 16 17
 */

#include <po_hi_config.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_gqueue.h>
18 19 20
#include <po_hi_protected.h>
#include <po_hi_semaphore.h>

21
#include <po_hi_utils.h>
22 23 24 25 26 27 28 29
/* Headers from PolyORB-HI-C */

#include <deployment.h>
#include <activity.h>
#include <request.h>
/* Headers from the generated code */

#include <string.h>
30 31
#include <assert.h>
#include <stdlib.h>
32

33
#if defined (POSIX) || defined (RTEMS_POSIX) || defined (XENO_POSIX)
34
#include <pthread.h>
35
#elif defined(__PO_HI_RTEMS_CLASSIC_API)
36 37 38 39
#include <rtems.h>
#include <inttypes.h>
#include <po_hi_time.h>
#define __PO_HI_DEFAULT_PRIORITY RTEMS_NO_PRIORITY
40 41 42
#elif defined (XENO_NATIVE)
#include <native/cond.h>
#include <native/mutex.h>
43 44
#endif

45 46 47
#if defined (MONITORING) /* Headers from run-time verification */
#include <trace_manager.hh>
#endif
48

yoogx's avatar
yoogx committed
49
#define __PO_HI_GQUEUE_OUT_PORT constant_out_identifier
50 51 52
/* give a default value to the out port */

__po_hi_port_t*        __po_hi_gqueues[__PO_HI_NB_TASKS];
53 54 55 56 57 58
__po_hi_port_id_t      __po_hi_gqueues_nb_ports[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_sizes[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_used_size[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_offsets[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_woffsets[__PO_HI_NB_TASKS];
__po_hi_port_id_t*     __po_hi_gqueues_n_destinations[__PO_HI_NB_TASKS];
59
__po_hi_port_t**       __po_hi_gqueues_destinations[__PO_HI_NB_TASKS];
60
__po_hi_uint32_t       __po_hi_gqueues_total_fifo_size[__PO_HI_NB_TASKS];
61
__po_hi_request_t*     __po_hi_gqueues_most_recent_values[__PO_HI_NB_TASKS];
62
__po_hi_port_id_t*     __po_hi_gqueues_first[__PO_HI_NB_TASKS];
63

64 65 66 67
__po_hi_port_id_t       __po_hi_gqueues_global_size[__PO_HI_NB_TASKS];
__po_hi_local_port_t*   __po_hi_gqueues_global_history[__PO_HI_NB_TASKS];
__po_hi_uint32_t        __po_hi_gqueues_global_history_offset[__PO_HI_NB_TASKS];
__po_hi_uint32_t        __po_hi_gqueues_global_history_woffset[__PO_HI_NB_TASKS];
68

69 70 71
__po_hi_port_id_t*       __po_hi_gqueues_port_is_empty[__PO_HI_NB_TASKS];
__po_hi_port_id_t        __po_hi_gqueues_queue_is_empty[__PO_HI_NB_TASKS];
__po_hi_port_id_t        __po_hi_gqueues_n_empty[__PO_HI_NB_TASKS];
72

73 74 75 76

__po_hi_sem_t            __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];


77

78
void __po_hi_gqueue_init (__po_hi_task_id       id,
79
                          __po_hi_port_id_t     nb_ports,
80
                          __po_hi_port_t        queue[],
81 82 83 84 85
                          __po_hi_port_id_t     sizes[],
                          __po_hi_port_id_t     first[],
                          __po_hi_port_id_t     offsets[],
                          __po_hi_port_id_t     woffsets[],
                          __po_hi_port_id_t     n_dest[],
86
                          __po_hi_port_t*       destinations[],
87
                          __po_hi_port_id_t     used_size[],
88 89
                          __po_hi_local_port_t  history[],
                          __po_hi_request_t     recent[],
90 91
                          __po_hi_port_id_t     empties[],
                          __po_hi_uint32_t      total_fifo_size)
92
{
93 94
   __po_hi_port_id_t      tmp;
   __po_hi_uint32_t     off; /* XXX May overflow for large value .. */
95

96 97
   __po_hi_gqueues_global_history_woffset[id] = 0;
   __po_hi_gqueues_global_history_offset[id] = 0;
98

99 100 101 102 103
   __po_hi_gqueues_n_empty[id] = nb_ports;
   __po_hi_gqueues[id] = queue;
   __po_hi_gqueues_most_recent_values[id] = recent;
   __po_hi_gqueues_global_history[id] = history;
   __po_hi_gqueues_woffsets[id] = woffsets;
104

105
   __po_hi_gqueues_port_is_empty[id] = empties;
106

yoogx's avatar
yoogx committed
107
   __po_hi_gqueues_nb_ports[id] = nb_ports;
108 109 110
   __po_hi_gqueues_sizes[id] = sizes;
   __po_hi_gqueues_first[id] = first;
   __po_hi_gqueues_used_size[id] = used_size;
111

jdelange's avatar
jdelange committed
112 113 114 115
   __po_hi_gqueues_offsets[id]            = offsets;
   __po_hi_gqueues_n_destinations[id]     = n_dest;
   __po_hi_gqueues_destinations[id]       = destinations;
   __po_hi_gqueues_total_fifo_size[id]    = total_fifo_size;
116

117
   __po_hi_gqueues_queue_is_empty[id] = 1;
118 119


Antonia Francis's avatar
Antonia Francis committed
120
  /* Using the semaphore API to initialize the semaphore_gqueue array */
121 122 123
  int res = __po_hi_sem_init_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_INIT %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
124

jdelange's avatar
jdelange committed
125

126 127 128
   off = 0;
   for (tmp=0;tmp<nb_ports;tmp++)
   {
129
      __po_hi_gqueues_used_size[id][tmp] = 0;
130

yoogx's avatar
yoogx committed
131
      if ( (sizes[tmp] != __PO_HI_GQUEUE_FIFO_INDATA)
132 133 134 135 136 137 138 139
            && (sizes[tmp] != __PO_HI_GQUEUE_FIFO_OUT))
      {
         __po_hi_gqueues_first[id][tmp]=off;
         off += __po_hi_gqueues_sizes[id][tmp];
         __po_hi_gqueues_offsets[id][tmp] = 0;
         __po_hi_gqueues_woffsets[id][tmp] = 0;
         __po_hi_gqueues_port_is_empty[id][tmp] = 1;
      }
140

141
      /* Set invalid all recent values */
142
      __po_hi_request_t* request = (__po_hi_request_t*)&__po_hi_gqueues_most_recent_values[id][tmp];
143
      request->port = __PO_HI_GQUEUE_INVALID_PORT;
144
   }
145 146

#ifdef __PO_HI_DEBUG
147 148 149
   __DEBUGMSG("Initialize global queue for task-id %d ... ", id);
   for (tmp=0;tmp<nb_ports;tmp++)
   {
yoogx's avatar
yoogx committed
150 151
      __DEBUGMSG("port %d (used_size=%d,first=%d) ",
            tmp,
152 153 154 155
            __po_hi_gqueues_used_size[id][tmp],
            __po_hi_gqueues_first[id][tmp]);
   }
   __DEBUGMSG(" ... done\n");
yoogx's avatar
yoogx committed
156
#endif
157 158
}

159

yoogx's avatar
yoogx committed
160 161
void __po_hi_gqueue_store_out (__po_hi_task_id id,
                               __po_hi_local_port_t port,
162
                               __po_hi_request_t* request)
163
{
164
   __po_hi_request_t* ptr;
165

166 167
   request->port = __PO_HI_GQUEUE_OUT_PORT;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
168
   memcpy (ptr, request, sizeof (__po_hi_request_t));
julien.delange's avatar
julien.delange committed
169
   __PO_HI_DEBUG_DEBUG ("__po_hi_gqueue_store_out() from task %d on port %d\n", id, port);
170 171
}

172
__po_hi_port_id_t __po_hi_gqueue_store_in (__po_hi_task_id id,
yoogx's avatar
yoogx committed
173
                                         __po_hi_local_port_t port,
174
                                         __po_hi_request_t* request)
175
{
176
   __po_hi_request_t* ptr;
177
   __po_hi_request_t* tmp;
178

179

180
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
181
#ifdef __PO_HI_DEBUG
182 183
   if (ptr == NULL)
   {
184
      __DEBUGMSG ("__po_hi_gqueue_store_in : NULL POINTER\n");
185
   }
186 187
#endif

Antonia Francis's avatar
Antonia Francis committed
188 189

  /* Locking only a mutex */
190
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
Antonia Francis's avatar
Antonia Francis committed
191
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT on task %d result = %d\n", id, result);
192
  assert(result == __PO_HI_SUCCESS);
yoogx's avatar
yoogx committed
193

Antonia Francis's avatar
Antonia Francis committed
194

195 196
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
yoogx's avatar
yoogx committed
197
     memcpy(ptr,request,sizeof(*request));
198 199 200
   }
   else
   {
yoogx's avatar
yoogx committed
201 202
     __DEBUGMSG ("[GQUEUE] Received  message for task %d, port %d\n", id, port);

203
      if (__po_hi_gqueues_used_size[id][port] == __po_hi_gqueues_sizes[id][port])
204
      {
Antonia Francis's avatar
Antonia Francis committed
205
        /* Releasing only a mutex */
206 207 208
        int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
        __DEBUGMSG("GQUEUE_SEM_MTUEX_RELEASE %d %d\n", id, res);
        assert(res == __PO_HI_SUCCESS);
yoogx's avatar
yoogx committed
209

Antonia Francis's avatar
Antonia Francis committed
210

yoogx's avatar
yoogx committed
211
        __PO_HI_DEBUG_CRITICAL ("[GQUEUE] QUEUE FULL, task-id=%d, port=%d\n", id, port);
jdelange's avatar
jdelange committed
212

yoogx's avatar
yoogx committed
213 214
        __DEBUGMSG ("[GQUEUE] Semaphore released (id=%d)\n", id);
        return __PO_HI_ERROR_QUEUE_FULL;
215
      }
216

217
      __po_hi_uint32_t   size;
218 219
      tmp = (__po_hi_request_t*) &__po_hi_gqueues[id][port];
      size = __po_hi_gqueues_woffsets[id][port] + __po_hi_gqueues_first[id][port];
220

jdelange's avatar
jdelange committed
221
      tmp = tmp + size;
222 223

      memcpy (tmp , request, sizeof (__po_hi_request_t));
jdelange's avatar
jdelange committed
224

225 226 227
      __po_hi_gqueues_woffsets[id][port] =  (__po_hi_gqueues_woffsets[id][port] + 1 ) % __po_hi_gqueues_sizes[id][port];

      __po_hi_gqueues_used_size[id][port]++;
yoogx's avatar
yoogx committed
228
      __PO_HI_INSTRUMENTATION_VCD_WRITE("r%d p%d.%d\n", __po_hi_gqueues_used_size[id][port], id, port);
229

230 231 232 233
      __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_woffset[id]] = port;
      __po_hi_gqueues_global_history_woffset[id] = (__po_hi_gqueues_global_history_woffset[id] + 1 ) % __po_hi_gqueues_total_fifo_size[id];

      if (__po_hi_gqueues_port_is_empty[id][port] == 1)
234 235 236 237
      {
         __po_hi_gqueues_port_is_empty[id][port] = 0;
         __po_hi_gqueues_n_empty[id]--;
      }
238
      __po_hi_gqueues_queue_is_empty[id] = 0;
239
   }
Antonia Francis's avatar
Antonia Francis committed
240
  /* Releasing a complete semaphore */
241 242 243
  int rel = __po_hi_sem_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_RELEASE %d %d\n", id, rel);
  assert(rel == __PO_HI_SUCCESS);
244

245 246
  __DEBUGMSG ("[GQUEUE] store_in completed\n");
  return __PO_HI_SUCCESS;
247 248
}

249

yoogx's avatar
yoogx committed
250
void __po_hi_gqueue_wait_for_incoming_event (__po_hi_task_id id,
251
                                             __po_hi_local_port_t* port)
252
{
Antonia Francis's avatar
Antonia Francis committed
253
  /* Locking only the mutex of the semaphore */
254 255 256
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
  assert(result == __PO_HI_SUCCESS);
257

yoogx's avatar
yoogx committed
258 259 260
  while(__po_hi_gqueues_queue_is_empty[id] == 1)
    {
      __PO_HI_INSTRUMENTATION_VCD_WRITE("0t%d\n", id);
261

Antonia Francis's avatar
Antonia Francis committed
262
    /* Telling the semaphore to wait with putting its condvar on wait mode */
263 264 265
    int res_sem =  __po_hi_sem_wait_gqueue(__po_hi_gqueues_semaphores,id);
    __DEBUGMSG("GQUEUE_SEM_WAIT %d %d\n", id, result);
    assert(res_sem == __PO_HI_SUCCESS);
yoogx's avatar
yoogx committed
266 267
      __PO_HI_INSTRUMENTATION_VCD_WRITE("1t%d\n", id);
    }
jdelange's avatar
jdelange committed
268

yoogx's avatar
yoogx committed
269
  *port = __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_offset[id]];
jdelange's avatar
jdelange committed
270

Antonia Francis's avatar
Antonia Francis committed
271
/* Releasing only the mutex of the semaphore*/
272 273 274
  int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MTUEX_RELEASE %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
275

276 277
}

278

279 280
int __po_hi_gqueue_get_count( __po_hi_task_id id, __po_hi_local_port_t port)
{
281 282
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
283
      return 1; /* data port are always of size 1 */
284 285 286
   }
   else
   {
287
      return (__po_hi_gqueues_used_size[id][port]);
288
   }
289 290
}

291

yoogx's avatar
yoogx committed
292 293
int __po_hi_gqueue_get_value (__po_hi_task_id      id,
                              __po_hi_local_port_t port,
294
                              __po_hi_request_t*   request)
295
{
296
   __po_hi_request_t* ptr;
297
#ifdef __PO_HI_RTEMS_CLASSIC_API
298 299 300
   rtems_status_code ret;
#endif

jdelange's avatar
jdelange committed
301 302 303 304
#ifdef _WIN32
   DWORD ret;
#endif

Antonia Francis's avatar
Antonia Francis committed
305

306
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
307

Antonia Francis's avatar
Antonia Francis committed
308
 /* Locking only the mutex of the semaphore */
309 310 311
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
  assert(result == __PO_HI_SUCCESS);
312

313 314 315 316 317 318
   /*
    * If the port is an event port, with no value queued, then we block
    * the thread.
    */
   if (__po_hi_gqueues_sizes[id][port] != __PO_HI_GQUEUE_FIFO_INDATA)
   {
319
      while (__po_hi_gqueues_port_is_empty[id][port] == 1)
320
      {
Antonia Francis's avatar
Antonia Francis committed
321
        /* Telling the semaphore to wait with putting its condvar on wait mode */
322 323 324
        int res_sem =  __po_hi_sem_wait_gqueue(__po_hi_gqueues_semaphores,id);
        __DEBUGMSG("GQUEUE_SEM_WAIT %d %d\n", id, result);
        assert(res_sem == __PO_HI_SUCCESS);
325 326 327
      }
   }

328 329 330 331
#if defined (MONITORING)
   update_sporadic_dispatch (id, port);
#endif

332 333
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
334
      memcpy (request, ptr, sizeof (__po_hi_request_t));
335
      //update_runtime (id, port, ptr);
336 337 338
   }
   else
   {
yoogx's avatar
yoogx committed
339
      ptr = ((__po_hi_request_t *) &__po_hi_gqueues[id][port]) +  __po_hi_gqueues_first[id][port] + __po_hi_gqueues_offsets[id][port];
340
      memcpy (request, ptr, sizeof (__po_hi_request_t));
341
   }
yoogx's avatar
yoogx committed
342

julien.delange's avatar
julien.delange committed
343
   __PO_HI_DEBUG_INFO ("[GQUEUE] Task %d get a value on port %d\n", id, port);
344

Antonia Francis's avatar
Antonia Francis committed
345
/* Releasing only the mutex of the semaphore*/
346 347 348
  int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_RELEASE %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);
349

350
   return 0;
351 352
}

353
int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
354
{
355
#ifdef __PO_HI_RTEMS_CLASSIC_API
356 357 358
   rtems_status_code ret;
#endif

359 360
   /* incomplete semantics, should discriminate and report whether
      there is a next value or not */
361

362 363 364
   /* XXX change and use assert ? */
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
365
      return 1;
366
   }
367

Antonia Francis's avatar
Antonia Francis committed
368
 /* Locking a mutex */
369 370 371
  int result = __po_hi_sem_mutex_wait_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_WAIT %d %d\n", id, result);
  assert(result == __PO_HI_SUCCESS);
372

yoogx's avatar
yoogx committed
373 374
   __po_hi_gqueues_offsets[id][port] =
      (__po_hi_gqueues_offsets[id][port] + 1)
375
      % __po_hi_gqueues_sizes[id][port];
376

377
   __po_hi_gqueues_used_size[id][port]--;
378

yoogx's avatar
yoogx committed
379
   __PO_HI_INSTRUMENTATION_VCD_WRITE("r%d p%d.%d\n", __po_hi_gqueues_used_size[id][port], id, port);
380

381 382
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
383 384
      __po_hi_gqueues_n_empty[id]++;
      __po_hi_gqueues_port_is_empty[id][port] = 1;
385 386 387 388
   }

   if (__po_hi_gqueues_n_empty[id] == __po_hi_gqueues_nb_ports[id])
   {
389
      __po_hi_gqueues_queue_is_empty[id] = 1;
390
   }
391

yoogx's avatar
yoogx committed
392 393
   __po_hi_gqueues_global_history_offset[id] =
      (__po_hi_gqueues_global_history_offset[id] + 1)
394
      % __po_hi_gqueues_total_fifo_size[id];
395

396

Antonia Francis's avatar
Antonia Francis committed
397
/* Releasing a mutex*/
398 399 400 401 402
  int res = __po_hi_sem_mutex_release_gqueue(__po_hi_gqueues_semaphores,id);
  __DEBUGMSG("GQUEUE_SEM_MUTEX_RELEASE %d %d\n", id, res);
  assert(res == __PO_HI_SUCCESS);

  return __PO_HI_SUCCESS;
403
}
404

405
__po_hi_request_t*  __po_hi_gqueue_get_most_recent_value (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
406
{
407
  return (&__po_hi_gqueues_most_recent_values[task_id][local_port]);
408 409 410 411
}

uint8_t __po_hi_gqueue_get_destinations_number (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
{
412
  return (__po_hi_gqueues_n_destinations[task_id][local_port]);
413 414 415 416
}

__po_hi_port_t __po_hi_gqueue_get_destination (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port, const uint8_t destination_number)
{
417
  return (__po_hi_gqueues_destinations[task_id][local_port][destination_number]);
418
}
419

420
__po_hi_port_id_t __po_hi_gqueue_get_port_size( __po_hi_task_id id, __po_hi_local_port_t port)
421 422 423 424
{
   return __po_hi_gqueues_sizes[id][port];
}

425
__po_hi_port_id_t __po_hi_gqueue_used_size( __po_hi_task_id id, __po_hi_local_port_t port)
426 427 428 429
{
   return __po_hi_gqueues_used_size[id][port];
}

430
__po_hi_port_id_t po_hi_gqueues_queue_is_empty( __po_hi_task_id id)
431 432 433 434
{
   return __po_hi_gqueues_queue_is_empty[id];
}

435
__po_hi_request_t*
436 437
__po_hi_gqueues_get_request(__po_hi_task_id id, __po_hi_local_port_t port)

438
 {
439 440 441 442
  __po_hi_request_t* request ;
  __po_hi_request_t* ptr ;
  request = calloc(1,sizeof(__po_hi_request_t));
  ptr = &__po_hi_gqueues_most_recent_values[id][port];
443 444 445 446 447 448 449 450 451
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
      memcpy (request, ptr, sizeof (__po_hi_request_t));
      //update_runtime (id, port, ptr);
   }
   else
   {
      ptr = ((__po_hi_request_t *) &__po_hi_gqueues[id][port]) +  __po_hi_gqueues_first[id][port] + __po_hi_gqueues_offsets[id][port];
      memcpy (request, ptr, sizeof (__po_hi_request_t));
452
   }	return request;
453
}