# Path: py-polars/tests/unit/io/test_lazy_ipc.py
# (scrape metadata: 8460 views)
from __future__ import annotations12import io3import typing4from typing import IO, TYPE_CHECKING, Any56import pyarrow.ipc7import pytest89import polars as pl10from polars.interchange.protocol import CompatLevel11from polars.testing.asserts.frame import assert_frame_equal1213if TYPE_CHECKING:14from pathlib import Path1516from polars._typing import IpcCompression17from tests.conftest import PlMonkeyPatch1819COMPRESSIONS = ["uncompressed", "lz4", "zstd"]202122@pytest.fixture23def foods_ipc_path(io_files_path: Path) -> Path:24return io_files_path / "foods1.ipc"252627def test_row_index(foods_ipc_path: Path) -> None:28df = pl.read_ipc(foods_ipc_path, row_index_name="row_index", use_pyarrow=False)29assert df["row_index"].to_list() == list(range(27))3031df = (32pl.scan_ipc(foods_ipc_path, row_index_name="row_index")33.filter(pl.col("category") == pl.lit("vegetables"))34.collect()35)3637assert df["row_index"].to_list() == [0, 6, 11, 13, 14, 20, 25]3839df = (40pl.scan_ipc(foods_ipc_path, row_index_name="row_index")41.with_row_index("foo", 10)42.filter(pl.col("category") == pl.lit("vegetables"))43.collect()44)4546assert df["foo"].to_list() == [10, 16, 21, 23, 24, 30, 35]474849def test_is_in_type_coercion(foods_ipc_path: Path) -> None:50out = (51pl.scan_ipc(foods_ipc_path)52.filter(pl.col("category").is_in(("vegetables", "ice cream")))53.collect()54)55assert out.shape == (7, 4)56out = (57pl.scan_ipc(foods_ipc_path)58.select(pl.col("category").alias("cat"))59.filter(pl.col("cat").is_in(["vegetables"]))60.collect()61)62assert out.shape == (7, 1)636465def test_row_index_schema(foods_ipc_path: Path) -> None:66assert (67pl.scan_ipc(foods_ipc_path, row_index_name="id")68.select(["id", "category"])69.collect()70).dtypes == [pl.get_index_type(), pl.String]717273def test_glob_n_rows(io_files_path: Path) -> None:74file_path = io_files_path / "foods*.ipc"75df = pl.scan_ipc(file_path, n_rows=40).collect()7677# 27 rows from foods1.ipc and 13 from foods2.ipc78assert df.shape == (40, 4)7980# 
take first and last rows81assert df[[0, 39]].to_dict(as_series=False) == {82"category": ["vegetables", "seafood"],83"calories": [45, 146],84"fats_g": [0.5, 6.0],85"sugars_g": [2, 2],86}878889def test_ipc_list_arg(io_files_path: Path) -> None:90first = io_files_path / "foods1.ipc"91second = io_files_path / "foods2.ipc"9293df = pl.scan_ipc(source=[first, second]).collect()94assert df.shape == (54, 4)95assert df.row(-1) == ("seafood", 194, 12.0, 1)96assert df.row(0) == ("vegetables", 45, 0.5, 2)979899def test_scan_ipc_local_with_async(100plmonkeypatch: PlMonkeyPatch,101io_files_path: Path,102) -> None:103plmonkeypatch.setenv("POLARS_VERBOSE", "1")104plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")105106assert_frame_equal(107pl.scan_ipc(io_files_path / "foods1.ipc").head(1).collect(),108pl.DataFrame(109{110"category": ["vegetables"],111"calories": [45],112"fats_g": [0.5],113"sugars_g": [2],114}115),116)117118119def test_sink_ipc_compat_level_22930() -> None:120df = pl.DataFrame({"a": ["foo"]})121122f1 = io.BytesIO()123f2 = io.BytesIO()124125df.lazy().sink_ipc(f1, compat_level=CompatLevel.oldest(), engine="in-memory")126df.lazy().sink_ipc(f2, compat_level=CompatLevel.oldest(), engine="streaming")127128f1.seek(0)129f2.seek(0)130131t1 = pyarrow.ipc.open_file(f1)132assert "large_string" in str(t1.schema)133assert_frame_equal(pl.DataFrame(t1.read_all()), df)134135t2 = pyarrow.ipc.open_file(f2)136assert "large_string" in str(t2.schema)137assert_frame_equal(pl.DataFrame(t2.read_all()), df)138139140def test_scan_file_info_cache(141capfd: Any, plmonkeypatch: PlMonkeyPatch, foods_ipc_path: Path142) -> None:143plmonkeypatch.setenv("POLARS_VERBOSE", "1")144a = pl.scan_ipc(foods_ipc_path)145b = pl.scan_ipc(foods_ipc_path)146147a.join(b, how="cross").explain()148149captured = capfd.readouterr().err150assert "FILE_INFO CACHE HIT" in captured151152153def test_scan_ipc_file_async(154plmonkeypatch: PlMonkeyPatch,155io_files_path: Path,156) -> 
None:157plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")158159foods1 = io_files_path / "foods1.ipc"160161df = pl.scan_ipc(foods1).collect()162163assert_frame_equal(164pl.scan_ipc(foods1).select(pl.len()).collect(), df.select(pl.len())165)166167assert_frame_equal(168pl.scan_ipc(foods1).head(1).collect(),169df.head(1),170)171172assert_frame_equal(173pl.scan_ipc(foods1).tail(1).collect(),174df.tail(1),175)176177assert_frame_equal(178pl.scan_ipc(foods1).slice(-1, 1).collect(),179df.slice(-1, 1),180)181182assert_frame_equal(183pl.scan_ipc(foods1).slice(7, 10).collect(),184df.slice(7, 10),185)186187assert_frame_equal(188pl.scan_ipc(foods1).select(pl.col.calories).collect(),189df.select(pl.col.calories),190)191192assert_frame_equal(193pl.scan_ipc(foods1).select([pl.col.calories, pl.col.category]).collect(),194df.select([pl.col.calories, pl.col.category]),195)196197assert_frame_equal(198pl.scan_ipc([foods1, foods1]).collect(),199pl.concat([df, df]),200)201202assert_frame_equal(203pl.scan_ipc(foods1).select(pl.col.calories.sum()).collect(),204df.select(pl.col.calories.sum()),205)206207assert_frame_equal(208pl.scan_ipc(foods1, row_index_name="ri", row_index_offset=42)209.slice(0, 1)210.select(pl.col.ri)211.collect(),212df.with_row_index(name="ri", offset=42).slice(0, 1).select(pl.col.ri),213)214215216def test_scan_ipc_file_async_dict(217plmonkeypatch: PlMonkeyPatch,218) -> None:219plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")220221buf = io.BytesIO()222lf = pl.LazyFrame(223{"cat": ["A", "B", "C", "A", "C", "B"]}, schema={"cat": pl.Categorical}224).with_row_index()225lf.sink_ipc(buf)226227out = pl.scan_ipc(buf).collect()228expected = lf.collect()229assert_frame_equal(out, expected)230231232# TODO: create multiple record batches through API instead of env variable233def test_scan_ipc_file_async_multiple_record_batches(234plmonkeypatch: PlMonkeyPatch,235) -> None:236plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")237plmonkeypatch.setenv("POLARS_IDEAL_SINK_MORSEL_SIZE_ROWS", 
"10")238239buf = io.BytesIO()240lf = pl.LazyFrame({"a": list(range(100))})241lf.sink_ipc(buf)242df = lf.collect()243244buffers = typing.cast("list[IO[bytes]]", [buf, buf])245246assert_frame_equal(247pl.scan_ipc(buf).collect(),248df,249)250251assert_frame_equal(252pl.scan_ipc(buf).head(15).collect(),253df.head(15),254)255256assert_frame_equal(257pl.scan_ipc(buf).tail(15).collect(),258df.tail(15),259)260261assert_frame_equal(262pl.scan_ipc(buf).slice(45, 20).collect(),263df.slice(45, 20),264)265266assert_frame_equal(267pl.scan_ipc(buffers).slice(85, 30).collect(),268pl.concat([df.slice(85, 15), df.slice(0, 15)]),269)270271assert_frame_equal(272pl.scan_ipc(buf).select(pl.col.a.sum()).collect(),273df.select(pl.col.a.sum()),274)275276assert_frame_equal(277pl.scan_ipc(buffers, row_index_name="ri").tail(15).select(pl.col.ri).collect(),278pl.concat([df, df]).with_row_index("ri").tail(15).select(pl.col.ri),279)280281282@pytest.mark.parametrize("n_a", [1, 999])283@pytest.mark.parametrize("n_b", [1, 12, 13, 999]) # problem starts 13284@pytest.mark.parametrize("compression", COMPRESSIONS)285def test_scan_ipc_varying_block_metadata_len_c4812(286n_a: int, n_b: int, compression: IpcCompression, plmonkeypatch: PlMonkeyPatch287) -> None:288plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")289290buf = io.BytesIO()291df = pl.DataFrame({"a": [n_a * "A", n_b * "B"]})292df.lazy().sink_ipc(buf, compression=compression, record_batch_size=1)293294with pyarrow.ipc.open_file(buf) as reader:295assert [296reader.get_batch(i).num_rows for i in range(reader.num_record_batches)297] == [1, 1]298299assert_frame_equal(pl.scan_ipc(buf).collect(), df)300301302@pytest.mark.parametrize(303"record_batch_size", [1, 2, 5, 7, 50, 99, 100, 101, 299, 300, 100_000]304)305@pytest.mark.parametrize("n_chunks", [1, 2, 3])306def test_sink_ipc_record_batch_size(record_batch_size: int, n_chunks: int) -> None:307n_rows = 100308buf = io.BytesIO()309310df0 = pl.DataFrame({"a": list(range(n_rows))})311df = df0312while 
n_chunks > 1:313df = pl.concat([df, df0])314n_chunks -= 1315316df.lazy().sink_ipc(buf, record_batch_size=record_batch_size)317318buf.seek(0)319out = pl.scan_ipc(buf).collect()320assert_frame_equal(out, df)321322buf.seek(0)323reader = pyarrow.ipc.open_file(buf)324n_batches = reader.num_record_batches325for i in range(n_batches):326n_rows = reader.get_batch(i).num_rows327assert n_rows == record_batch_size or (328i + 1 == n_batches and n_rows <= record_batch_size329)330331332@pytest.mark.parametrize("record_batch_size", [None, 3])333@pytest.mark.parametrize("slice", [(0, 0), (0, 1), (0, 5), (4, 7), (-1, 1), (-5, 4)])334@pytest.mark.parametrize("compression", COMPRESSIONS)335def test_scan_ipc_compression_with_slice_26063(336record_batch_size: int, slice: tuple[int, int], compression: IpcCompression337) -> None:338n_rows = 15339df = pl.DataFrame({"a": range(n_rows)}).with_columns(340pl.col.a.pow(3).cast(pl.String).alias("b")341)342buf = io.BytesIO()343344df.lazy().sink_ipc(345buf, compression=compression, record_batch_size=record_batch_size346)347out = pl.scan_ipc(buf).slice(slice[0], slice[1]).collect()348expected = df.slice(slice[0], slice[1])349assert_frame_equal(out, expected)350351352def test_sink_scan_ipc_round_trip_statistics() -> None:353n_rows = 4_000 # must be higher than (n_vCPU)^2 to avoid sortedness inference354buf = io.BytesIO()355356df = (357pl.DataFrame({"a": range(n_rows)})358.with_columns(pl.col.a.reverse().alias("b"))359.with_columns(pl.col.a.shuffle().alias("d"))360.with_columns(pl.col.a.shuffle().sort().alias("d"))361)362df.lazy().sink_ipc(buf, _record_batch_statistics=True)363364metadata = df._to_metadata()365366# baseline367assert metadata.select(pl.col("sorted_asc").sum()).item() == 2368assert metadata.select(pl.col("sorted_dsc").sum()).item() == 1369370# round-trip371out = pl.scan_ipc(buf, _record_batch_statistics=True).collect()372assert_frame_equal(metadata, out._to_metadata())373374# do not read unless requested375out = 
pl.scan_ipc(buf).collect()376assert out._to_metadata().select(pl.col("sorted_asc").sum()).item() == 0377assert out._to_metadata().select(pl.col("sorted_dsc").sum()).item() == 0378379# remain pyarrow compatible380out = pl.read_ipc(buf, use_pyarrow=True)381assert_frame_equal(df, out)382383384@pytest.mark.parametrize(385"selection",386[["b"], ["a", "b", "c", "d"], ["d", "c", "a", "b"], ["d", "a", "b"]],387)388@pytest.mark.parametrize("record_batch_size", [None, 100])389def test_sink_scan_ipc_round_trip_statistics_projection(390selection: list[str], record_batch_size: int391) -> None:392n_rows = 4_000 # must be higher than (n_vCPU)^2 to avoid sortedness inference393buf = io.BytesIO()394395df = (396pl.DataFrame({"a": range(n_rows)})397.with_columns(pl.col.a.reverse().alias("b"))398.with_columns(pl.col.a.shuffle().alias("c"))399.with_columns(pl.col.a.shuffle().sort().alias("d"))400)401df.lazy().sink_ipc(402buf, record_batch_size=record_batch_size, _record_batch_statistics=True403)404405# round-trip with projection406df = df.select(selection)407out = pl.scan_ipc(buf, _record_batch_statistics=True).select(selection).collect()408assert_frame_equal(df, out)409assert_frame_equal(df._to_metadata(), out._to_metadata())410411412