# Source: GitHub repository Ardupilot/ardupilot
# Path: blob/master/Tools/autotest/pysim/util.py
1
'''
2
AP_FLAKE8_CLEAN
3
'''
4
5
from __future__ import annotations
6
7
import atexit
8
import math
9
import os
10
import re
11
import shlex
12
import signal
13
import subprocess
14
import sys
15
import tempfile
16
import time
17
18
19
import pexpect
20
21
# Radius of the Earth in meters, used by the gps_* helpers in this module.
RADIUS_OF_EARTH = 6378100.0

# IDs of the Terminal windows opened on macOS; start_SITL appends to this
# list and kill_mac_terminal closes every window recorded here.
windowID = []
25
26
27
def topdir():
    """Return the top of the git tree that autotest is running from.

    This file is expected to live in ``<top>/Tools/autotest/pysim``; the
    name of each directory level is asserted while walking upwards.
    """
    here = os.path.dirname(os.path.realpath(__file__))
    # climb three levels, checking the name of each directory as we leave it
    for expected_name in ('pysim', 'autotest', 'Tools'):
        assert os.path.basename(here) == expected_name
        here = os.path.dirname(here)
    return here
37
38
39
def relcurdir(path):
    """Return ``path`` expressed relative to the current working directory."""
    here = os.getcwd()
    return os.path.relpath(path, start=here)
42
43
44
def reltopdir(path):
    """Return the normalized absolute path of ``path``.

    ``path`` is interpreted relative to the directory returned by topdir().
    """
    joined = os.path.join(topdir(), path)
    return os.path.normpath(joined)
47
48
49
def run_cmd(cmd, directory=".", show=True, output=False, checkfail=True):
    """Run a command in ``directory``.

    A list is executed directly as an argument vector; anything else is
    wrapped in a one-element list and run through the shell.

    With ``show`` the command is printed first.  With ``output`` the
    captured stdout (bytes) is returned.  Otherwise the exit status is
    returned, and with ``checkfail`` a non-zero status raises
    subprocess.CalledProcessError instead.
    """
    use_shell = not isinstance(cmd, list)
    if use_shell:
        cmd = [cmd]
    if show:
        print("Running: (%s) in (%s)" % (cmd_as_shell(cmd), directory,))
    if output:
        proc = subprocess.Popen(cmd, shell=use_shell, stdout=subprocess.PIPE, cwd=directory)
        captured, _ = proc.communicate()
        return captured
    runner = subprocess.check_call if checkfail else subprocess.call
    return runner(cmd, shell=use_shell, cwd=directory)
63
64
65
def rmfile(path):
    """Remove a file, ignoring any OS-level failure.

    A missing file is not an error; note that every other OSError raised
    by the unlink (e.g. a permission problem) is silently ignored as well.
    """
    try:
        os.unlink(path)
    except OSError:
        # FileNotFoundError is a subclass of OSError, so this covers both
        pass
71
72
73
def deltree(path):
    """Delete a tree of files by running ``rm -rf`` on ``path``.

    The command is passed to run_cmd as an argument list rather than a
    shell string, so a path containing spaces or shell metacharacters is
    removed as given instead of being split or interpreted by the shell.
    As a consequence, shell glob patterns in ``path`` are not expanded.
    """
    run_cmd(['rm', '-rf', str(path)])
76
77
78
def relwaf():
    """Return the path used to invoke waf.

    The callers in this module run it with topdir() as the working
    directory.
    """
    waf_path = "./modules/waf/waf-light"
    return waf_path
