Path: blob/main/crates/polars-stream/src/nodes/io_sinks/writers/csv/mod.rs
8479 views
use std::sync::Arc;12use polars_core::config;3use polars_core::schema::SchemaRef;4use polars_error::PolarsResult;5use polars_io::pl_async;6use polars_io::prelude::{CsvSerializer, CsvWriterOptions};7use polars_utils::index::NonZeroIdxSize;89use crate::async_executor::{self, TaskPriority};10use crate::async_primitives::connector;11use crate::nodes::io_sinks::components::sink_morsel::{SinkMorsel, SinkMorselPermit};12use crate::nodes::io_sinks::components::size::{13NonZeroRowCountAndSize, RowCountAndSize, TakeableRowsProvider,14};15use crate::nodes::io_sinks::writers::interface::{16FileOpenTaskHandle, FileWriterStarter, ideal_sink_morsel_size_env,17};18use crate::utils::tokio_handle_ext;1920mod io_writer;21mod morsel_serializer;2223pub struct CsvWriterStarter {24pub options: Arc<CsvWriterOptions>,25/// `Mutex` is to handle `dyn ColumnSerializer` not being `Sync`.26pub base_serializer: std::sync::Mutex<CsvSerializer>,27pub schema: SchemaRef,28pub initialized_state: std::sync::Mutex<Option<InitializedState>>,29}3031#[derive(Clone)]32pub struct InitializedState {33pub ideal_morsel_size: NonZeroRowCountAndSize,34pub base_allocation_size: usize,35}3637impl CsvWriterStarter {38fn initialized_state(&self) -> InitializedState {39let mut initialized_state = self.initialized_state.lock().unwrap();4041if initialized_state.is_none() {42let (env_num_rows, env_num_bytes) = ideal_sink_morsel_size_env();4344let ideal_morsel_size = RowCountAndSize {45num_rows: env_num_rows.unwrap_or(25 * 1024),46num_bytes: env_num_bytes.unwrap_or(8 * 1024 * 1024),47};4849let serialized_row_size_estimate = u64::saturating_mul(self.schema.len() as _, 25);5051let base_allocation_size: usize = u64::min(5264 * 1024 * 1024,53u64::min(54ideal_morsel_size.num_bytes.div_ceil(2).saturating_mul(5),55u64::saturating_mul(56serialized_row_size_estimate,57ideal_morsel_size.num_rows as _,58),59),60) as _;6162if config::verbose() {63eprintln!("[CsvWriterStarter]: base_allocation_size: {base_allocation_size}")64}6566let ideal_morsel_size = NonZeroRowCountAndSize::new(ideal_morsel_size).unwrap();6768*initialized_state = Some(InitializedState {69ideal_morsel_size,70base_allocation_size,71})72}7374initialized_state.clone().unwrap()75}76}7778impl FileWriterStarter for CsvWriterStarter {79fn writer_name(&self) -> &str {80"csv"81}8283fn takeable_rows_provider(&self) -> TakeableRowsProvider {84TakeableRowsProvider {85max_size: self.initialized_state().ideal_morsel_size,86byte_size_min_rows: NonZeroIdxSize::new(256).unwrap(),87allow_non_max_size: true,88}89}9091fn start_file_writer(92&self,93morsel_rx: connector::Receiver<SinkMorsel>,94file: FileOpenTaskHandle,95num_pipelines: std::num::NonZeroUsize,96) -> PolarsResult<async_executor::JoinHandle<PolarsResult<()>>> {97let (filled_serializer_tx, filled_serializer_rx) = tokio::sync::mpsc::channel::<(98async_executor::AbortOnDropHandle<PolarsResult<morsel_serializer::MorselSerializer>>,99SinkMorselPermit,100)>(num_pipelines.get());101102let max_serializers = num_pipelines.get();103let (reuse_serializer_tx, reuse_serializer_rx) =104tokio::sync::mpsc::channel::<morsel_serializer::MorselSerializer>(max_serializers);105106let io_handle = tokio_handle_ext::AbortOnDropHandle(107pl_async::get_runtime().spawn(108io_writer::IOWriter {109file,110filled_serializer_rx,111reuse_serializer_tx,112schema: Arc::clone(&self.schema),113options: Arc::clone(&self.options),114}115.run(),116),117);118119let base_csv_serializer = { self.base_serializer.lock().unwrap().clone() };120let base_allocation_size = self.initialized_state().base_allocation_size;121122let serializer_handle = async_executor::spawn(123TaskPriority::High,124morsel_serializer::MorselSerializerPipeline {125morsel_rx,126filled_serializer_tx,127reuse_serializer_rx,128base_csv_serializer,129base_allocation_size,130max_serializers,131}132.run(),133);134135Ok(async_executor::spawn(TaskPriority::Low, async move {136io_handle.await.unwrap()?;137serializer_handle.await;138Ok(())139}))140}141}142143144