Book a Demo!
CoCalc Logo Icon
Store Features Docs Share Support News About Policies Sign Up Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_unique.py
6939 views
1
from __future__ import annotations
2
3
from typing import TYPE_CHECKING, Any
4
5
import pytest
6
7
import polars as pl
8
from polars.testing import assert_frame_equal
9
10
if TYPE_CHECKING:
11
from pathlib import Path
12
13
# Tag every test in this module with the "streaming" xdist group. Under
# pytest-xdist's `--dist loadgroup` mode, tests sharing a group are scheduled
# on the same worker.
pytestmark = pytest.mark.xdist_group("streaming")
14
15
16
@pytest.mark.write_disk
@pytest.mark.slow
def test_streaming_out_of_core_unique(
    io_files_path: Path, tmp_path: Path, monkeypatch: Any, capfd: Any
) -> None:
    """Check that a streaming `unique` agrees with an in-memory `unique`.

    The environment variables direct temporary/spill files into ``tmp_path``,
    set ``POLARS_FORCE_OOC`` and a small ``POLARS_STREAMING_GROUPBY_SPILL_SIZE``,
    and turn on verbose logging. They are meant to push the streaming `unique`
    onto its out-of-core path.
    """
    # pytest's `tmp_path` fixture already exists, so no mkdir is needed.
    monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    monkeypatch.setenv("POLARS_FORCE_OOC", "1")
    monkeypatch.setenv("POLARS_VERBOSE", "1")
    monkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256")
    df = pl.read_csv(io_files_path / "foods*.csv")

    # Self cross join, projected back to the original columns and capped at
    # 10_000 rows. The result is full of duplicate rows.
    q = df.lazy()
    q = q.join(q, how="cross").select(df.columns).head(10_000)

    # Cross join with the first 1000 rows: up to 10_000 * 1000 = 10M rows.
    joined = q.join(q.head(1000), how="cross")

    # uses out-of-core unique
    df1 = joined.unique().collect(engine="streaming")
    # same cross join, but `unique` runs in memory on the collected frame
    df2 = joined.collect(engine="streaming").unique()

    # `unique()` makes no row-order guarantee. Compare contents regardless of
    # order. This is stronger than a shape check and still implies equal shape.
    assert_frame_equal(df1, df2, check_row_order=False)

    # TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466
    _ = capfd.readouterr().err
    # assert "OOC group_by started" in err
40
41
42
def test_streaming_unique() -> None:
    """Streaming and in-memory engines must agree on `unique` for several subsets.

    Each query passes `maintain_order=False` and then sorts on every column,
    so both engines' results are put in a fixed order before they are compared.
    """
    frame = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]})

    # The last entry passes no explicit subset (`subset=None`).
    for subset in (["a", "c"], ["b", "c"], None):
        query = (
            frame.lazy()
            .unique(subset=subset, maintain_order=False)
            .sort(["a", "b", "c"])
        )
        streamed = query.collect(engine="streaming")
        in_memory = query.collect(engine="in-memory")
        assert_frame_equal(streamed, in_memory)
52
53