Commit 4dc61ebb authored by Maxime Perrotin's avatar Maxime Perrotin
Browse files

Test interaction with the simulation lib

Send events and read back system state
parent ce4e4a5e
......@@ -3,32 +3,64 @@
import ctypes
import observer_asn
from opengeode import Asn1scc
from asn1_value_editor import vn
import observer_asn
dll = ctypes.CDLL ("libsimulator.so")
# Get the ASN.1 AST from python.stg
ASN1_AST=Asn1scc.parse_asn1(["observer.asn"],
rename_policy=Asn1scc.ASN1.RenameOnlyConflicting,
ast_version=Asn1scc.ASN1.UniqueEnumeratedNames,
flags=[Asn1scc.ASN1.AstOnly]).types
# it is possible to run the model checker:
#dll.mc_run_exhaustive()
evt = observer_asn.Observable_Event()
def get_system_state():
''' Return the ASN.1 ctypes value of the current system state '''
get_state = dll.global_state
class ReturnPointer(ctypes.Structure): pass
ReturnHandle = ctypes.POINTER(ReturnPointer)
get_state.restype = ReturnHandle
state_ptr = get_state()
# DONT USE THE FOLLOWING, IT DOES NOT WORK, USE SetData INSTEAD
#system_state = observer_asn.System_State (state_ptr)
system_state = observer_asn.System_State()
system_state.SetData(state_ptr)
return system_state
def send_event(gser_event : str) -> None:
''' Send an event (input or output) to the TASTE system '''
evt = observer_asn.Observable_Event()
# Convert the GSER input into the C instance of the type (evt)
vn.valueNotationToCTypes(gser_event,
evt,
ASN1_AST["Observable-Event"].type,
observer_asn,
ASN1_AST)
evt.Reset()
# Send the event to the DLL
dll.send_event(evt._ptr)
# out events's effect is to create a corresponding input event in the receivers's queue
testEvent = "output-event: {source simulator-gui, dest orchestrator, event simulator-gui: msg-out: pulse: {p1 4}}"
send_event(testEvent)
print(get_system_state().GSER())
# in events are calling the actual PI, so it is better (out events make sense if observers are called between
# the out and in)
testEvent = "input-event: {source simulator-gui, dest orchestrator, event orchestrator: msg-in: pulse: {p1 4}}"
send_event(testEvent)
print(get_system_state().GSER())
#queue = dll.global_event_queue (type Events_Ty)
# this would possibly work
queue_ptr = ctypes.c_int.in_dll(dll, "global_event_queue")
asn1_queue = observer_asn.Events_Ty (queue_ptr)
# this sequence works:
from opengeode import Asn1scc
ASN1_AST=Asn1scc.parse_asn1(["observer.asn"], rename_policy=Asn1scc.ASN1.RenameOnlyConflicting, ast_version=Asn1scc.ASN1.UniqueEnumeratedNames, flags=[Asn1scc.ASN1.AstOnly]).types
from asn1_value_editor import vn
import observer_asn
vn.valueNotationToCTypes(testEvent, evt, ASN1_AST["Observable-Event"].type, observer_asn, ASN1_AST)
evt.Reset()
dll.send_event(evt._ptr)
print(asn1_queue.GSER())
# get global system state
state_ptr = ctypes.c_int.in_dll(dll, "global_state")
system_state = observer_asn.System_State (state_ptr)
print(system_state.GSER())
#queue_ptr = ctypes.c_int.in_dll(dll, "global_event_queue")
#asn1_queue = observer_asn.Events_Ty()
#asn1_queue.SetData(queue_ptr)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment