# Source: py-polars/tests/unit/io/test_multiscan.py
"""Tests for multi-file ("multiscan") reads across the IPC/Parquet/CSV/NDJSON scanners.

Covers projection/predicate pushdown, row indices, hive partitioning, slicing,
schema mismatches, hidden-file filtering, and a couple of deadlock regressions.
Each test is parametrized over (scan, write) function pairs so the same
behavior is verified for every file format that supports it.
"""

from __future__ import annotations

import io
import re
import sys
from functools import partial
from typing import IO, TYPE_CHECKING, Any

import pyarrow.parquet as pq
import pytest
from hypothesis import given
from hypothesis import strategies as st

import polars as pl
from polars.meta.index_type import get_index_type
from polars.testing import assert_frame_equal
from tests.unit.io.conftest import normalize_path_separator_pl

if TYPE_CHECKING:
    from collections.abc import Callable
    from pathlib import Path

    from tests.conftest import PlMonkeyPatch

# (scan function, matching write function) pairs for every format that
# supports multi-file scanning.
SCAN_AND_WRITE_FUNCS = [
    (pl.scan_ipc, pl.DataFrame.write_ipc),
    (pl.scan_parquet, pl.DataFrame.write_parquet),
    (pl.scan_csv, pl.DataFrame.write_csv),
    (pl.scan_ndjson, pl.DataFrame.write_ndjson),
]


@pytest.mark.write_disk
@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
def test_include_file_paths(tmp_path: Path, scan: Any, write: Any) -> None:
    """`include_file_paths` adds a column holding each row's source file path."""
    a_path = tmp_path / "a"
    b_path = tmp_path / "b"

    write(pl.DataFrame({"a": [5, 10]}), a_path)
    write(pl.DataFrame({"a": [1996]}), b_path)

    out = scan([a_path, b_path], include_file_paths="f")

    assert_frame_equal(
        out.collect(),
        pl.DataFrame(
            {
                "a": [5, 10, 1996],
                "f": [str(a_path), str(a_path), str(b_path)],
            }
            # Normalize so the expectation is platform-independent (win `\` vs `/`).
        ).with_columns(normalize_path_separator_pl(pl.col("f"))),
    )


