Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/lower_ir.rs
8448 views
1
use std::sync::Arc;
2
3
use parking_lot::Mutex;
4
use polars_core::frame::{DataFrame, UniqueKeepStrategy};
5
use polars_core::prelude::{DataType, PlHashMap, PlHashSet};
6
use polars_core::scalar::Scalar;
7
use polars_core::schema::Schema;
8
use polars_core::{SchemaExtPl, config};
9
use polars_error::PolarsResult;
10
use polars_expr::state::ExecutionState;
11
use polars_mem_engine::create_physical_plan;
12
use polars_ops::frame::JoinType;
13
use polars_plan::constants::get_literal_name;
14
use polars_plan::dsl::default_values::DefaultFieldValues;
15
use polars_plan::dsl::deletion::DeletionFilesList;
16
use polars_plan::dsl::{CallbackSinkType, ExtraColumnsPolicy, FileScanIR, SinkTypeIR};
17
use polars_plan::plans::expr_ir::{ExprIR, OutputName};
18
use polars_plan::plans::{
19
AExpr, FunctionIR, IR, IRAggExpr, LiteralValue, are_keys_sorted_any, is_sorted,
20
write_ir_non_recursive,
21
};
22
use polars_plan::prelude::*;
23
use polars_utils::arena::{Arena, Node};
24
use polars_utils::itertools::Itertools;
25
use polars_utils::pl_str::PlSmallStr;
26
#[cfg(feature = "parquet")]
27
use polars_utils::relaxed_cell::RelaxedCell;
28
use polars_utils::row_counter::RowCounter;
29
use polars_utils::slice_enum::Slice;
30
use polars_utils::unique_id::UniqueId;
31
use polars_utils::{IdxSize, format_pl_smallstr, unique_column_name};
32
use slotmap::SlotMap;
33
34
use super::lower_expr::build_hstack_stream;
35
use super::{PhysNode, PhysNodeKey, PhysNodeKind, PhysStream};
36
use crate::nodes::io_sources::multi_scan;
37
use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;
38
use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
39
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
40
use crate::physical_plan::ZipBehavior;
41
use crate::physical_plan::lower_expr::{ExprCache, build_select_stream, lower_exprs};
42
use crate::physical_plan::lower_group_by::build_group_by_stream;
43
use crate::utils::late_materialized_df::LateMaterializedDataFrame;
44
45
/// Creates a new PhysStream which outputs a slice of the input stream.
46
pub fn build_slice_stream(
47
input: PhysStream,
48
offset: i64,
49
length: usize,
50
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
51
) -> PhysStream {
52
if offset >= 0 {
53
let offset = offset as usize;
54
PhysStream::first(phys_sm.insert(PhysNode::new(
55
phys_sm[input.node].output_schema.clone(),
56
PhysNodeKind::StreamingSlice {
57
input,
58
offset,
59
length,
60
},
61
)))
62
} else {
63
PhysStream::first(phys_sm.insert(PhysNode::new(
64
phys_sm[input.node].output_schema.clone(),
65
PhysNodeKind::NegativeSlice {
66
input,
67
offset,
68
length,
69
},
70
)))
71
}
72
}
73
74
/// Creates a new PhysStream which is filters the input stream.
75
pub(super) fn build_filter_stream(
76
input: PhysStream,
77
predicate: ExprIR,
78
expr_arena: &mut Arena<AExpr>,
79
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
80
expr_cache: &mut ExprCache,
81
ctx: StreamingLowerIRContext,
82
) -> PolarsResult<PhysStream> {
83
let predicate = predicate;
84
let cols_and_predicate = phys_sm[input.node]
85
.output_schema
86
.iter_names()
87
.cloned()
88
.map(|name| {
89
ExprIR::new(
90
expr_arena.add(AExpr::Column(name.clone())),
91
OutputName::ColumnLhs(name),
92
)
93
})
94
.chain([predicate])
95
.collect_vec();
96
let (trans_input, mut trans_cols_and_predicate) = lower_exprs(
97
input,
98
&cols_and_predicate,
99
expr_arena,
100
phys_sm,
101
expr_cache,
102
ctx,
103
)?;
104
105
let filter_schema = phys_sm[trans_input.node].output_schema.clone();
106
let filter = PhysNodeKind::Filter {
107
input: trans_input,
108
predicate: trans_cols_and_predicate.last().unwrap().clone(),
109
};
110
111
let post_filter = phys_sm.insert(PhysNode::new(filter_schema, filter));
112
trans_cols_and_predicate.pop(); // Remove predicate.
113
build_select_stream(
114
PhysStream::first(post_filter),
115
&trans_cols_and_predicate,
116
expr_arena,
117
phys_sm,
118
expr_cache,
119
ctx,
120
)
121
}
122
123
/// Creates a new PhysStream with row index attached with the given name.
124
pub fn build_row_idx_stream(
125
input: PhysStream,
126
name: PlSmallStr,
127
offset: Option<IdxSize>,
128
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
129
) -> PhysStream {
130
let input_schema = &phys_sm[input.node].output_schema;
131
let mut output_schema = (**input_schema).clone();
132
output_schema
133
.insert_at_index(0, name.clone(), DataType::IDX_DTYPE)
134
.unwrap();
135
let kind = PhysNodeKind::WithRowIndex {
136
input,
137
name,
138
offset,
139
};
140
let with_row_idx_node_key = phys_sm.insert(PhysNode::new(Arc::new(output_schema), kind));
141
PhysStream::first(with_row_idx_node_key)
142
}
143
144
/// Shared, copyable context passed through the IR-lowering routines.
#[derive(Debug, Clone, Copy)]
pub struct StreamingLowerIRContext {
    // When true, lowering builds human-readable `format_str` descriptions of
    // IR nodes (via `write_ir_non_recursive`) for visualization purposes.
    pub prepare_visualization: bool,
}
148
149
#[recursive::recursive]
150
#[allow(clippy::too_many_arguments)]
151
pub fn lower_ir(
152
node: Node,
153
ir_arena: &mut Arena<IR>,
154
expr_arena: &mut Arena<AExpr>,
155
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
156
schema_cache: &mut PlHashMap<Node, Arc<Schema>>,
157
expr_cache: &mut ExprCache,
158
cache_nodes: &mut PlHashMap<UniqueId, PhysStream>,
159
ctx: StreamingLowerIRContext,
160
mut disable_morsel_split: Option<bool>,
161
) -> PolarsResult<PhysStream> {
162
// Helper macro to simplify recursive calls.
163
macro_rules! lower_ir {
164
($input:expr) => {{
165
// Disable for remaining execution graph if it wasn't explicitly set
166
// by the current IR.
167
disable_morsel_split.get_or_insert(false);
168
169
lower_ir(
170
$input,
171
ir_arena,
172
expr_arena,
173
phys_sm,
174
schema_cache,
175
expr_cache,
176
cache_nodes,
177
ctx,
178
disable_morsel_split,
179
)
180
}};
181
}
182
183
// Require the code below to explicitly set this to `true`
184
if disable_morsel_split == Some(true) {
185
disable_morsel_split.take();
186
}
187
188
let ir_node = ir_arena.get(node);
189
let output_schema = IR::schema_with_cache(node, ir_arena, schema_cache);
190
let node_kind = match ir_node {
191
IR::SimpleProjection { input, columns } => {
192
disable_morsel_split.get_or_insert(true);
193
let columns = columns.iter_names_cloned().collect::<Vec<_>>();
194
let phys_input = lower_ir!(*input)?;
195
PhysNodeKind::SimpleProjection {
196
input: phys_input,
197
columns,
198
}
199
},
200
201
IR::Select { input, expr, .. } => {
202
let selectors = expr.clone();
203
204
if selectors
205
.iter()
206
.all(|e| matches!(expr_arena.get(e.node()), AExpr::Len | AExpr::Column(_)))
207
{
208
disable_morsel_split.get_or_insert(true);
209
}
210
211
let phys_input = lower_ir!(*input)?;
212
return build_select_stream(
213
phys_input, &selectors, expr_arena, phys_sm, expr_cache, ctx,
214
);
215
},
216
217
IR::HStack { input, exprs, .. } => {
218
let exprs = exprs.to_vec();
219
let phys_input = lower_ir!(*input)?;
220
return build_hstack_stream(phys_input, &exprs, expr_arena, phys_sm, expr_cache, ctx);
221
},
222
223
IR::Slice { input, offset, len } => {
224
let offset = *offset;
225
let len = *len as usize;
226
let phys_input = lower_ir!(*input)?;
227
return Ok(build_slice_stream(phys_input, offset, len, phys_sm));
228
},
229
230
IR::Filter { input, predicate } => {
231
let predicate = predicate.clone();
232
let phys_input = lower_ir!(*input)?;
233
return build_filter_stream(
234
phys_input, predicate, expr_arena, phys_sm, expr_cache, ctx,
235
);
236
},
237
238
IR::DataFrameScan {
239
df,
240
output_schema: projection,
241
schema,
242
..
243
} => {
244
let schema = schema.clone(); // This is initially the schema of df, but can change with the projection.
245
let mut node_kind = PhysNodeKind::InMemorySource {
246
df: df.clone(),
247
disable_morsel_split: disable_morsel_split.unwrap_or(true),
248
};
249
250
// Do we need to apply a projection?
251
if let Some(projection_schema) = projection {
252
if projection_schema.len() != schema.len()
253
|| projection_schema
254
.iter_names()
255
.zip(schema.iter_names())
256
.any(|(l, r)| l != r)
257
{
258
let phys_input = phys_sm.insert(PhysNode::new(schema, node_kind));
259
node_kind = PhysNodeKind::SimpleProjection {
260
input: PhysStream::first(phys_input),
261
columns: projection_schema.iter_names_cloned().collect::<Vec<_>>(),
262
};
263
}
264
}
265
266
node_kind
267
},
268
269
IR::Sink { input, payload } => match payload {
270
SinkTypeIR::Memory => {
271
disable_morsel_split.get_or_insert(true);
272
let phys_input = lower_ir!(*input)?;
273
PhysNodeKind::InMemorySink { input: phys_input }
274
},
275
SinkTypeIR::Callback(CallbackSinkType {
276
function,
277
maintain_order,
278
chunk_size,
279
}) => {
280
let function = function.clone();
281
let maintain_order = *maintain_order;
282
let chunk_size = *chunk_size;
283
let phys_input = lower_ir!(*input)?;
284
PhysNodeKind::CallbackSink {
285
input: phys_input,
286
function,
287
maintain_order,
288
chunk_size,
289
}
290
},
291
292
SinkTypeIR::File(options) => {
293
let options = options.clone();
294
let input = lower_ir!(*input)?;
295
PhysNodeKind::FileSink { input, options }
296
},
297
298
SinkTypeIR::Partitioned(options) => {
299
let options = options.clone();
300
let input = lower_ir!(*input)?;
301
PhysNodeKind::PartitionedSink { input, options }
302
},
303
},
304
305
IR::SinkMultiple { inputs } => {
306
disable_morsel_split.get_or_insert(true);
307
let mut sinks = Vec::with_capacity(inputs.len());
308
for input in inputs.clone() {
309
let phys_node_stream = match ir_arena.get(input) {
310
IR::Sink { .. } => lower_ir!(input)?,
311
_ => lower_ir!(ir_arena.add(IR::Sink {
312
input,
313
payload: SinkTypeIR::Memory
314
}))?,
315
};
316
sinks.push(phys_node_stream.node);
317
}
318
PhysNodeKind::SinkMultiple { sinks }
319
},
320
321
#[cfg(feature = "merge_sorted")]
322
IR::MergeSorted {
323
input_left,
324
input_right,
325
key,
326
} => {
327
let input_left = *input_left;
328
let input_right = *input_right;
329
let key = key.clone();
330
331
let mut phys_left = lower_ir!(input_left)?;
332
let mut phys_right = lower_ir!(input_right)?;
333
334
let left_schema = &phys_sm[phys_left.node].output_schema;
335
let right_schema = &phys_sm[phys_right.node].output_schema;
336
337
left_schema.ensure_is_exact_match(right_schema).unwrap();
338
339
let key_dtype = left_schema.try_get(key.as_str())?.clone();
340
341
let key_name = unique_column_name();
342
use polars_plan::plans::{AExprBuilder, RowEncodingVariant};
343
344
// Add the key column as the last column for both inputs.
345
for s in [&mut phys_left, &mut phys_right] {
346
let key_dtype = key_dtype.clone();
347
let mut expr = AExprBuilder::col(key.clone(), expr_arena);
348
if key_dtype.is_nested() {
349
expr = AExprBuilder::row_encode(
350
vec![expr.expr_ir(key_name.clone())],
351
vec![key_dtype],
352
RowEncodingVariant::Ordered {
353
descending: None,
354
nulls_last: None,
355
broadcast_nulls: None,
356
},
357
expr_arena,
358
);
359
}
360
361
*s = build_hstack_stream(
362
*s,
363
&[expr.expr_ir(key_name.clone())],
364
expr_arena,
365
phys_sm,
366
expr_cache,
367
ctx,
368
)?;
369
}
370
371
PhysNodeKind::MergeSorted {
372
input_left: phys_left,
373
input_right: phys_right,
374
}
375
},
376
377
IR::MapFunction { input, function } => {
378
let function = function.clone();
379
let phys_input = lower_ir!(*input)?;
380
381
match function {
382
FunctionIR::RowIndex {
383
name,
384
offset,
385
schema: _,
386
} => PhysNodeKind::WithRowIndex {
387
input: phys_input,
388
name,
389
offset,
390
},
391
392
function if function.is_streamable() => {
393
let map = Arc::new(move |df| function.evaluate(df));
394
let format_str = ctx.prepare_visualization.then(|| {
395
let mut buffer = String::new();
396
write_ir_non_recursive(
397
&mut buffer,
398
ir_arena.get(node),
399
expr_arena,
400
phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),
401
0,
402
)
403
.unwrap();
404
buffer
405
});
406
PhysNodeKind::Map {
407
input: phys_input,
408
map,
409
format_str,
410
}
411
},
412
413
function => {
414
let format_str = ctx.prepare_visualization.then(|| {
415
let mut buffer = String::new();
416
write_ir_non_recursive(
417
&mut buffer,
418
ir_arena.get(node),
419
expr_arena,
420
phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),
421
0,
422
)
423
.unwrap();
424
buffer
425
});
426
let map = Arc::new(move |df| function.evaluate(df));
427
PhysNodeKind::InMemoryMap {
428
input: phys_input,
429
map,
430
format_str,
431
}
432
},
433
}
434
},
435
436
IR::Sort {
437
input,
438
by_column,
439
slice,
440
sort_options,
441
} => {
442
let slice = slice.clone();
443
let mut by_column = by_column.clone();
444
let mut sort_options = sort_options.clone();
445
let phys_input = lower_ir!(*input)?;
446
447
// See if we can insert a top k.
448
let mut limit = u64::MAX;
449
if let Some((0, l, _)) = slice {
450
limit = limit.min(l as u64);
451
}
452
#[allow(clippy::unnecessary_cast)]
453
if let Some(l) = sort_options.limit {
454
limit = limit.min(l as u64);
455
};
456
457
let mut stream = phys_input;
458
if limit < u64::MAX {
459
// If we need to maintain order augment with row index.
460
if sort_options.maintain_order {
461
let row_idx_name = unique_column_name();
462
stream = build_row_idx_stream(stream, row_idx_name.clone(), None, phys_sm);
463
464
// Add row index to sort columns.
465
let row_idx_node = expr_arena.add(AExpr::Column(row_idx_name.clone()));
466
by_column.push(ExprIR::new(
467
row_idx_node,
468
OutputName::ColumnLhs(row_idx_name),
469
));
470
sort_options.descending.push(false);
471
sort_options.nulls_last.push(true);
472
473
// No longer needed for the actual sort itself, handled by row index.
474
sort_options.maintain_order = false;
475
}
476
477
let k_node =
478
expr_arena.add(AExpr::Literal(LiteralValue::Scalar(Scalar::from(limit))));
479
let k_selector = ExprIR::from_node(k_node, expr_arena);
480
let k_output_schema = Schema::from_iter([(get_literal_name(), DataType::UInt64)]);
481
let k_node = phys_sm.insert(PhysNode::new(
482
Arc::new(k_output_schema),
483
PhysNodeKind::InputIndependentSelect {
484
selectors: vec![k_selector],
485
},
486
));
487
488
let mut trans_by_column;
489
(stream, trans_by_column) =
490
lower_exprs(stream, &by_column, expr_arena, phys_sm, expr_cache, ctx)?;
491
492
trans_by_column = trans_by_column
493
.into_iter()
494
.enumerate()
495
.map(|(i, expr)| expr.with_alias(format_pl_smallstr!("__POLARS_KEYCOL_{}", i)))
496
.collect_vec();
497
498
stream = PhysStream::first(phys_sm.insert(PhysNode {
499
output_schema: phys_sm[stream.node].output_schema.clone(),
500
kind: PhysNodeKind::TopK {
501
input: stream,
502
k: PhysStream::first(k_node),
503
by_column: trans_by_column,
504
reverse: sort_options.descending.iter().map(|x| !x).collect(),
505
nulls_last: sort_options.nulls_last.clone(),
506
dyn_pred: slice.as_ref().and_then(|t| t.2.clone()),
507
},
508
}));
509
}
510
511
stream = PhysStream::first(phys_sm.insert(PhysNode {
512
output_schema: phys_sm[stream.node].output_schema.clone(),
513
kind: PhysNodeKind::Sort {
514
input: stream,
515
by_column,
516
slice: slice.as_ref().map(|t| (t.0, t.1)),
517
sort_options,
518
},
519
}));
520
521
// Remove any temporary columns we may have added.
522
let exprs: Vec<_> = output_schema
523
.iter_names()
524
.map(|name| {
525
let node = expr_arena.add(AExpr::Column(name.clone()));
526
ExprIR::new(node, OutputName::ColumnLhs(name.clone()))
527
})
528
.collect();
529
stream = build_select_stream(stream, &exprs, expr_arena, phys_sm, expr_cache, ctx)?;
530
531
return Ok(stream);
532
},
533
IR::Union { inputs, options } => {
534
let options = *options;
535
536
let inputs = inputs
537
.clone() // Needed to borrow ir_arena mutably.
538
.into_iter()
539
.map(|input| lower_ir!(input))
540
.collect::<Result<_, _>>()?;
541
542
let kind = if options.maintain_order {
543
PhysNodeKind::OrderedUnion { inputs }
544
} else {
545
PhysNodeKind::UnorderedUnion { inputs }
546
};
547
548
let node = phys_sm.insert(PhysNode {
549
output_schema,
550
kind,
551
});
552
let mut stream = PhysStream::first(node);
553
554
if let Some((offset, length)) = options.slice {
555
stream = build_slice_stream(stream, offset, length, phys_sm);
556
}
557
558
return Ok(stream);
559
},
560
561
IR::HConcat {
562
inputs,
563
schema: _,
564
options,
565
} => {
566
let zip_behavior = if options.strict {
567
ZipBehavior::Strict
568
} else if options.broadcast_unit_length {
569
ZipBehavior::Broadcast
570
} else {
571
ZipBehavior::NullExtend
572
};
573
let inputs = inputs
574
.clone() // Needed to borrow ir_arena mutably.
575
.into_iter()
576
.map(|input| lower_ir!(input))
577
.collect::<Result<_, _>>()?;
578
PhysNodeKind::Zip {
579
inputs,
580
zip_behavior,
581
}
582
},
583
584
v @ IR::Scan { .. } => {
585
let IR::Scan {
586
sources: scan_sources,
587
file_info,
588
mut hive_parts,
589
output_schema: _,
590
scan_type,
591
predicate,
592
predicate_file_skip_applied,
593
unified_scan_args,
594
} = v.clone()
595
else {
596
unreachable!();
597
};
598
599
if (scan_sources.is_empty()
600
&& !matches!(scan_type.as_ref(), FileScanIR::Anonymous { .. }))
601
|| unified_scan_args
602
.pre_slice
603
.as_ref()
604
.is_some_and(|slice| slice.len() == 0)
605
{
606
if config::verbose() {
607
eprintln!("lower_ir: scan IR lowered as empty InMemorySource")
608
}
609
610
// If there are no sources, just provide an empty in-memory source with the right
611
// schema.
612
PhysNodeKind::InMemorySource {
613
df: Arc::new(DataFrame::empty_with_schema(output_schema.as_ref())),
614
disable_morsel_split: disable_morsel_split.unwrap_or(true),
615
}
616
} else if output_schema.is_empty()
617
&& let Some((physical_rows, deleted_rows)) = unified_scan_args.row_count
618
&& unified_scan_args.pre_slice.is_none()
619
&& predicate.is_none()
620
{
621
// Fast-count for scan_iceberg will hit here.
622
let row_counter = RowCounter::new(physical_rows, deleted_rows);
623
row_counter.num_rows_idxsize()?;
624
let num_rows = row_counter.num_rows()?;
625
626
if config::verbose() {
627
eprintln!(
628
"lower_ir: scan IR lowered as 0-width InMemorySource with height {} ({:?})",
629
num_rows, &row_counter
630
)
631
}
632
633
PhysNodeKind::InMemorySource {
634
df: Arc::new(DataFrame::empty_with_height(num_rows)),
635
disable_morsel_split: disable_morsel_split.unwrap_or(true),
636
}
637
} else {
638
let file_reader_builder: Arc<dyn FileReaderBuilder> = match &*scan_type {
639
#[cfg(feature = "parquet")]
640
FileScanIR::Parquet {
641
options,
642
metadata: first_metadata,
643
} => Arc::new(
644
crate::nodes::io_sources::parquet::builder::ParquetReaderBuilder {
645
options: Arc::new(options.clone()),
646
first_metadata: first_metadata.clone(),
647
prefetch_limit: RelaxedCell::new_usize(0),
648
prefetch_semaphore: std::sync::OnceLock::new(),
649
shared_prefetch_wait_group_slot: Default::default(),
650
io_metrics: std::sync::OnceLock::new(),
651
},
652
) as _,
653
654
#[cfg(feature = "ipc")]
655
FileScanIR::Ipc {
656
options,
657
metadata: first_metadata,
658
} => Arc::new(crate::nodes::io_sources::ipc::builder::IpcReaderBuilder {
659
options: Arc::new(options.clone()),
660
first_metadata: first_metadata.clone(),
661
prefetch_limit: RelaxedCell::new_usize(0),
662
prefetch_semaphore: std::sync::OnceLock::new(),
663
shared_prefetch_wait_group_slot: Default::default(),
664
io_metrics: std::sync::OnceLock::new(),
665
}) as _,
666
667
#[cfg(feature = "csv")]
668
FileScanIR::Csv { options } => Arc::new(Arc::clone(options)) as _,
669
670
#[cfg(feature = "json")]
671
FileScanIR::NDJson { options } => Arc::new(
672
crate::nodes::io_sources::ndjson::builder::NDJsonReaderBuilder {
673
options: Arc::new(options.clone()),
674
prefetch_limit: RelaxedCell::new_usize(0),
675
prefetch_semaphore: std::sync::OnceLock::new(),
676
shared_prefetch_wait_group_slot: Default::default(),
677
io_metrics: std::sync::OnceLock::new(),
678
},
679
) as _,
680
// Arc::new(options.clone()) as _,
681
#[cfg(feature = "python")]
682
FileScanIR::PythonDataset {
683
dataset_object: _,
684
cached_ir,
685
} => {
686
use crate::physical_plan::io::python_dataset::python_dataset_scan_to_reader_builder;
687
let guard = cached_ir.lock().unwrap();
688
689
let expanded_scan = guard
690
.as_ref()
691
.expect("python dataset should be resolved")
692
.python_scan()
693
.expect("should be python scan");
694
695
python_dataset_scan_to_reader_builder(expanded_scan)
696
},
697
698
#[cfg(feature = "scan_lines")]
699
FileScanIR::Lines { name: _ } => {
700
Arc::new(crate::nodes::io_sources::lines::LineReaderBuilder {
701
prefetch_limit: RelaxedCell::new_usize(0),
702
prefetch_semaphore: std::sync::OnceLock::new(),
703
shared_prefetch_wait_group_slot: Default::default(),
704
io_metrics: std::sync::OnceLock::new(),
705
}) as _
706
},
707
708
FileScanIR::Anonymous { .. } => todo!("unimplemented: AnonymousScan"),
709
};
710
711
{
712
let cloud_options = unified_scan_args.cloud_options.clone().map(Arc::new);
713
let file_schema = file_info.schema;
714
715
let (projected_schema, file_schema) =
716
multi_scan::functions::resolve_projections::resolve_projections(
717
&output_schema,
718
&file_schema,
719
&mut hive_parts,
720
unified_scan_args
721
.row_index
722
.as_ref()
723
.map(|ri| ri.name.as_str()),
724
unified_scan_args
725
.include_file_paths
726
.as_ref()
727
.map(|x| x.as_str()),
728
);
729
730
let file_projection_builder = ProjectionBuilder::new(
731
projected_schema,
732
unified_scan_args.column_mapping.as_ref(),
733
unified_scan_args
734
.default_values
735
.filter(|DefaultFieldValues::Iceberg(v)| !v.is_empty())
736
.map(|DefaultFieldValues::Iceberg(v)| v),
737
);
738
739
// TODO: We ignore the parameter for some scan types to maintain old behavior,
740
// as they currently don't expose an API for it to be configured.
741
let extra_columns_policy = match &*scan_type {
742
#[cfg(feature = "parquet")]
743
FileScanIR::Parquet { .. } => unified_scan_args.extra_columns_policy,
744
745
_ => {
746
if unified_scan_args.projection.is_some() {
747
ExtraColumnsPolicy::Ignore
748
} else {
749
ExtraColumnsPolicy::Raise
750
}
751
},
752
};
753
754
let forbid_extra_columns = ForbidExtraColumns::opt_new(
755
&extra_columns_policy,
756
&file_schema,
757
unified_scan_args.column_mapping.as_ref(),
758
);
759
760
let pre_slice = unified_scan_args.pre_slice.clone();
761
let disable_morsel_split = disable_morsel_split.unwrap_or(true);
762
763
let mut multi_scan_node = PhysNodeKind::MultiScan {
764
scan_sources,
765
file_reader_builder,
766
cloud_options,
767
file_projection_builder,
768
output_schema: output_schema.clone(),
769
row_index: None,
770
pre_slice,
771
predicate,
772
predicate_file_skip_applied,
773
hive_parts,
774
cast_columns_policy: unified_scan_args.cast_columns_policy,
775
missing_columns_policy: unified_scan_args.missing_columns_policy,
776
forbid_extra_columns,
777
include_file_paths: unified_scan_args.include_file_paths,
778
// Set to None if empty for performance.
779
deletion_files: DeletionFilesList::filter_empty(
780
unified_scan_args.deletion_files,
781
),
782
table_statistics: unified_scan_args.table_statistics,
783
file_schema,
784
disable_morsel_split,
785
};
786
787
let PhysNodeKind::MultiScan {
788
output_schema: multi_scan_output_schema,
789
row_index: row_index_to_multiscan,
790
pre_slice: pre_slice_to_multiscan,
791
predicate: predicate_to_multiscan,
792
..
793
} = &mut multi_scan_node
794
else {
795
unreachable!()
796
};
797
798
let mut row_index_post = unified_scan_args.row_index;
799
800
// * If a predicate was pushed then we always push row index
801
if predicate_to_multiscan.is_some()
802
|| matches!(pre_slice_to_multiscan, Some(Slice::Negative { .. }))
803
{
804
*row_index_to_multiscan = row_index_post.take();
805
}
806
807
// TODO
808
// Projection pushdown could change the row index column position. Ideally it shouldn't,
809
// and instead just put a projection on top of the scan node in the IR. But for now
810
// we do that step here.
811
let mut schema_after_row_index_post = multi_scan_output_schema.clone();
812
let mut reorder_after_row_index_post = false;
813
814
// Remove row index from multiscan schema if not pushed.
815
if let Some(ri) = row_index_post.as_ref() {
816
let row_index_post_position =
817
multi_scan_output_schema.index_of(&ri.name).unwrap();
818
let (_, dtype) = Arc::make_mut(multi_scan_output_schema)
819
.shift_remove_index(row_index_post_position)
820
.unwrap();
821
822
if row_index_post_position != 0 {
823
reorder_after_row_index_post = true;
824
let mut schema =
825
Schema::with_capacity(multi_scan_output_schema.len() + 1);
826
schema.extend([(ri.name.clone(), dtype)]);
827
schema.extend(
828
multi_scan_output_schema
829
.iter()
830
.map(|(k, v)| (k.clone(), v.clone())),
831
);
832
schema_after_row_index_post = Arc::new(schema);
833
}
834
}
835
836
// If we have no predicate and no slice or positive slice, we can reorder the row index to after
837
// the slice by adjusting the offset. This can remove a serial synchronization step in multiscan
838
// and allow the reader to still skip rows.
839
let row_index_post_after_slice = (|| {
840
let mut row_index = row_index_post.take()?;
841
842
let positive_offset = match pre_slice_to_multiscan {
843
Some(Slice::Positive { offset, .. }) => Some(*offset),
844
None => Some(0),
845
Some(Slice::Negative { .. }) => unreachable!(),
846
}?;
847
848
row_index.offset = row_index.offset.saturating_add(
849
IdxSize::try_from(positive_offset).unwrap_or(IdxSize::MAX),
850
);
851
852
Some(row_index)
853
})();
854
855
let mut stream = {
856
let node_key = phys_sm.insert(PhysNode::new(
857
multi_scan_output_schema.clone(),
858
multi_scan_node,
859
));
860
PhysStream::first(node_key)
861
};
862
863
if let Some(ri) = row_index_post {
864
let node = PhysNodeKind::WithRowIndex {
865
input: stream,
866
name: ri.name,
867
offset: Some(ri.offset),
868
};
869
870
let node_key = phys_sm.insert(PhysNode {
871
output_schema: schema_after_row_index_post.clone(),
872
kind: node,
873
});
874
875
stream = PhysStream::first(node_key);
876
877
if reorder_after_row_index_post {
878
let node = PhysNodeKind::SimpleProjection {
879
input: stream,
880
columns: output_schema.iter_names_cloned().collect(),
881
};
882
883
let node_key = phys_sm.insert(PhysNode {
884
output_schema: output_schema.clone(),
885
kind: node,
886
});
887
888
stream = PhysStream::first(node_key);
889
}
890
}
891
892
if let Some(ri) = row_index_post_after_slice {
893
let node = PhysNodeKind::WithRowIndex {
894
input: stream,
895
name: ri.name,
896
offset: Some(ri.offset),
897
};
898
899
let node_key = phys_sm.insert(PhysNode {
900
output_schema: schema_after_row_index_post,
901
kind: node,
902
});
903
904
stream = PhysStream::first(node_key);
905
906
if reorder_after_row_index_post {
907
let node = PhysNodeKind::SimpleProjection {
908
input: stream,
909
columns: output_schema.iter_names_cloned().collect(),
910
};
911
912
let node_key = phys_sm.insert(PhysNode {
913
output_schema: output_schema.clone(),
914
kind: node,
915
});
916
917
stream = PhysStream::first(node_key);
918
}
919
}
920
921
return Ok(stream);
922
}
923
}
924
},
925
926
#[cfg(feature = "python")]
927
v @ IR::PythonScan { options } => {
928
use polars_plan::dsl::python_dsl::PythonScanSource;
929
930
match options.python_source {
931
PythonScanSource::Pyarrow => {
932
// Fallback to in-memory engine.
933
let input = PhysNodeKind::InMemorySource {
934
df: Arc::new(DataFrame::default()),
935
disable_morsel_split: disable_morsel_split.unwrap_or(true),
936
};
937
let input_key =
938
phys_sm.insert(PhysNode::new(Arc::new(Schema::default()), input));
939
let phys_input = PhysStream::first(input_key);
940
941
let lmdf = Arc::new(LateMaterializedDataFrame::default());
942
let mut lp_arena = Arena::default();
943
let scan_lp_node = lp_arena.add(v.clone());
944
945
let executor = Mutex::new(create_physical_plan(
946
scan_lp_node,
947
&mut lp_arena,
948
expr_arena,
949
None,
950
)?);
951
952
let format_str = ctx.prepare_visualization.then(|| {
953
let mut buffer = String::new();
954
write_ir_non_recursive(
955
&mut buffer,
956
ir_arena.get(node),
957
expr_arena,
958
phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),
959
0,
960
)
961
.unwrap();
962
buffer
963
});
964
965
PhysNodeKind::InMemoryMap {
966
input: phys_input,
967
map: Arc::new(move |df| {
968
lmdf.set_materialized_dataframe(df);
969
let mut state = ExecutionState::new();
970
executor.lock().execute(&mut state)
971
}),
972
format_str,
973
}
974
},
975
_ => PhysNodeKind::PythonScan {
976
options: options.clone(),
977
},
978
}
979
},
980
IR::Cache { input, id } => {
981
let id = *id;
982
if let Some(cached) = cache_nodes.get(&id) {
983
return Ok(*cached);
984
}
985
986
let phys_input = lower_ir!(*input)?;
987
cache_nodes.insert(id, phys_input);
988
return Ok(phys_input);
989
},
990
991
IR::GroupBy {
992
input,
993
keys,
994
aggs,
995
schema: output_schema,
996
apply,
997
maintain_order,
998
options,
999
} => {
1000
let input = *input;
1001
let keys = keys.clone();
1002
let aggs = aggs.clone();
1003
let output_schema = output_schema.clone();
1004
let apply = apply.clone();
1005
let maintain_order = *maintain_order;
1006
let options = options.clone();
1007
1008
let phys_input = lower_ir!(input)?;
1009
1010
let input_schema = &phys_sm[phys_input.node].output_schema;
1011
let are_keys_sorted = are_keys_sorted_any(
1012
is_sorted(input, ir_arena, expr_arena).as_ref(),
1013
&keys,
1014
expr_arena,
1015
input_schema,
1016
)
1017
.is_some();
1018
1019
return build_group_by_stream(
1020
phys_input,
1021
&keys,
1022
&aggs,
1023
output_schema,
1024
maintain_order,
1025
options,
1026
apply,
1027
expr_arena,
1028
phys_sm,
1029
expr_cache,
1030
ctx,
1031
are_keys_sorted,
1032
);
1033
},
1034
IR::Join {
1035
input_left,
1036
input_right,
1037
schema: _,
1038
left_on,
1039
right_on,
1040
options,
1041
} => {
1042
let input_left = *input_left;
1043
let input_right = *input_right;
1044
let input_left_schema = IR::schema_with_cache(input_left, ir_arena, schema_cache);
1045
let input_right_schema = IR::schema_with_cache(input_right, ir_arena, schema_cache);
1046
let left_on = left_on.clone();
1047
let right_on = right_on.clone();
1048
let get_expr_name = |e: &ExprIR| e.output_name().clone();
1049
let left_on_names = left_on.iter().map(get_expr_name).collect_vec();
1050
let right_on_names = right_on.iter().map(get_expr_name).collect_vec();
1051
let args = options.args.clone();
1052
let options = options.options.clone();
1053
let left_df_sortedness = is_sorted(input_left, ir_arena, expr_arena);
1054
let left_on_sorted = are_keys_sorted_any(
1055
left_df_sortedness.as_ref(),
1056
&left_on,
1057
expr_arena,
1058
&input_left_schema,
1059
);
1060
let right_df_sortedness = is_sorted(input_right, ir_arena, expr_arena);
1061
let right_on_sorted = are_keys_sorted_any(
1062
right_df_sortedness.as_ref(),
1063
&right_on,
1064
expr_arena,
1065
&input_right_schema,
1066
);
1067
let join_keys_sorted_together =
1068
Option::zip(left_on_sorted.as_ref(), right_on_sorted.as_ref())
1069
.is_some_and(|(ls, rs)| ls == rs);
1070
let use_streaming_merge_join = args.how.is_equi() && join_keys_sorted_together;
1071
#[cfg(feature = "asof_join")]
1072
let use_streaming_asof_join = if let JoinType::AsOf(ref asof_options) = args.how {
1073
// Grouped asof-join is not yet supported in the streaming engine.
1074
asof_options.left_by.is_none() && asof_options.right_by.is_none()
1075
} else {
1076
false
1077
};
1078
#[cfg(not(feature = "asof_join"))]
1079
let use_streaming_asof_join = false;
1080
1081
let phys_left = lower_ir!(input_left)?;
1082
let phys_right = lower_ir!(input_right)?;
1083
1084
if (args.how.is_equi() || args.how.is_semi_anti() || use_streaming_asof_join)
1085
&& !args.validation.needs_checks()
1086
{
1087
// When lowering the expressions for the keys we need to ensure we keep around the
1088
// payload columns, otherwise the input nodes can get replaced by input-independent
1089
// nodes since the lowering code does not see we access any non-literal expressions.
1090
// So we add dummy expressions before lowering and remove them afterwards.
1091
1092
let mut aug_left_on = left_on.clone();
1093
for name in phys_sm[phys_left.node].output_schema.iter_names() {
1094
let col_expr = expr_arena.add(AExpr::Column(name.clone()));
1095
aug_left_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone())));
1096
}
1097
let mut aug_right_on = right_on.clone();
1098
for name in phys_sm[phys_right.node].output_schema.iter_names() {
1099
let col_expr = expr_arena.add(AExpr::Column(name.clone()));
1100
aug_right_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone())));
1101
}
1102
1103
let (mut trans_input_left, mut trans_left_on) = lower_exprs(
1104
phys_left,
1105
&aug_left_on,
1106
expr_arena,
1107
phys_sm,
1108
expr_cache,
1109
ctx,
1110
)?;
1111
let (mut trans_input_right, mut trans_right_on) = lower_exprs(
1112
phys_right,
1113
&aug_right_on,
1114
expr_arena,
1115
phys_sm,
1116
expr_cache,
1117
ctx,
1118
)?;
1119
1120
trans_left_on.drain(left_on.len()..);
1121
trans_right_on.drain(right_on.len()..);
1122
1123
let mut key_descending = left_on_sorted.as_ref().and_then(|v| v[0].descending);
1124
let key_nulls_last = left_on_sorted.as_ref().and_then(|v| v[0].nulls_last);
1125
let mut tmp_left_key_col = None;
1126
let mut tmp_right_key_col = None;
1127
if use_streaming_merge_join || use_streaming_asof_join {
1128
(trans_input_left, trans_left_on, tmp_left_key_col) = append_sorted_key_column(
1129
trans_input_left,
1130
trans_left_on,
1131
left_on_sorted.as_ref(),
1132
Some(!args.nulls_equal),
1133
expr_arena,
1134
phys_sm,
1135
expr_cache,
1136
ctx,
1137
)?;
1138
(trans_input_right, trans_right_on, tmp_right_key_col) =
1139
append_sorted_key_column(
1140
trans_input_right,
1141
trans_right_on,
1142
right_on_sorted.as_ref(),
1143
Some(!args.nulls_equal),
1144
expr_arena,
1145
phys_sm,
1146
expr_cache,
1147
ctx,
1148
)?;
1149
}
1150
1151
let node = if use_streaming_merge_join {
1152
let keys_are_row_encoded = left_on_names.len() > 1;
1153
if keys_are_row_encoded {
1154
key_descending = Some(false);
1155
}
1156
phys_sm.insert(PhysNode::new(
1157
output_schema,
1158
PhysNodeKind::MergeJoin {
1159
input_left: trans_input_left,
1160
input_right: trans_input_right,
1161
left_on: left_on_names,
1162
right_on: right_on_names,
1163
tmp_left_key_col,
1164
tmp_right_key_col,
1165
keys_row_encoded: keys_are_row_encoded,
1166
descending: key_descending.unwrap(),
1167
nulls_last: key_nulls_last.unwrap(),
1168
args: args.clone(),
1169
},
1170
))
1171
} else if args.how.is_equi() {
1172
phys_sm.insert(PhysNode::new(
1173
output_schema,
1174
PhysNodeKind::EquiJoin {
1175
input_left: trans_input_left,
1176
input_right: trans_input_right,
1177
left_on: trans_left_on,
1178
right_on: trans_right_on,
1179
args: args.clone(),
1180
},
1181
))
1182
} else if use_streaming_asof_join {
1183
assert!(left_on_names.len() == 1 && right_on_names.len() == 1);
1184
phys_sm.insert(PhysNode::new(
1185
output_schema,
1186
PhysNodeKind::AsOfJoin {
1187
input_left: trans_input_left,
1188
input_right: trans_input_right,
1189
left_on: left_on_names[0].clone(),
1190
right_on: right_on_names[0].clone(),
1191
tmp_left_key_col,
1192
tmp_right_key_col,
1193
args: args.clone(),
1194
},
1195
))
1196
} else {
1197
phys_sm.insert(PhysNode::new(
1198
output_schema,
1199
PhysNodeKind::SemiAntiJoin {
1200
input_left: trans_input_left,
1201
input_right: trans_input_right,
1202
left_on: trans_left_on,
1203
right_on: trans_right_on,
1204
args: args.clone(),
1205
output_bool: false,
1206
},
1207
))
1208
};
1209
let mut stream = PhysStream::first(node);
1210
if let Some((offset, len)) = args.slice {
1211
stream = build_slice_stream(stream, offset, len, phys_sm);
1212
}
1213
return Ok(stream);
1214
} else if args.how.is_cross() {
1215
let node = phys_sm.insert(PhysNode::new(
1216
output_schema,
1217
PhysNodeKind::CrossJoin {
1218
input_left: phys_left,
1219
input_right: phys_right,
1220
args: args.clone(),
1221
},
1222
));
1223
let mut stream = PhysStream::first(node);
1224
if let Some((offset, len)) = args.slice {
1225
stream = build_slice_stream(stream, offset, len, phys_sm);
1226
}
1227
return Ok(stream);
1228
} else {
1229
PhysNodeKind::InMemoryJoin {
1230
input_left: phys_left,
1231
input_right: phys_right,
1232
left_on,
1233
right_on,
1234
args,
1235
options,
1236
}
1237
}
1238
},
1239
1240
IR::Distinct { input, options } => {
1241
let options = options.clone();
1242
let input = *input;
1243
let phys_input = lower_ir!(input)?;
1244
1245
// We don't have a dedicated distinct operator (yet), lower to group
1246
// by with an aggregate for each column.
1247
let input_schema = &phys_sm[phys_input.node].output_schema;
1248
if input_schema.is_empty() {
1249
// Can't group (or have duplicates) if dataframe has zero-width.
1250
return Ok(phys_input);
1251
}
1252
1253
if options.maintain_order && options.keep_strategy == UniqueKeepStrategy::Last {
1254
// Unfortunately the order-preserving groupby always orders by the first occurrence
1255
// of the group so we can't lower this and have to fallback.
1256
let input_schema = phys_sm[phys_input.node].output_schema.clone();
1257
let lmdf = Arc::new(LateMaterializedDataFrame::default());
1258
let mut lp_arena = Arena::default();
1259
let input_lp_node = lp_arena.add(lmdf.clone().as_ir_node(input_schema));
1260
let distinct_lp_node = lp_arena.add(IR::Distinct {
1261
input: input_lp_node,
1262
options,
1263
});
1264
let executor = Mutex::new(create_physical_plan(
1265
distinct_lp_node,
1266
&mut lp_arena,
1267
expr_arena,
1268
Some(crate::dispatch::build_streaming_query_executor),
1269
)?);
1270
1271
let format_str = ctx.prepare_visualization.then(|| {
1272
let mut buffer = String::new();
1273
write_ir_non_recursive(
1274
&mut buffer,
1275
ir_arena.get(node),
1276
expr_arena,
1277
phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),
1278
0,
1279
)
1280
.unwrap();
1281
buffer
1282
});
1283
let distinct_node = PhysNode {
1284
output_schema,
1285
kind: PhysNodeKind::InMemoryMap {
1286
input: phys_input,
1287
map: Arc::new(move |df| {
1288
lmdf.set_materialized_dataframe(df);
1289
let mut state = ExecutionState::new();
1290
executor.lock().execute(&mut state)
1291
}),
1292
format_str,
1293
},
1294
};
1295
1296
return Ok(PhysStream::first(phys_sm.insert(distinct_node)));
1297
}
1298
1299
// Create the key and aggregate expressions.
1300
let all_col_names = input_schema.iter_names().cloned().collect_vec();
1301
let key_names = if let Some(subset) = options.subset {
1302
subset.to_vec()
1303
} else {
1304
all_col_names.clone()
1305
};
1306
let key_name_set: PlHashSet<_> = key_names.iter().cloned().collect();
1307
1308
let mut group_by_output_schema = Schema::with_capacity(all_col_names.len() + 1);
1309
let keys = key_names
1310
.iter()
1311
.map(|name| {
1312
group_by_output_schema
1313
.insert(name.clone(), input_schema.get(name).unwrap().clone());
1314
let col_expr = expr_arena.add(AExpr::Column(name.clone()));
1315
ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))
1316
})
1317
.collect_vec();
1318
1319
let mut aggs = all_col_names
1320
.iter()
1321
.filter(|name| !key_name_set.contains(*name))
1322
.map(|name| {
1323
group_by_output_schema
1324
.insert(name.clone(), input_schema.get(name).unwrap().clone());
1325
let col_expr = expr_arena.add(AExpr::Column(name.clone()));
1326
use UniqueKeepStrategy::*;
1327
let agg_expr = match options.keep_strategy {
1328
First | None | Any => {
1329
expr_arena.add(AExpr::Agg(IRAggExpr::First(col_expr)))
1330
},
1331
Last => expr_arena.add(AExpr::Agg(IRAggExpr::Last(col_expr))),
1332
};
1333
ExprIR::new(agg_expr, OutputName::ColumnLhs(name.clone()))
1334
})
1335
.collect_vec();
1336
1337
if options.keep_strategy == UniqueKeepStrategy::None {
1338
// Track the length so we can filter out non-unique keys later.
1339
let name = unique_column_name();
1340
group_by_output_schema.insert(name.clone(), DataType::IDX_DTYPE);
1341
aggs.push(ExprIR::new(
1342
expr_arena.add(AExpr::Len),
1343
OutputName::Alias(name),
1344
));
1345
}
1346
1347
let are_keys_sorted = are_keys_sorted_any(
1348
is_sorted(input, ir_arena, expr_arena).as_ref(),
1349
&keys,
1350
expr_arena,
1351
input_schema,
1352
)
1353
.is_some();
1354
1355
let mut stream = build_group_by_stream(
1356
phys_input,
1357
&keys,
1358
&aggs,
1359
Arc::new(group_by_output_schema),
1360
options.maintain_order,
1361
Arc::new(GroupbyOptions::default()),
1362
None,
1363
expr_arena,
1364
phys_sm,
1365
expr_cache,
1366
ctx,
1367
are_keys_sorted,
1368
)?;
1369
1370
if options.keep_strategy == UniqueKeepStrategy::None {
1371
// Filter to keep only those groups with length 1.
1372
let unique_name = aggs.last().unwrap().output_name();
1373
let left = expr_arena.add(AExpr::Column(unique_name.clone()));
1374
let right = expr_arena.add(AExpr::Literal(LiteralValue::new_idxsize(1)));
1375
let predicate_aexpr = expr_arena.add(AExpr::BinaryExpr {
1376
left,
1377
op: polars_plan::dsl::Operator::Eq,
1378
right,
1379
});
1380
let predicate =
1381
ExprIR::new(predicate_aexpr, OutputName::ColumnLhs(unique_name.clone()));
1382
stream =
1383
build_filter_stream(stream, predicate, expr_arena, phys_sm, expr_cache, ctx)?;
1384
}
1385
1386
// Restore column order and drop the temporary length column if any.
1387
let exprs = all_col_names
1388
.iter()
1389
.map(|name| {
1390
let col_expr = expr_arena.add(AExpr::Column(name.clone()));
1391
ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))
1392
})
1393
.collect_vec();
1394
stream = build_select_stream(stream, &exprs, expr_arena, phys_sm, expr_cache, ctx)?;
1395
1396
// We didn't pass the slice earlier to build_group_by_stream because
1397
// we might have the intermediate keep = "none" filter.
1398
if let Some((offset, length)) = options.slice {
1399
stream = build_slice_stream(stream, offset, length, phys_sm);
1400
}
1401
1402
return Ok(stream);
1403
},
1404
IR::ExtContext { .. } => todo!(),
1405
IR::Invalid => unreachable!(),
1406
};
1407
1408
let node_key = phys_sm.insert(PhysNode::new(output_schema, node_kind));
1409
Ok(PhysStream::first(node_key))
1410
}
1411
1412
/// Append a sorted key column to the DataFrame.
///
/// If keys_sorted is None, the sortedness of the key will be decided by the
/// default sortedness behavior of RowEncodingVariant::Ordered.
///
/// Returns the (possibly hstack-extended) stream, the rewritten key
/// expressions, and the name of the temporary key column when one was added
/// (None when the single key was already a plain column reference).
#[allow(clippy::too_many_arguments)]
fn append_sorted_key_column(
    phys_input: PhysStream,
    mut key_exprs: Vec<ExprIR>,
    keys_sorted: Option<&Vec<AExprSorted>>,
    broadcast_nulls: Option<bool>,
    expr_arena: &mut Arena<AExpr>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    expr_cache: &mut ExprCache,
    ctx: StreamingLowerIRContext,
) -> PolarsResult<(PhysStream, Vec<ExprIR>, Option<PlSmallStr>)> {
    let input_schema = &phys_sm[phys_input.node].output_schema.clone();
    // Multiple keys, or a single key of a nested dtype, cannot be compared as
    // one plain column; combine them into a single row-encoded key column.
    let use_row_encoding =
        key_exprs.len() > 1 || key_exprs[0].dtype(input_schema, expr_arena)?.is_nested();
    // "Trivial" means the expression is a bare column reference, so it can be
    // used directly without materializing a temporary column.
    let key_expr_is_trivial =
        |c: &ExprIR, ea: &mut Arena<AExpr>| matches!(ea.get(c.node()), AExpr::Column(_));
    let (phys_output, key_col_name) = if use_row_encoding {
        let key_col_name = unique_column_name();
        let tfc = ToFieldContext::new(expr_arena, input_schema);
        // Per-key sort flags are forwarded only when they are known for
        // *every* key (collect::<Option<Vec<_>>> yields None otherwise).
        let sorted_descending =
            keys_sorted.and_then(|v| v.iter().map(|s| s.descending).collect::<Option<Vec<_>>>());
        let sorted_nulls_last =
            keys_sorted.and_then(|v| v.iter().map(|s| s.nulls_last).collect::<Option<Vec<_>>>());
        let expr_dtype = |e: &ExprIR| expr_arena.get(e.node()).to_dtype(&tfc);
        let row_encode_col_expr = AExprBuilder::row_encode(
            key_exprs.clone(),
            key_exprs.iter().map(expr_dtype).try_collect_vec()?,
            RowEncodingVariant::Ordered {
                descending: sorted_descending,
                nulls_last: sorted_nulls_last,
                broadcast_nulls,
            },
            expr_arena,
        )
        .expr_ir(key_col_name.clone());
        // The single row-encoded column replaces all individual key exprs.
        key_exprs.clear();
        key_exprs.push(row_encode_col_expr);
        let output =
            build_hstack_stream(phys_input, &key_exprs, expr_arena, phys_sm, expr_cache, ctx)?;
        (output, Some(key_col_name))
    } else if !key_expr_is_trivial(&key_exprs[0], expr_arena) {
        // Single non-trivial key expression: materialize it under a fresh
        // unique temporary name so downstream nodes can refer to it by name.
        let key_col_name = unique_column_name();
        key_exprs[0] = key_exprs[0].with_alias(key_col_name.clone());
        let output =
            build_hstack_stream(phys_input, &key_exprs, expr_arena, phys_sm, expr_cache, ctx)?;
        (output, Some(key_col_name))
    } else {
        // Single bare column reference: no temporary key column is needed.
        (phys_input, None)
    };
    Ok((phys_output, key_exprs, key_col_name))
}
1467
1468