Path: blob/main/py-polars/tests/unit/streaming/test_streaming.py
from __future__ import annotations

import os
import time
from datetime import date
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import pytest

import polars as pl
from polars.exceptions import PolarsInefficientMapWarning
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from polars._typing import JoinStrategy
    from tests.conftest import PlMonkeyPatch

pytestmark = pytest.mark.xdist_group("streaming")


def test_streaming_categoricals_5921() -> None:
    out_lazy = (
        pl.DataFrame({"X": ["a", "a", "a", "b", "b"], "Y": [2, 2, 2, 1, 1]})
        .lazy()
        .with_columns(pl.col("X").cast(pl.Categorical))
        .group_by("X")
        .agg(pl.col("Y").min())
        .sort("Y", descending=True)
        .collect(engine="streaming")
    )

    out_eager = (
        pl.DataFrame({"X": ["a", "a", "a", "b", "b"], "Y": [2, 2, 2, 1, 1]})
        .with_columns(pl.col("X").cast(pl.Categorical))
        .group_by("X")
        .agg(pl.col("Y").min())
        .sort("Y", descending=True)
    )

    for out in [out_eager, out_lazy]:
        assert out.dtypes == [pl.Categorical, pl.Int64]
        assert out.to_dict(as_series=False) == {"X": ["a", "b"], "Y": [2, 1]}


def test_streaming_block_on_literals_6054() -> None:
    df = pl.DataFrame({"col_1": [0] * 5 + [1] * 5})
    s = pl.Series("col_2", list(range(10)))

    assert df.lazy().with_columns(s).group_by("col_1").agg(pl.all().first()).collect(
        engine="streaming"
    ).sort("col_1").to_dict(as_series=False) == {"col_1": [0, 1], "col_2": [0, 5]}


@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud  # reason: non-pure map_batches
def test_streaming_streamable_functions(
    plmonkeypatch: PlMonkeyPatch, capfd: Any
) -> None:
    plmonkeypatch.setenv("POLARS_IDEAL_MORSEL_SIZE", "1")
    calls = 0

    def func(df: pl.DataFrame) -> pl.DataFrame:
        nonlocal calls
        calls += 1
        return df.with_columns(pl.col("a").alias("b"))

    assert (
        pl.DataFrame({"a": list(range(100))})
        .lazy()
        .map_batches(
            function=func,
            schema={"a": pl.Int64, "b": pl.Int64},
            streamable=True,
        )
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "a": list(range(100)),
        "b": list(range(100)),
    }

    assert calls > 1


@pytest.mark.slow
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud  # reason: timing
def test_cross_join_stack() -> None:
    morsel_size = os.environ.get("POLARS_IDEAL_MORSEL_SIZE")
    if morsel_size is not None and int(morsel_size) < 1000:
        pytest.skip("test is too slow for small morsel sizes")

    a = pl.Series(np.arange(100_000)).to_frame().lazy()
    t0 = time.time()
    assert a.join(a, how="cross").head().collect(engine="streaming").shape == (5, 2)
    t1 = time.time()
    assert (t1 - t0) < 0.5


def test_streaming_literal_expansion() -> None:
    df = pl.DataFrame(
        {
            "y": ["a", "b"],
            "z": [1, 2],
        }
    )

    q = df.lazy().select(
        x=pl.lit("constant"),
        y=pl.col("y"),
        z=pl.col("z"),
    )

    assert q.collect(engine="streaming").to_dict(as_series=False) == {
        "x": ["constant", "constant"],
        "y": ["a", "b"],
        "z": [1, 2],
    }
    assert q.group_by(["x", "y"]).agg(pl.mean("z")).sort("y").collect(
        engine="streaming"
    ).to_dict(as_series=False) == {
        "x": ["constant", "constant"],
        "y": ["a", "b"],
        "z": [1.0, 2.0],
    }
    assert q.group_by(["x"]).agg(pl.mean("z")).collect().to_dict(as_series=False) == {
        "x": ["constant"],
        "z": [1.5],
    }


