Path: blob/main/crates/polars-stream/src/nodes/io_sinks/writers/interface.rs
8463 views
use std::num::{NonZeroU64, NonZeroUsize};12use futures::FutureExt;3use polars_error::PolarsResult;4use polars_io::utils::file::Writeable;5use polars_io::utils::sync_on_close::SyncOnCloseType;6use polars_utils::IdxSize;7use polars_utils::index::NonZeroIdxSize;8use polars_utils::pl_str::PlSmallStr;910use crate::async_executor;11use crate::async_primitives::connector;12use crate::nodes::io_sinks::components::sink_morsel::SinkMorsel;13use crate::nodes::io_sinks::components::size::TakeableRowsProvider;14use crate::utils::tokio_handle_ext;1516pub const IPC_RW_RECORD_BATCH_FLAGS_KEY: PlSmallStr =17PlSmallStr::from_static("polars:statistics:v1");1819pub trait FileWriterStarter: Send + Sync + 'static {20fn writer_name(&self) -> &str;2122/// Hints to the sender how morsels should be sized.23fn takeable_rows_provider(&self) -> TakeableRowsProvider;2425fn start_file_writer(26&self,27morsel_rx: connector::Receiver<SinkMorsel>,28file: FileOpenTaskHandle,29num_pipelines: NonZeroUsize,30) -> PolarsResult<async_executor::JoinHandle<PolarsResult<()>>>;31}3233pub struct FileOpenTaskHandle {34handle: tokio_handle_ext::AbortOnDropHandle<PolarsResult<Writeable>>,35sync_on_close: SyncOnCloseType,36}3738impl FileOpenTaskHandle {39pub fn new(40handle: tokio_handle_ext::AbortOnDropHandle<PolarsResult<Writeable>>,41sync_on_close: SyncOnCloseType,42) -> Self {43Self {44handle,45sync_on_close,46}47}48}4950impl std::future::Future for FileOpenTaskHandle {51type Output = PolarsResult<(Writeable, SyncOnCloseType)>;5253fn poll(54mut self: std::pin::Pin<&mut Self>,55cx: &mut std::task::Context<'_>,56) -> std::task::Poll<Self::Output> {57use std::task::Poll;5859let file: Result<_, tokio::task::JoinError> = futures::ready!(self.handle.poll_unpin(cx));60let file: PolarsResult<Writeable> = file.unwrap();6162Poll::Ready(file.map(|file| (file, self.sync_on_close)))63}64}6566/// Load ideal morsel size configuration from environment variables.67pub(super) fn ideal_sink_morsel_size_env() -> (Option<IdxSize>, Option<u64>) {68let num_rows = std::env::var("POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS")69.map(|x| {70x.parse::<NonZeroIdxSize>()71.ok()72.unwrap_or_else(|| {73panic!("invalid value for POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS: {x}")74})75.get()76})77.ok();7879let num_bytes = std::env::var("POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES")80.map(|x| {81x.parse::<NonZeroU64>()82.ok()83.unwrap_or_else(|| {84panic!("invalid value for POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES: {x}")85})86.get()87})88.ok();8990(91num_rows,92num_bytes.or(num_rows.is_some().then_some(u64::MAX)),93)94}959697