Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/import/opendrive/signal_POIs_from_xodr.py
169679 views
1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
4
# Copyright (C) 2014-2025 German Aerospace Center (DLR) and others.
5
# This program and the accompanying materials are made available under the
6
# terms of the Eclipse Public License 2.0 which is available at
7
# https://www.eclipse.org/legal/epl-2.0/
8
# This Source Code may also be made available under the following Secondary
9
# Licenses when the conditions for such availability set forth in the Eclipse
10
# Public License 2.0 are satisfied: GNU General Public License, version 2
11
# or later which is available at
12
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
13
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
14
15
# @file signal_POIs_from_xodr.py
16
# @author Gerald Richter; [email protected]
17
# @date 2017-08-06
18
19
"""
20
# what does it do:
21
- extract signal records from an xodr file that was converted
22
to a SUMO net using netconvert
23
- generate an additionals file containing pois of type='signal'
24
- ensure POIs are positioned and associated to the appropriate edge's lanes
25
26
# example call:
27
signal_POIs_from_xodr.py data/OpenDrive/scen.xodr data/sumo/net.net.xml
28
-> will create a file data/sumo/signals.add.xml
29
"""
30
31
import os
32
import sys
33
import numpy as np
34
# import pandas as pd # want to drop that dep
35
import lxml.etree as lET
36
if 'SUMO_HOME' in os.environ:
37
sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
38
import sumolib # noqa
39
40
# polygon & POI
41
# https://sumo.dlr.de/docs/Simulation/Shapes.html
42
#
43
# <poly id="<POLYGON_ID>" type="<TYPENAME>" color="<COLOR>"
44
# fill="<FILL_OPTION>" layer="<LAYER_NO>" shape="<2D-POSITION>[
45
# <2D-POSITION>]*"/>
46
#
47
# <poi id="<POLYGON_ID>" type="<TYPENAME>" color="<RED>,<GREEN>,<BLUE>"
48
# layer="<LAYER_NO>" [(x="<X_POS>" y="<Y_POS>") | (lane="<LANE_ID>"
49
# pos="<LANE_POS>" [posLat="<LATERAL_POS>"])]/>
50
51
# xodr:
52
# road/signal
53
# road:id
54
# net: (from netconvert conversion)
55
# edge:id <- [-]<road:id>.<meters>.<cm> # for driving
56
# lane:id <- [-]<road:id>.<meters>.<cm>_<lane> # for driving
57
# lane:id <- :<road:id>.<meters>.<cm>_<lane> # for internal
58
59
60
def lol_T(lol):
    """Transpose a list of lists, i.e. turn rows into columns.

    As with zip(), rows of unequal length are cut to the shortest row.
    """
    return [list(column) for column in zip(*lol)]
63
64
65
def find_upstream_lin_m(lm_soff, lim_lin_m):
    """Find the last upstream entry of a linear meterage array.

    lm_soff   -- sequence or array of linear meterage offsets
    lim_lin_m -- limiting linear meterage; only offsets strictly smaller
                 than this value are considered

    Returns the index (plain int) of the largest offset below lim_lin_m.
    Raises ValueError if no offset is smaller than lim_lin_m, or if the
    largest qualifying offset occurs more than once.
    """
    # NOTE: more elegant; np.digitize()
    lm_soff = np.asarray(lm_soff, dtype=float)
    # compare against the limit handed in by the caller, not against
    # whatever module-level variable happens to be defined
    upstream = lm_soff < lim_lin_m
    if not upstream.any():
        raise ValueError("no offset smaller than %s in %s"
                         % (lim_lin_m, lm_soff.tolist()))
    closest = upstream & (lm_soff == lm_soff[upstream].max())
    # .item() insists on exactly one match
    return np.flatnonzero(closest).item()
73
74
75
def calculate_lin_m_width(wr_attribs, lin_m):
    """Evaluate a width record at the given linear meterage.

    wr_attribs -- mapping with the record parameters (sOffset, a, b, c, d);
                  all values are converted with float()
    lin_m      -- linear meterage, relative to the last lane section offset

    Returns a + b*ds + c*ds^2 + d*ds^3 with ds = lin_m - sOffset.
    """
    params = {name: float(raw) for name, raw in wr_attribs.items()}
    offset = lin_m - params['sOffset']
    coefficients = np.array([params['a'], params['b'], params['c'], params['d']])
    powers = np.array([1., offset, offset * offset, offset * offset * offset])
    return (coefficients * powers).sum()
