# Source: py-polars/tests/unit/io/test_lazy_parquet.py
from __future__ import annotations12import base643import io4import subprocess5import sys6from collections import OrderedDict7from datetime import datetime8from pathlib import Path9from threading import Thread10from typing import TYPE_CHECKING, Any1112import pandas as pd13import pyarrow as pa14import pyarrow.parquet as pq15import pytest1617import polars as pl18from polars.exceptions import ComputeError, SchemaError19from polars.testing import assert_frame_equal2021if TYPE_CHECKING:22from polars._typing import ParallelStrategy23from tests.conftest import PlMonkeyPatch242526@pytest.fixture27def parquet_file_path(io_files_path: Path) -> Path:28return io_files_path / "small.parquet"293031@pytest.fixture32def foods_parquet_path(io_files_path: Path) -> Path:33return io_files_path / "foods1.parquet"343536def test_scan_parquet(parquet_file_path: Path) -> None:37df = pl.scan_parquet(parquet_file_path)38assert df.collect().shape == (4, 3)394041def test_scan_parquet_local_with_async(42plmonkeypatch: PlMonkeyPatch, foods_parquet_path: Path43) -> None:44plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")45pl.scan_parquet(foods_parquet_path.relative_to(Path.cwd())).head(1).collect()464748def test_row_index(foods_parquet_path: Path) -> None:49df = pl.read_parquet(foods_parquet_path, row_index_name="row_index")50assert df["row_index"].to_list() == list(range(27))5152df = (53pl.scan_parquet(foods_parquet_path, row_index_name="row_index")54.filter(pl.col("category") == pl.lit("vegetables"))55.collect()56)5758assert df["row_index"].to_list() == [0, 6, 11, 13, 14, 20, 25]5960df = (61pl.scan_parquet(foods_parquet_path, row_index_name="row_index")62.with_row_index("foo", 10)63.filter(pl.col("category") == pl.lit("vegetables"))64.collect()65)6667assert df["foo"].to_list() == [10, 16, 21, 23, 24, 30, 35]686970def test_row_index_len_16543(foods_parquet_path: Path) -> None:71q = pl.scan_parquet(foods_parquet_path).with_row_index()72assert q.select(pl.all()).select(pl.len()).collect().item() == 
27737475@pytest.mark.write_disk76def test_categorical_parquet_statistics(tmp_path: Path) -> None:77tmp_path.mkdir(exist_ok=True)7879df = pl.DataFrame(80{81"book": [82"bookA",83"bookA",84"bookB",85"bookA",86"bookA",87"bookC",88"bookC",89"bookC",90],91"transaction_id": [1, 2, 3, 4, 5, 6, 7, 8],92"user": ["bob", "bob", "bob", "tim", "lucy", "lucy", "lucy", "lucy"],93}94).with_columns(pl.col("book").cast(pl.Categorical))9596file_path = tmp_path / "books.parquet"97df.write_parquet(file_path, statistics=True)9899parallel_options: list[ParallelStrategy] = [100"auto",101"columns",102"row_groups",103"none",104]105for par in parallel_options:106df = (107pl.scan_parquet(file_path, parallel=par)108.filter(pl.col("book") == "bookA")109.collect()110)111assert df.shape == (4, 3)112113114@pytest.mark.write_disk115def test_parquet_eq_stats(tmp_path: Path) -> None:116tmp_path.mkdir(exist_ok=True)117118file_path = tmp_path / "stats.parquet"119120df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})121df1.to_parquet(file_path, engine="pyarrow")122df = pl.scan_parquet(file_path).filter(pl.col("a") == 4).collect()123assert df["a"].to_list() == [4.0, 4.0]124125assert (126pl.scan_parquet(file_path).filter(pl.col("a") == 2).select(pl.col("a").sum())127).collect()[0, "a"] == 2.0128129assert pl.scan_parquet(file_path).filter(pl.col("a") == 5).collect().shape == (1302,1311,132)133134135@pytest.mark.write_disk136def test_parquet_is_in_stats(tmp_path: Path) -> None:137tmp_path.mkdir(exist_ok=True)138139file_path = tmp_path / "stats.parquet"140141df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})142df1.to_parquet(file_path, engine="pyarrow")143df = pl.scan_parquet(file_path).filter(pl.col("a").is_in([5])).collect()144assert df["a"].to_list() == [5.0, 5.0]145146assert (147pl.scan_parquet(file_path)148.filter(pl.col("a").is_in([5]))149.select(pl.col("a").sum())150).collect()[0, "a"] == 10.0151152assert (153pl.scan_parquet(file_path)154.filter(pl.col("a").is_in([1, 2, 
3]))155.select(pl.col("a").sum())156).collect()[0, "a"] == 9.0157158assert (159pl.scan_parquet(file_path)160.filter(pl.col("a").is_in([1, 2, 3]))161.select(pl.col("a").sum())162).collect()[0, "a"] == 9.0163164assert (165pl.scan_parquet(file_path)166.filter(pl.col("a").is_in([5]))167.select(pl.col("a").sum())168).collect()[0, "a"] == 10.0169170assert pl.scan_parquet(file_path).filter(171pl.col("a").is_in([1, 2, 3, 4, 5])172).collect().shape == (8, 1)173174175@pytest.mark.write_disk176def test_parquet_stats(tmp_path: Path) -> None:177tmp_path.mkdir(exist_ok=True)178179file_path = tmp_path / "binary_stats.parquet"180181df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})182df1.to_parquet(file_path, engine="pyarrow")183df = (184pl.scan_parquet(file_path)185.filter(pl.col("a").is_not_null() & (pl.col("a") > 4))186.collect()187)188assert df["a"].to_list() == [5.0, 5.0]189190assert (191pl.scan_parquet(file_path).filter(pl.col("a") > 4).select(pl.col("a").sum())192).collect()[0, "a"] == 10.0193194assert (195pl.scan_parquet(file_path).filter(pl.col("a") < 4).select(pl.col("a").sum())196).collect()[0, "a"] == 9.0197198assert (199pl.scan_parquet(file_path).filter(pl.col("a") < 4).select(pl.col("a").sum())200).collect()[0, "a"] == 9.0201202assert (203pl.scan_parquet(file_path).filter(pl.col("a") > 4).select(pl.col("a").sum())204).collect()[0, "a"] == 10.0205assert pl.scan_parquet(file_path).filter(206(pl.col("a") * 10) > 5.0207).collect().shape == (8, 1)208209210def test_row_index_schema_parquet(parquet_file_path: Path) -> None:211assert (212pl.scan_parquet(str(parquet_file_path), row_index_name="id")213.select(["id", "b"])214.collect()215).dtypes == [pl.get_index_type(), pl.String]216217218@pytest.mark.may_fail_cloud # reason: inspects logs219@pytest.mark.write_disk220def test_parquet_is_in_statistics(221plmonkeypatch: PlMonkeyPatch, capfd: Any, tmp_path: Path222) -> None:223tmp_path.mkdir(exist_ok=True)224225plmonkeypatch.setenv("POLARS_VERBOSE", "1")226227df = 
pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).with_columns(228(pl.col("idx") // 25).alias("part")229)230df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)231assert df.n_chunks("all") == [4, 4]232233file_path = tmp_path / "stats.parquet"234df.write_parquet(file_path, statistics=True, use_pyarrow=False)235236file_path = tmp_path / "stats.parquet"237df.write_parquet(file_path, statistics=True, use_pyarrow=False)238239for pred in [240pl.col("idx").is_in([150, 200, 300]),241pl.col("idx").is_in([5, 250, 350]),242]:243result = pl.scan_parquet(file_path).filter(pred).collect()244assert_frame_equal(result, df.filter(pred))245246captured = capfd.readouterr().err247assert "Predicate pushdown: reading 1 / 1 row groups" in captured248assert "Predicate pushdown: reading 0 / 1 row groups" in captured249250251@pytest.mark.may_fail_cloud # reason: inspects logs252@pytest.mark.write_disk253def test_parquet_statistics(254plmonkeypatch: PlMonkeyPatch, capfd: Any, tmp_path: Path255) -> None:256tmp_path.mkdir(exist_ok=True)257258plmonkeypatch.setenv("POLARS_VERBOSE", "1")259260df = pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).with_columns(261(pl.col("idx") // 25).alias("part")262)263df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)264assert df.n_chunks("all") == [4, 4]265266file_path = tmp_path / "stats.parquet"267df.write_parquet(file_path, statistics=True, use_pyarrow=False, row_group_size=50)268269for pred in [270pl.col("idx") < 50,271pl.col("idx") > 50,272pl.col("idx").null_count() != 0,273pl.col("idx").null_count() == 0,274pl.col("idx").min() == pl.col("part").null_count(),275]:276result = pl.scan_parquet(file_path).filter(pred).collect()277assert_frame_equal(result, df.filter(pred))278279captured = capfd.readouterr().err280281assert "Predicate pushdown: reading 1 / 2 row groups" in captured282283284@pytest.mark.write_disk285def test_categorical(tmp_path: Path) -> None:286tmp_path.mkdir(exist_ok=True)287288df = 
pl.DataFrame(289[290pl.Series("name", ["Bob", "Alice", "Bob"], pl.Categorical),291pl.Series("amount", [100, 200, 300]),292]293)294295file_path = tmp_path / "categorical.parquet"296df.write_parquet(file_path)297298result = (299pl.scan_parquet(file_path)300.group_by("name")301.agg(pl.col("amount").sum())302.collect()303.sort("name")304)305expected = pl.DataFrame(306{"name": ["Alice", "Bob"], "amount": [200, 400]},307schema_overrides={"name": pl.Categorical},308)309assert_frame_equal(result, expected)310311312def test_glob_n_rows(io_files_path: Path) -> None:313file_path = io_files_path / "foods*.parquet"314df = pl.scan_parquet(file_path, n_rows=40).collect()315316# 27 rows from foods1.parquet and 13 from foods2.parquet317assert df.shape == (40, 4)318319# take first and last rows320assert df[[0, 39]].to_dict(as_series=False) == {321"category": ["vegetables", "seafood"],322"calories": [45, 146],323"fats_g": [0.5, 6.0],324"sugars_g": [2, 2],325}326327328@pytest.mark.write_disk329def test_parquet_statistics_filter_9925(tmp_path: Path) -> None:330tmp_path.mkdir(exist_ok=True)331file_path = tmp_path / "codes.parquet"332df = pl.DataFrame({"code": [300964, 300972, 500_000, 26]})333df.write_parquet(file_path, statistics=True)334335q = pl.scan_parquet(file_path).filter(336(pl.col("code").floordiv(100_000)).is_in([0, 3])337)338assert q.collect().to_dict(as_series=False) == {"code": [300964, 300972, 26]}339340341@pytest.mark.write_disk342def test_parquet_statistics_filter_11069(tmp_path: Path) -> None:343tmp_path.mkdir(exist_ok=True)344file_path = tmp_path / "foo.parquet"345pl.DataFrame({"x": [1, None]}).write_parquet(file_path, statistics=False)346347result = pl.scan_parquet(file_path).filter(pl.col("x").is_null()).collect()348expected = {"x": [None]}349assert result.to_dict(as_series=False) == expected350351352def test_parquet_list_arg(io_files_path: Path) -> None:353first = io_files_path / "foods1.parquet"354second = io_files_path / "foods2.parquet"355356df = 
pl.scan_parquet(source=[first, second]).collect()357assert df.shape == (54, 4)358assert df.row(-1) == ("seafood", 194, 12.0, 1)359assert df.row(0) == ("vegetables", 45, 0.5, 2)360361362@pytest.mark.write_disk363def test_parquet_many_row_groups_12297(tmp_path: Path) -> None:364tmp_path.mkdir(exist_ok=True)365file_path = tmp_path / "foo.parquet"366df = pl.DataFrame({"x": range(100)})367df.write_parquet(file_path, row_group_size=5, use_pyarrow=True)368assert_frame_equal(pl.scan_parquet(file_path).collect(), df)369370371@pytest.mark.write_disk372def test_row_index_empty_file(tmp_path: Path) -> None:373tmp_path.mkdir(exist_ok=True)374file_path = tmp_path / "test.parquet"375df = pl.DataFrame({"a": []}, schema={"a": pl.Float32})376df.write_parquet(file_path)377result = pl.scan_parquet(file_path).with_row_index("idx").collect()378assert result.schema == OrderedDict([("idx", pl.UInt32), ("a", pl.Float32)])379380381@pytest.mark.write_disk382def test_io_struct_async_12500(tmp_path: Path) -> None:383file_path = tmp_path / "test.parquet"384pl.DataFrame(385[386pl.Series("c1", [{"a": "foo", "b": "bar"}], dtype=pl.Struct),387pl.Series("c2", [18]),388]389).write_parquet(file_path)390assert pl.scan_parquet(file_path).select("c1").collect().to_dict(391as_series=False392) == {"c1": [{"a": "foo", "b": "bar"}]}393394395@pytest.mark.write_disk396@pytest.mark.parametrize("streaming", [True, False])397def test_parquet_different_schema(tmp_path: Path, streaming: bool) -> None:398# Schema is different but the projected columns are same dtype.399f1 = tmp_path / "a.parquet"400f2 = tmp_path / "b.parquet"401a = pl.DataFrame({"a": [1.0], "b": "a"})402403b = pl.DataFrame({"a": [1], "b": "a"})404405a.write_parquet(f1)406b.write_parquet(f2)407assert pl.scan_parquet([f1, f2]).select("b").collect(408engine="streaming" if streaming else "in-memory"409).columns == ["b"]410411412@pytest.mark.write_disk413def test_nested_slice_12480(tmp_path: Path) -> None:414path = tmp_path / "data.parquet"415df = 
pl.select(pl.lit(1).repeat_by(10_000).explode().cast(pl.List(pl.Int32)))416417df.write_parquet(path, use_pyarrow=True, pyarrow_options={"data_page_size": 1})418419assert pl.scan_parquet(path).slice(0, 1).collect().height == 1420421422@pytest.mark.write_disk423def test_scan_deadlock_rayon_spawn_from_async_15172(424plmonkeypatch: PlMonkeyPatch, tmp_path: Path425) -> None:426plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")427plmonkeypatch.setenv("POLARS_MAX_THREADS", "1")428path = tmp_path / "data.parquet"429430df = pl.Series("x", [1]).to_frame()431df.write_parquet(path)432433results = [pl.DataFrame()]434435def scan_collect() -> None:436results[0] = pl.collect_all([pl.scan_parquet(path)])[0]437438# Make sure we don't sit there hanging forever on the broken case439t = Thread(target=scan_collect, daemon=True)440t.start()441t.join(5)442443assert results[0].equals(df)444445446@pytest.mark.write_disk447@pytest.mark.parametrize("streaming", [True, False])448def test_parquet_schema_mismatch_panic_17067(tmp_path: Path, streaming: bool) -> None:449pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).write_parquet(tmp_path / "1.parquet")450pl.DataFrame({"c": [1, 2, 3], "d": [4, 5, 6]}).write_parquet(tmp_path / "2.parquet")451452if streaming:453with pytest.raises(pl.exceptions.SchemaError):454pl.scan_parquet(tmp_path).collect(engine="streaming")455else:456with pytest.raises(pl.exceptions.SchemaError):457pl.scan_parquet(tmp_path).collect(engine="in-memory")458459460@pytest.mark.write_disk461def test_predicate_push_down_categorical_17744(tmp_path: Path) -> None:462path = tmp_path / "1"463464df = pl.DataFrame(465data={466"n": [1, 2, 3],467"ccy": ["USD", "JPY", "EUR"],468},469schema_overrides={"ccy": pl.Categorical()},470)471df.write_parquet(path)472expect = df.head(1).with_columns(pl.col(pl.Categorical).cast(pl.String))473474lf = pl.scan_parquet(path)475476for predicate in [pl.col("ccy") == "USD", 
pl.col("ccy").is_in(["USD"])]:477assert_frame_equal(478lf.filter(predicate)479.with_columns(pl.col(pl.Categorical).cast(pl.String))480.collect(),481expect,482)483484485@pytest.mark.write_disk486@pytest.mark.parametrize("streaming", [True, False])487def test_parquet_slice_pushdown_non_zero_offset(488tmp_path: Path, streaming: bool489) -> None:490paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]491dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]492493for df, p in zip(dfs, paths, strict=True):494df.write_parquet(p)495496# Parquet files containing only the metadata - i.e. the data parts are removed.497# Used to test that a reader doesn't try to read any data.498def trim_to_metadata(path: str | Path) -> None:499path = Path(path)500v = path.read_bytes()501metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")502path.write_bytes(v[-metadata_and_footer_len:])503504trim_to_metadata(paths[0])505trim_to_metadata(paths[2])506507# Check baseline:508# * Metadata can be read without error509assert pl.read_parquet_schema(paths[0]) == dfs[0].schema510# * Attempting to read any data will error511with pytest.raises(ComputeError):512pl.scan_parquet(paths[0]).collect(513engine="streaming" if streaming else "in-memory"514)515516df = dfs[1]517assert_frame_equal(518pl.scan_parquet(paths)519.slice(1, 1)520.collect(engine="streaming" if streaming else "in-memory"),521df,522)523assert_frame_equal(524pl.scan_parquet(paths[1:])525.head(1)526.collect(engine="streaming" if streaming else "in-memory"),527df,528)529assert_frame_equal(530(531pl.scan_parquet([paths[1], paths[1], paths[1]])532.with_row_index()533.slice(1, 1)534.collect(engine="streaming" if streaming else "in-memory")535),536df.with_row_index(offset=1),537)538assert_frame_equal(539(540pl.scan_parquet([paths[1], paths[1], paths[1]])541.with_row_index(offset=1)542.slice(1, 1)543.collect(engine="streaming" if streaming else 
"in-memory")544),545df.with_row_index(offset=2),546)547assert_frame_equal(548pl.scan_parquet(paths[1:])549.head(1)550.collect(engine="streaming" if streaming else "in-memory"),551df,552)553554# Negative slice unsupported in streaming555if not streaming:556assert_frame_equal(pl.scan_parquet(paths).slice(-2, 1).collect(), df)557assert_frame_equal(pl.scan_parquet(paths[:2]).tail(1).collect(), df)558assert_frame_equal(559pl.scan_parquet(paths[1:]).slice(-99, 1).collect(), df.clear()560)561562path = tmp_path / "data"563df = pl.select(x=pl.int_range(0, 50))564df.write_parquet(path)565assert_frame_equal(pl.scan_parquet(path).slice(-100, 75).collect(), df.head(25))566assert_frame_equal(567pl.scan_parquet([path, path]).with_row_index().slice(-25, 100).collect(),568pl.concat([df, df]).with_row_index().slice(75),569)570assert_frame_equal(571pl.scan_parquet([path, path])572.with_row_index(offset=10)573.slice(-25, 100)574.collect(),575pl.concat([df, df]).with_row_index(offset=10).slice(75),576)577assert_frame_equal(578pl.scan_parquet(path).slice(-1, (1 << 32) - 1).collect(), df.tail(1)579)580581582@pytest.mark.write_disk583def test_predicate_slice_pushdown_row_index_20485(tmp_path: Path) -> None:584tmp_path.mkdir(exist_ok=True)585586file_path = tmp_path / "slice_pushdown.parquet"587row_group_size = 100000588num_row_groups = 3589590df = pl.select(ref=pl.int_range(num_row_groups * row_group_size))591df.write_parquet(file_path, row_group_size=row_group_size)592593# Use a slice that starts near the end of one row group and extends into the next594# to test handling of slices that span multiple row groups.595slice_start = 199995596slice_len = 10597ldf = pl.scan_parquet(file_path)598sliced_df = ldf.with_row_index().slice(slice_start, slice_len).collect()599sliced_df_no_pushdown = (600ldf.with_row_index()601.slice(slice_start, slice_len)602.collect(optimizations=pl.QueryOptFlags(slice_pushdown=False))603)604605expected_index = list(range(slice_start, slice_start + 
slice_len))606actual_index = list(sliced_df["index"])607assert actual_index == expected_index608609assert_frame_equal(sliced_df, sliced_df_no_pushdown)610611612@pytest.mark.write_disk613@pytest.mark.parametrize("streaming", [True, False])614def test_parquet_row_groups_shift_bug_18739(tmp_path: Path, streaming: bool) -> None:615tmp_path.mkdir(exist_ok=True)616path = tmp_path / "data.bin"617618df = pl.DataFrame({"id": range(100)})619df.write_parquet(path, row_group_size=1)620621lf = pl.scan_parquet(path)622assert_frame_equal(df, lf.collect(engine="streaming" if streaming else "in-memory"))623624625@pytest.mark.write_disk626@pytest.mark.parametrize("streaming", [True, False])627def test_dsl2ir_cached_metadata(tmp_path: Path, streaming: bool) -> None:628df = pl.DataFrame({"x": 1})629path = tmp_path / "1"630df.write_parquet(path)631632lf = pl.scan_parquet(path)633assert_frame_equal(lf.collect(), df)634635# Removes the metadata portion of the parquet file.636# Used to test that a reader doesn't try to read the metadata.637def remove_metadata(path: str | Path) -> None:638path = Path(path)639v = path.read_bytes()640metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")641path.write_bytes(v[:-metadata_and_footer_len] + b"PAR1")642643remove_metadata(path)644assert_frame_equal(lf.collect(engine="streaming" if streaming else "in-memory"), df)645646647@pytest.mark.write_disk648def test_parquet_unaligned_schema_read(tmp_path: Path) -> None:649dfs = [650pl.DataFrame({"a": 1, "b": 10}),651pl.DataFrame({"b": 11, "a": 2}),652pl.DataFrame({"x": 3, "a": 3, "y": 3, "b": 12}),653]654655paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]656657for df, path in zip(dfs, paths, strict=True):658df.write_parquet(path)659660lf = pl.scan_parquet(paths, extra_columns="ignore")661662assert_frame_equal(663lf.select("a").collect(engine="in-memory"),664pl.DataFrame({"a": [1, 2, 
3]}),665)666667assert_frame_equal(668lf.with_row_index().select("a").collect(engine="in-memory"),669pl.DataFrame({"a": [1, 2, 3]}),670)671672assert_frame_equal(673lf.select("b", "a").collect(engine="in-memory"),674pl.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}),675)676677assert_frame_equal(678pl.scan_parquet(paths[:2]).collect(engine="in-memory"),679pl.DataFrame({"a": [1, 2], "b": [10, 11]}),680)681682lf = pl.scan_parquet(paths, extra_columns="raise")683684with pytest.raises(pl.exceptions.SchemaError):685lf.collect(engine="in-memory")686687with pytest.raises(pl.exceptions.SchemaError):688lf.with_row_index().collect(engine="in-memory")689690691@pytest.mark.write_disk692@pytest.mark.parametrize("streaming", [True, False])693def test_parquet_unaligned_schema_read_dtype_mismatch(694tmp_path: Path, streaming: bool695) -> None:696dfs = [697pl.DataFrame({"a": 1, "b": 10}),698pl.DataFrame({"b": "11", "a": "2"}),699]700701paths = [tmp_path / "1", tmp_path / "2"]702703for df, path in zip(dfs, paths, strict=True):704df.write_parquet(path)705706lf = pl.scan_parquet(paths)707708with pytest.raises(pl.exceptions.SchemaError, match="data type mismatch"):709lf.collect(engine="streaming" if streaming else "in-memory")710711712@pytest.mark.write_disk713@pytest.mark.parametrize("streaming", [True, False])714def test_parquet_unaligned_schema_read_missing_cols_from_first(715tmp_path: Path, streaming: bool716) -> None:717dfs = [718pl.DataFrame({"a": 1, "b": 10}),719pl.DataFrame({"b": 11}),720]721722paths = [tmp_path / "1", tmp_path / "2"]723724for df, path in zip(dfs, paths, strict=True):725df.write_parquet(path)726727lf = pl.scan_parquet(paths)728729with pytest.raises(730(pl.exceptions.SchemaError, pl.exceptions.ColumnNotFoundError),731):732lf.collect(engine="streaming" if streaming else "in-memory")733734735@pytest.mark.parametrize("parallel", ["columns", "row_groups", "prefiltered", "none"])736@pytest.mark.parametrize("streaming", [True, False])737@pytest.mark.write_disk738def 
test_parquet_schema_arg(739tmp_path: Path,740parallel: ParallelStrategy,741streaming: bool,742) -> None:743tmp_path.mkdir(exist_ok=True)744dfs = [pl.DataFrame({"a": 1, "b": 1}), pl.DataFrame({"a": 2, "b": 2})]745paths = [tmp_path / "1", tmp_path / "2"]746747for df, path in zip(dfs, paths, strict=True):748df.write_parquet(path)749750schema: dict[str, pl.DataType] = {751"1": pl.Datetime(time_unit="ms", time_zone="CET"),752"a": pl.Int64(),753"b": pl.Int64(),754}755756# Test `schema` containing an extra column.757758lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)759760with pytest.raises((pl.exceptions.SchemaError, pl.exceptions.ColumnNotFoundError)):761lf.collect(engine="streaming" if streaming else "in-memory")762763lf = pl.scan_parquet(764paths, parallel=parallel, schema=schema, missing_columns="insert"765)766767assert_frame_equal(768lf.collect(engine="streaming" if streaming else "in-memory"),769pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),770)771772# Just one test that `read_parquet` is propagating this argument.773assert_frame_equal(774pl.read_parquet(775paths, parallel=parallel, schema=schema, missing_columns="insert"776),777pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),778)779780# Issue #19081: If a schema arg is passed, ensure its fields are propagated781# to the IR, otherwise even if `missing_columns='insert'`, downstream782# `select()`s etc. 
will fail with ColumnNotFound if the column is not in783# the first file.784lf = pl.scan_parquet(785paths, parallel=parallel, schema=schema, missing_columns="insert"786).select("1")787788s = lf.collect(engine="streaming" if streaming else "in-memory").to_series()789assert s.len() == 2790assert s.null_count() == 2791792# Test files containing extra columns not in `schema`793794schema: dict[str, type[pl.DataType]] = {"a": pl.Int64} # type: ignore[no-redef]795796for missing_columns in ["insert", "raise"]:797lf = pl.scan_parquet(798paths,799parallel=parallel,800schema=schema,801missing_columns=missing_columns, # type: ignore[arg-type]802)803804with pytest.raises(pl.exceptions.SchemaError):805lf.collect(engine="streaming" if streaming else "in-memory")806807lf = pl.scan_parquet(808paths,809parallel=parallel,810schema=schema,811extra_columns="ignore",812).select("a")813814assert_frame_equal(815lf.collect(engine="in-memory"),816pl.DataFrame({"a": [1, 2]}, schema=schema),817)818819schema: dict[str, type[pl.DataType]] = {"a": pl.Int64, "b": pl.Int8} # type: ignore[no-redef]820821lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)822823with pytest.raises(824pl.exceptions.SchemaError,825match="data type mismatch for column b: incoming: Int64 != target: Int8",826):827lf.collect(engine="streaming" if streaming else "in-memory")828829830def test_scan_parquet_empty_path_expansion(tmp_path: Path) -> None:831tmp_path.mkdir(exist_ok=True)832833with pytest.raises(834ComputeError,835match=r"failed to retrieve first file schema \(parquet\): "836r"expanded paths were empty \(path expansion input: "837".*Hint: passing a schema can allow this scan to succeed with an empty DataFrame",838):839pl.scan_parquet(tmp_path).collect()840841# Scan succeeds when schema is provided842assert_frame_equal(843pl.scan_parquet(tmp_path, schema={"x": pl.Int64}).collect(),844pl.DataFrame(schema={"x": pl.Int64}),845)846847assert_frame_equal(848pl.scan_parquet(tmp_path, schema={"x": 
pl.Int64}).with_row_index().collect(),849pl.DataFrame(schema={"x": pl.Int64}).with_row_index(),850)851852assert_frame_equal(853pl.scan_parquet(854tmp_path, schema={"x": pl.Int64}, hive_schema={"h": pl.String}855).collect(),856pl.DataFrame(schema={"x": pl.Int64, "h": pl.String}),857)858859assert_frame_equal(860(861pl.scan_parquet(862tmp_path, schema={"x": pl.Int64}, hive_schema={"h": pl.String}863)864.with_row_index()865.collect()866),867pl.DataFrame(schema={"x": pl.Int64, "h": pl.String}).with_row_index(),868)869870871@pytest.mark.parametrize("missing_columns", ["insert", "raise"])872@pytest.mark.write_disk873def test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249(874tmp_path: Path,875missing_columns: str,876) -> None:877tmp_path.mkdir(exist_ok=True)878paths = [tmp_path / "1", tmp_path / "2"]879880pl.DataFrame({"a": 1, "b": 1}, schema={"a": pl.Int32, "b": pl.UInt8}).write_parquet(881paths[0]882)883pl.DataFrame(884{"a": 1, "b": 1}, schema={"a": pl.Int32, "b": pl.UInt64}885).write_parquet(paths[1])886887assert_frame_equal(888pl.scan_parquet(paths, missing_columns=missing_columns) # type: ignore[arg-type]889.select("a")890.collect(engine="in-memory"),891pl.DataFrame({"a": [1, 1]}, schema={"a": pl.Int32}),892)893894895@pytest.mark.parametrize("streaming", [True, False])896@pytest.mark.write_disk897def test_scan_parquet_streaming_row_index_19606(898tmp_path: Path, streaming: bool899) -> None:900tmp_path.mkdir(exist_ok=True)901paths = [tmp_path / "1", tmp_path / "2"]902903dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]904905for df, p in zip(dfs, paths, strict=True):906df.write_parquet(p)907908assert_frame_equal(909pl.scan_parquet(tmp_path)910.with_row_index()911.collect(engine="streaming" if streaming else "in-memory"),912pl.DataFrame(913{"index": [0, 1], "x": [0, 1]}, schema={"index": pl.UInt32, "x": pl.Int64}914),915)916917918def test_scan_parquet_prefilter_panic_22452() -> None:919# This is, the easiest way to control the threadpool size 
so that it is stable.920out = subprocess.check_output(921[922sys.executable,923"-c",924"""\925import os926927os.environ["POLARS_MAX_THREADS"] = "2"928929import io930931import polars as pl932from polars.testing import assert_frame_equal933934assert pl.thread_pool_size() == 2935936f = io.BytesIO()937938df = pl.DataFrame({x: 1 for x in ["a", "b", "c", "d", "e"]})939df.write_parquet(f)940f.seek(0)941942assert_frame_equal(943pl.scan_parquet(f, parallel="prefiltered")944.filter(pl.col(c) == 1 for c in ["a", "b", "c"])945.collect(),946df,947)948949print("OK", end="")950""",951],952)953954assert out == b"OK"955956957@pytest.mark.slow958def test_scan_parquet_in_mem_to_streaming_dispatch_deadlock_22641() -> None:959out = subprocess.check_output(960[961sys.executable,962"-c",963"""\964import os965966os.environ["POLARS_MAX_THREADS"] = "1"967os.environ["POLARS_VERBOSE"] = "1"968969import io970import sys971from threading import Thread972973import polars as pl974975assert pl.thread_pool_size() == 1976977f = io.BytesIO()978pl.DataFrame({"x": 1}).write_parquet(f)979980q = (981pl.scan_parquet(f)982.filter(pl.sum_horizontal(pl.col("x"), pl.col("x"), pl.col("x")) >= 0)983.join(pl.scan_parquet(f), on="x", how="left")984)985986results = [987pl.DataFrame(),988pl.DataFrame(),989pl.DataFrame(),990pl.DataFrame(),991pl.DataFrame(),992]993994995def run():996# Also test just a single scan997pl.scan_parquet(f).collect()998999print("QUERY-FENCE", file=sys.stderr)10001001results[0] = q.collect()10021003print("QUERY-FENCE", file=sys.stderr)10041005results[1] = pl.concat([q, q, q]).collect().head(1)10061007print("QUERY-FENCE", file=sys.stderr)10081009results[2] = pl.collect_all([q, q, q])[0]10101011print("QUERY-FENCE", file=sys.stderr)10121013results[3] = pl.collect_all(3 * [pl.concat(3 * [q])])[0].head(1)10141015print("QUERY-FENCE", file=sys.stderr)10161017results[4] = q.collect(background=True).fetch_blocking()101810191020t = Thread(target=run, daemon=True)1021t.start()1022t.join(5)10231024assert 
[x.equals(pl.DataFrame({"x": 1})) for x in results] == [
    True,
    True,
    True,
    True,
    True,
]

print("OK", end="", file=sys.stderr)
""",
        ],
        stderr=subprocess.STDOUT,
    )

    assert out.endswith(b"OK")

    def ensure_caches_dropped(verbose_log: str) -> None:
        """Assert every cache id that was hit was also dropped, per the verbose log."""
        cache_hit_prefix = "CACHE HIT: cache id: "

        ids_hit = {
            x[len(cache_hit_prefix) :]
            for x in verbose_log.splitlines()
            if x.startswith(cache_hit_prefix)
        }

        cache_drop_prefix = "CACHE DROP: cache id: "

        ids_dropped = {
            x[len(cache_drop_prefix) :]
            for x in verbose_log.splitlines()
            if x.startswith(cache_drop_prefix)
        }

        assert ids_hit == ids_dropped

    out_str = out.decode()

    # The subprocess separates the verbose output of each query with a
    # "QUERY-FENCE" marker; check hit/drop balance per query.
    for logs in out_str.split("QUERY-FENCE"):
        ensure_caches_dropped(logs)


def test_parquet_prefiltering_inserted_column_23268() -> None:
    """Prefiltering must work when filtering on a column that only exists via
    ``missing_columns="insert"`` (it is not present in the file itself)."""
    df = pl.DataFrame({"a": [1, 2, 3, 4]}, schema={"a": pl.Int8})

    f = io.BytesIO()
    df.write_parquet(f)

    f.seek(0)
    # "b" is inserted as all-null Int16; filtering b == 3 therefore yields an
    # empty frame, but must not error during prefiltered decode.
    assert_frame_equal(
        (
            pl.scan_parquet(
                f,
                schema={"a": pl.Int8, "b": pl.Int16},
                missing_columns="insert",
            )
            .filter(pl.col("a") == 3)
            .filter(pl.col("b") == 3)
            .collect()
        ),
        pl.DataFrame(schema={"a": pl.Int8, "b": pl.Int16}),
    )


@pytest.mark.may_fail_cloud  # reason: inspects logs
def test_scan_parquet_prefilter_with_cast(
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    """Prefiltered decode must apply ``cast_options`` (UInt8 -> Int16 upcast)
    to the filter column, and skip the row group the predicate excludes."""
    f = io.BytesIO()

    df = pl.DataFrame(
        {
            "a": ["A", "B", "C", "D", "E", "F"],
            "b": pl.Series([1, 1, 1, 1, 0, 1], dtype=pl.UInt8),
        }
    )

    df.write_parquet(f, row_group_size=3)

    md = pq.read_metadata(f)

    # Sanity-check the layout: two row groups of 3 rows each.
    assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [3, 3]

    q = pl.scan_parquet(
        f,
        schema={"a": pl.String, "b": pl.Int16},
        cast_options=pl.ScanCastOptions(integer_cast="upcast"),
        include_file_paths="file_path",
    ).filter(pl.col("b") - 1 == pl.lit(-1, dtype=pl.Int16))

    # Collect with POLARS_VERBOSE so the reader logs its decode strategy.
    with plmonkeypatch.context() as cx:
        cx.setenv("POLARS_VERBOSE", "1")
        capfd.readouterr()
        out = q.collect()
        capture = capfd.readouterr().err

    assert (
        "[ParquetFileReader]: Pre-filtered decode enabled (1 live, 1 non-live)"
        in capture
    )
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 1 / 2 row groups" in capture
    )

    # Only the row with b == 0 (after upcast to Int16) survives.
    assert_frame_equal(
        out,
        pl.DataFrame(
            {
                "a": "E",
                "b": pl.Series([0], dtype=pl.Int16),
                "file_path": "in-mem",
            }
        ),
    )


def test_prefilter_with_n_rows_23790() -> None:
    """Prefiltering combined with ``n_rows`` must only consider the first
    ``n_rows`` rows, with and without row-index / file-path columns."""
    df = pl.DataFrame(
        {
            "a": ["A", "B", "C", "D", "E", "F"],
            "b": [1, 2, 3, 4, 5, 6],
        }
    )

    f = io.BytesIO()

    df.write_parquet(f, row_group_size=2)

    f.seek(0)

    md = pq.read_metadata(f)

    assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [2, 2, 2]

    f.seek(0)
    q = pl.scan_parquet(f, n_rows=3).filter(pl.col("b").is_in([1, 3]))

    # b == 5 would also match is_in only if n_rows were ignored; it must not.
    assert_frame_equal(q.collect(), pl.DataFrame({"a": ["A", "C"], "b": [1, 3]}))

    # With row index / file_path

    df = pl.DataFrame(
        {
            "a": ["A", "B", "C", "D", "E", "F"],
            "b": [1, 2, 3, 4, 5, 6],
        }
    )

    f = io.BytesIO()

    df.write_parquet(f, row_group_size=2)

    f.seek(0)
    md = pq.read_metadata(f)

    assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [2, 2, 2]

    f.seek(0)
    q = pl.scan_parquet(
        f,
        n_rows=3,
        row_index_name="index",
        include_file_paths="file_path",
    ).filter(pl.col("b").is_in([1, 3]))

    assert_frame_equal(
        q.collect(),
        pl.DataFrame(
            {
                "index": pl.Series([0, 2], dtype=pl.get_index_type()),
                "a": ["A", "C"],
                "b": [1, 3],
                "file_path": "in-mem",
            }
        ),
    )


def test_scan_parquet_filter_index_panic_23849(plmonkeypatch: PlMonkeyPatch) -> None:
    """Filtering a many-column file with a tiny per-thread decode target must
    not panic under any parallel strategy."""
    # Force very small decode work units to exercise the splitting logic.
    plmonkeypatch.setenv("POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD", "5")
    num_rows = 3
    num_cols = 5

    f = io.BytesIO()

    pl.select(
        pl.int_range(0, num_rows).alias(f"col_{i}") for i in range(num_cols)
    ).write_parquet(f)

    # The predicate matches every row; we only care that collect() succeeds.
    for parallel in ["auto", "columns", "row_groups", "prefiltered", "none"]:
        pl.scan_parquet(f, parallel=parallel).filter(  # type: ignore[arg-type]
            pl.col("col_0").ge(0) & pl.col("col_0").lt(num_rows + 1)
        ).collect()


@pytest.mark.write_disk
def test_sink_large_rows_25834(tmp_path: Path, plmonkeypatch: PlMonkeyPatch) -> None:
    """Sinking with a 1-byte ideal morsel size must still round-trip, both as a
    single file and partitioned."""
    plmonkeypatch.setenv("POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES", "1")
    df = pl.select(idx=pl.repeat(1, 20_000), bytes=pl.lit(b"AAAAA"))

    df.write_parquet(tmp_path / "single.parquet")
    assert_frame_equal(pl.scan_parquet(tmp_path / "single.parquet").collect(), df)

    md = pq.read_metadata(tmp_path / "single.parquet")
    # Expected row-group split of the 20_000 rows.
    assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [
        16384,
        3616,
    ]

    df.write_parquet(
        tmp_path / "partitioned",
        partition_by="idx",
    )
    assert_frame_equal(pl.scan_parquet(tmp_path / "partitioned").collect(), df)


def test_scan_parquet_prefilter_is_between_non_column_input_26283() -> None:
    """Prefiltering must handle an ``is_between`` whose input is a derived
    expression (``.dt.date().cast(...)``), not a bare column."""
    f = io.BytesIO()

    df = pl.DataFrame(
        {
            "timestamp": pl.datetime_range(
                start=datetime(2026, 1, 1),
                end=datetime(2026, 1, 1, 0, 5, 0),
                interval="1s",
                eager=True,
            ),
        },
        schema={"timestamp": pl.Datetime("us")},
        height=301,
    )

    df.write_parquet(f)
    f.seek(0)

    # Every timestamp is on 2026-01-01, so the filter keeps all rows.
    q = pl.scan_parquet(f).filter(
        pl.col("timestamp")
        .dt.date()
        .cast(pl.Datetime("us"))
        .is_between(datetime(2026, 1, 1), datetime(2026, 1, 1))
    )

    assert_frame_equal(q.collect(), df)


def test_sink_parquet_arrow_schema() -> None:
    """``sink_parquet(arrow_schema=...)`` must propagate field/schema/footer
    metadata and reject schemas that do not match the output."""
    df = pl.DataFrame({"x": [0, 1, None]})

    f = io.BytesIO()
    df.lazy().sink_parquet(
        f,
        arrow_schema=pa.schema(
            [
                pa.field(
                    "x",
                    pa.int64(),
                    metadata={"custom_field_md_key": "custom_field_md_value"},
                )
            ],
        ),
    )

    f.seek(0)

    # Field-level metadata round-trips through the parquet schema.
    assert (
        pq.read_schema(f).field("x").metadata[b"custom_field_md_key"]
        == b"custom_field_md_value"
    )

    f = io.BytesIO()

    df.lazy().sink_parquet(
        f,
        arrow_schema=pa.schema(
            [pa.field("x", pa.int64())],
            metadata={"custom_schema_md_key": "custom_schema_md_value"},
        ),
        metadata={"custom_footer_md_key": "custom_footer_md_value"},
    )

    f.seek(0)

    # Schema-level metadata is visible via pq.read_schema ...
    assert pq.read_schema(f).metadata == {
        b"custom_schema_md_key": b"custom_schema_md_value"
    }
    # ... footer metadata via pq.read_metadata and polars' own reader ...
    assert (
        pq.read_metadata(f).metadata[b"custom_footer_md_key"]
        == b"custom_footer_md_value"
    )
    assert (
        pl.read_parquet_metadata(f)["custom_footer_md_key"] == "custom_footer_md_value"
    )
    # ... and the embedded base64 ARROW:schema carries the schema metadata too.
    assert pa.ipc.read_schema(
        pa.BufferReader(base64.b64decode(pq.read_metadata(f).metadata[b"ARROW:schema"]))
    ).metadata == {b"custom_schema_md_key": b"custom_schema_md_value"}

    # Mismatched dtype is rejected.
    with pytest.raises(
        SchemaError,
        match=r"provided dtype \(Int32\) does not match output dtype \(Int64\)",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("x", pa.int32())],
            ),
        )

    # Declaring non-nullable while the data contains a null is rejected.
    with pytest.raises(
        SchemaError,
        match="nullable is false but array contained 1 NULL",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("x", pa.int64(), nullable=False)],
            ),
        )

    # Wrong field name is rejected.
    with pytest.raises(
        SchemaError,
        match=r"schema names in arrow_schema differ",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("z", pa.int64())],
            ),
        )

    # Too few fields is rejected.
    with pytest.raises(
        SchemaError,
        match="schema names in arrow_schema differ",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema([]),
        )

    # Too many fields is rejected.
    with pytest.raises(
        SchemaError,
        match="schema names in arrow_schema differ",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "x",
                        pa.int64(),
                    ),
                    pa.field(
                        "y",
                        pa.int64(),
                    ),
                ],
            ),
        )


