Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_lazy_parquet.py
8484 views
1
from __future__ import annotations
2
3
import base64
4
import io
5
import subprocess
6
import sys
7
from collections import OrderedDict
8
from datetime import datetime
9
from pathlib import Path
10
from threading import Thread
11
from typing import TYPE_CHECKING, Any
12
13
import pandas as pd
14
import pyarrow as pa
15
import pyarrow.parquet as pq
16
import pytest
17
18
import polars as pl
19
from polars.exceptions import ComputeError, SchemaError
20
from polars.testing import assert_frame_equal
21
22
if TYPE_CHECKING:
23
from polars._typing import ParallelStrategy
24
from tests.conftest import PlMonkeyPatch
25
26
27
@pytest.fixture
def parquet_file_path(io_files_path: Path) -> Path:
    """Path of the small sample parquet file used by several tests."""
    return io_files_path.joinpath("small.parquet")
30
31
32
@pytest.fixture
def foods_parquet_path(io_files_path: Path) -> Path:
    """Path of the first foods sample parquet file (27 rows)."""
    return io_files_path.joinpath("foods1.parquet")
35
36
37
def test_scan_parquet(parquet_file_path: Path) -> None:
    """A lazy scan of the small file collects to a 4-row, 3-column frame."""
    lazy_frame = pl.scan_parquet(parquet_file_path)
    collected = lazy_frame.collect()
    assert collected.shape == (4, 3)
40
41
42
def test_scan_parquet_local_with_async(
    plmonkeypatch: PlMonkeyPatch, foods_parquet_path: Path
) -> None:
    """Forcing the async reader still works for a relative local path."""
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
    # A path relative to the CWD exercises path resolution in the async path.
    pl.scan_parquet(foods_parquet_path.relative_to(Path.cwd())).head(1).collect()
47
48
49
def test_row_index(foods_parquet_path: Path) -> None:
    """Row index column is correct eagerly and survives lazy filters."""
    df = pl.read_parquet(foods_parquet_path, row_index_name="row_index")
    assert df["row_index"].to_list() == list(range(27))

    # After a filter, the index values must still refer to the original rows.
    df = (
        pl.scan_parquet(foods_parquet_path, row_index_name="row_index")
        .filter(pl.col("category") == pl.lit("vegetables"))
        .collect()
    )

    assert df["row_index"].to_list() == [0, 6, 11, 13, 14, 20, 25]

    # A second, offset row index stacks on top of the scan-level one.
    df = (
        pl.scan_parquet(foods_parquet_path, row_index_name="row_index")
        .with_row_index("foo", 10)
        .filter(pl.col("category") == pl.lit("vegetables"))
        .collect()
    )

    assert df["foo"].to_list() == [10, 16, 21, 23, 24, 30, 35]
69
70
71
def test_row_index_len_16543(foods_parquet_path: Path) -> None:
    """len() after a no-op select still counts all 27 rows (issue #16543)."""
    lf = pl.scan_parquet(foods_parquet_path).with_row_index()
    row_count = lf.select(pl.all()).select(pl.len()).collect().item()
    assert row_count == 27
74
75
76
@pytest.mark.write_disk
def test_categorical_parquet_statistics(tmp_path: Path) -> None:
    """Filtering a Categorical column works under every parallel strategy."""
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame(
        {
            "book": [
                "bookA",
                "bookA",
                "bookB",
                "bookA",
                "bookA",
                "bookC",
                "bookC",
                "bookC",
            ],
            "transaction_id": [1, 2, 3, 4, 5, 6, 7, 8],
            "user": ["bob", "bob", "bob", "tim", "lucy", "lucy", "lucy", "lucy"],
        }
    ).with_columns(pl.col("book").cast(pl.Categorical))

    file_path = tmp_path / "books.parquet"
    df.write_parquet(file_path, statistics=True)

    parallel_options: list[ParallelStrategy] = [
        "auto",
        "columns",
        "row_groups",
        "none",
    ]
    for par in parallel_options:
        df = (
            pl.scan_parquet(file_path, parallel=par)
            .filter(pl.col("book") == "bookA")
            .collect()
        )
        # 4 "bookA" rows, all 3 columns, regardless of strategy.
        assert df.shape == (4, 3)
113
114
115
@pytest.mark.write_disk
def test_parquet_eq_stats(tmp_path: Path) -> None:
    """Equality predicates return correct rows from a pyarrow-written file."""
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "stats.parquet"

    # Written via pandas/pyarrow so statistics come from the pyarrow writer.
    df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})
    df1.to_parquet(file_path, engine="pyarrow")
    df = pl.scan_parquet(file_path).filter(pl.col("a") == 4).collect()
    assert df["a"].to_list() == [4.0, 4.0]

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") == 2).select(pl.col("a").sum())
    ).collect()[0, "a"] == 2.0

    assert pl.scan_parquet(file_path).filter(pl.col("a") == 5).collect().shape == (
        2,
        1,
    )
134
135
136
@pytest.mark.write_disk
def test_parquet_is_in_stats(tmp_path: Path) -> None:
    """`is_in` predicates return correct rows when statistics are present."""
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "stats.parquet"

    df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})
    df1.to_parquet(file_path, engine="pyarrow")
    df = pl.scan_parquet(file_path).filter(pl.col("a").is_in([5])).collect()
    assert df["a"].to_list() == [5.0, 5.0]

    assert (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_in([5]))
        .select(pl.col("a").sum())
    ).collect()[0, "a"] == 10.0

    assert (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_in([1, 2, 3]))
        .select(pl.col("a").sum())
    ).collect()[0, "a"] == 9.0

    # NOTE(review): the next two asserts duplicate the two above — presumably
    # intended to re-check repeated scans of the same file; confirm.
    assert (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_in([1, 2, 3]))
        .select(pl.col("a").sum())
    ).collect()[0, "a"] == 9.0

    assert (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_in([5]))
        .select(pl.col("a").sum())
    ).collect()[0, "a"] == 10.0

    # All 8 non-null values match; only the two nulls are excluded.
    assert pl.scan_parquet(file_path).filter(
        pl.col("a").is_in([1, 2, 3, 4, 5])
    ).collect().shape == (8, 1)
174
175
176
@pytest.mark.write_disk
def test_parquet_stats(tmp_path: Path) -> None:
    """Range predicates (<, >) return correct results with statistics present."""
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "binary_stats.parquet"

    df1 = pd.DataFrame({"a": [None, 1, None, 2, 3, 3, 4, 4, 5, 5]})
    df1.to_parquet(file_path, engine="pyarrow")
    df = (
        pl.scan_parquet(file_path)
        .filter(pl.col("a").is_not_null() & (pl.col("a") > 4))
        .collect()
    )
    assert df["a"].to_list() == [5.0, 5.0]

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") > 4).select(pl.col("a").sum())
    ).collect()[0, "a"] == 10.0

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") < 4).select(pl.col("a").sum())
    ).collect()[0, "a"] == 9.0

    # NOTE(review): the next two asserts duplicate the two above — presumably
    # intended to re-check repeated scans of the same file; confirm.
    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") < 4).select(pl.col("a").sum())
    ).collect()[0, "a"] == 9.0

    assert (
        pl.scan_parquet(file_path).filter(pl.col("a") > 4).select(pl.col("a").sum())
    ).collect()[0, "a"] == 10.0
    # Derived-expression predicate: every non-null row satisfies a*10 > 5.
    assert pl.scan_parquet(file_path).filter(
        (pl.col("a") * 10) > 5.0
    ).collect().shape == (8, 1)
209
210
211
def test_row_index_schema_parquet(parquet_file_path: Path) -> None:
    """Row-index column gets the index dtype; projected column keeps String."""
    result = (
        pl.scan_parquet(str(parquet_file_path), row_index_name="id")
        .select(["id", "b"])
        .collect()
    )
    assert result.dtypes == [pl.get_index_type(), pl.String]
217
218
219
@pytest.mark.may_fail_cloud  # reason: inspects logs
@pytest.mark.write_disk
def test_parquet_is_in_statistics(
    plmonkeypatch: PlMonkeyPatch, capfd: Any, tmp_path: Path
) -> None:
    """`is_in` predicates skip row groups via statistics (verified in logs)."""
    tmp_path.mkdir(exist_ok=True)

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")

    # 100 rows in 4 partitions of 25, concatenated without rechunking.
    df = pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).with_columns(
        (pl.col("idx") // 25).alias("part")
    )
    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
    assert df.n_chunks("all") == [4, 4]

    # Fix: the file was previously assigned and written twice with identical
    # arguments; a single write is sufficient and halves the test's disk work.
    file_path = tmp_path / "stats.parquet"
    df.write_parquet(file_path, statistics=True, use_pyarrow=False)

    for pred in [
        pl.col("idx").is_in([150, 200, 300]),
        pl.col("idx").is_in([5, 250, 350]),
    ]:
        result = pl.scan_parquet(file_path).filter(pred).collect()
        assert_frame_equal(result, df.filter(pred))

    captured = capfd.readouterr().err
    # One predicate must read the single row group, the other must skip it.
    assert "Predicate pushdown: reading 1 / 1 row groups" in captured
    assert "Predicate pushdown: reading 0 / 1 row groups" in captured
250
251
252
@pytest.mark.may_fail_cloud  # reason: inspects logs
@pytest.mark.write_disk
def test_parquet_statistics(
    plmonkeypatch: PlMonkeyPatch, capfd: Any, tmp_path: Path
) -> None:
    """Row groups are skipped based on statistics (verified via verbose logs)."""
    tmp_path.mkdir(exist_ok=True)

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")

    # 100 rows in 4 partitions of 25, concatenated without rechunking.
    df = pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).with_columns(
        (pl.col("idx") // 25).alias("part")
    )
    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
    assert df.n_chunks("all") == [4, 4]

    file_path = tmp_path / "stats.parquet"
    # Two row groups of 50 rows each.
    df.write_parquet(file_path, statistics=True, use_pyarrow=False, row_group_size=50)

    for pred in [
        pl.col("idx") < 50,
        pl.col("idx") > 50,
        pl.col("idx").null_count() != 0,
        pl.col("idx").null_count() == 0,
        pl.col("idx").min() == pl.col("part").null_count(),
    ]:
        result = pl.scan_parquet(file_path).filter(pred).collect()
        assert_frame_equal(result, df.filter(pred))

    captured = capfd.readouterr().err

    # At least one predicate must have skipped one of the two row groups.
    assert "Predicate pushdown: reading 1 / 2 row groups" in captured
283
284
285
@pytest.mark.write_disk
def test_categorical(tmp_path: Path) -> None:
    """Categorical column round-trips through parquet and groups correctly."""
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame(
        [
            pl.Series("name", ["Bob", "Alice", "Bob"], pl.Categorical),
            pl.Series("amount", [100, 200, 300]),
        ]
    )

    file_path = tmp_path / "categorical.parquet"
    df.write_parquet(file_path)

    lf = pl.scan_parquet(file_path)
    aggregated = lf.group_by("name").agg(pl.col("amount").sum())
    result = aggregated.collect().sort("name")

    expected = pl.DataFrame(
        {"name": ["Alice", "Bob"], "amount": [200, 400]},
        schema_overrides={"name": pl.Categorical},
    )
    assert_frame_equal(result, expected)
311
312
313
def test_glob_n_rows(io_files_path: Path) -> None:
    """`n_rows` limits the total row count across glob-expanded files."""
    file_path = io_files_path / "foods*.parquet"
    df = pl.scan_parquet(file_path, n_rows=40).collect()

    # 27 rows from foods1.parquet and 13 from foods2.parquet
    assert df.shape == (40, 4)

    # take first and last rows
    assert df[[0, 39]].to_dict(as_series=False) == {
        "category": ["vegetables", "seafood"],
        "calories": [45, 146],
        "fats_g": [0.5, 6.0],
        "sugars_g": [2, 2],
    }
327
328
329
@pytest.mark.write_disk
def test_parquet_statistics_filter_9925(tmp_path: Path) -> None:
    """Stats-based skipping must not drop rows for derived-expression filters."""
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "codes.parquet"
    df = pl.DataFrame({"code": [300964, 300972, 500_000, 26]})
    df.write_parquet(file_path, statistics=True)

    # code // 100_000 is 3, 3, 5, 0 -> is_in([0, 3]) keeps all but 500_000.
    q = pl.scan_parquet(file_path).filter(
        (pl.col("code").floordiv(100_000)).is_in([0, 3])
    )
    assert q.collect().to_dict(as_series=False) == {"code": [300964, 300972, 26]}
340
341
342
@pytest.mark.write_disk
def test_parquet_statistics_filter_11069(tmp_path: Path) -> None:
    """is_null filter works on a file written without statistics."""
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "foo.parquet"
    pl.DataFrame({"x": [1, None]}).write_parquet(file_path, statistics=False)

    out = pl.scan_parquet(file_path).filter(pl.col("x").is_null()).collect()
    assert out.to_dict(as_series=False) == {"x": [None]}
351
352
353
def test_parquet_list_arg(io_files_path: Path) -> None:
    """scan_parquet accepts a list of paths and concatenates them in order."""
    paths = [io_files_path / "foods1.parquet", io_files_path / "foods2.parquet"]

    df = pl.scan_parquet(source=paths).collect()
    assert df.shape == (54, 4)
    assert df.row(0) == ("vegetables", 45, 0.5, 2)
    assert df.row(-1) == ("seafood", 194, 12.0, 1)
361
362
363
@pytest.mark.write_disk
def test_parquet_many_row_groups_12297(tmp_path: Path) -> None:
    """A file with many tiny row groups round-trips (issue #12297)."""
    tmp_path.mkdir(exist_ok=True)
    target = tmp_path / "foo.parquet"
    expected = pl.DataFrame({"x": range(100)})
    # row_group_size=5 yields 20 row groups for 100 rows.
    expected.write_parquet(target, row_group_size=5, use_pyarrow=True)
    assert_frame_equal(pl.scan_parquet(target).collect(), expected)
370
371
372
@pytest.mark.write_disk
def test_row_index_empty_file(tmp_path: Path) -> None:
    """Row index column appears with the right dtype even for 0-row files."""
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "test.parquet"
    df = pl.DataFrame({"a": []}, schema={"a": pl.Float32})
    df.write_parquet(file_path)
    result = pl.scan_parquet(file_path).with_row_index("idx").collect()
    assert result.schema == OrderedDict([("idx", pl.UInt32), ("a", pl.Float32)])
380
381
382
@pytest.mark.write_disk
def test_io_struct_async_12500(tmp_path: Path) -> None:
    """A Struct column can be projected on its own (issue #12500)."""
    file_path = tmp_path / "test.parquet"
    pl.DataFrame(
        [
            pl.Series("c1", [{"a": "foo", "b": "bar"}], dtype=pl.Struct),
            pl.Series("c2", [18]),
        ]
    ).write_parquet(file_path)
    assert pl.scan_parquet(file_path).select("c1").collect().to_dict(
        as_series=False
    ) == {"c1": [{"a": "foo", "b": "bar"}]}
394
395
396
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_different_schema(tmp_path: Path, streaming: bool) -> None:
    """Projecting a same-dtype column succeeds even when full schemas differ."""
    # Schema is different but the projected columns are same dtype.
    f1 = tmp_path / "a.parquet"
    f2 = tmp_path / "b.parquet"
    # "a" is float in one file and integer in the other; "b" matches.
    a = pl.DataFrame({"a": [1.0], "b": "a"})

    b = pl.DataFrame({"a": [1], "b": "a"})

    a.write_parquet(f1)
    b.write_parquet(f2)
    assert pl.scan_parquet([f1, f2]).select("b").collect(
        engine="streaming" if streaming else "in-memory"
    ).columns == ["b"]
411
412
413
@pytest.mark.write_disk
def test_nested_slice_12480(tmp_path: Path) -> None:
    """Slicing a nested (List) column with tiny data pages works (issue #12480)."""
    path = tmp_path / "data.parquet"
    df = pl.select(pl.lit(1).repeat_by(10_000).explode().cast(pl.List(pl.Int32)))

    # data_page_size=1 forces many pages, exercising page-level slicing.
    df.write_parquet(path, use_pyarrow=True, pyarrow_options={"data_page_size": 1})

    assert pl.scan_parquet(path).slice(0, 1).collect().height == 1
421
422
423
@pytest.mark.write_disk
def test_scan_deadlock_rayon_spawn_from_async_15172(
    plmonkeypatch: PlMonkeyPatch, tmp_path: Path
) -> None:
    """Async scan with a single worker thread must not deadlock (issue #15172)."""
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
    plmonkeypatch.setenv("POLARS_MAX_THREADS", "1")
    path = tmp_path / "data.parquet"

    df = pl.Series("x", [1]).to_frame()
    df.write_parquet(path)

    # Mutable container so the worker thread can hand back its result.
    results = [pl.DataFrame()]

    def scan_collect() -> None:
        results[0] = pl.collect_all([pl.scan_parquet(path)])[0]

    # Make sure we don't sit there hanging forever on the broken case
    t = Thread(target=scan_collect, daemon=True)
    t.start()
    t.join(5)

    assert results[0].equals(df)
445
446
447
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_schema_mismatch_panic_17067(tmp_path: Path, streaming: bool) -> None:
    """Completely disjoint file schemas raise SchemaError instead of panicking."""
    pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).write_parquet(tmp_path / "1.parquet")
    pl.DataFrame({"c": [1, 2, 3], "d": [4, 5, 6]}).write_parquet(tmp_path / "2.parquet")

    engine = "streaming" if streaming else "in-memory"
    with pytest.raises(pl.exceptions.SchemaError):
        pl.scan_parquet(tmp_path).collect(engine=engine)
