Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
eclipse
GitHub Repository: eclipse/sumo
Path: blob/main/src/utils/iodevices/ParquetFormatter.cpp
169678 views
1
/****************************************************************************/
2
// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
3
// Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.
4
// This program and the accompanying materials are made available under the
5
// terms of the Eclipse Public License 2.0 which is available at
6
// https://www.eclipse.org/legal/epl-2.0/
7
// This Source Code may also be made available under the following Secondary
8
// Licenses when the conditions for such availability set forth in the Eclipse
9
// Public License 2.0 are satisfied: GNU General Public License, version 2
10
// or later which is available at
11
// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12
// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13
/****************************************************************************/
14
/// @file ParquetFormatter.cpp
15
/// @author Michael Behrisch
16
/// @date 2025-06-17
17
///
18
// An output formatter for Parquet files
19
/****************************************************************************/
20
#include <config.h>
21
22
#ifdef _MSC_VER
23
#pragma warning(push)
24
/* Disable warning about changed memory layout due to virtual base class */
25
#pragma warning(disable: 4435)
26
#endif
27
#include <arrow/io/api.h>
28
#ifdef _MSC_VER
29
#pragma warning(pop)
30
#endif
31
32
#include <utils/common/MsgHandler.h>
33
#include <utils/common/ToString.h>
34
#include "ParquetFormatter.h"
35
36
37
// ===========================================================================
38
// helper class definitions
39
// ===========================================================================
40
class ArrowOStreamWrapper : public arrow::io::OutputStream {
41
public:
42
ArrowOStreamWrapper(std::ostream& out)
43
: myOStream(out), myAmOpen(true) {}
44
45
arrow::Status Close() override {
46
myAmOpen = false;
47
return arrow::Status::OK();
48
}
49
50
arrow::Status Flush() override {
51
myOStream.flush();
52
return arrow::Status::OK();
53
}
54
55
arrow::Result<int64_t> Tell() const override {
56
return myOStream.tellp();
57
}
58
59
bool closed() const override {
60
return !myAmOpen;
61
}
62
63
arrow::Status Write(const void* data, int64_t nbytes) override {
64
if (!myAmOpen) {
65
return arrow::Status::IOError("Write on closed stream");
66
}
67
myOStream.write(reinterpret_cast<const char*>(data), nbytes);
68
if (!myOStream) {
69
return arrow::Status::IOError("Failed to write to ostream");
70
}
71
return arrow::Status::OK();
72
}
73
74
private:
75
std::ostream& myOStream;
76
bool myAmOpen;
77
};
78
79
80
// ===========================================================================
81
// member method definitions
82
// ===========================================================================
83
/* Creates a Parquet formatter.
 *
 * @param columnNames  stored as myHeaderFormat
 * @param compression  one of "snappy", "gzip", "brotli", "zstd", "lz4", "bz2", "uncompressed" or "";
 *                     any other value reports an error and keeps the default compression
 * @param batchSize    number of rows collected before a record batch is written (see closeTag)
 */
ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)
    : OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {
    if (compression == "snappy") {
        myCompression = parquet::Compression::SNAPPY;
    } else if (compression == "gzip") {
        myCompression = parquet::Compression::GZIP;
    } else if (compression == "brotli") {
        myCompression = parquet::Compression::BROTLI;
    } else if (compression == "zstd") {
        myCompression = parquet::Compression::ZSTD;
    } else if (compression == "lz4") {
        myCompression = parquet::Compression::LZ4;
    } else if (compression == "bz2") {
        myCompression = parquet::Compression::BZ2;
    } else if (compression != "" && compression != "uncompressed") {
        WRITE_ERRORF("Unknown compression: %", compression);
    }
    // Two separate conditions make a codec unusable: Arrow may have been built without it,
    // or it is known to Arrow but not part of the Parquet format (e.g. BZ2), in which case
    // the Parquet writer would throw once writing starts.
    if (!arrow::util::Codec::IsAvailable(myCompression) || !parquet::IsCodecSupported(myCompression)) {
        WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);
        myCompression = parquet::Compression::UNCOMPRESSED;
    }
}
105
106
void
107
ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {
108
myXMLStack.push_back((int)myValues.size());
109
if (!myWroteHeader) {
110
myCurrentTag = xmlElement;
111
}
112
if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != xmlElement) {
113
WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);
114
}
115
}
116
117
118
void
119
ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {
120
myXMLStack.push_back((int)myValues.size());
121
if (!myWroteHeader) {
122
myCurrentTag = toString(xmlElement);
123
}
124
if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {
125
WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));
126
}
127
}
128
129
130
bool
131
ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {
132
if (myMaxDepth == 0) {
133
myMaxDepth = (int)myXMLStack.size();
134
}
135
if (myMaxDepth == (int)myXMLStack.size() && !myWroteHeader) {
136
if (!myCheckColumns) {
137
WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");
138
}
139
auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);
140
std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();
141
myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);
142
myWroteHeader = true;
143
}
144
bool writeBatch = false;
145
if ((int)myXMLStack.size() == myMaxDepth) {
146
if (myCheckColumns && myExpectedAttrs != mySeenAttrs) {
147
for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {
148
if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {
149
WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",
150
toString((SumoXMLAttr)i));
151
}
152
}
153
}
154
int index = 0;
155
for (auto& builder : myBuilders) {
156
const auto val = myValues[index++];
157
PARQUET_THROW_NOT_OK(val == nullptr ? builder->AppendNull() : builder->AppendScalar(*val));
158
}
159
writeBatch = myBuilders.back()->length() == myBatchSize;
160
mySeenAttrs.reset();
161
}
162
if (writeBatch || myXMLStack.empty()) {
163
std::vector<std::shared_ptr<arrow::Array> > data;
164
for (auto& builder : myBuilders) {
165
std::shared_ptr<arrow::Array> column;
166
PARQUET_THROW_NOT_OK(builder->Finish(&column));
167
data.push_back(column);
168
// builder.reset();
169
}
170
auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);
171
PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));
172
}
173
if (!myXMLStack.empty()) {
174
while ((int)myValues.size() > myXMLStack.back()) {
175
if (!myWroteHeader) {
176
mySchema = *mySchema->RemoveField(mySchema->num_fields() - 1);
177
myBuilders.pop_back();
178
}
179
myValues.pop_back();
180
}
181
myXMLStack.pop_back();
182
}
183
return false;
184
}
185
186
187
/****************************************************************************/
188
189