Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/py-polars/tests/unit/operations/rolling/test_rolling.py
6940 views
1
from __future__ import annotations
2
3
import random
4
import sys
5
from datetime import date, datetime, timedelta
6
from typing import TYPE_CHECKING
7
8
import hypothesis.strategies as st
9
import numpy as np
10
import pytest
11
from hypothesis import assume, given
12
from numpy import nan
13
14
import polars as pl
15
from polars._utils.convert import parse_as_duration_string
16
from polars.exceptions import ComputeError, InvalidOperationError
17
from polars.testing import assert_frame_equal, assert_series_equal
18
from polars.testing.parametric import column, dataframes
19
from polars.testing.parametric.strategies.dtype import _time_units
20
from tests.unit.conftest import INTEGER_DTYPES
21
22
if TYPE_CHECKING:
23
from hypothesis.strategies import SearchStrategy
24
25
from polars._typing import (
26
ClosedInterval,
27
PolarsDataType,
28
QuantileMethod,
29
TimeUnit,
30
)
31
32
33
@pytest.fixture
def example_df() -> pl.DataFrame:
    """Small fixture frame: an increasing (gappy) datetime column plus 0..4."""
    timestamps = [
        datetime(2021, 1, 1),
        datetime(2021, 1, 2),
        datetime(2021, 1, 4),
        datetime(2021, 1, 5),
        datetime(2021, 1, 7),
    ]
    return pl.DataFrame({"dt": timestamps, "values": pl.arange(0, 5, eager=True)})
47
48
49
@pytest.mark.parametrize(
    "period",
    ["1d", "2d", "3d", timedelta(days=1), timedelta(days=2), timedelta(days=3)],
)
@pytest.mark.parametrize("closed", ["left", "right", "none", "both"])
def test_rolling_kernels_and_rolling(
    example_df: pl.DataFrame, period: str | timedelta, closed: ClosedInterval
) -> None:
    """The `rolling_*_by` kernels must agree with `rolling(...).agg(...)`.

    Runs both code paths with the same period (string and timedelta forms)
    and closed-interval setting, then compares frames column by column.
    """
    out1 = example_df.set_sorted("dt").select(
        pl.col("dt"),
        # this differs from group_by aggregation because the empty window is
        # null here
        # where the sum aggregation of an empty set is 0
        pl.col("values")
        .rolling_sum_by("dt", period, closed=closed)
        .fill_null(0)
        .alias("sum"),
        pl.col("values").rolling_var_by("dt", period, closed=closed).alias("var"),
        pl.col("values").rolling_mean_by("dt", period, closed=closed).alias("mean"),
        pl.col("values").rolling_std_by("dt", period, closed=closed).alias("std"),
        pl.col("values")
        .rolling_quantile_by("dt", period, quantile=0.2, closed=closed)
        .alias("quantile"),
    )
    out2 = (
        example_df.set_sorted("dt")
        .rolling("dt", period=period, closed=closed)
        .agg(
            [
                pl.col("values").sum().alias("sum"),
                pl.col("values").var().alias("var"),
                pl.col("values").mean().alias("mean"),
                pl.col("values").std().alias("std"),
                pl.col("values").quantile(quantile=0.2).alias("quantile"),
            ]
        )
    )
    assert_frame_equal(out1, out2)
87
88
89
@pytest.mark.parametrize(
    ("offset", "closed", "expected_values"),
    [
        pytest.param(
            "-1d",
            "left",
            [[1], [1, 2], [2, 3], [3, 4]],
            id="partial lookbehind, left",
        ),
        pytest.param(
            "-1d",
            "right",
            [[1, 2], [2, 3], [3, 4], [4]],
            id="partial lookbehind, right",
        ),
        pytest.param(
            "-1d",
            "both",
            [[1, 2], [1, 2, 3], [2, 3, 4], [3, 4]],
            id="partial lookbehind, both",
        ),
        pytest.param(
            "-1d",
            "none",
            [[1], [2], [3], [4]],
            id="partial lookbehind, none",
        ),
        pytest.param(
            "-2d",
            "left",
            [[], [1], [1, 2], [2, 3]],
            id="full lookbehind, left",
        ),
        pytest.param(
            "-3d",
            "left",
            [[], [], [1], [1, 2]],
            id="full lookbehind, offset > period, left",
        ),
        pytest.param(
            "-3d",
            "right",
            [[], [1], [1, 2], [2, 3]],
            id="full lookbehind, right",
        ),
        pytest.param(
            "-3d",
            "both",
            [[], [1], [1, 2], [1, 2, 3]],
            id="full lookbehind, both",
        ),
        pytest.param(
            "-2d",
            "none",
            [[], [1], [2], [3]],
            id="full lookbehind, none",
        ),
        pytest.param(
            "-3d",
            "none",
            [[], [], [1], [2]],
            id="full lookbehind, none",
        )
        if False
        else pytest.param(
            "-3d",
            "none",
            [[], [], [1], [2]],
            id="full lookbehind, offset > period, none",
        ),
    ],
)
def test_rolling_negative_offset(
    offset: str, closed: ClosedInterval, expected_values: list[list[int]]
) -> None:
    """Negative offsets place the 2-day window behind each index timestamp.

    A four-day daily range is rolled with `period="2d"` and the given
    negative `offset`; the aggregated window contents must match
    `expected_values` row for row.
    """
    df = pl.DataFrame(
        {
            "ts": pl.datetime_range(
                datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True
            ),
            "value": [1, 2, 3, 4],
        }
    )
    result = df.rolling("ts", period="2d", offset=offset, closed=closed).agg(
        pl.col("value")
    )
    expected = pl.DataFrame(
        {
            "ts": pl.datetime_range(
                datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True
            ),
            "value": expected_values,
        }
    )
    assert_frame_equal(result, expected)
177
178
179
def test_rolling_skew() -> None:
    """Window-of-4 skewness, with and without the bias correction."""
    s = pl.Series([1, 2, 3, 3, 2, 10, 8])
    expected_by_bias = {
        True: [
            None,
            None,
            None,
            -0.49338220021815865,
            0.0,
            1.097025449363867,
            0.09770939201338157,
        ],
        False: [
            None,
            None,
            None,
            -0.8545630383279711,
            0.0,
            1.9001038154942962,
            0.16923763134384154,
        ],
    }
    for bias, expected in expected_by_bias.items():
        result = s.rolling_skew(window_size=4, bias=bias).to_list()
        assert result == pytest.approx(expected)
204
205
206
def test_rolling_kurtosis() -> None:
    """Window-of-4 kurtosis in both Fisher (default) and Pearson form."""
    s = pl.Series([1, 2, 3, 3, 2, 10, 8])

    fisher_expected = [
        None,
        None,
        None,
        -1.371900826446281,
        -1.9999999999999991,
        -0.7055324211778693,
        -1.7878967572797346,
    ]
    result = s.rolling_kurtosis(window_size=4, bias=True).to_list()
    assert result == pytest.approx(fisher_expected)

    # fisher=False gives the Pearson definition (no -3 shift).
    pearson_expected = [
        None,
        None,
        None,
        1.628099173553719,
        1.0000000000000009,
        2.2944675788221307,
        1.2121032427202654,
    ]
    result = s.rolling_kurtosis(window_size=4, bias=True, fisher=False).to_list()
    assert result == pytest.approx(pearson_expected)
232
233
234
@pytest.mark.parametrize("time_zone", [None, "US/Central"])
@pytest.mark.parametrize(
    ("rolling_fn", "expected_values", "expected_dtype"),
    [
        ("rolling_mean_by", [None, 1.0, 2.0, 3.0, 4.0, 5.0], pl.Float64),
        ("rolling_sum_by", [None, 1, 2, 3, 4, 5], pl.Int64),
        ("rolling_min_by", [None, 1, 2, 3, 4, 5], pl.Int64),
        ("rolling_max_by", [None, 1, 2, 3, 4, 5], pl.Int64),
        ("rolling_std_by", [None, None, None, None, None, None], pl.Float64),
        ("rolling_var_by", [None, None, None, None, None, None], pl.Float64),
    ],
)
def test_rolling_crossing_dst(
    time_zone: str | None,
    rolling_fn: str,
    expected_values: list[int | None | float],
    expected_dtype: PolarsDataType,
) -> None:
    """Every rolling-by kernel yields the same values with/without a time zone.

    The daily range 2021-11-05..2021-11-10 is localized to US/Central (a
    range that spans early November, when US DST changes) and the same
    expectations must hold as for the tz-naive case.
    """
    ts = pl.datetime_range(
        datetime(2021, 11, 5), datetime(2021, 11, 10), "1d", time_zone="UTC", eager=True
    ).dt.replace_time_zone(time_zone)
    df = pl.DataFrame({"ts": ts, "value": [1, 2, 3, 4, 5, 6]})

    # Dispatch dynamically so one test body covers all six kernels.
    result = df.with_columns(
        getattr(pl.col("value"), rolling_fn)(by="ts", window_size="1d", closed="left")
    )

    expected = pl.DataFrame(
        {"ts": ts, "value": expected_values}, schema_overrides={"value": expected_dtype}
    )
    assert_frame_equal(result, expected)
265
266
267
def test_rolling_by_invalid() -> None:
    """Unsupported `by` dtypes and bad window strings raise clear errors."""
    frame = pl.DataFrame(
        {"a": [1, 2, 3], "b": [4, 5, 6]}, schema_overrides={"a": pl.Int16}
    ).sort("a")
    expected_msg = "unsupported data type: i16 for temporal/index column, expected UInt64, UInt32, Int64, Int32, Datetime, Date, Duration, or Time"
    with pytest.raises(InvalidOperationError, match=expected_msg):
        frame.select(pl.col("b").rolling_min_by("a", "2i"))

    # An "Ni" (index-count) window is invalid against a Date column.
    frame = pl.DataFrame({"a": [1, 2, 3], "b": [date(2020, 1, 1)] * 3}).sort("b")
    expected_msg = "`window_size` duration may not be a parsed integer"
    with pytest.raises(InvalidOperationError, match=expected_msg):
        frame.select(pl.col("a").rolling_min_by("b", "2i"))
278
279
280
def test_rolling_infinity() -> None:
    """-inf flows through rolling_mean instead of being dropped or NaN-ed."""
    values = pl.Series("col", ["-inf", "5", "5"]).cast(pl.Float64)
    rolled = values.rolling_mean(2)
    expected = pl.Series("col", [None, "-inf", "5"]).cast(pl.Float64)
    assert_series_equal(rolled, expected)
285
286
287
def test_rolling_by_non_temporal_window_size() -> None:
    """An "Ni" window string is rejected when the `by` column is temporal."""
    frame = pl.DataFrame(
        {"a": [4, 5, 6], "b": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)]}
    ).sort("a", "b")
    expected_msg = "`window_size` duration may not be a parsed integer"
    with pytest.raises(InvalidOperationError, match=expected_msg):
        frame.with_columns(pl.col("a").rolling_sum_by("b", "2i", closed="left"))
294
295
296
@pytest.mark.parametrize(
    "dtype",
    [
        pl.UInt8,
        pl.Int64,
        pl.Float32,
        pl.Float64,
        pl.Time,
        pl.Date,
        pl.Datetime("ms"),
        pl.Datetime("us"),
        pl.Datetime("ns"),
        pl.Datetime("ns", "Asia/Kathmandu"),
        pl.Duration("ms"),
        pl.Duration("us"),
        pl.Duration("ns"),
    ],
)
def test_rolling_extrema(dtype: PolarsDataType) -> None:
    """rolling_min/rolling_max over many dtypes, with and without nulls.

    Builds an ascending column, a descending column, and null-prefixed
    copies of each, then checks fixed-size windows of 3 on sorted and on
    shuffled data.
    """
    # sorted data and nulls flags trigger different kernels
    df = (
        (
            pl.DataFrame(
                {
                    "col1": pl.int_range(0, 7, eager=True),
                    "col2": pl.int_range(0, 7, eager=True).reverse(),
                }
            )
        )
        .with_columns(
            # Null out the first two rows in the "*_nulls" copies.
            pl.when(pl.int_range(0, pl.len(), eager=False) < 2)
            .then(None)
            .otherwise(pl.all())
            .name.keep()
            .name.suffix("_nulls")
        )
        .cast(dtype)
    )

    expected = {
        "col1": [None, None, 0, 1, 2, 3, 4],
        "col2": [None, None, 4, 3, 2, 1, 0],
        "col1_nulls": [None, None, None, None, 2, 3, 4],
        "col2_nulls": [None, None, None, None, 2, 1, 0],
    }
    result = df.select([pl.all().rolling_min(3)])
    assert result.to_dict(as_series=False) == {
        k: pl.Series(v, dtype=dtype).to_list() for k, v in expected.items()
    }

    expected = {
        "col1": [None, None, 2, 3, 4, 5, 6],
        "col2": [None, None, 6, 5, 4, 3, 2],
        "col1_nulls": [None, None, None, None, 4, 5, 6],
        "col2_nulls": [None, None, None, None, 4, 3, 2],
    }
    result = df.select([pl.all().rolling_max(3)])
    assert result.to_dict(as_series=False) == {
        k: pl.Series(v, dtype=dtype).to_list() for k, v in expected.items()
    }

    # shuffled data triggers other kernels
    df = df.select([pl.all().shuffle(seed=0)])
    expected = {
        "col1": [None, None, 0, 0, 4, 1, 1],
        "col2": [None, None, 1, 1, 0, 0, 0],
        "col1_nulls": [None, None, None, None, 4, None, None],
        "col2_nulls": [None, None, None, None, 0, None, None],
    }
    result = df.select([pl.all().rolling_min(3)])
    assert result.to_dict(as_series=False) == {
        k: pl.Series(v, dtype=dtype).to_list() for k, v in expected.items()
    }
    result = df.select([pl.all().rolling_max(3)])

    expected = {
        "col1": [None, None, 5, 5, 6, 6, 6],
        "col2": [None, None, 6, 6, 2, 5, 5],
        "col1_nulls": [None, None, None, None, 6, None, None],
        "col2_nulls": [None, None, None, None, 2, None, None],
    }
    assert result.to_dict(as_series=False) == {
        k: pl.Series(v, dtype=dtype).to_list() for k, v in expected.items()
    }
