Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py
8406 views
1
from __future__ import annotations
2
3
from typing import TYPE_CHECKING, Any, cast
4
5
import numpy as np
6
import pytest
7
8
import polars as pl
9
from polars.testing import assert_frame_equal, assert_series_equal
10
11
if TYPE_CHECKING:
12
from collections.abc import Callable
13
14
15
def test_ufunc() -> None:
    """Apply ``np.power`` to an eager UInt8 column and check values and dtypes."""
    # Cast to Any so type checkers accept an expression as a ufunc operand.
    col_a = cast("Any", pl.col("a"))
    source = pl.Series("a", [1, 2, 3, 4], dtype=pl.UInt8)
    frame = pl.DataFrame([source])

    result = frame.select(
        np.power(col_a, 2).alias("power_uint8"),
        np.power(col_a, 2.0).alias("power_float64"),
        np.power(col_a, 2, dtype=np.uint16).alias("power_uint16"),
    )

    # Integer exponent keeps UInt8, float exponent gives Float64, and an
    # explicit ``dtype=np.uint16`` gives UInt16.
    want_columns = [
        pl.Series("power_uint8", [1, 4, 9, 16], dtype=pl.UInt8),
        pl.Series("power_float64", [1.0, 4.0, 9.0, 16.0], dtype=pl.Float64),
        pl.Series("power_uint16", [1, 4, 9, 16], dtype=pl.UInt16),
    ]
    want = pl.DataFrame(want_columns)

    assert_frame_equal(result, want)
    assert result.dtypes == want.dtypes
31
32
33
def test_ufunc_expr_not_first() -> None:
    """Ufuncs and operators accept an expression as a non-leading operand."""
    operand = cast("Any", pl.col("a"))
    frame = pl.DataFrame([pl.Series("a", [1, 2, 3], dtype=pl.Float64)])

    result = frame.select(
        np.power(2.0, operand).alias("power"),
        (2.0 / operand).alias("divide_scalar"),
    )

    powers = pl.Series("power", [2.0, 4.0, 8.0], dtype=pl.Float64)
    quotients = pl.Series("divide_scalar", [2.0, 1.0, 2 / 3], dtype=pl.Float64)
    assert_frame_equal(result, pl.DataFrame([powers, quotients]))
47
48
49
def test_lazy_ufunc() -> None:
    """Apply ``np.power`` to a lazy UInt8 column and check the collected frame."""
    # Cast to Any so type checkers accept an expression as a ufunc operand.
    col_a = cast("Any", pl.col("a"))
    source = pl.Series("a", [1, 2, 3, 4], dtype=pl.UInt8)
    lazy_frame = pl.LazyFrame([source])

    query = lazy_frame.select(
        np.power(col_a, 2).alias("power_uint8"),
        np.power(col_a, 2.0).alias("power_float64"),
        np.power(col_a, 2, dtype=np.uint16).alias("power_uint16"),
    )

    want_columns = [
        pl.Series("power_uint8", [1, 4, 9, 16], dtype=pl.UInt8),
        pl.Series("power_float64", [1.0, 4.0, 9.0, 16.0], dtype=pl.Float64),
        pl.Series("power_uint16", [1, 4, 9, 16], dtype=pl.UInt16),
    ]
    assert_frame_equal(query.collect(), pl.DataFrame(want_columns))
64
65
66
def test_lazy_ufunc_expr_not_first() -> None:
    """Lazy ufuncs and operators accept an expression as a non-leading operand."""
    operand = cast("Any", pl.col("a"))
    lazy_frame = pl.LazyFrame([pl.Series("a", [1, 2, 3], dtype=pl.Float64)])

    query = lazy_frame.select(
        np.power(2.0, operand).alias("power"),
        (2.0 / operand).alias("divide_scalar"),
    )

    powers = pl.Series("power", [2.0, 4.0, 8.0], dtype=pl.Float64)
    quotients = pl.Series("divide_scalar", [2.0, 1.0, 2 / 3], dtype=pl.Float64)
    assert_frame_equal(query.collect(), pl.DataFrame([powers, quotients]))
