Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/libtraci/__init__.py
169674 views
1
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
2
# Copyright (C) 2018-2025 German Aerospace Center (DLR) and others.
3
# This program and the accompanying materials are made available under the
4
# terms of the Eclipse Public License 2.0 which is available at
5
# https://www.eclipse.org/legal/epl-2.0/
6
# This Source Code may also be made available under the following Secondary
7
# Licenses when the conditions for such availability set forth in the Eclipse
8
# Public License 2.0 are satisfied: GNU General Public License, version 2
9
# or later which is available at
10
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
11
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
12
13
# @file __init__.py
14
# @author Michael Behrisch
15
# @date 2020-10-08
16
17
import os
18
if hasattr(os, "add_dll_directory"):
    # Since Python 3.8 the DLL search path has to be set explicitly,
    # see https://bugs.python.org/issue43173
    # A bin directory is only registered when it actually contains zlib.dll.
    if os.path.exists(os.path.join(os.path.dirname(__file__), "..", "..", "bin", "zlib.dll")):
        os.add_dll_directory(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "bin")))
    elif "SUMO_HOME" in os.environ and os.path.exists(os.path.join(os.environ["SUMO_HOME"], "bin", "zlib.dll")):
        # add_dll_directory needs an absolute path and SUMO_HOME may be
        # given relative to the working directory, so normalise it first.
        os.add_dll_directory(os.path.abspath(os.path.join(os.environ["SUMO_HOME"], "bin")))
24
25
from traci import connection, constants, exceptions, _vehicle, _person, _trafficlight, _simulation, _gui # noqa
26
from traci.step import StepManager, StepListener # noqa
27
from .libtraci import vehicle, simulation, person, trafficlight, gui # noqa
28
from .libtraci import * # noqa
29
from .libtraci import TraCIStage, TraCINextStopData, TraCIReservation, TraCILogic, TraCIPhase, TraCIException # noqa
30
from .libtraci import TraCICollision, TraCISignalConstraint # noqa
31
from ._libtraci import TraCILogic_phases_get, TraCILogic_phases_set, TraCILogic_swiginit, new_TraCILogic # noqa
32
33
# All domain modules of this package, in alphabetical order.  The list is
# handed to the step manager when call tracing is started.
DOMAINS = [
    busstop, calibrator, chargingstation, edge,  # noqa
    gui,
    inductionloop, junction, lane, lanearea, meandata,  # noqa
    multientryexit, overheadwire, parkingarea,  # noqa
    person,
    poi, polygon, rerouter, route, routeprobe,  # noqa
    simulation,
    trafficlight,
    variablespeedsign,  # noqa
    vehicle,
    vehicletype,  # noqa
]