80
81
82
def waf_configure(board,
                  j=None,
                  debug=False,
                  math_check_indexes=False,
                  coverage=False,
                  ekf_single=False,
                  postype_single=False,
                  force_32bit=False,
                  extra_args: list | None = None,
                  extra_hwdef=None,
                  ubsan=False,
                  ubsan_abort=False,
                  num_aux_imus=0,
                  dronecan_tests=False,
                  extra_defines: dict | None = None):
    """Run ``waf configure`` for ``board`` from the top of the tree.

    Each truthy boolean keyword adds the corresponding waf option.
    ``extra_defines`` entries become ``--define NAME=VALUE`` pairs, ``j``
    becomes ``-j <j>``, and every string in ``extra_args`` is split with
    shlex and appended last.  A failing configure raises (checkfail=True).
    """
    extra_args = [] if extra_args is None else extra_args
    extra_defines = {} if extra_defines is None else extra_defines

    argv = [relwaf(), "configure", "--board", board]

    # simple on/off switches, in the order they appear on the command line
    leading_switches = (
        (debug, '--debug'),
        (coverage, '--coverage'),
        (math_check_indexes, '--enable-math-check-indexes'),
        (ekf_single, '--ekf-single'),
        (postype_single, '--postype-single'),
        (force_32bit, '--force-32bit'),
        (ubsan, '--ubsan'),
        (ubsan_abort, '--ubsan-abort'),
    )
    argv += [option for enabled, option in leading_switches if enabled]

    if num_aux_imus > 0:
        argv.append('--num-aux-imus=%u' % num_aux_imus)
    if dronecan_tests:
        argv.append('--enable-dronecan-tests')
    if extra_hwdef is not None:
        argv += ['--extra-hwdef', extra_hwdef]
    for name, value in extra_defines.items():
        argv += ['--define', "%s=%s" % (name, value)]
    if j is not None:
        argv += ['-j', str(j)]
    # each extra argument string may itself hold several shell-style words
    for extra in [shlex.split(x) for x in extra_args]:
        argv += extra
    run_cmd(argv, directory=topdir(), checkfail=True)
134
135
136
def waf_clean():
    """Run ``waf clean`` from the top of the tree, raising on failure."""
    clean_cmd = [relwaf(), "clean"]
    run_cmd(clean_cmd, directory=topdir(), checkfail=True)
138
139
140
def waf_build(target=None):
    """Run ``waf build`` from the top of the tree, raising on failure.

    When ``target`` is given it is appended to the command as a single
    extra argument.
    """
    extra = [] if target is None else [target]
    run_cmd([relwaf(), "build"] + extra, directory=topdir(), checkfail=True)
145
146
147
def build_SITL(
        build_target,
        board='sitl',
        clean=True,
        configure=True,
        coverage=False,
        debug=False,
        ekf_single=False,
        extra_configure_args: list | None = None,
        extra_defines: dict | None = None,
        j=None,
        math_check_indexes=False,
        postype_single=False,
        force_32bit=False,
        ubsan=False,
        ubsan_abort=False,
        num_aux_imus=0,
        dronecan_tests=False,
):
    """Configure, clean and build ``build_target`` with waf.

    The configure and clean steps are skipped when ``configure`` / ``clean``
    are false.  The build is run as ``waf build --target <build_target>``,
    with ``-j <j>`` added when ``j`` is given.  Any failing step raises;
    True is returned on success.
    """
    configure_args = [] if extra_configure_args is None else extra_configure_args
    defines = {} if extra_defines is None else extra_defines

    # step 1: configure
    if configure:
        waf_configure(
            board,
            j=j,
            debug=debug,
            math_check_indexes=math_check_indexes,
            ekf_single=ekf_single,
            postype_single=postype_single,
            coverage=coverage,
            force_32bit=force_32bit,
            ubsan=ubsan,
            ubsan_abort=ubsan_abort,
            extra_defines=defines,
            num_aux_imus=num_aux_imus,
            dronecan_tests=dronecan_tests,
            extra_args=configure_args,
        )

    # step 2: clean
    if clean:
        waf_clean()

    # step 3: build
    build_cmd = [relwaf(), "build", "--target", build_target]
    if j is not None:
        build_cmd += ['-j', str(j)]
    run_cmd(build_cmd, directory=topdir(), checkfail=True, show=True)
    return True
198
199
200
def build_examples(board, j=None, debug=False, clean=False, configure=True, math_check_indexes=False, coverage=False,
                   ekf_single=False, postype_single=False, force_32bit=False, ubsan=False, ubsan_abort=False,
                   num_aux_imus=0, dronecan_tests=False,
                   extra_configure_args: list | None = None):
    """Configure, optionally clean, and build the examples with waf.

    The configure and clean steps are skipped when ``configure`` / ``clean``
    are false.  The build itself is ``waf examples`` run from the top of
    the tree.  Any failing step raises; True is returned on success.

    ``num_aux_imus`` is forwarded to waf_configure, matching build_SITL and
    build_tests (it was previously accepted here but silently dropped).
    """
    if extra_configure_args is None:
        extra_configure_args = []

    # first configure
    if configure:
        waf_configure(board,
                      j=j,
                      debug=debug,
                      math_check_indexes=math_check_indexes,
                      ekf_single=ekf_single,
                      postype_single=postype_single,
                      coverage=coverage,
                      force_32bit=force_32bit,
                      ubsan=ubsan,
                      ubsan_abort=ubsan_abort,
                      num_aux_imus=num_aux_imus,
                      extra_args=extra_configure_args,
                      dronecan_tests=dronecan_tests)

    # then clean
    if clean:
        waf_clean()

    # then build
    cmd_make = [relwaf(), "examples"]
    run_cmd(cmd_make, directory=topdir(), checkfail=True, show=True)
    return True
