commonSMP2.py 19.5 KB
Newer Older
1
2
import re
import sys
3

4
from typing import List, Union, Optional, Any, Tuple, Dict  # NOQA pylint: disable=unused-import
5

6
from lxml import etree
7
8
from .asnAST import (
    AsnBool, AsnInt, AsnReal, AsnEnumerated, AsnOctetString, AsnSequenceOf,
9
    AsnSet, AsnSetOf, AsnSequence, AsnChoice, AsnMetaMember, AsnNode, Lookup)
10
11
12
13
14
15

# Level of verbosity
g_verboseLevel = 0

# colors (used when calling 'info')
ESC = chr(27)
16
17
18
19
red = ESC + "[31m"
green = ESC + "[32m"
white = ESC + "[0m"
yellow = ESC + "[33m"
20
colors = [red, green, white, yellow]
21
22
23
24


# Lookup table for SMP2 types that map to AsnBasicNodes
class MagicSmp2SimpleTypesDict(dict):
25
    def __getitem__(self, name: str) -> Any:
26
27
28
29
30
        # strip 'http://www.esa.int/XXXX/YY/Smp#Bool'
        # to    'http://www.esa.int/Smp#Bool'
        name = re.sub(r'/\d{4}/\d{2}/', '/', name)
        return super(MagicSmp2SimpleTypesDict, self).__getitem__(name)

31
    # ---------------------------------------------------------------------------
32
    def __contains__(self, name: Any) -> bool:
33
34
35
        name = re.sub(r'/\d{4}/\d{2}/', '/', name)
        return super(MagicSmp2SimpleTypesDict, self).__contains__(name)

36
    # ---------------------------------------------------------------------------
37
    def has_key(self, name: str) -> bool:
38
        name = re.sub(r'/\d{4}/\d{2}/', '/', name)
39
        return name in super(MagicSmp2SimpleTypesDict, self)  # type: ignore  pylint: disable=unsupported-membership-test
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59


simpleTypesTable = MagicSmp2SimpleTypesDict({
    'http://www.esa.int/Smp#Bool': (AsnBool, None, None),
    'http://www.esa.int/Smp#Char8': (AsnInt, 0, 255),
    'http://www.esa.int/Smp#DateTime': (AsnOctetString, 30, 30),
    'http://www.esa.int/Smp#Duration': (AsnInt, 0, 2147483647),
    'http://www.esa.int/Smp#Int8': (AsnInt, -128, 127),
    'http://www.esa.int/Smp#Int16': (AsnInt, -32768, 32767),
    'http://www.esa.int/Smp#Int32': (AsnInt, -2147483648, 2147483647),
    'http://www.esa.int/Smp#Int64': (AsnInt, -9223372036854775808, 9223372036854775807),
    'http://www.esa.int/Smp#UInt8': (AsnInt, 0, 255),
    'http://www.esa.int/Smp#UInt16': (AsnInt, 0, 65535),
    'http://www.esa.int/Smp#UInt32': (AsnInt, 0, 4294967295),
    'http://www.esa.int/Smp#UInt64': (AsnInt, 0, 9223372036854775807),
    'http://www.esa.int/Smp#Float32': (AsnReal, -3.4E37, 3.4E37),
    'http://www.esa.int/Smp#Float64': (AsnReal, -1.8E307, 1.8E307)
})


60
def setVerbosity(level: int) -> None:
61
62
63
64
    global g_verboseLevel
    g_verboseLevel = level


65
def info(level: int, *args: Any) -> None:
66
    """Checks the 'level' argument against g_verboseLevel and then prints
67
68
69
    the rest of the args, one by one, separated by a space. It also
    has logic to deal with usage of one of the colors as arguments
    (in which case it avoids printing spurious spaces).
70
    """
71
72
    if not args:
        panic("You called info without args")  # pragma: no cover
73
74
    if level <= g_verboseLevel:
        for i in range(len(args)):  # pylint: disable=consider-using-enumerate
75
            if i != 0 and args[i - 1] not in colors:
76
77
                sys.stdout.write(' ')
            sys.stdout.write(args[i])
78
        for i in range(len(args) - 1, -1, -1):
79
80
81
82
83
84
85
            if args[i] in colors:
                continue
            if not args[i].endswith('\n'):
                sys.stdout.write('\n')
                return