def test_sink_parquet_arrow_schema_logical_types() -> None:
    """``arrow_schema`` validation for logical types: categorical, datetime and
    extension columns must match the expected physical arrow type exactly."""
    from tests.unit.datatypes.test_extension import PythonTestExtension

    df = pl.DataFrame(
        {
            "categorical": pl.Series(
                ["A"], dtype=pl.Categorical(pl.Categories.random())
            ),
            "datetime": pl.Series([datetime(2026, 1, 1)], dtype=pl.Datetime("ns")),
            "extension[str]": pl.Series(["A"], dtype=PythonTestExtension(pl.String)),
        }
    )

    # Categorical must be declared as dictionary(uint32, large_string).
    with pytest.raises(SchemaError, match=r"Dictionary\(UInt32, LargeUtf8, false\)"):
        df.select("categorical").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("categorical", pa.null())],
            ),
        )

    df.select("categorical").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [pa.field("categorical", pa.dictionary(pa.uint32(), pa.large_string()))],
        ),
    )

    # Datetime("ns") must be declared as timestamp("ns").
    with pytest.raises(SchemaError, match=r"Timestamp\(Nanosecond, None\)"):
        df.select("datetime").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("datetime", pa.null())],
            ),
        )

    df.select("datetime").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [pa.field("datetime", pa.timestamp("ns"))],
        ),
    )

    def build_pyarrow_extension_type(name: str) -> Any:
        """Build a pyarrow extension type over large_string with the given name."""

        class PythonTestExtensionPyarrow(pa.ExtensionType):  # type: ignore[misc]
            def __init__(self, data_type: pa.DataType) -> None:
                super().__init__(data_type, name)

            def __arrow_ext_serialize__(self) -> bytes:
                return b""

            @classmethod
            def __arrow_ext_deserialize__(
                cls, storage_type: Any, serialized: Any
            ) -> Any:
                return PythonTestExtensionPyarrow(storage_type[0].type)

        return PythonTestExtensionPyarrow(pa.large_string())

    # Extension type name must match the polars extension's registered name.
    with pytest.raises(
        SchemaError,
        match=r'Extension\(ExtensionType { name: "testing.python_test_extension", inner: LargeUtf8, metadata: None }\)',
    ):
        df.select("extension[str]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("extension[str]", build_pyarrow_extension_type("name"))],
            ),
        )

    df.select("extension[str]").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [
                pa.field(
                    "extension[str]",
                    build_pyarrow_extension_type("testing.python_test_extension"),
                )
            ],
        ),
    )


