GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_parquet.py
from __future__ import annotations

import decimal
import functools
import io
import warnings
from datetime import date, datetime, time, timezone
from decimal import Decimal
from itertools import chain
from typing import TYPE_CHECKING, Any, Callable, Literal, cast
from zoneinfo import ZoneInfo

import fsspec
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest
from hypothesis import given
from hypothesis import strategies as st

import polars as pl
from polars.exceptions import ComputeError
from polars.io.parquet import ParquetFieldOverwrites
from polars.testing import assert_frame_equal, assert_series_equal
from polars.testing.parametric import column, dataframes
from polars.testing.parametric.strategies.core import series

if TYPE_CHECKING:
    from pathlib import Path

    from polars._typing import (
        ParallelStrategy,
        ParquetCompression,
        ParquetMetadata,
        ParquetMetadataContext,
    )
    from tests.unit.conftest import MemoryUsage

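# NOTE: `test_round_trip` and `test_scan_round_trip` below are also called
# directly as round-trip helpers by several later tests in this module.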
@pytest.mark.may_fail_auto_streaming
def test_round_trip(df: pl.DataFrame) -> None:
    f = io.BytesIO()
    df.write_parquet(f)
    f.seek(0)
    assert_frame_equal(pl.read_parquet(f), df)


@pytest.mark.may_fail_auto_streaming
def test_scan_round_trip(df: pl.DataFrame) -> None:
    f = io.BytesIO()
    df.write_parquet(f)
    f.seek(0)
    assert_frame_equal(pl.scan_parquet(f).collect(), df)
    f.seek(0)
    assert_frame_equal(pl.scan_parquet(f).head().collect(), df.head())


COMPRESSIONS = [
    "lz4",
    "uncompressed",
    "snappy",
    "gzip",
    # "lzo", # LZO compression currently not supported by Arrow backend
    "brotli",
    "zstd",
]

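# Presumably a regression test for the issue number in the test name (9753):
# pyarrow_options passed to `write_parquet(use_pyarrow=True)` should be forwarded
# to pyarrow without error.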
@pytest.mark.write_disk
def test_write_parquet_using_pyarrow_9753(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame({"a": [1, 2, 3]})
    df.write_parquet(
        tmp_path / "test.parquet",
        compression="zstd",
        statistics=True,
        use_pyarrow=True,
        pyarrow_options={"coerce_timestamps": "us"},
    )


@pytest.mark.parametrize("compression", COMPRESSIONS)
def test_write_parquet_using_pyarrow_write_to_dataset_with_partitioning(
    tmp_path: Path,
    compression: ParquetCompression,
) -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "partition_col": ["one", "two", "two"]})
    path_to_write = tmp_path / "test_folder"
    path_to_write.mkdir(exist_ok=True)
    df.write_parquet(
        file=path_to_write,
        statistics=True,
        use_pyarrow=True,
        row_group_size=128,
        pyarrow_options={
            "partition_cols": ["partition_col"],
            "compression": compression,
        },
    )

    # cast is necessary as pyarrow writes partitions as categorical type
    read_df = pl.read_parquet(path_to_write, use_pyarrow=True).with_columns(
        pl.col("partition_col").cast(pl.String)
    )
    assert_frame_equal(df, read_df)

@pytest.fixture
112
def small_parquet_path(io_files_path: Path) -> Path:
113
return io_files_path / "small.parquet"
114
115
116
@pytest.mark.parametrize("compression", COMPRESSIONS)
117
@pytest.mark.parametrize("use_pyarrow", [True, False])
118
def test_to_from_buffer(
119
df: pl.DataFrame, compression: ParquetCompression, use_pyarrow: bool
120
) -> None:
121
df = df[["list_str"]]
122
buf = io.BytesIO()
123
df.write_parquet(buf, compression=compression, use_pyarrow=use_pyarrow)
124
buf.seek(0)
125
read_df = pl.read_parquet(buf, use_pyarrow=use_pyarrow)
126
assert_frame_equal(df, read_df, categorical_as_str=True)
127
128
129
@pytest.mark.parametrize("use_pyarrow", [True, False])
130
@pytest.mark.parametrize("rechunk_and_expected_chunks", [(True, 1), (False, 3)])
131
@pytest.mark.may_fail_auto_streaming
132
@pytest.mark.may_fail_cloud # reason: chunking
133
def test_read_parquet_respects_rechunk_16416(
134
use_pyarrow: bool, rechunk_and_expected_chunks: tuple[bool, int]
135
) -> None:
136
# Create a dataframe with 3 chunks:
137
df = pl.DataFrame({"a": [1]})
138
df = pl.concat([df, df, df])
139
buf = io.BytesIO()
140
df.write_parquet(buf, row_group_size=1)
141
buf.seek(0)
142
143
rechunk, expected_chunks = rechunk_and_expected_chunks
144
result = pl.read_parquet(buf, use_pyarrow=use_pyarrow, rechunk=rechunk)
145
assert result.n_chunks() == expected_chunks
146
147
148
def test_to_from_buffer_lzo(df: pl.DataFrame) -> None:
149
buf = io.BytesIO()
150
# Writing lzo compressed parquet files is not supported for now.
151
with pytest.raises(ComputeError):
152
df.write_parquet(buf, compression="lzo", use_pyarrow=False)
153
buf.seek(0)
154
155
buf = io.BytesIO()
156
with pytest.raises(OSError):
157
# Writing lzo compressed parquet files is not supported for now.
158
df.write_parquet(buf, compression="lzo", use_pyarrow=True)
159
buf.seek(0)
160
# Invalid parquet file as writing failed.
161
with pytest.raises(ComputeError):
162
_ = pl.read_parquet(buf)
163
164
165
@pytest.mark.write_disk
166
@pytest.mark.parametrize("compression", COMPRESSIONS)
167
def test_to_from_file(
168
df: pl.DataFrame, compression: ParquetCompression, tmp_path: Path
169
) -> None:
170
tmp_path.mkdir(exist_ok=True)
171
172
file_path = tmp_path / "small.avro"
173
df.write_parquet(file_path, compression=compression)
174
read_df = pl.read_parquet(file_path)
175
assert_frame_equal(df, read_df, categorical_as_str=True)
176
177
178
@pytest.mark.write_disk
179
def test_to_from_file_lzo(df: pl.DataFrame, tmp_path: Path) -> None:
180
tmp_path.mkdir(exist_ok=True)
181
182
file_path = tmp_path / "small.avro"
183
184
# Writing lzo compressed parquet files is not supported for now.
185
with pytest.raises(ComputeError):
186
df.write_parquet(file_path, compression="lzo", use_pyarrow=False)
187
# Invalid parquet file as writing failed.
188
with pytest.raises(ComputeError):
189
_ = pl.read_parquet(file_path)
190
191
# Writing lzo compressed parquet files is not supported for now.
192
with pytest.raises(OSError):
193
df.write_parquet(file_path, compression="lzo", use_pyarrow=True)
194
# Invalid parquet file as writing failed.
195
with pytest.raises(FileNotFoundError):
196
_ = pl.read_parquet(file_path)
197
198
199
def test_select_columns() -> None:
200
df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
201
expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
202
203
f = io.BytesIO()
204
df.write_parquet(f)
205
f.seek(0)
206
207
read_df = pl.read_parquet(f, columns=["b", "c"], use_pyarrow=False)
208
assert_frame_equal(expected, read_df)
209
210
211
def test_select_projection() -> None:
212
df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
213
expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
214
f = io.BytesIO()
215
df.write_parquet(f)
216
f.seek(0)
217
218
read_df = pl.read_parquet(f, columns=[1, 2], use_pyarrow=False)
219
assert_frame_equal(expected, read_df)
220
221
222
@pytest.mark.parametrize("compression", COMPRESSIONS)
223
@pytest.mark.parametrize("use_pyarrow", [True, False])
224
def test_parquet_datetime(compression: ParquetCompression, use_pyarrow: bool) -> None:
225
    # This previously failed because the parquet writer cast Datetime to Date
226
f = io.BytesIO()
227
data = {
228
"datetime": [ # unix timestamp in ms
229
1618354800000,
230
1618354740000,
231
1618354680000,
232
1618354620000,
233
1618354560000,
234
],
235
"value1": [73.1999969482, 71.0999984741, 74.5, 69.5999984741, 69.6999969482],
236
"value2": [59.5999984741, 61.0, 62.2999992371, 56.9000015259, 60.0],
237
}
238
df = pl.DataFrame(data)
239
df = df.with_columns(df["datetime"].cast(pl.Datetime))
240
241
df.write_parquet(f, use_pyarrow=use_pyarrow, compression=compression)
242
f.seek(0)
243
read = pl.read_parquet(f)
244
assert_frame_equal(read, df)
245
246
247
def test_nested_parquet() -> None:
248
f = io.BytesIO()
249
data = [
250
{"a": [{"b": 0}]},
251
{"a": [{"b": 1}, {"b": 2}]},
252
]
253
df = pd.DataFrame(data)
254
df.to_parquet(f)
255
256
read = pl.read_parquet(f, use_pyarrow=True)
257
assert read.columns == ["a"]
258
assert isinstance(read.dtypes[0], pl.datatypes.List)
259
assert isinstance(read.dtypes[0].inner, pl.datatypes.Struct)
260
261
262
@pytest.mark.write_disk
263
def test_glob_parquet(df: pl.DataFrame, tmp_path: Path) -> None:
264
tmp_path.mkdir(exist_ok=True)
265
file_path = tmp_path / "small.parquet"
266
df.write_parquet(file_path)
267
268
path_glob = tmp_path / "small*.parquet"
269
assert pl.read_parquet(path_glob).shape == (3, df.width)
270
assert pl.scan_parquet(path_glob).collect().shape == (3, df.width)
271
272
273
def test_chunked_round_trip() -> None:
274
df1 = pl.DataFrame(
275
{
276
"a": [1] * 2,
277
"l": [[1] for j in range(2)],
278
}
279
)
280
df2 = pl.DataFrame(
281
{
282
"a": [2] * 3,
283
"l": [[2] for j in range(3)],
284
}
285
)
286
287
df = df1.vstack(df2)
288
289
f = io.BytesIO()
290
df.write_parquet(f)
291
f.seek(0)
292
assert_frame_equal(pl.read_parquet(f), df)
293
294
295
@pytest.mark.write_disk
296
def test_lazy_self_join_file_cache_prop_3979(df: pl.DataFrame, tmp_path: Path) -> None:
297
tmp_path.mkdir(exist_ok=True)
298
299
file_path = tmp_path / "small.parquet"
300
df.write_parquet(file_path)
301
302
a = pl.scan_parquet(file_path)
303
b = pl.DataFrame({"a": [1]}).lazy()
304
305
expected_shape = (3, df.width + b.collect_schema().len())
306
assert a.join(b, how="cross").collect().shape == expected_shape
307
assert b.join(a, how="cross").collect().shape == expected_shape
308
309
310
def test_recursive_logical_type() -> None:
311
df = pl.DataFrame({"str": ["A", "B", "A", "B", "C"], "group": [1, 1, 2, 1, 2]})
312
df = df.with_columns(pl.col("str").cast(pl.Categorical))
313
314
df_groups = df.group_by("group").agg([pl.col("str").alias("cat_list")])
315
f = io.BytesIO()
316
df_groups.write_parquet(f, use_pyarrow=True)
317
f.seek(0)
318
read = pl.read_parquet(f, use_pyarrow=True)
319
assert read.dtypes == [pl.Int64, pl.List(pl.Categorical)]
320
assert read.shape == (2, 2)
321
322
323
def test_nested_dictionary() -> None:
324
df = (
325
pl.DataFrame({"str": ["A", "B", "A", "B", "C"], "group": [1, 1, 2, 1, 2]})
326
.with_columns(pl.col("str").cast(pl.Categorical))
327
.group_by("group")
328
.agg([pl.col("str").alias("cat_list")])
329
)
330
f = io.BytesIO()
331
df.write_parquet(f)
332
f.seek(0)
333
334
read_df = pl.read_parquet(f)
335
assert_frame_equal(df, read_df)
336
337
338
def test_row_group_size_saturation() -> None:
339
df = pl.DataFrame({"a": [1, 2, 3]})
340
f = io.BytesIO()
341
342
# request larger chunk than rows in df
343
df.write_parquet(f, row_group_size=1024)
344
f.seek(0)
345
assert_frame_equal(pl.read_parquet(f), df)
346
347
348
def test_nested_sliced() -> None:
349
for df in [
350
pl.Series([[1, 2], [3, 4], [5, 6]]).slice(2, 2).to_frame(),
351
pl.Series([[None, 2], [3, 4], [5, 6]]).to_frame(),
352
pl.Series([[None, 2], [3, 4], [5, 6]]).slice(2, 2).to_frame(),
353
pl.Series([["a", "a"], ["", "a"], ["c", "de"]]).slice(3, 2).to_frame(),
354
pl.Series([[None, True], [False, False], [True, True]]).slice(2, 2).to_frame(),
355
]:
356
f = io.BytesIO()
357
df.write_parquet(f)
358
f.seek(0)
359
assert_frame_equal(pl.read_parquet(f), df)
360
361
362
def test_parquet_5795() -> None:
363
df_pd = pd.DataFrame(
364
{
365
"a": [
366
"V",
367
"V",
368
"V",
369
"V",
370
"V",
371
"V",
372
"V",
373
"V",
374
"V",
375
"V",
376
"V",
377
"V",
378
"V",
379
"V",
380
None,
381
None,
382
None,
383
None,
384
None,
385
None,
386
]
387
}
388
)
389
f = io.BytesIO()
390
df_pd.to_parquet(f)
391
f.seek(0)
392
assert_frame_equal(pl.read_parquet(f), pl.from_pandas(df_pd))
393
394
395
def test_parquet_nesting_structs_list() -> None:
396
f = io.BytesIO()
397
df = pl.from_records(
398
[
399
{
400
"id": 1,
401
"list_of_structs_col": [
402
{"a": 10, "b": [10, 11, 12]},
403
{"a": 11, "b": [13, 14, 15]},
404
],
405
},
406
{
407
"id": 2,
408
"list_of_structs_col": [
409
{"a": 44, "b": [12]},
410
],
411
},
412
]
413
)
414
415
df.write_parquet(f)
416
f.seek(0)
417
418
assert_frame_equal(pl.read_parquet(f), df)
419
420
421
def test_parquet_nested_dictionaries_6217() -> None:
422
_type = pa.dictionary(pa.int64(), pa.string())
423
424
fields = [("a_type", _type)]
425
struct_type = pa.struct(fields)
426
427
col1 = pa.StructArray.from_arrays(
428
[pa.DictionaryArray.from_arrays([0, 0, 1], ["A", "B"])],
429
fields=struct_type,
430
)
431
432
table = pa.table({"Col1": col1})
433
434
df = pl.from_arrow(table)
435
436
f = io.BytesIO()
437
import pyarrow.parquet as pq
438
439
pq.write_table(table, f, compression="snappy")
440
f.seek(0)
441
read = pl.read_parquet(f)
442
assert_frame_equal(read, df) # type: ignore[arg-type]
443
444
445
@pytest.mark.write_disk
446
def test_head_union(tmp_path: Path) -> None:
447
tmp_path.mkdir(exist_ok=True)
448
449
df1 = pl.DataFrame({"a": [0, 1, 2], "b": [1, 2, 3]})
450
df2 = pl.DataFrame({"a": [3, 4, 5], "b": [4, 5, 6]})
451
452
file_path_1 = tmp_path / "df_fetch_1.parquet"
453
file_path_2 = tmp_path / "df_fetch_2.parquet"
454
file_path_glob = tmp_path / "df_fetch_*.parquet"
455
456
df1.write_parquet(file_path_1)
457
df2.write_parquet(file_path_2)
458
459
result_one = pl.scan_parquet(file_path_1).head(1).collect()
460
result_glob = pl.scan_parquet(file_path_glob).head(1).collect()
461
462
expected = pl.DataFrame({"a": [0], "b": [1]})
463
assert_frame_equal(result_one, expected)
464
465
    # Fetching 1 row per file or 1 row per dataset would both be ok, as we don't
    # guarantee anything here; currently we return one row per dataset.
467
expected = pl.DataFrame({"a": [0], "b": [1]})
468
assert_frame_equal(result_glob, expected)
469
470
471
@pytest.mark.slow
472
def test_struct_pyarrow_dataset_5796(tmp_path: Path) -> None:
473
tmp_path.mkdir(exist_ok=True)
474
475
num_rows = 2**17 + 1
476
477
df = pl.from_records([{"id": i, "nested": {"a": i}} for i in range(num_rows)])
478
file_path = tmp_path / "out.parquet"
479
df.write_parquet(file_path, use_pyarrow=True)
480
tbl = ds.dataset(file_path).to_table()
481
result = pl.from_arrow(tbl)
482
483
assert_frame_equal(result, df) # type: ignore[arg-type]
484
485
486
@pytest.mark.slow
487
@pytest.mark.parametrize("case", [1048576, 1048577])
488
def test_parquet_chunks_545(case: int) -> None:
489
f = io.BytesIO()
490
# repeat until it has case instances
491
df = pd.DataFrame(
492
np.tile([1.0, pd.to_datetime("2010-10-10")], [case, 1]),
493
columns=["floats", "dates"],
494
)
495
496
# write as parquet
497
df.to_parquet(f)
498
f.seek(0)
499
500
# read it with polars
501
polars_df = pl.read_parquet(f)
502
assert_frame_equal(pl.DataFrame(df), polars_df)
503
504
505
def test_nested_null_roundtrip() -> None:
506
f = io.BytesIO()
507
df = pl.DataFrame(
508
{
509
"experiences": [
510
[
511
{"company": "Google", "years": None},
512
{"company": "Facebook", "years": None},
513
],
514
]
515
}
516
)
517
518
df.write_parquet(f)
519
f.seek(0)
520
df_read = pl.read_parquet(f)
521
assert_frame_equal(df_read, df)
522
523
524
def test_parquet_nested_list_pandas() -> None:
525
# pandas/pyarrow writes as nested null dict
526
df_pd = pd.DataFrame({"listcol": [[] * 10]})
527
f = io.BytesIO()
528
df_pd.to_parquet(f)
529
f.seek(0)
530
df = pl.read_parquet(f)
531
assert df.dtypes == [pl.List(pl.Null)]
532
assert df.to_dict(as_series=False) == {"listcol": [[]]}
533
534
535
def test_parquet_cat_roundtrip() -> None:
536
f = io.BytesIO()
537
538
df = pl.DataFrame({"a": ["a", "b", "c", "d"]}).with_columns(
539
pl.col("a").cast(pl.Categorical)
540
)
541
542
df.write_parquet(f, row_group_size=2)
543
f.seek(0)
544
assert_series_equal(pl.read_parquet(f)["a"], df["a"])
545
546
547
def test_tz_aware_parquet_9586(io_files_path: Path) -> None:
548
result = pl.read_parquet(io_files_path / "tz_aware.parquet")
549
expected = pl.DataFrame(
550
{"UTC_DATETIME_ID": [datetime(2023, 6, 26, 14, 15, 0, tzinfo=timezone.utc)]}
551
).select(pl.col("*").cast(pl.Datetime("ns", "UTC")))
552
assert_frame_equal(result, expected)
553
554
555
def test_nested_list_page_reads_to_end_11548() -> None:
556
df = pl.select(
557
pl.repeat(pl.arange(0, 2048, dtype=pl.UInt64).implode(), 2).alias("x"),
558
)
559
560
f = io.BytesIO()
561
562
pq.write_table(df.to_arrow(), f, data_page_size=1)
563
564
f.seek(0)
565
566
result = pl.read_parquet(f).select(pl.col("x").list.len())
567
assert result.to_series().to_list() == [2048, 2048]
568
569
570
def test_parquet_nano_second_schema() -> None:
571
value = time(9, 0, 0)
572
f = io.BytesIO()
573
df = pd.DataFrame({"Time": [value]})
574
df.to_parquet(f)
575
f.seek(0)
576
assert pl.read_parquet(f).item() == value
577
578
579
def test_nested_struct_read_12610() -> None:
580
n = 1_025
581
expect = pl.select(a=pl.int_range(0, n), b=pl.repeat(1, n)).with_columns(
582
struct=pl.struct(pl.all())
583
)
584
585
f = io.BytesIO()
586
expect.write_parquet(
587
f,
588
use_pyarrow=True,
589
)
590
f.seek(0)
591
592
actual = pl.read_parquet(f)
593
assert_frame_equal(expect, actual)
594
595
596
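# Write a frame with a Decimal column (statistics enabled) and check that a
# filtered scan still returns the Decimal values correctly.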
@pytest.mark.write_disk
597
def test_decimal_parquet(tmp_path: Path) -> None:
598
path = tmp_path / "foo.parquet"
599
df = pl.DataFrame(
600
{
601
"foo": [1, 2, 3],
602
"bar": ["6", "7", "8"],
603
}
604
)
605
606
df = df.with_columns(pl.col("bar").cast(pl.Decimal))
607
608
df.write_parquet(path, statistics=True)
609
out = pl.scan_parquet(path).filter(foo=2).collect().to_dict(as_series=False)
610
assert out == {"foo": [2], "bar": [Decimal("7")]}
611
612
613
@pytest.mark.write_disk
614
def test_enum_parquet(tmp_path: Path) -> None:
615
path = tmp_path / "enum.parquet"
616
df = pl.DataFrame(
617
[pl.Series("e", ["foo", "bar", "ham"], dtype=pl.Enum(["foo", "bar", "ham"]))]
618
)
619
df.write_parquet(path)
620
out = pl.read_parquet(path)
621
assert_frame_equal(df, out)
622
623
624
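# Regression test for the issue in the test name (12814): read the tail of a
# non-nullable Int64 column that pyarrow wrote with tiny data pages.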
def test_parquet_rle_non_nullable_12814() -> None:
625
column = (
626
pl.select(x=pl.arange(0, 1025, dtype=pl.Int64) // 10).to_series().to_arrow()
627
)
628
schema = pa.schema([pa.field("foo", pa.int64(), nullable=False)])
629
table = pa.Table.from_arrays([column], schema=schema)
630
631
f = io.BytesIO()
632
pq.write_table(table, f, data_page_size=1)
633
634
f.seek(0)
635
expect = pl.DataFrame(table).tail(10)
636
actual = pl.read_parquet(f).tail(10)
637
638
assert_frame_equal(expect, actual)
639
640
641
@pytest.mark.slow
642
def test_parquet_12831() -> None:
643
n = 70_000
644
df = pl.DataFrame({"x": ["aaaaaa"] * n})
645
f = io.BytesIO()
646
df.write_parquet(f, row_group_size=int(1e8), data_page_size=512)
647
f.seek(0)
648
assert_frame_equal(pl.from_arrow(pq.read_table(f)), df) # type: ignore[arg-type]
649
650
651
@pytest.mark.write_disk
652
def test_parquet_struct_categorical(tmp_path: Path) -> None:
653
tmp_path.mkdir(exist_ok=True)
654
655
df = pl.DataFrame(
656
[
657
pl.Series("a", ["bob"], pl.Categorical),
658
pl.Series("b", ["foo"], pl.Categorical),
659
]
660
)
661
662
file_path = tmp_path / "categorical.parquet"
663
df.write_parquet(file_path)
664
665
out = pl.read_parquet(file_path).select(pl.col("b").value_counts())
666
assert out.to_dict(as_series=False) == {"b": [{"b": "foo", "count": 1}]}
667
668
669
@pytest.mark.write_disk
670
def test_null_parquet(tmp_path: Path) -> None:
671
tmp_path.mkdir(exist_ok=True)
672
673
df = pl.DataFrame([pl.Series("foo", [], dtype=pl.Int8)])
674
file_path = tmp_path / "null.parquet"
675
df.write_parquet(file_path)
676
out = pl.read_parquet(file_path)
677
assert_frame_equal(out, df)
678
679
680
@pytest.mark.write_disk
681
def test_write_parquet_with_null_col(tmp_path: Path) -> None:
682
tmp_path.mkdir(exist_ok=True)
683
684
df1 = pl.DataFrame({"nulls": [None] * 2, "ints": [1] * 2})
685
df2 = pl.DataFrame({"nulls": [None] * 2, "ints": [1] * 2})
686
df3 = pl.DataFrame({"nulls": [None] * 3, "ints": [1] * 3})
687
df = df1.vstack(df2)
688
df = df.vstack(df3)
689
file_path = tmp_path / "with_null.parquet"
690
df.write_parquet(file_path, row_group_size=3)
691
out = pl.read_parquet(file_path)
692
assert_frame_equal(out, df)
693
694
695
@pytest.mark.write_disk
696
def test_scan_parquet_binary_buffered_reader(tmp_path: Path) -> None:
697
tmp_path.mkdir(exist_ok=True)
698
699
df = pl.DataFrame({"a": [1, 2, 3]})
700
file_path = tmp_path / "test.parquet"
701
df.write_parquet(file_path)
702
703
with file_path.open("rb") as f:
704
out = pl.scan_parquet(f).collect()
705
assert_frame_equal(out, df)
706
707
708
@pytest.mark.write_disk
709
def test_read_parquet_binary_buffered_reader(tmp_path: Path) -> None:
710
tmp_path.mkdir(exist_ok=True)
711
712
df = pl.DataFrame({"a": [1, 2, 3]})
713
file_path = tmp_path / "test.parquet"
714
df.write_parquet(file_path)
715
716
with file_path.open("rb") as f:
717
out = pl.read_parquet(f)
718
assert_frame_equal(out, df)
719
720
721
@pytest.mark.write_disk
722
def test_read_parquet_binary_file_io(tmp_path: Path) -> None:
723
tmp_path.mkdir(exist_ok=True)
724
725
df = pl.DataFrame({"a": [1, 2, 3]})
726
file_path = tmp_path / "test.parquet"
727
df.write_parquet(file_path)
728
729
with file_path.open("rb", buffering=0) as f:
730
out = pl.read_parquet(f)
731
assert_frame_equal(out, df)
732
733
734
# https://github.com/pola-rs/polars/issues/15760
735
@pytest.mark.write_disk
736
def test_read_parquet_binary_fsspec(tmp_path: Path) -> None:
737
tmp_path.mkdir(exist_ok=True)
738
739
df = pl.DataFrame({"a": [1, 2, 3]})
740
file_path = tmp_path / "test.parquet"
741
df.write_parquet(file_path)
742
743
with fsspec.open(file_path) as f:
744
out = pl.read_parquet(f)
745
assert_frame_equal(out, df)
746
747
748
def test_read_parquet_binary_bytes_io() -> None:
749
df = pl.DataFrame({"a": [1, 2, 3]})
750
f = io.BytesIO()
751
df.write_parquet(f)
752
f.seek(0)
753
754
out = pl.read_parquet(f)
755
assert_frame_equal(out, df)
756
757
758
def test_read_parquet_binary_bytes() -> None:
759
df = pl.DataFrame({"a": [1, 2, 3]})
760
f = io.BytesIO()
761
df.write_parquet(f)
762
bytes = f.getvalue()
763
764
out = pl.read_parquet(bytes)
765
assert_frame_equal(out, df)
766
767
768
def test_utc_timezone_normalization_13670(tmp_path: Path) -> None:
    """'+00:00' time zones become the 'UTC' time zone."""
770
utc_path = tmp_path / "utc.parquet"
771
zero_path = tmp_path / "00_00.parquet"
772
utc_lowercase_path = tmp_path / "utc_lowercase.parquet"
773
for tz, path in [
774
("+00:00", zero_path),
775
("UTC", utc_path),
776
("utc", utc_lowercase_path),
777
]:
778
pq.write_table(
779
pa.table(
780
{"c1": [1234567890123] * 10},
781
schema=pa.schema([pa.field("c1", pa.timestamp("ms", tz=tz))]),
782
),
783
path,
784
)
785
786
df = pl.scan_parquet([utc_path, zero_path]).head(5).collect()
787
assert cast(pl.Datetime, df.schema["c1"]).time_zone == "UTC"
788
df = pl.scan_parquet([zero_path, utc_path]).head(5).collect()
789
assert cast(pl.Datetime, df.schema["c1"]).time_zone == "UTC"
790
df = pl.scan_parquet([zero_path, utc_lowercase_path]).head(5).collect()
791
assert cast(pl.Datetime, df.schema["c1"]).time_zone == "UTC"
792
793
794
def test_parquet_rle_14333() -> None:
795
vals = [True, False, True, False, True, False, True, False, True, False]
796
table = pa.table({"a": vals})
797
798
f = io.BytesIO()
799
pq.write_table(table, f, data_page_version="2.0")
800
f.seek(0)
801
assert pl.read_parquet(f)["a"].to_list() == vals
802
803
804
def test_parquet_rle_null_binary_read_14638() -> None:
805
df = pl.DataFrame({"x": [None]}, schema={"x": pl.String})
806
807
f = io.BytesIO()
808
df.write_parquet(f, use_pyarrow=True)
809
f.seek(0)
810
assert "RLE_DICTIONARY" in pq.read_metadata(f).row_group(0).column(0).encodings
811
f.seek(0)
812
assert_frame_equal(df, pl.read_parquet(f))
813
814
815
def test_parquet_string_rle_encoding() -> None:
816
n = 3
817
data = {
818
"id": ["abcdefgh"] * n,
819
}
820
821
df = pl.DataFrame(data)
822
f = io.BytesIO()
823
df.write_parquet(f, use_pyarrow=False)
824
f.seek(0)
825
826
assert (
827
"RLE_DICTIONARY"
828
in pq.ParquetFile(f).metadata.to_dict()["row_groups"][0]["columns"][0][
829
"encodings"
830
]
831
)
832
833
834
@pytest.mark.may_fail_auto_streaming
835
def test_sliced_dict_with_nulls_14904() -> None:
836
df = (
837
pl.DataFrame({"x": [None, None]})
838
.cast(pl.Categorical)
839
.with_columns(y=pl.concat_list("x"))
840
.slice(0, 1)
841
)
842
test_round_trip(df)
843
844
845
@pytest.fixture
846
def empty_compressed_datapage_v2_path(io_files_path: Path) -> Path:
847
return io_files_path / "empty_datapage_v2.snappy.parquet"
848
849
850
def test_read_empty_compressed_datapage_v2_22170(
851
empty_compressed_datapage_v2_path: Path,
852
) -> None:
853
df = pl.DataFrame({"value": [None]}, schema={"value": pl.Float32})
854
assert_frame_equal(df, pl.read_parquet(empty_compressed_datapage_v2_path))
855
856
857
def test_parquet_array_dtype() -> None:
858
df = pl.DataFrame({"x": []})
859
df = df.cast({"x": pl.Array(pl.Int64, shape=3)})
860
test_round_trip(df)
861
862
863
def test_parquet_array_dtype_nulls() -> None:
864
df = pl.DataFrame({"x": [[1, 2], None, [None, 3]]})
865
df = df.cast({"x": pl.Array(pl.Int64, shape=2)})
866
test_round_trip(df)
867
868
869
@pytest.mark.parametrize(
870
("series", "dtype"),
871
[
872
([[1, 2, 3]], pl.List(pl.Int64)),
873
([[1, None, 3]], pl.List(pl.Int64)),
874
(
875
[{"a": []}, {"a": [1]}, {"a": [1, 2, 3]}],
876
pl.Struct({"a": pl.List(pl.Int64)}),
877
),
878
([{"a": None}, None, {"a": [1, 2, None]}], pl.Struct({"a": pl.List(pl.Int64)})),
879
(
880
[[{"a": []}, {"a": [1]}, {"a": [1, 2, 3]}], None, [{"a": []}, {"a": [42]}]],
881
pl.List(pl.Struct({"a": pl.List(pl.Int64)})),
882
),
883
(
884
[
885
[1, None, 3],
886
None,
887
[1, 3, 4],
888
None,
889
[9, None, 4],
890
[None, 42, 13],
891
[37, 511, None],
892
],
893
pl.List(pl.Int64),
894
),
895
([[1, 2, 3]], pl.Array(pl.Int64, 3)),
896
([[1, None, 3], None, [1, 2, None]], pl.Array(pl.Int64, 3)),
897
([[1, 2], None, [None, 3]], pl.Array(pl.Int64, 2)),
898
pytest.param(
899
[[], [], []],
900
pl.Array(pl.Int64, 0),
901
marks=pytest.mark.may_fail_cloud,
902
), # reason: zero-width array
903
pytest.param(
904
[[], None, []],
905
pl.Array(pl.Int64, 0),
906
marks=pytest.mark.may_fail_cloud,
907
),
908
(
909
[[[1, 5, 2], [42, 13, 37]], [[1, 2, 3], [5, 2, 3]], [[1, 2, 1], [3, 1, 3]]],
910
pl.Array(pl.Array(pl.Int8, 3), 2),
911
),
912
(
913
[[[1, 5, 2], [42, 13, 37]], None, [None, [3, 1, 3]]],
914
pl.Array(pl.Array(pl.Int8, 3), 2),
915
),
916
(
917
[
918
[[[2, 1], None, [4, 1], None], []],
919
None,
920
[None, [[4, 4], None, [1, 2]]],
921
],
922
pl.Array(pl.List(pl.Array(pl.Int8, 2)), 2),
923
),
924
([[[], []]], pl.Array(pl.List(pl.Array(pl.Int8, 2)), 2)),
925
(
926
[
927
[
928
[[[42, 13, 37, 15, 9, 20, 0, 0, 5, 10], None]],
929
[None, [None, [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]], None],
930
]
931
],
932
pl.Array(pl.List(pl.Array(pl.Array(pl.Int8, 10), 2)), 2),
933
),
934
(
935
[
936
None,
937
[None],
938
[[None]],
939
[[[None]]],
940
[[[[None]]]],
941
[[[[[None]]]]],
942
[[[[[1]]]]],
943
],
944
pl.Array(pl.Array(pl.Array(pl.Array(pl.Array(pl.Int8, 1), 1), 1), 1), 1),
945
),
946
(
947
[
948
None,
949
[None],
950
[[]],
951
[[None]],
952
[[[None], None]],
953
[[[None], [None]]],
954
[[[[None]], [[[1]]]]],
955
[[[[[None]]]]],
956
[[[[[1]]]]],
957
],
958
pl.Array(pl.List(pl.Array(pl.List(pl.Array(pl.Int8, 1)), 1)), 1),
959
),
960
],
961
)
962
def test_complex_types(series: list[Any], dtype: pl.DataType) -> None:
963
xs = pl.Series(series, dtype=dtype)
964
df = pl.DataFrame({"x": xs})
965
966
test_round_trip(df)
967
968
969
@pytest.mark.write_disk
970
def test_parquet_array_statistics(tmp_path: Path) -> None:
971
tmp_path.mkdir(exist_ok=True)
972
973
df = pl.DataFrame({"a": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], "b": [1, 2, 3]})
974
file_path = tmp_path / "test.parquet"
975
976
df.with_columns(a=pl.col("a").list.to_array(3)).lazy().filter(
977
pl.col("a") != [1, 2, 3]
978
).collect()
979
df.with_columns(a=pl.col("a").list.to_array(3)).lazy().sink_parquet(file_path)
980
981
result = pl.scan_parquet(file_path).filter(pl.col("a") != [1, 2, 3]).collect()
982
assert result.to_dict(as_series=False) == {"a": [[4, 5, 6], [7, 8, 9]], "b": [2, 3]}
983
984
985
@pytest.mark.slow
986
@pytest.mark.write_disk
987
def test_read_parquet_only_loads_selected_columns_15098(
988
memory_usage_without_pyarrow: MemoryUsage, tmp_path: Path
989
) -> None:
990
"""Only requested columns are loaded by ``read_parquet()``."""
991
tmp_path.mkdir(exist_ok=True)
992
993
# Each column will be about 8MB of RAM
994
series = pl.arange(0, 1_000_000, dtype=pl.Int64, eager=True)
995
996
file_path = tmp_path / "multicolumn.parquet"
997
df = pl.DataFrame(
998
{
999
"a": series,
1000
"b": series,
1001
}
1002
)
1003
df.write_parquet(file_path)
1004
del df, series
1005
1006
memory_usage_without_pyarrow.reset_tracking()
1007
1008
# Only load one column:
1009
df = pl.read_parquet([file_path], columns=["b"], rechunk=False)
1010
del df
1011
# Only one column's worth of memory should be used; 2 columns would be
1012
# 16_000_000 at least, but there's some overhead.
1013
# assert 8_000_000 < memory_usage_without_pyarrow.get_peak() < 13_000_000
1014
1015
1016
@pytest.mark.release
1017
def test_max_statistic_parquet_writer() -> None:
1018
    # This hits the maximal page size, so the row group will be split into
    # multiple pages; the page statistics need to be correctly reduced for
    # this query to make sense.
1022
n = 150_000
1023
1024
# int64 is important to hit the page size
1025
df = pl.int_range(0, n, eager=True, dtype=pl.Int64).alias("int").to_frame()
1026
f = io.BytesIO()
1027
df.write_parquet(f, statistics=True, use_pyarrow=False, row_group_size=n)
1028
f.seek(0)
1029
result = pl.scan_parquet(f).filter(pl.col("int") > n - 3).collect()
1030
expected = pl.DataFrame({"int": [149998, 149999]})
1031
assert_frame_equal(result, expected)
1032
1033
1034
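# Exercise the hybrid RLE / bit-packed encoder with run lengths chosen to hit
# buffer-boundary and padding edge cases; every column is expected to come back
# RLE_DICTIONARY-encoded and round-trip exactly.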
@pytest.mark.slow
1035
def test_hybrid_rle() -> None:
1036
    # 10_007 elements, to test a length that is not a nice multiple of 8
1037
n = 10_007
1038
literal_literal = []
1039
literal_rle = []
1040
for i in range(500):
1041
literal_literal.append(np.repeat(i, 5))
1042
literal_literal.append(np.repeat(i + 2, 11))
1043
literal_rle.append(np.repeat(i, 5))
1044
literal_rle.append(np.repeat(i + 2, 15))
1045
literal_literal.append(np.random.randint(0, 10, size=2007))
1046
literal_rle.append(np.random.randint(0, 10, size=7))
1047
literal_literal = np.concatenate(literal_literal)
1048
literal_rle = np.concatenate(literal_rle)
1049
df = pl.DataFrame(
1050
{
1051
# Primitive types
1052
"i64": pl.Series([1, 2], dtype=pl.Int64).sample(n, with_replacement=True),
1053
"u64": pl.Series([1, 2], dtype=pl.UInt64).sample(n, with_replacement=True),
1054
"i8": pl.Series([1, 2], dtype=pl.Int8).sample(n, with_replacement=True),
1055
"u8": pl.Series([1, 2], dtype=pl.UInt8).sample(n, with_replacement=True),
1056
"string": pl.Series(["abc", "def"], dtype=pl.String).sample(
1057
n, with_replacement=True
1058
),
1059
"categorical": pl.Series(["aaa", "bbb"], dtype=pl.Categorical).sample(
1060
n, with_replacement=True
1061
),
1062
# Fill up bit-packing buffer in middle of consecutive run
1063
"large_bit_pack": np.concatenate(
1064
[np.repeat(i, 5) for i in range(2000)]
1065
+ [np.random.randint(0, 10, size=7)]
1066
),
1067
# Literal run that is not a multiple of 8 followed by consecutive
1068
# run initially long enough to RLE but not after padding literal
1069
"literal_literal": literal_literal,
1070
# Literal run that is not a multiple of 8 followed by consecutive
1071
# run long enough to RLE even after padding literal
1072
"literal_rle": literal_rle,
1073
# Final run not long enough to RLE
1074
"final_literal": np.concatenate(
1075
[np.random.randint(0, 100, 10_000), np.repeat(-1, 7)]
1076
),
1077
# Final run long enough to RLE
1078
"final_rle": np.concatenate(
1079
[np.random.randint(0, 100, 9_998), np.repeat(-1, 9)]
1080
),
1081
# Test filling up bit-packing buffer for encode_bool,
1082
# which is only used to encode validities
1083
"large_bit_pack_validity": [0, None] * 4092
1084
+ [0] * 9
1085
+ [1] * 9
1086
+ [2] * 10
1087
+ [0] * 1795,
1088
}
1089
)
1090
f = io.BytesIO()
1091
df.write_parquet(f)
1092
f.seek(0)
1093
for col in pq.ParquetFile(f).metadata.to_dict()["row_groups"][0]["columns"]:
1094
assert "RLE_DICTIONARY" in col["encodings"]
1095
f.seek(0)
1096
assert_frame_equal(pl.read_parquet(f), df)
1097
1098
1099
@given(
1100
df=dataframes(
1101
allowed_dtypes=[
1102
pl.Null,
1103
pl.List,
1104
pl.Array,
1105
pl.Int8,
1106
pl.UInt8,
1107
pl.UInt32,
1108
pl.Int64,
1109
# pl.Date, # Turned off because of issue #17599
1110
# pl.Time, # Turned off because of issue #17599
1111
pl.Binary,
1112
pl.Float32,
1113
pl.Float64,
1114
pl.String,
1115
pl.Boolean,
1116
],
1117
min_size=1,
1118
max_size=500,
1119
)
1120
)
1121
@pytest.mark.slow
1122
def test_roundtrip_parametric(df: pl.DataFrame) -> None:
1123
f = io.BytesIO()
1124
df.write_parquet(f)
1125
f.seek(0)
1126
result = pl.read_parquet(f)
1127
1128
assert_frame_equal(df, result)
1129
1130
1131
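# u64::MAX must survive as the column's max statistic when statistics are written.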
def test_parquet_statistics_uint64_16683() -> None:
1132
u64_max = (1 << 64) - 1
1133
df = pl.Series("a", [u64_max, 0], dtype=pl.UInt64).to_frame()
1134
file = io.BytesIO()
1135
df.write_parquet(file, statistics=True)
1136
file.seek(0)
1137
statistics = pq.read_metadata(file).row_group(0).column(0).statistics
1138
1139
assert statistics.min == 0
1140
assert statistics.max == u64_max
1141
1142
1143
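# Float columns written by pyarrow with BYTE_STREAM_SPLIT encoding (no
# dictionary) must be readable, with and without a validity mask.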
@pytest.mark.slow
1144
@pytest.mark.parametrize("nullable", [True, False])
1145
def test_read_byte_stream_split(nullable: bool) -> None:
1146
rng = np.random.default_rng(123)
1147
num_rows = 1_000
1148
values = rng.uniform(-1.0e6, 1.0e6, num_rows)
1149
if nullable:
1150
validity_mask = rng.integers(0, 2, num_rows).astype(np.bool_)
1151
else:
1152
validity_mask = None
1153
1154
schema = pa.schema(
1155
[
1156
pa.field("floats", type=pa.float32(), nullable=nullable),
1157
pa.field("doubles", type=pa.float64(), nullable=nullable),
1158
]
1159
)
1160
arrays = [pa.array(values, type=field.type, mask=validity_mask) for field in schema]
1161
table = pa.Table.from_arrays(arrays, schema=schema)
1162
df = cast(pl.DataFrame, pl.from_arrow(table))
1163
1164
f = io.BytesIO()
1165
pq.write_table(
1166
table, f, compression="snappy", use_dictionary=False, use_byte_stream_split=True
1167
)
1168
1169
f.seek(0)
1170
read = pl.read_parquet(f)
1171
1172
assert_frame_equal(read, df)
1173
1174
1175
@pytest.mark.slow
1176
@pytest.mark.parametrize("rows_nullable", [True, False])
1177
@pytest.mark.parametrize("item_nullable", [True, False])
1178
def test_read_byte_stream_split_arrays(
1179
item_nullable: bool, rows_nullable: bool
1180
) -> None:
1181
rng = np.random.default_rng(123)
1182
num_rows = 1_000
1183
max_array_len = 10
1184
array_lengths = rng.integers(0, max_array_len + 1, num_rows)
1185
if rows_nullable:
1186
row_validity_mask = rng.integers(0, 2, num_rows).astype(np.bool_)
1187
array_lengths[row_validity_mask] = 0
1188
row_validity_mask = pa.array(row_validity_mask)
1189
else:
1190
row_validity_mask = None
1191
1192
offsets = np.zeros(num_rows + 1, dtype=np.int64)
1193
np.cumsum(array_lengths, out=offsets[1:])
1194
num_values = offsets[-1]
1195
values = rng.uniform(-1.0e6, 1.0e6, num_values)
1196
1197
if item_nullable:
1198
element_validity_mask = rng.integers(0, 2, num_values).astype(np.bool_)
1199
else:
1200
element_validity_mask = None
1201
1202
schema = pa.schema(
1203
[
1204
pa.field(
1205
"floats",
1206
type=pa.list_(pa.field("", pa.float32(), nullable=item_nullable)),
1207
nullable=rows_nullable,
1208
),
1209
pa.field(
1210
"doubles",
1211
type=pa.list_(pa.field("", pa.float64(), nullable=item_nullable)),
1212
nullable=rows_nullable,
1213
),
1214
]
1215
)
1216
arrays = [
1217
pa.ListArray.from_arrays(
1218
pa.array(offsets),
1219
pa.array(values, type=field.type.field(0).type, mask=element_validity_mask),
1220
mask=row_validity_mask,
1221
)
1222
for field in schema
1223
]
1224
table = pa.Table.from_arrays(arrays, schema=schema)
1225
df = cast(pl.DataFrame, pl.from_arrow(table))
1226
1227
f = io.BytesIO()
1228
pq.write_table(
1229
table, f, compression="snappy", use_dictionary=False, use_byte_stream_split=True
1230
)
1231
1232
f.seek(0)
1233
read = pl.read_parquet(f)
1234
1235
assert_frame_equal(read, df)
1236
1237
1238
def test_parquet_nested_null_array_17795() -> None:
1239
f = io.BytesIO()
1240
pl.DataFrame([{"struct": {"field": None}}]).write_parquet(f)
1241
f.seek(0)
1242
pq.read_table(f)
1243
1244
1245
def test_parquet_record_batches_pyarrow_fixed_size_list_16614() -> None:
1246
    # @NOTE:
    # The minimum at which I could get it to crash was ~132000, but let's
    # just use 150000 to be sure.
1249
n = 150000
1250
x = pl.DataFrame(
1251
{"x": np.linspace((1, 2), (2 * n, 2 * n * 1), n, dtype=np.float32)},
1252
schema={"x": pl.Array(pl.Float32, 2)},
1253
)
1254
1255
f = io.BytesIO()
1256
x.write_parquet(f)
1257
f.seek(0)
1258
b = pl.read_parquet(f, use_pyarrow=True)
1259
1260
assert b["x"].shape[0] == n
1261
assert_frame_equal(b, x)
1262
1263
1264
def test_parquet_list_element_field_name() -> None:
1265
f = io.BytesIO()
1266
(
1267
pl.DataFrame(
1268
{
1269
"a": [[1, 2], [1, 1, 1]],
1270
},
1271
schema={"a": pl.List(pl.Int64)},
1272
).write_parquet(f, use_pyarrow=False)
1273
)
1274
1275
f.seek(0)
1276
schema_str = str(pq.read_schema(f))
1277
assert "<element: int64>" in schema_str
1278
assert "child 0, element: int64" in schema_str
1279
1280
1281
def test_nested_decimal() -> None:
1282
df = pl.DataFrame(
1283
{
1284
"a": [
1285
{"f0": None},
1286
None,
1287
]
1288
},
1289
schema={"a": pl.Struct({"f0": pl.Decimal(precision=38, scale=8)})},
1290
)
1291
test_round_trip(df)
1292
1293
1294
def test_nested_non_uniform_primitive() -> None:
1295
df = pl.DataFrame(
1296
{"a": [{"x": 0, "y": None}]},
1297
schema={
1298
"a": pl.Struct(
1299
{
1300
"x": pl.Int16,
1301
"y": pl.Int64,
1302
}
1303
)
1304
},
1305
)
1306
test_round_trip(df)
1307
1308
1309
def test_parquet_nested_struct_17933() -> None:
1310
df = pl.DataFrame(
1311
{"a": [{"x": {"u": None}, "y": True}]},
1312
schema={
1313
"a": pl.Struct(
1314
{
1315
"x": pl.Struct({"u": pl.String}),
1316
"y": pl.Boolean(),
1317
}
1318
)
1319
},
1320
)
1321
test_round_trip(df)
1322
1323
1324
# This is fixed with POLARS_FORCE_MULTISCAN=1. Without it we have
1325
# first_metadata.unwrap() on None.
1326
@pytest.mark.may_fail_auto_streaming
1327
def test_parquet_pyarrow_map() -> None:
1328
xs = [
1329
[
1330
(0, 5),
1331
(1, 10),
1332
(2, 19),
1333
(3, 96),
1334
]
1335
]
1336
1337
table = pa.table(
1338
[xs],
1339
schema=pa.schema(
1340
[
1341
("x", pa.map_(pa.int32(), pa.int32(), keys_sorted=True)),
1342
]
1343
),
1344
)
1345
1346
f = io.BytesIO()
1347
pq.write_table(table, f)
1348
1349
expected = pl.DataFrame(
1350
{
1351
"x": [
1352
{"key": 0, "value": 5},
1353
{"key": 1, "value": 10},
1354
{"key": 2, "value": 19},
1355
{"key": 3, "value": 96},
1356
]
1357
},
1358
schema={"x": pl.Struct({"key": pl.Int32, "value": pl.Int32})},
1359
)
1360
f.seek(0)
1361
assert_frame_equal(pl.read_parquet(f).explode(["x"]), expected)
1362
1363
# Test for https://github.com/pola-rs/polars/issues/21317
1364
# Specifying schema/allow_missing_columns
1365
for missing_columns in ["insert", "raise"]:
1366
f.seek(0)
1367
assert_frame_equal(
1368
pl.read_parquet(
1369
f,
1370
schema={"x": pl.List(pl.Struct({"key": pl.Int32, "value": pl.Int32}))},
1371
missing_columns=missing_columns, # type: ignore[arg-type]
1372
).explode(["x"]),
1373
expected,
1374
)
1375
1376
1377
@pytest.mark.parametrize(
1378
("s", "elem"),
1379
[
1380
(pl.Series(["", "hello", "hi", ""], dtype=pl.String), ""),
1381
(pl.Series([0, 1, 2, 0], dtype=pl.Int64), 0),
1382
(pl.Series([[0], [1], [2], [0]], dtype=pl.Array(pl.Int64, 1)), [0]),
1383
(
1384
pl.Series([[0, 1], [1, 2], [2, 3], [0, 1]], dtype=pl.Array(pl.Int64, 2)),
1385
[0, 1],
1386
),
1387
],
1388
)
1389
def test_parquet_high_nested_null_17805(
1390
s: pl.Series, elem: str | int | list[int]
1391
) -> None:
1392
test_round_trip(
1393
pl.DataFrame({"a": s}).select(
1394
pl.when(pl.col("a") == elem)
1395
.then(pl.lit(None))
1396
.otherwise(pl.concat_list(pl.col("a").alias("b")))
1397
.alias("c")
1398
)
1399
)
1400
1401
1402
def test_struct_plain_encoded_statistics() -> None:
1403
df = pl.DataFrame(
1404
{
1405
"a": [None, None, None, None, {"x": None, "y": 0}],
1406
},
1407
schema={"a": pl.Struct({"x": pl.Int8, "y": pl.Int8})},
1408
)
1409
1410
test_scan_round_trip(df)
1411
1412
1413
@given(
1414
df=dataframes(
1415
min_size=5,
1416
excluded_dtypes=[pl.Decimal, pl.Categorical],
1417
allow_masked_out=False, # PyArrow does not support this
1418
)
1419
)
1420
def test_scan_round_trip_parametric(df: pl.DataFrame) -> None:
1421
test_scan_round_trip(df)
1422
1423
1424
def test_empty_rg_no_dict_page_18146() -> None:
1425
df = pl.DataFrame(
1426
{
1427
"a": [],
1428
},
1429
schema={"a": pl.String},
1430
)
1431
1432
f = io.BytesIO()
1433
pq.write_table(df.to_arrow(), f, compression="NONE", use_dictionary=False)
1434
f.seek(0)
1435
assert_frame_equal(pl.read_parquet(f), df)
1436
1437
1438
def test_write_sliced_lists_18069() -> None:
1439
f = io.BytesIO()
1440
a = pl.Series(3 * [None, ["$"] * 3], dtype=pl.List(pl.String))
1441
1442
before = pl.DataFrame({"a": a}).slice(4, 2)
1443
before.write_parquet(f)
1444
1445
f.seek(0)
1446
after = pl.read_parquet(f)
1447
1448
assert_frame_equal(before, after)
1449
1450
1451
def test_null_array_dict_pages_18085() -> None:
1452
test = pd.DataFrame(
1453
[
1454
{"A": float("NaN"), "B": 3, "C": None},
1455
{"A": float("NaN"), "B": None, "C": None},
1456
]
1457
)
1458
1459
f = io.BytesIO()
1460
test.to_parquet(f)
1461
f.seek(0)
1462
pl.read_parquet(f)
1463
1464
1465
@given(
1466
df=dataframes(
1467
min_size=1,
1468
max_size=1000,
1469
allowed_dtypes=[
1470
pl.List,
1471
pl.Int8,
1472
pl.Int16,
1473
pl.Int32,
1474
pl.Int64,
1475
pl.UInt8,
1476
pl.UInt16,
1477
pl.UInt32,
1478
pl.UInt64,
1479
],
1480
allow_masked_out=False, # PyArrow does not support this
1481
),
1482
row_group_size=st.integers(min_value=10, max_value=1000),
1483
)
1484
def test_delta_encoding_roundtrip(df: pl.DataFrame, row_group_size: int) -> None:
1485
f = io.BytesIO()
1486
pq.write_table(
1487
df.to_arrow(),
1488
f,
1489
compression="NONE",
1490
use_dictionary=False,
1491
column_encoding="DELTA_BINARY_PACKED",
1492
write_statistics=False,
1493
row_group_size=row_group_size,
1494
)
1495
1496
f.seek(0)
1497
assert_frame_equal(pl.read_parquet(f), df)
1498
1499
1500
@given(
1501
df=dataframes(min_size=1, max_size=1000, allowed_dtypes=[pl.String, pl.Binary]),
1502
row_group_size=st.integers(min_value=10, max_value=1000),
1503
)
1504
def test_delta_length_byte_array_encoding_roundtrip(
1505
df: pl.DataFrame, row_group_size: int
1506
) -> None:
1507
f = io.BytesIO()
1508
pq.write_table(
1509
df.to_arrow(),
1510
f,
1511
compression="NONE",
1512
use_dictionary=False,
1513
column_encoding="DELTA_LENGTH_BYTE_ARRAY",
1514
write_statistics=False,
1515
row_group_size=row_group_size,
1516
)
1517
1518
f.seek(0)
1519
assert_frame_equal(pl.read_parquet(f), df)
1520
1521
1522
@given(
1523
df=dataframes(min_size=1, max_size=1000, allowed_dtypes=[pl.String, pl.Binary]),
1524
row_group_size=st.integers(min_value=10, max_value=1000),
1525
)
1526
def test_delta_strings_encoding_roundtrip(
1527
df: pl.DataFrame, row_group_size: int
1528
) -> None:
1529
f = io.BytesIO()
1530
pq.write_table(
1531
df.to_arrow(),
1532
f,
1533
compression="NONE",
1534
use_dictionary=False,
1535
column_encoding="DELTA_BYTE_ARRAY",
1536
write_statistics=False,
1537
row_group_size=row_group_size,
1538
)
1539
1540
f.seek(0)
1541
assert_frame_equal(pl.read_parquet(f), df)
1542
1543
1544
EQUALITY_OPERATORS = ["__eq__", "__lt__", "__le__", "__gt__", "__ge__"]
1545
BOOLEAN_OPERATORS = ["__or__", "__and__"]
1546
1547
1548
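# Property test: filtering a parquet scan with single or boolean-combined
# comparison predicates must match filtering the in-memory frame, for both the
# "auto" and "prefiltered" parallel strategies.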
@given(
1549
df=dataframes(
1550
min_size=0, max_size=100, min_cols=2, max_cols=5, allowed_dtypes=[pl.Int32]
1551
),
1552
first_op=st.sampled_from(EQUALITY_OPERATORS),
1553
second_op=st.sampled_from(
1554
[None]
1555
+ [
1556
(booljoin, eq)
1557
for booljoin in BOOLEAN_OPERATORS
1558
for eq in EQUALITY_OPERATORS
1559
]
1560
),
1561
l1=st.integers(min_value=0, max_value=1000),
1562
l2=st.integers(min_value=0, max_value=1000),
1563
r1=st.integers(min_value=0, max_value=1000),
1564
r2=st.integers(min_value=0, max_value=1000),
1565
)
1566
@pytest.mark.parametrize("parallel_st", ["auto", "prefiltered"])
1567
def test_predicate_filtering(
1568
df: pl.DataFrame,
1569
first_op: str,
1570
second_op: None | tuple[str, str],
1571
l1: int,
1572
l2: int,
1573
r1: int,
1574
r2: int,
1575
parallel_st: Literal["auto", "prefiltered"],
1576
) -> None:
1577
f = io.BytesIO()
1578
df.write_parquet(f, row_group_size=5)
1579
1580
cols = df.columns
1581
1582
l1s = cols[l1 % len(cols)]
1583
l2s = cols[l2 % len(cols)]
1584
expr = (getattr(pl.col(l1s), first_op))(pl.col(l2s))
1585
1586
if second_op is not None:
1587
r1s = cols[r1 % len(cols)]
1588
r2s = cols[r2 % len(cols)]
1589
expr = getattr(expr, second_op[0])(
1590
(getattr(pl.col(r1s), second_op[1]))(pl.col(r2s))
1591
)
1592
1593
f.seek(0)
1594
result = pl.scan_parquet(f, parallel=parallel_st).filter(expr).collect()
1595
assert_frame_equal(result, df.filter(expr))
1596
1597
1598
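# Property test: slicing a scan of a pyarrow-written file must match slicing the
# original frame, across dictionary/plain encoding and tiny data pages.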
@pytest.mark.parametrize(
1599
"use_dictionary",
1600
[False, True],
1601
)
1602
@pytest.mark.parametrize(
1603
"data_page_size",
1604
[1, None],
1605
)
1606
@given(
1607
s=series(
1608
min_size=1,
1609
max_size=10,
1610
excluded_dtypes=[
1611
pl.Decimal,
1612
pl.Categorical,
1613
pl.Enum,
1614
pl.Struct, # See #19612.
1615
],
1616
allow_masked_out=False, # PyArrow does not support this
1617
),
1618
offset=st.integers(0, 10),
1619
length=st.integers(0, 10),
1620
)
1621
def test_pyarrow_slice_roundtrip(
1622
s: pl.Series,
1623
use_dictionary: bool,
1624
data_page_size: int | None,
1625
offset: int,
1626
length: int,
1627
) -> None:
1628
offset %= len(s) + 1
1629
length %= len(s) - offset + 1
1630
1631
f = io.BytesIO()
1632
df = s.to_frame()
1633
pq.write_table(
1634
df.to_arrow(),
1635
f,
1636
compression="NONE",
1637
use_dictionary=use_dictionary,
1638
data_page_size=data_page_size,
1639
)
1640
1641
f.seek(0)
1642
scanned = pl.scan_parquet(f).slice(offset, length).collect()
1643
assert_frame_equal(scanned, df.slice(offset, length))
1644
1645
1646
@given(
1647
df=dataframes(
1648
min_size=1,
1649
max_size=5,
1650
min_cols=1,
1651
max_cols=1,
1652
excluded_dtypes=[pl.Decimal, pl.Categorical, pl.Enum],
1653
),
1654
offset=st.integers(0, 100),
1655
length=st.integers(0, 100),
1656
)
1657
def test_slice_roundtrip(df: pl.DataFrame, offset: int, length: int) -> None:
1658
offset %= df.height + 1
1659
length %= df.height - offset + 1
1660
1661
f = io.BytesIO()
1662
df.write_parquet(f)
1663
1664
f.seek(0)
1665
scanned = pl.scan_parquet(f).slice(offset, length).collect()
1666
assert_frame_equal(scanned, df.slice(offset, length))
1667
1668
1669
def test_struct_prefiltered() -> None:
1670
df = pl.DataFrame({"a": {"x": 1, "y": 2}})
1671
f = io.BytesIO()
1672
df.write_parquet(f)
1673
1674
f.seek(0)
1675
(
1676
pl.scan_parquet(f, parallel="prefiltered")
1677
.filter(pl.col("a").struct.field("x") == 1)
1678
.collect()
1679
)
1680
1681
1682
@pytest.mark.parametrize(
1683
"data",
1684
[
1685
(
1686
[{"x": ""}, {"x": "0"}],
1687
pa.struct([pa.field("x", pa.string(), nullable=True)]),
1688
),
1689
(
1690
[{"x": ""}, {"x": "0"}],
1691
pa.struct([pa.field("x", pa.string(), nullable=False)]),
1692
),
1693
([[""], ["0"]], pa.list_(pa.field("item", pa.string(), nullable=False))),
1694
([[""], ["0"]], pa.list_(pa.field("item", pa.string(), nullable=True))),
1695
([[""], ["0"]], pa.list_(pa.field("item", pa.string(), nullable=False), 1)),
1696
([[""], ["0"]], pa.list_(pa.field("item", pa.string(), nullable=True), 1)),
1697
(
1698
[["", "1"], ["0", "2"]],
1699
pa.list_(pa.field("item", pa.string(), nullable=False), 2),
1700
),
1701
(
1702
[["", "1"], ["0", "2"]],
1703
pa.list_(pa.field("item", pa.string(), nullable=True), 2),
1704
),
1705
],
1706
)
1707
@pytest.mark.parametrize("nullable", [False, True])
1708
def test_nested_skip_18303(
1709
data: tuple[list[dict[str, str] | list[str]], pa.DataType],
1710
nullable: bool,
1711
) -> None:
1712
schema = pa.schema([pa.field("a", data[1], nullable=nullable)])
1713
tb = pa.table({"a": data[0]}, schema=schema)
1714
1715
f = io.BytesIO()
1716
pq.write_table(tb, f)
1717
1718
f.seek(0)
1719
scanned = pl.scan_parquet(f).slice(1, 1).collect()
1720
1721
assert_frame_equal(scanned, pl.DataFrame(tb).slice(1, 1))
1722
1723
1724
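# A fixed-size-list row wide enough to span multiple data pages must still be
# readable with n_rows=1 (see the issue number in the test name, 18400).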
def test_nested_span_multiple_pages_18400() -> None:
1725
width = 4100
1726
df = pl.DataFrame(
1727
[
1728
pl.Series(
1729
"a",
1730
[
1731
list(range(width)),
1732
list(range(width)),
1733
],
1734
pl.Array(pl.Int64, width),
1735
),
1736
]
1737
)
1738
1739
f = io.BytesIO()
1740
pq.write_table(
1741
df.to_arrow(),
1742
f,
1743
use_dictionary=False,
1744
data_page_size=1024,
1745
column_encoding={"a": "PLAIN"},
1746
)
1747
1748
f.seek(0)
1749
assert_frame_equal(df.head(1), pl.read_parquet(f, n_rows=1))
1750
1751
1752
@given(
1753
df=dataframes(
1754
min_size=0,
1755
max_size=1000,
1756
min_cols=2,
1757
max_cols=5,
1758
excluded_dtypes=[pl.Decimal, pl.Categorical, pl.Enum, pl.Array],
1759
include_cols=[column("filter_col", pl.Boolean, allow_null=False)],
1760
),
1761
)
1762
def test_parametric_small_page_mask_filtering(df: pl.DataFrame) -> None:
1763
f = io.BytesIO()
1764
df.write_parquet(f, data_page_size=1024)
1765
1766
expr = pl.col("filter_col")
1767
f.seek(0)
1768
result = pl.scan_parquet(f, parallel="prefiltered").filter(expr).collect()
1769
assert_frame_equal(result, df.filter(expr))
1770
1771
1772
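# The leading null makes the first data page carry a validity buffer while the
# later pages are fully valid; reading must handle the mixture across pages.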
@pytest.mark.parametrize(
1773
"value",
1774
[
1775
"abcd",
1776
0,
1777
0.0,
1778
False,
1779
],
1780
)
1781
def test_different_page_validity_across_pages(value: str | int | float | bool) -> None:
1782
df = pl.DataFrame(
1783
{
1784
"a": [None] + [value] * 4000,
1785
}
1786
)
1787
1788
f = io.BytesIO()
1789
pq.write_table(
1790
df.to_arrow(),
1791
f,
1792
use_dictionary=False,
1793
data_page_size=1024,
1794
column_encoding={"a": "PLAIN"},
1795
)
1796
1797
f.seek(0)
1798
assert_frame_equal(df, pl.read_parquet(f))
1799
1800
1801
@given(
1802
df=dataframes(
1803
min_size=0,
1804
max_size=100,
1805
min_cols=2,
1806
max_cols=5,
1807
allowed_dtypes=[pl.String, pl.Binary],
1808
include_cols=[
1809
column("filter_col", pl.Int8, st.integers(0, 1), allow_null=False)
1810
],
1811
),
1812
)
1813
def test_delta_length_byte_array_prefiltering(df: pl.DataFrame) -> None:
1814
cols = df.columns
1815
1816
encodings = dict.fromkeys(cols, "DELTA_LENGTH_BYTE_ARRAY")
1817
encodings["filter_col"] = "PLAIN"
1818
1819
f = io.BytesIO()
1820
pq.write_table(
1821
df.to_arrow(),
1822
f,
1823
use_dictionary=False,
1824
column_encoding=encodings,
1825
)
1826
1827
f.seek(0)
1828
expr = pl.col("filter_col") == 0
1829
result = pl.scan_parquet(f, parallel="prefiltered").filter(expr).collect()
1830
assert_frame_equal(result, df.filter(expr))
1831
1832
1833
@given(
1834
df=dataframes(
1835
min_size=0,
1836
max_size=10,
1837
min_cols=1,
1838
max_cols=5,
1839
excluded_dtypes=[pl.Decimal, pl.Categorical, pl.Enum],
1840
include_cols=[
1841
column("filter_col", pl.Int8, st.integers(0, 1), allow_null=False)
1842
],
1843
),
1844
)
1845
def test_general_prefiltering(df: pl.DataFrame) -> None:
1846
f = io.BytesIO()
1847
df.write_parquet(f)
1848
1849
expr = pl.col("filter_col") == 0
1850
1851
f.seek(0)
1852
result = pl.scan_parquet(f, parallel="prefiltered").filter(expr).collect()
1853
assert_frame_equal(result, df.filter(expr))
1854
1855
1856
@given(
1857
df=dataframes(
1858
min_size=0,
1859
max_size=10,
1860
min_cols=1,
1861
max_cols=5,
1862
excluded_dtypes=[pl.Decimal, pl.Categorical, pl.Enum],
1863
include_cols=[column("filter_col", pl.Boolean, allow_null=False)],
1864
),
1865
)
1866
def test_row_index_prefiltering(df: pl.DataFrame) -> None:
1867
f = io.BytesIO()
1868
df.write_parquet(f)
1869
1870
expr = pl.col("filter_col")
1871
1872
f.seek(0)
1873
result = (
1874
pl.scan_parquet(
1875
f, row_index_name="ri", row_index_offset=42, parallel="prefiltered"
1876
)
1877
.filter(expr)
1878
.collect()
1879
)
1880
assert_frame_equal(result, df.with_row_index("ri", 42).filter(expr))
1881
1882
1883
def test_empty_parquet() -> None:
1884
f_pd = io.BytesIO()
1885
f_pl = io.BytesIO()
1886
1887
pd.DataFrame().to_parquet(f_pd)
1888
pl.DataFrame().write_parquet(f_pl)
1889
1890
f_pd.seek(0)
1891
f_pl.seek(0)
1892
1893
empty_from_pd = pl.read_parquet(f_pd)
1894
assert empty_from_pd.shape == (0, 0)
1895
1896
empty_from_pl = pl.read_parquet(f_pl)
1897
assert empty_from_pl.shape == (0, 0)
1898
1899
1900
@pytest.mark.parametrize(
1901
"strategy",
1902
["columns", "row_groups", "prefiltered"],
1903
)
1904
@pytest.mark.write_disk
1905
def test_row_index_projection_pushdown_18463(
1906
tmp_path: Path, strategy: ParallelStrategy
1907
) -> None:
1908
tmp_path.mkdir(exist_ok=True)
1909
f = tmp_path / "test.parquet"
1910
1911
pl.DataFrame({"A": [1, 4], "B": [2, 5]}).write_parquet(f)
1912
1913
df = pl.scan_parquet(f, parallel=strategy).with_row_index()
1914
1915
assert_frame_equal(df.select("index").collect(), df.collect().select("index"))
1916
1917
df = pl.scan_parquet(f, parallel=strategy).with_row_index("other_idx_name")
1918
1919
assert_frame_equal(
1920
df.select("other_idx_name").collect(), df.collect().select("other_idx_name")
1921
)
1922
1923
df = pl.scan_parquet(f, parallel=strategy).with_row_index(offset=42)
1924
1925
assert_frame_equal(df.select("index").collect(), df.collect().select("index"))
1926
1927
df = pl.scan_parquet(f, parallel=strategy).with_row_index()
1928
1929
assert_frame_equal(
1930
df.select("index").slice(1, 1).collect(),
1931
df.collect().select("index").slice(1, 1),
1932
)
1933
1934
1935
@pytest.mark.write_disk
1936
def test_write_binary_open_file(tmp_path: Path) -> None:
1937
df = pl.DataFrame({"a": [1, 2, 3]})
1938
1939
path = tmp_path / "test.parquet"
1940
1941
with path.open("wb") as f_write:
1942
df.write_parquet(f_write)
1943
1944
out = pl.read_parquet(path)
1945
assert_frame_equal(out, df)
1946
1947
1948
def test_prefilter_with_projection() -> None:
1949
f = io.BytesIO()
1950
pl.DataFrame({"a": [1], "b": [2]}).write_parquet(f)
1951
1952
f.seek(0)
1953
(
1954
pl.scan_parquet(f, parallel="prefiltered")
1955
.filter(pl.col.a == 1)
1956
.select(pl.col.a)
1957
.collect()
1958
)
1959
1960
1961
@pytest.mark.parametrize("parallel_strategy", ["prefiltered", "row_groups"])
1962
@pytest.mark.parametrize(
1963
"df",
1964
[
1965
pl.DataFrame({"x": 1, "y": 1}),
1966
pl.DataFrame({"x": 1, "b": 1, "y": 1}), # hive columns in file
1967
],
1968
)
1969
@pytest.mark.write_disk
1970
def test_prefilter_with_hive_19766(
1971
tmp_path: Path, df: pl.DataFrame, parallel_strategy: str
1972
) -> None:
1973
tmp_path.mkdir(exist_ok=True)
1974
(tmp_path / "a=1/b=1").mkdir(exist_ok=True, parents=True)
1975
1976
df.write_parquet(tmp_path / "a=1/b=1/1")
1977
expect = df.with_columns(a=pl.lit(1, dtype=pl.Int64), b=pl.lit(1, dtype=pl.Int64))
1978
1979
lf = pl.scan_parquet(tmp_path, parallel=parallel_strategy) # type: ignore[arg-type]
1980
1981
for predicate in [
1982
pl.col("a") == 1,
1983
pl.col("x") == 1,
1984
(pl.col("a") == 1) & (pl.col("x") == 1),
1985
pl.col("b") == 1,
1986
pl.col("y") == 1,
1987
(pl.col("a") == 1) & (pl.col("b") == 1),
1988
]:
1989
assert_frame_equal(
1990
lf.filter(predicate).collect(),
1991
expect,
1992
)
1993
1994
1995
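# Scanning multiple files where one file lacks a column: the default raises
# ColumnNotFoundError, while missing_columns="insert" fills the column with
# nulls. The deprecated allow_missing_columns flag is also exercised.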
@pytest.mark.parametrize("parallel", ["columns", "row_groups", "prefiltered", "none"])
1996
@pytest.mark.parametrize("streaming", [True, False])
1997
@pytest.mark.parametrize("projection", [pl.all(), pl.col("b")])
1998
@pytest.mark.write_disk
1999
def test_allow_missing_columns(
2000
tmp_path: Path,
2001
parallel: str,
2002
streaming: bool,
2003
projection: pl.Expr,
2004
) -> None:
2005
tmp_path.mkdir(exist_ok=True)
2006
dfs = [pl.DataFrame({"a": 1, "b": 1}), pl.DataFrame({"a": 2})]
2007
paths = [tmp_path / "1", tmp_path / "2"]
2008
2009
for df, path in zip(dfs, paths):
2010
df.write_parquet(path)
2011
2012
expected_full = pl.DataFrame({"a": [1, 2], "b": [1, None]})
2013
expected = expected_full.select(projection)
2014
2015
with pytest.raises(
2016
pl.exceptions.ColumnNotFoundError,
2017
match="passing `missing_columns='insert'`",
2018
):
2019
pl.read_parquet(paths, parallel=parallel) # type: ignore[arg-type]
2020
2021
with pytest.raises(
2022
pl.exceptions.ColumnNotFoundError,
2023
match="passing `missing_columns='insert'`",
2024
):
2025
pl.scan_parquet(paths, parallel=parallel).select(projection).collect( # type: ignore[arg-type]
2026
engine="streaming" if streaming else "in-memory"
2027
)
2028
2029
assert_frame_equal(
2030
pl.read_parquet(
2031
paths,
2032
parallel=parallel, # type: ignore[arg-type]
2033
missing_columns="insert",
2034
).select(projection),
2035
expected,
2036
)
2037
2038
assert_frame_equal(
2039
pl.scan_parquet(paths, parallel=parallel, missing_columns="insert") # type: ignore[arg-type]
2040
.select(projection)
2041
.collect(engine="streaming" if streaming else "in-memory"),
2042
expected,
2043
)
2044
2045
# Test deprecated parameter
2046
2047
with warnings.catch_warnings():
2048
warnings.simplefilter("ignore", DeprecationWarning)
2049
2050
with pytest.raises(
2051
pl.exceptions.ColumnNotFoundError,
2052
match="passing `missing_columns='insert'`",
2053
):
2054
assert_frame_equal(
2055
pl.scan_parquet(
2056
paths,
2057
parallel=parallel, # type: ignore[arg-type]
2058
allow_missing_columns=False,
2059
).collect(engine="streaming" if streaming else "in-memory"),
2060
expected_full,
2061
)
2062
2063
assert_frame_equal(
2064
pl.scan_parquet(
2065
paths,
2066
parallel=parallel, # type: ignore[arg-type]
2067
allow_missing_columns=True,
2068
).collect(engine="streaming" if streaming else "in-memory"),
2069
expected_full,
2070
)
2071
2072
2073
def test_nested_nonnullable_19158() -> None:
2074
# Bug is based on the top-level struct being nullable and the inner list
2075
# not being nullable.
2076
tbl = pa.table(
2077
{
2078
"a": [{"x": [1]}, None, {"x": [1, 2]}, None],
2079
},
2080
schema=pa.schema(
2081
[
2082
pa.field(
2083
"a",
2084
pa.struct([pa.field("x", pa.list_(pa.int8()), nullable=False)]),
2085
nullable=True,
2086
)
2087
]
2088
),
2089
)
2090
2091
f = io.BytesIO()
2092
pq.write_table(tbl, f)
2093
2094
f.seek(0)
2095
assert_frame_equal(pl.read_parquet(f), pl.DataFrame(tbl))
2096
2097
2098
D = Decimal
2099
2100
2101
@pytest.mark.parametrize("precision", range(1, 37, 2))
2102
@pytest.mark.parametrize(
2103
"nesting",
2104
[
2105
# Struct
2106
lambda t: ([{"x": None}, None], pl.Struct({"x": t})),
2107
lambda t: ([None, {"x": None}], pl.Struct({"x": t})),
2108
lambda t: ([{"x": D("1.5")}, None], pl.Struct({"x": t})),
2109
lambda t: ([{"x": D("1.5")}, {"x": D("4.8")}], pl.Struct({"x": t})),
2110
# Array
2111
lambda t: ([[None, None, D("8.2")], None], pl.Array(t, 3)),
2112
lambda t: ([None, [None, D("8.9"), None]], pl.Array(t, 3)),
2113
lambda t: ([[D("1.5"), D("3.7"), D("4.1")], None], pl.Array(t, 3)),
2114
lambda t: (
2115
[[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("5.2"), D("8.9")]],
2116
pl.Array(t, 3),
2117
),
2118
# List
2119
lambda t: ([[None, D("8.2")], None], pl.List(t)),
2120
lambda t: ([None, [D("8.9"), None]], pl.List(t)),
2121
lambda t: ([[D("1.5"), D("4.1")], None], pl.List(t)),
2122
lambda t: ([[D("1.5"), D("3.7"), D("4.1")], [D("2.8"), D("8.9")]], pl.List(t)),
2123
],
2124
)
2125
def test_decimal_precision_nested_roundtrip(
2126
nesting: Callable[[pl.DataType], tuple[list[Any], pl.DataType]],
2127
precision: int,
2128
) -> None:
2129
    # Limit the decimal context locally so as not to disturb any other tests
2130
with decimal.localcontext() as ctx:
2131
ctx.prec = precision
2132
2133
decimal_dtype = pl.Decimal(precision=precision)
2134
values, dtype = nesting(decimal_dtype)
2135
2136
df = pl.Series("a", values, dtype).to_frame()
2137
2138
test_round_trip(df)
2139
2140
2141
@pytest.mark.may_fail_cloud # reason: sortedness flag
2142
@pytest.mark.parametrize("parallel", ["prefiltered", "columns", "row_groups", "auto"])
2143
def test_conserve_sortedness(
2144
monkeypatch: Any, capfd: pytest.CaptureFixture[str], parallel: ParallelStrategy
2145
) -> None:
2146
f = io.BytesIO()
2147
2148
df = pl.DataFrame(
2149
{
2150
"a": [1, 2, 3, 4, 5, None],
2151
"b": [1.0, 2.0, 3.0, 4.0, 5.0, None],
2152
"c": [None, 5, 4, 3, 2, 1],
2153
"d": [None, 5.0, 4.0, 3.0, 2.0, 1.0],
2154
"a_nosort": [1, 2, 3, 4, 5, None],
2155
"f": range(6),
2156
}
2157
)
2158
2159
for col, descending, nulls_last in [("a", False, False), ("c", True, True)]:
2160
col_idx = df.get_column_index(col)
2161
f.seek(0)
2162
pq.write_table(
2163
df.to_arrow(),
2164
f,
2165
sorting_columns=[
2166
pq.SortingColumn(col_idx, descending, nulls_last),
2167
],
2168
)
2169
f.truncate()
2170
f.seek(0)
2171
2172
monkeypatch.setenv("POLARS_VERBOSE", "1")
2173
2174
df = pl.scan_parquet(f, parallel=parallel).filter(pl.col.f > 1).collect()
2175
2176
captured = capfd.readouterr().err
2177
2178
# @NOTE: We don't conserve sortedness for anything except integers at the
2179
# moment.
2180
assert (
2181
captured.count("Parquet conserved SortingColumn for column chunk of") == 1
2182
)
2183
assert (
2184
f"Parquet conserved SortingColumn for column chunk of '{col}' to {'Descending' if descending else 'Ascending'}"
2185
in captured
2186
)
2187
2188
2189
@pytest.mark.parametrize("use_dictionary", [True, False])
2190
@pytest.mark.parametrize(
2191
"values",
2192
[
2193
(size, x)
2194
for size in [1, 2, 3, 4, 8, 12, 15, 16, 32]
2195
for x in [
2196
[list(range(size)), list(range(7, 7 + size))],
2197
[list(range(size)), None],
2198
[list(range(i, i + size)) for i in range(13)],
2199
[list(range(i, i + size)) if i % 3 < 2 else None for i in range(13)],
2200
]
2201
],
2202
)
2203
@pytest.mark.parametrize(
2204
"filt",
2205
[
2206
lambda _: None,
2207
lambda _: pl.col.f > 0,
2208
lambda _: pl.col.f > 1,
2209
lambda _: pl.col.f < 5,
2210
lambda _: pl.col.f % 2 == 0,
2211
lambda _: pl.col.f % 5 < 4,
2212
lambda values: (0, min(1, len(values))),
2213
lambda _: (1, 1),
2214
lambda _: (-2, 1),
2215
lambda values: (1, len(values) - 2),
2216
],
2217
)
2218
def test_fixed_size_binary(
2219
use_dictionary: bool,
2220
values: tuple[int, list[None | list[int]]],
2221
filt: Callable[[list[None | list[int]]], None | pl.Expr | tuple[int, int]],
2222
) -> None:
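    # Round-trip pyarrow fixed-size binary columns through Polars, optionally
    # dictionary-encoded, with either a predicate filter, a slice, or no pushdown.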
2223
size, elems = values
2224
bs = [bytes(v) if v is not None else None for v in elems]
2225
2226
tbl = pa.table(
2227
{
2228
"a": bs,
2229
"f": range(len(bs)),
2230
},
2231
schema=pa.schema(
2232
[
2233
pa.field("a", pa.binary(length=size), nullable=True),
2234
pa.field("f", pa.int32(), nullable=True),
2235
]
2236
),
2237
)
2238
2239
df = pl.DataFrame(tbl)
2240
2241
f = io.BytesIO()
2242
pq.write_table(tbl, f, use_dictionary=use_dictionary)
2243
2244
f.seek(0)
2245
2246
    # Evaluate the parametrized filter factory to obtain the actual filter:
    # an expression, an (offset, length) tuple, or None.
    filt = filt(elems)

    loaded: pl.DataFrame
2247
if isinstance(filt, pl.Expr):
2248
loaded = pl.scan_parquet(f).filter(filt).collect()
2249
df = df.filter(filt)
2250
elif isinstance(filt, tuple):
2251
loaded = pl.scan_parquet(f).slice(filt[0], filt[1]).collect()
2252
df = df.slice(filt[0], filt[1])
2253
else:
2254
loaded = pl.read_parquet(f)
2255
2256
assert_frame_equal(loaded, df)
2257
2258
2259
def test_decode_f16() -> None:
2260
values = [float("nan"), 0.0, 0.5, 1.0, 1.5]
2261
2262
table = pa.Table.from_pydict(
2263
{
2264
"x": pa.array(np.array(values, dtype=np.float16), type=pa.float16()),
2265
}
2266
)
2267
2268
df = pl.Series("x", values, pl.Float32).to_frame()
2269
2270
f = io.BytesIO()
2271
pq.write_table(table, f)
2272
2273
f.seek(0)
2274
assert_frame_equal(pl.read_parquet(f), df)
2275
2276
f.seek(0)
2277
assert_frame_equal(
2278
pl.scan_parquet(f).filter(pl.col.x > 0.5).collect(),
2279
df.filter(pl.col.x > 0.5),
2280
)
2281
2282
f.seek(0)
2283
assert_frame_equal(
2284
pl.scan_parquet(f).slice(1, 3).collect(),
2285
df.slice(1, 3),
2286
)
2287
2288
2289
def test_invalid_utf8_binary() -> None:
2290
a = pl.Series("a", [b"\x80"], pl.Binary).to_frame()
2291
f = io.BytesIO()
2292
2293
a.write_parquet(f)
2294
f.seek(0)
2295
out = pl.read_parquet(f)
2296
2297
assert_frame_equal(a, out)
2298
2299
2300
@pytest.mark.parametrize(
2301
"dtype",
2302
[
2303
pl.Null,
2304
pl.Int8,
2305
pl.Int32,
2306
pl.Datetime(),
2307
pl.String,
2308
pl.Binary,
2309
pl.Boolean,
2310
pl.Struct({"x": pl.Int32}),
2311
pl.List(pl.Int32),
2312
pytest.param(
2313
pl.Array(pl.Int32, 0), marks=pytest.mark.may_fail_cloud
2314
), # reason: zero-width array
2315
pl.Array(pl.Int32, 2),
2316
],
2317
)
2318
@pytest.mark.parametrize(
2319
"filt",
2320
[
2321
pl.col.f == 0,
2322
pl.col.f != 0,
2323
pl.col.f == 1,
2324
pl.col.f != 1,
2325
pl.col.f == 2,
2326
pl.col.f != 2,
2327
pl.col.f == 3,
2328
pl.col.f != 3,
2329
],
2330
)
2331
def test_filter_only_invalid(dtype: pl.DataType, filt: pl.Expr) -> None:
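    # A column consisting solely of nulls should survive a prefiltered read for
    # every supported dtype and predicate.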
2332
df = pl.DataFrame(
2333
[
2334
pl.Series("a", [None, None, None], dtype),
2335
pl.Series("f", range(3), pl.Int32),
2336
]
2337
)
2338
2339
f = io.BytesIO()
2340
2341
df.write_parquet(f)
2342
f.seek(0)
2343
out = pl.scan_parquet(f, parallel="prefiltered").filter(filt).collect()
2344
2345
assert_frame_equal(df.filter(filt), out)
2346
2347
2348
def test_nested_nulls() -> None:
2349
df = pl.Series(
2350
"a",
2351
[
2352
[None, None],
2353
None,
2354
[None, 1],
2355
[None, None],
2356
[2, None],
2357
],
2358
pl.Array(pl.Int32, 2),
2359
).to_frame()
2360
2361
f = io.BytesIO()
2362
df.write_parquet(f)
2363
2364
f.seek(0)
2365
out = pl.read_parquet(f)
2366
assert_frame_equal(out, df)
2367
2368
2369
@pytest.mark.parametrize("content", [[], [None], [None, 0.0]])
2370
def test_nested_dicts(content: list[float | None]) -> None:
2371
df = pl.Series("a", [content], pl.List(pl.Float64)).to_frame()
2372
2373
f = io.BytesIO()
2374
df.write_parquet(f, use_pyarrow=True)
2375
f.seek(0)
2376
assert_frame_equal(df, pl.read_parquet(f))
2377
2378
2379
@pytest.mark.parametrize(
2380
"leading_nulls",
2381
[
2382
[],
2383
[None] * 7,
2384
],
2385
)
2386
@pytest.mark.parametrize(
2387
"trailing_nulls",
2388
[
2389
[],
2390
[None] * 7,
2391
],
2392
)
2393
@pytest.mark.parametrize(
2394
"first_chunk",
2395
# Create both RLE and Bitpacked chunks
2396
[
2397
[1] * 57,
2398
[1 if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2399
list(range(57)),
2400
[i if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2401
],
2402
)
2403
@pytest.mark.parametrize(
2404
"second_chunk",
2405
# Create both RLE and Bitpacked chunks
2406
[
2407
[2] * 57,
2408
[2 if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2409
list(range(57)),
2410
[i if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2411
],
2412
)
2413
@pytest.mark.slow
2414
def test_dict_slices(
2415
leading_nulls: list[None],
2416
trailing_nulls: list[None],
2417
first_chunk: list[None | int],
2418
second_chunk: list[None | int],
2419
) -> None:
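    # Slice at many (offset, length) pairs to exercise value skipping within
    # dictionary-encoded pages containing both RLE and bit-packed runs.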
2420
df = pl.Series(
2421
"a", leading_nulls + first_chunk + second_chunk + trailing_nulls, pl.Int64
2422
).to_frame()
2423
2424
f = io.BytesIO()
2425
df.write_parquet(f)
2426
2427
for offset in chain([0, 1, 2], range(3, df.height, 3)):
2428
for length in chain([df.height, 1, 2], range(3, df.height - offset, 3)):
2429
f.seek(0)
2430
assert_frame_equal(
2431
pl.scan_parquet(f).slice(offset, length).collect(),
2432
df.slice(offset, length),
2433
)
2434
2435
2436
@pytest.mark.parametrize(
2437
"mask",
2438
[
2439
[i % 13 < 3 and i % 17 > 3 for i in range(57 * 2)],
2440
[False] * 23 + [True] * 68 + [False] * 23,
2441
[False] * 23 + [True] * 24 + [False] * 20 + [True] * 24 + [False] * 23,
2442
[True] + [False] * 22 + [True] * 24 + [False] * 20 + [True] * 24 + [False] * 23,
2443
[False] * 23 + [True] * 24 + [False] * 20 + [True] * 24 + [False] * 22 + [True],
2444
[True]
2445
+ [False] * 22
2446
+ [True] * 24
2447
+ [False] * 20
2448
+ [True] * 24
2449
+ [False] * 22
2450
+ [True],
2451
[False] * 56 + [True] * 58,
2452
[False] * 57 + [True] * 57,
2453
[False] * 58 + [True] * 56,
2454
[True] * 56 + [False] * 58,
2455
[True] * 57 + [False] * 57,
2456
[True] * 58 + [False] * 56,
2457
],
2458
)
2459
@pytest.mark.parametrize(
2460
"first_chunk",
2461
# Create both RLE and Bitpacked chunks
2462
[
2463
[1] * 57,
2464
[1 if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2465
list(range(57)),
2466
[i if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2467
],
2468
)
2469
@pytest.mark.parametrize(
2470
"second_chunk",
2471
# Create both RLE and Bitpacked chunks
2472
[
2473
[2] * 57,
2474
[2 if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2475
list(range(57)),
2476
[i if i % 7 < 3 and i % 5 > 3 else None for i in range(57)],
2477
],
2478
)
2479
def test_dict_masked(
2480
mask: list[bool],
2481
first_chunk: list[None | int],
2482
second_chunk: list[None | int],
2483
) -> None:
2484
df = pl.DataFrame(
2485
[
2486
pl.Series("a", first_chunk + second_chunk, pl.Int64),
2487
pl.Series("f", mask, pl.Boolean),
2488
]
2489
)
2490
2491
f = io.BytesIO()
2492
df.write_parquet(f)
2493
2494
f.seek(0)
2495
assert_frame_equal(
2496
pl.scan_parquet(f, parallel="prefiltered").filter(pl.col.f).collect(),
2497
df.filter(pl.col.f),
2498
)
2499
2500
2501
@pytest.mark.may_fail_auto_streaming
2502
def test_categorical_sliced_20017() -> None:
2503
f = io.BytesIO()
2504
df = (
2505
pl.Series("a", ["a", None])
2506
.to_frame()
2507
.with_columns(pl.col.a.cast(pl.Categorical))
2508
)
2509
df.write_parquet(f)
2510
2511
f.seek(0)
2512
assert_frame_equal(
2513
pl.read_parquet(f, n_rows=1),
2514
df.head(1),
2515
)
2516
2517
2518
@given(
2519
s=series(name="a", dtype=pl.String, min_size=7, max_size=7),
2520
mask=series(
2521
name="mask", dtype=pl.Boolean, min_size=7, max_size=7, allow_null=False
2522
),
2523
)
2524
def test_categorical_parametric_masked(s: pl.Series, mask: pl.Series) -> None:
2525
f = io.BytesIO()
2526
2527
df = pl.DataFrame([s, mask]).with_columns(pl.col.a.cast(pl.Categorical))
2528
df.write_parquet(f)
2529
2530
f.seek(0)
2531
assert_frame_equal(
2532
pl.scan_parquet(f, parallel="prefiltered").filter(pl.col.mask).collect(),
2533
df.filter(pl.col.mask),
2534
)
2535
2536
2537
@given(
2538
s=series(name="a", dtype=pl.String, min_size=7, max_size=7),
2539
start=st.integers(0, 6),
2540
length=st.integers(1, 7),
2541
)
2542
def test_categorical_parametric_sliced(s: pl.Series, start: int, length: int) -> None:
2543
length = min(7 - start, length)
2544
2545
f = io.BytesIO()
2546
2547
df = s.to_frame().with_columns(pl.col.a.cast(pl.Categorical))
2548
df.write_parquet(f)
2549
2550
f.seek(0)
2551
assert_frame_equal(
2552
pl.scan_parquet(f).slice(start, length).collect(),
2553
df.slice(start, length),
2554
)
2555
2556
2557
@pytest.mark.write_disk
2558
def test_prefilter_with_projection_column_order_20175(tmp_path: Path) -> None:
2559
path = tmp_path / "1"
2560
2561
pl.DataFrame({"a": 1, "b": 1, "c": 1, "d": 1, "e": 1}).write_parquet(path)
2562
2563
q = (
2564
pl.scan_parquet(path, parallel="prefiltered")
2565
.filter(pl.col("a") == 1)
2566
.select("a", "d", "c")
2567
)
2568
2569
assert_frame_equal(q.collect(), pl.DataFrame({"a": 1, "d": 1, "c": 1}))
2570
2571
f = io.BytesIO()
2572
2573
pl.read_csv(b"""\
2574
c0,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10
2575
1,1,1,1,1,1,1,1,1,1,1
2576
1,1,1,1,1,1,1,1,1,1,1
2577
""").write_parquet(f)
2578
2579
f.seek(0)
2580
2581
q = (
2582
pl.scan_parquet(
2583
f,
2584
rechunk=True,
2585
parallel="prefiltered",
2586
)
2587
.filter(
2588
pl.col("c0") == 1,
2589
)
2590
.select("c0", "c9", "c3")
2591
)
2592
2593
assert_frame_equal(
2594
q.collect(),
2595
pl.read_csv(b"""\
2596
c0,c9,c3
2597
1,1,1
2598
1,1,1
2599
"""),
2600
)
2601
2602
2603
def test_utf8_verification_with_slice_20174() -> None:
2604
f = io.BytesIO()
2605
pq.write_table(
2606
pl.Series("s", ["a", "a" * 128]).to_frame().to_arrow(), f, use_dictionary=False
2607
)
2608
2609
f.seek(0)
2610
assert_frame_equal(
2611
pl.scan_parquet(f).head(1).collect(),
2612
pl.Series("s", ["a"]).to_frame(),
2613
)
2614
2615
2616
@pytest.mark.parametrize("parallel", ["prefiltered", "row_groups"])
2617
@pytest.mark.parametrize(
2618
"projection",
2619
[
2620
{"a": pl.Int64(), "b": pl.Int64()},
2621
{"b": pl.Int64(), "a": pl.Int64()},
2622
],
2623
)
2624
def test_parquet_prefiltered_unordered_projection_20175(
2625
parallel: str, projection: dict[str, pl.DataType]
2626
) -> None:
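    # The output schema must follow the projection order, not the order of the
    # columns in the file.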
2627
df = pl.DataFrame(
2628
[
2629
pl.Series("a", [0], pl.Int64),
2630
pl.Series("b", [0], pl.Int64),
2631
]
2632
)
2633
2634
f = io.BytesIO()
2635
df.write_parquet(f)
2636
2637
f.seek(0)
2638
out = (
2639
pl.scan_parquet(f, parallel=parallel) # type: ignore[arg-type]
2640
.filter(pl.col.a >= 0)
2641
.select(*projection.keys())
2642
.collect()
2643
)
2644
2645
assert out.schema == projection
2646
2647
2648
def test_parquet_unsupported_dictionary_to_pl_17945() -> None:
2649
t = pa.table(
2650
{
2651
"col1": pa.DictionaryArray.from_arrays([0, 0, None, 1], [42, 1337]),
2652
},
2653
schema=pa.schema({"col1": pa.dictionary(pa.uint32(), pa.int64())}),
2654
)
2655
2656
f = io.BytesIO()
2657
pq.write_table(t, f, use_dictionary=False)
2658
f.truncate()
2659
2660
f.seek(0)
2661
assert_series_equal(
2662
pl.Series("col1", [42, 42, None, 1337], pl.Int64),
2663
pl.read_parquet(f).to_series(),
2664
)
2665
2666
f.seek(0)
2667
pq.write_table(t, f)
2668
f.truncate()
2669
2670
f.seek(0)
2671
assert_series_equal(
2672
pl.Series("col1", [42, 42, None, 1337], pl.Int64),
2673
pl.read_parquet(f).to_series(),
2674
)
2675
2676
2677
@pytest.mark.may_fail_auto_streaming
2678
def test_parquet_cast_to_cat() -> None:
2679
t = pa.table(
2680
{
2681
"col1": pa.DictionaryArray.from_arrays([0, 0, None, 1], ["A", "B"]),
2682
},
2683
schema=pa.schema({"col1": pa.dictionary(pa.uint32(), pa.string())}),
2684
)
2685
2686
f = io.BytesIO()
2687
pq.write_table(t, f, use_dictionary=False)
2688
f.truncate()
2689
2690
f.seek(0)
2691
assert_series_equal(
2692
pl.Series("col1", ["A", "A", None, "B"], pl.Categorical),
2693
pl.read_parquet(f).to_series(),
2694
)
2695
2696
f.seek(0)
2697
pq.write_table(t, f)
2698
f.truncate()
2699
2700
f.seek(0)
2701
assert_series_equal(
2702
pl.Series("col1", ["A", "A", None, "B"], pl.Categorical),
2703
pl.read_parquet(f).to_series(),
2704
)
2705
2706
2707
def test_parquet_roundtrip_lex_cat_20288() -> None:
2708
f = io.BytesIO()
2709
df = pl.Series("a", ["A", "B"], pl.Categorical(ordering="lexical")).to_frame()
2710
df.write_parquet(f)
2711
f.seek(0)
2712
dt = pl.scan_parquet(f).collect_schema()["a"]
2713
assert isinstance(dt, pl.Categorical)
2714
assert dt.ordering == "lexical"
2715
2716
2717
def test_from_parquet_20271() -> None:
2718
f = io.BytesIO()
2719
df = pl.Series("b", ["D", "E"], pl.Categorical).to_frame()
2720
df.write_parquet(f)
2721
del df
2722
f.seek(0)
2723
df = pl.read_parquet(f)
2724
assert_series_equal(df.to_series(), pl.Series("b", ["D", "E"], pl.Categorical))
2725
2726
2727
def test_boolean_slice_pushdown_20314() -> None:
2728
s = pl.Series("a", [None, False, True])
2729
f = io.BytesIO()
2730
2731
s.to_frame().write_parquet(f)
2732
2733
f.seek(0)
2734
assert pl.scan_parquet(f).slice(2, 1).collect().item()
2735
2736
2737
def test_load_pred_pushdown_fsl_19241() -> None:
2738
f = io.BytesIO()
2739
2740
fsl = pl.Series("a", [[[1, 2]]], pl.Array(pl.Array(pl.Int8, 2), 1))
2741
filt = pl.Series("f", [1])
2742
2743
pl.DataFrame([fsl, filt]).write_parquet(f)
2744
2745
f.seek(0)
2746
q = pl.scan_parquet(f, parallel="prefiltered").filter(pl.col.f != 4)
2747
2748
assert_frame_equal(q.collect(), pl.DataFrame([fsl, filt]))
2749
2750
2751
def test_struct_list_statistics_20510() -> None:
2752
    # Test Polars writer - Utf8ViewArray
2753
data = {
2754
"name": ["a", "b"],
2755
"data": [
2756
{"title": "Title", "data": [0, 1, 3]},
2757
{"title": "Title", "data": [0, 1, 3]},
2758
],
2759
}
2760
df = pl.DataFrame(
2761
data,
2762
schema=pl.Schema(
2763
{
2764
"name": pl.String(),
2765
"data": pl.Struct(
2766
{
2767
"title": pl.String,
2768
"data": pl.List(pl.Int64),
2769
}
2770
),
2771
}
2772
),
2773
)
2774
2775
f = io.BytesIO()
2776
df.write_parquet(f)
2777
f.seek(0)
2778
result = pl.scan_parquet(f).filter(pl.col("name") == "b").collect()
2779
2780
assert_frame_equal(result, df.filter(pl.col("name") == "b"))
2781
2782
# Test PyArrow - Utf8Array
2783
tb = pa.table(
2784
data,
2785
schema=pa.schema(
2786
[
2787
("name", pa.string()),
2788
(
2789
"data",
2790
pa.struct(
2791
[
2792
("title", pa.string()),
2793
("data", pa.list_(pa.int64())),
2794
]
2795
),
2796
),
2797
]
2798
),
2799
)
2800
2801
f.seek(0)
2802
pq.write_table(tb, f)
2803
f.truncate()
2804
f.seek(0)
2805
result = pl.scan_parquet(f).filter(pl.col("name") == "b").collect()
2806
2807
assert_frame_equal(result, df.filter(pl.col("name") == "b"))
2808
2809
2810
def test_required_masked_skip_values_20809(monkeypatch: Any) -> None:
2811
df = pl.DataFrame(
2812
[pl.Series("a", list(range(20)) + [42] * 15), pl.Series("b", range(35))]
2813
)
2814
needle = [16, 33]
2815
2816
f = io.BytesIO()
2817
df.write_parquet(f)
2818
2819
f.seek(0)
2820
monkeypatch.setenv("POLARS_PQ_PREFILTERED_MASK", "pre")
2821
df1 = (
2822
pl.scan_parquet(f, parallel="prefiltered")
2823
.filter(pl.col.b.is_in(needle))
2824
.collect()
2825
)
2826
2827
f.seek(0)
2828
df2 = pl.read_parquet(f, parallel="columns").filter(pl.col.b.is_in(needle))
2829
2830
assert_frame_equal(df1, df2)
2831
2832
2833
def get_tests_from_dtype(
2834
dtype: pl.DataType, f: Callable[[int], Any]
2835
) -> list[tuple[pl.DataType, list[Any], list[Any]]]:
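    # Build (dtype, values, needles) cases for the equality-filter test below:
    # varying run lengths plus needles that may or may not be present.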
2836
return [
2837
(dtype, [f(i) for i in range(10)], [f(i) for i in range(11)]),
2838
(
2839
dtype,
2840
[f(i) for i in range(1337)],
2841
[f(i) for i in [0, 1, 5, 7, 101, 1023, 1336, 1337, 1338]],
2842
),
2843
(
2844
dtype,
2845
list(
2846
functools.reduce(
2847
lambda x, y: list(x) + y,
2848
([f(i)] * (i % 13) for i in range(1337)),
2849
[],
2850
)
2851
            ),
2852
[f(i) for i in [0, 1, 5, 7, 101, 1023, 1336, 1337, 1338]],
2853
),
2854
(
2855
dtype,
2856
[f(5)] * 37 + [f(10)] * 61 + [f(1996)] * 21,
2857
[f(i) for i in [1, 5, 10, 1996]],
2858
),
2859
]
2860
2861
2862
@pytest.mark.parametrize("strategy", ["columns", "prefiltered"])
2863
@pytest.mark.parametrize(
2864
("dtype", "values", "needles"),
2865
get_tests_from_dtype(pl.Int8(), lambda x: (x % 256) - 128)
2866
+ get_tests_from_dtype(pl.Int32(), lambda x: x % 256)
2867
+ get_tests_from_dtype(pl.Date(), lambda x: date(year=1 + x, month=10, day=5))
2868
+ get_tests_from_dtype(pl.String(), lambda x: str(x))
2869
+ get_tests_from_dtype(pl.String(), lambda x: "i" * x)
2870
+ get_tests_from_dtype(pl.String(), lambda x: f"long_strings_with_the_number_{x}"),
2871
)
2872
def test_equality_filter(
2873
strategy: ParallelStrategy,
2874
dtype: pl.DataType,
2875
values: list[Any],
2876
needles: list[Any],
2877
) -> None:
2878
df = pl.DataFrame(
2879
[
2880
pl.Series("a", values, dtype),
2881
]
2882
)
2883
2884
f = io.BytesIO()
2885
df.write_parquet(f)
2886
2887
for needle in needles:
2888
f.seek(0)
2889
scan = pl.scan_parquet(f, parallel=strategy)
2890
try:
2891
assert_frame_equal(
2892
df.filter(pl.col.a == pl.lit(needle, dtype)),
2893
scan.filter(pl.col.a == pl.lit(needle, dtype)).collect(),
2894
)
2895
        except Exception:
2896
import sys
2897
2898
print(f"needle: {needle}", file=sys.stderr)
2899
raise
2900
2901
2902
def test_nested_string_slice_utf8_21202() -> None:
2903
s = pl.Series(
2904
"a",
2905
[
2906
["A" * 128],
2907
["A"],
2908
],
2909
pl.List(pl.String()),
2910
)
2911
2912
f = io.BytesIO()
2913
s.to_frame().write_parquet(f)
2914
2915
f.seek(0)
2916
assert_series_equal(
2917
pl.scan_parquet(f).slice(1, 1).collect().to_series(),
2918
s.slice(1, 1),
2919
)
2920
2921
2922
def test_filter_true_predicate_21204() -> None:
2923
f = io.BytesIO()
2924
2925
df = pl.DataFrame({"a": [1]})
2926
df.write_parquet(f)
2927
f.seek(0)
2928
lf = pl.scan_parquet(f).filter(pl.lit(True))
2929
assert_frame_equal(lf.collect(), df)
2930
2931
2932
def test_nested_deprecated_int96_timestamps_21332() -> None:
2933
f = io.BytesIO()
2934
2935
df = pl.DataFrame({"a": [{"t": datetime(2025, 1, 1)}]})
2936
2937
pq.write_table(
2938
df.to_arrow(),
2939
f,
2940
use_deprecated_int96_timestamps=True,
2941
)
2942
2943
f.seek(0)
2944
assert_frame_equal(
2945
pl.read_parquet(f),
2946
df,
2947
)
2948
2949
2950
def test_final_masked_optional_iteration_21378() -> None:
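    # Regression test: prefiltered read of a nullable Float32 column with a long,
    # irregular null pattern and a sparse filter mask.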
2951
# fmt: off
2952
values = [
2953
1, 0, 0, 0, 0, 1, 1, 1,
2954
1, 0, 0, 1, 1, 1, 1, 0,
2955
0, 1, 1, 1, 0, 1, 0, 0,
2956
1, 1, 0, 0, 0, 1, 1, 1,
2957
0, 1, 0, 0, 1, 1, 1, 1,
2958
0, 1, 1, 1, 0, 1, 0, 1,
2959
0, 1, 1, 0, 1, 0, 1, 1,
2960
0, 0, 0, 0, 1, 0, 0, 0,
2961
0, 1, 1, 1, 0, 0, 1, 1,
2962
0, 0, 1, 1, 0, 0, 0, 1,
2963
1, 1, 0, 1, 1, 1, 0, 0,
2964
0, 0, 0, 0, 0, 0, 0, 0,
2965
1, 1, 1, 1, 1, 1, 1, 0,
2966
0, 0, 1, 0, 1, 1, 0, 0,
2967
0, 1, 1, 0, 0, 1, 0, 0,
2968
1, 1, 1, 1, 0, 0, 1, 0,
2969
0, 1, 1, 0, 0, 1, 1, 1,
2970
1, 1, 1, 0, 1, 1, 0, 1,
2971
0, 1, 0, 1, 0, 1, 0, 1,
2972
0, 0, 0, 1, 1, 0, 0, 0,
2973
1, 1, 0, 1, 0, 1, 0, 1,
2974
0, 1, 0, 0, 0, 0, 0, 1,
2975
0, 0, 1, 1, 0, 0, 1, 1,
2976
0, 1, 0, 0, 0, 1, 1, 1,
2977
1, 0, 1, 0, 1, 0, 1, 1,
2978
1, 0, 1, 0, 0, 1, 0, 1,
2979
0, 1, 1, 1, 0, 0, 0, 1,
2980
1, 1, 1, 1, 1, 1, 0, 0,
2981
1, 0, 0, 0, 0, 0, 0, 1,
2982
1, 1, 1, 0, 0, 0, 0, 0,
2983
1, 1, 1, 0, 0, 0, 1, 1,
2984
0, 0, 0, 0, 0, 1, 1, 0,
2985
0, 0, 1, 0, 0, 0, 0, 1,
2986
0, 0, 0, 0, 0, 1, 0, 0,
2987
1, 0, 1, 0, 0, 1, 0, 0,
2988
0, 1, 1, 1, 0, 0, 1, 1,
2989
1, 0, 1, 1, 0, 0, 0, 1,
2990
0, 0, 1, 1, 0, 1, 0, 1,
2991
0, 1, 1, 1, 0, 0, 0, 1,
2992
0, 0, 0, 1, 0, 1, 0, 1,
2993
0, 1, 0, 1, 0, 1, 1, 1,
2994
1, 0, 1, 1, 1, 1, 1, 0,
2995
1, 0, 1, 0, 0, 0, 1, 1,
2996
0, 0, 0, 1, 0, 0, 1, 0,
2997
0, 1, 0, 0, 1, 0, 1, 1,
2998
1, 0, 0, 1, 0, 1, 1, 0,
2999
0, 1, 0, 1, 1, 0, 1, 0,
3000
0, 0, 0, 1, 1, 1, 0, 0,
3001
0, 1, 0, 1, 1, 0, 1, 1,
3002
1, 1, 0, 1, 0, 1, 0, 1,
3003
1, 1, 0, 1, 0, 0, 1, 0,
3004
1, 1, 0, 1, 1, 0, 0, 1,
3005
0, 0, 0, 0, 0, 1, 0, 0,
3006
0, 1, 0, 0, 1, 1, 1, 1,
3007
1, 0, 1, 1, 1, 0, 1, 1,
3008
1, 1, 0, 0, 0, 0, 1, 1,
3009
]
3010
3011
df = pl.DataFrame(
3012
[
3013
pl.Series("x", [None if x == 1 else 0.0 for x in values], pl.Float32),
3014
pl.Series(
3015
"f",
3016
[False] * 164 +
3017
[True] * 10 +
3018
[False] * 264 +
3019
[True] * 10,
3020
pl.Boolean(),
3021
),
3022
]
3023
    )
    # fmt: on
3024
3025
f = io.BytesIO()
3026
df.write_parquet(f)
3027
f.seek(0)
3028
3029
output = pl.scan_parquet(f, parallel="prefiltered").filter(pl.col.f).collect()
3030
assert_frame_equal(df.filter(pl.col.f), output)
3031
3032
3033
def test_predicate_empty_is_in_21450() -> None:
3034
f = io.BytesIO()
3035
df = pl.DataFrame({"a": [1, 2], "b": [3, 4]})
3036
df.write_parquet(f)
3037
3038
f.seek(0)
3039
assert_frame_equal(
3040
df.clear(),
3041
pl.scan_parquet(f).filter(pl.col("a").is_in([])).collect(),
3042
)
3043
3044
3045
@pytest.mark.write_disk
3046
def test_scan_parquet_filter_statistics_load_missing_column_21391(
3047
tmp_path: Path,
3048
) -> None:
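    # Filtering on a column that one of the files lacks must not error when
    # missing_columns='insert'; the missing values are treated as null.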
3049
root = tmp_path
3050
dfs = [pl.DataFrame({"x": 1, "y": 1}), pl.DataFrame({"x": 2})]
3051
3052
for i, df in enumerate(dfs):
3053
df.write_parquet(root / f"{i}.parquet")
3054
3055
assert_frame_equal(
3056
(
3057
pl.scan_parquet(root, missing_columns="insert")
3058
.filter(pl.col("y") == 1)
3059
.collect()
3060
),
3061
pl.DataFrame({"x": 1, "y": 1}),
3062
)
3063
3064
3065
@pytest.mark.parametrize(
3066
"ty",
3067
[
3068
(lambda i: i, pl.Int8, True),
3069
(lambda i: datetime(year=2025, month=9, day=i), pl.Datetime, True),
3070
(lambda i: float(i), pl.Float32, True),
3071
(lambda i: str(i), pl.String, True),
3072
(lambda i: str(i) + "make it a bit longer", pl.String, True),
3073
(lambda i: [i, i + 7] * (i % 3), pl.List(pl.Int32), True),
3074
(lambda i: {"x": i}, pl.Struct({"x": pl.Int32}), True),
3075
(lambda i: [i, i + 3, i + 7], pl.Array(pl.Int32, 3), False),
3076
],
3077
)
3078
def test_filter_nulls_21538(ty: tuple[Callable[[int], Any], pl.DataType, bool]) -> None:
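    # Check is_null / is_not_null predicates across several null patterns and
    # dtypes, both with and without dictionary encoding where supported.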
3079
i_to_value, dtype, do_no_dicts = ty
3080
3081
patterns: list[list[int | None]] = [
3082
[None, None, None, None, None],
3083
[1, None, None, 2, None],
3084
[None, 1, 2, 3, 4],
3085
[1, 2, 3, 4, None],
3086
[None, 1, 2, 3, None],
3087
[None, 1, None, 3, None],
3088
[1, 2, 3, 4, 5],
3089
]
3090
3091
df = pl.DataFrame(
3092
[
3093
pl.Series(
3094
f"p{i}", [None if v is None else i_to_value(v) for v in pattern], dtype
3095
)
3096
for i, pattern in enumerate(patterns)
3097
]
3098
)
3099
3100
fs = []
3101
3102
dicts_f = io.BytesIO()
3103
df.write_parquet(dicts_f)
3104
fs += [dicts_f]
3105
3106
if do_no_dicts:
3107
no_dicts_f = io.BytesIO()
3108
pq.write_table(df.to_arrow(), no_dicts_f, use_dictionary=False)
3109
fs += [no_dicts_f]
3110
3111
for f in fs:
3112
for i in range(len(patterns)):
3113
f.seek(0)
3114
assert_frame_equal(
3115
pl.scan_parquet(f).filter(pl.col(f"p{i}").is_null()).collect(),
3116
df.filter(pl.col(f"p{i}").is_null()),
3117
)
3118
3119
f.seek(0)
3120
assert_frame_equal(
3121
pl.scan_parquet(f).filter(pl.col(f"p{i}").is_not_null()).collect(),
3122
df.filter(pl.col(f"p{i}").is_not_null()),
3123
)
3124
3125
3126
def test_unspecialized_decoding_prefiltering() -> None:
3127
df = pl.DataFrame(
3128
{
3129
"a": [None, None, None, "abc"],
3130
"b": [False, True, False, True],
3131
}
3132
)
3133
3134
cols = df.columns
3135
3136
encodings = dict.fromkeys(cols, "DELTA_LENGTH_BYTE_ARRAY")
3137
encodings["b"] = "PLAIN"
3138
3139
f = io.BytesIO()
3140
pq.write_table(
3141
df.to_arrow(),
3142
f,
3143
use_dictionary=False,
3144
column_encoding=encodings,
3145
)
3146
3147
f.seek(0)
3148
expr = pl.col("b")
3149
result = (
3150
pl.scan_parquet(f, parallel="prefiltered")
3151
.filter(expr)
3152
.collect(engine="streaming")
3153
)
3154
assert_frame_equal(result, df.filter(expr))
3155
3156
3157
@pytest.mark.parametrize("parallel", ["columns", "row_groups"])
3158
def test_filtering_on_other_parallel_modes_with_statistics(
3159
parallel: ParallelStrategy,
3160
) -> None:
3161
f = io.BytesIO()
3162
3163
pl.DataFrame(
3164
{
3165
"a": [1, 4, 9, 2, 4, 8, 3, 4, 7],
3166
}
3167
).write_parquet(f, row_group_size=3)
3168
3169
f.seek(0)
3170
assert_series_equal(
3171
pl.scan_parquet(f, parallel=parallel)
3172
.filter(pl.col.a == 4)
3173
.collect()
3174
.to_series(),
3175
pl.Series("a", [4, 4, 4]),
3176
)
3177
3178
3179
def test_filter_on_logical_dtype_22252() -> None:
3180
f = io.BytesIO()
3181
pl.Series("a", [datetime(1996, 10, 5)]).to_frame().write_parquet(f)
3182
f.seek(0)
3183
pl.scan_parquet(f).filter(pl.col.a.dt.weekday() == 6).collect()
3184
3185
3186
def test_filter_nan_22289() -> None:
3187
f = io.BytesIO()
3188
pl.DataFrame(
3189
{"a": [1, 2, float("nan")], "b": [float("nan"), 5, 6]}, strict=False
3190
).write_parquet(f)
3191
3192
f.seek(0)
3193
lf = pl.scan_parquet(f)
3194
3195
assert_frame_equal(
3196
lf.collect().filter(pl.col.a.is_not_nan()),
3197
lf.filter(pl.col.a.is_not_nan()).collect(),
3198
)
3199
3200
assert_frame_equal(
3201
lf.collect().filter(pl.col.a.is_nan()),
3202
lf.filter(pl.col.a.is_nan()).collect(),
3203
)
3204
3205
3206
def test_encode_utf8_check_22467() -> None:
3207
f = io.BytesIO()
3208
values = ["😀" * 129, "😀"]
3209
3210
pq.write_table(pl.Series(values).to_frame().to_arrow(), f, use_dictionary=False)
3211
3212
f.seek(0)
3213
pl.scan_parquet(f).slice(1, 1).collect()
3214
3215
3216
def test_reencode_categoricals_22385() -> None:
3217
tbl = pl.Series("a", ["abc"], pl.Categorical()).to_frame().to_arrow()
3218
tbl = tbl.cast(
3219
pa.schema(
3220
[
3221
pa.field(
3222
"a",
3223
pa.dictionary(pa.int32(), pa.large_string()),
3224
metadata=tbl.schema[0].metadata,
3225
),
3226
]
3227
)
3228
)
3229
3230
f = io.BytesIO()
3231
pq.write_table(tbl, f)
3232
3233
f.seek(0)
3234
pl.scan_parquet(f).collect()
3235
3236
3237
def test_parquet_read_timezone_22506() -> None:
3238
f = io.BytesIO()
3239
3240
pd.DataFrame(
3241
{
3242
"a": [1, 2],
3243
"b": pd.to_datetime(
3244
["2020-01-01T00:00:00+01:00", "2020-01-02T00:00:00+01:00"]
3245
),
3246
}
3247
).to_parquet(f)
3248
3249
assert b'"metadata": {"timezone": "+01:00"}}' in f.getvalue()
3250
3251
f.seek(0)
3252
3253
assert_frame_equal(
3254
pl.read_parquet(f),
3255
pl.DataFrame(
3256
{
3257
"a": [1, 2],
3258
"b": [
3259
datetime(2020, 1, 1, tzinfo=ZoneInfo("Etc/GMT-1")),
3260
datetime(2020, 1, 2, tzinfo=ZoneInfo("Etc/GMT-1")),
3261
],
3262
},
3263
schema={
3264
"a": pl.Int64,
3265
"b": pl.Datetime(time_unit="ns", time_zone="Etc/GMT-1"),
3266
},
3267
),
3268
)
3269
3270
3271
@pytest.mark.parametrize("static", [True, False])
3272
@pytest.mark.parametrize("lazy", [True, False])
3273
def test_read_write_metadata(static: bool, lazy: bool) -> None:
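    # User-provided key-value metadata (a static dict or a callback) should
    # round-trip through both write_parquet and sink_parquet, alongside the
    # ARROW:schema entry.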
3274
metadata = {"hello": "world", "something": "else"}
3275
md: ParquetMetadata = metadata
3276
if not static:
3277
md = lambda ctx: metadata # noqa: E731
3278
3279
df = pl.DataFrame({"a": [1, 2, 3]})
3280
3281
f = io.BytesIO()
3282
if lazy:
3283
df.lazy().sink_parquet(f, metadata=md)
3284
else:
3285
df.write_parquet(f, metadata=md)
3286
3287
f.seek(0)
3288
actual = pl.read_parquet_metadata(f)
3289
assert "ARROW:schema" in actual
3290
assert metadata == {k: v for k, v in actual.items() if k != "ARROW:schema"}
3291
3292
3293
@pytest.mark.write_disk
3294
def test_metadata_callback_info(tmp_path: Path) -> None:
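    # The metadata callback should be invoked once per partition file written;
    # with three distinct values in 'a' this equals the frame height.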
3295
df = pl.DataFrame({"a": [1, 2, 3]})
3296
num_writes = 0
3297
3298
def fn_metadata(ctx: ParquetMetadataContext) -> dict[str, str]:
3299
nonlocal num_writes
3300
num_writes += 1
3301
return {}
3302
3303
df.write_parquet(tmp_path, partition_by="a", metadata=fn_metadata)
3304
3305
assert num_writes == len(df)
3306
3307
3308
def test_field_overwrites_metadata() -> None:
3309
f = io.BytesIO()
3310
lf = pl.LazyFrame(
3311
{
3312
"a": [None, 2, 3, 4],
3313
"b": [[1, 2, 3], [42], [13], [37]],
3314
"c": [
3315
{"x": "a", "y": 42},
3316
{"x": "b", "y": 13},
3317
{"x": "X", "y": 37},
3318
{"x": "Y", "y": 15},
3319
],
3320
}
3321
)
3322
lf.sink_parquet(
3323
f,
3324
field_overwrites={
3325
"a": ParquetFieldOverwrites(metadata={"flat_from_polars": "yes"}),
3326
"b": ParquetFieldOverwrites(
3327
children=ParquetFieldOverwrites(metadata={"listitem": "yes"}),
3328
metadata={"list": "true"},
3329
),
3330
"c": ParquetFieldOverwrites(
3331
children=[
3332
ParquetFieldOverwrites(name="x", metadata={"md": "yes"}),
3333
ParquetFieldOverwrites(name="y", metadata={"md2": "Yes!"}),
3334
],
3335
metadata={"struct": "true"},
3336
),
3337
},
3338
)
3339
3340
f.seek(0)
3341
schema = pq.read_schema(f)
3342
assert schema[0].metadata[b"flat_from_polars"] == b"yes"
3343
assert schema[1].metadata[b"list"] == b"true"
3344
assert schema[1].type.value_field.metadata[b"listitem"] == b"yes"
3345
assert schema[2].metadata[b"struct"] == b"true"
3346
assert schema[2].type.fields[0].metadata[b"md"] == b"yes"
3347
assert schema[2].type.fields[1].metadata[b"md2"] == b"Yes!"
3348
3349
3350
def test_multiple_sorting_columns() -> None:
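    # With multiple SortingColumns only the leading column ('a') is globally
    # sorted; the secondary column ('b') is sorted only within groups of 'a'.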
3351
df = pl.DataFrame(
3352
{
3353
"a": [1, 1, 1, 2, 2, 2],
3354
"b": [1, 2, 3, 1, 2, 3],
3355
}
3356
)
3357
3358
f = io.BytesIO()
3359
pq.write_table(
3360
df.to_arrow(),
3361
f,
3362
sorting_columns=[pq.SortingColumn(0), pq.SortingColumn(1)],
3363
)
3364
3365
f.seek(0)
3366
roundtrip = pl.read_parquet(f)
3367
assert roundtrip.get_column("a").is_sorted()
3368
assert not roundtrip.get_column("b").is_sorted()
3369
assert_frame_equal(roundtrip.sort("b"), df.sort("b"))
3370
3371
3372
@pytest.mark.write_disk
3373
def test_read_parquet_duplicate_range_start_fetch_23139(tmp_path: Path) -> None:
3374
tmp_path.mkdir(exist_ok=True)
3375
path = tmp_path / "data.parquet"
3376
3377
df = pl.DataFrame(
3378
schema={
3379
"a": pl.Boolean,
3380
"b": pl.Boolean,
3381
}
3382
)
3383
3384
df.write_parquet(path, use_pyarrow=True)
3385
3386
assert_frame_equal(pl.read_parquet(path), df)
3387
3388
3389
@pytest.mark.parametrize(
3390
("value", "scan_dtype", "filter_expr"),
3391
[
3392
(pl.lit(1, dtype=pl.Int8), pl.Int16, pl.col("x") > 1),
3393
(pl.lit(1.0, dtype=pl.Float64), pl.Float32, pl.col("x") > 1.0),
3394
(pl.lit(1.0, dtype=pl.Float32), pl.Float64, pl.col("x") > 1.0),
3395
(
3396
pl.lit(
3397
datetime(2025, 1, 1),
3398
dtype=pl.Datetime(time_unit="ns", time_zone="Europe/Amsterdam"),
3399
),
3400
pl.Datetime(time_unit="ms", time_zone="Australia/Sydney"),
3401
pl.col("x")
3402
!= pl.lit(
3403
datetime(2025, 1, 1, 10),
3404
dtype=pl.Datetime(time_unit="ms", time_zone="Australia/Sydney"),
3405
),
3406
),
3407
# Note: This is not implemented at all
3408
# (
3409
# pl.lit({"a": 1}, dtype=pl.Struct({"a": pl.Int8})),
3410
# pl.Struct({"a": pl.Int64}),
3411
# pl.col("x").struct.field("a") > 1,
3412
# ),
3413
],
3414
)
3415
def test_scan_parquet_skip_row_groups_with_cast(
3416
value: Any,
3417
scan_dtype: pl.DataType,
3418
filter_expr: pl.Expr,
3419
monkeypatch: pytest.MonkeyPatch,
3420
capfd: pytest.CaptureFixture[str],
3421
) -> None:
3422
f = io.BytesIO()
3423
3424
df = pl.select(x=value)
3425
3426
df.write_parquet(f)
3427
3428
q = pl.scan_parquet(
3429
f,
3430
schema={"x": scan_dtype},
3431
cast_options=pl.ScanCastOptions(
3432
integer_cast="upcast",
3433
float_cast=["upcast", "downcast"],
3434
datetime_cast=["convert-timezone", "nanosecond-downcast"],
3435
missing_struct_fields="insert",
3436
),
3437
).filter(filter_expr)
3438
3439
monkeypatch.setenv("POLARS_VERBOSE", "1")
3440
capfd.readouterr()
3441
out = q.collect()
3442
assert "reading 0 / 1 row groups" in capfd.readouterr().err
3443
3444
assert_frame_equal(out, pl.DataFrame(schema={"x": scan_dtype}))
3445
3446
3447
@pytest.mark.parametrize(
3448
("value", "scan_dtype", "filter_expr"),
3449
[
3450
(pl.lit(1, dtype=pl.Int8), pl.Int16, pl.col("x") == 1),
3451
(pl.lit(1.0, dtype=pl.Float64), pl.Float32, pl.col("x") == 1.0),
3452
(pl.lit(1.0, dtype=pl.Float32), pl.Float64, pl.col("x") == 1.0),
3453
(
3454
pl.lit(
3455
datetime(2025, 1, 1),
3456
dtype=pl.Datetime(time_unit="ns", time_zone="Europe/Amsterdam"),
3457
),
3458
pl.Datetime(time_unit="ms", time_zone="Australia/Sydney"),
3459
pl.col("x")
3460
== pl.lit(
3461
datetime(2025, 1, 1, 10),
3462
dtype=pl.Datetime(time_unit="ms", time_zone="Australia/Sydney"),
3463
),
3464
),
3465
(
3466
pl.lit({"a": 1}, dtype=pl.Struct({"a": pl.Int8})),
3467
pl.Struct({"a": pl.Int64}),
3468
pl.col("x").struct.field("a") == 1,
3469
),
3470
],
3471
)
3472
@pytest.mark.may_fail_cloud # reason: looks at stdout
3473
def test_scan_parquet_skip_row_groups_with_cast_inclusions(
3474
value: Any,
3475
scan_dtype: pl.DataType,
3476
filter_expr: pl.Expr,
3477
monkeypatch: pytest.MonkeyPatch,
3478
capfd: pytest.CaptureFixture[str],
3479
) -> None:
3480
f = io.BytesIO()
3481
df = pl.select(x=value)
3482
df.write_parquet(f)
3483
3484
f.seek(0)
3485
q = pl.scan_parquet(
3486
f,
3487
schema={"x": scan_dtype},
3488
cast_options=pl.ScanCastOptions(
3489
integer_cast="upcast",
3490
float_cast=["upcast", "downcast"],
3491
datetime_cast=["convert-timezone", "nanosecond-downcast"],
3492
missing_struct_fields="insert",
3493
),
3494
).filter(filter_expr)
3495
3496
monkeypatch.setenv("POLARS_VERBOSE", "1")
3497
capfd.readouterr()
3498
out = q.collect()
3499
assert "reading 1 / 1 row groups" in capfd.readouterr().err
3500
3501
assert_frame_equal(out, pl.select(x=value).select(pl.first().cast(scan_dtype)))
3502
3503
3504
def test_roundtrip_int128() -> None:
3505
f = io.BytesIO()
3506
s = pl.Series("a", [1, 2, 3], pl.Int128)
3507
s.to_frame().write_parquet(f)
3508
f.seek(0)
3509
assert_series_equal(pl.read_parquet(f).to_series(), s)
3510
3511
3512
def test_write_nested_categoricals() -> None:
3513
df = pl.select(
3514
pl.lit(pl.Series("col", ["a", "b"], dtype=pl.Categorical)).implode().implode(),
3515
)
3516
3517
f = io.BytesIO()
3518
df.lazy().sink_parquet(f)
3519
3520
f.seek(0)
3521
assert_frame_equal(pl.read_parquet(f), df)
3522
3523
3524
def test_literal_predicate_23901() -> None:
3525
df = pl.DataFrame({"x": range(10)})
3526
3527
f = io.BytesIO()
3528
df.write_parquet(f, row_group_size=1)
3529
3530
f.seek(0)
3531
assert_frame_equal(pl.scan_parquet(f).filter(pl.lit(1) == 1).collect(), df)
3532
3533
3534
def test_str_plain_is_in_more_than_4_values_24167() -> None:
3535
f = io.BytesIO()
3536
3537
df = pl.DataFrame({"a": ["a"]})
3538
pq.write_table(df.to_arrow(), f, use_dictionary=False)
3539
3540
f.seek(0)
3541
lf = pl.scan_parquet(f).filter(pl.col.a.is_in(["a", "y", "z", "w", "x"]))
3542
3543
assert_frame_equal(
3544
lf.collect(),
3545
lf.collect(optimizations=pl.QueryOptFlags(predicate_pushdown=False)),
3546
)
3547
3548