Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Ardupilot
GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/scripts/CAN/CAN_playback.py
9460 views
1
#!/usr/bin/env python3
2
3
# flake8: noqa
4
5
'''
6
playback a set of CAN frames
7
capture frames either using libraries/AP_Scripting/examples/CAN_logger.lua
8
or the CAN_Pn_OPTIONS bit to enable CAN logging
9
'''
10
11
import dronecan
12
import time
13
import sys
14
import threading
15
from pymavlink import mavutil
16
from dronecan.driver.common import CANFrame
17
import struct
18
19
20
# get command line arguments
21
from argparse import ArgumentParser
22
parser = ArgumentParser(description='CAN playback')
23
parser.add_argument("logfile", default=None, type=str, help="logfile")
24
parser.add_argument("canport", default=None, type=str, help="CAN port")
25
parser.add_argument("--bus", default=0, type=int, help="CAN bus")
26
parser.add_argument("--nodes", default=None, type=str, help="comma separated list of source node IDs to send")
27
parser.add_argument("--messageids", default=None, type=str, help="comma separated list of DroneCAN message IDs to send")
28
29
args = parser.parse_args()
30
31
node_filter = None
32
if args.nodes is not None:
33
node_filter = set(int(n) for n in args.nodes.split(','))
34
35
msgid_filter = None
36
if args.messageids is not None:
37
msgid_filter = set(int(n, 0) for n in args.messageids.split(','))
38
39
print("Connecting to %s" % args.canport)
40
driver = dronecan.driver.make_driver(args.canport)
41
42
print("Opening %s" % args.logfile)
43
mlog = mavutil.mavlink_connection(args.logfile)
44
45
tstart = time.time()
46
first_tstamp = None
47
48
def dlc_to_datalength(dlc):
49
# Data Length Code 9 10 11 12 13 14 15
50
# Number of data bytes 12 16 20 24 32 48 64
51
if (dlc <= 8):
52
return dlc
53
elif (dlc == 9):
54
return 12
55
elif (dlc == 10):
56
return 16
57
elif (dlc == 11):
58
return 20
59
elif (dlc == 12):
60
return 24
61
elif (dlc == 13):
62
return 32
63
elif (dlc == 14):
64
return 48
65
return 64
66
67
while True:
68
m = mlog.recv_match(type=['CANF','CAFD'])
69
70
if m is None:
71
print("Rewinding")
72
mlog.rewind()
73
tstart = time.time()
74
first_tstamp = None
75
continue
76
77
if getattr(m,'bus',0) != args.bus:
78
continue
79
80
if node_filter is not None and (m.Id & 0x7F) not in node_filter:
81
continue
82
83
if msgid_filter is not None:
84
is_message = (m.Id & 0x7F) != 0 and (m.Id & (1 << 7)) == 0
85
msgId = ((m.Id >> 8) & 0xFFFF)
86
if not is_message or msgId not in msgid_filter:
87
print("skip 0x%04x" % msgId, is_message)
88
continue
89
90
if first_tstamp is None:
91
first_tstamp = m.TimeUS
92
dt = time.time() - tstart
93
dt2 = (m.TimeUS - first_tstamp)*1.0e-6
94
if dt2 > dt:
95
time.sleep(dt2 - dt)
96
97
canfd = m.get_type() == 'CAFD'
98
if canfd:
99
data = struct.pack("<QQQQQQQQ", m.D0, m.D1, m.D2, m.D3, m.D4, m.D5, m.D6, m.D7)
100
data = data[:dlc_to_datalength(m.DLC)]
101
else:
102
data = struct.pack("<BBBBBBBB", m.B0, m.B1, m.B2, m.B3, m.B4, m.B5, m.B6, m.B7)
103
data = data[:m.DLC]
104
105
fid = m.Id
106
is_extended = (fid & (1<<31)) != 0
107
driver.send(fid, data, extended=is_extended, canfd=canfd)
108
print(m)
109
110