Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/tools/distributeChargingStations.py
169660 views
1
#!/usr/bin/env python
2
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
# Copyright (C) 2010-2025 German Aerospace Center (DLR) and others.
4
# This program and the accompanying materials are made available under the
5
# terms of the Eclipse Public License 2.0 which is available at
6
# https://www.eclipse.org/legal/epl-2.0/
7
# This Source Code may also be made available under the following Secondary
8
# Licenses when the conditions for such availability set forth in the Eclipse
9
# Public License 2.0 are satisfied: GNU General Public License, version 2
10
# or later which is available at
11
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
14
# @file distributeChargingStations.py
15
# @author Mirko Barthauer
16
# @date 2024-04-04
17
18
19
from __future__ import print_function
20
from __future__ import absolute_import
21
import os
22
import sys
23
import random
24
import math
25
26
if 'SUMO_HOME' in os.environ:
27
sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
28
import sumolib # noqa
29
30
31
def getOptions(args=None):
32
argParser = sumolib.options.ArgumentParser(
33
description="Add charging stations on parkings and rebuild parkings if necessary")
34
argParser.add_argument("-n", "--net-file", category="input", dest="netFile", required=True,
35
help="define the net file (mandatory)")
36
argParser.add_argument("-a", "--add-files", category="input", dest="addFiles", required=True,
37
help="define the base parking areas and charging stations")
38
argParser.add_argument("--selection-file", category="input", dest="selectionFile",
39
help="optionally restrict the parking area to the selected net part")
40
argParser.add_argument("-o", "--output-file", category="output", dest="outFile",
41
default="parkingCharging.add.xml",
42
help="define the output filename for charging station definitions "
43
"(and by default parking areas)")
44
argParser.add_argument("--output-parking-file", category="output", dest="outParkingFile",
45
help="define the output filename for the separate parking additional file")
46
argParser.add_argument("--separate-unused-parkings", dest="separateUnusedParkings", action="store_true",
47
default=False,
48
help="Write parkings with charging stations to the main output file "
49
"and other parkings to the separate one for parkings")
50
argParser.add_argument("-p", "--probability", category="processing", type=float, default=1,
51
help="Probability for an edge to receive a charging station")
52
argParser.add_argument("-d", "--density", category="processing", type=float, default=0.1,
53
help="Share of parking spaces on the edge which should get charging points")
54
argParser.add_argument("--power", category="processing", type=float, default=22000,
55
help="Charging power of the charging station")
56
argParser.add_argument("--efficiency", category="processing", type=float, default=0.95,
57
help="Charging efficiency")
58
argParser.add_argument("--min", category="processing", type=int, default=1,
59
help="Minimum number of charging points per parking")
60
argParser.add_argument("--max", category="processing", type=int, default=100,
61
help="Maximum number of charging points per parking")
62
argParser.add_argument("--prefix", category="processing", default="cs",
63
help="prefix for the charging station IDs")
64
argParser.add_argument("--suffix", category="processing", default="_shift",
65
help="suffix for ID of splitted parkingArea")
66
argParser.add_argument("-s", "--seed", category="processing", type=int, default=42, help="random seed")
67
argParser.add_argument("--vclass", category="processing", default="passenger",
68
help="only use edges which permit the given vehicle class")
69
argParser.add_argument("--entire-parkings", dest="entireParkings", action="store_true",
70
default=False,
71
help="If set, parkings are not divided if the number of charging stations "
72
"is smaller than the parking capacity")
73
argParser.add_argument("--include-existing", dest="includeExisting", action="store_true",
74
default=False,
75
help="If set, loaded charging stations from input files will contribute to the density")
76
argParser.add_argument("--skip-equipped-edges", dest="skipEquippedEdges", action="store_true",
77
default=False,
78
help="If set, edges where a charging station already exists are skipped")
79
argParser.add_argument("--only-roadside", dest="onlyRoadside", action="store_true",
80
default=False, help="Only use roadside parking for charging points")
81
argParser.add_argument("--only-parking-lot", dest="onlyParkingLot", action="store_true",
82
default=False, help="Only use parking lots off the road for charging points")
83
argParser.add_argument("-v", "--verbose", category="processing", action="store_true",
84
default=False, help="tell me what you are doing")
85
86
options = argParser.parse_args(args=args)
87
if not options.netFile or not options.addFiles:
88
argParser.print_help()
89
sys.exit(1)
90
if options.onlyRoadside and options.onlyParkingLot:
91
sys.exit("Please use at most one of the parameters --only-parking-lot and --only-roadside.")
92
options.min = max(0, options.min)
93
options.max = max(0, options.max)
94
if options.min > options.max:
95
print("Will swap min and max charging point number due to wrong order (%d > %d)" % (options.min, options.max))
96
options.min, options.max = options.max, options.min
97
options.density = max(0., min(options.density, 1.))
98
if options.separateUnusedParkings and options.outParkingFile is None:
99
options.separateUnusedParkings = False
100
if options.verbose:
101
print("Cannot separate parkings as the output file has not been provided by --output-parking-file).")
102
return options
103
104
105
def addChildToParent(parentEl, childEl, secondChildTags=[]):
106
addedChildEl = parentEl.addChild(childEl.name, {t[0]: t[1] for t in childEl.getAttributes()}, sortAttrs=False)
107
for secondChildEl in childEl.getChildList():
108
if secondChildEl.name in secondChildTags:
109
addedChildEl.addChild(secondChildEl.name, {t[0]: t[1]
110
for t in secondChildEl.getAttributes()}, sortAttrs=False)
111
return addedChildEl
112
113
114
def main(options):
115
random.seed(options.seed)
116
net = sumolib.net.readNet(options.netFile)
117
checkSelection = False
118
if options.selectionFile is not None:
119
net.loadSelection(options.selectionFile)
120
checkSelection = True
121
122
# read data of predefined parkingAreas and charging stations
123
edge2parkingArea = {}
124
edge2chargingPointCount = {}
125
additionals = []
126
for addFile in options.addFiles.split(","):
127
if options.verbose:
128
print("Load additional file %s" % addFile)
129
additionals.extend(list(sumolib.xml.parse(addFile, "additional"))[0].getChildList())
130
totalCapacity = 0
131
paCount = 0
132
existingChargingStations = []
133
unusedParkings = []
134
for node in additionals:
135
if node.name == "chargingStation" and node.hasAttribute("parkingArea"):
136
existingChargingStations.append(node)
137
elif node.name == "parkingArea":
138
if int(node.getAttributeSecure("roadsideCapacity", 0)) > 0 and options.onlyParkingLot:
139
unusedParkings.append(node)
140
continue
141
if node.hasChild("space") and options.onlyRoadside:
142
unusedParkings.append(node)
143
continue
144
edge = net.getLane(node.lane).getEdge()
145
if not edge.allows(options.vclass):
146
continue
147
capacity = sum(determineParkingCapacity(node))
148
if edge not in edge2parkingArea:
149
edge2parkingArea[edge] = []
150
edge2parkingArea[edge].append([node, capacity])
151
paCount += 1
152
totalCapacity += capacity
153
if options.verbose:
154
print("Loaded %d parkings (%d already equipped with charging stations) with a total of %d parking lots." %
155
(paCount, len(existingChargingStations), totalCapacity))
156
print("%d unused parkings due to settings." % len(unusedParkings))
157
158
# count already existing charging points per edge
159
if options.includeExisting:
160
for cs in existingChargingStations:
161
edge = net.getLane(cs.getAttribute("lane")).getEdge()
162
for item in edge2parkingArea[edge]:
163
if item[0].getAttribute("id") == cs.getAttribute("parkingArea"):
164
if edge not in edge2chargingPointCount:
165
edge2chargingPointCount[edge] = 0
166
edge2chargingPointCount[edge] += item[1]
167
item[1] = 0
168
break
169
170
# iterate edges with parkingArea groups and randomly select a charging point count
171
totalChargingPoints = math.floor(totalCapacity * options.probability * options.density)
172
if options.verbose:
173
print("Charging points to distribute using a density of %.2f: %.0f on %.0f parking spaces (%.2f%%)" %
174
(options.density, totalChargingPoints, totalCapacity,
175
totalChargingPoints/totalCapacity*100 if totalCapacity > 0 else 0))
176
csIndex = 0
177
rootParking = None
178
with sumolib.openz(options.outFile, 'w') as outf:
179
rootCharging = sumolib.xml.create_document("additional")
180
rootParking = sumolib.xml.create_document("additional") if options.outParkingFile else rootCharging
181
for unusedParking in unusedParkings:
182
addChildToParent(rootParking, unusedParking)
183
unchangedParkings = []
184
unvisitedEdges = []
185
for edge, parkingAreas in edge2parkingArea.items():
186
if ((checkSelection and not edge.isSelected()) or len(parkingAreas) == 0 or
187
(options.skipEquippedEdges and edge2chargingPointCount.get(edge, 0) > 0)):
188
if len(parkingAreas) > 0:
189
unchangedParkings.extend([pa[0] for pa in parkingAreas])
190
continue
191
unvisitedEdges.append(edge)
192
193
assignBalance = 0
194
while totalChargingPoints > 0 and len(unvisitedEdges) > 0:
195
# select edge
196
randomNumber = random.random()
197
selectedEdge = unvisitedEdges[int(randomNumber * len(unvisitedEdges))]
198
unvisitedEdges.remove(selectedEdge)
199
usedParkingAreas = []
200
parkingAreas = edge2parkingArea[selectedEdge]
201
capacities = [p[1] for p in parkingAreas]
202
parkingSum = sum(capacities)
203
chargingPointDiscount = edge2chargingPointCount.get(selectedEdge, 0) if options.includeExisting else 0
204
wishedChargingPointCount = max(0, math.floor(options.density * parkingSum) - chargingPointDiscount)
205
if parkingSum < options.min:
206
assignBalance -= wishedChargingPointCount
207
continue
208
chargingPointCount = min(parkingSum, min(options.max, max(options.min, wishedChargingPointCount)))
209
if options.verbose:
210
print("Charging points before balancing: cp %d wished %d discount %d" %
211
(chargingPointCount, wishedChargingPointCount, chargingPointDiscount))
212
213
openDelta = wishedChargingPointCount - chargingPointCount
214
if openDelta > 0 and assignBalance < 0:
215
addPoints = max(1, int(min(-assignBalance, parkingSum - chargingPointCount) * randomNumber))
216
chargingPointCount += addPoints
217
assignBalance += addPoints
218
if chargingPointCount < wishedChargingPointCount:
219
assignBalance -= wishedChargingPointCount - chargingPointCount
220
221
if options.verbose:
222
print("\tDistribute %d charging points on edge %s" % (chargingPointCount, str(selectedEdge.getID())))
223
224
# first check if the charging point fits exactly one parkingArea
225
remainingChargingPoints = chargingPointCount
226
227
# optional: do not divide parkings
228
# and just choose the one which matches best the select charging point number
229
if options.entireParkings:
230
closestCapacityIndex = min(range(len(capacities)), key=lambda i: abs(
231
capacities[i]-remainingChargingPoints))
232
addChargingStation(options, rootCharging, rootParking, selectedEdge,
233
parkingAreas[closestCapacityIndex][0],
234
capacities[closestCapacityIndex], "%s%d" % (options.prefix, csIndex))
235
csIndex += 1
236
remainingChargingPoints = 0
237
usedParkingAreas.append(parkingAreas[closestCapacityIndex][0])
238
else:
239
for i in range(len(capacities)):
240
if capacities[i] >= remainingChargingPoints:
241
if options.verbose:
242
print("Add charging station to parking %s with %d spaces." %
243
(parkingAreas[i][0].getAttribute("id"), parkingAreas[i][1]))
244
addChargingStation(options, rootCharging, rootParking, selectedEdge, parkingAreas[i][0],
245
remainingChargingPoints, "%s%d" % (options.prefix, csIndex))
246
csIndex += 1
247
remainingChargingPoints = 0
248
usedParkingAreas.append(parkingAreas[i][0])
249
break
250
# then distribute across the parkingAreas in definition order
251
if remainingChargingPoints > 0:
252
capacities = [p[1] for p in parkingAreas]
253
for i in range(len(capacities)):
254
if parkingAreas[i][0] in usedParkingAreas:
255
continue
256
if options.verbose:
257
print("Add charging station to parking %s with %d spaces." %
258
(parkingAreas[i][0].getAttribute("id"), parkingAreas[i][1]))
259
installChargingPoints = min(remainingChargingPoints, capacities[i])
260
addChargingStation(options, rootCharging, rootParking, selectedEdge, parkingAreas[i][0],
261
installChargingPoints, "%s%d" % (options.prefix, csIndex))
262
csIndex += 1
263
remainingChargingPoints -= installChargingPoints
264
usedParkingAreas.append(parkingAreas[i][0])
265
if remainingChargingPoints == 0:
266
break
267
totalChargingPoints -= chargingPointCount
268
269
# write unchanged parkings
270
for node, _ in parkingAreas:
271
if node not in usedParkingAreas:
272
unchangedParkings.append(node)
273
274
# add more unchanged parkings
275
for edge in unvisitedEdges:
276
if edge in edge2parkingArea:
277
unchangedParkings.extend([pa[0] for pa in edge2parkingArea[edge]])
278
279
for node in unchangedParkings:
280
addChildToParent(rootParking, node, secondChildTags=["param", "space"])
281
282
# write existing charging stations
283
for csNode in existingChargingStations:
284
addChildToParent(rootCharging, csNode)
285
outf.write(rootCharging.toXML())
286
if options.verbose:
287
print("Final charging point balance: %.0f" % totalChargingPoints)
288
if options.outParkingFile:
289
outfParking = open(options.outParkingFile, 'w')
290
outfParking.write(rootParking.toXML())
291
outfParking.close()
292
293
294
def addChargingStation(options, rootCharging, rootParking, edge, parkingArea, chargingPoints, csID):
295
parkingCapacity = determineParkingCapacity(parkingArea)
296
if chargingPoints <= sum(parkingCapacity) and chargingPoints > 0:
297
# downsize parkingArea and create a new one for the remaining parking spaces
298
chargingRoadSide = min(parkingCapacity[0], chargingPoints)
299
chargingOnSpaces = chargingPoints - chargingRoadSide
300
shiftRoadSideCapacity = parkingCapacity[0] - chargingRoadSide
301
shiftSpaces = parkingCapacity[1] - chargingOnSpaces
302
startPos = float(parkingArea.startPos) if parkingArea.startPos is not None else 0
303
endPos = float(parkingArea.endPos) if parkingArea.endPos is not None else edge.getLength()
304
if startPos < 0:
305
startPos += edge.getLength()
306
if endPos < 0:
307
endPos += edge.getLength()
308
posDownSize = (startPos, endPos) if chargingOnSpaces > 0 else (
309
startPos, startPos + (endPos - startPos) * chargingRoadSide / sum(parkingCapacity))
310
assert (posDownSize[0] < posDownSize[1])
311
parkingArea.roadsideCapacity = str(chargingRoadSide)
312
remainingSpaces = []
313
314
if shiftRoadSideCapacity + shiftSpaces > 0:
315
parkingArea.setAttribute("startPos", str(posDownSize[0]))
316
parkingArea.setAttribute("endPos", str(posDownSize[1]))
317
posShift = (posDownSize[0], posDownSize[1]
318
) if chargingRoadSide == parkingCapacity[0] else (posDownSize[1], endPos)
319
spacesToShift = []
320
if shiftSpaces > 0:
321
spacesToShift.extend(parkingArea.getChild("space")[chargingOnSpaces:])
322
remainingSpaces.extend(parkingArea.getChild("space")[:chargingOnSpaces])
323
shiftedPaDict = {t[0]: t[1] for t in parkingArea.getAttributes()}
324
shiftedPaDict["id"] = "%s%s" % (shiftedPaDict["id"], options.suffix)
325
shiftedPaDict["startPos"] = str(posShift[0])
326
shiftedPaDict["endPos"] = str(posShift[1])
327
shiftedPaDict["roadsideCapacity"] = str(shiftRoadSideCapacity)
328
shiftedParkingArea = rootParking.addChild(parkingArea.name, shiftedPaDict, sortAttrs=False)
329
for spaceToShift in spacesToShift:
330
addChildToParent(shiftedParkingArea, spaceToShift)
331
if parkingArea.hasChild("param"):
332
for paramEl in parkingArea.getChild("param"):
333
addChildToParent(shiftedParkingArea, paramEl)
334
elif parkingArea.hasChild("space"):
335
remainingSpaces.extend(parkingArea.getChild("space"))
336
remainingParking = addChildToParent(
337
rootCharging if options.separateUnusedParkings else rootParking, parkingArea, secondChildTags=["param"])
338
if len(remainingSpaces) > 0:
339
for remainingSpace in remainingSpaces:
340
addChildToParent(remainingParking, remainingSpace)
341
rootCharging.addChild("chargingStation", {"id": csID,
342
"lane": parkingArea.lane,
343
"startPos": parkingArea.startPos,
344
"endPos": parkingArea.endPos,
345
"power": str(options.power),
346
"efficiency": str(options.efficiency),
347
"parkingArea": parkingArea.id}, sortAttrs=False)
348
return True
349
return False
350
351
352
def determineParkingCapacity(parkingArea):
353
roadSide = int(parkingArea.getAttributeSecure("roadsideCapacity", 0))
354
spaces = 0
355
if parkingArea.hasChild("space"):
356
spaces += len(parkingArea.getChild("space"))
357
return (roadSide, spaces)
358
359
360
if __name__ == "__main__":
361
if not main(getOptions()):
362
sys.exit(1)
363
364