Path: blob/main/tools/import/opendrive/signal_POIs_from_xodr.py
169679 views
#!/usr/bin/env python1# -*- coding: utf-8 -*-2# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo3# Copyright (C) 2014-2025 German Aerospace Center (DLR) and others.4# This program and the accompanying materials are made available under the5# terms of the Eclipse Public License 2.0 which is available at6# https://www.eclipse.org/legal/epl-2.0/7# This Source Code may also be made available under the following Secondary8# Licenses when the conditions for such availability set forth in the Eclipse9# Public License 2.0 are satisfied: GNU General Public License, version 210# or later which is available at11# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html12# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later1314# @file signal_POIs_from_xodr.py15# @author Gerald Richter; [email protected]16# @date 2017-08-061718"""19# what does it do:20- extract signal records from an xodr file that was converted21to a SUMO net using netconvert22- generate an additionals file containing pois of type='signal'23- ensure POIs are positioned and associated to the appropriate edge's lanes2425# example call:26signal_POIs_from_xodr.py data/OpenDrive/scen.xodr data/sumo/net.net.xml27-> will create a file data/sumo/signals.add.xml28"""2930import os31import sys32import numpy as np33# import pandas as pd # want to drop that dep34import lxml.etree as lET35if 'SUMO_HOME' in os.environ:36sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))37import sumolib # noqa3839# polygon & POI40# https://sumo.dlr.de/docs/Simulation/Shapes.html41#42# <poly id="<POLYGON_ID>" type="<TYPENAME>" color="<COLOR>"43# fill="<FILL_OPTION>" layer="<LAYER_NO>" shape="<2D-POSITION>[44# <2D-POSITION>]*"/>45#46# <poi id="<POLYGON_ID>" type="<TYPENAME>" color="<RED>,<GREEN>,<BLUE>"47# layer="<LAYER_NO>" [(x="<X_POS>" y="<Y_POS>") | (lane="<LANE_ID>"48# pos="<LANE_POS>" [posLat="<LATERAL_POS>"])]/>4950# xodr:51# road/signal52# road:id53# net: (from netconvert conversion)54# edge:id <- [-]<road:id>.<meters>.<cm> # for driving55# lane:id <- [-]<road:id>.<meters>.<cm>_<lane> # for driving56# lane:id <- :<road:id>.<meters>.<cm>_<lane> # for internal575859def lol_T(lol):60"""transpose the list-of-lists way"""61return list(map(list, zip(*lol)))626364def find_upstream_lin_m(lm_soff, lim_lin_m):65"""for a linear metrage array lm_soff, find the index66of the max value, smaller lim_lin_m67"""68# NOTE: more elegant; np.digitize()69sel_ls_s = lm_soff < lin_m70sel_ls_s &= lm_soff == lm_soff[sel_ls_s].max()71return np.r_[range(len(sel_ls_s))][sel_ls_s].item()727374def calculate_lin_m_width(wr_attribs, lin_m):75"""determine the width at given linear meterage76wr_attribs hold parameters (sOffset, a,b,c,d)77lin_m is relative to last lane section offset78"""79wr_attribs = {k: float(v) for k, v in wr_attribs.items()}80ds = lin_m - wr_attribs['sOffset']81pa = np.r_[[wr_attribs[k] for k in 'abcd']]82va = np.r_[[1., ds, ds * ds, ds * ds * ds]]83return (pa * va).sum() # wr_attribs['a']848586def get_OD_lane_widths(lane_sec, cur_ls_sOff):87"""takes a lane section element and its offest against the road88returns [(lane_id, width), ...]89"""90global xodr_ns9192# get width record for each lane93ls_lanes = lane_sec.xpath('.//ns:lane', namespaces={'ns': xodr_ns})94# container for each lane's calculated width at position95lin_m_width = [] # [(lane_id, width), ]96for lane in ls_lanes:97width = 0.98# determine last upstream width record99l_soff = lane.xpath('.//ns:width/@sOffset', namespaces={'ns': xodr_ns})100if len(l_soff) == 0: # in case there is no width rec (xodr lane 0)101l_soff = [0.0]102# width = 0.103else:104l_soff = np.r_[l_soff].astype(float)105l_soff += cur_ls_sOff # to get actual linear meters along road106# find relevant width record index107wr_ind = find_upstream_lin_m(l_soff, lin_m)108wr_attribs = lane.xpath('.//ns:width',109namespaces={'ns': xodr_ns})[wr_ind].attrib110# subtract outer element offset111width = calculate_lin_m_width(wr_attribs, lin_m - cur_ls_sOff)112lin_m_width.append((int(lane.attrib['id']), width))113114return lin_m_width115116117def extract_lanes_width_data(rte, ):118"""extract lane width data below given root xml tree element rte119rte usually is a road or lane section120return data as recarray with columns (id, sOffset, a,b,c,d)121"""122global xodr_ns123124# get the lanes125# print(rte.xpath('.//ns:lane/@id', namespaces={'ns':xodr_ns}))126c_names = ['id', ]127# fetch the lane ids128ln_ids = rte.xpath('.//ns:lane/@id', namespaces={'ns': xodr_ns})129olane_ra = [list(map(int, ln_ids)), ]130# grab ANY lane/width for definition of keys131wid_tmpl = rte.xpath('//ns:lane/ns:width', namespaces={'ns': xodr_ns})[0].attrib132c_names.extend(list(wid_tmpl.keys()))133wd_par_num = [] # container for the numerics134for lnid in ln_ids:135# there should be only one136wd_pars = rte.xpath('.//ns:lane[@id="%s"]/ns:width' % lnid,137namespaces={'ns': xodr_ns})138if wd_pars: # list does contain something139wd_par_num.append(list(map(float, wd_pars[0].values())))140else:141wd_par_num.append([0., ] * len(wid_tmpl.keys()))142# transpose the list-of-lists way:143olane_ra.extend(lol_T(wd_par_num))144olane_ra = np.rec.fromarrays(olane_ra, names=c_names)145146return olane_ra147148149if __name__ == "__main__":150op = sumolib.options.ArgumentParser()151op.add_argument("xodr_file", category="input",152help="file path of open drive file")153op.add_argument("net_file", category="input",154help="file path of net file")155# op.add_argument("workLog", type=str, help="work log file")156args = op.parse_args()157158net_Fp = args.net_file # td_Dp+'/sumo/net.net.xml'159xodr_Fp = args.xodr_file # td_Dp+'/OpenDrive/scen_T01.02.xodr'160161# parse XODR tree162otree = lET.parse(xodr_Fp)163oroot = otree.getroot()164# grab the docs namespace for xpath shortcut165xodr_ns = oroot.nsmap[None]166# nasty ns addressing167roads = otree.xpath('ns:road', namespaces={'ns': xodr_ns})168"""169# tried to drop ns, but no to avail170import lxml.objectify171lxml.objectify.deannotate(oroot, cleanup_namespaces=True)172"""173174# parse SUMO net xml tree:175ntree = lET.parse(net_Fp)176edges = ntree.xpath("edge[@type!='internal']") # 'edge')177# get non internal lanes178nlanes = ntree.xpath("edge[@type!='internal']/lane")179180# get similarity ids incl. possible sign181edge_ids = (e.attrib['id'].split('.', 1) for e in edges)182# edge_df = pd.DataFrame(edge_ids, columns=('rd_ref','lin_m',)) # 'lane'))183# edge_df.lin_m = edge_df.lin_m.astype(float)184185lane_ids = (lane.attrib['id'].split('.', 1) for lane in nlanes)186lane_ids = ([c, l[0], ] + l[1].split('_') for c, l in enumerate(lane_ids))187lane_ra = lol_T(list(lane_ids))188lane_ra.append([lane.attrib['width'] for lane in nlanes])189# get max len of string id190idS_max = max(map(len, lane_ra[1]))191lane_ra = np.rec.fromarrays(lane_ra, names=('index', 'rd_ref', 'lin_m', 'lane', 'width'),192formats=['i4', 'U%d' % idS_max, 'f4', 'i2', 'f4'])193194# create new tree root element for SUMO additionals file195aroot = lET.Element('additional')196atree = aroot.getroottree()197198# walk through all XODR roads199for r_cnt, r in enumerate(roads, 1):200road_id = r.attrib['id']201print("* parsing road %2d : %s" % (r_cnt, road_id))202road_len = float(r.attrib['length'])203204# find matching SUMO-net elements205# edges matching the road-id206# edge_sel = edge_df['rd_ref'].apply(lambda s:road_id in s)207# xsd: every edge must have at least 1 lane208# lanes matching the road-id209lane_id_sel = np.fromiter(map(lambda s: road_id in s, lane_ra.rd_ref),210np.dtype(bool))211212# got to find right lanes:laneSection213# + with attribute s <= running road s of signal214olane_ra = extract_lanes_width_data(r)215216# get the signals element for this road217# sigs = r.xpath('ns:signals', namespaces={'ns':xodr_ns})218# expecting just 1 element in signals219# assert len(sigs) == 1220# can use relative addressing instead of: sigs[0].xpath('ns:signal',221222# TODO: preselect, as we dont want TLS @dynamic='no'223for s_cnt, s in enumerate(r.xpath('.//ns:signal', namespaces={'ns': xodr_ns}), 1):224poi = {'id': s.attrib['id'], 'layer': '10',225'type': 'signal',226'width': '0.75', 'height': '0.75', 'fill': 'true', } # just to show227poi_params = [{'key': key, 'value': s.attrib.get(key, '')}228for key in s.attrib if key not in ('id', 's', 't')]229print("* + parsing signal %2d : %s" % (s_cnt, poi['id']))230lin_m = float(s.attrib['s']) # get linear meter start231232# select the SUMO net lanes relevant233lin_m_sel = lane_ra.lin_m < lin_m234# join the lane selector with meterage limitations235lane_sel = lane_id_sel & lin_m_sel236# assuming we dont have to check for lanes of different length237# take max linear meterage238max_lin_m = lane_ra.lin_m[lane_sel].max()239lane_sel &= (lane_ra.lin_m == max_lin_m)240# need to calc that:241# (lin.meterage of unsplit road) - lin.m of start of fitting split edge)242poi['pos'] = "%e" % (lin_m - max_lin_m)243244# get the fitting laneSection from xodr245# find last upstream element by s offsets of the starts246ls_soff = np.r_[r.xpath('.//ns:laneSection/@s',247namespaces={'ns': xodr_ns})].astype(float)248ls_ind = find_upstream_lin_m(ls_soff, lin_m)249lane_sec = r.xpath('.//ns:laneSection', namespaces={'ns': xodr_ns})[ls_ind]250cur_ls_sOff = ls_soff[ls_ind]251252# TODO: future - process laneOffset, as another offset253# cur_ls_sOff +=- laneOffset254lin_m_width = get_OD_lane_widths(lane_sec, cur_ls_sOff)255256# transposing to [[lane_ids...], [lane_widths...]]257lin_m_width = lol_T(lin_m_width)258# get in order 0,-1,-2,...259decr_ord_ind = np.argsort(lin_m_width[0])[::-1]260lin_m_width = [np.take(r, decr_ord_ind) for r in lin_m_width]261# drop 0 width lanes, which are not translated to SUMO net262lin_m_width = [r[lin_m_width[1] > 0.] for r in lin_m_width]263num_oLanes = len(lin_m_width[0]) # number of xodr lanes at pos264# add lane limits as lateral coordinates265lin_m_width.append(-lin_m_width[-1].cumsum())266267# posLat transformation268def_nRef_lane = 0 # default SUMO net reference lane269nRef_lane = def_nRef_lane270# the OD t-offset against lane 0, left border271od_t_l0l = float(s.attrib['t']) # lateral xodr position272273# locate on which xodr lane this offset lands. might be good info274closest_lane_index = np.digitize(od_t_l0l, lin_m_width[-1]).item()275# catch stray to right276closest_lane_index = min(closest_lane_index, num_oLanes - 1)277net_laneInds = list(reversed(range(len(lin_m_width[0]))))278# get corresponding SUMO lane num_id279nRef_lane = net_laneInds[closest_lane_index]280# finally pack the sumo lane ids also281lin_m_width.append(net_laneInds)282283lnid_sel = lane_ra.lane == nRef_lane284lnid_gt_sel = lane_ra.lane > nRef_lane # for lanes to left285# assoc SUMO net lane286nlane_wid = lane_ra[lane_sel & lnid_sel].width.item()287nlleft_wid = lane_ra[lane_sel & lnid_gt_sel].width.sum()288# width(sumo_ref.lane)/2. +width(all lanes left sumo ref.) + (od_t_l0l)289posLat = nlane_wid / 2. + nlleft_wid + od_t_l0l290poi['posLat'] = "%.2f" % posLat291poi['lane'] = nlanes[lane_ra[lane_sel & lnid_sel].index[0]].attrib['id']292# print(od_t_l0l, lin_m_width, net_laneInds, nRef_lane)293# print(poi)294295# signal:validity296sign_valids = s.xpath('ns:validity', namespaces={'ns': xodr_ns})297# TODO: test this part298for v in sign_valids:299# translate the xodr lane-range #"<from_lane> <to_lane>"300sumo_lids = [lin_m_width[3][int(v.attrib[atn]) == lin_m_width[0]].item()301for atn in 'fromLane toLane'.split()]302poi_params.append({'key': 'validity',303'value': " ".join(map(str, sumo_lids))})304if len(sign_valids) == 0:305print(": INFO : default signal validity for all lanes")306307# TODO: pick and insert image refs for the signal's <type>-<subtype>308309# changing data type310poi = lET.Element('poi', attrib=poi)311# append all params312[poi.append(lET.Element('param', kvd)) for kvd in poi_params]313aroot.append(poi)314315sig_Fp = os.path.join(os.path.dirname(net_Fp), 'signals.add.xml')316print('-' * 3)317ans = input('? write sumo additionals file with POIs ([N] / y): ')318if ans.lower() == 'y':319atree.write(sig_Fp, pretty_print=True)320print('-' * 5, '\n', '* stored signal data to:', sig_Fp)321322323