Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/test_projections.py
6939 views
1
from typing import Literal
2
3
import numpy as np
4
import pytest
5
6
import polars as pl
7
from polars.testing import assert_frame_equal
8
9
10
def test_projection_on_semi_join_4789() -> None:
    """Reuse a semi-join result on both sides of a subsequent join.

    The semi-join output feeds a group_by whose projected result is joined
    back onto the semi-join output itself.
    """
    left = pl.DataFrame({"a": [1], "p": [1]}).lazy()
    right = pl.DataFrame({"seq": [1], "p": [1]}).lazy()

    # `inspect()` prints the value this node evaluates to and passes it on.
    semi = left.join(right, on="p", how="semi").inspect()

    grouped = semi.group_by("a").agg([pl.col("a").alias("seq")])
    per_key = grouped.select(["a", "seq"])

    result = semi.join(per_key, on="a").collect()
    assert result.to_dict(as_series=False) == {"a": [1], "p": [1], "seq": [[1]]}
24
25
26
def test_unpivot_projection_pd_block_4997() -> None:
    """Aggregating `variable` after an unpivot yields every unpivoted name."""
    indexed = pl.DataFrame({"col1": ["a"], "col2": ["b"]}).with_row_index()
    long_form = indexed.lazy().unpivot(index="index")
    grouped = long_form.group_by("index").agg(pl.col("variable").alias("result"))

    expected = {"index": [0], "result": [["col1", "col2"]]}
    assert grouped.collect().to_dict(as_series=False) == expected
36
37
38
def test_double_projection_pushdown() -> None:
    """The narrower outer select is what reaches the scan (2 of 3 columns)."""
    empty = pl.DataFrame({"c0": [], "c1": [], "c2": []}).lazy()
    query = empty.select(["c0", "c1", "c2"]).select(["c0", "c1"])

    plan = query.explain()
    assert "2/3 COLUMNS" in plan
48
49
50
def test_group_by_projection_pushdown() -> None:
    """Only the key and the column of the surviving aggregation are read.

    `mean(c2)` is dropped by the final select, so the scan should need just
    `c0` and `c1`.
    """
    source = pl.DataFrame({"c0": [], "c1": [], "c2": []}).lazy()
    aggregations = [
        pl.col("c1").sum().alias("sum(c1)"),
        pl.col("c2").mean().alias("mean(c2)"),
    ]
    query = source.group_by("c0").agg(aggregations).select(["sum(c1)"])

    assert "2/3 COLUMNS" in query.explain()
66
67
68
def test_unnest_projection_pushdown() -> None:
    """Selecting a subset of unnested struct fields keeps the right values."""
    wide = pl.DataFrame({"x|y|z": [1, 2], "a|b|c": [2, 3]}).lazy()

    # Split each former column name on "|" into a struct with fields
    # field_0..field_2, then flatten that struct into top-level columns.
    split_names = pl.col("variable").str.split_exact("|", 2)
    unnested = wide.unpivot().with_columns(split_names).unnest("variable")

    # `field_0` is never selected, so it can be pruned.
    projected = unnested.select(
        pl.col("field_1").cast(pl.Categorical).alias("row"),
        pl.col("field_2").cast(pl.Categorical).alias("col"),
        pl.col("value"),
    )

    sort_keys = [pl.col("row").cast(pl.String), pl.col("col").cast(pl.String)]
    out = projected.sort(sort_keys, maintain_order=True).collect()

    assert out.to_dict(as_series=False) == {
        "row": ["b", "b", "y", "y"],
        "col": ["c", "c", "z", "z"],
        "value": [2, 3, 1, 2],
    }
95
96
97
def test_hconcat_projection_pushdown() -> None:
    """Horizontal concat pushes each selected column down to its own input."""
    left = pl.LazyFrame({"a": [0, 1, 2], "b": [3, 4, 5]})
    right = pl.LazyFrame({"c": [6, 7, 8], "d": [9, 10, 11]})
    query = pl.concat([left, right], how="horizontal").select(["a", "d"])

    # Each of the two inputs should be pruned to one of its two columns.
    assert query.explain().count("1/2 COLUMNS") == 2

    assert_frame_equal(
        query.collect(),
        pl.DataFrame({"a": [0, 1, 2], "d": [9, 10, 11]}),
    )
