Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_delta.py
8424 views
1
from __future__ import annotations
2
3
import os
4
import pickle
5
import warnings
6
from datetime import datetime, timezone
7
from decimal import Decimal
8
from pathlib import Path
9
from typing import TYPE_CHECKING, Any
10
11
import pyarrow as pa
12
import pytest
13
from deltalake import DeltaTable, write_deltalake
14
from deltalake.exceptions import DeltaError, TableNotFoundError
15
from deltalake.table import TableMerger
16
17
import polars as pl
18
from polars.io.cloud._utils import NoPickleOption
19
from polars.io.cloud.credential_provider._builder import (
20
_init_credential_provider_builder,
21
)
22
from polars.io.delta._dataset import DeltaDataset
23
from polars.io.delta._utils import _extract_table_statistics_from_delta_add_actions
24
from polars.testing import assert_frame_equal, assert_frame_not_equal
25
26
if TYPE_CHECKING:
27
from tests.conftest import PlMonkeyPatch
28
29
30
@pytest.fixture
def delta_table_path(io_files_path: Path) -> Path:
    """Path to the bundled `delta-table` test fixture directory."""
    table_dir = io_files_path / "delta-table"
    return table_dir
33
34
35
def new_pl_delta_dataset(source: str | DeltaTable) -> DeltaDataset:
    """Build a default-configured ``DeltaDataset`` from a table object or a URI.

    Exactly one of ``table_`` / ``table_uri_`` is populated, depending on
    whether *source* is an already-opened ``DeltaTable``.
    """
    is_table = isinstance(source, DeltaTable)
    return DeltaDataset(
        table_=NoPickleOption(source if is_table else None),
        table_uri_=None if is_table else source,
        version=None,
        storage_options=None,
        credential_provider_builder=None,
        delta_table_options=None,
        use_pyarrow=False,
        pyarrow_options=None,
        rechunk=False,
    )
47
48
49
def test_scan_delta(delta_table_path: Path) -> None:
    """Scanning version 0 of the fixture table yields the expected rows."""
    result = pl.scan_delta(delta_table_path, version=0).collect()
    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, result, check_dtypes=False)
54
55
56
def test_scan_delta_version(delta_table_path: Path) -> None:
    """Different table versions must produce different data."""
    at_v0 = pl.scan_delta(delta_table_path, version=0).collect()
    at_v1 = pl.scan_delta(delta_table_path, version=1).collect()
    assert_frame_not_equal(at_v0, at_v1)
61
62
63
@pytest.mark.write_disk
def test_scan_delta_timestamp_version(tmp_path: Path) -> None:
    """Selecting a version by timestamp resolves via the commit files' mtimes."""
    first = pl.DataFrame({"name": ["Joey"], "age": [14]})
    first.write_delta(tmp_path, mode="append")

    second = pl.DataFrame({"name": ["Ivan"], "age": [34]})
    second.write_delta(tmp_path, mode="append")

    # Backdate each commit file so the two versions map to known timestamps.
    log_dir = tmp_path / "_delta_log"
    for commit_file, stamp in (
        ("00000000000000000000.json", datetime(2010, 1, 1).timestamp()),
        ("00000000000000000001.json", datetime(2024, 1, 1).timestamp()),
    ):
        os.utime(str(log_dir / commit_file), (stamp, stamp))

    at_2010 = pl.scan_delta(
        str(tmp_path), version=datetime(2010, 1, 1, tzinfo=timezone.utc)
    ).collect()
    at_2024 = pl.scan_delta(
        str(tmp_path), version=datetime(2024, 1, 1, tzinfo=timezone.utc)
    ).collect()

    assert_frame_equal(at_2010, first)
    assert_frame_equal(at_2024, pl.concat([first, second]), check_row_order=False)
89
90
91
def test_scan_delta_columns(delta_table_path: Path) -> None:
    """Column projection on a delta scan returns only the selected column."""
    result = pl.scan_delta(delta_table_path, version=0).select("name").collect()
    expected = pl.DataFrame({"name": ["Joey", "Ivan"]})
    assert_frame_equal(expected, result, check_dtypes=False)
96
97
98
def test_scan_delta_polars_storage_options_keys(
    delta_table_path: Path,
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    """Polars-specific `storage_options` keys are accepted and logged as configured."""
    # POLARS_VERBOSE_SENSITIVE makes polars print the resolved retry/cache
    # configuration to stderr so it can be inspected below.
    plmonkeypatch.setenv("POLARS_VERBOSE_SENSITIVE", "1")
    lf = pl.scan_delta(
        delta_table_path,
        version=0,
        storage_options={
            "file_cache_ttl": 7,
            "max_retries": 3,
            "retry_timeout_ms": 9873,
            "retry_init_backoff_ms": 9874,
            "retry_max_backoff_ms": 9875,
            "retry_base_multiplier": 3.14159,
        },
        credential_provider=None,
    ).select("name")

    lf.collect()

    capture = capfd.readouterr().err

    assert "file_cache_ttl: 7" in capture

    # The `*_ms` options are expected to be rendered as seconds in the log.
    assert (
        """\
max_retries: Some(3), \
retry_timeout: Some(9.873s), \
retry_init_backoff: Some(9.874s), \
retry_max_backoff: Some(9.875s), \
retry_base_multiplier: Some(TotalOrdWrap(3.14159)) }"""
        in capture
    )
133
134
135
def test_scan_delta_relative(delta_table_path: Path) -> None:
    """Paths containing `..` segments are handled by scan_delta."""
    indirect_path = str(delta_table_path / ".." / "delta-table")

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(
        expected,
        pl.scan_delta(indirect_path, version=0).collect(),
        check_dtypes=False,
    )

    # A later version must differ from the version-0 contents.
    assert_frame_not_equal(expected, pl.scan_delta(indirect_path, version=1).collect())
145
146
147
def test_read_delta(delta_table_path: Path) -> None:
    """Eager read of version 0 returns the expected rows."""
    result = pl.read_delta(delta_table_path, version=0)
    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, result, check_dtypes=False)
152
153
154
def test_read_delta_version(delta_table_path: Path) -> None:
    """Eagerly reading distinct versions must yield distinct frames."""
    at_v0 = pl.read_delta(delta_table_path, version=0)
    at_v1 = pl.read_delta(delta_table_path, version=1)
    assert_frame_not_equal(at_v0, at_v1)
