po_hi_driver_sockets.c 22.2 KB
Newer Older
1
2
3
4
5
/*
 * This is a part of PolyORB-HI-C distribution, a minimal
 * middleware written for generated code from AADL models.
 * You should use it with the Ocarina toolsuite.
 *
 * For more information, please visit http://taste.tuxfamily.org/wiki
 *
 * Copyright (C) 2010-2014 ESA & ISAE.
 */

11
#include <deployment.h>
julien.delange's avatar
julien.delange committed
12
#include <marshallers.h>
13

jdelange's avatar
jdelange committed
14

julien.delange's avatar
julien.delange committed
15
16
#if (defined (__PO_HI_NEED_DRIVER_SOCKETS) || \
     defined (__PO_HI_NEED_DRIVER_RTEMS_NE2000_SOCKETS))
17

jdelange's avatar
jdelange committed
18
19
20
21
#ifdef _WIN32
#include <winsock2.h>
#endif

22
#include <po_hi_config.h>
jdelange's avatar
jdelange committed
23
#include <po_hi_utils.h>
24
25
26
27
28
#include <po_hi_task.h>
#include <po_hi_transport.h>
#include <po_hi_debug.h>
#include <po_hi_types.h>
#include <po_hi_messages.h>
jdelange's avatar
jdelange committed
29
#include <po_hi_monitor.h>
30
31
32
#include <po_hi_returns.h>
#include <po_hi_main.h>
#include <po_hi_task.h>
33
#include <po_hi_gqueue.h>
34
#include <drivers/po_hi_driver_sockets.h>
35
36
37
38
39
40

#include <activity.h>

#include <signal.h>
#include <string.h>
#include <unistd.h>
jdelange's avatar
jdelange committed
41
#ifndef _WIN32
42
43
44
45
46
#include <netdb.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <netinet/in.h>
yoogx's avatar
yoogx committed
47
#include <netinet/tcp.h>
48
49
#include <arpa/inet.h>
#include <sys/time.h>
jdelange's avatar
jdelange committed
50
#endif
51
52

/*
 * This file (po_hi_driver_sockets.c) provides functions to handle
 * communication between nodes in PolyORB-HI-C.  We don't use a
 * protocol to send data. For each piece of data sent, we first send
 * the entity number provided by the generated file deployment.h, and
 * then we send the message.  Each entity has a fixed size
 * (sizeof(__po_hi_entity_t)), and each message has a max fixed size
 * (see the __PO_HI_MESSAGES_MAX_SIZE macro).
 */

/* The following declarations avoid conflicts
 * with current generated code.
 */

#ifndef __PO_HI_NB_NODES
#define __PO_HI_NB_NODES 1
#endif

/*
 * We have two arrays of sockets. The first array (nodes) is used to
72
 * send data to other nodes. A special socket if nodes[__po_hi_mynode] : this
73
74
75
76
77
 * socket is used to listen others processes.  The second array
 * (rnodes), is used to store all socket that are created by the
 * listen socket. This array is used only by the receiver_task
 */

jdelange's avatar
jdelange committed
78
79
80
81
82
83
int                  __po_hi_c_sockets_listen_socket;
int                  __po_hi_c_sockets_read_sockets[__PO_HI_NB_DEVICES];
int                  __po_hi_c_sockets_write_sockets[__PO_HI_NB_DEVICES];
__po_hi_request_t    __po_hi_c_sockets_poller_received_request;
__po_hi_msg_t        __po_hi_c_sockets_poller_msg;
__po_hi_msg_t        __po_hi_c_sockets_send_msg;
jdelange's avatar
jdelange committed
84
__po_hi_device_id    __po_hi_c_sockets_device_id;
85

jdelange's avatar
jdelange committed
86
87
int      __po_hi_c_sockets_array_init_done = 0;

