Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_iceberg.py
8424 views
1
# mypy: disable-error-code="attr-defined"
2
from __future__ import annotations
3
4
import contextlib
5
import io
6
import itertools
7
import os
8
import pickle
9
import sys
10
import warnings
11
import zoneinfo
12
from datetime import date, datetime
13
from decimal import Decimal as D
14
from functools import partial
15
from pathlib import Path
16
from typing import TYPE_CHECKING, Any
17
18
import pyarrow as pa
19
import pyarrow.parquet as pq
20
import pydantic
21
import pyiceberg
22
import pytest
23
from pyiceberg.expressions import literal
24
from pyiceberg.partitioning import (
25
BucketTransform,
26
IdentityTransform,
27
PartitionField,
28
PartitionSpec,
29
)
30
from pyiceberg.schema import Schema as IcebergSchema
31
from pyiceberg.types import (
32
BinaryType,
33
BooleanType,
34
DateType,
35
DecimalType,
36
DoubleType,
37
FixedType,
38
FloatType,
39
IntegerType,
40
ListType,
41
LongType,
42
MapType,
43
NestedField,
44
StringType,
45
StructType,
46
TimestampType,
47
TimestamptzType,
48
TimeType,
49
UUIDType,
50
)
51
52
import polars as pl
53
from polars._utils.various import parse_version
54
from polars.io.cloud._utils import NoPickleOption
55
from polars.io.iceberg._dataset import IcebergDataset, _NativeIcebergScanData
56
from polars.io.iceberg._utils import (
57
_convert_predicate,
58
_normalize_windows_iceberg_file_uri,
59
_to_ast,
60
try_convert_pyarrow_predicate,
61
)
62
from polars.testing import assert_frame_equal
63
from tests.unit.io.conftest import normalize_path_separator_pl
64
65
if TYPE_CHECKING:
66
from pyiceberg.table import Table
67
68
from tests.conftest import PlMonkeyPatch
69
70
# Mypy does not understand the constructors and we can't construct the inputs
71
# explicitly since they are abstract base classes.
72
And = Any
73
EqualTo = Any
74
GreaterThan = Any
75
GreaterThanOrEqual = Any
76
In = Any
77
IsNull = Any
78
LessThan = Any
79
LessThanOrEqual = Any
80
Not = Any
81
Or = Any
82
Reference = Any
83
else:
84
from pyiceberg.expressions import (
85
And,
86
EqualTo,
87
GreaterThan,
88
GreaterThanOrEqual,
89
In,
90
IsNull,
91
LessThan,
92
LessThanOrEqual,
93
Not,
94
Or,
95
Reference,
96
)
97
98
99
with warnings.catch_warnings():
100
# Upstream issue at https://github.com/apache/iceberg-python/issues/2648.
101
warnings.simplefilter("ignore", pydantic.warnings.PydanticDeprecatedSince212)
102
# Upstream issue at https://github.com/apache/iceberg-python/issues/2849.
103
warnings.simplefilter("ignore", DeprecationWarning)
104
from pyiceberg.catalog.sql import SqlCatalog
105
from pyiceberg.io.pyarrow import schema_to_pyarrow
106
107
108
def new_pl_iceberg_dataset(source: str | Table) -> IcebergDataset:
109
from pyiceberg.table import Table
110
111
return IcebergDataset(
112
table_=NoPickleOption(source if isinstance(source, Table) else None),
113
metadata_path_=source if not isinstance(source, Table) else None,
114
snapshot_id=None,
115
iceberg_storage_properties=None,
116
reader_override=None,
117
use_metadata_statistics=True,
118
fast_deletion_count=False,
119
use_pyiceberg_filter=True,
120
)
121
122
123
# PyIceberg on Windows uses `file://C:/` rather than `file:///C:/`.
124
def format_file_uri_iceberg(absolute_local_path: str | Path) -> str:
125
absolute_local_path = str(absolute_local_path)
126
127
if sys.platform == "win32":
128
assert absolute_local_path[0].isalpha()
129
assert absolute_local_path[1] == ":"
130
p = absolute_local_path.replace("\\", "/")
131
return f"file://{p}"
132
133
assert absolute_local_path.startswith("/")
134
return f"file://{absolute_local_path}"
135
136
137
@pytest.fixture
138
def iceberg_path(io_files_path: Path) -> str:
139
# Iceberg requires absolute paths, so we'll symlink
140
# the test table into /tmp/iceberg/t1/
141
Path("/tmp/iceberg").mkdir(parents=True, exist_ok=True)
142
current_path = Path(__file__).parent.resolve()
143
144
with contextlib.suppress(FileExistsError):
145
os.symlink(f"{current_path}/files/iceberg-table", "/tmp/iceberg/t1") # noqa: PTH211
146
147
iceberg_path = io_files_path / "iceberg-table" / "metadata" / "v2.metadata.json"
148
return format_file_uri_iceberg(f"{iceberg_path.resolve()}")
149
150
151
@pytest.mark.slow
152
@pytest.mark.write_disk
153
@pytest.mark.filterwarnings(
154
"ignore:No preferred file implementation for scheme*:UserWarning"
155
)
156
@pytest.mark.ci_only
157
class TestIcebergScanIO:
158
"""Test coverage for `iceberg` scan ops."""
159
160
def test_scan_iceberg_plain(self, iceberg_path: str) -> None:
161
q = pl.scan_iceberg(iceberg_path)
162
assert len(q.collect()) == 3
163
assert q.collect_schema() == {
164
"id": pl.Int32,
165
"str": pl.String,
166
"ts": pl.Datetime(time_unit="us", time_zone=None),
167
}
168
169
def test_scan_iceberg_snapshot_id(self, iceberg_path: str) -> None:
170
q = pl.scan_iceberg(iceberg_path, snapshot_id=7051579356916758811)
171
assert len(q.collect()) == 3
172
assert q.collect_schema() == {
173
"id": pl.Int32,
174
"str": pl.String,
175
"ts": pl.Datetime(time_unit="us", time_zone=None),
176
}
177
178
def test_scan_iceberg_snapshot_id_not_found(self, iceberg_path: str) -> None:
179
with pytest.raises(ValueError, match="snapshot ID not found"):
180
pl.scan_iceberg(iceberg_path, snapshot_id=1234567890).collect()
181
182
def test_scan_iceberg_filter_on_partition(self, iceberg_path: str) -> None:
183
ts1 = datetime(2023, 3, 1, 18, 15)
184
ts2 = datetime(2023, 3, 1, 19, 25)
185
ts3 = datetime(2023, 3, 2, 22, 0)
186
187
lf = pl.scan_iceberg(iceberg_path)
188
189
res = lf.filter(pl.col("ts") >= ts2)
190
assert len(res.collect()) == 2
191
192
res = lf.filter(pl.col("ts") > ts2).select(pl.col("id"))
193
assert res.collect().rows() == [(3,)]
194
195
res = lf.filter(pl.col("ts") <= ts2).select("id", "ts")
196
assert res.collect().rows(named=True) == [
197
{"id": 1, "ts": ts1},
198
{"id": 2, "ts": ts2},
199
]
200
201
res = lf.filter(pl.col("ts") > ts3)
202
assert len(res.collect()) == 0
203
204
for constraint in (
205
(pl.col("ts") == ts1) | (pl.col("ts") == ts3),
206
pl.col("ts").is_in([ts1, ts3]),
207
):
208
res = lf.filter(constraint).select("id")
209
assert res.collect().rows() == [(1,), (3,)]
210
211
def test_scan_iceberg_filter_on_column(self, iceberg_path: str) -> None:
212
lf = pl.scan_iceberg(iceberg_path)
213
res = lf.filter(pl.col("id") < 2)
214
assert res.collect().rows() == [(1, "1", datetime(2023, 3, 1, 18, 15))]
215
216
res = lf.filter(pl.col("id") == 2)
217
assert res.collect().rows() == [(2, "2", datetime(2023, 3, 1, 19, 25))]
218
219
res = lf.filter(pl.col("id").is_in([1, 3]))
220
assert res.collect().rows() == [
221
(1, "1", datetime(2023, 3, 1, 18, 15)),
222
(3, "3", datetime(2023, 3, 2, 22, 0)),
223
]
224
225
226
@pytest.mark.ci_only
227
class TestIcebergExpressions:
228
"""Test coverage for `iceberg` expressions comprehension."""
229
230
def test_is_null_expression(self) -> None:
231
expr = _to_ast("(pa.compute.field('id')).is_null()")
232
assert _convert_predicate(expr) == IsNull("id")
233
234
def test_is_not_null_expression(self) -> None:
235
expr = _to_ast("~(pa.compute.field('id')).is_null()")
236
assert _convert_predicate(expr) == Not(IsNull("id"))
237
238
def test_isin_expression(self) -> None:
239
expr = _to_ast("(pa.compute.field('id')).isin([1,2,3])")
240
assert _convert_predicate(expr) == In(
241
"id", {literal(1), literal(2), literal(3)}
242
)
243
244
def test_parse_combined_expression(self) -> None:
245
expr = _to_ast(
246
"(((pa.compute.field('str') == '2') & (pa.compute.field('id') > 10)) | (pa.compute.field('id')).isin([1,2,3]))"
247
)
248
assert _convert_predicate(expr) == Or(
249
left=And(
250
left=EqualTo(term=Reference(name="str"), literal=literal("2")),
251
right=GreaterThan(term="id", literal=literal(10)),
252
),
253
right=In("id", {literal(1), literal(2), literal(3)}),
254
)
255
256
def test_parse_gt(self) -> None:
257
expr = _to_ast("(pa.compute.field('ts') > '2023-08-08')")
258
assert _convert_predicate(expr) == GreaterThan("ts", "2023-08-08")
259
260
def test_parse_gteq(self) -> None:
261
expr = _to_ast("(pa.compute.field('ts') >= '2023-08-08')")
262
assert _convert_predicate(expr) == GreaterThanOrEqual("ts", "2023-08-08")
263
264
def test_parse_eq(self) -> None:
265
expr = _to_ast("(pa.compute.field('ts') == '2023-08-08')")
266
assert _convert_predicate(expr) == EqualTo("ts", "2023-08-08")
267
268
def test_parse_lt(self) -> None:
269
expr = _to_ast("(pa.compute.field('ts') < '2023-08-08')")
270
assert _convert_predicate(expr) == LessThan("ts", "2023-08-08")
271
272
def test_parse_lteq(self) -> None:
273
expr = _to_ast("(pa.compute.field('ts') <= '2023-08-08')")
274
assert _convert_predicate(expr) == LessThanOrEqual("ts", "2023-08-08")
275
276
def test_compare_boolean(self) -> None:
277
expr = _to_ast("(pa.compute.field('ts') == pa.compute.scalar(True))")
278
assert _convert_predicate(expr) == EqualTo("ts", True)
279
280
expr = _to_ast("(pa.compute.field('ts') == pa.compute.scalar(False))")
281
assert _convert_predicate(expr) == EqualTo("ts", False)
282
283
def test_bare_boolean_field(self) -> None:
284
expr = try_convert_pyarrow_predicate("pa.compute.field('is_active')")
285
assert expr == EqualTo("is_active", True)
286
287
def test_bare_boolean_field_negated(self) -> None:
288
expr = try_convert_pyarrow_predicate("~pa.compute.field('is_active')")
289
assert expr == Not(EqualTo("is_active", True))
290
291
292
@pytest.mark.write_disk
293
def test_iceberg_dataset_does_not_pickle_table_object(tmp_path: Path) -> None:
294
catalog = SqlCatalog(
295
"default",
296
uri="sqlite:///:memory:",
297
warehouse=format_file_uri_iceberg(tmp_path),
298
)
299
catalog.create_namespace("namespace")
300
301
catalog.create_table(
302
"namespace.table",
303
IcebergSchema(
304
NestedField(1, "row_index", IntegerType()),
305
),
306
)
307
308
tbl = catalog.load_table("namespace.table")
309
310
df = pl.DataFrame(
311
{"row_index": [0, 1, 2, 3, 4]},
312
schema={"row_index": pl.Int32},
313
)
314
315
df.write_iceberg(tbl, mode="append")
316
317
dataset = new_pl_iceberg_dataset(tbl)
318
319
assert dataset.table_.get() is not None
320
dataset = pickle.loads(pickle.dumps(dataset))
321
assert dataset.table_.get() is None
322
323
assert_frame_equal(dataset.to_dataset_scan()[0].collect(), df) # type: ignore[index]
324
325
326
@pytest.mark.slow
327
@pytest.mark.write_disk
328
@pytest.mark.filterwarnings("ignore:Delete operation did not match any records")
329
@pytest.mark.filterwarnings(
330
"ignore:Iceberg does not have a dictionary type. <class 'pyarrow.lib.DictionaryType'> will be inferred as large_string on read."
331
)
332
def test_write_iceberg(df: pl.DataFrame, tmp_path: Path) -> None:
333
# time64[ns] type is currently not supported in pyiceberg.
334
# https://github.com/apache/iceberg-python/issues/1169
335
df = df.drop("time", "cat", "enum")
336
337
# in-memory catalog
338
catalog = SqlCatalog(
339
"default", uri="sqlite:///:memory:", warehouse=format_file_uri_iceberg(tmp_path)
340
)
341
catalog.create_namespace("foo")
342
table = catalog.create_table(
343
"foo.bar",
344
schema=df.to_arrow().schema,
345
)
346
347
df.write_iceberg(table, mode="overwrite")
348
actual = pl.scan_iceberg(table).collect()
349
350
assert_frame_equal(df, actual)
351
352
# append on top of already written data, expecting twice the data
353
df.write_iceberg(table, mode="append")
354
# double the `df` by vertically stacking the dataframe on top of itself
355
expected = df.vstack(df)
356
actual = pl.scan_iceberg(table).collect()
357
assert_frame_equal(expected, actual, check_dtypes=False)
358
359
360
@pytest.mark.write_disk
361
def test_scan_iceberg_row_index_renamed(tmp_path: Path) -> None:
362
catalog = SqlCatalog(
363
"default",
364
uri="sqlite:///:memory:",
365
warehouse=format_file_uri_iceberg(tmp_path),
366
)
367
catalog.create_namespace("namespace")
368
369
catalog.create_table(
370
"namespace.table",
371
IcebergSchema(
372
NestedField(1, "row_index", IntegerType()),
373
NestedField(2, "file_path", StringType()),
374
),
375
)
376
377
tbl = catalog.load_table("namespace.table")
378
379
pl.DataFrame(
380
{"row_index": [0, 1, 2, 3, 4], "file_path": None},
381
schema={"row_index": pl.Int32, "file_path": pl.String},
382
).write_iceberg(tbl, mode="append")
383
384
with tbl.update_schema() as sch:
385
sch.rename_column("row_index", "row_index_in_file")
386
sch.rename_column("file_path", "file_path_in_file")
387
388
file_paths = [
389
_normalize_windows_iceberg_file_uri(x.file.file_path)
390
for x in tbl.scan().plan_files()
391
]
392
assert len(file_paths) == 1
393
394
q = pl.scan_parquet(
395
file_paths,
396
schema={
397
"row_index_in_file": pl.Int32,
398
"file_path_in_file": pl.String,
399
},
400
_column_mapping=(
401
"iceberg-column-mapping",
402
new_pl_iceberg_dataset(tbl).arrow_schema(),
403
),
404
include_file_paths="file_path",
405
row_index_name="row_index",
406
row_index_offset=3,
407
)
408
409
assert_frame_equal(
410
q.collect().with_columns(normalize_path_separator_pl(pl.col("file_path"))),
411
pl.DataFrame(
412
{
413
"row_index": [3, 4, 5, 6, 7],
414
"row_index_in_file": [0, 1, 2, 3, 4],
415
"file_path_in_file": None,
416
"file_path": file_paths[0],
417
},
418
schema={
419
"row_index": pl.get_index_type(),
420
"row_index_in_file": pl.Int32,
421
"file_path_in_file": pl.String,
422
"file_path": pl.String,
423
},
424
),
425
)
426
427
428
@pytest.mark.write_disk
429
def test_scan_iceberg_polars_storage_options_keys(
430
tmp_path: Path,
431
plmonkeypatch: PlMonkeyPatch,
432
capfd: pytest.CaptureFixture[str],
433
) -> None:
434
plmonkeypatch.setenv("POLARS_VERBOSE_SENSITIVE", "1")
435
catalog = SqlCatalog(
436
"default",
437
uri="sqlite:///:memory:",
438
warehouse=format_file_uri_iceberg(tmp_path),
439
)
440
catalog.create_namespace("namespace")
441
442
catalog.create_table(
443
"namespace.table",
444
IcebergSchema(
445
NestedField(1, "row_index", IntegerType()),
446
NestedField(2, "file_path", StringType()),
447
),
448
)
449
450
tbl = catalog.load_table("namespace.table")
451
452
pl.DataFrame(
453
{"row_index": [0, 1, 2, 3, 4], "file_path": None},
454
schema={"row_index": pl.Int32, "file_path": pl.String},
455
).write_iceberg(tbl, mode="append")
456
457
capfd.readouterr()
458
459
pl.scan_iceberg(
460
tbl,
461
storage_options={
462
"file_cache_ttl": 7,
463
"max_retries": 3,
464
"retry_timeout_ms": 9873,
465
"retry_init_backoff_ms": 9874,
466
"retry_max_backoff_ms": 9875,
467
"retry_base_multiplier": 3.14159,
468
},
469
).collect()
470
471
capture = capfd.readouterr().err
472
473
assert "file_cache_ttl: 7" in capture
474
475
assert (
476
"""\
477
max_retries: Some(3), \
478
retry_timeout: Some(9.873s), \
479
retry_init_backoff: Some(9.874s), \
480
retry_max_backoff: Some(9.875s), \
481
retry_base_multiplier: Some(TotalOrdWrap(3.14159)) }"""
482
in capture
483
)
484
485
486
@pytest.mark.write_disk
487
@pytest.mark.parametrize("reader_override", ["pyiceberg", "native"])
488
def test_scan_iceberg_collect_without_version_scans_latest(
489
tmp_path: Path,
490
reader_override: str,
491
capfd: pytest.CaptureFixture[str],
492
plmonkeypatch: PlMonkeyPatch,
493
) -> None:
494
catalog = SqlCatalog(
495
"default",
496
uri="sqlite:///:memory:",
497
warehouse=format_file_uri_iceberg(tmp_path),
498
)
499
500
catalog.create_namespace("namespace")
501
502
catalog.create_table(
503
"namespace.table",
504
IcebergSchema(
505
NestedField(1, "a", LongType()),
506
),
507
)
508
509
tbl = catalog.load_table("namespace.table")
510
511
q = pl.scan_iceberg(tbl, reader_override=reader_override) # type: ignore[arg-type]
512
513
assert_frame_equal(q.collect(), pl.DataFrame(schema={"a": pl.Int64}))
514
515
pl.DataFrame({"a": 1}).write_iceberg(tbl, mode="append")
516
517
assert_frame_equal(q.collect(), pl.DataFrame({"a": 1}))
518
519
snapshot = tbl.current_snapshot()
520
assert snapshot is not None
521
snapshot_id = snapshot.snapshot_id
522
523
q_with_id = pl.scan_iceberg(
524
tbl,
525
reader_override=reader_override, # type: ignore[arg-type]
526
snapshot_id=snapshot_id,
527
)
528
529
assert_frame_equal(q_with_id.collect(), pl.DataFrame({"a": 1}))
530
531
pl.DataFrame({"a": 2}).write_iceberg(tbl, mode="append")
532
533
assert_frame_equal(q.collect(), pl.DataFrame({"a": [2, 1]}))
534
535
plmonkeypatch.setenv("POLARS_VERBOSE", "1")
536
capfd.readouterr()
537
assert_frame_equal(q_with_id.collect(), pl.DataFrame({"a": 1}))
538
539
assert (
540
"IcebergDataset: to_dataset_scan(): early return (snapshot_id_key = "
541
in capfd.readouterr().err
542
)
543
544
545
@pytest.mark.write_disk
546
def test_scan_iceberg_extra_columns(tmp_path: Path) -> None:
547
catalog = SqlCatalog(
548
"default",
549
uri="sqlite:///:memory:",
550
warehouse=format_file_uri_iceberg(tmp_path),
551
)
552
catalog.create_namespace("namespace")
553
554
catalog.create_table(
555
"namespace.table",
556
IcebergSchema(
557
NestedField(1, "a", IntegerType()),
558
),
559
)
560
561
tbl = catalog.load_table("namespace.table")
562
563
pl.DataFrame(
564
{"a": [0, 1, 2, 3, 4]},
565
schema={"a": pl.Int32},
566
).write_iceberg(tbl, mode="append")
567
568
with tbl.update_schema() as sch:
569
sch.delete_column("a")
570
sch.add_column("a", IntegerType())
571
572
file_paths = [
573
_normalize_windows_iceberg_file_uri(x.file.file_path)
574
for x in tbl.scan().plan_files()
575
]
576
577
assert len(file_paths) == 1
578
579
q = pl.scan_parquet(
580
file_paths,
581
schema={"a": pl.Int32},
582
_column_mapping=(
583
"iceberg-column-mapping",
584
new_pl_iceberg_dataset(tbl).arrow_schema(),
585
),
586
)
587
588
# The original column is considered an extra column despite having the same
589
# name as the physical ID does not match.
590
591
with pytest.raises(
592
pl.exceptions.SchemaError,
593
match="extra column in file outside of expected schema: a",
594
):
595
q.collect()
596
597
q = pl.scan_parquet(
598
file_paths,
599
schema={"a": pl.Int32},
600
_column_mapping=(
601
"iceberg-column-mapping",
602
new_pl_iceberg_dataset(tbl).arrow_schema(),
603
),
604
extra_columns="ignore",
605
missing_columns="insert",
606
)
607
608
assert_frame_equal(
609
q.collect(),
610
pl.DataFrame(
611
{
612
"a": [None, None, None, None, None],
613
},
614
schema={"a": pl.Int32},
615
),
616
)
617
618
619
@pytest.mark.write_disk
620
def test_scan_iceberg_extra_struct_fields(tmp_path: Path) -> None:
621
catalog = SqlCatalog(
622
"default",
623
uri="sqlite:///:memory:",
624
warehouse=format_file_uri_iceberg(tmp_path),
625
)
626
catalog.create_namespace("namespace")
627
628
catalog.create_table(
629
"namespace.table",
630
IcebergSchema(
631
NestedField(1, "a", StructType(NestedField(1, "a", IntegerType()))),
632
),
633
)
634
635
tbl = catalog.load_table("namespace.table")
636
637
pl.DataFrame(
638
{"a": [{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}, {"a": 5}]},
639
schema={"a": pl.Struct({"a": pl.Int32})},
640
).write_iceberg(tbl, mode="append")
641
642
with tbl.update_schema() as sch:
643
sch.delete_column(("a", "a"))
644
sch.add_column(("a", "a"), IntegerType())
645
646
file_paths = [
647
_normalize_windows_iceberg_file_uri(x.file.file_path)
648
for x in tbl.scan().plan_files()
649
]
650
651
assert len(file_paths) == 1
652
653
q = pl.scan_parquet(
654
file_paths,
655
schema={"a": pl.Struct({"a": pl.Int32})},
656
_column_mapping=(
657
"iceberg-column-mapping",
658
new_pl_iceberg_dataset(tbl).arrow_schema(),
659
),
660
)
661
662
# The original column is considered an extra column despite having the same
663
# name as the physical ID does not match.
664
665
with pytest.raises(
666
pl.exceptions.SchemaError,
667
match="encountered extra struct field: a",
668
):
669
q.collect()
670
671
q = pl.scan_parquet(
672
file_paths,
673
schema={"a": pl.Struct({"a": pl.Int32})},
674
_column_mapping=(
675
"iceberg-column-mapping",
676
new_pl_iceberg_dataset(tbl).arrow_schema(),
677
),
678
cast_options=pl.ScanCastOptions(
679
extra_struct_fields="ignore", missing_struct_fields="insert"
680
),
681
)
682
683
assert_frame_equal(
684
q.collect(),
685
pl.DataFrame(
686
{
687
"a": [
688
{"a": None},
689
{"a": None},
690
{"a": None},
691
{"a": None},
692
{"a": None},
693
],
694
},
695
schema={"a": pl.Struct({"a": pl.Int32})},
696
),
697
)
698
699
700
@pytest.mark.write_disk
701
def test_scan_iceberg_column_deletion(tmp_path: Path) -> None:
702
catalog = SqlCatalog(
703
"default",
704
uri="sqlite:///:memory:",
705
warehouse=format_file_uri_iceberg(tmp_path),
706
)
707
catalog.create_namespace("namespace")
708
709
catalog.create_table(
710
"namespace.table",
711
IcebergSchema(
712
NestedField(1, "a", StructType(NestedField(0, "inner", StringType())))
713
),
714
)
715
716
tbl = catalog.load_table("namespace.table")
717
718
pl.DataFrame({"a": [{"inner": "A"}]}).write_iceberg(tbl, mode="append")
719
720
with tbl.update_schema() as sch:
721
sch.delete_column("a").add_column(
722
"a", StructType(NestedField(0, "inner", StringType()))
723
)
724
725
pl.DataFrame({"a": [{"inner": "A"}]}).write_iceberg(tbl, mode="append")
726
727
expect = pl.DataFrame({"a": [{"inner": "A"}, None]})
728
729
assert_frame_equal(
730
pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(),
731
expect,
732
)
733
734
assert_frame_equal(
735
pl.scan_iceberg(tbl, reader_override="native").collect(),
736
expect,
737
)
738
739
740
@pytest.mark.write_disk
741
def test_scan_iceberg_nested_column_cast_deletion_rename(tmp_path: Path) -> None:
742
catalog = SqlCatalog(
743
"default",
744
uri="sqlite:///:memory:",
745
warehouse=format_file_uri_iceberg(tmp_path),
746
)
747
catalog.create_namespace("namespace")
748
749
next_field_id = partial(next, itertools.count())
750
751
catalog.create_table(
752
"namespace.table",
753
IcebergSchema(
754
NestedField(
755
field_id=next_field_id(),
756
name="column_1",
757
field_type=ListType(
758
element_id=next_field_id(),
759
element=StructType(
760
NestedField(
761
field_id=next_field_id(),
762
name="field_1",
763
field_type=MapType(
764
key_id=next_field_id(),
765
key_type=ListType(
766
element_id=next_field_id(), element=TimestampType(), element_required=False
767
),
768
value_id=next_field_id(),
769
value_type=ListType(
770
element_id=next_field_id(), element=IntegerType(), element_required=False
771
),
772
value_required=False,
773
),
774
required=False,
775
),
776
NestedField(
777
field_id=next_field_id(), name="field_2", field_type=IntegerType(), required=False
778
),
779
NestedField(
780
field_id=next_field_id(), name="field_3", field_type=StringType(), required=False
781
),
782
),
783
element_required=False,
784
),
785
required=False,
786
),
787
NestedField(field_id=next_field_id(), name="column_2", field_type=StringType(), required=False),
788
NestedField(
789
field_id=next_field_id(),
790
name="column_3",
791
field_type=MapType(
792
key_id=next_field_id(),
793
key_type=StructType(
794
NestedField(
795
field_id=next_field_id(), name="field_1", field_type=IntegerType(), required=False
796
),
797
NestedField(
798
field_id=next_field_id(), name="field_2", field_type=IntegerType(), required=False
799
),
800
NestedField(
801
field_id=next_field_id(), name="field_3", field_type=IntegerType(), required=False
802
),
803
),
804
value_id=next_field_id(),
805
value_type=StructType(
806
NestedField(
807
field_id=next_field_id(), name="field_1", field_type=IntegerType(), required=False
808
),
809
NestedField(
810
field_id=next_field_id(), name="field_2", field_type=IntegerType(), required=False
811
),
812
NestedField(
813
field_id=next_field_id(), name="field_3", field_type=IntegerType(), required=False
814
),
815
),
816
value_required=False,
817
),
818
required=False,
819
),
820
),
821
) # fmt: skip
822
823
tbl = catalog.load_table("namespace.table")
824
825
df_dict = {
826
"column_1": [
827
[
828
{
829
"field_1": [
830
{"key": [datetime(2025, 1, 1), None], "value": [1, 2, None]},
831
{"key": [datetime(2025, 1, 1), None], "value": None},
832
],
833
"field_2": 7,
834
"field_3": "F3",
835
}
836
],
837
[
838
{
839
"field_1": [{"key": [datetime(2025, 1, 1), None], "value": None}],
840
"field_2": 7,
841
"field_3": "F3",
842
}
843
],
844
[{"field_1": [], "field_2": None, "field_3": None}],
845
[None],
846
[],
847
],
848
"column_2": ["1", "2", "3", "4", None],
849
"column_3": [
850
[
851
{
852
"key": {"field_1": 1, "field_2": 2, "field_3": 3},
853
"value": {"field_1": 7, "field_2": 8, "field_3": 9},
854
}
855
],
856
[
857
{
858
"key": {"field_1": 1, "field_2": 2, "field_3": 3},
859
"value": {"field_1": 7, "field_2": 8, "field_3": 9},
860
}
861
],
862
[
863
{
864
"key": {"field_1": None, "field_2": None, "field_3": None},
865
"value": {"field_1": None, "field_2": None, "field_3": None},
866
}
867
],
868
[
869
{
870
"key": {"field_1": None, "field_2": None, "field_3": None},
871
"value": None,
872
}
873
],
874
[],
875
],
876
}
877
878
df = pl.DataFrame(
879
df_dict,
880
schema={
881
"column_1": pl.List(
882
pl.Struct(
883
{
884
"field_1": pl.List(
885
pl.Struct({"key": pl.List(pl.Datetime("us")), "value": pl.List(pl.Int32)})
886
),
887
"field_2": pl.Int32,
888
"field_3": pl.String,
889
}
890
)
891
),
892
"column_2": pl.String,
893
"column_3": pl.List(
894
pl.Struct(
895
{
896
"key": pl.Struct({"field_1": pl.Int32, "field_2": pl.Int32, "field_3": pl.Int32}),
897
"value": pl.Struct({"field_1": pl.Int32, "field_2": pl.Int32, "field_3": pl.Int32}),
898
}
899
)
900
),
901
},
902
) # fmt: skip
903
904
# The Iceberg table schema has a `Map` type, whereas the polars DataFrame
905
# stores `list[struct{..}]` - directly using `write_iceberg()` causes the
906
# following error:
907
# * ValueError: PyArrow table contains more columns:
908
# column_1.element.field_1.element
909
# We workaround this by constructing a pyarrow table an arrow schema.
910
arrow_tbl = pa.Table.from_pydict(
911
df_dict,
912
schema=pa.schema(
913
[
914
(
915
"column_1",
916
pa.large_list(
917
pa.struct(
918
[
919
(
920
"field_1",
921
pa.map_(pa.large_list(pa.timestamp("us")), pa.large_list(pa.int32())),
922
),
923
("field_2", pa.int32()),
924
("field_3", pa.string()),
925
]
926
)
927
),
928
),
929
("column_2", pa.string()),
930
(
931
"column_3",
932
pa.map_(
933
pa.struct([("field_1", pa.int32()), ("field_2", pa.int32()), ("field_3", pa.int32())]),
934
pa.struct([("field_1", pa.int32()), ("field_2", pa.int32()), ("field_3", pa.int32())]),
935
),
936
),
937
]
938
),
939
) # fmt: skip
940
941
assert_frame_equal(pl.DataFrame(arrow_tbl), df)
942
943
tbl.append(arrow_tbl)
944
945
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(), df)
946
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), df)
947
948
# Change schema
949
# Note: Iceberg doesn't allow modifying the "key" part of the Map type.
950
951
# Promote types
952
with tbl.update_schema() as sch:
953
sch.update_column(("column_1", "field_2"), LongType())
954
sch.update_column(("column_3", "value", "field_1"), LongType())
955
sch.update_column(("column_3", "value", "field_2"), LongType())
956
sch.update_column(("column_3", "value", "field_3"), LongType())
957
958
# Delete/Rename:
959
# * Delete `*_2` fields
960
# * Rename:
961
# * `{x}_1` -> `{x}_2`
962
# * `{x}_3` -> `{x}_1`
963
# * And move the field position to 1st
964
965
# Delete `*_2` fields/columns.
966
with tbl.update_schema() as sch:
967
sch.delete_column("column_2")
968
sch.delete_column(("column_3", "value", "field_2"))
969
sch.delete_column(("column_1", "field_2"))
970
971
# Shift nested fields in `column_1`
972
with tbl.update_schema() as sch:
973
sch.rename_column(("column_1", "field_1"), "field_2")
974
975
with tbl.update_schema() as sch:
976
sch.rename_column(("column_1", "field_3"), "field_1")
977
978
with tbl.update_schema() as sch:
979
sch.move_first(("column_1", "field_1"))
980
981
# Shift nested fields in `column_2`
982
with tbl.update_schema() as sch:
983
sch.rename_column(("column_3", "value", "field_1"), "field_2")
984
985
with tbl.update_schema() as sch:
986
sch.rename_column(("column_3", "value", "field_3"), "field_1")
987
988
with tbl.update_schema() as sch:
989
sch.move_first(("column_3", "value", "field_1"))
990
991
# Shift top-level columns
992
with tbl.update_schema() as sch:
993
sch.rename_column("column_1", "column_2")
994
995
with tbl.update_schema() as sch:
996
sch.rename_column("column_3", "column_1")
997
998
with tbl.update_schema() as sch:
999
sch.move_first("column_1")
1000
1001
expect = pl.DataFrame(
1002
{
1003
"column_2": [
1004
[
1005
{
1006
"field_2": [
1007
{"key": [datetime(2025, 1, 1, 0, 0), None], "value": [1, 2, None]},
1008
{"key": [datetime(2025, 1, 1), None], "value": None},
1009
],
1010
"field_1": "F3",
1011
}
1012
],
1013
[{"field_2": [{"key": [datetime(2025, 1, 1, 0, 0), None], "value": None}], "field_1": "F3"}],
1014
[{"field_2": [], "field_1": None}],
1015
[None],
1016
[],
1017
],
1018
"column_1": [
1019
[{"key": {"field_1": 1, "field_2": 2, "field_3": 3}, "value": {"field_2": 7, "field_1": 9}}],
1020
[{"key": {"field_1": 1, "field_2": 2, "field_3": 3}, "value": {"field_2": 7, "field_1": 9}}],
1021
[
1022
{
1023
"key": {"field_1": None, "field_2": None, "field_3": None},
1024
"value": {"field_2": None, "field_1": None},
1025
}
1026
],
1027
[{"key": {"field_1": None, "field_2": None, "field_3": None}, "value": None}],
1028
[],
1029
],
1030
},
1031
schema={
1032
"column_1": pl.List(
1033
pl.Struct(
1034
{
1035
"key": pl.Struct({"field_1": pl.Int32, "field_2": pl.Int32, "field_3": pl.Int32}),
1036
"value": pl.Struct({"field_1": pl.Int64, "field_2": pl.Int64}),
1037
}
1038
)
1039
),
1040
"column_2": pl.List(
1041
pl.Struct(
1042
{
1043
"field_1": pl.String,
1044
"field_2": pl.List(
1045
pl.Struct(
1046
{
1047
"key": pl.List(pl.Datetime(time_unit="us", time_zone=None)),
1048
"value": pl.List(pl.Int32),
1049
}
1050
)
1051
),
1052
}
1053
)
1054
),
1055
},
1056
) # fmt: skip
1057
1058
assert_frame_equal(
1059
pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(), expect
1060
)
1061
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), expect)
1062
1063
1064
@pytest.mark.write_disk
1065
@pytest.mark.xfail(
1066
reason="""\
1067
[Upstream Issue]
1068
PyIceberg writes NULL as empty lists into the Parquet file.
1069
* Issue on Polars repo - https://github.com/pola-rs/polars/issues/23715
1070
* Issue on PyIceberg repo - https://github.com/apache/iceberg-python/issues/2246
1071
"""
1072
)
1073
def test_scan_iceberg_nulls_multiple_nesting(tmp_path: Path) -> None:
1074
catalog = SqlCatalog(
1075
"default",
1076
uri="sqlite:///:memory:",
1077
warehouse=format_file_uri_iceberg(tmp_path),
1078
)
1079
catalog.create_namespace("namespace")
1080
1081
next_field_id = partial(next, itertools.count())
1082
1083
catalog.create_table(
1084
"namespace.table",
1085
IcebergSchema(
1086
NestedField(
1087
field_id=next_field_id(),
1088
name="column_1",
1089
field_type=ListType(
1090
element_id=next_field_id(),
1091
element=StructType(
1092
NestedField(
1093
field_id=next_field_id(),
1094
name="field_1",
1095
field_type=ListType(
1096
element_id=next_field_id(),
1097
element=StructType(
1098
NestedField(field_id=next_field_id(), name="key", field_type=ListType(
1099
element_id=next_field_id(),
1100
element=TimestampType(),
1101
element_required=False,
1102
), required=True),
1103
NestedField(field_id=next_field_id(), name="value", field_type=ListType(
1104
element_id=next_field_id(),
1105
element=IntegerType(),
1106
element_required=False,
1107
), required=False),
1108
),
1109
element_required=False
1110
),
1111
required=False,
1112
),
1113
NestedField(field_id=next_field_id(), name="field_2", field_type=IntegerType(), required=False),
1114
NestedField(field_id=next_field_id(), name="field_3", field_type=StringType(), required=False),
1115
),
1116
element_required=False,
1117
),
1118
required=False,
1119
),
1120
),
1121
) # fmt: skip
1122
1123
tbl = catalog.load_table("namespace.table")
1124
1125
df_dict = {
1126
"column_1": [
1127
[
1128
{
1129
"field_1": [
1130
{"key": [datetime(2025, 1, 1), None], "value": [1, 2, None]}
1131
],
1132
"field_2": 7,
1133
"field_3": "F3",
1134
}
1135
],
1136
[
1137
{
1138
"field_1": [{"key": [datetime(2025, 1, 1), None], "value": None}],
1139
"field_2": 7,
1140
"field_3": "F3",
1141
}
1142
],
1143
[{"field_1": None, "field_2": None, "field_3": None}],
1144
[None],
1145
None,
1146
],
1147
}
1148
1149
df = pl.DataFrame(
1150
df_dict,
1151
schema={
1152
"column_1": pl.List(
1153
pl.Struct(
1154
{
1155
"field_1": pl.List(
1156
pl.Struct(
1157
{
1158
"key": pl.List(pl.Datetime("us")),
1159
"value": pl.List(pl.Int32),
1160
}
1161
)
1162
),
1163
"field_2": pl.Int32,
1164
"field_3": pl.String,
1165
}
1166
)
1167
),
1168
},
1169
)
1170
1171
arrow_tbl = pa.Table.from_pydict(
1172
df_dict,
1173
schema=pa.schema(
1174
[
1175
(
1176
"column_1",
1177
pa.large_list(
1178
pa.struct(
1179
[
1180
(
1181
"field_1",
1182
pa.large_list(
1183
pa.struct(
1184
[
1185
pa.field(
1186
"key",
1187
pa.large_list(pa.timestamp("us")),
1188
nullable=False,
1189
),
1190
("value", pa.large_list(pa.int32())),
1191
]
1192
)
1193
),
1194
),
1195
("field_2", pa.int32()),
1196
("field_3", pa.string()),
1197
]
1198
)
1199
),
1200
)
1201
]
1202
),
1203
)
1204
1205
assert_frame_equal(pl.DataFrame(arrow_tbl), df)
1206
1207
tbl.append(arrow_tbl)
1208
1209
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(), df)
1210
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), df)
1211
1212
1213
@pytest.mark.write_disk
1214
def test_scan_iceberg_nulls_nested(tmp_path: Path) -> None:
1215
catalog = SqlCatalog(
1216
"default",
1217
uri="sqlite:///:memory:",
1218
warehouse=format_file_uri_iceberg(tmp_path),
1219
)
1220
catalog.create_namespace("namespace")
1221
1222
next_field_id = partial(next, itertools.count())
1223
1224
catalog.create_table(
1225
"namespace.table",
1226
IcebergSchema(
1227
NestedField(
1228
field_id=next_field_id(),
1229
name="column_1",
1230
field_type=ListType(
1231
element_id=next_field_id(),
1232
element=IntegerType(),
1233
element_required=False,
1234
),
1235
required=False,
1236
),
1237
),
1238
)
1239
1240
tbl = catalog.load_table("namespace.table")
1241
1242
df = pl.DataFrame(
1243
{
1244
"column_1": [
1245
[1, 2],
1246
[None],
1247
None,
1248
],
1249
},
1250
schema={
1251
"column_1": pl.List(pl.Int32),
1252
},
1253
)
1254
1255
df_dict = df.to_dict(as_series=False)
1256
1257
assert_frame_equal(pl.DataFrame(df_dict, schema=df.schema), df)
1258
1259
arrow_tbl = pa.Table.from_pydict(
1260
df_dict,
1261
schema=pa.schema(
1262
[
1263
(
1264
"column_1",
1265
pa.large_list(pa.int32()),
1266
)
1267
]
1268
),
1269
)
1270
1271
assert_frame_equal(pl.DataFrame(arrow_tbl), df)
1272
1273
tbl.append(arrow_tbl)
1274
1275
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(), df)
1276
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), df)
1277
1278
1279
@pytest.mark.write_disk
1280
def test_scan_iceberg_parquet_prefilter_with_column_mapping(
1281
tmp_path: Path,
1282
plmonkeypatch: PlMonkeyPatch,
1283
capfd: pytest.CaptureFixture[str],
1284
) -> None:
1285
catalog = SqlCatalog(
1286
"default",
1287
uri="sqlite:///:memory:",
1288
warehouse=format_file_uri_iceberg(tmp_path),
1289
)
1290
catalog.create_namespace("namespace")
1291
1292
next_field_id = partial(next, itertools.count())
1293
1294
catalog.create_table(
1295
"namespace.table",
1296
IcebergSchema(
1297
NestedField(
1298
field_id=next_field_id(),
1299
name="column_1",
1300
field_type=StringType(),
1301
required=False,
1302
),
1303
NestedField(
1304
field_id=next_field_id(),
1305
name="column_2",
1306
field_type=IntegerType(),
1307
required=False,
1308
),
1309
NestedField(
1310
field_id=next_field_id(),
1311
name="column_3",
1312
field_type=StringType(),
1313
required=False,
1314
),
1315
),
1316
)
1317
1318
tbl = catalog.load_table("namespace.table")
1319
1320
df = pl.DataFrame(
1321
{
1322
"column_1": ["A", "B", "C", "D", "E", "F"],
1323
"column_2": pl.Series([1, 2, 3, 4, 5, 6], dtype=pl.Int32),
1324
"column_3": ["P", "Q", "R", "S", "T", "U"],
1325
}
1326
)
1327
1328
df.slice(0, 3).write_iceberg(tbl, mode="append")
1329
df.slice(3).write_iceberg(tbl, mode="append")
1330
1331
with tbl.update_schema() as sch:
1332
sch.update_column("column_2", LongType())
1333
1334
with tbl.update_schema() as sch:
1335
sch.delete_column("column_1")
1336
1337
with tbl.update_schema() as sch:
1338
sch.rename_column("column_3", "column_1")
1339
1340
with tbl.update_schema() as sch:
1341
sch.rename_column("column_2", "column_3")
1342
1343
with tbl.update_schema() as sch:
1344
sch.move_first("column_1")
1345
1346
assert_frame_equal(
1347
pl.scan_iceberg(tbl, reader_override="native").collect().sort("column_3"),
1348
pl.DataFrame(
1349
{
1350
"column_1": ["P", "Q", "R", "S", "T", "U"],
1351
"column_3": pl.Series([1, 2, 3, 4, 5, 6], dtype=pl.Int64),
1352
}
1353
),
1354
)
1355
1356
# Upstream issue - PyIceberg filter does not handle schema evolution
1357
with pytest.raises(Exception, match="unpack requires a buffer of 8 bytes"):
1358
pl.scan_iceberg(
1359
tbl, reader_override="native", use_pyiceberg_filter=True
1360
).filter(pl.col("column_3") == 5).collect()
1361
1362
q = pl.scan_iceberg(
1363
tbl, reader_override="native", use_pyiceberg_filter=False
1364
).filter(pl.col("column_3") == 5)
1365
1366
with plmonkeypatch.context() as cx:
1367
cx.setenv("POLARS_VERBOSE", "1")
1368
cx.setenv("POLARS_FORCE_EMPTY_READER_CAPABILITIES", "0")
1369
capfd.readouterr()
1370
out = q.collect()
1371
capture = capfd.readouterr().err
1372
1373
assert_frame_equal(
1374
out,
1375
pl.DataFrame(
1376
{
1377
"column_1": ["T"],
1378
"column_3": pl.Series([5], dtype=pl.Int64),
1379
}
1380
),
1381
)
1382
1383
# First file
1384
assert "Source filter mask initialization via table statistics" in capture
1385
assert "Predicate pushdown allows skipping 1 / 2 files" in capture
1386
# Second file
1387
assert (
1388
"[ParquetFileReader]: Predicate pushdown: reading 1 / 1 row groups" in capture
1389
)
1390
assert (
1391
"[ParquetFileReader]: Pre-filtered decode enabled (1 live, 1 non-live)"
1392
in capture
1393
)
1394
1395
1396
# Note: This test also generally covers primitive type round-tripping.
1397
@pytest.mark.parametrize("test_uuid", [True, False])
1398
@pytest.mark.write_disk
1399
def test_fill_missing_fields_with_identity_partition_values(
1400
test_uuid: bool, tmp_path: Path
1401
) -> None:
1402
from datetime import time
1403
1404
catalog = SqlCatalog(
1405
"default",
1406
uri="sqlite:///:memory:",
1407
warehouse=format_file_uri_iceberg(tmp_path),
1408
)
1409
catalog.create_namespace("namespace")
1410
1411
min_version = parse_version(pyiceberg.__version__) >= (0, 10, 0)
1412
1413
test_decimal_and_fixed = min_version
1414
test_uuid = test_uuid and min_version
1415
1416
next_field_id = partial(next, itertools.count(1))
1417
1418
iceberg_schema = IcebergSchema(
1419
NestedField(next_field_id(), "height_provider", IntegerType()),
1420
NestedField(next_field_id(), "BooleanType", BooleanType()),
1421
NestedField(next_field_id(), "IntegerType", IntegerType()),
1422
NestedField(next_field_id(), "LongType", LongType()),
1423
NestedField(next_field_id(), "FloatType", FloatType()),
1424
NestedField(next_field_id(), "DoubleType", DoubleType()),
1425
NestedField(next_field_id(), "DateType", DateType()),
1426
NestedField(next_field_id(), "TimeType", TimeType()),
1427
NestedField(next_field_id(), "TimestampType", TimestampType()),
1428
NestedField(next_field_id(), "TimestamptzType", TimestamptzType()),
1429
NestedField(next_field_id(), "StringType", StringType()),
1430
NestedField(next_field_id(), "BinaryType", BinaryType()),
1431
*(
1432
[
1433
NestedField(next_field_id(), "DecimalType", DecimalType(18, 2)),
1434
NestedField(next_field_id(), "FixedType", FixedType(1)),
1435
]
1436
if test_decimal_and_fixed
1437
else []
1438
),
1439
*([NestedField(next_field_id(), "UUIDType", UUIDType())] if test_uuid else []),
1440
)
1441
1442
arrow_tbl = pa.Table.from_pydict(
1443
{
1444
"height_provider": [1],
1445
"BooleanType": [True],
1446
"IntegerType": [1],
1447
"LongType": [1],
1448
"FloatType": [1.0],
1449
"DoubleType": [1.0],
1450
"DateType": [date(2025, 1, 1)],
1451
"TimeType": [time(11, 30)],
1452
"TimestampType": [datetime(2025, 1, 1)],
1453
"TimestamptzType": [datetime(2025, 1, 1)],
1454
"StringType": ["A"],
1455
"BinaryType": [b"A"],
1456
**(
1457
{"DecimalType": [D("1.0")], "FixedType": [b"A"]}
1458
if test_decimal_and_fixed
1459
else {}
1460
),
1461
**({"UUIDType": [b"0000111100001111"]} if test_uuid else {}),
1462
},
1463
schema=schema_to_pyarrow(iceberg_schema, include_field_ids=False),
1464
)
1465
1466
tbl = catalog.create_table(
1467
"namespace.table",
1468
iceberg_schema,
1469
partition_spec=PartitionSpec(
1470
# We have this to offset the indices
1471
PartitionField(
1472
iceberg_schema.fields[0].field_id, 0, BucketTransform(32), "bucket"
1473
),
1474
*(
1475
PartitionField(field.field_id, 0, IdentityTransform(), field.name)
1476
for field in iceberg_schema.fields[1:]
1477
),
1478
),
1479
)
1480
1481
if test_uuid:
1482
# Note: If this starts working one day we can include it in tests.
1483
with pytest.raises(
1484
pa.ArrowNotImplementedError,
1485
match=r"Keys of type extension<arrow\.uuid>",
1486
):
1487
tbl.append(arrow_tbl)
1488
1489
return
1490
1491
tbl.append(arrow_tbl)
1492
1493
expect = pl.DataFrame(
1494
[
1495
pl.Series('height_provider', [1], dtype=pl.Int32),
1496
pl.Series('BooleanType', [True], dtype=pl.Boolean),
1497
pl.Series('IntegerType', [1], dtype=pl.Int32),
1498
pl.Series('LongType', [1], dtype=pl.Int64),
1499
pl.Series('FloatType', [1.0], dtype=pl.Float32),
1500
pl.Series('DoubleType', [1.0], dtype=pl.Float64),
1501
pl.Series('DateType', [date(2025, 1, 1)], dtype=pl.Date),
1502
pl.Series('TimeType', [time(11, 30)], dtype=pl.Time),
1503
pl.Series('TimestampType', [datetime(2025, 1, 1, 0, 0)], dtype=pl.Datetime(time_unit='us', time_zone=None)),
1504
pl.Series('TimestamptzType', [datetime(2025, 1, 1, 0, 0, tzinfo=zoneinfo.ZoneInfo(key='UTC'))], dtype=pl.Datetime(time_unit='us', time_zone='UTC')),
1505
pl.Series('StringType', ['A'], dtype=pl.String),
1506
pl.Series('BinaryType', [b'A'], dtype=pl.Binary),
1507
*(
1508
[
1509
pl.Series('DecimalType', [D('1.00')], dtype=pl.Decimal(precision=18, scale=2)),
1510
pl.Series('FixedType', [b'A'], dtype=pl.Binary),
1511
]
1512
if test_decimal_and_fixed
1513
else []
1514
),
1515
]
1516
) # fmt: skip
1517
1518
assert_frame_equal(
1519
pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(),
1520
expect,
1521
)
1522
1523
assert_frame_equal(
1524
pl.scan_iceberg(tbl, reader_override="native").collect(),
1525
expect,
1526
)
1527
1528
dfiles = [*tbl.scan().plan_files()]
1529
1530
assert len(dfiles) == 1
1531
1532
p = dfiles[0].file.file_path.removeprefix("file://")
1533
1534
# Drop every column except 'height_provider'
1535
pq.write_table(
1536
pa.Table.from_pydict(
1537
{"height_provider": [1]},
1538
schema=schema_to_pyarrow(iceberg_schema.select("height_provider")),
1539
),
1540
p,
1541
)
1542
1543
out = pl.DataFrame(tbl.scan().to_arrow())
1544
1545
assert_frame_equal(
1546
out.select(pl.col(c).cast(dt) for c, dt in expect.schema.items()),
1547
expect,
1548
)
1549
1550
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), expect)
1551
1552
1553
@pytest.mark.write_disk
1554
def test_fill_missing_fields_with_identity_partition_values_nested(
1555
tmp_path: Path,
1556
) -> None:
1557
catalog = SqlCatalog(
1558
"default",
1559
uri="sqlite:///:memory:",
1560
warehouse=format_file_uri_iceberg(tmp_path),
1561
)
1562
catalog.create_namespace("namespace")
1563
1564
next_field_id = partial(next, itertools.count(1))
1565
1566
iceberg_schema = IcebergSchema(
1567
NestedField(next_field_id(), "height_provider", IntegerType()),
1568
NestedField(
1569
next_field_id(),
1570
"struct_1",
1571
StructType(
1572
NestedField(
1573
next_field_id(),
1574
"struct_2",
1575
StructType(NestedField(2001, "field_1", LongType())),
1576
)
1577
),
1578
),
1579
)
1580
1581
tbl = catalog.create_table(
1582
"namespace.table",
1583
iceberg_schema,
1584
partition_spec=PartitionSpec(
1585
PartitionField(2001, 0, IdentityTransform(), "field_1")
1586
),
1587
)
1588
1589
pl.DataFrame(
1590
{"height_provider": [0], "struct_1": [{"struct_2": {"field_1": 300}}]},
1591
schema=pl.Schema(iceberg_schema.as_arrow()),
1592
).write_iceberg(tbl, mode="append")
1593
1594
expect = pl.DataFrame(
1595
[
1596
pl.Series("height_provider", [0], dtype=pl.Int32),
1597
pl.Series(
1598
"struct_1",
1599
[{"struct_2": {"field_1": 300}}],
1600
dtype=pl.Struct({"struct_2": pl.Struct({"field_1": pl.Int64})}),
1601
),
1602
]
1603
)
1604
1605
assert_frame_equal(
1606
pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(),
1607
expect,
1608
)
1609
1610
assert_frame_equal(
1611
pl.scan_iceberg(tbl, reader_override="native").collect(),
1612
expect,
1613
)
1614
1615
# Note: We will still match even if the partition field is renamed, since it still
1616
# has the same source field ID.
1617
with tbl.update_spec() as spu:
1618
spu.rename_field("field_1", "AAA")
1619
1620
pl.DataFrame(
1621
{"height_provider": [None], "struct_1": [{"struct_2": {"field_1": 301}}]},
1622
schema=pl.Schema(iceberg_schema.as_arrow()),
1623
).write_iceberg(tbl, mode="append")
1624
1625
with tbl.update_spec() as spu:
1626
spu.remove_field("AAA")
1627
1628
pl.DataFrame(
1629
{"height_provider": [None], "struct_1": [{"struct_2": {"field_1": 302}}]},
1630
schema=pl.Schema(iceberg_schema.as_arrow()),
1631
).write_iceberg(tbl, mode="append")
1632
1633
for i, data_file in enumerate(tbl.scan().plan_files()):
1634
p = data_file.file.file_path.removeprefix("file://")
1635
1636
pq.write_table(
1637
pa.Table.from_pydict(
1638
{"height_provider": [i]},
1639
schema=schema_to_pyarrow(iceberg_schema.select("height_provider")),
1640
),
1641
p,
1642
)
1643
1644
# Deleting partitions only takes effect for newly added files.
1645
expect = pl.DataFrame(
1646
[
1647
pl.Series("height_provider", [0, 1, 2], dtype=pl.Int32),
1648
pl.Series(
1649
"struct_1",
1650
[
1651
None,
1652
{"struct_2": {"field_1": 301}},
1653
{"struct_2": {"field_1": 300}},
1654
],
1655
dtype=pl.Struct({"struct_2": pl.Struct({"field_1": pl.Int64})}),
1656
),
1657
]
1658
)
1659
1660
assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), expect)
1661
assert_frame_equal(
1662
pl.scan_iceberg(tbl, reader_override="native").select("struct_1").collect(),
1663
expect.select("struct_1"),
1664
)
1665
1666
1667
@pytest.mark.write_disk
1668
def test_scan_iceberg_min_max_statistics_filter(
1669
tmp_path: Path,
1670
plmonkeypatch: PlMonkeyPatch,
1671
capfd: pytest.CaptureFixture[str],
1672
) -> None:
1673
import datetime
1674
1675
catalog = SqlCatalog(
1676
"default",
1677
uri="sqlite:///:memory:",
1678
warehouse=format_file_uri_iceberg(tmp_path),
1679
)
1680
catalog.create_namespace("namespace")
1681
1682
test_decimal_and_fixed = parse_version(pyiceberg.__version__) >= (0, 10, 0)
1683
1684
next_field_id = partial(next, itertools.count(1))
1685
1686
iceberg_schema = IcebergSchema(
1687
NestedField(next_field_id(), "height_provider", IntegerType()),
1688
NestedField(next_field_id(), "BooleanType", BooleanType()),
1689
NestedField(next_field_id(), "IntegerType", IntegerType()),
1690
NestedField(next_field_id(), "LongType", LongType()),
1691
NestedField(next_field_id(), "FloatType", FloatType()),
1692
NestedField(next_field_id(), "DoubleType", DoubleType()),
1693
NestedField(next_field_id(), "DateType", DateType()),
1694
NestedField(next_field_id(), "TimeType", TimeType()),
1695
NestedField(next_field_id(), "TimestampType", TimestampType()),
1696
NestedField(next_field_id(), "TimestamptzType", TimestamptzType()),
1697
NestedField(next_field_id(), "StringType", StringType()),
1698
NestedField(next_field_id(), "BinaryType", BinaryType()),
1699
*(
1700
[
1701
NestedField(next_field_id(), "DecimalType", DecimalType(18, 2)),
1702
NestedField(
1703
next_field_id(), "DecimalTypeLargeValue", DecimalType(38, 0)
1704
),
1705
NestedField(
1706
next_field_id(), "DecimalTypeLargeNegativeValue", DecimalType(38, 0)
1707
),
1708
NestedField(next_field_id(), "FixedType", FixedType(1)),
1709
]
1710
if test_decimal_and_fixed
1711
else []
1712
),
1713
)
1714
1715
pl_schema = pl.Schema(
1716
{
1717
"height_provider": pl.Int32(),
1718
"BooleanType": pl.Boolean(),
1719
"IntegerType": pl.Int32(),
1720
"LongType": pl.Int64(),
1721
"FloatType": pl.Float32(),
1722
"DoubleType": pl.Float64(),
1723
"DateType": pl.Date(),
1724
"TimeType": pl.Time(),
1725
"TimestampType": pl.Datetime(time_unit="us", time_zone=None),
1726
"TimestamptzType": pl.Datetime(time_unit="us", time_zone="UTC"),
1727
"StringType": pl.String(),
1728
"BinaryType": pl.Binary(),
1729
"DecimalType": pl.Decimal(precision=18, scale=2),
1730
"DecimalTypeLargeValue": pl.Decimal(precision=38, scale=0),
1731
"DecimalTypeLargeNegativeValue": pl.Decimal(precision=38, scale=0),
1732
"FixedType": pl.Binary(),
1733
}
1734
)
1735
1736
df_dict = {
1737
"height_provider": [1],
1738
"BooleanType": [True],
1739
"IntegerType": [1],
1740
"LongType": [1],
1741
"FloatType": [1.0],
1742
"DoubleType": [1.0],
1743
"DateType": [datetime.date(2025, 1, 1)],
1744
"TimeType": [datetime.time(11, 30)],
1745
"TimestampType": [datetime.datetime(2025, 1, 1)],
1746
"TimestamptzType": [datetime.datetime(2025, 1, 1)],
1747
"StringType": ["A"],
1748
"BinaryType": [b"A"],
1749
**(
1750
{
1751
"DecimalType": [D("1.00")],
1752
# This helps ensure loads are done with the correct endianness.
1753
"DecimalTypeLargeValue": [D("73377733337777733333377777773333333377")],
1754
"DecimalTypeLargeNegativeValue": [
1755
D("-73377733337777733333377777773333333377")
1756
],
1757
"FixedType": [b"A"],
1758
}
1759
if test_decimal_and_fixed
1760
else {}
1761
),
1762
}
1763
1764
arrow_tbl = pa.Table.from_pydict(
1765
df_dict,
1766
schema=schema_to_pyarrow(iceberg_schema, include_field_ids=False),
1767
)
1768
1769
tbl = catalog.create_table(
1770
"namespace.table",
1771
iceberg_schema,
1772
partition_spec=PartitionSpec(
1773
# We have this to offset the indices
1774
PartitionField(
1775
iceberg_schema.fields[0].field_id, 0, BucketTransform(32), "bucket"
1776
),
1777
*(
1778
PartitionField(field.field_id, 0, IdentityTransform(), field.name)
1779
for field in iceberg_schema.fields[1:]
1780
),
1781
),
1782
)
1783
1784
tbl.append(arrow_tbl)
1785
1786
expect = pl.DataFrame(df_dict, schema=pl_schema)
1787
1788
assert_frame_equal(
1789
pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(),
1790
expect,
1791
)
1792
1793
assert_frame_equal(
1794
pl.scan_iceberg(tbl, reader_override="native").collect(),
1795
expect,
1796
)
1797
1798
# Begin inspecting statistics
1799
1800
scan_data = new_pl_iceberg_dataset(tbl)._to_dataset_scan_impl()
1801
1802
assert isinstance(scan_data, _NativeIcebergScanData)
1803
assert scan_data.statistics_loader is None
1804
assert scan_data.min_max_statistics is None
1805
1806
scan_data = new_pl_iceberg_dataset(tbl)._to_dataset_scan_impl(
1807
filter_columns=["height_provider"]
1808
)
1809
1810
assert isinstance(scan_data, _NativeIcebergScanData)
1811
assert scan_data.min_max_statistics is not None
1812
1813
min_max_values = scan_data.min_max_statistics.with_columns(
1814
pl.all().cast(pl.String)
1815
).transpose(include_header=True)
1816
1817
assert_frame_equal(
1818
min_max_values,
1819
pl.DataFrame(
1820
[
1821
("len", "1"),
1822
("height_provider_nc", "0"),
1823
("height_provider_min", "1"),
1824
("height_provider_max", "1"),
1825
],
1826
orient="row",
1827
schema=min_max_values.schema,
1828
),
1829
)
1830
1831
scan_data = new_pl_iceberg_dataset(tbl)._to_dataset_scan_impl(
1832
filter_columns=pl_schema.names()
1833
)
1834
1835
assert isinstance(scan_data, _NativeIcebergScanData)
1836
assert scan_data.statistics_loader is not None
1837
1838
non_coalesced_min_max_values = (
1839
scan_data.statistics_loader.finish(len(scan_data.sources), {})
1840
.with_columns(pl.all().cast(pl.String))
1841
.transpose(include_header=True)
1842
)
1843
1844
assert_frame_equal(
1845
non_coalesced_min_max_values,
1846
pl.DataFrame(
1847
[
1848
("len", "1"),
1849
("height_provider_nc", "0"),
1850
("height_provider_min", "1"),
1851
("height_provider_max", "1"),
1852
("BooleanType_nc", "0", "1"),
1853
("BooleanType_min", "true"),
1854
("BooleanType_max", "true"),
1855
("IntegerType_nc", "0", "1"),
1856
("IntegerType_min", "1"),
1857
("IntegerType_max", "1"),
1858
("LongType_nc", "0", "1"),
1859
("LongType_min", "1"),
1860
("LongType_max", "1"),
1861
("FloatType_nc", "0", "1"),
1862
("FloatType_min", None),
1863
("FloatType_max", None),
1864
("DoubleType_nc", "0", "1"),
1865
("DoubleType_min", None),
1866
("DoubleType_max", None),
1867
("DateType_nc", "0", "1"),
1868
("DateType_min", "2025-01-01"),
1869
("DateType_max", "2025-01-01"),
1870
("TimeType_nc", "0", "1"),
1871
("TimeType_min", "11:30:00"),
1872
("TimeType_max", "11:30:00"),
1873
("TimestampType_nc", "0", "1"),
1874
("TimestampType_min", "2025-01-01 00:00:00.000000"),
1875
("TimestampType_max", "2025-01-01 00:00:00.000000"),
1876
("TimestamptzType_nc", "0", "1"),
1877
("TimestamptzType_min", "2025-01-01 00:00:00.000000+00:00"),
1878
("TimestamptzType_max", "2025-01-01 00:00:00.000000+00:00"),
1879
("StringType_nc", "0"),
1880
("StringType_min", "A"),
1881
("StringType_max", "A"),
1882
("BinaryType_nc", "0"),
1883
("BinaryType_min", "A"),
1884
("BinaryType_max", "A"),
1885
("DecimalType_nc", "0"),
1886
("DecimalType_min", "1.00"),
1887
("DecimalType_max", "1.00"),
1888
("DecimalTypeLargeValue_nc", "0", "1"),
1889
("DecimalTypeLargeValue_min", "73377733337777733333377777773333333377"),
1890
("DecimalTypeLargeValue_max", "73377733337777733333377777773333333377"),
1891
("DecimalTypeLargeNegativeValue_nc", "0"),
1892
(
1893
"DecimalTypeLargeNegativeValue_min",
1894
"-73377733337777733333377777773333333377",
1895
),
1896
(
1897
"DecimalTypeLargeNegativeValue_max",
1898
"-73377733337777733333377777773333333377",
1899
),
1900
("FixedType_nc", "0"),
1901
("FixedType_min", "A"),
1902
("FixedType_max", "A"),
1903
],
1904
orient="row",
1905
schema=non_coalesced_min_max_values.schema,
1906
),
1907
)
1908
1909
assert scan_data.min_max_statistics is not None
1910
1911
coalesced_min_max_values = scan_data.min_max_statistics.with_columns(
1912
pl.all().cast(pl.String)
1913
).transpose(include_header=True)
1914
1915
coalesced_ne_non_coalesced = pl.concat(
1916
[
1917
non_coalesced_min_max_values.select(
1918
pl.struct(pl.all()).alias("non_coalesced")
1919
),
1920
coalesced_min_max_values.select(pl.struct(pl.all()).alias("coalesced")),
1921
],
1922
how="horizontal",
1923
).filter(pl.first() != pl.last())
1924
1925
# Float statistics are available after coalescing from an identity partition field.
1926
assert_frame_equal(
1927
coalesced_ne_non_coalesced,
1928
pl.DataFrame(
1929
[
1930
(
1931
{"column": "FloatType_min", "column_0": None},
1932
{"column": "FloatType_min", "column_0": "1.0"},
1933
),
1934
(
1935
{"column": "FloatType_max", "column_0": None},
1936
{"column": "FloatType_max", "column_0": "1.0"},
1937
),
1938
(
1939
{"column": "DoubleType_min", "column_0": None},
1940
{"column": "DoubleType_min", "column_0": "1.0"},
1941
),
1942
(
1943
{"column": "DoubleType_max", "column_0": None},
1944
{"column": "DoubleType_max", "column_0": "1.0"},
1945
),
1946
],
1947
orient="row",
1948
schema=coalesced_ne_non_coalesced.schema,
1949
),
1950
)
1951
1952
dfiles = [x.file.file_path for x in tbl.scan().plan_files()]
1953
assert len(dfiles) == 1
1954
1955
Path(dfiles[0].removeprefix("file://")).unlink()
1956
1957
expect_file_not_found_err = pytest.raises(
1958
OSError,
1959
match=(
1960
"The system cannot find the file specified"
1961
if sys.platform == "win32"
1962
else "No such file or directory"
1963
),
1964
)
1965
1966
with expect_file_not_found_err:
1967
pl.scan_iceberg(tbl, reader_override="native").collect()
1968
1969
iceberg_table_filter_seen = False
1970
1971
def ensure_filter_skips_file(filter_expr: pl.Expr) -> None:
        nonlocal iceberg_table_filter_seen

        with plmonkeypatch.context() as cx:
            cx.setenv("POLARS_VERBOSE", "1")
            capfd.readouterr()

            assert_frame_equal(
                pl.scan_iceberg(tbl, reader_override="native").filter(filter_expr),
                pl.LazyFrame(schema=pl_schema),
            )

            capture = capfd.readouterr().err

            if "iceberg_table_filter: Some(<redacted>)" in capture:
                assert "allows skipping 0 / 0 files" in capture
                assert (
                    "apply_scan_predicate_to_scan_ir: PredicateFileSkip { no_residual_predicate: false, original_len: 0 }"
                    in capture
                )

                # Scanning with pyiceberg can also skip the file if the predicate
                # can be converted.
                assert_frame_equal(
                    pl.scan_iceberg(tbl, reader_override="pyiceberg").filter(
                        filter_expr
                    ),
                    pl.LazyFrame(schema=pl_schema),
                )

                iceberg_table_filter_seen = True
            else:
                assert "allows skipping 1 / 1 files" in capture
                assert (
                    "apply_scan_predicate_to_scan_ir: PredicateFileSkip { no_residual_predicate: false, original_len: 1 }"
                    in capture
                )

            capfd.readouterr()
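
            # With a row index attached, the filter must not be pushed to
            # pyiceberg as a table filter (the log should not report one), but
            # the scan should still come back empty.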
            assert_frame_equal(
                pl.scan_iceberg(tbl, reader_override="native")
                .with_row_index()
                .filter(filter_expr),
                pl.LazyFrame(schema=pl_schema).with_row_index(),
            )

            capture = capfd.readouterr().err

            assert "iceberg_table_filter: Some(<redacted>)" not in capture

    # Check different operators
    ensure_filter_skips_file(pl.col("IntegerType") > 1)
    ensure_filter_skips_file(pl.col("IntegerType") != 1)
    ensure_filter_skips_file(pl.col("IntegerType").is_in([0]))

    # Ensure `use_metadata_statistics=False` does not skip based on statistics
    with expect_file_not_found_err:
        pl.scan_iceberg(
            tbl,
            reader_override="native",
            use_metadata_statistics=False,
        ).filter(pl.col("IntegerType") > 1).collect()
    with expect_file_not_found_err:
        pickle.loads(
            pickle.dumps(
                pl.scan_iceberg(
                    tbl,
                    reader_override="native",
                    use_metadata_statistics=False,
                ).filter(pl.col("IntegerType") > 1)
            )
        ).collect()

    # Check different types
    ensure_filter_skips_file(pl.col("BooleanType") < True)
    ensure_filter_skips_file(pl.col("IntegerType") < 1)
    ensure_filter_skips_file(pl.col("LongType") < 1)
    ensure_filter_skips_file(pl.col("FloatType") < 1.0)
    ensure_filter_skips_file(pl.col("DoubleType") < 1.0)
    ensure_filter_skips_file(pl.col("DateType") < datetime.date(2025, 1, 1))
    ensure_filter_skips_file(pl.col("TimeType") < datetime.time(11, 30))
    ensure_filter_skips_file(pl.col("TimestampType") < datetime.datetime(2025, 1, 1))
    ensure_filter_skips_file(
        pl.col("TimestamptzType")
        < pl.lit(datetime.datetime(2025, 1, 1), dtype=pl.Datetime("ms", "UTC"))
    )
    ensure_filter_skips_file(pl.col("StringType") < "A")
    ensure_filter_skips_file(pl.col("BinaryType") < b"A")
    ensure_filter_skips_file(pl.col("DecimalType") < D("1.00"))
    ensure_filter_skips_file(
        pl.col("DecimalTypeLargeValue") < D("73377733337777733333377777773333333377")
    )
    ensure_filter_skips_file(
        pl.col("DecimalTypeLargeNegativeValue")
        < D("-73377733337777733333377777773333333377")
    )
    ensure_filter_skips_file(pl.col("FixedType") < b"A")

    # Check the row index. It gets a null_count statistic of 0, so a filter on
    # `index.is_null()` can still skip the file.
    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="native")
        .with_row_index()
        .filter(pl.col("index").is_null()),
        pl.LazyFrame(schema={"index": pl.get_index_type(), **pl_schema}),
    )
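
    # At least one of the predicates above must have taken the pyiceberg
    # table-filter branch in `ensure_filter_skips_file`.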
    assert iceberg_table_filter_seen


@pytest.mark.write_disk
def test_scan_iceberg_categorical_24140(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=format_file_uri_iceberg(tmp_path),
    )
    catalog.create_namespace("namespace")

    next_field_id = partial(next, itertools.count(1))

    iceberg_schema = IcebergSchema(
        NestedField(
            next_field_id(),
            "values",
            StringType(),
        ),
    )

    tbl = catalog.create_table("namespace.table", iceberg_schema)

    df = pl.DataFrame(
        {"values": "A"},
        schema={"values": pl.Categorical()},
    )

    arrow_tbl = df.to_arrow()

    arrow_type = arrow_tbl.schema.field("values").type
    assert arrow_type.index_type == pa.uint32()
    assert arrow_type.value_type == pa.large_string()
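
    # Iceberg has no categorical/dictionary type, so the dictionary-encoded column
    # is written as plain strings and should read back as `pl.String`.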
    tbl.append(arrow_tbl)

    expect = pl.DataFrame({"values": "A"}, schema={"values": pl.String})

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="native").collect(),
        expect,
    )


@pytest.mark.write_disk
def test_scan_iceberg_fast_count(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=format_file_uri_iceberg(tmp_path),
    )
    catalog.create_namespace("namespace")

    catalog.create_table(
        "namespace.table",
        IcebergSchema(NestedField(1, "a", LongType())),
    )

    tbl = catalog.load_table("namespace.table")

    pl.DataFrame({"a": [0, 1, 2, 3, 4]}).write_iceberg(tbl, mode="append")
    assert (
        pl.scan_iceberg(tbl, reader_override="native", use_metadata_statistics=True)
        .select(pl.len())
        .collect()
        .item()
        == 5
    )

    assert (
        pl.scan_iceberg(tbl, reader_override="native", use_metadata_statistics=True)
        .filter(pl.col("a") <= 2)
        .select(pl.len())
        .collect()
        .item()
        == 3
    )

    assert (
        pl.scan_iceberg(tbl, reader_override="native", use_metadata_statistics=True)
        .head(3)
        .select(pl.len())
        .collect()
        .item()
        == 3
    )

    assert (
        pl.scan_iceberg(tbl, reader_override="native", use_metadata_statistics=True)
        .slice(1, 3)
        .select(pl.len())
        .collect()
        .item()
        == 3
    )

    dfiles = [*tbl.scan().plan_files()]

    assert len(dfiles) == 1

    p = dfiles[0].file.file_path.removeprefix("file://")

    # Overwrite the data file with one that has a different number of rows
    pq.write_table(
        pa.Table.from_pydict(
            {"a": [0, 1, 2]},
            schema=schema_to_pyarrow(tbl.schema()),
        ),
        p,
    )

    # `use_metadata_statistics=False` should disable sourcing the row count from
    # Iceberg metadata.
    assert (
        pl.scan_iceberg(tbl, reader_override="native", use_metadata_statistics=False)
        .select(pl.len())
        .collect()
        .item()
        == 3
    )
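
    # The same must hold after a pickle round-trip of the query.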
    assert (
        pickle.loads(
            pickle.dumps(
                pl.scan_iceberg(
                    tbl, reader_override="native", use_metadata_statistics=False
                ).select(pl.len())
            )
        )
        .collect()
        .item()
        == 3
    )

    Path(p).unlink()

    with pytest.raises(
        OSError,
        match=(
            "The system cannot find the file specified"
            if sys.platform == "win32"
            else "No such file or directory"
        ),
    ):
        pl.scan_iceberg(tbl, reader_override="native").collect()

    # `select(len())` should be able to return the result from the Iceberg metadata
    # without looking at the underlying data files.
    assert (
        pl.scan_iceberg(tbl, reader_override="native").select(pl.len()).collect().item()
        == 5
    )


def test_scan_iceberg_idxsize_limit() -> None:
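    # `_row_count=(1 << 32, 0)` injects a row count one past the 32-bit IdxSize
    # maximum (4294967295); this only works on a 64-bit index build, otherwise
    # materializing the length must raise (see the error match below).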
    if isinstance(pl.get_index_type(), pl.UInt64):
        assert (
            pl.scan_parquet([b""], schema={}, _row_count=(1 << 32, 0))
            .select(pl.len())
            .collect()
            .item()
            == 1 << 32
        )

        return

    f = io.BytesIO()

    pl.DataFrame({"x": 1}).write_parquet(f)

    q = pl.scan_parquet([f.getvalue()], schema={"x": pl.Int64}, _row_count=(1 << 32, 0))

    assert_frame_equal(q.collect(), pl.DataFrame({"x": 1}))

    with pytest.raises(
        pl.exceptions.ComputeError,
        match=r"row count \(4294967296\) exceeded maximum supported of 4294967295.*Consider installing 'polars\[rt64\]'.",
    ):
        q.select(pl.len()).collect()


@pytest.mark.write_disk
def test_iceberg_filter_bool_26474(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "test", uri="sqlite:///:memory:", warehouse=format_file_uri_iceberg(tmp_path)
    )
    catalog.create_namespace("default")
    tbl = catalog.create_table(
        "default.test",
        IcebergSchema(
            NestedField(1, "id", LongType()),
            NestedField(2, "foo", BooleanType()),
        ),
    )

    schema = {"id": pl.Int64, "foo": pl.Boolean}

    dfs = [
        pl.DataFrame({"id": [1], "foo": [True]}, schema=schema),
        pl.DataFrame({"id": [2], "foo": [False]}, schema=schema),
        pl.DataFrame({"id": [3], "foo": [None]}, schema=schema),
    ]

    for df in dfs:
        df.write_iceberg(tbl, mode="append")

    assert sum(1 for _ in tbl.scan().plan_files()) == 3

    dfs_concat = pl.concat(dfs)
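
    # Each boolean predicate is evaluated both through `scan_iceberg` (where it
    # may be pushed down) and directly on the in-memory frame; results must match.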
    for predicate in [
        pl.col("foo"),
        ~pl.col("foo"),
        pl.col("foo") & pl.col("foo"),
        pl.col("foo") | pl.col("foo"),
        pl.col("foo") ^ pl.col("foo"),
        pl.col("foo") & ~pl.col("foo"),
        pl.col("foo") | ~pl.col("foo"),
        pl.col("foo") ^ pl.col("foo"),
        pl.col("foo") & pl.col("foo") | pl.col("foo"),
        pl.col("foo") | pl.col("foo") & pl.col("foo"),
        pl.col("foo") == True,  # noqa: E712
        pl.col("foo") == False,  # noqa: E712
    ]:
        assert_frame_equal(
            pl.scan_iceberg(tbl).filter(predicate).collect(),
            dfs_concat.filter(predicate),
            check_row_order=False,
        )


@pytest.mark.write_disk
def test_scan_iceberg_partial_and_pushdown(
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
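    # POLARS_VERBOSE_SENSITIVE makes the scan log the pyarrow predicate handed to
    # pyiceberg, which is inspected via `capfd` below.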
    plmonkeypatch.setenv("POLARS_VERBOSE_SENSITIVE", "1")

    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=format_file_uri_iceberg(tmp_path),
    )
    catalog.create_namespace("namespace")
    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(1, "a", LongType()),
            NestedField(2, "b", DoubleType()),
        ),
    )
    tbl = catalog.load_table("namespace.table")
    pl.DataFrame(
        {"a": [1, 2, 3], "b": [10.0, 20.0, 30.0]},
    ).write_iceberg(tbl, mode="append")

    # `a > 1` can be converted to a pyarrow predicate and pushed to pyiceberg;
    # the cast on `b` cannot, so it remains as a residual polars filter.
    q = pl.scan_iceberg(tbl).filter(
        (pl.col("a") > 1) & (pl.col("b").cast(pl.Int64) > 0)
    )

    capfd.readouterr()
    result = q.collect()
    capture = capfd.readouterr().err

    # Verify: partial predicate was pushed
    assert "pyarrow_predicate = " in capture
    assert "pa.compute.field('a') > 1" in capture
    # Verify: correctness
    assert len(result) == 2
    assert result["a"].to_list() == [2, 3]