'''
Drive Rover in SITL
AP_FLAKE8_CLEAN
'''
import copy
import math
import operator
import os
import pathlib
import sys
import time
import vehicle_test_suite
from pysim import util
from pysim import vehicleinfo
from vehicle_test_suite import AutoTestTimeoutException
from vehicle_test_suite import NotAchievedException
from vehicle_test_suite import PreconditionFailedException
from pymavlink import mavextra
from pymavlink import mavutil
testdir = os.path.dirname(os.path.realpath(__file__))
SITL_START_LOCATION = mavutil.location(40.071374969556928,
-105.22978898137808,
1583.702759,
246)
class AutoTestRover(vehicle_test_suite.TestSuite):
@staticmethod
def get_not_armable_mode_list():
return ["RTL", "SMART_RTL"]
@staticmethod
def get_not_disarmed_settable_modes_list():
return ["FOLLOW"]
@staticmethod
def get_no_position_not_settable_modes_list():
return []
@staticmethod
def get_position_armable_modes_list():
return ["GUIDED", "LOITER", "STEERING", "AUTO"]
@staticmethod
def get_normal_armable_modes_list():
return ["ACRO", "HOLD", "MANUAL"]
def log_name(self):
return "Rover"
def test_filepath(self):
return os.path.realpath(__file__)
def set_current_test_name(self, name):
self.current_test_name_directory = "ArduRover_Tests/" + name + "/"
def sitl_start_location(self):
return SITL_START_LOCATION
def default_frame(self):
return "rover"
def default_speedup(self):
return 30
def is_rover(self):
return True
def get_stick_arming_channel(self):
return int(self.get_parameter("RCMAP_ROLL"))
def DriveSquare(self, side=50):
"""Learn/Drive Square with Ch7 option"""
self.progress("TEST SQUARE")
self.set_parameters({
"RC7_OPTION": 7,
"RC9_OPTION": 58,
})
self.change_mode('MANUAL')
self.wait_ready_to_arm()
self.arm_vehicle()
self.clear_wp(9)
self.progress("\nTurn right towards north")
self.reach_heading_manual(10)
self.progress("Save HOME")
self.save_wp()
self.progress("Save WP")
self.save_wp()
self.progress("\nGoing north %u meters" % side)
self.reach_distance_manual(side)
self.progress("Save WP")
self.save_wp()
self.progress("\nGoing east %u meters" % side)
self.reach_heading_manual(100)
self.reach_distance_manual(side)
self.progress("Save WP")
self.save_wp()
self.progress("\nGoing south %u meters" % side)
self.reach_heading_manual(190)
self.reach_distance_manual(side)
self.progress("Save WP")
self.save_wp()
self.progress("\nGoing west %u meters" % side)
self.reach_heading_manual(280)
self.reach_distance_manual(side)
self.progress("Save WP")
self.save_wp()
self.progress("Checking number of saved waypoints")
mavproxy = self.start_mavproxy()
num_wp = self.save_mission_to_file_using_mavproxy(
mavproxy,
os.path.join(testdir, "ch7_mission.txt"))
self.stop_mavproxy(mavproxy)
expected = 7
if num_wp != expected:
raise NotAchievedException("Did not get %u waypoints; got %u" %
(expected, num_wp))
self.clear_wp(9)
self.disarm_vehicle()
def drive_left_circuit(self):
"""Drive a left circuit, 50m on a side."""
self.change_mode('MANUAL')
self.set_rc(3, 2000)
self.progress("Driving left circuit")
for i in range(0, 4):
self.progress("Starting turn %u" % i)
self.set_rc(1, 1000)
self.wait_heading(270 - (90*i), accuracy=10)
self.set_rc(1, 1500)
self.progress("Starting leg %u" % i)
self.wait_distance(50, accuracy=7)
self.set_rc(3, 1500)
self.progress("Circuit complete")
def ThrottleFailsafe(self):
"""Trigger throttle failsafes"""
self.progress("Testing throttle failsafe")
self.set_parameters({
"FS_THR_ENABLE": 1,
"FS_ACTION": 6,
})
self.change_mode("MANUAL")
self.wait_ready_to_arm()
self.arm_vehicle()
self.progress("Testing failsafe to loiter")
self.set_parameter("SIM_RC_FAIL", 1)
self.wait_mode("LOITER", timeout=10)
self.set_parameter("SIM_RC_FAIL", 0)
self.set_parameter("SIM_GPS1_ENABLE", 0)
self.change_mode("MANUAL")
self.progress("Testing failsafe to hold")
self.set_parameter("SIM_RC_FAIL", 1)
self.wait_mode("HOLD", timeout=10)
self.disarm_vehicle()
self.progress("Loiter or Hold as throttle failsafe OK")
def PARAM_ERROR(self):
'''test PARAM_ERROR mavlink message'''
self.start_subtest("Non-existent parameter (get)")
self.context_push()
self.context_collect('PARAM_ERROR')
self.install_messageprinter_handlers_context(['PARAM_ERROR'])
self.send_get_parameter_direct("BOB")
self.assert_received_message_field_values('PARAM_ERROR', {
"target_system": 250,
"target_component": 250,
"param_id": 'BOB',
"param_index": -1,
"error": mavutil.mavlink.MAV_PARAM_ERROR_DOES_NOT_EXIST,
}, check_context=True, very_verbose=True)
self.context_pop()
self.start_subtest("Non-existent parameter (get-by-id)")
self.context_push()
self.context_collect('PARAM_ERROR')
self.install_messageprinter_handlers_context(['PARAM_ERROR'])
non_existent_param_index = 32764
self.mav.mav.param_request_read_send(
self.sysid_thismav(),
1,
bytes("", 'ascii'),
non_existent_param_index
)
self.assert_received_message_field_values('PARAM_ERROR', {
"target_system": 250,
"target_component": 250,
"param_id": '',
"param_index": non_existent_param_index,
"error": mavutil.mavlink.MAV_PARAM_ERROR_DOES_NOT_EXIST,
}, check_context=True, very_verbose=True)
self.context_pop()
self.start_subtest("Non-existent parameter (set)")
self.context_push()
self.context_collect('PARAM_ERROR')
self.send_set_parameter_direct("BOB", 1)
self.assert_received_message_field_values('PARAM_ERROR', {
"target_system": 250,
"target_component": 250,
"param_id": 'BOB',
"param_index": -1,
"error": mavutil.mavlink.MAV_PARAM_ERROR_DOES_NOT_EXIST,
}, check_context=True, very_verbose=True)
self.context_pop()
self.start_subtest("Try to set a bad parameter value")
self.context_push()
self.context_collect('PARAM_ERROR')
self.send_set_parameter_direct("SPRAY_ENABLE", float("nan"))
self.assert_received_message_field_values('PARAM_ERROR', {
"target_system": 250,
"target_component": 250,
"param_id": 'SPRAY_ENABLE',
"param_index": -1,
"error": mavutil.mavlink.MAV_PARAM_ERROR_VALUE_OUT_OF_RANGE,
}, check_context=True, very_verbose=True)
self.context_pop()
def Sprayer(self):
"""Test sprayer functionality."""
rc_ch = 5
pump_ch = 5
spinner_ch = 6
pump_ch_min = 1050
pump_ch_trim = 1520
pump_ch_max = 1950
spinner_ch_min = 975
spinner_ch_trim = 1510
spinner_ch_max = 1975
self.set_parameters({
"SPRAY_ENABLE": 1,
"SERVO%u_FUNCTION" % pump_ch: 22,
"SERVO%u_MIN" % pump_ch: pump_ch_min,
"SERVO%u_TRIM" % pump_ch: pump_ch_trim,
"SERVO%u_MAX" % pump_ch: pump_ch_max,
"SERVO%u_FUNCTION" % spinner_ch: 23,
"SERVO%u_MIN" % spinner_ch: spinner_ch_min,
"SERVO%u_TRIM" % spinner_ch: spinner_ch_trim,
"SERVO%u_MAX" % spinner_ch: spinner_ch_max,
"SIM_SPR_ENABLE": 1,
"SIM_SPR_PUMP": pump_ch,
"SIM_SPR_SPIN": spinner_ch,
"RC%u_OPTION" % rc_ch: 15,
"LOG_DISARMED": 1,
})
self.reboot_sitl()
self.wait_ready_to_arm()
self.arm_vehicle()
self.progress("test boot-up state - it's zero-output!")
self.wait_servo_channel_value(spinner_ch, 0)
self.wait_servo_channel_value(pump_ch, 0)
self.progress("Enable sprayer")
self.set_rc(rc_ch, 2000)
self.progress("Testing zero-speed state")
self.wait_servo_channel_value(spinner_ch, spinner_ch_min)
self.wait_servo_channel_value(pump_ch, pump_ch_min)
self.progress("Testing turning it off")
self.set_rc(rc_ch, 1000)
self.wait_servo_channel_value(spinner_ch, spinner_ch_min)
self.wait_servo_channel_value(pump_ch, pump_ch_min)
self.progress("Testing turning it back on")
self.set_rc(rc_ch, 2000)
self.wait_servo_channel_value(spinner_ch, spinner_ch_min)
self.wait_servo_channel_value(pump_ch, pump_ch_min)
self.progress("Testing speed-ramping")
self.set_rc(3, 1700)
self.wait_servo_channel_value(pump_ch, 1695, timeout=60)
self.progress("Turning it off again")
self.set_rc(rc_ch, 1000)
self.wait_servo_channel_value(spinner_ch, spinner_ch_min)
self.wait_servo_channel_value(pump_ch, pump_ch_min)
self.start_subtest("Sprayer Mission")
self.load_mission("sprayer-mission.txt")
self.change_mode("AUTO")
self.progress("Waiting for sprayer to start")
self.wait_servo_channel_value(pump_ch, 1250, timeout=60, comparator=operator.gt)
self.progress("Waiting for sprayer to stop")
self.wait_servo_channel_value(pump_ch, pump_ch_min, timeout=120)
self.start_subtest("Checking mavlink commands")
self.change_mode("MANUAL")
self.progress("Starting Sprayer")
self.run_cmd(mavutil.mavlink.MAV_CMD_DO_SPRAYER, p1=1)
self.progress("Testing speed-ramping")
self.set_rc(3, 1700)
self.wait_servo_channel_value(pump_ch, 1690, timeout=60, comparator=operator.gt)
self.start_subtest("Stopping Sprayer")
self.run_cmd(mavutil.mavlink.MAV_CMD_DO_SPRAYER, p1=0)
self.wait_servo_channel_value(pump_ch, pump_ch_min)
self.set_rc(3, 1000)
self.progress("Sprayer OK")
self.disarm_vehicle()
def DriveMaxRCIN(self, timeout=30):
"""Drive rover at max RC inputs"""
self.progress("Testing max RC inputs")
self.change_mode("MANUAL")
self.wait_ready_to_arm()
self.arm_vehicle()
self.set_rc(3, 2000)
self.set_rc(1, 1000)
tstart = self.get_sim_time()
while self.get_sim_time_cached() - tstart < timeout:
m = self.assert_receive_message('VFR_HUD')
self.progress("Current speed: %f" % m.groundspeed)
self.disarm_vehicle()
def drive_mission(self, filename, strict=True, ignore_MANUAL_mode_change=False):
"""Drive a mission from a file."""
self.progress("Driving mission %s" % filename)
wp_count = self.load_mission(filename, strict=strict)
self.wait_ready_to_arm()
self.arm_vehicle()
self.change_mode('AUTO')
self.wait_waypoint(1, wp_count-1, max_dist=5, ignore_MANUAL_mode_change=ignore_MANUAL_mode_change)
self.wait_statustext("Mission Complete", timeout=600)
self.disarm_vehicle()
self.progress("Mission OK")
def DriveMission(self):
'''Drive Mission rover1.txt'''
self.drive_mission("rover1.txt", strict=False)
def GripperMission(self):
'''Test Gripper Mission Items'''
self.load_mission("rover-gripper-mission.txt")
self.change_mode('AUTO')
self.wait_ready_to_arm()
self.arm_vehicle()
self.context_collect('STATUSTEXT')
self.wait_statustext("Gripper Grabbed", timeout=60, check_context=True)
self.wait_statustext("Gripper Released", timeout=60, check_context=True)
self.wait_statustext("Mission Complete", timeout=60, check_context=True)
self.disarm_vehicle()
def _MAV_CMD_DO_SEND_BANNER(self, run_cmd):
'''Get Banner'''
self.context_push()
self.context_collect('STATUSTEXT')
run_cmd(mavutil.mavlink.MAV_CMD_DO_SEND_BANNER)
self.wait_statustext("ArduRover", timeout=1, check_context=True)
self.context_pop()
def MAV_CMD_DO_SEND_BANNER(self):
'''test MAV_CMD_DO_SEND_BANNER'''
self._MAV_CMD_DO_SEND_BANNER(self.run_cmd)
self._MAV_CMD_DO_SEND_BANNER(self.run_cmd_int)
def drive_brake_get_stopping_distance(self, speed):
'''measure our stopping distance'''
self.context_push()
self.set_parameters({
'CRUISE_SPEED': speed*1.2,
'ATC_ACCEL_MAX': 15,
})
self.change_mode("STEERING")
self.set_rc(3, 2000)
self.wait_groundspeed(15, 100)
initial = self.mav.location()
initial_time = time.time()
while time.time() - initial_time < 2:
start = self.mav.location()
if start != initial:
break
self.set_rc(3, 1500)
self.wait_groundspeed(0, 0.2)
initial = self.mav.location()
initial_time = time.time()
while time.time() - initial_time < 2:
stop = self.mav.location()
if stop != initial:
break
delta = self.get_distance(start, stop)
self.context_pop()
return delta
def DriveBrake(self):
'''Test braking'''
self.set_parameters({
'CRUISE_SPEED': 15,
'ATC_BRAKE': 0,
})
self.arm_vehicle()
distance_without_brakes = self.drive_brake_get_stopping_distance(15)
self.set_parameter('ATC_BRAKE', 1)
distance_with_brakes = self.drive_brake_get_stopping_distance(15)
delta = distance_without_brakes - distance_with_brakes
self.disarm_vehicle()
if delta < distance_without_brakes * 0.05:
raise NotAchievedException("""
Brakes have negligible effect (with=%0.2fm without=%0.2fm delta=%0.2fm)
""" %
(distance_with_brakes,
distance_without_brakes,
delta))
self.progress(
"Brakes work (with=%0.2fm without=%0.2fm delta=%0.2fm)" %
(distance_with_brakes, distance_without_brakes, delta))
def drive_rtl_mission_max_distance_from_home(self):
'''maximum distance allowed from home at end'''
return 6.5
def DriveRTL(self, timeout=120):
'''Drive an RTL Mission'''
self.wait_ready_to_arm()
self.arm_vehicle()
self.load_mission("rtl.txt")
self.change_mode("AUTO")
tstart = self.get_sim_time()
while True:
now = self.get_sim_time_cached()
if now - tstart > timeout:
raise AutoTestTimeoutException("Didn't see wp 3")
m = self.assert_receive_message('MISSION_CURRENT')
self.progress("MISSION_CURRENT: %s" % str(m))
if m.seq == 3:
break
self.drain_mav()
m = self.assert_receive_message('NAV_CONTROLLER_OUTPUT')
wp_dist_min = 5
if m.wp_dist < wp_dist_min:
raise PreconditionFailedException(
"Did not start at least %f metres from destination (is=%f)" %
(wp_dist_min, m.wp_dist))
self.progress("NAV_CONTROLLER_OUTPUT.wp_dist looks good (%u >= %u)" %
(m.wp_dist, wp_dist_min,))
self.wait_statustext("Mission Complete", timeout=70)
self.wait_groundspeed(0, 0.5, timeout=600)
home_distance = self.distance_to_home()
home_distance_min = 5.5
home_distance_max = self.drive_rtl_mission_max_distance_from_home()
if home_distance > home_distance_max:
raise NotAchievedException(
"Did not stop near home (%f metres distant (%f > want > %f))" %
(home_distance, home_distance_min, home_distance_max))
self.disarm_vehicle()
self.progress("RTL Mission OK (%fm)" % home_distance)
def RTL_SPEED(self, timeout=120):
'''Test RTL_SPEED is honoured'''
self.upload_simple_relhome_mission([
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 300, 0, 0),
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 1000, 0, 0),
])
self.wait_ready_to_arm()
self.arm_vehicle()
self.change_mode('AUTO')
self.wait_current_waypoint(2, timeout=120)
for speed in 1, 5.5, 1.5, 7.5:
self.set_parameter("RTL_SPEED", speed)
self.change_mode('RTL')
self.wait_groundspeed(speed-0.1, speed+0.1, minimum_duration=10)
self.change_mode('HOLD')
self.do_RTL()
self.disarm_vehicle()
def AC_Avoidance(self):
'''Test AC Avoidance switch'''
self.load_fence("rover-fence-ac-avoid.txt")
self.set_parameters({
"FENCE_ENABLE": 0,
"PRX1_TYPE": 10,
"RC10_OPTION": 40,
})
self.reboot_sitl()
self.wait_ready_to_arm()
self.arm_vehicle()
self.set_rc(10, 1000)
self.change_mode("ACRO")
self.set_rc(3, 1550)
self.wait_distance_to_home(25, 100000, timeout=60)
self.change_mode("RTL")
self.wait_statustext("Reached destination", timeout=60)
self.set_rc(10, 2000)
self.change_mode("ACRO")
self.wait_groundspeed(0, 0.7, timeout=60)
self.wait_groundspeed(0, 0.2, timeout=120)
self.disarm_vehicle()
def ServoRelayEvents(self):
'''Test ServoRelayEvents'''
for method in self.run_cmd, self.run_cmd_int:
self.context_push()
self.set_parameters({
"RELAY1_FUNCTION": 1,
"RELAY2_FUNCTION": 1,
})
self.reboot_sitl()
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=0, p2=0)
off = self.get_parameter("SIM_PIN_MASK")
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=0, p2=1)
on = self.get_parameter("SIM_PIN_MASK")
if on == off:
raise NotAchievedException(
"Pin mask unchanged after relay cmd")
self.progress("Pin mask changed after relay command")
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=0, p2=0)
self.set_message_rate_hz("RELAY_STATUS", 10)
self.assert_received_message_field_values('RELAY_STATUS', {
"present": 3,
"on": 0,
})
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=0, p2=1)
self.assert_received_message_field_values('RELAY_STATUS', {
"present": 3,
"on": 1,
})
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=1, p2=1)
self.assert_received_message_field_values('RELAY_STATUS', {
"present": 3,
"on": 3,
})
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=0, p2=0)
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=1, p2=0)
self.assert_received_message_field_values('RELAY_STATUS', {
"present": 3,
"on": 0,
})
self.set_parameters({
"RELAY6_FUNCTION": 1,
"RELAY6_PIN": 14,
})
self.reboot_sitl()
self.set_message_rate_hz("RELAY_STATUS", 10)
self.assert_received_message_field_values('RELAY_STATUS', {
"present": 35,
"on": 0,
})
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=5, p2=1)
self.assert_received_message_field_values('RELAY_STATUS', {
"present": 35,
"on": 32,
})
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=0, p2=1)
self.assert_received_message_field_values('RELAY_STATUS', {
"present": 35,
"on": 33,
})
method(mavutil.mavlink.MAV_CMD_DO_SET_RELAY, p1=5, p2=0)
self.assert_received_message_field_values('RELAY_STATUS', {
"present": 35,
"on": 1,
})
self.start_subtest("test MAV_CMD_DO_REPEAT_RELAY")
self.context_push()
self.set_parameter("SIM_SPEEDUP", 1)
method(
mavutil.mavlink.MAV_CMD_DO_REPEAT_RELAY,
p1=0,
p2=5,
p3=0.5,
)
for value in 0, 1, 0, 1, 0, 1, 0, 1:
self.wait_message_field_values('RELAY_STATUS', {
"on": value,
})
self.context_pop()
self.delay_sim_time(3)
self.assert_received_message_field_values('RELAY_STATUS', {
"on": 1,
})
self.context_pop()
self.start_subtest("test MAV_CMD_DO_SET_SERVO")
for value in 1678, 2300, 0:
method(mavutil.mavlink.MAV_CMD_DO_SET_SERVO, p1=13, p2=value)
self.wait_servo_channel_value(13, value)
self.start_subtest("test MAV_CMD_DO_REPEAT_SERVO")
self.context_push()
self.set_parameter("SIM_SPEEDUP", 1)
trim = self.get_parameter("SERVO13_TRIM")
value = 2000
method(
mavutil.mavlink.MAV_CMD_DO_REPEAT_SERVO,
p1=12,
p2=value,
p3=5,
p4=0.5,
)
for value in trim, value, trim, value, trim, value, trim, value:
self.wait_servo_channel_value(12, value)
self.context_pop()
self.set_message_rate_hz("RELAY_STATUS", 0)
def MAVProxy_SetModeUsingSwitch(self):
"""Set modes via mavproxy switch"""
port = self.sitl_rcin_port(offset=1)
self.customise_SITL_commandline([
"--rc-in-port", str(port),
])
ex = None
try:
self.load_mission(self.arming_test_mission())
self.wait_ready_to_arm()
fnoo = [(1, 'MANUAL'),
(2, 'MANUAL'),
(3, 'RTL'),
(4, 'AUTO'),
(5, 'AUTO'),
(6, 'MANUAL')]
mavproxy = self.start_mavproxy(sitl_rcin_port=port)
for (num, expected) in fnoo:
mavproxy.send('switch %u\n' % num)
self.wait_mode(expected)
self.stop_mavproxy(mavproxy)
except Exception as e:
self.print_exception_caught(e)
ex = e
self.customise_SITL_commandline([
])
if ex is not None:
raise ex
def MAVProxy_SetModeUsingMode(self):
'''Set modes via mavproxy mode command'''
fnoo = [(1, 'ACRO'),
(3, 'STEERING'),
(4, 'HOLD'),
]
mavproxy = self.start_mavproxy()
for (num, expected) in fnoo:
mavproxy.send('mode manual\n')
self.wait_mode("MANUAL")
mavproxy.send('mode %u\n' % num)
self.wait_mode(expected)
mavproxy.send('mode manual\n')
self.wait_mode("MANUAL")
mavproxy.send('mode %s\n' % expected)
self.wait_mode(expected)
self.stop_mavproxy(mavproxy)
def ModeSwitch(self):
''''Set modes via modeswitch'''
self.set_parameter("MODE_CH", 8)
self.set_rc(8, 1000)
self.set_parameter("MODE6", 4)
self.set_parameter("MODE5", 1)
self.set_rc(8, 1800)
self.wait_mode("HOLD")
self.set_rc(8, 1700)
self.wait_mode("ACRO")
self.set_rc(8, 1800)
self.wait_mode("HOLD")
self.set_rc(8, 1700)
self.wait_mode("ACRO")
def AuxModeSwitch(self):
'''Set modes via auxswitches'''
mapping = [0, 1165, 1295, 1425, 1555, 1685, 1815]
self.set_parameter("MODE1", 1)
self.set_rc(8, mapping[1])
self.wait_mode('ACRO')
self.set_rc(9, 1000)
self.set_rc(10, 1000)
self.set_parameters({
"RC9_OPTION": 53,
"RC10_OPTION": 54,
})
self.set_rc(9, 1900)
self.wait_mode("STEERING")
self.set_rc(10, 1900)
self.wait_mode("HOLD")
self.set_rc(9, 1000)
self.set_rc(10, 1000)
self.wait_mode("ACRO")
self.set_rc(9, 1900)
self.wait_mode("STEERING")
self.set_rc(10, 1900)
self.wait_mode("HOLD")
self.set_rc(10, 1000)
self.wait_mode("ACRO")
self.set_rc(9, 1000)
def RCOverridesCancel(self):
'''Test RC overrides Cancel'''
self.set_parameter("MAV_GCS_SYSID", self.mav.source_system)
self.change_mode('MANUAL')
self.wait_ready_to_arm()
self.zero_throttle()
self.arm_vehicle()
normal_rc_throttle = 1700
throttle_override = 1900
self.progress("Establishing baseline RC input")
self.set_rc(3, normal_rc_throttle)
self.drain_mav()
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 10:
raise AutoTestTimeoutException("Did not get rc change")
m = self.assert_receive_message('RC_CHANNELS')
if m.chan3_raw == normal_rc_throttle:
break
self.progress("Set override with RC_CHANNELS_OVERRIDE")
self.drain_mav()
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 10:
raise AutoTestTimeoutException("Did not override")
self.progress("Sending throttle of %u" % (throttle_override,))
self.mav.mav.rc_channels_override_send(
1,
1,
65535,
65535,
throttle_override,
65535,
65535,
65535,
65535,
65535)
m = self.assert_receive_message('RC_CHANNELS')
self.progress("chan3=%f want=%f" % (m.chan3_raw, throttle_override))
if m.chan3_raw == throttle_override:
break
self.progress("disabling override and making sure we revert to RC input in good time")
self.drain_mav()
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 0.5:
raise AutoTestTimeoutException("Did not cancel override")
self.progress("Sending cancel of throttle override")
self.mav.mav.rc_channels_override_send(
1,
1,
65535,
65535,
0,
65535,
65535,
65535,
65535,
65535)
self.do_timesync_roundtrip()
m = self.assert_receive_message('RC_CHANNELS')
self.progress("chan3=%f want=%f" % (m.chan3_raw, normal_rc_throttle))
if m.chan3_raw == normal_rc_throttle:
break
self.disarm_vehicle()
def RCOverrides(self):
'''Test RC overrides'''
self.set_parameter("MAV_GCS_SYSID", self.mav.source_system)
self.set_parameter("RC12_OPTION", 46)
self.reboot_sitl()
self.change_mode('MANUAL')
self.wait_ready_to_arm()
self.set_rc(3, 1500)
self.arm_vehicle()
normal_rc_throttle = 1700
self.set_rc(3, normal_rc_throttle)
self.wait_groundspeed(5, 100)
self.set_rc(12, 2000)
throttle_override = 1500
tstart = self.get_sim_time_cached()
while True:
if self.get_sim_time_cached() - tstart > 10:
raise AutoTestTimeoutException("Did not reach speed")
self.progress("Sending throttle of %u" % (throttle_override,))
self.mav.mav.rc_channels_override_send(
1,
1,
65535,
65535,
throttle_override,
65535,
65535,
65535,
65535,
65535)
m = self.assert_receive_message('VFR_HUD')
want_speed = 2.0
self.progress("Speed=%f want=<%f" % (m.groundspeed, want_speed))
if m.groundspeed < want_speed:
break
self.set_rc(12, 1000)
throttle_override = 1500
tstart = self.get_sim_time_cached()
while True:
if self.get_sim_time_cached() - tstart > 10:
raise AutoTestTimeoutException("Did not speed back up")
self.progress("Sending throttle of %u" % (throttle_override,))
self.mav.mav.rc_channels_override_send(
1,
1,
65535,
65535,
throttle_override,
65535,
65535,
65535,
65535,
65535)
m = self.assert_receive_message('VFR_HUD')
want_speed = 5.0
self.progress("Speed=%f want=>%f" % (m.groundspeed, want_speed))
if m.groundspeed > want_speed:
break
self.set_rc(12, 2000)
self.progress("Waiting for RC to revert to normal RC input")
self.wait_rc_channel_value(3, normal_rc_throttle, timeout=10)
self.start_subtest("Check override time of zero disables overrides")
old = self.get_parameter("RC_OVERRIDE_TIME")
ch = 2
self.set_rc(ch, 1000)
channels = [65535] * 18
ch_override_value = 1700
channels[ch-1] = ch_override_value
channels[7] = 1234
self.progress("Sending override message %u" % ch_override_value)
self.mav.mav.rc_channels_override_send(
1,
1,
*channels
)
self.wait_rc_channel_value(ch, ch_override_value, timeout=30)
self.set_parameter("RC_OVERRIDE_TIME", 0)
self.wait_rc_channel_value(ch, 1000)
self.set_parameter("RC_OVERRIDE_TIME", old)
self.wait_rc_channel_value(ch, ch_override_value)
ch_override_value = 1720
channels[ch-1] = ch_override_value
self.progress("Sending override message %u" % ch_override_value)
self.mav.mav.rc_channels_override_send(
1,
1,
*channels
)
self.wait_rc_channel_value(ch, ch_override_value, timeout=10)
self.set_parameter("RC_OVERRIDE_TIME", 0)
self.wait_rc_channel_value(ch, 1000)
self.set_parameter("RC_OVERRIDE_TIME", old)
self.progress("Ensuring timeout works")
self.wait_rc_channel_value(ch, 1000, timeout=5)
self.delay_sim_time(10)
self.set_parameter("RC_OVERRIDE_TIME", 10)
self.progress("Sending override message")
ch_override_value = 1730
channels[ch-1] = ch_override_value
self.progress("Sending override message %u" % ch_override_value)
self.mav.mav.rc_channels_override_send(
1,
1,
*channels
)
self.wait_rc_channel_value(ch, ch_override_value, timeout=10)
tstart = self.get_sim_time()
self.progress("Waiting for channel to revert to 1000 in ~10s")
self.wait_rc_channel_value(ch, 1000, timeout=15)
delta = self.get_sim_time() - tstart
if delta > 12:
raise NotAchievedException("Took too long to revert RC channel value (delta=%f)" % delta)
min_delta = 9
if delta < min_delta:
raise NotAchievedException("Didn't take long enough to revert RC channel value (delta=%f want>=%f)" %
(delta, min_delta))
self.progress("Disabling RC override timeout")
self.set_parameter("RC_OVERRIDE_TIME", -1)
ch_override_value = 1740
channels[ch-1] = ch_override_value
self.progress("Sending override message %u" % ch_override_value)
self.mav.mav.rc_channels_override_send(
1,
1,
*channels
)
self.wait_rc_channel_value(ch, ch_override_value, timeout=10)
tstart = self.get_sim_time()
while True:
delta = self.get_sim_time() - tstart
if delta > 20:
break
m = self.assert_receive_message('RC_CHANNELS')
channel_field = "chan%u_raw" % ch
m_value = getattr(m, channel_field)
if m_value != ch_override_value:
raise NotAchievedException("Value reverted after %f seconds when it should not have (got=%u) (want=%u)" % (delta, m_value, ch_override_value))
self.set_parameter("RC_OVERRIDE_TIME", old)
self.delay_sim_time(10)
self.start_subtest("Checking higher-channel semantics")
self.context_push()
self.set_parameter("RC_OVERRIDE_TIME", 30)
ch = 11
rc_value = 1010
self.set_rc(ch, rc_value)
channels = [65535] * 18
ch_override_value = 1234
channels[ch-1] = ch_override_value
self.progress("Sending override message ch%u=%u" % (ch, ch_override_value))
self.mav.mav.rc_channels_override_send(
1,
1,
*channels
)
self.progress("Wait for override value")
self.wait_rc_channel_value(ch, ch_override_value, timeout=10)
self.progress("Sending return-to-RC-input value")
channels[ch-1] = 65534
self.mav.mav.rc_channels_override_send(
1,
1,
*channels
)
self.wait_rc_channel_value(ch, rc_value, timeout=10)
channels[ch-1] = ch_override_value
self.progress("Sending override message ch%u=%u" % (ch, ch_override_value))
self.mav.mav.rc_channels_override_send(
1,
1,
*channels
)
self.progress("Wait for override value")
self.wait_rc_channel_value(ch, ch_override_value, timeout=10)
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 10:
break
ignore_value = 0
if self.get_sim_time_cached() - tstart > 5:
ignore_value = 65535
self.progress("Sending ignore value %u" % ignore_value)
channels[ch-1] = ignore_value
self.mav.mav.rc_channels_override_send(
1,
1,
*channels
)
if self.get_rc_channel_value(ch) != ch_override_value:
raise NotAchievedException("Did not maintain value")
self.context_pop()
self.end_subtest("Checking higher-channel semantics")
self.disarm_vehicle()
def MANUAL_CONTROL(self):
'''Test mavlink MANUAL_CONTROL'''
self.set_parameters({
"MAV_GCS_SYSID": self.mav.source_system,
"RC12_OPTION": 46,
})
self.reboot_sitl()
self.change_mode("MANUAL")
self.wait_ready_to_arm()
self.arm_vehicle()
self.progress("start moving forward a little")
normal_rc_throttle = 1700
self.set_rc(3, normal_rc_throttle)
self.wait_groundspeed(5, 100)
self.progress("allow overrides")
self.set_rc(12, 2000)
self.progress("now override to stop")
throttle_override_normalized = 0
expected_throttle = 0
tstart = self.get_sim_time_cached()
while True:
if self.get_sim_time_cached() - tstart > 10:
raise AutoTestTimeoutException("Did not reach speed")
self.progress("Sending normalized throttle of %d" % (throttle_override_normalized,))
self.mav.mav.manual_control_send(
1,
32767,
32767,
throttle_override_normalized,
32767,
0)
m = self.assert_receive_message('VFR_HUD')
want_speed = 2.0
self.progress("Speed=%f want=<%f throttle=%u want=%u" %
(m.groundspeed, want_speed, m.throttle, expected_throttle))
if m.groundspeed < want_speed and m.throttle == expected_throttle:
break
self.progress("now override to stop - but set the switch on the RC transmitter to deny overrides; this should send the speed back up to 5 metres/second")
self.set_rc(12, 1000)
throttle_override_normalized = 500
expected_throttle = 36
tstart = self.get_sim_time_cached()
while True:
if self.get_sim_time_cached() - tstart > 10:
raise AutoTestTimeoutException("Did not stop")
self.progress("Sending normalized throttle of %u" % (throttle_override_normalized,))
self.mav.mav.manual_control_send(
1,
32767,
32767,
throttle_override_normalized,
32767,
0)
m = self.assert_receive_message('VFR_HUD')
want_speed = 5.0
self.progress("Speed=%f want=>%f throttle=%u want=%u" %
(m.groundspeed, want_speed, m.throttle, expected_throttle))
if m.groundspeed > want_speed and m.throttle == expected_throttle:
break
self.set_rc(12, 2000)
self.progress("Waiting for RC to revert to normal RC input")
self.wait_rc_channel_value(3, normal_rc_throttle, timeout=10)
self.disarm_vehicle()
def CameraMission(self):
'''Test Camera Mission Items'''
self.set_parameter("CAM1_TYPE", 1)
self.reboot_sitl()
self.load_mission("rover-camera-mission.txt")
self.wait_ready_to_arm()
self.change_mode("AUTO")
self.wait_ready_to_arm()
self.arm_vehicle()
prev_cf = None
while True:
cf = self.assert_receive_message('CAMERA_FEEDBACK')
if prev_cf is None:
prev_cf = cf
continue
dist_travelled = self.get_distance_int(prev_cf, cf)
prev_cf = cf
mc = self.mav.messages.get("MISSION_CURRENT", None)
if mc is None:
continue
elif mc.seq == 2:
expected_distance = 2
elif mc.seq == 4:
expected_distance = 5
elif mc.seq == 5:
break
else:
continue
self.progress("Expected distance %f got %f" %
(expected_distance, dist_travelled))
error = abs(expected_distance - dist_travelled)
max_error = 1.5
if error > max_error:
raise NotAchievedException("Camera distance error: %f (%f)" %
(error, max_error))
self.disarm_vehicle()
def DO_SET_MODE(self):
'''Set mode via MAV_COMMAND_DO_SET_MODE'''
self.do_set_mode_via_command_long("HOLD")
self.do_set_mode_via_command_long("MANUAL")
self.do_set_mode_via_command_int("HOLD")
self.do_set_mode_via_command_int("MANUAL")
def RoverInitialMode(self):
'''test INITIAL_MODE parameter works'''
self.wait_ready_to_arm()
mapping = [0, 1165, 1295, 1425, 1555, 1685, 1815]
mode_ch = 8
throttle_ch = 3
self.set_parameter('MODE5', 3)
self.set_rc(mode_ch, mapping[5])
self.wait_mode('STEERING')
self.set_rc(mode_ch, mapping[6])
self.wait_mode('MANUAL')
self.set_parameter("INITIAL_MODE", 1)
self.set_parameter('FS_ACTION', 0)
self.set_rc(throttle_ch, 900)
self.reboot_sitl()
self.wait_mode(1)
self.progress("Make sure we stay in this mode")
self.delay_sim_time(5)
self.wait_mode(1)
self.set_rc(throttle_ch, 1100)
self.delay_sim_time(3)
self.set_rc(mode_ch, mapping[5])
self.wait_mode('STEERING')
def MAVProxy_DO_SET_MODE(self):
'''Set mode using MAVProxy commandline DO_SET_MODE'''
mavproxy = self.start_mavproxy()
self.mavproxy_do_set_mode_via_command_long(mavproxy, "HOLD")
self.mavproxy_do_set_mode_via_command_long(mavproxy, "MANUAL")
self.stop_mavproxy(mavproxy)
def MAV_GCS_ENFORCE(self):
'''Test enforcement of strict mode for MAV_GCS_SYSID'''
'''Run the same arming code with correct then incorrect SYSID'''
if self.mav.source_system != self.mav.mav.srcSystem:
raise PreconditionFailedException("Expected mav.source_system and mav.srcSystem to match")
self.context_push()
old_srcSystem = self.mav.mav.srcSystem
ex = None
try:
self.set_parameter("MAV_GCS_SYSID", self.mav.source_system)
self.set_parameter("MAV_OPTIONS", 1, add_to_context=False)
self.change_mode('MANUAL')
self.progress("make sure I can arm ATM")
self.wait_ready_to_arm()
self.arm_vehicle(timeout=5)
self.disarm_vehicle()
self.do_timesync_roundtrip()
self.progress("Attempting to arm vehicle from bad system-id")
success = None
try:
self.mav.mav.srcSystem = 72
self.arm_vehicle(timeout=5)
self.disarm_vehicle()
success = False
except AutoTestTimeoutException:
success = True
self.mav.mav.srcSystem = old_srcSystem
if not success:
raise NotAchievedException("Managed to arm with MAV_GCS enforce set")
self.progress("Attempting to arm vehicle from vehicle component")
comp_arm_exception = None
try:
self.mav.mav.srcSystem = 1
self.arm_vehicle(timeout=5)
self.disarm_vehicle()
except Exception as e:
comp_arm_exception = e
self.mav.mav.srcSystem = old_srcSystem
if comp_arm_exception is not None:
raise comp_arm_exception
except Exception as e:
self.print_exception_caught(e)
ex = e
self.mav.mav.srcSystem = old_srcSystem
self.set_parameter("MAV_OPTIONS", 0, add_to_context=False)
self.context_pop()
if ex is not None:
raise ex
def Rally(self):
'''Test Rally Points'''
self.load_rally_using_mavproxy("rover-test-rally.txt")
self.assert_parameter_value('RALLY_TOTAL', 2)
self.wait_ready_to_arm()
self.arm_vehicle()
accuracy = self.get_parameter("WP_RADIUS")
self.reach_heading_manual(10)
self.reach_distance_manual(50)
self.change_mode("RTL")
loc = mavutil.location(40.071553,
-105.229401,
1583,
0)
self.wait_location(loc, accuracy=accuracy, minimum_duration=10, timeout=45)
self.disarm_vehicle()
def fence_with_bad_frame(self, target_system=1, target_component=1):
return [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
def fence_with_zero_vertex_count(self, target_system=1, target_component=1):
return [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
0,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
def fence_with_wrong_vertex_count(self, target_system=1, target_component=1):
return [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
2,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
def fence_with_multiple_return_points(self, target_system=1, target_component=1):
return [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
def fence_with_invalid_latlon(self, target_system=1, target_component=1):
return [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(100 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
def fence_with_multiple_return_points_with_bad_sequence_numbers(self, target_system=1, target_component=1):
return [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(1.0 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(2.0 * 1e7),
int(2.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
def fence_which_exceeds_storage_space(self, target_system=1, target_component=1):
ret = []
for i in range(0, 60):
ret.append(self.mav.mav.mission_item_int_encode(
target_system,
target_component,
i,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_EXCLUSION,
0,
0,
10,
0,
0,
0,
int(1.0 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
)
return ret
def fences_which_should_not_upload(self, target_system=1, target_component=1):
return [
("Bad Frame", self.fence_with_bad_frame(
target_system=target_system,
target_component=target_component)),
("Zero Vertex Count", self.fence_with_zero_vertex_count(
target_system=target_system,
target_component=target_component)),
("Wrong Vertex Count", self.fence_with_wrong_vertex_count(
target_system=target_system,
target_component=target_component)),
("Multiple return points", self.fence_with_multiple_return_points(
target_system=target_system,
target_component=target_component)),
("Invalid lat/lon", self.fence_with_invalid_latlon(
target_system=target_system,
target_component=target_component)),
("Multiple Return points with bad sequence numbers",
self.fence_with_multiple_return_points_with_bad_sequence_numbers(
target_system=target_system,
target_component=target_component)),
("Fence which exceeds storage space",
self.fence_which_exceeds_storage_space(
target_system=target_system,
target_component=target_component)),
]
def fence_with_single_return_point(self, target_system=1, target_component=1):
return [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
def fence_with_single_return_point_and_5_vertex_inclusion(self,
target_system=1,
target_component=1):
return [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
5,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
5,
0,
0,
0,
int(1.0001 * 1e7),
int(1.0000 * 1e7),
32.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
3,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
5,
0,
0,
0,
int(1.0001 * 1e7),
int(1.0001 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
4,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
5,
0,
0,
0,
int(1.0002 * 1e7),
int(1.0002 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
5,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
5,
0,
0,
0,
int(1.0002 * 1e7),
int(1.0003 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
def fence_with_many_exclusion_circles(self, count=50, target_system=1, target_component=1):
ret = []
for i in range(0, count):
lat_deg = 1.0003 + count/10
lng_deg = 1.0002 + count/10
item = self.mav.mav.mission_item_int_encode(
target_system,
target_component,
i,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_EXCLUSION,
0,
0,
count,
0,
0,
0,
int(lat_deg * 1e7),
int(lng_deg * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
ret.append(item)
return ret
def fence_with_many_exclusion_polyfences(self, target_system=1, target_component=1):
ret = []
seq = 0
for fencenum in range(0, 4):
pointcount = fencenum + 6
for p in range(0, pointcount):
lat_deg = 1.0003 + p/10 + fencenum/100
lng_deg = 1.0002 + p/10 + fencenum/100
item = self.mav.mav.mission_item_int_encode(
target_system,
target_component,
seq,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION,
0,
0,
pointcount,
0,
0,
0,
int(lat_deg * 1e7),
int(lng_deg * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
ret.append(item)
seq += 1
return ret
def fences_which_should_upload(self, target_system=1, target_component=1):
return [
("Single Return Point",
self.fence_with_single_return_point(
target_system=target_system,
target_component=target_component)),
("Return and 5-vertex-inclusion",
self.fence_with_single_return_point_and_5_vertex_inclusion(
target_system=target_system,
target_component=target_component)),
("Many exclusion circles",
self.fence_with_many_exclusion_circles(
target_system=target_system,
target_component=target_component)),
("Many exclusion polyfences",
self.fence_with_many_exclusion_polyfences(
target_system=target_system,
target_component=target_component)),
("Empty fence", []),
]
def assert_fence_does_not_upload(self, fence, target_system=1, target_component=1):
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
upload_failed = False
try:
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
fence)
except NotAchievedException:
upload_failed = True
if not upload_failed:
raise NotAchievedException("Uploaded fence when should not be possible")
self.progress("Fence rightfully bounced")
def send_fencepoint_expect_statustext(self,
offset,
count,
lat,
lng,
statustext_fragment,
target_system=1,
target_component=1,
timeout=10):
self.mav.mav.fence_point_send(target_system,
target_component,
offset,
count,
lat,
lng)
tstart = self.get_sim_time_cached()
while True:
if self.get_sim_time_cached() - tstart > timeout:
raise NotAchievedException("Did not get error message back")
m = self.assert_receive_message('STATUSTEXT')
self.progress("statustext: %s (want='%s')" %
(str(m), statustext_fragment))
if m is None:
continue
if statustext_fragment in m.text:
break
def GCSFailsafe(self, side=60, timeout=360):
"""Test GCS Failsafe"""
try:
self.test_gcs_failsafe(side=side, timeout=timeout)
except Exception as ex:
self.setGCSfailsafe(0)
self.set_parameter('FS_ACTION', 0)
self.disarm_vehicle(force=True)
self.reboot_sitl()
raise ex
def test_gcs_failsafe(self, side=60, timeout=360):
self.set_parameter("MAV_GCS_SYSID", self.mav.source_system)
self.set_parameter("FS_ACTION", 1)
self.set_parameter("FS_THR_ENABLE", 0)
def go_somewhere():
self.change_mode("MANUAL")
self.wait_ready_to_arm()
self.arm_vehicle()
self.set_rc(3, 2000)
self.delay_sim_time(5)
self.set_rc(3, 1500)
self.start_subtest("GCS failsafe disabled test: FS_GCS_ENABLE=0 should take no failsafe action")
self.setGCSfailsafe(0)
go_somewhere()
self.set_heartbeat_rate(0)
self.delay_sim_time(5)
self.wait_mode("MANUAL")
self.set_heartbeat_rate(self.speedup)
self.delay_sim_time(5)
self.wait_mode("MANUAL")
self.end_subtest("Completed GCS failsafe disabled test")
self.start_subtest("GCS failsafe RTL with no options test: FS_GCS_ENABLE=1")
self.setGCSfailsafe(1)
self.set_heartbeat_rate(0)
self.wait_mode("RTL")
self.wait_statustext("Reached destination", timeout=60)
self.set_heartbeat_rate(self.speedup)
self.wait_statustext("GCS Failsafe Cleared", timeout=60)
self.end_subtest("Completed GCS failsafe RTL")
self.start_subtest("GCS failsafe invalid value with no options test: FS_GCS_ENABLE=99")
self.setGCSfailsafe(99)
go_somewhere()
self.set_heartbeat_rate(0)
self.wait_mode("RTL")
self.wait_statustext("Reached destination", timeout=60)
self.set_heartbeat_rate(self.speedup)
self.wait_statustext("GCS Failsafe Cleared", timeout=60)
self.end_subtest("Completed GCS failsafe invalid value")
self.start_subtest("Testing continue in auto mission")
self.disarm_vehicle()
self.setGCSfailsafe(2)
self.load_mission("test_arming.txt")
self.change_mode("AUTO")
self.delay_sim_time(5)
self.set_heartbeat_rate(0)
self.wait_statustext("Failsafe - Continuing Auto Mode", timeout=60)
self.delay_sim_time(5)
self.wait_mode("AUTO")
self.set_heartbeat_rate(self.speedup)
self.wait_statustext("GCS Failsafe Cleared", timeout=60)
self.start_subtest("GCS failsafe RTL with no options test: FS_GCS_ENABLE=1 and FS_GCS_TIMEOUT=10")
self.setGCSfailsafe(1)
old_gcs_timeout = self.get_parameter("FS_GCS_TIMEOUT")
new_gcs_timeout = old_gcs_timeout * 2
self.set_parameter("FS_GCS_TIMEOUT", new_gcs_timeout)
go_somewhere()
self.set_heartbeat_rate(0)
self.delay_sim_time(old_gcs_timeout + (new_gcs_timeout - old_gcs_timeout) / 2)
self.assert_mode("MANUAL")
self.wait_mode("RTL")
self.wait_statustext("Reached destination", timeout=60)
self.set_heartbeat_rate(self.speedup)
self.wait_statustext("GCS Failsafe Cleared", timeout=60)
self.disarm_vehicle()
self.end_subtest("Completed GCS failsafe RTL")
self.setGCSfailsafe(0)
self.progress("All GCS failsafe tests complete")
def test_gcs_fence_centroid(self, target_system=1, target_component=1):
self.start_subtest("Ensuring if we don't have a centroid it gets calculated")
items = self.test_gcs_fence_need_centroid(
target_system=target_system,
target_component=target_component)
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
items)
centroid = self.get_fence_point(0)
want_lat = 1.0001
want_lng = 1.00005
if abs(centroid.lat - want_lat) > 0.000001:
raise NotAchievedException("Centroid lat not as expected (want=%f got=%f)" % (want_lat, centroid.lat))
if abs(centroid.lng - want_lng) > 0.000001:
raise NotAchievedException("Centroid lng not as expected (want=%f got=%f)" % (want_lng, centroid.lng))
def test_gcs_fence_update_fencepoint(self, target_system=1, target_component=1):
self.start_subtest("Ensuring we can move a fencepoint")
items = self.test_gcs_fence_boring_triangle(
target_system=target_system,
target_component=target_component)
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
items)
item_seq = 2
item = items[item_seq]
print("item is (%s)" % str(item))
self.progress("original x=%d" % item.x)
item.x += int(0.1 * 1e7)
self.progress("new x=%d" % item.x)
self.progress("try to overwrite item %u" % item_seq)
self.mav.mav.mission_write_partial_list_send(
target_system,
target_component,
item_seq,
item_seq,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_FENCE, item_seq)
item.pack(self.mav.mav)
self.mav.mav.send(item)
self.progress("Answered request for fence point %u" % item_seq)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
downloaded_items2 = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
if downloaded_items2[item_seq].x != item.x:
raise NotAchievedException("Item did not update")
self.check_fence_items_same([items[0], items[1], item, items[3]], downloaded_items2)
def test_gcs_fence_boring_triangle(self, target_system=1, target_component=1):
return copy.copy([
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
3,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
3,
0,
0,
0,
int(1.0001 * 1e7),
int(1.0000 * 1e7),
32.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
3,
0,
0,
0,
int(1.0001 * 1e7),
int(1.0001 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
3,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT,
0,
0,
0,
0,
0,
0,
int(1.00015 * 1e7),
int(1.00015 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
])
def test_gcs_fence_need_centroid(self, target_system=1, target_component=1):
return copy.copy([
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
4,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
4,
0,
0,
0,
int(1.0002 * 1e7),
int(1.0000 * 1e7),
32.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
4,
0,
0,
0,
int(1.0002 * 1e7),
int(1.0001 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
3,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
0,
0,
4,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0001 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
])
def click_location_from_item(self, mavproxy, item):
mavproxy.send("click %f %f\n" % (item.x*1e-7, item.y*1e-7))
def test_gcs_fence_via_mavproxy(self, target_system=1, target_component=1):
self.start_subtest("Fence via MAVProxy")
if not self.mavproxy_can_do_mision_item_protocols():
return
mavproxy = self.start_mavproxy()
self.start_subsubtest("fence addcircle")
self.clear_fence_using_mavproxy(mavproxy)
self.delay_sim_time(1)
radius = 20
item = self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_INCLUSION,
0,
0,
radius,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
0.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
print("item is (%s)" % str(item))
self.click_location_from_item(mavproxy, item)
mavproxy.send("fence addcircle inc %u\n" % radius)
self.delay_sim_time(1)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
print("downloaded items: %s" % str(downloaded_items))
self.check_fence_items_same([item], downloaded_items)
radius_exc = 57.3
item2 = self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_EXCLUSION,
0,
0,
radius_exc,
0,
0,
0,
int(1.0017 * 1e7),
int(1.0017 * 1e7),
0.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.click_location_from_item(mavproxy, item2)
mavproxy.send("fence addcircle exc %f\n" % radius_exc)
self.delay_sim_time(1)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
print("downloaded items: %s" % str(downloaded_items))
self.check_fence_items_same([item, item2], downloaded_items)
self.end_subsubtest("fence addcircle")
self.start_subsubtest("fence addpoly")
self.clear_fence_using_mavproxy(mavproxy)
self.delay_sim_time(1)
pointcount = 7
mavproxy.send("fence addpoly inc 20 %u 37.2\n" % pointcount)
self.delay_sim_time(5)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
if len(downloaded_items) != pointcount:
raise NotAchievedException("Did not get expected number of points returned (want=%u got=%u)" %
(pointcount, len(downloaded_items)))
self.end_subsubtest("fence addpoly")
self.start_subsubtest("fence movepolypoint")
self.clear_fence_using_mavproxy(mavproxy)
self.delay_sim_time(1)
triangle = self.test_gcs_fence_boring_triangle(
target_system=target_system,
target_component=target_component)
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
triangle)
mavproxy.send("fence list\n")
self.delay_sim_time(1)
triangle[2].x += 500
triangle[2].y += 700
self.click_location_from_item(mavproxy, triangle[2])
mavproxy.send("fence movepolypoint 0 2\n")
self.delay_sim_time(10)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.check_fence_items_same(triangle, downloaded_items)
self.end_subsubtest("fence movepolypoint")
self.start_subsubtest("fence enable and disable")
mavproxy.send("fence enable\n")
mavproxy.expect("fence enabled")
mavproxy.send("fence disable\n")
mavproxy.expect("fence disabled")
self.end_subsubtest("fence enable and disable")
self.stop_mavproxy(mavproxy)
def GCSFence(self):
'''Upload and download of fence'''
target_system = 1
target_component = 1
self.progress("Testing FENCE_POINT protocol")
self.start_subtest("FENCE_TOTAL manipulation")
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.assert_parameter_value("FENCE_TOTAL", 0)
self.set_parameter("FENCE_TOTAL", 5)
self.assert_parameter_value("FENCE_TOTAL", 5)
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.assert_parameter_value("FENCE_TOTAL", 0)
self.progress("sending out-of-range fencepoint")
self.send_fencepoint_expect_statustext(0,
0,
1.2345,
5.4321,
"index past total",
target_system=target_component,
target_component=target_component)
self.progress("sending another out-of-range fencepoint")
self.send_fencepoint_expect_statustext(0,
1,
1.2345,
5.4321,
"bad count",
target_system=target_component,
target_component=target_component)
self.set_parameter("FENCE_TOTAL", 1)
self.assert_parameter_value("FENCE_TOTAL", 1)
self.send_fencepoint_expect_statustext(0,
1,
1.2345,
5.4321,
"Invalid FENCE_TOTAL",
target_system=target_component,
target_component=target_component)
self.set_parameter("FENCE_TOTAL", 5)
self.progress("Checking default points")
for i in range(5):
m = self.get_fence_point(i)
if m.count != 5:
raise NotAchievedException("Unexpected count in fence point (want=%u got=%u" %
(5, m.count))
if m.lat != 0 or m.lng != 0:
raise NotAchievedException("Unexpected lat/lon in fencepoint")
self.progress("Storing a return point")
self.roundtrip_fencepoint_protocol(0,
5,
1.2345,
5.4321,
target_system=target_system,
target_component=target_component)
lat = 2.345
lng = 4.321
self.roundtrip_fencepoint_protocol(0,
5,
lat,
lng,
target_system=target_system,
target_component=target_component)
if not self.mavproxy_can_do_mision_item_protocols():
self.progress("MAVProxy too old to do fence point protocols")
return
self.progress("Download with new protocol")
items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
if len(items) != 1:
raise NotAchievedException("Unexpected fencepoint count (want=%u got=%u)" % (1, len(items)))
if items[0].command != mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT:
raise NotAchievedException(
"Fence return point not of correct type expected (%u) got %u" %
(items[0].command,
mavutil.mavlink.MAV_CMD_NAV_FENCE_RETURN_POINT))
if items[0].frame != mavutil.mavlink.MAV_FRAME_GLOBAL:
raise NotAchievedException(
"Unexpected frame want=%s got=%s," %
(self.string_for_frame(mavutil.mavlink.MAV_FRAME_GLOBAL),
self.string_for_frame(items[0].frame)))
got_lat = items[0].x
want_lat = lat * 1e7
if abs(got_lat - want_lat) > 1:
raise NotAchievedException("Disagree in lat (got=%f want=%f)" % (got_lat, want_lat))
if abs(items[0].y - lng * 1e7) > 1:
raise NotAchievedException("Disagree in lng")
if items[0].seq != 0:
raise NotAchievedException("Disagree in offset")
self.progress("Downloaded with new protocol OK")
items = self.test_gcs_fence_boring_triangle(
target_system=target_system,
target_component=target_component)
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
items)
self.progress("Download with new protocol")
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
if len(downloaded_items) != len(items):
raise NotAchievedException("Did not download expected number of items (wanted=%u got=%u)" %
(len(items), len(downloaded_items)))
self.assert_parameter_value("FENCE_TOTAL", len(items) + 1)
self.progress("Ensuring fence items match what we sent up")
self.check_fence_items_same(items, downloaded_items)
self.progress("Requesting fence return point")
self.mav.mav.fence_fetch_point_send(target_system,
target_component,
0)
m = self.assert_receive_message("FENCE_POINT")
print("m: %s" % str(m))
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
self.progress("Checking count post-nuke")
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
self.assert_mission_count_on_link(self.mav,
0,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.start_subtest("Ensuring bad fences get bounced")
for fence in self.fences_which_should_not_upload(target_system=target_system, target_component=target_component):
(name, items) = fence
self.progress("Ensuring (%s) gets bounced" % (name,))
self.assert_fence_does_not_upload(items)
self.start_subtest("Ensuring good fences don't get bounced")
for fence in self.fences_which_should_upload(target_system=target_system, target_component=target_component):
(name, items) = fence
self.progress("Ensuring (%s) gets uploaded" % (name,))
self.check_fence_upload_download(items)
self.progress("(%s) uploaded just fine" % (name,))
self.test_gcs_fence_update_fencepoint(target_system=target_system,
target_component=target_component)
self.test_gcs_fence_centroid(target_system=target_system,
target_component=target_component)
self.test_gcs_fence_via_mavproxy(target_system=target_system,
target_component=target_component)
def GCSFenceInvalidPoint(self):
'''Test fetch non-existent fencepoint'''
target_system = 1
target_component = 1
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
]),
])
for i in 0, 1, 2:
self.assert_fetch_mission_item_int(target_system, target_component, i, mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.progress("Requesting invalid point")
self.mav.mav.mission_request_int_send(
target_system,
target_component,
3,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE
)
self.context_push()
self.context_collect('MISSION_COUNT')
m = self.assert_receive_mission_ack(
mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE,
)
self.delay_sim_time(0.1)
found = False
for m in self.context_collection('MISSION_COUNT'):
if m.mission_type != mavutil.mavlink.MAV_MISSION_TYPE_FENCE:
continue
if m.count != 3:
raise NotAchievedException("Unexpected count in MISSION_COUNT")
found = True
self.context_pop()
if not found:
raise NotAchievedException("Did not see correction for fencepoint count")
def Offboard(self, timeout=90):
'''Test Offboard Control'''
self.load_mission("rover-guided-mission.txt")
self.wait_ready_to_arm(require_absolute=True)
self.arm_vehicle()
self.change_mode("AUTO")
offboard_expected_duration = 10
if self.mav.messages.get("SET_POSITION_TARGET_GLOBAL_INT", None):
raise PreconditionFailedException("Already have SET_POSITION_TARGET_GLOBAL_INT")
tstart = self.get_sim_time_cached()
last_heartbeat_sent = 0
got_ptgi = False
magic_waypoint_tstart = 0
magic_waypoint_tstop = 0
while True:
now = self.get_sim_time_cached()
if now - last_heartbeat_sent > 1:
last_heartbeat_sent = now
self.mav.mav.heartbeat_send(mavutil.mavlink.MAV_TYPE_ONBOARD_CONTROLLER,
mavutil.mavlink.MAV_AUTOPILOT_INVALID,
0,
0,
0)
if now - tstart > timeout:
raise AutoTestTimeoutException("Didn't complete")
magic_waypoint = 3
mc = self.mav.recv_match(type=["MISSION_CURRENT", "STATUSTEXT"],
blocking=False)
if mc is not None:
print("%s" % str(mc))
if mc.get_type() == "STATUSTEXT":
if "Mission Complete" in mc.text:
break
continue
if mc.seq == magic_waypoint:
print("At magic waypoint")
if magic_waypoint_tstart == 0:
magic_waypoint_tstart = self.get_sim_time_cached()
ptgi = self.mav.messages.get("POSITION_TARGET_GLOBAL_INT", None)
if ptgi is not None:
got_ptgi = True
elif mc.seq > magic_waypoint:
if magic_waypoint_tstop == 0:
magic_waypoint_tstop = self.get_sim_time_cached()
self.disarm_vehicle()
offboard_duration = magic_waypoint_tstop - magic_waypoint_tstart
if abs(offboard_duration - offboard_expected_duration) > 1:
raise NotAchievedException("Did not stay in offboard control for correct time (want=%f got=%f)" %
(offboard_expected_duration, offboard_duration))
if not got_ptgi:
raise NotAchievedException("Did not get ptgi message")
print("pgti: %s" % str(ptgi))
def assert_mission_count_on_link(self, mav, expected_count, target_system, target_component, mission_type):
self.drain_mav_unparsed(mav=mav, freshen_sim_time=True)
self.progress("waiting for a message - any message....")
m = mav.recv_match(blocking=True, timeout=1)
self.progress("Received (%s)" % str(m))
if not mav.mavlink20():
raise NotAchievedException("Not doing mavlink2")
mav.mav.mission_request_list_send(target_system,
target_component,
mission_type)
self.assert_receive_mission_count_on_link(mav,
expected_count,
target_system,
target_component,
mission_type)
def assert_receive_mission_count_on_link(self,
mav,
expected_count,
target_system,
target_component,
mission_type,
expected_target_system=None,
expected_target_component=None,
timeout=120):
if expected_target_system is None:
expected_target_system = mav.mav.srcSystem
if expected_target_component is None:
expected_target_component = mav.mav.srcComponent
self.progress("Waiting for mission count of (%u) from (%u:%u) to (%u:%u)" %
(expected_count, target_system, target_component, expected_target_system, expected_target_component))
tstart = self.get_sim_time_cached()
while True:
delta = self.get_sim_time_cached() - tstart
if delta > timeout:
raise NotAchievedException("Did not receive MISSION_COUNT on link after %fs" % delta)
m = mav.recv_match(blocking=True, timeout=1)
if m is None:
self.progress("No messages")
continue
if m.get_type() == "MISSION_ACK":
if m.type != mavutil.mavlink.MAV_MISSION_ACCEPTED:
raise NotAchievedException("Expected MAV_MISSION_ACCEPTED, got (%s)" % m)
if m.get_type() == "MISSION_COUNT":
break
if m.target_system != expected_target_system:
raise NotAchievedException("Incorrect target system in MISSION_COUNT (want=%u got=%u)" %
(expected_target_system, m.target_system))
if m.target_component != expected_target_component:
raise NotAchievedException("Incorrect target component in MISSION_COUNT")
if m.mission_type != mission_type:
raise NotAchievedException("Did not get expected mission type (want=%u got=%u)" % (mission_type, m.mission_type))
if m.count != expected_count:
raise NotAchievedException("Bad count received (want=%u got=%u)" %
(expected_count, m.count))
self.progress("Asserted mission count (type=%u) is %u after %fs" % (
(mission_type, m.count, delta)))
def get_mission_item_int_on_link(self, item, mav, target_system, target_component, mission_type, delay_fn=None):
self.drain_mav(mav=mav, unparsed=True)
mav.mav.mission_request_int_send(target_system,
target_component,
item,
mission_type)
m = self.assert_receive_message(
'MISSION_ITEM_INT',
timeout=60,
condition='MISSION_ITEM_INT.mission_type==%u' % mission_type,
delay_fn=delay_fn)
if m is None:
raise NotAchievedException("Did not receive MISSION_ITEM_INT")
if m.mission_type != mission_type:
raise NotAchievedException("Mission item of incorrect type")
if m.target_system != mav.mav.srcSystem:
raise NotAchievedException("Unexpected target system %u want=%u" %
(m.target_system, mav.mav.srcSystem))
if m.seq != item:
raise NotAchievedException(
"Incorrect sequence number on received item got=%u want=%u" %
(m.seq, item))
if m.mission_type != mission_type:
raise NotAchievedException(
"Mission type incorrect on received item (want=%u got=%u)" %
(mission_type, m.mission_type))
if m.target_component != mav.mav.srcComponent:
raise NotAchievedException(
"Unexpected target component %u want=%u" %
(m.target_component, mav.mav.srcComponent))
return m
def get_mission_item_on_link(self, item, mav, target_system, target_component, mission_type):
self.drain_mav(mav=mav, unparsed=True)
mav.mav.mission_request_send(target_system,
target_component,
item,
mission_type)
m = self.assert_receive_message('MISSION_ITEM', timeout=60)
if m.target_system != mav.mav.srcSystem:
raise NotAchievedException("Unexpected target system %u want=%u" %
(m.target_system, mav.mav.srcSystem))
if m.seq != item:
raise NotAchievedException("Incorrect sequence number on received item_int got=%u want=%u" %
(m.seq, item))
if m.mission_type != mission_type:
raise NotAchievedException("Mission type incorrect on received item_int (want=%u got=%u)" %
(mission_type, m.mission_type))
if m.target_component != mav.mav.srcComponent:
raise NotAchievedException("Unexpected target component %u want=%u" %
(m.target_component, mav.mav.srcComponent))
return m
def assert_receive_mission_item_request(self, mission_type, seq):
self.progress("Expecting request for item %u" % seq)
m = self.assert_receive_message('MISSION_REQUEST')
if m.mission_type != mission_type:
raise NotAchievedException("Incorrect mission type (wanted=%u got=%u)" %
(mission_type, m.mission_type))
if m.seq != seq:
raise NotAchievedException("Unexpected sequence number (want=%u got=%u)" % (seq, m.seq))
self.progress("Received item request OK")
def assert_receive_mission_ack(self, mission_type,
want_type=mavutil.mavlink.MAV_MISSION_ACCEPTED,
target_system=None,
target_component=None,
mav=None):
if mav is None:
mav = self.mav
if target_system is None:
target_system = mav.mav.srcSystem
if target_component is None:
target_component = mav.mav.srcComponent
self.progress("Expecting mission ack")
m = self.assert_receive_message(
'MISSION_ACK',
timeout=5,
verbose=True,
)
if m.target_system != target_system:
raise NotAchievedException("ACK not targeted at correct system want=%u got=%u" %
(target_system, m.target_system))
if m.target_component != target_component:
raise NotAchievedException("ACK not targeted at correct component")
if m.mission_type != mission_type:
raise NotAchievedException("Unexpected mission type %u want=%u" %
(m.mission_type, mission_type))
if m.type != want_type:
raise NotAchievedException("Expected ack type %u got %u" %
(want_type, m.type))
return m
def assert_filepath_content(self, filepath, want):
got = pathlib.Path(filepath).read_text()
if want != got:
raise NotAchievedException("Did not get expected file content (want=%s) (got=%s)" % (want, got))
def mavproxy_can_do_mision_item_protocols(self):
return False
if not self.mavproxy_version_gt(1, 8, 69):
self.progress("MAVProxy is too old; skipping tests")
return False
return True
def check_rally_items_same(self, want, got, epsilon=None):
check_atts = ['mission_type', 'command', 'x', 'y', 'z', 'seq', 'param1']
return self.check_mission_items_same(check_atts, want, got, epsilon=epsilon)
def click_three_in(self, mavproxy, target_system=1, target_component=1):
mavproxy.send('rally clear\n')
self.drain_mav()
mavproxy.send("click 1.0 1.0\n")
mavproxy.send("rally add\n")
self.delay_sim_time(1)
mavproxy.send("click 2.0 2.0\n")
mavproxy.send("rally add\n")
self.delay_sim_time(1)
mavproxy.send("click 3.0 3.0\n")
mavproxy.send("rally add\n")
self.delay_sim_time(10)
self.assert_mission_count_on_link(
self.mav,
3,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
)
def GCSRally(self, target_system=1, target_component=1):
'''Upload and download of rally using MAVProxy'''
self.start_subtest("Testing mavproxy CLI for rally points")
mavproxy = self.start_mavproxy()
rallyalt = 87
mavproxy.send(f'set rallyalt {rallyalt}\n')
mavproxy.send('rally clear\n')
self.start_subsubtest("rally add")
mavproxy.send('rally clear\n')
lat_s = "-5.6789"
lng_s = "98.2341"
lat = float(lat_s)
lng = float(lng_s)
mavproxy.send(f'click {lat_s} {lng_s}\n')
self.drain_mav()
mavproxy.send('rally add\n')
self.delay_sim_time(2)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(downloaded_items) != 1:
raise NotAchievedException("Unexpected count (got=%u want=1)" %
(len(downloaded_items), ))
if abs(downloaded_items[0].x - int(lat * 1e7)) > 1:
raise NotAchievedException("Bad rally lat. Want=%d got=%d" %
(int(lat * 1e7), downloaded_items[0].x))
if abs(downloaded_items[0].y - int(lng * 1e7)) > 1:
raise NotAchievedException("Bad rally lng. Want=%d got=%d" %
(int(lng * 1e7), downloaded_items[0].y))
if abs(downloaded_items[0].z - rallyalt) > 1:
raise NotAchievedException(f"Bad rally alt. Want={rallyalt} got={downloaded_items[0].z}")
self.end_subsubtest("rally add")
self.start_subsubtest("rally list")
util.pexpect_drain(mavproxy)
mavproxy.send('rally list\n')
mavproxy.expect(r"Saved 1 rally items to ([^\s]*)\s")
filename = mavproxy.match.group(1)
self.assert_rally_filepath_content(filename, '''QGC WPL 110
0 0 3 5100 0.000000 0.000000 0.000000 0.000000 -5.678900 98.234100 87.000000 0
''')
self.end_subsubtest("rally list")
self.start_subsubtest("rally save")
util.pexpect_drain(mavproxy)
save_tmppath = self.buildlogs_path("rally-testing-tmp.txt")
mavproxy.send('rally save %s\n' % save_tmppath)
mavproxy.expect(r"Saved 1 rally items to ([^\s]*)\s")
filename = mavproxy.match.group(1)
if filename != save_tmppath:
raise NotAchievedException("Bad save filepath; want=%s got=%s" % (save_tmppath, filename))
self.assert_rally_filepath_content(filename, '''QGC WPL 110
0 0 3 5100 0.000000 0.000000 0.000000 0.000000 -5.678900 98.234100 87.000000 0
''')
self.end_subsubtest("rally save")
self.start_subsubtest("rally savecsv")
util.pexpect_drain(mavproxy)
csvpath = self.buildlogs_path("rally-testing-tmp.csv")
mavproxy.send('rally savecsv %s\n' % csvpath)
(major, minor, micro, releaselevel, serial) = sys.version_info
if major == 3 and minor == 12:
expected_content = '''"0","Rel","NAV_RALLY_POINT","0.0","0.0","0.0","0.0","-5.678899899999999","98.2341","87.0"
'''
for i in range(1, 10):
if os.path.exists(csvpath):
break
time.sleep(0.1)
self.assert_filepath_content(csvpath, expected_content)
self.end_subsubtest("rally savecsv")
self.start_subsubtest("rally load")
self.drain_mav()
mavproxy.send('rally clear\n')
self.wait_heartbeat()
self.wait_heartbeat()
self.assert_mission_count_on_link(self.mav,
0,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.start_subtest("Check rally load from filepath")
mavproxy.send('rally load %s\n' % save_tmppath)
mavproxy.expect(r"Loaded 1 rally items from ([^\s]*)\s")
mavproxy.expect("Sent all .* rally items")
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(downloaded_items) != 1:
raise NotAchievedException("Unexpected item count (%u)" % len(downloaded_items))
if abs(int(downloaded_items[0].x) - int(lat * 1e7)) > 3:
raise NotAchievedException("Expected lat=%d got=%d" %
(lat * 1e7, downloaded_items[0].x))
if abs(int(downloaded_items[0].y) - int(lng * 1e7)) > 10:
raise NotAchievedException("Expected lng=%d got=%d" %
(lng * 1e7, downloaded_items[0].y))
self.end_subsubtest("rally load")
self.start_subsubtest("rally changealt")
mavproxy.send('rally clear\n')
mavproxy.send("click 1.0 1.0\n")
mavproxy.send("rally add\n")
mavproxy.send("click 2.0 2.0\n")
mavproxy.send("rally add\n")
self.delay_sim_time(2)
self.assert_mission_count_on_link(
self.mav,
2,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
)
self.drain_mav()
mavproxy.send("rally changealt 1 17.6\n")
self.wait_heartbeat()
self.wait_heartbeat()
mavproxy.send("rally changealt 2 19.1\n")
self.delay_sim_time(2)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(downloaded_items) != 2:
raise NotAchievedException("Unexpected item count (%u)" % len(downloaded_items))
if abs(int(downloaded_items[0].x) - int(1 * 1e7)) > 3:
raise NotAchievedException("Expected lat=%d got=%d" % (1 * 1e7, downloaded_items[0].x))
if abs(int(downloaded_items[0].y) - int(1 * 1e7)) > 10:
raise NotAchievedException("Expected lng=%d got=%d" % (1 * 1e7, downloaded_items[0].y))
if abs(downloaded_items[0].z - int(17.6)) > 0.0001:
raise NotAchievedException("Expected alt=%f got=%f" % (17.6, downloaded_items[0].z))
if abs(int(downloaded_items[1].x) - int(2 * 1e7)) > 3:
raise NotAchievedException("Expected lat=%d got=%d" % (2 * 1e7, downloaded_items[0].x))
if abs(int(downloaded_items[1].y) - int(2 * 1e7)) > 10:
raise NotAchievedException("Expected lng=%d got=%d" % (2 * 1e7, downloaded_items[0].y))
if abs(downloaded_items[1].z - int(19.1)) > 0.0001:
raise NotAchievedException("Expected alt=%f got=%f" % (19.1, downloaded_items[1].z))
self.progress("Now change two at once")
mavproxy.send("rally changealt 1 17.3 2\n")
self.delay_sim_time(2)
mavproxy.send('rally list\n')
self.drain_mav()
self.wait_heartbeat()
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(downloaded_items) != 2:
raise NotAchievedException("Unexpected item count (%u)" % len(downloaded_items))
if abs(int(downloaded_items[0].x) - int(1 * 1e7)) > 3:
raise NotAchievedException("Expected lat=%d got=%d" % (1 * 1e7, downloaded_items[0].x))
if abs(int(downloaded_items[0].y) - int(1 * 1e7)) > 10:
raise NotAchievedException("Expected lng=%d got=%d" % (1 * 1e7, downloaded_items[0].y))
if abs(int(downloaded_items[0].z) - int(17.3)) > 0.0001:
raise NotAchievedException("Expected alt=%f got=%f" % (17.3, downloaded_items[0].z))
if abs(int(downloaded_items[1].x) - int(2 * 1e7)) > 3:
raise NotAchievedException("Expected lat=%d got=%d" % (2 * 1e7, downloaded_items[0].x))
if abs(int(downloaded_items[1].y) - int(2 * 1e7)) > 10:
raise NotAchievedException("Expected lng=%d got=%d" % (2 * 1e7, downloaded_items[0].y))
if abs(int(downloaded_items[1].z) - int(17.3)) > 0.0001:
raise NotAchievedException("Expected alt=%f got=%f" % (17.3, downloaded_items[0].z))
self.end_subsubtest("rally changealt")
self.start_subsubtest("rally move")
mavproxy.send('rally clear\n')
mavproxy.send("click 1.0 1.0\n")
mavproxy.send("rally add\n")
mavproxy.send("click 2.0 2.0\n")
mavproxy.send("rally add\n")
self.delay_sim_time(2)
self.assert_mission_count_on_link(
self.mav,
2,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
)
mavproxy.send("click 3.0 3.0\n")
mavproxy.send("rally move 2\n")
self.delay_sim_time(2)
mavproxy.send("click 4.12345 4.987654\n")
mavproxy.send("rally move 1\n")
self.delay_sim_time(2)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(downloaded_items) != 2:
raise NotAchievedException("Unexpected item count (%u)" % len(downloaded_items))
if downloaded_items[0].x != 41234500:
raise NotAchievedException("Bad latitude")
if downloaded_items[0].y != 49876540:
raise NotAchievedException("Bad longitude")
if downloaded_items[0].z != rallyalt:
raise NotAchievedException("Bad altitude (want=%u got=%u)" %
(90, downloaded_items[0].z))
if downloaded_items[1].x != 30000000:
raise NotAchievedException("Bad latitude")
if downloaded_items[1].y != 30000000:
raise NotAchievedException("Bad longitude")
if downloaded_items[1].z != rallyalt:
raise NotAchievedException("Bad altitude (want=%u got=%u)" %
(rallyalt, downloaded_items[1].z))
self.end_subsubtest("rally move")
self.start_subsubtest("rally remove")
self.click_three_in(mavproxy, target_system=target_system, target_component=target_component)
pure_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.progress("Removing last in list")
mavproxy.send("rally remove 3\n")
self.delay_sim_time(2)
self.assert_mission_count_on_link(
self.mav,
2,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
)
fewer_downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(fewer_downloaded_items) != 2:
raise NotAchievedException("Unexpected download list length")
shorter_items = copy.copy(pure_items)
shorter_items = shorter_items[0:2]
self.check_rally_items_same(shorter_items, fewer_downloaded_items)
self.progress("Removing first in list")
mavproxy.send("rally remove 1\n")
self.delay_sim_time(2)
self.assert_mission_count_on_link(
self.mav,
1,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
)
fewer_downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(fewer_downloaded_items) != 1:
raise NotAchievedException("Unexpected download list length")
shorter_items = shorter_items[1:]
self.check_rally_items_same(shorter_items, fewer_downloaded_items)
self.progress("Removing remaining item")
mavproxy.send("rally remove 1\n")
self.delay_sim_time(2)
self.assert_mission_count_on_link(
self.mav,
0,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
)
self.end_subsubtest("rally remove")
self.start_subsubtest("rally show")
mavproxy.send("rally show %s\n" % save_tmppath)
self.end_subsubtest("rally show")
self.start_subsubtest("rally savelocal")
util.pexpect_drain(mavproxy)
savelocal_path = self.buildlogs_path("rally-testing-tmp-local.txt")
mavproxy.send('rally savelocal %s\n' % savelocal_path)
self.delay_sim_time(2)
self.assert_rally_filepath_content(savelocal_path, '''QGC WPL 110
0 0 3 5100 0.000000 0.000000 0.000000 0.000000 -5.678900 98.234100 87.000000 0
''')
self.end_subsubtest("rally savelocal")
self.start_subsubtest("rally status")
self.click_three_in(mavproxy, target_system=target_system, target_component=target_component)
mavproxy.send("rally status\n")
mavproxy.expect("Have 3 of 3 rally items")
mavproxy.send("rally clear\n")
mavproxy.send("rally status\n")
mavproxy.expect("Have 0 of 0 rally items")
self.end_subsubtest("rally status")
self.start_subsubtest("rally undo")
self.progress("Testing undo-remove")
self.click_three_in(mavproxy, target_system=target_system, target_component=target_component)
pure_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.progress("Removing first in list")
mavproxy.send("rally remove 1\n")
self.delay_sim_time(2)
self.assert_mission_count_on_link(
self.mav,
2,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
)
mavproxy.send("rally undo\n")
self.delay_sim_time(2)
undone_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.check_rally_items_same(pure_items, undone_items)
self.progress("Testing undo-move")
pure_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
mavproxy.send("click 4.12345 4.987654\n")
mavproxy.send("rally move 1\n")
self.delay_sim_time(2)
mavproxy.send("rally undo\n")
self.delay_sim_time(2)
undone_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.check_rally_items_same(pure_items, undone_items)
self.end_subsubtest("rally undo")
self.stop_mavproxy(mavproxy)
def RallyUploadDownload(self, target_system=1, target_component=1):
'''Upload and download of rally'''
old_srcSystem = self.mav.mav.srcSystem
self.drain_mav()
try:
item1_lat = int(2.0000 * 1e7)
items = [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT,
0,
0,
0,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT,
0,
0,
0,
0,
0,
0,
item1_lat,
int(2.0000 * 1e7),
32.0000,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT,
0,
0,
0,
0,
0,
0,
int(3.0000 * 1e7),
int(3.0000 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY),
]
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
items)
downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
print("Got items (%s)" % str(items))
if len(downloaded) != len(items):
raise NotAchievedException(
"Did not download correct number of items want=%u got=%u" %
(len(downloaded), len(items)))
rally_total = self.get_parameter("RALLY_TOTAL")
if rally_total != len(downloaded):
raise NotAchievedException(
"Unexpected rally point count: want=%u got=%u" %
(len(items), rally_total))
self.progress("Pruning count by setting parameter (urgh)")
self.set_parameter("RALLY_TOTAL", 2)
downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(downloaded) != 2:
raise NotAchievedException(
"Failed to prune rally points by setting parameter. want=%u got=%u" %
(2, len(downloaded)))
self.progress("Uploading a third item using old protocol")
new_item2_lat = int(6.0 * 1e7)
self.set_parameter("RALLY_TOTAL", 3)
self.mav.mav.rally_point_send(target_system,
target_component,
2,
3,
new_item2_lat,
int(7.0 * 1e7),
15,
0,
0,
0)
downloaded = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if len(downloaded) != 3:
raise NotAchievedException(
"resetting rally point count didn't change items returned")
if downloaded[2].x != new_item2_lat:
raise NotAchievedException(
"Bad latitude in downloaded item: want=%u got=%u" %
(new_item2_lat, downloaded[2].x))
self.progress("Grabbing original item 1 using original protocol")
self.mav.mav.rally_fetch_point_send(target_system,
target_component,
1)
m = self.assert_receive_message("RALLY_POINT")
if m.target_system != self.mav.source_system:
raise NotAchievedException(
"Bad target_system on received rally point (want=%u got=%u)" %
(255, m.target_system))
if m.target_component != self.mav.source_component:
raise NotAchievedException("Bad target_component on received rally point")
if m.lat != item1_lat:
raise NotAchievedException("Bad latitude on received rally point")
self.start_subtest("Test upload lockout and timeout")
self.progress("Starting upload from normal sysid")
self.mav.mav.mission_count_send(target_system,
target_component,
len(items),
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.drain_mav()
self.mav.mav.srcSystem = 243
self.progress("Attempting upload from sysid=%u" %
(self.mav.mav.srcSystem,))
self.mav.mav.mission_count_send(target_system,
target_component,
len(items),
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_DENIED)
self.progress("Attempting download from sysid=%u" %
(self.mav.mav.srcSystem,))
self.mav.mav.mission_request_list_send(target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_DENIED)
tstart = self.get_sim_time()
got_statustext = False
got_ack = False
while True:
if got_statustext and got_ack:
self.progress("Got both ack and statustext")
break
if self.get_sim_time_cached() - tstart > 100:
raise NotAchievedException("Did not get both ack and statustext")
m = self.mav.recv_match(type=['STATUSTEXT', 'MISSION_ACK'],
blocking=True,
timeout=1)
if m is None:
continue
self.progress("Got (%s)" % str(m))
if m.get_type() == 'STATUSTEXT':
if "upload timeout" in m.text:
got_statustext = True
self.progress("Received desired statustext")
continue
if m.get_type() == 'MISSION_ACK':
if m.target_system != old_srcSystem:
raise NotAchievedException("Incorrect sourcesystem")
if m.type != mavutil.mavlink.MAV_MISSION_OPERATION_CANCELLED:
raise NotAchievedException("Incorrect result")
if m.mission_type != mavutil.mavlink.MAV_MISSION_TYPE_RALLY:
raise NotAchievedException("Incorrect mission_type")
got_ack = True
self.progress("Received desired ACK")
continue
raise NotAchievedException("Huh?")
self.progress("Now trying to upload empty mission after timeout")
self.mav.mav.mission_count_send(target_system,
target_component,
0,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.drain_mav()
self.start_subtest("Check rally upload/download across separate links")
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
items)
self.progress("ensure a mavlink1 connection can't do anything useful with new item types")
self.set_parameter("SERIAL2_PROTOCOL", 1)
self.reboot_sitl()
mav2 = mavutil.mavlink_connection("tcp:localhost:5763",
robust_parsing=True,
source_system=7,
source_component=7)
mav2.mav.mission_request_list_send(target_system,
target_component,
mission_type=mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(
mavutil.mavlink.MAV_MISSION_TYPE_MISSION,
want_type=mavutil.mavlink.MAV_MISSION_UNSUPPORTED,
mav=mav2,
)
self.set_parameter("SERIAL2_PROTOCOL", 2)
expected_count = 3
self.progress("Assert mission count on new link")
self.assert_mission_count_on_link(
mav2,
expected_count,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.progress("Assert mission count on original link")
self.assert_mission_count_on_link(
self.mav,
expected_count,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.progress("Get first item on new link")
def drain_self_mav_fn():
self.drain_mav(self.mav)
m2 = self.get_mission_item_int_on_link(
2,
mav2,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
delay_fn=drain_self_mav_fn)
self.progress("Get first item on original link")
m = self.get_mission_item_int_on_link(
2,
self.mav,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
if m2.x != m.x:
raise NotAchievedException("mission items do not match (%d vs %d)" % (m2.x, m.x))
self.get_mission_item_on_link(2, self.mav, target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.mav.mav.mission_request_send(target_system,
target_component,
65,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE,
)
self.mav.mav.mission_request_int_send(target_system,
target_component,
65,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(
mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE,
)
self.start_subtest("Should enforce items come from correct GCS")
self.drain_mav(unparsed=True)
self.mav.mav.mission_count_send(target_system,
target_component,
1,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0)
self.progress("Attempting to upload from bad sysid")
old_sysid = self.mav.mav.srcSystem
self.mav.mav.srcSystem = 17
items[0].pack(self.mav.mav)
self.drain_mav(unparsed=True)
self.mav.mav.send(items[0])
self.mav.mav.srcSystem = old_sysid
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_DENIED,
target_system=17)
self.progress("Sending from correct sysid")
items[0].pack(self.mav.mav)
self.drain_mav(unparsed=True)
self.mav.mav.send(items[0])
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.drain_mav()
self.drain_all_pexpects()
self.start_subtest("Attempt to send item on different link to that which we are sending requests on")
self.progress("Sending count")
self.drain_mav(unparsed=True)
self.mav.mav.mission_count_send(target_system,
target_component,
2,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0)
old_mav2_system = mav2.mav.srcSystem
old_mav2_component = mav2.mav.srcComponent
mav2.mav.srcSystem = self.mav.mav.srcSystem
mav2.mav.srcComponent = self.mav.mav.srcComponent
self.progress("Sending item on second link")
items[0].pack(mav2.mav)
self.drain_mav(mav=self.mav, unparsed=True)
mav2.mav.send(items[0])
mav2.mav.srcSystem = old_mav2_system
mav2.mav.srcComponent = old_mav2_component
m = self.assert_receive_message('MISSION_REQUEST')
if m.mission_type != mavutil.mavlink.MAV_MISSION_TYPE_RALLY:
raise NotAchievedException("Mission request of incorrect type")
if m.seq != 1:
raise NotAchievedException("Unexpected sequence number (expected=%u got=%u)" % (1, m.seq))
items[1].pack(self.mav.mav)
self.drain_mav(unparsed=True)
self.mav.mav.send(items[1])
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.drain_mav()
self.drain_all_pexpects()
self.start_subtest("Upload mission and rally points at same time")
self.progress("Sending rally count")
self.drain_mav(unparsed=True)
self.mav.mav.mission_count_send(target_system,
target_component,
3,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 0)
self.progress("Sending wp count")
self.mav.mav.mission_count_send(target_system,
target_component,
3,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 0)
self.progress("Answering request for mission item 0")
self.drain_mav(mav=self.mav, unparsed=True)
self.mav.mav.mission_item_int_send(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
0,
0,
0,
0,
int(1.1000 * 1e7),
int(1.2000 * 1e7),
321.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 1)
self.progress("Answering request for rally point 0")
items[0].pack(self.mav.mav)
self.drain_mav(unparsed=True)
self.mav.mav.send(items[0])
self.progress("Expecting request for rally item 1")
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 1)
self.progress("Answering request for rally point 1")
items[1].pack(self.mav.mav)
self.drain_mav(unparsed=True)
self.mav.mav.send(items[1])
self.progress("Expecting request for rally item 2")
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 2)
self.progress("Answering request for rally point 2")
items[2].pack(self.mav.mav)
self.drain_mav(unparsed=True)
self.mav.mav.send(items[2])
self.progress("Expecting mission ack for rally")
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.progress("Answering request for waypoints item 1")
self.drain_mav(unparsed=True)
self.mav.mav.mission_item_int_send(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
0,
0,
0,
0,
int(1.1000 * 1e7),
int(1.2000 * 1e7),
321.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_MISSION, 2)
self.progress("Answering request for waypoints item 2")
self.drain_mav(unparsed=True)
self.mav.mav.mission_item_int_send(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
0,
0,
0,
0,
int(1.1000 * 1e7),
int(1.2000 * 1e7),
321.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.start_subtest("Test write-partial-list")
self.progress("Clearing rally points using count-send")
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
target_system=target_system,
target_component=target_component)
self.progress("Should not be able to set items completely past the waypoint count")
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
items)
self.drain_mav(unparsed=True)
self.mav.mav.mission_write_partial_list_send(
target_system,
target_component,
17,
20,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_ERROR)
self.progress("Should not be able to set items overlapping the waypoint count")
self.drain_mav(unparsed=True)
self.mav.mav.mission_write_partial_list_send(
target_system,
target_component,
0,
20,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_ERROR)
self.progress("try to overwrite items 1 and 2")
self.drain_mav(unparsed=True)
self.mav.mav.mission_write_partial_list_send(
target_system,
target_component,
1,
2,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 1)
self.progress("Try shoving up an incorrectly sequenced item")
self.drain_mav(unparsed=True)
self.mav.mav.mission_item_int_send(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT,
0,
0,
0,
0,
0,
0,
int(1.1000 * 1e7),
int(1.2000 * 1e7),
321.0000,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY),
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE)
self.progress("Try shoving up an incorrectly sequenced item (but within band)")
self.drain_mav(unparsed=True)
self.mav.mav.mission_item_int_send(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT,
0,
0,
0,
0,
0,
0,
int(1.1000 * 1e7),
int(1.2000 * 1e7),
321.0000,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY),
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_INVALID_SEQUENCE)
self.progress("Now provide correct item")
item1_latitude = int(1.2345 * 1e7)
self.drain_mav(unparsed=True)
self.mav.mav.mission_item_int_send(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_RALLY_POINT,
0,
0,
0,
0,
0,
0,
item1_latitude,
int(1.2000 * 1e7),
321.0000,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY),
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_RALLY, 2)
self.progress("Answering request for rally point 2")
items[2].pack(self.mav.mav)
self.drain_mav(unparsed=True)
self.mav.mav.send(items[2])
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.progress("TODO: ensure partial mission write was good")
self.start_subtest("clear mission types")
self.assert_mission_count_on_link(
self.mav,
3,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_mission_count_on_link(
self.mav,
3,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.drain_mav(unparsed=True)
self.mav.mav.mission_clear_all_send(target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_mission_count_on_link(
self.mav,
0,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_mission_count_on_link(
self.mav,
3,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.drain_mav(unparsed=True)
self.mav.mav.mission_clear_all_send(target_system, target_component, mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.assert_mission_count_on_link(
self.mav,
0,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_mission_count_on_link(
self.mav,
0,
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.start_subtest("try sending out-of-range counts")
self.drain_mav(unparsed=True)
self.mav.mav.mission_count_send(target_system,
target_component,
1,
230)
self.assert_receive_mission_ack(230,
want_type=mavutil.mavlink.MAV_MISSION_UNSUPPORTED)
self.drain_mav(unparsed=True)
self.mav.mav.mission_count_send(target_system,
target_component,
16000,
mavutil.mavlink.MAV_MISSION_TYPE_RALLY)
self.assert_receive_mission_ack(mavutil.mavlink.MAV_MISSION_TYPE_RALLY,
want_type=mavutil.mavlink.MAV_MISSION_NO_SPACE)
except Exception as e:
self.progress("Received exception (%s)" % self.get_exception_stacktrace(e))
self.mav.mav.srcSystem = old_srcSystem
raise e
self.reboot_sitl()
def ClearMission(self, target_system=1, target_component=1):
'''check mission clearing'''
self.start_subtest("Clear via mission_clear_all message")
self.upload_simple_relhome_mission([
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 20, 0, 20),
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 20, 0, 20),
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 20, 0, 20),
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 20, 0, 20),
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 20, 0, 20),
])
self.set_current_waypoint(3)
self.mav.mav.mission_clear_all_send(
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION
)
self.assert_current_waypoint(0)
self.drain_mav()
self.start_subtest("No clear mission while it is being uploaded by a different node")
mav2 = mavutil.mavlink_connection("tcp:localhost:5763",
robust_parsing=True,
source_system=7,
source_component=7)
self.context_push()
self.context_collect("MISSION_REQUEST")
mav2.mav.mission_count_send(target_system,
target_component,
17,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
ack = self.assert_receive_message('MISSION_REQUEST', check_context=True, mav=mav2)
self.context_pop()
self.context_push()
self.context_collect("MISSION_ACK")
self.mav.mav.mission_clear_all_send(
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION
)
ack = self.assert_receive_message('MISSION_ACK', check_context=True)
self.assert_message_field_values(ack, {
"type": mavutil.mavlink.MAV_MISSION_DENIED,
})
self.context_pop()
self.progress("Test cancel upload from second connection")
self.context_push()
self.context_collect("MISSION_ACK")
mav2.mav.mission_clear_all_send(
target_system,
target_component,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION
)
ack = self.assert_receive_message('MISSION_ACK', mav=mav2, check_context=True)
self.assert_message_field_values(ack, {
"type": mavutil.mavlink.MAV_MISSION_ACCEPTED,
})
self.context_pop()
mav2.close()
del mav2
def GCSMission(self):
'''check MAVProxy's waypoint handling of missions'''
self.reboot_sitl()
target_system = 1
target_component = 1
mavproxy = self.start_mavproxy()
self.wait_parameter_value('MIS_TOTAL', 1)
mavproxy.send('wp clear\n')
self.wait_parameter_value('MIS_TOTAL', 0)
self.assert_received_message_field_values('MISSION_CURRENT', {
"seq": 0,
}, timeout=5, verbose=True)
self.load_mission_using_mavproxy(mavproxy, "rover-gripper-mission.txt")
set_wp = 1
mavproxy.send('wp set %u\n' % set_wp)
self.wait_current_waypoint(set_wp)
self.start_subsubtest("wp changealt")
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
changealt_item = 1
want_newalt = 37.2
mavproxy.send('wp changealt %u %f\n' % (changealt_item, want_newalt))
self.delay_sim_time(15)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
if abs(downloaded_items[changealt_item].z - want_newalt) > 0.0001:
raise NotAchievedException(
"changealt didn't (want=%f got=%f)" %
(want_newalt, downloaded_items[changealt_item].z))
self.end_subsubtest("wp changealt")
self.start_subsubtest("wp sethome")
new_home_lat = 3.14159
new_home_lng = 2.71828
mavproxy.send('click %f %f\n' % (new_home_lat, new_home_lng))
mavproxy.send('wp sethome\n')
self.delay_sim_time(5)
self.end_subsubtest("wp sethome")
self.start_subsubtest("wp slope")
mavproxy.send('wp slope\n')
mavproxy.expect("WP3: slope 0.1")
self.delay_sim_time(5)
self.end_subsubtest("wp slope")
if not self.mavproxy_can_do_mision_item_protocols():
return
self.start_subsubtest("wp split")
mavproxy.send("wp clear\n")
self.delay_sim_time(5)
mavproxy.send("wp list\n")
self.delay_sim_time(5)
items = [
None,
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
0,
0,
0,
0,
int(1.0 * 1e7),
int(1.0 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
0,
0,
0,
0,
int(2.0 * 1e7),
int(2.0 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
]
mavproxy.send("click 5 5\n")
mavproxy.send("wp add\n")
self.delay_sim_time(5)
self.click_location_from_item(mavproxy, items[1])
mavproxy.send("wp add\n")
self.delay_sim_time(5)
self.click_location_from_item(mavproxy, items[2])
mavproxy.send("wp add\n")
self.delay_sim_time(5)
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.check_mission_waypoint_items_same(items, downloaded_items)
mavproxy.send("wp split 2\n")
self.delay_sim_time(5)
items_with_split_in = [
items[0],
items[1],
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
0,
0,
0,
0,
int(1.5 * 1e7),
int(1.5 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
items[2],
]
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_MISSION)
self.check_mission_waypoint_items_same(items_with_split_in,
downloaded_items)
mavproxy.send('wp clear\n')
self.wait_parameter_value('MIS_TOTAL', 0)
self.stop_mavproxy(mavproxy)
def wait_location_sending_target(self, loc, target_system=1, target_component=1, timeout=60, max_delta=2):
tstart = self.get_sim_time()
last_sent = 0
type_mask = (mavutil.mavlink.POSITION_TARGET_TYPEMASK_VX_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VY_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VZ_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE)
self.change_mode('GUIDED')
tstart = self.get_sim_time()
while True:
now = self.get_sim_time_cached()
if now - tstart > timeout:
raise AutoTestTimeoutException("Did not get to location")
if now - last_sent > 10:
last_sent = now
self.mav.mav.set_position_target_global_int_send(
0,
target_system,
target_component,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
type_mask,
int(loc.lat * 1.0e7),
int(loc.lng * 1.0e7),
0,
0,
0,
0,
0,
0,
0,
0,
0,
)
m = self.mav.recv_match(blocking=True,
timeout=1)
if m is None:
continue
t = m.get_type()
if t == "POSITION_TARGET_GLOBAL_INT":
self.progress("Target: (%s)" % str(m), send_statustext=False)
elif t == "GLOBAL_POSITION_INT":
self.progress("Position: (%s)" % str(m), send_statustext=False)
delta = self.get_distance(
mavutil.location(m.lat * 1e-7, m.lon * 1e-7, 0, 0),
loc)
self.progress("delta: %s" % str(delta), send_statustext=False)
if delta < max_delta:
self.progress("Reached destination")
def drive_to_location(self, loc, tolerance=1, timeout=30, target_system=1, target_component=1):
self.assert_mode('GUIDED')
type_mask = (mavutil.mavlink.POSITION_TARGET_TYPEMASK_VX_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VY_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VZ_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE)
last_sent = 0
tstart = self.get_sim_time()
while True:
now = self.get_sim_time_cached()
if now - tstart > timeout:
raise NotAchievedException("Did not get to location")
if now - last_sent > 10:
last_sent = now
self.mav.mav.set_position_target_global_int_send(
0,
target_system,
target_component,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
type_mask,
int(loc.lat * 1.0e7),
int(loc.lng * 1.0e7),
0,
0,
0,
0,
0,
0,
0,
0,
0,
)
if self.get_distance(self.mav.location(), loc) > tolerance:
continue
return
def drive_somewhere_breach_boundary_and_rtl(self, loc, target_system=1, target_component=1, timeout=60):
tstart = self.get_sim_time()
last_sent = 0
seen_fence_breach = False
type_mask = (mavutil.mavlink.POSITION_TARGET_TYPEMASK_VX_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VY_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VZ_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE)
self.change_mode('GUIDED')
while True:
now = self.get_sim_time_cached()
if now - tstart > timeout:
raise NotAchievedException("Did not breach boundary + RTL")
if now - last_sent > 10:
last_sent = now
self.mav.mav.set_position_target_global_int_send(
0,
target_system,
target_component,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
type_mask,
int(loc.lat * 1.0e7),
int(loc.lng * 1.0e7),
0,
0,
0,
0,
0,
0,
0,
0,
0,
)
m = self.mav.recv_match(blocking=True,
timeout=1)
if m is None:
continue
t = m.get_type()
if t == "POSITION_TARGET_GLOBAL_INT":
self.progress("Target: (%s)" % str(m))
elif t == "GLOBAL_POSITION_INT":
self.progress("Position: (%s)" % str(m))
elif t == "FENCE_STATUS":
self.progress("Fence: %s" % str(m))
if m.breach_status != 0:
seen_fence_breach = True
self.progress("Fence breach detected!")
if m.breach_type != mavutil.mavlink.FENCE_BREACH_BOUNDARY:
raise NotAchievedException("Breach of unexpected type")
if self.mode_is("RTL", cached=True) and seen_fence_breach:
break
self.wait_distance_to_home(3, 7, timeout=30)
def drive_somewhere_stop_at_boundary(self,
loc,
expected_stopping_point,
expected_distance_epsilon=1.0,
target_system=1,
target_component=1,
timeout=120):
tstart = self.get_sim_time()
last_sent = 0
type_mask = (mavutil.mavlink.POSITION_TARGET_TYPEMASK_VX_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VY_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VZ_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE +
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE)
self.change_mode('GUIDED')
at_stopping_point = False
while True:
now = self.get_sim_time_cached()
if now - tstart > timeout:
raise NotAchievedException("Did not arrive and stop at boundary")
if now - last_sent > 10:
last_sent = now
self.mav.mav.set_position_target_global_int_send(
0,
target_system,
target_component,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
type_mask,
int(loc.lat * 1.0e7),
int(loc.lng * 1.0e7),
0,
0,
0,
0,
0,
0,
0,
0,
0,
)
m = self.mav.recv_match(blocking=True,
timeout=1)
if m is None:
continue
t = m.get_type()
if t == "POSITION_TARGET_GLOBAL_INT":
print("Target: (%s)" % str(m))
elif t == "GLOBAL_POSITION_INT":
print("Position: (%s)" % str(m))
delta = self.get_distance(
mavutil.location(m.lat * 1e-7, m.lon * 1e-7, 0, 0),
mavutil.location(expected_stopping_point.lat,
expected_stopping_point.lng,
0,
0))
print("delta: %s want_delta<%f" % (str(delta), expected_distance_epsilon))
at_stopping_point = delta < expected_distance_epsilon
elif t == "VFR_HUD":
print("groundspeed: %f" % m.groundspeed)
if at_stopping_point:
if m.groundspeed < 1:
self.progress("Seemed to have stopped at stopping point")
return
def assert_fence_breached(self):
m = self.assert_receive_message('FENCE_STATUS', timeout=10)
if m.breach_status != 1:
raise NotAchievedException("Expected to be breached")
def wait_fence_not_breached(self, timeout=5):
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > timeout:
raise AutoTestTimeoutException("Fence remains breached")
m = self.mav.recv_match(type='FENCE_STATUS',
blocking=True,
timeout=1)
if m is None:
self.progress("No FENCE_STATUS received")
continue
self.progress("STATUS: %s" % str(m))
if m.breach_status == 0:
break
def test_poly_fence_noarms(self, target_system=1, target_component=1):
'''various tests to ensure we can't arm when in breach of a polyfence'''
self.start_subtest("Ensure PolyFence arming checks work")
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
self.set_parameters({
"FENCE_TYPE": 2,
})
self.delay_sim_time(5)
self.progress("Ensure we can arm with no poly in place")
self.change_mode("GUIDED")
self.wait_ready_to_arm()
self.arm_vehicle()
self.disarm_vehicle()
self.set_parameters({
"FENCE_TYPE": 6,
})
self.test_poly_fence_noarms_exclusion_circle(target_system=target_system,
target_component=target_component)
self.test_poly_fence_noarms_inclusion_circle(target_system=target_system,
target_component=target_component)
self.test_poly_fence_noarms_exclusion_polyfence(target_system=target_system,
target_component=target_component)
self.test_poly_fence_noarms_inclusion_polyfence(target_system=target_system,
target_component=target_component)
def test_poly_fence_noarms_exclusion_circle(self, target_system=1, target_component=1):
self.start_subtest("Ensure not armable when within an exclusion circle")
here = self.mav.location()
items = [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_EXCLUSION,
0,
0,
5,
0,
0,
0,
int(here.lat * 1e7),
int(here.lng * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_EXCLUSION,
0,
0,
5,
0,
0,
0,
int(self.offset_location_ne(here, 100, 100).lat * 1e7),
int(here.lng * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
items)
self.delay_sim_time(5)
self.drain_mav()
self.assert_fence_breached()
try:
self.arm_motors_with_rc_input()
except NotAchievedException:
pass
if self.armed():
raise NotAchievedException(
"Armed when within exclusion zone")
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
[])
self.wait_fence_not_breached()
def test_poly_fence_noarms_inclusion_circle(self, target_system=1, target_component=1):
self.start_subtest("Ensure not armable when outside an inclusion circle (but within another")
here = self.mav.location()
items = [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_INCLUSION,
0,
0,
5,
0,
0,
0,
int(here.lat * 1e7),
int(here.lng * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_INCLUSION,
0,
0,
5,
0,
0,
0,
int(self.offset_location_ne(here, 100, 100).lat * 1e7),
int(here.lng * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE),
]
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
items)
self.delay_sim_time(5)
self.drain_mav()
self.assert_fence_breached()
try:
self.arm_motors_with_rc_input()
except NotAchievedException:
pass
if self.armed():
raise NotAchievedException(
"Armed when outside an inclusion zone")
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
[])
self.wait_fence_not_breached()
def test_poly_fence_noarms_exclusion_polyfence(self, target_system=1, target_component=1):
self.start_subtest("Ensure not armable when inside an exclusion polyfence (but outside another")
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, -50),
self.offset_location_ne(here, -50, 50),
self.offset_location_ne(here, 50, 50),
self.offset_location_ne(here, 50, -50),
]),
])
self.delay_sim_time(5)
self.drain_mav()
self.assert_fence_breached()
try:
self.arm_motors_with_rc_input()
except NotAchievedException:
pass
if self.armed():
raise NotAchievedException(
"Armed when within polygon exclusion zone")
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
[])
self.wait_fence_not_breached()
def test_poly_fence_noarms_inclusion_polyfence(self, target_system=1, target_component=1):
self.start_subtest("Ensure not armable when outside an inclusion polyfence (but within another")
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, -50, -50),
self.offset_location_ne(here, -50, 50),
self.offset_location_ne(here, 50, 50),
self.offset_location_ne(here, 50, -50),
]),
])
self.delay_sim_time(5)
self.drain_mav()
self.assert_fence_breached()
try:
self.arm_motors_with_rc_input()
except NotAchievedException:
pass
if self.armed():
raise NotAchievedException(
"Armed when outside polygon inclusion zone")
self.upload_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
[])
self.wait_fence_not_breached()
def test_fence_upload_timeouts_1(self, target_system=1, target_component=1):
self.start_subtest("fence_upload timeouts 1")
self.progress("Telling victim there will be one item coming")
self.mav.mav.mission_count_send(target_system,
target_component,
1,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
m = self.assert_receive_message(['MISSION_REQUEST', 'MISSION_ACK'], verbose=True)
if m.get_type() == "MISSION_ACK":
raise NotAchievedException("Expected MISSION_REQUEST")
if m.seq != 0:
raise NotAchievedException("Expected request for seq=0")
if m.target_system != self.mav.mav.srcSystem:
raise NotAchievedException("Incorrect target system in MISSION_REQUEST")
if m.target_component != self.mav.mav.srcComponent:
raise NotAchievedException("Incorrect target component in MISSION_REQUEST")
tstart = self.get_sim_time()
rerequest_count = 0
received_text = False
received_ack = False
while True:
if received_ack and received_text:
break
if self.get_sim_time_cached() - tstart > 10:
raise NotAchievedException("Did not get expected ack and statustext")
m = self.mav.recv_match(type=['MISSION_REQUEST', 'MISSION_ACK', 'STATUSTEXT'],
blocking=True,
timeout=1)
self.progress("Got (%s)" % str(m))
if m is None:
self.progress("Did not receive any messages")
continue
if m.get_type() == "MISSION_REQUEST":
if m.seq != 0:
raise NotAchievedException("Received request for invalid seq")
if m.target_system != self.mav.mav.srcSystem:
raise NotAchievedException("Incorrect target system in MISSION_REQUEST")
if m.target_component != self.mav.mav.srcComponent:
raise NotAchievedException("Incorrect target component in MISSION_REQUEST")
rerequest_count += 1
self.progress("Valid re-request received.")
continue
if m.get_type() == "MISSION_ACK":
if m.mission_type != mavutil.mavlink.MAV_MISSION_TYPE_FENCE:
raise NotAchievedException("Wrong mission type")
if m.type != mavutil.mavlink.MAV_MISSION_OPERATION_CANCELLED:
raise NotAchievedException("Wrong result")
received_ack = True
continue
if m.get_type() == "STATUSTEXT":
if "upload time" in m.text:
received_text = True
continue
if rerequest_count < 3:
raise NotAchievedException("Expected several re-requests of mission item")
self.end_subtest("fence upload timeouts 1")
def expect_request_for_item(self, item):
m = self.assert_receive_message(['MISSION_REQUEST', 'MISSION_ACK'], verbose=-True)
if m.get_type() == "MISSION_ACK":
raise NotAchievedException("Expected MISSION_REQUEST")
if m.seq != item.seq:
raise NotAchievedException("Expected request for seq=%u" % item.seq)
if m.target_system != self.mav.mav.srcSystem:
raise NotAchievedException("Incorrect target system in MISSION_REQUEST")
if m.target_component != self.mav.mav.srcComponent:
raise NotAchievedException("Incorrect target component in MISSION_REQUEST")
def test_fence_upload_timeouts_2(self, target_system=1, target_component=1):
self.start_subtest("fence upload timeouts 2")
self.progress("Telling victim there will be two items coming")
old_speedup = self.get_parameter("SIM_SPEEDUP")
self.set_parameter("SIM_SPEEDUP", 1)
self.mav.mav.mission_count_send(target_system,
target_component,
2,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.progress("Sending item with seq=0")
item = self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_EXCLUSION,
0,
0,
1,
0,
0,
0,
int(1.1 * 1e7),
int(1.2 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.expect_request_for_item(item)
item.pack(self.mav.mav)
self.mav.mav.send(item)
self.progress("Sending item with seq=1")
item = self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_EXCLUSION,
0,
0,
1,
0,
0,
0,
int(1.1 * 1e7),
int(1.2 * 1e7),
33.0000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.expect_request_for_item(item)
self.set_parameter("SIM_SPEEDUP", old_speedup)
self.progress("Now waiting for a timeout")
tstart = self.get_sim_time()
rerequest_count = 0
received_text = False
received_ack = False
while True:
if received_ack and received_text:
break
if self.get_sim_time_cached() - tstart > 10:
raise NotAchievedException("Did not get expected ack and statustext")
m = self.mav.recv_match(type=['MISSION_REQUEST', 'MISSION_ACK', 'STATUSTEXT'],
blocking=True,
timeout=0.1)
self.progress("Got (%s)" % str(m))
if m is None:
self.progress("Did not receive any messages")
continue
if m.get_type() == "MISSION_REQUEST":
if m.seq != 1:
raise NotAchievedException("Received request for invalid seq")
if m.target_system != self.mav.mav.srcSystem:
raise NotAchievedException("Incorrect target system in MISSION_REQUEST")
if m.target_component != self.mav.mav.srcComponent:
raise NotAchievedException("Incorrect target component in MISSION_REQUEST")
rerequest_count += 1
self.progress("Valid re-request received.")
continue
if m.get_type() == "MISSION_ACK":
if m.mission_type != mavutil.mavlink.MAV_MISSION_TYPE_FENCE:
raise NotAchievedException("Wrong mission type")
if m.type != mavutil.mavlink.MAV_MISSION_OPERATION_CANCELLED:
raise NotAchievedException("Wrong result")
received_ack = True
continue
if m.get_type() == "STATUSTEXT":
if "upload time" in m.text:
received_text = True
continue
if rerequest_count < 3:
raise NotAchievedException("Expected several re-requests of mission item")
self.end_subtest("fence upload timeouts 2")
def test_fence_upload_timeouts(self, target_system=1, target_component=1):
self.test_fence_upload_timeouts_1(target_system=target_system,
target_component=target_component)
self.test_fence_upload_timeouts_2(target_system=target_system,
target_component=target_component)
def test_poly_fence_compatability_ordering(self, target_system=1, target_component=1):
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
here = self.mav.location()
self.progress("try uploading return point last")
self.roundtrip_fence_using_fencepoint_protocol([
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
], ordering=[1, 2, 3, 4, 5, 0])
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
self.progress("try uploading return point in middle")
self.roundtrip_fence_using_fencepoint_protocol([
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
], ordering=[1, 2, 3, 0, 4, 5])
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
self.progress("try closing point in middle")
self.roundtrip_fence_using_fencepoint_protocol([
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
], ordering=[0, 1, 2, 5, 3, 4])
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
self.progress("try (almost) reverse order")
self.roundtrip_fence_using_fencepoint_protocol([
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
], ordering=[4, 3, 2, 1, 0, 5])
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
def test_poly_fence_big_then_small(self, target_system=1, target_component=1):
here = self.mav.location()
self.roundtrip_fence_using_fencepoint_protocol([
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
], ordering=[1, 2, 3, 4, 5, 0])
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
if len(downloaded_items) != 5:
raise NotAchievedException("Bad number of downloaded items in original download")
self.roundtrip_fence_using_fencepoint_protocol([
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
], ordering=[1, 2, 3, 4, 0])
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
want_count = 4
if len(downloaded_items) != want_count:
raise NotAchievedException("Bad number of downloaded items in second download got=%u wanted=%u" %
(len(downloaded_items), want_count))
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
if len(downloaded_items) != 4:
raise NotAchievedException("Bad number of downloaded items in second download (second time) got=%u want=%u" %
(len(downloaded_items), want_count))
def test_poly_fence_compatability(self, target_system=1, target_component=1):
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
target_system=target_system,
target_component=target_component)
self.test_poly_fence_compatability_ordering(target_system=target_system, target_component=target_component)
here = self.mav.location()
self.progress("Playing with changing point count")
self.roundtrip_fence_using_fencepoint_protocol(
[
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
])
self.roundtrip_fence_using_fencepoint_protocol(
[
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
])
self.roundtrip_fence_using_fencepoint_protocol(
[
self.offset_location_ne(here, 0, 0),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
self.offset_location_ne(here, -50, 20),
])
def test_poly_fence_reboot_survivability(self):
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, -50),
self.offset_location_ne(here, -50, 50),
self.offset_location_ne(here, 50, 50),
self.offset_location_ne(here, 50, -50),
]),
])
self.reboot_sitl()
downloaded_items = self.download_using_mission_protocol(mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
downloaded_len = len(downloaded_items)
if downloaded_len != 8:
raise NotAchievedException("Items did not survive reboot (want=%u got=%u)" %
(8, downloaded_len))
def PolyFence(self):
'''test fence-related functions'''
target_system = 1
target_component = 1
self.change_mode("LOITER")
self.wait_ready_to_arm()
here = self.mav.location()
self.progress("here: %f %f" % (here.lat, here.lng))
self.set_parameters({
"FENCE_ENABLE": 1,
"AVOID_ENABLE": 0,
})
self.test_poly_fence_big_then_small()
self.test_poly_fence_compatability()
self.test_fence_upload_timeouts()
self.test_poly_fence_noarms(target_system=target_system, target_component=target_component)
self.arm_vehicle()
self.test_poly_fence_inclusion(here, target_system=target_system, target_component=target_component)
self.test_poly_fence_exclusion(here, target_system=target_system, target_component=target_component)
self.disarm_vehicle()
self.test_poly_fence_reboot_survivability()
def test_poly_fence_inclusion_overlapping_inclusion_circles(self, here, target_system=1, target_component=1):
self.start_subtest("Overlapping circular inclusion")
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_INCLUSION, {
"radius": 30,
"loc": self.offset_location_ne(here, -20, 0),
}),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_INCLUSION, {
"radius": 30,
"loc": self.offset_location_ne(here, 20, 0),
}),
])
if self.mavproxy is not None:
self.mavproxy.send("fence list\n")
self.delay_sim_time(5)
self.progress("Drive outside top circle")
fence_middle = self.offset_location_ne(here, -150, 0)
self.drive_somewhere_breach_boundary_and_rtl(
fence_middle,
target_system=target_system,
target_component=target_component)
self.delay_sim_time(5)
self.progress("Drive outside bottom circle")
fence_middle = self.offset_location_ne(here, 150, 0)
self.drive_somewhere_breach_boundary_and_rtl(
fence_middle,
target_system=target_system,
target_component=target_component)
def test_poly_fence_inclusion(self, here, target_system=1, target_component=1):
self.progress("Circle and Polygon inclusion")
self.test_poly_fence_inclusion_overlapping_inclusion_circles(
here,
target_system=target_system,
target_component=target_component)
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, -40, -20),
self.offset_location_ne(here, 50, -20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, -40, 20),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_INCLUSION, {
"radius": 30,
"loc": self.offset_location_ne(here, -20, 0),
}),
])
self.delay_sim_time(5)
if self.mavproxy is not None:
self.mavproxy.send("fence list\n")
self.progress("Drive outside polygon")
fence_middle = self.offset_location_ne(here, -150, 0)
self.drive_somewhere_breach_boundary_and_rtl(
fence_middle,
target_system=target_system,
target_component=target_component)
self.delay_sim_time(5)
self.progress("Drive outside circle")
fence_middle = self.offset_location_ne(here, 150, 0)
self.drive_somewhere_breach_boundary_and_rtl(
fence_middle,
target_system=target_system,
target_component=target_component)
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, -20, -25),
self.offset_location_ne(here, 50, -25),
self.offset_location_ne(here, 50, 15),
self.offset_location_ne(here, -20, 15),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, 20, -20),
self.offset_location_ne(here, -50, -20),
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 20, 20),
]),
])
self.delay_sim_time(5)
if self.mavproxy is not None:
self.mavproxy.send("fence list\n")
self.progress("Drive outside top polygon")
fence_middle = self.offset_location_ne(here, -150, 0)
self.drive_somewhere_breach_boundary_and_rtl(
fence_middle,
target_system=target_system,
target_component=target_component)
self.delay_sim_time(5)
self.progress("Drive outside bottom polygon")
fence_middle = self.offset_location_ne(here, 150, 0)
self.drive_somewhere_breach_boundary_and_rtl(
fence_middle,
target_system=target_system,
target_component=target_component)
def test_poly_fence_exclusion(self, here, target_system=1, target_component=1):
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, -20),
self.offset_location_ne(here, 50, -20),
self.offset_location_ne(here, 50, -40),
self.offset_location_ne(here, -50, -40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_EXCLUSION, {
"radius": 30,
"loc": self.offset_location_ne(here, -60, 0),
}),
])
self.delay_sim_time(5)
if self.mavproxy is not None:
self.mavproxy.send("fence list\n")
self.progress("Breach eastern boundary")
fence_middle = self.offset_location_ne(here, 0, 30)
self.drive_somewhere_breach_boundary_and_rtl(fence_middle,
target_system=target_system,
target_component=target_component)
self.progress("delaying - hack to work around manual recovery bug")
self.delay_sim_time(5)
self.progress("Breach western boundary")
fence_middle = self.offset_location_ne(here, 0, -30)
self.drive_somewhere_breach_boundary_and_rtl(fence_middle,
target_system=target_system,
target_component=target_component)
self.progress("delaying - hack to work around manual recovery bug")
self.delay_sim_time(5)
self.progress("Breach southern circle")
fence_middle = self.offset_location_ne(here, -150, 0)
self.drive_somewhere_breach_boundary_and_rtl(fence_middle,
target_system=target_system,
target_component=target_component)
def SmartRTL(self):
'''Test SmartRTL'''
self.change_mode("STEERING")
self.wait_ready_to_arm()
self.arm_vehicle()
self.progress("Driving North")
self.reach_heading_manual(0)
self.set_rc(3, 2000)
self.delay_sim_time(5)
self.set_rc(3, 1000)
self.wait_groundspeed(0, 1)
loc = self.mav.location()
self.progress("Driving East")
self.set_rc(3, 2000)
self.reach_heading_manual(90)
self.set_rc(3, 2000)
self.delay_sim_time(5)
self.set_rc(3, 1000)
self.progress("Entering smartrtl")
self.change_mode("SMART_RTL")
self.progress("Ensure we go via intermediate point")
self.wait_distance_to_location(loc, 0, 5, timeout=60)
self.progress("Ensure we get home")
self.wait_distance_to_home(3, 7, timeout=30)
self.disarm_vehicle()
def MotorTest(self):
'''Motor Test triggered via mavlink'''
magic_throttle_value = 1812
self.wait_ready_to_arm()
self.run_cmd(
mavutil.mavlink.MAV_CMD_DO_MOTOR_TEST,
p1=1,
p2=mavutil.mavlink.MOTOR_TEST_THROTTLE_PWM,
p3=magic_throttle_value,
p4=5,
p5=1,
p6=0,
)
self.wait_armed()
self.progress("Waiting for magic throttle value")
self.wait_servo_channel_value(3, magic_throttle_value)
self.wait_servo_channel_value(3, self.get_parameter("RC3_TRIM", 5), timeout=10)
self.wait_disarmed()
def PolyFenceObjectAvoidanceGuided(self, target_system=1, target_component=1):
'''PolyFence object avoidance tests - guided mode'''
if not self.mavproxy_can_do_mision_item_protocols():
return
self.test_poly_fence_object_avoidance_guided_pathfinding(
target_system=target_system,
target_component=target_component)
self.test_poly_fence_object_avoidance_guided_two_squares(
target_system=target_system,
target_component=target_component)
def PolyFenceObjectAvoidanceAuto(self, target_system=1, target_component=1):
'''PolyFence object avoidance tests - auto mode'''
mavproxy = self.start_mavproxy()
self.load_fence_using_mavproxy(mavproxy, "rover-path-planning-fence.txt")
self.stop_mavproxy(mavproxy)
self.load_mission("rover-path-planning-mission.txt")
self.set_parameters({
"AVOID_ENABLE": 3,
"OA_TYPE": 2,
"FENCE_MARGIN": 0,
})
self.reboot_sitl()
self.change_mode('AUTO')
self.wait_ready_to_arm()
self.arm_vehicle()
self.set_parameter("FENCE_ENABLE", 1)
target_loc = mavutil.location(40.073799, -105.229156)
self.wait_location(target_loc, height_accuracy=None, timeout=300)
self.wait_distance_to_home(3, 7, timeout=300)
self.disarm_vehicle()
def send_guided_mission_item(self, loc, target_system=1, target_component=1):
self.mav.mav.mission_item_send(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
2,
0,
0,
0,
0,
0,
loc.lat,
loc.lng,
0
)
def test_poly_fence_object_avoidance_guided_pathfinding(self, target_system=1, target_component=1):
self.load_fence("rover-path-planning-fence.txt")
self.set_parameters({
"AVOID_ENABLE": 3,
"OA_TYPE": 2,
"FENCE_MARGIN": 0,
})
self.reboot_sitl()
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
self.set_parameter("FENCE_ENABLE", 1)
target_loc = mavutil.location(40.073800, -105.229172)
self.send_guided_mission_item(target_loc,
target_system=target_system,
target_component=target_component)
self.wait_location(target_loc, timeout=300)
self.do_RTL(timeout=300)
self.disarm_vehicle()
def WheelEncoders(self):
'''make sure wheel encoders are generally working'''
self.set_parameters({
"WENC_TYPE": 10,
"EK3_ENABLE": 1,
"AHRS_EKF_TYPE": 3,
})
self.reboot_sitl()
self.change_mode("LOITER")
self.wait_ready_to_arm()
self.change_mode("MANUAL")
self.arm_vehicle()
self.set_rc(3, 1600)
m = self.assert_receive_message('WHEEL_DISTANCE', timeout=5)
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 10:
break
dist_home = self.distance_to_home(use_cached_home=True)
m = self.mav.messages.get("WHEEL_DISTANCE")
delta = abs(m.distance[0] - dist_home)
self.progress("dist-home=%f wheel-distance=%f delta=%f" %
(dist_home, m.distance[0], delta))
if delta > 5:
raise NotAchievedException("wheel distance incorrect")
self.disarm_vehicle()
def test_poly_fence_object_avoidance_guided_two_squares(self, target_system=1, target_component=1):
self.start_subtest("Ensure we can steer around obstacles in guided mode")
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 10),
self.offset_location_ne(here, 50, 30),
self.offset_location_ne(here, -50, 40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -60, 60),
self.offset_location_ne(here, 40, 70),
self.offset_location_ne(here, 40, 90),
self.offset_location_ne(here, -60, 80),
]),
])
if self.mavproxy is not None:
self.mavproxy.send("fence list\n")
self.context_push()
ex = None
try:
self.set_parameters({
"AVOID_ENABLE": 3,
"OA_TYPE": 2,
})
self.reboot_sitl()
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.set_parameter("FENCE_ENABLE", 1)
if self.mavproxy is not None:
self.mavproxy.send("fence list\n")
self.arm_vehicle()
self.change_mode("GUIDED")
target = mavutil.location(40.071382, -105.228340, 0, 0)
self.send_guided_mission_item(target,
target_system=target_system,
target_component=target_component)
self.wait_location(target, timeout=300)
self.do_RTL()
self.disarm_vehicle()
except Exception as e:
self.print_exception_caught(e)
ex = e
self.context_pop()
self.reboot_sitl()
if ex is not None:
raise ex
def test_poly_fence_avoidance_dont_breach_exclusion(self, target_system=1, target_component=1):
self.start_subtest("Ensure we stop before breaching an exclusion fence")
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, -20),
self.offset_location_ne(here, 50, -20),
self.offset_location_ne(here, 50, -40),
self.offset_location_ne(here, -50, -40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_CIRCLE_INCLUSION, {
"radius": 30,
"loc": self.offset_location_ne(here, -60, 0),
}),
])
if self.mavproxy is not None:
self.mavproxy.send("fence list\n")
self.set_parameters({
"FENCE_ENABLE": 1,
"AVOID_ENABLE": 3,
})
fence_middle = self.offset_location_ne(here, 0, 30)
expected_stopping_point = mavutil.location(40.0713376, -105.2295738, 0, 0)
self.drive_somewhere_stop_at_boundary(
fence_middle,
expected_stopping_point,
target_system=target_system,
target_component=target_component,
expected_distance_epsilon=3)
self.set_parameter("AVOID_ENABLE", 0)
self.do_RTL()
def do_RTL(self, distance_min=3, distance_max=7, timeout=60):
self.change_mode("RTL")
self.wait_distance_to_home(distance_min, distance_max, timeout=timeout)
def PolyFenceAvoidance(self, target_system=1, target_component=1):
'''PolyFence avoidance tests'''
self.change_mode("LOITER")
self.wait_ready_to_arm()
self.arm_vehicle()
self.change_mode("MANUAL")
self.reach_heading_manual(180, turn_right=False)
self.change_mode("GUIDED")
self.test_poly_fence_avoidance_dont_breach_exclusion(target_system=target_system, target_component=target_component)
self.disarm_vehicle()
def PolyFenceObjectAvoidanceBendyRuler(self, target_system=1, target_component=1):
'''PolyFence object avoidance tests - bendy ruler'''
self.load_fence_using_mavwp("rover-path-bendyruler-fence.txt")
self.set_parameters({
"AVOID_ENABLE": 3,
"OA_TYPE": 1,
"FENCE_ENABLE": 1,
"WP_RADIUS": 5,
})
self.reboot_sitl()
self.set_parameters({
"OA_BR_LOOKAHEAD": 50,
})
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
target_loc = mavutil.location(40.071060, -105.227734, 1584, 0)
self.send_guided_mission_item(target_loc,
target_system=target_system,
target_component=target_component)
self.wait_location(target_loc, timeout=300, accuracy=15)
self.do_RTL(timeout=300)
self.disarm_vehicle()
def PolyFenceObjectAvoidanceBendyRulerEasierGuided(self, target_system=1, target_component=1):
'''finish-line issue means we can't complete the harder one. This
test can go away once we've nailed that one. The only
difference here is the target point.
'''
self.load_fence_using_mavwp("rover-path-bendyruler-fence.txt")
self.set_parameters({
"AVOID_ENABLE": 3,
"OA_TYPE": 1,
"FENCE_ENABLE": 1,
"WP_RADIUS": 5,
})
self.reboot_sitl()
self.set_parameters({
"OA_BR_LOOKAHEAD": 60,
})
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
target_loc = mavutil.location(40.071260, -105.227000, 1584, 0)
self.send_guided_mission_item(target_loc,
target_system=target_system,
target_component=target_component)
self.wait_location(target_loc, timeout=300, accuracy=15)
self.do_RTL(timeout=300)
self.disarm_vehicle()
def PolyFenceObjectAvoidanceBendyRulerEasierAuto(self, target_system=1, target_component=1):
'''finish-line issue means we can't complete the harder one. This
test can go away once we've nailed that one. The only
difference here is the target point.
'''
self.load_fence_using_mavwp("rover-path-bendyruler-fence.txt")
self.load_mission("rover-path-bendyruler-mission-easier.txt")
self.set_parameters({
"AVOID_ENABLE": 3,
"OA_TYPE": 1,
"FENCE_ENABLE": 1,
"WP_RADIUS": 5,
})
self.reboot_sitl()
self.set_parameters({
"OA_BR_LOOKAHEAD": 60,
})
self.change_mode('AUTO')
self.wait_ready_to_arm()
self.arm_vehicle()
target_loc = mavutil.location(40.071260, -105.227000, 1584, 0)
self.wait_location(target_loc, timeout=300)
self.wait_distance_to_home(3, 7, timeout=300)
self.disarm_vehicle()
def test_scripting_simple_loop(self):
self.start_subtest("Scripting simple loop")
self.context_push()
messages = []
def my_message_hook(mav, message):
if message.get_type() != 'STATUSTEXT':
return
messages.append(message)
self.install_message_hook_context(my_message_hook)
self.set_parameter("SCR_ENABLE", 1)
self.install_example_script_context("simple_loop.lua")
self.reboot_sitl()
self.delay_sim_time(10)
self.context_pop()
self.reboot_sitl()
count = 0
for m in messages:
if "hello, world" in m.text:
count += 1
self.progress("Got %u hellos" % count)
if count < 3:
raise NotAchievedException("Expected at least three hellos")
def test_scripting_internal_test(self):
self.start_subtest("Scripting internal test")
self.context_push()
self.set_parameters({
"SCR_ENABLE": 1,
"SCR_HEAP_SIZE": 1024000,
"SCR_VM_I_COUNT": 1000000,
})
self.install_test_modules_context()
self.install_mavlink_module_context()
self.install_test_scripts_context([
"scripting_test.lua",
"scripting_require_test_2.lua",
"math.lua",
"strings.lua",
"mavlink_test.lua",
])
self.context_collect('STATUSTEXT')
self.context_collect('NAMED_VALUE_FLOAT')
self.reboot_sitl()
for success_text in [
"Internal tests passed",
"Require test 2 passed",
"Math tests passed",
"String tests passed",
"Received heartbeat from"
]:
self.wait_statustext(success_text, check_context=True)
for success_nvf in [
"test",
]:
self.assert_received_message_field_values("NAMED_VALUE_FLOAT", {
"name": success_nvf,
}, check_context=True)
self.context_pop()
self.reboot_sitl()
def test_scripting_hello_world(self):
self.start_subtest("Scripting hello world")
self.context_push()
self.context_collect("STATUSTEXT")
self.set_parameter("SCR_ENABLE", 1)
self.install_example_script_context("hello_world.lua")
self.reboot_sitl()
self.wait_statustext('hello, world', check_context=True, timeout=30)
self.context_pop()
self.reboot_sitl()
def ScriptingSteeringAndThrottle(self):
'''Scripting test - steering and throttle'''
self.start_subtest("Scripting square")
self.context_push()
self.install_example_script_context("rover-set-steering-and-throttle.lua")
self.set_parameter("SCR_ENABLE", 1)
self.reboot_sitl()
self.wait_ready_to_arm()
self.arm_vehicle()
self.set_rc(6, 2000)
tstart = self.get_sim_time()
while not self.mode_is("HOLD"):
if self.get_sim_time_cached() - tstart > 30:
raise NotAchievedException("Did not move to hold")
m = self.assert_receive_message('VFR_HUD')
if m is not None:
self.progress("Current speed: %f" % m.groundspeed)
self.disarm_vehicle()
self.context_pop()
self.reboot_sitl()
def test_scripting_auxfunc(self):
self.start_subtest("Scripting aufunc triggering")
self.context_push()
self.context_collect("STATUSTEXT")
self.set_parameters({
"SCR_ENABLE": 1,
"RELAY1_FUNCTION": 1,
"RELAY1_PIN": 1
})
self.install_example_script_context("RCIN_test.lua")
self.reboot_sitl()
self.wait_parameter_value("SIM_PIN_MASK", 121)
self.wait_parameter_value("SIM_PIN_MASK", 123)
self.wait_parameter_value("SIM_PIN_MASK", 121)
self.context_pop()
self.reboot_sitl()
def test_scripting_print_home_and_origin(self):
self.start_subtest("Scripting print home and origin")
self.context_push()
self.set_parameter("SCR_ENABLE", 1)
self.install_example_script_context("ahrs-print-home-and-origin.lua")
self.reboot_sitl()
self.wait_ready_to_arm()
self.wait_statustext("Home - ")
self.wait_statustext("Origin - ")
self.context_pop()
self.reboot_sitl()
def test_scripting_set_home_to_vehicle_location(self):
self.start_subtest("Scripting set home to vehicle location")
self.context_push()
self.set_parameter("SCR_ENABLE", 1)
self.install_example_script_context("ahrs-set-home-to-vehicle-location.lua")
self.reboot_sitl()
self.wait_statustext("Home position reset")
self.context_pop()
self.reboot_sitl()
def test_scripting_serial_loopback(self):
self.start_subtest("Scripting serial loopback test")
self.context_push()
self.context_collect('STATUSTEXT')
self.set_parameters({
"SCR_ENABLE": 1,
"SCR_SDEV_EN": 1,
"SCR_SDEV1_PROTO": 28,
})
self.install_test_script_context("serial_loopback.lua")
self.reboot_sitl()
for success_text in [
"driver -> device good",
"device -> driver good",
]:
self.wait_statustext(success_text, check_context=True)
self.context_pop()
self.reboot_sitl()
def Scripting(self):
'''Scripting test'''
self.test_scripting_set_home_to_vehicle_location()
self.test_scripting_print_home_and_origin()
self.test_scripting_hello_world()
self.test_scripting_simple_loop()
self.test_scripting_internal_test()
self.test_scripting_auxfunc()
self.test_scripting_serial_loopback()
def test_mission_frame(self, frame, target_system=1, target_component=1):
self.clear_mission(mavutil.mavlink.MAV_MISSION_TYPE_MISSION,
target_system=target_system,
target_component=target_component)
items = [
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
3,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
frame,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
3,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
]
self.check_mission_upload_download(items)
def MissionFrames(self, target_system=1, target_component=1):
'''Upload/Download of items in different frames'''
for frame in (mavutil.mavlink.MAV_FRAME_GLOBAL_TERRAIN_ALT_INT,
mavutil.mavlink.MAV_FRAME_GLOBAL_TERRAIN_ALT,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT_INT,
mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_FRAME_GLOBAL):
self.test_mission_frame(frame,
target_system=1,
target_component=1)
def mavlink_time_boot_ms(self):
'''returns a time suitable for putting into the time_boot_ms entry in mavlink packets'''
return int(time.time() * 1000000)
def mavlink_time_boot_us(self):
'''returns a time suitable for putting into the time_boot_ms entry in mavlink packets'''
return int(time.time() * 1000000000)
def ap_proximity_mav_obstacle_distance_send(self, data):
increment = data.get("increment", 0)
increment_f = data.get("increment_f", 0.0)
max_distance = data["max_distance"]
invalid_distance = max_distance + 1
distances = data["distances"][:]
distances.extend([invalid_distance] * (72-len(distances)))
self.mav.mav.obstacle_distance_send(
self.mavlink_time_boot_us(),
mavutil.mavlink.MAV_DISTANCE_SENSOR_LASER,
distances,
increment,
data["min_distance"],
data["max_distance"],
increment_f,
data["angle_offset"],
mavutil.mavlink.MAV_FRAME_BODY_FRD
)
def send_obstacle_distances_expect_distance_sensor_messages(self, obstacle_distances_in, expect_distance_sensor_messages):
self.delay_sim_time(11)
self.do_timesync_roundtrip()
expect_distance_sensor_messages_copy = expect_distance_sensor_messages[:]
last_sent = 0
while True:
now = self.get_sim_time_cached()
if now - last_sent > 1:
self.progress("Sending")
self.ap_proximity_mav_obstacle_distance_send(obstacle_distances_in)
last_sent = now
m = self.mav.recv_match(type='DISTANCE_SENSOR', blocking=True, timeout=1)
self.progress("Got (%s)" % str(m))
if m is None:
self.delay_sim_time(1)
continue
orientation = m.orientation
found = False
if m.current_distance == m.max_distance:
continue
for expected_distance_sensor_message in expect_distance_sensor_messages_copy:
if expected_distance_sensor_message["orientation"] != orientation:
continue
found = True
if not expected_distance_sensor_message.get("__found__", False):
self.progress("Marking message as found")
expected_distance_sensor_message["__found__"] = True
if (m.current_distance - expected_distance_sensor_message["distance"] > 1):
raise NotAchievedException(
"Bad distance for orient=%u want=%u got=%u" %
(orientation, expected_distance_sensor_message["distance"], m.current_distance))
break
if not found:
raise NotAchievedException("Got unexpected DISTANCE_SENSOR message")
all_found = True
for expected_distance_sensor_message in expect_distance_sensor_messages_copy:
if not expected_distance_sensor_message.get("__found__", False):
self.progress("message still not found (orient=%u" % expected_distance_sensor_message["orientation"])
all_found = False
break
if all_found:
self.progress("Have now seen all expected messages")
break
def AP_Proximity_MAV(self):
'''Test MAV proximity backend'''
self.set_parameters({
"PRX1_TYPE": 2,
"OA_TYPE": 2,
"OA_DB_OUTPUT": 3,
})
self.reboot_sitl()
self.send_obstacle_distances_expect_distance_sensor_messages(
{
"distances": [234],
"increment_f": 10,
"angle_offset": 0.0,
"min_distance": 0,
"max_distance": 1000,
}, [
{"orientation": 0, "distance": 234},
])
self.send_obstacle_distances_expect_distance_sensor_messages(
{
"distances": [111, 222, 333, 444, 555],
"increment_f": 10,
"angle_offset": -20.0,
"min_distance": 0,
"max_distance": 1000,
}, [
{"orientation": 0, "distance": 111},
])
distances = [0] * 72
for i in range(0, 72):
distances[i] = 1000 + 10*abs(36-i)
self.send_obstacle_distances_expect_distance_sensor_messages(
{
"distances": distances,
"increment_f": 90/72.0,
"angle_offset": -45.0,
"min_distance": 0,
"max_distance": 2000,
}, [
{"orientation": 0, "distance": 1000},
{"orientation": 1, "distance": 1190},
{"orientation": 7, "distance": 1190},
])
def SendToComponents(self):
'''Test ArduPilot send_to_components function'''
self.set_parameter("CAM1_TYPE", 5)
self.reboot_sitl()
self.progress("Introducing ourselves to the autopilot as a component")
old_srcSystem = self.mav.mav.srcSystem
self.mav.mav.srcSystem = 1
self.mav.mav.heartbeat_send(
mavutil.mavlink.MAV_TYPE_ONBOARD_CONTROLLER,
mavutil.mavlink.MAV_AUTOPILOT_INVALID,
0,
0,
0)
self.progress("Sending control message")
self.context_push()
self.context_collect('COMMAND_LONG')
self.mav.mav.digicam_control_send(
1,
1,
1,
1,
0,
0,
0,
17,
0,
0.0,
)
self.mav.mav.srcSystem = old_srcSystem
self.assert_received_message_field_values('COMMAND_LONG', {
'command': mavutil.mavlink.MAV_CMD_DO_DIGICAM_CONTROL,
'param6': 17,
}, timeout=2, check_context=True)
self.context_pop()
for run_cmd in self.run_cmd, self.run_cmd_int:
self.progress("Sending control command")
self.context_push()
self.context_collect('COMMAND_LONG')
run_cmd(mavutil.mavlink.MAV_CMD_DO_DIGICAM_CONTROL,
p1=1,
p2=1,
p3=0,
p4=0,
p5=0,
p6=37,
)
self.assert_received_message_field_values('COMMAND_LONG', {
'command': mavutil.mavlink.MAV_CMD_DO_DIGICAM_CONTROL,
'param6': 37,
}, timeout=2, check_context=True)
self.context_pop()
for run_cmd in self.run_cmd, self.run_cmd_int:
self.progress("Sending configure command")
self.context_push()
self.context_collect('COMMAND_LONG')
run_cmd(mavutil.mavlink.MAV_CMD_DO_DIGICAM_CONFIGURE,
p1=1,
p2=1,
p3=0,
p4=0,
p5=12,
p6=37
)
self.assert_received_message_field_values('COMMAND_LONG', {
'command': mavutil.mavlink.MAV_CMD_DO_DIGICAM_CONFIGURE,
'param5': 12,
'param6': 37,
}, timeout=2, check_context=True)
self.context_pop()
self.mav.mav.srcSystem = old_srcSystem
def SkidSteer(self):
'''Check skid-steering'''
model = "rover-skid"
self.customise_SITL_commandline([],
model=model,
defaults_filepath=self.model_defaults_filepath(model))
self.change_mode("MANUAL")
self.wait_ready_to_arm()
self.arm_vehicle()
self.progress("get a known heading to avoid worrying about wrap")
self.set_rc(1, 1400)
self.set_rc(3, 1500)
self.wait_heading(90)
self.progress("straighten up")
self.set_rc(1, 1500)
self.set_rc(3, 1500)
self.progress("steer one way")
self.set_rc(1, 1600)
self.set_rc(3, 1400)
self.wait_heading(120)
self.progress("steer the other")
self.set_rc(1, 1400)
self.set_rc(3, 1600)
self.wait_heading(60)
self.zero_throttle()
self.disarm_vehicle()
def SlewRate(self):
"""Test Motor Slew Rate feature."""
self.context_push()
self.change_mode("MANUAL")
self.wait_ready_to_arm()
self.arm_vehicle()
self.start_subtest("Test no slew behavior")
throttle_channel = 3
throttle_max = 2000
self.set_parameter("MOT_SLEWRATE", 0)
self.set_rc(throttle_channel, throttle_max)
tstart = self.get_sim_time()
self.wait_servo_channel_value(throttle_channel, throttle_max)
tstop = self.get_sim_time()
achieved_time = tstop - tstart
self.progress("achieved_time: %0.1fs" % achieved_time)
if achieved_time > 0.5:
raise NotAchievedException("Output response should be instant, got %f" % achieved_time)
self.zero_throttle()
self.wait_groundspeed(0, 0.5)
self.start_subtest("Test 100% slew rate")
self.set_parameter("MOT_SLEWRATE", 100)
self.set_rc(throttle_channel, throttle_max)
tstart = self.get_sim_time()
self.wait_servo_channel_value(throttle_channel, throttle_max)
tstop = self.get_sim_time()
achieved_time = tstop - tstart
self.progress("achieved_time: %0.1fs" % achieved_time)
if achieved_time < 0.9 or achieved_time > 1.1:
raise NotAchievedException("Output response should be 1s, got %f" % achieved_time)
self.zero_throttle()
self.wait_groundspeed(0, 0.5)
self.start_subtest("Test 50% slew rate")
self.set_parameter("MOT_SLEWRATE", 50)
self.set_rc(throttle_channel, throttle_max)
tstart = self.get_sim_time()
self.wait_servo_channel_value(throttle_channel, throttle_max, timeout=10)
tstop = self.get_sim_time()
achieved_time = tstop - tstart
self.progress("achieved_time: %0.1fs" % achieved_time)
if achieved_time < 1.8 or achieved_time > 2.2:
raise NotAchievedException("Output response should be 2s, got %f" % achieved_time)
self.zero_throttle()
self.wait_groundspeed(0, 0.5)
self.start_subtest("Test 25% slew rate")
self.set_parameter("MOT_SLEWRATE", 25)
self.set_rc(throttle_channel, throttle_max)
tstart = self.get_sim_time()
self.wait_servo_channel_value(throttle_channel, throttle_max, timeout=10)
tstop = self.get_sim_time()
achieved_time = tstop - tstart
self.progress("achieved_time: %0.1fs" % achieved_time)
if achieved_time < 3.6 or achieved_time > 4.4:
raise NotAchievedException("Output response should be 4s, got %f" % achieved_time)
self.zero_throttle()
self.wait_groundspeed(0, 0.5)
self.start_subtest("Test 10% slew rate")
self.set_parameter("MOT_SLEWRATE", 10)
self.set_rc(throttle_channel, throttle_max)
tstart = self.get_sim_time()
self.wait_servo_channel_value(throttle_channel, throttle_max, timeout=20)
tstop = self.get_sim_time()
achieved_time = tstop - tstart
self.progress("achieved_time: %0.1fs" % achieved_time)
if achieved_time < 9 or achieved_time > 11:
raise NotAchievedException("Output response should be 10s, got %f" % achieved_time)
self.zero_throttle()
self.wait_groundspeed(0, 0.5)
self.disarm_vehicle()
self.context_pop()
def SET_ATTITUDE_TARGET(self, target_sysid=None, target_compid=1):
'''Test handling of SET_ATTITUDE_TARGET'''
if target_sysid is None:
target_sysid = self.sysid_thismav()
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
tstart = self.get_sim_time()
while True:
now = self.get_sim_time_cached()
if now - tstart > 10:
raise AutoTestTimeoutException("Didn't get to speed")
self.mav.mav.set_attitude_target_send(
0,
target_sysid,
target_compid,
mavutil.mavlink.ATTITUDE_TARGET_TYPEMASK_BODY_ROLL_RATE_IGNORE |
mavutil.mavlink.ATTITUDE_TARGET_TYPEMASK_BODY_PITCH_RATE_IGNORE |
mavutil.mavlink.ATTITUDE_TARGET_TYPEMASK_ATTITUDE_IGNORE,
mavextra.euler_to_quat([0,
math.radians(0),
math.radians(0)]),
0,
0,
0,
1)
msg = self.assert_receive_message('VFR_HUD')
if msg.groundspeed > 5:
break
self.disarm_vehicle()
def SET_ATTITUDE_TARGET_heading(self, target_sysid=None, target_compid=1):
'''Test handling of SET_ATTITUDE_TARGET'''
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
for angle in 0, 290, 70, 180, 0:
self.SET_ATTITUDE_TARGET_heading_test_target(angle, target_sysid, target_compid)
self.disarm_vehicle()
def SET_ATTITUDE_TARGET_heading_test_target(self, angle, target_sysid, target_compid):
if target_sysid is None:
target_sysid = self.sysid_thismav()
def poke_set_attitude(value, target):
self.mav.mav.set_attitude_target_send(
0,
target_sysid,
target_compid,
mavutil.mavlink.ATTITUDE_TARGET_TYPEMASK_BODY_ROLL_RATE_IGNORE |
mavutil.mavlink.ATTITUDE_TARGET_TYPEMASK_BODY_PITCH_RATE_IGNORE |
mavutil.mavlink.ATTITUDE_TARGET_TYPEMASK_BODY_YAW_RATE_IGNORE,
mavextra.euler_to_quat([
math.radians(0),
math.radians(0),
math.radians(angle)
]),
0,
0,
0,
1)
self.wait_heading(angle, called_function=poke_set_attitude, minimum_duration=5)
def SET_POSITION_TARGET_LOCAL_NED(self, target_sysid=None, target_compid=1):
'''Test handling of SET_POSITION_TARGET_LOCAL_NED'''
if target_sysid is None:
target_sysid = self.sysid_thismav()
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
ofs_x = 30.0
ofs_y = 30.0
def send_target():
self.mav.mav.set_position_target_local_ned_send(
0,
target_sysid,
target_compid,
mavutil.mavlink.MAV_FRAME_LOCAL_NED,
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VX_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VY_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_VZ_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AX_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AY_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_AZ_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_IGNORE |
mavutil.mavlink.POSITION_TARGET_TYPEMASK_YAW_RATE_IGNORE,
ofs_x,
ofs_y,
0,
0,
0,
0,
0,
0,
0,
0,
0,
)
self.wait_distance_to_local_position(
(ofs_x, ofs_y, 0),
distance_min=0,
distance_max=3,
timeout=60,
called_function=lambda last_value, target : send_target(),
minimum_duration=5,
)
self.do_RTL()
self.disarm_vehicle()
def EndMissionBehavior(self, timeout=60):
'''Test end mission behavior'''
self.context_push()
self.load_mission("end-mission.txt")
self.wait_ready_to_arm()
self.arm_vehicle()
self.start_subtest("Test End Mission Behavior HOLD")
self.context_collect("STATUSTEXT")
self.change_mode("AUTO")
self.wait_text("Mission Complete", check_context=True, wallclock_timeout=2)
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 15:
raise AutoTestTimeoutException("Still getting POSITION_TARGET_GLOBAL_INT")
m = self.mav.recv_match(type="POSITION_TARGET_GLOBAL_INT",
blocking=True,
timeout=10)
if m is None:
self.progress("No POSITION_TARGET_GLOBAL_INT received, all good !")
break
self.context_clear_collection("STATUSTEXT")
self.change_mode("GUIDED")
self.context_collect("STATUSTEXT")
self.start_subtest("Test End Mission Behavior LOITER")
self.set_parameter("MIS_DONE_BEHAVE", 1)
self.change_mode("AUTO")
self.wait_text("Mission Complete", check_context=True, wallclock_timeout=2)
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 15:
raise AutoTestTimeoutException("Not getting POSITION_TARGET_GLOBAL_INT")
m = self.mav.recv_match(type="POSITION_TARGET_GLOBAL_INT",
blocking=True,
timeout=5)
if m is None:
self.progress("No POSITION_TARGET_GLOBAL_INT received")
continue
else:
if self.get_sim_time_cached() - tstart > 15:
self.progress("Got POSITION_TARGET_GLOBAL_INT, all good !")
break
self.start_subtest("Test End Mission Behavior ACRO")
self.set_parameter("MIS_DONE_BEHAVE", 2)
self.change_mode("GUIDED")
self.send_cmd_do_set_mode('AUTO')
self.wait_mode("ACRO")
self.start_subtest("Test End Mission Behavior MANUAL")
self.set_parameter("MIS_DONE_BEHAVE", 3)
self.change_mode("GUIDED")
self.send_cmd_do_set_mode("AUTO")
self.wait_mode("MANUAL")
self.disarm_vehicle()
self.context_pop()
self.reboot_sitl()
def MAVProxyParam(self):
'''Test MAVProxy parameter handling'''
mavproxy = self.start_mavproxy()
mavproxy.send("param fetch\n")
mavproxy.expect("Received [0-9]+ parameters")
self.stop_mavproxy(mavproxy)
def MAV_CMD_DO_SET_MISSION_CURRENT_mission(self, target_system=1, target_component=1):
return copy.copy([
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
3,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
1,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
3,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
self.mav.mav.mission_item_int_encode(
target_system,
target_component,
2,
mavutil.mavlink.MAV_FRAME_GLOBAL_INT,
mavutil.mavlink.MAV_CMD_NAV_WAYPOINT,
0,
0,
3,
0,
0,
0,
int(1.0000 * 1e7),
int(1.0000 * 1e7),
31.0000,
mavutil.mavlink.MAV_MISSION_TYPE_MISSION),
])
def MAV_CMD_DO_SET_MISSION_CURRENT(self, target_sysid=None, target_compid=1):
'''Test handling of CMD_DO_SET_MISSION_CURRENT'''
if target_sysid is None:
target_sysid = self.sysid_thismav()
self.check_mission_upload_download(self.MAV_CMD_DO_SET_MISSION_CURRENT_mission())
self.set_current_waypoint(2)
self.set_current_waypoint_using_mav_cmd_do_set_mission_current(2)
self.run_cmd(
mavutil.mavlink.MAV_CMD_DO_SET_MISSION_CURRENT,
p1=17,
timeout=1,
target_sysid=target_sysid,
target_compid=target_compid,
want_result=mavutil.mavlink.MAV_RESULT_DENIED,
)
def FlashStorage(self):
'''Test flash storage (for parameters etc)'''
self.set_parameter("LOG_BITMASK", 1)
self.reboot_sitl()
self.customise_SITL_commandline([
"--set-storage-posix-enabled", "0",
"--set-storage-flash-enabled", "1",
])
if self.get_parameter("LOG_BITMASK") == 1:
raise NotAchievedException("not using flash storage?")
self.set_parameter("LOG_BITMASK", 2)
self.reboot_sitl()
self.assert_parameter_value("LOG_BITMASK", 2)
self.set_parameter("LOG_BITMASK", 3)
self.reboot_sitl()
self.assert_parameter_value("LOG_BITMASK", 3)
self.customise_SITL_commandline([])
self.assert_parameter_value("LOG_BITMASK", 1)
def FRAMStorage(self):
'''Test FRAM storage (for parameters etc)'''
self.set_parameter("LOG_BITMASK", 1)
self.reboot_sitl()
self.customise_SITL_commandline([
"--set-storage-posix-enabled", "0",
"--set-storage-fram-enabled", "1",
])
self.set_parameter("LOG_BITMASK", 2)
self.reboot_sitl()
self.assert_parameter_value("LOG_BITMASK", 2)
self.set_parameter("LOG_BITMASK", 3)
self.reboot_sitl()
self.assert_parameter_value("LOG_BITMASK", 3)
self.customise_SITL_commandline([])
self.assert_parameter_value("LOG_BITMASK", 1)
def RangeFinder(self):
'''Test RangeFinder'''
home_string = "%s,%s,%s,%s" % (51.8752066, 14.6487840, 54.15, 231)
rangefinder_params = {
"SIM_SONAR_ROT": 0,
}
rangefinder_params.update(self.analog_rangefinder_parameters())
self.set_parameters(rangefinder_params)
self.customise_SITL_commandline([
"--home", home_string,
])
self.wait_ready_to_arm()
if self.mavproxy is not None:
self.mavproxy.send('script /tmp/post-locations.scr\n')
self.context_push()
self.context_set_message_rate_hz('RANGEFINDER', self.sitl_streamrate())
m = self.assert_receive_message('RANGEFINDER', very_verbose=True)
self.context_pop()
if m.voltage == 0:
raise NotAchievedException("Did not get non-zero voltage")
want_range = 10
if abs(m.distance - want_range) > 0.5:
raise NotAchievedException("Expected %fm got %fm" % (want_range, m.distance))
def AIS(self):
'''Test AIS receiver'''
self.customise_SITL_commandline([
"--serial5=sim:AIS",
])
self.set_parameters({
"SERIAL5_PROTOCOL": 40,
"SERIAL5_BAUD": 38,
"AIS_TYPE": 1,
})
self.reboot_sitl()
self.set_parameter("SIM_AIS_COUNT", 5)
self.delay_sim_time(10)
m = self.assert_receive_message('AIS_VESSEL', timeout=60)
if m.MMSI == 0:
raise NotAchievedException("Invalid MMSI")
self.progress("Received AIS vessel MMSI=%u" % m.MMSI)
def AISMultipleVessels(self):
'''Test tracking multiple AIS vessels'''
self.customise_SITL_commandline([
"--serial5=sim:AIS",
])
self.set_parameters({
"SERIAL5_PROTOCOL": 40,
"SERIAL5_BAUD": 38,
"AIS_TYPE": 1,
})
self.reboot_sitl()
self.set_parameter("SIM_AIS_COUNT", 1)
m = self.assert_receive_message('AIS_VESSEL', timeout=30)
if m.MMSI == 0:
raise NotAchievedException("No vessel with count=1")
self.progress("Received vessel with count=1")
self.set_parameter("SIM_AIS_COUNT", 3)
vessels_seen = set()
start_time = self.get_sim_time()
while len(vessels_seen) < 3:
if self.get_sim_time() - start_time > 60:
raise NotAchievedException(
"Only saw %d vessels, expected 3" % len(vessels_seen)
)
m = self.assert_receive_message('AIS_VESSEL', timeout=10)
vessels_seen.add(m.MMSI)
self.progress("Tracked 3 unique vessels")
self.set_parameter("SIM_AIS_COUNT", 5)
vessels_seen = set()
start_time = self.get_sim_time()
while len(vessels_seen) < 5:
if self.get_sim_time() - start_time > 90:
raise NotAchievedException(
"Only saw %d vessels, expected 5" % len(vessels_seen)
)
m = self.assert_receive_message('AIS_VESSEL', timeout=10)
vessels_seen.add(m.MMSI)
self.progress("Successfully tracked vessels with counts 1, 3, and 5")
def AISDataValidation(self):
'''Test AIS vessel data validity'''
self.customise_SITL_commandline([
"--serial5=sim:AIS",
])
self.set_parameters({
"SERIAL5_PROTOCOL": 40,
"SERIAL5_BAUD": 38,
"AIS_TYPE": 1,
})
self.reboot_sitl()
self.set_parameter("SIM_AIS_COUNT", 3)
m = self.assert_receive_message('AIS_VESSEL', timeout=60)
if m.MMSI == 0:
raise NotAchievedException("Invalid MMSI: must be non-zero")
lat_deg = m.lat / 1e7
if abs(lat_deg) > 90:
raise NotAchievedException("Invalid latitude: %f" % lat_deg)
lon_deg = m.lon / 1e7
if abs(lon_deg) > 180:
raise NotAchievedException("Invalid longitude: %f" % lon_deg)
if m.heading > 65535:
raise NotAchievedException("Invalid heading: %u" % m.heading)
if m.COG > 65535:
raise NotAchievedException("Invalid COG: %u" % m.COG)
if m.velocity > 10000 and m.velocity != 65535:
raise NotAchievedException("Invalid velocity: %u" % m.velocity)
self.progress("AIS data validation passed: MMSI=%u lat=%.6f lon=%.6f" %
(m.MMSI, lat_deg, lon_deg))
def DepthFinder(self):
'''Test multiple depthfinders for boats'''
self.customise_SITL_commandline([
"--serial7=sim:nmea",
])
self.set_parameters({
"RNGFND1_TYPE" : 17,
"RNGFND1_ORIENT" : 25,
"SERIAL7_PROTOCOL" : 9,
"SERIAL7_BAUD" : 9600,
"RNGFND3_TYPE" : 2,
"RNGFND3_ADDR" : 112,
"RNGFND3_ORIENT" : 0,
"RNGFND6_ADDR" : 113,
"RNGFND6_ORIENT" : 25,
"RNGFND6_TYPE" : 2,
})
self.reboot_sitl()
self.wait_ready_to_arm()
self.assert_not_receive_message("WATER_DEPTH", timeout=2)
self.set_parameters({
"FRAME_CLASS": 2,
})
rangefinder = [None, None, None, None, None, None]
def check_rangefinder(mav, m):
if m.get_type() != 'WATER_DEPTH':
return
id = m.id
if id == 2:
raise NotAchievedException("Depthfinder Instance %i with non-downward orientation found" % (id))
rangefinder[id] = True
if id == 0:
if float(m.temperature) == 0.0:
raise NotAchievedException("Depthfinder Instance %i NMEA with temperature not found" % (id))
elif id == 5:
if float(m.temperature) != 0.0:
raise NotAchievedException("Depthfinder Instance %i should not have temperature" % (id))
self.wait_ready_to_arm()
self.arm_vehicle()
self.install_message_hook_context(check_rangefinder)
self.drive_mission("rover1.txt", strict=False)
if rangefinder[0] is None:
raise NotAchievedException("Never saw Depthfinder 1")
if rangefinder[2] is not None:
raise NotAchievedException("Should not have found a Depthfinder 3")
if rangefinder[5] is None:
raise NotAchievedException("Never saw Depthfinder 6")
if not self.current_onboard_log_contains_message("DPTH"):
raise NotAchievedException("Expected DPTH log message")
self.progress("Ensuring we get WATER_DEPTH at 12Hz with 2 backends")
self.set_message_rate_hz('WATER_DEPTH', 12)
self.assert_message_rate_hz('WATER_DEPTH', 12)
def EStopAtBoot(self):
'''Ensure EStop prevents arming when asserted at boot time'''
self.context_push()
self.set_parameters({
"RC9_OPTION": 31,
})
self.set_rc(9, 2000)
self.reboot_sitl()
self.assert_prearm_failure(
"Motors Emergency Stopped",
other_prearm_failures_fatal=False)
self.context_pop()
self.reboot_sitl()
def assert_mode(self, mode):
if not self.mode_is(mode):
raise NotAchievedException("Mode is not %s" % str(mode))
def ChangeModeByNumber(self):
'''ensure we can set a mode by number, handy when we don't have a
pymavlink number for it yet'''
for (x, want) in (0, 'MANUAL'), (1, 'ACRO'), (3, 3):
self.change_mode(x)
self.assert_mode(want)
def StickMixingAuto(self):
'''Ensure Stick Mixing works in auto'''
items = []
self.set_parameter('STICK_MIXING', 1)
items.append((mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, 0, 0),)
items.append((mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 2000, 0, 0),)
self.upload_simple_relhome_mission(items)
if self.mavproxy is not None:
self.mavproxy.send("wp list\n")
self.change_mode('AUTO')
self.wait_ready_to_arm()
self.arm_vehicle()
self.set_rc(1, 1150)
self.wait_heading(45)
self.wait_heading(90)
self.disarm_vehicle()
def AutoDock(self):
'''Test automatic docking of rover for multiple FOVs of simulated beacon'''
self.set_parameters({
"PLND_ENABLED": 1,
"PLND_TYPE": 4,
"PLND_ORIENT": 0,
})
start = self.mav.location()
target = self.offset_location_ne(start, 50, 0)
self.progress("Setting target to %f %f" % (start.lat, start.lng))
stopping_dist = 0.5
self.set_parameters({
"SIM_PLD_ENABLE": 1,
"SIM_PLD_LAT": target.lat,
"SIM_PLD_LON": target.lng,
"SIM_PLD_HEIGHT": 0,
"SIM_PLD_ALT_LMT": 30,
"SIM_PLD_DIST_LMT": 30,
"SIM_PLD_ORIENT": 4,
"SIM_PLD_OPTIONS": 1,
"DOCK_SPEED": 2,
"DOCK_STOP_DIST": stopping_dist,
})
for type in range(0, 3):
self.set_parameter("SIM_PLD_TYPE", type)
self.reboot_sitl()
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
initial_position = self.offset_location_ne(target, -20, -2)
self.drive_to_location(initial_position)
self.change_mode(8)
max_delta = 1
self.wait_distance_to_location(target, 0, max_delta, timeout=180)
self.disarm_vehicle()
self.assert_receive_message('GLOBAL_POSITION_INT')
new_pos = self.mav.location()
delta = abs(self.get_distance(target, new_pos) - stopping_dist)
self.progress("Docked %f metres from stopping point" % delta)
if delta > max_delta:
raise NotAchievedException("Did not dock close enough to stopping point (%fm > %fm" % (delta, max_delta))
if not self.current_onboard_log_contains_message("PL"):
raise NotAchievedException("Did not see expected PL message")
self.progress("All done")
def PrivateChannel(self):
'''test the serial option bit specifying a mavlink channel as private'''
global mav2
port = self.adjust_ardupilot_port(5763)
mav2 = mavutil.mavlink_connection("tcp:localhost:%u" % port,
robust_parsing=True,
source_system=7,
source_component=7)
def heartbeat_on_mav2(mav, m):
'''send a heartbeat on mav2 whenever we get one on mav'''
if mav == mav2:
return
if m.get_type() == 'HEARTBEAT':
mav2.mav.heartbeat_send(
mavutil.mavlink.MAV_TYPE_ONBOARD_CONTROLLER,
mavutil.mavlink.MAV_AUTOPILOT_INVALID,
0,
0,
0)
return
self.assert_receive_message("HEARTBEAT", mav=mav2)
self.install_message_hook_context(heartbeat_on_mav2)
self.progress("Ensuring we can get a message normally")
self.poll_message("AUTOPILOT_VERSION", mav=mav2)
self.progress("Polling AUTOPILOT_VERSION from random sysid")
self.send_poll_message("AUTOPILOT_VERSION", mav=mav2, target_sysid=134)
self.assert_not_receive_message("AUTOPILOT_VERSION", mav=mav2, timeout=10)
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 5:
raise NotAchievedException("Did not get expected heartbeat from %u" % 7)
m = self.assert_receive_message("HEARTBEAT")
if m.get_srcSystem() == 7:
self.progress("Got heartbeat from (%u) on non-private channel" % 7)
break
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 5:
raise NotAchievedException("Did not get expected heartbeat from %u" % self.mav.source_system)
m = self.assert_receive_message("HEARTBEAT", mav=mav2)
if m.get_srcSystem() == self.mav.source_system:
self.progress("Got heartbeat from (%u) on non-private channel" % self.mav.source_system)
break
def printmessage(mav, m):
if mav == mav2:
return
print("Got (%u/%u) (%s) " % (m.get_srcSystem(), m.get_srcComponent(), str(m)))
self.set_parameter("MAV3_OPTIONS", 2)
self.reboot_sitl()
mav2 = mavutil.mavlink_connection("tcp:localhost:5763",
robust_parsing=True,
source_system=7,
source_component=7)
self.send_poll_message("AUTOPILOT_VERSION", mav=mav2, target_sysid=134)
self.assert_not_receive_message("AUTOPILOT_VERSION", mav=mav2, timeout=10)
self.drain_mav(self.mav)
self.drain_mav(mav2)
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 5:
break
m = self.assert_receive_message("HEARTBEAT")
if m.get_srcSystem() == 7:
raise NotAchievedException("Got heartbeat from private channel")
self.progress("ensure no outside heartbeats reach private channels")
tstart = self.get_sim_time()
while True:
if self.get_sim_time_cached() - tstart > 5:
break
m = self.assert_receive_message("HEARTBEAT")
if m.get_srcSystem() == 1 and m.get_srcComponent() == 1:
continue
raise NotAchievedException("Got heartbeat on private channel from non-vehicle")
def MAV_CMD_DO_SET_REVERSE(self):
'''test MAV_CMD_DO_SET_REVERSE command'''
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
here = self.mav.location()
target_loc = self.offset_location_ne(here, 2000, 0)
self.send_guided_mission_item(target_loc)
self.wait_groundspeed(3, 100, minimum_duration=5)
for method in self.run_cmd, self.run_cmd_int:
self.progress("Forwards!")
method(mavutil.mavlink.MAV_CMD_DO_SET_REVERSE, p1=0)
self.wait_heading(0)
self.progress("Backwards!")
method(mavutil.mavlink.MAV_CMD_DO_SET_REVERSE, p1=1)
self.wait_heading(180)
self.progress("Forwards!")
method(mavutil.mavlink.MAV_CMD_DO_SET_REVERSE, p1=0)
self.wait_heading(0)
self.disarm_vehicle()
def MAV_CMD_NAV_RETURN_TO_LAUNCH(self):
'''test MAV_CMD_NAV_RETURN_TO_LAUNCH mavlink command'''
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
here = self.mav.location()
target_loc = self.offset_location_ne(here, 2000, 0)
self.send_guided_mission_item(target_loc)
self.wait_distance_to_home(20, 100)
self.run_cmd(mavutil.mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH)
self.wait_mode('RTL')
self.change_mode('GUIDED')
self.run_cmd_int(mavutil.mavlink.MAV_CMD_NAV_RETURN_TO_LAUNCH)
self.wait_mode('RTL')
self.wait_distance_to_home(0, 5, timeout=30)
self.disarm_vehicle()
def MAV_CMD_DO_CHANGE_SPEED(self):
'''test MAV_CMD_NAV_RETURN_TO_LAUNCH mavlink command'''
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
original_loc = self.mav.location()
here = original_loc
target_loc = self.offset_location_ne(here, 2000, 0)
self.send_guided_mission_item(target_loc)
self.wait_distance_to_home(20, 100)
speeds = 3, 7, 12, 4
for speed in speeds:
self.run_cmd(mavutil.mavlink.MAV_CMD_DO_CHANGE_SPEED, p2=speed)
self.wait_groundspeed(speed-0.5, speed+0.5, minimum_duration=5)
self.send_guided_mission_item(original_loc)
for speed in speeds:
self.run_cmd_int(mavutil.mavlink.MAV_CMD_DO_CHANGE_SPEED, p2=speed)
self.wait_groundspeed(speed-0.5, speed+0.5, minimum_duration=5)
self.change_mode('RTL')
self.wait_distance_to_home(0, 5, timeout=30)
self.disarm_vehicle()
def MAV_CMD_MISSION_START(self):
'''simple test for starting missing using this command'''
self.upload_simple_relhome_mission([
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, 0, 0),
(mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 2000, 0, 0),
])
self.change_mode('AUTO')
self.wait_ready_to_arm()
self.arm_vehicle()
for method in self.run_cmd, self.run_cmd_int:
self.change_mode('MANUAL')
self.wait_groundspeed(0, 1)
method(mavutil.mavlink.MAV_CMD_MISSION_START)
self.wait_mode('AUTO')
self.wait_groundspeed(3, 100)
self.disarm_vehicle()
def MAV_CMD_NAV_SET_YAW_SPEED(self):
'''tests for MAV_CMD_NAV_SET_YAW_SPEED guided-mode command'''
self.change_mode('GUIDED')
self.wait_ready_to_arm()
self.arm_vehicle()
for method in self.run_cmd, self.run_cmd_int:
self.change_mode('MANUAL')
self.wait_groundspeed(0, 1)
self.change_mode('GUIDED')
self.start_subtest("Absolute angles")
for (heading, speed) in (10, 5), (190, 10), (0, 2), (135, 6):
def cf(*args, **kwargs):
method(
mavutil.mavlink.MAV_CMD_NAV_SET_YAW_SPEED,
p1=heading,
p2=speed,
p3=0,
)
self.wait_groundspeed(speed-0.5, speed+0.5, called_function=cf, minimum_duration=2)
self.wait_heading(heading-0.5, heading+0.5, called_function=cf, minimum_duration=2)
self.start_subtest("relative angles")
original_angle = 90
method(
mavutil.mavlink.MAV_CMD_NAV_SET_YAW_SPEED,
p1=original_angle,
p2=5,
p3=0,
)
self.wait_groundspeed(4, 6)
self.wait_heading(original_angle-0.5, original_angle+0.5)
expected_angle = original_angle
for (angle_delta, speed) in (5, 6), (-30, 2), (180, 7):
method(
mavutil.mavlink.MAV_CMD_NAV_SET_YAW_SPEED,
p1=angle_delta,
p2=speed,
p3=1,
)
def cf(*args, **kwargs):
method(
mavutil.mavlink.MAV_CMD_NAV_SET_YAW_SPEED,
p1=0,
p2=speed,
p3=1,
)
expected_angle += angle_delta
if expected_angle < 0:
expected_angle += 360
if expected_angle > 360:
expected_angle -= 360
self.wait_groundspeed(speed-0.5, speed+0.5, called_function=cf, minimum_duration=2)
self.wait_heading(expected_angle, called_function=cf, minimum_duration=2)
self.do_RTL()
self.disarm_vehicle()
def _MAV_CMD_GET_HOME_POSITION(self, run_cmd):
'''test handling of mavlink command MAV_CMD_GET_HOME_POSITION'''
self.context_collect('HOME_POSITION')
run_cmd(mavutil.mavlink.MAV_CMD_GET_HOME_POSITION)
self.assert_receive_message('HOME_POSITION', check_context=True)
def MAV_CMD_GET_HOME_POSITION(self):
'''test handling of mavlink command MAV_CMD_GET_HOME_POSITION'''
self.change_mode('LOITER')
self.wait_ready_to_arm()
self._MAV_CMD_GET_HOME_POSITION(self.run_cmd)
self._MAV_CMD_GET_HOME_POSITION(self.run_cmd_int)
def MAV_CMD_DO_FENCE_ENABLE(self):
'''ensure MAV_CMD_DO_FENCE_ENABLE mavlink command works'''
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, 20),
self.offset_location_ne(here, 50, 20),
self.offset_location_ne(here, 50, 40),
self.offset_location_ne(here, -50, 40),
]),
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION, [
self.offset_location_ne(here, -50, -50),
self.offset_location_ne(here, -50, 50),
self.offset_location_ne(here, 50, 50),
self.offset_location_ne(here, 50, -50),
]),
])
self.run_cmd(mavutil.mavlink.MAV_CMD_DO_FENCE_ENABLE, p1=1)
self.assert_fence_enabled()
self.run_cmd_int(mavutil.mavlink.MAV_CMD_DO_FENCE_ENABLE, p1=0)
self.assert_fence_disabled()
def MAV_CMD_BATTERY_RESET(self):
'''manipulate battery levels with MAV_CMD_BATTERY_RESET'''
for (run_cmd, value) in (self.run_cmd, 56), (self.run_cmd_int, 97):
run_cmd(
mavutil.mavlink.MAV_CMD_BATTERY_RESET,
p1=65535,
p2=value,
)
self.assert_received_message_field_values('BATTERY_STATUS', {
"battery_remaining": value,
}, {
"poll": True,
})
def TestWebServer(self, url):
'''test active web server'''
self.progress("Accessing webserver main page")
import urllib.request
main_page = urllib.request.urlopen(url).read().decode('utf-8')
if main_page.find('ArduPilot Web Server') == -1:
raise NotAchievedException("Expected banner on main page")
board_status = urllib.request.urlopen(url + '/@DYNAMIC/board_status.shtml').read().decode('utf-8')
if board_status.find('0 hours') == -1:
raise NotAchievedException("Expected uptime in board status")
if board_status.find('40.713') == -1:
raise NotAchievedException("Expected latitude in board status")
self.progress("WebServer tests OK")
def NetworkingWebServer(self):
'''web server'''
applet_script = "net_webserver.lua"
self.context_push()
self.install_applet_script_context(applet_script)
self.set_parameters({
"SCR_ENABLE": 1,
"SCR_VM_I_COUNT": 1000000,
"SIM_SPEEDUP": 20,
"NET_ENABLE": 1,
})
self.reboot_sitl()
self.context_push()
self.context_collect('STATUSTEXT')
self.set_parameters({
"WEB_BIND_PORT": 8081,
})
self.scripting_restart()
self.wait_text("WebServer: starting on port 8081", check_context=True)
self.wait_ready_to_arm()
self.TestWebServer("http://127.0.0.1:8081")
self.context_pop()
self.context_pop()
self.reboot_sitl()
def NetworkingWebServerPPP(self):
'''web server over PPP'''
applet_script = "net_webserver.lua"
self.context_push()
self.install_applet_script_context(applet_script)
self.set_parameters({
"SCR_ENABLE": 1,
"SCR_VM_I_COUNT": 1000000,
"SIM_SPEEDUP": 20,
"NET_ENABLE": 1,
"SERIAL5_PROTOCOL": 48,
})
self.progress('rebuilding rover with ppp enabled')
import shutil
shutil.copy('build/sitl/bin/ardurover', 'build/sitl/bin/ardurover.noppp')
util.build_SITL('bin/ardurover', clean=False, configure=True, extra_configure_args=['--enable-PPP', '--debug'])
self.reboot_sitl()
self.progress("Starting PPP daemon")
pppd = util.start_PPP_daemon("192.168.14.15:192.168.14.13", '127.0.0.1:5765')
self.context_push()
self.context_collect('STATUSTEXT')
pppd.expect("remote IP address 192.168.14.13")
self.progress("PPP daemon started")
self.set_parameters({
"WEB_BIND_PORT": 8081,
})
self.scripting_restart()
self.wait_text("WebServer: starting on port 8081", check_context=True)
self.wait_ready_to_arm()
self.TestWebServer("http://192.168.14.13:8081")
self.context_pop()
self.context_pop()
os.unlink('build/sitl/bin/ardurover')
shutil.copy('build/sitl/bin/ardurover.noppp', 'build/sitl/bin/ardurover')
self.reboot_sitl()
def FenceFullAndPartialTransfer(self, target_system=1, target_component=1):
'''ensure starting a fence transfer then a partial transfer behaves
appropriately'''
self.mav.mav.mission_count_send(
target_system,
target_component,
10,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE
)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_FENCE, 0)
self.mav.mav.mission_write_partial_list_send(
target_system,
target_component,
3,
3,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE)
self.assert_receive_mission_ack(
mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
want_type=mavutil.mavlink.MAV_MISSION_DENIED,
)
self.assert_receive_mission_ack(
mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
want_type=mavutil.mavlink.MAV_MISSION_OPERATION_CANCELLED,
)
def MissionRetransfer(self, target_system=1, target_component=1):
'''torture-test with MISSION_COUNT'''
self.mav.mav.mission_count_send(
target_system,
target_component,
10,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE
)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_FENCE, 0)
self.context_push()
self.context_collect('STATUSTEXT')
self.mav.mav.mission_count_send(
target_system,
target_component,
10000,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE
)
self.wait_statustext('Only [0-9]+ items are supported', regex=True, check_context=True)
self.context_pop()
self.assert_not_receive_message('MISSION_REQUEST')
self.mav.mav.mission_count_send(
target_system,
target_component,
10,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE
)
self.assert_receive_mission_item_request(mavutil.mavlink.MAV_MISSION_TYPE_FENCE, 0)
self.assert_receive_mission_ack(
mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
want_type=mavutil.mavlink.MAV_MISSION_OPERATION_CANCELLED,
)
def MissionPolyEnabledPreArm(self):
'''check Polygon porearm checks'''
self.set_parameters({
'FENCE_ENABLE': 1,
})
self.progress("Ensure that we can arm if polyfence is enabled but we have no polyfence")
self.assert_parameter_value('FENCE_TYPE', 6)
self.wait_ready_to_arm()
self.reboot_sitl()
self.wait_ready_to_arm()
self.progress("Ensure we can arm when we have an inclusion fence we are inside of")
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, -50, -50),
self.offset_location_ne(here, -50, 50),
self.offset_location_ne(here, 50, 50),
self.offset_location_ne(here, 50, -50),
]),
])
self.delay_sim_time(5)
self.wait_ready_to_arm()
self.reboot_sitl()
self.wait_ready_to_arm()
self.progress("Ensure we can't arm when we are in breacnh of a polyfence")
self.clear_fence()
self.progress("Now create a fence we are in breach of")
here = self.mav.location()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, 20, 20),
self.offset_location_ne(here, 20, 50),
self.offset_location_ne(here, 50, 50),
self.offset_location_ne(here, 50, 20),
]),
])
self.assert_prearm_failure('Vehicle breaching Polygon fence', other_prearm_failures_fatal=False)
self.reboot_sitl()
self.assert_prearm_failure('Vehicle breaching Polygon fence', other_prearm_failures_fatal=False, timeout=120)
self.progress("Ensure we can arm when a polyfence fence is cleared when we've previously been in breach")
self.clear_fence()
self.wait_ready_to_arm()
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, 20, 20),
self.offset_location_ne(here, 20, 50),
self.offset_location_ne(here, 50, 50),
self.offset_location_ne(here, 50, 20),
]),
])
self.reboot_sitl()
self.assert_prearm_failure('Vehicle breaching Polygon fence', other_prearm_failures_fatal=False, timeout=120)
self.clear_fence()
self.wait_ready_to_arm()
self.progress("Ensure we can arm after clearing polygon fence type enabled")
self.upload_fences_from_locations([
(mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION, [
self.offset_location_ne(here, 20, 20),
self.offset_location_ne(here, 20, 50),
self.offset_location_ne(here, 50, 50),
self.offset_location_ne(here, 50, 20),
]),
])
self.assert_prearm_failure('Vehicle breaching Polygon fence', other_prearm_failures_fatal=False, timeout=120)
self.set_parameter('FENCE_TYPE', 2)
self.wait_ready_to_arm()
self.set_parameter('FENCE_TYPE', 6)
self.assert_prearm_failure('Vehicle breaching Polygon fence', other_prearm_failures_fatal=False, timeout=120)
def OpticalFlow(self):
'''lightly test OpticalFlow'''
self.wait_sensor_state(mavutil.mavlink.MAV_SYS_STATUS_SENSOR_OPTICAL_FLOW, False, False, False, verbose=True)
self.context_push()
self.set_parameter("SIM_FLOW_ENABLE", 1)
self.set_parameter("FLOW_TYPE", 10)
self.reboot_sitl()
self.wait_sensor_state(mavutil.mavlink.MAV_SYS_STATUS_SENSOR_OPTICAL_FLOW, True, True, True, verbose=True)
self.context_pop()
self.reboot_sitl()
def RCDuplicateOptionsExist(self):
'''ensure duplicate RC option detection works'''
self.wait_ready_to_arm()
self.set_parameters({
"RC6_OPTION": 118,
"RC7_OPTION": 118,
})
self.assert_arm_failure("Duplicate Aux Switch Options")
def JammingSimulation(self):
'''Test jamming simulation works'''
self.wait_ready_to_arm()
start_loc = self.assert_receive_message('GPS_RAW_INT')
self.set_parameter("SIM_GPS1_JAM", 1)
class Requirement():
def __init__(self, field, min_value):
self.field = field
self.min_value = min_value
def met(self, m):
return getattr(m, self.field) > self.min_value
requirements = set([
Requirement('v_acc', 50000),
Requirement('h_acc', 50000),
Requirement('vel_acc', 1000),
Requirement('vel', 10000),
])
low_sats_seen = False
seen_bad_loc = False
tstart = self.get_sim_time()
while True:
if self.get_sim_time() - tstart > 120:
raise NotAchievedException("Did not see all jamming")
m = self.assert_receive_message('GPS_RAW_INT')
new_requirements = copy.copy(requirements)
for requirement in requirements:
if requirement.met(m):
new_requirements.remove(requirement)
requirements = new_requirements
if m.satellites_visible < 6:
low_sats_seen = True
here = self.assert_receive_message('GPS_RAW_INT')
if self.get_distance_int(start_loc, here) > 100:
seen_bad_loc = True
if len(requirements) == 0 and low_sats_seen and seen_bad_loc:
break
def BatteryInvalid(self):
'''check Battery prearms report useful data to user'''
self.start_subtest("Changing battery types makes no difference")
self.set_parameter("BATT_MONITOR", 0)
self.assert_prearm_failure("Battery 1 unhealthy", other_prearm_failures_fatal=False)
self.set_parameter("BATT_MONITOR", 4)
self.wait_ready_to_arm()
self.start_subtest("No battery monitor should be armable")
self.set_parameter("BATT_MONITOR", 0)
self.reboot_sitl()
self.wait_ready_to_arm()
self.set_parameter("BATT_MONITOR", 4)
self.assert_prearm_failure("Battery 1 unhealthy", other_prearm_failures_fatal=False)
self.reboot_sitl()
self.wait_ready_to_arm()
self.start_subtest("Invalid backend should have a clear error")
self.set_parameter("BATT_MONITOR", 98)
self.reboot_sitl()
self.assert_prearm_failure("Battery 1 unhealthy", other_prearm_failures_fatal=False)
self.start_subtest("Switching from an invalid backend to a valid backend should require a reboot")
self.set_parameter("BATT_MONITOR", 4)
self.assert_prearm_failure("Battery 1 unhealthy", other_prearm_failures_fatal=False)
self.start_subtest("Switching to None should NOT require a reboot")
self.set_parameter("BATT_MONITOR", 0)
self.wait_ready_to_arm()
def generate_polyfence(self, centre_loc, command, radius, count, rotation=0):
'''adds a number of waypoints equally spaced around a circle
'''
if count < 3:
raise ValueError("Invalid count (%s)" % str(count))
if radius <= 0:
raise ValueError("Invalid radius (%s)" % str(radius))
latlon = (centre_loc.lat, centre_loc.lng)
items = []
for i in range(0, count):
(lat, lon) = mavextra.gps_newpos(latlon[0],
latlon[1],
360/float(count)*i + rotation,
radius)
m = mavutil.mavlink.MAVLink_mission_item_int_message(
1,
1,
0,
mavutil.mavlink.MAV_FRAME_GLOBAL,
command,
0,
0,
count,
0.0,
0.0,
0.0,
int(lat*1e7),
int(lon*1e7),
0,
mavutil.mavlink.MAV_MISSION_TYPE_FENCE,
)
items.append(m)
return items
def SDPolyFence(self):
'''test storage of fence on SD card'''
self.set_parameters({
'BRD_SD_FENCE': 32767,
})
self.reboot_sitl()
home = self.home_position_as_mav_location()
fence = self.generate_polyfence(
home,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_INCLUSION,
radius=100,
count=100,
)
for bearing in range(0, 359, 60):
x = self.offset_location_heading_distance(home, bearing, 100)
fence.extend(self.generate_polyfence(
x,
mavutil.mavlink.MAV_CMD_NAV_FENCE_POLYGON_VERTEX_EXCLUSION,
radius=100,
count=100,
))
self.correct_wp_seq_numbers(fence)
self.check_fence_upload_download(fence)
self.delay_sim_time(1000)
def ManyMAVLinkConnections(self):
'''test testing >8 MAVLink connections'''
self.set_parameters({
"SERIAL3_PROTOCOL": 2,
"SERIAL4_PROTOCOL": 2,
"SERIAL5_PROTOCOL": 2,
"SERIAL6_PROTOCOL": 2,
"SERIAL7_PROTOCOL": 2,
"NET_ENABLE": 1,
"NET_P1_TYPE": 4,
"NET_P1_IP0": 127,
"NET_P1_IP1": 0,
"NET_P1_IP2": 0,
"NET_P1_IP3": 1,
"NET_P1_PORT": 6700,
"NET_P1_PROTOCOL": 2,
"NET_P2_TYPE": 4,
"NET_P2_IP0": 127,
"NET_P2_IP1": 0,
"NET_P2_IP2": 0,
"NET_P2_IP3": 1,
"NET_P2_PORT": 6701,
"NET_P2_PROTOCOL": 2,
"NET_P3_TYPE": 4,
"NET_P3_IP0": 127,
"NET_P3_IP1": 0,
"NET_P3_IP2": 0,
"NET_P3_IP3": 1,
"NET_P3_PORT": 6702,
"NET_P3_PROTOCOL": 2,
"NET_P4_TYPE": 4,
"NET_P4_IP0": 127,
"NET_P4_IP1": 0,
"NET_P4_IP2": 0,
"NET_P4_IP3": 1,
"NET_P4_PORT": 6703,
"NET_P4_PROTOCOL": 2,
"SCR_ENABLE": 1,
})
self.install_example_script_context("simple_loop.lua")
self.customise_SITL_commandline([
"--serial3=tcp:4",
"--serial4=tcp:1",
])
self.wait_statustext("hello, world")
conns = {}
for port in 5761, 5762, 5763, 5764, 5765, 5766, 5767, 6700, 6701, 6702, 6703:
cstring = f"tcp:localhost:{port}"
self.progress(f"Connecting to {cstring}")
c = mavutil.mavlink_connection(
cstring,
robust_parsing=True,
source_system=7,
source_component=7,
)
conns[port] = c
for repeats in range(1, 5):
for (port, conn) in conns.items():
while True:
self.progress(f"Checking port {port}")
m = self.assert_receive_message('STATUSTEXT', mav=conn, timeout=120)
self.drain_all_pexpects()
self.drain_mav()
if m.text == "hello, world":
self.progress(f"Received {m.text} from port {port}")
break
def REQUIRE_LOCATION_FOR_ARMING(self):
'''check DriveOption::REQUIRE_POSITION_FOR_ARMING works'''
self.context_push()
self.set_parameters({
"SIM_GPS1_NUMSATS": 3,
"AHRS_GPS_USE": 0,
})
self.reboot_sitl()
self.change_mode('MANUAL')
self.wait_prearm_sys_status_healthy()
self.arm_vehicle()
self.disarm_vehicle()
self.change_mode('GUIDED')
self.assert_prearm_failure("Need Position Estimate", other_prearm_failures_fatal=False)
self.change_mode('MANUAL')
self.set_parameters({
"ARMING_NEED_LOC": 1,
})
self.assert_prearm_failure("Need Position Estimate", other_prearm_failures_fatal=False)
self.context_pop()
self.reboot_sitl()
def SafetySwitch(self):
'''check safety switch works'''
self.start_subtest("Make sure we don't start moving when safety switch enabled")
self.wait_ready_to_arm()
self.set_rc(1, 2000)
self.wait_armed()
self.set_rc(1, 1500)
self.set_safetyswitch_on()
self.wait_groundspeed(0, 0.1, minimum_duration=2)
self.set_safetyswitch_off()
self.disarm_vehicle()
self.start_subtest("Make sure we stop moving when safety switch enabled")
self.wait_ready_to_arm()
self.arm_vehicle()
self.set_rc(3, 1700)
self.wait_groundspeed(5, 100, minimum_duration=2)
self.set_safetyswitch_on()
self.wait_groundspeed(0, 0.1, minimum_duration=2)
self.set_safetyswitch_off()
self.wait_groundspeed(5, 100, minimum_duration=2)
self.set_rc(3, 1500)
self.disarm_vehicle()
def GetMessageInterval(self):
'''check that the two methods for requesting a MESSAGE_INTERVAL message are equivalent'''
target_msg = mavutil.mavlink.MAVLINK_MSG_ID_HOME_POSITION
old_cmd = (mavutil.mavlink.MAV_CMD_GET_MESSAGE_INTERVAL, target_msg, 0)
new_cmd = (mavutil.mavlink.MAV_CMD_REQUEST_MESSAGE, mavutil.mavlink.MAVLINK_MSG_ID_MESSAGE_INTERVAL, target_msg)
interval_us = None
for run_command in self.run_cmd, self.run_cmd_int:
for cmd in old_cmd, new_cmd:
cmd_id, p1, p2 = cmd
self.context_collect("MESSAGE_INTERVAL")
run_command(cmd_id, p1=p1, p2=p2)
m = self.assert_receive_message('MESSAGE_INTERVAL', timeout=1, check_context=True)
self.context_clear_collection("MESSAGE_INTERVAL")
if m.message_id != target_msg:
raise NotAchievedException(f"Unexpected ID in MESSAGE_INTERVAL (want={target_msg}, got={m.message_id})")
if interval_us is None:
interval_us = m.interval_us
if m.interval_us != interval_us:
raise NotAchievedException(f"Unexpected interval_us (want={interval_us}, got={m.interval_us})")
def ScriptingLocationBindings(self):
'''test scripting bindings'''
self.install_script_content_context("test-scripting-bindings.lua", """
function main()
here = ahrs:get_location()
if here == nil then
gcs:send_text(0, "Need location")
return
end
if here.alt == 0 then
gcs:send_text(0, "zero altitude?!")
return
end
ALT_FRAME = { ABSOLUTE = 0, ABOVE_HOME = 1, ABOVE_ORIGIN = 2, ABOVE_TERRAIN = 3 }
here:set_alt_m(12.34, ALT_FRAME.ABSOLUTE)
want = 1234
if here:alt() ~= want then
gcs:send_text(0, string.format("Bad altitude" .. here:alt() .. " want " .. want))
return
end
gcs:send_text(0, "Tests OK")
end
function update()
main()
return update, 1000
end
return update()
""")
self.set_parameter('SCR_ENABLE', 1)
self.reboot_sitl()
self.wait_statustext('Tests OK')
def DriveEachFrame(self):
'''drive each frame (except BalanceBot) in a mission to ensure basic functionality'''
vinfo = vehicleinfo.VehicleInfo()
vinfo_options = vinfo.options[self.vehicleinfo_key()]
known_broken_frames = {
"balancebot": "needs special stay-upright code",
"motorboat-skid": "gets stuck between waypoints 2 and 3",
}
for frame in sorted(vinfo_options["frames"].keys()):
self.start_subtest("Testing frame (%s)" % str(frame))
if frame in known_broken_frames:
self.progress("Actually, no I'm not - it is known-broken (%s)" %
(known_broken_frames[frame]))
continue
frame_bits = vinfo_options["frames"][frame]
print("frame_bits: %s" % str(frame_bits))
if frame_bits.get("external", False):
self.progress("Actually, no I'm not - it is an external simulation")
continue
model = frame_bits.get("model", frame)
defaults = self.model_defaults_filepath(frame)
if not isinstance(defaults, list):
defaults = [defaults]
self.customise_SITL_commandline(
[],
defaults_filepath=defaults,
model=model,
wipe=True,
)
mission_file = "basic.txt"
self.wait_ready_to_arm()
self.set_parameters({
"MIS_DONE_BEHAVE": 3,
"SIM_WIND_SPD": 10,
})
self.arm_vehicle()
self.drive_mission(mission_file, strict=False, ignore_MANUAL_mode_change=True)
self.wait_mode('MANUAL')
if self.distance_to_home() > 2:
raise NotAchievedException("Did not get home!")
def AP_ROVER_AUTO_ARM_ONCE_ENABLED(self):
'''test Rover arm-once-when-ready behaviour'''
self.wait_ready_to_arm()
self.arm_vehicle()
self.disarm_vehicle()
self.set_parameter("ARMING_REQUIRE", 3)
self.reboot_sitl()
self.wait_armed(timeout=120)
self.disarm_vehicle()
vehicle_test_suite.WaitAndMaintainDisarmed(
self,
minimum_duration=10,
timeout=30,
).run()
def tests(self):
'''return list of all tests'''
ret = super(AutoTestRover, self).tests()
ret.extend([
self.MAVProxy_SetModeUsingSwitch,
self.HIGH_LATENCY2,
self.MAVProxy_SetModeUsingMode,
self.ModeSwitch,
self.AuxModeSwitch,
self.DriveRTL,
self.SmartRTL,
self.DriveSquare,
self.DriveMission,
self.MAV_CMD_DO_SEND_BANNER,
self.DO_SET_MODE,
self.MAVProxy_DO_SET_MODE,
self.ServoRelayEvents,
self.RCOverrides,
self.RCOverridesCancel,
self.MANUAL_CONTROL,
self.Sprayer,
self.AC_Avoidance,
self.CameraMission,
self.Gripper,
self.GripperMission,
self.SET_MESSAGE_INTERVAL,
self.MESSAGE_INTERVAL_COMMAND_INT,
self.REQUEST_MESSAGE,
self.MAV_GCS_ENFORCE,
self.SET_ATTITUDE_TARGET,
self.SET_ATTITUDE_TARGET_heading,
self.SET_POSITION_TARGET_LOCAL_NED,
self.MAV_CMD_DO_SET_MISSION_CURRENT,
self.MAV_CMD_DO_CHANGE_SPEED,
self.MAV_CMD_MISSION_START,
self.MAV_CMD_NAV_SET_YAW_SPEED,
self.Button,
self.Rally,
self.Offboard,
self.MAVProxyParam,
self.GCSFence,
self.GCSFenceInvalidPoint,
self.GCSMission,
self.MotorTest,
self.WheelEncoders,
self.DataFlashOverMAVLink,
self.DataFlash,
self.SkidSteer,
self.PolyFence,
self.SDPolyFence,
self.PolyFenceAvoidance,
self.PolyFenceObjectAvoidanceAuto,
self.PolyFenceObjectAvoidanceGuided,
self.PolyFenceObjectAvoidanceBendyRuler,
self.SendToComponents,
self.PolyFenceObjectAvoidanceBendyRulerEasierGuided,
self.PolyFenceObjectAvoidanceBendyRulerEasierAuto,
self.SlewRate,
self.Scripting,
self.ScriptingSteeringAndThrottle,
self.MissionFrames,
self.SetpointGlobalPos,
self.SetpointGlobalVel,
self.AccelCal,
self.RangeFinder,
self.AIS,
self.AISMultipleVessels,
self.AISDataValidation,
self.AP_Proximity_MAV,
self.EndMissionBehavior,
self.FlashStorage,
self.FRAMStorage,
self.DepthFinder,
self.ChangeModeByNumber,
self.EStopAtBoot,
self.MAV_CMD_NAV_RETURN_TO_LAUNCH,
self.StickMixingAuto,
self.AutoDock,
self.PrivateChannel,
self.GCSFailsafe,
self.RoverInitialMode,
self.DriveMaxRCIN,
self.NoArmWithoutMissionItems,
self.PARAM_ERROR,
self.CompassPrearms,
self.ManyMAVLinkConnections,
self.MAV_CMD_DO_SET_REVERSE,
self.MAV_CMD_GET_HOME_POSITION,
self.MAV_CMD_DO_FENCE_ENABLE,
self.MAV_CMD_BATTERY_RESET,
self.NetworkingWebServer,
self.NetworkingWebServerPPP,
self.RTL_SPEED,
self.ScriptingLocationBindings,
self.MissionRetransfer,
self.FenceFullAndPartialTransfer,
self.MissionPolyEnabledPreArm,
self.OpticalFlow,
self.RCDuplicateOptionsExist,
self.ClearMission,
self.JammingSimulation,
self.BatteryInvalid,
self.REQUIRE_LOCATION_FOR_ARMING,
self.GetMessageInterval,
self.SafetySwitch,
self.ThrottleFailsafe,
self.DriveEachFrame,
self.AP_ROVER_AUTO_ARM_ONCE_ENABLED,
])
return ret
def disabled_tests(self):
return {
"SlewRate": "got timing report failure on CI",
"MAV_CMD_NAV_SET_YAW_SPEED": "compiled out of code by default",
"PolyFenceObjectAvoidanceBendyRuler": "unreliable",
}
def rc_defaults(self):
ret = super(AutoTestRover, self).rc_defaults()
ret[3] = 1500
ret[8] = 1800
return ret
def initial_mode_switch_mode(self):
return "MANUAL"
def default_mode(self):
return 'MANUAL'