Path: blob/main/crates/polars-mem-engine/src/executors/hconcat.rs
use polars_core::functions::concat_df_horizontal;

use super::*;

pub(crate) struct HConcatExec {
    pub(crate) inputs: Vec<Box<dyn Executor>>,
    pub(crate) options: HConcatOptions,
}

impl Executor for HConcatExec {
    fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
        #[cfg(debug_assertions)]
        {
            if state.verbose() {
                eprintln!("run HConcatExec")
            }
        }
        // Take ownership of the child executors; `self.inputs` is left empty.
        let mut inputs = std::mem::take(&mut self.inputs);

        let dfs = if !self.options.parallel {
            if state.verbose() {
                eprintln!("HCONCAT: `parallel=false` hconcat is run sequentially")
            }
            // Execute every input one by one, each with its own branch of the state.
            let mut dfs = Vec::with_capacity(inputs.len());
            for (idx, mut input) in inputs.into_iter().enumerate() {
                let mut state = state.split();
                state.branch_idx += idx;

                let df = input.execute(&mut state)?;

                dfs.push(df);
            }
            dfs
        } else {
            if state.verbose() {
                eprintln!("HCONCAT: hconcat is run in parallel")
            }
            // We don't use par_iter directly because every LP may also start its own threads
            // (for instance scan_csv), which might then lead to a rayon stack overflow. So we
            // process chunks sized as a multiple of the thread count to keep work stealing
            // within bounds.
            let out = POOL.install(|| {
                inputs
                    .chunks_mut(POOL.current_num_threads() * 3)
                    .map(|chunk| {
                        chunk
                            .into_par_iter()
                            .enumerate()
                            .map(|(idx, input)| {
                                let mut input = std::mem::take(input);
                                let mut state = state.split();
                                state.branch_idx += idx;
                                input.execute(&mut state)
                            })
                            .collect::<PolarsResult<Vec<_>>>()
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            });
            out?.into_iter().flatten().collect()
        };

        // Invariant of the IR: the schema has already been checked to contain no duplicates.
        concat_df_horizontal(&dfs, false)
    }
}
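The chunking in the parallel branch may be easier to see in isolation. The sketch below is a minimal, standalone illustration under stated assumptions: it uses plain `rayon` rather than the polars `POOL`, and a toy doubling workload stands in for `input.execute(&mut state)`. The work is split into chunks sized as a multiple of the thread count, and only the items inside one chunk run in parallel at a time, so tasks that spawn their own threads do not flood the pool.

use rayon::prelude::*;

fn main() {
    // Toy stand-ins for the boxed executors; doubling replaces `execute`.
    let inputs: Vec<u64> = (0..100).collect();

    // Chunk size is a multiple of the available threads, mirroring
    // `POOL.current_num_threads() * 3` in the executor above.
    let chunk_size = rayon::current_num_threads() * 3;

    let results: Vec<u64> = inputs
        .chunks(chunk_size)
        .flat_map(|chunk| {
            // Only the items inside one chunk run in parallel at a time.
            chunk.par_iter().map(|x| x * 2).collect::<Vec<_>>()
        })
        .collect();

    assert_eq!(results.len(), inputs.len());
}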
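For context on the final call, here is a minimal sketch of calling `polars_core::functions::concat_df_horizontal` directly. It assumes the second argument toggles the duplicate-column-name check (the executor passes `false` because the IR has already validated the schema) and uses invented frames and column names for illustration; this is a usage sketch, not part of this file.

use polars_core::df;
use polars_core::functions::concat_df_horizontal;
use polars_core::prelude::*;

fn main() -> PolarsResult<()> {
    // Two frames with the same height and disjoint column names.
    let left = df!("a" => &[1i32, 2, 3])?;
    let right = df!("b" => &["x", "y", "z"])?;

    // `true` requests the duplicate-name check that the executor above skips.
    let wide = concat_df_horizontal(&[left, right], true)?;
    assert_eq!(wide.shape(), (3, 2));

    Ok(())
}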