# Source: pola-rs/polars, py-polars/tests/unit/streaming/test_streaming.py
from __future__ import annotations

import time
from datetime import date
from pathlib import Path
from typing import TYPE_CHECKING, Any

import numpy as np
import pytest

import polars as pl
from polars.exceptions import PolarsInefficientMapWarning
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from polars._typing import JoinStrategy

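# All tests in this module are pinned to the same pytest-xdist worker group,
# so they never run concurrently with one another.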
pytestmark = pytest.mark.xdist_group("streaming")


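# Regression test for issue 5921: a categorical group-by/sort must give the
# same result under the streaming engine as under the eager API.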
def test_streaming_categoricals_5921() -> None:
    out_lazy = (
        pl.DataFrame({"X": ["a", "a", "a", "b", "b"], "Y": [2, 2, 2, 1, 1]})
        .lazy()
        .with_columns(pl.col("X").cast(pl.Categorical))
        .group_by("X")
        .agg(pl.col("Y").min())
        .sort("Y", descending=True)
        .collect(engine="streaming")
    )

    out_eager = (
        pl.DataFrame({"X": ["a", "a", "a", "b", "b"], "Y": [2, 2, 2, 1, 1]})
        .with_columns(pl.col("X").cast(pl.Categorical))
        .group_by("X")
        .agg(pl.col("Y").min())
        .sort("Y", descending=True)
    )

    for out in [out_eager, out_lazy]:
        assert out.dtypes == [pl.Categorical, pl.Int64]
        assert out.to_dict(as_series=False) == {"X": ["a", "b"], "Y": [2, 1]}


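# Regression test for issue 6054: a literal Series attached via with_columns
# must broadcast correctly through a streaming group-by.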
def test_streaming_block_on_literals_6054() -> None:
    df = pl.DataFrame({"col_1": [0] * 5 + [1] * 5})
    s = pl.Series("col_2", list(range(10)))

    assert df.lazy().with_columns(s).group_by("col_1").agg(pl.all().first()).collect(
        engine="streaming"
    ).sort("col_1").to_dict(as_series=False) == {"col_1": [0, 1], "col_2": [0, 5]}


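# With POLARS_IDEAL_MORSEL_SIZE=1 the 100-row frame is split into many small
# morsels, so a streamable map_batches UDF must be invoked more than once.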
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud  # reason: non-pure map_batches
def test_streaming_streamable_functions(monkeypatch: Any, capfd: Any) -> None:
    monkeypatch.setenv("POLARS_IDEAL_MORSEL_SIZE", "1")
    calls = 0

    def func(df: pl.DataFrame) -> pl.DataFrame:
        nonlocal calls
        calls += 1
        return df.with_columns(pl.col("a").alias("b"))

    assert (
        pl.DataFrame({"a": list(range(100))})
        .lazy()
        .map_batches(
            function=func,
            schema={"a": pl.Int64, "b": pl.Int64},
            streamable=True,
        )
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "a": list(range(100)),
        "b": list(range(100)),
    }

    assert calls > 1


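# A cross join of 100_000 x 100_000 rows would materialize 10**10 rows; with
# head() the streaming engine must short-circuit, and the time bound guards
# against accidentally computing the full product.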
@pytest.mark.slow
@pytest.mark.may_fail_auto_streaming
@pytest.mark.may_fail_cloud  # reason: timing
def test_cross_join_stack() -> None:
    a = pl.Series(np.arange(100_000)).to_frame().lazy()
    t0 = time.time()
    assert a.join(a, how="cross").head().collect(engine="streaming").shape == (5, 2)
    t1 = time.time()
    assert (t1 - t0) < 0.5


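# Literal columns must be expanded to the frame's height in streaming select
# and group-by contexts.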
def test_streaming_literal_expansion() -> None:
    df = pl.DataFrame(
        {
            "y": ["a", "b"],
            "z": [1, 2],
        }
    )

    q = df.lazy().select(
        x=pl.lit("constant"),
        y=pl.col("y"),
        z=pl.col("z"),
    )

    assert q.collect(engine="streaming").to_dict(as_series=False) == {
        "x": ["constant", "constant"],
        "y": ["a", "b"],
        "z": [1, 2],
    }
    assert q.group_by(["x", "y"]).agg(pl.mean("z")).sort("y").collect(
        engine="streaming"
    ).to_dict(as_series=False) == {
        "x": ["constant", "constant"],
        "y": ["a", "b"],
        "z": [1.0, 2.0],
    }
    assert q.group_by(["x"]).agg(pl.mean("z")).collect().to_dict(as_series=False) == {
        "x": ["constant"],
        "z": [1.5],
    }


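# map_elements runs an opaque Python UDF; the streaming engine must still
# execute it, and polars warns that the lambda could be replaced by a native
# expression.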
@pytest.mark.may_fail_auto_streaming
def test_streaming_apply(monkeypatch: Any, capfd: Any) -> None:
    monkeypatch.setenv("POLARS_VERBOSE", "1")

    q = pl.DataFrame({"a": [1, 2]}).lazy()
    with pytest.warns(PolarsInefficientMapWarning, match="with this one instead"):
        (
            q.select(
                pl.col("a").map_elements(lambda x: x * 2, return_dtype=pl.Int64)
            ).collect(engine="streaming")
        )


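# Regression test for issue 9494: the sorted flag set by sort() must survive
# streaming execution so that group_by_dynamic (which requires sorted input)
# still works.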
def test_streaming_sortedness_propagation_9494() -> None:
    assert (
        pl.DataFrame(
            {
                "when": [date(2023, 5, 10), date(2023, 5, 20), date(2023, 6, 10)],
                "what": [1, 2, 3],
            }
        )
        .lazy()
        .sort("when")
        .group_by_dynamic("when", every="1mo")
        .agg(pl.col("what").sum())
        .collect(engine="streaming")
    ).to_dict(as_series=False) == {
        "when": [date(2023, 5, 1), date(2023, 6, 1)],
        "what": [3, 3],
    }


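# Streaming joins over multi-chunk inputs (re-read from parquet) must match the
# in-memory engine; row order is only checked for the "left" strategy, where
# maintain_order="left" guarantees it.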
@pytest.mark.write_disk
@pytest.mark.slow
def test_streaming_generic_left_and_inner_join_from_disk(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)
    p0 = tmp_path / "df0.parquet"
    p1 = tmp_path / "df1.parquet"
    # by loading from disk, we get different chunks
    n = 200_000
    k = 100

    d0: dict[str, np.ndarray[Any, Any]] = {
        f"x{i}": np.random.random(n) for i in range(k)
    }
    d0.update({"id": np.arange(n)})

    df0 = pl.DataFrame(d0)
    df1 = df0.clone().select(pl.all().shuffle(111))

    df0.write_parquet(p0)
    df1.write_parquet(p1)

    lf0 = pl.scan_parquet(p0)
    lf1 = pl.scan_parquet(p1).select(pl.all().name.suffix("_r"))

    join_strategies: list[JoinStrategy] = ["left", "inner"]
    for how in join_strategies:
        assert_frame_equal(
            lf0.join(
                lf1, left_on="id", right_on="id_r", how=how, maintain_order="left"
            ).collect(engine="streaming"),
            lf0.join(lf1, left_on="id", right_on="id_r", how=how).collect(
                engine="in-memory"
            ),
            check_row_order=how == "left",
        )


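# Regression test for issue 9776: group_by with maintain_order=True must keep
# first-seen group order even when the keys contain nulls.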
def test_streaming_9776() -> None:
    df = pl.DataFrame({"col_1": ["a"] * 1000, "ID": [None] + ["a"] * 999})
    ordered = (
        df.group_by("col_1", "ID", maintain_order=True)
        .len()
        .filter(pl.col("col_1") == "a")
    )
    unordered = (
        df.group_by("col_1", "ID", maintain_order=False)
        .len()
        .filter(pl.col("col_1") == "a")
    )
    expected = [("a", None, 1), ("a", "a", 999)]
    assert ordered.rows() == expected
    assert unordered.sort(["col_1", "ID"]).rows() == expected


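# Streaming a zero-row parquet file must still yield the stored schema.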
@pytest.mark.write_disk
def test_stream_empty_file(tmp_path: Path) -> None:
    p = tmp_path / "in.parquet"
    schema = {
        "KLN_NR": pl.String,
    }

    df = pl.DataFrame(
        {
            "KLN_NR": [],
        },
        schema=schema,
    )
    df.write_parquet(p)
    assert pl.scan_parquet(p).collect(engine="streaming").schema == schema


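# A filter that removes every row after a categorical self-join must still
# produce an empty frame with the full joined schema.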
def test_streaming_empty_df() -> None:
    df = pl.DataFrame(
        [
            pl.Series("a", ["a", "b", "c", "b", "a", "a"], dtype=pl.Categorical()),
            pl.Series("b", ["b", "c", "c", "b", "a", "c"], dtype=pl.Categorical()),
        ]
    )

    result = (
        df.lazy()
        .join(df.lazy(), on="a", how="inner")
        .filter(False)
        .collect(engine="streaming")
    )

    assert result.to_dict(as_series=False) == {"a": [], "b": [], "b_right": []}


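# Regression test for issue 5537: a with_columns output that keeps a source
# column's name ("a") must overwrite that column rather than duplicate it.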
def test_streaming_duplicate_cols_5537() -> None:
    assert pl.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).lazy().with_columns(
        (pl.col("a") * 2).alias("foo"), (pl.col("a") * 3)
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "a": [3, 6, 9],
        "b": [1, 2, 3],
        "foo": [2, 4, 6],
    }


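# Regression test for issue 10455: summing an all-null Float32 column in a
# streaming group-by must yield 0.0, not null.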
def test_null_sum_streaming_10455() -> None:
    df = pl.DataFrame(
        {
            "x": [1] * 10,
            "y": [None] * 10,
        },
        schema={"x": pl.Int64, "y": pl.Float32},
    )
    result = df.lazy().group_by("x").sum().collect(engine="streaming")
    expected = {"x": [1], "y": [0.0]}
    assert result.to_dict(as_series=False) == expected


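# The schema of a Boolean max() aggregation must agree between
# collect_schema() and the frames produced by both engines.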
def test_boolean_agg_schema() -> None:
    df = pl.DataFrame(
        {
            "x": [1, 1, 1],
            "y": [False, True, False],
        }
    ).lazy()

    agg_df = df.group_by("x").agg(pl.col("y").max().alias("max_y"))

    for streaming in [True, False]:
        assert (
            agg_df.collect(engine="streaming" if streaming else "in-memory").schema
            == agg_df.collect_schema()
            == {"x": pl.Int64, "max_y": pl.Boolean}
        )


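# Regression test for issue 13770: a CSV with a header row but no data must
# stream to an empty frame with the supplied schema.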
@pytest.mark.write_disk
def test_streaming_csv_headers_but_no_data_13770(tmp_path: Path) -> None:
    with Path.open(tmp_path / "header_no_data.csv", "w") as f:
        f.write("name, age\n")

    schema = {"name": pl.String, "age": pl.Int32}
    df = (
        pl.scan_csv(tmp_path / "header_no_data.csv", schema=schema)
        .head()
        .collect(engine="streaming")
    )
    assert df.height == 0
    assert df.schema == schema


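# A horizontal concat of two parquet scans must be executable by the streaming
# engine and feed correctly into a downstream group-by.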
@pytest.mark.write_disk
def test_streaming_with_hconcat(tmp_path: Path) -> None:
    df1 = pl.DataFrame(
        {
            "id": [0, 0, 1, 1, 2, 2],
            "x": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
        }
    )
    df1.write_parquet(tmp_path / "df1.parquet")

    df2 = pl.DataFrame(
        {
            "y": [6.0, 7.0, 8.0, 9.0, 10.0, 11.0],
        }
    )
    df2.write_parquet(tmp_path / "df2.parquet")

    lf1 = pl.scan_parquet(tmp_path / "df1.parquet")
    lf2 = pl.scan_parquet(tmp_path / "df2.parquet")
    query = (
        pl.concat([lf1, lf2], how="horizontal")
        .group_by("id")
        .agg(pl.all().mean())
        .sort(pl.col("id"))
    )

    result = query.collect(engine="streaming")

    expected = pl.DataFrame(
        {
            "id": [0, 1, 2],
            "x": [0.5, 2.5, 4.5],
            "y": [6.5, 8.5, 10.5],
        }
    )

    assert_frame_equal(result, expected)


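# Regression test for issue 15767: is_in with a Series argument must be
# recognized as elementwise, both on its own and inside a when/then ternary,
# so the plan can be sunk to parquet.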
@pytest.mark.write_disk
def test_elementwise_identification_in_ternary_15767(tmp_path: Path) -> None:
    tmp_path.mkdir(exist_ok=True)

    (
        pl.LazyFrame({"a": pl.Series([1])})
        .with_columns(b=pl.col("a").is_in(pl.Series([1, 2, 3])))
        .sink_parquet(tmp_path / "1")
    )

    (
        pl.LazyFrame({"a": pl.Series([1])})
        .with_columns(
            b=pl.when(pl.col("a").is_in(pl.Series([1, 2, 3]))).then(pl.col("a"))
        )
        .sink_parquet(tmp_path / "1")
    )


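# Regression test for issue 17669: extracting date and time components from a
# Datetime column must keep the correct output dtypes under the streaming
# engine.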
def test_streaming_temporal_17669() -> None:
    df = (
        pl.LazyFrame({"a": [1, 2, 3]}, schema={"a": pl.Datetime("us")})
        .with_columns(
            b=pl.col("a").dt.date(),
            c=pl.col("a").dt.time(),
        )
        .collect(engine="streaming")
    )
    assert df.schema == {
        "a": pl.Datetime("us"),
        "b": pl.Date,
        "c": pl.Time,
    }


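# A full-frame sum over an Int128 column must reduce correctly under the
# streaming engine.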
def test_i128_sum_reduction() -> None:
    assert (
        pl.Series("a", [1, 2, 3], pl.Int128)
        .to_frame()
        .lazy()
        .sum()
        .collect(engine="streaming")
        .item()
        == 6
    )