Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/output/generateITetrisIntersectionMetrics.py
169674 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2009-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file generateITetrisIntersectionMetrics.py
15
# @author Daniel Krajzewicz
16
# @author Lena Kalleske
17
# @author Michael Behrisch
18
# @date 2007-10-25
19
20
from __future__ import absolute_import
21
from __future__ import print_function
22
from optparse import OptionParser
23
import os
24
import sys
25
from numpy import mean, log
26
from xml.sax import parse, handler
27
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
28
import sumolib.net # noqa
29
30
31
def getBasicStats(net, lanesInfo, T):
32
tlsInfo = {}
33
for tl in net._tlss:
34
tlID = tl._id
35
mQueueLen = []
36
# mWaitTime = []
37
nbStops = []
38
tWaitTime = []
39
seenLanes = set()
40
for conn in tl._connections:
41
lane = conn[0]
42
if lane in seenLanes:
43
continue
44
seenLanes.add(lane)
45
mQueueLenInfo = sum(lanesInfo[lane.getID()]['mQueueLen'])
46
mQueueLen.append(mQueueLenInfo)
47
# mWaitTimeInfo = mean(lanesInfo[lane.getID()]['mWaitTime'])
48
# mWaitTime.append(mWaitTimeInfo)
49
nbStopsInfo = sum(lanesInfo[lane.getID()]['nbStops'])
50
nbStops.append(nbStopsInfo)
51
tWaitTimeInfo = sum(lanesInfo[lane.getID()]['tWaitTime'])
52
tWaitTime.append(tWaitTimeInfo)
53
tlsInfo[tlID] = {}
54
tlsInfo[tlID]['mQueueLen'] = mean(mQueueLen) / T
55
tlsInfo[tlID]['mWaitTime'] = mean(tWaitTime) / T
56
tlsInfo[tlID]['nbStops'] = sum(nbStops)
57
tlsInfo[tlID]['tWaitTime'] = sum(tWaitTime)
58
return tlsInfo
59
60
61
def mergeInfos(tlsInfoAll, tlsInfoOne, metric):
62
for tl in tlsInfoOne.keys():
63
tlsInfoAll[tl][metric] = tlsInfoOne[tl]
64
65
66
def getStatisticsOutput(tlsInfo, outputfile):
67
with open(outputfile, 'w') as opfile:
68
for tl in tlsInfo.keys():
69
opfile.write('Traffic Light %s\n' % tl)
70
opfile.write('=================\n')
71
opfile.write(
72
'mean queue length in front of the junction: %s\n' % tlsInfo[tl]['mQueueLen'])
73
opfile.write(
74
'mean waiting time in front of the junction: %s\n' % tlsInfo[tl]['mWaitTime'])
75
if 'noise' in tlsInfo[tl]:
76
opfile.write('mean noise emission: %s\n' % tlsInfo[tl]['noise'])
77
if 'CO' in tlsInfo[tl]:
78
opfile.write('mean CO emission: %s\n' % tlsInfo[tl]['CO'])
79
opfile.write('mean CO2 emission: %s\n' % tlsInfo[tl]['CO2'])
80
opfile.write('mean HC emission: %s\n' % tlsInfo[tl]['HC'])
81
opfile.write('mean PMx emission: %s\n' % tlsInfo[tl]['PMx'])
82
opfile.write('mean NOx emission: %s\n' % tlsInfo[tl]['NOx'])
83
opfile.write('mean fuel consumption: %s\n' % tlsInfo[tl]['fuel'])
84
opfile.write('number of stops: %s\n' % tlsInfo[tl]['nbStops'])
85
opfile.write('total waiting time at junction: %s\n\n' %
86
tlsInfo[tl]['tWaitTime'])
87
88
89
def tlsIDToNodeID(net):
90
tlsID2NodeID = {}
91
for tls in net._tlss:
92
tlsID = tls._id
93
tlsID2NodeID[tlsID] = []
94
seenNodes = set()
95
for conn in tls._connections:
96
lane = conn[0]
97
edge = lane._edge
98
node = edge._to
99
nodeID = node._id
100
if nodeID not in seenNodes:
101
tlsID2NodeID[tlsID].append(nodeID)
102
seenNodes.add(nodeID)
103
return tlsID2NodeID
104
105
106
class E2OutputReader(handler.ContentHandler):
107
108
def __init__(self):
109
self._lanes = {}
110
self._maxT = 0
111
112
def startElement(self, name, attrs):
113
if name == 'interval':
114
detID = attrs['id']
115
laneID = detID[6:len(detID)]
116
if laneID not in self._lanes:
117
self._lanes[laneID] = {}
118
self._lanes[laneID]['mQueueLen'] = []
119
# self._lanes[laneID]['mWaitTime'] = []
120
self._lanes[laneID]['nbStops'] = []
121
self._lanes[laneID]['tWaitTime'] = []
122
if float(attrs['end']) < 100000000:
123
self._lanes[laneID]['mQueueLen'].append(
124
float(attrs['jamLengthInMetersSum']))
125
# self._lanes[laneID]['mWaitTime'].append(float(attrs['meanHaltingDuration']))
126
self._lanes[laneID]['nbStops'].append(
127
float(attrs['startedHalts']))
128
self._lanes[laneID]['tWaitTime'].append(
129
float(attrs['haltingDurationSum']))
130
self._maxT = max(float(attrs['end']), self._maxT)
131
132
133
class HarmonoiseReader(handler.ContentHandler):
134
135
def __init__(self, net, tlsID2NodeID):
136
self._nodeIntervalNoise = {}
137
self._maxT = 0
138
self._net = net
139
self._tlsNoise = {}
140
self._tlsID2NodeID = tlsID2NodeID
141
142
def startElement(self, name, attrs):
143
if name == 'interval':
144
self._maxT = max(float(attrs['end']), self._maxT)
145
if name == 'edge':
146
edgeID = attrs['id']
147
noiseStr = attrs['noise']
148
if len(noiseStr) < 10:
149
noise = float(noiseStr)
150
else:
151
noise = 0
152
if edgeID[0] == ':':
153
nodeID = edgeID[1:edgeID.find('_')]
154
if nodeID not in self._nodeIntervalNoise:
155
self._nodeIntervalNoise[nodeID] = []
156
self._nodeIntervalNoise[nodeID].append(noise)
157
else:
158
fromNodeID = net.getEdge(edgeID)._from._id
159
if fromNodeID not in self._nodeIntervalNoise:
160
self._nodeIntervalNoise[fromNodeID] = []
161
self._nodeIntervalNoise[fromNodeID].append(noise)
162
toNodeID = net.getEdge(edgeID)._to._id
163
if toNodeID not in self._nodeIntervalNoise:
164
self._nodeIntervalNoise[toNodeID] = []
165
self._nodeIntervalNoise[toNodeID].append(noise)
166
167
def endElement(self, name):
168
if name == 'interval':
169
self.sumIntervalNoise()
170
if name == 'netstats':
171
self.sumNoise()
172
173
def sumIntervalNoise(self):
174
for tls in net._tlss:
175
sum = 0
176
tlsID = tls._id
177
if tlsID not in self._tlsNoise:
178
self._tlsNoise[tlsID] = []
179
for nodeID in self._tlsID2NodeID[tlsID]:
180
for noise in self._nodeIntervalNoise[nodeID]:
181
sum = sum + pow(10, noise / 10)
182
self._tlsNoise[tlsID].append(10 * log(sum) / log(10))
183
184
def sumNoise(self):
185
for tls in net._tlss:
186
tlsID = tls._id
187
self._tlsNoise[tlsID] = sum(self._tlsNoise[tlsID]) / self._maxT
188
189
190
class HBEFAReader(handler.ContentHandler):
191
192
def __init__(self, net, tlsID2NodeID):
193
self._maxT = 0
194
self._net = net
195
self._nodeIntervalCO = {}
196
self._nodeIntervalCO2 = {}
197
self._nodeIntervalHC = {}
198
self._nodeIntervalPMx = {}
199
self._nodeIntervalNOx = {}
200
self._nodeIntervalfuel = {}
201
self._tlsCO = {}
202
self._tlsCO2 = {}
203
self._tlsHC = {}
204
self._tlsPMx = {}
205
self._tlsNOx = {}
206
self._tlsfuel = {}
207
self._tlsID2NodeID = tlsID2NodeID
208
209
def startElement(self, name, attrs):
210
if name == 'interval':
211
self._maxT = max(float(attrs['end']), self._maxT)
212
if name == 'edge':
213
edgeID = attrs['id']
214
CO = float(attrs['CO_perVeh'])
215
CO2 = float(attrs['CO2_perVeh'])
216
HC = float(attrs['HC_perVeh'])
217
PMx = float(attrs['PMx_perVeh'])
218
NOx = float(attrs['NOx_perVeh'])
219
fuel = float(attrs['fuel_perVeh'])
220
if edgeID[0] == ':':
221
nodeIDs = edgeID[1:edgeID.find('_')]
222
else:
223
fromNodeID = net.getEdge(edgeID)._from._id
224
toNodeID = net.getEdge(edgeID)._to._id
225
nodeIDs = [fromNodeID, toNodeID]
226
for nodeID in nodeIDs:
227
if nodeID not in self._nodeIntervalCO:
228
self._nodeIntervalCO[nodeID] = []
229
self._nodeIntervalCO2[nodeID] = []
230
self._nodeIntervalHC[nodeID] = []
231
self._nodeIntervalPMx[nodeID] = []
232
self._nodeIntervalNOx[nodeID] = []
233
self._nodeIntervalfuel[nodeID] = []
234
self._nodeIntervalCO[nodeID].append(CO)
235
self._nodeIntervalCO2[nodeID].append(CO2)
236
self._nodeIntervalHC[nodeID].append(HC)
237
self._nodeIntervalPMx[nodeID].append(PMx)
238
self._nodeIntervalNOx[nodeID].append(NOx)
239
self._nodeIntervalfuel[nodeID].append(fuel)
240
241
def endElement(self, name):
242
if name == 'interval':
243
self.sumInterval()
244
if name == 'netstats':
245
self.sum()
246
247
def sumInterval(self):
248
for tls in net._tlss:
249
tlsID = tls._id
250
if tlsID not in self._tlsCO:
251
self._tlsCO[tlsID] = []
252
self._tlsCO2[tlsID] = []
253
self._tlsHC[tlsID] = []
254
self._tlsPMx[tlsID] = []
255
self._tlsNOx[tlsID] = []
256
self._tlsfuel[tlsID] = []
257
sum = 0
258
for nodeID in self._tlsID2NodeID[tlsID]:
259
for v in self._nodeIntervalCO[nodeID]:
260
sum = sum + v
261
self._tlsCO[tlsID].append(sum)
262
sum = 0
263
for nodeID in self._tlsID2NodeID[tlsID]:
264
for v in self._nodeIntervalCO2[nodeID]:
265
sum = sum + v
266
self._tlsCO2[tlsID].append(sum)
267
sum = 0
268
for nodeID in self._tlsID2NodeID[tlsID]:
269
for v in self._nodeIntervalHC[nodeID]:
270
sum = sum + v
271
self._tlsHC[tlsID].append(sum)
272
sum = 0
273
for nodeID in self._tlsID2NodeID[tlsID]:
274
for v in self._nodeIntervalPMx[nodeID]:
275
sum = sum + v
276
self._tlsPMx[tlsID].append(sum)
277
sum = 0
278
for nodeID in self._tlsID2NodeID[tlsID]:
279
for v in self._nodeIntervalNOx[nodeID]:
280
sum = sum + v
281
self._tlsNOx[tlsID].append(sum)
282
sum = 0
283
for nodeID in self._tlsID2NodeID[tlsID]:
284
for v in self._nodeIntervalfuel[nodeID]:
285
sum = sum + v
286
self._tlsfuel[tlsID].append(sum)
287
288
def sum(self):
289
for tls in net._tlss:
290
tlsID = tls._id
291
self._tlsCO[tlsID] = sum(self._tlsCO[tlsID]) / self._maxT
292
self._tlsCO2[tlsID] = sum(self._tlsCO2[tlsID]) / self._maxT
293
self._tlsHC[tlsID] = sum(self._tlsHC[tlsID]) / self._maxT
294
self._tlsPMx[tlsID] = sum(self._tlsPMx[tlsID]) / self._maxT
295
self._tlsNOx[tlsID] = sum(self._tlsNOx[tlsID]) / self._maxT
296
self._tlsfuel[tlsID] = sum(self._tlsfuel[tlsID]) / self._maxT
297
298
299
# initialise
300
optParser = OptionParser()
301
optParser.add_option("-n", "--netfile", dest="netfile",
302
help="name of the netfile (f.e. 'inputs\\pasubio\\a_costa.net.xml')", metavar="<FILE>",
303
type="string")
304
optParser.add_option("-p", "--path", dest="path",
305
help="name of folder to work with (f.e. 'inputs\\a_costa\\')", metavar="<FOLDER>",
306
type="string", default="./")
307
optParser.add_option("-o", "--harmonoiseFile", dest="harmonoiseFile",
308
help="name of the harmonoise file", metavar="<FOLDER>", type="string")
309
optParser.add_option("-e", "--HBEFAFile", dest="hbefaFile",
310
help="name of the HBEFA file", metavar="<FOLDER>", type="string")
311
312
313
optParser.set_usage(
314
'\n-n inputs\\pasubio\\pasubio.net.xml -p inputs\\pasubio\\')
315
# parse options
316
(options, args) = optParser.parse_args()
317
if not options.netfile:
318
print("Missing arguments")
319
optParser.print_help()
320
exit()
321
322
netfile = options.netfile
323
e2OutputFile = os.path.join(options.path, 'e2_output.xml')
324
325
net = sumolib.net.readNet(netfile)
326
327
e2Output = E2OutputReader()
328
parse(e2OutputFile, e2Output)
329
330
tlsID2NodeID = tlsIDToNodeID(net)
331
tlsInfo = getBasicStats(net, e2Output._lanes, e2Output._maxT)
332
333
if options.harmonoiseFile:
334
harmonoiseOutput = HarmonoiseReader(net, tlsID2NodeID)
335
parse(options.harmonoiseFile, harmonoiseOutput)
336
mergeInfos(tlsInfo, harmonoiseOutput._tlsNoise, 'noise')
337
338
if options.hbefaFile:
339
hbefaOutput = HBEFAReader(net, tlsID2NodeID)
340
parse(options.hbefaFile, hbefaOutput)
341
mergeInfos(tlsInfo, hbefaOutput._tlsCO, 'CO')
342
mergeInfos(tlsInfo, hbefaOutput._tlsCO2, 'CO2')
343
mergeInfos(tlsInfo, hbefaOutput._tlsHC, 'HC')
344
mergeInfos(tlsInfo, hbefaOutput._tlsPMx, 'PMx')
345
mergeInfos(tlsInfo, hbefaOutput._tlsNOx, 'NOx')
346
mergeInfos(tlsInfo, hbefaOutput._tlsfuel, 'fuel')
347
348
getStatisticsOutput(
349
tlsInfo, os.path.join(options.path, "intersection_metrics_summary.txt"))
350
351
print('The calculation is done!')
352
353