Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/cloud/test_prepare_cloud_plan.py
8416 views
1
from io import BytesIO
2
from pathlib import Path
3
4
import pytest
5
6
import polars as pl
7
from polars._utils.cloud import prepare_cloud_plan
8
from polars.exceptions import InvalidOperationError
9
10
# Placeholder cloud locations used to build plans. The bucket name signals
# that it is not expected to exist.
_NONEXISTENT_BUCKET = "s3://my-nonexistent-bucket"
CLOUD_SOURCE = f"{_NONEXISTENT_BUCKET}/dataset"
DST = f"{_NONEXISTENT_BUCKET}/output"
12
13
14
@pytest.mark.parametrize(
    "lf",
    [
        # Plan that scans the placeholder cloud source.
        pl.scan_parquet(CLOUD_SOURCE)
        .select("c", pl.lit(2))
        .with_row_index()
        .sink_parquet(DST, lazy=True),
        # Plan built from an in-memory frame.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select("a", "b")
        .filter(pl.col("a") == pl.lit(1))
        .sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan(lf: pl.LazyFrame) -> None:
    """Check that a cloud-eligible plan serializes to bytes that deserialize back into a LazyFrame."""
    payload, _ = prepare_cloud_plan(lf, allow_local_scans=False)
    assert isinstance(payload, bytes)

    buffer = BytesIO(payload)
    restored = pl.LazyFrame.deserialize(buffer)
    assert isinstance(restored, pl.LazyFrame)
33
34
35
@pytest.mark.parametrize(
    "lf",
    [
        # Element-wise Python UDF without an explicit return dtype.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select(pl.col("a").map_elements(lambda x: sum(x)))
        .sink_parquet(DST, lazy=True),
        # Batch-wise Python UDF applied to a single column.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select(pl.col("b").map_batches(lambda x: sum(x)))
        .sink_parquet(DST, lazy=True),
        # Frame-level Python UDF.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .map_batches(lambda x: x)
        .sink_parquet(DST, lazy=True),
        # Python UDF nested inside a filter predicate on a cloud scan.
        pl.scan_parquet(CLOUD_SOURCE)
        .filter(pl.col("a") < pl.lit(1).map_elements(lambda x: x + 1))
        .sink_parquet(DST, lazy=True),
        # Element-wise Python UDF with an explicit return dtype on a list column.
        pl.LazyFrame({"a": [[1, 2], [3, 4, 5]], "b": [3, 4]})
        .select(pl.col("a").map_elements(lambda x: sum(x), return_dtype=pl.Int64))
        .sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan_udf(lf: pl.LazyFrame) -> None:
    """Check that plans containing Python UDFs still serialize and deserialize.

    This test never collects the plans. NOTE(review): the UDF bodies (e.g.
    ``sum`` over an integer element) are presumably only serialized here, not
    run; confirm that ``prepare_cloud_plan`` does not evaluate them.
    """
    payload, _ = prepare_cloud_plan(lf, allow_local_scans=False)
    assert isinstance(payload, bytes)

    buffer = BytesIO(payload)
    restored = pl.LazyFrame.deserialize(buffer)
    assert isinstance(restored, pl.LazyFrame)
61
62
63
def test_prepare_cloud_plan_optimization_toggle() -> None:
    """Check that passing custom ``QueryOptFlags`` still yields a deserializable plan."""
    lf = pl.LazyFrame({"a": [1, 2], "b": [3, 4]}).sink_parquet(DST, lazy=True)
    opt_flags = pl.QueryOptFlags(projection_pushdown=False)

    payload, _ = prepare_cloud_plan(
        lf,
        allow_local_scans=False,
        optimizations=opt_flags,
    )
    assert isinstance(payload, bytes)

    # TODO: Also verify that projection pushdown was actually disabled; for
    # now this only checks that the plan round-trips.
    buffer = BytesIO(payload)
    restored = pl.LazyFrame.deserialize(buffer)
    assert isinstance(restored, pl.LazyFrame)
76
77
78
# Plans whose scans read local (non-cloud) paths. These plans are shared by
# the two tests below so that the rejected and the accepted cases cannot
# drift apart. Previously the same list was copy-pasted into both decorators.
_LOCAL_SCAN_PLANS = [
    pl.scan_parquet("data.parquet").sink_parquet(DST, lazy=True),
    pl.scan_ndjson(Path("data.ndjson")).sink_parquet(DST, lazy=True),
    pl.scan_csv("data-*.csv").sink_parquet(DST, lazy=True),
    pl.scan_ipc(["data-1.feather", "data-2.feather"]).sink_parquet(DST, lazy=True),
]


@pytest.mark.parametrize("lf", _LOCAL_SCAN_PLANS)
def test_prepare_cloud_plan_fail_on_local_data_source(lf: pl.LazyFrame) -> None:
    """Check that plans scanning local data are rejected when ``allow_local_scans=False``."""
    with pytest.raises(
        InvalidOperationError,
        match="logical plan ineligible for execution on Polars Cloud",
    ):
        prepare_cloud_plan(lf, allow_local_scans=False)


@pytest.mark.parametrize("lf", _LOCAL_SCAN_PLANS)
def test_prepare_cloud_plan_succeed_on_local_data_source(lf: pl.LazyFrame) -> None:
    """Check that the same local-scan plans are accepted when ``allow_local_scans=True``."""
    result, _flags = prepare_cloud_plan(lf, allow_local_scans=True)
    assert isinstance(result, bytes)

    # Also check that the serialized plan round-trips, as the other
    # successful cases in this module do.
    deserialized = pl.LazyFrame.deserialize(BytesIO(result))
    assert isinstance(deserialized, pl.LazyFrame)
107
108