Path: blob/main/crates/polars-mem-engine/src/executors/projection.rs
8420 views
use polars_core::utils::accumulate_dataframes_vertical_unchecked;12use super::*;34/// Take an input Executor (creates the input DataFrame)5/// and a multiple PhysicalExpressions (create the output Series)6pub struct ProjectionExec {7pub(crate) input: Box<dyn Executor>,8pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,9pub(crate) has_windows: bool,10pub(crate) input_schema: SchemaRef,11#[cfg(test)]12pub(crate) schema: SchemaRef,13pub(crate) options: ProjectionOptions,14// Can run all operations elementwise15pub(crate) allow_vertical_parallelism: bool,16}1718impl ProjectionExec {19fn execute_impl(20&mut self,21state: &ExecutionState,22mut df: DataFrame,23) -> PolarsResult<DataFrame> {24// Vertical and horizontal parallelism.25let df = if self.allow_vertical_parallelism26&& df.first_col_n_chunks() > 127&& df.height() > POOL.current_num_threads() * 228&& self.options.run_parallel29{30let chunks = df.split_chunks().collect::<Vec<_>>();31let iter = chunks.into_par_iter().map(|mut df| {32let selected_cols = evaluate_physical_expressions(33&mut df,34&self.expr,35state,36self.has_windows,37self.options.run_parallel,38)?;39check_expand_literals(40&df,41&self.expr,42selected_cols,43df.shape_has_zero(),44self.options,45)46});4748let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;49accumulate_dataframes_vertical_unchecked(df)50}51// Only horizontal parallelism.52else {53#[allow(clippy::let_and_return)]54let selected_cols = evaluate_physical_expressions(55&mut df,56&self.expr,57state,58self.has_windows,59self.options.run_parallel,60)?;61check_expand_literals(62&df,63&self.expr,64selected_cols,65df.shape_has_zero(),66self.options,67)?68};6970// this only runs during testing and check if the runtime type matches the predicted schema71#[cfg(test)]72#[allow(unused_must_use)]73{74// TODO: also check the types.75for (l, r) in df.columns().iter().zip(self.schema.iter_names()) {76assert_eq!(l.name(), r);77}78}7980Ok(df)81}82}8384impl Executor for ProjectionExec {85fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {86state.should_stop()?;87#[cfg(debug_assertions)]88{89if state.verbose() {90eprintln!("run ProjectionExec");91}92}93let df = self.input.execute(state)?;9495let profile_name = if state.has_node_timer() {96let by = self97.expr98.iter()99.map(|s| profile_name(s.as_ref(), self.input_schema.as_ref()))100.collect::<PolarsResult<Vec<_>>>()?;101let name = comma_delimited("select".to_string(), &by);102Cow::Owned(name)103} else {104Cow::Borrowed("")105};106107if state.has_node_timer() {108let new_state = state.clone();109new_state.record(|| self.execute_impl(state, df), profile_name)110} else {111self.execute_impl(state, df)112}113}114}115116117