CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
Ardupilot

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.

GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/autotest/test_param_upgrade.py
Views: 1798
1
#!/usr/bin/env python3
2
3
'''
4
Test parameter upgrade, master vs branch
5
6
./Tools/autotest/test_param_upgrade.py --vehicle=arduplane --param "GPS_TYPE=17->GPS1_TYPE=17" --param "GPS_TYPE2=37->GPS2_TYPE=37" --param "GPS_GNSS_MODE=21->GPS1_GNSS_MODE=21" --param "GPS_GNSS_MODE2=45->GPS2_GNSS_MODE=45" --param "GPS_RATE_MS=186->GPS1_RATE_MS=186" --param "GPS_RATE_MS2=123->GPS2_RATE_MS=123" --param "GPS_POS1_X=3.75->GPS1_POS_X=3.75" --param "GPS_POS2_X=6.9->GPS2_POS_X=6.9" --param "GPS_POS1_Y=2.75->GPS1_POS_Y=2.75" --param "GPS_POS2_Y=5.9->GPS2_POS_Y=5.9" --param "GPS_POS1_Z=12.1->GPS1_POS_Z=12.1" --param "GPS_POS2_Z=-6.9->GPS2_POS_Z=-6.9" --param "GPS_DELAY_MS=987->GPS1_DELAY_MS=987" --param "GPS_DELAY_MS2=2345->GPS2_DELAY_MS=2345" --param "GPS_COM_PORT=19->GPS1_COM_PORT=19" --param "GPS_COM_PORT2=100->GPS2_COM_PORT=100" --param "GPS_CAN_NODEID1=109->GPS1_CAN_NODEID=109" --param "GPS_CAN_NODEID2=102->GPS2_CAN_NODEID=102" --param "GPS1_CAN_OVRIDE=34->GPS1_CAN_OVRIDE=34" --param "GPS2_CAN_OVRIDE=67" --param "GPS_MB1_TYPE=1->GPS1_MB_TYPE=1" --param "GPS_MB1_OFS_X=3.14->GPS1_MB_OFS_X=3.14" --param "GPS_MB1_OFS_Y=2.18->GPS1_MB_OFS_Y=2.18" --param "GPS_MB1_OFS_Z=17.6->GPS1_MB_OFS_Z=17.6" --param "GPS_MB2_TYPE=13->GPS2_MB_TYPE=13" --param "GPS_MB2_OFS_X=17.14->GPS2_MB_OFS_X=17.14" --param "GPS_MB2_OFS_Y=12.18->GPS2_MB_OFS_Y=12.18" --param "GPS_MB2_OFS_Z=27.6->GPS2_MB_OFS_Z=27.6" # noqa
7
8
AP_FLAKE8_CLEAN
9
'''
10
11
import vehicle_test_suite
12
import os
13
import sys
14
import argparse
15
import subprocess
16
import time
17
import shutil
18
import string
19
import pathlib
20
21
from pysim import util
22
23
24
class ParamChange():
25
def __init__(self, old_name, old_value, new_name, new_value):
26
self.old_name = old_name
27
self.old_value = old_value
28
self.new_name = new_name
29
self.new_value = new_value
30
31
def __str__(self):
32
return f"{self.old_name}={self.old_value}->{self.new_name}={self.new_value}"
33
34
35
class TestParamUpgradeTestSuiteSetParameters(vehicle_test_suite.TestSuite):
36
def __init__(self, binary, param_changes):
37
super(TestParamUpgradeTestSuiteSetParameters, self).__init__(binary)
38
self.param_changes = param_changes
39
40
def sysid_thismav(self):
41
if "antennatracker" in self.binary:
42
return 2
43
return super(TestParamUpgradeTestSuiteSetParameters, self).sysid_thismav()
44
45
def vehicleinfo_key(self):
46
'''magically guess vehicleinfo_key from filepath'''
47
path = self.binary.lower()
48
if "plane" in path:
49
return "ArduPlane"
50
if "copter" in path:
51
return "ArduCopter"
52
raise ValueError("Can't determine vehicleinfo_key from binary path")
53
54
def model(self):
55
path = self.binary.lower()
56
if "plane" in path:
57
return "quadplane"
58
if "copter" in path:
59
return "X"
60
raise ValueError("Can't determine vehicleinfo_key from binary path")
61
62
def run(self):
63
self.start_SITL(
64
binary=self.binary,
65
model=self.model(),
66
wipe=True,
67
sitl_home="1,1,1,1",
68
)
69
self.get_mavlink_connection_going()
70
71
sets = {}
72
for param_change in param_changes:
73
sets[param_change.old_name] = param_change.old_value
74
self.set_parameters(sets)
75
76
# stopping SITL too soon can leave eeprom.bin corrupt!
77
self.delay_sim_time(2) # FIXTHAT, should not be needed!
78
79
self.stop_SITL()
80
81
82
class TestParamUpgradeTestSuiteCheckParameters(vehicle_test_suite.TestSuite):
83
def __init__(self, binary, param_changes, epsilon=0.0001):
84
super(TestParamUpgradeTestSuiteCheckParameters, self).__init__(binary)
85
self.param_changes = param_changes
86
self.epsilon = epsilon
87
88
def sysid_thismav(self):
89
if "antennatracker" in self.binary:
90
return 2
91
return super(TestParamUpgradeTestSuiteCheckParameters, self).sysid_thismav()
92
93
def vehicleinfo_key(self):
94
'''magically guess vehicleinfo_key from filepath'''
95
path = self.binary.lower()
96
if "plane" in path:
97
return "ArduPlane"
98
if "copter" in path:
99
return "ArduCopter"
100
raise ValueError("Can't determine vehicleinfo_key from binary path")
101
102
def model(self):
103
path = self.binary.lower()
104
if "plane" in path:
105
return "quadplane"
106
if "copter" in path:
107
return "X"
108
raise ValueError("Can't determine vehicleinfo_key from binary path")
109
110
def run(self):
111
self.start_SITL(
112
binary=self.binary,
113
model=self.model(),
114
sitl_home="1,1,1,1",
115
wipe=False,
116
)
117
self.get_mavlink_connection_going()
118
119
params_to_check = [x.new_name for x in self.param_changes]
120
fetched_params = self.get_parameters(params_to_check)
121
122
for p in self.param_changes:
123
if abs(fetched_params[p.new_name] - p.new_value) > self.epsilon:
124
raise ValueError(f"{p.old_name}={p.old_value} did not convert into {p.new_name}={p.new_value}, got {p.new_name}={fetched_params[p.new_name]}") # noqa
125
126
print(f"OK: {p.old_name}={p.old_value} converted to {p.new_name}={p.new_value}")
127
128
self.stop_SITL()
129
130
131
class TestParamUpgradeForVehicle():
132
def __init__(self,
133
vehicle,
134
param_changes,
135
branch=None,
136
master_branch="master",
137
run_eedump_before=False,
138
run_eedump_after=False,
139
):
140
self.vehicle = vehicle
141
self.master_branch = master_branch
142
self.branch = branch
143
self.param_changes = param_changes
144
self.run_eedump_before = run_eedump_before
145
self.run_eedump_after = run_eedump_after
146
147
def run_program(self, prefix, cmd_list, show_output=True, env=None, show_output_on_error=True, show_command=None, cwd="."):
148
if show_command is None:
149
show_command = True
150
if show_command:
151
cmd = " ".join(cmd_list)
152
if cwd is None:
153
cwd = "."
154
self.progress(f"Running ({cmd}) in ({cwd})")
155
p = subprocess.Popen(
156
cmd_list,
157
stdin=None,
158
stdout=subprocess.PIPE,
159
close_fds=True,
160
stderr=subprocess.STDOUT,
161
cwd=cwd,
162
env=env)
163
output = ""
164
while True:
165
x = p.stdout.readline()
166
if len(x) == 0:
167
returncode = os.waitpid(p.pid, 0)
168
if returncode:
169
break
170
# select not available on Windows... probably...
171
time.sleep(0.1)
172
continue
173
x = bytearray(x)
174
x = filter(lambda x : chr(x) in string.printable, x)
175
x = "".join([chr(c) for c in x])
176
output += x
177
x = x.rstrip()
178
some_output = "%s: %s" % (prefix, x)
179
if show_output:
180
print(some_output)
181
else:
182
output += some_output
183
(_, status) = returncode
184
if status != 0:
185
if not show_output and show_output_on_error:
186
# we were told not to show output, but we just
187
# failed... so show output...
188
print(output)
189
self.progress("Process failed (%s)" %
190
str(returncode))
191
try:
192
path = pathlib.Path(self.tmpdir, f"process-failure-{int(time.time())}")
193
path.write_text(output)
194
self.progress("Wrote process failure file (%s)" % path)
195
except Exception:
196
self.progress("Writing process failure file failed")
197
raise subprocess.CalledProcessError(
198
returncode, cmd_list)
199
return output
200
201
def find_current_git_branch_or_sha1(self):
202
try:
203
output = self.run_git(["symbolic-ref", "--short", "HEAD"])
204
output = output.strip()
205
return output
206
except subprocess.CalledProcessError:
207
pass
208
209
# probably in a detached-head state. Get a sha1 instead:
210
output = self.run_git(["rev-parse", "--short", "HEAD"])
211
output = output.strip()
212
return output
213
214
def find_git_branch_merge_base(self, branch, master_branch):
215
output = self.run_git(["merge-base", branch, master_branch])
216
output = output.strip()
217
return output
218
219
def run_git(self, args, show_output=True, source_dir=None):
220
'''run git with args git_args; returns git's output'''
221
cmd_list = ["git"]
222
cmd_list.extend(args)
223
return self.run_program("TPU-GIT", cmd_list, show_output=show_output, cwd=source_dir)
224
225
def progress(self, string):
226
'''pretty-print progress'''
227
print("TPU: %s" % string)
228
229
def binary_path(self, vehicle):
230
build_subdir = "sitl"
231
if 'AP_Periph' in vehicle:
232
build_subdir = "sitl_periph_universal"
233
return f"build/{build_subdir}/{self.build_target_name(vehicle)}"
234
235
def build_target_name(self, vehicle):
236
binary_name = vehicle
237
if binary_name == 'heli':
238
binary_name = 'arducopter-heli'
239
if binary_name == 'rover':
240
binary_name = 'ardurover'
241
return f"bin/{binary_name}"
242
243
def run_eedump(self):
244
self.run_program("TPU-EED", [
245
"libraries/AP_Param/tools/eedump_apparam",
246
"eeprom.bin"
247
])
248
249
def run(self):
250
branch = self.branch
251
if branch is None:
252
branch = self.find_current_git_branch_or_sha1()
253
254
master_commit = self.master_branch
255
256
self.use_merge_base = True
257
if self.use_merge_base:
258
master_commit = self.find_git_branch_merge_base(branch, self.master_branch)
259
self.progress("Using merge base (%s)" % master_commit)
260
261
self.run_git(["checkout", master_commit], show_output=False)
262
self.run_git(["submodule", "update", "--recursive"], show_output=False)
263
shutil.rmtree("build", ignore_errors=True)
264
board = "sitl"
265
if "AP_Periph" in self.vehicle:
266
board = "sitl_periph_universal"
267
util.build_SITL(
268
self.build_target_name(self.vehicle),
269
board=board,
270
clean=False,
271
configure=True,
272
)
273
suite = TestParamUpgradeTestSuiteSetParameters(self.binary_path(self.vehicle), self.param_changes)
274
suite.run()
275
276
self.run_git(["checkout", branch], show_output=False)
277
self.run_git(["submodule", "update", "--recursive"], show_output=False)
278
shutil.rmtree("build", ignore_errors=True)
279
util.build_SITL(
280
self.build_target_name(self.vehicle),
281
board=board,
282
clean=False,
283
configure=True,
284
)
285
suite = TestParamUpgradeTestSuiteCheckParameters(self.binary_path(self.vehicle), self.param_changes)
286
287
if self.run_eedump_before:
288
self.run_eedump()
289
290
# this call starts SITL which will do the upgrade:
291
suite.run()
292
293
if self.run_eedump_after:
294
self.run_eedump()
295
296
297
class TestParamUpgrade():
298
def __init__(self,
299
param_changes,
300
vehicles=None,
301
run_eedump_before=False,
302
run_eedump_after=False,
303
master_branch="master",
304
):
305
self.vehicles = vehicles
306
self.param_changes = param_changes
307
self.vehicles = vehicles
308
self.run_eedump_before = run_eedump_before
309
self.run_eedump_after = run_eedump_after
310
self.master_branch = master_branch
311
312
if self.vehicles is None:
313
self.vehicles = self.all_vehicles()
314
315
def all_vehicles(self):
316
return [
317
# 'AP_Periph',
318
'arducopter',
319
'arduplane',
320
'antennatracker',
321
'heli',
322
'rover',
323
'blimp',
324
'ardusub',
325
]
326
327
def run(self):
328
for vehicle in self.vehicles:
329
s = TestParamUpgradeForVehicle(
330
vehicle,
331
self.param_changes,
332
run_eedump_before=self.run_eedump_before,
333
run_eedump_after=self.run_eedump_after,
334
master_branch=self.master_branch,
335
)
336
s.run()
337
338
339
if __name__ == "__main__":
340
''' main program '''
341
os.environ['PYTHONUNBUFFERED'] = '1'
342
343
if sys.platform != "darwin":
344
os.putenv('TMPDIR', util.reltopdir('tmp'))
345
346
parser = argparse.ArgumentParser("test_param_upgrade.py")
347
parser.add_argument(
348
"--param",
349
action='append',
350
default=[],
351
help="PARAM=VALUE pair to test upgrade for, or PARAM=VALUE->NEWPARAM=NEWVALUE",
352
)
353
parser.add_argument(
354
"--vehicle",
355
action='append',
356
default=[],
357
help="vehicle to test",
358
)
359
parser.add_argument(
360
"--run-eedump-before",
361
action='store_true',
362
default=False,
363
help="run the (already-compiled) eedump tool on eeprom.bin before doing conversion",
364
)
365
parser.add_argument(
366
"--run-eedump-after",
367
action='store_true',
368
default=False,
369
help="run the (already-compiled) eedump tool on eeprom.bin after doing conversion",
370
)
371
parser.add_argument(
372
"--master-branch",
373
type=str,
374
default="master",
375
help="master branch to use",
376
)
377
args = parser.parse_args()
378
379
param_changes = []
380
for x in args.param:
381
if "->" in x:
382
(old, new) = x.split("->")
383
(old_name, old_value) = old.split("=")
384
(new_name, new_value) = new.split("=")
385
param_changes.append(ParamChange(old_name, float(old_value), new_name, float(new_value)))
386
387
else:
388
(name, value) = x.split("=")
389
param_changes.append(ParamChange(name, float(value), name, float(value)))
390
391
vehicles = args.vehicle
392
393
if 'AP_Periph' in vehicles:
394
raise ValueError("AP_Periph not supported yet")
395
396
if len(vehicles) == 0:
397
vehicles = None
398
399
x = TestParamUpgrade(
400
param_changes,
401
vehicles=vehicles,
402
run_eedump_before=args.run_eedump_before,
403
run_eedump_after=args.run_eedump_after,
404
master_branch=args.master_branch,
405
)
406
x.run()
407
408