#pragma once
#include <config.h>
#include <ostream>
#ifdef _MSC_VER
#pragma warning(suppress: 5032)
#pragma warning(push)
#pragma warning(disable: 4100)
#pragma warning(disable: 4266)
#pragma warning(disable: 4324)
#pragma warning(disable: 4355)
#pragma warning(disable: 4435)
#pragma warning(disable: 4458)
#pragma warning(disable: 4800)
#endif
#include <arrow/api.h>
#include <parquet/arrow/writer.h>
#ifdef _MSC_VER
#pragma warning(suppress: 5031)
#pragma warning(pop)
#endif
#include <utils/common/ToString.h>
#include "OutputFormatter.h"
/**
 * @class ParquetFormatter
 * @brief Output formatter which gathers attribute values as Arrow scalars and
 *        grows an Arrow schema plus a matching list of array builders.
 *
 * While the header has not been written yet (myWroteHeader is false), every
 * written attribute appends one field to mySchema and one builder to
 * myBuilders. Independently of that, every written attribute appends one
 * entry to myValues. The functions openTag / closeTag and the constructor are
 * only declared here; their implementation lives outside this header.
 */
class ParquetFormatter : public OutputFormatter {
public:
    /** @brief Constructor
     *
     * The definition is not part of this header.
     * NOTE(review): the parameters presumably initialize myHeaderFormat,
     * myCompression and myBatchSize - confirm against the implementation.
     *
     * @param[in] columnNames The column naming scheme
     * @param[in] compression The name of the compression to use
     * @param[in] batchSize The batch size
     */
    ParquetFormatter(const std::string& columnNames, const std::string& compression = "", const int batchSize = 1000000);

    /// @brief Destructor
    virtual ~ParquetFormatter() { }

    /** @brief Opens an element given by its name (defined outside this header)
     *
     * @param[in] into The output stream to use
     * @param[in] xmlElement Name of the element to open
     */
    void openTag(std::ostream& into, const std::string& xmlElement);

    /** @brief Opens an element given by its tag (defined outside this header)
     *
     * @param[in] into The output stream to use
     * @param[in] xmlElement Tag of the element to open
     */
    void openTag(std::ostream& into, const SumoXMLTag& xmlElement);

    /** @brief Closes the most recently opened element (defined outside this header)
     *
     * @param[in] into The output stream to use
     * @param[in] comment An optional comment
     * @return A success flag; its exact meaning is determined by the implementation
     */
    bool closeTag(std::ostream& into, const std::string& comment = "");

    /** @brief Writes an attribute identified by its enum value as a string column
     *
     * The attribute is validated by checkAttr first. Before the header is
     * written a utf8 field and a string builder are registered for it.
     *
     * @param[in] attr The attribute (name)
     * @param[in] val The attribute value, converted using toString
     * @param[in] isNull If set, a null pointer is stored instead of the value
     * @throws ProcessError if checkAttr rejects the attribute
     */
    template <class T>
    void writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const T& val, const bool isNull = false) {
        checkAttr(attr);
        if (!myWroteHeader) {
            mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::utf8()));
            myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
        }
        myValues.push_back(isNull ? nullptr : std::make_shared<arrow::StringScalar>(toString(val)));
    }

    /** @brief Writes an attribute identified by a free-form name as a string column
     *
     * Free-form names cannot be validated against the expected attribute
     * mask, so this overload asserts that column checking is disabled.
     *
     * @param[in] attr The attribute (name)
     * @param[in] val The attribute value, converted using toString
     */
    template <class T>
    void writeAttr(std::ostream& /* into */, const std::string& attr, const T& val) {
        assert(!myCheckColumns);
        if (!myWroteHeader) {
            mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(attr), arrow::utf8()));
            myBuilders.push_back(std::make_shared<arrow::StringBuilder>());
        }
        myValues.push_back(std::make_shared<arrow::StringScalar>(toString(val)));
    }

    /** @brief Writes a time attribute
     *
     * With gHumanReadableTime set the value is written as the string produced
     * by time2string; otherwise it is written as a float64 column holding
     * STEPS2TIME(val). In both cases the attribute passes through checkAttr,
     * so that validation does not depend on the time format.
     *
     * @param[in] into The output stream to use
     * @param[in] attr The attribute (name)
     * @param[in] val The time value
     * @throws ProcessError if checkAttr rejects the attribute
     */
    void writeTime(std::ostream& into, const SumoXMLAttr attr, const SUMOTime val) {
        if (gHumanReadableTime) {
            // the generic writeAttr already performs the column check
            writeAttr(into, attr, time2string(val));
            return;
        }
        // the numeric path bypasses writeAttr, so the check has to happen here
        checkAttr(attr);
        if (!myWroteHeader) {
            mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(getAttrString(toString(attr)), arrow::float64()));
            myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
        }
        myValues.push_back(std::make_shared<arrow::DoubleScalar>(STEPS2TIME(val)));
    }

    /// @brief Returns whether the header has been written already
    bool wroteHeader() const {
        return myWroteHeader;
    }

    /** @brief Defines which attributes are accepted at which nesting depth
     *
     * Column checking is enabled only if at least one attribute is expected.
     *
     * @param[in] expected The mask of expected attributes
     * @param[in] depth The element depth at which the check is applied
     */
    void setExpectedAttributes(const SumoXMLAttrMask& expected, const int depth = 2) {
        myExpectedAttrs = expected;
        myMaxDepth = depth;
        myCheckColumns = expected.any();
    }

