From 22b4790314e65aa933f48fa55d9ba923ef946ac1 Mon Sep 17 00:00:00 2001 From: yoogx Date: Sun, 8 Jun 2014 18:41:03 -0700 Subject: [PATCH] * Move Unprotected_Queue to a dedicated package --- src/Makefile.am | 30 +- src/polyorb_hi-thread_interrogators.adb | 1190 +---------------------- src/polyorb_hi-unprotected_queue.adb | 1003 +++++++++++++++++++ src/polyorb_hi-unprotected_queue.ads | 319 ++++++ 4 files changed, 1375 insertions(+), 1167 deletions(-) create mode 100644 src/polyorb_hi-unprotected_queue.adb create mode 100644 src/polyorb_hi-unprotected_queue.ads diff --git a/src/Makefile.am b/src/Makefile.am index e0a116f..a80767f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -2,20 +2,22 @@ SUBDIRS = drivers # BEGIN: DO NOT DELETE THIS LINE ADA_SPECS_WITH_BODY = $(srcdir)/polyorb_hi-aperiodic_task.ads \ - $(srcdir)/polyorb_hi-background_task.ads \ - $(srcdir)/polyorb_hi-hybrid_task.ads \ - $(srcdir)/polyorb_hi-hybrid_task_driver.ads \ - $(srcdir)/polyorb_hi-marshallers_g.ads \ - $(srcdir)/polyorb_hi-messages.ads \ - $(srcdir)/polyorb_hi-output.ads \ - $(srcdir)/polyorb_hi-periodic_task.ads \ - $(srcdir)/polyorb_hi-port_kinds.ads \ - $(srcdir)/polyorb_hi-protocols.ads \ - $(srcdir)/polyorb_hi-sporadic_task.ads \ - $(srcdir)/polyorb_hi-suspenders.ads \ - $(srcdir)/polyorb_hi-thread_interrogators.ads \ - $(srcdir)/polyorb_hi-scheduler.ads \ - $(srcdir)/polyorb_hi-utils.ads + $(srcdir)/polyorb_hi-background_task.ads \ + $(srcdir)/polyorb_hi-hybrid_task.ads \ + $(srcdir)/polyorb_hi-hybrid_task_driver.ads \ + $(srcdir)/polyorb_hi-marshallers_g.ads \ + $(srcdir)/polyorb_hi-messages.ads \ + $(srcdir)/polyorb_hi-null_task.ads \ + $(srcdir)/polyorb_hi-output.ads \ + $(srcdir)/polyorb_hi-periodic_task.ads \ + $(srcdir)/polyorb_hi-port_kinds.ads \ + $(srcdir)/polyorb_hi-protocols.ads \ + $(srcdir)/polyorb_hi-sporadic_task.ads \ + $(srcdir)/polyorb_hi-suspenders.ads \ + $(srcdir)/polyorb_hi-thread_interrogators.ads \ + $(srcdir)/polyorb_hi-scheduler.ads \ + $(srcdir)/polyorb_hi-unprotected_queue.ads \ + $(srcdir)/polyorb_hi-utils.ads ADA_SPECS = $(ADA_SPECS_WITH_BODY) $(srcdir)/polyorb_hi.ads \ $(srcdir)/polyorb_hi-errors.ads \ diff --git a/src/polyorb_hi-thread_interrogators.adb b/src/polyorb_hi-thread_interrogators.adb index 74f9246..c500c3c 100644 --- a/src/polyorb_hi-thread_interrogators.adb +++ b/src/polyorb_hi-thread_interrogators.adb @@ -39,6 +39,7 @@ with PolyORB_HI.Port_Type_Marshallers; with PolyORB_HI.Streams; with PolyORB_HI.Time_Marshallers; with POlyORB_HI.Utils; +with PolyORB_HI.Unprotected_Queue; package body PolyORB_HI.Thread_Interrogators is @@ -49,1098 +50,32 @@ package body PolyORB_HI.Thread_Interrogators is use PolyORB_HI.Output; use PolyORB_HI.Utils; - -- The types and the routines below give a flexible way to handle - -- Thread_Interface_Type variables and to store them in arrays. - - type Port_Stream is - new PolyORB_HI.Streams.Stream_Element_Array - (1 .. Thread_Interface_Type'Size / 8); - - type Port_Stream_Entry is record - From : Entity_Type; - Payload : Port_Stream; - end record; - -- A couple of a message and its sender - - N_Ports : constant Integer := - Port_Type'Pos (Port_Type'Last) - Port_Type'Pos (Port_Type'First) + 1; - -- Number of ports in the thread - - function Interface_To_Stream is - new Ada.Unchecked_Conversion (Thread_Interface_Type, Port_Stream); - function Stream_To_Interface is - new Ada.Unchecked_Conversion (Port_Stream, Thread_Interface_Type); - - function CE return String; - pragma Inline (CE); - -- Shortcut to Entity_Image (Current_Entity) - - type Port_Stream_Array is array (Port_Type) of Port_Stream_Entry; - - subtype Big_Port_Index_Type is Integer range 0 .. Global_Data_Queue_Size; - - Default_Index_Value : constant Big_Port_Index_Type - := (if Global_Data_Queue_Size > 0 then 1 else 0); - - type Big_Port_Stream_Array is - array (Big_Port_Index_Type) of Port_Stream_Entry; - type Big_Port_Type_Array is array (Big_Port_Index_Type) of Port_Type; - -- FIXME: We begin by 0 although the 0 position is unused. We do - -- this to avoid compile time warning. After this package is - -- deeply tested, begin by 1 and disable Index_Check and - -- Range_Check for the Big_Port_Stream_Array type. - - type Port_Index_Array is array (Port_Type) of Big_Port_Index_Type; - -- An array type to specify the port FIFO sizes and urgencies. - - procedure H_Increment_First (F : in out Big_Port_Index_Type); - procedure H_Increment_Last (L : in out Big_Port_Index_Type); - pragma Inline (H_Increment_First); - pragma Inline (H_Increment_Last); - -- Cyclic incrementation and decrementation of F or L within the - -- 1..Global_Data_Queue_Size range. - - type Boolean_Array is array (Port_Type) of Boolean; - type Time_Array is array (Port_Type) of Time; - - ----------------------- - -- Unprotected_Queue -- - ----------------------- - - package Unprotected_Queue is - - procedure Read_Event - (P : out Port_Type; - Valid : out Boolean; - Not_Empty : Boolean); - -- Same as 'Wait_Event' but without blocking. Valid is set to - -- False if there is nothing to receive. - - procedure Dequeue - (T : Port_Type; P : out Port_Stream_Entry; Not_Empty : out Boolean); - -- Dequeue a value from the partial FIFO of port T. If there is - -- no enqueued value, return the latest dequeued value. - - function Read_In (T : Port_Type) return Port_Stream_Entry; - -- Read the oldest queued value on the partial FIFO of IN port - -- T without dequeuing it. If there is no queued value, return - -- the latest dequeued value. - - function Read_Out (T : Port_Type) return Port_Stream_Entry; - -- Return the value put for OUT port T. - - function Is_Invalid (T : Port_Type) return Boolean; - -- Return True if no Put_Value has been called for this port - -- since the last Set_Invalid call. - - procedure Set_Invalid (T : Port_Type); - -- Set the value stored for OUT port T as invalid to impede its - -- future sending without calling Put_Value. This procedure is - -- generally called just after Read_Out. However we cannot - -- combine them in one routine because we need Read_Out to be a - -- function and functions cannot modify protected object - -- states. - - procedure Store_In - (P : Port_Stream_Entry; T : Time; Not_Empty : out Boolean); - -- Stores a new incoming message in its corresponding - -- position. If this is an event [data] incoming message, then - -- stores it in the queue, updates its most recent value and - -- unblock the barrier. Otherwise, it only overrides the most - -- recent value. T is the time stamp associated to the port - -- P. In case of data ports with delayed connections, it - -- indicates the instant from which the data of P becomes - -- deliverable. - - procedure Store_Out (P : Port_Stream_Entry; T : Time); - -- Store a value of an OUT port to be sent at the next call to - -- Send_Output and mark the value as valid. - - function Count (T : Port_Type) return Integer; - -- Return the number of pending messages on IN port T. - - function Get_Time_Stamp (P : Port_Type) return Time; - -- Return the time stamp associated to port T - - -- The following are accessors to some internal data of the event queue - - function Get_Most_Recent_Value (P : Port_Type) return Port_Stream_Entry; - procedure Set_Most_Recent_Value - (P : Port_Type; - S : Port_Stream_Entry; - T : Time); - -- The protected object contains also an array to store the - -- values of received IN DATA ports as well as the most recent - -- value of IN EVENT DATA. For OUT port, the value is the - -- message to be send when Send_Output is called. In case of an - -- event data port, we do not use the 2 elements of the array - -- to store most recent values because there is no delayed - -- connections for event data ports. - - private - Global_Data_Queue : Big_Port_Stream_Array; - -- The structure of the buffer is as follows: - - -- ---------------------------------------------------------------- - -- | Q1 | Q2 | Q3 | ... | Qn | - -- ---------------------------------------------------------------- - -- O1 O2 O3 O4 ... On - - -- 'On' is the offset associated to IN [event] [data] port n, - -- given from the generic formal, Thread_FIFO_Offsets. This - -- guarantees an O(1) access and storage time of a given - -- element in the global queue. Intrinsically, the global table - -- is a concatenation of circular arrays each one corresponding - -- to a port queue. - - Firsts : Port_Index_Array := (Port_Type'Range => Default_Index_Value); - Lasts : Port_Index_Array := (Port_Type'Range => 0); - -- Used for IN [event] [data] ports to navigate in the global - -- queue. For IN DATA ports, in case of immediate connection - -- only the 'Lasts' value is relevant and it is 0 or 1, in case - -- of a delayed connection both values are relevant. - - Empties : Boolean_Array := (Port_Type'Range => True); - -- Indicates whether each port-FIFO is empty or not - - Global_Data_History : Big_Port_Type_Array; - GH_First : Big_Port_Index_Type := Default_Index_Value; - GH_Last : Big_Port_Index_Type := 0; - -- This contains, in an increasing chronological order the IN - -- EVENT ports that have a pending event. Example (P_1, P_3, - -- P_1, P_2, P_3) means that the oldest pending message is - -- received on P_1 then on P_3, then on P_1 again and so on... - - -- FIXME: Add N_Ports to the array size to handle the case the - -- thread has an IN event [data] port with a FIFO size equal to - -- zero which is not supported yet. - - Most_Recent_Values : Port_Stream_Array; - Time_Stamps : Time_Array; - - Initialized : Boolean_Array := (Port_Type'Range => False); - -- To indicate whether the port ever received a data (or an - -- event). - - Value_Put : Boolean_Array := (Port_Type'Range => False); - -- To indicate whether the OUT port values have been set in - -- order to be sent. - - N_Empties : Integer := N_Ports; - -- Number of empty partial queues. At the beginning, all the - -- queues are empty. - - end Unprotected_Queue; - - package body Unprotected_Queue is - - ---------------- - -- Read_Event -- - ---------------- - - procedure Read_Event - (P : out Port_Type; - Valid : out Boolean; - Not_Empty : Boolean) - is - begin - Valid := Not_Empty; - - if Valid then - P := Global_Data_History (GH_First); - - pragma Debug (Put_Line - (Verbose, - CE - + ": Read_Event: read valid event [data] on " - + Thread_Port_Images (P))); - end if; - end Read_Event; - - ------------- - -- Dequeue -- - ------------- - - procedure Dequeue - (T : Port_Type; - P : out Port_Stream_Entry; - Not_Empty : out Boolean) - is - Is_Empty : Boolean renames Empties (T); - First : Big_Port_Index_Type renames Firsts (T); - Last : Big_Port_Index_Type renames Lasts (T); - FIFO_Size : Integer renames Thread_FIFO_Sizes (T); - P_Kind : Port_Kind renames Thread_Port_Kinds (T); - Offset : Integer renames Thread_FIFO_Offsets (T); - begin - -- This subprogram is called only when the thread has IN - -- ports. - - pragma Assert (Is_In (P_Kind)); - - if Is_Empty then - -- If the FIFO is empty, return the latest received value - -- during the previous dispatches. - - pragma Debug (Put_Line - (Verbose, - CE - + ": Dequeue: Empty queue for " - + Thread_Port_Images (T))); - - P := Get_Most_Recent_Value (T); - - elsif FIFO_Size = 0 then - -- If the FIFO is empty or non-existent, return the - -- latest received value during the previous dispatches. - - pragma Debug (Put_Line - (Verbose, - CE - + ": Dequeue: NO FIFO for " - + Thread_Port_Images (T))); - - P := Get_Most_Recent_Value (T); - - else - pragma Debug (Put_Line - (Verbose, - CE - + ": Dequeue: dequeuing " - + Thread_Port_Images (T))); - - if First = Last then - -- Update the value of N_Empties only when this is the - -- first time we mark the partial queue as empty. - - if not Is_Empty and then Is_Event (P_Kind) then - N_Empties := N_Empties + 1; - end if; - - Is_Empty := True; - end if; - - P := Global_Data_Queue (First + Offset - 1); - - if First = FIFO_Size then - First := Default_Index_Value; - elsif Global_Data_Queue_Size > 0 - and then FIFO_Size > 1 then - First := Big_Port_Index_Type'Succ (First); - end if; - - -- Shift the First index of the global history queue - - H_Increment_First (GH_First); - end if; - - -- Update the barrier - - Not_Empty := N_Empties < N_Ports; - end Dequeue; - - ------------- - -- Read_In -- - ------------- - - function Read_In (T : Port_Type) return Port_Stream_Entry is - P : Port_Stream_Entry; - Is_Empty : Boolean renames Empties (T); - First : Integer renames Firsts (T); - FIFO_Size : Integer renames Thread_FIFO_Sizes (T); - Offset : Integer renames Thread_FIFO_Offsets (T); - P_Kind : Port_Kind renames Thread_Port_Kinds (T); - begin - -- This subprogram is called only when the thread has IN - -- ports. - - pragma Assert (Is_In (P_Kind)); - - if Is_Empty or else FIFO_Size = 0 then - -- If the FIFO is empty or non-existent return the - -- latest received value during the previous dispatches. - - pragma Debug (Put_Line - (Verbose, - CE - + ": Read_In: Empty queue for port " - + Thread_Port_Images (T) - + ". Reading the last stored value.")); - - P := Get_Most_Recent_Value (T); - else - pragma Debug (Put_Line - (Verbose, - CE - + ": Read_In: Reading the oldest element in the" - + " queue of port " - + Thread_Port_Images (T))); - - P := Global_Data_Queue (First + Offset - 1); - pragma Debug (Put_Line - (Verbose, - CE - + ": Read_In: Global reading position: " - + Integer'Image (First + Offset - 1))); - end if; - - pragma Debug (Put_Line - (Verbose, - CE - + ": Read_In: Value read from port " - + Thread_Port_Images (T))); - return P; - end Read_In; - - -------------- - -- Read_Out -- - -------------- - - function Read_Out (T : Port_Type) return Port_Stream_Entry is - begin - -- There is no need here to go through the Get_ routine - -- since we are sending, not receiving. - - pragma Debug (Put_Line - (Verbose, - CE - + ": Read_Out: Value read from port " - + Thread_Port_Images (T))); - - return Most_Recent_Values (T); - end Read_Out; - - ---------------- - -- Is_Invalid -- - ---------------- - - function Is_Invalid (T : Port_Type) return Boolean is - begin - return not (Value_Put (T)); - end Is_Invalid; - - ----------------- - -- Set_Invalid -- - ----------------- - - procedure Set_Invalid (T : Port_Type) is - begin - pragma Debug (Put_Line - (Verbose, - CE - + ": Set_Invalid: Setting INVALID for sending: port " - + Thread_Port_Images (T))); - - Value_Put (T) := False; - end Set_Invalid; - - -------------- - -- Store_In -- - -------------- - - procedure Store_In (P : Port_Stream_Entry; T : Time; Not_Empty : out Boolean) is - Thread_Interface : constant Thread_Interface_Type - := Stream_To_Interface (P.Payload); - PT : Port_Type renames Thread_Interface.Port; - Is_Empty : Boolean renames Empties (PT); - First : Integer renames Firsts (PT); - Last : Integer renames Lasts (PT); - P_Kind : Port_Kind renames Thread_Port_Kinds (PT); - FIFO_Size : Integer renames Thread_FIFO_Sizes (PT); - Offset : Integer renames Thread_FIFO_Offsets (PT); - Urgency : Integer renames Urgencies (PT); - Overflow_Protocol : Overflow_Handling_Protocol - renames Thread_Overflow_Protocols (PT); - Replace : Boolean := False; - begin - -- This subprogram is called only when the thread has IN - -- ports. - - pragma Assert (Is_In (P_Kind)); - - -- Set PT as initialized - - Initialized (PT) := True; - - if Has_Event_Ports then - if Is_Event (P_Kind) then - -- If the FIFO is full apply the overflow-policy - -- indicated by the user. - - if FIFO_Size > 0 then - if not Is_Empty - and then (Last = First - 1 - or else (First = 1 and then Last = FIFO_Size)) - then - declare - Frst : Integer; - GDH : Big_Port_Type_Array renames Global_Data_History; - begin - case Overflow_Protocol is - when DropOldest => - -- Drop the oldest element in the FIFO - - Global_Data_Queue (First + Offset - 1) := P; - pragma Debug - (Put_Line - (Verbose, - CE - + ": Store_In: FIFO is full." - + " Dropping oldest element." - + " Global storage position: " - + Integer'Image (First + Offset - 1))); - - Last := First; - - if First = FIFO_Size then - First := Default_Index_Value; - elsif Global_Data_Queue_Size > 0 - and then FIFO_Size > 1 - then - First := Big_Port_Index_Type'Succ (First); - end if; - - -- Search the oldest element in the history - - Frst := GH_First; - loop - if GDH (Frst) = PT then - exit; - end if; - Frst := Frst + 1; - if Frst > Global_Data_Queue_Size then - exit; - end if; - end loop; - - if Frst > Global_Data_Queue_Size then - -- Second configuration, We have only - -- searched from GH_First to Queue_Size, - -- continue from the beginning to GH_Last. - - -- --------------------------------------------- - -- |xxxxxxxxx|x| |x|xxxxxxxxxxxxx| - -- --------------------------------------------- - -- 1 GH_Last GH_First Queue_Size - Frst := 1; - loop - exit when GDH (Frst) = PT; - Frst := Frst + 1; - end loop; - end if; - - when DropNewest => - -- Drop the newest element in the FIFO - - Global_Data_Queue (Last + Offset - 1) := P; - pragma Debug - (Put_Line - (Verbose, - CE - + ": Store_In: FIFO is full." - + " Dropping newest element" - + " Global storage position: " - + Integer'Image (Last + Offset - 1))); - - -- Search the newest element in the history - - Frst := GH_Last; - loop - if GDH (Frst) = PT then - exit; - end if; - Frst := Frst - 1; - if Frst < 1 then - exit; - end if; - end loop; - - if Frst < 1 then - -- Continue the search from the end - Frst := Global_Data_Queue_Size; - loop - exit when GDH (Frst) = PT; - Frst := Frst - 1; - end loop; - end if; - when Error => - raise Program_Error with - CE + ": Store_In: FIFO is full"; - end case; - - -- Remove event in the history and shift - -- others with the same urgency - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Store_In: FIFO is full." - + " Removed element in history at" - + Integer'Image (Frst))); - - loop - exit when Frst = Global_Data_Queue_Size - or else Urgencies (GDH (Frst)) < Urgency; - GDH (Frst) := GDH (Frst + 1); - Frst := Frst + 1; - end loop; - - if Frst = Global_Data_Queue_Size - and then Urgencies (GDH (Frst)) < Urgency then - -- Continue suppressing from the beginning - Frst := 1; - GDH (Global_Data_Queue_Size) := GDH (Frst); - loop - exit when Urgencies (GDH (Frst)) < Urgency; - GDH (Frst) := GDH (Frst + 1); - Frst := Frst + 1; - end loop; - end if; - end; - Replace := True; - else - -- Update the value of N_Empties only when this is the - -- first time we mark the partial queue as NOT empty. - - if Is_Empty then - N_Empties := N_Empties - 1; - end if; - - Is_Empty := False; - - if Last = FIFO_Size then - Last := Default_Index_Value; - elsif Global_Data_Queue_Size > 0 then - Last := Big_Port_Index_Type'Succ (Last); - end if; - - Global_Data_Queue (Last + Offset - 1) := P; - pragma Debug (Put_Line - (Verbose, - CE - + ": Store_In: Global storage position: " - + Integer'Image (Last + Offset - 1))); - - end if; - - -- Update the oldest updated port value - declare - Frst : Integer := GH_Last; - Lst : constant Integer := GH_Last; - GDH : Big_Port_Type_Array renames Global_Data_History; - begin - - -- Add an entry in the history - if not Replace then - H_Increment_Last (GH_Last); - end if; - - if GH_First /= GH_Last then - -- Search the first entry with a higher urgency - -- and shift other entries - if Frst = Global_Data_Queue_Size - and then Urgencies (GDH (Frst)) < Urgency then - GDH (GH_Last) := GDH (Frst); - Frst := Frst - 1; - end if; - loop - if Urgencies (GDH (Frst)) >= Urgency then - exit; - end if; - GDH (Frst + 1) := GDH (Frst); - Frst := Frst - 1; - exit when (GH_First <= Lst - and then Frst < GH_First) - or else Frst < 1; - end loop; - - if Frst < 1 and then GH_First > Lst then - -- Continue the search from the end - Frst := Global_Data_Queue_Size; - if Urgencies (GDH (Frst)) < Urgency then - GDH (1 mod GDH'Length) := GDH (Frst); - Frst := Frst - 1; - loop - if Urgencies (GDH (Frst)) >= Urgency then - exit; - end if; - GDH (Frst + 1) := GDH (Frst); - Frst := Frst - 1; - exit when Frst < GH_First; - end loop; - end if; - end if; - end if; - - -- Insert the port of the event - if Frst = Global_Data_Queue_Size then - GDH (1 mod GDH'Size) := PT; - -- The modulo avoids warning when accessing - -- GDH (1) while Queue_Size = 0 - pragma Debug (Put_Line - (Verbose, - CE - + ": Store_In: Insert event" - + " in history at: " - + Integer'Image (1))); - else - GDH (Frst + 1) := PT; - pragma Debug (Put_Line - (Verbose, - CE - + ": Store_In: Insert event" - + " in history at: " - + Integer'Image (Frst + 1))); - end if; - end; - - end if; - - -- Update the most recent value corresponding to port PT - - Set_Most_Recent_Value (PT, P, T); - - pragma Debug (Put_Line - (Verbose, - CE - + ": Store_In: Enqueued Event [Data] message" - + " for port " - + Thread_Port_Images (PT))); - - -- Update the barrier - - Not_Empty := True; - end if; - end if; - - -- If this is a data port, we only override the - -- Most_Recent_Value corresponding to the port. - - if not Is_Event (P_Kind) then - pragma Debug (Put_Line - (Verbose, - CE - + ": Store_In: Storing Data message in DATA port " - + Thread_Port_Images (PT))); - - Set_Most_Recent_Value (PT, P, T); - - pragma Debug (Put_Line - (Verbose, - CE - + ": Store_In: Stored Data message in DATA port " - + Thread_Port_Images (PT))); - - end if; - end Store_In; - - --------------- - -- Store_Out -- - --------------- - - procedure Store_Out (P : Port_Stream_Entry; T : Time) is - Thread_Interface : constant Thread_Interface_Type - := Stream_To_Interface (P.Payload); - PT : Port_Type renames Thread_Interface.Port; - begin - pragma Debug (Put_Line - (Verbose, - CE - + ": Store_Out: Storing value for sending: port " - + Thread_Port_Images (PT))); - - -- Mark as valid for sending - - Value_Put (PT) := True; - - pragma Debug (Put_Line - (Verbose, - CE - + ": Store_Out: Value stored for sending: port " - + Thread_Port_Images (PT))); - - -- No need to go through the Set_ routine since we are - -- sending, not receiving. - - Most_Recent_Values (PT) := P; - Time_Stamps (PT) := T; -- overwritten below - -- Maxime workaround for backdoor accesses - Time_Stamps (PT) := Ada.Real_time.clock; - - end Store_Out; - - ----------- - -- Count -- - ----------- - - function Count (T : Port_Type) return Integer is - Is_Empty : Boolean renames Empties (T); - First : Integer renames Firsts (T); - Last : Integer renames Lasts (T); - P_Kind : Port_Kind renames Thread_Port_Kinds (T); - FIFO_Size : Integer renames Thread_FIFO_Sizes (T); - begin - -- This subprogram is called only when the thread has IN - -- ports. - - pragma Assert (Is_In (P_Kind)); - - if not Initialized (T) then - pragma Debug (Put_Line - (Verbose, - CE - + ": Count: Not initialized port: " - + Thread_Port_Images (T))); - - return -1; - - elsif Is_Empty then - pragma Debug (Put_Line - (Verbose, - CE - + ": Count: Empty FIFO for port " - + Thread_Port_Images (T))); - - return 0; - - elsif FIFO_Size = 0 then - pragma Debug (Put_Line - (Verbose, - CE - + ": Count: No FIFO for port " - + Thread_Port_Images (T))); - - return 0; - - else - pragma Debug (Put_Line - (Verbose, - CE - + ": Count: FIFO exists for port " - + Thread_Port_Images (T))); - - if Last >= First then - -- First configuration - - -- ------------------------------------------------------- - -- | |x|xxxxxxxxxxxxxxxxxxxxxxxxx|x| | - -- ------------------------------------------------------- - -- First Last - - return (Last - First) + 1; - - else - -- Second configuration - - -- ------------------------------------------------------- - -- |xxxxxxxxx|x| |x|xxxxxxxxxxxxx| - -- ------------------------------------------------------- - -- Last First - - return FIFO_Size - First + Last + 1; - end if; - end if; - end Count; - - --------------------------- - -- Get_Most_Recent_Value -- - --------------------------- - - function Get_Most_Recent_Value - (P : Port_Type) - return Port_Stream_Entry - is - First : Integer renames Firsts (P); - Last : Integer renames Lasts (P); - P_Kind : Port_Kind renames Thread_Port_Kinds (P); - FIFO_Size : Integer renames Thread_FIFO_Sizes (P); - Offset : Integer renames Thread_FIFO_Offsets (P); - T : constant Time := Clock; - S : Port_Stream_Entry; - begin - if Has_Event_Ports then - if Is_Event (P_Kind) then - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: event [data] port " - + Thread_Port_Images (P))); - - S := Most_Recent_Values (P); - end if; - end if; - if not Is_Event (P_Kind) then - if FIFO_Size = 1 then - -- Immediate connection - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: data port " - + Thread_Port_Images (P) - + ". Immediate connection")); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: First =" - + Integer'Image (First))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: Last = " - + Integer'Image (Last))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: Offset = " - + Integer'Image (Offset))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: Global_Data_Queue_Size = " - + Integer'Image (Global_Data_Queue_Size))); - - S := Global_Data_Queue (First + Offset - 1); - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: Most recent value for" - + " data port " - + Thread_Port_Images (P) - + " got. Immediate connection")); - else - -- Delayed connection: The element indexed by First is - -- the oldest element and the element indexed by Last - -- is the most recent element. - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: data port " - + Thread_Port_Images (P) - + ". Delayed connection")); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: First = " - + Integer'Image (First))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: Last = " - + Integer'Image (Last))); - pragma Debug - (Put_Line - (Verbose, - " Offset = " + Integer'Image (Offset))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: Global_Data_Queue_Size = " - + Integer'Image (Global_Data_Queue_Size))); - - if Time_Stamps (P) <= T then - pragma Debug - (Put_Line - (Verbose, - CE + ": Get_Most_Recent_Value: Getting NEW value")); - - S := Global_Data_Queue (Last + Offset - 1); - else - pragma Debug - (Put_Line - (Verbose, - CE + ": Get_Most_Recent_Value: Getting OLD value")); - - S := Global_Data_Queue (First + Offset - 1); - end if; - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Get_Most_Recent_Value: Most recent value" - + " for data port " - + Thread_Port_Images (P) - + " got. Delayed connection")); - end if; - end if; - - return S; - end Get_Most_Recent_Value; - - --------------------------- - -- Set_Most_Recent_Value -- - --------------------------- - - procedure Set_Most_Recent_Value - (P : Port_Type; - S : Port_Stream_Entry; - T : Time) - is - First : Big_Port_Index_Type renames Firsts (P); - Last : Big_Port_Index_Type renames Lasts (P); - P_Kind : Port_Kind renames Thread_Port_Kinds (P); - FIFO_Size : Integer renames Thread_FIFO_Sizes (P); - Offset : Integer renames Thread_FIFO_Offsets (P); - begin - if Global_Data_Queue_Size = 0 then - -- XXX Actually, if the queue has a null size, this - -- function is never called, hence we can exit - -- immediatly. This should be captured with a proper - -- pre-condition. We need this trap to avoid GNATProve - -- attempting to prove the code below in this particular - -- case. - return; - end if; - - if Has_Event_Ports then - if Is_Event (P_Kind) then - pragma Debug (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: event [data] port " - + Thread_Port_Images (P))); - - Most_Recent_Values (P) := S; - - pragma Debug (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: event [data] port " - + Thread_Port_Images (P) - + ". Done.")); - end if; - end if; - if not Is_Event (P_Kind) then - Time_Stamps (P) := T; - - if FIFO_Size = 1 then - -- Immediate connection - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: data port " - + Thread_Port_Images (P) - + ". Immediate connection")); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: First =" - + Integer'Image (First))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: Last = " - + Integer'Image (Last))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: Offset = " - + Integer'Image (Offset))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: Global_Data_Queue_Size = " - + Integer'Image (Global_Data_Queue_Size))); - - Global_Data_Queue (First + Offset - 1) := S; - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: Most recent value" - + " for data port " - + Thread_Port_Images (P) - + " set. Immediate connection")); - else - -- Delayed connection: The element indexed by First must be - -- the oldest element and the element indexed by Last - -- is the most recent element. - -- XXX JH: why? - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: data port " - + Thread_Port_Images (P) - + ". Delayed connection")); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: First = " - + Integer'Image (First))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: Last = " - + Integer'Image (Last))); - pragma Debug - (Put_Line - (Verbose, - " Offset = " + Integer'Image (Offset))); - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: Global_Data_Queue_Size = " - + Integer'Image (Global_Data_Queue_Size))); - - Global_Data_Queue (First + Offset - 1) := - Global_Data_Queue (Last + Offset - 1); - Global_Data_Queue (Last + Offset - 1) := S; - - pragma Debug - (Put_Line - (Verbose, - CE - + ": Set_Most_Recent_Value: Most recent value" - + " for data port " - + Thread_Port_Images (P) - + " set. Delayed connection")); - end if; - end if; - end Set_Most_Recent_Value; - - -------------------- - -- Get_Time_Stamp -- - -------------------- - - function Get_Time_Stamp (P : Port_Type) return Time is - begin - pragma Debug (Put_Line - (Verbose, - CE - + ": Get_Time_Stamp: port " - + Thread_Port_Images (P))); - - return Time_Stamps (P); - end Get_Time_Stamp; + -------- + -- UQ -- + -------- - end Unprotected_Queue; + package UQ is new PolyORB_HI.Unprotected_Queue + (Port_Type, + Integer_Array, + Port_Kind_Array, + Port_Image_Array, + Address_Array, + Overflow_Protocol_Array, + Thread_Interface_Type, + Current_Entity, + Thread_Port_Kinds, + Has_Event_Ports, + Thread_Port_Images, + Thread_Fifo_Sizes, + Thread_Fifo_Offsets, + Thread_Overflow_Protocols, + Urgencies, + Global_Data_Queue_Size, + N_Destinations, + Destinations, + Marshall, + Next_Deadline); + use UQ; ------------------ -- Global_Queue -- @@ -1225,7 +160,7 @@ package body PolyORB_HI.Thread_Interrogators is entry Wait_Event (P : out Port_Type) when Not_Empty is Valid : Boolean; begin - Unprotected_Queue.Read_Event (P, Valid, Not_Empty); + UQ.Read_Event (P, Valid, Not_Empty); pragma Debug (Put_Line (Verbose, @@ -1240,7 +175,7 @@ package body PolyORB_HI.Thread_Interrogators is procedure Read_Event (P : out Port_Type; Valid : out Boolean) is begin - Unprotected_Queue.Read_Event (P, Valid, Not_Empty); + UQ.Read_Event (P, Valid, Not_Empty); end Read_Event; ------------- @@ -1249,7 +184,7 @@ package body PolyORB_HI.Thread_Interrogators is procedure Dequeue (T : Port_Type; P : out Port_Stream_Entry) is begin - Unprotected_Queue.Dequeue (T, P, Not_Empty); + UQ.Dequeue (T, P, Not_Empty); end Dequeue; ------------- @@ -1258,7 +193,7 @@ package body PolyORB_HI.Thread_Interrogators is function Read_In (T : Port_Type) return Port_Stream_Entry is begin - return Unprotected_Queue.Read_In (T); + return UQ.Read_In (T); end Read_In; -------------- @@ -1267,7 +202,7 @@ package body PolyORB_HI.Thread_Interrogators is function Read_Out (T : Port_Type) return Port_Stream_Entry is begin - return Unprotected_Queue.Read_Out (T); + return UQ.Read_Out (T); end Read_Out; ---------------- @@ -1276,7 +211,7 @@ package body PolyORB_HI.Thread_Interrogators is function Is_Invalid (T : Port_Type) return Boolean is begin - return Unprotected_Queue.Is_Invalid (T); + return UQ.Is_Invalid (T); end Is_Invalid; ----------------- @@ -1285,7 +220,7 @@ package body PolyORB_HI.Thread_Interrogators is procedure Set_Invalid (T : Port_Type) is begin - Unprotected_Queue.Set_Invalid (T); + UQ.Set_Invalid (T); end Set_Invalid; -------------- @@ -1294,7 +229,7 @@ package body PolyORB_HI.Thread_Interrogators is procedure Store_In (P : Port_Stream_Entry; T : Time) is begin - Unprotected_Queue.Store_In (P, T, Not_Empty); + UQ.Store_In (P, T, Not_Empty); end Store_In; --------------- @@ -1303,7 +238,7 @@ package body PolyORB_HI.Thread_Interrogators is procedure Store_Out (P : Port_Stream_Entry; T : Time) is begin - Unprotected_Queue.Store_Out (P, T); + UQ.Store_Out (P, T); end Store_Out; ----------- @@ -1312,7 +247,7 @@ package body PolyORB_HI.Thread_Interrogators is function Count (T : Port_Type) return Integer is begin - return Unprotected_Queue.Count (T); + return UQ.Count (T); end Count; -------------------- @@ -1321,7 +256,7 @@ package body PolyORB_HI.Thread_Interrogators is function Get_Time_Stamp (P : Port_Type) return Time is begin - return Unprotected_Queue.Get_Time_Stamp (P); + return UQ.Get_Time_Stamp (P); end Get_Time_Stamp; end Global_Queue; @@ -1575,55 +510,4 @@ package body PolyORB_HI.Thread_Interrogators is return Global_Queue.Get_Time_Stamp (P); end Get_Time_Stamp; - ----------------------- - -- H_Increment_First -- - ----------------------- - - procedure H_Increment_First (F : in out Big_Port_Index_Type) is - begin - if Big_Port_Index_Type'Last > 0 then - if F < Big_Port_Index_Type'Last then - F := Big_Port_Index_Type'Succ (F); - else - F := Default_Index_Value; - end if; - - pragma Debug (Put_Line - (Verbose, - CE - + ": H_Increment_First: F =" - + Integer'Image (F))); - end if; - end H_Increment_First; - - ---------------------- - -- H_Increment_Last -- - ---------------------- - - procedure H_Increment_Last (L : in out Big_Port_Index_Type) is - begin - if Big_Port_Index_Type'Last > 0 then - if L < Big_Port_Index_Type'Last then - L := Big_Port_Index_Type'Succ (L); - else - L := Default_Index_Value; - end if; - - pragma Debug (Put_Line - (Verbose, - CE - + ": H_Increment_Last: L =" - + Integer'Image (L))); - end if; - end H_Increment_Last; - - -------- - -- CE -- - -------- - - function CE return String is - begin - return Entity_Image (Current_Entity); - end CE; - end PolyORB_HI.Thread_Interrogators; diff --git a/src/polyorb_hi-unprotected_queue.adb b/src/polyorb_hi-unprotected_queue.adb new file mode 100644 index 0000000..3778cf9 --- /dev/null +++ b/src/polyorb_hi-unprotected_queue.adb @@ -0,0 +1,1003 @@ +------------------------------------------------------------------------------ +-- -- +-- PolyORB HI COMPONENTS -- +-- -- +-- P O L Y O R B _ H I . U N P R O T E C T E D _ Q U E U E -- +-- -- +-- B o d y -- +-- -- +-- Copyright (C) 2014 ESA & ISAE. -- +-- -- +-- PolyORB HI is free software; you can redistribute it and/or modify it -- +-- under terms of the GNU General Public License as published by the Free -- +-- Software Foundation; either version 2, or (at your option) any later. -- +-- PolyORB HI is distributed in the hope that it will be useful, but -- +-- WITHOUT ANY WARRANTY; without even the implied warranty of -- +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General -- +-- Public License for more details. You should have received a copy of the -- +-- GNU General Public License distributed with PolyORB HI; see file -- +-- COPYING. If not, write to the Free Software Foundation, 51 Franklin -- +-- Street, Fifth Floor, Boston, MA 02111-1301, USA. -- +-- -- +-- As a special exception, if other files instantiate generics from this -- +-- unit, or you link this unit with other files to produce an executable, -- +-- this unit does not by itself cause the resulting executable to be -- +-- covered by the GNU General Public License. This exception does not -- +-- however invalidate any other reasons why the executable file might be -- +-- covered by the GNU Public License. -- +-- -- +-- PolyORB-HI/Ada is maintained by the TASTE project -- +-- (taste-users@lists.tuxfamily.org) -- +-- -- +------------------------------------------------------------------------------ + +with PolyORB_HI.Output; +with PolyORB_HI.Utils; + +package body PolyORB_HI.Unprotected_Queue is + + use type PolyORB_HI.Streams.Stream_Element_Offset; + use PolyORB_HI.Port_Kinds; + use Ada.Real_Time; + use PolyORB_HI_Generated.Deployment; + use PolyORB_HI.Output; + use PolyORB_HI.Utils; + + ---------------- + -- Read_Event -- + ---------------- + + procedure Read_Event + (P : out Port_Type; + Valid : out Boolean; + Not_Empty : Boolean) + is + begin + Valid := Not_Empty; + + if Valid then + P := Global_Data_History (GH_First); + + pragma Debug (Put_Line + (Verbose, + CE + + ": Read_Event: read valid event [data] on " + + Thread_Port_Images (P))); + end if; + end Read_Event; + + ------------- + -- Dequeue -- + ------------- + + procedure Dequeue + (T : Port_Type; + P : out Port_Stream_Entry; + Not_Empty : out Boolean) + is + Is_Empty : Boolean renames Empties (T); + First : Big_Port_Index_Type renames Firsts (T); + Last : Big_Port_Index_Type renames Lasts (T); + FIFO_Size : Integer renames Thread_FIFO_Sizes (T); + P_Kind : Port_Kind renames Thread_Port_Kinds (T); + Offset : Integer renames Thread_FIFO_Offsets (T); + begin + -- This subprogram is called only when the thread has IN + -- ports. + + pragma Assert (Is_In (P_Kind)); + + if Is_Empty then + -- If the FIFO is empty, return the latest received value + -- during the previous dispatches. + + pragma Debug (Put_Line + (Verbose, + CE + + ": Dequeue: Empty queue for " + + Thread_Port_Images (T))); + + P := Get_Most_Recent_Value (T); + + elsif FIFO_Size = 0 then + -- If the FIFO is empty or non-existent, return the + -- latest received value during the previous dispatches. + + pragma Debug (Put_Line + (Verbose, + CE + + ": Dequeue: NO FIFO for " + + Thread_Port_Images (T))); + + P := Get_Most_Recent_Value (T); + + else + pragma Debug (Put_Line + (Verbose, + CE + + ": Dequeue: dequeuing " + + Thread_Port_Images (T))); + + if First = Last then + -- Update the value of N_Empties only when this is the + -- first time we mark the partial queue as empty. + + if not Is_Empty and then Is_Event (P_Kind) then + N_Empties := N_Empties + 1; + end if; + + Is_Empty := True; + end if; + + P := Global_Data_Queue (First + Offset - 1); + + if First = FIFO_Size then + First := Default_Index_Value; + elsif Global_Data_Queue_Size > 0 + and then FIFO_Size > 1 then + First := Big_Port_Index_Type'Succ (First); + end if; + + -- Shift the First index of the global history queue + + H_Increment_First (GH_First); + end if; + + -- Update the barrier + + Not_Empty := N_Empties < N_Ports; + end Dequeue; + + ------------- + -- Read_In -- + ------------- + + function Read_In (T : Port_Type) return Port_Stream_Entry is + P : Port_Stream_Entry; + Is_Empty : Boolean renames Empties (T); + First : Integer renames Firsts (T); + FIFO_Size : Integer renames Thread_FIFO_Sizes (T); + Offset : Integer renames Thread_FIFO_Offsets (T); + P_Kind : Port_Kind renames Thread_Port_Kinds (T); + begin + -- This subprogram is called only when the thread has IN + -- ports. + + pragma Assert (Is_In (P_Kind)); + + if Is_Empty or else FIFO_Size = 0 then + -- If the FIFO is empty or non-existent return the + -- latest received value during the previous dispatches. + + pragma Debug (Put_Line + (Verbose, + CE + + ": Read_In: Empty queue for port " + + Thread_Port_Images (T) + + ". Reading the last stored value.")); + + P := Get_Most_Recent_Value (T); + else + pragma Debug (Put_Line + (Verbose, + CE + + ": Read_In: Reading the oldest element in the" + + " queue of port " + + Thread_Port_Images (T))); + + P := Global_Data_Queue (First + Offset - 1); + pragma Debug (Put_Line + (Verbose, + CE + + ": Read_In: Global reading position: " + + Integer'Image (First + Offset - 1))); + end if; + + pragma Debug (Put_Line + (Verbose, + CE + + ": Read_In: Value read from port " + + Thread_Port_Images (T))); + return P; + end Read_In; + + -------------- + -- Read_Out -- + -------------- + + function Read_Out (T : Port_Type) return Port_Stream_Entry is + begin + -- There is no need here to go through the Get_ routine + -- since we are sending, not receiving. + + pragma Debug (Put_Line + (Verbose, + CE + + ": Read_Out: Value read from port " + + Thread_Port_Images (T))); + + return Most_Recent_Values (T); + end Read_Out; + + ---------------- + -- Is_Invalid -- + ---------------- + + function Is_Invalid (T : Port_Type) return Boolean is + begin + return not (Value_Put (T)); + end Is_Invalid; + + ----------------- + -- Set_Invalid -- + ----------------- + + procedure Set_Invalid (T : Port_Type) is + begin + pragma Debug (Put_Line + (Verbose, + CE + + ": Set_Invalid: Setting INVALID for sending: port " + + Thread_Port_Images (T))); + + Value_Put (T) := False; + end Set_Invalid; + + -------------- + -- Store_In -- + -------------- + + procedure Store_In (P : Port_Stream_Entry; T : Time; Not_Empty : out Boolean) is + Thread_Interface : constant Thread_Interface_Type + := Stream_To_Interface (P.Payload); + PT : Port_Type renames Thread_Interface.Port; + Is_Empty : Boolean renames Empties (PT); + First : Integer renames Firsts (PT); + Last : Integer renames Lasts (PT); + P_Kind : Port_Kind renames Thread_Port_Kinds (PT); + FIFO_Size : Integer renames Thread_FIFO_Sizes (PT); + Offset : Integer renames Thread_FIFO_Offsets (PT); + Urgency : Integer renames Urgencies (PT); + Overflow_Protocol : Overflow_Handling_Protocol + renames Thread_Overflow_Protocols (PT); + Replace : Boolean := False; + begin + -- This subprogram is called only when the thread has IN + -- ports. + + pragma Assert (Is_In (P_Kind)); + + -- Set PT as initialized + + Initialized (PT) := True; + + if Has_Event_Ports then + if Is_Event (P_Kind) then + -- If the FIFO is full apply the overflow-policy + -- indicated by the user. + + if FIFO_Size > 0 then + if not Is_Empty + and then (Last = First - 1 + or else (First = 1 and then Last = FIFO_Size)) + then + declare + Frst : Integer; + GDH : Big_Port_Type_Array renames Global_Data_History; + begin + case Overflow_Protocol is + when DropOldest => + -- Drop the oldest element in the FIFO + + Global_Data_Queue (First + Offset - 1) := P; + pragma Debug + (Put_Line + (Verbose, + CE + + ": Store_In: FIFO is full." + + " Dropping oldest element." + + " Global storage position: " + + Integer'Image (First + Offset - 1))); + + Last := First; + + if First = FIFO_Size then + First := Default_Index_Value; + elsif Global_Data_Queue_Size > 0 + and then FIFO_Size > 1 + then + First := Big_Port_Index_Type'Succ (First); + end if; + + -- Search the oldest element in the history + + Frst := GH_First; + loop + if GDH (Frst) = PT then + exit; + end if; + Frst := Frst + 1; + if Frst > Global_Data_Queue_Size then + exit; + end if; + end loop; + + if Frst > Global_Data_Queue_Size then + -- Second configuration, We have only + -- searched from GH_First to Queue_Size, + -- continue from the beginning to GH_Last. + + -- --------------------------------------------- + -- |xxxxxxxxx|x| |x|xxxxxxxxxxxxx| + -- --------------------------------------------- + -- 1 GH_Last GH_First Queue_Size + Frst := 1; + loop + exit when GDH (Frst) = PT; + Frst := Frst + 1; + end loop; + end if; + + when DropNewest => + -- Drop the newest element in the FIFO + + Global_Data_Queue (Last + Offset - 1) := P; + pragma Debug + (Put_Line + (Verbose, + CE + + ": Store_In: FIFO is full." + + " Dropping newest element" + + " Global storage position: " + + Integer'Image (Last + Offset - 1))); + + -- Search the newest element in the history + + Frst := GH_Last; + loop + if GDH (Frst) = PT then + exit; + end if; + Frst := Frst - 1; + if Frst < 1 then + exit; + end if; + end loop; + + if Frst < 1 then + -- Continue the search from the end + Frst := Global_Data_Queue_Size; + loop + exit when GDH (Frst) = PT; + Frst := Frst - 1; + end loop; + end if; + when Error => + raise Program_Error with + CE + ": Store_In: FIFO is full"; + end case; + + -- Remove event in the history and shift + -- others with the same urgency + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Store_In: FIFO is full." + + " Removed element in history at" + + Integer'Image (Frst))); + + loop + exit when Frst = Global_Data_Queue_Size + or else Urgencies (GDH (Frst)) < Urgency; + GDH (Frst) := GDH (Frst + 1); + Frst := Frst + 1; + end loop; + + if Frst = Global_Data_Queue_Size + and then Urgencies (GDH (Frst)) < Urgency then + -- Continue suppressing from the beginning + Frst := 1; + GDH (Global_Data_Queue_Size) := GDH (Frst); + loop + exit when Urgencies (GDH (Frst)) < Urgency; + GDH (Frst) := GDH (Frst + 1); + Frst := Frst + 1; + end loop; + end if; + end; + Replace := True; + else + -- Update the value of N_Empties only when this is the + -- first time we mark the partial queue as NOT empty. + + if Is_Empty then + N_Empties := N_Empties - 1; + end if; + + Is_Empty := False; + + if Last = FIFO_Size then + Last := Default_Index_Value; + elsif Global_Data_Queue_Size > 0 then + Last := Big_Port_Index_Type'Succ (Last); + end if; + + Global_Data_Queue (Last + Offset - 1) := P; + pragma Debug (Put_Line + (Verbose, + CE + + ": Store_In: Global storage position: " + + Integer'Image (Last + Offset - 1))); + + end if; + + -- Update the oldest updated port value + declare + Frst : Integer := GH_Last; + Lst : constant Integer := GH_Last; + GDH : Big_Port_Type_Array renames Global_Data_History; + begin + + -- Add an entry in the history + if not Replace then + H_Increment_Last (GH_Last); + end if; + + if GH_First /= GH_Last then + -- Search the first entry with a higher urgency + -- and shift other entries + if Frst = Global_Data_Queue_Size + and then Urgencies (GDH (Frst)) < Urgency then + GDH (GH_Last) := GDH (Frst); + Frst := Frst - 1; + end if; + loop + if Urgencies (GDH (Frst)) >= Urgency then + exit; + end if; + GDH (Frst + 1) := GDH (Frst); + Frst := Frst - 1; + exit when (GH_First <= Lst + and then Frst < GH_First) + or else Frst < 1; + end loop; + + if Frst < 1 and then GH_First > Lst then + -- Continue the search from the end + Frst := Global_Data_Queue_Size; + if Urgencies (GDH (Frst)) < Urgency then + GDH (1 mod GDH'Length) := GDH (Frst); + Frst := Frst - 1; + loop + if Urgencies (GDH (Frst)) >= Urgency then + exit; + end if; + GDH (Frst + 1) := GDH (Frst); + Frst := Frst - 1; + exit when Frst < GH_First; + end loop; + end if; + end if; + end if; + + -- Insert the port of the event + if Frst = Global_Data_Queue_Size then + GDH (1 mod GDH'Size) := PT; + -- The modulo avoids warning when accessing + -- GDH (1) while Queue_Size = 0 + pragma Debug (Put_Line + (Verbose, + CE + + ": Store_In: Insert event" + + " in history at: " + + Integer'Image (1))); + else + GDH (Frst + 1) := PT; + pragma Debug (Put_Line + (Verbose, + CE + + ": Store_In: Insert event" + + " in history at: " + + Integer'Image (Frst + 1))); + end if; + end; + + end if; + + -- Update the most recent value corresponding to port PT + + Set_Most_Recent_Value (PT, P, T); + + pragma Debug (Put_Line + (Verbose, + CE + + ": Store_In: Enqueued Event [Data] message" + + " for port " + + Thread_Port_Images (PT))); + + -- Update the barrier + + Not_Empty := True; + end if; + end if; + + -- If this is a data port, we only override the + -- Most_Recent_Value corresponding to the port. + + if not Is_Event (P_Kind) then + pragma Debug (Put_Line + (Verbose, + CE + + ": Store_In: Storing Data message in DATA port " + + Thread_Port_Images (PT))); + + Set_Most_Recent_Value (PT, P, T); + + pragma Debug (Put_Line + (Verbose, + CE + + ": Store_In: Stored Data message in DATA port " + + Thread_Port_Images (PT))); + + end if; + end Store_In; + + --------------- + -- Store_Out -- + --------------- + + procedure Store_Out (P : Port_Stream_Entry; T : Time) is + Thread_Interface : constant Thread_Interface_Type + := Stream_To_Interface (P.Payload); + PT : Port_Type renames Thread_Interface.Port; + begin + pragma Debug (Put_Line + (Verbose, + CE + + ": Store_Out: Storing value for sending: port " + + Thread_Port_Images (PT))); + + -- Mark as valid for sending + + Value_Put (PT) := True; + + pragma Debug (Put_Line + (Verbose, + CE + + ": Store_Out: Value stored for sending: port " + + Thread_Port_Images (PT))); + + -- No need to go through the Set_ routine since we are + -- sending, not receiving. + + Most_Recent_Values (PT) := P; + Time_Stamps (PT) := T; -- overwritten below + -- Maxime workaround for backdoor accesses + Time_Stamps (PT) := Ada.Real_time.clock; + + end Store_Out; + + ----------- + -- Count -- + ----------- + + function Count (T : Port_Type) return Integer is + Is_Empty : Boolean renames Empties (T); + First : Integer renames Firsts (T); + Last : Integer renames Lasts (T); + P_Kind : Port_Kind renames Thread_Port_Kinds (T); + FIFO_Size : Integer renames Thread_FIFO_Sizes (T); + begin + -- This subprogram is called only when the thread has IN + -- ports. + + pragma Assert (Is_In (P_Kind)); + + if not Initialized (T) then + pragma Debug (Put_Line + (Verbose, + CE + + ": Count: Not initialized port: " + + Thread_Port_Images (T))); + + return -1; + + elsif Is_Empty then + pragma Debug (Put_Line + (Verbose, + CE + + ": Count: Empty FIFO for port " + + Thread_Port_Images (T))); + + return 0; + + elsif FIFO_Size = 0 then + pragma Debug (Put_Line + (Verbose, + CE + + ": Count: No FIFO for port " + + Thread_Port_Images (T))); + + return 0; + + else + pragma Debug (Put_Line + (Verbose, + CE + + ": Count: FIFO exists for port " + + Thread_Port_Images (T))); + + if Last >= First then + -- First configuration + + -- ------------------------------------------------------- + -- | |x|xxxxxxxxxxxxxxxxxxxxxxxxx|x| | + -- ------------------------------------------------------- + -- First Last + + return (Last - First) + 1; + + else + -- Second configuration + + -- ------------------------------------------------------- + -- |xxxxxxxxx|x| |x|xxxxxxxxxxxxx| + -- ------------------------------------------------------- + -- Last First + + return FIFO_Size - First + Last + 1; + end if; + end if; + end Count; + + --------------------------- + -- Get_Most_Recent_Value -- + --------------------------- + + function Get_Most_Recent_Value + (P : Port_Type) + return Port_Stream_Entry + is + First : Integer renames Firsts (P); + Last : Integer renames Lasts (P); + P_Kind : Port_Kind renames Thread_Port_Kinds (P); + FIFO_Size : Integer renames Thread_FIFO_Sizes (P); + Offset : Integer renames Thread_FIFO_Offsets (P); + T : constant Time := Clock; + S : Port_Stream_Entry; + begin + if Has_Event_Ports then + if Is_Event (P_Kind) then + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: event [data] port " + + Thread_Port_Images (P))); + + S := Most_Recent_Values (P); + end if; + end if; + if not Is_Event (P_Kind) then + if FIFO_Size = 1 then + -- Immediate connection + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: data port " + + Thread_Port_Images (P) + + ". Immediate connection")); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: First =" + + Integer'Image (First))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: Last = " + + Integer'Image (Last))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: Offset = " + + Integer'Image (Offset))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: Global_Data_Queue_Size = " + + Integer'Image (Global_Data_Queue_Size))); + + S := Global_Data_Queue (First + Offset - 1); + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: Most recent value for" + + " data port " + + Thread_Port_Images (P) + + " got. Immediate connection")); + else + -- Delayed connection: The element indexed by First is + -- the oldest element and the element indexed by Last + -- is the most recent element. + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: data port " + + Thread_Port_Images (P) + + ". Delayed connection")); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: First = " + + Integer'Image (First))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: Last = " + + Integer'Image (Last))); + pragma Debug + (Put_Line + (Verbose, + " Offset = " + Integer'Image (Offset))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: Global_Data_Queue_Size = " + + Integer'Image (Global_Data_Queue_Size))); + + if Time_Stamps (P) <= T then + pragma Debug + (Put_Line + (Verbose, + CE + ": Get_Most_Recent_Value: Getting NEW value")); + + S := Global_Data_Queue (Last + Offset - 1); + else + pragma Debug + (Put_Line + (Verbose, + CE + ": Get_Most_Recent_Value: Getting OLD value")); + + S := Global_Data_Queue (First + Offset - 1); + end if; + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Get_Most_Recent_Value: Most recent value" + + " for data port " + + Thread_Port_Images (P) + + " got. Delayed connection")); + end if; + end if; + + return S; + end Get_Most_Recent_Value; + + --------------------------- + -- Set_Most_Recent_Value -- + --------------------------- + + procedure Set_Most_Recent_Value + (P : Port_Type; + S : Port_Stream_Entry; + T : Time) + is + First : Big_Port_Index_Type renames Firsts (P); + Last : Big_Port_Index_Type renames Lasts (P); + P_Kind : Port_Kind renames Thread_Port_Kinds (P); + FIFO_Size : Integer renames Thread_FIFO_Sizes (P); + Offset : Integer renames Thread_FIFO_Offsets (P); + begin + if Global_Data_Queue_Size = 0 then + -- XXX Actually, if the queue has a null size, this + -- function is never called, hence we can exit + -- immediatly. This should be captured with a proper + -- pre-condition. We need this trap to avoid GNATProve + -- attempting to prove the code below in this particular + -- case. + return; + end if; + + if Has_Event_Ports then + if Is_Event (P_Kind) then + pragma Debug (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: event [data] port " + + Thread_Port_Images (P))); + + Most_Recent_Values (P) := S; + + pragma Debug (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: event [data] port " + + Thread_Port_Images (P) + + ". Done.")); + end if; + end if; + if not Is_Event (P_Kind) then + Time_Stamps (P) := T; + + if FIFO_Size = 1 then + -- Immediate connection + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: data port " + + Thread_Port_Images (P) + + ". Immediate connection")); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: First =" + + Integer'Image (First))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: Last = " + + Integer'Image (Last))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: Offset = " + + Integer'Image (Offset))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: Global_Data_Queue_Size = " + + Integer'Image (Global_Data_Queue_Size))); + + Global_Data_Queue (First + Offset - 1) := S; + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: Most recent value" + + " for data port " + + Thread_Port_Images (P) + + " set. Immediate connection")); + else + -- Delayed connection: The element indexed by First must be + -- the oldest element and the element indexed by Last + -- is the most recent element. + -- XXX JH: why? + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: data port " + + Thread_Port_Images (P) + + ". Delayed connection")); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: First = " + + Integer'Image (First))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: Last = " + + Integer'Image (Last))); + pragma Debug + (Put_Line + (Verbose, + " Offset = " + Integer'Image (Offset))); + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: Global_Data_Queue_Size = " + + Integer'Image (Global_Data_Queue_Size))); + + Global_Data_Queue (First + Offset - 1) := + Global_Data_Queue (Last + Offset - 1); + Global_Data_Queue (Last + Offset - 1) := S; + + pragma Debug + (Put_Line + (Verbose, + CE + + ": Set_Most_Recent_Value: Most recent value" + + " for data port " + + Thread_Port_Images (P) + + " set. Delayed connection")); + end if; + end if; + end Set_Most_Recent_Value; + + -------------------- + -- Get_Time_Stamp -- + -------------------- + + function Get_Time_Stamp (P : Port_Type) return Time is + begin + pragma Debug (Put_Line + (Verbose, + CE + + ": Get_Time_Stamp: port " + + Thread_Port_Images (P))); + + return Time_Stamps (P); + end Get_Time_Stamp; + + ----------------------- + -- H_Increment_First -- + ----------------------- + + procedure H_Increment_First (F : in out Big_Port_Index_Type) is + begin + if Big_Port_Index_Type'Last > 0 then + if F < Big_Port_Index_Type'Last then + F := Big_Port_Index_Type'Succ (F); + else + F := Default_Index_Value; + end if; + + pragma Debug (Put_Line + (Verbose, + CE + + ": H_Increment_First: F =" + + Integer'Image (F))); + end if; + end H_Increment_First; + + ---------------------- + -- H_Increment_Last -- + ---------------------- + + procedure H_Increment_Last (L : in out Big_Port_Index_Type) is + begin + if Big_Port_Index_Type'Last > 0 then + if L < Big_Port_Index_Type'Last then + L := Big_Port_Index_Type'Succ (L); + else + L := Default_Index_Value; + end if; + + pragma Debug (Put_Line + (Verbose, + CE + + ": H_Increment_Last: L =" + + Integer'Image (L))); + end if; + end H_Increment_Last; + + -------- + -- CE -- + -------- + + function CE return String is + begin + return Entity_Image (Current_Entity); + end CE; + +end PolyORB_HI.Unprotected_Queue; diff --git a/src/polyorb_hi-unprotected_queue.ads b/src/polyorb_hi-unprotected_queue.ads new file mode 100644 index 0000000..8701612 --- /dev/null +++ b/src/polyorb_hi-unprotected_queue.ads @@ -0,0 +1,319 @@ +------------------------------------------------------------------------------ +-- -- +-- PolyORB HI COMPONENTS -- +-- -- +-- P O L Y O R B _ H I . U N P R O T E C T E D _ Q U E U E -- +-- -- +-- S p e c -- +-- -- +-- Copyright (C) 2014 ESA & ISAE. -- +-- -- +-- PolyORB HI is free software; you can redistribute it and/or modify it -- +-- under terms of the GNU General Public License as published by the Free -- +-- Software Foundation; either version 2, or (at your option) any later. -- +-- PolyORB HI is distributed in the hope that it will be useful, but -- +-- WITHOUT ANY WARRANTY; without even the implied warranty of -- +-- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General -- +-- Public License for more details. You should have received a copy of the -- +-- GNU General Public License distributed with PolyORB HI; see file -- +-- COPYING. If not, write to the Free Software Foundation, 51 Franklin -- +-- Street, Fifth Floor, Boston, MA 02111-1301, USA. -- +-- -- +-- As a special exception, if other files instantiate generics from this -- +-- unit, or you link this unit with other files to produce an executable, -- +-- this unit does not by itself cause the resulting executable to be -- +-- covered by the GNU General Public License. This exception does not -- +-- however invalidate any other reasons why the executable file might be -- +-- covered by the GNU Public License. -- +-- -- +-- PolyORB-HI/Ada is maintained by the TASTE project -- +-- (taste-users@lists.tuxfamily.org) -- +-- -- +------------------------------------------------------------------------------ + +with Ada.Unchecked_Conversion; +With Ada.Real_Time; +With System; + +with PolyORB_HI_Generated.Deployment; + +with PolyORB_HI.Errors; +with PolyORB_HI.Messages; +with PolyORB_HI.Port_Kinds; +with PolyORB_HI.Streams; + +generic + + type Port_Type is (<>); + -- This should be an enumeration type that represent the port list + -- of a given thread. + + type Integer_Array is array (Port_Type) of Integer; + -- An array type to specify the port FIFO sizes and urgencies. + + type Port_Kind_Array is array (Port_Type) of Port_Kinds.Port_Kind; + -- An array type to specify the kind of each port. + + type Port_Image_Array is array (Port_Type) of + PolyORB_HI_Generated.Deployment.Port_Sized_String; + -- An array type to specify the image of each port. + + type Address_Array is array (Port_Type) of System.Address; + -- An array to specify a list of arrays of various sizes. + + type Overflow_Protocol_Array is array (Port_Type) of + Port_Kinds.Overflow_Handling_Protocol; + -- An array to specify the overflow_handling_protocol of each port + + type Thread_Interface_Type (Port : Port_Type) is private; + -- This is a discriminated record type that represents a thread + -- port. The discriminant of this type must have a default value + -- so that the type size can be computed at compile time. + + Current_Entity : in PolyORB_HI_Generated.Deployment.Entity_Type; + -- Indicate the thread for which this package has been + -- instantiated. + + Thread_Port_Kinds : in Port_Kind_Array; + -- For each port, a value indicates the kind and the orientation + -- of the port. + + Has_Event_Ports : in Boolean; + -- True if Thread_Port_Kinds contains Event or Event Data ports + + Thread_Port_Images : in Port_Image_Array; + -- For each port, a string indicates the image of the port. + + Thread_Fifo_Sizes : in Integer_Array; + -- This array gives for each port the FIFO size (depending on the + -- port nature or on an AADL property associated to the port. FIFO + -- size for IN DATA ports is either 1 (immediate connection) or 2 + -- (delayed connection). In this case (data ports), the value must + -- not be interpreted as a FIFO size but rather a way to support + -- delayed connections. By convention, FIFO size for all OUT ports + -- must be set to -1 by the code generator. + + Thread_Fifo_Offsets : in Integer_Array; + -- This array holds an incremental value of the queue size for + -- each IN [event] [data] port. For each IN [event] [data] port, the + -- corresponding offset value is 1 + the sum of all queue sized of + -- the ports declared before it. For other port kinds, the value + -- must be 0. We give this array as a generic formal instead of + -- computing it in this package to guarantee an O(1) access time + -- for queue elements. + + Thread_Overflow_Protocols : in Overflow_Protocol_array; + -- This array gives for each port the Overflow_Handling_Protocol + -- depending on the AADL property associated to the port. + + Urgencies : in Integer_Array; + -- This array gives for each port the Urgency depending on the + -- AADL property associated to the port. + + Global_Data_Queue_Size : in Integer; + -- The sum of all IN [event] [data] port queue sizes. Giving this + -- value as a generic formal in spite of the possibility of + -- deducing it from Thread_Fifo_Sizes is done to guarantee static + -- allocation of the global message queue of the thread. + + N_Destinations : in Integer_Array; + -- For each OUT port, we give the number of destinations. This + -- will be used to know the length of each element of the array + -- below. + + Destinations : in Address_Array; + -- For each OUT port, we give the address of an constant + -- Entity_Type array containing the list of all the destination of + -- the port. For IN ports, we give Null_Address. + + with procedure Marshall + (R : Thread_Interface_Type; + M : in out PolyORB_Hi.Messages.Message_Type); + -- A procedure that marshalls a Thread port content into a message. + + with function Next_Deadline return Ada.Real_Time.Time; + -- To indicate when does the next deadline of the thread occur (in + -- absolute time). + +package PolyORB_HI.Unprotected_Queue is + + use Ada.Real_Time; + use PolyORB_HI_Generated.Deployment; + use PolyORB_HI.Streams; + + -- The types and the routines below give a flexible way to handle + -- Thread_Interface_Type variables and to store them in arrays. + + type Port_Stream is + new PolyORB_HI.Streams.Stream_Element_Array + (1 .. Thread_Interface_Type'Size / 8); + + type Port_Stream_Entry is record + From : Entity_Type; + Payload : Port_Stream; + end record; + -- A couple of a message and its sender + + N_Ports : constant Integer := + Port_Type'Pos (Port_Type'Last) - Port_Type'Pos (Port_Type'First) + 1; + -- Number of ports in the thread + + function Interface_To_Stream is + new Ada.Unchecked_Conversion (Thread_Interface_Type, Port_Stream); + function Stream_To_Interface is + new Ada.Unchecked_Conversion (Port_Stream, Thread_Interface_Type); + + function CE return String; + pragma Inline (CE); + -- Shortcut to Entity_Image (Current_Entity) + + type Port_Stream_Array is array (Port_Type) of Port_Stream_Entry; + + subtype Big_Port_Index_Type is Integer range 0 .. Global_Data_Queue_Size; + + Default_Index_Value : constant Big_Port_Index_Type + := (if Global_Data_Queue_Size > 0 then 1 else 0); + + type Big_Port_Stream_Array is + array (Big_Port_Index_Type) of Port_Stream_Entry; + type Big_Port_Type_Array is array (Big_Port_Index_Type) of Port_Type; + -- FIXME: We begin by 0 although the 0 position is unused. We do + -- this to avoid compile time warning. After this package is + -- deeply tested, begin by 1 and disable Index_Check and + -- Range_Check for the Big_Port_Stream_Array type. + + type Port_Index_Array is array (Port_Type) of Big_Port_Index_Type; + -- An array type to specify the port FIFO sizes and urgencies. + + procedure H_Increment_First (F : in out Big_Port_Index_Type); + procedure H_Increment_Last (L : in out Big_Port_Index_Type); + pragma Inline (H_Increment_First); + pragma Inline (H_Increment_Last); + -- Cyclic incrementation and decrementation of F or L within the + -- 1..Global_Data_Queue_Size range. + + type Boolean_Array is array (Port_Type) of Boolean; + type Time_Array is array (Port_Type) of Time; + + procedure Read_Event + (P : out Port_Type; + Valid : out Boolean; + Not_Empty : Boolean); + -- Same as 'Wait_Event' but without blocking. Valid is set to + -- False if there is nothing to receive. + + procedure Dequeue + (T : Port_Type; P : out Port_Stream_Entry; Not_Empty : out Boolean); + -- Dequeue a value from the partial FIFO of port T. If there is + -- no enqueued value, return the latest dequeued value. + + function Read_In (T : Port_Type) return Port_Stream_Entry; + -- Read the oldest queued value on the partial FIFO of IN port + -- T without dequeuing it. If there is no queued value, return + -- the latest dequeued value. + + function Read_Out (T : Port_Type) return Port_Stream_Entry; + -- Return the value put for OUT port T. + + function Is_Invalid (T : Port_Type) return Boolean; + -- Return True if no Put_Value has been called for this port + -- since the last Set_Invalid call. + + procedure Set_Invalid (T : Port_Type); + -- Set the value stored for OUT port T as invalid to impede its + -- future sending without calling Put_Value. This procedure is + -- generally called just after Read_Out. However we cannot + -- combine them in one routine because we need Read_Out to be a + -- function and functions cannot modify protected object + -- states. + + procedure Store_In + (P : Port_Stream_Entry; T : Time; Not_Empty : out Boolean); + -- Stores a new incoming message in its corresponding + -- position. If this is an event [data] incoming message, then + -- stores it in the queue, updates its most recent value and + -- unblock the barrier. Otherwise, it only overrides the most + -- recent value. T is the time stamp associated to the port + -- P. In case of data ports with delayed connections, it + -- indicates the instant from which the data of P becomes + -- deliverable. + + procedure Store_Out (P : Port_Stream_Entry; T : Time); + -- Store a value of an OUT port to be sent at the next call to + -- Send_Output and mark the value as valid. + + function Count (T : Port_Type) return Integer; + -- Return the number of pending messages on IN port T. + + function Get_Time_Stamp (P : Port_Type) return Time; + -- Return the time stamp associated to port T + + -- The following are accessors to some internal data of the event queue + + function Get_Most_Recent_Value (P : Port_Type) return Port_Stream_Entry; + procedure Set_Most_Recent_Value + (P : Port_Type; + S : Port_Stream_Entry; + T : Time); + -- The protected object contains also an array to store the + -- values of received IN DATA ports as well as the most recent + -- value of IN EVENT DATA. For OUT port, the value is the + -- message to be send when Send_Output is called. In case of an + -- event data port, we do not use the 2 elements of the array + -- to store most recent values because there is no delayed + -- connections for event data ports. + +private + Global_Data_Queue : Big_Port_Stream_Array; + -- The structure of the buffer is as follows: + + -- ---------------------------------------------------------------- + -- | Q1 | Q2 | Q3 | ... | Qn | + -- ---------------------------------------------------------------- + -- O1 O2 O3 O4 ... On + + -- 'On' is the offset associated to IN [event] [data] port n, + -- given from the generic formal, Thread_FIFO_Offsets. This + -- guarantees an O(1) access and storage time of a given + -- element in the global queue. Intrinsically, the global table + -- is a concatenation of circular arrays each one corresponding + -- to a port queue. + + Firsts : Port_Index_Array := (Port_Type'Range => Default_Index_Value); + Lasts : Port_Index_Array := (Port_Type'Range => 0); + -- Used for IN [event] [data] ports to navigate in the global + -- queue. For IN DATA ports, in case of immediate connection + -- only the 'Lasts' value is relevant and it is 0 or 1, in case + -- of a delayed connection both values are relevant. + + Empties : Boolean_Array := (Port_Type'Range => True); + -- Indicates whether each port-FIFO is empty or not + + Global_Data_History : Big_Port_Type_Array; + GH_First : Big_Port_Index_Type := Default_Index_Value; + GH_Last : Big_Port_Index_Type := 0; + -- This contains, in an increasing chronological order the IN + -- EVENT ports that have a pending event. Example (P_1, P_3, + -- P_1, P_2, P_3) means that the oldest pending message is + -- received on P_1 then on P_3, then on P_1 again and so on... + + -- FIXME: Add N_Ports to the array size to handle the case the + -- thread has an IN event [data] port with a FIFO size equal to + -- zero which is not supported yet. + + Most_Recent_Values : Port_Stream_Array; + Time_Stamps : Time_Array; + + Initialized : Boolean_Array := (Port_Type'Range => False); + -- To indicate whether the port ever received a data (or an + -- event). + + Value_Put : Boolean_Array := (Port_Type'Range => False); + -- To indicate whether the OUT port values have been set in + -- order to be sent. + + N_Empties : Integer := N_Ports; + -- Number of empty partial queues. At the beginning, all the + -- queues are empty. + +end PolyORB_HI.Unprotected_Queue; -- GitLab