po_hi_driver_sockets.c 22.4 KB
Newer Older
1
2
3
4
5
6
7
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more informations, please visit http://ocarina.enst.fr
 *
8
 * Copyright (C) 2010, European Space Agency
9
10
11
 * Copyright (C) 2007-2008, GET-Telecom Paris.
 */

12
#include <deployment.h>
julien.delange's avatar
julien.delange committed
13
#include <marshallers.h>
14

julien.delange's avatar
julien.delange committed
15
16
#if (defined (__PO_HI_NEED_DRIVER_SOCKETS) || \
     defined (__PO_HI_NEED_DRIVER_RTEMS_NE2000_SOCKETS))
17

18
19
20
21
22
23
24
25
26
#include <po_hi_config.h>
#include <po_hi_task.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_main.h>
#include <po_hi_task.h>
27
#include <po_hi_gqueue.h>
28
#include <drivers/po_hi_driver_sockets.h>
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

#include <activity.h>

#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>

/*
44
 * This file (po_hi_sockets.c) provides function to handle
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
 * communication between nodes in PolyORB-HI-C.  We don't use a
 * protocol to send data. For each data sent, we send before the
 * entity number provided by the generated file deployment.h, then, we
 * send the message.  Each entity has a fixed size
 * (sizeof(__po_hi_entity_t)), and each message has a max fixed size
 * (see the __PO_HI_MESSAGES_MAX_SIZE macro).
 */

/* The following declarations avoid conflicts
 * with current generated code.
 */

#ifndef __PO_HI_NB_NODES
#define __PO_HI_NB_NODES 1
#endif

/*
 * We have two arrays of sockets. The first array (nodes) is used to
63
 * send data to other nodes. A special socket if nodes[__po_hi_mynode] : this
64
65
66
67
68
 * socket is used to listen others processes.  The second array
 * (rnodes), is used to store all socket that are created by the
 * listen socket. This array is used only by the receiver_task
 */

jdelange's avatar
jdelange committed
69
70
__po_hi_inetnode_t         nodes[__PO_HI_NB_DEVICES];
__po_hi_inetnode_t         rnodes[__PO_HI_NB_DEVICES];
julien.delange's avatar
julien.delange committed
71

jdelange's avatar
jdelange committed
72
__po_hi_device_id          socket_device_id;
73

74
75
__po_hi_msg_t              __po_hi_driver_sockets_send_msg;

76
77
78
int __po_hi_driver_sockets_send (__po_hi_task_id task_id,
                                 __po_hi_port_t port)
{
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
   int                        len;
   int                        size_to_write;
   int                        optval = 0;
   socklen_t                  optlen = 0;
   __po_hi_device_id          associated_device;
   __po_hi_local_port_t       local_port;
   __po_hi_request_t*         request;
   __po_hi_port_t             destination_port;
   __po_hi_protocol_t         protocol_id;
   __po_hi_protocol_conf_t*   protocol_conf;

   local_port              = __po_hi_get_local_port_from_global_port (port);
   request                 = __po_hi_gqueue_get_most_recent_value (task_id, local_port);
   destination_port        = __po_hi_gqueue_get_destination (task_id, local_port, 0);
   associated_device       = __po_hi_get_device_from_port (destination_port);
   protocol_id             = __po_hi_transport_get_protocol (port, destination_port);
   protocol_conf           = __po_hi_transport_get_protocol_configuration (protocol_id);
96

97
98
99
100
101
102
   if (request->port == -1)
   {

#ifdef __PO_HI_DEBUG
      __DEBUGMSG (" [DRIVER SOCKETS] No data to write on port %d\n", port);
#endif
103
      return __PO_HI_ERROR_TRANSPORT_SEND;
104
105
   }

106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
   if (nodes[associated_device].socket == -1 )
   {
#ifdef __PO_HI_DEBUG
      __DEBUGMSG (" [DRIVER SOCKETS] Invalid socket for port-id %d, device-id %d\n", destination_port, associated_device);
#endif
      return __PO_HI_ERROR_TRANSPORT_SEND;		
   }

   /*
    * After sending the entity identifier, we send the message which
    * contains the request.
    */

   size_to_write = __PO_HI_MESSAGES_MAX_SIZE;

   if (getsockopt (nodes[associated_device].socket, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1)
   {
      __DEBUGMSG (" [error getsockopt() in file %s, line%d ]\n", __FILE__, __LINE__);
      close (nodes[associated_device].socket);
      nodes[associated_device].socket = -1;
      return __PO_HI_ERROR_TRANSPORT_SEND;		
   }

   if (optval != 0)
   {
      __DEBUGMSG (" [error getsockopt() return code in file %s, line%d ]\n", __FILE__, __LINE__);
      close (nodes[associated_device].socket);
      nodes[associated_device].socket = -1;
      return __PO_HI_ERROR_TRANSPORT_SEND;		
   }

   /* Ignore SIGPIPE to be able to recover from errors instead of crashing the node */

   if (signal (SIGPIPE, SIG_IGN) == SIG_ERR)
   {
      __DEBUGMSG (" [error signal() return code in file %s, line%d ]\n", __FILE__, __LINE__);
      close (nodes[associated_device].socket);
      nodes[associated_device].socket = -1;
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }
146
147
148
149
150
151

   switch (protocol_id)
   {
#ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
      case virtual_bus_myprotocol_i:
      {
jdelange's avatar
jdelange committed
152
153
154
155
156
157
158
159
160
161
162
163
         size_to_write = sizeof (int);
         int datawritten;
         protocol_conf->marshaller(request, &datawritten, &size_to_write);
         len = write (nodes[associated_device].socket, &datawritten, size_to_write);

         if (len != size_to_write)
         {
            __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__);
            close (nodes[associated_device].socket);
            nodes[associated_device].socket = -1;
            return __PO_HI_ERROR_TRANSPORT_SEND;		
         }
164
165
166
167
168
169
         break;
      }
#endif
      default: 
      {
         request->port = destination_port;
170
171
         __po_hi_msg_reallocate (&__po_hi_driver_sockets_send_msg);
         __po_hi_marshall_request (request, &__po_hi_driver_sockets_send_msg);
172
173

#ifdef __PO_HI_DEBUG
174
         __po_hi_messages_debug (&__po_hi_driver_sockets_send_msg);
175
176
#endif

177
         len = write (nodes[associated_device].socket, &(__po_hi_driver_sockets_send_msg.content), size_to_write);
178

179
180
181
182
183
184
185
         if (len != size_to_write)
         {
            __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__);
            close (nodes[associated_device].socket);
            nodes[associated_device].socket = -1;
            return __PO_HI_ERROR_TRANSPORT_SEND;		
         }
186
187

         request->port = __PO_HI_GQUEUE_INVALID_PORT;
188
189
         break;
      }
190
191
192
193
194
195
   }

   return __PO_HI_SUCCESS;
}


196
197
198
__po_hi_request_t          __po_hi_sockets_poller_received_request;
__po_hi_msg_t              __po_hi_sockets_poller_msg;

