# Test module from the pola-rs/polars GitHub repository:
# py-polars/tests/unit/operations/test_group_by_dynamic.py
from __future__ import annotations

from datetime import date, datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any
from zoneinfo import ZoneInfo

import numpy as np
import pytest

import polars as pl
from polars.exceptions import ComputeError, InvalidOperationError
from polars.testing import assert_frame_equal

if TYPE_CHECKING:
    from polars._typing import Label, StartBy


@pytest.mark.parametrize(
    ("input_df", "expected_grouped_df"),
    [
        (
            pl.DataFrame(
                {
                    "dt": [
                        datetime(2021, 12, 31, 0, 0, 0),
                        datetime(2022, 1, 1, 0, 0, 1),
                        datetime(2022, 3, 31, 0, 0, 1),
                        datetime(2022, 4, 1, 0, 0, 1),
                    ]
                }
            ),
            pl.DataFrame(
                {
                    "dt": [
                        datetime(2021, 10, 1),
                        datetime(2022, 1, 1),
                        datetime(2022, 4, 1),
                    ],
                    "num_points": [1, 2, 1],
                },
                # The count column has the (build-dependent) index dtype, so
                # ask polars for it instead of hard-coding UInt32.
                schema={"dt": pl.Datetime, "num_points": pl.get_index_type()},
            ).sort("dt"),
        )
    ],
)
def test_group_by_dynamic(
    input_df: pl.DataFrame, expected_grouped_df: pl.DataFrame
) -> None:
    """Count the points falling into each quarterly (``every="1q"``) window."""
    result = (
        input_df.sort("dt")
        .group_by_dynamic("dt", every="1q")
        .agg(pl.col("dt").count().alias("num_points"))
        .sort("dt")
    )
    assert_frame_equal(result, expected_grouped_df)


@pytest.mark.parametrize(
    ("every", "offset"),
    [
        ("3d", "-1d"),
        (timedelta(days=3), timedelta(days=-1)),
    ],
)
def test_dynamic_group_by_timezone_awareness(
    every: str | timedelta, offset: str | timedelta
) -> None:
    """Boundary and index columns keep the UTC ``Datetime("ns")`` dtype.

    Checked for ``every``/``offset`` given both as strings and as timedeltas.
    """
    df = pl.DataFrame(
        (
            pl.datetime_range(
                datetime(2020, 1, 1),
                datetime(2020, 1, 10),
                timedelta(days=1),
                time_unit="ns",
                eager=True,
            )
            .alias("datetime")
            .dt.replace_time_zone("UTC"),
            pl.arange(1, 11, eager=True).alias("value"),
        )
    )

    assert (
        df.group_by_dynamic(
            "datetime",
            every=every,
            offset=offset,
            closed="right",
            include_boundaries=True,
            label="datapoint",
        ).agg(pl.col("value").last())
    ).dtypes == [pl.Datetime("ns", "UTC")] * 3 + [pl.Int64]


@pytest.mark.parametrize("tzinfo", [None, ZoneInfo("UTC"), ZoneInfo("Asia/Kathmandu")])
def test_group_by_dynamic_startby_5599(tzinfo: ZoneInfo | None) -> None:
    """Window placement for ``start_by`` = datapoint, monday and saturday.

    Runs for a naive index and for two time-zone-aware indices (``tzinfo``).
    """
    # start by datapoint
    start = datetime(2022, 12, 16, tzinfo=tzinfo)
    stop = datetime(2022, 12, 16, hour=3, tzinfo=tzinfo)
    df = pl.DataFrame({"date": pl.datetime_range(start, stop, "30m", eager=True)})

    assert df.group_by_dynamic(
        "date",
        every="31m",
        include_boundaries=True,
        label="datapoint",
        start_by="datapoint",
    ).agg(pl.len()).to_dict(as_series=False) == {
        "_lower_boundary": [
            datetime(2022, 12, 16, 0, 0, tzinfo=tzinfo),
            datetime(2022, 12, 16, 0, 31, tzinfo=tzinfo),
            datetime(2022, 12, 16, 1, 2, tzinfo=tzinfo),
            datetime(2022, 12, 16, 1, 33, tzinfo=tzinfo),
            datetime(2022, 12, 16, 2, 4, tzinfo=tzinfo),
            datetime(2022, 12, 16, 2, 35, tzinfo=tzinfo),
        ],
        "_upper_boundary": [
            datetime(2022, 12, 16, 0, 31, tzinfo=tzinfo),
            datetime(2022, 12, 16, 1, 2, tzinfo=tzinfo),
            datetime(2022, 12, 16, 1, 33, tzinfo=tzinfo),
            datetime(2022, 12, 16, 2, 4, tzinfo=tzinfo),
            datetime(2022, 12, 16, 2, 35, tzinfo=tzinfo),
            datetime(2022, 12, 16, 3, 6, tzinfo=tzinfo),
        ],
        "date": [
            datetime(2022, 12, 16, 0, 0, tzinfo=tzinfo),
            datetime(2022, 12, 16, 1, 0, tzinfo=tzinfo),
            datetime(2022, 12, 16, 1, 30, tzinfo=tzinfo),
            datetime(2022, 12, 16, 2, 0, tzinfo=tzinfo),
            datetime(2022, 12, 16, 2, 30, tzinfo=tzinfo),
            datetime(2022, 12, 16, 3, 0, tzinfo=tzinfo),
        ],
        "len": [2, 1, 1, 1, 1, 1],
    }

    # start by monday
    start = datetime(2022, 1, 1, tzinfo=tzinfo)
    stop = datetime(2022, 1, 12, 7, tzinfo=tzinfo)

    df = pl.DataFrame(
        {"date": pl.datetime_range(start, stop, "12h", eager=True)}
    ).with_columns(pl.col("date").dt.weekday().alias("day"))

    result = df.group_by_dynamic(
        "date",
        every="1w",
        period="3d",
        include_boundaries=True,
        start_by="monday",
        label="datapoint",
    ).agg([pl.len(), pl.col("day").first().alias("data_day")])
    assert result.to_dict(as_series=False) == {
        "_lower_boundary": [
            datetime(2022, 1, 3, 0, 0, tzinfo=tzinfo),
            datetime(2022, 1, 10, 0, 0, tzinfo=tzinfo),
        ],
        "_upper_boundary": [
            datetime(2022, 1, 6, 0, 0, tzinfo=tzinfo),
            datetime(2022, 1, 13, 0, 0, tzinfo=tzinfo),
        ],
        "date": [
            datetime(2022, 1, 3, 0, 0, tzinfo=tzinfo),
            datetime(2022, 1, 10, 0, 0, tzinfo=tzinfo),
        ],
        "len": [6, 5],
        "data_day": [1, 1],
    }

    # start by saturday
    result = df.group_by_dynamic(
        "date",
        every="1w",
        period="3d",
        include_boundaries=True,
        start_by="saturday",
        label="datapoint",
    ).agg([pl.len(), pl.col("day").first().alias("data_day")])
    assert result.to_dict(as_series=False) == {
        "_lower_boundary": [
            datetime(2022, 1, 1, 0, 0, tzinfo=tzinfo),
            datetime(2022, 1, 8, 0, 0, tzinfo=tzinfo),
        ],
        "_upper_boundary": [
            datetime(2022, 1, 4, 0, 0, tzinfo=tzinfo),
            datetime(2022, 1, 11, 0, 0, tzinfo=tzinfo),
        ],
        "date": [
            datetime(2022, 1, 1, 0, 0, tzinfo=tzinfo),
            datetime(2022, 1, 8, 0, 0, tzinfo=tzinfo),
        ],
        "len": [6, 6],
        "data_day": [6, 6],
    }