108
109
110
def test_hconcat_projection_pushdown_length_maintained() -> None:
    """Horizontal concat keeps an input that contributes no selected columns.

    The second input cannot be eliminated entirely: it is longer than the
    first, so it still determines the height of the result.
    """
    first = pl.LazyFrame({"a": [0, 1], "b": [2, 3]})
    second = pl.LazyFrame({"c": [4, 5, 6, 7], "d": [8, 9, 10, 11]})
    query = pl.concat([first, second], how="horizontal").select(["a"])

    assert "1/2 COLUMNS" in query.explain()

    # `a` is padded with nulls up to the height of the longer input.
    assert_frame_equal(query.collect(), pl.DataFrame({"a": [0, 1, None, None]}))
123
124
125
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud
def test_unnest_columns_available() -> None:
    """Struct fields created by `list.to_struct` are available after unnest."""
    movies = pl.DataFrame(
        {
            "title": ["Avatar", "spectre", "King Kong"],
            "content_rating": ["PG-13"] * 3,
            "genres": [
                "Action|Adventure|Fantasy|Sci-Fi",
                "Action|Adventure|Thriller",
                "Action|Adventure|Drama|Romance",
            ],
        }
    ).lazy()

    def genre_field(i: int) -> str:
        # Struct fields are named genre1..genre4 (1-based).
        return f"genre{i + 1}"

    genres_struct = (
        pl.col("genres")
        .str.split("|")
        .list.to_struct(upper_bound=4, fields=genre_field)
    )
    q = movies.with_columns(genres_struct).unnest("genres")

    expected = {
        "title": ["Avatar", "spectre", "King Kong"],
        "content_rating": ["PG-13", "PG-13", "PG-13"],
        "genre1": ["Action", "Action", "Action"],
        "genre2": ["Adventure", "Adventure", "Adventure"],
        "genre3": ["Fantasy", "Thriller", "Drama"],
        # The 3-genre row has no fourth entry, hence null.
        "genre4": ["Sci-Fi", None, "Romance"],
    }
    assert q.collect().to_dict(as_series=False) == expected
155
156
157
def test_double_projection_union() -> None:
    """Projection pushdown keeps both inputs of a vertical concat aligned."""
    top = pl.DataFrame(
        {
            "a": [1, 2, 3, 4],
            "b": [2, 3, 4, 5],
            "c": [1, 1, 2, 2],
            "d": [1, 2, 2, 1],
        }
    ).lazy()
    bottom = pl.DataFrame(
        {
            "a": [5, 6, 7, 8],
            "b": [6, 7, 8, 9],
            "c": [1, 2, 1, 3],
        }
    ).lazy()

    # The group_by only needs 2 columns, one fewer than the explicit select
    # on `top`. The union fails unless that select node is pruned by one
    # column as well.
    unioned = pl.concat([top.select(["a", "b", "c"]), bottom])
    grouped = unioned.group_by("c", maintain_order=True).agg([pl.col("a")])

    assert grouped.collect().to_dict(as_series=False) == {
        "c": [1, 2, 3],
        "a": [[1, 2, 5, 7], [3, 4, 6], [8]],
    }
187
188
189
def test_asof_join_projection_() -> None:
    """Select two columns from a concat of an asof join and a plain select.

    `dirty_lf1` re-selects `lf1` and appends literal `group`/`val` columns so
    it can be stacked vertically under the asof join's output. Both halves
    contribute the same seven `a`/`b` values taken from `lf1`.
    """
    ramp = np.linspace(0, 5, 7)
    lf1 = (
        pl.DataFrame(
            {
                "m": ramp,
                "a": ramp,
                "b": ramp,
                "c": pl.Series(ramp).cast(str),
                "d": ramp,
            }
        )
        .lazy()
        .set_sorted("b")
    )
    lf2 = (
        pl.DataFrame(
            {
                "group": [0, 2, 3, 0, 1, 2, 3],
                "val": [0.0, 2.5, 2.6, 2.7, 3.4, 4.0, 5.0],
                "c": ["x", "x", "x", "y", "y", "y", "y"],
            }
        )
        .with_columns(pl.col("val").alias("b"))
        .lazy()
        .set_sorted("b")
    )

    joined = lf1.join_asof(lf2, on="b", by=["c"], strategy="backward")
    dirty_lf1 = lf1.select(
        "m",
        "a",
        "b",
        "c",
        "d",
        pl.lit(0, dtype=pl.Int64).alias("group"),
        pl.lit(0.1).alias("val"),
    )
    stacked = pl.concat([joined, dirty_lf1])

    # Each half yields lf1's ramp once, so every column is the ramp twice.
    both_halves = ramp.tolist() * 2
    assert_frame_equal(
        stacked.select(["b", "a"]).collect(),
        pl.DataFrame({"b": both_halves, "a": both_halves}),
        check_row_order=False,
    )