80
81
82
def test_ufunc_recognition() -> None:
    """``np.exp`` on an expression matches the native ``Expr.exp`` result."""
    frame = pl.DataFrame({"a": [1, 1, 2, 2], "b": [1.1, 2.2, 3.3, 4.4]})
    via_numpy = frame.select(np.exp(pl.col("b")))
    via_polars = frame.select(pl.col("b").exp())
    assert_frame_equal(via_numpy, via_polars)
85
86
87
# https://github.com/pola-rs/polars/issues/6770
88
def test_ufunc_multiple_expressions() -> None:
    """A two-input ufunc over two expressions matches the same call on Series."""
    v_values = [
        -4.293,
        -2.4659,
        -1.8378,
        -0.2821,
        -4.5649,
        -3.8128,
        -7.4274,
        3.3443,
        3.8604,
        -4.2200,
    ]
    u_values = [
        -11.2268,
        6.3478,
        7.1681,
        3.4986,
        2.7320,
        -1.0695,
        -10.1408,
        11.2327,
        6.6623,
        -8.1412,
    ]
    frame = pl.DataFrame({"v": v_values, "u": u_values})

    # Reference: the ufunc applied directly to the two Series.
    expected = np.arctan2(frame.get_column("v"), frame.get_column("u"))

    # Under test: the same ufunc applied to the two column expressions.
    selected = frame.select(np.arctan2(pl.col("v"), pl.col("u")))  # type: ignore[call-overload]
    result = selected[:, 0]

    assert_series_equal(expected, result)  # type: ignore[arg-type]
120
121
122
def test_repeated_name_ufunc_17472() -> None:
    """A multi-input ufunc works when its input expressions share a name."""
    frame = pl.DataFrame({"a": [6.0]})
    quotient = frame.select(np.divide(pl.col("a"), pl.col("a")))  # type: ignore[call-overload]
    assert_frame_equal(pl.DataFrame({"a": [1.0]}), quotient)
128
129
130
def test_grouped_ufunc() -> None:
    """Apply a NumPy ufunc to an aggregated expression in ``group_by().agg()``.

    Per group this evaluates ``expm1(sum(log1p(values)))``, which equals the
    product of ``1 + value`` over the group, minus one.
    """
    df = pl.DataFrame({"id": ["a", "a", "b", "b"], "values": [0.1, 0.1, -0.1, -0.1]})
    result = (
        df.group_by("id")
        .agg(pl.col("values").log1p().sum().pipe(np.expm1))
        # Sort so the comparison does not depend on group output order.
        .sort("id")
    )
    # a: 1.1 * 1.1 - 1 = 0.21; b: 0.9 * 0.9 - 1 = -0.19 (compared with the
    # default float tolerance of assert_frame_equal).
    expected = pl.DataFrame({"id": ["a", "b"], "values": [0.21, -0.19]})
    assert_frame_equal(result, expected)
133
134
135
def test_generalized_ufunc_scalar() -> None:
    """Call a reducing ``(n)->()`` gufunc directly and through ``map_batches``."""
    numba = pytest.importorskip("numba", exc_type=ImportError)

    @numba.guvectorize([(numba.int64[:], numba.int64[:])], "(n)->()")  # type: ignore[misc, untyped-decorator]
    def my_custom_sum(arr, result) -> None:  # type: ignore[no-untyped-def]  # noqa: ANN001
        acc = 0
        for item in arr:
            acc += item
        result[0] = acc

    # Re-type the gufunc so type checkers accept the calls below.
    custom_sum = cast("Callable[[object], object]", my_custom_sum)

    # Reference behaviour on a plain NumPy array.
    assert custom_sum(np.array([10, 2, 3], dtype=np.int64)) == 15

    # Calling the gufunc on a Series directly.
    frame = pl.DataFrame({"values": [10, 2, 3]})
    assert custom_sum(frame.get_column("values")) == 15

    # Expression that hands the gufunc to map_batches as a scalar-returning
    # function; used for both the select and the group_by checks.
    scalar_sum = pl.col("values").map_batches(
        custom_sum, returns_scalar=True, return_dtype=pl.self_dtype()
    )

    # Via map_batches, with the gufunc result taken as a scalar.
    as_scalar = frame.select(scalar_sum)
    assert_frame_equal(as_scalar, pl.DataFrame({"values": 15}))

    # Via map_batches, with the gufunc result wrapped in a one-element Series.
    as_series = frame.select(
        pl.col("values").map_batches(
            lambda batch: pl.Series([custom_sum(batch)]),
            returns_scalar=False,
            return_dtype=pl.self_dtype(),
        )
    )
    assert_frame_equal(as_series, pl.DataFrame({"values": [15]}))

    # group_by(): one sum per label.
    labelled = pl.DataFrame(
        {"labels": ["a", "b", "a", "b"], "values": [10, 2, 3, 30]}
    )
    per_label = labelled.group_by("labels").agg(scalar_sum).sort("labels")
    expected = pl.DataFrame({"labels": ["a", "b"], "values": [13, 32]})
    assert_frame_equal(per_label, expected)
