Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tests/complex/simpla/platoon_manager/test.py
169685 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2017-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file test.py
15
# @author Leonhard Luecken
16
# @date 2017
17
18
import unittest as ut
19
import os
20
import sys
21
22
# Make $SUMO_HOME/tools importable (appended to sys.path) for the imports that follow
if os.environ.get("SUMO_HOME") is not None:
    sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))
25
26
import traci # noqa
27
import simpla # noqa
28
import sumolib # noqa
29
import simpla._reporting as rp # noqa
30
import simpla._config as cfg # noqa
31
from simpla._platoonmode import PlatoonMode # noqa
32
from functools import reduce # noqa
33
34
35
class TestPlatoonManager(ut.TestCase):
    """Tests of simpla's platoon manager that drive a SUMO instance through TraCI.

    Each test writes a SUMO configuration from a template, starts SUMO, writes a
    simpla configuration from one of the ``cfg_body*`` snippets and loads it.
    The assertions then inspect simpla's report/warning logs and the state of
    the platoon manager.
    """

    def setUp(self):
        """Define file names and simpla config snippets and write the default SUMO config."""
        print("\n")
        ut.TestCase.setUp(self)
        testDir = os.path.dirname(os.path.realpath(__file__))
        # simpla config template (read) and the patched config that is written for each test;
        # both are relative names and therefore resolved against the current working directory
        self.CFG0 = 'simpla.cfg.xml'
        self.CFG1 = 'simpla_test.cfg.xml'

        self.SIMPLA_CFG = os.path.join(testDir, "simpla.cfg.xml")
        self.SIMPLA_CFG_WARN = os.path.join(testDir, "simpla_test_warn.cfg.xml")
        self.SIMPLA_CFG_VTYPEMAP = os.path.join(testDir, "simpla_test_vtypemap.cfg.xml")

        # Config bodies that are inserted for the {body} placeholder of the simpla config template.

        # Full parameter set, vType mapping taken from the file "vtype.map"
        self.cfg_body0 = """
<vTypeMapFile value="vtype.map" />
<controlRate value="10." />
<vehicleSelectors value="connected" />
<maxPlatoonGap value="15.0" />
<catchupDist value="50.0" />
<switchImpatienceFactor value="0.1" />
<platoonSplitTime value="3.0" />
<useHeadway value="false" />
<lcMode original="597" leader="597" follower="514" catchup="514" catchupFollower="514" />
<speedFactor original="1.01" leader="1.01" follower="1.11" catchup="1.21" catchupFollower="1.31" >\
</speedFactor>
<verbosity value="2" />
"""

        # Same parameters with a control rate of 1000 (used by test_init_warn).
        # NOTE(review): vTypeMapFile uses the attribute 'file' here but 'value' in cfg_body0 - confirm intent.
        self.cfg_body1 = """
<vTypeMapFile file="vtype.map"/>
<controlRate value="1000." />
<vehicleSelectors value="connected" />
<maxPlatoonGap value="15.0" />
<catchupDist value="50.0" />
<switchImpatienceFactor value="0.1" />
<platoonSplitTime value="3.0" />
<useHeadway value="false" />
<lcMode original="597" leader="597" follower="514" catchup="514" catchupFollower="514" />
<speedFactor original="1.01" leader="1.01" follower="1.11" catchup="1.21" catchupFollower="1.31" >\
</speedFactor>
<verbosity value="2" />
"""

        # Inline vType mapping naming the vType IDs used by test_unknown_vtypes
        self.cfg_body2 = """
<vTypeMap original="unknownVTypeID" leader="leaderVTypeID" follower="followerVTypeID" \
catchup="catchupVTypeID" catchupFollower="catchupFollowerVTypeID" />
<useHeadway value="false" />
"""

        # Full parameter set, vType mapping given inline as vTypeMap element
        self.cfg_body3 = """
<controlRate value="10." />
<vehicleSelectors value="connected" />
<maxPlatoonGap value="15.0" />
<catchupDist value="50.0" />
<switchImpatienceFactor value="0.1" />
<platoonSplitTime value="3.0" />
<useHeadway value="false" />
<lcMode original="597" leader="597" follower="514" catchup="514" catchupFollower="514" />
<speedFactor original="1.01" leader="1.01" follower="1.11" catchup="1.21" catchupFollower="1.31" >\
</speedFactor>
<verbosity value="2" />
<vTypeMap original="connected" leader="connected_pLeader" follower="connected_pFollower" \
catchup="connected_pCatchup" catchupFollower="connected_pCatchupFollower" />
"""

        # Inline vType mapping that only specifies the leader and follower types
        self.cfg_body4 = """
<vTypeMap original="connected" leader="connected_pLeader" follower="connected_pFollower"/>
<useHeadway value="false" />
"""
        # template still needs to insert definite values for placeholders
        self.SUMO_CFG_TEMPLATE = os.path.join(testDir, "sumo.sumocfg")
        self.SUMO_CFG = os.path.join(testDir, "test.sumocfg")
        self.patchSumoConfig()  # write default routes file and net file into config

    def connectToSumo(self, sumo_cfg):
        """Start the 'sumo' binary located by sumolib with the config file ``sumo_cfg`` via TraCI."""
        traci.start([sumolib.checkBinary('sumo'), "-c", sumo_cfg])

    def patchSumoConfig(self, routes_fn="input_routes.rou.xml", net_fn="input_net.net.xml",
                        vtypes_fn="input_types.typ.xml"):
        """Write ``self.SUMO_CFG`` by filling the placeholders of ``self.SUMO_CFG_TEMPLATE``.

        The template's ``routes_file``, ``net_file`` and ``vtypes_file`` placeholders
        are replaced by ``routes_fn``, ``net_fn`` and ``vtypes_fn``.
        """
        with open(self.SUMO_CFG_TEMPLATE, "r") as cfg_template, open(self.SUMO_CFG, "w") as cfg:
            template_str = cfg_template.read()
            cfg_str = template_str.format(routes_file=routes_fn, net_file=net_fn, vtypes_file=vtypes_fn)
            cfg.write(cfg_str)

    def patchConfigFile(self, cfg_body):
        """Write the simpla config ``self.CFG1``: the template ``self.CFG0`` with ``cfg_body`` as its body."""
        with open(self.CFG0, "r") as empty_cfg, open(self.CFG1, "w") as target_cfg:
            s = empty_cfg.read()
            target_cfg.write(s.format(body=cfg_body))

    def tearDown(self):
        """Stop simpla, close the TraCI connection, reset simpla's globals and remove the SUMO config.

        The steps are chained with ``try``/``finally`` so that a failure in an
        earlier step (e.g. ``simpla.stop()`` raising) does not skip the later
        ones, which would leave the connection open and state behind for the
        next test.
        """
        ut.TestCase.tearDown(self)
        try:
            # stop simpla
            simpla.stop()
        finally:
            try:
                # close TraCI connection
                traci.close()
            finally:
                # reset simpla
                rp.initDefaults()
                cfg.initDefaults()
                if os.path.exists(self.SUMO_CFG):
                    os.remove(self.SUMO_CFG)

    @staticmethod
    def _registeredPlatoonVTypes():
        """Return the sorted, duplicate-free vType IDs (keys and mapped values) of cfg.PLATOON_VTYPES.

        An empty mapping yields an empty list, so that the comparing assertion
        reports the difference instead of the collection step raising.
        """
        vTypes = set()
        for orig, mapped in cfg.PLATOON_VTYPES.items():
            vTypes.add(orig)
            vTypes.update(mapped.values())
        return sorted(vTypes)

    def _stepPast(self, time):
        """Perform simulation steps for as long as the simulation time is at most ``time``."""
        while traci.simulation.getTime() <= time:
            traci.simulationStep()

    def _assertLastReport(self, message, time=None):
        """Assert that the last entry of rp.REPORT_LOG holds ``message`` (and ``time``, if given)."""
        lastEntry = rp.REPORT_LOG[-1]
        if time is not None:
            self.assertEqual(lastEntry[0], time)
        self.assertEqual(lastEntry[1], message)

    def test_init(self):
        """Loading cfg_body0 must produce no warnings and register the expected platoon vTypes."""
        print("Testing platoon manager initialization...")
        self.patchSumoConfig()
        self.connectToSumo(self.SUMO_CFG)
        self.patchConfigFile(self.cfg_body0)
        simpla.load(self.CFG1)
        self.assertListEqual([r for t, r in rp.WARNING_LOG], [])

        expectedVTypes = sorted(["connected", "connected_pLeader", "connected_pFollower",
                                 "connected_pCatchup", "connected_pCatchupFollower"])
        self.assertListEqual(expectedVTypes, self._registeredPlatoonVTypes())

    def test_init_vtypemap(self):
        """Loading cfg_body3 (inline vTypeMap) must give the same result as the map file variant."""
        print("Testing specification per vtypemap xml-element...")
        self.patchSumoConfig()
        self.connectToSumo(self.SUMO_CFG)
        self.patchConfigFile(self.cfg_body3)
        simpla.load(self.CFG1)
        self.assertListEqual([r for t, r in rp.WARNING_LOG], [])

        expectedVTypes = sorted(["connected", "connected_pLeader", "connected_pFollower",
                                 "connected_pCatchup", "connected_pCatchupFollower"])
        self.assertListEqual(expectedVTypes, self._registeredPlatoonVTypes())

    def test_init_vtypemap_fallback(self):
        """Loading cfg_body4 (vTypeMap without catchup modes) must not raise."""
        print("Testing specification per vtypemap xml-element with missing modes...")
        self.patchSumoConfig()
        self.connectToSumo(self.SUMO_CFG)
        self.patchConfigFile(self.cfg_body4)
        simpla.load(self.CFG1)

    def test_init_warn(self):
        """Loading cfg_body1 with input_types2.typ.xml must log exactly the expected warnings.

        Afterwards a simulation step to time 1. must append the step length
        warning with time stamp "1.0" to the warning log.
        """
        print("Testing Warnings...")
        self.patchSumoConfig(vtypes_fn="input_types2.typ.xml")
        self.connectToSumo(self.SUMO_CFG)
        self.patchConfigFile(self.cfg_body1)
        simpla.load(self.CFG1)
        expected_warnings = [
            "WARNING: Restricting given control rate (= 1000 per sec.) to 1 per timestep (= 10 per sec.) " +
            "(PlatoonManager)",
            "WARNING: emergencyDecel of mapped vType 'connected_pCatchupFollower' (10.5m.) does not equal " +
            "emergencyDecel of original vType 'connected' (4.5m.) (PlatoonManager)",
            "WARNING: emergencyDecel of mapped vType 'connected_pFollower' (1.7m.) does not equal emergencyDecel " +
            "of original vType 'connected' (4.5m.) (PlatoonManager)",
            "WARNING: emergencyDecel of mapped vType 'connected_pCatchup' (0.5m.) does not equal emergencyDecel of " +
            "original vType 'connected' (4.5m.) (PlatoonManager)",
            "WARNING: length of mapped vType 'connected_pLeader' (10.0m.) does not equal length of original vType " +
            "'connected' (5.0m.)\nThis will probably lead to collisions. (PlatoonManager)",
            "WARNING: length of mapped vType 'connected_pCatchupFollower' (3.0m.) does not equal length of original " +
            "vType 'connected' (5.0m.)\nThis will probably lead to collisions. (PlatoonManager)",
        ]
        warnings_list = [r for t, r in rp.WARNING_LOG]
        # every expected warning was logged ...
        for w in expected_warnings:
            self.assertIn(w, warnings_list)
        # ... and nothing else
        self.assertListEqual([], list(set(warnings_list).difference(expected_warnings)))
        traci.simulationStep(1.)
        self.assertEqual(rp.WARNING_LOG[-1][0], "1.0")
        self.assertEqual(
            rp.WARNING_LOG[-1][1], "WARNING: Step lengths that differ from SUMO's simulation step length are not " +
            "supported and probably lead to undesired behavior.\nConsider decreasing simpla's control rate instead. " +
            "(PlatoonManager)")

    def test_unknown_vtypes(self):
        """Loading cfg_body2 must raise a SimplaException naming the unknown vType."""
        print("Testing Exceptions for unknown vTypes...")
        self.patchSumoConfig(vtypes_fn="input_types2.typ.xml")
        self.connectToSumo(self.SUMO_CFG)
        self.patchConfigFile(self.cfg_body2)
        try:
            simpla.load(self.CFG1)
        except simpla.SimplaException as e:
            self.assertEqual(
                str(e), "vType 'unknownVTypeID' is unknown to sumo! Note: Platooning vTypes must be defined at " +
                "startup.")
        else:
            self.fail("PlatoonManager() should raise an exception in case of unknown vtypes")

    def test_add_and_remove(self):
        """Check the reports and the manager's vehicle registry while vehicles enter, platoon and arrive."""
        print("Testing adding and removing connected vehicles...")
        self.patchSumoConfig()
        self.connectToSumo(self.SUMO_CFG)

        self.patchConfigFile(self.cfg_body0)
        simpla.load(self.CFG1)
        mgr = simpla._mgr

        self.assertTupleEqual((), traci.vehicle.getIDList())

        traci.simulationStep()
        self.assertTupleEqual(('connected.1',), traci.vehicle.getIDList())
        self.assertListEqual(['connected.1'], list(mgr._connectedVehicles.keys()))
        self._assertLastReport("Adding vehicle 'connected.1' (PlatoonManager)", "0.1")

        # strict comparison here: stop as soon as time 2 is reached
        while traci.simulation.getTime() < 2:
            traci.simulationStep()

        self.assertListEqual(sorted(['connected.1', 'conventional.1']), sorted(traci.vehicle.getIDList()))
        self.assertListEqual(['connected.1'], list(mgr._connectedVehicles.keys()))

        self._stepPast(5)
        self._assertLastReport("Adding vehicle 'IAMconnectedTOO' (PlatoonManager)", "3.1")

        self.assertTrue(mgr._hasConnectedType("IAMconnectedTOO"))
        self.assertListEqual(sorted(['connected.1', 'conventional.1', 'IAMconnectedTOO']),
                             sorted(traci.vehicle.getIDList()))
        self.assertListEqual(sorted(['connected.1', 'IAMconnectedTOO']),
                             sorted(mgr._connectedVehicles.keys()))

        self._stepPast(14)
        self._assertLastReport(
            "Platoon '1' joined Platoon '0', which now contains vehicles:\n['connected.1', " +
            "'IAMconnectedTOO'] (PlatoonManager)", "11.4")

        self._stepPast(17)
        self._assertLastReport(
            "Platoon '0' splits (ID of new platoon: '2'):\n Platoon '0': ['connected.1']\n" +
            " Platoon '2': ['IAMconnectedTOO'] (PlatoonManager)", "16.0")

        self._stepPast(18)
        self._assertLastReport("Removing arrived vehicle 'connected.1' (PlatoonManager)", "17.3")
        self.assertListEqual(sorted(['conventional.1', 'IAMconnectedTOO']),
                             sorted(traci.vehicle.getIDList()))
        self.assertListEqual(['IAMconnectedTOO'], list(mgr._connectedVehicles.keys()))

        self._stepPast(20)
        self._assertLastReport("Removing arrived vehicle 'IAMconnectedTOO' (PlatoonManager)", "19.8")

    def test_platoon_formation(self):
        """Check platoon modes during catch-up and platooning, and the split after leaving the common lane."""
        print("Testing platoon formation...")
        self.patchSumoConfig(net_fn="input_net2.net.xml", routes_fn="input_routes2.rou.xml")
        self.connectToSumo(self.SUMO_CFG)

        self.patchConfigFile(self.cfg_body0)
        simpla.load(self.CFG1)
        mgr = simpla._mgr

        self._stepPast(5)

        self.assertIn("connected.1", mgr._connectedVehicles)
        self.assertIn("connected.2", mgr._connectedVehicles)
        veh1 = mgr._connectedVehicles["connected.1"]
        self.assertEqual(veh1.getCurrentPlatoonMode(), PlatoonMode.NONE)
        veh2 = mgr._connectedVehicles["connected.2"]
        self.assertEqual(veh2.getCurrentPlatoonMode(), PlatoonMode.CATCHUP)

        self._stepPast(20)

        # self.assertEqual(rp.REPORT_LOG[-1][0], "13.7")
        # self.assertEqual(rp.REPORT_LOG[-1][1], "Platoon '1' joined Platoon '0', which now contains " +
        #                  "vehicles:\n['connected.1', 'connected.2'] (PlatoonManager)")

        self.assertEqual(veh1.getCurrentPlatoonMode(), PlatoonMode.LEADER)
        self.assertEqual(veh2.getCurrentPlatoonMode(), PlatoonMode.FOLLOWER)

        vehs = mgr.getPlatoonLeaders()[0].getPlatoon().getVehicles()
        vehIDs = [v.getID() for v in vehs]
        self.assertEqual(len(vehIDs), 2)
        self.assertIn("connected.1", vehIDs)
        self.assertIn("connected.2", vehIDs)

        # advance until the two platoon members are no longer on the same lane
        while traci.vehicle.getLaneID(vehIDs[0]) == traci.vehicle.getLaneID(vehIDs[1]):
            traci.simulationStep()

        self.assertNotEqual(veh1.state.laneID, veh2.state.laneID)
        t0 = traci.simulation.getTime()
        self.assertEqual(t0, 71.4)

        expected_split_time = t0 + mgr._DeltaT + cfg.PLATOON_SPLIT_TIME

        self._stepPast(expected_split_time)

        self.assertAlmostEqual(veh2._timeUntilSplit, 0.0, 9)

        self._stepPast(expected_split_time + 1.)

        self._assertLastReport(
            "Platoon '0' splits (ID of new platoon: '2'):\n Platoon '0': ['connected.1']\n" +
            " Platoon '2': ['connected.2'] (PlatoonManager)")
365
366
367
# # ignore some tests for faster execution
368
# ignore_tests = [
369
# "test_init",
370
# "test_init_vtypemap",
371
# # "test_init_warn",
372
# "test_add_and_remove",
373
# "test_platoon_formation",
374
# ]
375
#
376
# for t in ignore_tests:
377
# delattr(TestPlatoonManager,t)
378
379
380
# Run the tests of this module through unittest when the file is executed as a script
if __name__ == "__main__":
    ut.main()
382
383