GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_lazy_parquet.py
from __future__ import annotations

import io
import subprocess
import sys
from collections import OrderedDict
from pathlib import Path
from threading import Thread
from typing import TYPE_CHECKING, Any

import pandas as pd
import pyarrow.parquet as pq
import pytest

import polars as pl
from polars.exceptions import ComputeError
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from polars._typing import ParallelStrategy


@pytest.fixture
def parquet_file_path(io_files_path: Path) -> Path:
    return io_files_path / "small.parquet"


@pytest.fixture
def foods_parquet_path(io_files_path: Path) -> Path:
    return io_files_path / "foods1.parquet"


def test_scan_parquet(parquet_file_path: Path) -> None:
    df = pl.scan_parquet(parquet_file_path)
    assert df.collect().shape == (4, 3)


def test_scan_parquet_local_with_async(
    monkeypatch: Any, foods_parquet_path: Path
) -> None:
    monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
    pl.scan_parquet(foods_parquet_path.relative_to(Path.cwd())).head(1).collect()


def test_row_index(foods_parquet_path: Path) -> None:
    df = pl.read_parquet(foods_parquet_path, row_index_name="row_index")
    assert df["row_index"].to_list() == list(range(27))

    df = (
        pl.scan_parquet(foods_parquet_path, row_index_name="row_index")
        .filter(pl.col("category") == pl.lit("vegetables"))
        .collect()
    )

    assert df["row_index"].to_list() == [0, 6, 11, 13, 14, 20, 25]

    df = (
        pl.scan_parquet(foods_parquet_path, row_index_name="row_index")
        .with_row_index("foo", 10)
        .filter(pl.col("category") == pl.lit("vegetables"))
        .collect()
    )

    assert df["foo"].to_list() == [10, 16, 21, 23, 24, 30, 35]


def test_row_index_len_16543(foods_parquet_path: Path) -> None:
    q = pl.scan_parquet(foods_parquet_path).with_row_index()
    assert q.select(pl.all()).select(pl.len()).collect().item() == 27


@pytest.mark.write_disk
def test_categorical_parquet_statistics(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame(
        {
            "book": [
                "bookA",
                "bookA",
                "bookB",
                "bookA",
                "bookA",
                "bookC",
                "bookC",
                "bookC",
            ],
            "transaction_id": [1, 2, 3, 4, 5, 6, 7, 8],
            "user": ["bob", "bob", "bob", "tim", "lucy", "lucy", "lucy", "lucy"],
        }
    ).with_columns(pl.col("book").cast(pl.Categorical))

    file_path = tmp_path / "books.parquet"
    df.write_parquet(file_path, statistics=True)

    parallel_options: list[ParallelStrategy] = [
        "auto",
        "columns",
        "row_groups",
        "none",
    ]
    for par in parallel_options:
        df = (
            pl.scan_parquet(file_path, parallel=par)
            .filter(pl.col("book") == "bookA")
            .collect()
        )
        assert df.shape == (4, 3)


@pytest.mark.write_disk
def test_parquet_eq_stats(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "stats.parquet"

    df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})
    df1.to_parquet(file_path, engine="pyarrow")
    df = pl.scan_parquet(file_path).filter(pl.col("a") == 4).collect()
    assert df["a"].to_list() == [4.0, 4.0]

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") == 2).select(pl.col("a").sum())
    ).collect()[0, "a"] == 2.0

    assert pl.scan_parquet(file_path).filter(pl.col("a") == 5).collect().shape == (
        2,
        1,
    )


@pytest.mark.write_disk
def test_parquet_is_in_stats(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "stats.parquet"

    df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})
    df1.to_parquet(file_path, engine="pyarrow")
    df = pl.scan_parquet(file_path).filter(pl.col("a").is_in([5])).collect()
    assert df["a"].to_list() == [5.0, 5.0]

    assert (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_in([5]))
        .select(pl.col("a").sum())
    ).collect()[0, "a"] == 10.0

    assert (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_in([1, 2, 3]))
        .select(pl.col("a").sum())
    ).collect()[0, "a"] == 9.0

    assert (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_in([1, 2, 3]))
        .select(pl.col("a").sum())
    ).collect()[0, "a"] == 9.0

    assert (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_in([5]))
        .select(pl.col("a").sum())
    ).collect()[0, "a"] == 10.0

    assert pl.scan_parquet(file_path).filter(
        pl.col("a").is_in([1, 2, 3, 4, 5])
    ).collect().shape == (8, 1)


