GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_scan_row_deletion.py
from __future__ import annotations

import subprocess
import sys
from typing import TYPE_CHECKING

import pytest

import polars as pl
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path


@pytest.fixture(scope="session")
def data_files_path(tmp_path_factory: pytest.TempPathFactory) -> Path:
    """Creates: 5 parquet files, 5 rows each."""
    tmp_path = tmp_path_factory.mktemp("data-files")
    _create_data_files(tmp_path)

    return tmp_path


@pytest.fixture
def write_position_deletes(tmp_path: Path) -> WritePositionDeletes:
    return WritePositionDeletes(tmp_path=tmp_path)
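

# Each WritePositionDeletes call writes one Iceberg-style positional delete
# file: a parquet file with a `file_path` and a `pos` column. `file_path` is
# written as an empty string because `_deletion_files` matches delete files to
# data files by scan index rather than by path (see the tests below).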
class WritePositionDeletes:  # noqa: D101
    def __init__(self, *, tmp_path: Path) -> None:
        self.tmp_path = tmp_path
        self.i = 0

    def __call__(self, positions: pl.Series) -> str:
        path = self.tmp_path / f"{self.i}"

        (
            positions.alias("pos")
            .to_frame()
            .select(pl.lit("").alias("file_path"), "pos")
            .write_parquet(path)
        )

        self.i += 1

        return str(path)


# 5 files x 5 rows each. Contains `physical_index` [0, 1, .., 24].
def _create_data_files(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True, parents=True)
    df = pl.select(physical_index=pl.int_range(25, dtype=pl.UInt32))

    parts = []

    for i in [0, 5, 10, 15, 20]:
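        # Zero-pad the directory name (offset=00, offset=05, ...) so that
        # lexicographic path ordering matches the numeric slice order.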
        O = "0" if i < 10 else ""  # noqa: E741
        path = tmp_path / f"offset={O}{i}/data.parquet"
        path.parent.mkdir(exist_ok=True, parents=True)

        part_df = df.slice(i, 5)
        part_df.write_parquet(path)

        parts.append(part_df.with_columns(offset=pl.lit(i, dtype=pl.Int64)))

    assert_frame_equal(pl.scan_parquet(tmp_path).collect(), pl.concat(parts))
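

# In `_deletion_files` below, the first tuple element names the deletion file
# format ("iceberg-position-delete" is the only format used here), and the
# second maps each data file's index within the scan to a list of positional
# delete parquet files.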
@pytest.mark.parametrize("row_index_offset", [0, 27, 38, 73])
def test_scan_row_deletions(
    data_files_path: Path,
    write_position_deletes: WritePositionDeletes,
    row_index_offset: int,
) -> None:
    deletion_files = (
        "iceberg-position-delete",
        {
            0: [
                write_position_deletes(pl.Series([1, 2])),
            ],
            1: [
                write_position_deletes(pl.Series([0, 1, 2])),
            ],
            4: [
                write_position_deletes(pl.Series([2, 3])),
            ],
        },
    )
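
    # Files 0, 1 and 4 drop 2, 3 and 2 rows respectively, so 25 - 7 = 18 rows
    # survive (physical_index 1, 2, 5, 6, 7, 22 and 23 are deleted).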

    def apply_row_index_offset(values: list[int]) -> list[int]:
        return [x + row_index_offset for x in values]

    q = pl.scan_parquet(
        data_files_path,
        _deletion_files=deletion_files,  # type: ignore[arg-type]
        hive_partitioning=False,
    ).with_row_index(offset=row_index_offset)

    assert q.select(pl.len()).collect().item() == 18

    assert_frame_equal(
        q.collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([
                    0, 1, 2,
                    3, 4,
                    5, 6, 7, 8, 9,
                    10, 11, 12, 13, 14,
                    15, 16, 17
                ]),
                "physical_index": [
                    0, 3, 4,
                    8, 9,
                    10, 11, 12, 13, 14,
                    15, 16, 17, 18, 19,
                    20, 21, 24
                ],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        )
    )  # fmt: skip

    # head()

    assert_frame_equal(
        q.head(3).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([0, 1, 2]),
                "physical_index": [0, 3, 4],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    assert_frame_equal(
        q.head(10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
                "physical_index": [0, 3, 4, 8, 9, 10, 11, 12, 13, 14],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    # tail()

    assert_frame_equal(
        q.tail(3).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([15, 16, 17]),
                "physical_index": [20, 21, 24],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    assert_frame_equal(
        q.tail(10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset(
                    [8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
                ),
                "physical_index": [
                    13, 14, 15, 16, 17, 18, 19, 20, 21,
                    24
                ],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )  # fmt: skip

    # slice(positive_offset)

    assert_frame_equal(
        q.slice(2, 10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
                "physical_index": [4, 8, 9, 10, 11, 12, 13, 14, 15, 16],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    assert_frame_equal(
        q.slice(5, 10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([5, 6, 7, 8, 9, 10, 11, 12, 13, 14]),
                "physical_index": [10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )
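
    # Requesting 10 rows from offset 10 returns only the 8 remaining rows.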
    assert_frame_equal(
        q.slice(10, 10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([10, 11, 12, 13, 14, 15, 16, 17]),
                "physical_index": [15, 16, 17, 18, 19, 20, 21, 24],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    # slice(negative_offset)
    assert_frame_equal(
        q.slice(-3, 2).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([15, 16]),
                "physical_index": [20, 21],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )
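
    # Offset -23 lands 5 rows before the start of the 18-row output, so only
    # the last 5 of the requested 10 rows are in range.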
    assert_frame_equal(
        q.slice(-23, 10).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([0, 1, 2, 3, 4]),
                "physical_index": [0, 3, 4, 8, 9],
            },
            schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
        ),
    )

    # filter: skip_files
    # Re-scan with hive partitioning enabled (the default) so that `offset` is
    # available as a column and the predicate can skip whole files.
    q = pl.scan_parquet(
        data_files_path,
        _deletion_files=deletion_files,  # type: ignore[arg-type]
    ).with_row_index(offset=row_index_offset)
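
    # Skipping files must not disturb row-index accounting: physical rows
    # 10-14 keep indices 5-9 even though the two earlier files are skipped,
    # i.e. deletions are applied before the filter.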
    assert_frame_equal(
        q.filter(pl.col("offset").is_in([10, 20])).collect(),
        pl.DataFrame(
            {
                "index": apply_row_index_offset([5, 6, 7, 8, 9, 15, 16, 17]),
                "physical_index": [10, 11, 12, 13, 14, 20, 21, 24],
                "offset": [10, 10, 10, 10, 10, 20, 20, 20],
            },
            schema={
                "index": pl.get_index_type(),
                "physical_index": pl.UInt32,
                "offset": pl.Int64,
            },
        ),
    )


@pytest.mark.slow
@pytest.mark.write_disk
@pytest.mark.parametrize("ideal_morsel_size", [999, 50, 33])
@pytest.mark.parametrize("force_empty_capabilities", [True, False])
def test_scan_row_deletion_single_large(
    tmp_path: Path,
    write_position_deletes: WritePositionDeletes,
    ideal_morsel_size: int,
    force_empty_capabilities: bool,
) -> None:
    path = tmp_path / "data.parquet"
    pl.DataFrame({"physical_index": range(100)}).write_parquet(path)

    positions = pl.Series([
        0, 1, 2, 3, 4, 5, 6, 7, 8, 22,
        23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
        33, 45, 46, 47, 48, 49, 50, 51, 52, 53,
        54, 64, 65, 66, 67, 68, 69, 70, 71, 79,
        80, 81, 82, 83, 84, 90, 91, 92, 93, 97,
        98
    ])  # fmt: skip
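
    # 51 of the 100 positions are deleted, so 49 rows survive.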

    deletion_positions_path = write_position_deletes(positions)

    script_args: list[str] = [
        str(ideal_morsel_size),
        "1" if force_empty_capabilities else "0",
        str(path),
        deletion_positions_path,
    ]

    # Use a process to ensure the ideal morsel size is set correctly: the
    # POLARS_* environment variables below must be in place before polars is
    # imported.
    out = subprocess.check_output(
        [
            sys.executable,
            "-c",
            """\
import os
import sys

(
    _,
    ideal_morsel_size,
    force_empty_capabilities,
    data_file_path,
    deletion_positions_path,
) = sys.argv

os.environ["POLARS_VERBOSE"] = "0"
os.environ["POLARS_MAX_THREADS"] = "1"
os.environ["POLARS_IDEAL_MORSEL_SIZE"] = ideal_morsel_size
os.environ["POLARS_FORCE_EMPTY_READER_CAPABILITIES"] = force_empty_capabilities
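
# POLARS_IDEAL_MORSEL_SIZE shrinks the streaming engine's morsel size so this
# single 100-row file is processed as many small morsels; setting
# POLARS_FORCE_EMPTY_READER_CAPABILITIES presumably forces the reader to
# advertise no native slice/deletion support, exercising the generic
# post-apply path.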

import polars as pl
from polars.testing import assert_frame_equal

full_expected_physical = [
    9, 10, 11, 12, 13, 14, 15, 16, 17, 18,
    19, 20, 21, 34, 35, 36, 37, 38, 39, 40,
    41, 42, 43, 44, 55, 56, 57, 58, 59, 60,
    61, 62, 63, 72, 73, 74, 75, 76, 77, 78,
    85, 86, 87, 88, 89, 94, 95, 96, 99
]  # fmt: skip

deletion_files = (
    "iceberg-position-delete",
    {0: [deletion_positions_path]},
)

q = pl.scan_parquet(data_file_path, _deletion_files=deletion_files).with_row_index()

assert_frame_equal(
    q.collect(),
    pl.DataFrame({"physical_index": full_expected_physical}).with_row_index(),
)

assert_frame_equal(
    q.tail(999).collect(),
    pl.DataFrame({"physical_index": full_expected_physical}).with_row_index(),
)

# Note: The negative slice is important here. Otherwise row_index does not get
# lowered into the post-apply pipeline.
for negative_offset in range(1, 49):
    assert_frame_equal(
        q.tail(negative_offset).collect(),
        pl.DataFrame(
            {"physical_index": full_expected_physical[-negative_offset:]}
        ).with_row_index(offset=49 - negative_offset),
    )

assert_frame_equal(
    q.slice(20).collect(),
    pl.DataFrame({"physical_index": full_expected_physical[20:]}).with_row_index(
        offset=20
    ),
)

print("OK", end="")
""",
            *script_args,
        ],
        stderr=subprocess.STDOUT,
    )

    assert out == b"OK"
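
    # With POLARS_VERBOSE=0 and stderr captured into stdout, the child's
    # entire output must be exactly the final print; any warning or error
    # would break the equality.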


@pytest.mark.write_disk
def test_scan_row_deletion_skips_file_with_all_rows_deleted(
    tmp_path: Path,
    write_position_deletes: WritePositionDeletes,
) -> None:
    # Create our own copy because we mutate one of the data files
    data_files_path = tmp_path / "data-files"
    _create_data_files(data_files_path)

    # Corrupt a parquet file
    def remove_data(path: Path) -> None:
        v = path.read_bytes()
        metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")
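        # The last 8 bytes of a parquet file are a 4-byte little-endian
        # metadata length followed by the "PAR1" magic, so this is the total
        # size of the thrift metadata plus footer.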
        # Zero out the data pages in a single write, keeping the file length
        # and the metadata/footer intact, so the schema stays readable while
        # reading any row group fails.
        path.write_bytes(
            b"\x00" * (len(v) - metadata_and_footer_len)
            + v[-metadata_and_footer_len:]
        )

    remove_data(data_files_path / "offset=05/data.parquet")

    q = pl.scan_parquet(
        data_files_path / "offset=05/data.parquet", hive_partitioning=False
    )

    # Baseline: The metadata is readable but the row groups are not

    assert q.collect_schema() == {"physical_index": pl.UInt32}
    assert q.select(pl.len()).collect().item() == 5

    with pytest.raises(pl.exceptions.ComputeError, match="Invalid thrift"):
        q.collect()

    q = pl.scan_parquet(data_files_path, hive_partitioning=False)

    with pytest.raises(pl.exceptions.ComputeError, match="Invalid thrift"):
        q.collect()
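
    # With all 5 rows of the corrupted file (scan index 1) deleted across two
    # delete files, the reader never touches its data pages and the scan
    # succeeds.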

    q = pl.scan_parquet(
        data_files_path,
        _deletion_files=(
            "iceberg-position-delete",
            {
                1: [
                    write_position_deletes(pl.Series([0, 1, 2])),
                    write_position_deletes(pl.Series([3, 4])),
                ]
            },
        ),
        hive_partitioning=False,
    )

    expect = pl.DataFrame(
        {
            "index": [
                0, 1, 2, 3, 4,
                5, 6, 7, 8, 9,
                10, 11, 12, 13, 14,
                15, 16, 17, 18, 19,
            ],
            "physical_index": [
                0, 1, 2, 3, 4,
                10, 11, 12, 13, 14,
                15, 16, 17, 18, 19,
                20, 21, 22, 23, 24,
            ],
        },
        schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
    )  # fmt: skip

    assert_frame_equal(q.collect(), expect.drop("index"))
    assert_frame_equal(q.with_row_index().collect(), expect)
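
    # slice(10) starts past the fully-deleted file; offset resolution must
    # account for its zero surviving rows.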

    expect = pl.DataFrame(
        {
            "index": [
                10, 11, 12, 13, 14,
                15, 16, 17, 18, 19,
            ],
            "physical_index": [
                15, 16, 17, 18, 19,
                20, 21, 22, 23, 24,
            ],
        },
        schema={"index": pl.get_index_type(), "physical_index": pl.UInt32},
    )  # fmt: skip

    assert_frame_equal(q.slice(10).collect(), expect.drop("index"))
    assert_frame_equal(q.with_row_index().slice(10).collect(), expect)