86
def panic(x: str, coloredBanner: str="") -> None:
87
    """Notifies the user that something fatal happened and aborts. """
88
89
90
91
92
    info(0, yellow + coloredBanner + white + '\n' + x)
    sys.exit(1)


class DashUnderscoreAgnosticDict(dict):
93
    """A dictionary that automatically replaces '_' to '-' in its keys. """
94
    def __setitem__(self, key: Any, value: Any) -> None:
95
96
        super(DashUnderscoreAgnosticDict, self).__setitem__(key.replace('_', '-'), value)

97
    def __getitem__(self, key: Any) -> Any:
98
99
        return super(DashUnderscoreAgnosticDict, self).__getitem__(key.replace('_', '-'))

100
    def __contains__(self, key: Any) -> bool:
101
102
103
104
        return super(DashUnderscoreAgnosticDict, self).__contains__(key.replace('_', '-'))


class Attributes:
105
    """Helper class, to ease access to XML attributes.
106
107
108
109
110
111
112
113
114
115
    It allows us to write code like this...

            a = Attributes(lxmlEtreeNode)
            whatever = a.href
            print a.title

        ...instead of this:

            whatever = lxmlEtreeNode.get('href', None)
            print a.get('title', None)
116
    """
117
118
119
    base = None  # type: str
    sourceline = None  # type: int

120
    def __init__(self, t: Dict[str, Any]) -> None:
121
        """Argument t is an lxml Etree node."""
122
        self._attrs = {}  # type: Dict[str, Any]
123
        for k, v in list(t.items()):
124
125
            endBraceIdx = k.find('}')
            if endBraceIdx != -1:
126
                k = k[endBraceIdx + 1:]
127
128
            self._attrs[k] = v

129
    def __getattr__(self, x: str) -> Any:
130
131
132
        return self._attrs.get(x, None)


133
def Clean(fieldName: str) -> str:
134
135
    """When mapping field names and type names from SMP2 to ASN.1,
    we need to change '_' to '-'. """
136
137
138
    return re.sub(r'[^a-zA-Z0-9-]', '-', fieldName)


139
140
141
142
143
def MapSMP2Type(
        attrs: Attributes,
        enumOptions: List[Tuple[str, str]],
        itemTypes: List[Any],  # pylint: disable=invalid-sequence-index
        fields: List[Any]) -> AsnNode:  # pylint: disable=invalid-sequence-index
144
    """
145
146
    Core mapping function. Works on the XML attributes of the lxml Etree node,
    and returns a node from commonPy.asnAST.
147
    """
148
149
150
    location = 'from %s, in line %s' % (attrs.base, attrs.sourceline)
    info(2, "Mapping SMP2 type", location)

151
    def getMaybe(cast: Any, x: str) -> Optional[Any]:
152
153
154
155
156
        try:
            return cast(x)
        except:  # pragma: no cover
            return None  # pragma: no cover
    dataDict = {"asnFilename": attrs.base, "lineno": attrs.sourceline}
157

158
    def HandleTypesInteger() -> Union[AsnBool, AsnInt]:
159
160
161
        lowRange = getMaybe(int, attrs.Minimum)
        highRange = getMaybe(int, attrs.Maximum)
        if lowRange == 0 and highRange == 1:
162
163
164
165
            # Pseudo-boolean from TASTE mapping, as per SpaceBel instructions
            return AsnBool(**dataDict)
        else:
            # Normal integer
166
167
            spanRange = [lowRange, highRange] if lowRange is not None and highRange is not None else []
            dataDict["range"] = spanRange
168
            return AsnInt(**dataDict)
169

170
    def HandleTypesFloat() -> AsnReal:
171
172
173
174
        lowRange = getMaybe(float, attrs.Minimum)
        highRange = getMaybe(float, attrs.Maximum)
        spanRange = [lowRange, highRange] if lowRange is not None and highRange is not None else []
        dataDict["range"] = spanRange
175
        return AsnReal(**dataDict)
176

177
    def HandleTypesArray() -> Union[AsnOctetString, AsnSequenceOf]:
