// Path: blob/main/src/utils/iodevices/ParquetFormatter.cpp
// 169678 views
/****************************************************************************/1// Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo2// Copyright (C) 2012-2025 German Aerospace Center (DLR) and others.3// This program and the accompanying materials are made available under the4// terms of the Eclipse Public License 2.0 which is available at5// https://www.eclipse.org/legal/epl-2.0/6// This Source Code may also be made available under the following Secondary7// Licenses when the conditions for such availability set forth in the Eclipse8// Public License 2.0 are satisfied: GNU General Public License, version 29// or later which is available at10// https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html11// SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later12/****************************************************************************/13/// @file ParquetFormatter.cpp14/// @author Michael Behrisch15/// @date 2025-06-1716///17// An output formatter for Parquet files18/****************************************************************************/19#include <config.h>2021#ifdef _MSC_VER22#pragma warning(push)23/* Disable warning about changed memory layout due to virtual base class */24#pragma warning(disable: 4435)25#endif26#include <arrow/io/api.h>27#ifdef _MSC_VER28#pragma warning(pop)29#endif3031#include <utils/common/MsgHandler.h>32#include <utils/common/ToString.h>33#include "ParquetFormatter.h"343536// ===========================================================================37// helper class definitions38// ===========================================================================39class ArrowOStreamWrapper : public arrow::io::OutputStream {40public:41ArrowOStreamWrapper(std::ostream& out)42: myOStream(out), myAmOpen(true) {}4344arrow::Status Close() override {45myAmOpen = false;46return arrow::Status::OK();47}4849arrow::Status Flush() override {50myOStream.flush();51return arrow::Status::OK();52}5354arrow::Result<int64_t> Tell() 
const override {55return myOStream.tellp();56}5758bool closed() const override {59return !myAmOpen;60}6162arrow::Status Write(const void* data, int64_t nbytes) override {63if (!myAmOpen) {64return arrow::Status::IOError("Write on closed stream");65}66myOStream.write(reinterpret_cast<const char*>(data), nbytes);67if (!myOStream) {68return arrow::Status::IOError("Failed to write to ostream");69}70return arrow::Status::OK();71}7273private:74std::ostream& myOStream;75bool myAmOpen;76};777879// ===========================================================================80// member method definitions81// ===========================================================================82ParquetFormatter::ParquetFormatter(const std::string& columnNames, const std::string& compression, const int batchSize)83: OutputFormatter(OutputFormatterType::PARQUET), myHeaderFormat(columnNames), myBatchSize(batchSize) {84if (compression == "snappy") {85myCompression = parquet::Compression::SNAPPY;86} else if (compression == "gzip") {87myCompression = parquet::Compression::GZIP;88} else if (compression == "brotli") {89myCompression = parquet::Compression::BROTLI;90} else if (compression == "zstd") {91myCompression = parquet::Compression::ZSTD;92} else if (compression == "lz4") {93myCompression = parquet::Compression::LZ4;94} else if (compression == "bz2") {95myCompression = parquet::Compression::BZ2;96} else if (compression != "" && compression != "uncompressed") {97WRITE_ERRORF("Unknown compression: %", compression);98}99if (!arrow::util::Codec::IsAvailable(myCompression)) {100WRITE_WARNINGF("Compression '%' not available, falling back to uncompressed.", compression);101myCompression = parquet::Compression::UNCOMPRESSED;102}103}104105void106ParquetFormatter::openTag(std::ostream& /* into */, const std::string& xmlElement) {107myXMLStack.push_back((int)myValues.size());108if (!myWroteHeader) {109myCurrentTag = xmlElement;110}111if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && 
myCurrentTag != xmlElement) {112WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, xmlElement);113}114}115116117void118ParquetFormatter::openTag(std::ostream& /* into */, const SumoXMLTag& xmlElement) {119myXMLStack.push_back((int)myValues.size());120if (!myWroteHeader) {121myCurrentTag = toString(xmlElement);122}123if (myMaxDepth == (int)myXMLStack.size() && myWroteHeader && myCurrentTag != toString(xmlElement)) {124WRITE_WARNINGF("Encountered mismatch in XML tags (expected % but got %). Column names may be incorrect.", myCurrentTag, toString(xmlElement));125}126}127128129bool130ParquetFormatter::closeTag(std::ostream& into, const std::string& /* comment */) {131if (myMaxDepth == 0) {132myMaxDepth = (int)myXMLStack.size();133}134if (myMaxDepth == (int)myXMLStack.size() && !myWroteHeader) {135if (!myCheckColumns) {136WRITE_WARNING("Column based formats are still experimental. Autodetection only works for homogeneous output.");137}138auto arrow_stream = std::make_shared<ArrowOStreamWrapper>(into);139std::shared_ptr<parquet::WriterProperties> props = parquet::WriterProperties::Builder().compression(myCompression)->build();140myParquetWriter = *parquet::arrow::FileWriter::Open(*mySchema, arrow::default_memory_pool(), arrow_stream, props);141myWroteHeader = true;142}143bool writeBatch = false;144if ((int)myXMLStack.size() == myMaxDepth) {145if (myCheckColumns && myExpectedAttrs != mySeenAttrs) {146for (int i = 0; i < (int)myExpectedAttrs.size(); ++i) {147if (myExpectedAttrs.test(i) && !mySeenAttrs.test(i)) {148WRITE_ERRORF("Incomplete attribute set, '%' is missing. This file format does not support Parquet output yet.",149toString((SumoXMLAttr)i));150}151}152}153int index = 0;154for (auto& builder : myBuilders) {155const auto val = myValues[index++];156PARQUET_THROW_NOT_OK(val == nullptr ? 
builder->AppendNull() : builder->AppendScalar(*val));157}158writeBatch = myBuilders.back()->length() == myBatchSize;159mySeenAttrs.reset();160}161if (writeBatch || myXMLStack.empty()) {162std::vector<std::shared_ptr<arrow::Array> > data;163for (auto& builder : myBuilders) {164std::shared_ptr<arrow::Array> column;165PARQUET_THROW_NOT_OK(builder->Finish(&column));166data.push_back(column);167// builder.reset();168}169auto batch = arrow::RecordBatch::Make(mySchema, data.back()->length(), data);170PARQUET_THROW_NOT_OK(myParquetWriter->WriteRecordBatch(*batch));171}172if (!myXMLStack.empty()) {173while ((int)myValues.size() > myXMLStack.back()) {174if (!myWroteHeader) {175mySchema = *mySchema->RemoveField(mySchema->num_fields() - 1);176myBuilders.pop_back();177}178myValues.pop_back();179}180myXMLStack.pop_back();181}182return false;183}184185186/****************************************************************************/187188189