# commonSMP2.py
import re
import sys
3
4
5

from typing import Any  # NOQA pylint: disable=unused-import

6
7
8
9
10
11
12
13
14
15
16
17
18
19
from lxml import etree
from commonPy.asnAST import AsnBool, AsnInt, AsnReal, \
    AsnEnumerated, AsnOctetString, AsnSequenceOf, AsnSet, \
    AsnSetOf, AsnSequence, AsnChoice, AsnMetaMember

# Verbosity threshold: info() prints a message only when the message's
# level is less than or equal to this value.
g_verboseLevel = 0

# ANSI escape sequences. info() checks its arguments against 'colors'
# so that no separating space is written right after one of them.
ESC = "\x1b"
red, green, white, yellow = (
    "{}[{}m".format(ESC, code) for code in (31, 32, 0, 33)
)
colors = [red, green, white, yellow]
21
22
23
24
25
26
27
28
29
30


# Lookup table for SMP2 types that map to AsnBasicNodes
class MagicSmp2SimpleTypesDict(dict):
    '''A dict whose lookups ignore the '/YYYY/MM/' part of SMP2 type URIs.

    Before a key is looked up, e.g.

        'http://www.esa.int/2005/10/Smp#Bool'

    is reduced to

        'http://www.esa.int/Smp#Bool'

    Only __getitem__, __contains__ and has_key normalize their argument;
    keys are stored exactly as given, so the table itself must be filled
    with version-less URIs.
    '''

    # Matches the '/<4 digits>/<2 digits>/' segment that is dropped on lookup.
    _versionPattern = re.compile(r'/\d{4}/\d{2}/')

    @classmethod
    def _stripVersion(cls, name):
        '''Return 'name' without its '/YYYY/MM/' segment.

        Non-string keys (e.g. the None of a missing href) are returned
        unchanged, so that they simply miss instead of raising TypeError
        inside the regex engine.
        '''
        if not isinstance(name, str):
            return name
        return cls._versionPattern.sub('/', name)

    def __getitem__(self, name):
        return super(MagicSmp2SimpleTypesDict, self).__getitem__(
            self._stripVersion(name))

    # ---------------------------------------------------------------------------
    def __contains__(self, name):
        return super(MagicSmp2SimpleTypesDict, self).__contains__(
            self._stripVersion(name))

    # ---------------------------------------------------------------------------
    def has_key(self, name):
        '''Legacy spelling of "name in self".'''
        # 'x in super(...)' cannot work: the super proxy does not forward the
        # implicit __contains__ lookup done by the 'in' operator and raises
        # TypeError. Delegate to our own __contains__ instead.
        return self.__contains__(name)
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72


# Ready-made SMP2 primitives, keyed by their version-less href. Each value is
# (AST node class, lowest, highest); when a bound is None the users of this
# table attach no range to the node they create.
# NOTE(review): UInt64 is capped at the signed 64-bit maximum and the Float32
# bounds are 3.4E37 - presumably deliberate; confirm before changing.
simpleTypesTable = MagicSmp2SimpleTypesDict(
    ('http://www.esa.int/Smp#' + smpName, (asnClass, lowest, highest))
    for smpName, asnClass, lowest, highest in (
        ('Bool', AsnBool, None, None),
        ('Char8', AsnInt, 0, 255),
        ('DateTime', AsnOctetString, 30, 30),
        ('Duration', AsnInt, 0, 2147483647),
        ('Int8', AsnInt, -128, 127),
        ('Int16', AsnInt, -32768, 32767),
        ('Int32', AsnInt, -2147483648, 2147483647),
        ('Int64', AsnInt, -9223372036854775808, 9223372036854775807),
        ('UInt8', AsnInt, 0, 255),
        ('UInt16', AsnInt, 0, 65535),
        ('UInt32', AsnInt, 0, 4294967295),
        ('UInt64', AsnInt, 0, 9223372036854775807),
        ('Float32', AsnReal, -3.4E37, 3.4E37),
        ('Float64', AsnReal, -1.8E307, 1.8E307),
    )
)


def setVerbosity(level):
    '''Store 'level' in the module-wide g_verboseLevel, the threshold
    that info() compares message levels against.'''
    globals()['g_verboseLevel'] = level


