Path: blob/master/Tools/Replay/check_replay_branch.py
9743 views
#!/usr/bin/env python312# flake8: noqa34'''5Create a replay log using master branch.6Check out a specified branch, compile and run Replay against replay log7Run check_replay.py over the produced log8'''910import git # https://pypi.org/project/GitPython/11import glob12import os13import subprocess14import sys15import re1617from pymavlink import DFReader1819import check_replay2021class CheckReplayBranch(object):22def __init__(self, master='master', no_clean=False, no_debug=False):23self.master = master24self.no_clean = no_clean25self.no_debug = no_debug2627def find_topdir(self):28here = os.getcwd()29bits = here.split(os.path.sep)30while len(bits):31# print("bits: %s" % str(bits))32tmp = bits[:]33tmp.extend([".git"]) # can we look for something more specific?34flagfile = os.path.sep.join(tmp)35if os.path.exists(flagfile):36# print("Found in (%s)" % str(flagfile))37return os.path.sep.join(bits)38bits = bits[:-2]39raise FileNotFoundError()4041def find_repo(self):42return git.Repo(self.topdir)4344def assert_tree_clean(self):45if self.repo.is_dirty():46raise ValueError("Tree is dirty")4748def is_replayable_log(self, logfile_path):49'''returns true if an XKF1 or NKF1 message appears in first 500050messages'''51dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);52seen_log_replay = False53seen_log_disarmed = False54while True:55m = dfreader.recv_match(type='PARM')56if m is None:57if not seen_log_replay:58return False59if not seen_log_disarmed:60return False61break62if m.Name == "LOG_REPLAY":63if seen_log_replay:64return False65if m.Value != 1:66return False67seen_log_replay = True68if m.Name == "LOG_DISARMED":69if seen_log_disarmed:70return False71seen_log_disarmed = True72if m.Value != 1:73return False74return False7576def is_replay_log(self, logfile_path):77'''returns true if an XKF1 or NKF1 message appears in first 500078messages'''79dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);80count = 081while True:82m = dfreader.recv_match()83if m is None:84break85if m.get_type() == 'XKF1' and m.C >= 100:86return True87if m.get_type() == 'NKF1' and m.C >= 100:88return True89count += 190if count > 5000:91return False92return False9394def progress(self, message):95print("CRB: %s" % message)9697def build_replay(self):98subprocess.check_call(["./waf", "replay"])99100def run_replay_on_log(self, logfile_path):101subprocess.check_call(["./build/sitl/tool/Replay", logfile_path])102103def get_logs(self):104return sorted(glob.glob("logs/*.BIN"))105106def run_autotest_replay_on_master(self):107# remember where we were:108old_branch = self.repo.active_branch109110# check out the master branch:111self.repo.head.reference = self.master112self.repo.head.reset(index=True, working_tree=True)113114# generate logs:115args = ["Tools/autotest/autotest.py"]116117if self.no_debug:118args.append("--no-debug")119else:120args.append("--debug")121122if self.no_clean:123args.append("--no-clean")124125args.extend(["build.Copter", "test.Copter.Replay"])126127subprocess.check_call(args) # actually run the test128129# check out the original branch:130self.repo.head.reference = old_branch131self.repo.head.reset(index=True, working_tree=True)132133def find_replayed_logs(self):134'''find logs which were replayed in the autotest'''135replayed_logs = set()136for logfile_path in self.get_logs():137self.progress(" Checking %s" % logfile_path)138dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);139while True:140m = dfreader.recv_match(type='MSG')141if m is None:142break143match = re.match(r".*Running replay on \(([^)]+)\).*", m.Message)144if match is None:145continue146replayed_logs.add(match.group(1))147return sorted(list(replayed_logs))148149def run(self):150self.topdir = self.find_topdir()151self.repo = self.find_repo()152self.assert_tree_clean()153154os.chdir(self.topdir)155self.progress("chdir (%s)" % str(self.topdir))156157self.progress("Running autotest Replay on %s" % self.master)158self.run_autotest_replay_on_master()159160self.progress("Building Replay")161self.build_replay()162self.progress("Build of Replay done")163164# check all replayable logs165self.progress("Finding replayed logs")166replay_logs = self.find_replayed_logs()167success = True168if len(replay_logs) == 0:169raise ValueError("Found no Replay logs")170for log in replay_logs:171self.progress("Running Replay on (%s)" % log)172old_logs = self.get_logs()173self.run_replay_on_log(log)174new_logs = self.get_logs()175delta = [x for x in new_logs if x not in old_logs]176if len(delta) != 1:177raise ValueError("Expected a single new log")178new_log = delta[0]179self.progress("Running check_replay.py on Replay output log: %s" % new_log)180181# run check_replay across Replay log182if check_replay.check_log(new_log, verbose=True):183self.progress("check_replay.py of (%s): OK" % new_log)184else:185self.progress("check_replay.py of (%s): FAILED" % new_log)186success = False187if success:188self.progress("All OK")189else:190self.progress("Failed")191192return success193194if __name__ == '__main__':195import sys196from argparse import ArgumentParser197parser = ArgumentParser(description=__doc__)198parser.add_argument("--master", default='master', help="branch to consider master branch")199parser.add_argument("--no-clean", action="store_true", help="do not clean SITL before building")200parser.add_argument("--no-debug", action="store_true", help="do not make built SITL binaries debug binaries")201202args = parser.parse_args()203204s = CheckReplayBranch(master=args.master, no_clean=args.no_clean, no_debug=args.no_debug)205if not s.run():206sys.exit(1)207208sys.exit(0)209210211