Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Path: blob/master/Tools/Replay/check_replay_branch.py
Views: 1798
#!/usr/bin/env python12'''3Create a replay log using master branch.4Check out a specified branch, compile and run Replay against replay log5Run check_replay.py over the produced log6'''78import git9import glob10import os11import subprocess12import sys13import re1415from pymavlink import DFReader1617import check_replay1819class CheckReplayBranch(object):20def __init__(self, master='remotes/origin/master'):21self.master = master2223def find_topdir(self):24here = os.getcwd()25bits = here.split(os.path.sep)26while len(bits):27# print("bits: %s" % str(bits))28tmp = bits[:]29tmp.extend([".git"]) # can we look for something more specific?30flagfile = os.path.sep.join(tmp)31if os.path.exists(flagfile):32# print("Found in (%s)" % str(flagfile))33return os.path.sep.join(bits)34bits = bits[:-2]35raise FileNotFoundError()3637def find_repo(self):38return git.Repo(self.topdir)3940def assert_tree_clean(self):41if self.repo.is_dirty():42raise ValueError("Tree is dirty")4344def is_replayable_log(self, logfile_path):45'''returns true if an XKF1 or NKF1 message appears in first 500046messages'''47dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);48seen_log_replay = False49seen_log_disarmed = False50while True:51m = dfreader.recv_match(type='PARM')52if m is None:53if not seen_log_replay:54return False55if not seen_log_disarmed:56return False57break58if m.Name == "LOG_REPLAY":59if seen_log_replay:60return False61if m.Value != 1:62return False63seen_log_replay = True64if m.Name == "LOG_DISARMED":65if seen_log_disarmed:66return False67seen_log_disarmed = True68if m.Value != 1:69return False70return False7172def is_replay_log(self, logfile_path):73'''returns true if an XKF1 or NKF1 message appears in first 500074messages'''75dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);76count = 077while True:78m = dfreader.recv_match()79if m is None:80break81if m.get_type() == 'XKF1' and m.C >= 100:82return True83if m.get_type() == 'NKF1' and m.C >= 100:84return True85count += 186if count > 5000:87return False88return False8990def progress(self, message):91print("CRB: %s" % message)9293def build_replay(self):94subprocess.check_call(["./waf", "replay"])9596def run_replay_on_log(self, logfile_path):97subprocess.check_call(["./build/sitl/tool/Replay", logfile_path])9899def get_logs(self):100return sorted(glob.glob("logs/*.BIN"))101102def run_autotest_replay_on_master(self):103# remember where we were:104old_branch = self.repo.active_branch105106# check out the master branch:107self.repo.head.reference = self.master108self.repo.head.reset(index=True, working_tree=True)109110# generate logs:111subprocess.check_call(["Tools/autotest/autotest.py", "--debug", "build.Copter", "test.Copter.Replay"])112113# check out the original branch:114self.repo.head.reference = old_branch115self.repo.head.reset(index=True, working_tree=True)116117def find_replayed_logs(self):118'''find logs which were replayed in the autotest'''119replayed_logs = set()120for logfile_path in self.get_logs():121self.progress(" Checking %s" % logfile_path)122dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);123while True:124m = dfreader.recv_match(type='MSG')125if m is None:126break127match = re.match(".*Running replay on \(([^)]+)\).*", m.Message)128if match is None:129continue130replayed_logs.add(match.group(1))131return sorted(list(replayed_logs))132133def run(self):134self.topdir = self.find_topdir()135self.repo = self.find_repo()136self.assert_tree_clean()137138os.chdir(self.topdir)139self.progress("chdir (%s)" % str(self.topdir))140141self.progress("Running autotest Replay on %s" % self.master)142self.run_autotest_replay_on_master()143144self.progress("Building Replay")145self.build_replay()146self.progress("Build of Replay done")147148# check all replayable logs149self.progress("Finding replayed logs")150replay_logs = self.find_replayed_logs()151success = True152if len(replay_logs) == 0:153raise ValueError("Found no Replay logs")154for log in replay_logs:155self.progress("Running Replay on (%s)" % log)156old_logs = self.get_logs()157self.run_replay_on_log(log)158new_logs = self.get_logs()159delta = [x for x in new_logs if x not in old_logs]160if len(delta) != 1:161raise ValueError("Expected a single new log")162new_log = delta[0]163self.progress("Running check_replay.py on Replay output log: %s" % new_log)164165# run check_replay across Replay log166if check_replay.check_log(new_log, verbose=True):167self.progress("check_replay.py of (%s): OK" % new_log)168else:169self.progress("check_replay.py of (%s): FAILED" % new_log)170success = False171if success:172self.progress("All OK")173else:174self.progress("Failed")175176return success177178if __name__ == '__main__':179import sys180from argparse import ArgumentParser181parser = ArgumentParser(description=__doc__)182parser.add_argument("--master", default='remotes/origin/master', help="branch to consider master branch")183184args = parser.parse_args()185186s = CheckReplayBranch(master=args.master)187if not s.run():188sys.exit(1)189190sys.exit(0)191192193