# Single module-wide manager used for step listeners and call tracing.
_stepManager = StepManager()
61
62
63
def wrapAsClassMethod(func, module):
    """Return a function that calls func with module as its first argument.

    This allows a method written for a domain class (taking ``self`` first)
    to be attached to a module as a plain function, with the module standing
    in for the instance.

    The returned function carries the name and docstring of func, so that
    help() and anything that reports function names show the wrapped method
    instead of a generic "wrapper".

    :param func: callable expecting the domain object as first argument
    :param module: object passed to func as that first argument
    :return: callable forwarding all its arguments to func after module
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(module, *args, **kwargs)
    return wrapper
67
68
69
# Give the libtraci result classes the same textual representation as the
# corresponding classes of the pure-Python traci package.
for _libtraciType, _traciType in (
        (TraCIStage, _simulation.Stage),
        (TraCICollision, _simulation.Collision),
        (TraCINextStopData, _vehicle.StopData),
        (TraCIReservation, _person.Reservation)):
    _libtraciType.__attr_repr__ = _traciType.__attr_repr__
    _libtraciType.__repr__ = _traciType.__repr__
# do not leave the loop variables behind in the package namespace
del _libtraciType, _traciType

# Stages additionally get the XML export of the traci Stage class.
TraCIStage.toXML = _simulation.Stage.toXML
80
81
82
def set_phases(self, phases):
    """Setter for the phases property of TraCILogic.

    Every element of phases is rebuilt as a TraCIPhase from its duration,
    state, minDur, maxDur, next and name attributes; the resulting list is
    then stored through the generated TraCILogic_phases_set function.

    :param phases: iterable of objects providing the six attributes above
    """
    converted = []
    for phase in phases:
        converted.append(
            TraCIPhase(phase.duration, phase.state, phase.minDur, phase.maxDur, phase.next, phase.name))
    TraCILogic_phases_set(self, converted)
85
86
87
def TraCILogic__init__(self, *args, **kwargs):
    """Replacement constructor for TraCILogic.

    The recognised arguments are, in positional order: programID, type,
    currentPhaseIndex and phases.  Each may be given by position or by
    keyword; the keyword wins when both are present and a missing value
    is forwarded as None.  Anything beyond these four (for instance a
    subParameter argument) is ignored.

    A non-empty phases sequence is rebuilt as a list of TraCIPhase objects
    before the generated constructor is invoked.
    """
    def lookup(keyword, position):
        # keyword argument first, then the positional slot, otherwise None
        if keyword in kwargs:
            return kwargs[keyword]
        if position < len(args):
            return args[position]
        return None

    programID = lookup('programID', 0)
    logicType = lookup('type', 1)
    currentPhaseIndex = lookup('currentPhaseIndex', 2)
    phases = lookup('phases', 3)

    # Convert the phases only if some were given; None or an empty
    # sequence is passed through unchanged.
    if phases:
        phases = [
            TraCIPhase(phase.duration, phase.state, phase.minDur, phase.maxDur, phase.next, phase.name)
            for phase in phases
        ]

    # Build the underlying object and attach it to this instance.
    TraCILogic_swiginit(self, new_TraCILogic(programID, logicType, currentPhaseIndex, phases))
105
106
107
# Override methods and properties
TraCILogic.__init__ = TraCILogic__init__
TraCILogic.phases = property(TraCILogic_phases_get, set_phases)
TraCILogic.getPhases = _trafficlight.Logic.getPhases
TraCILogic.__repr__ = _trafficlight.Logic.__repr__
TraCIPhase.__repr__ = _trafficlight.Phase.__repr__
TraCISignalConstraint.__repr__ = _trafficlight.Constraint.__repr__

# Make the libtraci classes reachable under the short class names
# (Stage, StopData, ...) on the domain modules and in traci.exceptions.
exceptions.TraCIException = TraCIException
simulation.Stage = TraCIStage
vehicle.StopData = TraCINextStopData
person.Reservation = TraCIReservation
trafficlight.Phase = TraCIPhase
trafficlight.Logic = TraCILogic

# Vehicle domain: addFull is an alias of add, the remaining helpers are
# borrowed from the pure-Python VehicleDomain with the module as "self".
vehicle.addFull = vehicle.add
for _methodName in (
        "addLegacy",
        "couldChangeLane",
        "wantsAndCouldChangeLane",
        "isStopped",
        "isStoppedParking",
        "isStoppedTriggered",
        "isAtBusStop",
        "isAtContainerStop",
        "setBusStop",
        "setContainerStop",
        "setParkingAreaStop",
        "setChargingStationStop",
        "getRightFollowers",
        "getRightLeaders",
        "getLeftFollowers",
        "getLeftLeaders",
        "getLaneChangeStatePretty"):
    setattr(vehicle, _methodName,
            wrapAsClassMethod(getattr(_vehicle.VehicleDomain, _methodName), vehicle))
vehicle._legacyGetLeader = True

# Person domain
person.removeStages = wrapAsClassMethod(_person.PersonDomain.removeStages, person)

# Traffic light domain: the borrowed helpers live in traci._trafficlight,
# whose TraCIException name is pointed at the libtraci exception class.
_trafficlight.TraCIException = TraCIException
for _methodName in (
        "setLinkState",
        "getNemaPhaseCalls",
        "setNemaSplits",
        "setNemaOffset"):
    setattr(trafficlight, _methodName,
            wrapAsClassMethod(getattr(_trafficlight.TrafficLightDomain, _methodName), trafficlight))
# do not leave the loop variable behind in the package namespace
del _methodName
146
147
148
def addStepListener(listener):
    """Register listener with the module-wide step manager.

    :param listener: object handed on to StepManager.addStepListener
    :return: whatever the step manager returns for the registration
    """
    return _stepManager.addStepListener(listener)
150
151
152
def removeStepListener(listenerID):
    """Remove a step listener from the module-wide step manager.

    :param listenerID: identifier handed on to StepManager.removeStepListener
    :return: whatever the step manager returns for the removal
    """
    return _stepManager.removeStepListener(listenerID)
154
155
156
def isLibsumo():
    """Tell whether this package is the libsumo binding; here it never is."""
    return False
158
159
160
def isLibtraci():
    """Tell whether this package is the libtraci binding; here it always is."""
    return True
162
163
164
def setLegacyGetLeader(enabled):
    """Set the _legacyGetLeader flag on the vehicle module.

    The flag is initialised to True when this package is imported.

    :param enabled: new value stored in vehicle._legacyGetLeader
    """
    vehicle._legacyGetLeader = enabled
166
167
168
# Package-level shortcuts for functions of the simulation domain.
hasGUI = simulation.hasGUI
init = simulation.init
load = simulation.load
isLoaded = simulation.isLoaded
getVersion = simulation.getVersion
executeMove = simulation.executeMove
setOrder = simulation.setOrder
switch = simulation.switchConnection

# Keep a handle on the original step function: simulation.step itself is
# replaced by simulationStep further down in this module.
_libtraci_step = simulation.step
178
179
180
def simulationStep(step=0):
    """Run the original step function, then notify the step listeners.

    :param step: value passed both to the original simulation step function
        and to StepManager.manageStepListeners
    """
    # _libtraci_step is looked up at call time, so a traced replacement
    # installed by start() is picked up here.
    _libtraci_step(step)
    _stepManager.manageStepListeners(step)


# Calling simulation.step() now also triggers the step listeners.
simulation.step = simulationStep
186
187
188
def close():
    """Close the simulation connection and the step manager.

    The step manager is closed even when closing the simulation raises, so
    that its resources are released in that case as well.  An exception from
    simulation.close() is still propagated to the caller.
    """
    try:
        simulation.close()
    finally:
        _stepManager.close()
191
192
193
def start(cmd, port=None, numRetries=constants.DEFAULT_NUM_RETRIES, label="default", verbose=False,
          traceFile=None, traceGetters=True, stdout=None, doSwitch=True):
    """Start a simulation through simulation.start and optionally enable tracing.

    :param cmd: command passed to simulation.start; its repr is also written
        to the trace file
    :param port: port passed to simulation.start, -1 is used when None
    :param numRetries: passed to simulation.start
    :param label: passed to simulation.start
    :param verbose: passed to simulation.start
    :param traceFile: when not None, tracing is started with this file
    :param traceGetters: passed to StepManager.startTracing
    :param stdout: accepted but not used by this implementation
    :param doSwitch: accepted but not used by this implementation
    :return: the value returned by simulation.start
    """
    global _libtraci_step

    effectivePort = port if port is not None else -1
    version = simulation.start(cmd, effectivePort, numRetries, label, verbose)
    if traceFile is None:
        return version

    # NOTE(review): the nesting below was reconstructed from a listing
    # without indentation; confirm that the two trace writes are meant to
    # run whenever a trace file is given.
    if _stepManager.startTracing(traceFile, traceGetters, DOMAINS):
        # simulationStep shows up as simulation.step
        _libtraci_step = _stepManager._addTracing(_libtraci_step, "simulation")
    _stepManager._traceFile.write("import traci\n")
    _stepManager.write("start", repr(cmd))
    return version
204
205
206
# Expose the DEFAULT_VIEW constant of traci's GuiDomain on the libtraci gui module.
gui.DEFAULT_VIEW = _gui.GuiDomain.DEFAULT_VIEW
207
208