@pytest.mark.parametrize(
    ("scan", "write", "ext", "supports_missing_columns", "supports_hive_partitioning"),
    [
        (pl.scan_ipc, pl.DataFrame.write_ipc, "ipc", False, True),
        (pl.scan_parquet, pl.DataFrame.write_parquet, "parquet", True, True),
        (pl.scan_csv, pl.DataFrame.write_csv, "csv", False, False),
        (pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl", False, False),
    ],
)
@pytest.mark.parametrize("missing_column", [False, True])
@pytest.mark.parametrize("row_index", [False, True])
@pytest.mark.parametrize("include_file_paths", [False, True])
@pytest.mark.parametrize("hive", [False, True])
@pytest.mark.parametrize("col", [False, True])
@pytest.mark.write_disk
def test_multiscan_projection(
    tmp_path: Path,
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, Path], Any],
    ext: str,
    supports_missing_columns: bool,
    supports_hive_partitioning: bool,
    missing_column: bool,
    row_index: bool,
    include_file_paths: bool,
    hive: bool,
    col: bool,
) -> None:
    """Projection pushdown must give the same result before and after collect.

    Exercises every combination of special columns (missing columns, row
    index, file paths, hive columns, data columns), in both orders, and with
    each single column removed.
    """
    a = pl.DataFrame({"col": [5, 10, 1996]})
    b = pl.DataFrame({"col": [13, 37]})

    if missing_column and supports_missing_columns:
        # Only file `a` gets the extra column; `b` is then "missing" it.
        a = a.with_columns(missing=pl.Series([420, 2000, 9]))

    a_path: Path
    b_path: Path
    multiscan_path: Path

    if hive and supports_hive_partitioning:
        # Hive layout: one directory per partition value.
        (tmp_path / "hive_col=0").mkdir()
        a_path = tmp_path / "hive_col=0" / f"a.{ext}"
        (tmp_path / "hive_col=1").mkdir()
        b_path = tmp_path / "hive_col=1" / f"b.{ext}"

        multiscan_path = tmp_path

    else:
        a_path = tmp_path / f"a.{ext}"
        b_path = tmp_path / f"b.{ext}"

        multiscan_path = tmp_path / f"*.{ext}"

    write(a, a_path)
    write(b, b_path)

    # Build the full set of columns that should be projectable for this combo.
    base_projection = []
    if missing_column and supports_missing_columns:
        base_projection += ["missing"]
    if row_index:
        base_projection += ["row_index"]
    if include_file_paths:
        base_projection += ["file_path"]
    if hive and supports_hive_partitioning:
        base_projection += ["hive_col"]
    if col:
        base_projection += ["col"]

    ifp = "file_path" if include_file_paths else None
    ri = "row_index" if row_index else None

    args = {
        "missing_columns": "insert" if missing_column else "raise",
        "include_file_paths": ifp,
        "row_index_name": ri,
        "hive_partitioning": hive,
    }

    # Drop kwargs the scan function under test does not accept.
    if not supports_missing_columns:
        del args["missing_columns"]
    if not supports_hive_partitioning:
        del args["hive_partitioning"]

    for projection in [
        base_projection,
        base_projection[::-1],
    ]:
        # select-after-collect vs select-before-collect (pushdown) must agree.
        assert_frame_equal(
            scan(multiscan_path, **args).collect(engine="streaming").select(projection),
            scan(multiscan_path, **args).select(projection).collect(engine="streaming"),
        )

    # Repeat with each column individually removed from the projection.
    for remove in range(len(base_projection)):
        new_projection = base_projection.copy()
        new_projection.pop(remove)

        for projection in [
            new_projection,
            new_projection[::-1],
        ]:
            assert_frame_equal(
                scan(multiscan_path, **args)
                .collect(engine="streaming")
                .select(projection),
                scan(multiscan_path, **args)
                .select(projection)
                .collect(engine="streaming"),
            )


@pytest.mark.parametrize(
    ("scan", "write", "ext"),
    [
        (pl.scan_ipc, pl.DataFrame.write_ipc, "ipc"),
        (pl.scan_parquet, pl.DataFrame.write_parquet, "parquet"),
    ],
)
@pytest.mark.write_disk
def test_multiscan_hive_predicate(
    tmp_path: Path,
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, Path], Any],
    ext: str,
) -> None:
    """Predicates on hive columns (and mixes with data columns) must match an
    unfiltered collect + in-memory filter, with and without a row index."""
    a = pl.DataFrame({"col": [5, 10, 1996]})
    b = pl.DataFrame({"col": [13, 37]})
    c = pl.DataFrame({"col": [3, 5, 2024]})

    (tmp_path / "hive_col=0").mkdir()
    a_path = tmp_path / "hive_col=0" / f"0.{ext}"
    (tmp_path / "hive_col=1").mkdir()
    b_path = tmp_path / "hive_col=1" / f"0.{ext}"
    (tmp_path / "hive_col=2").mkdir()
    c_path = tmp_path / "hive_col=2" / f"0.{ext}"

    multiscan_path = tmp_path

    write(a, a_path)
    write(b, b_path)
    write(c, c_path)

    # Reference result: collect everything, then filter in memory.
    full = scan(multiscan_path).collect(engine="streaming")
    full_ri = full.with_row_index("ri", 42)

    last_pred = None
    try:
        for pred in [
            pl.col.hive_col == 0,
            pl.col.hive_col == 1,
            pl.col.hive_col == 2,
            pl.col.hive_col < 2,
            pl.col.hive_col > 0,
            pl.col.hive_col != 1,
            pl.col.hive_col != 3,
            pl.col.col == 13,
            pl.col.col != 13,
            (pl.col.col != 13) & (pl.col.hive_col == 1),
            (pl.col.col != 13) & (pl.col.hive_col != 1),
        ]:
            last_pred = pred
            assert_frame_equal(
                full.filter(pred),
                scan(multiscan_path).filter(pred).collect(engine="streaming"),
            )

            assert_frame_equal(
                full_ri.filter(pred),
                scan(multiscan_path)
                .with_row_index("ri", 42)
                .filter(pred)
                .collect(engine="streaming"),
            )
    except Exception as _:
        # On failure, show which predicate broke before re-raising.
        print(last_pred)
        raise


