# Path: blob/main/tools/import/citybrain/citybrain_road.py
# 169679 views
#!/usr/bin/env python
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
# Copyright (C) 2010-2025 German Aerospace Center (DLR) and others.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0/
# This Source Code may also be made available under the following Secondary
# Licenses when the conditions for such availability set forth in the Eclipse
# Public License 2.0 are satisfied: GNU General Public License, version 2
# or later which is available at
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later

# @file citybrain_road.py
# @author Jakob Erdmann
# @date 2021-05-07

import os
import sys
import subprocess
from collections import defaultdict

if 'SUMO_HOME' in os.environ:
    sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
import sumolib  # noqa


def get_options(args=None):
    """Parse the command line and return the options object.

    args: optional list of argument strings; when None the parser falls back
          to its default argument source.

    Prints the help text and exits with status 1 when no network file
    (-n / --net-file) was given.
    """
    optParser = sumolib.options.ArgumentParser(description="Import citybrains network")
    optParser.add_argument("-n", "--net-file", dest="netfile",
                           help="citybrains network file to import")
    optParser.add_argument("-o", "--output", dest="output",
                           default="net.net.xml", help="define the output sumo network filename")
    optParser.add_argument("-p", "--prefix", dest="prefix",
                           default="net", help="prefix for plain xml files")
    optParser.add_argument("-j", "--junction-type", dest="junctionType",
                           default="allway_stop", help="the default type for junctions without traffic light")
    optParser.add_argument("-t", "--temp-network", dest="tmp",
                           default="tmp.net.xml", help="intermediate network file")
    optParser.add_argument("-x", "--ignore-connections", dest="ignoreCons", action="store_true", default=False,
                           help="use connections guessed by netconvert instead of the specified connections")
    options = optParser.parse_args(args=args)
    if not options.netfile:
        optParser.print_help()
        sys.exit(1)

    return options


def _run_netconvert(cmd):
    """Run the netconvert command line *cmd*; return True if it exited with status 0."""
    if subprocess.call(cmd) != 0:
        sys.stderr.write("Error: netconvert call failed: %s\n" % " ".join(cmd))
        return False
    return True


def _read_citybrain(netfile, edgefile):
    """Read the citybrain file *netfile* and write the edge file *edgefile*.

    Layout as consumed here: line 0 holds the node count, followed by one line
    per node, then the edge count, then three lines per road (the road
    definition and one line of lane direction flags for each of its two edges).
    Reading stops after the road section.

    Returns (numNodes, numEdges, nodes, nodeEdges, connections) where nodes is
    the list of raw node lines, nodeEdges maps node id -> number of roads
    touching it and connections maps edge id -> list of int direction flags.
    """
    numNodes = 0
    numEdges = 0
    lastEdge = 0
    edgeLine = 0
    edgeID1 = ""
    edgeID2 = ""
    nodes = []
    nodeEdges = defaultdict(int)
    connections = {}  # edge -> laneDirections

    with open(edgefile, "w") as edg, open(netfile) as netIn:
        sumolib.writeXMLHeader(edg, "$Id$", "edges")  # noqa
        for i, line in enumerate(netIn):
            if i == 0:
                numNodes = int(line)
            elif i <= numNodes:
                nodes.append(line)
            elif i == numNodes + 1:
                numEdges = int(line)
                lastEdge = numNodes + 2 + numEdges * 3
                edgeLine = 0
            elif i < lastEdge:
                if edgeLine == 0:
                    # one road definition describes both driving directions
                    fromID, toID, length, speed, nLanes1, nLanes2, edgeID1, edgeID2 = line.split()
                    nodeEdges[toID] += 1
                    nodeEdges[fromID] += 1
                    edg.write(' <edge id="%s" from="%s" to="%s" speed="%s" numLanes="%s" length="%s"/>\n' % (
                        edgeID1, fromID, toID, speed, nLanes1, length))
                    edg.write(' <edge id="%s" from="%s" to="%s" speed="%s" numLanes="%s" length="%s"/>\n' % (
                        edgeID2, toID, fromID, speed, nLanes2, length))
                elif edgeLine == 1:
                    connections[edgeID1] = list(map(int, line.split()))
                elif edgeLine == 2:
                    connections[edgeID2] = list(map(int, line.split()))
                edgeLine = (edgeLine + 1) % 3
            elif i == lastEdge:
                # the traffic signal section that follows is not needed
                break
        edg.write('</edges>\n')

    return numNodes, numEdges, nodes, nodeEdges, connections


def _write_nodes(nodefile, nodes, nodeEdges, junctionType):
    """Write the raw node lines *nodes* as plain-xml nodes to *nodefile*.

    Signalized nodes become "traffic_light", unsignalized nodes touched by
    fewer than 4 roads become "priority", all others get *junctionType*.
    """
    with open(nodefile, "w") as nod:
        sumolib.writeXMLHeader(nod, "$Id$", "nodes")  # noqa
        for line in nodes:
            lat, lon, nodeID, signalized = line.split()
            if signalized == "1":
                nodeType = "traffic_light"
            elif nodeEdges[nodeID] < 4:
                nodeType = "priority"
            else:
                nodeType = junctionType
            nod.write(' <node id="%s" x="%s" y="%s" type="%s"/>\n' % (
                nodeID, lon, lat, nodeType))
        nod.write('</nodes>\n')