def test_group_by_dynamic_by_monday_and_offset_5444() -> None:
    """Weekly Monday-anchored windows shifted by ``offset="1d"``, per ``label``.

    Also checks that an empty input produces the same output schema.
    """
    df = pl.DataFrame(
        {
            "date": [
                "2022-11-01",
                "2022-11-02",
                "2022-11-05",
                "2022-11-08",
                "2022-11-08",
                "2022-11-09",
                "2022-11-10",
            ],
            "label": ["a", "b", "a", "a", "b", "a", "b"],
            "value": [1, 2, 3, 4, 5, 6, 7],
        }
    ).with_columns(pl.col("date").str.strptime(pl.Date, "%Y-%m-%d").set_sorted())

    result = df.group_by_dynamic(
        "date", every="1w", offset="1d", group_by="label", start_by="monday"
    ).agg(pl.col("value").sum())

    assert result.to_dict(as_series=False) == {
        "label": ["a", "a", "b", "b"],
        "date": [
            date(2022, 11, 1),
            date(2022, 11, 8),
            date(2022, 11, 1),
            date(2022, 11, 8),
        ],
        "value": [4, 10, 2, 12],
    }

    # test empty: filtering on a date absent from the data leaves no rows
    result_empty = (
        df.filter(pl.col("date") == date(1, 1, 1))
        .group_by_dynamic(
            "date", every="1w", offset="1d", group_by="label", start_by="monday"
        )
        .agg(pl.col("value").sum())
    )
    assert result_empty.schema == result.schema


@pytest.mark.parametrize(
    ("label", "expected"),
    [
        ("left", [datetime(2020, 1, 1), datetime(2020, 1, 2)]),
        ("right", [datetime(2020, 1, 2), datetime(2020, 1, 3)]),
        ("datapoint", [datetime(2020, 1, 1, 1), datetime(2020, 1, 2, 3)]),
    ],
)
def test_group_by_dynamic_label(label: Label, expected: list[datetime]) -> None:
    """The ``ts`` label of each daily window for every ``label`` mode.

    Uses ``group_by="group"``; each group's rows fall within a single day.
    """
    df = pl.DataFrame(
        {
            "ts": [
                datetime(2020, 1, 1, 1),
                datetime(2020, 1, 1, 2),
                datetime(2020, 1, 2, 3),
                datetime(2020, 1, 2, 4),
            ],
            "n": [1, 2, 3, 4],
            "group": ["a", "a", "b", "b"],
        }
    ).sort("ts")
    result = (
        df.group_by_dynamic("ts", every="1d", label=label, group_by="group")
        .agg(pl.col("n"))["ts"]
        .to_list()
    )
    assert result == expected


@pytest.mark.parametrize(
    ("label", "expected"),
    [
        ("left", [datetime(2020, 1, 1), datetime(2020, 1, 2), datetime(2020, 1, 3)]),
        ("right", [datetime(2020, 1, 2), datetime(2020, 1, 3), datetime(2020, 1, 4)]),
        (
            "datapoint",
            [datetime(2020, 1, 1, 1), datetime(2020, 1, 2, 2), datetime(2020, 1, 3, 3)],
        ),
    ],
)
def test_group_by_dynamic_label_with_by(label: Label, expected: list[datetime]) -> None:
    """The ``ts`` label of each daily window for every ``label`` mode.

    NOTE(review): despite its name, this variant passes no ``group_by``, while
    ``test_group_by_dynamic_label`` does; the two names look swapped.
    """
    df = pl.DataFrame(
        {
            "ts": [
                datetime(2020, 1, 1, 1),
                datetime(2020, 1, 2, 2),
                datetime(2020, 1, 3, 3),
            ],
            "n": [1, 2, 3],
        }
    ).sort("ts")
    result = (
        df.group_by_dynamic("ts", every="1d", label=label)
        .agg(pl.col("n"))["ts"]
        .to_list()
    )
    assert result == expected


