Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/cloud/test_prepare_cloud_plan.py
6939 views
1
from io import BytesIO
2
from pathlib import Path
3
4
import pytest
5
6
import polars as pl
7
from polars._utils.cloud import prepare_cloud_plan
8
from polars.exceptions import InvalidOperationError
9
10
# S3 URI passed to `scan_parquet` as the cloud-hosted input in the tests below.
CLOUD_SOURCE = "s3://my-nonexistent-bucket/dataset"
# S3 URI passed to `sink_parquet` as the output location in the tests below.
DST = "s3://my-nonexistent-bucket/output"
12
13
14
@pytest.mark.parametrize(
    "lf",
    [
        pl.scan_parquet(CLOUD_SOURCE)
        .select("c", pl.lit(2))
        .with_row_index()
        .sink_parquet(DST, lazy=True),
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select("a", "b")
        .filter(pl.col("a") == pl.lit(1))
        .sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan(lf: pl.LazyFrame) -> None:
    """Serialize each parametrized plan and check it deserializes to a LazyFrame."""
    plan_bytes = prepare_cloud_plan(lf)
    assert isinstance(plan_bytes, bytes)

    # Round-trip the serialized plan through an in-memory buffer.
    buffer = BytesIO(plan_bytes)
    roundtripped = pl.LazyFrame.deserialize(buffer)
    assert isinstance(roundtripped, pl.LazyFrame)
33
34
35
@pytest.mark.parametrize(
    "lf",
    [
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select(pl.col("a").map_elements(lambda x: sum(x)))
        .sink_parquet(DST, lazy=True),
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select(pl.col("b").map_batches(lambda x: sum(x)))
        .sink_parquet(DST, lazy=True),
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .map_batches(lambda x: x)
        .sink_parquet(DST, lazy=True),
        pl.scan_parquet(CLOUD_SOURCE)
        .filter(pl.col("a") < pl.lit(1).map_elements(lambda x: x + 1))
        .sink_parquet(DST, lazy=True),
        pl.LazyFrame({"a": [[1, 2], [3, 4, 5]], "b": [3, 4]})
        .select(pl.col("a").map_elements(lambda x: sum(x), return_dtype=pl.Int64))
        .sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan_udf(lf: pl.LazyFrame) -> None:
    """Check that plans using Python lambdas serialize and deserialize.

    The parametrized plans call `map_elements` / `map_batches` with lambdas,
    both as expressions and at the frame level.
    """
    plan_bytes = prepare_cloud_plan(lf)
    assert isinstance(plan_bytes, bytes)

    # Round-trip the serialized plan through an in-memory buffer.
    buffer = BytesIO(plan_bytes)
    roundtripped = pl.LazyFrame.deserialize(buffer)
    assert isinstance(roundtripped, pl.LazyFrame)
61
62
63
def test_prepare_cloud_plan_optimization_toggle() -> None:
    """Check that a plan prepared with custom optimization flags round-trips.

    Projection pushdown is switched off via `pl.QueryOptFlags`; the test only
    verifies that the resulting bytes deserialize into a LazyFrame.
    """
    frame = pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
    lf = frame.sink_parquet(DST, lazy=True)

    flags = pl.QueryOptFlags(projection_pushdown=False)
    plan_bytes = prepare_cloud_plan(lf, optimizations=flags)
    assert isinstance(plan_bytes, bytes)

    # TODO: How to check that this optimization was toggled correctly?
    buffer = BytesIO(plan_bytes)
    roundtripped = pl.LazyFrame.deserialize(buffer)
    assert isinstance(roundtripped, pl.LazyFrame)
74
75
76
@pytest.mark.parametrize(
    "lf",
    [
        pl.scan_parquet("data.parquet").sink_parquet(DST, lazy=True),
        pl.scan_ndjson(Path("data.ndjson")).sink_parquet(DST, lazy=True),
        pl.scan_csv("data-*.csv").sink_parquet(DST, lazy=True),
        pl.scan_ipc(["data-1.feather", "data-2.feather"]).sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan_fail_on_local_data_source(lf: pl.LazyFrame) -> None:
    """Check that plans scanning local paths are rejected.

    Each parametrized plan reads from a non-cloud path (a string, a `Path`,
    a glob pattern, or a list of files) and must raise `InvalidOperationError`.
    """
    # Passed to `pytest.raises(match=...)`, which searches the error message.
    expected_message = "logical plan ineligible for execution on Polars Cloud"

    with pytest.raises(InvalidOperationError, match=expected_message):
        prepare_cloud_plan(lf)
91
92