po_hi_gqueue.c 11.1 KB
Newer Older
1
2
3
4
5
6
7
8
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more information, please visit http://ocarina.enst.fr
 *
 * Copyright (C) 2007-2009, GET-Telecom Paris.
 * Copyright (C) 2010, European Space Agency
 */

#include <po_hi_config.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_gqueue.h>
/* Headers from PolyORB-HI-C */

#include <deployment.h>
#include <activity.h>
#include <request.h>
/* Headers from the generated code */

#include <string.h>
#include <pthread.h>
/* System-dependent headers */

#define __PO_HI_GQUEUE_OUT_PORT constant_out_identifier 
/* give a default value to the out port */

/*
 * Per-task tables, all indexed by task id. Most entries are pointers
 * to arrays handed to __po_hi_gqueue_init(), which stores them by
 * reference; those arrays are then indexed by local port.
 */

/* Storage area the queued requests are copied into / read from. */
__po_hi_port_t*        __po_hi_gqueues[__PO_HI_NB_TASKS];
/* Number of ports given to __po_hi_gqueue_init(). */
__po_hi_int8_t         __po_hi_gqueues_nb_ports[__PO_HI_NB_TASKS];
/* Per-port size; also compared against the __PO_HI_GQUEUE_FIFO_INDATA
 * and __PO_HI_GQUEUE_FIFO_OUT markers to tell port kinds apart. */
__po_hi_int8_t*        __po_hi_gqueues_sizes[__PO_HI_NB_TASKS];
/* Per-port count of queued requests: incremented by store_in,
 * decremented by next_value. */
__po_hi_uint8_t*       __po_hi_gqueues_used_size[__PO_HI_NB_TASKS];
/* Per-port read index (advanced by next_value, modulo the port size). */
__po_hi_uint8_t*       __po_hi_gqueues_offsets[__PO_HI_NB_TASKS];
/* Per-port write index (advanced by store_in, modulo the port size). */
__po_hi_uint8_t*       __po_hi_gqueues_woffsets[__PO_HI_NB_TASKS];
/* Per-port destination count and destination arrays, exposed through
 * the get_destinations_number / get_destination accessors. */
__po_hi_uint8_t*       __po_hi_gqueues_n_destinations[__PO_HI_NB_TASKS];
__po_hi_port_t**       __po_hi_gqueues_destinations[__PO_HI_NB_TASKS];
/* Modulus applied to the two global history indices below. */
__po_hi_uint16_t       __po_hi_gqueues_total_fifo_size[__PO_HI_NB_TASKS];
/* Per-port most recent request (written by store_out, and by store_in
 * for __PO_HI_GQUEUE_FIFO_INDATA ports). */
__po_hi_request_t*     __po_hi_gqueues_most_recent_values[__PO_HI_NB_TASKS];
/* Per-port index of the port's first slot, computed in init as the
 * running sum of the sizes of the preceding queued ports. */
__po_hi_uint8_t*       __po_hi_gqueues_first[__PO_HI_NB_TASKS];

/* NOTE(review): __po_hi_gqueues_global_size is not referenced by any
 * function visible in this part of the file - confirm it is still used. */
__po_hi_uint8_t        __po_hi_gqueues_global_size[__PO_HI_NB_TASKS];
/* Ring of local port ids in arrival order: store_in writes at woffset,
 * wait_for_incoming_event reads at offset, next_value advances offset. */
