#!/usr/bin/python # ASN.1 Variable editor __author__ = "Maxime Perrotin" __license__ = "ESA Community (compatible with LGPLv3)" __version__ = "1.0" __url__ = "http://taste.tuxfamily.org" # maxime.perrotin@esa.int import os import signal import sys import platform import re import logging import optparse import time from ctypes import CDLL log = logging.getLogger(__name__) terminal_formatter = logging.Formatter( fmt="%(levelname)s - %(name)s: %(message)s") handler_console = logging.StreamHandler() handler_console.setLevel(logging.DEBUG) handler_console.setFormatter(terminal_formatter) log.addHandler(handler_console) from ColorFormatter import ColorFormatter from asn1_value_editor import asn1Editor, asn1Viewer from plotmanager import PlotManager try: import speedometer except ImportError: log.error('You must install speedometer first') # Make sure the gui can import modules in the current directory sys.path.insert(0, '.') try: import datamodel except ImportError: datamodel = None try: from PythonController import(OpenMsgQueueForReading, GetMsgQueueBufferSize, DV, RetrieveMessageFromQueue) python_controller = True except ImportError: python_controller = False import resources from mscHandler import mscHandler from sdlHandler import sdlHandler from PySide.QtCore import QThread, QFile, Qt, Signal, QObject from PySide.QtGui import(QApplication, QStatusBar, QLabel, QWidget, QListWidget, QPushButton, QDockWidget, QTabWidget, QMenu, QAction) from PySide.QtUiTools import QUiLoader g_tmPool = {} False class MsgQ_Poller(QThread): ''' Thread for polling the GUI queue in order to receive the messages from the main binary ''' tm = Signal() def __init__(self, log, parent=None): self.log = log QThread.__init__(self, parent) self.Qname = "{uid}_{fvname}_PI_queue".format(uid=str(os.geteuid()), fvname=datamodel.FVname) self.log.info('msQ name= ' + self.Qname) def run(self): self.log.debug('Starting msgQ polling thread') self._bDie = False while True: if self._bDie: return self._msgQueue = OpenMsgQueueForReading(self.Qname) if (self._msgQueue != -1): break self.log.warning( "Communication channel over {Qname} not established yet. " "Retrying...".format(Qname=self.Qname)) time.sleep(1) bufferSize = GetMsgQueueBufferSize(self._msgQueue) while not self._bDie: pDataFromMQ = DV.new_byte_SWIG_PTR(bufferSize) messageReceivedType = RetrieveMessageFromQueue( self._msgQueue, bufferSize, pDataFromMQ) if messageReceivedType == -1: time.sleep(0.005) continue self.log.debug('MsgQ poller: received message') # use a signal to inform the TM editor of the received TM g_tmPool[messageReceivedType].pendingTM = pDataFromMQ self.tm.emit() class MyLogger: ''' Hook for catching the calls to "print" from imported modules ''' def __init__(self, log): self.log = log self.string = "" def write(self, string): if string.endswith('\n'): self.log.info(self.string) self.string = "" else: self.string += string def gui(): ''' Application entry point ''' # Exit app on Ctrl-C signal.signal(signal.SIGINT, signal.SIG_DFL) usage = 'usage: gui.py [--udp=IP_Address:in_Port:out_Port]' version = 'taste auto-gui %s' % (__version__) # Set up the logging facilities log = logging.getLogger("gui") console = logging.StreamHandler(sys.__stdout__) console.setFormatter(ColorFormatter(True)) log.addHandler(console) log.info('Starting GUI') myPrint = MyLogger(log) sys.stdout = myPrint # Parse the command line parser = optparse.OptionParser(usage=usage, version=version) parser.add_option('-v', '--verbose', action='store_true', default=False, help='Display debug information') parser.add_option('--udp', dest='udp', metavar='RemoteIP:inPort:outPort', help='Use UDP sockets instead of message queues') parser.add_option('-l', '--shared_lib', action='store_true', default=False, help='Use a shared library instead of msgQ or UDP') parser.add_option('-r', '--reset', action='store_true', default=False, help='Do not try to restore windows layout') parser.add_option('-i', '--ivpath', dest='ivpath', default='./InterfaceView.aadl', help='Path and filename of the system interface view') options, args = parser.parse_args() if options.verbose: log.setLevel(logging.DEBUG) else: log.setLevel(logging.INFO) log.debug('{version} using Python {pyVersion}'.format( version=version, pyVersion=platform.python_version())) log.debug('Using interface view (for MSC processing): ' + options.ivpath) if options.udp is not None and not re.match("\d+.\d+.\d+.\d+:\d+:\d+", options.udp): log.error('Wrong UDP format. Run with --help for the proper syntax') log.error('You entered ' + options.udp) sys.exit(-1) if not datamodel: log.error('Could not import project data') return -1 msgQ = True udp = None dll = None hasTM = False if not options.udp and not options.shared_lib: # Default: use message queue log.info('Using message queues to communicate') if not python_controller: log.error('Python module "PythonController" is missing') return -1 elif options.udp: msgQ = False udpConfig = options.udp.split(':') log.debug('UDP with IP={cfg[0]} inPort={cfg[1]} outPort={cfg[2]}' .format(cfg=udpConfig)) try: from udpcontroller import tasteUDP udp = tasteUDP(log=log, ip_out=udpConfig[0], inport=int(udpConfig[1]), outport=int(udpConfig[2])) log.info('Using UDP to communicate') except ImportError: log.error('Python module "udpcontroller" is missing') return -1 elif options.shared_lib: # Load and run the startup transition of the shared library (Ada only?) msgQ = False try: dll = CDLL('lib{}.so'.format(datamodel.FVname)) dll_init = getattr(dll, 'lib{}init'.format(datamodel.FVname)) dll_init() except OSError as err: log.error('Issue with shared library lib{}.so:' .format(datamodel.FVname)) log.error(str(err)) return -1 # Define a Qt application (mandatory) app = QApplication(sys.argv) # All available styles are listed with: QStyleFactory.keys() app.setStyle("cleanlooks") # Load the GUI description file (.ui file editable with Qt Designer) # Pyside and PyQt have a different way of doing... from TasteMainWindow import TasteMainWindow loader = QUiLoader() loader.registerCustomWidget(asn1Editor) loader.registerCustomWidget(asn1Viewer) loader.registerCustomWidget(TasteMainWindow) uiFile = QFile("guilayout.ui") uiFile.open(QFile.ReadOnly) myWidget = loader.load(uiFile) uiFile.close() log.info('GUI generated for function ' + datamodel.FVname) myWidget.FVname = datamodel.FVname # Create an instance of the Plot/Meter manager plotter = PlotManager(parent=myWidget, log=log) # find QStatusBar widget statusbar = myWidget.findChildren(QStatusBar)[0] statusbar.showMessage('Welcome to TASTE') # find TASTE logo and get width of the picture tasteLogo = myWidget.findChild(QLabel, 'tasteLogo') tasteLogo.setPixmap(':/tasteLogo_white.png') logoWidth = tasteLogo.pixmap().width() # find Central widget and set maximum size to width of the logo centralWidget = myWidget.findChild(QWidget, 'centralWidget') centralWidget.setMaximumWidth(logoWidth) # To Put ASSERT background image #assertPix=QPalette() #assertPix.setBrush(myWidget.backgroundRole(), # QBrush(QImage("assert_bkg.jpg"))) #myWidget.setPalette(assertPix) # find list to place MSC scenarii and corresponding buttons mscList = myWidget.findChild(QListWidget, 'msc') runMSC = myWidget.findChild(QPushButton, 'runMSC') loadMSC = myWidget.findChild(QPushButton, 'loadMSC') editMSC = myWidget.findChild(QPushButton, 'editMSC') # Create an instance of the class handling MSC/Python scripts msgs = [] for msg in datamodel.tc: msgs.append({'name': msg, 'type': datamodel.tc[msg]['nodeTypename']}) for msg in datamodel.tm: msgs.append({'name': msg, 'type': datamodel.tm[msg]['nodeTypename']}) msc = mscHandler( myWidget, mscList, datamodel.FVname, msgs, udpController=udp) msc.log = log msc.ivpath = options.ivpath runMSC.pressed.connect(msc.run) loadMSC.pressed.connect(msc.load) editMSC.pressed.connect(msc.edit) myWidget.mscStop.connect(msc.stopMscRecording) # Create toolbar with a button to start/stop MSC recording and streamind toolbar = myWidget.addToolBar('toolbar') toolbar.setObjectName('toolbar') mscButton = toolbar.addAction('MSC') mscButton.triggered.connect(msc.startStop) myWidget.mscTrigger.connect(msc.startStop) # find dockable widgets (containing TM and TC editors) docks = myWidget.findChildren(QDockWidget) firstTC = None firstTM = None # Arrange them in a nice way: group the TM and TC in tabs myWidget.setTabPosition(Qt.RightDockWidgetArea, QTabWidget.North) for dock in docks: if dock.objectName().startswith('tm'): if firstTM is None: firstTM = dock else: myWidget.tabifyDockWidget(firstTM, dock) else: if firstTC is None: firstTC = dock else: myWidget.tabifyDockWidget(firstTC, dock) dock.setMinimumWidth(700 - logoWidth) # find TC/TM Editors myEdits = myWidget.findChildren(asn1Editor) #Pattern to create/add a new dock window - easy! #newDock=QDockWidget('Hello', parent=myWidget) #newDock.setFloating(True) #myWidget.addDockWidget(Qt.RightDockWidgetArea, newDock) # Create a dock to handle the display of SDL diagrams try: sdl = sdlHandler(myWidget) sdl.dll = dll except IOError: log.info('SDL viewer not available') sdl = None else: sdlButton = toolbar.addAction('SDL') sdlButton.triggered.connect(sdl.startStop) # Set the ASN.1 data type of each TC editor for editor in myEdits: # Enable mouse tracking - to display tips in the status bar editor.setMouseTracking(True) editor.entered.connect(editor.displaytip) # Hide columns showing the ASN.1 Type and constraint editor.hideExtraColumns() # Import the backend generated by the pyside_B_mapper from aadl2glueC encoder_backend = __import__("%s_backend" % editor.objectName()) # Set the callback in the backend to forward incoming TM to the GUI encoder_backend.editor = editor TMHandler = type('TMHandler', (QObject,), {'got_tm': Signal()}) # Callback in the backend to create a condition box in the MSC MSCHandler = type('TCHandler', (QObject,), {'msc_box': Signal(unicode)}) encoder_backend.tm_callback = TMHandler() encoder_backend.msc_callback = MSCHandler() encoder_backend.msc_callback.msc_box.connect(msc.addCondition) encoder_backend.tm_callback.got_tm.connect(editor.receivedTM) # Callback in the backend to update the state of the SDL diagram SDLHandler = type('SDLHandler', (QObject,), {'change_state': Signal(unicode)}) encoder_backend.sdl_callback = SDLHandler() encoder_backend.sdl_callback.change_state.connect(sdl.change_state) encoder_backend.log = log encoder_backend.statusbar = statusbar editor.statusBarMessage.connect(statusbar.showMessage) editor.msc.connect(msc.addToMsc) if msgQ: encoder_backend.setMsgQ() elif udp: encoder_backend.setUDP() elif dll: # Provide pointer to the shared lib to the backends encoder_backend.setSharedLib(dll) editor.backend = encoder_backend editor.log = log dockWidget = editor.parent() myButtons = dockWidget.findChildren(QPushButton) if isinstance(editor, asn1Viewer): # TM Viewer hasTM = True editor.setAsn1Model(datamodel.tm[editor.objectName()]) g_tmPool[encoder_backend.tmId] = editor editor.setPlotterBackend(plotter) for button in myButtons: if button.objectName() == 'plotButton': myMenu = QMenu() action = QAction("New Plot", myMenu) myMenu.addAction(action) button.setMenu(myMenu) action.triggered.connect(editor.newPlot) plotter.addPlotMenu(myMenu, editor) elif button.objectName() == 'meterButton': button.clicked.connect(editor.meter) else: button.hide() # button.clicked.connect(editor.unmeter) else: # TC Editor editor.setAsn1Model(datamodel.tc[editor.objectName()]) if not msgQ: encoder_backend.udpController = udp for button in myButtons: if button.objectName() == 'saveButton': button.clicked.connect(editor.saveTC) elif button.objectName() == 'loadButton': button.clicked.connect(editor.loadTC) elif button.objectName() == 'sendButton': button.clicked.connect(editor.sendTC) if sdl: button.clicked.connect(sdl.on_event) # Create a thread to poll the message queue or open an UDP socket if msgQ: if hasTM: pollingThread = MsgQ_Poller(log=log) for editor in myEdits: if isinstance(editor, asn1Viewer): pollingThread.tm.connect(editor.receivedTM) pollingThread.start() elif udp: if hasTM: udp.tmPool = g_tmPool udp.start(hasTM) elif dll: # Nothing to do, the TM callbacks have already been connected (above) pass # Associate the plotter manager to the main app myWidget.plotter = plotter # Set list of editors to the main app myWidget.editors = myEdits # If applicable, restore window geometry from a previous session if not options.reset: log.info('Restoring windows layout (use --reset to discard)') myWidget.restoreApplicationState() else: log.info('Ignoring and deleting previously stored windows layout') # Display the main window (including TM/TC editors) myWidget.show() ret = app.exec_() sys.stdout = sys.__stdout__ if msgQ and hasTM: pollingThread._bDie = True pollingThread.wait() return ret if __name__ == '__main__': sys.exit(gui())