Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/libsumo/__init__.py
193772 views
1
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
2
# Copyright (C) 2018-2026 German Aerospace Center (DLR) and others.
3
# This program and the accompanying materials are made available under the
4
# terms of the Eclipse Public License 2.0 which is available at
5
# https://www.eclipse.org/legal/epl-2.0/
6
# This Source Code may also be made available under the following Secondary
7
# Licenses when the conditions for such availability set forth in the Eclipse
8
# Public License 2.0 are satisfied: GNU General Public License, version 2
9
# or later which is available at
10
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
11
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
12
13
# @file __init__.py
14
# @author Benjamin Striner
15
# @author Michael Behrisch
16
# @date 2018-06-05
17
18
import os
19
import sys
20
import warnings
21
22
SUMO_DATA_HOME = None
23
try:
24
import sumo_data
25
SUMO_DATA_HOME = sumo_data.__path__[0]
26
if not os.environ.get("SUMO_HOME"):
27
os.environ["SUMO_HOME"] = SUMO_DATA_HOME
28
if not os.environ.get("PROJ_LIB") and not os.environ.get("PROJ_DATA"):
29
os.environ["PROJ_LIB"] = os.environ["PROJ_DATA"] = os.path.join(SUMO_DATA_HOME, "data", "proj")
30
except ImportError:
31
# fall back to the eclipse-sumo wheel
32
try:
33
import sumo
34
if not hasattr(sumo, "SUMO_HOME"): # maybe it is not the correct module
35
raise ImportError()
36
SUMO_DATA_HOME = sumo.SUMO_HOME
37
if not os.environ.get("SUMO_HOME"):
38
os.environ["SUMO_HOME"] = SUMO_DATA_HOME
39
except ImportError:
40
if not os.environ.get("SUMO_HOME"):
41
warnings.warn("SUMO_HOME is not set and neither sumo-data nor the eclipse-sumo wheel are installed!")
42
43
if hasattr(sys, "_is_gil_enabled") and not sys._is_gil_enabled():
44
warnings.warn("This package has not been validated in free-threaded Python (no-GIL mode).")
45
if hasattr(os, "add_dll_directory"):
46
# since Python 3.8 the DLL search path has to be set explicitly see https://bugs.python.org/issue43173
47
if SUMO_DATA_HOME and os.path.exists(os.path.join(SUMO_DATA_HOME, "bin", "zlib.dll")):
48
os.add_dll_directory(os.path.abspath(os.path.join(SUMO_DATA_HOME, "bin")))
49
elif "SUMO_HOME" in os.environ and os.path.exists(os.path.join(os.environ["SUMO_HOME"], "bin", "zlib.dll")):
50
os.add_dll_directory(os.path.join(os.environ["SUMO_HOME"], "bin"))
51
elif os.path.exists(os.path.join(os.path.dirname(__file__), "..", "..", "bin", "zlib.dll")):
52
os.add_dll_directory(os.path.join(os.path.dirname(__file__), "..", "..", "bin"))
53
54
try:
55
# this tries to determine the version number of an installed wheel
56
import importlib.metadata # noqa
57
__version__ = importlib.metadata.version(__name__)
58
except ImportError:
59
# this is the fallback version, it gets replaced with the current version on "make install" or "make dist"
60
__version__ = "0.0.0"
61
62
from traci import connection, constants, exceptions, _vehicle, _person, _trafficlight, _simulation # noqa
63
from traci.step import StepManager, StepListener # noqa
64
from .libsumo import vehicle, simulation, person, trafficlight, edge # noqa
65
from .libsumo import TraCIStage, TraCINextStopData, TraCIReservation, TraCILogic, TraCIPhase, TraCIException # noqa
66
from .libsumo import TraCICollision, TraCISignalConstraint # noqa
67
from ._libsumo import TraCILogic_phases_get, TraCILogic_phases_set, TraCILogic_swiginit, new_TraCILogic # noqa
68
from .libsumo import * # noqa
69
70
DOMAINS = [
71
busstop, # noqa
72
calibrator, # noqa
73
chargingstation, # noqa
74
edge,
75
gui, # noqa
76
inductionloop, # noqa
77
junction, # noqa
78
lane, # noqa
79
lanearea, # noqa
80
meandata, # noqa
81
multientryexit, # noqa
82
overheadwire, # noqa
83
parkingarea, # noqa
84
person,
85
poi, # noqa
86
polygon, # noqa
87
rerouter, # noqa
88
route, # noqa
89
routeprobe, # noqa
90
simulation,
91
trafficlight,
92
variablespeedsign, # noqa
93
vehicle,
94
vehicletype, # noqa
95
]
96
97
_stepManager = StepManager()
98
99
100
def wrapAsClassMethod(func, module):
101
def wrapper(*args, **kwargs):
102
return func(module, *args, **kwargs)
103
return wrapper
104
105
106
TraCIStage.__attr_repr__ = _simulation.Stage.__attr_repr__
107
TraCIStage.__repr__ = _simulation.Stage.__repr__
108
TraCIStage.toXML = _simulation.Stage.toXML
109
TraCICollision.__attr_repr__ = _simulation.Collision.__attr_repr__
110
TraCICollision.__repr__ = _simulation.Collision.__repr__
111
112
TraCINextStopData.__attr_repr__ = _vehicle.StopData.__attr_repr__
113
TraCINextStopData.__repr__ = _vehicle.StopData.__repr__
114
115
TraCIReservation.__attr_repr__ = _person.Reservation.__attr_repr__
116
TraCIReservation.__repr__ = _person.Reservation.__repr__
117
118
119
def set_phases(self, phases):
120
new_phases = [TraCIPhase(p.duration, p.state, p.minDur, p.maxDur, p.next, p.name) for p in phases]
121
TraCILogic_phases_set(self, new_phases)
122
123
124
def TraCILogic__init__(self, *args, **kwargs):
125
# Extract known keyword arguments or set to None if not provided
126
programID = kwargs.get('programID', args[0] if len(args) > 0 else None)
127
type_ = kwargs.get('type', args[1] if len(args) > 1 else None)
128
currentPhaseIndex = kwargs.get('currentPhaseIndex', args[2] if len(args) > 2 else None)
129
phases = kwargs.get('phases', args[3] if len(args) > 3 else None)
130
# subParameter = kwargs.get('subParameter', args[4] if len(args) > 4 else None)
131
132
# Update phases if provided
133
if phases:
134
new_phases = [TraCIPhase(p.duration, p.state, p.minDur, p.maxDur, p.next, p.name) for p in phases]
135
phases = new_phases
136
137
# Rebuild args including the extracted keyword arguments
138
args = (programID, type_, currentPhaseIndex, phases)
139
140
# Initialize with the original function
141
TraCILogic_swiginit(self, new_TraCILogic(*args))
142
143
144
# Override methods and properties
145
TraCILogic.__init__ = TraCILogic__init__
146
TraCILogic.phases = property(TraCILogic_phases_get, set_phases)
147
TraCILogic.getPhases = _trafficlight.Logic.getPhases
148
TraCILogic.__repr__ = _trafficlight.Logic.__repr__
149
TraCIPhase.__repr__ = _trafficlight.Phase.__repr__
150
TraCISignalConstraint.__repr__ = _trafficlight.Constraint.__repr__
151
152
153
exceptions.TraCIException = TraCIException
154
simulation.Stage = TraCIStage
155
vehicle.StopData = TraCINextStopData
156
person.Reservation = TraCIReservation
157
trafficlight.Phase = TraCIPhase
158
trafficlight.Logic = TraCILogic
159
vehicle.addFull = vehicle.add
160
vehicle.addLegacy = wrapAsClassMethod(_vehicle.VehicleDomain.addLegacy, vehicle)
161
vehicle.couldChangeLane = wrapAsClassMethod(_vehicle.VehicleDomain.couldChangeLane, vehicle)
162
vehicle.wantsAndCouldChangeLane = wrapAsClassMethod(_vehicle.VehicleDomain.wantsAndCouldChangeLane, vehicle)
163
vehicle.isStopped = wrapAsClassMethod(_vehicle.VehicleDomain.isStopped, vehicle)
164
vehicle.isStoppedParking = wrapAsClassMethod(_vehicle.VehicleDomain.isStoppedParking, vehicle)
165
vehicle.isStoppedTriggered = wrapAsClassMethod(_vehicle.VehicleDomain.isStoppedTriggered, vehicle)
166
vehicle.isAtBusStop = wrapAsClassMethod(_vehicle.VehicleDomain.isAtBusStop, vehicle)
167
vehicle.isAtContainerStop = wrapAsClassMethod(_vehicle.VehicleDomain.isAtContainerStop, vehicle)
168
vehicle.setBusStop = wrapAsClassMethod(_vehicle.VehicleDomain.setBusStop, vehicle)
169
vehicle.setContainerStop = wrapAsClassMethod(_vehicle.VehicleDomain.setContainerStop, vehicle)
170
vehicle.setParkingAreaStop = wrapAsClassMethod(_vehicle.VehicleDomain.setParkingAreaStop, vehicle)
171
vehicle.setChargingStationStop = wrapAsClassMethod(_vehicle.VehicleDomain.setChargingStationStop, vehicle)
172
vehicle.getRightFollowers = wrapAsClassMethod(_vehicle.VehicleDomain.getRightFollowers, vehicle)
173
vehicle.getRightLeaders = wrapAsClassMethod(_vehicle.VehicleDomain.getRightLeaders, vehicle)
174
vehicle.getLeftFollowers = wrapAsClassMethod(_vehicle.VehicleDomain.getLeftFollowers, vehicle)
175
vehicle.getLeftLeaders = wrapAsClassMethod(_vehicle.VehicleDomain.getLeftLeaders, vehicle)
176
vehicle.getLaneChangeStatePretty = wrapAsClassMethod(_vehicle.VehicleDomain.getLaneChangeStatePretty, vehicle)
177
vehicle._legacyGetLeader = True
178
person.removeStages = wrapAsClassMethod(_person.PersonDomain.removeStages, person)
179
_trafficlight.TraCIException = TraCIException
180
trafficlight.setLinkState = wrapAsClassMethod(_trafficlight.TrafficLightDomain.setLinkState, trafficlight)
181
trafficlight.getNemaPhaseCalls = wrapAsClassMethod(_trafficlight.TrafficLightDomain.getNemaPhaseCalls, trafficlight)
182
trafficlight.setNemaSplits = wrapAsClassMethod(_trafficlight.TrafficLightDomain.setNemaSplits, trafficlight)
183
trafficlight.setNemaOffset = wrapAsClassMethod(_trafficlight.TrafficLightDomain.setNemaOffset, trafficlight)
184
185
186
def addStepListener(listener):
187
return _stepManager.addStepListener(listener)
188
189
190
def removeStepListener(listenerID):
191
return _stepManager.removeStepListener(listenerID)
192
193
194
def isLibsumo():
195
return True
196
197
198
def isLibtraci():
199
return False
200
201
202
def init(port):
203
print("Warning! To make your code usable with traci and libsumo, please use traci.start instead of traci.init.")
204
205
206
hasGUI = simulation.hasGUI
207
load = simulation.load
208
isLoaded = simulation.isLoaded
209
getVersion = simulation.getVersion
210
executeMove = simulation.executeMove
211
212
_libsumo_step = simulation.step
213
214
215
def simulationStep(time=0.):
216
_libsumo_step(time)
217
_stepManager.manageStepListeners(time)
218
219
220
simulation.step = simulationStep
221
222
223
def close():
224
simulation.close()
225
_stepManager.close()
226
227
228
def start(cmd, port=None, numRetries=constants.DEFAULT_NUM_RETRIES, label="default", verbose=False,
229
traceFile=None, traceGetters=True, stdout=None, doSwitch=True):
230
if port is not None:
231
print("Warning! To make your code usable with traci and libsumo, do not set an explicit port.")
232
version = simulation.start(cmd)
233
if traceFile is not None:
234
if _stepManager.startTracing(traceFile, traceGetters, DOMAINS):
235
# simulationStep shows up as simulation.step
236
global _libsumo_step
237
_libsumo_step = _stepManager._addTracing(_libsumo_step, "simulation")
238
_stepManager._traceFile.write("import traci\n")
239
_stepManager.write("start", repr(cmd))
240
return version
241
242
243
def setLegacyGetLeader(enabled):
244
vehicle._legacyGetLeader = enabled
245
246