Path: blob/main/crates/polars-mem-engine/src/executors/scan/mod.rs
6940 views
#[cfg(feature = "python")]1mod python_scan;23use std::mem;45use polars_utils::slice_enum::Slice;67#[cfg(feature = "python")]8pub(crate) use self::python_scan::*;9use super::*;10use crate::ScanPredicate;11use crate::prelude::*;1213/// Producer of an in memory DataFrame14pub struct DataFrameExec {15pub(crate) df: Arc<DataFrame>,16pub(crate) projection: Option<Vec<PlSmallStr>>,17}1819impl Executor for DataFrameExec {20fn execute(&mut self, _state: &mut ExecutionState) -> PolarsResult<DataFrame> {21let df = mem::take(&mut self.df);22let mut df = Arc::try_unwrap(df).unwrap_or_else(|df| (*df).clone());2324// projection should be before selection as those are free25// TODO: this is only the case if we don't create new columns26if let Some(projection) = &self.projection {27df = df.select(projection.iter().cloned())?;28}2930Ok(df)31}32}3334pub(crate) struct AnonymousScanExec {35pub(crate) function: Arc<dyn AnonymousScan>,36pub(crate) unified_scan_args: Box<UnifiedScanArgs>,37pub(crate) file_info: FileInfo,38pub(crate) predicate: Option<ScanPredicate>,39pub(crate) output_schema: Option<SchemaRef>,40pub(crate) predicate_has_windows: bool,41}4243impl Executor for AnonymousScanExec {44fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {45let mut args = AnonymousScanArgs {46n_rows: self.unified_scan_args.pre_slice.clone().map(|x| {47assert!(matches!(x, Slice::Positive { offset: 0, .. }));4849x.len()50}),51with_columns: self.unified_scan_args.projection.clone(),52schema: self.file_info.schema.clone(),53output_schema: self.output_schema.clone(),54predicate: None,55};56if self.predicate.is_some() {57state.insert_has_window_function_flag()58}5960match (self.function.allows_predicate_pushdown(), &self.predicate) {61(true, Some(predicate)) => state.record(62|| {63args.predicate = predicate.predicate.as_expression().cloned();64self.function.scan(args)65},66"anonymous_scan".into(),67),68(false, Some(predicate)) => state.record(69|| {70let mut df = self.function.scan(args)?;71let s = predicate.predicate.evaluate(&df, state)?;72if self.predicate_has_windows {73state.clear_window_expr_cache()74}75let mask = s.bool().map_err(76|_| polars_err!(ComputeError: "filter predicate was not of type boolean"),77)?;78df = df.filter(mask)?;7980Ok(df)81},82"anonymous_scan".into(),83),84_ => state.record(|| self.function.scan(args), "anonymous_scan".into()),85}86}87}888990