199
void* __po_hi_sockets_poller (const __po_hi_device_id dev_id)
200
{
jdelange's avatar
jdelange committed
201
   (void)                     dev_id;
202
   __DEBUGMSG ("Poller launched, device-id=%d\n", socket_device_id);
jdelange's avatar
jdelange committed
203
   socklen_t                  socklen = sizeof (struct sockaddr);
julien.delange's avatar
julien.delange committed
204
205
   /* See ACCEPT (2) for details on initial value of socklen */

jdelange's avatar
jdelange committed
206
207
208
209
210
211
212
213
   __po_hi_uint32_t           len;
   int                        sock;
   int                        max_socket;
   fd_set                     selector;
   struct sockaddr_in         sa;
   __po_hi_device_id          dev;
   __po_hi_node_t             dev_init;
   int                        established = 0; 
jdelange's avatar
jdelange committed
214
   int                        ret;
jdelange's avatar
jdelange committed
215
   __po_hi_protocol_conf_t*   protocol_conf;
julien.delange's avatar
julien.delange committed
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232

   max_socket = 0; /* Used to compute the max socket number, useful for listen() call */

   /*
    * We initialize each node socket with -1 value.  This value means
    * that the socket is not active.
    */
   for (dev = 0 ; dev < __PO_HI_NB_DEVICES ; dev++)
   {
      rnodes[dev].socket = -1;
   }

   /*
    * Create a socket for each node that will communicate with us.
    */
   for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
   {
julien.delange's avatar
julien.delange committed
233
      if (dev != socket_device_id)
julien.delange's avatar
julien.delange committed
234
      {
235
236
237
238

         established = 0;

         while (established == 0)
julien.delange's avatar
julien.delange committed
239
         {
jdelange's avatar
jdelange committed
240
            __DEBUGMSG ("[DRIVER SOCKETS] Poller waits for connection with device %d (reading device=%d, socket=%d)\n", dev, socket_device_id, nodes[socket_device_id].socket);
241
242
            sock = accept (nodes[socket_device_id].socket, (struct sockaddr*) &sa, &socklen);

jdelange's avatar
jdelange committed
243
244
245
246
247
248
            if (sock == -1)
            {
               continue;
            }

            __DEBUGMSG ("[DRIVER SOCKETS] accept() passed, waiting for device id %d\n", dev);
249

250
#ifndef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
jdelange's avatar
jdelange committed
251
252
253

            ret = read (sock, &dev_init, sizeof (__po_hi_device_id));
            if (ret != sizeof (__po_hi_device_id))
254
255
            {
               established = 0;
jdelange's avatar
jdelange committed
256
257
               perror ("[DRIVER SOCKETS]");
               __DEBUGMSG ("[DRIVER SOCKETS] Cannot read device-id for device %d, socket=%d, ret=%d\n", dev, sock, ret);
258
259
260
            }
            else
            {
261
               __DEBUGMSG ("[DRIVER SOCKETS] read device-id %d from socket=%d\n", dev_init, sock);
262
263
               established = 1;
            }
264
265
266
#else
            established = 1;
#endif
julien.delange's avatar
julien.delange committed
267
268
269
270
271
272
273
274
         }
         rnodes[dev].socket = sock;
         if (sock > max_socket )
         {
            max_socket = sock;
         }	  
      }
   }
275
   __DEBUGMSG ("[DRIVER SOCKETS] Poller initialization finished, waiting for other tasks\n");
julien.delange's avatar
julien.delange committed
276
   __po_hi_wait_initialization ();
277
   __DEBUGMSG ("[DRIVER SOCKETS] Other tasks are initialized, let's start the polling !\n");
julien.delange's avatar
julien.delange committed
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296

   /*
    * Then, listen and receive data on the socket, identify the node
    * which send the data and put it in its message queue
    */
   while (1)
   {
      FD_ZERO( &selector );
      for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
      {
         if ( (dev != socket_device_id ) && ( rnodes[dev].socket != -1 ) )
         {
            FD_SET( rnodes[dev].socket , &selector );
         }
      }

      if (select (max_socket + 1, &selector, NULL, NULL, NULL) == -1 )
      {
#ifdef __PO_HI_DEBUG
297
         __DEBUGMSG ("[DRIVER SOCKETS] Error on select for node %d\n", __po_hi_mynode);
julien.delange's avatar
julien.delange committed
298
299
300
301
#endif 
      }
#ifdef __PO_HI_DEBUG
      __DEBUGMSG ("[DRIVER SOCKETS] Receive message\n");
302
#endif
julien.delange's avatar
julien.delange committed
303
304
305

      for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
      {
306
         __DEBUGMSG ("[DRIVER SOCKETS] Try to watch if it comes from device %d (socket=%d)\n", dev, rnodes[dev].socket);
julien.delange's avatar
julien.delange committed
307
308
309
         if ( (rnodes[dev].socket != -1 ) && FD_ISSET(rnodes[dev].socket, &selector))
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Receive message from dev %d\n", dev);
jdelange's avatar
jdelange committed
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
#ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
            {

               protocol_conf = __po_hi_transport_get_protocol_configuration (virtual_bus_myprotocol_i);


               int datareceived;
               len = recv (rnodes[dev].socket, &datareceived, sizeof (int), MSG_WAITALL);
               __DEBUGMSG ("[DRIVER SOCKETS] Message received len=%d\n",(int)len);
               if (len == 0)
               {
                  __DEBUGMSG ("[DRIVER SOCKETS] Zero size from device %d\n",dev);
                  rnodes[dev].socket = -1;
                  continue;
               }
325
326
               protocol_conf->unmarshaller (&__po_hi_sockets_poller_received_request, &datareceived, len);
               __po_hi_sockets_poller_received_request.port = 1;
jdelange's avatar
jdelange committed
327
328
329
            }

#else
330
331
332
            memset (__po_hi_sockets_poller_msg.content, '\0', __PO_HI_MESSAGES_MAX_SIZE);
            len = recv (rnodes[dev].socket, __po_hi_sockets_poller_msg.content, __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL);
            __po_hi_sockets_poller_msg.length = len;
julien.delange's avatar
julien.delange committed
333
334
            __DEBUGMSG ("[DRIVER SOCKETS] Message received len=%d\n",(int)len);

335
#ifdef __PO_HI_DEBUG
336
   __po_hi_messages_debug (&__po_hi_sockets_poller_msg);
337
338
339
#endif


julien.delange's avatar
julien.delange committed
340
341
            if (len == 0)
            {
342
343

               __DEBUGMSG ("[DRIVER SOCKETS] Zero size from device %d\n",dev);
julien.delange's avatar
julien.delange committed
344
345
346
347
               rnodes[dev].socket = -1;
               continue;
            }

348
            __po_hi_unmarshall_request (&__po_hi_sockets_poller_received_request, &__po_hi_sockets_poller_msg);
jdelange's avatar
jdelange committed
349
#endif
350

351
            __po_hi_main_deliver (&__po_hi_sockets_poller_received_request);
julien.delange's avatar
julien.delange committed
352
353
354
355
356
357
358
         }
      }
   }  
   return NULL;
}


