Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tests/complex/tutorial/city_mobil/data/vehicleControl.py
169689 views
1
# -*- coding: utf-8 -*-
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2008-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file vehicleControl.py
15
# @author Michael Behrisch
16
# @author Daniel Krajzewicz
17
# @author Lena Kalleske
18
# @date 2008-07-21
19
20
from __future__ import absolute_import
21
from __future__ import print_function
22
import random
23
import sys
24
import os
25
from optparse import OptionParser
26
27
from constants import PREFIX, DOUBLE_ROWS, STOP_POS, SLOTS_PER_ROW, SLOT_LENGTH, BUS_CAPACITY, BREAK_DELAY
28
from constants import CYBER_CAPACITY
29
import statistics
30
31
if 'SUMO_HOME' in os.environ:
32
tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
33
sys.path.append(tools)
34
else:
35
sys.exit("please declare environment variable 'SUMO_HOME'")
36
37
import traci # noqa
38
import traci.constants as tc # noqa
39
import sumolib # noqa
40
41
42
class Manager:
43
44
def personArrived(self, vehicleID, edge, target):
45
raise NotImplementedError
46
47
def cyberCarArrived(self, vehicleID, edge):
48
raise NotImplementedError
49
50
def cyberCarBroken(self, vehicleID, edge):
51
pass
52
53
def setNewTargets(self):
54
pass
55
56
57
class Status:
58
59
def __init__(self, edge, pos):
60
self.edge = edge
61
self.pos = pos
62
self.parking = False
63
self.target = None
64
self.targetPos = None
65
self.slot = None
66
self.delay = None
67
68
def __repr__(self):
69
return "%s,%s" % (self.edge, self.pos)
70
71
72
class Setting:
73
step = 0
74
manager = None
75
verbose = False
76
cyber = False
77
breakstep = None
78
79
80
setting = Setting()
81
occupancy = {}
82
vehicleStatus = {}
83
persons = {}
84
waiting = {}
85
86
87
def init(manager):
88
optParser = OptionParser()
89
optParser.add_option("-v", "--verbose", action="store_true",
90
default=False, help="tell me what you are doing")
91
optParser.add_option("-g", "--gui", action="store_true",
92
default=False, help="run with GUI")
93
optParser.add_option("-c", "--cyber", action="store_true",
94
default=False, help="use small cybercars instead of big busses")
95
optParser.add_option("-d", "--demand", type="int",
96
default=15, help="period with which the persons are emitted")
97
optParser.add_option("-b", "--break", type="int", dest="breakstep", metavar="TIMESTEP",
98
help="let a vehicle break for %s seconds at TIMESTEP" % BREAK_DELAY)
99
optParser.add_option("-t", "--test", action="store_true",
100
default=False, help="Run in test mode")
101
options, _ = optParser.parse_args()
102
sumoExe = sumolib.checkBinary('sumo')
103
if options.gui:
104
sumoExe = sumolib.checkBinary('sumo-gui')
105
sumoConfig = "%s%02i.sumocfg" % (PREFIX, options.demand)
106
if options.cyber:
107
sumoConfig = "%s%02i_cyber.sumocfg" % (PREFIX, options.demand)
108
traci.start([sumoExe, "-c", sumoConfig])
109
traci.simulation.subscribe()
110
setting.manager = manager
111
setting.verbose = options.verbose
112
setting.cyber = options.cyber
113
setting.breakstep = options.breakstep
114
try:
115
while setting.step < 100 or statistics.personsRunning > 0:
116
doStep()
117
statistics.evaluate(options.test)
118
finally:
119
traci.close()
120
121
122
def getCapacity():
123
if setting.cyber:
124
return CYBER_CAPACITY
125
return BUS_CAPACITY
126
127
128
def getStep():
129
return setting.step
130
131
132
def getPosition(vehicleID):
133
return vehicleStatus[vehicleID].edge
134
135
136
def stopAt(vehicleID, edge, pos=None):
137
if pos is None:
138
pos = STOP_POS
139
if edge.endswith("out") or edge.endswith("in"):
140
pos = 90.
141
traci.vehicle.changeTarget(vehicleID, edge)
142
if setting.verbose:
143
print("stopAt", vehicleID, edge, pos)
144
# print vehicleStatus[vehicleID]
145
# print traci.vehicle.getRoute(vehicleID)
146
traci.vehicle.setStop(vehicleID, edge, pos)
147
vehicleStatus[vehicleID].target = edge
148
vehicleStatus[vehicleID].targetPos = pos
149
150
151
def leaveStop(vehicleID, newTarget=None, delay=0):
152
v = vehicleStatus[vehicleID]
153
if newTarget:
154
traci.vehicle.changeTarget(vehicleID, newTarget)
155
traci.vehicle.setStop(vehicleID, v.target, v.targetPos, 0, delay)
156
v.target = None
157
v.targetPos = None
158
v.parking = False
159
160
161
def _rerouteCar(vehicleID):
162
slotEdge = ""
163
for rowIdx in range(DOUBLE_ROWS):
164
for idx in range(SLOTS_PER_ROW):
165
for dir in ["l", "r"]:
166
slotEdge = "slot%s-%s%s" % (rowIdx, idx, dir)
167
if slotEdge not in occupancy:
168
occupancy[slotEdge] = vehicleID
169
stopAt(vehicleID, slotEdge, SLOT_LENGTH - 5.)
170
return
171
172
173
def _reroutePersons(edge):
174
if edge in persons:
175
for person in persons[edge]:
176
if not vehicleStatus[person].slot:
177
row = int(edge[4])
178
targetEdge = "footmain%sto%s" % (row, row + 1)
179
traci.vehicle.setStop(
180
person, edge.replace("slot", "-foot"), 1., 0, 0)
181
stopAt(person, targetEdge)
182
vehicleStatus[person].parking = False
183
vehicleStatus[person].slot = edge
184
185
186
def _checkInitialPositions(vehicleID, edge, pos):
187
if vehicleID in vehicleStatus:
188
vehicleStatus[vehicleID].edge = edge
189
vehicleStatus[vehicleID].pos = pos
190
else:
191
vehicleStatus[vehicleID] = Status(edge, pos)
192
if edge == "mainin":
193
_rerouteCar(vehicleID)
194
elif edge == "cyberin":
195
stopAt(vehicleID, "cyberin")
196
elif edge == "footfairin":
197
stopAt(vehicleID, "footmainout")
198
elif "foot" in edge:
199
traci.vehicle.setStop(vehicleID, "-" + edge)
200
parkEdge = edge.replace("foot", "slot")
201
if parkEdge not in persons:
202
persons[parkEdge] = []
203
persons[parkEdge].append(vehicleID)
204
vehicleStatus[vehicleID].parking = True
205
elif edge.startswith("slot"):
206
stopAt(vehicleID, edge, SLOT_LENGTH - 5.)
207
occupancy[edge] = vehicleID
208
_reroutePersons(edge)
209
210
211
def doStep():
212
setting.step += 1
213
if setting.verbose:
214
print("step", setting.step)
215
traci.simulationStep()
216
moveNodes = []
217
for veh, subs in traci.vehicle.getAllSubscriptionResults().items():
218
moveNodes.append((veh, subs[tc.VAR_ROAD_ID], subs[tc.VAR_LANEPOSITION]))
219
departed = traci.simulation.getSubscriptionResults()[tc.VAR_DEPARTED_VEHICLES_IDS]
220
for v in departed:
221
traci.vehicle.subscribe(v)
222
subs = traci.vehicle.getSubscriptionResults(v)
223
moveNodes.append((v, subs[tc.VAR_ROAD_ID], subs[tc.VAR_LANEPOSITION]))
224
for vehicleID, edge, pos in moveNodes:
225
_checkInitialPositions(vehicleID, edge, pos)
226
vehicle = vehicleStatus[vehicleID]
227
if edge == vehicle.target and not vehicle.parking:
228
if edge.startswith("footmain"):
229
vehicle.parking = True
230
target = "footmainout"
231
if edge == "footmainout":
232
row = random.randrange(0, DOUBLE_ROWS)
233
target = "footmain%sto%s" % (row, row + 1)
234
statistics.personArrived(vehicleID, edge, target, setting.step)
235
setting.manager.personArrived(vehicleID, edge, target)
236
if edge.startswith("cyber"):
237
if vehicle.delay:
238
vehicle.delay -= 1
239
elif setting.breakstep and setting.step >= setting.breakstep and edge != "cyberin":
240
if setting.verbose:
241
print("broken", vehicleID, edge)
242
setting.breakstep = None
243
setting.manager.cyberCarBroken(vehicleID, edge)
244
vehicle.delay = BREAK_DELAY
245
else:
246
if setting.verbose:
247
print("arrived", vehicleID, edge)
248
vehicle.parking = True
249
setting.manager.cyberCarArrived(vehicleID, edge)
250
setting.manager.setNewTargets()
251
252