def _direction_slots(edge):
    """Assign the outgoing targets of *edge* to three direction slots.

    Returns (directionTargets, directionTargetLanes): per slot the target edge
    id (or None) and the list of usable target lane indices. Slot 1 is the
    straight direction; slots 0 and 2 are the turns on either side, ordered by
    junction link index (presumably left/right - which is which is not
    visible here).
    """
    targetIndex = []
    for target, cons in edge.getOutgoing().items():
        targetLanes = [c.getToLane().getIndex() for c in cons]
        targetIndex.append([cons[0].getJunctionIndex(), target.getID(), targetLanes,
                            cons[0].getDirection()])
    targetIndex.sort()

    # pad with an empty entry where a direction is missing
    numTargets = len(targetIndex)
    if numTargets == 1:
        # interpret the single target as "straight"
        targetIndex.insert(0, [None] * 4)
    elif numTargets == 2:
        if targetIndex[0][-1] == 's':
            targetIndex.insert(0, [None] * 4)
        elif targetIndex[1][-1] != 's':
            targetIndex.insert(1, [None] * 4)

    directionTargets = 3 * [None]
    directionTargetLanes = [[], [], []]
    # only the first three entries can be addressed by the direction flags
    for i, (_, target, targetLanes, _) in enumerate(targetIndex[:3]):
        directionTargets[i] = target
        directionTargetLanes[i] = targetLanes
    return directionTargets, directionTargetLanes


def _write_connections(net, connections, connfile):
    """Translate the direction flags in *connections* into *connfile*.

    net: network read by sumolib from the first netconvert pass
    connections: edge id -> flat list with 3 flags per lane (one per direction slot)
    """
    with open(connfile, "w") as con:
        sumolib.writeXMLHeader(con, "$Id$", "connections")  # noqa
        for edgeID in sorted(connections):
            edge = net.getEdge(edgeID)
            directionTargets, directionTargetLanes = _direction_slots(edge)

            code = connections[edgeID]
            lanes = edge.getLanes()
            assert len(code) == len(lanes) * 3
            for laneIndex, _ in enumerate(lanes):
                for index in range(3):
                    if code[3 * laneIndex + index] != 1:
                        continue
                    if directionTargets[index] is None:
                        # no target edge was found for this direction: skip silently
                        continue
                    # target lane is not specified, we reuse the target lanes from sumo
                    targetLanes = directionTargetLanes[index]
                    toLane = targetLanes[0]
                    if len(targetLanes) > 1:
                        # hand the next from-lane the next target lane
                        directionTargetLanes[index] = targetLanes[1:]
                    con.write(' <connection from="%s" to="%s" fromLane="%s" toLane="%s"/>\n' % (
                        edgeID, directionTargets[index], laneIndex, toLane))
        con.write('</connections>\n')


def main(options):
    """Convert the citybrain network named in *options* into a sumo network.

    Writes <prefix>.nod.xml and <prefix>.edg.xml, and unless connections are
    ignored also <prefix>.con.xml and the intermediate network, then calls
    netconvert to build options.output.

    Returns True on success and False if a netconvert call failed, so that the
    script exit status reflects the outcome.
    """
    nodefile = options.prefix + ".nod.xml"
    edgefile = options.prefix + ".edg.xml"

    numNodes, numEdges, nodes, nodeEdges, connections = _read_citybrain(options.netfile, edgefile)
    _write_nodes(nodefile, nodes, nodeEdges, options.junctionType)

    NETCONVERT = sumolib.checkBinary('netconvert')

    # the sample route data didn't include a single turn-around so let's not build any
    args = [NETCONVERT,
            '-e', edgefile,
            '-n', nodefile,
            '--proj.utm',
            '--junctions.corner-detail', '0',
            '--no-turnarounds',
            '--no-internal-links',
            ]

    if options.ignoreCons:
        if not _run_netconvert(args + ['-o', options.output]):
            return False
    else:
        # connections are encoded relative to driving direction.
        # We build the network once to obtain directions and then build again with the connections
        if not _run_netconvert(args + ['-o', options.tmp, '--no-warnings', ]):
            return False

        net = sumolib.net.readNet(options.tmp)
        connfile = options.prefix + ".con.xml"
        _write_connections(net, connections, connfile)

        if not _run_netconvert(args + ['-o', options.output, '-x', connfile]):
            return False

    print("Built network with %s nodes and %s edges" % (numNodes, numEdges * 2))
    return True


if __name__ == "__main__":
    if not main(get_options()):
        sys.exit(1)