Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/stack.rs
6940 views
1
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
2
use polars_plan::constants::CSE_REPLACED;
3
4
use super::*;
5
6
pub struct StackExec {
7
pub(crate) input: Box<dyn Executor>,
8
pub(crate) has_windows: bool,
9
pub(crate) exprs: Vec<Arc<dyn PhysicalExpr>>,
10
pub(crate) input_schema: SchemaRef,
11
pub(crate) output_schema: SchemaRef,
12
pub(crate) options: ProjectionOptions,
13
// Can run all operations elementwise
14
pub(crate) allow_vertical_parallelism: bool,
15
}
16
17
impl StackExec {
18
fn execute_impl(
19
&mut self,
20
state: &ExecutionState,
21
mut df: DataFrame,
22
) -> PolarsResult<DataFrame> {
23
let schema = &*self.output_schema;
24
25
// Vertical and horizontal parallelism.
26
let df = if self.allow_vertical_parallelism
27
&& df.first_col_n_chunks() > 1
28
&& df.height() > 0
29
&& self.options.run_parallel
30
{
31
let chunks = df.split_chunks().collect::<Vec<_>>();
32
let iter = chunks.into_par_iter().map(|mut df| {
33
let res = evaluate_physical_expressions(
34
&mut df,
35
&self.exprs,
36
state,
37
self.has_windows,
38
self.options.run_parallel,
39
)?;
40
// We don't have to do a broadcast check as cse is not allowed to hit this.
41
df._add_columns(res.into_iter().collect(), schema)?;
42
Ok(df)
43
});
44
45
let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
46
accumulate_dataframes_vertical_unchecked(df)
47
}
48
// Only horizontal parallelism
49
else {
50
let res = evaluate_physical_expressions(
51
&mut df,
52
&self.exprs,
53
state,
54
self.has_windows,
55
self.options.run_parallel,
56
)?;
57
if !self.options.should_broadcast {
58
debug_assert!(
59
res.iter()
60
.all(|column| column.name().starts_with("__POLARS_CSER_0x")),
61
"non-broadcasting hstack should only be used for CSE columns"
62
);
63
// Safety: this case only appears as a result of
64
// CSE optimization, and the usage there produces
65
// new, unique column names. It is immediately
66
// followed by a projection which pulls out the
67
// possibly mismatching column lengths.
68
unsafe { df.column_extend_unchecked(res) };
69
} else {
70
let (df_height, df_width) = df.shape();
71
72
// When we have CSE we cannot verify scalars yet.
73
let verify_scalar = if !df.get_columns().is_empty() {
74
!df.get_columns()[df.width() - 1]
75
.name()
76
.starts_with(CSE_REPLACED)
77
} else {
78
true
79
};
80
for (i, c) in res.iter().enumerate() {
81
let len = c.len();
82
if verify_scalar && len != df_height && len == 1 && df_width > 0 {
83
#[allow(clippy::collapsible_if)]
84
if !self.exprs[i].is_scalar()
85
&& std::env::var("POLARS_ALLOW_NON_SCALAR_EXP").as_deref() != Ok("1")
86
{
87
let identifier = match self.exprs[i].as_expression() {
88
Some(e) => format!("expression: {e}"),
89
None => "this Series".to_string(),
90
};
91
polars_bail!(InvalidOperation: "Series {}, length {} doesn't match the DataFrame height of {}\n\n\
92
If you want {} to be broadcasted, ensure it is a scalar (for instance by adding '.first()').",
93
c.name(), len, df_height, identifier
94
);
95
}
96
}
97
}
98
df._add_columns(res.into_iter().collect(), schema)?;
99
}
100
df
101
};
102
103
state.clear_window_expr_cache();
104
105
Ok(df)
106
}
107
}
108
109
impl Executor for StackExec {
110
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
111
state.should_stop()?;
112
#[cfg(debug_assertions)]
113
{
114
if state.verbose() {
115
eprintln!("run StackExec");
116
}
117
}
118
let df = self.input.execute(state)?;
119
120
let profile_name = if state.has_node_timer() {
121
let by = self
122
.exprs
123
.iter()
124
.map(|s| profile_name(s.as_ref(), self.input_schema.as_ref()))
125
.collect::<PolarsResult<Vec<_>>>()?;
126
let name = comma_delimited("with_column".to_string(), &by);
127
Cow::Owned(name)
128
} else {
129
Cow::Borrowed("")
130
};
131
132
if state.has_node_timer() {
133
let new_state = state.clone();
134
new_state.record(|| self.execute_impl(state, df), profile_name)
135
} else {
136
self.execute_impl(state, df)
137
}
138
}
139
}
140
141