CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Sign Up | Sign In
Ardupilot

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.

GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/autotest/logger_metadata/parse.py
Views: 1799
1
#!/usr/bin/env python
2
3
'''
4
AP_FLAKE8_CLEAN
5
'''
6
7
from __future__ import print_function
8
9
import argparse
10
import copy
11
import os
12
import re
13
import sys
14
15
import emit_html
16
import emit_rst
17
import emit_xml
18
import emit_md
19
20
import enum_parse
21
from enum_parse import EnumDocco
22
23
# Absolute path of the repository root, three directories above this script
topdir = os.path.realpath(
    os.path.join(os.path.dirname(os.path.realpath(__file__)), '../../../'))


def _tag_regex(tag, per_field=False):
    '''Compile the pattern for a "// @<tag>: ..." documentation comment.

    With per_field set the pattern captures a field name followed by the
    remaining text; otherwise it captures the remaining text only.
    '''
    captured = r"(\w+):\s*(.*)" if per_field else r"(.*)"
    return re.compile(r"\s*//\s*@" + tag + r"\s*:\s*" + captured)


def _define_regex(suffix):
    '''Compile the pattern for a '#define <NAME>_<suffix> "<chars>"' line.'''
    return re.compile(r'#define\s+(\w+_' + suffix + r')\s+"([\w\-#?%]+)"')


# Patterns used to pull message documentation out of code comments
re_loggermessage = re.compile(r"@LoggerMessage\s*:\s*([\w,]+)", re.MULTILINE)
re_commentline = re.compile(r"\s*//")
re_description = _tag_regex("Description")
re_url = _tag_regex("URL")
re_field = _tag_regex("Field", per_field=True)
re_fieldbits = _tag_regex("FieldBits", per_field=True)
re_fieldbitmaskenum = _tag_regex("FieldBitmaskEnum", per_field=True)
re_fieldvalueenum = _tag_regex("FieldValueEnum", per_field=True)
re_vehicles = _tag_regex("Vehicles")

# Patterns used to find message definitions written as structure entries
re_start_messagedef = re.compile(r"^\s*{?\s*LOG_[A-Z0-9_]+_[MSGTA]+[A-Z0-9_]*\s*,")
re_deffield = r'[\s\\]*"?([\w\-#?%]+)"?\s*'
re_full_messagedef = re.compile(
    r'\s*LOG_\w+\s*,\s*\w+\([^)]+\)[\s\\]*,' + ",".join([
        re_deffield,
        re_deffield,
        r'[\s\\]*"?([\w,]+)"?[\s\\]*',
        re_deffield,
        re_deffield,
    ]),
    re.MULTILINE)
re_fmt_define = _define_regex("FMT")
re_units_define = _define_regex("UNITS")
re_mults_define = _define_regex("MULTS")

# Patterns used to find message definitions written as Write calls
_writecall_prefix = r"\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\("
re_start_writecall = re.compile(_writecall_prefix)
re_writefield = r'\s*"([\w\-#?%,]+)"\s*'
re_full_writecall = re.compile(
    _writecall_prefix +
    ",".join([re_writefield] * 3) +
    f',({re_writefield},{re_writefield})?',
    re.MULTILINE)

# Pattern used to read one unit or multiplier entry from a C structure
re_units_mults_struct = re.compile(
    r"^\s*{\s*'([\w\-#?%!/])'," + r'\s*"?([\w\-#?%./]*)"?\s*}')

# TODO: validate URLS actually return 200

# These tables are filled in from LogStructure.h at run time
log_fmt_lookup = {}
log_units_lookup = {}
log_mult_lookup = {}

