Path: blob/main/crates/polars-plan/src/plans/functions/dsl.rs
6940 views
use polars_compute::rolling::QuantileMethod;1use strum_macros::IntoStaticStr;23use super::*;45#[cfg(feature = "python")]6#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]7#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]8#[derive(Clone)]9pub struct OpaquePythonUdf {10pub function: PythonFunction,11pub schema: Option<SchemaRef>,12/// allow predicate pushdown optimizations13pub predicate_pd: bool,14/// allow projection pushdown optimizations15pub projection_pd: bool,16pub streamable: bool,17pub validate_output: bool,18}1920// Except for Opaque functions, this only has the DSL name of the function.21#[derive(Clone, IntoStaticStr)]22#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]23#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]24#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]25pub enum DslFunction {26RowIndex {27name: PlSmallStr,28offset: Option<IdxSize>,29},30// This is both in DSL and IR because we want to be able to serialize it.31#[cfg(feature = "python")]32OpaquePython(OpaquePythonUdf),33Explode {34columns: Selector,35allow_empty: bool,36},37#[cfg(feature = "pivot")]38Unpivot {39args: UnpivotArgsDSL,40},41Rename {42existing: Arc<[PlSmallStr]>,43new: Arc<[PlSmallStr]>,44strict: bool,45},46Unnest(Selector),47Stats(StatsFunction),48/// FillValue49FillNan(Expr),50// Function that is already converted to IR.51#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]52FunctionIR(FunctionIR),53}5455#[derive(Clone)]56#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]57#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]58pub enum StatsFunction {59Var {60ddof: u8,61},62Std {63ddof: u8,64},65Quantile {66quantile: Expr,67method: QuantileMethod,68},69Median,70Mean,71Sum,72Min,73Max,74}7576pub(crate) fn validate_columns_in_input<S: AsRef<str>, I: IntoIterator<Item = S>>(77columns: I,78input_schema: &Schema,79operation_name: &str,80) -> PolarsResult<()> {81let columns = columns.into_iter();82for c in columns {83polars_ensure!(input_schema.contains(c.as_ref()), ColumnNotFound: "'{}' on column: '{}' is invalid\n\nSchema at this point: {:?}", operation_name, c.as_ref(), input_schema)84}85Ok(())86}8788impl DslFunction {89pub(crate) fn into_function_ir(self, input_schema: &Schema) -> PolarsResult<FunctionIR> {90let function = match self {91#[cfg(feature = "pivot")]92DslFunction::Unpivot { args } => {93let on = args.on.into_columns(input_schema, &Default::default())?;94let index = args.index.into_columns(input_schema, &Default::default())?;9596let args = UnpivotArgsIR {97on: on.into_iter().collect(),98index: index.into_iter().collect(),99variable_name: args.variable_name.clone(),100value_name: args.value_name,101};102103FunctionIR::Unpivot {104args: Arc::new(args),105schema: Default::default(),106}107},108DslFunction::FunctionIR(func) => func,109DslFunction::RowIndex { name, offset } => FunctionIR::RowIndex {110name,111offset,112schema: Default::default(),113},114DslFunction::Unnest(selector) => {115let columns = selector.into_columns(input_schema, &Default::default())?;116let columns = columns.into_iter().collect();117FunctionIR::Unnest { columns }118},119#[cfg(feature = "python")]120DslFunction::OpaquePython(inner) => FunctionIR::OpaquePython(inner),121DslFunction::Stats(_)122| DslFunction::FillNan(_)123| DslFunction::Rename { .. }124| DslFunction::Explode { .. } => {125// We should not reach this.126panic!("impl error")127},128};129Ok(function)130}131}132133impl Debug for DslFunction {134fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {135write!(f, "{self}")136}137}138139impl Display for DslFunction {140fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {141use DslFunction::*;142match self {143FunctionIR(inner) => write!(f, "{inner}"),144v => {145let s: &str = v.into();146write!(f, "{s}")147},148}149}150}151152impl From<FunctionIR> for DslFunction {153fn from(value: FunctionIR) -> Self {154DslFunction::FunctionIR(value)155}156}157158159