def test_group_by_dynamic_slice_pushdown() -> None:
    """``head(2)`` on a lazy dynamic group-by returns its first two windows."""
    lf = pl.DataFrame({"a": [1, 2, 3], "b": ["a", "a", "b"], "c": [1, 3, 5]}).lazy()
    lf = (
        lf.sort("a")
        .group_by_dynamic("a", group_by="b", every="2i")
        .agg((pl.col("c") - pl.col("c").shift(fill_value=0)).sum().alias("c"))
    )
    assert lf.head(2).collect().to_dict(as_series=False) == {
        "b": ["a", "a"],
        "a": [0, 2],
        "c": [1, 3],
    }


def test_rolling_kernels_group_by_dynamic_7548() -> None:
    """Overlapping integer windows (``period="3i"``, ``every="1i"``).

    Checks list, min, max and sum aggregations over each window.
    """
    df = pl.DataFrame(
        {"time": pl.arange(0, 4, eager=True), "value": pl.arange(0, 4, eager=True)}
    )
    result = df.group_by_dynamic("time", every="1i", period="3i").agg(
        pl.col("value"),
        pl.col("value").min().alias("min_value"),
        pl.col("value").max().alias("max_value"),
        pl.col("value").sum().alias("sum_value"),
    )
    assert result.to_dict(as_series=False) == {
        "time": [0, 1, 2, 3],
        "value": [[0, 1, 2], [1, 2, 3], [2, 3], [3]],
        "min_value": [0, 1, 2, 3],
        "max_value": [2, 3, 3, 3],
        "sum_value": [3, 6, 5, 3],
    }


def test_rolling_dynamic_sortedness_check() -> None:
    """An unsorted index column raises, both with and without ``group_by``."""
    # when the `group_by` argument is passed, the sortedness flag
    # will be unset as the take shuffles data, so we must explicitly
    # check the sortedness
    df = pl.DataFrame(
        {
            "idx": [1, 2, -1, 2, 1, 1],
            "group": [1, 1, 1, 2, 2, 1],
        }
    )

    with pytest.raises(ComputeError, match=r"input data is not sorted"):
        df.group_by_dynamic("idx", every="2i", group_by="group").agg(
            pl.col("idx").alias("idx1")
        )

    # no `group_by` argument
    with pytest.raises(
        InvalidOperationError,
        match=r"argument in operation 'group_by_dynamic' is not sorted",
    ):
        df.group_by_dynamic("idx", every="2i").agg(pl.col("idx").alias("idx1"))


@pytest.mark.parametrize("time_zone", [None, "UTC", "Asia/Kathmandu"])
def test_group_by_dynamic_elementwise_following_mean_agg_6904(
    time_zone: str | None,
) -> None:
    """An element-wise op (``sin``) applied to a ``mean`` inside ``agg``."""
    df = (
        pl.DataFrame(
            {
                "a": [datetime(2021, 1, 1) + timedelta(seconds=2**i) for i in range(5)],
                "b": [float(i) for i in range(5)],
            }
        )
        .with_columns(pl.col("a").dt.replace_time_zone(time_zone))
        .lazy()
        .set_sorted("a")
        .group_by_dynamic("a", every="10s", period="100s")
        .agg([pl.col("b").mean().sin().alias("c")])
        .collect()
    )
    assert_frame_equal(
        df,
        pl.DataFrame(
            {
                "a": [
                    datetime(2021, 1, 1, 0, 0),
                    datetime(2021, 1, 1, 0, 0, 10),
                ],
                "c": [0.9092974268256817, -0.7568024953079282],
            }
        ).with_columns(pl.col("a").dt.replace_time_zone(time_zone)),
    )


@pytest.mark.parametrize("every", ["1h", timedelta(hours=1)])
@pytest.mark.parametrize("tzinfo", [None, ZoneInfo("UTC"), ZoneInfo("Asia/Kathmandu")])
def test_group_by_dynamic_lazy(every: str | timedelta, tzinfo: ZoneInfo | None) -> None:
    """Hourly right-closed windows on a LazyFrame, with each window's min/max."""
    ldf = pl.LazyFrame(
        {
            "time": pl.datetime_range(
                start=datetime(2021, 12, 16, tzinfo=tzinfo),
                end=datetime(2021, 12, 16, 2, tzinfo=tzinfo),
                interval="30m",
                eager=True,
            ),
            "n": range(5),
        }
    )
    df = (
        ldf.group_by_dynamic("time", every=every, closed="right")
        .agg(
            [
                pl.col("time").min().alias("time_min"),
                pl.col("time").max().alias("time_max"),
            ]
        )
        .collect()
    )
    assert sorted(df.rows()) == [
        (
            datetime(2021, 12, 15, 23, 0, tzinfo=tzinfo),
            datetime(2021, 12, 16, 0, 0, tzinfo=tzinfo),
            datetime(2021, 12, 16, 0, 0, tzinfo=tzinfo),
        ),
        (
            datetime(2021, 12, 16, 0, 0, tzinfo=tzinfo),
            datetime(2021, 12, 16, 0, 30, tzinfo=tzinfo),
            datetime(2021, 12, 16, 1, 0, tzinfo=tzinfo),
        ),
        (
            datetime(2021, 12, 16, 1, 0, tzinfo=tzinfo),
            datetime(2021, 12, 16, 1, 30, tzinfo=tzinfo),
            datetime(2021, 12, 16, 2, 0, tzinfo=tzinfo),
        ),
    ]