private:
    /** @brief Derives the column name for an attribute name
     *
     * - header format "plain": the attribute name is used unchanged
     * - header format "auto": the name is prefixed with the current tag only
     *   if the schema already contains a field with the plain name
     * - any other header format: the name is always prefixed with the current tag
     *
     * @param[in] attrString The plain attribute name
     * @return The column name to use
     */
    inline const std::string getAttrString(const std::string& attrString) {
        if (myHeaderFormat == "plain") {
            return attrString;
        }
        if (myHeaderFormat == "auto") {
            for (const auto& field : mySchema->fields()) {
                if (field->name() == attrString) {
                    // the plain name is taken already, disambiguate using the tag
                    return myCurrentTag + "_" + attrString;
                }
            }
            return attrString;
        }
        return myCurrentTag + "_" + attrString;
    }

    /** @brief Validates an attribute against the expected attribute mask
     *
     * Does nothing unless column checking is enabled and the current element
     * depth (size of myXMLStack) equals myMaxDepth. Otherwise the attribute
     * is recorded in mySeenAttrs and rejected if it is not expected.
     *
     * @param[in] attr The attribute to check
     * @throws ProcessError if the attribute is not part of the expected mask
     */
    inline void checkAttr(const SumoXMLAttr attr) {
        if (myCheckColumns && myMaxDepth == static_cast<int>(myXMLStack.size())) {
            mySeenAttrs.set(attr);
            if (!myExpectedAttrs.test(attr)) {
                throw ProcessError(TLF("Unexpected attribute '%', this file format does not support Parquet output yet.", toString(attr)));
            }
        }
    }

    /// @brief the column naming scheme ("plain" and "auto" are handled specially, see getAttrString)
    const std::string myHeaderFormat;

    /// @brief the compression to use, uncompressed unless changed elsewhere
    parquet::Compression::type myCompression = parquet::Compression::UNCOMPRESSED;

    /// @brief the batch size (only used outside this header)
    const int myBatchSize;

    /// @brief the tag name used as column name prefix (maintained outside this header)
    std::string myCurrentTag;

    /// @brief the schema, initially empty and extended while the header is not yet written
    std::shared_ptr<arrow::Schema> mySchema = arrow::schema({});

    /// @brief the parquet writer (only used outside this header)
    std::unique_ptr<parquet::arrow::FileWriter> myParquetWriter;

    /// @brief the array builders, one is appended together with every schema field
    std::vector<std::shared_ptr<arrow::ArrayBuilder> > myBuilders;

    /// @brief the stack of open elements; its size is the current depth (maintained outside this header)
    std::vector<int> myXMLStack;

    /// @brief the collected values, a null pointer stands for a null value
    std::vector<std::shared_ptr<arrow::Scalar> > myValues;

    /// @brief the depth at which attributes are checked
    int myMaxDepth = 0;

    /// @brief whether the header has been written (set outside this header)
    bool myWroteHeader = false;

    /// @brief whether attributes are validated against myExpectedAttrs
    bool myCheckColumns = false;

    /// @brief the attributes which are expected at myMaxDepth
    SumoXMLAttrMask myExpectedAttrs;

    /// @brief the attributes which have been seen at myMaxDepth
    SumoXMLAttrMask mySeenAttrs;
};
/** @brief Specialization writing a double value for an enum-identified attribute
 *
 * Coordinates (SUMO_ATTR_X / SUMO_ATTR_Y) and every value written while the
 * stream precision exceeds 2 are kept as float64; everything else is narrowed
 * to float32. The attribute is validated by checkAttr first and a null
 * pointer is stored instead of the value if isNull is set.
 */
