Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/projection_utils.rs
6940 views
1
use polars_plan::constants::CSE_REPLACED;
2
use polars_utils::itertools::Itertools;
3
4
use super::*;
5
6
pub(super) fn profile_name(
7
s: &dyn PhysicalExpr,
8
input_schema: &Schema,
9
) -> PolarsResult<PlSmallStr> {
10
match s.to_field(input_schema) {
11
Err(e) => Err(e),
12
Ok(fld) => Ok(fld.name),
13
}
14
}
15
16
type IdAndExpression = (u32, Arc<dyn PhysicalExpr>);
17
18
#[cfg(feature = "dynamic_group_by")]
/// Evaluate rolling-window expressions, partitioned by their
/// [`RollingGroupOptions`], on the rayon thread pool.
///
/// The rolling groups are computed once per partition and published in the
/// window cache under a key derived from the options, so every expression in
/// the partition can fetch them instead of recomputing.
///
/// Returns, per partition, the `(original index, result column)` pairs; the
/// caller restores projection order from the indices.
fn rolling_evaluate(
    df: &DataFrame,
    state: &ExecutionState,
    rolling: PlHashMap<&RollingGroupOptions, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
    POOL.install(|| {
        rolling
            .par_iter()
            .map(|(options, part_exprs)| {
                // A fresh split state per partition so cached window data is
                // not shared across partitions.
                let local_state = state.split();

                let (_time_key, groups) = df.rolling(None, options)?;

                // Store the groups so all expressions in this partition can
                // look them up by this options-derived key.
                let cache_key = format!("{options:?}");
                local_state.window_cache.insert_groups(cache_key, groups);

                part_exprs
                    .par_iter()
                    .map(|(idx, e)| e.evaluate(df, &local_state).map(|c| (*idx, c)))
                    .collect::<PolarsResult<Vec<_>>>()
            })
            .collect()
    })
}
46
47
/// Evaluate window (`over`) expressions, partitioned by their partition key.
///
/// `window` maps a partition key (built from the `partition_by`/`order_by`
/// exprs) to the `(original index, expression)` pairs that share it.
/// Returns one `Vec<(u32, Column)>` per partition; the caller restores the
/// original projection order from the indices.
fn window_evaluate(
    df: &DataFrame,
    state: &ExecutionState,
    window: PlHashMap<String, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
    if window.is_empty() {
        return Ok(vec![]);
    }
    let n_threads = POOL.current_num_threads();

    // max_hor: widest partition (horizontal work); vert: number of partitions.
    let max_hor = window.values().map(|v| v.len()).max().unwrap_or(0);
    let vert = window.len();

    // We don't want to cache and parallel horizontally and vertically as that keeps many cache
    // states alive.
    // Wide partitions -> cache + parallelize within a partition only;
    // otherwise parallelize across partitions without caching.
    let (cache, par_vertical, par_horizontal) = if max_hor >= n_threads || max_hor >= vert {
        (true, false, true)
    } else {
        (false, true, true)
    };

    // Evaluates one partition's expressions; shared by both the parallel and
    // the sequential vertical paths below.
    let apply = |partition: &[(u32, Arc<dyn PhysicalExpr>)]| {
        // clear the cache for every partitioned group
        let mut state = state.split();
        // inform the expression it has window functions.
        state.insert_has_window_function_flag();

        // caching more than one window expression is a complicated topic for another day
        // see issue #2523
        // Only cache when every expression in the partition contains exactly
        // one window expression (`as_expression` is Some for window physical
        // expressions, hence the unwrap).
        let cache = cache
            && partition.len() > 1
            && partition.iter().all(|(_, e)| {
                e.as_expression()
                    .unwrap()
                    .into_iter()
                    .filter(|e| matches!(e, Expr::Window { .. }))
                    .count()
                    == 1
            });
        let mut first_result = None;
        // First run 1 to fill the cache. Condvars and such don't work as
        // rayon threads should not be blocked.
        if cache {
            let first = &partition[0];
            let c = first.1.evaluate(df, &state)?;
            first_result = Some((first.0, c));
            // Subsequent evaluations in this partition may reuse the cache.
            state.insert_cache_window_flag();
        } else {
            state.remove_cache_window_flag();
        }

        let apply =
            |index: &u32, e: &Arc<dyn PhysicalExpr>| e.evaluate(df, &state).map(|c| (*index, c));

        // Skip element 0 if it was already evaluated to warm the cache.
        let slice = &partition[first_result.is_some() as usize..];
        let mut results = if par_horizontal {
            slice
                .par_iter()
                .map(|(index, e)| apply(index, e))
                .collect::<PolarsResult<Vec<_>>>()?
        } else {
            slice
                .iter()
                .map(|(index, e)| apply(index, e))
                .collect::<PolarsResult<Vec<_>>>()?
        };

        // Re-append the cache-warming result; order within a partition does
        // not matter because the caller sorts by index.
        if let Some(item) = first_result {
            results.push(item)
        }

        Ok(results)
    };

    if par_vertical {
        POOL.install(|| window.par_iter().map(|t| apply(t.1)).collect())
    } else {
        window.iter().map(|t| apply(t.1)).collect()
    }
}
127
128
fn execute_projection_cached_window_fns(
129
df: &DataFrame,
130
exprs: &[Arc<dyn PhysicalExpr>],
131
state: &ExecutionState,
132
) -> PolarsResult<Vec<Column>> {
133
// We partition by normal expression and window expression
134
// - the normal expressions can run in parallel
135
// - the window expression take more memory and often use the same group_by keys and join tuples
136
// so they are cached and run sequential
137
138
// the partitioning messes with column order, so we also store the idx
139
// and use those to restore the original projection order
140
#[allow(clippy::type_complexity)]
141
// String: partition_name,
142
// u32: index,
143
let mut windows: PlHashMap<String, Vec<IdAndExpression>> = PlHashMap::default();
144
#[cfg(feature = "dynamic_group_by")]
145
let mut rolling: PlHashMap<&RollingGroupOptions, Vec<IdAndExpression>> = PlHashMap::default();
146
let mut other = Vec::with_capacity(exprs.len());
147
148
// first we partition the window function by the values they group over.
149
// the group_by values should be cached
150
exprs.iter().enumerate_u32().for_each(|(index, phys)| {
151
let mut is_window = false;
152
if let Some(e) = phys.as_expression() {
153
for e in e.into_iter() {
154
if let Expr::Window {
155
partition_by,
156
options,
157
order_by,
158
..
159
} = e
160
{
161
let entry = match options {
162
WindowType::Over(g) => {
163
let g: &str = g.into();
164
let mut key = format!("{:?}_{}", partition_by.as_slice(), g);
165
if let Some((e, k)) = order_by {
166
polars_expr::prelude::window_function_format_order_by(
167
&mut key,
168
e.as_ref(),
169
k,
170
)
171
}
172
windows.entry(key).or_insert_with(Vec::new)
173
},
174
#[cfg(feature = "dynamic_group_by")]
175
WindowType::Rolling(options) => {
176
rolling.entry(options).or_insert_with(Vec::new)
177
},
178
};
179
entry.push((index, phys.clone()));
180
is_window = true;
181
break;
182
}
183
}
184
} else {
185
// Window physical expressions always have the `Expr`.
186
is_window = false;
187
}
188
if !is_window {
189
other.push((index, phys.as_ref()))
190
}
191
});
192
193
let mut selected_columns = POOL.install(|| {
194
other
195
.par_iter()
196
.map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s)))
197
.collect::<PolarsResult<Vec<_>>>()
198
})?;
199
200
// Run partitioned rolling expressions.
201
// Per partition we run in parallel. We compute the groups before and store them once per partition.
202
// The rolling expression knows how to fetch the groups.
203
#[cfg(feature = "dynamic_group_by")]
204
{
205
let (a, b) = POOL.join(
206
|| rolling_evaluate(df, state, rolling),
207
|| window_evaluate(df, state, windows),
208
);
209
210
let partitions = a?;
211
for part in partitions {
212
selected_columns.extend_from_slice(&part)
213
}
214
let partitions = b?;
215
for part in partitions {
216
selected_columns.extend_from_slice(&part)
217
}
218
}
219
#[cfg(not(feature = "dynamic_group_by"))]
220
{
221
let partitions = window_evaluate(df, state, windows)?;
222
for part in partitions {
223
selected_columns.extend_from_slice(&part)
224
}
225
}
226
227
selected_columns.sort_unstable_by_key(|tpl| tpl.0);
228
let selected_columns = selected_columns.into_iter().map(|tpl| tpl.1).collect();
229
Ok(selected_columns)
230
}
231
232
fn run_exprs_par(
233
df: &DataFrame,
234
exprs: &[Arc<dyn PhysicalExpr>],
235
state: &ExecutionState,
236
) -> PolarsResult<Vec<Column>> {
237
POOL.install(|| {
238
exprs
239
.par_iter()
240
.map(|expr| expr.evaluate(df, state))
241
.collect()
242
})
243
}
244
245
fn run_exprs_seq(
246
df: &DataFrame,
247
exprs: &[Arc<dyn PhysicalExpr>],
248
state: &ExecutionState,
249
) -> PolarsResult<Vec<Column>> {
250
exprs.iter().map(|expr| expr.evaluate(df, state)).collect()
251
}
252
253
pub(super) fn evaluate_physical_expressions(
254
df: &mut DataFrame,
255
exprs: &[Arc<dyn PhysicalExpr>],
256
state: &ExecutionState,
257
has_windows: bool,
258
run_parallel: bool,
259
) -> PolarsResult<Vec<Column>> {
260
let expr_runner = if has_windows {
261
execute_projection_cached_window_fns
262
} else if run_parallel && exprs.len() > 1 {
263
run_exprs_par
264
} else {
265
run_exprs_seq
266
};
267
268
let selected_columns = expr_runner(df, exprs, state)?;
269
270
if has_windows {
271
state.clear_window_expr_cache();
272
}
273
274
Ok(selected_columns)
275
}
276
277
/// Validate the projected columns and broadcast length-1 columns to the
/// frame height, producing the final `DataFrame`.
///
/// Checks duplicate names (when `options.duplicate_check`), broadcasts
/// scalars (when `options.should_broadcast`), and truncates to the shortest
/// column when `zero_length` so a literal projected onto an empty frame does
/// not panic.
pub(super) fn check_expand_literals(
    df: &DataFrame,
    phys_expr: &[Arc<dyn PhysicalExpr>],
    mut selected_columns: Vec<Column>,
    zero_length: bool,
    options: ProjectionOptions,
) -> PolarsResult<DataFrame> {
    // No columns selected -> empty frame.
    let Some(first_len) = selected_columns.first().map(|s| s.len()) else {
        return Ok(DataFrame::empty());
    };
    let duplicate_check = options.duplicate_check;
    let should_broadcast = options.should_broadcast;

    // When we have CSE we cannot verify scalars yet.
    // CSE temporaries are appended last, so the last column's name marks it.
    let verify_scalar = if !df.get_columns().is_empty() {
        !df.get_columns()[df.width() - 1]
            .name()
            .starts_with(CSE_REPLACED)
    } else {
        true
    };

    // Single pass: compute the target height (max column length), detect
    // empty columns and unequal lengths, and check for duplicate names.
    let mut df_height = 0;
    let mut has_empty = false;
    let mut all_equal_len = true;
    {
        let mut names = PlHashSet::with_capacity(selected_columns.len());
        for s in &selected_columns {
            let len = s.len();
            has_empty |= len == 0;
            df_height = std::cmp::max(df_height, len);
            if len != first_len {
                all_equal_len = false;
            }
            let name = s.name();

            if duplicate_check && !names.insert(name) {
                let msg = format!(
                    "the name '{name}' is duplicate\n\n\
                    It's possible that multiple expressions are returning the same default column \
                    name. If this is the case, try renaming the columns with \
                    `.alias(\"new_name\")` to avoid duplicate column names."
                );
                return Err(PolarsError::Duplicate(msg.into()));
            }
        }
    }

    // If all series are the same length it is ok. If not we can broadcast Series of length one.
    if !all_equal_len && should_broadcast {
        selected_columns = selected_columns
            .into_iter()
            .zip(phys_expr)
            .map(|(series, phys)| {
                Ok(match series.len() {
                    // Empty column on a height-1 frame passes through as-is.
                    0 if df_height == 1 => series,
                    1 => {
                        if !has_empty && df_height == 1 {
                            // Already the right height; nothing to do.
                            series
                        } else {
                            if has_empty {
                                // Mixing a length-1 column with empty columns
                                // is only valid when the frame height is 1.
                                polars_ensure!(df_height == 1,
                                ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",
                                series.len(), df_height
                                );

                            }

                            // Broadcasting a non-scalar expression is refused
                            // unless the env-var escape hatch is set.
                            if verify_scalar && !phys.is_scalar() && std::env::var("POLARS_ALLOW_NON_SCALAR_EXP").as_deref() != Ok("1") {
                                let identifier = match phys.as_expression() {
                                    Some(e) => format!("expression: {e}"),
                                    None => "this Series".to_string(),
                                };
                                polars_bail!(ShapeMismatch: "Series {}, length {} doesn't match the DataFrame height of {}\n\n\
                                If you want {} to be broadcasted, ensure it is a scalar (for instance by adding '.first()').",
                                series.name(), series.len(), df_height *(!has_empty as usize), identifier
                                );
                            }
                            // Repeat the single value; target height collapses
                            // to 0 when any column was empty.
                            series.new_from_index(0, df_height * (!has_empty as usize) )
                        }
                    },
                    // Correct length already.
                    len if len == df_height => {
                        series
                    },
                    // Neither 1 nor the frame height: a genuine mismatch.
                    _ => {
                        polars_bail!(
                            ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",
                            series.len(), df_height
                        )
                    }
                })
            })
            .collect::<PolarsResult<_>>()?
    }

    // @scalar-opt
    let selected_columns = selected_columns.into_iter().collect::<Vec<_>>();

    // SAFETY (upheld above): names were checked for duplicates (when enabled)
    // and lengths validated/broadcast, matching `new_no_checks*` requirements.
    let df = unsafe { DataFrame::new_no_checks_height_from_first(selected_columns) };

    // a literal could be projected to a zero length dataframe.
    // This prevents a panic.
    let df = if zero_length {
        let min = df.get_columns().iter().map(|s| s.len()).min();
        if min.is_some() { df.head(min) } else { df }
    } else {
        df
    };
    Ok(df)
}
387
388