Path: blob/main/crates/polars-parquet/src/arrow/write/row_group.rs
use arrow::array::Array;
use arrow::datatypes::ArrowSchema;
use arrow::record_batch::RecordBatchT;
use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};

use super::{
    ColumnWriteOptions, DynIter, DynStreamingIterator, RowGroupIterColumns, SchemaDescriptor,
    WriteOptions, array_to_columns, to_parquet_schema,
};
use crate::parquet::FallibleStreamingIterator;
use crate::parquet::error::ParquetError;
use crate::parquet::schema::types::ParquetType;
use crate::parquet::write::Compressor;

/// Maps a [`RecordBatchT`] and parquet-specific options to a [`RowGroupIterColumns`] used to
/// write to parquet.
/// # Panics
/// Iff
/// * `column_options.len() != fields.len()` or
/// * `column_options.len() != chunk.arrays().len()`
pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(
    chunk: RecordBatchT<A>,
    column_options: Vec<ColumnWriteOptions>,
    fields: Vec<ParquetType>,
    options: WriteOptions,
) -> RowGroupIterColumns<'static, PolarsError> {
    assert_eq!(column_options.len(), fields.len());
    assert_eq!(column_options.len(), chunk.arrays().len());
    DynIter::new(
        chunk
            .into_arrays()
            .into_iter()
            .zip(fields)
            .zip(column_options)
            .flat_map(move |((array, type_), column_options)| {
                // A single Arrow array may expand into multiple parquet leaf
                // columns (e.g. for nested types), hence the `flat_map`.
                let encoded_columns =
                    array_to_columns(array, type_, &column_options, options).unwrap();
                encoded_columns
                    .into_iter()
                    .map(|encoded_pages| {
                        let pages = DynIter::new(
                            encoded_pages
                                .into_iter()
                                .map(|x| x.map_err(|e| ParquetError::oos(e.to_string()))),
                        );

                        // Compress pages lazily; compression failures surface
                        // as compute errors.
                        let compressed_pages = Compressor::new(pages, options.compression, vec![])
                            .map_err(to_compute_err);
                        Ok(DynStreamingIterator::new(compressed_pages))
                    })
                    .collect::<Vec<_>>()
            }),
    )
}

/// An iterator adapter that converts an iterator over [`RecordBatchT`] into an iterator
/// of row groups.
/// Use it to create an iterator consumable by the parquet API.
pub struct RowGroupIterator<
    A: AsRef<dyn Array> + 'static,
    I: Iterator<Item = PolarsResult<RecordBatchT<A>>>,
> {
    iter: I,
    options: WriteOptions,
    parquet_schema: SchemaDescriptor,
    column_options: Vec<ColumnWriteOptions>,
}

impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = PolarsResult<RecordBatchT<A>>>>
    RowGroupIterator<A, I>
{
    /// Creates a new [`RowGroupIterator`] from an iterator over [`RecordBatchT`].
    ///
    /// # Errors
    /// Iff
    /// * the Arrow schema can't be converted to a valid Parquet schema, or
    /// * the number of column options differs from the number of fields in the schema.
    pub fn try_new(
        iter: I,
        schema: &ArrowSchema,
        options: WriteOptions,
        column_options: Vec<ColumnWriteOptions>,
    ) -> PolarsResult<Self> {
        if column_options.len() != schema.len() {
            polars_bail!(InvalidOperation:
                "The number of column options must equal the number of fields".to_string(),
            )
        }
        let parquet_schema = to_parquet_schema(schema, &column_options)?;

        Ok(Self {
            iter,
            options,
            parquet_schema,
            column_options,
        })
    }

    /// Returns the [`SchemaDescriptor`] of the [`RowGroupIterator`].
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        &self.parquet_schema
    }
}

impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = PolarsResult<RecordBatchT<A>>>>
    Iterator for RowGroupIterator<A, I>
{
    type Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>;

    fn next(&mut self) -> Option<Self::Item> {
        let options = self.options;

        self.iter.next().map(|maybe_chunk| {
            let chunk = maybe_chunk?;
            // Re-check arity per chunk: the schema was validated in `try_new`,
            // but each incoming chunk must also match it.
            if self.column_options.len() != chunk.arrays().len() {
                polars_bail!(InvalidOperation:
                    "The number of arrays in the chunk must equal the number of fields in the schema"
                )
            };
            let column_options = self.column_options.clone();
            Ok(row_group_iter(
                chunk,
                column_options,
                self.parquet_schema.fields().to_vec(),
                options,
            ))
        })
    }
}
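
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this file). It shows how the
// `RowGroupIterator` above could feed a parquet file writer. The `FileWriter`
// API (`try_new` / `write` / `end`) mirrors the arrow2-style writer this code
// derives from and is an assumption here, as are the import path and the
// helper name `write_chunks`.
//
// use std::fs::File;
//
// use super::FileWriter; // assumed re-export; adjust to the actual path
//
// fn write_chunks<A, I>(
//     path: &str,
//     schema: &ArrowSchema,
//     chunks: I,
//     options: WriteOptions,
//     column_options: Vec<ColumnWriteOptions>,
// ) -> PolarsResult<()>
// where
//     A: AsRef<dyn Array> + 'static + Send + Sync,
//     I: Iterator<Item = PolarsResult<RecordBatchT<A>>>,
// {
//     // Validates the schema/option arity once, up front.
//     let row_groups = RowGroupIterator::try_new(chunks, schema, options, column_options)?;
//     let file = File::create(path).map_err(to_compute_err)?;
//     let mut writer = FileWriter::try_new(file, schema.clone(), options)?;
//     // Each item is itself an iterator over that row group's compressed columns.
//     for group in row_groups {
//         writer.write(group?)?;
//     }
//     writer.end(None)?;
//     Ok(())
// }
// ---------------------------------------------------------------------------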