Path: blob/main/py-polars/tests/unit/streaming/test_streaming.py
from __future__ import annotations

import time
from datetime import date
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import pytest

import polars as pl
from polars.exceptions import PolarsInefficientMapWarning
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from polars._typing import JoinStrategy

pytestmark = pytest.mark.xdist_group("streaming")


def test_streaming_categoricals_5921() -> None:
    out_lazy = (
        pl.DataFrame({"X": ["a", "a", "a", "b", "b"], "Y": [2, 2, 2, 1, 1]})
        .lazy()
        .with_columns(pl.col("X").cast(pl.Categorical))
        .group_by("X")
        .agg(pl.col("Y").min())
        .sort("Y", descending=True)
        .collect(engine="streaming")
    )

    out_eager = (
        pl.DataFrame({"X": ["a", "a", "a", "b", "b"], "Y": [2, 2, 2, 1, 1]})
        .with_columns(pl.col("X").cast(pl.Categorical))
        .group_by("X")
        .agg(pl.col("Y").min())
        .sort("Y", descending=True)
    )

    for out in [out_eager, out_lazy]:
        assert out.dtypes == [pl.Categorical, pl.Int64]
        assert out.to_dict(as_series=False) == {"X": ["a", "b"], "Y": [2, 1]}


def test_streaming_block_on_literals_6054() -> None:
    df = pl.DataFrame({"col_1": [0] * 5 + [1] * 5})
    s = pl.Series("col_2", list(range(10)))

    assert df.lazy().with_columns(s).group_by("col_1").agg(pl.all().first()).collect(
        engine="streaming"
    ).sort("col_1").to_dict(as_series=False) == {"col_1": [0, 1], "col_2": [0, 5]}


@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud  # reason: non-pure map_batches
def test_streaming_streamable_functions(monkeypatch: Any, capfd: Any) -> None:
    monkeypatch.setenv("POLARS_IDEAL_MORSEL_SIZE", "1")
    calls = 0

    def func(df: pl.DataFrame) -> pl.DataFrame:
        nonlocal calls
        calls += 1
        return df.with_columns(pl.col("a").alias("b"))

    assert (
        pl.DataFrame({"a": list(range(100))})
        .lazy()
        .map_batches(
            function=func,
            schema={"a": pl.Int64, "b": pl.Int64},
            streamable=True,
        )
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "a": list(range(100)),
        "b": list(range(100)),
    }

    assert calls > 1


@pytest.mark.slow
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud  # reason: timing
def test_cross_join_stack() -> None:
    a = pl.Series(np.arange(100_000)).to_frame().lazy()
    t0 = time.time()
    assert a.join(a, how="cross").head().collect(engine="streaming").shape == (5, 2)
    t1 = time.time()
    assert (t1 - t0) < 0.5


def test_streaming_literal_expansion() -> None:
    df = pl.DataFrame(
        {
            "y": ["a", "b"],
            "z": [1, 2],
        }
    )

    q = df.lazy().select(
        x=pl.lit("constant"),
        y=pl.col("y"),
        z=pl.col("z"),
    )

    assert q.collect(engine="streaming").to_dict(as_series=False) == {
        "x": ["constant", "constant"],
        "y": ["a", "b"],
        "z": [1, 2],
    }
    assert q.group_by(["x", "y"]).agg(pl.mean("z")).sort("y").collect(
        engine="streaming"
    ).to_dict(as_series=False) == {
        "x": ["constant", "constant"],
        "y": ["a", "b"],
        "z": [1.0, 2.0],
    }
    assert q.group_by(["x"]).agg(pl.mean("z")).collect().to_dict(as_series=False) == {
        "x": ["constant"],
        "z": [1.5],
    }


@pytest.mark.may_fail_auto_streaming
def test_streaming_apply(monkeypatch: Any, capfd: Any) -> None:
    monkeypatch.setenv("POLARS_VERBOSE", "1")

    q = pl.DataFrame({"a": [1, 2]}).lazy()
    with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"):
        (
            q.select(
                pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64)
            ).collect(engine="streaming")
        )


def test_streaming_sortedness_propagation_9494() -> None:
    assert (
        pl.DataFrame(
            {
                "when": [date(2023, 5, 10), date(2023, 5, 20), date(2023, 6, 10)],
                "what": [1, 2, 3],
            }
        )
        .lazy()
        .sort("when")
        .group_by_dynamic("when", every="1mo")
        .agg(pl.col("what").sum())
        .collect(engine="streaming")
    ).to_dict(as_series=False) == {
        "when": [date(2023, 5, 1), date(2023, 6, 1)],
        "what": [3, 3],
    }


