GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/projection_utils.rs
use polars_plan::constants::CSE_REPLACED;
use polars_utils::itertools::Itertools;

use super::*;

pub(super) fn profile_name(
    s: &dyn PhysicalExpr,
    input_schema: &Schema,
) -> PolarsResult<PlSmallStr> {
    match s.to_field(input_schema) {
        Err(e) => Err(e),
        Ok(fld) => Ok(fld.name),
    }
}

type IdAndExpression = (u32, Arc<dyn PhysicalExpr>);

#[cfg(feature = "dynamic_group_by")]
fn rolling_evaluate(
    df: &DataFrame,
    state: &ExecutionState,
    rolling: PlHashMap<RollingGroupOptions, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
    POOL.install(|| {
        rolling
            .par_iter()
            .map(|(options, partition)| {
                // Clear the cache for every partitioned group.
                let state = state.split();

                let (_time_key, groups) = df.rolling(None, options)?;

                let groups_key = format!("{options:?}");
                // Set the groups so all expressions in the partition can use them.
                // Insert them before evaluating; otherwise we deadlock when the
                // rolling expressions try to get read access.
                state.window_cache.insert_groups(groups_key, groups);
                partition
                    .par_iter()
                    .map(|(idx, expr)| expr.evaluate(df, &state).map(|s| (*idx, s)))
                    .collect::<PolarsResult<Vec<_>>>()
            })
            .collect()
    })
}

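// An illustrative sketch (hypothetical, not from the polars codebase):
// `rolling_evaluate` computes the expensive rolling groups once per partition
// and shares them with every expression in that partition. A minimal
// stand-alone model of that pattern using only rayon and std; the names
// `Partition` and `eval_member` are invented stand-ins.
#[cfg(test)]
mod rolling_pattern_sketch {
    use rayon::prelude::*;

    /// (per-partition options, member expressions) — hypothetical stand-in.
    type Partition = (i64, Vec<u32>);

    fn eval_member(shared: i64, member: u32) -> Result<(u32, i64), String> {
        Ok((member, shared + member as i64))
    }

    #[test]
    fn shared_state_per_partition() {
        let partitions: Vec<Partition> = vec![(10, vec![1, 2]), (20, vec![3])];
        let results: Result<Vec<Vec<(u32, i64)>>, String> = partitions
            .par_iter()
            .map(|(options, members)| {
                // The expensive per-partition work happens once, like
                // `df.rolling(None, options)` above.
                let shared = options * 2;
                members
                    .par_iter()
                    .map(|m| eval_member(shared, *m))
                    .collect::<Result<Vec<_>, _>>()
            })
            .collect();
        assert_eq!(results.unwrap().len(), 2);
    }
}
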
fn window_evaluate(
    df: &DataFrame,
    state: &ExecutionState,
    window: PlHashMap<String, Vec<IdAndExpression>>,
) -> PolarsResult<Vec<Vec<(u32, Column)>>> {
    if window.is_empty() {
        return Ok(vec![]);
    }
    let n_threads = POOL.current_num_threads();

    let max_hor = window.values().map(|v| v.len()).max().unwrap_or(0);
    let vert = window.len();

    // We don't want to cache and parallelize both horizontally and vertically,
    // as that keeps many cache states alive.
    let (cache, par_vertical, par_horizontal) = if max_hor >= n_threads || max_hor >= vert {
        (true, false, true)
    } else {
        (false, true, true)
    };

    let apply = |partition: &[(u32, Arc<dyn PhysicalExpr>)]| {
        // Clear the cache for every partitioned group.
        let mut state = state.split();
        // Inform the expressions that they have window functions.
        state.insert_has_window_function_flag();

        // Caching more than one window expression is a complicated topic for
        // another day; see issue #2523.
        let cache = cache
            && partition.len() > 1
            && partition.iter().all(|(_, e)| {
                e.as_expression()
                    .unwrap()
                    .into_iter()
                    .filter(|e| {
                        #[cfg(feature = "dynamic_group_by")]
                        if matches!(e, Expr::Rolling { .. }) {
                            return true;
                        }
                        matches!(e, Expr::Over { .. })
                    })
                    .count()
                    == 1
            });
        let mut first_result = None;
        // First run one expression to fill the cache. Condvars and such don't
        // work, as rayon threads should not be blocked.
        if cache {
            let first = &partition[0];
            let c = first.1.evaluate(df, &state)?;
            first_result = Some((first.0, c));
            state.insert_cache_window_flag();
        } else {
            state.remove_cache_window_flag();
        }

        let apply =
            |index: &u32, e: &Arc<dyn PhysicalExpr>| e.evaluate(df, &state).map(|c| (*index, c));

        let slice = &partition[first_result.is_some() as usize..];
        let mut results = if par_horizontal {
            slice
                .par_iter()
                .map(|(index, e)| apply(index, e))
                .collect::<PolarsResult<Vec<_>>>()?
        } else {
            slice
                .iter()
                .map(|(index, e)| apply(index, e))
                .collect::<PolarsResult<Vec<_>>>()?
        };

        if let Some(item) = first_result {
            results.push(item)
        }

        Ok(results)
    };

    if par_vertical {
        POOL.install(|| window.par_iter().map(|t| apply(t.1)).collect())
    } else {
        window.iter().map(|t| apply(t.1)).collect()
    }
}

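// An illustrative sketch (hypothetical, for exposition): the heuristic above,
// isolated as a pure function so the three flags are easy to read. If the
// widest partition can saturate the thread pool, or is at least as wide as the
// number of partitions, parallelize horizontally inside partitions and enable
// the window cache; otherwise parallelize vertically across partitions
// without caching.
#[allow(dead_code)]
fn window_parallelism_flags(
    n_threads: usize,
    max_partition_width: usize,
    n_partitions: usize,
) -> (bool, bool, bool) {
    // Returns (cache, par_vertical, par_horizontal), mirroring the tuple in
    // `window_evaluate`.
    if max_partition_width >= n_threads || max_partition_width >= n_partitions {
        (true, false, true)
    } else {
        (false, true, true)
    }
}
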
fn execute_projection_cached_window_fns(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    // We partition into normal expressions and window expressions:
    // - the normal expressions can run in parallel
    // - the window expressions take more memory and often use the same group_by
    //   keys and join tuples, so they are cached and run sequentially
    //
    // The partitioning messes with column order, so we also store the index
    // and use it to restore the original projection order.
    #[allow(clippy::type_complexity)]
    // String: partition_name,
    // u32: index,
    let mut windows: PlHashMap<String, Vec<IdAndExpression>> = PlHashMap::default();
    #[cfg(feature = "dynamic_group_by")]
    let mut rolling: PlHashMap<RollingGroupOptions, Vec<IdAndExpression>> = PlHashMap::default();
    let mut other = Vec::with_capacity(exprs.len());

    // First we partition the window functions by the values they group over.
    // The group_by values should be cached.
    exprs.iter().enumerate_u32().for_each(|(index, phys)| {
        let mut is_window = false;
        if let Some(e) = phys.as_expression() {
            for e in e.into_iter() {
                match e {
                    #[cfg(feature = "dynamic_group_by")]
                    Expr::Rolling {
                        function: _,
                        index_column,
                        period,
                        offset,
                        closed_window,
                    } => {
                        if let Expr::Column(index_column) = index_column.as_ref() {
                            let options = RollingGroupOptions {
                                index_column: index_column.clone(),
                                period: *period,
                                offset: *offset,
                                closed_window: *closed_window,
                            };
                            let entry = rolling.entry(options).or_default();
                            entry.push((index, phys.clone()));
                            is_window = true;
                            break;
                        }
                    },
                    Expr::Over {
                        function: _,
                        partition_by,
                        order_by,
                        mapping,
                    } => {
                        let mapping: &str = mapping.into();
                        let mut key = format!("{:?}_{mapping}", partition_by.as_slice());
                        if let Some((e, k)) = order_by {
                            polars_expr::prelude::window_function_format_order_by(
                                &mut key,
                                e.as_ref(),
                                k,
                            )
                        }
                        let entry = windows.entry(key).or_insert_with(Vec::new);
                        entry.push((index, phys.clone()));
                        is_window = true;
                        break;
                    },
                    _ => {},
                }
            }
        } else {
            // Window physical expressions always have the `Expr`.
            is_window = false;
        }
        if !is_window {
            other.push((index, phys.as_ref()))
        }
    });

    let mut selected_columns = POOL.install(|| {
        other
            .par_iter()
            .map(|(idx, expr)| expr.evaluate(df, state).map(|s| (*idx, s)))
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    // Run the partitioned rolling expressions.
    // Within each partition we run in parallel. We compute the groups up front
    // and store them once per partition; the rolling expressions know how to
    // fetch the groups.
    #[cfg(feature = "dynamic_group_by")]
    {
        let (a, b) = POOL.join(
            || rolling_evaluate(df, state, rolling),
            || window_evaluate(df, state, windows),
        );

        let partitions = a?;
        for part in partitions {
            selected_columns.extend_from_slice(&part)
        }
        let partitions = b?;
        for part in partitions {
            selected_columns.extend_from_slice(&part)
        }
    }
    #[cfg(not(feature = "dynamic_group_by"))]
    {
        let partitions = window_evaluate(df, state, windows)?;
        for part in partitions {
            selected_columns.extend_from_slice(&part)
        }
    }

    selected_columns.sort_unstable_by_key(|tpl| tpl.0);
    let selected_columns = selected_columns.into_iter().map(|tpl| tpl.1).collect();
    Ok(selected_columns)
}

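// An illustrative sketch (hypothetical, std-only): the order-restoring trick
// used above. Each expression is tagged with its original index before being
// bucketed by partition key; after evaluation, the flattened results are
// sorted by that index, so the projection order is independent of how the
// buckets were processed.
#[cfg(test)]
mod projection_order_sketch {
    use std::collections::HashMap;

    #[test]
    fn restores_original_order() {
        let exprs = ["a", "b", "c", "d"];
        let mut buckets: HashMap<&str, Vec<(u32, &str)>> = HashMap::new();
        for (idx, e) in exprs.iter().copied().enumerate() {
            // The bucket key here is arbitrary; the real code keys windows by
            // their `partition_by` string and rolling expressions by their
            // `RollingGroupOptions`.
            let key = if idx % 2 == 0 { "even" } else { "odd" };
            buckets.entry(key).or_default().push((idx as u32, e));
        }
        // Buckets come back in arbitrary (hash) order...
        let mut out: Vec<(u32, &str)> = buckets.into_values().flatten().collect();
        // ...but sorting by the stored index restores the projection order.
        out.sort_unstable_by_key(|t| t.0);
        let restored: Vec<&str> = out.into_iter().map(|t| t.1).collect();
        assert_eq!(restored, exprs);
    }
}
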
fn run_exprs_par(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    POOL.install(|| {
        exprs
            .par_iter()
            .map(|expr| expr.evaluate(df, state))
            .collect()
    })
}

fn run_exprs_seq(
    df: &DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
) -> PolarsResult<Vec<Column>> {
    exprs.iter().map(|expr| expr.evaluate(df, state)).collect()
}

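// An illustrative sketch (hypothetical, rayon-only): both runners rely on
// collecting an iterator of `Result`s into `Result<Vec<_>, _>`, which stops
// producing work once an error appears, while `par_iter` still yields the
// successful results in input order.
#[allow(dead_code)]
fn run_all_par(inputs: &[i64]) -> Result<Vec<i64>, String> {
    use rayon::prelude::*;
    inputs
        .par_iter()
        .map(|x| {
            if *x < 0 {
                Err(format!("negative input: {x}"))
            } else {
                Ok(x * 2)
            }
        })
        .collect()
}
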
pub(super) fn evaluate_physical_expressions(
    df: &mut DataFrame,
    exprs: &[Arc<dyn PhysicalExpr>],
    state: &ExecutionState,
    has_windows: bool,
    run_parallel: bool,
) -> PolarsResult<Vec<Column>> {
    let expr_runner = if has_windows {
        execute_projection_cached_window_fns
    } else if run_parallel && exprs.len() > 1 {
        run_exprs_par
    } else {
        run_exprs_seq
    };

    let selected_columns = expr_runner(df, exprs, state)?;

    if has_windows {
        state.clear_window_expr_cache();
    }

    Ok(selected_columns)
}

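// An illustrative sketch (hypothetical): the runner selection above works
// because all three runners share one signature and therefore coerce to the
// same `fn` pointer type. A minimal model of that dispatch style:
#[allow(dead_code)]
fn dispatch_model(parallel: bool, data: &[i64]) -> i64 {
    fn sum_seq(xs: &[i64]) -> i64 {
        xs.iter().sum()
    }
    fn sum_chunked(xs: &[i64]) -> i64 {
        // Stand-in for a parallel strategy.
        xs.chunks(2).map(|c| c.iter().sum::<i64>()).sum()
    }
    let runner: fn(&[i64]) -> i64 = if parallel { sum_chunked } else { sum_seq };
    runner(data)
}
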
pub(super) fn check_expand_literals(
    df: &DataFrame,
    phys_expr: &[Arc<dyn PhysicalExpr>],
    mut selected_columns: Vec<Column>,
    is_empty: bool,
    options: ProjectionOptions,
) -> PolarsResult<DataFrame> {
    let Some(first_len) = selected_columns.first().map(|s| s.len()) else {
        return Ok(DataFrame::empty());
    };
    let duplicate_check = options.duplicate_check;
    let should_broadcast = options.should_broadcast;

    // When we have CSE we cannot verify scalars yet.
    let verify_scalar = if !df.columns().is_empty() {
        !df.columns()[df.width() - 1]
            .name()
            .starts_with(CSE_REPLACED)
    } else {
        true
    };

    let mut df_height = 0;
    let mut has_empty = false;
    let mut all_equal_len = true;
    {
        let mut names = PlHashSet::with_capacity(selected_columns.len());
        for s in &selected_columns {
            let len = s.len();
            has_empty |= len == 0;
            df_height = std::cmp::max(df_height, len);
            if len != first_len {
                all_equal_len = false;
            }
            let name = s.name();

            if duplicate_check && !names.insert(name) {
                let msg = format!(
                    "the name '{name}' is duplicate\n\n\
                    It's possible that multiple expressions are returning the same default column \
                    name. If this is the case, try renaming the columns with \
                    `.alias(\"new_name\")` to avoid duplicate column names."
                );
                return Err(PolarsError::Duplicate(msg.into()));
            }
        }
    }

    // If all series are the same length, we are done. If not, we can broadcast
    // Series of length one.
    if !all_equal_len && should_broadcast {
        selected_columns = selected_columns
            .into_iter()
            .zip(phys_expr)
            .map(|(series, phys)| {
                Ok(match series.len() {
                    0 if df_height == 1 => series,
                    1 => {
                        if !has_empty && df_height == 1 {
                            series
                        } else {
                            if has_empty {
                                polars_ensure!(df_height == 1,
                                    ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",
                                    series.len(), df_height
                                );
                            }

                            if verify_scalar && !phys.is_scalar() && std::env::var("POLARS_ALLOW_NON_SCALAR_EXP").as_deref() != Ok("1") {
                                let identifier = match phys.as_expression() {
                                    Some(e) => format!("expression: {e}"),
                                    None => "this Series".to_string(),
                                };
                                polars_bail!(ShapeMismatch: "Series {}, length {} doesn't match the DataFrame height of {}\n\n\
                                    If you want {} to be broadcasted, ensure it is a scalar (for instance by adding '.first()').",
                                    series.name(), series.len(), df_height * (!has_empty as usize), identifier
                                );
                            }
                            series.new_from_index(0, df_height * (!has_empty as usize))
                        }
                    },
                    len if len == df_height => {
                        series
                    },
                    _ => {
                        polars_bail!(
                            ShapeMismatch: "Series length {} doesn't match the DataFrame height of {}",
                            series.len(), df_height
                        )
                    }
                })
            })
            .collect::<PolarsResult<_>>()?
    }

    // @scalar-opt
    let selected_columns = selected_columns.into_iter().collect::<Vec<_>>();

    let df = unsafe { DataFrame::new_unchecked_infer_height(selected_columns) };

    // A literal could be projected to a zero-length DataFrame;
    // this prevents a panic.
    let df = if is_empty {
        let min = df.columns().iter().map(|s| s.len()).min();
        if min.is_some() { df.head(min) } else { df }
    } else {
        df
    };
    Ok(df)
}

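// An illustrative sketch (hypothetical, std-only) of the broadcasting rule
// above, with plain `Vec`s standing in for `Column`s: a length-1 value is
// repeated to the frame height, a matching length passes through, and any
// other length is a shape error. The scalar verification and empty-column
// cases handled above are omitted here.
#[allow(dead_code)]
fn broadcast_to_height(col: Vec<i64>, height: usize) -> Result<Vec<i64>, String> {
    match col.len() {
        1 => Ok(vec![col[0]; height]),
        n if n == height => Ok(col),
        n => Err(format!(
            "Series length {n} doesn't match the DataFrame height of {height}"
        )),
    }
}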