GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_group_by.py
from __future__ import annotations

from datetime import date
from typing import TYPE_CHECKING, Any

import numpy as np
import pytest

import polars as pl
from polars.exceptions import DuplicateError
from polars.testing import assert_frame_equal
from tests.unit.conftest import INTEGER_DTYPES

if TYPE_CHECKING:
    from pathlib import Path

pytestmark = pytest.mark.xdist_group("streaming")


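# A column marked sorted whose trailing values are null must not lose the
# null group on the sorted fast path (regression test for issue 10273).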
@pytest.mark.slow
def test_streaming_group_by_sorted_fast_path_nulls_10273() -> None:
    df = pl.Series(
        name="x",
        values=(
            *(i for i in range(4) for _ in range(100)),
            *(None for _ in range(100)),
        ),
    ).to_frame()

    assert (
        df.set_sorted("x")
        .lazy()
        .group_by("x")
        .agg(pl.len())
        .collect(engine="streaming")
        .sort("x")
    ).to_dict(as_series=False) == {
        "x": [None, 0, 1, 2, 3],
        "len": [100, 100, 100, 100, 100],
    }


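# Check output dtypes and values of streaming group_by aggregations over
# string, boolean, and date columns; a duplicate output alias must raise.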
def test_streaming_group_by_types() -> None:
    df = pl.DataFrame(
        {
            "person_id": [1, 1],
            "year": [1995, 1995],
            "person_name": ["bob", "foo"],
            "bool": [True, False],
            "date": [date(2022, 1, 1), date(2022, 1, 1)],
        }
    )

    for by in ["person_id", "year", "date", ["person_id", "year"]]:
        out = (
            (
                df.lazy()
                .group_by(by)
                .agg(
                    [
                        pl.col("person_name").first().alias("str_first"),
                        pl.col("person_name").last().alias("str_last"),
                        pl.col("person_name").mean().alias("str_mean"),
                        pl.col("person_name").sum().alias("str_sum"),
                        pl.col("bool").first().alias("bool_first"),
                        pl.col("bool").last().alias("bool_last"),
                        pl.col("bool").mean().alias("bool_mean"),
                        pl.col("bool").sum().alias("bool_sum"),
                        # pl.col("date").sum().alias("date_sum"),
                        # Date streaming mean/median has been temporarily disabled
                        # pl.col("date").mean().alias("date_mean"),
                        pl.col("date").first().alias("date_first"),
                        pl.col("date").last().alias("date_last"),
                        pl.col("date").min().alias("date_min"),
                        pl.col("date").max().alias("date_max"),
                    ]
                )
            )
            .select(pl.all().exclude(by))
            .collect(engine="streaming")
        )
        assert out.schema == {
            "str_first": pl.String,
            "str_last": pl.String,
            "str_mean": pl.String,
            "str_sum": pl.String,
            "bool_first": pl.Boolean,
            "bool_last": pl.Boolean,
            "bool_mean": pl.Float64,
            "bool_sum": pl.UInt32,
            # "date_sum": pl.Date,
            # "date_mean": pl.Date,
            "date_first": pl.Date,
            "date_last": pl.Date,
            "date_min": pl.Date,
            "date_max": pl.Date,
        }

        assert out.to_dict(as_series=False) == {
            "str_first": ["bob"],
            "str_last": ["foo"],
            "str_mean": [None],
            "str_sum": [None],
            "bool_first": [True],
            "bool_last": [False],
            "bool_mean": [0.5],
            "bool_sum": [1],
            # "date_sum": [None],
            # Date streaming mean/median has been temporarily disabled
            # "date_mean": [date(2022, 1, 1)],
            "date_first": [date(2022, 1, 1)],
            "date_last": [date(2022, 1, 1)],
            "date_min": [date(2022, 1, 1)],
            "date_max": [date(2022, 1, 1)],
        }

    with pytest.raises(DuplicateError):
        (
            df.lazy()
            .group_by("person_id")
            .agg(
                [
                    pl.col("person_name").first().alias("str_first"),
                    pl.col("person_name").last().alias("str_last"),
                    pl.col("person_name").mean().alias("str_mean"),
                    pl.col("person_name").sum().alias("str_sum"),
                    pl.col("bool").first().alias("bool_first"),
                    # the duplicate output alias triggers DuplicateError
                    pl.col("bool").last().alias("bool_first"),
                ]
            )
            .select(pl.all().exclude("person_id"))
            .collect(engine="streaming")
        )


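# Basic min/max aggregations over integer groups.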
def test_streaming_group_by_min_max() -> None:
    df = pl.DataFrame(
        {
            "person_id": [1, 2, 3, 4, 5, 6],
            "year": [1995, 1995, 1995, 2, 2, 2],
        }
    )
    out = (
        df.lazy()
        .group_by("year")
        .agg([pl.min("person_id").alias("min"), pl.max("person_id").alias("max")])
        .collect()
        .sort("year")
    )
    assert out["min"].to_list() == [4, 1]
    assert out["max"].to_list() == [6, 3]


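# The streaming engine must match the in-memory engine for integer keys,
# string keys, and multiple keys.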
def test_streaming_non_streaming_gb() -> None:
    n = 100
    df = pl.DataFrame({"a": np.random.randint(0, 20, n)})
    q = df.lazy().group_by("a").agg(pl.len()).sort("a")
    assert_frame_equal(q.collect(engine="streaming"), q.collect())

    q = df.lazy().with_columns(pl.col("a").cast(pl.String))
    q = q.group_by("a").agg(pl.len()).sort("a")
    assert_frame_equal(q.collect(engine="streaming"), q.collect())
    q = df.lazy().with_columns(pl.col("a").alias("b"))
    q = q.group_by(["a", "b"]).agg(pl.len(), pl.col("a").sum().alias("sum_a")).sort("a")
    assert_frame_equal(q.collect(engine="streaming"), q.collect())


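# The sorted fast path (via set_sorted) must produce the same results as the
# generic path, with and without the streaming engine.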
def test_streaming_group_by_sorted_fast_path() -> None:
    a = np.random.randint(0, 20, 80)
    df = pl.DataFrame(
        {
            # test on Int8, which also exercises proper dtype conversions
            "a": pl.Series(np.sort(a), dtype=pl.Int8)
        }
    ).with_row_index()

    df_sorted = df.with_columns(pl.col("a").set_sorted())

    for streaming in [True, False]:
        results = []
        for df_ in [df, df_sorted]:
            out = (
                df_.lazy()
                .group_by("a")
                .agg(
                    [
                        pl.first("a").alias("first"),
                        pl.last("a").alias("last"),
                        pl.sum("a").alias("sum"),
                        pl.mean("a").alias("mean"),
                        pl.count("a").alias("count"),
                        pl.min("a").alias("min"),
                        pl.max("a").alias("max"),
                    ]
                )
                .sort("a")
                .collect(engine="streaming" if streaming else "in-memory")
            )
            results.append(out)

        assert_frame_equal(results[0], results[1])


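# Module-scoped input shared by the out-of-core (OOC) tests below.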
@pytest.fixture(scope="module")
def random_integers() -> pl.Series:
    np.random.seed(1)
    return pl.Series("a", np.random.randint(0, 10, 100), dtype=pl.Int64)


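# POLARS_FORCE_OOC makes the streaming group_by spill to POLARS_TEMP_DIR,
# exercising the out-of-core path, here with integer keys.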
@pytest.mark.write_disk
def test_streaming_group_by_ooc_q1(
    random_integers: pl.Series,
    tmp_path: Path,
    monkeypatch: Any,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    monkeypatch.setenv("POLARS_FORCE_OOC", "1")

    lf = random_integers.to_frame().lazy()
    result = (
        lf.group_by("a")
        .agg(pl.first("a").alias("a_first"), pl.last("a").alias("a_last"))
        .sort("a")
        .collect(engine="streaming")
    )

    expected = pl.DataFrame(
        {
            "a": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "a_first": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "a_last": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        }
    )
    assert_frame_equal(result, expected)


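# Same out-of-core scenario as q1, but with string keys.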
@pytest.mark.write_disk
def test_streaming_group_by_ooc_q2(
    random_integers: pl.Series,
    tmp_path: Path,
    monkeypatch: Any,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    monkeypatch.setenv("POLARS_FORCE_OOC", "1")

    lf = random_integers.cast(str).to_frame().lazy()
    result = (
        lf.group_by("a")
        .agg(pl.first("a").alias("a_first"), pl.last("a").alias("a_last"))
        .sort("a")
        .collect(engine="streaming")
    )

    expected = pl.DataFrame(
        {
            "a": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
            "a_first": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
            "a_last": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
        }
    )
    assert_frame_equal(result, expected)


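# Same out-of-core scenario as q1, but with multiple key columns.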
@pytest.mark.write_disk
def test_streaming_group_by_ooc_q3(
    random_integers: pl.Series,
    tmp_path: Path,
    monkeypatch: Any,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    monkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    monkeypatch.setenv("POLARS_FORCE_OOC", "1")

    lf = pl.LazyFrame({"a": random_integers, "b": random_integers})
    result = (
        lf.group_by("a", "b")
        .agg(pl.first("a").alias("a_first"), pl.last("a").alias("a_last"))
        .sort("a")
        .collect(engine="streaming")
    )

    expected = pl.DataFrame(
        {
            "a": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "b": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "a_first": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "a_last": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        }
    )
    assert_frame_equal(result, expected)


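# Struct columns must work as streaming group_by keys.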
def test_streaming_group_by_struct_key() -> None:
    df = pl.DataFrame(
        {"A": [1, 2, 3, 2], "B": ["google", "ms", "apple", "ms"], "C": [2, 3, 4, 3]}
    )
    df1 = df.lazy().with_columns(pl.struct(["A", "C"]).alias("tuples"))
    assert df1.group_by("tuples").agg(pl.len(), pl.col("B").first()).sort("B").collect(
        engine="streaming"
    ).to_dict(as_series=False) == {
        "tuples": [{"A": 3, "C": 4}, {"A": 1, "C": 2}, {"A": 2, "C": 3}],
        "len": [1, 1, 2],
        "B": ["apple", "google", "ms"],
    }


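# The grouped sum must be stable for every integer dtype and Boolean
# (regression test for issue 8570).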
@pytest.mark.slow
def test_streaming_group_by_all_numeric_types_stability_8570() -> None:
    m = 1000
    n = 1000

    rng = np.random.default_rng(seed=0)
    dfa = pl.DataFrame({"x": pl.arange(start=0, end=n, eager=True)})
    dfb = pl.DataFrame(
        {
            "y": rng.integers(low=0, high=10, size=m),
            "z": rng.integers(low=0, high=2, size=m),
        }
    )
    dfc = dfa.join(dfb, how="cross")

    for keys in [["x", "y"], "z"]:
        for dtype in [*INTEGER_DTYPES, pl.Boolean]:
            # the alias checks that the schema is handled correctly
            dfd = (
                dfc.lazy()
                .with_columns(pl.col("z").cast(dtype))
                .group_by(keys)
                .agg(pl.col("z").sum().alias("z_sum"))
                .collect(engine="streaming")
            )
            assert dfd["z_sum"].sum() == dfc["z"].sum()


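# Categorical keys and aggregates must survive the streaming engine.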
def test_streaming_group_by_categorical_aggregate() -> None:
    out = (
        pl.LazyFrame(
            {
                "a": pl.Series(
                    ["a", "a", "b", "b", "c", "c", None, None], dtype=pl.Categorical
                ),
                "b": pl.Series(
                    pl.date_range(
                        date(2023, 4, 28),
                        date(2023, 5, 5),
                        eager=True,
                    ).to_list(),
                    dtype=pl.Date,
                ),
            }
        )
        .group_by(["a", "b"])
        .agg([pl.col("a").first().alias("sum")])
        .collect(engine="streaming")
    )

    assert out.sort("b").to_dict(as_series=False) == {
        "a": ["a", "a", "b", "b", "c", "c", None, None],
        "b": [
            date(2023, 4, 28),
            date(2023, 4, 29),
            date(2023, 4, 30),
            date(2023, 5, 1),
            date(2023, 5, 2),
            date(2023, 5, 3),
            date(2023, 5, 4),
            date(2023, 5, 5),
        ],
        "sum": ["a", "a", "b", "b", "c", "c", None, None],
    }


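# List columns must work as streaming group_by keys (regression test for
# issue 9758).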
def test_streaming_group_by_list_9758() -> None:
    payload = {"a": [[1, 2]]}
    assert (
        pl.LazyFrame(payload)
        .group_by("a")
        .first()
        .collect(engine="streaming")
        .to_dict(as_series=False)
        == payload
    )


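# min/max over a string column must ignore nulls.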
def test_group_by_min_max_string_type() -> None:
    table = pl.from_dict({"a": [1, 1, 2, 2, 2], "b": ["a", "b", "c", "d", None]})

    expected = {"a": [1, 2], "min": ["a", "c"], "max": ["b", "d"]}

    for streaming in [True, False]:
        assert (
            table.lazy()
            .group_by("a")
            .agg([pl.min("b").alias("min"), pl.max("b").alias("max")])
            .collect(engine="streaming" if streaming else "in-memory")
            .sort("a")
            .to_dict(as_series=False)
            == expected
        )


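# Grouping by a literal yields a single group under the column name "literal".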
@pytest.mark.parametrize("literal", [True, "foo", 1])
def test_streaming_group_by_literal(literal: Any) -> None:
    df = pl.LazyFrame({"a": range(20)})

    assert df.group_by(pl.lit(literal)).agg(
        [
            pl.col("a").count().alias("a_count"),
            pl.col("a").sum().alias("a_sum"),
        ]
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "literal": [literal],
        "a_count": [20],
        "a_sum": [190],
    }


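# A literal key alongside a regular key must appear as its own key column.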
@pytest.mark.parametrize("streaming", [True, False])
def test_group_by_multiple_keys_one_literal(streaming: bool) -> None:
    df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})

    expected = {"a": [1, 2], "literal": [1, 1], "b": [5, 6]}
    assert (
        df.lazy()
        .group_by("a", pl.lit(1))
        .agg(pl.col("b").max())
        .sort(["a", "b"])
        .collect(engine="streaming" if streaming else "in-memory")
        .to_dict(as_series=False)
        == expected
    )


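# count() must exclude nulls in the streaming engine.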
def test_streaming_group_null_count() -> None:
    df = pl.DataFrame({"g": [1] * 6, "a": ["yes", None] * 3}).lazy()
    assert df.group_by("g").agg(pl.col("a").count()).collect(
        engine="streaming"
    ).to_dict(as_series=False) == {"g": [1], "a": [3]}


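# Binary keys of varying lengths must group correctly (regression test for
# issue 15116).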
def test_streaming_group_by_binary_15116() -> None:
    assert (
        pl.LazyFrame(
            {
                "str": [
                    "A",
                    "A",
                    "BB",
                    "BB",
                    "CCCC",
                    "CCCC",
                    "DDDDDDDD",
                    "DDDDDDDD",
                    "EEEEEEEEEEEEEEEE",
                    "A",
                ]
            }
        )
        .select([pl.col("str").cast(pl.Binary)])
        .group_by(["str"])
        .agg([pl.len().alias("count")])
    ).sort("str").collect(engine="streaming").to_dict(as_series=False) == {
        "str": [b"A", b"BB", b"CCCC", b"DDDDDDDD", b"EEEEEEEEEEEEEEEE"],
        "count": [3, 2, 2, 2, 1],
    }


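# Named group_by keys (b="a") must work when the input reaches the partition
# limit (regression test for issue 15380).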
def test_streaming_group_by_convert_15380(partition_limit: int) -> None:
    assert (
        pl.DataFrame({"a": [1] * partition_limit}).group_by(b="a").len()["len"].item()
        == partition_limit
    )


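# Boolean mean around the partition limit boundary (regression test for
# issue 15610).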
@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.parametrize("n_rows_limit_offset", [-1, +3])
def test_streaming_group_by_boolean_mean_15610(
    n_rows_limit_offset: int, streaming: bool, partition_limit: int
) -> None:
    n_rows = partition_limit + n_rows_limit_offset

    # Also test non-streaming because it sometimes dispatches to the streaming agg.
    expect = pl.DataFrame({"a": [False, True], "c": [0.0, 0.5]})

    n_repeats = n_rows // 3
    assert n_repeats > 0

    out = (
        pl.select(
            a=pl.repeat([True, False, True], n_repeats).explode(),
            b=pl.repeat([True, False, False], n_repeats).explode(),
        )
        .lazy()
        .group_by("a")
        .agg(c=pl.mean("b"))
        .sort("a")
        .collect(engine="streaming" if streaming else "in-memory")
    )

    assert_frame_equal(out, expect)


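# Grouping on all columns, one of which is mostly null, must keep every row
# (regression test for issue 21593).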
def test_streaming_group_by_all_null_21593() -> None:
    df = pl.DataFrame(
        {
            "col_1": ["A", "B", "C", "D"],
            "col_2": ["test", None, None, None],
        }
    )

    out = df.lazy().group_by(pl.all()).min().collect(engine="streaming")
    assert_frame_equal(df, out, check_row_order=False)