Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/generateRerouters.py
193859 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2010-2026 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file generateRerouters.py
15
# @author Jakob Erdmann
16
# @date 2023-02-07
17
18
"""
19
This script generates rerouters for a given list of closed edges and
20
automatically finds upstream rerouter edges that permit rerouting.
21
"""
22
from __future__ import print_function
23
from __future__ import absolute_import
24
import os
25
import sys
26
27
if 'SUMO_HOME' in os.environ:
28
sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
29
import sumolib # noqa
30
31
32
def get_options(args=None):
    """Parse the command line and collect the ids of the edges to close.

    The ids given via --closed-edges (comma-separated) and the "edge:" entries
    of the selection file given via --closed-edges.input-file are merged into
    the list options.closedEdges.
    Prints the help text and exits with status 1 if the net file or both
    sources of closed edges are missing.

    args: optional argument list handed to the parser
    returns: the parsed options object
    """
    parser = sumolib.options.ArgumentParser(description="Generate rerouter definition for closed edges",)
    parser.add_option("-n", "--net-file", category="input", dest="netfile", type=parser.net_file,
                      help="define the net file (mandatory)", required=True)
    parser.add_option("-o", "--output-file", category="output", dest="outfile", type=parser.additional_file,
                      help="define the output rerouter filename", default="rerouters.xml")
    parser.add_option("-x", "--closed-edges", category="input", dest="closedEdges", type=parser.edge_list,
                      help="provide a comma-separated list of edges to close")
    parser.add_option("-f", "--closed-edges.input-file", category="input", dest="closedEdgesFile",
                      type=parser.file,
                      help="provide a selection file with edges to close")
    parser.add_option("-i", "--id-prefix", category="processing", dest="idPrefix", default="rr",
                      help="id prefix for generated rerouters")
    parser.add_option("--vclass", category="processing", default="passenger",
                      help="only consider necessary detours for the given vehicle class (default passenger)")
    parser.add_option("--allow", category="processing", default="authority",
                      help="vClasses that shall be permitted on the closed edge")
    parser.add_option("--disallow", category="processing",
                      help="vClasses that shall be prohibited on the closed edge")
    parser.add_option("-b", "--begin", category="time", default=0, type=float,
                      help="begin time for the closing")
    parser.add_option("-e", "--end", category="time", default=86400, type=float,
                      help="end time for the closing (default 86400)")
    options = parser.parse_args(args=args)

    # a net file and at least one source of closed edges are needed
    hasClosedEdges = options.closedEdges or options.closedEdgesFile
    if not (options.netfile and hasClosedEdges):
        parser.print_help()
        sys.exit(1)

    # turn the comma-separated option value into a list of edge ids
    if options.closedEdges:
        options.closedEdges = options.closedEdges.split(',')
    else:
        options.closedEdges = []

    # add the edges listed in the selection file (lines of the form "edge:<id>")
    if options.closedEdgesFile:
        prefix = "edge:"
        with sumolib.miscutils.openz(options.closedEdgesFile, "r", encoding="utf-8") as f:
            for rawLine in f:
                entry = rawLine.strip()
                if entry.startswith(prefix):
                    options.closedEdges.append(entry[len(prefix):])

    return options
69
70
71
def findNotifcationEdges(options, net, closedEdges):
    """Find upstream edges from which the closed edges can still be bypassed.

    The closed edges are first made impassable for options.vclass by removing
    that class from the permissions of all their lanes (this modifies the
    given edge objects in place). Afterwards the edges upstream of the closure
    are searched breadth-first: an edge is selected whenever it can still
    reach an edge downstream of the closure that is not yet covered by a
    previously selected edge.

    options: parsed options, only options.vclass is read
    net: network object providing getReachable(edge, vclass, cache=...)
    closedEdges: list of edge objects that shall be closed
    returns: set of edge objects on which rerouting is possible (may be empty)
    """
    # function-scope import keeps the block self-contained
    from collections import deque

    vclass = options.vclass
    result = set()

    # close edges in the network
    for e in closedEdges:
        for lane in e.getLanes():
            p = set(lane.getPermissions())
            # discard instead of remove: a lane that never permitted the
            # vClass (e.g. a sidewalk) must not abort with a KeyError
            p.discard(vclass)
            lane.setPermissions(p)

    # everything that is still reachable from directly behind the closure
    reachable = set()
    for e in closedEdges:
        for succ in e.getOutgoing().keys():
            if succ.allows(vclass):
                reachable.update(net.getReachable(succ, vclass))

    # the search starts at the direct predecessors of the closed edges
    upstream = deque()
    for e in closedEdges:
        for pred in e.getIncoming().keys():
            if pred.allows(vclass):
                upstream.append(pred)

    seen = set()
    cache = {}
    # stop as soon as every downstream edge is covered by a selected edge
    while upstream and reachable:
        cand = upstream.popleft()
        if cand in seen:
            continue
        seen.add(cand)
        reachable2 = net.getReachable(cand, vclass, cache=cache)
        found = reachable2.intersection(reachable)
        if found:
            result.add(cand)
            reachable.difference_update(found)
        # continue further upstream regardless of whether cand was selected
        for pred in cand.getIncoming().keys():
            if pred.allows(vclass) and pred not in seen:
                upstream.append(pred)

    return result
111
112
113
def main(options):
    """Write a rerouter definition that closes the edges in options.closedEdges.

    Loads the network from options.netfile, resolves the closed edge ids and
    writes a single rerouter with one closing interval to options.outfile.
    The rerouter is placed on the edges returned by findNotifcationEdges or,
    if there are none, on the closed edges themselves.
    Exits with status 1 if an edge id is unknown or no edge is to be closed.

    options: the options object returned by get_options
    """
    net = sumolib.net.readNet(options.netfile)

    # resolve the ids, aborting at the first one that is not in the network
    closedEdges = []
    for edgeID in options.closedEdges:
        if net.hasEdge(edgeID):
            closedEdges.append(net.getEdge(edgeID))
            continue
        print("Error: Unknown closed edge '%s'" % edgeID, file=sys.stderr)
        sys.exit(1)

    if len(closedEdges) == 0:
        print("Error: found no edges to close.", file=sys.stderr)
        sys.exit(1)

    # a given disallow list takes precedence over the allow list
    if options.disallow is not None:
        permissionAttr = ' disallow="%s"' % options.disallow
    elif options.allow != "":
        permissionAttr = ' allow="%s"' % options.allow
    else:
        permissionAttr = ""

    with open(options.outfile, 'w') as outf:
        sumolib.writeXMLHeader(outf, "$Id$", "additional", options=options)

        rerouterEdges = findNotifcationEdges(options, net, closedEdges)
        if not rerouterEdges:
            print("Warning: No detours found. Rerouter will only close edges.", file=sys.stderr)
            rerouterEdges = closedEdges
        # sorted ids give a stable attribute value
        edgeList = ' '.join(sorted(edge.getID() for edge in rerouterEdges))

        outf.write(' <rerouter id="%s" edges="%s">\n' % (options.idPrefix, edgeList))
        outf.write(' <interval begin="%s" end="%s">\n' % (options.begin, options.end))
        for edge in closedEdges:
            outf.write(' <closingReroute id="%s"%s/>\n' % (edge.getID(), permissionAttr))

        outf.write(' </interval>\n')
        outf.write(' </rerouter>\n')
        outf.write('</additional>\n')
152
153
154
if __name__ == "__main__":
    # script entry point: parse the command line, then write the rerouter file
    parsedOptions = get_options()
    main(parsedOptions)
156
157