230
231
232
def build_replay(board, j=None, debug=False, clean=False):
    """Configure, optionally clean, and build the ``replay`` waf target.

    Unlike the other build_* helpers the configure step is always run.
    Any failing step raises; True is returned on success.
    """
    # step 1: configure (unconditional)
    waf_configure(board, j=j, debug=debug)

    # step 2: clean
    if clean:
        waf_clean()

    # step 3: build
    run_cmd([relwaf(), "replay"], directory=topdir(), checkfail=True, show=True)
    return True
244
245
246
def build_tests(board,
                j=None,
                debug=False,
                clean=False,
                configure=True,
                math_check_indexes=False,
                coverage=False,
                ekf_single=False,
                postype_single=False,
                force_32bit=False,
                ubsan=False,
                ubsan_abort=False,
                num_aux_imus=0,
                dronecan_tests=False,
                extra_configure_args: list | None = None):
    """Configure, optionally clean, and build the ``tests`` waf target.

    The configure and clean steps are skipped when ``configure`` / ``clean``
    are false.  Any failing step raises; True is returned on success.
    """
    configure_args = [] if extra_configure_args is None else extra_configure_args

    # step 1: configure
    if configure:
        waf_configure(
            board,
            j=j,
            debug=debug,
            math_check_indexes=math_check_indexes,
            ekf_single=ekf_single,
            postype_single=postype_single,
            coverage=coverage,
            force_32bit=force_32bit,
            ubsan=ubsan,
            ubsan_abort=ubsan_abort,
            num_aux_imus=num_aux_imus,
            dronecan_tests=dronecan_tests,
            extra_args=configure_args,
        )

    # step 2: clean
    if clean:
        waf_clean()

    # step 3: build
    tests_cmd = [relwaf(), "tests"]
    run_cmd(tests_cmd, directory=topdir(), checkfail=True, show=True)
    return True
287
288
289
# pexpect children registered for closing; pexpect_close_all() walks this
# list and pexpect_close() removes entries from it
close_list = []


def pexpect_autoclose(p):
    """Register ``p`` in close_list so that it gets closed later."""
    close_list.append(p)
296
297
298
def pexpect_close(p):
    """Terminate and close a pexpect child.

    SIGTERM is sent first.  If that did not raise IOError the child is
    polled up to 20 times, 0.05s apart, to let it exit.  ``close()`` and
    then ``close(force=True)`` are both attempted with any exception
    ignored, and finally ``p`` is removed from close_list if present.
    Passing None only prints a message.
    """
    if p is None:
        print("Nothing to close")
        return

    try:
        p.kill(signal.SIGTERM)
    except IOError as e:
        print("Caught exception: %s" % str(e))
    else:
        # give the process some time to go away
        for _attempt in range(20):
            if not p.isalive():
                break
            time.sleep(0.05)

    # polite close first, forced close second; both are best-effort
    for close_kwargs in ({}, {"force": True}):
        try:
            p.close(**close_kwargs)
        except Exception:
            pass

    if p in close_list:
        close_list.remove(p)
327
328
329
def pexpect_close_all():
    """Close every pexpect child currently registered in close_list."""
    # iterate over a snapshot: pexpect_close() removes entries as it goes
    for child in list(close_list):
        pexpect_close(child)
333
334
335
def pexpect_drain(p):
    """Discard pending input on ``p``.

    A single non-blocking read of up to 1000 characters is attempted;
    any exception it raises (nothing to read, for instance) is ignored.
    """
    max_chars = 1000
    try:
        p.read_nonblocking(max_chars, timeout=0)
    except Exception:
        pass
341
342
343
def cmd_as_shell(cmd):
    """Return the words of ``cmd`` double-quoted and joined by spaces.

    No escaping is performed on the words themselves; within this module
    the result is only used in printed "Running:" messages.
    """
    quoted_words = []
    for word in cmd:
        quoted_words.append('"%s"' % word)
    return " ".join(quoted_words)
345
346
347
def make_safe_filename(text):
    """Return a version of ``text`` safe for use as a filename.

    Path separators ('/') become '-'.  Every other character outside
    ``a-z A-Z 0-9 _ . + -`` is replaced by the upper-cased hex() of its
    code point, e.g. a space becomes ``0X20``.
    """
    # str.replace returns a new string; the result must be kept, otherwise
    # '/' falls through to the hex encoding below
    text = text.replace('/', '-')
    return re.sub("([^a-zA-Z0-9_.+-])",
                  lambda m: hex(ord(m.group(1))).upper(),
                  text)
