Path: blob/main/tests/complex/simpla/platoon_manager/test.py
169685 views
#!/usr/bin/env python1# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo2# Copyright (C) 2017-2025 German Aerospace Center (DLR) and others.3# This program and the accompanying materials are made available under the4# terms of the Eclipse Public License 2.0 which is available at5# https://www.eclipse.org/legal/epl-2.0/6# This Source Code may also be made available under the following Secondary7# Licenses when the conditions for such availability set forth in the Eclipse8# Public License 2.0 are satisfied: GNU General Public License, version 29# or later which is available at10# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html11# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later1213# @file test.py14# @author Leonhard Luecken15# @date 20171617import unittest as ut18import os19import sys2021# Put tools into PYTHONPATH22if "SUMO_HOME" in os.environ:23sys.path.append(os.path.join(os.environ["SUMO_HOME"], "tools"))2425import traci # noqa26import simpla # noqa27import sumolib # noqa28import simpla._reporting as rp # noqa29import simpla._config as cfg # noqa30from simpla._platoonmode import PlatoonMode # noqa31from functools import reduce # noqa323334class TestPlatoonManager(ut.TestCase):3536def setUp(self):37print("\n")38ut.TestCase.setUp(self)39testDir = os.path.dirname(os.path.realpath(__file__))40# Declare simpla config files41self.CFG0 = 'simpla.cfg.xml'42self.CFG1 = 'simpla_test.cfg.xml'4344self.SIMPLA_CFG = os.path.join(testDir, "simpla.cfg.xml")45self.SIMPLA_CFG_WARN = os.path.join(testDir, "simpla_test_warn.cfg.xml")46self.SIMPLA_CFG_VTYPEMAP = os.path.join(testDir, "simpla_test_vtypemap.cfg.xml")4748# define config contents49self.cfg_body0 =\50"""51<vTypeMapFile value="vtype.map" />52<controlRate value="10." />53<vehicleSelectors value="connected" />54<maxPlatoonGap value="15.0" />55<catchupDist value="50.0" />56<switchImpatienceFactor value="0.1" />57<platoonSplitTime value="3.0" />58<useHeadway value="false" />59<lcMode original="597" leader="597" follower="514" catchup="514" catchupFollower="514" />60<speedFactor original="1.01" leader="1.01" follower="1.11" catchup="1.21" catchupFollower="1.31" >\61</speedFactor>62<verbosity value="2" />63"""6465self.cfg_body1 =\66"""67<vTypeMapFile file="vtype.map"/>68<controlRate value="1000." />69<vehicleSelectors value="connected" />70<maxPlatoonGap value="15.0" />71<catchupDist value="50.0" />72<switchImpatienceFactor value="0.1" />73<platoonSplitTime value="3.0" />74<useHeadway value="false" />75<lcMode original="597" leader="597" follower="514" catchup="514" catchupFollower="514" />76<speedFactor original="1.01" leader="1.01" follower="1.11" catchup="1.21" catchupFollower="1.31" >\77</speedFactor>78<verbosity value="2" />79"""8081self.cfg_body2 =\82"""83<vTypeMap original="unknownVTypeID" leader="leaderVTypeID" follower="followerVTypeID" \84catchup="catchupVTypeID" catchupFollower="catchupFollowerVTypeID" />85<useHeadway value="false" />86"""8788self.cfg_body3 =\89"""90<controlRate value="10." />91<vehicleSelectors value="connected" />92<maxPlatoonGap value="15.0" />93<catchupDist value="50.0" />94<switchImpatienceFactor value="0.1" />95<platoonSplitTime value="3.0" />96<useHeadway value="false" />97<lcMode original="597" leader="597" follower="514" catchup="514" catchupFollower="514" />98<speedFactor original="1.01" leader="1.01" follower="1.11" catchup="1.21" catchupFollower="1.31" >\99</speedFactor>100<verbosity value="2" />101<vTypeMap original="connected" leader="connected_pLeader" follower="connected_pFollower" \102catchup="connected_pCatchup" catchupFollower="connected_pCatchupFollower" />103"""104105self.cfg_body4 =\106"""107<vTypeMap original="connected" leader="connected_pLeader" follower="connected_pFollower"/>108<useHeadway value="false" />109"""110# template still needs to insert definite values for placeholders111self.SUMO_CFG_TEMPLATE = os.path.join(testDir, "sumo.sumocfg")112self.SUMO_CFG = os.path.join(testDir, "test.sumocfg")113self.patchSumoConfig() # write default routes file and net file into config114115def connectToSumo(self, sumo_cfg):116traci.start([sumolib.checkBinary('sumo'), "-c", sumo_cfg])117118def patchSumoConfig(self, routes_fn="input_routes.rou.xml", net_fn="input_net.net.xml",119vtypes_fn="input_types.typ.xml"):120# replace routes_fn in config template121with open(self.SUMO_CFG_TEMPLATE, "r") as cfg_template, open(self.SUMO_CFG, "w") as cfg:122template_str = cfg_template.read()123cfg_str = template_str.format(routes_file=routes_fn, net_file=net_fn, vtypes_file=vtypes_fn)124cfg.write(cfg_str)125126def patchConfigFile(self, cfg_body):127# patch simpla config128with open(self.CFG0, "r") as empty_cfg, open(self.CFG1, "w") as target_cfg:129s = empty_cfg.read()130# print(s.format(body=cfg_body))131target_cfg.write(s.format(body=cfg_body))132133def tearDown(self):134ut.TestCase.tearDown(self)135# stop simpla136simpla.stop()137# close TraCI connection138traci.close()139# reset simpla140rp.initDefaults()141cfg.initDefaults()142if os.path.exists(self.SUMO_CFG):143os.remove(self.SUMO_CFG)144145def test_init(self):146print("Testing platoon manager initialization...")147self.patchSumoConfig()148self.connectToSumo(self.SUMO_CFG)149self.patchConfigFile(self.cfg_body0)150simpla.load(self.CFG1)151self.assertListEqual([r for t, r in rp.WARNING_LOG], [])152153expectedVTypes = ["connected", "connected_pLeader", "connected_pFollower",154"connected_pCatchup", "connected_pCatchupFollower"]155registeredPlatoonVTypes = list(156set(reduce(lambda x, y: x + y, [[orig] + list(mapped.values())157for orig, mapped in cfg.PLATOON_VTYPES.items()])))158expectedVTypes.sort()159registeredPlatoonVTypes.sort()160self.assertListEqual(expectedVTypes, registeredPlatoonVTypes)161162def test_init_vtypemap(self):163print("Testing specification per vtypemap xml-element...")164self.patchSumoConfig()165self.connectToSumo(self.SUMO_CFG)166self.patchConfigFile(self.cfg_body3)167simpla.load(self.CFG1)168# simpla.load(self.SIMPLA_CFG_VTYPEMAP)169self.assertListEqual([r for t, r in rp.WARNING_LOG], [])170171expectedVTypes = ["connected", "connected_pLeader", "connected_pFollower",172"connected_pCatchup", "connected_pCatchupFollower"]173registeredPlatoonVTypes = list(174set(reduce(lambda x, y: x + y, [[orig] + list(mapped.values())175for orig, mapped in cfg.PLATOON_VTYPES.items()])))176expectedVTypes.sort()177registeredPlatoonVTypes.sort()178self.assertListEqual(expectedVTypes, registeredPlatoonVTypes)179180def test_init_vtypemap_fallback(self):181print("Testing specification per vtypemap xml-element with missing modes...")182self.patchSumoConfig()183self.connectToSumo(self.SUMO_CFG)184self.patchConfigFile(self.cfg_body4)185simpla.load(self.CFG1)186187def test_init_warn(self):188print("Testing Warnings...")189self.patchSumoConfig(vtypes_fn="input_types2.typ.xml")190self.connectToSumo(self.SUMO_CFG)191# print (self.cfg_body1)192self.patchConfigFile(self.cfg_body1)193simpla.load(self.CFG1)194# simpla.load(self.SIMPLA_CFG_WARN)195expected_warnings = [196"WARNING: Restricting given control rate (= 1000 per sec.) to 1 per timestep (= 10 per sec.) " +197"(PlatoonManager)",198"WARNING: emergencyDecel of mapped vType 'connected_pCatchupFollower' (10.5m.) does not equal " +199"emergencyDecel of original vType 'connected' (4.5m.) (PlatoonManager)",200"WARNING: emergencyDecel of mapped vType 'connected_pFollower' (1.7m.) does not equal emergencyDecel " +201"of original vType 'connected' (4.5m.) (PlatoonManager)",202"WARNING: emergencyDecel of mapped vType 'connected_pCatchup' (0.5m.) does not equal emergencyDecel of " +203"original vType 'connected' (4.5m.) (PlatoonManager)",204"WARNING: length of mapped vType 'connected_pLeader' (10.0m.) does not equal length of original vType " +205"'connected' (5.0m.)\nThis will probably lead to collisions. (PlatoonManager)",206"WARNING: length of mapped vType 'connected_pCatchupFollower' (3.0m.) does not equal length of original " +207"vType 'connected' (5.0m.)\nThis will probably lead to collisions. (PlatoonManager)",208]209warnings_list = [r for t, r in rp.WARNING_LOG]210# for w in warnings_list:211# print(w)212for w in expected_warnings:213self.assertIn(w, warnings_list)214self.assertListEqual([], list(set(warnings_list).difference(expected_warnings)))215traci.simulationStep(1.)216self.assertEqual(rp.WARNING_LOG[-1][0], "1.0")217self.assertEqual(218rp.WARNING_LOG[-1][1], "WARNING: Step lengths that differ from SUMO's simulation step length are not " +219"supported and probably lead to undesired behavior.\nConsider decreasing simpla's control rate instead. " +220"(PlatoonManager)")221222def test_unknown_vtypes(self):223print("Testing Exceptions for unknown vTypes...")224self.patchSumoConfig(vtypes_fn="input_types2.typ.xml")225self.connectToSumo(self.SUMO_CFG)226# print (self.cfg_body1)227self.patchConfigFile(self.cfg_body2)228try:229simpla.load(self.CFG1)230self.assertTrue(False, "PlatoonManager() should raise an exception in case of unknown vtypes")231except simpla.SimplaException as e:232self.assertEqual(233str(e), "vType 'unknownVTypeID' is unknown to sumo! Note: Platooning vTypes must be defined at " +234"startup.")235236def test_add_and_remove(self):237print("Testing adding and removing connected vehicles...")238self.patchSumoConfig()239self.connectToSumo(self.SUMO_CFG)240241self.patchConfigFile(self.cfg_body0)242simpla.load(self.CFG1)243# simpla.load(self.SIMPLA_CFG)244mgr = simpla._mgr245246# # load simpla without adding a step listener247# simpla._config.load(self.SIMPLA_CFG)248# simpla._mgr = simpla._platoonmanager.PlatoonManager()249250self.assertTupleEqual((), traci.vehicle.getIDList())251252traci.simulationStep()253self.assertTupleEqual(('connected.1',), traci.vehicle.getIDList())254self.assertListEqual(['connected.1'], [vehID for vehID in mgr._connectedVehicles.keys()])255256self.assertEqual(rp.REPORT_LOG[-1][0], "0.1")257self.assertEqual(rp.REPORT_LOG[-1][1], "Adding vehicle 'connected.1' (PlatoonManager)")258259while traci.simulation.getTime() < 2:260traci.simulationStep()261262self.assertListEqual(list(sorted(['connected.1', 'conventional.1'])), list(sorted(traci.vehicle.getIDList())))263self.assertListEqual(['connected.1'], [vehID for vehID in mgr._connectedVehicles.keys()])264265while traci.simulation.getTime() <= 5:266traci.simulationStep()267268self.assertEqual(rp.REPORT_LOG[-1][0], "3.1")269self.assertEqual(rp.REPORT_LOG[-1][1], "Adding vehicle 'IAMconnectedTOO' (PlatoonManager)")270271self.assertTrue(mgr._hasConnectedType("IAMconnectedTOO"))272self.assertListEqual(list(sorted(['connected.1', 'conventional.1', 'IAMconnectedTOO'])), list(273sorted(traci.vehicle.getIDList())))274self.assertListEqual(list(sorted(['connected.1', 'IAMconnectedTOO'])), list(275sorted([vehID for vehID in mgr._connectedVehicles.keys()])))276277while traci.simulation.getTime() <= 14:278traci.simulationStep()279280self.assertEqual(rp.REPORT_LOG[-1][0], "11.4")281self.assertEqual(282rp.REPORT_LOG[-1][1], "Platoon '1' joined Platoon '0', which now contains vehicles:\n['connected.1', " +283"'IAMconnectedTOO'] (PlatoonManager)")284285while traci.simulation.getTime() <= 17:286traci.simulationStep()287288self.assertEqual(rp.REPORT_LOG[-1][0], "16.0")289self.assertEqual(290rp.REPORT_LOG[-1][1], "Platoon '0' splits (ID of new platoon: '2'):\n Platoon '0': ['connected.1']\n" +291" Platoon '2': ['IAMconnectedTOO'] (PlatoonManager)")292293while traci.simulation.getTime() <= 18:294traci.simulationStep()295296self.assertEqual(rp.REPORT_LOG[-1][0], "17.3")297self.assertEqual(rp.REPORT_LOG[-1][1], "Removing arrived vehicle 'connected.1' (PlatoonManager)")298self.assertListEqual(list(sorted(['conventional.1', 'IAMconnectedTOO'])),299list(sorted(traci.vehicle.getIDList())))300self.assertListEqual(['IAMconnectedTOO'], [vehID for vehID in mgr._connectedVehicles.keys()])301302while traci.simulation.getTime() <= 20:303traci.simulationStep()304305self.assertEqual(rp.REPORT_LOG[-1][0], "19.8")306self.assertEqual(rp.REPORT_LOG[-1][1], "Removing arrived vehicle 'IAMconnectedTOO' (PlatoonManager)")307308def test_platoon_formation(self):309print("Testing platoon formation...")310self.patchSumoConfig(net_fn="input_net2.net.xml", routes_fn="input_routes2.rou.xml")311self.connectToSumo(self.SUMO_CFG)312313self.patchConfigFile(self.cfg_body0)314simpla.load(self.CFG1)315# simpla.load(self.SIMPLA_CFG)316mgr = simpla._mgr317318while traci.simulation.getTime() <= 5:319traci.simulationStep()320321self.assertIn("connected.1", mgr._connectedVehicles)322self.assertIn("connected.2", mgr._connectedVehicles)323veh1 = mgr._connectedVehicles["connected.1"]324self.assertEqual(veh1.getCurrentPlatoonMode(), PlatoonMode.NONE)325veh2 = mgr._connectedVehicles["connected.2"]326self.assertEqual(veh2.getCurrentPlatoonMode(), PlatoonMode.CATCHUP)327328while traci.simulation.getTime() <= 20:329traci.simulationStep()330331# self.assertEqual(rp.REPORT_LOG[-1][0], "13.7")332# self.assertEqual(rp.REPORT_LOG[-1][1], "Platoon '1' joined Platoon '0', which now contains " +333# "vehicles:\n['connected.1', 'connected.2'] (PlatoonManager)")334335self.assertEqual(veh1.getCurrentPlatoonMode(), PlatoonMode.LEADER)336self.assertEqual(veh2.getCurrentPlatoonMode(), PlatoonMode.FOLLOWER)337338vehs = mgr.getPlatoonLeaders()[0].getPlatoon().getVehicles()339vehIDs = [v.getID() for v in vehs]340self.assertEqual(len(vehIDs), 2)341self.assertIn("connected.1", vehIDs)342self.assertIn("connected.2", vehIDs)343344while traci.vehicle.getLaneID(vehIDs[0]) == traci.vehicle.getLaneID(vehIDs[1]):345traci.simulationStep()346347self.assertFalse(veh1.state.laneID == veh2.state.laneID)348t0 = traci.simulation.getTime()349self.assertEqual(t0, 71.4)350351expected_split_time = t0 + mgr._DeltaT + cfg.PLATOON_SPLIT_TIME352353while traci.simulation.getTime() <= expected_split_time:354traci.simulationStep()355356self.assertAlmostEqual(veh2._timeUntilSplit, 0.0, 9)357358while traci.simulation.getTime() <= expected_split_time + 1.:359traci.simulationStep()360361self.assertEqual(362rp.REPORT_LOG[-1][1], "Platoon '0' splits (ID of new platoon: '2'):\n Platoon '0': ['connected.1']\n" +363" Platoon '2': ['connected.2'] (PlatoonManager)")364365366# # ignore some tests for faster execution367# ignore_tests = [368# "test_init",369# "test_init_vtypemap",370# # "test_init_warn",371# "test_add_and_remove",372# "test_platoon_formation",373# ]374#375# for t in ignore_tests:376# delattr(TestPlatoonManager,t)377378379if __name__ == "__main__":380ut.main()381382383