Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/net/netcheck.py
169673 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2007-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file netcheck.py
15
# @author Michael Behrisch
16
# @author Daniel Krajzewicz
17
# @author Laura Bieker
18
# @author Jakob Erdmann
19
# @author Greg Albiston
20
# @date 2007-03-20
21
22
"""
23
This script performs checks for the network.
24
It needs one parameter, the SUMO net (.net.xml).
25
26
- if either option --source or --destination is given, it checks reachability
27
- if option --right-of-way is set, it checks for problems with right of way rules
28
- if option --short-tls-edges is set, a selection file for short edges (< 15m)
29
incoming to a traffic light is written (see #16014)
30
- by default it tests whether the network is (weakly) connected.
31
"""
32
from __future__ import absolute_import
33
from __future__ import print_function
34
import os
35
import sys
36
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
37
sys.path.append(os.path.join(THIS_DIR, '..'))
38
from sumolib.options import ArgumentParser # noqa
39
import sumolib.net # noqa
40
41
42
def parse_args():
    """Declare the command line options of netcheck and parse them.

    Returns the options object produced by ``ArgumentParser.parse_args()``.
    Besides the attributes named after the long options, the right-of-way
    switch is stored as ``checkRightOfWay`` and the short-edge switch as
    ``shortTlsEdges`` (see the ``dest`` arguments below).
    """
    op = ArgumentParser()
    op.add_argument("net", category="input", type=op.net_file,
                    help="The network file to be checked")
    # --source / --destination default to False so that the dispatch in the
    # main script can simply test them for truthiness.
    op.add_argument("-s", "--source", category="input", default=False, type=op.edge_list,
                    help="List edges reachable from the source")
    op.add_argument("-d", "--destination", category="input", type=op.edge, default=False,
                    help="List edges which can reach the destination")
    op.add_argument("-w", "--right-of-way", action="store_true", default=False,
                    dest="checkRightOfWay",
                    help="Check for problems with right-of-way rules")
    # The help text used to be a verbatim copy of the --right-of-way help;
    # it now describes what the option actually does.
    op.add_argument("--short-tls-edges", action="store_true", default=False,
                    dest="shortTlsEdges",
                    help="Find short edges (< 15m) incoming to a traffic light")
    op.add_argument("-o", "--selection-output", category="output", type=op.file,
                    help="Write output to file(s) as a loadable selection")
    op.add_argument("--ignore-connections", action="store_true", default=False,
                    help="Assume full connectivity at each node when computing all connected components")
    op.add_argument("-l", "--vclass", help="Include only edges allowing vClass")
    op.add_argument("--component-output", type=op.file, default=None,
                    help=("Write components of disconnected network to file - not compatible " +
                          "with --source or --destination options"))
    op.add_argument("-r", "--results-output", type=op.file, default=None,
                    help=("Write results summary of disconnected network to file - not compatible " +
                          "with --source or --destination options"))
    op.add_argument("-t", "--print-types", action="store_true", default=False,
                    help="Print edge types used in the component")

    return op.parse_args()
72
73
74
def getWeaklyConnected(net, vclass=None, ignore_connections=False):
    """Return the weakly connected components of the network as lists of edge ids.

    Starting from an arbitrary unvisited edge, the network is explored
    breadth-first in both directions (outgoing and incoming neighbours).

    net -- object providing ``getEdges()``
    vclass -- if given, only edges for which ``edge.allows(vclass)`` is true
        are put into a component. Edges that do not allow the class are
        still traversed, so they keep their neighbours linked together.
    ignore_connections -- if true, every edge touching the from- or to-node
        of an edge counts as its neighbour; otherwise only the entries of
        ``edge.getOutgoing()`` and ``edge.getIncoming()`` do.

    Returns a list of components, each one a sorted list of edge ids.
    Components that end up without any edge (all filtered by vclass) are
    omitted. The order of the components is not defined.
    """
    # Imported here so the function stays self-contained; a deque gives O(1)
    # removal from the front, whereas list.pop(0) shifts the whole list.
    from collections import deque

    components = []
    edgesLeft = set(net.getEdges())
    queue = deque()
    while edgesLeft:
        component = set()
        queue.append(edgesLeft.pop())
        while queue:
            edge = queue.popleft()
            if vclass is None or edge.allows(vclass):
                component.add(edge.getID())
            if ignore_connections:
                fromNode = edge.getFromNode()
                toNode = edge.getToNode()
                neighbourGroups = (fromNode.getOutgoing(), fromNode.getIncoming(),
                                   toNode.getOutgoing(), toNode.getIncoming())
            else:
                neighbourGroups = (edge.getOutgoing(), edge.getIncoming())
            for group in neighbourGroups:
                for n in group:
                    # Taking the edge out of edgesLeft when it is queued
                    # guarantees that every edge is visited exactly once.
                    if n in edgesLeft:
                        edgesLeft.remove(n)
                        queue.append(n)
        if component:
            components.append(sorted(component))
    return components
