Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_write.py
6939 views
1
from __future__ import annotations
2
3
from pathlib import Path
4
from typing import Callable
5
6
import pytest
7
8
import polars as pl
9
from polars.testing.asserts.frame import assert_frame_equal
10
11
# (reader, writer) pairs shared by the async write tests below.
# Each reader is called with the target location and returns an eager
# DataFrame. Each writer is called as ``writer(df, target)`` and writes
# ``df`` there, either eagerly (``write_*``) or via a LazyFrame sink (``sink_*``).
READ_WRITE_FUNC_PARAM = [
    # Eager writers
    (pl.read_parquet, pl.DataFrame.write_parquet),
    (lambda *args: pl.scan_csv(*args).collect(), pl.DataFrame.write_csv),
    (lambda *args: pl.scan_ipc(*args).collect(), pl.DataFrame.write_ipc),
    # Sink
    (pl.read_parquet, lambda frame, dest: frame.lazy().sink_parquet(dest)),
    (
        lambda *args: pl.scan_csv(*args).collect(),
        lambda frame, dest: frame.lazy().sink_csv(dest),
    ),
    (
        lambda *args: pl.scan_ipc(*args).collect(),
        lambda frame, dest: frame.lazy().sink_ipc(dest),
    ),
    (
        lambda *args: pl.scan_ndjson(*args).collect(),
        lambda frame, dest: frame.lazy().sink_ndjson(dest),
    ),
]
30
31
32
@pytest.mark.parametrize(
    ("read_func", "write_func"),
    READ_WRITE_FUNC_PARAM,
)
@pytest.mark.write_disk
def test_write_async(
    read_func: Callable[[str], pl.DataFrame],
    write_func: Callable[[pl.DataFrame, str], None],
    tmp_path: Path,
) -> None:
    """Round-trip a one-row frame through a ``file://`` URI.

    Runs for every (reader, writer) pair in ``READ_WRITE_FUNC_PARAM``. The
    target is given as a URI with an explicit scheme rather than a plain path.
    NOTE(review): presumably this routes I/O through polars' async/object-store
    code (hence the test name) -- confirm against the polars I/O dispatch.
    """
    # pytest's ``tmp_path`` already exists, so no ``mkdir`` is needed.
    # Keep the filesystem path and the URI in separate variables rather than
    # rebinding one name from ``Path`` to ``str``.
    local_path = (tmp_path / "1").absolute()
    uri = f"file://{local_path}"

    df = pl.DataFrame({"x": 1})

    write_func(df, uri)

    assert_frame_equal(read_func(uri), df)
51
52
53
@pytest.mark.parametrize(
    ("read_func", "write_func"),
    READ_WRITE_FUNC_PARAM,
)
@pytest.mark.parametrize("opt_absolute_fn", [Path, Path.absolute])
@pytest.mark.write_disk
def test_write_async_force_async(
    read_func: Callable[[Path], pl.DataFrame],
    write_func: Callable[[pl.DataFrame, Path], None],
    opt_absolute_fn: Callable[[Path], Path],
    tmp_path: Path,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Round-trip a one-row frame through a plain path with POLARS_FORCE_ASYNC=1.

    ``opt_absolute_fn`` selects the path form handed to the reader/writer:
    ``Path`` keeps it relative, ``Path.absolute`` makes it absolute.
    """
    monkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
    # ``tmp_path`` is itself absolute, so a path built from it would be
    # absolute for both parametrizations. Run from inside ``tmp_path`` and
    # start from a bare relative name so that the ``Path`` case really passes
    # a relative path. ``monkeypatch`` restores the cwd after the test.
    monkeypatch.chdir(tmp_path)
    path = opt_absolute_fn(Path("1"))

    df = pl.DataFrame({"x": 1})

    write_func(df, path)

    assert_frame_equal(read_func(path), df)
75
76
77
@pytest.mark.write_disk
def test_write_with_storage_options_22873(tmp_path: Path) -> None:
    """Passing ``storage_options`` with a local target must not fail (issue 22873).

    Each eager ``write_*`` method and each ``sink_*`` method is called with a
    placeholder ``storage_options`` dict against the same local file.
    ``write_ndjson`` is the exception: it does not accept the keyword yet, so
    it is expected to raise ``TypeError``.
    """
    tmp_path.mkdir(exist_ok=True)
    target = tmp_path / "data"

    frame = pl.DataFrame({"x": 1})

    writers = [
        pl.DataFrame.write_parquet,
        pl.DataFrame.write_ipc,
        pl.DataFrame.write_csv,
        pl.DataFrame.write_ndjson,
    ]
    for write in writers:
        if write is not pl.DataFrame.write_ndjson:
            write(frame, target, storage_options={"test": "1"})  # type: ignore[operator]
            continue

        # TODO: write_ndjson cloud support
        with pytest.raises(
            TypeError, match="unexpected keyword argument 'storage_options'"
        ):
            write(frame, target, storage_options={"test": "1"})  # type: ignore[operator]

    lazy_frame = frame.lazy()

    sinks = [
        pl.LazyFrame.sink_parquet,
        pl.LazyFrame.sink_ipc,
        pl.LazyFrame.sink_csv,
        pl.LazyFrame.sink_ndjson,
    ]
    for sink in sinks:
        sink(lazy_frame, target, storage_options={"test": "1"})  # type: ignore[operator]
110
111