Path: blob/main/crates/polars-plan/src/dsl/options/sink.rs
6940 views
use std::fmt;1use std::hash::{Hash, Hasher};2use std::path::PathBuf;3use std::sync::Arc;45use polars_core::error::PolarsResult;6use polars_core::frame::DataFrame;7use polars_core::prelude::DataType;8use polars_core::scalar::Scalar;9use polars_io::cloud::CloudOptions;10use polars_io::utils::file::{DynWriteable, Writeable};11use polars_io::utils::sync_on_close::SyncOnCloseType;12use polars_utils::IdxSize;13use polars_utils::arena::Arena;14use polars_utils::pl_str::PlSmallStr;15use polars_utils::plpath::PlPath;1617use super::{ExprIR, FileType};18use crate::dsl::{AExpr, Expr, SpecialEq};1920/// Options that apply to all sinks.21#[derive(Clone, PartialEq, Eq, Debug, Hash)]22#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]23#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]24pub struct SinkOptions {25/// Call sync when closing the file.26pub sync_on_close: SyncOnCloseType,2728/// The output file needs to maintain order of the data that comes in.29pub maintain_order: bool,3031/// Recursively create all the directories in the path.32pub mkdir: bool,33}3435impl Default for SinkOptions {36fn default() -> Self {37Self {38sync_on_close: Default::default(),39maintain_order: true,40mkdir: false,41}42}43}4445type DynSinkTarget = SpecialEq<Arc<std::sync::Mutex<Option<Box<dyn DynWriteable>>>>>;4647#[derive(Clone, PartialEq, Eq)]48pub enum SinkTarget {49Path(PlPath),50Dyn(DynSinkTarget),51}5253impl SinkTarget {54pub fn open_into_writeable(55&self,56sink_options: &SinkOptions,57cloud_options: Option<&CloudOptions>,58) -> PolarsResult<Writeable> {59match self {60SinkTarget::Path(addr) => {61if sink_options.mkdir {62polars_io::utils::mkdir::mkdir_recursive(addr.as_ref())?;63}6465polars_io::utils::file::Writeable::try_new(addr.as_ref(), cloud_options)66},67SinkTarget::Dyn(memory_writer) => Ok(Writeable::Dyn(68memory_writer.lock().unwrap().take().unwrap(),69)),70}71}7273#[cfg(not(feature = "cloud"))]74pub async fn open_into_writeable_async(75&self,76sink_options: &SinkOptions,77cloud_options: Option<&CloudOptions>,78) -> PolarsResult<Writeable> {79self.open_into_writeable(sink_options, cloud_options)80}8182#[cfg(feature = "cloud")]83pub async fn open_into_writeable_async(84&self,85sink_options: &SinkOptions,86cloud_options: Option<&CloudOptions>,87) -> PolarsResult<Writeable> {88match self {89SinkTarget::Path(addr) => {90if sink_options.mkdir {91polars_io::utils::mkdir::tokio_mkdir_recursive(addr.as_ref()).await?;92}9394polars_io::utils::file::Writeable::try_new(addr.as_ref(), cloud_options)95},96SinkTarget::Dyn(memory_writer) => Ok(Writeable::Dyn(97memory_writer.lock().unwrap().take().unwrap(),98)),99}100}101102pub fn to_display_string(&self) -> String {103match self {104Self::Path(p) => p.display().to_string(),105Self::Dyn(_) => "dynamic-target".to_string(),106}107}108}109110impl fmt::Debug for SinkTarget {111fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {112f.write_str("SinkTarget::")?;113match self {114Self::Path(p) => write!(f, "Path({p:?})"),115Self::Dyn(_) => f.write_str("Dyn"),116}117}118}119120impl std::hash::Hash for SinkTarget {121fn hash<H: Hasher>(&self, state: &mut H) {122std::mem::discriminant(self).hash(state);123match self {124Self::Path(p) => p.hash(state),125Self::Dyn(p) => Arc::as_ptr(p).hash(state),126}127}128}129130#[cfg(feature = "serde")]131impl serde::Serialize for SinkTarget {132fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>133where134S: serde::Serializer,135{136match self {137Self::Path(p) => p.serialize(serializer),138Self::Dyn(_) => Err(serde::ser::Error::custom(139"cannot serialize in-memory sink target",140)),141}142}143}144145#[cfg(feature = "serde")]146impl<'de> serde::Deserialize<'de> for SinkTarget {147fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>148where149D: serde::Deserializer<'de>,150{151Ok(Self::Path(PlPath::deserialize(deserializer)?))152}153}154155#[cfg(feature = "dsl-schema")]156impl schemars::JsonSchema for SinkTarget {157fn schema_name() -> String {158"SinkTarget".to_owned()159}160161fn schema_id() -> std::borrow::Cow<'static, str> {162std::borrow::Cow::Borrowed(concat!(module_path!(), "::", "SinkTarget"))163}164165fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {166PathBuf::json_schema(generator)167}168}169170#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]171#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]172#[derive(Clone, Debug, PartialEq, Eq, Hash)]173pub struct FileSinkType {174pub target: SinkTarget,175pub file_type: FileType,176pub sink_options: SinkOptions,177pub cloud_options: Option<polars_io::cloud::CloudOptions>,178}179180#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]181#[derive(Clone, Debug, PartialEq)]182pub enum SinkTypeIR {183Memory,184File(FileSinkType),185#[cfg_attr(all(feature = "serde", not(feature = "ir_serde")), serde(skip))]186Partition(PartitionSinkTypeIR),187}188189#[cfg_attr(feature = "python", pyo3::pyclass)]190#[derive(Clone)]191pub struct PartitionTargetContextKey {192pub name: PlSmallStr,193pub raw_value: Scalar,194}195196#[cfg_attr(feature = "python", pyo3::pyclass)]197pub struct PartitionTargetContext {198pub file_idx: usize,199pub part_idx: usize,200pub in_part_idx: usize,201pub keys: Vec<PartitionTargetContextKey>,202pub file_path: String,203pub full_path: PlPath,204}205206#[cfg(feature = "python")]207#[pyo3::pymethods]208impl PartitionTargetContext {209#[getter]210pub fn file_idx(&self) -> usize {211self.file_idx212}213#[getter]214pub fn part_idx(&self) -> usize {215self.part_idx216}217#[getter]218pub fn in_part_idx(&self) -> usize {219self.in_part_idx220}221#[getter]222pub fn keys(&self) -> Vec<PartitionTargetContextKey> {223self.keys.clone()224}225#[getter]226pub fn file_path(&self) -> &str {227self.file_path.as_str()228}229#[getter]230pub fn full_path(&self) -> &str {231self.full_path.to_str()232}233}234#[cfg(feature = "python")]235#[pyo3::pymethods]236impl PartitionTargetContextKey {237#[getter]238pub fn name(&self) -> &str {239self.name.as_str()240}241#[getter]242pub fn str_value(&self) -> pyo3::PyResult<String> {243let value = self244.raw_value245.clone()246.into_series(PlSmallStr::EMPTY)247.strict_cast(&DataType::String)248.map_err(|err| pyo3::exceptions::PyRuntimeError::new_err(err.to_string()))?;249let value = value.str().unwrap();250let value = value.get(0).unwrap_or("null").as_bytes();251let value = percent_encoding::percent_encode(value, polars_io::utils::URL_ENCODE_CHAR_SET);252Ok(value.to_string())253}254#[getter]255pub fn raw_value(&self) -> pyo3::PyObject {256let converter = polars_core::chunked_array::object::registry::get_pyobject_converter();257*(converter.as_ref())(self.raw_value.as_any_value())258.downcast::<pyo3::PyObject>()259.unwrap()260}261}262263#[derive(Clone, Debug, PartialEq)]264pub enum PartitionTargetCallback {265Rust(266SpecialEq<267Arc<268dyn Fn(PartitionTargetContext) -> PolarsResult<PartitionTargetCallbackResult>269+ Send270+ Sync,271>,272>,273),274#[cfg(feature = "python")]275Python(polars_utils::python_function::PythonFunction),276}277278#[cfg_attr(feature = "python", pyo3::pyclass)]279pub struct SinkWritten {280pub file_idx: usize,281pub part_idx: usize,282pub in_part_idx: usize,283pub keys: Vec<PartitionTargetContextKey>,284pub file_path: PathBuf,285pub full_path: PathBuf,286pub num_rows: usize,287pub file_size: usize,288pub gathered: Option<DataFrame>,289}290291#[cfg_attr(feature = "python", pyo3::pyclass)]292pub struct SinkFinishContext {293pub written: Vec<SinkWritten>,294}295296#[derive(Clone, Debug, PartialEq)]297pub enum SinkFinishCallback {298Rust(SpecialEq<Arc<dyn Fn(DataFrame) -> PolarsResult<()> + Send + Sync>>),299#[cfg(feature = "python")]300Python(polars_utils::python_function::PythonFunction),301}302303impl SinkFinishCallback {304pub fn call(&self, df: DataFrame) -> PolarsResult<()> {305match self {306Self::Rust(f) => f(df),307#[cfg(feature = "python")]308Self::Python(f) => pyo3::Python::with_gil(|py| {309let converter =310polars_utils::python_convert_registry::get_python_convert_registry();311let df = (converter.to_py.df)(Box::new(df) as Box<dyn std::any::Any>)?;312f.call1(py, (df,))?;313PolarsResult::Ok(())314}),315}316}317}318319#[derive(Clone)]320pub enum PartitionTargetCallbackResult {321Str(String),322Dyn(DynSinkTarget),323}324325impl PartitionTargetCallback {326pub fn call(&self, ctx: PartitionTargetContext) -> PolarsResult<PartitionTargetCallbackResult> {327match self {328Self::Rust(f) => f(ctx),329#[cfg(feature = "python")]330Self::Python(f) => pyo3::Python::with_gil(|py| {331let partition_target = f.call1(py, (ctx,))?;332let converter =333polars_utils::python_convert_registry::get_python_convert_registry();334let partition_target =335(converter.from_py.partition_target_cb_result)(partition_target)?;336let partition_target = partition_target337.downcast_ref::<PartitionTargetCallbackResult>()338.unwrap()339.clone();340PolarsResult::Ok(partition_target)341}),342}343}344}345346#[cfg(feature = "serde")]347impl serde::Serialize for SinkFinishCallback {348fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>349where350S: serde::Serializer,351{352use serde::ser::Error;353354#[cfg(feature = "python")]355if let Self::Python(v) = self {356return v.serialize(_serializer);357}358359Err(S::Error::custom(format!("cannot serialize {self:?}")))360}361}362363#[cfg(feature = "serde")]364impl<'de> serde::Deserialize<'de> for SinkFinishCallback {365fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>366where367D: serde::Deserializer<'de>,368{369#[cfg(feature = "python")]370{371Ok(Self::Python(372polars_utils::python_function::PythonFunction::deserialize(_deserializer)?,373))374}375#[cfg(not(feature = "python"))]376{377use serde::de::Error;378Err(D::Error::custom(379"cannot deserialize PartitionOutputCallback",380))381}382}383}384385#[cfg(feature = "dsl-schema")]386impl schemars::JsonSchema for SinkFinishCallback {387fn schema_name() -> String {388"PartitionTargetCallback".to_owned()389}390391fn schema_id() -> std::borrow::Cow<'static, str> {392std::borrow::Cow::Borrowed(concat!(module_path!(), "::", "SinkFinishCallback"))393}394395fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {396Vec::<u8>::json_schema(generator)397}398}399400#[cfg(feature = "serde")]401impl<'de> serde::Deserialize<'de> for PartitionTargetCallback {402fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>403where404D: serde::Deserializer<'de>,405{406#[cfg(feature = "python")]407{408Ok(Self::Python(409polars_utils::python_function::PythonFunction::deserialize(_deserializer)?,410))411}412#[cfg(not(feature = "python"))]413{414use serde::de::Error;415Err(D::Error::custom(416"cannot deserialize PartitionOutputCallback",417))418}419}420}421422#[cfg(feature = "serde")]423impl serde::Serialize for PartitionTargetCallback {424fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>425where426S: serde::Serializer,427{428use serde::ser::Error;429430#[cfg(feature = "python")]431if let Self::Python(v) = self {432return v.serialize(_serializer);433}434435Err(S::Error::custom(format!("cannot serialize {self:?}")))436}437}438439#[cfg(feature = "dsl-schema")]440impl schemars::JsonSchema for PartitionTargetCallback {441fn schema_name() -> String {442"PartitionTargetCallback".to_owned()443}444445fn schema_id() -> std::borrow::Cow<'static, str> {446std::borrow::Cow::Borrowed(concat!(module_path!(), "::", "PartitionTargetCallback"))447}448449fn json_schema(generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {450Vec::<u8>::json_schema(generator)451}452}453454#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]455#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]456#[derive(Clone, Debug, PartialEq)]457pub struct SortColumn {458pub expr: Expr,459pub descending: bool,460pub nulls_last: bool,461}462463#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]464#[derive(Clone, Debug, PartialEq)]465pub struct SortColumnIR {466pub expr: ExprIR,467pub descending: bool,468pub nulls_last: bool,469}470471#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]472#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]473#[derive(Clone, Debug, PartialEq)]474pub struct PartitionSinkType {475pub base_path: Arc<PlPath>,476pub file_path_cb: Option<PartitionTargetCallback>,477pub file_type: FileType,478pub sink_options: SinkOptions,479pub variant: PartitionVariant,480pub cloud_options: Option<polars_io::cloud::CloudOptions>,481pub per_partition_sort_by: Option<Vec<SortColumn>>,482pub finish_callback: Option<SinkFinishCallback>,483}484485#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]486#[derive(Clone, Debug, PartialEq)]487pub struct PartitionSinkTypeIR {488pub base_path: Arc<PlPath>,489pub file_path_cb: Option<PartitionTargetCallback>,490pub file_type: FileType,491pub sink_options: SinkOptions,492pub variant: PartitionVariantIR,493pub cloud_options: Option<polars_io::cloud::CloudOptions>,494pub per_partition_sort_by: Option<Vec<SortColumnIR>>,495pub finish_callback: Option<SinkFinishCallback>,496}497498#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]499#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]500#[derive(Clone, Debug, PartialEq)]501pub enum SinkType {502Memory,503File(FileSinkType),504Partition(PartitionSinkType),505}506507#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]508#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]509#[derive(Clone, Debug, PartialEq, Eq, Hash)]510pub enum PartitionVariant {511MaxSize(IdxSize),512Parted {513key_exprs: Vec<Expr>,514include_key: bool,515},516ByKey {517key_exprs: Vec<Expr>,518include_key: bool,519},520}521522#[cfg_attr(feature = "ir_serde", derive(serde::Serialize, serde::Deserialize))]523#[derive(Clone, Debug, PartialEq, Eq)]524pub enum PartitionVariantIR {525MaxSize(IdxSize),526Parted {527key_exprs: Vec<ExprIR>,528include_key: bool,529},530ByKey {531key_exprs: Vec<ExprIR>,532include_key: bool,533},534}535536#[cfg(feature = "cse")]537impl SinkTypeIR {538pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {539std::mem::discriminant(self).hash(state);540match self {541Self::Memory => {},542Self::File(f) => f.hash(state),543Self::Partition(f) => f.traverse_and_hash(expr_arena, state),544}545}546}547548impl SinkTypeIR {549pub fn maintain_order(&self) -> bool {550match self {551SinkTypeIR::Memory => true,552SinkTypeIR::File(s) => s.sink_options.maintain_order,553SinkTypeIR::Partition(s) => s.sink_options.maintain_order,554}555}556}557558#[cfg(feature = "cse")]559impl PartitionSinkTypeIR {560pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {561self.file_type.hash(state);562self.sink_options.hash(state);563self.variant.traverse_and_hash(expr_arena, state);564self.cloud_options.hash(state);565std::mem::discriminant(&self.per_partition_sort_by).hash(state);566if let Some(v) = &self.per_partition_sort_by {567v.len().hash(state);568for v in v {569v.traverse_and_hash(expr_arena, state);570}571}572}573}574575#[cfg(feature = "cse")]576impl SortColumnIR {577pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {578self.expr.traverse_and_hash(expr_arena, state);579self.descending.hash(state);580self.nulls_last.hash(state);581}582}583584impl PartitionVariantIR {585#[cfg(feature = "cse")]586pub(crate) fn traverse_and_hash<H: Hasher>(&self, expr_arena: &Arena<AExpr>, state: &mut H) {587std::mem::discriminant(self).hash(state);588match self {589Self::MaxSize(size) => size.hash(state),590Self::Parted {591key_exprs,592include_key,593}594| Self::ByKey {595key_exprs,596include_key,597} => {598include_key.hash(state);599for key_expr in key_exprs.as_slice() {600key_expr.traverse_and_hash(expr_arena, state);601}602},603}604}605}606607#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]608#[derive(Clone, Debug)]609pub struct FileSinkOptions {610pub path: Arc<PathBuf>,611pub file_type: FileType,612}613614615