@pytest.mark.may_fail_auto_streaming
def test_streaming_apply(plmonkeypatch: PlMonkeyPatch, capfd: Any) -> None:
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")

    q = pl.DataFrame({"a": [1, 2]}).lazy()
    with pytest.warns(
        PolarsInefficientMapWarning,
        match="with this one instead",
    ):
        (
            q.select(
                pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64)
            ).collect(engine="streaming")
        )


def test_streaming_sortedness_propagation_9494() -> None:
    assert (
        pl.DataFrame(
            {
                "when": [date(2023, 5, 10), date(2023, 5, 20), date(2023, 6, 10)],
                "what": [1, 2, 3],
            }
        )
        .lazy()
        .sort("when")
        .group_by_dynamic("when", every="1mo")
        .agg(pl.col("what").sum())
        .collect(engine="streaming")
    ).to_dict(as_series=False) == {
        "when": [date(2023, 5, 1), date(2023, 6, 1)],
        "what": [3, 3],
    }


@pytest.mark.write_disk
@pytest.mark.slow
def test_streaming_generic_left_and_inner_join_from_disk(tmp_path: Path) -> None:
    morsel_size = os.environ.get("POLARS_IDEAL_MORSEL_SIZE")
    if morsel_size is not None and int(morsel_size) < 1000:
        pytest.skip("test is too slow for small morsel sizes")

    tmp_path.mkdir(exist_ok=True)
    p0 = tmp_path / "df0.parquet"
    p1 = tmp_path / "df1.parquet"
    # by loading from disk, we get different chunks
    n = 200_000
    k = 100

    d0: dict[str, np.ndarray[Any, Any]] = {
        f"x{i}": np.random.random(n) for i in range(k)
    }
    d0.update({"id": np.arange(n)})

    df0 = pl.DataFrame(d0)
    df1 = df0.clone().select(pl.all().shuffle(111))

    df0.write_parquet(p0)
    df1.write_parquet(p1)

    lf0 = pl.scan_parquet(p0)
    lf1 = pl.scan_parquet(p1).select(pl.all().name.suffix("_r"))

    join_strategies: list[JoinStrategy] = ["left", "inner"]
    for how in join_strategies:
        assert_frame_equal(
            lf0.join(lf1, left_on="id", right_on="id_r", how=how).collect(
                engine="streaming"
            ),
            lf0.join(lf1, left_on="id", right_on="id_r", how=how).collect(
                engine="in-memory"
            ),
            check_row_order=False,
        )


def test_streaming_9776() -> None:
    df = pl.DataFrame({"col_1": ["a"] * 1000, "ID": [None] + ["a"] * 999})
    ordered = (
        df.group_by("col_1", "ID", maintain_order=True)
        .len()
        .filter(pl.col("col_1") == "a")
    )
    unordered = (
        df.group_by("col_1", "ID", maintain_order=False)
        .len()
        .filter(pl.col("col_1") == "a")
    )
    expected = [("a", None, 1), ("a", "a", 999)]
    assert ordered.rows() == expected
    assert unordered.sort(["col_1", "ID"]).rows() == expected


@pytest.mark.write_disk
def test_stream_empty_file(tmp_path: Path) -> None:
    p = tmp_path / "in.parquet"
    schema = {
        "KLN_NR": pl.String,
    }

    df = pl.DataFrame(
        {
            "KLN_NR": [],
        },
        schema=schema,
    )
    df.write_parquet(p)
    assert pl.scan_parquet(p).collect(engine="streaming").schema == schema


def test_streaming_empty_df() -> None:
    df = pl.DataFrame(
        [
            pl.Series("a", ["a", "b", "c", "b", "a", "a"], dtype=pl.Categorical()),
            pl.Series("b", ["b", "c", "c", "b", "a", "c"], dtype=pl.Categorical()),
        ]
    )

    result = (
        df.lazy()
        .join(df.lazy(), on="a", how="inner")
        .filter(False)
        .collect(engine="streaming")
    )

    assert result.to_dict(as_series=False) == {"a": [], "b": [], "b_right": []}


