Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_sink_batches.py
7884 views
1
from __future__ import annotations
2
3
from typing import TYPE_CHECKING
4
5
import pytest
6
7
import polars as pl
8
from polars.testing import assert_frame_equal
9
10
if TYPE_CHECKING:
11
from polars._typing import EngineType
12
13
14
@pytest.mark.parametrize("engine", ["in-memory", "streaming"])
def test_sink_batches(engine: EngineType) -> None:
    """Batches handed to a ``sink_batches`` callback reassemble the input frame."""
    source = pl.DataFrame({"a": range(100)})
    received: list[pl.DataFrame] = []

    # The callback only records each batch; it never asks the sink to stop.
    lf = source.lazy()
    lf.sink_batches(lambda batch: received.append(batch), engine=engine)  # type: ignore[call-overload]

    assert_frame_equal(pl.concat(received), source)
22
23
24
@pytest.mark.parametrize("engine", ["in-memory", "streaming"])
def test_sink_batches_early_stop(engine: EngineType) -> None:
    """A callback that returns ``True`` must not be invoked again afterwards."""
    # 1000 rows with chunk_size=100, so more than one batch would be available
    # if the sink ignored the stop request.
    source = pl.DataFrame({"a": range(1000)})
    seen: list[pl.DataFrame] = []

    def on_batch(batch: pl.DataFrame) -> bool | None:
        seen.append(batch)
        # The first call already requested a stop; any second call is a failure.
        assert len(seen) == 1
        return True

    lf = source.lazy()
    lf.sink_batches(on_batch, chunk_size=100, engine=engine)  # type: ignore[call-overload]

    # The callback must have run at least once.
    assert seen
37
38
39
def test_collect_batches() -> None:
    """Concatenating every batch from ``collect_batches`` reproduces the input."""
    df = pl.DataFrame({"a": range(100)})

    # Materialise the batch iterator directly instead of accumulating by hand.
    frames: list[pl.DataFrame] = list(df.lazy().collect_batches())

    assert_frame_equal(pl.concat(frames), df)
47
48
49
def test_chunk_size() -> None:
    """Check that ``collect_batches(chunk_size=...)`` yields fixed-size slices.

    Each yielded batch must equal the next ``chunk_size`` rows of the source
    frame (the final batch holds any remainder). Every row must be yielded:
    once the loop ends, no rows may be left unaccounted for.
    """
    # 113 rows with chunk_size=17: six full batches plus an 11-row remainder.
    df = pl.DataFrame({"a": range(113)})
    remaining = df

    for f in df.lazy().collect_batches(chunk_size=17):
        expected = remaining.head(17)
        remaining = remaining.slice(17)

        assert_frame_equal(f, expected)

    # Without this, an iterator that stopped early (or yielded nothing at all)
    # would pass the loop above unnoticed.
    assert remaining.is_empty()

    # Row count is an exact multiple of chunk_size: no empty trailing batch
    # may be emitted.
    df = pl.DataFrame({"a": range(10)})
    remaining = df

    for f in df.lazy().collect_batches(chunk_size=10):
        assert not f.is_empty()

        expected = remaining.head(10)
        remaining = remaining.slice(10)

        assert_frame_equal(f, expected)

    assert remaining.is_empty()
67
68