def test_sink_parquet_arrow_schema_nested_types() -> None:
    """``arrow_schema`` validation for nested types: struct field sets,
    nested nullability and fixed-size-list widths must match exactly."""
    df = pl.DataFrame(
        {
            "list[struct{a:int64}]": pl.Series(
                [[{"a": 1}, {"a": None}]], dtype=pl.List(pl.Struct({"a": pl.Int64}))
            ),
            "array[int64, 2]": pl.Series([[0, None]], dtype=pl.Array(pl.Int64, 2)),
        }
    )

    # Struct with missing fields is rejected.
    with pytest.raises(SchemaError, match="struct dtype mismatch"):
        df.select("list[struct{a:int64}]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "list[struct{a:int64}]",
                        pa.large_list(pa.struct([])),
                    )
                ],
            ),
        )

    # Struct with extra fields is rejected.
    with pytest.raises(SchemaError, match="struct dtype mismatch"):
        df.select("list[struct{a:int64}]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "list[struct{a:int64}]",
                        pa.large_list(
                            pa.struct(
                                [pa.field("a", pa.int64()), pa.field("b", pa.int64())]
                            )
                        ),
                    )
                ],
            ),
        )

    # Non-nullable inner field with a null value is rejected.
    with pytest.raises(
        SchemaError,
        match="nullable is false but array contained 1 NULL",
    ):
        df.select("list[struct{a:int64}]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "list[struct{a:int64}]",
                        pa.large_list(
                            pa.struct([pa.field("a", pa.int64(), nullable=False)])
                        ),
                    )
                ],
            ),
        )

    # Exact match succeeds.
    df.select("list[struct{a:int64}]").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [
                pa.field(
                    "list[struct{a:int64}]",
                    pa.large_list(pa.struct([pa.field("a", pa.int64())])),
                )
            ],
        ),
    )

    # Wrong fixed-size-list width is rejected.
    with pytest.raises(SchemaError, match="fixed-size list dtype mismatch:"):
        df.select("array[int64, 2]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "array[int64, 2]",
                        pa.list_(pa.int64(), 0),
                    )
                ],
            ),
        )

    # Matching width succeeds.
    df.select("array[int64, 2]").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [
                pa.field(
                    "array[int64, 2]",
                    pa.list_(pa.int64(), 2),
                )
            ],
        ),
    )


