Path: blob/main/crates/polars-plan/src/plans/functions/mod.rs
6940 views
mod count;1mod dsl;2#[cfg(feature = "python")]3mod python_udf;4mod schema;56use std::borrow::Cow;7use std::fmt::{Debug, Display, Formatter};8use std::hash::{Hash, Hasher};9use std::sync::Arc;1011pub use dsl::*;12use polars_core::error::feature_gated;13use polars_core::prelude::*;14use polars_io::cloud::CloudOptions;15use polars_utils::pl_str::PlSmallStr;16#[cfg(feature = "serde")]17use serde::{Deserialize, Serialize};18use strum_macros::IntoStaticStr;1920#[cfg(feature = "python")]21use crate::dsl::python_dsl::PythonFunction;22use crate::plans::ir::ScanSourcesDisplay;23use crate::prelude::*;2425#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]26#[derive(Clone, IntoStaticStr)]27#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]28pub enum FunctionIR {29RowIndex {30name: PlSmallStr,31offset: Option<IdxSize>,32// Might be cached.33#[cfg_attr(feature = "ir_serde", serde(skip))]34schema: CachedSchema,35},36#[cfg(feature = "python")]37OpaquePython(OpaquePythonUdf),3839FastCount {40sources: ScanSources,41scan_type: Box<FileScanIR>,42cloud_options: Option<CloudOptions>,43alias: Option<PlSmallStr>,44},4546Unnest {47columns: Arc<[PlSmallStr]>,48},49Rechunk,50Explode {51columns: Arc<[PlSmallStr]>,52#[cfg_attr(feature = "ir_serde", serde(skip))]53schema: CachedSchema,54},55#[cfg(feature = "pivot")]56Unpivot {57args: Arc<UnpivotArgsIR>,58#[cfg_attr(feature = "ir_serde", serde(skip))]59schema: CachedSchema,60},61#[cfg_attr(feature = "ir_serde", serde(skip))]62Opaque {63function: Arc<dyn DataFrameUdf>,64schema: Option<Arc<dyn UdfSchema>>,65/// allow predicate pushdown optimizations66predicate_pd: bool,67/// allow projection pushdown optimizations68projection_pd: bool,69streamable: bool,70// used for formatting71fmt_str: PlSmallStr,72},73}7475impl Eq for FunctionIR {}7677impl PartialEq for FunctionIR {78fn eq(&self, other: &Self) -> bool {79use FunctionIR::*;80match (self, other) {81(Rechunk, Rechunk) => true,82(83FastCount {84sources: srcs_l, ..85},86FastCount {87sources: srcs_r, ..88},89) => srcs_l == srcs_r,90(Explode { columns: l, .. }, Explode { columns: r, .. }) => l == r,91#[cfg(feature = "pivot")]92(Unpivot { args: l, .. }, Unpivot { args: r, .. }) => l == r,93(RowIndex { name: l, .. }, RowIndex { name: r, .. }) => l == r,94_ => false,95}96}97}9899impl Hash for FunctionIR {100fn hash<H: Hasher>(&self, state: &mut H) {101std::mem::discriminant(self).hash(state);102match self {103#[cfg(feature = "python")]104FunctionIR::OpaquePython { .. } => {},105FunctionIR::Opaque { fmt_str, .. } => fmt_str.hash(state),106FunctionIR::FastCount {107sources,108scan_type,109cloud_options,110alias,111} => {112sources.hash(state);113scan_type.hash(state);114cloud_options.hash(state);115alias.hash(state);116},117FunctionIR::Unnest { columns } => columns.hash(state),118FunctionIR::Rechunk => {},119FunctionIR::Explode { columns, schema: _ } => columns.hash(state),120#[cfg(feature = "pivot")]121FunctionIR::Unpivot { args, schema: _ } => args.hash(state),122FunctionIR::RowIndex {123name,124schema: _,125offset,126} => {127name.hash(state);128offset.hash(state);129},130}131}132}133134impl FunctionIR {135/// Whether this function can run on batches of data at a time.136pub fn is_streamable(&self) -> bool {137use FunctionIR::*;138match self {139Rechunk => false,140FastCount { .. } | Unnest { .. } | Explode { .. } => true,141#[cfg(feature = "pivot")]142Unpivot { .. } => true,143Opaque { streamable, .. } => *streamable,144#[cfg(feature = "python")]145OpaquePython(OpaquePythonUdf { streamable, .. }) => *streamable,146RowIndex { .. } => false,147}148}149150/// Whether this function will increase the number of rows151pub fn expands_rows(&self) -> bool {152use FunctionIR::*;153match self {154#[cfg(feature = "pivot")]155Unpivot { .. } => true,156Explode { .. } => true,157_ => false,158}159}160161pub(crate) fn allow_predicate_pd(&self) -> bool {162use FunctionIR::*;163match self {164Opaque { predicate_pd, .. } => *predicate_pd,165#[cfg(feature = "python")]166OpaquePython(OpaquePythonUdf { predicate_pd, .. }) => *predicate_pd,167#[cfg(feature = "pivot")]168Unpivot { .. } => true,169Rechunk | Unnest { .. } | Explode { .. } => true,170RowIndex { .. } | FastCount { .. } => false,171}172}173174pub(crate) fn allow_projection_pd(&self) -> bool {175use FunctionIR::*;176match self {177Opaque { projection_pd, .. } => *projection_pd,178#[cfg(feature = "python")]179OpaquePython(OpaquePythonUdf { projection_pd, .. }) => *projection_pd,180Rechunk | FastCount { .. } | Unnest { .. } | Explode { .. } => true,181#[cfg(feature = "pivot")]182Unpivot { .. } => true,183RowIndex { .. } => true,184}185}186187pub(crate) fn additional_projection_pd_columns(&self) -> Cow<'_, [PlSmallStr]> {188use FunctionIR::*;189match self {190Unnest { columns } => Cow::Borrowed(columns.as_ref()),191Explode { columns, .. } => Cow::Borrowed(columns.as_ref()),192_ => Cow::Borrowed(&[]),193}194}195196pub fn evaluate(&self, mut df: DataFrame) -> PolarsResult<DataFrame> {197use FunctionIR::*;198match self {199Opaque { function, .. } => function.call_udf(df),200#[cfg(feature = "python")]201OpaquePython(OpaquePythonUdf {202function,203validate_output,204schema,205..206}) => python_udf::call_python_udf(function, df, *validate_output, schema.clone()),207FastCount {208sources,209scan_type,210cloud_options,211alias,212} => count::count_rows(sources, scan_type, cloud_options.as_ref(), alias.clone()),213Rechunk => {214df.as_single_chunk_par();215Ok(df)216},217Unnest { columns: _columns } => {218feature_gated!("dtype-struct", df.unnest(_columns.iter().cloned()))219},220Explode { columns, .. } => df.explode(columns.iter().cloned()),221#[cfg(feature = "pivot")]222Unpivot { args, .. } => {223use polars_ops::pivot::UnpivotDF;224let args = (**args).clone();225df.unpivot2(args)226},227RowIndex { name, offset, .. } => df.with_row_index(name.clone(), *offset),228}229}230}231232impl Debug for FunctionIR {233fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {234write!(f, "{self}")235}236}237238impl Display for FunctionIR {239fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {240use FunctionIR::*;241match self {242Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),243Unnest { columns } => {244write!(f, "UNNEST by:")?;245let columns = columns.as_ref();246fmt_column_delimited(f, columns, "[", "]")247},248FastCount {249sources,250scan_type,251cloud_options: _,252alias,253} => {254let scan_type: &str = (&(**scan_type)).into();255let default_column_name = PlSmallStr::from_static(crate::constants::LEN);256let alias = alias.as_ref().unwrap_or(&default_column_name);257258write!(259f,260"FAST COUNT ({scan_type}) {} as \"{alias}\"",261ScanSourcesDisplay(sources)262)263},264v => {265let s: &str = v.into();266write!(f, "{s}")267},268}269}270}271272273