88
89
90
int __po_hi_driver_sockets_send (__po_hi_task_id task_id,
                                 __po_hi_port_t port)
{
91
92
   int                        len;
   int                        size_to_write;
jdelange's avatar
jdelange committed
93
#ifndef _WIN32
94
95
   int                        optval = 0;
   socklen_t                  optlen = 0;
jdelange's avatar
jdelange committed
96
97
98
99
#else
   char FAR                   optval = 0;
   int FAR                    optlen = 0;
#endif
jdelange's avatar
jdelange committed
100
   __po_hi_device_id          remote_device;
jdelange's avatar
jdelange committed
101
   __po_hi_device_id          local_device;
102
103
104
105
106
   __po_hi_local_port_t       local_port;
   __po_hi_request_t*         request;
   __po_hi_port_t             destination_port;
   __po_hi_protocol_t         protocol_id;
   __po_hi_protocol_conf_t*   protocol_conf;
jdelange's avatar
jdelange committed
107
   __po_hi_monitor_status_t   device_status;
108
109
110
111

   local_port              = __po_hi_get_local_port_from_global_port (port);
   request                 = __po_hi_gqueue_get_most_recent_value (task_id, local_port);
   destination_port        = __po_hi_gqueue_get_destination (task_id, local_port, 0);
jdelange's avatar
jdelange committed
112
113
   local_device            = __po_hi_get_device_from_port (port);
   remote_device           = __po_hi_get_device_from_port (destination_port);
114
115
   protocol_id             = __po_hi_transport_get_protocol (port, destination_port);
   protocol_conf           = __po_hi_transport_get_protocol_configuration (protocol_id);
116

jdelange's avatar
jdelange committed
117

jdelange's avatar
jdelange committed
118
   __DEBUGMSG ("[DRIVER SOCKETS] Try to write from task=%d, port=%d, local_device=%d, remote device=%d, socket=%d\n", task_id, port, local_device, remote_device, __po_hi_c_sockets_write_sockets[remote_device]);
119
120
121
   if (request->port == -1)
   {
      __DEBUGMSG (" [DRIVER SOCKETS] No data to write on port %d\n", port);
122
      return __PO_HI_ERROR_TRANSPORT_SEND;
123
124
   }

jdelange's avatar
jdelange committed
125
126
127
128
129
130
131
#if __PO_HI_MONITOR_ENABLED
   if (__po_hi_monitor_get_status_device (local_device, &device_status) != __PO_HI_SUCCESS)
   {
      __DEBUGMSG ("[DRIVER SOCKETS] Cannot get the status of device %d\n", local_device);
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }

jdelange's avatar
jdelange committed
132
   if (device_status.status != po_hi_monitor_status_ok)
jdelange's avatar
jdelange committed
133
134
135
136
137
138
   {
      __DEBUGMSG ("[DRIVER SOCKETS] Device has a problem and is not able to process the request, aborting (device-id=%d, status= %d)\n", local_device, device_status);
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }
#endif

jdelange's avatar
jdelange committed
139
   if (__po_hi_c_sockets_write_sockets[remote_device] == -1 )
140
141
   {
#ifdef __PO_HI_DEBUG
142
      __DEBUGMSG (" [DRIVER SOCKETS] Invalid socket for port-id %d, device-id %d\n", destination_port, remote_device);
143
#endif
yoogx's avatar
yoogx committed
144
      return __PO_HI_ERROR_TRANSPORT_SEND;
145
146
147
148
149
150
151
152
153
   }

   /*
    * After sending the entity identifier, we send the message which
    * contains the request.
    */

   size_to_write = __PO_HI_MESSAGES_MAX_SIZE;

jdelange's avatar
jdelange committed
154
   if (getsockopt (__po_hi_c_sockets_write_sockets[remote_device], SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1)
155
156
   {
      __DEBUGMSG (" [error getsockopt() in file %s, line%d ]\n", __FILE__, __LINE__);
jdelange's avatar
jdelange committed
157
158
      close (__po_hi_c_sockets_write_sockets[remote_device]);
      __po_hi_c_sockets_write_sockets[remote_device] = -1;
yoogx's avatar
yoogx committed
159
      return __PO_HI_ERROR_TRANSPORT_SEND;
160
161
162
163
164
   }

   if (optval != 0)
   {
      __DEBUGMSG (" [error getsockopt() return code in file %s, line%d ]\n", __FILE__, __LINE__);
jdelange's avatar
jdelange committed
165
166
      close (__po_hi_c_sockets_write_sockets[remote_device]);
      __po_hi_c_sockets_write_sockets[remote_device] = -1;
yoogx's avatar
yoogx committed
167
      return __PO_HI_ERROR_TRANSPORT_SEND;
168
169
   }

jdelange's avatar
jdelange committed
170
#ifndef _WIN32
yoogx's avatar
yoogx committed
171
172
173
   /*
    * Ignore SIGPIPE to be able to recover from
    * errors instead of crashing the node
jdelange's avatar
jdelange committed
174
    */
175
176
177
178

   if (signal (SIGPIPE, SIG_IGN) == SIG_ERR)
   {
      __DEBUGMSG (" [error signal() return code in file %s, line%d ]\n", __FILE__, __LINE__);
jdelange's avatar
jdelange committed
179
180
      close (__po_hi_c_sockets_write_sockets[remote_device]);
      __po_hi_c_sockets_write_sockets[remote_device] = -1;
181
182
      return __PO_HI_ERROR_TRANSPORT_SEND;
   }
jdelange's avatar
jdelange committed
183
#endif
184
185
186
187
188
189

   switch (protocol_id)
   {
#ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
      case virtual_bus_myprotocol_i:
      {
jdelange's avatar
jdelange committed
190
191
192
         size_to_write = sizeof (int);
         int datawritten;
         protocol_conf->marshaller(request, &datawritten, &size_to_write);
jdelange's avatar
jdelange committed
193
194
195
#ifdef _WIN32
         len = send (__po_hi_c_sockets_write_sockets[remote_device], &datawritten, size_to_write, 0);
#else
jdelange's avatar
jdelange committed
196
         len = write (__po_hi_c_sockets_write_sockets[remote_device], &datawritten, size_to_write);
jdelange's avatar
jdelange committed
197
#endif
jdelange's avatar
jdelange committed
198
199
200
201

         if (len != size_to_write)
         {
            __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__);
jdelange's avatar
jdelange committed
202
            close (__po_hi_c_sockets_write_sockets[remote_device]);
jdelange's avatar
jdelange committed
203
            __po_hi_c_sockets_write_sockets[remote_device] = -1;
yoogx's avatar
yoogx committed
204
            return __PO_HI_ERROR_TRANSPORT_SEND;
jdelange's avatar
jdelange committed
205
         }
206
207
208
         break;
      }
#endif
yoogx's avatar
yoogx committed
209
      default:
210
211
      {
         request->port = destination_port;
jdelange's avatar
jdelange committed
212
213
         __po_hi_msg_reallocate (&__po_hi_c_sockets_send_msg);
         __po_hi_marshall_request (request, &__po_hi_c_sockets_send_msg);
214
215

#ifdef __PO_HI_DEBUG
jdelange's avatar
jdelange committed
216
         __po_hi_messages_debug (&__po_hi_c_sockets_send_msg[remote_device]);
217
#endif
jdelange's avatar
jdelange committed
218
         if (__po_hi_c_sockets_write_sockets[remote_device] != -1)
219
         {
jdelange's avatar
jdelange committed
220
221
222
223

#ifdef _WIN32
            len = send (__po_hi_c_sockets_write_sockets[remote_device], (char*) &(__po_hi_c_sockets_send_msg.content), size_to_write, 0);
#else
jdelange's avatar
jdelange committed
224
            len = write (__po_hi_c_sockets_write_sockets[remote_device], &(__po_hi_c_sockets_send_msg.content), size_to_write);
jdelange's avatar
jdelange committed
225
#endif
jdelange's avatar
jdelange committed
226
227
228

            if (len != size_to_write)
            {
229
230
231
232
233

#if __PO_HI_MONITOR_ENABLED
               __po_hi_monitor_report_failure_device (remote_device, po_hi_monitor_failure_value);
#endif

jdelange's avatar
jdelange committed
234
235
236
               __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__);
               close (__po_hi_c_sockets_write_sockets[remote_device]);
               __po_hi_c_sockets_write_sockets[remote_device] = -1;
yoogx's avatar
yoogx committed
237
               return __PO_HI_ERROR_TRANSPORT_SEND;
jdelange's avatar
jdelange committed
238
            }
239
         }
240
241

         request->port = __PO_HI_GQUEUE_INVALID_PORT;
242
243
         break;
      }
244
245
246
247
248
249
   }

   return __PO_HI_SUCCESS;
}