159
160
161
@pytest.mark.write_disk
def test_read_delta_timestamp_version(tmp_path: Path) -> None:
    """Eager reads also resolve timestamp versions via commit-file mtimes."""
    first = pl.DataFrame({"name": ["Joey"], "age": [14]})
    first.write_delta(tmp_path, mode="append")

    second = pl.DataFrame({"name": ["Ivan"], "age": [34]})
    second.write_delta(tmp_path, mode="append")

    # Backdate each commit file so the two versions map to known timestamps.
    log_dir = tmp_path / "_delta_log"
    for commit_file, stamp in (
        ("00000000000000000000.json", datetime(2010, 1, 1).timestamp()),
        ("00000000000000000001.json", datetime(2024, 1, 1).timestamp()),
    ):
        os.utime(str(log_dir / commit_file), (stamp, stamp))

    at_2010 = pl.read_delta(
        str(tmp_path), version=datetime(2010, 1, 1, tzinfo=timezone.utc)
    )
    at_2024 = pl.read_delta(
        str(tmp_path), version=datetime(2024, 1, 1, tzinfo=timezone.utc)
    )

    assert_frame_equal(at_2010, first)
    assert_frame_equal(at_2024, pl.concat([first, second]), check_row_order=False)
187
188
189
def test_read_delta_columns(delta_table_path: Path) -> None:
    """The `columns` argument projects the eager read."""
    result = pl.read_delta(delta_table_path, version=0, columns=["name"])
    expected = pl.DataFrame({"name": ["Joey", "Ivan"]})
    assert_frame_equal(expected, result, check_dtypes=False)
194
195
196
def test_read_delta_relative(delta_table_path: Path) -> None:
    """Paths containing `..` segments are handled by read_delta."""
    indirect_path = str(delta_table_path / ".." / "delta-table")

    result = pl.read_delta(indirect_path, version=0)

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, result, check_dtypes=False)
203
204
205
@pytest.mark.write_disk
def test_write_delta(df: pl.DataFrame, tmp_path: Path) -> None:
    """End-to-end `write_delta` coverage: create, overwrite, append, partitioning.

    The assertions depend on the exact version sequence produced by the writes,
    so the statement order here is significant.
    """
    v0 = df.select(pl.col(pl.String))
    v1 = df.select(pl.col(pl.Int64))
    # `cat`, `enum` and `time` dtypes are not writable to delta.
    df_supported = df.drop(["cat", "enum", "time"])

    # Case: Success (version 0)
    v0.write_delta(tmp_path)

    # Case: Error if table exists
    with pytest.raises(DeltaError, match="A table already exists"):
        v0.write_delta(tmp_path)

    # Case: Overwrite with new version (version 1)
    v1.write_delta(
        tmp_path, mode="overwrite", delta_write_options={"schema_mode": "overwrite"}
    )

    # Case: Error if schema contains unsupported columns
    with pytest.raises(TypeError):
        df.write_delta(
            tmp_path, mode="overwrite", delta_write_options={"schema_mode": "overwrite"}
        )

    partitioned_tbl_uri = (tmp_path / ".." / "partitioned_table").resolve()

    # Case: Write new partitioned table (version 0)
    df_supported.write_delta(
        partitioned_tbl_uri, delta_write_options={"partition_by": "strings"}
    )

    # Case: Read back
    tbl = DeltaTable(tmp_path)
    partitioned_tbl = DeltaTable(partitioned_tbl_uri)

    pl_df_0 = pl.read_delta(tbl.table_uri, version=0)
    pl_df_1 = pl.read_delta(tbl.table_uri, version=1)
    pl_df_partitioned = pl.read_delta(partitioned_tbl_uri)

    assert v0.shape == pl_df_0.shape
    assert v0.columns == pl_df_0.columns
    assert v1.shape == pl_df_1.shape
    assert v1.columns == pl_df_1.columns

    assert df_supported.shape == pl_df_partitioned.shape
    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

    assert tbl.version() == 1
    assert partitioned_tbl.version() == 0

    # Normalize the file URI to a filesystem path (Windows URIs carry a
    # leading slash before the drive letter).
    uri = partitioned_tbl.table_uri.removeprefix("file://")
    if os.name == "nt" and uri.startswith("/"):
        uri = uri[1:]

    assert Path(uri) == partitioned_tbl_uri
    assert partitioned_tbl.metadata().partition_columns == ["strings"]

    assert_frame_equal(v0, pl_df_0, check_row_order=False)
    assert_frame_equal(v1, pl_df_1, check_row_order=False)

    # List columns are excluded from the value comparison of the partitioned
    # round-trip; only non-list columns are checked element-wise.
    cols = [c for c in df_supported.columns if not c.startswith("list_")]
    assert_frame_equal(
        df_supported.select(cols),
        pl_df_partitioned.select(cols),
        check_row_order=False,
    )

    # Case: Append to existing tables
    v1.write_delta(tmp_path, mode="append")
    tbl = DeltaTable(tmp_path)
    pl_df_1 = pl.read_delta(tbl.table_uri, version=2)

    assert tbl.version() == 2
    assert pl_df_1.shape == (6, 2)  # Rows are doubled
    assert v1.columns == pl_df_1.columns

    df_supported.write_delta(partitioned_tbl_uri, mode="append")
    partitioned_tbl = DeltaTable(partitioned_tbl_uri)
    pl_df_partitioned = pl.read_delta(partitioned_tbl.table_uri, version=1)

    assert partitioned_tbl.version() == 1
    assert pl_df_partitioned.shape == (6, 14)  # Rows are doubled
    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

    df_supported.write_delta(partitioned_tbl_uri, mode="overwrite")
