Commit 54e44775 authored by Maxime Perrotin's avatar Maxime Perrotin

Adding ASN.1 value editor

parents
#!/usr/bin/env python2
# -*- coding:Utf-8 -*-
# Adapted from :
# Brandon Thomson
# http://stackoverflow.com/questions/384076/how-can-i-make-the-python-logging-output-to-be-colored
import logging
class ColorFormatter( logging.Formatter ):
FORMAT = ( "[%(levelname)-19s] " "$BOLD%(filename)-20s$RESET" "%(message)s" )
# FORMAT = ( "[%(levelname)-18s] " "($BOLD%(filename)s$RESET:%(lineno)d) " "%(message)s" )
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
RESET_SEQ = "\033[0m"
COLOR_SEQ = "\033[1;%dm"
BOLD_SEQ = "\033[1m"
COLORS = {
'WARNING' : YELLOW,
'INFO' : GREEN,
'DEBUG' : WHITE,
'CRITICAL' : RED,
'ERROR' : RED
}
def __init__( self, use_color ):
self.use_color = use_color
msg = self.formatter_msg( self.FORMAT )
logging.Formatter.__init__( self, msg )
def formatter_msg( self, msg ):
if( self.use_color ):
msg = msg.replace( "$RESET", self.RESET_SEQ ).replace( "$BOLD", self.BOLD_SEQ )
else:
msg = msg.replace( "$RESET", "" ).replace( "$BOLD", "" )
return msg
def format( self, record ):
levelname = record.levelname
if( self.use_color and levelname in self.COLORS ):
fore_color = 30 + self.COLORS[ levelname ]
levelname_color = self.COLOR_SEQ % fore_color + levelname + self.RESET_SEQ
record.levelname = levelname_color
return logging.Formatter.format( self, record )
%module InterfaceEnum
%{
#include "interface_enum.h"
%}
#include "interface_enum.h"
/*
+----------------+
+------------+ +--------------+ | MscDocument |
| MscASTRoot | | MscFile | |----------------|
|------------|-------->|--------------|-------->| Name |
| files | | mscDocuments | | mscDefinitions |
+------------+ +--------------+ +----------------+
|
+---------------------------+ v
| MscInstance | +---------------+
|---------------------------| | MscDefinition |
| Name | |---------------|
| CleanName (computed) |<----| Name |
| events | | instances |
| conditions (computed) | +---------------+
| incomingEvents (computed) |
+---------------------------+
|
v
+---------------------------+
| MscEvent |
|---------------------------|--------.
.----| uniqueEventData | |
| | (computed: 0xDE,0xAD,...) | |
| +---------------------------+ |
| | v
v | +---------------------------------+
+--------------+ | | MscOutgoingEvent |
| MscCondition | | |---------------------------------|
|--------------| | | nameOfPI (router_put_tc) |
| bShared | | | variableValue (tc TC-T ::= ...) |
| Name | | | nameOrEnv (mygui_GUI) |
+--------------+ | | lineNo (in .msc) |
| | typeName (TC_T) |
| +---------------------------------+
v
+---------------------------------+
| MscIncomingEvent |
|---------------------------------|
| nameOfPI (gui_send_tc) |
| variableValue (tm TM_T ::= ...) |
| nameOrEnv (mygui_GUI) |
| lineNo (msc) |
| typeName (TM_T) |
+---------------------------------+
*/
group MSCtoPython;
main(files) ::= <<
#!/usr/bin/env python
#
# Automatically generated Python sequence chart (MSC) implementation
import os
import sys
import signal
import Queue
taste_inst = os.popen('taste-config --prefix').readlines()[0].strip()
sys.path.append(taste_inst+'/share/asn1-editor')
from Scenario import Scenario, PollerThread
from PySide.QtCore import QCoreApplication, Qt
from udpcontroller import tasteUDP
<files:{file|<PrintFile(file)>}; separator="\n">
>>
PrintFile(file) ::= <<
# Generated due to "<file.filename>"
<file.mscDocuments:{doc|<PrintMscDocument(doc)>}; separator="\n">
>>
PrintMscDocument(doc) ::= <<
# From the section: MSCDOCUMENT <doc.name>
<doc.mscDefinitions:{msc|<PrintMscDefinition(msc)>}>
>>
PrintMscDefinition(msc) ::= <<
<msc.instances:{inst|
<inst.conditions:{cnd|
g_<cnd.Name>_synchronize = <inst.conditions.Count>
def WaitFor_<cnd.Name>():
pass
#g_lockSynchronize.acquire()
#global g_<cnd.Name>_synchronize
#g_<cnd.Name>_synchronize -= 1
#g_lockSynchronize.release()
#while g_<cnd.Name>_synchronize != 0:
# time.sleep(1)
}>
<PrintMscInstance(inst)>}; separator="\n">
<msc.instances:{inst|
def runScenario(pipe_in=None, pipe_out=None, udpController=None):
# Queue for getting scenario status
log = Queue.Queue()
if udpController:
<inst.CleanName> = Exercise_<inst.CleanName>(log, name='Scenario')
udpController.slots.append(<inst.CleanName>.msq_q)
<inst.CleanName>.wait()
udpController.slots.remove(<inst.CleanName>.msg_q)
return 0 # <inst.CleanName>.status
else:
# Use old-style message queue
poller = PollerThread()
<inst.CleanName> = Exercise_<inst.CleanName>(log, name='Scenario')
poller.slots.append(<inst.CleanName>.msg_q)
poller.start()
<inst.CleanName>.start()
# Wait and log messages from both scenarii
while True:
try:
scenario, severity, msg = log.get(block=False)
except Queue.Empty:
pass
else:
log.task_done()
try:
# If called from the GUI, send log through pipe
pipe_out.send((scenario, severity, msg))
except AttributeError:
print('[{level}] {name} - {msg}'.format
(level=severity, name=scenario, msg=msg))
if severity == 'ERROR' or msg == 'END':
# Stop execution on first error or completed scenario
try:
pipe_out.send(('All', 'COMMAND', 'END'))
except AttributeError:
<inst.CleanName>.stop()
poller.stop()
return
try:
if pipe_out.poll():
cmd = pipe_out.recv()
if cmd == 'STOP':
<inst.CleanName>.stop()
poller.stop()
return
except AttributeError:
pass
}>
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)
udpController = None
if '--udp' in sys.argv:
# Create UDP Controller with default IP/Port values (127.0.0.1:7755:7756)
udpController = tasteUDP()
QCoreApplication(sys.argv)
sys.exit(runScenario(udpController))
>>
PrintExecutions(file) ::= <<
<file.mscDocuments:{doc|
<doc.mscDefinitions:{msc|
<msc.instances:{inst|<inst.CleanName> = Exercise_<inst.CleanName>()
poller.slots.append(<inst.CleanName>.msg_q)}>}>}>
>>
PrintMscInstance(inst) ::= <<
@Scenario
def Exercise_<inst.CleanName>(queue):
'''<inst.CleanName> processing'''
<inst.events:{evt|<PrintEvent(evt)>}; separator="\n">
return 0
>>
EventMap ::= [
"MscCondition":"PrintCondition",
"MscIncomingEvent":"PrintIncoming",
"MscOutgoingEvent":"PrintOutgoing"
]
PrintEvent(evt) ::= <<
<(EventMap.(evt.EventKind))(evt=evt)>
>>
PrintCondition(evt) ::= <<
WaitFor_<evt.Name>()
>>
PrintIncoming(evt) ::= <<
try:
queue.expectMsg('<evt.nameOfPI>', '<evt.variableValue>', lineNo=<evt.lineNo>, ignoreOther=False)
except TypeError as err:
raise
>>
PrintOutgoing(evt) ::= <<
queue.sendMsg('<evt.nameOfPI>', '<evt.variableValue>', lineNo=<evt.lineNo>)
>>
all: compile-all
compile-all:
@pyside-rcc asn1_value_editor.qrc -o resources.py
install: compile-all
@mkdir -p asn1_value_editor
@for f in ColorFormatter.py TasteMainWindow.py asn1_python.py \
errCode.py mscHandler.py plotmanager.py tasteplot.py \
vn.py Scenario.py __init__.py asn1_value_editor.py gui.py \
mscStreamingScene.py standalone_editor.py \
udpcontroller.py resources.py; \
do echo Installing $$f && cp $$f asn1_value_editor ; \
done
@python setup.py install --record install.record
clean:
@rm *.pyc
.PHONY: all compile-all install clean
This directory contains the code for the ASN.1 Value editor:
- the Pyside B mapper, to be placed in the DMT/aadl2glueC directory
- the code for the automatically-generated Python GUIs
- the standalone ASN.1 value editor, which can be used from the command line (or from another python module):
- code for communication using UDP packets instead of the TASTE message queues
The auto-generated GUIs make use of the speedometer and of the MSC Editor/Viewer.
Usage for the standalone editor:
standalong_editor -a <DataModel.asn> -t <Type> [-d <Default Value>]
You can try it with the T-POS type that is in TPos.asn file under the ./test directory:
./standalone_editor.py -a test/TPos.asn -t T-POS
Or with a default value (make sure you use quote):
./standalone_editor.py -a test/TPos.asn -t T-POS -d 'myIntSetOf: { 1, 2, 3, 4, 1 }'
(c) European Space Agency
Author: Maxime Perrotin
#!/usr/bin/env python
from PySide.QtCore import QThread, Signal
#from PySide.QtNetwork import QUdpSocket, QHostAddress
import Queue
import datamodel
import sys
import os
import importlib
import time
import DV
try:
from PythonController import(OpenMsgQueueForReading, GetMsgQueueBufferSize,
RetrieveMessageFromQueue)
except ImportError:
print 'ERROR importing PythonController'
TASTE_INST = os.popen('taste-config --prefix').readlines()[0].strip()
sys.path.append(TASTE_INST + '/share/asn1-editor')
class Scenario(QThread, object):
#done = Signal(int)
''' Generic class handling the execution of an MSC scenario '''
def __init__(self, func):
QThread.__init__(self)
self.msg_q = Queue.Queue()
self.modules = {}
self.scenario = func
# Control the end of the scenario
self.stop_requested = False
# Load all TM and TC backends, once for all
backends = datamodel.tm.keys()
backends.extend(datamodel.tc.keys())
for backend in backends:
try:
mod = reload(backend + '_backend')
except (TypeError, NameError):
try:
mod = importlib.import_module(backend + '_backend')
except ImportError:
self.log.put((self.name, 'ERROR',
'Could not import {b}_backend.py'
.format(b=backend)))
continue
mod.setMsgQ()
self.modules[backend] = mod
def __call__(self, log, name='Scenario'):
self.name = name
self.log = log
return self
def run(self):
''' Thread starting point '''
self.log.put((self.name, 'INFO', 'Starting Starting scenario'))
try:
self.scenario(self)
except (IOError, TypeError) as err:
self.log.put((self.name, 'ERROR', str(err)))
else:
self.log.put((self.name, 'INFO',
'Scenario completed with no errors'))
self.log.put((self.name, 'INFO', 'END'))
#self.done.emit(self.log)
def stop(self):
''' Set the stop_requested flag to true '''
self.stop_requested = True
self.wait()
def getNextMsg(self, timeout=None):
''' Wait for a message and return it (in native format) '''
self.log.put((self.name, 'INFO', 'Waiting for the next message'))
try:
(msgId, p_data_from_mq) = self.msg_q.get(
block=True, timeout=timeout)
except Queue.Empty:
# Timeout expired
self.log.put((self.name, 'ERROR', 'Timeout expired'))
raise IOError('Timeout expired')
else:
self.log.put((self.name, 'INFO', 'Received message'))
self.msg_q.task_done()
# Determine which message was received:
for b, mod in self.modules.viewitems():
if hasattr(mod, 'tmId') and mod.tmId == msgId:
nativeValue = mod.decode_TM(p_data_from_mq)
return (b, nativeValue)
def expectMsg(self, msgId, value='', lineNo=0,
ignoreOther=False, timeout=None):
'''
Wait for a specific message, with optional explicit parameter
If the message content shall not be checked, use value = '*'
If you want to select fields to ignore, replace them with a '*'
e.g. value = ' { name "John", age * }' will discard the 'age' field
'''
self.log.put((self.name, 'INFO',
'Waiting for {id}({val})'.format(id=msgId, val=value)))
# Call the function from the backend
for mod in self.modules.viewkeys():
if(mod.lower() == msgId.lower()
and hasattr(self.modules[mod], 'expect')):
try:
self.modules[mod].expect(
self.msg_q, value, ignoreOther, timeout)
except (ValueError, TypeError) as err:
# Value error: good message but wrong params
# TypeError : wrong message received
self.log.put((self.name, 'ERROR', str(err)))
raise
except IOError as err:
# Timeout
self.log.put((self.name, 'ERROR', str(err)))
raise
else:
self.log.put((self.name, 'INFO',
'Received and verified message content, all OK'))
break
else:
self.log.put((self.name, 'ERROR',
'Undefined message: ' + str(msgId)))
raise TypeError('Undefined message: ' + str(msgId))
def sendMsg(self, msgId, value='', lineNo=0):
''' Send a message to the running binary, use ASN.1 notation '''
for mod in self.modules.viewkeys():
if mod.lower() == msgId.lower():
self.log.put((self.name, 'INFO', 'Sending ' + str(mod)))
send_id = 'send_{id}_VN'.format(id=mod)
if hasattr(self.modules[mod], send_id):
send_ptr = getattr(self.modules[mod], send_id)
# get error code from sending function
try:
send_ptr(value)
except IOError:
log_msg = 'Sending message error'
self.log.put((self.name, 'ERROR', log_msg))
raise IOError(log_msg)
break
else:
log_msg = 'Undefined message: ' + str(msgId)
self.log.put((self.name, 'ERROR', log_msg))
raise TypeError(log_msg)
class PollerThread(QThread):
''' Class polling the msgQ and sending signals to running scenarii '''
msgReceived = Signal(int)
def __init__(self, parent=None):
QThread.__init__(self, parent)
self.q_name = ("{uid}_{fvname}_PI_Python_queue"
.format(uid=str(os.geteuid()), fvname=datamodel.FVname))
self.stop_requested = False
self.slots = []
def run(self):
print('Opening msgQ: ' + self.q_name)
msg_q = OpenMsgQueueForReading(self.q_name)
if msg_q == -1:
print 'Failed to open message queue ' + self.q_name
return -1
buffer_size = GetMsgQueueBufferSize(msg_q)
while True:
if self.stop_requested:
return 0
p_data_from_mq = DV.new_byte_SWIG_PTR(buffer_size)
msg_received_type = RetrieveMessageFromQueue(
msg_q, buffer_size, p_data_from_mq)
if msg_received_type == -1:
time.sleep(0.01)
continue
else:
#print('Received message from TASTE message queue, id =',
# msg_received_type)
for slot in self.slots:
slot.put((msg_received_type, p_data_from_mq))
def stop(self):
''' Request the poller to stop '''
self.stop_requested = True
self.wait()
if __name__ == "__main__":
print 'This module cannot be run standalone. Check TASTE documentation'
from PySide.QtCore import QSettings, Qt, Signal
from PySide.QtGui import QMainWindow, QDockWidget
from asn1_value_editor import PLOTTERS
class TasteMainWindow(QMainWindow):
''' Main GUI window - when closed, saves context (geometry, plots) '''
mscStop = Signal()
mscTrigger = Signal()
def __init__(self, parent=None):
QMainWindow.__init__(self, parent)
self.plotter = None
self.editors = []
self.FVname = ''
self.msc = None
def closeEvent(self, event):
''' Save application state before closing '''
settings = QSettings(self.FVname+".ini", QSettings.IniFormat)
docks = []
for dock in self.plotter.plotPool:
if dock._speedo:
minR, maxR = dock._speedo.getRange()
docks.append(dock._speedoName+':'+str(minR)+':'+str(maxR))
elif dock._tasteplot:
# Save plot configuration (axis, etc)
docks.append('Plot ' +
str(dock.handle) +
':' + str(dock._tasteplot.scrollingLimit) +
':' + str(dock._tasteplot.yAuto) +
':' + str(dock._tasteplot.xShowAll) +
':' + str(dock._tasteplot.yMin) +
':' + str(dock._tasteplot.yMax) +
':' + str(dock._tasteplot.saveCsv) +
':' + str(dock._tasteplot.csvFile) +
':' + str(dock._tasteplot.savePng) +
':' + str(dock._tasteplot.pngFile))
settings.setValue('plots', docks)
settings.setValue('geometry', self.saveGeometry())
settings.setValue('windowState', self.saveState())
# Find and save all plotted values indexes
plotInfo={}
for editor in self.editors:
plotList={}
for idx in editor.plottedIdxs:
plotters = idx.data(Qt.UserRole+4)
if not isinstance(plotters, list):
plotters=[plotters]
for plot in plotters:
if plot['fifoId'] not in plotList:
plotList[plot['fifoId']]=[]
plotList[plot['fifoId']].append(plot['curveName'])
if plotList != {}:
plotInfo[editor.objectName()] = plotList
settings.setValue('plotInfo', plotInfo)
self.plotter.closeAll()
self.mscStop.emit()
def restoreApplicationState(self):
''' Restore windows geometry and state '''
settings = QSettings(self.FVname+".ini", QSettings.IniFormat)
plots = settings.value('plots')
if plots is not None:
if not isinstance(plots, list):
plots = [plots]
for plot in plots:
if plot.startswith('Plot'):
plot = plot.split(':')
scrollingLimit = int(plot[1])
yAuto = (plot[2] == 'True')
xShowAll = (plot[3] == 'True')
yMin = float(plot[4])
yMax = float(plot[5])
saveCsv = (plot[6] == 'True')
csvFile = plot[7]
savePng = (plot[8] == 'True')
pngFile = plot[9]
self.plotter.newPlot(scrollingLimit,
yAuto, xShowAll, yMin, yMax,
saveCsv, csvFile, savePng, pngFile)
elif plot.startswith('SM'):
plot = plot.split(':')
title = plot[1].strip()
minR = float(plot[2])
maxR = float(plot[3])
self.plotter.newMeter(title=title, minR=minR, maxR=maxR)
plotInfo = settings.value('plotInfo')
# Populate plotInfo in the TM viewers (following the path)
for ed in plotInfo:
for editor in self.editors:
if editor.objectName() == ed:
break
for plotId in plotInfo[ed]:
for path in plotInfo[ed][plotId]:
idx = editor.pathToIdx(path.split('.'))
idx = idx.sibling(idx.row(), 3)
currData = idx.data(PLOTTERS)
if currData is None:
currData = []
currData.append({'fifoId': plotId, 'curveName': path})
editor.model.itemFromIndex(idx).setData(currData, PLOTTERS)
if idx not in editor.plottedIdxs:
editor.plottedIdxs.append(idx)
self.restoreGeometry(settings.value('geometry'))
self.restoreState(settings.value('windowState'))
mscWindow = self.findChild(QDockWidget, 'MSCRecorder')
if mscWindow.isVisible():
self.mscTrigger.emit()
if __name__ == '__main__':
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This module offers facilities to edit ASN.1 values using a GUI
It also contains the engine for the TASTE GUI components
"""
import asn1_value_editor
import gui
__version__ = asn1_value_editor.__version__
#!/usr/bin/python
'''
ASN.1 Python support functions
'''
__author__ = "Maxime Perrotin"
__license__ = "ESA Licence for open source software"
__version__ = "0.1"
__url__ = "http://taste.tuxfamily.org"
from vn import fromValueNotationToPySide
def comparePythonValues(a, b):
''' Compare two ASN.1 variables in Python format, ignoring 'Any' fields (translated from '*' in GSER) '''
if a == b:
return True
for x in (a, b):
if type(x) is dict and 'Any' in x:
return True
if type(a) is not type(b):
return False