// Path: blob/main/crates/polars-stream/src/nodes/io_sinks/csv.rs
// 6939 views
use std::cmp::Reverse;1use std::pin::Pin;23use polars_core::frame::DataFrame;4use polars_core::schema::SchemaRef;5use polars_error::PolarsResult;6use polars_io::SerWriter;7use polars_io::cloud::CloudOptions;8use polars_io::prelude::{CsvWriter, CsvWriterOptions};9use polars_plan::dsl::{SinkOptions, SinkTarget};10use polars_utils::priority::Priority;1112use super::{SinkInputPort, SinkNode};13use crate::async_executor::spawn;14use crate::async_primitives::connector::{Receiver, Sender, connector};15use crate::async_primitives::linearizer::Linearizer;16use crate::execute::StreamingExecutionState;17use crate::morsel::MorselSeq;18use crate::nodes::io_sinks::parallelize_receive_task;19use crate::nodes::io_sinks::phase::PhaseOutcome;20use crate::nodes::{JoinHandle, TaskPriority};2122type IOSend = Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>;2324pub struct CsvSinkNode {25target: SinkTarget,26schema: SchemaRef,27sink_options: SinkOptions,28write_options: CsvWriterOptions,29cloud_options: Option<CloudOptions>,3031io_tx: Option<Sender<IOSend>>,32io_task: Option<tokio_util::task::AbortOnDropHandle<PolarsResult<()>>>,33}34impl CsvSinkNode {35pub fn new(36target: SinkTarget,37schema: SchemaRef,38sink_options: SinkOptions,39write_options: CsvWriterOptions,40cloud_options: Option<CloudOptions>,41) -> Self {42Self {43target,44schema,45sink_options,46write_options,47cloud_options,4849io_tx: None,50io_task: None,51}52}53}5455impl SinkNode for CsvSinkNode {56fn name(&self) -> &str {57"csv-sink"58}5960fn is_sink_input_parallel(&self) -> bool {61true62}6364fn initialize(&mut self, _state: &StreamingExecutionState) -> PolarsResult<()> {65let (io_tx, mut io_rx) = connector::<Linearizer<Priority<Reverse<MorselSeq>, Vec<u8>>>>();6667// IO task.68//69// Task that will actually do write to the target file.70let target = self.target.clone();71let sink_options = self.sink_options.clone();72let schema = self.schema.clone();73let options = self.write_options.clone();74let cloud_options = 
self.cloud_options.clone();75let io_task = polars_io::pl_async::get_runtime().spawn(async move {76use tokio::io::AsyncWriteExt;7778let mut file = target79.open_into_writeable_async(&sink_options, cloud_options.as_ref())80.await?;8182// Write the header83if options.include_header || options.include_bom {84let mut writer = CsvWriter::new(&mut *file)85.include_bom(options.include_bom)86.include_header(options.include_header)87.with_separator(options.serialize_options.separator)88.with_line_terminator(options.serialize_options.line_terminator.clone())89.with_quote_char(options.serialize_options.quote_char)90.with_datetime_format(options.serialize_options.datetime_format.clone())91.with_date_format(options.serialize_options.date_format.clone())92.with_time_format(options.serialize_options.time_format.clone())93.with_float_scientific(options.serialize_options.float_scientific)94.with_float_precision(options.serialize_options.float_precision)95.with_decimal_comma(options.serialize_options.decimal_comma)96.with_null_value(options.serialize_options.null.clone())97.with_quote_style(options.serialize_options.quote_style)98.n_threads(1) // Disable rayon parallelism99.batched(&schema)?;100writer.write_batch(&DataFrame::empty_with_schema(&schema))?;101}102103let mut file = file.try_into_async_writeable()?;104105while let Ok(mut lin_rx) = io_rx.recv().await {106while let Some(Priority(_, buffer)) = lin_rx.get().await {107file.write_all(&buffer).await?;108}109}110111file.sync_on_close(sink_options.sync_on_close).await?;112file.close().await?;113114PolarsResult::Ok(())115});116117self.io_tx = Some(io_tx);118self.io_task = Some(tokio_util::task::AbortOnDropHandle::new(io_task));119120Ok(())121}122123fn spawn_sink(124&mut self,125recv_port_rx: Receiver<(PhaseOutcome, SinkInputPort)>,126state: &StreamingExecutionState,127join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,128) {129let io_tx = self130.io_tx131.take()132.expect("not initialized / spawn called more than once");133let 
pass_rxs = parallelize_receive_task(134join_handles,135recv_port_rx,136state.num_pipelines,137self.sink_options.maintain_order,138io_tx,139);140141// 16MB142const DEFAULT_ALLOCATION_SIZE: usize = 1 << 24;143144// Encode task.145//146// Task encodes the columns into their corresponding CSV encoding.147join_handles.extend(pass_rxs.into_iter().map(|mut pass_rx| {148let schema = self.schema.clone();149let options = self.write_options.clone();150151spawn(TaskPriority::High, async move {152// Amortize the allocations over time. If we see that we need to do way larger153// allocations, we adjust to that over time.154let mut allocation_size = DEFAULT_ALLOCATION_SIZE;155let options = options.clone();156157while let Ok((mut rx, mut lin_tx)) = pass_rx.recv().await {158while let Ok(morsel) = rx.recv().await {159let (df, seq, _, consume_token) = morsel.into_inner();160161let mut buffer = Vec::with_capacity(allocation_size);162let mut writer = CsvWriter::new(&mut buffer)163.include_bom(false) // Handled once in the IO task.164.include_header(false) // Handled once in the IO task.165.with_separator(options.serialize_options.separator)166.with_line_terminator(options.serialize_options.line_terminator.clone())167.with_quote_char(options.serialize_options.quote_char)168.with_datetime_format(options.serialize_options.datetime_format.clone())169.with_date_format(options.serialize_options.date_format.clone())170.with_time_format(options.serialize_options.time_format.clone())171.with_float_scientific(options.serialize_options.float_scientific)172.with_float_precision(options.serialize_options.float_precision)173.with_decimal_comma(options.serialize_options.decimal_comma)174.with_null_value(options.serialize_options.null.clone())175.with_quote_style(options.serialize_options.quote_style)176.n_threads(1) // Disable rayon parallelism177.batched(&schema)?;178179writer.write_batch(&df)?;180181allocation_size = allocation_size.max(buffer.len());182if lin_tx.insert(Priority(Reverse(seq), 
buffer)).await.is_err() {183return Ok(());184}185drop(consume_token); // Keep the consume_token until here to increase the186// backpressure.187}188}189190PolarsResult::Ok(())191})192}));193}194195fn finalize(196&mut self,197_state: &StreamingExecutionState,198) -> Option<Pin<Box<dyn Future<Output = PolarsResult<()>> + Send>>> {199// If we were never spawned, we need to make sure that the `tx` is taken. This signals to200// the IO task that it is done and prevents deadlocks.201drop(self.io_tx.take());202203let io_task = self204.io_task205.take()206.expect("not initialized / finish called more than once");207208// Wait for the IO task to complete.209Some(Box::pin(async move {210io_task211.await212.unwrap_or_else(|e| Err(std::io::Error::from(e).into()))213}))214}215}216217218