po_hi_gqueue.c 11 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more informations, please visit http://ocarina.enst.fr
 *
 * Copyright (C) 2007-2009, GET-Telecom Paris.
 */

#include <po_hi_config.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_gqueue.h>
/* Headers from PolyORB-HI-C */

#include <deployment.h>
#include <activity.h>
#include <request.h>
/* Headers from the generated code */

#include <string.h>
#include <pthread.h>
/* System-dependent headers */

#define __PO_HI_GQUEUE_OUT_PORT constant_out_identifier 
/* give a default value to the out port */

__po_hi_port_t*        __po_hi_gqueues[__PO_HI_NB_TASKS];
__po_hi_int8_t         __po_hi_gqueues_nb_ports[__PO_HI_NB_TASKS];
__po_hi_int8_t*        __po_hi_gqueues_sizes[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_used_size[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_offsets[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_woffsets[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_n_destinations[__PO_HI_NB_TASKS];
__po_hi_port_t**       __po_hi_gqueues_destinations[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_total_fifo_size[__PO_HI_NB_TASKS];
__po_hi_request_t*     __po_hi_gqueues_most_recent_values[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_first[__PO_HI_NB_TASKS];

__po_hi_uint8_t        __po_hi_gqueues_global_size[__PO_HI_NB_TASKS];
__po_hi_local_port_t*  __po_hi_gqueues_global_history[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_global_history_offset[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_global_history_woffset[__PO_HI_NB_TASKS];

__po_hi_uint8_t*       __po_hi_gqueues_port_is_empty[__PO_HI_NB_TASKS];
__po_hi_uint8_t        __po_hi_gqueues_queue_is_empty[__PO_HI_NB_TASKS];
__po_hi_uint8_t        __po_hi_gqueues_n_empty[__PO_HI_NB_TASKS];

pthread_mutex_t        __po_hi_gqueues_mutexes[__PO_HI_NB_TASKS];
pthread_cond_t         __po_hi_gqueues_conds[__PO_HI_NB_TASKS];
pthread_mutexattr_t    __po_hi_gqueues_mutexes_attr[__PO_HI_NB_TASKS];
pthread_condattr_t     __po_hi_gqueues_conds_attr[__PO_HI_NB_TASKS];

58
59
60
61
62
63
64
65
66
67
68
69
70
71
void __po_hi_gqueue_init (__po_hi_task_id       id,
                          __po_hi_uint8_t       nb_ports,
                          __po_hi_port_t        queue[],
                          __po_hi_int8_t        sizes[],
                          __po_hi_uint8_t       first[],
                          __po_hi_uint8_t       offsets[],
                          __po_hi_uint8_t       woffsets[],
                          __po_hi_uint8_t       n_dest[],
                          __po_hi_port_t*       destinations[],
                          __po_hi_uint8_t       used_size[],
                          __po_hi_local_port_t  history[],
                          __po_hi_request_t     recent[],
                          __po_hi_uint8_t       empties[],
                          __po_hi_uint16_t      total_fifo_size)
72
{
73
74
75
   __po_hi_uint8_t tmp;
   __po_hi_uint16_t off;
   __po_hi_request_t* request;
76

77
78
   __po_hi_gqueues_global_history_woffset[id] = 0;
   __po_hi_gqueues_global_history_offset[id] = 0;
79

80
81
82
83
84
   __po_hi_gqueues_n_empty[id] = nb_ports;
   __po_hi_gqueues[id] = queue;
   __po_hi_gqueues_most_recent_values[id] = recent;
   __po_hi_gqueues_global_history[id] = history;
   __po_hi_gqueues_woffsets[id] = woffsets;
85

86
   __po_hi_gqueues_port_is_empty[id] = empties;
87

88
89
90
91
   __po_hi_gqueues_nb_ports[id] = nb_ports; 
   __po_hi_gqueues_sizes[id] = sizes;
   __po_hi_gqueues_first[id] = first;
   __po_hi_gqueues_used_size[id] = used_size;
92

93
94
95
96
   __po_hi_gqueues_offsets[id] = offsets;
   __po_hi_gqueues_n_destinations[id] = n_dest;
   __po_hi_gqueues_destinations[id] = destinations;
   __po_hi_gqueues_total_fifo_size[id] = total_fifo_size;
97

98
   __po_hi_gqueues_queue_is_empty[id] = 1;
99

100
101
   pthread_mutexattr_init (&__po_hi_gqueues_mutexes_attr[id]);
   pthread_condattr_init (&__po_hi_gqueues_conds_attr[id]);
102
#ifdef POSIX
103
   pthread_mutexattr_setpshared(&__po_hi_gqueues_mutexes_attr[id],PTHREAD_PROCESS_SHARED); 
104
#endif
105
106
   pthread_mutex_init (&__po_hi_gqueues_mutexes[id], &__po_hi_gqueues_mutexes_attr[id]);
   pthread_cond_init (&__po_hi_gqueues_conds[id], &__po_hi_gqueues_conds_attr[id]);
107

108
   off = 0;
109

110
111
   for (tmp=0;tmp<nb_ports;tmp++)
   {
112
      __po_hi_gqueues_used_size[id][tmp] = 0;
113

114
      if ( (sizes[tmp] != __PO_HI_GQUEUE_FIFO_INDATA) 
115
116
117
118
119
120
121
122
            && (sizes[tmp] != __PO_HI_GQUEUE_FIFO_OUT))
      {
         __po_hi_gqueues_first[id][tmp]=off;
         off += __po_hi_gqueues_sizes[id][tmp];
         __po_hi_gqueues_offsets[id][tmp] = 0;
         __po_hi_gqueues_woffsets[id][tmp] = 0;
         __po_hi_gqueues_port_is_empty[id][tmp] = 1;
      }
123
124
125
126

      /* Set invalid all recent values */
      request = (__po_hi_request_t*)&__po_hi_gqueues_most_recent_values[id][tmp];
      request->port = __PO_HI_GQUEUE_INVALID_PORT;
127
   }
128
129

#ifdef __PO_HI_DEBUG
130
131
132
   __DEBUGMSG("Initialize global queue for task-id %d ... ", id);
   for (tmp=0;tmp<nb_ports;tmp++)
   {
133
      __DEBUGMSG("port %d (used_size=%d,first=%d) ", 
134
135
136
137
138
            tmp, 
            __po_hi_gqueues_used_size[id][tmp],
            __po_hi_gqueues_first[id][tmp]);
   }
   __DEBUGMSG(" ... done\n");
139
140
141
142
143
#endif 
}


void __po_hi_gqueue_store_out (__po_hi_task_id id, 
144
145
                               __po_hi_local_port_t port, 
                               __po_hi_request_t* request)
146
{
147
   __po_hi_request_t* ptr;
148

149
150
151
   request->port = __PO_HI_GQUEUE_OUT_PORT;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
   memcpy (ptr, request, sizeof (*request));
152
153
}

154
155


156
__po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id, 
157
158
                                         __po_hi_local_port_t port, 
                                         __po_hi_request_t* request)
159
{
160
161
   __po_hi_request_t* ptr;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
162
#ifdef __PO_HI_DEBUG
163
164
   if (ptr == NULL)
   {
165
      __DEBUGMSG ("__po_hi_gqueue_store_in : NULL POINTER\n");
166
   }
167
168
#endif

169
170
171
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
172
      memcpy(ptr,request,sizeof(*request));
173
174
175
   }
   else
   {
176
177
178
179
#ifdef __PO_HI_DEBUG
      __DEBUGMSG ("Received  message for task %d, port %d\n", id, port);
#endif
      if (__po_hi_gqueues_used_size[id][port] == __po_hi_gqueues_sizes[id][port])
180
181
182
183
184
      {
         pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
         __DEBUGMSG ("QUEUE FULL");
         return __PO_HI_ERROR_QUEUE_FULL;
      }
185
186
187
188
189
190
191
192
193
194

      memcpy ((void *)&__po_hi_gqueues[id][port] + ( (__po_hi_gqueues_woffsets[id][port] + __po_hi_gqueues_first[id][port])  * sizeof (*request) ) , request, sizeof (*request));
      __po_hi_gqueues_woffsets[id][port] =  (__po_hi_gqueues_woffsets[id][port] + 1 ) % __po_hi_gqueues_sizes[id][port];

      __po_hi_gqueues_used_size[id][port]++;

      __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_woffset[id]] = port;
      __po_hi_gqueues_global_history_woffset[id] = (__po_hi_gqueues_global_history_woffset[id] + 1 ) % __po_hi_gqueues_total_fifo_size[id];

      if (__po_hi_gqueues_port_is_empty[id][port] == 1)
195
196
197
198
      {
         __po_hi_gqueues_port_is_empty[id][port] = 0;
         __po_hi_gqueues_n_empty[id]--;
      }
199
      __po_hi_gqueues_queue_is_empty[id] = 0;
200
   }
201

202
203
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   pthread_cond_broadcast (&__po_hi_gqueues_conds[id]);
204

205
   return __PO_HI_SUCCESS;
206
207
}

208
209
void __po_hi_gqueue_wait_for_incoming_event (__po_hi_task_id id, 
                                             __po_hi_local_port_t* port)
210
{
211
212
213
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
   while(__po_hi_gqueues_queue_is_empty[id] == 1)
   {
214
      pthread_cond_wait (&__po_hi_gqueues_conds[id],
215
216
217
218
            &__po_hi_gqueues_mutexes[id]);
   }
   *port = __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_offset[id]];
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
219
220
221
222
}

int __po_hi_gqueue_get_count( __po_hi_task_id id, __po_hi_local_port_t port)
{
223
224
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
225
      return 1; /* data port are always of size 1 */
226
227
228
   }
   else
   {
229
      return (__po_hi_gqueues_used_size[id][port]);
230
   }
231
232
233
}

int __po_hi_gqueue_get_value( __po_hi_task_id id, 
234
235
      __po_hi_local_port_t port, 
      __po_hi_request_t* request)
236
{
237
238
239
240
241
242
243
244
245
246
   __po_hi_request_t* ptr;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);

   /*
    * If the port is an event port, with no value queued, then we block
    * the thread.
    */
   if (__po_hi_gqueues_sizes[id][port] != __PO_HI_GQUEUE_FIFO_INDATA)
   {
247
      while (__po_hi_gqueues_port_is_empty[id][port] == 1)
248
249
250
251
252
253
254
255
      {
         pthread_cond_wait (&__po_hi_gqueues_conds[id],
               &__po_hi_gqueues_mutexes[id]);
      }
   }

   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
256
      memcpy (request, ptr, sizeof (__po_hi_request_t));
257
258
259
   }
   else
   {
260
      memcpy (request, 
261
262
263
264
265
266
            (void *)&__po_hi_gqueues[id][port] 
            + ( __po_hi_gqueues_first[id][port] 
               + __po_hi_gqueues_offsets[id][port] ) 
            * sizeof (__po_hi_request_t), 
            sizeof (__po_hi_request_t));
   }
267
268

#ifdef __PO_HI_DEBUG
269
   __DEBUGMSG ("Task %d get a value on port %d\n", id, port);
270
271
#endif

272
273
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   return 0;
274
275
}

276
int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
277
{
278
279
   /* incomplete semantics, should discriminate and report whether
      there is a next value or not */
280

281
282
283
   /* XXX change and use assert ? */
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
284
      return 1;
285
   }
286

287
288
289
290
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
   __po_hi_gqueues_offsets[id][port] = 
      (__po_hi_gqueues_offsets[id][port] + 1) 
      % __po_hi_gqueues_sizes[id][port];
291

292
   __po_hi_gqueues_used_size[id][port]--;
293

294
295
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
296
297
      __po_hi_gqueues_n_empty[id]++;
      __po_hi_gqueues_port_is_empty[id][port] = 1;
298
299
300
301
   }

   if (__po_hi_gqueues_n_empty[id] == __po_hi_gqueues_nb_ports[id])
   {
302
      __po_hi_gqueues_queue_is_empty[id] = 1;
303
   }
304

305
306
307
   __po_hi_gqueues_global_history_offset[id] = 
      (__po_hi_gqueues_global_history_offset[id] + 1) 
      % __po_hi_gqueues_total_fifo_size[id];
308

309
310
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   return __PO_HI_SUCCESS;
311
}
312

313
__po_hi_request_t*  __po_hi_gqueue_get_most_recent_value (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
314
315
316
317
318
319
320
321
322
323
324
325
326
327
{
   return (&__po_hi_gqueues_most_recent_values[task_id][local_port]);
}

uint8_t __po_hi_gqueue_get_destinations_number (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port)
{
      return (__po_hi_gqueues_n_destinations[task_id][local_port]);
}

__po_hi_port_t __po_hi_gqueue_get_destination (const __po_hi_task_id task_id, const __po_hi_local_port_t local_port, const uint8_t destination_number)
{
      return (__po_hi_gqueues_destinations[task_id][local_port][destination_number]);
}