Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/import/citybrain/citybrain_road.py
169679 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2010-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file citybrain_road.py
15
# @author Jakob Erdmann
16
# @date 2021-05-07
17
18
import os
19
import sys
20
import subprocess
21
from collections import defaultdict
22
23
if 'SUMO_HOME' in os.environ:
24
sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
25
import sumolib # noqa
26
27
28
def get_options(args=None):
    """Parse the command line options of the citybrain network importer.

    args -- optional list of argument strings; when None the parser falls
            back to its default argument source.

    Returns the parsed options object. If no input network file was given,
    the help text is printed and the process exits with status 1.
    """
    parser = sumolib.options.ArgumentParser(description="Import citybrains network")

    # input / output files
    parser.add_argument("-n", "--net-file", dest="netfile",
                        help="citybrains network file to import")
    parser.add_argument("-o", "--output", dest="output",
                        default="net.net.xml", help="define the output sumo network filename")
    parser.add_argument("-p", "--prefix", dest="prefix",
                        default="net", help="prefix for plain xml files")
    parser.add_argument("-t", "--temp-network", dest="tmp",
                        default="tmp.net.xml", help="intermediate network file")

    # processing options
    parser.add_argument("-j", "--junction-type", dest="junctionType",
                        default="allway_stop", help="the default type for junctions without traffic light")
    parser.add_argument("-x", "--ignore-connections", dest="ignoreCons", action="store_true", default=False,
                        help="use connections guessed by netconvert instead of the specified connections")

    parsed = parser.parse_args(args=args)
    if parsed.netfile:
        return parsed

    # the input network is mandatory: show usage and give up
    parser.print_help()
    sys.exit(1)
48
49
50
def _convert_edges(netfile, edgefile):
    """Parse the citybrain network file and write the plain edge file.

    The input is read line by line: the first line holds the node count,
    followed by one line per node, then a line with the edge count, followed
    by three lines per edge record (edge definition, lane direction flags of
    the forward edge, lane direction flags of the backward edge). Anything
    after the edge section is ignored.

    Returns (numNodes, numEdges, nodes, nodeEdges, connections) where nodes
    is the list of raw node lines, nodeEdges counts the edge records that
    touch each node id and connections maps edge id -> list of int flags.
    """
    numNodes = 0
    numEdges = 0
    lastEdge = 0
    edgeLine = 0
    edgeID1 = ""
    edgeID2 = ""
    nodes = []
    nodeEdges = defaultdict(int)
    connections = {}  # edge -> laneDirections
    # context managers make sure both files are closed even if parsing fails
    with open(edgefile, "w") as edg, open(netfile) as inf:
        sumolib.writeXMLHeader(edg, "$Id$", "edges")  # noqa
        for i, line in enumerate(inf):
            if i == 0:
                numNodes = int(line)
            elif i <= numNodes:
                nodes.append(line)
            elif i == numNodes + 1:
                numEdges = int(line)
                lastEdge = numNodes + 2 + numEdges * 3
                edgeLine = 0
            elif i < lastEdge:
                if edgeLine == 0:
                    # one record describes both driving directions
                    fromID, toID, length, speed, nLanes1, nLanes2, edgeID1, edgeID2 = line.split()
                    nodeEdges[toID] += 1
                    nodeEdges[fromID] += 1
                    edg.write(' <edge id="%s" from="%s" to="%s" speed="%s" numLanes="%s" length="%s"/>\n' % (
                        edgeID1, fromID, toID, speed, nLanes1, length))
                    edg.write(' <edge id="%s" from="%s" to="%s" speed="%s" numLanes="%s" length="%s"/>\n' % (
                        edgeID2, toID, fromID, speed, nLanes2, length))
                elif edgeLine == 1:
                    connections[edgeID1] = list(map(int, line.split()))
                elif edgeLine == 2:
                    connections[edgeID2] = list(map(int, line.split()))
                edgeLine = (edgeLine + 1) % 3
            elif i == lastEdge:
                # the traffic signal section that follows is not evaluated
                break
        edg.write('</edges>\n')
    return numNodes, numEdges, nodes, nodeEdges, connections


def _write_nodes(nodefile, nodes, nodeEdges, junctionType):
    """Write the plain node file from the raw node lines.

    Signalized nodes become "traffic_light", unsignalized nodes touched by
    fewer than 4 edge records become "priority", all others get junctionType.
    """
    with open(nodefile, "w") as nod:
        sumolib.writeXMLHeader(nod, "$Id$", "nodes")  # noqa
        for line in nodes:
            lat, lon, nodeID, signalized = line.split()
            nodeType = junctionType
            if signalized == "1":
                nodeType = "traffic_light"
            elif nodeEdges[nodeID] < 4:
                nodeType = "priority"
            nod.write(' <node id="%s" x="%s" y="%s" type="%s"/>\n' % (
                nodeID, lon, lat, nodeType))
        nod.write('</nodes>\n')


