Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/io/sink_output.rs
7889 views
1
use polars::prelude::sink::{PartitionTargetCallback, SinkFinishCallback};
2
use polars::prelude::sink2::FileProviderType;
3
use polars::prelude::{PartitionStrategy, PlPath, SinkDestination, SortColumn};
4
use polars_utils::IdxSize;
5
use polars_utils::python_function::PythonObject;
6
use pyo3::exceptions::PyValueError;
7
use pyo3::types::PyAnyMethods;
8
use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, intern};
9
10
use crate::PyExpr;
11
use crate::prelude::Wrap;
12
13
/// Newtype around the Python object supplied as a sink destination.
///
/// The object is stored as-is; decoding it into a `SinkDestination` is deferred
/// to `extract_file_sink_destination`.
pub struct PyFileSinkDestination<'py>(Bound<'py, pyo3::PyAny>);
14
15
impl<'py> FromPyObject<'py> for PyFileSinkDestination<'py> {
16
fn extract_bound(ob: &Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
17
Ok(Self(ob.clone()))
18
}
19
}
20
21
impl PyFileSinkDestination<'_> {
22
pub fn extract_file_sink_destination(&self) -> PyResult<SinkDestination> {
23
let py = self.0.py();
24
25
if let Ok(sink_output_dataclass) = self.0.getattr(intern!(py, "_pl_sink_directory")) {
26
return self.extract_from_py_sink_directory(sink_output_dataclass);
27
};
28
29
let v: Wrap<polars_plan::dsl::SinkTarget> = self.0.extract()?;
30
31
Ok(SinkDestination::File { target: v.0 })
32
}
33
34
fn extract_from_py_sink_directory(
35
&self,
36
sink_output_dataclass: Bound<'_, PyAny>,
37
) -> PyResult<SinkDestination> {
38
/// Extract from `SinkDirectoryInner` dataclass.
39
#[derive(FromPyObject)]
40
struct Extract {
41
base_path: Wrap<PlPath>,
42
file_path_provider: Option<Py<PyAny>>,
43
partition_by: Option<Vec<PyExpr>>,
44
partition_keys_sorted: Option<bool>,
45
include_keys: Option<bool>,
46
per_partition_sort_by: Option<Vec<PyExpr>>,
47
per_file_sort_by: Option<Vec<PyExpr>>,
48
max_rows_per_file: Option<IdxSize>,
49
finish_callback: Option<Py<PyAny>>,
50
}
51
52
let Extract {
53
base_path,
54
file_path_provider,
55
partition_by,
56
partition_keys_sorted,
57
include_keys,
58
per_partition_sort_by,
59
per_file_sort_by,
60
max_rows_per_file,
61
finish_callback,
62
} = sink_output_dataclass.extract()?;
63
64
if per_partition_sort_by.is_some() && per_file_sort_by.is_some() {
65
return Err(PyValueError::new_err(
66
"cannot specify both 'per_partition_sort_by' and 'per_file_sort_by'",
67
));
68
}
69
70
let partition_strategy: PartitionStrategy = if let Some(partition_by) = partition_by {
71
if max_rows_per_file.is_some() {
72
return Err(PyValueError::new_err(
73
"unimplemented: 'max_rows_per_file' with 'partition_by'",
74
));
75
}
76
77
if per_file_sort_by.is_some() {
78
return Err(PyValueError::new_err(
79
"unimplemented: 'per_file_sort_by' with 'partition_by'",
80
));
81
}
82
83
PartitionStrategy::Keyed {
84
keys: partition_by.into_iter().map(|x| x.inner).collect(),
85
include_keys: include_keys.unwrap_or(true),
86
keys_pre_grouped: false,
87
per_partition_sort_by: per_partition_sort_by
88
.unwrap_or_default()
89
.into_iter()
90
.map(|x| SortColumn {
91
expr: x.inner,
92
descending: false,
93
nulls_last: false,
94
})
95
.collect(),
96
}
97
} else if let Some(parameter_name) = partition_keys_sorted
98
.as_ref()
99
.is_some()
100
.then_some("partition_keys_sorted")
101
.or(include_keys.is_some().then_some("include_keys"))
102
.or(per_partition_sort_by
103
.is_some()
104
.then_some("per_partition_sort_by"))
105
{
106
return Err(PyValueError::new_err(format!(
107
"cannot use '{parameter_name}' without specifying `partition_by`"
108
)));
109
} else if max_rows_per_file.is_some() {
110
PartitionStrategy::FileSize
111
} else {
112
return Err(PyValueError::new_err(
113
"at least one of ('partition_by', 'max_rows_per_file') \
114
must be specified for SinkPartitioned",
115
));
116
};
117
118
Ok(SinkDestination::Partitioned {
119
base_path: base_path.0,
120
file_path_provider: file_path_provider.map(|x| {
121
FileProviderType::Legacy(PartitionTargetCallback::Python(PythonObject(x)))
122
}),
123
partition_strategy,
124
finish_callback: finish_callback.map(|x| SinkFinishCallback::Python(PythonObject(x))),
125
max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),
126
approximate_bytes_per_file: u64::MAX,
127
})
128
}
129
}
130
131