def test_group_by_dynamic_validation() -> None:
    """A negative ``every`` is rejected with a ``ComputeError``."""
    df = pl.DataFrame(
        {
            "index": [0, 0, 1, 1],
            "group": ["banana", "pear", "banana", "pear"],
            "weight": [2, 3, 5, 7],
        }
    )

    with pytest.raises(ComputeError, match="'every' argument must be positive"):
        df.group_by_dynamic("index", group_by="group", every="-1i", period="2i").agg(
            pl.col("weight")
        )


def test_no_sorted_no_error() -> None:
    """An ascending index without an explicit ``set_sorted`` flag is accepted."""
    df = pl.DataFrame(
        {
            "dt": [datetime(2001, 1, 1), datetime(2001, 1, 2)],
        }
    )
    result = df.group_by_dynamic("dt", every="1h").agg(pl.len().alias("count"))
    expected = pl.DataFrame(
        {
            "dt": [datetime(2001, 1, 1), datetime(2001, 1, 2)],
            "count": [1, 1],
        },
        schema_overrides={"count": pl.get_index_type()},
    )
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("tzinfo", [None, ZoneInfo("UTC"), ZoneInfo("Asia/Kathmandu")])
def test_truncate_negative_offset(tzinfo: ZoneInfo | None) -> None:
    """Check ``group_by_dynamic`` output in three setups.

    - monthly windows with a negative ``offset`` and a two-month ``period``;
    - monthly windows keyed by several ``group_by`` columns;
    - Int32 and Int64 index columns with overlapping integer windows.
    """
    time_zone = tzinfo.key if tzinfo is not None else None
    df = pl.DataFrame(
        {
            "event_date": [
                datetime(2021, 4, 11),
                datetime(2021, 4, 29),
                datetime(2021, 5, 29),
            ],
            "adm1_code": [1, 2, 1],
        }
    ).set_sorted("event_date")
    df = df.with_columns(pl.col("event_date").dt.replace_time_zone(time_zone))
    out = df.group_by_dynamic(
        index_column="event_date",
        every="1mo",
        period="2mo",
        offset="-1mo",
        include_boundaries=True,
    ).agg(
        [
            pl.col("adm1_code"),
        ]
    )

    assert out["event_date"].to_list() == [
        datetime(2021, 3, 1, tzinfo=tzinfo),
        datetime(2021, 4, 1, tzinfo=tzinfo),
        datetime(2021, 5, 1, tzinfo=tzinfo),
    ]

    df = pl.DataFrame(
        {
            "event_date": [
                datetime(2021, 4, 11),
                datetime(2021, 4, 29),
                datetime(2021, 5, 29),
            ],
            "adm1_code": [1, 2, 1],
            "five_type": ["a", "b", "a"],
            "actor": ["a", "a", "a"],
            "admin": ["a", "a", "a"],
            "fatalities": [10, 20, 30],
        }
    ).set_sorted("event_date")
    df = df.with_columns(pl.col("event_date").dt.replace_time_zone(time_zone))

    out = df.group_by_dynamic(
        index_column="event_date",
        every="1mo",
        group_by=["admin", "five_type", "actor"],
    ).agg([pl.col("adm1_code").unique(), (pl.col("fatalities") > 0).sum()])

    assert out["event_date"].to_list() == [
        datetime(2021, 4, 1, tzinfo=tzinfo),
        datetime(2021, 5, 1, tzinfo=tzinfo),
        datetime(2021, 4, 1, tzinfo=tzinfo),
    ]

    # The assertions live inside the loop so that every integer dtype is checked.
    for dt in [pl.Int32, pl.Int64]:
        df = (
            pl.DataFrame(
                {
                    "idx": np.arange(6),
                    "A": ["A", "A", "B", "B", "B", "C"],
                }
            )
            .with_columns(pl.col("idx").cast(dt))
            .set_sorted("idx")
        )

        out = df.group_by_dynamic(
            "idx", every="2i", period="3i", include_boundaries=True
        ).agg(pl.col("A"))

        assert out.shape == (3, 4)
        assert out["A"].to_list() == [
            ["A", "A", "B"],
            ["B", "B", "B"],
            ["B", "C"],
        ]


