Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/io_sinks2/config.rs
7884 views
1
use std::sync::Arc;
2
3
use polars_core::prelude::SortMultipleOptions;
4
use polars_core::schema::SchemaRef;
5
use polars_plan::dsl::sink2::FileProviderType;
6
use polars_plan::dsl::{FileType, SinkTarget, UnifiedSinkArgs};
7
use polars_utils::plpath::{CloudScheme, PlPath};
8
9
use crate::expression::StreamExpr;
10
use crate::nodes::io_sinks2::components::hstack_columns::HStackColumns;
11
use crate::nodes::io_sinks2::components::partitioner::Partitioner;
12
use crate::nodes::io_sinks2::components::size::RowCountAndSize;
13
14
pub struct IOSinkNodeConfig {
15
pub file_format: Arc<FileType>,
16
pub target: IOSinkTarget,
17
pub unified_sink_args: UnifiedSinkArgs,
18
pub input_schema: SchemaRef,
19
pub num_pipelines: usize,
20
}
21
22
impl IOSinkNodeConfig {
23
pub fn per_sink_pipeline_depth(&self) -> usize {
24
self.inflight_morsel_limit().min(self.num_pipelines)
25
}
26
27
pub fn inflight_morsel_limit(&self) -> usize {
28
if let Ok(v) = std::env::var("POLARS_INFLIGHT_SINK_MORSEL_LIMIT").map(|x| {
29
x.parse::<usize>()
30
.ok()
31
.filter(|x| *x > 0)
32
.unwrap_or_else(|| {
33
panic!("invalid value for POLARS_INFLIGHT_SINK_MORSEL_LIMIT: {x}")
34
})
35
}) {
36
return v;
37
};
38
39
self.num_pipelines.saturating_add(
40
// Additional buffer to accommodate head-of-line blocking
41
4,
42
)
43
}
44
45
pub fn max_open_sinks(&self) -> usize {
46
if let Ok(v) = std::env::var("POLARS_MAX_OPEN_SINKS").map(|x| {
47
x.parse::<usize>()
48
.ok()
49
.filter(|x| *x > 0)
50
.unwrap_or_else(|| panic!("invalid value for POLARS_MAX_OPEN_SINKS: {x}"))
51
}) {
52
return v;
53
}
54
55
if self.target.is_cloud_location() {
56
512
57
} else {
58
128
59
}
60
}
61
62
pub fn cloud_upload_chunk_size(&self) -> usize {
63
polars_io::get_upload_chunk_size()
64
}
65
66
pub fn partitioned_cloud_upload_chunk_size(&self) -> usize {
67
if let Ok(v) = std::env::var("POLARS_PARTITIONED_UPLOAD_CHUNK_SIZE").map(|x| {
68
x.parse::<usize>()
69
.ok()
70
.filter(|x| *x > 0)
71
.unwrap_or_else(|| {
72
panic!("invalid value for POLARS_PARTITIONED_UPLOAD_CHUNK_SIZE: {x}")
73
})
74
}) {
75
return v;
76
}
77
78
6 * 1024 * 1024
79
}
80
}
81
82
pub enum IOSinkTarget {
83
File(SinkTarget),
84
Partitioned(Box<PartitionedTarget>),
85
}
86
87
impl IOSinkTarget {
88
pub fn is_cloud_location(&self) -> bool {
89
match self {
90
Self::File(v) => v.cloud_scheme(),
91
Self::Partitioned(v) => v.base_path.cloud_scheme(),
92
}
93
.is_some_and(|x| !matches!(x, CloudScheme::File | CloudScheme::FileNoHostname))
94
}
95
}
96
97
pub struct PartitionedTarget {
98
pub base_path: PlPath,
99
pub file_path_provider: FileProviderType,
100
pub partitioner: Partitioner,
101
/// How to hstack the keys back into the dataframe (with_columns)
102
pub hstack_keys: Option<HStackColumns>,
103
pub include_keys_in_file: bool,
104
pub file_schema: SchemaRef,
105
pub file_size_limit: Option<RowCountAndSize>,
106
pub per_partition_sort: Option<(Arc<[StreamExpr]>, SortMultipleOptions)>,
107
}
108
109