Path: blob/main/crates/polars-mem-engine/src/executors/union.rs
6940 views
use polars_core::utils::concat_df;12use super::*;34pub(crate) struct UnionExec {5pub(crate) inputs: Vec<Box<dyn Executor>>,6pub(crate) options: UnionOptions,7}89impl Executor for UnionExec {10fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {11state.should_stop()?;12#[cfg(debug_assertions)]13{14if state.verbose() {15eprintln!("run UnionExec")16}17}18let mut inputs = std::mem::take(&mut self.inputs);1920let sliced_path = if let Some((offset, _)) = self.options.slice {21offset >= 022} else {23false24};2526if !self.options.parallel || sliced_path {27if state.verbose() {28if !self.options.parallel {29eprintln!("UNION: `parallel=false` union is run sequentially")30} else {31eprintln!("UNION: `slice is set` union is run sequentially")32}33}3435let (slice_offset, mut slice_len) = self.options.slice.unwrap_or((0, usize::MAX));36let mut slice_offset = slice_offset as usize;37let mut dfs = Vec::with_capacity(inputs.len());3839for (idx, mut input) in inputs.into_iter().enumerate() {40let mut state = state.split();41state.branch_idx += idx;4243let df = input.execute(&mut state)?;4445if !sliced_path {46dfs.push(df);47continue;48}4950let height = df.height();51// this part can be skipped as we haven't reached the offset yet52// TODO!: don't read the file yet!53if slice_offset > height {54slice_offset -= height;55}56// applying the slice57// continue iteration58else if slice_offset + slice_len > height {59slice_len -= height - slice_offset;60if slice_offset == 0 {61dfs.push(df);62} else {63dfs.push(df.slice(slice_offset as i64, usize::MAX));64slice_offset = 0;65}66}67// we finished the slice68else {69dfs.push(df.slice(slice_offset as i64, slice_len));70break;71}72}7374concat_df(&dfs)75} else {76if state.verbose() {77eprintln!("UNION: union is run in parallel")78}7980// we don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)81// this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing82// within bounds83let out = POOL.install(|| {84inputs85.chunks_mut(POOL.current_num_threads() * 3)86.map(|chunk| {87chunk88.into_par_iter()89.enumerate()90.map(|(idx, input)| {91let mut input = std::mem::take(input);92let mut state = state.split();93state.branch_idx += idx;94input.execute(&mut state)95})96.collect::<PolarsResult<Vec<_>>>()97})98.collect::<PolarsResult<Vec<_>>>()99});100101concat_df(out?.iter().flat_map(|dfs| dfs.iter())).map(|df| {102if let Some((offset, len)) = self.options.slice {103df.slice(offset, len)104} else {105df106}107})108}109.map(|mut df| {110if self.options.rechunk {111df.as_single_chunk_par();112}113df114})115}116}117118119