# Path: blob/main/py-polars/tests/unit/cloud/test_prepare_cloud_plan.py
# (8416 views)
"""Tests for ``prepare_cloud_plan``, which turns a lazy query into a cloud plan.

None of the queries below is collected; they are only passed to
``prepare_cloud_plan`` (and, where checked, deserialized again), which is why
the remote paths can point at a deliberately nonexistent bucket.
"""

from io import BytesIO
from pathlib import Path

import pytest

import polars as pl
from polars._utils.cloud import prepare_cloud_plan
from polars.exceptions import InvalidOperationError

# Remote scan source and sink target shared by the tests below.
CLOUD_SOURCE = "s3://my-nonexistent-bucket/dataset"
DST = "s3://my-nonexistent-bucket/output"

# Lazy sinks whose input is a *local* file scan: a plain path string, a
# ``Path`` object, a glob pattern and a list of paths. The same plans are used
# by both ``allow_local_scans`` tests so the "rejected when disallowed" and
# "accepted when allowed" cases cannot drift apart.
LOCAL_SOURCE_PLANS = [
    pl.scan_parquet("data.parquet").sink_parquet(DST, lazy=True),
    pl.scan_ndjson(Path("data.ndjson")).sink_parquet(DST, lazy=True),
    pl.scan_csv("data-*.csv").sink_parquet(DST, lazy=True),
    pl.scan_ipc(["data-1.feather", "data-2.feather"]).sink_parquet(DST, lazy=True),
]


def _assert_serialized_plan(result: object) -> None:
    """Assert that *result* is a byte payload that deserializes to a LazyFrame."""
    assert isinstance(result, bytes)

    deserialized = pl.LazyFrame.deserialize(BytesIO(result))
    assert isinstance(deserialized, pl.LazyFrame)


@pytest.mark.parametrize(
    "lf",
    [
        # Remote scan followed by projections and a row index.
        pl.scan_parquet(CLOUD_SOURCE)
        .select("c", pl.lit(2))
        .with_row_index()
        .sink_parquet(DST, lazy=True),
        # In-memory frame followed by a projection and a filter.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select("a", "b")
        .filter(pl.col("a") == pl.lit(1))
        .sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan(lf: pl.LazyFrame) -> None:
    """A plain lazy sink is prepared into bytes that round-trip."""
    result, _flags = prepare_cloud_plan(lf, allow_local_scans=False)
    _assert_serialized_plan(result)


@pytest.mark.parametrize(
    "lf",
    [
        # The UDFs are never executed here, only serialized, so their bodies
        # need not be valid for the column data they would receive.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select(pl.col("a").map_elements(lambda x: sum(x)))
        .sink_parquet(DST, lazy=True),
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select(pl.col("b").map_batches(lambda x: sum(x)))
        .sink_parquet(DST, lazy=True),
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .map_batches(lambda x: x)
        .sink_parquet(DST, lazy=True),
        pl.scan_parquet(CLOUD_SOURCE)
        .filter(pl.col("a") < pl.lit(1).map_elements(lambda x: x + 1))
        .sink_parquet(DST, lazy=True),
        pl.LazyFrame({"a": [[1, 2], [3, 4, 5]], "b": [3, 4]})
        .select(pl.col("a").map_elements(lambda x: sum(x), return_dtype=pl.Int64))
        .sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan_udf(lf: pl.LazyFrame) -> None:
    """Plans containing Python UDFs (map_elements / map_batches) also round-trip."""
    result, _flags = prepare_cloud_plan(lf, allow_local_scans=False)
    _assert_serialized_plan(result)


def test_prepare_cloud_plan_optimization_toggle() -> None:
    """Custom ``QueryOptFlags`` are accepted and still yield a valid plan."""
    lf = pl.LazyFrame({"a": [1, 2], "b": [3, 4]}).sink_parquet(DST, lazy=True)

    result, _flags = prepare_cloud_plan(
        lf,
        allow_local_scans=False,
        optimizations=pl.QueryOptFlags(projection_pushdown=False),
    )

    # TODO: How to check that this optimization was toggled correctly?
    _assert_serialized_plan(result)


@pytest.mark.parametrize("lf", LOCAL_SOURCE_PLANS)
def test_prepare_cloud_plan_fail_on_local_data_source(lf: pl.LazyFrame) -> None:
    """Local scans are rejected when ``allow_local_scans`` is False."""
    with pytest.raises(
        InvalidOperationError,
        match="logical plan ineligible for execution on Polars Cloud",
    ):
        prepare_cloud_plan(lf, allow_local_scans=False)


@pytest.mark.parametrize("lf", LOCAL_SOURCE_PLANS)
def test_prepare_cloud_plan_succeed_on_local_data_source(lf: pl.LazyFrame) -> None:
    """The same local scans are accepted when ``allow_local_scans`` is True."""
    result, _flags = prepare_cloud_plan(lf, allow_local_scans=True)
    assert isinstance(result, bytes)