359
360
361
__po_hi_msg_t              __po_hi_sockets_receiver_task_msg;
__po_hi_request_t          __po_hi_sockets_receiver_task_received_request;

julien.delange's avatar
julien.delange committed
362
363
364
365
366
367
368
369
370
/*
 * Old receiver code that is based on PolyORB-HI-C for AADLv1
 * Would be considered as deprecated.
 */
void* __po_hi_sockets_receiver_task (void)
{
   socklen_t          socklen = sizeof (struct sockaddr);
   /* See ACCEPT (2) for details on initial value of socklen */

371
372
373
374
375
376
377
378
   __po_hi_uint32_t           len;
   int                        sock;
   int                        max_socket;
   fd_set                     selector;
   __po_hi_node_t             node;
   __po_hi_node_t             node_init;
   struct sockaddr_in         sa;
   __po_hi_protocol_conf_t*   protocol_conf;
julien.delange's avatar
julien.delange committed
379

380

julien.delange's avatar
julien.delange committed
381
382
383
384
385
386
387
388
   max_socket = 0; /* Used to compute the max socket number, useful for listen() call */

   /*
    * We initialize each node socket with -1 value.  This value means
    * that the socket is not active.
    */
   for (node = 0 ; node < __PO_HI_NB_NODES ; node++)
   {
389
      rnodes[node].socket = -1;
julien.delange's avatar
julien.delange committed
390
391
392
393
394
395
396
   }

   /*
    * Create a socket for each node that will communicate with us.
    */
   for (node = 0; node < __PO_HI_NB_NODES ; node++)
   {
397
      if (node != __po_hi_mynode )
julien.delange's avatar
julien.delange committed
398
      {
399
         sock = accept (nodes[__po_hi_mynode].socket, (struct sockaddr*) &sa, &socklen);
400
401
402
403
         if (sock == -1)
         {
            __DEBUGMSG ("accept() failed, return=%d\n", sock);
         }
julien.delange's avatar
julien.delange committed
404

405
406
407
         __DEBUGMSG ("accept() success, return=%d\n", sock);

#ifndef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
julien.delange's avatar
julien.delange committed
408
409
410
411
412
         if (read (sock, &node_init, sizeof (__po_hi_node_t)) != sizeof (__po_hi_node_t))
         {
            __DEBUGMSG ("Cannot read node-id for socket %d\n", sock);
            continue;
         }
413
#endif
414

julien.delange's avatar
julien.delange committed
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
         rnodes[node].socket = sock;
         if (sock > max_socket )
         {
            max_socket = sock;
         }	  
      }
   }
   __DEBUGMSG ("Receiver initialization finished\n");
   __po_hi_wait_initialization ();

   /*
    * Then, listen and receive data on the socket, identify the node
    * which send the data and put it in its message queue
    */
   while (1)
430
   {
julien.delange's avatar
julien.delange committed
431
432
433
      FD_ZERO( &selector );
      for (node = 0; node < __PO_HI_NB_NODES ; node++)
      {
434
         if ( (node != __po_hi_mynode ) && ( rnodes[node].socket != -1 ) )
julien.delange's avatar
julien.delange committed
435
436
437
438
439
440
441
         {
            FD_SET( rnodes[node].socket , &selector );
         }
      }

      if (select (max_socket + 1, &selector, NULL, NULL, NULL) == -1 )
      {
442
#ifdef __PO_HI_DEBUG
443
         __DEBUGMSG ("Error on select for node %d\n", __po_hi_mynode);
444
#endif 
julien.delange's avatar
julien.delange committed
445
      }
446
#ifdef __PO_HI_DEBUG
julien.delange's avatar
julien.delange committed
447
      __DEBUGMSG ("Receive message\n");
448
#endif
julien.delange's avatar
julien.delange committed
449
450
451

      for (node = 0; node < __PO_HI_NB_NODES ; node++)
      {
452
         if ( (rnodes[node].socket != -1 ) && FD_ISSET(rnodes[node].socket, &selector))
julien.delange's avatar
julien.delange committed
453
         {
454
#ifdef __PO_HI_DEBUG
julien.delange's avatar
julien.delange committed
455
            __DEBUGMSG ("Receive message from node %d\n", node);
456
#endif
julien.delange's avatar
julien.delange committed
457

458
459
460
#ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I

            __DEBUGMSG ("Using raw protocol stack\n");
461
462
            len = recv (rnodes[node].socket, &(__po_hi_sockets_receiver_task_msg.content), __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL);
            __po_hi_sockets_receiver_task_msg.length = len;
463
464
465
466
467
468
469
470
471
472
473
            if (len != __PO_HI_MESSAGES_MAX_SIZE )
            {
               __DEBUGMSG ("ERROR, %u %d", (unsigned int) len, __PO_HI_MESSAGES_MAX_SIZE);
               close (rnodes[node].socket);
               rnodes[node].socket = -1;
               continue;
            }
            __DEBUGMSG ("Message delivered");


            protocol_conf = __po_hi_transport_get_protocol_configuration (virtual_bus_myprotocol_i);
474
            protocol_conf->unmarshaller (&__po_hi_sockets_receiver_task_received_request, &__po_hi_sockets_receiver_task_msg, len);
475
476
#else

julien.delange's avatar
julien.delange committed
477
            __DEBUGMSG ("Using raw protocol stack\n");
478
479
            len = recv (rnodes[node].socket, &(__po_hi_sockets_receiver_task_msg.content), __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL);
            __po_hi_sockets_receiver_task_msg.length = len;
julien.delange's avatar
julien.delange committed
480
481
482
483
484
485
486
487
488
            if (len != __PO_HI_MESSAGES_MAX_SIZE )
            {
               __DEBUGMSG ("ERROR, %u %d", (unsigned int) len, __PO_HI_MESSAGES_MAX_SIZE);
               close (rnodes[node].socket);
               rnodes[node].socket = -1;
               continue;
            }
            __DEBUGMSG ("Message delivered");

489
            __po_hi_unmarshall_request (&__po_hi_sockets_receiver_task_received_request, &__po_hi_sockets_receiver_task_msg);
490
#endif
julien.delange's avatar
julien.delange committed
491

492
            __po_hi_main_deliver (&__po_hi_sockets_receiver_task_received_request);
julien.delange's avatar
julien.delange committed
493

494
            __po_hi_msg_reallocate(&__po_hi_sockets_receiver_task_msg);        /* re-initialize the message */
julien.delange's avatar
julien.delange committed
495
496
         }
      }
497
   }  
julien.delange's avatar
julien.delange committed
498
   return NULL;
499
500
}