380
381
382
@pytest.mark.parametrize(
    "dtype",
    [
        pl.UInt8,
        pl.Int64,
        pl.Float32,
        pl.Float64,
        pl.Time,
        pl.Date,
        pl.Datetime("ms"),
        pl.Datetime("us"),
        pl.Datetime("ns"),
        pl.Datetime("ns", "Asia/Kathmandu"),
        pl.Duration("ms"),
        pl.Duration("us"),
        pl.Duration("ns"),
    ],
)
def test_rolling_group_by_extrema(dtype: PolarsDataType) -> None:
    """min/max/first/last inside rolling("index", period="3i") windows.

    Exercises descending, ascending, and shuffled value orders against the
    same sorted index column, across a wide range of dtypes.
    """
    # descending values — ensure we hit different branches so create
    df = pl.DataFrame(
        {
            "col1": pl.arange(0, 7, eager=True).reverse(),
        }
    ).with_columns(
        pl.col("col1").reverse().alias("index"),
        pl.col("col1").cast(dtype),
    )

    expected = {
        "col1_list": pl.Series(
            [
                [6],
                [6, 5],
                [6, 5, 4],
                [5, 4, 3],
                [4, 3, 2],
                [3, 2, 1],
                [2, 1, 0],
            ],
            dtype=pl.List(dtype),
        ).to_list(),
        "col1_min": pl.Series([6, 5, 4, 3, 2, 1, 0], dtype=dtype).to_list(),
        "col1_max": pl.Series([6, 6, 6, 5, 4, 3, 2], dtype=dtype).to_list(),
        "col1_first": pl.Series([6, 6, 6, 5, 4, 3, 2], dtype=dtype).to_list(),
        "col1_last": pl.Series([6, 5, 4, 3, 2, 1, 0], dtype=dtype).to_list(),
    }
    result = (
        df.rolling(
            index_column="index",
            period="3i",
        )
        .agg(
            [
                pl.col("col1").name.suffix("_list"),
                pl.col("col1").min().name.suffix("_min"),
                pl.col("col1").max().name.suffix("_max"),
                pl.col("col1").first().alias("col1_first"),
                pl.col("col1").last().alias("col1_last"),
            ]
        )
        .select(["col1_list", "col1_min", "col1_max", "col1_first", "col1_last"])
    )
    assert result.to_dict(as_series=False) == expected

    # ascending order

    df = pl.DataFrame(
        {
            "col1": pl.arange(0, 7, eager=True),
        }
    ).with_columns(
        pl.col("col1").alias("index"),
        pl.col("col1").cast(dtype),
    )

    result = (
        df.rolling(
            index_column="index",
            period="3i",
        )
        .agg(
            [
                pl.col("col1").name.suffix("_list"),
                pl.col("col1").min().name.suffix("_min"),
                pl.col("col1").max().name.suffix("_max"),
                pl.col("col1").first().alias("col1_first"),
                pl.col("col1").last().alias("col1_last"),
            ]
        )
        .select(["col1_list", "col1_min", "col1_max", "col1_first", "col1_last"])
    )
    expected = {
        "col1_list": pl.Series(
            [
                [0],
                [0, 1],
                [0, 1, 2],
                [1, 2, 3],
                [2, 3, 4],
                [3, 4, 5],
                [4, 5, 6],
            ],
            dtype=pl.List(dtype),
        ).to_list(),
        "col1_min": pl.Series([0, 0, 0, 1, 2, 3, 4], dtype=dtype).to_list(),
        "col1_max": pl.Series([0, 1, 2, 3, 4, 5, 6], dtype=dtype).to_list(),
        "col1_first": pl.Series([0, 0, 0, 1, 2, 3, 4], dtype=dtype).to_list(),
        "col1_last": pl.Series([0, 1, 2, 3, 4, 5, 6], dtype=dtype).to_list(),
    }
    assert result.to_dict(as_series=False) == expected

    # shuffled data.
    df = pl.DataFrame(
        {
            "col1": pl.arange(0, 7, eager=True).shuffle(1),
        }
    ).with_columns(
        pl.col("col1").cast(dtype),
        pl.col("col1").sort().alias("index"),
    )

    result = (
        df.rolling(
            index_column="index",
            period="3i",
        )
        .agg(
            [
                pl.col("col1").min().name.suffix("_min"),
                pl.col("col1").max().name.suffix("_max"),
                pl.col("col1").name.suffix("_list"),
            ]
        )
        .select(["col1_list", "col1_min", "col1_max"])
    )
    expected = {
        "col1_list": pl.Series(
            [
                [4],
                [4, 2],
                [4, 2, 5],
                [2, 5, 1],
                [5, 1, 6],
                [1, 6, 0],
                [6, 0, 3],
            ],
            dtype=pl.List(dtype),
        ).to_list(),
        "col1_min": pl.Series([4, 2, 2, 1, 1, 0, 0], dtype=dtype).to_list(),
        "col1_max": pl.Series([4, 4, 5, 5, 6, 6, 6], dtype=dtype).to_list(),
    }
    assert result.to_dict(as_series=False) == expected
536
537
538
def test_rolling_slice_pushdown() -> None:
    """head() on a lazy rolling aggregation must survive slice pushdown."""
    lf = (
        pl.DataFrame({"a": [1, 2, 3], "b": ["a", "a", "b"], "c": [1, 3, 5]})
        .lazy()
        .sort("a")
        .rolling("a", group_by="b", period="2i")
        .agg([(pl.col("c") - pl.col("c").shift(fill_value=0)).sum().alias("c")])
    )
    expected = {"b": ["a", "a"], "a": [1, 2], "c": [1, 3]}
    assert lf.head(2).collect().to_dict(as_series=False) == expected
554
555
556
def test_overlapping_groups_4628() -> None:
    """`diff` and a manual shift-subtract agree inside overlapping windows.

    Each "3i" rolling window is aggregated two ways — `diff(n=1)` and
    `val - val.shift(1)` — and both must produce identical per-window lists.
    """
    df = pl.DataFrame(
        {
            "index": [1, 2, 3, 4, 5, 6],
            "val": [10, 20, 40, 70, 110, 160],
        }
    )
    assert (
        df.rolling(index_column=pl.col("index").set_sorted(), period="3i").agg(
            [
                pl.col("val").diff(n=1).alias("val.diff"),
                (pl.col("val") - pl.col("val").shift(1)).alias("val - val.shift"),
            ]
        )
    ).to_dict(as_series=False) == {
        "index": [1, 2, 3, 4, 5, 6],
        "val.diff": [
            [None],
            [None, 10],
            [None, 10, 20],
            [None, 20, 30],
            [None, 30, 40],
            [None, 40, 50],
        ],
        "val - val.shift": [
            [None],
            [None, 10],
            [None, 10, 20],
            [None, 20, 30],
            [None, 30, 40],
            [None, 40, 50],
        ],
    }
589
590
591
@pytest.mark.skipif(sys.platform == "win32", reason="Minor numerical diff")
def test_rolling_skew_lagging_null_5179() -> None:
    """rolling_skew with min_samples=1 over a series containing null runs.

    NaN outputs are mapped to -1.0 via fill_nan so the expected values can
    be compared exactly; windows that are entirely null stay null.
    """
    s = pl.Series([None, 3, 4, 1, None, None, None, None, 3, None, 5, 4, 7, 2, 1, None])
    result = s.rolling_skew(3, min_samples=1).fill_nan(-1.0)
    expected = pl.Series(
        [
            None,
            -1.0,
            0.0,
            -0.3818017741606059,
            0.0,
            -1.0,
            None,
            None,
            -1.0,
            -1.0,
            0.0,
            0.0,
            0.38180177416060695,
            0.23906314692954517,
            0.6309038567106234,
            0.0,
        ]
    )
    assert_series_equal(result, expected, check_names=False)
616
617
618
def test_rolling_var_numerical_stability_5197() -> None:
    """Rolling variance stays numerically stable across a level shift."""
    s = pl.Series([1.2] * 4 + [3.3] * 7)
    res = s.to_frame("a").with_columns(pl.col("a").rolling_var(5))[:, 0].to_list()

    # Leading windows are incomplete.
    assert res[:4] == [None] * 4
    expected_tail = [
        0.882,
        1.3229999999999997,
        1.3229999999999997,
        0.8819999999999983,
        0.0,
        0.0,
        0.0,
    ]
    assert res[4:] == pytest.approx(expected_tail)
633
634
635
def test_rolling_iter() -> None:
    """Iterating a rolling context yields (key, window-frame) pairs."""
    df = pl.DataFrame(
        {
            "date": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 5)],
            "a": [1, 2, 2],
            "b": [4, 5, 6],
        }
    ).set_sorted("date")

    # Without a group_by, the key is a 1-tuple holding the index value.
    observed = [
        (key[0], frame.shape)
        for key, frame in df.rolling(index_column="date", period="2d")
    ]
    assert observed == [
        (date(2020, 1, 1), (1, 3)),
        (date(2020, 1, 2), (2, 3)),
        (date(2020, 1, 5), (1, 3)),
    ]

    # With group_by, the key is (group value, index value).
    observed_grouped = [
        (key, frame.shape)
        for key, frame in df.rolling(index_column="date", period="2d", group_by="a")
    ]
    assert observed_grouped == [
        ((1, date(2020, 1, 1)), (1, 3)),
        ((2, date(2020, 1, 2)), (1, 3)),
        ((2, date(2020, 1, 5)), (1, 3)),
    ]
