po_hi_driver_sockets.c 15.5 KB
Newer Older
1
2
3
4
5
6
7
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more information, please visit http://ocarina.enst.fr
 *
 * Copyright (C) 2010, European Space Agency
 * Copyright (C) 2007-2008, GET-Telecom Paris.
 */

12
#include <deployment.h>
julien.delange's avatar
julien.delange committed
13
#include <marshallers.h>
14

julien.delange's avatar
julien.delange committed
15
16
#if (defined (__PO_HI_NEED_DRIVER_SOCKETS) || \
     defined (__PO_HI_NEED_DRIVER_RTEMS_NE2000_SOCKETS))
17

18
19
20
21
22
23
24
25
26
#include <po_hi_config.h>
#include <po_hi_task.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_main.h>
#include <po_hi_task.h>
27
#include <drivers/po_hi_driver_sockets.h>
28
#include <drivers/po_hi_driver_sockets_common.h>
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

#include <activity.h>

#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>

/*
 * This file (po_hi_driver_sockets.c) provides functions to handle
 * communication between nodes in PolyORB-HI-C.  We don't use a
 * protocol to send data. For each piece of data sent, we first send
 * the entity number provided by the generated file deployment.h,
 * then we send the message.  Each entity has a fixed size
 * (sizeof(__po_hi_entity_t)), and each message has a fixed maximum
 * size (see the __PO_HI_MESSAGES_MAX_SIZE macro).
 */

/* The following declarations avoid conflicts
 * with current generated code.
 */

/* Fall back to a single node when no value was defined before this point. */
#ifndef __PO_HI_NB_NODES
#define __PO_HI_NB_NODES 1
#endif

/*
 * We have two arrays of sockets. The first array (nodes) is used to
 * send data to other nodes. A special socket is nodes[mynode]: this
 * socket is used to listen to other processes.  The second array
 * (rnodes) is used to store all the sockets that are created by the
 * listening socket. This array is used only by the receiver task.
 */

julien.delange's avatar
julien.delange committed
69
/*
 * Socket tables of the driver: nodes[] holds the sockets used to send
 * data (plus the listening socket of the local node), rnodes[] holds
 * the sockets obtained from accept().
 *
 * NOTE(review): __po_hi_sockets_poller indexes rnodes[] up to
 * __PO_HI_NB_DEVICES. When the tables are sized with __PO_HI_NB_NODES,
 * confirm that this value is at least __PO_HI_NB_DEVICES.
 */
#ifdef __PO_HI_NEED_DRIVER_SOCKETS
__po_hi_inetnode_t nodes[__PO_HI_NB_NODES];
__po_hi_inetnode_t rnodes[__PO_HI_NB_NODES];
#else
__po_hi_inetnode_t nodes[__PO_HI_NB_DEVICES];
__po_hi_inetnode_t rnodes[__PO_HI_NB_DEVICES];
#endif

77

julien.delange's avatar
julien.delange committed
78
/*
 * Send the message msg to the node that hosts the entity "to".
 *
 * The complete message buffer (__PO_HI_MESSAGES_MAX_SIZE bytes taken
 * from msg->content) is written on the socket stored in nodes[] for
 * the destination node. The parameter "from" is not used by this
 * driver.
 *
 * Returns __PO_HI_SUCCESS when the whole buffer was written and
 * __PO_HI_ERROR_TRANSPORT_SEND otherwise. For every failure detected
 * after the initial "socket is active" check, the socket is closed
 * and marked as inactive (-1).
 */
