Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-plan/src/dsl/options/sink2.rs
7884 views
1
use std::hash::{Hash, Hasher};
2
use std::sync::Arc;
3
4
use polars_core::frame::DataFrame;
5
use polars_error::PolarsResult;
6
use polars_io::cloud::CloudOptions;
7
use polars_io::utils::file::Writeable;
8
use polars_io::utils::sync_on_close::SyncOnCloseType;
9
use polars_utils::IdxSize;
10
use polars_utils::arena::Arena;
11
use polars_utils::pl_str::PlSmallStr;
12
use polars_utils::plpath::{CloudScheme, PlPath};
13
14
use super::Expr;
15
use super::sink::*;
16
use crate::plans::{AExpr, ExprIR};
17
use crate::prelude::PlanCallback;
18
19
#[derive(Clone, Debug, PartialEq)]
20
pub enum SinkDestination {
21
File {
22
target: SinkTarget,
23
},
24
Partitioned {
25
base_path: PlPath,
26
file_path_provider: Option<FileProviderType>,
27
partition_strategy: PartitionStrategy,
28
/// TODO: Remove
29
finish_callback: Option<SinkFinishCallback>,
30
max_rows_per_file: IdxSize,
31
approximate_bytes_per_file: u64,
32
},
33
}
34
35
impl SinkDestination {
36
pub fn cloud_scheme(&self) -> Option<CloudScheme> {
37
match self {
38
Self::File { target } => target.cloud_scheme(),
39
Self::Partitioned { base_path, .. } => base_path.cloud_scheme(),
40
}
41
}
42
}
43
44
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
45
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
46
#[derive(Clone, Debug, Hash, PartialEq)]
47
pub struct UnifiedSinkArgs {
48
pub mkdir: bool,
49
pub maintain_order: bool,
50
pub sync_on_close: SyncOnCloseType,
51
pub cloud_options: Option<Arc<CloudOptions>>,
52
}
53
54
impl Default for UnifiedSinkArgs {
55
fn default() -> Self {
56
Self {
57
mkdir: false,
58
maintain_order: true,
59
sync_on_close: SyncOnCloseType::None,
60
cloud_options: None,
61
}
62
}
63
}
64
65
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
66
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
67
#[derive(Clone, Debug, PartialEq)]
68
pub enum PartitionStrategy {
69
Keyed {
70
keys: Vec<Expr>,
71
include_keys: bool,
72
keys_pre_grouped: bool,
73
per_partition_sort_by: Vec<SortColumn>,
74
},
75
/// Split the size of the input stream into chunks.
76
///
77
/// Semantically equivalent to a 0-key partition by.
78
FileSize,
79
}
80
81
#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]
82
#[derive(Clone, Debug, PartialEq, strum_macros::IntoStaticStr)]
83
pub enum PartitionStrategyIR {
84
Keyed {
85
keys: Vec<ExprIR>,
86
include_keys: bool,
87
keys_pre_grouped: bool,
88
per_partition_sort_by: Vec<SortColumnIR>,
89
},
90
/// Split the size of the input stream into chunks.
91
///
92
/// Semantically equivalent to a 0-key partition by.
93
FileSize,
94
}
95
96
#[cfg(feature = "cse")]
impl PartitionStrategyIR {
    /// Feeds this strategy into `state`, resolving expressions through
    /// `expr_arena`.
    ///
    /// The variant discriminant is hashed first. For `Keyed`, the keys, the
    /// two flags and the per-partition sort columns follow in that order;
    /// `FileSize` contributes nothing beyond its discriminant.
    pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {
        std::mem::discriminant(self).hash(state);

        match self {
            Self::FileSize => {},
            Self::Keyed {
                keys,
                include_keys,
                keys_pre_grouped,
                per_partition_sort_by,
            } => {
                keys.iter()
                    .for_each(|key| key.traverse_and_hash(expr_arena, &mut *state));

                include_keys.hash(state);
                keys_pre_grouped.hash(state);

                per_partition_sort_by
                    .iter()
                    .for_each(|sort_col| sort_col.traverse_and_hash(expr_arena, &mut *state));
            },
        }
    }
}
122
123
#[derive(Debug)]
124
pub struct FileProviderArgs {
125
pub index_in_partition: usize,
126
pub partition_keys: Arc<DataFrame>,
127
}
128
129
pub enum FileProviderReturn {
130
Path(String),
131
Writeable(Writeable),
132
}
133
134
/// Plan callback that receives [`FileProviderArgs`] and produces a
/// [`FileProviderReturn`].
pub type FileProviderFunction = PlanCallback<FileProviderArgs, FileProviderReturn>;
135
136
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
137
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
138
#[derive(Clone, Debug, Hash, PartialEq)]
139
pub enum FileProviderType {
140
Hive { extension: PlSmallStr },
141
Function(FileProviderFunction),
142
Legacy(PartitionTargetCallback),
143
}
144
145
impl FileProviderFunction {
146
pub fn get_file(&self, args: FileProviderArgs) -> PolarsResult<FileProviderReturn> {
147
match self {
148
Self::Rust(func) => (func)(args),
149
#[cfg(feature = "python")]
150
Self::Python(object) => pyo3::Python::attach(|py| {
151
use polars_error::PolarsError;
152
use pyo3::intern;
153
use pyo3::types::{PyAnyMethods, PyDict};
154
155
let FileProviderArgs {
156
index_in_partition,
157
partition_keys,
158
} = args;
159
160
let convert_registry =
161
polars_utils::python_convert_registry::get_python_convert_registry();
162
163
let partition_keys = convert_registry
164
.to_py
165
.df_to_wrapped_pydf(partition_keys.as_ref())
166
.map_err(PolarsError::from)?;
167
168
let kwargs = PyDict::new(py);
169
kwargs.set_item(intern!(py, "index_in_partition"), index_in_partition)?;
170
kwargs.set_item(intern!(py, "partition_keys"), partition_keys)?;
171
172
let args_dataclass = convert_registry.py_file_provider_args_dataclass().call(
173
py,
174
(),
175
Some(&kwargs),
176
)?;
177
178
let out = object.call1(py, (args_dataclass,))?;
179
let out = (convert_registry.from_py.file_provider_result)(out)?;
180
let out: FileProviderReturn = *out.downcast().unwrap();
181
182
PolarsResult::Ok(out)
183
}),
184
}
185
}
186
}
187
188