GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/operations/rolling/test_rolling.py
from __future__ import annotations

import random
from datetime import date, datetime, time, timedelta
from typing import TYPE_CHECKING
from zoneinfo import ZoneInfo

import hypothesis.strategies as st
import numpy as np
import pytest
from hypothesis import assume, given
from numpy import nan

import polars as pl
from polars._utils.convert import parse_as_duration_string
from polars.exceptions import ComputeError, InvalidOperationError
from polars.meta.index_type import get_index_type
from polars.testing import assert_frame_equal, assert_series_equal
from polars.testing.parametric import column, dataframes, series
from polars.testing.parametric.strategies.dtype import _time_units
from tests.unit.conftest import INTEGER_DTYPES, NUMERIC_DTYPES, TEMPORAL_DTYPES

if TYPE_CHECKING:
    from collections.abc import Callable

    from hypothesis.strategies import SearchStrategy

    from polars._typing import (
        ClosedInterval,
        PolarsDataType,
        QuantileMethod,
        RankMethod,
        TimeUnit,
    )


@pytest.fixture
def example_df() -> pl.DataFrame:
    return pl.DataFrame(
        {
            "dt": [
                datetime(2021, 1, 1),
                datetime(2021, 1, 2),
                datetime(2021, 1, 4),
                datetime(2021, 1, 5),
                datetime(2021, 1, 7),
            ],
            "values": pl.arange(0, 5, eager=True),
        }
    )


@pytest.mark.parametrize(
    "period",
    ["1d", "2d", "3d", timedelta(days=1), timedelta(days=2), timedelta(days=3)],
)
@pytest.mark.parametrize("closed", ["left", "right", "none", "both"])
def test_rolling_kernels_and_rolling(
    example_df: pl.DataFrame, period: str | timedelta, closed: ClosedInterval
) -> None:
    out1 = example_df.set_sorted("dt").select(
        pl.col("dt"),
        # this differs from group_by aggregation: an empty window is null here,
        # whereas the sum aggregation of an empty set is 0
        pl.col("values")
        .rolling_sum_by("dt", period, closed=closed)
        .fill_null(0)
        .alias("sum"),
        pl.col("values").rolling_var_by("dt", period, closed=closed).alias("var"),
        pl.col("values").rolling_mean_by("dt", period, closed=closed).alias("mean"),
        pl.col("values").rolling_std_by("dt", period, closed=closed).alias("std"),
        pl.col("values")
        .rolling_quantile_by("dt", period, quantile=0.2, closed=closed)
        .alias("quantile"),
    )
    out2 = (
        example_df.set_sorted("dt")
        .rolling("dt", period=period, closed=closed)
        .agg(
            [
                pl.col("values").sum().alias("sum"),
                pl.col("values").var().alias("var"),
                pl.col("values").mean().alias("mean"),
                pl.col("values").std().alias("std"),
                pl.col("values").quantile(quantile=0.2).alias("quantile"),
            ]
        )
    )
    assert_frame_equal(out1, out2)


@pytest.mark.parametrize(
    "period",
    ["1d", "2d", "3d", timedelta(days=1), timedelta(days=2), timedelta(days=3)],
)
@pytest.mark.parametrize("closed", ["right", "both"])
def test_rolling_rank_kernels_and_rolling(
    example_df: pl.DataFrame, period: str | timedelta, closed: ClosedInterval
) -> None:
    out1 = example_df.set_sorted("dt").select(
        pl.col("dt"),
        pl.col("values").rolling_rank_by("dt", period, closed=closed).alias("rank"),
    )
    out2 = (
        example_df.set_sorted("dt")
        .rolling("dt", period=period, closed=closed)
        .agg([pl.col("values").rank().last().alias("rank")])
    )
    assert_frame_equal(out1, out2)


@pytest.mark.parametrize("closed", ["left", "none"])
def test_rolling_rank_needs_closed_right(
    example_df: pl.DataFrame, closed: ClosedInterval
) -> None:
    pat = r"`rolling_rank_by` window needs to be closed on the right side \(i.e., `closed` must be `right` or `both`\)"
    with pytest.raises(InvalidOperationError, match=pat):
        example_df.set_sorted("dt").select(
            pl.col("values").rolling_rank_by("dt", "2d", closed=closed).alias("rank"),
        )


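# `offset` shifts each window boundary away from the index value: the window
# spans ts + offset to ts + offset + period, adjusted by `closed`. With
# period="2d", the cases below cover windows that only partially, or fully,
# look behind `ts`.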
@pytest.mark.parametrize(
    ("offset", "closed", "expected_values"),
    [
        pytest.param(
            "-1d",
            "left",
            [[1], [1, 2], [2, 3], [3, 4]],
            id="partial lookbehind, left",
        ),
        pytest.param(
            "-1d",
            "right",
            [[1, 2], [2, 3], [3, 4], [4]],
            id="partial lookbehind, right",
        ),
        pytest.param(
            "-1d",
            "both",
            [[1, 2], [1, 2, 3], [2, 3, 4], [3, 4]],
            id="partial lookbehind, both",
        ),
        pytest.param(
            "-1d",
            "none",
            [[1], [2], [3], [4]],
            id="partial lookbehind, none",
        ),
        pytest.param(
            "-2d",
            "left",
            [[], [1], [1, 2], [2, 3]],
            id="full lookbehind, left",
        ),
        pytest.param(
            "-3d",
            "left",
            [[], [], [1], [1, 2]],
            id="full lookbehind, offset > period, left",
        ),
        pytest.param(
            "-3d",
            "right",
            [[], [1], [1, 2], [2, 3]],
            id="full lookbehind, right",
        ),
        pytest.param(
            "-3d",
            "both",
            [[], [1], [1, 2], [1, 2, 3]],
            id="full lookbehind, both",
        ),
        pytest.param(
            "-2d",
            "none",
            [[], [1], [2], [3]],
            id="full lookbehind, none",
        ),
        pytest.param(
            "-3d",
            "none",
            [[], [], [1], [2]],
            id="full lookbehind, offset > period, none",
        ),
    ],
)
def test_rolling_negative_offset(
    offset: str, closed: ClosedInterval, expected_values: list[list[int]]
) -> None:
    df = pl.DataFrame(
        {
            "ts": pl.datetime_range(
                datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True
            ),
            "value": [1, 2, 3, 4],
        }
    )
    result = df.rolling("ts", period="2d", offset=offset, closed=closed).agg(
        pl.col("value")
    )
    expected = pl.DataFrame(
        {
            "ts": pl.datetime_range(
                datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True
            ),
            "value": expected_values,
        }
    )
    assert_frame_equal(result, expected)


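# Expected values are the sample skewness of each 4-element window; bias=False
# applies the standard bias correction (values consistent with scipy.stats.skew).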
def test_rolling_skew() -> None:
    s = pl.Series([1, 2, 3, 3, 2, 10, 8])
    assert s.rolling_skew(window_size=4, bias=True).to_list() == pytest.approx(
        [
            None,
            None,
            None,
            -0.49338220021815865,
            0.0,
            1.097025449363867,
            0.09770939201338157,
        ]
    )

    assert s.rolling_skew(window_size=4, bias=False).to_list() == pytest.approx(
        [
            None,
            None,
            None,
            -0.8545630383279711,
            0.0,
            1.9001038154942962,
            0.16923763134384154,
        ]
    )


def test_rolling_kurtosis() -> None:
    s = pl.Series([1, 2, 3, 3, 2, 10, 8])
    assert s.rolling_kurtosis(window_size=4, bias=True).to_list() == pytest.approx(
        [
            None,
            None,
            None,
            -1.371900826446281,
            -1.9999999999999991,
            -0.7055324211778693,
            -1.7878967572797346,
        ]
    )
    assert s.rolling_kurtosis(
        window_size=4, bias=True, fisher=False
    ).to_list() == pytest.approx(
        [
            None,
            None,
            None,
            1.628099173553719,
            1.0000000000000009,
            2.2944675788221307,
            1.2121032427202654,
        ]
    )


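# Windows crossing the America/Chicago DST transition (2021-11-07) must behave
# the same as in the tz-naive case: with closed="right" each "1d" window
# contains only the current row, even across the transition.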
@pytest.mark.parametrize("time_zone", [None, "America/Chicago"])
270
@pytest.mark.parametrize(
271
("rolling_fn", "expected_values", "expected_dtype"),
272
[
273
("rolling_mean_by", [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], pl.Float64),
274
("rolling_sum_by", [1, 2, 3, 4, 5, 6], pl.Int64),
275
("rolling_min_by", [1, 2, 3, 4, 5, 6], pl.Int64),
276
("rolling_max_by", [1, 2, 3, 4, 5, 6], pl.Int64),
277
("rolling_std_by", [None, None, None, None, None, None], pl.Float64),
278
("rolling_var_by", [None, None, None, None, None, None], pl.Float64),
279
("rolling_rank_by", [1.0, 1.0, 1.0, 1.0, 1.0, 1.0], pl.Float64),
280
],
281
)
282
def test_rolling_crossing_dst(
283
time_zone: str | None,
284
rolling_fn: str,
285
expected_values: list[int | None | float],
286
expected_dtype: PolarsDataType,
287
) -> None:
288
ts = pl.datetime_range(
289
datetime(2021, 11, 5), datetime(2021, 11, 10), "1d", time_zone="UTC", eager=True
290
).dt.replace_time_zone(time_zone)
291
df = pl.DataFrame({"ts": ts, "value": [1, 2, 3, 4, 5, 6]})
292
293
result = df.with_columns(
294
getattr(pl.col("value"), rolling_fn)(by="ts", window_size="1d", closed="right")
295
)
296
297
expected = pl.DataFrame(
298
{"ts": ts, "value": expected_values}, schema_overrides={"value": expected_dtype}
299
)
300
assert_frame_equal(result, expected)
301
302
303
def test_rolling_by_invalid() -> None:
304
df = pl.DataFrame(
305
{"a": [1, 2, 3], "b": [4, 5, 6]}, schema_overrides={"a": pl.Int16}
306
).sort("a")
307
msg = "unsupported data type: i16 for temporal/index column, expected UInt64, UInt32, Int64, Int32, Datetime, Date, Duration, or Time"
308
with pytest.raises(InvalidOperationError, match=msg):
309
df.select(pl.col("b").rolling_min_by("a", "2i"))
310
df = pl.DataFrame({"a": [1, 2, 3], "b": [date(2020, 1, 1)] * 3}).sort("b")
311
msg = "`window_size` duration may not be a parsed integer"
312
with pytest.raises(InvalidOperationError, match=msg):
313
df.select(pl.col("a").rolling_min_by("b", "2i"))
314
315
316
def test_rolling_infinity() -> None:
317
s = pl.Series("col", ["-inf", "5", "5"]).cast(pl.Float64)
318
s = s.rolling_mean(2)
319
expected = pl.Series("col", [None, "-inf", "5"]).cast(pl.Float64)
320
assert_series_equal(s, expected)
321
322
323
def test_rolling_by_non_temporal_window_size() -> None:
324
df = pl.DataFrame(
325
{"a": [4, 5, 6], "b": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)]}
326
).sort("a", "b")
327
msg = "`window_size` duration may not be a parsed integer"
328
with pytest.raises(InvalidOperationError, match=msg):
329
df.with_columns(pl.col("a").rolling_sum_by("b", "2i", closed="left"))
330
331
332
@pytest.mark.parametrize(
333
"dtype",
334
[
335
pl.UInt8,
336
pl.Int64,
337
pl.Float32,
338
pl.Float64,
339
pl.Time,
340
pl.Date,
341
pl.Datetime("ms"),
342
pl.Datetime("us"),
343
pl.Datetime("ns"),
344
pl.Datetime("ns", "Asia/Kathmandu"),
345
pl.Duration("ms"),
346
pl.Duration("us"),
347
pl.Duration("ns"),
348
],
349
)
350
def test_rolling_extrema(dtype: PolarsDataType) -> None:
351
# sorted data and nulls flags trigger different kernels
352
df = (
353
(
354
pl.DataFrame(
355
{
356
"col1": pl.int_range(0, 7, eager=True),
357
"col2": pl.int_range(0, 7, eager=True).reverse(),
358
}
359
)
360
)
361
.with_columns(
362
pl.when(pl.int_range(0, pl.len(), eager=False) < 2)
363
.then(None)
364
.otherwise(pl.all())
365
.name.keep()
366
.name.suffix("_nulls")
367
)
368
.cast(dtype)
369
)
370
371
expected = {
372
"col1": [None, None, 0, 1, 2, 3, 4],
373
"col2": [None, None, 4, 3, 2, 1, 0],
374
"col1_nulls": [None, None, None, None, 2, 3, 4],
375
"col2_nulls": [None, None, None, None, 2, 1, 0],
376
}
377
result = df.select([pl.all().rolling_min(3)])
378
assert result.to_dict(as_series=False) == {
379
k: pl.Series(v, dtype=dtype).to_list() for k, v in expected.items()
380
}
381
382
expected = {
383
"col1": [None, None, 2, 3, 4, 5, 6],
384
"col2": [None, None, 6, 5, 4, 3, 2],
385
"col1_nulls": [None, None, None, None, 4, 5, 6],
386
"col2_nulls": [None, None, None, None, 4, 3, 2],
387
}
388
result = df.select([pl.all().rolling_max(3)])
389
assert result.to_dict(as_series=False) == {
390
k: pl.Series(v, dtype=dtype).to_list() for k, v in expected.items()
391
}
392
393
# shuffled data triggers other kernels
394
df = df.select([pl.all().shuffle(seed=0)])
395
expected = {
396
"col1": [None, None, 0, 0, 4, 1, 1],
397
"col2": [None, None, 1, 1, 0, 0, 0],
398
"col1_nulls": [None, None, None, None, 4, None, None],
399
"col2_nulls": [None, None, None, None, 0, None, None],
400
}
401
result = df.select([pl.all().rolling_min(3)])
402
assert result.to_dict(as_series=False) == {
403
k: pl.Series(v, dtype=dtype).to_list() for k, v in expected.items()
404
}
405
result = df.select([pl.all().rolling_max(3)])
406
407
expected = {
408
"col1": [None, None, 5, 5, 6, 6, 6],
409
"col2": [None, None, 6, 6, 2, 5, 5],
410
"col1_nulls": [None, None, None, None, 6, None, None],
411
"col2_nulls": [None, None, None, None, 2, None, None],
412
}
413
assert result.to_dict(as_series=False) == {
414
k: pl.Series(v, dtype=dtype).to_list() for k, v in expected.items()
415
}
416
417
418
@pytest.mark.parametrize(
    "dtype",
    [
        pl.UInt8,
        pl.Int64,
        pl.Float32,
        pl.Float64,
        pl.Time,
        pl.Date,
        pl.Datetime("ms"),
        pl.Datetime("us"),
        pl.Datetime("ns"),
        pl.Datetime("ns", "Asia/Kathmandu"),
        pl.Duration("ms"),
        pl.Duration("us"),
        pl.Duration("ns"),
    ],
)
def test_rolling_group_by_extrema(dtype: PolarsDataType) -> None:
    # ensure we hit different branches: descending, ascending, and shuffled data

    df = pl.DataFrame(
        {
            "col1": pl.arange(0, 7, eager=True).reverse(),
        }
    ).with_columns(
        pl.col("col1").reverse().alias("index"),
        pl.col("col1").cast(dtype),
    )

    expected = {
        "col1_list": pl.Series(
            [
                [6],
                [6, 5],
                [6, 5, 4],
                [5, 4, 3],
                [4, 3, 2],
                [3, 2, 1],
                [2, 1, 0],
            ],
            dtype=pl.List(dtype),
        ).to_list(),
        "col1_min": pl.Series([6, 5, 4, 3, 2, 1, 0], dtype=dtype).to_list(),
        "col1_max": pl.Series([6, 6, 6, 5, 4, 3, 2], dtype=dtype).to_list(),
        "col1_first": pl.Series([6, 6, 6, 5, 4, 3, 2], dtype=dtype).to_list(),
        "col1_last": pl.Series([6, 5, 4, 3, 2, 1, 0], dtype=dtype).to_list(),
    }
    result = (
        df.rolling(
            index_column="index",
            period="3i",
        )
        .agg(
            [
                pl.col("col1").name.suffix("_list"),
                pl.col("col1").min().name.suffix("_min"),
                pl.col("col1").max().name.suffix("_max"),
                pl.col("col1").first().alias("col1_first"),
                pl.col("col1").last().alias("col1_last"),
            ]
        )
        .select(["col1_list", "col1_min", "col1_max", "col1_first", "col1_last"])
    )
    assert result.to_dict(as_series=False) == expected

    # ascending order

    df = pl.DataFrame(
        {
            "col1": pl.arange(0, 7, eager=True),
        }
    ).with_columns(
        pl.col("col1").alias("index"),
        pl.col("col1").cast(dtype),
    )

    result = (
        df.rolling(
            index_column="index",
            period="3i",
        )
        .agg(
            [
                pl.col("col1").name.suffix("_list"),
                pl.col("col1").min().name.suffix("_min"),
                pl.col("col1").max().name.suffix("_max"),
                pl.col("col1").first().alias("col1_first"),
                pl.col("col1").last().alias("col1_last"),
            ]
        )
        .select(["col1_list", "col1_min", "col1_max", "col1_first", "col1_last"])
    )
    expected = {
        "col1_list": pl.Series(
            [
                [0],
                [0, 1],
                [0, 1, 2],
                [1, 2, 3],
                [2, 3, 4],
                [3, 4, 5],
                [4, 5, 6],
            ],
            dtype=pl.List(dtype),
        ).to_list(),
        "col1_min": pl.Series([0, 0, 0, 1, 2, 3, 4], dtype=dtype).to_list(),
        "col1_max": pl.Series([0, 1, 2, 3, 4, 5, 6], dtype=dtype).to_list(),
        "col1_first": pl.Series([0, 0, 0, 1, 2, 3, 4], dtype=dtype).to_list(),
        "col1_last": pl.Series([0, 1, 2, 3, 4, 5, 6], dtype=dtype).to_list(),
    }
    assert result.to_dict(as_series=False) == expected

    # shuffled data.
    df = pl.DataFrame(
        {
            "col1": pl.arange(0, 7, eager=True).shuffle(1),
        }
    ).with_columns(
        pl.col("col1").cast(dtype),
        pl.col("col1").sort().alias("index"),
    )

    result = (
        df.rolling(
            index_column="index",
            period="3i",
        )
        .agg(
            [
                pl.col("col1").min().name.suffix("_min"),
                pl.col("col1").max().name.suffix("_max"),
                pl.col("col1").name.suffix("_list"),
            ]
        )
        .select(["col1_list", "col1_min", "col1_max"])
    )
    expected = {
        "col1_list": pl.Series(
            [
                [4],
                [4, 2],
                [4, 2, 5],
                [2, 5, 1],
                [5, 1, 6],
                [1, 6, 0],
                [6, 0, 3],
            ],
            dtype=pl.List(dtype),
        ).to_list(),
        "col1_min": pl.Series([4, 2, 2, 1, 1, 0, 0], dtype=dtype).to_list(),
        "col1_max": pl.Series([4, 4, 5, 5, 6, 6, 6], dtype=dtype).to_list(),
    }
    assert result.to_dict(as_series=False) == expected


def test_rolling_slice_pushdown() -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "a", "b"], "c": [1, 3, 5]}).lazy()
    df = (
        df.sort("a")
        .rolling(
            "a",
            group_by="b",
            period="2i",
        )
        .agg([(pl.col("c") - pl.col("c").shift(fill_value=0)).sum().alias("c")])
    )
    assert df.head(2).collect().to_dict(as_series=False) == {
        "b": ["a", "a"],
        "a": [1, 2],
        "c": [1, 3],
    }


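# Rolling windows overlap, so window-local expressions such as diff()/shift()
# must be evaluated per window rather than once over the underlying column.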
def test_overlapping_groups_4628() -> None:
    df = pl.DataFrame(
        {
            "index": [1, 2, 3, 4, 5, 6],
            "val": [10, 20, 40, 70, 110, 160],
        }
    )
    assert (
        df.rolling(index_column=pl.col("index").set_sorted(), period="3i").agg(
            [
                pl.col("val").diff(n=1).alias("val.diff"),
                (pl.col("val") - pl.col("val").shift(1)).alias("val - val.shift"),
            ]
        )
    ).to_dict(as_series=False) == {
        "index": [1, 2, 3, 4, 5, 6],
        "val.diff": [
            [None],
            [None, 10],
            [None, 10, 20],
            [None, 20, 30],
            [None, 30, 40],
            [None, 40, 50],
        ],
        "val - val.shift": [
            [None],
            [None, 10],
            [None, 10, 20],
            [None, 20, 30],
            [None, 30, 40],
            [None, 40, 50],
        ],
    }


def test_rolling_skew_lagging_null_5179() -> None:
    s = pl.Series([None, 3, 4, 1, None, None, None, None, 3, None, 5, 4, 7, 2, 1, None])
    result = s.rolling_skew(3, min_samples=1).fill_nan(-1.0)
    expected = pl.Series(
        [
            None,
            -1.0,
            0.0,
            -0.3818017741606059,
            0.0,
            -1.0,
            None,
            None,
            -1.0,
            -1.0,
            0.0,
            0.0,
            0.38180177416060695,
            0.23906314692954517,
            0.6309038567106234,
            0.0,
        ]
    )
    assert_series_equal(result, expected, check_names=False)


def test_rolling_var_numerical_stability_5197() -> None:
    s = pl.Series([*[1.2] * 4, *[3.3] * 7])
    res = s.to_frame("a").with_columns(pl.col("a").rolling_var(5))[:, 0].to_list()
    assert res[4:] == pytest.approx(
        [
            0.882,
            1.3229999999999997,
            1.3229999999999997,
            0.8819999999999983,
            0.0,
            0.0,
            0.0,
        ]
    )
    assert res[:4] == [None] * 4


def test_rolling_iter() -> None:
    df = pl.DataFrame(
        {
            "date": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 5)],
            "a": [1, 2, 2],
            "b": [4, 5, 6],
        }
    ).set_sorted("date")

    # Without 'by' argument
    result1 = [
        (name[0], data.shape)
        for name, data in df.rolling(index_column="date", period="2d")
    ]
    expected1 = [
        (date(2020, 1, 1), (1, 3)),
        (date(2020, 1, 2), (2, 3)),
        (date(2020, 1, 5), (1, 3)),
    ]
    assert result1 == expected1

    # With 'by' argument
    result2 = [
        (name, data.shape)
        for name, data in df.rolling(index_column="date", period="2d", group_by="a")
    ]
    expected2 = [
        ((1, date(2020, 1, 1)), (1, 3)),
        ((2, date(2020, 1, 2)), (1, 3)),
        ((2, date(2020, 1, 5)), (1, 3)),
    ]
    assert result2 == expected2


def test_rolling_negative_period() -> None:
    df = pl.DataFrame({"ts": [datetime(2020, 1, 1)], "value": [1]}).with_columns(
        pl.col("ts").set_sorted()
    )
    with pytest.raises(
        ComputeError, match="rolling window period should be strictly positive"
    ):
        df.rolling("ts", period="-1d", offset="-1d").agg(pl.col("value"))
    with pytest.raises(
        ComputeError, match="rolling window period should be strictly positive"
    ):
        df.lazy().rolling("ts", period="-1d", offset="-1d").agg(
            pl.col("value")
        ).collect()
    with pytest.raises(
        InvalidOperationError, match="`window_size` must be strictly positive"
    ):
        df.select(
            pl.col("value").rolling_min_by("ts", window_size="-1d", closed="left")
        )
    with pytest.raises(
        InvalidOperationError, match="`window_size` must be strictly positive"
    ):
        df.lazy().select(
            pl.col("value").rolling_min_by("ts", window_size="-1d", closed="left")
        ).collect()


def test_rolling_skew_window_offset() -> None:
    assert (pl.arange(0, 20, eager=True) ** 2).rolling_skew(20)[
        -1
    ] == 0.6612545648596286


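# rolling_corr is rolling_cov normalised by the two rolling standard
# deviations, so a window with zero variance in either input yields NaN.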
def test_rolling_cov_corr() -> None:
    df = pl.DataFrame({"x": [3, 3, 3, 5, 8], "y": [3, 4, 4, 4, 8]})

    res = df.select(
        pl.rolling_cov("x", "y", window_size=3).alias("cov"),
        pl.rolling_corr("x", "y", window_size=3).alias("corr"),
    ).to_dict(as_series=False)
    assert res["cov"][2:] == pytest.approx([0.0, 0.0, 5.333333333333336])
    assert res["corr"][2:] == pytest.approx([nan, 0.0, 0.9176629354822473], nan_ok=True)
    assert res["cov"][:2] == [None] * 2
    assert res["corr"][:2] == [None] * 2


def test_rolling_cov_corr_nulls() -> None:
    df1 = pl.DataFrame(
        {"a": [1.06, 1.07, 0.93, 0.78, 0.85], "lag_a": [1.0, 1.06, 1.07, 0.93, 0.78]}
    )
    df2 = pl.DataFrame(
        {
            "a": [1.0, 1.06, 1.07, 0.93, 0.78, 0.85],
            "lag_a": [None, 1.0, 1.06, 1.07, 0.93, 0.78],
        }
    )

    val_1 = df1.select(
        pl.rolling_corr("a", "lag_a", window_size=10, min_samples=5, ddof=1)
    )
    val_2 = df2.select(
        pl.rolling_corr("a", "lag_a", window_size=10, min_samples=5, ddof=1)
    )

    df1_expected = pl.DataFrame({"a": [None, None, None, None, 0.62204709]})
    df2_expected = pl.DataFrame({"a": [None, None, None, None, None, 0.62204709]})

    assert_frame_equal(val_1, df1_expected, abs_tol=0.0000001)
    assert_frame_equal(val_2, df2_expected, abs_tol=0.0000001)

    val_1 = df1.select(
        pl.rolling_cov("a", "lag_a", window_size=10, min_samples=5, ddof=1)
    )
    val_2 = df2.select(
        pl.rolling_cov("a", "lag_a", window_size=10, min_samples=5, ddof=1)
    )

    df1_expected = pl.DataFrame({"a": [None, None, None, None, 0.009445]})
    df2_expected = pl.DataFrame({"a": [None, None, None, None, None, 0.009445]})

    assert_frame_equal(val_1, df1_expected, abs_tol=0.0000001)
    assert_frame_equal(val_2, df2_expected, abs_tol=0.0000001)


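# The gap between Jan 18 and Jan 22 makes the corresponding closed="left"
# windows empty; empty windows must yield null rather than stale values from
# earlier windows (issue 9406).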
@pytest.mark.parametrize("time_unit", ["ms", "us", "ns"])
790
def test_rolling_empty_window_9406(time_unit: TimeUnit) -> None:
791
datecol = pl.Series(
792
"d",
793
[datetime(2019, 1, x) for x in [16, 17, 18, 22, 23]],
794
dtype=pl.Datetime(time_unit=time_unit, time_zone=None),
795
).set_sorted()
796
rawdata = pl.Series("x", [1.1, 1.2, 1.3, 1.15, 1.25], dtype=pl.Float64)
797
rmin = pl.Series("x", [None, 1.1, 1.1, None, 1.15], dtype=pl.Float64)
798
rmax = pl.Series("x", [None, 1.1, 1.2, None, 1.15], dtype=pl.Float64)
799
df = pl.DataFrame([datecol, rawdata])
800
801
assert_frame_equal(
802
pl.DataFrame([datecol, rmax]),
803
df.select(
804
pl.col("d"),
805
pl.col("x").rolling_max_by("d", window_size="3d", closed="left"),
806
),
807
)
808
assert_frame_equal(
809
pl.DataFrame([datecol, rmin]),
810
df.select(
811
pl.col("d"),
812
pl.col("x").rolling_min_by("d", window_size="3d", closed="left"),
813
),
814
)
815
816
817
def test_rolling_weighted_quantile_10031() -> None:
818
assert_series_equal(
819
pl.Series([1, 2]).rolling_median(window_size=2, weights=[0, 1]),
820
pl.Series([None, 2.0]),
821
)
822
823
assert_series_equal(
824
pl.Series([1, 2, 3, 5]).rolling_quantile(0.7, "linear", 3, [0.1, 0.3, 0.6]),
825
pl.Series([None, None, 2.55, 4.1]),
826
)
827
828
assert_series_equal(
829
pl.Series([1, 2, 3, 5, 8]).rolling_quantile(
830
0.7, "linear", 4, [0.1, 0.2, 0, 0.3]
831
),
832
pl.Series([None, None, None, 3.5, 5.5]),
833
)
834
835
836
def test_rolling_meta_eq_10101() -> None:
837
assert pl.col("A").rolling_sum(10).meta.eq(pl.col("A").rolling_sum(10)) is True
838
839
840
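# Despite the test name, an unsorted `by` column no longer raises: the result
# must match sorting by "dt", computing, and restoring the original row order.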
def test_rolling_aggregations_unsorted_raise_10991() -> None:
    df = pl.DataFrame(
        {
            "dt": [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)],
            "val": [1, 2, 3],
        }
    )
    result = df.with_columns(roll=pl.col("val").rolling_sum_by("dt", "2d"))
    expected = pl.DataFrame(
        {
            "dt": [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)],
            "val": [1, 2, 3],
            "roll": [4, 2, 5],
        }
    )
    assert_frame_equal(result, expected)
    result = (
        df.with_row_index()
        .sort("dt")
        .with_columns(roll=pl.col("val").rolling_sum_by("dt", "2d"))
        .sort("index")
        .drop("index")
    )
    assert_frame_equal(result, expected)


def test_rolling_aggregations_with_over_11225() -> None:
    start = datetime(2001, 1, 1)

    df_temporal = pl.DataFrame(
        {
            "date": [start + timedelta(days=k) for k in range(5)],
            "group": ["A"] * 2 + ["B"] * 3,
        }
    ).with_row_index()

    df_temporal = df_temporal.sort("group", "date")

    result = df_temporal.with_columns(
        rolling_row_mean=pl.col("index")
        .rolling_mean_by(
            by="date",
            window_size="2d",
            closed="left",
        )
        .over("group")
    )
    expected = pl.DataFrame(
        {
            "index": [0, 1, 2, 3, 4],
            "date": pl.datetime_range(date(2001, 1, 1), date(2001, 1, 5), eager=True),
            "group": ["A", "A", "B", "B", "B"],
            "rolling_row_mean": [None, 0.0, None, 2.0, 2.5],
        },
        schema_overrides={"index": pl.get_index_type()},
    )
    assert_frame_equal(result, expected)


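# rolling_sum widens small integer dtypes (Int8/UInt8/Int16/UInt16) to Int64,
# as encoded in the dtype ternary below, while min/max preserve the input dtype.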
@pytest.mark.parametrize("dtype", INTEGER_DTYPES)
900
def test_rolling_ints(dtype: PolarsDataType) -> None:
901
s = pl.Series("a", [1, 2, 3, 2, 1], dtype=dtype)
902
assert_series_equal(
903
s.rolling_min(2), pl.Series("a", [None, 1, 2, 2, 1], dtype=dtype)
904
)
905
assert_series_equal(
906
s.rolling_max(2), pl.Series("a", [None, 2, 3, 3, 2], dtype=dtype)
907
)
908
assert_series_equal(
909
s.rolling_sum(2),
910
pl.Series(
911
"a",
912
[None, 3, 5, 5, 3],
913
dtype=(
914
pl.Int64 if dtype in [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16] else dtype
915
),
916
),
917
)
918
assert_series_equal(s.rolling_mean(2), pl.Series("a", [None, 1.5, 2.5, 2.5, 1.5]))
919
920
assert s.rolling_std(2).to_list()[1] == pytest.approx(0.7071067811865476)
921
assert s.rolling_var(2).to_list()[1] == pytest.approx(0.5)
922
assert s.rolling_std(2, ddof=0).to_list()[1] == pytest.approx(0.5)
923
assert s.rolling_var(2, ddof=0).to_list()[1] == pytest.approx(0.25)
924
925
assert_series_equal(
926
s.rolling_median(4), pl.Series("a", [None, None, None, 2, 2], dtype=pl.Float64)
927
)
928
assert_series_equal(
929
s.rolling_quantile(0, "nearest", 3),
930
pl.Series("a", [None, None, 1, 2, 1], dtype=pl.Float64),
931
)
932
assert_series_equal(
933
s.rolling_quantile(0, "lower", 3),
934
pl.Series("a", [None, None, 1, 2, 1], dtype=pl.Float64),
935
)
936
assert_series_equal(
937
s.rolling_quantile(0, "higher", 3),
938
pl.Series("a", [None, None, 1, 2, 1], dtype=pl.Float64),
939
)
940
assert s.rolling_skew(4).null_count() == 3
941
942
943
def test_rolling_floats() -> None:
    # 3099: test that we maintain the proper dtype
    for dt in [pl.Float32, pl.Float64]:
        result = pl.Series([1, 2, 3], dtype=dt).rolling_min(2, weights=[0.1, 0.2])
        expected = pl.Series([None, 0.1, 0.2], dtype=dt)
        assert_series_equal(result, expected)

    df = pl.DataFrame({"val": [1.0, 2.0, 3.0, np.nan, 5.0, 6.0, 7.0]})

    for e in [
        pl.col("val").rolling_min(window_size=3),
        pl.col("val").rolling_max(window_size=3),
    ]:
        out = df.with_columns(e).to_series()
        assert out.null_count() == 2
        assert np.isnan(out.to_numpy()).sum() == 5

    expected_values = [None, None, 2.0, 3.0, 5.0, 6.0, 6.0]
    assert (
        df.with_columns(pl.col("val").rolling_median(window_size=3))
        .to_series()
        .to_list()
        == expected_values
    )
    assert (
        df.with_columns(pl.col("val").rolling_quantile(0.5, window_size=3))
        .to_series()
        .to_list()
        == expected_values
    )

    nan = float("nan")
    s = pl.Series("a", [11.0, 2.0, 9.0, nan, 8.0])
    assert_series_equal(
        s.rolling_sum(3),
        pl.Series("a", [None, None, 22.0, nan, nan]),
    )


def test_rolling_std_nulls_min_samples_1_20076() -> None:
    result = pl.Series([1, 2, None, 4]).rolling_std(3, min_samples=1)
    expected = pl.Series(
        [None, 0.7071067811865476, 0.7071067811865476, 1.4142135623730951]
    )
    assert_series_equal(result, expected)


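# rolling_sum over booleans inside list.eval / arr.eval counts the True values
# in each trailing window of the inner series.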
@pytest.mark.parametrize(
    ("bools", "window", "expected"),
    [
        (
            [[True, False, True]],
            2,
            [[None, 1, 1]],
        ),
        (
            [[True, False, True, True, False, False, False, True, True]],
            4,
            [[None, None, None, 3, 2, 2, 1, 1, 2]],
        ),
    ],
)
def test_rolling_eval_boolean_list(
    bools: list[list[bool]], window: int, expected: list[list[int]]
) -> None:
    for accessor, dtype in (
        ("list", pl.List(pl.Boolean)),
        ("arr", pl.Array(pl.Boolean, shape=len(bools[0]))),
    ):
        s = pl.Series(name="bools", values=bools, dtype=dtype)
        res = getattr(s, accessor).eval(pl.element().rolling_sum(window)).to_list()
        assert res == expected


def test_rolling_by_date() -> None:
    df = pl.DataFrame(
        {
            "dt": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "val": [1, 2, 3],
        }
    ).sort("dt")

    result = df.with_columns(roll=pl.col("val").rolling_sum_by("dt", "2d"))
    expected = df.with_columns(roll=pl.Series([1, 3, 5]))
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("dtype", [pl.Int64, pl.Int32, pl.UInt64, pl.UInt32])
def test_rolling_by_integer(dtype: PolarsDataType) -> None:
    df = (
        pl.DataFrame({"val": [1, 2, 3]})
        .with_row_index()
        .with_columns(pl.col("index").cast(dtype))
    )
    result = df.with_columns(roll=pl.col("val").rolling_sum_by("index", "2i"))
    expected = df.with_columns(roll=pl.Series([1, 3, 5]))
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("dtype", INTEGER_DTYPES)
def test_rolling_sum_by_integer(dtype: PolarsDataType) -> None:
    lf = (
        pl.LazyFrame({"a": [1, 2, 3]}, schema={"a": dtype})
        .with_row_index()
        .select(pl.col("a").rolling_sum_by("index", "2i"))
    )
    result = lf.collect()
    expected_dtype = (
        pl.Int64 if dtype in [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16] else dtype
    )
    expected = pl.DataFrame({"a": [1, 3, 5]}, schema={"a": expected_dtype})
    assert_frame_equal(result, expected)
    assert lf.collect_schema() == expected.schema


def test_rolling_nanoseconds_11003() -> None:
    df = pl.DataFrame(
        {
            "dt": [
                "2020-01-01T00:00:00.000000000",
                "2020-01-01T00:00:00.000000100",
                "2020-01-01T00:00:00.000000200",
            ],
            "val": [1, 2, 3],
        }
    )
    df = df.with_columns(pl.col("dt").str.to_datetime(time_unit="ns")).set_sorted("dt")
    result = df.with_columns(pl.col("val").rolling_sum_by("dt", "500ns"))
    expected = df.with_columns(val=pl.Series([1, 3, 6]))
    assert_frame_equal(result, expected)


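# "1mo" lookback saturates at month ends: 2020-07-31 minus one month clamps to
# 2020-06-30 because June has no 31st (issue 12216).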
def test_rolling_by_1mo_saturating_12216() -> None:
    df = pl.DataFrame(
        {
            "date": [
                date(2020, 6, 29),
                date(2020, 6, 30),
                date(2020, 7, 30),
                date(2020, 7, 31),
                date(2020, 8, 1),
            ],
            "val": [1, 2, 3, 4, 5],
        }
    ).set_sorted("date")
    result = df.rolling(index_column="date", period="1mo").agg(vals=pl.col("val"))
    expected = pl.DataFrame(
        {
            "date": [
                date(2020, 6, 29),
                date(2020, 6, 30),
                date(2020, 7, 30),
                date(2020, 7, 31),
                date(2020, 8, 1),
            ],
            "vals": [[1], [1, 2], [3], [3, 4], [3, 4, 5]],
        }
    )
    assert_frame_equal(result, expected)

    # check with `closed='both'` against DuckDB output
    result = df.rolling(index_column="date", period="1mo", closed="both").agg(
        vals=pl.col("val")
    )
    expected = pl.DataFrame(
        {
            "date": [
                date(2020, 6, 29),
                date(2020, 6, 30),
                date(2020, 7, 30),
                date(2020, 7, 31),
                date(2020, 8, 1),
            ],
            "vals": [[1], [1, 2], [2, 3], [2, 3, 4], [3, 4, 5]],
        }
    )
    assert_frame_equal(result, expected)


def test_index_expr_with_literal() -> None:
    df = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}).sort("a")
    out = df.rolling(index_column=(5 * pl.col("a")).set_sorted(), period="2i").agg(
        pl.col("b")
    )
    expected = pl.DataFrame({"literal": [5, 10, 15], "b": [["a"], ["b"], ["c"]]})
    assert_frame_equal(out, expected)


def test_index_expr_output_name_12244() -> None:
    df = pl.DataFrame({"A": [1, 2, 3]})

    out = df.rolling(pl.int_range(0, pl.len()), period="2i").agg("A")
    assert out.to_dict(as_series=False) == {
        "literal": [0, 1, 2],
        "A": [[1], [1, 2], [2, 3]],
    }


def test_rolling_median() -> None:
    for n in range(10, 25):
        array = np.random.randint(0, 20, n)
        for k in [3, 5, 7]:
            a = pl.Series(array)
            assert_series_equal(
                a.rolling_median(k), pl.from_pandas(a.to_pandas().rolling(k).median())
            )


@pytest.mark.slow
def test_rolling_median_2() -> None:
    np.random.seed(12)
    n = 1000
    df = pl.DataFrame({"x": np.random.normal(0, 1, n)})
    # results can differ slightly due to SIMD widths and the non-associativity of floats
    assert df.select(
        pl.col("x").rolling_median(window_size=10).sum()
    ).item() == pytest.approx(5.139429061527812)
    assert df.select(
        pl.col("x").rolling_median(window_size=100).sum()
    ).item() == pytest.approx(26.60506093611384)


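# min_samples counts only rows that actually fall inside a window, so calendar
# gaps combined with `closed` can push a window below the threshold and yield
# null even when enough rows exist overall.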
@pytest.mark.parametrize(
    ("dates", "closed", "expected"),
    [
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "right",
            [None, 3, 5],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "left",
            [None, None, 3],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "both",
            [None, 3, 6],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "none",
            [None, None, None],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 4)],
            "right",
            [None, 3, None],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 3), date(2020, 1, 4)],
            "right",
            [None, None, 5],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 3), date(2020, 1, 5)],
            "right",
            [None, None, None],
        ),
    ],
)
def test_rolling_min_samples(
    dates: list[date], closed: ClosedInterval, expected: list[int]
) -> None:
    df = pl.DataFrame({"date": dates, "value": [1, 2, 3]}).sort("date")
    result = df.select(
        pl.col("value").rolling_sum_by(
            "date", window_size="2d", min_samples=2, closed=closed
        )
    )["value"]
    assert_series_equal(result, pl.Series("value", expected, pl.Int64))

    # Starting with unsorted data
    result = (
        df.sort("date", descending=True)
        .with_columns(
            pl.col("value").rolling_sum_by(
                "date", window_size="2d", min_samples=2, closed=closed
            )
        )
        .sort("date")["value"]
    )
    assert_series_equal(result, pl.Series("value", expected, pl.Int64))


def test_rolling_returns_scalar_15656() -> None:
    df = pl.DataFrame(
        {
            "a": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "b": [4, 5, 6],
            "c": [1, 2, 3],
        }
    )
    result = df.group_by("c").agg(pl.col("b").rolling_mean_by("a", "2d")).sort("c")
    expected = pl.DataFrame({"c": [1, 2, 3], "b": [[4.0], [5.0], [6.0]]})
    assert_frame_equal(result, expected)


def test_rolling_invalid() -> None:
    df = pl.DataFrame(
        {
            "values": [1, 4],
            "times": [datetime(2020, 1, 3), datetime(2020, 1, 1)],
        },
    )
    with pytest.raises(
        InvalidOperationError, match="duration may not be a parsed integer"
    ):
        (
            df.sort("times")
            .rolling("times", period="3000i")
            .agg(pl.col("values").sum().alias("sum"))
        )
    with pytest.raises(
        InvalidOperationError, match="duration must be a parsed integer"
    ):
        (
            df.with_row_index()
            .rolling("index", period="3000d")
            .agg(pl.col("values").sum().alias("sum"))
        )


def test_by_different_length() -> None:
    df = pl.DataFrame({"b": [1]})
    with pytest.raises(InvalidOperationError, match="must be the same length"):
        df.select(
            pl.col("b").rolling_max_by(pl.Series([datetime(2020, 1, 1)] * 2), "1d")
        )


def test_incorrect_nulls_16246() -> None:
    df = pl.concat(
        [
            pl.DataFrame({"a": [datetime(2020, 1, 1)], "b": [1]}),
            pl.DataFrame({"a": [datetime(2021, 1, 1)], "b": [1]}),
        ],
        rechunk=False,
    )
    result = df.select(pl.col("b").rolling_max_by("a", "1d"))
    expected = pl.DataFrame({"b": [1, 1]})
    assert_frame_equal(result, expected)


def test_rolling_with_dst() -> None:
    df = pl.DataFrame(
        {"a": [datetime(2020, 10, 26, 1), datetime(2020, 10, 26)], "b": [1, 2]}
    ).with_columns(pl.col("a").dt.replace_time_zone("Europe/London"))
    result = df.select(pl.col("b").rolling_sum_by("a", "1d"))
    expected = pl.DataFrame({"b": [3, 2]})
    assert_frame_equal(result, expected)

    result = df.sort("a").select(pl.col("b").rolling_sum_by("a", "1d"))
    expected = pl.DataFrame({"b": [2, 3]})
    assert_frame_equal(result, expected)


def interval_defs() -> SearchStrategy[ClosedInterval]:
    closed: list[ClosedInterval] = ["left", "right", "both", "none"]
    return st.sampled_from(closed)


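# Property-based check: every window produced by `rolling` must equal a
# brute-force filter of the rows whose "ts" lies between ts + offset and
# ts + offset + period, honouring the sampled closed-ness.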
@given(
    period=st.timedeltas(
        min_value=timedelta(microseconds=0), max_value=timedelta(days=1000)
    ).map(parse_as_duration_string),
    offset=st.timedeltas(
        min_value=timedelta(days=-1000), max_value=timedelta(days=1000)
    ).map(parse_as_duration_string),
    closed=interval_defs(),
    data=st.data(),
    time_unit=_time_units(),
)
def test_rolling_parametric(
    period: str,
    offset: str,
    closed: ClosedInterval,
    data: st.DataObject,
    time_unit: TimeUnit,
) -> None:
    assume(period != "")
    dataframe = data.draw(
        dataframes(
            [
                column(
                    "ts",
                    strategy=st.datetimes(
                        min_value=datetime(2000, 1, 1),
                        max_value=datetime(2001, 1, 1),
                    ),
                    dtype=pl.Datetime(time_unit),
                ),
                column(
                    "value",
                    strategy=st.integers(min_value=-100, max_value=100),
                    dtype=pl.Int64,
                ),
            ],
            min_size=1,
        )
    )
    df = dataframe.sort("ts")
    result = df.rolling("ts", period=period, offset=offset, closed=closed).agg(
        pl.col("value")
    )

    expected_dict: dict[str, list[object]] = {"ts": [], "value": []}
    for ts, _ in df.iter_rows():
        window = df.filter(
            pl.col("ts").is_between(
                pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(offset),
                pl.lit(ts, dtype=pl.Datetime(time_unit))
                .dt.offset_by(offset)
                .dt.offset_by(period),
                closed=closed,
            )
        )
        value = window["value"].to_list()
        expected_dict["ts"].append(ts)
        expected_dict["value"].append(value)
    expected = pl.DataFrame(expected_dict).select(
        pl.col("ts").cast(pl.Datetime(time_unit)),
        pl.col("value").cast(pl.List(pl.Int64)),
    )
    assert_frame_equal(result, expected)


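# Same brute-force oracle, applied to the rolling_*_by aggregations: each
# windowed result must match the aggregation computed on a naive filter of
# rows between ts - window_size and ts, on both sorted and unsorted input.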
@given(
    window_size=st.timedeltas(
        min_value=timedelta(microseconds=0), max_value=timedelta(days=2)
    ).map(parse_as_duration_string),
    closed=interval_defs(),
    data=st.data(),
    time_unit=_time_units(),
    aggregation=st.sampled_from(
        [
            "min",
            "max",
            "mean",
            "sum",
            "std",
            "var",
            "median",
        ]
    ),
)
def test_rolling_aggs(
    window_size: str,
    closed: ClosedInterval,
    data: st.DataObject,
    time_unit: TimeUnit,
    aggregation: str,
) -> None:
    assume(window_size != "")

    # Testing logic can be faulty when window is more precise than time unit
    # https://github.com/pola-rs/polars/issues/11754
    assume(not (time_unit == "ms" and "us" in window_size))

    dataframe = data.draw(
        dataframes(
            [
                column(
                    "ts",
                    strategy=st.datetimes(
                        min_value=datetime(2000, 1, 1),
                        max_value=datetime(2001, 1, 1),
                    ),
                    dtype=pl.Datetime(time_unit),
                ),
                column(
                    "value",
                    strategy=st.integers(min_value=-100, max_value=100),
                    dtype=pl.Int64,
                ),
            ],
        )
    )
    df = dataframe.sort("ts")
    func = f"rolling_{aggregation}_by"
    result = df.with_columns(
        getattr(pl.col("value"), func)("ts", window_size=window_size, closed=closed)
    )
    result_from_unsorted = dataframe.with_columns(
        getattr(pl.col("value"), func)("ts", window_size=window_size, closed=closed)
    ).sort("ts")

    expected_dict: dict[str, list[object]] = {"ts": [], "value": []}
    for ts, _ in df.iter_rows():
        window = df.filter(
            pl.col("ts").is_between(
                pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(
                    f"-{window_size}"
                ),
                pl.lit(ts, dtype=pl.Datetime(time_unit)),
                closed=closed,
            )
        )
        expected_dict["ts"].append(ts)
        if window.is_empty():
            expected_dict["value"].append(None)
        else:
            value = getattr(window["value"], aggregation)()
            expected_dict["value"].append(value)
    expected = pl.DataFrame(expected_dict).select(
        pl.col("ts").cast(pl.Datetime(time_unit)),
        pl.col("value").cast(result["value"].dtype),
    )
    assert_frame_equal(result, expected)
    assert_frame_equal(result_from_unsorted, expected)


def test_window_size_validation() -> None:
    df = pl.DataFrame({"x": [1.0]})

    with pytest.raises(OverflowError, match=r"can't convert negative int to unsigned"):
        df.with_columns(trailing_min=pl.col("x").rolling_min(window_size=-3))


def test_rolling_empty_21032() -> None:
    df = pl.DataFrame(schema={"a": pl.Datetime("ms"), "b": pl.Int64()})

    result = df.rolling(index_column="a", period=timedelta(days=2)).agg(
        pl.col("b").sum()
    )
    assert_frame_equal(result, df)

    result = df.rolling(
        index_column="a", period=timedelta(days=2), offset=timedelta(days=3)
    ).agg(pl.col("b").sum())
    assert_frame_equal(result, df)


def test_rolling_offset_agg_15122() -> None:
    df = pl.DataFrame({"a": [1, 1, 1, 2, 2, 2], "b": [1, 2, 3, 1, 2, 3]})

    result = df.rolling(index_column="b", period="1i", offset="0i", group_by="a").agg(
        window=pl.col("b")
    )
    expected = df.with_columns(window=pl.Series([[2], [3], [], [2], [3], []]))
    assert_frame_equal(result, expected)

    result = df.rolling(index_column="b", period="1i", offset="1i", group_by="a").agg(
        window=pl.col("b")
    )
    expected = df.with_columns(window=pl.Series([[3], [], [], [3], [], []]))
    assert_frame_equal(result, expected)


def test_rolling_sum_stability_11146() -> None:
    data_frame = pl.DataFrame(
        {
            "value": [
                0.0,
                290.57,
                107.0,
                172.0,
                124.25,
                304.0,
                379.5,
                347.35,
                1516.41,
                386.12,
                226.5,
                294.62,
                125.5,
                0.0,
                0.0,
                0.0,
                0.0,
                0.0,
                0.0,
                0.0,
                0.0,
            ]
        }
    )
    assert (
        data_frame.with_columns(
            pl.col("value").rolling_mean(window_size=8, min_samples=1).alias("test_col")
        )["test_col"][-1]
        == 0.0
    )


def test_rolling() -> None:
    df = pl.DataFrame(
        {
            "n": [0, 0, 1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10, 10],
            "col1": ["A", "B"] * 11,
        }
    )

    assert df.rolling("n", period="1i", group_by="col1").agg().to_dict(
        as_series=False
    ) == {
        "col1": [
            "A",
            "A",
            "A",
            "A",
            "A",
            "A",
            "A",
            "A",
            "A",
            "A",
            "A",
            "B",
            "B",
            "B",
            "B",
            "B",
            "B",
            "B",
            "B",
            "B",
            "B",
            "B",
        ],
        "n": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    }


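# Nulls are dropped before the quantile is taken, so a window is null only if
# it contains no non-null values at all.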
@pytest.mark.parametrize(
    "method",
    ["nearest", "higher", "lower", "midpoint", "linear", "equiprobable"],
)
def test_rolling_quantile_with_nulls_22781(method: QuantileMethod) -> None:
    lf = pl.LazyFrame(
        {
            "index": [0, 1, 2, 3, 4, 5, 6, 7, 8],
            "a": [None, None, 1.0, None, None, 1.0, 1.0, None, None],
        }
    )
    out = (
        lf.rolling("index", period="2i")
        .agg(pl.col("a").quantile(0.5, interpolation=method))
        .collect()
    )
    expected = pl.Series("a", [None, None, 1.0, 1.0, None, 1.0, 1.0, 1.0, None])
    assert_series_equal(out["a"], expected)


def test_rolling_quantile_nearest_23392() -> None:
    base = range(11)
    s = pl.Series(base)

    shuffle_base = list(base)
    random.shuffle(shuffle_base)
    s_shuffled = pl.Series(shuffle_base)

    for q in np.arange(0, 1.0, 0.02, dtype=float):
        out = s.rolling_quantile(q, interpolation="nearest", window_size=11)

        # explicit:
        expected = pl.Series([None] * 10 + [float(round(q * 10.0))])
        assert_series_equal(out, expected)

        # equivalence:
        equiv = s.quantile(q, interpolation="nearest")
        assert out.last() == equiv

        # shuffled:
        out = s_shuffled.rolling_quantile(q, interpolation="nearest", window_size=11)
        assert_series_equal(out, expected)


def test_rolling_quantile_temporals() -> None:
    tz = ZoneInfo("Asia/Tokyo")
    dt = pl.Datetime("ms", "Asia/Tokyo")
    # We use ms to verify that the correct time unit is propagating.
    lf = pl.LazyFrame(
        {
            "date": [date(2025, 1, x) for x in range(1, 6)],
            "datetime": [datetime(2025, 1, x) for x in range(1, 6)],
            "datetime_tu_tz": pl.Series(
                [datetime(2025, 1, x, tzinfo=tz) for x in range(1, 6)], dtype=dt
            ),
            "duration": pl.Series(
                [timedelta(hours=x) for x in range(1, 6)], dtype=pl.Duration("ms")
            ),
            "time": [time(hour=x) for x in range(1, 6)],
        }
    )
    result = lf.select(
        rolling_date=pl.col("date").rolling_quantile(
            quantile=0.5, window_size=4, interpolation="linear"
        ),
        rolling_datetime=pl.col("datetime").rolling_quantile(
            quantile=0.5, window_size=4, interpolation="linear"
        ),
        rolling_datetime_tu_tz=pl.col("datetime_tu_tz").rolling_quantile(
            quantile=0.5, window_size=4, interpolation="linear"
        ),
        rolling_duration=pl.col("duration").rolling_quantile(
            quantile=0.5, window_size=4, interpolation="linear"
        ),
        rolling_time=pl.col("time").rolling_quantile(
            quantile=0.5, window_size=4, interpolation="linear"
        ),
    )
    expected = pl.DataFrame(
        {
            "rolling_date": pl.Series(
                [None, None, None, datetime(2025, 1, 2, 12), datetime(2025, 1, 3, 12)],
                dtype=pl.Datetime,
            ),
            "rolling_datetime": pl.Series(
                [None, None, None, datetime(2025, 1, 2, 12), datetime(2025, 1, 3, 12)]
            ),
            "rolling_datetime_tu_tz": pl.Series(
                [
                    None,
                    None,
                    None,
                    datetime(2025, 1, 2, 12, tzinfo=tz),
                    datetime(2025, 1, 3, 12, tzinfo=tz),
                ],
                dtype=dt,
            ),
            "rolling_duration": pl.Series(
                [None, None, None, timedelta(hours=2.5), timedelta(hours=3.5)],
                dtype=pl.Duration("ms"),
            ),
            "rolling_time": [
                None,
                None,
                None,
                time(hour=2, minute=30),
                time(hour=3, minute=30),
            ],
        }
    )
    assert result.collect_schema() == pl.Schema(
        {  # type: ignore[arg-type]
            "rolling_date": pl.Datetime("us"),
            "rolling_datetime": pl.Datetime("us"),
            "rolling_datetime_tu_tz": dt,
            "rolling_duration": pl.Duration("ms"),
            "rolling_time": pl.Time,
        }
    )
    assert_frame_equal(result.collect(), expected)


def test_rolling_agg_quantile_temporal() -> None:
    tz = ZoneInfo("Asia/Tokyo")
    dt = pl.Datetime("ms", "Asia/Tokyo")
    # We use ms to verify that the correct time unit is propagating.
    lf = pl.LazyFrame(
        {
            "index": [1, 2, 3, 4, 5],
            "int": [1, 2, 3, 4, 5],
            "date": [date(2025, 1, x) for x in range(1, 6)],
            "datetime": [datetime(2025, 1, x) for x in range(1, 6)],
            "datetime_tu_tz": pl.Series(
                [datetime(2025, 1, x, tzinfo=tz) for x in range(1, 6)], dtype=dt
            ),
            "duration": pl.Series(
                [timedelta(hours=x) for x in range(1, 6)], dtype=pl.Duration("ms")
            ),
            "time": [time(hour=x) for x in range(1, 6)],
        }
    )

    # Using rolling.agg()
    result1 = lf.rolling("index", period="4i").agg(
        rolling_int=pl.col("int").quantile(0.5, "linear"),
        rolling_date=pl.col("date").quantile(0.5, "linear"),
        rolling_datetime=pl.col("datetime").quantile(0.5, "linear"),
        rolling_datetime_tu_tz=pl.col("datetime_tu_tz").quantile(0.5, "linear"),
        rolling_duration=pl.col("duration").quantile(0.5, "linear"),
        rolling_time=pl.col("time").quantile(0.5, "linear"),
    )
    # Using rolling_quantile_by()
    result2 = lf.select(
        "index",
        rolling_int=pl.col("int").rolling_quantile_by(
            "index", window_size="4i", quantile=0.5, interpolation="linear"
        ),
        rolling_date=pl.col("date").rolling_quantile_by(
            "index", window_size="4i", quantile=0.5, interpolation="linear"
        ),
        rolling_datetime=pl.col("datetime").rolling_quantile_by(
            "index", window_size="4i", quantile=0.5, interpolation="linear"
        ),
        rolling_datetime_tu_tz=pl.col("datetime_tu_tz").rolling_quantile_by(
            "index", window_size="4i", quantile=0.5, interpolation="linear"
        ),
        rolling_duration=pl.col("duration").rolling_quantile_by(
            "index", window_size="4i", quantile=0.5, interpolation="linear"
        ),
        rolling_time=pl.col("time").rolling_quantile_by(
            "index", window_size="4i", quantile=0.5, interpolation="linear"
        ),
    )
    expected = pl.DataFrame(
        {
            "index": [1, 2, 3, 4, 5],
            "rolling_int": [1.0, 1.5, 2.0, 2.5, 3.5],
            "rolling_date": pl.Series(
                [
                    datetime(2025, 1, 1),
                    datetime(2025, 1, 1, 12),
                    datetime(2025, 1, 2),
                    datetime(2025, 1, 2, 12),
                    datetime(2025, 1, 3, 12),
                ]
            ),
            "rolling_datetime": pl.Series(
                [
                    datetime(2025, 1, 1),
                    datetime(2025, 1, 1, 12),
                    datetime(2025, 1, 2),
                    datetime(2025, 1, 2, 12),
                    datetime(2025, 1, 3, 12),
                ]
            ),
            "rolling_datetime_tu_tz": pl.Series(
                [
                    datetime(2025, 1, 1, tzinfo=tz),
                    datetime(2025, 1, 1, 12, tzinfo=tz),
                    datetime(2025, 1, 2, tzinfo=tz),
                    datetime(2025, 1, 2, 12, tzinfo=tz),
                    datetime(2025, 1, 3, 12, tzinfo=tz),
                ],
                dtype=dt,
            ),
            "rolling_duration": pl.Series(
                [
                    timedelta(hours=1),
                    timedelta(hours=1.5),
                    timedelta(hours=2),
                    timedelta(hours=2.5),
                    timedelta(hours=3.5),
                ],
                dtype=pl.Duration("ms"),
            ),
            "rolling_time": [
                time(hour=1),
                time(hour=1, minute=30),
                time(hour=2),
                time(hour=2, minute=30),
                time(hour=3, minute=30),
            ],
        }
    )
    expected_schema = pl.Schema(
        {  # type: ignore[arg-type]
            "index": pl.Int64,
            "rolling_int": pl.Float64,
            "rolling_date": pl.Datetime("us"),
            "rolling_datetime": pl.Datetime("us"),
            "rolling_datetime_tu_tz": dt,
            "rolling_duration": pl.Duration("ms"),
            "rolling_time": pl.Time,
        }
    )
    assert result1.collect_schema() == expected_schema
    assert result2.collect_schema() == expected_schema
    assert_frame_equal(result1.collect(), expected)
    assert_frame_equal(result2.collect(), expected)


def test_rolling_quantile_nearest_kernel_23392() -> None:
    df = pl.DataFrame(
        {
            "dt": [
                datetime(2021, 1, 1),
                datetime(2021, 1, 2),
                datetime(2021, 1, 4),
                datetime(2021, 1, 5),
                datetime(2021, 1, 7),
            ],
            "values": pl.arange(0, 5, eager=True),
        }
    )
    # values (period="3d", quantile=0.7) are chosen to trigger index rounding
    out = (
        df.set_sorted("dt")
        .rolling("dt", period="3d", closed="both")
        .agg([pl.col("values").quantile(quantile=0.7).alias("quantile")])
        .select("quantile")
    )
    expected = pl.DataFrame({"quantile": [0.0, 1.0, 1.0, 2.0, 3.0]})
    assert_frame_equal(out, expected)


def test_rolling_quantile_nearest_with_nulls_23932() -> None:
    lf = pl.LazyFrame(
        {
            "index": [0, 1, 2, 3, 4, 5, 6],
            "a": [None, None, 1.0, 2.0, 3.0, None, None],
        }
    )
    # values (period="3i", quantile=0.7) are chosen to trigger index rounding
    out = (
        lf.rolling("index", period="3i")
        .agg(pl.col("a").quantile(0.7, interpolation="nearest"))
        .collect()
    )
    expected = pl.Series("a", [None, None, 1.0, 2.0, 2.0, 3.0, 3.0])
    assert_series_equal(out["a"], expected)


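# With min_samples < window_size, weighted kernels use only the weights that
# overlap the available values: the weighted mean renormalises them, while the
# weighted sum does not.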
def test_wtd_min_periods_less_window() -> None:
    df = pl.DataFrame({"a": [1, 2, 3, 4, 5]}).with_columns(
        pl.col("a")
        .rolling_mean(
            window_size=3, weights=[0.25, 0.5, 0.25], min_samples=2, center=True
        )
        .alias("kernel_mean")
    )

    expected = pl.DataFrame(
        {"a": [1, 2, 3, 4, 5], "kernel_mean": [1.333333, 2, 3, 4, 4.666667]}
    )

    assert_frame_equal(df, expected)

    df = pl.DataFrame({"a": [1, 2, 3, 4, 5]}).with_columns(
        pl.col("a")
        .rolling_sum(
            window_size=3, weights=[0.25, 0.5, 0.25], min_samples=2, center=True
        )
        .alias("kernel_sum")
    )
    expected = pl.DataFrame(
        {"a": [1, 2, 3, 4, 5], "kernel_sum": [1.0, 2.0, 3.0, 4.0, 3.5]}
    )

    df = pl.DataFrame({"a": [1, 2, 3, 4, 5]}).with_columns(
        pl.col("a")
        .rolling_mean(
            window_size=3, weights=[0.2, 0.3, 0.5], min_samples=2, center=False
        )
        .alias("kernel_mean")
    )

    expected = pl.DataFrame(
        {"a": [1, 2, 3, 4, 5], "kernel_mean": [None, 1.625, 2.3, 3.3, 4.3]}
    )

    assert_frame_equal(df, expected)

    df = pl.DataFrame({"a": [1, 2]}).with_columns(
        pl.col("a")
        .rolling_mean(
            window_size=3, weights=[0.25, 0.5, 0.25], min_samples=2, center=True
        )
        .alias("kernel_mean")
    )

    # Handle edge case where the window size is larger than the number of elements
    expected = pl.DataFrame({"a": [1, 2], "kernel_mean": [1.333333, 1.666667]})
    assert_frame_equal(df, expected)

    df = pl.DataFrame({"a": [1, 2]}).with_columns(
        pl.col("a")
        .rolling_mean(
            window_size=3, weights=[0.25, 0.25, 0.5], min_samples=1, center=False
        )
        .alias("kernel_mean")
    )

    expected = pl.DataFrame({"a": [1, 2], "kernel_mean": [1.0, 2 * 2 / 3 + 1 * 1 / 3]})

    df = pl.DataFrame({"a": [1]}).with_columns(
        pl.col("a")
        .rolling_sum(
            6, center=True, min_samples=0, weights=[1, 10, 100, 1000, 10_000, 100_000]
        )
        .alias("kernel_sum")
    )
    expected = pl.DataFrame({"a": [1], "kernel_sum": [1000.0]})
    assert_frame_equal(df, expected)


def test_rolling_median_23480() -> None:
    vals = [None] * 17 + [3262645.8, 856191.4, 1635379.0, 34707156.0]
    evals = [None] * 19 + [1635379.0, (3262645.8 + 1635379.0) / 2]
    out = pl.DataFrame({"a": vals}).select(
        r15=pl.col("a").rolling_median(15, min_samples=3),
        r17=pl.col("a").rolling_median(17, min_samples=3),
    )
    expected = pl.DataFrame({"r15": evals, "r17": evals})
    assert_frame_equal(out, expected)


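# Cross-check rolling_sum against a naive Python reference on data mixing NaN,
# +/-inf and (optionally) nulls: nulls contribute nothing to the sum and do not
# count toward min_samples, while NaN/inf propagate through the window sum.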
@pytest.mark.slow
@pytest.mark.parametrize("with_nulls", [True, False])
def test_rolling_sum_non_finite_23115(with_nulls: bool) -> None:
    values: list[float | None] = [
        0.0,
        float("nan"),
        float("inf"),
        -float("inf"),
        42.0,
        -3.0,
    ]
    if with_nulls:
        values.append(None)
    data = random.choices(values, k=1000)
    naive = [
        (
            sum(0 if x is None else x for x in data[max(0, i + 1 - 4) : i + 1])
            if sum(x is not None for x in data[max(0, i + 1 - 4) : i + 1]) >= 2
            else None
        )
        for i in range(1000)
    ]
    assert_series_equal(pl.Series(data).rolling_sum(4, min_samples=2), pl.Series(naive))


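# Oracle for rolling_rank: evaluate rank() inside an explicit .rolling() window
# over a row index and take the window's last element, i.e. the rank of the
# newest value within its own window.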
@pytest.mark.parametrize(
1961
("method", "out_dtype"),
1962
[
1963
("average", pl.Float64),
1964
("min", get_index_type()),
1965
("max", get_index_type()),
1966
("dense", get_index_type()),
1967
],
1968
)
1969
@given(
1970
s=series(
1971
name="a",
1972
allowed_dtypes=NUMERIC_DTYPES + TEMPORAL_DTYPES + [pl.Boolean],
1973
min_size=1,
1974
max_size=50,
1975
),
1976
window_size=st.integers(1, 50),
1977
)
1978
def test_rolling_rank(
1979
s: pl.Series,
1980
window_size: int,
1981
method: RankMethod,
1982
out_dtype: pl.DataType,
1983
) -> None:
1984
df = pl.DataFrame({"a": s})
1985
expected = (
1986
df.with_row_index()
1987
.with_columns(
1988
a=pl.col("a")
1989
.rank(method=method)
1990
.rolling(index_column="index", period=f"{window_size}i")
1991
.list.last()
1992
.cast(out_dtype)
1993
)
1994
.drop("index")
1995
)
1996
actual = df.lazy().select(
1997
pl.col("a").rolling_rank(
1998
window_size=window_size, method=method, seed=0, min_samples=1
1999
)
2000
)
2001
assert actual.collect_schema() == actual.collect().schema
2002
assert_frame_equal(actual.collect(), expected)
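    # Note: `expected` is an oracle built from independent primitives; ranking
    # each full window via `rank` + `rolling` and taking the rank of the
    # window's last (newest) element should match `rolling_rank` with
    # min_samples=1.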


@pytest.mark.parametrize("center", [False, True])
@given(
    s=series(
        name="a",
        allowed_dtypes=NUMERIC_DTYPES + TEMPORAL_DTYPES + [pl.Boolean],
        min_size=1,
        max_size=50,
    ),
    window_size=st.integers(1, 50),
)
def test_rolling_rank_method_random(
    s: pl.Series, window_size: int, center: bool
) -> None:
    df = pl.DataFrame({"a": s})
    actual = df.lazy().with_columns(
        lo=pl.col("a").rolling_rank(
            window_size=window_size, method="min", center=center
        ),
        hi=pl.col("a").rolling_rank(
            window_size=window_size, method="max", center=center
        ),
        random=pl.col("a").rolling_rank(
            window_size=window_size,
            method="random",
            center=center,
        ),
    )

    assert actual.collect_schema() == actual.collect().schema, (
        f"expected {actual.collect_schema()}, got {actual.collect().schema}"
    )
    assert (
        actual.select(
            (
                (pl.col("lo") <= pl.col("random")) & (pl.col("random") <= pl.col("hi"))
            ).all()
        )
        .collect()
        .item()
    )
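    # Note: method="random" breaks ties by assigning each tied element a
    # distinct rank drawn from the tied range, so every result must lie
    # between the "min" and "max" tie-breaking ranks, which is the invariant
    # asserted above.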


@pytest.mark.parametrize("op", [pl.Expr.rolling_mean, pl.Expr.rolling_median])
def test_rolling_mean_median_temporals(op: Callable[..., pl.Expr]) -> None:
    tz = ZoneInfo("Asia/Tokyo")
    # We use ms to verify that the correct time unit is propagated.
    dt = pl.Datetime("ms", "Asia/Tokyo")
    lf = pl.LazyFrame(
        {
            "int": [1, 2, 3, 4, 5],
            "date": [date(2025, 1, x) for x in range(1, 6)],
            "datetime": [datetime(2025, 1, x) for x in range(1, 6)],
            "datetime_tu_tz": pl.Series(
                [datetime(2025, 1, x, tzinfo=tz) for x in range(1, 6)], dtype=dt
            ),
            "duration": pl.Series(
                [timedelta(hours=x) for x in range(1, 6)], dtype=pl.Duration("ms")
            ),
            "time": [time(hour=x) for x in range(1, 6)],
        }
    )
    result = lf.select(
        rolling_date=op(pl.col("date"), window_size=4),
        rolling_datetime=op(pl.col("datetime"), window_size=4),
        rolling_datetime_tu_tz=op(pl.col("datetime_tu_tz"), window_size=4),
        rolling_duration=op(pl.col("duration"), window_size=4),
        rolling_time=op(pl.col("time"), window_size=4),
    )
    expected = pl.DataFrame(
        {
            "rolling_date": pl.Series(
                [None, None, None, datetime(2025, 1, 2, 12), datetime(2025, 1, 3, 12)],
                dtype=pl.Datetime,
            ),
            "rolling_datetime": pl.Series(
                [None, None, None, datetime(2025, 1, 2, 12), datetime(2025, 1, 3, 12)]
            ),
            "rolling_datetime_tu_tz": pl.Series(
                [
                    None,
                    None,
                    None,
                    datetime(2025, 1, 2, 12, tzinfo=tz),
                    datetime(2025, 1, 3, 12, tzinfo=tz),
                ],
                dtype=dt,
            ),
            "rolling_duration": pl.Series(
                [None, None, None, timedelta(hours=2.5), timedelta(hours=3.5)],
                dtype=pl.Duration("ms"),
            ),
            "rolling_time": [
                None,
                None,
                None,
                time(hour=2, minute=30),
                time(hour=3, minute=30),
            ],
        }
    )
    assert result.collect_schema() == pl.Schema(
        {  # type: ignore[arg-type]
            "rolling_date": pl.Datetime("us"),
            "rolling_datetime": pl.Datetime("us"),
            "rolling_datetime_tu_tz": dt,
            "rolling_duration": pl.Duration("ms"),
            "rolling_time": pl.Time,
        }
    )
    assert_frame_equal(result.collect(), expected)
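    # Note: per the schema assertion above, a rolling mean/median of a Date
    # column promotes to Datetime("us") (day midpoints are not representable
    # as Date), while Datetime, Duration and Time inputs keep their original
    # time unit and time zone.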


@pytest.mark.parametrize(
    "op",
    [
        (pl.Expr.mean, pl.Expr.rolling_mean_by),
        (pl.Expr.median, pl.Expr.rolling_median_by),
    ],
)
def test_rolling_agg_mean_median_temporal(
    op: tuple[Callable[..., pl.Expr], Callable[..., pl.Expr]],
) -> None:
    tz = ZoneInfo("Asia/Tokyo")
    # We use ms to verify that the correct time unit is propagated.
    dt = pl.Datetime("ms", "Asia/Tokyo")
    lf = pl.LazyFrame(
        {
            "index": [1, 2, 3, 4, 5],
            "int": [1, 2, 3, 4, 5],
            "date": [date(2025, 1, x) for x in range(1, 6)],
            "datetime": [datetime(2025, 1, x) for x in range(1, 6)],
            "datetime_tu_tz": pl.Series(
                [datetime(2025, 1, x, tzinfo=tz) for x in range(1, 6)], dtype=dt
            ),
            "duration": pl.Series(
                [timedelta(hours=x) for x in range(1, 6)], dtype=pl.Duration("ms")
            ),
            "time": [time(hour=x) for x in range(1, 6)],
        }
    )

    # Using rolling(...).agg()
    result1 = lf.rolling("index", period="4i").agg(
        rolling_int=op[0](pl.col("int")),
        rolling_date=op[0](pl.col("date")),
        rolling_datetime=op[0](pl.col("datetime")),
        rolling_datetime_tu_tz=op[0](pl.col("datetime_tu_tz")),
        rolling_duration=op[0](pl.col("duration")),
        rolling_time=op[0](pl.col("time")),
    )
    # Using rolling_mean_by() / rolling_median_by()
    result2 = lf.select(
        "index",
        rolling_int=op[1](pl.col("int"), "index", window_size="4i"),
        rolling_date=op[1](pl.col("date"), "index", window_size="4i"),
        rolling_datetime=op[1](pl.col("datetime"), "index", window_size="4i"),
        rolling_datetime_tu_tz=op[1](
            pl.col("datetime_tu_tz"), "index", window_size="4i"
        ),
        rolling_duration=op[1](pl.col("duration"), "index", window_size="4i"),
        rolling_time=op[1](pl.col("time"), "index", window_size="4i"),
    )
    expected = pl.DataFrame(
        {
            "index": [1, 2, 3, 4, 5],
            "rolling_int": [1.0, 1.5, 2.0, 2.5, 3.5],
            "rolling_date": pl.Series(
                [
                    datetime(2025, 1, 1),
                    datetime(2025, 1, 1, 12),
                    datetime(2025, 1, 2),
                    datetime(2025, 1, 2, 12),
                    datetime(2025, 1, 3, 12),
                ]
            ),
            "rolling_datetime": pl.Series(
                [
                    datetime(2025, 1, 1),
                    datetime(2025, 1, 1, 12),
                    datetime(2025, 1, 2),
                    datetime(2025, 1, 2, 12),
                    datetime(2025, 1, 3, 12),
                ]
            ),
            "rolling_datetime_tu_tz": pl.Series(
                [
                    datetime(2025, 1, 1, tzinfo=tz),
                    datetime(2025, 1, 1, 12, tzinfo=tz),
                    datetime(2025, 1, 2, tzinfo=tz),
                    datetime(2025, 1, 2, 12, tzinfo=tz),
                    datetime(2025, 1, 3, 12, tzinfo=tz),
                ],
                dtype=dt,
            ),
            "rolling_duration": pl.Series(
                [
                    timedelta(hours=1),
                    timedelta(hours=1.5),
                    timedelta(hours=2),
                    timedelta(hours=2.5),
                    timedelta(hours=3.5),
                ],
                dtype=pl.Duration("ms"),
            ),
            "rolling_time": [
                time(hour=1),
                time(hour=1, minute=30),
                time(hour=2),
                time(hour=2, minute=30),
                time(hour=3, minute=30),
            ],
        }
    )
    expected_schema = pl.Schema(
        {  # type: ignore[arg-type]
            "index": pl.Int64,
            "rolling_int": pl.Float64,
            "rolling_date": pl.Datetime("us"),
            "rolling_datetime": pl.Datetime("us"),
            "rolling_datetime_tu_tz": dt,
            "rolling_duration": pl.Duration("ms"),
            "rolling_time": pl.Time,
        }
    )
    assert result1.collect_schema() == expected_schema
    assert result2.collect_schema() == expected_schema
    assert_frame_equal(result1.collect(), expected)
    assert_frame_equal(result2.collect(), expected)


@pytest.mark.parametrize(
    ("df", "expected"),
    [
        (
            pl.DataFrame(
                {"a": [1, 2, 3, 4], "offset": [0, 0, 0, 0], "len": [3, 1, 2, 1]}
            ),
            pl.DataFrame({"a": [6, 2, 7, 4]}),
        ),
        (
            pl.DataFrame(
                {
                    "a": [1, 2, 3, 4, 5, 6],
                    "offset": [0, 0, 2, 0, 0, 0],
                    "len": [3, 1, 3, 3, 1, 1],
                }
            ),
            pl.DataFrame({"a": [6, 2, 11, 15, 5, 6]}),
        ),
        (
            pl.DataFrame(
                {"a": [1, 2, 3, None], "offset": [0, 0, 0, 0], "len": [3, 1, 2, 1]}
            ),
            pl.DataFrame({"a": [6, 2, 3, 0]}),
        ),
        (
            pl.DataFrame(
                {
                    "a": [1, 2, 3, 4, 5, None],
                    "offset": [0, 0, 2, 0, 0, 0],
                    "len": [3, 1, 3, 3, 1, 1],
                }
            ),
            pl.DataFrame({"a": [6, 2, 5, 9, 5, 0]}),
        ),
    ],
)
def test_rolling_agg_sum_varying_slice_25434(
    df: pl.DataFrame, expected: pl.DataFrame
) -> None:
    out = df.with_row_index().select(
        pl.col("a")
        .slice(pl.col("offset").first(), pl.col("len").first())
        .sum()
        .rolling("index", period=f"{df.height}i", offset="0i", closed="left")
    )
    assert_frame_equal(out, expected)
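    # Note: with period equal to the frame height, offset="0i" and
    # closed="left", the window for row i spans rows i..n-1, and each output
    # sums that window after slicing it by row i's own offset/len. For
    # example, in the second case row 2 slices its window [3, 4, 5, 6] with
    # offset 2 and len 3, summing [5, 6] (truncated at the edge) to get 11.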


@pytest.mark.parametrize("with_nulls", [True, False])
def test_rolling_agg_sum_varying_slice_fuzz(with_nulls: bool) -> None:
    n = 1000
    max_rand = 10

    def opt_null(x: int) -> int | None:
        return None if random.randint(0, max_rand) == max_rand and with_nulls else x

    df = pl.DataFrame(
        {
            "a": [opt_null(i) for i in range(n)],
            "offset": [random.randint(0, max_rand) for _ in range(n)],
            "length": [random.randint(0, max_rand) for _ in range(n)],
        }
    )

    out = df.with_row_index().select(
        pl.col("a")
        .slice(pl.col("offset").first(), pl.col("length").first())
        .sum()
        .rolling("index", period=f"{df.height}i", offset="0i", closed="left")
    )

    out = out.select(pl.col("a").fill_null(0))
    df = df.with_columns(pl.col("a").fill_null(0))

    (a, offset, length) = (
        df["a"].to_list(),
        df["offset"].to_list(),
        df["length"].to_list(),
    )
    # Naive reference: the window for row i spans rows i..n-1, so its slice
    # starts at absolute position i + offset[i].
    expected = [sum(a[i + offset[i] : i + offset[i] + length[i]]) for i in range(n)]
    assert_frame_equal(out, pl.DataFrame({"a": expected}))


def test_rolling_midpoint_25793() -> None:
    df = pl.DataFrame({"i": [1, 2, 3, 4], "x": [1, 2, 3, 4]})

    out = df.select(
        pl.col.x.quantile(0.5, interpolation="midpoint").rolling("i", period="4i")
    )
    expected = pl.DataFrame({"x": [1.0, 1.5, 2.0, 2.5]})
    assert_frame_equal(out, expected)

    out = df.select(
        pl.col.x.cumulative_eval(pl.element().quantile(0.5, interpolation="midpoint"))
    )
    assert_frame_equal(out, expected)
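    # Note: the window at row k holds [1, ..., k]; "midpoint" interpolation
    # averages the two values straddling the 0.5 quantile, giving 1.0, 1.5,
    # 2.0 and 2.5, and cumulative_eval over the growing prefix computes the
    # same thing.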


def test_rolling_rank_closed_left_26147() -> None:
    df = pl.DataFrame(
        {
            "date": [datetime(2025, 1, 1), datetime(2025, 1, 1)],
            "x": [0, 1],
            "x_flipped": [1, 0],
        }
    )
    actual = df.with_columns(
        x_ranked=pl.col("x").rolling_rank_by("date", "2d"),
        x_flipped_ranked=pl.col("x_flipped").rolling_rank_by("date", "2d"),
    )
    expected = df.with_columns(
        x_ranked=pl.Series([1.0, 2.0]),
        x_flipped_ranked=pl.Series([2.0, 1.0]),
    )
    assert_frame_equal(actual, expected)