Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/writers/interface.rs
8463 views
1
use std::num::{NonZeroU64, NonZeroUsize};
2
3
use futures::FutureExt;
4
use polars_error::PolarsResult;
5
use polars_io::utils::file::Writeable;
6
use polars_io::utils::sync_on_close::SyncOnCloseType;
7
use polars_utils::IdxSize;
8
use polars_utils::index::NonZeroIdxSize;
9
use polars_utils::pl_str::PlSmallStr;
10
11
use crate::async_executor;
12
use crate::async_primitives::connector;
13
use crate::nodes::io_sinks::components::sink_morsel::SinkMorsel;
14
use crate::nodes::io_sinks::components::size::TakeableRowsProvider;
15
use crate::utils::tokio_handle_ext;
16
17
pub const IPC_RW_RECORD_BATCH_FLAGS_KEY: PlSmallStr =
18
PlSmallStr::from_static("polars:statistics:v1");
19
20
pub trait FileWriterStarter: Send + Sync + 'static {
21
fn writer_name(&self) -> &str;
22
23
/// Hints to the sender how morsels should be sized.
24
fn takeable_rows_provider(&self) -> TakeableRowsProvider;
25
26
fn start_file_writer(
27
&self,
28
morsel_rx: connector::Receiver<SinkMorsel>,
29
file: FileOpenTaskHandle,
30
num_pipelines: NonZeroUsize,
31
) -> PolarsResult<async_executor::JoinHandle<PolarsResult<()>>>;
32
}
33
34
pub struct FileOpenTaskHandle {
35
handle: tokio_handle_ext::AbortOnDropHandle<PolarsResult<Writeable>>,
36
sync_on_close: SyncOnCloseType,
37
}
38
39
impl FileOpenTaskHandle {
40
pub fn new(
41
handle: tokio_handle_ext::AbortOnDropHandle<PolarsResult<Writeable>>,
42
sync_on_close: SyncOnCloseType,
43
) -> Self {
44
Self {
45
handle,
46
sync_on_close,
47
}
48
}
49
}
50
51
impl std::future::Future for FileOpenTaskHandle {
52
type Output = PolarsResult<(Writeable, SyncOnCloseType)>;
53
54
fn poll(
55
mut self: std::pin::Pin<&mut Self>,
56
cx: &mut std::task::Context<'_>,
57
) -> std::task::Poll<Self::Output> {
58
use std::task::Poll;
59
60
let file: Result<_, tokio::task::JoinError> = futures::ready!(self.handle.poll_unpin(cx));
61
let file: PolarsResult<Writeable> = file.unwrap();
62
63
Poll::Ready(file.map(|file| (file, self.sync_on_close)))
64
}
65
}
66
67
/// Load ideal morsel size configuration from environment variables.
68
pub(super) fn ideal_sink_morsel_size_env() -> (Option<IdxSize>, Option<u64>) {
69
let num_rows = std::env::var("POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS")
70
.map(|x| {
71
x.parse::<NonZeroIdxSize>()
72
.ok()
73
.unwrap_or_else(|| {
74
panic!("invalid value for POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS: {x}")
75
})
76
.get()
77
})
78
.ok();
79
80
let num_bytes = std::env::var("POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES")
81
.map(|x| {
82
x.parse::<NonZeroU64>()
83
.ok()
84
.unwrap_or_else(|| {
85
panic!("invalid value for POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES: {x}")
86
})
87
.get()
88
})
89
.ok();
90
91
(
92
num_rows,
93
num_bytes.or(num_rows.is_some().then_some(u64::MAX)),
94
)
95
}
96
97