Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py
6939 views
1
from __future__ import annotations
2
3
from typing import Any, Callable, cast
4
5
import numpy as np
6
import pytest
7
8
import polars as pl
9
from polars.testing import assert_frame_equal, assert_series_equal
10
11
12
def test_ufunc() -> None:
    """Apply ``np.power`` to a UInt8 column expression in an eager ``select``.

    Exercises an integer exponent, a float exponent and an explicit
    ``dtype=`` override, then compares both values and column dtypes
    against a hand-built expected frame.
    """
    col_a = pl.col("a")
    frame = pl.DataFrame([pl.Series("a", [1, 2, 3, 4], dtype=pl.UInt8)])

    as_uint8 = np.power(col_a, 2)  # type: ignore[call-overload]
    as_float64 = np.power(col_a, 2.0)  # type: ignore[call-overload]
    as_uint16 = np.power(col_a, 2, dtype=np.uint16)  # type: ignore[call-overload]
    result = frame.select(
        as_uint8.alias("power_uint8"),
        as_float64.alias("power_float64"),
        as_uint16.alias("power_uint16"),
    )

    squares = [1, 4, 9, 16]
    expected = pl.DataFrame(
        [
            pl.Series("power_uint8", squares, dtype=pl.UInt8),
            pl.Series("power_float64", [float(v) for v in squares], dtype=pl.Float64),
            pl.Series("power_uint16", squares, dtype=pl.UInt16),
        ]
    )

    assert_frame_equal(result, expected)
    assert result.dtypes == expected.dtypes
28
29
30
def test_ufunc_expr_not_first() -> None:
    """Ufuncs and operators accept an expression that is not the first operand."""
    exponents = (1, 2, 3)
    # Cast to Any so the type checker accepts an Expr as a numpy operand.
    col_a = cast(Any, pl.col("a"))
    frame = pl.DataFrame([pl.Series("a", list(exponents), dtype=pl.Float64)])

    result = frame.select(
        np.power(2.0, col_a).alias("power"),
        (2.0 / col_a).alias("divide_scalar"),
    )

    expected = pl.DataFrame(
        [
            pl.Series("power", [2**n for n in exponents], dtype=pl.Float64),
            pl.Series("divide_scalar", [2 / n for n in exponents], dtype=pl.Float64),
        ]
    )
    assert_frame_equal(result, expected)
44
45
46
def test_lazy_ufunc() -> None:
    """Apply ``np.power`` to a UInt8 column expression in a lazy ``select``.

    Same three variants as the eager test (integer exponent, float exponent,
    explicit ``dtype=``); the query is collected before comparison.
    """
    # Cast to Any so the type checker accepts an Expr as a numpy operand.
    col_a = cast(Any, pl.col("a"))
    lazy = pl.LazyFrame([pl.Series("a", [1, 2, 3, 4], dtype=pl.UInt8)])

    query = lazy.select(
        np.power(col_a, 2).alias("power_uint8"),
        np.power(col_a, 2.0).alias("power_float64"),
        np.power(col_a, 2, dtype=np.uint16).alias("power_uint16"),
    )

    squares = [1, 4, 9, 16]
    expected = pl.DataFrame(
        [
            pl.Series("power_uint8", squares, dtype=pl.UInt8),
            pl.Series("power_float64", [float(v) for v in squares], dtype=pl.Float64),
            pl.Series("power_uint16", squares, dtype=pl.UInt16),
        ]
    )
    assert_frame_equal(query.collect(), expected)
61
62
63
def test_lazy_ufunc_expr_not_first() -> None:
    """Lazy variant: an expression may be the second operand of a ufunc or operator."""
    exponents = (1, 2, 3)
    # Cast to Any so the type checker accepts an Expr as a numpy operand.
    col_a = cast(Any, pl.col("a"))
    lazy = pl.LazyFrame([pl.Series("a", list(exponents), dtype=pl.Float64)])

    query = lazy.select(
        np.power(2.0, col_a).alias("power"),
        (2.0 / col_a).alias("divide_scalar"),
    )

    expected = pl.DataFrame(
        [
            pl.Series("power", [2**n for n in exponents], dtype=pl.Float64),
            pl.Series("divide_scalar", [2 / n for n in exponents], dtype=pl.Float64),
        ]
    )
    assert_frame_equal(query.collect(), expected)