julien.delange's avatar
julien.delange committed
501
void __po_hi_driver_sockets_init (__po_hi_device_id id)
502
{
julien.delange's avatar
julien.delange committed
503
504
505
506
   int                i;
   int                ret;
   int                reuse;
   char               *tmp;
507
   __po_hi_uint16_t   dev;
julien.delange's avatar
julien.delange committed
508
   __po_hi_time_t     mytime;
509
510
   __po_hi_time_t     tmptime;
   __po_hi_time_t     current_time;
511
   struct sockaddr_in sa;
julien.delange's avatar
julien.delange committed
512
   struct hostent*    hostinfo;
513
514


jdelange's avatar
jdelange committed
515
516
   __po_hi_c_ip_conf_t*    ipconf;
   unsigned short          ip_port;
517
   int node;
julien.delange's avatar
julien.delange committed
518

519
   socket_device_id = id;
julien.delange's avatar
julien.delange committed
520

521
   for (node = 0 ; node < __PO_HI_NB_DEVICES ; node++)
522
   {
julien.delange's avatar
julien.delange committed
523
      nodes[node].socket = -1;
524
525
   }

526
527
528
   ipconf = (__po_hi_c_ip_conf_t*)__po_hi_get_device_configuration (id);
   ip_port = (int)ipconf->port;

jdelange's avatar
jdelange committed
529
   __DEBUGMSG ("My configuration, addr=%s, port=%d\n", ipconf->address, ip_port );
530

531
   /*
julien.delange's avatar
julien.delange committed
532
533
534
    * If the current node port has a port number, then it has to
    * listen to other nodes. So, we create a socket, bind it and
    * listen to other nodes.
535
    */
536
   if (ip_port != 0)
537
   {
538
      nodes[id].socket = socket (AF_INET, SOCK_STREAM, 0);
julien.delange's avatar
julien.delange committed
539

jdelange's avatar
jdelange committed
540

541
      if (nodes[id].socket == -1 )
542
      {
julien.delange's avatar
julien.delange committed
543
#ifdef __PO_HI_DEBUG
544
         __DEBUGMSG ("Cannot create socket for device %d\n", id);
julien.delange's avatar
julien.delange committed
545
546
547
#endif
         return;
      }
548

jdelange's avatar
jdelange committed
549
550
      __DEBUGMSG ("Socket created for addr=%s, port=%d, socket value=%d\n", ipconf->address, ip_port, nodes[socket_device_id].socket);

julien.delange's avatar
julien.delange committed
551
      reuse = 1;
jdelange's avatar
jdelange committed
552
553
554
555
556

      if (setsockopt (nodes[id].socket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse)))
      {
         __DEBUGMSG ("[DRIVER SOCKETS] Error while making the receiving socket reusable\n");
      }
julien.delange's avatar
julien.delange committed
557
558
559

      sa.sin_addr.s_addr = htonl (INADDR_ANY);   /* We listen on all adresses */
      sa.sin_family = AF_INET;                   
560
      sa.sin_port = htons (ip_port);   /* Port provided by the generated code */
julien.delange's avatar
julien.delange committed
561

562
      if( bind( nodes[id].socket , ( struct sockaddr * ) &sa , sizeof( struct sockaddr_in ) ) < 0 )
julien.delange's avatar
julien.delange committed
563
564
      {
#ifdef __PO_HI_DEBUG
565
         __DEBUGMSG ("Unable to bind socket and port on socket %d\n", nodes[id].socket);
julien.delange's avatar
julien.delange committed
566
567
568
#endif
      }

569
      if( listen( nodes[id].socket , __PO_HI_NB_ENTITIES ) < 0 )
julien.delange's avatar
julien.delange committed
570
571
      {
#ifdef __PO_HI_DEBUG
572
         __DEBUGMSG ("Cannot listen on socket %d\n", nodes[id].socket);
julien.delange's avatar
julien.delange committed
573
#endif
574
      }
julien.delange's avatar
julien.delange committed
575
576
577
578
579
580
581
582
583
584

      /* 
       * Create the thread which receive all data from other
       * nodes. This thread will execute the function
       * __po_hi_receiver_task
       */

      __po_hi_initialize_add_task ();

      __po_hi_create_generic_task 
jdelange's avatar
jdelange committed
585
         (-1, 0,__PO_HI_MAX_PRIORITY, 0, (void* (*)(void))__po_hi_sockets_poller);
586
587
588
   }

   /*
julien.delange's avatar
julien.delange committed
589
590
    * For each node in the sytem that may communicate with the current
    * node we create a socket. This socket will be used to send data.
591
    */
