Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-time/src/windows/group_by.rs
6939 views
1
use arrow::legacy::time_zone::Tz;
2
use arrow::trusted_len::TrustedLen;
3
use polars_core::POOL;
4
use polars_core::prelude::*;
5
use polars_core::utils::_split_offsets;
6
use polars_core::utils::flatten::flatten_par;
7
use rayon::prelude::*;
8
#[cfg(feature = "serde")]
9
use serde::{Deserialize, Serialize};
10
use strum_macros::IntoStaticStr;
11
12
use crate::prelude::*;
13
14
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
15
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
16
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
17
#[strum(serialize_all = "snake_case")]
18
pub enum ClosedWindow {
19
Left,
20
Right,
21
Both,
22
None,
23
}
24
25
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
26
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
27
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
28
#[strum(serialize_all = "snake_case")]
29
pub enum Label {
30
Left,
31
Right,
32
DataPoint,
33
}
34
35
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]
36
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
37
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
38
#[strum(serialize_all = "snake_case")]
39
pub enum StartBy {
40
WindowBound,
41
DataPoint,
42
/// only useful if periods are weekly
43
Monday,
44
Tuesday,
45
Wednesday,
46
Thursday,
47
Friday,
48
Saturday,
49
Sunday,
50
}
51
52
impl Default for StartBy {
53
fn default() -> Self {
54
Self::WindowBound
55
}
56
}
57
58
impl StartBy {
59
pub fn weekday(&self) -> Option<u32> {
60
match self {
61
StartBy::Monday => Some(0),
62
StartBy::Tuesday => Some(1),
63
StartBy::Wednesday => Some(2),
64
StartBy::Thursday => Some(3),
65
StartBy::Friday => Some(4),
66
StartBy::Saturday => Some(5),
67
StartBy::Sunday => Some(6),
68
_ => None,
69
}
70
}
71
}
72
73
#[allow(clippy::too_many_arguments)]
74
fn update_groups_and_bounds(
75
bounds_iter: BoundsIter<'_>,
76
mut start: usize,
77
time: &[i64],
78
closed_window: ClosedWindow,
79
include_lower_bound: bool,
80
include_upper_bound: bool,
81
lower_bound: &mut Vec<i64>,
82
upper_bound: &mut Vec<i64>,
83
groups: &mut Vec<[IdxSize; 2]>,
84
) {
85
'bounds: for bi in bounds_iter {
86
// find starting point of window
87
for &t in &time[start..time.len().saturating_sub(1)] {
88
// the window is behind the time values.
89
if bi.is_future(t, closed_window) {
90
continue 'bounds;
91
}
92
if bi.is_member_entry(t, closed_window) {
93
break;
94
}
95
start += 1;
96
}
97
98
// find members of this window
99
let mut end = start;
100
101
// last value isn't always added
102
if end == time.len() - 1 {
103
let t = time[end];
104
if bi.is_member(t, closed_window) {
105
if include_lower_bound {
106
lower_bound.push(bi.start);
107
}
108
if include_upper_bound {
109
upper_bound.push(bi.stop);
110
}
111
groups.push([end as IdxSize, 1])
112
}
113
continue;
114
}
115
for &t in &time[end..] {
116
if !bi.is_member_exit(t, closed_window) {
117
break;
118
}
119
end += 1;
120
}
121
let len = end - start;
122
123
if include_lower_bound {
124
lower_bound.push(bi.start);
125
}
126
if include_upper_bound {
127
upper_bound.push(bi.stop);
128
}
129
groups.push([start as IdxSize, len as IdxSize])
130
}
131
}
132
133
/// Window boundaries are created based on the given `Window`, which is defined by:
134
/// - every
135
/// - period
136
/// - offset
137
///
138
/// And every window boundary we search for the values that fit that window by the given
139
/// `ClosedWindow`. The groups are return as `GroupTuples` together with the lower bound and upper
140
/// bound timestamps. These timestamps indicate the start (lower) and end (upper) of the window of
141
/// that group.
142
///
143
/// If `include_boundaries` is `false` those `lower` and `upper` vectors will be empty.
144
#[allow(clippy::too_many_arguments)]
145
pub fn group_by_windows(
146
window: Window,
147
time: &[i64],
148
closed_window: ClosedWindow,
149
tu: TimeUnit,
150
tz: &Option<TimeZone>,
151
include_lower_bound: bool,
152
include_upper_bound: bool,
153
start_by: StartBy,
154
) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {
155
let start = time[0];
156
// the boundary we define here is not yet correct. It doesn't take 'period' into account
157
// and it doesn't have the proper starting point. This boundary is used as a proxy to find
158
// the proper 'boundary' in 'window.get_overlapping_bounds_iter'.
159
let boundary = if time.len() > 1 {
160
// +1 because left or closed boundary could match the next window if it is on the boundary
161
let stop = time[time.len() - 1] + 1;
162
Bounds::new_checked(start, stop)
163
} else {
164
let stop = start + 1;
165
Bounds::new_checked(start, stop)
166
};
167
168
let size = {
169
match tu {
170
TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),
171
TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),
172
TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),
173
}
174
};
175
let size_lower = if include_lower_bound { size } else { 0 };
176
let size_upper = if include_upper_bound { size } else { 0 };
177
let mut lower_bound = Vec::with_capacity(size_lower);
178
let mut upper_bound = Vec::with_capacity(size_upper);
179
180
let mut groups = Vec::with_capacity(size);
181
let start_offset = 0;
182
183
match tz {
184
#[cfg(feature = "timezones")]
185
Some(tz) => {
186
update_groups_and_bounds(
187
window.get_overlapping_bounds_iter(
188
boundary,
189
closed_window,
190
tu,
191
tz.parse::<Tz>().ok().as_ref(),
192
start_by,
193
)?,
194
start_offset,
195
time,
196
closed_window,
197
include_lower_bound,
198
include_upper_bound,
199
&mut lower_bound,
200
&mut upper_bound,
201
&mut groups,
202
);
203
},
204
_ => {
205
update_groups_and_bounds(
206
window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,
207
start_offset,
208
time,
209
closed_window,
210
include_lower_bound,
211
include_upper_bound,
212
&mut lower_bound,
213
&mut upper_bound,
214
&mut groups,
215
);
216
},
217
};
218
219
Ok((groups, lower_bound, upper_bound))
220
}
221
222
// t is right at the end of the window
223
// ------t---
224
// [------]
225
#[inline]
226
#[allow(clippy::too_many_arguments)]
227
pub(crate) fn group_by_values_iter_lookbehind(
228
period: Duration,
229
offset: Duration,
230
time: &[i64],
231
closed_window: ClosedWindow,
232
tu: TimeUnit,
233
tz: Option<Tz>,
234
start_offset: usize,
235
upper_bound: Option<usize>,
236
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
237
debug_assert!(offset.duration_ns() == period.duration_ns());
238
debug_assert!(offset.negative);
239
let add = match tu {
240
TimeUnit::Nanoseconds => Duration::add_ns,
241
TimeUnit::Microseconds => Duration::add_us,
242
TimeUnit::Milliseconds => Duration::add_ms,
243
};
244
245
let upper_bound = upper_bound.unwrap_or(time.len());
246
// Use binary search to find the initial start as that is behind.
247
let mut start = if let Some(&t) = time.get(start_offset) {
248
let lower = add(&offset, t, tz.as_ref())?;
249
// We have `period == -offset`, so `t + offset + period` is equal to `t`,
250
// and `upper` is trivially equal to `t` itself. Using the trivial calculation,
251
// instead of `upper = lower + period`, avoids issues around
252
// `t - 1mo + 1mo` not round-tripping.
253
let upper = t;
254
let b = Bounds::new(lower, upper);
255
let slice = &time[..start_offset];
256
slice.partition_point(|v| !b.is_member(*v, closed_window))
257
} else {
258
0
259
};
260
let mut end = start;
261
let mut last = time[start_offset];
262
Ok(time[start_offset..upper_bound]
263
.iter()
264
.enumerate()
265
.map(move |(mut i, t)| {
266
// Fast path for duplicates.
267
if *t == last && i > 0 {
268
let len = end - start;
269
let offset = start as IdxSize;
270
return Ok((offset, len as IdxSize));
271
}
272
last = *t;
273
i += start_offset;
274
275
let lower = add(&offset, *t, tz.as_ref())?;
276
let upper = *t;
277
278
let b = Bounds::new(lower, upper);
279
280
for &t in unsafe { time.get_unchecked(start..i) } {
281
if b.is_member_entry(t, closed_window) {
282
break;
283
}
284
start += 1;
285
}
286
287
// faster path, check if `i` is member.
288
if b.is_member_exit(*t, closed_window) {
289
end = i;
290
} else {
291
end = std::cmp::max(end, start);
292
}
293
// we still must loop to consume duplicates
294
for &t in unsafe { time.get_unchecked(end..) } {
295
if !b.is_member_exit(t, closed_window) {
296
break;
297
}
298
end += 1;
299
}
300
301
let len = end - start;
302
let offset = start as IdxSize;
303
304
Ok((offset, len as IdxSize))
305
}))
306
}
307
308
// this one is correct for all lookbehind/lookaheads, but is slower
309
// window is completely behind t and t itself is not a member
310
// ---------------t---
311
// [---]
312
pub(crate) fn group_by_values_iter_window_behind_t(
313
period: Duration,
314
offset: Duration,
315
time: &[i64],
316
closed_window: ClosedWindow,
317
tu: TimeUnit,
318
tz: Option<Tz>,
319
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
320
let add = match tu {
321
TimeUnit::Nanoseconds => Duration::add_ns,
322
TimeUnit::Microseconds => Duration::add_us,
323
TimeUnit::Milliseconds => Duration::add_ms,
324
};
325
326
let mut start = 0;
327
let mut end = start;
328
let mut last = time[0];
329
let mut started = false;
330
time.iter().map(move |lower| {
331
// Fast path for duplicates.
332
if *lower == last && started {
333
let len = end - start;
334
let offset = start as IdxSize;
335
return Ok((offset, len as IdxSize));
336
}
337
last = *lower;
338
started = true;
339
let lower = add(&offset, *lower, tz.as_ref())?;
340
let upper = add(&period, lower, tz.as_ref())?;
341
342
let b = Bounds::new(lower, upper);
343
if b.is_future(time[0], closed_window) {
344
Ok((0, 0))
345
} else {
346
for &t in &time[start..] {
347
if b.is_member_entry(t, closed_window) {
348
break;
349
}
350
start += 1;
351
}
352
353
end = std::cmp::max(start, end);
354
for &t in &time[end..] {
355
if !b.is_member_exit(t, closed_window) {
356
break;
357
}
358
end += 1;
359
}
360
361
let len = end - start;
362
let offset = start as IdxSize;
363
364
Ok((offset, len as IdxSize))
365
}
366
})
367
}
368
369
// window is with -1 periods of t
370
// ----t---
371
// [---]
372
pub(crate) fn group_by_values_iter_partial_lookbehind(
373
period: Duration,
374
offset: Duration,
375
time: &[i64],
376
closed_window: ClosedWindow,
377
tu: TimeUnit,
378
tz: Option<Tz>,
379
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
380
let add = match tu {
381
TimeUnit::Nanoseconds => Duration::add_ns,
382
TimeUnit::Microseconds => Duration::add_us,
383
TimeUnit::Milliseconds => Duration::add_ms,
384
};
385
386
let mut start = 0;
387
let mut end = start;
388
let mut last = time[0];
389
time.iter().enumerate().map(move |(i, lower)| {
390
// Fast path for duplicates.
391
if *lower == last && i > 0 {
392
let len = end - start;
393
let offset = start as IdxSize;
394
return Ok((offset, len as IdxSize));
395
}
396
last = *lower;
397
398
let lower = add(&offset, *lower, tz.as_ref())?;
399
let upper = add(&period, lower, tz.as_ref())?;
400
401
let b = Bounds::new(lower, upper);
402
403
for &t in &time[start..] {
404
if b.is_member_entry(t, closed_window) || start == i {
405
break;
406
}
407
start += 1;
408
}
409
410
end = std::cmp::max(start, end);
411
for &t in &time[end..] {
412
if !b.is_member_exit(t, closed_window) {
413
break;
414
}
415
end += 1;
416
}
417
418
let len = end - start;
419
let offset = start as IdxSize;
420
421
Ok((offset, len as IdxSize))
422
})
423
}
424
425
#[allow(clippy::too_many_arguments)]
426
// window is completely ahead of t and t itself is not a member
427
// --t-----------
428
// [---]
429
pub(crate) fn group_by_values_iter_lookahead(
430
period: Duration,
431
offset: Duration,
432
time: &[i64],
433
closed_window: ClosedWindow,
434
tu: TimeUnit,
435
tz: Option<Tz>,
436
start_offset: usize,
437
upper_bound: Option<usize>,
438
) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {
439
let upper_bound = upper_bound.unwrap_or(time.len());
440
441
let add = match tu {
442
TimeUnit::Nanoseconds => Duration::add_ns,
443
TimeUnit::Microseconds => Duration::add_us,
444
TimeUnit::Milliseconds => Duration::add_ms,
445
};
446
let mut start = start_offset;
447
let mut end = start;
448
449
let mut last = time[start_offset];
450
let mut started = false;
451
time[start_offset..upper_bound].iter().map(move |lower| {
452
// Fast path for duplicates.
453
if *lower == last && started {
454
let len = end - start;
455
let offset = start as IdxSize;
456
return Ok((offset, len as IdxSize));
457
}
458
started = true;
459
last = *lower;
460
461
let lower = add(&offset, *lower, tz.as_ref())?;
462
let upper = add(&period, lower, tz.as_ref())?;
463
464
let b = Bounds::new(lower, upper);
465
466
for &t in &time[start..] {
467
if b.is_member_entry(t, closed_window) {
468
break;
469
}
470
start += 1;
471
}
472
473
end = std::cmp::max(start, end);
474
for &t in &time[end..] {
475
if !b.is_member_exit(t, closed_window) {
476
break;
477
}
478
end += 1;
479
}
480
481
let len = end - start;
482
let offset = start as IdxSize;
483
484
Ok((offset, len as IdxSize))
485
})
486
}
487
488
/// Rolling-window groups where every value `t` sits at the right endpoint of its own window,
/// which spans exactly one `period`:
/// ------t---
/// [------]
#[cfg(feature = "rolling_window_by")]
#[inline]
pub(crate) fn group_by_values_iter(
    period: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
    tz: Option<Tz>,
) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {
    // Look back by exactly one period: the offset is `-period`.
    let lookbehind = {
        let mut negated = period;
        negated.negative = true;
        negated
    };
    group_by_values_iter_lookbehind(period, lookbehind, time, closed_window, tu, tz, 0, None)
}
502
503
/// Ensures that no partition boundary in `thread_offsets` splits a run of equal values in `time`.
///
/// `thread_offsets` holds `(start, len)` partitions of `time`, assumed to be contiguous, sorted
/// by `start`, and covering `0..time.len()` (as produced by `_split_offsets`). A boundary at
/// `start` survives only if `time[start - 1] != time[start]`. Otherwise exactly that boundary is
/// removed, merging the right-hand partition into its left neighbour. If no boundary survives,
/// a single partition `(0, time.len())` remains. The offsets are left untouched when `time` is
/// empty, when there is at most one partition, or when every boundary is already valid.
fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {
    let is_valid = |window: &[(usize, usize)]| -> bool {
        debug_assert_eq!(window.len(), 2);
        let left_block_end = window[0].0 + window[0].1.saturating_sub(1);
        let right_block_start = window[1].0;
        time[left_block_end] != time[right_block_start]
    };

    if time.is_empty() || thread_offsets.len() <= 1 || thread_offsets.windows(2).all(is_valid) {
        return;
    }

    // Rebuild the partitions from the surviving split points. Removing only the offending
    // boundary, rather than the left block's start, keeps as much parallelism as possible and
    // needs no fix-up pass.
    let mut new = Vec::with_capacity(thread_offsets.len());
    let mut block_start = 0;
    for &(split, _) in &thread_offsets[1..] {
        // `split > block_start` also drops empty partitions and repeated split points.
        let keeps_runs_intact =
            split > block_start && split < time.len() && time[split - 1] != time[split];
        if keeps_runs_intact {
            new.push((block_start, split - block_start));
            block_start = split;
        }
    }
    new.push((block_start, time.len() - block_start));
    debug_assert_eq!(new.iter().map(|w| w.1).sum::<usize>(), time.len());
    *thread_offsets = new;
}
549
550
#[allow(clippy::too_many_arguments)]
551
fn group_by_values_iter_lookbehind_collected(
552
period: Duration,
553
offset: Duration,
554
time: &[i64],
555
closed_window: ClosedWindow,
556
tu: TimeUnit,
557
tz: Option<Tz>,
558
start_offset: usize,
559
upper_bound: Option<usize>,
560
) -> PolarsResult<Vec<[IdxSize; 2]>> {
561
let iter = group_by_values_iter_lookbehind(
562
period,
563
offset,
564
time,
565
closed_window,
566
tu,
567
tz,
568
start_offset,
569
upper_bound,
570
)?;
571
iter.map(|result| result.map(|(offset, len)| [offset, len]))
572
.collect::<PolarsResult<Vec<_>>>()
573
}
574
575
#[allow(clippy::too_many_arguments)]
576
pub(crate) fn group_by_values_iter_lookahead_collected(
577
period: Duration,
578
offset: Duration,
579
time: &[i64],
580
closed_window: ClosedWindow,
581
tu: TimeUnit,
582
tz: Option<Tz>,
583
start_offset: usize,
584
upper_bound: Option<usize>,
585
) -> PolarsResult<Vec<[IdxSize; 2]>> {
586
let iter = group_by_values_iter_lookahead(
587
period,
588
offset,
589
time,
590
closed_window,
591
tu,
592
tz,
593
start_offset,
594
upper_bound,
595
);
596
iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))
597
.collect::<PolarsResult<Vec<_>>>()
598
}
599
600
/// Different from `group_by_windows`, where define window buckets and search which values fit that
601
/// pre-defined bucket.
602
///
603
/// This function defines every window based on the:
604
/// - timestamp (lower bound)
605
/// - timestamp + period (upper bound)
606
/// where timestamps are the individual values in the array `time`
607
pub fn group_by_values(
608
period: Duration,
609
offset: Duration,
610
time: &[i64],
611
closed_window: ClosedWindow,
612
tu: TimeUnit,
613
tz: Option<Tz>,
614
) -> PolarsResult<GroupsSlice> {
615
if time.is_empty() {
616
return Ok(GroupsSlice::from(vec![]));
617
}
618
619
let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());
620
// there are duplicates in the splits, so we opt for a single partition
621
prune_splits_on_duplicates(time, &mut thread_offsets);
622
623
// If we start from within parallel work we will do this single threaded.
624
let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);
625
626
// we have a (partial) lookbehind window
627
if offset.negative && !offset.is_zero() {
628
// lookbehind
629
if offset.duration_ns() == period.duration_ns() {
630
// t is right at the end of the window
631
// ------t---
632
// [------]
633
if !run_parallel {
634
let vecs = group_by_values_iter_lookbehind_collected(
635
period,
636
offset,
637
time,
638
closed_window,
639
tu,
640
tz,
641
0,
642
None,
643
)?;
644
return Ok(GroupsSlice::from(vecs));
645
}
646
647
POOL.install(|| {
648
let vals = thread_offsets
649
.par_iter()
650
.copied()
651
.map(|(base_offset, len)| {
652
let upper_bound = base_offset + len;
653
group_by_values_iter_lookbehind_collected(
654
period,
655
offset,
656
time,
657
closed_window,
658
tu,
659
tz,
660
base_offset,
661
Some(upper_bound),
662
)
663
})
664
.collect::<PolarsResult<Vec<_>>>()?;
665
Ok(flatten_par(&vals))
666
})
667
} else if ((offset.duration_ns() >= period.duration_ns())
668
&& matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
669
|| ((offset.duration_ns() > period.duration_ns())
670
&& matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
671
{
672
// window is completely behind t and t itself is not a member
673
// ---------------t---
674
// [---]
675
let iter =
676
group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
677
iter.map(|result| result.map(|(offset, len)| [offset, len]))
678
.collect::<PolarsResult<_>>()
679
}
680
// partial lookbehind
681
// this one is still single threaded
682
// can make it parallel later, its a bit more complicated because the boundaries are unknown
683
// window is with -1 periods of t
684
// ----t---
685
// [---]
686
else {
687
let iter = group_by_values_iter_partial_lookbehind(
688
period,
689
offset,
690
time,
691
closed_window,
692
tu,
693
tz,
694
);
695
iter.map(|result| result.map(|(offset, len)| [offset, len]))
696
.collect::<PolarsResult<_>>()
697
}
698
} else if !offset.is_zero()
699
|| closed_window == ClosedWindow::Right
700
|| closed_window == ClosedWindow::None
701
{
702
// window is completely ahead of t and t itself is not a member
703
// --t-----------
704
// [---]
705
706
if !run_parallel {
707
let vecs = group_by_values_iter_lookahead_collected(
708
period,
709
offset,
710
time,
711
closed_window,
712
tu,
713
tz,
714
0,
715
None,
716
)?;
717
return Ok(GroupsSlice::from(vecs));
718
}
719
720
POOL.install(|| {
721
let vals = thread_offsets
722
.par_iter()
723
.copied()
724
.map(|(base_offset, len)| {
725
let lower_bound = base_offset;
726
let upper_bound = base_offset + len;
727
group_by_values_iter_lookahead_collected(
728
period,
729
offset,
730
time,
731
closed_window,
732
tu,
733
tz,
734
lower_bound,
735
Some(upper_bound),
736
)
737
})
738
.collect::<PolarsResult<Vec<_>>>()?;
739
Ok(flatten_par(&vals))
740
})
741
} else {
742
if !run_parallel {
743
let vecs = group_by_values_iter_lookahead_collected(
744
period,
745
offset,
746
time,
747
closed_window,
748
tu,
749
tz,
750
0,
751
None,
752
)?;
753
return Ok(GroupsSlice::from(vecs));
754
}
755
756
// Offset is 0 and window is closed on the left:
757
// it must be that the window starts at t and t is a member
758
// --t-----------
759
// [---]
760
POOL.install(|| {
761
let vals = thread_offsets
762
.par_iter()
763
.copied()
764
.map(|(base_offset, len)| {
765
let lower_bound = base_offset;
766
let upper_bound = base_offset + len;
767
group_by_values_iter_lookahead_collected(
768
period,
769
offset,
770
time,
771
closed_window,
772
tu,
773
tz,
774
lower_bound,
775
Some(upper_bound),
776
)
777
})
778
.collect::<PolarsResult<Vec<_>>>()?;
779
Ok(flatten_par(&vals))
780
})
781
}
782
}
783
784
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_prune_duplicates() {
        // Partition boundaries (`|`) drawn over the element indices:
        // |--|------------|----|---------|
        // 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
        let time = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];
        let mut offsets = vec![(0, 2), (2, 4), (6, 2), (8, 3)];

        prune_splits_on_duplicates(time, &mut offsets);

        // The split at index 2 separates the two `1`s, so it is merged away.
        assert_eq!(offsets, &[(0, 6), (6, 2), (8, 3)]);
    }
}
798
799