Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Ardupilot
GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/Replay/check_replay_branch.py
9743 views
1
#!/usr/bin/env python3
2
3
# flake8: noqa
4
5
'''
6
Create a replay log using master branch.
7
Check out a specified branch, compile and run Replay against replay log
8
Run check_replay.py over the produced log
9
'''
10
11
import git # https://pypi.org/project/GitPython/
12
import glob
13
import os
14
import subprocess
15
import sys
16
import re
17
18
from pymavlink import DFReader
19
20
import check_replay
21
22
class CheckReplayBranch(object):
23
def __init__(self, master='master', no_clean=False, no_debug=False):
24
self.master = master
25
self.no_clean = no_clean
26
self.no_debug = no_debug
27
28
def find_topdir(self):
29
here = os.getcwd()
30
bits = here.split(os.path.sep)
31
while len(bits):
32
# print("bits: %s" % str(bits))
33
tmp = bits[:]
34
tmp.extend([".git"]) # can we look for something more specific?
35
flagfile = os.path.sep.join(tmp)
36
if os.path.exists(flagfile):
37
# print("Found in (%s)" % str(flagfile))
38
return os.path.sep.join(bits)
39
bits = bits[:-2]
40
raise FileNotFoundError()
41
42
def find_repo(self):
43
return git.Repo(self.topdir)
44
45
def assert_tree_clean(self):
46
if self.repo.is_dirty():
47
raise ValueError("Tree is dirty")
48
49
def is_replayable_log(self, logfile_path):
50
'''returns true if an XKF1 or NKF1 message appears in first 5000
51
messages'''
52
dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
53
seen_log_replay = False
54
seen_log_disarmed = False
55
while True:
56
m = dfreader.recv_match(type='PARM')
57
if m is None:
58
if not seen_log_replay:
59
return False
60
if not seen_log_disarmed:
61
return False
62
break
63
if m.Name == "LOG_REPLAY":
64
if seen_log_replay:
65
return False
66
if m.Value != 1:
67
return False
68
seen_log_replay = True
69
if m.Name == "LOG_DISARMED":
70
if seen_log_disarmed:
71
return False
72
seen_log_disarmed = True
73
if m.Value != 1:
74
return False
75
return False
76
77
def is_replay_log(self, logfile_path):
78
'''returns true if an XKF1 or NKF1 message appears in first 5000
79
messages'''
80
dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
81
count = 0
82
while True:
83
m = dfreader.recv_match()
84
if m is None:
85
break
86
if m.get_type() == 'XKF1' and m.C >= 100:
87
return True
88
if m.get_type() == 'NKF1' and m.C >= 100:
89
return True
90
count += 1
91
if count > 5000:
92
return False
93
return False
94
95
def progress(self, message):
96
print("CRB: %s" % message)
97
98
def build_replay(self):
99
subprocess.check_call(["./waf", "replay"])
100
101
def run_replay_on_log(self, logfile_path):
102
subprocess.check_call(["./build/sitl/tool/Replay", logfile_path])
103
104
def get_logs(self):
105
return sorted(glob.glob("logs/*.BIN"))
106
107
def run_autotest_replay_on_master(self):
108
# remember where we were:
109
old_branch = self.repo.active_branch
110
111
# check out the master branch:
112
self.repo.head.reference = self.master
113
self.repo.head.reset(index=True, working_tree=True)
114
115
# generate logs:
116
args = ["Tools/autotest/autotest.py"]
117
118
if self.no_debug:
119
args.append("--no-debug")
120
else:
121
args.append("--debug")
122
123
if self.no_clean:
124
args.append("--no-clean")
125
126
args.extend(["build.Copter", "test.Copter.Replay"])
127
128
subprocess.check_call(args) # actually run the test
129
130
# check out the original branch:
131
self.repo.head.reference = old_branch
132
self.repo.head.reset(index=True, working_tree=True)
133
134
def find_replayed_logs(self):
135
'''find logs which were replayed in the autotest'''
136
replayed_logs = set()
137
for logfile_path in self.get_logs():
138
self.progress(" Checking %s" % logfile_path)
139
dfreader = DFReader.DFReader_binary(logfile_path, zero_time_base=True);
140
while True:
141
m = dfreader.recv_match(type='MSG')
142
if m is None:
143
break
144
match = re.match(r".*Running replay on \(([^)]+)\).*", m.Message)
145
if match is None:
146
continue
147
replayed_logs.add(match.group(1))
148
return sorted(list(replayed_logs))
149
150
def run(self):
151
self.topdir = self.find_topdir()
152
self.repo = self.find_repo()
153
self.assert_tree_clean()
154
155
os.chdir(self.topdir)
156
self.progress("chdir (%s)" % str(self.topdir))
157
158
self.progress("Running autotest Replay on %s" % self.master)
159
self.run_autotest_replay_on_master()
160
161
self.progress("Building Replay")
162
self.build_replay()
163
self.progress("Build of Replay done")
164
165
# check all replayable logs
166
self.progress("Finding replayed logs")
167
replay_logs = self.find_replayed_logs()
168
success = True
169
if len(replay_logs) == 0:
170
raise ValueError("Found no Replay logs")
171
for log in replay_logs:
172
self.progress("Running Replay on (%s)" % log)
173
old_logs = self.get_logs()
174
self.run_replay_on_log(log)
175
new_logs = self.get_logs()
176
delta = [x for x in new_logs if x not in old_logs]
177
if len(delta) != 1:
178
raise ValueError("Expected a single new log")
179
new_log = delta[0]
180
self.progress("Running check_replay.py on Replay output log: %s" % new_log)
181
182
# run check_replay across Replay log
183
if check_replay.check_log(new_log, verbose=True):
184
self.progress("check_replay.py of (%s): OK" % new_log)
185
else:
186
self.progress("check_replay.py of (%s): FAILED" % new_log)
187
success = False
188
if success:
189
self.progress("All OK")
190
else:
191
self.progress("Failed")
192
193
return success
194
195
if __name__ == '__main__':
196
import sys
197
from argparse import ArgumentParser
198
parser = ArgumentParser(description=__doc__)
199
parser.add_argument("--master", default='master', help="branch to consider master branch")
200
parser.add_argument("--no-clean", action="store_true", help="do not clean SITL before building")
201
parser.add_argument("--no-debug", action="store_true", help="do not make built SITL binaries debug binaries")
202
203
args = parser.parse_args()
204
205
s = CheckReplayBranch(master=args.master, no_clean=args.no_clean, no_debug=args.no_debug)
206
if not s.run():
207
sys.exit(1)
208
209
sys.exit(0)
210
211