#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
# Copyright (C) 2009-2025 German Aerospace Center (DLR) and others.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0/
# This Source Code may also be made available under the following Secondary
# Licenses when the conditions for such availability set forth in the Eclipse
# Public License 2.0 are satisfied: GNU General Public License, version 2
# or later which is available at
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later

# @file    convert_detectors2SUMO.py
# @author  Lukas Grohmann <[email protected]>
# @date    Aug 14 2015

"""
Parses induction loops and travel time measurements from a VISSIM .inpx file
and writes the converted information to a given .add.xml file.
See the SUMO documentation for details.
"""
from __future__ import absolute_import
from __future__ import print_function

import os
import sys
from xml.dom import minidom
from xml.dom.minidom import Document

import numpy as np
if 'SUMO_HOME' in os.environ:
    sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
import sumolib  # noqa


def dict_from_node_attributes(node):
    """takes an XML node and returns a dictionary of its attributes"""
    return dict((attn, node.getAttribute(attn)) for attn in
                node.attributes.keys())


def nparr_from_dict_list(dicl_tab, col_ns, col_ts):
    """converts a list of dictionaries into a structured numpy array"""
    return np.array([tuple(rd.get(cn, '-1') for cn in col_ns) for rd in
                     dicl_tab], dtype=np.dtype(list(zip(col_ns, col_ts))))
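# Minimal illustration of nparr_from_dict_list (hypothetical data): missing
# keys default to the string '-1', and 'O' marks an object-typed column.
#   rows = [{'id': 'e1', 'type': 'priority'}, {'id': 'e2'}]
#   arr = nparr_from_dict_list(rows, ['id', 'type'], ['O', 'O'])
#   arr['type']  ->  array(['priority', '-1'], dtype=object)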


def get_induction_loops(inpx_doc):
    """parses all VISSIM dataCollectionPoint elements into attribute dicts"""
    induction_tab = [dict_from_node_attributes(nd) for nd in
                     inpx_doc.getElementsByTagName('dataCollectionPoint')]
    return induction_tab
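# Note: create_measurement_file below expects each returned dict to provide at
# least the attributes 'no', 'name', 'pos' and 'lane', where 'lane' is the
# VISSIM "<link> <lane>" pair (e.g. "10 1").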


def get_travel_time_detectors(inpx_doc):
    """parses all vehicleTravelTimeMeasurement elements into attribute dicts,
    adding the start/end link and position of each measurement section"""
    travel_time_tab = []
    for detector in \
            inpx_doc.getElementsByTagName('vehicleTravelTimeMeasurement'):
        travel_time_d = dict_from_node_attributes(detector)
        start = detector.getElementsByTagName('start')[0]
        travel_time_d['startLink'] = start.getAttribute('link')
        travel_time_d['startPos'] = start.getAttribute('pos')
        end = detector.getElementsByTagName('end')[0]
        travel_time_d['endLink'] = end.getAttribute('link')
        travel_time_d['endPos'] = end.getAttribute('pos')
        travel_time_tab.append(travel_time_d)
    return travel_time_tab


def get_detector_coords_from_link(link_id, link_tab, pathlen):
    """returns the point at distance pathlen along the polyline of the given
    VISSIM link, or None if the link cannot be found"""
    link = [li for li in link_tab if li["no"] == link_id]
    if len(link) > 0:
        points = link[0]["points"]
        return get_point_on_polyline(points, float(pathlen))
    else:
        print("link not found")
        return None


def convert_vissim_to_sumo_coords(vissim_point, net_offset):
    """shifts a VISSIM point by the netOffset of the SUMO network"""
    sumo_loop_coords = [vissim_point[0] + float(net_offset[0]),
                        vissim_point[1] + float(net_offset[1])]
    return sumo_loop_coords
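# Example (illustrative values): a VISSIM point [100.0, 50.0] combined with a
# netOffset of ('10.0', '20.0') yields the SUMO point [110.0, 70.0].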


def create_measurement_file(induction_tab, travel_time_tab,
                            edge_tab, link_tab, net_offset):
    """builds the .add.xml document with one inductionLoop per VISSIM data
    collection point and one entryExitDetector per travel time measurement"""
    result_doc = Document()
    root = result_doc.createElement("additional")
    result_doc.appendChild(root)

    for loop in induction_tab:
        ind_loop = result_doc.createElement("inductionLoop")
        ind_loop.setAttribute("id", "_".join([loop["no"], loop["name"]]))
        # VISSIM stores the lane as "<link> <lane>" (1-based);
        # the SUMO lane id is "<edge>_<index>" (0-based)
        sumo_lane = "_".join([loop["lane"].split(" ")[0],
                              str(int(loop["lane"].split(" ")[1]) - 1)])
        ind_loop.setAttribute("lane", sumo_lane)

        pathlen = loop["pos"]
        link_id = loop["lane"].split(" ")[0]
        lane_index = str(int(loop["lane"].split(" ")[1]) - 1)
        vissim_loop_coords = get_detector_coords_from_link(link_id,
                                                           link_tab,
                                                           pathlen)
        sumo_loop_coords = convert_vissim_to_sumo_coords(vissim_loop_coords,
                                                         net_offset)
        polyline = [lane for lane in
                    [edge for edge in edge_tab if edge["id"] == link_id][
                        0]["lanes"]
                    if lane["index"] == lane_index][0]["shape"].split(" ")
        shape = []
        for point in polyline:
            shape.append(point.split(","))
        edge_offset = sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint(
            sumo_loop_coords,
            [[float(coord) for coord in point] for point in shape])
        ind_loop.setAttribute("pos", str(edge_offset))
        ind_loop.setAttribute("file", "ind_out.xml")
        ind_loop.setAttribute("freq", "900")
        root.appendChild(ind_loop)

    for det in travel_time_tab:
        travel_time = result_doc.createElement("entryExitDetector")
        travel_time.setAttribute("id", det["no"])
        travel_time.setAttribute("freq", "900")
        travel_time.setAttribute("file", "time_out.xml")

        start_edge = [edge for edge in edge_tab if
                      edge["id"] == det["startLink"]]
        if len(start_edge) > 0:
            start_point = get_detector_coords_from_link(start_edge[0]["id"],
                                                        link_tab,
                                                        det["startPos"])
            sumo_point = convert_vissim_to_sumo_coords(start_point, net_offset)
            for lane in start_edge[0]["lanes"]:
                det_entry = result_doc.createElement("detEntry")
                polyline = lane["shape"].split(" ")
                shape = []
                for point in polyline:
                    shape.append(point.split(","))
                start_offset = sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint(
                    sumo_point,
                    [[float(coord) for coord in point] for point in shape])
                det_entry.setAttribute("lane", lane["id"])
                if start_offset < float(lane["length"]):
                    det_entry.setAttribute("pos", str(start_offset))
                else:
                    det_entry.setAttribute("pos", lane["length"])
                travel_time.appendChild(det_entry)
        end_edge = [edge for edge in edge_tab if
                    edge["id"] == det["endLink"]]
        if len(end_edge) > 0:
            end_point = get_detector_coords_from_link(end_edge[0]["id"],
                                                      link_tab,
                                                      det["endPos"])
            sumo_point = convert_vissim_to_sumo_coords(end_point, net_offset)
            for lane in end_edge[0]["lanes"]:
                det_exit = result_doc.createElement("detExit")
                polyline = lane["shape"].split(" ")
                shape = []
                for point in polyline:
                    shape.append(point.split(","))
                end_offset = sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint(
                    sumo_point,
                    [[float(coord) for coord in point] for point in shape])
                det_exit.setAttribute("lane", lane["id"])
                if end_offset < float(lane["length"]):
                    det_exit.setAttribute("pos", str(end_offset))
                else:
                    det_exit.setAttribute("pos", lane["length"])
                travel_time.appendChild(det_exit)
        root.appendChild(travel_time)
    return result_doc
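# The generated additional file has roughly this shape (ids and positions are
# illustrative):
#   <additional>
#       <inductionLoop id="1_loopName" lane="10_0" pos="5.2" file="ind_out.xml" freq="900"/>
#       <entryExitDetector id="2" freq="900" file="time_out.xml">
#           <detEntry lane="10_0" pos="3.1"/>
#           <detExit lane="12_0" pos="7.4"/>
#       </entryExitDetector>
#   </additional>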


def get_point_on_polyline(points, pathlen):
    """returns the point at distance pathlen along the given polyline"""
    points = np.array(points, dtype=float)
    index, rem_len = get_segment_of_polyline(points, pathlen)
    # check if index is a reasonable value
    if index <= 0:
        print("WARNING: got invalid point on polyline")
        return None
    P = np.array(points[index - 1])
    # if the remaining length is within tolerance, snap to the segment's start point
    if rem_len <= 1.0e-3:
        return P
    Q = np.array(points[index])
    PQ = Q - P  # vector from P to Q
    vn = PQ / np.linalg.norm(PQ)  # normalized direction vector
    return P + vn * rem_len


def get_segment_of_polyline(points, pathlen):
    """take a polyline and return the index of the segment in which pathlen
    along the polyline lies, together with the remaining length within it
    """
    # check if pathlen is < 0
    if pathlen < 0:
        return 0, None
    seg_lens = get_segment_lengths(points)
    # check if pathlen is longer than the polyline
    # (with a tolerance of 1e-3)
    if pathlen > sum(seg_lens) + 1e-3:
        return -1, pathlen - sum(seg_lens)
    lm_segG = np.r_[0., np.cumsum(seg_lens)]
    index = np.digitize([pathlen], lm_segG).item()
    return (index, pathlen - lm_segG[index - 1])


def get_segment_lengths(points):
    """returns the length of each segment of the polyline"""
    dxyz = np.diff(points, axis=0)
    return np.linalg.norm(dxyz, axis=1)
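# Worked example (hypothetical polyline): for points [(0, 0), (3, 0), (3, 4)]
# the segment lengths are [3, 4]; a pathlen of 4 lies in segment index 2 with
# 1 unit remaining, so get_point_on_polyline returns the point (3, 1).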


def get_vissim_data(inpxdoc):
    """parses the VISSIM links (geometry and lanes) and the link end points
    which describe how connector links ("Verbinder") attach to other links"""
    link_tab = []
    for link in inpxdoc.getElementsByTagName('link'):
        link_d = {}
        link_d['no'] = link.getAttribute('no')
        link_d['lanes'] = []
        link_d['points'] = []
        for lane in link.getElementsByTagName('lane'):
            link_d['lanes'].append({'width': lane.getAttribute('width')})
        link_tab.append(link_d)
        for point in link.getElementsByTagName('point3D'):
            link_d['points'].append([point.getAttribute('x'),
                                     point.getAttribute('y')])

    from_to_tab = []
    for lin in inpxdoc.getElementsByTagName('link'):
        if lin.hasChildNodes():
            lep_d = {}  # link end point dict
            for ept in ('fromLinkEndPt', 'toLinkEndPt'):
                lep_nd = lin.getElementsByTagName(ept)
                ch0 = ept[0]  # identifier 'f'rom / 't'o
                if len(lep_nd) > 0:
                    dic = dict_from_node_attributes(lep_nd.item(0))
                    dic['link'], dic['lane'] = dic['lane'].split(' ')
                    lep_d.update(dict((ch0 + '_' + key, value)
                                      for key, value in dic.items()))
            lep_d.update({'_link': lin.getAttribute('no'), })
            from_to_tab.append(lep_d)
    # which columns to pick?
    from_to_tab = nparr_from_dict_list(
        from_to_tab,
        '_link f_link f_lane t_link t_lane'.split(),
        'O O i O i'.split())
    return link_tab, from_to_tab


def get_sumo_data(sumodoc):
    """parses junctions, connections and edges (with their lanes) from the
    SUMO network"""
    junc_tab = []
    conn_tab = []
    edge_tab = []
    for edge in sumodoc.getElementsByTagName('edge'):
        edge_d = dict_from_node_attributes(edge)
        edge_d['lanes'] = []
        for lane in edge.getElementsByTagName('lane'):
            edge_d['lanes'].append(dict_from_node_attributes(lane))
        edge_tab.append(edge_d)
    junc_tab = [dict_from_node_attributes(nd) for
                nd in sumodoc.getElementsByTagName('junction')]
    col_n = ('id', 'type', 'x', 'y', 'incLanes', 'intLanes')
    col_t = ('O', ) * 6
    junc_tab = nparr_from_dict_list(junc_tab, col_n, col_t)
    conn_tab = [dict_from_node_attributes(nd) for
                nd in sumodoc.getElementsByTagName('connection')]
    col_n = ('from', 'to', 'fromLane', 'toLane', 'via')
    col_t = ('O', ) * 5
    conn_tab = nparr_from_dict_list(conn_tab, col_n, col_t)
    return junc_tab, conn_tab, edge_tab


def get_conn_verb_rel(conn_tab, from_to_tab):
    """returns two dictionaries describing the relation between SUMO
    connections and VISSIM connectors ("Verbinder")"""
    conn_link_d = {}  # key = verbinder.id, value = list<connection.id>
    link_conn_d = {}  # key = connection.id, value = verbinder.id
    for conn in conn_tab:
        if ':' not in conn['from']:
            link = from_to_tab[
                (from_to_tab['f_link'] == conn['from'].split("[")[0]) & (
                    from_to_tab['t_link'] == conn['to'].split("[")[0])]
            if len(link) > 0:
                # dictionary to get the verbinder id for a given connection id
                link_conn_d[conn['via']] = link['_link'][0]
                if link["_link"][0] in conn_link_d:
                    conn_link_d[link["_link"][0]].append(conn['via'])
                else:
                    conn_link_d[link["_link"][0]] = [conn['via']]
            else:
                print("from: " + conn['from'] + " to: " + conn['to'])
    return link_conn_d, conn_link_d
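# Illustrative shapes of the returned mappings (ids are placeholders; 'via'
# refers to a SUMO internal lane id):
#   link_conn_d = {':123_0_0': '10000'}                 # connection via -> verbinder no
#   conn_link_d = {'10000': [':123_0_0', ':123_1_0']}   # verbinder no -> connections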


# MAIN
if __name__ == '__main__':
    op = sumolib.options.ArgumentParser(
        description='detector conversion utility (VISSIM .inpx to SUMO)')
    op.add_argument('--vissim-input', '-V', category="input", required=True, type=op.file,
                    help='VISSIM inpx file path')
    op.add_argument('--output-file', '-o', category="output", required=True, type=op.file,
                    help='output file name (base name, ".add.xml" is appended)')
    op.add_argument('--SUMO-net', '-S', category="input", required=True, type=op.net_file,
                    help='SUMO net file path')
    args = op.parse_args()
    print("\n", args, "\n")
    print('\n---\n\n* loading VISSIM net:\n\t', args.vissim_input)
    sumo_doc = minidom.parse(args.SUMO_net)
    inpx_doc = minidom.parse(args.vissim_input)
    # the netOffset of the SUMO net is needed to convert VISSIM coordinates
    net_offset = sumo_doc.getElementsByTagName('location')[0].getAttribute(
        'netOffset').split(',')
    link_tab, from_to_tab = get_vissim_data(inpx_doc)
    junc_tab, conn_tab, edge_tab = get_sumo_data(sumo_doc)

    link_conn_d, conn_link_d = get_conn_verb_rel(conn_tab, from_to_tab)
    induction_tab = get_induction_loops(inpx_doc)
    travel_time_tab = get_travel_time_detectors(inpx_doc)

    result_doc = create_measurement_file(induction_tab,
                                         travel_time_tab,
                                         edge_tab,
                                         link_tab,
                                         net_offset)

    with open("%s.add.xml" % args.output_file, "w") as ofh:
        result_doc.writexml(ofh, addindent=' ', newl='\n')