// Path: crates/polars-parquet/src/parquet/write/row_group.rs
use std::io::Write;12#[cfg(feature = "async")]3use futures::AsyncWrite;4use polars_parquet_format::{ColumnChunk, RowGroup};56use super::column_chunk::write_column_chunk;7#[cfg(feature = "async")]8use super::column_chunk::write_column_chunk_async;9use super::page::{PageWriteSpec, is_data_page};10use super::{DynIter, DynStreamingIterator};11use crate::parquet::error::{ParquetError, ParquetResult};12use crate::parquet::metadata::{ColumnChunkMetadata, ColumnDescriptor};13use crate::parquet::page::CompressedPage;1415pub struct ColumnOffsetsMetadata {16pub dictionary_page_offset: Option<i64>,17pub data_page_offset: Option<i64>,18}1920impl ColumnOffsetsMetadata {21pub fn from_column_chunk(column_chunk: &ColumnChunk) -> ColumnOffsetsMetadata {22ColumnOffsetsMetadata {23dictionary_page_offset: column_chunk24.meta_data25.as_ref()26.map(|meta| meta.dictionary_page_offset)27.unwrap_or(None),28data_page_offset: column_chunk29.meta_data30.as_ref()31.map(|meta| meta.data_page_offset),32}33}3435pub fn from_column_chunk_metadata(36column_chunk_metadata: &ColumnChunkMetadata,37) -> ColumnOffsetsMetadata {38ColumnOffsetsMetadata {39dictionary_page_offset: column_chunk_metadata.dictionary_page_offset(),40data_page_offset: Some(column_chunk_metadata.data_page_offset()),41}42}4344pub fn calc_row_group_file_offset(&self) -> Option<i64> {45self.dictionary_page_offset46.filter(|x| *x > 0_i64)47.or(self.data_page_offset)48}49}5051fn compute_num_rows(columns: &[(ColumnChunk, Vec<PageWriteSpec>)]) -> ParquetResult<i64> {52columns53.first()54.map(|(_, specs)| {55let mut num_rows = 0;56specs57.iter()58.filter(|x| is_data_page(x))59.try_for_each(|spec| {60num_rows += spec.num_rows as i64;61ParquetResult::Ok(())62})?;63ParquetResult::Ok(num_rows)64})65.unwrap_or(Ok(0))66}6768pub fn write_row_group<69'a,70W,71E, // external error any of the iterators may emit72>(73writer: &mut W,74num_rows: u64,75mut offset: u64,76descriptors: &[ColumnDescriptor],77columns: DynIter<'a, 
std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,78ordinal: usize,79) -> ParquetResult<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>80where81W: Write,82ParquetError: From<E>,83E: std::error::Error,84{85let column_iter = descriptors.iter().zip(columns);8687let initial = offset;88let columns = column_iter89.map(|(descriptor, page_iter)| {90let (column, page_specs, size) =91write_column_chunk(writer, offset, descriptor, page_iter?)?;92offset += size;93Ok((column, page_specs))94})95.collect::<ParquetResult<Vec<_>>>()?;96let bytes_written = offset - initial;9798let num_rows = if num_rows != u64::MAX99&& let Ok(v) = i64::try_from(num_rows)100{101if cfg!(debug_assertions) {102let inferred = compute_num_rows(&columns)?;103assert!(v == inferred || (columns.is_empty() && inferred == 0));104}105v106} else {107compute_num_rows(&columns)?108};109110// compute row group stats111let file_offset = columns112.first()113.map(|(column_chunk, _)| {114ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()115})116.unwrap_or(None);117118let total_byte_size = columns119.iter()120.map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)121.sum();122let total_compressed_size = columns123.iter()124.map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)125.sum();126127let (columns, specs) = columns.into_iter().unzip();128129Ok((130RowGroup {131columns,132total_byte_size,133num_rows,134sorting_columns: None,135file_offset,136total_compressed_size: Some(total_compressed_size),137ordinal: ordinal.try_into().ok(),138},139specs,140bytes_written,141))142}143144#[cfg(feature = "async")]145#[cfg_attr(docsrs, doc(cfg(feature = "async")))]146pub async fn write_row_group_async<147'a,148W,149E, // external error any of the iterators may emit150>(151writer: &mut W,152mut offset: u64,153descriptors: &[ColumnDescriptor],154columns: DynIter<'a, std::result::Result<DynStreamingIterator<'a, CompressedPage, E>, E>>,155ordinal: usize,156) -> 
ParquetResult<(RowGroup, Vec<Vec<PageWriteSpec>>, u64)>157where158W: AsyncWrite + Unpin + Send,159ParquetError: From<E>,160E: std::error::Error,161{162let column_iter = descriptors.iter().zip(columns);163164let initial = offset;165let mut columns = vec![];166for (descriptor, page_iter) in column_iter {167let (column, page_specs, size) =168write_column_chunk_async(writer, offset, descriptor, page_iter?).await?;169offset += size;170columns.push((column, page_specs));171}172let bytes_written = offset - initial;173174let num_rows = compute_num_rows(&columns)?;175176// compute row group stats177let file_offset = columns178.first()179.map(|(column_chunk, _)| {180ColumnOffsetsMetadata::from_column_chunk(column_chunk).calc_row_group_file_offset()181})182.unwrap_or(None);183184let total_byte_size = columns185.iter()186.map(|(c, _)| c.meta_data.as_ref().unwrap().total_uncompressed_size)187.sum();188let total_compressed_size = columns189.iter()190.map(|(c, _)| c.meta_data.as_ref().unwrap().total_compressed_size)191.sum();192193let (columns, specs) = columns.into_iter().unzip();194195Ok((196RowGroup {197columns,198total_byte_size,199num_rows: num_rows as i64,200sorting_columns: None,201file_offset,202total_compressed_size: Some(total_compressed_size),203ordinal: ordinal.try_into().ok(),204},205specs,206bytes_written,207))208}209210211