__po_hi_local_port_t*  __po_hi_gqueues_global_history[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_global_history_offset[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_global_history_woffset[__PO_HI_NB_TASKS];

/* Per-port "nothing queued" flag (1 = empty). */
__po_hi_uint8_t*       __po_hi_gqueues_port_is_empty[__PO_HI_NB_TASKS];
/* Per-task flag, set to 1 when n_empty equals nb_ports and cleared by
 * store_in when a request is queued. */
__po_hi_uint8_t        __po_hi_gqueues_queue_is_empty[__PO_HI_NB_TASKS];
/* Number of ports currently counted as empty; starts at nb_ports. */
__po_hi_uint8_t        __po_hi_gqueues_n_empty[__PO_HI_NB_TASKS];

/* One mutex / condition variable pair per task, with the attribute
 * objects used to initialise them. */
pthread_mutex_t        __po_hi_gqueues_mutexes[__PO_HI_NB_TASKS];
pthread_cond_t         __po_hi_gqueues_conds[__PO_HI_NB_TASKS];
pthread_mutexattr_t    __po_hi_gqueues_mutexes_attr[__PO_HI_NB_TASKS];
pthread_condattr_t     __po_hi_gqueues_conds_attr[__PO_HI_NB_TASKS];

59
60
61
62
63
64
65
66
67
68
69
70
71
72
/*
 * Initialise the global queue of task `id`.
 *
 * All array arguments are stored by reference in the per-task tables
 * (nothing is copied), then the per-port bookkeeping is reset:
 *  - used_size is cleared for every port;
 *  - for every port whose size is neither __PO_HI_GQUEUE_FIFO_INDATA
 *    nor __PO_HI_GQUEUE_FIFO_OUT, `first` receives the running sum of
 *    the sizes of the preceding such ports, the read and write indices
 *    are cleared and the port is flagged empty;
 *  - the most recent value of every port is tagged
 *    __PO_HI_GQUEUE_INVALID_PORT.
 * The task's mutex and condition variable are created as well.
 */
void __po_hi_gqueue_init (__po_hi_task_id       id,
                          __po_hi_uint8_t       nb_ports,
                          __po_hi_port_t        queue[],
                          __po_hi_int8_t        sizes[],
                          __po_hi_uint8_t       first[],
                          __po_hi_uint8_t       offsets[],
                          __po_hi_uint8_t       woffsets[],
                          __po_hi_uint8_t       n_dest[],
                          __po_hi_port_t*       destinations[],
                          __po_hi_uint8_t       used_size[],
                          __po_hi_local_port_t  history[],
                          __po_hi_request_t     recent[],
                          __po_hi_uint8_t       empties[],
                          __po_hi_uint16_t      total_fifo_size)
{
   __po_hi_uint8_t  port_idx;
   __po_hi_uint16_t next_first;

   /* Caller-provided storage, kept by reference */
   __po_hi_gqueues[id]                    = queue;
   __po_hi_gqueues_sizes[id]              = sizes;
   __po_hi_gqueues_first[id]              = first;
   __po_hi_gqueues_offsets[id]            = offsets;
   __po_hi_gqueues_woffsets[id]           = woffsets;
   __po_hi_gqueues_n_destinations[id]     = n_dest;
   __po_hi_gqueues_destinations[id]       = destinations;
   __po_hi_gqueues_used_size[id]          = used_size;
   __po_hi_gqueues_global_history[id]     = history;
   __po_hi_gqueues_most_recent_values[id] = recent;
   __po_hi_gqueues_port_is_empty[id]      = empties;

   /* Scalar state: nothing received yet, every port counted as empty */
   __po_hi_gqueues_nb_ports[id]               = nb_ports;
   __po_hi_gqueues_n_empty[id]                = nb_ports;
   __po_hi_gqueues_queue_is_empty[id]         = 1;
   __po_hi_gqueues_total_fifo_size[id]        = total_fifo_size;
   __po_hi_gqueues_global_history_offset[id]  = 0;
   __po_hi_gqueues_global_history_woffset[id] = 0;

   /* Synchronisation objects of the task */
   pthread_mutexattr_init (&__po_hi_gqueues_mutexes_attr[id]);
   pthread_condattr_init (&__po_hi_gqueues_conds_attr[id]);
#ifdef POSIX
   pthread_mutexattr_setpshared(&__po_hi_gqueues_mutexes_attr[id],PTHREAD_PROCESS_SHARED); 
#endif
   pthread_mutex_init (&__po_hi_gqueues_mutexes[id], &__po_hi_gqueues_mutexes_attr[id]);
   pthread_cond_init (&__po_hi_gqueues_conds[id], &__po_hi_gqueues_conds_attr[id]);

   /*
    * Per-port reset. The parameters alias the tables stored above, so
    * they are used directly here.
    */
   next_first = 0;
   port_idx   = 0;
   while (port_idx < nb_ports)
   {
      used_size[port_idx] = 0;

      if ( ! ( (sizes[port_idx] == __PO_HI_GQUEUE_FIFO_INDATA)
               || (sizes[port_idx] == __PO_HI_GQUEUE_FIFO_OUT) ) )
      {
         /* Queued port: reserve its slots after the previous ones */
         first[port_idx]    = next_first;
         next_first        += sizes[port_idx];
         offsets[port_idx]  = 0;
         woffsets[port_idx] = 0;
         empties[port_idx]  = 1;
      }

      /* No valid "most recent value" has been stored yet */
      recent[port_idx].port = __PO_HI_GQUEUE_INVALID_PORT;

      port_idx++;
   }

#ifdef __PO_HI_DEBUG
   __DEBUGMSG("Initialize global queue for task-id %d ... ", id);
   for (port_idx = 0; port_idx < nb_ports; port_idx++)
   {
      __DEBUGMSG("port %d (used_size=%d,first=%d) ", 
            port_idx, 
            __po_hi_gqueues_used_size[id][port_idx],
            __po_hi_gqueues_first[id][port_idx]);
   }
   __DEBUGMSG(" ... done\n");
#endif 
}


void __po_hi_gqueue_store_out (__po_hi_task_id id, 
145
146
                               __po_hi_local_port_t port, 
                               __po_hi_request_t* request)
147
{
148
   __po_hi_request_t* ptr;
149

150
151
152
   request->port = __PO_HI_GQUEUE_OUT_PORT;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
   memcpy (ptr, request, sizeof (*request));
153
154
}

155
156


157
__po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id, 
158
159
                                         __po_hi_local_port_t port, 
                                         __po_hi_request_t* request)
160
{
161
162
   __po_hi_request_t* ptr;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
163
#ifdef __PO_HI_DEBUG
164
165
   if (ptr == NULL)
   {
166
      __DEBUGMSG ("__po_hi_gqueue_store_in : NULL POINTER\n");
167
   }
168
169
#endif

170
171
172
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
173
      memcpy(ptr,request,sizeof(*request));
174
175
176
   }
   else
   {
177
178
179
180
#ifdef __PO_HI_DEBUG
      __DEBUGMSG ("Received  message for task %d, port %d\n", id, port);
#endif
      if (__po_hi_gqueues_used_size[id][port] == __po_hi_gqueues_sizes[id][port])
181
182
183
184
185
      {
         pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
         __DEBUGMSG ("QUEUE FULL");
         return __PO_HI_ERROR_QUEUE_FULL;
      }
186
187
188
189
190
191
192
193
194
195

      memcpy ((void *)&__po_hi_gqueues[id][port] + ( (__po_hi_gqueues_woffsets[id][port] + __po_hi_gqueues_first[id][port])  * sizeof (*request) ) , request, sizeof (*request));
      __po_hi_gqueues_woffsets[id][port] =  (__po_hi_gqueues_woffsets[id][port] + 1 ) % __po_hi_gqueues_sizes[id][port];

      __po_hi_gqueues_used_size[id][port]++;

      __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_woffset[id]] = port;
      __po_hi_gqueues_global_history_woffset[id] = (__po_hi_gqueues_global_history_woffset[id] + 1 ) % __po_hi_gqueues_total_fifo_size[id];

      if (__po_hi_gqueues_port_is_empty[id][port] == 1)
196
197
198
199
      {
         __po_hi_gqueues_port_is_empty[id][port] = 0;
         __po_hi_gqueues_n_empty[id]--;
      }
200
      __po_hi_gqueues_queue_is_empty[id] = 0;
201
   }