@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
@pytest.mark.write_disk
def test_multiscan_row_index(
    tmp_path: Path,
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, Path], Any],
) -> None:
    """Row indices must be continuous across files, honor offsets, survive
    slicing/filtering, and reject duplicate index names."""
    a = pl.DataFrame({"col": [5, 10, 1996]})
    b = pl.DataFrame({"col": [42]})
    c = pl.DataFrame({"col": [13, 37]})

    write(a, tmp_path / "a")
    write(b, tmp_path / "b")
    write(c, tmp_path / "c")

    col = pl.concat([a, b, c]).to_series()
    g = tmp_path / "*"

    assert_frame_equal(
        scan(g, row_index_name="ri").collect(),
        pl.DataFrame(
            [
                pl.Series("ri", range(6), get_index_type()),
                col,
            ]
        ),
    )

    start = 42
    assert_frame_equal(
        scan(g, row_index_name="ri", row_index_offset=start).collect(),
        pl.DataFrame(
            [
                pl.Series("ri", range(start, start + 6), get_index_type()),
                col,
            ]
        ),
    )

    start = 42
    assert_frame_equal(
        scan(g, row_index_name="ri", row_index_offset=start).slice(3, 3).collect(),
        pl.DataFrame(
            [
                pl.Series("ri", range(start + 3, start + 6), get_index_type()),
                col.slice(3, 3),
            ]
        ),
    )

    start = 42
    assert_frame_equal(
        scan(g, row_index_name="ri", row_index_offset=start)
        .filter(pl.col("col") < 15)
        .collect(),
        pl.DataFrame(
            [
                # Indices of the surviving rows (5, 10, 13) in the concatenation.
                pl.Series("ri", [start + 0, start + 1, start + 4], get_index_type()),
                pl.Series("col", [5, 10, 13]),
            ]
        ),
    )

    # Adding a second unnamed index ("index") must be a DuplicateError.
    with pytest.raises(
        pl.exceptions.DuplicateError, match="duplicate column name index"
    ):
        scan(g).with_row_index().with_row_index().collect()

    # Stacked distinctly-named indices prepend newest-first.
    assert_frame_equal(
        scan(g)
        .with_row_index()
        .with_row_index("index_1", offset=1)
        .with_row_index("index_2", offset=2)
        .collect(),
        pl.DataFrame(
            [
                pl.Series("index_2", [2, 3, 4, 5, 6, 7], get_index_type()),
                pl.Series("index_1", [1, 2, 3, 4, 5, 6], get_index_type()),
                pl.Series("index", [0, 1, 2, 3, 4, 5], get_index_type()),
                col,
            ]
        ),
    )