275
276
277
def test_merge_sorted_projection_pd() -> None:
    """Selecting one column after merge_sorted + reverse keeps the row order."""
    first = pl.LazyFrame(
        {
            "foo": [1, 2, 3, 4],
            "bar": ["patrick", "lukas", "onion", "afk"],
        }
    ).sort("foo")
    second = pl.LazyFrame({"foo": [5, 6], "bar": ["nice", "false"]}).sort("foo")

    merged = first.merge_sorted(second, key="foo")
    result = merged.reverse().select(["bar"]).collect()

    expected_bar = ["false", "nice", "afk", "onion", "lukas", "patrick"]
    assert result.to_dict(as_series=False) == {"bar": expected_bar}
292
293
294
def test_distinct_projection_pd_7578() -> None:
    """`unique` deduplicates on all columns even though only `bar` is used later.

    The counts 3 and 2 are correct only if `foo` still takes part in the
    `unique` step.
    """
    lf = pl.LazyFrame(
        {
            "foo": ["0", "1", "2", "1", "2"],
            "bar": ["a", "a", "a", "b", "b"],
        }
    )

    counts = lf.unique().group_by("bar").agg(pl.len())
    expected = pl.LazyFrame(
        {"bar": ["a", "b"], "len": [3, 2]},
        schema_overrides={"len": pl.UInt32},
    )
    assert_frame_equal(counts, expected, check_row_order=False)
311
312
313
def test_join_suffix_collision_9562() -> None:
    """Projecting the join key works when a custom suffix is supplied."""
    df = pl.DataFrame(
        {
            "foo": [1, 2, 3],
            "bar": [6.0, 7.0, 8.0],
            "ham": ["a", "b", "c"],
        }
    )
    other_df = pl.DataFrame(
        {
            "apple": ["x", "y", "z"],
            "ham": ["a", "b", "d"],
        }
    )
    # The eager join is only executed; its result is deliberately discarded.
    _ = df.join(other_df, on="ham")

    joined = df.lazy().join(
        other_df.lazy(),
        how="inner",
        left_on="ham",
        right_on="ham",
        suffix="m",
        maintain_order="right",
    )
    result = joined.select("ham").collect()
    assert result.to_dict(as_series=False) == {"ham": ["a", "b"]}
336
337
338
def test_projection_join_names_9955() -> None:
    """Re-selecting the left frame's columns after an inner join keeps dtypes."""
    batting = pl.LazyFrame(
        {
            "playerID": ["abercda01"],
            "yearID": [1871],
            "lgID": ["NA"],
        }
    )
    awards_players = pl.LazyFrame(
        {
            "playerID": ["bondto01"],
            "yearID": [1877],
            "lgID": ["NL"],
        }
    )

    nl_players = awards_players.filter(pl.col("lgID") == "NL").select("playerID")
    joined = batting.join(
        nl_players,
        left_on=[pl.col("playerID")],
        right_on=[pl.col("playerID")],
        how="inner",
    )
    reprojected = joined.select(*batting.collect_schema().names())

    expected_schema = {
        "playerID": pl.String,
        "yearID": pl.Int64,
        "lgID": pl.String,
    }
    assert reprojected.collect().schema == expected_schema