77
78
79
def test_ufunc_recognition() -> None:
    """``np.exp`` on a column expression gives the same frame as ``Expr.exp``."""
    frame = pl.DataFrame({"a": [1, 1, 2, 2], "b": [1.1, 2.2, 3.3, 4.4]})
    via_numpy = frame.select(np.exp(pl.col("b")))
    via_polars = frame.select(pl.col("b").exp())
    assert_frame_equal(via_numpy, via_polars)
82
83
84
# https://github.com/pola-rs/polars/issues/6770
85
def test_ufunc_multiple_expressions() -> None:
    """A two-input ufunc on two column expressions matches the Series-level call.

    ``np.arctan2`` is evaluated once directly on the two Series and once
    through ``select`` with column expressions; both results must agree.
    """
    v_values = [
        -4.293,
        -2.4659,
        -1.8378,
        -0.2821,
        -4.5649,
        -3.8128,
        -7.4274,
        3.3443,
        3.8604,
        -4.2200,
    ]
    u_values = [
        -11.2268,
        6.3478,
        7.1681,
        3.4986,
        2.7320,
        -1.0695,
        -10.1408,
        11.2327,
        6.6623,
        -8.1412,
    ]
    frame = pl.DataFrame({"v": v_values, "u": u_values})

    expected = np.arctan2(frame.get_column("v"), frame.get_column("u"))
    selected = frame.select(np.arctan2(pl.col("v"), pl.col("u")))  # type: ignore[call-overload]
    # First (and only) column of the selected frame, as a Series.
    result = selected[:, 0]

    assert_series_equal(expected, result)  # type: ignore[arg-type]
117
118
119
def test_repeated_name_ufunc_17472() -> None:
    """A multi-input ufunc works when the same column name is passed twice."""
    frame = pl.DataFrame({"a": [6.0]})
    ratio = np.divide(pl.col("a"), pl.col("a"))  # type: ignore[call-overload]
    result = frame.select(ratio)
    expected = pl.DataFrame({"a": [1.0]})
    assert_frame_equal(expected, result)
125
126
127
def test_grouped_ufunc() -> None:
    """Smoke test: pipe an aggregated expression through ``np.expm1`` in ``agg``.

    Nothing is asserted about the output; the test passes as long as the
    grouped aggregation runs without raising.
    """
    frame = pl.DataFrame(
        {
            "id": ["a", "a", "b", "b"],
            "values": [0.1, 0.1, -0.1, -0.1],
        }
    )
    summed_log1p = pl.col("values").log1p().sum()
    frame.group_by("id").agg(summed_log1p.pipe(np.expm1))
130
131
132
def test_generalized_ufunc_scalar() -> None:
    """A numba gufunc reducing ``(n)->()`` works directly and via ``map_batches``.

    The gufunc is checked on a NumPy array, on a polars Series, inside
    ``select`` (with and without ``returns_scalar``), and inside a
    ``group_by().agg()``. Skipped when numba cannot be imported.
    """
    numba = pytest.importorskip("numba")

    @numba.guvectorize([(numba.int64[:], numba.int64[:])], "(n)->()")  # type: ignore[misc]
    def my_custom_sum(arr, result) -> None:  # type: ignore[no-untyped-def]  # noqa: ANN001
        acc = 0
        for item in arr:
            acc += item
        result[0] = acc

    # Make type checkers happy:
    custom_sum = cast(Callable[[object], object], my_custom_sum)

    def sum_as_series(s: pl.Series) -> pl.Series:
        # Wrap the scalar result in a one-element Series.
        return pl.Series([custom_sum(s)])

    values = pl.col("values")
    scalar_sum = values.map_batches(
        custom_sum, returns_scalar=True, return_dtype=pl.self_dtype()
    )
    series_sum = values.map_batches(
        sum_as_series,
        returns_scalar=False,
        return_dtype=pl.self_dtype(),
    )

    # Demonstrate NumPy as the canonical expected behavior:
    assert custom_sum(np.array([10, 2, 3], dtype=np.int64)) == 15

    # Direct call of the gufunc:
    df = pl.DataFrame({"values": [10, 2, 3]})
    assert custom_sum(df.get_column("values")) == 15

    # Indirect call of the gufunc:
    indirect = df.select(scalar_sum)
    assert_frame_equal(indirect, pl.DataFrame({"values": 15}))
    indirect = df.select(series_sum)
    assert_frame_equal(indirect, pl.DataFrame({"values": [15]}))

    # group_by()
    df = pl.DataFrame({"labels": ["a", "b", "a", "b"], "values": [10, 2, 3, 30]})
    grouped = df.group_by("labels").agg(scalar_sum)
    # Sort so the comparison does not depend on group order.
    indirect = grouped.sort("labels")
    assert_frame_equal(
        indirect, pl.DataFrame({"labels": ["a", "b"], "values": [13, 32]})
    )