@pytest.mark.parametrize(
    ("scan", "write", "ext"),
    [
        (pl.scan_ipc, pl.DataFrame.write_ipc, "ipc"),
        (pl.scan_parquet, pl.DataFrame.write_parquet, "parquet"),
        pytest.param(
            pl.scan_csv,
            pl.DataFrame.write_csv,
            "csv",
            marks=pytest.mark.xfail(
                reason="See https://github.com/pola-rs/polars/issues/21211"
            ),
        ),
        (pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl"),
    ],
)
@pytest.mark.write_disk
def test_schema_mismatch_type_mismatch(
    tmp_path: Path,
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, Path], Any],
    ext: str,
) -> None:
    """A dtype conflict between files raises (SchemaError, or ComputeError for NDJSON)."""
    a = pl.DataFrame({"xyz_col": [5, 10, 1996]})
    b = pl.DataFrame({"xyz_col": ["a", "b", "c"]})

    a_path = tmp_path / f"a.{ext}"
    b_path = tmp_path / f"b.{ext}"

    multiscan_path = tmp_path / f"*.{ext}"

    write(a, a_path)
    write(b, b_path)

    q = scan(multiscan_path)

    # NDJSON will just parse according to `projected_schema`
    cx = (
        pytest.raises(
            pl.exceptions.ComputeError,
            match=re.escape("cannot parse 'a' (string) as Int64"),
        )
        if scan is pl.scan_ndjson
        else pytest.raises(
            pl.exceptions.SchemaError,  # type: ignore[arg-type]
            match=(
                "data type mismatch for column xyz_col: "
                "incoming: String != target: Int64"
            ),
        )
    )

    with cx:
        q.collect(engine="streaming")


@pytest.mark.parametrize(
    ("scan", "write", "ext"),
    [
        # (pl.scan_parquet, pl.DataFrame.write_parquet, "parquet"), # TODO: _
        # (pl.scan_ipc, pl.DataFrame.write_ipc, "ipc"), # TODO: _
        pytest.param(
            pl.scan_csv,
            pl.DataFrame.write_csv,
            "csv",
            marks=pytest.mark.xfail(
                reason="See https://github.com/pola-rs/polars/issues/21211"
            ),
        ),
        # (pl.scan_ndjson, pl.DataFrame.write_ndjson, "jsonl"), # TODO: _
    ],
)
@pytest.mark.write_disk
def test_schema_mismatch_order_mismatch(
    tmp_path: Path,
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, Path], Any],
    ext: str,
) -> None:
    """Same columns in a different order across files raises a SchemaError."""
    a = pl.DataFrame({"x": [5, 10, 1996], "y": ["a", "b", "c"]})
    b = pl.DataFrame({"y": ["x", "y"], "x": [1, 2]})

    a_path = tmp_path / f"a.{ext}"
    b_path = tmp_path / f"b.{ext}"

    multiscan_path = tmp_path / f"*.{ext}"

    write(a, a_path)
    write(b, b_path)

    q = scan(multiscan_path)

    with pytest.raises(pl.exceptions.SchemaError):
        q.collect(engine="streaming")


@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
def test_multiscan_head(
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, io.BytesIO | Path], Any],
) -> None:
    """head() across multiple in-memory files stops within the first file."""
    a = io.BytesIO()
    b = io.BytesIO()
    for f in [a, b]:
        write(pl.Series("c1", range(10)).to_frame(), f)
        f.seek(0)

    assert_frame_equal(
        scan([a, b]).head(5).collect(engine="streaming"),
        pl.Series("c1", range(5)).to_frame(),
    )


@pytest.mark.parametrize(
    ("scan", "write"),
    [
        (pl.scan_ipc, pl.DataFrame.write_ipc),
        (pl.scan_parquet, pl.DataFrame.write_parquet),
        (pl.scan_ndjson, pl.DataFrame.write_ndjson),
        (
            pl.scan_csv,
            pl.DataFrame.write_csv,
        ),
    ],
)
def test_multiscan_tail(
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, io.BytesIO | Path], Any],
) -> None:
    """tail() across multiple files returns only the last file's trailing rows."""
    a = io.BytesIO()
    b = io.BytesIO()
    for f in [a, b]:
        write(pl.Series("c1", range(10)).to_frame(), f)
        f.seek(0)

    assert_frame_equal(
        scan([a, b]).tail(5).collect(engine="streaming"),
        pl.Series("c1", range(5, 10)).to_frame(),
    )