@pytest.mark.write_disk
def test_parquet_stats(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "binary_stats.parquet"

    df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})
    df1.to_parquet(file_path, engine="pyarrow")
    df = (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_not_null() & (pl.col("a") > 4))
        .collect()
    )
    assert df["a"].to_list() == [5.0, 5.0]

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") > 4).select(pl.col("a").sum())
    ).collect()[0, "a"] == 10.0

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") < 4).select(pl.col("a").sum())
    ).collect()[0, "a"] == 9.0

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") < 4).select(pl.col("a").sum())
    ).collect()[0, "a"] == 9.0

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") > 4).select(pl.col("a").sum())
    ).collect()[0, "a"] == 10.0
    assert pl.scan_parquet(file_path).filter(
        (pl.col("a") * 10) > 5.0
    ).collect().shape == (8, 1)


def test_row_index_schema_parquet(parquet_file_path: Path) -> None:
    assert (
        pl.scan_parquet(str(parquet_file_path), row_index_name="id")
        .select(["id", "b"])
        .collect()
    ).dtypes == [pl.UInt32, pl.String]


@pytest.mark.may_fail_cloud  # reason: inspects logs
@pytest.mark.write_disk
def test_parquet_is_in_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    monkeypatch.setenv("POLARS_VERBOSE", "1")

    df = pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).with_columns(
        (pl.col("idx") // 25).alias("part")
    )
    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
    assert df.n_chunks("all") == [4, 4]

    file_path = tmp_path / "stats.parquet"
    df.write_parquet(file_path, statistics=True, use_pyarrow=False)

    file_path = tmp_path / "stats.parquet"
    df.write_parquet(file_path, statistics=True, use_pyarrow=False)

    for pred in [
        pl.col("idx").is_in([150, 200, 300]),
        pl.col("idx").is_in([5, 250, 350]),
    ]:
        result = pl.scan_parquet(file_path).filter(pred).collect()
        assert_frame_equal(result, df.filter(pred))

    captured = capfd.readouterr().err
    assert "Predicate pushdown: reading 1 / 1 row groups" in captured
    assert "Predicate pushdown: reading 0 / 1 row groups" in captured


@pytest.mark.may_fail_cloud  # reason: inspects logs
@pytest.mark.write_disk
def test_parquet_statistics(monkeypatch: Any, capfd: Any, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    monkeypatch.setenv("POLARS_VERBOSE", "1")

    df = pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).with_columns(
        (pl.col("idx") // 25).alias("part")
    )
    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
    assert df.n_chunks("all") == [4, 4]

    file_path = tmp_path / "stats.parquet"
    df.write_parquet(file_path, statistics=True, use_pyarrow=False, row_group_size=50)

    for pred in [
        pl.col("idx") < 50,
        pl.col("idx") > 50,
        pl.col("idx").null_count() != 0,
        pl.col("idx").null_count() == 0,
        pl.col("idx").min() == pl.col("part").null_count(),
    ]:
        result = pl.scan_parquet(file_path).filter(pred).collect()
        assert_frame_equal(result, df.filter(pred))

    captured = capfd.readouterr().err

    assert "Predicate pushdown: reading 1 / 2 row groups" in captured


@pytest.mark.write_disk
def test_categorical(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame(
        [
            pl.Series("name", ["Bob", "Alice", "Bob"], pl.Categorical),
            pl.Series("amount", [100, 200, 300]),
        ]
    )

    file_path = tmp_path / "categorical.parquet"
    df.write_parquet(file_path)

    result = (
        pl.scan_parquet(file_path)
        .group_by("name")
        .agg(pl.col("amount").sum())
        .collect()
        .sort("name")
    )
    expected = pl.DataFrame(
        {"name": ["Alice", "Bob"], "amount": [200, 400]},
        schema_overrides={"name": pl.Categorical},
    )
    assert_frame_equal(result, expected)


def test_glob_n_rows(io_files_path: Path) -> None:
    file_path = io_files_path / "foods*.parquet"
    df = pl.scan_parquet(file_path, n_rows=40).collect()

    # 27 rows from foods1.parquet and 13 from foods2.parquet
    assert df.shape == (40, 4)

    # take first and last rows
    assert df[[0, 39]].to_dict(as_series=False) == {
        "category": ["vegetables", "seafood"],
        "calories": [45, 146],
        "fats_g": [0.5, 6.0],
        "sugars_g": [2, 2],
    }


@pytest.mark.write_disk
def test_parquet_statistics_filter_9925(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "codes.parquet"
    df = pl.DataFrame({"code": [300964, 300972, 500_000, 26]})
    df.write_parquet(file_path, statistics=True)

    q = pl.scan_parquet(file_path).filter(
        (pl.col("code").floordiv(100_000)).is_in([0, 3])
    )
    assert q.collect().to_dict(as_series=False) == {"code": [300964, 300972, 26]}


@pytest.mark.write_disk
def test_parquet_statistics_filter_11069(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "foo.parquet"
    pl.DataFrame({"x": [1, None]}).write_parquet(file_path, statistics=False)

    result = pl.scan_parquet(file_path).filter(pl.col("x").is_null()).collect()
    expected = {"x": [None]}
    assert result.to_dict(as_series=False) == expected


def test_parquet_list_arg(io_files_path: Path) -> None:
    first = io_files_path / "foods1.parquet"
    second = io_files_path / "foods2.parquet"

    df = pl.scan_parquet(source=[first, second]).collect()
    assert df.shape == (54, 4)
    assert df.row(-1) == ("seafood", 194, 12.0, 1)
    assert df.row(0) == ("vegetables", 45, 0.5, 2)


@pytest.mark.write_disk
def test_parquet_many_row_groups_12297(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "foo.parquet"
    df = pl.DataFrame({"x": range(100)})
    df.write_parquet(file_path, row_group_size=5, use_pyarrow=True)
    assert_frame_equal(pl.scan_parquet(file_path).collect(), df)


@pytest.mark.write_disk
def test_row_index_empty_file(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "test.parquet"
    df = pl.DataFrame({"a": []}, schema={"a": pl.Float32})
    df.write_parquet(file_path)
    result = pl.scan_parquet(file_path).with_row_index("idx").collect()
    assert result.schema == OrderedDict([("idx", pl.UInt32), ("a", pl.Float32)])


@pytest.mark.write_disk
def test_io_struct_async_12500(tmp_path: Path) -> None:
    file_path = tmp_path / "test.parquet"
    pl.DataFrame(
        [
            pl.Series("c1", [{"a": "foo", "b": "bar"}], dtype=pl.Struct),
            pl.Series("c2", [18]),
        ]
    ).write_parquet(file_path)
    assert pl.scan_parquet(file_path).select("c1").collect().to_dict(
        as_series=False
    ) == {"c1": [{"a": "foo", "b": "bar"}]}


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_different_schema(tmp_path: Path, streaming: bool) -> None:
    # Schema is different but the projected columns are same dtype.
    f1 = tmp_path / "a.parquet"
    f2 = tmp_path / "b.parquet"
    a = pl.DataFrame({"a": [1.0], "b": "a"})

    b = pl.DataFrame({"a": [1], "b": "a"})

    a.write_parquet(f1)
    b.write_parquet(f2)
    assert pl.scan_parquet([f1, f2]).select("b").collect(
        engine="streaming" if streaming else "in-memory"
    ).columns == ["b"]


@pytest.mark.write_disk
def test_nested_slice_12480(tmp_path: Path) -> None:
    path = tmp_path / "data.parquet"
    df = pl.select(pl.lit(1).repeat_by(10_000).explode().cast(pl.List(pl.Int32)))

    df.write_parquet(path, use_pyarrow=True, pyarrow_options={"data_page_size": 1})

    assert pl.scan_parquet(path).slice(0, 1).collect().height == 1


@pytest.mark.write_disk
def test_scan_deadlock_rayon_spawn_from_async_15172(
    monkeypatch: Any, tmp_path: Path
) -> None:
    monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
    monkeypatch.setenv("POLARS_MAX_THREADS", "1")
    path = tmp_path / "data.parquet"

    df = pl.Series("x", [1]).to_frame()
    df.write_parquet(path)

    results = [pl.DataFrame()]

    def scan_collect() -> None:
        results[0] = pl.collect_all([pl.scan_parquet(path)])[0]

    # Make sure we don't sit there hanging forever on the broken case
    t = Thread(target=scan_collect, daemon=True)
    t.start()
    t.join(5)

    assert results[0].equals(df)


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_schema_mismatch_panic_17067(tmp_path: Path, streaming: bool) -> None:
    pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).write_parquet(tmp_path / "1.parquet")
    pl.DataFrame({"c": [1, 2, 3], "d": [4, 5, 6]}).write_parquet(tmp_path / "2.parquet")

    if streaming:
        with pytest.raises(pl.exceptions.SchemaError):
            pl.scan_parquet(tmp_path).collect(engine="streaming")
    else:
        with pytest.raises(pl.exceptions.SchemaError):
            pl.scan_parquet(tmp_path).collect(engine="in-memory")


@pytest.mark.write_disk
def test_predicate_push_down_categorical_17744(tmp_path: Path) -> None:
    path = tmp_path / "1"

    df = pl.DataFrame(
        data={
            "n": [1, 2, 3],
            "ccy": ["USD", "JPY", "EUR"],
        },
        schema_overrides={"ccy": pl.Categorical("lexical")},
    )
    df.write_parquet(path)
    expect = df.head(1).with_columns(pl.col(pl.Categorical).cast(pl.String))

    lf = pl.scan_parquet(path)

    for predicate in [pl.col("ccy") == "USD", pl.col("ccy").is_in(["USD"])]:
        assert_frame_equal(
            lf.filter(predicate)
            .with_columns(pl.col(pl.Categorical).cast(pl.String))
            .collect(),
            expect,
        )


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_slice_pushdown_non_zero_offset(
    tmp_path: Path, streaming: bool
) -> None:
    paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]
    dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]

    for df, p in zip(dfs, paths):
        df.write_parquet(p)

    # Parquet files containing only the metadata - i.e. the data parts are removed.
    # Used to test that a reader doesn't try to read any data.
    def trim_to_metadata(path: str | Path) -> None:
        path = Path(path)
        v = path.read_bytes()
        metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")
        path.write_bytes(v[-metadata_and_footer_len:])

    trim_to_metadata(paths[0])
    trim_to_metadata(paths[2])

    # Check baseline:
    # * Metadata can be read without error
    assert pl.read_parquet_schema(paths[0]) == dfs[0].schema
    # * Attempting to read any data will error
    with pytest.raises(ComputeError):
        pl.scan_parquet(paths[0]).collect(
            engine="streaming" if streaming else "in-memory"
        )

    df = dfs[1]
    assert_frame_equal(
        pl.scan_parquet(paths)
        .slice(1, 1)
        .collect(engine="streaming" if streaming else "in-memory"),
        df,
    )
    assert_frame_equal(
        pl.scan_parquet(paths[1:])
        .head(1)
        .collect(engine="streaming" if streaming else "in-memory"),
        df,
    )
    assert_frame_equal(
        (
            pl.scan_parquet([paths[1], paths[1], paths[1]])
            .with_row_index()
            .slice(1, 1)
            .collect(engine="streaming" if streaming else "in-memory")
        ),
        df.with_row_index(offset=1),
    )
    assert_frame_equal(
        (
            pl.scan_parquet([paths[1], paths[1], paths[1]])
            .with_row_index(offset=1)
            .slice(1, 1)
            .collect(engine="streaming" if streaming else "in-memory")
        ),
        df.with_row_index(offset=2),
    )
    assert_frame_equal(
        pl.scan_parquet(paths[1:])
        .head(1)
        .collect(engine="streaming" if streaming else "in-memory"),
        df,
    )

    # Negative slice unsupported in streaming
    if not streaming:
        assert_frame_equal(pl.scan_parquet(paths).slice(-2, 1).collect(), df)
        assert_frame_equal(pl.scan_parquet(paths[:2]).tail(1).collect(), df)
        assert_frame_equal(
            pl.scan_parquet(paths[1:]).slice(-99, 1).collect(), df.clear()
        )

    path = tmp_path / "data"
    df = pl.select(x=pl.int_range(0, 50))
    df.write_parquet(path)
    assert_frame_equal(pl.scan_parquet(path).slice(-100, 75).collect(), df.head(25))
    assert_frame_equal(
        pl.scan_parquet([path, path]).with_row_index().slice(-25, 100).collect(),
        pl.concat([df, df]).with_row_index().slice(75),
    )
    assert_frame_equal(
        pl.scan_parquet([path, path])
        .with_row_index(offset=10)
        .slice(-25, 100)
        .collect(),
        pl.concat([df, df]).with_row_index(offset=10).slice(75),
    )
    assert_frame_equal(
        pl.scan_parquet(path).slice(-1, (1 << 32) - 1).collect(), df.tail(1)
    )


@pytest.mark.write_disk
def test_predicate_slice_pushdown_row_index_20485(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "slice_pushdown.parquet"
    row_group_size = 100000
    num_row_groups = 3

    df = pl.select(ref=pl.int_range(num_row_groups * row_group_size))
    df.write_parquet(file_path, row_group_size=row_group_size)

    # Use a slice that starts near the end of one row group and extends into the next
    # to test handling of slices that span multiple row groups.
    slice_start = 199995
    slice_len = 10
    ldf = pl.scan_parquet(file_path)
    sliced_df = ldf.with_row_index().slice(slice_start, slice_len).collect()
    sliced_df_no_pushdown = (
        ldf.with_row_index()
        .slice(slice_start, slice_len)
        .collect(optimizations=pl.QueryOptFlags(slice_pushdown=False))
    )

    expected_index = list(range(slice_start, slice_start + slice_len))
    actual_index = list(sliced_df["index"])
    assert actual_index == expected_index

    assert_frame_equal(sliced_df, sliced_df_no_pushdown)


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_row_groups_shift_bug_18739(tmp_path: Path, streaming: bool) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "data.bin"

    df = pl.DataFrame({"id": range(100)})
    df.write_parquet(path, row_group_size=1)

    lf = pl.scan_parquet(path)
    assert_frame_equal(df, lf.collect(engine="streaming" if streaming else "in-memory"))


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_dsl2ir_cached_metadata(tmp_path: Path, streaming: bool) -> None:
    df = pl.DataFrame({"x": 1})
    path = tmp_path / "1"
    df.write_parquet(path)

    lf = pl.scan_parquet(path)
    assert_frame_equal(lf.collect(), df)

    # Removes the metadata portion of the parquet file.
    # Used to test that a reader doesn't try to read the metadata.
    def remove_metadata(path: str | Path) -> None:
        path = Path(path)
        v = path.read_bytes()
        metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")
        path.write_bytes(v[:-metadata_and_footer_len] + b"PAR1")

    remove_metadata(path)
    assert_frame_equal(lf.collect(engine="streaming" if streaming else "in-memory"), df)


@pytest.mark.write_disk
def test_parquet_unaligned_schema_read(tmp_path: Path) -> None:
    dfs = [
        pl.DataFrame({"a": 1, "b": 10}),
        pl.DataFrame({"b": 11, "a": 2}),
        pl.DataFrame({"x": 3, "a": 3, "y": 3, "b": 12}),
    ]

    paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]

    for df, path in zip(dfs, paths):
        df.write_parquet(path)

    lf = pl.scan_parquet(paths, extra_columns="ignore")

    assert_frame_equal(
        lf.select("a").collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 2, 3]}),
    )

    assert_frame_equal(
        lf.with_row_index().select("a").collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 2, 3]}),
    )

    assert_frame_equal(
        lf.select("b", "a").collect(engine="in-memory"),
        pl.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}),
    )

    assert_frame_equal(
        pl.scan_parquet(paths[:2]).collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 2], "b": [10, 11]}),
    )

    lf = pl.scan_parquet(paths, extra_columns="raise")

    with pytest.raises(pl.exceptions.SchemaError):
        lf.collect(engine="in-memory")

    with pytest.raises(pl.exceptions.SchemaError):
        lf.with_row_index().collect(engine="in-memory")


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_unaligned_schema_read_dtype_mismatch(
    tmp_path: Path, streaming: bool
) -> None:
    dfs = [
        pl.DataFrame({"a": 1, "b": 10}),
        pl.DataFrame({"b": "11", "a": "2"}),
    ]

    paths = [tmp_path / "1", tmp_path / "2"]

    for df, path in zip(dfs, paths):
        df.write_parquet(path)

    lf = pl.scan_parquet(paths)

    with pytest.raises(pl.exceptions.SchemaError, match="data type mismatch"):
        lf.collect(engine="streaming" if streaming else "in-memory")


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_unaligned_schema_read_missing_cols_from_first(
    tmp_path: Path, streaming: bool
) -> None:
    dfs = [
        pl.DataFrame({"a": 1, "b": 10}),
        pl.DataFrame({"b": 11}),
    ]

    paths = [tmp_path / "1", tmp_path / "2"]

    for df, path in zip(dfs, paths):
        df.write_parquet(path)

    lf = pl.scan_parquet(paths)

    with pytest.raises(
        (pl.exceptions.SchemaError, pl.exceptions.ColumnNotFoundError),
    ):
        lf.collect(engine="streaming" if streaming else "in-memory")


