Path: blob/main/crates/polars-expr/src/expressions/column.rs
8416 views
use std::borrow::Cow;12use polars_core::prelude::*;3use polars_plan::constants::CSE_REPLACED;45use super::*;6use crate::expressions::{AggregationContext, PhysicalExpr};78#[derive(Debug)]9pub struct ColumnExpr {10name: PlSmallStr,11expr: Expr,12schema: SchemaRef,13}1415impl ColumnExpr {16pub fn new(name: PlSmallStr, expr: Expr, schema: SchemaRef) -> Self {17Self { name, expr, schema }18}19}2021impl ColumnExpr {22fn check_external_context(23&self,24out: PolarsResult<Column>,25state: &ExecutionState,26) -> PolarsResult<Column> {27match out {28Ok(col) => Ok(col),29Err(e) => {30if state.ext_contexts.is_empty() {31Err(e)32} else {33for df in state.ext_contexts.as_ref() {34let out = df.column(&self.name);35if out.is_ok() {36return out.cloned();37}38}39Err(e)40}41},42}43}4445fn process_by_idx(46&self,47out: &Column,48_state: &ExecutionState,49_schema: &Schema,50df: &DataFrame,51check_state_schema: bool,52) -> PolarsResult<Column> {53if out.name() != &*self.name {54if check_state_schema {55if let Some(schema) = _state.get_schema() {56return self.process_from_state_schema(df, _state, &schema);57}58}5960df.column(&self.name).cloned()61} else {62Ok(out.clone())63}64}65fn process_by_linear_search(66&self,67df: &DataFrame,68_state: &ExecutionState,69_panic_during_test: bool,70) -> PolarsResult<Column> {71df.column(&self.name).cloned()72}7374fn process_from_state_schema(75&self,76df: &DataFrame,77state: &ExecutionState,78schema: &Schema,79) -> PolarsResult<Column> {80match schema.get_full(&self.name) {81None => self.process_by_linear_search(df, state, true),82Some((idx, _, _)) => match df.columns().get(idx) {83Some(out) => self.process_by_idx(out, state, schema, df, false),84None => self.process_by_linear_search(df, state, true),85},86}87}8889fn process_cse(&self, df: &DataFrame, schema: &Schema) -> PolarsResult<Column> {90// The CSE columns are added on the rhs.91let offset = schema.len();92let columns = &df.columns()[offset..];93// Linear search will be relatively cheap as we only search the CSE columns.94Ok(columns95.iter()96.find(|s| s.name() == &self.name)97.unwrap()98.clone())99}100}101102impl PhysicalExpr for ColumnExpr {103fn as_expression(&self) -> Option<&Expr> {104Some(&self.expr)105}106107fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {108let out = match self.schema.get_full(&self.name) {109Some((idx, _, _)) => {110// check if the schema was correct111// if not do O(n) search112match df.columns().get(idx) {113Some(out) => self.process_by_idx(out, state, &self.schema, df, true),114None => {115// partitioned group_by special case116if let Some(schema) = state.get_schema() {117self.process_from_state_schema(df, state, &schema)118} else {119self.process_by_linear_search(df, state, true)120}121},122}123},124// in the future we will throw an error here125// now we do a linear search first as the lazy reported schema may still be incorrect126// in debug builds we panic so that it can be fixed when occurring127None => {128if self.name.starts_with(CSE_REPLACED) {129return self.process_cse(df, &self.schema);130}131self.process_by_linear_search(df, state, true)132},133};134self.check_external_context(out, state)135}136137#[allow(clippy::ptr_arg)]138fn evaluate_on_groups<'a>(139&self,140df: &DataFrame,141groups: &'a GroupPositions,142state: &ExecutionState,143) -> PolarsResult<AggregationContext<'a>> {144let c = self.evaluate(df, state)?;145Ok(AggregationContext::new(c, Cow::Borrowed(groups), false))146}147148fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {149input_schema.get_field(&self.name).ok_or_else(|| {150polars_err!(151ColumnNotFound: "could not find {:?} in schema: {:?}", self.name, &input_schema152)153})154}155fn is_scalar(&self) -> bool {156false157}158159fn as_column(&self) -> Option<PlSmallStr> {160Some(self.name.clone())161}162}163164165