250

jdelange's avatar
jdelange committed
251
/*
 * Body of the receiving task of the sockets driver.
 *
 * First accepts one connection per remote device that shares a bus
 * with the local device (as reported by __po_hi_transport_share_bus)
 * and records the accepted socket in __po_hi_c_sockets_read_sockets.
 * Then waits for the other tasks to be initialized and loops forever:
 * select() on the read sockets, receive a message, unmarshall it and
 * hand it to __po_hi_main_deliver.
 *
 * dev_id_addr: unused; the local device id is read from
 *              __po_hi_c_sockets_device_id.
 *
 * Never returns in practice (the trailing return only satisfies the
 * task entry-point signature).
 */
void* __po_hi_sockets_poller (__po_hi_device_id* dev_id_addr)
{
#ifdef _WIN32
   int                        socklen;
#else
   socklen_t                  socklen;
#endif
   int                        len;
   int                        sock = -1;
   int                        max_socket;
   fd_set                     selector;
   struct sockaddr_in         sa;
   __po_hi_device_id          dev;
   __po_hi_node_t             dev_init;
   int                        established = 0;
   int                        ret;
   __po_hi_device_id          dev_id;
   __po_hi_uint32_t           n_connected;
#ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
   __po_hi_protocol_conf_t*   protocol_conf;
#endif

   (void) dev_id_addr;

   max_socket = 0; /* Highest socket number seen, needed by select() */

   dev_id = __po_hi_c_sockets_device_id;

   __DEBUGMSG ("Poller launched, device-id=%d\n", dev_id);

   n_connected = 0;
   for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
   {
      if (__po_hi_transport_share_bus (dev, dev_id) == 1)
      {
         n_connected++;
      }
   }

   __DEBUGMSG ("Number of devices that share the bus=%d\n", n_connected);

   /*
    * Accept one connection for each node that will communicate with us.
    * n_connected is unsigned: guard against 0 so that n_connected - 1
    * cannot wrap around.
    */
   for (dev = 0; (n_connected > 0) && (dev < n_connected - 1); dev++)
   {
         established = 0;

         while (established == 0)
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Poller waits for connection with device %d (reading device=%d, socket=%d)\n", dev, dev_id, __po_hi_c_sockets_read_sockets[dev]);

            /* See ACCEPT (2): socklen is a value-result argument and
             * must be reset before each call. */
            socklen = sizeof (sa);
            sock = accept (__po_hi_c_sockets_listen_socket, (struct sockaddr*) &sa, &socklen);

            if (sock == -1)
            {
               continue;
            }

            __DEBUGMSG ("[DRIVER SOCKETS] accept() passed, waiting for device id %d\n", dev);

#ifndef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
            /* The peer starts by sending its own device identifier. */
            dev_init = 0;
#ifdef _WIN32
            ret = recv (sock, (char*) &dev_init, sizeof (__po_hi_device_id), 0);
#else
            ret = read (sock, &dev_init, sizeof (__po_hi_device_id));
#endif

            if (ret != sizeof (__po_hi_device_id))
            {
               established = 0;
               __DEBUGMSG ("[DRIVER SOCKETS] Cannot read device-id for device %d, socket=%d, ret=%d, read size=%d, expected size=%d\n", dev, sock, ret, ret, (int) sizeof (__po_hi_device_id));
               /* Do not leak the accepted socket before retrying. */
               close (sock);
            }
            else
            {
               dev_init = __po_hi_swap_byte (dev_init);

               /* The identifier comes from the network and is used as
                * an array index: reject anything out of range. */
               if ((unsigned long) dev_init >= (unsigned long) __PO_HI_NB_DEVICES)
               {
                  established = 0;
                  __DEBUGMSG ("[DRIVER SOCKETS] Invalid device-id %d received on socket=%d\n", (int) dev_init, sock);
                  close (sock);
               }
               else
               {
                  __DEBUGMSG ("[DRIVER SOCKETS] read device-id %d from socket=%d\n", dev_init, sock);
                  established = 1;
               }
            }
#else
            established = 1;
            dev_init = dev;
#endif
         }
         __po_hi_c_sockets_read_sockets[dev_init] = sock;
         if (sock > max_socket )
         {
            max_socket = sock;
         }
   }

   __DEBUGMSG ("[DRIVER SOCKETS] Poller initialization finished, waiting for other tasks\n");
   __po_hi_wait_initialization ();
   __DEBUGMSG ("[DRIVER SOCKETS] Other tasks are initialized, let's start the polling !\n");

   /*
    * Then, listen and receive data on the socket, identify the node
    * which send the data and put it in its message queue
    */
   while (1)
   {
      FD_ZERO( &selector );
      for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
      {
         if ( (dev != dev_id ) && ( __po_hi_c_sockets_read_sockets[dev] != -1 ) )
         {

            __DEBUGMSG ("[DRIVER SOCKETS] Add socket %d to the selector\n", __po_hi_c_sockets_read_sockets[dev]);
            FD_SET( __po_hi_c_sockets_read_sockets[dev], &selector );
         }
      }

      if (select (max_socket + 1, &selector, NULL, NULL, NULL) == -1 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("[DRIVER SOCKETS] Error on select for node %d\n", __po_hi_mynode);
#endif
         /* The content of the fd_set is unspecified after an error:
          * rebuild it and try again. */
         continue;
      }
#ifdef __PO_HI_DEBUG
      __DEBUGMSG ("[DRIVER SOCKETS] Receive message\n");
#endif

      for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++)
      {
         __DEBUGMSG ("[DRIVER SOCKETS] Try to watch if it comes from device %d (socket=%d)\n", dev, __po_hi_c_sockets_read_sockets[dev]);
         if ( (__po_hi_c_sockets_read_sockets[dev] != -1 ) && FD_ISSET(__po_hi_c_sockets_read_sockets[dev], &selector))
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Receive message from dev %d\n", dev);
#ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
            {
               int datareceived;

               protocol_conf = __po_hi_transport_get_protocol_configuration (virtual_bus_myprotocol_i);

               len = recv (__po_hi_c_sockets_read_sockets[dev], &datareceived, sizeof (int), MSG_WAITALL);
               __DEBUGMSG ("[DRIVER SOCKETS] Message received len=%d\n",(int)len);
               /* 0 means the peer closed the connection, a negative
                * value is an error: drop the socket in both cases. */
               if (len <= 0)
               {
                  __DEBUGMSG ("[DRIVER SOCKETS] Zero size from device %d\n",dev);
                  close (__po_hi_c_sockets_read_sockets[dev]);
                  __po_hi_c_sockets_read_sockets[dev] = -1;
                  continue;
               }
               protocol_conf->unmarshaller (&__po_hi_c_sockets_poller_received_request, &datareceived, len);
               __po_hi_c_sockets_poller_received_request.port = 1;
            }

#else
            memset (__po_hi_c_sockets_poller_msg.content, '\0', __PO_HI_MESSAGES_MAX_SIZE);

#ifdef _WIN32
            len = recv (__po_hi_c_sockets_read_sockets[dev], __po_hi_c_sockets_poller_msg.content, __PO_HI_MESSAGES_MAX_SIZE, 0);
#else
            len = recv (__po_hi_c_sockets_read_sockets[dev], __po_hi_c_sockets_poller_msg.content, __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL);
#endif

            __DEBUGMSG ("[DRIVER SOCKETS] Message received len=%d\n",(int)len);

            if (len <= 0)
            {
               __DEBUGMSG ("[DRIVER SOCKETS] Invalid size (%d) from device %d\n",len, dev);
               /* Release the descriptor before forgetting it. */
               close (__po_hi_c_sockets_read_sockets[dev]);
               __po_hi_c_sockets_read_sockets[dev] = -1;
               continue;
            }

            __po_hi_c_sockets_poller_msg.length = len;

#ifdef __PO_HI_DEBUG
            __po_hi_messages_debug (&__po_hi_c_sockets_poller_msg);
#endif

            __po_hi_unmarshall_request (&__po_hi_c_sockets_poller_received_request, &__po_hi_c_sockets_poller_msg);
#endif

            __DEBUGMSG ("[DRIVER SOCKETS] Delivering message to %d\n",__po_hi_c_sockets_poller_received_request.port );
            __po_hi_main_deliver (&__po_hi_c_sockets_poller_received_request);
         }
      }
   }
   return NULL;
}