def _write_connections(net, connections, connfile):
    """Write the connection file by mapping the direction flags onto net.

    For every edge the outgoing targets found in the intermediate network are
    sorted by junction link index and assigned to three direction slots
    (slot 1 being "straight"). Each lane carries three flags, one per slot;
    a flag of 1 yields a connection to the next unused target lane of the
    slot (the last target lane is reused once all have been handed out).
    """
    with open(connfile, "w") as con:
        sumolib.writeXMLHeader(con, "$Id$", "connections")  # noqa

        for edgeID in sorted(connections.keys()):
            edge = net.getEdge(edgeID)
            directionTargets = 3 * [None]
            directionTargetLanes = [[], [], []]

            targetIndex = []
            for target, cons in edge.getOutgoing().items():
                targetLanes = [c.getToLane().getIndex() for c in cons]
                targetIndex.append([cons[0].getJunctionIndex(), target.getID(), targetLanes,
                                    cons[0].getDirection()])
            targetIndex.sort()

            numTargets = len(targetIndex)

            # pad with a placeholder so that existing targets end up in the right slot
            if numTargets == 1:
                # interpret the single target as "straight"
                targetIndex = [[None] * 4] + targetIndex
            elif numTargets == 2:
                # check which direction is missing
                if targetIndex[0][-1] == 's':
                    targetIndex.insert(0, [None] * 4)
                elif targetIndex[1][-1] != 's':
                    targetIndex.insert(1, [None] * 4)

            # only the first three entries can be addressed by the flags
            for i, entry in enumerate(targetIndex[:3]):
                directionTargets[i] = entry[1]
                directionTargetLanes[i] = entry[2]

            code = connections[edgeID]
            numLanes = len(edge.getLanes())
            assert len(code) == numLanes * 3, \
                "edge %s: expected %s direction flags, got %s" % (edgeID, numLanes * 3, len(code))
            for laneIndex in range(numLanes):
                for index in [0, 1, 2]:
                    if code[3 * laneIndex + index] != 1:
                        continue
                    if directionTargets[index] is None:
                        # no target exists for this direction, the flag is dropped silently
                        continue
                    # target lane is not specified, we reuse the target lanes from sumo
                    targetLanes = directionTargetLanes[index]
                    toLane = targetLanes[0]
                    if len(targetLanes) > 1:
                        directionTargetLanes[index] = targetLanes[1:]
                    con.write(' <connection from="%s" to="%s" fromLane="%s" toLane="%s"/>\n' % (
                        edgeID, directionTargets[index], laneIndex, toLane))

        con.write('</connections>\n')


def _netconvert_ok(args):
    """Run netconvert with args and report whether it exited with status 0."""
    if subprocess.call(args) != 0:
        sys.stderr.write("Error: netconvert call failed: %s\n" % " ".join(args))
        return False
    return True


def main(options):
    """Convert a citybrain network file into a sumo network.

    Writes <prefix>.nod.xml and <prefix>.edg.xml and builds options.output
    with netconvert. Unless options.ignoreCons is set, an intermediate
    network (options.tmp) is built first to derive <prefix>.con.xml from the
    lane direction flags, and the final network is built with these
    connections.

    Returns True on success and False if a netconvert call failed, so that
    the return value can be used to pick the process exit status.
    """
    nodefile = options.prefix + ".nod.xml"
    edgefile = options.prefix + ".edg.xml"

    numNodes, numEdges, nodes, nodeEdges, connections = _convert_edges(options.netfile, edgefile)
    _write_nodes(nodefile, nodes, nodeEdges, options.junctionType)

    NETCONVERT = sumolib.checkBinary('netconvert')

    # the sample route data didn't include a single turn-around so let's not build any
    args = [NETCONVERT,
            '-e', edgefile,
            '-n', nodefile,
            '--proj.utm',
            '--junctions.corner-detail', '0',
            '--no-turnarounds',
            '--no-internal-links',
            ]

    if options.ignoreCons:
        if not _netconvert_ok(args + ['-o', options.output]):
            return False
    else:
        # connections are encoded relative to driving direction.
        # We build the network once to obtain directions and then build again with the connections
        if not _netconvert_ok(args + ['-o', options.tmp, '--no-warnings', ]):
            return False

        net = sumolib.net.readNet(options.tmp)
        connfile = options.prefix + ".con.xml"
        _write_connections(net, connections, connfile)

        if not _netconvert_ok(args + ['-o', options.output, '-x', connfile]):
            return False

    print("Built network with %s nodes and %s edges" % (numNodes, numEdges * 2))
    return True
192
193
194
if __name__ == "__main__":
    # Script entry point: parse the command line, run the import and exit
    # with status 1 whenever main() yields a falsy result.
    # NOTE(review): this relies on main() returning a truthy value on success.
    succeeded = main(get_options())
    if not succeeded:
        sys.exit(1)
197
198