Path: blob/main/py-polars/tests/unit/cloud/test_prepare_cloud_plan.py
6939 views
from io import BytesIO
from pathlib import Path

import pytest

import polars as pl
from polars._utils.cloud import prepare_cloud_plan
from polars.exceptions import InvalidOperationError

# Placeholder cloud URIs used as scan source and sink destination.
# The plans built below are only prepared and serialized by these tests.
CLOUD_SOURCE = "s3://my-nonexistent-bucket/dataset"
DST = "s3://my-nonexistent-bucket/output"


@pytest.mark.parametrize(
    "lf",
    [
        # Cloud scan followed by a projection and a row index.
        pl.scan_parquet(CLOUD_SOURCE)
        .select("c", pl.lit(2))
        .with_row_index()
        .sink_parquet(DST, lazy=True),
        # In-memory frame with a projection and a filter.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select("a", "b")
        .filter(pl.col("a") == pl.lit(1))
        .sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan(lf: pl.LazyFrame) -> None:
    """Check that a prepared plan is bytes and deserializes to a LazyFrame."""
    result = prepare_cloud_plan(lf)
    assert isinstance(result, bytes)

    deserialized = pl.LazyFrame.deserialize(BytesIO(result))
    assert isinstance(deserialized, pl.LazyFrame)


@pytest.mark.parametrize(
    "lf",
    [
        # Expression-level Python callables.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select(pl.col("a").map_elements(lambda x: sum(x)))
        .sink_parquet(DST, lazy=True),
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .select(pl.col("b").map_batches(lambda x: sum(x)))
        .sink_parquet(DST, lazy=True),
        # Frame-level Python callable.
        pl.LazyFrame({"a": [1, 2], "b": [3, 4]})
        .map_batches(lambda x: x)
        .sink_parquet(DST, lazy=True),
        # Python callable inside a filter predicate on a cloud scan.
        pl.scan_parquet(CLOUD_SOURCE)
        .filter(pl.col("a") < pl.lit(1).map_elements(lambda x: x + 1))
        .sink_parquet(DST, lazy=True),
        # Python callable over a list column with an explicit return dtype.
        pl.LazyFrame({"a": [[1, 2], [3, 4, 5]], "b": [3, 4]})
        .select(pl.col("a").map_elements(lambda x: sum(x), return_dtype=pl.Int64))
        .sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan_udf(lf: pl.LazyFrame) -> None:
    """Check that plans containing Python callables also round-trip."""
    result = prepare_cloud_plan(lf)
    assert isinstance(result, bytes)

    deserialized = pl.LazyFrame.deserialize(BytesIO(result))
    assert isinstance(deserialized, pl.LazyFrame)


def test_prepare_cloud_plan_optimization_toggle() -> None:
    """Check that passing explicit optimization flags still yields a valid plan."""
    lf = pl.LazyFrame({"a": [1, 2], "b": [3, 4]}).sink_parquet(DST, lazy=True)

    result = prepare_cloud_plan(
        lf, optimizations=pl.QueryOptFlags(projection_pushdown=False)
    )
    assert isinstance(result, bytes)

    # TODO: How to check that this optimization was toggled correctly?
    deserialized = pl.LazyFrame.deserialize(BytesIO(result))
    assert isinstance(deserialized, pl.LazyFrame)


@pytest.mark.parametrize(
    "lf",
    [
        # Each scan reads from a relative, non-cloud path.
        pl.scan_parquet("data.parquet").sink_parquet(DST, lazy=True),
        pl.scan_ndjson(Path("data.ndjson")).sink_parquet(DST, lazy=True),
        pl.scan_csv("data-*.csv").sink_parquet(DST, lazy=True),
        pl.scan_ipc(["data-1.feather", "data-2.feather"]).sink_parquet(DST, lazy=True),
    ],
)
def test_prepare_cloud_plan_fail_on_local_data_source(lf: pl.LazyFrame) -> None:
    """Check that plans scanning local paths are rejected with InvalidOperationError."""
    with pytest.raises(
        InvalidOperationError,
        match="logical plan ineligible for execution on Polars Cloud",
    ):
        prepare_cloud_plan(lf)