178
        if not itemTypes:
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
            panic("Missing mandatory ItemType element", location)  # pragma: no cover
        itemTypeAttrs = Attributes(itemTypes[0])
        arrSize = getMaybe(int, attrs.Size)
        if not arrSize:
            panic("Missing array 'Size' attribute", location)  # pragma: no cover
        dataDict["range"] = [arrSize, arrSize]
        if itemTypeAttrs.href in [
                'http://www.esa.int/2005/10/Smp#Char8',
                'http://www.esa.int/2005/10/Smp#Int8',
                'http://www.esa.int/2005/10/Smp#UInt8']:
            return AsnOctetString(**dataDict)
        else:
            containedHref = itemTypeAttrs.href
            if not containedHref:
                panic("Missing reference to 'href' (file:%s, line:%d)" %
194
                      (itemTypeAttrs.base, itemTypeAttrs.sourceline))  # pragma: no cover
195
196
            idxHash = containedHref.find('#')
            if -1 != idxHash:
197
                containedHref = containedHref[idxHash + 1:]
198
199
            if itemTypeAttrs.href in simpleTypesTable:
                # Create the AsnBasicNode this child maps to.
200
201
                cast, lowRange, highRange = simpleTypesTable[itemTypeAttrs.href]
                spanRange = [lowRange, highRange] if lowRange is not None and highRange is not None else []
202
203
204
205
                childDict = {
                    'asnFilename': itemTypes[0].base,
                    'lineno': itemTypes[0].sourceline
                }
206
207
                if spanRange:
                    childDict['range'] = spanRange
208
209
210
211
212
213
214
                childNode = cast(**childDict)
                dataDict['containedType'] = childNode
            else:
                # Store the 'Id' attribute - we will resolve this
                # in the FixupOutOfOrderIdReferences function.
                dataDict['containedType'] = containedHref
            return AsnSequenceOf(**dataDict)
215

216
    def HandleTypesStructure() -> Union[AsnChoice, AsnSequence]:
217
218
219
220
221
222
223
224
225
226
        members = []
        for field in fields:
            try:
                fieldName = field.get('Name')
                if fieldName != 'choiceIdx':
                    fieldName = Clean(fieldName)
                    fieldName = fieldName[0].lower() + fieldName[1:]
                    try:
                        refTypeAttrs = Attributes(field.xpath("Type")[0])
                    except:  # pragma: no cover
227
228
229
                        loc = 'from %s, in line %s' % \
                              (field.base, field.sourceline)  # pragma: no cover
                        panic("Missing Type child element", loc)  # pragma: no cover
230
231
232
                    refTypeHref = refTypeAttrs.href
                    idxHash = refTypeHref.find('#')
                    if -1 != idxHash:
233
                        refTypeHref = refTypeHref[idxHash + 1:]
234
                    if refTypeAttrs.href in simpleTypesTable:
235
                        cast, lowRange, highRange = simpleTypesTable[refTypeAttrs.href]
236
237
238
239
                        containedDict = {
                            'asnFilename': field.base,
                            'lineno': field.sourceline
                        }
240
241
242
                        spanRange = [lowRange, highRange] if lowRange is not None and highRange is not None else []
                        if spanRange:
                            containedDict['range'] = [lowRange, highRange]
243
244
245
246
247
248
249
250
251
                        basicNode = cast(**containedDict)
                        members.append((fieldName, basicNode))
                    else:
                        members.append((fieldName, AsnMetaMember(
                            asnFilename=field.base,
                            lineno=field.sourceline,
                            containedType=refTypeHref)))
                else:
                    members.append((fieldName, 'dummy'))
252
            except Exception as e:  # pragma: no cover
253
254
255
256
257
                panic(str(e) + '\nMake sure that:\n'
                      '1. The "Name" attribute exists\n'
                      '2. The "Type" child element, with attribute '
                      '"xlink:title" also exists.',
                      'In %s, line %d:' % (field.base, field.sourceline))  # pragma: no cover
258
        if not members:
259
            panic("Empty SEQUENCE is not supported", loc)  # pragma: no cover
260
261
262
263
264
265
        if members[0][0] == 'choiceIdx':
            dataDict['members'] = members[1:]
            return AsnChoice(**dataDict)
        else:
            dataDict['members'] = members
            return AsnSequence(**dataDict)
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282

    if attrs.type == 'Types:Integer':
        return HandleTypesInteger()
    elif attrs.type == 'Types:Float':
        return HandleTypesFloat()
    elif attrs.type == 'Types:Enumeration':
        dataDict["members"] = enumOptions
        return AsnEnumerated(**dataDict)
    elif attrs.type == 'Types:String':
        high = getMaybe(int, attrs.Length)
        span = [high, high] if high is not None else []
        dataDict["range"] = span
        return AsnOctetString(**dataDict)
    elif attrs.type == 'Types:Array':
        return HandleTypesArray()
    elif attrs.type == 'Types:Structure':  # pylint: disable=too-many-nested-blocks
        return HandleTypesStructure()