353
354
355
def valgrind_log_filepath(binary, model):
    """Return the valgrind log file name for ``binary`` and ``model``.

    The name is ``<basename of binary>-<model>-valgrind.log`` passed
    through make_safe_filename(); a model of None is rendered as 'None'.
    """
    model_name = 'None' if model is None else model
    raw_name = '%s-%s-valgrind.log' % (os.path.basename(binary), model_name,)
    return make_safe_filename(raw_name)
359
360
361
def kill_screen_gdb():
    """Ask the ``ardupilot-gdb`` screen session to quit.

    The ``screen`` command is launched without waiting for it to finish.
    """
    subprocess.Popen(["screen", "-X", "-S", "ardupilot-gdb", "quit"])
364
365
366
def kill_mac_terminal():
    """Close every macOS Terminal window recorded in windowID.

    One osascript command is run through os.system per recorded window.
    """
    template = ("osascript -e \'tell application \"Terminal\" to close "
                "(window(get index of window id %s))\'")
    for window_id in windowID:
        os.system(template % window_id)
371
372
373
class FakeMacOSXSpawn(object):
    """Stand-in for a pexpect child, returned by start_SITL on macOS.

    When SITL is started in a separate Terminal window there is no real
    child process to hand back, so this object accepts ``kill()`` calls
    and ignores them, and always claims to be alive.
    """

    def __init__(self):
        pass

    def progress(self, message):
        """Print a progress message."""
        print(message)

    def kill(self, sig):
        """Ignore a request to send signal ``sig``."""
        # self.progress("FakeMacOSXSpawn: ignoring kill(%s)" % str(sig))
        pass

    def isalive(self):
        """Report the (unobservable) process as alive."""
        self.progress("FakeMacOSXSpawn: assuming process is alive")
        return True
391
392
393
class PSpawnStdPrettyPrinter(object):
    """File-like object that prefixes every line before printing it.

    Intended to be passed as the ``logfile`` argument of pexpect.spawn.
    Written data is accumulated until a newline arrives; each complete
    line is then printed to ``output`` as ``"<prefix>: <line>"``.
    """

    def __init__(self, output=sys.stdout, prefix="stdout"):
        self.output = output
        self.prefix = prefix
        # text received so far that has not been terminated by a newline
        self.buffer = ""

    def close(self):
        """Print whatever is still buffered as a final prefixed line."""
        self.print_prefixed_line(self.buffer)

    def write(self, data):
        """Buffer ``data`` and print every line it completes."""
        # everything after the last newline stays buffered for the next call
        *complete_lines, self.buffer = (self.buffer + data).split("\n")
        for complete_line in complete_lines:
            self.print_prefixed_line(complete_line)

    def print_prefixed_line(self, line):
        """Print one line with the configured prefix."""
        print("%s: %s" % (self.prefix, line), file=self.output)

    def flush(self):
        """No-op, present to satisfy the file-like interface."""
        pass