290
291
292
@pytest.mark.write_disk
def test_sink_delta(df: pl.DataFrame, tmp_path: Path) -> None:
    """End-to-end `sink_delta` coverage mirroring `test_write_delta`.

    Same version sequence as the eager test, but driven through the lazy
    streaming sink; statement order is significant.
    """
    v0 = df.lazy().select(pl.col(pl.String))
    v1 = df.lazy().select(pl.col(pl.Int64))
    # `cat`, `enum` and `time` dtypes are not writable to delta.
    df_supported = df.drop(["cat", "enum", "time"])

    # Case: Success (version 0)
    v0.sink_delta(tmp_path)

    # Case: Error if table exists
    with pytest.raises(DeltaError, match="A table already exists"):
        v0.sink_delta(tmp_path)

    # Case: Overwrite with new version (version 1)
    v1.sink_delta(
        tmp_path, mode="overwrite", delta_write_options={"schema_mode": "overwrite"}
    )

    # Case: Error if schema contains unsupported columns
    with pytest.raises(TypeError):
        df.lazy().sink_delta(
            tmp_path, mode="overwrite", delta_write_options={"schema_mode": "overwrite"}
        )

    partitioned_tbl_uri = (tmp_path / ".." / "partitioned_table_sink").resolve()

    # Case: Write new partitioned table (version 0)
    df_supported.lazy().sink_delta(
        partitioned_tbl_uri, delta_write_options={"partition_by": "strings"}
    )

    # Case: Read back
    tbl = DeltaTable(tmp_path)
    partitioned_tbl = DeltaTable(partitioned_tbl_uri)

    pl_df_0 = pl.read_delta(tbl.table_uri, version=0)
    pl_df_1 = pl.read_delta(tbl.table_uri, version=1)
    pl_df_partitioned = pl.read_delta(partitioned_tbl_uri)

    assert v0.collect().shape == pl_df_0.shape
    assert v0.collect_schema().names() == pl_df_0.columns
    assert v1.collect().shape == pl_df_1.shape
    assert v1.collect_schema().names() == pl_df_1.columns

    assert df_supported.shape == pl_df_partitioned.shape
    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

    assert tbl.version() == 1
    assert partitioned_tbl.version() == 0

    # Normalize the file URI to a filesystem path (Windows URIs carry a
    # leading slash before the drive letter).
    uri = partitioned_tbl.table_uri.removeprefix("file://")
    if os.name == "nt" and uri.startswith("/"):
        uri = uri[1:]

    assert Path(uri) == partitioned_tbl_uri
    assert partitioned_tbl.metadata().partition_columns == ["strings"]

    assert_frame_equal(v0.collect(), pl_df_0, check_row_order=False)
    assert_frame_equal(v1.collect(), pl_df_1, check_row_order=False)

    # List columns are excluded from the value comparison of the partitioned
    # round-trip; only non-list columns are checked element-wise.
    cols = [c for c in df_supported.columns if not c.startswith("list_")]
    assert_frame_equal(
        df_supported.select(cols),
        pl_df_partitioned.select(cols),
        check_row_order=False,
    )

    # Case: Append to existing tables
    v1.sink_delta(tmp_path, mode="append")
    tbl = DeltaTable(tmp_path)
    pl_df_1 = pl.read_delta(tbl.table_uri, version=2)

    assert tbl.version() == 2
    assert pl_df_1.shape == (6, 2)  # Rows are doubled
    assert v1.collect_schema().names() == pl_df_1.columns

    df_supported.lazy().sink_delta(partitioned_tbl_uri, mode="append")
    partitioned_tbl = DeltaTable(partitioned_tbl_uri)
    pl_df_partitioned = pl.read_delta(partitioned_tbl.table_uri, version=1)

    assert partitioned_tbl.version() == 1
    assert pl_df_partitioned.shape == (6, 14)  # Rows are doubled
    assert sorted(df_supported.columns) == sorted(pl_df_partitioned.columns)

    df_supported.lazy().sink_delta(partitioned_tbl_uri, mode="overwrite")
377
378
379
@pytest.mark.write_disk
def test_write_delta_overwrite_schema_deprecated(
    df: pl.DataFrame, tmp_path: Path
) -> None:
    """`overwrite_schema=True` still works but emits a deprecation warning."""
    int_only = df.select(pl.col(pl.Int64))

    with pytest.deprecated_call():
        int_only.write_delta(tmp_path, mode="overwrite", overwrite_schema=True)

    assert_frame_equal(int_only, pl.read_delta(tmp_path))
388
389
390
@pytest.mark.write_disk
@pytest.mark.parametrize(
    "series",
    [
        pl.Series("string", ["test"], dtype=pl.String),
        pl.Series("uint", [1], dtype=pl.UInt64),
        pl.Series("int", [1], dtype=pl.Int64),
        pl.Series(
            "uint_list",
            [[[[[1, 2, 3], [1, 2, 3]], [[1, 2, 3], [1, 2, 3]]]]],
            dtype=pl.List(pl.List(pl.List(pl.List(pl.UInt16)))),
        ),
        pl.Series(
            "date_ns", [datetime(2010, 1, 1, 0, 0)], dtype=pl.Datetime(time_unit="ns")
        ).dt.replace_time_zone("Australia/Lord_Howe"),
        pl.Series(
            "date_us",
            [datetime(2010, 1, 1, 0, 0)],
            dtype=pl.Datetime(time_unit="us"),
        ),
        pl.Series(
            "list_date",
            [
                [
                    datetime(2010, 1, 1, 0, 0),
                    datetime(2010, 1, 2, 0, 0),
                ]
            ],
            dtype=pl.List(pl.Datetime(time_unit="ns")),
        ),
        pl.Series(
            "list_date_us",
            [
                [
                    datetime(2010, 1, 1, 0, 0),
                    datetime(2010, 1, 2, 0, 0),
                ]
            ],
            dtype=pl.List(pl.Datetime(time_unit="ms")),
        ),
        pl.Series(
            "nested_list_date",
            [
                [
                    [
                        datetime(2010, 1, 1, 0, 0),
                        datetime(2010, 1, 2, 0, 0),
                    ]
                ]
            ],
            dtype=pl.List(pl.List(pl.Datetime(time_unit="ns"))),
        ),
        pl.Series(
            "struct_with_list",
            [
                {
                    "date_range": [
                        datetime(2010, 1, 1, 0, 0),
                        datetime(2010, 1, 2, 0, 0),
                    ],
                    "date_us": [
                        datetime(2010, 1, 1, 0, 0),
                        datetime(2010, 1, 2, 0, 0),
                    ],
                    "date_range_nested": [
                        [
                            datetime(2010, 1, 1, 0, 0),
                            datetime(2010, 1, 2, 0, 0),
                        ]
                    ],
                    "string": "test",
                    "int": 1,
                }
            ],
            dtype=pl.Struct(
                [
                    pl.Field(
                        "date_range",
                        pl.List(pl.Datetime(time_unit="ms", time_zone="UTC")),
                    ),
                    pl.Field(
                        "date_us", pl.List(pl.Datetime(time_unit="ms", time_zone=None))
                    ),
                    pl.Field(
                        "date_range_nested",
                        pl.List(pl.List(pl.Datetime(time_unit="ms", time_zone=None))),
                    ),
                    pl.Field("string", pl.String),
                    pl.Field("int", pl.UInt32),
                ]
            ),
        ),
        pl.Series(
            "list_with_struct_with_list",
            [
                [
                    {
                        "date_range": [
                            datetime(2010, 1, 1, 0, 0),
                            datetime(2010, 1, 2, 0, 0),
                        ],
                        "date_ns": [
                            datetime(2010, 1, 1, 0, 0),
                            datetime(2010, 1, 2, 0, 0),
                        ],
                        "date_range_nested": [
                            [
                                datetime(2010, 1, 1, 0, 0),
                                datetime(2010, 1, 2, 0, 0),
                            ]
                        ],
                        "string": "test",
                        "int": 1,
                    }
                ]
            ],
            dtype=pl.List(
                pl.Struct(
                    [
                        pl.Field(
                            "date_range",
                            pl.List(pl.Datetime(time_unit="ns", time_zone=None)),
                        ),
                        pl.Field(
                            "date_ns",
                            pl.List(pl.Datetime(time_unit="ns", time_zone=None)),
                        ),
                        pl.Field(
                            "date_range_nested",
                            pl.List(
                                pl.List(pl.Datetime(time_unit="ns", time_zone=None))
                            ),
                        ),
                        pl.Field("string", pl.String),
                        pl.Field("int", pl.UInt32),
                    ]
                )
            ),
        ),
    ],
)
def test_write_delta_w_compatible_schema(series: pl.Series, tmp_path: Path) -> None:
    """A second append of the same frame must succeed once the schema has been
    reconstructed from the table (covers nested lists, structs and datetimes
    whose time unit/zone delta coerces on write)."""
    df = series.to_frame()

    # Create table
    df.write_delta(tmp_path, mode="append")

    # Write to table again, should pass with reconstructed schema
    df.write_delta(tmp_path, mode="append")

    tbl = DeltaTable(tmp_path)
    assert tbl.version() == 1
