Path: blob/main/py-polars/tests/unit/lazyframe/test_serde.py
8424 views
from __future__ import annotations

import io
from typing import TYPE_CHECKING

import pytest
from hypothesis import example, given

import polars as pl
from polars.exceptions import ComputeError
from polars.testing import assert_frame_equal
from polars.testing.parametric import dataframes

if TYPE_CHECKING:
    from pathlib import Path

    from polars._typing import SerializationFormat
    from tests.conftest import PlMonkeyPatch


@given(
    lf=dataframes(
        lazy=True,
        excluded_dtypes=[pl.Struct],
    )
)
@example(lf=pl.LazyFrame({"foo": ["a", "b", "a"]}, schema={"foo": pl.Enum(["b", "a"])}))
def test_lf_serde_roundtrip_binary(lf: pl.LazyFrame) -> None:
    """Round-trip generated LazyFrames through the binary format."""
    buffer = io.BytesIO(lf.serialize(format="binary"))
    roundtripped = pl.LazyFrame.deserialize(buffer, format="binary")
    assert_frame_equal(roundtripped, lf, categorical_as_str=True)


@given(
    lf=dataframes(
        lazy=True,
        excluded_dtypes=[
            pl.Float32,  # Bug, see: https://github.com/pola-rs/polars/issues/17211
            pl.Float64,  # Bug, see: https://github.com/pola-rs/polars/issues/17211
            pl.Struct,  # Outer nullability not supported
        ],
    )
)
@pytest.mark.filterwarnings("ignore")
def test_lf_serde_roundtrip_json(lf: pl.LazyFrame) -> None:
    """Round-trip generated LazyFrames through the JSON format."""
    buffer = io.StringIO(lf.serialize(format="json"))
    roundtripped = pl.LazyFrame.deserialize(buffer, format="json")
    assert_frame_equal(roundtripped, lf, categorical_as_str=True)


@pytest.fixture
def lf() -> pl.LazyFrame:
    """Sample LazyFrame for testing serialization/deserialization."""
    base = pl.LazyFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]})
    return base.select("a").sum()


@pytest.mark.filterwarnings("ignore")
def test_lf_serde_json_stringio(lf: pl.LazyFrame) -> None:
    """JSON serialization yields a `str` that deserializes from a StringIO."""
    json_text = lf.serialize(format="json")
    assert isinstance(json_text, str)

    roundtripped = pl.LazyFrame.deserialize(io.StringIO(json_text), format="json")
    assert_frame_equal(roundtripped, lf)


def test_lf_serde(lf: pl.LazyFrame) -> None:
    """Serialization without a `format` argument yields round-trippable `bytes`."""
    payload = lf.serialize()
    assert isinstance(payload, bytes)

    roundtripped = pl.LazyFrame.deserialize(io.BytesIO(payload))
    assert_frame_equal(roundtripped, lf)


@pytest.mark.parametrize(
    ("format", "buf"),
    [
        ("binary", io.BytesIO()),
        ("json", io.StringIO()),
        ("json", io.BytesIO()),
    ],
)
@pytest.mark.filterwarnings("ignore")
def test_lf_serde_to_from_buffer(
    lf: pl.LazyFrame, format: SerializationFormat, buf: io.IOBase
) -> None:
    """Serialize into an in-memory buffer, rewind it, and read the plan back."""
    lf.serialize(buf, format=format)

    # Rewind so that deserialization starts at the beginning of the written data.
    buf.seek(0)
    roundtripped = pl.LazyFrame.deserialize(buf, format=format)

    assert_frame_equal(lf, roundtripped)


@pytest.mark.write_disk
def test_lf_serde_to_from_file(lf: pl.LazyFrame, tmp_path: Path) -> None:
    """Serialize to a file path and deserialize from that same path."""
    tmp_path.mkdir(exist_ok=True)
    target = tmp_path / "small.bin"

    lf.serialize(target)
    roundtripped = pl.LazyFrame.deserialize(target)

    assert_frame_equal(lf, roundtripped)


def test_lf_deserialize_validation() -> None:
    """Deserializing non-JSON bytes as JSON raises a ComputeError."""
    not_json = io.BytesIO(b"hello world!")
    with pytest.raises(ComputeError, match="expected value at line 1 column 1"):
        pl.LazyFrame.deserialize(not_json, format="json")


