Path: blob/main/crates/polars-plan/src/dsl/options/sink2.rs
7884 views
use std::hash::{Hash, Hasher};1use std::sync::Arc;23use polars_core::frame::DataFrame;4use polars_error::PolarsResult;5use polars_io::cloud::CloudOptions;6use polars_io::utils::file::Writeable;7use polars_io::utils::sync_on_close::SyncOnCloseType;8use polars_utils::IdxSize;9use polars_utils::arena::Arena;10use polars_utils::pl_str::PlSmallStr;11use polars_utils::plpath::{CloudScheme, PlPath};1213use super::Expr;14use super::sink::*;15use crate::plans::{AExpr, ExprIR};16use crate::prelude::PlanCallback;1718#[derive(Clone, Debug, PartialEq)]19pub enum SinkDestination {20File {21target: SinkTarget,22},23Partitioned {24base_path: PlPath,25file_path_provider: Option<FileProviderType>,26partition_strategy: PartitionStrategy,27/// TODO: Remove28finish_callback: Option<SinkFinishCallback>,29max_rows_per_file: IdxSize,30approximate_bytes_per_file: u64,31},32}3334impl SinkDestination {35pub fn cloud_scheme(&self) -> Option<CloudScheme> {36match self {37Self::File { target } => target.cloud_scheme(),38Self::Partitioned { base_path, .. } => base_path.cloud_scheme(),39}40}41}4243#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]44#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]45#[derive(Clone, Debug, Hash, PartialEq)]46pub struct UnifiedSinkArgs {47pub mkdir: bool,48pub maintain_order: bool,49pub sync_on_close: SyncOnCloseType,50pub cloud_options: Option<Arc<CloudOptions>>,51}5253impl Default for UnifiedSinkArgs {54fn default() -> Self {55Self {56mkdir: false,57maintain_order: true,58sync_on_close: SyncOnCloseType::None,59cloud_options: None,60}61}62}6364#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]65#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]66#[derive(Clone, Debug, PartialEq)]67pub enum PartitionStrategy {68Keyed {69keys: Vec<Expr>,70include_keys: bool,71keys_pre_grouped: bool,72per_partition_sort_by: Vec<SortColumn>,73},74/// Split the size of the input stream into chunks.75///76/// Semantically equivalent to a 0-key partition by.77FileSize,78}7980#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]81#[derive(Clone, Debug, PartialEq, strum_macros::IntoStaticStr)]82pub enum PartitionStrategyIR {83Keyed {84keys: Vec<ExprIR>,85include_keys: bool,86keys_pre_grouped: bool,87per_partition_sort_by: Vec<SortColumnIR>,88},89/// Split the size of the input stream into chunks.90///91/// Semantically equivalent to a 0-key partition by.92FileSize,93}9495#[cfg(feature = "cse")]96impl PartitionStrategyIR {97pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {98std::mem::discriminant(self).hash(state);99match self {100Self::Keyed {101keys,102include_keys,103keys_pre_grouped,104per_partition_sort_by,105} => {106for k in keys {107k.traverse_and_hash(expr_arena, state);108}109110include_keys.hash(state);111keys_pre_grouped.hash(state);112113for x in per_partition_sort_by {114x.traverse_and_hash(expr_arena, state);115}116},117Self::FileSize => {},118}119}120}121122#[derive(Debug)]123pub struct FileProviderArgs {124pub index_in_partition: usize,125pub partition_keys: Arc<DataFrame>,126}127128pub enum FileProviderReturn {129Path(String),130Writeable(Writeable),131}132133pub type FileProviderFunction = PlanCallback<FileProviderArgs, FileProviderReturn>;134135#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]136#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]137#[derive(Clone, Debug, Hash, PartialEq)]138pub enum FileProviderType {139Hive { extension: PlSmallStr },140Function(FileProviderFunction),141Legacy(PartitionTargetCallback),142}143144impl FileProviderFunction {145pub fn get_file(&self, args: FileProviderArgs) -> PolarsResult<FileProviderReturn> {146match self {147Self::Rust(func) => (func)(args),148#[cfg(feature = "python")]149Self::Python(object) => pyo3::Python::attach(|py| {150use polars_error::PolarsError;151use pyo3::intern;152use pyo3::types::{PyAnyMethods, PyDict};153154let FileProviderArgs {155index_in_partition,156partition_keys,157} = args;158159let convert_registry =160polars_utils::python_convert_registry::get_python_convert_registry();161162let partition_keys = convert_registry163.to_py164.df_to_wrapped_pydf(partition_keys.as_ref())165.map_err(PolarsError::from)?;166167let kwargs = PyDict::new(py);168kwargs.set_item(intern!(py, "index_in_partition"), index_in_partition)?;169kwargs.set_item(intern!(py, "partition_keys"), partition_keys)?;170171let args_dataclass = convert_registry.py_file_provider_args_dataclass().call(172py,173(),174Some(&kwargs),175)?;176177let out = object.call1(py, (args_dataclass,))?;178let out = (convert_registry.from_py.file_provider_result)(out)?;179let out: FileProviderReturn = *out.downcast().unwrap();180181PolarsResult::Ok(out)182}),183}184}185}186187188