# Converts a multiplier value into the prefix shown in front of the unit
mult_prefix_lookup = {
    0: "",
    1: "",
    1e-1: "d",  # deci-
    1e-2: "c",  # centi-
    1e-3: "m",  # milli-
    1e-6: "μ",  # micro-
    1e-9: "n",  # nano-
}
76
77
78
class LoggerDocco(object):
    '''Collects @LoggerMessage documentation from the source tree.

    run() reads the format/unit/multiplier tables from LogStructure.h,
    scans the vehicle and "libraries" directories for .cpp/.h files,
    parses the documentation comments and message definitions found in
    them and hands the result to every emitter.
    '''

    # Maps a vehicle name onto the directory (relative to topdir) that is
    # searched for that vehicle
    vehicle_map = {
        "Rover": "Rover",
        "Sub": "ArduSub",
        "Copter": "ArduCopter",
        "Plane": "ArduPlane",
        "Tracker": "AntennaTracker",
        "Blimp": "Blimp",
    }

    def __init__(self, vehicle):
        '''Create a parser for one vehicle (a key of vehicle_map).'''
        self.vehicle = vehicle
        self.doccos = []
        self.emitters = [
            emit_html.HTMLEmitter(),
            emit_rst.RSTEmitter(),
            emit_xml.XMLEmitter(),
            emit_md.MDEmitter(),
        ]
        # message name (or *_FMT/_UNITS/_MULTS define name) -> string
        self.msg_fmts_list = {}
        self.msg_units_list = {}
        self.msg_mults_list = {}
        # run() resets both of these; they are created here so that the
        # individual steps can also be called without going through run()
        self.files = []
        self.enumerations = []

    class Docco(object):
        '''Documentation gathered from one @LoggerMessage comment block.

        name and description are either a single value or parallel lists
        when the block documents several messages at once.
        '''

        def __init__(self, name):
            '''name is a message name, or a list of message names.'''
            self.name = name
            self.url = None
            if isinstance(name, list):
                self.description = [None] * len(name)
            else:
                self.description = None
            # field name -> dict of attributes ("description", "fmt", ...)
            self.fields = {}
            # field names in the order they were first mentioned
            self.fields_order = []
            self.vehicles = None
            self.bits_enums = []

        def add_name(self, name):
            '''Add a further message name (or list of names) to this block.

            Names added earlier that still have no description get an
            empty one, so a later set_description() only applies to the
            names added here.
            '''
            # If self.name/description aren't lists, convert them
            if isinstance(self.name, str):
                self.name = [self.name]
                self.description = [self.description]
            # Replace any existing empty descriptions with empty strings
            self.description = ["" if d is None else d for d in self.description]
            # Extend the name and description lists
            if isinstance(name, list):
                self.name.extend(name)
                self.description.extend([None] * len(name))
            else:
                self.name.append(name)
                self.description.append(None)

        def set_description(self, desc):
            '''Set the description of every name that does not have one yet.

            For a single-name block the description is simply replaced.
            '''
            if isinstance(self.description, list):
                self.description = [desc if d is None else d for d in self.description]
            else:
                self.description = desc

        def set_url(self, url):
            '''Record the URL given for this block.'''
            self.url = url

        def ensure_field(self, field):
            '''Create an empty entry for field if it is not known yet.'''
            if field not in self.fields:
                self.fields[field] = {}
                self.fields_order.append(field)

        def set_field_description(self, field, description):
            '''Set the description of field.

            Raises ValueError if the field already has a description.  A
            field that was only created by one of the other field tags
            is not treated as a duplicate.
            '''
            if "description" in self.fields.get(field, {}):
                raise ValueError("Already have field %s in %s" %
                                 (field, self.name))
            self.ensure_field(field)
            self.fields[field]["description"] = description

        def set_field_bits(self, field, bits):
            '''Create a bitmask enumeration for field.

            bits is a comma-separated list of bit names; the first name
            is given value 1, the second 2, the third 4 and so on.
            '''
            entries = [
                EnumDocco.EnumEntry(bit, 1 << count, None)
                for count, bit in enumerate(bits.split(","))
            ]
            # self.name is a list when the block documents several
            # messages; the enumeration is then named after the first one
            owner = self.name[0] if isinstance(self.name, list) else self.name
            bitmask_name = owner + field
            self.bits_enums.append(EnumDocco.Enumeration(bitmask_name, entries))
            self.ensure_field(field)
            self.fields[field]["bitmaskenum"] = bitmask_name

        def set_fieldbitmaskenum(self, field, bits):
            '''Record the name of the bitmask enumeration used by field.'''
            self.ensure_field(field)
            self.fields[field]["bitmaskenum"] = bits

        def set_fieldvalueenum(self, field, bits):
            '''Record the name of the value enumeration used by field.'''
            self.ensure_field(field)
            self.fields[field]["valueenum"] = bits

        def set_vehicles(self, vehicles):
            '''Record the list of vehicles this block applies to.'''
            self.vehicles = vehicles

        def set_fmts(self, fmts):
            '''Attach a format description to each field.

            fmts holds one format character per field, in field order.
            Nothing is changed when the counts do not match.
            '''
            # If no fields are defined, do nothing
            if not self.fields_order:
                return
            # Make sure lengths match up
            if len(fmts) != len(self.fields_order):
                print(f"Number of fmts don't match fields: msg={self.name} fmts={fmts} num_fields={len(self.fields_order)} {self.fields_order}")  # noqa:E501
                return
            for field, fmt in zip(self.fields_order, fmts):
                if fmt in log_fmt_lookup:
                    self.fields[field]["fmt"] = log_fmt_lookup[fmt]
                else:
                    print(f"Unrecognised format character: {fmt} in message {self.name}")

        def set_units(self, units, mults):
            '''Attach a unit string to each field.

            units and mults hold one character per field, in field
            order.  Nothing is changed when the counts do not match.
            '''
            # If no fields are defined, do nothing
            if not self.fields_order:
                return
            # Make sure lengths match up
            if len(units) != len(self.fields_order) or len(units) != len(mults):
                print(f"Number of units/mults/fields don't match: msg={self.name} units={units} mults={mults} num_fields={len(self.fields_order)}")  # noqa:E501
                return
            for f, unit, mult_char in zip(self.fields_order, units, mults):
                # Convert unit char to base unit
                if unit not in log_units_lookup:
                    print(f"Unrecognised units character: {unit} in message {self.name}")
                    continue
                baseunit = log_units_lookup[unit]
                # Do nothing if this field has no unit defined
                if baseunit == "":
                    continue
                # Convert mult char to value
                if mult_char not in log_mult_lookup:
                    print(f"Unrecognised multiplier character: {mult_char} in message {self.name}")
                    continue
                mult = log_mult_lookup[mult_char]
                try:
                    mult_num = float(mult)
                except ValueError:
                    # the table entry is not a number; skip this field
                    # rather than aborting the whole run
                    print(f"Unrecognised multiplier value: {mult} in message {self.name}")
                    continue
                fmt = self.fields[f].get('fmt', '')
                # Check if the defined format for this field contains its own multiplier
                # If so, the presented value will be the base-unit directly
                if fmt.endswith("* 100") or "latitude/longitude" in fmt:
                    self.fields[f]["units"] = baseunit
                # Check if we have a defined prefix for this multiplier
                elif mult_num in mult_prefix_lookup:
                    self.fields[f]["units"] = f"{mult_prefix_lookup[mult_num]}{baseunit}"
                # If all else fails, set the unit as the multipler and base unit together
                else:
                    self.fields[f]["units"] = f"{mult} {baseunit}"

    def populate_lookups(self):
        '''Fill the module-level lookup tables from LogStructure.h.'''
        structfile = os.path.join(topdir, "libraries", "AP_Logger", "LogStructure.h")
        with open(structfile) as f:
            lines = f.readlines()
        # Name of the section currently being read
        section = "none"
        for line in lines:
            # Look for the start of fmt/unit/mult info
            if line.startswith("Format characters"):
                section = "fmt"
            elif line.startswith("const struct UnitStructure"):
                section = "units"
            elif line.startswith("const struct MultiplierStructure"):
                section = "mult"
            # Read formats from code comment, e.g.:
            #    b   : int8_t
            elif section == "fmt":
                if "*/" in line:
                    section = "none"
                    continue
                key, colon, value = line.partition(":")
                # lines without a colon (e.g. blank lines) carry no entry
                if colon:
                    log_fmt_lookup[key.strip()] = value.strip()
            # Read units or multipliers from C struct definition, e.g.:
            #    { '2', 1e2 }, or { 'J', "W.s" },
            elif section != "none":
                if "};" in line:
                    section = "none"
                    continue
                u = re_units_mults_struct.search(line)
                if u is None:
                    continue
                if section == "units":
                    log_units_lookup[u.group(1)] = u.group(2)
                elif section == "mult":
                    log_mult_lookup[u.group(1)] = u.group(2)

    def search_for_files(self, dirs_to_search):
        '''Append every .cpp/.h file below dirs_to_search to self.files.

        Directories are taken relative to topdir.  Entries are visited
        in sorted order so that the result does not depend on the order
        in which the filesystem happens to list them.
        '''
        _next = []
        for _dir in dirs_to_search:
            _dir = os.path.join(topdir, _dir)
            for entry in sorted(os.listdir(_dir)):
                filepath = os.path.join(_dir, entry)
                if os.path.isdir(filepath):
                    _next.append(filepath)
                    continue
                if os.path.splitext(filepath)[1] in (".cpp", ".h"):
                    self.files.append(filepath)
        if _next:
            self.search_for_files(_next)

    def parse_messagedef(self, messagedef):
        '''Record the fmt/units/mults strings found in a message definition.

        messagedef is the text of a structure entry or of a Write call.
        Text that matches neither form is ignored.
        '''
        # Merge concatinated strings and remove comments
        messagedef = re.sub(r'"\s+"', '', messagedef)
        messagedef = re.sub(r'//[^\n]*', '', messagedef)
        # Extract details from a structure definition
        d = re_full_messagedef.search(messagedef)
        if d is not None:
            name = d.group(1)
            self.msg_fmts_list[name] = d.group(2)
            self.msg_units_list[name] = d.group(4)
            self.msg_mults_list[name] = d.group(5)
            return
        # Extract details from a WriteStreaming call
        d = re_full_writecall.search(messagedef)
        if d is None:
            return
        name = d.group(1)
        # an earlier definition of the same message wins
        if name in self.msg_fmts_list:
            return
        if d.group(5) is None:
            # short form: name, labels, fmt
            self.msg_fmts_list[name] = d.group(3)
        else:
            # long form: name, labels, units, mults, fmt
            self.msg_fmts_list[name] = d.group(6)
            self.msg_units_list[name] = d.group(3)
            self.msg_mults_list[name] = d.group(5)

    def search_messagedef_start(self, line, prevmessagedef=""):
        '''Check whether line starts a message definition.

        A definition that is complete on this line is parsed straight
        away and "" is returned.  If the definition continues on later
        lines, the line is returned so that the caller can accumulate
        the rest.  If no definition starts here, prevmessagedef is
        returned unchanged.
        '''
        # (pattern that starts a definition, text that ends it)
        starts = (
            (re_start_messagedef, "}"),   # structure definition
            (re_start_writecall, ";"),    # call to Write/WriteStreaming
        )
        for regex, terminator in starts:
            if regex.search(line) is None:
                continue
            if terminator in line:
                self.parse_messagedef(line)
                return ""
            return line
        # If we didn't find a new one, continue with any previous state
        return prevmessagedef

    @staticmethod
    def _split_names(names):
        '''Return names as a list if it contains commas, else unchanged.'''
        return names.split(",") if "," in names else names

    def _parse_docco_line(self, docco, line):
        '''Apply one comment line of a docco block to docco.

        Returns True if the line held a recognised tag, False otherwise.
        '''
        # A further @LoggerMessage line in the same docco block
        m = re_loggermessage.search(line)
        if m is not None:
            docco.add_name(self._split_names(m.group(1)))
            return True
        # Tags that carry a single value
        for regex, setter in ((re_description, docco.set_description),
                              (re_url, docco.set_url)):
            m = regex.match(line)
            if m is not None:
                setter(m.group(1))
                return True
        # Tags that carry a field name and a value
        for regex, setter in ((re_field, docco.set_field_description),
                              (re_fieldbits, docco.set_field_bits),
                              (re_fieldbitmaskenum, docco.set_fieldbitmaskenum),
                              (re_fieldvalueenum, docco.set_fieldvalueenum)):
            m = regex.match(line)
            if m is not None:
                setter(m.group(1), m.group(2))
                return True
        m = re_vehicles.match(line)
        if m is not None:
            docco.set_vehicles([x.strip() for x in m.group(1).split(',')])
            return True
        return False

    def _finish_docco(self, docco):
        '''Keep docco if it applies to the vehicle being generated.'''
        if docco.vehicles is None or self.vehicle in docco.vehicles:
            self.finalise_docco(docco)

    def parse_file(self, filepath):
        '''Parse one source file for docco blocks and message definitions.

        Exits the program with status 1 if a docco block contains a
        comment line with no recognised tag.
        '''
        with open(filepath) as f:
            lines = f.readlines()

        state_outside = "outside"
        state_inside = "inside"
        messagedef = ""
        state = state_outside
        docco = None
        for line in lines:
            # Keep collecting a message definition that spans several lines
            if messagedef:
                messagedef = messagedef + line
                if "}" in line or ";" in line:
                    self.parse_messagedef(messagedef)
                    messagedef = ""
            if state == state_outside:
                # Check for start of a message definition
                messagedef = self.search_messagedef_start(line, messagedef)

                # Check for fmt/unit/mult #define
                for regex, table in ((re_fmt_define, self.msg_fmts_list),
                                     (re_units_define, self.msg_units_list),
                                     (re_mults_define, self.msg_mults_list)):
                    u = regex.search(line)
                    if u is not None:
                        table[u.group(1)] = u.group(2)

                # Check for the @LoggerMessage tag indicating the start of the docco block
                m = re_loggermessage.search(line)
                if m is not None:
                    state = state_inside
                    docco = LoggerDocco.Docco(self._split_names(m.group(1)))
            elif state == state_inside:
                # If this line is not a comment, then this is the end of the docco block
                if not re_commentline.match(line):
                    state = state_outside
                    self._finish_docco(docco)
                    messagedef = self.search_messagedef_start(line)
                    continue
                if not self._parse_docco_line(docco, line):
                    print("Unknown field (%s)" % str(line))
                    sys.exit(1)

        # A docco block that runs to the end of the file is never followed
        # by a non-comment line, so it has to be finished here
        if state == state_inside:
            self._finish_docco(docco)

    def parse_files(self):
        '''Parse every file collected in self.files.'''
        for _file in self.files:
            self.parse_file(_file)

    @staticmethod
    def _resolve_define(table, name, marker):
        '''Look up the string recorded for name in table.

        If the recorded value contains marker ("FMT", "UNITS" or
        "MULTS") it is taken to be the name of a #define and is looked
        up in table again.  Returns None if name, or the define it
        refers to, is not in table.
        '''
        value = table.get(name)
        if value is not None and marker in value:
            value = table.get(value)
        return value

    def emit_output(self):
        '''Attach formats and units to the doccos and run every emitter.'''
        # expand things like PIDR,PIDQ,PIDA into multiple doccos
        new_doccos = []
        for docco in self.doccos:
            if not isinstance(docco.name, list):
                new_doccos.append(docco)
                continue
            for name, desc in zip(docco.name, docco.description):
                tmpdocco = copy.copy(docco)
                tmpdocco.name = name
                tmpdocco.description = desc
                # each message gets its own field data; a shallow copy
                # alone would make all of them write their fmt/units into
                # the same dictionaries
                tmpdocco.fields = copy.deepcopy(docco.fields)
                tmpdocco.fields_order = list(docco.fields_order)
                new_doccos.append(tmpdocco)
        new_doccos.sort(key=lambda x: x.name)

        # Try to attach the formats/units/multipliers
        for docco in new_doccos:
            # Apply the Formats to the docco
            if docco.name in self.msg_fmts_list:
                fmts = self._resolve_define(self.msg_fmts_list, docco.name, "FMT")
                if fmts is not None:
                    docco.set_fmts(fmts)
            else:
                print(f"No formats found for message {docco.name}")
            # Get the Units and the Multipliers
            units = self._resolve_define(self.msg_units_list, docco.name, "UNITS")
            mults = self._resolve_define(self.msg_mults_list, docco.name, "MULTS")
            # Apply the units/mults to the docco
            if units is not None and mults is not None:
                docco.set_units(units, mults)
            elif units is not None or mults is not None:
                print(f"Cannot find matching units/mults for message {docco.name}")

        enums_by_name = {enum.name: enum for enum in self.enumerations}
        for emitter in self.emitters:
            emitter.emit(new_doccos, enums_by_name)

    def run(self):
        '''Read the lookup tables, parse the source tree and emit output.'''
        self.populate_lookups()
        self.enumerations = enum_parse.EnumDocco(self.vehicle).get_enumerations()
        self.files = []
        self.search_for_files([self.vehicle_map[self.vehicle], "libraries"])
        self.parse_files()
        self.emit_output()

    def finalise_docco(self, docco):
        '''Store a finished docco together with the enumerations it defines.'''
        self.doccos.append(docco)
        self.enumerations += docco.bits_enums
499
500
501
if __name__ == '__main__':
    # Command-line entry point: generate the logger message documentation
    # for the vehicle named by --vehicle
    parser = argparse.ArgumentParser(description="Parse logger message documentation.")
    parser.add_argument("-v", "--verbose",
                        dest='verbose',
                        action='store_true',
                        default=False,
                        help="show debugging output")
    parser.add_argument("--vehicle", required=True, help="Vehicle type to generate for")

    args = parser.parse_args()

    # Check the vehicle before creating the LoggerDocco, because its
    # constructor instantiates every emitter
    if args.vehicle not in LoggerDocco.vehicle_map:
        print("Invalid vehicle (choose from: %s)" % ", ".join(sorted(LoggerDocco.vehicle_map)))
        sys.exit(1)

    LoggerDocco(args.vehicle).run()
515
516