@pytest.mark.parametrize("parallel", ["columns", "row_groups", "prefiltered", "none"])
@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.write_disk
def test_parquet_schema_arg(
    tmp_path: Path,
    parallel: ParallelStrategy,
    streaming: bool,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    dfs = [pl.DataFrame({"a": 1, "b": 1}), pl.DataFrame({"a": 2, "b": 2})]
    paths = [tmp_path / "1", tmp_path / "2"]

    for df, path in zip(dfs, paths):
        df.write_parquet(path)

    schema: dict[str, pl.DataType] = {
        "1": pl.Datetime(time_unit="ms", time_zone="CET"),
        "a": pl.Int64(),
        "b": pl.Int64(),
    }

    # Test `schema` containing an extra column.

    lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)

    with pytest.raises((pl.exceptions.SchemaError, pl.exceptions.ColumnNotFoundError)):
        lf.collect(engine="streaming" if streaming else "in-memory")

    lf = pl.scan_parquet(
        paths, parallel=parallel, schema=schema, missing_columns="insert"
    )

    assert_frame_equal(
        lf.collect(engine="streaming" if streaming else "in-memory"),
        pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),
    )

    # Just one test that `read_parquet` is propagating this argument.
    assert_frame_equal(
        pl.read_parquet(
            paths, parallel=parallel, schema=schema, missing_columns="insert"
        ),
        pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),
    )

    # Issue #19081: If a schema arg is passed, ensure its fields are propagated
    # to the IR, otherwise even if `missing_columns='insert'`, downstream
    # `select()`s etc. will fail with ColumnNotFound if the column is not in
    # the first file.
    lf = pl.scan_parquet(
        paths, parallel=parallel, schema=schema, missing_columns="insert"
    ).select("1")

    s = lf.collect(engine="streaming" if streaming else "in-memory").to_series()
    assert s.len() == 2
    assert s.null_count() == 2

    # Test files containing extra columns not in `schema`

    schema: dict[str, type[pl.DataType]] = {"a": pl.Int64}  # type: ignore[no-redef]

    for missing_columns in ["insert", "raise"]:
        lf = pl.scan_parquet(
            paths,
            parallel=parallel,
            schema=schema,
            missing_columns=missing_columns,  # type: ignore[arg-type]
        )

        with pytest.raises(pl.exceptions.SchemaError):
            lf.collect(engine="streaming" if streaming else "in-memory")

    lf = pl.scan_parquet(
        paths,
        parallel=parallel,
        schema=schema,
        extra_columns="ignore",
    ).select("a")

    assert_frame_equal(
        lf.collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 2]}, schema=schema),
    )

    schema: dict[str, type[pl.DataType]] = {"a": pl.Int64, "b": pl.Int8}  # type: ignore[no-redef]

    lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)

    with pytest.raises(
        pl.exceptions.SchemaError,
        match="data type mismatch for column b: incoming: Int64 != target: Int8",
    ):
        lf.collect(engine="streaming" if streaming else "in-memory")


