CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
Ardupilot

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.

GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/Replay/check_replay_branch.py
Views: 1798
1
#!/usr/bin/env python
2
3
'''
Create a replay log using master branch.
Check out a specified branch, compile and run Replay against replay log.
Run check_replay.py over the produced log.
'''
8
9
import git
10
import glob
11
import os
12
import subprocess
13
import sys
14
import re
15
16
from pymavlink import DFReader
17
18
import check_replay
19
20
class CheckReplayBranch(object):
    '''Check the current branch's Replay tool against logs made on master.

    The workflow (see run()) is: generate logs by running the Copter Replay
    autotest on the master branch, switch back to the original branch, build
    Replay there, re-run Replay over every log the autotest replayed, and
    pass each resulting log to check_replay.check_log().
    '''

    def __init__(self, master='remotes/origin/master'):
        '''master is the git ref that the reference logs are generated on.'''
        self.master = master

    def find_topdir(self):
        '''Return the nearest directory, starting at the current working
        directory and walking upwards, which contains a ".git" entry.

        Raises FileNotFoundError if no ancestor contains ".git".
        '''
        bits = os.getcwd().split(os.path.sep)
        while len(bits):
            tmp = bits[:]
            tmp.extend([".git"])  # can we look for something more specific?
            flagfile = os.path.sep.join(tmp)
            if os.path.exists(flagfile):
                # joining [''] yields '' for the filesystem root; hand back
                # a usable path in that case
                return os.path.sep.join(bits) or os.path.sep
            # step up exactly one directory; stepping two at a time skips
            # every other ancestor and can walk straight past the checkout
            bits = bits[:-1]
        raise FileNotFoundError("no .git found above current directory")

    def find_repo(self):
        '''Return a git.Repo object for self.topdir (set by run()).'''
        return git.Repo(self.topdir)

    def assert_tree_clean(self):
        '''Raise ValueError if the repository has uncommitted changes.

        run() resets the index and working tree when switching branches, so
        local modifications would otherwise be destroyed.
        '''
        if self.repo.is_dirty():
            raise ValueError("Tree is dirty")

    def is_replayable_log(self, logfile_path):
        '''returns true if the log contains exactly one LOG_REPLAY PARM
        message and exactly one LOG_DISARMED PARM message, both with a
        value of 1'''
        dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True)
        seen_log_replay = False
        seen_log_disarmed = False
        while True:
            m = dfreader.recv_match(type='PARM')
            if m is None:
                # end of log: replayable only if both parameters were seen
                # (any bad value has already caused an early return)
                return seen_log_replay and seen_log_disarmed
            if m.Name == "LOG_REPLAY":
                if seen_log_replay:
                    # parameter logged more than once; treat as unusable
                    return False
                if m.Value != 1:
                    return False
                seen_log_replay = True
            if m.Name == "LOG_DISARMED":
                if seen_log_disarmed:
                    return False
                seen_log_disarmed = True
                if m.Value != 1:
                    return False

    def is_replay_log(self, logfile_path):
        '''returns true if an XKF1 or NKF1 message with a C field of at
        least 100 appears within (roughly) the first 5000 messages'''
        # NOTE(review): C is presumably the EKF core index, with Replay
        # output using cores numbered from 100 - confirm against Replay.
        dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True)
        count = 0
        while True:
            m = dfreader.recv_match()
            if m is None:
                break
            if m.get_type() in ('XKF1', 'NKF1') and m.C >= 100:
                return True
            count += 1
            if count > 5000:
                # give up early rather than scanning the whole log
                return False
        return False

    def progress(self, message):
        '''Print a status message with a "CRB: " prefix.'''
        print("CRB: %s" % message)

    def build_replay(self):
        '''Build the Replay tool with waf; raises CalledProcessError on
        failure.  Expects the current directory to be the tree root.'''
        subprocess.check_call(["./waf", "replay"])

    def run_replay_on_log(self, logfile_path):
        '''Run the built Replay binary over logfile_path; raises
        CalledProcessError if Replay exits non-zero.'''
        subprocess.check_call(["./build/sitl/tool/Replay", logfile_path])

    def get_logs(self):
        '''Return a sorted list of the .BIN files in ./logs'''
        return sorted(glob.glob("logs/*.BIN"))

    def run_autotest_replay_on_master(self):
        '''Run the Copter Replay autotest with self.master checked out,
        then switch back to the branch that was active beforehand.

        The original branch is restored even when the checkout or the
        autotest fails, so an error does not leave the tree on master.
        '''
        # remember where we were:
        old_branch = self.repo.active_branch

        try:
            # check out the master branch:
            self.repo.head.reference = self.master
            self.repo.head.reset(index=True, working_tree=True)

            # generate logs:
            subprocess.check_call([
                "Tools/autotest/autotest.py",
                "--debug",
                "build.Copter",
                "test.Copter.Replay",
            ])
        finally:
            # check out the original branch:
            self.repo.head.reference = old_branch
            self.repo.head.reset(index=True, working_tree=True)

    def find_replayed_logs(self):
        '''find logs which were replayed in the autotest

        Scans the MSG messages of every log in ./logs for the text
        "Running replay on (<path>)" and returns the sorted, de-duplicated
        list of the <path> values found.
        '''
        # raw string: "\(" in a normal string literal is an invalid escape
        replay_re = re.compile(r".*Running replay on \(([^)]+)\).*")
        replayed_logs = set()
        for logfile_path in self.get_logs():
            self.progress("  Checking %s" % logfile_path)
            dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True)
            while True:
                m = dfreader.recv_match(type='MSG')
                if m is None:
                    break
                match = replay_re.match(m.Message)
                if match is None:
                    continue
                replayed_logs.add(match.group(1))
        return sorted(replayed_logs)

    def run(self):
        '''Run the whole check; returns True if every replayed log passes
        check_replay.check_log(), False otherwise.

        Raises ValueError if the tree is dirty, if no replayed logs are
        found, or if a Replay run does not produce exactly one new log.
        '''
        self.topdir = self.find_topdir()
        self.repo = self.find_repo()
        self.assert_tree_clean()

        # all later paths (./waf, logs/, Tools/...) are relative to the root
        os.chdir(self.topdir)
        self.progress("chdir (%s)" % str(self.topdir))

        self.progress("Running autotest Replay on %s" % self.master)
        self.run_autotest_replay_on_master()

        self.progress("Building Replay")
        self.build_replay()
        self.progress("Build of Replay done")

        # check all replayable logs
        self.progress("Finding replayed logs")
        replay_logs = self.find_replayed_logs()
        success = True
        if not replay_logs:
            raise ValueError("Found no Replay logs")
        for log in replay_logs:
            self.progress("Running Replay on (%s)" % log)
            # Replay's output is identified as whichever log file is new
            # after the run
            old_logs = set(self.get_logs())
            self.run_replay_on_log(log)
            delta = [x for x in self.get_logs() if x not in old_logs]
            if len(delta) != 1:
                raise ValueError("Expected a single new log")
            new_log = delta[0]
            self.progress("Running check_replay.py on Replay output log: %s" % new_log)

            # run check_replay across Replay log
            if check_replay.check_log(new_log, verbose=True):
                self.progress("check_replay.py of (%s): OK" % new_log)
            else:
                self.progress("check_replay.py of (%s): FAILED" % new_log)
                # keep going so every log gets checked and reported
                success = False
        if success:
            self.progress("All OK")
        else:
            self.progress("Failed")

        return success
178
179
if __name__ == '__main__':
    import sys
    from argparse import ArgumentParser

    # command-line entry point: the only option is which ref counts as master
    parser = ArgumentParser(description=__doc__)
    parser.add_argument(
        "--master",
        default='remotes/origin/master',
        help="branch to consider master branch",
    )
    args = parser.parse_args()

    checker = CheckReplayBranch(master=args.master)

    # exit status 0 when every replayed log checked out OK, 1 otherwise
    sys.exit(0 if checker.run() else 1)
192
193