Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/purgatory/elements.py
169674 views
1
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
2
# Copyright (C) 2007-2025 German Aerospace Center (DLR) and others.
3
# This program and the accompanying materials are made available under the
4
# terms of the Eclipse Public License 2.0 which is available at
5
# https://www.eclipse.org/legal/epl-2.0/
6
# This Source Code may also be made available under the following Secondary
7
# Licenses when the conditions for such availability set forth in the Eclipse
8
# Public License 2.0 are satisfied: GNU General Public License, version 2
9
# or later which is available at
10
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
11
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
12
13
# @file elements.py
14
# @author Yun-Pang Floetteroed
15
# @author Daniel Krajzewicz
16
# @author Michael Behrisch
17
# @date 2007-10-25
18
19
"""
20
This script is to define the classes and functions for
21
- reading network geometric,
22
- calculating link characteristics, such as capacity, travel time and link cost function,
23
- recording vehicular and path information, and
24
- conducting statistic tests.
25
"""
26
from __future__ import absolute_import
27
from __future__ import print_function
28
29
import sys
30
import math
31
from tables import crCurveTable, laneTypeTable
32
import sumolib
33
34
# This class is used for finding the k shortest paths.
35
36
37
class Predecessor:
38
39
def __init__(self, edge, pred, distance):
40
self.edge = edge
41
self.pred = pred
42
self.distance = distance
43
44
# This class is used to build the nodes in the investigated network and
45
# includes the update-function for searching the k shortest paths.
46
47
48
class Vertex(sumolib.net.node.Node):
49
50
"""
51
This class is to store node attributes and the respective incoming/outgoing links.
52
"""
53
54
def __init__(self, id, type=None, coord=None, incLanes=None):
55
sumolib.net.node.Node.__init__(self, id, type, coord, incLanes)
56
self.preds = []
57
self.wasUpdated = False
58
59
def __repr__(self):
60
return self._id
61
62
def _addNewPredecessor(self, edge, updatePred, newPreds):
63
for pred in newPreds:
64
if pred.pred == updatePred:
65
return
66
pred = updatePred
67
while pred.edge is not None:
68
if pred.edge == edge:
69
return
70
pred = pred.pred
71
newPreds.append(Predecessor(edge, updatePred,
72
updatePred.distance + edge.actualtime))
73
74
def update(self, KPaths, edge):
75
updatePreds = edge._from.preds
76
if len(self.preds) == KPaths\
77
and updatePreds[0].distance + edge.actualtime >= self.preds[KPaths - 1].distance:
78
return False
79
newPreds = []
80
updateIndex = 0
81
predIndex = 0
82
while len(newPreds) < KPaths\
83
and (updateIndex < len(updatePreds) or
84
predIndex < len(self.preds)):
85
if predIndex == len(self.preds):
86
self._addNewPredecessor(
87
edge, updatePreds[updateIndex], newPreds)
88
updateIndex += 1
89
elif updateIndex == len(updatePreds):
90
newPreds.append(self.preds[predIndex])
91
predIndex += 1
92
elif updatePreds[updateIndex].distance + edge.actualtime < self.preds[predIndex].distance:
93
self._addNewPredecessor(
94
edge, updatePreds[updateIndex], newPreds)
95
updateIndex += 1
96
else:
97
newPreds.append(self.preds[predIndex])
98
predIndex += 1
99
if predIndex == len(newPreds):
100
return False
101
self.preds = newPreds
102
returnVal = not self.wasUpdated
103
self.wasUpdated = True
104
return returnVal
105
106
# This class is used to store link information and estimate
107
# as well as flow and capacity for the flow computation and some parameters
108
# read from the net.
109
110
111
class Edge(sumolib.net.edge.Edge):
112
113
"""
114
This class is to record link attributes
115
"""
116
117
def __init__(self, label, source, target, prio, function, name):
118
sumolib.net.edge.Edge.__init__(
119
self, label, source, target, prio, function, name)
120
self.capacity = sys.maxsize
121
# parameter for estimating capacities according to signal timing plans
122
self.junction = None
123
self.junctiontype = None
124
self.rightturn = None
125
self.straight = None
126
self.leftturn = None
127
self.uturn = None
128
self.leftlink = []
129
self.straightlink = []
130
self.rightlink = []
131
self.conflictlink = {}
132
# self.againstlinkexist = None
133
self.flow = 0.0
134
self.helpflow = 0.0
135
self.freeflowtime = 0.0
136
self.queuetime = 0.0
137
self.estcapacity = 0.0
138
self.CRcurve = None
139
self.actualtime = 0.0
140
self.ratio = 0.0
141
self.connection = 0
142
self.edgetype = None
143
# parameter in the Lohse traffic assignment
144
self.helpacttime = 0.
145
# parameter in the Lohse traffic assignment
146
self.fTT = 0.
147
# parameter in the Lohse traffic assignment
148
self.TT = 0.
149
# parameter in the Lohse traffic assignment
150
self.delta = 0.
151
# parameter in the Lohse traffic assignment
152
self.helpacttimeEx = 0.
153
# parameter in the matrix estimation
154
self.detected = False
155
self.detectorNum = 0.
156
self.detecteddata = {}
157
self.detectedlanes = 0.
158
self.penalty = 0.
159
self.capLeft = 0.
160
self.capRight = 0.
161
self.capThrough = 0.
162
163
def addLane(self, lane):
164
sumolib.net.edge.Edge.addLane(self, lane)
165
if self._from._id == self._to._id:
166
self.freeflowtime = 0.0
167
else:
168
self.freeflowtime = self._length / self._speed
169
self.actualtime = self.freeflowtime
170
self.helpacttime = self.freeflowtime
171
172
def __repr__(self):
173
cap = str(self.capacity)
174
if self.capacity == sys.maxsize or self.connection != 0:
175
cap = "inf"
176
return "%s_%s_%s_%s<%s|%s|%s|%s|%s|%s|%s|%s|%s>" % (self._function, self._id, self._from, self._to,
177
self.junctiontype, self._speed,
178
self.flow, self._length, self._lanes,
179
self.CRcurve, self.estcapacity, cap, self.ratio)
180
181
def getConflictLink(self):
182
"""
183
method to get the conflict links for each link, when the respective left-turn behavior exists.
184
"""
185
if self._function == 'real' and len(self.leftlink) > 0:
186
for leftEdge in self.leftlink:
187
affectedTurning = None
188
for edge in leftEdge.source.inEdges:
189
if edge.source == self.target:
190
affectedTurning = edge
191
affectedTurning.freeflowtime = 6.
192
affectedTurning.actualtime = affectedTurning.freeflowtime
193
affectedTurning.helpacttime = affectedTurning.freeflowtime
194
195
for edge in leftEdge.source.inEdges:
196
for upstreamlink in edge.source.inEdges:
197
if leftEdge in upstreamlink.rightlink and len(upstreamlink.straightlink) > 0:
198
if upstreamlink not in self.conflictlink:
199
self.conflictlink[upstreamlink] = []
200
self.conflictlink[upstreamlink].append(
201
affectedTurning)
202
203
def getFreeFlowTravelTime(self):
204
return self.freeflowtime
205
206
def addDetectedData(self, detecteddataObj):
207
self.detecteddata[detecteddataObj.label] = detecteddataObj
208
209
def getCapacity(self):
210
"""
211
method to read the link capacity and the cr-curve type from the table.py
212
the applied CR-curve database is retrived from VISUM-Validate-network und VISUM-Koeln-network
213
"""
214
typeList = laneTypeTable[min(len(self._lanes), 4)]
215
for laneType in typeList:
216
if laneType[0] >= self._speed:
217
break
218
219
self.estcapacity = len(self._lanes) * laneType[1]
220
self.CRcurve = laneType[2]
221
222
def getAdjustedCapacity(self, net):
223
"""
224
method to adjust the link capacity based on the given signal timing plans
225
"""
226
straightGreen = 0.
227
rightGreen = 0.
228
leftGreen = 0.
229
greentime = 0.
230
straightSymbol = -1
231
rightSymbol = -1
232
leftSymbol = -1
233
cyclelength = 0.
234
count = 0
235
if self.junctiontype == 'signalized':
236
junction = net._junctions[self.junction]
237
if self.rightturn is not None and self.rightturn != 'O' and self.rightturn != 'o':
238
rightSymbol = int(self.rightturn)
239
if self.leftturn is not None and self.leftturn != 'O' and self.leftturn != 'o':
240
leftSymbol = int(self.leftturn)
241
if self.straight is not None and self.straight != 'O' and self.straight != 'o':
242
straightSymbol = int(self.straight)
243
for phase in junction.phases[:]:
244
count += 1
245
cyclelength += phase.duration
246
if straightSymbol != -1 and phase.green[straightSymbol] == "1":
247
straightGreen += phase.duration
248
if rightSymbol != -1 and phase.green[rightSymbol] == "1":
249
rightGreen += phase.duration
250
if leftSymbol != -1 and phase.green[leftSymbol] == "1":
251
leftGreen += phase.duration
252
if self.straight is not None:
253
self.estcapacity = (
254
straightGreen * (3600. / cyclelength)) / 1.5 * len(self._lanes)
255
else:
256
greentime = max(rightGreen, leftGreen)
257
self.estcapacity = (
258
greentime * (3600. / cyclelength)) / 1.5 * len(self._lanes)
259
260
def getActualTravelTime(self, options, lohse):
261
"""
262
method to calculate/update link travel time
263
"""
264
foutcheck = open('queue_info.txt', 'a')
265
266
if self.CRcurve in crCurveTable:
267
curve = crCurveTable[self.CRcurve]
268
if self.flow == 0.0 or self.connection > 0:
269
self.actualtime = self.freeflowtime
270
elif self.estcapacity != 0. and self.connection == 0:
271
self.actualtime = self.freeflowtime * \
272
(1 + (curve[0] * (self.flow /
273
(self.estcapacity * curve[2]))**curve[1]))
274
275
if ((self.flow > self.estcapacity or self.flow == self.estcapacity) and
276
self.flow > 0. and self.connection == 0):
277
self.queuetime = self.queuetime + options.lamda * \
278
(self.actualtime - self.freeflowtime * (1 + curve[0]))
279
if self.queuetime < 1.:
280
self.queuetime = 0.
281
else:
282
foutcheck.write(
283
'edge.label= %s: queuing time= %s.\n' % (self._id, self.queuetime))
284
foutcheck.write('travel time at capacity: %s; actual travel time: %s.\n' % (
285
self.freeflowtime * (1 + curve[0]), self.actualtime))
286
else:
287
self.queuetime = 0.
288
289
self.actualtime += self.queuetime
290
291
if lohse:
292
self.getLohseParUpdate(options)
293
else:
294
self.helpacttime = self.actualtime
295
296
self.penalty = 0.
297
if len(self.conflictlink) > 0:
298
for edge in self.conflictlink:
299
conflictEdge = edge
300
flowCapRatio = conflictEdge.flow / conflictEdge.estcapacity
301
302
weightFactor = 1.0
303
if self.numberlane == 2.:
304
weightFactor = 0.85
305
elif self.numberlane == 3.:
306
weightFactor = 0.75
307
elif self.numberlane > 3.:
308
weightFactor = 0.6
309
if options.dijkstra != 'extend':
310
for edge in self.conflictlink:
311
penalty = 0.
312
if edge.estcapacity > 0. and edge.flow / edge.estcapacity > 0.12:
313
penalty = weightFactor * \
314
(math.exp(self.flow / self.estcapacity) - 1. +
315
math.exp(edge.flow / edge.estcapacity) - 1.)
316
for affectedTurning in self.conflictlink[edge]:
317
affectedTurning.actualtime = self.actualtime * \
318
penalty
319
if lohse:
320
affectedTurning.helpacttime = self.helpacttime * \
321
penalty
322
else:
323
affectedTurning.helpacttime = affectedTurning.actualtime
324
else:
325
for edge in self.conflictlink:
326
if edge.estcapacity > 0. and edge.flow / edge.estcapacity >= flowCapRatio:
327
conflictEdge = edge
328
flowCapRatio = edge.flow / edge.estcapacity
329
330
if conflictEdge.estcapacity > 0. and conflictEdge.flow / conflictEdge.estcapacity > 0.12:
331
self.penalty = weightFactor * \
332
(math.exp(self.flow / self.estcapacity) - 1. +
333
math.exp(conflictEdge.flow / conflictEdge.estcapacity) - 1.)
334
if lohse:
335
self.penalty *= self.helpacttime
336
else:
337
self.penalty *= self.actualtime
338
foutcheck.close()
339
340
def cleanFlow(self):
341
""" method to reset link flows """
342
self.flow = 0.
343
self.helpflow = 0.
344
345
def getLohseParUpdate(self, options):
346
"""
347
method to update the parameter used in the Lohse-assignment (learning method - Lernverfahren)
348
"""
349
if self.helpacttime > 0.:
350
self.TT = abs(self.actualtime - self.helpacttime) / \
351
self.helpacttime
352
self.fTT = options.v1 / \
353
(1 + math.exp(options.v2 - options.v3 * self.TT))
354
self.delta = options.under + \
355
(options.upper - options.under) / ((1 + self.TT)**self.fTT)
356
self.helpacttimeEx = self.helpacttime
357
self.helpacttime = self.helpacttime + self.delta * \
358
(self.actualtime - self.helpacttime)
359
360
def stopCheck(self, options):
361
"""
362
method to check if the convergence reaches in the Lohse-assignment
363
"""
364
stop = False
365
criteria = 0.
366
criteria = options.cvg1 * \
367
self.helpacttimeEx**(options.cvg2 / options.cvg3)
368
369
if abs(self.actualtime - self.helpacttimeEx) <= criteria:
370
stop = True
371
return stop
372
373
374
class Vehicle:
375
376
"""
377
This class is to store vehicle information, such as departure time, route and travel time.
378
"""
379
380
def __init__(self, label):
381
self.label = label
382
self.method = None
383
self.depart = 0.
384
self.arrival = 0.
385
self.speed = 0.
386
self.route = []
387
self.traveltime = 0.
388
self.travellength = 0.
389
self.departdelay = 0.
390
self.waittime = 0.
391
self.rank = 0.
392
393
def __repr__(self):
394
return "%s_%s_%s_%s_%s_%s<%s>" % (self.label, self.depart, self.arrival, self.speed,
395
self.traveltime, self.travellength, self.route)
396
397
398
pathNum = 0
399
400
401
class Path:
402
403
"""
404
This class is to store path information which is mainly for the C-logit and the Lohse models.
405
"""
406
407
def __init__(self, source, target, edges):
408
self.source = source
409
self.target = target
410
global pathNum
411
self.label = "%s" % pathNum
412
pathNum += 1
413
self.edges = edges
414
self.length = 0.0
415
self.freepathtime = 0.0
416
self.actpathtime = 0.0
417
self.pathflow = 0.0
418
self.helpflow = 0.0
419
self.choiceprob = 0.0
420
self.sumOverlap = 0.0
421
self.utility = 0.0
422
# parameter used in the Lohse traffic assignment
423
self.usedcounts = 1
424
# parameter used in the Lohse traffic assignment
425
self.pathhelpacttime = 0.
426
# record if this path is the currrent shortest one.
427
self.currentshortest = True
428
429
def __repr__(self):
430
return "%s_%s_%s<%s|%s|%s|%s>" % (self.label, self.source, self.target, self.freepathtime,
431
self.pathflow, self.actpathtime, self.edges)
432
433
def getPathLength(self):
434
for edge in self.edges:
435
self.length += edge._length
436
437
def updateSumOverlap(self, newpath, gamma):
438
overlapLength = 0.
439
for edge in self.edges:
440
if edge in newpath.edges:
441
overlapLength += edge._length
442
overlapLength = overlapLength / 1000.
443
lengthOne = self.length / 1000.
444
lengthTwo = newpath.length / 1000.
445
self.sumOverlap += math.pow(overlapLength /
446
(math.pow(lengthOne, 0.5) * math.pow(lengthTwo, 0.5)), gamma)
447
448
def getPathTimeUpdate(self):
449
"""
450
used to update the path travel time in the c-logit and the Lohse traffic assignments
451
"""
452
self.actpathtime = 0.
453
self.pathhelpacttime = 0.
454
for edge in self.edges:
455
self.actpathtime += edge.actualtime
456
self.pathhelpacttime += edge.helpacttime
457
458
self.pathhelpacttime = self.pathhelpacttime / 3600.
459
self.actpathtime = self.actpathtime / 3600.
460
461
462
class TLJunction:
463
464
def __init__(self):
465
self.label = None
466
self.phaseNum = 0
467
self.phases = []
468
469
def __repr__(self):
470
return "%s_%s<%s>" % (self.label, self.phaseNum, self.phases)
471
472
473
class Signalphase:
474
475
def __init__(self, duration, state=None, phase=None, brake=None, yellow=None):
476
self.label = None
477
self.state = state
478
self.duration = duration
479
self.green = ''
480
self.brake = ''
481
self.yellow = ''
482
483
if phase and brake and yellow:
484
self.green = phase[::-1]
485
self.brake = brake[::-1]
486
self.yellow = yellow[::-1]
487
elif self.state:
488
for elem in self.state:
489
if elem == 'G':
490
self.green += '1'
491
self.brake += '0'
492
self.yellow += '0'
493
elif elem == 'y':
494
self.green += '0'
495
self.brake += '0'
496
self.yellow += '1'
497
elif elem == 'r':
498
self.green += '0'
499
self.brake += '1'
500
self.yellow += '0'
501
else:
502
print('no timing plans exist!')
503
504
def __repr__(self):
505
return "%s_%s<%s|%s|%s>" % (self.label, self.duration, self.green, self.brake, self.yellow)
506
507