667
668
669
def test_rolling_negative_period() -> None:
    """Negative periods and window sizes are rejected, eagerly and lazily."""
    df = pl.DataFrame({"ts": [datetime(2020, 1, 1)], "value": [1]}).with_columns(
        pl.col("ts").set_sorted()
    )

    period_msg = "rolling window period should be strictly positive"
    with pytest.raises(ComputeError, match=period_msg):
        df.rolling("ts", period="-1d", offset="-1d").agg(pl.col("value"))
    with pytest.raises(ComputeError, match=period_msg):
        df.lazy().rolling("ts", period="-1d", offset="-1d").agg(
            pl.col("value")
        ).collect()

    window_msg = "`window_size` must be strictly positive"
    bad_expr = pl.col("value").rolling_min_by("ts", window_size="-1d", closed="left")
    with pytest.raises(InvalidOperationError, match=window_msg):
        df.select(bad_expr)
    with pytest.raises(InvalidOperationError, match=window_msg):
        df.lazy().select(bad_expr).collect()
695
696
697
def test_rolling_skew_window_offset() -> None:
    """A full-length skew window over x**2 matches the reference value."""
    squares = pl.arange(0, 20, eager=True) ** 2
    assert squares.rolling_skew(20)[-1] == 0.6612545648596286
701
702
703
def test_rolling_cov_corr() -> None:
    """rolling_cov / rolling_corr over a 3-wide window."""
    df = pl.DataFrame({"x": [3, 3, 3, 5, 8], "y": [3, 4, 4, 4, 8]})
    res = df.select(
        pl.rolling_cov("x", "y", window_size=3).alias("cov"),
        pl.rolling_corr("x", "y", window_size=3).alias("corr"),
    ).to_dict(as_series=False)

    # Incomplete leading windows are null.
    assert res["cov"][:2] == [None] * 2
    assert res["corr"][:2] == [None] * 2
    assert res["cov"][2:] == pytest.approx([0.0, 0.0, 5.333333333333336])
    assert res["corr"][2:] == pytest.approx([nan, 0.0, 0.9176629354822473], nan_ok=True)
714
715
716
def test_rolling_cov_corr_nulls() -> None:
    """A leading null in one input behaves like a one-row-shorter series.

    df2 is df1 with a row prepended whose `lag_a` is null; rolling_corr and
    rolling_cov (min_samples=5) must produce the same trailing value, shifted
    down by one row.
    """
    df1 = pl.DataFrame(
        {"a": [1.06, 1.07, 0.93, 0.78, 0.85], "lag_a": [1.0, 1.06, 1.07, 0.93, 0.78]}
    )
    df2 = pl.DataFrame(
        {
            "a": [1.0, 1.06, 1.07, 0.93, 0.78, 0.85],
            "lag_a": [None, 1.0, 1.06, 1.07, 0.93, 0.78],
        }
    )

    val_1 = df1.select(
        pl.rolling_corr("a", "lag_a", window_size=10, min_samples=5, ddof=1)
    )
    val_2 = df2.select(
        pl.rolling_corr("a", "lag_a", window_size=10, min_samples=5, ddof=1)
    )

    df1_expected = pl.DataFrame({"a": [None, None, None, None, 0.62204709]})
    df2_expected = pl.DataFrame({"a": [None, None, None, None, None, 0.62204709]})

    assert_frame_equal(val_1, df1_expected, abs_tol=0.0000001)
    assert_frame_equal(val_2, df2_expected, abs_tol=0.0000001)

    # Same check for covariance.
    val_1 = df1.select(
        pl.rolling_cov("a", "lag_a", window_size=10, min_samples=5, ddof=1)
    )
    val_2 = df2.select(
        pl.rolling_cov("a", "lag_a", window_size=10, min_samples=5, ddof=1)
    )

    df1_expected = pl.DataFrame({"a": [None, None, None, None, 0.009445]})
    df2_expected = pl.DataFrame({"a": [None, None, None, None, None, 0.009445]})

    assert_frame_equal(val_1, df1_expected, abs_tol=0.0000001)
    assert_frame_equal(val_2, df2_expected, abs_tol=0.0000001)
752
753
754
@pytest.mark.parametrize("time_unit", ["ms", "us", "ns"])
def test_rolling_empty_window_9406(time_unit: TimeUnit) -> None:
    """closed='left' windows with no preceding rows produce nulls."""
    datecol = pl.Series(
        "d",
        [datetime(2019, 1, day) for day in (16, 17, 18, 22, 23)],
        dtype=pl.Datetime(time_unit=time_unit, time_zone=None),
    ).set_sorted()
    rawdata = pl.Series("x", [1.1, 1.2, 1.3, 1.15, 1.25], dtype=pl.Float64)
    df = pl.DataFrame([datecol, rawdata])

    expectations = {
        "rolling_max_by": pl.Series(
            "x", [None, 1.1, 1.2, None, 1.15], dtype=pl.Float64
        ),
        "rolling_min_by": pl.Series(
            "x", [None, 1.1, 1.1, None, 1.15], dtype=pl.Float64
        ),
    }
    for method, expected in expectations.items():
        result = df.select(
            pl.col("d"),
            getattr(pl.col("x"), method)("d", window_size="3d", closed="left"),
        )
        assert_frame_equal(pl.DataFrame([datecol, expected]), result)
780
781
782
def test_rolling_weighted_quantile_10031() -> None:
    """Weights are honoured by rolling_median and rolling_quantile."""
    result = pl.Series([1, 2]).rolling_median(window_size=2, weights=[0, 1])
    assert_series_equal(result, pl.Series([None, 2.0]))

    result = pl.Series([1, 2, 3, 5]).rolling_quantile(0.7, "linear", 3, [0.1, 0.3, 0.6])
    assert_series_equal(result, pl.Series([None, None, 2.55, 4.1]))

    # A zero weight drops the corresponding element from the window.
    result = pl.Series([1, 2, 3, 5, 8]).rolling_quantile(
        0.7, "linear", 4, [0.1, 0.2, 0, 0.3]
    )
    assert_series_equal(result, pl.Series([None, None, None, 3.5, 5.5]))
799
800
801
def test_rolling_meta_eq_10101() -> None:
    """Structurally identical rolling expressions compare meta-equal."""
    expr = pl.col("A").rolling_sum(10)
    assert expr.meta.eq(pl.col("A").rolling_sum(10)) is True
803
804
805
def test_rolling_aggregations_unsorted_raise_10991() -> None:
    """rolling_sum_by on an unsorted `by` column matches sort-then-roll.

    NOTE(review): despite the historical "raise" in the name, nothing raises
    here — the unsorted path is expected to produce the same result as
    sorting by "dt" first and restoring the original row order.
    """
    df = pl.DataFrame(
        {
            "dt": [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)],
            "val": [1, 2, 3],
        }
    )
    result = df.with_columns(roll=pl.col("val").rolling_sum_by("dt", "2d"))
    expected = pl.DataFrame(
        {
            "dt": [datetime(2020, 1, 3), datetime(2020, 1, 1), datetime(2020, 1, 2)],
            "val": [1, 2, 3],
            "roll": [4, 2, 5],
        }
    )
    assert_frame_equal(result, expected)
    # Reference computation: sort, roll, then restore the original order.
    result = (
        df.with_row_index()
        .sort("dt")
        .with_columns(roll=pl.col("val").rolling_sum_by("dt", "2d"))
        .sort("index")
        .drop("index")
    )
    assert_frame_equal(result, expected)
829
830
831
def test_rolling_aggregations_with_over_11225() -> None:
    """rolling_mean_by combined with .over() restarts per group."""
    start = datetime(2001, 1, 1)

    df_temporal = pl.DataFrame(
        {
            "date": [start + timedelta(days=k) for k in range(5)],
            "group": ["A"] * 2 + ["B"] * 3,
        }
    ).with_row_index()

    df_temporal = df_temporal.sort("group", "date")

    # closed="left" means each group's first row has an empty window -> null.
    result = df_temporal.with_columns(
        rolling_row_mean=pl.col("index")
        .rolling_mean_by(
            by="date",
            window_size="2d",
            closed="left",
        )
        .over("group")
    )
    expected = pl.DataFrame(
        {
            "index": [0, 1, 2, 3, 4],
            "date": pl.datetime_range(date(2001, 1, 1), date(2001, 1, 5), eager=True),
            "group": ["A", "A", "B", "B", "B"],
            "rolling_row_mean": [None, 0.0, None, 2.0, 2.5],
        },
        schema_overrides={"index": pl.UInt32},
    )
    assert_frame_equal(result, expected)
862
863
864
@pytest.mark.parametrize("dtype", INTEGER_DTYPES)
def test_rolling_ints(dtype: PolarsDataType) -> None:
    """Smoke-test every fixed-window rolling kernel on each integer dtype."""
    s = pl.Series("a", [1, 2, 3, 2, 1], dtype=dtype)
    assert_series_equal(
        s.rolling_min(2), pl.Series("a", [None, 1, 2, 2, 1], dtype=dtype)
    )
    assert_series_equal(
        s.rolling_max(2), pl.Series("a", [None, 2, 3, 3, 2], dtype=dtype)
    )
    # Sub-32-bit integers are upcast to Int64 by rolling_sum.
    assert_series_equal(
        s.rolling_sum(2),
        pl.Series(
            "a",
            [None, 3, 5, 5, 3],
            dtype=(
                pl.Int64 if dtype in [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16] else dtype
            ),
        ),
    )
    assert_series_equal(s.rolling_mean(2), pl.Series("a", [None, 1.5, 2.5, 2.5, 1.5]))

    assert s.rolling_std(2).to_list()[1] == pytest.approx(0.7071067811865476)
    assert s.rolling_var(2).to_list()[1] == pytest.approx(0.5)
    assert s.rolling_std(2, ddof=0).to_list()[1] == pytest.approx(0.5)
    assert s.rolling_var(2, ddof=0).to_list()[1] == pytest.approx(0.25)

    # Median / quantile always come back as Float64.
    assert_series_equal(
        s.rolling_median(4), pl.Series("a", [None, None, None, 2, 2], dtype=pl.Float64)
    )
    assert_series_equal(
        s.rolling_quantile(0, "nearest", 3),
        pl.Series("a", [None, None, 1, 2, 1], dtype=pl.Float64),
    )
    assert_series_equal(
        s.rolling_quantile(0, "lower", 3),
        pl.Series("a", [None, None, 1, 2, 1], dtype=pl.Float64),
    )
    assert_series_equal(
        s.rolling_quantile(0, "higher", 3),
        pl.Series("a", [None, None, 1, 2, 1], dtype=pl.Float64),
    )
    assert s.rolling_skew(4).null_count() == 3
906
907
908
def test_rolling_floats() -> None:
    """Rolling kernels on float dtypes: dtype preservation and NaN handling."""
    # 3099
    # test if we maintain proper dtype
    for dt in [pl.Float32, pl.Float64]:
        result = pl.Series([1, 2, 3], dtype=dt).rolling_min(2, weights=[0.1, 0.2])
        expected = pl.Series([None, 0.1, 0.2], dtype=dt)
        assert_series_equal(result, expected)

    df = pl.DataFrame({"val": [1.0, 2.0, 3.0, np.nan, 5.0, 6.0, 7.0]})

    for e in [
        pl.col("val").rolling_min(window_size=3),
        pl.col("val").rolling_max(window_size=3),
    ]:
        out = df.with_columns(e).to_series()
        assert out.null_count() == 2
        # The NaN at index 3 poisons every size-3 window it participates in.
        assert np.isnan(out.to_numpy()).sum() == 5

    expected_values = [None, None, 2.0, 3.0, 5.0, 6.0, 6.0]
    assert (
        df.with_columns(pl.col("val").rolling_median(window_size=3))
        .to_series()
        .to_list()
        == expected_values
    )
    assert (
        df.with_columns(pl.col("val").rolling_quantile(0.5, window_size=3))
        .to_series()
        .to_list()
        == expected_values
    )

    # `nan` is already imported at module level (`from numpy import nan`);
    # the previous local `nan = float("nan")` rebinding was redundant.
    s = pl.Series("a", [11.0, 2.0, 9.0, nan, 8.0])
    assert_series_equal(
        s.rolling_sum(3),
        pl.Series("a", [None, None, 22.0, nan, nan]),
    )
946
947
948
def test_rolling_std_nulls_min_samples_1_20076() -> None:
    """Nulls are skipped (not propagated) by rolling_std with min_samples=1."""
    out = pl.Series([1, 2, None, 4]).rolling_std(3, min_samples=1)
    expected = pl.Series(
        [None, 0.7071067811865476, 0.7071067811865476, 1.4142135623730951]
    )
    assert_series_equal(out, expected)
954
955
956
def test_rolling_by_date() -> None:
    """rolling_sum_by works with a plain Date index column."""
    df = pl.DataFrame(
        {
            "dt": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "val": [1, 2, 3],
        }
    ).sort("dt")

    rolled = df.with_columns(roll=pl.col("val").rolling_sum_by("dt", "2d"))
    assert_frame_equal(rolled, df.with_columns(roll=pl.Series([1, 3, 5])))
967
968
969
@pytest.mark.parametrize("dtype", [pl.Int64, pl.Int32, pl.UInt64, pl.UInt32])
def test_rolling_by_integer(dtype: PolarsDataType) -> None:
    """rolling_sum_by accepts 32/64-bit integer index columns with "Ni" windows."""
    df = (
        pl.DataFrame({"val": [1, 2, 3]})
        .with_row_index()
        .with_columns(pl.col("index").cast(dtype))
    )
    rolled = df.with_columns(roll=pl.col("val").rolling_sum_by("index", "2i"))
    assert_frame_equal(rolled, df.with_columns(roll=pl.Series([1, 3, 5])))
979
980
981
@pytest.mark.parametrize("dtype", INTEGER_DTYPES)
def test_rolling_sum_by_integer(dtype: PolarsDataType) -> None:
    """rolling_sum_by upcasts sub-32-bit integers to Int64, like rolling_sum."""
    lf = (
        pl.LazyFrame({"a": [1, 2, 3]}, schema={"a": dtype})
        .with_row_index()
        .select(pl.col("a").rolling_sum_by("index", "2i"))
    )
    out = lf.collect()

    small_ints = [pl.Int8, pl.UInt8, pl.Int16, pl.UInt16]
    out_dtype = pl.Int64 if dtype in small_ints else dtype
    expected = pl.DataFrame({"a": [1, 3, 5]}, schema={"a": out_dtype})
    assert_frame_equal(out, expected)
    # Lazy schema resolution must agree with the materialized schema.
    assert lf.collect_schema() == expected.schema
995
996
997
def test_rolling_nanoseconds_11003() -> None:
    """Nanosecond-resolution datetime windows ("500ns") are handled."""
    timestamps = [
        "2020-01-01T00:00:00.000000000",
        "2020-01-01T00:00:00.000000100",
        "2020-01-01T00:00:00.000000200",
    ]
    df = (
        pl.DataFrame({"dt": timestamps, "val": [1, 2, 3]})
        .with_columns(pl.col("dt").str.to_datetime(time_unit="ns"))
        .set_sorted("dt")
    )
    rolled = df.with_columns(pl.col("val").rolling_sum_by("dt", "500ns"))
    assert_frame_equal(rolled, df.with_columns(val=pl.Series([1, 3, 6])))
1012
1013
1014
def test_rolling_by_1mo_saturating_12216() -> None:
    """A "1mo" period saturates at month ends instead of overflowing.

    Dates straddling June/July/August month boundaries are rolled with a
    one-month window; expected groupings are checked for the default
    closed setting and for closed="both".
    """
    df = pl.DataFrame(
        {
            "date": [
                date(2020, 6, 29),
                date(2020, 6, 30),
                date(2020, 7, 30),
                date(2020, 7, 31),
                date(2020, 8, 1),
            ],
            "val": [1, 2, 3, 4, 5],
        }
    ).set_sorted("date")
    result = df.rolling(index_column="date", period="1mo").agg(vals=pl.col("val"))
    expected = pl.DataFrame(
        {
            "date": [
                date(2020, 6, 29),
                date(2020, 6, 30),
                date(2020, 7, 30),
                date(2020, 7, 31),
                date(2020, 8, 1),
            ],
            "vals": [[1], [1, 2], [3], [3, 4], [3, 4, 5]],
        }
    )
    assert_frame_equal(result, expected)

    # check with `closed='both'` against DuckDB output
    result = df.rolling(index_column="date", period="1mo", closed="both").agg(
        vals=pl.col("val")
    )
    expected = pl.DataFrame(
        {
            "date": [
                date(2020, 6, 29),
                date(2020, 6, 30),
                date(2020, 7, 30),
                date(2020, 7, 31),
                date(2020, 8, 1),
            ],
            "vals": [[1], [1, 2], [2, 3], [2, 3, 4], [3, 4, 5]],
        }
    )
    assert_frame_equal(result, expected)
