asynchronousTool.py 7.54 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#
# (C) Semantix Information Technologies.
#
# Semantix Information Technologies is licensing the code of the
# Data Modelling Tools (DMT) in the following dual-license mode:
#
# Commercial Developer License:
#       The DMT Commercial Developer License is the suggested version
# to use for the development of proprietary and/or commercial software.
# This version is for developers/companies who do not want to comply
# with the terms of the GNU Lesser General Public License version 2.1.
#
# GNU LGPL v. 2.1:
#       This version of DMT is the one to use for the development of
# applications, when you are willing to comply with the terms of the
# GNU Lesser General Public License version 2.1.
#
# Note that in both cases, there are no charges (royalties) for the
# generated code.
#
21
'''
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Base class for all asynchronous tools
'''

import re
import os

from commonPy.utility import inform, panicWithCallStack


class ASynchronousToolGlueGenerator:

    ################################################
    # Parts to override for each asynchronous tool

36
    def Version(self):  # pylint: disable=no-self-use
37
38
        panicWithCallStack("Method undefined in a ASynchronousToolGlueGenerator...")  # pragma: no cover

39
    def HeadersOnStartup(self, unused_asnFile, unused_outputDir, unused_maybeFVname):  # pylint: disable=no-self-use
40
41
        panicWithCallStack("Method undefined in a ASynchronousToolGlueGenerator...")  # pragma: no cover

42
    def Encoder(self, unused_nodeTypename, unused_node, unused_leafTypeDict, unused_names, unused_encoding):  # pylint: disable=no-self-use
43
44
        panicWithCallStack("Method undefined in a ASynchronousToolGlueGenerator...")  # pragma: no cover

45
    def Decoder(self, unused_nodeTypename, unused_node, unused_leafTypeDict, unused_names, unused_encoding):  # pylint: disable=no-self-use
46
47
48
49
50
        panicWithCallStack("Method undefined in a ASynchronousToolGlueGenerator...")  # pragma: no cover

    ########################################################
    # Parts to possibly override for each synchronous tool

51
    # noinspection PyMethodMayBeStatic
52
    def CleanNameAsToolWants(self, name):  # pylint: disable=no-self-use
53
54
55
56
57
58
59
60
61
62
63
        return re.sub(r'[^a-zA-Z0-9_]', '_', name)

    ##########################################
    # Common parts for all asynchronous tools

    def __init__(self):
        # The files written to
        self.C_HeaderFile = None
        self.C_SourceFile = None
        self.asn_name = ""
        self.supportedEncodings = ['native', 'uper', 'acn']
