Path: blob/main/py-polars/tests/unit/streaming/test_streaming_unique.py
6939 views
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path

pytestmark = pytest.mark.xdist_group("streaming")


@pytest.mark.write_disk
@pytest.mark.slow
def test_streaming_out_of_core_unique(
    io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any
) -> None:
    """Out-of-core streaming ``unique`` must match the eager ``unique`` result."""
    # Spill into the per-test temp dir (pytest creates it for us) and force the
    # out-of-core path with a tiny spill size so it is actually exercised.
    monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    monkeypatch.setenv("POLARS_FORCE_OOC", "1")
    monkeypatch.setenv("POLARS_VERBOSE", "1")
    monkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256")

    df = pl.read_csv(io_files_path / "foods*.csv")
    # Self cross join truncated to 10_000 rows, keeping only the original columns.
    q = df.lazy()
    q = q.join(q, how="cross").select(df.columns).head(10_000)
    # 10_000 x 1_000 rows: this cross join creates 10M rows.
    crossed = q.join(q.head(1000), how="cross")

    # Uses the out-of-core streaming unique.
    df1 = crossed.unique().collect(engine="streaming")
    # Same streaming cross join, but deduplicated afterwards with the in-memory
    # DataFrame.unique, giving a reference result.
    df2 = crossed.collect(engine="streaming").unique()
    # `unique` does not guarantee row order, so compare contents regardless of
    # order; this also covers the shape check.
    assert_frame_equal(df1, df2, check_row_order=False)

    # TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466
    # Drain the captured stderr (POLARS_VERBOSE output) until the check returns.
    _ = capfd.readouterr().err
    # assert "OOC group_by started" in err


def test_streaming_unique() -> None:
    """Streaming ``unique`` agrees with the in-memory engine for several subsets."""
    df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]})
    # `None` deduplicates on all columns. Sorting afterwards makes the comparison
    # independent of the unspecified output order when `maintain_order=False`.
    for subset in (["a", "c"], ["b", "c"], None):
        q = (
            df.lazy()
            .unique(subset=subset, maintain_order=False)
            .sort(["a", "b", "c"])
        )
        assert_frame_equal(
            q.collect(engine="streaming"), q.collect(engine="in-memory")
        )