419
420
421
def start_SITL(binary,
               valgrind=False,
               callgrind=False,
               cwd=None,
               gdb=False,
               gdb_no_tui=False,
               wipe=False,
               home=None,
               model=None,
               speedup=1,
               sim_rate_hz=None,
               defaults_filepath: list | None = None,
               param_defaults=None,  # dictionary
               unhide_parameters=False,
               gdbserver=False,
               breakpoints: list | None = None,
               disable_breakpoints=False,
               customisations: list | None = None,
               lldb=False,
               strace=False,
               enable_fgview=False,
               supplementary=False,
               stdout_prefix=None,
               ):
    """Launch a SITL instance.

    The command line is assembled in three stages: an optional wrapper
    (valgrind/callgrind, gdbserver, gdb, strace or lldb), the binary
    itself, and the SITL options.  Parameter defaults given through
    ``defaults_filepath`` (paths relative to the top of the tree),
    ``param_defaults`` (a name -> value mapping written to a temporary
    file) and a non-unity ``speedup`` are combined into one ``--defaults``
    option.

    Returns a pexpect child for the running process, with two exceptions:
    on macOS with a debugger and DISPLAY set a FakeMacOSXSpawn is returned,
    and when gdb is run inside ``screen`` (no DISPLAY) a dummy child
    spawned from ``true`` is returned.

    Raises RuntimeError when lldb is requested without DISPLAY, and
    ValueError when ``customisations`` contains ``--defaults``.
    """

    if defaults_filepath is None:
        defaults_filepath = []
    if breakpoints is None:
        breakpoints = []
    if customisations is None:
        customisations = []

    def _write_breakpoints(script):
        # shared by the gdb and lldb command scripts written below
        for breakingpoint in breakpoints:
            script.write("b %s\n" % (breakingpoint,))
        if disable_breakpoints:
            script.write("disable\n")

    cmd = []
    # pexpect doesn't like pathlib:
    if cwd is not None:
        cwd = str(cwd)
    if not isinstance(binary, str):
        binary = str(binary)
    if (callgrind or valgrind) and os.path.exists('/usr/bin/valgrind'):
        # we specify a prefix for vgdb-pipe because on Vagrant virtual
        # machines the pipes are created on the mountpoint for the
        # shared directory with the host machine.  mmap's,
        # unsurprisingly, fail on files created on that mountpoint.
        vgdb_prefix = os.path.join(tempfile.gettempdir(), "vgdb-pipe")
        log_file = valgrind_log_filepath(binary=binary, model=model)
        cmd.extend([
            'valgrind',
            # adding this option allows valgrind to cope with the overload
            # of operator new
            "--soname-synonyms=somalloc=nouserintercepts",
            '--vgdb-prefix=%s' % vgdb_prefix,
            '-q',
            '--log-file=%s' % log_file])
        if callgrind:
            cmd.extend(["--tool=callgrind"])
    if gdbserver:
        cmd.extend(['gdbserver', 'localhost:3333'])
        if gdb:
            # attach gdb to the gdbserver:
            with open("/tmp/x.gdb", "w") as f:
                f.write("target extended-remote localhost:3333\nc\n")
                _write_breakpoints(f)
            run_cmd('screen -d -m -S ardupilot-gdbserver '
                    'bash -c "gdb -x /tmp/x.gdb"')
    elif gdb:
        with open("/tmp/x.gdb", "w") as f:
            f.write("set pagination off\n")
            _write_breakpoints(f)
            if not gdb_no_tui:
                f.write("tui enable\n")
            f.write("r\n")
        if sys.platform == "darwin" and os.getenv('DISPLAY'):
            cmd.extend(['gdb', '-x', '/tmp/x.gdb', '--args'])
        elif os.environ.get('DISPLAY'):
            cmd.extend(['xterm', '-e', 'gdb', '-x', '/tmp/x.gdb', '--args'])
        else:
            # no display available: run gdb detached inside a screen session
            cmd.extend(['screen',
                        '-L', '-Logfile', 'gdb.log',
                        '-d',
                        '-m',
                        '-S', 'ardupilot-gdb',
                        'gdb', '--cd', os.getcwd(), '-x', '/tmp/x.gdb', binary, '--args'])
    elif strace:
        cmd.append("strace")
        strace_options = ['-f', '-o', binary + '.strace', '-s', '8000', '-ttt']
        cmd.extend(strace_options)
    elif lldb:
        with open("/tmp/x.lldb", "w") as f:
            _write_breakpoints(f)
            f.write("settings set target.process.stop-on-exec false\n")
            f.write("process launch\n")
        if sys.platform == "darwin" and os.getenv('DISPLAY'):
            cmd.extend(['lldb', '-s', '/tmp/x.lldb', '--'])
        elif os.environ.get('DISPLAY'):
            cmd.extend(['xterm', '-e', 'lldb', '-s', '/tmp/x.lldb', '--'])
        else:
            raise RuntimeError("DISPLAY was not set")

    cmd.append(binary)

    # a single path may be passed instead of a list
    if not isinstance(defaults_filepath, list):
        defaults_filepath = [defaults_filepath]
    defaults = [reltopdir(path) for path in defaults_filepath]

    if param_defaults is not None:
        text = "".join([f"{name} {value}\n" for (name, value) in param_defaults.items()])
        # delete=False: the file is only referenced by name on the command line
        with tempfile.NamedTemporaryFile(mode="w", delete=False) as filepath:
            print(text, file=filepath)
        defaults.append(str(filepath.name))

    if not supplementary:
        if wipe:
            cmd.append('-w')
        if home is not None:
            cmd.extend(['--home', home])
        if model is not None:
            cmd.extend(['--model', model])
        if speedup is not None and speedup != 1:
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as ntf:
                print(f"SIM_SPEEDUP {speedup}", file=ntf)
            # prepend it so that a caller can override the speedup in
            # passed-in defaults:
            defaults = [ntf.name] + defaults
        if sim_rate_hz is not None:
            cmd.extend(['--rate', str(sim_rate_hz)])
        if unhide_parameters:
            cmd.extend(['--unhide-groups'])
        # somewhere for MAVProxy to connect to:
        cmd.append('--serial1=tcp:2')
        if enable_fgview:
            cmd.append("--enable-fgview")

    if len(defaults):
        cmd.extend(['--defaults', ",".join(defaults)])

    cmd.extend(customisations)

    if "--defaults" in customisations:
        raise ValueError("--defaults must be passed in via defaults_filepath keyword argument, not as part of customisation list")  # noqa

    pexpect_logfile_prefix = stdout_prefix
    if pexpect_logfile_prefix is None:
        pexpect_logfile_prefix = os.path.basename(binary)
    pexpect_logfile = PSpawnStdPrettyPrinter(prefix=pexpect_logfile_prefix)

    if (gdb or lldb) and sys.platform == "darwin" and os.getenv('DISPLAY'):
        # on MacOS record the window IDs so we can close them later
        atexit.register(kill_mac_terminal)
        child = None
        mydir = os.path.dirname(os.path.realpath(__file__))
        autotest_dir = os.path.realpath(os.path.join(mydir, '..'))
        runme = [os.path.join(autotest_dir, "run_in_terminal_window.sh"), 'mactest']
        runme.extend(cmd)
        print(cmd)
        out = subprocess.Popen(runme, stdout=subprocess.PIPE, cwd=cwd).communicate()[0]
        out = out.decode('utf-8')
        p = re.compile('tab 1 of window id (.*)')

        # NOTE: ``out`` does not change inside this loop, so when the first
        # search finds nothing the loop simply waits out its 5 seconds
        tstart = time.time()
        while time.time() - tstart < 5:
            tabs = p.findall(out)

            if len(tabs) > 0:
                break

            time.sleep(0.1)
        # sleep for extra 2 seconds for application to start
        time.sleep(2)
        if len(tabs) > 0:
            windowID.append(tabs[0])
        else:
            print("Cannot find %s process terminal" % binary)
        child = FakeMacOSXSpawn()
    elif gdb and not os.getenv('DISPLAY'):
        subprocess.Popen(cmd, cwd=cwd)
        atexit.register(kill_screen_gdb)
        # we are expected to return a pexpect wrapped around the
        # stdout of the ArduPilot binary.  Not going to happen until
        # AP gets a redirect-stdout-to-filehandle option.  So, in the
        # meantime, return a dummy:
        return pexpect.spawn("true", ["true"],
                             logfile=pexpect_logfile,
                             encoding='ascii',
                             timeout=5)
    else:
        print("Running: %s" % cmd_as_shell(cmd))

        first = cmd[0]
        rest = cmd[1:]
        child = pexpect.spawn(str(first), rest, logfile=pexpect_logfile, encoding='ascii', timeout=5, cwd=cwd)
        pexpect_autoclose(child)
    if gdb or lldb:
        # if we run GDB we do so in an xterm.  "Waiting for
        # connection" is never going to appear on xterm's output.
        # ... so let's give it another magic second.
        time.sleep(1)
        # TODO: have a SITL-compiled ardupilot able to have its
        # console on an output fd.
    else:
        child.expect('Waiting for ', timeout=300)
    return child