542
543
544
@pytest.mark.write_disk
@pytest.mark.parametrize(
    "expr",
    [
        pl.datetime(2010, 1, 1, time_unit="us", time_zone="UTC"),
        pl.datetime(2010, 1, 1, time_unit="ns", time_zone="America/New_York"),
        pl.datetime(2010, 1, 1, time_unit="ms", time_zone="Europe/Amsterdam"),
    ],
)
def test_write_delta_with_tz_in_df(expr: pl.Expr, tmp_path: Path) -> None:
    """Timezone-aware datetimes are stored as microsecond UTC timestamps."""
    frame = pl.select(expr)
    expected = pl.select(expr.cast(pl.Datetime("us", "UTC")))

    frame.write_delta(tmp_path, mode="append")
    # write second time because delta-rs also casts timestamp with tz to timestamp no tz
    frame.write_delta(tmp_path, mode="append")

    # Check schema of DeltaTable object
    table = DeltaTable(tmp_path)
    assert pa.schema(table.schema().to_arrow()) == expected.to_arrow().schema

    # Check result
    assert_frame_equal(pl.read_delta(tmp_path, version=0), expected)
570
571
572
def test_write_delta_with_merge_and_no_table(tmp_path: Path) -> None:
    """Merging into a non-existent table raises TableNotFoundError."""
    frame = pl.DataFrame({"a": [1, 2, 3]})

    with pytest.raises(TableNotFoundError):
        frame.write_delta(
            tmp_path, mode="merge", delta_merge_options={"predicate": "a = a"}
        )
579
580
581
@pytest.mark.write_disk
def test_write_delta_with_merge(tmp_path: Path) -> None:
    """`mode="merge"` returns a TableMerger whose plan applies on execute()."""
    frame = pl.DataFrame({"a": [1, 2, 3]})
    frame.write_delta(tmp_path)

    merger = frame.write_delta(
        tmp_path,
        mode="merge",
        delta_merge_options={
            "predicate": "s.a = t.a",
            "source_alias": "s",
            "target_alias": "t",
        },
    )

    # The aliases passed through delta_merge_options must reach the builder.
    assert isinstance(merger, TableMerger)
    assert merger._builder.source_alias == "s"
    assert merger._builder.target_alias == "t"

    merger.when_matched_delete(predicate="t.a > 2").execute()

    remaining = pl.read_delta(tmp_path)
    assert_frame_equal(remaining, frame.filter(pl.col("a") <= 2), check_row_order=False)
607
608
609
@pytest.mark.write_disk
def test_unsupported_dtypes(tmp_path: Path) -> None:
    """Null and Time columns are rejected by write_delta with a TypeError."""
    null_frame = pl.DataFrame({"a": [None]}, schema={"a": pl.Null})
    with pytest.raises(TypeError, match="unsupported data type"):
        null_frame.write_delta(tmp_path / "null")

    time_frame = pl.DataFrame({"a": [123]}, schema={"a": pl.Time})
    with pytest.raises(TypeError, match="unsupported data type"):
        time_frame.write_delta(tmp_path / "time")
618
619
620
@pytest.mark.skip(
    reason="upstream bug in delta-rs causing categorical to be written as categorical in parquet"
)
@pytest.mark.write_disk
def test_categorical_becomes_string(tmp_path: Path) -> None:
    """Categorical columns should round-trip through delta as plain strings."""
    cat_frame = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Categorical})
    cat_frame.write_delta(tmp_path)

    roundtrip = pl.read_delta(tmp_path)
    expected = pl.DataFrame({"a": ["A", "B", "A"]}, schema={"a": pl.Utf8})
    assert_frame_equal(roundtrip, expected)
629
630
631
def test_scan_delta_DT_input(delta_table_path: Path) -> None:
    """scan_delta accepts an already-opened DeltaTable object."""
    table = DeltaTable(delta_table_path, version=0)
    result = pl.scan_delta(table).collect()

    expected = pl.DataFrame({"name": ["Joey", "Ivan"], "age": [14, 32]})
    assert_frame_equal(expected, result, check_dtypes=False)
637
638
639
@pytest.mark.write_disk
def test_read_delta_empty(tmp_path: Path) -> None:
    """Reading a freshly created, empty table yields an empty frame."""
    tmp_path.mkdir(exist_ok=True)
    uri = str(tmp_path)

    empty = pl.DataFrame(schema={"x": pl.Int64})
    DeltaTable.create(uri, empty.to_arrow().schema)

    assert_frame_equal(pl.read_delta(uri), empty)
646
647
648
@pytest.mark.write_disk
def test_read_delta_arrow_map_type(tmp_path: Path) -> None:
    """Arrow map-typed columns written by deltalake can be scanned and read."""
    rows = [
        {"id": 1, "account_id": {17: "100.01.001 Cash"}},
        {"id": 2, "account_id": {18: "180.01.001 Cash", 19: "foo"}},
    ]
    arrow_schema = pa.schema(
        [
            pa.field("id", pa.int32()),
            pa.field("account_id", pa.map_(pa.int32(), pa.string())),
        ]
    )
    arrow_table = pa.Table.from_pylist(rows, arrow_schema)

    expect = pl.DataFrame(arrow_table)

    uri = str(tmp_path)
    write_deltalake(
        uri,
        arrow_table,
        mode="overwrite",
    )

    assert_frame_equal(pl.scan_delta(uri).collect(), expect)
    assert_frame_equal(pl.read_delta(uri), expect)