371
372
373
def test_projection_rename_10595() -> None:
    """Selecting after a column-swapping rename resolves to the right column."""
    lf = pl.LazyFrame(schema={"a": pl.Float32, "b": pl.Float32})
    base = lf.select("a", "b")

    swapped = base.rename({"b": "a", "a": "b"}).select("a")
    assert swapped.collect().schema == {"a": pl.Float32}

    # Non-strict rename: the "c"/"d" entries name columns that do not exist.
    lenient_mapping = {"c": "d", "b": "a", "d": "c", "a": "b"}
    swapped_lenient = base.rename(lenient_mapping, strict=False).select("a")
    assert swapped_lenient.collect().schema == {"a": pl.Float32}
385
386
387
def test_projection_count_11841() -> None:
    """Project a literal plus `pl.all()` over the output of a `pl.len()` select.

    Besides checking that the query runs, the output is pinned so that a
    wrong projection (not only a crash) makes the test fail.
    """
    out = (
        pl.LazyFrame({"x": 1})
        .select(records=pl.len())
        .select(pl.lit(1).alias("x"), pl.all())
        .collect()
    )
    # After the first select the only column is `records`; `pl.all()` expands
    # to it, placed after the new literal column `x`.
    assert out.to_dict(as_series=False) == {"x": [1], "records": [1]}
391
392
393
def test_schema_full_outer_join_projection_pd_13287() -> None:
    """Full join on differently named keys, then fill `a` from the right key `c`."""
    left = pl.LazyFrame({"a": [1, 1], "b": [2, 3]})
    right = pl.LazyFrame({"a": [1, 1], "c": [2, 3]})

    joined = left.join(
        right,
        how="full",
        left_on="a",
        right_on="c",
        maintain_order="right_left",
    )
    filled = joined.with_columns(pl.col("a").fill_null(pl.col("c")))

    # No keys match; right-side rows come first and take their `a` from `c`.
    result = filled.select("a").collect()
    assert result.to_dict(as_series=False) == {"a": [2, 3, 1, 1]}
406
407
408
def test_projection_pushdown_full_outer_join_duplicates() -> None:
    """Select the key plus a new column after a full join of same-schema frames."""
    left = pl.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]}).lazy()
    right = pl.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]}).lazy()

    query = (
        left.join(right, on="a", how="full", maintain_order="right")
        .with_columns(c=0)
        .select("a", "c")
    )

    expected = {"a": [1, 2, 3], "c": [0, 0, 0]}
    assert query.collect().to_dict(as_series=False) == expected
417
418
419
def test_rolling_key_projected_13617() -> None:
    """A rolling window's index column is read even though it isn't selected.

    The plan must report 2/2 COLUMNS: `value` and the `idx` index column.
    """
    df = pl.DataFrame({"idx": [1, 2], "value": ["a", "b"]}).set_sorted("idx")
    rolled = df.lazy().select(pl.col("value").rolling("idx", period="1i"))

    plan = rolled.explain(
        optimizations=pl.QueryOptFlags(projection_pushdown=True)
    )
    assert "2/2 COLUMNS" in plan

    result = rolled.collect(
        optimizations=pl.QueryOptFlags(projection_pushdown=True)
    )
    assert result.to_dict(as_series=False) == {"value": [["a"], ["b"]]}
426
427
428
def test_projection_drop_with_series_lit_14382() -> None:
    """A Series literal stays row-aligned through a filter and a later drop."""
    df = pl.DataFrame({"b": [1, 6, 8, 7]})
    df2 = pl.DataFrame({"a": [1, 2, 4, 4], "b": [True, True, True, False]})

    old_b = df.get_column("b").alias("b_old")
    q = (
        df2.lazy()
        .select("a", "b", pl.lit("b").alias("b_name"), old_b)
        .filter(~pl.col("b"))
        .drop("b")
    )

    # Only the last row has b == False, so b_old must be df's last value, 7.
    assert q.collect().to_dict(as_series=False) == {
        "a": [4],
        "b_name": ["b"],
        "b_old": [7],
    }
445
446
447
def test_cached_schema_15651() -> None:
    """Collecting a diverging sub-query must not corrupt the parent's schema."""
    q = (
        pl.LazyFrame({"col1": [1], "col2": [2], "col3": [3]})
        .with_row_index()
        .filter(~pl.col("col1").is_null())
    )

    # Run a sub-plan that branches off `q`, with projection pushdown enabled.
    _ = q.select(pl.len()).collect(
        optimizations=pl.QueryOptFlags(projection_pushdown=True)
    )

    # `q`'s cached column list must still agree with what it actually yields.
    assert q.collect_schema().names() == q.collect().columns
