// Path: blob/main/crates/polars-python/src/io/sink_output.rs
// 7889 views
use polars::prelude::sink::{PartitionTargetCallback, SinkFinishCallback};1use polars::prelude::sink2::FileProviderType;2use polars::prelude::{PartitionStrategy, PlPath, SinkDestination, SortColumn};3use polars_utils::IdxSize;4use polars_utils::python_function::PythonObject;5use pyo3::exceptions::PyValueError;6use pyo3::types::PyAnyMethods;7use pyo3::{Bound, FromPyObject, Py, PyAny, PyResult, intern};89use crate::PyExpr;10use crate::prelude::Wrap;1112pub struct PyFileSinkDestination<'py>(Bound<'py, pyo3::PyAny>);1314impl<'py> FromPyObject<'py> for PyFileSinkDestination<'py> {15fn extract_bound(ob: &Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {16Ok(Self(ob.clone()))17}18}1920impl PyFileSinkDestination<'_> {21pub fn extract_file_sink_destination(&self) -> PyResult<SinkDestination> {22let py = self.0.py();2324if let Ok(sink_output_dataclass) = self.0.getattr(intern!(py, "_pl_sink_directory")) {25return self.extract_from_py_sink_directory(sink_output_dataclass);26};2728let v: Wrap<polars_plan::dsl::SinkTarget> = self.0.extract()?;2930Ok(SinkDestination::File { target: v.0 })31}3233fn extract_from_py_sink_directory(34&self,35sink_output_dataclass: Bound<'_, PyAny>,36) -> PyResult<SinkDestination> {37/// Extract from `SinkDirectoryInner` dataclass.38#[derive(FromPyObject)]39struct Extract {40base_path: Wrap<PlPath>,41file_path_provider: Option<Py<PyAny>>,42partition_by: Option<Vec<PyExpr>>,43partition_keys_sorted: Option<bool>,44include_keys: Option<bool>,45per_partition_sort_by: Option<Vec<PyExpr>>,46per_file_sort_by: Option<Vec<PyExpr>>,47max_rows_per_file: Option<IdxSize>,48finish_callback: Option<Py<PyAny>>,49}5051let Extract {52base_path,53file_path_provider,54partition_by,55partition_keys_sorted,56include_keys,57per_partition_sort_by,58per_file_sort_by,59max_rows_per_file,60finish_callback,61} = sink_output_dataclass.extract()?;6263if per_partition_sort_by.is_some() && per_file_sort_by.is_some() {64return Err(PyValueError::new_err(65"cannot specify both 
'per_partition_sort_by' and 'per_file_sort_by'",66));67}6869let partition_strategy: PartitionStrategy = if let Some(partition_by) = partition_by {70if max_rows_per_file.is_some() {71return Err(PyValueError::new_err(72"unimplemented: 'max_rows_per_file' with 'partition_by'",73));74}7576if per_file_sort_by.is_some() {77return Err(PyValueError::new_err(78"unimplemented: 'per_file_sort_by' with 'partition_by'",79));80}8182PartitionStrategy::Keyed {83keys: partition_by.into_iter().map(|x| x.inner).collect(),84include_keys: include_keys.unwrap_or(true),85keys_pre_grouped: false,86per_partition_sort_by: per_partition_sort_by87.unwrap_or_default()88.into_iter()89.map(|x| SortColumn {90expr: x.inner,91descending: false,92nulls_last: false,93})94.collect(),95}96} else if let Some(parameter_name) = partition_keys_sorted97.as_ref()98.is_some()99.then_some("partition_keys_sorted")100.or(include_keys.is_some().then_some("include_keys"))101.or(per_partition_sort_by102.is_some()103.then_some("per_partition_sort_by"))104{105return Err(PyValueError::new_err(format!(106"cannot use '{parameter_name}' without specifying `partition_by`"107)));108} else if max_rows_per_file.is_some() {109PartitionStrategy::FileSize110} else {111return Err(PyValueError::new_err(112"at least one of ('partition_by', 'max_rows_per_file') \113must be specified for SinkPartitioned",114));115};116117Ok(SinkDestination::Partitioned {118base_path: base_path.0,119file_path_provider: file_path_provider.map(|x| {120FileProviderType::Legacy(PartitionTargetCallback::Python(PythonObject(x)))121}),122partition_strategy,123finish_callback: finish_callback.map(|x| SinkFinishCallback::Python(PythonObject(x))),124max_rows_per_file: max_rows_per_file.unwrap_or(IdxSize::MAX),125approximate_bytes_per_file: u64::MAX,126})127}128}129130131