639
640
641
def mavproxy_cmd():
    """Return the MAVProxy command to run.

    The MAVPROXY_CMD environment variable is used when set; otherwise the
    command is 'mavproxy.py'.
    """
    return os.environ.get('MAVPROXY_CMD', 'mavproxy.py')
644
645
646
def MAVProxy_version():
    """Return the MAVProxy version as a tuple of three ints, e.g. (1, 8, 8).

    ``<mavproxy> --version`` is run through the shell and its stdout is
    searched for ``MAVProxy Version: X.Y.Z``.  ValueError is raised when
    no such text is found.
    """
    command = "%s --version" % mavproxy_cmd()
    proc = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    raw_output, _ = proc.communicate()
    output = raw_output.decode('ascii')
    match = re.search("MAVProxy Version: ([0-9]+)[.]([0-9]+)[.]([0-9]+)", output)
    if match is None:
        raise ValueError("Unable to determine MAVProxy version from (%s)" % output)
    major, minor, patch = (int(part) for part in match.groups())
    return major, minor, patch
655
656
657
def start_MAVProxy_SITL(atype,
                        aircraft=None,
                        setup=False,
                        master=None,
                        options: list | None = None,
                        sitl_rcin_port=5501,
                        pexpect_timeout=60,
                        logfile=sys.stdout):
    """Launch MAVProxy connected to a SITL instance.

    ``master`` is mandatory (ValueError otherwise).  The ``mavproxy_modules``
    directory three levels above this file is put at the front of
    PYTHONPATH for the child.  ``aircraft`` defaults to ``test.<atype>``.
    The spawned pexpect child is registered for auto-close and returned.
    """
    if options is None:
        options = []

    if master is None:
        raise ValueError("Expected a master")

    local_mp_modules_dir = os.path.abspath(
        os.path.join(__file__, '..', '..', '..', 'mavproxy_modules'))

    # the local modules directory goes first, any existing PYTHONPATH after it
    env = dict(os.environ)
    pythonpath_entries = [local_mp_modules_dir]
    inherited = env.get('PYTHONPATH', None)
    if inherited is not None:
        pythonpath_entries.append(inherited)
    env['PYTHONPATH'] = os.path.pathsep.join(pythonpath_entries)

    if aircraft is None:
        aircraft = 'test.%s' % atype

    cmd = [mavproxy_cmd(), '--master', master, '--sitl', "localhost:%u" % sitl_rcin_port]
    if setup:
        cmd.append('--setup')
    cmd += ['--aircraft', aircraft]
    cmd += options
    cmd += ['--default-modules', 'misc,wp,rally,fence,param,arm,mode,rc,cmdlong,output']

    print("PYTHONPATH: %s" % str(env['PYTHONPATH']))
    print("Running: %s" % cmd_as_shell(cmd))

    child = pexpect.spawn(cmd[0], cmd[1:], logfile=logfile, encoding='ascii', timeout=pexpect_timeout, env=env)
    child.delaybeforesend = 0
    pexpect_autoclose(child)
    return child