674
675
676
@pytest.mark.may_fail_cloud  # reason: inspects logs
@pytest.mark.write_disk
def test_scan_delta_nanosecond_timestamp(
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    """Nanosecond timestamps in a data file are cast to the delta schema
    (microsecond UTC), and predicate pushdown still skips row groups."""
    df = pl.DataFrame(
        {"timestamp": [datetime(2025, 1, 1), datetime(2025, 1, 2)]},
        schema={"timestamp": pl.Datetime("us", time_zone="UTC")},
    )

    df_nano_ts = pl.DataFrame(
        {"timestamp": [datetime(2025, 1, 1), datetime(2025, 1, 2)]},
        schema={"timestamp": pl.Datetime("ns", time_zone=None)},
    )

    root = tmp_path / "delta"

    import deltalake

    # Disable per-column statistics so the later row-group skipping has to
    # rely on the parquet file itself rather than delta-log stats.
    df.write_delta(
        root,
        delta_write_options={
            "writer_properties": deltalake.WriterProperties(
                default_column_properties=deltalake.ColumnProperties(
                    statistics_enabled="NONE"
                )
            )
        },
    )

    # Manually overwrite the file with one that has nanosecond timestamps.
    parquet_files = [x for x in root.iterdir() if x.suffix == ".parquet"]
    assert len(parquet_files) == 1
    parquet_file_path = parquet_files[0]

    df_nano_ts.write_parquet(parquet_file_path)

    # Baseline: The timestamp in the file is in nanoseconds.
    q = pl.scan_parquet(parquet_file_path)
    assert q.collect_schema() == {"timestamp": pl.Datetime("ns", time_zone=None)}
    assert_frame_equal(q.collect(), df_nano_ts)

    # scan_delta must instead present the table's schema and cast the data.
    q = pl.scan_delta(root)

    assert q.collect_schema() == {"timestamp": pl.Datetime("us", time_zone="UTC")}
    assert_frame_equal(q.collect(), df)

    # Ensure row-group skipping is functioning.
    q = pl.scan_delta(root).filter(
        pl.col("timestamp")
        < pl.lit(datetime(2025, 1, 1), dtype=pl.Datetime("us", time_zone="UTC"))
    )
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    # Drain captured output so the assertion below only sees this collect.
    capfd.readouterr()

    assert_frame_equal(q.collect(), df.clear())
    assert "reading 0 / 1 row groups" in capfd.readouterr().err
735
736
737
@pytest.mark.write_disk
def test_scan_delta_nanosecond_timestamp_nested(tmp_path: Path) -> None:
    """Nanosecond timestamps nested inside a struct are cast to the delta schema."""
    rows = [
        {"timestamp": datetime(2025, 1, 1)},
        {"timestamp": datetime(2025, 1, 2)},
    ]
    df = pl.DataFrame(
        {"c1": rows},
        schema={"c1": pl.Struct({"timestamp": pl.Datetime("us", time_zone="UTC")})},
    )
    df_nano_ts = pl.DataFrame(
        {"c1": rows},
        schema={"c1": pl.Struct({"timestamp": pl.Datetime("ns", time_zone=None)})},
    )

    root = tmp_path / "delta"
    df.write_delta(root)

    # Manually overwrite the file with one that has nanosecond timestamps.
    data_files = [f for f in root.iterdir() if f.suffix == ".parquet"]
    assert len(data_files) == 1
    df_nano_ts.write_parquet(data_files[0])

    # Baseline: the raw parquet file carries nanosecond timestamps.
    raw = pl.scan_parquet(data_files[0])
    assert raw.collect_schema() == {
        "c1": pl.Struct({"timestamp": pl.Datetime("ns", time_zone=None)})
    }
    assert_frame_equal(raw.collect(), df_nano_ts)

    # scan_delta must present the table schema and cast accordingly.
    q = pl.scan_delta(root)
    assert q.collect_schema() == {
        "c1": pl.Struct({"timestamp": pl.Datetime("us", time_zone="UTC")})
    }
    assert_frame_equal(q.collect(), df)
783
784
785
@pytest.mark.write_disk
def test_scan_delta_schema_evolution_nested_struct_field_19915(tmp_path: Path) -> None:
    """Schema-merge appends that add a nested struct field read back correctly."""
    initial = pl.DataFrame(
        {"a": ["test"], "properties": [{"property_key": {"item": 1}}]}
    )
    initial.write_delta(tmp_path)

    # Second write adds `item2` inside the nested struct via schema merge.
    evolved = pl.DataFrame(
        {
            "a": ["test1"],
            "properties": [{"property_key": {"item": 50, "item2": 10}}],
        }
    )
    evolved.write_delta(
        tmp_path,
        mode="append",
        delta_write_options={"schema_mode": "merge"},
    )

    expect = pl.DataFrame(
        {
            "a": ["test", "test1"],
            "properties": [
                {"property_key": {"item": 1, "item2": None}},
                {"property_key": {"item": 50, "item2": 10}},
            ],
        },
        schema={
            "a": pl.String,
            "properties": pl.Struct(
                {"property_key": pl.Struct({"item": pl.Int64, "item2": pl.Int64})}
            ),
        },
    )

    result = pl.scan_delta(tmp_path).sort("a").collect()
    assert_frame_equal(result, expect)
825
826
827
@pytest.mark.write_disk
def test_scan_delta_storage_options_from_delta_table(
    tmp_path: Path, plmonkeypatch: PlMonkeyPatch
) -> None:
    """storage_options given to scan_delta are merged over (and override) the
    options carried by the DeltaTable object itself."""
    import polars.io.delta._dataset

    storage_options_checked = False

    def assert_scan_parquet_storage_options(*a: Any, **kw: Any) -> Any:
        """Spy wrapper around scan_parquet that records the merged options."""
        nonlocal storage_options_checked

        # `aws_endpoint_url` must be the scan_delta value (":777"), not the
        # DeltaTable value (":333"); the remaining keys come from the table.
        assert kw["storage_options"] == {
            "aws_endpoint_url": "http://localhost:777",
            "aws_access_key_id": "...",
            "aws_secret_access_key": "...",
            "aws_session_token": "...",
            "endpoint_url": "...",
        }

        storage_options_checked = True

        return pl.scan_parquet(*a, **kw)

    plmonkeypatch.setattr(
        polars.io.delta._dataset, "scan_parquet", assert_scan_parquet_storage_options
    )

    df = pl.DataFrame({"a": ["test"], "properties": [{"property_key": {"item": 1}}]})

    df.write_delta(tmp_path)

    tbl = DeltaTable(
        tmp_path,
        storage_options={
            "aws_endpoint_url": "http://localhost:333",
            "aws_access_key_id": "...",
            "aws_secret_access_key": "...",
            "aws_session_token": "...",
        },
    )

    # Passing both a DeltaTable and storage_options warns; the warning is not
    # under test here, so it is suppressed.
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)

        q = pl.scan_delta(
            tbl,
            storage_options={
                "aws_endpoint_url": "http://localhost:777",
                "endpoint_url": "...",
            },
        )

    assert_frame_equal(q.collect(), df)

    assert storage_options_checked
882
883
884
def test_scan_delta_loads_aws_profile_endpoint_url(
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
) -> None:
    """The `endpoint_url` from the active AWS profile is picked up by both
    scan_delta and write_delta (verified by the connection error naming it)."""
    tmp_path.mkdir(exist_ok=True)

    cfg_file_path = tmp_path / "config"

    cfg_file_path.write_text("""\
[profile endpoint_333]
aws_access_key_id=A
aws_secret_access_key=A
endpoint_url = http://127.0.0.1:54321
""")

    # Point the AWS SDK config resolution at the file written above.
    plmonkeypatch.setenv("AWS_CONFIG_FILE", str(cfg_file_path))
    plmonkeypatch.setenv("AWS_PROFILE", "endpoint_333")

    assert (
        builder := _init_credential_provider_builder(
            "auto", "s3://.../...", storage_options=None, caller_name="test"
        )
    ) is not None

    assert isinstance(
        provider := builder.build_credential_provider(),
        pl.CredentialProviderAWS,
    )

    assert provider._can_use_as_provider()

    # The profile's endpoint_url must surface as a storage option update.
    assert provider._storage_update_options() == {
        "endpoint_url": "http://127.0.0.1:54321"
    }

    # No server is listening on that endpoint, so both operations must fail
    # with an error message that mentions the configured URL.
    with pytest.raises((DeltaError, OSError), match=r"http://127.0.0.1:54321"):
        pl.scan_delta("s3://.../...").collect()

    with pytest.raises((DeltaError, OSError), match=r"http://127.0.0.1:54321"):
        pl.DataFrame({"x": 1}).write_delta("s3://.../...", mode="append")