def info(level, *args):
    '''Checks the 'level' argument against g_verboseLevel and then prints
    the rest of the args, one by one, separated by a space. It also
    has logic to deal with usage of one of the colors as arguments
    (in which case it avoids printing spurious spaces).

    Nothing is printed when 'level' is greater than g_verboseLevel.
    The output is terminated with a newline unless the last argument
    that is not a color already ends with one. All args must be strings.
    Calling it without any args is a fatal error (reported via panic).
    '''
    if not args:
        panic("You called info without args")  # pragma: no cover
    if level > g_verboseLevel:
        return

    previous = None
    for position, arg in enumerate(args):
        # No separator before the first argument, nor right after a color
        # escape sequence (it is invisible, so a space would look spurious).
        if position != 0 and previous not in colors:
            sys.stdout.write(' ')
        sys.stdout.write(arg)
        previous = arg

    # Only the last non-color argument decides whether a newline is needed.
    # (Continuing the scan to earlier arguments, as was done before, printed
    # an extra blank line whenever the message already ended with '\n'.)
    for arg in reversed(args):
        if arg in colors:
            continue
        if not arg.endswith('\n'):
            sys.stdout.write('\n')
        return


def panic(x, coloredBanner=""):
    '''Report a fatal problem and abort with exit status 1.

    'coloredBanner' is emitted in yellow, followed by the color reset,
    a line break and the message 'x'; everything goes through info()
    at level 0.
    '''
    banner = yellow + coloredBanner + white
    info(0, banner + '\n' + x)
    sys.exit(1)


class DashUnderscoreAgnosticDict(dict):
    '''A dict for which '_' and '-' in keys are interchangeable.

    Item assignment, item lookup and the 'in' operator rewrite the key
    to its dashed spelling first; every other dict method is inherited
    unchanged.
    '''

    @staticmethod
    def _dashed(key):
        '''Return 'key' with every '_' replaced by '-'.'''
        return key.replace('_', '-')

    def __setitem__(self, key, value):
        dict.__setitem__(self, self._dashed(key), value)

    def __getitem__(self, key):
        return dict.__getitem__(self, self._dashed(key))

    def __contains__(self, key):
        return dict.__contains__(self, self._dashed(key))


class Attributes:
    '''Helper class, to ease access to XML attributes.
    It allows us to write code like this...

            a = Attributes(lxmlEtreeNode)
            whatever = a.href
            print(a.title)

        ...instead of this:

            whatever = lxmlEtreeNode.get('href', None)
            print(lxmlEtreeNode.get('title', None))

    An attribute that the XML element does not carry reads as None.
    '''
    # Not filled in by __init__; callers assign them when they want to
    # remember which file and line the element came from.
    base = None  # type: str
    sourceline = None  # type: int

    def __init__(self, t):
        '''Argument t is an lxml Etree node.'''
        self._attrs = {}  # type: Dict[str, Any]
        for k, v in t.items():
            # A namespaced attribute name has the form '{namespace-uri}local';
            # only the part after the closing brace is kept.
            _, brace, localName = k.partition('}')
            self._attrs[localName if brace else k] = v

    def __getattr__(self, x):
        # Only called when normal attribute lookup fails. '_attrs' itself
        # and dunder names must fail the regular way: otherwise an instance
        # whose __init__ has not run (as happens inside copy/pickle)
        # recurses endlessly, and protocol probes such as __deepcopy__
        # would be answered with None.
        if x == '_attrs' or (x.startswith('__') and x.endswith('__')):
            raise AttributeError(x)
        return self._attrs.get(x, None)


def Clean(fieldName):
    '''Sanitize an SMP2 field or type name for the SMP2 -> ASN.1 mapping.

    Every character that is not an ASCII letter, a digit or '-' (so,
    among others, '_') is replaced with '-'.
    '''
    unwanted = re.compile(r'[^a-zA-Z0-9-]')
    return unwanted.sub('-', fieldName)