458
459
460
def test_double_projection_pushdown_15895() -> None:
    """Swap column names in a select, then group by a literal key."""
    # C takes A's values and A takes B's values.
    swapped = pl.LazyFrame({"A": [0], "B": [1]}).select(C="A", A="B")
    grouped = swapped.group_by(1).all()

    out = grouped.collect(optimizations=pl.QueryOptFlags(projection_pushdown=True))

    expected = {"literal": [1], "C": [[0]], "A": [[1]]}
    assert out.to_dict(as_series=False) == expected
473
474
475
@pytest.mark.parametrize("join_type", ["inner", "left", "full"])
def test_non_coalesce_join_projection_pushdown_16515(
    join_type: Literal["inner", "left", "full"],
) -> None:
    """With `coalesce=False` the right key can be selected on its own."""
    left = pl.LazyFrame({"x": 1})
    right = pl.LazyFrame({"y": 1})

    joined = left.join(
        right,
        how=join_type,
        left_on="x",
        right_on="y",
        coalesce=False,
    )
    assert joined.select("y").collect().item() == 1
489
490
491
@pytest.mark.parametrize("join_type", ["inner", "left", "full"])
def test_non_coalesce_multi_key_join_projection_pushdown_16554(
    join_type: Literal["inner", "left", "full"],
) -> None:
    """Using one left column for two join keys matches an explicit copy of it.

    The reference query duplicates `a` into `a2` before joining. The query
    under test passes `a` twice as a left key. With `coalesce=False`, both
    must produce the same `a`, `b`, `c` columns after projection.
    """
    lf1 = pl.LazyFrame(
        {
            "a": [1, 2, 3, 4, 5],
            "b": [1, 2, 3, 4, 5],
        }
    )
    lf2 = pl.LazyFrame(
        {
            "a": [0, 2, 3, 4, 5],
            "b": [1, 2, 3, 5, 6],
            "c": [7, 5, 3, 5, 7],
        }
    )

    def join_and_project(left: pl.LazyFrame, left_on: list[str]) -> pl.DataFrame:
        # Identical join/projection for both variants; only the left side
        # and its key columns differ.
        return (
            left.join(
                other=lf2,
                how=join_type,
                left_on=left_on,
                right_on=["b", "c"],
                coalesce=False,
            )
            .select("a", "b", "c")
            .collect()
        )

    expected = join_and_project(lf1.with_columns(a2="a"), ["a", "a2"])
    result = join_and_project(lf1, ["a", "a"])

    assert_frame_equal(result, expected, check_row_order=False)
535
536
537
@pytest.mark.parametrize("how", ["semi", "anti"])
def test_projection_pushdown_semi_anti_no_selection(
    how: Literal["semi", "anti"],
) -> None:
    """Semi/anti joins read only the right key, never the right payload column."""
    q_a = pl.LazyFrame({"a": [1, 2, 3]})
    q_b = pl.LazyFrame({"b": [1, 2, 3], "c": [1, 2, 3]})

    # Semi/anti joins output only left columns, so `c` is never needed.
    plan = q_a.join(q_b, left_on="a", right_on="b", how=how).explain()
    assert "1/2 COLUMNS" in plan
548
549
550
def test_projection_empty_frame_len_16904() -> None:
    """`pl.len()` on a frame with no columns reads nothing and returns 0."""
    counted = pl.LazyFrame({}).select(pl.len())

    assert "0/0 COLUMNS" in counted.explain()

    assert_frame_equal(
        counted.collect(),
        pl.DataFrame({"len": [0]}, schema_overrides={"len": pl.UInt32()}),
    )
559
560
561
def test_projection_literal_no_alias_17739() -> None:
    """An un-aliased literal can be selected by its default name `literal`."""
    query = pl.LazyFrame({}).select(pl.lit(False)).select("literal")
    result = query.collect().to_dict(as_series=False)
    assert result == {"literal": [False]}
