Path: blob/main/py-polars/tests/unit/streaming/test_streaming_unique.py
from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path

    from tests.conftest import PlMonkeyPatch

pytestmark = pytest.mark.xdist_group("streaming")


@pytest.mark.write_disk
@pytest.mark.slow
def test_streaming_out_of_core_unique(
    io_files_path: Path, tmp_path: Path, plmonkeypatch: PlMonkeyPatch, capfd: Any
) -> None:
    morsel_size = os.environ.get("POLARS_IDEAL_MORSEL_SIZE")
    if morsel_size is not None and int(morsel_size) < 1000:
        pytest.skip("test is too slow for small morsel sizes")

    tmp_path.mkdir(exist_ok=True)
    plmonkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    plmonkeypatch.setenv("POLARS_FORCE_OOC", "1")
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    plmonkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256")
    df = pl.read_csv(io_files_path / "foods*.csv")
    # this creates 10M rows
    q = df.lazy()
    q = q.join(q, how="cross").select(df.columns).head(10_000)

    # uses out-of-core unique
    df1 = q.join(q.head(1000), how="cross").unique().collect(engine="streaming")
    # this ensures the cross join gives equal result but uses the in-memory unique
    df2 = q.join(q.head(1000), how="cross").collect(engine="streaming").unique()
    assert df1.shape == df2.shape

    # TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466
    _ = capfd.readouterr().err
    # assert "OOC group_by started" in err


def test_streaming_unique() -> None:
    df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]})
    q = df.lazy().unique(subset=["a", "c"], maintain_order=False).sort(["a", "b", "c"])
    assert_frame_equal(q.collect(engine="streaming"), q.collect(engine="in-memory"))

    q = df.lazy().unique(subset=["b", "c"], maintain_order=False).sort(["a", "b", "c"])
    assert_frame_equal(q.collect(engine="streaming"), q.collect(engine="in-memory"))

    q = df.lazy().unique(subset=None, maintain_order=False).sort(["a", "b", "c"])
    assert_frame_equal(q.collect(engine="streaming"), q.collect(engine="in-memory"))