Path: blob/main/py-polars/tests/unit/streaming/test_streaming_io.py
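# Streaming-engine I/O tests: scan_*/sink_* round-trips for Parquet, CSV, IPC and NDJSON.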
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path

    from tests.conftest import PlMonkeyPatch

pytestmark = pytest.mark.xdist_group("streaming")


@pytest.mark.write_disk
def test_streaming_parquet_glob_5900(df: pl.DataFrame, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "small.parquet"
    df.write_parquet(file_path)

    path_glob = tmp_path / "small*.parquet"
    result = (
        pl.scan_parquet(path_glob).select(pl.all().first()).collect(engine="streaming")
    )
    assert result.shape == (1, df.width)


def test_scan_slice_streaming(io_files_path: Path) -> None:
    foods_file_path = io_files_path / "foods1.csv"
    df = pl.scan_csv(foods_file_path).head(5).collect(engine="streaming")
    assert df.shape == (5, 4)

    # globbing
    foods_file_path = io_files_path / "foods*.csv"
    df = pl.scan_csv(foods_file_path).head(5).collect(engine="streaming")
    assert df.shape == (5, 4)


@pytest.mark.parametrize("dtype", [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16])
def test_scan_csv_overwrite_small_dtypes(
    io_files_path: Path, dtype: pl.DataType
) -> None:
    file_path = io_files_path / "foods1.csv"
    df = pl.scan_csv(file_path, schema_overrides={"sugars_g": dtype}).collect(
        engine="streaming"
    )
    assert df.dtypes == [pl.String, pl.Int64, pl.Float64, dtype]


@pytest.mark.write_disk
def test_sink_parquet(io_files_path: Path, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file = io_files_path / "small.parquet"

    file_path = tmp_path / "sink.parquet"

    df_scanned = pl.scan_parquet(file)
    df_scanned.sink_parquet(file_path)

    result = pl.read_parquet(file_path)
    df_read = pl.read_parquet(file)
    assert_frame_equal(result, df_read)


@pytest.mark.write_disk
def test_sink_parquet_10115(tmp_path: Path) -> None:
    in_path = tmp_path / "in.parquet"
    out_path = tmp_path / "out.parquet"

    # this fails if the schema is incorrect due to the projection pushdown
    (pl.DataFrame([{"x": 1, "y": "foo"}]).write_parquet(in_path))

    joiner = pl.LazyFrame([{"y": "foo", "z": "_"}])

    (
        pl.scan_parquet(in_path)
        .join(joiner, how="left", on="y")
        .select("x", "y", "z")
        .sink_parquet(out_path)
    )

    assert pl.read_parquet(out_path).to_dict(as_series=False) == {
        "x": [1],
        "y": ["foo"],
        "z": ["_"],
    }


@pytest.mark.write_disk
def test_sink_ipc(io_files_path: Path, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file = io_files_path / "small.parquet"

    file_path = tmp_path / "sink.ipc"

    df_scanned = pl.scan_parquet(file)
    df_scanned.sink_ipc(file_path)

    result = pl.read_ipc(file_path)
    df_read = pl.read_parquet(file)
    assert_frame_equal(result, df_read)


@pytest.mark.write_disk
def test_sink_csv(io_files_path: Path, tmp_path: Path) -> None:
    source_file = io_files_path / "small.parquet"
    target_file = tmp_path / "sink.csv"

    pl.scan_parquet(source_file).sink_csv(target_file)

    source_data = pl.read_parquet(source_file)
    target_data = pl.read_csv(target_file)
    assert_frame_equal(target_data, source_data)


@pytest.mark.write_disk
def test_sink_csv_14494(tmp_path: Path) -> None:
    # the filter removes every row; the sunk CSV should still contain the header
    pl.LazyFrame({"c": [1, 2, 3]}, schema={"c": pl.Int64}).filter(
        pl.col("c") > 10
    ).sink_csv(tmp_path / "sink.csv")
    assert pl.read_csv(tmp_path / "sink.csv").columns == ["c"]


@pytest.mark.parametrize("value", ["abc", ""])
def test_sink_csv_exception_for_separator(value: str) -> None:
    df = pl.LazyFrame({"dummy": ["abc"]})
    with pytest.raises(ValueError, match="should be a single byte character, but is"):
        df.sink_csv("path", separator=value)


@pytest.mark.parametrize("value", ["abc", ""])
def test_sink_csv_exception_for_quote(value: str) -> None:
    df = pl.LazyFrame({"dummy": ["abc"]})
    with pytest.raises(ValueError, match="should be a single byte character, but is"):
        df.sink_csv("path", quote_char=value)


def test_sink_csv_batch_size_zero() -> None:
    lf = pl.LazyFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    with pytest.raises(ValueError, match="invalid zero value"):
        lf.sink_csv("test.csv", batch_size=0)


@pytest.mark.write_disk
def test_sink_csv_nested_data(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "data.csv"

    lf = pl.LazyFrame({"list": [[1, 2, 3, 4, 5]]})
    with pytest.raises(
        pl.exceptions.ComputeError, match="CSV format does not support nested data"
    ):
        lf.sink_csv(path)


def test_scan_csv_only_header_10792(io_files_path: Path) -> None:
    foods_file_path = io_files_path / "only_header.csv"
    df = pl.scan_csv(foods_file_path).collect(engine="streaming")
    assert df.to_dict(as_series=False) == {"Name": [], "Address": []}


def test_scan_empty_csv_10818(io_files_path: Path) -> None:
    empty_file_path = io_files_path / "empty.csv"
    df = pl.scan_csv(empty_file_path, raise_if_empty=False).collect(engine="streaming")
    assert df.is_empty()


@pytest.mark.write_disk
def test_streaming_cross_join_schema(tmp_path: Path) -> None:
    file_path = tmp_path / "temp.parquet"
    a = pl.DataFrame({"a": [1, 2]}).lazy()
    b = pl.DataFrame({"b": ["b"]}).lazy()
    a.join(b, how="cross").sink_parquet(file_path)
    read = pl.read_parquet(file_path, parallel="none")
    assert read.to_dict(as_series=False) == {"a": [1, 2], "b": ["b", "b"]}


@pytest.mark.write_disk
def test_sink_ndjson_should_write_same_data(
    io_files_path: Path, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)

    source_path = io_files_path / "foods1.csv"
    target_path = tmp_path / "foods_test.ndjson"

    expected = pl.read_csv(source_path)

    lf = pl.scan_csv(source_path)
    lf.sink_ndjson(target_path)
    df = pl.read_ndjson(target_path)

    assert_frame_equal(df, expected)


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [False, True])
def test_parquet_eq_statistics(
    plmonkeypatch: PlMonkeyPatch, capfd: Any, tmp_path: Path, streaming: bool
) -> None:
    tmp_path.mkdir(exist_ok=True)

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")

    df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
        (pl.col("idx") // 25).alias("part")
    )
    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
    assert df.n_chunks("all") == [4, 4]

    file_path = tmp_path / "stats.parquet"
    df.write_parquet(file_path, statistics=True, use_pyarrow=False)

    for pred in [
        pl.col("idx") == 50,
        pl.col("idx") == 150,
        pl.col("idx") == 210,
    ]:
        result = (
            pl.scan_parquet(file_path)
            .filter(pred)
            .collect(engine="streaming" if streaming else "in-memory")
        )
        assert_frame_equal(result, df.filter(pred))

    # the verbose logs show whether parquet statistics let the reader skip row
    # groups for predicates that cannot match (idx == 50 and idx == 210)
    captured = capfd.readouterr().err
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 1 / 1 row groups" in captured
    )
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 0 / 1 row groups" in captured
    )


@pytest.mark.write_disk
def test_streaming_empty_parquet_16523(tmp_path: Path) -> None:
    file_path = tmp_path / "foo.parquet"
/ "foo.parquet"243df = pl.DataFrame({"a": []}, schema={"a": pl.Int32})244df.write_parquet(file_path)245q = pl.scan_parquet(file_path)246q2 = pl.LazyFrame({"a": [1]}, schema={"a": pl.Int32})247assert q.join(q2, on="a").collect(engine="streaming").shape == (0, 1)248249250@pytest.mark.parametrize(251"method",252["parquet", "csv", "ipc", "ndjson"],253)254@pytest.mark.write_disk255def test_sink_phases(tmp_path: Path, method: str) -> None:256df = pl.DataFrame(257{258"a": [1, 2, 3, 4, 5, 6, 7],259"b": [260"some",261"text",262"over-here-is-very-long",263"and",264"some",265"more",266"text",267],268}269)270271# Ordered Unions lead to many phase transitions.272ref_df = pl.concat([df] * 100)273lf = pl.concat([df.lazy()] * 100)274275(getattr(lf, f"sink_{method}"))(tmp_path / f"t.{method}", engine="streaming")276df = (getattr(pl, f"scan_{method}"))(tmp_path / f"t.{method}").collect()277278assert_frame_equal(df, ref_df)279280(getattr(lf, f"sink_{method}"))(281tmp_path / f"t.{method}", maintain_order=False, engine="streaming"282)283height = (284(getattr(pl, f"scan_{method}"))(tmp_path / f"t.{method}")285.select(pl.len())286.collect()[0, 0]287)288assert height == ref_df.height289290291def test_empty_sink_parquet_join_14863(tmp_path: Path) -> None:292file_path = tmp_path / "empty.parquet"293lf = pl.LazyFrame(schema=["a", "b", "c"]).cast(pl.String)294lf.sink_parquet(file_path)295assert_frame_equal(296pl.LazyFrame({"a": ["uno"]}).join(pl.scan_parquet(file_path), on="a").collect(),297lf.collect(),298)299300301@pytest.mark.write_disk302def test_scan_non_existent_file_21527() -> None:303with pytest.raises(304FileNotFoundError,305match=r"a-file-that-does-not-exist",306):307pl.scan_parquet("a-file-that-does-not-exist").sink_ipc(308"x.ipc", engine="streaming"309)310311312