Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks/config.rs
8480 views
1
use std::num::NonZeroUsize;
2
3
use polars_core::schema::SchemaRef;
4
use polars_plan::dsl::file_provider::FileProviderType;
5
use polars_plan::dsl::{FileWriteFormat, SinkTarget, UnifiedSinkArgs};
6
use polars_utils::pl_path::{CloudScheme, PlRefPath};
7
8
use crate::nodes::io_sinks::components::hstack_columns::HStackColumns;
9
use crate::nodes::io_sinks::components::partitioner::Partitioner;
10
use crate::nodes::io_sinks::components::size::NonZeroRowCountAndSize;
11
12
pub struct IOSinkNodeConfig {
13
pub file_format: FileWriteFormat,
14
pub target: IOSinkTarget,
15
pub unified_sink_args: UnifiedSinkArgs,
16
pub input_schema: SchemaRef,
17
}
18
19
impl IOSinkNodeConfig {
20
pub fn num_pipelines_per_sink(&self, num_pipelines: NonZeroUsize) -> NonZeroUsize {
21
NonZeroUsize::min(num_pipelines, self.inflight_morsel_limit(num_pipelines))
22
}
23
24
pub fn inflight_morsel_limit(&self, num_pipelines: NonZeroUsize) -> NonZeroUsize {
25
if let Ok(v) = std::env::var("POLARS_INFLIGHT_SINK_MORSEL_LIMIT").map(|x| {
26
x.parse::<NonZeroUsize>().unwrap_or_else(|_| {
27
panic!("invalid value for POLARS_INFLIGHT_SINK_MORSEL_LIMIT: {x}")
28
})
29
}) {
30
return v;
31
};
32
33
NonZeroUsize::saturating_add(
34
num_pipelines,
35
// Additional buffer to accommodate head-of-line blocking
36
4,
37
)
38
}
39
40
pub fn max_open_sinks(&self) -> NonZeroUsize {
41
if let Ok(v) = std::env::var("POLARS_MAX_OPEN_SINKS").map(|x| {
42
x.parse::<NonZeroUsize>()
43
.unwrap_or_else(|_| panic!("invalid value for POLARS_MAX_OPEN_SINKS: {x}"))
44
}) {
45
return v;
46
}
47
48
if self.target.is_cloud_location() {
49
const { NonZeroUsize::new(512).unwrap() }
50
} else {
51
const { NonZeroUsize::new(128).unwrap() }
52
}
53
}
54
55
pub fn cloud_upload_chunk_size(&self) -> usize {
56
polars_io::configs::upload_chunk_size()
57
}
58
59
pub fn partitioned_upload_chunk_size(&self) -> usize {
60
polars_io::configs::partitioned_upload_chunk_size()
61
}
62
63
pub fn upload_concurrency(&self) -> NonZeroUsize {
64
polars_io::configs::upload_concurrency()
65
}
66
67
pub fn partitioned_upload_concurrency(&self) -> NonZeroUsize {
68
polars_io::configs::partitioned_upload_concurrency()
69
}
70
}
71
72
pub enum IOSinkTarget {
73
File(SinkTarget),
74
Partitioned(Box<PartitionedTarget>),
75
}
76
77
impl IOSinkTarget {
78
pub fn is_cloud_location(&self) -> bool {
79
match self {
80
Self::File(v) => v.cloud_scheme(),
81
Self::Partitioned(v) => v.base_path.scheme(),
82
}
83
.is_some_and(|x| !matches!(x, CloudScheme::File | CloudScheme::FileNoHostname))
84
}
85
}
86
87
pub struct PartitionedTarget {
88
pub base_path: PlRefPath,
89
pub file_path_provider: FileProviderType,
90
pub partitioner: Partitioner,
91
/// How to hstack the keys back into the dataframe (with_columns)
92
pub hstack_keys: Option<HStackColumns>,
93
pub include_keys_in_file: bool,
94
pub file_schema: SchemaRef,
95
pub file_size_limit: Option<NonZeroRowCountAndSize>,
96
}
97
98