def test_sink_parquet_writes_strings_as_largeutf8_by_default() -> None:
    """By default the sink writes strings/binary as LargeUtf8/LargeBinary, so
    view types in ``arrow_schema`` are rejected and the large types round-trip."""
    df = pl.DataFrame({"string": "A", "binary": [b"A"]})

    with pytest.raises(
        SchemaError,
        match=r"provided dtype \(Utf8View\) does not match output dtype \(LargeUtf8\)",
    ):
        df.lazy().select("string").sink_parquet(
            io.BytesIO(), arrow_schema=pa.schema([pa.field("string", pa.string_view())])
        )

    with pytest.raises(
        SchemaError,
        match=r"provided dtype \(BinaryView\) does not match output dtype \(LargeBinary\)",
    ):
        df.lazy().select("binary").sink_parquet(
            io.BytesIO(), arrow_schema=pa.schema([pa.field("binary", pa.binary_view())])
        )

    f = io.BytesIO()

    arrow_schema = pa.schema(
        [
            pa.field("string", pa.large_string()),
            pa.field("binary", pa.large_binary()),
        ]
    )

    df.lazy().sink_parquet(f, arrow_schema=arrow_schema)

    f.seek(0)

    assert pq.read_schema(f) == arrow_schema

    f.seek(0)

    assert_frame_equal(pl.scan_parquet(f).collect(), df)