@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
def test_multiscan_slice_middle(
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, io.BytesIO | Path], Any],
) -> None:
    """A slice spanning partial-file boundaries (13 files x 7 rows) is correct,
    for positive and negative offsets, with and without a row index."""
    fs = [io.BytesIO() for _ in range(13)]
    for f in fs:
        write(pl.Series("c1", range(7)).to_frame(), f)
        f.seek(0)

    # Offset 30: starts at local row 2 of fs[4].
    offset = 5 * 7 - 5
    expected = (
        list(range(2, 7))  # fs[4]
        + list(range(7))  # fs[5]
        + list(range(5))  # fs[6]
    )
    expected_series = [pl.Series("c1", expected)]
    ri_expected_series = [
        pl.Series("ri", range(offset, offset + 17), get_index_type())
    ] + expected_series

    assert_frame_equal(
        scan(fs).slice(offset, 17).collect(engine="streaming"),
        pl.DataFrame(expected_series),
    )
    assert_frame_equal(
        scan(fs, row_index_name="ri").slice(offset, 17).collect(engine="streaming"),
        pl.DataFrame(ri_expected_series),
    )

    # Negative slices
    offset = -(13 * 7 - offset)
    assert_frame_equal(
        scan(fs).slice(offset, 17).collect(engine="streaming"),
        pl.DataFrame(expected_series),
    )
    assert_frame_equal(
        scan(fs, row_index_name="ri").slice(offset, 17).collect(engine="streaming"),
        pl.DataFrame(ri_expected_series),
    )


@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
@given(offset=st.integers(-100, 100), length=st.integers(0, 101))
def test_multiscan_slice_parametric(
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, io.BytesIO | Path], Any],
    offset: int,
    length: int,
) -> None:
    """Property test: slicing 13 small files equals slicing one equivalent file."""
    # Single-file reference holding the same data as the 13 chunks combined.
    ref = io.BytesIO()
    write(pl.Series("c1", [i % 7 for i in range(13 * 7)]).to_frame(), ref)
    ref.seek(0)

    fs = [io.BytesIO() for _ in range(13)]
    for f in fs:
        write(pl.Series("c1", range(7)).to_frame(), f)
        f.seek(0)

    assert_frame_equal(
        scan(ref).slice(offset, length).collect(),
        scan(fs).slice(offset, length).collect(engine="streaming"),
    )

    # Rewind buffers before re-scanning them.
    ref.seek(0)
    for f in fs:
        f.seek(0)

    assert_frame_equal(
        scan(ref, row_index_name="ri", row_index_offset=42)
        .slice(offset, length)
        .collect(),
        scan(fs, row_index_name="ri", row_index_offset=42)
        .slice(offset, length)
        .collect(engine="streaming"),
    )

    assert_frame_equal(
        scan(ref, row_index_name="ri", row_index_offset=42)
        .slice(offset, length)
        .select("ri")
        .collect(),
        scan(fs, row_index_name="ri", row_index_offset=42)
        .slice(offset, length)
        .select("ri")
        .collect(engine="streaming"),
    )


@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
def test_many_files(scan: Any, write: Any) -> None:
    """Scanning the same bytes 1023 times concatenates all of them."""
    f = io.BytesIO()
    write(pl.DataFrame({"a": [5, 10, 1996]}), f)
    bs = f.getvalue()

    out = scan([bs] * 1023)

    assert_frame_equal(
        out.collect(),
        pl.DataFrame(
            {
                "a": [5, 10, 1996] * 1023,
            }
        ),
    )


def test_deadlock_stop_requested(plmonkeypatch: PlMonkeyPatch) -> None:
    """Regression: joining two multiscans under a tiny thread budget must not deadlock."""
    df = pl.DataFrame(
        {
            "a": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        }
    )

    f = io.BytesIO()
    # One row per row group to maximize scheduling interleavings.
    df.write_parquet(f, row_group_size=1)

    plmonkeypatch.setenv("POLARS_MAX_THREADS", "2")
    plmonkeypatch.setenv("POLARS_JOIN_SAMPLE_LIMIT", "1")

    left_fs = [io.BytesIO(f.getbuffer()) for _ in range(10)]
    right_fs = [io.BytesIO(f.getbuffer()) for _ in range(10)]

    left = pl.scan_parquet(left_fs)  # type: ignore[arg-type]
    right = pl.scan_parquet(right_fs)  # type: ignore[arg-type]

    # Result is irrelevant; the test passes as long as collect() terminates.
    left.join(right, pl.col.a == pl.col.a).collect(engine="streaming").height


