GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/eval.rs
use std::borrow::Cow;
use std::cell::LazyCell;
use std::sync::Arc;

use arrow::bitmap::{Bitmap, BitmapBuilder};
use polars_core::chunked_array::builder::AnonymousOwnedListBuilder;
use polars_core::error::{PolarsResult, feature_gated};
use polars_core::frame::DataFrame;
#[cfg(feature = "dtype-array")]
use polars_core::prelude::ArrayChunked;
use polars_core::prelude::{
    ChunkCast, ChunkExplode, ChunkNestingUtils, Column, Field, GroupPositions, GroupsType, IdxCa,
    IntoColumn, ListBuilderTrait, ListChunked,
};
use polars_core::schema::Schema;
use polars_core::series::Series;
use polars_plan::dsl::{EvalVariant, Expr};
use polars_utils::IdxSize;
use polars_utils::pl_str::PlSmallStr;

use super::{AggregationContext, PhysicalExpr};
use crate::state::ExecutionState;

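/// Physical implementation of the `eval` family of expressions: it evaluates the inner
/// `evaluation` expression once per list/array row (`EvalVariant::List`, `ListAgg`, `Array`,
/// `ArrayAgg`) or over growing prefixes of a series (`EvalVariant::Cumulative`).
///
/// A rough DSL-level sketch of what lowers to this node (illustrative only; the exact entry
/// points live in `polars-plan` and have changed across versions):
///
/// ```ignore
/// // `element()` stands for each list element; `EvalExpr` evaluates it per row.
/// df.lazy().select([col("xs").list().eval(element() * lit(2))]).collect()?;
/// ```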
#[derive(Clone)]
pub struct EvalExpr {
    input: Arc<dyn PhysicalExpr>,
    evaluation: Arc<dyn PhysicalExpr>,
    variant: EvalVariant,
    expr: Expr,
    output_field: Field,
    is_scalar: bool,
    evaluation_is_scalar: bool,
    evaluation_is_elementwise: bool,
    evaluation_is_fallible: bool,
}

impl EvalExpr {
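    /// Construct an `EvalExpr`. The boolean flags describe properties of the `evaluation`
    /// expression (scalar, elementwise, fallible) that the caller has already determined.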
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        input: Arc<dyn PhysicalExpr>,
        evaluation: Arc<dyn PhysicalExpr>,
        variant: EvalVariant,
        expr: Expr,
        output_field: Field,
        is_scalar: bool,
        evaluation_is_scalar: bool,
        evaluation_is_elementwise: bool,
        evaluation_is_fallible: bool,
    ) -> Self {
        Self {
            input,
            evaluation,
            variant,
            expr,
            output_field,
            is_scalar,
            evaluation_is_scalar,
            evaluation_is_elementwise,
            evaluation_is_fallible,
        }
    }

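    /// Evaluate `self.evaluation` once per list row of `ca`.
    ///
    /// The list's flattened inner values are exposed as the "element" column and every valid
    /// row becomes one slice group. Fast paths cover all-null inputs, purely elementwise
    /// evaluations, and evaluations that leave the group offsets untouched; otherwise the
    /// results are re-gathered per group. With `is_agg` (the `ListAgg` variant) a scalar
    /// evaluation produces one value per row instead of a new list.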
    fn evaluate_on_list_chunked(
        &self,
        ca: &ListChunked,
        state: &ExecutionState,
        is_agg: bool,
    ) -> PolarsResult<Column> {
        let df = DataFrame::empty_with_height(ca.len());
        let ca = ca
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(ca), Cow::Owned);

        // Fast path: Empty or only nulls.
        if ca.null_count() == ca.len() {
            let name = self.output_field.name.clone();
            return Ok(Column::full_null(name, ca.len(), self.output_field.dtype()));
        }

        let has_masked_out_values = LazyCell::new(|| ca.has_masked_out_values());
        let may_fail_on_masked_out_elements = self.evaluation_is_fallible && *has_masked_out_values;

        let flattened = ca.get_inner().into_column();
        let flattened_len = flattened.len();
        let validity = ca.rechunk_validity();

        // Fast path: fully elementwise expression without masked out values.
        if self.evaluation_is_elementwise && !may_fail_on_masked_out_elements {
            let mut state = state.clone();
            state.element = Arc::new(Some((flattened, validity.clone())));
            let mut column = self.evaluation.evaluate(&df, &state)?;

            // Since `lit` is marked as elementwise, this may lead to problems.
            if column.len() == 1 && flattened_len != 1 {
                column = column.new_from_index(0, flattened_len);
            }

            if !is_agg || !self.evaluation_is_scalar {
                column = ca
                    .with_inner_values(column.as_materialized_series())
                    .into_column();
            }

            return Ok(column);
        }

        let offsets = ca.offsets()?;
        // Detect accidental inclusion of sliced-out elements from chunks after the 1st (if present).
        assert_eq!(i64::try_from(flattened_len).unwrap(), *offsets.last());

        // Create groups for all valid array elements.
        let groups = if ca.has_nulls() {
            let validity = validity.as_ref().unwrap();
            offsets
                .offset_and_length_iter()
                .zip(validity.iter())
                .filter_map(|((offset, length), validity)| {
                    validity.then_some([offset as IdxSize, length as IdxSize])
                })
                .collect()
        } else {
            offsets
                .offset_and_length_iter()
                .map(|(offset, length)| [offset as IdxSize, length as IdxSize])
                .collect()
        };
        let groups = GroupsType::new_slice(groups, false, true);
        let groups = Cow::Owned(groups.into_sliceable());

        let mut state = state.clone();
        state.element = Arc::new(Some((flattened, validity.clone())));

        let mut ac = self.evaluation.evaluate_on_groups(&df, &groups, &state)?;

        ac.groups(); // Update the groups.

        let flat_naive = ac.flat_naive();

        // Fast path. Groups are pointing to the same offsets in the data buffer.
        if flat_naive.len() == flattened_len
            && let Some(output_groups) = ac.groups.as_ref().as_unrolled_slice()
            && !(is_agg && self.evaluation_is_scalar)
        {
            let groups_are_unchanged = if let Some(validity) = &validity {
                assert_eq!(validity.set_bits(), output_groups.len());
                validity
                    .true_idx_iter()
                    .zip(output_groups)
                    .all(|(j, [start, len])| {
                        let (original_start, original_end) =
                            unsafe { offsets.start_end_unchecked(j) };
                        (*start == original_start as IdxSize)
                            & (*len == (original_end - original_start) as IdxSize)
                    })
            } else {
                output_groups
                    .iter()
                    .zip(offsets.offset_and_length_iter())
                    .all(|([start, len], (original_start, original_len))| {
                        (*start == original_start as IdxSize) & (*len == original_len as IdxSize)
                    })
            };

            if groups_are_unchanged {
                let values = flat_naive.as_materialized_series();
                return Ok(ca.with_inner_values(values).into_column());
            }
        }

        // Slow path. Groups have changed, so we need to gather data again.
        if is_agg && self.evaluation_is_scalar {
            let mut values = ac.finalize();

            // We didn't have any groups for the `null` values so we have to reinsert them.
            if let Some(validity) = validity {
                values = values.deposit(&validity);
            }

            Ok(values)
        } else {
            let mut ca = ac.aggregated_as_list();

            // We didn't have any groups for the `null` values so we have to reinsert them.
            if let Some(validity) = validity {
                ca = Cow::Owned(ca.deposit(&validity));
            }

            Ok(ca.into_owned().into_column())
        }
    }

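    /// Fixed-width (`Array`) counterpart of [`Self::evaluate_on_list_chunked`]: every valid row
    /// becomes a slice group of `ca.width()` elements. `as_list` returns the result as a `List`
    /// column instead of rebuilding an `Array`; `is_agg` again maps a scalar evaluation to one
    /// value per row.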
    #[cfg(feature = "dtype-array")]
    fn evaluate_on_array_chunked(
        &self,
        ca: &ArrayChunked,
        state: &ExecutionState,
        as_list: bool,
        is_agg: bool,
    ) -> PolarsResult<Column> {
        let df = DataFrame::empty_with_height(ca.len());
        let ca = ca
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(ca), Cow::Owned);

        // Fast path: Empty or only nulls.
        if ca.null_count() == ca.len() {
            let name = self.output_field.name.clone();
            return Ok(Column::full_null(name, ca.len(), self.output_field.dtype()));
        }

        let flattened = ca.get_inner().into_column();
        let flattened_len = flattened.len();
        let validity = ca.rechunk_validity();

        let may_fail_on_masked_out_elements = self.evaluation_is_fallible && ca.has_nulls();

        // Fast path: fully elementwise expression without masked out values.
        if self.evaluation_is_elementwise && !may_fail_on_masked_out_elements {
            assert!(!self.evaluation_is_scalar);

            let mut state = state.clone();
            state.element = Arc::new(Some((flattened, None)));

            let mut column = self.evaluation.evaluate(&df, &state)?;
            if column.len() == 1 && flattened_len != 1 {
                column = column.new_from_index(0, flattened_len);
            }
            assert_eq!(column.len(), ca.len() * ca.width());

            let dtype = column.dtype().clone();
            let mut out = ArrayChunked::from_aligned_values(
                self.output_field.name.clone(),
                &dtype,
                ca.width(),
                column.take_materialized_series().into_chunks(),
                ca.len(),
            );

            if let Some(validity) = validity {
                out.set_validity(&validity);
            }

            return Ok(if as_list {
                out.to_list().into_column()
            } else {
                out.clone().into_column()
            });
        }

        assert_eq!(flattened_len, ca.width() * ca.len());

        // Create groups for all valid array elements.
        let groups = if ca.has_nulls() {
            let validity = validity.as_ref().unwrap();
            (0..ca.len())
                .filter(|i| unsafe { validity.get_bit_unchecked(*i) })
                .map(|i| [(i * ca.width()) as IdxSize, ca.width() as IdxSize])
                .collect()
        } else {
            (0..ca.len())
                .map(|i| [(i * ca.width()) as IdxSize, ca.width() as IdxSize])
                .collect()
        };
        let groups = GroupsType::new_slice(groups, false, true);
        let groups = Cow::Owned(groups.into_sliceable());

        let mut state = state.clone();
        state.element = Arc::new(Some((flattened, validity.clone())));

        let mut ac = self.evaluation.evaluate_on_groups(&df, &groups, &state)?;

        ac.groups(); // Update the groups.

        let flat_naive = ac.flat_naive();

        // Fast path. Groups are pointing to the same offsets in the data buffer.
        if flat_naive.len() == ca.len() * ca.width()
            && let Some(output_groups) = ac.groups.as_ref().as_unrolled_slice()
            && !(is_agg && self.evaluation_is_scalar)
        {
            let ca_width = ca.width() as IdxSize;
            let groups_are_unchanged = if let Some(validity) = &validity {
                assert_eq!(validity.set_bits(), output_groups.len());
                validity
                    .true_idx_iter()
                    .zip(output_groups)
                    .all(|(j, [start, len])| {
                        (*start == j as IdxSize * ca_width) & (*len == ca_width)
                    })
            } else {
                use polars_utils::itertools::Itertools;

                output_groups
                    .iter()
                    .enumerate_idx()
                    .all(|(i, [start, len])| (*start == i * ca_width) & (*len == ca_width))
            };

            if groups_are_unchanged {
                let values = flat_naive;
                let dtype = values.dtype().clone();
                let mut out = ArrayChunked::from_aligned_values(
                    self.output_field.name.clone(),
                    &dtype,
                    ca.width(),
                    values.as_materialized_series().chunks().clone(),
                    ca.len(),
                );

                if let Some(validity) = validity {
                    out.set_validity(&validity);
                }

                return Ok(if as_list {
                    out.to_list().into_column()
                } else {
                    out.into_column()
                });
            }
        }

        // Slow path. Groups have changed, so we need to gather data again.
        if is_agg && self.evaluation_is_scalar {
            let mut values = ac.finalize();

            // We didn't have any groups for the `null` values so we have to reinsert them.
            if let Some(validity) = validity {
                values = values.deposit(&validity);
            }

            Ok(values)
        } else {
            let mut ca = ac.aggregated_as_list();

            // We didn't have any groups for the `null` values so we have to reinsert them.
            if let Some(validity) = validity {
                ca = Cow::Owned(ca.deposit(&validity));
            }

            Ok(if as_list {
                ca.into_owned().into_column()
            } else {
                ca.cast(self.output_field.dtype()).unwrap().into_column()
            })
        }
    }

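    /// Evaluate `self.evaluation` over the growing prefixes `input[..=i]` of the series.
    ///
    /// With `min_samples > 0`, a prefix only gets a group once at least `min_samples` non-null
    /// values have been seen; the rows that were skipped are re-inserted as nulls afterwards
    /// via a validity-masked gather.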
    fn evaluate_cumulative_eval(
        &self,
        input: &Series,
        min_samples: usize,
        state: &ExecutionState,
    ) -> PolarsResult<Column> {
        if input.is_empty() {
            return Ok(Column::new_empty(
                self.output_field.name().clone(),
                self.output_field.dtype(),
            ));
        }

        let flattened = input.clone().into_column();
        let validity = input.rechunk_validity();

        let mut deposit: Option<Bitmap> = None;

        let groups = if min_samples == 0 {
            // One prefix group per row; the inclusive range covers the full-length prefix.
            (1..=input.len() as IdxSize).map(|i| [0, i]).collect()
        } else {
            let validity = validity
                .clone()
                .unwrap_or_else(|| Bitmap::new_with_value(true, input.len()));
            let mut count = 0;
            let mut deposit_builder = BitmapBuilder::with_capacity(input.len());
            let out = (0..input.len() as IdxSize)
                .filter(|i| {
                    count += usize::from(unsafe { validity.get_bit_unchecked(*i as usize) });
                    let is_selected = count >= min_samples;
                    unsafe { deposit_builder.push_unchecked(is_selected) };
                    is_selected
                })
                .map(|i| [0, i + 1])
                .collect();
            deposit = Some(deposit_builder.freeze());
            out
        };

        let groups = GroupsType::new_slice(groups, true, true);

        let groups = groups.into_sliceable();

        let df = DataFrame::empty_with_height(input.len());

        let mut state = state.clone();
        state.element = Arc::new(Some((flattened, validity)));

        let agg = self.evaluation.evaluate_on_groups(&df, &groups, &state)?;
        let (mut out, _) = agg.get_final_aggregation();

        // Since we only evaluated the expression on the items that satisfied the minimum
        // sample count, we need to fix up the result here.
        if let Some(deposit) = deposit {
            let mut i = 0;
            let gather_idxs = deposit
                .iter()
                .map(|v| {
                    let out = i;
                    i += IdxSize::from(v);
                    out
                })
                .collect::<Vec<IdxSize>>();
            let gather_idxs =
                IdxCa::from_vec_validity(PlSmallStr::EMPTY, gather_idxs, Some(deposit));
            out = unsafe { out.take_unchecked(&gather_idxs) };
        }

        Ok(out)
    }
}

impl PhysicalExpr for EvalExpr {
    fn as_expression(&self) -> Option<&Expr> {
        Some(&self.expr)
    }

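    /// Flat evaluation: materialize the input column once, then dispatch on the eval variant.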
    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        let input = self.input.evaluate(df, state)?;
        match self.variant {
            EvalVariant::List => {
                let lst = input.list()?;
                self.evaluate_on_list_chunked(lst, state, false)
            },
            EvalVariant::ListAgg => {
                let lst = input.list()?;
                self.evaluate_on_list_chunked(lst, state, true)
            },
            EvalVariant::Array { as_list } => feature_gated!("dtype-array", {
                let arr = input.array()?;
                self.evaluate_on_array_chunked(arr, state, as_list, false)
            }),
            EvalVariant::ArrayAgg => feature_gated!("dtype-array", {
                let arr = input.array()?;
                self.evaluate_on_array_chunked(arr, state, true, true)
            }),
            EvalVariant::Cumulative { min_samples } => {
                self.evaluate_cumulative_eval(input.as_materialized_series(), min_samples, state)
            },
        }
    }

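    /// Grouped evaluation. The list/array variants can run on the flattened values directly;
    /// the cumulative variant must be re-evaluated per group, collecting one list per group.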
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        let mut input = self.input.evaluate_on_groups(df, groups, state)?;
        input.groups();

        match self.variant {
            EvalVariant::List => {
                let input_col = input.flat_naive();
                let out = self.evaluate_on_list_chunked(input_col.list()?, state, false)?;
                input.with_values(out, false, Some(&self.expr))?;
            },
            EvalVariant::ListAgg => {
                let input_col = input.flat_naive();
                let out = self.evaluate_on_list_chunked(input_col.list()?, state, true)?;
                input.with_values(out, false, Some(&self.expr))?;
            },
            EvalVariant::Array { as_list } => feature_gated!("dtype-array", {
                let arr_col = input.flat_naive();
                let out =
                    self.evaluate_on_array_chunked(arr_col.array()?, state, as_list, false)?;
                input.with_values(out, false, Some(&self.expr))?;
            }),
            EvalVariant::ArrayAgg => feature_gated!("dtype-array", {
                let arr_col = input.flat_naive();
                let out = self.evaluate_on_array_chunked(arr_col.array()?, state, true, true)?;
                input.with_values(out, false, Some(&self.expr))?;
            }),
            EvalVariant::Cumulative { min_samples } => {
                let mut builder = AnonymousOwnedListBuilder::new(
                    self.output_field.name().clone(),
                    input.groups().len(),
                    Some(self.output_field.dtype.clone()),
                );
                for group in input.iter_groups(false) {
                    match group {
                        None => {},
                        Some(group) => {
                            let out =
                                self.evaluate_cumulative_eval(group.as_ref(), min_samples, state)?;
                            builder.append_series(out.as_materialized_series())?;
                        },
                    }
                }

                input.with_values(builder.finish().into_column(), true, Some(&self.expr))?;
            },
        }
        Ok(input)
    }

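    /// The output field was resolved during planning, so the input schema is not consulted.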
    fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
        Ok(self.output_field.clone())
    }

    fn is_scalar(&self) -> bool {
        self.is_scalar
    }
}