Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-time/src/windows/group_by.rs
8430 views
1
use std::collections::VecDeque;
2
3
use arrow::legacy::time_zone::Tz;
4
use arrow::temporal_conversions::{
5
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
6
};
7
use arrow::trusted_len::TrustedLen;
8
use chrono::NaiveDateTime;
9
#[cfg(feature = "timezones")]
10
use chrono::TimeZone as _;
11
use now::DateTimeNow;
12
use polars_core::POOL;
13
use polars_core::prelude::*;
14
use polars_core::utils::_split_offsets;
15
use polars_core::utils::flatten::flatten_par;
16
use rayon::prelude::*;
17
#[cfg(feature = "serde")]
18
use serde::{Deserialize, Serialize};
19
use strum_macros::IntoStaticStr;
20
21
use crate::prelude::*;
22
23
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
24
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
25
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
26
#[strum(serialize_all = "snake_case")]
27
pub enum ClosedWindow {
28
Left,
29
Right,
30
Both,
31
None,
32
}
33
34
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
35
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
36
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
37
#[strum(serialize_all = "snake_case")]
38
pub enum Label {
39
Left,
40
Right,
41
DataPoint,
42
}
43
44
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
45
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
46
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
47
#[strum(serialize_all = "snake_case")]
48
#[derive(Default)]
49
pub enum StartBy {
50
#[default]
51
WindowBound,
52
DataPoint,
53
/// only useful if periods are weekly
54
Monday,
55
Tuesday,
56
Wednesday,
57
Thursday,
58
Friday,
59
Saturday,
60
Sunday,
61
}
62
63
impl StartBy {
64
pub fn weekday(&self) -> Option<u32> {
65
match self {
66
StartBy::Monday => Some(0),
67
StartBy::Tuesday => Some(1),
68
StartBy::Wednesday => Some(2),
69
StartBy::Thursday => Some(3),
70
StartBy::Friday => Some(4),
71
StartBy::Saturday => Some(5),
72
StartBy::Sunday => Some(6),
73
_ => None,
74
}
75
}
76
}
77
78
#[allow(clippy::too_many_arguments)]
79
fn update_groups_and_bounds(
80
bounds_iter: BoundsIter<'_>,
81
mut start: usize,
82
time: &[i64],
83
closed_window: ClosedWindow,
84
include_lower_bound: bool,
85
include_upper_bound: bool,
86
lower_bound: &mut Vec<i64>,
87
upper_bound: &mut Vec<i64>,
88
groups: &mut Vec<[IdxSize; 2]>,
89
) {
90
let mut iter = bounds_iter.into_iter();
91
let mut stride = 0;
92
93
'bounds: while let Some(bi) = iter.nth(stride) {
94
let mut has_member = false;
95
// find starting point of window
96
for &t in &time[start..time.len().saturating_sub(1)] {
97
// the window is behind the time values.
98
if bi.is_future(t, closed_window) {
99
stride = iter.get_stride(t);
100
continue 'bounds;
101
}
102
if bi.is_member_entry(t, closed_window) {
103
has_member = true;
104
break;
105
}
106
// element drops out of the window
107
start += 1;
108
}
109
110
// update stride so we can fast-forward in case of sparse data
111
stride = if has_member {
112
0
113
} else {
114
debug_assert!(start < time.len());
115
iter.get_stride(time[start])
116
};
117
118
// find members of this window
119
let mut end = start;
120
121
// last value isn't always added
122
if end == time.len() - 1 {
123
let t = time[end];
124
if bi.is_member(t, closed_window) {
125
if include_lower_bound {
126
lower_bound.push(bi.start);
127
}
128
if include_upper_bound {
129
upper_bound.push(bi.stop);
130
}
131
groups.push([end as IdxSize, 1])
132
}
133
continue;
134
}
135
for &t in &time[end..] {
136
if !bi.is_member_exit(t, closed_window) {
137
break;
138
}
139
end += 1;
140
}
141
let len = end - start;
142
143
if include_lower_bound {
144
lower_bound.push(bi.start);
145
}
146
if include_upper_bound {
147
upper_bound.push(bi.stop);
148
}
149
groups.push([start as IdxSize, len as IdxSize])
150
}
151
}
152
153
/// Window boundaries are created based on the given `Window`, which is defined by:
154
/// - every
155
/// - period
156
/// - offset
157
///
158
/// And every window boundary we search for the values that fit that window by the given
159
/// `ClosedWindow`. The groups are return as `GroupTuples` together with the lower bound and upper
160
/// bound timestamps. These timestamps indicate the start (lower) and end (upper) of the window of
161
/// that group.
162
///
163
/// If `include_boundaries` is `false` those `lower` and `upper` vectors will be empty.
164
#[allow(clippy::too_many_arguments)]
165
pub fn group_by_windows(
166
window: Window,
167
time: &[i64],
168
closed_window: ClosedWindow,
169
tu: TimeUnit,
170
tz: &Option<TimeZone>,
171
include_lower_bound: bool,
172
include_upper_bound: bool,
173
start_by: StartBy,
174
) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
175
let start = time[0];
176
// the boundary we define here is not yet correct. It doesn't take 'period' into account
177
// and it doesn't have the proper starting point. This boundary is used as a proxy to find
178
// the proper 'boundary' in 'window.get_overlapping_bounds_iter'.
179
let boundary = if time.len() > 1 {
180
// +1 because left or closed boundary could match the next window if it is on the boundary
181
let stop = time[time.len() - 1] + 1;
182
Bounds::new_checked(start, stop)
183
} else {
184
let stop = start + 1;
185
Bounds::new_checked(start, stop)
186
};
187
188
let size = {
189
match tu {
190
TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
191
TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
192
TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
193
}
194
};
195
let size_lower = if include_lower_bound { size } else { 0 };
196
let size_upper = if include_upper_bound { size } else { 0 };
197
let mut lower_bound = Vec::with_capacity(size_lower);
198
let mut upper_bound = Vec::with_capacity(size_upper);
199
200
let mut groups = Vec::with_capacity(size);
201
let start_offset = 0;
202
203
match tz {
204
#[cfg(feature = "timezones")]
205
Some(tz) => {
206
update_groups_and_bounds(
207
window.get_overlapping_bounds_iter(
208
boundary,
209
closed_window,
210
tu,
211
tz.parse::<Tz>().ok().as_ref(),
212
start_by,
213
)?,
214
start_offset,
215
time,
216
closed_window,
217
include_lower_bound,
218
include_upper_bound,
219
&mut lower_bound,
220
&mut upper_bound,
221
&mut groups,
222
);
223
},
224
_ => {
225
update_groups_and_bounds(
226
window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
227
start_offset,
228
time,
229
closed_window,
230
include_lower_bound,
231
include_upper_bound,
232
&mut lower_bound,
233
&mut upper_bound,
234
&mut groups,
235
);
236
},
237
};
238
239
Ok((groups, lower_bound, upper_bound))
240
}
241
242
// t is right at the end of the window
243
// ------t---
244
// [------]
245
#[inline]
246
#[allow(clippy::too_many_arguments)]
247
pub(crate) fn group_by_values_iter_lookbehind(
248
period: Duration,
249
offset: Duration,
250
time: &[i64],
251
closed_window: ClosedWindow,
252
tu: TimeUnit,
253
tz: Option<Tz>,
254
start_offset: usize,
255
upper_bound: Option<usize>,
256
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
257
debug_assert!(offset.duration_ns() == period.duration_ns());
258
debug_assert!(offset.negative);
259
let add = match tu {
260
TimeUnit::Nanoseconds => Duration::add_ns,
261
TimeUnit::Microseconds => Duration::add_us,
262
TimeUnit::Milliseconds => Duration::add_ms,
263
};
264
265
let upper_bound = upper_bound.unwrap_or(time.len());
266
// Use binary search to find the initial start as that is behind.
267
let mut start = if let Some(&t) = time.get(start_offset) {
268
let lower = add(&offset, t, tz.as_ref())?;
269
// We have `period == -offset`, so `t + offset + period` is equal to `t`,
270
// and `upper` is trivially equal to `t` itself. Using the trivial calculation,
271
// instead of `upper = lower + period`, avoids issues around
272
// `t - 1mo + 1mo` not round-tripping.
273
let upper = t;
274
let b = Bounds::new(lower, upper);
275
let slice = &time[..start_offset];
276
slice.partition_point(|v| !b.is_member(*v, closed_window))
277
} else {
278
0
279
};
280
let mut end = start;
281
let mut last = time[start_offset];
282
Ok(time[start_offset..upper_bound]
283
.iter()
284
.enumerate()
285
.map(move |(mut i, t)| {
286
// Fast path for duplicates.
287
if *t == last && i > 0 {
288
let len = end - start;
289
let offset = start as IdxSize;
290
return Ok((offset, len as IdxSize));
291
}
292
last = *t;
293
i += start_offset;
294
295
let lower = add(&offset, *t, tz.as_ref())?;
296
let upper = *t;
297
298
let b = Bounds::new(lower, upper);
299
300
for &t in unsafe { time.get_unchecked(start..i) } {
301
if b.is_member_entry(t, closed_window) {
302
break;
303
}
304
start += 1;
305
}
306
307
// faster path, check if `i` is member.
308
if b.is_member_exit(*t, closed_window) {
309
end = i;
310
} else {
311
end = std::cmp::max(end, start);
312
}
313
// we still must loop to consume duplicates
314
for &t in unsafe { time.get_unchecked(end..) } {
315
if !b.is_member_exit(t, closed_window) {
316
break;
317
}
318
end += 1;
319
}
320
321
let len = end - start;
322
let offset = start as IdxSize;
323
324
Ok((offset, len as IdxSize))
325
}))
326
}
327
328
// this one is correct for all lookbehind/lookaheads, but is slower
329
// window is completely behind t and t itself is not a member
330
// ---------------t---
331
// [---]
332
pub(crate) fn group_by_values_iter_window_behind_t(
333
period: Duration,
334
offset: Duration,
335
time: &[i64],
336
closed_window: ClosedWindow,
337
tu: TimeUnit,
338
tz: Option<Tz>,
339
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
340
let add = match tu {
341
TimeUnit::Nanoseconds => Duration::add_ns,
342
TimeUnit::Microseconds => Duration::add_us,
343
TimeUnit::Milliseconds => Duration::add_ms,
344
};
345
346
let mut start = 0;
347
let mut end = start;
348
let mut last = time[0];
349
let mut started = false;
350
time.iter().map(move |lower| {
351
// Fast path for duplicates.
352
if *lower == last && started {
353
let len = end - start;
354
let offset = start as IdxSize;
355
return Ok((offset, len as IdxSize));
356
}
357
last = *lower;
358
started = true;
359
let lower = add(&offset, *lower, tz.as_ref())?;
360
let upper = add(&period, lower, tz.as_ref())?;
361
362
let b = Bounds::new(lower, upper);
363
if b.is_future(time[0], closed_window) {
364
Ok((0, 0))
365
} else {
366
for &t in &time[start..] {
367
if b.is_member_entry(t, closed_window) {
368
break;
369
}
370
start += 1;
371
}
372
373
end = std::cmp::max(start, end);
374
for &t in &time[end..] {
375
if !b.is_member_exit(t, closed_window) {
376
break;
377
}
378
end += 1;
379
}
380
381
let len = end - start;
382
let offset = start as IdxSize;
383
384
Ok((offset, len as IdxSize))
385
}
386
})
387
}
388
389
// window is with -1 periods of t
390
// ----t---
391
// [---]
392
pub(crate) fn group_by_values_iter_partial_lookbehind(
393
period: Duration,
394
offset: Duration,
395
time: &[i64],
396
closed_window: ClosedWindow,
397
tu: TimeUnit,
398
tz: Option<Tz>,
399
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
400
let add = match tu {
401
TimeUnit::Nanoseconds => Duration::add_ns,
402
TimeUnit::Microseconds => Duration::add_us,
403
TimeUnit::Milliseconds => Duration::add_ms,
404
};
405
406
let mut start = 0;
407
let mut end = start;
408
let mut last = time[0];
409
time.iter().enumerate().map(move |(i, lower)| {
410
// Fast path for duplicates.
411
if *lower == last && i > 0 {
412
let len = end - start;
413
let offset = start as IdxSize;
414
return Ok((offset, len as IdxSize));
415
}
416
last = *lower;
417
418
let lower = add(&offset, *lower, tz.as_ref())?;
419
let upper = add(&period, lower, tz.as_ref())?;
420
421
let b = Bounds::new(lower, upper);
422
423
for &t in &time[start..] {
424
if b.is_member_entry(t, closed_window) || start == i {
425
break;
426
}
427
start += 1;
428
}
429
430
end = std::cmp::max(start, end);
431
for &t in &time[end..] {
432
if !b.is_member_exit(t, closed_window) {
433
break;
434
}
435
end += 1;
436
}
437
438
let len = end - start;
439
let offset = start as IdxSize;
440
441
Ok((offset, len as IdxSize))
442
})
443
}
444
445
#[allow(clippy::too_many_arguments)]
446
// window is completely ahead of t and t itself is not a member
447
// --t-----------
448
// [---]
449
pub(crate) fn group_by_values_iter_lookahead(
450
period: Duration,
451
offset: Duration,
452
time: &[i64],
453
closed_window: ClosedWindow,
454
tu: TimeUnit,
455
tz: Option<Tz>,
456
start_offset: usize,
457
upper_bound: Option<usize>,
458
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
459
let upper_bound = upper_bound.unwrap_or(time.len());
460
461
let add = match tu {
462
TimeUnit::Nanoseconds => Duration::add_ns,
463
TimeUnit::Microseconds => Duration::add_us,
464
TimeUnit::Milliseconds => Duration::add_ms,
465
};
466
let mut start = start_offset;
467
let mut end = start;
468
469
let mut last = time[start_offset];
470
let mut started = false;
471
time[start_offset..upper_bound].iter().map(move |lower| {
472
// Fast path for duplicates.
473
if *lower == last && started {
474
let len = end - start;
475
let offset = start as IdxSize;
476
return Ok((offset, len as IdxSize));
477
}
478
started = true;
479
last = *lower;
480
481
let lower = add(&offset, *lower, tz.as_ref())?;
482
let upper = add(&period, lower, tz.as_ref())?;
483
484
let b = Bounds::new(lower, upper);
485
486
for &t in &time[start..] {
487
if b.is_member_entry(t, closed_window) {
488
break;
489
}
490
start += 1;
491
}
492
493
end = std::cmp::max(start, end);
494
for &t in &time[end..] {
495
if !b.is_member_exit(t, closed_window) {
496
break;
497
}
498
end += 1;
499
}
500
501
let len = end - start;
502
let offset = start as IdxSize;
503
504
Ok((offset, len as IdxSize))
505
})
506
}
507
508
/// Rolling-window convenience wrapper: builds the full-lookbehind iterator in which every
/// value `t` is the right endpoint of its own window of length `period`.
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    // The offset is `period` with its `negative` flag set, so the window ends at `t`.
    let offset = {
        let mut lookbehind = period;
        lookbehind.negative = true;
        lookbehind
    };
    group_by_values_iter_lookbehind(period, offset, time, closed_window, tu, tz, 0, None)
}
522
523
/// Makes sure no partition boundary falls between two equal timestamps.
///
/// `thread_offsets` holds `(start, len)` blocks that are expected to tile `time`
/// contiguously. Every boundary whose neighbouring values are equal
/// (`time[start - 1] == time[start]`) is removed by merging that block into the block on
/// its left; all clean boundaries are kept. Block lengths are then recomputed from the
/// surviving start positions, so the result always starts at 0 and covers all of `time`.
///
/// The list is left untouched when `time` is empty, when there is at most one block, or
/// when every boundary is already clean.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    if time.is_empty() || thread_offsets.len() <= 1 {
        return;
    }

    // A boundary at `at` is clean when it has distinct values on both sides.
    let is_clean_boundary =
        |at: usize| -> bool { at > 0 && at < time.len() && time[at - 1] != time[at] };

    let already_clean = thread_offsets
        .windows(2)
        .all(|pair| pair[1].0 > pair[0].0 && is_clean_boundary(pair[1].0));
    if already_clean {
        return;
    }

    // Keep the start of every block whose left boundary is clean; the first block always
    // starts at 0. Requiring strictly increasing starts also drops empty blocks.
    let mut starts = Vec::with_capacity(thread_offsets.len());
    let mut previous = 0usize;
    starts.push(previous);
    for &(start, _) in thread_offsets.iter().skip(1) {
        if start > previous && is_clean_boundary(start) {
            starts.push(start);
            previous = start;
        }
    }

    // Each block runs up to the next surviving start; the last one runs to the end.
    let ends = starts
        .iter()
        .copied()
        .skip(1)
        .chain(std::iter::once(time.len()));
    let pruned: Vec<(usize, usize)> = starts
        .iter()
        .copied()
        .zip(ends)
        .map(|(start, end)| (start, end - start))
        .collect();
    debug_assert_eq!(pruned.iter().map(|w| w.1).sum::<usize>(), time.len());

    *thread_offsets = pruned;
}
569
570
#[allow(clippy::too_many_arguments)]
571
fn group_by_values_iter_lookbehind_collected(
572
period: Duration,
573
offset: Duration,
574
time: &[i64],
575
closed_window: ClosedWindow,
576
tu: TimeUnit,
577
tz: Option<Tz>,
578
start_offset: usize,
579
upper_bound: Option<usize>,
580
) -> PolarsResult<Vec<[IdxSize; 2]>> {
581
let iter = group_by_values_iter_lookbehind(
582
period,
583
offset,
584
time,
585
closed_window,
586
tu,
587
tz,
588
start_offset,
589
upper_bound,
590
)?;
591
iter.map(|result| result.map(|(offset, len)| [offset, len]))
592
.collect::<PolarsResult<Vec<_>>>()
593
}
594
595
#[allow(clippy::too_many_arguments)]
596
pub(crate) fn group_by_values_iter_lookahead_collected(
597
period: Duration,
598
offset: Duration,
599
time: &[i64],
600
closed_window: ClosedWindow,
601
tu: TimeUnit,
602
tz: Option<Tz>,
603
start_offset: usize,
604
upper_bound: Option<usize>,
605
) -> PolarsResult<Vec<[IdxSize; 2]>> {
606
let iter = group_by_values_iter_lookahead(
607
period,
608
offset,
609
time,
610
closed_window,
611
tu,
612
tz,
613
start_offset,
614
upper_bound,
615
);
616
iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
617
.collect::<PolarsResult<Vec<_>>>()
618
}
619
620
/// Different from `group_by_windows`, where define window buckets and search which values fit that
621
/// pre-defined bucket.
622
///
623
/// This function defines every window based on the:
624
/// - timestamp (lower bound)
625
/// - timestamp + period (upper bound)
626
/// where timestamps are the individual values in the array `time`
627
pub fn group_by_values(
628
period: Duration,
629
offset: Duration,
630
time: &[i64],
631
closed_window: ClosedWindow,
632
tu: TimeUnit,
633
tz: Option<Tz>,
634
) -> PolarsResult<GroupsSlice> {
635
if time.is_empty() {
636
return Ok(GroupsSlice::from(vec![]));
637
}
638
639
let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
640
// there are duplicates in the splits, so we opt for a single partition
641
prune_splits_on_duplicates(time, &mut thread_offsets);
642
643
// If we start from within parallel work we will do this single threaded.
644
let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
645
646
// we have a (partial) lookbehind window
647
if offset.negative && !offset.is_zero() {
648
// lookbehind
649
if offset.duration_ns() == period.duration_ns() {
650
// t is right at the end of the window
651
// ------t---
652
// [------]
653
if !run_parallel {
654
let vecs = group_by_values_iter_lookbehind_collected(
655
period,
656
offset,
657
time,
658
closed_window,
659
tu,
660
tz,
661
0,
662
None,
663
)?;
664
return Ok(GroupsSlice::from(vecs));
665
}
666
667
POOL.install(|| {
668
let vals = thread_offsets
669
.par_iter()
670
.copied()
671
.map(|(base_offset, len)| {
672
let upper_bound = base_offset + len;
673
group_by_values_iter_lookbehind_collected(
674
period,
675
offset,
676
time,
677
closed_window,
678
tu,
679
tz,
680
base_offset,
681
Some(upper_bound),
682
)
683
})
684
.collect::<PolarsResult<Vec<_>>>()?;
685
Ok(flatten_par(&vals))
686
})
687
} else if ((offset.duration_ns() >= period.duration_ns())
688
&& matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
689
|| ((offset.duration_ns() > period.duration_ns())
690
&& matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
691
{
692
// window is completely behind t and t itself is not a member
693
// ---------------t---
694
// [---]
695
let iter =
696
group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
697
iter.map(|result| result.map(|(offset, len)| [offset, len]))
698
.collect::<PolarsResult<_>>()
699
}
700
// partial lookbehind
701
// this one is still single threaded
702
// can make it parallel later, its a bit more complicated because the boundaries are unknown
703
// window is with -1 periods of t
704
// ----t---
705
// [---]
706
else {
707
let iter = group_by_values_iter_partial_lookbehind(
708
period,
709
offset,
710
time,
711
closed_window,
712
tu,
713
tz,
714
);
715
iter.map(|result| result.map(|(offset, len)| [offset, len]))
716
.collect::<PolarsResult<_>>()
717
}
718
} else if !offset.is_zero()
719
|| closed_window == ClosedWindow::Right
720
|| closed_window == ClosedWindow::None
721
{
722
// window is completely ahead of t and t itself is not a member
723
// --t-----------
724
// [---]
725
726
if !run_parallel {
727
let vecs = group_by_values_iter_lookahead_collected(
728
period,
729
offset,
730
time,
731
closed_window,
732
tu,
733
tz,
734
0,
735
None,
736
)?;
737
return Ok(GroupsSlice::from(vecs));
738
}
739
740
POOL.install(|| {
741
let vals = thread_offsets
742
.par_iter()
743
.copied()
744
.map(|(base_offset, len)| {
745
let lower_bound = base_offset;
746
let upper_bound = base_offset + len;
747
group_by_values_iter_lookahead_collected(
748
period,
749
offset,
750
time,
751
closed_window,
752
tu,
753
tz,
754
lower_bound,
755
Some(upper_bound),
756
)
757
})
758
.collect::<PolarsResult<Vec<_>>>()?;
759
Ok(flatten_par(&vals))
760
})
761
} else {
762
if !run_parallel {
763
let vecs = group_by_values_iter_lookahead_collected(
764
period,
765
offset,
766
time,
767
closed_window,
768
tu,
769
tz,
770
0,
771
None,
772
)?;
773
return Ok(GroupsSlice::from(vecs));
774
}
775
776
// Offset is 0 and window is closed on the left:
777
// it must be that the window starts at t and t is a member
778
// --t-----------
779
// [---]
780
POOL.install(|| {
781
let vals = thread_offsets
782
.par_iter()
783
.copied()
784
.map(|(base_offset, len)| {
785
let lower_bound = base_offset;
786
let upper_bound = base_offset + len;
787
group_by_values_iter_lookahead_collected(
788
period,
789
offset,
790
time,
791
closed_window,
792
tu,
793
tz,
794
lower_bound,
795
Some(upper_bound),
796
)
797
})
798
.collect::<PolarsResult<Vec<_>>>()?;
799
Ok(flatten_par(&vals))
800
})
801
}
802
}
803
804
pub struct RollingWindower {
805
period: Duration,
806
offset: Duration,
807
closed: ClosedWindow,
808
809
add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
810
tz: Option<Tz>,
811
812
start: IdxSize,
813
end: IdxSize,
814
length: IdxSize,
815
816
active: VecDeque<ActiveWindow>,
817
}
818
819
struct ActiveWindow {
820
start: i64,
821
end: i64,
822
}
823
824
impl ActiveWindow {
825
#[inline(always)]
826
fn above_lower_bound(&self, t: i64, closed: ClosedWindow) -> bool {
827
(t > self.start)
828
| (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == self.start))
829
}
830
831
#[inline(always)]
832
fn below_upper_bound(&self, t: i64, closed: ClosedWindow) -> bool {
833
(t < self.end)
834
| (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == self.end))
835
}
836
}
837
838
/// Translates a flat element offset `n` into a position inside the list of slices `l`.
///
/// Returns `(x, y)` where `y` is the index of the slice and `x` the index inside it.
/// Slices that are fully skipped (including empty ones) are stepped over, so with `n == 0`
/// the result points at the first non-empty slice, or at `l.len()` when there is none.
///
/// # Panics
/// Panics if `n` is larger than the total number of elements in `l`.
fn skip_in_2d_list(l: &[&[i64]], mut n: usize) -> (usize, usize) {
    let mut y = 0;
    for row in l {
        // Stop at the first slice that actually contains the requested element.
        if n < row.len() {
            break;
        }
        n -= row.len();
        y += 1;
    }
    assert!(n == 0 || y < l.len());
    (n, y)
}
847
/// Advances the 2D cursor `(x, y)` by one element within the list of slices `l`.
///
/// When the cursor runs off the end of slice `y` it moves to the start of the next slice,
/// stepping over empty slices; it may end up at `y == l.len()` (one past the last slice).
fn increment_2d(x: &mut usize, y: &mut usize, l: &[&[i64]]) {
    *x += 1;
    while let Some(row) = l.get(*y) {
        if *x != row.len() {
            break;
        }
        *y += 1;
        *x = 0;
    }
}
854
855
impl RollingWindower {
856
pub fn new(
857
period: Duration,
858
offset: Duration,
859
closed: ClosedWindow,
860
tu: TimeUnit,
861
tz: Option<Tz>,
862
) -> Self {
863
Self {
864
period,
865
offset,
866
closed,
867
868
add: match tu {
869
TimeUnit::Nanoseconds => Duration::add_ns,
870
TimeUnit::Microseconds => Duration::add_us,
871
TimeUnit::Milliseconds => Duration::add_ms,
872
},
873
tz,
874
875
start: 0,
876
end: 0,
877
length: 0,
878
879
active: Default::default(),
880
}
881
}
882
883
/// Insert new values into the windower.
884
///
885
/// This should be given all the old values that were not processed yet.
886
pub fn insert(
887
&mut self,
888
time: &[&[i64]],
889
windows: &mut Vec<[IdxSize; 2]>,
890
) -> PolarsResult<IdxSize> {
891
let (mut i_x, mut i_y) = skip_in_2d_list(time, (self.length - self.start) as usize);
892
let (mut s_x, mut s_y) = skip_in_2d_list(time, 0); // skip over empty lists
893
let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);
894
895
let time_start = self.start;
896
let mut i = self.length;
897
while i_y < time.len() {
898
let t = time[i_y][i_x];
899
let window_start = (self.add)(&self.offset, t, self.tz.as_ref())?;
900
// For datetime arithmetic, it does *NOT* hold 0 + a - a == 0. Therefore, we make sure
901
// that if `offset` and `period` are inverses we keep the `t`.
902
let window_end = if self.offset == -self.period {
903
t
904
} else {
905
(self.add)(&self.period, window_start, self.tz.as_ref())?
906
};
907
908
self.active.push_back(ActiveWindow {
909
start: window_start,
910
end: window_end,
911
});
912
913
while let Some(w) = self.active.front() {
914
if w.below_upper_bound(t, self.closed) {
915
break;
916
}
917
918
let w = self.active.pop_front().unwrap();
919
while self.start < i && !w.above_lower_bound(time[s_y][s_x], self.closed) {
920
increment_2d(&mut s_x, &mut s_y, time);
921
self.start += 1;
922
}
923
while self.end < i && w.below_upper_bound(time[e_y][e_x], self.closed) {
924
increment_2d(&mut e_x, &mut e_y, time);
925
self.end += 1;
926
}
927
windows.push([self.start, self.end - self.start]);
928
}
929
930
increment_2d(&mut i_x, &mut i_y, time);
931
i += 1;
932
}
933
934
self.length = i;
935
Ok(self.start - time_start)
936
}
937
938
/// Process all remaining items and signal that no more items are coming.
939
pub fn finalize(&mut self, time: &[&[i64]], windows: &mut Vec<[IdxSize; 2]>) {
940
assert_eq!(
941
time.iter().map(|t| t.len()).sum::<usize>() as IdxSize,
942
self.length - self.start
943
);
944
945
let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);
946
let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);
947
948
windows.extend(self.active.drain(..).map(|w| {
949
while self.start < self.length && !w.above_lower_bound(time[s_y][s_x], self.closed) {
950
increment_2d(&mut s_x, &mut s_y, time);
951
self.start += 1;
952
}
953
while self.end < self.length && w.below_upper_bound(time[e_y][e_x], self.closed) {
954
increment_2d(&mut e_x, &mut e_y, time);
955
self.end += 1;
956
}
957
[self.start, self.end - self.start]
958
}));
959
960
self.start = 0;
961
self.end = 0;
962
self.length = 0;
963
}
964
965
pub fn reset(&mut self) {
966
self.active.clear();
967
self.start = 0;
968
self.end = 0;
969
self.length = 0;
970
}
971
}
972
973
#[derive(Debug)]
974
struct ActiveDynWindow {
975
start: IdxSize,
976
lower_bound: i64,
977
upper_bound: i64,
978
}
979
980
#[inline(always)]
981
fn is_above_lower_bound(t: i64, lb: i64, closed: ClosedWindow) -> bool {
982
(t > lb) | (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == lb))
983
}
984
#[inline(always)]
985
fn is_below_upper_bound(t: i64, ub: i64, closed: ClosedWindow) -> bool {
986
(t < ub) | (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == ub))
987
}
988
989
pub struct GroupByDynamicWindower {
990
period: Duration,
991
offset: Duration,
992
every: Duration,
993
closed: ClosedWindow,
994
995
start_by: StartBy,
996
997
add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
998
// Not-to-exceed duration (upper limit).
999
nte: fn(&Duration) -> i64,
1000
tu: TimeUnit,
1001
tz: Option<Tz>,
1002
1003
include_lower_bound: bool,
1004
include_upper_bound: bool,
1005
1006
num_seen: IdxSize,
1007
next_lower_bound: i64,
1008
active: VecDeque<ActiveDynWindow>,
1009
}
1010
1011
impl GroupByDynamicWindower {
1012
#[expect(clippy::too_many_arguments)]
1013
pub fn new(
1014
period: Duration,
1015
offset: Duration,
1016
every: Duration,
1017
start_by: StartBy,
1018
closed: ClosedWindow,
1019
tu: TimeUnit,
1020
tz: Option<Tz>,
1021
include_lower_bound: bool,
1022
include_upper_bound: bool,
1023
) -> Self {
1024
Self {
1025
period,
1026
offset,
1027
every,
1028
closed,
1029
1030
start_by,
1031
1032
add: match tu {
1033
TimeUnit::Nanoseconds => Duration::add_ns,
1034
TimeUnit::Microseconds => Duration::add_us,
1035
TimeUnit::Milliseconds => Duration::add_ms,
1036
},
1037
nte: match tu {
1038
TimeUnit::Nanoseconds => Duration::nte_duration_ns,
1039
TimeUnit::Microseconds => Duration::nte_duration_us,
1040
TimeUnit::Milliseconds => Duration::nte_duration_ms,
1041
},
1042
tu,
1043
tz,
1044
1045
include_lower_bound,
1046
include_upper_bound,
1047
1048
num_seen: 0,
1049
next_lower_bound: 0,
1050
active: Default::default(),
1051
}
1052
}
1053
1054
pub fn find_first_window_around(
1055
&self,
1056
mut lower_bound: i64,
1057
target: i64,
1058
) -> PolarsResult<Result<(i64, i64), i64>> {
1059
let mut upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
1060
while !is_below_upper_bound(target, upper_bound, self.closed) {
1061
let gap = target - lower_bound;
1062
let nth = match self.tu {
1063
TimeUnit::Nanoseconds
1064
if gap > self.every.nte_duration_ns() + self.period.nte_duration_ns() =>
1065
{
1066
((gap - self.period.nte_duration_ns()) as usize)
1067
/ (self.every.nte_duration_ns() as usize)
1068
},
1069
TimeUnit::Microseconds
1070
if gap > self.every.nte_duration_us() + self.period.nte_duration_us() =>
1071
{
1072
((gap - self.period.nte_duration_us()) as usize)
1073
/ (self.every.nte_duration_us() as usize)
1074
},
1075
TimeUnit::Milliseconds
1076
if gap > self.every.nte_duration_ms() + self.period.nte_duration_ms() =>
1077
{
1078
((gap - self.period.nte_duration_ms()) as usize)
1079
/ (self.every.nte_duration_ms() as usize)
1080
},
1081
_ => 1,
1082
};
1083
1084
let nth: i64 = nth.try_into().unwrap();
1085
lower_bound = (self.add)(&(self.every * nth), lower_bound, self.tz.as_ref())?;
1086
upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;
1087
}
1088
1089
if is_above_lower_bound(target, lower_bound, self.closed) {
1090
Ok(Ok((lower_bound, upper_bound)))
1091
} else {
1092
Ok(Err(lower_bound))
1093
}
1094
}
1095
1096
fn start_lower_bound(&self, first: i64) -> PolarsResult<i64> {
1097
match self.start_by {
1098
StartBy::DataPoint => Ok(first),
1099
StartBy::WindowBound => {
1100
let get_earliest_bounds = match self.tu {
1101
TimeUnit::Nanoseconds => Window::get_earliest_bounds_ns,
1102
TimeUnit::Microseconds => Window::get_earliest_bounds_us,
1103
TimeUnit::Milliseconds => Window::get_earliest_bounds_ms,
1104
};
1105
Ok((get_earliest_bounds)(
1106
&Window::new(self.every, self.period, self.offset),
1107
first,
1108
self.closed,
1109
self.tz.as_ref(),
1110
)?
1111
.start)
1112
},
1113
_ => {
1114
{
1115
#[allow(clippy::type_complexity)]
1116
let (from, to): (
1117
fn(i64) -> NaiveDateTime,
1118
fn(NaiveDateTime) -> i64,
1119
) = match self.tu {
1120
TimeUnit::Nanoseconds => {
1121
(timestamp_ns_to_datetime, datetime_to_timestamp_ns)
1122
},
1123
TimeUnit::Microseconds => {
1124
(timestamp_us_to_datetime, datetime_to_timestamp_us)
1125
},
1126
TimeUnit::Milliseconds => {
1127
(timestamp_ms_to_datetime, datetime_to_timestamp_ms)
1128
},
1129
};
1130
// find beginning of the week.
1131
let dt = from(first);
1132
match self.tz.as_ref() {
1133
#[cfg(feature = "timezones")]
1134
Some(tz) => {
1135
let dt = tz.from_utc_datetime(&dt);
1136
let dt = dt.beginning_of_week();
1137
let dt = dt.naive_utc();
1138
let start = to(dt);
1139
// adjust start of the week based on given day of the week
1140
let start = (self.add)(
1141
&Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),
1142
start,
1143
self.tz.as_ref(),
1144
)?;
1145
// apply the 'offset'
1146
let start = (self.add)(&self.offset, start, self.tz.as_ref())?;
1147
// make sure the first datapoint has a chance to be included
1148
// and compute the end of the window defined by the 'period'
1149
Ok(ensure_t_in_or_in_front_of_window(
1150
self.every,
1151
first,
1152
self.add,
1153
self.nte,
1154
self.period,
1155
start,
1156
self.closed,
1157
self.tz.as_ref(),
1158
)?
1159
.start)
1160
},
1161
_ => {
1162
let tz = chrono::Utc;
1163
let dt = dt.and_local_timezone(tz).unwrap();
1164
let dt = dt.beginning_of_week();
1165
let dt = dt.naive_utc();
1166
let start = to(dt);
1167
// adjust start of the week based on given day of the week
1168
let start = (self.add)(
1169
&Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),
1170
start,
1171
None,
1172
)
1173
.unwrap();
1174
// apply the 'offset'
1175
let start = (self.add)(&self.offset, start, None).unwrap();
1176
// make sure the first datapoint has a chance to be included
1177
// and compute the end of the window defined by the 'period'
1178
Ok(ensure_t_in_or_in_front_of_window(
1179
self.every,
1180
first,
1181
self.add,
1182
self.nte,
1183
self.period,
1184
start,
1185
self.closed,
1186
None,
1187
)?
1188
.start)
1189
},
1190
}
1191
}
1192
},
1193
}
1194
}
1195
1196
pub fn insert(
1197
&mut self,
1198
time: &[i64],
1199
windows: &mut Vec<[IdxSize; 2]>,
1200
lower_bound: &mut Vec<i64>,
1201
upper_bound: &mut Vec<i64>,
1202
) -> PolarsResult<()> {
1203
if time.is_empty() {
1204
return Ok(());
1205
}
1206
1207
if self.num_seen == 0 {
1208
debug_assert!(self.active.is_empty());
1209
self.next_lower_bound = self.start_lower_bound(time[0])?;
1210
}
1211
1212
for &t in time {
1213
while let Some(w) = self.active.front()
1214
&& !is_below_upper_bound(t, w.upper_bound, self.closed)
1215
{
1216
let w = self.active.pop_front().unwrap();
1217
windows.push([w.start, self.num_seen - w.start]);
1218
if self.include_lower_bound {
1219
lower_bound.push(w.lower_bound);
1220
}
1221
if self.include_upper_bound {
1222
upper_bound.push(w.upper_bound);
1223
}
1224
}
1225
1226
while is_above_lower_bound(t, self.next_lower_bound, self.closed) {
1227
match self.find_first_window_around(self.next_lower_bound, t)? {
1228
Ok((lower_bound, upper_bound)) => {
1229
self.next_lower_bound =
1230
(self.add)(&self.every, lower_bound, self.tz.as_ref())?;
1231
self.active.push_back(ActiveDynWindow {
1232
start: self.num_seen,
1233
lower_bound,
1234
upper_bound,
1235
});
1236
},
1237
Err(lower_bound) => {
1238
self.next_lower_bound = lower_bound;
1239
break;
1240
},
1241
}
1242
}
1243
1244
self.num_seen += 1
1245
}
1246
1247
Ok(())
1248
}
1249
1250
pub fn lowest_needed_index(&self) -> IdxSize {
1251
self.active.front().map_or(self.num_seen, |w| w.start)
1252
}
1253
1254
pub fn finalize(
1255
&mut self,
1256
windows: &mut Vec<[IdxSize; 2]>,
1257
lower_bound: &mut Vec<i64>,
1258
upper_bound: &mut Vec<i64>,
1259
) {
1260
for w in self.active.drain(..) {
1261
windows.push([w.start, self.num_seen - w.start]);
1262
if self.include_lower_bound {
1263
lower_bound.push(w.lower_bound);
1264
}
1265
if self.include_upper_bound {
1266
upper_bound.push(w.upper_bound);
1267
}
1268
}
1269
1270
self.next_lower_bound = 0;
1271
self.num_seen = 0;
1272
}
1273
1274
pub fn num_seen(&self) -> IdxSize {
1275
self.num_seen
1276
}
1277
1278
pub fn time_unit(&self) -> TimeUnit {
1279
self.tu
1280
}
1281
}
1282
1283
#[cfg(test)]
mod test {
    use super::*;

    /// A split that falls between two equal timestamps must be merged away, while clean
    /// splits are kept.
    #[test]
    fn test_prune_duplicates() {
        // index: 0  1  2  3  4  5  6  7  8  9  10
        // The split at index 2 separates the two `1`s; the splits at 6 and 8 are clean.
        let time = [0i64, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
        let mut splits = vec![(0, 2), (2, 4), (6, 2), (8, 3)];

        prune_splits_on_duplicates(&time, &mut splits);

        let expected = vec![(0, 6), (6, 2), (8, 3)];
        assert_eq!(splits, expected);
    }
}
1297
1298