105
106
107
def getReachable(net, source_id, options, useIncoming=False):
    """Report the edges found by ``net.getReachable`` for the edge source_id.

    net -- network object providing hasEdge, getEdge, getEdges and getReachable
    source_id -- id of the edge the search starts from
    options -- object with the attributes ``vclass`` and ``selection_output``
    useIncoming -- passed through to ``net.getReachable``; it also selects
        the wording of the printed headline ("can reach" instead of
        "are reachable from")

    A headline with the number of edges found is always printed. The sorted
    edge ids are written as ``edge:<id>`` lines to ``options.selection_output``
    if that is set, and printed as a list otherwise.

    The script is terminated through ``sys.exit`` when source_id is unknown or
    when a RuntimeError is raised while searching or writing the result.
    """
    if not net.hasEdge(source_id):
        sys.exit("'{}' is not a valid edge id".format(source_id))
    start = net.getEdge(source_id)
    try:
        reached = net.getReachable(start, options.vclass, useIncoming)
        headline = ("{} of {} edges can reach edge '{}':" if useIncoming
                    else "{} of {} edges are reachable from edge '{}':")
        print(headline.format(len(reached), len(net.getEdges()), source_id))

        edgeIDs = sorted(edge.getID() for edge in reached)
        if not options.selection_output:
            print(edgeIDs)
        else:
            with open(options.selection_output, 'w') as outf:
                outf.writelines("edge:{}\n".format(edgeID) for edgeID in edgeIDs)

    except RuntimeError as err:
        # exit with the error message instead of a traceback
        sys.exit(err)
130
131
132
def checkRightOfWay(net, options):
    """Report acceleration lanes with an unexpected state on an incoming connection.

    A lane is reported if ``lane.isAccelerationLane()`` is true and at least
    one of its incoming connections starts on a normal lane
    (``getFromLane().isNormal()``) and has a state other than "M".
    NOTE(review): "M" presumably denotes a connection that has the right of
    way - confirm against the sumolib documentation.

    net -- network object; edges are obtained with ``net.getEdges(False)``
    options -- object with the attribute ``selection_output``

    If ``options.selection_output`` is set, the lanes are written to that
    file as ``lane:<id>`` lines (the file is created even if nothing was
    found). Otherwise a summary line and the lane ids are printed, but only
    if at least one lane was found.
    """
    lanes = []
    for edge in net.getEdges(False):
        for lane in edge.getLanes():
            if not lane.isAccelerationLane():
                continue
            # any() stops at the first offending connection, so a lane with
            # several of them is reported (and counted) only once.
            if any(c.getFromLane().isNormal() and c.getState() != "M"
                   for c in lane.getIncomingConnections()):
                lanes.append(lane)

    if options.selection_output:
        with open(options.selection_output, 'w') as f:
            for lane in lanes:
                f.write("lane:%s\n" % lane.getID())
    elif lanes:
        print("Found %s acceleration lanes with invalid right-of-way on the incoming connection" % len(lanes))
        print('\n'.join([lane.getID() for lane in lanes]))