64
65
        self.useOSS = None
        self.typesToWorkOn = {}
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122

    def OnStartup(self, modelingLanguage, asnFile, outputDir, maybeFVname, useOSS):
        self.useOSS = useOSS
        prefix = modelingLanguage
        if prefix == "SDL":
            prefix = "OG"
        outputCheaderFilename = self.CleanNameAsToolWants(prefix) + "_ASN1_Types.h"
        outputCsourceFilename = self.CleanNameAsToolWants(prefix) + "_ASN1_Types.c"

        inform(str(self.__class__) + ": Creating file '%s'...", outputCheaderFilename)
        self.C_HeaderFile = open(outputDir + outputCheaderFilename, 'w')

        inform(str(self.__class__) + ": Creating file '%s'...", outputCsourceFilename)
        self.C_SourceFile = open(outputDir + outputCsourceFilename, 'w')

        self.asn_name = os.path.basename(os.path.splitext(asnFile)[0])

        ID = modelingLanguage + "_" + self.asn_name.replace(".", "")
        ID = re.sub(r'[^A-Za-z0-9_]', '_', ID).upper()
        self.C_HeaderFile.write("#ifndef __%s_H__\n" % ID)
        self.C_HeaderFile.write("#define __%s_H__\n\n" % ID)
        self.C_HeaderFile.write("#include <stdlib.h> /* for size_t */\n")
        self.C_HeaderFile.write("\n")
        self.C_SourceFile.write("#include <stdio.h>\n")
        self.C_SourceFile.write("#include <string.h>\n\n")
        self.C_SourceFile.write("#include <assert.h>\n\n")
        self.C_SourceFile.write("#include \"%s\"\n\n" % outputCheaderFilename)

        self.HeadersOnStartup(asnFile, outputDir, maybeFVname)

        self.typesToWorkOn = {}

    def Common(self, nodeTypename, node, leafTypeDict, names):
        # Async backends are different: they work on ASN.1 types, not SP params.

        # OG for example, needs macros to be generated, not functions. This breaks the normal
        # encapsulation we followed for synchronous tools, because we can't create encoding
        # and decoding functions that work on their arguments. The reason for this is that
        # in contrast with the synchronous world (SCADE and MATLAB/Simulink), we can't use
        # global variables as targets for OG. Being asynchronous, OG-generated code can call
        # an external function whenever it wishes, and it needs message-specific work inside
        # it. That in itself is not bad, but what is bad is the fact that we can't predict
        # the type name used for the message (it is randomly numbered, and it can't be predicted
        # by the ASN.1 typename alone). We therefore resorted to macros... and we thus can't
        # employ a common scheme for all asynchronous tools where we generate functions
        # in the context of this method... Instead, we have to delegate all work to members
        # defined in the derived classes...
        self.Encoder(nodeTypename, node, leafTypeDict, names, 'uper')
        self.Encoder(nodeTypename, node, leafTypeDict, names, 'acn')
        self.Encoder(nodeTypename, node, leafTypeDict, names, 'native')
        self.Decoder(nodeTypename, node, leafTypeDict, names, 'uper')
        self.Decoder(nodeTypename, node, leafTypeDict, names, 'acn')
        self.Decoder(nodeTypename, node, leafTypeDict, names, 'native')

    def OnBasic(self, nodeTypename, node, leafTypeDict, names):
        realLeafType = leafTypeDict[nodeTypename]
        inform(str(self.__class__) + ": BASE: %s (%s)", nodeTypename, realLeafType)
123
        self.typesToWorkOn[nodeTypename] = [node, leafTypeDict, names]
124
125
126

    def OnSequence(self, nodeTypename, node, leafTypeDict, names):
        inform(str(self.__class__) + ": SEQUENCE: %s", nodeTypename)
127
        self.typesToWorkOn[nodeTypename] = [node, leafTypeDict, names]
128
129
130

    def OnSet(self, nodeTypename, node, leafTypeDict, names):
        inform(str(self.__class__) + ": SET: %s", nodeTypename)  # pragma: nocover
131
        self.typesToWorkOn[nodeTypename] = [node, leafTypeDict, names]  # pragma: nocover
132
133
134

    def OnEnumerated(self, nodeTypename, node, leafTypeDict, names):
        inform(str(self.__class__) + ": ENUMERATED: %s", nodeTypename)
135
        self.typesToWorkOn[nodeTypename] = [node, leafTypeDict, names]
136
137
138

    def OnSequenceOf(self, nodeTypename, node, leafTypeDict, names):
        inform(str(self.__class__) + ": SEQUENCEOF: %s", nodeTypename)
139
        self.typesToWorkOn[nodeTypename] = [node, leafTypeDict, names]
140
141
142

    def OnSetOf(self, nodeTypename, node, leafTypeDict, names):
        inform(str(self.__class__) + ": SETOF: %s", nodeTypename)  # pragma: nocover
143
        self.typesToWorkOn[nodeTypename] = [node, leafTypeDict, names]  # pragma: nocover
144
145
146

    def OnChoice(self, nodeTypename, node, leafTypeDict, names):
        inform(str(self.__class__) + ": CHOICE: %s", nodeTypename)
147
        self.typesToWorkOn[nodeTypename] = [node, leafTypeDict, names]
148
149
150
151
152
153
154
155
156

    def OnShutdown(self, unused_modelingLanguage, unused_asnFile, unused_maybeFVname):
        for nodeTypename, value in list(self.typesToWorkOn.items()):
            inform(str(self.__class__) + "Really working on " + nodeTypename)
            (node, leafTypeDict, names) = value
            self.Common(nodeTypename, node, leafTypeDict, names)
        self.C_HeaderFile.write("\n#endif\n")
        self.C_HeaderFile.close()
        self.C_SourceFile.close()