po_hi_driver_sockets.c 19.2 KB
Newer Older
1
2
3
4
5
6
7
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more information, please visit http://ocarina.enst.fr
 *
 * Copyright (C) 2010, European Space Agency
 * Copyright (C) 2007-2008, GET-Telecom Paris.
 */

12
#include <deployment.h>
julien.delange's avatar
julien.delange committed
13
#include <marshallers.h>
14

julien.delange's avatar
julien.delange committed
15
#if (defined (__PO_HI_NEED_DRIVER_SOCKETS) || \
16
     defined (__PO_HI_NEED_DRIVER_SOCKETSNEW) || \
julien.delange's avatar
julien.delange committed
17
     defined (__PO_HI_NEED_DRIVER_RTEMS_NE2000_SOCKETS))
18

19
20
21
22
23
24
25
26
27
#include <po_hi_config.h>
#include <po_hi_task.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
#include <po_hi_returns.h>
#include <po_hi_main.h>
#include <po_hi_task.h>
28
#include <po_hi_gqueue.h>
29
#include <drivers/po_hi_driver_sockets.h>
30
#include <drivers/po_hi_driver_sockets_common.h>
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

#include <activity.h>

#include <signal.h>
#include <string.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>

/*
 * This file (po_hi_sockets.c) provides functions to handle
 * communication between nodes in PolyORB-HI-C.  We don't use a
 * protocol to send data. For each piece of data sent, we first send
 * the entity number provided by the generated file deployment.h, then
 * we send the message.  Each entity has a fixed size
 * (sizeof(__po_hi_entity_t)), and each message has a max fixed size
 * (see the __PO_HI_MESSAGES_MAX_SIZE macro).
 */

/* The following declarations avoid conflicts
 * with current generated code.
 */

#ifndef __PO_HI_NB_NODES
#define __PO_HI_NB_NODES 1
#endif

/*
 * We have two arrays of sockets. The first array (nodes) is used to
 * send data to other nodes. One special socket is nodes[mynode]: this
 * socket is used to listen for other processes.  The second array
 * (rnodes) is used to store all the sockets that are created by the
 * listening socket. This array is used only by the receiver_task.
 */

julien.delange's avatar
julien.delange committed
71
/*
 * Socket tables shared by the send routines and the receiving tasks.
 * nodes[] holds the sockets used to send (plus the local listening
 * socket), rnodes[] holds the sockets obtained from accept().
 *
 * NOTE(review): the receiving functions in this file are compiled for
 * every driver variant; __po_hi_sockets_poller iterates up to
 * __PO_HI_NB_DEVICES and __po_hi_sockets_receiver_task up to
 * __PO_HI_NB_NODES, whatever bound was used to size these arrays.
 * Confirm both constants are compatible in each configuration.
 */
#ifdef __PO_HI_NEED_DRIVER_SOCKETS
/* Legacy driver: the tables are indexed by node. */
__po_hi_inetnode_t nodes[__PO_HI_NB_NODES];
__po_hi_inetnode_t rnodes[__PO_HI_NB_NODES];
#else
/* Device-based drivers: the tables are indexed by device. */
__po_hi_inetnode_t nodes[__PO_HI_NB_DEVICES];
__po_hi_inetnode_t rnodes[__PO_HI_NB_DEVICES];
#endif

79
#ifdef __PO_HI_NEED_DRIVER_SOCKETS
julien.delange's avatar
julien.delange committed
80
/*
 * Legacy send routine: writes one fixed-size message on the socket of
 * the node that hosts the destination entity.
 *
 * from : sending entity (not used by this implementation).
 * to   : destination entity, mapped to a node with
 *        __po_hi_transport_get_node_from_entity().
 * msg  : message to send; __PO_HI_MESSAGES_MAX_SIZE bytes starting at
 *        msg->content are written, whatever the message length.
 *
 * Returns __PO_HI_SUCCESS when the whole buffer was written and
 * __PO_HI_ERROR_TRANSPORT_SEND otherwise.  Apart from the case where the
 * socket was already inactive (-1), every failure closes the socket and
 * marks it inactive.
 */