149
150
151
def checkShortTLSEdges(net, options, lengthThreshold=15):
    """Report short edges that end at a traffic light.

    An edge is reported if ``edge.getLength()`` is strictly below
    lengthThreshold and the type of its to-node is "traffic_light".

    net -- network object; edges are obtained with ``net.getEdges(False)``
    options -- object with the attribute ``selection_output``
    lengthThreshold -- length limit; the default of 15 is the value that
        was previously hard-coded in this function

    If ``options.selection_output`` is set, the edges are written to that
    file as ``edge:<id>`` lines (the file is created even if nothing was
    found). Otherwise the edge ids followed by a summary line are printed,
    and nothing at all is printed when no edge matches.
    """
    short = [edge for edge in net.getEdges(False)
             if edge.getLength() < lengthThreshold and edge.getToNode().getType() == "traffic_light"]

    if options.selection_output:
        with open(options.selection_output, 'w') as f:
            for e in short:
                f.write("edge:%s\n" % e.getID())
    elif short:
        # Guarding both prints avoids emitting a lone blank line when the
        # list of short edges is empty.
        print('\n'.join([e.getID() for e in short]))
        print("Found %s edges with length below %sm ahead of traffic lights" % (
            len(short), lengthThreshold))
167
168
169
if __name__ == "__main__":
    # Script entry point: read the network and run exactly one of the checks.
    # The first option that is set wins, in the order --source, --destination,
    # --right-of-way, --short-tls-edges; without any of them the connected
    # components of the network are computed and summarized.
    options = parse_args()
    net = sumolib.net.readNet(options.net,
                              withInternal=(options.vclass == "pedestrian"),
                              withPedestrianConnections=(options.vclass == "pedestrian"))
    if options.source:
        getReachable(net, options.source, options)
    elif options.destination:
        getReachable(net, options.destination, options, True)
    elif options.checkRightOfWay:
        checkRightOfWay(net, options)
    elif options.shortTlsEdges:
        checkShortTLSEdges(net, options)
    else:
        components = getWeaklyConnected(
            net, options.vclass, options.ignore_connections)
        if len(components) != 1:
            print("Warning! Net is not connected.")

        edgeTotal = 0
        largestCount = 0
        largestIdx = ""
        # maps the size of a component (number of edges) to the number of
        # components having that size
        sizeHistogram = {}
        componentReports = []

        # Components are numbered in the order of their first edge id
        ordered = sorted(components, key=lambda c: next(iter(c)))
        for idx, comp in enumerate(ordered):
            if options.selection_output:
                # one selection file per component
                with open("{}comp{}.txt".format(options.selection_output, idx), 'w') as f:
                    f.writelines("edge:{}\n".format(e) for e in comp)
            types = set()
            if options.print_types:
                for e in comp:
                    types.add(net.getEdge(e).getType())
                    # stop collecting once more than 10 types were seen
                    if len(types) > 10:
                        break

            size = len(comp)
            edgeTotal += size
            if size > largestCount:
                largestCount, largestIdx = size, idx
            sizeHistogram[size] = sizeHistogram.get(size, 0) + 1

            report = "Component: #{} Edge Count: {}\n {}\n".format(
                idx, size, " ".join(comp))
            if types:
                report += "Type(s): {}\n".format(" ".join(sorted(types)))
            print(report)
            componentReports.append(report)

        # Summary of all edges checked and of the largest component; the
        # guard avoids a division by zero when no edge was counted.
        coverage = round(largestCount * 100.0 / edgeTotal, 2) if edgeTotal > 0 else 0.0
        summary_str = "Total Edges: {}\nLargest Component: #{} Edge Count: {} Coverage: {}%\n".format(
            edgeTotal, largestIdx, largestCount, coverage)
        print(summary_str)

        # Distribution of components by edge count, smallest size first
        distLines = ["Edges\tIncidence"]
        distLines.extend("{}\t{}".format(size, count)
                         for size, count in sorted(sizeHistogram.items()))
        for line in distLines:
            print(line)

        # Optional output of the component listing
        if options.component_output is not None:
            print("Writing component output to: {}".format(
                options.component_output))
            with open(options.component_output, 'w') as f:
                f.write("\n".join(componentReports))

        # Optional output of the summary and the distribution
        if options.results_output is not None:
            print(
                "Writing results output to: {}".format(options.results_output))
            with open(options.results_output, 'w') as r:
                r.write(summary_str)
                r.write("\n".join(distLines))
258
259