GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_io.py
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path

    from tests.conftest import PlMonkeyPatch

pytestmark = pytest.mark.xdist_group("streaming")


@pytest.mark.write_disk
def test_streaming_parquet_glob_5900(df: pl.DataFrame, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "small.parquet"
    df.write_parquet(file_path)

    path_glob = tmp_path / "small*.parquet"
    result = (
        pl.scan_parquet(path_glob).select(pl.all().first()).collect(engine="streaming")
    )
    assert result.shape == (1, df.width)


def test_scan_slice_streaming(io_files_path: Path) -> None:
    foods_file_path = io_files_path / "foods1.csv"
    df = pl.scan_csv(foods_file_path).head(5).collect(engine="streaming")
    assert df.shape == (5, 4)

    # globbing
    foods_file_path = io_files_path / "foods*.csv"
    df = pl.scan_csv(foods_file_path).head(5).collect(engine="streaming")
    assert df.shape == (5, 4)


@pytest.mark.parametrize("dtype", [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16])
def test_scan_csv_overwrite_small_dtypes(
    io_files_path: Path, dtype: pl.DataType
) -> None:
    file_path = io_files_path / "foods1.csv"
    df = pl.scan_csv(file_path, schema_overrides={"sugars_g": dtype}).collect(
        engine="streaming"
    )
    assert df.dtypes == [pl.String, pl.Int64, pl.Float64, dtype]


@pytest.mark.write_disk
def test_sink_parquet(io_files_path: Path, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file = io_files_path / "small.parquet"

    file_path = tmp_path / "sink.parquet"

    df_scanned = pl.scan_parquet(file)
    df_scanned.sink_parquet(file_path)

    result = pl.read_parquet(file_path)
    df_read = pl.read_parquet(file)
    assert_frame_equal(result, df_read)

@pytest.mark.write_disk
def test_sink_parquet_10115(tmp_path: Path) -> None:
    in_path = tmp_path / "in.parquet"
    out_path = tmp_path / "out.parquet"

    # this fails if the schema is resolved incorrectly due to the projection
    # pushdown
    (pl.DataFrame([{"x": 1, "y": "foo"}]).write_parquet(in_path))

    joiner = pl.LazyFrame([{"y": "foo", "z": "_"}])

    (
        pl.scan_parquet(in_path)
        .join(joiner, how="left", on="y")
        .select("x", "y", "z")
        .sink_parquet(out_path)
    )

    assert pl.read_parquet(out_path).to_dict(as_series=False) == {
        "x": [1],
        "y": ["foo"],
        "z": ["_"],
    }

@pytest.mark.write_disk
def test_sink_ipc(io_files_path: Path, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file = io_files_path / "small.parquet"

    file_path = tmp_path / "sink.ipc"

    df_scanned = pl.scan_parquet(file)
    df_scanned.sink_ipc(file_path)

    result = pl.read_ipc(file_path)
    df_read = pl.read_parquet(file)
    assert_frame_equal(result, df_read)


@pytest.mark.write_disk
def test_sink_csv(io_files_path: Path, tmp_path: Path) -> None:
    source_file = io_files_path / "small.parquet"
    target_file = tmp_path / "sink.csv"

    pl.scan_parquet(source_file).sink_csv(target_file)

    source_data = pl.read_parquet(source_file)
    target_data = pl.read_csv(target_file)
    assert_frame_equal(target_data, source_data)


@pytest.mark.write_disk
def test_sink_csv_14494(tmp_path: Path) -> None:
    pl.LazyFrame({"c": [1, 2, 3]}, schema={"c": pl.Int64}).filter(
        pl.col("c") > 10
    ).sink_csv(tmp_path / "sink.csv")
    assert pl.read_csv(tmp_path / "sink.csv").columns == ["c"]


@pytest.mark.parametrize(("value"), ["abc", ""])
def test_sink_csv_exception_for_separator(value: str) -> None:
    df = pl.LazyFrame({"dummy": ["abc"]})
    with pytest.raises(ValueError, match="should be a single byte character, but is"):
        df.sink_csv("path", separator=value)


@pytest.mark.parametrize(("value"), ["abc", ""])
def test_sink_csv_exception_for_quote(value: str) -> None:
    df = pl.LazyFrame({"dummy": ["abc"]})
    with pytest.raises(ValueError, match="should be a single byte character, but is"):
        df.sink_csv("path", quote_char=value)


def test_sink_csv_batch_size_zero() -> None:
    lf = pl.LazyFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    with pytest.raises(ValueError, match="invalid zero value"):
        lf.sink_csv("test.csv", batch_size=0)


@pytest.mark.write_disk
def test_sink_csv_nested_data(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "data.csv"

    lf = pl.LazyFrame({"list": [[1, 2, 3, 4, 5]]})
    with pytest.raises(
        pl.exceptions.ComputeError, match="CSV format does not support nested data"
    ):
        lf.sink_csv(path)


def test_scan_csv_only_header_10792(io_files_path: Path) -> None:
    foods_file_path = io_files_path / "only_header.csv"
    df = pl.scan_csv(foods_file_path).collect(engine="streaming")
    assert df.to_dict(as_series=False) == {"Name": [], "Address": []}


def test_scan_empty_csv_10818(io_files_path: Path) -> None:
    empty_file_path = io_files_path / "empty.csv"
    df = pl.scan_csv(empty_file_path, raise_if_empty=False).collect(engine="streaming")
    assert df.is_empty()


@pytest.mark.write_disk
def test_streaming_cross_join_schema(tmp_path: Path) -> None:
    file_path = tmp_path / "temp.parquet"
    a = pl.DataFrame({"a": [1, 2]}).lazy()
    b = pl.DataFrame({"b": ["b"]}).lazy()
    a.join(b, how="cross").sink_parquet(file_path)
    read = pl.read_parquet(file_path, parallel="none")
    assert read.to_dict(as_series=False) == {"a": [1, 2], "b": ["b", "b"]}


@pytest.mark.write_disk
def test_sink_ndjson_should_write_same_data(
    io_files_path: Path, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)

    source_path = io_files_path / "foods1.csv"
    target_path = tmp_path / "foods_test.ndjson"

    expected = pl.read_csv(source_path)

    lf = pl.scan_csv(source_path)
    lf.sink_ndjson(target_path)
    df = pl.read_ndjson(target_path)

    assert_frame_equal(df, expected)


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [False, True])
def test_parquet_eq_statistics(
    plmonkeypatch: PlMonkeyPatch, capfd: Any, tmp_path: Path, streaming: bool
) -> None:
    tmp_path.mkdir(exist_ok=True)

    plmonkeypatch.setenv("POLARS_VERBOSE", "1")

    df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
        (pl.col("idx") // 25).alias("part")
    )
    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
    assert df.n_chunks("all") == [4, 4]

    file_path = tmp_path / "stats.parquet"
    df.write_parquet(file_path, statistics=True, use_pyarrow=False)

    for pred in [
        pl.col("idx") == 50,
        pl.col("idx") == 150,
        pl.col("idx") == 210,
    ]:
        result = (
            pl.scan_parquet(file_path)
            .filter(pred)
            .collect(engine="streaming" if streaming else "in-memory")
        )
        assert_frame_equal(result, df.filter(pred))

    captured = capfd.readouterr().err
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 1 / 1 row groups" in captured
    )
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 0 / 1 row groups" in captured
    )


@pytest.mark.write_disk
def test_streaming_empty_parquet_16523(tmp_path: Path) -> None:
    file_path = tmp_path / "foo.parquet"
    df = pl.DataFrame({"a": []}, schema={"a": pl.Int32})
    df.write_parquet(file_path)
    q = pl.scan_parquet(file_path)
    q2 = pl.LazyFrame({"a": [1]}, schema={"a": pl.Int32})
    assert q.join(q2, on="a").collect(engine="streaming").shape == (0, 1)


@pytest.mark.parametrize(
    "method",
    ["parquet", "csv", "ipc", "ndjson"],
)
@pytest.mark.write_disk
def test_sink_phases(tmp_path: Path, method: str) -> None:
    df = pl.DataFrame(
        {
            "a": [1, 2, 3, 4, 5, 6, 7],
            "b": [
                "some",
                "text",
                "over-here-is-very-long",
                "and",
                "some",
                "more",
                "text",
            ],
        }
    )

    # Ordered Unions lead to many phase transitions.
    ref_df = pl.concat([df] * 100)
    lf = pl.concat([df.lazy()] * 100)

    (getattr(lf, f"sink_{method}"))(tmp_path / f"t.{method}", engine="streaming")
    df = (getattr(pl, f"scan_{method}"))(tmp_path / f"t.{method}").collect()

    assert_frame_equal(df, ref_df)

    (getattr(lf, f"sink_{method}"))(
        tmp_path / f"t.{method}", maintain_order=False, engine="streaming"
    )
    height = (
        (getattr(pl, f"scan_{method}"))(tmp_path / f"t.{method}")
        .select(pl.len())
        .collect()[0, 0]
    )
    assert height == ref_df.height


def test_empty_sink_parquet_join_14863(tmp_path: Path) -> None:
    file_path = tmp_path / "empty.parquet"
    lf = pl.LazyFrame(schema=["a", "b", "c"]).cast(pl.String)
    lf.sink_parquet(file_path)
    assert_frame_equal(
        pl.LazyFrame({"a": ["uno"]}).join(pl.scan_parquet(file_path), on="a").collect(),
        lf.collect(),
    )


@pytest.mark.write_disk
def test_scan_non_existent_file_21527() -> None:
    with pytest.raises(
        FileNotFoundError,
        match=r"a-file-that-does-not-exist",
    ):
        pl.scan_parquet("a-file-that-does-not-exist").sink_ipc(
            "x.ipc", engine="streaming"
        )