Path: blob/main/crates/polars-plan/src/plans/functions/dsl.rs
8327 views
use polars_compute::rolling::QuantileMethod;1use strum_macros::IntoStaticStr;23use super::*;45#[cfg(feature = "python")]6#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]7#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]8#[derive(Clone)]9pub struct OpaquePythonUdf {10pub function: PythonFunction,11pub schema: Option<SchemaRef>,12/// allow predicate pushdown optimizations13pub predicate_pd: bool,14/// allow projection pushdown optimizations15pub projection_pd: bool,16pub streamable: bool,17pub validate_output: bool,18}1920// Except for Opaque functions, this only has the DSL name of the function.21#[derive(Clone, IntoStaticStr)]22#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]23#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]24#[strum(serialize_all = "SCREAMING_SNAKE_CASE")]25pub enum DslFunction {26RowIndex {27name: PlSmallStr,28offset: Option<IdxSize>,29},30// This is both in DSL and IR because we want to be able to serialize it.31#[cfg(feature = "python")]32OpaquePython(OpaquePythonUdf),33Explode {34columns: Selector,35options: ExplodeOptions,36allow_empty: bool,37},38#[cfg(feature = "pivot")]39Unpivot {40args: UnpivotArgsDSL,41},42Rename {43existing: Arc<[PlSmallStr]>,44new: Arc<[PlSmallStr]>,45strict: bool,46},47Unnest {48columns: Selector,49separator: Option<PlSmallStr>,50},51Stats(StatsFunction),52/// FillValue53FillNan(Expr),54// Function that is already converted to IR.55#[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]56FunctionIR(FunctionIR),57Hint(HintIR),58}5960#[derive(Clone)]61#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]62#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]63pub enum StatsFunction {64Var {65ddof: u8,66},67Std {68ddof: u8,69},70Quantile {71quantile: Expr,72method: QuantileMethod,73},74Median,75Mean,76Sum,77Min,78Max,79}8081pub(crate) fn validate_columns_in_input<S: AsRef<str>, I: IntoIterator<Item = S>>(82columns: I,83input_schema: &Schema,84operation_name: &str,85) -> PolarsResult<()> {86let columns = columns.into_iter();87for c in columns {88polars_ensure!(input_schema.contains(c.as_ref()), ColumnNotFound: "'{}' on column: '{}' is invalid\n\nSchema at this point: {:?}", operation_name, c.as_ref(), input_schema)89}90Ok(())91}9293impl DslFunction {94pub(crate) fn into_function_ir(self, input_schema: &Schema) -> PolarsResult<FunctionIR> {95let function = match self {96#[cfg(feature = "pivot")]97DslFunction::Unpivot { args } => {98polars_ensure!(99!input_schema.contains("variable"),100Duplicate: "duplicate column name variable"101);102103polars_ensure!(104!input_schema.contains("value"),105Duplicate: "duplicate column name value"106);107108let on = match args.on {109None => None,110Some(on) => Some(111on.into_columns(input_schema, &Default::default())?112.into_iter()113.collect::<Vec<_>>(),114),115};116117let index = args118.index119.into_columns(input_schema, &Default::default())?120.into_vec();121122let args = UnpivotArgsIR::new(123input_schema.iter().map(|(name, _)| name.clone()).collect(),124on,125index,126args.value_name,127args.variable_name,128);129130FunctionIR::Unpivot {131args: Arc::new(args),132schema: Default::default(),133}134},135DslFunction::FunctionIR(func) => func,136DslFunction::RowIndex { name, offset } => {137polars_ensure!(138!input_schema.contains(&name),139Duplicate: "duplicate column name {name}"140);141142FunctionIR::RowIndex {143name,144offset,145schema: Default::default(),146}147},148DslFunction::Unnest { columns, separator } => {149let columns = columns.into_columns(input_schema, &Default::default())?;150let columns: Arc<[PlSmallStr]> = columns.into_iter().collect();151for col in columns.iter() {152let dtype = input_schema.try_get(col.as_str())?;153polars_ensure!(154dtype.is_struct(),155InvalidOperation: "invalid dtype: expected 'Struct', got '{:?}' for '{}'", dtype, col156);157}158FunctionIR::Unnest { columns, separator }159},160DslFunction::Hint(h) => FunctionIR::Hint(h),161#[cfg(feature = "python")]162DslFunction::OpaquePython(inner) => FunctionIR::OpaquePython(inner),163DslFunction::Stats(_)164| DslFunction::FillNan(_)165| DslFunction::Rename { .. }166| DslFunction::Explode { .. } => {167// We should not reach this.168panic!("impl error")169},170};171Ok(function)172}173}174175impl Debug for DslFunction {176fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {177write!(f, "{self}")178}179}180181impl Display for DslFunction {182fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {183use DslFunction::*;184match self {185FunctionIR(inner) => write!(f, "{inner}"),186v => {187let s: &str = v.into();188write!(f, "{s}")189},190}191}192}193194impl From<FunctionIR> for DslFunction {195fn from(value: FunctionIR) -> Self {196DslFunction::FunctionIR(value)197}198}199200201