int __po_hi_driver_sockets_send (__po_hi_entity_t from,
                                 __po_hi_entity_t to,
                                 __po_hi_msg_t* msg)
{
   __po_hi_node_t  node;
   int             len;
   int             size_to_write;
   int             optval = 0;
   /*
    * getsockopt() takes the size of the option buffer as input. With
    * a size of 0 nothing is stored in optval and a pending socket
    * error would never be seen.
    */
   socklen_t       optlen = sizeof (optval);

   (void) from;

   node = __po_hi_transport_get_node_from_entity (to);

   if (nodes[node].socket == -1 )
   {
#ifdef __PO_HI_DEBUG
      __DEBUGMSG (" [... failure ...]\n");
#endif
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

   /*
    * The message is always sent with its maximum size, so that the
    * receiver can read a fixed amount of bytes.
    */
   size_to_write = __PO_HI_MESSAGES_MAX_SIZE;

   /* Check that no error is pending on the socket before writing */
   if (getsockopt (nodes[node].socket, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1)
   {
      __DEBUGMSG (" [error getsockopt() in file %s, line%d ]\n", __FILE__, __LINE__);
      close (nodes[node].socket);
      nodes[node].socket = -1;
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

   if (optval != 0)
   {
      __DEBUGMSG (" [error getsockopt() return code in file %s, line%d ]\n", __FILE__, __LINE__);
      close (nodes[node].socket);
      nodes[node].socket = -1;
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

   /* Ignore SIGPIPE to be able to recover from errors instead of crashing the node */
   if (signal (SIGPIPE, SIG_IGN) == SIG_ERR)
   {
      __DEBUGMSG (" [error signal() return code in file %s, line%d ]\n", __FILE__, __LINE__);
      close (nodes[node].socket);
      nodes[node].socket = -1;
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

#ifdef __PO_HI_DEBUG
   __po_hi_messages_debug (msg);
#endif

   len = write (nodes[node].socket, &(msg->content), size_to_write);

   /* A short write is handled as a failure of the connection */
   if (len != size_to_write)
   {
      __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__);
      close (nodes[node].socket);
      nodes[node].socket = -1;
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

   __DEBUGMSG (" [SUCCESS]\n");

   return __PO_HI_SUCCESS;
}

julien.delange's avatar
julien.delange committed
150
151
152
153

/* Identifier of the local socket device; defined in another file. */
extern __po_hi_device_id socket_device_id;

void* __po_hi_sockets_poller (void)
154
{
julien.delange's avatar
julien.delange committed
155
   __DEBUGMSG ("Poller launched\n");
julien.delange's avatar
julien.delange committed
156
157
158
159
160
161
162
   socklen_t          socklen = sizeof (struct sockaddr);
   /* See ACCEPT (2) for details on initial value of socklen */

   __po_hi_uint32_t   len;
   int                sock;
   int                max_socket;
   fd_set             selector;
julien.delange's avatar
julien.delange committed
163
164
165
166
   struct sockaddr_in sa;
   __po_hi_node_t     dev;
   __po_hi_node_t     dev_init;
   __po_hi_request_t  received_request;
julien.delange's avatar
julien.delange committed
167
   __po_hi_msg_t      msg;
168
   int                established = 0; 
julien.delange's avatar
julien.delange committed
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187

   max_socket = 0; /* Used to compute the max socket number, useful for listen() call */

   /*
    * We initialize each node socket with -1 value.  This value means
    * that the socket is not active.
    */
   for (dev = 0 ; dev < __PO_HI_NB_DEVICES ; dev++)
   {
      rnodes[dev].socket = -1;
   }

   /*
    * Create a socket for each node that will communicate with us.
    */
   for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
   {
      if (dev != socket_device_id)
      {
188
         __DEBUGMSG ("[DRIVER SOCKETS] Poller wait for connection with device %d\n", dev);
julien.delange's avatar
julien.delange committed
189

190
191
192
193
194
         __PO_HI_SET_SOCKET_TIMEOUT(nodes[socket_device_id].socket,5);

         established = 0;

         while (established == 0)
julien.delange's avatar
julien.delange committed
195
         {
196
197
198
199
200
201
202
203
204
205
206
207
208
            sock = accept (nodes[socket_device_id].socket, (struct sockaddr*) &sa, &socklen);

            __PO_HI_SET_SOCKET_TIMEOUT(sock,10);

            if (read (sock, &dev_init, sizeof (__po_hi_device_id)) != sizeof (__po_hi_device_id))
            {
               established = 0;
               __DEBUGMSG ("[DRIVER SOCKETS] Cannot read device-id for device %d, socket=%d\n", dev, sock);
            }
            else
            {
               established = 1;
            }
julien.delange's avatar
julien.delange committed
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
         }
         __DEBUGMSG ("[DRIVER SOCKETS] read device-id %d from socket=%d\n", dev_init, sock);
         rnodes[dev].socket = sock;
         if (sock > max_socket )
         {
            max_socket = sock;
         }	  
      }
   }
   __DEBUGMSG ("[DRIVER SOCKETS] Poller initialization finished\n");
   __po_hi_wait_initialization ();

   /*
    * Then, listen and receive data on the socket, identify the node
    * which send the data and put it in its message queue
    */
   while (1)
   {
      FD_ZERO( &selector );
      for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
      {
         if ( (dev != socket_device_id ) && ( rnodes[dev].socket != -1 ) )
         {
            FD_SET( rnodes[dev].socket , &selector );
         }
      }

      if (select (max_socket + 1, &selector, NULL, NULL, NULL) == -1 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("[DRIVER SOCKETS] Error on select for node %d\n", mynode);
#endif 
      }
#ifdef __PO_HI_DEBUG
      __DEBUGMSG ("[DRIVER SOCKETS] Receive message\n");
244
#endif
julien.delange's avatar
julien.delange committed
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284

      for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
      {
         if ( (rnodes[dev].socket != -1 ) && FD_ISSET(rnodes[dev].socket, &selector))
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Receive message from dev %d\n", dev);
            memset (msg.content, '\0', __PO_HI_MESSAGES_MAX_SIZE);
            len = recv (rnodes[dev].socket, msg.content, __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL);
            __DEBUGMSG ("[DRIVER SOCKETS] Message received len=%d\n",(int)len);

            if (len == 0)
            {
               rnodes[dev].socket = -1;
               continue;
            }

            __po_hi_unmarshall_request (&received_request, &msg);

            __po_hi_main_deliver (&received_request);
         }
      }
   }  
   return NULL;
}


/*
 * Old receiver code that is based on PolyORB-HI-C for AADLv1.
 * It should be considered deprecated.
 */
void* __po_hi_sockets_receiver_task (void)
{
   socklen_t          socklen = sizeof (struct sockaddr);
   /* See ACCEPT (2) for details on initial value of socklen */

   __po_hi_uint32_t   len;
   int                sock;
   int                max_socket;
   fd_set             selector;
   __po_hi_msg_t      msg;
julien.delange's avatar
julien.delange committed
285
286
287
288
289
   __po_hi_node_t     node;
   __po_hi_node_t     node_init;
   __po_hi_request_t  received_request;
   struct sockaddr_in sa;

290

julien.delange's avatar
julien.delange committed
291
292
293
294
295
296
297
298
   max_socket = 0; /* Used to compute the max socket number, useful for listen() call */

   /*
    * We initialize each node socket with -1 value.  This value means
    * that the socket is not active.
    */
   for (node = 0 ; node < __PO_HI_NB_NODES ; node++)
   {
299
      rnodes[node].socket = -1;
julien.delange's avatar
julien.delange committed
300
301
302
303
304
305
306
   }

   /*
    * Create a socket for each node that will communicate with us.
    */
   for (node = 0; node < __PO_HI_NB_NODES ; node++)
   {
307
      if (node != mynode )
julien.delange's avatar
julien.delange committed
308
309
      {
         sock = accept (nodes[mynode].socket, (struct sockaddr*) &sa, &socklen);
310
311
312
313
         if (sock == -1)
         {
            __DEBUGMSG ("accept() failed, return=%d\n", sock);
         }
julien.delange's avatar
julien.delange committed
314
315
316
317
318
319

         if (read (sock, &node_init, sizeof (__po_hi_node_t)) != sizeof (__po_hi_node_t))
         {
            __DEBUGMSG ("Cannot read node-id for socket %d\n", sock);
            continue;
         }
320

julien.delange's avatar
julien.delange committed
321
322
323
324
325
326
327
         rnodes[node].socket = sock;
         if (sock > max_socket )
         {
            max_socket = sock;
         }	  
      }
   }
328
#ifdef __PO_HI_DEBUG
julien.delange's avatar
julien.delange committed
329
   __DEBUGMSG ("Receiver initialization finished\n");
330
#endif
julien.delange's avatar
julien.delange committed
331
332
333
334
335
336
337
   __po_hi_wait_initialization ();

   /*
    * Then, listen and receive data on the socket, identify the node
    * which send the data and put it in its message queue
    */
   while (1)
338
   {
julien.delange's avatar
julien.delange committed
339
340
341
      FD_ZERO( &selector );
      for (node = 0; node < __PO_HI_NB_NODES ; node++)
      {
342
         if ( (node != mynode ) && ( rnodes[node].socket != -1 ) )
julien.delange's avatar
julien.delange committed
343
344
345
346
347
348
349
         {
            FD_SET( rnodes[node].socket , &selector );
         }
      }

      if (select (max_socket + 1, &selector, NULL, NULL, NULL) == -1 )
      {
350
351
352
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Error on select for node %d\n", mynode);
#endif 
julien.delange's avatar
julien.delange committed
353
      }
354
#ifdef __PO_HI_DEBUG
julien.delange's avatar
julien.delange committed
355
      __DEBUGMSG ("Receive message\n");
356
#endif
julien.delange's avatar
julien.delange committed
357
358
359

      for (node = 0; node < __PO_HI_NB_NODES ; node++)
      {
360
         if ( (rnodes[node].socket != -1 ) && FD_ISSET(rnodes[node].socket, &selector))
julien.delange's avatar
julien.delange committed
361
         {
362
#ifdef __PO_HI_DEBUG
julien.delange's avatar
julien.delange committed
363
            __DEBUGMSG ("Receive message from node %d\n", node);
364
#endif
julien.delange's avatar
julien.delange committed
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380

            __DEBUGMSG ("Using raw protocol stack\n");
            len = recv (rnodes[node].socket, &(msg.content), __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL);
            msg.length = len;
            if (len != __PO_HI_MESSAGES_MAX_SIZE )
            {
               __DEBUGMSG ("ERROR, %u %d", (unsigned int) len, __PO_HI_MESSAGES_MAX_SIZE);
               close (rnodes[node].socket);
               rnodes[node].socket = -1;
               continue;
            }
            __DEBUGMSG ("Message delivered");

            __po_hi_unmarshall_request (&received_request, &msg);

            __po_hi_main_deliver (&received_request);
julien.delange's avatar
julien.delange committed
381

julien.delange's avatar
julien.delange committed
382
383
384
            __po_hi_msg_reallocate(&msg);        /* re-initialize the message */
         }
      }
385
   }  
julien.delange's avatar
julien.delange committed
386
   return NULL;
387
388
}

389

julien.delange's avatar
julien.delange committed
390
391
392
393
394
395
396
/*
 * The following code implements the old socket layer
 * for PolyORB-HI-C and AADLv1.
 * It should be considered deprecated.
 */
#ifdef __PO_HI_NEED_DRIVER_SOCKETS
/*
 * Initialize the socket layer of the current node.
 *
 * All entries of nodes[] are first marked as inactive (-1). If a port
 * is associated with the current node (node_port[mynode]), a
 * listening socket is created, bound to this port on all addresses,
 * and the receiver task (__po_hi_sockets_receiver_task) is created.
 * Then, for every other node that has a port, a socket is created and
 * connected to node_addr[node]:node_port[node]; the connection is
 * retried every 500 ms until it succeeds, and the identifier of the
 * current node is sent on the new connection.
 *
 * The parameter id is not used by this implementation.
 *
 * The function returns early when a socket cannot be created. When
 * the address of a node cannot be resolved, its entry in nodes[]
 * stays inactive and the next node is handled.
 */
void __po_hi_driver_sockets_init (__po_hi_device_id id)
{
   int                i;
   int                ret;
   int                reuse;
   char               *tmp;
   __po_hi_time_t     mytime;
   struct sockaddr_in sa;
   struct hostent*    hostinfo;
   int                node;

   (void) id;

   for (node = 0 ; node < __PO_HI_NB_NODES ; node++)
   {
      nodes[node].socket = -1;
   }

   /*
    * If the current node port has a port number, then it has to
    * listen to other nodes. So, we create a socket, bind it and
    * listen to other nodes.
    */
   if ( node_port[mynode] != __PO_HI_NOPORT )
   {
      nodes[mynode].socket = socket (AF_INET, SOCK_STREAM, 0);

      if (nodes[mynode].socket == -1 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Cannot create socket for node %d\n", mynode);
#endif
         return;
      }

      reuse = 1;
      setsockopt (nodes[mynode].socket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse));

      memset (&sa, 0, sizeof (sa));
      sa.sin_addr.s_addr = htonl (INADDR_ANY);   /* We listen on all addresses */
      sa.sin_family = AF_INET;
      sa.sin_port = htons (node_port[mynode]);   /* Port provided by the generated code */

      if( bind( nodes[mynode].socket , ( struct sockaddr * ) &sa , sizeof( struct sockaddr_in ) ) < 0 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Unable to bind socket and port on socket %d\n", nodes[mynode].socket);
#endif
         /* A socket that is not bound cannot receive connections */
         close (nodes[mynode].socket);
         nodes[mynode].socket = -1;
      }
      else if( listen( nodes[mynode].socket , __PO_HI_NB_ENTITIES ) < 0 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Cannot listen on socket %d\n", nodes[mynode].socket);
#endif
         close (nodes[mynode].socket);
         nodes[mynode].socket = -1;
      }
      else
      {
         /*
          * Create the thread which receives all data from other
          * nodes. This thread will execute the function
          * __po_hi_sockets_receiver_task. It is only created when
          * the listening socket is usable, since it would otherwise
          * call accept() on an invalid socket.
          */

         __po_hi_initialize_add_task ();

         __po_hi_create_generic_task
            (-1, 0,__PO_HI_MAX_PRIORITY, 0, __po_hi_sockets_receiver_task);
      }
   }

   /*
    * For each node in the system that may communicate with the current
    * node we create a socket. This socket will be used to send data.
    */
   for (node = 0 ; node < __PO_HI_NB_NODES ; node++ )
   {
      if ( (node != mynode) && (node_port[node] != __PO_HI_NOPORT) && (nodes[node].socket == -1) )
      {
         while (1)
         {
            nodes[node].socket = socket (AF_INET, SOCK_STREAM, 0);

            if (nodes[node].socket == -1 )
            {
#ifdef __PO_HI_DEBUG
               __DEBUGMSG ("Socket for node %d is not created", node);
#endif
               return;
            }

            hostinfo = gethostbyname ((char*)node_addr[node]);

            /*
             * Without a usable IPv4 address there is nothing to
             * connect to: hostinfo must not be dereferenced when it
             * is NULL, and the address must fit in sa.sin_addr.
             */
            if ( (hostinfo == NULL) ||
                 (hostinfo->h_addrtype != AF_INET) ||
                 (hostinfo->h_length <= 0) ||
                 (hostinfo->h_length > (int) sizeof (sa.sin_addr)) )
            {
#ifdef __PO_HI_DEBUG
               __DEBUGMSG ("Error while getting host informations for node %d\n", node);
#endif
               close (nodes[node].socket);
               nodes[node].socket = -1;
               break;
            }

            memset (&sa, 0, sizeof (sa));
            sa.sin_port = htons( node_port[node] );
            sa.sin_family = AF_INET;

            /* The following lines are used to copy the
             * hostinfo->h_length to the sa.sin_addr member. Most
             * of program use the memcpy to do that, but the
             * RT-POSIX profile we use forbid the use of this
             * function.  We use a loop instead to perform the
             * copy.  So, these lines replace the code :
             *
             * memcpy( (char*) &( sa.sin_addr ) , (char*)hostinfo->h_addr , hostinfo->h_length );
             */

            tmp = (char*) &(sa.sin_addr);
            for (i=0 ; i<hostinfo->h_length ; i++)
            {
               tmp[i] = hostinfo->h_addr[i];
            }

            /*
             * We try to connect to the remote host. When the
             * connection fails, the socket is closed and a new
             * attempt is made after a delay.
             */

            ret = connect
               (nodes[node].socket, ( struct sockaddr* ) &sa , sizeof( struct sockaddr_in ));

            if (ret == 0)
            {
               /* The peer expects our node identifier first */
               if (write (nodes[node].socket, &mynode, sizeof (__po_hi_node_t)) != sizeof (__po_hi_node_t))
               {
#ifdef __PO_HI_DEBUG
                  __DEBUGMSG ("Node %d cannot send his node-id\n", node);
#endif
               }
               break;
            }

            if (close (nodes[node].socket))
            {
#ifdef __PO_HI_DEBUG
               __DEBUGMSG ("Cannot close socket %d\n", nodes[node].socket);
#endif
            }

            /*
             * We wait 500ms each time we try to connect on the
             * remote host
             */

            __po_hi_get_time (&mytime);
            __po_hi_delay_until (__po_hi_add_times (mytime, __po_hi_milliseconds (500)));
         }
      }
   }
}
julien.delange's avatar
julien.delange committed
552
553
#endif

554
555
556



557
558
#endif