// Path: blob/main/crates/polars-time/src/windows/window.rs
// 8424 views
use arrow::legacy::time_zone::Tz;1use arrow::temporal_conversions::*;2use chrono::NaiveDateTime;3#[cfg(feature = "timezones")]4use chrono::TimeZone;5use now::DateTimeNow;6use polars_core::prelude::*;78use crate::prelude::*;910/// Ensure that earliest datapoint (`t`) is in, or in front of, first window.11///12/// For example, if we have:13///14/// - first datapoint is `2020-01-01 01:00`15/// - `every` is `'1d'`16/// - `period` is `'2d'`17/// - `offset` is `'6h'`18///19/// then truncating the earliest datapoint by `every` and adding `offset` results20/// in the window `[2020-01-01 06:00, 2020-01-03 06:00)`. To give the earliest datapoint21/// a chance of being included, we then shift the window back by `every` to22/// `[2019-12-31 06:00, 2020-01-02 06:00)`.23#[allow(clippy::too_many_arguments)]24pub(crate) fn ensure_t_in_or_in_front_of_window(25mut every: Duration,26t: i64,27offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,28nte_duration_fn: fn(&Duration) -> i64,29period: Duration,30mut start: i64,31closed_window: ClosedWindow,32tz: Option<&Tz>,33) -> PolarsResult<Bounds> {34every.negative = !every.negative;35let mut stop = offset_fn(&period, start, tz)?;3637while Bounds::new(start, stop).is_past(t, closed_window) {38let mut gap = start - t;39if matches!(closed_window, ClosedWindow::Right | ClosedWindow::None) {40gap += 1;41}42debug_assert!(gap >= 1);4344// Ceil division45let stride = (gap + nte_duration_fn(&every) - 1) / nte_duration_fn(&every);46debug_assert!(stride >= 1);47let stride = std::cmp::max(stride, 1);4849start = offset_fn(&(every * stride), start, tz)?;50stop = offset_fn(&period, start, tz)?;51}52Ok(Bounds::new_checked(start, stop))53}5455/// Represents a window in time56#[derive(Copy, Clone)]57pub struct Window {58// The ith window start is expressed via this equation:59// window_start_i = zero + every * i60// window_stop_i = zero + every * i + period61pub(crate) every: Duration,62pub(crate) period: Duration,63pub offset: 
Duration,64}6566impl Window {67pub fn new(every: Duration, period: Duration, offset: Duration) -> Self {68debug_assert!(!every.negative);69Self {70every,71period,72offset,73}74}7576/// Truncate the given ns timestamp by the window boundary.77pub fn truncate_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {78self.every.truncate_ns(t, tz)79}8081/// Truncate the given us timestamp by the window boundary.82pub fn truncate_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {83self.every.truncate_us(t, tz)84}8586/// Truncate the given ms timestamp by the window boundary.87pub fn truncate_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {88self.every.truncate_ms(t, tz)89}9091/// Round the given ns timestamp by the window boundary.92pub fn round_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {93let t = t + self.every.duration_ns() / 2_i64;94self.truncate_ns(t, tz)95}9697/// Round the given us timestamp by the window boundary.98pub fn round_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {99let t = t + self.every.duration_ns()100/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);101self.truncate_us(t, tz)102}103104/// Round the given ms timestamp by the window boundary.105pub fn round_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {106let t = t + self.every.duration_ns()107/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);108self.truncate_ms(t, tz)109}110111/// returns the bounds for the earliest window bounds112/// that contains the given time t. 
For underlapping windows that113/// do not contain time t, the window directly after time t will be returned.114pub fn get_earliest_bounds_ns(115&self,116t: i64,117closed_window: ClosedWindow,118tz: Option<&Tz>,119) -> PolarsResult<Bounds> {120let start = self.truncate_ns(t, tz)?;121let start = self.offset.add_ns(start, tz)?;122ensure_t_in_or_in_front_of_window(123self.every,124t,125Duration::add_ns,126Duration::nte_duration_ns,127self.period,128start,129closed_window,130tz,131)132}133134pub fn get_earliest_bounds_us(135&self,136t: i64,137closed_window: ClosedWindow,138tz: Option<&Tz>,139) -> PolarsResult<Bounds> {140let start = self.truncate_us(t, tz)?;141let start = self.offset.add_us(start, tz)?;142ensure_t_in_or_in_front_of_window(143self.every,144t,145Duration::add_us,146Duration::nte_duration_us,147self.period,148start,149closed_window,150tz,151)152}153154pub fn get_earliest_bounds_ms(155&self,156t: i64,157closed_window: ClosedWindow,158tz: Option<&Tz>,159) -> PolarsResult<Bounds> {160let start = self.truncate_ms(t, tz)?;161let start = self.offset.add_ms(start, tz)?;162ensure_t_in_or_in_front_of_window(163self.every,164t,165Duration::add_ms,166Duration::nte_duration_ms,167self.period,168start,169closed_window,170tz,171)172}173174pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {175(boundary.duration() / self.every.duration_ns()176+ self.period.duration_ns() / self.every.duration_ns()) as usize177}178179pub(crate) fn estimate_overlapping_bounds_us(&self, boundary: Bounds) -> usize {180(boundary.duration() / self.every.duration_us()181+ self.period.duration_us() / self.every.duration_us()) as usize182}183184pub(crate) fn estimate_overlapping_bounds_ms(&self, boundary: Bounds) -> usize {185(boundary.duration() / self.every.duration_ms()186+ self.period.duration_ms() / self.every.duration_ms()) as usize187}188189pub fn get_overlapping_bounds_iter<'a>(190&'a self,191boundary: Bounds,192closed_window: ClosedWindow,193tu: 
TimeUnit,194tz: Option<&'a Tz>,195start_by: StartBy,196) -> PolarsResult<BoundsIter<'a>> {197BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)198}199}200201pub struct BoundsIter<'a> {202window: Window,203// wrapping boundary204boundary: Bounds,205// boundary per window iterator206bi: Bounds,207tu: TimeUnit,208tz: Option<&'a Tz>,209}210impl<'a> BoundsIter<'a> {211fn new(212window: Window,213closed_window: ClosedWindow,214boundary: Bounds,215tu: TimeUnit,216tz: Option<&'a Tz>,217start_by: StartBy,218) -> PolarsResult<Self> {219let bi = match start_by {220StartBy::DataPoint => {221let mut boundary = boundary;222let offset_fn = match tu {223TimeUnit::Nanoseconds => Duration::add_ns,224TimeUnit::Microseconds => Duration::add_us,225TimeUnit::Milliseconds => Duration::add_ms,226};227boundary.stop = offset_fn(&window.period, boundary.start, tz)?;228boundary229},230StartBy::WindowBound => match tu {231TimeUnit::Nanoseconds => {232window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?233},234TimeUnit::Microseconds => {235window.get_earliest_bounds_us(boundary.start, closed_window, tz)?236},237TimeUnit::Milliseconds => {238window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?239},240},241_ => {242{243#[allow(clippy::type_complexity)]244let (from, to, offset_fn, nte_duration_fn): (245fn(i64) -> NaiveDateTime,246fn(NaiveDateTime) -> i64,247fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,248fn(&Duration) -> i64,249) = match tu {250TimeUnit::Nanoseconds => (251timestamp_ns_to_datetime,252datetime_to_timestamp_ns,253Duration::add_ns,254Duration::nte_duration_ns,255),256TimeUnit::Microseconds => (257timestamp_us_to_datetime,258datetime_to_timestamp_us,259Duration::add_us,260Duration::nte_duration_us,261),262TimeUnit::Milliseconds => (263timestamp_ms_to_datetime,264datetime_to_timestamp_ms,265Duration::add_ms,266Duration::nte_duration_ms,267),268};269// find beginning of the week.270let dt = from(boundary.start);271match tz 
{272#[cfg(feature = "timezones")]273Some(tz) => {274let dt = tz.from_utc_datetime(&dt);275let dt = dt.beginning_of_week();276let dt = dt.naive_utc();277let start = to(dt);278// adjust start of the week based on given day of the week279let start = offset_fn(280&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),281start,282Some(tz),283)?;284// apply the 'offset'285let start = offset_fn(&window.offset, start, Some(tz))?;286// make sure the first datapoint has a chance to be included287// and compute the end of the window defined by the 'period'288ensure_t_in_or_in_front_of_window(289window.every,290boundary.start,291offset_fn,292nte_duration_fn,293window.period,294start,295closed_window,296Some(tz),297)?298},299_ => {300let tz = chrono::Utc;301let dt = dt.and_local_timezone(tz).unwrap();302let dt = dt.beginning_of_week();303let dt = dt.naive_utc();304let start = to(dt);305// adjust start of the week based on given day of the week306let start = offset_fn(307&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),308start,309None,310)311.unwrap();312// apply the 'offset'313let start = offset_fn(&window.offset, start, None).unwrap();314// make sure the first datapoint has a chance to be included315// and compute the end of the window defined by the 'period'316ensure_t_in_or_in_front_of_window(317window.every,318boundary.start,319offset_fn,320nte_duration_fn,321window.period,322start,323closed_window,324None,325)?326},327}328}329},330};331Ok(Self {332window,333boundary,334bi,335tu,336tz,337})338}339}340341impl Iterator for BoundsIter<'_> {342type Item = Bounds;343344fn next(&mut self) -> Option<Self::Item> {345if self.bi.start < self.boundary.stop {346let out = self.bi;347match self.tu {348// TODO: find some way to propagate error instead of unwrapping?349// Issue is that `next` needs to return `Option`.350TimeUnit::Nanoseconds => {351self.bi.start = self.window.every.add_ns(self.bi.start, self.tz).unwrap();352self.bi.stop = 
self.window.period.add_ns(self.bi.start, self.tz).unwrap();353},354TimeUnit::Microseconds => {355self.bi.start = self.window.every.add_us(self.bi.start, self.tz).unwrap();356self.bi.stop = self.window.period.add_us(self.bi.start, self.tz).unwrap();357},358TimeUnit::Milliseconds => {359self.bi.start = self.window.every.add_ms(self.bi.start, self.tz).unwrap();360self.bi.stop = self.window.period.add_ms(self.bi.start, self.tz).unwrap();361},362}363Some(out)364} else {365None366}367}368369fn nth(&mut self, n: usize) -> Option<Self::Item> {370let n: i64 = n.try_into().unwrap();371if self.bi.start < self.boundary.stop {372match self.tu {373TimeUnit::Nanoseconds => {374self.bi.start = (self.window.every * n)375.add_ns(self.bi.start, self.tz)376.unwrap();377self.bi.stop = (self.window.period).add_ns(self.bi.start, self.tz).unwrap();378},379TimeUnit::Microseconds => {380self.bi.start = (self.window.every * n)381.add_us(self.bi.start, self.tz)382.unwrap();383self.bi.stop = (self.window.period).add_us(self.bi.start, self.tz).unwrap();384},385TimeUnit::Milliseconds => {386self.bi.start = (self.window.every * n)387.add_ms(self.bi.start, self.tz)388.unwrap();389self.bi.stop = (self.window.period).add_ms(self.bi.start, self.tz).unwrap();390},391}392self.next()393} else {394None395}396}397}398399impl<'a> BoundsIter<'a> {400/// Number of iterations to advance, such that the bounds are on target; or, in401/// the case of non-constant duration, close to target.402/// Follows the `nth()` convention on Iterator indexing, i.e., a return value of 0403/// implies advancing 1 iteration.404pub fn get_stride(&self, target: i64) -> usize {405let mut stride = 0;406if self.bi.start < self.boundary.stop && target > self.bi.start {407let gap = target - self.bi.start;408match self.tu {409TimeUnit::Nanoseconds => {410if gap411> self.window.every.nte_duration_ns() + self.window.period.nte_duration_ns()412{413stride = ((gap - self.window.period.nte_duration_ns()) as usize)414/ 
(self.window.every.nte_duration_ns() as usize);415}416},417TimeUnit::Microseconds => {418if gap419> self.window.every.nte_duration_us() + self.window.period.nte_duration_us()420{421stride = ((gap - self.window.period.nte_duration_us()) as usize)422/ (self.window.every.nte_duration_us() as usize);423}424},425TimeUnit::Milliseconds => {426if gap427> self.window.every.nte_duration_ms() + self.window.period.nte_duration_ms()428{429stride = ((gap - self.window.period.nte_duration_ms()) as usize)430/ (self.window.every.nte_duration_ms() as usize);431}432},433}434}435stride436}437}438439440