#!/usr/bin/env python # -*- coding: utf-8 -*- """ TASTE ASN.1 Value Editor Handle regression testing scenarios in the context of the MSC recordings Copyright (c) 2012-2015 European Space Agency Designed and implemented by Maxime Perrotin Contact: maxime.perrotin@esa.int License is LGPLv3 - Check the LICENSE file """ from PySide.QtCore import QThread, Signal import Queue import datamodel import sys import os import importlib import time import DV import ctypes try: from PythonController import(OpenMsgQueueForReading, GetMsgQueueBufferSize, RetrieveMessageFromQueue) except ImportError: print 'ERROR importing PythonController' from opengeode import Asn1scc as asn1scc #TASTE_INST = os.popen('taste-config --prefix').readlines()[0].strip() #sys.path.append(TASTE_INST + '/share/asn1-editor') class Scenario(QThread, object): ''' Generic class handling the execution of an MSC scenario ''' def __init__(self, func): QThread.__init__(self) self.msg_q = Queue.Queue() self.modules = {} self.scenario = func # Parse the dataview.asn to get the Python AST dataView = asn1scc.parse_asn1(['dataview-uniq.asn'], rename_policy=asn1scc.ASN1.RenameOnlyConflicting, ast_version=asn1scc.ASN1.UniqueEnumeratedNames, flags=[asn1scc.ASN1.AstOnly]) # Control the end of the scenario self.stop_requested = False # Load all TM and TC backends, once for all backends = datamodel.tm.keys() backends.extend(datamodel.tc.keys()) for backend in backends: try: mod = reload(backend + '_backend') except (TypeError, NameError): try: mod = importlib.import_module(backend + '_backend') except ImportError: try: self.log.put((self.name, 'ERROR', 'Could not import {b}_backend.py' .format(b=backend))) except AttributeError: print("ERROR: could not import {}_backend.py" .format(backend)) continue mod.setMsgQ() # set the ASN.AST into the backend module, needed to convert VN mod.ASN1_AST = dataView.types self.modules[backend] = mod def __call__(self, log, name='Scenario'): self.name = name self.log = log return self def run(self): ''' Thread starting point ''' self.log.put((self.name, 'INFO', 'Starting scenario')) try: self.scenario(self) except (IOError, TypeError) as err: self.log.put((self.name, 'ERROR', str(err))) else: self.log.put((self.name, 'INFO', 'Scenario completed with no errors')) self.log.put((self.name, 'INFO', 'END')) #self.done.emit(self.log) def stop(self): ''' Set the stop_requested flag to true ''' self.stop_requested = True self.wait() def getNextMsg(self, timeout=None): ''' Wait for a message and return it (in native format) ''' self.log.put((self.name, 'INFO', 'Waiting for the next message')) try: (msgId, p_data_from_mq) = self.msg_q.get( block=True, timeout=timeout) except Queue.Empty: # Timeout expired self.log.put((self.name, 'ERROR', 'Timeout expired')) raise IOError('Timeout expired') else: self.log.put((self.name, 'INFO', 'Received message')) self.msg_q.task_done() # Determine which message was received: for b, mod in self.modules.viewitems(): if hasattr(mod, 'tmId') and mod.tmId == msgId: nativeValue = mod.decode_TM(p_data_from_mq) return (b, nativeValue) def expectMsg(self, msgId, value='', lineNo=0, ignoreOther=False, timeout=None): ''' Wait for a specific message, with optional explicit parameter If the message content shall not be checked, use value = '*' If you want to select fields to ignore, replace them with a '*' e.g. value = ' { name "John", age * }' will discard the 'age' field ''' self.log.put((self.name, 'INFO', 'Waiting for {id}({val})'.format(id=msgId, val=value))) # Call the function from the backend for mod in self.modules.viewkeys(): if(mod.lower() == msgId.lower() and hasattr(self.modules[mod], 'expect')): try: self.modules[mod].expect( self.msg_q, value, ignoreOther, timeout) except (ValueError, TypeError) as err: # Value error: good message but wrong params # TypeError : wrong message received self.log.put((self.name, 'ERROR', str(err))) raise except IOError as err: # Timeout self.log.put((self.name, 'ERROR', str(err))) raise else: self.log.put((self.name, 'INFO', 'Received and verified message content, all OK')) break else: self.log.put((self.name, 'ERROR', 'Undefined message: ' + str(msgId))) raise TypeError('Undefined message: ' + str(msgId)) def sendMsg(self, msgId, value='', lineNo=0): ''' Send a message to the running binary, use ASN.1 notation ''' for mod in self.modules.viewkeys(): if mod.lower() == msgId.lower(): self.log.put((self.name, 'INFO', 'Sending ' + str(mod))) send_id = 'send_{id}_VN'.format(id=mod) if hasattr(self.modules[mod], send_id): send_ptr = getattr(self.modules[mod], send_id) # get error code from sending function try: send_ptr(value) except IOError: log_msg = 'Sending message error' self.log.put((self.name, 'ERROR', log_msg)) raise IOError(log_msg) break else: log_msg = 'Undefined message: ' + str(msgId) self.log.put((self.name, 'ERROR', log_msg)) raise TypeError(log_msg) class PollerThread(QThread): ''' Class polling the msgQ and sending signals to running scenarii ''' msgReceived = Signal(int) def __init__(self, parent=None): QThread.__init__(self, parent) self.q_name = ("{uid}_{fvname}_PI_Python_queue" .format(uid=str(os.geteuid()), fvname=datamodel.FVname)) self.stop_requested = False self.slots = [] def run(self): print('Opening msgQ: ' + self.q_name) msg_q = OpenMsgQueueForReading(self.q_name) if msg_q == -1: print 'Failed to open message queue ' + self.q_name return -1 buffer_size = GetMsgQueueBufferSize(msg_q) # Configure a buffer to read data from the queue buff = ctypes.create_string_buffer(buffer_size) while True: if self.stop_requested: return 0 # Create an actual buffer and get the pointer # (accessing .raw is doing Python magic) raw = buff.raw msg_received_type = RetrieveMessageFromQueue(msg_q, buffer_size, raw) if msg_received_type == -1: time.sleep(0.01) continue else: for slot in self.slots: slot.put((msg_received_type, raw)) def stop(self): ''' Request the poller to stop ''' self.stop_requested = True self.wait() if __name__ == "__main__": print 'This module cannot be run standalone. Check TASTE documentation'