Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/lower_expr.rs
8429 views
1
use std::sync::Arc;
2
3
use polars_core::chunked_array::cast::CastOptions;
4
use polars_core::frame::DataFrame;
5
use polars_core::prelude::{
6
DataType, Field, IDX_DTYPE, InitHashMaps, PlHashMap, PlHashSet, PlIndexMap, PlIndexSet,
7
};
8
use polars_core::scalar::Scalar;
9
use polars_core::schema::{Schema, SchemaExt};
10
use polars_error::PolarsResult;
11
use polars_expr::state::ExecutionState;
12
use polars_expr::{ExpressionConversionState, create_physical_expr};
13
use polars_ops::frame::{JoinArgs, JoinType};
14
use polars_ops::series::{RLE_LENGTH_COLUMN_NAME, RLE_VALUE_COLUMN_NAME};
15
use polars_plan::plans::AExpr;
16
use polars_plan::plans::expr_ir::{ExprIR, OutputName};
17
use polars_plan::prelude::*;
18
use polars_utils::arena::{Arena, Node};
19
use polars_utils::itertools::Itertools;
20
use polars_utils::pl_str::PlSmallStr;
21
use polars_utils::{unique_column_name, unitvec};
22
use slotmap::SlotMap;
23
24
use super::fmt::fmt_exprs;
25
use super::{PhysNode, PhysNodeKey, PhysNodeKind, PhysStream, StreamingLowerIRContext};
26
use crate::physical_plan::ZipBehavior;
27
use crate::physical_plan::lower_group_by::build_group_by_stream;
28
use crate::physical_plan::lower_ir::{build_filter_stream, build_row_idx_stream};
29
30
/// Key of an expression node in the `Arena<AExpr>` that is being lowered.
type ExprNodeKey = Node;
31
32
pub(crate) struct ExprCache {
33
is_elementwise: PlHashMap<Node, bool>,
34
is_input_independent: PlHashMap<Node, bool>,
35
is_length_preserving: PlHashMap<Node, bool>,
36
}
37
38
impl ExprCache {
39
pub fn with_capacity(capacity: usize) -> Self {
40
Self {
41
is_elementwise: PlHashMap::with_capacity(capacity),
42
is_input_independent: PlHashMap::with_capacity(capacity),
43
is_length_preserving: PlHashMap::with_capacity(capacity),
44
}
45
}
46
}
47
48
struct LowerExprContext<'a> {
49
prepare_visualization: bool,
50
expr_arena: &'a mut Arena<AExpr>,
51
phys_sm: &'a mut SlotMap<PhysNodeKey, PhysNode>,
52
cache: &'a mut ExprCache,
53
}
54
55
impl<'a> From<LowerExprContext<'a>> for StreamingLowerIRContext {
    /// Consumes the lowering context, keeping only its visualization flag.
    fn from(value: LowerExprContext<'a>) -> Self {
        Self::from(&value)
    }
}
62
impl<'a> From<&LowerExprContext<'a>> for StreamingLowerIRContext {
63
fn from(value: &LowerExprContext<'a>) -> Self {
64
Self {
65
prepare_visualization: value.prepare_visualization,
66
}
67
}
68
}
69
70
pub(crate) fn is_fake_elementwise_function(expr: &AExpr) -> bool {
71
// The in-memory engine treats ApplyList as elementwise but this is not actually
72
// the case. It doesn't cause any problems for the in-memory engine because of
73
// how it does the execution but it causes errors for new-streaming.
74
75
// Some other functions are also marked as elementwise for filter pushdown
76
// but aren't actually elementwise (e.g. arguments aren't same length).
77
match expr {
78
AExpr::Function { function, .. } => {
79
use IRFunctionExpr as F;
80
match function {
81
#[cfg(feature = "is_in")]
82
F::Boolean(IRBooleanFunction::IsIn { .. }) => true,
83
#[cfg(feature = "replace")]
84
F::Replace | F::ReplaceStrict { .. } => true,
85
_ => false,
86
}
87
},
88
_ => false,
89
}
90
}
91
92
pub(crate) fn is_elementwise_rec_cached(
93
expr_key: ExprNodeKey,
94
arena: &Arena<AExpr>,
95
cache: &mut ExprCache,
96
) -> bool {
97
if !cache.is_elementwise.contains_key(&expr_key) {
98
cache.is_elementwise.insert(
99
expr_key,
100
(|| {
101
let mut expr_key = expr_key;
102
let mut stack = unitvec![];
103
104
loop {
105
let ae = arena.get(expr_key);
106
107
if is_fake_elementwise_function(ae) {
108
return false;
109
}
110
111
if !polars_plan::plans::is_elementwise(&mut stack, ae, arena) {
112
return false;
113
}
114
115
let Some(next_key) = stack.pop() else {
116
break;
117
};
118
119
expr_key = next_key;
120
}
121
122
true
123
})(),
124
);
125
}
126
127
*cache.is_elementwise.get(&expr_key).unwrap()
128
}
129
130
#[recursive::recursive]
131
pub fn is_input_independent_rec(
132
expr_key: ExprNodeKey,
133
arena: &Arena<AExpr>,
134
cache: &mut PlHashMap<ExprNodeKey, bool>,
135
) -> bool {
136
if let Some(ret) = cache.get(&expr_key) {
137
return *ret;
138
}
139
140
let ret = match arena.get(expr_key) {
141
// Handled separately in `Eval`.
142
AExpr::Element => unreachable!(),
143
AExpr::StructField(_) => false,
144
AExpr::Explode { expr: inner, .. }
145
| AExpr::Cast {
146
expr: inner,
147
dtype: _,
148
options: _,
149
}
150
| AExpr::Sort {
151
expr: inner,
152
options: _,
153
} => is_input_independent_rec(*inner, arena, cache),
154
AExpr::Column(_) => false,
155
156
AExpr::Literal(_) => true,
157
AExpr::BinaryExpr { left, op: _, right } => {
158
is_input_independent_rec(*left, arena, cache)
159
&& is_input_independent_rec(*right, arena, cache)
160
},
161
AExpr::Gather {
162
expr,
163
idx,
164
returns_scalar: _,
165
null_on_oob: _,
166
} => {
167
is_input_independent_rec(*expr, arena, cache)
168
&& is_input_independent_rec(*idx, arena, cache)
169
},
170
AExpr::SortBy {
171
expr,
172
by,
173
sort_options: _,
174
} => {
175
is_input_independent_rec(*expr, arena, cache)
176
&& by
177
.iter()
178
.all(|expr| is_input_independent_rec(*expr, arena, cache))
179
},
180
AExpr::Filter { input, by } => {
181
is_input_independent_rec(*input, arena, cache)
182
&& is_input_independent_rec(*by, arena, cache)
183
},
184
AExpr::Agg(agg_expr) => match agg_expr.get_input() {
185
polars_plan::plans::NodeInputs::Leaf => true,
186
polars_plan::plans::NodeInputs::Single(expr) => {
187
is_input_independent_rec(expr, arena, cache)
188
},
189
polars_plan::plans::NodeInputs::Many(exprs) => exprs
190
.iter()
191
.all(|expr| is_input_independent_rec(*expr, arena, cache)),
192
},
193
AExpr::Ternary {
194
predicate,
195
truthy,
196
falsy,
197
} => {
198
is_input_independent_rec(*predicate, arena, cache)
199
&& is_input_independent_rec(*truthy, arena, cache)
200
&& is_input_independent_rec(*falsy, arena, cache)
201
},
202
AExpr::AnonymousFunction {
203
input,
204
function: _,
205
options: _,
206
fmt_str: _,
207
}
208
| AExpr::AnonymousAgg {
209
input,
210
function: _,
211
fmt_str: _,
212
}
213
| AExpr::Function {
214
input,
215
function: _,
216
options: _,
217
} => input
218
.iter()
219
.all(|expr| is_input_independent_rec(expr.node(), arena, cache)),
220
AExpr::Eval {
221
expr,
222
evaluation: _,
223
variant: _,
224
} => is_input_independent_rec(*expr, arena, cache),
225
AExpr::StructEval { expr, evaluation } => {
226
is_input_independent_rec(*expr, arena, cache)
227
&& evaluation
228
.iter()
229
.all(|expr| is_input_independent_rec(expr.node(), arena, cache))
230
},
231
#[cfg(feature = "dynamic_group_by")]
232
AExpr::Rolling {
233
function,
234
index_column,
235
period: _,
236
offset: _,
237
closed_window: _,
238
} => {
239
is_input_independent_rec(*function, arena, cache)
240
&& is_input_independent_rec(*index_column, arena, cache)
241
},
242
AExpr::Over {
243
function,
244
partition_by,
245
order_by,
246
mapping: _,
247
} => {
248
is_input_independent_rec(*function, arena, cache)
249
&& partition_by
250
.iter()
251
.all(|expr| is_input_independent_rec(*expr, arena, cache))
252
&& order_by
253
.iter()
254
.all(|(expr, _options)| is_input_independent_rec(*expr, arena, cache))
255
},
256
AExpr::Slice {
257
input,
258
offset,
259
length,
260
} => {
261
is_input_independent_rec(*input, arena, cache)
262
&& is_input_independent_rec(*offset, arena, cache)
263
&& is_input_independent_rec(*length, arena, cache)
264
},
265
AExpr::Len => false,
266
};
267
268
cache.insert(expr_key, ret);
269
ret
270
}
271
272
pub fn is_input_independent(
273
expr_key: ExprNodeKey,
274
expr_arena: &Arena<AExpr>,
275
cache: &mut ExprCache,
276
) -> bool {
277
is_input_independent_rec(expr_key, expr_arena, &mut cache.is_input_independent)
278
}
279
280
fn is_input_independent_ctx(expr_key: ExprNodeKey, ctx: &mut LowerExprContext) -> bool {
281
is_input_independent_rec(
282
expr_key,
283
ctx.expr_arena,
284
&mut ctx.cache.is_input_independent,
285
)
286
}
287
288
fn build_input_independent_node_with_ctx(
289
exprs: &[ExprIR],
290
ctx: &mut LowerExprContext,
291
) -> PolarsResult<PhysNodeKey> {
292
let output_schema = compute_output_schema(&Schema::default(), exprs, ctx.expr_arena)?;
293
Ok(ctx.phys_sm.insert(PhysNode::new(
294
output_schema,
295
PhysNodeKind::InputIndependentSelect {
296
selectors: exprs.to_vec(),
297
},
298
)))
299
}
300
301
#[recursive::recursive]
302
pub fn is_length_preserving_rec(
303
expr_key: ExprNodeKey,
304
arena: &Arena<AExpr>,
305
cache: &mut PlHashMap<ExprNodeKey, bool>,
306
) -> bool {
307
if let Some(ret) = cache.get(&expr_key) {
308
return *ret;
309
}
310
311
let ret = match arena.get(expr_key) {
312
// Handled separately in `Eval`.
313
AExpr::Element => unreachable!(),
314
// Mapped to `Column` in `StructEval`.
315
AExpr::StructField(_) => unreachable!(),
316
317
AExpr::Gather { .. }
318
| AExpr::Explode { .. }
319
| AExpr::Filter { .. }
320
| AExpr::Agg(_)
321
| AExpr::Slice { .. }
322
| AExpr::Len
323
| AExpr::Literal(_) => false,
324
325
AExpr::Column(_) => true,
326
327
AExpr::Cast {
328
expr: inner,
329
dtype: _,
330
options: _,
331
}
332
| AExpr::Sort {
333
expr: inner,
334
options: _,
335
}
336
| AExpr::SortBy {
337
expr: inner,
338
by: _,
339
sort_options: _,
340
} => is_length_preserving_rec(*inner, arena, cache),
341
342
AExpr::BinaryExpr { left, op: _, right } => {
343
// As long as at least one input is length-preserving the other side
344
// should either broadcast or have the same length.
345
is_length_preserving_rec(*left, arena, cache)
346
|| is_length_preserving_rec(*right, arena, cache)
347
},
348
AExpr::Ternary {
349
predicate,
350
truthy,
351
falsy,
352
} => {
353
is_length_preserving_rec(*predicate, arena, cache)
354
|| is_length_preserving_rec(*truthy, arena, cache)
355
|| is_length_preserving_rec(*falsy, arena, cache)
356
},
357
AExpr::AnonymousAgg { .. } => false,
358
AExpr::AnonymousFunction {
359
input,
360
function: _,
361
options,
362
fmt_str: _,
363
}
364
| AExpr::Function {
365
input,
366
function: _,
367
options,
368
} => {
369
// TODO: actually inspect the functions? This is overly conservative.
370
options.is_length_preserving()
371
&& input
372
.iter()
373
.all(|expr| is_length_preserving_rec(expr.node(), arena, cache))
374
},
375
AExpr::Eval {
376
expr,
377
evaluation: _,
378
variant: _,
379
} => is_length_preserving_rec(*expr, arena, cache),
380
#[cfg(feature = "dynamic_group_by")]
381
AExpr::Rolling {
382
function: _,
383
index_column: _,
384
period: _,
385
offset: _,
386
closed_window: _,
387
} => true,
388
AExpr::StructEval {
389
expr,
390
evaluation: _,
391
} => is_length_preserving_rec(*expr, arena, cache),
392
AExpr::Over {
393
function: _, // Actually shouldn't matter for window functions.
394
partition_by: _,
395
order_by: _,
396
mapping,
397
} => !matches!(mapping, WindowMapping::Explode),
398
};
399
400
cache.insert(expr_key, ret);
401
ret
402
}
403
404
#[expect(dead_code)]
405
pub fn is_length_preserving(
406
expr_key: ExprNodeKey,
407
expr_arena: &Arena<AExpr>,
408
cache: &mut ExprCache,
409
) -> bool {
410
is_length_preserving_rec(expr_key, expr_arena, &mut cache.is_length_preserving)
411
}
412
413
fn is_length_preserving_ctx(expr_key: ExprNodeKey, ctx: &mut LowerExprContext) -> bool {
414
is_length_preserving_rec(
415
expr_key,
416
ctx.expr_arena,
417
&mut ctx.cache.is_length_preserving,
418
)
419
}
420
421
fn build_fallback_node_with_ctx(
422
input: PhysStream,
423
exprs: &[ExprIR],
424
ctx: &mut LowerExprContext,
425
) -> PolarsResult<PhysNodeKey> {
426
// Pre-select only the columns that are needed for this fallback expression.
427
let input_schema = &ctx.phys_sm[input.node].output_schema;
428
let mut select_names: PlHashSet<_> = exprs
429
.iter()
430
.flat_map(|expr| {
431
polars_plan::utils::aexpr_to_leaf_names_iter(expr.node(), ctx.expr_arena).cloned()
432
})
433
.collect();
434
// To keep the length correct we have to ensure we select at least one
435
// column.
436
if select_names.is_empty() {
437
if let Some(name) = input_schema.iter_names().next() {
438
select_names.insert(name.clone());
439
}
440
}
441
let input_stream = if input_schema
442
.iter_names()
443
.any(|name| !select_names.contains(name))
444
{
445
let select_exprs = select_names
446
.into_iter()
447
.map(|name| {
448
ExprIR::new(
449
ctx.expr_arena.add(AExpr::Column(name.clone())),
450
OutputName::ColumnLhs(name),
451
)
452
})
453
.collect_vec();
454
build_select_stream_with_ctx(input, &select_exprs, ctx)?
455
} else {
456
input
457
};
458
459
let output_schema = schema_for_select(input_stream, exprs, ctx)?;
460
let mut conv_state = ExpressionConversionState::new(false);
461
let phys_exprs = exprs
462
.iter()
463
.map(|expr| {
464
create_physical_expr(
465
expr,
466
ctx.expr_arena,
467
&ctx.phys_sm[input_stream.node].output_schema,
468
&mut conv_state,
469
)
470
})
471
.try_collect_vec()?;
472
let map = move |df| {
473
let exec_state = ExecutionState::new();
474
let columns = phys_exprs
475
.iter()
476
.map(|phys_expr| phys_expr.evaluate(&df, &exec_state))
477
.try_collect()?;
478
479
DataFrame::new_infer_broadcast(columns)
480
};
481
482
let format_str = ctx.prepare_visualization.then(|| {
483
let mut buffer = String::new();
484
buffer.push_str("SELECT [\n");
485
fmt_exprs(
486
&mut buffer,
487
exprs,
488
ctx.expr_arena,
489
super::fmt::FormatExprStyle::Select,
490
);
491
buffer.push(']');
492
buffer
493
});
494
let kind = PhysNodeKind::InMemoryMap {
495
input: input_stream,
496
map: Arc::new(map),
497
format_str,
498
};
499
Ok(ctx.phys_sm.insert(PhysNode::new(output_schema, kind)))
500
}
501
502
fn simplify_input_streams(
503
orig_input: PhysStream,
504
mut input_streams: PlIndexSet<PhysStream>,
505
ctx: &mut LowerExprContext,
506
) -> PolarsResult<PlIndexSet<PhysStream>> {
507
// Flatten nested zips (ensures the original input columns only occur once).
508
if input_streams.len() > 1 {
509
let mut flattened_input_streams = PlIndexSet::with_capacity(input_streams.len());
510
for input_stream in input_streams {
511
if let PhysNodeKind::Zip {
512
inputs,
513
zip_behavior: ZipBehavior::Broadcast,
514
} = &ctx.phys_sm[input_stream.node].kind
515
{
516
flattened_input_streams.extend(inputs);
517
} else {
518
flattened_input_streams.insert(input_stream);
519
}
520
}
521
input_streams = flattened_input_streams;
522
}
523
524
// Merge reduce nodes that directly operate on the original input.
525
let mut combined_exprs = vec![];
526
input_streams = input_streams
527
.into_iter()
528
.filter(|input_stream| {
529
if let PhysNodeKind::Reduce {
530
input: inner,
531
exprs,
532
} = &ctx.phys_sm[input_stream.node].kind
533
{
534
if *inner == orig_input {
535
combined_exprs.extend(exprs.iter().cloned());
536
ctx.phys_sm.remove(input_stream.node);
537
return false;
538
}
539
}
540
true
541
})
542
.collect();
543
if !combined_exprs.is_empty() {
544
let output_schema = schema_for_select(orig_input, &combined_exprs, ctx)?;
545
let kind = PhysNodeKind::Reduce {
546
input: orig_input,
547
exprs: combined_exprs,
548
};
549
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
550
input_streams.insert(PhysStream::first(reduce_node_key));
551
}
552
553
Ok(input_streams)
554
}
555
556
// Assuming that agg_node is a reduction, lowers its input recursively and
557
// returns a Reduce node as well a node corresponding to the column to select
558
// from the Reduce node for the aggregate.
559
fn lower_reduce_node(
560
input: PhysStream,
561
agg_node: Node,
562
ctx: &mut LowerExprContext,
563
) -> PolarsResult<(PhysStream, Node)> {
564
let agg_aexpr = ctx.expr_arena.get(agg_node).clone();
565
let mut agg_input = Vec::with_capacity(1);
566
agg_aexpr.inputs_rev(&mut agg_input);
567
agg_input.reverse();
568
569
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &agg_input, ctx)?;
570
let trans_agg_node = ctx.expr_arena.add(agg_aexpr.replace_inputs(&trans_exprs));
571
572
let out_name = unique_column_name();
573
let expr_ir = ExprIR::new(trans_agg_node, OutputName::Alias(out_name.clone()));
574
let output_schema = schema_for_select(trans_input, std::slice::from_ref(&expr_ir), ctx)?;
575
let kind = PhysNodeKind::Reduce {
576
input: trans_input,
577
exprs: vec![expr_ir],
578
};
579
580
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
581
let reduce_stream = PhysStream::first(reduce_node_key);
582
let out_node = ctx.expr_arena.add(AExpr::Column(out_name));
583
Ok((reduce_stream, out_node))
584
}
585
586
// In the recursive lowering we don't bother with named expressions at all, so
587
// we work directly with Nodes.
588
#[recursive::recursive]
589
fn lower_exprs_with_ctx(
590
input: PhysStream,
591
exprs: &[Node],
592
ctx: &mut LowerExprContext,
593
) -> PolarsResult<(PhysStream, Vec<Node>)> {
594
// We have to catch this case separately, in case all the input independent expressions are elementwise.
595
// TODO: we shouldn't always do this when recursing, e.g. pl.col.a.sum() + 1 will still hit this in the recursion.
596
if exprs.iter().all(|e| is_input_independent_ctx(*e, ctx)) {
597
let expr_irs = exprs
598
.iter()
599
.map(|e| ExprIR::new(*e, OutputName::Alias(unique_column_name())))
600
.collect_vec();
601
let node = build_input_independent_node_with_ctx(&expr_irs, ctx)?;
602
let out_exprs = expr_irs
603
.iter()
604
.map(|e| ctx.expr_arena.add(AExpr::Column(e.output_name().clone())))
605
.collect();
606
return Ok((PhysStream::first(node), out_exprs));
607
}
608
609
// Fallback expressions that can directly be applied to the original input.
610
let mut fallback_subset = Vec::new();
611
612
// Streams containing the columns used for executing transformed expressions.
613
let mut input_streams = PlIndexSet::new();
614
615
// The final transformed expressions that will be selected from the zipped
616
// together transformed nodes.
617
let mut transformed_exprs = Vec::with_capacity(exprs.len());
618
619
for expr in exprs.iter().copied() {
620
if is_elementwise_rec_cached(expr, ctx.expr_arena, ctx.cache) {
621
if !is_input_independent_ctx(expr, ctx) {
622
input_streams.insert(input);
623
}
624
transformed_exprs.push(expr);
625
continue;
626
}
627
628
match ctx.expr_arena.get(expr).clone() {
629
// Handled separately in `Eval` expressions.
630
AExpr::Element => unreachable!(),
631
// Mapped to `Column` in `StructEval`.
632
AExpr::StructField(_) => unreachable!(),
633
634
AExpr::Explode {
635
expr: inner,
636
options,
637
} => {
638
// While explode is streamable, it is not elementwise, so we
639
// have to transform it to a select node.
640
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[inner], ctx)?;
641
let exploded_name = unique_column_name();
642
let trans_inner = ctx.expr_arena.add(AExpr::Explode {
643
expr: trans_exprs[0],
644
options,
645
});
646
let explode_expr =
647
ExprIR::new(trans_inner, OutputName::Alias(exploded_name.clone()));
648
let output_schema =
649
schema_for_select(trans_input, std::slice::from_ref(&explode_expr), ctx)?;
650
let node_kind = PhysNodeKind::Select {
651
input: trans_input,
652
selectors: vec![explode_expr.clone()],
653
extend_original: false,
654
};
655
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
656
input_streams.insert(PhysStream::first(node_key));
657
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(exploded_name)));
658
},
659
AExpr::Column(_) => unreachable!("column should always be streamable"),
660
AExpr::Literal(_) => {
661
let out_name = unique_column_name();
662
let inner_expr = ExprIR::new(expr, OutputName::Alias(out_name.clone()));
663
let node_key = build_input_independent_node_with_ctx(&[inner_expr], ctx)?;
664
input_streams.insert(PhysStream::first(node_key));
665
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
666
},
667
668
AExpr::Function {
669
input: ref inner_exprs,
670
function: IRFunctionExpr::Repeat,
671
options: _,
672
} => {
673
assert!(inner_exprs.len() == 2);
674
let out_name = unique_column_name();
675
let value_expr_ir = inner_exprs[0].with_alias(out_name.clone());
676
let repeats_expr_ir = inner_exprs[1].clone();
677
let value_stream = build_select_stream_with_ctx(input, &[value_expr_ir], ctx)?;
678
let repeats_stream = build_select_stream_with_ctx(input, &[repeats_expr_ir], ctx)?;
679
680
let output_schema = ctx.phys_sm[value_stream.node].output_schema.clone();
681
let kind = PhysNodeKind::Repeat {
682
value: value_stream,
683
repeats: repeats_stream,
684
};
685
let repeat_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
686
input_streams.insert(PhysStream::first(repeat_node_key));
687
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
688
},
689
690
AExpr::Function {
691
input: ref inner_exprs,
692
function: IRFunctionExpr::ExtendConstant,
693
options: _,
694
} => {
695
assert!(inner_exprs.len() == 3);
696
let input_schema = &ctx.phys_sm[input.node].output_schema;
697
let out_name = unique_column_name();
698
let first_ir = inner_exprs[0].with_alias(out_name.clone());
699
let out_dtype = first_ir.dtype(input_schema, ctx.expr_arena)?;
700
let mut value_expr_ir = inner_exprs[1].with_alias(out_name.clone());
701
let repeats_expr_ir = inner_exprs[2].clone();
702
703
// Cast the value if necessary.
704
if value_expr_ir.dtype(input_schema, ctx.expr_arena)? != out_dtype {
705
let cast_expr = AExpr::Cast {
706
expr: value_expr_ir.node(),
707
dtype: out_dtype.clone(),
708
options: CastOptions::NonStrict,
709
};
710
value_expr_ir = ExprIR::new(
711
ctx.expr_arena.add(cast_expr),
712
OutputName::Alias(out_name.clone()),
713
);
714
}
715
716
let first_stream = build_select_stream_with_ctx(input, &[first_ir], ctx)?;
717
let value_stream = build_select_stream_with_ctx(input, &[value_expr_ir], ctx)?;
718
let repeats_stream = build_select_stream_with_ctx(input, &[repeats_expr_ir], ctx)?;
719
720
let output_schema = ctx.phys_sm[first_stream.node].output_schema.clone();
721
let repeat_kind = PhysNodeKind::Repeat {
722
value: value_stream,
723
repeats: repeats_stream,
724
};
725
let repeat_node_key = ctx
726
.phys_sm
727
.insert(PhysNode::new(output_schema.clone(), repeat_kind));
728
729
let concat_kind = PhysNodeKind::OrderedUnion {
730
inputs: vec![first_stream, PhysStream::first(repeat_node_key)],
731
};
732
let concat_node_key = ctx
733
.phys_sm
734
.insert(PhysNode::new(output_schema, concat_kind));
735
input_streams.insert(PhysStream::first(concat_node_key));
736
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
737
},
738
739
AExpr::Function {
740
input: ref inner_exprs,
741
function: IRFunctionExpr::ConcatExpr(_rechunk),
742
options: _,
743
} => {
744
// We have to lower each expression separately as they might have different lengths.
745
let mut concat_streams = Vec::new();
746
let out_name = unique_column_name();
747
for inner_expr in inner_exprs {
748
let (trans_input, trans_expr) =
749
lower_exprs_with_ctx(input, &[inner_expr.node()], ctx)?;
750
let select_expr =
751
ExprIR::new(trans_expr[0], OutputName::Alias(out_name.clone()));
752
concat_streams.push(build_select_stream_with_ctx(
753
trans_input,
754
&[select_expr],
755
ctx,
756
)?);
757
}
758
759
let output_schema = ctx.phys_sm[concat_streams[0].node].output_schema.clone();
760
let node_kind = PhysNodeKind::OrderedUnion {
761
inputs: concat_streams,
762
};
763
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
764
input_streams.insert(PhysStream::first(node_key));
765
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
766
},
767
768
AExpr::Function {
769
input: ref inner_exprs,
770
function: IRFunctionExpr::Unique(maintain_order),
771
options: _,
772
} => {
773
assert!(inner_exprs.len() == 1);
774
// Lower to no-aggregate group-by with unique name.
775
let tmp_name = unique_column_name();
776
let (trans_input, trans_inner_exprs) =
777
lower_exprs_with_ctx(input, &[inner_exprs[0].node()], ctx)?;
778
let group_by_key_expr =
779
ExprIR::new(trans_inner_exprs[0], OutputName::Alias(tmp_name.clone()));
780
let group_by_output_schema =
781
schema_for_select(trans_input, std::slice::from_ref(&group_by_key_expr), ctx)?;
782
let group_by_stream = build_group_by_stream(
783
trans_input,
784
&[group_by_key_expr],
785
&[],
786
group_by_output_schema,
787
maintain_order,
788
Arc::new(GroupbyOptions::default()),
789
None,
790
ctx.expr_arena,
791
ctx.phys_sm,
792
ctx.cache,
793
StreamingLowerIRContext::from(&*ctx),
794
false,
795
)?;
796
input_streams.insert(group_by_stream);
797
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_name)));
798
},
799
800
AExpr::Function {
801
input: ref inner_exprs,
802
function: IRFunctionExpr::UniqueCounts,
803
options: _,
804
} => {
805
// Transform:
806
// expr.unique_counts().alias(name)
807
// ->
808
// .select(expr.alias(name))
809
// .group_by(_ = name, maintain_order=True)
810
// .agg(name = pl.len())
811
// .select(name)
812
813
assert_eq!(inner_exprs.len(), 1);
814
815
let input_schema = &ctx.phys_sm[input.node].output_schema;
816
817
let key_name = unique_column_name();
818
let tmp_count_name = unique_column_name();
819
820
let input_expr = &inner_exprs[0];
821
let output_dtype = input_expr.dtype(input_schema, ctx.expr_arena)?.clone();
822
let group_by_output_schema = Arc::new(Schema::from_iter([
823
(key_name.clone(), output_dtype),
824
(tmp_count_name.clone(), IDX_DTYPE),
825
]));
826
827
let keys = [input_expr.with_alias(key_name)];
828
let aggs = [ExprIR::new(
829
ctx.expr_arena.add(AExpr::Len),
830
OutputName::Alias(tmp_count_name.clone()),
831
)];
832
833
let stream = build_group_by_stream(
834
input,
835
&keys,
836
&aggs,
837
group_by_output_schema,
838
true,
839
Default::default(),
840
None,
841
ctx.expr_arena,
842
ctx.phys_sm,
843
ctx.cache,
844
StreamingLowerIRContext {
845
prepare_visualization: ctx.prepare_visualization,
846
},
847
false,
848
)?;
849
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_count_name)));
850
input_streams.insert(stream);
851
},
852
AExpr::Function {
853
input: ref inner_exprs,
854
function:
855
IRFunctionExpr::ValueCounts {
856
sort: false,
857
parallel: _,
858
name: count_name,
859
normalize: false,
860
},
861
options: _,
862
} => {
863
// Transform:
864
// expr.value_counts(
865
// sort=False,
866
// parallel=_,
867
// name=count_name,
868
// normalize=False
869
// ).alias(name)
870
// ->
871
// .select(expr.alias(name))
872
// .group_by(name)
873
// .agg(count_name = pl.len())
874
// .select(pl.struct([name, count_name]))
875
876
assert_eq!(inner_exprs.len(), 1);
877
878
let input_schema = &ctx.phys_sm[input.node].output_schema;
879
880
let tmp_value_name = unique_column_name();
881
let tmp_count_name = unique_column_name();
882
883
let input_expr = &inner_exprs[0];
884
let output_field = input_expr.field(input_schema, ctx.expr_arena)?;
885
let group_by_output_schema = Arc::new(Schema::from_iter([
886
output_field.clone().with_name(tmp_value_name.clone()),
887
Field::new(tmp_count_name.clone(), IDX_DTYPE),
888
]));
889
890
let keys = [input_expr.with_alias(tmp_value_name.clone())];
891
let aggs = [ExprIR::new(
892
ctx.expr_arena.add(AExpr::Len),
893
OutputName::Alias(tmp_count_name.clone()),
894
)];
895
896
let stream = build_group_by_stream(
897
input,
898
&keys,
899
&aggs,
900
group_by_output_schema,
901
false,
902
Default::default(),
903
None,
904
ctx.expr_arena,
905
ctx.phys_sm,
906
ctx.cache,
907
StreamingLowerIRContext {
908
prepare_visualization: ctx.prepare_visualization,
909
},
910
false,
911
)?;
912
913
let value = ExprIR::new(
914
ctx.expr_arena.add(AExpr::Column(tmp_value_name)),
915
OutputName::Alias(output_field.name),
916
);
917
let count = ExprIR::new(
918
ctx.expr_arena.add(AExpr::Column(tmp_count_name)),
919
OutputName::Alias(count_name.clone()),
920
);
921
922
transformed_exprs.push(
923
AExprBuilder::function(
924
vec![value, count],
925
IRFunctionExpr::AsStruct,
926
ctx.expr_arena,
927
)
928
.node(),
929
);
930
input_streams.insert(stream);
931
},
932
933
#[cfg(feature = "mode")]
934
AExpr::Function {
935
input: ref inner_exprs,
936
function: IRFunctionExpr::Mode { maintain_order },
937
options: _,
938
} => {
939
// Transform:
940
// expr.mode()
941
// ->
942
// .select(_t = expr)
943
// .group_by(_t)
944
// .agg(count_name = pl.len())
945
// .select(_t.filter(count_name == count_name.max())
946
947
assert_eq!(inner_exprs.len(), 1);
948
949
let tmp_value_name = unique_column_name();
950
let tmp_count_name = unique_column_name();
951
952
let stream = build_select_stream_with_ctx(
953
input,
954
&[inner_exprs[0].with_alias(tmp_value_name.clone())],
955
ctx,
956
)?;
957
958
let mut group_by_output_schema =
959
ctx.phys_sm[stream.node].output_schema.as_ref().clone();
960
group_by_output_schema.insert(tmp_count_name.clone(), IDX_DTYPE);
961
962
let keys = [AExprBuilder::col(tmp_value_name.clone(), ctx.expr_arena)
963
.expr_ir(tmp_value_name.clone())];
964
let aggs = [ExprIR::new(
965
ctx.expr_arena.add(AExpr::Len),
966
OutputName::Alias(tmp_count_name.clone()),
967
)];
968
969
let stream = build_group_by_stream(
970
stream,
971
&keys,
972
&aggs,
973
Arc::new(group_by_output_schema),
974
maintain_order,
975
Default::default(),
976
None,
977
ctx.expr_arena,
978
ctx.phys_sm,
979
ctx.cache,
980
StreamingLowerIRContext {
981
prepare_visualization: ctx.prepare_visualization,
982
},
983
false,
984
)?;
985
986
let stream = build_select_stream_with_ctx(
987
stream,
988
&[AExprBuilder::col(tmp_value_name.clone(), ctx.expr_arena)
989
.filter(
990
AExprBuilder::col(tmp_count_name.clone(), ctx.expr_arena).eq(
991
AExprBuilder::col(tmp_count_name.clone(), ctx.expr_arena)
992
.max(ctx.expr_arena),
993
ctx.expr_arena,
994
),
995
ctx.expr_arena,
996
)
997
.expr_ir(tmp_value_name.clone())],
998
ctx,
999
)?;
1000
1001
transformed_exprs
1002
.push(AExprBuilder::col(tmp_value_name.clone(), ctx.expr_arena).node());
1003
input_streams.insert(stream);
1004
},
1005
1006
AExpr::Function {
1007
input: ref inner_exprs,
1008
function: IRFunctionExpr::ArgUnique,
1009
options: _,
1010
} => {
1011
// Transform:
1012
// expr.arg_unique()
1013
// ->
1014
// .with_row_index(IDX)
1015
// .group_by(expr)
1016
// .agg(IDX = IDX.first())
1017
// .select(IDX.sort())
1018
1019
assert_eq!(inner_exprs.len(), 1);
1020
1021
let expr_name = unique_column_name();
1022
let idx_name = unique_column_name();
1023
1024
let stream = build_select_stream_with_ctx(
1025
input,
1026
&[inner_exprs[0].with_alias(expr_name.clone())],
1027
ctx,
1028
)?;
1029
1030
let mut group_by_output_schema =
1031
ctx.phys_sm[stream.node].output_schema.as_ref().clone();
1032
group_by_output_schema.insert(idx_name.clone(), IDX_DTYPE);
1033
1034
let stream = build_row_idx_stream(stream, idx_name.clone(), None, ctx.phys_sm);
1035
1036
let keys =
1037
[AExprBuilder::col(expr_name.clone(), ctx.expr_arena).expr_ir(expr_name)];
1038
let aggs = [AExprBuilder::col(idx_name.clone(), ctx.expr_arena)
1039
.first(ctx.expr_arena)
1040
.expr_ir(idx_name.clone())];
1041
1042
let stream = build_group_by_stream(
1043
stream,
1044
&keys,
1045
&aggs,
1046
Arc::new(group_by_output_schema),
1047
false,
1048
Default::default(),
1049
None,
1050
ctx.expr_arena,
1051
ctx.phys_sm,
1052
ctx.cache,
1053
StreamingLowerIRContext {
1054
prepare_visualization: ctx.prepare_visualization,
1055
},
1056
false,
1057
)?;
1058
1059
let expr = AExprBuilder::col(idx_name.clone(), ctx.expr_arena)
1060
.sort(Default::default(), ctx.expr_arena)
1061
.expr_ir(idx_name.clone());
1062
let stream = build_select_stream_with_ctx(stream, &[expr], ctx)?;
1063
1064
transformed_exprs.push(AExprBuilder::col(idx_name.clone(), ctx.expr_arena).node());
1065
input_streams.insert(stream);
1066
},
1067
1068
#[cfg(feature = "is_in")]
1069
AExpr::Function {
1070
input: ref inner_exprs,
1071
function: IRFunctionExpr::Boolean(IRBooleanFunction::IsIn { nulls_equal }),
1072
options: _,
1073
} if is_scalar_ae(inner_exprs[1].node(), ctx.expr_arena) => {
1074
// Translate left and right side separately (they could have different lengths).
1075
1076
use polars_core::prelude::ExplodeOptions;
1077
let left_on_name = unique_column_name();
1078
let right_on_name = unique_column_name();
1079
let (trans_input_left, trans_expr_left) =
1080
lower_exprs_with_ctx(input, &[inner_exprs[0].node()], ctx)?;
1081
let right_expr_exploded_node = match ctx.expr_arena.get(inner_exprs[1].node()) {
1082
// expr.implode().explode() ~= expr (and avoids rechunking)
1083
AExpr::Agg(IRAggExpr::Implode(n)) => *n,
1084
_ => AExprBuilder::new_from_node(inner_exprs[1].node())
1085
.explode(
1086
ctx.expr_arena,
1087
ExplodeOptions {
1088
empty_as_null: false,
1089
keep_nulls: true,
1090
},
1091
)
1092
.node(),
1093
};
1094
let (trans_input_right, trans_expr_right) =
1095
lower_exprs_with_ctx(input, &[right_expr_exploded_node], ctx)?;
1096
1097
// We have to ensure the left input has the right name for the semi-anti-join to
1098
// generate the correct output name.
1099
let left_col_expr = ctx.expr_arena.add(AExpr::Column(left_on_name.clone()));
1100
let left_select_stream = build_select_stream_with_ctx(
1101
trans_input_left,
1102
&[ExprIR::new(
1103
trans_expr_left[0],
1104
OutputName::Alias(left_on_name.clone()),
1105
)],
1106
ctx,
1107
)?;
1108
1109
let node_kind = PhysNodeKind::SemiAntiJoin {
1110
input_left: left_select_stream,
1111
input_right: trans_input_right,
1112
left_on: vec![ExprIR::new(
1113
left_col_expr,
1114
OutputName::Alias(left_on_name.clone()),
1115
)],
1116
right_on: vec![ExprIR::new(
1117
trans_expr_right[0],
1118
OutputName::Alias(right_on_name),
1119
)],
1120
args: JoinArgs {
1121
how: JoinType::Semi,
1122
validation: Default::default(),
1123
suffix: None,
1124
slice: None,
1125
nulls_equal,
1126
coalesce: Default::default(),
1127
maintain_order: Default::default(),
1128
build_side: None,
1129
},
1130
output_bool: true,
1131
};
1132
1133
// SemiAntiJoin with output_bool returns a column with the same name as the first
1134
// input column.
1135
let output_schema = Schema::from_iter([(left_on_name.clone(), DataType::Boolean)]);
1136
let node_key = ctx
1137
.phys_sm
1138
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1139
input_streams.insert(PhysStream::first(node_key));
1140
transformed_exprs.push(left_col_expr);
1141
},
1142
1143
#[cfg(feature = "cum_agg")]
1144
ref agg_expr @ AExpr::Function {
1145
input: ref inner_exprs,
1146
function:
1147
ref function @ (IRFunctionExpr::CumMin { reverse }
1148
| IRFunctionExpr::CumMax { reverse }
1149
| IRFunctionExpr::CumSum { reverse }
1150
| IRFunctionExpr::CumCount { reverse }
1151
| IRFunctionExpr::CumProd { reverse }),
1152
options: _,
1153
} if !reverse => {
1154
use crate::nodes::cum_agg::CumAggKind;
1155
1156
assert_eq!(inner_exprs.len(), 1);
1157
1158
let input_schema = &ctx.phys_sm[input.node].output_schema;
1159
1160
let value_key = unique_column_name();
1161
let value_dtype =
1162
agg_expr.to_dtype(&ToFieldContext::new(ctx.expr_arena, input_schema))?;
1163
1164
let input = build_select_stream_with_ctx(
1165
input,
1166
&[inner_exprs[0].with_alias(value_key.clone())],
1167
ctx,
1168
)?;
1169
let kind = match function {
1170
IRFunctionExpr::CumMin { .. } => CumAggKind::Min,
1171
IRFunctionExpr::CumMax { .. } => CumAggKind::Max,
1172
IRFunctionExpr::CumSum { .. } => CumAggKind::Sum,
1173
IRFunctionExpr::CumCount { .. } => CumAggKind::Count,
1174
IRFunctionExpr::CumProd { .. } => CumAggKind::Prod,
1175
_ => unreachable!(),
1176
};
1177
let node_kind = PhysNodeKind::CumAgg { input, kind };
1178
1179
let output_schema = Schema::from_iter([(value_key.clone(), value_dtype.clone())]);
1180
let node_key = ctx
1181
.phys_sm
1182
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1183
input_streams.insert(PhysStream::first(node_key));
1184
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key)));
1185
},
1186
1187
#[cfg(feature = "diff")]
1188
AExpr::Function {
1189
input: ref inner_exprs,
1190
function: IRFunctionExpr::Diff(null_behavior),
1191
options: _,
1192
} => {
1193
use polars_core::scalar::Scalar;
1194
use polars_core::series::ops::NullBehavior;
1195
1196
assert_eq!(inner_exprs.len(), 2);
1197
1198
// Transform:
1199
// expr.diff(offset, "ignore")
1200
// ->
1201
// expr - expr.shift(offset)
1202
1203
let base_expr_ir = &inner_exprs[0];
1204
let base_dtype =
1205
base_expr_ir.dtype(&ctx.phys_sm[input.node].output_schema, ctx.expr_arena)?;
1206
let offset_expr_ir = &inner_exprs[1];
1207
let offset_dtype =
1208
offset_expr_ir.dtype(&ctx.phys_sm[input.node].output_schema, ctx.expr_arena)?;
1209
1210
let mut base = AExprBuilder::new_from_node(base_expr_ir.node());
1211
let cast_dtype = match base_dtype {
1212
DataType::UInt8 => Some(DataType::Int16),
1213
DataType::UInt16 => Some(DataType::Int32),
1214
DataType::UInt32 | DataType::UInt64 => Some(DataType::Int64),
1215
_ => None,
1216
};
1217
if let Some(dtype) = cast_dtype {
1218
base = base.cast(dtype, ctx.expr_arena);
1219
}
1220
1221
let mut offset = AExprBuilder::new_from_node(offset_expr_ir.node());
1222
if offset_dtype != &DataType::Int64 {
1223
offset = offset.cast(DataType::Int64, ctx.expr_arena);
1224
}
1225
1226
let shifted = base.shift(offset.node(), ctx.expr_arena);
1227
let mut output = base.minus(shifted.node(), ctx.expr_arena);
1228
1229
if null_behavior == NullBehavior::Drop {
1230
// Without the column size, slice can only remove leading nulls.
1231
// So if the offset was negative, the nulls appeared at the end of the column.
1232
// In that case, shift the column forward to move the nulls back to the front.
1233
let zero_literal =
1234
AExprBuilder::lit(LiteralValue::new_idxsize(0), ctx.expr_arena);
1235
let offset_neg = offset.negate(ctx.expr_arena);
1236
let offset_if_negative = AExprBuilder::function(
1237
vec![offset_neg.expr_ir_unnamed(), zero_literal.expr_ir_unnamed()],
1238
IRFunctionExpr::MaxHorizontal,
1239
ctx.expr_arena,
1240
);
1241
output = output.shift(offset_if_negative, ctx.expr_arena);
1242
1243
// Remove the nulls that were introduced by the shift
1244
let offset_abs = offset.abs(ctx.expr_arena);
1245
let null_literal = AExprBuilder::lit(
1246
LiteralValue::Scalar(Scalar::null(DataType::Int64)),
1247
ctx.expr_arena,
1248
);
1249
output = output.slice(offset_abs, null_literal, ctx.expr_arena);
1250
}
1251
1252
let (stream, nodes) = lower_exprs_with_ctx(input, &[output.node()], ctx)?;
1253
input_streams.insert(stream);
1254
transformed_exprs.extend(nodes);
1255
},
1256
1257
AExpr::Function {
1258
input: ref inner_exprs,
1259
function: IRFunctionExpr::RLE,
1260
options: _,
1261
} => {
1262
assert_eq!(inner_exprs.len(), 1);
1263
1264
let input_schema = &ctx.phys_sm[input.node].output_schema;
1265
1266
let value_key = unique_column_name();
1267
let value_dtype = inner_exprs[0].dtype(input_schema, ctx.expr_arena)?;
1268
1269
let input = build_select_stream_with_ctx(
1270
input,
1271
&[inner_exprs[0].with_alias(value_key.clone())],
1272
ctx,
1273
)?;
1274
let node_kind = PhysNodeKind::Rle(input);
1275
1276
let output_schema = Schema::from_iter([(
1277
value_key.clone(),
1278
DataType::Struct(vec![
1279
Field::new(
1280
PlSmallStr::from_static(RLE_VALUE_COLUMN_NAME),
1281
value_dtype.clone(),
1282
),
1283
Field::new(PlSmallStr::from_static(RLE_LENGTH_COLUMN_NAME), IDX_DTYPE),
1284
]),
1285
)]);
1286
let node_key = ctx
1287
.phys_sm
1288
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1289
input_streams.insert(PhysStream::first(node_key));
1290
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key)));
1291
},
1292
1293
AExpr::Function {
1294
input: ref inner_exprs,
1295
function: IRFunctionExpr::RLEID,
1296
options: _,
1297
} => {
1298
assert_eq!(inner_exprs.len(), 1);
1299
1300
let value_key = unique_column_name();
1301
1302
let input = build_select_stream_with_ctx(
1303
input,
1304
&[inner_exprs[0].with_alias(value_key.clone())],
1305
ctx,
1306
)?;
1307
let node_kind = PhysNodeKind::RleId(input);
1308
1309
let output_schema = Schema::from_iter([(value_key.clone(), IDX_DTYPE)]);
1310
let node_key = ctx
1311
.phys_sm
1312
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1313
input_streams.insert(PhysStream::first(node_key));
1314
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key.clone())));
1315
},
1316
1317
AExpr::Function {
1318
input: ref inner_exprs,
1319
function: IRFunctionExpr::GatherEvery { n, offset },
1320
options: _,
1321
} => {
1322
assert_eq!(inner_exprs.len(), 1);
1323
1324
let value_key = unique_column_name();
1325
1326
let input = build_select_stream_with_ctx(
1327
input,
1328
&[inner_exprs[0].with_alias(value_key.clone())],
1329
ctx,
1330
)?;
1331
let node_kind = PhysNodeKind::GatherEvery { input, n, offset };
1332
1333
let output_schema = ctx.phys_sm[input.node].output_schema.clone();
1334
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
1335
input_streams.insert(PhysStream::first(node_key));
1336
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key.clone())));
1337
},
1338
1339
AExpr::Function {
1340
input: ref inner_exprs,
1341
function: ref function @ (IRFunctionExpr::PeakMin | IRFunctionExpr::PeakMax),
1342
options: _,
1343
} => {
1344
assert_eq!(inner_exprs.len(), 1);
1345
1346
let value_key = unique_column_name();
1347
1348
let input = build_select_stream_with_ctx(
1349
input,
1350
&[inner_exprs[0].with_alias(value_key.clone())],
1351
ctx,
1352
)?;
1353
let is_peak_max = matches!(function, IRFunctionExpr::PeakMax);
1354
let node_kind = PhysNodeKind::PeakMinMax { input, is_peak_max };
1355
1356
let output_schema = Schema::from_iter([(value_key.clone(), DataType::Boolean)]);
1357
let node_key = ctx
1358
.phys_sm
1359
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1360
input_streams.insert(PhysStream::first(node_key));
1361
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key.clone())));
1362
},
1363
1364
// pl.row_index() maps to this.
1365
#[cfg(feature = "range")]
1366
AExpr::Function {
1367
input: ref inner_exprs,
1368
function: IRFunctionExpr::Range(IRRangeFunction::IntRange { step: 1, dtype }),
1369
options: _,
1370
} if {
1371
let start_is_zero = match ctx.expr_arena.get(inner_exprs[0].node()) {
1372
AExpr::Literal(lit) => lit.extract_usize().ok() == Some(0),
1373
_ => false,
1374
};
1375
let stop_is_len = matches!(ctx.expr_arena.get(inner_exprs[1].node()), AExpr::Len);
1376
1377
dtype == DataType::IDX_DTYPE && start_is_zero && stop_is_len
1378
} =>
1379
{
1380
let out_name = unique_column_name();
1381
let row_idx_col_aexpr = ctx.expr_arena.add(AExpr::Column(out_name.clone()));
1382
let row_idx_col_expr_ir =
1383
ExprIR::new(row_idx_col_aexpr, OutputName::ColumnLhs(out_name.clone()));
1384
let row_idx_stream = build_select_stream_with_ctx(
1385
build_row_idx_stream(input, out_name, None, ctx.phys_sm),
1386
&[row_idx_col_expr_ir],
1387
ctx,
1388
)?;
1389
input_streams.insert(row_idx_stream);
1390
transformed_exprs.push(row_idx_col_aexpr);
1391
},
1392
1393
#[cfg(feature = "range")]
1394
AExpr::Function {
1395
input: ref inner_exprs,
1396
function: IRFunctionExpr::Range(IRRangeFunction::IntRange { step: 1, dtype }),
1397
options: _,
1398
} if {
1399
let start_is_zero = match ctx.expr_arena.get(inner_exprs[0].node()) {
1400
AExpr::Literal(lit) => lit.extract_usize().ok() == Some(0),
1401
_ => false,
1402
};
1403
let stop_is_count = matches!(
1404
ctx.expr_arena.get(inner_exprs[1].node()),
1405
AExpr::Agg(IRAggExpr::Count { .. })
1406
);
1407
1408
start_is_zero && stop_is_count
1409
} =>
1410
{
1411
let AExpr::Agg(IRAggExpr::Count {
1412
input: input_expr,
1413
include_nulls,
1414
}) = ctx.expr_arena.get(inner_exprs[1].node())
1415
else {
1416
unreachable!();
1417
};
1418
let (input_expr, include_nulls) = (*input_expr, *include_nulls);
1419
1420
let out_name = unique_column_name();
1421
let mut row_idx_col_aexpr = ctx.expr_arena.add(AExpr::Column(out_name.clone()));
1422
if dtype != IDX_DTYPE {
1423
row_idx_col_aexpr = AExprBuilder::new_from_node(row_idx_col_aexpr)
1424
.cast(dtype, ctx.expr_arena)
1425
.node();
1426
}
1427
let row_idx_col_expr_ir =
1428
ExprIR::new(row_idx_col_aexpr, OutputName::ColumnLhs(out_name.clone()));
1429
1430
let mut input_expr = AExprBuilder::new_from_node(input_expr);
1431
if !include_nulls {
1432
input_expr = input_expr.drop_nulls(ctx.expr_arena);
1433
}
1434
let input_expr = input_expr.expr_ir_retain_name(ctx.expr_arena);
1435
1436
let row_idx_stream = build_select_stream_with_ctx(
1437
build_row_idx_stream(
1438
build_select_stream_with_ctx(input, &[input_expr], ctx)?,
1439
out_name,
1440
None,
1441
ctx.phys_sm,
1442
),
1443
&[row_idx_col_expr_ir],
1444
ctx,
1445
)?;
1446
input_streams.insert(row_idx_stream);
1447
transformed_exprs.push(row_idx_col_aexpr);
1448
},
1449
1450
// Lower arbitrary elementwise functions.
1451
ref node @ AExpr::Function {
1452
input: ref inner_exprs,
1453
options,
1454
..
1455
}
1456
| ref node @ AExpr::AnonymousFunction {
1457
input: ref inner_exprs,
1458
options,
1459
..
1460
} if options.is_elementwise() && !is_fake_elementwise_function(node) => {
1461
let inner_nodes = inner_exprs.iter().map(|expr| expr.node()).collect_vec();
1462
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &inner_nodes, ctx)?;
1463
1464
// The function may be sensitive to names (e.g. pl.struct), so we restore them.
1465
let new_input = trans_exprs
1466
.into_iter()
1467
.zip(inner_exprs)
1468
.map(|(trans, orig)| {
1469
ExprIR::new(trans, OutputName::Alias(orig.output_name().clone()))
1470
})
1471
.collect_vec();
1472
let mut new_node = node.clone();
1473
match &mut new_node {
1474
AExpr::Function { input, .. } | AExpr::AnonymousFunction { input, .. } => {
1475
*input = new_input;
1476
},
1477
_ => unreachable!(),
1478
}
1479
input_streams.insert(trans_input);
1480
transformed_exprs.push(ctx.expr_arena.add(new_node));
1481
},
1482
1483
// Lower arbitrary row-separable functions.
1484
ref node @ AExpr::Function {
1485
input: ref inner_exprs,
1486
ref function,
1487
options,
1488
} if options.is_row_separable() && !is_fake_elementwise_function(node) => {
1489
// While these functions are streamable, they are not elementwise, so we
1490
// have to transform them to a select node.
1491
let inner_nodes = inner_exprs.iter().map(|x| x.node()).collect_vec();
1492
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &inner_nodes, ctx)?;
1493
let out_name = unique_column_name();
1494
let trans_inner = ctx.expr_arena.add(AExpr::Function {
1495
input: trans_exprs
1496
.iter()
1497
.map(|node| ExprIR::from_node(*node, ctx.expr_arena))
1498
.collect(),
1499
function: function.clone(),
1500
options,
1501
});
1502
let func_expr = ExprIR::new(trans_inner, OutputName::Alias(out_name.clone()));
1503
let output_schema =
1504
schema_for_select(trans_input, std::slice::from_ref(&func_expr), ctx)?;
1505
let node_kind = PhysNodeKind::Select {
1506
input: trans_input,
1507
selectors: vec![func_expr.clone()],
1508
extend_original: false,
1509
};
1510
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
1511
input_streams.insert(PhysStream::first(node_key));
1512
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1513
},
1514
1515
AExpr::BinaryExpr { left, op, right } => {
1516
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[left, right], ctx)?;
1517
let bin_expr = AExpr::BinaryExpr {
1518
left: trans_exprs[0],
1519
op,
1520
right: trans_exprs[1],
1521
};
1522
input_streams.insert(trans_input);
1523
transformed_exprs.push(ctx.expr_arena.add(bin_expr));
1524
},
1525
AExpr::Eval {
1526
expr: inner,
1527
evaluation,
1528
variant,
1529
} => match variant {
1530
EvalVariant::List
1531
| EvalVariant::ListAgg
1532
| EvalVariant::Array { as_list: _ }
1533
| EvalVariant::ArrayAgg => {
1534
let (trans_input, trans_expr) = lower_exprs_with_ctx(input, &[inner], ctx)?;
1535
let eval_expr = AExpr::Eval {
1536
expr: trans_expr[0],
1537
evaluation,
1538
variant,
1539
};
1540
input_streams.insert(trans_input);
1541
transformed_exprs.push(ctx.expr_arena.add(eval_expr));
1542
},
1543
EvalVariant::Cumulative { .. } => {
1544
// Cumulative is not elementwise, this would need a special node.
1545
let out_name = unique_column_name();
1546
fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));
1547
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1548
},
1549
},
1550
AExpr::StructEval {
1551
expr: inner,
1552
mut evaluation,
1553
} => {
1554
// Transform (simplified):
1555
// expr.struct.with_fields(evaluation).alias(name)
1556
// ->
1557
// .select(expr)
1558
// .with_columns(validity = expr.is_not_null())
1559
// .map(df.struct.unnest()))
1560
// .with_columns([evaluation])
1561
// .select(pl.when(validity).then(as_struct()).alias(name)
1562
//
1563
// Any reference to `StructField(x)` gets remapped to `Column(PREFIX_x)` prior to
1564
// calling `unnest()`, with PREFIX being unique for each StructEval expression.
1565
1566
// Evaluate input `expr` and capture `col` references from `evaluation`
1567
let out_name = unique_column_name();
1568
let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(out_name.clone()));
1569
let mut expr_irs = Vec::new();
1570
expr_irs.push(inner_expr_ir);
1571
1572
// Any column expression inside evaluation must be added explicitly.
1573
let eval_col_names: PlHashSet<_> = evaluation
1574
.iter()
1575
.flat_map(|expr| {
1576
polars_plan::utils::aexpr_to_leaf_names_iter(expr.node(), ctx.expr_arena)
1577
})
1578
.cloned()
1579
.collect();
1580
for name in eval_col_names {
1581
expr_irs.push(ExprIR::new(
1582
ctx.expr_arena.add(AExpr::Column(name.clone())),
1583
OutputName::ColumnLhs(name),
1584
));
1585
}
1586
let stream = build_select_stream_with_ctx(input, &expr_irs, ctx)?;
1587
1588
// Capture validity as an extra column.
1589
let validity_name = polars_utils::format_pl_smallstr!(
1590
"{}{}",
1591
out_name,
1592
PlSmallStr::from_static("_VLD")
1593
);
1594
let validity_input_node = ctx.expr_arena.add(AExpr::Column(out_name.clone()));
1595
let validity_expr_ir = ExprIR::new(
1596
validity_input_node,
1597
OutputName::Alias(validity_name.clone()),
1598
);
1599
let validity_expr = AExprBuilder::function(
1600
vec![validity_expr_ir],
1601
IRFunctionExpr::Boolean(IRBooleanFunction::IsNotNull),
1602
ctx.expr_arena,
1603
);
1604
let validity_node = validity_expr.node();
1605
let validity_expr_ir =
1606
ExprIR::new(validity_node, OutputName::Alias(validity_name.clone()));
1607
let stream = build_hstack_stream(
1608
stream,
1609
&[validity_expr_ir],
1610
ctx.expr_arena,
1611
ctx.phys_sm,
1612
ctx.cache,
1613
StreamingLowerIRContext {
1614
prepare_visualization: ctx.prepare_visualization,
1615
},
1616
)?;
1617
1618
// Rewrite any `StructField(x)`` expression into a `Col(prefix_x)`` expression.
1619
let separator = PlSmallStr::from_static("_FLD_");
1620
let field_prefix = polars_utils::format_pl_smallstr!("{}{}", out_name, separator);
1621
evaluation.iter_mut().for_each(|e| {
1622
e.set_node(structfield_to_column(
1623
e.node(),
1624
ctx.expr_arena,
1625
&field_prefix,
1626
))
1627
});
1628
1629
// Unnest.
1630
let unnest_fn = FunctionIR::Unnest {
1631
columns: Arc::new([out_name.clone()]),
1632
separator: Some(separator.clone()),
1633
};
1634
let input_schema = ctx.phys_sm[stream.node].output_schema.clone();
1635
let output_schema = unnest_fn.schema(&input_schema)?.into_owned();
1636
let format_str = ctx.prepare_visualization.then(|| {
1637
format!(
1638
"UNNEST columns: [{}], separator: \"{}\"",
1639
out_name.as_str(),
1640
separator.as_str()
1641
)
1642
});
1643
let map = Arc::new(move |df| unnest_fn.evaluate(df));
1644
let node_kind = PhysNodeKind::Map {
1645
input: stream,
1646
map,
1647
format_str,
1648
};
1649
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
1650
let stream = PhysStream::first(node_key);
1651
1652
// Evaluate `evaluation`, using `with_columns`.
1653
// This requires output names to be prefixed, as they refer to the local StructField namespace.
1654
// Note, native columns are still included in the stream but could be dropped (nice-to-have).
1655
evaluation.iter_mut().for_each(|e| {
1656
*e = e.with_alias(polars_utils::format_pl_smallstr!(
1657
"{}{}",
1658
&field_prefix,
1659
e.output_name()
1660
));
1661
});
1662
let stream = build_hstack_stream(
1663
stream,
1664
&evaluation,
1665
ctx.expr_arena,
1666
ctx.phys_sm,
1667
ctx.cache,
1668
StreamingLowerIRContext {
1669
prepare_visualization: ctx.prepare_visualization,
1670
},
1671
)?;
1672
1673
// Nest any column that belongs to the StructField namespace back into a Struct.
1674
let mut fields_expr_irs = Vec::new();
1675
let eval_schema = ctx.phys_sm[stream.node].output_schema.clone();
1676
for (name, _) in eval_schema.iter() {
1677
if let Some(stripped_name) = name.strip_prefix(field_prefix.as_str()) {
1678
let node = ctx.expr_arena.add(AExpr::Column(name.clone()));
1679
fields_expr_irs.push(
1680
ExprIR::from_node(node, ctx.expr_arena)
1681
.with_alias(PlSmallStr::from_str(stripped_name)),
1682
);
1683
}
1684
}
1685
let as_struct_expr = AExprBuilder::function(
1686
fields_expr_irs,
1687
IRFunctionExpr::AsStruct,
1688
ctx.expr_arena,
1689
);
1690
let as_struct_node = as_struct_expr.node();
1691
1692
// Apply validity.
1693
let with_validity = AExprBuilder::when_then_otherwise(
1694
AExprBuilder::col(validity_name.clone(), ctx.expr_arena),
1695
AExprBuilder::new_from_node(as_struct_node),
1696
AExprBuilder::lit(
1697
LiteralValue::Scalar(Scalar::null(DataType::Null)),
1698
ctx.expr_arena,
1699
),
1700
ctx.expr_arena,
1701
);
1702
let with_validity_node = with_validity.node();
1703
let validity_expr_ir =
1704
ExprIR::new(with_validity_node, OutputName::Alias(out_name.clone()));
1705
let stream = build_select_stream_with_ctx(stream, &[validity_expr_ir], ctx)?;
1706
let exit_node = ctx.expr_arena.add(AExpr::Column(out_name.clone()));
1707
1708
// Finalize.
1709
input_streams.insert(stream);
1710
transformed_exprs.push(exit_node);
1711
},
1712
AExpr::Ternary {
1713
predicate,
1714
truthy,
1715
falsy,
1716
} => {
1717
let (trans_input, trans_exprs) =
1718
lower_exprs_with_ctx(input, &[predicate, truthy, falsy], ctx)?;
1719
let tern_expr = AExpr::Ternary {
1720
predicate: trans_exprs[0],
1721
truthy: trans_exprs[1],
1722
falsy: trans_exprs[2],
1723
};
1724
input_streams.insert(trans_input);
1725
transformed_exprs.push(ctx.expr_arena.add(tern_expr));
1726
},
1727
AExpr::Cast {
1728
expr: inner,
1729
dtype,
1730
options,
1731
} => {
1732
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[inner], ctx)?;
1733
input_streams.insert(trans_input);
1734
transformed_exprs.push(ctx.expr_arena.add(AExpr::Cast {
1735
expr: trans_exprs[0],
1736
dtype,
1737
options,
1738
}));
1739
},
1740
AExpr::Sort {
1741
expr: inner,
1742
options,
1743
} => {
1744
// As we'll refer to the sorted column twice, ensure the inner
1745
// expr is available as a column by selecting first.
1746
let sorted_name = unique_column_name();
1747
let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(sorted_name.clone()));
1748
let select_stream =
1749
build_select_stream_with_ctx(input, std::slice::from_ref(&inner_expr_ir), ctx)?;
1750
let col_expr = ctx.expr_arena.add(AExpr::Column(sorted_name.clone()));
1751
let kind = PhysNodeKind::Sort {
1752
input: select_stream,
1753
by_column: vec![ExprIR::new(col_expr, OutputName::Alias(sorted_name))],
1754
slice: None,
1755
sort_options: (&options).into(),
1756
};
1757
let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();
1758
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1759
input_streams.insert(PhysStream::first(node_key));
1760
transformed_exprs.push(col_expr);
1761
},
1762
1763
AExpr::SortBy {
1764
expr: inner,
1765
by,
1766
sort_options,
1767
} => {
1768
// Select our inputs (if we don't do this we'll waste time sorting irrelevant columns).
1769
let sorted_name = unique_column_name();
1770
let by_names = by.iter().map(|_| unique_column_name()).collect_vec();
1771
let all_inner_expr_irs = [(&sorted_name, inner)]
1772
.into_iter()
1773
.chain(by_names.iter().zip(by.iter().copied()))
1774
.map(|(name, inner)| ExprIR::new(inner, OutputName::Alias(name.clone())))
1775
.collect_vec();
1776
let select_stream = build_select_stream_with_ctx(input, &all_inner_expr_irs, ctx)?;
1777
1778
// Sort the inputs.
1779
let kind = PhysNodeKind::Sort {
1780
input: select_stream,
1781
by_column: by_names
1782
.into_iter()
1783
.map(|name| {
1784
ExprIR::new(
1785
ctx.expr_arena.add(AExpr::Column(name.clone())),
1786
OutputName::Alias(name),
1787
)
1788
})
1789
.collect(),
1790
slice: None,
1791
sort_options,
1792
};
1793
let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();
1794
let sort_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1795
1796
let sorted_col_expr = ctx.expr_arena.add(AExpr::Column(sorted_name.clone()));
1797
input_streams.insert(PhysStream::first(sort_node_key));
1798
transformed_exprs.push(sorted_col_expr);
1799
},
1800
1801
#[cfg(feature = "top_k")]
1802
AExpr::Function {
1803
input: inner_exprs,
1804
function: function @ (IRFunctionExpr::TopK { .. } | IRFunctionExpr::TopKBy { .. }),
1805
options: _,
1806
} => {
1807
// Select our inputs.
1808
let by = &inner_exprs[2..];
1809
let out_name = unique_column_name();
1810
let by_names = by.iter().map(|_| unique_column_name()).collect_vec();
1811
let data_irs = [(&out_name, &inner_exprs[0])]
1812
.into_iter()
1813
.chain(by_names.iter().zip(by.iter()))
1814
.map(|(name, inner)| ExprIR::new(inner.node(), OutputName::Alias(name.clone())))
1815
.collect_vec();
1816
let data_stream = build_select_stream_with_ctx(input, &data_irs, ctx)?;
1817
let k_stream = build_select_stream_with_ctx(input, &inner_exprs[1..2], ctx)?;
1818
1819
// Create 'by' column expressions.
1820
let out_col_node = ctx.expr_arena.add(AExpr::Column(out_name.clone()));
1821
let out_col_expr = ExprIR::new(out_col_node, OutputName::Alias(out_name));
1822
let (by_column, reverse) = match function {
1823
IRFunctionExpr::TopK { descending } => {
1824
(vec![out_col_expr.clone()], vec![descending])
1825
},
1826
IRFunctionExpr::TopKBy {
1827
descending: reverse,
1828
} => {
1829
let by_column = by_names
1830
.into_iter()
1831
.map(|name| {
1832
ExprIR::new(
1833
ctx.expr_arena.add(AExpr::Column(name.clone())),
1834
OutputName::Alias(name),
1835
)
1836
})
1837
.collect();
1838
(by_column, reverse.clone())
1839
},
1840
_ => unreachable!(),
1841
};
1842
1843
let kind = PhysNodeKind::TopK {
1844
input: data_stream,
1845
k: k_stream,
1846
nulls_last: vec![true; by_column.len()],
1847
reverse,
1848
by_column,
1849
};
1850
let output_schema = ctx.phys_sm[data_stream.node].output_schema.clone();
1851
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1852
input_streams.insert(PhysStream::first(node_key));
1853
transformed_exprs.push(out_col_node);
1854
},
1855
1856
AExpr::Filter { input: inner, by } => {
1857
// Select our inputs (if we don't do this we'll waste time filtering irrelevant columns).
1858
let out_name = unique_column_name();
1859
let by_name = unique_column_name();
1860
let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(out_name.clone()));
1861
let by_expr_ir = ExprIR::new(by, OutputName::Alias(by_name.clone()));
1862
let select_stream =
1863
build_select_stream_with_ctx(input, &[inner_expr_ir, by_expr_ir], ctx)?;
1864
1865
// Add a filter node.
1866
let predicate = ExprIR::new(
1867
ctx.expr_arena.add(AExpr::Column(by_name.clone())),
1868
OutputName::Alias(by_name),
1869
);
1870
let kind = PhysNodeKind::Filter {
1871
input: select_stream,
1872
predicate,
1873
};
1874
let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();
1875
let filter_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1876
input_streams.insert(PhysStream::first(filter_node_key));
1877
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1878
},
1879
1880
AExpr::AnonymousAgg {
1881
input: _,
1882
fmt_str: _,
1883
function: _,
1884
} => {
1885
let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;
1886
input_streams.insert(trans_stream);
1887
transformed_exprs.push(trans_expr);
1888
},
1889
// Aggregates.
1890
AExpr::Agg(agg) => match agg {
1891
// Change agg mutably so we can share the codepath for all of these.
1892
IRAggExpr::Min { .. }
1893
| IRAggExpr::Max { .. }
1894
| IRAggExpr::First(_)
1895
| IRAggExpr::FirstNonNull(_)
1896
| IRAggExpr::Last(_)
1897
| IRAggExpr::LastNonNull(_)
1898
| IRAggExpr::Item { .. }
1899
| IRAggExpr::Sum(_)
1900
| IRAggExpr::Mean(_)
1901
| IRAggExpr::Var { .. }
1902
| IRAggExpr::Std { .. }
1903
| IRAggExpr::Count { .. } => {
1904
let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;
1905
input_streams.insert(trans_stream);
1906
transformed_exprs.push(trans_expr);
1907
},
1908
IRAggExpr::NUnique(inner) => {
1909
// Lower to no-aggregate group-by with unique name feeding into len aggregate.
1910
let tmp_name = unique_column_name();
1911
let (trans_input, trans_inner_exprs) =
1912
lower_exprs_with_ctx(input, &[inner], ctx)?;
1913
let group_by_key_expr =
1914
ExprIR::new(trans_inner_exprs[0], OutputName::Alias(tmp_name.clone()));
1915
let group_by_output_schema = schema_for_select(
1916
trans_input,
1917
std::slice::from_ref(&group_by_key_expr),
1918
ctx,
1919
)?;
1920
let group_by_stream = build_group_by_stream(
1921
trans_input,
1922
&[group_by_key_expr],
1923
&[],
1924
group_by_output_schema,
1925
false,
1926
Arc::new(GroupbyOptions::default()),
1927
None,
1928
ctx.expr_arena,
1929
ctx.phys_sm,
1930
ctx.cache,
1931
StreamingLowerIRContext::from(&*ctx),
1932
false,
1933
)?;
1934
1935
let len_node = ctx.expr_arena.add(AExpr::Len);
1936
let len_expr_ir = ExprIR::new(len_node, OutputName::Alias(tmp_name.clone()));
1937
let output_schema = schema_for_select(
1938
group_by_stream,
1939
std::slice::from_ref(&len_expr_ir),
1940
ctx,
1941
)?;
1942
let kind = PhysNodeKind::Reduce {
1943
input: group_by_stream,
1944
exprs: vec![len_expr_ir],
1945
};
1946
1947
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1948
input_streams.insert(PhysStream::first(reduce_node_key));
1949
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_name)));
1950
},
1951
IRAggExpr::Median(_)
1952
| IRAggExpr::Implode(_)
1953
| IRAggExpr::Quantile { .. }
1954
| IRAggExpr::AggGroups(_) => {
1955
let out_name = unique_column_name();
1956
fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));
1957
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1958
},
1959
},
1960
1961
#[cfg(feature = "bitwise")]
1962
AExpr::Function {
1963
function:
1964
IRFunctionExpr::Bitwise(
1965
IRBitwiseFunction::And | IRBitwiseFunction::Or | IRBitwiseFunction::Xor,
1966
),
1967
..
1968
} => {
1969
let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;
1970
input_streams.insert(trans_stream);
1971
transformed_exprs.push(trans_expr);
1972
},
1973
1974
#[cfg(feature = "approx_unique")]
1975
AExpr::Function {
1976
function: IRFunctionExpr::ApproxNUnique,
1977
..
1978
} => {
1979
let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;
1980
input_streams.insert(trans_stream);
1981
transformed_exprs.push(trans_expr);
1982
},
1983
1984
AExpr::Function {
1985
function:
1986
IRFunctionExpr::Boolean(
1987
IRBooleanFunction::Any { .. } | IRBooleanFunction::All { .. },
1988
)
1989
| IRFunctionExpr::MinBy
1990
| IRFunctionExpr::MaxBy
1991
| IRFunctionExpr::NullCount,
1992
..
1993
} => {
1994
let (trans_stream, trans_expr) = lower_reduce_node(input, expr, ctx)?;
1995
input_streams.insert(trans_stream);
1996
transformed_exprs.push(trans_expr);
1997
},
1998
1999
// Length-based expressions.
2000
AExpr::Len => {
2001
let out_name = unique_column_name();
2002
let expr_ir = ExprIR::new(expr, OutputName::Alias(out_name.clone()));
2003
let output_schema = schema_for_select(input, std::slice::from_ref(&expr_ir), ctx)?;
2004
let kind = PhysNodeKind::Reduce {
2005
input,
2006
exprs: vec![expr_ir],
2007
};
2008
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
2009
input_streams.insert(PhysStream::first(reduce_node_key));
2010
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
2011
},
2012
2013
AExpr::Function {
2014
input: ref inner_exprs,
2015
function: IRFunctionExpr::ArgWhere,
2016
options: _,
2017
} => {
2018
// pl.arg_where(expr)
2019
//
2020
// ->
2021
// .select(predicate_name = expr)
2022
// .with_row_index(out_name)
2023
// .filter(predicate_name)
2024
// .select(out_name)
2025
let out_name = unique_column_name();
2026
let predicate_name = unique_column_name();
2027
let predicate = build_select_stream_with_ctx(
2028
input,
2029
&[inner_exprs[0].with_alias(predicate_name.clone())],
2030
ctx,
2031
)?;
2032
let row_index =
2033
build_row_idx_stream(predicate, out_name.clone(), None, ctx.phys_sm);
2034
2035
let filter_stream = build_filter_stream(
2036
row_index,
2037
AExprBuilder::col(predicate_name.clone(), ctx.expr_arena)
2038
.expr_ir(predicate_name),
2039
ctx.expr_arena,
2040
ctx.phys_sm,
2041
ctx.cache,
2042
StreamingLowerIRContext {
2043
prepare_visualization: ctx.prepare_visualization,
2044
},
2045
)?;
2046
input_streams.insert(filter_stream);
2047
transformed_exprs.push(AExprBuilder::col(out_name.clone(), ctx.expr_arena).node());
2048
},
2049
2050
AExpr::Slice {
2051
input: inner,
2052
offset,
2053
length,
2054
} => {
2055
let out_name = unique_column_name();
2056
let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(out_name.clone()));
2057
let offset_expr_ir = ExprIR::from_node(offset, ctx.expr_arena);
2058
let length_expr_ir = ExprIR::from_node(length, ctx.expr_arena);
2059
let input_stream = build_select_stream_with_ctx(input, &[inner_expr_ir], ctx)?;
2060
let offset_stream = build_select_stream_with_ctx(input, &[offset_expr_ir], ctx)?;
2061
let length_stream = build_select_stream_with_ctx(input, &[length_expr_ir], ctx)?;
2062
2063
let output_schema = ctx.phys_sm[input_stream.node].output_schema.clone();
2064
let kind = PhysNodeKind::DynamicSlice {
2065
input: input_stream,
2066
offset: offset_stream,
2067
length: length_stream,
2068
};
2069
let slice_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
2070
input_streams.insert(PhysStream::first(slice_node_key));
2071
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
2072
},
2073
2074
AExpr::Function {
2075
input: ref inner_exprs,
2076
function: func @ (IRFunctionExpr::Shift | IRFunctionExpr::ShiftAndFill),
2077
options: _,
2078
} => {
2079
let out_name = unique_column_name();
2080
let data_col_expr = inner_exprs[0].with_alias(out_name.clone());
2081
let trans_data_column = build_select_stream_with_ctx(input, &[data_col_expr], ctx)?;
2082
let trans_offset =
2083
build_select_stream_with_ctx(input, &[inner_exprs[1].clone()], ctx)?;
2084
2085
let trans_fill = if func == IRFunctionExpr::ShiftAndFill {
2086
let fill_expr = inner_exprs[2].with_alias(out_name.clone());
2087
Some(build_select_stream_with_ctx(input, &[fill_expr], ctx)?)
2088
} else {
2089
None
2090
};
2091
2092
let output_schema = ctx.phys_sm[trans_data_column.node].output_schema.clone();
2093
let node_key = ctx.phys_sm.insert(PhysNode::new(
2094
output_schema,
2095
PhysNodeKind::Shift {
2096
input: trans_data_column,
2097
offset: trans_offset,
2098
fill: trans_fill,
2099
},
2100
));
2101
2102
input_streams.insert(PhysStream::first(node_key));
2103
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
2104
},
2105
2106
#[cfg(feature = "ewma")]
2107
AExpr::Function {
2108
input: input_exprs,
2109
function:
2110
ewm_variant @ IRFunctionExpr::EwmMean { options }
2111
| ewm_variant @ IRFunctionExpr::EwmVar { options }
2112
| ewm_variant @ IRFunctionExpr::EwmStd { options },
2113
options: _,
2114
} => {
2115
let out_name = unique_column_name();
2116
2117
let input = match input_exprs.as_slice() {
2118
[input_expr] => build_select_stream_with_ctx(
2119
input,
2120
&[input_expr.with_alias(out_name.clone())],
2121
ctx,
2122
)?,
2123
_ => panic!("{:?}", input_exprs),
2124
};
2125
2126
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
2127
assert_eq!(input_schema.len(), 1);
2128
let output_schema = input_schema;
2129
2130
let kind = match ewm_variant {
2131
IRFunctionExpr::EwmMean { .. } => PhysNodeKind::EwmMean { input, options },
2132
IRFunctionExpr::EwmVar { .. } => PhysNodeKind::EwmVar { input, options },
2133
IRFunctionExpr::EwmStd { .. } => PhysNodeKind::EwmStd { input, options },
2134
_ => unreachable!(),
2135
};
2136
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
2137
input_streams.insert(PhysStream::first(node_key));
2138
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
2139
},
2140
2141
#[cfg(feature = "dynamic_group_by")]
2142
rolling_function @ AExpr::Rolling {
2143
function,
2144
index_column,
2145
period,
2146
offset,
2147
closed_window,
2148
} => {
2149
// function.rolling(index_column=index_column)
2150
//
2151
// ->
2152
//
2153
// .select(*LIVE_COLUMNS(function), _tmp0 = index_column)
2154
// .rolling(_tmp0)
2155
// .agg(_tmp1 = function)
2156
// .select(_tmp1)
2157
2158
let out_name = unique_column_name();
2159
let index_column_name = unique_column_name();
2160
2161
let index_column_expr_ir =
2162
AExprBuilder::new_from_node(index_column).expr_ir(index_column_name.clone());
2163
2164
let input_schema = &ctx.phys_sm[input.node].output_schema;
2165
let output_dtype = rolling_function
2166
.to_dtype(&ToFieldContext::new(ctx.expr_arena, input_schema))?;
2167
let output_schema = Schema::from_iter([
2168
index_column_expr_ir.field(input_schema, ctx.expr_arena)?,
2169
Field::new(out_name.clone(), output_dtype),
2170
]);
2171
2172
let input_columns = aexpr_to_leaf_names(function, ctx.expr_arena)
2173
.into_iter()
2174
.map(|n| AExprBuilder::col(n.clone(), ctx.expr_arena).expr_ir(n))
2175
.chain(std::iter::once(index_column_expr_ir.clone()))
2176
.collect::<Vec<_>>();
2177
let input = build_select_stream_with_ctx(input, &input_columns, ctx)?;
2178
2179
let kind = PhysNodeKind::RollingGroupBy {
2180
input,
2181
index_column: index_column_name,
2182
period,
2183
offset,
2184
closed: closed_window,
2185
slice: None,
2186
aggs: vec![AExprBuilder::new_from_node(function).expr_ir(out_name.clone())],
2187
};
2188
let node_key = ctx
2189
.phys_sm
2190
.insert(PhysNode::new(Arc::new(output_schema), kind));
2191
let input = PhysStream::first(node_key);
2192
2193
let input = build_select_stream_with_ctx(
2194
input,
2195
&[AExprBuilder::col(out_name.clone(), ctx.expr_arena)
2196
.expr_ir(out_name.clone())],
2197
ctx,
2198
)?;
2199
input_streams.insert(input);
2200
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
2201
},
2202
2203
AExpr::AnonymousFunction { .. }
2204
| AExpr::Function { .. }
2205
| AExpr::Over { .. }
2206
| AExpr::Gather { .. } => {
2207
let out_name = unique_column_name();
2208
fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));
2209
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
2210
},
2211
}
2212
}
2213
2214
if !fallback_subset.is_empty() {
2215
let fallback_node = build_fallback_node_with_ctx(input, &fallback_subset, ctx)?;
2216
input_streams.insert(PhysStream::first(fallback_node));
2217
}
2218
2219
// Simplify the input nodes (also ensures the original input only occurs
2220
// once in the zip).
2221
input_streams = simplify_input_streams(input, input_streams, ctx)?;
2222
2223
if input_streams.len() == 1 {
2224
// No need for any multiplexing/zipping, can directly execute.
2225
return Ok((input_streams.into_iter().next().unwrap(), transformed_exprs));
2226
}
2227
2228
let zip_inputs = input_streams.into_iter().collect_vec();
2229
let output_schema = zip_inputs
2230
.iter()
2231
.flat_map(|stream| ctx.phys_sm[stream.node].output_schema.iter_fields())
2232
.collect();
2233
let zip_kind = PhysNodeKind::Zip {
2234
inputs: zip_inputs,
2235
zip_behavior: ZipBehavior::Broadcast,
2236
};
2237
let zip_node = ctx
2238
.phys_sm
2239
.insert(PhysNode::new(Arc::new(output_schema), zip_kind));
2240
2241
Ok((PhysStream::first(zip_node), transformed_exprs))
2242
}
2243
2244
/// Computes the schema that selecting the given expressions on the input schema
2245
/// would result in.
2246
pub fn compute_output_schema(
2247
input_schema: &Schema,
2248
exprs: &[ExprIR],
2249
expr_arena: &Arena<AExpr>,
2250
) -> PolarsResult<Arc<Schema>> {
2251
let output_schema: Schema = exprs
2252
.iter()
2253
.map(|e| {
2254
let name = e.output_name().clone();
2255
let dtype = e
2256
.dtype(input_schema, expr_arena)?
2257
.clone()
2258
.materialize_unknown(true)
2259
.unwrap();
2260
PolarsResult::Ok(Field::new(name, dtype))
2261
})
2262
.try_collect()?;
2263
Ok(Arc::new(output_schema))
2264
}
2265
2266
/// Computes the schema that selecting the given expressions on the input node
2267
/// would result in.
2268
fn schema_for_select(
2269
input: PhysStream,
2270
exprs: &[ExprIR],
2271
ctx: &mut LowerExprContext,
2272
) -> PolarsResult<Arc<Schema>> {
2273
let input_schema = &ctx.phys_sm[input.node].output_schema;
2274
compute_output_schema(input_schema, exprs, ctx.expr_arena)
2275
}
2276
2277
fn build_select_stream_with_ctx(
2278
input: PhysStream,
2279
exprs: &[ExprIR],
2280
ctx: &mut LowerExprContext,
2281
) -> PolarsResult<PhysStream> {
2282
if exprs
2283
.iter()
2284
.all(|e| is_input_independent_ctx(e.node(), ctx))
2285
{
2286
return Ok(PhysStream::first(build_input_independent_node_with_ctx(
2287
exprs, ctx,
2288
)?));
2289
}
2290
2291
// Are we only selecting simple columns, with the same name?
2292
let all_simple_columns: Option<Vec<PlSmallStr>> = exprs
2293
.iter()
2294
.map(|e| match ctx.expr_arena.get(e.node()) {
2295
AExpr::Column(name) if name == e.output_name() => Some(name.clone()),
2296
_ => None,
2297
})
2298
.collect();
2299
2300
if let Some(columns) = all_simple_columns {
2301
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
2302
if input_schema.len() == columns.len()
2303
&& input_schema.iter_names().zip(&columns).all(|(l, r)| l == r)
2304
{
2305
// Input node already has the correct schema, just pass through.
2306
return Ok(input);
2307
}
2308
2309
let output_schema = Arc::new(input_schema.try_project(&columns)?);
2310
let node_kind = PhysNodeKind::SimpleProjection { input, columns };
2311
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
2312
return Ok(PhysStream::first(node_key));
2313
}
2314
2315
// Actual lowering is needed.
2316
let node_exprs = exprs.iter().map(|e| e.node()).collect_vec();
2317
let (transformed_input, transformed_exprs) = lower_exprs_with_ctx(input, &node_exprs, ctx)?;
2318
let trans_expr_irs = exprs
2319
.iter()
2320
.zip(transformed_exprs)
2321
.map(|(e, te)| ExprIR::new(te, OutputName::Alias(e.output_name().clone())))
2322
.collect_vec();
2323
let output_schema = schema_for_select(transformed_input, &trans_expr_irs, ctx)?;
2324
let node_kind = PhysNodeKind::Select {
2325
input: transformed_input,
2326
selectors: trans_expr_irs,
2327
extend_original: false,
2328
};
2329
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
2330
Ok(PhysStream::first(node_key))
2331
}
2332
2333
/// Lowers an input node plus a set of expressions on that input node to an
2334
/// equivalent (input node, set of expressions) pair, ensuring that the new set
2335
/// of expressions can run on the streaming engine.
2336
///
2337
/// Ensures that if the input node is transformed it has unique column names.
2338
pub fn lower_exprs(
2339
input: PhysStream,
2340
exprs: &[ExprIR],
2341
expr_arena: &mut Arena<AExpr>,
2342
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
2343
expr_cache: &mut ExprCache,
2344
ctx: StreamingLowerIRContext,
2345
) -> PolarsResult<(PhysStream, Vec<ExprIR>)> {
2346
let mut ctx = LowerExprContext {
2347
expr_arena,
2348
phys_sm,
2349
cache: expr_cache,
2350
prepare_visualization: ctx.prepare_visualization,
2351
};
2352
let node_exprs = exprs.iter().map(|e| e.node()).collect_vec();
2353
let (transformed_input, transformed_exprs) =
2354
lower_exprs_with_ctx(input, &node_exprs, &mut ctx)?;
2355
let trans_expr_irs = exprs
2356
.iter()
2357
.zip(transformed_exprs)
2358
.map(|(e, te)| ExprIR::new(te, OutputName::Alias(e.output_name().clone())))
2359
.collect_vec();
2360
Ok((transformed_input, trans_expr_irs))
2361
}
2362
2363
/// Builds a new selection node given an input stream and the expressions to
2364
/// select for, if needed.
2365
pub fn build_select_stream(
2366
input: PhysStream,
2367
exprs: &[ExprIR],
2368
expr_arena: &mut Arena<AExpr>,
2369
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
2370
expr_cache: &mut ExprCache,
2371
ctx: StreamingLowerIRContext,
2372
) -> PolarsResult<PhysStream> {
2373
let mut ctx = LowerExprContext {
2374
expr_arena,
2375
phys_sm,
2376
cache: expr_cache,
2377
prepare_visualization: ctx.prepare_visualization,
2378
};
2379
build_select_stream_with_ctx(input, exprs, &mut ctx)
2380
}
2381
2382
/// Builds a hstack node given an input stream and the expressions to add.
2383
pub fn build_hstack_stream(
2384
input: PhysStream,
2385
exprs: &[ExprIR],
2386
expr_arena: &mut Arena<AExpr>,
2387
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
2388
expr_cache: &mut ExprCache,
2389
ctx: StreamingLowerIRContext,
2390
) -> PolarsResult<PhysStream> {
2391
let input_schema = &phys_sm[input.node].output_schema;
2392
if exprs
2393
.iter()
2394
.all(|e| is_elementwise_rec_cached(e.node(), expr_arena, expr_cache))
2395
{
2396
let mut output_schema = input_schema.as_ref().clone();
2397
for expr in exprs {
2398
output_schema.insert(
2399
expr.output_name().clone(),
2400
expr.dtype(input_schema, expr_arena)?
2401
.clone()
2402
.materialize_unknown(true)?,
2403
);
2404
}
2405
let output_schema = Arc::new(output_schema);
2406
2407
let selectors = exprs.to_vec();
2408
let kind = PhysNodeKind::Select {
2409
input,
2410
selectors,
2411
extend_original: true,
2412
};
2413
let node_key = phys_sm.insert(PhysNode {
2414
output_schema,
2415
kind,
2416
});
2417
2418
Ok(PhysStream::first(node_key))
2419
} else {
2420
// We already handled the all-streamable case above, so things get more complicated.
2421
// For simplicity we just do a normal select with all the original columns prepended.
2422
let mut selectors = PlIndexMap::with_capacity(input_schema.len() + exprs.len());
2423
for name in input_schema.iter_names() {
2424
let col_name = name.clone();
2425
let col_expr = expr_arena.add(AExpr::Column(col_name.clone()));
2426
selectors.insert(
2427
name.clone(),
2428
ExprIR::new(col_expr, OutputName::ColumnLhs(col_name)),
2429
);
2430
}
2431
for expr in exprs {
2432
selectors.insert(expr.output_name().clone(), expr.clone());
2433
}
2434
let selectors = selectors.into_values().collect_vec();
2435
build_length_preserving_select_stream(
2436
input, &selectors, expr_arena, phys_sm, expr_cache, ctx,
2437
)
2438
}
2439
}
2440
2441
/// Builds a new selection node given an input stream and the expressions to
2442
/// select for, if needed. Preserves the length of the input, like in with_columns.
2443
pub fn build_length_preserving_select_stream(
2444
input: PhysStream,
2445
exprs: &[ExprIR],
2446
expr_arena: &mut Arena<AExpr>,
2447
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
2448
expr_cache: &mut ExprCache,
2449
ctx: StreamingLowerIRContext,
2450
) -> PolarsResult<PhysStream> {
2451
let mut ctx = LowerExprContext {
2452
expr_arena,
2453
phys_sm,
2454
cache: expr_cache,
2455
prepare_visualization: ctx.prepare_visualization,
2456
};
2457
let already_length_preserving = exprs
2458
.iter()
2459
.any(|expr| is_length_preserving_ctx(expr.node(), &mut ctx));
2460
let input_schema = &ctx.phys_sm[input.node].output_schema;
2461
if exprs.is_empty() || input_schema.is_empty() || already_length_preserving {
2462
return build_select_stream_with_ctx(input, exprs, &mut ctx);
2463
}
2464
2465
// Hacky work-around: append an input column with a temporary name, but
2466
// remove it from the final selector. This should ensure scalars gets zipped
2467
// back to the input to broadcast them.
2468
let tmp_name = unique_column_name();
2469
let first_col = ctx.expr_arena.add(AExpr::Column(
2470
input_schema.iter_names_cloned().next().unwrap(),
2471
));
2472
let mut tmp_exprs = Vec::with_capacity(exprs.len() + 1);
2473
tmp_exprs.extend(exprs.iter().cloned());
2474
tmp_exprs.push(ExprIR::new(first_col, OutputName::Alias(tmp_name.clone())));
2475
2476
let out_stream = build_select_stream_with_ctx(input, &tmp_exprs, &mut ctx)?;
2477
let PhysNodeKind::Select { selectors, .. } = &mut ctx.phys_sm[out_stream.node].kind else {
2478
unreachable!()
2479
};
2480
assert!(selectors.pop().unwrap().output_name() == &tmp_name);
2481
let out_schema = Arc::make_mut(&mut phys_sm[out_stream.node].output_schema);
2482
out_schema.shift_remove(tmp_name.as_ref()).unwrap();
2483
Ok(out_stream)
2484
}
2485
2486