// Path: blob/main/crates/polars-mem-engine/src/executors/stack.rs
// 6940 views
use polars_core::utils::accumulate_dataframes_vertical_unchecked;1use polars_plan::constants::CSE_REPLACED;23use super::*;45pub struct StackExec {6pub(crate) input: Box<dyn Executor>,7pub(crate) has_windows: bool,8pub(crate) exprs: Vec<Arc<dyn PhysicalExpr>>,9pub(crate) input_schema: SchemaRef,10pub(crate) output_schema: SchemaRef,11pub(crate) options: ProjectionOptions,12// Can run all operations elementwise13pub(crate) allow_vertical_parallelism: bool,14}1516impl StackExec {17fn execute_impl(18&mut self,19state: &ExecutionState,20mut df: DataFrame,21) -> PolarsResult<DataFrame> {22let schema = &*self.output_schema;2324// Vertical and horizontal parallelism.25let df = if self.allow_vertical_parallelism26&& df.first_col_n_chunks() > 127&& df.height() > 028&& self.options.run_parallel29{30let chunks = df.split_chunks().collect::<Vec<_>>();31let iter = chunks.into_par_iter().map(|mut df| {32let res = evaluate_physical_expressions(33&mut df,34&self.exprs,35state,36self.has_windows,37self.options.run_parallel,38)?;39// We don't have to do a broadcast check as cse is not allowed to hit this.40df._add_columns(res.into_iter().collect(), schema)?;41Ok(df)42});4344let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;45accumulate_dataframes_vertical_unchecked(df)46}47// Only horizontal parallelism48else {49let res = evaluate_physical_expressions(50&mut df,51&self.exprs,52state,53self.has_windows,54self.options.run_parallel,55)?;56if !self.options.should_broadcast {57debug_assert!(58res.iter()59.all(|column| column.name().starts_with("__POLARS_CSER_0x")),60"non-broadcasting hstack should only be used for CSE columns"61);62// Safety: this case only appears as a result of63// CSE optimization, and the usage there produces64// new, unique column names. 
It is immediately65// followed by a projection which pulls out the66// possibly mismatching column lengths.67unsafe { df.column_extend_unchecked(res) };68} else {69let (df_height, df_width) = df.shape();7071// When we have CSE we cannot verify scalars yet.72let verify_scalar = if !df.get_columns().is_empty() {73!df.get_columns()[df.width() - 1]74.name()75.starts_with(CSE_REPLACED)76} else {77true78};79for (i, c) in res.iter().enumerate() {80let len = c.len();81if verify_scalar && len != df_height && len == 1 && df_width > 0 {82#[allow(clippy::collapsible_if)]83if !self.exprs[i].is_scalar()84&& std::env::var("POLARS_ALLOW_NON_SCALAR_EXP").as_deref() != Ok("1")85{86let identifier = match self.exprs[i].as_expression() {87Some(e) => format!("expression: {e}"),88None => "this Series".to_string(),89};90polars_bail!(InvalidOperation: "Series {}, length {} doesn't match the DataFrame height of {}\n\n\91If you want {} to be broadcasted, ensure it is a scalar (for instance by adding '.first()').",92c.name(), len, df_height, identifier93);94}95}96}97df._add_columns(res.into_iter().collect(), schema)?;98}99df100};101102state.clear_window_expr_cache();103104Ok(df)105}106}107108impl Executor for StackExec {109fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {110state.should_stop()?;111#[cfg(debug_assertions)]112{113if state.verbose() {114eprintln!("run StackExec");115}116}117let df = self.input.execute(state)?;118119let profile_name = if state.has_node_timer() {120let by = self121.exprs122.iter()123.map(|s| profile_name(s.as_ref(), self.input_schema.as_ref()))124.collect::<PolarsResult<Vec<_>>>()?;125let name = comma_delimited("with_column".to_string(), &by);126Cow::Owned(name)127} else {128Cow::Borrowed("")129};130131if state.has_node_timer() {132let new_state = state.clone();133new_state.record(|| self.execute_impl(state, df), profile_name)134} else {135self.execute_impl(state, df)136}137}138}139140141