@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
def test_deadlock_linearize(scan: Any, write: Any) -> None:
    """Regression: head() larger than total rows, with slice pushdown disabled,
    must not deadlock in the linearization phase."""
    df = pl.DataFrame(
        {
            "a": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        }
    )

    f = io.BytesIO()
    write(df, f)
    fs = [io.BytesIO(f.getbuffer()) for _ in range(10)]
    lf = scan(fs).head(100)

    assert_frame_equal(
        lf.collect(
            engine="streaming", optimizations=pl.QueryOptFlags(slice_pushdown=False)
        ),
        pl.concat([df] * 10),
    )


@pytest.mark.parametrize(
    ("scan", "write"),
    SCAN_AND_WRITE_FUNCS,
)
def test_row_index_filter_22612(scan: Any, write: Any) -> None:
    """Regression (#22612): row-index range filters across row-group boundaries."""
    df = pl.DataFrame(
        {
            "a": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
        }
    )

    f = io.BytesIO()

    if write is pl.DataFrame.write_parquet:
        # Force two row groups so the filter window can straddle them.
        df.write_parquet(f, row_group_size=5)
        assert pq.read_metadata(f).num_row_groups == 2
    else:
        write(df, f)

    for end in range(2, 10):
        assert_frame_equal(
            scan(f)
            .with_row_index()
            .filter(pl.col("index") >= end - 2, pl.col("index") <= end)
            .collect(),
            df.with_row_index().slice(end - 2, 3),
        )

        assert_frame_equal(
            scan(f)
            .with_row_index()
            .filter(pl.col("index").is_between(end - 2, end))
            .collect(),
            df.with_row_index().slice(end - 2, 3),
        )


@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
def test_row_index_name_in_file(scan: Any, write: Any) -> None:
    """Adding a row index whose name already exists in the file is a DuplicateError."""
    f = io.BytesIO()
    write(pl.DataFrame({"index": 1}), f)

    with pytest.raises(
        pl.exceptions.DuplicateError,
        match="cannot add row_index with name 'index': column already exists in file",
    ):
        scan(f).with_row_index().collect()


def test_extra_columns_not_ignored_22218() -> None:
    """Regression (#22218): `missing_columns="insert"` alone must still reject
    extra columns; `extra_columns="ignore"` is required to drop them."""
    dfs = [pl.DataFrame({"a": 1, "b": 1}), pl.DataFrame({"a": 2, "c": 2})]

    files: list[IO[bytes]] = [io.BytesIO(), io.BytesIO()]

    dfs[0].write_parquet(files[0])
    dfs[1].write_parquet(files[1])

    with pytest.raises(
        pl.exceptions.SchemaError,
        match=r"extra column in file outside of expected schema: c, hint: specify .*or pass",
    ):
        pl.scan_parquet(files, missing_columns="insert").select(pl.all()).collect()

    assert_frame_equal(
        pl.scan_parquet(
            files,
            missing_columns="insert",
            extra_columns="ignore",
        )
        .select(pl.all())
        .collect(),
        pl.DataFrame({"a": [1, 2], "b": [1, None]}),
    )


@pytest.mark.parametrize(("scan", "write"), SCAN_AND_WRITE_FUNCS)
def test_scan_null_upcast(scan: Any, write: Any) -> None:
    """An all-Null column in one file upcasts to the other files' dtype."""
    dfs = [
        pl.DataFrame({"a": [1, 2, 3]}),
        pl.select(a=pl.lit(None, dtype=pl.Null)),
    ]

    files = [io.BytesIO(), io.BytesIO()]

    write(dfs[0], files[0])
    write(dfs[1], files[1])

    # Prevent CSV schema inference from loading as string (it looks at multiple
    # files).
    if scan is pl.scan_csv:
        scan = partial(scan, schema=dfs[0].schema)

    assert_frame_equal(
        scan(files).collect(),
        pl.DataFrame({"a": [1, 2, 3, None]}),
    )


