Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/operations/map/test_map_groups.py
6940 views
1
from __future__ import annotations
2
3
from typing import TYPE_CHECKING, Any
4
5
import numpy as np
6
import pytest
7
8
import polars as pl
9
from polars.exceptions import ComputeError
10
from polars.testing import assert_frame_equal
11
12
if TYPE_CHECKING:
13
from collections.abc import Sequence
14
15
16
def test_map_groups() -> None:
    """Sum column ``c`` within each group of ``a`` via ``GroupBy.map_groups``."""
    frame = pl.DataFrame(
        {
            "a": ["a", "b", "a", "b", "b", "c"],
            "b": [1, 2, 3, 4, 5, 6],
            "c": [6, 5, 4, 3, 2, 1],
        }
    )

    def sum_of_c(group: pl.DataFrame) -> pl.DataFrame:
        # Keep only column "c" of the group and reduce it to its sum.
        return group[["c"]].sum()

    grouped = frame.group_by("a")
    result = grouped.map_groups(sum_of_c)

    # group_by is called without maintain_order, so rows are compared
    # irrespective of their order.
    expected = pl.DataFrame({"c": [10, 10, 1]})
    assert_frame_equal(result, expected, check_row_order=False)
29
30
31
def test_map_groups_lazy() -> None:
    """Double each group of a LazyFrame, passing the output schema explicitly."""
    lazy_input = pl.LazyFrame({"a": [1, 1, 3], "b": [1.0, 2.0, 3.0]})
    output_schema = {"a": pl.Float64, "b": pl.Float64}

    def double(group: pl.DataFrame) -> pl.DataFrame:
        return group * 2.0

    result = lazy_input.group_by("a").map_groups(double, schema=output_schema)
    expected = pl.LazyFrame({"a": [6.0, 2.0, 2.0], "b": [6.0, 2.0, 4.0]})

    # Row order is not compared; the schemas must match exactly.
    assert_frame_equal(result, expected, check_row_order=False)
    assert result.collect_schema() == expected.collect_schema()
40
41
42
def test_map_groups_rolling() -> None:
    """Apply a min/max function per window via ``rolling(...).map_groups``."""
    source = pl.DataFrame(
        {
            "a": [1, 2, 3, 4, 5],
            "b": [1, 2, 3, 4, 5],
        }
    ).set_sorted("a")

    def min_a_max_b(window: pl.DataFrame) -> pl.DataFrame:
        # One output row per window: smallest "a" and largest "b".
        smallest_a = pl.col("a").min()
        largest_b = pl.col("b").max()
        return window.select(smallest_a, largest_b)

    windows = source.rolling("a", period="2i")
    result = windows.map_groups(min_a_max_b, schema=source.schema)

    expected_a = pl.Series("a", [1, 1, 2, 3, 4], dtype=pl.Int64)
    expected_b = pl.Series("b", [1, 2, 3, 4, 5], dtype=pl.Int64)
    expected = pl.DataFrame([expected_a, expected_b])
    assert_frame_equal(result, expected)
65
66
67
def test_map_groups_empty() -> None:
    """``map_groups`` on an empty DataFrame is expected to raise ``ComputeError``."""
    empty = pl.DataFrame(schema={"x": pl.Int64})
    expected_message = r"cannot group_by \+ apply on empty 'DataFrame'"

    def identity(group: pl.DataFrame) -> pl.DataFrame:
        return group

    with pytest.raises(ComputeError, match=expected_message):
        empty.group_by("x").map_groups(identity)