85
86
87
def get_OD_lane_widths(lane_sec, cur_ls_sOff):
    """Determine the width of every lane of a lane section at the current position.

    lane_sec    -- xodr laneSection element
    cur_ls_sOff -- s offset of that lane section against the road

    The evaluated position is not a parameter: it is read from the
    module-level variable lin_m, and the xodr namespace is read from the
    module-level variable xodr_ns.
    Lanes without any width record (xodr lane 0) are reported with width 0.

    returns [(lane_id, width), ...]
    """
    ns_map = {'ns': xodr_ns}
    widths_at_pos = []  # [(lane_id, width), ]
    for lane_el in lane_sec.xpath('.//ns:lane', namespaces=ns_map):
        rec_offsets = lane_el.xpath('.//ns:width/@sOffset', namespaces=ns_map)
        if len(rec_offsets) == 0:
            # no width record at all for this lane
            widths_at_pos.append((int(lane_el.attrib['id']), 0.))
            continue

        # shift the record offsets to actual linear meters along the road
        road_offsets = np.r_[rec_offsets].astype(float)
        road_offsets += cur_ls_sOff
        # pick the last width record starting upstream of the position
        rec_index = find_upstream_lin_m(road_offsets, lin_m)
        rec_attribs = lane_el.xpath('.//ns:width', namespaces=ns_map)[rec_index].attrib
        # the record is evaluated relative to the lane section start
        lane_width = calculate_lin_m_width(rec_attribs, lin_m - cur_ls_sOff)
        widths_at_pos.append((int(lane_el.attrib['id']), lane_width))

    return widths_at_pos
116
117
118
def extract_lanes_width_data(rte, ):
    """Collect the lane width parameters found below the xml element rte.

    rte usually is a road or lane section. The xodr namespace is read from
    the module-level variable xodr_ns.

    The column names are 'id' followed by the attribute names of the first
    lane/width element of the whole document. For every lane id below rte
    the first matching width record supplies the values; lanes without a
    width record get a row of zeros.

    Returns the data as a numpy recarray.
    """
    ns_map = {'ns': xodr_ns}

    # lane ids, first as found and then as integers for the id column
    lane_ids = rte.xpath('.//ns:lane/@id', namespaces=ns_map)
    id_column = [int(lane_id) for lane_id in lane_ids]

    # ANY lane/width of the document serves as template for the keys
    template = rte.xpath('//ns:lane/ns:width', namespaces=ns_map)[0].attrib
    width_keys = list(template.keys())

    rows = []  # one list of numeric width parameters per lane
    for lane_id in lane_ids:
        # there should be only one
        records = rte.xpath('.//ns:lane[@id="%s"]/ns:width' % lane_id,
                            namespaces=ns_map)
        if records:
            rows.append([float(value) for value in records[0].values()])
        else:
            rows.append([0.] * len(width_keys))

    # rows -> columns, preceded by the id column
    columns = [id_column] + lol_T(rows)
    return np.rec.fromarrays(columns, names=['id'] + width_keys)
