Path: blob/main/py-polars/tests/unit/io/test_lazy_parquet.py
6939 views
from __future__ import annotations12import io3import subprocess4import sys5from collections import OrderedDict6from pathlib import Path7from threading import Thread8from typing import TYPE_CHECKING, Any910import pandas as pd11import pyarrow.parquet as pq12import pytest1314import polars as pl15from polars.exceptions import ComputeError16from polars.testing import assert_frame_equal1718if TYPE_CHECKING:19from polars._typing import ParallelStrategy202122@pytest.fixture23def parquet_file_path(io_files_path: Path) -> Path:24return io_files_path / "small.parquet"252627@pytest.fixture28def foods_parquet_path(io_files_path: Path) -> Path:29return io_files_path / "foods1.parquet"303132def test_scan_parquet(parquet_file_path: Path) -> None:33df = pl.scan_parquet(parquet_file_path)34assert df.collect().shape == (4, 3)353637def test_scan_parquet_local_with_async(38monkeypatch: Any, foods_parquet_path: Path39) -> None:40monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")41pl.scan_parquet(foods_parquet_path.relative_to(Path.cwd())).head(1).collect()424344def test_row_index(foods_parquet_path: Path) -> None:45df = pl.read_parquet(foods_parquet_path, row_index_name="row_index")46assert df["row_index"].to_list() == list(range(27))4748df = (49pl.scan_parquet(foods_parquet_path, row_index_name="row_index")50.filter(pl.col("category") == pl.lit("vegetables"))51.collect()52)5354assert df["row_index"].to_list() == [0, 6, 11, 13, 14, 20, 25]5556df = (57pl.scan_parquet(foods_parquet_path, row_index_name="row_index")58.with_row_index("foo", 10)59.filter(pl.col("category") == pl.lit("vegetables"))60.collect()61)6263assert df["foo"].to_list() == [10, 16, 21, 23, 24, 30, 35]646566def test_row_index_len_16543(foods_parquet_path: Path) -> None:67q = pl.scan_parquet(foods_parquet_path).with_row_index()68assert q.select(pl.all()).select(pl.len()).collect().item() == 27697071@pytest.mark.write_disk72def test_categorical_parquet_statistics(tmp_path: Path) -> None:73tmp_path.mkdir(exist_ok=True)7475df = pl.DataFrame(76{77"book": [78"bookA",79"bookA",80"bookB",81"bookA",82"bookA",83"bookC",84"bookC",85"bookC",86],87"transaction_id": [1, 2, 3, 4, 5, 6, 7, 8],88"user": ["bob", "bob", "bob", "tim", "lucy", "lucy", "lucy", "lucy"],89}90).with_columns(pl.col("book").cast(pl.Categorical))9192file_path = tmp_path / "books.parquet"93df.write_parquet(file_path, statistics=True)9495parallel_options: list[ParallelStrategy] = [96"auto",97"columns",98"row_groups",99"none",100]101for par in parallel_options:102df = (103pl.scan_parquet(file_path, parallel=par)104.filter(pl.col("book") == "bookA")105.collect()106)107assert df.shape == (4, 3)108109110@pytest.mark.write_disk111def test_parquet_eq_stats(tmp_path: Path) -> None:112tmp_path.mkdir(exist_ok=True)113114file_path = tmp_path / "stats.parquet"115116df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})117df1.to_parquet(file_path, engine="pyarrow")118df = pl.scan_parquet(file_path).filter(pl.col("a") == 4).collect()119assert df["a"].to_list() == [4.0, 4.0]120121assert (122pl.scan_parquet(file_path).filter(pl.col("a") == 2).select(pl.col("a").sum())123).collect()[0, "a"] == 2.0124125assert pl.scan_parquet(file_path).filter(pl.col("a") == 5).collect().shape == (1262,1271,128)129130131@pytest.mark.write_disk132def test_parquet_is_in_stats(tmp_path: Path) -> None:133tmp_path.mkdir(exist_ok=True)134135file_path = tmp_path / "stats.parquet"136137df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})138df1.to_parquet(file_path, engine="pyarrow")139df = pl.scan_parquet(file_path).filter(pl.col("a").is_in([5])).collect()140assert df["a"].to_list() == [5.0, 5.0]141142assert (143pl.scan_parquet(file_path)144.filter(pl.col("a").is_in([5]))145.select(pl.col("a").sum())146).collect()[0, "a"] == 10.0147148assert (149pl.scan_parquet(file_path)150.filter(pl.col("a").is_in([1, 2, 3]))151.select(pl.col("a").sum())152).collect()[0, "a"] == 9.0153154assert (155pl.scan_parquet(file_path)156.filter(pl.col("a").is_in([1, 2, 3]))157.select(pl.col("a").sum())158).collect()[0, "a"] == 9.0159160assert (161pl.scan_parquet(file_path)162.filter(pl.col("a").is_in([5]))163.select(pl.col("a").sum())164).collect()[0, "a"] == 10.0165166assert pl.scan_parquet(file_path).filter(167pl.col("a").is_in([1, 2, 3, 4, 5])168).collect().shape == (8, 1)169170171@pytest.mark.write_disk172def test_parquet_stats(tmp_path: Path) -> None:173tmp_path.mkdir(exist_ok=True)174175file_path = tmp_path / "binary_stats.parquet"176177df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})178df1.to_parquet(file_path, engine="pyarrow")179df = (180pl.scan_parquet(file_path)181.filter(pl.col("a").is_not_null() & (pl.col("a") > 4))182.collect()183)184assert df["a"].to_list() == [5.0, 5.0]185186assert (187pl.scan_parquet(file_path).filter(pl.col("a") > 4).select(pl.col("a").sum())188).collect()[0, "a"] == 10.0189190assert (191pl.scan_parquet(file_path).filter(pl.col("a") < 4).select(pl.col("a").sum())192).collect()[0, "a"] == 9.0193194assert (195pl.scan_parquet(file_path).filter(pl.col("a") < 4).select(pl.col("a").sum())196).collect()[0, "a"] == 9.0197198assert (199pl.scan_parquet(file_path).filter(pl.col("a") > 4).select(pl.col("a").sum())200).collect()[0, "a"] == 10.0201assert pl.scan_parquet(file_path).filter(202(pl.col("a") * 10) > 5.0203).collect().shape == (8, 1)204205206def test_row_index_schema_parquet(parquet_file_path: Path) -> None:207assert (208pl.scan_parquet(str(parquet_file_path), row_index_name="id")209.select(["id", "b"])210.collect()211).dtypes == [pl.UInt32, pl.String]212213214@pytest.mark.may_fail_cloud # reason: inspects logs215@pytest.mark.write_disk216def test_parquet_is_in_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:217tmp_path.mkdir(exist_ok=True)218219monkeypatch.setenv("POLARS_VERBOSE", "1")220221df = pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).with_columns(222(pl.col("idx") // 25).alias("part")223)224df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)225assert df.n_chunks("all") == [4, 4]226227file_path = tmp_path / "stats.parquet"228df.write_parquet(file_path, statistics=True, use_pyarrow=False)229230file_path = tmp_path / "stats.parquet"231df.write_parquet(file_path, statistics=True, use_pyarrow=False)232233for pred in [234pl.col("idx").is_in([150, 200, 300]),235pl.col("idx").is_in([5, 250, 350]),236]:237result = pl.scan_parquet(file_path).filter(pred).collect()238assert_frame_equal(result, df.filter(pred))239240captured = capfd.readouterr().err241assert "Predicate pushdown: reading 1 / 1 row groups" in captured242assert "Predicate pushdown: reading 0 / 1 row groups" in captured243244245@pytest.mark.may_fail_cloud # reason: inspects logs246@pytest.mark.write_disk247def test_parquet_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:248tmp_path.mkdir(exist_ok=True)249250monkeypatch.setenv("POLARS_VERBOSE", "1")251252df = pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).with_columns(253(pl.col("idx") // 25).alias("part")254)255df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)256assert df.n_chunks("all") == [4, 4]257258file_path = tmp_path / "stats.parquet"259df.write_parquet(file_path, statistics=True, use_pyarrow=False, row_group_size=50)260261for pred in [262pl.col("idx") < 50,263pl.col("idx") > 50,264pl.col("idx").null_count() != 0,265pl.col("idx").null_count() == 0,266pl.col("idx").min() == pl.col("part").null_count(),267]:268result = pl.scan_parquet(file_path).filter(pred).collect()269assert_frame_equal(result, df.filter(pred))270271captured = capfd.readouterr().err272273assert "Predicate pushdown: reading 1 / 2 row groups" in captured274275276@pytest.mark.write_disk277def test_categorical(tmp_path: Path) -> None:278tmp_path.mkdir(exist_ok=True)279280df = pl.DataFrame(281[282pl.Series("name", ["Bob", "Alice", "Bob"], pl.Categorical),283pl.Series("amount", [100, 200, 300]),284]285)286287file_path = tmp_path / "categorical.parquet"288df.write_parquet(file_path)289290result = (291pl.scan_parquet(file_path)292.group_by("name")293.agg(pl.col("amount").sum())294.collect()295.sort("name")296)297expected = pl.DataFrame(298{"name": ["Alice", "Bob"], "amount": [200, 400]},299schema_overrides={"name": pl.Categorical},300)301assert_frame_equal(result, expected)302303304def test_glob_n_rows(io_files_path: Path) -> None:305file_path = io_files_path / "foods*.parquet"306df = pl.scan_parquet(file_path, n_rows=40).collect()307308# 27 rows from foods1.parquet and 13 from foods2.parquet309assert df.shape == (40, 4)310311# take first and last rows312assert df[[0, 39]].to_dict(as_series=False) == {313"category": ["vegetables", "seafood"],314"calories": [45, 146],315"fats_g": [0.5, 6.0],316"sugars_g": [2, 2],317}318319320@pytest.mark.write_disk321def test_parquet_statistics_filter_9925(tmp_path: Path) -> None:322tmp_path.mkdir(exist_ok=True)323file_path = tmp_path / "codes.parquet"324df = pl.DataFrame({"code": [300964, 300972, 500_000, 26]})325df.write_parquet(file_path, statistics=True)326327q = pl.scan_parquet(file_path).filter(328(pl.col("code").floordiv(100_000)).is_in([0, 3])329)330assert q.collect().to_dict(as_series=False) == {"code": [300964, 300972, 26]}331332333@pytest.mark.write_disk334def test_parquet_statistics_filter_11069(tmp_path: Path) -> None:335tmp_path.mkdir(exist_ok=True)336file_path = tmp_path / "foo.parquet"337pl.DataFrame({"x": [1, None]}).write_parquet(file_path, statistics=False)338339result = pl.scan_parquet(file_path).filter(pl.col("x").is_null()).collect()340expected = {"x": [None]}341assert result.to_dict(as_series=False) == expected342343344def test_parquet_list_arg(io_files_path: Path) -> None:345first = io_files_path / "foods1.parquet"346second = io_files_path / "foods2.parquet"347348df = pl.scan_parquet(source=[first, second]).collect()349assert df.shape == (54, 4)350assert df.row(-1) == ("seafood", 194, 12.0, 1)351assert df.row(0) == ("vegetables", 45, 0.5, 2)352353354@pytest.mark.write_disk355def test_parquet_many_row_groups_12297(tmp_path: Path) -> None:356tmp_path.mkdir(exist_ok=True)357file_path = tmp_path / "foo.parquet"358df = pl.DataFrame({"x": range(100)})359df.write_parquet(file_path, row_group_size=5, use_pyarrow=True)360assert_frame_equal(pl.scan_parquet(file_path).collect(), df)361362363@pytest.mark.write_disk364def test_row_index_empty_file(tmp_path: Path) -> None:365tmp_path.mkdir(exist_ok=True)366file_path = tmp_path / "test.parquet"367df = pl.DataFrame({"a": []}, schema={"a": pl.Float32})368df.write_parquet(file_path)369result = pl.scan_parquet(file_path).with_row_index("idx").collect()370assert result.schema == OrderedDict([("idx", pl.UInt32), ("a", pl.Float32)])371372373@pytest.mark.write_disk374def test_io_struct_async_12500(tmp_path: Path) -> None:375file_path = tmp_path / "test.parquet"376pl.DataFrame(377[378pl.Series("c1", [{"a": "foo", "b": "bar"}], dtype=pl.Struct),379pl.Series("c2", [18]),380]381).write_parquet(file_path)382assert pl.scan_parquet(file_path).select("c1").collect().to_dict(383as_series=False384) == {"c1": [{"a": "foo", "b": "bar"}]}385386387@pytest.mark.write_disk388@pytest.mark.parametrize("streaming", [True, False])389def test_parquet_different_schema(tmp_path: Path, streaming: bool) -> None:390# Schema is different but the projected columns are same dtype.391f1 = tmp_path / "a.parquet"392f2 = tmp_path / "b.parquet"393a = pl.DataFrame({"a": [1.0], "b": "a"})394395b = pl.DataFrame({"a": [1], "b": "a"})396397a.write_parquet(f1)398b.write_parquet(f2)399assert pl.scan_parquet([f1, f2]).select("b").collect(400engine="streaming" if streaming else "in-memory"401).columns == ["b"]402403404@pytest.mark.write_disk405def test_nested_slice_12480(tmp_path: Path) -> None:406path = tmp_path / "data.parquet"407df = pl.select(pl.lit(1).repeat_by(10_000).explode().cast(pl.List(pl.Int32)))408409df.write_parquet(path, use_pyarrow=True, pyarrow_options={"data_page_size": 1})410411assert pl.scan_parquet(path).slice(0, 1).collect().height == 1412413414@pytest.mark.write_disk415def test_scan_deadlock_rayon_spawn_from_async_15172(416monkeypatch: Any, tmp_path: Path417) -> None:418monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")419monkeypatch.setenv("POLARS_MAX_THREADS", "1")420path = tmp_path / "data.parquet"421422df = pl.Series("x", [1]).to_frame()423df.write_parquet(path)424425results = [pl.DataFrame()]426427def scan_collect() -> None:428results[0] = pl.collect_all([pl.scan_parquet(path)])[0]429430# Make sure we don't sit there hanging forever on the broken case431t = Thread(target=scan_collect, daemon=True)432t.start()433t.join(5)434435assert results[0].equals(df)436437438@pytest.mark.write_disk439@pytest.mark.parametrize("streaming", [True, False])440def test_parquet_schema_mismatch_panic_17067(tmp_path: Path, streaming: bool) -> None:441pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).write_parquet(tmp_path / "1.parquet")442pl.DataFrame({"c": [1, 2, 3], "d": [4, 5, 6]}).write_parquet(tmp_path / "2.parquet")443444if streaming:445with pytest.raises(pl.exceptions.SchemaError):446pl.scan_parquet(tmp_path).collect(engine="streaming")447else:448with pytest.raises(pl.exceptions.SchemaError):449pl.scan_parquet(tmp_path).collect(engine="in-memory")450451452@pytest.mark.write_disk453def test_predicate_push_down_categorical_17744(tmp_path: Path) -> None:454path = tmp_path / "1"455456df = pl.DataFrame(457data={458"n": [1, 2, 3],459"ccy": ["USD", "JPY", "EUR"],460},461schema_overrides={"ccy": pl.Categorical("lexical")},462)463df.write_parquet(path)464expect = df.head(1).with_columns(pl.col(pl.Categorical).cast(pl.String))465466lf = pl.scan_parquet(path)467468for predicate in [pl.col("ccy") == "USD", pl.col("ccy").is_in(["USD"])]:469assert_frame_equal(470lf.filter(predicate)471.with_columns(pl.col(pl.Categorical).cast(pl.String))472.collect(),473expect,474)475476477@pytest.mark.write_disk478@pytest.mark.parametrize("streaming", [True, False])479def test_parquet_slice_pushdown_non_zero_offset(480tmp_path: Path, streaming: bool481) -> None:482paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]483dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]484485for df, p in zip(dfs, paths):486df.write_parquet(p)487488# Parquet files containing only the metadata - i.e. the data parts are removed.489# Used to test that a reader doesn't try to read any data.490def trim_to_metadata(path: str | Path) -> None:491path = Path(path)492v = path.read_bytes()493metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")494path.write_bytes(v[-metadata_and_footer_len:])495496trim_to_metadata(paths[0])497trim_to_metadata(paths[2])498499# Check baseline:500# * Metadata can be read without error501assert pl.read_parquet_schema(paths[0]) == dfs[0].schema502# * Attempting to read any data will error503with pytest.raises(ComputeError):504pl.scan_parquet(paths[0]).collect(505engine="streaming" if streaming else "in-memory"506)507508df = dfs[1]509assert_frame_equal(510pl.scan_parquet(paths)511.slice(1, 1)512.collect(engine="streaming" if streaming else "in-memory"),513df,514)515assert_frame_equal(516pl.scan_parquet(paths[1:])517.head(1)518.collect(engine="streaming" if streaming else "in-memory"),519df,520)521assert_frame_equal(522(523pl.scan_parquet([paths[1], paths[1], paths[1]])524.with_row_index()525.slice(1, 1)526.collect(engine="streaming" if streaming else "in-memory")527),528df.with_row_index(offset=1),529)530assert_frame_equal(531(532pl.scan_parquet([paths[1], paths[1], paths[1]])533.with_row_index(offset=1)534.slice(1, 1)535.collect(engine="streaming" if streaming else "in-memory")536),537df.with_row_index(offset=2),538)539assert_frame_equal(540pl.scan_parquet(paths[1:])541.head(1)542.collect(engine="streaming" if streaming else "in-memory"),543df,544)545546# Negative slice unsupported in streaming547if not streaming:548assert_frame_equal(pl.scan_parquet(paths).slice(-2, 1).collect(), df)549assert_frame_equal(pl.scan_parquet(paths[:2]).tail(1).collect(), df)550assert_frame_equal(551pl.scan_parquet(paths[1:]).slice(-99, 1).collect(), df.clear()552)553554path = tmp_path / "data"555df = pl.select(x=pl.int_range(0, 50))556df.write_parquet(path)557assert_frame_equal(pl.scan_parquet(path).slice(-100, 75).collect(), df.head(25))558assert_frame_equal(559pl.scan_parquet([path, path]).with_row_index().slice(-25, 100).collect(),560pl.concat([df, df]).with_row_index().slice(75),561)562assert_frame_equal(563pl.scan_parquet([path, path])564.with_row_index(offset=10)565.slice(-25, 100)566.collect(),567pl.concat([df, df]).with_row_index(offset=10).slice(75),568)569assert_frame_equal(570pl.scan_parquet(path).slice(-1, (1 << 32) - 1).collect(), df.tail(1)571)572573574@pytest.mark.write_disk575def test_predicate_slice_pushdown_row_index_20485(tmp_path: Path) -> None:576tmp_path.mkdir(exist_ok=True)577578file_path = tmp_path / "slice_pushdown.parquet"579row_group_size = 100000580num_row_groups = 3581582df = pl.select(ref=pl.int_range(num_row_groups * row_group_size))583df.write_parquet(file_path, row_group_size=row_group_size)584585# Use a slice that starts near the end of one row group and extends into the next586# to test handling of slices that span multiple row groups.587slice_start = 199995588slice_len = 10589ldf = pl.scan_parquet(file_path)590sliced_df = ldf.with_row_index().slice(slice_start, slice_len).collect()591sliced_df_no_pushdown = (592ldf.with_row_index()593.slice(slice_start, slice_len)594.collect(optimizations=pl.QueryOptFlags(slice_pushdown=False))595)596597expected_index = list(range(slice_start, slice_start + slice_len))598actual_index = list(sliced_df["index"])599assert actual_index == expected_index600601assert_frame_equal(sliced_df, sliced_df_no_pushdown)602603604@pytest.mark.write_disk605@pytest.mark.parametrize("streaming", [True, False])606def test_parquet_row_groups_shift_bug_18739(tmp_path: Path, streaming: bool) -> None:607tmp_path.mkdir(exist_ok=True)608path = tmp_path / "data.bin"609610df = pl.DataFrame({"id": range(100)})611df.write_parquet(path, row_group_size=1)612613lf = pl.scan_parquet(path)614assert_frame_equal(df, lf.collect(engine="streaming" if streaming else "in-memory"))615616617@pytest.mark.write_disk618@pytest.mark.parametrize("streaming", [True, False])619def test_dsl2ir_cached_metadata(tmp_path: Path, streaming: bool) -> None:620df = pl.DataFrame({"x": 1})621path = tmp_path / "1"622df.write_parquet(path)623624lf = pl.scan_parquet(path)625assert_frame_equal(lf.collect(), df)626627# Removes the metadata portion of the parquet file.628# Used to test that a reader doesn't try to read the metadata.629def remove_metadata(path: str | Path) -> None:630path = Path(path)631v = path.read_bytes()632metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")633path.write_bytes(v[:-metadata_and_footer_len] + b"PAR1")634635remove_metadata(path)636assert_frame_equal(lf.collect(engine="streaming" if streaming else "in-memory"), df)637638639@pytest.mark.write_disk640def test_parquet_unaligned_schema_read(tmp_path: Path) -> None:641dfs = [642pl.DataFrame({"a": 1, "b": 10}),643pl.DataFrame({"b": 11, "a": 2}),644pl.DataFrame({"x": 3, "a": 3, "y": 3, "b": 12}),645]646647paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]648649for df, path in zip(dfs, paths):650df.write_parquet(path)651652lf = pl.scan_parquet(paths, extra_columns="ignore")653654assert_frame_equal(655lf.select("a").collect(engine="in-memory"),656pl.DataFrame({"a": [1, 2, 3]}),657)658659assert_frame_equal(660lf.with_row_index().select("a").collect(engine="in-memory"),661pl.DataFrame({"a": [1, 2, 3]}),662)663664assert_frame_equal(665lf.select("b", "a").collect(engine="in-memory"),666pl.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}),667)668669assert_frame_equal(670pl.scan_parquet(paths[:2]).collect(engine="in-memory"),671pl.DataFrame({"a": [1, 2], "b": [10, 11]}),672)673674lf = pl.scan_parquet(paths, extra_columns="raise")675676with pytest.raises(pl.exceptions.SchemaError):677lf.collect(engine="in-memory")678679with pytest.raises(pl.exceptions.SchemaError):680lf.with_row_index().collect(engine="in-memory")681682683@pytest.mark.write_disk684@pytest.mark.parametrize("streaming", [True, False])685def test_parquet_unaligned_schema_read_dtype_mismatch(686tmp_path: Path, streaming: bool687) -> None:688dfs = [689pl.DataFrame({"a": 1, "b": 10}),690pl.DataFrame({"b": "11", "a": "2"}),691]692693paths = [tmp_path / "1", tmp_path / "2"]694695for df, path in zip(dfs, paths):696df.write_parquet(path)697698lf = pl.scan_parquet(paths)699700with pytest.raises(pl.exceptions.SchemaError, match="data type mismatch"):701lf.collect(engine="streaming" if streaming else "in-memory")702703704@pytest.mark.write_disk705@pytest.mark.parametrize("streaming", [True, False])706def test_parquet_unaligned_schema_read_missing_cols_from_first(707tmp_path: Path, streaming: bool708) -> None:709dfs = [710pl.DataFrame({"a": 1, "b": 10}),711pl.DataFrame({"b": 11}),712]713714paths = [tmp_path / "1", tmp_path / "2"]715716for df, path in zip(dfs, paths):717df.write_parquet(path)718719lf = pl.scan_parquet(paths)720721with pytest.raises(722(pl.exceptions.SchemaError, pl.exceptions.ColumnNotFoundError),723):724lf.collect(engine="streaming" if streaming else "in-memory")725726727@pytest.mark.parametrize("parallel", ["columns", "row_groups", "prefiltered", "none"])728@pytest.mark.parametrize("streaming", [True, False])729@pytest.mark.write_disk730def test_parquet_schema_arg(731tmp_path: Path,732parallel: ParallelStrategy,733streaming: bool,734) -> None:735tmp_path.mkdir(exist_ok=True)736dfs = [pl.DataFrame({"a": 1, "b": 1}), pl.DataFrame({"a": 2, "b": 2})]737paths = [tmp_path / "1", tmp_path / "2"]738739for df, path in zip(dfs, paths):740df.write_parquet(path)741742schema: dict[str, pl.DataType] = {743"1": pl.Datetime(time_unit="ms", time_zone="CET"),744"a": pl.Int64(),745"b": pl.Int64(),746}747748# Test `schema` containing an extra column.749750lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)751752with pytest.raises((pl.exceptions.SchemaError, pl.exceptions.ColumnNotFoundError)):753lf.collect(engine="streaming" if streaming else "in-memory")754755lf = pl.scan_parquet(756paths, parallel=parallel, schema=schema, missing_columns="insert"757)758759assert_frame_equal(760lf.collect(engine="streaming" if streaming else "in-memory"),761pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),762)763764# Just one test that `read_parquet` is propagating this argument.765assert_frame_equal(766pl.read_parquet(767paths, parallel=parallel, schema=schema, missing_columns="insert"768),769pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),770)771772# Issue #19081: If a schema arg is passed, ensure its fields are propagated773# to the IR, otherwise even if `missing_columns='insert'`, downstream774# `select()`s etc. will fail with ColumnNotFound if the column is not in775# the first file.776lf = pl.scan_parquet(777paths, parallel=parallel, schema=schema, missing_columns="insert"778).select("1")779780s = lf.collect(engine="streaming" if streaming else "in-memory").to_series()781assert s.len() == 2782assert s.null_count() == 2783784# Test files containing extra columns not in `schema`785786schema: dict[str, type[pl.DataType]] = {"a": pl.Int64} # type: ignore[no-redef]787788for missing_columns in ["insert", "raise"]:789lf = pl.scan_parquet(790paths,791parallel=parallel,792schema=schema,793missing_columns=missing_columns, # type: ignore[arg-type]794)795796with pytest.raises(pl.exceptions.SchemaError):797lf.collect(engine="streaming" if streaming else "in-memory")798799lf = pl.scan_parquet(800paths,801parallel=parallel,802schema=schema,803extra_columns="ignore",804).select("a")805806assert_frame_equal(807lf.collect(engine="in-memory"),808pl.DataFrame({"a": [1, 2]}, schema=schema),809)810811schema: dict[str, type[pl.DataType]] = {"a": pl.Int64, "b": pl.Int8} # type: ignore[no-redef]812813lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)814815with pytest.raises(816pl.exceptions.SchemaError,817match="data type mismatch for column b: incoming: Int64 != target: Int8",818):819lf.collect(engine="streaming" if streaming else "in-memory")820821822def test_scan_parquet_empty_path_expansion(tmp_path: Path) -> None:823tmp_path.mkdir(exist_ok=True)824825with pytest.raises(826ComputeError,827match=r"failed to retrieve first file schema \(parquet\): "828r"expanded paths were empty \(path expansion input: "829".*Hint: passing a schema can allow this scan to succeed with an empty DataFrame",830):831pl.scan_parquet(tmp_path).collect()832833# Scan succeeds when schema is provided834assert_frame_equal(835pl.scan_parquet(tmp_path, schema={"x": pl.Int64}).collect(),836pl.DataFrame(schema={"x": pl.Int64}),837)838839assert_frame_equal(840pl.scan_parquet(tmp_path, schema={"x": pl.Int64}).with_row_index().collect(),841pl.DataFrame(schema={"x": pl.Int64}).with_row_index(),842)843844assert_frame_equal(845pl.scan_parquet(846tmp_path, schema={"x": pl.Int64}, hive_schema={"h": pl.String}847).collect(),848pl.DataFrame(schema={"x": pl.Int64, "h": pl.String}),849)850851assert_frame_equal(852(853pl.scan_parquet(854tmp_path, schema={"x": pl.Int64}, hive_schema={"h": pl.String}855)856.with_row_index()857.collect()858),859pl.DataFrame(schema={"x": pl.Int64, "h": pl.String}).with_row_index(),860)861862863@pytest.mark.parametrize("missing_columns", ["insert", "raise"])864@pytest.mark.write_disk865def test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249(866tmp_path: Path,867missing_columns: str,868) -> None:869tmp_path.mkdir(exist_ok=True)870paths = [tmp_path / "1", tmp_path / "2"]871872pl.DataFrame({"a": 1, "b": 1}, schema={"a": pl.Int32, "b": pl.UInt8}).write_parquet(873paths[0]874)875pl.DataFrame(876{"a": 1, "b": 1}, schema={"a": pl.Int32, "b": pl.UInt64}877).write_parquet(paths[1])878879assert_frame_equal(880pl.scan_parquet(paths, missing_columns=missing_columns) # type: ignore[arg-type]881.select("a")882.collect(engine="in-memory"),883pl.DataFrame({"a": [1, 1]}, schema={"a": pl.Int32}),884)885886887@pytest.mark.parametrize("streaming", [True, False])888@pytest.mark.write_disk889def test_scan_parquet_streaming_row_index_19606(890tmp_path: Path, streaming: bool891) -> None:892tmp_path.mkdir(exist_ok=True)893paths = [tmp_path / "1", tmp_path / "2"]894895dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]896897for df, p in zip(dfs, paths):898df.write_parquet(p)899900assert_frame_equal(901pl.scan_parquet(tmp_path)902.with_row_index()903.collect(engine="streaming" if streaming else "in-memory"),904pl.DataFrame(905{"index": [0, 1], "x": [0, 1]}, schema={"index": pl.UInt32, "x": pl.Int64}906),907)908909910def test_scan_parquet_prefilter_panic_22452() -> None:911# This is, the easiest way to control the threadpool size so that it is stable.912out = subprocess.check_output(913[914sys.executable,915"-c",916"""\917import os918919os.environ["POLARS_MAX_THREADS"] = "2"920921import io922923import polars as pl924from polars.testing import assert_frame_equal925926assert pl.thread_pool_size() == 2927928f = io.BytesIO()929930df = pl.DataFrame({x: 1 for x in ["a", "b", "c", "d", "e"]})931df.write_parquet(f)932f.seek(0)933934assert_frame_equal(935pl.scan_parquet(f, parallel="prefiltered")936.filter(pl.col(c) == 1 for c in ["a", "b", "c"])937.collect(),938df,939)940941print("OK", end="")942""",943],944)945946assert out == b"OK"947948949def test_scan_parquet_in_mem_to_streaming_dispatch_deadlock_22641() -> None:950out = subprocess.check_output(951[952sys.executable,953"-c",954"""\955import os956957os.environ["POLARS_MAX_THREADS"] = "1"958os.environ["POLARS_VERBOSE"] = "1"959960import io961import sys962from threading import Thread963964import polars as pl965966assert pl.thread_pool_size() == 1967968f = io.BytesIO()969pl.DataFrame({"x": 1}).write_parquet(f)970971q = (972pl.scan_parquet(f)973.filter(pl.sum_horizontal(pl.col("x"), pl.col("x"), pl.col("x")) >= 0)974.join(pl.scan_parquet(f), on="x", how="left")975)976977results = [978pl.DataFrame(),979pl.DataFrame(),980pl.DataFrame(),981pl.DataFrame(),982pl.DataFrame(),983]984985986def run():987# Also test just a single scan988pl.scan_parquet(f).collect()989990print("QUERY-FENCE", file=sys.stderr)991992results[0] = q.collect()993994print("QUERY-FENCE", file=sys.stderr)995996results[1] = pl.concat([q, q, q]).collect().head(1)997998print("QUERY-FENCE", file=sys.stderr)9991000results[2] = pl.collect_all([q, q, q])[0]10011002print("QUERY-FENCE", file=sys.stderr)10031004results[3] = pl.collect_all(3 * [pl.concat(3 * [q])])[0].head(1)10051006print("QUERY-FENCE", file=sys.stderr)10071008results[4] = q.collect(background=True).fetch_blocking()100910101011t = Thread(target=run, daemon=True)1012t.start()1013t.join(5)10141015assert [x.equals(pl.DataFrame({"x": 1})) for x in results] == [1016True,1017True,1018True,1019True,1020True,1021]10221023print("OK", end="", file=sys.stderr)1024""",1025],1026stderr=subprocess.STDOUT,1027)10281029assert out.endswith(b"OK")10301031def ensure_caches_dropped(verbose_log: str) -> None:1032cache_hit_prefix = "CACHE HIT: cache id: "10331034ids_hit = {1035x[len(cache_hit_prefix) :]1036for x in verbose_log.splitlines()1037if x.startswith(cache_hit_prefix)1038}10391040cache_drop_prefix = "CACHE DROP: cache id: "10411042ids_dropped = {1043x[len(cache_drop_prefix) :]1044for x in verbose_log.splitlines()1045if x.startswith(cache_drop_prefix)1046}10471048assert ids_hit == ids_dropped10491050out_str = out.decode()10511052for logs in out_str.split("QUERY-FENCE"):1053ensure_caches_dropped(logs)105410551056def test_parquet_prefiltering_inserted_column_23268() -> None:1057df = pl.DataFrame({"a": [1, 2, 3, 4]}, schema={"a": pl.Int8})10581059f = io.BytesIO()1060df.write_parquet(f)10611062f.seek(0)1063assert_frame_equal(1064(1065pl.scan_parquet(1066f,1067schema={"a": pl.Int8, "b": pl.Int16},1068missing_columns="insert",1069)1070.filter(pl.col("a") == 3)1071.filter(pl.col("b") == 3)1072.collect()1073),1074pl.DataFrame(schema={"a": pl.Int8, "b": pl.Int16}),1075)107610771078@pytest.mark.may_fail_cloud # reason: inspects logs1079def test_scan_parquet_prefilter_with_cast(1080monkeypatch: pytest.MonkeyPatch,1081capfd: pytest.CaptureFixture[str],1082) -> None:1083f = io.BytesIO()10841085df = pl.DataFrame(1086{1087"a": ["A", "B", "C", "D", "E", "F"],1088"b": pl.Series([1, 1, 1, 1, 0, 1], dtype=pl.UInt8),1089}1090)10911092df.write_parquet(f, row_group_size=3)10931094md = pq.read_metadata(f)10951096assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [3, 3]10971098q = pl.scan_parquet(1099f,1100schema={"a": pl.String, "b": pl.Int16},1101cast_options=pl.ScanCastOptions(integer_cast="upcast"),1102include_file_paths="file_path",1103).filter(pl.col("b") - 1 == pl.lit(-1, dtype=pl.Int16))11041105with monkeypatch.context() as cx:1106cx.setenv("POLARS_VERBOSE", "1")1107capfd.readouterr()1108out = q.collect()1109capture = capfd.readouterr().err11101111assert (1112"[ParquetFileReader]: Pre-filtered decode enabled (1 live, 1 non-live)"1113in capture1114)1115assert (1116"[ParquetFileReader]: Predicate pushdown: reading 1 / 2 row groups" in capture1117)11181119assert_frame_equal(1120out,1121pl.DataFrame(1122{1123"a": "E",1124"b": pl.Series([0], dtype=pl.Int16),1125"file_path": "in-mem",1126}1127),1128)112911301131def test_prefilter_with_n_rows_23790() -> None:1132df = pl.DataFrame(1133{1134"a": ["A", "B", "C", "D", "E", "F"],1135"b": [1, 2, 3, 4, 5, 6],1136}1137)11381139f = io.BytesIO()11401141df.write_parquet(f, row_group_size=2)11421143f.seek(0)11441145md = pq.read_metadata(f)11461147assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [2, 2, 2]11481149f.seek(0)1150q = pl.scan_parquet(f, n_rows=3).filter(pl.col("b").is_in([1, 3]))11511152assert_frame_equal(q.collect(), pl.DataFrame({"a": ["A", "C"], "b": [1, 3]}))11531154# With row index / file_path11551156df = pl.DataFrame(1157{1158"a": ["A", "B", "C", "D", "E", "F"],1159"b": [1, 2, 3, 4, 5, 6],1160}1161)11621163f = io.BytesIO()11641165df.write_parquet(f, row_group_size=2)11661167f.seek(0)1168md = pq.read_metadata(f)11691170assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [2, 2, 2]11711172f.seek(0)1173q = pl.scan_parquet(1174f,1175n_rows=3,1176row_index_name="index",1177include_file_paths="file_path",1178).filter(pl.col("b").is_in([1, 3]))11791180assert_frame_equal(1181q.collect(),1182pl.DataFrame(1183{1184"index": pl.Series([0, 2], dtype=pl.get_index_type()),1185"a": ["A", "C"],1186"b": [1, 3],1187"file_path": "in-mem",1188}1189),1190)119111921193def test_scan_parquet_filter_index_panic_23849(monkeypatch: pytest.MonkeyPatch) -> None:1194monkeypatch.setenv("POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD", "5")1195num_rows = 31196num_cols = 511971198f = io.BytesIO()11991200pl.select(1201pl.int_range(0, num_rows).alias(f"col_{i}") for i in range(num_cols)1202).write_parquet(f)12031204for parallel in ["auto", "columns", "row_groups", "prefiltered", "none"]:1205pl.scan_parquet(f, parallel=parallel).filter( # type: ignore[arg-type]1206pl.col("col_0").ge(0) & pl.col("col_0").lt(num_rows + 1)1207).collect()120812091210