commonSMP2.py 17.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
import re
import sys
from lxml import etree
from commonPy.asnAST import AsnBool, AsnInt, AsnReal, \
    AsnEnumerated, AsnOctetString, AsnSequenceOf, AsnSet, \
    AsnSetOf, AsnSequence, AsnChoice, AsnMetaMember

# Verbosity threshold: info() only prints messages whose level is
# less than or equal to this value. Changed through setVerbosity().
g_verboseLevel = 0

# Terminal escape sequences, passed to 'info' to colour its output.
# 'white' is the "[0m" sequence, emitted after a coloured banner.
ESC = chr(27)
red = ESC + "[31m"
green = ESC + "[32m"
yellow = ESC + "[33m"
white = ESC + "[0m"
# info() checks its arguments against this list to decide where
# separating spaces and the final newline go.
colors = [red, green, white, yellow]


# Lookup table for SMP2 types that map to AsnBasicNodes
class MagicSmp2SimpleTypesDict(dict):
    '''A dict whose lookups ignore the '/YYYY/MM/' revision part of the key.

    A lookup with 'http://www.esa.int/2005/10/Smp#Bool' is performed as if
    the key was 'http://www.esa.int/Smp#Bool'. This applies to d[key],
    'key in d', d.has_key(key) and d.get(key).

    Keys are NOT rewritten when they are stored (neither the constructor
    nor item assignment is overridden), so entries must be stored under
    the revision-less form of the URL.
    '''

    # Matches the '/YYYY/MM/' revision segment, e.g. '/2005/10/'
    _revisionSegment = re.compile(r'/\d{4}/\d{2}/')

    @classmethod
    def _stripRevision(cls, name):
        '''Returns 'name' without its revision segment. Keys that are not
        strings (e.g. None, from a missing xlink:href) are returned as-is,
        so that they are simply reported as absent by the lookups.'''
        try:
            return cls._revisionSegment.sub('/', name)
        except TypeError:
            return name

    def __getitem__(self, name):
        # strip 'http://www.esa.int/XXXX/YY/Smp#Bool'
        # to    'http://www.esa.int/Smp#Bool'
        return super(MagicSmp2SimpleTypesDict, self).__getitem__(
            self._stripRevision(name))

    #---------------------------------------------------------------------------
    def __contains__(self, name):
        return super(MagicSmp2SimpleTypesDict, self).__contains__(
            self._stripRevision(name))

    #---------------------------------------------------------------------------
    def has_key(self, name):
        # dict.has_key only exists in Python 2; going through __contains__
        # gives the same answer on every Python version.
        return self.__contains__(name)

    #---------------------------------------------------------------------------
    def get(self, name, default=None):
        # Keep get() consistent with [] and 'in'.
        return super(MagicSmp2SimpleTypesDict, self).get(
            self._stripRevision(name), default)


# The ready-made SMP2 types. Each revision-less type URL maps to a tuple of
#     (AST node class, lower bound, upper bound)
# When both bounds are not None, the code in this file passes them as the
# 'range' of the node it creates from the entry.
simpleTypesTable = MagicSmp2SimpleTypesDict([
    # Boolean: no bounds
    ('http://www.esa.int/Smp#Bool', (AsnBool, None, None)),
    # Character and time related types
    ('http://www.esa.int/Smp#Char8', (AsnInt, 0, 255)),
    ('http://www.esa.int/Smp#DateTime', (AsnOctetString, 30, 30)),
    ('http://www.esa.int/Smp#Duration', (AsnInt, 0, 2147483647)),
    # Signed integers
    ('http://www.esa.int/Smp#Int8', (AsnInt, -128, 127)),
    ('http://www.esa.int/Smp#Int16', (AsnInt, -32768, 32767)),
    ('http://www.esa.int/Smp#Int32', (AsnInt, -2147483648, 2147483647)),
    ('http://www.esa.int/Smp#Int64', (AsnInt, -9223372036854775808, 9223372036854775807)),
    # Unsigned integers
    ('http://www.esa.int/Smp#UInt8', (AsnInt, 0, 255)),
    ('http://www.esa.int/Smp#UInt16', (AsnInt, 0, 65535)),
    ('http://www.esa.int/Smp#UInt32', (AsnInt, 0, 4294967295)),
    # NOTE(review): the UInt64 upper bound equals the Int64 one - presumably
    # deliberate, confirm before changing.
    ('http://www.esa.int/Smp#UInt64', (AsnInt, 0, 9223372036854775807)),
    # Floating point
    ('http://www.esa.int/Smp#Float32', (AsnReal, -3.4E37, 3.4E37)),
    ('http://www.esa.int/Smp#Float64', (AsnReal, -1.8E307, 1.8E307)),
])


def setVerbosity(level):
    '''Stores 'level' in the module-wide g_verboseLevel, which info()
    compares against the level of each message. '''
    globals()['g_verboseLevel'] = level


def info(level, *args):
    '''Checks the 'level' argument against g_verboseLevel and then prints
    the rest of the args, one by one, separated by a space. It also
    has logic to deal with usage of one of the colors as arguments
    (in which case it avoids printing spurious spaces).

    Nothing is printed when 'level' is greater than g_verboseLevel.
    All the args must be strings. A newline is appended if the last
    argument that is not a color does not already end with one.
    Calling it without any args is a fatal error (see panic).
    '''
    if not args:
        panic("You called info without args")  # pragma: no cover
    if level > g_verboseLevel:
        return
    write = sys.stdout.write
    for idx, arg in enumerate(args):
        # No separator before the first arg, or right after a color code
        if idx != 0 and args[idx-1] not in colors:
            write(' ')
        write(arg)
    # Only the last non-color argument decides whether a newline is
    # needed - earlier arguments must not be looked at, otherwise
    # info(0, "a", "b\n") would print an extra empty line.
    for arg in reversed(args):
        if arg in colors:
            continue
        if not arg.endswith('\n'):
            write('\n')
        return


def panic(x, coloredBanner=""):
    '''Reports a fatal error and terminates the program.

    The banner is printed first (wrapped in the yellow/white color
    codes), followed by the message 'x' on a new line. The report goes
    through info() at level 0, and the process then exits with status 1.
    '''
    report = ''.join([yellow, coloredBanner, white, '\n', x])
    info(0, report)
    sys.exit(1)


class DashUnderscoreAgnosticDict(dict):
    '''A dictionary that treats '_' and '-' in its keys as the same
    character: keys are stored, fetched and tested in their dashed form.

    Only item assignment, item lookup (d[key]) and the 'in' operator
    perform the replacement; the other dict methods are inherited as-is.
    '''
    @staticmethod
    def _dashed(key):
        '''Returns the key with every underscore turned into a dash.'''
        return key.replace('_', '-')

    def __setitem__(self, key, value):
        stored = self._dashed(key)
        super(DashUnderscoreAgnosticDict, self).__setitem__(stored, value)

    def __getitem__(self, key):
        wanted = self._dashed(key)
        return super(DashUnderscoreAgnosticDict, self).__getitem__(wanted)

    def __contains__(self, key):
        wanted = self._dashed(key)
        return super(DashUnderscoreAgnosticDict, self).__contains__(wanted)


class Attributes:
    '''Helper class, to ease access to XML attributes.
    It allows us to write code like this...

            a = Attributes(lxmlEtreeNode)
            whatever = a.href
            print(a.title)

        ...instead of this:

            whatever = lxmlEtreeNode.get('href', None)
            print(lxmlEtreeNode.get('title', None))

    Attribute names are stored without their '{namespace}' prefix, so
    '{http://www.w3.org/1999/xlink}href' is reached as 'a.href'.
    Reading an attribute that the XML node does not have yields None.
    '''
    def __init__(self, t):
        '''Argument t is an lxml Etree node.'''
        self._attrs = {}
        for k, v in t.items():
            # Keep only what follows the first '}' (if there is one)
            self._attrs[k.split('}', 1)[-1]] = v

    def __getattr__(self, x):
        '''Called only when normal attribute lookup fails; returns the
        XML attribute named x, or None if the node did not have it.'''
        # '_attrs' reaches this point only when __init__ has not run yet;
        # looking it up again below would recurse without end. Special
        # (dunder) names are refused as well, so that the instance does not
        # pretend to support every protocol by answering None.
        if x == '_attrs' or (x.startswith('__') and x.endswith('__')):
            raise AttributeError(x)
        return self._attrs.get(x, None)


def Clean(fieldName):
    '''Used when mapping field names and type names from SMP2 to ASN.1:
    every character that is not an ASCII letter, a digit or a dash
    (so '_', but also spaces, dots, etc) is replaced with '-'. '''
    notAllowed = re.compile(r'[^a-zA-Z0-9-]')
    return notAllowed.sub('-', fieldName)


def MapSMP2Type(attrs, enumOptions, itemTypes, fields):
    '''
    Core mapping function. Works on the XML attributes of the lxml Etree node,
    and returns a node from commonPy.asnAST.

    Arguments:
        attrs        the Attributes of the <Type> element; the caller must
                     also have set 'base' and 'sourceline' on it.
        enumOptions  the members of an enumeration (used for Types:Enumeration)
        itemTypes    the <ItemType> children (used for Types:Array)
        fields       the <Field> children (used for Types:Structure)

    The mapping, based on attrs.type:
        Types:Integer      AsnInt (AsnBool if Minimum is 0 and Maximum is 1)
        Types:Float        AsnReal
        Types:Enumeration  AsnEnumerated
        Types:String       AsnOctetString
        Types:Array        AsnOctetString for the 2005/10 Char8, Int8 and
                           UInt8 item types, AsnSequenceOf otherwise
        Types:Structure    AsnChoice if the first field is named 'choiceIdx',
                           AsnSequence otherwise

    References to types that are not in simpleTypesTable are stored by their
    'Id' and resolved later on, in FixupOutOfOrderIdReferences.
    Anything that cannot be mapped is a fatal error (see panic).
    '''
    location = 'from %s, in line %s' % (attrs.base, attrs.sourceline)
    info(2, "Mapping SMP2 type", location)

    def getMaybe(cast, x):
        # A missing attribute arrives as None (TypeError), a malformed one
        # as a string that cannot be converted (ValueError).
        try:
            return cast(x)
        except (TypeError, ValueError):  # pragma: no cover
            return None  # pragma: no cover

    def fragmentOf(href):
        # 'whatever#Id' -> 'Id'  (returned untouched if there is no '#')
        idxHash = href.find('#')
        if -1 != idxHash:
            return href[idxHash+1:]
        return href

    def makeSimpleNode(href, asnFilename, lineno):
        # Create the AsnBasicNode that a ready-made SMP2 type maps to.
        cast, low, high = simpleTypesTable[href]
        nodeArgs = {
            'asnFilename': asnFilename,
            'lineno': lineno
        }
        if low is not None and high is not None:
            nodeArgs['range'] = [low, high]
        return cast(**nodeArgs)

    dataDict = {"asnFilename": attrs.base, "lineno": attrs.sourceline}
    if attrs.type == 'Types:Integer':
        low = getMaybe(int, attrs.Minimum)
        high = getMaybe(int, attrs.Maximum)
        if low==0 and high==1:
            # Pseudo-boolean from TASTE mapping, as per SpaceBel instructions
            return AsnBool(**dataDict)
        else:
            # Normal integer
            span = [low, high] if low is not None and high is not None else []
            dataDict["range"] = span
            return AsnInt(**dataDict)
    elif attrs.type == 'Types:Float':
        low = getMaybe(float, attrs.Minimum)
        high = getMaybe(float, attrs.Maximum)
        span = [low, high] if low is not None and high is not None else []
        dataDict["range"] = span
        return AsnReal(**dataDict)
    elif attrs.type == 'Types:Enumeration':
        dataDict["members"] = enumOptions
        return AsnEnumerated(**dataDict)
    elif attrs.type == 'Types:String':
        high = getMaybe(int, attrs.Length)
        span = [high, high] if high is not None else []
        dataDict["range"] = span
        return AsnOctetString(**dataDict)
    elif attrs.type == 'Types:Array':
        if not itemTypes:
            panic("Missing mandatory ItemType element", location)  # pragma: no cover
        itemType = itemTypes[0]
        itemTypeAttrs = Attributes(itemType)
        arrSize = getMaybe(int, attrs.Size)
        if not arrSize:
            panic("Missing array 'Size' attribute", location)  # pragma: no cover
        dataDict["range"] = [arrSize, arrSize]
        # NOTE(review): unlike the simpleTypesTable lookups, this check is
        # tied to the 2005/10 revision of the URLs - confirm whether other
        # revisions should map to an octet string as well.
        if itemTypeAttrs.href in [
                'http://www.esa.int/2005/10/Smp#Char8',
                'http://www.esa.int/2005/10/Smp#Int8',
                'http://www.esa.int/2005/10/Smp#UInt8']:
            return AsnOctetString(**dataDict)
        else:
            containedHref = itemTypeAttrs.href
            if not containedHref:
                # The file and line come from the lxml node itself - the
                # Attributes wrapper only knows about XML attributes.
                panic("Missing reference to 'href' (file:%s, line:%d)" %
                      (itemType.base, itemType.sourceline))  # pragma: no cover
            if containedHref in simpleTypesTable:
                dataDict['containedType'] = makeSimpleNode(
                    containedHref, itemType.base, itemType.sourceline)
            else:
                # Store the 'Id' attribute - we will resolve this
                # in the FixupOutOfOrderIdReferences function.
                dataDict['containedType'] = fragmentOf(containedHref)
            return AsnSequenceOf(**dataDict)
    elif attrs.type == 'Types:Structure':
        members = []
        for field in fields:
            try:
                fieldName = field.get('Name')
                if fieldName == 'choiceIdx':
                    # Marker field; only its position matters (see below)
                    members.append((fieldName, 'dummy'))
                    continue
                fieldName = Clean(fieldName)
                fieldName = fieldName[0].lower() + fieldName[1:]
                try:
                    refTypeAttrs = Attributes(field.xpath("Type")[0])
                except IndexError:  # pragma: no cover
                    fieldLocation = 'from %s, in line %s' % \
                        (field.base, field.sourceline)  # pragma: no cover
                    panic("Missing Type child element", fieldLocation)  # pragma: no cover
                # Fails (and is reported below) if the href is missing
                refTypeHref = fragmentOf(refTypeAttrs.href)
                if refTypeAttrs.href in simpleTypesTable:
                    basicNode = makeSimpleNode(
                        refTypeAttrs.href, field.base, field.sourceline)
                    members.append((fieldName, basicNode))
                else:
                    members.append((fieldName, AsnMetaMember(
                        asnFilename=field.base,
                        lineno=field.sourceline,
                        containedType=refTypeHref)))
            except Exception as e:  # pragma: no cover
                panic(str(e) + '\nMake sure that:\n'
                      '1. The "Name" attribute exists\n'
                      '2. The "Type" child element, with attribute '
                      '"xlink:title" also exists.',
                      'In %s, line %d:' % (field.base, field.sourceline))  # pragma: no cover
        if not members:
            panic("Empty SEQUENCE is not supported", location)  # pragma: no cover
        if members[0][0] == 'choiceIdx':
            # A leading 'choiceIdx' field marks a CHOICE; drop the marker
            dataDict['members'] = members[1:]
            return AsnChoice(**dataDict)
        else:
            dataDict['members'] = members
            return AsnSequence(**dataDict)
    panic("Failed to map... (%s)" % attrs.type, location)  # pragma: no cover


def FixupOutOfOrderIdReferences(nodeTypename, asnTypesDict, idToTypeDict):
    '''Replaces the references that MapSMP2Type() left unresolved inside
    the type named 'nodeTypename' with the actual nodes of asnTypesDict.

    A reference is first translated through idToTypeDict (the unique 'Id'
    used in 'xlink:href' -> typename) when it appears there, and is then
    looked up in asnTypesDict. For CHOICE/SEQUENCE/SET nodes, the members
    that are AsnMetaMembers are replaced in place; for SEQUENCE OF/SET OF
    nodes, a contained type that is still a string is replaced.
    A reference that cannot be found is a fatal error (see panic).'''
    node = asnTypesDict[nodeTypename]

    def toTypename(reference):
        # Ids we know about become typenames, anything else is kept as-is
        if reference in idToTypeDict:
            return idToTypeDict[reference]
        return reference

    if isinstance(node, (AsnChoice, AsnSequence, AsnSet)):
        for position, member in enumerate(node._members):
            memberName, memberType = member[0], member[1]
            if not isinstance(memberType, AsnMetaMember):
                continue
            target = toTypename(memberType._containedType)
            if target in asnTypesDict:
                node._members[position] = (memberName, asnTypesDict[target])
            else:
                panic("Could not resolve Field '%s' in type '%s' (contained: %s)..." %
                      (memberName, nodeTypename, target), node.Location())  # pragma: no cover
    elif isinstance(node, (AsnSequenceOf, AsnSetOf)):
        if not isinstance(node._containedType, str):
            return
        target = toTypename(node._containedType)
        if target in asnTypesDict:
            node._containedType = asnTypesDict[target]
        else:
            panic("In type '%s', could not resolve: %s)" %
                  (nodeTypename, target), node.Location())  # pragma: no cover


def _parseCatalogueRoot(inputSmp2File):
    '''Parses one SMP2 file and returns its root element. It is a fatal
    error if the root is empty or is not a Catalogue.'''
    # The file is closed as soon as it has been parsed.
    with open(inputSmp2File) as smp2Stream:
        root = etree.parse(smp2Stream).getroot()
    if len(root)<1 or not root.tag.endswith('Catalogue'):
        panic('', "You must use an XML file that contains an SMP2 Catalogue")  # pragma: no cover
    return root


def ConvertCatalogueToASN_AST(inputSmp2Files):
    '''Converts a list of input SMP2 Catalogues into an ASN.1 AST.

    Returns a tuple (asnTypesDict, idToTypeDict):
        asnTypesDict  a DashUnderscoreAgnosticDict, from typename to the
                      commonPy.asnAST node created for it. Typenames are
                      prefixed with the (cleaned, capitalized) name of the
                      enclosing Namespace followed by '_', except for the
                      ready-made SMP2 types of simpleTypesTable.
        idToTypeDict  a dict from the 'Id' attribute of each mapped type
                      to its typename in asnTypesDict.

    Malformed input is a fatal error (see panic).'''
    asnTypesDict = DashUnderscoreAgnosticDict()
    idToTypeDict = {}
    allSMP2Types = {}
    catalogueRoots = []
    # Do a first pass, verifying the primary assumption:
    # That 'Id' elements of types are unique across our set of SMP2 files.
    for inputSmp2File in inputSmp2Files:
        root = _parseCatalogueRoot(inputSmp2File)
        # Keep the parsed tree, so the second pass does not parse it again
        catalogueRoots.append(root)
        for t in root.xpath("//Type"):
            typeId = Attributes(t).Id
            if not typeId:  # Missing attribute Id, don't bother checking for duplicates
                continue
            if typeId in allSMP2Types:
                catalogue = allSMP2Types[typeId]  # pragma: no cover
                if catalogue != inputSmp2File:  # pragma: no cover
                    panic("The same Id exists in two files: %s exists in both: %s" %
                          (typeId, str([catalogue, inputSmp2File])))  # pragma: no cover
            else:
                allSMP2Types[typeId] = inputSmp2File
    # Second pass: create the AST nodes.
    for root in catalogueRoots:
        for t in root.xpath("//Type"):
            # Find the enclosing Namespace element
            namespace = next(t.iterancestors(tag='Namespace'), None)
            if namespace is None:
                panic("No Namespace parent node found (file:%s, line:%d)" %
                      (t.base, t.sourceline))  # pragma: no cover

            # Store the namespace 'Name' attribute, and use it to prefix our types
            nsName = namespace.get('Name')
            if not nsName:
                panic("Missing attribute Name from Namespace (file:%s, line:%d)" %
                      (namespace.base, namespace.sourceline))  # pragma: no cover
            cataloguePrefix = Clean(nsName).capitalize() + "_"

            a = Attributes(t)
            a.base = t.base
            a.sourceline = t.sourceline

            if not a.type:
                # Check to see if this is one of the hardcoded types
                # (a <Type> may have no xlink:href at all, hence the None check)
                if a.href is not None and a.href in simpleTypesTable:
                    nodeTypename = a.title
                    if nodeTypename is None:
                        panic("'xlink:href' points to ready-made SMP2 type, but 'xlink:title' is missing! (file:%s, line:%d)" %
                              (a.base, a.sourceline))  # pragma: no cover
                    nodeTypename = Clean(nodeTypename.split()[-1]).capitalize()  # Primitive Int32 -> Int32
                    cast, low, high = simpleTypesTable[a.href]
                    containedDict = {
                        'asnFilename': a.base,
                        'lineno': a.sourceline
                    }
                    if low is not None and high is not None:
                        containedDict['range'] = [low, high]
                    # Especially for these hardcoded types, don't prefix with namespace.Name
                    asnTypesDict[nodeTypename] = cast(**containedDict)
                elif a.href is not None and a.href.startswith("http://www.esa.int/"):
                    print("WARNING: Unknown hardcoded (%s) - should it be added in commonSMP2.py:simpleTypesTable?" % a.href)
                # The type was merged in the AST, or it had no xsi:type and
                # was not in the hardcoded list - work on the next one
                continue

            if a.type.startswith('Catalogue:'):
                # We only want Types, nothing more
                continue

            nodeTypename = a.Name
            if not nodeTypename:
                panic("Missing attribute Name from Type (file:%s, line:%d)" %
                      (a.base, a.sourceline))  # pragma: no cover
            nodeTypename = nodeTypename[0].upper() + nodeTypename[1:]
            nodeTypename = nodeTypename.replace('_', '-')

            # Gather children node's info:

            # 1. Enumeration data
            enumOptions = []
            if a.type == 'Types:Enumeration':
                for node in t.xpath("Literal"):
                    enumOptions.append(
                        [node.get(attrName).replace('_', '-').lower()
                         for attrName in ('Name', 'Value')])

            # 2. ItemType data (used in arrays)
            itemTypes = t.xpath("ItemType")

            # 3. Field data (used in structures)
            fields = t.xpath("Field")

            descriptions = t.xpath("Description")
            if not descriptions:
                location = 'from %s, in line %s' % \
                    (t.base, t.sourceline)  # pragma: no cover
                panic("Missing Description child element", location)  # pragma: no cover
            description = descriptions[0].text
            info(2, "Creating type:", cataloguePrefix+nodeTypename)
            asnNode = MapSMP2Type(a, enumOptions, itemTypes, fields)
            # The text of an empty <Description/> is None
            if description and 'artificial' in description:
                asnNode._isArtificial = True
            asnTypesDict[cataloguePrefix + nodeTypename] = asnNode
            # Store mapping from Id to typename in idToTypeDict
            # (used below, in FixupOutOfOrderIdReferences)
            idToTypeDict[a.Id] = cataloguePrefix + nodeTypename

    # Now that all the types are known, resolve the references between them
    for nodeTypename in list(asnTypesDict):
        FixupOutOfOrderIdReferences(nodeTypename, asnTypesDict, idToTypeDict)
    return asnTypesDict, idToTypeDict