Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/hconcat.rs
6940 views
1
use polars_core::functions::concat_df_horizontal;
2
3
use super::*;
4
5
pub(crate) struct HConcatExec {
6
pub(crate) inputs: Vec<Box<dyn Executor>>,
7
pub(crate) options: HConcatOptions,
8
}
9
10
impl Executor for HConcatExec {
11
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
12
#[cfg(debug_assertions)]
13
{
14
if state.verbose() {
15
eprintln!("run HConcatExec")
16
}
17
}
18
let mut inputs = std::mem::take(&mut self.inputs);
19
20
let dfs = if !self.options.parallel {
21
if state.verbose() {
22
eprintln!("HCONCAT: `parallel=false` hconcat is run sequentially")
23
}
24
let mut dfs = Vec::with_capacity(inputs.len());
25
for (idx, mut input) in inputs.into_iter().enumerate() {
26
let mut state = state.split();
27
state.branch_idx += idx;
28
29
let df = input.execute(&mut state)?;
30
31
dfs.push(df);
32
}
33
dfs
34
} else {
35
if state.verbose() {
36
eprintln!("HCONCAT: hconcat is run in parallel")
37
}
38
// We don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
39
// this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
40
// within bounds
41
let out = POOL.install(|| {
42
inputs
43
.chunks_mut(POOL.current_num_threads() * 3)
44
.map(|chunk| {
45
chunk
46
.into_par_iter()
47
.enumerate()
48
.map(|(idx, input)| {
49
let mut input = std::mem::take(input);
50
let mut state = state.split();
51
state.branch_idx += idx;
52
input.execute(&mut state)
53
})
54
.collect::<PolarsResult<Vec<_>>>()
55
})
56
.collect::<PolarsResult<Vec<_>>>()
57
});
58
out?.into_iter().flatten().collect()
59
};
60
61
// Invariant of IR. Schema is already checked to contain no duplicates.
62
concat_df_horizontal(&dfs, false)
63
}
64
}
65
66