592
   for (dev = 0 ; dev < __PO_HI_NB_DEVICES ; dev++ )
593
   {
594
      if (dev == id)
595
      {
596
597
598
599
600
601
602
603
         continue;
      }

      __DEBUGMSG ("[DRIVER SOCKETS] Will initialize connection with device %d\n", dev);

      ip_port = 0;

      ipconf = (__po_hi_c_ip_conf_t*) __po_hi_get_device_configuration (dev);
jdelange's avatar
jdelange committed
604
      ip_port = (unsigned short)ipconf->port;
605

jdelange's avatar
jdelange committed
606
      __DEBUGMSG ("[DRIVER SOCKETS] Configuration for device %d, port=%d\n", dev, ip_port);
607
608
609
610
611
612
613
614
615
616
617
618

      if (ip_port == 0)
      {
         __DEBUGMSG ("[DRIVER SOCKETS] Invalid remote port\n");
         continue;
      }

      while (1)
      {
         nodes[dev].socket = socket (AF_INET, SOCK_STREAM, 0);

         if (nodes[dev].socket == -1 )
619
         {
620
621
622
            __DEBUGMSG ("[DRIVER SOCKETS] Socket for dev %d is not created\n", dev);
            return;
         }
623

624
         __DEBUGMSG ("[DRIVER SOCKETS] Socket for dev %d created, value=%d\n", dev, nodes[dev].socket);
julien.delange's avatar
julien.delange committed
625

jdelange's avatar
jdelange committed
626
627
         hostinfo = NULL;

628
         hostinfo = gethostbyname ((char*)ipconf->address);
julien.delange's avatar
julien.delange committed
629

630
631
632
633
         if (hostinfo == NULL )
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Error while getting host informations for device %d\n", dev);
         }
634

635
636
637
638
639
640
641
642
643
644
         sa.sin_port = htons (ip_port);
         sa.sin_family = AF_INET;

         /* The following lines are used to copy the
          * hostinfo->h_length to the sa.sin_addr member. Most
          * of program use the memcpy to do that, but the
          * RT-POSIX profile we use forbid the use of this
          * function.  We use a loop instead to perform the
          * copy.  So, these lines replace the code :
          *
jdelange's avatar
jdelange committed
645
          *
646
647
648
649
650
651
652
          * memcpy( (char*) &( sa.sin_addr ) , (char*)hostinfo->h_addr , hostinfo->h_length );
          */
         tmp = (char*) &(sa.sin_addr);
         for (i=0 ; i<hostinfo->h_length ; i++)
         {
            tmp[i] = hostinfo->h_addr[i];
         }
653

jdelange's avatar
jdelange committed
654

655
656
657
658
         /*
          * We try to connect on the remote host. We try every
          * second to connect on.
          */
jdelange's avatar
jdelange committed
659
         __PO_HI_SET_SOCKET_TIMEOUT(nodes[dev].socket,500000);
660

661
662
663
         ret = connect (nodes[dev].socket, 
                        (struct sockaddr*) &sa ,
                        sizeof (struct sockaddr_in));
julien.delange's avatar
julien.delange committed
664

665
666
667
668
669
670
671
672
673
674
675
676
677
#ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
         if (ret == 0)
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Connection established with device %d, socket=%d\n", dev, nodes[dev].socket);

            break;
         }
         else
         {
            __DEBUGMSG ("connect() failed, return=%d\n", ret);
         }