@pytest.mark.parametrize(
    ("scan", "write"),
    [
        (pl.scan_ipc, pl.DataFrame.write_ipc),
        (pl.scan_parquet, pl.DataFrame.write_parquet),
        (pl.scan_ndjson, pl.DataFrame.write_ndjson),
    ],
)
def test_scan_null_upcast_to_nested(scan: Any, write: Any) -> None:
    """Null columns also upcast to nested dtypes (List of Struct)."""
    schema = {"a": pl.List(pl.Struct({"field": pl.Int64}))}

    dfs = [
        pl.DataFrame(
            {"a": [[{"field": 1}], [{"field": 2}], []]},
            schema=schema,
        ),
        pl.select(a=pl.lit(None, dtype=pl.Null)),
    ]

    files = [io.BytesIO(), io.BytesIO()]

    write(dfs[0], files[0])
    write(dfs[1], files[1])

    # Prevent CSV schema inference from loading as string (it looks at multiple
    # files).
    if scan is pl.scan_csv:
        scan = partial(scan, schema=schema)

    assert_frame_equal(
        scan(files).collect(),
        pl.DataFrame(
            {"a": [[{"field": 1}], [{"field": 2}], [], None]},
            schema=schema,
        ),
    )


@pytest.mark.parametrize(
    ("scan", "write"),
    [
        (pl.scan_parquet, pl.DataFrame.write_parquet),
    ],
)
@pytest.mark.parametrize(
    "prefix",
    [
        "",
        # `file:` URI forms differ per platform.
        "file:" if sys.platform != "win32" else "file:/",
        "file://" if sys.platform != "win32" else "file:///",
    ],
)
@pytest.mark.parametrize("use_glob", [True, False])
def test_scan_ignore_hidden_files_21762(
    tmp_path: Path, scan: Any, write: Any, use_glob: bool, prefix: str
) -> None:
    """`hidden_file_prefix` (str, list, or tuple) filters files/dirs by prefix;
    a directly-passed hidden file is never silently skipped (#21762)."""
    # Each file records its own relative path so the scan result identifies
    # exactly which files were picked up.
    file_names: list[str] = ["a.ext", "_a.ext", ".a.ext", "a_.ext"]

    for file_name in file_names:
        write(pl.DataFrame({"rel_path": file_name}), tmp_path / file_name)

    (tmp_path / "folder").mkdir()

    for file_name in file_names:
        write(
            pl.DataFrame({"rel_path": f"folder/{file_name}"}),
            tmp_path / "folder" / file_name,
        )

    (tmp_path / "_folder").mkdir()

    for file_name in file_names:
        write(
            pl.DataFrame({"rel_path": f"_folder/{file_name}"}),
            tmp_path / "_folder" / file_name,
        )

    suffix = "/**/*.ext" if use_glob else "/" if prefix.startswith("file:") else ""
    root = f"{prefix}{tmp_path}{suffix}"

    assert_frame_equal(
        scan(root).sort("*"),
        pl.LazyFrame(
            {
                "rel_path": [
                    ".a.ext",
                    "_a.ext",
                    "_folder/.a.ext",
                    "_folder/_a.ext",
                    "_folder/a.ext",
                    "_folder/a_.ext",
                    "a.ext",
                    "a_.ext",
                    "folder/.a.ext",
                    "folder/_a.ext",
                    "folder/a.ext",
                    "folder/a_.ext",
                ]
            }
        ),
    )

    assert_frame_equal(
        scan(root, hidden_file_prefix=".").sort("*"),
        pl.LazyFrame(
            {
                "rel_path": [
                    "_a.ext",
                    "_folder/_a.ext",
                    "_folder/a.ext",
                    "_folder/a_.ext",
                    "a.ext",
                    "a_.ext",
                    "folder/_a.ext",
                    "folder/a.ext",
                    "folder/a_.ext",
                ]
            }
        ),
    )

    assert_frame_equal(
        scan(root, hidden_file_prefix=[".", "_"]).sort("*"),
        pl.LazyFrame(
            {
                "rel_path": [
                    "_folder/a.ext",
                    "_folder/a_.ext",
                    "a.ext",
                    "a_.ext",
                    "folder/a.ext",
                    "folder/a_.ext",
                ]
            }
        ),
    )

    assert_frame_equal(
        scan(root, hidden_file_prefix=(".", "_")).sort("*"),
        pl.LazyFrame(
            {
                "rel_path": [
                    "_folder/a.ext",
                    "_folder/a_.ext",
                    "a.ext",
                    "a_.ext",
                    "folder/a.ext",
                    "folder/a_.ext",
                ]
            }
        ),
    )

    # Top-level glob only
    root = f"{tmp_path}/*.ext"

    assert_frame_equal(
        scan(root).sort("*"),
        pl.LazyFrame(
            {
                "rel_path": [
                    ".a.ext",
                    "_a.ext",
                    "a.ext",
                    "a_.ext",
                ]
            }
        ),
    )

    assert_frame_equal(
        scan(root, hidden_file_prefix=".").sort("*"),
        pl.LazyFrame(
            {
                "rel_path": [
                    "_a.ext",
                    "a.ext",
                    "a_.ext",
                ]
            }
        ),
    )

    assert_frame_equal(
        scan(root, hidden_file_prefix=[".", "_"]).sort("*"),
        pl.LazyFrame(
            {
                "rel_path": [
                    "a.ext",
                    "a_.ext",
                ]
            }
        ),
    )

    # Direct file passed
    with pytest.raises(pl.exceptions.ComputeError, match="expanded paths were empty"):
        scan(tmp_path / "_a.ext", hidden_file_prefix="_").collect()


