Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/writers/interface.rs
7884 views
use polars_error::PolarsResult;1use polars_io::utils::file::Writeable;2use polars_utils::IdxSize;34use crate::async_executor;5use crate::async_primitives::connector;6use crate::nodes::io_sinks2::components::sink_morsel::SinkMorsel;7use crate::nodes::io_sinks2::components::size::RowCountAndSize;8use crate::utils::tokio_handle_ext;910pub trait FileWriterStarter: Send + Sync + 'static {11fn writer_name(&self) -> &str;1213/// Hints to the sender how morsels should be sized.14fn ideal_morsel_size(&self) -> RowCountAndSize;1516fn start_file_writer(17&self,18morsel_rx: connector::Receiver<SinkMorsel>,19file: tokio_handle_ext::AbortOnDropHandle<PolarsResult<Writeable>>,20) -> PolarsResult<async_executor::JoinHandle<PolarsResult<()>>>;21}2223pub(super) fn default_ideal_sink_morsel_size() -> RowCountAndSize {24RowCountAndSize {25num_rows: std::env::var("POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS")26.map(|x| {27x.parse::<IdxSize>()28.ok()29.filter(|x| *x > 0)30.unwrap_or_else(|| {31panic!("invalid value for POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS: {x}")32})33})34.unwrap_or(122_880),35num_bytes: std::env::var("POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES")36.map(|x| {37x.parse::<u64>().ok().filter(|x| *x > 0).unwrap_or_else(|| {38panic!("invalid value for POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES: {x}")39})40})41.unwrap_or(64 * 1024 * 1024),42}43}444546