GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_unique.py
from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path

    from tests.conftest import PlMonkeyPatch

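# Run every test in this module in the same pytest-xdist group ("streaming").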
pytestmark = pytest.mark.xdist_group("streaming")


@pytest.mark.write_disk
@pytest.mark.slow
def test_streaming_out_of_core_unique(
    io_files_path: Path, tmp_path: Path, plmonkeypatch: PlMonkeyPatch, capfd: Any
) -> None:
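    # Small POLARS_IDEAL_MORSEL_SIZE values make this large cross-join test too slow, so skip it.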
    morsel_size = os.environ.get("POLARS_IDEAL_MORSEL_SIZE")
    if morsel_size is not None and int(morsel_size) < 1000:
        pytest.skip("test is too slow for small morsel sizes")

    tmp_path.mkdir(exist_ok=True)
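    # Force the streaming engine out of core: spill to tmp_path with a small spill
    # size, and enable verbose logs so OOC activity shows up on stderr (see the
    # commented-out assert below).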
    plmonkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    plmonkeypatch.setenv("POLARS_FORCE_OOC", "1")
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    plmonkeypatch.setenv("POLARS_STREAMING_GROUPBY_SPILL_SIZE", "256")
    df = pl.read_csv(io_files_path / "foods*.csv")
    # q has 10_000 rows; crossing it with its own first 1_000 rows below yields 10M rows
    q = df.lazy()
    q = q.join(q, how="cross").select(df.columns).head(10_000)

    # uses the out-of-core unique
    df1 = q.join(q.head(1000), how="cross").unique().collect(engine="streaming")
    # this ensures the cross join gives an equal result but uses the in-memory unique
    df2 = q.join(q.head(1000), how="cross").collect(engine="streaming").unique()
    assert df1.shape == df2.shape

    # TODO: Re-enable this check when this issue is fixed: https://github.com/pola-rs/polars/issues/10466
    _ = capfd.readouterr().err
    # assert "OOC group_by started" in err


def test_streaming_unique() -> None:
    df = pl.DataFrame({"a": [1, 2, 2, 2], "b": [3, 4, 4, 4], "c": [5, 6, 7, 7]})
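    # With maintain_order=False the streaming engine may return rows in any order,
    # so sort before comparing against the in-memory engine.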
    q = df.lazy().unique(subset=["a", "c"], maintain_order=False).sort(["a", "b", "c"])
    assert_frame_equal(q.collect(engine="streaming"), q.collect(engine="in-memory"))

    q = df.lazy().unique(subset=["b", "c"], maintain_order=False).sort(["a", "b", "c"])
    assert_frame_equal(q.collect(engine="streaming"), q.collect(engine="in-memory"))

    q = df.lazy().unique(subset=None, maintain_order=False).sort(["a", "b", "c"])
    assert_frame_equal(q.collect(engine="streaming"), q.collect(engine="in-memory"))