924
925
926
def _df_many_types() -> pl.DataFrame:
    """Six-row fixture frame covering many dtypes.

    Partition column ``p`` has three distinct values (two rows each); most
    other columns carry a null in the final row so null-count statistics are
    exercised by the tests that consume this frame.
    """
    return pl.DataFrame(
        {
            "p": [10, 10, 20, 20, 30, 30],
            "a": [1, 2, 3, 4, 5, None],
            "bool": [False, False, True, True, True, None],
            "int": [1, 2, 3, 4, 5, None],
            "float": [1.0, 2.0, 3.0, 4.0, 5.0, None],
            "string": ["a", "b", "c", "cc", "ccc", None],
            "struct": [
                {"x": 1, "y": 10},
                {"x": 2, "y": 20},
                {"x": 3, "y": 30},
                {"x": 4, "y": 40},
                {"x": 5, "y": 50},
                None,
            ],
        }
    ).with_columns(
        decimal=pl.col.a.cast(pl.Decimal(10, 2)),
        date=pl.date_range(pl.date(2020, 1, 1), pl.date(2020, 1, 6), closed="both"),
        datetime=pl.datetime_range(
            pl.datetime(2020, 1, 1), pl.datetime(2020, 1, 6), closed="both"
        ),
    )
951
952
953
# TODO: uncomment dtype when fixed
@pytest.mark.parametrize(
    "expr",
    [
        # Bool
        # pl.col.bool == False, ## see github issue #26290, to be confirmed
        # pl.col.bool <= False,
        # pl.col.bool < True,
        pl.col.bool.is_null(),
        # Integer
        pl.col.int == 2,
        pl.col.int <= 2,
        pl.col.int < 3,
        pl.col.int.is_null(),
        (pl.col.int < 2) & (pl.col.int.is_not_null()),
        # Float ## see github issue #26238
        # pl.col.float == 2.0,
        # pl.col.float <= 2.0,
        # pl.col.float < 3.0,
        pl.col.float.is_null(),
        # mixed
        (pl.col.int == 2) & (pl.col.float.is_not_null()),
        # String
        pl.col.string == "b",
        pl.col.string <= "b",
        pl.col.string.is_null(),
        # Decimal
        pl.col.decimal == pl.lit(2.0).cast(pl.Decimal(10, 2)),
        pl.col.decimal <= pl.lit(2.0).cast(pl.Decimal(10, 2)),
        pl.col.decimal < pl.lit(3.0).cast(pl.Decimal(10, 2)),
        pl.col.decimal.is_null(),
        # Struct # see github issue #26239
        # pl.col.struct == {"x": 2, "y": 20},
        # pl.col.struct.is_null(),
        # Date & datetime
        pl.col.date == pl.date(2020, 1, 1),
        pl.col.datetime == pl.datetime(2020, 1, 1),
        # on predicate
        pl.col.p == 10,
    ],
)
@pytest.mark.write_disk
def test_scan_delta_filter_delta_log_statistics_23780(
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
    expr: pl.Expr,
) -> None:
    """Predicates are pushed into delta-log statistics to skip whole files."""
    df = _df_many_types()
    root = tmp_path / "delta"
    # Partitioning by `p` produces one file per partition value (3 files).
    df.write_delta(root, delta_write_options={"partition_by": "p"})

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    # Drain captured output so the assertion below only sees this collect.
    capfd.readouterr()

    assert_frame_equal(
        pl.scan_delta(root).filter(expr).collect(),
        df.filter(expr),
        check_column_order=False,
        check_row_order=False,
    )
    # Every parametrized predicate matches rows from exactly one partition,
    # so 2 of the 3 files must be pruned via log statistics.
    assert "skipping 2 / 3 files" in capfd.readouterr().err
1015
1016
1017
@pytest.mark.write_disk
def test_scan_delta_extract_table_statistics_df(tmp_path: Path) -> None:
    """The statistics frame built from delta add-actions carries per-file
    row counts, null counts and min/max values for every supported dtype."""
    # Local module import deliberately shadows the `datetime` class imported
    # at module scope, so `datetime.date` / `datetime.datetime` below resolve
    # through the module namespace.
    import datetime

    df = _df_many_types()
    root = tmp_path / "delta"
    # One file per `p` partition -> 3 rows in the statistics frame.
    df.write_delta(root, delta_write_options={"partition_by": "p"})

    statistics_df = _extract_table_statistics_from_delta_add_actions(
        pl.DataFrame(DeltaTable(tmp_path / "delta").get_add_actions()),
        filter_columns=df.columns,
        schema=df.schema,
        verbose=False,
    )

    assert statistics_df is not None

    # Partition column `p` has no stats (all-null `*_nc/_min/_max`); the
    # remaining columns carry per-file null counts and min/max values.
    assert_frame_equal(
        statistics_df,
        pl.DataFrame(
            [
                pl.Series('len', [2, 2, 2], dtype=pl.Int64),
                pl.Series('p_nc', [None, None, None], dtype=pl.UInt32),
                pl.Series('p_min', [None, None, None], dtype=pl.Int64),
                pl.Series('p_max', [None, None, None], dtype=pl.Int64),
                pl.Series('a_nc', [0, 1, 0], dtype=pl.Int64),
                pl.Series('a_min', [1, 5, 3], dtype=pl.Int64),
                pl.Series('a_max', [2, 5, 4], dtype=pl.Int64),
                pl.Series('bool_nc', [0, 1, 0], dtype=pl.Int64),
                pl.Series('bool_min', [None, None, None], dtype=pl.Boolean),
                pl.Series('bool_max', [None, None, None], dtype=pl.Boolean),
                pl.Series('int_nc', [0, 1, 0], dtype=pl.Int64),
                pl.Series('int_min', [1, 5, 3], dtype=pl.Int64),
                pl.Series('int_max', [2, 5, 4], dtype=pl.Int64),
                pl.Series('float_nc', [0, 1, 0], dtype=pl.Int64),
                pl.Series('float_min', [1.0, 5.0, 3.0], dtype=pl.Float64),
                pl.Series('float_max', [2.0, 5.0, 4.0], dtype=pl.Float64),
                pl.Series('string_nc', [0, 1, 0], dtype=pl.Int64),
                pl.Series('string_min', ['a', 'ccc', 'c'], dtype=pl.String),
                pl.Series('string_max', ['b', 'ccc', 'cc'], dtype=pl.String),
                pl.Series('struct_nc', [{'x': 0, 'y': 0}, {'x': 1, 'y': 1}, {'x': 0, 'y': 0}], dtype=pl.Struct({'x': pl.Int64, 'y': pl.Int64})),
                pl.Series('struct_min', [{'x': 1, 'y': 10}, {'x': 5, 'y': 50}, {'x': 3, 'y': 30}], dtype=pl.Struct({'x': pl.Int64, 'y': pl.Int64})),
                pl.Series('struct_max', [{'x': 2, 'y': 20}, {'x': 5, 'y': 50}, {'x': 4, 'y': 40}], dtype=pl.Struct({'x': pl.Int64, 'y': pl.Int64})),
                pl.Series('decimal_nc', [0, 1, 0], dtype=pl.Int64),
                pl.Series('decimal_min', [Decimal('1.00'), Decimal('5.00'), Decimal('3.00')], dtype=pl.Decimal(precision=10, scale=2)),
                pl.Series('decimal_max', [Decimal('2.00'), Decimal('5.00'), Decimal('4.00')], dtype=pl.Decimal(precision=10, scale=2)),
                pl.Series('date_nc', [0, 0, 0], dtype=pl.Int64),
                pl.Series('date_min', [datetime.date(2020, 1, 1), datetime.date(2020, 1, 5), datetime.date(2020, 1, 3)], dtype=pl.Date),
                pl.Series('date_max', [datetime.date(2020, 1, 2), datetime.date(2020, 1, 6), datetime.date(2020, 1, 4)], dtype=pl.Date),
                pl.Series('datetime_nc', [0, 0, 0], dtype=pl.Int64),
                pl.Series('datetime_min', [datetime.datetime(2020, 1, 1, 0, 0), datetime.datetime(2020, 1, 5, 0, 0), datetime.datetime(2020, 1, 3, 0, 0)], dtype=pl.Datetime(time_unit='us', time_zone=None)),
                pl.Series('datetime_max', [datetime.datetime(2020, 1, 2, 0, 0), datetime.datetime(2020, 1, 6, 0, 0), datetime.datetime(2020, 1, 4, 0, 0)], dtype=pl.Datetime(time_unit='us', time_zone=None)),
            ]
        ),
        check_row_order=False
    )  # fmt: skip
