// Path: blob/main/crates/polars-io/src/parquet/write/batched_writer.rs
use std::io::Write;
use std::sync::Mutex;

use arrow::record_batch::RecordBatch;
use polars_core::POOL;
use polars_core::prelude::*;
use polars_parquet::read::{ParquetError, fallible_streaming_iterator};
use polars_parquet::write::{
    ColumnWriteOptions, CompressedPage, Compressor, DynIter, DynStreamingIterator,
    FallibleStreamingIterator, FileWriter, Page, ParquetType, RowGroupIterColumns,
    SchemaDescriptor, WriteOptions, array_to_columns, schema_to_metadata_key,
};
use rayon::prelude::*;

use super::{KeyValueMetadata, ParquetMetadataContext};

/// Incremental parquet writer: each `DataFrame` batch handed to it is written
/// as one or more parquet row groups; the file footer is written by
/// [`finish`](BatchedWriter::finish).
pub struct BatchedWriter<W: Write> {
    // A mutex so that streaming engine can get concurrent read access to
    // compress pages.
    //
    // @TODO: Remove mutex when old streaming engine is removed
    pub(super) writer: Mutex<FileWriter<W>>,
    // Parquet schema used when serializing batches.
    // NOTE(review): `new` initializes this to an *empty* descriptor; presumably
    // it is populated elsewhere before `write_batch`/`encode_and_compress` are
    // called — confirm against callers.
    // @TODO: Remove when old streaming engine is removed
    pub(super) parquet_schema: SchemaDescriptor,
    // Per-column write options (encoding, statistics, ...), parallel to the schema fields.
    pub(super) column_options: Vec<ColumnWriteOptions>,
    // File-level write options (compression, data page size, ...).
    pub(super) options: WriteOptions,
    // When true, columns of a row group are encoded/compressed on the rayon pool.
    pub(super) parallel: bool,
    // Optional user key/value metadata to embed in the footer.
    pub(super) key_value_metadata: Option<KeyValueMetadata>,
}

impl<W: Write> BatchedWriter<W> {
    /// Create a new batched writer wrapping an already-constructed `FileWriter`.
    ///
    /// Note: `parquet_schema` starts out empty (see the field comment above).
    pub fn new(
        writer: Mutex<FileWriter<W>>,
        column_options: Vec<ColumnWriteOptions>,
        options: WriteOptions,
        parallel: bool,
        key_value_metadata: Option<KeyValueMetadata>,
    ) -> Self {
        Self {
            writer,
            parquet_schema: SchemaDescriptor::new(PlSmallStr::EMPTY, vec![]),
            column_options,
            options,
            parallel,
            key_value_metadata,
        }
    }

    /// Eagerly encode and compress the chunks of `df` into row-group column
    /// iterators, without writing them. Empty chunks are skipped.
    ///
    /// This separates (CPU-bound) compute from IO; the results can later be
    /// written with [`write_row_groups`](Self::write_row_groups).
    pub fn encode_and_compress<'a>(
        &'a self,
        df: &'a DataFrame,
    ) -> impl Iterator<Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>> + 'a {
        let rb_iter = df.iter_chunks(CompatLevel::newest(), false);
        rb_iter.filter_map(move |batch| match batch.len() {
            // Skip zero-length chunks: they would produce empty row groups.
            0 => None,
            _ => {
                let row_group = create_eager_serializer(
                    batch,
                    self.parquet_schema.fields(),
                    self.column_options.as_ref(),
                    self.options,
                );

                Some(row_group)
            },
        })
    }

    /// Write a batch to the parquet writer.
    ///
    /// Each non-empty chunk of `df` becomes one row group.
    ///
    /// # Panics
    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
        let row_group_iter = prepare_rg_iter(
            df,
            &self.parquet_schema,
            &self.column_options,
            self.options,
            self.parallel,
        );
        // Lock before looping so that order is maintained under contention.
        let mut writer = self.writer.lock().unwrap();
        for group in row_group_iter {
            writer.write(group?)?;
        }
        Ok(())
    }

    /// The parquet schema of the underlying `FileWriter` (not the possibly-empty
    /// `self.parquet_schema` field). Requires `&mut self` to access the mutex
    /// without locking.
    pub fn parquet_schema(&mut self) -> &SchemaDescriptor {
        let writer = self.writer.get_mut().unwrap();
        writer.parquet_schema()
    }

    /// Write one row group from already-compressed pages; `rg` holds one
    /// `Vec<CompressedPage>` per column.
    pub fn write_row_group(&mut self, rg: &[Vec<CompressedPage>]) -> PolarsResult<()> {
        let writer = self.writer.get_mut().unwrap();
        // Adapt the page slices into the fallible streaming iterators the
        // `FileWriter` API expects; no copying of page data happens here.
        let rg = DynIter::new(rg.iter().map(|col_pages| {
            Ok(DynStreamingIterator::new(
                fallible_streaming_iterator::convert(col_pages.iter().map(PolarsResult::Ok)),
            ))
        }));
        writer.write(rg)?;
        Ok(())
    }

    /// Access the mutex-guarded `FileWriter`, e.g. for concurrent use by the
    /// streaming engine.
    pub fn get_writer(&self) -> &Mutex<FileWriter<W>> {
        &self.writer
    }

    /// Write row groups previously produced by
    /// [`encode_and_compress`](Self::encode_and_compress).
    pub fn write_row_groups(
        &self,
        rgs: Vec<RowGroupIterColumns<'static, PolarsError>>,
    ) -> PolarsResult<()> {
        // Lock before looping so that order is maintained.
        let mut writer = self.writer.lock().unwrap();
        for group in rgs {
            writer.write(group)?;
        }
        Ok(())
    }

    /// Writes the footer of the parquet file. Returns the total size of the file.
    pub fn finish(&self) -> PolarsResult<u64> {
        let mut writer = self.writer.lock().unwrap();

        // Collect user-provided key/value metadata (if any) and make sure the
        // serialized arrow schema entry is present exactly once, in front.
        let key_value_metadata = self
            .key_value_metadata
            .as_ref()
            .map(|meta| {
                let arrow_schema = schema_to_metadata_key(writer.schema(), &self.column_options);
                let ctx = ParquetMetadataContext {
                    // NOTE(review): `schema_to_metadata_key` appears to always set
                    // `value` — this unwrap relies on that; confirm.
                    arrow_schema: arrow_schema.value.as_ref().unwrap(),
                };
                let mut out = meta.collect(ctx)?;
                // Only prepend the arrow schema if the user metadata didn't
                // already supply an entry under the same key.
                if !out.iter().any(|kv| kv.key == arrow_schema.key) {
                    out.insert(0, arrow_schema);
                }
                PolarsResult::Ok(out)
            })
            .transpose()?;

        let size = writer.end(key_value_metadata, &self.column_options)?;
        Ok(size)
    }
}

