from __future__ import annotations

import decimal
import functools
import io
import subprocess
import sys
import warnings
from datetime import date, datetime, time, timezone
from decimal import Decimal
from itertools import chain
from typing import TYPE_CHECKING, Any, Literal, cast
from zoneinfo import ZoneInfo

import fsspec
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest
from hypothesis import given
from hypothesis import strategies as st

import polars as pl
from polars._utils.various import parse_version
from polars.testing import assert_frame_equal, assert_series_equal
from polars.testing.parametric import column, dataframes
from polars.testing.parametric.strategies.core import series

if TYPE_CHECKING:
    from collections.abc import Callable
    from pathlib import Path

    from polars._typing import (
        ParallelStrategy,
        ParquetCompression,
        ParquetMetadata,
        ParquetMetadataContext,
        TimeUnit,
    )
    from tests.conftest import PlMonkeyPatch
    from tests.unit.conftest import MemoryUsage


@pytest.mark.may_fail_auto_streaming
47
def test_round_trip(df: pl.DataFrame) -> None:
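    # Writes `df` to an in-memory Parquet buffer and reads it back; also reused
    # as a plain helper by several tests below (e.g. test_complex_types,
    # test_nested_decimal).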
48
f = io.BytesIO()
49
df.write_parquet(f)
50
f.seek(0)
51
assert_frame_equal(pl.read_parquet(f), df)
52
53
54
@pytest.mark.may_fail_auto_streaming
55
def test_scan_round_trip(df: pl.DataFrame) -> None:
56
f = io.BytesIO()
57
df.write_parquet(f)
58
f.seek(0)
59
assert_frame_equal(pl.scan_parquet(f).collect(), df)
60
f.seek(0)
61
assert_frame_equal(pl.scan_parquet(f).head().collect(), df.head())
62
63
64
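# Compression settings exercised by the parametrized round-trip tests below.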
COMPRESSIONS = [
65
"lz4",
66
"uncompressed",
67
"snappy",
68
"gzip",
69
"brotli",
70
"zstd",
71
]
72
73
74
@pytest.mark.write_disk
75
def test_write_parquet_using_pyarrow_9753(tmp_path: Path) -> None:
76
tmp_path.mkdir(exist_ok=True)
77
78
df = pl.DataFrame({"a": [1, 2, 3]})
79
df.write_parquet(
80
tmp_path / "test.parquet",
81
compression="zstd",
82
statistics=True,
83
use_pyarrow=True,
84
pyarrow_options={"coerce_timestamps": "us"},
85
)
86
87
88
@pytest.mark.parametrize("compression", COMPRESSIONS)
89
def test_write_parquet_using_pyarrow_write_to_dataset_with_partitioning(
90
tmp_path: Path,
91
compression: ParquetCompression,
92
) -> None:
93
df = pl.DataFrame({"a": [1, 2, 3], "partition_col": ["one", "two", "two"]})
94
path_to_write = tmp_path / "test_folder"
95
path_to_write.mkdir(exist_ok=True)
96
df.write_parquet(
97
file=path_to_write,
98
statistics=True,
99
use_pyarrow=True,
100
row_group_size=128,
101
pyarrow_options={
102
"partition_cols": ["partition_col"],
103
"compression": compression,
104
},
105
)
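    # With partition_cols set, pyarrow writes a hive-partitioned directory
    # layout under test_folder (e.g. partition_col=one/<file>.parquet).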
106
107
# cast is necessary as pyarrow writes partitions as categorical type
108
read_df = pl.read_parquet(path_to_write, use_pyarrow=True).with_columns(
109
pl.col("partition_col").cast(pl.String)
110
)
111
assert_frame_equal(df, read_df)
112
113
114
@pytest.fixture
115
def small_parquet_path(io_files_path: Path) -> Path:
116
return io_files_path / "small.parquet"
117
118
119
@pytest.mark.parametrize("compression", COMPRESSIONS)
120
@pytest.mark.parametrize("use_pyarrow", [True, False])
121
def test_to_from_buffer(
122
df: pl.DataFrame, compression: ParquetCompression, use_pyarrow: bool
123
) -> None:
124
df = df[["list_str"]]
125
buf = io.BytesIO()
126
df.write_parquet(buf, compression=compression, use_pyarrow=use_pyarrow)
127
buf.seek(0)
128
read_df = pl.read_parquet(buf, use_pyarrow=use_pyarrow)
129
assert_frame_equal(df, read_df, categorical_as_str=True)
130
131
132
@pytest.mark.parametrize("use_pyarrow", [True, False])
133
@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)])
134
@pytest.mark.may_fail_auto_streaming
135
@pytest.mark.may_fail_cloud # reason: chunking
136
def test_read_parquet_respects_rechunk_16416(
137
use_pyarrow: bool, rechunk_and_expected_chunks: tuple[bool, int]
138
) -> None:
139
# Create a dataframe with 3 chunks:
140
df = pl.DataFrame({"a": [1]})
141
df = pl.concat([df, df, df])
142
buf = io.BytesIO()
143
df.write_parquet(buf, row_group_size=1)
144
buf.seek(0)
145
146
rechunk, expected_chunks = rechunk_and_expected_chunks
147
result = pl.read_parquet(buf, use_pyarrow=use_pyarrow, rechunk=rechunk)
148
assert result.n_chunks() == expected_chunks
149
150
151
@pytest.mark.write_disk
152
@pytest.mark.parametrize("compression", COMPRESSIONS)
153
def test_to_from_file(
154
df: pl.DataFrame, compression: ParquetCompression, tmp_path: Path
155
) -> None:
156
tmp_path.mkdir(exist_ok=True)
157
158
    file_path = tmp_path / "small.parquet"
159
df.write_parquet(file_path, compression=compression)
160
read_df = pl.read_parquet(file_path)
161
assert_frame_equal(df, read_df, categorical_as_str=True)
162
163
164
def test_select_columns() -> None:
165
df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
166
expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
167
168
f = io.BytesIO()
169
df.write_parquet(f)
170
f.seek(0)
171
172
read_df = pl.read_parquet(f, columns=["b", "c"], use_pyarrow=False)
173
assert_frame_equal(expected, read_df)
174
175
176
def test_select_projection() -> None:
177
df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
178
expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
179
f = io.BytesIO()
180
df.write_parquet(f)
181
f.seek(0)
182
183
read_df = pl.read_parquet(f, columns=[1, 2], use_pyarrow=False)
184
assert_frame_equal(expected, read_df)
185
186
187
@pytest.mark.parametrize("compression", COMPRESSIONS)
188
@pytest.mark.parametrize("use_pyarrow", [True, False])
189
def test_parquet_datetime(compression: ParquetCompression, use_pyarrow: bool) -> None:
190
# This failed because parquet writers cast datetime to Date
191
f = io.BytesIO()
192
data = {
193
"datetime": [ # unix timestamp in ms
194
1618354800000,
195
1618354740000,
196
1618354680000,
197
1618354620000,
198
1618354560000,
199
],
200
"value1": [73.1999969482, 71.0999984741, 74.5, 69.5999984741, 69.6999969482],
201
"value2": [59.5999984741, 61.0, 62.2999992371, 56.9000015259, 60.0],
202
}
203
df = pl.DataFrame(data)
204
df = df.with_columns(df["datetime"].cast(pl.Datetime))
205
206
df.write_parquet(f, use_pyarrow=use_pyarrow, compression=compression)
207
f.seek(0)
208
read = pl.read_parquet(f)
209
assert_frame_equal(read, df)
210
211
212
def test_nested_parquet() -> None:
213
f = io.BytesIO()
214
data = [
215
{"a": [{"b": 0}]},
216
{"a": [{"b": 1}, {"b": 2}]},
217
]
218
df = pd.DataFrame(data)
219
df.to_parquet(f)
220
221
read = pl.read_parquet(f, use_pyarrow=True)
222
assert read.columns == ["a"]
223
assert isinstance(read.dtypes[0], pl.datatypes.List)
224
assert isinstance(read.dtypes[0].inner, pl.datatypes.Struct)
225
226
227
@pytest.mark.write_disk
228
def test_glob_parquet(df: pl.DataFrame, tmp_path: Path) -> None:
229
tmp_path.mkdir(exist_ok=True)
230
file_path = tmp_path / "small.parquet"
231
df.write_parquet(file_path)
232
233
path_glob = tmp_path / "small*.parquet"
234
assert pl.read_parquet(path_glob).shape == (3, df.width)
235
assert pl.scan_parquet(path_glob).collect().shape == (3, df.width)
236
237
238
def test_chunked_round_trip() -> None:
239
df1 = pl.DataFrame(
240
{
241
"a": [1] * 2,
242
"l": [[1] for j in range(2)],
243
}
244
)
245
df2 = pl.DataFrame(
246
{
247
"a": [2] * 3,
248
"l": [[2] for j in range(3)],
249
}
250
)
251
252
df = df1.vstack(df2)
253
254
f = io.BytesIO()
255
df.write_parquet(f)
256
f.seek(0)
257
assert_frame_equal(pl.read_parquet(f), df)
258
259
260
@pytest.mark.write_disk
261
def test_lazy_self_join_file_cache_prop_3979(df: pl.DataFrame, tmp_path: Path) -> None:
262
tmp_path.mkdir(exist_ok=True)
263
264
file_path = tmp_path / "small.parquet"
265
df.write_parquet(file_path)
266
267
a = pl.scan_parquet(file_path)
268
b = pl.DataFrame({"a": [1]}).lazy()
269
270
expected_shape = (3, df.width + b.collect_schema().len())
271
assert a.join(b, how="cross").collect().shape == expected_shape
272
assert b.join(a, how="cross").collect().shape == expected_shape
273
274
275
def test_recursive_logical_type() -> None:
276
df = pl.DataFrame({"str": ["A", "B", "A", "B", "C"], "group": [1, 1, 2, 1, 2]})
277
df = df.with_columns(pl.col("str").cast(pl.Categorical))
278
279
df_groups = df.group_by("group").agg([pl.col("str").alias("cat_list")])
280
f = io.BytesIO()
281
df_groups.write_parquet(f, use_pyarrow=True)
282
f.seek(0)
283
read = pl.read_parquet(f, use_pyarrow=True)
284
assert read.dtypes == [pl.Int64, pl.List(pl.Categorical)]
285
assert read.shape == (2, 2)
286
287
288
def test_nested_dictionary() -> None:
289
df = (
290
pl.DataFrame({"str": ["A", "B", "A", "B", "C"], "group": [1, 1, 2, 1, 2]})
291
.with_columns(pl.col("str").cast(pl.Categorical))
292
.group_by("group")
293
.agg([pl.col("str").alias("cat_list")])
294
)
295
f = io.BytesIO()
296
df.write_parquet(f)
297
f.seek(0)
298
299
read_df = pl.read_parquet(f)
300
assert_frame_equal(df, read_df)
301
302
303
def test_row_group_size_saturation() -> None:
304
df = pl.DataFrame({"a": [1, 2, 3]})
305
f = io.BytesIO()
306
307
# request larger chunk than rows in df
308
df.write_parquet(f, row_group_size=1024)
309
f.seek(0)
310
assert_frame_equal(pl.read_parquet(f), df)
311
312
313
def test_nested_sliced() -> None:
314
for df in [
315
pl.Series([[1, 2], [3, 4], [5, 6]]).slice(2, 2).to_frame(),
316
pl.Series([[None, 2], [3, 4], [5, 6]]).to_frame(),
317
pl.Series([[None, 2], [3, 4], [5, 6]]).slice(2, 2).to_frame(),
318
pl.Series([["a", "a"], ["", "a"], ["c", "de"]]).slice(3, 2).to_frame(),
319
pl.Series([[None, True], [False, False], [True, True]]).slice(2, 2).to_frame(),
320
]:
321
f = io.BytesIO()
322
df.write_parquet(f)
323
f.seek(0)
324
assert_frame_equal(pl.read_parquet(f), df)
325
326
327
def test_parquet_5795() -> None:
328
df_pd = pd.DataFrame(
329
{
330
"a": [
331
"V",
332
"V",
333
"V",
334
"V",
335
"V",
336
"V",
337
"V",
338
"V",
339
"V",
340
"V",
341
"V",
342
"V",
343
"V",
344
"V",
345
None,
346
None,
347
None,
348
None,
349
None,
350
None,
351
]
352
}
353
)
354
f = io.BytesIO()
355
df_pd.to_parquet(f)
356
f.seek(0)
357
assert_frame_equal(pl.read_parquet(f), pl.from_pandas(df_pd))
358
359
360
def test_parquet_nesting_structs_list() -> None:
361
f = io.BytesIO()
362
df = pl.from_records(
363
[
364
{
365
"id": 1,
366
"list_of_structs_col": [
367
{"a": 10, "b": [10, 11, 12]},
368
{"a": 11, "b": [13, 14, 15]},
369
],
370
},
371
{
372
"id": 2,
373
"list_of_structs_col": [
374
{"a": 44, "b": [12]},
375
],
376
},
377
]
378
)
379
380
df.write_parquet(f)
381
f.seek(0)
382
383
assert_frame_equal(pl.read_parquet(f), df)
384
385
386
def test_parquet_nested_dictionaries_6217() -> None:
387
_type = pa.dictionary(pa.int64(), pa.string())
388
389
fields = [("a_type", _type)]
390
struct_type = pa.struct(fields)
391
392
col1 = pa.StructArray.from_arrays(
393
[pa.DictionaryArray.from_arrays([0, 0, 1], ["A", "B"])],
394
fields=struct_type,
395
)
396
397
table = pa.table({"Col1": col1})
398
399
df = pl.from_arrow(table)
400
401
f = io.BytesIO()
402
import pyarrow.parquet as pq
403
404
pq.write_table(table, f, compression="snappy")
405
f.seek(0)
406
read = pl.read_parquet(f)
407
assert_frame_equal(read, df) # type: ignore[arg-type]
408
409
410
@pytest.mark.write_disk
411
def test_head_union(tmp_path: Path) -> None:
412
tmp_path.mkdir(exist_ok=True)
413
414
df1 = pl.DataFrame({"a": [0, 1, 2], "b": [1, 2, 3]})
415
df2 = pl.DataFrame({"a": [3, 4, 5], "b": [4, 5, 6]})
416
417
file_path_1 = tmp_path / "df_fetch_1.parquet"
418
file_path_2 = tmp_path / "df_fetch_2.parquet"
419
file_path_glob = tmp_path / "df_fetch_*.parquet"
420
421
df1.write_parquet(file_path_1)
422
df2.write_parquet(file_path_2)
423
424
result_one = pl.scan_parquet(file_path_1).head(1).collect()
425
result_glob = pl.scan_parquet(file_path_glob).head(1).collect()
426
427
expected = pl.DataFrame({"a": [0], "b": [1]})
428
assert_frame_equal(result_one, expected)
429
430
    # Either 1 row per file or 1 row per dataset would be acceptable here, as we
    # don't guarantee anything; currently we return one row per dataset.
432
expected = pl.DataFrame({"a": [0], "b": [1]})
433
assert_frame_equal(result_glob, expected)
434
435
436
@pytest.mark.slow
437
def test_struct_pyarrow_dataset_5796(tmp_path: Path) -> None:
438
tmp_path.mkdir(exist_ok=True)
439
440
num_rows = 2**17 + 1
441
442
df = pl.from_records([{"id": i, "nested": {"a": i}} for i in range(num_rows)])
443
file_path = tmp_path / "out.parquet"
444
df.write_parquet(file_path, use_pyarrow=True)
445
tbl = ds.dataset(file_path).to_table()
446
result = pl.from_arrow(tbl)
447
448
assert_frame_equal(result, df) # type: ignore[arg-type]
449
450
451
@pytest.mark.slow
452
@pytest.mark.parametrize("case", [1048576, 1048577])
453
def test_parquet_chunks_545(case: int) -> None:
454
f = io.BytesIO()
455
    # tile the row until there are `case` instances
456
df = pd.DataFrame(
457
np.tile([1.0, pd.to_datetime("2010-10-10")], [case, 1]),
458
columns=["floats", "dates"],
459
)
460
461
# write as parquet
462
df.to_parquet(f)
463
f.seek(0)
464
465
# read it with polars
466
polars_df = pl.read_parquet(f)
467
assert_frame_equal(pl.DataFrame(df), polars_df)
468
469
470
def test_nested_null_roundtrip() -> None:
471
f = io.BytesIO()
472
df = pl.DataFrame(
473
{
474
"experiences": [
475
[
476
{"company": "Google", "years": None},
477
{"company": "Facebook", "years": None},
478
],
479
]
480
}
481
)
482
483
df.write_parquet(f)
484
f.seek(0)
485
df_read = pl.read_parquet(f)
486
assert_frame_equal(df_read, df)
487
488
489
def test_parquet_nested_list_pandas() -> None:
490
# pandas/pyarrow writes as nested null dict
491
df_pd = pd.DataFrame({"listcol": [[] * 10]})
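    # note that [[] * 10] evaluates to [[]]: a single row holding one empty list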
492
f = io.BytesIO()
493
df_pd.to_parquet(f)
494
f.seek(0)
495
df = pl.read_parquet(f)
496
assert df.dtypes == [pl.List(pl.Null)]
497
assert df.to_dict(as_series=False) == {"listcol": [[]]}
498
499
500
def test_parquet_cat_roundtrip() -> None:
501
f = io.BytesIO()
502
503
df = pl.DataFrame({"a": ["a", "b", "c", "d"]}).with_columns(
504
pl.col("a").cast(pl.Categorical)
505
)
506
507
df.write_parquet(f, row_group_size=2)
508
f.seek(0)
509
assert_series_equal(pl.read_parquet(f)["a"], df["a"])
510
511
512
def test_tz_aware_parquet_9586(io_files_path: Path) -> None:
513
result = pl.read_parquet(io_files_path / "tz_aware.parquet")
514
expected = pl.DataFrame(
515
{"UTC_DATETIME_ID": [datetime(2023, 6, 26, 14, 15, 0, tzinfo=timezone.utc)]}
516
).select(pl.col("*").cast(pl.Datetime("ns", "UTC")))
517
assert_frame_equal(result, expected)
518
519
520
def test_nested_list_page_reads_to_end_11548() -> None:
521
df = pl.select(
522
pl.repeat(pl.arange(0, 2048, dtype=pl.UInt64).implode(), 2).alias("x"),
523
)
524
525
f = io.BytesIO()
526
527
pq.write_table(df.to_arrow(), f, data_page_size=1)
528
529
f.seek(0)
530
531
result = pl.read_parquet(f).select(pl.col("x").list.len())
532
assert result.to_series().to_list() == [2048, 2048]
533
534
535
def test_parquet_nano_second_schema() -> None:
536
value = time(9, 0, 0)
537
f = io.BytesIO()
538
df = pd.DataFrame({"Time": [value]})
539
df.to_parquet(f)
540
f.seek(0)
541
assert pl.read_parquet(f).item() == value
542
543
544
def test_nested_struct_read_12610() -> None:
545
n = 1_025
546
expect = pl.select(a=pl.int_range(0, n), b=pl.repeat(1, n)).with_columns(
547
struct=pl.struct(pl.all())
548
)
549
550
f = io.BytesIO()
551
expect.write_parquet(
552
f,
553
use_pyarrow=True,
554
)
555
f.seek(0)
556
557
actual = pl.read_parquet(f)
558
assert_frame_equal(expect, actual)
559
560
561
@pytest.mark.write_disk
562
def test_decimal_parquet(tmp_path: Path) -> None:
563
path = tmp_path / "foo.parquet"
564
df = pl.DataFrame(
565
{
566
"foo": [1, 2, 3],
567
"bar": ["6", "7", "8"],
568
}
569
)
570
571
df = df.with_columns(pl.col("bar").cast(pl.Decimal(scale=3)))
572
573
df.write_parquet(path, statistics=True)
574
out = pl.scan_parquet(path).filter(foo=2).collect().to_dict(as_series=False)
575
assert out == {"foo": [2], "bar": [Decimal("7")]}
576
577
578
@pytest.mark.write_disk
579
def test_enum_parquet(tmp_path: Path) -> None:
580
path = tmp_path / "enum.parquet"
581
df = pl.DataFrame(
582
[pl.Series("e", ["foo", "bar", "ham"], dtype=pl.Enum(["foo", "bar", "ham"]))]
583
)
584
df.write_parquet(path)
585
out = pl.read_parquet(path)
586
assert_frame_equal(df, out)
587
588
589
def test_parquet_rle_non_nullable_12814() -> None:
590
column = (
591
pl.select(x=pl.arange(0, 1025, dtype=pl.Int64) // 10).to_series().to_arrow()
592
)
593
schema = pa.schema([pa.field("foo", pa.int64(), nullable=False)])
594
table = pa.Table.from_arrays([column], schema=schema)
595
596
f = io.BytesIO()
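    # data_page_size=1 makes pyarrow emit many tiny data pages, so the values of
    # a single row group are split across page boundaries.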
597
pq.write_table(table, f, data_page_size=1)
598
599
f.seek(0)
600
expect = pl.DataFrame(table).tail(10)
601
actual = pl.read_parquet(f).tail(10)
602
603
assert_frame_equal(expect, actual)
604
605
606
@pytest.mark.slow
607
def test_parquet_12831() -> None:
608
n = 70_000
609
df = pl.DataFrame({"x": ["aaaaaa"] * n})
610
f = io.BytesIO()
611
df.write_parquet(f, row_group_size=int(1e8), data_page_size=512)
612
f.seek(0)
613
assert_frame_equal(pl.from_arrow(pq.read_table(f)), df) # type: ignore[arg-type]
614
615
616
@pytest.mark.write_disk
617
def test_parquet_struct_categorical(tmp_path: Path) -> None:
618
tmp_path.mkdir(exist_ok=True)
619
620
df = pl.DataFrame(
621
[
622
pl.Series("a", ["bob"], pl.Categorical),
623
pl.Series("b", ["foo"], pl.Categorical),
624
]
625
)
626
627
file_path = tmp_path / "categorical.parquet"
628
df.write_parquet(file_path)
629
630
out = pl.read_parquet(file_path).select(pl.col("b").value_counts())
631
assert out.to_dict(as_series=False) == {"b": [{"b": "foo", "count": 1}]}
632
633
634
@pytest.mark.write_disk
635
def test_null_parquet(tmp_path: Path) -> None:
636
tmp_path.mkdir(exist_ok=True)
637
638
df = pl.DataFrame([pl.Series("foo", [], dtype=pl.Int8)])
639
file_path = tmp_path / "null.parquet"
640
df.write_parquet(file_path)
641
out = pl.read_parquet(file_path)
642
assert_frame_equal(out, df)
643
644
645
@pytest.mark.write_disk
646
def test_write_parquet_with_null_col(tmp_path: Path) -> None:
647
tmp_path.mkdir(exist_ok=True)
648
649
df1 = pl.DataFrame({"nulls": [None] * 2, "ints": [1] * 2})
650
df2 = pl.DataFrame({"nulls": [None] * 2, "ints": [1] * 2})
651
df3 = pl.DataFrame({"nulls": [None] * 3, "ints": [1] * 3})
652
df = df1.vstack(df2)
653
df = df.vstack(df3)
654
file_path = tmp_path / "with_null.parquet"
655
df.write_parquet(file_path, row_group_size=3)
656
out = pl.read_parquet(file_path)
657
assert_frame_equal(out, df)
658
659
660
@pytest.mark.write_disk
661
def test_scan_parquet_binary_buffered_reader(tmp_path: Path) -> None:
662
tmp_path.mkdir(exist_ok=True)
663
664
df = pl.DataFrame({"a": [1, 2, 3]})
665
file_path = tmp_path / "test.parquet"
666
df.write_parquet(file_path)
667
668
with file_path.open("rb") as f:
669
out = pl.scan_parquet(f).collect()
670
assert_frame_equal(out, df)
671
672
673
@pytest.mark.write_disk
674
def test_read_parquet_binary_buffered_reader(tmp_path: Path) -> None:
675
tmp_path.mkdir(exist_ok=True)
676
677
df = pl.DataFrame({"a": [1, 2, 3]})
678
file_path = tmp_path / "test.parquet"
679
df.write_parquet(file_path)
680
681
with file_path.open("rb") as f:
682
out = pl.read_parquet(f)
683
assert_frame_equal(out, df)
684
685
686
@pytest.mark.write_disk
687
def test_read_parquet_binary_file_io(tmp_path: Path) -> None:
688
tmp_path.mkdir(exist_ok=True)
689
690
df = pl.DataFrame({"a": [1, 2, 3]})
691
file_path = tmp_path / "test.parquet"
692
df.write_parquet(file_path)
693
694
with file_path.open("rb", buffering=0) as f:
695
out = pl.read_parquet(f)
696
assert_frame_equal(out, df)
697
698
699
# https://github.com/pola-rs/polars/issues/15760
700
@pytest.mark.write_disk
701
def test_read_parquet_binary_fsspec(tmp_path: Path) -> None:
702
tmp_path.mkdir(exist_ok=True)
703
704
df = pl.DataFrame({"a": [1, 2, 3]})
705
file_path = tmp_path / "test.parquet"
706
df.write_parquet(file_path)
707
708
with fsspec.open(file_path) as f:
709
out = pl.read_parquet(f)
710
assert_frame_equal(out, df)
711
712
713
def test_read_parquet_binary_bytes_io() -> None:
714
df = pl.DataFrame({"a": [1, 2, 3]})
715
f = io.BytesIO()
716
df.write_parquet(f)
717
f.seek(0)
718
719
out = pl.read_parquet(f)
720
assert_frame_equal(out, df)
721
722
723
def test_read_parquet_binary_bytes() -> None:
724
df = pl.DataFrame({"a": [1, 2, 3]})
725
f = io.BytesIO()
726
df.write_parquet(f)
727
bytes = f.getvalue()
728
729
out = pl.read_parquet(bytes)
730
assert_frame_equal(out, df)
731
732
733
def test_utc_timezone_normalization_13670(tmp_path: Path) -> None:
    """'+00:00' timezones become the 'UTC' timezone."""
735
utc_path = tmp_path / "utc.parquet"
736
zero_path = tmp_path / "00_00.parquet"
737
utc_lowercase_path = tmp_path / "utc_lowercase.parquet"
738
for tz, path in [
739
("+00:00", zero_path),
740
("UTC", utc_path),
741
("utc", utc_lowercase_path),
742
]:
743
pq.write_table(
744
pa.table(
745
{"c1": [1234567890123] * 10},
746
schema=pa.schema([pa.field("c1", pa.timestamp("ms", tz=tz))]),
747
),
748
path,
749
)
750
751
df = pl.scan_parquet([utc_path, zero_path]).head(5).collect()
752
assert cast("pl.Datetime", df.schema["c1"]).time_zone == "UTC"
753
df = pl.scan_parquet([zero_path, utc_path]).head(5).collect()
754
assert cast("pl.Datetime", df.schema["c1"]).time_zone == "UTC"
755
df = pl.scan_parquet([zero_path, utc_lowercase_path]).head(5).collect()
756
assert cast("pl.Datetime", df.schema["c1"]).time_zone == "UTC"
757
758
759
def test_parquet_rle_14333() -> None:
760
vals = [True, False, True, False, True, False, True, False, True, False]
761
table = pa.table({"a": vals})
762
763
f = io.BytesIO()
764
pq.write_table(table, f, data_page_version="2.0")
765
f.seek(0)
766
assert pl.read_parquet(f)["a"].to_list() == vals
767
768
769
def test_parquet_rle_null_binary_read_14638() -> None:
770
df = pl.DataFrame({"x": [None]}, schema={"x": pl.String})
771
772
f = io.BytesIO()
773
df.write_parquet(f, use_pyarrow=True)
774
f.seek(0)
775
assert "RLE_DICTIONARY" in pq.read_metadata(f).row_group(0).column(0).encodings
776
f.seek(0)
777
assert_frame_equal(df, pl.read_parquet(f))
778
779
780
def test_parquet_string_rle_encoding() -> None:
781
n = 3
782
data = {
783
"id": ["abcdefgh"] * n,
784
}
785
786
df = pl.DataFrame(data)
787
f = io.BytesIO()
788
df.write_parquet(f, use_pyarrow=False)
789
f.seek(0)
790
791
assert (
792
"RLE_DICTIONARY"
793
in pq.ParquetFile(f).metadata.to_dict()["row_groups"][0]["columns"][0][
794
"encodings"
795
]
796
)
797
798
799
@pytest.mark.may_fail_auto_streaming
800
def test_sliced_dict_with_nulls_14904() -> None:
801
df = (
802
pl.DataFrame({"x": [None, None]})
803
.cast(pl.Categorical)
804
.with_columns(y=pl.concat_list("x"))
805
.slice(0, 1)
806
)
807
test_round_trip(df)
808
809
810
@pytest.fixture
811
def empty_compressed_datapage_v2_path(io_files_path: Path) -> Path:
812
return io_files_path / "empty_datapage_v2.snappy.parquet"
813
814
815
def test_read_empty_compressed_datapage_v2_22170(
816
empty_compressed_datapage_v2_path: Path,
817
) -> None:
818
df = pl.DataFrame({"value": [None]}, schema={"value": pl.Float32})
819
assert_frame_equal(df, pl.read_parquet(empty_compressed_datapage_v2_path))
820
821
822
def test_parquet_array_dtype() -> None:
823
df = pl.DataFrame({"x": []})
824
df = df.cast({"x": pl.Array(pl.Int64, shape=3)})
825
test_round_trip(df)
826
827
828
def test_parquet_array_dtype_nulls() -> None:
829
df = pl.DataFrame({"x": [[1, 2], None, [None, 3]]})
830
df = df.cast({"x": pl.Array(pl.Int64, shape=2)})
831
test_round_trip(df)
832
833
834
@pytest.mark.parametrize(
835
("series", "dtype"),
836
[
837
([[1, 2, 3]], pl.List(pl.Int64)),
838
([[1, None, 3]], pl.List(pl.Int64)),
839
(
840
[{"a": []}, {"a": [1]}, {"a": [1, 2, 3]}],
841
pl.Struct({"a": pl.List(pl.Int64)}),
842
),
843
([{"a": None}, None, {"a": [1, 2, None]}], pl.Struct({"a": pl.List(pl.Int64)})),
844
(
845
[[{"a": []}, {"a": [1]}, {"a": [1, 2, 3]}], None, [{"a": []}, {"a": [42]}]],
846
pl.List(pl.Struct({"a": pl.List(pl.Int64)})),
847
),
848
(
849
[
850
[1, None, 3],
851
None,
852
[1, 3, 4],
853
None,
854
[9, None, 4],
855
[None, 42, 13],
856
[37, 511, None],
857
],
858
pl.List(pl.Int64),
859
),
860
([[1, 2, 3]], pl.Array(pl.Int64, 3)),
861
([[1, None, 3], None, [1, 2, None]], pl.Array(pl.Int64, 3)),
862
([[1, 2], None, [None, 3]], pl.Array(pl.Int64, 2)),
863
pytest.param(
864
[[], [], []],
865
pl.Array(pl.Int64, 0),
866
marks=pytest.mark.may_fail_cloud,
867
), # reason: zero-width array
868
pytest.param(
869
[[], None, []],
870
pl.Array(pl.Int64, 0),
871
marks=pytest.mark.may_fail_cloud,
872
),
873
(
874
[[[1, 5, 2], [42, 13, 37]], [[1, 2, 3], [5, 2, 3]], [[1, 2, 1], [3, 1, 3]]],
875
pl.Array(pl.Array(pl.Int8, 3), 2),
876
),
877
(
878
[[[1, 5, 2], [42, 13, 37]], None, [None, [3, 1, 3]]],
879
pl.Array(pl.Array(pl.Int8, 3), 2),
880
),
881
(
882
[
883
[[[2, 1], None, [4, 1], None], []],
884
None,
885
[None, [[4, 4], None, [1, 2]]],
886
],
887
pl.Array(pl.List(pl.Array(pl.Int8, 2)), 2),
888
),
889
([[[], []]], pl.Array(pl.List(pl.Array(pl.Int8, 2)), 2)),
890
(
891
[
892
[
893
[[[42, 13, 37, 15, 9, 20, 0, 0, 5, 10], None]],
894
[None, [None, [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]], None],
895
]
896
],
897
pl.Array(pl.List(pl.Array(pl.Array(pl.Int8, 10), 2)), 2),
898
),
899
(
900
[
901
None,
902
[None],
903
[[None]],
904
[[[None]]],
905
[[[[None]]]],
906
[[[[[None]]]]],
907
[[[[[1]]]]],
908
],
909
pl.Array(pl.Array(pl.Array(pl.Array(pl.Array(pl.Int8, 1), 1), 1), 1), 1),
910
),
911
(
912
[
913
None,
914
[None],
915
[[]],
916
[[None]],
917
[[[None], None]],
918
[[[None], [None]]],
919
[[[[None]], [[[1]]]]],
920
[[[[[None]]]]],
921
[[[[[1]]]]],
922
],
923
pl.Array(pl.List(pl.Array(pl.List(pl.Array(pl.Int8, 1)), 1)), 1),
924
),
925
],
926
)
927
def test_complex_types(series: list[Any], dtype: pl.DataType) -> None:
928
xs = pl.Series(series, dtype=dtype)
929
df = pl.DataFrame({"x": xs})
930
931
test_round_trip(df)
932
933
934
@pytest.mark.write_disk
935
def test_parquet_array_statistics(tmp_path: Path) -> None:
936
tmp_path.mkdir(exist_ok=True)
937
938
df = pl.DataFrame({"a": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], "b": [1, 2, 3]})
939
file_path = tmp_path / "test.parquet"
940
941
df.with_columns(a=pl.col("a").list.to_array(3)).lazy().filter(
942
pl.col("a") != [1, 2, 3]
943
).collect()
944
df.with_columns(a=pl.col("a").list.to_array(3)).lazy().sink_parquet(file_path)
945
946
result = pl.scan_parquet(file_path).filter(pl.col("a") != [1, 2, 3]).collect()
947
assert result.to_dict(as_series=False) == {"a": [[4, 5, 6], [7, 8, 9]], "b": [2, 3]}
948
949
950
@pytest.mark.slow
951
@pytest.mark.write_disk
952
def test_read_parquet_only_loads_selected_columns_15098(
953
memory_usage_without_pyarrow: MemoryUsage, tmp_path: Path
954
) -> None:
955
"""Only requested columns are loaded by ``read_parquet()``."""
956
tmp_path.mkdir(exist_ok=True)
957
958
# Each column will be about 8MB of RAM
959
series = pl.arange(0, 1_000_000, dtype=pl.Int64, eager=True)
960
961
file_path = tmp_path / "multicolumn.parquet"
962
df = pl.DataFrame(
963
{
964
"a": series,
965
"b": series,
966
}
967
)
968
df.write_parquet(file_path)
969
del df, series
970
971
memory_usage_without_pyarrow.reset_tracking()
972
973
# Only load one column:
974
df = pl.read_parquet([file_path], columns=["b"], rechunk=False)
975
del df
976
    # Only one column's worth of memory should be used; 2 columns would be
    # 16_000_000 at least, but there's some overhead.
    assert 8_000_000 < memory_usage_without_pyarrow.get_peak() < 13_000_000
979
980
981
@pytest.mark.release
982
def test_max_statistic_parquet_writer() -> None:
983
    # This hits the maximum page size, so the row group is split into multiple
    # pages. The page statistics need to be reduced correctly for this query to
    # make sense.
987
n = 150_000
988
989
# int64 is important to hit the page size
990
df = pl.int_range(0, n, eager=True, dtype=pl.Int64).alias("int").to_frame()
991
f = io.BytesIO()
992
df.write_parquet(f, statistics=True, use_pyarrow=False, row_group_size=n)
993
f.seek(0)
994
result = pl.scan_parquet(f).filter(pl.col("int") > n - 3).collect()
995
expected = pl.DataFrame({"int": [149998, 149999]})
996
assert_frame_equal(result, expected)
997
998
999
@pytest.mark.slow
1000
def test_hybrid_rle() -> None:
1001
    # 10_007 elements, to test a length that is not a nice multiple of 8
1002
n = 10_007
1003
literal_literal = []
1004
literal_rle = []
1005
for i in range(500):
1006
literal_literal.append(np.repeat(i, 5))
1007
literal_literal.append(np.repeat(i + 2, 11))
1008
literal_rle.append(np.repeat(i, 5))
1009
literal_rle.append(np.repeat(i + 2, 15))
1010
literal_literal.append(np.random.randint(0, 10, size=2007))
1011
literal_rle.append(np.random.randint(0, 10, size=7))
1012
1013
literal_literal_flat = np.concatenate(literal_literal)
1014
literal_rle_flat = np.concatenate(literal_rle)
1015
df = pl.DataFrame(
1016
{
1017
# Primitive types
1018
"i64": pl.Series([1, 2], dtype=pl.Int64).sample(n, with_replacement=True),
1019
"u64": pl.Series([1, 2], dtype=pl.UInt64).sample(n, with_replacement=True),
1020
"i8": pl.Series([1, 2], dtype=pl.Int8).sample(n, with_replacement=True),
1021
"u8": pl.Series([1, 2], dtype=pl.UInt8).sample(n, with_replacement=True),
1022
"string": pl.Series(["abc", "def"], dtype=pl.String).sample(
1023
n, with_replacement=True
1024
),
1025
"categorical": pl.Series(["aaa", "bbb"], dtype=pl.Categorical).sample(
1026
n, with_replacement=True
1027
),
1028
# Fill up bit-packing buffer in middle of consecutive run
1029
"large_bit_pack": np.concatenate(
1030
[np.repeat(i, 5) for i in range(2000)]
1031
+ [np.random.randint(0, 10, size=7)]
1032
),
1033
# Literal run that is not a multiple of 8 followed by consecutive
1034
# run initially long enough to RLE but not after padding literal
1035
"literal_literal": literal_literal_flat,
1036
# Literal run that is not a multiple of 8 followed by consecutive
1037
# run long enough to RLE even after padding literal
1038
"literal_rle": literal_rle_flat,
1039
# Final run not long enough to RLE
1040
"final_literal": np.concatenate(
1041
[np.random.randint(0, 100, 10_000), np.repeat(-1, 7)]
1042
),
1043
# Final run long enough to RLE
1044
"final_rle": np.concatenate(
1045
[np.random.randint(0, 100, 9_998), np.repeat(-1, 9)]
1046
),
1047
# Test filling up bit-packing buffer for encode_bool,
1048
# which is only used to encode validities
1049
"large_bit_pack_validity": [0, None] * 4092
1050
+ [0] * 9
1051
+ [1] * 9
1052
+ [2] * 10
1053
+ [0] * 1795,
1054
}
1055
)
1056
f = io.BytesIO()
1057
df.write_parquet(f)
1058
f.seek(0)
1059
for col in pq.ParquetFile(f).metadata.to_dict()["row_groups"][0]["columns"]:
1060
assert "RLE_DICTIONARY" in col["encodings"]
1061
f.seek(0)
1062
assert_frame_equal(pl.read_parquet(f), df)
1063
1064
1065
@given(
1066
df=dataframes(
1067
allowed_dtypes=[
1068
pl.Null,
1069
pl.List,
1070
pl.Array,
1071
pl.Int8,
1072
pl.Int16,
1073
pl.Int32,
1074
pl.Int64,
1075
pl.Int128,
1076
pl.UInt8,
1077
pl.UInt32,
1078
pl.UInt64,
1079
pl.UInt128,
1080
pl.Date,
1081
pl.Time,
1082
pl.Binary,
1083
pl.Float16,
1084
pl.Float32,
1085
pl.Float64,
1086
pl.String,
1087
pl.Boolean,
1088
],
1089
min_size=1,
1090
max_size=500,
1091
)
1092
)
1093
@pytest.mark.slow
1094
def test_roundtrip_parametric(df: pl.DataFrame) -> None:
1095
f = io.BytesIO()
1096
df.write_parquet(f)
1097
f.seek(0)
1098
result = pl.read_parquet(f)
1099
1100
assert_frame_equal(df, result)
1101
1102
1103
def test_parquet_statistics_uint64_16683() -> None:
1104
u64_max = (1 << 64) - 1
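    # 18_446_744_073_709_551_615, the largest value representable as a UInt64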
1105
df = pl.Series("a", [u64_max, 0], dtype=pl.UInt64).to_frame()
1106
file = io.BytesIO()
1107
df.write_parquet(file, statistics=True)
1108
file.seek(0)
1109
statistics = pq.read_metadata(file).row_group(0).column(0).statistics
1110
1111
assert statistics.min == 0
1112
assert statistics.max == u64_max
1113
1114
1115
@pytest.mark.slow
1116
@pytest.mark.parametrize("nullable", [True, False])
1117
def test_read_byte_stream_split(nullable: bool) -> None:
1118
rng = np.random.default_rng(123)
1119
num_rows = 1_000
1120
values = rng.uniform(-1.0e6, 1.0e6, num_rows)
1121
if nullable:
1122
validity_mask = rng.integers(0, 2, num_rows).astype(np.bool_)
1123
else:
1124
validity_mask = None
1125
1126
schema = pa.schema(
1127
[
1128
pa.field("floats", type=pa.float32(), nullable=nullable),
1129
pa.field("doubles", type=pa.float64(), nullable=nullable),
1130
]
1131
)
1132
arrays = [pa.array(values, type=field.type, mask=validity_mask) for field in schema]
1133
table = pa.Table.from_arrays(arrays, schema=schema)
1134
df = cast("pl.DataFrame", pl.from_arrow(table))
1135
1136
f = io.BytesIO()
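    # BYTE_STREAM_SPLIT stores the k-th byte of every value contiguously, which
    # typically helps compress floating-point columns; the point here is only
    # that polars can decode data written with this encoding.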
1137
pq.write_table(
1138
table, f, compression="snappy", use_dictionary=False, use_byte_stream_split=True
1139
)
1140
1141
f.seek(0)
1142
read = pl.read_parquet(f)
1143
1144
assert_frame_equal(read, df)
1145
1146
1147
@pytest.mark.slow
1148
@pytest.mark.parametrize("rows_nullable", [True, False])
1149
@pytest.mark.parametrize("item_nullable", [True, False])
1150
def test_read_byte_stream_split_arrays(
1151
item_nullable: bool, rows_nullable: bool
1152
) -> None:
1153
rng = np.random.default_rng(123)
1154
num_rows = 1_000
1155
max_array_len = 10
1156
array_lengths = rng.integers(0, max_array_len + 1, num_rows)
1157
if rows_nullable:
1158
row_validity_mask = rng.integers(0, 2, num_rows).astype(np.bool_)
1159
array_lengths[row_validity_mask] = 0
1160
row_validity_mask = pa.array(row_validity_mask)
1161
else:
1162
row_validity_mask = None
1163
1164
offsets = np.zeros(num_rows + 1, dtype=np.int64)
1165
np.cumsum(array_lengths, out=offsets[1:])
1166
num_values = offsets[-1]
1167
values = rng.uniform(-1.0e6, 1.0e6, num_values)
1168
1169
if item_nullable:
1170
element_validity_mask = rng.integers(0, 2, num_values).astype(np.bool_)
1171
else:
1172
element_validity_mask = None
1173
1174
schema = pa.schema(
1175
[
1176
pa.field(
1177
"floats",
1178
type=pa.list_(pa.field("", pa.float32(), nullable=item_nullable)),
1179
nullable=rows_nullable,
1180
),
1181
pa.field(
1182
"doubles",
1183
type=pa.list_(pa.field("", pa.float64(), nullable=item_nullable)),
1184
nullable=rows_nullable,
1185
),
1186
]
1187
)
1188
arrays = [
1189
pa.ListArray.from_arrays(
1190
pa.array(offsets),
1191
pa.array(values, type=field.type.field(0).type, mask=element_validity_mask),
1192
mask=row_validity_mask,
1193
)
1194
for field in schema
1195
]
1196
table = pa.Table.from_arrays(arrays, schema=schema)
1197
df = cast("pl.DataFrame", pl.from_arrow(table))
1198
1199
f = io.BytesIO()
1200
pq.write_table(
1201
table, f, compression="snappy", use_dictionary=False, use_byte_stream_split=True
1202
)
1203
1204
f.seek(0)
1205
read = pl.read_parquet(f)
1206
1207
assert_frame_equal(read, df)
1208
1209
1210
def test_parquet_nested_null_array_17795() -> None:
1211
f = io.BytesIO()
1212
pl.DataFrame([{"struct": {"field": None}}]).write_parquet(f)
1213
f.seek(0)
1214
pq.read_table(f)
1215
1216
1217
def test_parquet_record_batches_pyarrow_fixed_size_list_16614() -> None:
1218
    # @NOTE:
    # The minimum at which I could get it to crash was ~132000, but let's just
    # use 150000 to be sure.
1221
n = 150000
1222
x = pl.DataFrame(
1223
{"x": np.linspace((1, 2), (2 * n, 2 * n * 1), n, dtype=np.float32)},
1224
schema={"x": pl.Array(pl.Float32, 2)},
1225
)
1226
1227
f = io.BytesIO()
1228
x.write_parquet(f)
1229
f.seek(0)
1230
b = pl.read_parquet(f, use_pyarrow=True)
1231
1232
assert b["x"].shape[0] == n
1233
assert_frame_equal(b, x)
1234
1235
1236
def test_parquet_list_element_field_name() -> None:
1237
f = io.BytesIO()
1238
(
1239
pl.DataFrame(
1240
{
1241
"a": [[1, 2], [1, 1, 1]],
1242
},
1243
schema={"a": pl.List(pl.Int64)},
1244
).write_parquet(f, use_pyarrow=False)
1245
)
1246
1247
f.seek(0)
1248
schema_str = str(pq.read_schema(f))
1249
assert "<element: int64>" in schema_str
1250
assert "child 0, element: int64" in schema_str
1251
1252
1253
def test_nested_decimal() -> None:
1254
df = pl.DataFrame(
1255
{
1256
"a": [
1257
{"f0": None},
1258
None,
1259
]
1260
},
1261
schema={"a": pl.Struct({"f0": pl.Decimal(precision=38, scale=8)})},
1262
)
1263
test_round_trip(df)
1264
1265
1266
def test_nested_non_uniform_primitive() -> None:
1267
df = pl.DataFrame(
1268
{"a": [{"x": 0, "y": None}]},
1269
schema={
1270
"a": pl.Struct(
1271
{
1272
"x": pl.Int16,
1273
"y": pl.Int64,
1274
}
1275
)
1276
},
1277
)
1278
test_round_trip(df)
1279
1280
1281
def test_parquet_nested_struct_17933() -> None:
1282
df = pl.DataFrame(
1283
{"a": [{"x": {"u": None}, "y": True}]},
1284
schema={
1285
"a": pl.Struct(
1286
{
1287
"x": pl.Struct({"u": pl.String}),
1288
"y": pl.Boolean(),
1289
}
1290
)
1291
},
1292
)
1293
test_round_trip(df)
1294
1295
1296
# This is fixed with POLARS_FORCE_MULTISCAN=1. Without it we hit a
# first_metadata.unwrap() on None.
1298
@pytest.mark.may_fail_auto_streaming
1299
def test_parquet_pyarrow_map() -> None:
1300
xs = [
1301
[
1302
(0, 5),
1303
(1, 10),
1304
(2, 19),
1305
(3, 96),
1306
]
1307
]
1308
1309
table = pa.table(
1310
[xs],
1311
schema=pa.schema(
1312
[
1313
("x", pa.map_(pa.int32(), pa.int32(), keys_sorted=True)),
1314
]
1315
),
1316
)
1317
1318
f = io.BytesIO()
1319
pq.write_table(table, f)
1320
1321
expected = pl.DataFrame(
1322
{
1323
"x": [
1324
{"key": 0, "value": 5},
1325
{"key": 1, "value": 10},
1326
{"key": 2, "value": 19},
1327
{"key": 3, "value": 96},
1328
]
1329
},
1330
schema={"x": pl.Struct({"key": pl.Int32, "value": pl.Int32})},
1331
)
1332
f.seek(0)
1333
assert_frame_equal(pl.read_parquet(f).explode(["x"]), expected)
1334
1335
# Test for https://github.com/pola-rs/polars/issues/21317
1336
    # Specifying schema / missing_columns (formerly allow_missing_columns)
1337
for missing_columns in ["insert", "raise"]:
1338
f.seek(0)
1339
assert_frame_equal(
1340
pl.read_parquet(
1341
f,
1342
schema={"x": pl.List(pl.Struct({"key": pl.Int32, "value": pl.Int32}))},
1343
missing_columns=missing_columns, # type: ignore[arg-type]
1344
).explode(["x"]),
1345
expected,
1346
)
1347
1348
1349
@pytest.mark.parametrize(
1350
("s", "elem"),
1351
[
1352
(pl.Series(["", "hello", "hi", ""], dtype=pl.String), ""),
1353
(pl.Series([0, 1, 2, 0], dtype=pl.Int64), 0),
1354
(pl.Series([[0], [1], [2], [0]], dtype=pl.Array(pl.Int64, 1)), [0]),
1355
(
1356
pl.Series([[0, 1], [1, 2], [2, 3], [0, 1]], dtype=pl.Array(pl.Int64, 2)),
1357
[0, 1],
1358
),
1359
],
1360
)
1361
def test_parquet_high_nested_null_17805(
1362
s: pl.Series, elem: str | int | list[int]
1363
) -> None:
1364
test_round_trip(
1365
pl.DataFrame({"a": s}).select(
1366
pl.when(pl.col("a") == elem)
1367
.then(pl.lit(None))
1368
.otherwise(pl.concat_list(pl.col("a").alias("b")))
1369
.alias("c")
1370
)
1371
)
1372
1373
1374
def test_struct_plain_encoded_statistics() -> None:
1375
df = pl.DataFrame(
1376
{
1377
"a": [None, None, None, None, {"x": None, "y": 0}],
1378
},
1379
schema={"a": pl.Struct({"x": pl.Int8, "y": pl.Int8})},
1380
)
1381
1382
test_scan_round_trip(df)
1383
1384
1385
@given(
1386
df=dataframes(
1387
min_size=5,
1388
excluded_dtypes=[pl.Decimal, pl.Categorical],
1389
allow_masked_out=False, # PyArrow does not support this
1390
)
1391
)
1392
def test_scan_round_trip_parametric(df: pl.DataFrame) -> None:
1393
test_scan_round_trip(df)
1394
1395
1396
def test_empty_rg_no_dict_page_18146() -> None:
1397
df = pl.DataFrame(
1398
{
1399
"a": [],
1400
},
1401
schema={"a": pl.String},
1402
)
1403
1404
f = io.BytesIO()
1405
pq.write_table(df.to_arrow(), f, compression="NONE", use_dictionary=False)
1406
f.seek(0)
1407
assert_frame_equal(pl.read_parquet(f), df)
1408
1409
1410
def test_write_sliced_lists_18069() -> None:
1411
f = io.BytesIO()
1412
a = pl.Series(3 * [None, ["$"] * 3], dtype=pl.List(pl.String))
1413
1414
before = pl.DataFrame({"a": a}).slice(4, 2)
1415
before.write_parquet(f)
1416
1417
f.seek(0)
1418
after = pl.read_parquet(f)
1419
1420
assert_frame_equal(before, after)
1421
1422
1423
def test_null_array_dict_pages_18085() -> None:
1424
test = pd.DataFrame(
1425
[
1426
{"A": float("NaN"), "B": 3, "C": None},
1427
{"A": float("NaN"), "B": None, "C": None},
1428
]
1429
)
1430
1431
f = io.BytesIO()
1432
test.to_parquet(f)
1433
f.seek(0)
1434
pl.read_parquet(f)
1435
1436
1437
@given(
1438
df=dataframes(
1439
min_size=1,
1440
max_size=1000,
1441
allowed_dtypes=[
1442
pl.List,
1443
pl.Float16,
1444
pl.Float32,
1445
pl.Float64,
1446
],
1447
allow_masked_out=False, # PyArrow does not support this
1448
),
1449
row_group_size=st.integers(min_value=10, max_value=1000),
1450
)
1451
def test_byte_stream_split_encoding_roundtrip(
1452
df: pl.DataFrame, row_group_size: int
1453
) -> None:
1454
f = io.BytesIO()
1455
pq.write_table(
1456
df.to_arrow(),
1457
f,
1458
compression="NONE",
1459
use_dictionary=False,
1460
column_encoding="BYTE_STREAM_SPLIT",
1461
write_statistics=False,
1462
row_group_size=row_group_size,
1463
)
1464
1465
f.seek(0)
1466
assert_frame_equal(pl.read_parquet(f), df)
1467
1468
1469
@given(
1470
df=dataframes(
1471
min_size=1,
1472
max_size=1000,
1473
allowed_dtypes=[
1474
pl.List,
1475
pl.Int8,
1476
pl.Int16,
1477
pl.Int32,
1478
pl.Int64,
1479
pl.UInt8,
1480
pl.UInt16,
1481
pl.UInt32,
1482
pl.UInt64,
1483
],
1484
allow_masked_out=False, # PyArrow does not support this
1485
),
1486
row_group_size=st.integers(min_value=10, max_value=1000),
1487
)
1488
def test_delta_encoding_roundtrip(df: pl.DataFrame, row_group_size: int) -> None:
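    # DELTA_BINARY_PACKED stores bit-packed deltas between consecutive integer
    # values; forcing it via pyarrow exercises the polars decoder for this
    # encoding.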
1489
f = io.BytesIO()
1490
pq.write_table(
1491
df.to_arrow(),
1492
f,
1493
compression="NONE",
1494
use_dictionary=False,
1495
column_encoding="DELTA_BINARY_PACKED",
1496
write_statistics=False,
1497
row_group_size=row_group_size,
1498
)
1499
1500
f.seek(0)
1501
assert_frame_equal(pl.read_parquet(f), df)
1502
1503
1504
@given(
1505
df=dataframes(min_size=1, max_size=1000, allowed_dtypes=[pl.String, pl.Binary]),
1506
row_group_size=st.integers(min_value=10, max_value=1000),
1507
)
1508
def test_delta_length_byte_array_encoding_roundtrip(
1509
df: pl.DataFrame, row_group_size: int
1510
) -> None:
1511
f = io.BytesIO()
1512
pq.write_table(
1513
df.to_arrow(),
1514
f,
1515
compression="NONE",
1516
use_dictionary=False,
1517
column_encoding="DELTA_LENGTH_BYTE_ARRAY",
1518
write_statistics=False,
1519
row_group_size=row_group_size,
1520
)
1521
1522
f.seek(0)
1523
assert_frame_equal(pl.read_parquet(f), df)
1524
1525
1526
@given(
1527
df=dataframes(min_size=1, max_size=1000, allowed_dtypes=[pl.String, pl.Binary]),
1528
row_group_size=st.integers(min_value=10, max_value=1000),
1529
)
1530
def test_delta_strings_encoding_roundtrip(
1531
df: pl.DataFrame, row_group_size: int
1532
) -> None:
1533
f = io.BytesIO()
1534
pq.write_table(
1535
df.to_arrow(),
1536
f,
1537
compression="NONE",
1538
use_dictionary=False,
1539
column_encoding="DELTA_BYTE_ARRAY",
1540
write_statistics=False,
1541
row_group_size=row_group_size,
1542
)
1543
1544
f.seek(0)
1545
assert_frame_equal(pl.read_parquet(f), df)
1546
1547
1548
EQUALITY_OPERATORS = ["__eq__", "__lt__", "__le__", "__gt__", "__ge__"]
1549
BOOLEAN_OPERATORS = ["__or__", "__and__"]
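# Dunder method names applied via getattr() below to build the comparison and
# boolean-combination predicates parametrically.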
1550
1551
1552
@given(
1553
df=dataframes(
1554
min_size=0, max_size=100, min_cols=2, max_cols=5, allowed_dtypes=[pl.Int32]
1555
),
1556
first_op=st.sampled_from(EQUALITY_OPERATORS),
1557
second_op=st.sampled_from(
1558
[None]
1559
+ [
1560
(booljoin, eq)
1561
for booljoin in BOOLEAN_OPERATORS
1562
for eq in EQUALITY_OPERATORS
1563
]
1564
),
1565
l1=st.integers(min_value=0, max_value=1000),
1566
l2=st.integers(min_value=0, max_value=1000),
1567
r1=st.integers(min_value=0, max_value=1000),
1568
r2=st.integers(min_value=0, max_value=1000),
1569
)
1570
@pytest.mark.parametrize("parallel_st", ["auto", "prefiltered"])
1571
def test_predicate_filtering(
1572
df: pl.DataFrame,
1573
first_op: str,
1574
second_op: None | tuple[str, str],
1575
l1: int,
1576
l2: int,
1577
r1: int,
1578
r2: int,
1579
parallel_st: Literal["auto", "prefiltered"],
1580
) -> None:
1581
f = io.BytesIO()
1582
df.write_parquet(f, row_group_size=5)
1583
1584
cols = df.columns
1585
1586
l1s = cols[l1 % len(cols)]
1587
l2s = cols[l2 % len(cols)]
1588
expr = (getattr(pl.col(l1s), first_op))(pl.col(l2s))
1589
1590
if second_op is not None:
1591
r1s = cols[r1 % len(cols)]
1592
r2s = cols[r2 % len(cols)]
1593
expr = getattr(expr, second_op[0])(
1594
(getattr(pl.col(r1s), second_op[1]))(pl.col(r2s))
1595
)
1596
1597
f.seek(0)
1598
result = pl.scan_parquet(f, parallel=parallel_st).filter(expr).collect()
1599
assert_frame_equal(result, df.filter(expr))
1600
1601
1602
@pytest.mark.parametrize(
1603
"use_dictionary",
1604
[False, True],
1605
)
1606
@pytest.mark.parametrize(
1607
"data_page_size",
1608
[1, None],
1609
)
1610
@given(
1611
s=series(
1612
min_size=1,
1613
max_size=10,
1614
excluded_dtypes=[
1615
pl.Int128,
1616
pl.UInt128,
1617
pl.Decimal,
1618
pl.Categorical,
1619
pl.Enum,
1620
pl.Struct, # See #19612.
1621
],
1622
allow_masked_out=False, # PyArrow does not support this
1623
),
1624
offset=st.integers(0, 10),
1625
length=st.integers(0, 10),
1626
)
1627
def test_pyarrow_slice_roundtrip(
1628
s: pl.Series,
1629
use_dictionary: bool,
1630
data_page_size: int | None,
1631
offset: int,
1632
length: int,
1633
) -> None:
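    # Map the arbitrary offset/length drawn by hypothesis onto a valid slice of
    # the series.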
1634
offset %= len(s) + 1
1635
length %= len(s) - offset + 1
1636
1637
f = io.BytesIO()
1638
df = s.to_frame()
1639
pq.write_table(
1640
df.to_arrow(),
1641
f,
1642
compression="NONE",
1643
use_dictionary=use_dictionary,
1644
data_page_size=data_page_size,
1645
)
1646
1647
f.seek(0)
1648
scanned = pl.scan_parquet(f).slice(offset, length).collect()
1649
assert_frame_equal(scanned, df.slice(offset, length))
1650
1651
1652
@given(
1653
df=dataframes(
1654
min_size=1,
1655
max_size=5,
1656
min_cols=1,
1657
max_cols=1,
1658
excluded_dtypes=[pl.Decimal, pl.Categorical, pl.Enum],
1659
),
1660
offset=st.integers(0, 100),
1661
length=st.integers(0, 100),
1662
)
1663
def test_slice_roundtrip(df: pl.DataFrame, offset: int, length: int) -> None:
1664
offset %= df.height + 1
1665
length %= df.height - offset + 1
1666
1667
f = io.BytesIO()
1668
df.write_parquet(f)
1669
1670
f.seek(0)
1671
scanned = pl.scan_parquet(f).slice(offset, length).collect()
1672
assert_frame_equal(scanned, df.slice(offset, length))
1673
1674
1675
def test_struct_prefiltered() -> None:
1676
df = pl.DataFrame({"a": {"x": 1, "y": 2}})
1677
f = io.BytesIO()
1678
df.write_parquet(f)
1679
1680
f.seek(0)
1681
(
1682
pl.scan_parquet(f, parallel="prefiltered")
1683
.filter(pl.col("a").struct.field("x") == 1)
1684
.collect()
1685
)
1686
1687
1688
@pytest.mark.parametrize(
1689
"data",
1690
[
1691
(
1692
[{"x": ""}, {"x": "0"}],
1693
pa.struct([pa.field("x", pa.string(), nullable=True)]),
1694
),
1695
(
1696
[{"x": ""}, {"x": "0"}],
1697
pa.struct([pa.field("x", pa.string(), nullable=False)]),
1698
),
1699
([[""], ["0"]], pa.list_(pa.field("item", pa.string(), nullable=False))),
1700
([[""], ["0"]], pa.list_(pa.field("item", pa.string(), nullable=True))),
1701
([[""], ["0"]], pa.list_(pa.field("item", pa.string(), nullable=False), 1)),
1702
([[""], ["0"]], pa.list_(pa.field("item", pa.string(), nullable=True), 1)),
1703
(
1704
[["", "1"], ["0", "2"]],
1705
pa.list_(pa.field("item", pa.string(), nullable=False), 2),
1706
),
1707
(
1708
[["", "1"], ["0", "2"]],
1709
pa.list_(pa.field("item", pa.string(), nullable=True), 2),
1710
),
1711
],
1712
)
1713
@pytest.mark.parametrize("nullable", [False, True])
1714
def test_nested_skip_18303(
1715
data: tuple[list[dict[str, str] | list[str]], pa.DataType],
1716
nullable: bool,
1717
) -> None:
1718
schema = pa.schema([pa.field("a", data[1], nullable=nullable)])
1719
tb = pa.table({"a": data[0]}, schema=schema)
1720
1721
f = io.BytesIO()
1722
pq.write_table(tb, f)
1723
1724
f.seek(0)
1725
scanned = pl.scan_parquet(f).slice(1, 1).collect()
1726
1727
assert_frame_equal(scanned, pl.DataFrame(tb).slice(1, 1))
1728
1729
1730
def test_nested_span_multiple_pages_18400() -> None:
1731
width = 4100
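    # With data_page_size=1024 below, a single Array row of this width spans
    # multiple data pages.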
1732
df = pl.DataFrame(
1733
[
1734
pl.Series(
1735
"a",
1736
[
1737
list(range(width)),
1738
list(range(width)),
1739
],
1740
pl.Array(pl.Int64, width),
1741
),
1742
]
1743
)
1744
1745
f = io.BytesIO()
1746
pq.write_table(
1747
df.to_arrow(),
1748
f,
1749
use_dictionary=False,
1750
data_page_size=1024,
1751
column_encoding={"a": "PLAIN"},
1752
)
1753
1754
f.seek(0)
1755
assert_frame_equal(df.head(1), pl.read_parquet(f, n_rows=1))
1756
1757
1758
@given(
1759
df=dataframes(
1760
min_size=0,
1761
max_size=1000,
1762
min_cols=2,
1763
max_cols=5,
1764
excluded_dtypes=[pl.Decimal, pl.Categorical, pl.Enum, pl.Array],
1765
include_cols=[column("filter_col", pl.Boolean, allow_null=False)],
1766
),
1767
)
1768
def test_parametric_small_page_mask_filtering(df: pl.DataFrame) -> None:
1769
f = io.BytesIO()
1770
df.write_parquet(f, data_page_size=1024)
1771
1772
expr = pl.col("filter_col")
1773
f.seek(0)
1774
result = pl.scan_parquet(f, parallel="prefiltered").filter(expr).collect()
1775
assert_frame_equal(result, df.filter(expr))
1776
1777
1778
@pytest.mark.parametrize(
1779
"value",
1780
[
1781
"abcd",
1782
0,
1783
0.0,
1784
False,
1785
],
1786
)
1787
def test_different_page_validity_across_pages(value: str | int | float | bool) -> None:
1788
df = pl.DataFrame(
1789
{
1790
"a": [None] + [value] * 4000,
1791
}
1792
)
1793
1794
f = io.BytesIO()
1795
pq.write_table(
1796
df.to_arrow(),
1797
f,
1798
use_dictionary=False,
1799
data_page_size=1024,
1800
column_encoding={"a": "PLAIN"},
1801
)
1802
1803
f.seek(0)
1804
assert_frame_equal(df, pl.read_parquet(f))
1805
1806
1807
@given(
1808
df=dataframes(
1809
min_size=0,
1810
max_size=100,
1811
min_cols=2,
1812
max_cols=5,
1813
allowed_dtypes=[pl.String, pl.Binary],
1814
include_cols=[
1815
column("filter_col", pl.Int8, st.integers(0, 1), allow_null=False)
1816
],
1817
),
1818
)
1819
def test_delta_length_byte_array_prefiltering(df: pl.DataFrame) -> None:
1820
cols = df.columns
1821
1822
encodings = dict.fromkeys(cols, "DELTA_LENGTH_BYTE_ARRAY")
1823
encodings["filter_col"] = "PLAIN"
1824
1825
f = io.BytesIO()
1826
pq.write_table(
1827
df.to_arrow(),
1828
f,
1829
use_dictionary=False,
1830
column_encoding=encodings,
1831
)
1832
1833
f.seek(0)
1834
expr = pl.col("filter_col") == 0
1835
result = pl.scan_parquet(f, parallel="prefiltered").filter(expr).collect()
1836
assert_frame_equal(result, df.filter(expr))
1837
1838
1839
@given(
1840
df=dataframes(
1841
min_size=0,
1842
max_size=10,
1843
min_cols=1,
1844
max_cols=5,
1845
excluded_dtypes=[pl.Decimal, pl.Categorical, pl.Enum],
1846
include_cols=[
1847
column("filter_col", pl.Int8, st.integers(0, 1), allow_null=False)
1848
],
1849
),
1850
)
1851
def test_general_prefiltering(df: pl.DataFrame) -> None:
1852
f = io.BytesIO()
1853
df.write_parquet(f)
1854
1855
expr = pl.col("filter_col") == 0
1856
1857
f.seek(0)
1858
result = pl.scan_parquet(f, parallel="prefiltered").filter(expr).collect()
1859
assert_frame_equal(result, df.filter(expr))
1860
1861
1862
@given(
1863
df=dataframes(
1864
min_size=0,
1865
max_size=10,
1866
min_cols=1,
1867
max_cols=5,
1868
excluded_dtypes=[pl.Decimal, pl.Categorical, pl.Enum],
1869
include_cols=[column("filter_col", pl.Boolean, allow_null=False)],
1870
),
1871
)
1872
def test_row_index_prefiltering(df: pl.DataFrame) -> None:
1873
f = io.BytesIO()
1874
df.write_parquet(f)
1875
1876
expr = pl.col("filter_col")
1877
1878
f.seek(0)
1879
result = (
1880
pl.scan_parquet(
1881
f, row_index_name="ri", row_index_offset=42, parallel="prefiltered"
1882
)
1883
.filter(expr)
1884
.collect()
1885
)
1886
assert_frame_equal(result, df.with_row_index("ri", 42).filter(expr))
1887
1888
1889
def test_empty_parquet() -> None:
1890
f_pd = io.BytesIO()
1891
f_pl = io.BytesIO()
1892
1893
pd.DataFrame().to_parquet(f_pd)
1894
pl.DataFrame().write_parquet(f_pl)
1895
1896
f_pd.seek(0)
1897
f_pl.seek(0)
1898
1899
empty_from_pd = pl.read_parquet(f_pd)
1900
assert empty_from_pd.shape == (0, 0)
1901
1902
empty_from_pl = pl.read_parquet(f_pl)
1903
assert empty_from_pl.shape == (0, 0)
1904
1905
1906
@pytest.mark.parametrize(
1907
"strategy",
1908
["columns", "row_groups", "prefiltered"],
1909
)
1910
@pytest.mark.write_disk
1911
def test_row_index_projection_pushdown_18463(
1912
tmp_path: Path, strategy: ParallelStrategy
1913
) -> None:
1914
tmp_path.mkdir(exist_ok=True)
1915
f = tmp_path / "test.parquet"
1916
1917
pl.DataFrame({"A": [1, 4], "B": [2, 5]}).write_parquet(f)
1918
1919
df = pl.scan_parquet(f, parallel=strategy).with_row_index()
1920
1921
assert_frame_equal(df.select("index").collect(), df.collect().select("index"))
1922
1923
df = pl.scan_parquet(f, parallel=strategy).with_row_index("other_idx_name")
1924
1925
assert_frame_equal(
1926
df.select("other_idx_name").collect(), df.collect().select("other_idx_name")
1927
)
1928
1929
df = pl.scan_parquet(f, parallel=strategy).with_row_index(offset=42)
1930
1931
assert_frame_equal(df.select("index").collect(), df.collect().select("index"))
1932
1933
df = pl.scan_parquet(f, parallel=strategy).with_row_index()
1934
1935
assert_frame_equal(
1936
df.select("index").slice(1, 1).collect(),
1937
df.collect().select("index").slice(1, 1),
1938
)
1939
1940
1941
@pytest.mark.write_disk
1942
def test_write_binary_open_file(tmp_path: Path) -> None:
1943
df = pl.DataFrame({"a": [1, 2, 3]})
1944
1945
path = tmp_path / "test.parquet"
1946
1947
with path.open("wb") as f_write:
1948
df.write_parquet(f_write)
1949
1950
out = pl.read_parquet(path)
1951
assert_frame_equal(out, df)
1952
1953
1954
def test_prefilter_with_projection() -> None:
1955
f = io.BytesIO()
1956
pl.DataFrame({"a": [1], "b": [2]}).write_parquet(f)
1957
1958
f.seek(0)
1959
(
1960
pl.scan_parquet(f, parallel="prefiltered")
1961
.filter(pl.col.a == 1)
1962
.select(pl.col.a)
1963
.collect()
1964
)
1965
1966
1967
@pytest.mark.parametrize("parallel_strategy", ["prefiltered", "row_groups"])
1968
@pytest.mark.parametrize(
1969
"df",
1970
[
1971
pl.DataFrame({"x": 1, "y": 1}),
1972
pl.DataFrame({"x": 1, "b": 1, "y": 1}), # hive columns in file
1973
],
1974
)
1975
@pytest.mark.write_disk
1976
def test_prefilter_with_hive_19766(
1977
tmp_path: Path, df: pl.DataFrame, parallel_strategy: str
1978
) -> None:
1979
tmp_path.mkdir(exist_ok=True)
1980
(tmp_path / "a=1/b=1").mkdir(exist_ok=True, parents=True)
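    # The directory names follow hive partitioning (a=1/b=1), so `a` and `b`
    # are picked up as hive columns when scanning the directory.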
1981
1982
df.write_parquet(tmp_path / "a=1/b=1/1")
1983
expect = df.with_columns(a=pl.lit(1, dtype=pl.Int64), b=pl.lit(1, dtype=pl.Int64))
1984
1985
lf = pl.scan_parquet(tmp_path, parallel=parallel_strategy) # type: ignore[arg-type]
1986
1987
for predicate in [
1988
pl.col("a") == 1,
1989
pl.col("x") == 1,
1990
(pl.col("a") == 1) & (pl.col("x") == 1),
1991
pl.col("b") == 1,
1992
pl.col("y") == 1,
1993
(pl.col("a") == 1) & (pl.col("b") == 1),
1994
]:
1995
assert_frame_equal(
1996
lf.filter(predicate).collect(),
1997
expect,
1998
)
1999
2000
2001
@pytest.mark.parametrize("parallel", ["columns", "row_groups", "prefiltered", "none"])
2002
@pytest.mark.parametrize("streaming", [True, False])
2003
@pytest.mark.parametrize("projection", [pl.all(), pl.col("b")])
2004
@pytest.mark.write_disk
2005
def test_allow_missing_columns(
2006
tmp_path: Path,
2007
parallel: str,
2008
streaming: bool,
2009
projection: pl.Expr,
2010
) -> None:
2011
tmp_path.mkdir(exist_ok=True)
2012
dfs = [pl.DataFrame({"a": 1, "b": 1}), pl.DataFrame({"a": 2})]
2013
paths = [tmp_path / "1", tmp_path / "2"]
2014
2015
for df, path in zip(dfs, paths, strict=True):
2016
df.write_parquet(path)
2017
2018
expected_full = pl.DataFrame({"a": [1, 2], "b": [1, None]})
2019
expected = expected_full.select(projection)
2020
2021
with pytest.raises(
2022
pl.exceptions.ColumnNotFoundError,
2023
match="passing `missing_columns='insert'`",
2024
):
2025
pl.read_parquet(paths, parallel=parallel) # type: ignore[arg-type]
2026
2027
with pytest.raises(
2028
pl.exceptions.ColumnNotFoundError,
2029
match="passing `missing_columns='insert'`",
2030
):
2031
pl.scan_parquet(paths, parallel=parallel).select(projection).collect( # type: ignore[arg-type]
2032
engine="streaming" if streaming else "in-memory"
2033
)
2034
2035
assert_frame_equal(
2036
pl.read_parquet(
2037
paths,
2038
parallel=parallel, # type: ignore[arg-type]
2039
missing_columns="insert",
2040
).select(projection),
2041
expected,
2042
)
2043
2044
assert_frame_equal(
2045
pl.scan_parquet(paths, parallel=parallel, missing_columns="insert") # type: ignore[arg-type]
2046
.select(projection)
2047
.collect(engine="streaming" if streaming else "in-memory"),
2048
expected,
2049
)
2050
2051
# Test deprecated parameter
2052
2053
with warnings.catch_warnings():
2054
warnings.simplefilter("ignore", DeprecationWarning)
2055
2056
with pytest.raises(
2057
pl.exceptions.ColumnNotFoundError,
2058
match="passing `missing_columns='insert'`",
2059
):
2060
assert_frame_equal(
2061
pl.scan_parquet(
2062
paths,
2063
parallel=parallel, # type: ignore[arg-type]
2064
allow_missing_columns=False,
2065
).collect(engine="streaming" if streaming else "in-memory"),
2066
expected_full,
2067
)
2068
2069
assert_frame_equal(
2070
pl.scan_parquet(
2071
paths,
2072
parallel=parallel, # type: ignore[arg-type]
2073
allow_missing_columns=True,
2074
).collect(engine="streaming" if streaming else "in-memory"),
2075
expected_full,
2076
)
2077
2078
2079
def test_nested_nonnullable_19158() -> None:
2080
# Bug is based on the top-level struct being nullable and the inner list
2081
# not being nullable.
2082
tbl = pa.table(
2083
{
2084
"a": [{"x": [1]}, None, {"x": [1, 2]}, None],
2085
},
2086
schema=pa.schema(
2087
[
2088
pa.field(
2089
"a",
2090
pa.struct([pa.field("x", pa.list_(pa.int8()), nullable=False)]),
2091
nullable=True,
2092
)
2093
]
2094
),
2095
)
2096
2097
f = io.BytesIO()
2098
pq.write_table(tbl, f)
2099
2100
f.seek(0)
2101
assert_frame_equal(pl.read_parquet(f), pl.DataFrame(tbl))
2102
2103
2104
D = Decimal
2105
2106
2107
@pytest.mark.parametrize("precision", range(1, 37, 2))
2108
@pytest.mark.parametrize(
2109
"nesting",
2110
[
2111
# Struct
2112
lambda t: ([{"x": None}, None], pl.Struct({"x": t})),
2113
lambda t: ([None, {"x": None}], pl.Struct({"x": t})),
2114
lambda t: ([{"x": D("1.5")}, None], pl.Struct({"x": t})),
2115
lambda t: ([{"x": D("1.5")}, {"x": D("4.8")}], pl.Struct({"x": t})),
2116
# Array
2117
lambda t: ([[None, None, D("8.2")], None], pl.Array(t, 3)),
2118
lambda t: ([None, [None, D("8.9"), None]], pl.Array(t, 3)),
2119
lambda t: ([[D("1.5"), D("3.7"), D("4.1")], None], pl.Array(t, 3)),
2120
lambda t: (
2121
[[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("5.2"), D("8.9")]],
2122
pl.Array(t, 3),
2123
),
2124
# List
2125
lambda t: ([[None, D("8.2")], None], pl.List(t)),
2126
lambda t: ([None, [D("8.9"), None]], pl.List(t)),
2127
lambda t: ([[D("1.5"), D("4.1")], None], pl.List(t)),
2128
lambda t: ([[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("8.9")]], pl.List(t)),
2129
],
2130
)
2131
def test_decimal_precision_nested_roundtrip(
2132
nesting: Callable[[pl.DataType], tuple[list[Any], pl.DataType]],
2133
precision: int,
2134
) -> None:
2135
# Limit the context as to not disturb any other tests
2136
with decimal.localcontext() as ctx:
2137
ctx.prec = precision
2138
2139
decimal_dtype = pl.Decimal(precision=precision)
2140
values, dtype = nesting(decimal_dtype)
2141
2142
df = pl.Series("a", values, dtype).to_frame()
2143
2144
test_round_trip(df)
2145
2146
2147
@pytest.mark.may_fail_cloud # reason: sortedness flag
2148
@pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
2149
def test_conserve_sortedness(
2150
plmonkeypatch: PlMonkeyPatch,
2151
capfd: pytest.CaptureFixture[str],
2152
parallel: ParallelStrategy,
2153
) -> None:
2154
f = io.BytesIO()
2155
2156
df = pl.DataFrame(
2157
{
2158
"a": [1, 2, 3, 4, 5, None],
2159
"b": [1.0, 2.0, 3.0, 4.0, 5.0, None],
2160
"c": [None, 5, 4, 3, 2, 1],
2161
"d": [None, 5.0, 4.0, 3.0, 2.0, 1.0],
2162
"a_nosort": [1, 2, 3, 4, 5, None],
2163
"f": range(6),
2164
}
2165
)
2166
2167
for col, descending, nulls_last in [("a", False, False), ("c", True, True)]:
2168
col_idx = df.get_column_index(col)
2169
f.seek(0)
2170
pq.write_table(
2171
df.to_arrow(),
2172
f,
2173
sorting_columns=[
2174
pq.SortingColumn(col_idx, descending, nulls_last),
2175
],
2176
)
2177
f.truncate()
2178
f.seek(0)
2179
2180
plmonkeypatch.setenv("POLARS_VERBOSE", "1")
2181
2182
df = pl.scan_parquet(f, parallel=parallel).filter(pl.col.f > 1).collect()
2183
2184
captured = capfd.readouterr().err
2185
2186
# @NOTE: We don't conserve sortedness for anything except integers at the
2187
# moment.
2188
assert (
2189
captured.count("Parquet conserved SortingColumn for column chunk of") == 1
2190
)
2191
assert (
2192
f"Parquet conserved SortingColumn for column chunk of '{col}' to {'Descending' if descending else 'Ascending'}"
2193
in captured
2194
)
2195
2196
2197
@pytest.mark.parametrize("use_dictionary", [True, False])
2198
@pytest.mark.parametrize(
2199
"values",
2200
[
2201
(size, x)
2202
for size in [1, 2, 3, 4, 8, 12, 15, 16, 32]
2203
for x in [
2204
[list(range(size)), list(range(7, 7 + size))],
2205
[list(range(size)), None],
2206
[list(range(i, i + size)) for i in range(13)],
2207
[list(range(i, i + size)) if i % 3 < 2 else None for i in range(13)],
2208
]
2209
],
2210
)
2211
@pytest.mark.parametrize(
2212
"filt",
2213
[
2214
lambda _: None,
2215
lambda _: pl.col.f > 0,
2216
lambda _: pl.col.f > 1,
2217
lambda _: pl.col.f < 5,
2218
lambda _: pl.col.f % 2 == 0,
2219
lambda _: pl.col.f % 5 < 4,
2220
lambda values: (0, min(1, len(values))),
2221
lambda _: (1, 1),
2222
lambda _: (-2, 1),
2223
lambda values: (1, len(values) - 2),
2224
],
2225
)
2226
def test_fixed_size_binary(
2227
use_dictionary: bool,
2228
values: tuple[int, list[None | list[int]]],
2229
filt: Callable[[list[None | list[int]]], None | pl.Expr | tuple[int, int]],
2230
) -> None:
2231
size, elems = values
2232
bs = [bytes(v) if v is not None else None for v in elems]
2233
2234
tbl = pa.table(
2235
{
2236
"a": bs,
2237
"f": range(len(bs)),
2238
},
2239
schema=pa.schema(
2240
[
2241
pa.field("a", pa.binary(length=size), nullable=True),
2242
pa.field("f", pa.int32(), nullable=True),
2243
]
2244
),
2245
)
2246
2247
df = pl.DataFrame(tbl)
2248
2249
f = io.BytesIO()
2250
pq.write_table(tbl, f, use_dictionary=use_dictionary)
2251
2252
f.seek(0)
2253
2254
loaded: pl.DataFrame
2255
if isinstance(filt, pl.Expr):
2256
loaded = pl.scan_parquet(f).filter(filt).collect()
2257
df = df.filter(filt)
2258
elif isinstance(filt, tuple):
2259
loaded = pl.scan_parquet(f).slice(filt[0], filt[1]).collect()
2260
df = df.slice(filt[0], filt[1])
2261
else:
2262
loaded = pl.read_parquet(f)
2263
2264
assert_frame_equal(loaded, df)
2265
2266
2267
def test_decode_f16() -> None:
2268
values = [float("nan"), 0.0, 0.5, 1.0, 1.5]
2269
2270
table = pa.Table.from_pydict(
2271
{
2272
"x": pa.array(np.array(values, dtype=np.float16), type=pa.float16()),
2273
}
2274
)
2275
2276
df = pl.Series("x", values, pl.Float16).to_frame()
2277
2278
f = io.BytesIO()
2279
pq.write_table(table, f)
2280
2281
f.seek(0)
2282
assert_frame_equal(pl.read_parquet(f), df)
2283
2284
f.seek(0)
2285
assert_frame_equal(
2286
pl.scan_parquet(f).filter(pl.col.x > 0.5).collect(),
2287
df.filter(pl.col.x > 0.5),
2288
)
2289
2290
f.seek(0)
2291
assert_frame_equal(
2292
pl.scan_parquet(f).slice(1, 3).collect(),
2293
df.slice(1, 3),
2294
)
2295
2296
2297
def test_invalid_utf8_binary() -> None:
2298
a = pl.Series("a", [b"\x80"], pl.Binary).to_frame()
2299
f = io.BytesIO()
2300
2301
a.write_parquet(f)
2302
f.seek(0)
2303
out = pl.read_parquet(f)
2304
2305
assert_frame_equal(a, out)
2306
2307
2308
@pytest.mark.parametrize(
2309
"dtype",
2310
[
2311
pl.Null,
2312
pl.Int8,
2313
pl.Int32,
2314
pl.Datetime(),
2315
pl.String,
2316
pl.Binary,
2317
pl.Boolean,
2318
pl.Struct({"x": pl.Int32}),
2319
pl.List(pl.Int32),
2320
pytest.param(
2321
pl.Array(pl.Int32, 0), marks=pytest.mark.may_fail_cloud
2322
), # reason: zero-width array
2323
pl.Array(pl.Int32, 2),
2324
],
2325
)
2326
@pytest.mark.parametrize(
2327
"filt",
2328
[
2329
pl.col.f == 0,
2330
pl.col.f != 0,
2331
pl.col.f == 1,
2332
pl.col.f != 1,
2333
pl.col.f == 2,
2334
pl.col.f != 2,
2335
pl.col.f == 3,
2336
pl.col.f != 3,
2337
],
2338
)
2339
def test_filter_only_invalid(dtype: pl.DataType, filt: pl.Expr) -> None:
2340
df = pl.DataFrame(
2341
[
2342
pl.Series("a", [None, None, None], dtype),
2343
pl.Series("f", range(3), pl.Int32),
2344
]
2345
)
2346
2347
f = io.BytesIO()
2348
2349
df.write_parquet(f)
2350
f.seek(0)
2351
out = pl.scan_parquet(f, parallel="prefiltered").filter(filt).collect()
2352
2353
assert_frame_equal(df.filter(filt), out)
2354
2355
2356
def test_nested_nulls() -> None:
2357
df = pl.Series(
2358
"a",
2359
[
2360
[None, None],
2361
None,
2362
[None, 1],
2363
[None, None],
2364
[2, None],
2365
],
2366
pl.Array(pl.Int32, 2),
2367
).to_frame()
2368
2369
f = io.BytesIO()
2370
df.write_parquet(f)
2371
2372
f.seek(0)
2373
out = pl.read_parquet(f)
2374
assert_frame_equal(out, df)
2375
2376
2377
@pytest.mark.parametrize("content", [[], [None], [None, 0.0]])
2378
def test_nested_dicts(content: list[float | None]) -> None:
2379
df = pl.Series("a", [content], pl.List(pl.Float64)).to_frame()
2380
2381
f = io.BytesIO()
2382
df.write_parquet(f, use_pyarrow=True)
2383
f.seek(0)
2384
assert_frame_equal(df, pl.read_parquet(f))
2385
2386
2387
@pytest.mark.parametrize(
2388
"leading_nulls",
2389
[
2390
[],
2391
[None] * 7,
2392
],
2393
)
2394
@pytest.mark.parametrize(
2395
"trailing_nulls",
2396
[
2397
[],
2398
[None] * 7,
2399
],
2400
)
2401
@pytest.mark.parametrize(
2402
"first_chunk",
2403
# Create both RLE and Bitpacked chunks
2404
[
2405
[1] * 57,
2406
[1 if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2407
list(range(57)),
2408
[i if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2409
],
2410
)
2411
@pytest.mark.parametrize(
2412
"second_chunk",
2413
# Create both RLE and Bitpacked chunks
2414
[
2415
[2] * 57,
2416
[2 if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2417
list(range(57)),
2418
[i if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2419
],
2420
)
2421
@pytest.mark.slow
2422
def test_dict_slices(
2423
leading_nulls: list[None],
2424
trailing_nulls: list[None],
2425
first_chunk: list[None | int],
2426
second_chunk: list[None | int],
2427
) -> None:
2428
df = pl.Series(
2429
"a", leading_nulls + first_chunk + second_chunk + trailing_nulls, pl.Int64
2430
).to_frame()
2431
2432
f = io.BytesIO()
2433
df.write_parquet(f)
2434
2435
for offset in chain([0, 1, 2], range(3, df.height, 3)):
2436
for length in chain([df.height, 1, 2], range(3, df.height - offset, 3)):
2437
f.seek(0)
2438
assert_frame_equal(
2439
pl.scan_parquet(f).slice(offset, length).collect(),
2440
df.slice(offset, length),
2441
)
2442
2443
2444
@pytest.mark.parametrize(
2445
"mask",
2446
[
2447
[i % 13 < 3 and i % 17 > 3 for i in range(57 * 2)],
2448
[False] * 23 + [True] * 68 + [False] * 23,
2449
[False] * 23 + [True] * 24 + [False] * 20 + [True] * 24 + [False] * 23,
2450
[True] + [False] * 22 + [True] * 24 + [False] * 20 + [True] * 24 + [False] * 23,
2451
[False] * 23 + [True] * 24 + [False] * 20 + [True] * 24 + [False] * 22 + [True],
2452
[True]
2453
+ [False] * 22
2454
+ [True] * 24
2455
+ [False] * 20
2456
+ [True] * 24
2457
+ [False] * 22
2458
+ [True],
2459
[False] * 56 + [True] * 58,
2460
[False] * 57 + [True] * 57,
2461
[False] * 58 + [True] * 56,
2462
[True] * 56 + [False] * 58,
2463
[True] * 57 + [False] * 57,
2464
[True] * 58 + [False] * 56,
2465
],
2466
)
2467
@pytest.mark.parametrize(
2468
"first_chunk",
2469
# Create both RLE and Bitpacked chunks
2470
[
2471
[1] * 57,
2472
[1 if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2473
list(range(57)),
2474
[i if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2475
],
2476
)
2477
@pytest.mark.parametrize(
2478
"second_chunk",
2479
# Create both RLE and Bitpacked chunks
2480
[
2481
[2] * 57,
2482
[2 if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2483
list(range(57)),
2484
[i if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2485
],
2486
)
2487
def test_dict_masked(
2488
mask: list[bool],
2489
first_chunk: list[None | int],
2490
second_chunk: list[None | int],
2491
) -> None:
2492
df = pl.DataFrame(
2493
[
2494
pl.Series("a", first_chunk + second_chunk, pl.Int64),
2495
pl.Series("f", mask, pl.Boolean),
2496
]
2497
)
2498
2499
f = io.BytesIO()
2500
df.write_parquet(f)
2501
2502
f.seek(0)
2503
assert_frame_equal(
2504
pl.scan_parquet(f, parallel="prefiltered").filter(pl.col.f).collect(),
2505
df.filter(pl.col.f),
2506
)
2507
2508
2509
@pytest.mark.may_fail_auto_streaming
2510
def test_categorical_sliced_20017() -> None:
2511
f = io.BytesIO()
2512
df = (
2513
pl.Series("a", ["a", None])
2514
.to_frame()
2515
.with_columns(pl.col.a.cast(pl.Categorical))
2516
)
2517
df.write_parquet(f)
2518
2519
f.seek(0)
2520
assert_frame_equal(
2521
pl.read_parquet(f, n_rows=1),
2522
df.head(1),
2523
)
2524
2525
2526
@given(
2527
s=series(name="a", dtype=pl.String, min_size=7, max_size=7),
2528
mask=series(
2529
name="mask", dtype=pl.Boolean, min_size=7, max_size=7, allow_null=False
2530
),
2531
)
2532
def test_categorical_parametric_masked(s: pl.Series, mask: pl.Series) -> None:
2533
f = io.BytesIO()
2534
2535
df = pl.DataFrame([s, mask]).with_columns(pl.col.a.cast(pl.Categorical))
2536
df.write_parquet(f)
2537
2538
f.seek(0)
2539
assert_frame_equal(
2540
pl.scan_parquet(f, parallel="prefiltered").filter(pl.col.mask).collect(),
2541
df.filter(pl.col.mask),
2542
)
2543
2544
2545
@given(
2546
s=series(name="a", dtype=pl.String, min_size=7, max_size=7),
2547
start=st.integers(0, 6),
2548
length=st.integers(1, 7),
2549
)
2550
def test_categorical_parametric_sliced(s: pl.Series, start: int, length: int) -> None:
2551
length = min(7 - start, length)
2552
2553
f = io.BytesIO()
2554
2555
df = s.to_frame().with_columns(pl.col.a.cast(pl.Categorical))
2556
df.write_parquet(f)
2557
2558
f.seek(0)
2559
assert_frame_equal(
2560
pl.scan_parquet(f).slice(start, length).collect(),
2561
df.slice(start, length),
2562
)
2563
2564
2565
@pytest.mark.write_disk
2566
def test_prefilter_with_projection_column_order_20175(tmp_path: Path) -> None:
2567
path = tmp_path / "1"
2568
2569
pl.DataFrame({"a": 1, "b": 1, "c": 1, "d": 1, "e": 1}).write_parquet(path)
2570
2571
q = (
2572
pl.scan_parquet(path, parallel="prefiltered")
2573
.filter(pl.col("a") == 1)
2574
.select("a", "d", "c")
2575
)
2576
2577
assert_frame_equal(q.collect(), pl.DataFrame({"a": 1, "d": 1, "c": 1}))
2578
2579
f = io.BytesIO()
2580
2581
pl.read_csv(b"""\
2582
c0,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10
2583
1,1,1,1,1,1,1,1,1,1,1
2584
1,1,1,1,1,1,1,1,1,1,1
2585
""").write_parquet(f)
2586
2587
f.seek(0)
2588
2589
q = (
2590
pl.scan_parquet(
2591
f,
2592
rechunk=True,
2593
parallel="prefiltered",
2594
)
2595
.filter(
2596
pl.col("c0") == 1,
2597
)
2598
.select("c0", "c9", "c3")
2599
)
2600
2601
assert_frame_equal(
2602
q.collect(),
2603
pl.read_csv(b"""\
2604
c0,c9,c3
2605
1,1,1
2606
1,1,1
2607
"""),
2608
)
2609
2610
2611
def test_utf8_verification_with_slice_20174() -> None:
2612
f = io.BytesIO()
2613
pq.write_table(
2614
pl.Series("s", ["a", "a" * 128]).to_frame().to_arrow(), f, use_dictionary=False
2615
)
2616
2617
f.seek(0)
2618
assert_frame_equal(
2619
pl.scan_parquet(f).head(1).collect(),
2620
pl.Series("s", ["a"]).to_frame(),
2621
)
2622
2623
2624
@pytest.mark.parametrize("parallel", ["prefiltered", "row_groups"])
2625
@pytest.mark.parametrize(
2626
"projection",
2627
[
2628
{"a": pl.Int64(), "b": pl.Int64()},
2629
{"b": pl.Int64(), "a": pl.Int64()},
2630
],
2631
)
2632
def test_parquet_prefiltered_unordered_projection_20175(
2633
parallel: str, projection: dict[str, pl.DataType]
2634
) -> None:
2635
df = pl.DataFrame(
2636
[
2637
pl.Series("a", [0], pl.Int64),
2638
pl.Series("b", [0], pl.Int64),
2639
]
2640
)
2641
2642
f = io.BytesIO()
2643
df.write_parquet(f)
2644
2645
f.seek(0)
2646
out = (
2647
pl.scan_parquet(f, parallel=parallel) # type: ignore[arg-type]
2648
.filter(pl.col.a >= 0)
2649
.select(*projection.keys())
2650
.collect()
2651
)
2652
2653
assert out.schema == projection
2654
2655
2656
def test_parquet_unsupported_dictionary_to_pl_17945() -> None:
2657
t = pa.table(
2658
{
2659
"col1": pa.DictionaryArray.from_arrays([0, 0, None, 1], [42, 1337]),
2660
},
2661
schema=pa.schema({"col1": pa.dictionary(pa.uint32(), pa.int64())}),
2662
)
2663
2664
f = io.BytesIO()
2665
pq.write_table(t, f, use_dictionary=False)
2666
f.truncate()
2667
2668
f.seek(0)
2669
assert_series_equal(
2670
pl.Series("col1", [42, 42, None, 1337], pl.Int64),
2671
pl.read_parquet(f).to_series(),
2672
)
2673
2674
f.seek(0)
2675
pq.write_table(t, f)
2676
f.truncate()
2677
2678
f.seek(0)
2679
assert_series_equal(
2680
pl.Series("col1", [42, 42, None, 1337], pl.Int64),
2681
pl.read_parquet(f).to_series(),
2682
)
2683
2684
2685
@pytest.mark.may_fail_auto_streaming
2686
def test_parquet_cast_to_cat() -> None:
2687
t = pa.table(
2688
{
2689
"col1": pa.DictionaryArray.from_arrays([0, 0, None, 1], ["A", "B"]),
2690
},
2691
schema=pa.schema({"col1": pa.dictionary(pa.uint32(), pa.string())}),
2692
)
2693
2694
f = io.BytesIO()
2695
pq.write_table(t, f, use_dictionary=False)
2696
f.truncate()
2697
2698
f.seek(0)
2699
assert_series_equal(
2700
pl.Series("col1", ["A", "A", None, "B"], pl.Categorical),
2701
pl.read_parquet(f).to_series(),
2702
)
2703
2704
f.seek(0)
2705
pq.write_table(t, f)
2706
f.truncate()
2707
2708
f.seek(0)
2709
assert_series_equal(
2710
pl.Series("col1", ["A", "A", None, "B"], pl.Categorical),
2711
pl.read_parquet(f).to_series(),
2712
)
2713
2714
2715
def test_parquet_roundtrip_lex_cat_20288() -> None:
2716
f = io.BytesIO()
2717
df = pl.Series("a", ["A", "B"], pl.Categorical()).to_frame()
2718
df.write_parquet(f)
2719
f.seek(0)
2720
dt = pl.scan_parquet(f).collect_schema()["a"]
2721
assert isinstance(dt, pl.Categorical)
2722
assert dt.ordering == "lexical"
2723
2724
2725
def test_from_parquet_20271() -> None:
2726
f = io.BytesIO()
2727
df = pl.Series("b", ["D", "E"], pl.Categorical).to_frame()
2728
df.write_parquet(f)
2729
del df
2730
f.seek(0)
2731
df = pl.read_parquet(f)
2732
assert_series_equal(df.to_series(), pl.Series("b", ["D", "E"], pl.Categorical))
2733
2734
2735
def test_boolean_slice_pushdown_20314() -> None:
2736
s = pl.Series("a", [None, False, True])
2737
f = io.BytesIO()
2738
2739
s.to_frame().write_parquet(f)
2740
2741
f.seek(0)
2742
assert pl.scan_parquet(f).slice(2, 1).collect().item()
2743
2744
2745
def test_load_pred_pushdown_fsl_19241() -> None:
2746
f = io.BytesIO()
2747
2748
fsl = pl.Series("a", [[[1, 2]]], pl.Array(pl.Array(pl.Int8, 2), 1))
2749
filt = pl.Series("f", [1])
2750
2751
pl.DataFrame([fsl, filt]).write_parquet(f)
2752
2753
f.seek(0)
2754
q = pl.scan_parquet(f, parallel="prefiltered").filter(pl.col.f != 4)
2755
2756
assert_frame_equal(q.collect(), pl.DataFrame([fsl, filt]))
2757
2758
2759
def test_struct_list_statistics_20510() -> None:
2760
# Test PyArrow - Utf8ViewArray
2761
data = {
2762
"name": ["a", "b"],
2763
"data": [
2764
{"title": "Title", "data": [0, 1, 3]},
2765
{"title": "Title", "data": [0, 1, 3]},
2766
],
2767
}
2768
df = pl.DataFrame(
2769
data,
2770
schema=pl.Schema(
2771
{
2772
"name": pl.String(),
2773
"data": pl.Struct(
2774
{
2775
"title": pl.String,
2776
"data": pl.List(pl.Int64),
2777
}
2778
),
2779
}
2780
),
2781
)
2782
2783
f = io.BytesIO()
2784
df.write_parquet(f)
2785
f.seek(0)
2786
result = pl.scan_parquet(f).filter(pl.col("name") == "b").collect()
2787
2788
assert_frame_equal(result, df.filter(pl.col("name") == "b"))
2789
2790
# Test PyArrow - Utf8Array
2791
tb = pa.table(
2792
data,
2793
schema=pa.schema(
2794
[
2795
("name", pa.string()),
2796
(
2797
"data",
2798
pa.struct(
2799
[
2800
("title", pa.string()),
2801
("data", pa.list_(pa.int64())),
2802
]
2803
),
2804
),
2805
]
2806
),
2807
)
2808
2809
f.seek(0)
2810
pq.write_table(tb, f)
2811
f.truncate()
2812
f.seek(0)
2813
result = pl.scan_parquet(f).filter(pl.col("name") == "b").collect()
2814
2815
assert_frame_equal(result, df.filter(pl.col("name") == "b"))
2816
2817
2818
def test_required_masked_skip_values_20809(plmonkeypatch: PlMonkeyPatch) -> None:
2819
df = pl.DataFrame(
2820
[pl.Series("a", list(range(20)) + [42] * 15), pl.Series("b", range(35))]
2821
)
2822
needle = [16, 33]
2823
2824
f = io.BytesIO()
2825
df.write_parquet(f)
2826
2827
f.seek(0)
2828
plmonkeypatch.setenv("POLARS_PQ_PREFILTERED_MASK", "pre")
2829
df1 = (
2830
pl.scan_parquet(f, parallel="prefiltered")
2831
.filter(pl.col.b.is_in(needle))
2832
.collect()
2833
)
2834
2835
f.seek(0)
2836
df2 = pl.read_parquet(f, parallel="columns").filter(pl.col.b.is_in(needle))
2837
2838
assert_frame_equal(df1, df2)
2839
2840
2841
def get_tests_from_dtype(
2842
dtype: pl.DataType, f: Callable[[int], Any]
2843
) -> list[tuple[pl.DataType, list[Any], list[Any]]]:
2844
return [
2845
(dtype, [f(i) for i in range(10)], [f(i) for i in range(11)]),
2846
(
2847
dtype,
2848
[f(i) for i in range(1337)],
2849
[f(i) for i in [0, 1, 5, 7, 101, 1023, 1336, 1337, 1338]],
2850
),
2851
(
2852
dtype,
2853
list(
2854
functools.reduce(
2855
lambda x, y: list(x) + y,
2856
([f(i)] * (i % 13) for i in range(1337)),
2857
[],
2858
)
2859
),
2860
[f(i) for i in [0, 1, 5, 7, 101, 1023, 1336, 1337, 1338]],
2861
),
2862
(
2863
dtype,
2864
[f(5)] * 37 + [f(10)] * 61 + [f(1996)] * 21,
2865
[f(i) for i in [1, 5, 10, 1996]],
2866
),
2867
]
2868
2869
2870
@pytest.mark.parametrize("strategy", ["columns", "prefiltered"])
2871
@pytest.mark.parametrize(
2872
("dtype", "values", "needles"),
2873
get_tests_from_dtype(pl.Int8(), lambda x: (x % 256) - 128)
2874
+ get_tests_from_dtype(pl.Int32(), lambda x: x % 256)
2875
+ get_tests_from_dtype(pl.Date(), lambda x: date(year=1 + x, month=10, day=5))
2876
+ get_tests_from_dtype(pl.String(), lambda x: str(x))
2877
+ get_tests_from_dtype(pl.String(), lambda x: "i" * x)
2878
+ get_tests_from_dtype(pl.String(), lambda x: f"long_strings_with_the_number_{x}"),
2879
)
2880
def test_equality_filter(
2881
strategy: ParallelStrategy,
2882
dtype: pl.DataType,
2883
values: list[Any],
2884
needles: list[Any],
2885
) -> None:
2886
df = pl.DataFrame(
2887
[
2888
pl.Series("a", values, dtype),
2889
]
2890
)
2891
2892
f = io.BytesIO()
2893
df.write_parquet(f)
2894
2895
for needle in needles:
2896
f.seek(0)
2897
scan = pl.scan_parquet(f, parallel=strategy)
2898
try:
2899
assert_frame_equal(
2900
df.filter(pl.col.a == pl.lit(needle, dtype)),
2901
scan.filter(pl.col.a == pl.lit(needle, dtype)).collect(),
2902
)
2903
except:
2904
import sys
2905
2906
print(f"needle: {needle}", file=sys.stderr)
2907
raise
2908
2909
2910
def test_nested_string_slice_utf8_21202() -> None:
2911
s = pl.Series(
2912
"a",
2913
[
2914
["A" * 128],
2915
["A"],
2916
],
2917
pl.List(pl.String()),
2918
)
2919
2920
f = io.BytesIO()
2921
s.to_frame().write_parquet(f)
2922
2923
f.seek(0)
2924
assert_series_equal(
2925
pl.scan_parquet(f).slice(1, 1).collect().to_series(),
2926
s.slice(1, 1),
2927
)
2928
2929
2930
def test_filter_true_predicate_21204() -> None:
2931
f = io.BytesIO()
2932
2933
df = pl.DataFrame({"a": [1]})
2934
df.write_parquet(f)
2935
f.seek(0)
2936
lf = pl.scan_parquet(f).filter(pl.lit(True))
2937
assert_frame_equal(lf.collect(), df)
2938
2939
2940
def test_nested_deprecated_int96_timestamps_21332() -> None:
2941
f = io.BytesIO()
2942
2943
df = pl.DataFrame({"a": [{"t": datetime(2025, 1, 1)}]})
2944
2945
pq.write_table(
2946
df.to_arrow(),
2947
f,
2948
use_deprecated_int96_timestamps=True,
2949
)
2950
2951
f.seek(0)
2952
assert_frame_equal(
2953
pl.read_parquet(f),
2954
df,
2955
)
2956
2957
2958
def test_final_masked_optional_iteration_21378() -> None:
2959
# fmt: off
2960
values = [
2961
1, 0, 0, 0, 0, 1, 1, 1,
2962
1, 0, 0, 1, 1, 1, 1, 0,
2963
0, 1, 1, 1, 0, 1, 0, 0,
2964
1, 1, 0, 0, 0, 1, 1, 1,
2965
0, 1, 0, 0, 1, 1, 1, 1,
2966
0, 1, 1, 1, 0, 1, 0, 1,
2967
0, 1, 1, 0, 1, 0, 1, 1,
2968
0, 0, 0, 0, 1, 0, 0, 0,
2969
0, 1, 1, 1, 0, 0, 1, 1,
2970
0, 0, 1, 1, 0, 0, 0, 1,
2971
1, 1, 0, 1, 1, 1, 0, 0,
2972
0, 0, 0, 0, 0, 0, 0, 0,
2973
1, 1, 1, 1, 1, 1, 1, 0,
2974
0, 0, 1, 0, 1, 1, 0, 0,
2975
0, 1, 1, 0, 0, 1, 0, 0,
2976
1, 1, 1, 1, 0, 0, 1, 0,
2977
0, 1, 1, 0, 0, 1, 1, 1,
2978
1, 1, 1, 0, 1, 1, 0, 1,
2979
0, 1, 0, 1, 0, 1, 0, 1,
2980
0, 0, 0, 1, 1, 0, 0, 0,
2981
1, 1, 0, 1, 0, 1, 0, 1,
2982
0, 1, 0, 0, 0, 0, 0, 1,
2983
0, 0, 1, 1, 0, 0, 1, 1,
2984
0, 1, 0, 0, 0, 1, 1, 1,
2985
1, 0, 1, 0, 1, 0, 1, 1,
2986
1, 0, 1, 0, 0, 1, 0, 1,
2987
0, 1, 1, 1, 0, 0, 0, 1,
2988
1, 1, 1, 1, 1, 1, 0, 0,
2989
1, 0, 0, 0, 0, 0, 0, 1,
2990
1, 1, 1, 0, 0, 0, 0, 0,
2991
1, 1, 1, 0, 0, 0, 1, 1,
2992
0, 0, 0, 0, 0, 1, 1, 0,
2993
0, 0, 1, 0, 0, 0, 0, 1,
2994
0, 0, 0, 0, 0, 1, 0, 0,
2995
1, 0, 1, 0, 0, 1, 0, 0,
2996
0, 1, 1, 1, 0, 0, 1, 1,
2997
1, 0, 1, 1, 0, 0, 0, 1,
2998
0, 0, 1, 1, 0, 1, 0, 1,
2999
0, 1, 1, 1, 0, 0, 0, 1,
3000
0, 0, 0, 1, 0, 1, 0, 1,
3001
0, 1, 0, 1, 0, 1, 1, 1,
3002
1, 0, 1, 1, 1, 1, 1, 0,
3003
1, 0, 1, 0, 0, 0, 1, 1,
3004
0, 0, 0, 1, 0, 0, 1, 0,
3005
0, 1, 0, 0, 1, 0, 1, 1,
3006
1, 0, 0, 1, 0, 1, 1, 0,
3007
0, 1, 0, 1, 1, 0, 1, 0,
3008
0, 0, 0, 1, 1, 1, 0, 0,
3009
0, 1, 0, 1, 1, 0, 1, 1,
3010
1, 1, 0, 1, 0, 1, 0, 1,
3011
1, 1, 0, 1, 0, 0, 1, 0,
3012
1, 1, 0, 1, 1, 0, 0, 1,
3013
0, 0, 0, 0, 0, 1, 0, 0,
3014
0, 1, 0, 0, 1, 1, 1, 1,
3015
1, 0, 1, 1, 1, 0, 1, 1,
3016
1, 1, 0, 0, 0, 0, 1, 1,
3017
]
3018
3019
df = pl.DataFrame(
3020
[
3021
pl.Series("x", [None if x == 1 else 0.0 for x in values], pl.Float32),
3022
pl.Series(
3023
"f",
3024
[False] * 164 +
3025
[True] * 10 +
3026
[False] * 264 +
3027
[True] * 10,
3028
pl.Boolean(),
3029
),
3030
]
3031
)
3032
3033
f = io.BytesIO()
3034
df.write_parquet(f)
3035
f.seek(0)
3036
3037
output = pl.scan_parquet(f, parallel="prefiltered").filter(pl.col.f).collect()
3038
assert_frame_equal(df.filter(pl.col.f), output)
3039
3040
3041
def test_predicate_empty_is_in_21450() -> None:
3042
f = io.BytesIO()
3043
df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
3044
df.write_parquet(f)
3045
3046
f.seek(0)
3047
assert_frame_equal(
3048
df.clear(),
3049
pl.scan_parquet(f).filter(pl.col("a").is_in([])).collect(),
3050
)
3051
3052
3053
@pytest.mark.write_disk
3054
def test_scan_parquet_filter_statistics_load_missing_column_21391(
3055
tmp_path: Path,
3056
) -> None:
3057
root = tmp_path
3058
dfs = [pl.DataFrame({"x": 1, "y": 1}), pl.DataFrame({"x": 2})]
3059
3060
for i, df in enumerate(dfs):
3061
df.write_parquet(root / f"{i}.parquet")
3062
3063
assert_frame_equal(
3064
(
3065
pl.scan_parquet(root, missing_columns="insert")
3066
.filter(pl.col("y") == 1)
3067
.collect()
3068
),
3069
pl.DataFrame({"x": 1, "y": 1}),
3070
)
3071
3072
3073
@pytest.mark.parametrize(
3074
"ty",
3075
[
3076
(lambda i: i, pl.Int8, True),
3077
(lambda i: datetime(year=2025, month=9, day=i), pl.Datetime, True),
3078
(lambda i: float(i), pl.Float32, True),
3079
(lambda i: str(i), pl.String, True),
3080
(lambda i: str(i) + "make it a bit longer", pl.String, True),
3081
(lambda i: [i, i + 7] * (i % 3), pl.List(pl.Int32), True),
3082
(lambda i: {"x": i}, pl.Struct({"x": pl.Int32}), True),
3083
(lambda i: [i, i + 3, i + 7], pl.Array(pl.Int32, 3), False),
3084
],
3085
)
3086
def test_filter_nulls_21538(ty: tuple[Callable[[int], Any], pl.DataType, bool]) -> None:
3087
i_to_value, dtype, do_no_dicts = ty
3088
3089
patterns: list[list[int | None]] = [
3090
[None, None, None, None, None],
3091
[1, None, None, 2, None],
3092
[None, 1, 2, 3, 4],
3093
[1, 2, 3, 4, None],
3094
[None, 1, 2, 3, None],
3095
[None, 1, None, 3, None],
3096
[1, 2, 3, 4, 5],
3097
]
3098
3099
df = pl.DataFrame(
3100
[
3101
pl.Series(
3102
f"p{i}", [None if v is None else i_to_value(v) for v in pattern], dtype
3103
)
3104
for i, pattern in enumerate(patterns)
3105
]
3106
)
3107
3108
fs = []
3109
3110
dicts_f = io.BytesIO()
3111
df.write_parquet(dicts_f)
3112
fs += [dicts_f]
3113
3114
if do_no_dicts:
3115
no_dicts_f = io.BytesIO()
3116
pq.write_table(df.to_arrow(), no_dicts_f, use_dictionary=False)
3117
fs += [no_dicts_f]
3118
3119
for f in fs:
3120
for i in range(len(patterns)):
3121
f.seek(0)
3122
assert_frame_equal(
3123
pl.scan_parquet(f).filter(pl.col(f"p{i}").is_null()).collect(),
3124
df.filter(pl.col(f"p{i}").is_null()),
3125
)
3126
3127
f.seek(0)
3128
assert_frame_equal(
3129
pl.scan_parquet(f).filter(pl.col(f"p{i}").is_not_null()).collect(),
3130
df.filter(pl.col(f"p{i}").is_not_null()),
3131
)
3132
3133
3134
def test_unspecialized_decoding_prefiltering() -> None:
3135
df = pl.DataFrame(
3136
{
3137
"a": [None, None, None, "abc"],
3138
"b": [False, True, False, True],
3139
}
3140
)
3141
3142
cols = df.columns
3143
3144
encodings = dict.fromkeys(cols, "DELTA_LENGTH_BYTE_ARRAY")
3145
encodings["b"] = "PLAIN"
3146
3147
f = io.BytesIO()
3148
pq.write_table(
3149
df.to_arrow(),
3150
f,
3151
use_dictionary=False,
3152
column_encoding=encodings,
3153
)
3154
3155
f.seek(0)
3156
expr = pl.col("b")
3157
result = (
3158
pl.scan_parquet(f, parallel="prefiltered")
3159
.filter(expr)
3160
.collect(engine="streaming")
3161
)
3162
assert_frame_equal(result, df.filter(expr))
3163
3164
3165
@pytest.mark.parametrize("parallel", ["columns", "row_groups"])
3166
def test_filtering_on_other_parallel_modes_with_statistics(
3167
parallel: ParallelStrategy,
3168
) -> None:
3169
f = io.BytesIO()
3170
3171
pl.DataFrame(
3172
{
3173
"a": [1, 4, 9, 2, 4, 8, 3, 4, 7],
3174
}
3175
).write_parquet(f, row_group_size=3)
3176
3177
f.seek(0)
3178
assert_series_equal(
3179
pl.scan_parquet(f, parallel=parallel)
3180
.filter(pl.col.a == 4)
3181
.collect()
3182
.to_series(),
3183
pl.Series("a", [4, 4, 4]),
3184
)
3185
3186
3187
def test_filter_on_logical_dtype_22252() -> None:
3188
f = io.BytesIO()
3189
pl.Series("a", [datetime(1996, 10, 5)]).to_frame().write_parquet(f)
3190
f.seek(0)
3191
pl.scan_parquet(f).filter(pl.col.a.dt.weekday() == 6).collect()
3192
3193
3194
def test_filter_nan_22289() -> None:
3195
f = io.BytesIO()
3196
pl.DataFrame(
3197
{"a": [1, 2, float("nan")], "b": [float("nan"), 5, 6]}, strict=False
3198
).write_parquet(f)
3199
3200
f.seek(0)
3201
lf = pl.scan_parquet(f)
3202
3203
assert_frame_equal(
3204
lf.collect().filter(pl.col.a.is_not_nan()),
3205
lf.filter(pl.col.a.is_not_nan()).collect(),
3206
)
3207
3208
assert_frame_equal(
3209
lf.collect().filter(pl.col.a.is_nan()),
3210
lf.filter(pl.col.a.is_nan()).collect(),
3211
)
3212
3213
3214
def test_encode_utf8_check_22467() -> None:
3215
f = io.BytesIO()
3216
values = ["😀" * 129, "😀"]
3217
3218
pq.write_table(pl.Series(values).to_frame().to_arrow(), f, use_dictionary=False)
3219
3220
f.seek(0)
3221
pl.scan_parquet(f).slice(1, 1).collect()
3222
3223
3224
def test_reencode_categoricals_22385() -> None:
3225
tbl = pl.Series("a", ["abc"], pl.Categorical()).to_frame().to_arrow()
3226
tbl = tbl.cast(
3227
pa.schema(
3228
[
3229
pa.field(
3230
"a",
3231
pa.dictionary(pa.int32(), pa.large_string()),
3232
metadata=tbl.schema[0].metadata,
3233
),
3234
]
3235
)
3236
)
3237
3238
f = io.BytesIO()
3239
pq.write_table(tbl, f)
3240
3241
f.seek(0)
3242
pl.scan_parquet(f).collect()
3243
3244
3245
def test_parquet_read_timezone_22506() -> None:
3246
f = io.BytesIO()
3247
3248
pd.DataFrame(
3249
{
3250
"a": [1, 2],
3251
"b": pd.to_datetime(
3252
["2020-01-01T00:00:00+01:00", "2020-01-02T00:00:00+01:00"]
3253
),
3254
}
3255
).to_parquet(f)
3256
3257
assert b'"metadata": {"timezone": "+01:00"}}' in f.getvalue()
3258
3259
f.seek(0)
3260
3261
pd_version = parse_version(pd.__version__)
3262
time_unit: TimeUnit = "us" if pd_version >= (3,) else "ns"
3263
3264
assert_frame_equal(
3265
pl.read_parquet(f),
3266
pl.DataFrame(
3267
{
3268
"a": [1, 2],
3269
"b": [
3270
datetime(2020, 1, 1, tzinfo=ZoneInfo("Etc/GMT-1")),
3271
datetime(2020, 1, 2, tzinfo=ZoneInfo("Etc/GMT-1")),
3272
],
3273
},
3274
schema={
3275
"a": pl.Int64,
3276
"b": pl.Datetime(time_unit=time_unit, time_zone="Etc/GMT-1"),
3277
},
3278
),
3279
)
3280
3281
3282
@pytest.mark.parametrize("static", [True, False])
3283
@pytest.mark.parametrize("lazy", [True, False])
3284
def test_read_write_metadata(static: bool, lazy: bool) -> None:
3285
metadata = {"hello": "world", "something": "else"}
3286
md: ParquetMetadata = metadata
3287
if not static:
3288
md = lambda ctx: metadata # noqa: E731
3289
3290
df = pl.DataFrame({"a": [1, 2, 3]})
3291
3292
f = io.BytesIO()
3293
if lazy:
3294
df.lazy().sink_parquet(f, metadata=md)
3295
else:
3296
df.write_parquet(f, metadata=md)
3297
3298
f.seek(0)
3299
actual = pl.read_parquet_metadata(f)
3300
assert "ARROW:schema" in actual
3301
assert metadata == {k: v for k, v in actual.items() if k != "ARROW:schema"}
3302
3303
3304
@pytest.mark.write_disk
3305
def test_metadata_callback_info(tmp_path: Path) -> None:
3306
df = pl.DataFrame({"a": [1, 2, 3]})
3307
num_writes = 0
3308
3309
def fn_metadata(ctx: ParquetMetadataContext) -> dict[str, str]:
3310
nonlocal num_writes
3311
num_writes += 1
3312
return {}
3313
3314
df.write_parquet(tmp_path, partition_by="a", metadata=fn_metadata)
3315
3316
assert num_writes == len(df)
3317
3318
3319
def test_multiple_sorting_columns() -> None:
3320
df = pl.DataFrame(
3321
{
3322
"a": [1, 1, 1, 2, 2, 2],
3323
"b": [1, 2, 3, 1, 2, 3],
3324
}
3325
)
3326
3327
f = io.BytesIO()
3328
pq.write_table(
3329
df.to_arrow(),
3330
f,
3331
sorting_columns=[pq.SortingColumn(0), pq.SortingColumn(1)],
3332
)
3333
3334
f.seek(0)
3335
roundtrip = pl.read_parquet(f)
3336
assert roundtrip.get_column("a").is_sorted()
3337
assert not roundtrip.get_column("b").is_sorted()
3338
assert_frame_equal(roundtrip.sort("b"), df.sort("b"))
3339
3340
3341
@pytest.mark.write_disk
3342
def test_read_parquet_duplicate_range_start_fetch_23139(tmp_path: Path) -> None:
3343
tmp_path.mkdir(exist_ok=True)
3344
path = tmp_path / "data.parquet"
3345
3346
df = pl.DataFrame(
3347
schema={
3348
"a": pl.Boolean,
3349
"b": pl.Boolean,
3350
}
3351
)
3352
3353
df.write_parquet(path, use_pyarrow=True)
3354
3355
assert_frame_equal(pl.read_parquet(path), df)
3356
3357
3358
@pytest.mark.parametrize(
3359
("value", "scan_dtype", "filter_expr"),
3360
[
3361
(pl.lit(1, dtype=pl.Int8), pl.Int16, pl.col("x") > 1),
3362
(pl.lit(1.0, dtype=pl.Float64), pl.Float32, pl.col("x") > 1.0),
3363
(pl.lit(1.0, dtype=pl.Float32), pl.Float64, pl.col("x") > 1.0),
3364
(
3365
pl.lit(
3366
datetime(2025, 1, 1),
3367
dtype=pl.Datetime(time_unit="ns", time_zone="Europe/Amsterdam"),
3368
),
3369
pl.Datetime(time_unit="ms", time_zone="Australia/Sydney"),
3370
pl.col("x")
3371
!= pl.lit(
3372
datetime(2025, 1, 1, 10),
3373
dtype=pl.Datetime(time_unit="ms", time_zone="Australia/Sydney"),
3374
),
3375
),
3376
# Note: This is not implemented at all
3377
# (
3378
# pl.lit({"a": 1}, dtype=pl.Struct({"a": pl.Int8})),
3379
# pl.Struct({"a": pl.Int64}),
3380
# pl.col("x").struct.field("a") > 1,
3381
# ),
3382
],
3383
)
3384
def test_scan_parquet_skip_row_groups_with_cast(
3385
value: Any,
3386
scan_dtype: pl.DataType,
3387
filter_expr: pl.Expr,
3388
plmonkeypatch: PlMonkeyPatch,
3389
capfd: pytest.CaptureFixture[str],
3390
) -> None:
3391
f = io.BytesIO()
3392
3393
df = pl.select(x=value)
3394
3395
df.write_parquet(f)
3396
3397
q = pl.scan_parquet(
3398
f,
3399
schema={"x": scan_dtype},
3400
cast_options=pl.ScanCastOptions(
3401
integer_cast="upcast",
3402
float_cast=["upcast", "downcast"],
3403
datetime_cast=["convert-timezone", "nanosecond-downcast"],
3404
missing_struct_fields="insert",
3405
),
3406
).filter(filter_expr)
3407
3408
plmonkeypatch.setenv("POLARS_VERBOSE", "1")
3409
capfd.readouterr()
3410
out = q.collect()
3411
assert "reading 0 / 1 row groups" in capfd.readouterr().err
3412
3413
assert_frame_equal(out, pl.DataFrame(schema={"x": scan_dtype}))
3414
3415
3416
@pytest.mark.parametrize(
3417
("value", "scan_dtype", "filter_expr"),
3418
[
3419
(pl.lit(1, dtype=pl.Int8), pl.Int16, pl.col("x") == 1),
3420
(pl.lit(1.0, dtype=pl.Float64), pl.Float32, pl.col("x") == 1.0),
3421
(pl.lit(1.0, dtype=pl.Float32), pl.Float64, pl.col("x") == 1.0),
3422
(
3423
pl.lit(
3424
datetime(2025, 1, 1),
3425
dtype=pl.Datetime(time_unit="ns", time_zone="Europe/Amsterdam"),
3426
),
3427
pl.Datetime(time_unit="ms", time_zone="Australia/Sydney"),
3428
pl.col("x")
3429
== pl.lit(
3430
datetime(2025, 1, 1, 10),
3431
dtype=pl.Datetime(time_unit="ms", time_zone="Australia/Sydney"),
3432
),
3433
),
3434
(
3435
pl.lit({"a": 1}, dtype=pl.Struct({"a": pl.Int8})),
3436
pl.Struct({"a": pl.Int64}),
3437
pl.col("x").struct.field("a") == 1,
3438
),
3439
],
3440
)
3441
@pytest.mark.may_fail_cloud # reason: looks at stdout
3442
def test_scan_parquet_skip_row_groups_with_cast_inclusions(
3443
value: Any,
3444
scan_dtype: pl.DataType,
3445
filter_expr: pl.Expr,
3446
plmonkeypatch: PlMonkeyPatch,
3447
capfd: pytest.CaptureFixture[str],
3448
) -> None:
3449
f = io.BytesIO()
3450
df = pl.select(x=value)
3451
df.write_parquet(f)
3452
3453
f.seek(0)
3454
q = pl.scan_parquet(
3455
f,
3456
schema={"x": scan_dtype},
3457
cast_options=pl.ScanCastOptions(
3458
integer_cast="upcast",
3459
float_cast=["upcast", "downcast"],
3460
datetime_cast=["convert-timezone", "nanosecond-downcast"],
3461
missing_struct_fields="insert",
3462
),
3463
).filter(filter_expr)
3464
3465
plmonkeypatch.setenv("POLARS_VERBOSE", "1")
3466
capfd.readouterr()
3467
out = q.collect()
3468
assert "reading 1 / 1 row groups" in capfd.readouterr().err
3469
3470
assert_frame_equal(out, pl.select(x=value).select(pl.first().cast(scan_dtype)))
3471
3472
3473
def test_roundtrip_int128() -> None:
3474
f = io.BytesIO()
3475
s = pl.Series("a", [1, 2, 3], pl.Int128)
3476
s.to_frame().write_parquet(f)
3477
f.seek(0)
3478
assert_series_equal(pl.read_parquet(f).to_series(), s)
3479
3480
3481
def test_write_nested_categoricals() -> None:
3482
df = pl.select(
3483
pl.lit(pl.Series("col", ["a", "b"], dtype=pl.Categorical)).implode().implode(),
3484
)
3485
3486
f = io.BytesIO()
3487
df.lazy().sink_parquet(f)
3488
3489
f.seek(0)
3490
assert_frame_equal(pl.read_parquet(f), df)
3491
3492
3493
def test_literal_predicate_23901() -> None:
3494
df = pl.DataFrame({"x": range(10)})
3495
3496
f = io.BytesIO()
3497
df.write_parquet(f, row_group_size=1)
3498
3499
f.seek(0)
3500
assert_frame_equal(pl.scan_parquet(f).filter(pl.lit(1) == 1).collect(), df)
3501
3502
3503
def test_str_plain_is_in_more_than_4_values_24167() -> None:
3504
f = io.BytesIO()
3505
3506
df = pl.DataFrame({"a": ["a"]})
3507
pq.write_table(df.to_arrow(), f, use_dictionary=False)
3508
3509
f.seek(0)
3510
lf = pl.scan_parquet(f).filter(pl.col.a.is_in(["a", "y", "z", "w", "x"]))
3511
3512
assert_frame_equal(
3513
lf.collect(),
3514
lf.collect(optimizations=pl.QueryOptFlags(predicate_pushdown=False)),
3515
)
3516
3517
3518
def test_binary_offset_roundtrip() -> None:
3519
f = io.BytesIO()
3520
pl.LazyFrame(
3521
{
3522
"a": [1, 2, 3],
3523
}
3524
).select(pl.col.a._row_encode()).sink_parquet(f)
3525
3526
f.seek(0)
3527
lf = pl.scan_parquet(f)
3528
3529
assert "binary[offset]" in str(lf.collect())
3530
3531
3532
@pytest.mark.parametrize(
3533
"df",
3534
[
3535
pl.Series("a", [], pl.Struct([])).to_frame(),
3536
pl.Series("a", [{}]).to_frame(),
3537
pl.Series("a", [{}, None, {}]).to_frame(),
3538
pl.Series("a", [[{}, None], [{}]], pl.List(pl.Struct([]))).to_frame(),
3539
pl.Series("a", [[{}, None], None, []], pl.List(pl.Struct([]))).to_frame(),
3540
pl.DataFrame(
3541
{
3542
"a": [{}, None, {}],
3543
"b": [1, 2, 3],
3544
}
3545
),
3546
pl.DataFrame(
3547
{
3548
"b": [1, 2, 3],
3549
"a": [{}, None, {}],
3550
}
3551
),
3552
pl.DataFrame(
3553
{
3554
"x": [2, 3, 4],
3555
"a": [{}, None, {}],
3556
"b": [1, 2, 3],
3557
}
3558
),
3559
],
3560
)
3561
def test_empty_struct_roundtrip(df: pl.DataFrame, plmonkeypatch: PlMonkeyPatch) -> None:
3562
plmonkeypatch.setenv("POLARS_ALLOW_PQ_EMPTY_STRUCT", "1")
3563
3564
f = io.BytesIO()
3565
df.write_parquet(f)
3566
3567
f.seek(0)
3568
assert_frame_equal(df, pl.read_parquet(f))
3569
3570
3571
def test_parquet_is_in_multiple_pages_25312() -> None:
3572
tbl = pl.DataFrame(
3573
{
3574
"a": ["a"] * 5000 + [None],
3575
}
3576
).to_arrow()
3577
3578
f = io.BytesIO()
3579
pq.write_table(tbl, f, data_page_size=1, use_dictionary=False)
3580
f.seek(0)
3581
3582
assert_frame_equal(
3583
pl.scan_parquet(f).filter(pl.col.a.is_in(["a"])).collect(),
3584
pl.DataFrame(
3585
{
3586
"a": ["a"] * 5000,
3587
}
3588
),
3589
)
3590
3591
3592
@given(s=series(name="a", dtype=pl.String()))
3593
def test_regex_prefiltering_parametric(s: pl.Series) -> None:
3594
f = io.BytesIO()
3595
s.to_frame().write_parquet(f)
3596
3597
predicates = [
3598
pl.col.a.str.starts_with("aaa"),
3599
pl.col.a.str.ends_with("aaa"),
3600
pl.col.a.str.contains("aaa"),
3601
]
3602
3603
if (it := s.first()) is not None:
3604
assert isinstance(it, str)
3605
escape = pl.escape_regex(it)
3606
predicates += [
3607
pl.col.a.str.starts_with(it),
3608
pl.col.a.str.starts_with(it[1:]),
3609
pl.col.a.str.starts_with(it[:-1]),
3610
pl.col.a.str.ends_with(it),
3611
pl.col.a.str.ends_with(it[1:]),
3612
pl.col.a.str.ends_with(it[:-1]),
3613
pl.col.a.str.contains(escape),
3614
pl.col.a.str.contains(it, literal=True),
3615
pl.col.a.str.contains(it[1:], literal=True),
3616
]
3617
3618
if (it := s.last()) is not None:
3619
assert isinstance(it, str)
3620
escape = pl.escape_regex(it)
3621
predicates += [
3622
pl.col.a.str.starts_with(it),
3623
pl.col.a.str.starts_with(it[1:]),
3624
pl.col.a.str.starts_with(it[:-1]),
3625
pl.col.a.str.ends_with(it),
3626
pl.col.a.str.ends_with(it[1:]),
3627
pl.col.a.str.ends_with(it[:-1]),
3628
pl.col.a.str.contains(escape),
3629
pl.col.a.str.contains(it, literal=True),
3630
pl.col.a.str.contains(it[1:], literal=True),
3631
]
3632
3633
for p in predicates:
3634
f.seek(0)
3635
assert_series_equal(
3636
pl.scan_parquet(f).filter(p).collect().to_series(),
3637
s.to_frame().filter(p).to_series(),
3638
)
3639
3640
3641
@given(
3642
s=series(
3643
name="a",
3644
allowed_dtypes=[pl.Int8, pl.UInt8, pl.Int32, pl.UInt32, pl.Int64, pl.UInt64],
3645
),
3646
start=st.integers(),
3647
end=st.integers(),
3648
)
3649
def test_between_prefiltering_parametric(s: pl.Series, start: int, end: int) -> None:
3650
f = io.BytesIO()
3651
s.to_frame().write_parquet(f)
3652
3653
if s.dtype.is_unsigned_integer():
3654
start = abs(start)
3655
end = abs(end)
3656
3657
start %= s.upper_bound().item() + 1
3658
end %= s.upper_bound().item() + 1
3659
3660
if start > end:
3661
t = start
3662
start = end
3663
end = t
3664
3665
predicates = [
3666
pl.col.a.is_between(pl.lit(start, s.dtype), pl.lit(end, s.dtype)),
3667
pl.col.a > pl.lit(start, s.dtype),
3668
pl.col.a < pl.lit(end, s.dtype),
3669
pl.col.a >= pl.lit(start, s.dtype),
3670
pl.col.a <= pl.lit(end, s.dtype),
3671
pl.col.a.is_in(pl.lit([start, end], pl.List(s.dtype))),
3672
pl.col.a == pl.lit(start, s.dtype),
3673
pl.lit(True, pl.Boolean),
3674
pl.lit(False, pl.Boolean),
3675
]
3676
3677
for p in predicates:
3678
f.seek(0)
3679
assert_series_equal(
3680
pl.scan_parquet(f).filter(p).collect().to_series(),
3681
s.to_frame().filter(p).to_series(),
3682
)
3683
3684
3685
@pytest.mark.parametrize(
3686
("pl_dtype", "pa_dtype"),
3687
[
3688
(pl.Null, pa.null()),
3689
(pl.Boolean, pa.bool_()),
3690
(pl.String, pa.large_string()),
3691
(pl.Binary, pa.large_binary()),
3692
(pl.Int8, pa.int8()),
3693
(pl.Int16, pa.int16()),
3694
(pl.Int32, pa.int32()),
3695
(pl.Int64, pa.int64()),
3696
# Int128 is a custom polars type
3697
(pl.UInt8, pa.uint8()),
3698
(pl.UInt16, pa.uint16()),
3699
(pl.UInt32, pa.uint32()),
3700
(pl.UInt64, pa.uint64()),
3701
# UInt128 is a custom polars type
3702
(pl.Float16, pa.float16()),
3703
(pl.Float32, pa.float32()),
3704
(pl.Float64, pa.float64()),
3705
(pl.Decimal(38, 10), pa.decimal128(38, 10)),
3706
(pl.Categorical, pa.dictionary(pa.uint32(), pa.string())),
3707
(pl.Enum(["x", "y", "z"]), pa.dictionary(pa.uint8(), pa.string())),
3708
(pl.List(pl.Int32), pa.large_list(pa.int32())),
3709
(pl.Array(pl.Int32, 3), pa.list_(pa.int32(), 3)),
3710
(
3711
pl.Struct({"x": pl.Int32, "y": pl.Utf8}),
3712
pa.struct(
3713
[
3714
pa.field("x", pa.int32()),
3715
pa.field("y", pa.large_string()),
3716
]
3717
),
3718
),
3719
(pl.Date, pa.date32()),
3720
(pl.Datetime(time_unit="ms"), pa.timestamp("ms")),
3721
(
3722
pl.Datetime(time_unit="ms", time_zone="CET"),
3723
pa.timestamp("ms", tz="CET"),
3724
),
3725
(pl.Duration(time_unit="ms"), pa.duration("ms")),
3726
(pl.Time, pa.time64("ns")),
3727
],
3728
)
3729
def test_parquet_schema_correctness(
3730
pl_dtype: pl.DataType, pa_dtype: pa.DataType
3731
) -> None:
3732
f = io.BytesIO()
3733
df = pl.DataFrame({"a": []}, schema={"a": pl_dtype})
3734
df.write_parquet(f, use_pyarrow=False)
3735
f.seek(0)
3736
table = pq.read_table(f)
3737
assert table["a"].type == pa_dtype
3738
3739
f.truncate(0)
3740
df = pl.DataFrame({"a": []}, schema={"a": pl.Struct([pl.Field("f0", pl_dtype)])})
3741
df.write_parquet(f, use_pyarrow=False)
3742
f.seek(0)
3743
table = pq.read_table(f)
3744
assert table["a"].type == pa.struct([pa.field("f0", pa_dtype)])
3745
3746
3747
@pytest.mark.slow
3748
def test_parquet_is_in_pushdown_large_26007() -> None:
3749
# Create parquet with large_string type and ZSTD compression.
3750
df = pl.DataFrame(
3751
{
3752
"id": list(range(161877)),
3753
"symbol": pl.Series([f"SYM{i:06d} VALUE" for i in range(161877)]),
3754
"val": pl.Series([f"SYM{i:06d} VALUE" for i in range(161877)]),
3755
"currency": ["USD"] * 161877,
3756
}
3757
)
3758
3759
buf = io.BytesIO()
3760
df.write_parquet(buf, compression="zstd")
3761
buf.seek(0)
3762
3763
# Large filter.
3764
filter_keys = [f"SYM{i:06d} VALUE" for i in range(0, 161877, 100)] * 10
3765
3766
lf = pl.scan_parquet(buf)
3767
result = lf.filter(pl.col("symbol").is_in(filter_keys)).collect()
3768
expected = pl.DataFrame(
3769
{
3770
"id": list(range(0, 161877, 100)),
3771
"symbol": pl.Series([f"SYM{i:06d} VALUE" for i in range(0, 161877, 100)]),
3772
"val": pl.Series([f"SYM{i:06d} VALUE" for i in range(0, 161877, 100)]),
3773
"currency": ["USD"] * (1 + 161877 // 100),
3774
}
3775
)
3776
assert_frame_equal(result, expected)
3777
3778
3779
@pytest.mark.write_disk
3780
def test_parquet_ordered_cat_26174(tmp_path: Path) -> None:
3781
df_pandas = pd.DataFrame(
3782
{
3783
"dummy": pd.Categorical(
3784
["alpha", "beta", "gamma", "alpha", "delta", "beta", "gamma"],
3785
ordered=True,
3786
)
3787
}
3788
)
3789
df_pandas.to_parquet(tmp_path / "test.parquet", index=False)
3790
3791
df = pl.scan_parquet(tmp_path / "test.parquet").collect()
3792
assert_frame_equal(
3793
df,
3794
pl.DataFrame(
3795
{"dummy": ["alpha", "beta", "gamma", "alpha", "delta", "beta", "gamma"]},
3796
schema={"dummy": pl.Categorical},
3797
),
3798
)
3799
3800
3801
def test_scan_parquet_is_in_with_long_string_26332() -> None:
3802
short_values = [f"s{i:04d}" for i in range(200)]
3803
long_value = "L" * 64
3804
values = short_values[:100] + [long_value] + short_values[100:]
3805
3806
df = pl.DataFrame({"id": range(len(values)), "s": values})
3807
3808
f = io.BytesIO()
3809
df.write_parquet(f)
3810
f.seek(0)
3811
3812
result = pl.scan_parquet(f).filter(pl.col("s").is_in(["s0001"])).collect()
3813
expected = pl.DataFrame({"id": [1], "s": ["s0001"]})
3814
3815
assert_frame_equal(result, expected)
3816
3817
3818
def test_parquet_data_page_size_dictionary_encoding_20141(tmp_path: Path) -> None:
3819
df = pl.DataFrame({"a": ["A"] * 10_000}, schema={"a": pl.Enum(["A"])})
3820
3821
tmp_file = tmp_path / "test_20141.parquet"
3822
df.write_parquet(tmp_file, data_page_size=1)
3823
3824
assert_frame_equal(df, pl.read_parquet(tmp_file))
3825
3826
if sys.platform.startswith("win"):
3827
return
3828
3829
code = f'import os; os.environ["PARQUET_DO_VERBOSE"]="1"; import polars as pl; pl.read_parquet("{tmp_file}")'
3830
result = subprocess.run(
3831
[sys.executable, "-c", code], capture_output=True, text=True
3832
)
3833
data_pages = result.stderr.count("DataPageV1")
3834
dict_pages = result.stderr.count("DictPage")
3835
3836
tmp_file.unlink()
3837
3838
assert data_pages == 10_000, (
3839
f"BUG #20141: Expected 10000 data pages, got {data_pages}"
3840
)
3841
assert dict_pages == 1, f"Expected 1 dict page, got {dict_pages}"
3842
3843
3844
def test_parquet_nested_dictionary_multipage(tmp_path: Path) -> None:
3845
# Math
3846
# 5 rows, 28 bytes
3847
# bytes_per_row = ceil(28/5) = 6
3848
# data_page_size = 20
3849
# rows_per_page = ceil(20/6) = 4
3850
# 5 rows / 4 rows per page = 2 pages
3851
df = pl.DataFrame(
3852
{"a": [["A", "B"], ["C"], ["A", "C", "B"], [], ["B"]]},
3853
schema={"a": pl.List(pl.Categorical)},
3854
)
3855
3856
path = tmp_path / "test.parquet"
3857
df.write_parquet(path, data_page_size=20)
3858
3859
assert_frame_equal(df, pl.read_parquet(path))
3860
3861
if sys.platform.startswith("win"):
3862
return
3863
3864
code = f'import os; os.environ["PARQUET_DO_VERBOSE"]="1"; import polars as pl; pl.read_parquet("{path}")'
3865
result = subprocess.run(
3866
[sys.executable, "-c", code], capture_output=True, text=True
3867
)
3868
3869
data_pages = result.stderr.count("DataPageV1")
3870
dict_pages = result.stderr.count("DictPage")
3871
3872
assert dict_pages == 1, f"Expected 1 dict page, got {dict_pages}"
3873
assert data_pages == 2, f"Expected 2 data pages, got {data_pages}"
3874
3875
3876
def test_parquet_dict_and_data_page_offset_26531(tmp_path: Path) -> None:
3877
df = pl.DataFrame(
3878
{
3879
"id": [1, 2, 3, 4, 5],
3880
"empty_dict_col": pl.Series(
3881
[None, None, None, None, None], dtype=pl.Categorical
3882
),
3883
}
3884
)
3885
3886
filename = tmp_path / "file_native.parquet"
3887
df.write_parquet(filename, use_pyarrow=False)
3888
3889
meta = pq.read_metadata(filename)
3890
rg = meta.row_group(0)
3891
col = rg.column(1) # empty_dict_col
3892
3893
assert "RLE_DICTIONARY" in str(col.encodings)
3894
assert col.dictionary_page_offset is not None
3895
assert col.data_page_offset > col.dictionary_page_offset
3896
3897