Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/lower_expr.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::chunked_array::cast::CastOptions;
4
use polars_core::frame::DataFrame;
5
use polars_core::prelude::{
6
DataType, Field, IDX_DTYPE, InitHashMaps, PlHashMap, PlHashSet, PlIndexMap,
7
};
8
use polars_core::schema::{Schema, SchemaExt};
9
use polars_error::PolarsResult;
10
use polars_expr::state::ExecutionState;
11
use polars_expr::{ExpressionConversionState, create_physical_expr};
12
use polars_ops::frame::{JoinArgs, JoinType};
13
use polars_ops::series::{RLE_LENGTH_COLUMN_NAME, RLE_VALUE_COLUMN_NAME};
14
use polars_plan::plans::AExpr;
15
use polars_plan::plans::expr_ir::{ExprIR, OutputName};
16
use polars_plan::prelude::*;
17
use polars_utils::arena::{Arena, Node};
18
use polars_utils::itertools::Itertools;
19
use polars_utils::pl_str::PlSmallStr;
20
use polars_utils::{unique_column_name, unitvec};
21
use slotmap::SlotMap;
22
23
use super::fmt::fmt_exprs;
24
use super::{PhysNode, PhysNodeKey, PhysNodeKind, PhysStream, StreamingLowerIRContext};
25
use crate::physical_plan::lower_group_by::build_group_by_stream;
26
use crate::physical_plan::lower_ir::{build_filter_stream, build_row_idx_stream};
27
28
type ExprNodeKey = Node;
29
30
pub(crate) struct ExprCache {
31
is_elementwise: PlHashMap<Node, bool>,
32
is_input_independent: PlHashMap<Node, bool>,
33
is_length_preserving: PlHashMap<Node, bool>,
34
}
35
36
impl ExprCache {
37
pub fn with_capacity(capacity: usize) -> Self {
38
Self {
39
is_elementwise: PlHashMap::with_capacity(capacity),
40
is_input_independent: PlHashMap::with_capacity(capacity),
41
is_length_preserving: PlHashMap::with_capacity(capacity),
42
}
43
}
44
}
45
46
struct LowerExprContext<'a> {
47
prepare_visualization: bool,
48
expr_arena: &'a mut Arena<AExpr>,
49
phys_sm: &'a mut SlotMap<PhysNodeKey, PhysNode>,
50
cache: &'a mut ExprCache,
51
}
52
53
impl<'a> From<LowerExprContext<'a>> for StreamingLowerIRContext {
54
fn from(value: LowerExprContext<'a>) -> Self {
55
Self {
56
prepare_visualization: value.prepare_visualization,
57
}
58
}
59
}
60
impl<'a> From<&LowerExprContext<'a>> for StreamingLowerIRContext {
61
fn from(value: &LowerExprContext<'a>) -> Self {
62
Self {
63
prepare_visualization: value.prepare_visualization,
64
}
65
}
66
}
67
68
pub(crate) fn is_fake_elementwise_function(expr: &AExpr) -> bool {
69
// The in-memory engine treats ApplyList as elementwise but this is not actually
70
// the case. It doesn't cause any problems for the in-memory engine because of
71
// how it does the execution but it causes errors for new-streaming.
72
73
// Some other functions are also marked as elementwise for filter pushdown
74
// but aren't actually elementwise (e.g. arguments aren't same length).
75
match expr {
76
AExpr::Function { function, .. } => {
77
use IRFunctionExpr as F;
78
match function {
79
#[cfg(feature = "is_in")]
80
F::Boolean(IRBooleanFunction::IsIn { .. }) => true,
81
#[cfg(feature = "replace")]
82
F::Replace | F::ReplaceStrict { .. } => true,
83
_ => false,
84
}
85
},
86
_ => false,
87
}
88
}
89
90
pub(crate) fn is_elementwise_rec_cached(
91
expr_key: ExprNodeKey,
92
arena: &Arena<AExpr>,
93
cache: &mut ExprCache,
94
) -> bool {
95
if !cache.is_elementwise.contains_key(&expr_key) {
96
cache.is_elementwise.insert(
97
expr_key,
98
(|| {
99
let mut expr_key = expr_key;
100
let mut stack = unitvec![];
101
102
loop {
103
let ae = arena.get(expr_key);
104
105
if is_fake_elementwise_function(ae) {
106
return false;
107
}
108
109
if !polars_plan::plans::is_elementwise(&mut stack, ae, arena) {
110
return false;
111
}
112
113
let Some(next_key) = stack.pop() else {
114
break;
115
};
116
117
expr_key = next_key;
118
}
119
120
true
121
})(),
122
);
123
}
124
125
*cache.is_elementwise.get(&expr_key).unwrap()
126
}
127
128
#[recursive::recursive]
129
pub fn is_input_independent_rec(
130
expr_key: ExprNodeKey,
131
arena: &Arena<AExpr>,
132
cache: &mut PlHashMap<ExprNodeKey, bool>,
133
) -> bool {
134
if let Some(ret) = cache.get(&expr_key) {
135
return *ret;
136
}
137
138
let ret = match arena.get(expr_key) {
139
AExpr::Explode { expr: inner, .. }
140
| AExpr::Cast {
141
expr: inner,
142
dtype: _,
143
options: _,
144
}
145
| AExpr::Sort {
146
expr: inner,
147
options: _,
148
} => is_input_independent_rec(*inner, arena, cache),
149
AExpr::Column(_) => false,
150
AExpr::Literal(_) => true,
151
AExpr::BinaryExpr { left, op: _, right } => {
152
is_input_independent_rec(*left, arena, cache)
153
&& is_input_independent_rec(*right, arena, cache)
154
},
155
AExpr::Gather {
156
expr,
157
idx,
158
returns_scalar: _,
159
} => {
160
is_input_independent_rec(*expr, arena, cache)
161
&& is_input_independent_rec(*idx, arena, cache)
162
},
163
AExpr::SortBy {
164
expr,
165
by,
166
sort_options: _,
167
} => {
168
is_input_independent_rec(*expr, arena, cache)
169
&& by
170
.iter()
171
.all(|expr| is_input_independent_rec(*expr, arena, cache))
172
},
173
AExpr::Filter { input, by } => {
174
is_input_independent_rec(*input, arena, cache)
175
&& is_input_independent_rec(*by, arena, cache)
176
},
177
AExpr::Agg(agg_expr) => match agg_expr.get_input() {
178
polars_plan::plans::NodeInputs::Leaf => true,
179
polars_plan::plans::NodeInputs::Single(expr) => {
180
is_input_independent_rec(expr, arena, cache)
181
},
182
polars_plan::plans::NodeInputs::Many(exprs) => exprs
183
.iter()
184
.all(|expr| is_input_independent_rec(*expr, arena, cache)),
185
},
186
AExpr::Ternary {
187
predicate,
188
truthy,
189
falsy,
190
} => {
191
is_input_independent_rec(*predicate, arena, cache)
192
&& is_input_independent_rec(*truthy, arena, cache)
193
&& is_input_independent_rec(*falsy, arena, cache)
194
},
195
AExpr::AnonymousFunction {
196
input,
197
function: _,
198
options: _,
199
fmt_str: _,
200
}
201
| AExpr::Function {
202
input,
203
function: _,
204
options: _,
205
} => input
206
.iter()
207
.all(|expr| is_input_independent_rec(expr.node(), arena, cache)),
208
AExpr::Eval {
209
expr,
210
evaluation: _,
211
variant: _,
212
} => is_input_independent_rec(*expr, arena, cache),
213
AExpr::Window {
214
function,
215
partition_by,
216
order_by,
217
options: _,
218
} => {
219
is_input_independent_rec(*function, arena, cache)
220
&& partition_by
221
.iter()
222
.all(|expr| is_input_independent_rec(*expr, arena, cache))
223
&& order_by
224
.iter()
225
.all(|(expr, _options)| is_input_independent_rec(*expr, arena, cache))
226
},
227
AExpr::Slice {
228
input,
229
offset,
230
length,
231
} => {
232
is_input_independent_rec(*input, arena, cache)
233
&& is_input_independent_rec(*offset, arena, cache)
234
&& is_input_independent_rec(*length, arena, cache)
235
},
236
AExpr::Len => false,
237
};
238
239
cache.insert(expr_key, ret);
240
ret
241
}
242
243
pub fn is_input_independent(
244
expr_key: ExprNodeKey,
245
expr_arena: &Arena<AExpr>,
246
cache: &mut ExprCache,
247
) -> bool {
248
is_input_independent_rec(expr_key, expr_arena, &mut cache.is_input_independent)
249
}
250
251
fn is_input_independent_ctx(expr_key: ExprNodeKey, ctx: &mut LowerExprContext) -> bool {
252
is_input_independent_rec(
253
expr_key,
254
ctx.expr_arena,
255
&mut ctx.cache.is_input_independent,
256
)
257
}
258
259
fn build_input_independent_node_with_ctx(
260
exprs: &[ExprIR],
261
ctx: &mut LowerExprContext,
262
) -> PolarsResult<PhysNodeKey> {
263
let output_schema = compute_output_schema(&Schema::default(), exprs, ctx.expr_arena)?;
264
Ok(ctx.phys_sm.insert(PhysNode::new(
265
output_schema,
266
PhysNodeKind::InputIndependentSelect {
267
selectors: exprs.to_vec(),
268
},
269
)))
270
}
271
272
#[recursive::recursive]
273
pub fn is_length_preserving_rec(
274
expr_key: ExprNodeKey,
275
arena: &Arena<AExpr>,
276
cache: &mut PlHashMap<ExprNodeKey, bool>,
277
) -> bool {
278
if let Some(ret) = cache.get(&expr_key) {
279
return *ret;
280
}
281
282
let ret = match arena.get(expr_key) {
283
AExpr::Gather { .. }
284
| AExpr::Explode { .. }
285
| AExpr::Filter { .. }
286
| AExpr::Agg(_)
287
| AExpr::Slice { .. }
288
| AExpr::Len
289
| AExpr::Literal(_) => false,
290
291
AExpr::Column(_) => true,
292
293
AExpr::Cast {
294
expr: inner,
295
dtype: _,
296
options: _,
297
}
298
| AExpr::Sort {
299
expr: inner,
300
options: _,
301
}
302
| AExpr::SortBy {
303
expr: inner,
304
by: _,
305
sort_options: _,
306
} => is_length_preserving_rec(*inner, arena, cache),
307
308
AExpr::BinaryExpr { left, op: _, right } => {
309
// As long as at least one input is length-preserving the other side
310
// should either broadcast or have the same length.
311
is_length_preserving_rec(*left, arena, cache)
312
|| is_length_preserving_rec(*right, arena, cache)
313
},
314
AExpr::Ternary {
315
predicate,
316
truthy,
317
falsy,
318
} => {
319
is_length_preserving_rec(*predicate, arena, cache)
320
|| is_length_preserving_rec(*truthy, arena, cache)
321
|| is_length_preserving_rec(*falsy, arena, cache)
322
},
323
AExpr::AnonymousFunction {
324
input,
325
function: _,
326
options,
327
fmt_str: _,
328
}
329
| AExpr::Function {
330
input,
331
function: _,
332
options,
333
} => {
334
// FIXME: actually inspect the functions? This is overly conservative.
335
options.is_length_preserving()
336
&& input
337
.iter()
338
.all(|expr| is_length_preserving_rec(expr.node(), arena, cache))
339
},
340
AExpr::Eval { .. } => true,
341
AExpr::Window {
342
function: _, // Actually shouldn't matter for window functions.
343
partition_by: _,
344
order_by: _,
345
options,
346
} => !matches!(options, WindowType::Over(WindowMapping::Explode)),
347
};
348
349
cache.insert(expr_key, ret);
350
ret
351
}
352
353
#[expect(dead_code)]
354
pub fn is_length_preserving(
355
expr_key: ExprNodeKey,
356
expr_arena: &Arena<AExpr>,
357
cache: &mut ExprCache,
358
) -> bool {
359
is_length_preserving_rec(expr_key, expr_arena, &mut cache.is_length_preserving)
360
}
361
362
fn is_length_preserving_ctx(expr_key: ExprNodeKey, ctx: &mut LowerExprContext) -> bool {
363
is_length_preserving_rec(
364
expr_key,
365
ctx.expr_arena,
366
&mut ctx.cache.is_length_preserving,
367
)
368
}
369
370
fn build_fallback_node_with_ctx(
371
input: PhysStream,
372
exprs: &[ExprIR],
373
ctx: &mut LowerExprContext,
374
) -> PolarsResult<PhysNodeKey> {
375
// Pre-select only the columns that are needed for this fallback expression.
376
let input_schema = &ctx.phys_sm[input.node].output_schema;
377
let mut select_names: PlHashSet<_> = exprs
378
.iter()
379
.flat_map(|expr| polars_plan::utils::aexpr_to_leaf_names_iter(expr.node(), ctx.expr_arena))
380
.collect();
381
// To keep the length correct we have to ensure we select at least one
382
// column.
383
if select_names.is_empty() {
384
if let Some(name) = input_schema.iter_names().next() {
385
select_names.insert(name.clone());
386
}
387
}
388
let input_stream = if input_schema
389
.iter_names()
390
.any(|name| !select_names.contains(name.as_str()))
391
{
392
let select_exprs = select_names
393
.into_iter()
394
.map(|name| {
395
ExprIR::new(
396
ctx.expr_arena.add(AExpr::Column(name.clone())),
397
OutputName::ColumnLhs(name),
398
)
399
})
400
.collect_vec();
401
build_select_stream_with_ctx(input, &select_exprs, ctx)?
402
} else {
403
input
404
};
405
406
let output_schema = schema_for_select(input_stream, exprs, ctx)?;
407
let mut conv_state = ExpressionConversionState::new(false);
408
let phys_exprs = exprs
409
.iter()
410
.map(|expr| {
411
create_physical_expr(
412
expr,
413
Context::Default,
414
ctx.expr_arena,
415
&ctx.phys_sm[input_stream.node].output_schema,
416
&mut conv_state,
417
)
418
})
419
.try_collect_vec()?;
420
let map = move |df| {
421
let exec_state = ExecutionState::new();
422
let columns = phys_exprs
423
.iter()
424
.map(|phys_expr| phys_expr.evaluate(&df, &exec_state))
425
.try_collect()?;
426
DataFrame::new_with_broadcast(columns)
427
};
428
429
let format_str = ctx.prepare_visualization.then(|| {
430
let mut buffer = String::new();
431
buffer.push_str("SELECT [\n");
432
fmt_exprs(
433
&mut buffer,
434
exprs,
435
ctx.expr_arena,
436
super::fmt::FormatExprStyle::Select,
437
);
438
buffer.push(']');
439
buffer
440
});
441
let kind = PhysNodeKind::InMemoryMap {
442
input: input_stream,
443
map: Arc::new(map),
444
format_str,
445
};
446
Ok(ctx.phys_sm.insert(PhysNode::new(output_schema, kind)))
447
}
448
449
fn simplify_input_streams(
450
orig_input: PhysStream,
451
mut input_streams: PlHashSet<PhysStream>,
452
ctx: &mut LowerExprContext,
453
) -> PolarsResult<PlHashSet<PhysStream>> {
454
// Flatten nested zips (ensures the original input columns only occur once).
455
if input_streams.len() > 1 {
456
let mut flattened_input_streams = PlHashSet::with_capacity(input_streams.len());
457
for input_stream in input_streams {
458
if let PhysNodeKind::Zip {
459
inputs,
460
null_extend: false,
461
} = &ctx.phys_sm[input_stream.node].kind
462
{
463
flattened_input_streams.extend(inputs);
464
ctx.phys_sm.remove(input_stream.node);
465
} else {
466
flattened_input_streams.insert(input_stream);
467
}
468
}
469
input_streams = flattened_input_streams;
470
}
471
472
// Merge reduce nodes that directly operate on the original input.
473
let mut combined_exprs = vec![];
474
input_streams = input_streams
475
.into_iter()
476
.filter(|input_stream| {
477
if let PhysNodeKind::Reduce {
478
input: inner,
479
exprs,
480
} = &ctx.phys_sm[input_stream.node].kind
481
{
482
if *inner == orig_input {
483
combined_exprs.extend(exprs.iter().cloned());
484
ctx.phys_sm.remove(input_stream.node);
485
return false;
486
}
487
}
488
true
489
})
490
.collect();
491
if !combined_exprs.is_empty() {
492
let output_schema = schema_for_select(orig_input, &combined_exprs, ctx)?;
493
let kind = PhysNodeKind::Reduce {
494
input: orig_input,
495
exprs: combined_exprs,
496
};
497
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
498
input_streams.insert(PhysStream::first(reduce_node_key));
499
}
500
501
Ok(input_streams)
502
}
503
504
// Assuming that agg_node is a single-input reduction, lowers its input recursively
505
// and returns a Reduce node as well a node corresponding to the column to select
506
// from the Reduce node for the aggregate.
507
fn lower_unary_reduce_node(
508
input: PhysStream,
509
agg_node: Node,
510
ctx: &mut LowerExprContext,
511
) -> PolarsResult<(PhysStream, Node)> {
512
let agg_aexpr = ctx.expr_arena.get(agg_node).clone();
513
let mut agg_input = Vec::with_capacity(1);
514
agg_aexpr.inputs_rev(&mut agg_input);
515
assert!(agg_input.len() == 1);
516
517
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &agg_input, ctx)?;
518
let trans_agg_node = ctx.expr_arena.add(agg_aexpr.replace_inputs(&trans_exprs));
519
520
let out_name = unique_column_name();
521
let expr_ir = ExprIR::new(trans_agg_node, OutputName::Alias(out_name.clone()));
522
let output_schema = schema_for_select(trans_input, std::slice::from_ref(&expr_ir), ctx)?;
523
let kind = PhysNodeKind::Reduce {
524
input: trans_input,
525
exprs: vec![expr_ir],
526
};
527
528
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
529
let reduce_stream = PhysStream::first(reduce_node_key);
530
let out_node = ctx.expr_arena.add(AExpr::Column(out_name));
531
Ok((reduce_stream, out_node))
532
}
533
534
// In the recursive lowering we don't bother with named expressions at all, so
535
// we work directly with Nodes.
536
#[recursive::recursive]
537
fn lower_exprs_with_ctx(
538
input: PhysStream,
539
exprs: &[Node],
540
ctx: &mut LowerExprContext,
541
) -> PolarsResult<(PhysStream, Vec<Node>)> {
542
// We have to catch this case separately, in case all the input independent expressions are elementwise.
543
// TODO: we shouldn't always do this when recursing, e.g. pl.col.a.sum() + 1 will still hit this in the recursion.
544
if exprs.iter().all(|e| is_input_independent_ctx(*e, ctx)) {
545
let expr_irs = exprs
546
.iter()
547
.map(|e| ExprIR::new(*e, OutputName::Alias(unique_column_name())))
548
.collect_vec();
549
let node = build_input_independent_node_with_ctx(&expr_irs, ctx)?;
550
let out_exprs = expr_irs
551
.iter()
552
.map(|e| ctx.expr_arena.add(AExpr::Column(e.output_name().clone())))
553
.collect();
554
return Ok((PhysStream::first(node), out_exprs));
555
}
556
557
// Fallback expressions that can directly be applied to the original input.
558
let mut fallback_subset = Vec::new();
559
560
// Streams containing the columns used for executing transformed expressions.
561
let mut input_streams = PlHashSet::new();
562
563
// The final transformed expressions that will be selected from the zipped
564
// together transformed nodes.
565
let mut transformed_exprs = Vec::with_capacity(exprs.len());
566
567
for expr in exprs.iter().copied() {
568
if is_elementwise_rec_cached(expr, ctx.expr_arena, ctx.cache) {
569
if !is_input_independent_ctx(expr, ctx) {
570
input_streams.insert(input);
571
}
572
transformed_exprs.push(expr);
573
continue;
574
}
575
576
match ctx.expr_arena.get(expr).clone() {
577
AExpr::Explode {
578
expr: inner,
579
skip_empty,
580
} => {
581
// While explode is streamable, it is not elementwise, so we
582
// have to transform it to a select node.
583
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[inner], ctx)?;
584
let exploded_name = unique_column_name();
585
let trans_inner = ctx.expr_arena.add(AExpr::Explode {
586
expr: trans_exprs[0],
587
skip_empty,
588
});
589
let explode_expr =
590
ExprIR::new(trans_inner, OutputName::Alias(exploded_name.clone()));
591
let output_schema =
592
schema_for_select(trans_input, std::slice::from_ref(&explode_expr), ctx)?;
593
let node_kind = PhysNodeKind::Select {
594
input: trans_input,
595
selectors: vec![explode_expr.clone()],
596
extend_original: false,
597
};
598
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
599
input_streams.insert(PhysStream::first(node_key));
600
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(exploded_name)));
601
},
602
AExpr::Column(_) => unreachable!("column should always be streamable"),
603
AExpr::Literal(_) => {
604
let out_name = unique_column_name();
605
let inner_expr = ExprIR::new(expr, OutputName::Alias(out_name.clone()));
606
let node_key = build_input_independent_node_with_ctx(&[inner_expr], ctx)?;
607
input_streams.insert(PhysStream::first(node_key));
608
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
609
},
610
611
AExpr::Function {
612
input: ref inner_exprs,
613
function: IRFunctionExpr::Repeat,
614
options: _,
615
} => {
616
assert!(inner_exprs.len() == 2);
617
let out_name = unique_column_name();
618
let value_expr_ir = inner_exprs[0].with_alias(out_name.clone());
619
let repeats_expr_ir = inner_exprs[1].clone();
620
let value_stream = build_select_stream_with_ctx(input, &[value_expr_ir], ctx)?;
621
let repeats_stream = build_select_stream_with_ctx(input, &[repeats_expr_ir], ctx)?;
622
623
let output_schema = ctx.phys_sm[value_stream.node].output_schema.clone();
624
let kind = PhysNodeKind::Repeat {
625
value: value_stream,
626
repeats: repeats_stream,
627
};
628
let repeat_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
629
input_streams.insert(PhysStream::first(repeat_node_key));
630
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
631
},
632
633
AExpr::Function {
634
input: ref inner_exprs,
635
function: IRFunctionExpr::ExtendConstant,
636
options: _,
637
} => {
638
assert!(inner_exprs.len() == 3);
639
let input_schema = &ctx.phys_sm[input.node].output_schema;
640
let out_name = unique_column_name();
641
let first_ir = inner_exprs[0].with_alias(out_name.clone());
642
let out_dtype = first_ir.dtype(input_schema, ctx.expr_arena)?;
643
let mut value_expr_ir = inner_exprs[1].with_alias(out_name.clone());
644
let repeats_expr_ir = inner_exprs[2].clone();
645
646
// Cast the value if necessary.
647
if value_expr_ir.dtype(input_schema, ctx.expr_arena)? != out_dtype {
648
let cast_expr = AExpr::Cast {
649
expr: value_expr_ir.node(),
650
dtype: out_dtype.clone(),
651
options: CastOptions::NonStrict,
652
};
653
value_expr_ir = ExprIR::new(
654
ctx.expr_arena.add(cast_expr),
655
OutputName::Alias(out_name.clone()),
656
);
657
}
658
659
let first_stream = build_select_stream_with_ctx(input, &[first_ir], ctx)?;
660
let value_stream = build_select_stream_with_ctx(input, &[value_expr_ir], ctx)?;
661
let repeats_stream = build_select_stream_with_ctx(input, &[repeats_expr_ir], ctx)?;
662
663
let output_schema = ctx.phys_sm[first_stream.node].output_schema.clone();
664
let repeat_kind = PhysNodeKind::Repeat {
665
value: value_stream,
666
repeats: repeats_stream,
667
};
668
let repeat_node_key = ctx
669
.phys_sm
670
.insert(PhysNode::new(output_schema.clone(), repeat_kind));
671
672
let concat_kind = PhysNodeKind::OrderedUnion {
673
inputs: vec![first_stream, PhysStream::first(repeat_node_key)],
674
};
675
let concat_node_key = ctx
676
.phys_sm
677
.insert(PhysNode::new(output_schema, concat_kind));
678
input_streams.insert(PhysStream::first(concat_node_key));
679
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
680
},
681
682
AExpr::Function {
683
input: ref inner_exprs,
684
function: IRFunctionExpr::ConcatExpr(_rechunk),
685
options: _,
686
} => {
687
// We have to lower each expression separately as they might have different lengths.
688
let mut concat_streams = Vec::new();
689
let out_name = unique_column_name();
690
for inner_expr in inner_exprs {
691
let (trans_input, trans_expr) =
692
lower_exprs_with_ctx(input, &[inner_expr.node()], ctx)?;
693
let select_expr =
694
ExprIR::new(trans_expr[0], OutputName::Alias(out_name.clone()));
695
concat_streams.push(build_select_stream_with_ctx(
696
trans_input,
697
&[select_expr],
698
ctx,
699
)?);
700
}
701
702
let output_schema = ctx.phys_sm[concat_streams[0].node].output_schema.clone();
703
let node_kind = PhysNodeKind::OrderedUnion {
704
inputs: concat_streams,
705
};
706
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
707
input_streams.insert(PhysStream::first(node_key));
708
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
709
},
710
711
AExpr::Function {
712
input: ref inner_exprs,
713
function: IRFunctionExpr::Unique(maintain_order),
714
options: _,
715
} => {
716
assert!(inner_exprs.len() == 1);
717
// Lower to no-aggregate group-by with unique name.
718
let tmp_name = unique_column_name();
719
let (trans_input, trans_inner_exprs) =
720
lower_exprs_with_ctx(input, &[inner_exprs[0].node()], ctx)?;
721
let group_by_key_expr =
722
ExprIR::new(trans_inner_exprs[0], OutputName::Alias(tmp_name.clone()));
723
let group_by_output_schema =
724
schema_for_select(trans_input, std::slice::from_ref(&group_by_key_expr), ctx)?;
725
let group_by_stream = build_group_by_stream(
726
trans_input,
727
&[group_by_key_expr],
728
&[],
729
group_by_output_schema,
730
maintain_order,
731
Arc::new(GroupbyOptions::default()),
732
None,
733
ctx.expr_arena,
734
ctx.phys_sm,
735
ctx.cache,
736
StreamingLowerIRContext::from(&*ctx),
737
)?;
738
input_streams.insert(group_by_stream);
739
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_name)));
740
},
741
742
AExpr::Function {
743
input: ref inner_exprs,
744
function: IRFunctionExpr::UniqueCounts,
745
options: _,
746
} => {
747
// Transform:
748
// expr.unique_counts().alias(name)
749
// ->
750
// .select(expr.alias(name))
751
// .group_by(_ = name, maintain_order=True)
752
// .agg(name = pl.len())
753
// .select(name)
754
755
assert_eq!(inner_exprs.len(), 1);
756
757
let input_schema = &ctx.phys_sm[input.node].output_schema;
758
759
let key_name = unique_column_name();
760
let tmp_count_name = unique_column_name();
761
762
let input_expr = &inner_exprs[0];
763
let output_dtype = input_expr.dtype(input_schema, ctx.expr_arena)?.clone();
764
let group_by_output_schema = Arc::new(Schema::from_iter([
765
(key_name.clone(), output_dtype),
766
(tmp_count_name.clone(), IDX_DTYPE),
767
]));
768
769
let keys = [input_expr.with_alias(key_name)];
770
let aggs = [ExprIR::new(
771
ctx.expr_arena.add(AExpr::Len),
772
OutputName::Alias(tmp_count_name.clone()),
773
)];
774
775
let stream = build_group_by_stream(
776
input,
777
&keys,
778
&aggs,
779
group_by_output_schema,
780
true,
781
Default::default(),
782
None,
783
ctx.expr_arena,
784
ctx.phys_sm,
785
ctx.cache,
786
StreamingLowerIRContext {
787
prepare_visualization: ctx.prepare_visualization,
788
},
789
)?;
790
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_count_name)));
791
input_streams.insert(stream);
792
},
793
AExpr::Function {
794
input: ref inner_exprs,
795
function:
796
IRFunctionExpr::ValueCounts {
797
sort: false,
798
parallel: _,
799
name: count_name,
800
normalize: false,
801
},
802
options: _,
803
} => {
804
// Transform:
805
// expr.value_counts(
806
// sort=False,
807
// parallel=_,
808
// name=count_name,
809
// normalize=False
810
// ).alias(name)
811
// ->
812
// .select(expr.alias(name))
813
// .group_by(name)
814
// .agg(count_name = pl.len())
815
// .select(pl.struct([name, count_name]))
816
817
assert_eq!(inner_exprs.len(), 1);
818
819
let input_schema = &ctx.phys_sm[input.node].output_schema;
820
821
let tmp_value_name = unique_column_name();
822
let tmp_count_name = unique_column_name();
823
824
let input_expr = &inner_exprs[0];
825
let output_field = input_expr.field(input_schema, ctx.expr_arena)?;
826
let group_by_output_schema = Arc::new(Schema::from_iter([
827
output_field.clone().with_name(tmp_value_name.clone()),
828
Field::new(tmp_count_name.clone(), IDX_DTYPE),
829
]));
830
831
let keys = [input_expr.with_alias(tmp_value_name.clone())];
832
let aggs = [ExprIR::new(
833
ctx.expr_arena.add(AExpr::Len),
834
OutputName::Alias(tmp_count_name.clone()),
835
)];
836
837
let stream = build_group_by_stream(
838
input,
839
&keys,
840
&aggs,
841
group_by_output_schema,
842
false,
843
Default::default(),
844
None,
845
ctx.expr_arena,
846
ctx.phys_sm,
847
ctx.cache,
848
StreamingLowerIRContext {
849
prepare_visualization: ctx.prepare_visualization,
850
},
851
)?;
852
853
let value = ExprIR::new(
854
ctx.expr_arena.add(AExpr::Column(tmp_value_name)),
855
OutputName::Alias(output_field.name),
856
);
857
let count = ExprIR::new(
858
ctx.expr_arena.add(AExpr::Column(tmp_count_name)),
859
OutputName::Alias(count_name.clone()),
860
);
861
862
transformed_exprs.push(
863
AExprBuilder::function(
864
vec![value, count],
865
IRFunctionExpr::AsStruct,
866
ctx.expr_arena,
867
)
868
.node(),
869
);
870
input_streams.insert(stream);
871
},
872
873
AExpr::Function {
874
input: ref inner_exprs,
875
function: IRFunctionExpr::ArgUnique,
876
options: _,
877
} => {
878
// Transform:
879
// expr.arg_unique()
880
// ->
881
// .with_row_index(IDX)
882
// .group_by(expr)
883
// .agg(IDX = IDX.first())
884
// .select(IDX.sort())
885
886
assert_eq!(inner_exprs.len(), 1);
887
888
let expr_name = unique_column_name();
889
let idx_name = unique_column_name();
890
891
let stream = build_select_stream_with_ctx(
892
input,
893
&[inner_exprs[0].with_alias(expr_name.clone())],
894
ctx,
895
)?;
896
897
let mut group_by_output_schema =
898
ctx.phys_sm[stream.node].output_schema.as_ref().clone();
899
group_by_output_schema.insert(idx_name.clone(), IDX_DTYPE);
900
901
let stream = build_row_idx_stream(stream, idx_name.clone(), None, ctx.phys_sm);
902
903
let keys =
904
[AExprBuilder::col(expr_name.clone(), ctx.expr_arena).expr_ir(expr_name)];
905
let aggs = [AExprBuilder::col(idx_name.clone(), ctx.expr_arena)
906
.first(ctx.expr_arena)
907
.expr_ir(idx_name.clone())];
908
909
let stream = build_group_by_stream(
910
stream,
911
&keys,
912
&aggs,
913
Arc::new(group_by_output_schema),
914
false,
915
Default::default(),
916
None,
917
ctx.expr_arena,
918
ctx.phys_sm,
919
ctx.cache,
920
StreamingLowerIRContext {
921
prepare_visualization: ctx.prepare_visualization,
922
},
923
)?;
924
925
let expr = AExprBuilder::col(idx_name.clone(), ctx.expr_arena)
926
.sort(Default::default(), ctx.expr_arena)
927
.expr_ir(idx_name.clone());
928
let stream = build_select_stream_with_ctx(stream, &[expr], ctx)?;
929
930
transformed_exprs.push(AExprBuilder::col(idx_name.clone(), ctx.expr_arena).node());
931
input_streams.insert(stream);
932
},
933
934
#[cfg(feature = "is_in")]
935
AExpr::Function {
936
input: ref inner_exprs,
937
function: IRFunctionExpr::Boolean(IRBooleanFunction::IsIn { nulls_equal }),
938
options: _,
939
} if is_scalar_ae(inner_exprs[1].node(), ctx.expr_arena) => {
940
// Translate left and right side separately (they could have different lengths).
941
let left_on_name = unique_column_name();
942
let right_on_name = unique_column_name();
943
let (trans_input_left, trans_expr_left) =
944
lower_exprs_with_ctx(input, &[inner_exprs[0].node()], ctx)?;
945
let right_expr_exploded_node = match ctx.expr_arena.get(inner_exprs[1].node()) {
946
// expr.implode().explode() ~= expr (and avoids rechunking)
947
AExpr::Agg(IRAggExpr::Implode(n)) => *n,
948
_ => ctx.expr_arena.add(AExpr::Explode {
949
expr: inner_exprs[1].node(),
950
skip_empty: true,
951
}),
952
};
953
let (trans_input_right, trans_expr_right) =
954
lower_exprs_with_ctx(input, &[right_expr_exploded_node], ctx)?;
955
956
// We have to ensure the left input has the right name for the semi-anti-join to
957
// generate the correct output name.
958
let left_col_expr = ctx.expr_arena.add(AExpr::Column(left_on_name.clone()));
959
let left_select_stream = build_select_stream_with_ctx(
960
trans_input_left,
961
&[ExprIR::new(
962
trans_expr_left[0],
963
OutputName::Alias(left_on_name.clone()),
964
)],
965
ctx,
966
)?;
967
968
let node_kind = PhysNodeKind::SemiAntiJoin {
969
input_left: left_select_stream,
970
input_right: trans_input_right,
971
left_on: vec![ExprIR::new(
972
left_col_expr,
973
OutputName::Alias(left_on_name.clone()),
974
)],
975
right_on: vec![ExprIR::new(
976
trans_expr_right[0],
977
OutputName::Alias(right_on_name),
978
)],
979
args: JoinArgs {
980
how: JoinType::Semi,
981
validation: Default::default(),
982
suffix: None,
983
slice: None,
984
nulls_equal,
985
coalesce: Default::default(),
986
maintain_order: Default::default(),
987
},
988
output_bool: true,
989
};
990
991
// SemiAntiJoin with output_bool returns a column with the same name as the first
992
// input column.
993
let output_schema = Schema::from_iter([(left_on_name.clone(), DataType::Boolean)]);
994
let node_key = ctx
995
.phys_sm
996
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
997
input_streams.insert(PhysStream::first(node_key));
998
transformed_exprs.push(left_col_expr);
999
},
1000
1001
#[cfg(feature = "cum_agg")]
1002
AExpr::Function {
1003
input: ref inner_exprs,
1004
function:
1005
ref function @ (IRFunctionExpr::CumMin { reverse }
1006
| IRFunctionExpr::CumMax { reverse }
1007
| IRFunctionExpr::CumSum { reverse }
1008
| IRFunctionExpr::CumCount { reverse }
1009
| IRFunctionExpr::CumProd { reverse }),
1010
options: _,
1011
} if !reverse => {
1012
use crate::nodes::cum_agg::CumAggKind;
1013
1014
assert_eq!(inner_exprs.len(), 1);
1015
1016
let input_schema = &ctx.phys_sm[input.node].output_schema;
1017
1018
let value_key = unique_column_name();
1019
let value_dtype = inner_exprs[0].dtype(input_schema, ctx.expr_arena)?;
1020
1021
let input = build_select_stream_with_ctx(
1022
input,
1023
&[inner_exprs[0].with_alias(value_key.clone())],
1024
ctx,
1025
)?;
1026
let kind = match function {
1027
IRFunctionExpr::CumMin { .. } => CumAggKind::Min,
1028
IRFunctionExpr::CumMax { .. } => CumAggKind::Max,
1029
IRFunctionExpr::CumSum { .. } => CumAggKind::Sum,
1030
IRFunctionExpr::CumCount { .. } => CumAggKind::Count,
1031
IRFunctionExpr::CumProd { .. } => CumAggKind::Prod,
1032
_ => unreachable!(),
1033
};
1034
let node_kind = PhysNodeKind::CumAgg { input, kind };
1035
1036
let output_schema = Schema::from_iter([(value_key.clone(), value_dtype.clone())]);
1037
let node_key = ctx
1038
.phys_sm
1039
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1040
input_streams.insert(PhysStream::first(node_key));
1041
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key)));
1042
},
1043
1044
#[cfg(feature = "diff")]
1045
AExpr::Function {
1046
input: ref inner_exprs,
1047
function: IRFunctionExpr::Diff(null_behavior),
1048
options: _,
1049
} => {
1050
use polars_core::scalar::Scalar;
1051
use polars_core::series::ops::NullBehavior;
1052
1053
assert_eq!(inner_exprs.len(), 2);
1054
1055
// Transform:
1056
// expr.diff(offset, "ignore")
1057
// ->
1058
// expr - expr.shift(offset)
1059
1060
let base_expr_ir = &inner_exprs[0];
1061
let base_dtype =
1062
base_expr_ir.dtype(&ctx.phys_sm[input.node].output_schema, ctx.expr_arena)?;
1063
let offset_expr_ir = &inner_exprs[1];
1064
let offset_dtype =
1065
offset_expr_ir.dtype(&ctx.phys_sm[input.node].output_schema, ctx.expr_arena)?;
1066
1067
let mut base = AExprBuilder::new_from_node(base_expr_ir.node());
1068
let cast_dtype = match base_dtype {
1069
DataType::UInt8 => Some(DataType::Int16),
1070
DataType::UInt16 => Some(DataType::Int32),
1071
DataType::UInt32 | DataType::UInt64 => Some(DataType::Int64),
1072
_ => None,
1073
};
1074
if let Some(dtype) = cast_dtype {
1075
base = base.cast(dtype, ctx.expr_arena);
1076
}
1077
1078
let mut offset = AExprBuilder::new_from_node(offset_expr_ir.node());
1079
if offset_dtype != &DataType::Int64 {
1080
offset = offset.cast(DataType::Int64, ctx.expr_arena);
1081
}
1082
1083
let shifted = base.shift(offset.node(), ctx.expr_arena);
1084
let mut output = base.minus(shifted.node(), ctx.expr_arena);
1085
1086
if null_behavior == NullBehavior::Drop {
1087
// Without the column size, slice can only remove leading nulls.
1088
// So if the offset was negative, the nulls appeared at the end of the column.
1089
// In that case, shift the column forward to move the nulls back to the front.
1090
let zero_literal =
1091
AExprBuilder::lit(LiteralValue::new_idxsize(0), ctx.expr_arena);
1092
let offset_neg = offset.negate(ctx.expr_arena);
1093
let offset_if_negative = AExprBuilder::function(
1094
vec![offset_neg.expr_ir_unnamed(), zero_literal.expr_ir_unnamed()],
1095
IRFunctionExpr::MaxHorizontal,
1096
ctx.expr_arena,
1097
);
1098
output = output.shift(offset_if_negative, ctx.expr_arena);
1099
1100
// Remove the nulls that were introduced by the shift
1101
let offset_abs = offset.abs(ctx.expr_arena);
1102
let null_literal = AExprBuilder::lit(
1103
LiteralValue::Scalar(Scalar::null(DataType::Int64)),
1104
ctx.expr_arena,
1105
);
1106
output = output.slice(offset_abs, null_literal, ctx.expr_arena);
1107
}
1108
1109
let (stream, nodes) = lower_exprs_with_ctx(input, &[output.node()], ctx)?;
1110
input_streams.insert(stream);
1111
transformed_exprs.extend(nodes);
1112
},
1113
1114
AExpr::Function {
1115
input: ref inner_exprs,
1116
function: IRFunctionExpr::RLE,
1117
options: _,
1118
} => {
1119
assert_eq!(inner_exprs.len(), 1);
1120
1121
let input_schema = &ctx.phys_sm[input.node].output_schema;
1122
1123
let value_key = unique_column_name();
1124
let value_dtype = inner_exprs[0].dtype(input_schema, ctx.expr_arena)?;
1125
1126
let input = build_select_stream_with_ctx(
1127
input,
1128
&[inner_exprs[0].with_alias(value_key.clone())],
1129
ctx,
1130
)?;
1131
let node_kind = PhysNodeKind::Rle(input);
1132
1133
let output_schema = Schema::from_iter([(
1134
value_key.clone(),
1135
DataType::Struct(vec![
1136
Field::new(
1137
PlSmallStr::from_static(RLE_VALUE_COLUMN_NAME),
1138
value_dtype.clone(),
1139
),
1140
Field::new(PlSmallStr::from_static(RLE_LENGTH_COLUMN_NAME), IDX_DTYPE),
1141
]),
1142
)]);
1143
let node_key = ctx
1144
.phys_sm
1145
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1146
input_streams.insert(PhysStream::first(node_key));
1147
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key)));
1148
},
1149
1150
AExpr::Function {
1151
input: ref inner_exprs,
1152
function: IRFunctionExpr::RLEID,
1153
options: _,
1154
} => {
1155
assert_eq!(inner_exprs.len(), 1);
1156
1157
let value_key = unique_column_name();
1158
1159
let input = build_select_stream_with_ctx(
1160
input,
1161
&[inner_exprs[0].with_alias(value_key.clone())],
1162
ctx,
1163
)?;
1164
let node_kind = PhysNodeKind::RleId(input);
1165
1166
let output_schema = Schema::from_iter([(value_key.clone(), IDX_DTYPE)]);
1167
let node_key = ctx
1168
.phys_sm
1169
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1170
input_streams.insert(PhysStream::first(node_key));
1171
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key.clone())));
1172
},
1173
1174
AExpr::Function {
1175
input: ref inner_exprs,
1176
function: ref function @ (IRFunctionExpr::PeakMin | IRFunctionExpr::PeakMax),
1177
options: _,
1178
} => {
1179
assert_eq!(inner_exprs.len(), 1);
1180
1181
let value_key = unique_column_name();
1182
1183
let input = build_select_stream_with_ctx(
1184
input,
1185
&[inner_exprs[0].with_alias(value_key.clone())],
1186
ctx,
1187
)?;
1188
let is_peak_max = matches!(function, IRFunctionExpr::PeakMax);
1189
let node_kind = PhysNodeKind::PeakMinMax { input, is_peak_max };
1190
1191
let output_schema = Schema::from_iter([(value_key.clone(), DataType::Boolean)]);
1192
let node_key = ctx
1193
.phys_sm
1194
.insert(PhysNode::new(Arc::new(output_schema), node_kind));
1195
input_streams.insert(PhysStream::first(node_key));
1196
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(value_key.clone())));
1197
},
1198
1199
// pl.row_index() maps to this.
1200
#[cfg(feature = "range")]
1201
AExpr::Function {
1202
input: ref inner_exprs,
1203
function: IRFunctionExpr::Range(IRRangeFunction::IntRange { step: 1, dtype }),
1204
options: _,
1205
} if {
1206
let start_is_zero = match ctx.expr_arena.get(inner_exprs[0].node()) {
1207
AExpr::Literal(lit) => lit.extract_usize().ok() == Some(0),
1208
_ => false,
1209
};
1210
let stop_is_len = matches!(ctx.expr_arena.get(inner_exprs[1].node()), AExpr::Len);
1211
1212
dtype == DataType::IDX_DTYPE && start_is_zero && stop_is_len
1213
} =>
1214
{
1215
let out_name = unique_column_name();
1216
let row_idx_col_aexpr = ctx.expr_arena.add(AExpr::Column(out_name.clone()));
1217
let row_idx_col_expr_ir =
1218
ExprIR::new(row_idx_col_aexpr, OutputName::ColumnLhs(out_name.clone()));
1219
let row_idx_stream = build_select_stream_with_ctx(
1220
build_row_idx_stream(input, out_name, None, ctx.phys_sm),
1221
&[row_idx_col_expr_ir],
1222
ctx,
1223
)?;
1224
input_streams.insert(row_idx_stream);
1225
transformed_exprs.push(row_idx_col_aexpr);
1226
},
1227
1228
#[cfg(feature = "range")]
1229
AExpr::Function {
1230
input: ref inner_exprs,
1231
function: IRFunctionExpr::Range(IRRangeFunction::IntRange { step: 1, dtype }),
1232
options: _,
1233
} if {
1234
let start_is_zero = match ctx.expr_arena.get(inner_exprs[0].node()) {
1235
AExpr::Literal(lit) => lit.extract_usize().ok() == Some(0),
1236
_ => false,
1237
};
1238
let stop_is_count = matches!(
1239
ctx.expr_arena.get(inner_exprs[1].node()),
1240
AExpr::Agg(IRAggExpr::Count { .. })
1241
);
1242
1243
start_is_zero && stop_is_count
1244
} =>
1245
{
1246
let AExpr::Agg(IRAggExpr::Count {
1247
input: input_expr,
1248
include_nulls,
1249
}) = ctx.expr_arena.get(inner_exprs[1].node())
1250
else {
1251
unreachable!();
1252
};
1253
let (input_expr, include_nulls) = (*input_expr, *include_nulls);
1254
1255
let out_name = unique_column_name();
1256
let mut row_idx_col_aexpr = ctx.expr_arena.add(AExpr::Column(out_name.clone()));
1257
if dtype != IDX_DTYPE {
1258
row_idx_col_aexpr = AExprBuilder::new_from_node(row_idx_col_aexpr)
1259
.cast(dtype, ctx.expr_arena)
1260
.node();
1261
}
1262
let row_idx_col_expr_ir =
1263
ExprIR::new(row_idx_col_aexpr, OutputName::ColumnLhs(out_name.clone()));
1264
1265
let mut input_expr = AExprBuilder::new_from_node(input_expr);
1266
if !include_nulls {
1267
input_expr = input_expr.drop_nulls(ctx.expr_arena);
1268
}
1269
let input_expr = input_expr.expr_ir_retain_name(ctx.expr_arena);
1270
1271
let row_idx_stream = build_select_stream_with_ctx(
1272
build_row_idx_stream(
1273
build_select_stream_with_ctx(input, &[input_expr], ctx)?,
1274
out_name,
1275
None,
1276
ctx.phys_sm,
1277
),
1278
&[row_idx_col_expr_ir],
1279
ctx,
1280
)?;
1281
input_streams.insert(row_idx_stream);
1282
transformed_exprs.push(row_idx_col_aexpr);
1283
},
1284
1285
// Lower arbitrary elementwise functions.
1286
ref node @ AExpr::Function {
1287
input: ref inner_exprs,
1288
options,
1289
..
1290
}
1291
| ref node @ AExpr::AnonymousFunction {
1292
input: ref inner_exprs,
1293
options,
1294
..
1295
} if options.is_elementwise() && !is_fake_elementwise_function(node) => {
1296
let inner_nodes = inner_exprs.iter().map(|expr| expr.node()).collect_vec();
1297
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &inner_nodes, ctx)?;
1298
1299
// The function may be sensitive to names (e.g. pl.struct), so we restore them.
1300
let new_input = trans_exprs
1301
.into_iter()
1302
.zip(inner_exprs)
1303
.map(|(trans, orig)| {
1304
ExprIR::new(trans, OutputName::Alias(orig.output_name().clone()))
1305
})
1306
.collect_vec();
1307
let mut new_node = node.clone();
1308
match &mut new_node {
1309
AExpr::Function { input, .. } | AExpr::AnonymousFunction { input, .. } => {
1310
*input = new_input;
1311
},
1312
_ => unreachable!(),
1313
}
1314
input_streams.insert(trans_input);
1315
transformed_exprs.push(ctx.expr_arena.add(new_node));
1316
},
1317
1318
// Lower arbitrary row-separable functions.
1319
ref node @ AExpr::Function {
1320
input: ref inner_exprs,
1321
ref function,
1322
options,
1323
} if options.is_row_separable() && !is_fake_elementwise_function(node) => {
1324
// While these functions are streamable, they are not elementwise, so we
1325
// have to transform them to a select node.
1326
let inner_nodes = inner_exprs.iter().map(|x| x.node()).collect_vec();
1327
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &inner_nodes, ctx)?;
1328
let out_name = unique_column_name();
1329
let trans_inner = ctx.expr_arena.add(AExpr::Function {
1330
input: trans_exprs
1331
.iter()
1332
.map(|node| ExprIR::from_node(*node, ctx.expr_arena))
1333
.collect(),
1334
function: function.clone(),
1335
options,
1336
});
1337
let func_expr = ExprIR::new(trans_inner, OutputName::Alias(out_name.clone()));
1338
let output_schema =
1339
schema_for_select(trans_input, std::slice::from_ref(&func_expr), ctx)?;
1340
let node_kind = PhysNodeKind::Select {
1341
input: trans_input,
1342
selectors: vec![func_expr.clone()],
1343
extend_original: false,
1344
};
1345
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
1346
input_streams.insert(PhysStream::first(node_key));
1347
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1348
},
1349
1350
AExpr::BinaryExpr { left, op, right } => {
1351
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[left, right], ctx)?;
1352
let bin_expr = AExpr::BinaryExpr {
1353
left: trans_exprs[0],
1354
op,
1355
right: trans_exprs[1],
1356
};
1357
input_streams.insert(trans_input);
1358
transformed_exprs.push(ctx.expr_arena.add(bin_expr));
1359
},
1360
AExpr::Eval {
1361
expr: inner,
1362
evaluation,
1363
variant,
1364
} => match variant {
1365
EvalVariant::List => {
1366
let (trans_input, trans_expr) = lower_exprs_with_ctx(input, &[inner], ctx)?;
1367
let eval_expr = AExpr::Eval {
1368
expr: trans_expr[0],
1369
evaluation,
1370
variant,
1371
};
1372
input_streams.insert(trans_input);
1373
transformed_exprs.push(ctx.expr_arena.add(eval_expr));
1374
},
1375
EvalVariant::Cumulative { .. } => {
1376
// Cumulative is not elementwise, this would need a special node.
1377
let out_name = unique_column_name();
1378
fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));
1379
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1380
},
1381
},
1382
AExpr::Ternary {
1383
predicate,
1384
truthy,
1385
falsy,
1386
} => {
1387
let (trans_input, trans_exprs) =
1388
lower_exprs_with_ctx(input, &[predicate, truthy, falsy], ctx)?;
1389
let tern_expr = AExpr::Ternary {
1390
predicate: trans_exprs[0],
1391
truthy: trans_exprs[1],
1392
falsy: trans_exprs[2],
1393
};
1394
input_streams.insert(trans_input);
1395
transformed_exprs.push(ctx.expr_arena.add(tern_expr));
1396
},
1397
AExpr::Cast {
1398
expr: inner,
1399
dtype,
1400
options,
1401
} => {
1402
let (trans_input, trans_exprs) = lower_exprs_with_ctx(input, &[inner], ctx)?;
1403
input_streams.insert(trans_input);
1404
transformed_exprs.push(ctx.expr_arena.add(AExpr::Cast {
1405
expr: trans_exprs[0],
1406
dtype,
1407
options,
1408
}));
1409
},
1410
AExpr::Sort {
1411
expr: inner,
1412
options,
1413
} => {
1414
// As we'll refer to the sorted column twice, ensure the inner
1415
// expr is available as a column by selecting first.
1416
let sorted_name = unique_column_name();
1417
let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(sorted_name.clone()));
1418
let select_stream =
1419
build_select_stream_with_ctx(input, std::slice::from_ref(&inner_expr_ir), ctx)?;
1420
let col_expr = ctx.expr_arena.add(AExpr::Column(sorted_name.clone()));
1421
let kind = PhysNodeKind::Sort {
1422
input: select_stream,
1423
by_column: vec![ExprIR::new(col_expr, OutputName::Alias(sorted_name))],
1424
slice: None,
1425
sort_options: (&options).into(),
1426
};
1427
let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();
1428
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1429
input_streams.insert(PhysStream::first(node_key));
1430
transformed_exprs.push(col_expr);
1431
},
1432
1433
AExpr::SortBy {
1434
expr: inner,
1435
by,
1436
sort_options,
1437
} => {
1438
// Select our inputs (if we don't do this we'll waste time sorting irrelevant columns).
1439
let sorted_name = unique_column_name();
1440
let by_names = by.iter().map(|_| unique_column_name()).collect_vec();
1441
let all_inner_expr_irs = [(&sorted_name, inner)]
1442
.into_iter()
1443
.chain(by_names.iter().zip(by.iter().copied()))
1444
.map(|(name, inner)| ExprIR::new(inner, OutputName::Alias(name.clone())))
1445
.collect_vec();
1446
let select_stream = build_select_stream_with_ctx(input, &all_inner_expr_irs, ctx)?;
1447
1448
// Sort the inputs.
1449
let kind = PhysNodeKind::Sort {
1450
input: select_stream,
1451
by_column: by_names
1452
.into_iter()
1453
.map(|name| {
1454
ExprIR::new(
1455
ctx.expr_arena.add(AExpr::Column(name.clone())),
1456
OutputName::Alias(name),
1457
)
1458
})
1459
.collect(),
1460
slice: None,
1461
sort_options,
1462
};
1463
let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();
1464
let sort_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1465
1466
let sorted_col_expr = ctx.expr_arena.add(AExpr::Column(sorted_name.clone()));
1467
input_streams.insert(PhysStream::first(sort_node_key));
1468
transformed_exprs.push(sorted_col_expr);
1469
},
1470
1471
#[cfg(feature = "top_k")]
1472
AExpr::Function {
1473
input: inner_exprs,
1474
function: function @ (IRFunctionExpr::TopK { .. } | IRFunctionExpr::TopKBy { .. }),
1475
options: _,
1476
} => {
1477
// Select our inputs.
1478
let by = &inner_exprs[2..];
1479
let out_name = unique_column_name();
1480
let by_names = by.iter().map(|_| unique_column_name()).collect_vec();
1481
let data_irs = [(&out_name, &inner_exprs[0])]
1482
.into_iter()
1483
.chain(by_names.iter().zip(by.iter()))
1484
.map(|(name, inner)| ExprIR::new(inner.node(), OutputName::Alias(name.clone())))
1485
.collect_vec();
1486
let data_stream = build_select_stream_with_ctx(input, &data_irs, ctx)?;
1487
let k_stream = build_select_stream_with_ctx(input, &inner_exprs[1..2], ctx)?;
1488
1489
// Create 'by' column expressions.
1490
let out_col_node = ctx.expr_arena.add(AExpr::Column(out_name.clone()));
1491
let out_col_expr = ExprIR::new(out_col_node, OutputName::Alias(out_name));
1492
let (by_column, reverse) = match function {
1493
IRFunctionExpr::TopK { descending } => {
1494
(vec![out_col_expr.clone()], vec![descending])
1495
},
1496
IRFunctionExpr::TopKBy {
1497
descending: reverse,
1498
} => {
1499
let by_column = by_names
1500
.into_iter()
1501
.map(|name| {
1502
ExprIR::new(
1503
ctx.expr_arena.add(AExpr::Column(name.clone())),
1504
OutputName::Alias(name),
1505
)
1506
})
1507
.collect();
1508
(by_column, reverse.clone())
1509
},
1510
_ => unreachable!(),
1511
};
1512
1513
let kind = PhysNodeKind::TopK {
1514
input: data_stream,
1515
k: k_stream,
1516
nulls_last: vec![true; by_column.len()],
1517
reverse,
1518
by_column,
1519
};
1520
let output_schema = ctx.phys_sm[data_stream.node].output_schema.clone();
1521
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1522
input_streams.insert(PhysStream::first(node_key));
1523
transformed_exprs.push(out_col_node);
1524
},
1525
1526
AExpr::Filter { input: inner, by } => {
1527
// Select our inputs (if we don't do this we'll waste time filtering irrelevant columns).
1528
let out_name = unique_column_name();
1529
let by_name = unique_column_name();
1530
let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(out_name.clone()));
1531
let by_expr_ir = ExprIR::new(by, OutputName::Alias(by_name.clone()));
1532
let select_stream =
1533
build_select_stream_with_ctx(input, &[inner_expr_ir, by_expr_ir], ctx)?;
1534
1535
// Add a filter node.
1536
let predicate = ExprIR::new(
1537
ctx.expr_arena.add(AExpr::Column(by_name.clone())),
1538
OutputName::Alias(by_name),
1539
);
1540
let kind = PhysNodeKind::Filter {
1541
input: select_stream,
1542
predicate,
1543
};
1544
let output_schema = ctx.phys_sm[select_stream.node].output_schema.clone();
1545
let filter_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1546
input_streams.insert(PhysStream::first(filter_node_key));
1547
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1548
},
1549
1550
// Aggregates.
1551
AExpr::Agg(agg) => match agg {
1552
// Change agg mutably so we can share the codepath for all of these.
1553
IRAggExpr::Min { .. }
1554
| IRAggExpr::Max { .. }
1555
| IRAggExpr::First(_)
1556
| IRAggExpr::Last(_)
1557
| IRAggExpr::Sum(_)
1558
| IRAggExpr::Mean(_)
1559
| IRAggExpr::Var { .. }
1560
| IRAggExpr::Std { .. }
1561
| IRAggExpr::Count { .. } => {
1562
let (trans_stream, trans_expr) = lower_unary_reduce_node(input, expr, ctx)?;
1563
input_streams.insert(trans_stream);
1564
transformed_exprs.push(trans_expr);
1565
},
1566
IRAggExpr::NUnique(inner) => {
1567
// Lower to no-aggregate group-by with unique name feeding into len aggregate.
1568
let tmp_name = unique_column_name();
1569
let (trans_input, trans_inner_exprs) =
1570
lower_exprs_with_ctx(input, &[inner], ctx)?;
1571
let group_by_key_expr =
1572
ExprIR::new(trans_inner_exprs[0], OutputName::Alias(tmp_name.clone()));
1573
let group_by_output_schema = schema_for_select(
1574
trans_input,
1575
std::slice::from_ref(&group_by_key_expr),
1576
ctx,
1577
)?;
1578
let group_by_stream = build_group_by_stream(
1579
trans_input,
1580
&[group_by_key_expr],
1581
&[],
1582
group_by_output_schema,
1583
false,
1584
Arc::new(GroupbyOptions::default()),
1585
None,
1586
ctx.expr_arena,
1587
ctx.phys_sm,
1588
ctx.cache,
1589
StreamingLowerIRContext::from(&*ctx),
1590
)?;
1591
1592
let len_node = ctx.expr_arena.add(AExpr::Len);
1593
let len_expr_ir = ExprIR::new(len_node, OutputName::Alias(tmp_name.clone()));
1594
let output_schema = schema_for_select(
1595
group_by_stream,
1596
std::slice::from_ref(&len_expr_ir),
1597
ctx,
1598
)?;
1599
let kind = PhysNodeKind::Reduce {
1600
input: group_by_stream,
1601
exprs: vec![len_expr_ir],
1602
};
1603
1604
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1605
input_streams.insert(PhysStream::first(reduce_node_key));
1606
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(tmp_name)));
1607
},
1608
IRAggExpr::Median(_)
1609
| IRAggExpr::Implode(_)
1610
| IRAggExpr::Quantile { .. }
1611
| IRAggExpr::AggGroups(_) => {
1612
let out_name = unique_column_name();
1613
fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));
1614
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1615
},
1616
},
1617
1618
#[cfg(feature = "bitwise")]
1619
AExpr::Function {
1620
function:
1621
IRFunctionExpr::Bitwise(
1622
IRBitwiseFunction::And | IRBitwiseFunction::Or | IRBitwiseFunction::Xor,
1623
),
1624
..
1625
} => {
1626
let (trans_stream, trans_expr) = lower_unary_reduce_node(input, expr, ctx)?;
1627
input_streams.insert(trans_stream);
1628
transformed_exprs.push(trans_expr);
1629
},
1630
1631
AExpr::Function {
1632
function:
1633
IRFunctionExpr::Boolean(
1634
IRBooleanFunction::Any { .. } | IRBooleanFunction::All { .. },
1635
),
1636
..
1637
} => {
1638
let (trans_stream, trans_expr) = lower_unary_reduce_node(input, expr, ctx)?;
1639
input_streams.insert(trans_stream);
1640
transformed_exprs.push(trans_expr);
1641
},
1642
1643
// Length-based expressions.
1644
AExpr::Len => {
1645
let out_name = unique_column_name();
1646
let expr_ir = ExprIR::new(expr, OutputName::Alias(out_name.clone()));
1647
let output_schema = schema_for_select(input, std::slice::from_ref(&expr_ir), ctx)?;
1648
let kind = PhysNodeKind::Reduce {
1649
input,
1650
exprs: vec![expr_ir],
1651
};
1652
let reduce_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1653
input_streams.insert(PhysStream::first(reduce_node_key));
1654
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1655
},
1656
1657
AExpr::Function {
1658
input: ref inner_exprs,
1659
function: IRFunctionExpr::ArgWhere,
1660
options: _,
1661
} => {
1662
// pl.arg_where(expr)
1663
//
1664
// ->
1665
// .select(predicate_name = expr)
1666
// .with_row_index(out_name)
1667
// .filter(predicate_name)
1668
// .select(out_name)
1669
let out_name = unique_column_name();
1670
let predicate_name = unique_column_name();
1671
let predicate = build_select_stream_with_ctx(
1672
input,
1673
&[inner_exprs[0].with_alias(predicate_name.clone())],
1674
ctx,
1675
)?;
1676
let row_index =
1677
build_row_idx_stream(predicate, out_name.clone(), None, ctx.phys_sm);
1678
1679
let filter_stream = build_filter_stream(
1680
row_index,
1681
AExprBuilder::col(predicate_name.clone(), ctx.expr_arena)
1682
.expr_ir(predicate_name),
1683
ctx.expr_arena,
1684
ctx.phys_sm,
1685
ctx.cache,
1686
StreamingLowerIRContext {
1687
prepare_visualization: ctx.prepare_visualization,
1688
},
1689
)?;
1690
input_streams.insert(filter_stream);
1691
transformed_exprs.push(AExprBuilder::col(out_name.clone(), ctx.expr_arena).node());
1692
},
1693
1694
AExpr::Slice {
1695
input: inner,
1696
offset,
1697
length,
1698
} => {
1699
let out_name = unique_column_name();
1700
let inner_expr_ir = ExprIR::new(inner, OutputName::Alias(out_name.clone()));
1701
let offset_expr_ir = ExprIR::from_node(offset, ctx.expr_arena);
1702
let length_expr_ir = ExprIR::from_node(length, ctx.expr_arena);
1703
let input_stream = build_select_stream_with_ctx(input, &[inner_expr_ir], ctx)?;
1704
let offset_stream = build_select_stream_with_ctx(input, &[offset_expr_ir], ctx)?;
1705
let length_stream = build_select_stream_with_ctx(input, &[length_expr_ir], ctx)?;
1706
1707
let output_schema = ctx.phys_sm[input_stream.node].output_schema.clone();
1708
let kind = PhysNodeKind::DynamicSlice {
1709
input: input_stream,
1710
offset: offset_stream,
1711
length: length_stream,
1712
};
1713
let slice_node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, kind));
1714
input_streams.insert(PhysStream::first(slice_node_key));
1715
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1716
},
1717
1718
AExpr::Function {
1719
input: ref inner_exprs,
1720
function: func @ (IRFunctionExpr::Shift | IRFunctionExpr::ShiftAndFill),
1721
options: _,
1722
} => {
1723
let out_name = unique_column_name();
1724
let data_col_expr = inner_exprs[0].with_alias(out_name.clone());
1725
let trans_data_column = build_select_stream_with_ctx(input, &[data_col_expr], ctx)?;
1726
let trans_offset =
1727
build_select_stream_with_ctx(input, &[inner_exprs[1].clone()], ctx)?;
1728
1729
let trans_fill = if func == IRFunctionExpr::ShiftAndFill {
1730
let fill_expr = inner_exprs[2].with_alias(out_name.clone());
1731
Some(build_select_stream_with_ctx(input, &[fill_expr], ctx)?)
1732
} else {
1733
None
1734
};
1735
1736
let output_schema = ctx.phys_sm[trans_data_column.node].output_schema.clone();
1737
let node_key = ctx.phys_sm.insert(PhysNode::new(
1738
output_schema,
1739
PhysNodeKind::Shift {
1740
input: trans_data_column,
1741
offset: trans_offset,
1742
fill: trans_fill,
1743
},
1744
));
1745
1746
input_streams.insert(PhysStream::first(node_key));
1747
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1748
},
1749
1750
AExpr::AnonymousFunction { .. }
1751
| AExpr::Function { .. }
1752
| AExpr::Window { .. }
1753
| AExpr::Gather { .. } => {
1754
let out_name = unique_column_name();
1755
fallback_subset.push(ExprIR::new(expr, OutputName::Alias(out_name.clone())));
1756
transformed_exprs.push(ctx.expr_arena.add(AExpr::Column(out_name)));
1757
},
1758
}
1759
}
1760
1761
if !fallback_subset.is_empty() {
1762
let fallback_node = build_fallback_node_with_ctx(input, &fallback_subset, ctx)?;
1763
input_streams.insert(PhysStream::first(fallback_node));
1764
}
1765
1766
// Simplify the input nodes (also ensures the original input only occurs
1767
// once in the zip).
1768
input_streams = simplify_input_streams(input, input_streams, ctx)?;
1769
1770
if input_streams.len() == 1 {
1771
// No need for any multiplexing/zipping, can directly execute.
1772
return Ok((input_streams.into_iter().next().unwrap(), transformed_exprs));
1773
}
1774
1775
let zip_inputs = input_streams.into_iter().collect_vec();
1776
let output_schema = zip_inputs
1777
.iter()
1778
.flat_map(|stream| ctx.phys_sm[stream.node].output_schema.iter_fields())
1779
.collect();
1780
let zip_kind = PhysNodeKind::Zip {
1781
inputs: zip_inputs,
1782
null_extend: false,
1783
};
1784
let zip_node = ctx
1785
.phys_sm
1786
.insert(PhysNode::new(Arc::new(output_schema), zip_kind));
1787
1788
Ok((PhysStream::first(zip_node), transformed_exprs))
1789
}
1790
1791
/// Computes the schema that selecting the given expressions on the input schema
1792
/// would result in.
1793
pub fn compute_output_schema(
1794
input_schema: &Schema,
1795
exprs: &[ExprIR],
1796
expr_arena: &Arena<AExpr>,
1797
) -> PolarsResult<Arc<Schema>> {
1798
let output_schema: Schema = exprs
1799
.iter()
1800
.map(|e| {
1801
let name = e.output_name().clone();
1802
let dtype = e
1803
.dtype(input_schema, expr_arena)?
1804
.clone()
1805
.materialize_unknown(true)
1806
.unwrap();
1807
PolarsResult::Ok(Field::new(name, dtype))
1808
})
1809
.try_collect()?;
1810
Ok(Arc::new(output_schema))
1811
}
1812
1813
/// Computes the schema that selecting the given expressions on the input node
1814
/// would result in.
1815
fn schema_for_select(
1816
input: PhysStream,
1817
exprs: &[ExprIR],
1818
ctx: &mut LowerExprContext,
1819
) -> PolarsResult<Arc<Schema>> {
1820
let input_schema = &ctx.phys_sm[input.node].output_schema;
1821
compute_output_schema(input_schema, exprs, ctx.expr_arena)
1822
}
1823
1824
fn build_select_stream_with_ctx(
1825
input: PhysStream,
1826
exprs: &[ExprIR],
1827
ctx: &mut LowerExprContext,
1828
) -> PolarsResult<PhysStream> {
1829
if exprs
1830
.iter()
1831
.all(|e| is_input_independent_ctx(e.node(), ctx))
1832
{
1833
return Ok(PhysStream::first(build_input_independent_node_with_ctx(
1834
exprs, ctx,
1835
)?));
1836
}
1837
1838
// Are we only selecting simple columns, with the same name?
1839
let all_simple_columns: Option<Vec<PlSmallStr>> = exprs
1840
.iter()
1841
.map(|e| match ctx.expr_arena.get(e.node()) {
1842
AExpr::Column(name) if name == e.output_name() => Some(name.clone()),
1843
_ => None,
1844
})
1845
.collect();
1846
1847
if let Some(columns) = all_simple_columns {
1848
let input_schema = ctx.phys_sm[input.node].output_schema.clone();
1849
if input_schema.len() == columns.len()
1850
&& input_schema.iter_names().zip(&columns).all(|(l, r)| l == r)
1851
{
1852
// Input node already has the correct schema, just pass through.
1853
return Ok(input);
1854
}
1855
1856
let output_schema = Arc::new(input_schema.try_project(&columns)?);
1857
let node_kind = PhysNodeKind::SimpleProjection { input, columns };
1858
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
1859
return Ok(PhysStream::first(node_key));
1860
}
1861
1862
// Actual lowering is needed.
1863
let node_exprs = exprs.iter().map(|e| e.node()).collect_vec();
1864
let (transformed_input, transformed_exprs) = lower_exprs_with_ctx(input, &node_exprs, ctx)?;
1865
let trans_expr_irs = exprs
1866
.iter()
1867
.zip(transformed_exprs)
1868
.map(|(e, te)| ExprIR::new(te, OutputName::Alias(e.output_name().clone())))
1869
.collect_vec();
1870
let output_schema = schema_for_select(transformed_input, &trans_expr_irs, ctx)?;
1871
let node_kind = PhysNodeKind::Select {
1872
input: transformed_input,
1873
selectors: trans_expr_irs,
1874
extend_original: false,
1875
};
1876
let node_key = ctx.phys_sm.insert(PhysNode::new(output_schema, node_kind));
1877
Ok(PhysStream::first(node_key))
1878
}
1879
1880
/// Lowers an input node plus a set of expressions on that input node to an
1881
/// equivalent (input node, set of expressions) pair, ensuring that the new set
1882
/// of expressions can run on the streaming engine.
1883
///
1884
/// Ensures that if the input node is transformed it has unique column names.
1885
pub fn lower_exprs(
1886
input: PhysStream,
1887
exprs: &[ExprIR],
1888
expr_arena: &mut Arena<AExpr>,
1889
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
1890
expr_cache: &mut ExprCache,
1891
ctx: StreamingLowerIRContext,
1892
) -> PolarsResult<(PhysStream, Vec<ExprIR>)> {
1893
let mut ctx = LowerExprContext {
1894
expr_arena,
1895
phys_sm,
1896
cache: expr_cache,
1897
prepare_visualization: ctx.prepare_visualization,
1898
};
1899
let node_exprs = exprs.iter().map(|e| e.node()).collect_vec();
1900
let (transformed_input, transformed_exprs) =
1901
lower_exprs_with_ctx(input, &node_exprs, &mut ctx)?;
1902
let trans_expr_irs = exprs
1903
.iter()
1904
.zip(transformed_exprs)
1905
.map(|(e, te)| ExprIR::new(te, OutputName::Alias(e.output_name().clone())))
1906
.collect_vec();
1907
Ok((transformed_input, trans_expr_irs))
1908
}
1909
1910
/// Builds a new selection node given an input stream and the expressions to
1911
/// select for, if needed.
1912
pub fn build_select_stream(
1913
input: PhysStream,
1914
exprs: &[ExprIR],
1915
expr_arena: &mut Arena<AExpr>,
1916
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
1917
expr_cache: &mut ExprCache,
1918
ctx: StreamingLowerIRContext,
1919
) -> PolarsResult<PhysStream> {
1920
let mut ctx = LowerExprContext {
1921
expr_arena,
1922
phys_sm,
1923
cache: expr_cache,
1924
prepare_visualization: ctx.prepare_visualization,
1925
};
1926
build_select_stream_with_ctx(input, exprs, &mut ctx)
1927
}
1928
1929
/// Builds a hstack node given an input stream and the expressions to add.
1930
pub fn build_hstack_stream(
1931
input: PhysStream,
1932
exprs: &[ExprIR],
1933
expr_arena: &mut Arena<AExpr>,
1934
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
1935
expr_cache: &mut ExprCache,
1936
ctx: StreamingLowerIRContext,
1937
) -> PolarsResult<PhysStream> {
1938
let input_schema = &phys_sm[input.node].output_schema;
1939
if exprs
1940
.iter()
1941
.all(|e| is_elementwise_rec_cached(e.node(), expr_arena, expr_cache))
1942
{
1943
let mut output_schema = input_schema.as_ref().clone();
1944
for expr in exprs {
1945
output_schema.insert(
1946
expr.output_name().clone(),
1947
expr.dtype(input_schema, expr_arena)?.clone(),
1948
);
1949
}
1950
let output_schema = Arc::new(output_schema);
1951
1952
let selectors = exprs.to_vec();
1953
let kind = PhysNodeKind::Select {
1954
input,
1955
selectors,
1956
extend_original: true,
1957
};
1958
let node_key = phys_sm.insert(PhysNode {
1959
output_schema,
1960
kind,
1961
});
1962
1963
Ok(PhysStream::first(node_key))
1964
} else {
1965
// We already handled the all-streamable case above, so things get more complicated.
1966
// For simplicity we just do a normal select with all the original columns prepended.
1967
let mut selectors = PlIndexMap::with_capacity(input_schema.len() + exprs.len());
1968
for name in input_schema.iter_names() {
1969
let col_name = name.clone();
1970
let col_expr = expr_arena.add(AExpr::Column(col_name.clone()));
1971
selectors.insert(
1972
name.clone(),
1973
ExprIR::new(col_expr, OutputName::ColumnLhs(col_name)),
1974
);
1975
}
1976
for expr in exprs {
1977
selectors.insert(expr.output_name().clone(), expr.clone());
1978
}
1979
let selectors = selectors.into_values().collect_vec();
1980
build_length_preserving_select_stream(
1981
input, &selectors, expr_arena, phys_sm, expr_cache, ctx,
1982
)
1983
}
1984
}
1985
1986
/// Builds a new selection node given an input stream and the expressions to
1987
/// select for, if needed. Preserves the length of the input, like in with_columns.
1988
pub fn build_length_preserving_select_stream(
1989
input: PhysStream,
1990
exprs: &[ExprIR],
1991
expr_arena: &mut Arena<AExpr>,
1992
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
1993
expr_cache: &mut ExprCache,
1994
ctx: StreamingLowerIRContext,
1995
) -> PolarsResult<PhysStream> {
1996
let mut ctx = LowerExprContext {
1997
expr_arena,
1998
phys_sm,
1999
cache: expr_cache,
2000
prepare_visualization: ctx.prepare_visualization,
2001
};
2002
let already_length_preserving = exprs
2003
.iter()
2004
.any(|expr| is_length_preserving_ctx(expr.node(), &mut ctx));
2005
let input_schema = &ctx.phys_sm[input.node].output_schema;
2006
if exprs.is_empty() || input_schema.is_empty() || already_length_preserving {
2007
return build_select_stream_with_ctx(input, exprs, &mut ctx);
2008
}
2009
2010
// Hacky work-around: append an input column with a temporary name, but
2011
// remove it from the final selector. This should ensure scalars gets zipped
2012
// back to the input to broadcast them.
2013
let tmp_name = unique_column_name();
2014
let first_col = ctx.expr_arena.add(AExpr::Column(
2015
input_schema.iter_names_cloned().next().unwrap(),
2016
));
2017
let mut tmp_exprs = Vec::with_capacity(exprs.len() + 1);
2018
tmp_exprs.extend(exprs.iter().cloned());
2019
tmp_exprs.push(ExprIR::new(first_col, OutputName::Alias(tmp_name.clone())));
2020
2021
let out_stream = build_select_stream_with_ctx(input, &tmp_exprs, &mut ctx)?;
2022
let PhysNodeKind::Select { selectors, .. } = &mut ctx.phys_sm[out_stream.node].kind else {
2023
unreachable!()
2024
};
2025
assert!(selectors.pop().unwrap().output_name() == &tmp_name);
2026
let out_schema = Arc::make_mut(&mut phys_sm[out_stream.node].output_schema);
2027
out_schema.shift_remove(tmp_name.as_ref()).unwrap();
2028
Ok(out_stream)
2029
}
2030
2031