699
700
701
def start_PPP_daemon(ips, sockaddr):
    """Start pppd (through sudo) for networking.

    The command is ``sudo pppd socket <sockaddr> debug noauth nodetach
    <ips>``, split on whitespace.  The spawned pexpect child is registered
    for auto-close and returned.
    """
    command_line = "sudo pppd socket %s debug noauth nodetach %s" % (sockaddr, ips)
    argv = command_line.split()
    print("Running: %s" % cmd_as_shell(argv))

    program, arguments = argv[0], argv[1:]
    child = pexpect.spawn(program, arguments, logfile=sys.stdout, encoding='ascii', timeout=30)
    child.delaybeforesend = 0
    pexpect_autoclose(child)
    return child
712
713
714
def expect_setup_callback(e, callback):
    """Make ``e.expect`` invoke ``callback`` once a second while waiting.

    The original ``e.expect`` is saved as ``e.expect_saved`` and replaced
    by a wrapper that retries it with a one second timeout, calling
    ``callback(e)`` after each timed-out attempt.  Once the overall timeout
    (by default ``e.timeout`` at the time of this call) has elapsed, a
    message is printed and pexpect.TIMEOUT is raised.
    """
    def _expect_callback(pattern, timeout=e.timeout):
        started = time.time()
        while time.time() < started + timeout:
            try:
                return e.expect_saved(pattern, timeout=1)
            except pexpect.TIMEOUT:
                e.expect_user_callback(e)
        print("Timed out looking for %s" % pattern)
        raise pexpect.TIMEOUT(timeout)

    # keep the real expect reachable before shadowing it on the instance
    e.expect_saved = e.expect
    e.expect_user_callback = callback
    e.expect = _expect_callback
731
732
733
def mkdir_p(directory):
    """Like ``mkdir -p``: create ``directory`` and any missing parents.

    An empty ``directory`` is ignored.  An already existing directory is
    not an error, including when another process creates it concurrently.
    FileExistsError is still raised when the path exists but is not a
    directory.
    """
    if not directory:
        return
    # os.makedirs copes with trailing slashes and path-like objects, and
    # exist_ok avoids the check-then-create race of a manual isdir() test
    os.makedirs(directory, exist_ok=True)
744
745
746
def loadfile(fname):
    """Load a file as a string.

    The file is opened in text mode and is closed even if reading fails.
    """
    with open(fname, mode='r') as f:
        return f.read()
