#!/usr/bin/env python # -*- coding: utf-8 -*- """ TASTE ASN.1 Value Editor Tool main entry point (when used standalone) Copyright (c) 2012-2015 European Space Agency Designed and implemented by Maxime Perrotin Contact: maxime.perrotin@esa.int License is LGPLv3 - Check the LICENSE file """ __url__ = "http://taste.tuxfamily.org" import os import traceback import signal import sys import platform import re import logging import argparse import time from functools import partial from ctypes import CDLL log = logging.getLogger(__name__) terminal_formatter = logging.Formatter( fmt="%(levelname)s - %(name)s: %(message)s") handler_console = logging.StreamHandler() handler_console.setLevel(logging.DEBUG) handler_console.setFormatter(terminal_formatter) log.addHandler(handler_console) from ColorFormatter import ColorFormatter from asn1_value_editor import asn1Editor, asn1Viewer, __version__ from plotmanager import PlotManager import vn import UserWidgets try: import speedometer except ImportError: log.error('You must install speedometer first') # Make sure the gui can import modules in the current directory sys.path.insert(0, '.') try: import datamodel except ImportError: datamodel = None try: from PythonController import(OpenMsgQueueForReading, new_byte_SWIG_PTR, GetMsgQueueBufferSize, DV, RetrieveMessageFromQueue) python_controller = True except ImportError: python_controller = False import resources from mscHandler import mscHandler from sdlHandler import sdlHandler from PySide.QtCore import QThread, QFile, Qt, Signal, QObject from PySide.QtGui import(QApplication, QStatusBar, QLabel, QWidget, QListWidget, QPushButton, QDockWidget, QTabWidget, QMenu, QAction) from PySide.QtUiTools import QUiLoader g_tmPool = {} False class MsgQ_Poller(QThread): ''' Thread for polling the GUI queue in order to receive the messages from the main binary ''' tm = Signal() def __init__(self, log, parent=None): self.log = log QThread.__init__(self, parent) self.Qname = "{uid}_{fvname}_PI_queue".format(uid=str(os.geteuid()), fvname=datamodel.FVname) self.log.info('msQ name= ' + self.Qname) def run(self): self.log.debug('Starting msgQ polling thread') self._bDie = False while True: if self._bDie: return self._msgQueue = OpenMsgQueueForReading(self.Qname) if (self._msgQueue != -1): break self.log.warning( "Communication channel over {Qname} not established yet. " "Retrying...".format(Qname=self.Qname)) time.sleep(1) bufferSize = GetMsgQueueBufferSize(self._msgQueue) pDataFromMQ = new_byte_SWIG_PTR(bufferSize) while not self._bDie: messageReceivedType = RetrieveMessageFromQueue( self._msgQueue, bufferSize, pDataFromMQ) if messageReceivedType == -1: time.sleep(0.005) continue self.log.debug('MsgQ poller: received message') # use a signal to inform the TM editor of the received TM g_tmPool[messageReceivedType].pendingTM = pDataFromMQ self.tm.emit() class MyLogger: ''' Hook for catching the calls to "print" from imported modules ''' def __init__(self, log): self.log = log self.string = "" def write(self, string): if string.endswith('\n'): self.log.info(self.string) self.string = "" else: self.string += string def gui(): ''' Application entry point ''' # Exit app on Ctrl-C signal.signal(signal.SIGINT, signal.SIG_DFL) if 'opengeode-simulator' in sys.argv[0]: logo = ':/opengeode.png' msclist_enable = False sys.argv.append('-l') else: logo = ':/tasteLogo_white.png' msclist_enable = True #usage = 'usage: gui.py [--udp=IP_Address:in_Port:out_Port]' version = 'taste auto-gui %s' % (__version__) # Set up the logging facilities log = logging.getLogger("gui") console = logging.StreamHandler(sys.__stdout__) console.setFormatter(ColorFormatter()) log.addHandler(console) log.info('Starting GUI') myPrint = MyLogger(log) sys.stdout = myPrint # Parse the command line parser = argparse.ArgumentParser(version=version) parser.add_argument('-g', '--verbose', action='store_true', default=False, help='Display debug information') parser.add_argument('--udp', dest='udp', metavar='RemoteIP:inPort:outPort', help='Use UDP sockets instead of message queues') parser.add_argument('-l', '--shared_lib', action='store_true', default=False, help='Simulate SDL system, ' 'using a shared library') parser.add_argument('-r', '--reset', action='store_true', default=False, help='Do not try to restore windows layout') parser.add_argument('-p', '--properties', dest='properties', help='Specify a file containing stop conditions') parser.add_argument('-i', '--ivpath', dest='ivpath', default='./InterfaceView.aadl', help='Path and filename of the system interface view') options = parser.parse_args() if options.verbose: log.setLevel(logging.DEBUG) else: log.setLevel(logging.INFO) log.debug('{version} using Python {pyVersion}'.format( version=version, pyVersion=platform.python_version())) log.debug('Using interface view (for MSC processing): ' + options.ivpath) if options.udp is not None and not re.match("\d+.\d+.\d+.\d+:\d+:\d+", options.udp): log.error('Wrong UDP format. Run with --help for the proper syntax') log.error('You entered ' + options.udp) sys.exit(-1) if not datamodel: log.error('Could not import project data') return -1 msgQ = True udp = None dll = None hasTM = False if not options.udp and not options.shared_lib: # Default: use message queue log.info('Using message queues to communicate') if not python_controller: log.error('Python module "PythonController" is missing') return -1 elif options.udp: msgQ = False udpConfig = options.udp.split(':') log.debug('UDP with IP={cfg[0]} inPort={cfg[1]} outPort={cfg[2]}' .format(cfg=udpConfig)) try: from udpcontroller import tasteUDP udp = tasteUDP(log=log, ip_out=udpConfig[0], inport=int(udpConfig[1]), outport=int(udpConfig[2])) log.info('Using UDP to communicate') except ImportError: log.error('Python module "udpcontroller" is missing') return -1 elif options.shared_lib: # Load and run the startup transition of the shared library (Ada only?) msgQ = False try: dll = CDLL('./lib{}.so'.format(datamodel.FVname.lower())) dll_init = getattr(dll, 'lib{}init'.format(datamodel.FVname.lower())) dll_init() except OSError as err: log.error('Issue with shared library lib{}.so:' .format(datamodel.FVname)) log.error(str(err)) return -1 # Define a Qt application (mandatory) app = QApplication(sys.argv) # All available styles are listed with: QStyleFactory.keys() app.setStyle("cleanlooks") # Load the GUI description file (.ui file editable with Qt Designer) # Pyside and PyQt have a different way of doing... from TasteMainWindow import TasteMainWindow loader = QUiLoader() loader.registerCustomWidget(asn1Editor) loader.registerCustomWidget(asn1Viewer) loader.registerCustomWidget(TasteMainWindow) uiFile = QFile("guilayout.ui") uiFile.open(QFile.ReadOnly) myWidget = loader.load(uiFile) uiFile.close() log.info('GUI generated for function ' + datamodel.FVname) myWidget.FVname = datamodel.FVname # Create an instance of the Plot/Meter manager plotter = PlotManager(parent=myWidget, log=log) # find QStatusBar widget statusbar = myWidget.findChildren(QStatusBar)[0] statusbar.showMessage('Welcome to TASTE') # find TASTE logo and get width of the picture tasteLogo = myWidget.findChild(QLabel, 'tasteLogo') tasteLogo.setPixmap(logo) logoWidth = tasteLogo.pixmap().width() # find Central widget and set maximum size to width of the logo centralWidget = myWidget.findChild(QWidget, 'centralWidget') centralWidget.setMaximumWidth(logoWidth) # To Put ASSERT background image #assertPix=QPalette() #assertPix.setBrush(myWidget.backgroundRole(), # QBrush(QImage("assert_bkg.jpg"))) #myWidget.setPalette(assertPix) # find list to place MSC scenarii and corresponding buttons mscList = myWidget.findChild(QListWidget, 'msc') runMSC = myWidget.findChild(QPushButton, 'runMSC') loadMSC = myWidget.findChild(QPushButton, 'loadMSC') editMSC = myWidget.findChild(QPushButton, 'editMSC') # In SDL simulation mode, MSC record/replay is not supported yet mscList.setEnabled(msclist_enable) runMSC.setEnabled(msclist_enable) loadMSC.setEnabled(msclist_enable) editMSC.setEnabled(msclist_enable) # Create an instance of the class handling MSC/Python scripts msgs = [] for msg in datamodel.tc: msgs.append({'name': msg, 'type': datamodel.tc[msg]['nodeTypename']}) for msg in datamodel.tm: msgs.append({'name': msg, 'type': datamodel.tm[msg]['nodeTypename']}) # set the names of the MSC instances, depending on the context if dll: instance1 = datamodel.FVname instance2 = 'Operator' else: instance1 = 'TASTE_System' instance2 = datamodel.FVname msc = mscHandler(myWidget, mscList, instance2, msgs, udpController=udp, instance_name=instance1) msc.log = log msc.ivpath = options.ivpath runMSC.pressed.connect(msc.run) loadMSC.pressed.connect(msc.load) editMSC.pressed.connect(msc.edit) myWidget.mscStop.connect(msc.stopMscRecording) # Create toolbar with a button to start/stop MSC recording and streamind toolbar = myWidget.addToolBar('toolbar') toolbar.setObjectName('toolbar') mscButton = toolbar.addAction('MSC') mscButton.triggered.connect(msc.startStop) myWidget.mscTrigger.connect(msc.startStop) # find dockable widgets (containing TM and TC editors) docks = myWidget.findChildren(QDockWidget) firstTC = None firstTM = None # Arrange them in a nice way: group the TM and TC in tabs myWidget.setTabPosition(Qt.RightDockWidgetArea, QTabWidget.North) for dock in docks: if dock.objectName().startswith('tm'): if firstTM is None: firstTM = dock else: myWidget.tabifyDockWidget(firstTM, dock) else: if firstTC is None: firstTC = dock else: myWidget.tabifyDockWidget(firstTC, dock) dock.setMinimumWidth(700 - logoWidth) # find TC/TM Editors myEdits = myWidget.findChildren(asn1Editor) #Pattern to create/add a new dock window - easy! #newDock=QDockWidget('Hello', parent=myWidget) #newDock.setFloating(True) #myWidget.addDockWidget(Qt.RightDockWidgetArea, newDock) # Create a dock to handle the display of SDL diagrams try: sdl = sdlHandler(myWidget) sdl.dll = dll sdl.msc.connect(msc.addToMsc) sdl.msc_undo.connect(msc.undo) sdl.msc_redo.connect(msc.redo) sdl.msc_macro_start.connect(msc.start_undo_macro) sdl.msc_macro_stop.connect(msc.stop_undo_macro) sdl.allowed_messages.connect(myWidget.allowed_editors) except IOError as err: log.info('SDL viewer not available - ' + str(err)) #log.info(traceback.format_exc()) sdl = None else: sdlButton = toolbar.addAction('SDL') sdlButton.triggered.connect(sdl.startStop) if options.properties: sdl.load_properties(options.properties) # Set the ASN.1 data type of each TC editor for editor in myEdits: # Enable mouse tracking - to display tips in the status bar editor.setMouseTracking(True) editor.entered.connect(editor.displaytip) # Hide columns showing the ASN.1 Type and constraint editor.hideExtraColumns() # Import the backend generated by the pyside_B_mapper from aadl2glueC encoder_backend = __import__("%s_backend" % editor.objectName()) # Set the callback in the backend to forward incoming TM to the GUI encoder_backend.editor = editor TMHandler = type('TMHandler', (QObject,), {'got_tm': Signal()}) encoder_backend.tm_callback = TMHandler() encoder_backend.tm_callback.got_tm.connect(editor.receivedTM) # Set the callback in the backend to send messages via the dll # (handled in the SDL backend, also managing MSC traces) if dll: DLLHandler = type('DLLHandler', (QObject,), {'dll': Signal(unicode, type, type)}) encoder_backend.send_via_dll = DLLHandler() encoder_backend.send_via_dll.dll.connect(sdl.send_tc) if isinstance(editor, asn1Viewer): # Connect TM to MSC editor.msc.connect(sdl.add_to_msc) else: editor.msc.connect(msc.addToMsc) encoder_backend.log = log encoder_backend.statusbar = statusbar editor.statusBarMessage.connect(statusbar.showMessage) if msgQ: encoder_backend.setMsgQ() elif udp: encoder_backend.setUDP() elif dll: # Provide pointer to the shared lib to the backends encoder_backend.setSharedLib(dll) editor.backend = encoder_backend editor.log = log dockWidget = editor.parent() myButtons = dockWidget.findChildren(QPushButton) # retrieve user widgets for TM and TC customTC, customTM = [], [] for each in UserWidgets.__all__: widget = getattr(UserWidgets, each) if widget.__base__ == UserWidgets.TC and widget.applicable(): customTC.append(widget) elif widget.__base__ == UserWidgets.TM and widget.applicable(): customTM.append(widget) if isinstance(editor, asn1Viewer): # TM Viewer hasTM = True editor.setAsn1Model(datamodel.tm[editor.objectName()]) g_tmPool[encoder_backend.tmId] = editor editor.setPlotterBackend(plotter) for button in myButtons: if button.objectName() == 'plotButton': myMenu = QMenu() action = QAction("New Plot", myMenu) myMenu.addAction(action) button.setMenu(myMenu) action.triggered.connect(editor.newPlot) plotter.addPlotMenu(myMenu, editor) elif button.objectName() == 'meterButton': button.clicked.connect(editor.meter) elif button.objectName() == 'customCombo': if not customTM: continue myMenu = QMenu() for each in customTM: action = QAction(each.name, myMenu) myMenu.addAction(action) action.triggered.connect(partial(editor.custom, each)) button.setMenu(myMenu) button.menu() # Seems there is a refresh issue without that else: button.hide() else: # TC Editor editor.setAsn1Model(datamodel.tc[editor.objectName()]) if not msgQ: encoder_backend.udpController = udp for button in myButtons: if button.objectName() == 'saveButton': button.clicked.connect(editor.saveTC) elif button.objectName() == 'loadButton': button.clicked.connect(editor.loadTC) elif button.objectName() == 'sendButton': button.clicked.connect(editor.sendTC) if dll: sdl.param_tc_editors.append({'name': editor.objectName(), 'editor': editor, 'send_btn': button}) elif button.objectName() == 'customCombo': if not customTC: button.hide() myMenu = QMenu() for each in customTC: action = QAction(each.name, myMenu) myMenu.addAction(action) action.triggered.connect(partial(editor.custom, each)) button.setMenu(myMenu) button.menu() # Seems there is a refresh issue without that else: button.hide() # Create a thread to poll the message queue or open an UDP socket if msgQ: if hasTM: pollingThread = MsgQ_Poller(log=log) for editor in myEdits: if isinstance(editor, asn1Viewer): pollingThread.tm.connect(editor.receivedTM) pollingThread.start() elif udp: if hasTM: udp.tmPool = g_tmPool udp.start(hasTM) elif dll: # Nothing to do, the TM callbacks have already been connected (above) pass # Associate the plotter manager to the main app myWidget.plotter = plotter # Set list of editors to the main app myWidget.editors = myEdits # If applicable, restore window geometry from a previous session if not options.reset: log.info('Restoring windows layout (use --reset to discard)') myWidget.restoreApplicationState() else: log.info('Ignoring and deleting previously stored windows layout') # Display the main window (including TM/TC editors) myWidget.show() ret = app.exec_() sys.stdout = sys.__stdout__ if msgQ and hasTM: pollingThread._bDie = True pollingThread.wait() return ret if __name__ == '__main__': sys.exit(gui())