Path: blob/main/tools/import/vissim/tls_vissimXML2SUMOnet_update.py
169679 views
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
# Copyright (C) 2009-2025 German Aerospace Center (DLR) and others.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0/
# This Source Code may also be made available under the following Secondary
# Licenses when the conditions for such availability set forth in the Eclipse
# Public License 2.0 are satisfied: GNU General Public License, version 2
# or later which is available at
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later

# @file tls_vissimXML2SUMOnet_update.py
# @author Lukas Grohmann <[email protected]>
# @author Gerald Richter <[email protected]>
# @date Jun 11 2015

"""
Converts a VISSIM-tls-description into a SUMO-tls-description and writes
the appended information to a copy of the given sumo.net file
"""
# TODO: usage doc.ref

from __future__ import print_function
from __future__ import absolute_import

import os
import sys
from xml.dom import minidom
from copy import deepcopy

import numpy as np
if 'SUMO_HOME' in os.environ:
    sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
# NOTE: sumolib is only needed for command line parsing and is therefore
# imported inside main(); the helpers below only need minidom and numpy.


def dict_from_node_attributes(node):
    """Return the attributes of a minidom element as a {name: value} dict."""
    return {attn: node.getAttribute(attn) for attn in node.attributes.keys()}


def nparr_from_dict_list(dicl_tab, col_ns, col_ts):
    """Convert a list of dicts into a numpy structured array.

    :param dicl_tab: iterable of dicts, one per table row
    :param col_ns: column names; also the dict keys that are picked
    :param col_ts: numpy type codes, one per column
    :return: structured array with one record per dict; a key missing from
        a dict is filled with the string '0'
    """
    rows = [tuple(rd.get(cn, '0') for cn in col_ns) for rd in dicl_tab]
    return np.array(rows, dtype=np.dtype(list(zip(col_ns, col_ts))))


def get_conn_verb_rel(conn_tab, from_to_tab):
    """
    returns a dictionary to get the connection id for a
    given verbinder id and vice versa

    :param conn_tab: structured array with 'from', 'to' and 'via' fields
    :param from_to_tab: structured array with '_link', 'f_link', 't_link'
    :return: dict mapping verbinder id to the list of 'via' values of all
        matching connections
    """
    conn_link_d = {}  # key = verbinder.id, value = list<connection.id>
    for conn in conn_tab:
        if ':' in conn['from']:
            # 'from' values containing ':' are never matched against links
            continue
        # ids of split edges look like "link[n]"; compare the bare link id
        link = from_to_tab[
            (from_to_tab['f_link'] == conn['from'].split("[")[0]) &
            (from_to_tab['t_link'] == conn['to'].split("[")[0])]
        if len(link) > 0:
            conn_link_d.setdefault(link["_link"][0], []).append(conn['via'])
        else:
            print("from: " + conn['from'] + "to: " + conn['to'])
    return conn_link_d


def parse_vissim_net_data(inpxdoc):
    """Read the link table and the link end point table from the inpx DOM.

    :param inpxdoc: minidom document of the VISSIM .inpx file
    :return: tuple (link_tab, from_to_tab) of structured arrays; link_tab
        has the fields 'no' and 'name', from_to_tab the fields '_link',
        'f_link', 'f_lane', 't_link' and 't_lane'
    """
    link_tab = []
    from_to_tab = []
    for lin in inpxdoc.getElementsByTagName('link'):
        link_d = dict_from_node_attributes(lin)
        link_tab.append(link_d)
        if lin.hasChildNodes():
            lep_d = {}  # link end point dict
            for ept in ('fromLinkEndPt', 'toLinkEndPt'):
                lep_nd = lin.getElementsByTagName(ept)
                ch0 = ept[0]  # identifier 'f'rom / 't'o
                if len(lep_nd) > 0:
                    dic = dict_from_node_attributes(lep_nd.item(0))
                    # the 'lane' attribute holds "<link> <lane>"
                    dic['link'], dic['lane'] = dic['lane'].split(' ')
                    lep_d.update(dict((ch0 + '_' + key, value)
                                      for key, value in dic.items()))
            lep_d.update({'_link': link_d['no'], })
            from_to_tab.append(lep_d)
    # which columns to pick ?
    link_tab = nparr_from_dict_list(link_tab, 'no name'.split(), ('O',) * 2)
    # lane_tab = just the number of lanes and width
    from_to_tab = nparr_from_dict_list(
        from_to_tab,
        '_link f_link f_lane t_link t_lane'.split(),
        'O O i O i'.split())
    return link_tab, from_to_tab


