Commit 22b47903 authored by yoogx's avatar yoogx

* Move Unprotected_Queue to a dedicated package

parent 1f87dae2
......@@ -2,20 +2,22 @@ SUBDIRS = drivers
# BEGIN: DO NOT DELETE THIS LINE
ADA_SPECS_WITH_BODY = $(srcdir)/polyorb_hi-aperiodic_task.ads \
$(srcdir)/polyorb_hi-background_task.ads \
$(srcdir)/polyorb_hi-hybrid_task.ads \
$(srcdir)/polyorb_hi-hybrid_task_driver.ads \
$(srcdir)/polyorb_hi-marshallers_g.ads \
$(srcdir)/polyorb_hi-messages.ads \
$(srcdir)/polyorb_hi-output.ads \
$(srcdir)/polyorb_hi-periodic_task.ads \
$(srcdir)/polyorb_hi-port_kinds.ads \
$(srcdir)/polyorb_hi-protocols.ads \
$(srcdir)/polyorb_hi-sporadic_task.ads \
$(srcdir)/polyorb_hi-suspenders.ads \
$(srcdir)/polyorb_hi-thread_interrogators.ads \
$(srcdir)/polyorb_hi-scheduler.ads \
$(srcdir)/polyorb_hi-utils.ads
$(srcdir)/polyorb_hi-background_task.ads \
$(srcdir)/polyorb_hi-hybrid_task.ads \
$(srcdir)/polyorb_hi-hybrid_task_driver.ads \
$(srcdir)/polyorb_hi-marshallers_g.ads \
$(srcdir)/polyorb_hi-messages.ads \
$(srcdir)/polyorb_hi-null_task.ads \
$(srcdir)/polyorb_hi-output.ads \
$(srcdir)/polyorb_hi-periodic_task.ads \
$(srcdir)/polyorb_hi-port_kinds.ads \
$(srcdir)/polyorb_hi-protocols.ads \
$(srcdir)/polyorb_hi-sporadic_task.ads \
$(srcdir)/polyorb_hi-suspenders.ads \
$(srcdir)/polyorb_hi-thread_interrogators.ads \
$(srcdir)/polyorb_hi-scheduler.ads \
$(srcdir)/polyorb_hi-unprotected_queue.ads \
$(srcdir)/polyorb_hi-utils.ads
ADA_SPECS = $(ADA_SPECS_WITH_BODY) $(srcdir)/polyorb_hi.ads \
$(srcdir)/polyorb_hi-errors.ads \
......
......@@ -39,6 +39,7 @@ with PolyORB_HI.Port_Type_Marshallers;
with PolyORB_HI.Streams;
with PolyORB_HI.Time_Marshallers;
with POlyORB_HI.Utils;
with PolyORB_HI.Unprotected_Queue;
package body PolyORB_HI.Thread_Interrogators is
......@@ -49,1098 +50,32 @@ package body PolyORB_HI.Thread_Interrogators is
use PolyORB_HI.Output;
use PolyORB_HI.Utils;
-- The types and the routines below give a flexible way to handle
-- Thread_Interface_Type variables and to store them in arrays.
type Port_Stream is
new PolyORB_HI.Streams.Stream_Element_Array
(1 .. Thread_Interface_Type'Size / 8);
type Port_Stream_Entry is record
From : Entity_Type;
Payload : Port_Stream;
end record;
-- A couple of a message and its sender
N_Ports : constant Integer :=
Port_Type'Pos (Port_Type'Last) - Port_Type'Pos (Port_Type'First) + 1;
-- Number of ports in the thread
function Interface_To_Stream is
new Ada.Unchecked_Conversion (Thread_Interface_Type, Port_Stream);
function Stream_To_Interface is
new Ada.Unchecked_Conversion (Port_Stream, Thread_Interface_Type);
function CE return String;
pragma Inline (CE);
-- Shortcut to Entity_Image (Current_Entity)
type Port_Stream_Array is array (Port_Type) of Port_Stream_Entry;
subtype Big_Port_Index_Type is Integer range 0 .. Global_Data_Queue_Size;
Default_Index_Value : constant Big_Port_Index_Type
:= (if Global_Data_Queue_Size > 0 then 1 else 0);
type Big_Port_Stream_Array is
array (Big_Port_Index_Type) of Port_Stream_Entry;
type Big_Port_Type_Array is array (Big_Port_Index_Type) of Port_Type;
-- FIXME: We begin by 0 although the 0 position is unused. We do
-- this to avoid compile time warning. After this package is
-- deeply tested, begin by 1 and disable Index_Check and
-- Range_Check for the Big_Port_Stream_Array type.
type Port_Index_Array is array (Port_Type) of Big_Port_Index_Type;
-- An array type to specify the port FIFO sizes and urgencies.
procedure H_Increment_First (F : in out Big_Port_Index_Type);
procedure H_Increment_Last (L : in out Big_Port_Index_Type);
pragma Inline (H_Increment_First);
pragma Inline (H_Increment_Last);
-- Cyclic incrementation and decrementation of F or L within the
-- 1..Global_Data_Queue_Size range.
type Boolean_Array is array (Port_Type) of Boolean;
type Time_Array is array (Port_Type) of Time;
-----------------------
-- Unprotected_Queue --
-----------------------
package Unprotected_Queue is
procedure Read_Event
(P : out Port_Type;
Valid : out Boolean;
Not_Empty : Boolean);
-- Same as 'Wait_Event' but without blocking. Valid is set to
-- False if there is nothing to receive.
procedure Dequeue
(T : Port_Type; P : out Port_Stream_Entry; Not_Empty : out Boolean);
-- Dequeue a value from the partial FIFO of port T. If there is
-- no enqueued value, return the latest dequeued value.
function Read_In (T : Port_Type) return Port_Stream_Entry;
-- Read the oldest queued value on the partial FIFO of IN port
-- T without dequeuing it. If there is no queued value, return
-- the latest dequeued value.
function Read_Out (T : Port_Type) return Port_Stream_Entry;
-- Return the value put for OUT port T.
function Is_Invalid (T : Port_Type) return Boolean;
-- Return True if no Put_Value has been called for this port
-- since the last Set_Invalid call.
procedure Set_Invalid (T : Port_Type);
-- Set the value stored for OUT port T as invalid to impede its
-- future sending without calling Put_Value. This procedure is
-- generally called just after Read_Out. However we cannot
-- combine them in one routine because we need Read_Out to be a
-- function and functions cannot modify protected object
-- states.
procedure Store_In
(P : Port_Stream_Entry; T : Time; Not_Empty : out Boolean);
-- Stores a new incoming message in its corresponding
-- position. If this is an event [data] incoming message, then
-- stores it in the queue, updates its most recent value and
-- unblock the barrier. Otherwise, it only overrides the most
-- recent value. T is the time stamp associated to the port
-- P. In case of data ports with delayed connections, it
-- indicates the instant from which the data of P becomes
-- deliverable.
procedure Store_Out (P : Port_Stream_Entry; T : Time);
-- Store a value of an OUT port to be sent at the next call to
-- Send_Output and mark the value as valid.
function Count (T : Port_Type) return Integer;
-- Return the number of pending messages on IN port T.
function Get_Time_Stamp (P : Port_Type) return Time;
-- Return the time stamp associated to port T
-- The following are accessors to some internal data of the event queue
function Get_Most_Recent_Value (P : Port_Type) return Port_Stream_Entry;
procedure Set_Most_Recent_Value
(P : Port_Type;
S : Port_Stream_Entry;
T : Time);
-- The protected object contains also an array to store the
-- values of received IN DATA ports as well as the most recent
-- value of IN EVENT DATA. For OUT port, the value is the
-- message to be send when Send_Output is called. In case of an
-- event data port, we do not use the 2 elements of the array
-- to store most recent values because there is no delayed
-- connections for event data ports.
private
Global_Data_Queue : Big_Port_Stream_Array;
-- The structure of the buffer is as follows:
-- ----------------------------------------------------------------
-- | Q1 | Q2 | Q3 | ... | Qn |
-- ----------------------------------------------------------------
-- O1 O2 O3 O4 ... On
-- 'On' is the offset associated to IN [event] [data] port n,
-- given from the generic formal, Thread_FIFO_Offsets. This
-- guarantees an O(1) access and storage time of a given
-- element in the global queue. Intrinsically, the global table
-- is a concatenation of circular arrays each one corresponding
-- to a port queue.
Firsts : Port_Index_Array := (Port_Type'Range => Default_Index_Value);
Lasts : Port_Index_Array := (Port_Type'Range => 0);
-- Used for IN [event] [data] ports to navigate in the global
-- queue. For IN DATA ports, in case of immediate connection
-- only the 'Lasts' value is relevant and it is 0 or 1, in case
-- of a delayed connection both values are relevant.
Empties : Boolean_Array := (Port_Type'Range => True);
-- Indicates whether each port-FIFO is empty or not
Global_Data_History : Big_Port_Type_Array;
GH_First : Big_Port_Index_Type := Default_Index_Value;
GH_Last : Big_Port_Index_Type := 0;
-- This contains, in an increasing chronological order the IN
-- EVENT ports that have a pending event. Example (P_1, P_3,
-- P_1, P_2, P_3) means that the oldest pending message is
-- received on P_1 then on P_3, then on P_1 again and so on...
-- FIXME: Add N_Ports to the array size to handle the case the
-- thread has an IN event [data] port with a FIFO size equal to
-- zero which is not supported yet.
Most_Recent_Values : Port_Stream_Array;
Time_Stamps : Time_Array;
Initialized : Boolean_Array := (Port_Type'Range => False);
-- To indicate whether the port ever received a data (or an
-- event).
Value_Put : Boolean_Array := (Port_Type'Range => False);
-- To indicate whether the OUT port values have been set in
-- order to be sent.
N_Empties : Integer := N_Ports;
-- Number of empty partial queues. At the beginning, all the
-- queues are empty.
end Unprotected_Queue;
package body Unprotected_Queue is
----------------
-- Read_Event --
----------------
procedure Read_Event
(P : out Port_Type;
Valid : out Boolean;
Not_Empty : Boolean)
is
begin
Valid := Not_Empty;
if Valid then
P := Global_Data_History (GH_First);
pragma Debug (Put_Line
(Verbose,
CE
+ ": Read_Event: read valid event [data] on "
+ Thread_Port_Images (P)));
end if;
end Read_Event;
-------------
-- Dequeue --
-------------
procedure Dequeue
(T : Port_Type;
P : out Port_Stream_Entry;
Not_Empty : out Boolean)
is
Is_Empty : Boolean renames Empties (T);
First : Big_Port_Index_Type renames Firsts (T);
Last : Big_Port_Index_Type renames Lasts (T);
FIFO_Size : Integer renames Thread_FIFO_Sizes (T);
P_Kind : Port_Kind renames Thread_Port_Kinds (T);
Offset : Integer renames Thread_FIFO_Offsets (T);
begin
-- This subprogram is called only when the thread has IN
-- ports.
pragma Assert (Is_In (P_Kind));
if Is_Empty then
-- If the FIFO is empty, return the latest received value
-- during the previous dispatches.
pragma Debug (Put_Line
(Verbose,
CE
+ ": Dequeue: Empty queue for "
+ Thread_Port_Images (T)));
P := Get_Most_Recent_Value (T);
elsif FIFO_Size = 0 then
-- If the FIFO is empty or non-existent, return the
-- latest received value during the previous dispatches.
pragma Debug (Put_Line
(Verbose,
CE
+ ": Dequeue: NO FIFO for "
+ Thread_Port_Images (T)));
P := Get_Most_Recent_Value (T);
else
pragma Debug (Put_Line
(Verbose,
CE
+ ": Dequeue: dequeuing "
+ Thread_Port_Images (T)));
if First = Last then
-- Update the value of N_Empties only when this is the
-- first time we mark the partial queue as empty.
if not Is_Empty and then Is_Event (P_Kind) then
N_Empties := N_Empties + 1;
end if;
Is_Empty := True;
end if;
P := Global_Data_Queue (First + Offset - 1);
if First = FIFO_Size then
First := Default_Index_Value;
elsif Global_Data_Queue_Size > 0
and then FIFO_Size > 1 then
First := Big_Port_Index_Type'Succ (First);
end if;
-- Shift the First index of the global history queue
H_Increment_First (GH_First);
end if;
-- Update the barrier
Not_Empty := N_Empties < N_Ports;
end Dequeue;
-------------
-- Read_In --
-------------
function Read_In (T : Port_Type) return Port_Stream_Entry is
P : Port_Stream_Entry;
Is_Empty : Boolean renames Empties (T);
First : Integer renames Firsts (T);
FIFO_Size : Integer renames Thread_FIFO_Sizes (T);
Offset : Integer renames Thread_FIFO_Offsets (T);
P_Kind : Port_Kind renames Thread_Port_Kinds (T);
begin
-- This subprogram is called only when the thread has IN
-- ports.
pragma Assert (Is_In (P_Kind));
if Is_Empty or else FIFO_Size = 0 then
-- If the FIFO is empty or non-existent return the
-- latest received value during the previous dispatches.
pragma Debug (Put_Line
(Verbose,
CE
+ ": Read_In: Empty queue for port "
+ Thread_Port_Images (T)
+ ". Reading the last stored value."));
P := Get_Most_Recent_Value (T);
else
pragma Debug (Put_Line
(Verbose,
CE
+ ": Read_In: Reading the oldest element in the"
+ " queue of port "
+ Thread_Port_Images (T)));
P := Global_Data_Queue (First + Offset - 1);
pragma Debug (Put_Line
(Verbose,
CE
+ ": Read_In: Global reading position: "
+ Integer'Image (First + Offset - 1)));
end if;
pragma Debug (Put_Line
(Verbose,
CE
+ ": Read_In: Value read from port "
+ Thread_Port_Images (T)));
return P;
end Read_In;
--------------
-- Read_Out --
--------------
function Read_Out (T : Port_Type) return Port_Stream_Entry is
begin
-- There is no need here to go through the Get_ routine
-- since we are sending, not receiving.
pragma Debug (Put_Line
(Verbose,
CE
+ ": Read_Out: Value read from port "
+ Thread_Port_Images (T)));
return Most_Recent_Values (T);
end Read_Out;
----------------
-- Is_Invalid --
----------------
function Is_Invalid (T : Port_Type) return Boolean is
begin
return not (Value_Put (T));
end Is_Invalid;
-----------------
-- Set_Invalid --
-----------------
procedure Set_Invalid (T : Port_Type) is
begin
pragma Debug (Put_Line
(Verbose,
CE
+ ": Set_Invalid: Setting INVALID for sending: port "
+ Thread_Port_Images (T)));
Value_Put (T) := False;
end Set_Invalid;
--------------
-- Store_In --
--------------
procedure Store_In (P : Port_Stream_Entry; T : Time; Not_Empty : out Boolean) is
Thread_Interface : constant Thread_Interface_Type
:= Stream_To_Interface (P.Payload);
PT : Port_Type renames Thread_Interface.Port;
Is_Empty : Boolean renames Empties (PT);
First : Integer renames Firsts (PT);
Last : Integer renames Lasts (PT);
P_Kind : Port_Kind renames Thread_Port_Kinds (PT);
FIFO_Size : Integer renames Thread_FIFO_Sizes (PT);
Offset : Integer renames Thread_FIFO_Offsets (PT);
Urgency : Integer renames Urgencies (PT);
Overflow_Protocol : Overflow_Handling_Protocol
renames Thread_Overflow_Protocols (PT);
Replace : Boolean := False;
begin
-- This subprogram is called only when the thread has IN
-- ports.
pragma Assert (Is_In (P_Kind));
-- Set PT as initialized
Initialized (PT) := True;
if Has_Event_Ports then
if Is_Event (P_Kind) then
-- If the FIFO is full apply the overflow-policy
-- indicated by the user.
if FIFO_Size > 0 then
if not Is_Empty
and then (Last = First - 1
or else (First = 1 and then Last = FIFO_Size))
then
declare
Frst : Integer;
GDH : Big_Port_Type_Array renames Global_Data_History;
begin
case Overflow_Protocol is
when DropOldest =>
-- Drop the oldest element in the FIFO
Global_Data_Queue (First + Offset - 1) := P;
pragma Debug
(Put_Line
(Verbose,
CE
+ ": Store_In: FIFO is full."
+ " Dropping oldest element."
+ " Global storage position: "
+ Integer'Image (First + Offset - 1)));
Last := First;
if First = FIFO_Size then
First := Default_Index_Value;
elsif Global_Data_Queue_Size > 0
and then FIFO_Size > 1
then
First := Big_Port_Index_Type'Succ (First);
end if;
-- Search the oldest element in the history
Frst := GH_First;
loop
if GDH (Frst) = PT then
exit;
end if;
Frst := Frst + 1;
if Frst > Global_Data_Queue_Size then
exit;
end if;
end loop;
if Frst > Global_Data_Queue_Size then
-- Second configuration, We have only
-- searched from GH_First to Queue_Size,
-- continue from the beginning to GH_Last.
-- ---------------------------------------------
-- |xxxxxxxxx|x| |x|xxxxxxxxxxxxx|
-- ---------------------------------------------
-- 1 GH_Last GH_First Queue_Size
Frst := 1;
loop
exit when GDH (Frst) = PT;
Frst := Frst + 1;
end loop;
end if;
when DropNewest =>
-- Drop the newest element in the FIFO
Global_Data_Queue (Last + Offset - 1) := P;
pragma Debug
(Put_Line
(Verbose,
CE
+ ": Store_In: FIFO is full."
+ " Dropping newest element"
+ " Global storage position: "
+ Integer'Image (Last + Offset - 1)));
-- Search the newest element in the history
Frst := GH_Last;
loop
if GDH (Frst) = PT then
exit;
end if;
Frst := Frst - 1;
if Frst < 1 then
exit;
end if;
end loop;
if Frst < 1 then
-- Continue the search from the end
Frst := Global_Data_Queue_Size;
loop
exit when GDH (Frst) = PT;
Frst := Frst - 1;
end loop;
end if;
when Error =>
raise Program_Error with
CE + ": Store_In: FIFO is full";
end case;
-- Remove event in the history and shift
-- others with the same urgency
pragma Debug
(Put_Line
(Verbose,
CE
+ ": Store_In: FIFO is full."
+ " Removed element in history at"
+ Integer'Image (Frst)));
loop
exit when Frst = Global_Data_Queue_Size
or else Urgencies (GDH (Frst)) < Urgency;
GDH (Frst) := GDH (Frst + 1);
Frst := Frst + 1;
end loop;
if Frst = Global_Data_Queue_Size
and then Urgencies (GDH (Frst)) < Urgency then
-- Continue suppressing from the beginning
Frst := 1;
GDH (Global_Data_Queue_Size) := GDH (Frst);
loop
exit when Urgencies (GDH (Frst)) < Urgency;
GDH (Frst) := GDH (Frst + 1);
Frst := Frst + 1;
end loop;
end if;
end;
Replace := True;
else
-- Update the value of N_Empties only when this is the
-- first time we mark the partial queue as NOT empty.
if Is_Empty then
N_Empties := N_Empties - 1;
end if;
Is_Empty := False;
if Last = FIFO_Size then
Last := Default_Index_Value;
elsif Global_Data_Queue_Size > 0 then
Last := Big_Port_Index_Type'Succ (Last);
end if;
Global_Data_Queue (Last + Offset - 1) := P;
pragma Debug (Put_Line
(Verbose,
CE
+ ": Store_In: Global storage position: "
+ Integer'Image (Last + Offset - 1)));
end if;
-- Update the oldest updated port value
declare
Frst : Integer := GH_Last;
Lst : constant Integer := GH_Last;
GDH : Big_Port_Type_Array renames Global_Data_History;
begin
-- Add an entry in the history
if not Replace then
H_Increment_Last (GH_Last);
end if;
if GH_First /= GH_Last then
-- Search the first entry with a higher urgency
-- and shift other entries