def test_groupy_by_dynamic_median_10695() -> None:
    """``median`` over overlapping windows (``period="3m"``, ``every="60s"``)."""
    df = pl.DataFrame(
        {
            "timestamp": pl.datetime_range(
                datetime(2023, 8, 22, 15, 44, 30),
                datetime(2023, 8, 22, 15, 48, 50),
                "20s",
                eager=True,
            ),
            "foo": [2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
        }
    )

    assert df.group_by_dynamic(
        index_column="timestamp",
        every="60s",
        period="3m",
    ).agg(pl.col("foo").median()).to_dict(as_series=False) == {
        "timestamp": [
            datetime(2023, 8, 22, 15, 44),
            datetime(2023, 8, 22, 15, 45),
            datetime(2023, 8, 22, 15, 46),
            datetime(2023, 8, 22, 15, 47),
            datetime(2023, 8, 22, 15, 48),
        ],
        "foo": [1.0, 1.0, 1.0, 1.0, 1.0],
    }


def test_group_by_dynamic_when_conversion_crosses_dates_7274() -> None:
    """Daily windows follow the index column's own time zone.

    The same two instants fall into one Africa/Lagos day but two UTC days.
    """
    df = (
        pl.DataFrame(
            data={
                "timestamp": ["1970-01-01 00:00:00+01:00", "1970-01-01 01:00:00+01:00"],
                "value": [1, 1],
            }
        )
        .with_columns(
            pl.col("timestamp")
            .str.strptime(pl.Datetime, format="%Y-%m-%d %H:%M:%S%:z")
            .dt.convert_time_zone("Africa/Lagos")
            .set_sorted()
        )
        .with_columns(
            pl.col("timestamp")
            .dt.convert_time_zone("UTC")
            .alias("timestamp_utc")
            .set_sorted()
        )
    )
    # `count()` produces the build-dependent index dtype, not always UInt32.
    idx_dtype = pl.get_index_type()

    result = df.group_by_dynamic(
        index_column="timestamp", every="1d", closed="left"
    ).agg(pl.col("value").count())
    expected = pl.DataFrame({"timestamp": [datetime(1970, 1, 1)], "value": [2]})
    expected = expected.with_columns(
        pl.col("timestamp").dt.replace_time_zone("Africa/Lagos"),
        pl.col("value").cast(idx_dtype),
    )
    assert_frame_equal(result, expected)

    result = df.group_by_dynamic(
        index_column="timestamp_utc", every="1d", closed="left"
    ).agg(pl.col("value").count())
    expected = pl.DataFrame(
        {
            "timestamp_utc": [datetime(1969, 12, 31), datetime(1970, 1, 1)],
            "value": [1, 1],
        }
    )
    expected = expected.with_columns(
        pl.col("timestamp_utc").dt.replace_time_zone("UTC"),
        pl.col("value").cast(idx_dtype),
    )
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("time_zone", [None, "UTC", "Asia/Kathmandu"])
def test_default_negative_every_offset_dynamic_group_by(time_zone: str | None) -> None:
    """Monthly right-closed windows with the default offset.

    A point exactly on a month start lands in the window that begins a month
    earlier (e.g. 2020-01-01 is labelled 2019-12-01).
    """
    # 2791
    dts = [
        datetime(2020, 1, 1),
        datetime(2020, 1, 2),
        datetime(2020, 2, 1),
        datetime(2020, 3, 1),
    ]
    df = pl.DataFrame({"dt": dts, "idx": range(len(dts))}).set_sorted("dt")
    df = df.with_columns(pl.col("dt").dt.replace_time_zone(time_zone))
    out = df.group_by_dynamic(index_column="dt", every="1mo", closed="right").agg(
        pl.col("idx")
    )

    expected = pl.DataFrame(
        {
            "dt": [
                datetime(2019, 12, 1, 0, 0),
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 2, 1, 0, 0),
            ],
            "idx": [[0], [1, 2], [3]],
        }
    )
    expected = expected.with_columns(pl.col("dt").dt.replace_time_zone(time_zone))
    assert_frame_equal(out, expected)


@pytest.mark.parametrize(
    ("rule", "offset"),
    [
        ("1h", timedelta(hours=2)),
        ("1d", timedelta(days=2)),
        ("1w", timedelta(weeks=2)),
    ],
)
def test_group_by_dynamic_crossing_dst(rule: str, offset: timedelta) -> None:
    """Each point of a US/Central range gets its own window.

    Uses ``start_by="datapoint"`` with ``every`` equal to the range step, on a
    range starting 2021-11-07. Note that ``offset`` here is the length of the
    generated range, not the group-by ``offset`` argument.
    """
    start_dt = datetime(2021, 11, 7)
    end_dt = start_dt + offset
    date_range = pl.datetime_range(
        start_dt, end_dt, rule, time_zone="US/Central", eager=True
    )
    df = pl.DataFrame({"time": date_range, "value": range(len(date_range))})
    result = df.group_by_dynamic("time", every=rule, start_by="datapoint").agg(
        pl.col("value").mean()
    )
    expected = pl.DataFrame(
        {"time": date_range, "value": range(len(date_range))},
        schema_overrides={"value": pl.Float64},
    )
    assert_frame_equal(result, expected)


@pytest.mark.parametrize(
    ("start_by", "expected_time", "expected_value"),
    [
        (
            "monday",
            [
                datetime(2021, 11, 1),
                datetime(2021, 11, 8),
            ],
            [0.0, 4.0],
        ),
        (
            "tuesday",
            [
                datetime(2021, 11, 2),
                datetime(2021, 11, 9),
            ],
            [0.5, 4.5],
        ),
        (
            "wednesday",
            [
                datetime(2021, 11, 3),
                datetime(2021, 11, 10),
            ],
            [1.0, 5.0],
        ),
        (
            "thursday",
            [
                datetime(2021, 11, 4),
                datetime(2021, 11, 11),
            ],
            [1.5, 5.5],
        ),
        (
            "friday",
            [
                datetime(2021, 11, 5),
                datetime(2021, 11, 12),
            ],
            [2.0, 6.0],
        ),
        (
            "saturday",
            [
                datetime(2021, 11, 6),
                datetime(2021, 11, 13),
            ],
            [2.5, 6.5],
        ),
        (
            "sunday",
            [
                datetime(2021, 11, 7),
                datetime(2021, 11, 14),
            ],
            [3.0, 7.0],
        ),
    ],
)
def test_group_by_dynamic_startby_monday_crossing_dst(
    start_by: StartBy, expected_time: list[datetime], expected_value: list[float]
) -> None:
    """Weekly windows anchored on each weekday over a US/Central range crossing DST."""
    start_dt = datetime(2021, 11, 7)
    end_dt = datetime(2021, 11, 14)
    date_range = pl.datetime_range(
        start_dt, end_dt, "1d", time_zone="US/Central", eager=True
    )
    df = pl.DataFrame({"time": date_range, "value": range(len(date_range))})
    result = df.group_by_dynamic("time", every="1w", start_by=start_by).agg(
        pl.col("value").mean()
    )
    expected = pl.DataFrame(
        {"time": expected_time, "value": expected_value},
    )
    expected = expected.with_columns(pl.col("time").dt.replace_time_zone("US/Central"))
    assert_frame_equal(result, expected)


