Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/libsumo/__init__.py
169675 views
1
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
2
# Copyright (C) 2018-2025 German Aerospace Center (DLR) and others.
3
# This program and the accompanying materials are made available under the
4
# terms of the Eclipse Public License 2.0 which is available at
5
# https://www.eclipse.org/legal/epl-2.0/
6
# This Source Code may also be made available under the following Secondary
7
# Licenses when the conditions for such availability set forth in the Eclipse
8
# Public License 2.0 are satisfied: GNU General Public License, version 2
9
# or later which is available at
10
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
11
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
12
13
# @file __init__.py
14
# @author Benjamin Striner
15
# @author Michael Behrisch
16
# @date 2018-06-05
17
18
import os
19
import sys
20
import warnings
21
SUMO_WHEEL_HOME = None
22
try:
23
import sumo
24
if not hasattr(sumo, "SUMO_HOME"): # maybe it is not the correct module
25
raise ImportError()
26
SUMO_WHEEL_HOME = sumo.SUMO_HOME
27
if "SUMO_HOME" not in os.environ:
28
os.environ["SUMO_HOME"] = SUMO_WHEEL_HOME
29
except ImportError:
30
if "SUMO_HOME" not in os.environ:
31
warnings.warn("SUMO_HOME is not set and the eclipse-sumo wheel is not installed!")
32
if hasattr(sys, "_is_gil_enabled") and not sys._is_gil_enabled():
33
warnings.warn("This package has not been validated in free-threaded Python (no-GIL mode).")
34
if hasattr(os, "add_dll_directory"):
35
# since Python 3.8 the DLL search path has to be set explicitly see https://bugs.python.org/issue43173
36
if SUMO_WHEEL_HOME and os.path.exists(os.path.join(SUMO_WHEEL_HOME, "bin", "zlib.dll")):
37
os.add_dll_directory(os.path.abspath(os.path.join(SUMO_WHEEL_HOME, "bin")))
38
elif "SUMO_HOME" in os.environ and os.path.exists(os.path.join(os.environ["SUMO_HOME"], "bin", "zlib.dll")):
39
os.add_dll_directory(os.path.join(os.environ["SUMO_HOME"], "bin"))
40
elif os.path.exists(os.path.join(os.path.dirname(__file__), "..", "..", "bin", "zlib.dll")):
41
os.add_dll_directory(os.path.join(os.path.dirname(__file__), "..", "..", "bin"))
42
43
from traci import connection, constants, exceptions, _vehicle, _person, _trafficlight, _simulation # noqa
44
from traci.step import StepManager, StepListener # noqa
45
from .libsumo import vehicle, simulation, person, trafficlight, edge # noqa
46
from .libsumo import TraCIStage, TraCINextStopData, TraCIReservation, TraCILogic, TraCIPhase, TraCIException # noqa
47
from .libsumo import TraCICollision, TraCISignalConstraint # noqa
48
from ._libsumo import TraCILogic_phases_get, TraCILogic_phases_set, TraCILogic_swiginit, new_TraCILogic # noqa
49
from .libsumo import * # noqa
50
51
DOMAINS = [
52
busstop, # noqa
53
calibrator, # noqa
54
chargingstation, # noqa
55
edge,
56
gui, # noqa
57
inductionloop, # noqa
58
junction, # noqa
59
lane, # noqa
60
lanearea, # noqa
61
meandata, # noqa
62
multientryexit, # noqa
63
overheadwire, # noqa
64
parkingarea, # noqa
65
person,
66
poi, # noqa
67
polygon, # noqa
68
rerouter, # noqa
69
route, # noqa
70
routeprobe, # noqa
71
simulation,
72
trafficlight,
73
variablespeedsign, # noqa
74
vehicle,
75
vehicletype, # noqa
76
]
77
78
_stepManager = StepManager()
79
80
81
def wrapAsClassMethod(func, module):
82
def wrapper(*args, **kwargs):
83
return func(module, *args, **kwargs)
84
return wrapper
85
86
87
TraCIStage.__attr_repr__ = _simulation.Stage.__attr_repr__
88
TraCIStage.__repr__ = _simulation.Stage.__repr__
89
TraCIStage.toXML = _simulation.Stage.toXML
90
TraCICollision.__attr_repr__ = _simulation.Collision.__attr_repr__
91
TraCICollision.__repr__ = _simulation.Collision.__repr__
92
93
TraCINextStopData.__attr_repr__ = _vehicle.StopData.__attr_repr__
94
TraCINextStopData.__repr__ = _vehicle.StopData.__repr__
95
96
TraCIReservation.__attr_repr__ = _person.Reservation.__attr_repr__
97
TraCIReservation.__repr__ = _person.Reservation.__repr__
98
99
100
def set_phases(self, phases):
101
new_phases = [TraCIPhase(p.duration, p.state, p.minDur, p.maxDur, p.next, p.name) for p in phases]
102
TraCILogic_phases_set(self, new_phases)
103
104
105
def TraCILogic__init__(self, *args, **kwargs):
106
# Extract known keyword arguments or set to None if not provided
107
programID = kwargs.get('programID', args[0] if len(args) > 0 else None)
108
type_ = kwargs.get('type', args[1] if len(args) > 1 else None)
109
currentPhaseIndex = kwargs.get('currentPhaseIndex', args[2] if len(args) > 2 else None)
110
phases = kwargs.get('phases', args[3] if len(args) > 3 else None)
111
# subParameter = kwargs.get('subParameter', args[4] if len(args) > 4 else None)
112
113
# Update phases if provided
114
if phases:
115
new_phases = [TraCIPhase(p.duration, p.state, p.minDur, p.maxDur, p.next, p.name) for p in phases]
116
phases = new_phases
117
118
# Rebuild args including the extracted keyword arguments
119
args = (programID, type_, currentPhaseIndex, phases)
120
121
# Initialize with the original function
122
TraCILogic_swiginit(self, new_TraCILogic(*args))
123
124
125
# Override methods and properties
126
TraCILogic.__init__ = TraCILogic__init__
127
TraCILogic.phases = property(TraCILogic_phases_get, set_phases)
128
TraCILogic.getPhases = _trafficlight.Logic.getPhases
129
TraCILogic.__repr__ = _trafficlight.Logic.__repr__
130
TraCIPhase.__repr__ = _trafficlight.Phase.__repr__
131
TraCISignalConstraint.__repr__ = _trafficlight.Constraint.__repr__
132
133
134
exceptions.TraCIException = TraCIException
135
simulation.Stage = TraCIStage
136
vehicle.StopData = TraCINextStopData
137
person.Reservation = TraCIReservation
138
trafficlight.Phase = TraCIPhase
139
trafficlight.Logic = TraCILogic
140
vehicle.addFull = vehicle.add
141
vehicle.addLegacy = wrapAsClassMethod(_vehicle.VehicleDomain.addLegacy, vehicle)
142
vehicle.couldChangeLane = wrapAsClassMethod(_vehicle.VehicleDomain.couldChangeLane, vehicle)
143
vehicle.wantsAndCouldChangeLane = wrapAsClassMethod(_vehicle.VehicleDomain.wantsAndCouldChangeLane, vehicle)
144
vehicle.isStopped = wrapAsClassMethod(_vehicle.VehicleDomain.isStopped, vehicle)
145
vehicle.isStoppedParking = wrapAsClassMethod(_vehicle.VehicleDomain.isStoppedParking, vehicle)
146
vehicle.isStoppedTriggered = wrapAsClassMethod(_vehicle.VehicleDomain.isStoppedTriggered, vehicle)
147
vehicle.isAtBusStop = wrapAsClassMethod(_vehicle.VehicleDomain.isAtBusStop, vehicle)
148
vehicle.isAtContainerStop = wrapAsClassMethod(_vehicle.VehicleDomain.isAtContainerStop, vehicle)
149
vehicle.setBusStop = wrapAsClassMethod(_vehicle.VehicleDomain.setBusStop, vehicle)
150
vehicle.setContainerStop = wrapAsClassMethod(_vehicle.VehicleDomain.setContainerStop, vehicle)
151
vehicle.setParkingAreaStop = wrapAsClassMethod(_vehicle.VehicleDomain.setParkingAreaStop, vehicle)
152
vehicle.setChargingStationStop = wrapAsClassMethod(_vehicle.VehicleDomain.setChargingStationStop, vehicle)
153
vehicle.getRightFollowers = wrapAsClassMethod(_vehicle.VehicleDomain.getRightFollowers, vehicle)
154
vehicle.getRightLeaders = wrapAsClassMethod(_vehicle.VehicleDomain.getRightLeaders, vehicle)
155
vehicle.getLeftFollowers = wrapAsClassMethod(_vehicle.VehicleDomain.getLeftFollowers, vehicle)
156
vehicle.getLeftLeaders = wrapAsClassMethod(_vehicle.VehicleDomain.getLeftLeaders, vehicle)
157
vehicle.getLaneChangeStatePretty = wrapAsClassMethod(_vehicle.VehicleDomain.getLaneChangeStatePretty, vehicle)
158
vehicle._legacyGetLeader = True
159
person.removeStages = wrapAsClassMethod(_person.PersonDomain.removeStages, person)
160
_trafficlight.TraCIException = TraCIException
161
trafficlight.setLinkState = wrapAsClassMethod(_trafficlight.TrafficLightDomain.setLinkState, trafficlight)
162
trafficlight.getNemaPhaseCalls = wrapAsClassMethod(_trafficlight.TrafficLightDomain.getNemaPhaseCalls, trafficlight)
163
trafficlight.setNemaSplits = wrapAsClassMethod(_trafficlight.TrafficLightDomain.setNemaSplits, trafficlight)
164
trafficlight.setNemaOffset = wrapAsClassMethod(_trafficlight.TrafficLightDomain.setNemaOffset, trafficlight)
165
166
167
def addStepListener(listener):
168
return _stepManager.addStepListener(listener)
169
170
171
def removeStepListener(listenerID):
172
return _stepManager.removeStepListener(listenerID)
173
174
175
def isLibsumo():
176
return True
177
178
179
def isLibtraci():
180
return False
181
182
183
def init(port):
184
print("Warning! To make your code usable with traci and libsumo, please use traci.start instead of traci.init.")
185
186
187
hasGUI = simulation.hasGUI
188
load = simulation.load
189
isLoaded = simulation.isLoaded
190
getVersion = simulation.getVersion
191
executeMove = simulation.executeMove
192
193
_libsumo_step = simulation.step
194
195
196
def simulationStep(time=0.):
197
_libsumo_step(time)
198
_stepManager.manageStepListeners(time)
199
200
201
simulation.step = simulationStep
202
203
204
def close():
205
simulation.close()
206
_stepManager.close()
207
208
209
def start(cmd, port=None, numRetries=constants.DEFAULT_NUM_RETRIES, label="default", verbose=False,
210
traceFile=None, traceGetters=True, stdout=None, doSwitch=True):
211
if port is not None:
212
print("Warning! To make your code usable with traci and libsumo, do not set an explicit port.")
213
version = simulation.start(cmd)
214
if traceFile is not None:
215
if _stepManager.startTracing(traceFile, traceGetters, DOMAINS):
216
# simulationStep shows up as simulation.step
217
global _libsumo_step
218
_libsumo_step = _stepManager._addTracing(_libsumo_step, "simulation")
219
_stepManager._traceFile.write("import traci\n")
220
_stepManager.write("start", repr(cmd))
221
return version
222
223
224
def setLegacyGetLeader(enabled):
225
vehicle._legacyGetLeader = enabled
226
227