Path: blob/main/crates/polars-time/src/windows/group_by.rs
6939 views
use arrow::legacy::time_zone::Tz;1use arrow::trusted_len::TrustedLen;2use polars_core::POOL;3use polars_core::prelude::*;4use polars_core::utils::_split_offsets;5use polars_core::utils::flatten::flatten_par;6use rayon::prelude::*;7#[cfg(feature = "serde")]8use serde::{Deserialize, Serialize};9use strum_macros::IntoStaticStr;1011use crate::prelude::*;1213#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]14#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]15#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]16#[strum(serialize_all = "snake_case")]17pub enum ClosedWindow {18Left,19Right,20Both,21None,22}2324#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]25#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]26#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]27#[strum(serialize_all = "snake_case")]28pub enum Label {29Left,30Right,31DataPoint,32}3334#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, IntoStaticStr)]35#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]36#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]37#[strum(serialize_all = "snake_case")]38pub enum StartBy {39WindowBound,40DataPoint,41/// only useful if periods are weekly42Monday,43Tuesday,44Wednesday,45Thursday,46Friday,47Saturday,48Sunday,49}5051impl Default for StartBy {52fn default() -> Self {53Self::WindowBound54}55}5657impl StartBy {58pub fn weekday(&self) -> Option<u32> {59match self {60StartBy::Monday => Some(0),61StartBy::Tuesday => Some(1),62StartBy::Wednesday => Some(2),63StartBy::Thursday => Some(3),64StartBy::Friday => Some(4),65StartBy::Saturday => Some(5),66StartBy::Sunday => Some(6),67_ => None,68}69}70}7172#[allow(clippy::too_many_arguments)]73fn update_groups_and_bounds(74bounds_iter: BoundsIter<'_>,75mut start: usize,76time: &[i64],77closed_window: ClosedWindow,78include_lower_bound: bool,79include_upper_bound: bool,80lower_bound: &mut Vec<i64>,81upper_bound: &mut Vec<i64>,82groups: &mut Vec<[IdxSize; 2]>,83) {84'bounds: for bi in bounds_iter {85// find starting point of window86for &t in &time[start..time.len().saturating_sub(1)] {87// the window is behind the time values.88if bi.is_future(t, closed_window) {89continue 'bounds;90}91if bi.is_member_entry(t, closed_window) {92break;93}94start += 1;95}9697// find members of this window98let mut end = start;99100// last value isn't always added101if end == time.len() - 1 {102let t = time[end];103if bi.is_member(t, closed_window) {104if include_lower_bound {105lower_bound.push(bi.start);106}107if include_upper_bound {108upper_bound.push(bi.stop);109}110groups.push([end as IdxSize, 1])111}112continue;113}114for &t in &time[end..] {115if !bi.is_member_exit(t, closed_window) {116break;117}118end += 1;119}120let len = end - start;121122if include_lower_bound {123lower_bound.push(bi.start);124}125if include_upper_bound {126upper_bound.push(bi.stop);127}128groups.push([start as IdxSize, len as IdxSize])129}130}131132/// Window boundaries are created based on the given `Window`, which is defined by:133/// - every134/// - period135/// - offset136///137/// And every window boundary we search for the values that fit that window by the given138/// `ClosedWindow`. The groups are return as `GroupTuples` together with the lower bound and upper139/// bound timestamps. These timestamps indicate the start (lower) and end (upper) of the window of140/// that group.141///142/// If `include_boundaries` is `false` those `lower` and `upper` vectors will be empty.143#[allow(clippy::too_many_arguments)]144pub fn group_by_windows(145window: Window,146time: &[i64],147closed_window: ClosedWindow,148tu: TimeUnit,149tz: &Option<TimeZone>,150include_lower_bound: bool,151include_upper_bound: bool,152start_by: StartBy,153) -> PolarsResult<(GroupsSlice, Vec<i64>, Vec<i64>)> {154let start = time[0];155// the boundary we define here is not yet correct. It doesn't take 'period' into account156// and it doesn't have the proper starting point. This boundary is used as a proxy to find157// the proper 'boundary' in 'window.get_overlapping_bounds_iter'.158let boundary = if time.len() > 1 {159// +1 because left or closed boundary could match the next window if it is on the boundary160let stop = time[time.len() - 1] + 1;161Bounds::new_checked(start, stop)162} else {163let stop = start + 1;164Bounds::new_checked(start, stop)165};166167let size = {168match tu {169TimeUnit::Nanoseconds => window.estimate_overlapping_bounds_ns(boundary),170TimeUnit::Microseconds => window.estimate_overlapping_bounds_us(boundary),171TimeUnit::Milliseconds => window.estimate_overlapping_bounds_ms(boundary),172}173};174let size_lower = if include_lower_bound { size } else { 0 };175let size_upper = if include_upper_bound { size } else { 0 };176let mut lower_bound = Vec::with_capacity(size_lower);177let mut upper_bound = Vec::with_capacity(size_upper);178179let mut groups = Vec::with_capacity(size);180let start_offset = 0;181182match tz {183#[cfg(feature = "timezones")]184Some(tz) => {185update_groups_and_bounds(186window.get_overlapping_bounds_iter(187boundary,188closed_window,189tu,190tz.parse::<Tz>().ok().as_ref(),191start_by,192)?,193start_offset,194time,195closed_window,196include_lower_bound,197include_upper_bound,198&mut lower_bound,199&mut upper_bound,200&mut groups,201);202},203_ => {204update_groups_and_bounds(205window.get_overlapping_bounds_iter(boundary, closed_window, tu, None, start_by)?,206start_offset,207time,208closed_window,209include_lower_bound,210include_upper_bound,211&mut lower_bound,212&mut upper_bound,213&mut groups,214);215},216};217218Ok((groups, lower_bound, upper_bound))219}220221// t is right at the end of the window222// ------t---223// [------]224#[inline]225#[allow(clippy::too_many_arguments)]226pub(crate) fn group_by_values_iter_lookbehind(227period: Duration,228offset: Duration,229time: &[i64],230closed_window: ClosedWindow,231tu: TimeUnit,232tz: Option<Tz>,233start_offset: usize,234upper_bound: Option<usize>,235) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {236debug_assert!(offset.duration_ns() == period.duration_ns());237debug_assert!(offset.negative);238let add = match tu {239TimeUnit::Nanoseconds => Duration::add_ns,240TimeUnit::Microseconds => Duration::add_us,241TimeUnit::Milliseconds => Duration::add_ms,242};243244let upper_bound = upper_bound.unwrap_or(time.len());245// Use binary search to find the initial start as that is behind.246let mut start = if let Some(&t) = time.get(start_offset) {247let lower = add(&offset, t, tz.as_ref())?;248// We have `period == -offset`, so `t + offset + period` is equal to `t`,249// and `upper` is trivially equal to `t` itself. Using the trivial calculation,250// instead of `upper = lower + period`, avoids issues around251// `t - 1mo + 1mo` not round-tripping.252let upper = t;253let b = Bounds::new(lower, upper);254let slice = &time[..start_offset];255slice.partition_point(|v| !b.is_member(*v, closed_window))256} else {2570258};259let mut end = start;260let mut last = time[start_offset];261Ok(time[start_offset..upper_bound]262.iter()263.enumerate()264.map(move |(mut i, t)| {265// Fast path for duplicates.266if *t == last && i > 0 {267let len = end - start;268let offset = start as IdxSize;269return Ok((offset, len as IdxSize));270}271last = *t;272i += start_offset;273274let lower = add(&offset, *t, tz.as_ref())?;275let upper = *t;276277let b = Bounds::new(lower, upper);278279for &t in unsafe { time.get_unchecked(start..i) } {280if b.is_member_entry(t, closed_window) {281break;282}283start += 1;284}285286// faster path, check if `i` is member.287if b.is_member_exit(*t, closed_window) {288end = i;289} else {290end = std::cmp::max(end, start);291}292// we still must loop to consume duplicates293for &t in unsafe { time.get_unchecked(end..) } {294if !b.is_member_exit(t, closed_window) {295break;296}297end += 1;298}299300let len = end - start;301let offset = start as IdxSize;302303Ok((offset, len as IdxSize))304}))305}306307// this one is correct for all lookbehind/lookaheads, but is slower308// window is completely behind t and t itself is not a member309// ---------------t---310// [---]311pub(crate) fn group_by_values_iter_window_behind_t(312period: Duration,313offset: Duration,314time: &[i64],315closed_window: ClosedWindow,316tu: TimeUnit,317tz: Option<Tz>,318) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {319let add = match tu {320TimeUnit::Nanoseconds => Duration::add_ns,321TimeUnit::Microseconds => Duration::add_us,322TimeUnit::Milliseconds => Duration::add_ms,323};324325let mut start = 0;326let mut end = start;327let mut last = time[0];328let mut started = false;329time.iter().map(move |lower| {330// Fast path for duplicates.331if *lower == last && started {332let len = end - start;333let offset = start as IdxSize;334return Ok((offset, len as IdxSize));335}336last = *lower;337started = true;338let lower = add(&offset, *lower, tz.as_ref())?;339let upper = add(&period, lower, tz.as_ref())?;340341let b = Bounds::new(lower, upper);342if b.is_future(time[0], closed_window) {343Ok((0, 0))344} else {345for &t in &time[start..] {346if b.is_member_entry(t, closed_window) {347break;348}349start += 1;350}351352end = std::cmp::max(start, end);353for &t in &time[end..] {354if !b.is_member_exit(t, closed_window) {355break;356}357end += 1;358}359360let len = end - start;361let offset = start as IdxSize;362363Ok((offset, len as IdxSize))364}365})366}367368// window is with -1 periods of t369// ----t---370// [---]371pub(crate) fn group_by_values_iter_partial_lookbehind(372period: Duration,373offset: Duration,374time: &[i64],375closed_window: ClosedWindow,376tu: TimeUnit,377tz: Option<Tz>,378) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {379let add = match tu {380TimeUnit::Nanoseconds => Duration::add_ns,381TimeUnit::Microseconds => Duration::add_us,382TimeUnit::Milliseconds => Duration::add_ms,383};384385let mut start = 0;386let mut end = start;387let mut last = time[0];388time.iter().enumerate().map(move |(i, lower)| {389// Fast path for duplicates.390if *lower == last && i > 0 {391let len = end - start;392let offset = start as IdxSize;393return Ok((offset, len as IdxSize));394}395last = *lower;396397let lower = add(&offset, *lower, tz.as_ref())?;398let upper = add(&period, lower, tz.as_ref())?;399400let b = Bounds::new(lower, upper);401402for &t in &time[start..] {403if b.is_member_entry(t, closed_window) || start == i {404break;405}406start += 1;407}408409end = std::cmp::max(start, end);410for &t in &time[end..] {411if !b.is_member_exit(t, closed_window) {412break;413}414end += 1;415}416417let len = end - start;418let offset = start as IdxSize;419420Ok((offset, len as IdxSize))421})422}423424#[allow(clippy::too_many_arguments)]425// window is completely ahead of t and t itself is not a member426// --t-----------427// [---]428pub(crate) fn group_by_values_iter_lookahead(429period: Duration,430offset: Duration,431time: &[i64],432closed_window: ClosedWindow,433tu: TimeUnit,434tz: Option<Tz>,435start_offset: usize,436upper_bound: Option<usize>,437) -> impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_ {438let upper_bound = upper_bound.unwrap_or(time.len());439440let add = match tu {441TimeUnit::Nanoseconds => Duration::add_ns,442TimeUnit::Microseconds => Duration::add_us,443TimeUnit::Milliseconds => Duration::add_ms,444};445let mut start = start_offset;446let mut end = start;447448let mut last = time[start_offset];449let mut started = false;450time[start_offset..upper_bound].iter().map(move |lower| {451// Fast path for duplicates.452if *lower == last && started {453let len = end - start;454let offset = start as IdxSize;455return Ok((offset, len as IdxSize));456}457started = true;458last = *lower;459460let lower = add(&offset, *lower, tz.as_ref())?;461let upper = add(&period, lower, tz.as_ref())?;462463let b = Bounds::new(lower, upper);464465for &t in &time[start..] {466if b.is_member_entry(t, closed_window) {467break;468}469start += 1;470}471472end = std::cmp::max(start, end);473for &t in &time[end..] {474if !b.is_member_exit(t, closed_window) {475break;476}477end += 1;478}479480let len = end - start;481let offset = start as IdxSize;482483Ok((offset, len as IdxSize))484})485}486487#[cfg(feature = "rolling_window_by")]488#[inline]489pub(crate) fn group_by_values_iter(490period: Duration,491time: &[i64],492closed_window: ClosedWindow,493tu: TimeUnit,494tz: Option<Tz>,495) -> PolarsResult<impl TrustedLen<Item = PolarsResult<(IdxSize, IdxSize)>> + '_> {496let mut offset = period;497offset.negative = true;498// t is at the right endpoint of the window499group_by_values_iter_lookbehind(period, offset, time, closed_window, tu, tz, 0, None)500}501502/// Checks if the boundary elements don't split on duplicates.503/// If they do we remove them504fn prune_splits_on_duplicates(time: &[i64], thread_offsets: &mut Vec<(usize, usize)>) {505let is_valid = |window: &[(usize, usize)]| -> bool {506debug_assert_eq!(window.len(), 2);507let left_block_end = window[0].0 + window[0].1.saturating_sub(1);508let right_block_start = window[1].0;509time[left_block_end] != time[right_block_start]510};511512if time.is_empty() || thread_offsets.len() <= 1 || thread_offsets.windows(2).all(is_valid) {513return;514}515516let mut new = vec![];517for window in thread_offsets.windows(2) {518let this_block_is_valid = is_valid(window);519if this_block_is_valid {520// Only push left block521new.push(window[0])522}523}524// Check last block525if thread_offsets.len().is_multiple_of(2) {526let window = &thread_offsets[thread_offsets.len() - 2..];527if is_valid(window) {528new.push(thread_offsets[thread_offsets.len() - 1])529}530}531// We pruned invalid blocks, now we must correct the lengths.532if new.len() <= 1 {533new = vec![(0, time.len())];534} else {535let mut previous_start = time.len();536for window in new.iter_mut().rev() {537window.1 = previous_start - window.0;538previous_start = window.0;539}540new[0].0 = 0;541new[0].1 = new[1].0;542debug_assert_eq!(new.iter().map(|w| w.1).sum::<usize>(), time.len());543// Call again to check.544prune_splits_on_duplicates(time, &mut new)545}546std::mem::swap(thread_offsets, &mut new);547}548549#[allow(clippy::too_many_arguments)]550fn group_by_values_iter_lookbehind_collected(551period: Duration,552offset: Duration,553time: &[i64],554closed_window: ClosedWindow,555tu: TimeUnit,556tz: Option<Tz>,557start_offset: usize,558upper_bound: Option<usize>,559) -> PolarsResult<Vec<[IdxSize; 2]>> {560let iter = group_by_values_iter_lookbehind(561period,562offset,563time,564closed_window,565tu,566tz,567start_offset,568upper_bound,569)?;570iter.map(|result| result.map(|(offset, len)| [offset, len]))571.collect::<PolarsResult<Vec<_>>>()572}573574#[allow(clippy::too_many_arguments)]575pub(crate) fn group_by_values_iter_lookahead_collected(576period: Duration,577offset: Duration,578time: &[i64],579closed_window: ClosedWindow,580tu: TimeUnit,581tz: Option<Tz>,582start_offset: usize,583upper_bound: Option<usize>,584) -> PolarsResult<Vec<[IdxSize; 2]>> {585let iter = group_by_values_iter_lookahead(586period,587offset,588time,589closed_window,590tu,591tz,592start_offset,593upper_bound,594);595iter.map(|result| result.map(|(offset, len)| [offset as IdxSize, len]))596.collect::<PolarsResult<Vec<_>>>()597}598599/// Different from `group_by_windows`, where define window buckets and search which values fit that600/// pre-defined bucket.601///602/// This function defines every window based on the:603/// - timestamp (lower bound)604/// - timestamp + period (upper bound)605/// where timestamps are the individual values in the array `time`606pub fn group_by_values(607period: Duration,608offset: Duration,609time: &[i64],610closed_window: ClosedWindow,611tu: TimeUnit,612tz: Option<Tz>,613) -> PolarsResult<GroupsSlice> {614if time.is_empty() {615return Ok(GroupsSlice::from(vec![]));616}617618let mut thread_offsets = _split_offsets(time.len(), POOL.current_num_threads());619// there are duplicates in the splits, so we opt for a single partition620prune_splits_on_duplicates(time, &mut thread_offsets);621622// If we start from within parallel work we will do this single threaded.623let run_parallel = !POOL.current_thread_has_pending_tasks().unwrap_or(false);624625// we have a (partial) lookbehind window626if offset.negative && !offset.is_zero() {627// lookbehind628if offset.duration_ns() == period.duration_ns() {629// t is right at the end of the window630// ------t---631// [------]632if !run_parallel {633let vecs = group_by_values_iter_lookbehind_collected(634period,635offset,636time,637closed_window,638tu,639tz,6400,641None,642)?;643return Ok(GroupsSlice::from(vecs));644}645646POOL.install(|| {647let vals = thread_offsets648.par_iter()649.copied()650.map(|(base_offset, len)| {651let upper_bound = base_offset + len;652group_by_values_iter_lookbehind_collected(653period,654offset,655time,656closed_window,657tu,658tz,659base_offset,660Some(upper_bound),661)662})663.collect::<PolarsResult<Vec<_>>>()?;664Ok(flatten_par(&vals))665})666} else if ((offset.duration_ns() >= period.duration_ns())667&& matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))668|| ((offset.duration_ns() > period.duration_ns())669&& matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))670{671// window is completely behind t and t itself is not a member672// ---------------t---673// [---]674let iter =675group_by_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);676iter.map(|result| result.map(|(offset, len)| [offset, len]))677.collect::<PolarsResult<_>>()678}679// partial lookbehind680// this one is still single threaded681// can make it parallel later, its a bit more complicated because the boundaries are unknown682// window is with -1 periods of t683// ----t---684// [---]685else {686let iter = group_by_values_iter_partial_lookbehind(687period,688offset,689time,690closed_window,691tu,692tz,693);694iter.map(|result| result.map(|(offset, len)| [offset, len]))695.collect::<PolarsResult<_>>()696}697} else if !offset.is_zero()698|| closed_window == ClosedWindow::Right699|| closed_window == ClosedWindow::None700{701// window is completely ahead of t and t itself is not a member702// --t-----------703// [---]704705if !run_parallel {706let vecs = group_by_values_iter_lookahead_collected(707period,708offset,709time,710closed_window,711tu,712tz,7130,714None,715)?;716return Ok(GroupsSlice::from(vecs));717}718719POOL.install(|| {720let vals = thread_offsets721.par_iter()722.copied()723.map(|(base_offset, len)| {724let lower_bound = base_offset;725let upper_bound = base_offset + len;726group_by_values_iter_lookahead_collected(727period,728offset,729time,730closed_window,731tu,732tz,733lower_bound,734Some(upper_bound),735)736})737.collect::<PolarsResult<Vec<_>>>()?;738Ok(flatten_par(&vals))739})740} else {741if !run_parallel {742let vecs = group_by_values_iter_lookahead_collected(743period,744offset,745time,746closed_window,747tu,748tz,7490,750None,751)?;752return Ok(GroupsSlice::from(vecs));753}754755// Offset is 0 and window is closed on the left:756// it must be that the window starts at t and t is a member757// --t-----------758// [---]759POOL.install(|| {760let vals = thread_offsets761.par_iter()762.copied()763.map(|(base_offset, len)| {764let lower_bound = base_offset;765let upper_bound = base_offset + len;766group_by_values_iter_lookahead_collected(767period,768offset,769time,770closed_window,771tu,772tz,773lower_bound,774Some(upper_bound),775)776})777.collect::<PolarsResult<Vec<_>>>()?;778Ok(flatten_par(&vals))779})780}781}782783#[cfg(test)]784mod test {785use super::*;786787#[test]788fn test_prune_duplicates() {789// |--|------------|----|---------|790// 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10791let time = &[0, 1, 1, 2, 2, 2, 3, 4, 5, 6, 5];792let mut splits = vec![(0, 2), (2, 4), (6, 2), (8, 3)];793prune_splits_on_duplicates(time, &mut splits);794assert_eq!(splits, &[(0, 6), (6, 2), (8, 3)]);795}796}797798799