def test_scan_parquet_empty_path_expansion(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    with pytest.raises(
        ComputeError,
        match=r"failed to retrieve first file schema \(parquet\): "
        r"expanded paths were empty \(path expansion input: "
        ".*Hint: passing a schema can allow this scan to succeed with an empty DataFrame",
    ):
        pl.scan_parquet(tmp_path).collect()

    # Scan succeeds when schema is provided
    assert_frame_equal(
        pl.scan_parquet(tmp_path, schema={"x": pl.Int64}).collect(),
        pl.DataFrame(schema={"x": pl.Int64}),
    )

    assert_frame_equal(
        pl.scan_parquet(tmp_path, schema={"x": pl.Int64}).with_row_index().collect(),
        pl.DataFrame(schema={"x": pl.Int64}).with_row_index(),
    )

    assert_frame_equal(
        pl.scan_parquet(
            tmp_path, schema={"x": pl.Int64}, hive_schema={"h": pl.String}
        ).collect(),
        pl.DataFrame(schema={"x": pl.Int64, "h": pl.String}),
    )

    assert_frame_equal(
        (
            pl.scan_parquet(
                tmp_path, schema={"x": pl.Int64}, hive_schema={"h": pl.String}
            )
            .with_row_index()
            .collect()
        ),
        pl.DataFrame(schema={"x": pl.Int64, "h": pl.String}).with_row_index(),
    )


@pytest.mark.parametrize("missing_columns", ["insert", "raise"])
@pytest.mark.write_disk
def test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249(
    tmp_path: Path,
    missing_columns: str,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    paths = [tmp_path / "1", tmp_path / "2"]

    pl.DataFrame({"a": 1, "b": 1}, schema={"a": pl.Int32, "b": pl.UInt8}).write_parquet(
        paths[0]
    )
    pl.DataFrame(
        {"a": 1, "b": 1}, schema={"a": pl.Int32, "b": pl.UInt64}
    ).write_parquet(paths[1])

    assert_frame_equal(
        pl.scan_parquet(paths, missing_columns=missing_columns)  # type: ignore[arg-type]
        .select("a")
        .collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 1]}, schema={"a": pl.Int32}),
    )