def test_row_count_estimate_multifile(io_files_path: Path) -> None:
    src = io_files_path / "foods*.parquet"
    # test that it doesn't check only the first file
    assert "ESTIMATED ROWS: 54" in pl.scan_parquet(src).explain()


@pytest.mark.parametrize(
    ("scan", "write", "ext"),
    [
        (pl.scan_ipc, pl.DataFrame.write_ipc, "ipc"),
        (pl.scan_parquet, pl.DataFrame.write_parquet, "parquet"),
    ],
)
@pytest.mark.parametrize(
    ("predicate", "expected_indices"),
    [
        ((pl.col.x == 1) & True, [0]),
        (True & (pl.col.x == 1), [0]),
    ],
)
@pytest.mark.write_disk
def test_hive_predicate_filtering_edge_case_25630(
    tmp_path: Path,
    scan: Callable[..., pl.LazyFrame],
    write: Callable[[pl.DataFrame, Path], Any],
    ext: str,
    predicate: pl.Expr,
    expected_indices: list[int],
) -> None:
    """Regression (#25630): boolean-literal conjunctions with hive predicates."""
    df = pl.DataFrame({"x": [1, 2, 3], "y": [0, 1, 1]}).with_row_index()

    (tmp_path / "y=0").mkdir()
    (tmp_path / "y=1").mkdir()

    # previously we could panic if hive columns were all filtered out of the projection
    write(df.filter(pl.col.y == 0).drop("y"), tmp_path / "y=0" / f"data.{ext}")
    write(df.filter(pl.col.y == 1).drop("y"), tmp_path / "y=1" / f"data.{ext}")

    res = scan(tmp_path).filter(predicate).select("index").collect(engine="streaming")
    expected = pl.DataFrame(
        data={"index": expected_indices},
        schema={"index": pl.get_index_type()},
    )
    assert_frame_equal(res, expected)