Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-python/src/lazyframe/sink.rs
7889 views
1
use std::sync::{Arc, Mutex};
2
3
use polars::prelude::sink2::FileProviderReturn;
4
use polars::prelude::sync_on_close::SyncOnCloseType;
5
use polars::prelude::{PartitionTargetCallbackResult, PlPath, SpecialEq};
6
use pyo3::exceptions::PyValueError;
7
use pyo3::pybacked::PyBackedStr;
8
use pyo3::types::PyAnyMethods;
9
use pyo3::{Bound, FromPyObject, PyAny, PyResult, Python};
10
11
use crate::prelude::Wrap;
12
13
impl<'py> FromPyObject<'py> for Wrap<polars_plan::dsl::SinkTarget> {
14
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
15
if let Ok(v) = ob.extract::<PyBackedStr>() {
16
Ok(Wrap(polars::prelude::SinkTarget::Path(PlPath::new(&v))))
17
} else {
18
let writer = Python::attach(|py| {
19
let py_f = ob.clone();
20
PyResult::Ok(
21
crate::file::try_get_pyfile(py, py_f, true)?
22
.0
23
.into_writeable(),
24
)
25
})?;
26
27
Ok(Wrap(polars_plan::prelude::SinkTarget::Dyn(SpecialEq::new(
28
Arc::new(Mutex::new(Some(writer))),
29
))))
30
}
31
}
32
}
33
34
impl<'py> FromPyObject<'py> for Wrap<PartitionTargetCallbackResult> {
35
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
36
if let Ok(v) = ob.extract::<PyBackedStr>() {
37
Ok(Wrap(polars::prelude::PartitionTargetCallbackResult::Str(
38
v.to_string(),
39
)))
40
} else if let Ok(v) = ob.extract::<std::path::PathBuf>() {
41
Ok(Wrap(polars::prelude::PartitionTargetCallbackResult::Str(
42
v.to_str().unwrap().to_string(),
43
)))
44
} else {
45
let writer = Python::attach(|py| {
46
let py_f = ob.clone();
47
PyResult::Ok(
48
crate::file::try_get_pyfile(py, py_f, true)?
49
.0
50
.into_writeable(),
51
)
52
})?;
53
54
Ok(Wrap(
55
polars_plan::prelude::PartitionTargetCallbackResult::Dyn(SpecialEq::new(Arc::new(
56
Mutex::new(Some(writer)),
57
))),
58
))
59
}
60
}
61
}
62
63
impl<'py> FromPyObject<'py> for Wrap<FileProviderReturn> {
64
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
65
if let Ok(v) = ob.extract::<PyBackedStr>() {
66
Ok(Wrap(FileProviderReturn::Path(v.to_string())))
67
} else if let Ok(v) = ob.extract::<std::path::PathBuf>() {
68
Ok(Wrap(FileProviderReturn::Path(
69
v.to_str().unwrap().to_string(),
70
)))
71
} else {
72
let py = ob.py();
73
74
let writeable = crate::file::try_get_pyfile(py, ob.clone(), true)?
75
.0
76
.into_writeable();
77
78
Ok(Wrap(FileProviderReturn::Writeable(writeable)))
79
}
80
}
81
}
82
83
impl<'py> FromPyObject<'py> for Wrap<SyncOnCloseType> {
84
fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {
85
let parsed = match &*ob.extract::<PyBackedStr>()? {
86
"none" => SyncOnCloseType::None,
87
"data" => SyncOnCloseType::Data,
88
"all" => SyncOnCloseType::All,
89
v => {
90
return Err(PyValueError::new_err(format!(
91
"`sync_on_close` must be one of {{'none', 'data', 'all'}}, got {v}",
92
)));
93
},
94
};
95
Ok(Wrap(parsed))
96
}
97
}
98
99