Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Path: blob/master/Tools/autotest/logger_metadata/parse.py
Views: 1799
#!/usr/bin/env python12'''3AP_FLAKE8_CLEAN4'''56from __future__ import print_function78import argparse9import copy10import os11import re12import sys1314import emit_html15import emit_rst16import emit_xml17import emit_md1819import enum_parse20from enum_parse import EnumDocco2122topdir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../../../')23topdir = os.path.realpath(topdir)2425# Regular expressions for finding message information in code comments26re_loggermessage = re.compile(r"@LoggerMessage\s*:\s*([\w,]+)", re.MULTILINE)27re_commentline = re.compile(r"\s*//")28re_description = re.compile(r"\s*//\s*@Description\s*:\s*(.*)")29re_url = re.compile(r"\s*//\s*@URL\s*:\s*(.*)")30re_field = re.compile(r"\s*//\s*@Field\s*:\s*(\w+):\s*(.*)")31re_fieldbits = re.compile(r"\s*//\s*@FieldBits\s*:\s*(\w+):\s*(.*)")32re_fieldbitmaskenum = re.compile(r"\s*//\s*@FieldBitmaskEnum\s*:\s*(\w+):\s*(.*)")33re_fieldvalueenum = re.compile(r"\s*//\s*@FieldValueEnum\s*:\s*(\w+):\s*(.*)")34re_vehicles = re.compile(r"\s*//\s*@Vehicles\s*:\s*(.*)")3536# Regular expressions for finding message definitions in structure format37re_start_messagedef = re.compile(r"^\s*{?\s*LOG_[A-Z0-9_]+_[MSGTA]+[A-Z0-9_]*\s*,")38re_deffield = r'[\s\\]*"?([\w\-#?%]+)"?\s*'39re_full_messagedef = re.compile(r'\s*LOG_\w+\s*,\s*\w+\([^)]+\)[\s\\]*,' +40f'{re_deffield},{re_deffield},' +41r'[\s\\]*"?([\w,]+)"?[\s\\]*,' +42f'{re_deffield},{re_deffield}',43re.MULTILINE)44re_fmt_define = re.compile(r'#define\s+(\w+_FMT)\s+"([\w\-#?%]+)"')45re_units_define = re.compile(r'#define\s+(\w+_UNITS)\s+"([\w\-#?%]+)"')46re_mults_define = re.compile(r'#define\s+(\w+_MULTS)\s+"([\w\-#?%]+)"')4748# Regular expressions for finding message definitions in Write calls49re_start_writecall = re.compile(r"\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\(")50re_writefield = r'\s*"([\w\-#?%,]+)"\s*'51re_full_writecall = re.compile(r'\s*[AP:]*logger[\(\)]*.Write[StreamingCrcl]*\(' +52f'{re_writefield},{re_writefield},{re_writefield},({re_writefield},{re_writefield})?',53re.MULTILINE)5455# Regular expression for extracting unit and multipliers from structure56re_units_mults_struct = re.compile(r"^\s*{\s*'([\w\-#?%!/])',"+r'\s*"?([\w\-#?%./]*)"?\s*}')5758# TODO: validate URLS actually return 2005960# Lookup tables are populated by reading LogStructure.h61log_fmt_lookup = {}62log_units_lookup = {}63log_mult_lookup = {}6465# Lookup table to convert multiplier to prefix66mult_prefix_lookup = {670: "",681: "",691e-1: "d", # deci-701e-2: "c", # centi-711e-3: "m", # milli-721e-6: "μ", # micro-731e-9: "n" # nano-74}757677class LoggerDocco(object):7879vehicle_map = {80"Rover": "Rover",81"Sub": "ArduSub",82"Copter": "ArduCopter",83"Plane": "ArduPlane",84"Tracker": "AntennaTracker",85"Blimp": "Blimp",86}8788def __init__(self, vehicle):89self.vehicle = vehicle90self.doccos = []91self.emitters = [92emit_html.HTMLEmitter(),93emit_rst.RSTEmitter(),94emit_xml.XMLEmitter(),95emit_md.MDEmitter(),96]97self.msg_fmts_list = {}98self.msg_units_list = {}99self.msg_mults_list = {}100101class Docco(object):102103def __init__(self, name):104self.name = name105self.url = None106if isinstance(name, list):107self.description = [None] * len(name)108else:109self.description = None110self.fields = {}111self.fields_order = []112self.vehicles = None113self.bits_enums = []114115def add_name(self, name):116# If self.name/description aren't lists, convert them117if isinstance(self.name, str):118self.name = [self.name]119self.description = [self.description]120# Replace any existing empty descriptions with empty strings121for i in range(0, len(self.description)):122if self.description[i] is None:123self.description[i] = ""124# Extend the name and description lists125if isinstance(name, list):126self.name.extend(name)127self.description.extend([None] * len(name))128else:129self.name.append(name)130self.description.append(None)131132def set_description(self, desc):133if isinstance(self.description, list):134for i in range(0, len(self.description)):135if self.description[i] is None:136self.description[i] = desc137else:138self.description = desc139140def set_url(self, url):141self.url = url142143def ensure_field(self, field):144if field not in self.fields:145self.fields[field] = {}146self.fields_order.append(field)147148def set_field_description(self, field, description):149if field in self.fields:150raise ValueError("Already have field %s in %s" %151(field, self.name))152self.ensure_field(field)153self.fields[field]["description"] = description154155def set_field_bits(self, field, bits):156bits = bits.split(",")157count = 0158entries = []159for bit in bits:160entries.append(EnumDocco.EnumEntry(bit, 1 << count, None))161count += 1162bitmask_name = self.name + field163self.bits_enums.append(EnumDocco.Enumeration(bitmask_name, entries))164self.ensure_field(field)165self.fields[field]["bitmaskenum"] = bitmask_name166167def set_fieldbitmaskenum(self, field, bits):168self.ensure_field(field)169self.fields[field]["bitmaskenum"] = bits170171def set_fieldvalueenum(self, field, bits):172self.ensure_field(field)173self.fields[field]["valueenum"] = bits174175def set_vehicles(self, vehicles):176self.vehicles = vehicles177178def set_fmts(self, fmts):179# If no fields are defined, do nothing180if len(self.fields_order) == 0:181return182# Make sure lengths match up183if len(fmts) != len(self.fields_order):184print(f"Number of fmts don't match fields: msg={self.name} fmts={fmts} num_fields={len(self.fields_order)} {self.fields_order}") # noqa:E501185return186# Loop through the list187for idx in range(0, len(fmts)):188if fmts[idx] in log_fmt_lookup:189self.fields[self.fields_order[idx]]["fmt"] = log_fmt_lookup[fmts[idx]]190else:191print(f"Unrecognised format character: {fmts[idx]} in message {self.name}")192193def set_units(self, units, mults):194# If no fields are defined, do nothing195if len(self.fields_order) == 0:196return197# Make sure lengths match up198if len(units) != len(self.fields_order) or len(units) != len(mults):199print(f"Number of units/mults/fields don't match: msg={self.name} units={units} mults={mults} num_fields={len(self.fields_order)}") # noqa:E501200return201# Loop through the list202for idx in range(0, len(units)):203# Get the index into fields from field_order204f = self.fields_order[idx]205# Convert unit char to base unit206if units[idx] in log_units_lookup:207baseunit = log_units_lookup[units[idx]]208else:209print(f"Unrecognised units character: {units[idx]} in message {self.name}")210continue211# Do nothing if this field has no unit defined212if baseunit == "":213continue214# Convert mult char to value215if mults[idx] in log_mult_lookup:216mult = log_mult_lookup[mults[idx]]217mult_num = float(mult)218else:219print(f"Unrecognised multiplier character: {mults[idx]} in message {self.name}")220continue221# Check if the defined format for this field contains its own multiplier222# If so, the presented value will be the base-unit directly223if 'fmt' in self.fields[f] and self.fields[f]['fmt'].endswith("* 100"):224self.fields[f]["units"] = baseunit225elif 'fmt' in self.fields[f] and "latitude/longitude" in self.fields[f]['fmt']:226self.fields[f]["units"] = baseunit227# Check if we have a defined prefix for this multiplier228elif mult_num in mult_prefix_lookup:229self.fields[f]["units"] = f"{mult_prefix_lookup[mult_num]}{baseunit}"230# If all else fails, set the unit as the multipler and base unit together231else:232self.fields[f]["units"] = f"{mult} {baseunit}"233234def populate_lookups(self):235# Initialise the lookup tables236# Read the contents of the LogStructure.h file237structfile = os.path.join(topdir, "libraries", "AP_Logger", "LogStructure.h")238with open(structfile) as f:239lines = f.readlines()240f.close()241# Initialise current section to none242section = "none"243# Loop through the lines in the file244for line in lines:245# Look for the start of fmt/unit/mult info246if line.startswith("Format characters"):247section = "fmt"248elif line.startswith("const struct UnitStructure"):249section = "units"250elif line.startswith("const struct MultiplierStructure"):251section = "mult"252# Read formats from code comment, e.g.:253# b : int8_t254elif section == "fmt":255if "*/" in line:256section = "none"257else:258parts = line.split(":")259log_fmt_lookup[parts[0].strip()] = parts[1].strip()260# Read units or multipliers from C struct definition, e.g.:261# { '2', 1e2 }, or { 'J', "W.s" },262elif section != "none":263if "};" in line:264section = "none"265else:266u = re_units_mults_struct.search(line)267if u is not None and section == "units":268log_units_lookup[u.group(1)] = u.group(2)269if u is not None and section == "mult":270log_mult_lookup[u.group(1)] = u.group(2)271272def search_for_files(self, dirs_to_search):273_next = []274for _dir in dirs_to_search:275_dir = os.path.join(topdir, _dir)276for entry in os.listdir(_dir):277filepath = os.path.join(_dir, entry)278if os.path.isdir(filepath):279_next.append(filepath)280continue281(name, extension) = os.path.splitext(filepath)282if extension not in [".cpp", ".h"]:283continue284self.files.append(filepath)285if len(_next):286self.search_for_files(_next)287288def parse_messagedef(self, messagedef):289# Merge concatinated strings and remove comments290messagedef = re.sub(r'"\s+"', '', messagedef)291messagedef = re.sub(r'//[^\n]*', '', messagedef)292# Extract details from a structure definition293d = re_full_messagedef.search(messagedef)294if d is not None:295self.msg_fmts_list[d.group(1)] = d.group(2)296self.msg_units_list[d.group(1)] = d.group(4)297self.msg_mults_list[d.group(1)] = d.group(5)298return299# Extract details from a WriteStreaming call300d = re_full_writecall.search(messagedef)301if d is not None:302if d.group(1) in self.msg_fmts_list:303return304if d.group(5) is None:305self.msg_fmts_list[d.group(1)] = d.group(3)306else:307self.msg_fmts_list[d.group(1)] = d.group(6)308self.msg_units_list[d.group(1)] = d.group(3)309self.msg_mults_list[d.group(1)] = d.group(5)310return311# Didn't parse312# print(f"Unable to parse: {messagedef}")313314def search_messagedef_start(self, line, prevmessagedef=""):315# Look for the start of a structure definition316d = re_start_messagedef.search(line)317if d is not None:318messagedef = line319if "}" in line:320self.parse_messagedef(messagedef)321return ""322else:323return messagedef324# Look for a new call to WriteStreaming325d = re_start_writecall.search(line)326if d is not None:327messagedef = line328if ";" in line:329self.parse_messagedef(messagedef)330return ""331else:332return messagedef333# If we didn't find a new one, continue with any previous state334return prevmessagedef335336def parse_file(self, filepath):337with open(filepath) as f:338# print("Opened (%s)" % filepath)339lines = f.readlines()340f.close()341342def debug(x):343pass344# if filepath == "/home/pbarker/rc/ardupilot/libraries/AP_HAL/AnalogIn.h":345# debug = print346state_outside = "outside"347state_inside = "inside"348messagedef = ""349state = state_outside350docco = None351for line in lines:352debug(f"{state}: {line}")353if messagedef:354messagedef = messagedef + line355if "}" in line or ";" in line:356self.parse_messagedef(messagedef)357messagedef = ""358if state == state_outside:359# Check for start of a message definition360messagedef = self.search_messagedef_start(line, messagedef)361362# Check for fmt/unit/mult #define363u = re_fmt_define.search(line)364if u is not None:365self.msg_fmts_list[u.group(1)] = u.group(2)366u = re_units_define.search(line)367if u is not None:368self.msg_units_list[u.group(1)] = u.group(2)369u = re_mults_define.search(line)370if u is not None:371self.msg_mults_list[u.group(1)] = u.group(2)372373# Check for the @LoggerMessage tag indicating the start of the docco block374m = re_loggermessage.search(line)375if m is None:376continue377name = m.group(1)378if "," in name:379name = name.split(",")380state = state_inside381docco = LoggerDocco.Docco(name)382elif state == state_inside:383# If this line is not a comment, then this is the end of the docco block384if not re_commentline.match(line):385state = state_outside386if docco.vehicles is None or self.vehicle in docco.vehicles:387self.finalise_docco(docco)388messagedef = self.search_messagedef_start(line)389continue390# Check for an multiple @LoggerMessage lines in this docco block391m = re_loggermessage.search(line)392if m is not None:393name = m.group(1)394if "," in name:395name = name.split(",")396docco.add_name(name)397continue398# Find and extract data from the various docco fields399m = re_description.match(line)400if m is not None:401docco.set_description(m.group(1))402continue403m = re_url.match(line)404if m is not None:405docco.set_url(m.group(1))406continue407m = re_field.match(line)408if m is not None:409docco.set_field_description(m.group(1), m.group(2))410continue411m = re_fieldbits.match(line)412if m is not None:413docco.set_field_bits(m.group(1), m.group(2))414continue415m = re_fieldbitmaskenum.match(line)416if m is not None:417docco.set_fieldbitmaskenum(m.group(1), m.group(2))418continue419m = re_fieldvalueenum.match(line)420if m is not None:421docco.set_fieldvalueenum(m.group(1), m.group(2))422continue423m = re_vehicles.match(line)424if m is not None:425docco.set_vehicles([x.strip() for x in m.group(1).split(',')])426continue427print("Unknown field (%s)" % str(line))428sys.exit(1)429430def parse_files(self):431for _file in self.files:432self.parse_file(_file)433434def emit_output(self):435# expand things like PIDR,PIDQ,PIDA into multiple doccos436new_doccos = []437for docco in self.doccos:438if isinstance(docco.name, list):439for name, desc in zip(docco.name, docco.description):440tmpdocco = copy.copy(docco)441tmpdocco.name = name442tmpdocco.description = desc443new_doccos.append(tmpdocco)444else:445new_doccos.append(docco)446new_doccos = sorted(new_doccos, key=lambda x : x.name)447448# Try to attach the formats/units/multipliers449for docco in new_doccos:450# Apply the Formats to the docco451if docco.name in self.msg_fmts_list:452if "FMT" in self.msg_fmts_list[docco.name]:453if self.msg_fmts_list[docco.name] in self.msg_fmts_list:454docco.set_fmts(self.msg_fmts_list[self.msg_fmts_list[docco.name]])455else:456docco.set_fmts(self.msg_fmts_list[docco.name])457else:458print(f"No formats found for message {docco.name}")459# Get the Units460units = None461if docco.name in self.msg_units_list:462if "UNITS" in self.msg_units_list[docco.name]:463if self.msg_units_list[docco.name] in self.msg_units_list:464units = self.msg_units_list[self.msg_units_list[docco.name]]465else:466units = self.msg_units_list[docco.name]467# Get the Multipliers468mults = None469if docco.name in self.msg_mults_list:470if "MULTS" in self.msg_mults_list[docco.name]:471if self.msg_mults_list[docco.name] in self.msg_mults_list:472mults = self.msg_mults_list[self.msg_mults_list[docco.name]]473else:474mults = self.msg_mults_list[docco.name]475# Apply the units/mults to the docco476if units is not None and mults is not None:477docco.set_units(units, mults)478elif units is not None or mults is not None:479print(f"Cannot find matching units/mults for message {docco.name}")480481enums_by_name = {}482for enum in self.enumerations:483enums_by_name[enum.name] = enum484for emitter in self.emitters:485emitter.emit(new_doccos, enums_by_name)486487def run(self):488self.populate_lookups()489self.enumerations = enum_parse.EnumDocco(self.vehicle).get_enumerations()490self.files = []491self.search_for_files([self.vehicle_map[self.vehicle], "libraries"])492self.parse_files()493self.emit_output()494495def finalise_docco(self, docco):496self.doccos.append(docco)497self.enumerations += docco.bits_enums498499500if __name__ == '__main__':501parser = argparse.ArgumentParser(description="Parse parameters.")502parser.add_argument("-v", "--verbose", dest='verbose', action='store_true', default=False, help="show debugging output")503parser.add_argument("--vehicle", required=True, help="Vehicle type to generate for")504505args = parser.parse_args()506507s = LoggerDocco(args.vehicle)508509if args.vehicle not in s.vehicle_map:510print("Invalid vehicle (choose from: %s)" % str(s.vehicle_map.keys()))511sys.exit(1)512513s.run()514515516