Commit 24924186 authored by Maxime Perrotin's avatar Maxime Perrotin

Adding project files

parent 11ed0fdb
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
OpenGEODE - A tiny SDL Editor for TASTE
This module generates Ada code from SDL process models.
The Ada code is compliant with the TASTE interfaces, and is
using the ASN.1 "Space-Certified" compiler for data type definition.
(See TASTE documentation for more information)
The design is very flexible and can be used as basis for
generating other backends.
Entry point:
The AST of the model that is parsed is described in ogAST.py
A Visitor Pattern using Python's "singledispatch" mechanism is used
to go through the AST and generate code for each SDL construct.
There is a single function called "generate", decorated with the
singledispatch mechanism, that needs to be called to generate the code
of any AST element.
Most functions return two values: "code" and "local_decl", containing
a set of statements and a set of local variables (that can be later
placed anywhere in the code).
Functions corresponding to the AST entries that are related to
expressions return three values: "code", "ada_string" and "local_decl".
The additional "ada_string" value is the usable string that corresponds
to the result of the expression evaluation.
For example, take the SDL statement "OUTPUT hello(a+5)"
This results (in TASTE terminology) in calling the required interface
called "hello" and passing a parameter of an ASN.1 type (say MyInteger).
The parameter is always passed by reference.
It is therefore necessary to build a temporary variable to hold the result
of the "a+5" expression.
In this example, the "generate" function will return:
local_decl = ["tmp01 : MyInteger;"]
(The template backend can then place it wherever appropriate)
and code = ["tmp01 := a + 5;", "hello(tmp01);"]
(The template will then do a '\n'.join(code) - and add indents, etc.)
To know about "tmp01" and generate the code "hello(tmp01);",
the function will recursively call "generate" and
pass a+5 as parameter. The call will return the tuple:
local_decl = ["tmp01 : MyInteger;"]
code = ["tmp01 := a + 5;"]
ada_string = "tmp01"
This design allows to have any level of complexity in the embedded
expression in a way that is easy to handle (adding constructs with
this pattern is straightforward, once the generate function for each AST
entry is properly implemented).
Copyright (c) 2012-2013 European Space Agency
Designed and implemented by Maxime Perrotin
Contact: maxime.perrotin@esa.int
"""
import logging
from singledispatch import singledispatch
import ogAST
LOG = logging.getLogger(__name__)
__all__ = ['generate']
# reference to the ASN.1 Data view and to the visible variables (in scope)
TYPES = None
VARIABLES = {}
LOCAL_VAR = {}
# List of output signals and procedures
OUT_SIGNALS = []
PROCEDURES = []
INNER_PROCEDURES = []
@singledispatch
def generate(ast):
''' Generate the code for an item of the AST '''
raise TypeError('[AdaGenerator] Unsupported AST construct')
# Processing of the AST
@generate.register(ogAST.Process)
def _process(process):
''' Generate the code for a complete process (AST Top level) '''
process_name = process.processName
VARIABLES.update(process.variables)
global TYPES
TYPES = process.dataview
del OUT_SIGNALS[:]
del PROCEDURES[:]
del INNER_PROCEDURES[:]
OUT_SIGNALS.extend(process.output_signals)
PROCEDURES.extend(process.procedures)
INNER_PROCEDURES.extend(process.content.inner_procedures)
LOG.info('Generating Ada code for process ' + str(process_name))
# Generate the code to declare process-level variables
process_level_decl = []
for var_name, (var_type, def_value) in process.variables.viewitems():
if def_value:
# Expression must be a ground expression, i.e. must not
# require temporary variable to store computed result
dst, dstr, dlocal = generate(def_value)
assert not dst and not dlocal, 'DCL: Expecting a ground expression'
process_level_decl.append(
'l_{n} : aliased asn1Scc{t}{default};'.format(
n=var_name,
t=var_type.ReferencedTypeName.replace('-','_'),
default=' := ' + dstr if def_value else ''))
# Add the process states list to the process-level variables
states_decl = 'type states is ('
states_decl += ', '.join(process.mapping.iterkeys()) + ');'
process_level_decl.append(states_decl)
process_level_decl.append('state : states := START;')
# Add function allowing to trace current state as a string
process_level_decl.append('function get_state return String;')
process_level_decl.append('pragma export(C, get_state, "{}_state");'
.format(process_name))
# Add the declaration of the runTransition procedure
process_level_decl.append('procedure runTransition(trId: Integer);')
# Generate the code of the start transition:
start_transition = ['begin']
start_transition.append('runTransition(0);')
mapping = {}
# Generate the code for the transitions in a mapping input-state
input_signals = [sig['name'] for sig in process.input_signals]
# Add timers to the mapping
input_signals.extend(process.timers)
for input_signal in input_signals:
mapping[input_signal] = {}
for state_name, input_symbols in process.mapping.viewitems():
if state_name != 'START':
for i in input_symbols:
if input_signal.lower() in (inp.lower() for
inp in i.inputlist):
mapping[input_signal][state_name] = i
# Generate the TASTE template
try:
asn1_modules = '\n'.join(['with {dv};\nuse {dv};'.format(
dv=dv.replace('-', '_'))
for dv in process.asn1Modules])
except TypeError:
asn1_modules = '-- No ASN.1 data types are used in this model'
taste_template = ['''\
-- This file was generated automatically: DO NOT MODIFY IT !
with System.IO;
use System.IO;
{dataview}
with adaasn1rtl;
use adaasn1rtl;
with Interfaces;
use Interfaces;
package body {process_name} is'''.format(process_name=process_name,
dataview=asn1_modules)]
# Generate the source file (.ads) header
ads_template = ['''\
-- This file was generated automatically: DO NOT MODIFY IT !
{dataview}
package {process_name} is'''.format(process_name=process_name,
dataview=asn1_modules)]
# Generate the the code of the procedures
inner_procedures_code = []
for proc in process.content.inner_procedures:
proc_code, proc_local = generate(proc)
process_level_decl.extend(proc_local)
inner_procedures_code.extend(proc_code)
# Generate the code for the process-level variable declarations
taste_template.extend(process_level_decl)
# Add the code of the procedures definitions
taste_template.extend(inner_procedures_code)
# Generate the code for each input signal (provided interface) and timers
for signal in process.input_signals + [
{'name': timer.lower()} for timer in process.timers]:
if signal.get('name', 'START') == 'START':
continue
pi_header = 'procedure {sig_name}'.format(sig_name=signal['name'])
param_name = signal.get('param_name') or 'MISSING_PARAM_NAME'
# Add (optional) PI parameter (only one is possible in TASTE PI)
if 'type' in signal:
typename = signal['type'].ReferencedTypeName.replace('-', '_')
pi_header += '({pName}: access asn1Scc{pType})'.format(
pName=param_name, pType=typename)
# Add declaration of the provided interface in the .ads file
ads_template.append('-- Provided interface "' + signal['name'] + '"')
ads_template.append(pi_header + ';')
pi_header += ' is'
taste_template.append(pi_header)
taste_template.append('begin')
taste_template.append('case state is')
for state in process.mapping.viewkeys():
if state == 'START':
continue
taste_template.append('when {state} =>'.format(state=state))
input_def = mapping[signal['name']].get(state)
if input_def:
for inp in input_def.parameters:
# Assign the (optional and unique) parameter
# to the corresponding process variable
taste_template.append('l_{inp} := {tInp}.all;'.format(
inp=inp, tInp=param_name))
# Execute the correponding transition
if input_def.transition:
taste_template.append('runTransition({idx});'.format(
idx=input_def.transition_id))
else:
taste_template.append('null;')
else:
taste_template.append('null;')
taste_template.append('when others =>')
taste_template.append('null;')
taste_template.append('end case;')
taste_template.append('end {sig_name};'.format(
sig_name=signal['name']))
taste_template.append('\n')
# for the .ads file, generate the declaration of the required interfaces
# output signals are the asynchronous RI - only one parameter
for signal in process.output_signals:
ri_header = 'procedure {sig_name}'.format(sig_name=signal['name'])
param_name = signal.get('param_name') or 'MISSING_PARAM_NAME'
# Add (optional) RI parameter
if 'type' in signal:
typename = signal['type'].ReferencedTypeName.replace('-', '_')
ri_header += '({pName}: access asn1Scc{pType})'.format(
pName=param_name, pType=typename)
ads_template.append('-- Required interface "' + signal['name'] + '"')
ads_template.append(ri_header + ';')
ads_template.append('pragma import(C, {sig}, "{proc}_RI_{sig}");'
.format(sig=signal['name'], proc=process_name))
# for the .ads file, generate the declaration of the external procedures
for proc in process.procedures:
ri_header = 'procedure {sig_name}'.format(sig_name=proc.inputString)
params = []
for param in proc.fpar:
typename = param['type'].ReferencedTypeName.replace('-', '_')
params.append('{par[name]}: access asn1Scc{partype}'.format(
par=param, partype=typename))
if params:
ri_header += '(' + ';'.join(params) + ')'
ads_template.append('-- Sync required interface "' + proc.inputString)
ads_template.append(ri_header + ';')
ads_template.append('pragma import(C, {sig}, "{proc}_RI_{sig}");'
.format(sig=proc.inputString, proc=process_name))
# for the .ads file, generate the declaration of timers set/reset functions
for timer in process.timers:
ads_template.append(
'-- Timer {} SET and RESET functions'.format(timer))
ads_template.append('procedure SET_{}(val: access asn1SccT_UInt32);'
.format(timer))
ads_template.append(
'pragma import(C, SET_{timer}, "{proc}_RI_set_{timer}");'
.format(timer=timer, proc=process_name))
ads_template.append('procedure RESET_{};'.format(timer))
ads_template.append(
'pragma import(C, RESET_{timer}, "{proc}_RI_reset_{timer}");'
.format(timer=timer, proc=process_name))
taste_template.append('procedure runTransition(trId: Integer) is')
# Generate the code for all transitions
code_transitions = []
local_decl_transitions = []
# Transform inner labels of both floating labels and transitions
# into new floating labels, so that they get generated in a separate
# section of the Ada code, where they are in the scope of everybody
for idx in xrange(len(process.content.floating_labels)):
for new_floating in find_labels(
process.content.floating_labels[idx].transition):
process.content.floating_labels.append(new_floating)
for proc_tr in process.transitions:
for new_floating in find_labels(proc_tr):
process.content.floating_labels.append(new_floating)
code_tr, tr_local_decl = generate(proc_tr)
code_transitions.append(code_tr)
local_decl_transitions.extend(tr_local_decl)
# Generate code for the floating labels
code_labels = []
for label in process.content.floating_labels:
code_label, label_decl = generate(label)
local_decl_transitions.extend(label_decl)
code_labels.extend(code_label)
# Declare the local variables needed by the transitions in the template
decl = ['{line}'.format(line=l)
for l in local_decl_transitions]
taste_template.extend(decl)
taste_template.append('begin')
# Generate the switch-case on the transition id
taste_template.append('case trId is')
for idx, val in enumerate(code_transitions):
taste_template.append('when {idx} =>'.format(idx=idx))
val = ['{line}'.format(line=l) for l in val]
if val:
taste_template.extend(val)
else:
taste_template.append('null;')
taste_template.append('when others =>')
taste_template.append('null;')
taste_template.append('end case;')
# Add the code for the floating labels
taste_template.extend(code_labels)
taste_template.append('end runTransition;')
taste_template.append('\n')
# Code of the function allowing to trace current state
taste_template.append('function get_state return String is')
taste_template.append('begin')
taste_template.append("return states'Image(state);")
taste_template.append('end get_state;')
taste_template.append('\n')
taste_template.extend(start_transition)
taste_template.append('end {process_name};'
.format(process_name=process_name))
ads_template.append('end {process_name};'
.format(process_name=process_name))
with open(process_name + '.adb', 'w') as ada_file:
ada_file.write('\n'.join(format_ada_code(taste_template)))
with open(process_name + '.ads', 'w') as ada_file:
ada_file.write('\n'.join(format_ada_code(ads_template)))
def write_statement(param, newline):
''' Generate the code for the special "write" operator '''
code = []
string = ''
local = []
basic_type = find_basic_type(param.exprType) or {}
type_kind = basic_type.kind
if type_kind.endswith('StringType'):
if isinstance(param, ogAST.PrimStringLiteral):
# Raw string
string =('"' +
param.value[1:-1].replace('"', "'") + '"')
else:
# XXX Cannot print an octet string like that...
code, string, local = generate(param)
elif type_kind in ('IntegerType', 'RealType',
'BooleanType', 'Integer32Type'):
code, string, local = generate(param)
if type_kind == 'IntegerType':
cast = "Interfaces.Integer_64"
elif type_kind == 'RealType':
cast = 'Long_Float'
elif type_kind == 'BooleanType':
cast = 'Boolean'
elif type_kind == 'Integer32Type':
cast = 'Integer'
string = "{cast}'Image({s})".format(cast=cast, s=string)
else:
error = ('Unsupported parameter in write call ' +
param.inputString)
LOG.error(error)
raise TypeError(error)
code.append('Put{line}({string});'.format(
line='_Line' if newline else '',
string=string))
return code, string, local
@generate.register(ogAST.Output)
@generate.register(ogAST.ProcedureCall)
def _call_external_function(output):
''' Generate the code of a set of output or procedure call statement '''
code = []
local_decl = []
# Add the traceability information
code.extend(traceability(output))
for out in output.output:
signal_name = out['outputName']
if signal_name.lower() in ('write', 'writeln'):
# special built-in SDL procedure for printing strings
# supports printing of native types (int, real, bool)
# but not yet complex ASN.1 structures (sequence/seqof/choice)
for param in out['params'][:-1]:
stmts, _, local = write_statement(param, newline=False)
code.extend(stmts)
local_decl.extend(local)
for param in out['params'][-1:]:
# Last parameter - add newline if necessary
stmts, _, local = write_statement(param, newline=True if
signal_name.lower() == 'writeln' else False)
code.extend(stmts)
local_decl.extend(local)
continue
elif signal_name.lower() == 'reset_timer':
# built-in operator for resetting timers. param = timer name
param, = out['params']
p_code, p_id, p_local = generate(param)
code.extend(p_code)
local_decl.extend(p_local)
code.append('RESET_{};'.format(p_id))
continue
elif signal_name.lower() == 'set_timer':
# built-in operator for setting a timer: SET(1000, timer_name)
timer_value, timer_id = out['params']
t_code, t_val, t_local = generate(timer_value)
p_code, p_id, p_local = generate(timer_id)
code.extend(t_code)
code.extend(p_code)
local_decl.extend(t_local)
local_decl.extend(p_local)
# Use a temporary variable to store the timer value
tmp_id = 'tmp' + str(out['tmpVars'][0])
local_decl.append('{} : aliased asn1SccT_Uint32;'.format(tmp_id))
code.append('{tmp} := {val};'.format(tmp=tmp_id, val=t_val))
code.append("SET_{timer}({value}'access);"
.format(timer=p_id, value=tmp_id))
continue
proc, out_sig = None, None
try:
out_sig, = [sig for sig in OUT_SIGNALS
if sig['name'].lower() == signal_name.lower()]
except ValueError:
# Not an output, try if it is an external procedure
try:
out_sig, = [sig for sig in PROCEDURES
if sig.inputString.lower() == signal_name.lower()]
except ValueError:
# Not external? Must be an inner procedure then.
# otherwise the parser would have barked
proc, = [sig for sig in INNER_PROCEDURES
if sig.inputString.lower() == signal_name.lower()]
if out_sig:
list_of_params = []
for idx, param in enumerate(out.get('params') or []):
param_direction = 'in'
try:
# If it is an output, there is a single parameter
param_type = out_sig['type']
except TypeError:
# Else if it is a procedure, get the type
param_type = out_sig.fpar[idx]['type']
param_direction = out_sig.fpar[idx]['direction']
typename = param_type.ReferencedTypeName.replace('-', '_')
p_code, p_id, p_local = generate(param)
code.extend(p_code)
local_decl.extend(p_local)
# Create a temporary variable for input parameters only
if param_direction == 'in':
tmp_id = out['tmpVars'][idx]
local_decl.append('tmp{idx} : aliased asn1Scc{oType};'
.format(idx=tmp_id, oType=typename))
code.append('tmp{idx} := {p_id};'.format(
idx=tmp_id, p_id=p_id))
list_of_params.append("tmp{idx}'access".
format(idx=out['tmpVars'][idx]))
else:
# Output parameters - no need for a temp variable
list_of_params.append("{var}'access".format(var=p_id))
if list_of_params:
code.append('{RI}({params});'.format(
RI=out['outputName'], params=', '.join(list_of_params)))
else:
code.append('{RI};'.format(RI=out['outputName']))
else:
# inner procedure call
list_of_params = []
for param in (out.get('params') or []):
p_code, p_id, p_local = generate(param)
code.extend(p_code)
local_decl.extend(p_local)
# no need to use temporary variables, we are in pure Ada
list_of_params.append(p_id)
if list_of_params:
code.append('{proc}({params});'.format(
proc=proc.inputString,
params=', '.join(list_of_params)))
else:
code.append('{};'.format(proc.inputString))
return code, local_decl
@generate.register(ogAST.TaskAssign)
def _task_assign(task):
''' A list of assignments in a task symbol '''
code, local_decl = [], []
ada_string = ''
if task.comment:
code.extend(traceability(task.comment))
for expr in task.elems:
code.extend(traceability(expr))
code_assign, ada_string, decl_assign = generate(expr)
code.extend(code_assign)
code.append(ada_string[1:-1] + ';')
local_decl.extend(decl_assign)
return code, local_decl
@generate.register(ogAST.TaskInformalText)
def _task_informal_text(task):
''' Generate Ada comments for informal text '''
code = []
if task.comment:
code.extend(traceability(task.comment))
code.extend(['-- ' + text.replace('\n', '\n-- ') for text in task.elems])
return code, []
@generate.register(ogAST.TaskForLoop)
def _task_forloop(task):
'''
Return the code corresponding to a for loop. Two forms are possible:
for x in range ([start], stop [, step])
for x in iterable (a SEQUENCE OF)
'''
stmt, local_decl = [], []
if task.comment:
stmt.extend(traceability(task.comment))
stmt.extend(traceability(task))
for loop in task.elems:
if loop['range']:
start_str, stop_str = '0', ''
if loop['range']['start']:
start_stmt, start_str, start_local = generate\
(loop['range']['start'])
local_decl.extend(start_local)
stmt.extend(start_stmt)
# ASN.1 Integers are 64 bits - we need to convert to 32 bits
if isinstance(loop['range']['start'], ogAST.PrimInteger):
start_str = 'Integer({})'.format(start_str)
if loop['range']['step'] == 1:
start_str += '..'
stop_stmt, stop_str, stop_local = generate(loop['range']['stop'])
local_decl.extend(stop_local)
stmt.extend(stop_stmt)
if isinstance(loop['range']['stop'], ogAST.PrimInteger):
stop_str = 'Integer({})'.format(stop_str)
if loop['range']['step'] == 1:
stmt.append(
'for {it} in {start}{stop} loop'
.format(it=loop['var'], start=start_str, stop=stop_str))
else:
# Step is not directly supported in Ada, we need to use 'while'
stmt.extend(['declare',
'{it} : Integer := {start};'
.format(it=loop['var'],
start=start_str),
'',
'begin',
'while {it} < {stop} loop'.format(it=loop['var'],
stop=stop_str)])
else:
# case of form: FOR x in SEQUENCE OF
elem_type = loop['type'].ReferencedTypeName.replace('-', '_')
list_stmt, list_str, list_local = generate(loop['list'])
basic_type = find_basic_type(loop['list'].exprType)
range_cond = "{}.Data'Range".format(list_str)\
if basic_type.Min == basic_type.Max\
else "1..{}.Length".format(list_str)
stmt.extend(list_stmt)
local_decl.extend(list_local)
stmt.extend(['declare',
'{it} : asn1Scc{it_ty};'.format(it=loop['var'],
it_ty=elem_type),
'',
'begin',
'for {it}_idx in {rc} loop'.format(it=loop['var'],
rc=range_cond),
'{it} := {var}.Data({it}_idx);'.format(it=loop['var'],
var=list_str)])
try:
code_trans, local_trans = generate(loop['transition'])
if local_trans:
stmt.append('declare')
stmt.extend(local_trans)
stmt.append('')
stmt.append('begin')
stmt.extend(code_trans)
if local_trans:
stmt.append('end;')
except AttributeError:
stmt.append('null;')
if loop['range'] and loop['range']['step'] != 1:
stmt.append('{it} := {it} + {step};'.format(it=loop['var'],
step=loop['range']['step']))
stmt.append('end loop;')
if (loop['range'] and loop['range']['step'] != 1) or loop['list']:
stmt.append('end;')
return stmt, local_decl
@generate.register(ogAST.PrimVariable)
def _primary_variable(prim):
''' Single variable reference '''
return [], 'l_{}'.format(prim.value[0]), []
@generate.register(ogAST.PrimPath)
def _prim_path(primaryId):
'''
Return the Ada string of a PrimaryId element list (path)
cases: a => 'l_a' (reference to a variable)
a_timer => 'a_timer' (reference to a timer)
a!b => a.b (field of a structure)
a!b if a is a CHOICE => TypeOfa_b_get(a)
a(Expression) => a(ExpressionSolver) (array index)
Expression can be complex (if-then-else-fi..)
'''
ada_string = ''
stmts, local_decl = [], []
# If first element is not a variable (can be a timer) do not add prefix
sep = 'l_' if find_var(primaryId.value[0]) else ''
sub_id = []
for pr_id in primaryId.value:
if type(pr_id) is not dict:
if pr_id.lower() == 'length':
special_op = 'Length'
continue
elif pr_id.lower() == 'present':
special_op = 'ChoiceKind'
continue
elif pr_id.lower() == 'abs':
special_op = 'Abs'
continue
special_op = ''
parent_kind, parent_typename = path_type(sub_id)
sub_id.append(pr_id)
if parent_kind == 'ChoiceType':
ada_string = ('asn1Scc{typename}_{p_id}_get({ada_string})'
.format(typename=parent_typename,
p_id=pr_id, ada_string=ada_string))