po_hi_gqueue.c 10.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more informations, please visit http://ocarina.enst.fr
 *
 * Copyright (C) 2007-2009, GET-Telecom Paris.
 */

#include <po_hi_config.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_gqueue.h>
/* Headers from PolyORB-HI-C */

#include <deployment.h>
#include <activity.h>
#include <request.h>
/* Headers from the generated code */

#include <string.h>
#include <pthread.h>
/* System-dependent headers */

#define __PO_HI_GQUEUE_OUT_PORT constant_out_identifier 
/* give a default value to the out port */

extern __po_hi_entity_t       __po_hi_port_global_to_entity[__PO_HI_NB_PORTS];
extern __po_hi_local_port_t   __po_hi_port_global_to_local[__PO_HI_NB_PORTS];

__po_hi_port_t*        __po_hi_gqueues[__PO_HI_NB_TASKS];
__po_hi_int8_t         __po_hi_gqueues_nb_ports[__PO_HI_NB_TASKS];
__po_hi_int8_t*        __po_hi_gqueues_sizes[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_used_size[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_offsets[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_woffsets[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_n_destinations[__PO_HI_NB_TASKS];
__po_hi_port_t**       __po_hi_gqueues_destinations[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_total_fifo_size[__PO_HI_NB_TASKS];
__po_hi_request_t*     __po_hi_gqueues_most_recent_values[__PO_HI_NB_TASKS];
__po_hi_uint8_t*       __po_hi_gqueues_first[__PO_HI_NB_TASKS];

__po_hi_uint8_t        __po_hi_gqueues_global_size[__PO_HI_NB_TASKS];
__po_hi_local_port_t*  __po_hi_gqueues_global_history[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_global_history_offset[__PO_HI_NB_TASKS];
__po_hi_uint16_t       __po_hi_gqueues_global_history_woffset[__PO_HI_NB_TASKS];

__po_hi_uint8_t*       __po_hi_gqueues_port_is_empty[__PO_HI_NB_TASKS];
__po_hi_uint8_t        __po_hi_gqueues_queue_is_empty[__PO_HI_NB_TASKS];
__po_hi_uint8_t        __po_hi_gqueues_n_empty[__PO_HI_NB_TASKS];

pthread_mutex_t        __po_hi_gqueues_mutexes[__PO_HI_NB_TASKS];
pthread_cond_t         __po_hi_gqueues_conds[__PO_HI_NB_TASKS];
pthread_mutexattr_t    __po_hi_gqueues_mutexes_attr[__PO_HI_NB_TASKS];
pthread_condattr_t     __po_hi_gqueues_conds_attr[__PO_HI_NB_TASKS];

61
62
63
64
65
66
67
68
69
70
71
72
73
74
void __po_hi_gqueue_init (__po_hi_task_id       id,
                          __po_hi_uint8_t       nb_ports,
                          __po_hi_port_t        queue[],
                          __po_hi_int8_t        sizes[],
                          __po_hi_uint8_t       first[],
                          __po_hi_uint8_t       offsets[],
                          __po_hi_uint8_t       woffsets[],
                          __po_hi_uint8_t       n_dest[],
                          __po_hi_port_t*       destinations[],
                          __po_hi_uint8_t       used_size[],
                          __po_hi_local_port_t  history[],
                          __po_hi_request_t     recent[],
                          __po_hi_uint8_t       empties[],
                          __po_hi_uint16_t      total_fifo_size)
75
{
76
77
78
   __po_hi_uint8_t tmp;
   __po_hi_uint16_t off;
   __po_hi_request_t* request;
79

80
81
   __po_hi_gqueues_global_history_woffset[id] = 0;
   __po_hi_gqueues_global_history_offset[id] = 0;
82

83
84
85
86
87
   __po_hi_gqueues_n_empty[id] = nb_ports;
   __po_hi_gqueues[id] = queue;
   __po_hi_gqueues_most_recent_values[id] = recent;
   __po_hi_gqueues_global_history[id] = history;
   __po_hi_gqueues_woffsets[id] = woffsets;
88

89
   __po_hi_gqueues_port_is_empty[id] = empties;
90

91
92
93
94
   __po_hi_gqueues_nb_ports[id] = nb_ports; 
   __po_hi_gqueues_sizes[id] = sizes;
   __po_hi_gqueues_first[id] = first;
   __po_hi_gqueues_used_size[id] = used_size;
95

96
97
98
99
   __po_hi_gqueues_offsets[id] = offsets;
   __po_hi_gqueues_n_destinations[id] = n_dest;
   __po_hi_gqueues_destinations[id] = destinations;
   __po_hi_gqueues_total_fifo_size[id] = total_fifo_size;
100

101
   __po_hi_gqueues_queue_is_empty[id] = 1;
102

103
104
   pthread_mutexattr_init (&__po_hi_gqueues_mutexes_attr[id]);
   pthread_condattr_init (&__po_hi_gqueues_conds_attr[id]);
105
#ifdef POSIX
106
   pthread_mutexattr_setpshared(&__po_hi_gqueues_mutexes_attr[id],PTHREAD_PROCESS_SHARED); 
107
#endif
108
109
   pthread_mutex_init (&__po_hi_gqueues_mutexes[id], &__po_hi_gqueues_mutexes_attr[id]);
   pthread_cond_init (&__po_hi_gqueues_conds[id], &__po_hi_gqueues_conds_attr[id]);
110

111
   off = 0;
112

113
114
   for (tmp=0;tmp<nb_ports;tmp++)
   {
115
      __po_hi_gqueues_used_size[id][tmp] = 0;
116

117
      if ( (sizes[tmp] != __PO_HI_GQUEUE_FIFO_INDATA) 
118
119
120
121
122
123
124
125
            && (sizes[tmp] != __PO_HI_GQUEUE_FIFO_OUT))
      {
         __po_hi_gqueues_first[id][tmp]=off;
         off += __po_hi_gqueues_sizes[id][tmp];
         __po_hi_gqueues_offsets[id][tmp] = 0;
         __po_hi_gqueues_woffsets[id][tmp] = 0;
         __po_hi_gqueues_port_is_empty[id][tmp] = 1;
      }
126
127
128
129

      /* Set invalid all recent values */
      request = (__po_hi_request_t*)&__po_hi_gqueues_most_recent_values[id][tmp];
      request->port = __PO_HI_GQUEUE_INVALID_PORT;
130
   }
131
132

#ifdef __PO_HI_DEBUG
133
134
135
   __DEBUGMSG("Initialize global queue for task-id %d ... ", id);
   for (tmp=0;tmp<nb_ports;tmp++)
   {
136
      __DEBUGMSG("port %d (used_size=%d,first=%d) ", 
137
138
139
140
141
            tmp, 
            __po_hi_gqueues_used_size[id][tmp],
            __po_hi_gqueues_first[id][tmp]);
   }
   __DEBUGMSG(" ... done\n");
142
143
144
145
146
#endif 
}


void __po_hi_gqueue_store_out (__po_hi_task_id id, 
147
148
                               __po_hi_local_port_t port, 
                               __po_hi_request_t* request)
149
{
150
   __po_hi_request_t* ptr;
151

152
153
154
   request->port = __PO_HI_GQUEUE_OUT_PORT;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
   memcpy (ptr, request, sizeof (*request));
155
156
}

157
158


159
__po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id, 
160
161
                                         __po_hi_local_port_t port, 
                                         __po_hi_request_t* request)
162
{
163
164
   __po_hi_request_t* ptr;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
165
#ifdef __PO_HI_DEBUG
166
167
   if (ptr == NULL)
   {
168
      __DEBUGMSG ("__po_hi_gqueue_store_in : NULL POINTER\n");
169
   }
170
171
#endif

172
173
174
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
175
      memcpy(ptr,request,sizeof(*request));
176
177
178
   }
   else
   {
179
180
181
182
#ifdef __PO_HI_DEBUG
      __DEBUGMSG ("Received  message for task %d, port %d\n", id, port);
#endif
      if (__po_hi_gqueues_used_size[id][port] == __po_hi_gqueues_sizes[id][port])
183
184
185
186
187
      {
         pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
         __DEBUGMSG ("QUEUE FULL");
         return __PO_HI_ERROR_QUEUE_FULL;
      }
188
189
190
191
192
193
194
195
196
197

      memcpy ((void *)&__po_hi_gqueues[id][port] + ( (__po_hi_gqueues_woffsets[id][port] + __po_hi_gqueues_first[id][port])  * sizeof (*request) ) , request, sizeof (*request));
      __po_hi_gqueues_woffsets[id][port] =  (__po_hi_gqueues_woffsets[id][port] + 1 ) % __po_hi_gqueues_sizes[id][port];

      __po_hi_gqueues_used_size[id][port]++;

      __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_woffset[id]] = port;
      __po_hi_gqueues_global_history_woffset[id] = (__po_hi_gqueues_global_history_woffset[id] + 1 ) % __po_hi_gqueues_total_fifo_size[id];

      if (__po_hi_gqueues_port_is_empty[id][port] == 1)
198
199
200
201
      {
         __po_hi_gqueues_port_is_empty[id][port] = 0;
         __po_hi_gqueues_n_empty[id]--;
      }
202
      __po_hi_gqueues_queue_is_empty[id] = 0;
203
   }
204

205
206
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   pthread_cond_broadcast (&__po_hi_gqueues_conds[id]);
207

208
   return __PO_HI_SUCCESS;
209
210
}

211
212
void __po_hi_gqueue_wait_for_incoming_event (__po_hi_task_id id, 
                                             __po_hi_local_port_t* port)
213
{
214
215
216
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
   while(__po_hi_gqueues_queue_is_empty[id] == 1)
   {
217
      pthread_cond_wait (&__po_hi_gqueues_conds[id],
218
219
220
221
            &__po_hi_gqueues_mutexes[id]);
   }
   *port = __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_offset[id]];
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
222
223
224
225
}

int __po_hi_gqueue_get_count( __po_hi_task_id id, __po_hi_local_port_t port)
{
226
227
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
228
      return 1; /* data port are always of size 1 */
229
230
231
   }
   else
   {
232
      return (__po_hi_gqueues_used_size[id][port]);
233
   }
234
235
236
}

int __po_hi_gqueue_get_value( __po_hi_task_id id, 
237
238
      __po_hi_local_port_t port, 
      __po_hi_request_t* request)
239
{
240
241
242
243
244
245
246
247
248
249
   __po_hi_request_t* ptr;
   ptr = &__po_hi_gqueues_most_recent_values[id][port];
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);

   /*
    * If the port is an event port, with no value queued, then we block
    * the thread.
    */
   if (__po_hi_gqueues_sizes[id][port] != __PO_HI_GQUEUE_FIFO_INDATA)
   {
250
      while (__po_hi_gqueues_port_is_empty[id][port] == 1)
251
252
253
254
255
256
257
258
      {
         pthread_cond_wait (&__po_hi_gqueues_conds[id],
               &__po_hi_gqueues_mutexes[id]);
      }
   }

   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
259
      memcpy (request, ptr, sizeof (__po_hi_request_t));
260
261
262
   }
   else
   {
263
      memcpy (request, 
264
265
266
267
268
269
            (void *)&__po_hi_gqueues[id][port] 
            + ( __po_hi_gqueues_first[id][port] 
               + __po_hi_gqueues_offsets[id][port] ) 
            * sizeof (__po_hi_request_t), 
            sizeof (__po_hi_request_t));
   }
270
271

#ifdef __PO_HI_DEBUG
272
   __DEBUGMSG ("Task %d get a value on port %d\n", id, port);
273
274
#endif

275
276
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   return 0;
277
278
}

279
int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
280
{
281
282
   /* incomplete semantics, should discriminate and report whether
      there is a next value or not */
283

284
285
286
   /* XXX change and use assert ? */
   if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
   {
287
      return 1;
288
   }
289

290
291
292
293
   pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
   __po_hi_gqueues_offsets[id][port] = 
      (__po_hi_gqueues_offsets[id][port] + 1) 
      % __po_hi_gqueues_sizes[id][port];
294

295
   __po_hi_gqueues_used_size[id][port]--;
296

297
298
   if (__po_hi_gqueues_used_size[id][port] == 0)
   {
299
300
      __po_hi_gqueues_n_empty[id]++;
      __po_hi_gqueues_port_is_empty[id][port] = 1;
301
302
303
304
   }

   if (__po_hi_gqueues_n_empty[id] == __po_hi_gqueues_nb_ports[id])
   {
305
      __po_hi_gqueues_queue_is_empty[id] = 1;
306
   }
307

308
309
310
   __po_hi_gqueues_global_history_offset[id] = 
      (__po_hi_gqueues_global_history_offset[id] + 1) 
      % __po_hi_gqueues_total_fifo_size[id];
311

312
313
   pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
   return __PO_HI_SUCCESS;
314
}