752
753
754
def lock_file(fname):
    """Take an exclusive, non-blocking lock on ``fname``.

    The file is opened for writing (which creates or truncates it).
    Returns the open file object holding the lock, or None when the lock
    could not be taken; in that case the file is closed again so the
    handle is not leaked.  The caller keeps the lock for as long as the
    returned file object stays open.
    """
    import fcntl
    f = open(fname, mode='w')
    try:
        fcntl.lockf(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        f.close()
        return None
    return f
763
764
765
def check_parent(parent_pid=None):
    """Exit with status 1 if the parent process is no longer alive.

    ``parent_pid`` defaults to os.getppid().  Liveness is probed by
    sending signal 0 to the pid; an OSError from that probe is treated as
    "parent gone".
    """
    pid = parent_pid
    if pid is None:
        try:
            pid = os.getppid()
        except OSError:
            # no parent pid available: nothing to check
            return
    try:
        os.kill(pid, 0)
    except OSError:
        print("Parent had finished - exiting")
        sys.exit(1)
779
780
781
def gps_newpos(lat, lon, bearing, distance):
    """Extrapolate a new latitude/longitude from a heading and distance.

    ``lat``, ``lon`` and ``bearing`` are converted from degrees;
    ``distance`` is divided by RADIUS_OF_EARTH (meters).  Returns
    ``(latitude, longitude)`` in degrees.

    Thanks to http://www.movable-type.co.uk/scripts/latlong.html .
    """
    lat1 = math.radians(lat)
    lon1 = math.radians(lon)
    brng = math.radians(bearing)
    # angular distance travelled over the sphere
    dr = distance / RADIUS_OF_EARTH

    sin_lat1 = math.sin(lat1)
    cos_lat1 = math.cos(lat1)
    sin_dr = math.sin(dr)
    cos_dr = math.cos(dr)

    lat2 = math.asin(sin_lat1 * cos_dr +
                     cos_lat1 * sin_dr * math.cos(brng))
    lon2 = lon1 + math.atan2(math.sin(brng) * sin_dr * cos_lat1,
                             cos_dr - sin_lat1 * math.sin(lat2))
    return math.degrees(lat2), math.degrees(lon2)
797
798
799
def gps_distance(lat1, lon1, lat2, lon2):
    """Return the distance between two points in meters.

    Coordinates are given in degrees; the haversine formula is evaluated
    on a sphere of radius RADIUS_OF_EARTH.

    Thanks to http://www.movable-type.co.uk/scripts/latlong.html .
    """
    lat1, lat2, lon1, lon2 = (math.radians(angle) for angle in (lat1, lat2, lon1, lon2))
    half_dlat = 0.5 * (lat2 - lat1)
    half_dlon = 0.5 * (lon2 - lon1)

    a = math.sin(half_dlat)**2 + math.sin(half_dlon)**2 * math.cos(lat1) * math.cos(lat2)
    c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
    return RADIUS_OF_EARTH * c
813
814
815
def gps_bearing(lat1, lon1, lat2, lon2):
    """Return the bearing from point 1 to point 2 in degrees.

    Coordinates are given in degrees.  A negative result from atan2 is
    shifted up by 360 so the bearing is reported as a non-negative angle.

    Thanks to http://www.movable-type.co.uk/scripts/latlong.html .
    """
    lat1, lat2, lon1, lon2 = (math.radians(angle) for angle in (lat1, lat2, lon1, lon2))
    dLon = lon2 - lon1
    east = math.sin(dLon) * math.cos(lat2)
    north = math.cos(lat1) * math.sin(lat2) - math.sin(lat1) * math.cos(lat2) * math.cos(dLon)
    bearing = math.degrees(math.atan2(east, north))
    if bearing < 0:
        return bearing + 360.0
    return bearing
829
830
831
def constrain(value, minv, maxv):
    """Constrain ``value`` to the range ``minv`` .. ``maxv``.

    The lower bound is applied first and the upper bound second.
    """
    return min(max(value, minv), maxv)
838
839
840
def load_local_module(fname):
    """Load a python module from within the ardupilot tree.

    ``fname`` is joined onto topdir().  The file is executed as a module
    named "local_module" and the resulting module object is returned.
    """
    import importlib.util

    module_path = os.path.join(topdir(), fname)
    spec = importlib.util.spec_from_file_location("local_module", module_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
848
849
850
def get_git_hash(short=False):
    """Return the git hash of HEAD for the tree, as a string.

    ``git rev-parse`` is run in the top directory; with ``short`` the
    ``--short=8`` option is passed to it.
    """
    short_v = "--short=8 " if short else ""
    raw_hash = run_cmd(f'git rev-parse {short_v}HEAD', output=True, directory=reltopdir('.'))
    return raw_hash.strip().decode('utf-8')
854
855
856
# When executed directly as a script, run the doctests found in this module.
if __name__ == "__main__":
    import doctest

    doctest.testmod()
859
860