def test_group_by_dynamic_startby_monday_dst_8737() -> None:
    """Two daily US/Central points (2021-11-06/07, 20:00) share one Monday week."""
    start_dt = datetime(2021, 11, 6, 20)
    stop_dt = datetime(2021, 11, 7, 20)
    date_range = pl.datetime_range(
        start_dt, stop_dt, "1d", time_zone="US/Central", eager=True
    )
    df = pl.DataFrame({"time": date_range, "value": range(len(date_range))})
    result = df.group_by_dynamic("time", every="1w", start_by="monday").agg(
        pl.col("value").mean()
    )
    expected = pl.DataFrame(
        {
            "time": [
                datetime(2021, 11, 1),
            ],
            "value": [0.5],
        },
    )
    expected = expected.with_columns(pl.col("time").dt.replace_time_zone("US/Central"))
    assert_frame_equal(result, expected)


def test_group_by_dynamic_monthly_crossing_dst() -> None:
    """Monthly windows over a monthly US/Central range: one window per point."""
    start_dt = datetime(2021, 11, 1)
    end_dt = datetime(2021, 12, 1)
    date_range = pl.datetime_range(
        start_dt, end_dt, "1mo", time_zone="US/Central", eager=True
    )
    df = pl.DataFrame({"time": date_range, "value": range(len(date_range))})
    result = df.group_by_dynamic("time", every="1mo").agg(pl.col("value").mean())
    expected = pl.DataFrame(
        {"time": date_range, "value": range(len(date_range))},
        schema_overrides={"value": pl.Float64},
    )
    assert_frame_equal(result, expected)


def test_group_by_dynamic_2d_9333() -> None:
    """A point at 2000-01-01 03:00 falls in the ``2d`` window labelled 1999-12-31."""
    df = pl.DataFrame({"ts": [datetime(2000, 1, 1, 3)], "values": [10.0]})
    df = df.with_columns(pl.col("ts").set_sorted())
    result = df.group_by_dynamic("ts", every="2d").agg(pl.col("values"))
    expected = pl.DataFrame({"ts": [datetime(1999, 12, 31, 0)], "values": [[10.0]]})
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("every", ["1h", timedelta(hours=1)])
@pytest.mark.parametrize("tzinfo", [None, ZoneInfo("UTC"), ZoneInfo("Asia/Kathmandu")])
def test_group_by_dynamic_iter(every: str | timedelta, tzinfo: ZoneInfo | None) -> None:
    """Iterating a dynamic group-by yields ``(key_tuple, sub_frame)`` pairs.

    The key tuple holds the window start, preceded by the ``group_by`` value
    when one is given.
    """
    time_zone = tzinfo.key if tzinfo is not None else None
    df = pl.DataFrame(
        {
            "datetime": [
                datetime(2020, 1, 1, 10, 0),
                datetime(2020, 1, 1, 10, 50),
                datetime(2020, 1, 1, 11, 10),
            ],
            "a": [1, 2, 2],
            "b": [4, 5, 6],
        }
    ).set_sorted("datetime")
    df = df.with_columns(pl.col("datetime").dt.replace_time_zone(time_zone))

    # Without 'group_by' argument
    result1 = [
        (name, data.shape)
        for name, data in df.group_by_dynamic("datetime", every=every, closed="left")
    ]
    expected1 = [
        ((datetime(2020, 1, 1, 10, tzinfo=tzinfo),), (2, 3)),
        ((datetime(2020, 1, 1, 11, tzinfo=tzinfo),), (1, 3)),
    ]
    assert result1 == expected1

    # With 'group_by' argument
    result2 = [
        (name, data.shape)
        for name, data in df.group_by_dynamic(
            "datetime", every=every, closed="left", group_by="a"
        )
    ]
    expected2 = [
        ((1, datetime(2020, 1, 1, 10, tzinfo=tzinfo)), (1, 3)),
        ((2, datetime(2020, 1, 1, 10, tzinfo=tzinfo)), (1, 3)),
        ((2, datetime(2020, 1, 1, 11, tzinfo=tzinfo)), (1, 3)),
    ]
    assert result2 == expected2


# https://github.com/pola-rs/polars/issues/11339
@pytest.mark.parametrize("include_boundaries", [True, False])
def test_group_by_dynamic_lazy_schema(include_boundaries: bool) -> None:
    """The lazily resolved schema matches the schema of the collected result."""
    lf = pl.LazyFrame(
        {
            "dt": pl.datetime_range(
                start=datetime(2022, 2, 10),
                end=datetime(2022, 2, 12),
                eager=True,
            ),
            "n": range(3),
        }
    )

    result = lf.group_by_dynamic(
        "dt", every="2d", closed="right", include_boundaries=include_boundaries
    ).agg(pl.col("dt").min().alias("dt_min"))

    assert result.collect_schema() == result.collect().schema


