Path: blob/main/tools/import/vissim/convert_detectors2SUMO.py
169679 views
#!/usr/bin/env python1# -*- coding: utf-8 -*-2# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo3# Copyright (C) 2009-2025 German Aerospace Center (DLR) and others.4# This program and the accompanying materials are made available under the5# terms of the Eclipse Public License 2.0 which is available at6# https://www.eclipse.org/legal/epl-2.0/7# This Source Code may also be made available under the following Secondary8# Licenses when the conditions for such availability set forth in the Eclipse9# Public License 2.0 are satisfied: GNU General Public License, version 210# or later which is available at11# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html12# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later1314# @file convert_detectors2SUMO.py15# @author Lukas Grohmann <[email protected]>16# @date Aug 14 20151718"""19Parses induction loops and travel time measurements from a VISSIM .inpx file20and writes converted information to a given .add.xml file.21see documentation22"""23from __future__ import absolute_import24from __future__ import print_function2526import os27import sys28from xml.dom import minidom29from xml.dom.minidom import Document3031import numpy as np32if 'SUMO_HOME' in os.environ:33sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))34import sumolib # noqa353637def dict_from_node_attributes(node):38"""takes a xml node and returns a dictionary with its attributes"""39return dict((attn, node.getAttribute(attn)) for attn in40node.attributes.keys())414243def nparr_from_dict_list(dicl_tab, col_ns, col_ts):44"""converts a dictionary into an np array table structure"""45return np.array([tuple(rd.get(cn, '-1') for cn in col_ns) for rd in46dicl_tab], dtype=np.dtype(list(zip(col_ns, col_ts))))474849def get_induction_loops(inpx_doc):50induction_tab = [dict_from_node_attributes(nd) for nd in51inpx_doc.getElementsByTagName('dataCollectionPoint')]52return induction_tab535455def get_travel_time_detectors(inpx_doc):56travel_time_tab = []57for detector in \58inpx_doc.getElementsByTagName('vehicleTravelTimeMeasurement'):59travel_time_d = dict_from_node_attributes(detector)60start = detector.getElementsByTagName('start')[0]61travel_time_d['startLink'] = start.getAttribute('link')62travel_time_d['startPos'] = start.getAttribute('pos')63end = detector.getElementsByTagName('end')[0]64travel_time_d['endLink'] = end.getAttribute('link')65travel_time_d['endPos'] = end.getAttribute('pos')66travel_time_tab.append(travel_time_d)67return travel_time_tab686970def get_detector_coords_from_link(link_id, link_tab, pathlen):71link = [li for li in link_tab if li["no"] == link_id]72if len(link) > 0:73points = link[0]["points"]74return get_point_on_polyline(points, float(pathlen))75else:76print("link not found")777879def convert_vissim_to_sumo_coords(vissim_point, net_offset):80sumo_loop_coords = [vissim_point[0] + float(net_offset[0]),81vissim_point[1] + float(net_offset[1])]82return sumo_loop_coords838485def create_measurement_file(induction_tab, travel_time_tab,86edge_tab, link_tab, net_offset):87result_doc = Document()88root = result_doc.createElement("additional")89result_doc.appendChild(root)9091for loop in induction_tab:92ind_loop = result_doc.createElement("inductionLoop")93ind_loop.setAttribute("id", "_".join([loop["no"], loop["name"]]))94sumo_lane = "_".join([loop["lane"].split(" ")[0],95str(int(loop["lane"].split(" ")[1]) - 1)])96ind_loop.setAttribute("lane", sumo_lane)9798pathlen = loop["pos"]99link_id = loop["lane"].split(" ")[0]100lane_index = str(int(loop["lane"].split(" ")[1]) - 1)101vissim_loop_coords = get_detector_coords_from_link(link_id,102link_tab,103pathlen)104sumo_loop_coords = convert_vissim_to_sumo_coords(vissim_loop_coords,105net_offset)106polyline = [lane for lane in107[edge for edge in edge_tab if edge["id"] == link_id][1080]["lanes"]109if lane["index"] == lane_index][0]["shape"].split(" ")110shape = []111for point in polyline:112shape.append(point.split(","))113edge_offset = sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint(114sumo_loop_coords,115[[float(coord) for coord in point] for point in shape])116ind_loop.setAttribute("pos", str(edge_offset))117ind_loop.setAttribute("file", "ind_out.xml")118ind_loop.setAttribute("freq", "900")119root.appendChild(ind_loop)120121for det in travel_time_tab:122travel_time = result_doc.createElement("entryExitDetector")123travel_time.setAttribute("id", det["no"])124travel_time.setAttribute("freq", "900")125travel_time.setAttribute("file", "time_out.xml")126127start_edge = [edge for edge in edge_tab if128edge["id"] == det["startLink"]]129if len(start_edge) > 0:130start_point = get_detector_coords_from_link(start_edge[0]["id"],131link_tab,132det["startPos"])133sumo_point = convert_vissim_to_sumo_coords(start_point, net_offset)134for lane in start_edge[0]["lanes"]:135det_entry = result_doc.createElement("detEntry")136polyline = lane["shape"].split(" ")137shape = []138for point in polyline:139shape.append(point.split(","))140start_offset = sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint(141sumo_point,142[[float(coord) for coord in point] for point in shape])143det_entry.setAttribute("lane", lane["id"])144if start_offset < float(lane["length"]):145det_entry.setAttribute("pos", str(start_offset))146else:147det_entry.setAttribute("pos", lane["length"])148travel_time.appendChild(det_entry)149end_edge = [edge for edge in edge_tab if150edge["id"] == det["endLink"]]151if len(end_edge) > 0:152end_point = get_detector_coords_from_link(end_edge[0]["id"],153link_tab,154det["endPos"])155sumo_point = convert_vissim_to_sumo_coords(end_point, net_offset)156for lane in end_edge[0]["lanes"]:157det_exit = result_doc.createElement("detExit")158polyline = lane["shape"].split(" ")159shape = []160for point in polyline:161shape.append(point.split(","))162end_offset = sumolib.geomhelper.polygonOffsetWithMinimumDistanceToPoint(163sumo_point,164[[float(coord) for coord in point] for point in shape])165det_exit.setAttribute("lane", lane["id"])166if end_offset < float(lane["length"]):167det_exit.setAttribute("pos", str(end_offset))168else:169det_exit.setAttribute("pos", lane["length"])170travel_time.appendChild(det_exit)171root.appendChild(travel_time)172return result_doc173174175def get_point_on_polyline(points, pathlen):176points = np.array(points, dtype=float)177index, rem_len = get_segment_of_polyline(points, pathlen)178# check if index is reasonable value179if index <= 0:180print("WARNING: got invalid point on polyline")181return None182P = np.array(points[index - 1])183# if the remaining length is within tolerance, snap to initial point184if rem_len <= 1.0e-3:185return P186Q = np.array(points[index])187PQ = Q - P # Vektor PQ188vn = PQ / np.linalg.norm(PQ) # normierter Richtungsvektor189return P + vn * rem_len190191192def get_segment_of_polyline(points, pathlen):193"""take a polyline and return the segment index where pathlen along the polyline lies194"""195# check if pathlen is < 0196if pathlen < 0:197return 0, None198seg_lens = get_segment_lengths(points)199# check if pathlen is longer than polyline200# with a tolerance of 1e-4201if pathlen > sum(seg_lens) + 1e-3:202return -1, pathlen - sum(seg_lens)203lm_segG = np.r_[0., np.cumsum(seg_lens)]204index = np.digitize([pathlen], lm_segG).item()205return (index, pathlen - lm_segG[index - 1])206207208def get_segment_lengths(points):209dxyz = np.diff(points, axis=0)210return np.linalg.norm(dxyz, axis=1)211212213def get_vissim_data(inpxdoc):214link_tab = []215for link in inpx_doc.getElementsByTagName('link'):216link_d = {}217link_d['no'] = link.getAttribute('no')218link_d['lanes'] = []219link_d['points'] = []220for lane in link.getElementsByTagName('lane'):221link_d['lanes'].append({'width': lane.getAttribute('width')})222link_tab.append(link_d)223for point in link.getElementsByTagName('point3D'):224link_d['points'].append([point.getAttribute('x'),225point.getAttribute('y')])226227from_to_tab = []228for lin in inpxdoc.getElementsByTagName('link'):229if lin.hasChildNodes():230lep_d = {} # link end point dict231for ept in ('fromLinkEndPt', 'toLinkEndPt'):232lep_nd = lin.getElementsByTagName(ept)233ch0 = ept[0] # identifier 'f'rom / 't'o234if len(lep_nd) > 0:235dic = dict_from_node_attributes(lep_nd.item(0))236dic['link'], dic['lane'] = dic['lane'].split(' ')237lep_d.update(dict((ch0 + '_' + key, value)238for key, value in dic.items()))239lep_d.update({'_link': link_d['no'], })240from_to_tab.append(lep_d)241# which columns to pick ?242from_to_tab = nparr_from_dict_list(243from_to_tab,244'_link f_link f_lane t_link t_lane'.split(),245'O O i O i'.split())246return link_tab, from_to_tab247248249def get_sumo_data(sumodoc):250"""parse the SUMO data"""251junc_tab = []252conn_tab = []253edge_tab = []254for edge in sumo_doc.getElementsByTagName('edge'):255edge_d = dict_from_node_attributes(edge)256edge_d['lanes'] = []257for lane in edge.getElementsByTagName('lane'):258edge_d['lanes'].append(dict_from_node_attributes(lane))259edge_tab.append(edge_d)260junc_tab = [dict_from_node_attributes(nd) for261nd in sumodoc.getElementsByTagName('junction')]262col_n = ('id', 'type', 'x', 'y', 'incLanes', 'intLanes')263col_t = ('O', ) * 6264junc_tab = nparr_from_dict_list(junc_tab, col_n, col_t)265conn_tab = [dict_from_node_attributes(nd) for266nd in sumodoc.getElementsByTagName('connection')]267col_n = ('from', 'to', 'fromLane', 'toLane', 'via')268col_t = ('O', ) * 5269conn_tab = nparr_from_dict_list(conn_tab, col_n, col_t)270return junc_tab, conn_tab, edge_tab271272273def get_conn_verb_rel(conn_tab, from_to_tab):274"""returns 2 dictionaries, which contains the relation between connections275and verbinder"""276277conn_link_d = {} # key = verbinder.id, value = list<connection.id>278link_conn_d = {} # key = connection.id, value = verbinder.id279for conn in conn_tab:280#281if ':' not in conn['from']:282link = from_to_tab[283(from_to_tab['f_link'] == conn['from'].split("[")[0]) & (284from_to_tab['t_link'] == conn['to'].split("[")[0])]285if len(link) > 0:286# dictionary to get the connection id for a given verbinder id287link_conn_d[conn['via']] = link['_link'][0]288if link["_link"][0] in conn_link_d:289conn_link_d[link["_link"][0]].append(conn['via'])290else:291conn_link_d[link["_link"][0]] = [conn['via']]292else:293print("from: " + conn['from'] + "to: " + conn['to'])294return link_conn_d, conn_link_d295296297# MAIN298if __name__ == '__main__':299op = sumolib.options.ArgumentParser(description='detector conversion utility (VISSIM.inpx to SUMO)')300op.add_argument('--vissim-input', '-V', category="input", required=True, type=op.file,301help='VISSIM inpx file path')302op.add_argument('--output-file', '-o', category="output", required=True, type=op.file,303help='output file name')304op.add_argument('--SUMO-net', '-S', category="input", required=True, type=op.net_file,305help='SUMO net file path')306args = op.parse_args()307print("\n", args, "\n")308print('\n---\n\n* loading VISSIM net:\n\t', args.vissim_input)309sumo_doc = minidom.parse(args.SUMO_net)310inpx_doc = minidom.parse(args.vissim_input)311net_offset = sumo_doc.getElementsByTagName('location')[0].getAttribute(312'netOffset').split(',')313link_tab, from_to_tab = get_vissim_data(inpx_doc)314junc_tab, conn_tab, edge_tab = get_sumo_data(sumo_doc)315316conn_link_d = get_conn_verb_rel(conn_tab, from_to_tab)317induction_tab = get_induction_loops(inpx_doc)318travel_time_tab = get_travel_time_detectors(inpx_doc)319320result_doc = create_measurement_file(induction_tab,321travel_time_tab,322edge_tab,323link_tab,324net_offset)325326with open("%s.add.xml" % args.output_file, "w") as ofh:327result_doc.writexml(ofh, addindent=' ', newl='\n')328ofh.close()329330331