Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/tests/optimization_checks.rs
6939 views
1
use super::*;
2
3
/// Optimizes `q` and reports whether any `IR::Scan` node of the resulting plan
/// carries a row index in its unified scan arguments.
#[cfg(feature = "parquet")]
pub(crate) fn row_index_at_scan(q: LazyFrame) -> bool {
    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();

    lp_arena.iter(root).any(|(_, node)| match node {
        IR::Scan {
            unified_scan_args, ..
        } => unified_scan_args.row_index.is_some(),
        _ => false,
    })
}
19
20
pub(crate) fn predicate_at_scan(q: LazyFrame) -> bool {
21
let (mut expr_arena, mut lp_arena) = get_arenas();
22
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();
23
24
lp_arena.iter(lp).any(|(_, lp)| match lp {
25
IR::Filter { input, .. } => {
26
matches!(lp_arena.get(*input), IR::DataFrameScan { .. })
27
},
28
IR::Scan {
29
predicate: Some(_), ..
30
} => true,
31
_ => false,
32
})
33
}
34
35
pub(crate) fn predicate_at_all_scans(q: LazyFrame) -> bool {
36
let (mut expr_arena, mut lp_arena) = get_arenas();
37
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();
38
39
lp_arena.iter(lp).all(|(_, lp)| match lp {
40
IR::Filter { input, .. } => {
41
matches!(lp_arena.get(*input), IR::DataFrameScan { .. })
42
},
43
IR::Scan {
44
predicate: Some(_), ..
45
} => true,
46
_ => false,
47
})
48
}
49
50
/// Optimizes `q` and reports whether any `IR::Scan` node of the resulting plan
/// has a `pre_slice` set in its unified scan arguments.
#[cfg(any(feature = "parquet", feature = "csv"))]
fn slice_at_scan(q: LazyFrame) -> bool {
    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();

    lp_arena.iter(root).any(|(_, node)| {
        matches!(
            node,
            IR::Scan { unified_scan_args, .. } if unified_scan_args.pre_slice.is_some()
        )
    })
}
64
65
#[test]
fn test_pred_pd_1() -> PolarsResult<()> {
    let df = fruits_cars();

    // A filter behind a plain projection must reach the scan.
    let projected = df
        .clone()
        .lazy()
        .select([col("A"), col("B")])
        .filter(col("A").gt(lit(1)));
    assert!(predicate_at_scan(projected));

    // The filter refers to an alias; the optimizer has to see through it.
    let aliased = df
        .clone()
        .lazy()
        .select([col("A").alias("C"), col("B")])
        .filter(col("C").gt(lit(1)));
    assert!(predicate_at_scan(aliased));

    // The filter has to travel past a `with_columns` (hstack) node.
    let stacked = df
        .lazy()
        .with_columns([col("A").alias("C"), col("B")])
        .filter(col("B").gt(lit(1)));
    assert!(predicate_at_scan(stacked));

    Ok(())
}
96
97
#[test]
fn test_no_left_join_pass() -> PolarsResult<()> {
    let left = df![
        "foo" => ["abc", "def", "ghi"],
        "idx1" => [0, 0, 1],
    ]?;
    let right = df![
        "bar" => [5, 6],
        "idx2" => [0, 1],
    ]?;

    // Filter on a right-hand column after a left join and compare the collected
    // result with the hand-written expectation below.
    let joined = left.lazy().join(
        right.lazy(),
        [col("idx1")],
        [col("idx2")],
        JoinType::Left.into(),
    );
    let out = joined.filter(col("bar").eq(lit(5i32))).collect()?;

    let expected = df![
        "foo" => ["abc", "def"],
        "idx1" => [0, 0],
        "bar" => [5, 5],
    ]?;

    assert!(out.equals(&expected));
    Ok(())
}
128
129
#[test]
#[cfg(feature = "parquet")]
pub fn test_simple_slice() -> PolarsResult<()> {
    let _guard = SINGLE_LOCK.lock().unwrap();

    // A limit directly on the scan.
    let direct = scan_foods_parquet(false).limit(3);
    assert!(slice_at_scan(direct.clone()));
    let collected = direct.collect()?;
    assert_eq!(collected.height(), 3);

    // A limit separated from the scan by a projection.
    let behind_select = scan_foods_parquet(false)
        .select([col("category"), col("calories").alias("bar")])
        .limit(3);
    assert!(slice_at_scan(behind_select.clone()));
    let collected = behind_select.collect()?;
    assert_eq!(collected.height(), 3);

    Ok(())
}
148
149
#[test]
#[cfg(feature = "parquet")]
#[cfg(feature = "cse")]
pub fn test_slice_pushdown_join() -> PolarsResult<()> {
    let _guard = SINGLE_LOCK.lock().unwrap();
    let left = scan_foods_parquet(false).limit(3);
    let right = scan_foods_parquet(false);

    let q = left
        .join(
            right,
            [col("category")],
            [col("category")],
            JoinType::Left.into(),
        )
        .slice(1, 3)
        // this inserts a cache and blocks slice pushdown
        .with_comm_subplan_elim(false);

    // test if optimization continued beyond the join node
    assert!(slice_at_scan(q.clone()));

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // Every join must carry the slice and no stand-alone slice node may remain.
    assert!(lp_arena.iter(root).all(|(_, node)| match node {
        IR::Join { options, .. } => options.args.slice == Some((1, 3)),
        IR::Slice { .. } => false,
        _ => true,
    }));

    let collected = q.collect()?;
    assert_eq!(collected.shape(), (3, 7));

    Ok(())
}
185
186
#[test]
#[cfg(feature = "parquet")]
pub fn test_slice_pushdown_group_by() -> PolarsResult<()> {
    let _guard = SINGLE_LOCK.lock().unwrap();

    let q = scan_foods_parquet(false)
        .limit(100)
        .group_by([col("category")])
        .agg([col("calories").sum()])
        .slice(1, 3);

    // test if optimization continued beyond the group_by node
    assert!(slice_at_scan(q.clone()));

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // Every group-by must carry the slice and no stand-alone slice node may remain.
    assert!(lp_arena.iter(root).all(|(_, node)| match node {
        IR::GroupBy { options, .. } => options.slice == Some((1, 3)),
        IR::Slice { .. } => false,
        _ => true,
    }));

    let collected = q.collect()?;
    assert_eq!(collected.shape(), (3, 2));

    Ok(())
}
215
216
#[test]
#[cfg(feature = "parquet")]
pub fn test_slice_pushdown_sort() -> PolarsResult<()> {
    let _guard = SINGLE_LOCK.lock().unwrap();

    let q = scan_foods_parquet(false)
        .limit(100)
        .sort(["category"], SortMultipleOptions::default())
        .slice(1, 3);

    // test if optimization continued beyond the sort node
    assert!(slice_at_scan(q.clone()));

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // Every sort must carry the slice and no stand-alone slice node may remain.
    assert!(lp_arena.iter(root).all(|(_, node)| match node {
        IR::Sort { slice, .. } => *slice == Some((1, 3)),
        IR::Slice { .. } => false,
        _ => true,
    }));

    let collected = q.collect()?;
    assert_eq!(collected.shape(), (3, 4));

    Ok(())
}
244
245
#[test]
#[cfg(feature = "dtype-i16")]
pub fn test_predicate_block_cast() -> PolarsResult<()> {
    let df = df![
        "value" => [10, 20, 30, 40]
    ]?;

    // The same cast-and-scale expression is used by both query shapes.
    let scaled = || col("value").cast(DataType::Int16) * lit(0.1).cast(DataType::Float32);

    let via_with_column = df
        .clone()
        .lazy()
        .with_column(scaled())
        .filter(col("value").lt(lit(2.5f32)));

    let via_select = df
        .lazy()
        .select([scaled()])
        .filter(col("value").lt(lit(2.5f32)));

    for lf in [via_with_column, via_select] {
        // The predicate must stay above the cast instead of reaching the scan.
        assert!(!predicate_at_scan(lf.clone()));

        let collected = lf.collect()?;
        let values = collected.column("value").unwrap();
        assert_eq!(
            values,
            &Column::new(PlSmallStr::from_static("value"), [1.0f32, 2.0])
        );
    }

    Ok(())
}
276
277
#[test]
fn test_lazy_filter_and_rename() {
    let df = load_df();

    // Predicate `x > 3`, expressed through a user-defined map on column "x".
    let x_gt_three = || {
        col("x").map(
            |s: Column| Ok(s.as_materialized_series().gt(3)?.into_column()),
            |_, f| Ok(Field::new(f.name().clone(), DataType::Boolean)),
        )
    };

    let selected = df
        .clone()
        .lazy()
        .rename(["a"], ["x"], true)
        .filter(x_gt_three())
        .select([col("x")]);

    let correct = df! {
        "x" => &[4, 5]
    }
    .unwrap();
    assert!(selected.collect().unwrap().equals(&correct));

    // Without the trailing select, check whether the column is renamed or added.
    let unselected = df
        .lazy()
        .rename(["a"], ["x"], true)
        .filter(x_gt_three());

    // the rename function should not interfere with the predicate pushdown
    assert!(predicate_at_scan(unselected.clone()));

    assert_eq!(
        unselected.collect().unwrap().get_column_names(),
        &["x", "b", "c"]
    );
}
306
307
#[test]
fn test_with_row_index_opts() -> PolarsResult<()> {
    /// Collects the values of the "index" column of `out` into a vector.
    fn index_values(out: &DataFrame) -> PolarsResult<Vec<IdxSize>> {
        Ok(out
            .column("index")?
            .idx()?
            .into_no_null_iter()
            .collect::<Vec<_>>())
    }

    let df = df![
        "a" => [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    ]?;

    // Row index followed by `tail`.
    let tailed = df
        .clone()
        .lazy()
        .with_row_index("index", None)
        .tail(5)
        .collect()?;
    let expected = df![
        "index" => [5 as IdxSize, 6, 7, 8, 9],
        "a" => [5, 6, 7, 8, 9],
    ]?;
    assert!(tailed.equals(&expected));

    // Row index followed by a slice.
    let sliced_after = df
        .clone()
        .lazy()
        .with_row_index("index", None)
        .slice(1, 2)
        .collect()?;
    assert_eq!(index_values(&sliced_after)?, &[1, 2]);

    // Row index followed by a filter.
    let filtered_after = df
        .clone()
        .lazy()
        .with_row_index("index", None)
        .filter(col("a").eq(lit(3i32)))
        .collect()?;
    assert_eq!(index_values(&filtered_after)?, &[3]);

    // Slice first, row index afterwards.
    let sliced_before = df
        .clone()
        .lazy()
        .slice(1, 2)
        .with_row_index("index", None)
        .collect()?;
    assert_eq!(index_values(&sliced_before)?, &[0, 1]);

    // Filter first, row index afterwards.
    let filtered_before = df
        .lazy()
        .filter(col("a").eq(lit(3i32)))
        .with_row_index("index", None)
        .collect()?;
    assert_eq!(index_values(&filtered_before)?, &[0]);

    Ok(())
}
382
383
#[cfg(all(feature = "concat_str", feature = "strings"))]
#[test]
fn test_string_addition_to_concat_str() -> PolarsResult<()> {
    let df = df![
        "a"=> ["a"],
        "b"=> ["b"],
    ]?;

    let q = df
        .lazy()
        .select([lit("foo") + col("a") + col("b") + lit("bar")]);

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.clone().optimize(&mut lp_arena, &mut expr_arena)?;
    let first_expr = lp_arena.get(root).exprs().next().unwrap();

    match expr_arena.get(first_expr.node()) {
        // the concat_str has the 4 expressions as input
        AExpr::Function { input, .. } => assert_eq!(input.len(), 4),
        _ => panic!(),
    }

    let collected = q.collect()?;
    let literal = collected.column("literal")?;
    assert_eq!(literal.get(0)?, AnyValue::String("fooabbar"));

    Ok(())
}
412
#[test]
fn test_with_column_prune() -> PolarsResult<()> {
    let df = df![
        "c0" => [0],
        "c1" => [0],
        "c2" => [0],
    ]?;
    let (mut expr_arena, mut lp_arena) = get_arenas();

    // only a single expression pruned and only one column selection
    let partially_pruned = df
        .clone()
        .lazy()
        .with_columns([col("c0"), col("c1").alias("c4")])
        .select([col("c1"), col("c4")]);
    let root = partially_pruned
        .optimize(&mut lp_arena, &mut expr_arena)
        .unwrap();
    for (_, node) in lp_arena.iter(root) {
        match node {
            IR::DataFrameScan { output_schema, .. } => {
                let projection = output_schema.as_ref().unwrap();
                assert_eq!(projection.len(), 1);
                let name = projection.get_at_index(0).unwrap().0;
                assert_eq!(name, "c1");
            },
            IR::HStack { exprs, .. } => {
                assert_eq!(exprs.len(), 1);
            },
            _ => {},
        }
    }

    // whole `with_columns` pruned
    let mut q = df.lazy().with_column(col("c0")).select([col("c1")]);

    let root = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // check if with_column is pruned
    assert!(lp_arena.iter(root).all(|(_, node)| {
        matches!(node, IR::SimpleProjection { .. } | IR::DataFrameScan { .. })
    }));
    assert_eq!(
        q.collect_schema().unwrap().as_ref(),
        &Schema::from_iter([Field::new(PlSmallStr::from_static("c1"), DataType::Int32)])
    );
    Ok(())
}
462
463
#[test]
#[cfg(feature = "csv")]
fn test_slice_at_scan_group_by() -> PolarsResult<()> {
    // this tests if slice pushdown restarts aggregation nodes (it did not)
    let q = scan_foods_csv()
        .slice(0, 5)
        .filter(col("calories").lt(lit(10)))
        .group_by([col("calories")])
        .agg([col("fats_g").first()])
        .select([col("fats_g")]);

    assert!(slice_at_scan(q));
    Ok(())
}
479
480
#[test]
fn test_flatten_unions() -> PolarsResult<()> {
    let (mut expr_arena, mut lp_arena) = get_arenas();

    let lf = df! {
        "a" => [1,2,3,4,5],
    }
    .unwrap()
    .lazy();

    let args = UnionArgs {
        rechunk: false,
        parallel: true,
        ..Default::default()
    };

    // A union of a 2-way and a 3-way union.
    let pair = concat(&[lf.clone(), lf.clone()], args).unwrap();
    let triple = concat(&[lf.clone(), lf.clone(), lf], args).unwrap();
    let nested = concat(&[pair, triple], args).unwrap();

    let root = nested.optimize(&mut lp_arena, &mut expr_arena).unwrap();
    if let IR::Union { inputs, .. } = lp_arena.get(root) {
        // we make sure that the nested unions are flattened into a single union
        assert_eq!(inputs.len(), 5);
    } else {
        panic!()
    }
    Ok(())
}
509
510
/// Counts how many times `needle` occurs in `s`, overlapping occurrences included
/// (e.g. `"aa"` occurs twice in `"aaa"`).
///
/// An empty `needle` is reported as `0` occurrences.
fn num_occurrences(s: &str, needle: &str) -> usize {
    // After a hit the search resumes one *character* past the start of the match.
    // Stepping by the UTF-8 width of the needle's first character (rather than by
    // one byte) keeps the next slice start on a char boundary, so `s[start..]`
    // cannot panic for non-ASCII needles.
    let step = match needle.chars().next() {
        Some(first) => first.len_utf8(),
        // An empty needle would otherwise match forever and run past the end of `s`.
        None => return 0,
    };

    let mut start = 0;
    let mut num = 0;

    while let Some(offset) = s[start..].find(needle) {
        start += offset + step;
        num += 1;
    }

    num
}
521
522
#[test]
fn test_cluster_with_columns() -> Result<(), Box<dyn std::error::Error>> {
    use polars_core::prelude::*;

    let df = df!("foo" => &[0.5, 1.7, 3.2],
                 "bar" => &[4.1, 1.5, 9.2])?;

    // Two `with_columns` calls that touch different columns.
    let lf = df
        .lazy()
        .without_optimizations()
        .with_cluster_with_columns(true)
        .with_columns([col("foo") * lit(2.0)])
        .with_columns([col("bar") / lit(1.5)]);

    let unoptimized = lf.clone().to_alp().unwrap().describe();
    let optimized = lf.to_alp_optimized().unwrap().describe();

    println!("\n---\n");
    println!("Unoptimized:\n{unoptimized}");
    println!("\n---\n");
    println!("Optimized:\n{optimized}");

    assert_eq!(num_occurrences(&unoptimized, "WITH_COLUMNS"), 2);
    assert_eq!(num_occurrences(&optimized, "WITH_COLUMNS"), 1);

    Ok(())
}
553
554
#[test]
fn test_cluster_with_columns_dependency() -> Result<(), Box<dyn std::error::Error>> {
    use polars_core::prelude::*;

    let df = df!("foo" => &[0.5, 1.7, 3.2],
                 "bar" => &[4.1, 1.5, 9.2])?;

    // The second `with_columns` reads the column created by the first one.
    let lf = df
        .lazy()
        .without_optimizations()
        .with_cluster_with_columns(true)
        .with_columns([col("foo").alias("buzz")])
        .with_columns([col("buzz")]);

    let unoptimized = lf.clone().to_alp().unwrap().describe();
    let optimized = lf.to_alp_optimized().unwrap().describe();

    println!("\n---\n");
    println!("Unoptimized:\n{unoptimized}");
    println!("\n---\n");
    println!("Optimized:\n{optimized}");

    assert_eq!(num_occurrences(&unoptimized, "WITH_COLUMNS"), 2);
    assert_eq!(num_occurrences(&optimized, "WITH_COLUMNS"), 2);

    Ok(())
}
585
586
#[test]
fn test_cluster_with_columns_partial() -> Result<(), Box<dyn std::error::Error>> {
    use polars_core::prelude::*;

    let df = df!("foo" => &[0.5, 1.7, 3.2],
                 "bar" => &[4.1, 1.5, 9.2])?;

    // The second `with_columns` mixes one expression that reads the column created
    // by the first call with one that only reads an original column.
    let lf = df
        .lazy()
        .without_optimizations()
        .with_cluster_with_columns(true)
        .with_columns([col("foo").alias("buzz")])
        .with_columns([col("buzz"), col("foo") * lit(2.0)]);

    let unoptimized = lf.clone().to_alp().unwrap().describe();
    let optimized = lf.to_alp_optimized().unwrap().describe();

    println!("\n---\n");
    println!("Unoptimized:\n{unoptimized}");
    println!("\n---\n");
    println!("Optimized:\n{optimized}");

    assert!(unoptimized.contains(r#"[col("buzz"), [(col("foo")) * (2.0)]]"#));
    assert!(unoptimized.contains(r#"[col("foo").alias("buzz")]"#));
    assert!(optimized.contains(r#"[col("buzz")]"#));
    assert!(optimized.contains(r#"[col("foo").alias("buzz"), [(col("foo")) * (2.0)]]"#));

    Ok(())
}
619
620
#[test]
fn test_cluster_with_columns_chain() -> Result<(), Box<dyn std::error::Error>> {
    use polars_core::prelude::*;

    let df = df!("foo" => &[0.5, 1.7, 3.2],
                 "bar" => &[4.1, 1.5, 9.2])?;

    // Four `with_columns` calls in a row, each aliasing the original "foo" column.
    let lf = df
        .lazy()
        .without_optimizations()
        .with_cluster_with_columns(true)
        .with_columns([col("foo").alias("foo1")])
        .with_columns([col("foo").alias("foo2")])
        .with_columns([col("foo").alias("foo3")])
        .with_columns([col("foo").alias("foo4")]);

    let unoptimized = lf.clone().to_alp().unwrap().describe();
    let optimized = lf.to_alp_optimized().unwrap().describe();

    println!("\n---\n");
    println!("Unoptimized:\n{unoptimized}");
    println!("\n---\n");
    println!("Optimized:\n{optimized}");

    assert_eq!(num_occurrences(&unoptimized, "WITH_COLUMNS"), 4);
    assert_eq!(num_occurrences(&optimized, "WITH_COLUMNS"), 1);

    Ok(())
}
653
654