Path: blob/main/tests/complex/tutorial/city_mobil/data/vehicleControl.py
169689 views
# -*- coding: utf-8 -*-1# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo2# Copyright (C) 2008-2025 German Aerospace Center (DLR) and others.3# This program and the accompanying materials are made available under the4# terms of the Eclipse Public License 2.0 which is available at5# https://www.eclipse.org/legal/epl-2.0/6# This Source Code may also be made available under the following Secondary7# Licenses when the conditions for such availability set forth in the Eclipse8# Public License 2.0 are satisfied: GNU General Public License, version 29# or later which is available at10# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html11# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later1213# @file vehicleControl.py14# @author Michael Behrisch15# @author Daniel Krajzewicz16# @author Lena Kalleske17# @date 2008-07-211819from __future__ import absolute_import20from __future__ import print_function21import random22import sys23import os24from optparse import OptionParser2526from constants import PREFIX, DOUBLE_ROWS, STOP_POS, SLOTS_PER_ROW, SLOT_LENGTH, BUS_CAPACITY, BREAK_DELAY27from constants import CYBER_CAPACITY28import statistics2930if 'SUMO_HOME' in os.environ:31tools = os.path.join(os.environ['SUMO_HOME'], 'tools')32sys.path.append(tools)33else:34sys.exit("please declare environment variable 'SUMO_HOME'")3536import traci # noqa37import traci.constants as tc # noqa38import sumolib # noqa394041class Manager:4243def personArrived(self, vehicleID, edge, target):44raise NotImplementedError4546def cyberCarArrived(self, vehicleID, edge):47raise NotImplementedError4849def cyberCarBroken(self, vehicleID, edge):50pass5152def setNewTargets(self):53pass545556class Status:5758def __init__(self, edge, pos):59self.edge = edge60self.pos = pos61self.parking = False62self.target = None63self.targetPos = None64self.slot = None65self.delay = None6667def __repr__(self):68return "%s,%s" % (self.edge, self.pos)697071class Setting:72step = 073manager = None74verbose = False75cyber = False76breakstep = None777879setting = Setting()80occupancy = {}81vehicleStatus = {}82persons = {}83waiting = {}848586def init(manager):87optParser = OptionParser()88optParser.add_option("-v", "--verbose", action="store_true",89default=False, help="tell me what you are doing")90optParser.add_option("-g", "--gui", action="store_true",91default=False, help="run with GUI")92optParser.add_option("-c", "--cyber", action="store_true",93default=False, help="use small cybercars instead of big busses")94optParser.add_option("-d", "--demand", type="int",95default=15, help="period with which the persons are emitted")96optParser.add_option("-b", "--break", type="int", dest="breakstep", metavar="TIMESTEP",97help="let a vehicle break for %s seconds at TIMESTEP" % BREAK_DELAY)98optParser.add_option("-t", "--test", action="store_true",99default=False, help="Run in test mode")100options, _ = optParser.parse_args()101sumoExe = sumolib.checkBinary('sumo')102if options.gui:103sumoExe = sumolib.checkBinary('sumo-gui')104sumoConfig = "%s%02i.sumocfg" % (PREFIX, options.demand)105if options.cyber:106sumoConfig = "%s%02i_cyber.sumocfg" % (PREFIX, options.demand)107traci.start([sumoExe, "-c", sumoConfig])108traci.simulation.subscribe()109setting.manager = manager110setting.verbose = options.verbose111setting.cyber = options.cyber112setting.breakstep = options.breakstep113try:114while setting.step < 100 or statistics.personsRunning > 0:115doStep()116statistics.evaluate(options.test)117finally:118traci.close()119120121def getCapacity():122if setting.cyber:123return CYBER_CAPACITY124return BUS_CAPACITY125126127def getStep():128return setting.step129130131def getPosition(vehicleID):132return vehicleStatus[vehicleID].edge133134135def stopAt(vehicleID, edge, pos=None):136if pos is None:137pos = STOP_POS138if edge.endswith("out") or edge.endswith("in"):139pos = 90.140traci.vehicle.changeTarget(vehicleID, edge)141if setting.verbose:142print("stopAt", vehicleID, edge, pos)143# print vehicleStatus[vehicleID]144# print traci.vehicle.getRoute(vehicleID)145traci.vehicle.setStop(vehicleID, edge, pos)146vehicleStatus[vehicleID].target = edge147vehicleStatus[vehicleID].targetPos = pos148149150def leaveStop(vehicleID, newTarget=None, delay=0):151v = vehicleStatus[vehicleID]152if newTarget:153traci.vehicle.changeTarget(vehicleID, newTarget)154traci.vehicle.setStop(vehicleID, v.target, v.targetPos, 0, delay)155v.target = None156v.targetPos = None157v.parking = False158159160def _rerouteCar(vehicleID):161slotEdge = ""162for rowIdx in range(DOUBLE_ROWS):163for idx in range(SLOTS_PER_ROW):164for dir in ["l", "r"]:165slotEdge = "slot%s-%s%s" % (rowIdx, idx, dir)166if slotEdge not in occupancy:167occupancy[slotEdge] = vehicleID168stopAt(vehicleID, slotEdge, SLOT_LENGTH - 5.)169return170171172def _reroutePersons(edge):173if edge in persons:174for person in persons[edge]:175if not vehicleStatus[person].slot:176row = int(edge[4])177targetEdge = "footmain%sto%s" % (row, row + 1)178traci.vehicle.setStop(179person, edge.replace("slot", "-foot"), 1., 0, 0)180stopAt(person, targetEdge)181vehicleStatus[person].parking = False182vehicleStatus[person].slot = edge183184185def _checkInitialPositions(vehicleID, edge, pos):186if vehicleID in vehicleStatus:187vehicleStatus[vehicleID].edge = edge188vehicleStatus[vehicleID].pos = pos189else:190vehicleStatus[vehicleID] = Status(edge, pos)191if edge == "mainin":192_rerouteCar(vehicleID)193elif edge == "cyberin":194stopAt(vehicleID, "cyberin")195elif edge == "footfairin":196stopAt(vehicleID, "footmainout")197elif "foot" in edge:198traci.vehicle.setStop(vehicleID, "-" + edge)199parkEdge = edge.replace("foot", "slot")200if parkEdge not in persons:201persons[parkEdge] = []202persons[parkEdge].append(vehicleID)203vehicleStatus[vehicleID].parking = True204elif edge.startswith("slot"):205stopAt(vehicleID, edge, SLOT_LENGTH - 5.)206occupancy[edge] = vehicleID207_reroutePersons(edge)208209210def doStep():211setting.step += 1212if setting.verbose:213print("step", setting.step)214traci.simulationStep()215moveNodes = []216for veh, subs in traci.vehicle.getAllSubscriptionResults().items():217moveNodes.append((veh, subs[tc.VAR_ROAD_ID], subs[tc.VAR_LANEPOSITION]))218departed = traci.simulation.getSubscriptionResults()[tc.VAR_DEPARTED_VEHICLES_IDS]219for v in departed:220traci.vehicle.subscribe(v)221subs = traci.vehicle.getSubscriptionResults(v)222moveNodes.append((v, subs[tc.VAR_ROAD_ID], subs[tc.VAR_LANEPOSITION]))223for vehicleID, edge, pos in moveNodes:224_checkInitialPositions(vehicleID, edge, pos)225vehicle = vehicleStatus[vehicleID]226if edge == vehicle.target and not vehicle.parking:227if edge.startswith("footmain"):228vehicle.parking = True229target = "footmainout"230if edge == "footmainout":231row = random.randrange(0, DOUBLE_ROWS)232target = "footmain%sto%s" % (row, row + 1)233statistics.personArrived(vehicleID, edge, target, setting.step)234setting.manager.personArrived(vehicleID, edge, target)235if edge.startswith("cyber"):236if vehicle.delay:237vehicle.delay -= 1238elif setting.breakstep and setting.step >= setting.breakstep and edge != "cyberin":239if setting.verbose:240print("broken", vehicleID, edge)241setting.breakstep = None242setting.manager.cyberCarBroken(vehicleID, edge)243vehicle.delay = BREAK_DELAY244else:245if setting.verbose:246print("arrived", vehicleID, edge)247vehicle.parking = True248setting.manager.cyberCarArrived(vehicleID, edge)249setting.manager.setNewTargets()250251252