Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/io/sink_output.rs
8409 views
1
use polars::prelude::file_provider::{FileProviderFunction, FileProviderType};
2
use polars::prelude::{PartitionStrategy, PlRefPath, SinkDestination, SpecialEq};
3
use polars_utils::IdxSize;
4
use polars_utils::python_function::PythonObject;
5
use pyo3::intern;
6
use pyo3::prelude::*;
7
8
use crate::PyExpr;
9
use crate::prelude::Wrap;
10
11
/// Owned handle to the Python object passed as a file sink destination.
///
/// Any Python object is accepted when this is extracted; the object is only
/// interpreted later by `extract_file_sink_destination`, either as a partitioned
/// destination (when it has a `_pl_partition_by` attribute) or as a single-file
/// sink target.
pub struct PyFileSinkDestination<'py>(Bound<'py, PyAny>);
12
13
impl<'a, 'py> FromPyObject<'a, 'py> for PyFileSinkDestination<'py> {
14
type Error = PyErr;
15
16
fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
17
Ok(Self(ob.to_owned()))
18
}
19
}
20
21
impl PyFileSinkDestination<'_> {
22
pub fn extract_file_sink_destination(&self) -> PyResult<SinkDestination> {
23
let py = self.0.py();
24
25
if let Ok(partition_by_dataclass) = self.0.getattr(intern!(py, "_pl_partition_by")) {
26
return self.extract_from_py_partition_by(partition_by_dataclass);
27
};
28
29
let v: Wrap<polars_plan::dsl::SinkTarget> = self.0.extract()?;
30
31
Ok(SinkDestination::File { target: v.0 })
32
}
33
34
fn extract_from_py_partition_by(
35
&self,
36
partition_by_dataclass: Bound<'_, PyAny>,
37
) -> PyResult<SinkDestination> {
38
/// Extract from `PartitionByInner` dataclass.
39
#[derive(FromPyObject)]
40
struct Extract {
41
base_path: Wrap<PlRefPath>,
42
file_path_provider: Option<Py<PyAny>>,
43
key: Option<Vec<PyExpr>>,
44
include_key: Option<bool>,
45
max_rows_per_file: Option<IdxSize>,
46
approximate_bytes_per_file: u64,
47
}
48
49
let Extract {
50
base_path,
51
file_path_provider,
52
key,
53
include_key,
54
max_rows_per_file,
55
approximate_bytes_per_file,
56
} = partition_by_dataclass.extract()?;
57
58
let partition_strategy: PartitionStrategy = if let Some(partition_by) = key {
59
PartitionStrategy::Keyed {
60
keys: partition_by.into_iter().map(|x| x.inner).collect(),
61
include_keys: include_key.unwrap_or(true),
62
keys_pre_grouped: false,
63
}
64
} else {
65
// Should be validated on Python side
66
assert!(include_key.is_none());
67
68
PartitionStrategy::FileSize
69
};
70
71
Ok(SinkDestination::Partitioned {
72
base_path: base_path.0,
73
file_path_provider: file_path_provider.map(|x| {
74
FileProviderType::Function(FileProviderFunction::Python(SpecialEq::new(
75
PythonObject(x).into(),
76
)))
77
}),
78
partition_strategy,
79
max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),
80
approximate_bytes_per_file,
81
})
82
}
83
}
84
85