Path: blob/main/crates/polars-plan/src/plans/functions/dsl.rs
8446 views
use polars_compute::rolling::QuantileMethod;1use strum_macros::IntoStaticStr;23use super::*;45#[cfg(feature = "python")]6#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]7#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]8#[derive(Clone)]9pub struct OpaquePythonUdf {10pub function: PythonFunction,11pub schema: Option<SchemaRef>,12/// allow predicate pushdown optimizations13pub predicate_pd: bool,14/// allow projection pushdown optimizations15pub projection_pd: bool,16pub streamable: bool,17pub validate_output: bool,18}1920// Except for Opaque functions, this only has the DSL name of the function.21#[derive(Clone, IntoStaticStr)]22#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]23#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]24#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]25pub enum DslFunction {26RowIndex {27name: PlSmallStr,28offset: Option<IdxSize>,29},30// This is both in DSL and IR because we want to be able to serialize it.31#[cfg(feature = "python")]32OpaquePython(OpaquePythonUdf),33Explode {34columns: Selector,35options: ExplodeOptions,36allow_empty: bool,37},38#[cfg(feature = "pivot")]39Unpivot {40args: UnpivotArgsDSL,41},42Rename {43existing: Arc<[PlSmallStr]>,44new: Arc<[PlSmallStr]>,45strict: bool,46},47Unnest {48columns: Selector,49separator: Option<PlSmallStr>,50},51Stats(StatsFunction),52/// FillValue53FillNan(Expr),54// Function that is already converted to IR.55#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]56FunctionIR(FunctionIR),57Hint(HintIR),58}5960#[derive(Clone)]61#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]62#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]63pub enum StatsFunction {64Var {65ddof: u8,66},67Std {68ddof: u8,69},70Quantile {71quantile: Expr,72method: QuantileMethod,73},74Median,75Mean,76Sum,77Min,78Max,79}8081pub(crate) fn validate_columns_in_input<S: AsRef<str>, I: IntoIterator<Item = S>>(82columns: I,83input_schema: &Schema,84operation_name: &str,85) -> PolarsResult<()> {86let columns = columns.into_iter();87for c in columns {88polars_ensure!(input_schema.contains(c.as_ref()), ColumnNotFound: "'{}' on column: '{}' is invalid\n\nSchema at this point: {:?}", operation_name, c.as_ref(), input_schema)89}90Ok(())91}9293impl DslFunction {94pub(crate) fn into_function_ir(self, input_schema: &Schema) -> PolarsResult<FunctionIR> {95let function = match self {96#[cfg(feature = "pivot")]97DslFunction::Unpivot { args } => {98let variable_name = args.variable_name.as_deref().unwrap_or("variable");99polars_ensure!(100!input_schema.contains(variable_name),101Duplicate: "duplicate column name '{variable_name}'"102);103104let value_name = args.value_name.as_deref().unwrap_or("value");105polars_ensure!(106!input_schema.contains(value_name),107Duplicate: "duplicate column name '{value_name}'"108);109110let on = match args.on {111None => None,112Some(on) => Some(113on.into_columns(input_schema, &Default::default())?114.into_iter()115.collect::<Vec<_>>(),116),117};118119let index = args120.index121.into_columns(input_schema, &Default::default())?122.into_vec();123124let args = UnpivotArgsIR::new(125input_schema.iter().map(|(name, _)| name.clone()).collect(),126on,127index,128args.value_name,129args.variable_name,130);131132FunctionIR::Unpivot {133args: Arc::new(args),134schema: Default::default(),135}136},137DslFunction::FunctionIR(func) => func,138DslFunction::RowIndex { name, offset } => {139polars_ensure!(140!input_schema.contains(&name),141Duplicate: "duplicate column name {name}"142);143144FunctionIR::RowIndex {145name,146offset,147schema: Default::default(),148}149},150DslFunction::Unnest { columns, separator } => {151let columns = columns.into_columns(input_schema, &Default::default())?;152let columns: Arc<[PlSmallStr]> = columns.into_iter().collect();153for col in columns.iter() {154let dtype = input_schema.try_get(col.as_str())?;155polars_ensure!(156dtype.is_struct(),157InvalidOperation: "invalid dtype: expected 'Struct', got '{:?}' for '{}'", dtype, col158);159}160FunctionIR::Unnest { columns, separator }161},162DslFunction::Hint(h) => FunctionIR::Hint(h),163#[cfg(feature = "python")]164DslFunction::OpaquePython(inner) => FunctionIR::OpaquePython(inner),165DslFunction::Stats(_)166| DslFunction::FillNan(_)167| DslFunction::Rename { .. }168| DslFunction::Explode { .. } => {169// We should not reach this.170panic!("impl error")171},172};173Ok(function)174}175}176177impl Debug for DslFunction {178fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {179write!(f, "{self}")180}181}182183impl Display for DslFunction {184fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {185use DslFunction::*;186match self {187FunctionIR(inner) => write!(f, "{inner}"),188v => {189let s: &str = v.into();190write!(f, "{s}")191},192}193}194}195196impl From<FunctionIR> for DslFunction {197fn from(value: FunctionIR) -> Self {198DslFunction::FunctionIR(value)199}200}201202203