// Path: blob/main/crates/polars-time/src/windows/group_by.rs
// (scraped page header: 8430 views)
use std::collections::VecDeque;12use arrow::legacy::time_zone::Tz;3use arrow::temporal_conversions::{4timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,5};6use arrow::trusted_len::TrustedLen;7use chrono::NaiveDateTime;8#[cfg(feature = "timezones")]9use chrono::TimeZone as _;10use now::DateTimeNow;11use polars_core::POOL;12use polars_core::prelude::*;13use polars_core::utils::_split_offsets;14use polars_core::utils::flatten::flatten_par;15use rayon::prelude::*;16#[cfg(feature = "serde")]17use serde::{Deserialize, Serialize};18use strum_macros::IntoStaticStr;1920use crate::prelude::*;2122#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]23#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]24#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]25#[strum(serialize_all = "snake_case")]26pub enum ClosedWindow {27Left,28Right,29Both,30None,31}3233#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]34#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]35#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]36#[strum(serialize_all = "snake_case")]37pub enum Label {38Left,39Right,40DataPoint,41}4243#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]44#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]45#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]46#[strum(serialize_all = "snake_case")]47#[derive(Default)]48pub enum StartBy {49#[default]50WindowBound,51DataPoint,52/// only useful if periods are weekly53Monday,54Tuesday,55Wednesday,56Thursday,57Friday,58Saturday,59Sunday,60}6162impl StartBy {63pub fn weekday(&self) -> Option<u32> {64match self {65StartBy::Monday => Some(0),66StartBy::Tuesday => Some(1),67StartBy::Wednesday => Some(2),68StartBy::Thursday => Some(3),69StartBy::Friday => Some(4),70StartBy::Saturday => Some(5),71StartBy::Sunday => Some(6),72_ => None,73}74}75}7677#[allow(clippy::too_many_arguments)]78fn 
update_groups_and_bounds(79bounds_iter: BoundsIter<'_>,80mut start: usize,81time: &[i64],82closed_window: ClosedWindow,83include_lower_bound: bool,84include_upper_bound: bool,85lower_bound: &mut Vec<i64>,86upper_bound: &mut Vec<i64>,87groups: &mut Vec<[IdxSize; 2]>,88) {89let mut iter = bounds_iter.into_iter();90let mut stride = 0;9192'bounds: while let Some(bi) = iter.nth(stride) {93let mut has_member = false;94// find starting point of window95for &t in &time[start..time.len().saturating_sub(1)] {96// the window is behind the time values.97if bi.is_future(t, closed_window) {98stride = iter.get_stride(t);99continue 'bounds;100}101if bi.is_member_entry(t, closed_window) {102has_member = true;103break;104}105// element drops out of the window106start += 1;107}108109// update stride so we can fast-forward in case of sparse data110stride = if has_member {1110112} else {113debug_assert!(start < time.len());114iter.get_stride(time[start])115};116117// find members of this window118let mut end = start;119120// last value isn't always added121if end == time.len() - 1 {122let t = time[end];123if bi.is_member(t, closed_window) {124if include_lower_bound {125lower_bound.push(bi.start);126}127if include_upper_bound {128upper_bound.push(bi.stop);129}130groups.push([end as IdxSize, 1])131}132continue;133}134for &t in &time[end..] {135if !bi.is_member_exit(t, closed_window) {136break;137}138end += 1;139}140let len = end - start;141142if include_lower_bound {143lower_bound.push(bi.start);144}145if include_upper_bound {146upper_bound.push(bi.stop);147}148groups.push([start as IdxSize, len as IdxSize])149}150}151152/// Window boundaries are created based on the given `Window`, which is defined by:153/// - every154/// - period155/// - offset156///157/// And every window boundary we search for the values that fit that window by the given158/// `ClosedWindow`. The groups are return as `GroupTuples` together with the lower bound and upper159/// bound timestamps. 
These timestamps indicate the start (lower) and end (upper) of the window of160/// that group.161///162/// If `include_boundaries` is `false` those `lower` and `upper` vectors will be empty.163#[allow(clippy::too_many_arguments)]164pub fn group_by_windows(165window: Window,166time: &[i64],167closed_window: ClosedWindow,168tu: TimeUnit,169tz: &Option<TimeZone>,170include_lower_bound: bool,171include_upper_bound: bool,172start_by: StartBy,173) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {174let start = time[0];175// the boundary we define here is not yet correct. It doesn't take 'period' into account176// and it doesn't have the proper starting point. This boundary is used as a proxy to find177// the proper 'boundary' in 'window.get_overlapping_bounds_iter'.178let boundary = if time.len() > 1 {179// +1 because left or closed boundary could match the next window if it is on the boundary180let stop = time[time.len() - 1] + 1;181Bounds::new_checked(start, stop)182} else {183let stop = start + 1;184Bounds::new_checked(start, stop)185};186187let size = {188match tu {189TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),190TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),191TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),192}193};194let size_lower = if include_lower_bound { size } else { 0 };195let size_upper = if include_upper_bound { size } else { 0 };196let mut lower_bound = Vec::with_capacity(size_lower);197let mut upper_bound = Vec::with_capacity(size_upper);198199let mut groups = Vec::with_capacity(size);200let start_offset = 0;201202match tz {203#[cfg(feature = "timezones")]204Some(tz) => {205update_groups_and_bounds(206window.get_overlapping_bounds_iter(207boundary,208closed_window,209tu,210tz.parse::<Tz>().ok().as_ref(),211start_by,212)?,213start_offset,214time,215closed_window,216include_lower_bound,217include_upper_bound,218&mut lower_bound,219&mut upper_bound,220&mut 
groups,221);222},223_ => {224update_groups_and_bounds(225window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,226start_offset,227time,228closed_window,229include_lower_bound,230include_upper_bound,231&mut lower_bound,232&mut upper_bound,233&mut groups,234);235},236};237238Ok((groups, lower_bound, upper_bound))239}240241// t is right at the end of the window242// ------t---243// [------]244#[inline]245#[allow(clippy::too_many_arguments)]246pub(crate) fn group_by_values_iter_lookbehind(247period: Duration,248offset: Duration,249time: &[i64],250closed_window: ClosedWindow,251tu: TimeUnit,252tz: Option<Tz>,253start_offset: usize,254upper_bound: Option<usize>,255) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {256debug_assert!(offset.duration_ns() == period.duration_ns());257debug_assert!(offset.negative);258let add = match tu {259TimeUnit::Nanoseconds => Duration::add_ns,260TimeUnit::Microseconds => Duration::add_us,261TimeUnit::Milliseconds => Duration::add_ms,262};263264let upper_bound = upper_bound.unwrap_or(time.len());265// Use binary search to find the initial start as that is behind.266let mut start = if let Some(&t) = time.get(start_offset) {267let lower = add(&offset, t, tz.as_ref())?;268// We have `period == -offset`, so `t + offset + period` is equal to `t`,269// and `upper` is trivially equal to `t` itself. 
Using the trivial calculation,270// instead of `upper = lower + period`, avoids issues around271// `t - 1mo + 1mo` not round-tripping.272let upper = t;273let b = Bounds::new(lower, upper);274let slice = &time[..start_offset];275slice.partition_point(|v| !b.is_member(*v, closed_window))276} else {2770278};279let mut end = start;280let mut last = time[start_offset];281Ok(time[start_offset..upper_bound]282.iter()283.enumerate()284.map(move |(mut i, t)| {285// Fast path for duplicates.286if *t == last && i > 0 {287let len = end - start;288let offset = start as IdxSize;289return Ok((offset, len as IdxSize));290}291last = *t;292i += start_offset;293294let lower = add(&offset, *t, tz.as_ref())?;295let upper = *t;296297let b = Bounds::new(lower, upper);298299for &t in unsafe { time.get_unchecked(start..i) } {300if b.is_member_entry(t, closed_window) {301break;302}303start += 1;304}305306// faster path, check if `i` is member.307if b.is_member_exit(*t, closed_window) {308end = i;309} else {310end = std::cmp::max(end, start);311}312// we still must loop to consume duplicates313for &t in unsafe { time.get_unchecked(end..) 
} {314if !b.is_member_exit(t, closed_window) {315break;316}317end += 1;318}319320let len = end - start;321let offset = start as IdxSize;322323Ok((offset, len as IdxSize))324}))325}326327// this one is correct for all lookbehind/lookaheads, but is slower328// window is completely behind t and t itself is not a member329// ---------------t---330// [---]331pub(crate) fn group_by_values_iter_window_behind_t(332period: Duration,333offset: Duration,334time: &[i64],335closed_window: ClosedWindow,336tu: TimeUnit,337tz: Option<Tz>,338) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {339let add = match tu {340TimeUnit::Nanoseconds => Duration::add_ns,341TimeUnit::Microseconds => Duration::add_us,342TimeUnit::Milliseconds => Duration::add_ms,343};344345let mut start = 0;346let mut end = start;347let mut last = time[0];348let mut started = false;349time.iter().map(move |lower| {350// Fast path for duplicates.351if *lower == last && started {352let len = end - start;353let offset = start as IdxSize;354return Ok((offset, len as IdxSize));355}356last = *lower;357started = true;358let lower = add(&offset, *lower, tz.as_ref())?;359let upper = add(&period, lower, tz.as_ref())?;360361let b = Bounds::new(lower, upper);362if b.is_future(time[0], closed_window) {363Ok((0, 0))364} else {365for &t in &time[start..] {366if b.is_member_entry(t, closed_window) {367break;368}369start += 1;370}371372end = std::cmp::max(start, end);373for &t in &time[end..] 
{374if !b.is_member_exit(t, closed_window) {375break;376}377end += 1;378}379380let len = end - start;381let offset = start as IdxSize;382383Ok((offset, len as IdxSize))384}385})386}387388// window is with -1 periods of t389// ----t---390// [---]391pub(crate) fn group_by_values_iter_partial_lookbehind(392period: Duration,393offset: Duration,394time: &[i64],395closed_window: ClosedWindow,396tu: TimeUnit,397tz: Option<Tz>,398) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {399let add = match tu {400TimeUnit::Nanoseconds => Duration::add_ns,401TimeUnit::Microseconds => Duration::add_us,402TimeUnit::Milliseconds => Duration::add_ms,403};404405let mut start = 0;406let mut end = start;407let mut last = time[0];408time.iter().enumerate().map(move |(i, lower)| {409// Fast path for duplicates.410if *lower == last && i > 0 {411let len = end - start;412let offset = start as IdxSize;413return Ok((offset, len as IdxSize));414}415last = *lower;416417let lower = add(&offset, *lower, tz.as_ref())?;418let upper = add(&period, lower, tz.as_ref())?;419420let b = Bounds::new(lower, upper);421422for &t in &time[start..] {423if b.is_member_entry(t, closed_window) || start == i {424break;425}426start += 1;427}428429end = std::cmp::max(start, end);430for &t in &time[end..] 
{431if !b.is_member_exit(t, closed_window) {432break;433}434end += 1;435}436437let len = end - start;438let offset = start as IdxSize;439440Ok((offset, len as IdxSize))441})442}443444#[allow(clippy::too_many_arguments)]445// window is completely ahead of t and t itself is not a member446// --t-----------447// [---]448pub(crate) fn group_by_values_iter_lookahead(449period: Duration,450offset: Duration,451time: &[i64],452closed_window: ClosedWindow,453tu: TimeUnit,454tz: Option<Tz>,455start_offset: usize,456upper_bound: Option<usize>,457) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {458let upper_bound = upper_bound.unwrap_or(time.len());459460let add = match tu {461TimeUnit::Nanoseconds => Duration::add_ns,462TimeUnit::Microseconds => Duration::add_us,463TimeUnit::Milliseconds => Duration::add_ms,464};465let mut start = start_offset;466let mut end = start;467468let mut last = time[start_offset];469let mut started = false;470time[start_offset..upper_bound].iter().map(move |lower| {471// Fast path for duplicates.472if *lower == last && started {473let len = end - start;474let offset = start as IdxSize;475return Ok((offset, len as IdxSize));476}477started = true;478last = *lower;479480let lower = add(&offset, *lower, tz.as_ref())?;481let upper = add(&period, lower, tz.as_ref())?;482483let b = Bounds::new(lower, upper);484485for &t in &time[start..] {486if b.is_member_entry(t, closed_window) {487break;488}489start += 1;490}491492end = std::cmp::max(start, end);493for &t in &time[end..] 
{494if !b.is_member_exit(t, closed_window) {495break;496}497end += 1;498}499500let len = end - start;501let offset = start as IdxSize;502503Ok((offset, len as IdxSize))504})505}506507#[cfg(feature = "rolling_window_by")]508#[inline]509pub(crate) fn group_by_values_iter(510period: Duration,511time: &[i64],512closed_window: ClosedWindow,513tu: TimeUnit,514tz: Option<Tz>,515) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {516let mut offset = period;517offset.negative = true;518// t is at the right endpoint of the window519group_by_values_iter_lookbehind(period, offset, time, closed_window, tu, tz, 0, None)520}521522/// Checks if the boundary elements don't split on duplicates.523/// If they do we remove them524fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {525let is_valid = |window: &[(usize, usize)]| -> bool {526debug_assert_eq!(window.len(), 2);527let left_block_end = window[0].0 + window[0].1.saturating_sub(1);528let right_block_start = window[1].0;529time[left_block_end] != time[right_block_start]530};531532if time.is_empty() || thread_offsets.len() <= 1 || thread_offsets.windows(2).all(is_valid) {533return;534}535536let mut new = vec![];537for window in thread_offsets.windows(2) {538let this_block_is_valid = is_valid(window);539if this_block_is_valid {540// Only push left block541new.push(window[0])542}543}544// Check last block545if thread_offsets.len().is_multiple_of(2) {546let window = &thread_offsets[thread_offsets.len() - 2..];547if is_valid(window) {548new.push(thread_offsets[thread_offsets.len() - 1])549}550}551// We pruned invalid blocks, now we must correct the lengths.552if new.len() <= 1 {553new = vec![(0, time.len())];554} else {555let mut previous_start = time.len();556for window in new.iter_mut().rev() {557window.1 = previous_start - window.0;558previous_start = window.0;559}560new[0].0 = 0;561new[0].1 = new[1].0;562debug_assert_eq!(new.iter().map(|w| w.1).sum::<usize>(), 
time.len());563// Call again to check.564prune_splits_on_duplicates(time, &mut new)565}566std::mem::swap(thread_offsets, &mut new);567}568569#[allow(clippy::too_many_arguments)]570fn group_by_values_iter_lookbehind_collected(571period: Duration,572offset: Duration,573time: &[i64],574closed_window: ClosedWindow,575tu: TimeUnit,576tz: Option<Tz>,577start_offset: usize,578upper_bound: Option<usize>,579) -> PolarsResult<Vec<[IdxSize; 2]>> {580let iter = group_by_values_iter_lookbehind(581period,582offset,583time,584closed_window,585tu,586tz,587start_offset,588upper_bound,589)?;590iter.map(|result| result.map(|(offset, len)| [offset, len]))591.collect::<PolarsResult<Vec<_>>>()592}593594#[allow(clippy::too_many_arguments)]595pub(crate) fn group_by_values_iter_lookahead_collected(596period: Duration,597offset: Duration,598time: &[i64],599closed_window: ClosedWindow,600tu: TimeUnit,601tz: Option<Tz>,602start_offset: usize,603upper_bound: Option<usize>,604) -> PolarsResult<Vec<[IdxSize; 2]>> {605let iter = group_by_values_iter_lookahead(606period,607offset,608time,609closed_window,610tu,611tz,612start_offset,613upper_bound,614);615iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))616.collect::<PolarsResult<Vec<_>>>()617}618619/// Different from `group_by_windows`, where define window buckets and search which values fit that620/// pre-defined bucket.621///622/// This function defines every window based on the:623/// - timestamp (lower bound)624/// - timestamp + period (upper bound)625/// where timestamps are the individual values in the array `time`626pub fn group_by_values(627period: Duration,628offset: Duration,629time: &[i64],630closed_window: ClosedWindow,631tu: TimeUnit,632tz: Option<Tz>,633) -> PolarsResult<GroupsSlice> {634if time.is_empty() {635return Ok(GroupsSlice::from(vec![]));636}637638let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());639// there are duplicates in the splits, so we opt for a single 
partition640prune_splits_on_duplicates(time, &mut thread_offsets);641642// If we start from within parallel work we will do this single threaded.643let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);644645// we have a (partial) lookbehind window646if offset.negative && !offset.is_zero() {647// lookbehind648if offset.duration_ns() == period.duration_ns() {649// t is right at the end of the window650// ------t---651// [------]652if !run_parallel {653let vecs = group_by_values_iter_lookbehind_collected(654period,655offset,656time,657closed_window,658tu,659tz,6600,661None,662)?;663return Ok(GroupsSlice::from(vecs));664}665666POOL.install(|| {667let vals = thread_offsets668.par_iter()669.copied()670.map(|(base_offset, len)| {671let upper_bound = base_offset + len;672group_by_values_iter_lookbehind_collected(673period,674offset,675time,676closed_window,677tu,678tz,679base_offset,680Some(upper_bound),681)682})683.collect::<PolarsResult<Vec<_>>>()?;684Ok(flatten_par(&vals))685})686} else if ((offset.duration_ns() >= period.duration_ns())687&& matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))688|| ((offset.duration_ns() > period.duration_ns())689&& matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))690{691// window is completely behind t and t itself is not a member692// ---------------t---693// [---]694let iter =695group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);696iter.map(|result| result.map(|(offset, len)| [offset, len]))697.collect::<PolarsResult<_>>()698}699// partial lookbehind700// this one is still single threaded701// can make it parallel later, its a bit more complicated because the boundaries are unknown702// window is with -1 periods of t703// ----t---704// [---]705else {706let iter = group_by_values_iter_partial_lookbehind(707period,708offset,709time,710closed_window,711tu,712tz,713);714iter.map(|result| result.map(|(offset, len)| [offset, 
len]))715.collect::<PolarsResult<_>>()716}717} else if !offset.is_zero()718|| closed_window == ClosedWindow::Right719|| closed_window == ClosedWindow::None720{721// window is completely ahead of t and t itself is not a member722// --t-----------723// [---]724725if !run_parallel {726let vecs = group_by_values_iter_lookahead_collected(727period,728offset,729time,730closed_window,731tu,732tz,7330,734None,735)?;736return Ok(GroupsSlice::from(vecs));737}738739POOL.install(|| {740let vals = thread_offsets741.par_iter()742.copied()743.map(|(base_offset, len)| {744let lower_bound = base_offset;745let upper_bound = base_offset + len;746group_by_values_iter_lookahead_collected(747period,748offset,749time,750closed_window,751tu,752tz,753lower_bound,754Some(upper_bound),755)756})757.collect::<PolarsResult<Vec<_>>>()?;758Ok(flatten_par(&vals))759})760} else {761if !run_parallel {762let vecs = group_by_values_iter_lookahead_collected(763period,764offset,765time,766closed_window,767tu,768tz,7690,770None,771)?;772return Ok(GroupsSlice::from(vecs));773}774775// Offset is 0 and window is closed on the left:776// it must be that the window starts at t and t is a member777// --t-----------778// [---]779POOL.install(|| {780let vals = thread_offsets781.par_iter()782.copied()783.map(|(base_offset, len)| {784let lower_bound = base_offset;785let upper_bound = base_offset + len;786group_by_values_iter_lookahead_collected(787period,788offset,789time,790closed_window,791tu,792tz,793lower_bound,794Some(upper_bound),795)796})797.collect::<PolarsResult<Vec<_>>>()?;798Ok(flatten_par(&vals))799})800}801}802803pub struct RollingWindower {804period: Duration,805offset: Duration,806closed: ClosedWindow,807808add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,809tz: Option<Tz>,810811start: IdxSize,812end: IdxSize,813length: IdxSize,814815active: VecDeque<ActiveWindow>,816}817818struct ActiveWindow {819start: i64,820end: i64,821}822823impl ActiveWindow {824#[inline(always)]825fn 
above_lower_bound(&self, t: i64, closed: ClosedWindow) -> bool {826(t > self.start)827| (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == self.start))828}829830#[inline(always)]831fn below_upper_bound(&self, t: i64, closed: ClosedWindow) -> bool {832(t < self.end)833| (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == self.end))834}835}836837fn skip_in_2d_list(l: &[&[i64]], mut n: usize) -> (usize, usize) {838let mut y = 0;839while y < l.len() && (n >= l[y].len() || l[y].is_empty()) {840n -= l[y].len();841y += 1;842}843assert!(n == 0 || y < l.len());844(n, y)845}846fn increment_2d(x: &mut usize, y: &mut usize, l: &[&[i64]]) {847*x += 1;848while *y < l.len() && *x == l[*y].len() {849*y += 1;850*x = 0;851}852}853854impl RollingWindower {855pub fn new(856period: Duration,857offset: Duration,858closed: ClosedWindow,859tu: TimeUnit,860tz: Option<Tz>,861) -> Self {862Self {863period,864offset,865closed,866867add: match tu {868TimeUnit::Nanoseconds => Duration::add_ns,869TimeUnit::Microseconds => Duration::add_us,870TimeUnit::Milliseconds => Duration::add_ms,871},872tz,873874start: 0,875end: 0,876length: 0,877878active: Default::default(),879}880}881882/// Insert new values into the windower.883///884/// This should be given all the old values that were not processed yet.885pub fn insert(886&mut self,887time: &[&[i64]],888windows: &mut Vec<[IdxSize; 2]>,889) -> PolarsResult<IdxSize> {890let (mut i_x, mut i_y) = skip_in_2d_list(time, (self.length - self.start) as usize);891let (mut s_x, mut s_y) = skip_in_2d_list(time, 0); // skip over empty lists892let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);893894let time_start = self.start;895let mut i = self.length;896while i_y < time.len() {897let t = time[i_y][i_x];898let window_start = (self.add)(&self.offset, t, self.tz.as_ref())?;899// For datetime arithmetic, it does *NOT* hold 0 + a - a == 0. 
Therefore, we make sure900// that if `offset` and `period` are inverses we keep the `t`.901let window_end = if self.offset == -self.period {902t903} else {904(self.add)(&self.period, window_start, self.tz.as_ref())?905};906907self.active.push_back(ActiveWindow {908start: window_start,909end: window_end,910});911912while let Some(w) = self.active.front() {913if w.below_upper_bound(t, self.closed) {914break;915}916917let w = self.active.pop_front().unwrap();918while self.start < i && !w.above_lower_bound(time[s_y][s_x], self.closed) {919increment_2d(&mut s_x, &mut s_y, time);920self.start += 1;921}922while self.end < i && w.below_upper_bound(time[e_y][e_x], self.closed) {923increment_2d(&mut e_x, &mut e_y, time);924self.end += 1;925}926windows.push([self.start, self.end - self.start]);927}928929increment_2d(&mut i_x, &mut i_y, time);930i += 1;931}932933self.length = i;934Ok(self.start - time_start)935}936937/// Process all remaining items and signal that no more items are coming.938pub fn finalize(&mut self, time: &[&[i64]], windows: &mut Vec<[IdxSize; 2]>) {939assert_eq!(940time.iter().map(|t| t.len()).sum::<usize>() as IdxSize,941self.length - self.start942);943944let (mut s_x, mut s_y) = skip_in_2d_list(time, 0);945let (mut e_x, mut e_y) = skip_in_2d_list(time, (self.end - self.start) as usize);946947windows.extend(self.active.drain(..).map(|w| {948while self.start < self.length && !w.above_lower_bound(time[s_y][s_x], self.closed) {949increment_2d(&mut s_x, &mut s_y, time);950self.start += 1;951}952while self.end < self.length && w.below_upper_bound(time[e_y][e_x], self.closed) {953increment_2d(&mut e_x, &mut e_y, time);954self.end += 1;955}956[self.start, self.end - self.start]957}));958959self.start = 0;960self.end = 0;961self.length = 0;962}963964pub fn reset(&mut self) {965self.active.clear();966self.start = 0;967self.end = 0;968self.length = 0;969}970}971972#[derive(Debug)]973struct ActiveDynWindow {974start: IdxSize,975lower_bound: i64,976upper_bound: 
i64,977}978979#[inline(always)]980fn is_above_lower_bound(t: i64, lb: i64, closed: ClosedWindow) -> bool {981(t > lb) | (matches!(closed, ClosedWindow::Left | ClosedWindow::Both) & (t == lb))982}983#[inline(always)]984fn is_below_upper_bound(t: i64, ub: i64, closed: ClosedWindow) -> bool {985(t < ub) | (matches!(closed, ClosedWindow::Right | ClosedWindow::Both) & (t == ub))986}987988pub struct GroupByDynamicWindower {989period: Duration,990offset: Duration,991every: Duration,992closed: ClosedWindow,993994start_by: StartBy,995996add: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,997// Not-to-exceed duration (upper limit).998nte: fn(&Duration) -> i64,999tu: TimeUnit,1000tz: Option<Tz>,10011002include_lower_bound: bool,1003include_upper_bound: bool,10041005num_seen: IdxSize,1006next_lower_bound: i64,1007active: VecDeque<ActiveDynWindow>,1008}10091010impl GroupByDynamicWindower {1011#[expect(clippy::too_many_arguments)]1012pub fn new(1013period: Duration,1014offset: Duration,1015every: Duration,1016start_by: StartBy,1017closed: ClosedWindow,1018tu: TimeUnit,1019tz: Option<Tz>,1020include_lower_bound: bool,1021include_upper_bound: bool,1022) -> Self {1023Self {1024period,1025offset,1026every,1027closed,10281029start_by,10301031add: match tu {1032TimeUnit::Nanoseconds => Duration::add_ns,1033TimeUnit::Microseconds => Duration::add_us,1034TimeUnit::Milliseconds => Duration::add_ms,1035},1036nte: match tu {1037TimeUnit::Nanoseconds => Duration::nte_duration_ns,1038TimeUnit::Microseconds => Duration::nte_duration_us,1039TimeUnit::Milliseconds => Duration::nte_duration_ms,1040},1041tu,1042tz,10431044include_lower_bound,1045include_upper_bound,10461047num_seen: 0,1048next_lower_bound: 0,1049active: Default::default(),1050}1051}10521053pub fn find_first_window_around(1054&self,1055mut lower_bound: i64,1056target: i64,1057) -> PolarsResult<Result<(i64, i64), i64>> {1058let mut upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;1059while 
!is_below_upper_bound(target, upper_bound, self.closed) {1060let gap = target - lower_bound;1061let nth = match self.tu {1062TimeUnit::Nanoseconds1063if gap > self.every.nte_duration_ns() + self.period.nte_duration_ns() =>1064{1065((gap - self.period.nte_duration_ns()) as usize)1066/ (self.every.nte_duration_ns() as usize)1067},1068TimeUnit::Microseconds1069if gap > self.every.nte_duration_us() + self.period.nte_duration_us() =>1070{1071((gap - self.period.nte_duration_us()) as usize)1072/ (self.every.nte_duration_us() as usize)1073},1074TimeUnit::Milliseconds1075if gap > self.every.nte_duration_ms() + self.period.nte_duration_ms() =>1076{1077((gap - self.period.nte_duration_ms()) as usize)1078/ (self.every.nte_duration_ms() as usize)1079},1080_ => 1,1081};10821083let nth: i64 = nth.try_into().unwrap();1084lower_bound = (self.add)(&(self.every * nth), lower_bound, self.tz.as_ref())?;1085upper_bound = (self.add)(&self.period, lower_bound, self.tz.as_ref())?;1086}10871088if is_above_lower_bound(target, lower_bound, self.closed) {1089Ok(Ok((lower_bound, upper_bound)))1090} else {1091Ok(Err(lower_bound))1092}1093}10941095fn start_lower_bound(&self, first: i64) -> PolarsResult<i64> {1096match self.start_by {1097StartBy::DataPoint => Ok(first),1098StartBy::WindowBound => {1099let get_earliest_bounds = match self.tu {1100TimeUnit::Nanoseconds => Window::get_earliest_bounds_ns,1101TimeUnit::Microseconds => Window::get_earliest_bounds_us,1102TimeUnit::Milliseconds => Window::get_earliest_bounds_ms,1103};1104Ok((get_earliest_bounds)(1105&Window::new(self.every, self.period, self.offset),1106first,1107self.closed,1108self.tz.as_ref(),1109)?1110.start)1111},1112_ => {1113{1114#[allow(clippy::type_complexity)]1115let (from, to): (1116fn(i64) -> NaiveDateTime,1117fn(NaiveDateTime) -> i64,1118) = match self.tu {1119TimeUnit::Nanoseconds => {1120(timestamp_ns_to_datetime, datetime_to_timestamp_ns)1121},1122TimeUnit::Microseconds => {1123(timestamp_us_to_datetime, 
datetime_to_timestamp_us)1124},1125TimeUnit::Milliseconds => {1126(timestamp_ms_to_datetime, datetime_to_timestamp_ms)1127},1128};1129// find beginning of the week.1130let dt = from(first);1131match self.tz.as_ref() {1132#[cfg(feature = "timezones")]1133Some(tz) => {1134let dt = tz.from_utc_datetime(&dt);1135let dt = dt.beginning_of_week();1136let dt = dt.naive_utc();1137let start = to(dt);1138// adjust start of the week based on given day of the week1139let start = (self.add)(1140&Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),1141start,1142self.tz.as_ref(),1143)?;1144// apply the 'offset'1145let start = (self.add)(&self.offset, start, self.tz.as_ref())?;1146// make sure the first datapoint has a chance to be included1147// and compute the end of the window defined by the 'period'1148Ok(ensure_t_in_or_in_front_of_window(1149self.every,1150first,1151self.add,1152self.nte,1153self.period,1154start,1155self.closed,1156self.tz.as_ref(),1157)?1158.start)1159},1160_ => {1161let tz = chrono::Utc;1162let dt = dt.and_local_timezone(tz).unwrap();1163let dt = dt.beginning_of_week();1164let dt = dt.naive_utc();1165let start = to(dt);1166// adjust start of the week based on given day of the week1167let start = (self.add)(1168&Duration::parse(&format!("{}d", self.start_by.weekday().unwrap())),1169start,1170None,1171)1172.unwrap();1173// apply the 'offset'1174let start = (self.add)(&self.offset, start, None).unwrap();1175// make sure the first datapoint has a chance to be included1176// and compute the end of the window defined by the 'period'1177Ok(ensure_t_in_or_in_front_of_window(1178self.every,1179first,1180self.add,1181self.nte,1182self.period,1183start,1184self.closed,1185None,1186)?1187.start)1188},1189}1190}1191},1192}1193}11941195pub fn insert(1196&mut self,1197time: &[i64],1198windows: &mut Vec<[IdxSize; 2]>,1199lower_bound: &mut Vec<i64>,1200upper_bound: &mut Vec<i64>,1201) -> PolarsResult<()> {1202if time.is_empty() {1203return 
Ok(());1204}12051206if self.num_seen == 0 {1207debug_assert!(self.active.is_empty());1208self.next_lower_bound = self.start_lower_bound(time[0])?;1209}12101211for &t in time {1212while let Some(w) = self.active.front()1213&& !is_below_upper_bound(t, w.upper_bound, self.closed)1214{1215let w = self.active.pop_front().unwrap();1216windows.push([w.start, self.num_seen - w.start]);1217if self.include_lower_bound {1218lower_bound.push(w.lower_bound);1219}1220if self.include_upper_bound {1221upper_bound.push(w.upper_bound);1222}1223}12241225while is_above_lower_bound(t, self.next_lower_bound, self.closed) {1226match self.find_first_window_around(self.next_lower_bound, t)? {1227Ok((lower_bound, upper_bound)) => {1228self.next_lower_bound =1229(self.add)(&self.every, lower_bound, self.tz.as_ref())?;1230self.active.push_back(ActiveDynWindow {1231start: self.num_seen,1232lower_bound,1233upper_bound,1234});1235},1236Err(lower_bound) => {1237self.next_lower_bound = lower_bound;1238break;1239},1240}1241}12421243self.num_seen += 11244}12451246Ok(())1247}12481249pub fn lowest_needed_index(&self) -> IdxSize {1250self.active.front().map_or(self.num_seen, |w| w.start)1251}12521253pub fn finalize(1254&mut self,1255windows: &mut Vec<[IdxSize; 2]>,1256lower_bound: &mut Vec<i64>,1257upper_bound: &mut Vec<i64>,1258) {1259for w in self.active.drain(..) 
{1260windows.push([w.start, self.num_seen - w.start]);1261if self.include_lower_bound {1262lower_bound.push(w.lower_bound);1263}1264if self.include_upper_bound {1265upper_bound.push(w.upper_bound);1266}1267}12681269self.next_lower_bound = 0;1270self.num_seen = 0;1271}12721273pub fn num_seen(&self) -> IdxSize {1274self.num_seen1275}12761277pub fn time_unit(&self) -> TimeUnit {1278self.tu1279}1280}12811282#[cfg(test)]1283mod test {1284use super::*;12851286#[test]1287fn test_prune_duplicates() {1288// |--|------------|----|---------|1289// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 101290let time = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];1291let mut splits = vec![(0, 2), (2, 4), (6, 2), (8, 3)];1292prune_splits_on_duplicates(time, &mut splits);1293assert_eq!(splits, &[(0, 6), (6, 2), (8, 3)]);1294}1295}129612971298