Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tests/complex/fmi/runner.py
169678 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2008-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file runner.py
15
# @author Daniel Krajzewicz
16
# @author Michael Behrisch
17
# @author Matthias Schwamborn
18
# @date 2012-10-19
19
20
from __future__ import absolute_import
21
from __future__ import print_function
22
23
import os
24
import shutil
25
import sys
26
27
from fmpy import read_model_description, extract
28
from fmpy.fmi2 import FMU2Slave
29
from fmpy.validation import validate_fmu
30
31
# Verbosity switch. NOTE(review): no code visible in this file reads it --
# presumably kept for debugging; confirm before removing.
VERBOSE = True
32
33
# Absolute path taken from the SUMO_HOME environment variable; the lookup
# raises KeyError when the variable is not set.
sumoHome = os.path.abspath(os.environ['SUMO_HOME'])
34
# Put <SUMO_HOME>/tools on the module search path (presumably so that the
# sumolib import that follows can be resolved).
sys.path.append(os.path.join(sumoHome, "tools"))
35
import sumolib # noqa
36
37
# Presumably the ID of the ego vehicle. NOTE(review): the code visible in this
# file passes the literal "ego" to the FMU instead of using this constant.
egoID = "ego"
38
39
40
def determineFMUFilename():
    """Return the file name of the SUMO FMI2 FMU matching the running platform.

    The name has the form ``sumo-fmi2-<platform><bits>.fmu`` where
    ``<platform>`` is ``win``, ``linux`` or ``darwin`` and ``<bits>`` is
    ``64`` or ``32`` depending on the pointer width of the interpreter.

    Returns:
        The bare file name (no directory component).
    """
    # sys.maxsize exceeds 2**32 only on a 64-bit interpreter
    bits = '64' if sys.maxsize > 2**32 else '32'
    if os.name == 'nt':
        platformTag = 'win'
    elif os.name == 'posix' and sys.platform != 'darwin':
        platformTag = 'linux'
    else:
        # macOS reports os.name == 'posix' as well, so it has to be told
        # apart via sys.platform; any other platform keeps the previous
        # 'darwin' fallback.
        platformTag = 'darwin'
    return 'sumo-fmi2-' + platformTag + bits + '.fmu'
50
51
52
def runSingle(startTime, endTime, validate, scalarVariable):
    """Load the SUMO FMU, step it from startTime to endTime and print results.

    Args:
        startTime: start time handed to the FMU's setupExperiment and used as
            the first communication point.
        endTime: stop time; stepping continues while the current time is
            smaller than this value.
        validate: when true, run fmpy's validate_fmu first and exit with
            status 1 if it reports (or raises) any problem.
        scalarVariable: selects the exercised FMU variable. The values
            "vehicle.getParameterWithKey", "vehicle.moveToXY" and
            "vehicle.getIDCount" trigger extra actions; any other value only
            steps the simulation.

    The FMU instance is freed and the directory it was extracted to is removed
    even when one of the FMU calls raises.
    """
    callOptions = "-n input_net.net.xml -r input_routes.rou.xml"

    fmuLocation = os.path.join(sumoHome, "bin", determineFMUFilename())
    modelDescription = read_model_description(fmuLocation, validate=False)

    if validate:
        try:
            problems = validate_fmu(fmuLocation)
        except Exception as e:
            # a validator that raises is reported like any other problem
            problems = [str(e)]
        if problems:
            print("Problems found during FMU validation:")
            for p in problems:
                print("\t", p)
            sys.exit(1)
        else:
            print("FMU validation successful")

    # map each model variable name to its value reference
    valueRefs = {var.name: var.valueReference for var in modelDescription.modelVariables}

    unzipDir = extract(fmuLocation)
    time = startTime
    try:
        fmu = FMU2Slave(guid=modelDescription.guid,
                        unzipDirectory=unzipDir,
                        modelIdentifier=modelDescription.coSimulation.modelIdentifier,
                        instanceName='instance_01')

        # initialize FMU
        fmu.instantiate()
        try:
            fmu.setupExperiment(startTime=startTime, stopTime=endTime)
            fmu.setString([valueRefs['simulation.libsumo.callOptions']], [callOptions])
            fmu.enterInitializationMode()
            fmu.exitInitializationMode()

            stepSize = modelDescription.defaultExperiment.stepSize

            print("Simulating %s (model name '%s', FMI v%s)..." % (
                modelDescription.coSimulation.modelIdentifier, modelDescription.modelName,
                modelDescription.fmiVersion))

            if scalarVariable == "vehicle.getParameterWithKey":
                fmu.setString([valueRefs['setGetterParameters']], ["ego meaningOfLife"])
                resultList = fmu.getString([valueRefs['vehicle.getParameterWithKey']])
                # the result is split on whitespace into key and value
                parts = resultList[0].decode('UTF-8').split()
                key = parts[0]
                value = parts[1]
                print("key, value: \'%s\', \'%s\'" % (key, value))

            while time < endTime:
                if scalarVariable == "vehicle.moveToXY":
                    # get current laneID of ego vehicle
                    fmu.setString([valueRefs['setGetterParameters']], ["ego"])
                    resultList = fmu.getString([valueRefs['vehicle.getLaneID']])
                    laneID = resultList[0].decode('UTF-8')
                    print("TIME %s: laneID = \'%s\'" % (time, laneID))
                    if laneID:
                        # everything before the last '_' is used as the edge
                        # ID, everything after it as the lane index
                        sep = laneID.rfind('_')
                        edgeID = laneID[:sep]
                        laneIndex = laneID[sep + 1:]
                        resultList = fmu.getString([valueRefs['vehicle.getPosition']])
                        currentPos = [float(x) for x in resultList[0].decode('UTF-8').split()]
                        # target is the current position shifted by 10 in the
                        # first coordinate
                        targetPos = (currentPos[0] + 10, currentPos[1])
                        print("currentPos =", currentPos)
                        print("targetPos =", targetPos)
                        fmu.setString([valueRefs['vehicle.moveToXY']],
                                      [" ".join(["ego", edgeID, laneIndex,
                                                 str(targetPos[0]), str(targetPos[1])])])

                # perform one step
                fmu.doStep(currentCommunicationPoint=time, communicationStepSize=stepSize)
                # advance the time
                time += stepSize

                # NOTE(review): placement inside the stepping loop is inferred
                # from the per-step "TIME" output format -- confirm.
                if scalarVariable == "vehicle.getIDCount":
                    vehIDCount = fmu.getInteger([valueRefs['vehicle.getIDCount']])[0]
                    print("TIME %s: vehicle.getIDCount = %s" % (time, vehIDCount))

            # regular shutdown; only reached when no step raised
            fmu.terminate()
        finally:
            fmu.freeInstance()
    finally:
        # clean up the extracted FMU even after a failure
        shutil.rmtree(unzipDir, ignore_errors=True)

    print("Print ended at step %s" % time)
    sys.stdout.flush()
141
142
143
# Script entry: runner <startTime> <endTime> <validateFMU> <ScalarVariable>
if len(sys.argv) < 5:
    print("Usage: runner <startTime> <endTime> <validateFMU> <ScalarVariable>")
    # flush first: a flush placed after sys.exit() would never execute
    sys.stdout.flush()
    sys.exit("")

# start and end time are parsed as integers, the validation flag via
# sumolib's parseBool; the scalar variable name is passed on unchanged
startTime = int(sys.argv[1])
endTime = int(sys.argv[2])
validate = sumolib.miscutils.parseBool(sys.argv[3])
scalarVariable = sys.argv[4]

runSingle(startTime, endTime, validate, scalarVariable)
154
155