1059
1060
1061
def test_index_expr_with_literal() -> None:
    """An expression index column is evaluated and named "literal"."""
    df = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "b", "c"]}).sort("a")
    index_expr = (5 * pl.col("a")).set_sorted()
    out = df.rolling(index_column=index_expr, period="2i").agg(pl.col("b"))
    expected = pl.DataFrame({"literal": [5, 10, 15], "b": [["a"], ["b"], ["c"]]})
    assert_frame_equal(out, expected)
1068
1069
1070
def test_index_expr_output_name_12244() -> None:
    """An int_range index expression surfaces under the name "literal"."""
    df = pl.DataFrame({"A": [1, 2, 3]})
    out = df.rolling(pl.int_range(0, pl.len()), period="2i").agg("A")
    expected = {
        "literal": [0, 1, 2],
        "A": [[1], [1, 2], [2, 3]],
    }
    assert out.to_dict(as_series=False) == expected
1078
1079
1080
def test_rolling_median() -> None:
    """rolling_median matches pandas' `rolling(k).median()` on random ints.

    Seeded so failures are reproducible — the original drew unseeded random
    arrays, which made any regression non-deterministic to reproduce.
    """
    np.random.seed(0)
    for n in range(10, 25):
        array = np.random.randint(0, 20, n)
        for k in [3, 5, 7]:
            a = pl.Series(array)
            assert_series_equal(
                a.rolling_median(k), pl.from_pandas(a.to_pandas().rolling(k).median())
            )
1088
1089
1090
@pytest.mark.slow
def test_rolling_median_2() -> None:
    """rolling_median sums over a seeded normal sample stay approx-stable."""
    np.random.seed(12)
    n = 1000
    df = pl.DataFrame({"x": np.random.normal(0, 1, n)})
    # this can differ because simd sizes and non-associativity of floats.
    assert df.select(
        pl.col("x").rolling_median(window_size=10).sum()
    ).item() == pytest.approx(5.139429061527812)
    assert df.select(
        pl.col("x").rolling_median(window_size=100).sum()
    ).item() == pytest.approx(26.60506093611384)
1102
1103
1104
@pytest.mark.parametrize(
    ("dates", "closed", "expected"),
    [
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "right",
            [None, 3, 5],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "left",
            [None, None, 3],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "both",
            [None, 3, 6],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "none",
            [None, None, None],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 4)],
            "right",
            [None, 3, None],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 3), date(2020, 1, 4)],
            "right",
            [None, None, 5],
        ),
        (
            [date(2020, 1, 1), date(2020, 1, 3), date(2020, 1, 5)],
            "right",
            [None, None, None],
        ),
    ],
)
def test_rolling_min_samples(
    dates: list[date], closed: ClosedInterval, expected: list[int]
) -> None:
    """Windows with fewer than min_samples=2 rows yield null.

    Checked both on data sorted ascending and on data supplied in
    descending order (the unsorted-by path), which must agree.
    """
    df = pl.DataFrame({"date": dates, "value": [1, 2, 3]}).sort("date")
    result = df.select(
        pl.col("value").rolling_sum_by(
            "date", window_size="2d", min_samples=2, closed=closed
        )
    )["value"]
    assert_series_equal(result, pl.Series("value", expected, pl.Int64))

    # Starting with unsorted data
    result = (
        df.sort("date", descending=True)
        .with_columns(
            pl.col("value").rolling_sum_by(
                "date", window_size="2d", min_samples=2, closed=closed
            )
        )
        .sort("date")["value"]
    )
    assert_series_equal(result, pl.Series("value", expected, pl.Int64))
1166
1167
1168
def test_rolling_returns_scalar_15656() -> None:
    """rolling_*_by inside group_by-agg yields one list element per row."""
    frame = pl.DataFrame(
        {
            "a": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "b": [4, 5, 6],
            "c": [1, 2, 3],
        }
    )
    out = frame.group_by("c").agg(pl.col("b").rolling_mean_by("a", "2d")).sort("c")
    assert_frame_equal(
        out, pl.DataFrame({"c": [1, 2, 3], "b": [[4.0], [5.0], [6.0]]})
    )
def test_rolling_invalid() -> None:
    """Period kind must match the index column's type."""
    frame = pl.DataFrame(
        {
            "values": [1, 4],
            "times": [datetime(2020, 1, 3), datetime(2020, 1, 1)],
        },
    )
    # An integer ("i") period over a datetime index column is rejected.
    with pytest.raises(
        InvalidOperationError, match="duration may not be a parsed integer"
    ):
        frame.sort("times").rolling("times", period="3000i").agg(
            pl.col("values").sum().alias("sum")
        )
    # Conversely, a calendar period over an integer index column is rejected.
    with pytest.raises(
        InvalidOperationError, match="duration must be a parsed integer"
    ):
        frame.with_row_index().rolling("index", period="3000d").agg(
            pl.col("values").sum().alias("sum")
        )
def test_by_different_length() -> None:
    """A `by` series of a different length than the frame must raise."""
    frame = pl.DataFrame({"b": [1]})
    by = pl.Series([datetime(2020, 1, 1)] * 2)
    with pytest.raises(InvalidOperationError, match="must be the same length"):
        frame.select(pl.col("b").rolling_max_by(by, "1d"))
def test_incorrect_nulls_16246() -> None:
    """rolling_max_by is correct across chunk boundaries (no spurious nulls)."""
    # Two single-row chunks, deliberately left unrechunked.
    parts = [
        pl.DataFrame({"a": [datetime(2020, 1, 1)], "b": [1]}),
        pl.DataFrame({"a": [datetime(2021, 1, 1)], "b": [1]}),
    ]
    frame = pl.concat(parts, rechunk=False)
    out = frame.select(pl.col("b").rolling_max_by("a", "1d"))
    assert_frame_equal(out, pl.DataFrame({"b": [1, 1]}))
def test_rolling_with_dst() -> None:
    """Ambiguous local datetimes (DST fold) raise, sorted or not."""
    frame = pl.DataFrame(
        {"a": [datetime(2020, 10, 26, 1), datetime(2020, 10, 26)], "b": [1, 2]}
    ).with_columns(pl.col("a").dt.replace_time_zone("Europe/London"))
    expr = pl.col("b").rolling_sum_by("a", "1d")
    with pytest.raises(ComputeError, match="is ambiguous"):
        frame.select(expr)
    with pytest.raises(ComputeError, match="is ambiguous"):
        frame.sort("a").select(expr)
def interval_defs() -> SearchStrategy[ClosedInterval]:
    """Hypothesis strategy over the four closed-interval options."""
    options: list[ClosedInterval] = ["left", "right", "both", "none"]
    return st.sampled_from(options)