283
284
285
    panic("Failed to map... (%s)" % attrs.type, location)  # pragma: no cover


286
def FixupOutOfOrderIdReferences(nodeTypename: str, asnTypesDict: Lookup, idToTypeDict: Dict[str, str]) -> None:
287
    """Based on the uniqueness of the 'Id' elements used in
288
    'xlink:href' remote references, we resolve the lookups of
289
    remote types that we stored in AsnMetaMembers during MapSMP2Type()."""
290
    node = asnTypesDict[nodeTypename]
291
    if isinstance(node, (AsnChoice, AsnSequence, AsnSet)):
292
293
294
295
296
297
298
299
300
301
        for idx, child in enumerate(node._members):
            if isinstance(child[1], AsnMetaMember):
                containedType = child[1]._containedType
                if containedType in idToTypeDict:
                    containedType = idToTypeDict[containedType]
                if containedType in asnTypesDict:
                    node._members[idx] = (child[0], asnTypesDict[containedType])
                else:
                    panic("Could not resolve Field '%s' in type '%s' (contained: %s)..." %
                          (child[0], nodeTypename, containedType), node.Location())  # pragma: no cover
302
    elif isinstance(node, (AsnSequenceOf, AsnSetOf)):
303
304
305
306
307
308
309
310
311
312
313
        if isinstance(node._containedType, str):
            containedType = node._containedType
            if containedType in idToTypeDict:
                containedType = idToTypeDict[containedType]
            if containedType in asnTypesDict:
                node._containedType = asnTypesDict[containedType]
            else:
                panic("In type '%s', could not resolve: %s)" %
                      (nodeTypename, containedType), node.Location())  # pragma: no cover


314
315
def ConvertCatalogueToASN_AST(
        inputSmp2Files: List[str]) -> Tuple[Lookup, Dict[str, str]]:  # pylint: disable=invalid-sequence-index
316
317
    """Converts a list of input SMP2 Catalogues into an ASN.1 AST,
    which it returns to the caller."""
318
319
    asnTypesDict = DashUnderscoreAgnosticDict()
    idToTypeDict = {}
320
    allSMP2Types = {}  # type: Dict[str, str]
321
322
323
    # Do a first pass, verifying the primary assumption:
    # That 'Id' elements of types are unique across our set of SMP2 files.
    for inputSmp2File in inputSmp2Files:
324
        a = etree.parse(open(inputSmp2File))  # type: Any  # mypy bugs in ElementTree handling
325
326
        root = a.getroot()
        if len(root) < 1 or not root.tag.endswith('Catalogue'):
327
328
329
330
331
332
333
334
335
336
337
338
339
            panic('', "You must use an XML file that contains an SMP2 Catalogue")  # pragma: no cover
        for t in root.xpath("//Type"):
            a = Attributes(t)
            if not a.Id:  # Missing attribute Id, don't bother checking for duplicates
                continue
            if a.Id in allSMP2Types:
                catalogue = allSMP2Types[a.Id]  # pragma: no cover
                if catalogue != inputSmp2File:  # pragma: no cover
                    panic("The same Id exists in two files: %s exists in both: %s" %
                          (a.Id, str([catalogue, inputSmp2File])))  # pragma: no cover
            else:
                allSMP2Types[a.Id] = inputSmp2File
    for inputSmp2File in inputSmp2Files:
340
341
342
        a = etree.parse(open(inputSmp2File))
        root = a.getroot()
        if len(root) < 1 or not root.tag.endswith('Catalogue'):
343
344
345
346
347
348
349
350
351
352
            panic('', "You must use an XML file that contains an SMP2 Catalogue")  # pragma: no cover
        for t in root.xpath("//Type"):
            # Find the enclosing Namespace element
            for namespace in t.iterancestors(tag='Namespace'):
                break
            else:
                panic("No Namespace parent node found (file:%s, line:%d)" %
                      t.base, t.sourceline)  # pragma: no cover

            # Store the namespace 'Name' attribute, and use it to prefix our types
