Path: blob/main/crates/polars-plan/src/plans/functions/mod.rs
8446 views
mod count;1mod dsl;2mod hint;3#[cfg(feature = "python")]4mod python_udf;5mod schema;67use std::borrow::Cow;8use std::fmt::{Debug, Display, Formatter};9use std::hash::{Hash, Hasher};10use std::sync::Arc;1112pub use dsl::*;13pub use hint::*;14use polars_core::error::feature_gated;15use polars_core::prelude::*;16use polars_core::series::IsSorted;17use polars_utils::pl_str::PlSmallStr;18#[cfg(feature = "serde")]19use serde::{Deserialize, Serialize};20use strum_macros::IntoStaticStr;2122#[cfg(feature = "python")]23use crate::dsl::python_dsl::PythonFunction;24use crate::plans::ir::ScanSourcesDisplay;25use crate::prelude::*;2627#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]28#[derive(Clone, IntoStaticStr)]29#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]30pub enum FunctionIR {31RowIndex {32name: PlSmallStr,33offset: Option<IdxSize>,34// Might be cached.35#[cfg_attr(feature = "ir_serde", serde(skip))]36schema: CachedSchema,37},38#[cfg(feature = "python")]39OpaquePython(OpaquePythonUdf),4041FastCount {42sources: ScanSources,43scan_type: Box<FileScanIR>,44alias: Option<PlSmallStr>,45},4647Unnest {48columns: Arc<[PlSmallStr]>,49separator: Option<PlSmallStr>,50},51Rechunk,52Explode {53columns: Arc<[PlSmallStr]>,54options: ExplodeOptions,55#[cfg_attr(feature = "ir_serde", serde(skip))]56schema: CachedSchema,57},58#[cfg(feature = "pivot")]59Unpivot {60args: Arc<UnpivotArgsIR>,61#[cfg_attr(feature = "ir_serde", serde(skip))]62schema: CachedSchema,63},64#[cfg_attr(feature = "ir_serde", serde(skip))]65Opaque {66function: Arc<dyn DataFrameUdf>,67schema: Option<Arc<dyn UdfSchema>>,68/// allow predicate pushdown optimizations69predicate_pd: bool,70/// allow projection pushdown optimizations71projection_pd: bool,72streamable: bool,73// used for formatting74fmt_str: PlSmallStr,75},76Hint(HintIR),77}7879impl Hash for FunctionIR {80fn hash<H: Hasher>(&self, state: &mut H) {81std::mem::discriminant(self).hash(state);82match self {83#[cfg(feature = "python")]84FunctionIR::OpaquePython { .. } => {},85FunctionIR::Opaque { fmt_str, .. } => fmt_str.hash(state),86FunctionIR::FastCount {87sources,88scan_type,89alias,90} => {91sources.hash(state);92scan_type.hash(state);93alias.hash(state);94},95FunctionIR::Unnest { columns, separator } => {96columns.hash(state);97separator.hash(state);98},99FunctionIR::Rechunk => {},100FunctionIR::Explode {101columns,102options,103schema: _,104} => {105columns.hash(state);106options.hash(state);107},108#[cfg(feature = "pivot")]109FunctionIR::Unpivot { args, schema: _ } => args.hash(state),110FunctionIR::RowIndex {111name,112schema: _,113offset,114} => {115name.hash(state);116offset.hash(state);117},118FunctionIR::Hint(hint) => hint.hash(state),119}120}121}122123impl FunctionIR {124/// Whether this function can run on batches of data at a time.125pub fn is_streamable(&self) -> bool {126use FunctionIR::*;127match self {128Rechunk => false,129FastCount { .. } | Unnest { .. } | Explode { .. } => true,130#[cfg(feature = "pivot")]131Unpivot { .. } => true,132Opaque { streamable, .. } => *streamable,133#[cfg(feature = "python")]134OpaquePython(OpaquePythonUdf { streamable, .. }) => *streamable,135RowIndex { .. } => false,136Hint(_) => true,137}138}139140/// Whether this function will increase the number of rows141pub fn expands_rows(&self) -> bool {142use FunctionIR::*;143match self {144#[cfg(feature = "pivot")]145Unpivot { .. } => true,146Explode { .. } => true,147_ => false,148}149}150151pub(crate) fn allow_predicate_pd(&self) -> bool {152use FunctionIR::*;153match self {154Opaque { predicate_pd, .. } => *predicate_pd,155#[cfg(feature = "python")]156OpaquePython(OpaquePythonUdf { predicate_pd, .. }) => *predicate_pd,157#[cfg(feature = "pivot")]158Unpivot { .. } => true,159Rechunk | Unnest { .. } | Explode { .. } | Hint(_) => true,160RowIndex { .. } | FastCount { .. } => false,161}162}163164pub(crate) fn allow_projection_pd(&self) -> bool {165use FunctionIR::*;166match self {167Opaque { projection_pd, .. } => *projection_pd,168#[cfg(feature = "python")]169OpaquePython(OpaquePythonUdf { projection_pd, .. }) => *projection_pd,170Rechunk | FastCount { .. } | Unnest { .. } | Explode { .. } | Hint(_) => true,171#[cfg(feature = "pivot")]172Unpivot { .. } => true,173RowIndex { .. } => true,174}175}176177pub(crate) fn additional_projection_pd_columns(&self) -> Cow<'_, [PlSmallStr]> {178use FunctionIR::*;179match self {180Unnest { columns, .. } => Cow::Borrowed(columns.as_ref()),181Explode { columns, .. } => Cow::Borrowed(columns.as_ref()),182_ => Cow::Borrowed(&[]),183}184}185186pub fn evaluate(&self, mut df: DataFrame) -> PolarsResult<DataFrame> {187use FunctionIR::*;188match self {189Opaque { function, .. } => function.call_udf(df),190#[cfg(feature = "python")]191OpaquePython(OpaquePythonUdf {192function,193validate_output,194schema,195..196}) => python_udf::call_python_udf(function, df, *validate_output, schema.clone()),197FastCount {198sources,199scan_type,200alias,201} => count::count_rows(sources, scan_type, alias.clone()),202Rechunk => {203df.rechunk_mut_par();204Ok(df)205},206Unnest { columns, separator } => {207feature_gated!(208"dtype-struct",209df.unnest(columns.iter().cloned(), separator.as_deref())210)211},212Explode {213columns, options, ..214} => df.explode(columns.iter().cloned(), *options),215#[cfg(feature = "pivot")]216Unpivot { args, .. } => {217use polars_ops::unpivot::UnpivotDF;218let args = (**args).clone();219df.unpivot2(args)220},221RowIndex { name, offset, .. } => df.with_row_index(name.clone(), *offset),222Hint(hint) => {223#[expect(irrefutable_let_patterns)]224if let HintIR::Sorted(s) = &hint225&& let Some(s) = s.first()226{227let idx = df.try_get_column_index(&s.column)?;228let col = &mut unsafe { df.columns_mut_retain_schema() }[idx];229if let Some(d) = s.descending {230let flag = if d {231IsSorted::Descending232} else {233IsSorted::Ascending234};235col.set_sorted_flag(flag);236}237}238239Ok(df)240},241}242}243244pub fn is_order_producing(&self, is_input_ordered: bool) -> bool {245match self {246FunctionIR::RowIndex { .. } => true,247FunctionIR::FastCount { .. } => false,248FunctionIR::Unnest { .. } => is_input_ordered,249FunctionIR::Rechunk => is_input_ordered,250#[cfg(feature = "python")]251FunctionIR::OpaquePython(..) => true,252FunctionIR::Explode { .. } => true,253#[cfg(feature = "pivot")]254FunctionIR::Unpivot { .. } => true,255FunctionIR::Opaque { .. } => true,256FunctionIR::Hint(_) => is_input_ordered,257}258}259260pub fn is_elementwise(&self) -> bool {261match self {262Self::Unnest { .. } | Self::Hint(_) => true,263#[cfg(feature = "python")]264Self::OpaquePython(..) => false,265#[cfg(feature = "pivot")]266Self::Unpivot { .. } => false,267Self::RowIndex { .. }268| Self::FastCount { .. }269| Self::Rechunk270| Self::Explode { .. }271| Self::Opaque { .. } => false,272}273}274275pub fn observes_input_order(&self) -> bool {276true277}278279/// Is the input ordering always the same as the output ordering.280pub fn has_equal_order(&self) -> bool {281match self {282Self::Unnest { .. } | Self::Rechunk | Self::Hint(_) => true,283#[cfg(feature = "python")]284Self::OpaquePython(..) => false,285#[cfg(feature = "pivot")]286Self::Unpivot { .. } => false,287Self::RowIndex { .. }288| Self::FastCount { .. }289| Self::Explode { .. }290| Self::Opaque { .. } => false,291}292}293}294295impl Debug for FunctionIR {296fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {297write!(f, "{self}")298}299}300301impl Display for FunctionIR {302fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {303use FunctionIR::*;304match self {305Hint(hint) => {306write!(f, "hint.{hint}")307},308Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),309Unnest { columns, separator } => {310write!(f, "UNNEST by:")?;311let columns = columns.as_ref();312fmt_column_delimited(f, columns, "[", "]")?;313if let Some(separator) = separator {314write!(f, ", separator: {separator}")?;315}316Ok(())317},318FastCount {319sources,320scan_type,321alias,322} => {323let scan_type: &str = (&(**scan_type)).into();324let default_column_name = PlSmallStr::from_static(crate::constants::LEN);325let alias = alias.as_ref().unwrap_or(&default_column_name);326327write!(328f,329"FAST COUNT ({scan_type}) {} as \"{alias}\"",330ScanSourcesDisplay(sources)331)332},333RowIndex {334name,335offset,336schema: _,337} => {338write!(f, "ROW INDEX name: {name}")?;339if let Some(offset) = offset {340write!(f, ", offset: {offset}")?;341}342343Ok(())344},345Explode {346columns,347options,348schema: _,349} => {350f.write_str("EXPLODE ")?;351fmt_column_delimited(f, columns, "[", "]")?;352if !options.empty_as_null {353f.write_str(", empty_as_null: false")?;354}355if !options.keep_nulls {356f.write_str(", keep_nulls: false")?;357}358Ok(())359},360#[cfg(feature = "pivot")]361Unpivot { args, schema: _ } => {362let UnpivotArgsIR {363on,364index,365variable_name,366value_name,367} = args.as_ref();368369f.write_str("UNPIVOT on: ")?;370fmt_column_delimited(f, on, "[", "]")?;371fmt_column_delimited(f, index, "[", "]")?;372write!(f, ", variable_name: {variable_name}")?;373write!(f, ", value_name: {value_name}")?;374Ok(())375},376#[cfg(feature = "python")]377OpaquePython(_) => f.write_str(<&'static str>::from(self)),378Rechunk => f.write_str(<&'static str>::from(self)),379}380}381}382383384