Commit 571365a5 authored by julien.delange's avatar julien.delange
Browse files

first version of gqueues with RTEMS interface



git-svn-id: https://tecsw.estec.esa.int/svn/taste/trunk/po-hi-c@1241 129961e7-ef38-4bb5-a8f7-c9a525a55882
parent b77bd7c4
......@@ -69,7 +69,7 @@
#define CONFIGURE_USE_IMFS_AS_BASE_FILESYSTEM
#define CONFIGURE_RTEMS_INIT_TASKS_TABLE
#define CONFIGURE_MAXIMUM_TASKS 20
#define CONFIGURE_MAXIMUM_BARRIERS 20
#define CONFIGURE_MAXIMUM_BARRIERS 20
#define CONFIGURE_INIT
#include <rtems.h>
#include <inttypes.h>
......
......@@ -50,7 +50,6 @@
#define __PO_HI_DEFAULT_PRIORITY ((sched_get_priority_min(SCHED_FIFO) + sched_get_priority_max(SCHED_FIFO))/2)
#elif defined(RTEMS_PURE)
#include <rtems.h>
#include <inttypes.h>
#include <po_hi_time.h>
......
......@@ -24,8 +24,17 @@
/* Headers from the generated code */
#include <string.h>
#if defined (POSIX) || defined (RTEMS_POSIX)
#include <pthread.h>
/* System-dependent headers */
#elif defined(RTEMS_PURE)
#include <rtems.h>
#include <inttypes.h>
#include <po_hi_time.h>
#define __PO_HI_DEFAULT_PRIORITY RTEMS_NO_PRIORITY
#endif
#define __PO_HI_GQUEUE_OUT_PORT constant_out_identifier
/* give a default value to the out port */
......@@ -51,10 +60,17 @@ __po_hi_uint8_t* __po_hi_gqueues_port_is_empty[__PO_HI_NB_TASKS];
__po_hi_uint8_t __po_hi_gqueues_queue_is_empty[__PO_HI_NB_TASKS];
__po_hi_uint8_t __po_hi_gqueues_n_empty[__PO_HI_NB_TASKS];
#if defined (RTEMS_POSIX) || defined (POSIX)
pthread_mutex_t __po_hi_gqueues_mutexes[__PO_HI_NB_TASKS];
pthread_cond_t __po_hi_gqueues_conds[__PO_HI_NB_TASKS];
pthread_mutexattr_t __po_hi_gqueues_mutexes_attr[__PO_HI_NB_TASKS];
pthread_condattr_t __po_hi_gqueues_conds_attr[__PO_HI_NB_TASKS];
#endif
#ifdef RTEMS_PURE
rtems_id __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
#endif
void __po_hi_gqueue_init (__po_hi_task_id id,
__po_hi_uint8_t nb_ports,
......@@ -71,9 +87,13 @@ void __po_hi_gqueue_init (__po_hi_task_id id,
__po_hi_uint8_t empties[],
__po_hi_uint16_t total_fifo_size)
{
__po_hi_uint8_t tmp;
__po_hi_uint16_t off;
__po_hi_request_t* request;
__po_hi_uint8_t tmp;
__po_hi_uint16_t off;
__po_hi_request_t* request;
#ifdef RTEMS_PURE
rtems_status_code ret;
#endif
__po_hi_gqueues_global_history_woffset[id] = 0;
__po_hi_gqueues_global_history_offset[id] = 0;
......@@ -98,6 +118,7 @@ void __po_hi_gqueue_init (__po_hi_task_id id,
__po_hi_gqueues_queue_is_empty[id] = 1;
#if defined (RTEMS_POSIX) || defined (POSIX)
pthread_mutexattr_init (&__po_hi_gqueues_mutexes_attr[id]);
pthread_condattr_init (&__po_hi_gqueues_conds_attr[id]);
#ifdef POSIX
......@@ -105,6 +126,23 @@ void __po_hi_gqueue_init (__po_hi_task_id id,
#endif
pthread_mutex_init (&__po_hi_gqueues_mutexes[id], &__po_hi_gqueues_mutexes_attr[id]);
pthread_cond_init (&__po_hi_gqueues_conds[id], &__po_hi_gqueues_conds_attr[id]);
#endif
#ifdef RTEMS_PURE
__DEBUGMSG ("[GQUEUE] Create semaphore for queue of task %d\n", id);
ret = rtems_semaphore_create (rtems_build_name ('G', 'S', 'E' , 'A' + (char) id), 1, RTEMS_BINARY_SEMAPHORE, __PO_HI_DEFAULT_PRIORITY, &(__po_hi_gqueues_semaphores[id]));
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot create semaphore, error code=%d\n", ret);
}
__DEBUGMSG ("[GQUEUE] Create barrier for queue of task %d\n", id);
ret = rtems_barrier_create (rtems_build_name ('G', 'S', 'I' , 'A' + (char) id),RTEMS_BARRIER_AUTOMATIC_RELEASE , 10, &(__po_hi_gqueues_barriers[id]));
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot create barrier, error code=%d\n", ret);
}
#endif
off = 0;
......@@ -159,6 +197,9 @@ __po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id,
__po_hi_request_t* request)
{
__po_hi_request_t* ptr;
#ifdef RTEMS_PURE
rtems_status_code ret;
#endif
ptr = &__po_hi_gqueues_most_recent_values[id][port];
#ifdef __PO_HI_DEBUG
if (ptr == NULL)
......@@ -167,7 +208,24 @@ __po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id,
}
#endif
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
/*
ret = rtems_barrier_wait (__po_hi_gqueues_barriers[id], RTEMS_WAIT);
rtems_id __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
*/
__DEBUGMSG ("[GQUEUE] Try to obtain semaphore for queue of task %d\n", id);
ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
}
__DEBUGMSG ("[GQUEUE] Semaphore got\n");
#endif
if (__po_hi_gqueues_sizes[id][port] == __PO_HI_GQUEUE_FIFO_INDATA)
{
memcpy(ptr,request,sizeof(*request));
......@@ -175,12 +233,20 @@ __po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id,
else
{
#ifdef __PO_HI_DEBUG
__DEBUGMSG ("Received message for task %d, port %d\n", id, port);
__DEBUGMSG ("[GQUEUE] Received message for task %d, port %d\n", id, port);
#endif
if (__po_hi_gqueues_used_size[id][port] == __po_hi_gqueues_sizes[id][port])
{
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
__DEBUGMSG ("QUEUE FULL");
#elif defined (RTEMS_PURE)
ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
__DEBUGMSG ("[GQUEUE] QUEUE FULL, task-id=%d, port=%d", id, port);
return __PO_HI_ERROR_QUEUE_FULL;
}
......@@ -200,8 +266,16 @@ __po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id,
__po_hi_gqueues_queue_is_empty[id] = 0;
}
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
pthread_cond_broadcast (&__po_hi_gqueues_conds[id]);
#elif defined (RTEMS_PURE)
ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
return __PO_HI_SUCCESS;
}
......@@ -209,14 +283,59 @@ __po_hi_uint8_t __po_hi_gqueue_store_in (__po_hi_task_id id,
void __po_hi_gqueue_wait_for_incoming_event (__po_hi_task_id id,
__po_hi_local_port_t* port)
{
#ifdef RTEMS_PURE
rtems_status_code ret;
#endif
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
/*
ret = rtems_barrier_wait (__po_hi_gqueues_barriers[id], RTEMS_WAIT);
rtems_id __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
*/
ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
while(__po_hi_gqueues_queue_is_empty[id] == 1)
{
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_cond_wait (&__po_hi_gqueues_conds[id],
&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
}
rtems_task_wake_after( RTEMS_YIELD_PROCESSOR );
ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
}
*port = __po_hi_gqueues_global_history[id][__po_hi_gqueues_global_history_offset[id]];
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
}
int __po_hi_gqueue_get_count( __po_hi_task_id id, __po_hi_local_port_t port)
......@@ -236,8 +355,28 @@ int __po_hi_gqueue_get_value( __po_hi_task_id id,
__po_hi_request_t* request)
{
__po_hi_request_t* ptr;
#ifdef RTEMS_PURE
rtems_status_code ret;
#endif
ptr = &__po_hi_gqueues_most_recent_values[id][port];
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
/*
ret = rtems_barrier_wait (__po_hi_gqueues_barriers[id], RTEMS_WAIT);
rtems_id __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
*/
ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
/*
* If the port is an event port, with no value queued, then we block
......@@ -247,8 +386,12 @@ int __po_hi_gqueue_get_value( __po_hi_task_id id,
{
while (__po_hi_gqueues_port_is_empty[id][port] == 1)
{
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_cond_wait (&__po_hi_gqueues_conds[id],
&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
rtems_task_wake_after( RTEMS_YIELD_PROCESSOR );
#endif
}
}
......@@ -282,12 +425,26 @@ int __po_hi_gqueue_get_value( __po_hi_task_id id,
__DEBUGMSG ("|\n");
#endif
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
return 0;
}
int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
{
#ifdef RTEMS_PURE
rtems_status_code ret;
#endif
/* incomplete semantics, should discriminate and report whether
there is a next value or not */
......@@ -297,7 +454,23 @@ int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
return 1;
}
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_lock (&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
/*
ret = rtems_barrier_wait (__po_hi_gqueues_barriers[id], RTEMS_WAIT);
rtems_id __po_hi_gqueues_semaphores[__PO_HI_NB_TASKS];
rtems_id __po_hi_gqueues_barriers[__PO_HI_NB_TASKS];
*/
ret = rtems_semaphore_obtain (__po_hi_gqueues_semaphores[id], RTEMS_WAIT, RTEMS_NO_TIMEOUT);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot obtain semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
__po_hi_gqueues_offsets[id][port] =
(__po_hi_gqueues_offsets[id][port] + 1)
% __po_hi_gqueues_sizes[id][port];
......@@ -319,7 +492,16 @@ int __po_hi_gqueue_next_value (__po_hi_task_id id, __po_hi_local_port_t port)
(__po_hi_gqueues_global_history_offset[id] + 1)
% __po_hi_gqueues_total_fifo_size[id];
#if defined (POSIX) || defined (RTEMS_POSIX)
pthread_mutex_unlock (&__po_hi_gqueues_mutexes[id]);
#elif defined (RTEMS_PURE)
ret = rtems_semaphore_release (__po_hi_gqueues_semaphores[id]);
if (ret != RTEMS_SUCCESSFUL)
{
__DEBUGMSG ("[GQUEUE] Cannot release semaphore in __po_hi_gqueue_store_in()\n");
}
#endif
return __PO_HI_SUCCESS;
}
......
......@@ -125,11 +125,11 @@ int __po_hi_wait_for_next_period (__po_hi_task_id task)
return (__PO_HI_SUCCESS);
break;
case RTEMS_TIMEOUT:
__DEBUGMSG ("Error in rtems_rate_monotonic_period (TIMEOUT)\n");
__DEBUGMSG ("Error in rtems_rate_monotonic_period (TIMEOUT, task = %d)\n", task);
return (__PO_HI_ERROR_TASK_PERIOD);
break;
default:
__DEBUGMSG ("Error in rtems_rate_monotonic_period (unknown, error code=%d)\n", ret);
__DEBUGMSG ("Error in rtems_rate_monotonic_period (unknown, error code=%d, task=%d)\n", ret, task);
return (__PO_HI_ERROR_UNKNOWN);
break;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment