Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/projection.rs
6940 views
1
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
2
3
use super::*;
4
5
/// Take an input Executor (creates the input DataFrame)
6
/// and a multiple PhysicalExpressions (create the output Series)
7
pub struct ProjectionExec {
8
pub(crate) input: Box<dyn Executor>,
9
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
10
pub(crate) has_windows: bool,
11
pub(crate) input_schema: SchemaRef,
12
#[cfg(test)]
13
pub(crate) schema: SchemaRef,
14
pub(crate) options: ProjectionOptions,
15
// Can run all operations elementwise
16
pub(crate) allow_vertical_parallelism: bool,
17
}
18
19
impl ProjectionExec {
20
fn execute_impl(
21
&mut self,
22
state: &ExecutionState,
23
mut df: DataFrame,
24
) -> PolarsResult<DataFrame> {
25
// Vertical and horizontal parallelism.
26
let df = if self.allow_vertical_parallelism
27
&& df.first_col_n_chunks() > 1
28
&& df.height() > POOL.current_num_threads() * 2
29
&& self.options.run_parallel
30
{
31
let chunks = df.split_chunks().collect::<Vec<_>>();
32
let iter = chunks.into_par_iter().map(|mut df| {
33
let selected_cols = evaluate_physical_expressions(
34
&mut df,
35
&self.expr,
36
state,
37
self.has_windows,
38
self.options.run_parallel,
39
)?;
40
check_expand_literals(&df, &self.expr, selected_cols, df.is_empty(), self.options)
41
});
42
43
let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
44
accumulate_dataframes_vertical_unchecked(df)
45
}
46
// Only horizontal parallelism.
47
else {
48
#[allow(clippy::let_and_return)]
49
let selected_cols = evaluate_physical_expressions(
50
&mut df,
51
&self.expr,
52
state,
53
self.has_windows,
54
self.options.run_parallel,
55
)?;
56
check_expand_literals(&df, &self.expr, selected_cols, df.is_empty(), self.options)?
57
};
58
59
// this only runs during testing and check if the runtime type matches the predicted schema
60
#[cfg(test)]
61
#[allow(unused_must_use)]
62
{
63
// TODO: also check the types.
64
for (l, r) in df.iter().zip(self.schema.iter_names()) {
65
assert_eq!(l.name(), r);
66
}
67
}
68
69
Ok(df)
70
}
71
}
72
73
impl Executor for ProjectionExec {
74
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
75
state.should_stop()?;
76
#[cfg(debug_assertions)]
77
{
78
if state.verbose() {
79
eprintln!("run ProjectionExec");
80
}
81
}
82
let df = self.input.execute(state)?;
83
84
let profile_name = if state.has_node_timer() {
85
let by = self
86
.expr
87
.iter()
88
.map(|s| profile_name(s.as_ref(), self.input_schema.as_ref()))
89
.collect::<PolarsResult<Vec<_>>>()?;
90
let name = comma_delimited("select".to_string(), &by);
91
Cow::Owned(name)
92
} else {
93
Cow::Borrowed("")
94
};
95
96
if state.has_node_timer() {
97
let new_state = state.clone();
98
new_state.record(|| self.execute_impl(state, df), profile_name)
99
} else {
100
self.execute_impl(state, df)
101
}
102
}
103
}
104
105