GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_delta.py
from __future__ import annotations

import os
import warnings
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import pyarrow as pa
import pyarrow.fs
import pytest
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaError, TableNotFoundError
from deltalake.table import TableMerger

import polars as pl
from polars.io.cloud.credential_provider._builder import (
    _init_credential_provider_builder,
)
from polars.testing import assert_frame_equal, assert_frame_not_equal
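
# Tests for Polars' Delta Lake integration: pl.scan_delta / pl.read_delta
# (including version- and timestamp-based time travel) and
# DataFrame.write_delta (append, overwrite, merge).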


@pytest.fixture
def delta_table_path(io_files_path: Path) -> Path:
    return io_files_path / "delta-table"


def test_scan_delta(delta_table_path: Path) -> None:
    ldf = pl.scan_delta(delta_table_path, version=0)

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


def test_scan_delta_version(delta_table_path: Path) -> None:
    df1 = pl.scan_delta(delta_table_path, version=0).collect()
    df2 = pl.scan_delta(delta_table_path, version=1).collect()

    assert_frame_not_equal(df1, df2)


@pytest.mark.write_disk
def test_scan_delta_timestamp_version(tmp_path: Path) -> None:
    df_sample = pl.DataFrame({"name": ["Joey"], "age": [14]})
    df_sample.write_delta(tmp_path, mode="append")

    df_sample2 = pl.DataFrame({"name": ["Ivan"], "age": [34]})
    df_sample2.write_delta(tmp_path, mode="append")
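
    # Timestamp-based time travel resolves a version from the commit
    # timestamps, which delta-rs takes from the log files' modification
    # times. Backdate them so 2010 maps to version 0 and 2024 to version 1.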
    log_dir = tmp_path / "_delta_log"
    log_mtime_pair = [
        ("00000000000000000000.json", datetime(2010, 1, 1).timestamp()),
        ("00000000000000000001.json", datetime(2024, 1, 1).timestamp()),
    ]
    for file_name, dt_epoch in log_mtime_pair:
        file_path = log_dir / file_name
        os.utime(str(file_path), (dt_epoch, dt_epoch))

    df1 = pl.scan_delta(
        str(tmp_path), version=datetime(2010, 1, 1, tzinfo=timezone.utc)
    ).collect()
    df2 = pl.scan_delta(
        str(tmp_path), version=datetime(2024, 1, 1, tzinfo=timezone.utc)
    ).collect()

    assert_frame_equal(df1, df_sample)
    assert_frame_equal(df2, pl.concat([df_sample, df_sample2]), check_row_order=False)


def test_scan_delta_columns(delta_table_path: Path) -> None:
    ldf = pl.scan_delta(delta_table_path, version=0).select("name")

    expected = pl.DataFrame({"name": ["Joey", "Ivan"]})
    assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


def test_scan_delta_relative(delta_table_path: Path) -> None:
    rel_delta_table_path = str(delta_table_path / ".." / "delta-table")

    ldf = pl.scan_delta(rel_delta_table_path, version=0)

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, ldf.collect(), check_dtypes=False)

    ldf = pl.scan_delta(rel_delta_table_path, version=1)
    assert_frame_not_equal(expected, ldf.collect())


def test_read_delta(delta_table_path: Path) -> None:
    df = pl.read_delta(delta_table_path, version=0)

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, df, check_dtypes=False)


def test_read_delta_version(delta_table_path: Path) -> None:
    df1 = pl.read_delta(delta_table_path, version=0)
    df2 = pl.read_delta(delta_table_path, version=1)

    assert_frame_not_equal(df1, df2)


@pytest.mark.write_disk
def test_read_delta_timestamp_version(tmp_path: Path) -> None:
    df_sample = pl.DataFrame({"name": ["Joey"], "age": [14]})
    df_sample.write_delta(tmp_path, mode="append")

    df_sample2 = pl.DataFrame({"name": ["Ivan"], "age": [34]})
    df_sample2.write_delta(tmp_path, mode="append")
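
    # Backdate commit mtimes as in test_scan_delta_timestamp_version above.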
    log_dir = tmp_path / "_delta_log"
    log_mtime_pair = [
        ("00000000000000000000.json", datetime(2010, 1, 1).timestamp()),
        ("00000000000000000001.json", datetime(2024, 1, 1).timestamp()),
    ]
    for file_name, dt_epoch in log_mtime_pair:
        file_path = log_dir / file_name
        os.utime(str(file_path), (dt_epoch, dt_epoch))

    df1 = pl.read_delta(
        str(tmp_path), version=datetime(2010, 1, 1, tzinfo=timezone.utc)
    )
    df2 = pl.read_delta(
        str(tmp_path), version=datetime(2024, 1, 1, tzinfo=timezone.utc)
    )

    assert_frame_equal(df1, df_sample)
    assert_frame_equal(df2, pl.concat([df_sample, df_sample2]), check_row_order=False)


def test_read_delta_columns(delta_table_path: Path) -> None:
    df = pl.read_delta(delta_table_path, version=0, columns=["name"])

    expected = pl.DataFrame({"name": ["Joey", "Ivan"]})
    assert_frame_equal(expected, df, check_dtypes=False)


def test_read_delta_relative(delta_table_path: Path) -> None:
    rel_delta_table_path = str(delta_table_path / ".." / "delta-table")

    df = pl.read_delta(rel_delta_table_path, version=0)

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, df, check_dtypes=False)


@pytest.mark.write_disk
def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
    v0 = df.select(pl.col(pl.String))
    v1 = df.select(pl.col(pl.Int64))
    df_supported = df.drop(["cat", "enum", "time"])

    # Case: Success (version 0)
    v0.write_delta(tmp_path)

    # Case: Error if table exists
    with pytest.raises(DeltaError, match="A table already exists"):
        v0.write_delta(tmp_path)

    # Case: Overwrite with new version (version 1)
    v1.write_delta(
        tmp_path, mode="overwrite", delta_write_options={"schema_mode": "overwrite"}
    )

    # Case: Error if schema contains unsupported columns
    with pytest.raises(TypeError):
        df.write_delta(
            tmp_path, mode="overwrite", delta_write_options={"schema_mode": "overwrite"}
        )

    partitioned_tbl_uri = (tmp_path / ".." / "partitioned_table").resolve()

    # Case: Write new partitioned table (version 0)
    df_supported.write_delta(
        partitioned_tbl_uri, delta_write_options={"partition_by": "strings"}
    )

    # Case: Read back
    tbl = DeltaTable(tmp_path)
    partitioned_tbl = DeltaTable(partitioned_tbl_uri)

    pl_df_0 = pl.read_delta(tbl.table_uri, version=0)
    pl_df_1 = pl.read_delta(tbl.table_uri, version=1)
    pl_df_partitioned = pl.read_delta(partitioned_tbl_uri)

    assert v0.shape == pl_df_0.shape
    assert v0.columns == pl_df_0.columns
    assert v1.shape == pl_df_1.shape
    assert v1.columns == pl_df_1.columns

    assert df_supported.shape == pl_df_partitioned.shape
    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

    assert tbl.version() == 1
    assert partitioned_tbl.version() == 0
    assert Path(partitioned_tbl.table_uri) == partitioned_tbl_uri
    assert partitioned_tbl.metadata().partition_columns == ["strings"]

    assert_frame_equal(v0, pl_df_0, check_row_order=False)
    assert_frame_equal(v1, pl_df_1, check_row_order=False)

    cols = [c for c in df_supported.columns if not c.startswith("list_")]
    assert_frame_equal(
        df_supported.select(cols),
        pl_df_partitioned.select(cols),
        check_row_order=False,
    )

    # Case: Append to existing tables
    v1.write_delta(tmp_path, mode="append")
    tbl = DeltaTable(tmp_path)
    pl_df_1 = pl.read_delta(tbl.table_uri, version=2)

    assert tbl.version() == 2
    assert pl_df_1.shape == (6, 2)  # Rows are doubled
    assert v1.columns == pl_df_1.columns

    df_supported.write_delta(partitioned_tbl_uri, mode="append")
    partitioned_tbl = DeltaTable(partitioned_tbl_uri)
    pl_df_partitioned = pl.read_delta(partitioned_tbl.table_uri, version=1)

    assert partitioned_tbl.version() == 1
    assert pl_df_partitioned.shape == (6, 14)  # Rows are doubled
    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

    df_supported.write_delta(partitioned_tbl_uri, mode="overwrite")


@pytest.mark.write_disk
def test_write_delta_overwrite_schema_deprecated(
    df: pl.DataFrame, tmp_path: Path
) -> None:
    df = df.select(pl.col(pl.Int64))
    with pytest.deprecated_call():
        df.write_delta(tmp_path, mode="overwrite", overwrite_schema=True)
    result = pl.read_delta(tmp_path)
    assert_frame_equal(df, result)
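

# Each parametrized series below uses a dtype that Delta cannot store as-is
# (nanosecond timestamps, non-UTC time zones, deeply nested lists/structs);
# write_delta is expected to cast to a Delta-compatible schema so that a
# second append against the reconstructed table schema still succeeds.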
@pytest.mark.write_disk
@pytest.mark.parametrize(
    "series",
    [
        pl.Series("string", ["test"], dtype=pl.String),
        pl.Series("uint", [1], dtype=pl.UInt64),
        pl.Series("int", [1], dtype=pl.Int64),
        pl.Series(
            "uint_list",
            [[[[[1, 2, 3], [1, 2, 3]], [[1, 2, 3], [1, 2, 3]]]]],
            dtype=pl.List(pl.List(pl.List(pl.List(pl.UInt16)))),
        ),
        pl.Series(
            "date_ns", [datetime(2010, 1, 1, 0, 0)], dtype=pl.Datetime(time_unit="ns")
        ).dt.replace_time_zone("Australia/Lord_Howe"),
        pl.Series(
            "date_us",
            [datetime(2010, 1, 1, 0, 0)],
            dtype=pl.Datetime(time_unit="us"),
        ),
        pl.Series(
            "list_date",
            [
                [
                    datetime(2010, 1, 1, 0, 0),
                    datetime(2010, 1, 2, 0, 0),
                ]
            ],
            dtype=pl.List(pl.Datetime(time_unit="ns")),
        ),
        pl.Series(
            "list_date_us",
            [
                [
                    datetime(2010, 1, 1, 0, 0),
                    datetime(2010, 1, 2, 0, 0),
                ]
            ],
            dtype=pl.List(pl.Datetime(time_unit="ms")),
        ),
        pl.Series(
            "nested_list_date",
            [
                [
                    [
                        datetime(2010, 1, 1, 0, 0),
                        datetime(2010, 1, 2, 0, 0),
                    ]
                ]
            ],
            dtype=pl.List(pl.List(pl.Datetime(time_unit="ns"))),
        ),
        pl.Series(
            "struct_with_list",
            [
                {
                    "date_range": [
                        datetime(2010, 1, 1, 0, 0),
                        datetime(2010, 1, 2, 0, 0),
                    ],
                    "date_us": [
                        datetime(2010, 1, 1, 0, 0),
                        datetime(2010, 1, 2, 0, 0),
                    ],
                    "date_range_nested": [
                        [
                            datetime(2010, 1, 1, 0, 0),
                            datetime(2010, 1, 2, 0, 0),
                        ]
                    ],
                    "string": "test",
                    "int": 1,
                }
            ],
            dtype=pl.Struct(
                [
                    pl.Field(
                        "date_range",
                        pl.List(pl.Datetime(time_unit="ms", time_zone="UTC")),
                    ),
                    pl.Field(
                        "date_us", pl.List(pl.Datetime(time_unit="ms", time_zone=None))
                    ),
                    pl.Field(
                        "date_range_nested",
                        pl.List(pl.List(pl.Datetime(time_unit="ms", time_zone=None))),
                    ),
                    pl.Field("string", pl.String),
                    pl.Field("int", pl.UInt32),
                ]
            ),
        ),
        pl.Series(
            "list_with_struct_with_list",
            [
                [
                    {
                        "date_range": [
                            datetime(2010, 1, 1, 0, 0),
                            datetime(2010, 1, 2, 0, 0),
                        ],
                        "date_ns": [
                            datetime(2010, 1, 1, 0, 0),
                            datetime(2010, 1, 2, 0, 0),
                        ],
                        "date_range_nested": [
                            [
                                datetime(2010, 1, 1, 0, 0),
                                datetime(2010, 1, 2, 0, 0),
                            ]
                        ],
                        "string": "test",
                        "int": 1,
                    }
                ]
            ],
            dtype=pl.List(
                pl.Struct(
                    [
                        pl.Field(
                            "date_range",
                            pl.List(pl.Datetime(time_unit="ns", time_zone=None)),
                        ),
                        pl.Field(
                            "date_ns",
                            pl.List(pl.Datetime(time_unit="ns", time_zone=None)),
                        ),
                        pl.Field(
                            "date_range_nested",
                            pl.List(
                                pl.List(pl.Datetime(time_unit="ns", time_zone=None))
                            ),
                        ),
                        pl.Field("string", pl.String),
                        pl.Field("int", pl.UInt32),
                    ]
                )
            ),
        ),
    ],
)
def test_write_delta_w_compatible_schema(series: pl.Series, tmp_path: Path) -> None:
    df = series.to_frame()

    # Create table
    df.write_delta(tmp_path, mode="append")

    # Write to table again, should pass with reconstructed schema
    df.write_delta(tmp_path, mode="append")

    tbl = DeltaTable(tmp_path)
    assert tbl.version() == 1


@pytest.mark.write_disk
@pytest.mark.parametrize(
    "expr",
    [
        pl.datetime(2010, 1, 1, time_unit="us", time_zone="UTC"),
        pl.datetime(2010, 1, 1, time_unit="ns", time_zone="EST"),
        pl.datetime(2010, 1, 1, time_unit="ms", time_zone="Europe/Amsterdam"),
    ],
)
def test_write_delta_with_tz_in_df(expr: pl.Expr, tmp_path: Path) -> None:
    df = pl.select(expr)
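
    # Delta's timestamp type is microsecond-precision UTC, so each input is
    # expected to read back as Datetime("us", "UTC") regardless of the time
    # unit and time zone it was written with.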
    expected_dtype = pl.Datetime("us", "UTC")
    expected = pl.select(expr.cast(expected_dtype))

    df.write_delta(tmp_path, mode="append")
    # Write a second time, because delta-rs also casts timestamps with a
    # time zone to timestamps without one.
    df.write_delta(tmp_path, mode="append")

    # Check schema of DeltaTable object
    tbl = DeltaTable(tmp_path)
    assert pa.schema(tbl.schema().to_arrow()) == expected.to_arrow().schema

    # Check result
    result = pl.read_delta(tmp_path, version=0)
    assert_frame_equal(result, expected)


def test_write_delta_with_merge_and_no_table(tmp_path: Path) -> None:
    df = pl.DataFrame({"a": [1, 2, 3]})

    with pytest.raises(TableNotFoundError):
        df.write_delta(
            tmp_path, mode="merge", delta_merge_options={"predicate": "a = a"}
        )


@pytest.mark.write_disk
def test_write_delta_with_merge(tmp_path: Path) -> None:
    df = pl.DataFrame({"a": [1, 2, 3]})

    df.write_delta(tmp_path)

    merger = df.write_delta(
        tmp_path,
        mode="merge",
        delta_merge_options={
            "predicate": "s.a = t.a",
            "source_alias": "s",
            "target_alias": "t",
        },
    )

    assert isinstance(merger, TableMerger)
    assert merger._builder.source_alias == "s"
    assert merger._builder.target_alias == "t"
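
    # With predicate "s.a = t.a" every target row matches its source row, so
    # when_matched_delete removes exactly the rows where t.a > 2.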
    merger.when_matched_delete(predicate="t.a > 2").execute()

    result = pl.read_delta(tmp_path)

    expected = df.filter(pl.col("a") <= 2)
    assert_frame_equal(result, expected, check_row_order=False)


@pytest.mark.write_disk
def test_unsupported_dtypes(tmp_path: Path) -> None:
    df = pl.DataFrame({"a": [None]}, schema={"a": pl.Null})
    with pytest.raises(TypeError, match="unsupported data type"):
        df.write_delta(tmp_path / "null")

    df = pl.DataFrame({"a": [123]}, schema={"a": pl.Time})
    with pytest.raises(TypeError, match="unsupported data type"):
        df.write_delta(tmp_path / "time")


@pytest.mark.skip(
    reason="upstream bug in delta-rs causes categorical data to be written as categorical in parquet rather than string"
)
@pytest.mark.write_disk
def test_categorical_becomes_string(tmp_path: Path) -> None:
    df = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Categorical})
    df.write_delta(tmp_path)
    df2 = pl.read_delta(tmp_path)
    assert_frame_equal(df2, pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8}))


def test_scan_delta_DT_input(delta_table_path: Path) -> None:
    DT = DeltaTable(delta_table_path, version=0)
    ldf = pl.scan_delta(DT)

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, ldf.collect(), check_dtypes=False)


@pytest.mark.write_disk
def test_read_delta_empty(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = str(tmp_path)

    DeltaTable.create(path, pl.DataFrame(schema={"x": pl.Int64}).to_arrow().schema)
    assert_frame_equal(pl.read_delta(path), pl.DataFrame(schema={"x": pl.Int64}))


@pytest.mark.write_disk
def test_read_delta_arrow_map_type(tmp_path: Path) -> None:
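    # Polars has no map dtype: an Arrow map column is expected to surface as
    # a list of key/value structs, matching the pl.DataFrame(table)
    # conversion used for `expect` below.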
    payload = [
        {"id": 1, "account_id": {17: "100.01.001 Cash"}},
        {"id": 2, "account_id": {18: "180.01.001 Cash", 19: "foo"}},
    ]

    schema = pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("account_id", pa.map_(pa.int32(), pa.string())),
        ]
    )
    table = pa.Table.from_pylist(payload, schema)

    expect = pl.DataFrame(table)

    table_path = str(tmp_path)
    write_deltalake(
        table_path,
        table,
        mode="overwrite",
    )

    assert_frame_equal(pl.scan_delta(table_path).collect(), expect)
    assert_frame_equal(pl.read_delta(table_path), expect)


@pytest.mark.may_fail_cloud  # reason: inspects logs
@pytest.mark.write_disk
def test_scan_delta_nanosecond_timestamp(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    df = pl.DataFrame(
        {"timestamp": [datetime(2025, 1, 1), datetime(2025, 1, 2)]},
        schema={"timestamp": pl.Datetime("us", time_zone="UTC")},
    )

    df_nano_ts = pl.DataFrame(
        {"timestamp": [datetime(2025, 1, 1), datetime(2025, 1, 2)]},
        schema={"timestamp": pl.Datetime("ns", time_zone=None)},
    )

    root = tmp_path / "delta"

    df.write_delta(root)

    # Manually overwrite the file with one that has nanosecond timestamps.
    parquet_files = [x for x in root.iterdir() if x.suffix == ".parquet"]
    assert len(parquet_files) == 1
    parquet_file_path = parquet_files[0]

    df_nano_ts.write_parquet(parquet_file_path)

    # Baseline: The timestamp in the file is in nanoseconds.
    q = pl.scan_parquet(parquet_file_path)
    assert q.collect_schema() == {"timestamp": pl.Datetime("ns", time_zone=None)}
    assert_frame_equal(q.collect(), df_nano_ts)
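
    # scan_delta should surface the logical schema recorded in the Delta log
    # (us, UTC), casting the file's nanosecond values on read.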
    q = pl.scan_delta(root)

    assert q.collect_schema() == {"timestamp": pl.Datetime("us", time_zone="UTC")}
    assert_frame_equal(q.collect(), df)

    # Ensure row-group skipping is functioning.
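    # The filter below excludes every row, so with statistics-based skipping
    # the verbose logs should report that 0 of 1 row groups were read.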
    q = pl.scan_delta(root).filter(
        pl.col("timestamp")
        < pl.lit(datetime(2025, 1, 1), dtype=pl.Datetime("us", time_zone="UTC"))
    )
    monkeypatch.setenv("POLARS_VERBOSE", "1")
    capfd.readouterr()

    assert_frame_equal(q.collect(), df.clear())
    assert "reading 0 / 1 row groups" in capfd.readouterr().err


@pytest.mark.write_disk
def test_scan_delta_nanosecond_timestamp_nested(tmp_path: Path) -> None:
    df = pl.DataFrame(
        {
            "c1": [
                {"timestamp": datetime(2025, 1, 1)},
                {"timestamp": datetime(2025, 1, 2)},
            ]
        },
        schema={"c1": pl.Struct({"timestamp": pl.Datetime("us", time_zone="UTC")})},
    )

    df_nano_ts = pl.DataFrame(
        {
            "c1": [
                {"timestamp": datetime(2025, 1, 1)},
                {"timestamp": datetime(2025, 1, 2)},
            ]
        },
        schema={"c1": pl.Struct({"timestamp": pl.Datetime("ns", time_zone=None)})},
    )

    root = tmp_path / "delta"

    df.write_delta(root)

    # Manually overwrite the file with one that has nanosecond timestamps.
    parquet_files = [x for x in root.iterdir() if x.suffix == ".parquet"]
    assert len(parquet_files) == 1
    parquet_file_path = parquet_files[0]

    df_nano_ts.write_parquet(parquet_file_path)

    # Baseline: The timestamp in the file is in nanoseconds.
    q = pl.scan_parquet(parquet_file_path)
    assert q.collect_schema() == {
        "c1": pl.Struct({"timestamp": pl.Datetime("ns", time_zone=None)})
    }
    assert_frame_equal(q.collect(), df_nano_ts)
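
    # As above, the logical schema from the Delta log (us, UTC) should win
    # over the file's nanosecond schema, here nested inside a struct.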
    q = pl.scan_delta(root)

    assert q.collect_schema() == {
        "c1": pl.Struct({"timestamp": pl.Datetime("us", time_zone="UTC")})
    }
    assert_frame_equal(q.collect(), df)


@pytest.mark.write_disk
def test_scan_delta_schema_evolution_nested_struct_field_19915(tmp_path: Path) -> None:
    (
        pl.DataFrame(
            {"a": ["test"], "properties": [{"property_key": {"item": 1}}]}
        ).write_delta(tmp_path)
    )

    (
        pl.DataFrame(
            {
                "a": ["test1"],
                "properties": [{"property_key": {"item": 50, "item2": 10}}],
            }
        ).write_delta(
            tmp_path,
            mode="append",
            delta_write_options={"schema_mode": "merge"},
        )
    )
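
    # After the schema-merge append, rows written before "item2" existed
    # should read back with item2 = None.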
    q = pl.scan_delta(tmp_path)

    expect = pl.DataFrame(
        {
            "a": ["test", "test1"],
            "properties": [
                {"property_key": {"item": 1, "item2": None}},
                {"property_key": {"item": 50, "item2": 10}},
            ],
        },
        schema={
            "a": pl.String,
            "properties": pl.Struct(
                {"property_key": pl.Struct({"item": pl.Int64, "item2": pl.Int64})}
            ),
        },
    )

    assert_frame_equal(q.sort("a").collect(), expect)


@pytest.mark.write_disk
def test_scan_delta_storage_options_from_delta_table(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
    import polars.io.delta

    storage_options_checked = False

    def assert_scan_parquet_storage_options(*a: Any, **kw: Any) -> Any:
        nonlocal storage_options_checked

        assert kw["storage_options"] == {
            "aws_endpoint_url": "http://localhost:777",
            "aws_access_key_id": "...",
            "aws_secret_access_key": "...",
            "aws_session_token": "...",
            "endpoint_url": "...",
        }

        storage_options_checked = True

        return pl.scan_parquet(*a, **kw)
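
    # Intercept the internal scan_parquet call: options passed to scan_delta
    # (endpoint 777) should override the DeltaTable's own (endpoint 333),
    # while the table's remaining credentials are carried through.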
    monkeypatch.setattr(
        polars.io.delta, "scan_parquet", assert_scan_parquet_storage_options
    )

    df = pl.DataFrame({"a": ["test"], "properties": [{"property_key": {"item": 1}}]})

    df.write_delta(tmp_path)

    tbl = DeltaTable(
        tmp_path,
        storage_options={
            "aws_endpoint_url": "http://localhost:333",
            "aws_access_key_id": "...",
            "aws_secret_access_key": "...",
            "aws_session_token": "...",
        },
    )

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)

        q = pl.scan_delta(
            tbl,
            storage_options={
                "aws_endpoint_url": "http://localhost:777",
                "endpoint_url": "...",
            },
        )

    assert_frame_equal(q.collect(), df)

    assert storage_options_checked


def test_scan_delta_loads_aws_profile_endpoint_url(
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    tmp_path.mkdir(exist_ok=True)

    cfg_file_path = tmp_path / "config"

    cfg_file_path.write_text("""\
[profile endpoint_333]
aws_access_key_id=A
aws_secret_access_key=A
endpoint_url = http://localhost:333
""")

    monkeypatch.setenv("AWS_CONFIG_FILE", str(cfg_file_path))
    monkeypatch.setenv("AWS_PROFILE", "endpoint_333")

    assert (
        builder := _init_credential_provider_builder(
            "auto", "s3://.../...", storage_options=None, caller_name="test"
        )
    ) is not None

    assert isinstance(
        provider := builder.build_credential_provider(),
        pl.CredentialProviderAWS,
    )

    assert provider._can_use_as_provider()

    assert provider._storage_update_options() == {
        "endpoint_url": "http://localhost:333"
    }
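
    # No server is listening on the profile's endpoint, but the OSError
    # message should still mention it, proving the endpoint_url was loaded
    # from the AWS profile.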
    with pytest.raises(OSError, match="http://localhost:333"):
        pl.scan_delta("s3://.../...")

    with pytest.raises(OSError, match="http://localhost:333"):
        pl.DataFrame({"x": 1}).write_delta("s3://.../...", mode="append")