202

203
204
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   pthread_cond_broadcast (&__po_hi_gqueues_conds[id]);
205

206
   return __PO_HI_SUCCESS;
207
208
}

209
210
/*
 * Block until the queue of task `id` is no longer flagged empty, then
 * store in `*port` the port id found at the current read position of
 * the task's global history. The history position is not advanced here.
 */
void __po_hi_gqueue_wait_for_incoming_event (__po_hi_task_id id, 
                                             __po_hi_local_port_t* port)
{
   pthread_mutex_t* const task_lock = &__po_hi_gqueues_mutexes[id];
   pthread_cond_t* const  task_cond = &__po_hi_gqueues_conds[id];

   pthread_mutex_lock (task_lock);

   /* Re-check the flag after every wake-up */
   while (__po_hi_gqueues_queue_is_empty[id] == 1)
   {
      pthread_cond_wait (task_cond, task_lock);
   }

   *port = __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_offset[id]];

   pthread_mutex_unlock (task_lock);
}

/*
 * Number of values available on port `port` of task `id`: always 1 for
 * a __PO_HI_GQUEUE_FIFO_INDATA port, otherwise the port's current
 * used size. The tables are read without taking the task mutex.
 */
int __po_hi_gqueue_get_count( __po_hi_task_id id, __po_hi_local_port_t port)
{
   /* data port are always of size 1 */
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
      return 1;
   }

   return (__po_hi_gqueues_used_size[id][port]);
}

