Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tests/complex/traci/contextSubscriptions/matrix_test/runner.py
169689 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2008-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file runner.py
15
# @author Daniel Krajzewicz
16
# @author Michael Behrisch
17
# @date 2012-10-19
18
19
from __future__ import absolute_import
20
from __future__ import print_function
21
22
import os
23
import sys
24
25
if "SUMO_HOME" in os.environ:
26
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
27
import sumolib # noqa
28
import traci # noqa
29
30
31
def _domainLabel(dom):
    """Return dom._name when that attribute exists, otherwise dom.__name__."""
    if hasattr(dom, "_name"):
        return dom._name
    return dom.__name__


def runSingle(viewRange, domain, domain2, varIDs):
    """Subscribe one object of `domain` to a `domain2` context, then unsubscribe.

    The ego object is the first ID returned by domain.getIDList(); for the
    domain labelled "simulation" the empty string is used instead.  The
    function prints what it attempts, the number of entries returned by
    domain.getAllContextSubscriptionResults() after one simulation step, and
    whether that result is empty again one step after unsubscribing.

    If the domain has no objects, a message is printed and the function
    returns without subscribing.
    """
    egoLabel = _domainLabel(domain)
    contextLabel = _domainLabel(domain2)

    # The "simulation" domain is not asked for an ID list; "" stands in as ID.
    if egoLabel == "simulation":
        candidates = [""]
    else:
        candidates = domain.getIDList()

    if not candidates:
        now = traci.simulation.getTime()
        print("No objects for domain '%s' at time %s" % (egoLabel, now))
        return

    egoID = candidates[0]
    contextDomainID = domain2.DOMAIN_ID

    # --- subscribe, advance one step, report the number of results
    now = traci.simulation.getTime()
    print("trying to subscribe to %s around %s '%s' at time %s" %
          (contextLabel, egoLabel, egoID, now))
    domain.subscribeContext(egoID, contextDomainID, viewRange, varIDs)
    traci.simulationStep()
    found = domain.getAllContextSubscriptionResults()
    print(" found %s objects" % len(found))

    # --- unsubscribe, advance one step, results are expected to be empty
    domain.unsubscribeContext(egoID, contextDomainID, viewRange)
    traci.simulationStep()
    leftovers = domain.getAllContextSubscriptionResults()
    if not leftovers:
        print("Ok: Unsubscribe successful")
    else:
        print("Error: Unsubscribe did not work", leftovers)
    sys.stdout.flush()
57
58
59
# main
# sys.argv[1] is handed to sumolib.checkBinary (presumably the name of the
# SUMO binary to launch -- confirm against the test setup).
try:
    sumoBinary = sumolib.checkBinary(sys.argv[1])
    sumoCmd = [sumoBinary,
               '-Q', "-c", "sumo.sumocfg",
               '-a', 'input_additional.add.xml']
    traci.start(sumoCmd)
    traci.simulationStep()

    # With "--defaults" on the command line no explicit variable list is
    # passed to subscribeContext; otherwise only the ID list is requested.
    if "--defaults" in sys.argv:
        varIDs = None
    else:
        varIDs = [traci.constants.TRACI_ID_LIST]

    # Every ordered (ego domain, context domain) pair, same order as two
    # nested loops over traci.DOMAINS.
    domainPairs = [(ego, context)
                   for ego in traci.DOMAINS
                   for context in traci.DOMAINS]
    for domain, domain2 in domainPairs:
        try:
            runSingle(100, domain, domain2, varIDs)
        except traci.TraCIException:
            # A TraCIException for one pair is ignored on purpose so that
            # the remaining combinations are still exercised.
            pass
finally:
    traci.close()
75
76