@pytest.mark.write_disk
@pytest.mark.slow
def test_streaming_generic_left_and_inner_join_from_disk(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    p0 = tmp_path / "df0.parquet"
    p1 = tmp_path / "df1.parquet"
    # by loading from disk, we get different chunks
    n = 200_000
    k = 100

    d0: dict[str, np.ndarray[Any, Any]] = {
        f"x{i}": np.random.random(n) for i in range(k)
    }
    d0.update({"id": np.arange(n)})

    df0 = pl.DataFrame(d0)
    df1 = df0.clone().select(pl.all().shuffle(111))

    df0.write_parquet(p0)
    df1.write_parquet(p1)

    lf0 = pl.scan_parquet(p0)
    lf1 = pl.scan_parquet(p1).select(pl.all().name.suffix("_r"))

    join_strategies: list[JoinStrategy] = ["left", "inner"]
    for how in join_strategies:
        assert_frame_equal(
            lf0.join(
                lf1, left_on="id", right_on="id_r", how=how, maintain_order="left"
            ).collect(engine="streaming"),
            lf0.join(lf1, left_on="id", right_on="id_r", how=how).collect(
                engine="in-memory"
            ),
            check_row_order=how == "left",
        )


def test_streaming_9776() -> None:
    df = pl.DataFrame({"col_1": ["a"] * 1000, "ID": [None] + ["a"] * 999})
    ordered = (
        df.group_by("col_1", "ID", maintain_order=True)
        .len()
        .filter(pl.col("col_1") == "a")
    )
    unordered = (
        df.group_by("col_1", "ID", maintain_order=False)
        .len()
        .filter(pl.col("col_1") == "a")
    )
    expected = [("a", None, 1), ("a", "a", 999)]
    assert ordered.rows() == expected
    assert unordered.sort(["col_1", "ID"]).rows() == expected


@pytest.mark.write_disk
def test_stream_empty_file(tmp_path: Path) -> None:
    p = tmp_path / "in.parquet"
    schema = {
        "KLN_NR": pl.String,
    }

    df = pl.DataFrame(
        {
            "KLN_NR": [],
        },
        schema=schema,
    )
    df.write_parquet(p)
    assert pl.scan_parquet(p).collect(engine="streaming").schema == schema


def test_streaming_empty_df() -> None:
    df = pl.DataFrame(
        [
            pl.Series("a", ["a", "b", "c", "b", "a", "a"], dtype=pl.Categorical()),
            pl.Series("b", ["b", "c", "c", "b", "a", "c"], dtype=pl.Categorical()),
        ]
    )

    result = (
        df.lazy()
        .join(df.lazy(), on="a", how="inner")
        .filter(False)
        .collect(engine="streaming")
    )

    assert result.to_dict(as_series=False) == {"a": [], "b": [], "b_right": []}


def test_streaming_duplicate_cols_5537() -> None:
    assert pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).lazy().with_columns(
        (pl.col("a") * 2).alias("foo"), (pl.col("a") * 3)
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "a": [3, 6, 9],
        "b": [1, 2, 3],
        "foo": [2, 4, 6],
    }


def test_null_sum_streaming_10455() -> None:
    df = pl.DataFrame(
        {
            "x": [1] * 10,
            "y": [None] * 10,
        },
        schema={"x": pl.Int64, "y": pl.Float32},
    )
    result = df.lazy().group_by("x").sum().collect(engine="streaming")
    expected = {"x": [1], "y": [0.0]}
    assert result.to_dict(as_series=False) == expected


def test_boolean_agg_schema() -> None:
    df = pl.DataFrame(
        {
            "x": [1, 1, 1],
            "y": [False, True, False],
        }
    ).lazy()

    agg_df = df.group_by("x").agg(pl.col("y").max().alias("max_y"))

    for streaming in [True, False]:
        assert (
            agg_df.collect(engine="streaming" if streaming else "in-memory").schema
            == agg_df.collect_schema()
            == {"x": pl.Int64, "max_y": pl.Boolean}
        )


@pytest.mark.write_disk
def test_streaming_csv_headers_but_no_data_13770(tmp_path: Path) -> None:
    with Path.open(tmp_path / "header_no_data.csv", "w") as f:
        f.write("name, age\n")

    schema = {"name": pl.String, "age": pl.Int32}
    df = (
        pl.scan_csv(tmp_path / "header_no_data.csv", schema=schema)
        .head()
        .collect(engine="streaming")
    )
    assert df.height == 0
    assert df.schema == schema


@pytest.mark.write_disk
def test_streaming_with_hconcat(tmp_path: Path) -> None:
    df1 = pl.DataFrame(
        {
            "id": [0, 0, 1, 1, 2, 2],
            "x": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
        }
    )
    df1.write_parquet(tmp_path / "df1.parquet")

    df2 = pl.DataFrame(
        {
            "y": [6.0, 7.0, 8.0, 9.0, 10.0, 11.0],
        }
    )
    df2.write_parquet(tmp_path / "df2.parquet")

    lf1 = pl.scan_parquet(tmp_path / "df1.parquet")
    lf2 = pl.scan_parquet(tmp_path / "df2.parquet")
    query = (
        pl.concat([lf1, lf2], how="horizontal")
        .group_by("id")
        .agg(pl.all().mean())
        .sort(pl.col("id"))
    )

    result = query.collect(engine="streaming")

    expected = pl.DataFrame(
        {
            "id": [0, 1, 2],
            "x": [0.5, 2.5, 4.5],
            "y": [6.5, 8.5, 10.5],
        }
    )

    assert_frame_equal(result, expected)


@pytest.mark.write_disk
def test_elementwise_identification_in_ternary_15767(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    (
        pl.LazyFrame({"a": pl.Series([1])})
        .with_columns(b=pl.col("a").is_in(pl.Series([1, 2, 3])))
        .sink_parquet(tmp_path / "1")
    )

    (
        pl.LazyFrame({"a": pl.Series([1])})
        .with_columns(
            b=pl.when(pl.col("a").is_in(pl.Series([1, 2, 3]))).then(pl.col("a"))
        )
        .sink_parquet(tmp_path / "1")
    )


def test_streaming_temporal_17669() -> None:
    df = (
        pl.LazyFrame({"a": [1, 2, 3]}, schema={"a": pl.Datetime("us")})
        .with_columns(
            b=pl.col("a").dt.date(),
            c=pl.col("a").dt.time(),
        )
        .collect(engine="streaming")
    )
    assert df.schema == {
        "a": pl.Datetime("us"),
        "b": pl.Date,
        "c": pl.Time,
    }


def test_i128_sum_reduction() -> None:
    assert (
        pl.Series("a", [1, 2, 3], pl.Int128)
        .to_frame()
        .lazy()
        .sum()
        .collect(engine="streaming")
        .item()
        == 6
    )