GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_io.py
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import pytest

import polars as pl
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from pathlib import Path

pytestmark = pytest.mark.xdist_group("streaming")


@pytest.mark.write_disk
def test_streaming_parquet_glob_5900(df: pl.DataFrame, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    file_path = tmp_path / "small.parquet"
    df.write_parquet(file_path)

    path_glob = tmp_path / "small*.parquet"
    result = (
        pl.scan_parquet(path_glob).select(pl.all().first()).collect(engine="streaming")
    )
    assert result.shape == (1, df.width)


def test_scan_slice_streaming(io_files_path: Path) -> None:
    foods_file_path = io_files_path / "foods1.csv"
    df = pl.scan_csv(foods_file_path).head(5).collect(engine="streaming")
    assert df.shape == (5, 4)

    # globbing
    foods_file_path = io_files_path / "foods*.csv"
    df = pl.scan_csv(foods_file_path).head(5).collect(engine="streaming")
    assert df.shape == (5, 4)


@pytest.mark.parametrize("dtype", [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16])
def test_scan_csv_overwrite_small_dtypes(
    io_files_path: Path, dtype: pl.DataType
) -> None:
    file_path = io_files_path / "foods1.csv"
    df = pl.scan_csv(file_path, schema_overrides={"sugars_g": dtype}).collect(
        engine="streaming"
    )
    assert df.dtypes == [pl.String, pl.Int64, pl.Float64, dtype]


@pytest.mark.write_disk
def test_sink_parquet(io_files_path: Path, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file = io_files_path / "small.parquet"

    file_path = tmp_path / "sink.parquet"

    df_scanned = pl.scan_parquet(file)
    df_scanned.sink_parquet(file_path)

    result = pl.read_parquet(file_path)
    df_read = pl.read_parquet(file)
    assert_frame_equal(result, df_read)
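

# A minimal illustrative sketch, not part of the upstream suite: the sink tests
# here all build a lazy query and either stream it to disk with a sink_* call or
# collect it; both routes should yield the same data. The helper name and the
# file name "roundtrip.parquet" are hypothetical.
def _sink_vs_collect_sketch(tmp_path: Path) -> None:
    lf = pl.LazyFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]}).filter(pl.col("a") > 1)
    out = tmp_path / "roundtrip.parquet"
    lf.sink_parquet(out)
    assert_frame_equal(pl.read_parquet(out), lf.collect(engine="streaming"))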


@pytest.mark.write_disk
def test_sink_parquet_10115(tmp_path: Path) -> None:
    in_path = tmp_path / "in.parquet"
    out_path = tmp_path / "out.parquet"

    # This fails if the schema is resolved incorrectly due to projection
    # pushdown.
    (pl.DataFrame([{"x": 1, "y": "foo"}]).write_parquet(in_path))

    joiner = pl.LazyFrame([{"y": "foo", "z": "_"}])

    (
        pl.scan_parquet(in_path)
        .join(joiner, how="left", on="y")
        .select("x", "y", "z")
        .sink_parquet(out_path)
    )

    assert pl.read_parquet(out_path).to_dict(as_series=False) == {
        "x": [1],
        "y": ["foo"],
        "z": ["_"],
    }


@pytest.mark.write_disk
def test_sink_ipc(io_files_path: Path, tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    file = io_files_path / "small.parquet"

    file_path = tmp_path / "sink.ipc"

    df_scanned = pl.scan_parquet(file)
    df_scanned.sink_ipc(file_path)

    result = pl.read_ipc(file_path)
    df_read = pl.read_parquet(file)
    assert_frame_equal(result, df_read)


@pytest.mark.write_disk
def test_sink_csv(io_files_path: Path, tmp_path: Path) -> None:
    source_file = io_files_path / "small.parquet"
    target_file = tmp_path / "sink.csv"

    pl.scan_parquet(source_file).sink_csv(target_file)

    source_data = pl.read_parquet(source_file)
    target_data = pl.read_csv(target_file)
    assert_frame_equal(target_data, source_data)


@pytest.mark.write_disk
def test_sink_csv_14494(tmp_path: Path) -> None:
    # Sinking a frame whose rows are all filtered out should still write the header.
    pl.LazyFrame({"c": [1, 2, 3]}, schema={"c": pl.Int64}).filter(
        pl.col("c") > 10
    ).sink_csv(tmp_path / "sink.csv")
    assert pl.read_csv(tmp_path / "sink.csv").columns == ["c"]


@pytest.mark.parametrize("value", ["abc", ""])
def test_sink_csv_exception_for_separator(value: str) -> None:
    df = pl.LazyFrame({"dummy": ["abc"]})
    with pytest.raises(ValueError, match="should be a single byte character, but is"):
        df.sink_csv("path", separator=value)


@pytest.mark.parametrize("value", ["abc", ""])
def test_sink_csv_exception_for_quote(value: str) -> None:
    df = pl.LazyFrame({"dummy": ["abc"]})
    with pytest.raises(ValueError, match="should be a single byte character, but is"):
        df.sink_csv("path", quote_char=value)


def test_sink_csv_batch_size_zero() -> None:
    lf = pl.LazyFrame({"a": [1, 2, 3], "b": [1, 2, 3]})
    with pytest.raises(ValueError, match="invalid zero value"):
        lf.sink_csv("test.csv", batch_size=0)


@pytest.mark.write_disk
def test_sink_csv_nested_data(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    path = tmp_path / "data.csv"

    lf = pl.LazyFrame({"list": [[1, 2, 3, 4, 5]]})
    with pytest.raises(
        pl.exceptions.ComputeError, match="CSV format does not support nested data"
    ):
        lf.sink_csv(path)
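

# A minimal illustrative sketch, not part of the upstream suite: CSV cannot
# represent nested data (as asserted above), so a List column has to be
# flattened first, e.g. by exploding it, before sink_csv can succeed. The
# helper name and the file name "flattened.csv" are hypothetical.
def _sink_csv_after_explode_sketch(tmp_path: Path) -> None:
    lf = pl.LazyFrame({"list": [[1, 2, 3, 4, 5]]})
    path = tmp_path / "flattened.csv"
    lf.explode("list").sink_csv(path)
    assert pl.read_csv(path)["list"].to_list() == [1, 2, 3, 4, 5]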


def test_scan_csv_only_header_10792(io_files_path: Path) -> None:
    foods_file_path = io_files_path / "only_header.csv"
    df = pl.scan_csv(foods_file_path).collect(engine="streaming")
    assert df.to_dict(as_series=False) == {"Name": [], "Address": []}


def test_scan_empty_csv_10818(io_files_path: Path) -> None:
    empty_file_path = io_files_path / "empty.csv"
    df = pl.scan_csv(empty_file_path, raise_if_empty=False).collect(engine="streaming")
    assert df.is_empty()


@pytest.mark.write_disk
def test_streaming_cross_join_schema(tmp_path: Path) -> None:
    file_path = tmp_path / "temp.parquet"
    a = pl.DataFrame({"a": [1, 2]}).lazy()
    b = pl.DataFrame({"b": ["b"]}).lazy()
    a.join(b, how="cross").sink_parquet(file_path)
    read = pl.read_parquet(file_path, parallel="none")
    assert read.to_dict(as_series=False) == {"a": [1, 2], "b": ["b", "b"]}


@pytest.mark.write_disk
def test_sink_ndjson_should_write_same_data(
    io_files_path: Path, tmp_path: Path
) -> None:
    tmp_path.mkdir(exist_ok=True)

    source_path = io_files_path / "foods1.csv"
    target_path = tmp_path / "foods_test.ndjson"

    expected = pl.read_csv(source_path)

    lf = pl.scan_csv(source_path)
    lf.sink_ndjson(target_path)
    df = pl.read_ndjson(target_path)

    assert_frame_equal(df, expected)


@pytest.mark.write_disk
@pytest.mark.parametrize("streaming", [False, True])
def test_parquet_eq_statistics(
    monkeypatch: Any, capfd: Any, tmp_path: Path, streaming: bool
) -> None:
    tmp_path.mkdir(exist_ok=True)

    monkeypatch.setenv("POLARS_VERBOSE", "1")

    df = pl.DataFrame({"idx": pl.arange(100, 200, eager=True)}).with_columns(
        (pl.col("idx") // 25).alias("part")
    )
    df = pl.concat(df.partition_by("part", as_dict=False), rechunk=False)
    assert df.n_chunks("all") == [4, 4]

    file_path = tmp_path / "stats.parquet"
    df.write_parquet(file_path, statistics=True, use_pyarrow=False)

    for pred in [
        pl.col("idx") == 50,
        pl.col("idx") == 150,
        pl.col("idx") == 210,
    ]:
        result = (
            pl.scan_parquet(file_path)
            .filter(pred)
            .collect(engine="streaming" if streaming else "in-memory")
        )
        assert_frame_equal(result, df.filter(pred))

    captured = capfd.readouterr().err
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 1 / 1 row groups" in captured
    )
    assert (
        "[ParquetFileReader]: Predicate pushdown: reading 0 / 1 row groups" in captured
    )
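

# A minimal illustrative sketch, not part of the upstream suite: the verbose log
# checked above reflects predicate pushdown, which can also be inspected via the
# optimized plan string returned by LazyFrame.explain(). Only the type of the
# plan is asserted because its exact text is not a stable interface; the helper
# and file names are hypothetical.
def _explain_pushdown_sketch(tmp_path: Path) -> None:
    file_path = tmp_path / "stats_example.parquet"
    pl.DataFrame({"idx": pl.arange(0, 100, eager=True)}).write_parquet(
        file_path, statistics=True
    )
    plan = pl.scan_parquet(file_path).filter(pl.col("idx") == 5).explain()
    assert isinstance(plan, str)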


@pytest.mark.write_disk
def test_streaming_empty_parquet_16523(tmp_path: Path) -> None:
    file_path = tmp_path / "foo.parquet"
    df = pl.DataFrame({"a": []}, schema={"a": pl.Int32})
    df.write_parquet(file_path)
    q = pl.scan_parquet(file_path)
    q2 = pl.LazyFrame({"a": [1]}, schema={"a": pl.Int32})
    assert q.join(q2, on="a").collect(engine="streaming").shape == (0, 1)


@pytest.mark.parametrize(
    "method",
    ["parquet", "csv", "ipc", "ndjson"],
)
@pytest.mark.write_disk
def test_sink_phases(tmp_path: Path, method: str) -> None:
    df = pl.DataFrame(
        {
            "a": [1, 2, 3, 4, 5, 6, 7],
            "b": [
                "some",
                "text",
                "over-here-is-very-long",
                "and",
                "some",
                "more",
                "text",
            ],
        }
    )

    # Ordered Unions lead to many phase transitions.
    ref_df = pl.concat([df] * 100)
    lf = pl.concat([df.lazy()] * 100)

    (getattr(lf, f"sink_{method}"))(tmp_path / f"t.{method}", engine="streaming")
    df = (getattr(pl, f"scan_{method}"))(tmp_path / f"t.{method}").collect()

    assert_frame_equal(df, ref_df)

    (getattr(lf, f"sink_{method}"))(
        tmp_path / f"t.{method}", maintain_order=False, engine="streaming"
    )
    height = (
        (getattr(pl, f"scan_{method}"))(tmp_path / f"t.{method}")
        .select(pl.len())
        .collect()[0, 0]
    )
    assert height == ref_df.height
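

# A minimal illustrative sketch, not part of the upstream suite: with
# maintain_order=False the sink may write chunks in any order, so only
# order-insensitive checks (like the row count used above) are meaningful.
# The helper name and the file name "unordered.parquet" are hypothetical.
def _unordered_sink_sketch(tmp_path: Path) -> None:
    lf = pl.concat([pl.LazyFrame({"a": [1, 2, 3]})] * 10)
    out = tmp_path / "unordered.parquet"
    lf.sink_parquet(out, maintain_order=False, engine="streaming")
    assert pl.read_parquet(out).height == 30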


def test_empty_sink_parquet_join_14863(tmp_path: Path) -> None:
    file_path = tmp_path / "empty.parquet"
    lf = pl.LazyFrame(schema=["a", "b", "c"]).cast(pl.String)
    lf.sink_parquet(file_path)
    assert_frame_equal(
        pl.LazyFrame({"a": ["uno"]}).join(pl.scan_parquet(file_path), on="a").collect(),
        lf.collect(),
    )


@pytest.mark.write_disk
def test_scan_non_existent_file_21527() -> None:
    with pytest.raises(
        FileNotFoundError,
        match=r"a-file-that-does-not-exist",
    ):
        pl.scan_parquet("a-file-that-does-not-exist").sink_ipc(
            "x.ipc", engine="streaming"
        )