/* * This is a part of PolyORB-HI-C distribution, a minimal * middleware written for generated code from AADL models. * You should use it with the Ocarina toolsuite. * * For more informations, please visit http://ocarina.enst.fr * * Copyright (C) 2010-2011, European Space Agency * Copyright (C) 2007-2008, GET-Telecom Paris. */ #include #include #if (defined (__PO_HI_NEED_DRIVER_SOCKETS) || \ defined (__PO_HI_NEED_DRIVER_RTEMS_NE2000_SOCKETS)) #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /* * This file (po_hi_sockets.c) provides function to handle * communication between nodes in PolyORB-HI-C. We don't use a * protocol to send data. For each data sent, we send before the * entity number provided by the generated file deployment.h, then, we * send the message. Each entity has a fixed size * (sizeof(__po_hi_entity_t)), and each message has a max fixed size * (see the __PO_HI_MESSAGES_MAX_SIZE macro). */ /* The following declarations avoid conflicts * with current generated code. */ #ifndef __PO_HI_NB_NODES #define __PO_HI_NB_NODES 1 #endif /* * We have two arrays of sockets. The first array (nodes) is used to * send data to other nodes. A special socket if nodes[__po_hi_mynode] : this * socket is used to listen others processes. The second array * (rnodes), is used to store all socket that are created by the * listen socket. This array is used only by the receiver_task */ int __po_hi_c_sockets_listen_socket; int __po_hi_c_sockets_read_sockets[__PO_HI_NB_DEVICES]; int __po_hi_c_sockets_write_sockets[__PO_HI_NB_DEVICES]; __po_hi_request_t __po_hi_c_sockets_poller_received_request; __po_hi_msg_t __po_hi_c_sockets_poller_msg; __po_hi_msg_t __po_hi_c_sockets_send_msg; int __po_hi_driver_sockets_send (__po_hi_task_id task_id, __po_hi_port_t port) { int len; int size_to_write; int optval = 0; socklen_t optlen = 0; __po_hi_device_id remote_device; __po_hi_local_port_t local_port; __po_hi_request_t* request; __po_hi_port_t destination_port; __po_hi_protocol_t protocol_id; __po_hi_protocol_conf_t* protocol_conf; local_port = __po_hi_get_local_port_from_global_port (port); request = __po_hi_gqueue_get_most_recent_value (task_id, local_port); destination_port = __po_hi_gqueue_get_destination (task_id, local_port, 0); remote_device = __po_hi_get_device_from_port (destination_port); protocol_id = __po_hi_transport_get_protocol (port, destination_port); protocol_conf = __po_hi_transport_get_protocol_configuration (protocol_id); __DEBUGMSG ("[DRIVER SOCKETS] Try to write from task=%d, port=%d, remote device=%d, socket=%d\n", task_id, port, remote_device, __po_hi_c_sockets_write_sockets[remote_device]); if (request->port == -1) { #ifdef __PO_HI_DEBUG __DEBUGMSG (" [DRIVER SOCKETS] No data to write on port %d\n", port); #endif return __PO_HI_ERROR_TRANSPORT_SEND; } if (__po_hi_c_sockets_write_sockets[remote_device] == -1 ) { #ifdef __PO_HI_DEBUG __DEBUGMSG (" [DRIVER SOCKETS] Invalid socket for port-id %d, device-id %d\n", destination_port, remote_device); #endif return __PO_HI_ERROR_TRANSPORT_SEND; } /* * After sending the entity identifier, we send the message which * contains the request. */ size_to_write = __PO_HI_MESSAGES_MAX_SIZE; if (getsockopt (__po_hi_c_sockets_write_sockets[remote_device], SOL_SOCKET, SO_ERROR, &optval, &optlen) == -1) { __DEBUGMSG (" [error getsockopt() in file %s, line%d ]\n", __FILE__, __LINE__); close (__po_hi_c_sockets_write_sockets[remote_device]); __po_hi_c_sockets_write_sockets[remote_device] = -1; return __PO_HI_ERROR_TRANSPORT_SEND; } if (optval != 0) { __DEBUGMSG (" [error getsockopt() return code in file %s, line%d ]\n", __FILE__, __LINE__); close (__po_hi_c_sockets_write_sockets[remote_device]); __po_hi_c_sockets_write_sockets[remote_device] = -1; return __PO_HI_ERROR_TRANSPORT_SEND; } /* Ignore SIGPIPE to be able to recover from errors instead of crashing the node */ if (signal (SIGPIPE, SIG_IGN) == SIG_ERR) { __DEBUGMSG (" [error signal() return code in file %s, line%d ]\n", __FILE__, __LINE__); close (__po_hi_c_sockets_write_sockets[remote_device]); __po_hi_c_sockets_write_sockets[remote_device] = -1; return __PO_HI_ERROR_TRANSPORT_SEND; } switch (protocol_id) { #ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I case virtual_bus_myprotocol_i: { size_to_write = sizeof (int); int datawritten; protocol_conf->marshaller(request, &datawritten, &size_to_write); len = write (__po_hi_c_sockets_write_sockets[remote_device], &datawritten, size_to_write); if (len != size_to_write) { __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__); close (__po_hi_c_sockets_write_sockets[remote_device]); __po_hi_c_write_sockets[remote_device] = -1; return __PO_HI_ERROR_TRANSPORT_SEND; } break; } #endif default: { request->port = destination_port; __po_hi_msg_reallocate (&__po_hi_c_sockets_send_msg); __po_hi_marshall_request (request, &__po_hi_c_sockets_send_msg); #ifdef __PO_HI_DEBUG __po_hi_messages_debug (&__po_hi_c_sockets_send_msg[remote_device]); #endif if (__po_hi_c_sockets_write_sockets[remote_device] != -1) { len = write (__po_hi_c_sockets_write_sockets[remote_device], &(__po_hi_c_sockets_send_msg.content), size_to_write); if (len != size_to_write) { __DEBUGMSG (" [error write() length in file %s, line%d ]\n", __FILE__, __LINE__); close (__po_hi_c_sockets_write_sockets[remote_device]); __po_hi_c_sockets_write_sockets[remote_device] = -1; return __PO_HI_ERROR_TRANSPORT_SEND; } } request->port = __PO_HI_GQUEUE_INVALID_PORT; break; } } return __PO_HI_SUCCESS; } void* __po_hi_sockets_poller (__po_hi_device_id* dev_id_addr) { socklen_t socklen = sizeof (struct sockaddr); /* See ACCEPT (2) for details on initial value of socklen */ __po_hi_uint32_t len; int sock; int max_socket; fd_set selector; struct sockaddr_in sa; __po_hi_device_id dev; __po_hi_node_t dev_init; int established = 0; int ret; unsigned short ip_port; __po_hi_protocol_conf_t* protocol_conf; __po_hi_c_ip_conf_t* ipconf; __po_hi_device_id dev_id; __po_hi_device_id sent_id; struct hostent* hostinfo; __po_hi_time_t mytime; __po_hi_time_t tmptime; char *tmp; __po_hi_time_t current_time; int i; max_socket = 0; /* Used to compute the max socket number, useful for listen() call */ dev_id = *dev_id_addr; __DEBUGMSG ("Poller launched, device-id=%d\n", dev_id); /* * Create a socket for each node that will communicate with us. */ for (dev = 0; dev < __PO_HI_NB_NODES - 1 ; dev++) { established = 0; while (established == 0) { __DEBUGMSG ("[DRIVER SOCKETS] Poller waits for connection with device %d (reading device=%d, socket=%d)\n", dev, dev_id, __po_hi_c_sockets_read_sockets[dev]); sock = accept (__po_hi_c_sockets_listen_socket, (struct sockaddr*) &sa, &socklen); if (sock == -1) { continue; } __DEBUGMSG ("[DRIVER SOCKETS] accept() passed, waiting for device id %d\n", dev); #ifndef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I dev_init = 0; ret = read (sock, &dev_init, sizeof (__po_hi_device_id)); if (ret != sizeof (__po_hi_device_id)) { established = 0; __DEBUGMSG ("[DRIVER SOCKETS] Cannot read device-id for device %d, socket=%d, ret=%d\n", dev, sock, ret); } else { dev_init = __po_hi_swap_byte (dev_init); if (__po_hi_c_sockets_read_sockets[dev_init] != -1) { established = 0; __DEBUGMSG ("[DRIVER SOCKETS] Invalid id sent on socket=%d, received device id=%d\n", sock, dev_init); } __DEBUGMSG ("[DRIVER SOCKETS] read device-id %d from socket=%d\n", dev_init, sock); established = 1; } #else established = 1; #endif } __po_hi_c_sockets_read_sockets[dev_init] = sock; if (sock > max_socket ) { max_socket = sock; } } __DEBUGMSG ("[DRIVER SOCKETS] Poller initialization finished, waiting for other tasks\n"); __po_hi_wait_initialization (); __DEBUGMSG ("[DRIVER SOCKETS] Other tasks are initialized, let's start the polling !\n"); /* * Then, listen and receive data on the socket, identify the node * which send the data and put it in its message queue */ while (1) { FD_ZERO( &selector ); for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++) { if ( (dev != dev_id ) && ( __po_hi_c_sockets_read_sockets[dev] != -1 ) ) { FD_SET( __po_hi_c_sockets_read_sockets[dev], &selector ); } } if (select (max_socket + 1, &selector, NULL, NULL, NULL) == -1 ) { #ifdef __PO_HI_DEBUG __DEBUGMSG ("[DRIVER SOCKETS] Error on select for node %d\n", __po_hi_mynode); #endif } #ifdef __PO_HI_DEBUG __DEBUGMSG ("[DRIVER SOCKETS] Receive message\n"); #endif for (dev = 0; dev < __PO_HI_NB_DEVICES ; dev++) { __DEBUGMSG ("[DRIVER SOCKETS] Try to watch if it comes from device %d (socket=%d)\n", dev, __po_hi_c_sockets_read_sockets[dev]); if ( (__po_hi_c_sockets_read_sockets[dev] != -1 ) && FD_ISSET(__po_hi_c_sockets_read_sockets[dev], &selector)) { __DEBUGMSG ("[DRIVER SOCKETS] Receive message from dev %d\n", dev); #ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I { protocol_conf = __po_hi_transport_get_protocol_configuration (virtual_bus_myprotocol_i); int datareceived; len = recv (__po_hi_c_sockets_read_sockets[dev], &datareceived, sizeof (int), MSG_WAITALL); __DEBUGMSG ("[DRIVER SOCKETS] Message received len=%d\n",(int)len); if (len == 0) { __DEBUGMSG ("[DRIVER SOCKETS] Zero size from device %d\n",dev); __po_hi_c_sockets_read_sockets[dev] = -1; continue; } protocol_conf->unmarshaller (&__po_hi_c_sockets_poller_received_request, &datareceived, len); __po_hi_sockets_poller_received_request.port = 1; } #else memset (__po_hi_c_sockets_poller_msg.content, '\0', __PO_HI_MESSAGES_MAX_SIZE); len = recv (__po_hi_c_sockets_read_sockets[dev], __po_hi_c_sockets_poller_msg.content, __PO_HI_MESSAGES_MAX_SIZE, MSG_WAITALL); __po_hi_c_sockets_poller_msg.length = len; __DEBUGMSG ("[DRIVER SOCKETS] Message received len=%d\n",(int)len); #ifdef __PO_HI_DEBUG __po_hi_messages_debug (&__po_hi_c_sockets_poller_msg); #endif if (len == 0) { __DEBUGMSG ("[DRIVER SOCKETS] Zero size from device %d\n",dev); __po_hi_c_sockets_read_sockets[dev] = -1; continue; } __po_hi_unmarshall_request (&__po_hi_c_sockets_poller_received_request, &__po_hi_c_sockets_poller_msg); #endif __po_hi_main_deliver (&__po_hi_c_sockets_poller_received_request); } } } return NULL; } void __po_hi_driver_sockets_init (__po_hi_device_id dev_id) { int ret; int reuse; struct sockaddr_in sa; unsigned short ip_port; __po_hi_c_ip_conf_t* ipconf; __po_hi_device_id dev; __po_hi_device_id sent_id; struct hostent* hostinfo; __po_hi_time_t mytime; __po_hi_time_t tmptime; char *tmp; __po_hi_time_t current_time; int i; __po_hi_c_sockets_listen_socket = -1; for (dev = 0 ; dev < __PO_HI_NB_DEVICES ; dev++) { __po_hi_c_sockets_read_sockets[dev] = -1; __po_hi_c_sockets_write_sockets[dev] = -1; } ipconf = (__po_hi_c_ip_conf_t*)__po_hi_get_device_configuration (dev_id); ip_port = (int)ipconf->port; __DEBUGMSG ("My configuration, addr=%s, port=%d\n", ipconf->address, ip_port ); /* * If the current node port has a port number, then it has to * listen to other nodes. So, we create a socket, bind it and * listen to other nodes. */ if (ip_port != 0) { __po_hi_c_sockets_listen_socket = socket (AF_INET, SOCK_STREAM, 0); if (__po_hi_c_sockets_listen_socket == -1 ) { #ifdef __PO_HI_DEBUG __DEBUGMSG ("Cannot create socket for device %d\n", dev_id); #endif return; } __DEBUGMSG ("Socket created for addr=%s, port=%d, socket value=%d\n", ipconf->address, ip_port, __po_hi_c_sockets_listen_socket); reuse = 1; if (setsockopt (__po_hi_c_sockets_listen_socket, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof (reuse))) { __DEBUGMSG ("[DRIVER SOCKETS] Error while making the receiving socket reusable\n"); } sa.sin_addr.s_addr = htonl (INADDR_ANY); /* We listen on all adresses */ sa.sin_family = AF_INET; sa.sin_port = htons (ip_port); /* Port provided by the generated code */ if( bind (__po_hi_c_sockets_listen_socket, ( struct sockaddr * ) &sa , sizeof( struct sockaddr_in ) ) < 0 ) { __DEBUGMSG ("Unable to bind socket and port on socket %d\n", __po_hi_c_sockets_listen_socket); } if( listen (__po_hi_c_sockets_listen_socket, __PO_HI_NB_DEVICES) < 0 ) { __DEBUGMSG ("Cannot listen on socket %d\n", __po_hi_c_sockets_listen_socket); } /* * Create the thread which receive all data from other * nodes. This thread will execute the function * __po_hi_receiver_task */ __po_hi_initialize_add_task (); __po_hi_create_generic_task (-1, 0,__PO_HI_MAX_PRIORITY, 0, (void* (*)(void))__po_hi_sockets_poller, &dev_id); } /* * For each node in the sytem that may communicate with the current * node we create a socket. This socket will be used to send data. */ for (dev = 0 ; dev < __PO_HI_NB_DEVICES ; dev++ ) { if (dev == dev_id) { continue; } __DEBUGMSG ("[DRIVER SOCKETS] Will initialize connection with device %d\n", dev); ip_port = 0; ipconf = (__po_hi_c_ip_conf_t*) __po_hi_get_device_configuration (dev); ip_port = (unsigned short)ipconf->port; __DEBUGMSG ("[DRIVER SOCKETS] Configuration for device %d, port=%d\n", dev, ip_port); if (ip_port == 0) { __DEBUGMSG ("[DRIVER SOCKETS] Invalid remote port\n"); continue; } while (1) { __po_hi_c_sockets_write_sockets[dev] = socket (AF_INET, SOCK_STREAM, 0); if (__po_hi_c_sockets_write_sockets[dev] == -1 ) { __DEBUGMSG ("[DRIVER SOCKETS] Socket for dev %d is not created\n", dev); return; } __DEBUGMSG ("[DRIVER SOCKETS] Socket for dev %d created, value=%d\n", dev, __po_hi_c_sockets_write_sockets[dev]); hostinfo = NULL; hostinfo = gethostbyname ((char*)ipconf->address); if (hostinfo == NULL ) { __DEBUGMSG ("[DRIVER SOCKETS] Error while getting host informations for device %d\n", dev); } sa.sin_port = htons (ip_port); sa.sin_family = AF_INET; /* The following lines are used to copy the * hostinfo->h_length to the sa.sin_addr member. Most * of program use the memcpy to do that, but the * RT-POSIX profile we use forbid the use of this * function. We use a loop instead to perform the * copy. So, these lines replace the code : * * * memcpy( (char*) &( sa.sin_addr ) , (char*)hostinfo->h_addr , hostinfo->h_length ); */ tmp = (char*) &(sa.sin_addr); for (i=0 ; ih_length ; i++) { tmp[i] = hostinfo->h_addr[i]; } /* * We try to connect on the remote host. We try every * second to connect on. */ __PO_HI_SET_SOCKET_TIMEOUT(__po_hi_c_sockets_write_sockets[dev], 500000); ret = connect (__po_hi_c_sockets_write_sockets[dev], (struct sockaddr*) &sa , sizeof (struct sockaddr_in)); #ifdef __PO_HI_USE_PROTOCOL_MYPROTOCOL_I if (ret == 0) { __DEBUGMSG ("[DRIVER SOCKETS] Connection established with device %d, socket=%d\n", dev, __po_hi_c_sockets_write_sockets[dev]); break; } else { __DEBUGMSG ("connect() failed, return=%d\n", ret); } #else if (ret == 0) { __DEBUGMSG ("[DRIVER SOCKETS] Send my id (%d) to device %d through socket %d\n", dev_id, dev , __po_hi_c_sockets_write_sockets[dev]); sent_id = __po_hi_swap_byte (dev_id); if (write (__po_hi_c_sockets_write_sockets[dev], &sent_id, sizeof (__po_hi_device_id)) != sizeof (__po_hi_device_id)) { __DEBUGMSG ("[DRIVER SOCKETS] Device %d cannot send his id\n", dev_id); } __DEBUGMSG ("[DRIVER SOCKETS] Connection established with device %d, socket=%d\n", dev, __po_hi_c_sockets_write_sockets[dev]); break; } else { __DEBUGMSG ("connect() failed, return=%d\n", ret); } #endif if (close (__po_hi_c_sockets_write_sockets[dev])) { __DEBUGMSG ("[DRIVER SOCKETS] Cannot close socket %d\n", __po_hi_c_sockets_write_sockets[dev]); } /* * We wait 500ms each time we try to connect on the * remote host */ __po_hi_get_time (¤t_time); __po_hi_milliseconds (&tmptime, 500); __po_hi_add_times (&mytime, ¤t_time, &tmptime); __DEBUGMSG ("[DRIVER SOCKETS] Cannot connect on device %d, wait 500ms\n", dev); __po_hi_delay_until (&mytime); } } __DEBUGMSG ("[DRIVER SOCKETS] INITIALIZATION DONE\n"); } #endif