#
# (C) Semantix Information Technologies.
#
# Semantix Information Technologies is licensing the code of the
# Data Modelling Tools (DMT) in the following dual-license mode:
#
# Commercial Developer License:
#       The DMT Commercial Developer License is the suggested version
# to use for the development of proprietary and/or commercial software.
# This version is for developers/companies who do not want to comply
# with the terms of the GNU Lesser General Public License version 2.1.
#
# GNU LGPL v. 2.1:
#       This version of DMT is the one to use for the development of
# applications, when you are willing to comply with the terms of the
# GNU Lesser General Public License version 2.1.
#
# Note that in both cases, there are no charges (royalties) for the
# generated code.
#
'''
Base class for all asynchronous tools
'''

import re
import os

28
from typing import Tuple, IO, Any, Dict  # NOQA pylint: disable=unused-import
29

30
from ..commonPy.utility import inform, panicWithCallStack
31
from ..commonPy.asnParser import Typename, AsnNode, AST_Lookup, AST_Leaftypes  # NOQA pylint: disable=unused-import
32
33
34
35
36
37
38


class ASynchronousToolGlueGenerator:
    '''
    Base class for the glue generators of asynchronous tools.

    Derived classes must override Version, HeadersOnStartup, Encoder and
    Decoder (the versions here only panic), and may override
    CleanNameAsToolWants. OnStartup creates the C header/source files, the
    On* handlers only record each type, and OnShutdown invokes
    Encoder/Decoder for every recorded type before closing the files.
    '''

    ################################################
    # Parts to override for each asynchronous tool

    def Version(self) -> None:  # pylint: disable=no-self-use
        '''Must be overridden; this base version only panics.'''
        panicWithCallStack("Method undefined in a ASynchronousToolGlueGenerator...")  # pragma: no cover

    def HeadersOnStartup(self,  # pylint: disable=no-self-use
                         unused_asnFile: str,
                         unused_outputDir: str,
                         unused_maybeFVname: str) -> None:
        '''
        Must be overridden; this base version only panics.

        Called at the end of OnStartup, after the common preambles have
        been written to the header and source files.
        '''
        panicWithCallStack("Method undefined in a ASynchronousToolGlueGenerator...")  # pragma: no cover

    def Encoder(self,  # pylint: disable=no-self-use
                unused_nodeTypename: str,
                unused_node: AsnNode,
                unused_leafTypeDict: AST_Leaftypes,
                unused_names: AST_Lookup,
                unused_encoding: str) -> None:
        '''
        Must be overridden; this base version only panics.

        Common calls it once per encoding ('uper', 'acn', 'native') for
        every recorded type.
        '''
        panicWithCallStack("Method undefined in a ASynchronousToolGlueGenerator...")  # pragma: no cover

    def Decoder(self,  # pylint: disable=no-self-use
                unused_nodeTypename: str,
                unused_node: AsnNode,
                unused_leafTypeDict: AST_Leaftypes,
                unused_names: AST_Lookup,
                unused_encoding: str) -> None:
        '''
        Must be overridden; this base version only panics.

        Common calls it once per encoding ('uper', 'acn', 'native') for
        every recorded type, after all the Encoder calls for that type.
        '''
        panicWithCallStack("Method undefined in a ASynchronousToolGlueGenerator...")  # pragma: no cover

    ########################################################
    # Parts to possibly override for each synchronous tool

    # noinspection PyMethodMayBeStatic
    def CleanNameAsToolWants(self, name: str) -> str:  # pylint: disable=no-self-use
        '''Return name with every character outside [a-zA-Z0-9_] replaced by "_".'''
        return re.sub(r'[^a-zA-Z0-9_]', '_', name)

    ##########################################
    # Common parts for all asynchronous tools

    def __init__(self) -> None:
        # The files written to (opened in OnStartup, closed in OnShutdown)
        self.C_HeaderFile = None  # type: IO[Any]
        self.C_SourceFile = None  # type: IO[Any]
        # Base name of the ASN.1 file, without directory or extension
        self.asn_name = ""
        self.supportedEncodings = ['native', 'uper', 'acn']
        self.useOSS = None  # type: bool
        # Types recorded by the On* handlers, processed in OnShutdown
        self.typesToWorkOn = {}  # type: Dict[str, Tuple[AsnNode, AST_Leaftypes, AST_Lookup]]

    def OnStartup(self, modelingLanguage: str, asnFile: str, outputDir: str, maybeFVname: str, useOSS: bool) -> None:
        '''
        Create the "<prefix>_ASN1_Types.h" / ".c" files inside outputDir,
        write their common preambles and call HeadersOnStartup.

        The prefix is modelingLanguage passed through CleanNameAsToolWants,
        except that "SDL" is mapped to "OG". The header's include guard is
        derived from modelingLanguage and the base name of asnFile.

        Raises whatever open() raises; if the source file cannot be
        opened, the already opened header file is closed first.
        '''
        self.useOSS = useOSS
        prefix = "OG" if modelingLanguage == "SDL" else modelingLanguage
        baseName = self.CleanNameAsToolWants(prefix) + "_ASN1_Types"
        outputCheaderFilename = baseName + ".h"
        outputCsourceFilename = baseName + ".c"

        # os.path.join yields the same path as plain concatenation when
        # outputDir ends with a separator, and a correct one when it doesn't.
        inform(str(self.__class__) + ": Creating file '%s'...", outputCheaderFilename)
        self.C_HeaderFile = open(os.path.join(outputDir, outputCheaderFilename), 'w')

        inform(str(self.__class__) + ": Creating file '%s'...", outputCsourceFilename)
        try:
            self.C_SourceFile = open(os.path.join(outputDir, outputCsourceFilename), 'w')
        except OSError:
            # Don't leak the header handle when the second open fails.
            self.C_HeaderFile.close()
            raise

        self.asn_name = os.path.basename(os.path.splitext(asnFile)[0])

        # Include guard: upper-cased, restricted to [A-Z0-9_]
        ID = modelingLanguage + "_" + self.asn_name.replace(".", "")
        ID = re.sub(r'[^A-Za-z0-9_]', '_', ID).upper()

        header = self.C_HeaderFile
        header.write("#ifndef __%s_H__\n" % ID)
        header.write("#define __%s_H__\n\n" % ID)
        header.write("#if defined( __unix__ ) || defined( __MSP430__ )\n")
        header.write("#include <stdlib.h> /* for size_t */\n")
        header.write("#else\n")
        header.write("typedef unsigned size_t;\n")
        header.write("#endif\n\n")
        header.write("#ifndef STATIC\n")
        header.write("#define STATIC\n")
        header.write("#endif\n\n")
        header.write("\n")

        source = self.C_SourceFile
        source.write("#ifdef __unix__\n")
        source.write("#include <stdio.h>\n")
        source.write("#include <assert.h>\n")
        source.write("#endif\n\n")
        source.write("#include <string.h>\n\n")
        source.write("#include \"%s\"\n\n" % outputCheaderFilename)

        self.HeadersOnStartup(asnFile, outputDir, maybeFVname)

        # Start every run with an empty set of recorded types.
        self.typesToWorkOn = {}  # type: Dict[str, Tuple[AsnNode, AST_Leaftypes, AST_Lookup]]

    def Common(self, nodeTypename: str, node: AsnNode, leafTypeDict: AST_Leaftypes, names: AST_Lookup) -> None:
        '''
        Generate the glue for one type: call Encoder for 'uper', 'acn' and
        'native' (in that order), then Decoder for the same encodings.
        '''
        # Async backends are different: they work on ASN.1 types, not SP params.

        # OG for example, needs macros to be generated, not functions. This breaks the normal
        # encapsulation we followed for synchronous tools, because we can't create encoding
        # and decoding functions that work on their arguments. The reason for this is that
        # in contrast with the synchronous world (SCADE and MATLAB/Simulink), we can't use
        # global variables as targets for OG. Being asynchronous, OG-generated code can call
        # an external function whenever it wishes, and it needs message-specific work inside
        # it. That in itself is not bad, but what is bad is the fact that we can't predict
        # the type name used for the message (it is randomly numbered, and it can't be predicted
        # by the ASN.1 typename alone). We therefore resorted to macros... and we thus can't
        # employ a common scheme for all asynchronous tools where we generate functions
        # in the context of this method... Instead, we have to delegate all work to members
        # defined in the derived classes...
        encodings = ('uper', 'acn', 'native')
        for encoding in encodings:
            self.Encoder(nodeTypename, node, leafTypeDict, names, encoding)
        for encoding in encodings:
            self.Decoder(nodeTypename, node, leafTypeDict, names, encoding)

    def _RememberType(self,
                      kind: str,
                      nodeTypename: str,
                      node: AsnNode,
                      leafTypeDict: AST_Leaftypes,
                      names: AST_Lookup) -> None:
        '''Log "<class>: <kind>: <typename>" and record the type for OnShutdown.'''
        inform(str(self.__class__) + ": " + kind + ": %s", nodeTypename)
        self.typesToWorkOn[nodeTypename] = (node, leafTypeDict, names)

    def OnBasic(self, nodeTypename: str, node: AsnNode, leafTypeDict: AST_Leaftypes, names: AST_Lookup) -> None:
        '''Record a base type; the log line also shows its entry in leafTypeDict.'''
        realLeafType = leafTypeDict[nodeTypename]
        inform(str(self.__class__) + ": BASE: %s (%s)", nodeTypename, realLeafType)
        self.typesToWorkOn[nodeTypename] = (node, leafTypeDict, names)

    def OnSequence(self, nodeTypename: str, node: AsnNode, leafTypeDict: AST_Leaftypes, names: AST_Lookup) -> None:
        '''Record a SEQUENCE type for processing in OnShutdown.'''
        self._RememberType("SEQUENCE", nodeTypename, node, leafTypeDict, names)

    def OnSet(self, nodeTypename: str, node: AsnNode, leafTypeDict: AST_Leaftypes, names: AST_Lookup) -> None:
        '''Record a SET type for processing in OnShutdown.'''
        self._RememberType("SET", nodeTypename, node, leafTypeDict, names)  # pragma: nocover

    def OnEnumerated(self, nodeTypename: str, node: AsnNode, leafTypeDict: AST_Leaftypes, names: AST_Lookup) -> None:
        '''Record an ENUMERATED type for processing in OnShutdown.'''
        self._RememberType("ENUMERATED", nodeTypename, node, leafTypeDict, names)

    def OnSequenceOf(self, nodeTypename: str, node: AsnNode, leafTypeDict: AST_Leaftypes, names: AST_Lookup) -> None:
        '''Record a SEQUENCE OF type for processing in OnShutdown.'''
        self._RememberType("SEQUENCEOF", nodeTypename, node, leafTypeDict, names)

    def OnSetOf(self, nodeTypename: str, node: AsnNode, leafTypeDict: AST_Leaftypes, names: AST_Lookup) -> None:
        '''Record a SET OF type for processing in OnShutdown.'''
        self._RememberType("SETOF", nodeTypename, node, leafTypeDict, names)  # pragma: nocover

    def OnChoice(self, nodeTypename: str, node: AsnNode, leafTypeDict: AST_Leaftypes, names: AST_Lookup) -> None:
        '''Record a CHOICE type for processing in OnShutdown.'''
        self._RememberType("CHOICE", nodeTypename, node, leafTypeDict, names)

    def OnShutdown(self, unused_modelingLanguage: str, unused_asnFile: str, unused_maybeFVname: str) -> None:
        '''
        Run Common on every recorded type, terminate the header's include
        guard and close both output files.

        The files are closed even when generating a type raises; in that
        case the closing "#endif" is not written and the exception
        propagates to the caller.
        '''
        try:
            for nodeTypename, (node, leafTypeDict, names) in self.typesToWorkOn.items():
                inform(str(self.__class__) + ": Really working on %s", nodeTypename)
                self.Common(nodeTypename, node, leafTypeDict, names)
            self.C_HeaderFile.write("\n#endif\n")
        finally:
            # Nested so that a failing close() on the header cannot keep
            # the source file open.
            try:
                self.C_HeaderFile.close()
            finally:
                self.C_SourceFile.close()