def parse_sumo_net_data(sumodoc):
    """Read the junction and connection tables from the SUMO net DOM.

    :param sumodoc: minidom document of the SUMO net file
    :return: tuple (junc_tab, conn_tab) of structured arrays with object
        typed columns
    """
    junc_tab = [dict_from_node_attributes(nd) for
                nd in sumodoc.getElementsByTagName('junction')]
    col_n = ('id', 'type', 'x', 'y', 'incLanes', 'intLanes')
    junc_tab = nparr_from_dict_list(junc_tab, col_n, ('O', ) * len(col_n))
    conn_tab = [dict_from_node_attributes(nd) for
                nd in sumodoc.getElementsByTagName('connection')]
    col_n = ('from', 'to', 'fromLane', 'toLane', 'via')
    conn_tab = nparr_from_dict_list(conn_tab, col_n, ('O', ) * len(col_n))
    return junc_tab, conn_tab


def _program_by_id(prog_list, prog_id):
    """Return the first dict in prog_list whose "id" equals prog_id.

    Raises IndexError if there is no such program.
    """
    return [prog for prog in prog_list if prog["id"] == prog_id][0]


def compute_signal_tables(disp_name_id_d, signal_state_d, prog_list):
    """
    completes the signal tables with all duration and beginning times
    (in the VISSIM .sig files are only the beginning times of the red and
    green phases as well as the durations of the other phases given )

    Every signal group dict in signal_state_d gets a new "signal_table"
    entry: an int array with the columns display id, begin, duration,
    sorted by begin.

    :param disp_name_id_d: dict signal sequence id -> template table whose
        begin and duration columns are -1
    :param signal_state_d: dict program id -> signal group id -> dict with
        "signal_sequence", "durations" and "begins"
    :param prog_list: program dicts providing "id" and "cycletime"
    """
    for key, program in signal_state_d.items():
        cycletime = int(_program_by_id(prog_list, key)["cycletime"])
        for sig_group in program.values():
            sig_seq = sig_group["signal_sequence"]
            sig_tab = deepcopy(disp_name_id_d[sig_seq])
            # fill in the given durations
            for state in sig_group["durations"]:
                itemindex = np.where(sig_tab == int(state["display"]))
                sig_tab[itemindex[0][0]][itemindex[1][0] + 2] \
                    = int(state["duration"])
            # fill in the given begins
            for cmd in sig_group["begins"]:
                itemindex = np.where(sig_tab == int(cmd["display"]))
                sig_tab[itemindex[0][0]][itemindex[1][0] + 1] \
                    = int(cmd["begin"])
            # compute the missing values, walking backwards through the
            # sequence from the row of the last "begins" entry
            # NOTE(review): if "begins" is empty, itemindex is left over from
            # an earlier loop (or undefined) - confirm that every signal
            # group always has at least one cmd
            i = itemindex[0][0]
            check = 0
            while check != len(sig_tab):
                if sig_tab[i - 1][1] == -1:  # begin missing, duration known
                    if (sig_tab[i][1] - sig_tab[i - 1][2]) < 0:
                        # wraps around the end of the cycle
                        sig_tab[i - 1][1] = cycletime - \
                            (sig_tab[i - 1][2] - sig_tab[i][1])
                    else:
                        sig_tab[i - 1][1] = sig_tab[i][1] - sig_tab[i - 1][2]
                elif sig_tab[i - 1][2] == -1:  # duration missing, begin known
                    if sig_tab[i - 1][1] > sig_tab[i][1]:
                        # wraps around the end of the cycle
                        sig_tab[i - 1][2] = \
                            (cycletime - sig_tab[i - 1][1]) + sig_tab[i][1]
                    else:
                        sig_tab[i - 1][2] = sig_tab[i][1] - sig_tab[i - 1][1]
                i -= 1
                check += 1
            # split the first state that runs past the cycle end into the
            # part up to the cycle end and a remainder beginning at 0
            i = 0
            while i < len(sig_tab):
                if (sig_tab[i][1] + sig_tab[i][2]) > cycletime:
                    diff = cycletime - sig_tab[i][1]
                    dur = sig_tab[i][2]
                    sig_tab[i][2] = diff
                    sig_tab = np.insert(
                        sig_tab, i, np.array(
                            (sig_tab[i][0], 0, dur - diff)), 0)
                    break
                i += 1
            sig_tab = sig_tab[np.argsort(sig_tab[:, 1])]
            sig_group["signal_table"] = sig_tab


