Path: blob/main/py-polars/tests/unit/io/test_scan_row_deletion.py
6939 views
from __future__ import annotations12import subprocess3import sys4from typing import TYPE_CHECKING56import pytest78import polars as pl9from polars.testing import assert_frame_equal1011if TYPE_CHECKING:12from pathlib import Path131415@pytest.fixture(scope="session")16def data_files_path(tmp_path_factory: pytest.TempPathFactory) -> Path:17"""Creates: 5 parquet files, 5 rows each."""18tmp_path = tmp_path_factory.mktemp("data-files")19_create_data_files(tmp_path)2021return tmp_path222324@pytest.fixture25def write_position_deletes(tmp_path: Path) -> WritePositionDeletes:26return WritePositionDeletes(tmp_path=tmp_path)272829class WritePositionDeletes: # noqa: D10130def __init__(self, *, tmp_path: Path) -> None:31self.tmp_path = tmp_path32self.i = 03334def __call__(self, positions: pl.Series) -> str:35path = self.tmp_path / f"{self.i}"3637(38positions.alias("pos")39.to_frame()40.select(pl.lit("").alias("file_path"), "pos")41.write_parquet(path)42)4344self.i += 14546return str(path)474849# 5 files x 5 rows each. 
def _create_data_files(tmp_path: Path) -> None:
    """Write 5 hive-partitioned parquet files of 5 rows each under `tmp_path`.

    Together the files contain `physical_index` [0, 1, .., 24]. The file at
    `offset=XX/data.parquet` holds the rows `XX .. XX + 4`. Partition values
    are zero-padded to two digits (`offset=00`, `offset=05`, `offset=10`, ...),
    presumably so that lexical path order matches numeric order. The final
    sanity check relies on the scan returning the files in that order.
    """
    tmp_path.mkdir(exist_ok=True, parents=True)
    df = pl.select(physical_index=pl.int_range(25, dtype=pl.UInt32))

    parts = []

    for i in [0, 5, 10, 15, 20]:
        path = tmp_path / f"offset={i:02}/data.parquet"
        path.parent.mkdir(exist_ok=True, parents=True)

        part_df = df.slice(i, 5)
        part_df.write_parquet(path)

        parts.append(part_df.with_columns(offset=pl.lit(i, dtype=pl.Int64)))

    # Sanity check: scanning the directory (hive partitioning on) yields the
    # parts in order, with `offset` parsed from the directory names.
    assert_frame_equal(pl.scan_parquet(tmp_path).collect(), pl.concat(parts))