148
149
150
if __name__ == "__main__":
    # Reads an xodr file and the SUMO net converted from it, builds one
    # <poi type="signal"> per xodr signal and, on confirmation, writes them
    # to 'signals.add.xml' in the directory of the net file.
    op = sumolib.options.ArgumentParser()
    op.add_argument("xodr_file", category="input",
                    help="file path of open drive file")
    op.add_argument("net_file", category="input",
                    help="file path of net file")
    args = op.parse_args()

    net_Fp = args.net_file
    xodr_Fp = args.xodr_file

    # parse XODR tree
    otree = lET.parse(xodr_Fp)
    oroot = otree.getroot()
    # grab the document's default namespace for the xpath queries;
    # module-level on purpose: the helper functions read xodr_ns
    xodr_ns = oroot.nsmap[None]
    ns_map = {'ns': xodr_ns}
    roads = otree.xpath('ns:road', namespaces=ns_map)

    # parse SUMO net xml tree and get the non internal lanes
    ntree = lET.parse(net_Fp)
    nlanes = ntree.xpath("edge[@type!='internal']/lane")

    # split the lane ids "[-]<road:id>.<meters>.<cm>_<lane>" into
    # (running index, road reference incl. sign, linear meters, lane number)
    lane_ids = (lane.attrib['id'].split('.', 1) for lane in nlanes)
    lane_ids = ([cnt, parts[0], ] + parts[1].split('_')
                for cnt, parts in enumerate(lane_ids))
    lane_ra = lol_T(list(lane_ids))
    lane_ra.append([lane.attrib['width'] for lane in nlanes])
    # get max len of string id
    idS_max = max(map(len, lane_ra[1]))
    lane_ra = np.rec.fromarrays(lane_ra, names=('index', 'rd_ref', 'lin_m', 'lane', 'width'),
                                formats=['i4', 'U%d' % idS_max, 'f4', 'i2', 'f4'])

    # create new tree root element for SUMO additionals file
    aroot = lET.Element('additional')
    atree = aroot.getroottree()

    # walk through all XODR roads
    for r_cnt, r in enumerate(roads, 1):
        road_id = r.attrib['id']
        print("* parsing road %2d : %s" % (r_cnt, road_id))

        # lanes matching the road-id: the road reference of a net lane is
        # "[-]<road:id>", so compare the whole reference. A substring test
        # would let road "1" also pick up the lanes of roads "10" or "21".
        lane_id_sel = np.fromiter(
            (rd_ref in (road_id, '-' + road_id) for rd_ref in lane_ra.rd_ref),
            np.dtype(bool))

        # s offsets of the starts of this road's lane sections
        lane_secs = r.xpath('.//ns:laneSection', namespaces=ns_map)

        # TODO: preselect, as we dont want TLS @dynamic='no'
        for s_cnt, s in enumerate(r.xpath('.//ns:signal', namespaces=ns_map), 1):
            poi = {'id': s.attrib['id'], 'layer': '10',
                   'type': 'signal',
                   'width': '0.75', 'height': '0.75', 'fill': 'true', }  # just to show
            # all remaining signal attributes are carried along as params
            poi_params = [{'key': key, 'value': s.attrib.get(key, '')}
                          for key in s.attrib if key not in ('id', 's', 't')]
            print("* + parsing signal %2d : %s" % (s_cnt, poi['id']))
            # linear meter start of the signal.
            # NOTE: must stay a module-level variable with this name,
            # get_OD_lane_widths() reads it as a global.
            lin_m = float(s.attrib['s'])

            # select the SUMO net lanes of this road starting upstream
            lane_sel = lane_id_sel & (lane_ra.lin_m < lin_m)
            # assuming we dont have to check for lanes of different length
            # take max linear meterage, i.e. the last split edge upstream
            max_lin_m = lane_ra.lin_m[lane_sel].max()
            lane_sel &= (lane_ra.lin_m == max_lin_m)
            # (lin.meterage of unsplit road) - (lin.m of start of fitting split edge)
            poi['pos'] = "%e" % (lin_m - max_lin_m)

            # get the fitting laneSection from xodr:
            # the last upstream element by s offsets of the starts
            ls_soff = np.r_[r.xpath('.//ns:laneSection/@s',
                                    namespaces=ns_map)].astype(float)
            ls_ind = find_upstream_lin_m(ls_soff, lin_m)
            lane_sec = lane_secs[ls_ind]
            cur_ls_sOff = ls_soff[ls_ind]

            # TODO: future - process laneOffset, as another offset
            lin_m_width = get_OD_lane_widths(lane_sec, cur_ls_sOff)

            # transposing to [[lane_ids...], [lane_widths...]]
            lin_m_width = lol_T(lin_m_width)
            # get in order 0,-1,-2,...
            decr_ord_ind = np.argsort(lin_m_width[0])[::-1]
            lin_m_width = [np.take(col, decr_ord_ind) for col in lin_m_width]
            # drop 0 width lanes, which are not translated to SUMO net
            has_width = lin_m_width[1] > 0.
            lin_m_width = [col[has_width] for col in lin_m_width]
            num_oLanes = len(lin_m_width[0])    # number of xodr lanes at pos
            # add lane limits as lateral coordinates
            lin_m_width.append(-lin_m_width[-1].cumsum())

            # posLat transformation
            # the OD t-offset against lane 0, left border
            od_t_l0l = float(s.attrib['t'])     # lateral xodr position

            # locate on which xodr lane this offset lands
            closest_lane_index = np.digitize(od_t_l0l, lin_m_width[-1]).item()
            # catch stray to right
            closest_lane_index = min(closest_lane_index, num_oLanes - 1)
            net_laneInds = list(reversed(range(num_oLanes)))
            # get corresponding SUMO lane num_id
            nRef_lane = net_laneInds[closest_lane_index]
            # finally pack the sumo lane ids also
            lin_m_width.append(net_laneInds)

            lnid_sel = lane_ra.lane == nRef_lane
            lnid_gt_sel = lane_ra.lane > nRef_lane  # for lanes to left
            # assoc SUMO net lane
            nlane_wid = lane_ra[lane_sel & lnid_sel].width.item()
            nlleft_wid = lane_ra[lane_sel & lnid_gt_sel].width.sum()
            # width(sumo_ref.lane)/2. + width(all lanes left sumo ref.) + (od_t_l0l)
            posLat = nlane_wid / 2. + nlleft_wid + od_t_l0l
            poi['posLat'] = "%.2f" % posLat
            poi['lane'] = nlanes[lane_ra[lane_sel & lnid_sel].index[0]].attrib['id']

            # signal:validity
            sign_valids = s.xpath('ns:validity', namespaces=ns_map)
            # boolean masks only work on arrays, not on the plain list
            net_lane_arr = np.asarray(net_laneInds)
            # TODO: test this part
            for v in sign_valids:
                # translate the xodr lane-range  #"<from_lane> <to_lane>"
                sumo_lids = [net_lane_arr[lin_m_width[0] == int(v.attrib[atn])].item()
                             for atn in ('fromLane', 'toLane')]
                poi_params.append({'key': 'validity',
                                   'value': " ".join(map(str, sumo_lids))})
            if len(sign_valids) == 0:
                print(": INFO : default signal validity for all lanes")

            # TODO: pick and insert image refs for the signal's <type>-<subtype>

            # turn the collected attributes into the xml element
            poi_el = lET.Element('poi', attrib=poi)
            for kvd in poi_params:
                poi_el.append(lET.Element('param', kvd))
            aroot.append(poi_el)

    sig_Fp = os.path.join(os.path.dirname(net_Fp), 'signals.add.xml')
    print('-' * 3)
    ans = input('? write sumo additionals file with POIs ([N] / y): ')
    if ans.lower() == 'y':
        atree.write(sig_Fp, pretty_print=True)
        print('-' * 5, '\n', '* stored signal data to:', sig_Fp)
322
323