def sigtable_split_time(signal_state_d, prog_list):
    """Compute the common time slices of all signal groups of each program.

    :param signal_state_d: dict program id -> signal group id -> dict with
        a "signal_table" array (column 1 holds the begin times)
    :param prog_list: program dicts providing "id" and "cycletime"
    :return: dict program id -> {"begin": sorted unique begin times,
        "duration": length of every slice}; the last slice lasts until the
        end of the cycle
    """
    reference_time_d = {}
    for key, program in signal_state_d.items():
        cycletime = int(_program_by_id(prog_list, key)["cycletime"])
        reference_time = np.array([], dtype="int")
        for sg in program.values():
            reference_time = np.append(
                reference_time, sg["signal_table"][:, 1])
        reference_time = np.unique(reference_time)
        if len(reference_time) > 0:
            reference_duration = np.append(
                np.diff(reference_time), cycletime - reference_time[-1])
        else:
            reference_duration = np.array([], dtype="int")
        reference_time_d[key] = {
            "begin": reference_time,
            "duration": reference_duration,
        }
    return reference_time_d


def compute_sumo_signal_tables(reference_time_d,
                               signal_state_d,
                               sig_disp_list,
                               tls_state_vissim2SUMO):
    """Translate every signal table into a string of SUMO state letters.

    Every signal group dict gets a new "sumo_signal_table" entry holding
    one letter for each reference time that falls into one of its states.

    :param reference_time_d: dict program id -> {"begin": times, ...}
    :param signal_state_d: dict program id -> signal group id -> dict with
        a "signal_table" array (display id, begin, duration)
    :param sig_disp_list: display dicts providing "id" and "state"
    :param tls_state_vissim2SUMO: dict VISSIM state name -> SUMO letter
    """
    for key, program in signal_state_d.items():
        for sg in program.values():
            state = sg["signal_table"]
            ref_time = reference_time_d[key]["begin"]
            letters = []
            for time in ref_time:
                for row in state:
                    if row[1] <= time < row[1] + row[2]:
                        vissim_state = [
                            sig for sig in sig_disp_list
                            if sig["id"] == str(row[0])][0]["state"]
                        letters.append(tls_state_vissim2SUMO[vissim_state])
                        break
            sg["sumo_signal_table"] = "".join(letters)


def get_sigcon_junc_relation(sig_con_tab, sig_group_conn_d, junc_tab):
    """
    allocates the VISSIM signalcontrollers to SUMO junctions

    :param sig_con_tab: controller dicts providing "no" and "_sgs"
    :param sig_group_conn_d: dict signal group key -> list of 'via' values
    :param junc_tab: structured array with 'id' and 'intLanes' fields
    :return: dict controller "no" -> id of the first junction whose
        intLanes contain one of the controller's connections; controllers
        without such a junction are left out
    """
    sigCon_junc_d = {}
    for sig_con in sig_con_tab:
        conn_set = set()
        for sg in sig_con["_sgs"]:
            conn_set.update(sig_group_conn_d.get(sg["_sg"], ()))
        for junc in junc_tab:
            if conn_set.intersection(junc['intLanes'].split(" ")):
                sigCon_junc_d[sig_con["no"]] = junc["id"]
                break
    return sigCon_junc_d