def test_group_by_dynamic_12414() -> None:
    """Date index with ``every="6mo"``, ``period="3d"`` and ``start_by="datapoint"``.

    Only windows that contain data are emitted.
    """
    df = pl.DataFrame(
        {
            "today": [
                date(2023, 3, 3),
                date(2023, 8, 31),
                date(2023, 9, 1),
                date(2023, 9, 4),
            ],
            "b": [1, 2, 3, 4],
        }
    ).sort("today")
    assert df.group_by_dynamic(
        "today",
        every="6mo",
        period="3d",
        closed="left",
        start_by="datapoint",
        include_boundaries=True,
    ).agg(
        gt_min_count=(pl.col.b >= (pl.col.b.min())).sum(),
    ).to_dict(as_series=False) == {
        "_lower_boundary": [datetime(2023, 3, 3, 0, 0), datetime(2023, 9, 3, 0, 0)],
        "_upper_boundary": [datetime(2023, 3, 6, 0, 0), datetime(2023, 9, 6, 0, 0)],
        "today": [date(2023, 3, 3), date(2023, 9, 3)],
        "gt_min_count": [1, 1],
    }


@pytest.mark.parametrize("input", [[pl.col("b").sum()], pl.col("b").sum()])
def test_group_by_dynamic_agg_input_types(input: Any) -> None:
    """``agg`` accepts both a single expression and a list of expressions."""
    df = pl.LazyFrame({"index_column": [0, 1, 2, 3], "b": [1, 3, 1, 2]}).set_sorted(
        "index_column"
    )
    result = df.group_by_dynamic(
        index_column="index_column", every="2i", closed="right"
    ).agg(input)

    expected = pl.LazyFrame({"index_column": [-2, 0, 2], "b": [1, 4, 2]})
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("input", [str, "b".join])
def test_group_by_dynamic_agg_bad_input_types(input: Any) -> None:
    """``agg`` raises ``TypeError`` for non-expression inputs (a type, a method)."""
    df = pl.LazyFrame({"index_column": [0, 1, 2, 3], "b": [1, 3, 1, 2]}).set_sorted(
        "index_column"
    )
    with pytest.raises(TypeError):
        df.group_by_dynamic(
            index_column="index_column", every="2i", closed="right"
        ).agg(input)


def test_group_by_dynamic_15225() -> None:
    """Two-day windows over a ``Date`` index, without and with ``group_by``."""
    df = pl.DataFrame(
        {
            "a": [1, 2, 3],
            "b": [date(2020, 1, 1), date(2020, 1, 2), date(2020, 1, 3)],
            "c": [1, 1, 2],
        }
    )
    result = df.group_by_dynamic("b", every="2d").agg(pl.sum("a"))
    expected = pl.DataFrame({"b": [date(2020, 1, 1), date(2020, 1, 3)], "a": [3, 3]})
    assert_frame_equal(result, expected)

    result = df.group_by_dynamic("b", every="2d", group_by="c").agg(pl.sum("a"))
    expected = pl.DataFrame(
        {"c": [1, 2], "b": [date(2020, 1, 1), date(2020, 1, 3)], "a": [3, 3]}
    )
    assert_frame_equal(result, expected)


@pytest.mark.parametrize("start_by", ["window", "friday"])
def test_earliest_point_included_when_offset_is_set_15241(start_by: StartBy) -> None:
    """With ``offset=5h`` the 03:00 point still lands in a (previous-day) window."""
    df = pl.DataFrame(
        data={
            "t": pl.Series(
                [
                    datetime(2024, 3, 22, 3, 0, tzinfo=timezone.utc),
                    datetime(2024, 3, 22, 4, 0, tzinfo=timezone.utc),
                    datetime(2024, 3, 22, 5, 0, tzinfo=timezone.utc),
                    datetime(2024, 3, 22, 6, 0, tzinfo=timezone.utc),
                ]
            ),
            "v": [1, 10, 100, 1000],
        }
    ).set_sorted("t")
    result = df.group_by_dynamic(
        index_column="t",
        every="1d",
        offset=timedelta(hours=5),
        start_by=start_by,
    ).agg("v")
    expected = pl.DataFrame(
        {
            "t": [
                datetime(2024, 3, 21, 5, 0, tzinfo=timezone.utc),
                datetime(2024, 3, 22, 5, 0, tzinfo=timezone.utc),
            ],
            "v": [[1, 10], [100, 1000]],
        }
    )
    assert_frame_equal(result, expected)


def test_group_by_dynamic_invalid() -> None:
    """Duration kinds must match the index type.

    An integer duration (``"3000i"``) is rejected for a datetime index, and a
    calendar duration (``"3000d"``) is rejected for an integer index.
    """
    df = pl.DataFrame(
        {
            "values": [1, 4],
            "times": [datetime(2020, 1, 3), datetime(2020, 1, 1)],
        },
    )
    with pytest.raises(
        InvalidOperationError, match="duration may not be a parsed integer"
    ):
        (
            df.sort("times")
            .group_by_dynamic("times", every="3000i")
            .agg(pl.col("values").sum().alias("sum"))
        )
    with pytest.raises(
        InvalidOperationError, match="duration must be a parsed integer"
    ):
        (
            df.with_row_index()
            .group_by_dynamic("index", every="3000d")
            .agg(pl.col("values").sum().alias("sum"))
        )


