Path: blob/main/crates/polars-expr/src/expressions/column.rs
6940 views
use std::borrow::Cow;12use polars_core::prelude::*;3use polars_plan::constants::CSE_REPLACED;45use super::*;6use crate::expressions::{AggregationContext, PartitionedAggregation, PhysicalExpr};78pub struct ColumnExpr {9name: PlSmallStr,10expr: Expr,11schema: SchemaRef,12}1314impl ColumnExpr {15pub fn new(name: PlSmallStr, expr: Expr, schema: SchemaRef) -> Self {16Self { name, expr, schema }17}18}1920impl ColumnExpr {21fn check_external_context(22&self,23out: PolarsResult<Column>,24state: &ExecutionState,25) -> PolarsResult<Column> {26match out {27Ok(col) => Ok(col),28Err(e) => {29if state.ext_contexts.is_empty() {30Err(e)31} else {32for df in state.ext_contexts.as_ref() {33let out = df.column(&self.name);34if out.is_ok() {35return out.cloned();36}37}38Err(e)39}40},41}42}4344fn process_by_idx(45&self,46out: &Column,47_state: &ExecutionState,48_schema: &Schema,49df: &DataFrame,50check_state_schema: bool,51) -> PolarsResult<Column> {52if out.name() != &*self.name {53if check_state_schema {54if let Some(schema) = _state.get_schema() {55return self.process_from_state_schema(df, _state, &schema);56}57}5859df.column(&self.name).cloned()60} else {61Ok(out.clone())62}63}64fn process_by_linear_search(65&self,66df: &DataFrame,67_state: &ExecutionState,68_panic_during_test: bool,69) -> PolarsResult<Column> {70df.column(&self.name).cloned()71}7273fn process_from_state_schema(74&self,75df: &DataFrame,76state: &ExecutionState,77schema: &Schema,78) -> PolarsResult<Column> {79match schema.get_full(&self.name) {80None => self.process_by_linear_search(df, state, true),81Some((idx, _, _)) => match df.get_columns().get(idx) {82Some(out) => self.process_by_idx(out, state, schema, df, false),83None => self.process_by_linear_search(df, state, true),84},85}86}8788fn process_cse(&self, df: &DataFrame, schema: &Schema) -> PolarsResult<Column> {89// The CSE columns are added on the rhs.90let offset = schema.len();91let columns = &df.get_columns()[offset..];92// Linear search will be relatively cheap as we only search the CSE columns.93Ok(columns94.iter()95.find(|s| s.name() == &self.name)96.unwrap()97.clone())98}99}100101impl PhysicalExpr for ColumnExpr {102fn as_expression(&self) -> Option<&Expr> {103Some(&self.expr)104}105106fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {107let out = match self.schema.get_full(&self.name) {108Some((idx, _, _)) => {109// check if the schema was correct110// if not do O(n) search111match df.get_columns().get(idx) {112Some(out) => self.process_by_idx(out, state, &self.schema, df, true),113None => {114// partitioned group_by special case115if let Some(schema) = state.get_schema() {116self.process_from_state_schema(df, state, &schema)117} else {118self.process_by_linear_search(df, state, true)119}120},121}122},123// in the future we will throw an error here124// now we do a linear search first as the lazy reported schema may still be incorrect125// in debug builds we panic so that it can be fixed when occurring126None => {127if self.name.starts_with(CSE_REPLACED) {128return self.process_cse(df, &self.schema);129}130self.process_by_linear_search(df, state, true)131},132};133self.check_external_context(out, state)134}135136#[allow(clippy::ptr_arg)]137fn evaluate_on_groups<'a>(138&self,139df: &DataFrame,140groups: &'a GroupPositions,141state: &ExecutionState,142) -> PolarsResult<AggregationContext<'a>> {143let c = self.evaluate(df, state)?;144Ok(AggregationContext::new(c, Cow::Borrowed(groups), false))145}146147fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> {148Some(self)149}150151fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {152input_schema.get_field(&self.name).ok_or_else(|| {153polars_err!(154ColumnNotFound: "could not find {:?} in schema: {:?}", self.name, &input_schema155)156})157}158fn is_scalar(&self) -> bool {159false160}161}162163impl PartitionedAggregation for ColumnExpr {164fn evaluate_partitioned(165&self,166df: &DataFrame,167_groups: &GroupPositions,168state: &ExecutionState,169) -> PolarsResult<Column> {170self.evaluate(df, state)171}172173fn finalize(174&self,175partitioned: Column,176_groups: &GroupPositions,177_state: &ExecutionState,178) -> PolarsResult<Column> {179Ok(partitioned)180}181}182183184