def get_sigseq_id_list(sig_seq_tab, sig_disp_list):
    """Build the template signal table of every signal sequence.

    :param sig_seq_tab: sequence dicts providing "id" and "name", where
        the name consists of display names joined by "-"
    :param sig_disp_list: display dicts providing "id" and "name"
    :return: dict sequence id -> int array with one row
        [display id, -1, -1] per display of the sequence
    """
    disp_name_id_d = {}
    for seq in sig_seq_tab:
        id_list = []
        for name in seq["name"].split("-"):
            disp_id = [disp for disp in sig_disp_list
                       if disp["name"] == name][0]["id"]
            id_list.append([int(disp_id), -1, -1])
        disp_name_id_d[seq["id"]] = np.array(id_list, dtype="int")
    return disp_name_id_d


def _last_split_edge(link, edge_list):
    """Return the id of the last consecutive "link[n]" found in edge_list.

    If not even "link[0]" exists, "link[0]" is returned nevertheless.
    """
    split_len = 0
    while "%s[%d]" % (link, split_len) in edge_list:
        split_len += 1
    return "%s[%d]" % (link, max(split_len - 1, 0))


def get_sg_connection_data(
        conn_tab,
        sig_con_tab,
        sig_head_d,
        edge_list,
        conn_link_d,
        is_verbinder_d=None):
    """Collect the SUMO connections controlled by every signal group.

    :param conn_tab: structured array with 'from', 'fromLane' and 'via'
    :param sig_con_tab: controller dicts providing "_sgs"
    :param sig_head_d: dict signal group key -> list of signal head dicts
        providing 'link' and 'lane'
    :param edge_list: ids of the normal edges of the SUMO net
    :param conn_link_d: dict verbinder id -> list of 'via' values
    :param is_verbinder_d: dict link id -> bool; if omitted, the
        module-level variable of the same name is used
    :return: dict signal group key -> list of 'via' values
    :raises NameError: if is_verbinder_d is neither passed nor defined at
        module level
    """
    if is_verbinder_d is None:
        # older callers relied on a global variable of this name
        try:
            is_verbinder_d = globals()["is_verbinder_d"]
        except KeyError:
            raise NameError("is_verbinder_d is neither passed as argument "
                            "nor defined at module level")
    edges = set(edge_list)
    sig_group_conn_d = {}  # dic [sigCon ID] = List <[conn via]>
    for con in sig_con_tab:
        for sg in con['_sgs']:
            sg_key = sg['_sg']
            # check if a signalHead exists for the given signalGroup
            if sg_key not in sig_head_d:
                print(sg_key)
                continue
            for signal in sig_head_d[sg_key]:
                link = signal['link']
                lane = str(int(signal['lane']) - 1)
                # tls on normal edge or verbinder?
                if not is_verbinder_d[link]:
                    if link in edges:
                        edge_id = link
                    else:
                        # the link is not an edge of its own, look for its
                        # "link[n]" pieces instead
                        # NOTE(review): assumes the signal head belongs to
                        # the last piece - confirm
                        edge_id = _last_split_edge(link, edges)
                        print(edge_id)
                    connection = conn_tab[
                        (conn_tab["from"] == edge_id) &
                        (conn_tab["fromLane"] == lane)]
                else:
                    # keep the connections of the verbinder whose id ends
                    # with "_<lane>"
                    wanted = set(
                        via for via in conn_link_d[link]
                        if via.rsplit("_", 1)[-1] == lane)
                    mask = np.array(
                        [via in wanted for via in conn_tab["via"]],
                        dtype=bool)
                    connection = conn_tab[mask]
                sig_group_conn_d.setdefault(sg_key, []).extend(
                    connection["via"].tolist())
    return sig_group_conn_d


