Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/output/computeCoordination.py
169674 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2007-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file computeCoordination.py
15
# @author Daniel Wesemeyer
16
# @author Jakob Erdmann
17
# @date 2018-08-18
18
19
"""
20
This script analyses fcd output to compute the coordination factor for a given corridor
21
The coordination factor is defined as the fraction of vehicles that passed the
22
corridor without stopping to the total number of vehicles on the corridor
23
"""
24
from __future__ import absolute_import
25
from __future__ import print_function
26
import os
27
import sys
28
from collections import defaultdict
29
import math # noqa
30
31
if 'SUMO_HOME' in os.environ:
32
sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
33
import sumolib # noqa
34
from sumolib.xml import parse_fast_nested # noqa
35
from sumolib.miscutils import parseTime # noqa
36
37
38
def getOptions(args=None):
39
argParser = sumolib.options.ArgumentParser()
40
argParser.add_argument("-f", "--fcd-file", dest="fcdfile",
41
help="Input fcd file (mandatory)")
42
argParser.add_argument("-m", "--min-speed", dest="minspeed", type=float, default=5,
43
help="Minimum speed to consider vehicle undelayed")
44
argParser.add_argument("--filter-route", dest="filterRoute",
45
help="only consider vehicles that passed the given list of edges in order " +
46
"(regardless of gaps)")
47
argParser.add_argument("--entry",
48
help="detect delay after vehicles have passend one of the entry edges " +
49
"(defaults to the first edge of the route)")
50
argParser.add_argument("--exit",
51
help="detect delay until vehicles have passend one of the exit edges " +
52
"(defaults to the last edge of the route)")
53
argParser.add_argument("--full-output", dest="fullOutput",
54
help="For each vehicle that applies, write the time when it entered the corridor " +
55
"and the time when it was first delayed (-1 for undelayed)")
56
options = argParser.parse_args()
57
58
if options.fcdfile is None:
59
sys.exit("mandatory argument FCD_FILE missing")
60
61
if options.filterRoute is not None:
62
options.filterRoute = options.filterRoute.split(',')
63
else:
64
options.filterRoute = []
65
66
if options.entry is not None:
67
options.entry = options.entry.split(',')
68
elif options.filterRoute:
69
options.entry = [options.filterRoute[0]]
70
71
if options.exit is not None:
72
options.exit = options.exit.split(',')
73
elif options.filterRoute:
74
options.exit = [options.filterRoute[-1]]
75
76
return options
77
78
79
def main(options):
80
81
routes = defaultdict(list) # vehID -> recorded edges
82
minSpeed = defaultdict(lambda: 1000)
83
active = set() # vehicles that have passed the first filterRoute edge
84
entryTime = {} # vehID -> time when entering corridor
85
delayTime = {} # vehID -> time when vehicle speed first dropped below threshold
86
for timestep, vehicle in parse_fast_nested(options.fcdfile, 'timestep', ['time'],
87
'vehicle', ['id', 'speed', 'lane']):
88
time = parseTime(timestep.time)
89
vehID = vehicle.id
90
edge = vehicle.lane[0:vehicle.lane.rfind('_')]
91
prevEdge = None if len(routes[vehID]) == 0 else routes[vehID][-1]
92
if prevEdge != edge:
93
if options.exit and prevEdge in options.exit:
94
# vehicle has left the filtered corridor
95
continue
96
routes[vehID].append(edge)
97
if vehID not in active:
98
if not options.entry or edge in options.entry:
99
# vehicle has entered the filtered corridor
100
active.add(vehID)
101
entryTime[vehID] = time
102
else:
103
continue
104
speed = float(vehicle.speed)
105
if speed < minSpeed[vehID]:
106
minSpeed[vehID] = speed
107
if speed < options.minspeed:
108
delayTime[vehID] = time
109
110
vehs = []
111
numDelayed = 0
112
113
for vehID, route in routes.items():
114
skip = False
115
for required in options.filterRoute:
116
if required not in route:
117
skip = True
118
break
119
if not skip:
120
if minSpeed[vehID] < options.minspeed:
121
numDelayed += 1
122
vehs.append((entryTime[vehID], delayTime[vehID], vehID))
123
else:
124
vehs.append((entryTime[vehID], -1, vehID))
125
126
vehs.sort()
127
n = len(vehs)
128
if n > 0:
129
print("n=%s d=%s coordinationFactor=%.2f" % (n, numDelayed, (n - numDelayed) / float(n)))
130
else:
131
print("n=0")
132
133
if options.fullOutput:
134
with open(options.fullOutput, 'w') as outf:
135
outf.write("# entryTime delayTime vehID\n")
136
for record in vehs:
137
outf.write(" ".join(map(str, record)) + "\n")
138
139
140
if __name__ == "__main__":
141
main(getOptions())
142
143