GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/scan_predicate/functions.rs
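
//! Creation and application of scan predicates: splitting a predicate into
//! hive-only and residual parts, deriving a batch-skipping predicate from
//! column statistics, and filtering the file list of an `IR::Scan`.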
use std::cell::LazyCell;
use std::sync::Arc;

use polars_core::config;
use polars_core::error::PolarsResult;
use polars_core::prelude::{IDX_DTYPE, IdxCa, InitHashMaps, PlHashMap, PlIndexMap, PlIndexSet};
use polars_core::schema::Schema;
use polars_error::polars_warn;
use polars_expr::{ExpressionConversionState, create_physical_expr};
use polars_io::predicates::ScanIOPredicate;
use polars_plan::dsl::default_values::{
    DefaultFieldValues, IcebergIdentityTransformedPartitionFields,
};
use polars_plan::dsl::deletion::DeletionFilesList;
use polars_plan::dsl::{
    FileScanIR, Operator, PredicateFileSkip, ScanSources, TableStatistics, UnifiedScanArgs,
};
use polars_plan::plans::expr_ir::{ExprIR, OutputName};
use polars_plan::plans::hive::HivePartitionsDf;
use polars_plan::plans::predicates::{aexpr_to_column_predicates, aexpr_to_skip_batch_predicate};
use polars_plan::plans::{AExpr, ExprIRDisplay, FileInfo, IR, MintermIter};
use polars_plan::utils::aexpr_to_leaf_names_iter;
use polars_utils::arena::{Arena, Node};
use polars_utils::pl_str::PlSmallStr;
use polars_utils::{IdxSize, format_pl_smallstr};

use crate::scan_predicate::skip_files_mask::SkipFilesMask;
use crate::scan_predicate::{PhysicalColumnPredicates, ScanPredicate};
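
/// Builds a [`ScanPredicate`] from a logical predicate expression.
///
/// Conjunctive parts (minterms) of the predicate that reference only hive-partition columns are
/// split off into a separate hive predicate (used to skip whole files), leaving the remaining
/// conjunction as the predicate sent to readers. Optionally also creates a batch-skipping
/// predicate evaluated over min/max/null-count statistics, as well as per-column predicates.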
pub fn create_scan_predicate(
    predicate: &ExprIR,
    expr_arena: &mut Arena<AExpr>,
    schema: &Arc<Schema>,
    hive_schema: Option<&Schema>,
    state: &mut ExpressionConversionState,
    create_skip_batch_predicate: bool,
    create_column_predicates: bool,
) -> PolarsResult<ScanPredicate> {
    let mut predicate = predicate.clone();

    let mut hive_predicate = None;
    let mut hive_predicate_is_full_predicate = false;

    // Split the predicate into conjunctive parts that reference only
    // hive-partition columns and parts that do not.
    #[expect(clippy::never_loop)]
    loop {
        let Some(hive_schema) = hive_schema else {
            break;
        };

        let mut hive_predicate_parts = vec![];
        let mut non_hive_predicate_parts = vec![];

        for predicate_part in MintermIter::new(predicate.node(), expr_arena) {
            if aexpr_to_leaf_names_iter(predicate_part, expr_arena)
                .all(|name| hive_schema.contains(name))
            {
                hive_predicate_parts.push(predicate_part)
            } else {
                non_hive_predicate_parts.push(predicate_part)
            }
        }

        if hive_predicate_parts.is_empty() {
            break;
        }

        if non_hive_predicate_parts.is_empty() {
            hive_predicate_is_full_predicate = true;
            break;
        }

        // Fold the hive-only parts back into a single AND-chain.
        {
            let mut iter = hive_predicate_parts.into_iter();
            let mut node = iter.next().unwrap();

            for next_node in iter {
                node = expr_arena.add(AExpr::BinaryExpr {
                    left: node,
                    op: Operator::And,
                    right: next_node,
                });
            }

            hive_predicate = Some(create_physical_expr(
                &ExprIR::from_node(node, expr_arena),
                expr_arena,
                schema,
                state,
            )?)
        }

        // The remaining parts become the residual predicate.
        {
            let mut iter = non_hive_predicate_parts.into_iter();
            let mut node = iter.next().unwrap();

            for next_node in iter {
                node = expr_arena.add(AExpr::BinaryExpr {
                    left: node,
                    op: Operator::And,
                    right: next_node,
                });
            }

            predicate = ExprIR::from_node(node, expr_arena);
        }

        break;
    }

    let phys_predicate = create_physical_expr(&predicate, expr_arena, schema, state)?;

    if hive_predicate_is_full_predicate {
        hive_predicate = Some(phys_predicate.clone());
    }

    let live_columns = Arc::new(PlIndexSet::from_iter(
        aexpr_to_leaf_names_iter(predicate.node(), expr_arena).cloned(),
    ));

    let mut skip_batch_predicate = None;

    if create_skip_batch_predicate {
        if let Some(node) = aexpr_to_skip_batch_predicate(predicate.node(), expr_arena, schema) {
            let expr = ExprIR::new(node, predicate.output_name_inner().clone());

            if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") {
                eprintln!("predicate: {}", predicate.display(expr_arena));
                eprintln!("skip_batch_predicate: {}", expr.display(expr_arena));
            }

            // The skip-batch predicate is evaluated against a statistics
            // schema: a `len` column plus `{col}_min`, `{col}_max` and
            // `{col}_nc` (null count) for every live column.
            let mut skip_batch_schema = Schema::with_capacity(1 + live_columns.len());

            skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE);
            for (col, dtype) in schema.iter() {
                if !live_columns.contains(col) {
                    continue;
                }

                skip_batch_schema.insert(format_pl_smallstr!("{col}_min"), dtype.clone());
                skip_batch_schema.insert(format_pl_smallstr!("{col}_max"), dtype.clone());
                skip_batch_schema.insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE);
            }

            skip_batch_predicate = Some(create_physical_expr(
                &expr,
                expr_arena,
                &Arc::new(skip_batch_schema),
                state,
            )?);
        }
    }

    let column_predicates = if create_column_predicates {
        let column_predicates = aexpr_to_column_predicates(predicate.node(), expr_arena, schema);
        if std::env::var("POLARS_OUTPUT_COLUMN_PREDS").as_deref() == Ok("1") {
            eprintln!("column_predicates: {{");
            eprintln!("  [");
            for (pred, spec) in column_predicates.predicates.values() {
                eprintln!(
                    "    {} ({spec:?}),",
                    ExprIRDisplay::display_node(*pred, expr_arena)
                );
            }
            eprintln!("  ],");
            eprintln!(
                "  is_sumwise_complete: {}",
                column_predicates.is_sumwise_complete
            );
            eprintln!("}}");
        }
        PhysicalColumnPredicates {
            predicates: column_predicates
                .predicates
                .into_iter()
                .map(|(n, (p, s))| {
                    PolarsResult::Ok((
                        n,
                        (
                            create_physical_expr(
                                &ExprIR::new(p, OutputName::Alias(PlSmallStr::EMPTY)),
                                expr_arena,
                                schema,
                                state,
                            )?,
                            s,
                        ),
                    ))
                })
                .collect::<PolarsResult<PlHashMap<_, _>>>()?,
            is_sumwise_complete: column_predicates.is_sumwise_complete,
        }
    } else {
        PhysicalColumnPredicates {
            predicates: PlHashMap::default(),
            is_sumwise_complete: false,
        }
    };

    PolarsResult::Ok(ScanPredicate {
        predicate: phys_predicate,
        live_columns,
        skip_batch_predicate,
        column_predicates,
        hive_predicate,
        hive_predicate_is_full_predicate,
    })
}
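
// Editor's note: an illustrative, self-contained sketch (not part of the
// original source) of the statistics layout built above. For every live
// column `c`, the skip-batch schema holds `c_min`, `c_max` and `c_nc`
// (null count) plus a global `len`; a batch can be skipped when the
// statistics prove the predicate matches no row. The names below are
// hypothetical and use only `std`.
#[allow(dead_code)]
struct BatchStats {
    len: usize,
    min: i64,
    max: i64,
    null_count: usize,
}

// Can a batch be skipped for the predicate `col > threshold`? Yes, if every
// row is null (nulls never satisfy `>`), or if the maximum non-null value is
// still `<= threshold`.
#[allow(dead_code)]
fn can_skip_gt(stats: &BatchStats, threshold: i64) -> bool {
    stats.null_count == stats.len || stats.max <= threshold
}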

/// Initializes the file-skip mask for a scan, either from a hive predicate evaluated over the
/// hive partitions or from a skip-batch predicate evaluated over the table statistics.
///
/// # Returns
/// `(skip_files_mask, predicate)`, where `predicate` is `None` if the readers no longer need to
/// apply it.
pub fn initialize_scan_predicate<'a>(
    predicate: Option<&'a ScanIOPredicate>,
    hive_parts: Option<&HivePartitionsDf>,
    table_statistics: Option<&TableStatistics>,
    verbose: bool,
) -> PolarsResult<(Option<SkipFilesMask>, Option<&'a ScanIOPredicate>)> {
    #[expect(clippy::never_loop)]
    loop {
        let Some(predicate) = predicate else {
            break;
        };

        let expected_mask_len: usize;

        let (skip_files_mask, send_predicate_to_readers) = if let Some(hive_parts) = hive_parts
            && let Some(hive_predicate) = &predicate.hive_predicate
        {
            if verbose {
                eprintln!(
                    "initialize_scan_predicate: Source filter mask initialization via hive partitions"
                );
            }

            expected_mask_len = hive_parts.df().height();

            let inclusion_mask = hive_predicate
                .evaluate_io(hive_parts.df())?
                .bool()?
                .rechunk()
                .into_owned()
                .downcast_into_iter()
                .next()
                .unwrap()
                .values()
                .clone();

            (
                SkipFilesMask::Inclusion(inclusion_mask),
                // If the hive predicate is the full predicate, readers no
                // longer need to apply it.
                !predicate.hive_predicate_is_full_predicate,
            )
        } else if let Some(table_statistics) = table_statistics
            && let Some(skip_batch_predicate) = &predicate.skip_batch_predicate
        {
            if verbose {
                eprintln!(
                    "initialize_scan_predicate: Source filter mask initialization via table statistics"
                );
            }

            expected_mask_len = table_statistics.0.height();

            let exclusion_mask = skip_batch_predicate.evaluate_with_stat_df(&table_statistics.0)?;

            (SkipFilesMask::Exclusion(exclusion_mask), true)
        } else {
            break;
        };

        if skip_files_mask.len() != expected_mask_len {
            polars_warn!(
                "WARNING: \
                initialize_scan_predicate: \
                filter mask length mismatch (length: {}, expected: {}). Files \
                will not be skipped. This is a bug; please open an issue with \
                a reproducible example if possible.",
                skip_files_mask.len(),
                expected_mask_len
            );
            return Ok((None, Some(predicate)));
        }

        if verbose {
            eprintln!(
                "initialize_scan_predicate: Predicate pushdown allows skipping {} / {} files",
                skip_files_mask.num_skipped_files(),
                skip_files_mask.len()
            );
        }

        return Ok((
            Some(skip_files_mask),
            send_predicate_to_readers.then_some(predicate),
        ));
    }

    Ok((None, predicate))
}
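
// Editor's note: a minimal sketch (not from the source) of the two mask
// polarities returned above. A hive predicate yields an *inclusion* mask
// (true = keep the file), while a skip-batch predicate over table statistics
// yields an *exclusion* mask (true = skip the file). The type below is
// hypothetical; the real `SkipFilesMask` lives in `skip_files_mask.rs`.
#[allow(dead_code)]
enum MaskSketch {
    Inclusion(Vec<bool>),
    Exclusion(Vec<bool>),
}

#[allow(dead_code)]
impl MaskSketch {
    fn num_skipped_files(&self) -> usize {
        match self {
            // Inclusion: skipped files are the `false` entries.
            MaskSketch::Inclusion(m) => m.iter().filter(|&&keep| !keep).count(),
            // Exclusion: skipped files are the `true` entries.
            MaskSketch::Exclusion(m) => m.iter().filter(|&&skip| skip).count(),
        }
    }
}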

/// Filters the list of files in an `IR::Scan` based on the contained predicate. This is possible
/// if the predicate has components that refer only to the hive parts and there is no row index or
/// slice.
///
/// This also applies the projection onto the hive parts.
///
/// # Panics
/// Panics if `scan_ir_node` is not `IR::Scan`.
pub fn apply_scan_predicate_to_scan_ir(
    scan_ir_node: Node,
    ir_arena: &mut Arena<IR>,
    expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<()> {
    let scan_ir_schema = IR::schema(ir_arena.get(scan_ir_node), ir_arena).into_owned();
    let scan_ir = ir_arena.get_mut(scan_ir_node);

    let IR::Scan {
        sources,
        hive_parts,
        predicate,
        predicate_file_skip_applied,
        unified_scan_args,
        file_info,
        ..
    } = scan_ir
    else {
        unreachable!()
    };

    // Project the hive parts onto the scan schema.
    if let Some(hive_parts) = hive_parts.as_mut() {
        *hive_parts = hive_parts.filter_columns(&scan_ir_schema);
    }

    if unified_scan_args.has_row_index_or_slice() || predicate_file_skip_applied.is_some() {
        return Ok(());
    }

    let Some(predicate) = predicate else {
        return Ok(());
    };

    match sources {
        // Files cannot be `gather()`ed.
        ScanSources::Files(_) => return Ok(()),
        ScanSources::Paths(_) | ScanSources::Buffers(_) => {},
    }

    let verbose = config::verbose();

    let scan_predicate = create_scan_predicate(
        predicate,
        expr_arena,
        &scan_ir_schema,
        hive_parts.as_ref().map(|hp| hp.df().schema().as_ref()),
        &mut ExpressionConversionState::new(true),
        true,  // create_skip_batch_predicate
        false, // create_column_predicates
    )?
    .to_io(None, file_info.schema.clone());

    let (skip_files_mask, predicate_to_readers) = initialize_scan_predicate(
        Some(&scan_predicate),
        hive_parts.as_ref(),
        unified_scan_args.table_statistics.as_ref(),
        verbose,
    )?;

    if let Some(skip_files_mask) = skip_files_mask {
        assert_eq!(skip_files_mask.len(), sources.len());

        let predicate_file_skip = PredicateFileSkip {
            no_residual_predicate: predicate_to_readers.is_none(),
            original_len: sources.len(),
        };

        if verbose {
            eprintln!("apply_scan_predicate_to_scan_ir: {predicate_file_skip:?}");
        }

        *predicate_file_skip_applied = Some(predicate_file_skip);

        if skip_files_mask.num_skipped_files() > 0 {
            filter_scan_ir(scan_ir, skip_files_mask.non_skipped_files_idx_iter())
        }
    }

    Ok(())
}
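
// Editor's note: taken together, `create_scan_predicate` builds the physical
// predicate pieces, `initialize_scan_predicate` turns them into a per-file
// skip mask, and `filter_scan_ir` below prunes the scan's sources together
// with the per-file state that must stay aligned with them.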

/// Filters the paths for a scan IR. This also involves performing selections on associated
/// per-file state, e.g. hive partitions and deletion files.
///
/// Note: `selected_path_indices` should be cheaply cloneable.
///
/// # Panics
/// Panics if `scan_ir` is not `IR::Scan`.
pub fn filter_scan_ir<I>(scan_ir: &mut IR, selected_path_indices: I)
where
    I: Iterator<Item = usize> + Clone,
{
    let IR::Scan {
        sources,
        file_info:
            FileInfo {
                schema: _,
                reader_schema,
                row_estimation,
            },
        hive_parts,
        predicate: _,
        predicate_file_skip_applied: _,
        output_schema: _,
        scan_type,
        unified_scan_args,
    } = scan_ir
    else {
        panic!("{:?}", scan_ir);
    };

    let size_hint = selected_path_indices.size_hint();

    // Fast path: all files are selected in their original order, nothing to do.
    if size_hint.0 == sources.len()
        && size_hint.1 == Some(sources.len())
        && selected_path_indices
            .clone()
            .enumerate()
            .all(|(i, x)| i == x)
    {
        return;
    }

    let UnifiedScanArgs {
        schema: _,
        cloud_options: _,
        hive_options: _,
        rechunk: _,
        cache: _,
        glob: _,
        hidden_file_prefix: _,
        projection: _,
        column_mapping: _,
        default_values,
        // Ensure these are None.
        row_index: None,
        pre_slice: None,
        cast_columns_policy: _,
        missing_columns_policy: _,
        extra_columns_policy: _,
        include_file_paths: _,
        table_statistics,
        deletion_files,
        row_count,
    } = unified_scan_args.as_mut()
    else {
        panic!("{unified_scan_args:?}")
    };

    *row_count = None;

    if selected_path_indices.clone().next() != Some(0) {
        *reader_schema = None;

        // Ensure the metadata is unset, otherwise it may incorrectly be used at
        // scan. This is especially important for Parquet as it requires the
        // correct `is_nullable` in the arrow field.
        match scan_type.as_mut() {
            #[cfg(feature = "parquet")]
            FileScanIR::Parquet {
                options: _,
                metadata,
            } => *metadata = None,

            #[cfg(feature = "ipc")]
            FileScanIR::Ipc {
                options: _,
                metadata,
            } => *metadata = None,

            #[cfg(feature = "csv")]
            FileScanIR::Csv { options: _ } => {},

            #[cfg(feature = "json")]
            FileScanIR::NDJson { options: _ } => {},

            #[cfg(feature = "python")]
            FileScanIR::PythonDataset {
                dataset_object: _,
                cached_ir,
            } => *cached_ir.lock().unwrap() = None,

            #[cfg(feature = "scan_lines")]
            FileScanIR::Lines { name: _ } => {},

            FileScanIR::Anonymous {
                options: _,
                function: _,
            } => {},
        }
    }

    // Lazily materialize the selected indices as `IdxSize` for the gathers below.
    let selected_path_indices_idxsize = LazyCell::new(|| {
        selected_path_indices
            .clone()
            .map(|i| IdxSize::try_from(i).unwrap())
            .collect::<Vec<_>>()
    });

    // Re-key Iceberg position deletes from original source indices to the
    // positions of those sources in the filtered list.
    *deletion_files = deletion_files.as_ref().and_then(|x| match x {
        DeletionFilesList::IcebergPositionDelete(deletions) => {
            let mut out = None;

            for (out_idx, source_idx) in selected_path_indices.clone().enumerate() {
                if let Some(v) = deletions.get(&source_idx) {
                    out.get_or_insert_with(|| {
                        PlIndexMap::with_capacity(selected_path_indices.size_hint().0 - out_idx)
                    })
                    .insert(out_idx, v.clone());
                }
            }

            out.map(|x| DeletionFilesList::IcebergPositionDelete(Arc::new(x)))
        },
    });

    *table_statistics = table_statistics.as_ref().map(|x| {
        let df_height = IdxSize::try_from(x.0.height()).unwrap();

        assert!(selected_path_indices_idxsize.iter().all(|x| *x < df_height));

        // Safety: Asserted all < df_height above.
        TableStatistics(Arc::new(unsafe {
            x.0.take_slice_unchecked(&selected_path_indices_idxsize)
        }))
    });

    let original_sources_len = sources.len();
    *sources = sources.gather(selected_path_indices.clone()).unwrap();
    // Rescale the row estimate proportionally to the number of remaining files.
    *row_estimation = (
        None,
        row_estimation
            .1
            .div_ceil(original_sources_len)
            .saturating_mul(sources.len()),
    );

    *hive_parts = hive_parts.as_ref().map(|hp| {
        let df = hp.df();
        let df_height = IdxSize::try_from(df.height()).unwrap();

        assert!(selected_path_indices_idxsize.iter().all(|x| *x < df_height));

        // Safety: Asserted all < df.height() above.
        unsafe { df.take_slice_unchecked(&selected_path_indices_idxsize) }.into()
    });

    *default_values = default_values.as_ref().map(|x| match x {
        DefaultFieldValues::Iceberg(v) => {
            let mut out = PlIndexMap::with_capacity(v.len());
            // Cache one gather-index list per partition-values length.
            let mut gather_indices = PlHashMap::with_capacity(v.len());

            for (k, v) in v.iter() {
                out.insert(
                    *k,
                    v.as_ref().map_err(Clone::clone).map(|partition_values| {
                        if !gather_indices.contains_key(&partition_values.len()) {
                            gather_indices.insert(
                                partition_values.len(),
                                selected_path_indices
                                    .clone()
                                    .map(|i| {
                                        (i < partition_values.len())
                                            .then(|| IdxSize::try_from(i).unwrap())
                                    })
                                    .collect::<IdxCa>(),
                            );
                        }

                        unsafe {
                            partition_values.take_unchecked(
                                gather_indices.get(&partition_values.len()).unwrap(),
                            )
                        }
                    }),
                );
            }

            DefaultFieldValues::Iceberg(Arc::new(IcebergIdentityTransformedPartitionFields(out)))
        },
    });
}
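
// Editor's note: a self-contained sketch (not part of the original source) of
// the index remapping performed for the deletion files above. Per-file state
// keyed by the original source index must be re-keyed to the position of that
// source in the filtered list. Names are hypothetical; only `std` is used.
#[allow(dead_code)]
fn remap_keys(
    per_file: &std::collections::HashMap<usize, String>,
    selected: &[usize],
) -> std::collections::HashMap<usize, String> {
    selected
        .iter()
        .enumerate()
        // `out_idx` is the new position, `source_idx` the original one.
        .filter_map(|(out_idx, source_idx)| {
            per_file.get(source_idx).map(|v| (out_idx, v.clone()))
        })
        .collect()
}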