def MapSMP2Type(attrs, enumOptions, itemTypes, fields):
    '''
    Core mapping function. Works on the XML attributes of the lxml Etree node,
    and returns a node from commonPy.asnAST.

    attrs       -- Attributes of the <Type> element. 'type' selects the
                   mapping; 'base' and 'sourceline' become the asnFilename
                   and lineno of the created node and appear in messages.
    enumOptions -- members handed to AsnEnumerated ('Types:Enumeration').
    itemTypes   -- the ItemType child elements; only the first one is
                   used ('Types:Array').
    fields      -- the Field child elements ('Types:Structure').

    Every problem is reported through panic(), which aborts the program.
    '''
    location = 'from %s, in line %s' % (attrs.base, attrs.sourceline)
    info(2, "Mapping SMP2 type", location)

    def getMaybe(cast, x):
        '''Return cast(x), or None if x is missing or malformed.'''
        try:
            return cast(x)
        except (TypeError, ValueError):  # pragma: no cover
            return None  # pragma: no cover
    dataDict = {"asnFilename": attrs.base, "lineno": attrs.sourceline}

    def HandleTypesInteger():
        low = getMaybe(int, attrs.Minimum)
        high = getMaybe(int, attrs.Maximum)
        if low == 0 and high == 1:
            # Pseudo-boolean from TASTE mapping, as per SpaceBel instructions
            return AsnBool(**dataDict)
        # Normal integer; the range stays empty unless both bounds parsed.
        dataDict["range"] = [low, high] if low is not None and high is not None else []
        return AsnInt(**dataDict)

    def HandleTypesFloat():
        low = getMaybe(float, attrs.Minimum)
        high = getMaybe(float, attrs.Maximum)
        dataDict["range"] = [low, high] if low is not None and high is not None else []
        return AsnReal(**dataDict)

    def HandleTypesArray():
        if not itemTypes:
            panic("Missing mandatory ItemType element", location)  # pragma: no cover
        itemTypeNode = itemTypes[0]
        itemTypeAttrs = Attributes(itemTypeNode)
        arrSize = getMaybe(int, attrs.Size)
        if not arrSize:
            panic("Missing array 'Size' attribute", location)  # pragma: no cover
        # SMP2 arrays have a fixed size.
        dataDict["range"] = [arrSize, arrSize]
        # Arrays of byte-sized items become OCTET STRINGs.
        # NOTE(review): only the 2005/10 URIs are recognized here, whereas
        # simpleTypesTable accepts any version - confirm this is intended.
        if itemTypeAttrs.href in [
                'http://www.esa.int/2005/10/Smp#Char8',
                'http://www.esa.int/2005/10/Smp#Int8',
                'http://www.esa.int/2005/10/Smp#UInt8']:
            return AsnOctetString(**dataDict)

        containedHref = itemTypeAttrs.href
        if not containedHref:
            # The location comes from the lxml node itself: 'base' and
            # 'sourceline' are never set on itemTypeAttrs.
            panic("Missing reference to 'href' (file:%s, line:%s)" %
                  (itemTypeNode.base, itemTypeNode.sourceline))  # pragma: no cover
        if containedHref in simpleTypesTable:
            # Create the AsnBasicNode this child maps to.
            cast, low, high = simpleTypesTable[containedHref]
            childDict = {
                'asnFilename': itemTypeNode.base,
                'lineno': itemTypeNode.sourceline
            }
            if low is not None and high is not None:
                childDict['range'] = [low, high]
            dataDict['containedType'] = cast(**childDict)
        else:
            # Store the 'Id' (the part after '#', if any) - we will resolve
            # this in the FixupOutOfOrderIdReferences function.
            dataDict['containedType'] = containedHref.split('#', 1)[-1]
        return AsnSequenceOf(**dataDict)

    def HandleTypesStructure():
        members = []
        for field in fields:
            try:
                fieldName = field.get('Name')
                if fieldName == 'choiceIdx':
                    # Marker field: when it comes first, the structure is
                    # mapped to a CHOICE and this placeholder is dropped.
                    members.append((fieldName, 'dummy'))
                    continue
                fieldName = Clean(fieldName)
                fieldName = fieldName[0].lower() + fieldName[1:]
                typeNodes = field.xpath("Type")
                if not typeNodes:
                    # A separate name is used on purpose: assigning to
                    # 'location' here would make it local to this function
                    # and break the "Empty SEQUENCE" report further down.
                    fieldLocation = 'from %s, in line %s' % \
                        (field.base, field.sourceline)  # pragma: no cover
                    panic("Missing Type child element", fieldLocation)  # pragma: no cover
                refTypeAttrs = Attributes(typeNodes[0])
                # Keep only the part after '#' (the Id of the referenced type).
                refTypeHref = refTypeAttrs.href.split('#', 1)[-1]
                if refTypeAttrs.href in simpleTypesTable:
                    cast, low, high = simpleTypesTable[refTypeAttrs.href]
                    containedDict = {
                        'asnFilename': field.base,
                        'lineno': field.sourceline
                    }
                    if low is not None and high is not None:
                        containedDict['range'] = [low, high]
                    members.append((fieldName, cast(**containedDict)))
                else:
                    # Reference to another type; it is resolved later, in
                    # the FixupOutOfOrderIdReferences function.
                    members.append((fieldName, AsnMetaMember(
                        asnFilename=field.base,
                        lineno=field.sourceline,
                        containedType=refTypeHref)))
            except Exception as e:  # pragma: no cover
                panic(str(e) + '\nMake sure that:\n'
                      '1. The "Name" attribute exists\n'
                      '2. The "Type" child element, with attribute '
                      '"xlink:title" also exists.',
                      'In %s, line %s:' % (field.base, field.sourceline))  # pragma: no cover
        if not members:
            panic("Empty SEQUENCE is not supported", location)  # pragma: no cover
        if members[0][0] == 'choiceIdx':
            dataDict['members'] = members[1:]
            return AsnChoice(**dataDict)
        dataDict['members'] = members
        return AsnSequence(**dataDict)

    if attrs.type == 'Types:Integer':
        return HandleTypesInteger()
    elif attrs.type == 'Types:Float':
        return HandleTypesFloat()
    elif attrs.type == 'Types:Enumeration':
        dataDict["members"] = enumOptions
        return AsnEnumerated(**dataDict)
    elif attrs.type == 'Types:String':
        high = getMaybe(int, attrs.Length)
        dataDict["range"] = [high, high] if high is not None else []
        return AsnOctetString(**dataDict)
    elif attrs.type == 'Types:Array':
        return HandleTypesArray()
    elif attrs.type == 'Types:Structure':  # pylint: disable=too-many-nested-blocks
        return HandleTypesStructure()
    panic("Failed to map... (%s)" % attrs.type, location)  # pragma: no cover