459
460
461
@pytest.mark.write_disk
def test_predicate_push_down_categorical_17744(tmp_path: Path) -> None:
    """Predicates on Categorical columns push down correctly (issue #17744)."""
    path = tmp_path / "1"

    df = pl.DataFrame(
        data={
            "n": [1, 2, 3],
            "ccy": ["USD", "JPY", "EUR"],
        },
        schema_overrides={"ccy": pl.Categorical()},
    )
    df.write_parquet(path)
    # Compare as String to sidestep categorical ordering in the equality check.
    expect = df.head(1).with_columns(pl.col(pl.Categorical).cast(pl.String))

    lf = pl.scan_parquet(path)

    for predicate in [pl.col("ccy") == "USD", pl.col("ccy").is_in(["USD"])]:
        assert_frame_equal(
            lf.filter(predicate)
            .with_columns(pl.col(pl.Categorical).cast(pl.String))
            .collect(),
            expect,
        )
484
485
486
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_slice_pushdown_non_zero_offset(
    tmp_path: Path, streaming: bool
) -> None:
    """Slice pushdown with non-zero offsets must skip non-selected files.

    The first and last files are truncated to metadata only, so any attempt
    to read their data pages errors — proving the reader skipped them.
    """
    paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]
    dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]

    for df, p in zip(dfs, paths, strict=True):
        df.write_parquet(p)

    # Parquet files containing only the metadata - i.e. the data parts are removed.
    # Used to test that a reader doesn't try to read any data.
    def trim_to_metadata(path: str | Path) -> None:
        path = Path(path)
        v = path.read_bytes()
        # Trailer layout: metadata | 4-byte little-endian metadata length | "PAR1".
        metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")
        path.write_bytes(v[-metadata_and_footer_len:])

    trim_to_metadata(paths[0])
    trim_to_metadata(paths[2])

    # Check baseline:
    # * Metadata can be read without error
    assert pl.read_parquet_schema(paths[0]) == dfs[0].schema
    # * Attempting to read any data will error
    with pytest.raises(ComputeError):
        pl.scan_parquet(paths[0]).collect(
            engine="streaming" if streaming else "in-memory"
        )

    # The slices below select only the middle (intact) file.
    df = dfs[1]
    assert_frame_equal(
        pl.scan_parquet(paths)
        .slice(1, 1)
        .collect(engine="streaming" if streaming else "in-memory"),
        df,
    )
    assert_frame_equal(
        pl.scan_parquet(paths[1:])
        .head(1)
        .collect(engine="streaming" if streaming else "in-memory"),
        df,
    )
    assert_frame_equal(
        (
            pl.scan_parquet([paths[1], paths[1], paths[1]])
            .with_row_index()
            .slice(1, 1)
            .collect(engine="streaming" if streaming else "in-memory")
        ),
        df.with_row_index(offset=1),
    )
    assert_frame_equal(
        (
            pl.scan_parquet([paths[1], paths[1], paths[1]])
            .with_row_index(offset=1)
            .slice(1, 1)
            .collect(engine="streaming" if streaming else "in-memory")
        ),
        df.with_row_index(offset=2),
    )
    assert_frame_equal(
        pl.scan_parquet(paths[1:])
        .head(1)
        .collect(engine="streaming" if streaming else "in-memory"),
        df,
    )

    # Negative slice unsupported in streaming
    if not streaming:
        assert_frame_equal(pl.scan_parquet(paths).slice(-2, 1).collect(), df)
        assert_frame_equal(pl.scan_parquet(paths[:2]).tail(1).collect(), df)
        # Out-of-range negative offset with length 1 selects nothing.
        assert_frame_equal(
            pl.scan_parquet(paths[1:]).slice(-99, 1).collect(), df.clear()
        )

    path = tmp_path / "data"
    df = pl.select(x=pl.int_range(0, 50))
    df.write_parquet(path)
    # Negative offsets past the start clamp to the beginning of the data.
    assert_frame_equal(pl.scan_parquet(path).slice(-100, 75).collect(), df.head(25))
    assert_frame_equal(
        pl.scan_parquet([path, path]).with_row_index().slice(-25, 100).collect(),
        pl.concat([df, df]).with_row_index().slice(75),
    )
    assert_frame_equal(
        pl.scan_parquet([path, path])
        .with_row_index(offset=10)
        .slice(-25, 100)
        .collect(),
        pl.concat([df, df]).with_row_index(offset=10).slice(75),
    )
    # Oversized length saturates instead of overflowing.
    assert_frame_equal(
        pl.scan_parquet(path).slice(-1, (1 << 32) - 1).collect(), df.tail(1)
    )
