Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-lazy/src/tests/cse.rs
6939 views
1
use std::collections::BTreeSet;
2
3
use super::*;
4
5
fn cached_before_root(q: LazyFrame) {
6
let (mut expr_arena, mut lp_arena) = get_arenas();
7
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();
8
for input in lp_arena.get(lp).inputs() {
9
assert!(matches!(lp_arena.get(input), IR::Cache { .. }));
10
}
11
}
12
13
fn count_caches(q: LazyFrame) -> usize {
14
let IRPlan {
15
lp_top, lp_arena, ..
16
} = q.to_alp_optimized().unwrap();
17
lp_arena
18
.iter(lp_top)
19
.filter(|(_node, lp)| matches!(lp, IR::Cache { .. }))
20
.count()
21
}
22
23
/// Left-joins a frame with a clone of itself, with common-subplan elimination enabled, and
/// checks via `cached_before_root` that the inputs of the optimized root are caches.
#[test]
fn test_cse_self_joins() -> PolarsResult<()> {
    let base = scan_foods_ipc().with_column(col("category").str().to_uppercase());
    let right = base.clone();

    let q = base
        .left_join(right, col("fats_g"), col("fats_g"))
        .with_comm_subplan_elim(true);
    cached_before_root(q);

    Ok(())
}
38
39
/// Concatenates a scan with two copies of a frame derived from it, selects two columns, and
/// checks the optimized plan: two cache nodes are visited, every scan carries a projection of
/// exactly two columns, and the collected output has exactly the selected columns.
#[test]
fn test_cse_unions() -> PolarsResult<()> {
    let base = scan_foods_ipc();
    let upper = base.clone().with_column(col("category").str().to_uppercase());

    let union_args = UnionArgs {
        rechunk: false,
        parallel: false,
        ..Default::default()
    };
    let q = concat(&[upper.clone(), base, upper], union_args)?
        .select([col("category"), col("fats_g")])
        .with_comm_subplan_elim(true);

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();

    let mut n_caches = 0;
    let plan_ok = lp_arena.iter(root).all(|(_, ir)| match ir {
        IR::Cache { .. } => {
            n_caches += 1;
            true
        },
        // A scan passes only if it carries a projection of exactly two columns.
        IR::Scan {
            unified_scan_args, ..
        } => unified_scan_args
            .projection
            .as_ref()
            .is_some_and(|columns| columns.len() == 2),
        _ => true,
    });
    assert!(plan_ok);
    assert_eq!(n_caches, 2);

    let out = q.collect()?;
    assert_eq!(out.get_column_names(), &["category", "fats_g"]);

    Ok(())
}
84
85
/// Joins two differently-projected views of the same filtered frame and checks the optimized
/// plan: two cache nodes are visited and every `DataFrameScan` has an output schema of at most
/// two columns.
#[test]
fn test_cse_cache_union_projection_pd() -> PolarsResult<()> {
    let base = df![
        "a" => [1],
        "b" => [2],
        "c" => [3],
    ]?
    .lazy();

    // Both join sides apply the same filter and differ only in the selected columns.
    let a_is_one = |lf: LazyFrame| lf.filter(col("a").eq(lit(1)));
    let left = a_is_one(base.clone()).select([col("a")]);
    let right = a_is_one(base).select([col("a"), col("b")]);
    let q = left
        .left_join(right, col("a"), col("a"))
        .with_comm_subplan_elim(true);

    // Check that the projection of `a` is not done before the cache.
    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();

    let mut n_caches = 0;
    let plan_ok = lp_arena.iter(root).all(|(_, ir)| match ir {
        IR::Cache { .. } => {
            n_caches += 1;
            true
        },
        // A scan without an output schema fails the check outright.
        IR::DataFrameScan { output_schema, .. } => output_schema
            .as_ref()
            .is_some_and(|projection| projection.as_ref().len() <= 2),
        _ => true,
    });
    assert!(plan_ok);
    assert_eq!(n_caches, 2);

    Ok(())
}
123
124
/// Inner-joins two self-concatenated frames and checks that the optimized plan contains
/// exactly two distinct cache ids.
#[test]
fn test_cse_union2_4925() -> PolarsResult<()> {
    let lf1 = df![
        "ts" => [1],
        "sym" => ["a"],
        "c" => [true],
    ]?
    .lazy();

    let lf2 = df![
        "ts" => [1],
        "d" => [3],
    ]?
    .lazy();

    let union_args = UnionArgs {
        parallel: false,
        rechunk: false,
        ..Default::default()
    };
    let left = concat(&[lf1.clone(), lf1], union_args)?;
    let right = concat(&[lf2.clone(), lf2], union_args)?;

    let projection = [col("ts"), col("sym"), col("d") / col("c")];
    let q = left
        .inner_join(right, col("ts"), col("ts"))
        .select(projection);

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // Collect the distinct cache ids seen in the plan; two different caches are expected.
    let cache_ids: BTreeSet<_> = lp_arena
        .iter(root)
        .filter_map(|(_, ir)| match ir {
            IR::Cache { id, .. } => Some(*id),
            _ => None,
        })
        .collect();

    assert_eq!(cache_ids.len(), 2);

    Ok(())
}
173
174
/// Builds two left joins that share the same right-hand frame `z`, joins their results, and
/// checks the optimized plan: exactly one distinct cache id is present, and the input of every
/// cache node is an `IR::SimpleProjection`.
#[test]
fn test_cse_joins_4954() -> PolarsResult<()> {
    let x = df![
        "a"=> [1],
        "b"=> [1],
        "c"=> [1],
    ]?
    .lazy();

    let y = df![
        "a"=> [1],
        "b"=> [1],
    ]?
    .lazy();

    let z = df![
        "a"=> [1],
    ]?
    .lazy();

    let x_with_z = x.left_join(z.clone(), col("a"), col("a"));
    let y_with_z = y.left_join(z, col("a"), col("a"));
    let q = x_with_z.join(
        y_with_z,
        &[col("a"), col("b")],
        &[col("a"), col("b")],
        JoinType::Left.into(),
    );

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // Ensure we get only one cache and that it is not above the join: every cache node must
    // sit directly on top of a simple projection.
    let mut cache_ids = BTreeSet::new();
    for (_, ir) in lp_arena.iter(root) {
        if let IR::Cache { id, input, .. } = ir {
            assert!(matches!(lp_arena.get(*input), IR::SimpleProjection { .. }));
            cache_ids.insert(*id);
        }
    }

    assert_eq!(cache_ids.len(), 1);

    Ok(())
}
227
/// Chains three semi joins of `lf2` against different views of `lf1` (one projected, two
/// identically filtered) and checks that the optimized plan contains exactly one distinct
/// cache id.
#[test]
#[cfg(feature = "semi_anti_join")]
fn test_cache_with_partial_projection() -> PolarsResult<()> {
    let lf1 = df![
        "id" => ["a"],
        "x" => [1],
        "freq" => [2]
    ]?
    .lazy();

    let lf2 = df![
        "id" => ["a"]
    ]?
    .lazy();

    let id_key = || [col("id")];
    let x_is_not_8 = || col("x").neq(lit(8));

    let projected = lf1.clone().select([col("id"), col("freq")]);
    let filtered_first = lf1.clone().filter(x_is_not_8());
    let filtered_second = lf1.filter(x_is_not_8());

    // Semi-join `lf2` against each view in turn, always on `id`.
    let q = [projected, filtered_first, filtered_second]
        .into_iter()
        .fold(lf2, |acc, other| {
            acc.join(other, id_key(), id_key(), JoinType::Semi.into())
        });

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // EDIT: #15264 this originally tested for 2 caches, but we cannot do that after #15264
    // because projection pushdown runs first and the cache semantics changed, so now we test
    // for 1. Maybe we can improve this later.
    let cache_ids: BTreeSet<_> = lp_arena
        .iter(root)
        .filter_map(|(_, ir)| match ir {
            IR::Cache { id, .. } => Some(*id),
            _ => None,
        })
        .collect();
    assert_eq!(cache_ids.len(), 1);

    Ok(())
}
285
286
/// Cross-joins `left` with a one-column view of `right`, then left-joins the result with a
/// renamed `right` on `A` and `C`, and checks the column names of the collected output.
#[test]
#[cfg(feature = "cross_join")]
fn test_cse_columns_projections() -> PolarsResult<()> {
    let right = df![
        "A" => [1, 2],
        "B" => [3, 4],
        "D" => [5, 6]
    ]?
    .lazy();

    let left = df![
        "C" => [3, 4],
    ]?
    .lazy();

    let right_only_a = right.clone().select([col("A")]);
    let crossed = left.cross_join(right_only_a, None);

    let right_renamed = right.rename(["B"], ["C"], true);
    let join_keys = || [col("A"), col("C")];
    let q = crossed.join(
        right_renamed,
        join_keys(),
        join_keys(),
        JoinType::Left.into(),
    );

    let out = q.collect()?;

    assert_eq!(out.get_column_names(), &["C", "A", "D"]);

    Ok(())
}
315
316
/// Self-joins a scanned frame with a filter on each side. With identical filters the inputs of
/// the optimized root must be caches; with different filters no cache may remain and
/// `predicate_at_scan` must hold for the query.
#[test]
fn test_cse_prune_scan_filter_difference() -> PolarsResult<()> {
    let base = scan_foods_ipc().with_column(col("category").str().to_uppercase());

    // Builds the self-join, filtering each side on `fats_g` with its own threshold.
    let filtered_self_join = |left_min: f64, right_min: f64| {
        base.clone()
            .filter(col("fats_g").gt(left_min))
            .left_join(
                base.clone().filter(col("fats_g").gt(right_min)),
                col("fats_g"),
                col("fats_g"),
            )
            .with_comm_subplan_elim(true)
    };

    // If the filters are the same, we can cache.
    cached_before_root(filtered_self_join(2.0, 2.0));

    // If the filters are different the caches are removed.
    let q = filtered_self_join(2.0, 1.0);

    // Check that the caches are removed and that both predicates have been pushed down instead.
    assert_eq!(count_caches(q.clone()), 0);
    assert!(predicate_at_scan(q));

    Ok(())
}
348
349