@given(
    period=st.timedeltas(
        min_value=timedelta(microseconds=0), max_value=timedelta(days=1000)
    ).map(parse_as_duration_string),
    offset=st.timedeltas(
        min_value=timedelta(days=-1000), max_value=timedelta(days=1000)
    ).map(parse_as_duration_string),
    closed=interval_defs(),
    data=st.data(),
    time_unit=_time_units(),
)
def test_rolling_parametric(
    period: str,
    offset: str,
    closed: ClosedInterval,
    data: st.DataObject,
    time_unit: TimeUnit,
) -> None:
    """Property test: DataFrame.rolling matches a naive filter-per-row model."""
    # A zero timedelta maps to an empty duration string, which is invalid.
    assume(period != "")
    dataframe = data.draw(
        dataframes(
            [
                column(
                    "ts",
                    strategy=st.datetimes(
                        min_value=datetime(2000, 1, 1),
                        max_value=datetime(2001, 1, 1),
                    ),
                    dtype=pl.Datetime(time_unit),
                ),
                column(
                    "value",
                    strategy=st.integers(min_value=-100, max_value=100),
                    dtype=pl.Int64,
                ),
            ],
            min_size=1,
        )
    )
    df = dataframe.sort("ts")
    result = df.rolling("ts", period=period, offset=offset, closed=closed).agg(
        pl.col("value")
    )

    # Reference model: for each row, collect all rows whose timestamp lies in
    # [ts + offset, ts + offset + period] with the given closedness.
    expected_dict: dict[str, list[object]] = {"ts": [], "value": []}
    for ts, _ in df.iter_rows():
        window = df.filter(
            pl.col("ts").is_between(
                pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(offset),
                pl.lit(ts, dtype=pl.Datetime(time_unit))
                .dt.offset_by(offset)
                .dt.offset_by(period),
                closed=closed,
            )
        )
        value = window["value"].to_list()
        expected_dict["ts"].append(ts)
        expected_dict["value"].append(value)
    expected = pl.DataFrame(expected_dict).select(
        pl.col("ts").cast(pl.Datetime(time_unit)),
        pl.col("value").cast(pl.List(pl.Int64)),
    )
    assert_frame_equal(result, expected)
@given(
    window_size=st.timedeltas(
        min_value=timedelta(microseconds=0), max_value=timedelta(days=2)
    ).map(parse_as_duration_string),
    closed=interval_defs(),
    data=st.data(),
    time_unit=_time_units(),
    aggregation=st.sampled_from(
        [
            "min",
            "max",
            "mean",
            "sum",
            "std",
            "var",
            "median",
        ]
    ),
)
def test_rolling_aggs(
    window_size: str,
    closed: ClosedInterval,
    data: st.DataObject,
    time_unit: TimeUnit,
    aggregation: str,
) -> None:
    """Property test: rolling_<agg>_by matches a naive filter-per-row model,
    regardless of whether the input frame is pre-sorted by the `by` column."""
    # A zero timedelta maps to an empty duration string, which is invalid.
    assume(window_size != "")

    # Testing logic can be faulty when window is more precise than time unit
    # https://github.com/pola-rs/polars/issues/11754
    assume(not (time_unit == "ms" and "us" in window_size))

    dataframe = data.draw(
        dataframes(
            [
                column(
                    "ts",
                    strategy=st.datetimes(
                        min_value=datetime(2000, 1, 1),
                        max_value=datetime(2001, 1, 1),
                    ),
                    dtype=pl.Datetime(time_unit),
                ),
                column(
                    "value",
                    strategy=st.integers(min_value=-100, max_value=100),
                    dtype=pl.Int64,
                ),
            ],
        )
    )
    df = dataframe.sort("ts")
    # e.g. "rolling_mean_by" — resolved dynamically from the drawn aggregation.
    func = f"rolling_{aggregation}_by"
    result = df.with_columns(
        getattr(pl.col("value"), func)("ts", window_size=window_size, closed=closed)
    )
    # Same computation on the unsorted frame; must agree after sorting.
    result_from_unsorted = dataframe.with_columns(
        getattr(pl.col("value"), func)("ts", window_size=window_size, closed=closed)
    ).sort("ts")

    # Reference model: trailing window [ts - window_size, ts] per row,
    # aggregated with the matching Series method; empty windows yield null.
    expected_dict: dict[str, list[object]] = {"ts": [], "value": []}
    for ts, _ in df.iter_rows():
        window = df.filter(
            pl.col("ts").is_between(
                pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(
                    f"-{window_size}"
                ),
                pl.lit(ts, dtype=pl.Datetime(time_unit)),
                closed=closed,
            )
        )
        expected_dict["ts"].append(ts)
        if window.is_empty():
            expected_dict["value"].append(None)
        else:
            value = getattr(window["value"], aggregation)()
            expected_dict["value"].append(value)
    expected = pl.DataFrame(expected_dict).select(
        pl.col("ts").cast(pl.Datetime(time_unit)),
        # Result dtype depends on the aggregation (e.g. mean/std are float).
        pl.col("value").cast(result["value"].dtype),
    )
    assert_frame_equal(result, expected)
    assert_frame_equal(result_from_unsorted, expected)
def test_rolling_by_nulls() -> None:
    """Nulls are rejected whether they sit in the values or the `by` column."""
    frame = pl.DataFrame({"a": [1, None], "b": [1, 2]})
    for values_col, by_col in (("a", "b"), ("b", "a")):
        with pytest.raises(
            InvalidOperationError,
            match="not yet supported for series with null values",
        ):
            frame.select(pl.col(values_col).rolling_min_by(by_col, "2i"))
def test_window_size_validation() -> None:
    """A negative window size is rejected at the Python/Rust boundary."""
    frame = pl.DataFrame({"x": [1.0]})
    bad_expr = pl.col("x").rolling_min(window_size=-3)
    with pytest.raises(OverflowError, match=r"can't convert negative int to unsigned"):
        frame.with_columns(trailing_min=bad_expr)
def test_rolling_empty_21032() -> None:
    """Rolling over an empty frame yields an empty frame, offset or not."""
    empty = pl.DataFrame(schema={"a": pl.Datetime("ms"), "b": pl.Int64()})

    no_offset = empty.rolling(index_column="a", period=timedelta(days=2)).agg(
        pl.col("b").sum()
    )
    assert_frame_equal(no_offset, empty)

    with_offset = empty.rolling(
        index_column="a", period=timedelta(days=2), offset=timedelta(days=3)
    ).agg(pl.col("b").sum())
    assert_frame_equal(with_offset, empty)
def test_rolling_offset_agg_15122() -> None:
    """Positive offsets shift the window forward past the index value."""
    frame = pl.DataFrame({"a": [1, 1, 1, 2, 2, 2], "b": [1, 2, 3, 1, 2, 3]})

    cases = [
        ("0i", [[2], [3], [], [2], [3], []]),
        ("1i", [[3], [], [], [3], [], []]),
    ]
    for offset, windows in cases:
        out = frame.rolling(
            index_column="b", period="1i", offset=offset, group_by="a"
        ).agg(window=pl.col("b"))
        assert_frame_equal(out, frame.with_columns(window=pl.Series(windows)))
def test_rolling_sum_stability_11146() -> None:
    """A window of all zeros after large values must give exactly 0.0.

    Guards against drift in running-sum style rolling implementations,
    where residual float error from earlier windows leaks into later ones.
    """
    values = [
        0.0,
        290.57,
        107.0,
        172.0,
        124.25,
        304.0,
        379.5,
        347.35,
        1516.41,
        386.12,
        226.5,
        294.62,
        125.5,
    ] + [0.0] * 8
    frame = pl.DataFrame({"value": values})
    last = frame.with_columns(
        pl.col("value").rolling_mean(window_size=8, min_samples=1).alias("test_col")
    )["test_col"][-1]
    assert last == 0.0
def test_rolling() -> None:
    """rolling(...) with group_by emits all rows of a group contiguously."""
    frame = pl.DataFrame(
        {
            "n": [i for i in range(11) for _ in range(2)],
            "col1": ["A", "B"] * 11,
        }
    )

    out = frame.rolling("n", period="1i", group_by="col1").agg()
    expected = {
        "col1": ["A"] * 11 + ["B"] * 11,
        "n": list(range(11)) + list(range(11)),
    }
    assert out.to_dict(as_series=False) == expected
@pytest.mark.parametrize(
    "method",
    ["nearest", "higher", "lower", "midpoint", "linear", "equiprobable"],
)
def test_rolling_quantile_with_nulls_22781(method: QuantileMethod) -> None:
    """Nulls inside rolling windows don't corrupt the quantile result."""
    lf = pl.LazyFrame(
        {
            "index": [0, 1, 2, 3, 4, 5, 6, 7, 8],
            "a": [None, None, 1.0, None, None, 1.0, 1.0, None, None],
        }
    )
    result = (
        lf.rolling("index", period="2i")
        .agg(pl.col("a").quantile(0.5, interpolation=method))
        .collect()
    )
    assert_series_equal(
        result["a"],
        pl.Series("a", [None, None, 1.0, 1.0, None, 1.0, 1.0, 1.0, None]),
    )