581
582
583
@pytest.mark.write_disk
def test_predicate_slice_pushdown_row_index_20485(tmp_path: Path) -> None:
    """Row index stays correct when a slice spans row groups (issue #20485)."""
    tmp_path.mkdir(exist_ok=True)

    file_path = tmp_path / "slice_pushdown.parquet"
    row_group_size = 100000
    num_row_groups = 3

    df = pl.select(ref=pl.int_range(num_row_groups * row_group_size))
    df.write_parquet(file_path, row_group_size=row_group_size)

    # Use a slice that starts near the end of one row group and extends into the next
    # to test handling of slices that span multiple row groups.
    slice_start = 199995
    slice_len = 10
    ldf = pl.scan_parquet(file_path)
    sliced_df = ldf.with_row_index().slice(slice_start, slice_len).collect()
    sliced_df_no_pushdown = (
        ldf.with_row_index()
        .slice(slice_start, slice_len)
        .collect(optimizations=pl.QueryOptFlags(slice_pushdown=False))
    )

    expected_index = list(range(slice_start, slice_start + slice_len))
    actual_index = list(sliced_df["index"])
    assert actual_index == expected_index

    # Pushdown must not change the result.
    assert_frame_equal(sliced_df, sliced_df_no_pushdown)
611
612
613
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_row_groups_shift_bug_18739(tmp_path: Path, streaming: bool) -> None:
    """One-row row groups round-trip under both engines (issue #18739)."""
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "data.bin"

    expected = pl.DataFrame({"id": range(100)})
    expected.write_parquet(path, row_group_size=1)

    engine = "streaming" if streaming else "in-memory"
    assert_frame_equal(expected, pl.scan_parquet(path).collect(engine=engine))
624
625
626
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_dsl2ir_cached_metadata(tmp_path: Path, streaming: bool) -> None:
    """Metadata cached at DSL->IR conversion is reused on later collects."""
    df = pl.DataFrame({"x": 1})
    path = tmp_path / "1"
    df.write_parquet(path)

    lf = pl.scan_parquet(path)
    assert_frame_equal(lf.collect(), df)

    # Removes the metadata portion of the parquet file.
    # Used to test that a reader doesn't try to read the metadata.
    def remove_metadata(path: str | Path) -> None:
        path = Path(path)
        v = path.read_bytes()
        # Trailer layout: metadata | 4-byte little-endian length | "PAR1".
        metadata_and_footer_len = 8 + int.from_bytes(v[-8:][:4], "little")
        path.write_bytes(v[:-metadata_and_footer_len] + b"PAR1")

    remove_metadata(path)
    # Collect still succeeds despite stripped metadata -> the cache was used.
    assert_frame_equal(lf.collect(engine="streaming" if streaming else "in-memory"), df)
646
647
648
@pytest.mark.write_disk
def test_parquet_unaligned_schema_read(tmp_path: Path) -> None:
    """Files with reordered/extra columns are handled per `extra_columns`."""
    dfs = [
        pl.DataFrame({"a": 1, "b": 10}),
        pl.DataFrame({"b": 11, "a": 2}),
        pl.DataFrame({"x": 3, "a": 3, "y": 3, "b": 12}),
    ]

    paths = [tmp_path / "1", tmp_path / "2", tmp_path / "3"]

    for df, path in zip(dfs, paths, strict=True):
        df.write_parquet(path)

    lf = pl.scan_parquet(paths, extra_columns="ignore")

    assert_frame_equal(
        lf.select("a").collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 2, 3]}),
    )

    assert_frame_equal(
        lf.with_row_index().select("a").collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 2, 3]}),
    )

    # Projection order is respected even though file column order differs.
    assert_frame_equal(
        lf.select("b", "a").collect(engine="in-memory"),
        pl.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}),
    )

    assert_frame_equal(
        pl.scan_parquet(paths[:2]).collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 2], "b": [10, 11]}),
    )

    # With extra_columns="raise", the third file's extra columns are an error.
    lf = pl.scan_parquet(paths, extra_columns="raise")

    with pytest.raises(pl.exceptions.SchemaError):
        lf.collect(engine="in-memory")

    with pytest.raises(pl.exceptions.SchemaError):
        lf.with_row_index().collect(engine="in-memory")
690
691
692
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_unaligned_schema_read_dtype_mismatch(
    tmp_path: Path, streaming: bool
) -> None:
    """Conflicting column dtypes across files raise a SchemaError on collect."""
    paths = [tmp_path / "1", tmp_path / "2"]
    frames = [
        pl.DataFrame({"a": 1, "b": 10}),
        pl.DataFrame({"b": "11", "a": "2"}),
    ]

    for frame, path in zip(frames, paths, strict=True):
        frame.write_parquet(path)

    lf = pl.scan_parquet(paths)

    engine = "streaming" if streaming else "in-memory"
    with pytest.raises(pl.exceptions.SchemaError, match="data type mismatch"):
        lf.collect(engine=engine)
711
712
713
@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [True, False])
def test_parquet_unaligned_schema_read_missing_cols_from_first(
    tmp_path: Path, streaming: bool
) -> None:
    """A later file missing a column from the first file raises on collect."""
    dfs = [
        pl.DataFrame({"a": 1, "b": 10}),
        pl.DataFrame({"b": 11}),
    ]

    paths = [tmp_path / "1", tmp_path / "2"]

    for df, path in zip(dfs, paths, strict=True):
        df.write_parquet(path)

    lf = pl.scan_parquet(paths)

    # Either error type is accepted — the engines report this differently.
    with pytest.raises(
        (pl.exceptions.SchemaError, pl.exceptions.ColumnNotFoundError),
    ):
        lf.collect(engine="streaming" if streaming else "in-memory")
734
735
736
@pytest.mark.parametrize("parallel", ["columns", "row_groups", "prefiltered", "none"])
@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.write_disk
def test_parquet_schema_arg(
    tmp_path: Path,
    parallel: ParallelStrategy,
    streaming: bool,
) -> None:
    """The `schema` argument governs handling of extra and missing columns."""
    tmp_path.mkdir(exist_ok=True)
    dfs = [pl.DataFrame({"a": 1, "b": 1}), pl.DataFrame({"a": 2, "b": 2})]
    paths = [tmp_path / "1", tmp_path / "2"]

    for df, path in zip(dfs, paths, strict=True):
        df.write_parquet(path)

    # Column "1" does not exist in any file.
    schema: dict[str, pl.DataType] = {
        "1": pl.Datetime(time_unit="ms", time_zone="CET"),
        "a": pl.Int64(),
        "b": pl.Int64(),
    }

    # Test `schema` containing an extra column.

    lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)

    with pytest.raises((pl.exceptions.SchemaError, pl.exceptions.ColumnNotFoundError)):
        lf.collect(engine="streaming" if streaming else "in-memory")

    # missing_columns="insert" materializes "1" as an all-null column.
    lf = pl.scan_parquet(
        paths, parallel=parallel, schema=schema, missing_columns="insert"
    )

    assert_frame_equal(
        lf.collect(engine="streaming" if streaming else "in-memory"),
        pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),
    )

    # Just one test that `read_parquet` is propagating this argument.
    assert_frame_equal(
        pl.read_parquet(
            paths, parallel=parallel, schema=schema, missing_columns="insert"
        ),
        pl.DataFrame({"1": None, "a": [1, 2], "b": [1, 2]}, schema=schema),
    )

    # Issue #19081: If a schema arg is passed, ensure its fields are propagated
    # to the IR, otherwise even if `missing_columns='insert'`, downstream
    # `select()`s etc. will fail with ColumnNotFound if the column is not in
    # the first file.
    lf = pl.scan_parquet(
        paths, parallel=parallel, schema=schema, missing_columns="insert"
    ).select("1")

    s = lf.collect(engine="streaming" if streaming else "in-memory").to_series()
    assert s.len() == 2
    assert s.null_count() == 2

    # Test files containing extra columns not in `schema`

    schema: dict[str, type[pl.DataType]] = {"a": pl.Int64}  # type: ignore[no-redef]

    for missing_columns in ["insert", "raise"]:
        # The files' extra column "b" raises regardless of missing_columns.
        lf = pl.scan_parquet(
            paths,
            parallel=parallel,
            schema=schema,
            missing_columns=missing_columns,  # type: ignore[arg-type]
        )

        with pytest.raises(pl.exceptions.SchemaError):
            lf.collect(engine="streaming" if streaming else "in-memory")

    lf = pl.scan_parquet(
        paths,
        parallel=parallel,
        schema=schema,
        extra_columns="ignore",
    ).select("a")

    assert_frame_equal(
        lf.collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 2]}, schema=schema),
    )

    # Schema dtype that conflicts with the files' actual dtype.
    schema: dict[str, type[pl.DataType]] = {"a": pl.Int64, "b": pl.Int8}  # type: ignore[no-redef]

    lf = pl.scan_parquet(paths, parallel=parallel, schema=schema)

    with pytest.raises(
        pl.exceptions.SchemaError,
        match="data type mismatch for column b: incoming: Int64 != target: Int8",
    ):
        lf.collect(engine="streaming" if streaming else "in-memory")
829
830
831
def test_scan_parquet_empty_path_expansion(tmp_path: Path) -> None:
    """Scanning an empty directory errors unless a schema is supplied."""
    tmp_path.mkdir(exist_ok=True)

    with pytest.raises(
        ComputeError,
        match=r"failed to retrieve first file schema \(parquet\): "
        r"expanded paths were empty \(path expansion input: "
        ".*Hint: passing a schema can allow this scan to succeed with an empty DataFrame",
    ):
        pl.scan_parquet(tmp_path).collect()

    # Scan succeeds when schema is provided
    assert_frame_equal(
        pl.scan_parquet(tmp_path, schema={"x": pl.Int64}).collect(),
        pl.DataFrame(schema={"x": pl.Int64}),
    )

    assert_frame_equal(
        pl.scan_parquet(tmp_path, schema={"x": pl.Int64}).with_row_index().collect(),
        pl.DataFrame(schema={"x": pl.Int64}).with_row_index(),
    )

    # A hive schema contributes columns even with no files on disk.
    assert_frame_equal(
        pl.scan_parquet(
            tmp_path, schema={"x": pl.Int64}, hive_schema={"h": pl.String}
        ).collect(),
        pl.DataFrame(schema={"x": pl.Int64, "h": pl.String}),
    )

    assert_frame_equal(
        (
            pl.scan_parquet(
                tmp_path, schema={"x": pl.Int64}, hive_schema={"h": pl.String}
            )
            .with_row_index()
            .collect()
        ),
        pl.DataFrame(schema={"x": pl.Int64, "h": pl.String}).with_row_index(),
    )
870
871
872
@pytest.mark.parametrize("missing_columns", ["insert", "raise"])
@pytest.mark.write_disk
def test_scan_parquet_ignores_dtype_mismatch_for_non_projected_columns_19249(
    tmp_path: Path,
    missing_columns: str,
) -> None:
    """Dtype mismatch in a non-projected column must not fail the scan (#19249)."""
    tmp_path.mkdir(exist_ok=True)
    paths = [tmp_path / "1", tmp_path / "2"]

    # "b" has mismatched dtypes (UInt8 vs UInt64) but is never projected.
    pl.DataFrame({"a": 1, "b": 1}, schema={"a": pl.Int32, "b": pl.UInt8}).write_parquet(
        paths[0]
    )
    pl.DataFrame(
        {"a": 1, "b": 1}, schema={"a": pl.Int32, "b": pl.UInt64}
    ).write_parquet(paths[1])

    assert_frame_equal(
        pl.scan_parquet(paths, missing_columns=missing_columns)  # type: ignore[arg-type]
        .select("a")
        .collect(engine="in-memory"),
        pl.DataFrame({"a": [1, 1]}, schema={"a": pl.Int32}),
    )
894
895
896
@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.write_disk
def test_scan_parquet_streaming_row_index_19606(
    tmp_path: Path, streaming: bool
) -> None:
    """Row index is continuous across multiple files (issue #19606)."""
    tmp_path.mkdir(exist_ok=True)
    paths = [tmp_path / "1", tmp_path / "2"]

    dfs = [pl.DataFrame({"x": i}) for i in range(len(paths))]

    for df, p in zip(dfs, paths, strict=True):
        df.write_parquet(p)

    assert_frame_equal(
        pl.scan_parquet(tmp_path)
        .with_row_index()
        .collect(engine="streaming" if streaming else "in-memory"),
        pl.DataFrame(
            {"index": [0, 1], "x": [0, 1]}, schema={"index": pl.UInt32, "x": pl.Int64}
        ),
    )
917
918
919
def test_scan_parquet_prefilter_panic_22452() -> None:
    """Prefiltered scan must not panic with a 2-thread pool (issue #22452)."""
    # Running in a subprocess is the easiest way to control the threadpool
    # size so that it is stable.
    out = subprocess.check_output(
        [
            sys.executable,
            "-c",
            """\
import os

os.environ["POLARS_MAX_THREADS"] = "2"

import io

import polars as pl
from polars.testing import assert_frame_equal

assert pl.thread_pool_size() == 2

f = io.BytesIO()

df = pl.DataFrame({x: 1 for x in ["a", "b", "c", "d", "e"]})
df.write_parquet(f)
f.seek(0)

assert_frame_equal(
    pl.scan_parquet(f, parallel="prefiltered")
    .filter(pl.col(c) == 1 for c in ["a", "b", "c"])
    .collect(),
    df,
)

print("OK", end="")
""",
        ],
    )

    assert out == b"OK"
956
957
958
@pytest.mark.slow
def test_scan_parquet_in_mem_to_streaming_dispatch_deadlock_22641() -> None:
    """Dispatching queries with a 1-thread pool must not deadlock (issue #22641).

    Runs in a subprocess so POLARS_MAX_THREADS takes effect, and afterwards
    checks the verbose logs to ensure every hit cache was also dropped.
    """
    out = subprocess.check_output(
        [
            sys.executable,
            "-c",
            """\
import os

os.environ["POLARS_MAX_THREADS"] = "1"
os.environ["POLARS_VERBOSE"] = "1"

import io
import sys
from threading import Thread

import polars as pl

assert pl.thread_pool_size() == 1

f = io.BytesIO()
pl.DataFrame({"x": 1}).write_parquet(f)

q = (
    pl.scan_parquet(f)
    .filter(pl.sum_horizontal(pl.col("x"), pl.col("x"), pl.col("x")) >= 0)
    .join(pl.scan_parquet(f), on="x", how="left")
)

results = [
    pl.DataFrame(),
    pl.DataFrame(),
    pl.DataFrame(),
    pl.DataFrame(),
    pl.DataFrame(),
]


def run():
    # Also test just a single scan
    pl.scan_parquet(f).collect()

    print("QUERY-FENCE", file=sys.stderr)

    results[0] = q.collect()

    print("QUERY-FENCE", file=sys.stderr)

    results[1] = pl.concat([q, q, q]).collect().head(1)

    print("QUERY-FENCE", file=sys.stderr)

    results[2] = pl.collect_all([q, q, q])[0]

    print("QUERY-FENCE", file=sys.stderr)

    results[3] = pl.collect_all(3 * [pl.concat(3 * [q])])[0].head(1)

    print("QUERY-FENCE", file=sys.stderr)

    results[4] = q.collect(background=True).fetch_blocking()


t = Thread(target=run, daemon=True)
t.start()
t.join(5)

assert [x.equals(pl.DataFrame({"x": 1})) for x in results] == [
    True,
    True,
    True,
    True,
    True,
]

print("OK", end="", file=sys.stderr)
""",
        ],
        stderr=subprocess.STDOUT,
    )

    assert out.endswith(b"OK")

    def ensure_caches_dropped(verbose_log: str) -> None:
        # Every cache id that was hit must also have been dropped.
        cache_hit_prefix = "CACHE HIT: cache id: "

        ids_hit = {
            x[len(cache_hit_prefix) :]
            for x in verbose_log.splitlines()
            if x.startswith(cache_hit_prefix)
        }

        cache_drop_prefix = "CACHE DROP: cache id: "

        ids_dropped = {
            x[len(cache_drop_prefix) :]
            for x in verbose_log.splitlines()
            if x.startswith(cache_drop_prefix)
        }

        assert ids_hit == ids_dropped

    out_str = out.decode()

    # Check cache hygiene separately for each query between fences.
    for logs in out_str.split("QUERY-FENCE"):
        ensure_caches_dropped(logs)
