Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
Ardupilot
GitHub Repository: Ardupilot/ardupilot
Path: blob/master/Tools/terrain-tools/terrain_rangefinder_postprocess.ipynb
4277 views
Kernel: oi_kernel

Edit the below cell's input_log variable to the path to your BIN (dataflash) log. The BIN log parser currently assumes the first rangefinder is downward facing.

from dataclasses import dataclass from collections import namedtuple from datetime import datetime import math import pandas as pd from pathlib import Path from pymavlink import mavutil from pyproj import Geod from scipy.spatial.transform import Rotation as R input_log = Path("00000125.BIN") stem = input_log.stem # '00000125' from '00000125.BIN' output_csv = Path(f"projected_{stem}.csv") # Open .bin log log = mavutil.mavlink_connection(str(input_log)) # Create a geodesic calculator geod = Geod(ellps='WGS84') # Store messages as lists of tuples attitudes = [] ahrs_positions = [] rangefinder = [] terrain_data = [] Ahrs = namedtuple("Ahrs", ["timestamp", "lat", "lon", "alt", "roll", "pitch", "yaw"]) Rangefinder = namedtuple("Rangefinder", ["timestamp", "distance"]) Terrain = namedtuple("Terrain", ["timestamp", "terr_height"]) @dataclass class GroundPoint: timestamp: float rng_lat: float rng_lon: float rng_elev: float ahrs_lat: float ahrs_lon: float ahrs_alt: float distance: float terr_height: float while True: msg = log.recv_match(blocking=False) if msg is None: break t = datetime.fromtimestamp(msg._timestamp) mtype = msg.get_type() if msg.get_type() == "AHR2": # Lat and Lon in degrees, alt in m ahrs_positions.append(Ahrs( timestamp=t, lat=msg.Lat, lon=msg.Lng, alt=msg.Alt, roll=math.radians(msg.Roll), pitch=math.radians(msg.Pitch), yaw=math.radians(msg.Yaw), )) elif msg.get_type() == "RFND" and msg.Stat == 4: # Only use healthy readings, see AP_Rangefinder::Status::Good rangefinder.append(Rangefinder( timestamp=t, distance=msg.Dist )) elif msg.get_type() == "TERR": terrain_data.append(Terrain( timestamp=t, terr_height=msg.TerrH )) ahrs_df = pd.DataFrame(ahrs_positions).set_index("timestamp").sort_index() rfnd_df = pd.DataFrame(rangefinder).set_index("timestamp").sort_index() terr_df = pd.DataFrame(terrain_data).set_index("timestamp").sort_index()
# Compute estimated ground points ground_points: list[GroundPoint] = [] assert ahrs_positions, "No AHR2 messages found" assert rangefinder, "No valid RFND messages found" assert terrain_data, "No TERR messages found" for t, row in rfnd_df.iterrows(): dist = row.distance # Efficient nearest neighbor lookup with .asof() for datetime index try: ahrs_row = ahrs_df.loc[ahrs_df.index.asof(t)] terr_row = terr_df.loc[terr_df.index.asof(t)] except KeyError: continue # skip if no data available at or before timestamp # Unpack required fields lat, lon, alt, roll, pitch, yaw = ahrs_row.lat, ahrs_row.lon, ahrs_row.alt, ahrs_row.roll, ahrs_row.pitch, ahrs_row.yaw terr_height = terr_row.terr_height # Build the rotation using Euler angles (in radians) rotation = R.from_euler('xyz', [roll, pitch, yaw]) # Rotate the down vector (in body frame) ned_vector = rotation.apply([0, 0, dist]) # Calculate ground point in NED frame north_offset = ned_vector[0] east_offset = ned_vector[1] down_offset = ned_vector[2] ground_alt = alt - down_offset # down_offset is positive, so subtract it to get estimated ground height # Project horizontal lat/lon offset azimuth = math.degrees(math.atan2(east_offset, north_offset)) horizontal_distance = math.hypot(north_offset, east_offset) g_lon, g_lat, _ = geod.fwd(lon, lat, azimuth, horizontal_distance) ground_points.append(GroundPoint( timestamp=t, rng_lat=g_lat, rng_lon=g_lon, rng_elev=ground_alt, ahrs_lat=lat, ahrs_lon=lon, ahrs_alt=alt, distance=dist, terr_height=terr_height )) if not ground_points: print("Error: No projected ground point data found.") exit(1)
# Convert to DataFrame import pandas as pd df = pd.DataFrame([gp.__dict__ for gp in ground_points])
# Get the latitude/longitude extent for input to OpenTopography min_lat = df["rng_lat"].min() max_lat = df["rng_lat"].max() min_lon = df["rng_lon"].min() max_lon = df["rng_lon"].max() assert min_lat < max_lat assert min_lon < max_lon print(f"Latitude range: ({min_lat},{max_lat})") print(f"Longitude range: ({min_lon},{max_lon})")

To compare to other elevation data sources, these must be manually downloaded from OpenTopography. Once extracted into this directory, rename them and update paths in the below block. Add as many datasets for comparison as you like. GDAL supports many different file formats.

# Load elevation from GeoTIFF using GDAL from osgeo import gdal dataset_paths = [Path("cop30.tif"), Path("AW3D30.tif")] for dataset_path in dataset_paths: dataset = gdal.Open(str(dataset_path)) assert dataset is not None, f"Dataset '{dataset_path}' failed to load" band = dataset.GetRasterBand(1) transform = dataset.GetGeoTransform() # Convert lat/lon to row/col def latlon_to_rowcol(lat, lon): inv_transform = gdal.InvGeoTransform(transform) px, py = gdal.ApplyGeoTransform(inv_transform, lon, lat) return int(py), int(px) elevations = [] for _, row in df.iterrows(): r, c = latlon_to_rowcol(row.rng_lat, row.rng_lon) grid = band.ReadAsArray(c, r, 1, 1) if grid is not None: elevation = grid[0][0] else: # Out of range of the dataset, common with LOG_REPLAY before the EKF initializes elevation = math.nan elevations.append(elevation) df[dataset_path.stem + "_elevation"] = elevations
# plot using plotly, good for smaller logs (<30 minutes) import plotly.graph_objs as go fig = go.Figure() for col in df.columns: stems = [f"{p.stem}_elevation" for p in dataset_paths] cols = ("ahrs_alt", "rng_elev", "terr_height", *stems) if col in cols: print(f"Adding {col}") y = pd.to_numeric(df[col], errors='coerce') fig.add_trace(go.Scattergl(x=df["timestamp"], y=y, mode='lines+markers', name=col)) fig.update_layout(title=f"Terrain Comparison Over Time for {stem}", xaxis_title="Time", yaxis_title="Altitude (m)", height=600) fig.show()
# Save plot import plotly.io as pio pio.write_html(fig, f'terrain_pyplot_{stem}.html')