Path: blob/main/crates/polars-stream/src/nodes/io_sinks/parquet.rs
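//! Streaming Parquet sink node.
//!
//! Incoming morsels are buffered into row groups, the columns of each row
//! group are encoded and compressed in parallel, linearized back into row
//! group order, and handed to a dedicated IO task that writes them to the
//! target file.
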
use std::cmp::Reverse;
use std::io::BufWriter;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

use polars_core::prelude::{ArrowSchema, CompatLevel};
use polars_core::schema::SchemaRef;
use polars_error::PolarsResult;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::write::BatchedWriter;
use polars_io::prelude::{ParquetWriteOptions, get_column_write_options};
use polars_io::schema_to_arrow_checked;
use polars_parquet::parquet::error::ParquetResult;
use polars_parquet::read::ParquetError;
use polars_parquet::write::{
    ColumnWriteOptions, CompressedPage, Compressor, FileWriter, SchemaDescriptor, Version,
    WriteOptions, array_to_columns, to_parquet_schema,
};
use polars_plan::dsl::{SinkOptions, SinkTarget};
use polars_utils::priority::Priority;
use polars_utils::relaxed_cell::RelaxedCell;

use super::metrics::WriteMetrics;
use super::{
    DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE, DEFAULT_SINK_LINEARIZER_BUFFER_SIZE, SinkInputPort,
    SinkNode, buffer_and_distribute_columns_task,
};
use crate::async_executor::spawn;
use crate::async_primitives::connector::{Receiver, connector};
use crate::async_primitives::distributor_channel::distributor_channel;
use crate::async_primitives::linearizer::Linearizer;
use crate::execute::StreamingExecutionState;
use crate::nodes::io_sinks::phase::PhaseOutcome;
use crate::nodes::{JoinHandle, TaskPriority};

pub struct ParquetSinkNode {
    target: SinkTarget,

    input_schema: SchemaRef,
    sink_options: SinkOptions,
    write_options: ParquetWriteOptions,

    parquet_schema: SchemaDescriptor,
    arrow_schema: ArrowSchema,
    column_options: Vec<ColumnWriteOptions>,
    cloud_options: Option<CloudOptions>,

    file_size: Arc<RelaxedCell<u64>>,
    metrics: Arc<Mutex<Option<WriteMetrics>>>,

    io_tx: Option<crate::async_primitives::connector::Sender<Vec<Vec<CompressedPage>>>>,
    io_task: Option<tokio_util::task::AbortOnDropHandle<PolarsResult<()>>>,
}

impl ParquetSinkNode {
    pub fn new(
        input_schema: SchemaRef,
        target: SinkTarget,
        sink_options: SinkOptions,
        write_options: &ParquetWriteOptions,
        cloud_options: Option<CloudOptions>,
        collect_metrics: bool,
    ) -> PolarsResult<Self> {
        let schema = schema_to_arrow_checked(&input_schema, CompatLevel::newest(), "parquet")?;
        let column_options: Vec<ColumnWriteOptions> =
            get_column_write_options(&schema, &write_options.field_overwrites);
        let parquet_schema = to_parquet_schema(&schema, &column_options)?;
        let metrics = Arc::new(Mutex::new(collect_metrics.then(|| {
            WriteMetrics::new(target.to_display_string(), &input_schema)
        })));

        Ok(Self {
            target,

            input_schema,
            sink_options,
            write_options: write_options.clone(),

            parquet_schema,
            arrow_schema: schema,
            column_options,
            cloud_options,

            file_size: Arc::default(),
            metrics,

            io_tx: None,
            io_task: None,
        })
    }
}

// 512 ^ 2
const DEFAULT_ROW_GROUP_SIZE: usize = 1 << 18;
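
// Lifecycle: `initialize` spawns the IO task exactly once, `spawn_sink` wires
// up the buffer, encode, and collect tasks, and `finalize` drops the sender
// half of the connector (signalling end-of-stream to the IO task) before
// awaiting its completion.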
impl SinkNode for ParquetSinkNode {
    fn name(&self) -> &str {
        "parquet-sink"
    }

    fn is_sink_input_parallel(&self) -> bool {
        false
    }
    fn do_maintain_order(&self) -> bool {
        self.sink_options.maintain_order
    }

    fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {
        // Collect task -> IO task
        let (io_tx, mut io_rx) = connector::<Vec<Vec<CompressedPage>>>();

        // IO task.
        //
        // Task that does the actual write to the target file. It is important that this is only
        // spawned once.
        let target = self.target.clone();
        let sink_options = self.sink_options.clone();
        let cloud_options = self.cloud_options.clone();
        let write_options = self.write_options.clone();
        let arrow_schema = self.arrow_schema.clone();
        let parquet_schema = self.parquet_schema.clone();
        let column_options = self.column_options.clone();
        let output_file_size = self.file_size.clone();
        let io_task = polars_io::pl_async::get_runtime().spawn(async move {
            let mut file = target
                .open_into_writeable_async(&sink_options, cloud_options.as_ref())
                .await?;

            let writer = BufWriter::new(&mut *file);
            let key_value_metadata = write_options.key_value_metadata;
            let write_options = WriteOptions {
                statistics: write_options.statistics,
                compression: write_options.compression.into(),
                version: Version::V1,
                data_page_size: write_options.data_page_size,
            };
            let file_writer = Mutex::new(FileWriter::new_with_parquet_schema(
                writer,
                arrow_schema,
                parquet_schema,
                write_options,
            ));
            let mut writer = BatchedWriter::new(
                file_writer,
                column_options,
                write_options,
                false,
                key_value_metadata,
            );

            let num_parquet_columns = writer.parquet_schema().leaves().len();
            while let Ok(current_row_group) = io_rx.recv().await {
                // @TODO: At the moment this is a sync write, which is not ideal because we can
                // only have so many blocking threads in the tokio threadpool.
                assert_eq!(current_row_group.len(), num_parquet_columns);
                writer.write_row_group(&current_row_group)?;
            }

            let file_size = writer.finish()?;
            drop(writer);

            file.sync_on_close(sink_options.sync_on_close)?;
            file.close()?;

            output_file_size.store(file_size);
            PolarsResult::Ok(())
        });

        self.io_tx = Some(io_tx);
        self.io_task = Some(tokio_util::task::AbortOnDropHandle::new(io_task));

        Ok(())
    }
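
    // Data flow set up by `spawn_sink` below (N = state.num_pipelines):
    //
    //   sink input --> buffer task --distributor--> N encode tasks
    //                                                     |
    //                                                linearizer
    //                                                     |
    //   IO task <---connector--- collect task <-----------+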
    fn spawn_sink(
        &mut self,
        recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,
        state: &StreamingExecutionState,
        join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
    ) {
        // Collect task -> IO task
        let mut io_tx = self
            .io_tx
            .take()
            .expect("not initialized / spawn called more than once");
        // Buffer task -> Encode tasks
        let (dist_tx, dist_rxs) =
            distributor_channel(state.num_pipelines, *DEFAULT_SINK_DISTRIBUTOR_BUFFER_SIZE);
        // Encode tasks -> Collect task
        let (mut lin_rx, lin_txs) =
            Linearizer::new(state.num_pipelines, *DEFAULT_SINK_LINEARIZER_BUFFER_SIZE);

        let write_options = &self.write_options;

        let options = WriteOptions {
            statistics: write_options.statistics,
            compression: write_options.compression.into(),
            version: Version::V1,
            data_page_size: write_options.data_page_size,
        };

        // Buffer task.
        join_handles.push(buffer_and_distribute_columns_task(
            recv_port_rx,
            dist_tx,
            write_options
                .row_group_size
                .unwrap_or(DEFAULT_ROW_GROUP_SIZE),
            self.input_schema.clone(),
            self.metrics.clone(),
        ));

        // Encode tasks.
        //
        // Each task encodes the columns into their corresponding Parquet encoding.
        join_handles.extend(
            dist_rxs
                .into_iter()
                .zip(lin_txs)
                .map(|(mut dist_rx, mut lin_tx)| {
                    let parquet_schema = self.parquet_schema.clone();
                    let column_options = self.column_options.clone();

                    spawn(TaskPriority::High, async move {
                        while let Ok((rg_idx, col_idx, column)) = dist_rx.recv().await {
                            let type_ = &parquet_schema.fields()[col_idx];
                            let column_options = &column_options[col_idx];

                            let array = column.as_materialized_series().rechunk();
                            let array = array.to_arrow(0, CompatLevel::newest());

                            // @TODO: This causes all struct fields to be handled on a single
                            // thread. It would be preferable to split the encoding among
                            // multiple threads.

                            // @NOTE: Since one Polars column might contain multiple Parquet
                            // columns (when it has a struct datatype), we return a
                            // Vec<Vec<CompressedPage>>.

                            // Array -> Parquet pages.
                            let encoded_columns =
                                array_to_columns(array, type_.clone(), column_options, options)?;

                            // Compress the pages.
                            let compressed_pages = encoded_columns
                                .into_iter()
                                .map(|encoded_pages| {
                                    Compressor::new_from_vec(
                                        encoded_pages.map(|result| {
                                            result.map_err(|e| {
                                                ParquetError::FeatureNotSupported(format!(
                                                    "reraised in polars: {e}",
                                                ))
                                            })
                                        }),
                                        options.compression,
                                        vec![],
                                    )
                                    .collect::<ParquetResult<Vec<_>>>()
                                })
                                .collect::<ParquetResult<Vec<_>>>()?;

                            if lin_tx
                                .insert(Priority(Reverse(rg_idx), (col_idx, compressed_pages)))
                                .await
                                .is_err()
                            {
                                return Ok(());
                            }
                        }

                        PolarsResult::Ok(())
                    })
                }),
        );

        // Collect task.
        //
        // Collects all the encoded data and packs it together for the IO task to write it.
        let input_schema = self.input_schema.clone();
        let num_parquet_columns = self.parquet_schema.leaves().len();
        join_handles.push(spawn(TaskPriority::High, async move {
            struct Current {
                seq: usize,
                num_columns_seen: usize,
                columns: Vec<Option<Vec<Vec<CompressedPage>>>>,
            }

            let mut current = Current {
                seq: 0,
                num_columns_seen: 0,
                columns: (0..input_schema.len()).map(|_| None).collect(),
            };

            // Linearize from all the Encoder tasks.
            while let Some(Priority(Reverse(seq), (i, compressed_pages))) = lin_rx.get().await {
                if current.num_columns_seen == 0 {
                    current.seq = seq;
                }

                debug_assert_eq!(current.seq, seq);
                debug_assert!(current.columns[i].is_none());
                current.columns[i] = Some(compressed_pages);
                current.num_columns_seen += 1;

                if current.num_columns_seen == input_schema.len() {
                    // @Optimize: Keep track of these sizes so we can correctly preallocate
                    // them.
                    let mut current_row_group: Vec<Vec<CompressedPage>> =
                        Vec::with_capacity(num_parquet_columns);
                    for column in current.columns.iter_mut() {
                        current_row_group.extend(column.take().unwrap());
                    }

                    if io_tx.send(current_row_group).await.is_err() {
                        return Ok(());
                    }
                    current.num_columns_seen = 0;
                }
            }

            Ok(())
        }));
    }

    fn get_metrics(&self) -> PolarsResult<Option<WriteMetrics>> {
        let file_size = self.file_size.load();
        let metrics = self.metrics.lock().unwrap().take();

        Ok(metrics.map(|mut m| {
            m.file_size = file_size;
            m
        }))
    }

    fn finalize(
        &mut self,
        _state: &StreamingExecutionState,
    ) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {
        // If we were never spawned, we need to make sure that the `tx` is taken. This signals to
        // the IO task that it is done and prevents deadlocks.
        drop(self.io_tx.take());

        let io_task = self
            .io_task
            .take()
            .expect("not initialized / finish called more than once");

        // Wait for the IO task to complete.
        Some(Box::pin(async move {
            io_task
                .await
                .unwrap_or_else(|e| Err(std::io::Error::from(e).into()))
        }))
    }
}
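
// A minimal, self-contained sketch of the ordering trick used between the
// encode and collect tasks above: wrapping the row-group index in `Reverse`
// makes the smallest sequence number the highest priority, so the collect
// task consumes row groups in order. This is an illustration with std types
// only; the real code goes through `polars_utils::priority::Priority` and
// the `Linearizer`.
#[cfg(test)]
mod row_group_order_sketch {
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;

    #[test]
    fn lowest_row_group_index_pops_first() {
        // `BinaryHeap` is a max-heap; `Reverse` flips the comparison on the
        // row-group index, mirroring `Priority(Reverse(rg_idx), ..)` above.
        let mut heap = BinaryHeap::new();
        for (rg_idx, payload) in [(2usize, "rg2"), (0, "rg0"), (1, "rg1")] {
            heap.push(Reverse((rg_idx, payload)));
        }

        let order: Vec<usize> = std::iter::from_fn(|| heap.pop())
            .map(|Reverse((rg_idx, _))| rg_idx)
            .collect();
        assert_eq!(order, vec![0, 1, 2]);
    }
}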