1064
1065
1066
def test_parquet_prefiltering_inserted_column_23268() -> None:
    """Prefiltering on a column inserted via `missing_columns="insert"` (#23268)."""
    buf = io.BytesIO()
    pl.DataFrame({"a": [1, 2, 3, 4]}, schema={"a": pl.Int8}).write_parquet(buf)
    buf.seek(0)

    # Column "b" does not exist in the file; it is inserted as all-null, so
    # the second filter removes every row.
    lf = (
        pl.scan_parquet(
            buf,
            schema={"a": pl.Int8, "b": pl.Int16},
            missing_columns="insert",
        )
        .filter(pl.col("a") == 3)
        .filter(pl.col("b") == 3)
    )

    expected = pl.DataFrame(schema={"a": pl.Int8, "b": pl.Int16})
    assert_frame_equal(lf.collect(), expected)
1086
1087
1088
@pytest.mark.may_fail_cloud  # reason: inspects logs
def test_scan_parquet_prefilter_with_cast(
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    """Prefiltered decode must work when the scan casts columns via `cast_options`."""
    f = io.BytesIO()

    df = pl.DataFrame(
        {
            "a": ["A", "B", "C", "D", "E", "F"],
            "b": pl.Series([1, 1, 1, 1, 0, 1], dtype=pl.UInt8),
        }
    )

    # Two row groups of 3 rows each; only the second contains b == 0.
    df.write_parquet(f, row_group_size=3)

    md = pq.read_metadata(f)

    assert [md.row_group(i).num_rows for i in range(md.num_row_groups)] == [3, 3]

    # "b" is stored as UInt8 but scanned as Int16 (upcast); the predicate is
    # written against the casted dtype so prefiltering must apply the cast.
    q = pl.scan_parquet(
        f,
        schema={"a": pl.String, "b": pl.Int16},
        cast_options=pl.ScanCastOptions(integer_cast="upcast"),
        include_file_paths="file_path",
    ).filter(pl.col("b") - 1 == pl.lit(-1, dtype=pl.Int16))

    with plmonkeypatch.context() as cx:
        cx.setenv("POLARS_VERBOSE", "1")
        capfd.readouterr()  # discard any previously captured output
        out = q.collect()
        capture = capfd.readouterr().err

    # The verbose log confirms the prefiltered decode path was taken and that
    # only one of the two row groups was read.
    assert (
        "[ParquetFileReader]: Pre-filtered decode enabled (1 live, 1 non-live)"
        in capture
    )
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 1 / 2 row groups" in capture
    )

    assert_frame_equal(
        out,
        pl.DataFrame(
            {
                "a": "E",
                "b": pl.Series([0], dtype=pl.Int16),
                "file_path": "in-mem",
            }
        ),
    )
1139
1140
1141
def test_prefilter_with_n_rows_23790() -> None:
    """Prefiltered reads combined with `n_rows` must respect the row limit (#23790)."""

    def make_buffer() -> io.BytesIO:
        # Write six rows split into three row groups of 2 rows each.
        frame = pl.DataFrame(
            {
                "a": ["A", "B", "C", "D", "E", "F"],
                "b": [1, 2, 3, 4, 5, 6],
            }
        )
        buf = io.BytesIO()
        frame.write_parquet(buf, row_group_size=2)
        buf.seek(0)
        meta = pq.read_metadata(buf)
        group_sizes = [meta.row_group(i).num_rows for i in range(meta.num_row_groups)]
        assert group_sizes == [2, 2, 2]
        buf.seek(0)
        return buf

    # Plain scan: the filter only sees the first 3 rows.
    q = pl.scan_parquet(make_buffer(), n_rows=3).filter(pl.col("b").is_in([1, 3]))
    assert_frame_equal(q.collect(), pl.DataFrame({"a": ["A", "C"], "b": [1, 3]}))

    # Same scan, additionally materializing a row index and the file path.
    q = pl.scan_parquet(
        make_buffer(),
        n_rows=3,
        row_index_name="index",
        include_file_paths="file_path",
    ).filter(pl.col("b").is_in([1, 3]))

    assert_frame_equal(
        q.collect(),
        pl.DataFrame(
            {
                "index": pl.Series([0, 2], dtype=pl.get_index_type()),
                "a": ["A", "C"],
                "b": [1, 3],
                "file_path": "in-mem",
            }
        ),
    )
1201
1202
1203
def test_scan_parquet_filter_index_panic_23849(plmonkeypatch: PlMonkeyPatch) -> None:
    """Filtering must not panic with a tiny per-thread decode target (#23849)."""
    plmonkeypatch.setenv("POLARS_PARQUET_DECODE_TARGET_VALUES_PER_THREAD", "5")
    num_rows = 3
    num_cols = 5

    buf = io.BytesIO()
    pl.select(
        pl.int_range(0, num_rows).alias(f"col_{i}") for i in range(num_cols)
    ).write_parquet(buf)

    # The (always-true) predicate is exercised under every parallel strategy;
    # only successful collection matters, not the result.
    predicate = pl.col("col_0").ge(0) & pl.col("col_0").lt(num_rows + 1)
    for strategy in ("auto", "columns", "row_groups", "prefiltered", "none"):
        pl.scan_parquet(buf, parallel=strategy).filter(  # type: ignore[arg-type]
            predicate
        ).collect()
1218
1219
1220
@pytest.mark.write_disk
def test_sink_large_rows_25834(tmp_path: Path, plmonkeypatch: PlMonkeyPatch) -> None:
    """Sinking with a tiny morsel size must still produce correct files (#25834)."""
    plmonkeypatch.setenv("POLARS_IDEAL_SINK_MORSEL_SIZE_BYTES", "1")
    df = pl.select(idx=pl.repeat(1, 20_000), bytes=pl.lit(b"AAAAA"))

    # Single-file output round-trips.
    single_path = tmp_path / "single.parquet"
    df.write_parquet(single_path)
    assert_frame_equal(pl.scan_parquet(single_path).collect(), df)

    # The writer splits the 20k rows over two row groups.
    md = pq.read_metadata(single_path)
    group_sizes = [md.row_group(i).num_rows for i in range(md.num_row_groups)]
    assert group_sizes == [16384, 3616]

    # Partitioned output round-trips as well.
    partitioned_path = tmp_path / "partitioned"
    df.write_parquet(partitioned_path, partition_by="idx")
    assert_frame_equal(pl.scan_parquet(partitioned_path).collect(), df)
