GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/streaming/test_streaming_group_by.py

from __future__ import annotations

from datetime import date
from typing import TYPE_CHECKING, Any

import numpy as np
import pytest

import polars as pl
from polars.exceptions import DuplicateError
from polars.testing import assert_frame_equal
from tests.unit.conftest import INTEGER_DTYPES

if TYPE_CHECKING:
    from pathlib import Path

    from tests.conftest import PlMonkeyPatch

pytestmark = pytest.mark.xdist_group("streaming")


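# Regression test for #10273: the sorted fast path must keep the null group
# when streaming a group-by over a column flagged as sorted.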
@pytest.mark.slow
def test_streaming_group_by_sorted_fast_path_nulls_10273() -> None:
    df = pl.Series(
        name="x",
        values=(
            *(i for i in range(4) for _ in range(100)),
            *(None for _ in range(100)),
        ),
    ).to_frame()

    assert (
        df.set_sorted("x")
        .lazy()
        .group_by("x")
        .agg(pl.len())
        .collect(engine="streaming")
        .sort("x")
    ).to_dict(as_series=False) == {
        "x": [None, 0, 1, 2, 3],
        "len": [100, 100, 100, 100, 100],
    }


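# Check output dtypes and values for str/bool/date aggregations across several
# key choices; duplicate output names must raise DuplicateError.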
def test_streaming_group_by_types() -> None:
    df = pl.DataFrame(
        {
            "person_id": [1, 1],
            "year": [1995, 1995],
            "person_name": ["bob", "foo"],
            "bool": [True, False],
            "date": [date(2022, 1, 1), date(2022, 1, 1)],
        }
    )

    for by in ["person_id", "year", "date", ["person_id", "year"]]:
        out = (
            (
                df.lazy()
                .group_by(by)
                .agg(
                    [
                        pl.col("person_name").first().alias("str_first"),
                        pl.col("person_name").last().alias("str_last"),
                        pl.col("bool").first().alias("bool_first"),
                        pl.col("bool").last().alias("bool_last"),
                        pl.col("bool").mean().alias("bool_mean"),
                        pl.col("bool").sum().alias("bool_sum"),
                        # pl.col("date").sum().alias("date_sum"),
                        # Date streaming mean/median have been temporarily disabled
                        # pl.col("date").mean().alias("date_mean"),
                        pl.col("date").first().alias("date_first"),
                        pl.col("date").last().alias("date_last"),
                        pl.col("date").min().alias("date_min"),
                        pl.col("date").max().alias("date_max"),
                    ]
                )
            )
            .select(pl.all().exclude(by))
            .collect(engine="streaming")
        )
        assert out.schema == {
            "str_first": pl.String,
            "str_last": pl.String,
            "bool_first": pl.Boolean,
            "bool_last": pl.Boolean,
            "bool_mean": pl.Float64,
            "bool_sum": pl.get_index_type(),
            # "date_sum": pl.Date,
            # "date_mean": pl.Date,
            "date_first": pl.Date,
            "date_last": pl.Date,
            "date_min": pl.Date,
            "date_max": pl.Date,
        }

        assert out.to_dict(as_series=False) == {
            "str_first": ["bob"],
            "str_last": ["foo"],
            "bool_first": [True],
            "bool_last": [False],
            "bool_mean": [0.5],
            "bool_sum": [1],
            # "date_sum": [None],
            # Date streaming mean/median have been temporarily disabled
            # "date_mean": [date(2022, 1, 1)],
            "date_first": [date(2022, 1, 1)],
            "date_last": [date(2022, 1, 1)],
            "date_min": [date(2022, 1, 1)],
            "date_max": [date(2022, 1, 1)],
        }

    with pytest.raises(DuplicateError):
        (
            df.lazy()
            .group_by("person_id")
            .agg(
                [
                    pl.col("person_name").first().alias("str_first"),
                    pl.col("person_name").last().alias("str_last"),
                    pl.col("person_name").mean().alias("str_mean"),
                    pl.col("bool").first().alias("bool_first"),
                    pl.col("bool").last().alias("bool_first"),
                ]
            )
            .select(pl.all().exclude("person_id"))
            .collect(engine="streaming")
        )


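# Basic per-group min/max on integer data.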
def test_streaming_group_by_min_max() -> None:
    df = pl.DataFrame(
        {
            "person_id": [1, 2, 3, 4, 5, 6],
            "year": [1995, 1995, 1995, 2, 2, 2],
        }
    )
    out = (
        df.lazy()
        .group_by("year")
        .agg([pl.min("person_id").alias("min"), pl.max("person_id").alias("max")])
        .collect()
        .sort("year")
    )
    assert out["min"].to_list() == [4, 1]
    assert out["max"].to_list() == [6, 3]


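# The streaming and in-memory engines must agree, including after a dtype cast
# and with a duplicated key column.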
def test_streaming_non_streaming_gb() -> None:
    n = 100
    df = pl.DataFrame({"a": np.random.randint(0, 20, n)})
    q = df.lazy().group_by("a").agg(pl.len()).sort("a")
    assert_frame_equal(q.collect(engine="streaming"), q.collect())

    q = df.lazy().with_columns(pl.col("a").cast(pl.String))
    q = q.group_by("a").agg(pl.len()).sort("a")
    assert_frame_equal(q.collect(engine="streaming"), q.collect())
    q = df.lazy().with_columns(pl.col("a").alias("b"))
    q = q.group_by(["a", "b"]).agg(pl.len(), pl.col("a").sum().alias("sum_a")).sort("a")
    assert_frame_equal(q.collect(engine="streaming"), q.collect())


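# Sorted fast path vs. generic hash path: both engines, with and without the
# sorted flag set, must produce identical aggregates.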
def test_streaming_group_by_sorted_fast_path() -> None:
    a = np.random.randint(0, 20, 80)
    df = pl.DataFrame(
        {
            # test on int8 as that also tests proper conversions
            "a": pl.Series(np.sort(a), dtype=pl.Int8)
        }
    ).with_row_index()

    df_sorted = df.with_columns(pl.col("a").set_sorted())

    for streaming in [True, False]:
        results = []
        for df_ in [df, df_sorted]:
            out = (
                df_.lazy()
                .group_by("a")
                .agg(
                    [
                        pl.first("a").alias("first"),
                        pl.last("a").alias("last"),
                        pl.sum("a").alias("sum"),
                        pl.mean("a").alias("mean"),
                        pl.count("a").alias("count"),
                        pl.min("a").alias("min"),
                        pl.max("a").alias("max"),
                    ]
                )
                .sort("a")
                .collect(engine="streaming" if streaming else "in-memory")
            )
            results.append(out)

        assert_frame_equal(results[0], results[1])


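# Module-scoped input shared by the out-of-core (OOC) group-by tests below.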
@pytest.fixture(scope="module")
def random_integers() -> pl.Series:
    np.random.seed(1)
    return pl.Series("a", np.random.randint(0, 10, 100), dtype=pl.Int64)


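# Integer keys: route spill files to tmp_path and force out-of-core
# processing, then verify the grouped result is still correct.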
@pytest.mark.write_disk
def test_streaming_group_by_ooc_q1(
    random_integers: pl.Series,
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    plmonkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    plmonkeypatch.setenv("POLARS_FORCE_OOC", "1")

    lf = random_integers.to_frame().lazy()
    result = (
        lf.group_by("a")
        .agg(pl.first("a").alias("a_first"), pl.last("a").alias("a_last"))
        .sort("a")
        .collect(engine="streaming")
    )

    expected = pl.DataFrame(
        {
            "a": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "a_first": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "a_last": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        }
    )
    assert_frame_equal(result, expected)


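# As q1, but with string keys.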
@pytest.mark.write_disk
def test_streaming_group_by_ooc_q2(
    random_integers: pl.Series,
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    plmonkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    plmonkeypatch.setenv("POLARS_FORCE_OOC", "1")

    lf = random_integers.cast(str).to_frame().lazy()
    result = (
        lf.group_by("a")
        .agg(pl.first("a").alias("a_first"), pl.last("a").alias("a_last"))
        .sort("a")
        .collect(engine="streaming")
    )

    expected = pl.DataFrame(
        {
            "a": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
            "a_first": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
            "a_last": ["0", "1", "2", "3", "4", "5", "6", "7", "8", "9"],
        }
    )
    assert_frame_equal(result, expected)


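# As q1, but grouping on two key columns.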
@pytest.mark.write_disk
def test_streaming_group_by_ooc_q3(
    random_integers: pl.Series,
    tmp_path: Path,
    plmonkeypatch: PlMonkeyPatch,
) -> None:
    tmp_path.mkdir(exist_ok=True)
    plmonkeypatch.setenv("POLARS_TEMP_DIR", str(tmp_path))
    plmonkeypatch.setenv("POLARS_FORCE_OOC", "1")

    lf = pl.LazyFrame({"a": random_integers, "b": random_integers})
    result = (
        lf.group_by("a", "b")
        .agg(pl.first("a").alias("a_first"), pl.last("a").alias("a_last"))
        .sort("a")
        .collect(engine="streaming")
    )

    expected = pl.DataFrame(
        {
            "a": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "b": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "a_first": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
            "a_last": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        }
    )
    assert_frame_equal(result, expected)


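# Struct columns must work as streaming group-by keys.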
def test_streaming_group_by_struct_key() -> None:
    df = pl.DataFrame(
        {"A": [1, 2, 3, 2], "B": ["google", "ms", "apple", "ms"], "C": [2, 3, 4, 3]}
    )
    df1 = df.lazy().with_columns(pl.struct(["A", "C"]).alias("tuples"))
    assert df1.group_by("tuples").agg(pl.len(), pl.col("B").first()).sort("B").collect(
        engine="streaming"
    ).to_dict(as_series=False) == {
        "tuples": [{"A": 3, "C": 4}, {"A": 1, "C": 2}, {"A": 2, "C": 3}],
        "len": [1, 1, 2],
        "B": ["apple", "google", "ms"],
    }


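# Regression test for #8570: group-by sums must stay correct for every integer
# dtype (and Boolean) in the streaming engine.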
@pytest.mark.slow
def test_streaming_group_by_all_numeric_types_stability_8570() -> None:
    m = 1000
    n = 1000

    rng = np.random.default_rng(seed=0)
    dfa = pl.DataFrame({"x": pl.arange(start=0, end=n, eager=True)})
    dfb = pl.DataFrame(
        {
            "y": rng.integers(low=0, high=10, size=m),
            "z": rng.integers(low=0, high=2, size=m),
        }
    )
    dfc = dfa.join(dfb, how="cross")

    for keys in [["x", "y"], "z"]:
        for dtype in [*INTEGER_DTYPES, pl.Boolean]:
            # the alias checks if the schema is correctly handled
            dfd = (
                dfc.lazy()
                .with_columns(pl.col("z").cast(dtype))
                .group_by(keys)
                .agg(pl.col("z").sum().alias("z_sum"))
                .collect(engine="streaming")
            )
            assert dfd["z_sum"].sum() == dfc["z"].sum()


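# Categorical values must survive a streaming group-by round trip.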
def test_streaming_group_by_categorical_aggregate() -> None:
    out = (
        pl.LazyFrame(
            {
                "a": pl.Series(
                    ["a", "a", "b", "b", "c", "c", None, None], dtype=pl.Categorical
                ),
                "b": pl.Series(
                    pl.date_range(
                        date(2023, 4, 28),
                        date(2023, 5, 5),
                        eager=True,
                    ).to_list(),
                    dtype=pl.Date,
                ),
            }
        )
        .group_by(["a", "b"])
        .agg([pl.col("a").first().alias("sum")])
        .collect(engine="streaming")
    )

    assert out.sort("b").to_dict(as_series=False) == {
        "a": ["a", "a", "b", "b", "c", "c", None, None],
        "b": [
            date(2023, 4, 28),
            date(2023, 4, 29),
            date(2023, 4, 30),
            date(2023, 5, 1),
            date(2023, 5, 2),
            date(2023, 5, 3),
            date(2023, 5, 4),
            date(2023, 5, 5),
        ],
        "sum": ["a", "a", "b", "b", "c", "c", None, None],
    }


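# Regression test for #9758: list columns as group-by keys.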
def test_streaming_group_by_list_9758() -> None:
    payload = {"a": [[1, 2]]}
    assert (
        pl.LazyFrame(payload)
        .group_by("a")
        .first()
        .collect(engine="streaming")
        .to_dict(as_series=False)
        == payload
    )


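# min/max on strings must skip nulls, in both engines.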
def test_group_by_min_max_string_type() -> None:
    table = pl.from_dict({"a": [1, 1, 2, 2, 2], "b": ["a", "b", "c", "d", None]})

    expected = {"a": [1, 2], "min": ["a", "c"], "max": ["b", "d"]}

    for streaming in [True, False]:
        assert (
            table.lazy()
            .group_by("a")
            .agg([pl.min("b").alias("min"), pl.max("b").alias("max")])
            .collect(engine="streaming" if streaming else "in-memory")
            .sort("a")
            .to_dict(as_series=False)
            == expected
        )


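# Grouping by a literal collapses the frame into a single group.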
@pytest.mark.parametrize("literal", [True, "foo", 1])
def test_streaming_group_by_literal(literal: Any) -> None:
    df = pl.LazyFrame({"a": range(20)})

    assert df.group_by(pl.lit(literal)).agg(
        [
            pl.col("a").count().alias("a_count"),
            pl.col("a").sum().alias("a_sum"),
        ]
    ).collect(engine="streaming").to_dict(as_series=False) == {
        "literal": [literal],
        "a_count": [20],
        "a_sum": [190],
    }


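# A literal key alongside a real key column must be kept in the output.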
@pytest.mark.parametrize("streaming", [True, False])
def test_group_by_multiple_keys_one_literal(streaming: bool) -> None:
    df = pl.DataFrame({"a": [1, 1, 2], "b": [4, 5, 6]})

    expected = {"a": [1, 2], "literal": [1, 1], "b": [5, 6]}
    assert (
        df.lazy()
        .group_by("a", pl.lit(1))
        .agg(pl.col("b").max())
        .sort(["a", "b"])
        .collect(engine="streaming" if streaming else "in-memory")
        .to_dict(as_series=False)
        == expected
    )


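# count() must exclude nulls in a streaming aggregation.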
def test_streaming_group_null_count() -> None:
    df = pl.DataFrame({"g": [1] * 6, "a": ["yes", None] * 3}).lazy()
    assert df.group_by("g").agg(pl.col("a").count()).collect(
        engine="streaming"
    ).to_dict(as_series=False) == {"g": [1], "a": [3]}


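# Regression test for #15116: binary keys of varying lengths in the
# streaming engine.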
def test_streaming_group_by_binary_15116() -> None:
    assert (
        pl.LazyFrame(
            {
                "str": [
                    "A",
                    "A",
                    "BB",
                    "BB",
                    "CCCC",
                    "CCCC",
                    "DDDDDDDD",
                    "DDDDDDDD",
                    "EEEEEEEEEEEEEEEE",
                    "A",
                ]
            }
        )
        .select([pl.col("str").cast(pl.Binary)])
        .group_by(["str"])
        .agg([pl.len().alias("count")])
    ).sort("str").collect(engine="streaming").to_dict(as_series=False) == {
        "str": [b"A", b"BB", b"CCCC", b"DDDDDDDD", b"EEEEEEEEEEEEEEEE"],
        "count": [3, 2, 2, 2, 1],
    }


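# Regression test for #15380: group_by(b="a") (a renamed key) with input
# exactly at the partition limit.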
def test_streaming_group_by_convert_15380(partition_limit: int) -> None:
    assert (
        pl.DataFrame({"a": [1] * partition_limit}).group_by(b="a").len()["len"].item()
        == partition_limit
    )


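# Regression test for #15610: mean of a Boolean column, with row counts just
# below and above the partition limit.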
@pytest.mark.parametrize("streaming", [True, False])
@pytest.mark.parametrize("n_rows_limit_offset", [-1, +3])
def test_streaming_group_by_boolean_mean_15610(
    n_rows_limit_offset: int, streaming: bool, partition_limit: int
) -> None:
    n_rows = partition_limit + n_rows_limit_offset

    # Also test non-streaming, because it is sometimes dispatched to the
    # streaming agg.
    expect = pl.DataFrame({"a": [False, True], "c": [0.0, 0.5]})

    n_repeats = n_rows // 3
    assert n_repeats > 0

    out = (
        pl.select(
            a=pl.repeat([True, False, True], n_repeats).explode(),
            b=pl.repeat([True, False, False], n_repeats).explode(),
        )
        .lazy()
        .group_by("a")
        .agg(c=pl.mean("b"))
        .sort("a")
        .collect(engine="streaming" if streaming else "in-memory")
    )

    assert_frame_equal(out, expect)


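# Regression test for #21593: rows containing nulls must not be dropped when
# grouping by all columns.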
def test_streaming_group_by_all_null_21593() -> None:
    df = pl.DataFrame(
        {
            "col_1": ["A", "B", "C", "D"],
            "col_2": ["test", None, None, None],
        }
    )

    out = df.lazy().group_by(pl.all()).min().collect(engine="streaming")
    assert_frame_equal(df, out, check_row_order=False)