Path: blob/main/tests/complex/traci/misc/subscription/runner.py
169689 views
#!/usr/bin/env python1# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo2# Copyright (C) 2008-2025 German Aerospace Center (DLR) and others.3# This program and the accompanying materials are made available under the4# terms of the Eclipse Public License 2.0 which is available at5# https://www.eclipse.org/legal/epl-2.0/6# This Source Code may also be made available under the following Secondary7# Licenses when the conditions for such availability set forth in the Eclipse8# Public License 2.0 are satisfied: GNU General Public License, version 29# or later which is available at10# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html11# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later1213# @file runner.py14# @author Michael Behrisch15# @date 2023-11-14161718import os19import sys20import inspect21from pprint import pprint2223if "SUMO_HOME" in os.environ:24sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))25import sumolib26if os.environ.pop('LIBSUMO_AS_TRACI', None):27import traci as pure_traci28import libsumo as traci29elif os.environ.pop('LIBTRACI_AS_TRACI', None):30import traci as pure_traci31import libtraci as traci32else:33import traci34pure_traci = traci3536try:37traci.start([sumolib.checkBinary("sumo"), "-c", "sumo.sumocfg", "+a", "input_additional.add.xml"])38traci.simulationStep()39for dt in pure_traci.DOMAINS:40if dt._name == "gui":41continue42for ft in inspect.getmembers(dt):43if inspect.ismethod(ft[1]):44source = inspect.getsource(ft[1])45if "self._getUniversal(" in source:46s = source.index("_getUniversal(") + 1747if "," in source[s:]:48variable = source[s:source.index(",", s)]49remainder = source[source.index(",", s):source.index(")", s)]50else:51variable = source[s:source.index(")", s)]52remainder = ""53if hasattr(traci.constants, variable):54print("Subscribing to %s.%s." % (dt._name, variable))55sys.stdout.flush()56v = getattr(traci.constants, variable)57if dt._name == "edge":58name = "1si"59elif dt._name == "lane":60name = "1si_0"61elif dt._name in ("junction", "trafficlight"):62name = "A" if "CONSTRAINT" in variable else "0"63else:64name = dt._name + "_0"65param = None66if '"s"' in remainder:67param = {v: "3o_0"}68elif '"d"' in remainder:69param = {v: 0.}70elif '"i"' in remainder:71param = {v: 0}72elif '"b"' in remainder:73param = {v: ("b", 1)}74elif '"B"' in remainder:75param = {v: ("B", 0)}76elif '"tds"' in remainder: # effort or traveltime for vehicle77param = {v: ("tds", 2, 0., "1si")}78elif '"tdddds"' in remainder: # follow speed for vehicle79param = {v: ("tdddds", 5, 0., 0., 0., 0., "1si")}80elif '"tddds"' in remainder: # secure gap for vehicle81param = {v: ("tddds", 4, 0., 0., 0., "1si")}82elif '"tdd"' in remainder: # stop speed for vehicle83param = {v: ("tdd", 2, 0., 0.)}84elif '"tisb"' in remainder: # stop parameter for vehicle85param = {v: ("tisb", 3, 0, "edge", False)}86elif '"tru"' in remainder:87param = {v: ("tru", 2, ("1si", 0., 0), traci.constants.REQUEST_DRIVINGDIST)}88elif '"tou"' in remainder:89param = {v: ("tou", 2, (400., 495.), traci.constants.REQUEST_DRIVINGDIST)}90try:91if dt._name == "simulation":92traci.simulation.subscribe([v], parameters=param)93else:94getattr(traci, dt._name).subscribe(name, [v], parameters=param)95except traci.TraCIException as ex:96print(ex)97print("Expecting failure for %s.ADD subscription." % dt._name)98sys.stdout.flush()99try:100if dt._name == "simulation":101traci.simulation.subscribe([traci.constants.ADD])102else:103getattr(traci, dt._name).subscribe(name, [traci.constants.ADD])104except traci.TraCIException as ex:105print(ex)106traci.simulationStep()107for dt in pure_traci.DOMAINS:108pprint(getattr(traci, dt._name).getAllSubscriptionResults())109finally:110traci.close()111112113