template <>
inline void ParquetFormatter::writeAttr(std::ostream& into, const SumoXMLAttr attr, const double& val, const bool isNull) {
    checkAttr(attr);
    const bool keepDouble = attr == SUMO_ATTR_X || attr == SUMO_ATTR_Y || into.precision() > 2;
    if (!keepDouble) {
        // low precision requested: a 32 bit float column is sufficient
        if (!myWroteHeader) {
            const std::string column = getAttrString(toString(attr));
            mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(column, arrow::float32()));
            myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
        }
        if (isNull) {
            myValues.push_back(nullptr);
        } else {
            myValues.push_back(std::make_shared<arrow::FloatScalar>(static_cast<float>(val)));
        }
        return;
    }
    if (!myWroteHeader) {
        const std::string column = getAttrString(toString(attr));
        mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(column, arrow::float64()));
        myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
    }
    if (isNull) {
        myValues.push_back(nullptr);
    } else {
        myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
    }
}
/** @brief Specialization writing an int value for an enum-identified attribute
 *
 * The attribute is validated by checkAttr first. Before the header is written
 * an int32 field and a matching builder are registered; a null pointer is
 * stored instead of the value if isNull is set.
 */
template <>
inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const SumoXMLAttr attr, const int& val, const bool isNull) {
    checkAttr(attr);
    if (!myWroteHeader) {
        // new columns are always appended behind the existing ones
        const int columnIndex = mySchema->num_fields();
        mySchema = *mySchema->AddField(columnIndex, arrow::field(getAttrString(toString(attr)), arrow::int32()));
        myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
    }
    if (isNull) {
        myValues.push_back(nullptr);
    } else {
        myValues.push_back(std::make_shared<arrow::Int32Scalar>(val));
    }
}
/** @brief Specialization writing a double value for a free-form attribute name
 *
 * Asserts that column checking is disabled. The value is kept as float64 if
 * the stream precision exceeds 2 and narrowed to float32 otherwise.
 */
template <>
inline void ParquetFormatter::writeAttr(std::ostream& into, const std::string& attr, const double& val) {
    assert(!myCheckColumns);
    const bool keepDouble = into.precision() > 2;
    if (!keepDouble) {
        // low precision requested: a 32 bit float column is sufficient
        if (!myWroteHeader) {
            const std::string column = getAttrString(attr);
            mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(column, arrow::float32()));
            myBuilders.push_back(std::make_shared<arrow::FloatBuilder>());
        }
        myValues.push_back(std::make_shared<arrow::FloatScalar>(static_cast<float>(val)));
        return;
    }
    if (!myWroteHeader) {
        const std::string column = getAttrString(attr);
        mySchema = *mySchema->AddField(mySchema->num_fields(), arrow::field(column, arrow::float64()));
        myBuilders.push_back(std::make_shared<arrow::DoubleBuilder>());
    }
    myValues.push_back(std::make_shared<arrow::DoubleScalar>(val));
}
/** @brief Specialization writing an int value for a free-form attribute name
 *
 * Asserts that column checking is disabled. Before the header is written an
 * int32 field and a matching builder are registered.
 */
template <>
inline void ParquetFormatter::writeAttr(std::ostream& /* into */, const std::string& attr, const int& val) {
    assert(!myCheckColumns);
    if (!myWroteHeader) {
        // new columns are always appended behind the existing ones
        const int columnIndex = mySchema->num_fields();
        mySchema = *mySchema->AddField(columnIndex, arrow::field(getAttrString(attr), arrow::int32()));
        myBuilders.push_back(std::make_shared<arrow::Int32Builder>());
    }
    myValues.push_back(std::make_shared<arrow::Int32Scalar>(val));
}