// Path: blob/main/crates/polars-mem-engine/src/executors/projection.rs
// (web-page metadata captured with this file: 6940 views)
use polars_core::utils::accumulate_dataframes_vertical_unchecked;12use super::*;34/// Take an input Executor (creates the input DataFrame)5/// and a multiple PhysicalExpressions (create the output Series)6pub struct ProjectionExec {7pub(crate) input: Box<dyn Executor>,8pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,9pub(crate) has_windows: bool,10pub(crate) input_schema: SchemaRef,11#[cfg(test)]12pub(crate) schema: SchemaRef,13pub(crate) options: ProjectionOptions,14// Can run all operations elementwise15pub(crate) allow_vertical_parallelism: bool,16}1718impl ProjectionExec {19fn execute_impl(20&mut self,21state: &ExecutionState,22mut df: DataFrame,23) -> PolarsResult<DataFrame> {24// Vertical and horizontal parallelism.25let df = if self.allow_vertical_parallelism26&& df.first_col_n_chunks() > 127&& df.height() > POOL.current_num_threads() * 228&& self.options.run_parallel29{30let chunks = df.split_chunks().collect::<Vec<_>>();31let iter = chunks.into_par_iter().map(|mut df| {32let selected_cols = evaluate_physical_expressions(33&mut df,34&self.expr,35state,36self.has_windows,37self.options.run_parallel,38)?;39check_expand_literals(&df, &self.expr, selected_cols, df.is_empty(), self.options)40});4142let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;43accumulate_dataframes_vertical_unchecked(df)44}45// Only horizontal parallelism.46else {47#[allow(clippy::let_and_return)]48let selected_cols = evaluate_physical_expressions(49&mut df,50&self.expr,51state,52self.has_windows,53self.options.run_parallel,54)?;55check_expand_literals(&df, &self.expr, selected_cols, df.is_empty(), self.options)?56};5758// this only runs during testing and check if the runtime type matches the predicted schema59#[cfg(test)]60#[allow(unused_must_use)]61{62// TODO: also check the types.63for (l, r) in df.iter().zip(self.schema.iter_names()) {64assert_eq!(l.name(), r);65}66}6768Ok(df)69}70}7172impl Executor for ProjectionExec {73fn execute(&mut self, state: &mut ExecutionState) -> 
PolarsResult<DataFrame> {74state.should_stop()?;75#[cfg(debug_assertions)]76{77if state.verbose() {78eprintln!("run ProjectionExec");79}80}81let df = self.input.execute(state)?;8283let profile_name = if state.has_node_timer() {84let by = self85.expr86.iter()87.map(|s| profile_name(s.as_ref(), self.input_schema.as_ref()))88.collect::<PolarsResult<Vec<_>>>()?;89let name = comma_delimited("select".to_string(), &by);90Cow::Owned(name)91} else {92Cow::Borrowed("")93};9495if state.has_node_timer() {96let new_state = state.clone();97new_state.record(|| self.execute_impl(state, df), profile_name)98} else {99self.execute_impl(state, df)100}101}102}103104105