@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.write_disk
def test_scan_parquet_streaming_row_index_19606(
    tmp_path: Path, streaming: bool
) -> None:
    tmp_path.mkdir(exist_ok=True)
    paths = [tmp_path / "1", tmp_path / "2"]

    dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]

    for df, p in zip(dfs, paths):
        df.write_parquet(p)

    assert_frame_equal(
        pl.scan_parquet(tmp_path)
        .with_row_index()
        .collect(engine="streaming" if streaming else "in-memory"),
        pl.DataFrame(
            {"index": [0, 1], "x": [0, 1]}, schema={"index": pl.UInt32, "x": pl.Int64}
        ),
    )


def test_scan_parquet_prefilter_panic_22452() -> None:
    # This is the easiest way to control the threadpool size so that it is stable.
    out = subprocess.check_output(
        [
            sys.executable,
            "-c",
            """\
import os

os.environ["POLARS_MAX_THREADS"] = "2"

import io

import polars as pl
from polars.testing import assert_frame_equal

assert pl.thread_pool_size() == 2

f = io.BytesIO()

df = pl.DataFrame({x: 1 for x in ["a", "b", "c", "d", "e"]})
df.write_parquet(f)
f.seek(0)

assert_frame_equal(
    pl.scan_parquet(f, parallel="prefiltered")
    .filter(pl.col(c) == 1 for c in ["a", "b", "c"])
    .collect(),
    df,
)

print("OK", end="")
""",
        ],
    )

    assert out == b"OK"