def test_group_by_dynamic_get() -> None:
    """``get(1)`` picks the second row of each overlapping datapoint window."""
    df = pl.DataFrame(
        {
            "time": pl.date_range(pl.date(2021, 1, 1), pl.date(2021, 1, 8), eager=True),
            "data": pl.arange(8, eager=True),
        }
    )

    assert df.group_by_dynamic(
        index_column="time",
        every="2d",
        period="3d",
        start_by="datapoint",
    ).agg(
        get=pl.col("data").get(1),
    ).to_dict(as_series=False) == {
        "time": [
            date(2021, 1, 1),
            date(2021, 1, 3),
            date(2021, 1, 5),
            date(2021, 1, 7),
        ],
        "get": [1, 3, 5, 7],
    }


def test_group_by_dynamic_exclude_index_from_expansion_17075() -> None:
    """``.last()`` aggregates the non-index columns.

    The ``time`` column holds window labels, not each window's last timestamp.
    """
    lf = pl.LazyFrame(
        {
            "time": pl.datetime_range(
                start=datetime(2021, 12, 16),
                end=datetime(2021, 12, 16, 3),
                interval="30m",
                eager=True,
            ),
            "n": range(7),
            "m": range(7),
        }
    )

    assert lf.group_by_dynamic(
        "time", every="1h", closed="right"
    ).last().collect().to_dict(as_series=False) == {
        "time": [
            datetime(2021, 12, 15, 23, 0),
            datetime(2021, 12, 16, 0, 0),
            datetime(2021, 12, 16, 1, 0),
            datetime(2021, 12, 16, 2, 0),
        ],
        "n": [0, 2, 4, 6],
        "m": [0, 2, 4, 6],
    }


def test_group_by_dynamic_overlapping_19704() -> None:
    """A 45-day ``period`` with a monthly ``every`` yields overlapping windows.

    A point can therefore be summed into two consecutive windows.
    """
    df = pl.DataFrame(
        {
            "a": [datetime(2020, 1, 1), datetime(2020, 2, 1), datetime(2020, 3, 1)],
            "b": [1, 2, 3],
        }
    )
    result = df.group_by_dynamic(
        "a", every="1mo", period="45d", include_boundaries=True
    ).agg(pl.col("b").sum())
    expected = pl.DataFrame(
        {
            "_lower_boundary": [
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 2, 1, 0, 0),
                datetime(2020, 3, 1, 0, 0),
            ],
            "_upper_boundary": [
                datetime(2020, 2, 15, 0, 0),
                datetime(2020, 3, 17, 0, 0),
                datetime(2020, 4, 15, 0, 0),
            ],
            "a": [
                datetime(2020, 1, 1, 0, 0),
                datetime(2020, 2, 1, 0, 0),
                datetime(2020, 3, 1, 0, 0),
            ],
            "b": [3, 5, 3],
        }
    )
    assert_frame_equal(result, expected)


def test_group_by_dynamic_single_row_22585() -> None:
    """A single-row frame with ``group_by`` yields exactly one window."""
    df = pl.DataFrame({"date": [date(2025, 1, 1)], "group": ["x"]})
    out = df.group_by_dynamic("date", every="1y", group_by=["group"]).agg(pl.len())
    # `pl.len()` has the build-dependent index dtype, so don't hard-code UInt32.
    expected = pl.DataFrame(
        {"group": ["x"], "date": [date(2025, 1, 1)], "len": [1]}
    ).with_columns(pl.col("len").cast(pl.get_index_type()))
    assert_frame_equal(out, expected)


def test_group_by_dynamic_zero_sum_23433() -> None:
    """``sum`` over a window holding only nulls is 0, not null."""
    df = pl.DataFrame(
        {
            "g": [0, 0, 1, 1, 2, 2, 2, 3, 3],
            "x": [None, None, None, None, None, None, None, 1, 2],
        }
    )
    out = df.group_by_dynamic("g", every="1i", period="2i").agg(pl.col.x.sum())
    expected = pl.DataFrame({"g": [0, 1, 2, 3], "x": [0, 0, 3, 3]})
    assert_frame_equal(out, expected)


def test_group_by_dynamic_null_mean_22724() -> None:
    """``mean`` is null for all-null windows and ignores nulls elsewhere."""
    time = pl.datetime_range(
        start=datetime(2025, 1, 1, 0, 0, 0),
        end=datetime(2025, 1, 1, 0, 0, 10),
        interval="1s",
        eager=True,
    )

    b = pl.DataFrame(
        {
            "time": time,
            "value": [None, None, None, 0, None, None, None, None, -1, None, None],
            "empty": [None] * len(time),
        }
    ).cast({"value": pl.Float32, "empty": pl.Float32})
    gb = b.group_by_dynamic("time", every="2s", period="3s", offset="-3s")
    out = gb.agg([pl.col("value").cast(pl.Float32).mean()])

    expected = pl.DataFrame(
        {
            "time": pl.Series(
                [
                    datetime(2024, 12, 31, 23, 59, 59),
                    datetime(2025, 1, 1, 0, 0, 1),
                    datetime(2025, 1, 1, 0, 0, 3),
                    datetime(2025, 1, 1, 0, 0, 5),
                    datetime(2025, 1, 1, 0, 0, 7),
                    datetime(2025, 1, 1, 0, 0, 9),
                ],
                dtype=pl.Datetime(time_unit="us", time_zone=None),
            ),
            "value": pl.Series([None, 0.0, 0.0, None, -1.0, None], dtype=pl.Float32),
        }
    )
    assert_frame_equal(out, expected)