1073
1074
1075
@pytest.mark.parametrize(
    ("expr", "n_cols", "expect_n_files_skipped"),
    [
        (pl.col.a == 2, "0", 0),
        (pl.col.a == 2, "1", 1),
        (pl.col.b == 2, "1", 0),
        (pl.col.a == 2, "2", 1),
    ],
)
@pytest.mark.write_disk
def test_scan_delta_filter_delta_log_statistics_partial_23780(
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
    expr: pl.Expr,
    n_cols: str,
    expect_n_files_skipped: int,
) -> None:
    """File skipping should still work when delta log statistics cover only a
    subset of the columns (see issue #23780).
    """
    frame = pl.DataFrame({"p": [10, 10, 20, 20], "a": [1, 2, 3, 4], "b": [1, 2, 3, 4]})
    table_path = tmp_path / "delta"

    # `delta.dataSkippingNumIndexedCols` limits statistics collection to the
    # first `n_cols` columns, so stats may be missing for the filter column.
    write_options = {
        "partition_by": "p",
        "configuration": {"delta.dataSkippingNumIndexedCols": n_cols},
    }
    frame.write_delta(table_path, delta_write_options=write_options)

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    capfd.readouterr()  # Drain previously captured output.

    result = pl.scan_delta(table_path).filter(expr).collect()
    assert_frame_equal(
        result,
        frame.filter(expr),
        check_column_order=False,
        check_row_order=False,
    )
    assert f"skipping {expect_n_files_skipped} / 2 files" in capfd.readouterr().err
