// GitHub repository: pola-rs/polars
// Path: blob/main/crates/polars-mem-engine/src/planner/lp.rs

use polars_core::POOL;
use polars_core::prelude::*;
use polars_expr::state::ExecutionState;
use polars_plan::plans::expr_ir::ExprIR;
use polars_utils::format_pl_smallstr;
use polars_utils::unique_id::UniqueId;
use recursive::recursive;

use self::expr_ir::OutputName;
use self::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};
#[cfg(feature = "python")]
use self::python_dsl::PythonScanSource;
use super::*;
use crate::ScanPredicate;
use crate::executors::{
    self, CachePrefiller, Executor, PartitionedSinkExecutor, SinkExecutor, sink_name,
};
use crate::predicate::PhysicalColumnPredicates;
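
/// Builds a streaming `Executor` for the plan rooted at the given `Node`.
/// Note (added for clarity): this is only provided when the new-streaming
/// engine is compiled in; streaming-only plans below `expect` its presence.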
pub type StreamingExecutorBuilder =
    fn(Node, &mut Arena<IR>, &mut Arena<AExpr>) -> PolarsResult<Box<dyn Executor>>;

fn partitionable_gb(
    keys: &[ExprIR],
    aggs: &[ExprIR],
    input_schema: &Schema,
    expr_arena: &Arena<AExpr>,
    apply: &Option<PlanCallback<DataFrame, DataFrame>>,
) -> bool {
    // Checks:
    // 1. complex expressions in the group_by itself are not partitionable;
    //    in this case, anything more than `col("foo")`
    // 2. a custom `apply` function cannot be partitioned
    // 3. we don't bother with more than 2 keys, as the cardinality likely
    //    explodes with the number of key combinations
    if !keys.is_empty() && keys.len() < 3 && apply.is_none() {
        // Reject complex key expressions: anything more than a bare
        // `col("foo")`, or a non-scalar literal.
        for key in keys {
            if (expr_arena).iter(key.node()).count() > 1
                || has_aexpr(key.node(), expr_arena, |ae| match ae {
                    AExpr::Literal(lv) => !lv.is_scalar(),
                    _ => false,
                })
            {
                return false;
            }
        }

        can_pre_agg_exprs(aggs, expr_arena, input_schema)
    } else {
        false
    }
}
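
// Illustrative examples for `partitionable_gb` (not from the original source):
// a plan like `df.group_by([col("a")]).agg([col("b").sum()])` passes these
// checks, while grouping by a computed key such as `col("a") + 1`, supplying
// a custom `apply` callback, or using three or more keys disables the
// partitioned path.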

#[derive(Clone)]
struct ConversionState {
    has_cache_child: bool,
    has_cache_parent: bool,
}

impl ConversionState {
    fn new() -> PolarsResult<Self> {
        Ok(ConversionState {
            has_cache_child: false,
            has_cache_parent: false,
        })
    }

    fn with_new_branch<K, F: FnOnce(&mut Self) -> K>(&mut self, func: F) -> K {
        let mut new_state = self.clone();
        new_state.has_cache_child = false;
        let out = func(&mut new_state);
        self.has_cache_child = new_state.has_cache_child;
        out
    }
}
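
// Note on `with_new_branch` (added for clarity): `func` runs with a state
// whose `has_cache_child` flag is reset, so each branch detects caches
// independently; whatever the branch discovers is merged back into the
// parent state afterwards. `has_cache_parent` is inherited unchanged via the
// clone.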

pub fn create_physical_plan(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plan = create_physical_plan_impl(
        root,
        lp_arena,
        expr_arena,
        &mut state,
        &mut cache_nodes,
        build_streaming_executor,
    )?;

    if cache_nodes.is_empty() {
        Ok(plan)
    } else {
        Ok(Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: plan,
        }))
    }
}
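
// Usage sketch (added for illustration; assumes `root`, `ir_arena`, and
// `expr_arena` come from lowering a logical plan, with no streaming builder):
//
//     let mut exec = create_physical_plan(root, &mut ir_arena, &mut expr_arena, None)?;
//     let df = exec.execute(&mut ExecutionState::default())?;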

pub struct MultiplePhysicalPlans {
    pub cache_prefiller: Option<Box<dyn Executor>>,
    pub physical_plans: Vec<Box<dyn Executor>>,
}
pub fn create_multiple_physical_plans(
    roots: &[Node],
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<MultiplePhysicalPlans> {
    let mut state = ConversionState::new()?;
    let mut cache_nodes = Default::default();
    let plans = state.with_new_branch(|new_state| {
        roots
            .iter()
            .map(|&node| {
                create_physical_plan_impl(
                    node,
                    lp_arena,
                    expr_arena,
                    new_state,
                    &mut cache_nodes,
                    build_streaming_executor,
                )
            })
            .collect::<PolarsResult<Vec<_>>>()
    })?;

    let cache_prefiller = (!cache_nodes.is_empty()).then(|| {
        struct Empty;
        impl Executor for Empty {
            fn execute(&mut self, _cache: &mut ExecutionState) -> PolarsResult<DataFrame> {
                Ok(DataFrame::empty())
            }
        }
        Box::new(CachePrefiller {
            caches: cache_nodes,
            phys_plan: Box::new(Empty),
        }) as _
    });

    Ok(MultiplePhysicalPlans {
        cache_prefiller,
        physical_plans: plans,
    })
}
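
// Design note (added for clarity): the prefiller wraps an empty placeholder
// plan because the real root plans are returned separately in
// `physical_plans`; executing the prefiller therefore only fills the caches.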

#[cfg(feature = "python")]
#[allow(clippy::type_complexity)]
pub fn python_scan_predicate(
    options: &mut PythonOptions,
    expr_arena: &Arena<AExpr>,
    state: &mut ExpressionConversionState,
) -> PolarsResult<(
    Option<Arc<dyn polars_expr::prelude::PhysicalExpr>>,
    Option<Vec<u8>>,
)> {
    let mut predicate_serialized = None;
    let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
        // Convert to a pyarrow eval string.
        if matches!(options.python_source, PythonScanSource::Pyarrow) {
            use polars_core::config::verbose_print_sensitive;

            let predicate_pa = polars_plan::plans::python::pyarrow::predicate_to_pa(
                e.node(),
                expr_arena,
                Default::default(),
            );

            verbose_print_sensitive(|| {
                format!(
                    "python_scan_predicate: \
                    predicate node: {}, \
                    converted pyarrow predicate: {}",
                    ExprIRDisplay::display_node(e.node(), expr_arena),
                    &predicate_pa.as_deref().unwrap_or("<conversion failed>")
                )
            });

            if let Some(eval_str) = predicate_pa {
                options.predicate = PythonPredicate::PyArrow(eval_str);
                // We don't have to use a physical expression as pyarrow deals with the filter.
                None
            } else {
                Some(create_physical_expr(
                    e,
                    Context::Default,
                    expr_arena,
                    &options.schema,
                    state,
                )?)
            }
        }
        // Convert to a physical expression, for the case that the reader cannot
        // consume the predicate.
        else {
            let dsl_expr = e.to_expr(expr_arena);
            predicate_serialized = polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

            Some(create_physical_expr(
                e,
                Context::Default,
                expr_arena,
                &options.schema,
                state,
            )?)
        }
    } else {
        None
    };

    Ok((predicate, predicate_serialized))
}
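
// Summary (added for clarity): for a `Pyarrow` scan source the predicate is
// handed off as a pyarrow eval string and no physical expression is needed;
// for other Python sources we keep a physical expression, plus a serialized
// copy of the DSL predicate in case the Python reader can apply it itself.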

#[recursive]
fn create_physical_plan_impl(
    root: Node,
    lp_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
    state: &mut ConversionState,
    // Cache nodes in order of discovery
    cache_nodes: &mut PlIndexMap<UniqueId, executors::CachePrefill>,
    build_streaming_executor: Option<StreamingExecutorBuilder>,
) -> PolarsResult<Box<dyn Executor>> {
    use IR::*;

    macro_rules! recurse {
        ($node:expr, $state: expr) => {
            create_physical_plan_impl(
                $node,
                lp_arena,
                expr_arena,
                $state,
                cache_nodes,
                build_streaming_executor,
            )
        };
    }

    let logical_plan = if state.has_cache_parent
        || matches!(
            lp_arena.get(root),
            IR::Scan { .. } // Needed for the streaming impl
            | IR::Cache { .. } // Needed for plans branching from the same cache node
            | IR::Sink { // Needed for the streaming impl
                payload: SinkTypeIR::Partition(_),
                ..
            }
        ) {
        lp_arena.get(root).clone()
    } else {
        lp_arena.take(root)
    };

    match logical_plan {
        #[cfg(feature = "python")]
        PythonScan { mut options } => {
            let mut expr_conv_state = ExpressionConversionState::new(true);
            let (predicate, predicate_serialized) =
                python_scan_predicate(&mut options, expr_arena, &mut expr_conv_state)?;
            Ok(Box::new(executors::PythonScanExec {
                options,
                predicate,
                predicate_serialized,
            }))
        },
        Sink { input, payload } => {
            let input = recurse!(input, state)?;
            match payload {
                SinkTypeIR::Memory => Ok(Box::new(SinkExecutor {
                    input,
                    name: "mem".to_string(),
                    f: Box::new(move |df, _state| Ok(Some(df))),
                })),
                SinkTypeIR::File(FileSinkType {
                    file_type,
                    target,
                    sink_options,
                    cloud_options,
                }) => {
                    let name = sink_name(&file_type).to_owned();
                    Ok(Box::new(SinkExecutor {
                        input,
                        name,
                        f: Box::new(move |mut df, _state| {
                            let mut file = target
                                .open_into_writeable(&sink_options, cloud_options.as_ref())?;
                            let writer = &mut *file;

                            use std::io::BufWriter;
                            match &file_type {
                                #[cfg(feature = "parquet")]
                                FileType::Parquet(options) => {
                                    use polars_io::parquet::write::ParquetWriter;
                                    ParquetWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_statistics(options.statistics)
                                        .with_row_group_size(options.row_group_size)
                                        .with_data_page_size(options.data_page_size)
                                        .with_key_value_metadata(options.key_value_metadata.clone())
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "ipc")]
                                FileType::Ipc(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::ipc::IpcWriter;
                                    IpcWriter::new(BufWriter::new(writer))
                                        .with_compression(options.compression)
                                        .with_compat_level(options.compat_level)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "csv")]
                                FileType::Csv(options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::csv::write::CsvWriter;
                                    CsvWriter::new(BufWriter::new(writer))
                                        .include_bom(options.include_bom)
                                        .include_header(options.include_header)
                                        .with_separator(options.serialize_options.separator)
                                        .with_line_terminator(
                                            options.serialize_options.line_terminator.clone(),
                                        )
                                        .with_quote_char(options.serialize_options.quote_char)
                                        .with_batch_size(options.batch_size)
                                        .with_datetime_format(
                                            options.serialize_options.datetime_format.clone(),
                                        )
                                        .with_date_format(
                                            options.serialize_options.date_format.clone(),
                                        )
                                        .with_time_format(
                                            options.serialize_options.time_format.clone(),
                                        )
                                        .with_float_scientific(
                                            options.serialize_options.float_scientific,
                                        )
                                        .with_float_precision(
                                            options.serialize_options.float_precision,
                                        )
                                        .with_decimal_comma(options.serialize_options.decimal_comma)
                                        .with_null_value(options.serialize_options.null.clone())
                                        .with_quote_style(options.serialize_options.quote_style)
                                        .finish(&mut df)?;
                                },
                                #[cfg(feature = "json")]
                                FileType::Json(_options) => {
                                    use polars_io::SerWriter;
                                    use polars_io::json::{JsonFormat, JsonWriter};

                                    JsonWriter::new(BufWriter::new(writer))
                                        .with_json_format(JsonFormat::JsonLines)
                                        .finish(&mut df)?;
                                },
                                #[allow(unreachable_patterns)]
                                _ => panic!("enable filetype feature"),
                            }

                            file.sync_on_close(sink_options.sync_on_close)?;
                            file.close()?;

                            Ok(None)
                        }),
                    }))
                },
                SinkTypeIR::Partition(_) => {
                    let builder = build_streaming_executor
                        .expect("invalid build. Missing feature new-streaming");

                    let executor = Box::new(PartitionedSinkExecutor::new(
                        input, builder, root, lp_arena, expr_arena,
                    ));

                    // Use a cache so that this runs during the cache pre-filling stage and
                    // not on the thread pool; running it there could deadlock, since the
                    // streaming engine uses the thread pool internally.
                    let mut prefill = executors::CachePrefill::new_sink(executor);
                    let exec = prefill.make_exec();
                    let existing = cache_nodes.insert(prefill.id(), prefill);

                    assert!(existing.is_none());

                    Ok(Box::new(exec))
                },
            }
        },
        SinkMultiple { .. } => {
            unreachable!("should be handled with create_multiple_physical_plans")
        },
        Union { inputs, options } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });
            let inputs = inputs?;
            Ok(Box::new(executors::UnionExec { inputs, options }))
        },
        HConcat {
            inputs, options, ..
        } => {
            let inputs = state.with_new_branch(|new_state| {
                inputs
                    .into_iter()
                    .map(|node| recurse!(node, new_state))
                    .collect::<PolarsResult<Vec<_>>>()
            });

            let inputs = inputs?;

            Ok(Box::new(executors::HConcatExec { inputs, options }))
        },
        Slice { input, offset, len } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SliceExec { input, offset, len }))
        },
        Filter { input, predicate } => {
            let streamable = is_elementwise_rec(predicate.node(), expr_arena);
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(true);
            let predicate = create_physical_expr(
                &predicate,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::FilterExec::new(
                predicate,
                input,
                state.has_windows,
                streamable,
            )))
        },
        #[allow(unused_variables)]
        Scan {
            sources,
            file_info,
            hive_parts,
            output_schema,
            scan_type,
            predicate,
            unified_scan_args,
        } => {
            let mut expr_conversion_state = ExpressionConversionState::new(true);

            let mut create_skip_batch_predicate = false;
            #[cfg(feature = "parquet")]
            {
                create_skip_batch_predicate |= matches!(
                    &*scan_type,
                    FileScanIR::Parquet {
                        options: polars_io::prelude::ParquetOptions {
                            use_statistics: true,
                            ..
                        },
                        ..
                    }
                );
            }

            let predicate = predicate
                .map(|predicate| {
                    create_scan_predicate(
                        &predicate,
                        expr_arena,
                        output_schema.as_ref().unwrap_or(&file_info.schema),
                        None, // hive_schema
                        &mut expr_conversion_state,
                        create_skip_batch_predicate,
                        false,
                    )
                })
                .transpose()?;

            match *scan_type {
                FileScanIR::Anonymous { function, .. } => {
                    Ok(Box::new(executors::AnonymousScanExec {
                        function,
                        predicate,
                        unified_scan_args,
                        file_info,
                        output_schema,
                        predicate_has_windows: expr_conversion_state.has_windows,
                    }))
                },
                #[allow(unreachable_patterns)]
                _ => {
                    // We wrap in a CacheExec so that the new-streaming scan gets called from the
                    // CachePrefiller. This ensures it is called from outside of rayon to avoid
                    // deadlocks.
                    //
                    // Note that we don't actually want it to be kept in memory after being used,
                    // so we set the count to have it be dropped after a single use (or however
                    // many times it is referenced after CSE (subplan)).
                    state.has_cache_parent = true;
                    state.has_cache_child = true;

                    let build_func = build_streaming_executor
                        .expect("invalid build. Missing feature new-streaming");

                    let executor = build_func(root, lp_arena, expr_arena)?;

                    let mut prefill = executors::CachePrefill::new_scan(executor);
                    let exec = prefill.make_exec();

                    let existing = cache_nodes.insert(prefill.id(), prefill);

                    assert!(existing.is_none());

                    Ok(Box::new(exec))
                },
                #[allow(unreachable_patterns)]
                _ => unreachable!(),
            }
        },

        Select {
            expr,
            input,
            schema: _schema,
            options,
            ..
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;
            let mut state = ExpressionConversionState::new(POOL.current_num_threads() > expr.len());
            let phys_expr = create_physical_expressions_from_irs(
                &expr,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;

            let allow_vertical_parallelism = options.should_broadcast && expr.iter().all(|e| is_elementwise_rec(e.node(), expr_arena))
                // If all expressions are literal, we would get 1 row per thread.
                && !phys_expr.iter().all(|p| {
                    p.is_literal()
                });
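
            // Illustrative example (not from the original source): a projection
            // like `df.select([lit(1), lit(2)])` is elementwise, but running it
            // vertically parallel would emit one row per thread, hence the
            // literal check above.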

            Ok(Box::new(executors::ProjectionExec {
                input,
                expr: phys_expr,
                has_windows: state.has_windows,
                input_schema,
                #[cfg(test)]
                schema: _schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        DataFrameScan {
            df, output_schema, ..
        } => Ok(Box::new(executors::DataFrameExec {
            df,
            projection: output_schema.map(|s| s.iter_names_cloned().collect()),
        })),
        Sort {
            input,
            by_column,
            slice,
            sort_options,
        } => {
            debug_assert!(!by_column.is_empty());
            let input_schema = lp_arena.get(input).schema(lp_arena);
            let by_column = create_physical_expressions_from_irs(
                &by_column,
                Context::Default,
                expr_arena,
                input_schema.as_ref(),
                &mut ExpressionConversionState::new(true),
            )?;
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::SortExec {
                input,
                by_column,
                slice,
                sort_options,
            }))
        },
        Cache { input, id } => {
            state.has_cache_parent = true;
            state.has_cache_child = true;

            if let Some(cache) = cache_nodes.get_mut(&id) {
                Ok(Box::new(cache.make_exec()))
            } else {
                let input = recurse!(input, state)?;

                let mut prefill = executors::CachePrefill::new_cache(input, id);
                let exec = prefill.make_exec();

                cache_nodes.insert(id, prefill);

                Ok(Box::new(exec))
            }
        },
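
        // Note on the `Cache` arm above (added for clarity): when CSE
        // duplicates a subplan, the duplicates share one cache `id`. The first
        // visit builds the `CachePrefill`; later visits hit the `get_mut`
        // branch and only attach another consumer via `make_exec`.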
        Distinct { input, options } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UniqueExec { input, options }))
        },
        GroupBy {
            input,
            keys,
            aggs,
            apply,
            schema,
            maintain_order,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());
            let phys_keys = create_physical_expressions_from_irs(
                &keys,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;
            let phys_aggs = create_physical_expressions_from_irs(
                &aggs,
                Context::Aggregation,
                expr_arena,
                &input_schema,
                &mut ExpressionConversionState::new(true),
            )?;

            let _slice = options.slice;
            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.dynamic {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByDynamicExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            #[cfg(feature = "dynamic_group_by")]
            if let Some(options) = options.rolling {
                let input = recurse!(input, state)?;
                return Ok(Box::new(executors::GroupByRollingExec {
                    input,
                    keys: phys_keys,
                    aggs: phys_aggs,
                    options,
                    input_schema,
                    slice: _slice,
                    apply,
                }));
            }

            // We check at the last possible moment whether the group_by can be partitioned.
            let partitionable = partitionable_gb(&keys, &aggs, &input_schema, expr_arena, &apply);
            if partitionable {
                let from_partitioned_ds = lp_arena.iter(input).any(|(_, lp)| {
                    if let Union { options, .. } = lp {
                        options.from_partitioned_ds
                    } else {
                        false
                    }
                });
                let input = recurse!(input, state)?;
                let keys = keys
                    .iter()
                    .map(|e| e.to_expr(expr_arena))
                    .collect::<Vec<_>>();
                let aggs = aggs
                    .iter()
                    .map(|e| e.to_expr(expr_arena))
                    .collect::<Vec<_>>();
                Ok(Box::new(executors::PartitionGroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    maintain_order,
                    options.slice,
                    input_schema,
                    schema,
                    from_partitioned_ds,
                    keys,
                    aggs,
                )))
            } else {
                let input = recurse!(input, state)?;
                Ok(Box::new(executors::GroupByExec::new(
                    input,
                    phys_keys,
                    phys_aggs,
                    apply,
                    maintain_order,
                    input_schema,
                    options.slice,
                )))
            }
        },
        Join {
            input_left,
            input_right,
            left_on,
            right_on,
            options,
            schema,
            ..
        } => {
            let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned();
            let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned();

            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            // TODO: remove the force option. It can deadlock.
            let parallel = if options.force_parallel {
                true
            } else {
                options.allow_parallel
            };

            let left_on = create_physical_expressions_from_irs(
                &left_on,
                Context::Default,
                expr_arena,
                &schema_left,
                &mut ExpressionConversionState::new(true),
            )?;
            let right_on = create_physical_expressions_from_irs(
                &right_on,
                Context::Default,
                expr_arena,
                &schema_right,
                &mut ExpressionConversionState::new(true),
            )?;
            let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone());

            // Convert the join options to physical join options. This requires the
            // physical planner, so we do it at the last minute.
            let join_type_options = options
                .options
                .map(|o| {
                    o.compile(|e| {
                        let phys_expr = create_physical_expr(
                            e,
                            Context::Default,
                            expr_arena,
                            &schema,
                            &mut ExpressionConversionState::new(false),
                        )?;

                        let execution_state = ExecutionState::default();

                        Ok(Arc::new(move |df: DataFrame| {
                            let mask = phys_expr.evaluate(&df, &execution_state)?;
                            let mask = mask.as_materialized_series();
                            let mask = mask.bool()?;
                            df._filter_seq(mask)
                        }))
                    })
                })
                .transpose()?;

            Ok(Box::new(executors::JoinExec::new(
                input_left,
                input_right,
                left_on,
                right_on,
                parallel,
                options.args,
                join_type_options,
            )))
        },
        HStack {
            input,
            exprs,
            schema: output_schema,
            options,
        } => {
            let input_schema = lp_arena.get(input).schema(lp_arena).into_owned();
            let input = recurse!(input, state)?;

            let allow_vertical_parallelism = options.should_broadcast
                && exprs
                    .iter()
                    .all(|e| is_elementwise_rec(e.node(), expr_arena));

            let mut state =
                ExpressionConversionState::new(POOL.current_num_threads() > exprs.len());

            let phys_exprs = create_physical_expressions_from_irs(
                &exprs,
                Context::Default,
                expr_arena,
                &input_schema,
                &mut state,
            )?;
            Ok(Box::new(executors::StackExec {
                input,
                has_windows: state.has_windows,
                exprs: phys_exprs,
                input_schema,
                output_schema,
                options,
                allow_vertical_parallelism,
            }))
        },
        MapFunction {
            input, function, ..
        } => {
            let input = recurse!(input, state)?;
            Ok(Box::new(executors::UdfExec { input, function }))
        },
        ExtContext {
            input, contexts, ..
        } => {
            let input = recurse!(input, state)?;
            let contexts = contexts
                .into_iter()
                .map(|node| recurse!(node, state))
                .collect::<PolarsResult<_>>()?;
            Ok(Box::new(executors::ExternalContext { input, contexts }))
        },
        SimpleProjection { input, columns } => {
            let input = recurse!(input, state)?;
            let exec = executors::ProjectionSimple { input, columns };
            Ok(Box::new(exec))
        },
        #[cfg(feature = "merge_sorted")]
        MergeSorted {
            input_left,
            input_right,
            key,
        } => {
            let (input_left, input_right) = state.with_new_branch(|new_state| {
                (
                    recurse!(input_left, new_state),
                    recurse!(input_right, new_state),
                )
            });
            let input_left = input_left?;
            let input_right = input_right?;

            let exec = executors::MergeSorted {
                input_left,
                input_right,
                key,
            };
            Ok(Box::new(exec))
        },
        Invalid => unreachable!(),
    }
}

pub fn create_scan_predicate(
    predicate: &ExprIR,
    expr_arena: &mut Arena<AExpr>,
    schema: &Arc<Schema>,
    hive_schema: Option<&Schema>,
    state: &mut ExpressionConversionState,
    create_skip_batch_predicate: bool,
    create_column_predicates: bool,
) -> PolarsResult<ScanPredicate> {
    let mut predicate = predicate.clone();

    let mut hive_predicate = None;
    let mut hive_predicate_is_full_predicate = false;

    #[expect(clippy::never_loop)]
    loop {
        let Some(hive_schema) = hive_schema else {
            break;
        };

        let mut hive_predicate_parts = vec![];
        let mut non_hive_predicate_parts = vec![];

        for predicate_part in MintermIter::new(predicate.node(), expr_arena) {
            if aexpr_to_leaf_names_iter(predicate_part, expr_arena)
                .all(|name| hive_schema.contains(&name))
            {
                hive_predicate_parts.push(predicate_part)
            } else {
                non_hive_predicate_parts.push(predicate_part)
            }
        }

        if hive_predicate_parts.is_empty() {
            break;
        }

        if non_hive_predicate_parts.is_empty() {
            hive_predicate_is_full_predicate = true;
            break;
        }

        {
            let mut iter = hive_predicate_parts.into_iter();
            let mut node = iter.next().unwrap();

            for next_node in iter {
                node = expr_arena.add(AExpr::BinaryExpr {
                    left: node,
                    op: Operator::And,
                    right: next_node,
                });
            }

            hive_predicate = Some(create_physical_expr(
                &ExprIR::from_node(node, expr_arena),
                Context::Default,
                expr_arena,
                schema,
                state,
            )?)
        }

        {
            let mut iter = non_hive_predicate_parts.into_iter();
            let mut node = iter.next().unwrap();

            for next_node in iter {
                node = expr_arena.add(AExpr::BinaryExpr {
                    left: node,
                    op: Operator::And,
                    right: next_node,
                });
            }

            predicate = ExprIR::from_node(node, expr_arena);
        }

        break;
    }
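
    // Illustrative example (not from the original source): with hive column
    // `year` and predicate `col("year") > 2020 & col("x") < 5`, the minterms
    // split into a hive-only part (`col("year") > 2020`), compiled into
    // `hive_predicate`, and the remainder (`col("x") < 5`), which replaces
    // `predicate` for the per-file scan.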

    let phys_predicate =
        create_physical_expr(&predicate, Context::Default, expr_arena, schema, state)?;

    if hive_predicate_is_full_predicate {
        hive_predicate = Some(phys_predicate.clone());
    }

    let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter(
        predicate.node(),
        expr_arena,
    )));

    let mut skip_batch_predicate = None;

    if create_skip_batch_predicate {
        if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {
            let expr = ExprIR::new(node, predicate.output_name_inner().clone());

            if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {
                eprintln!("predicate: {}", predicate.display(expr_arena));
                eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));
            }

            let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());

            skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);
            for (col, dtype) in schema.iter() {
                if !live_columns.contains(col) {
                    continue;
                }

                skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());
                skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());
                skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);
            }
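
            // Illustrative (not from the original source): for a single live
            // column `x: Int64`, the skip-batch schema built above is
            // `{ len: IDX_DTYPE, x_min: Int64, x_max: Int64, x_nc: IDX_DTYPE }`,
            // i.e. the per-batch row count plus min/max/null-count statistics.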

            skip_batch_predicate = Some(create_physical_expr(
                &expr,
                Context::Default,
                expr_arena,
                &Arc::new(skip_batch_schema),
                state,
            )?);
        }
    }

    let column_predicates = if create_column_predicates {
        let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);
        if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {
            eprintln!("column_predicates: {{");
            eprintln!("  [");
            for (pred, spec) in column_predicates.predicates.values() {
                eprintln!(
                    "    {} ({spec:?}),",
                    ExprIRDisplay::display_node(*pred, expr_arena)
                );
            }
            eprintln!("  ],");
            eprintln!(
                "  is_sumwise_complete: {}",
                column_predicates.is_sumwise_complete
            );
            eprintln!("}}");
        }
        PhysicalColumnPredicates {
            predicates: column_predicates
                .predicates
                .into_iter()
                .map(|(n, (p, s))| {
                    PolarsResult::Ok((
                        n,
                        (
                            create_physical_expr(
                                &ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),
                                Context::Default,
                                expr_arena,
                                schema,
                                state,
                            )?,
                            s,
                        ),
                    ))
                })
                .collect::<PolarsResult<PlHashMap<_, _>>>()?,
            is_sumwise_complete: column_predicates.is_sumwise_complete,
        }
    } else {
        PhysicalColumnPredicates {
            predicates: PlHashMap::default(),
            is_sumwise_complete: false,
        }
    };

    PolarsResult::Ok(ScanPredicate {
        predicate: phys_predicate,
        live_columns,
        skip_batch_predicate,
        column_predicates,
        hive_predicate,
        hive_predicate_is_full_predicate,
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_create_multiple_physical_plans_reused_cache() {
        // Check that reusing the same cache node doesn't panic.
        // CSE creates duplicate cache nodes with the same ID, but they should
        // be reused rather than rebuilt.

        let mut ir = Arena::new();

        let schema = Schema::from_iter([(PlSmallStr::from_static("x"), DataType::Float32)]);
        let scan = ir.add(IR::DataFrameScan {
            df: Arc::new(DataFrame::empty_with_schema(&schema)),
            schema: Arc::new(schema),
            output_schema: None,
        });

        let cache = ir.add(IR::Cache {
            input: scan,
            id: UniqueId::new(),
        });

        let left_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });
        let right_sink = ir.add(IR::Sink {
            input: cache,
            payload: SinkTypeIR::Memory,
        });

        let _multiplan = create_multiple_physical_plans(
            &[left_sink, right_sink],
            &mut ir,
            &mut Arena::new(),
            None,
        )
        .unwrap();
    }
}