def FixupOutOfOrderIdReferences(nodeTypename, asnTypesDict, idToTypeDict):
    '''Replace the by-name references left behind by MapSMP2Type() with
    the actual nodes of asnTypesDict.

    The node stored under 'nodeTypename' is patched in place:
    - CHOICE / SEQUENCE / SET: every member that is an AsnMetaMember is
      replaced with the node it refers to;
    - SEQUENCE OF / SET OF: a contained type that is still a string is
      replaced with the node it refers to.
    A reference is first translated through idToTypeDict (Id -> type name)
    when it appears there, and then looked up in asnTypesDict; failing
    that lookup is fatal (panic).'''
    node = asnTypesDict[nodeTypename]

    if isinstance(node, (AsnChoice, AsnSequence, AsnSet)):
        for position, member in enumerate(node._members):
            if not isinstance(member[1], AsnMetaMember):
                continue
            reference = member[1]._containedType
            reference = idToTypeDict.get(reference, reference)
            if containedTypeKnown(asnTypesDict, reference):
                node._members[position] = (member[0], asnTypesDict[reference])
            else:
                panic("Could not resolve Field '%s' in type '%s' (contained: %s)..." %
                      (member[0], nodeTypename, reference), node.Location())  # pragma: no cover
        return

    if not isinstance(node, (AsnSequenceOf, AsnSetOf)):
        return
    if not isinstance(node._containedType, str):
        return
    reference = node._containedType
    reference = idToTypeDict.get(reference, reference)
    if containedTypeKnown(asnTypesDict, reference):
        node._containedType = asnTypesDict[reference]
    else:
        panic("In type '%s', could not resolve: %s)" %
              (nodeTypename, reference), node.Location())  # pragma: no cover


def containedTypeKnown(asnTypesDict, reference):
    '''Tell whether 'reference' names a type stored in asnTypesDict.'''
    return reference in asnTypesDict


def _LoadCatalogueRoot(inputSmp2File):
    '''Parse one SMP2 file and return its root element; it is fatal
    (panic) if the root is empty or is not a Catalogue.'''
    # Binary mode lets the XML parser do the decoding itself; the 'with'
    # makes sure that the file handle is released.
    with open(inputSmp2File, 'rb') as smp2File:
        root = etree.parse(smp2File).getroot()
    if len(root) < 1 or not root.tag.endswith('Catalogue'):
        panic('', "You must use an XML file that contains an SMP2 Catalogue")  # pragma: no cover
    return root


def ConvertCatalogueToASN_AST(inputSmp2Files):
    '''Converts a list of input SMP2 Catalogues into an ASN.1 AST.

    Returns the tuple (asnTypesDict, idToTypeDict):
    - asnTypesDict maps type names to commonPy.asnAST nodes. Types that
      carry an 'xsi:type' are stored as <Namespace name>_<Type name>;
      the ready-made SMP2 primitives are stored without a prefix.
    - idToTypeDict maps the 'Id' of each such type to its name in
      asnTypesDict.

    Every problem found in the input is fatal (reported via panic).'''
    asnTypesDict = DashUnderscoreAgnosticDict()
    idToTypeDict = {}
    allSMP2Types = {}  # type: Dict[str, str]

    # Do a first pass, verifying the primary assumption:
    # That 'Id' elements of types are unique across our set of SMP2 files.
    # The parsed roots are kept, so that each file is read only once.
    roots = []
    for inputSmp2File in inputSmp2Files:
        root = _LoadCatalogueRoot(inputSmp2File)
        roots.append(root)
        for t in root.xpath("//Type"):
            typeId = Attributes(t).Id
            if not typeId:  # Missing attribute Id, don't bother checking for duplicates
                continue
            catalogue = allSMP2Types.setdefault(typeId, inputSmp2File)
            if catalogue != inputSmp2File:  # pragma: no cover
                panic("The same Id exists in two files: %s exists in both: %s" %
                      (typeId, str([catalogue, inputSmp2File])))  # pragma: no cover

    for root in roots:
        for t in root.xpath("//Type"):
            # Find the enclosing Namespace element
            namespace = next(t.iterancestors(tag='Namespace'), None)
            if namespace is None:
                panic("No Namespace parent node found (file:%s, line:%s)" %
                      (t.base, t.sourceline))  # pragma: no cover

            # Store the namespace 'Name' attribute, and use it to prefix our types
            nsName = namespace.get('Name')
            if not nsName:
                panic("Missing attribute Name from Namespace (file:%s, line:%s)" %
                      (namespace.base, namespace.sourceline))  # pragma: no cover
            cataloguePrefix = Clean(nsName).capitalize() + "_"

            a = Attributes(t)
            a.base = t.base
            a.sourceline = t.sourceline

            if not a.type:
                # Check to see if this is one of the hardcoded types
                if a.href is not None and a.href in simpleTypesTable:
                    nodeTypename = a.title
                    if nodeTypename is None:
                        panic("'xlink:href' points to ready-made SMP2 type, but 'xlink:title' is missing! (file:%s, line:%s)" %
                              (a.base, a.sourceline))  # pragma: no cover
                    nodeTypename = Clean(nodeTypename.split()[-1]).capitalize()  # Primitive Int32 -> Int32
                    cast, low, high = simpleTypesTable[a.href]
                    containedDict = {
                        'asnFilename': a.base,
                        'lineno': a.sourceline
                    }
                    if low is not None and high is not None:
                        containedDict['range'] = [low, high]
                    # Especially for these hardcoded types, don't prefix with namespace.Name
                    asnTypesDict[nodeTypename] = cast(**containedDict)
                elif a.href is not None and a.href.startswith("http://www.esa.int/"):
                    print("WARNING: Unknown hardcoded (%s) - should it be added in commonSMP2.py:simpleTypesTable?" % a.href)
                # This <Type> element had no xsi:type: it was either merged
                # in the AST above, or it is skipped - work on the next one.
                continue

            if a.type.startswith('Catalogue:'):
                # We only want Types, nothing more
                continue

            nodeTypename = a.Name
            if not nodeTypename:
                panic("Missing attribute Name from Type (file:%s, line:%s)" %
                      (a.base, a.sourceline))  # pragma: no cover
            nodeTypename = nodeTypename[0].upper() + nodeTypename[1:]
            nodeTypename = nodeTypename.replace('_', '-')

            # Gather children node's info:

            # 1. Enumeration data
            enumOptions = []
            if a.type == 'Types:Enumeration':
                for node in t.xpath("Literal"):
                    enumOptions.append(
                        [x.replace('_', '-').lower()
                         for x in [node.get('Name'), node.get('Value')]])

            # 2. ItemType data (used in arrays)
            itemTypes = t.xpath("ItemType")

            # 3. Field data (used in structures)
            fields = t.xpath("Field")

            descriptions = t.xpath("Description")
            if not descriptions:
                location = 'from %s, in line %s' % \
                    (t.base, t.sourceline)  # pragma: no cover
                panic("Missing Description child element", location)  # pragma: no cover
            # An empty <Description/> has no text at all (None).
            description = descriptions[0].text or ''
            info(2, "Creating type:", cataloguePrefix+nodeTypename)
            asnNode = MapSMP2Type(a, enumOptions, itemTypes, fields)
            if 'artificial' in description:
                asnNode._isArtificial = True
            asnTypesDict[cataloguePrefix + nodeTypename] = asnNode
            # Store mapping from Id to typename in idToTypeDict
            # (used below, in FixupOutOfOrderIdReferences)
            idToTypeDict[a.Id] = cataloguePrefix + nodeTypename

    # Types may refer to types that are defined later (or in another file),
    # so the references can only be resolved after everything is mapped.
    for nodeTypename in list(asnTypesDict.keys()):
        FixupOutOfOrderIdReferences(nodeTypename, asnTypesDict, idToTypeDict)
    return asnTypesDict, idToTypeDict