def test_rolling_quantile_nearest_23392() -> None:
    """Nearest-interpolation rolling quantile over 0..10 picks round(q*10)."""
    values = list(range(11))
    s = pl.Series(values)

    shuffled = list(values)
    random.shuffle(shuffled)
    s_shuffled = pl.Series(shuffled)

    for q in np.arange(0, 1.0, 0.02, dtype=float):
        out = s.rolling_quantile(q, interpolation="nearest", window_size=11)

        # Explicit expected value: only the final full window is non-null.
        expected = pl.Series([None] * 10 + [float(round(q * 10.0))])
        assert_series_equal(out, expected)

        # Must agree with the non-rolling quantile of the same data.
        assert out.last() == s.quantile(q, interpolation="nearest")

        # Element order within the window must not matter.
        out_shuffled = s_shuffled.rolling_quantile(
            q, interpolation="nearest", window_size=11
        )
        assert_series_equal(out_shuffled, expected)
def test_rolling_quantile_nearest_kernel_23392() -> None:
    """Quantile kernel handles nearest-index rounding in dynamic windows."""
    frame = pl.DataFrame(
        {
            "dt": [
                datetime(2021, 1, 1),
                datetime(2021, 1, 2),
                datetime(2021, 1, 4),
                datetime(2021, 1, 5),
                datetime(2021, 1, 7),
            ],
            "values": pl.arange(0, 5, eager=True),
        }
    )
    # values (period="3d", quantile=0.7) are chosen to trigger index rounding
    result = (
        frame.set_sorted("dt")
        .rolling("dt", period="3d", closed="both")
        .agg(pl.col("values").quantile(quantile=0.7).alias("quantile"))
        .select("quantile")
    )
    assert_frame_equal(
        result, pl.DataFrame({"quantile": [0.0, 1.0, 1.0, 2.0, 3.0]})
    )
def test_rolling_quantile_nearest_with_nulls_23932() -> None:
    """Nearest-quantile index rounding is correct when windows contain nulls."""
    lf = pl.LazyFrame(
        {
            "index": [0, 1, 2, 3, 4, 5, 6],
            "a": [None, None, 1.0, 2.0, 3.0, None, None],
        }
    )
    # values (period="3i", quantile=0.7) are chosen to trigger index rounding
    result = (
        lf.rolling("index", period="3i")
        .agg(pl.col("a").quantile(0.7, interpolation="nearest"))
        .collect()
    )
    assert_series_equal(
        result["a"], pl.Series("a", [None, None, 1.0, 2.0, 2.0, 3.0, 3.0])
    )
def test_wtd_min_periods_less_window() -> None:
    """Weighted rolling aggregations with min_samples < window_size.

    When a window holds fewer observations than ``window_size`` but at least
    ``min_samples``, the weights are applied to the observations present
    (renormalized for the mean) instead of producing a null.

    BUG FIX: two of the ``expected`` frames below were computed but never
    asserted; the missing ``assert_frame_equal`` calls are now in place.
    """
    # Centered weighted mean, partial windows at both edges.
    df = pl.DataFrame({"a": [1, 2, 3, 4, 5]}).with_columns(
        pl.col("a")
        .rolling_mean(
            window_size=3, weights=[0.25, 0.5, 0.25], min_samples=2, center=True
        )
        .alias("kernel_mean")
    )

    expected = pl.DataFrame(
        {"a": [1, 2, 3, 4, 5], "kernel_mean": [1.333333, 2, 3, 4, 4.666667]}
    )

    assert_frame_equal(df, expected)

    # Centered weighted sum: weights are truncated (not renormalized) for
    # partial windows, e.g. last row = 4*0.25 + 5*0.5 = 3.5.
    df = pl.DataFrame({"a": [1, 2, 3, 4, 5]}).with_columns(
        pl.col("a")
        .rolling_sum(
            window_size=3, weights=[0.25, 0.5, 0.25], min_samples=2, center=True
        )
        .alias("kernel_sum")
    )
    expected = pl.DataFrame(
        {"a": [1, 2, 3, 4, 5], "kernel_sum": [1.0, 2.0, 3.0, 4.0, 3.5]}
    )
    # This expected frame was previously computed but never checked.
    assert_frame_equal(df, expected)

    # Trailing (center=False) weighted mean with asymmetric weights.
    df = pl.DataFrame({"a": [1, 2, 3, 4, 5]}).with_columns(
        pl.col("a")
        .rolling_mean(
            window_size=3, weights=[0.2, 0.3, 0.5], min_samples=2, center=False
        )
        .alias("kernel_mean")
    )

    expected = pl.DataFrame(
        {"a": [1, 2, 3, 4, 5], "kernel_mean": [None, 1.625, 2.3, 3.3, 4.3]}
    )

    assert_frame_equal(df, expected)

    # Handle edge case where the window size is larger than the number of
    # elements: every window is partial.
    df = pl.DataFrame({"a": [1, 2]}).with_columns(
        pl.col("a")
        .rolling_mean(
            window_size=3, weights=[0.25, 0.5, 0.25], min_samples=2, center=True
        )
        .alias("kernel_mean")
    )

    expected = pl.DataFrame({"a": [1, 2], "kernel_mean": [1.333333, 1.666667]})
    assert_frame_equal(df, expected)

    # Trailing mean with min_samples=1: first row uses a single renormalized
    # weight, second row uses the last two weights renormalized to 1/3 and 2/3.
    df = pl.DataFrame({"a": [1, 2]}).with_columns(
        pl.col("a")
        .rolling_mean(
            window_size=3, weights=[0.25, 0.25, 0.5], min_samples=1, center=False
        )
        .alias("kernel_mean")
    )

    expected = pl.DataFrame({"a": [1, 2], "kernel_mean": [1.0, 2 * 2 / 3 + 1 * 1 / 3]})
    # This expected frame was previously computed but never checked.
    assert_frame_equal(df, expected)

    # Single element with an even, centered window: only the weight aligned
    # with the lone observation contributes.
    df = pl.DataFrame({"a": [1]}).with_columns(
        pl.col("a")
        .rolling_sum(
            6, center=True, min_samples=0, weights=[1, 10, 100, 1000, 10_000, 100_000]
        )
        .alias("kernel_sum")
    )
    expected = pl.DataFrame({"a": [1], "kernel_sum": [1000.0]})
    assert_frame_equal(df, expected)
def test_rolling_median_23480() -> None:
    """Rolling median skips nulls correctly when the window is mostly null."""
    values = [None] * 17 + [3262645.8, 856191.4, 1635379.0, 34707156.0]
    # Both window sizes see the same valid observations, so results agree.
    expected_vals = [None] * 19 + [1635379.0, (3262645.8 + 1635379.0) / 2]
    result = pl.DataFrame({"a": values}).select(
        r15=pl.col("a").rolling_median(15, min_samples=3),
        r17=pl.col("a").rolling_median(17, min_samples=3),
    )
    assert_frame_equal(
        result, pl.DataFrame({"r15": expected_vals, "r17": expected_vals})
    )
@pytest.mark.slow
@pytest.mark.parametrize("with_nulls", [True, False])
def test_rolling_sum_non_finite_23115(with_nulls: bool) -> None:
    """Rolling sum matches a naive reference on nan/inf/null inputs."""
    pool: list[float | None] = [
        0.0,
        float("nan"),
        float("inf"),
        -float("inf"),
        42.0,
        -3.0,
    ]
    if with_nulls:
        pool.append(None)
    data = random.choices(pool, k=1000)

    # Naive trailing window of size 4: sum of non-null values, or null when
    # fewer than 2 non-null observations are present.
    naive: list[float | None] = []
    for i in range(1000):
        window = data[max(0, i + 1 - 4) : i + 1]
        valid = sum(x is not None for x in window)
        if valid >= 2:
            naive.append(sum(0 if x is None else x for x in window))
        else:
            naive.append(None)

    assert_series_equal(pl.Series(data).rolling_sum(4, min_samples=2), pl.Series(naive))