Path: py-polars/tests/unit/streaming/test_streaming_io.py
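"""Tests for streaming-engine I/O: scan_* reads and sink_* writes across
Parquet, CSV, IPC, and NDJSON."""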
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path

pytestmark = pytest.mark.xdist_group("streaming")


@pytest.mark.write_disk
def test_streaming_parquet_glob_5900(df: pl.DataFrame, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "small.parquet"
    df.write_parquet(file_path)

    path_glob = tmp_path / "small*.parquet"
    result = (
        pl.scan_parquet(path_glob).select(pl.all().first()).collect(engine="streaming")
    )
    assert result.shape == (1, df.width)


def test_scan_slice_streaming(io_files_path: Path) -> None:
    foods_file_path = io_files_path / "foods1.csv"
    df = pl.scan_csv(foods_file_path).head(5).collect(engine="streaming")
    assert df.shape == (5, 4)

    # globbing
    foods_file_path = io_files_path / "foods*.csv"
    df = pl.scan_csv(foods_file_path).head(5).collect(engine="streaming")
    assert df.shape == (5, 4)


@pytest.mark.parametrize("dtype", [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16])
def test_scan_csv_overwrite_small_dtypes(
    io_files_path: Path, dtype: pl.DataType
) -> None:
    file_path = io_files_path / "foods1.csv"
    df = pl.scan_csv(file_path, schema_overrides={"sugars_g": dtype}).collect(
        engine="streaming"
    )
    assert df.dtypes == [pl.String, pl.Int64, pl.Float64, dtype]


@pytest.mark.write_disk
def test_sink_parquet(io_files_path: Path, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file = io_files_path / "small.parquet"

    file_path = tmp_path / "sink.parquet"

    df_scanned = pl.scan_parquet(file)
    df_scanned.sink_parquet(file_path)

    result = pl.read_parquet(file_path)
    df_read = pl.read_parquet(file)
    assert_frame_equal(result, df_read)


@pytest.mark.write_disk
def test_sink_parquet_10115(tmp_path: Path) -> None:
    in_path = tmp_path / "in.parquet"
    out_path = tmp_path / "out.parquet"

    # this fails if the schema is resolved incorrectly due to projection
    # pushdown
    (pl.DataFrame([{"x": 1, "y": "foo"}]).write_parquet(in_path))

    joiner = pl.LazyFrame([{"y": "foo", "z": "_"}])

    (
        pl.scan_parquet(in_path)
        .join(joiner, how="left", on="y")
        .select("x", "y", "z")
        .sink_parquet(out_path)
    )

    assert pl.read_parquet(out_path).to_dict(as_series=False) == {
        "x": [1],
        "y": ["foo"],
        "z": ["_"],
    }


@pytest.mark.write_disk
def test_sink_ipc(io_files_path: Path, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file = io_files_path / "small.parquet"

    file_path = tmp_path / "sink.ipc"

    df_scanned = pl.scan_parquet(file)
    df_scanned.sink_ipc(file_path)

    result = pl.read_ipc(file_path)
    df_read = pl.read_parquet(file)
    assert_frame_equal(result, df_read)


@pytest.mark.write_disk
def test_sink_csv(io_files_path: Path, tmp_path: Path) -> None:
    source_file = io_files_path / "small.parquet"
    target_file = tmp_path / "sink.csv"

    pl.scan_parquet(source_file).sink_csv(target_file)

    source_data = pl.read_parquet(source_file)
    target_data = pl.read_csv(target_file)
    assert_frame_equal(target_data, source_data)


@pytest.mark.write_disk
def test_sink_csv_14494(tmp_path: Path) -> None:
    # A filter that removes every row should still produce a CSV with a header.
    pl.LazyFrame({"c": [1, 2, 3]}, schema={"c": pl.Int64}).filter(
        pl.col("c") > 10
    ).sink_csv(tmp_path / "sink.csv")
    assert pl.read_csv(tmp_path / "sink.csv").columns == ["c"]
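# CSV separators and quote characters must be a single byte; the two tests
# below check that multi-byte and empty values are rejected with a ValueError.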
["abc"]})130with pytest.raises(ValueError, match="should be a single byte character, but is"):131df.sink_csv("path", separator=value)132133134@pytest.mark.parametrize(("value"), ["abc", ""])135def test_sink_csv_exception_for_quote(value: str) -> None:136df = pl.LazyFrame({"dummy": ["abc"]})137with pytest.raises(ValueError, match="should be a single byte character, but is"):138df.sink_csv("path", quote_char=value)139140141def test_sink_csv_batch_size_zero() -> None:142lf = pl.LazyFrame({"a": [1, 2, 3], "b": [1, 2, 3]})143with pytest.raises(ValueError, match="invalid zero value"):144lf.sink_csv("test.csv", batch_size=0)145146147@pytest.mark.write_disk148def test_sink_csv_nested_data(tmp_path: Path) -> None:149tmp_path.mkdir(exist_ok=True)150path = tmp_path / "data.csv"151152lf = pl.LazyFrame({"list": [[1, 2, 3, 4, 5]]})153with pytest.raises(154pl.exceptions.ComputeError, match="CSV format does not support nested data"155):156lf.sink_csv(path)157158159def test_scan_csv_only_header_10792(io_files_path: Path) -> None:160foods_file_path = io_files_path / "only_header.csv"161df = pl.scan_csv(foods_file_path).collect(engine="streaming")162assert df.to_dict(as_series=False) == {"Name": [], "Address": []}163164165def test_scan_empty_csv_10818(io_files_path: Path) -> None:166empty_file_path = io_files_path / "empty.csv"167df = pl.scan_csv(empty_file_path, raise_if_empty=False).collect(engine="streaming")168assert df.is_empty()169170171@pytest.mark.write_disk172def test_streaming_cross_join_schema(tmp_path: Path) -> None:173file_path = tmp_path / "temp.parquet"174a = pl.DataFrame({"a": [1, 2]}).lazy()175b = pl.DataFrame({"b": ["b"]}).lazy()176a.join(b, how="cross").sink_parquet(file_path)177read = pl.read_parquet(file_path, parallel="none")178assert read.to_dict(as_series=False) == {"a": [1, 2], "b": ["b", "b"]}179180181@pytest.mark.write_disk182def test_sink_ndjson_should_write_same_data(183io_files_path: Path, tmp_path: Path184) -> None:185tmp_path.mkdir(exist_ok=True)186187source_path = io_files_path / "foods1.csv"188target_path = tmp_path / "foods_test.ndjson"189190expected = pl.read_csv(source_path)191192lf = pl.scan_csv(source_path)193lf.sink_ndjson(target_path)194df = pl.read_ndjson(target_path)195196assert_frame_equal(df, expected)197198199@pytest.mark.write_disk200@pytest.mark.parametrize("streaming", [False, True])201def test_parquet_eq_statistics(202monkeypatch: Any, capfd: Any, tmp_path: Path, streaming: bool203) -> None:204tmp_path.mkdir(exist_ok=True)205206monkeypatch.setenv("POLARS_VERBOSE", "1")207208df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(209(pl.col("idx") // 25).alias("part")210)211df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)212assert df.n_chunks("all") == [4, 4]213214file_path = tmp_path / "stats.parquet"215df.write_parquet(file_path, statistics=True, use_pyarrow=False)216217for pred in [218pl.col("idx") == 50,219pl.col("idx") == 150,220pl.col("idx") == 210,221]:222result = (223pl.scan_parquet(file_path)224.filter(pred)225.collect(engine="streaming" if streaming else "in-memory")226)227assert_frame_equal(result, df.filter(pred))228229captured = capfd.readouterr().err230assert (231"[ParquetFileReader]: Predicate pushdown: reading 1 / 1 row groups" in captured232)233assert (234"[ParquetFileReader]: Predicate pushdown: reading 0 / 1 row groups" in captured235)236237238@pytest.mark.write_disk239def test_streaming_empty_parquet_16523(tmp_path: Path) -> None:240file_path = tmp_path / "foo.parquet"241df = pl.DataFrame({"a": []}, 
schema={"a": pl.Int32})242df.write_parquet(file_path)243q = pl.scan_parquet(file_path)244q2 = pl.LazyFrame({"a": [1]}, schema={"a": pl.Int32})245assert q.join(q2, on="a").collect(engine="streaming").shape == (0, 1)246247248@pytest.mark.parametrize(249"method",250["parquet", "csv", "ipc", "ndjson"],251)252@pytest.mark.write_disk253def test_sink_phases(tmp_path: Path, method: str) -> None:254df = pl.DataFrame(255{256"a": [1, 2, 3, 4, 5, 6, 7],257"b": [258"some",259"text",260"over-here-is-very-long",261"and",262"some",263"more",264"text",265],266}267)268269# Ordered Unions lead to many phase transitions.270ref_df = pl.concat([df] * 100)271lf = pl.concat([df.lazy()] * 100)272273(getattr(lf, f"sink_{method}"))(tmp_path / f"t.{method}", engine="streaming")274df = (getattr(pl, f"scan_{method}"))(tmp_path / f"t.{method}").collect()275276assert_frame_equal(df, ref_df)277278(getattr(lf, f"sink_{method}"))(279tmp_path / f"t.{method}", maintain_order=False, engine="streaming"280)281height = (282(getattr(pl, f"scan_{method}"))(tmp_path / f"t.{method}")283.select(pl.len())284.collect()[0, 0]285)286assert height == ref_df.height287288289def test_empty_sink_parquet_join_14863(tmp_path: Path) -> None:290file_path = tmp_path / "empty.parquet"291lf = pl.LazyFrame(schema=["a", "b", "c"]).cast(pl.String)292lf.sink_parquet(file_path)293assert_frame_equal(294pl.LazyFrame({"a": ["uno"]}).join(pl.scan_parquet(file_path), on="a").collect(),295lf.collect(),296)297298299@pytest.mark.write_disk300def test_scan_non_existent_file_21527() -> None:301with pytest.raises(302FileNotFoundError,303match=r"a-file-that-does-not-exist",304):305pl.scan_parquet("a-file-that-does-not-exist").sink_ipc(306"x.ipc", engine="streaming"307)308309310