Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/physical_plan/lower_ir.rs
6939 views
1
use std::sync::Arc;
2
3
use parking_lot::Mutex;
4
use polars_core::frame::{DataFrame, UniqueKeepStrategy};
5
use polars_core::prelude::{DataType, PlHashMap, PlHashSet};
6
use polars_core::scalar::Scalar;
7
use polars_core::schema::Schema;
8
use polars_core::{SchemaExtPl, config};
9
use polars_error::{PolarsResult, polars_bail};
10
use polars_expr::state::ExecutionState;
11
use polars_mem_engine::create_physical_plan;
12
use polars_plan::constants::get_literal_name;
13
use polars_plan::dsl::default_values::DefaultFieldValues;
14
use polars_plan::dsl::deletion::DeletionFilesList;
15
use polars_plan::dsl::{
16
ExtraColumnsPolicy, FileScanIR, FileSinkType, PartitionSinkTypeIR, PartitionVariantIR,
17
SinkTypeIR,
18
};
19
use polars_plan::plans::expr_ir::{ExprIR, OutputName};
20
use polars_plan::plans::{AExpr, FunctionIR, IR, IRAggExpr, LiteralValue, write_ir_non_recursive};
21
use polars_plan::prelude::GroupbyOptions;
22
use polars_utils::arena::{Arena, Node};
23
use polars_utils::itertools::Itertools;
24
use polars_utils::pl_str::PlSmallStr;
25
use polars_utils::slice_enum::Slice;
26
use polars_utils::unique_id::UniqueId;
27
use polars_utils::{IdxSize, unique_column_name};
28
use slotmap::SlotMap;
29
30
use super::lower_expr::build_hstack_stream;
31
use super::{PhysNode, PhysNodeKey, PhysNodeKind, PhysStream};
32
use crate::nodes::io_sources::multi_scan;
33
use crate::nodes::io_sources::multi_scan::components::forbid_extra_columns::ForbidExtraColumns;
34
use crate::nodes::io_sources::multi_scan::components::projection::builder::ProjectionBuilder;
35
use crate::nodes::io_sources::multi_scan::reader_interface::builder::FileReaderBuilder;
36
use crate::physical_plan::lower_expr::{ExprCache, build_select_stream, lower_exprs};
37
use crate::physical_plan::lower_group_by::build_group_by_stream;
38
use crate::utils::late_materialized_df::LateMaterializedDataFrame;
39
40
/// Creates a new PhysStream which outputs a slice of the input stream.
41
pub fn build_slice_stream(
42
input: PhysStream,
43
offset: i64,
44
length: usize,
45
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
46
) -> PhysStream {
47
if offset >= 0 {
48
let offset = offset as usize;
49
PhysStream::first(phys_sm.insert(PhysNode::new(
50
phys_sm[input.node].output_schema.clone(),
51
PhysNodeKind::StreamingSlice {
52
input,
53
offset,
54
length,
55
},
56
)))
57
} else {
58
PhysStream::first(phys_sm.insert(PhysNode::new(
59
phys_sm[input.node].output_schema.clone(),
60
PhysNodeKind::NegativeSlice {
61
input,
62
offset,
63
length,
64
},
65
)))
66
}
67
}
68
69
/// Creates a new PhysStream which is filters the input stream.
70
pub(super) fn build_filter_stream(
71
input: PhysStream,
72
predicate: ExprIR,
73
expr_arena: &mut Arena<AExpr>,
74
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
75
expr_cache: &mut ExprCache,
76
ctx: StreamingLowerIRContext,
77
) -> PolarsResult<PhysStream> {
78
let predicate = predicate;
79
let cols_and_predicate = phys_sm[input.node]
80
.output_schema
81
.iter_names()
82
.cloned()
83
.map(|name| {
84
ExprIR::new(
85
expr_arena.add(AExpr::Column(name.clone())),
86
OutputName::ColumnLhs(name),
87
)
88
})
89
.chain([predicate])
90
.collect_vec();
91
let (trans_input, mut trans_cols_and_predicate) = lower_exprs(
92
input,
93
&cols_and_predicate,
94
expr_arena,
95
phys_sm,
96
expr_cache,
97
ctx,
98
)?;
99
100
let filter_schema = phys_sm[trans_input.node].output_schema.clone();
101
let filter = PhysNodeKind::Filter {
102
input: trans_input,
103
predicate: trans_cols_and_predicate.last().unwrap().clone(),
104
};
105
106
let post_filter = phys_sm.insert(PhysNode::new(filter_schema, filter));
107
trans_cols_and_predicate.pop(); // Remove predicate.
108
build_select_stream(
109
PhysStream::first(post_filter),
110
&trans_cols_and_predicate,
111
expr_arena,
112
phys_sm,
113
expr_cache,
114
ctx,
115
)
116
}
117
118
/// Creates a new PhysStream with row index attached with the given name.
119
pub fn build_row_idx_stream(
120
input: PhysStream,
121
name: PlSmallStr,
122
offset: Option<IdxSize>,
123
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
124
) -> PhysStream {
125
let input_schema = &phys_sm[input.node].output_schema;
126
let mut output_schema = (**input_schema).clone();
127
output_schema
128
.insert_at_index(0, name.clone(), DataType::IDX_DTYPE)
129
.unwrap();
130
let kind = PhysNodeKind::WithRowIndex {
131
input,
132
name,
133
offset,
134
};
135
let with_row_idx_node_key = phys_sm.insert(PhysNode::new(Arc::new(output_schema), kind));
136
PhysStream::first(with_row_idx_node_key)
137
}
138
139
/// Options threaded through the entire IR -> physical-plan lowering pass.
#[derive(Debug, Clone, Copy)]
pub struct StreamingLowerIRContext {
    /// When true, lowering also renders textual `format_str` descriptions for
    /// nodes that fall back to in-memory execution (see the `InMemoryMap`
    /// construction sites in this file), for use in plan visualization.
    pub prepare_visualization: bool,
}
143
144
/// Recursively lowers the logical plan rooted at `node` into physical
/// streaming nodes inserted into `phys_sm`, returning the `PhysStream` that
/// produces this node's output.
///
/// Most match arms build a `PhysNodeKind` which is inserted into `phys_sm`
/// at the end of the function with `output_schema`; arms that need extra
/// surrounding nodes (slices, selects, filters, ...) `return` a finished
/// stream directly instead.
///
/// * `schema_cache` memoizes `IR::schema_with_cache` lookups.
/// * `cache_nodes` maps `IR::Cache` ids to already-lowered streams so shared
///   subplans are lowered only once.
#[recursive::recursive]
#[allow(clippy::too_many_arguments)]
pub fn lower_ir(
    node: Node,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
    schema_cache: &mut PlHashMap<Node, Arc<Schema>>,
    expr_cache: &mut ExprCache,
    cache_nodes: &mut PlHashMap<UniqueId, PhysStream>,
    ctx: StreamingLowerIRContext,
) -> PolarsResult<PhysStream> {
    // Helper macro to simplify recursive calls.
    macro_rules! lower_ir {
        ($input:expr) => {
            lower_ir(
                $input,
                ir_arena,
                expr_arena,
                phys_sm,
                schema_cache,
                expr_cache,
                cache_nodes,
                ctx,
            )
        };
    }

    let ir_node = ir_arena.get(node);
    let output_schema = IR::schema_with_cache(node, ir_arena, schema_cache);
    let node_kind = match ir_node {
        IR::SimpleProjection { input, columns } => {
            let columns = columns.iter_names_cloned().collect::<Vec<_>>();
            let phys_input = lower_ir!(*input)?;
            PhysNodeKind::SimpleProjection {
                input: phys_input,
                columns,
            }
        },

        IR::Select { input, expr, .. } => {
            let selectors = expr.clone();
            let phys_input = lower_ir!(*input)?;
            return build_select_stream(
                phys_input, &selectors, expr_arena, phys_sm, expr_cache, ctx,
            );
        },

        IR::HStack { input, exprs, .. } => {
            let exprs = exprs.to_vec();
            let phys_input = lower_ir!(*input)?;
            return build_hstack_stream(phys_input, &exprs, expr_arena, phys_sm, expr_cache, ctx);
        },

        IR::Slice { input, offset, len } => {
            let offset = *offset;
            let len = *len as usize;
            let phys_input = lower_ir!(*input)?;
            return Ok(build_slice_stream(phys_input, offset, len, phys_sm));
        },

        IR::Filter { input, predicate } => {
            let predicate = predicate.clone();
            let phys_input = lower_ir!(*input)?;
            return build_filter_stream(
                phys_input, predicate, expr_arena, phys_sm, expr_cache, ctx,
            );
        },

        IR::DataFrameScan {
            df,
            output_schema: projection,
            schema,
            ..
        } => {
            let schema = schema.clone(); // This is initially the schema of df, but can change with the projection.
            let mut node_kind = PhysNodeKind::InMemorySource { df: df.clone() };

            // Do we need to apply a projection?
            if let Some(projection_schema) = projection {
                // Only insert a projection node if the projected columns differ
                // (in set or order) from the frame's own schema.
                if projection_schema.len() != schema.len()
                    || projection_schema
                        .iter_names()
                        .zip(schema.iter_names())
                        .any(|(l, r)| l != r)
                {
                    let phys_input = phys_sm.insert(PhysNode::new(schema, node_kind));
                    node_kind = PhysNodeKind::SimpleProjection {
                        input: PhysStream::first(phys_input),
                        columns: projection_schema.iter_names_cloned().collect::<Vec<_>>(),
                    };
                }
            }

            node_kind
        },

        IR::Sink { input, payload } => match payload {
            SinkTypeIR::Memory => {
                let phys_input = lower_ir!(*input)?;
                PhysNodeKind::InMemorySink { input: phys_input }
            },
            SinkTypeIR::File(FileSinkType {
                target,
                sink_options,
                file_type,
                cloud_options,
            }) => {
                // Clone everything out of the IR node first so the recursive
                // `lower_ir!` call below can borrow `ir_arena` mutably.
                let target = target.clone();
                let sink_options = sink_options.clone();
                let file_type = file_type.clone();
                let cloud_options = cloud_options.clone();

                let phys_input = lower_ir!(*input)?;
                PhysNodeKind::FileSink {
                    target,
                    sink_options,
                    file_type,
                    input: phys_input,
                    cloud_options,
                }
            },
            SinkTypeIR::Partition(PartitionSinkTypeIR {
                base_path,
                file_path_cb,
                sink_options,
                variant,
                file_type,
                cloud_options,
                per_partition_sort_by,
                finish_callback,
            }) => {
                let base_path = base_path.clone();
                let file_path_cb = file_path_cb.clone();
                let sink_options = sink_options.clone();
                let variant = variant.clone();
                let file_type = file_type.clone();
                let cloud_options = cloud_options.clone();
                let per_partition_sort_by = per_partition_sort_by.clone();
                let finish_callback = finish_callback.clone();

                let mut input = lower_ir!(*input)?;
                match &variant {
                    PartitionVariantIR::MaxSize(_) => {},
                    PartitionVariantIR::Parted {
                        key_exprs,
                        include_key: _,
                    }
                    | PartitionVariantIR::ByKey {
                        key_exprs,
                        include_key: _,
                    } => {
                        if key_exprs.is_empty() {
                            polars_bail!(InvalidOperation: "cannot partition by-key without key expressions");
                        }

                        // Materialize the key expressions as extra columns on
                        // top of the input so the sink can partition by them.
                        let input_schema = &phys_sm[input.node].output_schema;
                        let mut select_output_schema = input_schema.as_ref().clone();
                        for key_expr in key_exprs.iter() {
                            select_output_schema.insert(
                                key_expr.output_name().clone(),
                                key_expr.dtype(input_schema.as_ref(), expr_arena)?.clone(),
                            );
                        }

                        let select_output_schema = Arc::new(select_output_schema);
                        let node = phys_sm.insert(PhysNode {
                            output_schema: select_output_schema,
                            kind: PhysNodeKind::Select {
                                input,
                                selectors: key_exprs.clone(),
                                extend_original: true,
                            },
                        });
                        input = PhysStream::first(node);
                    },
                };

                PhysNodeKind::PartitionSink {
                    input,
                    base_path,
                    file_path_cb,
                    sink_options,
                    variant,
                    file_type,
                    cloud_options,
                    per_partition_sort_by,
                    finish_callback,
                }
            },
        },

        IR::SinkMultiple { inputs } => {
            let mut sinks = Vec::with_capacity(inputs.len());
            for input in inputs.clone() {
                // Non-sink inputs are wrapped in a memory sink so every child
                // of SinkMultiple is a sink node.
                let phys_node_stream = match ir_arena.get(input) {
                    IR::Sink { .. } => lower_ir!(input)?,
                    _ => lower_ir!(ir_arena.add(IR::Sink {
                        input,
                        payload: SinkTypeIR::Memory
                    }))?,
                };
                sinks.push(phys_node_stream.node);
            }
            PhysNodeKind::SinkMultiple { sinks }
        },

        #[cfg(feature = "merge_sorted")]
        IR::MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let input_left = *input_left;
            let input_right = *input_right;
            let key = key.clone();

            let mut phys_left = lower_ir!(input_left)?;
            let mut phys_right = lower_ir!(input_right)?;

            let left_schema = &phys_sm[phys_left.node].output_schema;
            let right_schema = &phys_sm[phys_right.node].output_schema;

            left_schema.ensure_is_exact_match(right_schema).unwrap();

            let key_dtype = left_schema.try_get(key.as_str())?.clone();

            let key_name = unique_column_name();
            use polars_plan::plans::{AExprBuilder, RowEncodingVariant};

            // Add the key column as the last column for both inputs.
            for s in [&mut phys_left, &mut phys_right] {
                let key_dtype = key_dtype.clone();
                let mut expr = AExprBuilder::col(key.clone(), expr_arena);
                // Nested dtypes are row-encoded so the merge can compare keys
                // with a plain ordering.
                if key_dtype.is_nested() {
                    expr = expr.row_encode_unary(
                        RowEncodingVariant::Ordered {
                            descending: None,
                            nulls_last: None,
                        },
                        key_dtype,
                        expr_arena,
                    );
                }

                *s = build_hstack_stream(
                    *s,
                    &[expr.expr_ir(key_name.clone())],
                    expr_arena,
                    phys_sm,
                    expr_cache,
                    ctx,
                )?;
            }

            PhysNodeKind::MergeSorted {
                input_left: phys_left,
                input_right: phys_right,
            }
        },

        IR::MapFunction { input, function } => {
            let function = function.clone();
            let phys_input = lower_ir!(*input)?;

            match function {
                // Row index has a dedicated streaming node.
                FunctionIR::RowIndex {
                    name,
                    offset,
                    schema: _,
                } => PhysNodeKind::WithRowIndex {
                    input: phys_input,
                    name,
                    offset,
                },

                // Streamable functions run per-morsel.
                function if function.is_streamable() => {
                    let map = Arc::new(move |df| function.evaluate(df));
                    PhysNodeKind::Map {
                        input: phys_input,
                        map,
                    }
                },

                // Everything else falls back to an in-memory map over the
                // fully materialized input.
                function => {
                    let format_str = ctx.prepare_visualization.then(|| {
                        let mut buffer = String::new();
                        write_ir_non_recursive(
                            &mut buffer,
                            ir_arena.get(node),
                            expr_arena,
                            phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),
                            0,
                        )
                        .unwrap();
                        buffer
                    });
                    let map = Arc::new(move |df| function.evaluate(df));
                    PhysNodeKind::InMemoryMap {
                        input: phys_input,
                        map,
                        format_str,
                    }
                },
            }
        },

        IR::Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            let slice = *slice;
            let mut by_column = by_column.clone();
            let mut sort_options = sort_options.clone();
            let phys_input = lower_ir!(*input)?;

            // See if we can insert a top k.
            // A zero-offset slice and/or an explicit sort limit both bound the
            // number of rows we actually need from the sort.
            let mut limit = u64::MAX;
            if let Some((0, l)) = slice {
                limit = limit.min(l as u64);
            }
            #[allow(clippy::unnecessary_cast)]
            if let Some(l) = sort_options.limit {
                limit = limit.min(l as u64);
            };

            let mut stream = phys_input;
            if limit < u64::MAX {
                // If we need to maintain order augment with row index.
                if sort_options.maintain_order {
                    let row_idx_name = unique_column_name();
                    stream = build_row_idx_stream(stream, row_idx_name.clone(), None, phys_sm);

                    // Add row index to sort columns.
                    let row_idx_node = expr_arena.add(AExpr::Column(row_idx_name.clone()));
                    by_column.push(ExprIR::new(
                        row_idx_node,
                        OutputName::ColumnLhs(row_idx_name),
                    ));
                    sort_options.descending.push(false);
                    sort_options.nulls_last.push(true);

                    // No longer needed for the actual sort itself, handled by row index.
                    sort_options.maintain_order = false;
                }

                // The k for top-k is fed in as an input-independent select
                // producing a single literal.
                let k_node =
                    expr_arena.add(AExpr::Literal(LiteralValue::Scalar(Scalar::from(limit))));
                let k_selector = ExprIR::from_node(k_node, expr_arena);
                let k_output_schema =
                    Schema::from_iter([(get_literal_name().clone(), DataType::UInt64)]);
                let k_node = phys_sm.insert(PhysNode::new(
                    Arc::new(k_output_schema),
                    PhysNodeKind::InputIndependentSelect {
                        selectors: vec![k_selector],
                    },
                ));

                let trans_by_column;
                (stream, trans_by_column) =
                    lower_exprs(stream, &by_column, expr_arena, phys_sm, expr_cache, ctx)?;

                stream = PhysStream::first(phys_sm.insert(PhysNode {
                    output_schema: phys_sm[stream.node].output_schema.clone(),
                    kind: PhysNodeKind::TopK {
                        input: stream,
                        k: PhysStream::first(k_node),
                        by_column: trans_by_column,
                        // TopK uses `reverse` with the opposite convention of
                        // `descending`, hence the negation.
                        reverse: sort_options.descending.iter().map(|x| !x).collect(),
                        nulls_last: sort_options.nulls_last.clone(),
                    },
                }));
            }

            stream = PhysStream::first(phys_sm.insert(PhysNode {
                output_schema: phys_sm[stream.node].output_schema.clone(),
                kind: PhysNodeKind::Sort {
                    input: stream,
                    by_column,
                    slice,
                    sort_options,
                },
            }));

            // Remove any temporary columns we may have added.
            let exprs: Vec<_> = output_schema
                .iter_names()
                .map(|name| {
                    let node = expr_arena.add(AExpr::Column(name.clone()));
                    ExprIR::new(node, OutputName::ColumnLhs(name.clone()))
                })
                .collect();
            stream = build_select_stream(stream, &exprs, expr_arena, phys_sm, expr_cache, ctx)?;

            return Ok(stream);
        },

        IR::Union { inputs, options } => {
            let options = *options;
            let inputs = inputs
                .clone() // Needed to borrow ir_arena mutably.
                .into_iter()
                .map(|input| lower_ir!(input))
                .collect::<Result<_, _>>()?;

            let node = phys_sm.insert(PhysNode {
                output_schema,
                kind: PhysNodeKind::OrderedUnion { inputs },
            });
            let mut stream = PhysStream::first(node);
            if let Some((offset, length)) = options.slice {
                stream = build_slice_stream(stream, offset, length, phys_sm);
            }
            return Ok(stream);
        },

        IR::HConcat {
            inputs,
            schema: _,
            options: _,
        } => {
            let inputs = inputs
                .clone() // Needed to borrow ir_arena mutably.
                .into_iter()
                .map(|input| lower_ir!(input))
                .collect::<Result<_, _>>()?;
            PhysNodeKind::Zip {
                inputs,
                null_extend: true,
            }
        },

        v @ IR::Scan { .. } => {
            // Clone the whole Scan node up front; the let-else only exists to
            // destructure the clone.
            let IR::Scan {
                sources: scan_sources,
                file_info,
                mut hive_parts,
                output_schema: _,
                scan_type,
                predicate,
                unified_scan_args,
            } = v.clone()
            else {
                unreachable!();
            };

            if scan_sources.is_empty()
                || unified_scan_args
                    .pre_slice
                    .as_ref()
                    .is_some_and(|slice| slice.len() == 0)
            {
                if config::verbose() {
                    eprintln!("lower_ir: scan IR had empty sources")
                }

                // If there are no sources, just provide an empty in-memory source with the right
                // schema.
                PhysNodeKind::InMemorySource {
                    df: Arc::new(DataFrame::empty_with_schema(output_schema.as_ref())),
                }
            } else {
                // Pick the per-format reader builder for the multiscan node.
                let file_reader_builder = match &*scan_type {
                    #[cfg(feature = "parquet")]
                    FileScanIR::Parquet {
                        options,
                        metadata: first_metadata,
                    } => Arc::new(
                        crate::nodes::io_sources::parquet::builder::ParquetReaderBuilder {
                            options: Arc::new(options.clone()),
                            first_metadata: first_metadata.clone(),
                        },
                    ) as Arc<dyn FileReaderBuilder>,

                    #[cfg(feature = "ipc")]
                    FileScanIR::Ipc {
                        options: polars_io::ipc::IpcScanOptions {},
                        metadata: first_metadata,
                    } => Arc::new(crate::nodes::io_sources::ipc::builder::IpcReaderBuilder {
                        first_metadata: first_metadata.clone(),
                    }) as Arc<dyn FileReaderBuilder>,

                    #[cfg(feature = "csv")]
                    FileScanIR::Csv { options } => {
                        Arc::new(Arc::new(options.clone())) as Arc<dyn FileReaderBuilder>
                    },

                    #[cfg(feature = "json")]
                    FileScanIR::NDJson { options } => {
                        Arc::new(Arc::new(options.clone())) as Arc<dyn FileReaderBuilder>
                    },

                    #[cfg(feature = "python")]
                    FileScanIR::PythonDataset {
                        dataset_object: _,
                        cached_ir,
                    } => {
                        use crate::physical_plan::io::python_dataset::python_dataset_scan_to_reader_builder;
                        let guard = cached_ir.lock().unwrap();

                        let expanded_scan = guard
                            .as_ref()
                            .expect("python dataset should be resolved")
                            .python_scan()
                            .expect("should be python scan");

                        python_dataset_scan_to_reader_builder(expanded_scan)
                    },

                    FileScanIR::Anonymous { .. } => todo!("unimplemented: AnonymousScan"),
                };

                {
                    let cloud_options = &unified_scan_args.cloud_options;
                    // Debug escape hatch: force an empty projection.
                    let output_schema =
                        if std::env::var("POLARS_FORCE_EMPTY_PROJECT").as_deref() == Ok("1") {
                            Default::default()
                        } else {
                            output_schema
                        };

                    let cloud_options = cloud_options.clone().map(Arc::new);
                    let file_schema = file_info.schema;

                    let (projected_schema, file_schema) =
                        multi_scan::functions::resolve_projections::resolve_projections(
                            &output_schema,
                            &file_schema,
                            &mut hive_parts,
                            unified_scan_args
                                .row_index
                                .as_ref()
                                .map(|ri| ri.name.as_str()),
                            unified_scan_args
                                .include_file_paths
                                .as_ref()
                                .map(|x| x.as_str()),
                        );

                    let file_projection_builder = ProjectionBuilder::new(
                        projected_schema,
                        unified_scan_args.column_mapping.as_ref(),
                        unified_scan_args
                            .default_values
                            .filter(|DefaultFieldValues::Iceberg(v)| !v.is_empty())
                            .map(|DefaultFieldValues::Iceberg(v)| v),
                    );

                    // TODO: We ignore the parameter for some scan types to maintain old behavior,
                    // as they currently don't expose an API for it to be configured.
                    let extra_columns_policy = match &*scan_type {
                        #[cfg(feature = "parquet")]
                        FileScanIR::Parquet { .. } => unified_scan_args.extra_columns_policy,

                        _ => {
                            if unified_scan_args.projection.is_some() {
                                ExtraColumnsPolicy::Ignore
                            } else {
                                ExtraColumnsPolicy::Raise
                            }
                        },
                    };

                    let forbid_extra_columns = ForbidExtraColumns::opt_new(
                        &extra_columns_policy,
                        &file_schema,
                        unified_scan_args.column_mapping.as_ref(),
                    );

                    // Build the multiscan node with row_index / pre_slice /
                    // predicate initially unset; they are pushed in below.
                    let mut multi_scan_node = PhysNodeKind::MultiScan {
                        scan_sources,
                        file_reader_builder,
                        cloud_options,
                        file_projection_builder,
                        output_schema: output_schema.clone(),
                        row_index: None,
                        pre_slice: None,
                        predicate: None,
                        hive_parts,
                        cast_columns_policy: unified_scan_args.cast_columns_policy,
                        missing_columns_policy: unified_scan_args.missing_columns_policy,
                        forbid_extra_columns,
                        include_file_paths: unified_scan_args.include_file_paths,
                        // Set to None if empty for performance.
                        deletion_files: DeletionFilesList::filter_empty(
                            unified_scan_args.deletion_files,
                        ),
                        file_schema,
                    };

                    // Re-borrow the fields we just set so we can mutate them
                    // in place.
                    let PhysNodeKind::MultiScan {
                        output_schema: multi_scan_output_schema,
                        row_index: row_index_to_multiscan,
                        pre_slice: pre_slice_to_multiscan,
                        predicate: predicate_to_multiscan,
                        ..
                    } = &mut multi_scan_node
                    else {
                        unreachable!()
                    };

                    let pre_slice = unified_scan_args.pre_slice.clone();

                    // Anything left in the `*_post` variables after the
                    // pushdown decisions below is applied on top of the
                    // multiscan node instead.
                    let mut row_index_post = unified_scan_args.row_index;
                    let mut pre_slice_post = pre_slice.clone();
                    let mut predicate_post = predicate;

                    // Always send predicate and slice to multiscan as they can be used to prune files. If the
                    // underlying reader does not support predicates, multiscan will apply it in post.
                    *predicate_to_multiscan = predicate_post.take();
                    // * Negative slice is resolved internally by the multiscan.
                    // * Note that is done via a row-count pass
                    *pre_slice_to_multiscan = pre_slice_post.take();

                    // * If a predicate was pushed then we always push row index
                    if predicate_to_multiscan.is_some()
                        || matches!(pre_slice, Some(Slice::Negative { .. }))
                    {
                        *row_index_to_multiscan = row_index_post.take();
                    }

                    // TODO
                    // Projection pushdown could change the row index column position. Ideally it shouldn't,
                    // and instead just put a projection on top of the scan node in the IR. But for now
                    // we do that step here.
                    let mut schema_after_row_index_post = multi_scan_output_schema.clone();
                    let mut reorder_after_row_index_post = false;

                    // Remove row index from multiscan schema if not pushed.
                    if let Some(ri) = row_index_post.as_ref() {
                        let row_index_post_position =
                            multi_scan_output_schema.index_of(&ri.name).unwrap();
                        let (_, dtype) = Arc::make_mut(multi_scan_output_schema)
                            .shift_remove_index(row_index_post_position)
                            .unwrap();

                        // If the row index was not at the front we'll need a
                        // projection after re-attaching it to restore order.
                        if row_index_post_position != 0 {
                            reorder_after_row_index_post = true;
                            let mut schema =
                                Schema::with_capacity(multi_scan_output_schema.len() + 1);
                            schema.extend([(ri.name.clone(), dtype)]);
                            schema.extend(
                                multi_scan_output_schema
                                    .iter()
                                    .map(|(k, v)| (k.clone(), v.clone())),
                            );
                            schema_after_row_index_post = Arc::new(schema);
                        }
                    }

                    // If we have no predicate and no slice or positive slice, we can reorder the row index to after
                    // the slice by adjusting the offset. This can remove a serial synchronization step in multiscan
                    // and allow the reader to still skip rows.
                    let row_index_post_after_slice = (|| {
                        let mut row_index = row_index_post.take()?;

                        let positive_offset = match pre_slice {
                            Some(Slice::Positive { offset, .. }) => Some(offset),
                            None => Some(0),
                            // Negative slices always pushed the row index to
                            // multiscan above, so `take()` already bailed out.
                            Some(Slice::Negative { .. }) => unreachable!(),
                        }?;

                        row_index.offset = row_index.offset.saturating_add(
                            IdxSize::try_from(positive_offset).unwrap_or(IdxSize::MAX),
                        );

                        Some(row_index)
                    })();

                    let mut stream = {
                        let node_key = phys_sm.insert(PhysNode::new(
                            multi_scan_output_schema.clone(),
                            multi_scan_node,
                        ));
                        PhysStream::first(node_key)
                    };

                    // Re-attach the row index before the slice (if it could
                    // not be moved after it).
                    if let Some(ri) = row_index_post {
                        let node = PhysNodeKind::WithRowIndex {
                            input: stream,
                            name: ri.name,
                            offset: Some(ri.offset),
                        };

                        let node_key = phys_sm.insert(PhysNode {
                            output_schema: schema_after_row_index_post.clone(),
                            kind: node,
                        });

                        stream = PhysStream::first(node_key);

                        if reorder_after_row_index_post {
                            let node = PhysNodeKind::SimpleProjection {
                                input: stream,
                                columns: output_schema.iter_names_cloned().collect(),
                            };

                            let node_key = phys_sm.insert(PhysNode {
                                output_schema: output_schema.clone(),
                                kind: node,
                            });

                            stream = PhysStream::first(node_key);
                        }
                    }

                    if let Some(pre_slice) = pre_slice_post {
                        // TODO: Use native Slice enum in the slice node.
                        let (offset, length) = <(i64, usize)>::try_from(pre_slice).unwrap();
                        stream = build_slice_stream(stream, offset, length, phys_sm);
                    }

                    // Re-attach the row index after the slice, with its offset
                    // already adjusted by the slice offset.
                    if let Some(ri) = row_index_post_after_slice {
                        let node = PhysNodeKind::WithRowIndex {
                            input: stream,
                            name: ri.name,
                            offset: Some(ri.offset),
                        };

                        let node_key = phys_sm.insert(PhysNode {
                            output_schema: schema_after_row_index_post,
                            kind: node,
                        });

                        stream = PhysStream::first(node_key);

                        if reorder_after_row_index_post {
                            let node = PhysNodeKind::SimpleProjection {
                                input: stream,
                                columns: output_schema.iter_names_cloned().collect(),
                            };

                            let node_key = phys_sm.insert(PhysNode {
                                output_schema: output_schema.clone(),
                                kind: node,
                            });

                            stream = PhysStream::first(node_key);
                        }
                    }

                    // Any predicate not pushed into the multiscan is applied
                    // as a filter on top.
                    if let Some(predicate) = predicate_post {
                        stream = build_filter_stream(
                            stream, predicate, expr_arena, phys_sm, expr_cache, ctx,
                        )?;
                    }

                    return Ok(stream);
                }
            }
        },

        #[cfg(feature = "python")]
        IR::PythonScan { options } => PhysNodeKind::PythonScan {
            options: options.clone(),
        },

        IR::Cache { input, id } => {
            let id = *id;
            // Shared subplans: lower once, reuse the resulting stream.
            if let Some(cached) = cache_nodes.get(&id) {
                return Ok(*cached);
            }

            let phys_input = lower_ir!(*input)?;
            cache_nodes.insert(id, phys_input);
            return Ok(phys_input);
        },

        IR::GroupBy {
            input,
            keys,
            aggs,
            schema: output_schema,
            apply,
            maintain_order,
            options,
        } => {
            let input = *input;
            let keys = keys.clone();
            let aggs = aggs.clone();
            let output_schema = output_schema.clone();
            let apply = apply.clone();
            let maintain_order = *maintain_order;
            let options = options.clone();

            let phys_input = lower_ir!(input)?;
            return build_group_by_stream(
                phys_input,
                &keys,
                &aggs,
                output_schema,
                maintain_order,
                options,
                apply,
                expr_arena,
                phys_sm,
                expr_cache,
                ctx,
            );
        },
        IR::Join {
            input_left,
            input_right,
            schema: _,
            left_on,
            right_on,
            options,
        } => {
            let input_left = *input_left;
            let input_right = *input_right;
            let left_on = left_on.clone();
            let right_on = right_on.clone();
            let args = options.args.clone();
            let options = options.options.clone();
            let phys_left = lower_ir!(input_left)?;
            let phys_right = lower_ir!(input_right)?;
            // Streaming equi/semi-anti joins, unless validation requires
            // checks (which the streaming nodes don't perform).
            if (args.how.is_equi() || args.how.is_semi_anti()) && !args.validation.needs_checks() {
                // When lowering the expressions for the keys we need to ensure we keep around the
                // payload columns, otherwise the input nodes can get replaced by input-independent
                // nodes since the lowering code does not see we access any non-literal expressions.
                // So we add dummy expressions before lowering and remove them afterwards.
                let mut aug_left_on = left_on.clone();
                for name in phys_sm[phys_left.node].output_schema.iter_names() {
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    aug_left_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone())));
                }
                let mut aug_right_on = right_on.clone();
                for name in phys_sm[phys_right.node].output_schema.iter_names() {
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    aug_right_on.push(ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone())));
                }
                let (trans_input_left, mut trans_left_on) = lower_exprs(
                    phys_left,
                    &aug_left_on,
                    expr_arena,
                    phys_sm,
                    expr_cache,
                    ctx,
                )?;
                let (trans_input_right, mut trans_right_on) = lower_exprs(
                    phys_right,
                    &aug_right_on,
                    expr_arena,
                    phys_sm,
                    expr_cache,
                    ctx,
                )?;
                // Drop the dummy payload expressions again.
                trans_left_on.drain(left_on.len()..);
                trans_right_on.drain(right_on.len()..);

                let node = if args.how.is_equi() {
                    phys_sm.insert(PhysNode::new(
                        output_schema,
                        PhysNodeKind::EquiJoin {
                            input_left: trans_input_left,
                            input_right: trans_input_right,
                            left_on: trans_left_on,
                            right_on: trans_right_on,
                            args: args.clone(),
                        },
                    ))
                } else {
                    phys_sm.insert(PhysNode::new(
                        output_schema,
                        PhysNodeKind::SemiAntiJoin {
                            input_left: trans_input_left,
                            input_right: trans_input_right,
                            left_on: trans_left_on,
                            right_on: trans_right_on,
                            args: args.clone(),
                            output_bool: false,
                        },
                    ))
                };
                let mut stream = PhysStream::first(node);
                if let Some((offset, len)) = args.slice {
                    stream = build_slice_stream(stream, offset, len, phys_sm);
                }
                return Ok(stream);
            } else if args.how.is_cross() {
                let node = phys_sm.insert(PhysNode::new(
                    output_schema,
                    PhysNodeKind::CrossJoin {
                        input_left: phys_left,
                        input_right: phys_right,
                        args: args.clone(),
                    },
                ));
                let mut stream = PhysStream::first(node);
                if let Some((offset, len)) = args.slice {
                    stream = build_slice_stream(stream, offset, len, phys_sm);
                }
                return Ok(stream);
            } else {
                // Fallback: run the join with the in-memory engine.
                PhysNodeKind::InMemoryJoin {
                    input_left: phys_left,
                    input_right: phys_right,
                    left_on,
                    right_on,
                    args,
                    options,
                }
            }
        },

        IR::Distinct { input, options } => {
            let options = options.clone();
            let phys_input = lower_ir!(*input)?;

            // We don't have a dedicated distinct operator (yet), lower to group
            // by with an aggregate for each column.
            let input_schema = &phys_sm[phys_input.node].output_schema;
            if input_schema.is_empty() {
                // Can't group (or have duplicates) if dataframe has zero-width.
                return Ok(phys_input);
            }

            if options.maintain_order && options.keep_strategy == UniqueKeepStrategy::Last {
                // Unfortunately the order-preserving groupby always orders by the first occurrence
                // of the group so we can't lower this and have to fallback.
                let input_schema = phys_sm[phys_input.node].output_schema.clone();
                let lmdf = Arc::new(LateMaterializedDataFrame::default());
                let mut lp_arena = Arena::default();
                let input_lp_node = lp_arena.add(lmdf.clone().as_ir_node(input_schema));
                let distinct_lp_node = lp_arena.add(IR::Distinct {
                    input: input_lp_node,
                    options,
                });
                let executor = Mutex::new(create_physical_plan(
                    distinct_lp_node,
                    &mut lp_arena,
                    expr_arena,
                    None,
                )?);

                let format_str = ctx.prepare_visualization.then(|| {
                    let mut buffer = String::new();
                    write_ir_non_recursive(
                        &mut buffer,
                        ir_arena.get(node),
                        expr_arena,
                        phys_sm.get(phys_input.node).unwrap().output_schema.as_ref(),
                        0,
                    )
                    .unwrap();
                    buffer
                });
                // The in-memory map late-materializes its input dataframe and
                // runs the mem-engine distinct plan over it.
                let distinct_node = PhysNode {
                    output_schema,
                    kind: PhysNodeKind::InMemoryMap {
                        input: phys_input,
                        map: Arc::new(move |df| {
                            lmdf.set_materialized_dataframe(df);
                            let mut state = ExecutionState::new();
                            executor.lock().execute(&mut state)
                        }),
                        format_str,
                    },
                };

                return Ok(PhysStream::first(phys_sm.insert(distinct_node)));
            }

            // Create the key and aggregate expressions.
            let all_col_names = input_schema.iter_names().cloned().collect_vec();
            let key_names = if let Some(subset) = options.subset {
                subset.to_vec()
            } else {
                all_col_names.clone()
            };
            let key_name_set: PlHashSet<_> = key_names.iter().cloned().collect();

            let mut group_by_output_schema = Schema::with_capacity(all_col_names.len() + 1);
            let keys = key_names
                .iter()
                .map(|name| {
                    group_by_output_schema
                        .insert(name.clone(), input_schema.get(name).unwrap().clone());
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))
                })
                .collect_vec();

            // Non-key columns are aggregated with first()/last() depending on
            // the keep strategy.
            let mut aggs = all_col_names
                .iter()
                .filter(|name| !key_name_set.contains(*name))
                .map(|name| {
                    group_by_output_schema
                        .insert(name.clone(), input_schema.get(name).unwrap().clone());
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    use UniqueKeepStrategy::*;
                    let agg_expr = match options.keep_strategy {
                        First | None | Any => {
                            expr_arena.add(AExpr::Agg(IRAggExpr::First(col_expr)))
                        },
                        Last => expr_arena.add(AExpr::Agg(IRAggExpr::Last(col_expr))),
                    };
                    ExprIR::new(agg_expr, OutputName::ColumnLhs(name.clone()))
                })
                .collect_vec();

            if options.keep_strategy == UniqueKeepStrategy::None {
                // Track the length so we can filter out non-unique keys later.
                let name = unique_column_name();
                group_by_output_schema.insert(name.clone(), DataType::IDX_DTYPE);
                aggs.push(ExprIR::new(
                    expr_arena.add(AExpr::Len),
                    OutputName::Alias(name),
                ));
            }

            let mut stream = build_group_by_stream(
                phys_input,
                &keys,
                &aggs,
                Arc::new(group_by_output_schema),
                options.maintain_order,
                Arc::new(GroupbyOptions::default()),
                None,
                expr_arena,
                phys_sm,
                expr_cache,
                ctx,
            )?;

            if options.keep_strategy == UniqueKeepStrategy::None {
                // Filter to keep only those groups with length 1.
                let unique_name = aggs.last().unwrap().output_name();
                let left = expr_arena.add(AExpr::Column(unique_name.clone()));
                let right = expr_arena.add(AExpr::Literal(LiteralValue::new_idxsize(1)));
                let predicate_aexpr = expr_arena.add(AExpr::BinaryExpr {
                    left,
                    op: polars_plan::dsl::Operator::Eq,
                    right,
                });
                let predicate =
                    ExprIR::new(predicate_aexpr, OutputName::ColumnLhs(unique_name.clone()));
                stream =
                    build_filter_stream(stream, predicate, expr_arena, phys_sm, expr_cache, ctx)?;
            }

            // Restore column order and drop the temporary length column if any.
            let exprs = all_col_names
                .iter()
                .map(|name| {
                    let col_expr = expr_arena.add(AExpr::Column(name.clone()));
                    ExprIR::new(col_expr, OutputName::ColumnLhs(name.clone()))
                })
                .collect_vec();
            stream = build_select_stream(stream, &exprs, expr_arena, phys_sm, expr_cache, ctx)?;

            // We didn't pass the slice earlier to build_group_by_stream because
            // we might have the intermediate keep = "none" filter.
            if let Some((offset, length)) = options.slice {
                stream = build_slice_stream(stream, offset, length, phys_sm);
            }

            return Ok(stream);
        },
        IR::ExtContext { .. } => todo!(),
        IR::Invalid => unreachable!(),
    };

    let node_key = phys_sm.insert(PhysNode::new(output_schema, node_kind));
    Ok(PhysStream::first(node_key))
}
1212
1213