Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/writers/interface.rs
7884 views
1
use polars_error::PolarsResult;
2
use polars_io::utils::file::Writeable;
3
use polars_utils::IdxSize;
4
5
use crate::async_executor;
6
use crate::async_primitives::connector;
7
use crate::nodes::io_sinks2::components::sink_morsel::SinkMorsel;
8
use crate::nodes::io_sinks2::components::size::RowCountAndSize;
9
use crate::utils::tokio_handle_ext;
10
11
pub trait FileWriterStarter: Send + Sync + 'static {
12
fn writer_name(&self) -> &str;
13
14
/// Hints to the sender how morsels should be sized.
15
fn ideal_morsel_size(&self) -> RowCountAndSize;
16
17
fn start_file_writer(
18
&self,
19
morsel_rx: connector::Receiver<SinkMorsel>,
20
file: tokio_handle_ext::AbortOnDropHandle<PolarsResult<Writeable>>,
21
) -> PolarsResult<async_executor::JoinHandle<PolarsResult<()>>>;
22
}
23
24
pub(super) fn default_ideal_sink_morsel_size() -> RowCountAndSize {
25
RowCountAndSize {
26
num_rows: std::env::var("POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS")
27
.map(|x| {
28
x.parse::<IdxSize>()
29
.ok()
30
.filter(|x| *x > 0)
31
.unwrap_or_else(|| {
32
panic!("invalid value for POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS: {x}")
33
})
34
})
35
.unwrap_or(122_880),
36
num_bytes: std::env::var("POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES")
37
.map(|x| {
38
x.parse::<u64>().ok().filter(|x| *x > 0).unwrap_or_else(|| {
39
panic!("invalid value for POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES: {x}")
40
})
41
})
42
.unwrap_or(64 * 1024 * 1024),
43
}
44
}
45
46