Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_write.py
8415 views
1
from __future__ import annotations
2
3
from pathlib import Path
4
from typing import TYPE_CHECKING
5
6
import pytest
7
8
import polars as pl
9
from polars.testing.asserts.frame import assert_frame_equal
10
from tests.unit.io.conftest import format_file_uri
11
12
if TYPE_CHECKING:
13
from collections.abc import Callable
14
15
from tests.conftest import PlMonkeyPatch
16
17
# (read_func, write_func) pairs shared by the round-trip tests in this module.
#
# Every ``read_func`` is called with a single path-like argument and returns an
# eager ``DataFrame``; every ``write_func`` is called as ``write_func(df, path)``.
# The CSV/IPC/NDJSON readers go through ``scan_*(...).collect()``, so the lazy
# scan path is exercised on the read side as well.
READ_WRITE_FUNC_PARAM = [
    # Eager ``DataFrame.write_*`` writers.
    (pl.read_parquet, pl.DataFrame.write_parquet),
    (lambda *args: pl.scan_csv(*args).collect(), pl.DataFrame.write_csv),
    (lambda *args: pl.scan_ipc(*args).collect(), pl.DataFrame.write_ipc),
    # Lazy ``LazyFrame.sink_*`` writers: the frame is converted with ``.lazy()``.
    (pl.read_parquet, lambda df, path: df.lazy().sink_parquet(path)),
    (
        lambda *args: pl.scan_csv(*args).collect(),
        lambda df, path: df.lazy().sink_csv(path),
    ),
    (
        lambda *args: pl.scan_ipc(*args).collect(),
        lambda df, path: df.lazy().sink_ipc(path),
    ),
    (
        lambda *args: pl.scan_ndjson(*args).collect(),
        lambda df, path: df.lazy().sink_ndjson(path),
    ),
]
@pytest.mark.parametrize(
    ("read_func", "write_func"),
    READ_WRITE_FUNC_PARAM,
)
@pytest.mark.write_disk
def test_write_async(
    read_func: Callable[[Path], pl.DataFrame],
    write_func: Callable[[pl.DataFrame, Path], None],
    tmp_path: Path,
) -> None:
    """Round-trip a one-row frame through a file URI target.

    The destination ``tmp_path / "1"`` is made absolute and then converted with
    ``format_file_uri`` before being handed to both ``write_func`` and
    ``read_func``. What was read back must equal what was written.
    """
    tmp_path.mkdir(exist_ok=True)

    path = (tmp_path / "1").absolute()
    # The helper's return value is not a ``Path``, hence the ignore.
    path = format_file_uri(path)  # type: ignore[assignment]

    expected = pl.DataFrame({"x": 1})
    write_func(expected, path)

    roundtripped = read_func(path)
    assert_frame_equal(roundtripped, expected)
@pytest.mark.parametrize(
    ("read_func", "write_func"),
    READ_WRITE_FUNC_PARAM,
)
@pytest.mark.parametrize("opt_absolute_fn", [Path, Path.absolute])
@pytest.mark.write_disk
def test_write_async_force_async(
    read_func: Callable[[Path], pl.DataFrame],
    write_func: Callable[[pl.DataFrame, Path], None],
    opt_absolute_fn: Callable[[Path], Path],
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
) -> None:
    """Round-trip a one-row frame while ``POLARS_FORCE_ASYNC=1`` is set.

    The target is a plain ``Path`` built from ``tmp_path``. Depending on the
    ``opt_absolute_fn`` parametrization, it is passed either unchanged
    (``Path``) or through ``Path.absolute``.
    """
    # NOTE(review): pytest's ``tmp_path`` is normally already absolute, so both
    # ``opt_absolute_fn`` variants likely yield the same path. Confirm whether a
    # genuinely relative target was meant to be covered here.
    plmonkeypatch.setenv("POLARS_FORCE_ASYNC", "1")
    tmp_path.mkdir(exist_ok=True)

    expected = pl.DataFrame({"x": 1})
    target = opt_absolute_fn(tmp_path / "1")

    write_func(expected, target)
    assert_frame_equal(read_func(target), expected)
@pytest.mark.write_disk
def test_write_with_storage_options_22873(tmp_path: Path) -> None:
    """Writers and sinks accept a ``storage_options`` keyword (issue 22873).

    Every eager ``DataFrame.write_*`` writer except ``write_ndjson`` is called
    with ``storage_options`` on a local path and must not raise. The same
    applies to every ``LazyFrame.sink_*``. ``write_ndjson`` does not take the
    keyword yet, so for that writer the test pins the ``TypeError`` instead.
    """
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "data"

    df = pl.DataFrame({"x": 1})

    eager_writers = [
        pl.DataFrame.write_parquet,
        pl.DataFrame.write_ipc,
        pl.DataFrame.write_csv,
        pl.DataFrame.write_ndjson,
    ]
    for writer in eager_writers:
        if writer is pl.DataFrame.write_ndjson:
            # TODO: write_ndjson cloud support
            with pytest.raises(
                TypeError, match="unexpected keyword argument 'storage_options'"
            ):
                writer(df, path, storage_options={"test": "1"})  # type: ignore[operator]
        else:
            writer(df, path, storage_options={"test": "1"})  # type: ignore[operator]

    lf = df.lazy()
    lazy_sinks = [
        pl.LazyFrame.sink_parquet,
        pl.LazyFrame.sink_ipc,
        pl.LazyFrame.sink_csv,
        pl.LazyFrame.sink_ndjson,
    ]
    for sink in lazy_sinks:
        sink(lf, path, storage_options={"test": "1"})  # type: ignore[operator]