Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/io/test_sink.py
6939 views
1
import io
2
from pathlib import Path
3
from typing import Any
4
5
import pytest
6
7
import polars as pl
8
from polars._typing import EngineType
9
from polars.testing import assert_frame_equal
10
11
# Each entry pairs a lazy reader with the `LazyFrame` sink for the same file
# format, so a test can write with `sink` and read the result back with `scan`.
SINKS = [
    (pl.scan_ipc, pl.LazyFrame.sink_ipc),
    (pl.scan_parquet, pl.LazyFrame.sink_parquet),
    (pl.scan_csv, pl.LazyFrame.sink_csv),
    (pl.scan_ndjson, pl.LazyFrame.sink_ndjson),
]
17
18
19
@pytest.mark.parametrize(("scan", "sink"), SINKS)
@pytest.mark.parametrize("engine", ["in-memory", "streaming"])
@pytest.mark.write_disk
def test_mkdir(tmp_path: Path, scan: Any, sink: Any, engine: EngineType) -> None:
    """Sinks create missing parent directories only when `mkdir=True`."""
    df = pl.DataFrame({"a": [1, 2, 3]})
    nested_dir = tmp_path / "a" / "b" / "c"

    # Without `mkdir`, sinking below a directory that does not exist must fail.
    with pytest.raises(FileNotFoundError):
        sink(df.lazy(), nested_dir / "file", engine=engine)

    # Forward `engine` here as well: the test is parametrized over it, so the
    # successful `mkdir=True` path should run on each engine, not only on the
    # default one.
    target = nested_dir / "file2"
    sink(df.lazy(), target, mkdir=True, engine=engine)

    assert_frame_equal(scan(target).collect(), df)
36
37
38
# Marked like the other tests in this file that write below `tmp_path`.
@pytest.mark.write_disk
def test_write_mkdir(tmp_path: Path) -> None:
    """`write_parquet` creates missing parent directories only with `mkdir=True`."""
    df = pl.DataFrame({"a": [1, 2, 3]})
    nested_dir = tmp_path / "a" / "b" / "c"

    # Without `mkdir`, writing below a directory that does not exist must fail.
    with pytest.raises(FileNotFoundError):
        df.write_parquet(nested_dir / "file")

    target = nested_dir / "file2"
    df.write_parquet(target, mkdir=True)

    assert_frame_equal(pl.read_parquet(target), df)
52
53
54
@pytest.mark.parametrize(("scan", "sink"), SINKS)
@pytest.mark.parametrize("engine", ["in-memory", "streaming"])
@pytest.mark.write_disk
def test_lazy_sinks(tmp_path: Path, scan: Any, sink: Any, engine: EngineType) -> None:
    """Sinks built with `lazy=True` write nothing until they are collected."""
    df = pl.DataFrame({"a": [1, 2, 3]})
    outputs = (tmp_path / "a", tmp_path / "b")

    pending = [sink(df.lazy(), out, lazy=True) for out in outputs]

    # Building the lazy sinks must not have created any output file yet.
    for out in outputs:
        assert not out.exists()

    pl.collect_all(pending, engine=engine)

    for out in outputs:
        assert_frame_equal(scan(out).collect(), df)
69
70
71
@pytest.mark.parametrize(
    "sink",
    [
        pl.LazyFrame.sink_ipc,
        pl.LazyFrame.sink_parquet,
        pl.LazyFrame.sink_csv,
        pl.LazyFrame.sink_ndjson,
    ],
)
@pytest.mark.write_disk
def test_double_lazy_error(sink: Any) -> None:
    """Sinking a LazyFrame that already ends in a lazy sink raises an error."""
    empty = pl.DataFrame({})
    expected_message = "cannot create a sink on top of another sink"

    with pytest.raises(pl.exceptions.InvalidOperationError, match=expected_message):
        # Both calls stay inside the context manager so that an error raised
        # by either of them is checked against the expected message.
        first_sink = sink(empty.lazy(), "a", lazy=True)
        sink(first_sink, "b")
89
90
91
@pytest.mark.parametrize(("scan", "sink"), SINKS)
def test_sink_to_memory(sink: Any, scan: Any) -> None:
    """Round-trip a frame through an in-memory `BytesIO` buffer."""
    df = pl.DataFrame({"a": [5, 10, 1996]})

    buffer = io.BytesIO()
    sink(df.lazy(), buffer)

    # Rewind before handing the buffer to the scan.
    buffer.seek(0)
    result = scan(buffer).collect()

    assert_frame_equal(result, df)
107
108
109
@pytest.mark.parametrize(("scan", "sink"), SINKS)
@pytest.mark.write_disk
def test_sink_to_file(tmp_path: Path, sink: Any, scan: Any) -> None:
    """Round-trip a frame through an already-open file object on disk."""
    df = pl.DataFrame({"a": [5, 10, 1996]})
    target = tmp_path / "f"

    with target.open("w+") as handle:
        sink(df.lazy(), handle, sync_on_close="all")

        # Rewind the same handle and read back while it is still open.
        handle.seek(0)
        result = scan(handle).collect()

        assert_frame_equal(result, df)
125
126
127
@pytest.mark.parametrize(("scan", "sink"), SINKS)
def test_sink_empty(sink: Any, scan: Any) -> None:
    """Sinking an empty cross-join result round-trips to the expected empty frame."""
    one_row = pl.LazyFrame(data={"col1": ["a"]})
    no_rows = pl.LazyFrame(data={"col1": []}, schema={"col1": str})

    expected = no_rows.join(one_row, how="cross").collect()

    # Only the NDJSON scan is given an explicit schema.
    # NOTE(review): presumably an empty NDJSON file carries no column
    # information to infer from - confirm.
    scan_kwargs: dict[str, Any] = (
        {"schema": expected.schema} if scan == pl.scan_ndjson else {}
    )

    buffer = io.BytesIO()

    def assert_roundtrip() -> None:
        """Rewind the buffer and compare the scanned frame with `expected`."""
        buffer.seek(0)
        assert_frame_equal(scan(buffer, **scan_kwargs), expected.lazy())

    def overwrite(lf: Any) -> None:
        """Sink `lf` from the start of the buffer and drop any bytes left after it."""
        buffer.seek(0)
        sink(lf, buffer)
        buffer.truncate()

    # right empty
    sink(one_row.join(no_rows, how="cross"), buffer)
    assert_roundtrip()

    # left empty
    overwrite(no_rows.join(one_row, how="cross"))
    assert_roundtrip()

    # both empty
    overwrite(no_rows.join(no_rows, how="cross"))
    assert_roundtrip()
162
163