Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Ardupilot
GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/autotest/test_param_upgrade.py
9660 views
1
#!/usr/bin/env python3
2
3
'''
4
Test parameter upgrade, master vs branch
5
6
./Tools/autotest/test_param_upgrade.py --vehicle=arduplane --param "GPS_TYPE=17->GPS1_TYPE=17" --param "GPS_TYPE2=37->GPS2_TYPE=37" --param "GPS_GNSS_MODE=21->GPS1_GNSS_MODE=21" --param "GPS_GNSS_MODE2=45->GPS2_GNSS_MODE=45" --param "GPS_RATE_MS=186->GPS1_RATE_MS=186" --param "GPS_RATE_MS2=123->GPS2_RATE_MS=123" --param "GPS_POS1_X=3.75->GPS1_POS_X=3.75" --param "GPS_POS2_X=6.9->GPS2_POS_X=6.9" --param "GPS_POS1_Y=2.75->GPS1_POS_Y=2.75" --param "GPS_POS2_Y=5.9->GPS2_POS_Y=5.9" --param "GPS_POS1_Z=12.1->GPS1_POS_Z=12.1" --param "GPS_POS2_Z=-6.9->GPS2_POS_Z=-6.9" --param "GPS_DELAY_MS=987->GPS1_DELAY_MS=987" --param "GPS_DELAY_MS2=2345->GPS2_DELAY_MS=2345" --param "GPS_COM_PORT=19->GPS1_COM_PORT=19" --param "GPS_COM_PORT2=100->GPS2_COM_PORT=100" --param "GPS_CAN_NODEID1=109->GPS1_CAN_NODEID=109" --param "GPS_CAN_NODEID2=102->GPS2_CAN_NODEID=102" --param "GPS1_CAN_OVRIDE=34->GPS1_CAN_OVRIDE=34" --param "GPS2_CAN_OVRIDE=67" --param "GPS_MB1_TYPE=1->GPS1_MB_TYPE=1" --param "GPS_MB1_OFS_X=3.14->GPS1_MB_OFS_X=3.14" --param "GPS_MB1_OFS_Y=2.18->GPS1_MB_OFS_Y=2.18" --param "GPS_MB1_OFS_Z=17.6->GPS1_MB_OFS_Z=17.6" --param "GPS_MB2_TYPE=13->GPS2_MB_TYPE=13" --param "GPS_MB2_OFS_X=17.14->GPS2_MB_OFS_X=17.14" --param "GPS_MB2_OFS_Y=12.18->GPS2_MB_OFS_Y=12.18" --param "GPS_MB2_OFS_Z=27.6->GPS2_MB_OFS_Z=27.6" # noqa
7
8
./Tools/autotest/test_param_upgrade.py --vehicle=arduplane --param "SERIAL1_OPTIONS=1024->MAV2_OPTIONS=2" --param "SERIAL2_OPTIONS=4096->MAV3_OPTIONS=4"
9
./Tools/autotest/test_param_upgrade.py --vehicle=arduplane --param "SERIAL1_OPTIONS=5120->MAV2_OPTIONS=6"
10
11
AP_FLAKE8_CLEAN
12
'''
13
14
import vehicle_test_suite
15
import os
16
import sys
17
import argparse
18
import subprocess
19
import time
20
import shutil
21
import string
22
import pathlib
23
24
from pysim import util
25
26
27
class ParamChange():
28
def __init__(self, old_name, old_value, new_name, new_value):
29
self.old_name = old_name
30
self.old_value = old_value
31
self.new_name = new_name
32
self.new_value = new_value
33
34
def __str__(self):
35
return f"{self.old_name}={self.old_value}->{self.new_name}={self.new_value}"
36
37
38
class TestParamUpgradeTestSuite(vehicle_test_suite.TestSuite):
39
def __init__(self, binary):
40
super(TestParamUpgradeTestSuite, self).__init__(binary)
41
42
def sysid_thismav(self):
43
if "antennatracker" in self.binary:
44
return 2
45
return super(TestParamUpgradeTestSuite, self).sysid_thismav()
46
47
def vehicleinfo_key(self):
48
'''magically guess vehicleinfo_key from filepath'''
49
path = self.binary.lower()
50
if "blimp" in path:
51
return "Blimp"
52
if "copter" in path:
53
return "ArduCopter"
54
if "plane" in path:
55
return "ArduPlane"
56
if "rover" in path:
57
return "Rover"
58
if "sub" in path:
59
return "ArduSub"
60
if "tracker" in path:
61
return "AntennaTracker"
62
raise ValueError(f"Can't determine vehicleinfo_key from binary path {path}")
63
64
def model(self):
65
path = self.binary.lower()
66
if "blimp" in path:
67
return "blimp"
68
if "copter" in path:
69
return "X"
70
if "plane" in path:
71
return "quadplane"
72
if "rover" in path:
73
return "rover"
74
if "sub" in path:
75
return "vectored"
76
if "tracker" in path:
77
return "tracker"
78
raise ValueError(f"Can't determine model from binary path {path}")
79
80
81
class TestParamUpgradeTestSuiteSetParameters(TestParamUpgradeTestSuite):
82
def __init__(self, binary, param_changes):
83
super(TestParamUpgradeTestSuiteSetParameters, self).__init__(binary)
84
self.param_changes = param_changes
85
86
def run(self):
87
self.start_SITL(
88
binary=self.binary,
89
model=self.model(),
90
wipe=True,
91
sitl_home="1,1,1,1",
92
)
93
self.get_mavlink_connection_going()
94
95
sets = {}
96
for param_change in param_changes:
97
sets[param_change.old_name] = param_change.old_value
98
self.set_parameters(sets)
99
100
# stopping SITL too soon can leave eeprom.bin corrupt!
101
self.delay_sim_time(2) # FIXTHAT, should not be needed!
102
103
self.stop_SITL()
104
105
106
class TestParamUpgradeTestSuiteCheckParameters(TestParamUpgradeTestSuite):
107
def __init__(self, binary, param_changes, epsilon=0.0001):
108
super(TestParamUpgradeTestSuiteCheckParameters, self).__init__(binary)
109
self.param_changes = param_changes
110
self.epsilon = epsilon
111
112
def run(self):
113
self.start_SITL(
114
binary=self.binary,
115
model=self.model(),
116
sitl_home="1,1,1,1",
117
wipe=False,
118
)
119
self.get_mavlink_connection_going()
120
121
params_to_check = [x.new_name for x in self.param_changes]
122
fetched_params = self.get_parameters(params_to_check)
123
124
for p in self.param_changes:
125
if abs(fetched_params[p.new_name] - p.new_value) > self.epsilon:
126
raise ValueError(f"{p.old_name}={p.old_value} did not convert into {p.new_name}={p.new_value}, got {p.new_name}={fetched_params[p.new_name]}") # noqa
127
128
print(f"OK: {p.old_name}={p.old_value} converted to {p.new_name}={p.new_value}")
129
130
self.stop_SITL()
131
132
133
class TestParamUpgradeForVehicle():
134
def __init__(self,
135
vehicle,
136
param_changes,
137
branch=None,
138
master_branch="master",
139
run_eedump_before=False,
140
run_eedump_after=False,
141
):
142
self.vehicle = vehicle
143
self.master_branch = master_branch
144
self.branch = branch
145
self.param_changes = param_changes
146
self.run_eedump_before = run_eedump_before
147
self.run_eedump_after = run_eedump_after
148
149
def run_program(self, prefix, cmd_list, show_output=True, env=None, show_output_on_error=True, show_command=None, cwd="."):
150
if show_command is None:
151
show_command = True
152
if show_command:
153
cmd = " ".join(cmd_list)
154
if cwd is None:
155
cwd = "."
156
self.progress(f"Running ({cmd}) in ({cwd})")
157
p = subprocess.Popen(
158
cmd_list,
159
stdin=None,
160
stdout=subprocess.PIPE,
161
close_fds=True,
162
stderr=subprocess.STDOUT,
163
cwd=cwd,
164
env=env)
165
output = ""
166
while True:
167
x = p.stdout.readline()
168
if len(x) == 0:
169
returncode = os.waitpid(p.pid, 0)
170
if returncode:
171
break
172
# select not available on Windows... probably...
173
time.sleep(0.1)
174
continue
175
x = bytearray(x)
176
x = filter(lambda x : chr(x) in string.printable, x)
177
x = "".join([chr(c) for c in x])
178
output += x
179
x = x.rstrip()
180
some_output = "%s: %s" % (prefix, x)
181
if show_output:
182
print(some_output)
183
else:
184
output += some_output
185
(_, status) = returncode
186
if status != 0:
187
if not show_output and show_output_on_error:
188
# we were told not to show output, but we just
189
# failed... so show output...
190
print(output)
191
self.progress("Process failed (%s)" %
192
str(returncode))
193
try:
194
path = pathlib.Path(self.tmpdir, f"process-failure-{int(time.time())}")
195
path.write_text(output)
196
self.progress("Wrote process failure file (%s)" % path)
197
except Exception:
198
self.progress("Writing process failure file failed")
199
raise subprocess.CalledProcessError(
200
returncode, cmd_list)
201
return output
202
203
def find_current_git_branch_or_sha1(self):
204
try:
205
output = self.run_git(["symbolic-ref", "--short", "HEAD"])
206
output = output.strip()
207
return output
208
except subprocess.CalledProcessError:
209
pass
210
211
# probably in a detached-head state. Get a sha1 instead:
212
output = self.run_git(["rev-parse", "--short", "HEAD"])
213
output = output.strip()
214
return output
215
216
def find_git_branch_merge_base(self, branch, master_branch):
217
output = self.run_git(["merge-base", branch, master_branch])
218
output = output.strip()
219
return output
220
221
def run_git(self, args, show_output=True, source_dir=None):
222
'''run git with args git_args; returns git's output'''
223
cmd_list = ["git"]
224
cmd_list.extend(args)
225
return self.run_program("TPU-GIT", cmd_list, show_output=show_output, cwd=source_dir)
226
227
def progress(self, string):
228
'''pretty-print progress'''
229
print("TPU: %s" % string)
230
231
def binary_path(self, vehicle):
232
build_subdir = "sitl"
233
if 'AP_Periph' in vehicle:
234
build_subdir = "sitl_periph_universal"
235
return f"build/{build_subdir}/{self.build_target_name(vehicle)}"
236
237
def build_target_name(self, vehicle):
238
binary_name = vehicle
239
if binary_name == 'heli':
240
binary_name = 'arducopter-heli'
241
if binary_name == 'rover':
242
binary_name = 'ardurover'
243
return f"bin/{binary_name}"
244
245
def run_eedump(self):
246
self.run_program("TPU-EED", [
247
"libraries/AP_Param/tools/eedump_apparam",
248
"eeprom.bin"
249
])
250
251
def run(self):
252
branch = self.branch
253
if branch is None:
254
branch = self.find_current_git_branch_or_sha1()
255
256
master_commit = self.master_branch
257
258
self.use_merge_base = True
259
if self.use_merge_base:
260
master_commit = self.find_git_branch_merge_base(branch, self.master_branch)
261
self.progress("Using merge base (%s)" % master_commit)
262
263
self.run_git(["checkout", master_commit], show_output=False)
264
self.run_git(["submodule", "update", "--recursive"], show_output=False)
265
shutil.rmtree("build", ignore_errors=True)
266
board = "sitl"
267
if "AP_Periph" in self.vehicle:
268
board = "sitl_periph_universal"
269
util.build_SITL(
270
self.build_target_name(self.vehicle),
271
board=board,
272
clean=False,
273
configure=True,
274
)
275
suite = TestParamUpgradeTestSuiteSetParameters(self.binary_path(self.vehicle), self.param_changes)
276
suite.run()
277
278
self.run_git(["checkout", branch], show_output=False)
279
self.run_git(["submodule", "update", "--recursive"], show_output=False)
280
shutil.rmtree("build", ignore_errors=True)
281
util.build_SITL(
282
self.build_target_name(self.vehicle),
283
board=board,
284
clean=False,
285
configure=True,
286
)
287
suite = TestParamUpgradeTestSuiteCheckParameters(self.binary_path(self.vehicle), self.param_changes)
288
289
if self.run_eedump_before:
290
self.run_eedump()
291
292
# this call starts SITL which will do the upgrade:
293
suite.run()
294
295
if self.run_eedump_after:
296
self.run_eedump()
297
298
299
class TestParamUpgrade():
300
def __init__(self,
301
param_changes,
302
vehicles=None,
303
run_eedump_before=False,
304
run_eedump_after=False,
305
master_branch="master",
306
):
307
self.vehicles = vehicles
308
self.param_changes = param_changes
309
self.vehicles = vehicles
310
self.run_eedump_before = run_eedump_before
311
self.run_eedump_after = run_eedump_after
312
self.master_branch = master_branch
313
314
if self.vehicles is None:
315
self.vehicles = self.all_vehicles()
316
317
def all_vehicles(self):
318
return [
319
# 'AP_Periph',
320
'arducopter',
321
'arduplane',
322
'antennatracker',
323
'heli',
324
'rover',
325
'blimp',
326
'ardusub',
327
]
328
329
def run(self):
330
for vehicle in self.vehicles:
331
s = TestParamUpgradeForVehicle(
332
vehicle,
333
self.param_changes,
334
run_eedump_before=self.run_eedump_before,
335
run_eedump_after=self.run_eedump_after,
336
master_branch=self.master_branch,
337
)
338
s.run()
339
340
341
if __name__ == "__main__":
342
''' main program '''
343
os.environ['PYTHONUNBUFFERED'] = '1'
344
345
if sys.platform != "darwin":
346
os.putenv('TMPDIR', util.reltopdir('tmp'))
347
348
parser = argparse.ArgumentParser("test_param_upgrade.py")
349
parser.add_argument(
350
"--param",
351
action='append',
352
default=[],
353
help="PARAM=VALUE pair to test upgrade for, or PARAM=VALUE->NEWPARAM=NEWVALUE",
354
)
355
parser.add_argument(
356
"--vehicle",
357
action='append',
358
default=[],
359
help="vehicle to test",
360
)
361
parser.add_argument(
362
"--run-eedump-before",
363
action='store_true',
364
default=False,
365
help="run the (already-compiled) eedump tool on eeprom.bin before doing conversion",
366
)
367
parser.add_argument(
368
"--run-eedump-after",
369
action='store_true',
370
default=False,
371
help="run the (already-compiled) eedump tool on eeprom.bin after doing conversion",
372
)
373
parser.add_argument(
374
"--master-branch",
375
type=str,
376
default="master",
377
help="master branch to use",
378
)
379
args = parser.parse_args()
380
381
param_changes = []
382
for x in args.param:
383
if "->" in x:
384
(old, new) = x.split("->")
385
(old_name, old_value) = old.split("=")
386
(new_name, new_value) = new.split("=")
387
param_changes.append(ParamChange(old_name, float(old_value), new_name, float(new_value)))
388
389
else:
390
(name, value) = x.split("=")
391
param_changes.append(ParamChange(name, float(value), name, float(value)))
392
393
vehicles = args.vehicle
394
395
if 'AP_Periph' in vehicles:
396
raise ValueError("AP_Periph not supported yet")
397
398
if len(vehicles) == 0:
399
vehicles = None
400
401
x = TestParamUpgrade(
402
param_changes,
403
vehicles=vehicles,
404
run_eedump_before=args.run_eedump_before,
405
run_eedump_after=args.run_eedump_after,
406
master_branch=args.master_branch,
407
)
408
x.run()
409
410