CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
Ardupilot

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.

GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/autotest/pysim/fg_display.py
Views: 1799
1
#!/usr/bin/env python
2
3
import errno
4
import socket
5
import sys
6
import time
7
8
from pymavlink import fgFDM
9
10
11
class udp_socket(object):
    """A UDP socket.

    An input socket is bound to the given ``host:port`` and replies go to
    whichever peer the last datagram was received from. An output socket is
    not bound and always sends to the given ``host:port``.
    """

    def __init__(self, device, blocking=True, is_input=True):
        """Create the socket.

        device: address in ``host:port`` form; the process exits with
            status 1 if it does not contain exactly one ``:``.
        blocking: when false, the socket is put in non-blocking mode.
        is_input: when true, bind to ``device``; otherwise remember it as
            the fixed destination for write().
        """
        a = device.split(':')
        if len(a) != 2:
            print("UDP ports must be specified as host:port")
            sys.exit(1)
        self.port = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        if is_input:
            self.port.bind((a[0], int(a[1])))
            self.destination_addr = None
        else:
            self.destination_addr = (a[0], int(a[1]))
        if not blocking:
            self.port.setblocking(0)
        # Address of the most recent sender, recorded by recv().
        self.last_address = None

    def recv(self, n=1000):
        """Receive one datagram of at most ``n`` bytes.

        Returns the received data, or "" when the socket reports
        EAGAIN/EWOULDBLOCK. Any other socket error is re-raised.
        """
        try:
            data, self.last_address = self.port.recvfrom(n)
        except socket.error as e:
            if e.errno in [errno.EAGAIN, errno.EWOULDBLOCK]:
                return ""
            raise
        return data

    def write(self, buf):
        """Send ``buf`` to the fixed destination, or else to the last sender.

        Socket errors are deliberately ignored (best-effort send). If there
        is neither a fixed destination nor a recorded sender, nothing is sent.
        """
        if self.destination_addr:
            target = self.destination_addr
        else:
            # Was `self.last_addr`, an attribute that is never set; recv()
            # and __init__ store the peer in `last_address`.
            target = self.last_address
        if target is None:
            # No peer known yet, so there is nowhere to send to.
            return
        try:
            self.port.sendto(buf, target)
        except socket.error:
            pass
45
46
47
# Input socket bound to 127.0.0.1:5123; output socket sending to 127.0.0.1:5124.
udp = udp_socket("127.0.0.1:5123")
fgout = udp_socket("127.0.0.1:5124", is_input=False)

# Packet counter and start time of the current reporting interval.
tlast = time.time()
count = 0

fg = fgFDM.fgFDM()

# Relay loop: each received datagram is parsed into `fg`, re-packed and
# forwarded to the output socket.
while True:
    udp_buffer = udp.recv(1000)
    fg.parse(udp_buffer)
    fgout.write(fg.pack())
    count += 1

    # Only report once more than a second has passed since the last report.
    if not (time.time() - tlast > 1.0):
        continue

    print("%u FPS len=%u" % (count, len(udp_buffer)))
    count = 0
    tlast = time.time()
    print(
        fg.get('latitude', units='degrees'),
        fg.get('longitude', units='degrees'),
        fg.get('altitude', units='meters'),
        fg.get('vcas', units='mps'),
    )
68
69