Path: blob/main/crates/polars-python/src/io/sink_output.rs
8409 views
use polars::prelude::file_provider::{FileProviderFunction, FileProviderType};1use polars::prelude::{PartitionStrategy, PlRefPath, SinkDestination, SpecialEq};2use polars_utils::IdxSize;3use polars_utils::python_function::PythonObject;4use pyo3::intern;5use pyo3::prelude::*;67use crate::PyExpr;8use crate::prelude::Wrap;910pub struct PyFileSinkDestination<'py>(Bound<'py, PyAny>);1112impl<'a, 'py> FromPyObject<'a, 'py> for PyFileSinkDestination<'py> {13type Error = PyErr;1415fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {16Ok(Self(ob.to_owned()))17}18}1920impl PyFileSinkDestination<'_> {21pub fn extract_file_sink_destination(&self) -> PyResult<SinkDestination> {22let py = self.0.py();2324if let Ok(partition_by_dataclass) = self.0.getattr(intern!(py, "_pl_partition_by")) {25return self.extract_from_py_partition_by(partition_by_dataclass);26};2728let v: Wrap<polars_plan::dsl::SinkTarget> = self.0.extract()?;2930Ok(SinkDestination::File { target: v.0 })31}3233fn extract_from_py_partition_by(34&self,35partition_by_dataclass: Bound<'_, PyAny>,36) -> PyResult<SinkDestination> {37/// Extract from `PartitionByInner` dataclass.38#[derive(FromPyObject)]39struct Extract {40base_path: Wrap<PlRefPath>,41file_path_provider: Option<Py<PyAny>>,42key: Option<Vec<PyExpr>>,43include_key: Option<bool>,44max_rows_per_file: Option<IdxSize>,45approximate_bytes_per_file: u64,46}4748let Extract {49base_path,50file_path_provider,51key,52include_key,53max_rows_per_file,54approximate_bytes_per_file,55} = partition_by_dataclass.extract()?;5657let partition_strategy: PartitionStrategy = if let Some(partition_by) = key {58PartitionStrategy::Keyed {59keys: partition_by.into_iter().map(|x| x.inner).collect(),60include_keys: include_key.unwrap_or(true),61keys_pre_grouped: false,62}63} else {64// Should be validated on Python side65assert!(include_key.is_none());6667PartitionStrategy::FileSize68};6970Ok(SinkDestination::Partitioned {71base_path: base_path.0,72file_path_provider: file_path_provider.map(|x| {73FileProviderType::Function(FileProviderFunction::Python(SpecialEq::new(74PythonObject(x).into(),75)))76}),77partition_strategy,78max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),79approximate_bytes_per_file,80})81}82}838485