def parse_sig_file(sig_file):
    """Parse a VISSIM .sig file.

    :param sig_file: path of the .sig file
    :return: tuple (sig_seq_tab, signal_state_d, sig_disp_list, prog_list):
        the attribute dicts of all signal sequences, a dict program id ->
        signal group id -> {"signal_sequence", "begins", "durations"}, the
        attribute dicts of all displays (each extended by '_sc_id') and
        the attribute dicts of all programs, preceded by a placeholder
        with the id '0'
    """
    xmldoc = minidom.parse(sig_file)
    print('\n---\n\n* loading signal file:\n\t', sig_file)

    # just getting single head node
    sc_node = xmldoc.getElementsByTagName('sc').item(0)
    sc_id = sc_node.getAttribute('id')

    # get the signal displays; should be just 1 node
    sig_disp_nodes = sc_node.getElementsByTagName('signaldisplays')
    display_nodes = sig_disp_nodes.item(0).getElementsByTagName('display')
    sig_disp_list = [dict_from_node_attributes(disp) for disp in display_nodes]
    for sd in sig_disp_list:
        sd['_sc_id'] = sc_id

    # signalsequences
    sig_seq_tab = [dict_from_node_attributes(sig_seq) for sig_seq in
                   sc_node.getElementsByTagName('signalsequence')]

    # placeholder for the default program
    prog_list = [{'id': '0'}]  # unsaved
    # dict[prog_id][signal_id]
    #   <signal_sequence>
    #   <begins>
    #   <durations>
    signal_state_d = {}

    progs_node = sc_node.getElementsByTagName('progs').item(0)
    for prog_node in progs_node.getElementsByTagName('prog'):
        prog_d = dict_from_node_attributes(prog_node)
        prog_list.append(prog_d)
        prog_id = prog_d['id']
        signal_state_d[prog_id] = {}
        sg_nl = prog_node.getElementsByTagName(
            'sgs').item(0).getElementsByTagName('sg')
        for sg in sg_nl:
            sg_d = dict_from_node_attributes(sg)
            fixedstate_nodes = sg.getElementsByTagName(
                'fixedstates').item(0).getElementsByTagName('fixedstate')
            cmd_nodes = sg.getElementsByTagName(
                'cmds').item(0).getElementsByTagName('cmd')
            signal_state_d[prog_id][sg_d["sg_id"]] = {
                "signal_sequence": sg_d["signal_sequence"],
                "begins": [dict_from_node_attributes(cmd_node)
                           for cmd_node in cmd_nodes],
                "durations": [dict_from_node_attributes(fix_stat)
                              for fix_stat in fixedstate_nodes],
            }

    return sig_seq_tab, signal_state_d, sig_disp_list, prog_list


def parse_inpx_sig_data(xmldoc):
    """parses the signal data from the .inpx file

    :param xmldoc: minidom document of the VISSIM .inpx file
    :return: tuple (sig_controller_tab, sig_head_d): the controller dicts,
        each with a '_sgs' list of its signal group dicts (keyed by '_sg' =
        "<controller no> <group no>"), and a dict mapping the 'sg'
        attribute of the signal heads to the list of head dicts
    """
    sig_controller_tab = []
    sig_head_d = dict()

    for controller in xmldoc.getElementsByTagName('signalController'):
        controller_d = dict_from_node_attributes(controller)
        sgs_l = [dict_from_node_attributes(sgn) for
                 sgn in controller.getElementsByTagName('signalGroup')]
        for sg in sgs_l:
            sg['_sg'] = " ".join([controller.getAttribute('no'), sg['no']])
        controller_d['_sgs'] = sgs_l
        sig_controller_tab.append(controller_d)

    # parse signalHeads
    for s_head_item in xmldoc.getElementsByTagName('signalHead'):
        sig_head = dict_from_node_attributes(s_head_item)
        # the 'lane' attribute holds "<link> <lane>"
        sig_head['link'], sig_head['lane'] = sig_head['lane'].split(" ")
        sig_head_d.setdefault(sig_head['sg'], []).append(sig_head)
    return sig_controller_tab, sig_head_d


def edit_connections(conn_l, sumodoc, junc_id):
    """Mark the given connections as controlled by the traffic light.

    :param conn_l: list of lists of 'via' values; the position of the
        inner list becomes the linkIndex of its connections
    :param sumodoc: minidom document of the SUMO net, edited in place
    :param junc_id: value written to the 'tl' attribute
    :raises IndexError: if no connection has one of the 'via' values
    """
    connections = sumodoc.getElementsByTagName("connection")
    for link_index, vias in enumerate(conn_l):
        for via in vias:
            connection = [conn for conn in connections
                          if conn.getAttribute("via") == via][0]
            connection.setAttribute("state", "o")  # CHECK
            connection.setAttribute("linkIndex", str(link_index))
            connection.setAttribute("tl", junc_id)