int __po_hi_driver_sockets_send (__po_hi_entity_t from,
                                 __po_hi_entity_t to,
                                 __po_hi_msg_t* msg)
{
   __po_hi_node_t  node;
   int             len;
   int             size_to_write;
   int             optval = 0;
   /*
    * The length argument of getsockopt() is value-result: it must hold
    * the size of optval on entry, otherwise the pending socket error is
    * truncated away and never reported.
    */
   socklen_t       optlen = sizeof (optval);

   (void) from;

   node = __po_hi_transport_get_node_from_entity (to);

   if (nodes[node].socket == -1)
   {
#ifdef __PO_HI_DEBUG
      __DEBUGMSG (" [... failure ...]\n");
#endif
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

   /* Messages always travel as fixed-size buffers. */
   size_to_write = __PO_HI_MESSAGES_MAX_SIZE;

   /* Ask the socket whether an error is pending before writing. */
   if (getsockopt (nodes[node].socket, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1)
   {
      __DEBUGMSG (" [error getsockopt() in file %s, line%d ]\n", __FILE__, __LINE__);
      goto send_failed;
   }

   if (optval != 0)
   {
      __DEBUGMSG (" [error getsockopt() return code in file %s, line%d ]\n", __FILE__, __LINE__);
      goto send_failed;
   }

   /* Ignore SIGPIPE to be able to recover from errors instead of crashing the node */
   if (signal (SIGPIPE, SIG_IGN) == SIG_ERR)
   {
      __DEBUGMSG (" [error signal() return code in file %s, line%d ]\n", __FILE__, __LINE__);
      goto send_failed;
   }

#ifdef __PO_HI_DEBUG
   __po_hi_messages_debug (msg);
#endif

   len = write (nodes[node].socket, &(msg->content), size_to_write);

   if (len != size_to_write)
   {
      __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__);
      goto send_failed;
   }

   __DEBUGMSG (" [SUCCESS]\n");

   return __PO_HI_SUCCESS;

send_failed:
   /* Common failure path: drop the connection and mark the slot inactive. */
   close (nodes[node].socket);
   nodes[node].socket = -1;
   return __PO_HI_ERROR_TRANSPORT_SEND;
}
151
152
#endif

153
154
155
#if (defined (__PO_HI_NEED_DRIVER_SOCKETSNEW) || \
     defined (__PO_HI_NEED_DRIVER_RTEMS_NE2000_SOCKETS))

156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
 * Device-based send routine: marshalls the most recent value queued on
 * a port of a task and writes it on the socket of the device associated
 * with the first destination of that port.
 *
 * task_id : task that owns the port.
 * port    : global port identifier; converted to a local port with
 *           __po_hi_get_local_port_from_global_port().
 *
 * Returns __PO_HI_SUCCESS when the whole fixed-size buffer was written
 * and __PO_HI_ERROR_TRANSPORT_SEND otherwise (no data on the port,
 * inactive socket, pending socket error, short write).  Failures that
 * occur once the socket is known to be active close it and mark it
 * inactive.
 */