@pytest.mark.write_disk
def test_lf_serde_scan(tmp_path: Path) -> None:
    """A plan built on `scan_parquet` survives a serialize/deserialize round trip."""
    tmp_path.mkdir(exist_ok=True)
    parquet_path = tmp_path / "dataset.parquet"

    frame = pl.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    frame.write_parquet(parquet_path)
    scanned = pl.scan_parquet(parquet_path)

    payload = scanned.serialize()
    roundtripped = pl.LazyFrame.deserialize(io.BytesIO(payload))

    assert_frame_equal(roundtripped, scanned)
    assert_frame_equal(roundtripped.collect(), frame)


@pytest.mark.filterwarnings("ignore::polars.exceptions.PolarsInefficientMapWarning")
def test_lf_serde_version_specific_lambda() -> None:
    """A plan containing a `map_elements` lambda round-trips and still evaluates."""
    plus_one = pl.col("a").map_elements(lambda x: x + 1, return_dtype=pl.Int64)
    query = pl.LazyFrame({"a": [1, 2, 3]}).select(plus_one)
    payload = query.serialize()

    roundtripped = pl.LazyFrame.deserialize(io.BytesIO(payload))
    assert_frame_equal(roundtripped, pl.LazyFrame({"a": [2, 3, 4]}))


def custom_function(x: pl.Series) -> pl.Series:
    """Return `x` with one added to it; used as a module-level named UDF below."""
    return x + 1


@pytest.mark.may_fail_cloud  # reason: cloud does not have access to this scope
@pytest.mark.filterwarnings("ignore::polars.exceptions.PolarsInefficientMapWarning")
def test_lf_serde_version_specific_named_function() -> None:
    """A plan using a named module-level function in `map_batches` round-trips."""
    plus_one = pl.col("a").map_batches(custom_function, return_dtype=pl.Int64)
    query = pl.LazyFrame({"a": [1, 2, 3]}).select(plus_one)
    payload = query.serialize()

    roundtripped = pl.LazyFrame.deserialize(io.BytesIO(payload))
    assert_frame_equal(roundtripped, pl.LazyFrame({"a": [2, 3, 4]}))


@pytest.mark.filterwarnings("ignore::polars.exceptions.PolarsInefficientMapWarning")
def test_lf_serde_map_batches_on_lazyframe() -> None:
    """A frame-level `LazyFrame.map_batches` lambda round-trips and still evaluates."""
    query = pl.LazyFrame({"a": [1, 2, 3]}).map_batches(lambda x: x + 1)
    payload = query.serialize()

    roundtripped = pl.LazyFrame.deserialize(io.BytesIO(payload))
    assert_frame_equal(roundtripped, pl.LazyFrame({"a": [2, 3, 4]}))


@pytest.mark.parametrize("max_byte_slice_len", [1, 2, 3, 100, 4294967295])
def test_lf_serde_chunked_bytes(
    plmonkeypatch: PlMonkeyPatch, max_byte_slice_len: int
) -> None:
    """Round-trip a 5000-row frame under several max-byte-slice-length settings."""
    plmonkeypatch.setenv(
        "POLARS_SERIALIZE_LAZYFRAME_MAX_BYTE_SLICE_LEN", str(max_byte_slice_len)
    )
    frame = pl.LazyFrame({"a": range(5000)})

    payload = frame.serialize()
    restored = pl.LazyFrame.deserialize(io.BytesIO(payload))

    assert_frame_equal(restored.collect(), frame.collect())


def test_lf_collect_schema_does_not_change_serialize_25719() -> None:
    """Calling `collect_schema()` must not alter the serialized plan.

    The number in the test name is presumably the upstream issue id.
    """
    source = pl.DataFrame({"x": [1, 2, 3]})

    resolved = source.lazy()
    resolved.collect_schema()
    # A schema-resolved frame must serialize exactly like a fresh lazy frame.
    assert resolved.serialize() == source.lazy().serialize()

    resolved_sum = resolved.sum()
    resolved_sum.collect_schema()
    assert resolved_sum.serialize() == source.lazy().sum().serialize()

    stacked = pl.concat([resolved_sum, resolved_sum])
    restored = pl.LazyFrame.deserialize(stacked.serialize())

    assert_frame_equal(restored.collect(), pl.DataFrame({"x": [6, 6]}))