Path: blob/main/crates/polars-parquet/src/parquet/write/column_chunk.rs
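//! Writing of a single Parquet column chunk: every compressed page is streamed
//! to the writer, after which the chunk's `ColumnMetaData` is serialized with
//! the Thrift compact protocol.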
use std::io::Write;

#[cfg(feature = "async")]
use futures::AsyncWrite;
use polars_parquet_format::thrift::protocol::TCompactOutputProtocol;
#[cfg(feature = "async")]
use polars_parquet_format::thrift::protocol::TCompactOutputStreamProtocol;
use polars_parquet_format::{ColumnChunk, ColumnMetaData, Type};
use polars_utils::aliases::PlHashSet;

use super::DynStreamingIterator;
#[cfg(feature = "async")]
use super::page::write_page_async;
use super::page::{PageWriteSpec, write_page};
use super::statistics::reduce;
use crate::parquet::FallibleStreamingIterator;
use crate::parquet::compression::Compression;
use crate::parquet::encoding::Encoding;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::metadata::ColumnDescriptor;
use crate::parquet::page::{CompressedPage, PageType};

pub fn write_column_chunk<W, E>(
    writer: &mut W,
    mut offset: u64,
    descriptor: &ColumnDescriptor,
    mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>,
) -> ParquetResult<(ColumnChunk, Vec<PageWriteSpec>, u64)>
where
    W: Write,
    ParquetError: From<E>,
    E: std::error::Error,
{
    // write every page

    let initial = offset;

    let mut specs = vec![];
    while let Some(compressed_page) = compressed_pages.next()? {
        let spec = write_page(writer, offset, compressed_page)?;
        offset += spec.bytes_written;
        specs.push(spec);
    }
    let mut bytes_written = offset - initial;

    let column_chunk = build_column_chunk(&specs, descriptor)?;

    // write metadata
    let mut protocol = TCompactOutputProtocol::new(writer);
    bytes_written += column_chunk
        .meta_data
        .as_ref()
        .unwrap()
        .write_to_out_protocol(&mut protocol)? as u64;

    Ok((column_chunk, specs, bytes_written))
}

#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_column_chunk_async<W, E>(
    writer: &mut W,
    mut offset: u64,
    descriptor: &ColumnDescriptor,
    mut compressed_pages: DynStreamingIterator<'_, CompressedPage, E>,
) -> ParquetResult<(ColumnChunk, Vec<PageWriteSpec>, u64)>
where
    W: AsyncWrite + Unpin + Send,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let initial = offset;
    // write every page
    let mut specs = vec![];
    while let Some(compressed_page) = compressed_pages.next()? {
        let spec = write_page_async(writer, offset, compressed_page).await?;
        offset += spec.bytes_written;
        specs.push(spec);
    }
    let mut bytes_written = offset - initial;

    let column_chunk = build_column_chunk(&specs, descriptor)?;

    // write metadata
    let mut protocol = TCompactOutputStreamProtocol::new(writer);
    bytes_written += column_chunk
        .meta_data
        .as_ref()
        .unwrap()
        .write_to_out_stream_protocol(&mut protocol)
        .await? as u64;

    Ok((column_chunk, specs, bytes_written))
}

fn build_column_chunk(
    specs: &[PageWriteSpec],
    descriptor: &ColumnDescriptor,
) -> ParquetResult<ColumnChunk> {
    // compute stats to build header at the end of the chunk

    let compression = specs
        .iter()
        .map(|spec| spec.compression)
        .collect::<PlHashSet<_>>();
    if compression.len() > 1 {
        return Err(crate::parquet::error::ParquetError::oos(
            "All pages within a column chunk must be compressed with the same codec",
        ));
    }
    let compression = compression
        .into_iter()
        .next()
        .unwrap_or(Compression::Uncompressed);

    // SPEC: the total compressed size is the total compressed size of each page + the header size
    let total_compressed_size = specs
        .iter()
        .map(|x| x.header_size as i64 + x.header.compressed_page_size as i64)
        .sum();
    // SPEC: the total uncompressed size is the total uncompressed size of each page + the header size
    let total_uncompressed_size = specs
        .iter()
        .map(|x| x.header_size as i64 + x.header.uncompressed_page_size as i64)
        .sum();
    let data_page_offset = specs.first().map(|spec| spec.offset).unwrap_or(0) as i64;
    let num_values = specs
        .iter()
        .map(|spec| {
            let type_ = spec.header.type_.try_into().unwrap();
            match type_ {
                PageType::DataPage => {
                    spec.header.data_page_header.as_ref().unwrap().num_values as i64
                },
                PageType::DataPageV2 => {
                    spec.header.data_page_header_v2.as_ref().unwrap().num_values as i64
                },
                _ => 0, // only data pages contribute
            }
        })
        .sum();
    let mut encodings = specs
        .iter()
        .flat_map(|spec| {
            let type_ = spec.header.type_.try_into().unwrap();
            match type_ {
                PageType::DataPage => vec![
                    spec.header.data_page_header.as_ref().unwrap().encoding,
                    Encoding::Rle.into(),
                ],
                PageType::DataPageV2 => {
                    vec![
                        spec.header.data_page_header_v2.as_ref().unwrap().encoding,
                        Encoding::Rle.into(),
                    ]
                },
                PageType::DictionaryPage => vec![
                    spec.header
                        .dictionary_page_header
                        .as_ref()
                        .unwrap()
                        .encoding,
                ],
            }
        })
        .collect::<PlHashSet<_>>() // unique
        .into_iter() // to vec
        .collect::<Vec<_>>();

    // Sort the encodings to have deterministic metadata
    encodings.sort();

    let statistics = specs.iter().map(|x| &x.statistics).collect::<Vec<_>>();
    let statistics = reduce(&statistics)?;
    let statistics = statistics.map(|x| x.serialize());

    let (type_, _): (Type, Option<i32>) =
        descriptor.descriptor.primitive_type.physical_type.into();

    let metadata = ColumnMetaData {
        type_,
        encodings,
        path_in_schema: descriptor
            .path_in_schema
            .iter()
            .map(|x| x.to_string())
            .collect::<Vec<_>>(),
        codec: compression.into(),
        num_values,
        total_uncompressed_size,
        total_compressed_size,
        key_value_metadata: None,
        data_page_offset,
        index_page_offset: None,
        dictionary_page_offset: None,
        statistics,
        encoding_stats: None,
        bloom_filter_offset: None,
        bloom_filter_length: None,
        size_statistics: None,
    };

    Ok(ColumnChunk {
        file_path: None, // same file for now.
        file_offset: data_page_offset + total_compressed_size,
        meta_data: Some(metadata),
        offset_index_offset: None,
        offset_index_length: None,
        column_index_offset: None,
        column_index_length: None,
        crypto_metadata: None,
        encrypted_column_metadata: None,
    })
}
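
As a minimal illustration of the size and offset bookkeeping in `build_column_chunk` (the SPEC comments above), the following standalone sketch uses made-up page sizes; the tuples merely stand in for `PageWriteSpec` and nothing here is polars-parquet API:

// Hypothetical numbers, for illustration only: two pages written starting at
// offset 4, each tuple being (header_size, compressed_page_size, uncompressed_page_size).
fn main() {
    let data_page_offset: i64 = 4;
    let pages = [(20i64, 100i64, 150i64), (20, 120, 180)];

    // SPEC: the totals are the per-page body sizes plus the page-header sizes.
    let total_compressed_size: i64 = pages.iter().map(|(h, c, _)| h + c).sum();
    let total_uncompressed_size: i64 = pages.iter().map(|(h, _, u)| h + u).sum();

    // `ColumnChunk::file_offset` is the first page's offset plus the total
    // compressed size, i.e. it points just past the last written page byte.
    let file_offset = data_page_offset + total_compressed_size;

    assert_eq!(total_compressed_size, 260);
    assert_eq!(total_uncompressed_size, 370);
    assert_eq!(file_offset, 264);
}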