Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
TASTE
asn1-value-editor
Commits
8fb23b54
Commit
8fb23b54
authored
Jul 13, 2015
by
Maxime Perrotin
Browse files
Add Overture handler
parent
333a0407
Changes
4
Hide whitespace changes
Inline
Side-by-side
asn1_value_editor/asn1_value_editor.py
View file @
8fb23b54
...
...
@@ -525,80 +525,12 @@ class asn1Editor(QTreeView):
ASN1_AST: full AST generated by ASN1SCC
'''
var
=
self
.
getVariable
(
root
).
popitem
()[
1
]
def
reach
(
field
,
orig
,
idx
=
True
):
''' Helper: move swig pointer to the next field, and optionaly
index (if idx=True)
Inputs: field is a string with optional index (e.g. "a[0]")
orig is the swig pointer
idx: set to true if you want the index to be reached
'''
split
=
field
.
strip
(
']'
).
split
(
'['
)
ptr
=
getattr
(
orig
,
split
[
0
])
if
split
else
orig
if
len
(
split
)
>
1
and
idx
:
# SEQOF index
ptr
=
ptr
[
int
(
split
[
1
])]
def
rec
(
inp
,
outp
,
sort
):
''' Recursively fill up the value '''
if
sort
.
kind
==
'ReferenceType'
:
sort
=
ASN1_AST
[
sort
.
ReferencedTypeName
]
if
isinstance
(
inp
,
list
):
# SEQUENCE OF
# get the path to the sequence of
_
,
params
,
path
=
outp
.
GetState
()
if
path
:
path
=
path
.
strip
(
'.'
).
split
(
'.'
)
for
i
in
range
(
len
(
inp
)):
outp
.
Reset
()
for
each
in
path
:
reach
(
each
,
outp
)
# Follow the ASN.1 type in the AST from ASN1SCC
rec
(
inp
[
i
],
outp
[
i
],
sort
.
type
.
type
)
if
sort
.
type
.
Min
!=
sort
.
type
.
Max
:
outp
.
Reset
()
for
each
in
path
:
reach
(
each
,
outp
)
# The ASN1SCC AST only knows if the list has a fixed length
outp
.
SetLength
(
len
(
inp
))
elif
isinstance
(
inp
,
(
int
,
float
,
bool
)):
outp
.
Set
(
inp
)
elif
isinstance
(
inp
,
dict
):
if
'Enum'
in
inp
:
# Get proper enum id from the ASN1SCC AST
enum_id
=
sort
.
type
.
EnumValues
[
inp
[
'Enum'
].
replace
(
'_'
,
'-'
)].
EnumID
val
=
getattr
(
ASN1Swig
.
DV
,
enum_id
)
outp
.
Set
(
val
)
elif
'Choice'
in
inp
:
child_name
=
inp
[
'Choice'
]
ch_ty
=
sort
.
type
.
Children
[
child_name
.
replace
(
'_'
,
'-'
)]
enum_val
=
getattr
(
ASN1Swig
.
DV
,
ch_ty
.
EnumID
)
outp
.
kind
.
Set
(
enum_val
)
rec
(
inp
[
child_name
],
getattr
(
outp
,
child_name
.
replace
(
'-'
,
'_'
)),
ch_ty
.
type
)
else
:
# SEQUENCE
# get the path to the sequence
_
,
params
,
path
=
outp
.
GetState
()
if
path
:
path
=
path
.
strip
(
'.'
).
split
(
'.'
)
for
field
,
data
in
inp
.
viewitems
():
outp
.
Reset
()
for
each
in
path
:
# Reach the path, including indexes
reach
(
each
,
outp
)
# Then get the field itself
reach
(
field
.
replace
(
'-'
,
'_'
),
outp
)
field_ty
=
sort
.
type
.
Children
[
field
.
replace
(
'_'
,
'-'
)]
rec
(
data
,
outp
,
field_ty
.
type
)
# move back to original path
outp
.
Reset
()
if
len
(
path
):
reach
(
path
[
0
],
outp
)
else
:
self
.
log
.
error
(
'Unsupported type in to_asn1scc_swig'
)
rec
(
var
,
dest
,
sort
)
vn
.
valueNotationToSwig
(
gser
=
None
,
var
=
var
,
dest
=
dest
,
ASN1Swig
=
ASN1Swig
,
sort
=
sort
,
ASN1_AST
=
ASN1_AST
)
def
sendTC
(
self
):
...
...
asn1_value_editor/sdlHandler.py
View file @
8fb23b54
...
...
@@ -34,6 +34,8 @@ import asn1_value_editor
from
standalone_editor
import
asn1sccToasn1ValueEditorTypes
import
vn
import
resources
import
vdmHandler
try
:
import
opengeode
except
ImportError
:
...
...
@@ -235,6 +237,8 @@ class sdlHandler(QObject):
# Placeholder to keep a list of properties to be checked at runtime
self
.
properties
=
{}
self
.
prop_dll
=
None
# VDM handler instance, when needed for external procedures calls
self
.
vdm
=
None
@
property
def
dll
(
self
):
...
...
@@ -636,6 +640,22 @@ class sdlHandler(QObject):
else
:
interface
[
'out'
].
append
((
spec
,
as_bytes
,
asn1_instance
))
# HERE: make the function call
vdm_cls
=
getattr
(
procedure
,
"vdm_instance"
,
None
)
if
vdm_cls
:
inp_as_vdm
=
[]
for
_
,
asn1_instance
in
interface
[
'in'
]:
inp_as_vdm
.
append
(
asn1_instance
.
GSER
())
outp
=
self
.
vdm
.
call_function
(
vdm_cls
,
name
,
','
.
join
(
inp_as_vdm
))
print
"CALLED VDM. Result ="
,
outp
# Use the pyside intermediate form to get the SWIG data
spec
,
_
,
out_asn1_inst
=
interface
[
'out'
][
0
]
# assuming 1 return parameter
vn
.
valueNotationToSwig
(
gser
=
outp
,
dest
=
out_asn1_inst
,
sort
=
spec
[
'type'
],
ASN1_AST
=
self
.
proc
.
dataview
,
ASN1Swig
=
ASN1
)
#spec_in, asn1_inst_in = interface['in'][0]
#spec_out, as_bytes, asn1_inst_out = interface['out'][0]
#asn1_inst_out.Set(asn1_inst_in.Get() + 1)
...
...
@@ -731,6 +751,17 @@ class sdlHandler(QObject):
if
"#vdm"
in
cmt
:
print
(
u
'Procedure {} in VDM... Connecting to Overture'
.
format
(
each
.
inputString
))
if
not
self
.
vdm
:
self
.
vdm
=
vdmHandler
.
vdmHandler
()
tokens
=
each
.
comment
.
inputString
.
split
(
"#"
)
classname
=
None
for
tok
in
tokens
:
if
tok
.
startswith
(
"classname"
):
#TODO: catch exception
classname
=
tok
.
split
(
"="
)[
1
]
inst
=
self
.
vdm
.
instanciate_class
(
classname
)
each
.
vdm_instance
=
inst
elif
"#c"
in
cmt
:
print
(
u
'Procedure {} in C... Loading DLL'
.
format
(
each
.
inputString
))
...
...
asn1_value_editor/vdmHandler.py
0 → 100644
View file @
8fb23b54
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pylint: disable=C0302
"""
ASN.1 Value Editor - VDM / Overture handler
Connect to Overture to forward calls to VDM functions from other
parts of the system.
The interface to Overture is done using sockets, and functions
calls pass parameters using the VDM notation:
integers: "42"
reals : "42.0"
setof : "{1, 2, 3}"
sequence of: not supported, use setof
sequence, choice: not supported
The SET OF and numerical data are similar to the ASN.1 Value notation
Support for other types will be added lated
Copyright (c) 2012-2015 European Space Agency
Designed and implemented by Maxime Perrotin
Contact: maxime.perrotin@esa.int
License: LGPL
"""
import
socket
import
os
import
ctypes
import
itertools
from
functools
import
partial
from
itertools
import
chain
from
standalone_editor
import
asn1sccToasn1ValueEditorTypes
import
vn
try
:
import
dataview_uniq_asn
as
ASN1
except
ImportError
:
ASN1
=
None
class
vdmHandler
(
object
):
'''
Class managing the Overture interface to execute VDM code
'''
# Configurable: max size of a packet sent through the socket
MAX_LEN
=
256
def
__init__
(
self
,
host
=
"127.0.0.1"
,
port
=
1234
):
''' Create a socket and connect to Overture '''
self
.
socket
=
socket
.
socket
(
socket
.
AF_INET
,
socket
.
SOCK_STREAM
)
self
.
socket
.
connect
((
host
,
port
))
self
.
classes
=
[]
# Unique id incremented when an instance of a class is created
self
.
inst
=
0
def
send_message
(
self
,
msg
):
''' Send a command (text string) to Overture using the socket '''
assert
self
.
socket
,
"Not connected to Overture!"
sent
=
self
.
socket
.
send
(
msg
+
"
\n
"
)
if
sent
==
0
:
print
'sent = 0'
raise
RuntimeError
(
"Lost connection to Overture"
)
def
receive_message
(
self
):
''' Wait for a message from Overture '''
msg
=
[]
bytes_recvd
=
0
while
bytes_recvd
<
self
.
MAX_LEN
:
char
=
self
.
socket
.
recv
(
1
)
if
not
char
:
raise
RuntimeError
(
"Lost connection to Overture"
)
msg
.
append
(
char
)
bytes_recvd
+=
1
if
char
in
(
'
\n
'
,
'
\r
'
):
break
return
''
.
join
(
msg
)
def
instanciate_class
(
self
,
classname
):
''' Create a VDM class instance '''
instance
=
"{cls}_{id}"
.
format
(
cls
=
classname
.
lower
(),
id
=
self
.
inst
)
self
.
send_message
(
"create {} := new {}()"
.
format
(
instance
,
classname
))
ans
=
self
.
receive_message
()
if
not
ans
.
startswith
(
"OK"
):
raise
RuntimeError
(
"Class creation failed: "
+
ans
)
self
.
inst
+=
1
return
instance
def
call_function
(
self
,
instance
,
func
,
in_param
):
''' Call a VDM function from class instance '''
self
.
send_message
(
"print {}.{}({})"
.
format
(
instance
,
func
,
in_param
))
ans
=
self
.
receive_message
()
if
not
ans
.
startswith
(
"OK"
):
raise
RuntimeError
(
"Function execution failure "
+
ans
)
return
ans
[
2
:].
strip
()
asn1_value_editor/vn.py
View file @
8fb23b54
...
...
@@ -145,7 +145,6 @@ BitStringLiteral.setParseAction(lambda s, l, t: ''.join([chr(int(t[1][i:i+9], 2)
def
fromValueNotationToPySide
(
varName
,
string
):
''' Return a variable compatible with the ASN.1 Editor from a GSER string'''
# TODO: catch Exceptions
try
:
return
{
varName
:
value
.
parseString
(
string
,
True
)[
0
]}
except
ParseException
as
err
:
...
...
@@ -197,6 +196,98 @@ def toASN1ValueNotation(val):
result
=
str
(
val
)
# INTEGER and REAL
return
result
def
valueNotationToSwig
(
gser
,
dest
,
sort
,
ASN1Swig
,
ASN1_AST
,
var
=
None
):
''' Parse a GSER value and fill a SWIG variable with it
Inputs:
gser : input GSER string
dest : output SWIG instance to be filled
ASN1Swig : python module containing SWIG DV access
sort: ASN1 typename, with dashes, no underscores
ASN1_AST : AST generated by ASN1SCC
var : optional already pyside-converted GSER string
Outputs:
none - "dest" is modified by this function
'''
#var = var or fromValueNotationToPySide("input", gser).popitem()[1]
var
=
var
or
value
.
parseString
(
gser
,
True
)[
0
]
def
reach
(
field
,
orig
,
idx
=
True
):
''' Helper: move swig pointer to the next field, and optionaly
index (if idx=True)
Inputs: field is a string with optional index (e.g. "a[0]")
orig is the swig pointer
idx: set to true if you want the index to be reached
'''
split
=
field
.
strip
(
']'
).
split
(
'['
)
ptr
=
getattr
(
orig
,
split
[
0
])
if
split
else
orig
if
len
(
split
)
>
1
and
idx
:
# SEQOF index
ptr
=
ptr
[
int
(
split
[
1
])]
def
rec
(
inp
,
outp
,
sort
):
''' Recursively fill up the value '''
if
sort
.
kind
==
'ReferenceType'
:
sort
=
ASN1_AST
[
sort
.
ReferencedTypeName
]
if
isinstance
(
inp
,
list
):
# SEQUENCE OF
# get the path to the sequence of
_
,
params
,
path
=
outp
.
GetState
()
if
path
:
path
=
path
.
strip
(
'.'
).
split
(
'.'
)
for
i
in
range
(
len
(
inp
)):
outp
.
Reset
()
for
each
in
path
:
reach
(
each
,
outp
)
# Follow the ASN.1 type in the AST from ASN1SCC
rec
(
inp
[
i
],
outp
[
i
],
sort
.
type
.
type
)
if
sort
.
type
.
Min
!=
sort
.
type
.
Max
:
outp
.
Reset
()
for
each
in
path
:
reach
(
each
,
outp
)
# The ASN1SCC AST only knows if the list has a fixed length
outp
.
SetLength
(
len
(
inp
))
elif
isinstance
(
inp
,
(
int
,
float
,
bool
)):
outp
.
Set
(
inp
)
elif
isinstance
(
inp
,
dict
):
if
'Enum'
in
inp
:
# Get proper enum id from the ASN1SCC AST
enum_id
=
sort
.
type
.
EnumValues
[
inp
[
'Enum'
].
replace
(
'_'
,
'-'
)].
EnumID
val
=
getattr
(
ASN1Swig
.
DV
,
enum_id
)
outp
.
Set
(
val
)
elif
'Choice'
in
inp
:
child_name
=
inp
[
'Choice'
]
ch_ty
=
sort
.
type
.
Children
[
child_name
.
replace
(
'_'
,
'-'
)]
enum_val
=
getattr
(
ASN1Swig
.
DV
,
ch_ty
.
EnumID
)
outp
.
kind
.
Set
(
enum_val
)
rec
(
inp
[
child_name
],
getattr
(
outp
,
child_name
.
replace
(
'-'
,
'_'
)),
ch_ty
.
type
)
else
:
# SEQUENCE
# get the path to the sequence
_
,
params
,
path
=
outp
.
GetState
()
if
path
:
path
=
path
.
strip
(
'.'
).
split
(
'.'
)
for
field
,
data
in
inp
.
viewitems
():
outp
.
Reset
()
for
each
in
path
:
# Reach the path, including indexes
reach
(
each
,
outp
)
# Then get the field itself
reach
(
field
.
replace
(
'-'
,
'_'
),
outp
)
field_ty
=
sort
.
type
.
Children
[
field
.
replace
(
'_'
,
'-'
)]
rec
(
data
,
outp
,
field_ty
.
type
)
# move back to original path
outp
.
Reset
()
if
len
(
path
):
reach
(
path
[
0
],
outp
)
else
:
# Unsupported type
pass
rec
(
var
,
dest
,
sort
)
if
__name__
==
'__main__'
:
''' Test application '''
import
sys
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment