Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tests/tools/turndefs/generateTurnDefs/unit_tests/unittests.py
428465 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2018-2026 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file unittests.py
15
# @author Michael Behrisch
16
# @date 2019-01-09
17
18
from __future__ import absolute_import
19
from __future__ import print_function
20
21
import sys
22
import os
23
import logging
24
import unittest
25
26
# Do not use SUMO_HOME here to ensure you are always testing the
27
# functions from the same tree the test is in
28
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', '..', '..', '..', 'tools', 'turn-defs'))
29
import connections # noqa
30
import turndefinitions # noqa
31
32
33
class CollectingHandler(logging.Handler):
    """Logging handler that stores every record it receives.

    The received ``logging.LogRecord`` objects are appended, in arrival
    order, to ``self.log_records`` so that tests can inspect the messages.
    Nothing is written anywhere; other handlers attached to the same logger
    remain responsible for real output.
    """

    def __init__(self, level=logging.NOTSET):
        """Create the handler.

        :param level: logging level forwarded to ``logging.Handler``
            (``logging.NOTSET`` is 0, the same default as before).
        """
        self.log_records = []
        logging.Handler.__init__(self, level)

    def emit(self, record):
        """Store *record* instead of writing it out.

        ``emit`` is the extension point of ``logging.Handler``: the inherited
        ``handle`` applies the handler's filters and holds its I/O lock
        around this call, so collection honours both.
        """
        self.log_records.append(record)
52
53
54
class TurnDefinitionsTestCase(unittest.TestCase):
    """Tests for ``turndefinitions.TurnDefinitions``.

    Covers adding turns, listing sources and destinations, reading back a
    turning probability, and the warning logged when a probability exceeds
    100.
    """
    # pylint: disable=C,R,W,E,F

    # Exact warning text the overflow tests look for in the collected log.
    OVERFLOW_MESSAGE = "Turn probability overflow: 101.000000; lowered to 100"

    def setUp(self):
        self.turn_definitions = turndefinitions.TurnDefinitions()

    def tearDown(self):
        self.turn_definitions = None

    def _attach_collecting_handler(self):
        """Attach a CollectingHandler to the logger of the object under test.

        The handler is detached again once the test is over, so handlers do
        not accumulate should the logger outlive the instance (e.g. if it is
        a shared, named logger -- not visible from this file).

        :return: the attached handler; its ``log_records`` list receives the
            records logged during the test.
        """
        handler = CollectingHandler()
        # Keep our own reference: tearDown() clears self.turn_definitions
        # before the cleanup functions run.
        logger = self.turn_definitions.logger
        logger.addHandler(handler)
        self.addCleanup(logger.removeHandler, handler)
        return handler

    def test_new_turn_definitions_is_empty(self):
        self.assertEqual([], self.turn_definitions.get_sources())

    def test_get_sources(self):
        self.turn_definitions.add("source", "destination", 100)
        self.assertEqual(["source"], self.turn_definitions.get_sources())

    def test_get_sources_is_sorted(self):
        self.turn_definitions.add("source 1", "destination", 100)
        self.turn_definitions.add("source 2", "destination", 100)
        self.assertEqual(["source 1", "source 2"],
                         self.turn_definitions.get_sources())

    def test_get_destinations(self):
        self.turn_definitions.add("source", "destination", 100)
        self.assertEqual(["destination"],
                         self.turn_definitions.get_destinations("source"))

    def test_get_destinations_is_sorted(self):
        self.turn_definitions.add("source", "destination 1", 100)
        self.turn_definitions.add("source", "destination 2", 100)
        self.assertEqual(["destination 1", "destination 2"],
                         self.turn_definitions.get_destinations("source"))

    def test_get_turning_probability(self):
        self.turn_definitions.add("source", "destination", 100)
        self.assertEqual(100,
                         self.turn_definitions.get_turning_probability("source",
                                                                       "destination"))

    def test_probability_overflow_raises_warning_on_initial_add(self):
        collecting_handler = self._attach_collecting_handler()

        # A single add above 100 must already trigger the warning.
        self.turn_definitions.add("source", "destination", 101)

        # assertIn reports the collected messages on failure.
        self.assertIn(
            self.OVERFLOW_MESSAGE,
            [record.getMessage()
             for record in collecting_handler.log_records])

    def test_probability_overflow_raises_warning_after_addition(self):
        collecting_handler = self._attach_collecting_handler()

        # 90 + 11 for the same source/destination pair adds up to 101.
        self.turn_definitions.add("source", "destination", 90)
        self.turn_definitions.add("source", "destination", 11)

        self.assertIn(
            self.OVERFLOW_MESSAGE,
            [record.getMessage()
             for record in collecting_handler.log_records])
115
116
117
class CreateTurnDefinitionsTestCase(unittest.TestCase):
    """Tests for ``turndefinitions.from_connections``.

    Each test feeds a ``connections.Connections`` object to
    ``from_connections`` and compares the result, via ``assertEqual``, with
    a ``TurnDefinitions`` object built by hand.
    """
    # pylint: disable=C,R,W,E,F

    def _assert_creation(self, connection_triples, expected_turns):
        """Compare ``from_connections`` output with hand-built expectations.

        :param connection_triples: ``(source, source lane, destination)``
            tuples added, in order, to the input ``Connections``.
        :param expected_turns: ``(source, destination, probability)`` tuples
            added, in order, to the expected ``TurnDefinitions``.
        """
        given = connections.Connections()
        for source, lane, destination in connection_triples:
            given.add(source, lane, destination)

        expected = turndefinitions.TurnDefinitions()
        for source, destination, probability in expected_turns:
            expected.add(source, destination, probability)

        self.assertEqual(expected, turndefinitions.from_connections(given))

    def test_creation_with_0_sources(self):
        expected = turndefinitions.TurnDefinitions()
        created = turndefinitions.from_connections(connections.Connections())
        self.assertEqual(expected, created)

    def test_creation_with_one_source(self):
        self._assert_creation(
            [("source 1", "source lane 1", "destination 1")],
            [("source 1", "destination 1", 100.0)])

    def test_creation_with_two_sources(self):
        self._assert_creation(
            [("source 1", "source lane 1", "destination 1"),
             ("source 2", "source lane 1", "destination 1")],
            [("source 1", "destination 1", 100.0),
             ("source 2", "destination 1", 100.0)])

    def test_creation_two_destinations_from_two_lanes_overlapping(self):
        # Lane 1 leads only to destination 1, lane 2 to both destinations.
        self._assert_creation(
            [("source 1", "source lane 1", "destination 1"),
             ("source 1", "source lane 2", "destination 2"),
             ("source 1", "source lane 2", "destination 1")],
            [("source 1", "destination 1", 100.0 / 2 / 2 + 100.0 / 2),
             ("source 1", "destination 2", 100.0 / 2 / 2)])

    def test_creation_one_destination_from_two_lanes(self):
        self._assert_creation(
            [("source 1", "source lane 1", "destination 1"),
             ("source 1", "source lane 2", "destination 1")],
            [("source 1", "destination 1", 100.0)])

    def test_creation_with_one_destination_from_one_lane(self):
        self._assert_creation(
            [("source 1", "source lane 1", "destination 1")],
            [("source 1", "destination 1", 100.0)])

    def test_creation_with_two_destinations_from_one_lane(self):
        self._assert_creation(
            [("source 1", "source lane 1", "destination 1"),
             ("source 1", "source lane 1", "destination 2")],
            [("source 1", "destination 1", 100.0 / 2),
             ("source 1", "destination 2", 100.0 / 2)])

    def test_creation_with_three_destinations_from_one_lane(self):
        self._assert_creation(
            [("source 1", "source lane 1", "destination 1"),
             ("source 1", "source lane 1", "destination 2"),
             ("source 1", "source lane 1", "destination 3")],
            [("source 1", "destination 1", 100.0 / 3),
             ("source 1", "destination 2", 100.0 / 3),
             ("source 1", "destination 3", 100.0 / 3)])
206
207
208
class UniformDestinationWeightCalculatorTestCase(unittest.TestCase):
    """Tests for ``connections.UniformDestinationWeightCalculator``.

    The arguments of ``calculate_weight`` are presumably (lane index,
    number of lanes, number of destinations) -- confirm against the
    ``connections`` module.
    """
    # pylint: disable=C,W,R,E,F

    def setUp(self):
        self.calculator = connections.UniformDestinationWeightCalculator()

    def tearDown(self):
        self.calculator = None

    def test_uniform_weight_spread_across_lanes(self):
        # 1, 2 and 3 lane cases with a single destination: every lane index
        # must get an equal share of 100.
        for lanes_no in (1, 2, 3):
            share = 100.0 / lanes_no
            for lane in range(lanes_no):
                self.assertAlmostEqual(
                    share, self.calculator.calculate_weight(lane, lanes_no, 1))

    def test_uniform_weight_spread_across_destinations(self):
        # 1, 2 and 3 destinations, always the first of one lane: the weight
        # must be 100 divided by the destination count.
        for destinations_no in (1, 2, 3):
            self.assertAlmostEqual(
                100.0 / destinations_no,
                self.calculator.calculate_weight(0, 1, destinations_no))
246
247
248
class ConnectionsTestCase(unittest.TestCase):
    """Tests for ``connections.Connections``.

    Checks that sources, lanes and destinations passed to ``add`` can be
    read back, and that adding the same connection twice logs a warning.
    """
    # pylint: disable=C,W,R,E,F

    def setUp(self):
        self.connections = connections.Connections()

    def tearDown(self):
        self.connections = None

    # The helpers below use assertIn rather than assertTrue(x in y) so that
    # a failure shows both the missing item and the container contents.

    def assert_contains_source(self, source):
        """Fail unless *source* is among the known sources."""
        self.assertIn(source, self.connections.get_sources())

    def assert_contains_lane(self, source, lane):
        """Fail unless *lane* is among the lanes of *source*."""
        self.assertIn(lane, self.connections.get_lanes(source))

    def assert_contains_destination(self, source, lane, destination):
        """Fail unless *destination* is reachable from *lane* of *source*."""
        self.assertIn(destination,
                      self.connections.get_destinations(source, lane))

    def assert_sources_no(self, sources_no):
        """Fail unless exactly *sources_no* sources are known."""
        self.assertEqual(sources_no,
                         len(self.connections.get_sources()))

    def assert_lanes_no(self, source, lanes_no):
        """Fail unless *source* has exactly *lanes_no* lanes."""
        self.assertEqual(lanes_no,
                         len(self.connections.get_lanes(source)))

    def assert_destinations_no(self, source, lane, destinations_no):
        """Fail unless *lane* of *source* has *destinations_no* destinations."""
        self.assertEqual(destinations_no,
                         len(self.connections.get_destinations(source, lane)))

    def test_map_empty_on_init(self):
        self.assert_sources_no(0)

    def test_add(self):
        self.connections.add("source", "source lane", "destination")

        self.assert_sources_no(1)
        self.assert_contains_source("source")

        self.assert_lanes_no("source", 1)
        self.assert_contains_lane("source", "source lane")

        self.assert_destinations_no("source", "source lane", 1)
        self.assert_contains_destination(
            "source", "source lane", "destination")

    def test_add_different_lanes(self):
        self.connections.add("source", "source lane 1", "destination")
        self.connections.add("source", "source lane 2", "destination")

        self.assert_sources_no(1)
        self.assert_contains_source("source")

        self.assert_lanes_no("source", 2)
        self.assert_contains_lane("source", "source lane 1")
        self.assert_contains_lane("source", "source lane 2")

        self.assert_destinations_no("source", "source lane 1", 1)
        self.assert_contains_destination(
            "source", "source lane 1", "destination")

        self.assert_destinations_no("source", "source lane 2", 1)
        self.assert_contains_destination(
            "source", "source lane 2", "destination")

    def test_add_different_destinations(self):
        self.connections.add("source", "source lane", "destination 1")
        self.connections.add("source", "source lane", "destination 2")

        self.assert_sources_no(1)
        self.assert_contains_source("source")

        self.assert_lanes_no("source", 1)
        self.assert_contains_lane("source", "source lane")

        self.assert_destinations_no("source", "source lane", 2)
        self.assert_contains_destination(
            "source", "source lane", "destination 1")
        self.assert_contains_destination(
            "source", "source lane", "destination 2")

    def test_readd_raises_warning(self):
        collecting_handler = CollectingHandler()
        # Keep our own reference: tearDown() clears self.connections before
        # the cleanup functions run.
        logger = self.connections.logger
        logger.addHandler(collecting_handler)
        # Detach the handler afterwards so it cannot linger on the logger
        # should that logger outlive this Connections instance.
        self.addCleanup(logger.removeHandler, collecting_handler)

        # Adding the very same connection twice must keep a single entry.
        self.connections.add("source", "source lane", "destination")
        self.connections.add("source", "source lane", "destination")

        self.assert_sources_no(1)
        self.assert_contains_source("source")

        self.assert_lanes_no("source", 1)
        self.assert_contains_lane("source", "source lane")

        self.assert_destinations_no("source", "source lane", 1)
        self.assert_contains_destination("source", "source lane", "destination")

        self.assertIn(
            "Destination for source (lane source lane) readded: destination",
            [record.getMessage() for record in collecting_handler.log_records])
351
352
353
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run the test cases
    # defined in this module.
    unittest.main()
355
356