GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_scan.py
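"""Tests for the `pl.scan_*` functions over files, globs and in-memory buffers."""
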
from __future__ import annotations

import io
import sys
import zlib
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from math import ceil
from pathlib import Path
from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.exceptions import ComputeError
from polars.testing.asserts.frame import assert_frame_equal
from tests.unit.io.conftest import format_file_uri, normalize_path_separator_pl

if TYPE_CHECKING:
    from collections.abc import Callable

    from polars._typing import SchemaDict
    from tests.conftest import PlMonkeyPatch


@dataclass
class _RowIndex:
    name: str = "index"
    offset: int = 0


def _enable_force_async(plmonkeypatch: PlMonkeyPatch) -> None:
    """Modifies the provided plmonkeypatch context."""
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")


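# Dispatch to the `pl.scan_*` function matching the file suffix, optionally
# attaching a row index.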
def _scan(
    file_path: Path,
    schema: SchemaDict | None = None,
    row_index: _RowIndex | None = None,
) -> pl.LazyFrame:
    suffix = file_path.suffix
    row_index_name = None if row_index is None else row_index.name
    row_index_offset = 0 if row_index is None else row_index.offset

    if (
        scan_func := {
            ".ipc": pl.scan_ipc,
            ".parquet": pl.scan_parquet,
            ".csv": pl.scan_csv,
            ".ndjson": pl.scan_ndjson,
        }.get(suffix)
    ) is not None:  # fmt: skip
        result = scan_func(
            file_path,
            row_index_name=row_index_name,
            row_index_offset=row_index_offset,
        )  # type: ignore[operator]

    else:
        msg = f"Unknown suffix {suffix}"
        raise NotImplementedError(msg)

    return result  # type: ignore[no-any-return]


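# Dispatch to the `pl.DataFrame.write_*` method matching the file suffix.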
def _write(df: pl.DataFrame, file_path: Path) -> None:
    suffix = file_path.suffix

    if (
        write_func := {
            ".ipc": pl.DataFrame.write_ipc,
            ".parquet": pl.DataFrame.write_parquet,
            ".csv": pl.DataFrame.write_csv,
            ".ndjson": pl.DataFrame.write_ndjson,
        }.get(suffix)
    ) is not None:  # fmt: skip
        return write_func(df, file_path)  # type: ignore[operator, no-any-return]

    msg = f"Unknown suffix {suffix}"
    raise NotImplementedError(msg)


@pytest.fixture(
    scope="session",
    params=["csv", "ipc", "parquet", "ndjson"],
)
def data_file_extension(request: pytest.FixtureRequest) -> str:
    return f".{request.param}"


@pytest.fixture(scope="session")
def session_tmp_dir(tmp_path_factory: pytest.TempPathFactory) -> Path:
    return tmp_path_factory.mktemp("polars-test")


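# Parametrizes tests over the sync and forced-async read paths; tests call
# `_enable_force_async()` themselves when the parameter is True.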
@pytest.fixture(
    params=[False, True],
    ids=["sync", "async"],
)
def force_async(request: pytest.FixtureRequest, plmonkeypatch: PlMonkeyPatch) -> bool:
    value: bool = request.param
    return value


@dataclass
class _DataFile:
    path: Path
    df: pl.DataFrame


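# Rebuild `df` as a frame whose chunks each hold at most `limit` rows.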
def df_with_chunk_size_limit(df: pl.DataFrame, limit: int) -> pl.DataFrame:
    return pl.concat(
        (
            df.slice(i * limit, min(limit, df.height - i * limit))
            for i in range(ceil(df.height / limit))
        ),
        rechunk=False,
    )


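# A single data file of 10_000 sequential rows, written from a multi-chunk frame.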
@pytest.fixture(scope="session")
def data_file_single(session_tmp_dir: Path, data_file_extension: str) -> _DataFile:
    max_rows_per_batch = 727
    file_path = (session_tmp_dir / "data").with_suffix(data_file_extension)
    df = pl.DataFrame(
        {
            "sequence": range(10000),
        }
    )
    assert max_rows_per_batch < df.height
    _write(df_with_chunk_size_limit(df, max_rows_per_batch), file_path)
    return _DataFile(path=file_path, df=df)


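# Multiple data files (addressed via a glob pattern) that together hold the same
# 10_000 sequential rows, with varying row counts per file.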
@pytest.fixture(scope="session")
def data_file_glob(session_tmp_dir: Path, data_file_extension: str) -> _DataFile:
    max_rows_per_batch = 200
    row_counts = [
        100, 186, 95, 185, 90, 84, 115, 81, 87, 217, 126, 85, 98, 122, 129, 122, 1089, 82,
        234, 86, 93, 90, 91, 263, 87, 126, 86, 161, 191, 1368, 403, 192, 102, 98, 115, 81,
        111, 305, 92, 534, 431, 150, 90, 128, 152, 118, 127, 124, 229, 368, 81,
    ]  # fmt: skip
    assert sum(row_counts) == 10000

    # Make sure we pad file names with enough zeros to ensure correct
    # lexicographical ordering.
    assert len(row_counts) < 100

    # Make sure that some of our data frames consist of multiple chunks which
    # affects the output of certain file formats.
    assert any(row_count > max_rows_per_batch for row_count in row_counts)
    df = pl.DataFrame(
        {
            "sequence": range(10000),
        }
    )

    row_offset = 0
    for index, row_count in enumerate(row_counts):
        file_path = (session_tmp_dir / f"data_{index:02}").with_suffix(
            data_file_extension
        )
        _write(
            df_with_chunk_size_limit(
                df.slice(row_offset, row_count), max_rows_per_batch
            ),
            file_path,
        )
        row_offset += row_count
    return _DataFile(
        path=(session_tmp_dir / "data_*").with_suffix(data_file_extension), df=df
    )


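# Parametrizes tests over the single-file and glob inputs defined above.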
@pytest.fixture(scope="session", params=["single", "glob"])
def data_file(
    request: pytest.FixtureRequest,
    data_file_single: _DataFile,
    data_file_glob: _DataFile,
) -> _DataFile:
    if request.param == "single":
        return data_file_single
    if request.param == "glob":
        return data_file_glob
    raise NotImplementedError()


@pytest.mark.write_disk
def test_scan(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if force_async:
        _enable_force_async(plmonkeypatch)

    df = _scan(data_file.path, data_file.df.schema).collect()

    assert_frame_equal(df, data_file.df)


@pytest.mark.write_disk
def test_scan_with_limit(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if force_async:
        _enable_force_async(plmonkeypatch)

    df = _scan(data_file.path, data_file.df.schema).limit(4483).collect()

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "sequence": range(4483),
            }
        ),
    )


@pytest.mark.write_disk
def test_scan_with_filter(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if force_async:
        _enable_force_async(plmonkeypatch)

    df = (
        _scan(data_file.path, data_file.df.schema)
        .filter(pl.col("sequence") % 2 == 0)
        .collect()
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "sequence": (2 * x for x in range(5000)),
            }
        ),
    )


@pytest.mark.write_disk
def test_scan_with_filter_and_limit(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if force_async:
        _enable_force_async(plmonkeypatch)

    df = (
        _scan(data_file.path, data_file.df.schema)
        .filter(pl.col("sequence") % 2 == 0)
        .limit(4483)
        .collect()
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "sequence": (2 * x for x in range(4483)),
            },
        ),
    )


@pytest.mark.write_disk
def test_scan_with_limit_and_filter(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if force_async:
        _enable_force_async(plmonkeypatch)

    df = (
        _scan(data_file.path, data_file.df.schema)
        .limit(4483)
        .filter(pl.col("sequence") % 2 == 0)
        .collect()
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "sequence": (2 * x for x in range(2242)),
            },
        ),
    )


@pytest.mark.write_disk
def test_scan_with_row_index_and_limit(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if force_async:
        _enable_force_async(plmonkeypatch)

    df = (
        _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
        .limit(4483)
        .collect()
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "index": range(4483),
                "sequence": range(4483),
            },
            schema_overrides={"index": pl.UInt32},
        ),
    )


@pytest.mark.write_disk
def test_scan_with_row_index_and_filter(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if force_async:
        _enable_force_async(plmonkeypatch)

    df = (
        _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
        .filter(pl.col("sequence") % 2 == 0)
        .collect()
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "index": (2 * x for x in range(5000)),
                "sequence": (2 * x for x in range(5000)),
            },
            schema_overrides={"index": pl.UInt32},
        ),
    )


@pytest.mark.write_disk
def test_scan_with_row_index_limit_and_filter(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if force_async:
        _enable_force_async(plmonkeypatch)

    df = (
        _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
        .limit(4483)
        .filter(pl.col("sequence") % 2 == 0)
        .collect()
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "index": (2 * x for x in range(2242)),
                "sequence": (2 * x for x in range(2242)),
            },
            schema_overrides={"index": pl.UInt32},
        ),
    )


@pytest.mark.write_disk
def test_scan_with_row_index_projected_out(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if data_file.path.suffix == ".csv" and force_async:
        pytest.skip(reason="async reading of .csv not yet implemented")

    if force_async:
        _enable_force_async(plmonkeypatch)

    subset = next(iter(data_file.df.schema.keys()))
    df = (
        _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
        .select(subset)
        .collect()
    )

    assert_frame_equal(df, data_file.df.select(subset))


@pytest.mark.write_disk
def test_scan_with_row_index_filter_and_limit(
    capfd: Any, plmonkeypatch: PlMonkeyPatch, data_file: _DataFile, force_async: bool
) -> None:
    if data_file.path.suffix == ".csv" and force_async:
        pytest.skip(reason="async reading of .csv not yet implemented")

    if force_async:
        _enable_force_async(plmonkeypatch)

    df = (
        _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
        .filter(pl.col("sequence") % 2 == 0)
        .limit(4483)
        .collect()
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "index": (2 * x for x in range(4483)),
                "sequence": (2 * x for x in range(4483)),
            },
            schema_overrides={"index": pl.UInt32},
        ),
    )


@pytest.mark.write_disk
@pytest.mark.parametrize(
    ("scan_func", "write_func"),
    [
        (pl.scan_parquet, pl.DataFrame.write_parquet),
        (pl.scan_ipc, pl.DataFrame.write_ipc),
        (pl.scan_csv, pl.DataFrame.write_csv),
        (pl.scan_ndjson, pl.DataFrame.write_ndjson),
    ],
)
@pytest.mark.parametrize(
    "streaming",
    [True, False],
)
def test_scan_limit_0_does_not_panic(
    tmp_path: Path,
    scan_func: Callable[[Any], pl.LazyFrame],
    write_func: Callable[[pl.DataFrame, Path], None],
    streaming: bool,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "data.bin"
    df = pl.DataFrame({"x": 1})
    write_func(df, path)
    assert_frame_equal(
        scan_func(path)
        .head(0)
        .collect(engine="streaming" if streaming else "in-memory"),
        df.clear(),
    )


@pytest.mark.write_disk
@pytest.mark.parametrize(
    ("scan_func", "write_func"),
    [
        (pl.scan_csv, pl.DataFrame.write_csv),
        (pl.scan_parquet, pl.DataFrame.write_parquet),
        (pl.scan_ipc, pl.DataFrame.write_ipc),
        (pl.scan_ndjson, pl.DataFrame.write_ndjson),
    ],
)
@pytest.mark.parametrize(
    "glob",
    [True, False],
)
def test_scan_directory(
    tmp_path: Path,
    scan_func: Callable[..., pl.LazyFrame],
    write_func: Callable[[pl.DataFrame, Path], None],
    glob: bool,
) -> None:
    tmp_path.mkdir(exist_ok=True)

    dfs: list[pl.DataFrame] = [
        pl.DataFrame({"a": [0, 0, 0, 0, 0]}),
        pl.DataFrame({"a": [1, 1, 1, 1, 1]}),
        pl.DataFrame({"a": [2, 2, 2, 2, 2]}),
    ]

    paths = [
        tmp_path / "0.bin",
        tmp_path / "1.bin",
        tmp_path / "dir/data.bin",
    ]

    for df, path in zip(dfs, paths, strict=True):
        path.parent.mkdir(exist_ok=True)
        write_func(df, path)

    df = pl.concat(dfs)

    scan = scan_func

    if scan_func in [pl.scan_csv, pl.scan_ndjson]:
        scan = partial(scan, schema=df.schema)

    if scan_func is pl.scan_parquet:
        scan = partial(scan, glob=glob)

    out = scan(tmp_path).collect()
    assert_frame_equal(out, df)


@pytest.mark.write_disk
def test_scan_glob_excludes_directories(tmp_path: Path) -> None:
    for dir in ["dir1", "dir2", "dir3"]:
        (tmp_path / dir).mkdir()

    df = pl.DataFrame({"a": [1, 2, 3]})

    df.write_parquet(tmp_path / "dir1/data.bin")
    df.write_parquet(tmp_path / "dir2/data.parquet")
    df.write_parquet(tmp_path / "data.parquet")

    assert_frame_equal(pl.scan_parquet(tmp_path / "**/*.bin").collect(), df)
    assert_frame_equal(pl.scan_parquet(tmp_path / "**/data*.bin").collect(), df)
    assert_frame_equal(
        pl.scan_parquet(tmp_path / "**/*").collect(), pl.concat(3 * [df])
    )
    assert_frame_equal(pl.scan_parquet(tmp_path / "*").collect(), df)


@pytest.mark.parametrize("file_name", ["a b", "a %25 b"])
@pytest.mark.write_disk
def test_scan_async_whitespace_in_path(
    tmp_path: Path, plmonkeypatch: PlMonkeyPatch, file_name: str
) -> None:
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
    tmp_path.mkdir(exist_ok=True)

    path = tmp_path / f"{file_name}.parquet"
    df = pl.DataFrame({"x": 1})
    df.write_parquet(path)
    assert_frame_equal(pl.scan_parquet(path).collect(), df)
    assert_frame_equal(pl.scan_parquet(tmp_path).collect(), df)
    assert_frame_equal(pl.scan_parquet(tmp_path / "*").collect(), df)
    assert_frame_equal(pl.scan_parquet(tmp_path / "*.parquet").collect(), df)
    path.unlink()


@pytest.mark.write_disk
def test_path_expansion_excludes_empty_files_17362(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame({"x": 1})
    df.write_parquet(tmp_path / "data.parquet")
    (tmp_path / "empty").touch()

    assert_frame_equal(pl.scan_parquet(tmp_path).collect(), df)
    assert_frame_equal(pl.scan_parquet(tmp_path / "*").collect(), df)


@pytest.mark.write_disk
def test_path_expansion_empty_directory_does_not_panic(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    with pytest.raises(pl.exceptions.ComputeError):
        pl.scan_parquet(tmp_path).collect()

    with pytest.raises(pl.exceptions.ComputeError):
        pl.scan_parquet(tmp_path / "**/*").collect()


@pytest.mark.write_disk
def test_scan_single_dir_differing_file_extensions_raises_17436(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame({"x": 1})
    df.write_parquet(tmp_path / "data.parquet")
    df.write_ipc(tmp_path / "data.ipc")

    with pytest.raises(
        pl.exceptions.InvalidOperationError, match="different file extensions"
    ):
        pl.scan_parquet(tmp_path).collect()

    for lf in [
        pl.scan_parquet(tmp_path / "*.parquet"),
        pl.scan_ipc(tmp_path / "*.ipc"),
    ]:
        assert_frame_equal(lf.collect(), df)

    # Ensure passing a glob doesn't trigger file extension checking
    with pytest.raises(
        pl.exceptions.ComputeError,
        match="parquet: File out of specification: The file must end with PAR1",
    ):
        pl.scan_parquet(tmp_path / "*").collect()


@pytest.mark.parametrize("format", ["parquet", "csv", "ndjson", "ipc"])
def test_scan_nonexistent_path(format: str) -> None:
    path_str = f"my-nonexistent-data.{format}"
    path = Path(path_str)
    assert not path.exists()

    scan_function = getattr(pl, f"scan_{format}")

    # Just calling the scan function should not raise any errors
    result = scan_function(path)
    assert isinstance(result, pl.LazyFrame)

    # Upon collection, it should fail
    with pytest.raises(FileNotFoundError):
        result.collect()


@pytest.mark.write_disk
@pytest.mark.parametrize(
    ("scan_func", "write_func"),
    [
        (pl.scan_parquet, pl.DataFrame.write_parquet),
        (pl.scan_ipc, pl.DataFrame.write_ipc),
        (pl.scan_csv, pl.DataFrame.write_csv),
        (pl.scan_ndjson, pl.DataFrame.write_ndjson),
    ],
)
@pytest.mark.parametrize(
    "streaming",
    [True, False],
)
def test_scan_include_file_paths(
    tmp_path: Path,
    scan_func: Callable[..., pl.LazyFrame],
    write_func: Callable[[pl.DataFrame, Path], None],
    streaming: bool,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    dfs: list[pl.DataFrame] = []

    for x in ["1", "2"]:
        path = Path(f"{tmp_path}/{x}.bin").absolute()
        dfs.append(pl.DataFrame({"x": 10 * [x]}).with_columns(path=pl.lit(str(path))))
        write_func(dfs[-1].drop("path"), path)

    df = pl.concat(dfs).with_columns(normalize_path_separator_pl(pl.col("path")))
    assert df.columns == ["x", "path"]

    with pytest.raises(
        pl.exceptions.DuplicateError,
        match=r'column name for file paths "x" conflicts with column name from file',
    ):
        scan_func(tmp_path, include_file_paths="x").collect(
            engine="streaming" if streaming else "in-memory"
        )

    f = scan_func
    if scan_func in [pl.scan_csv, pl.scan_ndjson]:
        f = partial(f, schema=df.drop("path").schema)

    lf: pl.LazyFrame = f(tmp_path, include_file_paths="path")
    assert_frame_equal(lf.collect(engine="streaming" if streaming else "in-memory"), df)

    # Test projecting only the path column
    q = lf.select("path")
    assert q.collect_schema() == {"path": pl.String}
    assert_frame_equal(
        q.collect(engine="streaming" if streaming else "in-memory"),
        df.select("path"),
    )

    q = q.select("path").head(3)
    assert q.collect_schema() == {"path": pl.String}
    assert_frame_equal(
        q.collect(engine="streaming" if streaming else "in-memory"),
        df.select("path").head(3),
    )

    # Test predicates
    for predicate in [pl.col("path") != pl.col("x"), pl.col("path") != ""]:
        assert_frame_equal(
            lf.filter(predicate).collect(
                engine="streaming" if streaming else "in-memory"
            ),
            df,
        )

    # Test codepaths that materialize empty DataFrames
    assert_frame_equal(
        lf.head(0).collect(engine="streaming" if streaming else "in-memory"),
        df.head(0),
    )


@pytest.mark.write_disk
def test_async_path_expansion_bracket_17629(tmp_path: Path) -> None:
    path = tmp_path / "data.parquet"

    df = pl.DataFrame({"x": 1})
    df.write_parquet(path)

    assert_frame_equal(pl.scan_parquet(tmp_path / "[d]ata.parquet").collect(), df)


@pytest.mark.parametrize(
    "method",
    ["parquet", "csv", "ipc", "ndjson"],
)
@pytest.mark.may_fail_auto_streaming  # unsupported negative slice offset -1 for CSV source
def test_scan_in_memory(method: str) -> None:
    f = io.BytesIO()
    df = pl.DataFrame(
        {
            "a": [1, 2, 3],
            "b": ["x", "y", "z"],
        }
    )

    (getattr(df, f"write_{method}"))(f)

    f.seek(0)
    result = (getattr(pl, f"scan_{method}"))(f).collect()
    assert_frame_equal(df, result)

    f.seek(0)
    result = (getattr(pl, f"scan_{method}"))(f).slice(1, 2).collect()
    assert_frame_equal(df.slice(1, 2), result)

    f.seek(0)
    result = (getattr(pl, f"scan_{method}"))(f).slice(-1, 1).collect()
    assert_frame_equal(df.slice(-1, 1), result)

    g = io.BytesIO()
    (getattr(df, f"write_{method}"))(g)

    f.seek(0)
    g.seek(0)
    result = (getattr(pl, f"scan_{method}"))([f, g]).collect()
    assert_frame_equal(df.vstack(df), result)

    f.seek(0)
    g.seek(0)
    result = (getattr(pl, f"scan_{method}"))([f, g]).slice(1, 2).collect()
    assert_frame_equal(df.vstack(df).slice(1, 2), result)

    f.seek(0)
    g.seek(0)
    result = (getattr(pl, f"scan_{method}"))([f, g]).slice(-1, 1).collect()
    assert_frame_equal(df.vstack(df).slice(-1, 1), result)


def test_scan_pyobject_zero_copy_buffer_mutate() -> None:
    f = io.BytesIO()

    df = pl.DataFrame({"x": [1, 2, 3, 4, 5]})
    df.write_ipc(f)
    f.seek(0)

    q = pl.scan_ipc(f)
    assert_frame_equal(q.collect(), df)

    f.write(b"AAA")
    assert_frame_equal(q.collect(), df)


@pytest.mark.parametrize(
    "method",
    ["csv", "ndjson"],
)
def test_scan_stringio(method: str) -> None:
    f = io.StringIO()
    df = pl.DataFrame(
        {
            "a": [1, 2, 3],
            "b": ["x", "y", "z"],
        }
    )

    (getattr(df, f"write_{method}"))(f)

    f.seek(0)
    result = (getattr(pl, f"scan_{method}"))(f).collect()
    assert_frame_equal(df, result)

    g = io.StringIO()
    (getattr(df, f"write_{method}"))(g)

    f.seek(0)
    g.seek(0)
    result = (getattr(pl, f"scan_{method}"))([f, g]).collect()
    assert_frame_equal(df.vstack(df), result)


def test_scan_double_collect_row_index_invalidates_cached_ir_18892() -> None:
    lf = pl.scan_csv(io.BytesIO(b"a\n1\n2\n3"))

    lf.collect()

    out = lf.with_row_index().collect()

    assert_frame_equal(
        out,
        pl.DataFrame(
            {"index": [0, 1, 2], "a": [1, 2, 3]},
            schema={"index": pl.get_index_type(), "a": pl.Int64},
        ),
    )


def test_scan_include_file_paths_respects_projection_pushdown() -> None:
    q = pl.scan_csv(b"a,b,c\na1,b1,c1", include_file_paths="path_name").select(
        ["a", "b"]
    )

    assert_frame_equal(q.collect(), pl.DataFrame({"a": "a1", "b": "b1"}))


def test_streaming_scan_csv_include_file_paths_18257(io_files_path: Path) -> None:
    lf = pl.scan_csv(
        io_files_path / "foods1.csv",
        include_file_paths="path",
    ).select("category", "path")

    assert lf.collect(engine="streaming").columns == ["category", "path"]


def test_streaming_scan_csv_with_row_index_19172(io_files_path: Path) -> None:
    lf = (
        pl.scan_csv(io_files_path / "foods1.csv", infer_schema=False)
        .with_row_index()
        .select("calories", "index")
        .head(1)
    )

    assert_frame_equal(
        lf.collect(engine="streaming"),
        pl.DataFrame(
            {"calories": "45", "index": 0},
            schema={"calories": pl.String, "index": pl.get_index_type()},
        ),
    )


@pytest.mark.write_disk
def test_predicate_hive_pruning_with_cast(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    df = pl.DataFrame({"x": 1})

    (p := (tmp_path / "date=2024-01-01")).mkdir()

    df.write_parquet(p / "1")

    (p := (tmp_path / "date=2024-01-02")).mkdir()

    # Write an invalid parquet file that will cause errors if polars attempts to
    # read it.
    # This works because `scan_parquet()` only looks at the first file during
    # schema inference.
    (p / "1").write_text("not a parquet file")

    expect = pl.DataFrame({"x": 1, "date": datetime(2024, 1, 1).date()})

    lf = pl.scan_parquet(tmp_path)

    q = lf.filter(pl.col("date") < datetime(2024, 1, 2).date())

    assert_frame_equal(q.collect(), expect)

    # This filter expr with strptime is effectively what LazyFrame.sql()
    # generates
    q = lf.filter(
        pl.col("date")
        < pl.lit("2024-01-02").str.strptime(
            dtype=pl.Date, format="%Y-%m-%d", ambiguous="latest"
        )
    )

    assert_frame_equal(q.collect(), expect)

    q = lf.sql("select * from self where date < '2024-01-02'")
    print(q.explain())
    assert_frame_equal(q.collect(), expect)


def test_predicate_stats_eval_nested_binary() -> None:
    bufs: list[bytes] = []

    for i in range(10):
        b = io.BytesIO()
        pl.DataFrame({"x": i}).write_parquet(b)
        b.seek(0)
        bufs.append(b.read())

    assert_frame_equal(
        (
            pl.scan_parquet(bufs)
            .filter(pl.col("x") % 2 == 0)
            .collect(optimizations=pl.QueryOptFlags.none())
        ),
        pl.DataFrame({"x": [0, 2, 4, 6, 8]}),
    )

    assert_frame_equal(
        (
            pl.scan_parquet(bufs)
            # The literal eval depth limit is 4 -
            # * crates/polars-expr/src/expressions/mod.rs::PhysicalExpr::evaluate_inline
            .filter(pl.col("x") == pl.lit("222").str.slice(0, 1).cast(pl.Int64))
            .collect()
        ),
        pl.DataFrame({"x": [2]}),
    )


@pytest.mark.slow
@pytest.mark.parametrize("streaming", [True, False])
def test_scan_csv_bytesio_memory_usage(
    streaming: bool,
    # memory_usage_without_pyarrow: MemoryUsage,
) -> None:
    # memory_usage = memory_usage_without_pyarrow

    # Create CSV that is ~6-7 MB in size:
    f = io.BytesIO()
    df = pl.DataFrame({"mydata": pl.int_range(0, 1_000_000, eager=True)})
    df.write_csv(f)
    # assert 6_000_000 < f.tell() < 7_000_000
    f.seek(0, 0)

    # A lazy scan shouldn't make a full copy of the data:
    # starting_memory = memory_usage.get_current()
    assert (
        pl.scan_csv(f)
        .filter(pl.col("mydata") == 999_999)
        .collect(engine="streaming" if streaming else "in-memory")
        .item()
        == 999_999
    )
    # assert memory_usage.get_peak() - starting_memory < 1_000_000


@pytest.mark.parametrize(
    "scan_type",
    [
        (pl.DataFrame.write_parquet, pl.scan_parquet),
        (pl.DataFrame.write_ipc, pl.scan_ipc),
        (pl.DataFrame.write_csv, pl.scan_csv),
        (pl.DataFrame.write_ndjson, pl.scan_ndjson),
    ],
)
def test_only_project_row_index(scan_type: tuple[Any, Any]) -> None:
    write, scan = scan_type

    f = io.BytesIO()
    df = pl.DataFrame([pl.Series("a", [1, 2, 3], pl.UInt32)])
    write(df, f)

    f.seek(0)
    s = scan(f, row_index_name="row_index", row_index_offset=42)

    assert_frame_equal(
        s.select("row_index").collect(),
        pl.DataFrame({"row_index": [42, 43, 44]}),
        check_dtypes=False,
    )


@pytest.mark.parametrize(
    "scan_type",
    [
        (pl.DataFrame.write_parquet, pl.scan_parquet),
        (pl.DataFrame.write_ipc, pl.scan_ipc),
        (pl.DataFrame.write_csv, pl.scan_csv),
        (pl.DataFrame.write_ndjson, pl.scan_ndjson),
    ],
)
def test_only_project_include_file_paths(scan_type: tuple[Any, Any]) -> None:
    write, scan = scan_type

    f = io.BytesIO()
    df = pl.DataFrame([pl.Series("a", [1, 2, 3], pl.UInt32)])
    write(df, f)

    f.seek(0)
    s = scan(f, include_file_paths="file_path")

    # The exact value for in-memory buffers is undefined
    c = s.select("file_path").collect()
    assert c.height == 3
    assert c.columns == ["file_path"]


@pytest.mark.parametrize(
    "scan_type",
    [
        (pl.DataFrame.write_parquet, pl.scan_parquet),
        pytest.param(
            (pl.DataFrame.write_ipc, pl.scan_ipc),
            marks=pytest.mark.xfail(
                reason="has no allow_missing_columns parameter. https://github.com/pola-rs/polars/issues/21166"
            ),
        ),
        pytest.param(
            (pl.DataFrame.write_csv, pl.scan_csv),
            marks=pytest.mark.xfail(
                reason="has no allow_missing_columns parameter. https://github.com/pola-rs/polars/issues/21166"
            ),
        ),
        pytest.param(
            (pl.DataFrame.write_ndjson, pl.scan_ndjson),
            marks=pytest.mark.xfail(
                reason="has no allow_missing_columns parameter. https://github.com/pola-rs/polars/issues/21166"
            ),
        ),
    ],
)
def test_only_project_missing(scan_type: tuple[Any, Any]) -> None:
    write, scan = scan_type

    f = io.BytesIO()
    g = io.BytesIO()
    write(
        pl.DataFrame(
            [pl.Series("a", [], pl.UInt32), pl.Series("missing", [], pl.Int32)]
        ),
        f,
    )
    write(pl.DataFrame([pl.Series("a", [1, 2, 3], pl.UInt32)]), g)

    f.seek(0)
    g.seek(0)
    s = scan([f, g], missing_columns="insert")

    assert_frame_equal(
        s.select("missing").collect(),
        pl.DataFrame([pl.Series("missing", [None, None, None], pl.Int32)]),
    )


@pytest.mark.write_disk
@pytest.mark.parametrize(
    "scan_type",
    [
        (pl.DataFrame.write_parquet, pl.scan_parquet),
        (pl.DataFrame.write_ipc, pl.scan_ipc),
        (pl.DataFrame.write_csv, pl.scan_csv),
        (pl.DataFrame.write_ndjson, pl.scan_ndjson),
    ],
)
def test_async_read_21945(tmp_path: Path, scan_type: tuple[Any, Any]) -> None:
    f1 = tmp_path / "f1"
    f2 = tmp_path / "f2"

    pl.DataFrame({"value": [1, 2]}).write_parquet(f1)
    pl.DataFrame({"value": [3]}).write_parquet(f2)

    df = (
        pl.scan_parquet([format_file_uri(str(f1)), str(f2)], include_file_paths="foo")
        .filter(value=1)
        .collect()
    )

    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "value": [1],
                "foo": [format_file_uri(f1)],
            }
        ),
    )


@pytest.mark.write_disk
@pytest.mark.parametrize("with_str_contains", [False, True])
def test_hive_pruning_str_contains_21706(
    tmp_path: Path, capfd: Any, plmonkeypatch: PlMonkeyPatch, with_str_contains: bool
) -> None:
    df = pl.DataFrame(
        {
            "pdate": [20250301, 20250301, 20250302, 20250302, 20250303, 20250303],
            "prod_id": ["A1", "A2", "B1", "B2", "C1", "C2"],
            "price": [11, 22, 33, 44, 55, 66],
        }
    )

    df.write_parquet(tmp_path, partition_by=["pdate"])

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")
    f = pl.col("pdate") == 20250303
    if with_str_contains:
        f = f & pl.col("prod_id").str.contains("1")

    df = pl.scan_parquet(tmp_path, hive_partitioning=True).filter(f).collect()

    captured = capfd.readouterr().err
    assert "allows skipping 2 / 3" in captured

    assert_frame_equal(
        df,
        pl.scan_parquet(tmp_path, hive_partitioning=True).collect().filter(f),
    )


@pytest.mark.skipif(
    sys.platform == "win32", reason="path characters not valid on Windows"
)
@pytest.mark.parametrize(
    ("scan", "write"),
    [
        (pl.scan_ipc, pl.DataFrame.write_ipc),
        (pl.scan_parquet, pl.DataFrame.write_parquet),
        (pl.scan_csv, pl.DataFrame.write_csv),
    ],
)
@pytest.mark.parametrize("file_name", ["%?", "[", "]"])
def test_scan_no_glob_special_chars_23292(
    tmp_path: Path, file_name: str, scan: Any, write: Any
) -> None:
    tmp_path.mkdir(exist_ok=True)

    path = tmp_path / file_name
    df = pl.DataFrame({"a": 1})
    write(df, path)

    assert_frame_equal(scan(path, glob=False).collect(), df)
    assert_frame_equal(scan(f"file://{path}", glob=False).collect(), df)


@pytest.mark.write_disk
@pytest.mark.parametrize(
    ("scan_function", "failed_message", "name_in_context"),
    [
        (
            pl.scan_parquet,
            "failed to retrieve first file schema (parquet)",
            "'parquet scan'",
        ),
        (pl.scan_ipc, "failed to retrieve first file schema (ipc)", "'ipc scan'"),
        (pl.scan_csv, "failed to retrieve file schemas (csv)", "'csv scan'"),
        (
            pl.scan_ndjson,
            "failed to retrieve first file schema (ndjson)",
            "'ndjson scan'",
        ),
    ],
)
def test_scan_empty_paths_friendly_error(
    tmp_path: Path,
    scan_function: Any,
    failed_message: str,
    name_in_context: str,
) -> None:
    q = scan_function(tmp_path)

    with pytest.raises(pl.exceptions.ComputeError) as exc:
        q.collect()

    exc_str = exc.exconly()

    assert (
        f"ComputeError: {failed_message}: expanded paths were empty "
        "(path expansion input: 'paths: "
    ) in exc_str

    assert "glob: true)." in exc_str
    assert exc_str.count(tmp_path.name) == 1

    if scan_function is pl.scan_parquet:
        assert (
            "Hint: passing a schema can allow this scan to succeed with an empty DataFrame."
            in exc_str
        )

    # Multiple input paths
    q = scan_function([tmp_path, tmp_path])

    with pytest.raises(pl.exceptions.ComputeError) as exc:
        q.collect()

    exc_str = exc.exconly()

    assert (
        f"ComputeError: {failed_message}: expanded paths were empty "
        "(path expansion input: 'paths: "
    ) in exc_str

    assert "glob: true)." in exc_str

    assert exc_str.count(tmp_path.name) == 2

    q = scan_function([])

    with pytest.raises(pl.exceptions.ComputeError) as exc:
        q.collect()

    exc_str = exc.exconly()

    # There is no "path expansion resulted in" for this error message as the
    # original input sources were empty.
    assert f"ComputeError: {failed_message}: empty input: paths: []" in exc_str

    if scan_function is pl.scan_parquet:
        assert (
            "Hint: passing a schema can allow this scan to succeed with an empty DataFrame."
            in exc_str
        )

    # TODO: glob parameter not supported in some scan types
    cx = (
        pytest.raises(pl.exceptions.ComputeError, match="glob: false")
        if (
            scan_function is pl.scan_csv
            or scan_function is pl.scan_parquet
            or scan_function is pl.scan_ipc
        )
        else pytest.raises(TypeError, match="unexpected keyword argument 'glob'")  # type: ignore[arg-type]
    )

    with cx:
        scan_function(tmp_path, glob=False).collect()


@pytest.mark.parametrize("paths", [[], ["file:///non-existent"]])
@pytest.mark.parametrize("scan_func", [pl.scan_parquet, pl.scan_csv, pl.scan_ndjson])
def test_scan_with_schema_skips_schema_inference(
    paths: list[str], scan_func: Any
) -> None:
    schema = {"A": pl.Int64}

    q = scan_func(paths, schema=schema).head(0)
    assert_frame_equal(q.collect(engine="streaming"), pl.DataFrame(schema=schema))


@pytest.mark.parametrize("format_name", ["csv", "ndjson", "lines"])
def test_scan_negative_slice_decompress(format_name: str) -> None:
    col_name = "x"

    # We could go through sink, but not for lines and this way we are testing
    # more narrowly.
    def format_line(val: int) -> str:
        if format_name == "csv":
            return f"{val}\n"
        elif format_name == "ndjson":
            return f'{{"{col_name}":{val}}}\n'
        elif format_name == "lines":
            return f"{val}\n"
        else:
            pytest.fail("unreachable")

    buf = bytearray()
    if format_name == "csv":
        buf.extend(f"{col_name}\n".encode())
    for val in range(47):
        buf.extend(format_line(val).encode())

    compressed_data = zlib.compress(buf, level=1)

    lf = getattr(pl, f"scan_{format_name}")(compressed_data).slice(-9, 5)
    if format_name == "lines":
        lf = lf.select(pl.col("lines").alias(col_name).str.to_integer())

    expected = [pl.Series("x", [38, 39, 40, 41, 42])]
    got = lf.collect(engine="streaming")
    assert_frame_equal(got, pl.DataFrame(expected))


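# Produce zlib-compressed data whose tail is corrupted, so the streaming
# decompression tests below only pass if the scan never fully decompresses the
# input.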
def corrupt_compressed_impl(base_line: bytes, target_size: int) -> bytes:
    large_and_simple_data = base_line * round(target_size / len(base_line))
    compressed_data = zlib.compress(large_and_simple_data, level=0)

    corruption_start_pos = round(len(compressed_data) * 0.9)
    corruption_len = 500

    # The idea is to corrupt the input to make sure the scan never fully
    # decompresses the input.
    corrupted_data = bytearray(compressed_data)
    corrupted_data[corruption_start_pos : corruption_start_pos + corruption_len] = (
        b"\00"
    )
    # ~1.8MB of valid zlib compressed data to read before the corrupted data
    # appears.
    return bytes(corrupted_data)


@pytest.fixture(scope="session")
def corrupt_compressed_csv() -> bytes:
    return corrupt_compressed_impl(b"line_val\n", int(2e6))


@pytest.fixture(scope="session")
def corrupt_compressed_ndjson() -> bytes:
    # The decompressor is part of the line-batch provider which only stops once
    # processing noticed that it doesn't need any more data. This can take a
    # bit. Tested with debug/release and calm/stressed machine.
    return corrupt_compressed_impl(b'{"line_val": 45}\n', int(100e6))


@pytest.mark.parametrize("schema", [{"line_val": pl.String}, None])
def test_scan_csv_streaming_decompression(
    corrupt_compressed_csv: bytes, schema: Any
) -> None:
    slice_count = 11

    df = (
        pl.scan_csv(io.BytesIO(corrupt_compressed_csv), schema=schema)
        .slice(0, slice_count)
        .collect(engine="streaming")
    )

    expected = [
        pl.Series("line_val", ["line_val"] * slice_count, dtype=pl.String),
    ]
    assert_frame_equal(df, pl.DataFrame(expected))


@pytest.mark.slow
@pytest.mark.parametrize("schema", [{"line_val": pl.Int64}, None])
def test_scan_ndjson_streaming_decompression(
    corrupt_compressed_ndjson: bytes, schema: Any
) -> None:
    slice_count = 11

    df = (
        pl.scan_ndjson(io.BytesIO(corrupt_compressed_ndjson), schema=schema)
        .slice(0, slice_count)
        .collect(engine="streaming")
    )

    expected = [
        pl.Series("line_val", [45] * slice_count, dtype=pl.Int64),
    ]
    assert_frame_equal(df, pl.DataFrame(expected))


def test_scan_file_uri_hostname_component() -> None:
    q = pl.scan_parquet("file://hostname:80/data.parquet")

    with pytest.raises(
        ComputeError,
        match="unsupported: non-empty hostname for 'file:' URI: 'hostname:80'",
    ):
        q.collect()


@pytest.mark.write_disk
@pytest.mark.parametrize("polars_force_async", ["0", "1"])
@pytest.mark.parametrize("n_repeats", [1, 2])
def test_scan_path_expansion_sorting_24528(
    tmp_path: Path,
    polars_force_async: str,
    n_repeats: int,
    plmonkeypatch: PlMonkeyPatch,
) -> None:
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", polars_force_async)

    relpaths = ["a.parquet", "a/a.parquet", "ab.parquet"]

    for p in relpaths:
        Path(tmp_path / p).parent.mkdir(exist_ok=True, parents=True)
        pl.DataFrame({"relpath": p}).write_parquet(tmp_path / p)

    assert_frame_equal(
        pl.scan_parquet(n_repeats * [tmp_path]).collect(),
        pl.DataFrame({"relpath": n_repeats * relpaths}),
    )

    assert_frame_equal(
        pl.scan_parquet(n_repeats * [f"{tmp_path}/**/*"]).collect(),
        pl.DataFrame({"relpath": n_repeats * relpaths}),
    )

    assert_frame_equal(
        pl.scan_parquet(n_repeats * [format_file_uri(tmp_path) + "/"]).collect(),
        pl.DataFrame({"relpath": n_repeats * relpaths}),
    )

    assert_frame_equal(
        pl.scan_parquet(n_repeats * [format_file_uri(f"{tmp_path}/**/*")]).collect(),
        pl.DataFrame({"relpath": n_repeats * relpaths}),
    )


def test_scan_sink_error_captures_path() -> None:
    storage_options = {
        "aws_endpoint_url": "http://localhost:333",
        "max_retries": 0,
    }

    q = pl.scan_parquet(
        "s3://.../...",
        storage_options=storage_options,
        credential_provider=None,
    )

    with pytest.raises(OSError, match=r"path: s3://.../..."):
        q.collect()

    with pytest.raises(OSError, match=r"path: s3://.../..."):
        pl.LazyFrame({"a": 1}).sink_parquet(
            "s3://.../...",
            storage_options=storage_options,
            credential_provider=None,
        )


# TODO: Uncomment file_format once properly instrumented
@pytest.mark.parametrize(
    "file_format",
    [
        "parquet",
        "ipc",
        # "csv",
        "ndjson",
    ],
)
@pytest.mark.parametrize("partitioned", [True, False])
@pytest.mark.write_disk
def test_scan_metrics(
    plmonkeypatch: PlMonkeyPatch,
    capfd: pytest.CaptureFixture[str],
    file_format: str,
    tmp_path: Path,
    partitioned: bool,
) -> None:
    path = tmp_path / "a"

    df = pl.DataFrame({"a": 1})

    getattr(pl.LazyFrame, f"sink_{file_format}")(
        df.lazy(),
        path
        if not partitioned
        else pl.PartitionBy("", file_path_provider=(lambda _: path), key="a"),
    )

    with plmonkeypatch.context() as cx:
        cx.setenv("POLARS_LOG_METRICS", "1")
        cx.setenv("POLARS_FORCE_ASYNC", "1")
        capfd.readouterr()
        out = getattr(pl, f"scan_{file_format}")(
            path,
        ).collect()
        capture = capfd.readouterr().err

    [line] = (x for x in capture.splitlines() if x.startswith("multi-scan"))

    logged_bytes_requested = int(
        pl.select(pl.lit(line).str.extract(r"total_bytes_requested=(\d+)")).item()
    )

    logged_bytes_received = int(
        pl.select(pl.lit(line).str.extract(r"total_bytes_received=(\d+)")).item()
    )

    # because of how metadata is accounted for, the bytes_requested may deviate
    # from the actual file_size
    file_size = path.stat().st_size
    # note, 131_072 is the maximum metadata size estimate for parquet, ipc
    upper_limit_bytes = min(2 * file_size, file_size + 131072)
    lower_limit_bytes = 2

    assert logged_bytes_requested <= upper_limit_bytes
    assert logged_bytes_requested >= lower_limit_bytes
    assert logged_bytes_received == logged_bytes_requested

    assert_frame_equal(out, df)