435
/*
 * Initialize the sockets driver for the local device dev_id.
 *
 * - Records dev_id and, on first call, marks every read/write socket
 *   slot as unused (-1).
 * - Registers __po_hi_driver_sockets_send as the sending function of
 *   the device.
 * - If the device configuration has a non-zero port, creates, binds
 *   and listens on the listening socket, then spawns the poller task.
 * - For every other device sharing a bus with dev_id and having a
 *   non-zero port, connects to it (retrying every 500 ms until it
 *   succeeds) and, unless the user protocol is in use, sends the local
 *   device id as the first bytes of the connection.
 *
 * Returns early, without connecting to the other devices, if a socket
 * cannot be created or if the listening socket cannot be bound or put
 * in listening mode.
 */
void __po_hi_driver_sockets_init (__po_hi_device_id dev_id)
{
   int                     ret;
#ifdef _WIN32
   char FAR                reuse;
#else
   int                     reuse;
#endif
   struct sockaddr_in      sa;
   unsigned short          ip_port;

   __po_hi_c_ip_conf_t*    ipconf;
   __po_hi_device_id       dev;

   __po_hi_device_id          sent_id;
   struct hostent*            hostinfo;

   __po_hi_time_t             mytime;
   __po_hi_time_t             tmptime;

   char                       *tmp;
   __po_hi_time_t             current_time;
   int                        i;
   int                        NoDelayFlag;
   int                        connected;

   __po_hi_c_sockets_listen_socket = -1;

   __po_hi_c_sockets_device_id     = dev_id;

   if (__po_hi_c_sockets_array_init_done == 0)
   {
      for (dev = 0 ; dev < __PO_HI_NB_DEVICES ; dev++)
      {
         __po_hi_c_sockets_read_sockets[dev]   = -1;
         __po_hi_c_sockets_write_sockets[dev]  = -1;
      }

      __po_hi_c_sockets_array_init_done = 1;
   }

   __po_hi_transport_set_sending_func (dev_id, __po_hi_driver_sockets_send);

   ipconf = (__po_hi_c_ip_conf_t*)__po_hi_get_device_configuration (dev_id);
   ip_port = ipconf->port;

   __DEBUGMSG ("My configuration, addr=%s, port=%d\n", ipconf->address, ip_port );

   /*
    * If the current node port has a port number, then it has to
    * listen to other nodes. So, we create a socket, bind it and
    * listen to other nodes.
    */
   if (ip_port != 0)
   {
      __po_hi_c_sockets_listen_socket = socket (AF_INET, SOCK_STREAM, 0);

      if (__po_hi_c_sockets_listen_socket == -1 )
      {
#ifdef __PO_HI_DEBUG
         __DEBUGMSG ("Cannot create socket for device %d\n", dev_id);
#endif
         return;
      }

      __DEBUGMSG ("Socket created for addr=%s, port=%d, socket value=%d\n", ipconf->address, ip_port, __po_hi_c_sockets_listen_socket);

      reuse = 1;

      if (setsockopt (__po_hi_c_sockets_listen_socket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse)))
      {
         __DEBUGMSG ("[DRIVER SOCKETS] Error while making the receiving socket reusable\n");
      }

      sa.sin_addr.s_addr = htonl (INADDR_ANY);   /* We listen on all addresses */
      sa.sin_family = AF_INET;
      sa.sin_port = htons (ip_port);   /* Port provided by the generated code */

      /*
       * Without a bound, listening socket the poller could only spin
       * on a failing accept(): give up instead of starting it.
       */
      if( bind (__po_hi_c_sockets_listen_socket, (struct sockaddr *) &sa , sizeof (struct sockaddr_in) ) < 0 )
      {
         __DEBUGMSG ("Unable to bind socket and port on socket %d\n", __po_hi_c_sockets_listen_socket);
         close (__po_hi_c_sockets_listen_socket);
         __po_hi_c_sockets_listen_socket = -1;
         return;
      }

      if( listen (__po_hi_c_sockets_listen_socket, __PO_HI_NB_DEVICES) < 0 )
      {
         __DEBUGMSG ("Cannot listen on socket %d\n", __po_hi_c_sockets_listen_socket);
         close (__po_hi_c_sockets_listen_socket);
         __po_hi_c_sockets_listen_socket = -1;
         return;
      }

      /*
       * Create the thread which receive all data from other
       * nodes. This thread will execute the function
       * __po_hi_sockets_poller. Its argument points to the global
       * copy of the device id, which outlives this function (the
       * dev_id parameter does not).
       */

      __po_hi_initialize_add_task ();
      __po_hi_create_generic_task
         (-1, 0,__PO_HI_MAX_PRIORITY, 0, (void* (*)(void))__po_hi_sockets_poller, &__po_hi_c_sockets_device_id);
   }


   /*
    * For each node in the system that may communicate with the current
    * node we create a socket. This socket will be used to send data.
    */
   for (dev = 0 ; dev < __PO_HI_NB_DEVICES ; dev++)
   {
      if (dev == dev_id)
      {
         continue;
      }

      if (__po_hi_transport_share_bus (dev, dev_id) == 0)
      {
         __DEBUGMSG ("[DRIVER SOCKETS] Device %d and device %d does not share the same bus, skip connecting them\n", dev, dev_id);
         continue;
      }

      __DEBUGMSG ("[DRIVER SOCKETS] Will initialize connection with device %d\n", dev);

      ip_port = 0;

      ipconf = (__po_hi_c_ip_conf_t*) __po_hi_get_device_configuration (dev);
      ip_port = (unsigned short)ipconf->port;

      __DEBUGMSG ("[DRIVER SOCKETS] Configuration for device %d, port=%d\n", dev, ip_port);

      if (ip_port == 0)
      {
         __DEBUGMSG ("[DRIVER SOCKETS] Invalid remote port\n");
         continue;
      }

      while (1)
      {
         __po_hi_c_sockets_write_sockets[dev] = socket (AF_INET, SOCK_STREAM, 0);

         if (__po_hi_c_sockets_write_sockets[dev] == -1 )
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Socket for dev %d is not created\n", dev);
            return;
         }

         NoDelayFlag = 1;
         if(setsockopt(__po_hi_c_sockets_write_sockets[dev],IPPROTO_TCP,TCP_NODELAY,&NoDelayFlag,sizeof(NoDelayFlag))){
           __DEBUGMSG ("[DRIVER SOCKETS] Unable to set TCP_NODELAY for dev %d\n", dev);
         }

         __DEBUGMSG ("[DRIVER SOCKETS] Socket for dev %d created, value=%d\n", dev, __po_hi_c_sockets_write_sockets[dev]);

         connected = 0;

         hostinfo = gethostbyname ((char*)ipconf->address);

         if (hostinfo == NULL )
         {
            /* Cannot resolve the peer: treated like a failed
             * connection attempt, retried after the delay below. */
            __DEBUGMSG ("[DRIVER SOCKETS] Error while getting host informations for device %d\n", dev);
         }
         else
         {
            sa.sin_port = htons (ip_port);
            sa.sin_family = AF_INET;

            /* The following lines are used to copy the
             * hostinfo->h_length to the sa.sin_addr member. Most
             * of program use the memcpy to do that, but the
             * RT-POSIX profile we use forbid the use of this
             * function.  We use a loop instead to perform the
             * copy.  So, these lines replace the code :
             *
             *
             * memcpy( (char*) &( sa.sin_addr ) , (char*)hostinfo->h_addr , hostinfo->h_length );
             *
             * The copy is bounded by the size of sa.sin_addr.
             */
            tmp = (char*) &(sa.sin_addr);
            for (i=0 ; (i < hostinfo->h_length) && (i < (int) sizeof (sa.sin_addr)) ; i++)
            {
               tmp[i] = hostinfo->h_addr[i];
            }

            /*
             * We try to connect on the remote host. We retry every
             * 500 ms until the connection is established.
             */
            __PO_HI_SET_SOCKET_TIMEOUT(__po_hi_c_sockets_write_sockets[dev], 500000);
            ret = connect (__po_hi_c_sockets_write_sockets[dev],
                           (struct sockaddr*) &sa ,
                           sizeof (struct sockaddr_in));

#ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I
            if (ret == 0)
            {
               __DEBUGMSG ("[DRIVER SOCKETS] Connection established with device %d, socket=%d\n", dev, __po_hi_c_sockets_write_sockets[dev]);

               connected = 1;
            }
            else
            {
               __DEBUGMSG ("connect() failed, return=%d\n", ret);
            }

#else
            if (ret == 0)
            {
               __DEBUGMSG ("[DRIVER SOCKETS] Send my id (%d) to device %d through socket %d\n", dev_id, dev , __po_hi_c_sockets_write_sockets[dev]);

               /* The remote poller expects our device id first. */
               sent_id = __po_hi_swap_byte (dev_id);
#ifdef _WIN32
               ret = send (__po_hi_c_sockets_write_sockets[dev], (char*) &sent_id, sizeof (__po_hi_device_id), 0);
#else
               ret = write (__po_hi_c_sockets_write_sockets[dev], &sent_id, sizeof (__po_hi_device_id));
#endif
               if (ret != sizeof (__po_hi_device_id))
               {
                  __DEBUGMSG ("[DRIVER SOCKETS] Device %d cannot send his id, expected size=%d, return value=%d\n", dev_id, (int) sizeof (__po_hi_device_id), ret);
               }
               else
               {
                  __DEBUGMSG ("[DRIVER SOCKETS] Connection established with device %d, socket=%d\n", dev, __po_hi_c_sockets_write_sockets[dev]);

                  connected = 1;
               }
            }
            else
            {
               __DEBUGMSG ("connect() failed, return=%d\n", ret);
            }
#endif
         }

         if (connected)
         {
            break;
         }

         if (close (__po_hi_c_sockets_write_sockets[dev]))
         {
            __DEBUGMSG ("[DRIVER SOCKETS] Cannot close socket %d\n", __po_hi_c_sockets_write_sockets[dev]);
         }
         __po_hi_c_sockets_write_sockets[dev] = -1;

         /*
          * We wait 500ms each time we try to connect on the
          * remote host
          */

         __po_hi_get_time (&current_time);
         __po_hi_milliseconds (&tmptime, 500);
         __po_hi_add_times (&mytime, &current_time, &tmptime);
         __DEBUGMSG ("[DRIVER SOCKETS] Cannot connect on device %d, wait 500ms\n", dev);
         __po_hi_delay_until (&mytime);
      }
   }
   __DEBUGMSG ("[DRIVER SOCKETS] INITIALIZATION DONE\n");
}
681

682
#endif