Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_cse.py
6939 views
1
from __future__ import annotations
2
3
from typing import Any
4
5
import pytest
6
7
import polars as pl
8
from polars.testing import assert_frame_equal
9
10
# Tag every test in this module with the pytest-xdist group "streaming".
# NOTE(review): with `--dist loadgroup`, xdist runs all tests of one group on a
# single worker; presumably this keeps streaming-engine tests from running
# concurrently with each other — confirm the intent.
pytestmark = pytest.mark.xdist_group("streaming")
11
12
13
def test_cse_expr_selection_streaming(monkeypatch: Any) -> None:
    """Check CSE-enabled ``select``/``with_columns`` on the streaming engine.

    ``d1`` and ``d2`` share the sub-expression ``a * b``, and ``d2`` and ``d3``
    share ``(a * b) * (a * b)``. Common-subexpression elimination therefore has
    repeated work to rewrite. Only the collected results are compared here.
    """
    # Presumably turns on Polars' verbose diagnostics so those code paths run
    # too; the emitted output is not inspected by this test.
    monkeypatch.setenv("POLARS_VERBOSE", "1")

    source_cols = {
        "a": [1, 2, 3, 4],
        "b": [1, 2, 3, 4],
        "c": [1, 2, 3, 4],
    }
    lf = pl.LazyFrame(source_cols)

    prod = pl.col("a") * pl.col("b")
    prod_sq = prod * prod
    projections = [
        prod.alias("d1"),
        prod_sq.alias("d2"),
        (prod_sq * 10).alias("d3"),
    ]
    new_cols = {
        "d1": [1, 4, 9, 16],
        "d2": [1, 16, 81, 256],
        "d3": [10, 160, 810, 2560],
    }

    # `select` yields only the derived columns; `with_columns` appends them
    # after the original a/b/c columns.
    cases = [
        (lf.select(projections), new_cols),
        (lf.with_columns(projections), {**source_cols, **new_cols}),
    ]
    for query, expected_cols in cases:
        out = query.collect(
            optimizations=pl.QueryOptFlags(comm_subexpr_elim=True),
            engine="streaming",
        )
        assert_frame_equal(out, pl.DataFrame(expected_cols))
54
55
56
def test_cse_expr_group_by() -> None:
    """Check CSE inside a group-by aggregation.

    ``a * b`` is used by two aggregations. The optimized plan must contain a
    ``__POLARS_CSER`` column, and both the streaming and in-memory engines must
    return the same sorted aggregate.
    """
    lf = pl.LazyFrame(
        {
            "a": [1, 2, 3, 4],
            "b": [1, 2, 3, 4],
            "c": [1, 2, 3, 4],
        }
    )
    prod = pl.col("a") * pl.col("b")

    # group_by does not fix output row order, so sort by "min" before comparing.
    agg_query = (
        lf.group_by("a")
        .agg(prod.sum().alias("sum"), prod.min().alias("min"))
        .sort("min")
    )

    plan = agg_query.explain(
        optimizations=pl.QueryOptFlags(comm_subexpr_elim=True)
    )
    assert "__POLARS_CSER" in plan

    expected = pl.DataFrame(
        {"a": [1, 2, 3, 4], "sum": [1, 4, 9, 16], "min": [1, 4, 9, 16]}
    )
    for engine in ("streaming", "in-memory"):
        out = agg_query.collect(
            optimizations=pl.QueryOptFlags(comm_subexpr_elim=True),
            engine=engine,
        )
        assert_frame_equal(out, expected)
86
87