def is_verbinder(xmldoc):
    """checks if a given link is a verbinder

    :return: dict link 'no' -> True if the link has a fromLinkEndPt element
    """
    return {
        link.getAttribute("no"):
            len(link.getElementsByTagName("fromLinkEndPt")) > 0
        for link in xmldoc.getElementsByTagName("link")
    }


def generate_xml_doc(
        sumo_tls_d, sigCon_junc_d,
        sig_con_tab, reference_time_d,
        sumodoc, prog_list_d, sig_group_conn_d):
    """Add the converted signal programs to the SUMO net document.

    For every controller that is mapped to a junction, the junction type
    is set to traffic_light, the controlled connections are marked and one
    tlLogic element per program as well as one WAUT and one wautJunction
    element are inserted in front of the junction. Controllers without
    junction and programs without connected signal group are skipped.

    :param sumo_tls_d: dict tls id -> program id -> signal group id ->
        dict providing "sumo_signal_table"
    :param sigCon_junc_d: dict tls id -> junction id
    :param sig_con_tab: controller dicts providing "no" and "progNo"
    :param reference_time_d: dict tls id -> program id -> {"duration": ..}
    :param sumodoc: minidom document of the SUMO net, edited in place
    :param prog_list_d: dict tls id -> program dicts ("id", "name")
    :param sig_group_conn_d: dict "<tls id> <sg id>" -> list of 'via'
    """
    for tls_id, programs in sumo_tls_d.items():
        if tls_id not in sigCon_junc_d:
            print("no junction found for signal controller %s, skipping"
                  % tls_id)
            continue
        junc_id = sigCon_junc_d[tls_id]
        default_prog_id = [
            sig for sig in sig_con_tab if sig["no"] == tls_id][0]["progNo"]
        junction = [junc for junc in sumodoc.getElementsByTagName(
            "junction") if junc.getAttribute("id") == junc_id][0]
        net = sumodoc.getElementsByTagName("net")[0]
        wrote_program = False
        for prog_id, program in programs.items():
            # only signal groups with connections, ordered by numeric id
            rows = sorted(
                ((sg_id, sg["sumo_signal_table"])
                 for sg_id, sg in program.items()
                 if " ".join([tls_id, sg_id]) in sig_group_conn_d),
                key=lambda row: int(row[0]))
            if not rows:
                print("no connected signal group in program %s of signal "
                      "controller %s, skipping" % (prog_id, tls_id))
                continue
            conn_l = [sig_group_conn_d[" ".join([tls_id, sg_id])]
                      for sg_id, _ in rows]
            # one state string per time slice, one letter per signal group
            state_l = ["".join(letters)
                       for letters in zip(*(tab for _, tab in rows))]
            duration_l = reference_time_d[tls_id][prog_id]["duration"]

            # edit net file
            junction.setAttribute("type", "traffic_light")
            edit_connections(conn_l, sumodoc, junc_id)
            tl_logic = sumodoc.createElement("tlLogic")
            tl_logic.setAttribute("id", junc_id)
            tl_logic.setAttribute("type", "static")
            tl_logic.setAttribute(
                "programID",
                _program_by_id(prog_list_d[tls_id], prog_id)["name"])
            tl_logic.setAttribute("offset", "0.00")
            net.insertBefore(tl_logic, junction)
            for state, duration in zip(state_l, duration_l):
                phase = sumodoc.createElement("phase")
                phase.setAttribute("duration", str(duration / 1000))
                phase.setAttribute("state", state)
                tl_logic.appendChild(phase)
            wrote_program = True

        if not wrote_program:
            continue

        # create WAUT
        waut = sumodoc.createElement("WAUT")
        waut.setAttribute(
            "startProg",
            _program_by_id(prog_list_d[tls_id], default_prog_id)["name"])
        waut.setAttribute("refTime", "100")
        waut.setAttribute("id", "".join(["w", tls_id]))
        net.insertBefore(waut, junction)

        # create waut junction
        waut_junc = sumodoc.createElement("wautJunction")
        waut_junc.setAttribute("junctionID", junc_id)
        waut_junc.setAttribute("wautID", "".join(["w", tls_id]))
        net.insertBefore(waut_junc, junction)


