GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_iceberg.py
# mypy: disable-error-code="attr-defined"
from __future__ import annotations

import contextlib
import itertools
import os
import zoneinfo
from datetime import date, datetime
from decimal import Decimal as D
from functools import partial
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq
import pyiceberg
import pytest
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import (
    BucketTransform,
    IdentityTransform,
    PartitionField,
    PartitionSpec,
)
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    FixedType,
    FloatType,
    IntegerType,
    ListType,
    LongType,
    MapType,
    NestedField,
    StringType,
    StructType,
    TimestampType,
    TimestamptzType,
    TimeType,
    UUIDType,
)

import polars as pl
from polars._utils.various import parse_version
from polars.io.iceberg._utils import _convert_predicate, _to_ast
from polars.io.iceberg.dataset import IcebergDataset
from polars.testing import assert_frame_equal


@pytest.fixture
def iceberg_path(io_files_path: Path) -> str:
    # Iceberg requires absolute paths, so we'll symlink
    # the test table into /tmp/iceberg/t1/
    Path("/tmp/iceberg").mkdir(parents=True, exist_ok=True)
    current_path = Path(__file__).parent.resolve()

    with contextlib.suppress(FileExistsError):
        os.symlink(f"{current_path}/files/iceberg-table", "/tmp/iceberg/t1")

    iceberg_path = io_files_path / "iceberg-table" / "metadata" / "v2.metadata.json"
    return f"file://{iceberg_path.resolve()}"


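# Illustrative sketch (not part of the test suite): `pl.scan_iceberg` accepts either
# a URI pointing at a table metadata JSON file (as returned by the fixture above) or
# a pyiceberg `Table` object. The path below is a hypothetical placeholder:
#
#   lf = pl.scan_iceberg("file:///path/to/table/metadata/v2.metadata.json")
#   df = lf.filter(pl.col("id") > 1).collect()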
@pytest.mark.slow
@pytest.mark.write_disk
@pytest.mark.filterwarnings(
    "ignore:No preferred file implementation for scheme*:UserWarning"
)
@pytest.mark.ci_only
class TestIcebergScanIO:
    """Test coverage for `iceberg` scan ops."""

    def test_scan_iceberg_plain(self, iceberg_path: str) -> None:
        q = pl.scan_iceberg(iceberg_path)
        assert len(q.collect()) == 3
        assert q.collect_schema() == {
            "id": pl.Int32,
            "str": pl.String,
            "ts": pl.Datetime(time_unit="us", time_zone=None),
        }

    def test_scan_iceberg_snapshot_id(self, iceberg_path: str) -> None:
        q = pl.scan_iceberg(iceberg_path, snapshot_id=7051579356916758811)
        assert len(q.collect()) == 3
        assert q.collect_schema() == {
            "id": pl.Int32,
            "str": pl.String,
            "ts": pl.Datetime(time_unit="us", time_zone=None),
        }

    def test_scan_iceberg_snapshot_id_not_found(self, iceberg_path: str) -> None:
        with pytest.raises(ValueError, match="snapshot ID not found"):
            pl.scan_iceberg(iceberg_path, snapshot_id=1234567890).collect()

    def test_scan_iceberg_filter_on_partition(self, iceberg_path: str) -> None:
        ts1 = datetime(2023, 3, 1, 18, 15)
        ts2 = datetime(2023, 3, 1, 19, 25)
        ts3 = datetime(2023, 3, 2, 22, 0)

        lf = pl.scan_iceberg(iceberg_path)

        res = lf.filter(pl.col("ts") >= ts2)
        assert len(res.collect()) == 2

        res = lf.filter(pl.col("ts") > ts2).select(pl.col("id"))
        assert res.collect().rows() == [(3,)]

        res = lf.filter(pl.col("ts") <= ts2).select("id", "ts")
        assert res.collect().rows(named=True) == [
            {"id": 1, "ts": ts1},
            {"id": 2, "ts": ts2},
        ]

        res = lf.filter(pl.col("ts") > ts3)
        assert len(res.collect()) == 0

        for constraint in (
            (pl.col("ts") == ts1) | (pl.col("ts") == ts3),
            pl.col("ts").is_in([ts1, ts3]),
        ):
            res = lf.filter(constraint).select("id")
            assert res.collect().rows() == [(1,), (3,)]

    def test_scan_iceberg_filter_on_column(self, iceberg_path: str) -> None:
        lf = pl.scan_iceberg(iceberg_path)
        res = lf.filter(pl.col("id") < 2)
        assert res.collect().rows() == [(1, "1", datetime(2023, 3, 1, 18, 15))]

        res = lf.filter(pl.col("id") == 2)
        assert res.collect().rows() == [(2, "2", datetime(2023, 3, 1, 19, 25))]

        res = lf.filter(pl.col("id").is_in([1, 3]))
        assert res.collect().rows() == [
            (1, "1", datetime(2023, 3, 1, 18, 15)),
            (3, "3", datetime(2023, 3, 2, 22, 0)),
        ]


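# Illustrative note on the helpers exercised below (a sketch, not an API contract):
# `_to_ast` parses the string form of a pyarrow compute expression into a Python
# `ast` node, and `_convert_predicate` folds that node into the equivalent
# `pyiceberg.expressions` predicate, roughly:
#
#   _convert_predicate(_to_ast("(pa.compute.field('id') > 10)"))
#   # -> GreaterThan("id", 10)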
@pytest.mark.ci_only
class TestIcebergExpressions:
    """Test coverage for `iceberg` expressions comprehension."""

    def test_is_null_expression(self) -> None:
        from pyiceberg.expressions import IsNull

        expr = _to_ast("(pa.compute.field('id')).is_null()")
        assert _convert_predicate(expr) == IsNull("id")

    def test_is_not_null_expression(self) -> None:
        from pyiceberg.expressions import IsNull, Not

        expr = _to_ast("~(pa.compute.field('id')).is_null()")
        assert _convert_predicate(expr) == Not(IsNull("id"))

    def test_isin_expression(self) -> None:
        from pyiceberg.expressions import In, literal

        expr = _to_ast("(pa.compute.field('id')).isin([1,2,3])")
        assert _convert_predicate(expr) == In(
            "id", {literal(1), literal(2), literal(3)}
        )

    def test_parse_combined_expression(self) -> None:
        from pyiceberg.expressions import (
            And,
            EqualTo,
            GreaterThan,
            In,
            Or,
            Reference,
            literal,
        )

        expr = _to_ast(
            "(((pa.compute.field('str') == '2') & (pa.compute.field('id') > 10)) | (pa.compute.field('id')).isin([1,2,3]))"
        )
        assert _convert_predicate(expr) == Or(
            left=And(
                left=EqualTo(term=Reference(name="str"), literal=literal("2")),
                right=GreaterThan(term="id", literal=literal(10)),
            ),
            right=In("id", {literal(1), literal(2), literal(3)}),
        )

    def test_parse_gt(self) -> None:
        from pyiceberg.expressions import GreaterThan

        expr = _to_ast("(pa.compute.field('ts') > '2023-08-08')")
        assert _convert_predicate(expr) == GreaterThan("ts", "2023-08-08")

    def test_parse_gteq(self) -> None:
        from pyiceberg.expressions import GreaterThanOrEqual

        expr = _to_ast("(pa.compute.field('ts') >= '2023-08-08')")
        assert _convert_predicate(expr) == GreaterThanOrEqual("ts", "2023-08-08")

    def test_parse_eq(self) -> None:
        from pyiceberg.expressions import EqualTo

        expr = _to_ast("(pa.compute.field('ts') == '2023-08-08')")
        assert _convert_predicate(expr) == EqualTo("ts", "2023-08-08")

    def test_parse_lt(self) -> None:
        from pyiceberg.expressions import LessThan

        expr = _to_ast("(pa.compute.field('ts') < '2023-08-08')")
        assert _convert_predicate(expr) == LessThan("ts", "2023-08-08")

    def test_parse_lteq(self) -> None:
        from pyiceberg.expressions import LessThanOrEqual

        expr = _to_ast("(pa.compute.field('ts') <= '2023-08-08')")
        assert _convert_predicate(expr) == LessThanOrEqual("ts", "2023-08-08")

    def test_compare_boolean(self) -> None:
        from pyiceberg.expressions import EqualTo

        expr = _to_ast("(pa.compute.field('ts') == pa.compute.scalar(True))")
        assert _convert_predicate(expr) == EqualTo("ts", True)

        expr = _to_ast("(pa.compute.field('ts') == pa.compute.scalar(False))")
        assert _convert_predicate(expr) == EqualTo("ts", False)


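# A minimal sketch of the write path exercised by the test below (assumes an
# in-memory pyiceberg SqlCatalog; not executed as part of the suite):
#
#   catalog = SqlCatalog("default", uri="sqlite:///:memory:", warehouse="file:///tmp/wh")
#   catalog.create_namespace("ns")
#   table = catalog.create_table("ns.t", schema=df.to_arrow().schema)
#   df.write_iceberg(table, mode="overwrite")  # replace table contents
#   df.write_iceberg(table, mode="append")     # add rows on top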
@pytest.mark.slow
@pytest.mark.write_disk
@pytest.mark.filterwarnings("ignore:Delete operation did not match any records")
@pytest.mark.filterwarnings(
    "ignore:Iceberg does not have a dictionary type. <class 'pyarrow.lib.DictionaryType'> will be inferred as large_string on read."
)
def test_write_iceberg(df: pl.DataFrame, tmp_path: Path) -> None:
    # time64[ns] type is currently not supported in pyiceberg.
    # https://github.com/apache/iceberg-python/issues/1169
    df = df.drop("time", "cat", "enum")

    # in-memory catalog
    catalog = SqlCatalog(
        "default", uri="sqlite:///:memory:", warehouse=f"file://{tmp_path}"
    )
    catalog.create_namespace("foo")
    table = catalog.create_table(
        "foo.bar",
        schema=df.to_arrow().schema,
    )

    df.write_iceberg(table, mode="overwrite")
    actual = pl.scan_iceberg(table).collect()

    assert_frame_equal(df, actual)

    # append on top of already written data, expecting twice the data
    df.write_iceberg(table, mode="append")
    # double the `df` by vertically stacking the dataframe on top of itself
    expected = df.vstack(df)
    actual = pl.scan_iceberg(table).collect()
    assert_frame_equal(expected, actual, check_dtypes=False)


@pytest.mark.write_disk
def test_scan_iceberg_row_index_renamed(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(1, "row_index", IntegerType()),
            NestedField(2, "file_path", StringType()),
        ),
    )

    tbl = catalog.load_table("namespace.table")

    pl.DataFrame(
        {"row_index": [0, 1, 2, 3, 4], "file_path": None},
        schema={"row_index": pl.Int32, "file_path": pl.String},
    ).write_iceberg(tbl, mode="append")

    with tbl.update_schema() as sch:
        sch.rename_column("row_index", "row_index_in_file")
        sch.rename_column("file_path", "file_path_in_file")

    file_paths = [x.file.file_path for x in tbl.scan().plan_files()]
    assert len(file_paths) == 1

    q = pl.scan_parquet(
        file_paths,
        schema={
            "row_index_in_file": pl.Int32,
            "file_path_in_file": pl.String,
        },
        _column_mapping=("iceberg-column-mapping", IcebergDataset(tbl).arrow_schema()),
        include_file_paths="file_path",
        row_index_name="row_index",
        row_index_offset=3,
    )

    assert_frame_equal(
        q.collect().with_columns(
            # To pass Windows CI
            pl.col("file_path").map_elements(
                lambda x: str(Path(x).resolve()),
                return_dtype=pl.String,
            )
        ),
        pl.DataFrame(
            {
                "row_index": [3, 4, 5, 6, 7],
                "row_index_in_file": [0, 1, 2, 3, 4],
                "file_path_in_file": None,
                "file_path": str(Path(file_paths[0]).resolve()),
            },
            schema={
                "row_index": pl.get_index_type(),
                "row_index_in_file": pl.Int32,
                "file_path_in_file": pl.String,
                "file_path": pl.String,
            },
        ),
    )


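# Note on `_column_mapping` (as used above and in several tests below): it is an
# internal, underscore-prefixed `scan_parquet` parameter through which these tests
# hand the Iceberg field-ID mapping (`IcebergDataset(tbl).arrow_schema()`) to the
# native reader, so columns in the data files appear to be matched by field ID
# rather than by name.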
@pytest.mark.write_disk
@pytest.mark.parametrize("reader_override", ["pyiceberg", "native"])
def test_scan_iceberg_collect_without_version_scans_latest(
    tmp_path: Path,
    reader_override: str,
    capfd: pytest.CaptureFixture[str],
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )

    catalog.create_namespace("namespace")

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(1, "a", LongType()),
        ),
    )

    tbl = catalog.load_table("namespace.table")

    q = pl.scan_iceberg(tbl, reader_override=reader_override)  # type: ignore[arg-type]

    assert_frame_equal(q.collect(), pl.DataFrame(schema={"a": pl.Int64}))

    pl.DataFrame({"a": 1}).write_iceberg(tbl, mode="append")

    assert_frame_equal(q.collect(), pl.DataFrame({"a": 1}))

    snapshot = tbl.current_snapshot()
    assert snapshot is not None
    snapshot_id = snapshot.snapshot_id

    q_with_id = pl.scan_iceberg(
        tbl,
        reader_override=reader_override,  # type: ignore[arg-type]
        snapshot_id=snapshot_id,
    )

    assert_frame_equal(q_with_id.collect(), pl.DataFrame({"a": 1}))

    pl.DataFrame({"a": 2}).write_iceberg(tbl, mode="append")

    assert_frame_equal(q.collect(), pl.DataFrame({"a": [2, 1]}))

    monkeypatch.setenv("POLARS_VERBOSE", "1")
    capfd.readouterr()
    assert_frame_equal(q_with_id.collect(), pl.DataFrame({"a": 1}))

    assert (
        "IcebergDataset: to_dataset_scan(): early return (snapshot_id_key = "
        in capfd.readouterr().err
    )


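# The next two tests exercise the `extra_columns` / `missing_columns` parameters and
# `pl.ScanCastOptions(extra_struct_fields=..., missing_struct_fields=...)`: "ignore"
# drops fields that exist only in the file, while "insert" fills fields missing from
# the file with nulls, which is what the expected frames below assert.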
@pytest.mark.write_disk
def test_scan_iceberg_extra_columns(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(1, "a", IntegerType()),
        ),
    )

    tbl = catalog.load_table("namespace.table")

    pl.DataFrame(
        {"a": [0, 1, 2, 3, 4]},
        schema={"a": pl.Int32},
    ).write_iceberg(tbl, mode="append")

    with tbl.update_schema() as sch:
        sch.delete_column("a")
        sch.add_column("a", IntegerType())

    file_paths = [x.file.file_path for x in tbl.scan().plan_files()]
    assert len(file_paths) == 1

    q = pl.scan_parquet(
        file_paths,
        schema={"a": pl.Int32},
        _column_mapping=("iceberg-column-mapping", IcebergDataset(tbl).arrow_schema()),
    )

    # The original column is considered an extra column despite having the same
    # name, as the physical field ID does not match.

    with pytest.raises(
        pl.exceptions.SchemaError,
        match="extra column in file outside of expected schema: a",
    ):
        q.collect()

    q = pl.scan_parquet(
        file_paths,
        schema={"a": pl.Int32},
        _column_mapping=("iceberg-column-mapping", IcebergDataset(tbl).arrow_schema()),
        extra_columns="ignore",
        missing_columns="insert",
    )

    assert_frame_equal(
        q.collect(),
        pl.DataFrame(
            {
                "a": [None, None, None, None, None],
            },
            schema={"a": pl.Int32},
        ),
    )


@pytest.mark.write_disk
def test_scan_iceberg_extra_struct_fields(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(1, "a", StructType(NestedField(1, "a", IntegerType()))),
        ),
    )

    tbl = catalog.load_table("namespace.table")

    pl.DataFrame(
        {"a": [{"a": 1}, {"a": 2}, {"a": 3}, {"a": 4}, {"a": 5}]},
        schema={"a": pl.Struct({"a": pl.Int32})},
    ).write_iceberg(tbl, mode="append")

    with tbl.update_schema() as sch:
        sch.delete_column(("a", "a"))
        sch.add_column(("a", "a"), IntegerType())

    file_paths = [x.file.file_path for x in tbl.scan().plan_files()]
    assert len(file_paths) == 1

    q = pl.scan_parquet(
        file_paths,
        schema={"a": pl.Struct({"a": pl.Int32})},
        _column_mapping=("iceberg-column-mapping", IcebergDataset(tbl).arrow_schema()),
    )

    # The original column is considered an extra column despite having the same
    # name, as the physical field ID does not match.

    with pytest.raises(
        pl.exceptions.SchemaError,
        match="encountered extra struct field: a",
    ):
        q.collect()

    q = pl.scan_parquet(
        file_paths,
        schema={"a": pl.Struct({"a": pl.Int32})},
        _column_mapping=("iceberg-column-mapping", IcebergDataset(tbl).arrow_schema()),
        cast_options=pl.ScanCastOptions(
            extra_struct_fields="ignore", missing_struct_fields="insert"
        ),
    )

    assert_frame_equal(
        q.collect(),
        pl.DataFrame(
            {
                "a": [
                    {"a": None},
                    {"a": None},
                    {"a": None},
                    {"a": None},
                    {"a": None},
                ],
            },
            schema={"a": pl.Struct({"a": pl.Int32})},
        ),
    )


@pytest.mark.write_disk
def test_scan_iceberg_column_deletion(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(1, "a", StructType(NestedField(0, "inner", StringType())))
        ),
    )

    tbl = catalog.load_table("namespace.table")

    pl.DataFrame({"a": [{"inner": "A"}]}).write_iceberg(tbl, mode="append")

    with tbl.update_schema() as sch:
        sch.delete_column("a").add_column(
            "a", StructType(NestedField(0, "inner", StringType()))
        )

    pl.DataFrame({"a": [{"inner": "A"}]}).write_iceberg(tbl, mode="append")

    expect = pl.DataFrame({"a": [{"inner": "A"}, None]})

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(),
        expect,
    )

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="native").collect(),
        expect,
    )


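# Background for the schema-evolution tests below (summarizing what they assert):
# Iceberg resolves columns and nested fields by field ID, so after renames, moves,
# type promotions, or delete-and-re-add cycles, previously written data files are
# still read against the latest table schema by ID rather than by name or position.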
@pytest.mark.write_disk
def test_scan_iceberg_nested_column_cast_deletion_rename(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    next_field_id = partial(next, itertools.count())

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(
                field_id=next_field_id(),
                name="column_1",
                field_type=ListType(
                    element_id=next_field_id(),
                    element=StructType(
                        NestedField(
                            field_id=next_field_id(),
                            name="field_1",
                            field_type=MapType(
                                key_id=next_field_id(),
                                key_type=ListType(
                                    element_id=next_field_id(), element=TimestampType(), element_required=False
                                ),
                                value_id=next_field_id(),
                                value_type=ListType(
                                    element_id=next_field_id(), element=IntegerType(), element_required=False
                                ),
                                value_required=False,
                            ),
                            required=False,
                        ),
                        NestedField(
                            field_id=next_field_id(), name="field_2", field_type=IntegerType(), required=False
                        ),
                        NestedField(
                            field_id=next_field_id(), name="field_3", field_type=StringType(), required=False
                        ),
                    ),
                    element_required=False,
                ),
                required=False,
            ),
            NestedField(field_id=next_field_id(), name="column_2", field_type=StringType(), required=False),
            NestedField(
                field_id=next_field_id(),
                name="column_3",
                field_type=MapType(
                    key_id=next_field_id(),
                    key_type=StructType(
                        NestedField(
                            field_id=next_field_id(), name="field_1", field_type=IntegerType(), required=False
                        ),
                        NestedField(
                            field_id=next_field_id(), name="field_2", field_type=IntegerType(), required=False
                        ),
                        NestedField(
                            field_id=next_field_id(), name="field_3", field_type=IntegerType(), required=False
                        ),
                    ),
                    value_id=next_field_id(),
                    value_type=StructType(
                        NestedField(
                            field_id=next_field_id(), name="field_1", field_type=IntegerType(), required=False
                        ),
                        NestedField(
                            field_id=next_field_id(), name="field_2", field_type=IntegerType(), required=False
                        ),
                        NestedField(
                            field_id=next_field_id(), name="field_3", field_type=IntegerType(), required=False
                        ),
                    ),
                    value_required=False,
                ),
                required=False,
            ),
        ),
    )  # fmt: skip

    tbl = catalog.load_table("namespace.table")

    df_dict = {
        "column_1": [
            [
                {
                    "field_1": [
                        {"key": [datetime(2025, 1, 1), None], "value": [1, 2, None]},
                        {"key": [datetime(2025, 1, 1), None], "value": None},
                    ],
                    "field_2": 7,
                    "field_3": "F3",
                }
            ],
            [
                {
                    "field_1": [{"key": [datetime(2025, 1, 1), None], "value": None}],
                    "field_2": 7,
                    "field_3": "F3",
                }
            ],
            [{"field_1": [], "field_2": None, "field_3": None}],
            [None],
            [],
        ],
        "column_2": ["1", "2", "3", "4", None],
        "column_3": [
            [
                {
                    "key": {"field_1": 1, "field_2": 2, "field_3": 3},
                    "value": {"field_1": 7, "field_2": 8, "field_3": 9},
                }
            ],
            [
                {
                    "key": {"field_1": 1, "field_2": 2, "field_3": 3},
                    "value": {"field_1": 7, "field_2": 8, "field_3": 9},
                }
            ],
            [
                {
                    "key": {"field_1": None, "field_2": None, "field_3": None},
                    "value": {"field_1": None, "field_2": None, "field_3": None},
                }
            ],
            [
                {
                    "key": {"field_1": None, "field_2": None, "field_3": None},
                    "value": None,
                }
            ],
            [],
        ],
    }

    df = pl.DataFrame(
        df_dict,
        schema={
            "column_1": pl.List(
                pl.Struct(
                    {
                        "field_1": pl.List(
                            pl.Struct({"key": pl.List(pl.Datetime("us")), "value": pl.List(pl.Int32)})
                        ),
                        "field_2": pl.Int32,
                        "field_3": pl.String,
                    }
                )
            ),
            "column_2": pl.String,
            "column_3": pl.List(
                pl.Struct(
                    {
                        "key": pl.Struct({"field_1": pl.Int32, "field_2": pl.Int32, "field_3": pl.Int32}),
                        "value": pl.Struct({"field_1": pl.Int32, "field_2": pl.Int32, "field_3": pl.Int32}),
                    }
                )
            ),
        },
    )  # fmt: skip

    # The Iceberg table schema has a `Map` type, whereas the polars DataFrame
    # stores `list[struct{..}]` - directly using `write_iceberg()` causes the
    # following error:
    # * ValueError: PyArrow table contains more columns:
    #   column_1.element.field_1.element
    # We work around this by constructing a pyarrow table with an explicit arrow schema.
    arrow_tbl = pa.Table.from_pydict(
        df_dict,
        schema=pa.schema(
            [
                (
                    "column_1",
                    pa.large_list(
                        pa.struct(
                            [
                                (
                                    "field_1",
                                    pa.map_(pa.large_list(pa.timestamp("us")), pa.large_list(pa.int32())),
                                ),
                                ("field_2", pa.int32()),
                                ("field_3", pa.string()),
                            ]
                        )
                    ),
                ),
                ("column_2", pa.string()),
                (
                    "column_3",
                    pa.map_(
                        pa.struct([("field_1", pa.int32()), ("field_2", pa.int32()), ("field_3", pa.int32())]),
                        pa.struct([("field_1", pa.int32()), ("field_2", pa.int32()), ("field_3", pa.int32())]),
                    ),
                ),
            ]
        ),
    )  # fmt: skip

    assert_frame_equal(pl.DataFrame(arrow_tbl), df)

    tbl.append(arrow_tbl)

    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(), df)
    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), df)

    # Change schema
    # Note: Iceberg doesn't allow modifying the "key" part of the Map type.

    # Promote types
    with tbl.update_schema() as sch:
        sch.update_column(("column_1", "field_2"), LongType())
        sch.update_column(("column_3", "value", "field_1"), LongType())
        sch.update_column(("column_3", "value", "field_2"), LongType())
        sch.update_column(("column_3", "value", "field_3"), LongType())

    # Delete/Rename:
    # * Delete `*_2` fields
    # * Rename:
    #   * `{x}_1` -> `{x}_2`
    #   * `{x}_3` -> `{x}_1`
    #     * And move the field position to 1st

    # Delete `*_2` fields/columns.
    with tbl.update_schema() as sch:
        sch.delete_column("column_2")
        sch.delete_column(("column_3", "value", "field_2"))
        sch.delete_column(("column_1", "field_2"))

    # Shift nested fields in `column_1`
    with tbl.update_schema() as sch:
        sch.rename_column(("column_1", "field_1"), "field_2")

    with tbl.update_schema() as sch:
        sch.rename_column(("column_1", "field_3"), "field_1")

    with tbl.update_schema() as sch:
        sch.move_first(("column_1", "field_1"))

    # Shift nested fields in `column_3`
    with tbl.update_schema() as sch:
        sch.rename_column(("column_3", "value", "field_1"), "field_2")

    with tbl.update_schema() as sch:
        sch.rename_column(("column_3", "value", "field_3"), "field_1")

    with tbl.update_schema() as sch:
        sch.move_first(("column_3", "value", "field_1"))

    # Shift top-level columns
    with tbl.update_schema() as sch:
        sch.rename_column("column_1", "column_2")

    with tbl.update_schema() as sch:
        sch.rename_column("column_3", "column_1")

    with tbl.update_schema() as sch:
        sch.move_first("column_1")

    expect = pl.DataFrame(
        {
            "column_2": [
                [
                    {
                        "field_2": [
                            {"key": [datetime(2025, 1, 1, 0, 0), None], "value": [1, 2, None]},
                            {"key": [datetime(2025, 1, 1), None], "value": None},
                        ],
                        "field_1": "F3",
                    }
                ],
                [{"field_2": [{"key": [datetime(2025, 1, 1, 0, 0), None], "value": None}], "field_1": "F3"}],
                [{"field_2": [], "field_1": None}],
                [None],
                [],
            ],
            "column_1": [
                [{"key": {"field_1": 1, "field_2": 2, "field_3": 3}, "value": {"field_2": 7, "field_1": 9}}],
                [{"key": {"field_1": 1, "field_2": 2, "field_3": 3}, "value": {"field_2": 7, "field_1": 9}}],
                [
                    {
                        "key": {"field_1": None, "field_2": None, "field_3": None},
                        "value": {"field_2": None, "field_1": None},
                    }
                ],
                [{"key": {"field_1": None, "field_2": None, "field_3": None}, "value": None}],
                [],
            ],
        },
        schema={
            "column_1": pl.List(
                pl.Struct(
                    {
                        "key": pl.Struct({"field_1": pl.Int32, "field_2": pl.Int32, "field_3": pl.Int32}),
                        "value": pl.Struct({"field_1": pl.Int64, "field_2": pl.Int64}),
                    }
                )
            ),
            "column_2": pl.List(
                pl.Struct(
                    {
                        "field_1": pl.String,
                        "field_2": pl.List(
                            pl.Struct(
                                {
                                    "key": pl.List(pl.Datetime(time_unit="us", time_zone=None)),
                                    "value": pl.List(pl.Int32),
                                }
                            )
                        ),
                    }
                )
            ),
        },
    )  # fmt: skip

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(), expect
    )
    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), expect)


@pytest.mark.write_disk
@pytest.mark.xfail(
    reason="""\
[Upstream Issue]
PyIceberg writes NULL as empty lists into the Parquet file.
* Issue on Polars repo - https://github.com/pola-rs/polars/issues/23715
* Issue on PyIceberg repo - https://github.com/apache/iceberg-python/issues/2246
"""
)
def test_scan_iceberg_nulls_multiple_nesting(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    next_field_id = partial(next, itertools.count())

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(
                field_id=next_field_id(),
                name="column_1",
                field_type=ListType(
                    element_id=next_field_id(),
                    element=StructType(
                        NestedField(
                            field_id=next_field_id(),
                            name="field_1",
                            field_type=ListType(
                                element_id=next_field_id(),
                                element=StructType(
                                    NestedField(field_id=next_field_id(), name="key", field_type=ListType(
                                        element_id=next_field_id(),
                                        element=TimestampType(),
                                        element_required=False,
                                    ), required=True),
                                    NestedField(field_id=next_field_id(), name="value", field_type=ListType(
                                        element_id=next_field_id(),
                                        element=IntegerType(),
                                        element_required=False,
                                    ), required=False),
                                ),
                                element_required=False,
                            ),
                            required=False,
                        ),
                        NestedField(field_id=next_field_id(), name="field_2", field_type=IntegerType(), required=False),
                        NestedField(field_id=next_field_id(), name="field_3", field_type=StringType(), required=False),
                    ),
                    element_required=False,
                ),
                required=False,
            ),
        ),
    )  # fmt: skip

    tbl = catalog.load_table("namespace.table")

    df_dict = {
        "column_1": [
            [
                {
                    "field_1": [
                        {"key": [datetime(2025, 1, 1), None], "value": [1, 2, None]}
                    ],
                    "field_2": 7,
                    "field_3": "F3",
                }
            ],
            [
                {
                    "field_1": [{"key": [datetime(2025, 1, 1), None], "value": None}],
                    "field_2": 7,
                    "field_3": "F3",
                }
            ],
            [{"field_1": None, "field_2": None, "field_3": None}],
            [None],
            None,
        ],
    }

    df = pl.DataFrame(
        df_dict,
        schema={
            "column_1": pl.List(
                pl.Struct(
                    {
                        "field_1": pl.List(
                            pl.Struct(
                                {
                                    "key": pl.List(pl.Datetime("us")),
                                    "value": pl.List(pl.Int32),
                                }
                            )
                        ),
                        "field_2": pl.Int32,
                        "field_3": pl.String,
                    }
                )
            ),
        },
    )

    arrow_tbl = pa.Table.from_pydict(
        df_dict,
        schema=pa.schema(
            [
                (
                    "column_1",
                    pa.large_list(
                        pa.struct(
                            [
                                (
                                    "field_1",
                                    pa.large_list(
                                        pa.struct(
                                            [
                                                pa.field(
                                                    "key",
                                                    pa.large_list(pa.timestamp("us")),
                                                    nullable=False,
                                                ),
                                                ("value", pa.large_list(pa.int32())),
                                            ]
                                        )
                                    ),
                                ),
                                ("field_2", pa.int32()),
                                ("field_3", pa.string()),
                            ]
                        )
                    ),
                )
            ]
        ),
    )

    assert_frame_equal(pl.DataFrame(arrow_tbl), df)

    tbl.append(arrow_tbl)

    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(), df)
    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), df)


@pytest.mark.write_disk
def test_scan_iceberg_nulls_nested(tmp_path: Path) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    next_field_id = partial(next, itertools.count())

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(
                field_id=next_field_id(),
                name="column_1",
                field_type=ListType(
                    element_id=next_field_id(),
                    element=IntegerType(),
                    element_required=False,
                ),
                required=False,
            ),
        ),
    )

    tbl = catalog.load_table("namespace.table")

    df = pl.DataFrame(
        {
            "column_1": [
                [1, 2],
                [None],
                None,
            ],
        },
        schema={
            "column_1": pl.List(pl.Int32),
        },
    )

    df_dict = df.to_dict(as_series=False)

    assert_frame_equal(pl.DataFrame(df_dict, schema=df.schema), df)

    arrow_tbl = pa.Table.from_pydict(
        df_dict,
        schema=pa.schema(
            [
                (
                    "column_1",
                    pa.large_list(pa.int32()),
                )
            ]
        ),
    )

    assert_frame_equal(pl.DataFrame(arrow_tbl), df)

    tbl.append(arrow_tbl)

    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(), df)
    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), df)


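# The test below enables POLARS_VERBOSE and inspects the captured log output to
# confirm that, with the Iceberg column mapping applied, the pushed-down predicate
# prunes the single row group of one data file entirely and uses pre-filtered
# decoding for the other.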
def test_scan_iceberg_parquet_prefilter_with_column_mapping(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    next_field_id = partial(next, itertools.count())

    catalog.create_table(
        "namespace.table",
        IcebergSchema(
            NestedField(
                field_id=next_field_id(),
                name="column_1",
                field_type=StringType(),
                required=False,
            ),
            NestedField(
                field_id=next_field_id(),
                name="column_2",
                field_type=IntegerType(),
                required=False,
            ),
            NestedField(
                field_id=next_field_id(),
                name="column_3",
                field_type=StringType(),
                required=False,
            ),
        ),
    )

    tbl = catalog.load_table("namespace.table")

    df = pl.DataFrame(
        {
            "column_1": ["A", "B", "C", "D", "E", "F"],
            "column_2": pl.Series([1, 2, 3, 4, 5, 6], dtype=pl.Int32),
            "column_3": ["P", "Q", "R", "S", "T", "U"],
        }
    )

    df.slice(0, 3).write_iceberg(tbl, mode="append")
    df.slice(3).write_iceberg(tbl, mode="append")

    with tbl.update_schema() as sch:
        sch.update_column("column_2", LongType())

    with tbl.update_schema() as sch:
        sch.delete_column("column_1")

    with tbl.update_schema() as sch:
        sch.rename_column("column_3", "column_1")

    with tbl.update_schema() as sch:
        sch.rename_column("column_2", "column_3")

    with tbl.update_schema() as sch:
        sch.move_first("column_1")

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="native").collect().sort("column_3"),
        pl.DataFrame(
            {
                "column_1": ["P", "Q", "R", "S", "T", "U"],
                "column_3": pl.Series([1, 2, 3, 4, 5, 6], dtype=pl.Int64),
            }
        ),
    )

    q = pl.scan_iceberg(tbl, reader_override="native").filter(pl.col("column_3") == 5)

    with monkeypatch.context() as cx:
        cx.setenv("POLARS_VERBOSE", "1")
        cx.setenv("POLARS_FORCE_EMPTY_READER_CAPABILITIES", "0")
        capfd.readouterr()
        out = q.collect()
        capture = capfd.readouterr().err

    assert_frame_equal(
        out,
        pl.DataFrame(
            {
                "column_1": ["T"],
                "column_3": pl.Series([5], dtype=pl.Int64),
            }
        ),
    )

    # First file
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 0 / 1 row groups" in capture
    )
    # Second file
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 1 / 1 row groups" in capture
    )
    assert (
        "[ParquetFileReader]: Pre-filtered decode enabled (1 live, 1 non-live)"
        in capture
    )


# Note: This test also generally covers primitive type round-tripping.
@pytest.mark.parametrize("test_uuid", [True, False])
def test_fill_missing_fields_with_identity_partition_values(
    test_uuid: bool, tmp_path: Path
) -> None:
    from datetime import time

    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    min_version = parse_version(pyiceberg.__version__) >= (0, 10, 0)

    test_decimal_and_fixed = min_version
    test_uuid = test_uuid and min_version

    next_field_id = partial(next, itertools.count(1))

    iceberg_schema = IcebergSchema(
        NestedField(next_field_id(), "height_provider", IntegerType()),
        NestedField(next_field_id(), "BooleanType", BooleanType()),
        NestedField(next_field_id(), "IntegerType", IntegerType()),
        NestedField(next_field_id(), "LongType", LongType()),
        NestedField(next_field_id(), "FloatType", FloatType()),
        NestedField(next_field_id(), "DoubleType", DoubleType()),
        NestedField(next_field_id(), "DateType", DateType()),
        NestedField(next_field_id(), "TimeType", TimeType()),
        NestedField(next_field_id(), "TimestampType", TimestampType()),
        NestedField(next_field_id(), "TimestamptzType", TimestamptzType()),
        NestedField(next_field_id(), "StringType", StringType()),
        NestedField(next_field_id(), "BinaryType", BinaryType()),
        *(
            [
                NestedField(next_field_id(), "DecimalType", DecimalType(18, 2)),
                NestedField(next_field_id(), "FixedType", FixedType(1)),
            ]
            if test_decimal_and_fixed
            else []
        ),
        *([NestedField(next_field_id(), "UUIDType", UUIDType())] if test_uuid else []),
    )

    arrow_tbl = pa.Table.from_pydict(
        {
            "height_provider": [1],
            "BooleanType": [True],
            "IntegerType": [1],
            "LongType": [1],
            "FloatType": [1.0],
            "DoubleType": [1.0],
            "DateType": [date(2025, 1, 1)],
            "TimeType": [time(11, 30)],
            "TimestampType": [datetime(2025, 1, 1)],
            "TimestamptzType": [datetime(2025, 1, 1)],
            "StringType": ["A"],
            "BinaryType": [b"A"],
            **(
                {"DecimalType": [D("1.0")], "FixedType": [b"A"]}
                if test_decimal_and_fixed
                else {}
            ),
            **({"UUIDType": [b"0000111100001111"]} if test_uuid else {}),
        },
        schema=schema_to_pyarrow(iceberg_schema, include_field_ids=False),
    )

    tbl = catalog.create_table(
        "namespace.table",
        iceberg_schema,
        partition_spec=PartitionSpec(
            # We have this to offset the indices
            PartitionField(
                iceberg_schema.fields[0].field_id, 0, BucketTransform(32), "bucket"
            ),
            *(
                PartitionField(field.field_id, 0, IdentityTransform(), field.name)
                for field in iceberg_schema.fields[1:]
            ),
        ),
    )

    if test_uuid:
        # Note: If this starts working one day we can include it in tests.
        with pytest.raises(
            pa.ArrowNotImplementedError, match="Keys of type extension<arrow.uuid>"
        ):
            tbl.append(arrow_tbl)

        return

    tbl.append(arrow_tbl)

    expect = pl.DataFrame(
        [
            pl.Series('height_provider', [1], dtype=pl.Int32),
            pl.Series('BooleanType', [True], dtype=pl.Boolean),
            pl.Series('IntegerType', [1], dtype=pl.Int32),
            pl.Series('LongType', [1], dtype=pl.Int64),
            pl.Series('FloatType', [1.0], dtype=pl.Float32),
            pl.Series('DoubleType', [1.0], dtype=pl.Float64),
            pl.Series('DateType', [date(2025, 1, 1)], dtype=pl.Date),
            pl.Series('TimeType', [time(11, 30)], dtype=pl.Time),
            pl.Series('TimestampType', [datetime(2025, 1, 1, 0, 0)], dtype=pl.Datetime(time_unit='us', time_zone=None)),
            pl.Series('TimestamptzType', [datetime(2025, 1, 1, 0, 0, tzinfo=zoneinfo.ZoneInfo(key='UTC'))], dtype=pl.Datetime(time_unit='us', time_zone='UTC')),
            pl.Series('StringType', ['A'], dtype=pl.String),
            pl.Series('BinaryType', [b'A'], dtype=pl.Binary),
            *(
                [
                    pl.Series('DecimalType', [D('1.00')], dtype=pl.Decimal(precision=18, scale=2)),
                    pl.Series('FixedType', [b'A'], dtype=pl.Binary),
                ]
                if test_decimal_and_fixed
                else []
            ),
        ]
    )  # fmt: skip

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(),
        expect,
    )

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="native").collect(),
        expect,
    )

    dfiles = [*tbl.scan().plan_files()]

    assert len(dfiles) == 1

    p = dfiles[0].file.file_path.removeprefix("file://")

    # Drop every column except 'height_provider'
    pq.write_table(
        pa.Table.from_pydict(
            {"height_provider": [1]},
            schema=schema_to_pyarrow(iceberg_schema.select("height_provider")),
        ),
        p,
    )

    out = pl.DataFrame(tbl.scan().to_arrow())

    exclude_from_pyiceberg_check = ["TimeType"]

    # Issues with reads from PyIceberg:
    # * Int32 / Float32 get upcast to 64-bit
    # * Logical types load as physical. TimeType cannot pass even with cast due
    #   to it being in microseconds, whereas polars uses nanoseconds.
    for name in exclude_from_pyiceberg_check:
        # xfail, these are known problematic
        with pytest.raises(AssertionError):
            assert_frame_equal(out.select(name), expect.select(name))

    assert_frame_equal(
        out.select(
            pl.col(c).cast(dt)
            for c, dt in expect.drop(exclude_from_pyiceberg_check).schema.items()
        ),
        expect.drop(exclude_from_pyiceberg_check),
    )

    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), expect)


@pytest.mark.skipif(
    parse_version(pyiceberg.__version__) < (0, 10, 0),
    reason="PyIceberg support for partitioning on nested primitive fields",
)
def test_fill_missing_fields_with_identity_partition_values_nested(
    tmp_path: Path,
) -> None:
    catalog = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    catalog.create_namespace("namespace")

    next_field_id = partial(next, itertools.count())
    next_field_id()

    iceberg_schema = IcebergSchema(
        NestedField(next_field_id(), "height_provider", IntegerType()),
        NestedField(
            next_field_id(),
            "struct_1",
            StructType(
                NestedField(
                    next_field_id(),
                    "struct_2",
                    StructType(NestedField(2001, "field_1", LongType())),
                )
            ),
        ),
    )

    tbl = catalog.create_table(
        "namespace.table",
        iceberg_schema,
        partition_spec=PartitionSpec(
            PartitionField(2001, 0, IdentityTransform(), "field_1")
        ),
    )

    pl.DataFrame(
        {"height_provider": [0], "struct_1": [{"struct_2": {"field_1": 300}}]},
        schema=pl.Schema(iceberg_schema.as_arrow()),
    ).write_iceberg(tbl, mode="append")

    expect = pl.DataFrame(
        [
            pl.Series("height_provider", [0], dtype=pl.Int32),
            pl.Series(
                "struct_1",
                [{"struct_2": {"field_1": 300}}],
                dtype=pl.Struct({"struct_2": pl.Struct({"field_1": pl.Int64})}),
            ),
        ]
    )

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="pyiceberg").collect(),
        expect,
    )

    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="native").collect(),
        expect,
    )

    # Note: We will still match even if the partition field is renamed, since it still
    # has the same source field ID.
    with tbl.update_spec() as spu:
        spu.rename_field("field_1", "AAA")

    pl.DataFrame(
        {"height_provider": [None], "struct_1": [{"struct_2": {"field_1": 301}}]},
        schema=pl.Schema(iceberg_schema.as_arrow()),
    ).write_iceberg(tbl, mode="append")

    with tbl.update_spec() as spu:
        spu.remove_field("AAA")

    pl.DataFrame(
        {"height_provider": [None], "struct_1": [{"struct_2": {"field_1": 302}}]},
        schema=pl.Schema(iceberg_schema.as_arrow()),
    ).write_iceberg(tbl, mode="append")

    for i, data_file in enumerate(tbl.scan().plan_files()):
        p = data_file.file.file_path

        pq.write_table(
            pa.Table.from_pydict(
                {"height_provider": [i]},
                schema=schema_to_pyarrow(iceberg_schema.select("height_provider")),
            ),
            p,
        )

    # Deleting partitions only takes effect for newly added files.
    expect = pl.DataFrame(
        [
            pl.Series("height_provider", [0, 1, 2], dtype=pl.Int32),
            pl.Series(
                "struct_1",
                [
                    None,
                    {"struct_2": {"field_1": 301}},
                    {"struct_2": {"field_1": 300}},
                ],
                dtype=pl.Struct({"struct_2": pl.Struct({"field_1": pl.Int64})}),
            ),
        ]
    )

    assert_frame_equal(pl.scan_iceberg(tbl, reader_override="native").collect(), expect)
    assert_frame_equal(
        pl.scan_iceberg(tbl, reader_override="native").select("struct_1").collect(),
        expect.select("struct_1"),
    )