CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutSign UpSign In
Ardupilot

Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.

GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/scripts/CAN/CAN_playback.py
Views: 1799
1
#!/usr/bin/env python3
2
'''
Play back a set of CAN frames.

Capture the frames either with libraries/AP_Scripting/examples/CAN_logger.lua
or by setting the CAN_Pn_OPTIONS bit that enables CAN logging.
'''
7
8
import dronecan
9
import time
10
import sys
11
import threading
12
from pymavlink import mavutil
13
from dronecan.driver.common import CANFrame
14
import struct
15
16
17
# command line arguments: the log file to replay, the CAN port to replay it
# on, and the CAN bus number selected with --bus
from argparse import ArgumentParser

parser = ArgumentParser(description='CAN playback')
for positional, helptext in (("logfile", "logfile"), ("canport", "CAN port")):
    parser.add_argument(positional, default=None, type=str, help=helptext)
parser.add_argument("--bus", default=0, type=int, help="CAN bus")
args = parser.parse_args()

# open the CAN interface the frames will be sent on
print("Connecting to %s" % args.canport)
driver = dronecan.driver.make_driver(args.canport)

# open the log holding the captured frames
print("Opening %s" % args.logfile)
mlog = mavutil.mavlink_connection(args.logfile)

# playback clock: wall-clock time at which playback started, and the log
# timestamp playback is measured from (not known yet)
tstart = time.time()
first_tstamp = None
34
35
def dlc_to_datalength(dlc):
    '''
    convert a CAN FD Data Length Code (DLC) into a number of data bytes

    A DLC of 8 or less is returned unchanged. Above that the mapping is:

    Data Length Code      9  10  11  12  13  14  15
    Number of data bytes 12  16  20  24  32  48  64

    Any other value (15 and anything larger) gives 64.
    '''
    if dlc <= 8:
        return dlc
    # byte counts for DLC 9..14; everything else falls through to 64
    long_lengths = {9: 12, 10: 16, 11: 20, 12: 24, 13: 32, 14: 48}
    return long_lengths.get(dlc, 64)
53
54
# Replay the log in a loop: every CANF/CAFD frame logged on the selected bus
# is sent with the same relative timing it was logged with, and the log is
# rewound when its end is reached.
frames_this_pass = 0
while True:
    m = mlog.recv_match(type=['CANF','CAFD'])

    if m is None:
        if frames_this_pass == 0:
            # a whole pass produced nothing to send; rewinding again would
            # only loop forever printing "Rewinding"
            sys.exit("No CANF/CAFD frames for bus %d in %s" % (args.bus, args.logfile))
        print("Rewinding")
        mlog.rewind()
        # restart the playback clock for the new pass
        tstart = time.time()
        first_tstamp = None
        frames_this_pass = 0
        continue

    # messages that have no 'bus' field are treated as bus 0
    if getattr(m,'bus',0) != args.bus:
        continue
    frames_this_pass += 1

    if first_tstamp is None:
        first_tstamp = m.TimeUS
    # dt: wall-clock seconds since this pass started
    # dt2: seconds between the first frame of this pass and this frame
    dt = time.time() - tstart
    dt2 = (m.TimeUS - first_tstamp)*1.0e-6
    if dt2 > dt:
        # we are ahead of the log; wait until this frame is due
        time.sleep(dt2 - dt)

    canfd = m.get_type() == 'CAFD'
    if canfd:
        # CAFD: payload is eight 64-bit little-endian words, cut down to the
        # byte count that the DLC stands for
        data = struct.pack("<QQQQQQQQ", m.D0, m.D1, m.D2, m.D3, m.D4, m.D5, m.D6, m.D7)
        data = data[:dlc_to_datalength(m.DLC)]
    else:
        # CANF: payload is eight single bytes, cut down to DLC bytes
        data = struct.pack("<BBBBBBBB", m.B0, m.B1, m.B2, m.B3, m.B4, m.B5, m.B6, m.B7)
        data = data[:m.DLC]

    fid = m.Id
    # bit 31 of the logged Id is used as the extended-frame flag
    is_extended = (fid & (1<<31)) != 0
    # NOTE(review): the Id is passed on with its flag bits still set;
    # confirm that every dronecan driver masks them off before transmitting
    driver.send(fid, data, extended=is_extended, canfd=canfd)
    print(m)
86
87