int __po_hi_driver_sockets_send (__po_hi_task_id task_id,
                                 __po_hi_port_t port)
{
   int                     len;
   int                     size_to_write;
   int                     optval = 0;
   /*
    * The length argument of getsockopt() is value-result: it must hold
    * the size of optval on entry, otherwise the pending socket error is
    * truncated away and never reported.
    */
   socklen_t               optlen = sizeof (optval);
   __po_hi_device_id       associated_device;
   __po_hi_local_port_t    local_port;
   __po_hi_request_t*      request;
   __po_hi_port_t          destination_port;
   __po_hi_msg_t           msg;

   local_port = __po_hi_get_local_port_from_global_port (port);

   request = __po_hi_gqueue_get_most_recent_value (task_id, local_port);

   /* Only the first destination (index 0) of the port is served. */
   destination_port = __po_hi_gqueue_get_destination (task_id, local_port, 0);

   associated_device = __po_hi_get_device_from_port (destination_port);

   /*
    * A port value of -1 marks a request that carries no data.  The
    * function returns int, so report the condition as a send error
    * rather than returning without a value.
    */
   if ((request == NULL) || (request->port == -1))
   {
#ifdef __PO_HI_DEBUG
      __DEBUGMSG (" [DRIVER SOCKETS] No data to write on port %d\n", port);
#endif
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

   if (nodes[associated_device].socket == -1)
   {
#ifdef __PO_HI_DEBUG
      __DEBUGMSG (" [DRIVER SOCKETS] Invalid socket for port-id %d, device-id %d\n", destination_port, associated_device);
#endif
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

   /* Messages always travel as fixed-size buffers. */
   size_to_write = __PO_HI_MESSAGES_MAX_SIZE;

   /* Ask the socket whether an error is pending before writing. */
   if (getsockopt (nodes[associated_device].socket, SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1)
   {
      __DEBUGMSG (" [error getsockopt() in file %s, line%d ]\n", __FILE__, __LINE__);
      goto send_failed;
   }

   if (optval != 0)
   {
      __DEBUGMSG (" [error getsockopt() return code in file %s, line%d ]\n", __FILE__, __LINE__);
      goto send_failed;
   }

   /* Ignore SIGPIPE to be able to recover from errors instead of crashing the node */
   if (signal (SIGPIPE, SIG_IGN) == SIG_ERR)
   {
      __DEBUGMSG (" [error signal() return code in file %s, line%d ]\n", __FILE__, __LINE__);
      goto send_failed;
   }

   /* The request is retargeted to the destination port before marshalling. */
   request->port = destination_port;
   __po_hi_msg_reallocate (&msg);
   __po_hi_marshall_request (request, &msg);

#ifdef __PO_HI_DEBUG
   __po_hi_messages_debug (&msg);
#endif

   len = write (nodes[associated_device].socket, &(msg.content), size_to_write);

   if (len != size_to_write)
   {
      __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__);
      goto send_failed;
   }

   return __PO_HI_SUCCESS;

send_failed:
   /* Common failure path: drop the connection and mark the slot inactive. */
   close (nodes[associated_device].socket);
   nodes[associated_device].socket = -1;
   return __PO_HI_ERROR_TRANSPORT_SEND;
}


#endif
249

julien.delange's avatar
julien.delange committed
250
251
252
253

extern __po_hi_device_id socket_device_id;

void* __po_hi_sockets_poller (void)
254
{
255
   __DEBUGMSG ("Poller launched, device-id=%d\n", socket_device_id);
julien.delange's avatar
julien.delange committed
256
257
258
259
260
261
262
   socklen_t          socklen = sizeof (struct sockaddr);
   /* See ACCEPT (2) for details on initial value of socklen */

   __po_hi_uint32_t   len;
   int                sock;
   int                max_socket;
   fd_set             selector;
julien.delange's avatar
julien.delange committed
263
264
265
266
   struct sockaddr_in sa;
   __po_hi_node_t     dev;
   __po_hi_node_t     dev_init;
   __po_hi_request_t  received_request;
julien.delange's avatar
julien.delange committed
267
   __po_hi_msg_t      msg;
268
   int                established = 0; 
julien.delange's avatar
julien.delange committed
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285

   max_socket = 0; /* Used to compute the max socket number, useful for listen() call */

   /*
    * We initialize each node socket with -1 value.  This value means
    * that the socket is not active.
    */
   for (dev = 0 ; dev < __PO_HI_NB_DEVICES ; dev++)
   {
      rnodes[dev].socket = -1;
   }

   /*
    * Create a socket for each node that will communicate with us.
    */
   for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
   {
286
      if ( (dev != socket_device_id) && (strstr (__po_hi_get_device_naming (dev), "ip") != NULL))
julien.delange's avatar
julien.delange committed
287
      {
288
         __DEBUGMSG ("[DRIVER SOCKETS] Poller waits for connection with device %d\n", dev);
julien.delange's avatar
julien.delange committed
289

290
         /*
291
         __PO_HI_SET_SOCKET_TIMEOUT(nodes[socket_device_id].socket,5);
292
         */
293
294
295
296

         established = 0;

         while (established == 0)
julien.delange's avatar
julien.delange committed
297
         {
298
299
300
301
302
303
304
305
306
307
308
309
310
            sock = accept (nodes[socket_device_id].socket, (struct sockaddr*) &sa, &socklen);

            __PO_HI_SET_SOCKET_TIMEOUT(sock,10);

            if (read (sock, &dev_init, sizeof (__po_hi_device_id)) != sizeof (__po_hi_device_id))
            {
               established = 0;
               __DEBUGMSG ("[DRIVER SOCKETS] Cannot read device-id for device %d, socket=%d\n", dev, sock);
            }
            else
            {
               established = 1;
            }
julien.delange's avatar
julien.delange committed
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
         }
         __DEBUGMSG ("[DRIVER SOCKETS] read device-id %d from socket=%d\n", dev_init, sock);
         rnodes[dev].socket = sock;
         if (sock > max_socket )
         {
            max_socket = sock;
         }	  
      }
   }
   __DEBUGMSG ("[DRIVER SOCKETS] Poller initialization finished\n");
   __po_hi_wait_initialization ();

   /*
    * Then, listen and receive data on the socket, identify the node
    * which send the data and put it in its message queue
    */
   while (1)
   {
      FD_ZERO( &selector );
      for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
      {
         if ( (dev != socket_device_id ) && ( rnodes[dev].socket != -1 ) )
         {
            FD_SET( rnodes[dev].socket , &selector );
         }
      }

      if (select (max_socket + 1, &selector, NULL, NULL, NULL) == -1 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("[DRIVER SOCKETS] Error on select for node %d\n", mynode);
#endif 
      }
#ifdef __PO_HI_DEBUG
      __DEBUGMSG ("[DRIVER SOCKETS] Receive message\n");
346
#endif
julien.delange's avatar
julien.delange committed
347
348
349

      for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
      {
350
         __DEBUGMSG ("[DRIVER SOCKETS] Try to watch if it comes from device %d (socket=%d)\n", dev, rnodes[dev].socket);
julien.delange's avatar
julien.delange committed
351
352
353
354
355
         if ( (rnodes[dev].socket != -1 ) && FD_ISSET(rnodes[dev].socket, &selector))
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Receive message from dev %d\n", dev);
            memset (msg.content, '\0', __PO_HI_MESSAGES_MAX_SIZE);
            len = recv (rnodes[dev].socket, msg.content, __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL);
356
            msg.length = len;
julien.delange's avatar
julien.delange committed
357
358
            __DEBUGMSG ("[DRIVER SOCKETS] Message received len=%d\n",(int)len);

359
360
361
362
363
#ifdef __PO_HI_DEBUG
   __po_hi_messages_debug (&msg);
#endif


julien.delange's avatar
julien.delange committed
364
365
            if (len == 0)
            {
366
367

               __DEBUGMSG ("[DRIVER SOCKETS] Zero size from device %d\n",dev);
julien.delange's avatar
julien.delange committed
368
369
370
371
372
373
               rnodes[dev].socket = -1;
               continue;
            }

            __po_hi_unmarshall_request (&received_request, &msg);

374

julien.delange's avatar
julien.delange committed
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
            __po_hi_main_deliver (&received_request);
         }
      }
   }  
   return NULL;
}


/*
 * Old receiver code that is based on PolyORB-HI-C for AADLv1.
 * It should be considered deprecated.
 */
void* __po_hi_sockets_receiver_task (void)
{
   socklen_t          socklen = sizeof (struct sockaddr);
   /* See ACCEPT (2) for details on initial value of socklen */

   __po_hi_uint32_t   len;
   int                sock;
   int                max_socket;
   fd_set             selector;
   __po_hi_msg_t      msg;
julien.delange's avatar
julien.delange committed
397
398
399
400
401
   __po_hi_node_t     node;
   __po_hi_node_t     node_init;
   __po_hi_request_t  received_request;
   struct sockaddr_in sa;

402

julien.delange's avatar
julien.delange committed
403
404
405
406
407
408
409
410
   max_socket = 0; /* Used to compute the max socket number, useful for listen() call */

   /*
    * We initialize each node socket with -1 value.  This value means
    * that the socket is not active.
    */
   for (node = 0 ; node < __PO_HI_NB_NODES ; node++)
   {
411
      rnodes[node].socket = -1;
julien.delange's avatar
julien.delange committed
412
413
414
415
416
417
418
   }

   /*
    * Create a socket for each node that will communicate with us.
    */
   for (node = 0; node < __PO_HI_NB_NODES ; node++)
   {
419
      if (node != mynode )
julien.delange's avatar
julien.delange committed
420
421
      {
         sock = accept (nodes[mynode].socket, (struct sockaddr*) &sa, &socklen);
422
423
424
425
         if (sock == -1)
         {
            __DEBUGMSG ("accept() failed, return=%d\n", sock);
         }
julien.delange's avatar
julien.delange committed
426
427
428
429
430
431

         if (read (sock, &node_init, sizeof (__po_hi_node_t)) != sizeof (__po_hi_node_t))
         {
            __DEBUGMSG ("Cannot read node-id for socket %d\n", sock);
            continue;
         }
432

julien.delange's avatar
julien.delange committed
433
434
435
436
437
438
439
         rnodes[node].socket = sock;
         if (sock > max_socket )
         {
            max_socket = sock;
         }	  
      }
   }
440
#ifdef __PO_HI_DEBUG
julien.delange's avatar
julien.delange committed
441
   __DEBUGMSG ("Receiver initialization finished\n");
442
#endif
julien.delange's avatar
julien.delange committed
443
444
445
446
447
448
449
   __po_hi_wait_initialization ();

   /*
    * Then, listen and receive data on the socket, identify the node
    * which send the data and put it in its message queue
    */
   while (1)
450
   {
julien.delange's avatar
julien.delange committed
451
452
453
      FD_ZERO( &selector );
      for (node = 0; node < __PO_HI_NB_NODES ; node++)
      {
454
         if ( (node != mynode ) && ( rnodes[node].socket != -1 ) )
julien.delange's avatar
julien.delange committed
455
456
457
458
459
460
461
         {
            FD_SET( rnodes[node].socket , &selector );
         }
      }

      if (select (max_socket + 1, &selector, NULL, NULL, NULL) == -1 )
      {
462
463
464
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Error on select for node %d\n", mynode);
#endif 
julien.delange's avatar
julien.delange committed
465
      }
466
#ifdef __PO_HI_DEBUG
julien.delange's avatar
julien.delange committed
467
      __DEBUGMSG ("Receive message\n");
468
#endif
julien.delange's avatar
julien.delange committed
469
470
471

      for (node = 0; node < __PO_HI_NB_NODES ; node++)
      {
472
         if ( (rnodes[node].socket != -1 ) && FD_ISSET(rnodes[node].socket, &selector))
julien.delange's avatar
julien.delange committed
473
         {
474
#ifdef __PO_HI_DEBUG
julien.delange's avatar
julien.delange committed
475
            __DEBUGMSG ("Receive message from node %d\n", node);
476
#endif
julien.delange's avatar
julien.delange committed
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492

            __DEBUGMSG ("Using raw protocol stack\n");
            len = recv (rnodes[node].socket, &(msg.content), __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL);
            msg.length = len;
            if (len != __PO_HI_MESSAGES_MAX_SIZE )
            {
               __DEBUGMSG ("ERROR, %u %d", (unsigned int) len, __PO_HI_MESSAGES_MAX_SIZE);
               close (rnodes[node].socket);
               rnodes[node].socket = -1;
               continue;
            }
            __DEBUGMSG ("Message delivered");

            __po_hi_unmarshall_request (&received_request, &msg);

            __po_hi_main_deliver (&received_request);
julien.delange's avatar
julien.delange committed
493

julien.delange's avatar
julien.delange committed
494
495
496
            __po_hi_msg_reallocate(&msg);        /* re-initialize the message */
         }
      }
497
   }  
julien.delange's avatar
julien.delange committed
498
   return NULL;
499
500
}

501
502
503
504
505
506
507
508
#ifdef __PO_HI_NEED_DRIVER_SOCKETSNEW
/*
 * Driver entry point used when __PO_HI_NEED_DRIVER_SOCKETSNEW is
 * defined.  It forwards the device identifier, together with
 * __po_hi_sockets_poller as the receiving function, to the shared
 * initialization routine declared in po_hi_driver_sockets_common.h.
 *
 * id : identifier of the device to initialize.
 *
 * NOTE(review): the generic routine presumably creates the sockets and
 * starts the poller as a task - confirm in the common driver file.
 */
void __po_hi_driver_sockets_init (__po_hi_device_id id)
{
   __po_hi_driver_sockets_common_generic_init (id, __po_hi_sockets_poller);
}
#endif


509

julien.delange's avatar
julien.delange committed
510
511
512
513
514
515
516
/*
 * The following code implements the old socket layer
 * for PolyORB-HI-C and AADLv1.
 * It should be considered deprecated.
 */
#ifdef __PO_HI_NEED_DRIVER_SOCKETS
/*
 * Initialization of the legacy (node-based) sockets driver.
 *
 * 1. Marks every entry of nodes[] as inactive (-1).
 * 2. If the local node has a port (node_port[mynode] != __PO_HI_NOPORT),
 *    creates the listening socket, binds it to that port on all local
 *    addresses and starts __po_hi_sockets_receiver_task.
 * 3. For every other node that has a port, connects to it, retrying
 *    every 500 ms until the connection succeeds, then sends the local
 *    node identifier as a handshake.
 *
 * id : device identifier; not used by this implementation.
 *
 * The function returns early, leaving the remaining sockets inactive,
 * when a socket cannot be created or when the listening socket cannot
 * be bound or put in listening mode.  A node whose host name cannot be
 * resolved is skipped and its socket stays inactive.
 */
void __po_hi_driver_sockets_init (__po_hi_device_id id)
{
   int                i;
   int                ret;
   int                reuse;
   int                node;
   char               *tmp;
   __po_hi_time_t     mytime;
   struct sockaddr_in sa;
   struct hostent*    hostinfo;

   (void) id;

   for (node = 0 ; node < __PO_HI_NB_NODES ; node++)
   {
      nodes[node].socket = -1;
   }

   /*
    * If the current node port has a port number, then it has to
    * listen to other nodes. So, we create a socket, bind it and
    * listen to other nodes.
    */
   if ( node_port[mynode] != __PO_HI_NOPORT )
   {
      nodes[mynode].socket = socket (AF_INET, SOCK_STREAM, 0);

      if (nodes[mynode].socket == -1 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Cannot create socket for node %d\n", mynode);
#endif
         return;
      }

      /* Allow a quick restart of the node on the same port. */
      reuse = 1;
      if (setsockopt (nodes[mynode].socket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse)) < 0)
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Cannot set SO_REUSEADDR on socket %d\n", nodes[mynode].socket);
#endif
      }

      memset (&sa, 0, sizeof (sa));
      sa.sin_addr.s_addr = htonl (INADDR_ANY);   /* We listen on all adresses */
      sa.sin_family = AF_INET;
      sa.sin_port = htons (node_port[mynode]);   /* Port provided by the generated code */

      /*
       * Without a bound, listening socket the receiver task could
       * never accept a connection: release the socket and give up
       * instead of starting it.
       */
      if( bind( nodes[mynode].socket , ( struct sockaddr * ) &sa , sizeof( struct sockaddr_in ) ) < 0 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Unable to bind socket and port on socket %d\n", nodes[mynode].socket);
#endif
         close (nodes[mynode].socket);
         nodes[mynode].socket = -1;
         return;
      }

      if( listen( nodes[mynode].socket , __PO_HI_NB_ENTITIES ) < 0 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Cannot listen on socket %d\n", nodes[mynode].socket);
#endif
         close (nodes[mynode].socket);
         nodes[mynode].socket = -1;
         return;
      }

      /*
       * Create the thread which receive all data from other
       * nodes. This thread will execute the function
       * __po_hi_sockets_receiver_task
       */

      __po_hi_initialize_add_task ();

      __po_hi_create_generic_task
         (-1, 0,__PO_HI_MAX_PRIORITY, 0, __po_hi_sockets_receiver_task);
   }

   /*
    * For each node in the sytem that may communicate with the current
    * node we create a socket. This socket will be used to send data.
    */
   for (node = 0 ; node < __PO_HI_NB_NODES ; node++ )
   {
      if ( (node == mynode) || (node_port[node] == __PO_HI_NOPORT) || (nodes[node].socket != -1) )
      {
         continue;
      }

      while (1)
      {
         nodes[node].socket = socket (AF_INET, SOCK_STREAM, 0);

         if (nodes[node].socket == -1 )
         {
#ifdef __PO_HI_DEBUG
            __DEBUGMSG ("Socket for node %d is not created", node);
#endif
            return;
         }

         hostinfo = gethostbyname ((char*)node_addr[node]);

         if (hostinfo == NULL )
         {
#ifdef __PO_HI_DEBUG
            __DEBUGMSG ("Error while getting host informations for node %d\n", node);
#endif
            /*
             * No address to connect to: release the socket and skip
             * this node rather than dereferencing a NULL pointer.
             */
            close (nodes[node].socket);
            nodes[node].socket = -1;
            break;
         }

         memset (&sa, 0, sizeof (sa));
         sa.sin_port = htons( node_port[node] );
         sa.sin_family = AF_INET;

         /* The following lines are used to copy the
          * hostinfo->h_length to the sa.sin_addr member. Most
          * of program use the memcpy to do that, but the
          * RT-POSIX profile we use forbid the use of this
          * function.  We use a loop instead to perform the
          * copy, bounded by the size of sa.sin_addr.
          */
         tmp = (char*) &(sa.sin_addr);
         for (i = 0 ; (i < hostinfo->h_length) && (i < (int) sizeof (sa.sin_addr)) ; i++)
         {
            tmp[i] = hostinfo->h_addr[i];
         }

         /*
          * We try to connect on the remote host, and retry every
          * 500 ms until it accepts the connection.
          */
         ret = connect
            (nodes[node].socket, ( struct sockaddr* ) &sa , sizeof( struct sockaddr_in ));

         if (ret == 0)
         {
            /* Handshake: tell the peer which node is connecting. */
            if (write (nodes[node].socket, &mynode, sizeof (__po_hi_node_t)) != sizeof (__po_hi_node_t))
            {
#ifdef __PO_HI_DEBUG
               __DEBUGMSG ("Node %d cannot send his node-id\n", node);
#endif
            }
            break;
         }

         if (close (nodes[node].socket))
         {
#ifdef __PO_HI_DEBUG
            __DEBUGMSG ("Cannot close socket %d\n", nodes[node].socket);
#endif
         }
         nodes[node].socket = -1;

         /*
          * We wait 500ms each time we try to connect on the
          * remote host
          */
         __po_hi_get_time (&mytime);
         __po_hi_delay_until (__po_hi_add_times (mytime, __po_hi_milliseconds (500)));
      }
   }
}
julien.delange's avatar
julien.delange committed
672
673
#endif

674
675
676



677
678
#endif