Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/union.rs
6940 views
1
use polars_core::utils::concat_df;
2
3
use super::*;
4
5
pub(crate) struct UnionExec {
6
pub(crate) inputs: Vec<Box<dyn Executor>>,
7
pub(crate) options: UnionOptions,
8
}
9
10
impl Executor for UnionExec {
11
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
12
state.should_stop()?;
13
#[cfg(debug_assertions)]
14
{
15
if state.verbose() {
16
eprintln!("run UnionExec")
17
}
18
}
19
let mut inputs = std::mem::take(&mut self.inputs);
20
21
let sliced_path = if let Some((offset, _)) = self.options.slice {
22
offset >= 0
23
} else {
24
false
25
};
26
27
if !self.options.parallel || sliced_path {
28
if state.verbose() {
29
if !self.options.parallel {
30
eprintln!("UNION: `parallel=false` union is run sequentially")
31
} else {
32
eprintln!("UNION: `slice is set` union is run sequentially")
33
}
34
}
35
36
let (slice_offset, mut slice_len) = self.options.slice.unwrap_or((0, usize::MAX));
37
let mut slice_offset = slice_offset as usize;
38
let mut dfs = Vec::with_capacity(inputs.len());
39
40
for (idx, mut input) in inputs.into_iter().enumerate() {
41
let mut state = state.split();
42
state.branch_idx += idx;
43
44
let df = input.execute(&mut state)?;
45
46
if !sliced_path {
47
dfs.push(df);
48
continue;
49
}
50
51
let height = df.height();
52
// this part can be skipped as we haven't reached the offset yet
53
// TODO!: don't read the file yet!
54
if slice_offset > height {
55
slice_offset -= height;
56
}
57
// applying the slice
58
// continue iteration
59
else if slice_offset + slice_len > height {
60
slice_len -= height - slice_offset;
61
if slice_offset == 0 {
62
dfs.push(df);
63
} else {
64
dfs.push(df.slice(slice_offset as i64, usize::MAX));
65
slice_offset = 0;
66
}
67
}
68
// we finished the slice
69
else {
70
dfs.push(df.slice(slice_offset as i64, slice_len));
71
break;
72
}
73
}
74
75
concat_df(&dfs)
76
} else {
77
if state.verbose() {
78
eprintln!("UNION: union is run in parallel")
79
}
80
81
// we don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
82
// this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
83
// within bounds
84
let out = POOL.install(|| {
85
inputs
86
.chunks_mut(POOL.current_num_threads() * 3)
87
.map(|chunk| {
88
chunk
89
.into_par_iter()
90
.enumerate()
91
.map(|(idx, input)| {
92
let mut input = std::mem::take(input);
93
let mut state = state.split();
94
state.branch_idx += idx;
95
input.execute(&mut state)
96
})
97
.collect::<PolarsResult<Vec<_>>>()
98
})
99
.collect::<PolarsResult<Vec<_>>>()
100
});
101
102
concat_df(out?.iter().flat_map(|dfs| dfs.iter())).map(|df| {
103
if let Some((offset, len)) = self.options.slice {
104
df.slice(offset, len)
105
} else {
106
df
107
}
108
})
109
}
110
.map(|mut df| {
111
if self.options.rechunk {
112
df.as_single_chunk_par();
113
}
114
df
115
})
116
}
117
}
118
119