# global signal color translation definition
tls_state_vissim2SUMO = {'RED': 'r',
                         'REDAMBER': 'u',
                         'GREEN': 'g',
                         'AMBER': 'y',
                         # this should be different: like in SUMO 'o', 'O'
                         'FLASHING_GREEN': 'g',
                         'OFF': 'O'}


def main():
    """Read the command line, convert the signal data, write the new net."""
    import sumolib  # noqa

    op = sumolib.options.ArgumentParser(
        description='TLS conversion utility (VISSIM.inpx to SUMO)')
    op.add_argument('--vissim-input', '-V',
                    category="input", required=True, type=op.file,
                    help='VISSIM inpx file path')
    op.add_argument('--SUMO-net', '-S',
                    category="input", required=True, type=op.net_file,
                    help='SUMO net file path')
    op.add_argument('--output-file', '-o',
                    category="output", required=True, type=op.file,
                    help='output file name')
    args = op.parse_args()
    print("\n", args, "\n")
    print('\n---\n\n* loading VISSIM net:\n\t', args.vissim_input)
    xmldoc = minidom.parse(args.vissim_input)
    print('\n---\n\n* loading SUMO net:\n\t', args.SUMO_net,)
    sumodoc = minidom.parse(args.SUMO_net)

    # normal edges are those without a "function" attribute
    edge_list = [edge.getAttribute("id")
                 for edge in sumodoc.getElementsByTagName('edge')
                 if not edge.hasAttribute("function")]

    # INPX read
    sig_con_tab, sig_head_d = parse_inpx_sig_data(xmldoc)
    link_tab, from_to_tab = parse_vissim_net_data(xmldoc)
    is_verbinder_d = is_verbinder(xmldoc)

    # SUMO NET read
    junc_tab, conn_tab = parse_sumo_net_data(sumodoc)
    conn_link_d = get_conn_verb_rel(conn_tab, from_to_tab)

    # get the connections for every signal group
    sig_group_conn_d = get_sg_connection_data(conn_tab,
                                              sig_con_tab,
                                              sig_head_d,
                                              edge_list,
                                              conn_link_d,
                                              is_verbinder_d=is_verbinder_d)
    # related junction id for a given Signal Controller
    sigCon_junc_d = get_sigcon_junc_relation(sig_con_tab,
                                             sig_group_conn_d,
                                             junc_tab)

    # pick all the .sig files from the signalControllers
    # (sorted, so that the output does not depend on the set order)
    sig_files = sorted(set(
        sc[att] for sc in sig_con_tab for att in
        sc.keys() if 'supplyFile' in att and '.sig' in sc[att]))
    # sig_files = ['TestsiteGraz_v01301.sig'] # DEBUG, just 1 file

    reference_time_d = {}
    sumo_tls_d = {}
    prog_list_d = {}

    for sig_file in sig_files:
        sig_file = os.path.join(os.path.dirname(args.vissim_input), sig_file)

        # parse .sig files
        sig_seq_tab, signal_state_d, sig_disp_list, prog_list = \
            parse_sig_file(sig_file)
        tls_id = sig_disp_list[0]["_sc_id"]
        prog_list_d[tls_id] = prog_list
        # returns a numpy array with the reference signal Sequence table
        # format: display_id || begin_time || duration
        disp_name_id_d = get_sigseq_id_list(sig_seq_tab, sig_disp_list)

        compute_signal_tables(
            disp_name_id_d, signal_state_d, prog_list_d[tls_id])

        # reference time and duration for every signal program
        # times need to be split, to convert the sig table from VISSIM to SUMO
        reference_time_d[tls_id] = sigtable_split_time(
            signal_state_d, prog_list_d[tls_id])

        compute_sumo_signal_tables(reference_time_d[tls_id],
                                   signal_state_d,
                                   sig_disp_list,
                                   tls_state_vissim2SUMO)

        # Format: [tls id][signal program id][signal group index]
        sumo_tls_d[tls_id] = signal_state_d

    generate_xml_doc(
        sumo_tls_d, sigCon_junc_d, sig_con_tab,
        reference_time_d, sumodoc, prog_list_d, sig_group_conn_d)

    with open("%s.net.xml" % args.output_file, "w") as ofh:
        sumodoc.writexml(ofh, addindent=' ', newl='\n')


# MAIN
if __name__ == '__main__':
    main()