def test_scan_parquet_in_mem_to_streaming_dispatch_deadlock_22641() -> None:
    out = subprocess.check_output(
        [
            sys.executable,
            "-c",
            """\
import os

os.environ["POLARS_MAX_THREADS"] = "1"
os.environ["POLARS_VERBOSE"] = "1"

import io
import sys
from threading import Thread

import polars as pl

assert pl.thread_pool_size() == 1

f = io.BytesIO()
pl.DataFrame({"x": 1}).write_parquet(f)

q = (
    pl.scan_parquet(f)
    .filter(pl.sum_horizontal(pl.col("x"), pl.col("x"), pl.col("x")) >= 0)
    .join(pl.scan_parquet(f), on="x", how="left")
)

results = [
    pl.DataFrame(),
    pl.DataFrame(),
    pl.DataFrame(),
    pl.DataFrame(),
    pl.DataFrame(),
]


def run():
    # Also test just a single scan
    pl.scan_parquet(f).collect()

    print("QUERY-FENCE", file=sys.stderr)

    results[0] = q.collect()

    print("QUERY-FENCE", file=sys.stderr)

    results[1] = pl.concat([q, q, q]).collect().head(1)

    print("QUERY-FENCE", file=sys.stderr)

    results[2] = pl.collect_all([q, q, q])[0]

    print("QUERY-FENCE", file=sys.stderr)

    results[3] = pl.collect_all(3 * [pl.concat(3 * [q])])[0].head(1)

    print("QUERY-FENCE", file=sys.stderr)

    results[4] = q.collect(background=True).fetch_blocking()


t = Thread(target=run, daemon=True)
t.start()
t.join(5)

assert [x.equals(pl.DataFrame({"x": 1})) for x in results] == [
    True,
    True,
    True,
    True,
    True,
]

print("OK", end="", file=sys.stderr)
""",
        ],
        stderr=subprocess.STDOUT,
    )

    assert out.endswith(b"OK")

    def ensure_caches_dropped(verbose_log: str) -> None:

        cache_hit_prefix = "CACHE HIT: cache id: "

        ids_hit = {
            x[len(cache_hit_prefix) :]
            for x in verbose_log.splitlines()
            if x.startswith(cache_hit_prefix)
        }

        cache_drop_prefix = "CACHE DROP: cache id: "

        ids_dropped = {
            x[len(cache_drop_prefix) :]
            for x in verbose_log.splitlines()
            if x.startswith(cache_drop_prefix)
        }

        assert ids_hit == ids_dropped

    out_str = out.decode()

    for logs in out_str.split("QUERY-FENCE"):
        ensure_caches_dropped(logs)


