Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/projection.rs
8420 views
1
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
2
3
use super::*;
4
5
/// Take an input Executor (creates the input DataFrame)
6
/// and a multiple PhysicalExpressions (create the output Series)
7
pub struct ProjectionExec {
8
pub(crate) input: Box<dyn Executor>,
9
pub(crate) expr: Vec<Arc<dyn PhysicalExpr>>,
10
pub(crate) has_windows: bool,
11
pub(crate) input_schema: SchemaRef,
12
#[cfg(test)]
13
pub(crate) schema: SchemaRef,
14
pub(crate) options: ProjectionOptions,
15
// Can run all operations elementwise
16
pub(crate) allow_vertical_parallelism: bool,
17
}
18
19
impl ProjectionExec {
20
fn execute_impl(
21
&mut self,
22
state: &ExecutionState,
23
mut df: DataFrame,
24
) -> PolarsResult<DataFrame> {
25
// Vertical and horizontal parallelism.
26
let df = if self.allow_vertical_parallelism
27
&& df.first_col_n_chunks() > 1
28
&& df.height() > POOL.current_num_threads() * 2
29
&& self.options.run_parallel
30
{
31
let chunks = df.split_chunks().collect::<Vec<_>>();
32
let iter = chunks.into_par_iter().map(|mut df| {
33
let selected_cols = evaluate_physical_expressions(
34
&mut df,
35
&self.expr,
36
state,
37
self.has_windows,
38
self.options.run_parallel,
39
)?;
40
check_expand_literals(
41
&df,
42
&self.expr,
43
selected_cols,
44
df.shape_has_zero(),
45
self.options,
46
)
47
});
48
49
let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
50
accumulate_dataframes_vertical_unchecked(df)
51
}
52
// Only horizontal parallelism.
53
else {
54
#[allow(clippy::let_and_return)]
55
let selected_cols = evaluate_physical_expressions(
56
&mut df,
57
&self.expr,
58
state,
59
self.has_windows,
60
self.options.run_parallel,
61
)?;
62
check_expand_literals(
63
&df,
64
&self.expr,
65
selected_cols,
66
df.shape_has_zero(),
67
self.options,
68
)?
69
};
70
71
// this only runs during testing and check if the runtime type matches the predicted schema
72
#[cfg(test)]
73
#[allow(unused_must_use)]
74
{
75
// TODO: also check the types.
76
for (l, r) in df.columns().iter().zip(self.schema.iter_names()) {
77
assert_eq!(l.name(), r);
78
}
79
}
80
81
Ok(df)
82
}
83
}
84
85
impl Executor for ProjectionExec {
86
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
87
state.should_stop()?;
88
#[cfg(debug_assertions)]
89
{
90
if state.verbose() {
91
eprintln!("run ProjectionExec");
92
}
93
}
94
let df = self.input.execute(state)?;
95
96
let profile_name = if state.has_node_timer() {
97
let by = self
98
.expr
99
.iter()
100
.map(|s| profile_name(s.as_ref(), self.input_schema.as_ref()))
101
.collect::<PolarsResult<Vec<_>>>()?;
102
let name = comma_delimited("select".to_string(), &by);
103
Cow::Owned(name)
104
} else {
105
Cow::Borrowed("")
106
};
107
108
if state.has_node_timer() {
109
let new_state = state.clone();
110
new_state.record(|| self.execute_impl(state, df), profile_name)
111
} else {
112
self.execute_impl(state, df)
113
}
114
}
115
}
116
117