566
567
568
def test_projections_collapse_17781() -> None:
    """Repeated identical selects on join inputs leave no SELECT in the plan."""
    frame1 = pl.LazyFrame(
        {
            "index": [0],
            "data1": [0],
            "data2": [0],
        }
    )
    frame2 = pl.LazyFrame(
        {
            "index": [0],
            "label1": [True],
            "label2": [False],
            "label3": [False],
        },
        schema=[
            ("index", pl.Int64),
            ("label1", pl.Boolean),
            ("label2", pl.Boolean),
            ("label3", pl.Boolean),
        ],
    )
    cols = ["index", "data1", "label1", "label2"]

    def project_twice(frame: pl.LazyFrame) -> pl.LazyFrame:
        # Keep the wanted columns that this frame actually has, in `cols` order.
        available = set(frame.collect_schema().names())
        use_columns = [c for c in cols if c in available]
        # The same select is applied twice on purpose: the optimizer is
        # expected to collapse the redundant projections.
        return frame.select(use_columns).select(use_columns)

    lf = project_twice(frame1).join(project_twice(frame2), on="index", how="left")
    assert "SELECT " not in lf.explain()
602
603
604
def test_with_columns_projection_pushdown() -> None:
    """Projection pushdown continues past a `with_columns` that adds columns.

    `process_hstack` in projection pushdown incorrectly took a fast path meant
    for plan nodes that don't add new columns to the schema. That fast path
    stops pushdown when it sees that the schema length of the upper node
    matches.

    To trigger it, the query drops the same number of columns before and
    after the `with_columns`, and the `with_columns` adds that same number
    of columns.
    """
    csv_source = b"""\
a,b,c,d,e
1,1,1,1,1
"""
    literal_columns = [pl.lit(1).alias(name) for name in ("x", "y")]
    query = (
        pl.scan_csv(csv_source, include_file_paths="path")
        .drop("a", "b")
        .with_columns(literal_columns)
        .drop("c", "d")
    )

    plan = query.explain().strip()

    # Expected plan, top to bottom:
    #   WITH_COLUMNS:
    #   [dyn int: 1.alias("x"), dyn int: 1.alias("y")]
    #   Csv SCAN [20 in-mem bytes]
    #   ... ending in "1/6 COLUMNS"
    assert plan.startswith("WITH_COLUMNS:")
    assert plan.endswith("1/6 COLUMNS")
632
633
634
def test_projection_pushdown_height_20221() -> None:
    """A broadcast scalar keeps the frame height after its sibling is pruned."""
    source = pl.LazyFrame({"a": range(5)})
    only_b = source.select("a", b=pl.col("a").first()).select("b")

    # `b` is a single value broadcast to the 5 rows implied by `a`.
    assert_frame_equal(only_b.collect(), pl.DataFrame({"b": [0] * 5}))
637
638
639
def test_select_len_20337() -> None:
    """`pl.len()` after group_by + with_row_index counts the groups."""
    strs = [str(i) for i in range(3)]
    lf = pl.LazyFrame({"a": strs, "b": strs, "c": strs, "d": range(3)})

    multiples = [(pl.col("d") * j).alias(f"mult {j}") for j in (1, 2)]
    indexed = lf.group_by(pl.col("c")).agg(multiples).with_row_index("foo")

    # `c` holds three distinct values, hence three groups.
    assert indexed.select(pl.len()).collect().item() == 3
649
650
651
def test_filter_count_projection_20902() -> None:
    """A filter feeding `pl.len()` reads only the column it filters on."""
    lineitem_ldf = pl.LazyFrame(
        {
            "l_partkey": [1],
            "l_quantity": [1],
            "l_extendedprice": [1],
        }
    )

    in_range = pl.col("l_partkey").is_between(10, 20)
    plan = lineitem_ldf.filter(in_range).select(pl.len()).explain()
    assert "1/3 COLUMNS" in plan
665
666
667
def test_projection_count_21154() -> None:
    """`pl.len()` after `unique(subset)` counts the distinct subset values."""
    lf = pl.LazyFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

    counted = lf.unique("a").select(pl.len()).collect()
    assert counted.to_dict(as_series=False) == {"len": [3]}
678
679