1239
1240
1241
def test_scan_parquet_prefilter_is_between_non_column_input_26283() -> None:
    """Prefilter with `is_between` on a derived (non-column) expression (#26283)."""
    df = pl.DataFrame(
        {
            "timestamp": pl.datetime_range(
                start=datetime(2026, 1, 1),
                end=datetime(2026, 1, 1, 0, 5, 0),
                interval="1s",
                eager=True,
            ),
        },
        schema={"timestamp": pl.Datetime("us")},
        height=301,
    )

    buf = io.BytesIO()
    df.write_parquet(buf)
    buf.seek(0)

    # The `is_between` input is a cast of a derived date, not a plain column.
    derived = pl.col("timestamp").dt.date().cast(pl.Datetime("us"))
    q = pl.scan_parquet(buf).filter(
        derived.is_between(datetime(2026, 1, 1), datetime(2026, 1, 1))
    )

    # Every row's date falls on 2026-01-01, so nothing is filtered out.
    assert_frame_equal(q.collect(), df)
1268
1269
1270
def test_sink_parquet_arrow_schema() -> None:
    """`sink_parquet(arrow_schema=...)` propagates metadata and validates dtypes."""
    df = pl.DataFrame({"x": [0, 1, None]})

    # Field-level metadata on the arrow schema ends up in the parquet schema.
    f = io.BytesIO()
    df.lazy().sink_parquet(
        f,
        arrow_schema=pa.schema(
            [
                pa.field(
                    "x",
                    pa.int64(),
                    metadata={"custom_field_md_key": "custom_field_md_value"},
                )
            ],
        ),
    )

    f.seek(0)

    assert (
        pq.read_schema(f).field("x").metadata[b"custom_field_md_key"]
        == b"custom_field_md_value"
    )

    f = io.BytesIO()

    # Schema-level metadata and footer key-value metadata are written
    # independently and must both round-trip.
    df.lazy().sink_parquet(
        f,
        arrow_schema=pa.schema(
            [pa.field("x", pa.int64())],
            metadata={"custom_schema_md_key": "custom_schema_md_value"},
        ),
        metadata={"custom_footer_md_key": "custom_footer_md_value"},
    )

    f.seek(0)

    assert pq.read_schema(f).metadata == {
        b"custom_schema_md_key": b"custom_schema_md_value"
    }
    assert (
        pq.read_metadata(f).metadata[b"custom_footer_md_key"]
        == b"custom_footer_md_value"
    )
    assert (
        pl.read_parquet_metadata(f)["custom_footer_md_key"] == "custom_footer_md_value"
    )
    # The serialized arrow schema embedded in the footer ("ARROW:schema")
    # carries the schema-level metadata as well.
    assert pa.ipc.read_schema(
        pa.BufferReader(base64.b64decode(pq.read_metadata(f).metadata[b"ARROW:schema"]))
    ).metadata == {b"custom_schema_md_key": b"custom_schema_md_value"}

    # A dtype that differs from the output dtype is rejected.
    with pytest.raises(
        SchemaError,
        match=r"provided dtype \(Int32\) does not match output dtype \(Int64\)",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("x", pa.int32())],
            ),
        )

    # Declaring a column non-nullable fails when the data contains nulls.
    with pytest.raises(
        SchemaError,
        match="nullable is false but array contained 1 NULL",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("x", pa.int64(), nullable=False)],
            ),
        )

    # Name mismatches are rejected: wrong name, missing field, extra field.
    with pytest.raises(
        SchemaError,
        match=r"schema names in arrow_schema differ",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("z", pa.int64())],
            ),
        )

    with pytest.raises(
        SchemaError,
        match="schema names in arrow_schema differ",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema([]),
        )

    with pytest.raises(
        SchemaError,
        match="schema names in arrow_schema differ",
    ):
        df.lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "x",
                        pa.int64(),
                    ),
                    pa.field(
                        "y",
                        pa.int64(),
                    ),
                ],
            ),
        )
1382
1383
1384
def test_sink_parquet_arrow_schema_logical_types() -> None:
    """`arrow_schema` validation for categorical, datetime and extension dtypes."""
    from tests.unit.datatypes.test_extension import PythonTestExtension

    df = pl.DataFrame(
        {
            "categorical": pl.Series(
                ["A"], dtype=pl.Categorical(pl.Categories.random())
            ),
            "datetime": pl.Series([datetime(2026, 1, 1)], dtype=pl.Datetime("ns")),
            "extension[str]": pl.Series(["A"], dtype=PythonTestExtension(pl.String)),
        }
    )

    # Categorical: the error names the expected arrow dtype ...
    with pytest.raises(SchemaError, match=r"Dictionary\(UInt32, LargeUtf8, false\)"):
        df.select("categorical").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("categorical", pa.null())],
            ),
        )

    # ... and the matching dictionary type is accepted.
    df.select("categorical").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [pa.field("categorical", pa.dictionary(pa.uint32(), pa.large_string()))],
        ),
    )

    # Datetime("ns"): same pattern with a nanosecond timestamp.
    with pytest.raises(SchemaError, match=r"Timestamp\(Nanosecond, None\)"):
        df.select("datetime").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("datetime", pa.null())],
            ),
        )

    df.select("datetime").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [pa.field("datetime", pa.timestamp("ns"))],
        ),
    )

    def build_pyarrow_extension_type(name: str) -> Any:
        # Minimal pyarrow extension type over large_string, registered under
        # the given name, mirroring the polars-side PythonTestExtension.
        class PythonTestExtensionPyarrow(pa.ExtensionType):  # type: ignore[misc]
            def __init__(self, data_type: pa.DataType) -> None:
                super().__init__(data_type, name)

            def __arrow_ext_serialize__(self) -> bytes:
                return b""

            @classmethod
            def __arrow_ext_deserialize__(
                cls, storage_type: Any, serialized: Any
            ) -> Any:
                return PythonTestExtensionPyarrow(storage_type[0].type)

        return PythonTestExtensionPyarrow(pa.large_string())

    # Extension dtype: the extension name must match exactly.
    with pytest.raises(
        SchemaError,
        match=r'Extension\(ExtensionType { name: "testing.python_test_extension", inner: LargeUtf8, metadata: None }\)',
    ):
        df.select("extension[str]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [pa.field("extension[str]", build_pyarrow_extension_type("name"))],
            ),
        )

    df.select("extension[str]").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [
                pa.field(
                    "extension[str]",
                    build_pyarrow_extension_type("testing.python_test_extension"),
                )
            ],
        ),
    )
