# GitHub repository: pola-rs/polars
# Path: py-polars/tests/unit/streaming/test_streaming.py
from __future__ import annotations

import os
import time
from datetime import date
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import pytest

import polars as pl
from polars.exceptions import PolarsInefficientMapWarning
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from polars._typing import JoinStrategy
    from tests.conftest import PlMonkeyPatch

pytestmark = pytest.mark.xdist_group("streaming")


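# Regression test for issue 5921: a streaming group_by/sort over a Categorical key
# should match the eager result.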
def test_streaming_categoricals_5921() -> None:
    out_lazy = (
        pl.DataFrame({"X": ["a", "a", "a", "b", "b"], "Y": [2, 2, 2, 1, 1]})
        .lazy()
        .with_columns(pl.col("X").cast(pl.Categorical))
        .group_by("X")
        .agg(pl.col("Y").min())
        .sort("Y", descending=True)
        .collect(engine="streaming")
    )

    out_eager = (
        pl.DataFrame({"X": ["a", "a", "a", "b", "b"], "Y": [2, 2, 2, 1, 1]})
        .with_columns(pl.col("X").cast(pl.Categorical))
        .group_by("X")
        .agg(pl.col("Y").min())
        .sort("Y", descending=True)
    )

    for out in [out_eager, out_lazy]:
        assert out.dtypes == [pl.Categorical, pl.Int64]
        assert out.to_dict(as_series=False) == {"X": ["a", "b"], "Y": [2, 1]}


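# Issue 6054: a literal Series added via with_columns should flow through a streaming
# group_by correctly.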
def test_streaming_block_on_literals_6054() -> None:
    df = pl.DataFrame({"col_1": [0] * 5 + [1] * 5})
    s = pl.Series("col_2", list(range(10)))

    assert df.lazy().with_columns(s).group_by("col_1").agg(pl.all().first()).collect(
        engine="streaming"
    ).sort("col_1").to_dict(as_series=False) == {"col_1": [0, 1], "col_2": [0, 5]}


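# A map_batches UDF marked streamable should be invoked per morsel; with
# POLARS_IDEAL_MORSEL_SIZE=1 it must run more than once over 100 rows.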
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud  # reason: non-pure map_batches
def test_streaming_streamable_functions(
    plmonkeypatch: PlMonkeyPatch, capfd: Any
) -> None:
    plmonkeypatch.setenv("POLARS_IDEAL_MORSEL_SIZE", "1")
    calls = 0

    def func(df: pl.DataFrame) -> pl.DataFrame:
        nonlocal calls
        calls += 1
        return df.with_columns(pl.col("a").alias("b"))

    assert (
        pl.DataFrame({"a": list(range(100))})
        .lazy()
        .map_batches(
            function=func,
            schema={"a": pl.Int64, "b": pl.Int64},
            streamable=True,
        )
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "a": list(range(100)),
        "b": list(range(100)),
    }

    assert calls > 1


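# A streaming cross join followed by head() should stop early instead of
# materializing the full 100_000 x 100_000 product, hence the timing bound.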
@pytest.mark.slow
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud  # reason: timing
def test_cross_join_stack() -> None:
    morsel_size = os.environ.get("POLARS_IDEAL_MORSEL_SIZE")
    if morsel_size is not None and int(morsel_size) < 1000:
        pytest.skip("test is too slow for small morsel sizes")

    a = pl.Series(np.arange(100_000)).to_frame().lazy()
    t0 = time.time()
    assert a.join(a, how="cross").head().collect(engine="streaming").shape == (5, 2)
    t1 = time.time()
    assert (t1 - t0) < 0.5


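# Literal columns should expand to the frame height in a streaming select and act as
# group keys in a streaming group_by.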
def test_streaming_literal_expansion() -> None:
    df = pl.DataFrame(
        {
            "y": ["a", "b"],
            "z": [1, 2],
        }
    )

    q = df.lazy().select(
        x=pl.lit("constant"),
        y=pl.col("y"),
        z=pl.col("z"),
    )

    assert q.collect(engine="streaming").to_dict(as_series=False) == {
        "x": ["constant", "constant"],
        "y": ["a", "b"],
        "z": [1, 2],
    }
    assert q.group_by(["x", "y"]).agg(pl.mean("z")).sort("y").collect(
        engine="streaming"
    ).to_dict(as_series=False) == {
        "x": ["constant", "constant"],
        "y": ["a", "b"],
        "z": [1.0, 2.0],
    }
    assert q.group_by(["x"]).agg(pl.mean("z")).collect().to_dict(as_series=False) == {
        "x": ["constant"],
        "z": [1.5],
    }


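# map_elements with a trivial lambda should emit PolarsInefficientMapWarning when
# collected on the streaming engine.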
@pytest.mark.may_fail_auto_streaming
def test_streaming_apply(plmonkeypatch: PlMonkeyPatch, capfd: Any) -> None:
    plmonkeypatch.setenv("POLARS_VERBOSE", "1")

    q = pl.DataFrame({"a": [1, 2]}).lazy()
    with pytest.warns(
        PolarsInefficientMapWarning,
        match="with this one instead",
    ):
        (
            q.select(
                pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64)
            ).collect(engine="streaming")
        )


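# Issue 9494: sortedness established by sort() must survive the streaming engine so
# group_by_dynamic receives sorted input.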
def test_streaming_sortedness_propagation_9494() -> None:
    assert (
        pl.DataFrame(
            {
                "when": [date(2023, 5, 10), date(2023, 5, 20), date(2023, 6, 10)],
                "what": [1, 2, 3],
            }
        )
        .lazy()
        .sort("when")
        .group_by_dynamic("when", every="1mo")
        .agg(pl.col("what").sum())
        .collect(engine="streaming")
    ).to_dict(as_series=False) == {
        "when": [date(2023, 5, 1), date(2023, 6, 1)],
        "what": [3, 3],
    }


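# Streaming left and inner joins over parquet scans should match the in-memory engine
# (row order ignored).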
@pytest.mark.write_disk
@pytest.mark.slow
def test_streaming_generic_left_and_inner_join_from_disk(tmp_path: Path) -> None:
    morsel_size = os.environ.get("POLARS_IDEAL_MORSEL_SIZE")
    if morsel_size is not None and int(morsel_size) < 1000:
        pytest.skip("test is too slow for small morsel sizes")

    tmp_path.mkdir(exist_ok=True)
    p0 = tmp_path / "df0.parquet"
    p1 = tmp_path / "df1.parquet"
    # by loading from disk, we get different chunks
    n = 200_000
    k = 100

    d0: dict[str, np.ndarray[Any, Any]] = {
        f"x{i}": np.random.random(n) for i in range(k)
    }
    d0.update({"id": np.arange(n)})

    df0 = pl.DataFrame(d0)
    df1 = df0.clone().select(pl.all().shuffle(111))

    df0.write_parquet(p0)
    df1.write_parquet(p1)

    lf0 = pl.scan_parquet(p0)
    lf1 = pl.scan_parquet(p1).select(pl.all().name.suffix("_r"))

    join_strategies: list[JoinStrategy] = ["left", "inner"]
    for how in join_strategies:
        assert_frame_equal(
            lf0.join(lf1, left_on="id", right_on="id_r", how=how).collect(
                engine="streaming"
            ),
            lf0.join(lf1, left_on="id", right_on="id_r", how=how).collect(
                engine="in-memory"
            ),
            check_row_order=False,
        )


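# Issue 9776: group_by with and without maintain_order should produce the same counts
# when a key column contains nulls.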
def test_streaming_9776() -> None:
    df = pl.DataFrame({"col_1": ["a"] * 1000, "ID": [None] + ["a"] * 999})
    ordered = (
        df.group_by("col_1", "ID", maintain_order=True)
        .len()
        .filter(pl.col("col_1") == "a")
    )
    unordered = (
        df.group_by("col_1", "ID", maintain_order=False)
        .len()
        .filter(pl.col("col_1") == "a")
    )
    expected = [("a", None, 1), ("a", "a", 999)]
    assert ordered.rows() == expected
    assert unordered.sort(["col_1", "ID"]).rows() == expected


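# Scanning a parquet file with zero rows should preserve the schema under the
# streaming engine.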
@pytest.mark.write_disk
def test_stream_empty_file(tmp_path: Path) -> None:
    p = tmp_path / "in.parquet"
    schema = {
        "KLN_NR": pl.String,
    }

    df = pl.DataFrame(
        {
            "KLN_NR": [],
        },
        schema=schema,
    )
    df.write_parquet(p)
    assert pl.scan_parquet(p).collect(engine="streaming").schema == schema


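# An inner join followed by filter(False) should stream into an empty frame that keeps
# the joined schema.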
def test_streaming_empty_df() -> None:
    df = pl.DataFrame(
        [
            pl.Series("a", ["a", "b", "c", "b", "a", "a"], dtype=pl.Categorical()),
            pl.Series("b", ["b", "c", "c", "b", "a", "c"], dtype=pl.Categorical()),
        ]
    )

    result = (
        df.lazy()
        .join(df.lazy(), on="a", how="inner")
        .filter(False)
        .collect(engine="streaming")
    )

    assert result.to_dict(as_series=False) == {"a": [], "b": [], "b_right": []}


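# Issue 5537: a with_columns that both derives a new column and overwrites the source
# column should stream correctly.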
def test_streaming_duplicate_cols_5537() -> None:
    assert pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).lazy().with_columns(
        (pl.col("a") * 2).alias("foo"), (pl.col("a") * 3)
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "a": [3, 6, 9],
        "b": [1, 2, 3],
        "foo": [2, 4, 6],
    }


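# Issue 10455: summing an all-null Float32 column in a streaming group_by should yield
# 0.0, not null.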
def test_null_sum_streaming_10455() -> None:
    df = pl.DataFrame(
        {
            "x": [1] * 10,
            "y": [None] * 10,
        },
        schema={"x": pl.Int64, "y": pl.Float32},
    )
    result = df.lazy().group_by("x").sum().collect(engine="streaming")
    expected = {"x": [1], "y": [0.0]}
    assert result.to_dict(as_series=False) == expected


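# max() over a Boolean column must keep the Boolean dtype in both the planned and the
# collected schema.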
def test_boolean_agg_schema() -> None:
    df = pl.DataFrame(
        {
            "x": [1, 1, 1],
            "y": [False, True, False],
        }
    ).lazy()

    agg_df = df.group_by("x").agg(pl.col("y").max().alias("max_y"))

    for streaming in [True, False]:
        assert (
            agg_df.collect(engine="streaming" if streaming else "in-memory").schema
            == agg_df.collect_schema()
            == {"x": pl.Int64, "max_y": pl.Boolean}
        )


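# Issue 13770: a CSV with a header row but no data should stream into an empty frame
# with the provided schema.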
@pytest.mark.write_disk
def test_streaming_csv_headers_but_no_data_13770(tmp_path: Path) -> None:
    with Path.open(tmp_path / "header_no_data.csv", "w") as f:
        f.write("name, age\n")

    schema = {"name": pl.String, "age": pl.Int32}
    df = (
        pl.scan_csv(tmp_path / "header_no_data.csv", schema=schema)
        .head()
        .collect(engine="streaming")
    )
    assert df.height == 0
    assert df.schema == schema


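# A horizontal concat of two parquet scans followed by a group_by should be runnable
# on the streaming engine.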
@pytest.mark.write_disk
def test_streaming_with_hconcat(tmp_path: Path) -> None:
    df1 = pl.DataFrame(
        {
            "id": [0, 0, 1, 1, 2, 2],
            "x": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
        }
    )
    df1.write_parquet(tmp_path / "df1.parquet")

    df2 = pl.DataFrame(
        {
            "y": [6.0, 7.0, 8.0, 9.0, 10.0, 11.0],
        }
    )
    df2.write_parquet(tmp_path / "df2.parquet")

    lf1 = pl.scan_parquet(tmp_path / "df1.parquet")
    lf2 = pl.scan_parquet(tmp_path / "df2.parquet")
    query = (
        pl.concat([lf1, lf2], how="horizontal")
        .group_by("id")
        .agg(pl.all().mean())
        .sort(pl.col("id"))
    )

    result = query.collect(engine="streaming")

    expected = pl.DataFrame(
        {
            "id": [0, 1, 2],
            "x": [0.5, 2.5, 4.5],
            "y": [6.5, 8.5, 10.5],
        }
    )

    assert_frame_equal(result, expected)


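# Issue 15767: is_in against a literal Series, bare or inside when/then, should be
# recognized as elementwise so the plan can be sunk to parquet.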
@pytest.mark.write_disk
def test_elementwise_identification_in_ternary_15767(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    (
        pl.LazyFrame({"a": pl.Series([1])})
        .with_columns(b=pl.col("a").is_in(pl.Series([1, 2, 3])))
        .sink_parquet(tmp_path / "1")
    )

    (
        pl.LazyFrame({"a": pl.Series([1])})
        .with_columns(
            b=pl.when(pl.col("a").is_in(pl.Series([1, 2, 3]))).then(pl.col("a"))
        )
        .sink_parquet(tmp_path / "1")
    )


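# Issue 17669: dt.date() and dt.time() on a Datetime column should yield Date and Time
# dtypes on the streaming engine.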
def test_streaming_temporal_17669() -> None:
    df = (
        pl.LazyFrame({"a": [1, 2, 3]}, schema={"a": pl.Datetime("us")})
        .with_columns(
            b=pl.col("a").dt.date(),
            c=pl.col("a").dt.time(),
        )
        .collect(engine="streaming")
    )
    assert df.schema == {
        "a": pl.Datetime("us"),
        "b": pl.Date,
        "c": pl.Time,
    }


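# A sum reduction over an Int128 column should work on the streaming engine.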
def test_i128_sum_reduction() -> None:
    assert (
        pl.Series("a", [1, 2, 3], pl.Int128)
        .to_frame()
        .lazy()
        .sum()
        .collect(engine="streaming")
        .item()
        == 6
    )