Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/net/patchRailPriorities.py
428331 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2007-2026 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file patchRailPriorities.py
15
# @author Jakob Erdmann
16
# @date 2026-03-28
17
18
"""
19
Identifies single-track routes and attempts to find sidings for passing in
20
opposite directions. For found sidings
21
- the routingType and priorities in the network are patched so that trains prefer driving on the right
22
- if there is a ptStop on the main line but not on the siding, a new ptStop is added
23
24
"""
25
26
import os
27
import sys
28
import subprocess
29
from collections import defaultdict
30
SUMO_HOME = os.environ.get('SUMO_HOME',
31
os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', '..'))
32
sys.path.append(os.path.join(SUMO_HOME, 'tools'))
33
sys.path.append(os.path.join(SUMO_HOME, 'tools', 'trigger'))
34
import sumolib # noqa
35
from sumolib.options import ArgumentParser # noqa
36
import sumolib.geomhelper as gh # noqa
37
from sumolib.net import lane2edge # noqa
38
from createOvertakingReroutes import parseRoutes, findSwitches, findSidings, filterSidings # noqa
39
40
try:
41
sys.stdout.reconfigure(encoding='utf-8')
42
except: # noqa
43
pass
44
45
NETCONVERT = sumolib.checkBinary('netconvert')
46
47
48
def get_options():
49
ap = ArgumentParser()
50
ap.add_argument("-n", "--net-file", category="input", dest="netfile", required=True, type=ap.net_file,
51
help="the network to read lane and edge permissions")
52
ap.add_argument("-r", "--route-files", category="input", dest="routes", required=True, type=ap.route_file_list,
53
help="The file to road routes from (can also be a ptline-file)")
54
ap.add_argument("-s", "--stop-file", category="input", dest="stopfile", type=ap.additional_file,
55
help="The file to road stops from")
56
ap.add_argument("-o", "--output-file", category="output", dest="outfile", required=True, type=ap.net_file,
57
help="output network file")
58
ap.add_argument("-p", "--patchfile-prefix", category="output", dest="prefix",
59
help="prefix for generated patch files")
60
ap.add_argument("-O", "--stop-output", category="output", dest="stopOutput", type=ap.additional_file,
61
help="stop output file (requires stop-file)")
62
ap.add_argument("--rt-use", default="4", dest="rtUse",
63
help="The routing type for edges that should be used (default '4')")
64
ap.add_argument("--rt-avoid", default="0", dest="rtAvoid",
65
help="The routing type for edges that should be avoided (default '0')")
66
ap.add_argument("--vclass", default="rail",
67
help="only consider edges which permit the given vehicle class")
68
ap.add_argument("--min-length", dest="minLength", metavar="FLOAT", default=100.0,
69
type=float, help="minimum siding length")
70
ap.add_argument("--max-length", dest="maxLength", metavar="FLOAT",
71
type=float, help="maximum siding length")
72
ap.add_argument("--max-detour-factor", dest="maxDetour", metavar="FLOAT", default=2,
73
type=float, help="Maximum factor by which the siding may be longer than the main path")
74
ap.add_argument("--min-priority", dest="minPrio", metavar="INT",
75
type=int, help="Minimum edge priority value to be eligible as siding")
76
ap.add_argument("-x", "--exclude-usage", dest="excludeUsage", type=int,
77
help="Exclude all edges that are used at least INT times by loaded routes")
78
ap.add_argument("-X", "--exclude-reverse-usage", dest="excludeRevUsage", type=int, default="1",
79
help="Exclude all edges that are used at least INT times in reverse by loaded routes")
80
ap.add_argument("--reversal-penalty", dest="reversalPenalty", metavar="FLOAT", default=-1,
81
type=float, help="Set penalty for reversals, by default sidings with reversals are forbidden")
82
ap.add_argument("--use-left", action="store_true", default=False, dest="useLeft",
83
help="Drive on the left instead of on the right")
84
ap.add_argument("--add-stop-signals", action="store_true", default=False, dest="addStopSignals",
85
help="If a siding has a stop on the mainline, add signals in case they are missing")
86
ap.add_argument("-v", "--verbose", action="store_true", default=False,
87
help="tell me what you are doing")
88
options = ap.parse_args()
89
90
options.routes = options.routes.split(',')
91
if options.prefix is None:
92
options.prefix = sumolib.miscutils.getBaseName(options.outfile)
93
options.edges_file = options.prefix + ".edg.xml"
94
options.nodes_file = options.prefix + ".nod.xml"
95
options.connections_file = options.prefix + ".con.xml"
96
return options
97
98
99
def filterNoSignalidings(options, net, sidings, noSignal, stops):
100
"""keep only sidings that have a signal or that have a stop on the main track"""
101
sidings2 = {}
102
signalNodes = set()
103
for main, (rid, fromIndex, siding) in sidings.items():
104
hasNoSignal = tuple(main) in noSignal
105
if hasNoSignal and not any([e in stops for e in main]):
106
continue
107
sidings2[main] = (rid, fromIndex, siding)
108
109
if hasNoSignal:
110
last = net.getEdge(siding[-1])
111
signalNodes.add(last.getToNode())
112
first = net.getEdge(siding[0])
113
signalNodes.add(first.getFromNode())
114
115
return sidings2, signalNodes
116
117
118
def filterBidiSidings(options, net, sidings, edgeUsage):
119
"""keep only sidings where the main track is used in both directions"""
120
sidings2 = {}
121
for main, (rid, fromIndex, siding) in sidings.items():
122
for eid in main:
123
e = net.getEdge(eid)
124
b = e.getBidi()
125
if b is not None and edgeUsage.get(b.getID(), 0) > 0:
126
sidings2[main] = (rid, fromIndex, siding)
127
break
128
return sidings2
129
130
131
def getGeom(net, edges):
132
result = []
133
for eid in edges:
134
e = net.getEdge(eid)
135
result += e.getShape(True)
136
return result
137
138
139
def isSidingRight(net, main, siding):
140
geom = getGeom(net, main)
141
geom2 = getGeom(net, siding)
142
mainOffsets = [gh.polygonOffsetWithMinimumDistanceToPoint(p, geom) for p in geom2]
143
indices = [gh.indexAtShapeOffset(geom, of)[0] for of in mainOffsets]
144
mainDirs = [gh.sub(geom[i + 1], geom[i]) if i is not None else None for i in indices]
145
directions = []
146
for i, p in enumerate(geom2):
147
if indices[i] is not None:
148
directions.append(gh.crossProduct2D(mainDirs[i], gh.sub(p, geom[indices[i]])))
149
directions.sort()
150
return directions[len(directions) // 2] < 0
151
152
153
def writePatches(options, net, sidings, edgeUsage, signalNodes):
154
rTypes = dict() # eid -> routingType
155
for main, (rid, fromIndex, siding) in sidings.items():
156
if isSidingRight(net, main, siding) != options.useLeft:
157
rtMain = options.rtAvoid
158
rtSiding = options.rtUse
159
else:
160
rtMain = options.rtUse
161
rtSiding = options.rtAvoid
162
for eid in main:
163
e = net.getEdge(eid)
164
b = e.getBidi()
165
if b is not None and edgeUsage.get(b.getID(), 0) > 0:
166
rTypes[eid] = rtMain
167
for eid in siding:
168
rTypes[eid] = rtSiding
169
170
rTypes2 = dict()
171
for eid, rt in rTypes.items():
172
e = net.getEdge(eid)
173
b = e.getBidi()
174
if b is not None:
175
if b.getID() not in rTypes:
176
rTypes2[b.getID()] = rtMain if rt == rtSiding else rtSiding
177
rTypes.update(rTypes2)
178
179
with open(options.edges_file, 'w') as outf:
180
sumolib.writeXMLHeader(outf, "$Id$", "edges", schemaPath="edgediff_file.xsd", options=options)
181
for eid in sorted(rTypes.keys()):
182
routingType = rTypes[eid]
183
outf.write(' <edge id="%s" priority="%s" routingType="%s"/>\n' % (
184
eid, routingType, routingType))
185
outf.write("</edges>\n")
186
187
if signalNodes:
188
with open(options.nodes_file, 'w') as outf, open(options.connections_file, 'w') as outf2:
189
sumolib.writeXMLHeader(outf, "$Id$", "nodes", options=options)
190
sumolib.writeXMLHeader(outf2, "$Id$", "connections", options=options)
191
192
for n in sorted(signalNodes, key=lambda n: n.getID()):
193
outf.write(' <node id="%s" type="rail_signal"/>\n' % n.getID())
194
for inEdge in n.getIncoming():
195
for outEdge, conns in inEdge.getOutgoing().items():
196
for conn in conns:
197
if ((rTypes.get(inEdge.getID()) != options.rtUse and
198
rTypes.get(outEdge.getID()) != options.rtUse)):
199
outf2.write(' <connection from="%s" to="%s" fromLane="%s" toLane="%s" uncontrolled="true"/>\n' % ( # noqa
200
inEdge.getID(), outEdge.getID(),
201
conn.getFromLane().getIndex(), conn.getToLane().getIndex()))
202
outf.write("</nodes>\n")
203
outf2.write("</connections>\n")
204
205
206
def getUnique(baseID, stopIDs):
207
index = 1
208
sid = baseID + '#%s' % index
209
while sid in stopIDs:
210
index += 1
211
sid = baseID + '#%s' % index
212
stopIDs.add(sid)
213
return sid
214
215
216
def getClosest(net, stopEdge, startPos, endPos, siding):
217
e = net.getEdge(stopEdge)
218
if startPos < 0:
219
startPos += e.getLength()
220
if endPos < 0:
221
endPos += e.getLength()
222
pos = (startPos + endPos) / 2
223
halfLength = (endPos - startPos) / 2
224
x, y = gh.positionAtShapeOffset(e.getShape(True), pos)
225
bestDist = 1e400
226
bestEdge = None
227
bestPos = None
228
for sEdge in siding:
229
se = net.getEdge(sEdge)
230
sPos, dist = gh.polygonOffsetAndDistanceToPoint((x, y), se.getShape(True))
231
if dist < bestDist:
232
bestDist = dist
233
bestEdge = sEdge
234
bestPos = sPos
235
236
bestLength = net.getEdge(bestEdge).getLength()
237
return bestEdge, max(0, bestPos - halfLength), min(bestPos + halfLength, bestLength)
238
239
240
def writeStops(options, net, sidings, stopIDs, stops):
241
if options.stopfile and options.stopOutput:
242
sidingDict = defaultdict(set) # mainEdge -> [siding, ]
243
for main, (rid, fromIndex, siding) in sidings.items():
244
for e in main:
245
if e in stops:
246
sidingDict[e].add(siding)
247
248
with open(options.stopOutput, 'w') as outf:
249
sumolib.writeXMLHeader(outf, "$Id$", "additional", options=options)
250
# copy existing stops
251
for stopsOnEdge in stops.values():
252
for stop in stopsOnEdge:
253
outf.write(stop.toXML(' '))
254
255
for stopEdge in sorted(sidingDict.keys()):
256
for stop in stops[stopEdge]:
257
sidingEdges = set()
258
for siding in sidingDict[stopEdge]:
259
sid = getUnique(stop.id, stopIDs)
260
sidingEdge, startPos, endPos = getClosest(
261
net, stopEdge, float(stop.startPos), float(stop.endPos), siding)
262
if sidingEdge in sidingEdges:
263
continue
264
sidingEdges.add(sidingEdge)
265
name = ' name="%s"' % stop.name if stop.name else ''
266
lines = ' lines="%s"' % stop.lines if stop.lines else ''
267
outf.write(' <%s id="%s" lane="%s_0" startPos="%.2f" endPos="%.2f"%s%s/>\n' % (
268
stop.name, sid, sidingEdge, startPos, endPos, name, lines))
269
outf.write("</additional>\n")
270
271
272
def parseStops(options):
273
stopIDs = set()
274
stops = defaultdict(list) # stopEdge -> object
275
if options.stopfile:
276
for stop in sumolib.xml.parse(options.stopfile, ['busStop', 'trainStop']):
277
stops[lane2edge(stop.lane)].append(stop)
278
stopIDs.add(stop.id)
279
return stopIDs, stops
280
281
282
def main(options):
283
net = sumolib.net.readNet(options.netfile)
284
noSignal = None
285
signalNodes = set()
286
extraArgs = []
287
288
routes, edgeUsage = parseRoutes(options)
289
stopIDs, stops = parseStops(options)
290
# print("\n".join(map(str, routes.items())))
291
switches = findSwitches(options, routes, net)
292
# print("\n".join(map(str, switches.items())))
293
sidings, sidingRoutes = findSidings(options, routes, switches, net, edgeUsage)
294
# print("\n".join(map(str, sidings.items())))
295
296
if options.addStopSignals:
297
noSignal = set() # tuples of main-edges
298
299
sidings = filterSidings(options, net, sidings, noSignal)
300
# print("\n".join(map(str, sidings.items())))
301
sidings = filterBidiSidings(options, net, sidings, edgeUsage)
302
# print("\n".join(map(str, sidings.items())))
303
304
if options.addStopSignals:
305
sidings, signalNodes = filterNoSignalidings(options, net, sidings, noSignal, stops)
306
if signalNodes:
307
extraArgs += ['-n', options.nodes_file, '-x', options.connections_file]
308
# print("\n".join(map(str, sidings.items())))
309
310
writePatches(options, net, sidings, edgeUsage, signalNodes)
311
writeStops(options, net, sidings, stopIDs, stops)
312
313
if options.verbose:
314
print("Building new net")
315
sys.stderr.flush()
316
317
args = [NETCONVERT,
318
'-s', options.netfile,
319
'-e', options.edges_file,
320
'-o', options.outfile] + extraArgs
321
322
subprocess.call(args, stdout=subprocess.DEVNULL)
323
324
325
if __name__ == "__main__":
326
main(get_options())
327
328