def test_parquet_prefiltering_inserted_column_23268() -> None:
    df = pl.DataFrame({"a": [1, 2, 3, 4]}, schema={"a": pl.Int8})

    f = io.BytesIO()
    df.write_parquet(f)

    f.seek(0)
    assert_frame_equal(
        (
            pl.scan_parquet(
                f,
                schema={"a": pl.Int8, "b": pl.Int16},
                missing_columns="insert",
            )
            .filter(pl.col("a") == 3)
            .filter(pl.col("b") == 3)
            .collect()
        ),
        pl.DataFrame(schema={"a": pl.Int8, "b": pl.Int16}),
    )


@pytest.mark.may_fail_cloud  # reason: inspects logs
def test_scan_parquet_prefilter_with_cast(
    monkeypatch: pytest.MonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    f = io.BytesIO()

    df = pl.DataFrame(
        {
            "a": ["A", "B", "C", "D", "E", "F"],
            "b": pl.Series([1, 1, 1, 1, 0, 1], dtype=pl.UInt8),
        }
    )

    df.write_parquet(f, row_group_size=3)

    md = pq.read_metadata(f)

    assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [3, 3]

    q = pl.scan_parquet(
        f,
        schema={"a": pl.String, "b": pl.Int16},
        cast_options=pl.ScanCastOptions(integer_cast="upcast"),
        include_file_paths="file_path",
    ).filter(pl.col("b") - 1 == pl.lit(-1, dtype=pl.Int16))

    with monkeypatch.context() as cx:
        cx.setenv("POLARS_VERBOSE", "1")
        capfd.readouterr()
        out = q.collect()
        capture = capfd.readouterr().err

    assert (
        "[ParquetFileReader]: Pre-filtered decode enabled (1 live, 1 non-live)"
        in capture
    )
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 1 / 2 row groups" in capture
    )

    assert_frame_equal(
        out,
        pl.DataFrame(
            {
                "a": "E",
                "b": pl.Series([0], dtype=pl.Int16),
                "file_path": "in-mem",
            }
        ),
    )