185
186
187
def make_gufunc_mean() -> Callable[[pl.Series], pl.Series]:
    """Build a numba ``(n)->(n)`` gufunc over float64 arrays.

    Element ``i`` of the output is the mean of the input plus ``i``. numba is
    imported through ``pytest.importorskip``.
    """
    numba = pytest.importorskip("numba", exc_type=ImportError)
    signatures = [(numba.float64[:], numba.float64[:])]

    @numba.guvectorize(signatures, "(n)->(n)")  # type: ignore[misc, untyped-decorator]
    def gufunc_mean(arr: Any, result: Any) -> None:
        center = arr.mean()
        for offset in range(len(arr)):
            result[offset] = center + offset

    return gufunc_mean  # type: ignore[no-any-return]
197
198
199
def test_generalized_ufunc() -> None:
    """Map an ``(n)->(n)`` gufunc over a column in a plain ``select``."""
    mean_plus_index = make_gufunc_mean()
    frame = pl.DataFrame({"s": [1.0, 2.0, 3.0]})

    mapped = pl.col("s").map_batches(mean_plus_index).alias("result")
    actual = frame.select([mapped])

    # The input mean is 2.0 and position i adds i to it.
    assert_frame_equal(actual, pl.DataFrame({"result": [2.0, 3.0, 4.0]}))
205
206
207
def test_grouped_generalized_ufunc() -> None:
    """Map an ``(n)->(n)`` gufunc over each group in ``group_by().agg()``."""
    mean_plus_index = make_gufunc_mean()
    frame = pl.DataFrame(
        {"id": ["a", "a", "b", "b"], "values": [1.0, 2.0, 3.0, 4.0]}
    )

    per_group = pl.col("values").map_batches(
        mean_plus_index, return_dtype=pl.self_dtype()
    )
    actual = frame.group_by("id").agg(per_group).sort("id")

    # Group means are 1.5 and 3.5; position i within a group adds i.
    expected = pl.DataFrame({"id": ["a", "b"], "values": [[1.5, 2.5], [3.5, 4.5]]})
    assert_frame_equal(actual, expected)
217
218
219
def test_ufunc_chain() -> None:
    """Chaining ``mean`` onto ``np.log`` in a rolling agg matches ``Expr.log``."""
    frame = pl.DataFrame(
        data={
            "A": [2, 10, 11, 12, 3, 10, 11, 12],
            "counter": [1, 2, 3, 4, 5, 6, 7, 8],
        }
    )
    windows = frame.rolling(index_column="counter", period="2i")
    aggregated = windows.agg(
        np.log(pl.col("A")).mean().alias("mean_numpy"),
        pl.col("A").log().mean().alias("mean_polars"),
    )

    numpy_mean = aggregated["mean_numpy"]
    # Rename so that only the values differ between the two series, not the name.
    polars_mean = aggregated["mean_polars"].alias("mean_numpy")
    assert_series_equal(numpy_mean, polars_mean)
228
229