Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tests/complex/traci/misc/subscription/runner.py
169689 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2008-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file runner.py
15
# @author Michael Behrisch
16
# @date 2023-11-14
17
18
19
import os
20
import sys
21
import inspect
22
from pprint import pprint
23
24
if "SUMO_HOME" in os.environ:
25
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
26
import sumolib
27
if os.environ.pop('LIBSUMO_AS_TRACI', None):
28
import traci as pure_traci
29
import libsumo as traci
30
elif os.environ.pop('LIBTRACI_AS_TRACI', None):
31
import traci as pure_traci
32
import libtraci as traci
33
else:
34
import traci
35
pure_traci = traci
36
37
# Subscription smoke test: for every TraCI domain (except "gui") find the getter
# methods that are implemented through self._getUniversal(...), subscribe to the
# variable each of them reads, then probe one subscription to the ADD constant
# (announced as an expected failure), and finally dump all subscription results.
#
# NOTE(review): the indentation of this script was lost in the copy under review.
# The nesting below (one ADD probe per domain, issued after all of that domain's
# getters were handled) is reconstructed from the statements themselves - confirm
# against the repository version.

_CALL_MARKER = "_getUniversal("
# Distance from the start of the marker to the constant name: the marker itself
# plus three more characters (presumably the "tc." module prefix - confirm).
_CONSTANT_OFFSET = len(_CALL_MARKER) + 3

# Ordered (format marker, example value) pairs; the first marker found in the
# argument text of the getter decides which parameter is sent along.
_PARAMETER_TEMPLATES = (
    ('"s"', "3o_0"),
    ('"d"', 0.),
    ('"i"', 0),
    ('"b"', ("b", 1)),
    ('"B"', ("B", 0)),
    ('"tds"', ("tds", 2, 0., "1si")),  # effort or traveltime for vehicle
    ('"tdddds"', ("tdddds", 5, 0., 0., 0., 0., "1si")),  # follow speed for vehicle
    ('"tddds"', ("tddds", 4, 0., 0., 0., "1si")),  # secure gap for vehicle
    ('"tdd"', ("tdd", 2, 0., 0.)),  # stop speed for vehicle
    ('"tisb"', ("tisb", 3, 0, "edge", False)),  # stop parameter for vehicle
    ('"tru"', ("tru", 2, ("1si", 0., 0), traci.constants.REQUEST_DRIVINGDIST)),
    ('"tou"', ("tou", 2, (400., 495.), traci.constants.REQUEST_DRIVINGDIST)),
)


def _split_universal_call(getter_source):
    """Return (variable, remainder) taken from the first _getUniversal( call.

    variable is the text between the skipped prefix and the next comma (or the
    next closing parenthesis when no comma follows anywhere in the source);
    remainder is the text from that comma up to the next closing parenthesis,
    or the empty string when there is no comma.
    """
    start = getter_source.index(_CALL_MARKER) + _CONSTANT_OFFSET
    closing = getter_source.index(")", start)
    if "," not in getter_source[start:]:
        return getter_source[start:closing], ""
    comma = getter_source.index(",", start)
    return getter_source[start:comma], getter_source[comma:closing]


def _object_id(domain_name, variable):
    """Return the object id used when subscribing in the given domain."""
    if domain_name == "edge":
        return "1si"
    if domain_name == "lane":
        return "1si_0"
    if domain_name in ("junction", "trafficlight"):
        if "CONSTRAINT" in variable:
            return "A"
        return "0"
    return domain_name + "_0"


def _subscription_parameters(var_id, remainder):
    """Return the parameters dict for var_id, or None if no marker matches."""
    for marker, example in _PARAMETER_TEMPLATES:
        if marker in remainder:
            return {var_id: example}
    return None


try:
    traci.start([sumolib.checkBinary("sumo"), "-c", "sumo.sumocfg", "+a", "input_additional.add.xml"])
    traci.simulationStep()
    for domain in pure_traci.DOMAINS:
        domain_name = domain._name
        if domain_name == "gui":
            continue
        for _, member in inspect.getmembers(domain):
            if not inspect.ismethod(member):
                continue
            getter_source = inspect.getsource(member)
            if "self._getUniversal(" not in getter_source:
                continue
            variable, remainder = _split_universal_call(getter_source)
            if not hasattr(traci.constants, variable):
                continue
            print("Subscribing to %s.%s." % (domain_name, variable))
            sys.stdout.flush()
            var_id = getattr(traci.constants, variable)
            # object_id deliberately stays bound after this loop: the ADD probe
            # below reuses whatever id was chosen last.
            object_id = _object_id(domain_name, variable)
            parameters = _subscription_parameters(var_id, remainder)
            try:
                if domain_name == "simulation":
                    traci.simulation.subscribe([var_id], parameters=parameters)
                else:
                    getattr(traci, domain_name).subscribe(object_id, [var_id], parameters=parameters)
            except traci.TraCIException as ex:
                print(ex)
        print("Expecting failure for %s.ADD subscription." % domain_name)
        sys.stdout.flush()
        try:
            if domain_name == "simulation":
                traci.simulation.subscribe([traci.constants.ADD])
            else:
                getattr(traci, domain_name).subscribe(object_id, [traci.constants.ADD])
        except traci.TraCIException as ex:
            print(ex)
    traci.simulationStep()
    for domain in pure_traci.DOMAINS:
        results = getattr(traci, domain._name).getAllSubscriptionResults()
        pprint(results)
finally:
    traci.close()
112
113