353
            nsName = namespace.get('Name')  # pylint: disable=undefined-loop-variable
354
355
            if not nsName:
                panic("Missing attribute Name from Namespace (file:%s, line:%d)" %
356
                      namespace.base, namespace.sourceline)  # pragma: no cover pylint: disable=undefined-loop-variable
357
358
359
360
361
362
363
364
365
366
367
368
369
370
            cataloguePrefix = Clean(nsName).capitalize() + "_"

            a = Attributes(t)
            a.base = t.base
            a.sourceline = t.sourceline

            if not a.type:
                # Check to see if this is one of the hardcoded types
                if a.href in simpleTypesTable:
                    k = a.href
                    v = simpleTypesTable[k]
                    nodeTypename = a.title
                    if nodeTypename is None:
                        panic("'xlink:href' points to ready-made SMP2 type, but 'xlink:title' is missing! (file:%s, line:%d)" %
371
                              (a.base, a.sourceline))  # pragma: no cover
372
373
374
375
376
377
378
                    nodeTypename = Clean(nodeTypename.split()[-1]).capitalize()  # Primitive Int32 -> Int32
                    cast, low, high = v
                    containedDict = {
                        'asnFilename': a.base,
                        'lineno': a.sourceline
                    }
                    span = [low, high] if (low is not None and high is not None) else []
379
                    if span:
380
                        containedDict['range'] = [low, high]
381
382
383
384
                    # Especially for these hardcoded types, don't prefix with namespace.Name
                    asnTypesDict[nodeTypename] = cast(**containedDict)
                else:
                    if a.href is not None and a.href.startswith("http://www.esa.int/"):
385
                        print("WARNING: Unknown hardcoded (%s) - should it be added in commonSMP2.py:simpleTypesTable?" % a.href)
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
                    # This <Type> element had no xsi:type, and it's xlink:title was not in the hardcoded list
                    # Skip it.
                    # panic("Both 'xsi:type' and 'Name' are mandatory attributes (file:%s, line:%d)" %
                    #       (a.base, a.sourceline))  # pragma: no cover
                    continue

                # The type was merged in the AST or skipped over - work on the next one
                continue

            if a.type.startswith('Catalogue:'):
                # We only wants Types, nothing more
                continue

            nodeTypename = a.Name
            nodeTypename = nodeTypename[0].upper() + nodeTypename[1:]
            nodeTypename = nodeTypename.replace('_', '-')

            # Gather children node's info:

            # 1. Enumeration data
406
            enumOptions = []  # type: List[Tuple[str, str]]
407
408
            if a.type == 'Types:Enumeration':
                for node in t.xpath("Literal"):
409
410
411
                    nn = node.get('Name').replace('_', '-').lower()  # type: str
                    vv = node.get('Value').replace('_', '-').lower()  # type: str
                    enumOptions.append((nn, vv))
412
413
414
415
416
417
418
419
420
421
422
423
424

            # 2. ItemType data (used in arrays)
            itemTypes = t.xpath("ItemType")

            # 3. Field data (used in structures)
            fields = t.xpath("Field")

            try:
                description = t.xpath("Description")[0].text
            except:  # pragma: no cover
                location = 'from %s, in line %s' % \
                    (t.base, t.sourceline)  # pragma: no cover
                panic("Missing Description child element", location)  # pragma: no cover
425
            info(2, "Creating type:", cataloguePrefix + nodeTypename)
426
427
428
429
430
431
432
433
            asnNode = MapSMP2Type(a, enumOptions, itemTypes, fields)
            if 'artificial' in description:
                asnNode._isArtificial = True
            asnTypesDict[cataloguePrefix + nodeTypename] = asnNode
            # Store mapping from Id to typename in idToTypeDict
            # (used below, in FixupOutOfOrderIdReferences)
            idToTypeDict[a.Id] = cataloguePrefix + nodeTypename

434
    for nodeTypename in list(asnTypesDict.keys()):
435
436
        FixupOutOfOrderIdReferences(nodeTypename, asnTypesDict, idToTypeDict)
    return asnTypesDict, idToTypeDict
Thanassis Tsiodras's avatar
Thanassis Tsiodras committed
437
438

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4