1465
1466
1467
def test_sink_parquet_arrow_schema_nested_types() -> None:
    """`arrow_schema` validation recurses into list/struct and fixed-size arrays."""
    df = pl.DataFrame(
        {
            "list[struct{a:int64}]": pl.Series(
                [[{"a": 1}, {"a": None}]], dtype=pl.List(pl.Struct({"a": pl.Int64}))
            ),
            "array[int64, 2]": pl.Series([[0, None]], dtype=pl.Array(pl.Int64, 2)),
        }
    )

    # Struct field set must match exactly: too few fields ...
    with pytest.raises(SchemaError, match="struct dtype mismatch"):
        df.select("list[struct{a:int64}]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "list[struct{a:int64}]",
                        pa.large_list(pa.struct([])),
                    )
                ],
            ),
        )

    # ... or too many fields are both rejected.
    with pytest.raises(SchemaError, match="struct dtype mismatch"):
        df.select("list[struct{a:int64}]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "list[struct{a:int64}]",
                        pa.large_list(
                            pa.struct(
                                [pa.field("a", pa.int64()), pa.field("b", pa.int64())]
                            )
                        ),
                    )
                ],
            ),
        )

    # Nullability is validated on inner fields too: {"a": None} violates it.
    with pytest.raises(
        SchemaError,
        match="nullable is false but array contained 1 NULL",
    ):
        df.select("list[struct{a:int64}]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "list[struct{a:int64}]",
                        pa.large_list(
                            pa.struct([pa.field("a", pa.int64(), nullable=False)])
                        ),
                    )
                ],
            ),
        )

    # The exactly-matching nested schema is accepted.
    df.select("list[struct{a:int64}]").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [
                pa.field(
                    "list[struct{a:int64}]",
                    pa.large_list(pa.struct([pa.field("a", pa.int64())])),
                )
            ],
        ),
    )

    # Fixed-size lists must declare the correct width (2, not 0).
    with pytest.raises(SchemaError, match="fixed-size list dtype mismatch:"):
        df.select("array[int64, 2]").lazy().sink_parquet(
            io.BytesIO(),
            arrow_schema=pa.schema(
                [
                    pa.field(
                        "array[int64, 2]",
                        pa.list_(pa.int64(), 0),
                    )
                ],
            ),
        )

    df.select("array[int64, 2]").lazy().sink_parquet(
        io.BytesIO(),
        arrow_schema=pa.schema(
            [
                pa.field(
                    "array[int64, 2]",
                    pa.list_(pa.int64(), 2),
                )
            ],
        ),
    )
1561
1562
1563
def test_sink_parquet_writes_strings_as_largeutf8_by_default() -> None:
    """The parquet sink outputs large_* arrow types for string/binary columns."""
    df = pl.DataFrame({"string": "A", "binary": [b"A"]})

    # View types are rejected: the writer's output dtype is LargeUtf8 /
    # LargeBinary, and the provided schema must match it.
    with pytest.raises(
        SchemaError,
        match=r"provided dtype \(Utf8View\) does not match output dtype \(LargeUtf8\)",
    ):
        df.lazy().select("string").sink_parquet(
            io.BytesIO(), arrow_schema=pa.schema([pa.field("string", pa.string_view())])
        )

    with pytest.raises(
        SchemaError,
        match=r"provided dtype \(BinaryView\) does not match output dtype \(LargeBinary\)",
    ):
        df.lazy().select("binary").sink_parquet(
            io.BytesIO(), arrow_schema=pa.schema([pa.field("binary", pa.binary_view())])
        )

    # The matching large_* schema is accepted and round-trips.
    expected_schema = pa.schema(
        [
            pa.field("string", pa.large_string()),
            pa.field("binary", pa.large_binary()),
        ]
    )

    buf = io.BytesIO()
    df.lazy().sink_parquet(buf, arrow_schema=expected_schema)

    buf.seek(0)
    assert pq.read_schema(buf) == expected_schema

    buf.seek(0)
    assert_frame_equal(pl.scan_parquet(buf).collect(), df)
1600
1601
1602
def test_sink_parquet_pyarrow_filter_string_type_26435() -> None:
    """Parquet files written by polars stay filterable via pyarrow (#26435)."""
    buf = io.BytesIO()
    pl.DataFrame({"string": ["A", None, "B"], "int": [0, 1, 2]}).write_parquet(buf)

    expected = pl.DataFrame({"string": "A", "int": 0})

    # Filtering on either the integer or the string column must select the
    # same single row.
    for predicate in ([("int", "=", 0)], [("string", "=", "A")]):
        buf.seek(0)
        result = pl.DataFrame(pq.read_table(buf, filters=predicate))
        assert_frame_equal(result, expected)
1622
1623
1624
def test_scan_parquet_temporal_lit_comparison_skip_batch_24095_25731(
1625
plmonkeypatch: PlMonkeyPatch,
1626
capfd: pytest.CaptureFixture[str],
1627
) -> None:
1628
plmonkeypatch.setenv("POLARS_VERBOSE", "1")
1629
1630
df = pl.DataFrame(
1631
{
1632
"datetime[ns]": pl.Series(
1633
[
1634
"2025-12-31 23:59:59.999999999",
1635
"2025-12-31 23:59:59.999999999",
1636
"2026-01-01 00:00:00.000000000",
1637
"2026-01-01 00:00:00.000000999",
1638
"2026-01-01 00:00:00.000001000",
1639
"2026-01-01 00:00:00.000001999",
1640
"2026-01-01 00:00:00.000002000",
1641
"2026-01-01 00:00:00.000999999",
1642
"2026-01-01 00:00:00.001000000",
1643
"2026-01-01 00:00:00.001000000",
1644
]
1645
).str.strptime(dtype=pl.Datetime("ns"), format="%Y-%m-%d %H:%M:%S%.f"),
1646
"duration[ns]": pl.Series(
1647
[-1, -1, 0, 999, 1000, 1999, 2000, -1, -1, -1]
1648
).cast(pl.Duration("ns")),
1649
}
1650
)
1651
1652
f = io.BytesIO()
1653
df.write_parquet(f, row_group_size=2)
1654
1655
q = pl.scan_parquet(f).filter(
1656
pl.col("datetime[ns]") == pl.lit(datetime(2026, 1, 1))
1657
)
1658
1659
capfd.readouterr()
1660
out = q.collect()
1661
capture = capfd.readouterr().err
1662
1663
assert "reading 1 / 5 row groups" in capture
1664
1665
assert_frame_equal(
1666
out.select(pl.col("datetime[ns]").dt.to_string()),
1667
pl.DataFrame(
1668
{
1669
"datetime[ns]": pl.Series(
1670
[
1671
"2026-01-01 00:00:00.000000000",
1672
"2026-01-01 00:00:00.000000999",
1673
]
1674
)
1675
}
1676
),
1677
)
1678
1679
q = pl.scan_parquet(f).filter(
1680
pl.col("datetime[ns]").is_between(
1681
pl.lit(datetime(2026, 1, 1)),
1682
pl.lit(datetime(2026, 1, 1, microsecond=1)),
1683
)
1684
)
1685
1686
capfd.readouterr()
1687
out = q.collect()
1688
capture = capfd.readouterr().err
1689
1690
assert "reading 2 / 5 row groups" in capture
1691
1692
assert_frame_equal(
1693
out.select(pl.col("datetime[ns]").dt.to_string()),
1694
pl.DataFrame(
1695
{
1696
"datetime[ns]": pl.Series(
1697
[
1698
"2026-01-01 00:00:00.000000000",
1699
"2026-01-01 00:00:00.000000999",
1700
"2026-01-01 00:00:00.000001000",
1701
"2026-01-01 00:00:00.000001999",
1702
]
1703
)
1704
}
1705
),
1706
)
1707
1708
capfd.readouterr()
1709
1710
q = pl.scan_parquet(f).filter(
1711
pl.col("datetime[ns]").is_between(
1712
pl.lit(datetime(2026, 1, 1)),
1713
pl.lit(datetime(2026, 1, 1, microsecond=1)),
1714
)
1715
)
1716
1717
capfd.readouterr()
1718
1719
out = q.collect()
1720
capture = capfd.readouterr().err
1721
1722
assert "reading 2 / 5 row groups" in capture
1723
1724
assert_frame_equal(
1725
out.select(pl.col("duration[ns]").dt.to_string()),
1726
pl.DataFrame(
1727
{
1728
"duration[ns]": pl.Series(
1729
[
1730
"PT0S",
1731
"PT0.000000999S",
1732
"PT0.000001S",
1733
"PT0.000001999S",
1734
]
1735
)
1736
}
1737
),
1738
)
1739
1740
capfd.readouterr()
1741
1742