#else
678
679
         if (ret == 0)
         {
julien.delange's avatar
julien.delange committed
680

681
682
            __DEBUGMSG ("[DRIVER SOCKETS] Send my id (%d)\n", id);
            if (write (nodes[dev].socket, &id, sizeof (__po_hi_device_id)) != sizeof (__po_hi_device_id))
julien.delange's avatar
julien.delange committed
683
            {
684
               __DEBUGMSG ("[DRIVER SOCKETS] Device %d cannot send his id\n", id);
julien.delange's avatar
julien.delange committed
685
            }
686
687
688
689
690
691
692
            __DEBUGMSG ("[DRIVER SOCKETS] Connection established with device %d, socket=%d\n", dev, nodes[dev].socket);
            break;
         }
         else
         {
            __DEBUGMSG ("connect() failed, return=%d\n", ret);
         }
693
#endif
julien.delange's avatar
julien.delange committed
694

695
696
697
         if (close (nodes[dev].socket))
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Cannot close socket %d\n", nodes[dev].socket);
698
         }
julien.delange's avatar
julien.delange committed
699

700
701
702
703
         /*
          * We wait 500ms each time we try to connect on the
          * remote host
          */
704

705
706
707
         __po_hi_get_time (&current_time);
         __po_hi_milliseconds (&tmptime, 500);
         __po_hi_add_times (&mytime, &current_time, &tmptime);
708
         __DEBUGMSG ("[DRIVER SOCKETS] Cannot connect on device %d, wait 500ms\n", dev);
709
         __po_hi_delay_until (&mytime);
710
711
      }
   }
712

jdelange's avatar
jdelange committed
713
   __DEBUGMSG ("[DRIVER SOCKETS] INITIALIZATION DONE\n");
714
}
715

716
717
#endif