int __po_hi_gqueue_get_value( __po_hi_task_id id, 
235
236
      __po_hi_local_port_t port, 
      __po_hi_request_t* request)
237
{
238
239
240
241
242
243
244
245
246
247
   __po_hi_request_t* ptr;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);

   /*
    * If the port is an event port, with no value queued, then we block
    * the thread.
    */
   if (__po_hi_gqueues_sizes[id][port] != __PO_HI_GQUEUE_FIFO_INDATA)
   {
248
      while (__po_hi_gqueues_port_is_empty[id][port] == 1)
249
250
251
252
253
254
255
256
      {
         pthread_cond_wait (&__po_hi_gqueues_conds[id],
               &__po_hi_gqueues_mutexes[id]);
      }
   }

   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
257
      memcpy (request, ptr, sizeof (__po_hi_request_t));
258
259
260
   }
   else
   {
261
      memcpy (request, 
262
263
264
265
266
267
            (void *)&__po_hi_gqueues[id][port] 
            + ( __po_hi_gqueues_first[id][port] 
               + __po_hi_gqueues_offsets[id][port] ) 
            * sizeof (__po_hi_request_t), 
            sizeof (__po_hi_request_t));
   }
268
269

#ifdef __PO_HI_DEBUG
270
   __DEBUGMSG ("Task %d get a value on port %d\n", id, port);
271
272
#endif

273
274
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   return 0;
275
276
}

277
/*
 * Discard the value at the read position of port `port` of task `id`.
 *
 * For a __PO_HI_GQUEUE_FIFO_INDATA port nothing is done and 1 is
 * returned. Otherwise the port's read index is advanced, its used size
 * is decremented, the empty flags are updated and the task's global
 * history read index is advanced; __PO_HI_SUCCESS is returned.
 *
 * If the port holds no queued value the call leaves all state
 * untouched: decrementing the unsigned used size from 0 would wrap it
 * and desynchronise the read indices. __PO_HI_SUCCESS is still
 * returned in that case so that existing callers see no change in the
 * return code.
 */
int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
{
   /* incomplete semantics, should discriminate and report whether
      there is a next value or not */

   /* XXX change and use assert ? */
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
      return 1;
   }

   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);

   /* Nothing queued: do not touch the counters (see header comment) */
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
      pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
      return __PO_HI_SUCCESS;
   }

   __po_hi_gqueues_offsets[id][port] = 
      (__po_hi_gqueues_offsets[id][port] + 1) 
      % __po_hi_gqueues_sizes[id][port];

   __po_hi_gqueues_used_size[id][port]--;

   /* The port just became empty */
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
      __po_hi_gqueues_n_empty[id]++;
      __po_hi_gqueues_port_is_empty[id][port] = 1;
   }

   /* Every port is counted as empty: flag the whole queue empty */
   if (__po_hi_gqueues_n_empty[id] == __po_hi_gqueues_nb_ports[id])
   {
      __po_hi_gqueues_queue_is_empty[id] = 1;
   }

   __po_hi_gqueues_global_history_offset[id] = 
      (__po_hi_gqueues_global_history_offset[id] + 1) 
      % __po_hi_gqueues_total_fifo_size[id];

   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   return __PO_HI_SUCCESS;
}
313

314
__po_hi_request_t*  __po_hi_gqueue_get_most_recent_value (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
315
316
317
318
319
320
321
322
323
324
325
326
327
328
{
   return (&__po_hi_gqueues_most_recent_values[task_id][local_port]);
}

uint8_t __po_hi_gqueue_get_destinations_number (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
{
      return (__po_hi_gqueues_n_destinations[task_id][local_port]);
}

/*
 * Return entry `destination_number` of the destination array recorded
 * for port `local_port` of task `task_id`. The index is not
 * range-checked.
 */
__po_hi_port_t __po_hi_gqueue_get_destination (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port, const uint8_t destination_number)
{
   const __po_hi_port_t* const port_destinations = __po_hi_gqueues_destinations[task_id][local_port];

   return port_destinations[destination_number];
}