Path: blob/main/crates/polars-parquet/src/parquet/metadata/column_chunk_metadata.rs
6940 views
use polars_parquet_format::{ColumnChunk, ColumnMetaData, Encoding};12use super::column_descriptor::ColumnDescriptor;3use crate::parquet::compression::Compression;4use crate::parquet::error::{ParquetError, ParquetResult};5use crate::parquet::schema::types::PhysicalType;6use crate::parquet::statistics::Statistics;78#[cfg(feature = "serde")]9mod serde_types {10pub use std::io::Cursor;1112pub use polars_parquet_format::thrift::protocol::{13TCompactInputProtocol, TCompactOutputProtocol,14};15pub use serde::de::Error as DeserializeError;16pub use serde::ser::Error as SerializeError;17pub use serde::{Deserialize, Deserializer, Serialize, Serializer};18}19#[cfg(feature = "serde")]20use serde_types::*;2122/// Metadata for a column chunk.23///24/// This contains the `ColumnDescriptor` associated with the chunk so that deserializers have25/// access to the descriptor (e.g. physical, converted, logical).26///27/// This struct is intentionally not `Clone`, as it is a huge struct.28#[derive(Debug)]29#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]30pub struct ColumnChunkMetadata {31#[cfg_attr(feature = "serde", serde(serialize_with = "serialize_column_chunk"))]32#[cfg_attr(33feature = "serde",34serde(deserialize_with = "deserialize_column_chunk")35)]36column_chunk: ColumnChunk,37column_descr: ColumnDescriptor,38}3940#[cfg(feature = "serde")]41fn serialize_column_chunk<S>(42column_chunk: &ColumnChunk,43serializer: S,44) -> std::result::Result<S::Ok, S::Error>45where46S: Serializer,47{48let mut buf = vec![];49let cursor = Cursor::new(&mut buf[..]);50let mut protocol = TCompactOutputProtocol::new(cursor);51column_chunk52.write_to_out_protocol(&mut protocol)53.map_err(S::Error::custom)?;54serializer.serialize_bytes(&buf)55}5657#[cfg(feature = "serde")]58fn deserialize_column_chunk<'de, D>(deserializer: D) -> std::result::Result<ColumnChunk, D::Error>59where60D: Deserializer<'de>,61{62use polars_utils::pl_serialize::deserialize_map_bytes;6364deserialize_map_bytes(deserializer, |b| {65let mut b = b.as_ref();66let mut protocol = TCompactInputProtocol::new(&mut b, usize::MAX);67ColumnChunk::read_from_in_protocol(&mut protocol).map_err(D::Error::custom)68})?69}7071// Represents common operations for a column chunk.72impl ColumnChunkMetadata {73/// Returns a new [`ColumnChunkMetadata`]74pub fn new(column_chunk: ColumnChunk, column_descr: ColumnDescriptor) -> Self {75Self {76column_chunk,77column_descr,78}79}8081/// File where the column chunk is stored.82///83/// If not set, assumed to belong to the same file as the metadata.84/// This path is relative to the current file.85pub fn file_path(&self) -> &Option<String> {86&self.column_chunk.file_path87}8889/// Byte offset in `file_path()`.90pub fn file_offset(&self) -> i64 {91self.column_chunk.file_offset92}9394/// Returns this column's [`ColumnChunk`]95pub fn column_chunk(&self) -> &ColumnChunk {96&self.column_chunk97}9899/// The column's [`ColumnMetaData`]100pub fn metadata(&self) -> &ColumnMetaData {101self.column_chunk.meta_data.as_ref().unwrap()102}103104/// The [`ColumnDescriptor`] for this column. This descriptor contains the physical and logical type105/// of the pages.106pub fn descriptor(&self) -> &ColumnDescriptor {107&self.column_descr108}109110/// The [`PhysicalType`] of this column.111pub fn physical_type(&self) -> PhysicalType {112self.column_descr.descriptor.primitive_type.physical_type113}114115/// Decodes the raw statistics into [`Statistics`].116pub fn statistics(&self) -> Option<ParquetResult<Statistics>> {117self.metadata().statistics.as_ref().map(|x| {118Statistics::deserialize(x, self.column_descr.descriptor.primitive_type.clone())119})120}121122/// Total number of values in this column chunk. Note that this is not necessarily the number123/// of rows. E.g. the (nested) array `[[1, 2], [3]]` has 2 rows and 3 values.124pub fn num_values(&self) -> i64 {125self.metadata().num_values126}127128/// [`Compression`] for this column.129pub fn compression(&self) -> Compression {130self.metadata().codec.try_into().unwrap()131}132133/// Returns the total compressed data size of this column chunk.134pub fn compressed_size(&self) -> i64 {135self.metadata().total_compressed_size136}137138/// Returns the total uncompressed data size of this column chunk.139pub fn uncompressed_size(&self) -> i64 {140self.metadata().total_uncompressed_size141}142143/// Returns the offset for the column data.144pub fn data_page_offset(&self) -> i64 {145self.metadata().data_page_offset146}147148/// Returns `true` if this column chunk contains a index page, `false` otherwise.149pub fn has_index_page(&self) -> bool {150self.metadata().index_page_offset.is_some()151}152153/// Returns the offset for the index page.154pub fn index_page_offset(&self) -> Option<i64> {155self.metadata().index_page_offset156}157158/// Returns the offset for the dictionary page, if any.159pub fn dictionary_page_offset(&self) -> Option<i64> {160self.metadata().dictionary_page_offset161}162163/// Returns the encoding for this column164pub fn column_encoding(&self) -> &Vec<Encoding> {165&self.metadata().encodings166}167168/// Returns the offset and length in bytes of the column chunk within the file169pub fn byte_range(&self) -> core::ops::Range<u64> {170// this has been validated in [`try_from_thrift`]171column_metadata_byte_range(self.metadata())172}173174/// Method to convert from Thrift.175pub(crate) fn try_from_thrift(176column_descr: ColumnDescriptor,177column_chunk: ColumnChunk,178) -> ParquetResult<Self> {179// validate metadata180if let Some(meta) = &column_chunk.meta_data {181let _: u64 = meta.total_compressed_size.try_into()?;182183if let Some(offset) = meta.dictionary_page_offset {184let _: u64 = offset.try_into()?;185}186let _: u64 = meta.data_page_offset.try_into()?;187188let _: Compression = meta.codec.try_into()?;189} else {190return Err(ParquetError::oos("Column chunk requires metadata"));191}192193Ok(Self {194column_chunk,195column_descr,196})197}198199/// Method to convert to Thrift.200pub fn into_thrift(self) -> ColumnChunk {201self.column_chunk202}203}204205pub(super) fn column_metadata_byte_range(206column_metadata: &ColumnMetaData,207) -> core::ops::Range<u64> {208let offset = if let Some(dict_page_offset) = column_metadata.dictionary_page_offset {209dict_page_offset as u64210} else {211column_metadata.data_page_offset as u64212};213let len = column_metadata.total_compressed_size as u64;214offset..offset.checked_add(len).unwrap()215}216217218