GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-expr/src/expressions/eval.rs
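//! Physical `EvalExpr`: runs a sub-expression (`evaluation`) over the output of
//! `input`, either over the elements of each list (`EvalVariant::List`) or over
//! growing prefixes of a Series (`EvalVariant::Cumulative`).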
use std::borrow::Cow;
use std::sync::{Arc, Mutex};

use arrow::array::{Array, ListArray};
use polars_core::POOL;
use polars_core::chunked_array::builder::AnonymousOwnedListBuilder;
use polars_core::chunked_array::from_iterator_par::ChunkedCollectParIterExt;
use polars_core::error::{PolarsResult, polars_ensure};
use polars_core::frame::DataFrame;
use polars_core::prelude::{
    AnyValue, ChunkCast, ChunkNestingUtils, Column, CompatLevel, DataType, Field, GroupPositions,
    GroupsType, IntoColumn, ListBuilderTrait, ListChunked,
};
use polars_core::schema::Schema;
use polars_core::series::Series;
use polars_core::utils::CustomIterTools;
use polars_plan::dsl::{EvalVariant, Expr};
use polars_plan::plans::ExprPushdownGroup;
use polars_utils::IdxSize;
use polars_utils::pl_str::PlSmallStr;
use rayon::iter::{IntoParallelIterator, ParallelIterator};

use super::{AggState, AggregationContext, PhysicalExpr};
use crate::state::ExecutionState;

#[derive(Clone)]
pub struct EvalExpr {
    input: Arc<dyn PhysicalExpr>,
    evaluation: Arc<dyn PhysicalExpr>,
    variant: EvalVariant,
    expr: Expr,
    allow_threading: bool,
    // `output_field_with_ctx` accounts for the aggregation context, if any.
    // It will 'auto-implode/explode' if needed.
    output_field_with_ctx: Field,
    // `non_aggregated_output_dtype` ignores any aggregation context.
    non_aggregated_output_dtype: DataType,
    is_scalar: bool,
    pd_group: ExprPushdownGroup,
    evaluation_is_scalar: bool,
}

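/// Convert `ListArray` offsets into sliceable group positions, one group per
/// sublist. Returns `None` if the total element count does not fit in `IdxSize`.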
fn offsets_to_groups(offsets: &[i64]) -> Option<GroupPositions> {
    let mut start = offsets[0];
    let end = *offsets.last().unwrap();
    if IdxSize::try_from(end - start).is_err() {
        return None;
    }
    let groups = offsets
        .iter()
        .skip(1)
        .map(|end| {
            let offset = start as IdxSize;
            let len = (*end - start) as IdxSize;
            start = *end;
            [offset, len]
        })
        .collect();
    Some(
        GroupsType::Slice {
            groups,
            rolling: false,
        }
        .into_sliceable(),
    )
}

impl EvalExpr {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        input: Arc<dyn PhysicalExpr>,
        evaluation: Arc<dyn PhysicalExpr>,
        variant: EvalVariant,
        expr: Expr,
        allow_threading: bool,
        output_field_with_ctx: Field,
        non_aggregated_output_dtype: DataType,
        is_scalar: bool,
        pd_group: ExprPushdownGroup,
        evaluation_is_scalar: bool,
    ) -> Self {
        Self {
            input,
            evaluation,
            variant,
            expr,
            allow_threading,
            output_field_with_ctx,
            non_aggregated_output_dtype,
            is_scalar,
            pd_group,
            evaluation_is_scalar,
        }
    }

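    /// Fast path for `EvalVariant::List`: run the evaluation expression once over
    /// the flat values buffer of each chunk, then re-wrap the result with the
    /// original offsets and validity. Chunks are processed in parallel on the
    /// rayon pool when `allow_threading` is set and there is more than one chunk.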
    fn run_elementwise_on_values(
        &self,
        lst: &ListChunked,
        state: &ExecutionState,
    ) -> PolarsResult<Column> {
        if lst.chunks().is_empty() {
            return Ok(Column::new_empty(
                self.output_field_with_ctx.name.clone(),
                &self.non_aggregated_output_dtype,
            ));
        }

        let lst = lst
            .trim_lists_to_normalized_offsets()
            .map_or(Cow::Borrowed(lst), Cow::Owned);

        let output_arrow_dtype = self
            .non_aggregated_output_dtype
            .clone()
            .to_arrow(CompatLevel::newest());
        let output_arrow_dtype_physical = output_arrow_dtype.underlying_physical_type();

        let apply_to_chunk = |arr: &dyn Array| {
            let arr: &ListArray<i64> = arr.as_any().downcast_ref().unwrap();

            let values = unsafe {
                Series::from_chunks_and_dtype_unchecked(
                    PlSmallStr::EMPTY,
                    vec![arr.values().clone()],
                    lst.inner_dtype(),
                )
            };

            let df = values.into_frame();

            self.evaluation.evaluate(&df, state).map(|values| {
                let values = values.take_materialized_series().rechunk().chunks()[0].clone();

                ListArray::<i64>::new(
                    output_arrow_dtype_physical.clone(),
                    arr.offsets().clone(),
                    values,
                    arr.validity().cloned(),
                )
                .boxed()
            })
        };

        let chunks = if self.allow_threading && lst.chunks().len() > 1 {
            POOL.install(|| {
                lst.chunks()
                    .into_par_iter()
                    .map(|x| apply_to_chunk(&**x))
                    .collect::<PolarsResult<Vec<Box<dyn Array>>>>()
            })?
        } else {
            lst.chunks()
                .iter()
                .map(|x| apply_to_chunk(&**x))
                .collect::<PolarsResult<Vec<Box<dyn Array>>>>()?
        };

        Ok(unsafe {
            ListChunked::from_chunks(self.output_field_with_ctx.name.clone(), chunks)
                .cast_unchecked(&self.non_aggregated_output_dtype)
                .unwrap()
        }
        .into_column())
    }

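    /// Fallback path: materialize every sublist as a single-column DataFrame and
    /// run the evaluation expression on it, in parallel on the rayon pool when
    /// `allow_threading` is set. Any error encountered is propagated.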
    fn run_per_sublist(&self, lst: &ListChunked, state: &ExecutionState) -> PolarsResult<Column> {
        let mut err = None;
        let mut ca: ListChunked = if self.allow_threading {
            let m_err = Mutex::new(None);
            let ca: ListChunked = POOL.install(|| {
                lst.par_iter()
                    .map(|opt_s| {
                        opt_s.and_then(|s| {
                            let df = s.into_frame();
                            let out = self.evaluation.evaluate(&df, state);
                            match out {
                                Ok(s) => Some(s.take_materialized_series()),
                                Err(e) => {
                                    *m_err.lock().unwrap() = Some(e);
                                    None
                                },
                            }
                        })
                    })
                    .collect_ca_with_dtype(
                        PlSmallStr::EMPTY,
                        self.non_aggregated_output_dtype.clone(),
                    )
            });
            err = m_err.into_inner().unwrap();
            ca
        } else {
            let mut df_container = DataFrame::empty();

            lst.into_iter()
                .map(|s| {
                    s.and_then(|s| unsafe {
                        df_container.with_column_unchecked(s.into_column());
                        let out = self.evaluation.evaluate(&df_container, state);
                        df_container.clear_columns();
                        match out {
                            Ok(s) => Some(s.take_materialized_series()),
                            Err(e) => {
                                err = Some(e);
                                None
                            },
                        }
                    })
                })
                .collect_trusted()
        };
        if let Some(err) = err {
            return Err(err);
        }

        ca.rename(lst.name().clone());

        // Cast may still be required in some cases, e.g. for an empty frame when running single-threaded
        if ca.dtype() != &self.non_aggregated_output_dtype {
            ca.cast(&self.non_aggregated_output_dtype).map(Column::from)
        } else {
            Ok(ca.into_column())
        }
    }

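    /// Evaluate the expression through the group-by engine, treating each sublist
    /// as one group derived from the list offsets.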
    fn run_on_group_by_engine(
        &self,
        lst: &ListChunked,
        state: &ExecutionState,
    ) -> PolarsResult<Column> {
        let lst = lst.rechunk();
        let arr = lst.downcast_as_array();
        let groups = offsets_to_groups(arr.offsets()).unwrap();

        // List elements in a series.
        let values = Series::try_from((PlSmallStr::EMPTY, arr.values().clone())).unwrap();
        let inner_dtype = lst.inner_dtype();
        // SAFETY:
        // Invariant in List means values physicals can be cast to inner dtype
        let values = unsafe { values.from_physical_unchecked(inner_dtype).unwrap() };

        let df_context = values.into_frame();

        let mut ac = self
            .evaluation
            .evaluate_on_groups(&df_context, &groups, state)?;
        let out = match ac.agg_state() {
            AggState::AggregatedScalar(_) => {
                let out = ac.aggregated();
                out.as_list().into_column()
            },
            _ => ac.aggregated(),
        };
        Ok(out
            .with_name(self.output_field_with_ctx.name.clone())
            .into_column())
    }

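    /// Pick a strategy for `EvalVariant::List`: run elementwise over the flat
    /// values when the evaluation is pushable and non-scalar, use the group-by
    /// engine for scalar evaluations on null-free lists whose length fits
    /// `IdxSize`, and otherwise evaluate each sublist separately.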
    fn evaluate_on_list_chunked(
        &self,
        lst: &ListChunked,
        state: &ExecutionState,
    ) -> PolarsResult<Column> {
        let fits_idx_size = lst.get_inner().len() < (IdxSize::MAX as usize);
        if match self.pd_group {
            ExprPushdownGroup::Pushable => true,
            ExprPushdownGroup::Fallible => !lst.has_nulls(),
            ExprPushdownGroup::Barrier => false,
        } && !self.evaluation_is_scalar
        {
            self.run_elementwise_on_values(lst, state)
        } else if fits_idx_size && lst.null_count() == 0 && self.evaluation_is_scalar {
            self.run_on_group_by_engine(lst, state)
        } else {
            self.run_per_sublist(lst, state)
        }
    }

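    /// `EvalVariant::Cumulative`: evaluate the expression on every prefix
    /// `input[..len]`, requiring at least `min_samples` non-null values in the
    /// prefix (otherwise the result is null), and collect the per-prefix scalars
    /// into a single Series.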
    fn evaluate_cumulative_eval(
        &self,
        input: &Series,
        min_samples: usize,
        state: &ExecutionState,
    ) -> PolarsResult<Series> {
        let finish = |out: Series| {
            polars_ensure!(
                out.len() <= 1,
                ComputeError:
                "expected single value, got a result with length {}, {:?}",
                out.len(), out,
            );
            Ok(out.get(0).unwrap().into_static())
        };

        let input = input.clone().with_name(PlSmallStr::EMPTY);
        let avs = if self.allow_threading {
            POOL.install(|| {
                (1..input.len() + 1)
                    .into_par_iter()
                    .map(|len| {
                        let c = input.slice(0, len);
                        if (len - c.null_count()) >= min_samples {
                            let df = c.into_frame();
                            let out = self
                                .evaluation
                                .evaluate(&df, state)?
                                .take_materialized_series();
                            finish(out)
                        } else {
                            Ok(AnyValue::Null)
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()
            })?
        } else {
            let mut df_container = DataFrame::empty();
            (1..input.len() + 1)
                .map(|len| {
                    let c = input.slice(0, len);
                    if (len - c.null_count()) >= min_samples {
                        unsafe {
                            df_container.with_column_unchecked(c.into_column());
                            let out = self
                                .evaluation
                                .evaluate(&df_container, state)?
                                .take_materialized_series();
                            df_container.clear_columns();
                            finish(out)
                        }
                    } else {
                        Ok(AnyValue::Null)
                    }
                })
                .collect::<PolarsResult<Vec<_>>>()?
        };

        Series::from_any_values_and_dtype(
            self.output_field_with_ctx.name().clone(),
            &avs,
            &self.non_aggregated_output_dtype,
            true,
        )
    }
}

impl PhysicalExpr for EvalExpr {
    fn as_expression(&self) -> Option<&Expr> {
        Some(&self.expr)
    }

    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
        let input = self.input.evaluate(df, state)?;
        match self.variant {
            EvalVariant::List => {
                let lst = input.list()?;
                self.evaluate_on_list_chunked(lst, state)
            },
            EvalVariant::Cumulative { min_samples } => self
                .evaluate_cumulative_eval(input.as_materialized_series(), min_samples, state)
                .map(Column::from),
        }
    }

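    /// Group-aware evaluation: for `EvalVariant::List` the grouped values are
    /// evaluated as a list column, while for `EvalVariant::Cumulative` the
    /// cumulative evaluation runs per group and the results are gathered into a
    /// list column via `AnonymousOwnedListBuilder`.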
    fn evaluate_on_groups<'a>(
        &self,
        df: &DataFrame,
        groups: &'a GroupPositions,
        state: &ExecutionState,
    ) -> PolarsResult<AggregationContext<'a>> {
        let mut input = self.input.evaluate_on_groups(df, groups, state)?;
        match self.variant {
            EvalVariant::List => {
                let out = self.evaluate_on_list_chunked(input.get_values().list()?, state)?;
                input.with_values(out, false, Some(&self.expr))?;
            },
            EvalVariant::Cumulative { min_samples } => {
                let mut builder = AnonymousOwnedListBuilder::new(
                    self.output_field_with_ctx.name().clone(),
                    input.groups().len(),
                    Some(self.non_aggregated_output_dtype.clone()),
                );
                for group in input.iter_groups(false) {
                    match group {
                        None => {},
                        Some(group) => {
                            let out =
                                self.evaluate_cumulative_eval(group.as_ref(), min_samples, state)?;
                            builder.append_series(&out)?;
                        },
                    }
                }

                input.with_values(builder.finish().into_column(), true, Some(&self.expr))?;
            },
        }
        Ok(input)
    }

    fn to_field(&self, _input_schema: &Schema) -> PolarsResult<Field> {
        Ok(self.output_field_with_ctx.clone())
    }

    fn is_scalar(&self) -> bool {
        self.is_scalar
    }
}