def test_streaming_duplicate_cols_5537() -> None:
    assert pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).lazy().with_columns(
        (pl.col("a") * 2).alias("foo"), (pl.col("a") * 3)
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "a": [3, 6, 9],
        "b": [1, 2, 3],
        "foo": [2, 4, 6],
    }


def test_null_sum_streaming_10455() -> None:
    df = pl.DataFrame(
        {
            "x": [1] * 10,
            "y": [None] * 10,
        },
        schema={"x": pl.Int64, "y": pl.Float32},
    )
    result = df.lazy().group_by("x").sum().collect(engine="streaming")
    expected = {"x": [1], "y": [0.0]}
    assert result.to_dict(as_series=False) == expected


def test_boolean_agg_schema() -> None:
    df = pl.DataFrame(
        {
            "x": [1, 1, 1],
            "y": [False, True, False],
        }
    ).lazy()

    agg_df = df.group_by("x").agg(pl.col("y").max().alias("max_y"))

    for streaming in [True, False]:
        assert (
            agg_df.collect(engine="streaming" if streaming else "in-memory").schema
            == agg_df.collect_schema()
            == {"x": pl.Int64, "max_y": pl.Boolean}
        )


@pytest.mark.write_disk
def test_streaming_csv_headers_but_no_data_13770(tmp_path: Path) -> None:
    with Path.open(tmp_path / "header_no_data.csv", "w") as f:
        f.write("name, age\n")

    schema = {"name": pl.String, "age": pl.Int32}
    df = (
        pl.scan_csv(tmp_path / "header_no_data.csv", schema=schema)
        .head()
        .collect(engine="streaming")
    )
    assert df.height == 0
    assert df.schema == schema


@pytest.mark.write_disk
def test_streaming_with_hconcat(tmp_path: Path) -> None:
    df1 = pl.DataFrame(
        {
            "id": [0, 0, 1, 1, 2, 2],
            "x": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
        }
    )
    df1.write_parquet(tmp_path / "df1.parquet")

    df2 = pl.DataFrame(
        {
            "y": [6.0, 7.0, 8.0, 9.0, 10.0, 11.0],
        }
    )
    df2.write_parquet(tmp_path / "df2.parquet")

    lf1 = pl.scan_parquet(tmp_path / "df1.parquet")
    lf2 = pl.scan_parquet(tmp_path / "df2.parquet")
    query = (
        pl.concat([lf1, lf2], how="horizontal")
        .group_by("id")
        .agg(pl.all().mean())
        .sort(pl.col("id"))
    )

    result = query.collect(engine="streaming")

    expected = pl.DataFrame(
        {
            "id": [0, 1, 2],
            "x": [0.5, 2.5, 4.5],
            "y": [6.5, 8.5, 10.5],
        }
    )

    assert_frame_equal(result, expected)


@pytest.mark.write_disk
def test_elementwise_identification_in_ternary_15767(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    (
        pl.LazyFrame({"a": pl.Series([1])})
        .with_columns(b=pl.col("a").is_in(pl.Series([1, 2, 3])))
        .sink_parquet(tmp_path / "1")
    )

    (
        pl.LazyFrame({"a": pl.Series([1])})
        .with_columns(
            b=pl.when(pl.col("a").is_in(pl.Series([1, 2, 3]))).then(pl.col("a"))
        )
        .sink_parquet(tmp_path / "1")
    )


def test_streaming_temporal_17669() -> None:
    df = (
        pl.LazyFrame({"a": [1, 2, 3]}, schema={"a": pl.Datetime("us")})
        .with_columns(
            b=pl.col("a").dt.date(),
            c=pl.col("a").dt.time(),
        )
        .collect(engine="streaming")
    )
    assert df.schema == {
        "a": pl.Datetime("us"),
        "b": pl.Date,
        "c": pl.Time,
    }


def test_i128_sum_reduction() -> None:
    assert (
        pl.Series("a", [1, 2, 3], pl.Int128)
        .to_frame()
        .lazy()
        .sum()
        .collect(engine="streaming")
        .item()
        == 6
    )