Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tests/complex/traci/connection/multipleConnections/concurringSubscriptions/runner.py
169771 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2008-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file runner.py
15
# @author Daniel Krajzewicz
16
# @author Michael Behrisch
17
# @author Leonhard Luecken
18
# @date 2010-02-20
19
20
from __future__ import absolute_import
21
from __future__ import print_function
22
23
import os
24
import subprocess
25
import sys
26
import time
27
import math
28
import multiprocessing
29
30
if "SUMO_HOME" in os.environ:
31
sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
32
import sumolib # noqa
33
import traci # noqa
34
import traci.constants as tc # noqa
35
36
PORT = sumolib.miscutils.getFreeSocketPort()
37
DELTA_T = 1000
38
sumoBinary = sumolib.checkBinary(sys.argv[1])
39
40
41
def _sortedVehicleResults():
    """Return all vehicle subscription results as a sorted list.

    Each entry is ``(key, sorted(values.items()))`` built from
    ``traci.vehicle.getAllSubscriptionResults()``; the sorting presumably keeps
    the printed output independent of dictionary ordering.
    """
    allResults = traci.vehicle.getAllSubscriptionResults()
    return sorted((objID, sorted(values.items())) for objID, values in allResults.items())


def traciLoop(port, traciEndTime, i, runNr, steplength=0):
    """Run one TraCI client and print its subscription results for every step.

    The client waits ``0.25 * i`` seconds, connects to *port* and passes its
    order value to ``traci.setOrder``. It then advances the simulation step by
    step until ``ceil(traciEndTime / steplength)`` steps are done, printing the
    vehicle and simulation subscription results retrieved before each step.
    As soon as the vehicle ID list is non-empty, the first vehicle is
    subscribed to: client 1 subscribes to its speed, every other client to its
    acceleration.

    Args:
        port: port of the SUMO server to connect to.
        traciEndTime: simulation time up to which this client keeps stepping.
        i: 1-based client number; also scales the start-up and shutdown delays.
        runNr: number of the current run; even runs use ``i`` as order value,
            odd runs use ``10 - i``.
        steplength: time advanced per step; 0 selects ``DELTA_T / 1000.``.
    """
    stagger = 0.25 * i
    time.sleep(stagger)  # assure ordering of outputs
    if steplength == 0:
        steplength = DELTA_T / 1000.
    # the order value depends on the run number so that it alternates between runs
    order = 10 - i if runNr % 2 != 0 else i
    sys.stdout.flush()
    traci.init(port)
    traci.setOrder(order)
    # NOTE(review): this start-up message is built but never printed; the loop
    # below assembles its own output.
    message = ("Starting process %s (order: %s) with steplength %s\n" % (i, order, steplength))
    vehID = ""
    lastStep = math.ceil(traciEndTime / steplength)
    vehResults = _sortedVehicleResults()
    simResults = traci.simulation.getSubscriptionResults()
    for step in range(1, lastStep + 1):
        report = [
            "Process %s:\n" % i,
            " %s vehicle subscription results: %s\n" % (i, vehResults),
            " %s simulation subscription results: %s\n" % (i, simResults),
        ]
        if vehID != "":
            # NOTE(review): vehs is only refreshed while no vehicle is tracked, so
            # here it still holds the non-empty list from the step the vehicle was
            # picked - confirm whether a fresh getIDList() call was intended.
            if len(vehs) == 0:
                report.append(" %s breaking execution: traced vehicle '%s' left." % (i, vehID))
                print("".join(report))
                sys.stdout.flush()
                break
        else:
            vehs = traci.vehicle.getIDList()
            if vehs:
                vehID = vehs[0]
                if i == 1:
                    varID = tc.VAR_SPEED
                    announcement = " %s subscribing to speed (ID = %s) of vehicle '%s'\n"
                else:
                    varID = tc.VAR_ACCEL
                    announcement = " %s subscribing to acceleration (ID = %s) of vehicle '%s'\n"
                report.append(announcement % (i, varID, vehID))
                traci.vehicle.subscribe(vehID, [varID])
                report.append(" -> %s\n" % str(traci.vehicle.getAllSubscriptionResults()))
                sys.stdout.flush()
        report.append(" %s stepping (step %s)..." % (i, step))
        print("".join(report))
        sys.stdout.flush()
        time.sleep(0.01)  # give message time to be printed
        traci.simulationStep(step * steplength)
        # fetch the results of the step just performed; they are printed in the next iteration
        simResults = traci.simulation.getSubscriptionResults()
        vehResults = _sortedVehicleResults()
    endTime = traci.simulation.getTime()
    traci.close()
    time.sleep(stagger)  # assure ordering of outputs
    print("Process %s (order %s) ended at step %s" % (i, order, endTime))
    sys.stdout.flush()
95
96
97
def runSingle(sumoEndTime, traciEndTime, numClients, runNr):
98
def runSingle(sumoEndTime, traciEndTime, numClients, runNr):
    """Start one SUMO server plus *numClients* TraCI client processes and wait for all of them.

    Args:
        sumoEndTime: currently unused; kept for interface compatibility.
        traciEndTime: simulation time up to which every client keeps stepping.
        numClients: number of client processes; also passed to SUMO as --num-clients.
        runNr: number of the current run, forwarded to traciLoop, which
            alternates the client order between even and odd runs.
    """
    # Pass the command as an argument list without a shell, so that a binary
    # path containing spaces or shell metacharacters is handed over unchanged.
    sumoCommand = [
        sumoBinary, "-v",
        "--num-clients", str(numClients),
        "-c", "sumo.sumocfg",
        "-S", "-Q",
        "--remote-port", str(PORT),
    ]
    sumoProcess = subprocess.Popen(sumoCommand, stdout=sys.stdout)
    # Clients are numbered starting at 1.
    procs = [multiprocessing.Process(target=traciLoop, args=(PORT, traciEndTime, (i + 1), runNr))
             for i in range(numClients)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    sumoProcess.wait()
    sys.stdout.flush()
109
110
111
if __name__ == '__main__':
    # Select the 'spawn' start method for the client processes.
    multiprocessing.set_start_method('spawn')
    numClients = 2
    runNr = 2
    print(" Testing multiclient subscriptions...")
    # Perform the runs one after another; the run index is handed to runSingle as runNr.
    for runIndex in range(runNr):
        print("\n###### Run %s ######" % runIndex)
        sys.stdout.flush()
        runSingle(sumoEndTime=50, traciEndTime=120, numClients=numClients, runNr=runIndex)
120
121