Path: blob/main/crates/polars-parquet/src/parquet/parquet_bridge.rs
8512 views
// Bridges structs from thrift-generated code to rust enums.12pub use polars_utils::compression::*;3#[cfg(feature = "serde")]4use serde::{Deserialize, Serialize};56use super::thrift_format::{7BoundaryOrder as ParquetBoundaryOrder, CompressionCodec, DataPageHeader, DataPageHeaderV2,8DecimalType, Encoding as ParquetEncoding, FieldRepetitionType, IntType,9LogicalType as ParquetLogicalType, PageType as ParquetPageType, TimeType,10TimeUnit as ParquetTimeUnit, TimestampType,11};12use crate::parquet::error::ParquetError;1314/// The repetition of a parquet field15#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]16#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]17pub enum Repetition {18/// When the field has no null values19Required,20/// When the field may have null values21Optional,22/// When the field may be repeated (list field)23Repeated,24}2526impl TryFrom<FieldRepetitionType> for Repetition {27type Error = ParquetError;2829fn try_from(repetition: FieldRepetitionType) -> Result<Self, Self::Error> {30Ok(match repetition {31FieldRepetitionType::REQUIRED => Repetition::Required,32FieldRepetitionType::OPTIONAL => Repetition::Optional,33FieldRepetitionType::REPEATED => Repetition::Repeated,34_ => return Err(ParquetError::oos("Thrift out of range")),35})36}37}3839impl From<Repetition> for FieldRepetitionType {40fn from(repetition: Repetition) -> Self {41match repetition {42Repetition::Required => FieldRepetitionType::REQUIRED,43Repetition::Optional => FieldRepetitionType::OPTIONAL,44Repetition::Repeated => FieldRepetitionType::REPEATED,45}46}47}4849#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]50#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]51pub enum Compression {52Uncompressed,53Snappy,54Gzip,55Lzo,56Brotli,57Lz4,58Zstd,59Lz4Raw,60}6162impl TryFrom<CompressionCodec> for Compression {63type Error = ParquetError;6465fn try_from(codec: CompressionCodec) -> Result<Self, Self::Error> {66Ok(match codec {67CompressionCodec::UNCOMPRESSED => Compression::Uncompressed,68CompressionCodec::SNAPPY => Compression::Snappy,69CompressionCodec::GZIP => Compression::Gzip,70CompressionCodec::LZO => Compression::Lzo,71CompressionCodec::BROTLI => Compression::Brotli,72CompressionCodec::LZ4 => Compression::Lz4,73CompressionCodec::ZSTD => Compression::Zstd,74CompressionCodec::LZ4_RAW => Compression::Lz4Raw,75_ => return Err(ParquetError::oos("Thrift out of range")),76})77}78}7980impl From<Compression> for CompressionCodec {81fn from(codec: Compression) -> Self {82match codec {83Compression::Uncompressed => CompressionCodec::UNCOMPRESSED,84Compression::Snappy => CompressionCodec::SNAPPY,85Compression::Gzip => CompressionCodec::GZIP,86Compression::Lzo => CompressionCodec::LZO,87Compression::Brotli => CompressionCodec::BROTLI,88Compression::Lz4 => CompressionCodec::LZ4,89Compression::Zstd => CompressionCodec::ZSTD,90Compression::Lz4Raw => CompressionCodec::LZ4_RAW,91}92}93}9495/// Defines the compression settings for writing a parquet file.96///97/// If None is provided as a compression setting, then the default compression level is used.98#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]99pub enum CompressionOptions {100Uncompressed,101Snappy,102Gzip(Option<GzipLevel>),103Lzo,104Brotli(Option<BrotliLevel>),105Lz4,106Zstd(Option<ZstdLevel>),107Lz4Raw,108}109110impl From<CompressionOptions> for Compression {111fn from(value: CompressionOptions) -> Self {112match value {113CompressionOptions::Uncompressed => Compression::Uncompressed,114CompressionOptions::Snappy => Compression::Snappy,115CompressionOptions::Gzip(_) => Compression::Gzip,116CompressionOptions::Lzo => Compression::Lzo,117CompressionOptions::Brotli(_) => Compression::Brotli,118CompressionOptions::Lz4 => Compression::Lz4,119CompressionOptions::Zstd(_) => Compression::Zstd,120CompressionOptions::Lz4Raw => Compression::Lz4Raw,121}122}123}124125impl From<CompressionOptions> for CompressionCodec {126fn from(codec: CompressionOptions) -> Self {127match codec {128CompressionOptions::Uncompressed => CompressionCodec::UNCOMPRESSED,129CompressionOptions::Snappy => CompressionCodec::SNAPPY,130CompressionOptions::Gzip(_) => CompressionCodec::GZIP,131CompressionOptions::Lzo => CompressionCodec::LZO,132CompressionOptions::Brotli(_) => CompressionCodec::BROTLI,133CompressionOptions::Lz4 => CompressionCodec::LZ4,134CompressionOptions::Zstd(_) => CompressionCodec::ZSTD,135CompressionOptions::Lz4Raw => CompressionCodec::LZ4_RAW,136}137}138}139140#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]141pub enum PageType {142DataPage,143DataPageV2,144DictionaryPage,145}146147impl TryFrom<ParquetPageType> for PageType {148type Error = ParquetError;149150fn try_from(type_: ParquetPageType) -> Result<Self, Self::Error> {151Ok(match type_ {152ParquetPageType::DATA_PAGE => PageType::DataPage,153ParquetPageType::DATA_PAGE_V2 => PageType::DataPageV2,154ParquetPageType::DICTIONARY_PAGE => PageType::DictionaryPage,155_ => return Err(ParquetError::oos("Thrift out of range")),156})157}158}159160impl From<PageType> for ParquetPageType {161fn from(type_: PageType) -> Self {162match type_ {163PageType::DataPage => ParquetPageType::DATA_PAGE,164PageType::DataPageV2 => ParquetPageType::DATA_PAGE_V2,165PageType::DictionaryPage => ParquetPageType::DICTIONARY_PAGE,166}167}168}169170#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)]171pub enum Encoding {172/// Default encoding.173/// BOOLEAN - 1 bit per value. 0 is false; 1 is true.174/// INT32 - 4 bytes per value. Stored as little-endian.175/// INT64 - 8 bytes per value. Stored as little-endian.176/// FLOAT - 4 bytes per value. IEEE. Stored as little-endian.177/// DOUBLE - 8 bytes per value. IEEE. Stored as little-endian.178/// BYTE_ARRAY - 4 byte length stored as little endian, followed by bytes.179/// FIXED_LEN_BYTE_ARRAY - Just the bytes.180Plain,181/// Deprecated: Dictionary encoding. The values in the dictionary are encoded in the182/// plain type.183/// in a data page use RLE_DICTIONARY instead.184/// in a Dictionary page use PLAIN instead185PlainDictionary,186/// Group packed run length encoding. Usable for definition/repetition levels187/// encoding and Booleans (on one bit: 0 is false; 1 is true.)188Rle,189/// Bit packed encoding. This can only be used if the data has a known max190/// width. Usable for definition/repetition levels encoding.191BitPacked,192/// Delta encoding for integers. This can be used for int columns and works best193/// on sorted data194DeltaBinaryPacked,195/// Encoding for byte arrays to separate the length values and the data. The lengths196/// are encoded using DELTA_BINARY_PACKED197DeltaLengthByteArray,198/// Incremental-encoded byte array. Prefix lengths are encoded using DELTA_BINARY_PACKED.199/// Suffixes are stored as delta length byte arrays.200DeltaByteArray,201/// Dictionary encoding: the ids are encoded using the RLE encoding202RleDictionary,203/// Encoding for floating-point data.204/// K byte-streams are created where K is the size in bytes of the data type.205/// The individual bytes of an FP value are scattered to the corresponding stream and206/// the streams are concatenated.207/// This itself does not reduce the size of the data but can lead to better compression208/// afterwards.209ByteStreamSplit,210}211212impl TryFrom<ParquetEncoding> for Encoding {213type Error = ParquetError;214215fn try_from(encoding: ParquetEncoding) -> Result<Self, Self::Error> {216Ok(match encoding {217ParquetEncoding::PLAIN => Encoding::Plain,218ParquetEncoding::PLAIN_DICTIONARY => Encoding::PlainDictionary,219ParquetEncoding::RLE => Encoding::Rle,220ParquetEncoding::BIT_PACKED => Encoding::BitPacked,221ParquetEncoding::DELTA_BINARY_PACKED => Encoding::DeltaBinaryPacked,222ParquetEncoding::DELTA_LENGTH_BYTE_ARRAY => Encoding::DeltaLengthByteArray,223ParquetEncoding::DELTA_BYTE_ARRAY => Encoding::DeltaByteArray,224ParquetEncoding::RLE_DICTIONARY => Encoding::RleDictionary,225ParquetEncoding::BYTE_STREAM_SPLIT => Encoding::ByteStreamSplit,226_ => return Err(ParquetError::oos("Thrift out of range")),227})228}229}230231impl From<Encoding> for ParquetEncoding {232fn from(encoding: Encoding) -> Self {233match encoding {234Encoding::Plain => ParquetEncoding::PLAIN,235Encoding::PlainDictionary => ParquetEncoding::PLAIN_DICTIONARY,236Encoding::Rle => ParquetEncoding::RLE,237Encoding::BitPacked => ParquetEncoding::BIT_PACKED,238Encoding::DeltaBinaryPacked => ParquetEncoding::DELTA_BINARY_PACKED,239Encoding::DeltaLengthByteArray => ParquetEncoding::DELTA_LENGTH_BYTE_ARRAY,240Encoding::DeltaByteArray => ParquetEncoding::DELTA_BYTE_ARRAY,241Encoding::RleDictionary => ParquetEncoding::RLE_DICTIONARY,242Encoding::ByteStreamSplit => ParquetEncoding::BYTE_STREAM_SPLIT,243}244}245}246247/// Enum to annotate whether lists of min/max elements inside ColumnIndex248/// are ordered and if so, in which direction.249#[derive(Debug, Eq, PartialEq, Hash, Clone, Copy, Default)]250pub enum BoundaryOrder {251#[default]252Unordered,253Ascending,254Descending,255}256257impl TryFrom<ParquetBoundaryOrder> for BoundaryOrder {258type Error = ParquetError;259260fn try_from(encoding: ParquetBoundaryOrder) -> Result<Self, Self::Error> {261Ok(match encoding {262ParquetBoundaryOrder::UNORDERED => BoundaryOrder::Unordered,263ParquetBoundaryOrder::ASCENDING => BoundaryOrder::Ascending,264ParquetBoundaryOrder::DESCENDING => BoundaryOrder::Descending,265_ => return Err(ParquetError::oos("BoundaryOrder Thrift value out of range")),266})267}268}269270impl From<BoundaryOrder> for ParquetBoundaryOrder {271fn from(encoding: BoundaryOrder) -> Self {272match encoding {273BoundaryOrder::Unordered => ParquetBoundaryOrder::UNORDERED,274BoundaryOrder::Ascending => ParquetBoundaryOrder::ASCENDING,275BoundaryOrder::Descending => ParquetBoundaryOrder::DESCENDING,276}277}278}279280pub trait DataPageHeaderExt {281fn encoding(&self) -> Encoding;282fn repetition_level_encoding(&self) -> Encoding;283fn definition_level_encoding(&self) -> Encoding;284}285286impl DataPageHeaderExt for DataPageHeader {287fn encoding(&self) -> Encoding {288self.encoding.try_into().unwrap()289}290291fn repetition_level_encoding(&self) -> Encoding {292self.repetition_level_encoding.try_into().unwrap()293}294295fn definition_level_encoding(&self) -> Encoding {296self.definition_level_encoding.try_into().unwrap()297}298}299300impl DataPageHeaderExt for DataPageHeaderV2 {301fn encoding(&self) -> Encoding {302self.encoding.try_into().unwrap()303}304305fn repetition_level_encoding(&self) -> Encoding {306Encoding::Rle307}308309fn definition_level_encoding(&self) -> Encoding {310Encoding::Rle311}312}313314#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]315#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]316pub enum TimeUnit {317Milliseconds,318Microseconds,319Nanoseconds,320}321322impl From<ParquetTimeUnit> for TimeUnit {323fn from(encoding: ParquetTimeUnit) -> Self {324match encoding {325ParquetTimeUnit::MILLIS(_) => TimeUnit::Milliseconds,326ParquetTimeUnit::MICROS(_) => TimeUnit::Microseconds,327ParquetTimeUnit::NANOS(_) => TimeUnit::Nanoseconds,328}329}330}331332impl From<TimeUnit> for ParquetTimeUnit {333fn from(unit: TimeUnit) -> Self {334match unit {335TimeUnit::Milliseconds => ParquetTimeUnit::MILLIS(Default::default()),336TimeUnit::Microseconds => ParquetTimeUnit::MICROS(Default::default()),337TimeUnit::Nanoseconds => ParquetTimeUnit::NANOS(Default::default()),338}339}340}341342/// Enum of all valid logical integer types343#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]344#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]345pub enum IntegerType {346Int8,347Int16,348Int32,349Int64,350UInt8,351UInt16,352UInt32,353UInt64,354}355356#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]357#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]358pub enum PrimitiveLogicalType {359String,360Enum,361Decimal(usize, usize),362Date,363Time {364unit: TimeUnit,365is_adjusted_to_utc: bool,366},367Timestamp {368unit: TimeUnit,369is_adjusted_to_utc: bool,370},371Integer(IntegerType),372Unknown,373Json,374Bson,375Uuid,376Float16,377}378379#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]380#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]381pub enum GroupLogicalType {382Map,383List,384}385386impl From<GroupLogicalType> for ParquetLogicalType {387fn from(type_: GroupLogicalType) -> Self {388match type_ {389GroupLogicalType::Map => ParquetLogicalType::MAP(Default::default()),390GroupLogicalType::List => ParquetLogicalType::LIST(Default::default()),391}392}393}394395impl From<(i32, bool)> for IntegerType {396fn from((bit_width, is_signed): (i32, bool)) -> Self {397match (bit_width, is_signed) {398(8, true) => IntegerType::Int8,399(16, true) => IntegerType::Int16,400(32, true) => IntegerType::Int32,401(64, true) => IntegerType::Int64,402(8, false) => IntegerType::UInt8,403(16, false) => IntegerType::UInt16,404(32, false) => IntegerType::UInt32,405(64, false) => IntegerType::UInt64,406// The above are the only possible annotations for parquet's int32. Anything else407// is a deviation to the parquet specification and we ignore408_ => IntegerType::Int32,409}410}411}412413impl From<IntegerType> for (usize, bool) {414fn from(type_: IntegerType) -> (usize, bool) {415match type_ {416IntegerType::Int8 => (8, true),417IntegerType::Int16 => (16, true),418IntegerType::Int32 => (32, true),419IntegerType::Int64 => (64, true),420IntegerType::UInt8 => (8, false),421IntegerType::UInt16 => (16, false),422IntegerType::UInt32 => (32, false),423IntegerType::UInt64 => (64, false),424}425}426}427428impl TryFrom<ParquetLogicalType> for PrimitiveLogicalType {429type Error = ParquetError;430431fn try_from(type_: ParquetLogicalType) -> Result<Self, Self::Error> {432Ok(match type_ {433ParquetLogicalType::STRING(_) => PrimitiveLogicalType::String,434ParquetLogicalType::ENUM(_) => PrimitiveLogicalType::Enum,435ParquetLogicalType::DECIMAL(decimal) => PrimitiveLogicalType::Decimal(436decimal.precision.try_into()?,437decimal.scale.try_into()?,438),439ParquetLogicalType::DATE(_) => PrimitiveLogicalType::Date,440ParquetLogicalType::TIME(time) => PrimitiveLogicalType::Time {441unit: time.unit.into(),442is_adjusted_to_utc: time.is_adjusted_to_u_t_c,443},444ParquetLogicalType::TIMESTAMP(time) => PrimitiveLogicalType::Timestamp {445unit: time.unit.into(),446is_adjusted_to_utc: time.is_adjusted_to_u_t_c,447},448ParquetLogicalType::INTEGER(int) => {449PrimitiveLogicalType::Integer((int.bit_width as i32, int.is_signed).into())450},451ParquetLogicalType::UNKNOWN(_) => PrimitiveLogicalType::Unknown,452ParquetLogicalType::JSON(_) => PrimitiveLogicalType::Json,453ParquetLogicalType::BSON(_) => PrimitiveLogicalType::Bson,454ParquetLogicalType::UUID(_) => PrimitiveLogicalType::Uuid,455ParquetLogicalType::FLOAT16(_) => PrimitiveLogicalType::Float16,456_ => return Err(ParquetError::oos("LogicalType value out of range")),457})458}459}460461impl TryFrom<ParquetLogicalType> for GroupLogicalType {462type Error = ParquetError;463464fn try_from(type_: ParquetLogicalType) -> Result<Self, Self::Error> {465Ok(match type_ {466ParquetLogicalType::LIST(_) => GroupLogicalType::List,467ParquetLogicalType::MAP(_) => GroupLogicalType::Map,468_ => return Err(ParquetError::oos("LogicalType value out of range")),469})470}471}472473impl From<PrimitiveLogicalType> for ParquetLogicalType {474fn from(type_: PrimitiveLogicalType) -> Self {475match type_ {476PrimitiveLogicalType::String => ParquetLogicalType::STRING(Default::default()),477PrimitiveLogicalType::Enum => ParquetLogicalType::ENUM(Default::default()),478PrimitiveLogicalType::Decimal(precision, scale) => {479ParquetLogicalType::DECIMAL(DecimalType {480precision: precision as i32,481scale: scale as i32,482})483},484PrimitiveLogicalType::Date => ParquetLogicalType::DATE(Default::default()),485PrimitiveLogicalType::Time {486unit,487is_adjusted_to_utc,488} => ParquetLogicalType::TIME(TimeType {489unit: unit.into(),490is_adjusted_to_u_t_c: is_adjusted_to_utc,491}),492PrimitiveLogicalType::Timestamp {493unit,494is_adjusted_to_utc,495} => ParquetLogicalType::TIMESTAMP(TimestampType {496unit: unit.into(),497is_adjusted_to_u_t_c: is_adjusted_to_utc,498}),499PrimitiveLogicalType::Integer(integer) => {500let (bit_width, is_signed) = integer.into();501ParquetLogicalType::INTEGER(IntType {502bit_width: bit_width as i8,503is_signed,504})505},506PrimitiveLogicalType::Unknown => ParquetLogicalType::UNKNOWN(Default::default()),507PrimitiveLogicalType::Json => ParquetLogicalType::JSON(Default::default()),508PrimitiveLogicalType::Bson => ParquetLogicalType::BSON(Default::default()),509PrimitiveLogicalType::Uuid => ParquetLogicalType::UUID(Default::default()),510PrimitiveLogicalType::Float16 => ParquetLogicalType::FLOAT16(Default::default()),511}512}513}514515#[cfg(test)]516mod tests {517use super::*;518519#[test]520fn round_trip_primitive() -> Result<(), ParquetError> {521use PrimitiveLogicalType::*;522let a = vec![523String,524Enum,525Decimal(3, 1),526Date,527Time {528unit: TimeUnit::Milliseconds,529is_adjusted_to_utc: true,530},531Timestamp {532unit: TimeUnit::Milliseconds,533is_adjusted_to_utc: true,534},535Integer(IntegerType::Int16),536Unknown,537Json,538Bson,539Uuid,540];541for a in a {542let c: ParquetLogicalType = a.into();543let e: PrimitiveLogicalType = c.try_into()?;544assert_eq!(e, a);545}546Ok(())547}548549#[test]550fn round_trip_encoding() -> Result<(), ParquetError> {551use Encoding::*;552let a = vec![553Plain,554PlainDictionary,555Rle,556BitPacked,557DeltaBinaryPacked,558DeltaLengthByteArray,559DeltaByteArray,560RleDictionary,561ByteStreamSplit,562];563for a in a {564let c: ParquetEncoding = a.into();565let e: Encoding = c.try_into()?;566assert_eq!(e, a);567}568Ok(())569}570571#[test]572fn round_compression() -> Result<(), ParquetError> {573use Compression::*;574let a = vec![Uncompressed, Snappy, Gzip, Lzo, Brotli, Lz4, Zstd, Lz4Raw];575for a in a {576let c: CompressionCodec = a.into();577let e: Compression = c.try_into()?;578assert_eq!(e, a);579}580Ok(())581}582}583584585