182
183
184
def make_gufunc_mean() -> Callable[[pl.Series], pl.Series]:
    """Build a numba ``(n)->(n)`` gufunc that writes ``mean(arr) + i`` at index ``i``.

    Skips the calling test when numba cannot be imported.
    """
    numba = pytest.importorskip("numba")
    float64_1d = numba.float64[:]

    @numba.guvectorize([(float64_1d, float64_1d)], "(n)->(n)")  # type: ignore[misc]
    def gufunc_mean(arr: Any, result: Any) -> None:
        center = arr.mean()
        for idx in range(len(arr)):
            result[idx] = center + idx

    return gufunc_mean  # type: ignore[no-any-return]
194
195
196
def test_generalized_ufunc() -> None:
    """An ``(n)->(n)`` gufunc applied through ``map_batches`` in ``select``.

    The column mean is 2.0, so the gufunc yields ``2.0 + i`` per position.
    """
    mean_plus_index = make_gufunc_mean()
    frame = pl.DataFrame({"s": [1.0, 2.0, 3.0]})
    mapped = pl.col("s").map_batches(mean_plus_index).alias("result")
    result = frame.select([mapped])
    expected = pl.DataFrame({"result": [2.0, 3.0, 4.0]})
    assert_frame_equal(result, expected)
202
203
204
def test_grouped_generalized_ufunc() -> None:
    """An ``(n)->(n)`` gufunc applied per group through ``map_batches`` in ``agg``.

    Each group's values are replaced by ``group_mean + i``, collected as a
    list per group.
    """
    mean_plus_index = make_gufunc_mean()
    frame = pl.DataFrame(
        {
            "id": ["a", "a", "b", "b"],
            "values": [1.0, 2.0, 3.0, 4.0],
        }
    )
    mapped = pl.col("values").map_batches(
        mean_plus_index, return_dtype=pl.self_dtype()
    )
    # Sort so the comparison does not depend on group order.
    result = frame.group_by("id").agg(mapped).sort("id")
    expected = pl.DataFrame({"id": ["a", "b"], "values": [[1.5, 2.5], [3.5, 4.5]]})
    assert_frame_equal(result, expected)
214
215
216
def test_ufunc_chain() -> None:
    """Chaining ``.mean()`` after ``np.log`` matches the native ``.log().mean()``.

    Both aggregations run inside the same rolling window over ``counter``.
    """
    frame = pl.DataFrame(
        data={
            "A": [2, 10, 11, 12, 3, 10, 11, 12],
            "counter": list(range(1, 9)),
        }
    )
    mean_numpy = (np.log(pl.col("A"))).mean().alias("mean_numpy")
    mean_polars = (pl.col("A")).log().mean().alias("mean_polars")
    result = frame.rolling(index_column="counter", period="2i").agg(
        mean_numpy,
        mean_polars,
    )
    # Rename the native result so the Series names match in the comparison.
    assert_series_equal(result["mean_numpy"], result["mean_polars"].alias("mean_numpy"))
225
226