Path: blob/main/crates/polars-python/src/lazyframe/sink.rs
7889 views
use std::sync::{Arc, Mutex};12use polars::prelude::sink2::FileProviderReturn;3use polars::prelude::sync_on_close::SyncOnCloseType;4use polars::prelude::{PartitionTargetCallbackResult, PlPath, SpecialEq};5use pyo3::exceptions::PyValueError;6use pyo3::pybacked::PyBackedStr;7use pyo3::types::PyAnyMethods;8use pyo3::{Bound, FromPyObject, PyAny, PyResult, Python};910use crate::prelude::Wrap;1112impl<'py> FromPyObject<'py> for Wrap<polars_plan::dsl::SinkTarget> {13fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {14if let Ok(v) = ob.extract::<PyBackedStr>() {15Ok(Wrap(polars::prelude::SinkTarget::Path(PlPath::new(&v))))16} else {17let writer = Python::attach(|py| {18let py_f = ob.clone();19PyResult::Ok(20crate::file::try_get_pyfile(py, py_f, true)?21.022.into_writeable(),23)24})?;2526Ok(Wrap(polars_plan::prelude::SinkTarget::Dyn(SpecialEq::new(27Arc::new(Mutex::new(Some(writer))),28))))29}30}31}3233impl<'py> FromPyObject<'py> for Wrap<PartitionTargetCallbackResult> {34fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {35if let Ok(v) = ob.extract::<PyBackedStr>() {36Ok(Wrap(polars::prelude::PartitionTargetCallbackResult::Str(37v.to_string(),38)))39} else if let Ok(v) = ob.extract::<std::path::PathBuf>() {40Ok(Wrap(polars::prelude::PartitionTargetCallbackResult::Str(41v.to_str().unwrap().to_string(),42)))43} else {44let writer = Python::attach(|py| {45let py_f = ob.clone();46PyResult::Ok(47crate::file::try_get_pyfile(py, py_f, true)?48.049.into_writeable(),50)51})?;5253Ok(Wrap(54polars_plan::prelude::PartitionTargetCallbackResult::Dyn(SpecialEq::new(Arc::new(55Mutex::new(Some(writer)),56))),57))58}59}60}6162impl<'py> FromPyObject<'py> for Wrap<FileProviderReturn> {63fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {64if let Ok(v) = ob.extract::<PyBackedStr>() {65Ok(Wrap(FileProviderReturn::Path(v.to_string())))66} else if let Ok(v) = ob.extract::<std::path::PathBuf>() {67Ok(Wrap(FileProviderReturn::Path(68v.to_str().unwrap().to_string(),69)))70} else {71let py = ob.py();7273let writeable = crate::file::try_get_pyfile(py, ob.clone(), true)?74.075.into_writeable();7677Ok(Wrap(FileProviderReturn::Writeable(writeable)))78}79}80}8182impl<'py> FromPyObject<'py> for Wrap<SyncOnCloseType> {83fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult<Self> {84let parsed = match &*ob.extract::<PyBackedStr>()? {85"none" => SyncOnCloseType::None,86"data" => SyncOnCloseType::Data,87"all" => SyncOnCloseType::All,88v => {89return Err(PyValueError::new_err(format!(90"`sync_on_close` must be one of {{'none', 'data', 'all'}}, got {v}",91)));92},93};94Ok(Wrap(parsed))95}96}979899