Path: blob/main/crates/polars-parquet/src/parquet/write/row_group.rs
use std::io::Write;

#[cfg(feature = "async")]
use futures::AsyncWrite;
use polars_parquet_format::{ColumnChunk, RowGroup};

use super::column_chunk::write_column_chunk;
#[cfg(feature = "async")]
use super::column_chunk::write_column_chunk_async;
use super::page::{PageWriteSpec, is_data_page};
use super::{DynIter, DynStreamingIterator};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::metadata::{ColumnChunkMetadata, ColumnDescriptor};
use crate::parquet::page::CompressedPage;

pub struct ColumnOffsetsMetadata {
    pub dictionary_page_offset: Option<i64>,
    pub data_page_offset: Option<i64>,
}

impl ColumnOffsetsMetadata {
    pub fn from_column_chunk(column_chunk: &ColumnChunk) -> ColumnOffsetsMetadata {
        ColumnOffsetsMetadata {
            dictionary_page_offset: column_chunk
                .meta_data
                .as_ref()
                .map(|meta| meta.dictionary_page_offset)
                .unwrap_or(None),
            data_page_offset: column_chunk
                .meta_data
                .as_ref()
                .map(|meta| meta.data_page_offset),
        }
    }

    pub fn from_column_chunk_metadata(
        column_chunk_metadata: &ColumnChunkMetadata,
    ) -> ColumnOffsetsMetadata {
        ColumnOffsetsMetadata {
            dictionary_page_offset: column_chunk_metadata.dictionary_page_offset(),
            data_page_offset: Some(column_chunk_metadata.data_page_offset()),
        }
    }

    pub fn calc_row_group_file_offset(&self) -> Option<i64> {
        self.dictionary_page_offset
            .filter(|x| *x > 0_i64)
            .or(self.data_page_offset)
    }
}

fn compute_num_rows(columns: &[(ColumnChunk, Vec<PageWriteSpec>)]) -> ParquetResult<i64> {
    columns
        .first()
        .map(|(_, specs)| {
            let mut num_rows = 0;
            specs
                .iter()
                .filter(|x| is_data_page(x))
                .try_for_each(|spec| {
                    num_rows += spec.num_rows as i64;
                    ParquetResult::Ok(())
                })?;
            ParquetResult::Ok(num_rows)
        })
        .unwrap_or(Ok(0))
}

pub fn write_row_group<
    'a,
    W,
    E, // external error any of the iterators may emit
>(
    writer: &mut W,
    mut offset: u64,
    descriptors: &[ColumnDescriptor],
    columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
    ordinal: usize,
) -> ParquetResult<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
where
    W: Write,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let column_iter = descriptors.iter().zip(columns);

    let initial = offset;
    let columns = column_iter
        .map(|(descriptor, page_iter)| {
            let (column, page_specs, size) =
                write_column_chunk(writer, offset, descriptor, page_iter?)?;
            offset += size;
            Ok((column, page_specs))
        })
        .collect::<ParquetResult<Vec<_>>>()?;
    let bytes_written = offset - initial;

    let num_rows = compute_num_rows(&columns)?;

    // compute row group stats
    let file_offset = columns
        .first()
        .map(|(column_chunk, _)| {
            ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
        })
        .unwrap_or(None);

    let total_byte_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)
        .sum();
    let total_compressed_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)
        .sum();

    let (columns, specs) = columns.into_iter().unzip();

    Ok((
        RowGroup {
            columns,
            total_byte_size,
            num_rows,
            sorting_columns: None,
            file_offset,
            total_compressed_size: Some(total_compressed_size),
            ordinal: ordinal.try_into().ok(),
        },
        specs,
        bytes_written,
    ))
}
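// Illustrative note (not part of the original source): a minimal sketch of how
// `calc_row_group_file_offset` above resolves the row group's file offset, with
// made-up offsets. A column chunk starts at its dictionary page when one exists,
// otherwise at its first data page, so a positive dictionary offset wins:
//
//     let with_dict = ColumnOffsetsMetadata {
//         dictionary_page_offset: Some(4),
//         data_page_offset: Some(128),
//     };
//     assert_eq!(with_dict.calc_row_group_file_offset(), Some(4));
//
//     let without_dict = ColumnOffsetsMetadata {
//         dictionary_page_offset: None,
//         data_page_offset: Some(128),
//     };
//     assert_eq!(without_dict.calc_row_group_file_offset(), Some(128));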
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub async fn write_row_group_async<
    'a,
    W,
    E, // external error any of the iterators may emit
>(
    writer: &mut W,
    mut offset: u64,
    descriptors: &[ColumnDescriptor],
    columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,
    ordinal: usize,
) -> ParquetResult<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>
where
    W: AsyncWrite + Unpin + Send,
    ParquetError: From<E>,
    E: std::error::Error,
{
    let column_iter = descriptors.iter().zip(columns);

    let initial = offset;
    let mut columns = vec![];
    for (descriptor, page_iter) in column_iter {
        let (column, page_specs, size) =
            write_column_chunk_async(writer, offset, descriptor, page_iter?).await?;
        offset += size;
        columns.push((column, page_specs));
    }
    let bytes_written = offset - initial;

    let num_rows = compute_num_rows(&columns)?;

    // compute row group stats
    let file_offset = columns
        .first()
        .map(|(column_chunk, _)| {
            ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()
        })
        .unwrap_or(None);

    let total_byte_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)
        .sum();
    let total_compressed_size = columns
        .iter()
        .map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)
        .sum();

    let (columns, specs) = columns.into_iter().unzip();

    Ok((
        RowGroup {
            columns,
            total_byte_size,
            num_rows,
            sorting_columns: None,
            file_offset,
            total_compressed_size: Some(total_compressed_size),
            ordinal: ordinal.try_into().ok(),
        },
        specs,
        bytes_written,
    ))
}
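// Illustrative note (not part of the original source): a hedged sketch of how a
// caller might thread the running file offset across successive row groups,
// following the `offset`/`bytes_written` contract of `write_row_group`. The
// names `writer`, `descriptors`, `groups`, and `row_groups` are hypothetical
// caller-side variables; each `group` is assumed to be the
// `DynIter<Result<DynStreamingIterator<CompressedPage, E>, E>>` of one row
// group's column page iterators, and the file header is assumed to have
// already been written at `offset`:
//
//     for (ordinal, group) in groups.enumerate() {
//         let (row_group, _page_specs, bytes_written) =
//             write_row_group(&mut writer, offset, &descriptors, group, ordinal)?;
//         // the next row group starts where this one ended
//         offset += bytes_written;
//         row_groups.push(row_group);
//     }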