def test_prefilter_with_n_rows_23790() -> None:
    df = pl.DataFrame(
        {
            "a": ["A", "B", "C", "D", "E", "F"],
            "b": [1, 2, 3, 4, 5, 6],
        }
    )

    f = io.BytesIO()

    df.write_parquet(f, row_group_size=2)

    f.seek(0)

    md = pq.read_metadata(f)

    assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [2, 2, 2]

    f.seek(0)
    q = pl.scan_parquet(f, n_rows=3).filter(pl.col("b").is_in([1, 3]))

    assert_frame_equal(q.collect(), pl.DataFrame({"a": ["A", "C"], "b": [1, 3]}))

    # With row index / file_path

    df = pl.DataFrame(
        {
            "a": ["A", "B", "C", "D", "E", "F"],
            "b": [1, 2, 3, 4, 5, 6],
        }
    )

    f = io.BytesIO()

    df.write_parquet(f, row_group_size=2)

    f.seek(0)

    md = pq.read_metadata(f)

    assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [2, 2, 2]

    f.seek(0)
    q = pl.scan_parquet(
        f,
        n_rows=3,
        row_index_name="index",
        include_file_paths="file_path",
    ).filter(pl.col("b").is_in([1, 3]))

    assert_frame_equal(
        q.collect(),
        pl.DataFrame(
            {
                "index": pl.Series([0, 2], dtype=pl.get_index_type()),
                "a": ["A", "C"],
                "b": [1, 3],
                "file_path": "in-mem",
            }
        ),
    )


def test_scan_parquet_filter_index_panic_23849(monkeypatch: pytest.MonkeyPatch) -> None:
    monkeypatch.setenv("POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD", "5")
    num_rows = 3
    num_cols = 5

    f = io.BytesIO()

    pl.select(
        pl.int_range(0, num_rows).alias(f"col_{i}") for i in range(num_cols)
    ).write_parquet(f)

    for parallel in ["auto", "columns", "row_groups", "prefiltered", "none"]:
        pl.scan_parquet(f, parallel=parallel).filter(  # type: ignore[arg-type]
            pl.col("col_0").ge(0) & pl.col("col_0").lt(num_rows + 1)
        ).collect()