Path: blob/main/crates/polars-parquet/src/arrow/write/row_group.rs
8480 views
use arrow::array::Array;1use arrow::datatypes::ArrowSchema;2use arrow::record_batch::RecordBatchT;3use polars_buffer::Buffer;4use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};56use super::{7DynIter, DynStreamingIterator, Encoding, RowGroupIterColumns, SchemaDescriptor, WriteOptions,8array_to_columns, to_parquet_schema,9};10use crate::parquet::FallibleStreamingIterator;11use crate::parquet::error::ParquetError;12use crate::parquet::schema::types::ParquetType;13use crate::parquet::write::Compressor;1415/// Maps a [`RecordBatchT`] and parquet-specific options to an [`RowGroupIterColumns`] used to16/// write to parquet17/// # Panics18/// Iff19/// * `encodings.len() != fields.len()` or20/// * `encodings.len() != chunk.arrays().len()`21pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(22chunk: RecordBatchT<A>,23encodings: Buffer<Vec<Encoding>>,24fields: Vec<ParquetType>,25options: WriteOptions,26) -> RowGroupIterColumns<'static, PolarsError> {27assert_eq!(encodings.len(), fields.len());28assert_eq!(encodings.len(), chunk.arrays().len());29DynIter::new(30chunk31.into_arrays()32.into_iter()33.zip(fields)34.enumerate()35.flat_map(move |(i, (array, type_))| {36let encoding = encodings[i].as_slice();37let encoded_columns = array_to_columns(array, type_, options, encoding).unwrap();38encoded_columns39.into_iter()40.map(|encoded_pages| {41let pages = encoded_pages;4243let pages = DynIter::new(44pages45.into_iter()46.map(|x| x.map_err(|e| ParquetError::oos(e.to_string()))),47);4849let compressed_pages = Compressor::new(pages, options.compression, vec![])50.map_err(to_compute_err);51Ok(DynStreamingIterator::new(compressed_pages))52})53.collect::<Vec<_>>()54}),55)56}5758/// An iterator adapter that converts an iterator over [`RecordBatchT`] into an iterator59/// of row groups.60/// Use it to create an iterator consumable by the parquet's API.61pub struct RowGroupIterator<62A: AsRef<dyn Array> + 'static,63I: Iterator<Item = PolarsResult<RecordBatchT<A>>>,64> {65iter: I,66options: WriteOptions,67parquet_schema: SchemaDescriptor,68encodings: Buffer<Vec<Encoding>>,69}7071impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = PolarsResult<RecordBatchT<A>>>>72RowGroupIterator<A, I>73{74/// Creates a new [`RowGroupIterator`] from an iterator over [`RecordBatchT`].75///76/// # Errors77/// Iff78/// * the Arrow schema can't be converted to a valid Parquet schema.79/// * the length of the encodings is different from the number of fields in schema80pub fn try_new(81iter: I,82schema: &ArrowSchema,83options: WriteOptions,84encodings: Buffer<Vec<Encoding>>,85) -> PolarsResult<Self> {86if encodings.len() != schema.len() {87polars_bail!(InvalidOperation:88"The number of encodings must equal the number of fields",89)90}91let parquet_schema = to_parquet_schema(schema)?;9293Ok(Self {94iter,95options,96parquet_schema,97encodings,98})99}100101/// Returns the [`SchemaDescriptor`] of the [`RowGroupIterator`].102pub fn parquet_schema(&self) -> &SchemaDescriptor {103&self.parquet_schema104}105}106107impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = PolarsResult<RecordBatchT<A>>>>108Iterator for RowGroupIterator<A, I>109{110type Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>;111112fn next(&mut self) -> Option<Self::Item> {113let options = self.options;114115self.iter.next().map(|maybe_chunk| {116let chunk = maybe_chunk?;117if self.encodings.len() != chunk.arrays().len() {118polars_bail!(InvalidOperation:119"The number of arrays in the chunk must equal the number of fields in the schema"120)121};122let encodings = self.encodings.clone();123Ok(row_group_iter(124chunk,125encodings,126self.parquet_schema.fields().to_vec(),127options,128))129})130}131}132133134