1116
1117
1118
@pytest.mark.write_disk
def test_scan_delta_filter_delta_log_statistics_delete_partition_23780(
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    """File skipping from delta log statistics should stay correct after a
    partition is deleted from the table (see issue #23780).
    """
    df = pl.DataFrame(
        {
            "p": [10, 10, 20, 30],
            "a": [1, 2, 3, 4],
        }
    )
    root = tmp_path / "delta"

    df.write_delta(root, delta_write_options={"partition_by": "p"})

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    capfd.readouterr()

    expr = pl.col.a == 2
    assert_frame_equal(
        pl.scan_delta(root).filter(expr).collect(),
        df.filter(expr),
        check_column_order=False,
        check_row_order=False,
    )
    # 3 partition files exist; only the one that can contain `a == 2` is read.
    assert "skipping 2 / 3 files" in capfd.readouterr().err

    # NOTE: `DeltaTable` is already imported at module level; the previous
    # local `from deltalake import DeltaTable` re-import was redundant.
    dt = DeltaTable(root)
    dt.delete("p = 30")

    assert_frame_equal(
        pl.scan_delta(root).filter(expr).collect(),
        df.filter(expr),
        check_column_order=False,
        check_row_order=False,
    )
    # One partition was deleted, so only 2 files remain to consider.
    assert "skipping 1 / 2 files" in capfd.readouterr().err
1158
1159
1160
@pytest.mark.parametrize("use_pyarrow", [True, False])
@pytest.mark.write_disk
def test_scan_delta_use_pyarrow(tmp_path: Path, use_pyarrow: bool) -> None:
    """Filtering and slicing a partitioned delta scan yields the same results
    with and without the pyarrow-backed reader.
    """
    source = pl.DataFrame({"year": [2025, 2026, 2026], "month": [0, 0, 0]})
    source.write_delta(tmp_path, delta_write_options={"partition_by": "year"})

    def scan() -> pl.LazyFrame:
        return pl.scan_delta(tmp_path, use_pyarrow=use_pyarrow)

    filtered = scan().filter(pl.col("year") == 2026).collect()
    assert_frame_equal(filtered, pl.DataFrame({"year": [2026, 2026], "month": [0, 0]}))

    first_match = scan().filter(pl.col("year") == 2026).head(1).collect()
    assert_frame_equal(first_match, pl.DataFrame({"year": [2026], "month": [0]}))

    # Delta does not have stable file scan ordering.
    assert scan().head(1).collect().height == 1
1185
1186
1187
@pytest.mark.parametrize("use_pyarrow", [True, False])
@pytest.mark.write_disk
def test_scan_delta_use_pyarrow_single_file(tmp_path: Path, use_pyarrow: bool) -> None:
    """Filter / slice combinations on an unpartitioned (single-file) table."""
    source = pl.DataFrame({"year": [2025, 2026, 2026], "month": [0, 0, 0]})
    source.write_delta(tmp_path)

    def scan() -> pl.LazyFrame:
        return pl.scan_delta(tmp_path, use_pyarrow=use_pyarrow)

    assert_frame_equal(
        scan().filter(pl.col("year") == 2026).collect(),
        pl.DataFrame({"year": [2026, 2026], "month": [0, 0]}),
    )

    assert_frame_equal(
        scan().filter(pl.col("year") == 2026).head(1).collect(),
        pl.DataFrame({"year": [2026], "month": [0]}),
    )

    # With a single file the scan order is deterministic, so head(1) is the
    # first written row.
    assert_frame_equal(
        scan().head(1).collect(),
        pl.DataFrame({"year": [2025], "month": [0]}),
    )

    # head(1) applied before the filter keeps only the 2025 row, which the
    # subsequent filter removes — leaving an empty frame with the schema.
    assert_frame_equal(
        scan().head(1).filter(pl.col("year") == 2026).collect(),
        pl.DataFrame(schema={"year": pl.Int64, "month": pl.Int64}),
    )
1220
1221
1222
@pytest.mark.write_disk
def test_delta_dataset_does_not_pickle_table_object(tmp_path: Path) -> None:
    """Pickling a `DeltaDataset` drops the `DeltaTable` handle, and the
    dataset remains scannable after the round-trip.
    """
    expected = pl.DataFrame({"row_index": [0, 1, 2, 3, 4]})
    expected.write_delta(tmp_path)

    ds = new_pl_delta_dataset(DeltaTable(tmp_path))
    assert ds.table_.get() is not None

    restored = pickle.loads(pickle.dumps(ds))
    # The table object is wrapped in `NoPickleOption`, so it does not survive
    # serialization.
    assert restored.table_.get() is None

    assert_frame_equal(restored.to_dataset_scan()[0].collect(), expected)  # type: ignore[index]
1234
1235
1236
@pytest.mark.parametrize("use_pyarrow", [True, False])
@pytest.mark.write_disk
def test_delta_partition_filter(tmp_path: Path, use_pyarrow: bool) -> None:
    """A filter that excludes every partition must not read any data file."""
    df = pl.DataFrame({"row_index": [0, 1, 2, 3, 4], "year": 2026})
    df.write_delta(tmp_path, delta_write_options={"partition_by": "year"})

    # Delete the underlying parquet files so that any read attempt errors.
    for file_uri in DeltaTable(tmp_path).file_uris():
        Path(file_uri).unlink()

    with pytest.raises((FileNotFoundError, OSError)):
        pl.scan_delta(tmp_path, use_pyarrow=use_pyarrow).collect()

    # The partition filter prunes all files up-front, so the missing files are
    # never touched and an empty frame with the original schema comes back.
    out = (
        pl.scan_delta(tmp_path, use_pyarrow=use_pyarrow)
        .filter(pl.col("year") < 0)
        .collect()
    )
    assert_frame_equal(out, pl.DataFrame(schema=df.schema))
1254
1255
1256
@pytest.mark.write_disk
@pytest.mark.parametrize("use_pyarrow", [True, False])
def test_scan_delta_collect_without_version_scans_latest(
    tmp_path: Path,
    use_pyarrow: bool,
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
) -> None:
    """A versionless scan resolves to the latest table state on every collect,
    while a version-pinned scan keeps returning that fixed version.
    """
    pl.DataFrame({"a": [0]}).write_delta(tmp_path)
    table = DeltaTable(tmp_path)

    latest = pl.scan_delta(table, use_pyarrow=use_pyarrow)
    assert_frame_equal(latest.collect(), pl.DataFrame({"a": [0]}))

    # Append a row: the versionless query must pick it up.
    pl.DataFrame({"a": [1]}).write_delta(table, mode="append")
    assert_frame_equal(latest.collect().sort("*"), pl.DataFrame({"a": [0, 1]}))

    pinned = pl.scan_delta(table, use_pyarrow=use_pyarrow, version=table.version())
    assert_frame_equal(pinned.collect().sort("*"), pl.DataFrame({"a": [0, 1]}))

    # Another append: versionless sees it, pinned must not.
    pl.DataFrame({"a": [2]}).write_delta(table, mode="append")
    assert_frame_equal(latest.collect().sort("*"), pl.DataFrame({"a": [0, 1, 2]}))

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    capfd.readouterr()

    assert_frame_equal(pinned.collect().sort("*"), pl.DataFrame({"a": [0, 1]}))

    # The pinned query short-circuits dataset resolution at its stored version.
    captured = capfd.readouterr().err
    assert (
        "DeltaDataset: to_dataset_scan(): early return (version_key = '1')" in captured
    )
1295
1296
1297
@pytest.mark.write_disk
def test_scan_delta_filter_delta_log_statistics_missing_26444(tmp_path: Path) -> None:
    """Statistics extraction must tolerate add-actions whose stats structs are
    missing or empty, producing all-null statistics columns rather than
    failing (see issue #26444).
    """
    pl.DataFrame({"x": [1, 2], "y": [True, False]}).write_delta(tmp_path)

    assert_frame_equal(
        pl.scan_delta(tmp_path).filter("y").collect(),
        pl.DataFrame({"x": 1, "y": True}),
    )

    schema = {
        "bool": pl.Boolean,
        "string": pl.String,
        "binary": pl.Binary,
        "int8": pl.Int8,
        "null": pl.Null,
    }

    # Expected result: the `len` column, then for every schema field an
    # all-null null-count (`_nc`), minimum (`_min`) and maximum (`_max`)
    # column in schema order.
    nulls = [None, None, None]
    expected_columns = [pl.Series("len", [1, 2, 3], dtype=pl.Int64)]
    for name, dtype in schema.items():
        expected_columns.append(pl.Series(f"{name}_nc", nulls, dtype=pl.UInt32))
        expected_columns.append(pl.Series(f"{name}_min", nulls, dtype=dtype))
        expected_columns.append(pl.Series(f"{name}_max", nulls, dtype=dtype))
    expected = pl.DataFrame(expected_columns)

    # Add-action frames with no stats at all, or with empty min/max/null_count
    # structs, must all behave the same.
    for actions_df in [
        pl.DataFrame({"num_records": [1, 2, 3]}),
        pl.DataFrame({"num_records": [1, 2, 3], "min": [{}, {}, {}]}),
        pl.DataFrame({"num_records": [1, 2, 3], "max": [{}, {}, {}]}),
        pl.DataFrame({"num_records": [1, 2, 3], "null_count": [{}, {}, {}]}),
    ]:
        df = _extract_table_statistics_from_delta_add_actions(
            actions_df,
            filter_columns=[*schema],
            schema=schema,
            verbose=False,
        )
        assert df is not None
        assert_frame_equal(df, expected)

    # An entirely empty add-actions frame yields no statistics at all.
    assert (
        _extract_table_statistics_from_delta_add_actions(
            pl.DataFrame(),
            filter_columns=[*schema],
            schema=schema,
            verbose=False,
        )
        is None
    )
1362
1363