// Lazily serialize the chunks of `df` into row groups, skipping empty chunks.
// Note that the df should be rechunked
fn prepare_rg_iter<'a>(
    df: &'a DataFrame,
    parquet_schema: &'a SchemaDescriptor,
    column_options: &'a [ColumnWriteOptions],
    options: WriteOptions,
    parallel: bool,
) -> impl Iterator<Item = PolarsResult<RowGroupIterColumns<'static, PolarsError>>> + 'a {
    let rb_iter = df.iter_chunks(CompatLevel::newest(), false);
    rb_iter.filter_map(move |batch| match batch.len() {
        // Skip zero-length chunks: they would produce empty row groups.
        0 => None,
        _ => {
            let row_group = create_serializer(
                batch,
                parquet_schema.fields(),
                column_options,
                options,
                parallel,
            );

            Some(row_group)
        },
    })
}

// Wrap each column's encoded-page iterator in a `Compressor` so pages are
// compressed as they are pulled (streaming, one page in flight per column).
fn pages_iter_to_compressor(
    encoded_columns: Vec<DynIter<'static, PolarsResult<Page>>>,
    options: WriteOptions,
) -> Vec<PolarsResult<DynStreamingIterator<'static, CompressedPage, PolarsError>>> {
    encoded_columns
        .into_iter()
        .map(|encoded_pages| {
            // iterator over pages
            let pages = DynStreamingIterator::new(
                Compressor::new_from_vec(
                    encoded_pages.map(|result| {
                        // Funnel encoding errors through `ParquetError` so they
                        // surface as `PolarsError` on the read side of the iterator.
                        result.map_err(|e| {
                            ParquetError::FeatureNotSupported(format!("reraised in polars: {e}",))
                        })
                    }),
                    options.compression,
                    vec![],
                )
                .map_err(PolarsError::from),
            );

            Ok(pages)
        })
        .collect::<Vec<_>>()
}

// Encode one arrow array into parquet pages (one entry per leaf column for
// nested types) and attach a streaming compressor to each.
//
// NOTE(review): the `unwrap()` below panics if `array_to_columns` fails;
// presumably encoding errors here are considered bugs — confirm.
fn array_to_pages_iter(
    array: &ArrayRef,
    type_: &ParquetType,
    column_options: &ColumnWriteOptions,
    options: WriteOptions,
) -> Vec<PolarsResult<DynStreamingIterator<'static, CompressedPage, PolarsError>>> {
    let encoded_columns = array_to_columns(array, type_.clone(), column_options, options).unwrap();
    pages_iter_to_compressor(encoded_columns, options)
}

// Build a lazily-compressed row group from one record batch; when `parallel`
// is set, columns are encoded on the shared rayon pool.
fn create_serializer(
    batch: RecordBatch,
    fields: &[ParquetType],
    column_options: &[ColumnWriteOptions],
    options: WriteOptions,
    parallel: bool,
) -> PolarsResult<RowGroupIterColumns<'static, PolarsError>> {
    // Per-column serialization closure shared by both branches below.
    let func = move |((array, type_), column_options): (
        (&ArrayRef, &ParquetType),
        &ColumnWriteOptions,
    )| { array_to_pages_iter(array, type_, column_options, options) };

    let columns = if parallel {
        // `POOL.install` runs the parallel iterator on polars' global rayon pool.
        POOL.install(|| {
            batch
                .columns()
                .par_iter()
                .zip(fields)
                .zip(column_options)
                .flat_map(func)
                .collect::<Vec<_>>()
        })
    } else {
        batch
            .columns()
            .iter()
            .zip(fields)
            .zip(column_options)
            .flat_map(func)
            .collect::<Vec<_>>()
    };

    let row_group = DynIter::new(columns.into_iter());

    Ok(row_group)
}

/// This serializer encodes and compresses all eagerly in memory.
/// Used for separating compute from IO.
fn create_eager_serializer(
    batch: RecordBatch,
    fields: &[ParquetType],
    column_options: &[ColumnWriteOptions],
    options: WriteOptions,
) -> PolarsResult<RowGroupIterColumns<'static, PolarsError>> {
    // Per-column serialization closure (same shape as in `create_serializer`).
    let func = move |((array, type_), column_options): (
        (&ArrayRef, &ParquetType),
        &ColumnWriteOptions,
    )| { array_to_pages_iter(array, type_, column_options, options) };

    let columns = batch
        .columns()
        .iter()
        .zip(fields)
        .zip(column_options)
        .flat_map(func)
        .collect::<Vec<_>>();

    let row_group = DynIter::new(columns.into_iter());

    Ok(row_group)
}