73
74
75
def test_map_groups_none() -> None:
    """Cover ``pl.map_groups`` in ``agg``, incl. a UDF that returns ``None``."""
    df = pl.DataFrame(
        {
            "g": [1, 1, 1, 2, 2, 2, 5],
            "a": [2, 4, 5, 190, 1, 4, 1],
            "b": [1, 3, 2, 1, 43, 3, 1],
        }
    )

    # --- Non-scalar UDF over three inputs: a, b**4 and a/4 ---------------
    def combine(x: Sequence[pl.Series]) -> pl.Series:
        return x[0] * x[1] + x[2].sum()

    combined_expr = pl.map_groups(
        function=combine,
        exprs=["a", pl.col("b") ** 4, pl.col("a") / 4],
        returns_scalar=False,
        return_dtype=pl.Float64,
    ).alias("multiple")
    multiple = df.group_by("g", maintain_order=True).agg(combined_expr)["multiple"]

    assert out_equals(multiple[0], [4.75, 326.75, 82.75]) if False else True
    assert multiple[0].to_list() == [4.75, 326.75, 82.75]
    assert multiple[1].to_list() == [238.75, 3418849.75, 372.75]

    # --- map_batches over two columns: element-wise product --------------
    def multiply(s: Sequence[pl.Series]) -> pl.Series:
        return s[0] * s[1]

    product_expr = pl.map_batches(exprs=["a", "b"], function=multiply)
    out_df = df.select(product_expr)
    expected_product = (df["a"] * df["b"]).to_list()
    assert out_df["a"].to_list() == expected_product

    # --- check if we can return None -------------------------------------
    def implode_unless_190(s: Sequence[pl.Series]) -> pl.Series | None:
        # Group g == 2 starts with a == 190, so it yields None.
        if s[0][0] == 190:
            return None
        return s[0].implode()

    nullable_expr = pl.map_groups(
        function=implode_unless_190,
        exprs=["a", pl.col("b") ** 4, pl.col("a") / 4],
        returns_scalar=True,
        return_dtype=pl.self_dtype().wrap_in_list(),
    ).alias("multiple")
    with_none = df.group_by("g", maintain_order=True).agg(nullable_expr)["multiple"]

    assert with_none[1] is None
118
119
120
def test_map_groups_object_output() -> None:
    """A UDF returning an arbitrary Python object yields a ``pl.Object`` column."""
    frame = pl.DataFrame(
        {
            "names": ["foo", "ham", "spam", "cheese", "egg", "foo"],
            "dates": ["1", "1", "2", "3", "3", "4"],
            "groups": ["A", "A", "B", "B", "B", "C"],
        }
    )

    class Holder:
        """Plain Python wrapper used as the per-group return value."""

        def __init__(self, payload: Any) -> None:
            self.payload = payload

    def dates_to_names(s: Sequence[pl.Series]) -> Holder:
        # s[0] holds the group's dates, s[1] the group's names.
        mapping = dict(zip(s[0], s[1]))
        return Holder(mapping)

    object_expr = pl.map_groups(
        [pl.col("dates"), pl.col("names")],
        dates_to_names,
        return_dtype=pl.Object,
        returns_scalar=True,
    )
    result = frame.group_by("groups").agg(object_expr)

    # Only the dtypes are checked: the key column and the object column.
    assert result.dtypes == [pl.String, pl.Object]
143
144
145
def test_map_groups_numpy_output_3057() -> None:
    """A UDF returning the result of ``np.mean`` is accepted as a scalar."""
    frame = pl.DataFrame(
        {
            "id": [0, 0, 0, 1, 1, 1],
            "t": [2.0, 4.3, 5, 10, 11, 14],
            "y": [0.0, 1, 1.3, 2, 3, 4],
        }
    )

    def mean_of_both(lst: Sequence[pl.Series]) -> Any:
        # Mean over the group's "y" and "t" values taken together.
        return np.mean([lst[0], lst[1]])

    mean_expr = pl.map_groups(
        ["y", "t"],
        mean_of_both,
        return_dtype=pl.self_dtype(),
        returns_scalar=True,
    ).alias("result")
    result = frame.group_by("id", maintain_order=True).agg(mean_expr)

    expected = pl.DataFrame({"id": [0, 1], "result": [2.266666, 7.333333]})
    assert_frame_equal(result, expected)
165
166
167
def test_map_groups_return_all_null_15260() -> None:
    """A UDF over an all-null column returns one null per group."""

    def first_value(x: Sequence[pl.Series]) -> pl.Series:
        # Single-element series holding the group's first value, same dtype.
        column = x[0]
        return pl.Series([column[0]], dtype=column.dtype)

    frame = pl.DataFrame({"key": [0, 0, 1], "a": [None, None, None]})
    first_expr = pl.map_groups(
        exprs=["a"],
        function=first_value,
        return_dtype=pl.self_dtype(),
        returns_scalar=True,
    )
    # Sort by key so the comparison does not depend on group order.
    result = frame.group_by("key").agg(first_expr).sort("key")

    expected = pl.DataFrame({"key": [0, 1], "a": [None, None]})
    assert_frame_equal(result, expected)
185
186