Path: blob/main/crates/polars-stream/src/nodes/io_sinks/writers/ndjson/mod.rs
8485 views
use polars_core::config;1use polars_core::schema::SchemaRef;2use polars_error::PolarsResult;3use polars_io::ndjson::NDJsonWriterOptions;4use polars_io::pl_async;5use polars_utils::IdxSize;6use polars_utils::index::NonZeroIdxSize;78use crate::async_executor::{self, TaskPriority};9use crate::async_primitives::connector;10use crate::morsel::get_ideal_morsel_size;11use crate::nodes::io_sinks::components::sink_morsel::{SinkMorsel, SinkMorselPermit};12use crate::nodes::io_sinks::components::size::{13NonZeroRowCountAndSize, RowCountAndSize, TakeableRowsProvider,14};15use crate::nodes::io_sinks::writers::interface::{16FileOpenTaskHandle, FileWriterStarter, ideal_sink_morsel_size_env,17};18use crate::utils::tokio_handle_ext;1920mod io_writer;21mod morsel_serializer;2223pub struct NDJsonWriterStarter {24pub options: NDJsonWriterOptions,25pub schema: SchemaRef,26pub initialized_state: std::sync::Mutex<Option<InitializedState>>,27}2829#[derive(Clone)]30pub struct InitializedState {31pub ideal_morsel_size: NonZeroRowCountAndSize,32pub base_allocation_size: usize,33}3435impl NDJsonWriterStarter {36fn initialized_state(&self) -> InitializedState {37let mut initialized_state = self.initialized_state.lock().unwrap();3839if initialized_state.is_none() {40let (env_num_rows, env_num_bytes) = ideal_sink_morsel_size_env();4142let ideal_morsel_size = RowCountAndSize {43num_rows: env_num_rows44.unwrap_or(get_ideal_morsel_size().try_into().unwrap_or(IdxSize::MAX)),45num_bytes: env_num_bytes.unwrap_or(8 * 1024 * 1024),46};4748let serialized_row_size_estimate = u64::saturating_mul(self.schema.len() as _, 50);4950let base_allocation_size: usize = u64::min(5164 * 1024 * 1024,52u64::min(53ideal_morsel_size.num_bytes.saturating_mul(3),54u64::saturating_mul(55serialized_row_size_estimate,56ideal_morsel_size.num_rows as _,57),58),59) as _;6061if config::verbose() {62eprintln!("[NDJsonWriterStarter]: base_allocation_size: {base_allocation_size}")63}6465let ideal_morsel_size = NonZeroRowCountAndSize::new(ideal_morsel_size).unwrap();6667*initialized_state = Some(InitializedState {68ideal_morsel_size,69base_allocation_size,70})71}7273initialized_state.clone().unwrap()74}75}7677impl FileWriterStarter for NDJsonWriterStarter {78fn writer_name(&self) -> &str {79"ndjson"80}8182fn takeable_rows_provider(&self) -> TakeableRowsProvider {83TakeableRowsProvider {84max_size: self.initialized_state().ideal_morsel_size,85byte_size_min_rows: NonZeroIdxSize::new(256).unwrap(),86allow_non_max_size: true,87}88}8990fn start_file_writer(91&self,92morsel_rx: connector::Receiver<SinkMorsel>,93file: FileOpenTaskHandle,94num_pipelines: std::num::NonZeroUsize,95) -> PolarsResult<async_executor::JoinHandle<PolarsResult<()>>> {96let (filled_serializer_tx, filled_serializer_rx) = tokio::sync::mpsc::channel::<(97async_executor::AbortOnDropHandle<PolarsResult<morsel_serializer::MorselSerializer>>,98SinkMorselPermit,99)>(num_pipelines.get());100101let max_serializers = num_pipelines.get();102let (reuse_serializer_tx, reuse_serializer_rx) =103tokio::sync::mpsc::channel::<morsel_serializer::MorselSerializer>(max_serializers);104105let io_handle = tokio_handle_ext::AbortOnDropHandle(106pl_async::get_runtime().spawn(107io_writer::IOWriter {108file,109filled_serializer_rx,110reuse_serializer_tx,111options: self.options,112}113.run(),114),115);116117let base_allocation_size = self.initialized_state().base_allocation_size;118119let serializer_handle = async_executor::spawn(120TaskPriority::High,121morsel_serializer::MorselSerializerPipeline {122morsel_rx,123filled_serializer_tx,124reuse_serializer_rx,125max_serializers,126base_allocation_size,127}128.run(),129);130131Ok(async_executor::spawn(TaskPriority::Low, async move {132io_handle.await.unwrap()?;133serializer_handle.await;134Ok(())135}))136}137}138139140