@pytest.mark.parametrize("row_index_offset", [0, 27, 38, 73])
def test_scan_row_deletions(
    data_files_path: Path,
    write_position_deletes: WritePositionDeletes,
    row_index_offset: int,
) -> None:
    """Row deletions combined with row index, head/tail/slice and file filters."""
    # Deleted positions, keyed by file index: file 0 drops rows 1, 2; file 1
    # drops rows 0, 1, 2; file 4 drops rows 2, 3. 18 of the 25 rows remain.
    deletion_files = (
        "iceberg-position-delete",
        {
            0: [
                write_position_deletes(pl.Series([1, 2])),
            ],
            1: [
                write_position_deletes(pl.Series([0, 1, 2])),
            ],
            4: [
                write_position_deletes(pl.Series([2, 3])),
            ],
        },
    )

    def apply_row_index_offset(values: list[int]) -> list[int]:
        return [x + row_index_offset for x in values]

    q = pl.scan_parquet(
        data_files_path,
        _deletion_files=deletion_files,  # type: ignore[arg-type]
        hive_partitioning=False,
    ).with_row_index(offset=row_index_offset)

    assert q.select(pl.len()).collect().item() == 18

    # One line per data file below; the row index is contiguous across the
    # surviving rows.
    assert_frame_equal(
        q.collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([
                    0, 1, 2,
                    3, 4,
                    5, 6, 7, 8, 9,
                    10, 11, 12, 13, 14,
                    15, 16, 17
                ]),
                "physical_index": [
                    0, 3, 4,
                    8, 9,
                    10, 11, 12, 13, 14,
                    15, 16, 17, 18, 19,
                    20, 21, 24
                ],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        )
    ) # fmt: skip

    # head()

    assert_frame_equal(
        q.head(3).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([0, 1, 2]),
                "physical_index": [0, 3, 4],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    assert_frame_equal(
        q.head(10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
                "physical_index": [0, 3, 4, 8, 9, 10, 11, 12, 13, 14],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    # tail()

    assert_frame_equal(
        q.tail(3).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([15, 16, 17]),
                "physical_index": [20, 21, 24],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    assert_frame_equal(
        q.tail(10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset(
                    [8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
                ),
                "physical_index": [
                    13, 14, 15, 16, 17, 18, 19, 20, 21,
                    24
                ],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    ) # fmt: skip

    # slice(positive_offset)

    assert_frame_equal(
        q.slice(2, 10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
                "physical_index": [4, 8, 9, 10, 11, 12, 13, 14, 15, 16],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    assert_frame_equal(
        q.slice(5, 10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([5, 6, 7, 8, 9, 10, 11, 12, 13, 14]),
                "physical_index": [10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    # Only 8 rows remain after offset 10, so the slice is shorter than 10.
    assert_frame_equal(
        q.slice(10, 10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([10, 11, 12, 13, 14, 15, 16, 17]),
                "physical_index": [15, 16, 17, 18, 19, 20, 21, 24],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    # slice(negative_offset)
    assert_frame_equal(
        q.slice(-3, 2).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([15, 16]),
                "physical_index": [20, 21],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    # The offset starts 5 rows before the first surviving row (18 - 23 = -5),
    # so only the in-range part of the window (rows 0..4) is returned.
    assert_frame_equal(
        q.slice(-23, 10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([0, 1, 2, 3, 4]),
                "physical_index": [0, 3, 4, 8, 9],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    # filter: skip_files
    # Re-scan with hive partitioning enabled so `offset` is available to filter
    # on; the row index must still count rows of the skipped files.
    q = pl.scan_parquet(
        data_files_path,
        _deletion_files=deletion_files,  # type: ignore[arg-type]
    ).with_row_index(offset=row_index_offset)

    assert_frame_equal(
        q.filter(pl.col("offset").is_in([10, 20])).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([5, 6, 7, 8, 9, 15, 16, 17]),
                "physical_index": [10, 11, 12, 13, 14, 20, 21, 24],
                "offset": [10, 10, 10, 10, 10, 20, 20, 20],
            },
            schema={
                "index": pl.get_index_type(),
                "physical_index": pl.UInt32,
                "offset": pl.Int64,
            },
        ),
    )


@pytest.mark.slow
@pytest.mark.write_disk
@pytest.mark.parametrize("ideal_morsel_size", [999, 50, 33])
@pytest.mark.parametrize("force_empty_capabilities", [True, False])
def test_scan_row_deletion_single_large(
    tmp_path: Path,
    write_position_deletes: WritePositionDeletes,
    ideal_morsel_size: int,
    force_empty_capabilities: bool,
) -> None:
    """Delete 51 of 100 rows from a single file under several morsel sizes."""
    path = tmp_path / "data.parquet"
    pl.DataFrame({"physical_index": range(100)}).write_parquet(path)

    positions = pl.Series([
        0, 1, 2, 3, 4, 5, 6, 7, 8, 22,
        23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
        33, 45, 46, 47, 48, 49, 50, 51, 52, 53,
        54, 64, 65, 66, 67, 68, 69, 70, 71, 79,
        80, 81, 82, 83, 84, 90, 91, 92, 93, 97,
        98
    ]) # fmt: skip

    deletion_positions_path = write_position_deletes(positions)

    script_args: list[str] = [
        str(ideal_morsel_size),
        "1" if force_empty_capabilities else "0",
        str(path),
        deletion_positions_path,
    ]

    # Use a process to ensure ideal morsel size is set correctly: the child sets
    # the POLARS_* environment variables before it imports polars.
    result = subprocess.run(
        [
            sys.executable,
            "-c",
            """\
import os
import sys

(
    _,
    ideal_morsel_size,
    force_empty_capabilities,
    data_file_path,
    deletion_positions_path,
) = sys.argv

os.environ["POLARS_VERBOSE"] = "0"
os.environ["POLARS_MAX_THREADS"] = "1"
os.environ["POLARS_IDEAL_MORSEL_SIZE"] = ideal_morsel_size
os.environ["POLARS_FORCE_EMPTY_READER_CAPABILITIES"] = force_empty_capabilities

import polars as pl
from polars.testing import assert_frame_equal

full_expected_physical = [
    9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
    19, 20, 21, 34, 35, 36, 37, 38, 39, 40,
    41, 42, 43, 44, 55, 56, 57, 58, 59, 60,
    61, 62, 63, 72, 73, 74, 75, 76, 77, 78,
    85, 86, 87, 88, 89, 94, 95, 96, 99
] # fmt: skip

deletion_files = (
    "iceberg-position-delete",
    {0: [deletion_positions_path]},
)

q = pl.scan_parquet(data_file_path, _deletion_files=deletion_files).with_row_index()

assert_frame_equal(
    q.collect(),
    pl.DataFrame({"physical_index": full_expected_physical}).with_row_index(),
)

assert_frame_equal(
    q.tail(999).collect(),
    pl.DataFrame({"physical_index": full_expected_physical}).with_row_index(),
)

# Note: The negative slice is important here. Otherwise row_index does not get
# lowered into the post-apply pipeline.
for negative_offset in range(1, 49):
    assert_frame_equal(
        q.tail(negative_offset).collect(),
        pl.DataFrame(
            {"physical_index": full_expected_physical[-negative_offset:]}
        ).with_row_index(offset=49 - negative_offset),
    )

assert_frame_equal(
    q.slice(20).collect(),
    pl.DataFrame({"physical_index": full_expected_physical[20:]}).with_row_index(
        offset=20
    ),
)

print("OK", end="")
""",
            *script_args,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        check=False,
    )

    # Check the result ourselves rather than via `check_output`, so a failing
    # child reports its captured output (e.g. a traceback) in the message.
    child_output = result.stdout.decode(errors="replace")
    assert result.returncode == 0, child_output
    assert result.stdout == b"OK", child_output


@pytest.mark.write_disk
def test_scan_row_deletion_skips_file_with_all_rows_deleted(
    tmp_path: Path,
    write_position_deletes: WritePositionDeletes,
) -> None:
    """A data file whose rows are all deleted must not have its data read."""
    # Create our own copy because we mutate one of the data files
    data_files_path = tmp_path / "data-files"
    _create_data_files(data_files_path)

    def remove_data(path: Path) -> None:
        """Corrupt a parquet file by zeroing everything before its metadata.

        In the Parquet layout, the last 8 bytes are the 4-byte little-endian
        metadata length followed by the 4-byte magic. The metadata and those
        8 bytes are kept intact, so the schema and row count remain readable
        while the row-group data is not.
        """
        v = path.read_bytes()
        metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")
        # Write zeros + footer in a single call: `write_bytes` truncates the
        # file, so two separate writes would leave only the footer behind.
        path.write_bytes(
            b"\x00" * (len(v) - metadata_and_footer_len)
            + v[-metadata_and_footer_len:]
        )

    remove_data(data_files_path / "offset=05/data.parquet")

    q = pl.scan_parquet(
        data_files_path / "offset=05/data.parquet", hive_partitioning=False
    )

    # Baseline: The metadata is readable but the row groups are not

    assert q.collect_schema() == {"physical_index": pl.UInt32}
    assert q.select(pl.len()).collect().item() == 5

    with pytest.raises(pl.exceptions.ComputeError, match="Invalid thrift"):
        q.collect()

    q = pl.scan_parquet(data_files_path, hive_partitioning=False)

    with pytest.raises(pl.exceptions.ComputeError, match="Invalid thrift"):
        q.collect()

    # Deleting every row (0..4) of the corrupted file (file index 1, split
    # across two delete files) must let the scan succeed without reading it.
    q = pl.scan_parquet(
        data_files_path,
        _deletion_files=(
            "iceberg-position-delete",
            {
                1: [
                    write_position_deletes(pl.Series([0, 1, 2])),
                    write_position_deletes(pl.Series([3, 4])),
                ]
            },
        ),
        hive_partitioning=False,
    )

    expect = pl.DataFrame(
        {
            "index": [
                0, 1, 2, 3, 4,
                5, 6, 7, 8, 9,
                10, 11, 12, 13, 14,
                15, 16, 17, 18, 19,
            ],
            "physical_index": [
                0, 1, 2, 3, 4,
                10, 11, 12, 13, 14,
                15, 16, 17, 18, 19,
                20, 21, 22, 23, 24,
            ],
        },
        schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
    ) # fmt: skip

    assert_frame_equal(q.collect(), expect.drop("index"))
    assert_frame_equal(q.with_row_index().collect(), expect)

    expect = pl.DataFrame(
        {
            "index": [
                10, 11, 12, 13, 14,
                15, 16, 17, 18, 19,
            ],
            "physical_index": [
                15, 16, 17, 18, 19,
                20, 21, 22, 23, 24,
            ],
        },
        schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
    ) # fmt: skip

    assert_frame_equal(q.slice(10).collect(), expect.drop("index"))
    assert_frame_equal(q.with_row_index().slice(10).collect(), expect)