Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tests/complex/traci/contextSubscriptions/simulation_5/runner.py
169689 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2008-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file runner.py
15
# @author Daniel Krajzewicz
16
# @author Michael Behrisch
17
# @date 2012-10-19
18
19
from __future__ import absolute_import
20
from __future__ import print_function
21
22
import os
23
import sys
24
25
if "SUMO_HOME" in os.environ:
26
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
27
import sumolib # noqa
28
import traci # noqa
29
30
31
def csRes2Str(csr):
32
return ', '.join(sorted(csr[''].keys()))
33
34
35
def runSingle(viewRange, domain, domain2):
36
name = domain._name if hasattr(domain, "_name") else domain.__name__
37
name2 = domain2._name if hasattr(domain2, "_name") else domain2.__name__
38
ids = domain.getIDList() if name != "simulation" else [""]
39
if not ids:
40
print("No objects for domain '%s' at time %s" %
41
(name, traci.simulation.getTime()))
42
return
43
egoID = ids[0]
44
45
print("trying to subscribe to %s around %s '%s' at time %s" % (
46
name2, name, egoID, traci.simulation.getTime()))
47
domain.subscribeContext(egoID, domain2.DOMAIN_ID, viewRange,
48
[traci.constants.TRACI_ID_LIST])
49
traci.simulationStep()
50
responses = domain.getAllContextSubscriptionResults()
51
print(" found %s objects" % len(responses))
52
53
for i in range(3):
54
print(i, csRes2Str(responses))
55
56
domain.unsubscribeContext(egoID, domain2.DOMAIN_ID, viewRange)
57
traci.simulationStep()
58
responses = domain.getAllContextSubscriptionResults()
59
if responses:
60
print("Error: Unsubscribe did not work", responses)
61
else:
62
print("Ok: Unsubscribe successful")
63
sys.stdout.flush()
64
65
66
# main
67
traci.start([sumolib.checkBinary(sys.argv[1]),
68
'-Q', "-c", "sumo.sumocfg",
69
'-a', 'input_additional.add.xml'])
70
traci.simulationStep()
71
for domain2 in traci.DOMAINS:
72
try:
73
runSingle(5, traci.simulation, domain2)
74
except traci.TraCIException:
75
pass
76
77
traci.close()
78
79