def test_sink_parquet_pyarrow_filter_string_type_26435() -> None:
    """Files written by polars must be filterable by pyarrow on both integer
    and string columns."""
    df = pl.DataFrame({"string": ["A", None, "B"], "int": [0, 1, 2]})

    f = io.BytesIO()

    df.write_parquet(f)

    f.seek(0)

    assert_frame_equal(
        pl.DataFrame(pq.read_table(f, filters=[("int", "=", 0)])),
        pl.DataFrame({"string": "A", "int": 0}),
    )

    f.seek(0)

    assert_frame_equal(
        pl.DataFrame(pq.read_table(f, filters=[("string", "=", "A")])),
        pl.DataFrame({"string": "A", "int": 0}),
    )


def test_scan_parquet_temporal_lit_comparison_skip_batch_24095_25731(
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    """Row-group skipping for comparisons of nanosecond temporal columns
    against (microsecond-precision) datetime literals must be exact."""
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")

    df = pl.DataFrame(
        {
            "datetime[ns]": pl.Series(
                [
                    "2025-12-31 23:59:59.999999999",
                    "2025-12-31 23:59:59.999999999",
                    "2026-01-01 00:00:00.000000000",
                    "2026-01-01 00:00:00.000000999",
                    "2026-01-01 00:00:00.000001000",
                    "2026-01-01 00:00:00.000001999",
                    "2026-01-01 00:00:00.000002000",
                    "2026-01-01 00:00:00.000999999",
                    "2026-01-01 00:00:00.001000000",
                    "2026-01-01 00:00:00.001000000",
                ]
            ).str.strptime(dtype=pl.Datetime("ns"), format="%Y-%m-%d %H:%M:%S%.f"),
            "duration[ns]": pl.Series(
                [-1, -1, 0, 999, 1000, 1999, 2000, -1, -1, -1]
            ).cast(pl.Duration("ns")),
        }
    )

    f = io.BytesIO()
    # 10 rows / row_group_size=2 -> 5 row groups of 2 rows each.
    df.write_parquet(f, row_group_size=2)

    # Equality against a us-precision literal: only the row group covering the
    # sub-microsecond range [00:00:00.000000000, 00:00:00.000000999] matches.
    q = pl.scan_parquet(f).filter(
        pl.col("datetime[ns]") == pl.lit(datetime(2026, 1, 1))
    )

    capfd.readouterr()
    out = q.collect()
    capture = capfd.readouterr().err

    assert "reading 1 / 5 row groups" in capture

    assert_frame_equal(
        out.select(pl.col("datetime[ns]").dt.to_string()),
        pl.DataFrame(
            {
                "datetime[ns]": pl.Series(
                    [
                        "2026-01-01 00:00:00.000000000",
                        "2026-01-01 00:00:00.000000999",
                    ]
                )
            }
        ),
    )

    # is_between over two adjacent microseconds: two row groups remain live.
    q = pl.scan_parquet(f).filter(
        pl.col("datetime[ns]").is_between(
            pl.lit(datetime(2026, 1, 1)),
            pl.lit(datetime(2026, 1, 1, microsecond=1)),
        )
    )

    capfd.readouterr()
    out = q.collect()
    capture = capfd.readouterr().err

    assert "reading 2 / 5 row groups" in capture

    assert_frame_equal(
        out.select(pl.col("datetime[ns]").dt.to_string()),
        pl.DataFrame(
            {
                "datetime[ns]": pl.Series(
                    [
                        "2026-01-01 00:00:00.000000000",
                        "2026-01-01 00:00:00.000000999",
                        "2026-01-01 00:00:00.000001000",
                        "2026-01-01 00:00:00.000001999",
                    ]
                )
            }
        ),
    )

    capfd.readouterr()

    # NOTE(review): this query is identical to the previous is_between query;
    # the difference is that the assertion below inspects the duration[ns]
    # column of the filtered result. Presumably intentional (GH#25731 concerns
    # durations) — confirm against the referenced issues.
    q = pl.scan_parquet(f).filter(
        pl.col("datetime[ns]").is_between(
            pl.lit(datetime(2026, 1, 1)),
            pl.lit(datetime(2026, 1, 1, microsecond=1)),
        )
    )

    capfd.readouterr()

    out = q.collect()
    capture = capfd.readouterr().err

    assert "reading 2 / 5 row groups" in capture

    assert_frame_equal(
        out.select(pl.col("duration[ns]").dt.to_string()),
        pl.DataFrame(
            {
                "duration[ns]": pl.Series(
                    [
                        "PT0S",
                        "PT0.000000999S",
                        "PT0.000001S",
                        "PT0.000001999S",
                    ]
                )
            }
        ),
    )

    capfd.readouterr()