Commit 64e09225 authored by Maxime Perrotin's avatar Maxime Perrotin
Browse files

Add interface to DMT/asn2dataModel

parent bbb345a1
......@@ -142,6 +142,9 @@ The background pattern was downloaded from www.subtlepatterns.com
Changelog
=========
1.5.0 (07/2016):
- Asn1scc API added to interface with DMT/asn2dataModel
1.4.5 (07/2016):
- Context variable was not prefixed properly
- Callback function for timers use 64bits integer
......
......@@ -4,7 +4,7 @@
"""
Python API for the ASN1Scc compiler
Copyright (c) 2013 European Space Agency
Copyright (c) 2013-2016 European Space Agency
Designed and implemented by Maxime Perrotin
......@@ -25,18 +25,23 @@ from PySide.QtCore import QProcess
LOG = logging.getLogger(__name__)
terminal_formatter = logging.Formatter(fmt="[%(levelname)s] %(message)s")
handler_console = logging.StreamHandler()
handler_console.setFormatter(terminal_formatter)
LOG.addHandler(handler_console)
# global needed to store the imported module and list of modules ever loaded
AST = {}
# Same for the modules imported by the call to asn2DataModel
ASN2DM = {}
try:
from enum import Enum
except ImportError:
raise ImportError('Enum module not found. use sudo pip install enum34')
raise ImportError('Enum module not found. Run pip install --user enum34')
__all__ = ['ASN1', 'parse_asn1']
__version__ = '0.1'
class ASN1(Enum):
......@@ -50,6 +55,21 @@ class ASN1(Enum):
RenameOnlyConflicting = 1
RenameAllEnumerants = 2
def waitfor_qprocess(qprocess, name):
''' Wait the the execution of a QProcess instance
Raise an exception if anything went wrong, otherwise return stdout '''
if not qprocess.waitForStarted():
raise TypeError('Could not start ' + name)
if not qprocess.waitForFinished():
raise TypeError('Execution time out : ' + name)
exitcode = qprocess.exitCode()
err = qprocess.readAllStandardError()
std = qprocess.readAllStandardOutput()
if exitcode != 0:
raise TypeError(name + ' error (exit code = {}) - {}'
.format(exitcode, str(err)))
return std
def parse_asn1(*files, **options):
''' Call the ASN.1 parser on a number of files, and return the module
......@@ -57,20 +77,12 @@ def parse_asn1(*files, **options):
This function uses QProcess to launch the ASN.1 compiler because
the subprocess module from Python has issues on the Windows platform
'''
global AST
ast_version = options.get('ast_version', ASN1.UniqueEnumeratedNames)
rename_policy = options.get('rename_policy', ASN1.NoRename)
flags = options.get('flags', [ASN1.AstOnly])
assert isinstance(ast_version, ASN1)
assert isinstance(rename_policy, ASN1)
assert isinstance(flags, list)
#if os.name == 'posix' and hasattr(sys, 'frozen'):
# Frozen Linux binaries are expected to use the frozen ASN.1 compiler
# No: there are issues with freezing the .NET applications - discard
# asn1exe = 'asn1scc'
#else:
# asn1exe = 'asn1.exe'
path_to_asn1scc = spawn.find_executable('asn1.exe')
if not path_to_asn1scc:
......@@ -104,27 +116,75 @@ def parse_asn1(*files, **options):
LOG.debug(os.getcwd())
LOG.debug(binary + ' ' + ' '.join(args))
asn1scc.start(binary, args)
if not asn1scc.waitForStarted():
raise TypeError('Could not start ASN.1 Compiler')
if not asn1scc.waitForFinished():
raise TypeError('Execution of ASN.1 Compiler timed out')
exitcode = asn1scc.exitCode()
result = asn1scc.readAllStandardError()
if exitcode != 0:
raise TypeError('ASN.1 Compiler Error (exit code = {}) - {}'
.format(exitcode, str(result)))
_ = waitfor_qprocess(asn1scc, "ASN.1 Compiler")
if filename in AST.viewkeys():
# Re-import module if it was already loaded
ast = AST[filename]
reload(ast)
else:
ast = importlib.import_module(filename)
AST[filename] = ast
return ast
def asn2dataModel(*files):
''' Call asn1DataModel, including the Makefile.python and return
the imported module "name_of_dataview_asn.py"
From this module it is possible to create native Asn1scc instances of
ASN.1 data types, and to access to DV.py, which contains constants
such as the _PRESENT fields for choice selectors '''
assert len(files) > 0
# 1) Create a temporary directory for the output
tempdir = tempfile.mkdtemp()
sys.path.append(tempdir)
concat_prefix = 'dataview_uniq'
concat_path = os.path.join(tempdir, concat_prefix)
# 2) Concat all input files to the output directory
cat_bin = spawn.find_executable('cat')
args = list(files)
cat = QProcess()
LOG.debug(os.getcwd())
LOG.debug(cat_bin + ' ' + ' '.join(args))
cat.start(cat_bin, args)
merged = waitfor_qprocess(cat, 'Merge dataviews')
with open(concat_path + '.asn', 'wt') as merged_file:
merged_file.write(merged)
# 3) Run asn2dataModel
asn2dm_bin = spawn.find_executable('asn2dataModel')
args = ['-toPython', '-o', tempdir, concat_path + '.asn']
asn2dm = QProcess()
LOG.debug(os.getcwd())
LOG.debug(asn2dm_bin + ' ' + ' '.join(args))
asn2dm.start(asn2dm_bin, args)
waitfor_qprocess(asn2dm, 'DMT tool "asn2dataModel"')
# 4) call make -f Makefile.python to build the .so
make_bin = spawn.find_executable('make')
args = ['-f', 'Makefile.python']
make = QProcess()
make.setWorkingDirectory(tempdir)
LOG.debug(os.getcwd())
LOG.debug(make_bin + ' ' + ' '.join(args))
make.start(make_bin, args)
waitfor_qprocess(make, 'make -f Makefile.python')
if concat_prefix in ASN2DM.viewkeys():
# Re-import module if it was already loaded
asn1mod = ASN2DM[concat_prefix]
reload(asn1mod)
else:
if filename in AST.viewkeys():
# Re-import module if it was already loaded
ast = AST[filename]
reload(ast)
else:
ast = importlib.import_module(filename)
AST[filename] = ast
return ast
asn1mod = importlib.import_module(concat_prefix + '_asn')
ASN2DM[concat_prefix] = asn1mod
return asn1mod
if __name__ == '__main__':
LOG.setLevel(logging.DEBUG)
try:
ast = parse_asn1('dataview-uniq.asn',
ast_version=ASN1.NoParameterizedTypes,
......
......@@ -134,7 +134,7 @@ except ImportError:
__all__ = ['opengeode', 'SDL_Scene', 'SDL_View', 'parse']
__version__ = '1.4.5'
__version__ = '1.5.0'
if hasattr(sys, 'frozen'):
# Detect if we are running on Windows (py2exe-generated)
......
TASTE-Dataview DEFINITIONS ::=
BEGIN
IMPORTS T-Int32, T-UInt32, T-Int8, T-UInt8, T-Boolean FROM TASTE-BasicTypes;
MyData ::= SEQUENCE(SIZE(1..10)) OF SEQUENCE {
a SEQUENCE (SIZE(10)) OF INTEGER (0..255),
b ENUMERATED { enum-one, enum-two }
}
Type-SingleInt ::= T-UInt8
Type-SingleBool ::= T-Boolean
Type-SingleReal ::= REAL (-5.0 .. 5.0)
Type-SingleEnum ::= ENUMERATED { enum-one(4), enum-two(2) }
Type-SingleString ::= OCTET STRING (SIZE(0..20))
Type-SingleChoice ::= CHOICE {
choice-a Type-SingleInt,
choice-b Type-SingleBool
}
Type-Seq ::= SEQUENCE {
item-a Type-SingleReal,
item-b Type-SingleChoice,
item-c MyData,
item-d Type-SingleString
}
END
TASTE-BasicTypes DEFINITIONS ::=
BEGIN
-- Set of TASTE predefined basic types
T-Int32 ::= INTEGER (-2147483648 .. 2147483647)
T-UInt32 ::= INTEGER (0 .. 4294967295)
T-Int8 ::= INTEGER (-128 .. 127)
T-UInt8 ::= INTEGER (0 .. 255)
T-Boolean ::= BOOLEAN
END
Other-Dataview DEFINITIONS ::=
BEGIN
Ahah ::= BOOLEAN
END
#!/usr/bin/env python
from opengeode import Asn1scc as asn1scc
asn1scc.LOG.setLevel(asn1scc.logging.DEBUG)
def test_1():
''' Test the call to asn2dataModel with one asn1 file '''
result = asn1scc.asn2dataModel('data/dv1.asn')
assert result is not None
def test_2():
''' Test the call to asn2dataModel with two asn1 files '''
result = asn1scc.asn2dataModel('data/dv1.asn', 'data/dv2.asn')
assert result is not None
if __name__ == '__main__':
for name, value in dict(globals()).viewitems():
if name.startswith('test_'):
print('---- Executing {} ----'.format(name))
value()
print('---- Done - {} ----\n'.format(name))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment