Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Ardupilot
GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/autotest/logger_metadata/parse.py
9685 views
1
#!/usr/bin/env python3
2
3
'''
4
AP_FLAKE8_CLEAN
5
'''
6
7
import argparse
8
import copy
9
import os
10
import re
11
import sys
12
13
import emit_html
14
import emit_rst
15
import emit_xml
16
import emit_md
17
18
import enum_parse
19
from enum_parse import EnumDocco
20
21
topdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../../../')
22
topdir = os.path.realpath(topdir)
23
24
# Regular expressions for finding message information in code comments
25
re_loggermessage = re.compile(r"@LoggerMessage\s*:\s*([\w,]+)", re.MULTILINE)
26
re_commentline = re.compile(r"\s*//")
27
re_description = re.compile(r"\s*//\s*@Description\s*:\s*(.*)")
28
re_url = re.compile(r"\s*//\s*@URL\s*:\s*(.*)")
29
re_field = re.compile(r"\s*//\s*@Field\s*:\s*(\w+):\s*(.*)")
30
re_fieldbits = re.compile(r"\s*//\s*@FieldBits\s*:\s*(\w+):\s*(.*)")
31
re_fieldbitmaskenum = re.compile(r"\s*//\s*@FieldBitmaskEnum\s*:\s*(\w+):\s*(.*)")
32
re_fieldvalueenum = re.compile(r"\s*//\s*@FieldValueEnum\s*:\s*(\w+):\s*(.*)")
33
re_vehicles = re.compile(r"\s*//\s*@Vehicles\s*:\s*(.*)")
34
35
# Regular expressions for finding message definitions in structure format
36
re_start_messagedef = re.compile(r"^\s*{?\s*LOG_[A-Z0-9_]+_[MSGTA]+[A-Z0-9_]*\s*,")
37
re_deffield = r'[\s\\]*"?([\w\-#?%]+)"?\s*'
38
re_full_messagedef = re.compile(r'\s*LOG_\w+\s*,\s*\w+\([^)]+\)[\s\\]*,' +
39
f'{re_deffield},{re_deffield},' +
40
r'[\s\\]*"?([\w,]+)"?[\s\\]*,' +
41
f'{re_deffield},{re_deffield}',
42
re.MULTILINE)
43
re_names_define = re.compile(r'#define\s+(\w+_LABELS)\s+"([\w,]+)"')
44
re_fmt_define = re.compile(r'#define\s+(\w+_FMT)\s+"([\w\-#?%]+)"')
45
re_units_define = re.compile(r'#define\s+(\w+_UNITS)\s+"([\w\-#?%]+)"')
46
re_mults_define = re.compile(r'#define\s+(\w+_MULTS)\s+"([\w\-#?%]+)"')
47
48
# Regular expressions for finding message definitions in Write calls
49
re_start_writecall = re.compile(r"\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\(")
50
re_writefield = r'\s*"([\w\-#?%,]+)"\s*'
51
re_full_writecall = re.compile(r'\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\(' +
52
f'{re_writefield},{re_writefield},{re_writefield},({re_writefield},{re_writefield})?',
53
re.MULTILINE)
54
55
# Regular expression for extracting unit and multipliers from structure
56
re_units_mults_struct = re.compile(r"^\s*{\s*'([\w\-#?%!/])',"+r'\s*"?([\w\-#?%./]*)"?\s*}')
57
58
# TODO: validate URLS actually return 200
59
60
# Lookup tables are populated by reading LogStructure.h
61
log_fmt_lookup = {}
62
log_units_lookup = {}
63
log_mult_lookup = {}
64
65
# Lookup table to convert multiplier to prefix
66
mult_prefix_lookup = {
67
0: "",
68
1: "",
69
1e-1: "d", # deci-
70
1e-2: "c", # centi-
71
1e-3: "m", # milli-
72
1e-6: "μ", # micro-
73
1e-9: "n" # nano-
74
}
75
76
77
class LoggerDocco(object):
78
79
vehicle_map = {
80
"Rover": "Rover",
81
"Sub": "ArduSub",
82
"Copter": "ArduCopter",
83
"Plane": "ArduPlane",
84
"Tracker": "AntennaTracker",
85
"Blimp": "Blimp",
86
}
87
88
def __init__(self, vehicle):
89
self.vehicle = vehicle
90
self.doccos = []
91
self.emitters = [
92
emit_html.HTMLEmitter(),
93
emit_rst.RSTEmitter(),
94
emit_xml.XMLEmitter(),
95
emit_md.MDEmitter(),
96
]
97
self.msg_fmts_list = {}
98
self.msg_names_list = {}
99
self.msg_units_list = {}
100
self.msg_mults_list = {}
101
102
class Docco(object):
103
104
def __init__(self, name):
105
self.name = name
106
self.url = None
107
if isinstance(name, list):
108
self.description = [None] * len(name)
109
else:
110
self.description = None
111
self.fields = {}
112
self.fields_order = []
113
self.vehicles = None
114
self.bits_enums = []
115
116
def add_name(self, name):
117
# If self.name/description aren't lists, convert them
118
if isinstance(self.name, str):
119
self.name = [self.name]
120
self.description = [self.description]
121
# Replace any existing empty descriptions with empty strings
122
for i in range(0, len(self.description)):
123
if self.description[i] is None:
124
self.description[i] = ""
125
# Extend the name and description lists
126
if isinstance(name, list):
127
self.name.extend(name)
128
self.description.extend([None] * len(name))
129
else:
130
self.name.append(name)
131
self.description.append(None)
132
133
def set_description(self, desc):
134
if isinstance(self.description, list):
135
for i in range(0, len(self.description)):
136
if self.description[i] is None:
137
self.description[i] = desc
138
else:
139
self.description = desc
140
141
def set_url(self, url):
142
self.url = url
143
144
def ensure_field(self, field):
145
if field not in self.fields:
146
self.fields[field] = {}
147
self.fields_order.append(field)
148
149
def set_field_description(self, field, description):
150
if field in self.fields:
151
raise ValueError("Already have field %s in %s" %
152
(field, self.name))
153
self.ensure_field(field)
154
self.fields[field]["description"] = description
155
156
def get_field_description(self, field):
157
if field not in self.fields:
158
return None
159
return self.fields[field].get('description', None)
160
161
def set_field_bits(self, field, bits):
162
bits = bits.split(",")
163
count = 0
164
entries = []
165
for bit in bits:
166
entries.append(EnumDocco.EnumEntry(bit, 1 << count, None))
167
count += 1
168
bitmask_name = self.name + field
169
self.bits_enums.append(EnumDocco.Enumeration(bitmask_name, entries))
170
self.ensure_field(field)
171
self.fields[field]["bitmaskenum"] = bitmask_name
172
173
def set_fieldbitmaskenum(self, field, bits):
174
self.ensure_field(field)
175
self.fields[field]["bitmaskenum"] = bits
176
177
def set_fieldvalueenum(self, field, bits):
178
self.ensure_field(field)
179
self.fields[field]["valueenum"] = bits
180
181
def set_vehicles(self, vehicles):
182
self.vehicles = vehicles
183
184
def set_field_names(self, fields):
185
''' Check that the field ordering matches the defined fields '''
186
fields = fields.split(",")
187
# First check that the number of fields match
188
if len(fields) != len(self.fields_order):
189
print(f"Error: Mismatch in number of fields in message {self.name}: ", file=sys.stderr, end='')
190
print(f"{len(self.fields_order)} vs {len(fields)}", file=sys.stderr)
191
sys.exit(1)
192
# Now check that each field name matches
193
err = False
194
for idx in range(0, len(fields)):
195
if fields[idx] != self.fields_order[idx]:
196
print(f"Error: Field order mismatch in log message {self.name}: ", file=sys.stderr, end='')
197
print(f"field={idx+1}: {fields[idx]} vs {self.fields_order[idx]}", file=sys.stderr)
198
err = True
199
# Exit if we had any name mismatch errors
200
if err:
201
sys.exit(1)
202
203
def set_fmts(self, fmts):
204
# If no fields are defined, do nothing
205
if len(self.fields_order) == 0:
206
return
207
# Make sure lengths match up
208
if len(fmts) != len(self.fields_order):
209
print(f"Number of fmts don't match fields: msg={self.name} fmts={fmts} num_fields={len(self.fields_order)} {self.fields_order}") # noqa:E501
210
return
211
# Loop through the list
212
for idx in range(0, len(fmts)):
213
if fmts[idx] in log_fmt_lookup:
214
self.fields[self.fields_order[idx]]["fmt"] = log_fmt_lookup[fmts[idx]]
215
else:
216
print(f"Unrecognised format character: {fmts[idx]} in message {self.name}")
217
218
def set_units(self, units, mults):
219
# If no fields are defined, do nothing
220
if len(self.fields_order) == 0:
221
return
222
# Make sure lengths match up
223
if len(units) != len(self.fields_order) or len(units) != len(mults):
224
print(f"Number of units/mults/fields don't match: msg={self.name} units={units} mults={mults} num_fields={len(self.fields_order)}") # noqa:E501
225
return
226
# Loop through the list
227
for idx in range(0, len(units)):
228
# Get the index into fields from field_order
229
f = self.fields_order[idx]
230
# Convert unit char to base unit
231
if units[idx] in log_units_lookup:
232
baseunit = log_units_lookup[units[idx]]
233
else:
234
print(f"Unrecognised units character: {units[idx]} in message {self.name}")
235
continue
236
# Do nothing if this field has no unit defined
237
if baseunit == "":
238
continue
239
# Convert mult char to value
240
if mults[idx] in log_mult_lookup:
241
mult = log_mult_lookup[mults[idx]]
242
mult_num = float(mult)
243
else:
244
print(f"Unrecognised multiplier character: {mults[idx]} in message {self.name}")
245
continue
246
# Check if the defined format for this field contains its own multiplier
247
# If so, the presented value will be the base-unit directly
248
if 'fmt' in self.fields[f] and self.fields[f]['fmt'].endswith("* 100"):
249
self.fields[f]["units"] = baseunit
250
elif 'fmt' in self.fields[f] and "latitude/longitude" in self.fields[f]['fmt']:
251
self.fields[f]["units"] = baseunit
252
# Check if we have a defined prefix for this multiplier
253
elif mult_num in mult_prefix_lookup:
254
self.fields[f]["units"] = f"{mult_prefix_lookup[mult_num]}{baseunit}"
255
# If all else fails, set the unit as the multiplier and base unit together
256
else:
257
self.fields[f]["units"] = f"{mult} {baseunit}"
258
259
def populate_lookups(self):
260
# Initialise the lookup tables
261
# Read the contents of the LogStructure.h file
262
structfile = os.path.join(topdir, "libraries", "AP_Logger", "LogStructure.h")
263
with open(structfile) as f:
264
lines = f.readlines()
265
f.close()
266
# Initialise current section to none
267
section = "none"
268
# Loop through the lines in the file
269
for line in lines:
270
# Look for the start of fmt/unit/mult info
271
if line.startswith("Format characters"):
272
section = "fmt"
273
elif line.startswith("const struct UnitStructure"):
274
section = "units"
275
elif line.startswith("const struct MultiplierStructure"):
276
section = "mult"
277
# Read formats from code comment, e.g.:
278
# b : int8_t
279
elif section == "fmt":
280
if "*/" in line:
281
section = "none"
282
else:
283
parts = line.split(":")
284
log_fmt_lookup[parts[0].strip()] = parts[1].strip()
285
# Read units or multipliers from C struct definition, e.g.:
286
# { '2', 1e2 }, or { 'J', "W.s" },
287
elif section != "none":
288
if "};" in line:
289
section = "none"
290
else:
291
u = re_units_mults_struct.search(line)
292
if u is not None and section == "units":
293
log_units_lookup[u.group(1)] = u.group(2)
294
if u is not None and section == "mult":
295
log_mult_lookup[u.group(1)] = u.group(2)
296
297
def search_for_files(self, dirs_to_search):
298
_next = []
299
for _dir in dirs_to_search:
300
_dir = os.path.join(topdir, _dir)
301
for entry in os.listdir(_dir):
302
filepath = os.path.join(_dir, entry)
303
if os.path.isdir(filepath):
304
_next.append(filepath)
305
continue
306
(name, extension) = os.path.splitext(filepath)
307
if extension not in [".cpp", ".h"]:
308
continue
309
self.files.append(filepath)
310
if len(_next):
311
self.search_for_files(_next)
312
313
def parse_messagedef(self, messagedef):
314
# Merge concatenated strings and remove comments
315
messagedef = re.sub(r'"\s+"', '', messagedef)
316
messagedef = re.sub(r'//[^\n]*', '', messagedef)
317
# Extract details from a structure definition
318
d = re_full_messagedef.search(messagedef)
319
if d is not None:
320
self.msg_fmts_list[d.group(1)] = d.group(2)
321
self.msg_names_list[d.group(1)] = d.group(3)
322
self.msg_units_list[d.group(1)] = d.group(4)
323
self.msg_mults_list[d.group(1)] = d.group(5)
324
return
325
# Extract details from a WriteStreaming call
326
d = re_full_writecall.search(messagedef)
327
if d is not None:
328
if d.group(1) in self.msg_fmts_list:
329
return
330
if d.group(5) is None:
331
self.msg_names_list[d.group(1)] = d.group(2)
332
self.msg_fmts_list[d.group(1)] = d.group(3)
333
else:
334
self.msg_names_list[d.group(1)] = d.group(2)
335
self.msg_fmts_list[d.group(1)] = d.group(6)
336
self.msg_units_list[d.group(1)] = d.group(3)
337
self.msg_mults_list[d.group(1)] = d.group(5)
338
return
339
# Didn't parse
340
# print(f"Unable to parse: {messagedef}")
341
342
def search_messagedef_start(self, line, prevmessagedef=""):
343
# Look for the start of a structure definition
344
d = re_start_messagedef.search(line)
345
if d is not None:
346
messagedef = line
347
if "}" in line:
348
self.parse_messagedef(messagedef)
349
return ""
350
else:
351
return messagedef
352
# Look for a new call to WriteStreaming
353
d = re_start_writecall.search(line)
354
if d is not None:
355
messagedef = line
356
if ";" in line:
357
self.parse_messagedef(messagedef)
358
return ""
359
else:
360
return messagedef
361
# If we didn't find a new one, continue with any previous state
362
return prevmessagedef
363
364
def parse_file(self, filepath):
365
with open(filepath) as f:
366
# print("Opened (%s)" % filepath)
367
lines = f.readlines()
368
f.close()
369
370
def debug(x):
371
pass
372
# if filepath == "/home/pbarker/rc/ardupilot/libraries/AP_HAL/AnalogIn.h":
373
# debug = print
374
state_outside = "outside"
375
state_inside = "inside"
376
messagedef = ""
377
state = state_outside
378
docco = None
379
for line in lines:
380
debug(f"{state}: {line}")
381
if messagedef:
382
messagedef = messagedef + line
383
if "}" in line or ";" in line:
384
self.parse_messagedef(messagedef)
385
messagedef = ""
386
if state == state_outside:
387
# Check for start of a message definition
388
messagedef = self.search_messagedef_start(line, messagedef)
389
390
# Check for fmt/unit/mult #define
391
u = re_names_define.search(line)
392
if u is not None:
393
self.msg_names_list[u.group(1)] = u.group(2)
394
u = re_fmt_define.search(line)
395
if u is not None:
396
self.msg_fmts_list[u.group(1)] = u.group(2)
397
u = re_units_define.search(line)
398
if u is not None:
399
self.msg_units_list[u.group(1)] = u.group(2)
400
u = re_mults_define.search(line)
401
if u is not None:
402
self.msg_mults_list[u.group(1)] = u.group(2)
403
404
# Check for the @LoggerMessage tag indicating the start of the docco block
405
m = re_loggermessage.search(line)
406
if m is None:
407
continue
408
name = m.group(1)
409
if "," in name:
410
name = name.split(",")
411
state = state_inside
412
docco = LoggerDocco.Docco(name)
413
elif state == state_inside:
414
# If this line is not a comment, then this is the end of the docco block
415
if not re_commentline.match(line):
416
state = state_outside
417
if docco.vehicles is None or self.vehicle in docco.vehicles:
418
self.finalise_docco(docco)
419
messagedef = self.search_messagedef_start(line)
420
continue
421
# Check for an multiple @LoggerMessage lines in this docco block
422
m = re_loggermessage.search(line)
423
if m is not None:
424
name = m.group(1)
425
if "," in name:
426
name = name.split(",")
427
docco.add_name(name)
428
continue
429
# Find and extract data from the various docco fields
430
m = re_description.match(line)
431
if m is not None:
432
docco.set_description(m.group(1))
433
continue
434
m = re_url.match(line)
435
if m is not None:
436
docco.set_url(m.group(1))
437
continue
438
m = re_field.match(line)
439
if m is not None:
440
docco.set_field_description(m.group(1), m.group(2))
441
continue
442
m = re_fieldbits.match(line)
443
if m is not None:
444
docco.set_field_bits(m.group(1), m.group(2))
445
continue
446
m = re_fieldbitmaskenum.match(line)
447
if m is not None:
448
docco.set_fieldbitmaskenum(m.group(1), m.group(2))
449
continue
450
m = re_fieldvalueenum.match(line)
451
if m is not None:
452
docco.set_fieldvalueenum(m.group(1), m.group(2))
453
continue
454
m = re_vehicles.match(line)
455
if m is not None:
456
docco.set_vehicles([x.strip() for x in m.group(1).split(',')])
457
continue
458
print("Unknown field (%s)" % str(line))
459
sys.exit(1)
460
461
def parse_files(self):
462
for _file in self.files:
463
self.parse_file(_file)
464
465
def emit_output(self):
466
# expand things like PIDR,PIDQ,PIDA into multiple doccos
467
new_doccos = []
468
for docco in self.doccos:
469
if isinstance(docco.name, list):
470
for name, desc in zip(docco.name, docco.description):
471
tmpdocco = copy.copy(docco)
472
tmpdocco.name = name
473
tmpdocco.description = desc
474
new_doccos.append(tmpdocco)
475
else:
476
new_doccos.append(docco)
477
new_doccos = sorted(new_doccos, key=lambda x : x.name)
478
479
# Try to attach the formats/units/multipliers
480
for docco in new_doccos:
481
# Check that the field names are correctly ordered
482
if docco.name in self.msg_names_list:
483
if "LABELS" in self.msg_names_list[docco.name]:
484
if self.msg_names_list[docco.name] in self.msg_names_list:
485
docco.set_field_names(self.msg_names_list[self.msg_names_list[docco.name]])
486
else:
487
docco.set_field_names(self.msg_names_list[docco.name])
488
else:
489
print(f"No field names found for message {docco.name}")
490
# Apply the Formats to the docco
491
if docco.name in self.msg_fmts_list:
492
if "FMT" in self.msg_fmts_list[docco.name]:
493
if self.msg_fmts_list[docco.name] in self.msg_fmts_list:
494
docco.set_fmts(self.msg_fmts_list[self.msg_fmts_list[docco.name]])
495
else:
496
docco.set_fmts(self.msg_fmts_list[docco.name])
497
else:
498
print(f"No formats found for message {docco.name}")
499
# Get the Units
500
units = None
501
if docco.name in self.msg_units_list:
502
if "UNITS" in self.msg_units_list[docco.name]:
503
if self.msg_units_list[docco.name] in self.msg_units_list:
504
units = self.msg_units_list[self.msg_units_list[docco.name]]
505
else:
506
units = self.msg_units_list[docco.name]
507
# Get the Multipliers
508
mults = None
509
if docco.name in self.msg_mults_list:
510
if "MULTS" in self.msg_mults_list[docco.name]:
511
if self.msg_mults_list[docco.name] in self.msg_mults_list:
512
mults = self.msg_mults_list[self.msg_mults_list[docco.name]]
513
else:
514
mults = self.msg_mults_list[docco.name]
515
# Apply the units/mults to the docco
516
if units is not None and mults is not None:
517
docco.set_units(units, mults)
518
elif units is not None or mults is not None:
519
print(f"Cannot find matching units/mults for message {docco.name}")
520
521
# every field must have a description. Things like
522
# FieldBitmaskEnum can create the field object but not fill
523
# description in.
524
for docco in new_doccos:
525
for field in docco.fields:
526
if docco.get_field_description(field) is None:
527
raise ValueError(f"{docco.name}.{field} missing description")
528
529
enums_by_name = {}
530
for enum in self.enumerations:
531
enums_by_name[enum.name] = enum
532
for emitter in self.emitters:
533
emitter.emit(new_doccos, enums_by_name)
534
535
def run(self):
536
self.populate_lookups()
537
self.enumerations = enum_parse.EnumDocco(self.vehicle).get_enumerations()
538
self.files = []
539
self.search_for_files([self.vehicle_map[self.vehicle], "libraries"])
540
self.parse_files()
541
self.emit_output()
542
543
def finalise_docco(self, docco):
544
self.doccos.append(docco)
545
self.enumerations += docco.bits_enums
546
547
548
if __name__ == '__main__':
549
parser = argparse.ArgumentParser(description="Parse parameters.")
550
parser.add_argument("-v", "--verbose", dest='verbose', action='store_true', default=False, help="show debugging output")
551
parser.add_argument("--vehicle", required=True, help="Vehicle type to generate for")
552
553
args = parser.parse_args()
554
555
s = LoggerDocco(args.vehicle)
556
557
if args.vehicle not in s.vehicle_map:
558
print("Invalid vehicle (choose from: %s)" % str(s.vehicle_map.keys()))
559
sys.exit(1)
560
561
s.run()
562
563