Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/src/utils/iodevices/ParquetFormatter.h
169678 views
1
/****************************************************************************/
2
// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
// Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
4
// This program and the accompanying materials are made available under the
5
// terms of the Eclipse Public License 2.0 which is available at
6
// https://www.eclipse.org/legal/epl-2.0/
7
// This Source Code may also be made available under the following Secondary
8
// Licenses when the conditions for such availability set forth in the Eclipse
9
// Public License 2.0 are satisfied: GNU General Public License, version 2
10
// or later which is available at
11
// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
/****************************************************************************/
14
/// @file ParquetFormatter.h
15
/// @author Michael Behrisch
16
/// @date 2025-06-17
17
///
18
// Output formatter for Parquet output
19
/****************************************************************************/
20
#pragma once
21
#include <config.h>
22
23
#include <ostream>
24
25
#ifdef _MSC_VER
26
/* Disable warning about unmatched push / pop.
27
TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
28
#pragma warning(suppress: 5032)
29
#pragma warning(push)
30
/* Disable warning about unused parameters */
31
#pragma warning(disable: 4100)
32
/* Disable warning about hidden function arrow::io::Writable::Write */
33
#pragma warning(disable: 4266)
34
/* Disable warning about padded memory layout */
35
#pragma warning(disable: 4324)
36
/* Disable warning about this in initializers */
37
#pragma warning(disable: 4355)
38
/* Disable warning about changed memory layout due to virtual base class */
39
#pragma warning(disable: 4435)
40
/* Disable warning about declaration hiding class member */
41
#pragma warning(disable: 4458)
42
/* Disable warning about implicit conversion of int to bool */
43
#pragma warning(disable: 4800)
44
#endif
45
#include <arrow/api.h>
46
#include <parquet/arrow/writer.h>
47
#ifdef _MSC_VER
48
/* Disable warning about unmatched push / pop.
49
TODO Re-enable this once it has been solved upstream, see https://github.com/apache/arrow/issues/47099 */
50
#pragma warning(suppress: 5031)
51
#pragma warning(pop)
52
#endif
53
54
#include <utils/common/ToString.h>
55
#include "OutputFormatter.h"
56
57
58
// ===========================================================================
59
// class definitions
60
// ===========================================================================
61
/**
62
* @class ParquetFormatter
63
* @brief Output formatter for Parquet output
64
*/
65
class ParquetFormatter : public OutputFormatter {
66
public:
67
/// @brief Constructor
68
// for some motivation on the default batch size see https://stackoverflow.com/questions/76782018/what-is-actually-meant-when-referring-to-parquet-row-group-size
69
ParquetFormatter(const std::string& columnNames, const std::string& compression = "", const int batchSize = 1000000);
70
71
/// @brief Destructor
72
virtual ~ParquetFormatter() { }
73
74
/** @brief Keeps track of an open XML tag by adding a new element to the stack
75
*
76
* @param[in] into The output stream to use (unused)
77
* @param[in] xmlElement Name of element to open (unused)
78
* @return The OutputDevice for further processing
79
*/
80
void openTag(std::ostream& into, const std::string& xmlElement);
81
82
/** @brief Keeps track of an open XML tag by adding a new element to the stack
83
*
84
* @param[in] into The output stream to use (unused)
85
* @param[in] xmlElement Name of element to open (unused)
86
*/
87
void openTag(std::ostream& into, const SumoXMLTag& xmlElement);
88
89
/** @brief Closes the most recently opened tag
90
*
91
* @param[in] into The output stream to use
92
* @return Whether a further element existed in the stack and could be closed
93
* @todo it is not verified that the topmost element was closed
94
*/
95
bool closeTag(std::ostream& into, const std::string& comment = "");
96
97
/** @brief writes a named attribute
98
*
99
* @param[in] attr The attribute (name)
100
* @param[in] val The attribute value
101
* @param[in] isNull The given value is not set
102
*/
103
template <class T>
104
void writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const T& val, const bool isNull = false) {
105
checkAttr(attr);
106
if (!myWroteHeader) {
107
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::utf8()));
108
myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
109
}
110
myValues.push_back(isNull ? nullptr : std::make_shared<arrow::StringScalar>(toString(val)));
111
}
112
113
template <class T>
114
void writeAttr(std::ostream& /* into */, const std::string& attr, const T& val) {
115
assert(!myCheckColumns);
116
if (!myWroteHeader) {
117
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::utf8()));
118
myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
119
}
120
myValues.push_back(std::make_shared<arrow::StringScalar>(toString(val)));
121
}
122
123
void writeTime(std::ostream& into, const SumoXMLAttr attr, const SUMOTime val) {
124
if (!gHumanReadableTime) {
125
if (!myWroteHeader) {
126
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
127
myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
128
}
129
myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
130
return;
131
}
132
writeAttr(into, attr, time2string(val));
133
}
134
135
bool wroteHeader() const {
136
return myWroteHeader;
137
}
138
139
void setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth = 2) {
140
myExpectedAttrs = expected;
141
myMaxDepth = depth;
142
myCheckColumns = expected.any();
143
}
144
145
private:
146
inline const std::string getAttrString(const std::string& attrString) {
147
if (myHeaderFormat == "plain") {
148
return attrString;
149
}
150
if (myHeaderFormat == "auto") {
151
for (const auto& field : mySchema->fields()) {
152
if (field->name() == attrString) {
153
return myCurrentTag + "_" + attrString;
154
}
155
}
156
return attrString;
157
}
158
return myCurrentTag + "_" + attrString;
159
}
160
161
inline void checkAttr(const SumoXMLAttr attr) {
162
if (myCheckColumns && myMaxDepth == (int)myXMLStack.size()) {
163
mySeenAttrs.set(attr);
164
if (!myExpectedAttrs.test(attr)) {
165
throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
166
}
167
}
168
}
169
170
/// @brief the format to use for the column names
171
const std::string myHeaderFormat;
172
173
/// @brief the compression to use
174
parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;
175
176
/// @brief the number of rows to write per batch
177
const int myBatchSize;
178
179
/// @brief the currently read tag (only valid when generating the header)
180
std::string myCurrentTag;
181
182
/// @brief the table schema
183
std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});
184
185
/// @brief the output stream writer
186
std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;
187
188
/// @brief the content array builders for the table
189
std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;
190
191
/// @brief The number of attributes in the currently open XML elements
192
std::vector<int> myXMLStack;
193
194
/// @brief the current attribute / column values
195
std::vector<std::shared_ptr<arrow::Scalar> > myValues;
196
197
/// @brief the maximum depth of the XML hierarchy
198
int myMaxDepth = 0;
199
200
/// @brief whether the schema has been constructed completely
201
bool myWroteHeader = false;
202
203
/// @brief whether the columns should be checked for completeness
204
bool myCheckColumns = false;
205
206
/// @brief the attributes which are expected for a complete row (including null values)
207
SumoXMLAttrMask myExpectedAttrs;
208
209
/// @brief the attributes already seen (including null values)
210
SumoXMLAttrMask mySeenAttrs;
211
};
212
213
214
// ===========================================================================
215
// specialized template implementations
216
// ===========================================================================
217
template <>
218
inline void ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
219
checkAttr(attr);
220
if (attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2) {
221
if (!myWroteHeader) {
222
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
223
myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
224
}
225
myValues.push_back(isNull ? nullptr : std::make_shared<arrow::DoubleScalar>(val));
226
} else {
227
if (!myWroteHeader) {
228
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float32()));
229
myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
230
}
231
myValues.push_back(isNull ? nullptr : std::make_shared<arrow::FloatScalar>((float)val));
232
}
233
}
234
235
template <>
236
inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
237
checkAttr(attr);
238
if (!myWroteHeader) {
239
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::int32()));
240
myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
241
}
242
myValues.push_back(isNull ? nullptr : std::make_shared<arrow::Int32Scalar>(val));
243
}
244
245
template <>
246
inline void ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val) {
247
assert(!myCheckColumns);
248
if (into.precision() > 2) {
249
if (!myWroteHeader) {
250
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float64()));
251
myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
252
}
253
myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
254
} else {
255
if (!myWroteHeader) {
256
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::float32()));
257
myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
258
}
259
myValues.push_back(std::make_shared<arrow::FloatScalar>((float)val));
260
}
261
}
262
263
template <>
264
inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val) {
265
assert(!myCheckColumns);
266
if (!myWroteHeader) {
267
mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::int32()));
268
myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
269
}
270
myValues.push_back(std::make_shared<arrow::Int32Scalar>(val));
271
}
272
273