// Path: blob/main/crates/polars-time/src/windows/window.rs
// 6939 views
use arrow::legacy::time_zone::Tz;1use arrow::temporal_conversions::*;2use chrono::NaiveDateTime;3#[cfg(feature = "timezones")]4use chrono::TimeZone;5use now::DateTimeNow;6use polars_core::prelude::*;78use crate::prelude::*;910/// Ensure that earliest datapoint (`t`) is in, or in front of, first window.11///12/// For example, if we have:13///14/// - first datapoint is `2020-01-01 01:00`15/// - `every` is `'1d'`16/// - `period` is `'2d'`17/// - `offset` is `'6h'`18///19/// then truncating the earliest datapoint by `every` and adding `offset` results20/// in the window `[2020-01-01 06:00, 2020-01-03 06:00)`. To give the earliest datapoint21/// a chance of being included, we then shift the window back by `every` to22/// `[2019-12-31 06:00, 2020-01-02 06:00)`.23pub(crate) fn ensure_t_in_or_in_front_of_window(24mut every: Duration,25t: i64,26offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,27period: Duration,28mut start: i64,29closed_window: ClosedWindow,30tz: Option<&Tz>,31) -> PolarsResult<Bounds> {32every.negative = !every.negative;33let mut stop = offset_fn(&period, start, tz)?;34while Bounds::new(start, stop).is_past(t, closed_window) {35start = offset_fn(&every, start, tz)?;36stop = offset_fn(&period, start, tz)?;37}38Ok(Bounds::new_checked(start, stop))39}4041/// Represents a window in time42#[derive(Copy, Clone)]43pub struct Window {44// The ith window start is expressed via this equation:45// window_start_i = zero + every * i46// window_stop_i = zero + every * i + period47every: Duration,48period: Duration,49pub offset: Duration,50}5152impl Window {53pub fn new(every: Duration, period: Duration, offset: Duration) -> Self {54debug_assert!(!every.negative);55Self {56every,57period,58offset,59}60}6162/// Truncate the given ns timestamp by the window boundary.63pub fn truncate_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {64self.every.truncate_ns(t, tz)65}6667/// Truncate the given us timestamp by the window boundary.68pub fn 
truncate_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {69self.every.truncate_us(t, tz)70}7172/// Truncate the given ms timestamp by the window boundary.73pub fn truncate_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {74self.every.truncate_ms(t, tz)75}7677/// Round the given ns timestamp by the window boundary.78pub fn round_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {79let t = t + self.every.duration_ns() / 2_i64;80self.truncate_ns(t, tz)81}8283/// Round the given us timestamp by the window boundary.84pub fn round_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {85let t = t + self.every.duration_ns()86/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);87self.truncate_us(t, tz)88}8990/// Round the given ms timestamp by the window boundary.91pub fn round_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {92let t = t + self.every.duration_ns()93/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);94self.truncate_ms(t, tz)95}9697/// returns the bounds for the earliest window bounds98/// that contains the given time t. 
For underlapping windows that99/// do not contain time t, the window directly after time t will be returned.100pub fn get_earliest_bounds_ns(101&self,102t: i64,103closed_window: ClosedWindow,104tz: Option<&Tz>,105) -> PolarsResult<Bounds> {106let start = self.truncate_ns(t, tz)?;107let start = self.offset.add_ns(start, tz)?;108ensure_t_in_or_in_front_of_window(109self.every,110t,111Duration::add_ns,112self.period,113start,114closed_window,115tz,116)117}118119pub fn get_earliest_bounds_us(120&self,121t: i64,122closed_window: ClosedWindow,123tz: Option<&Tz>,124) -> PolarsResult<Bounds> {125let start = self.truncate_us(t, tz)?;126let start = self.offset.add_us(start, tz)?;127ensure_t_in_or_in_front_of_window(128self.every,129t,130Duration::add_us,131self.period,132start,133closed_window,134tz,135)136}137138pub fn get_earliest_bounds_ms(139&self,140t: i64,141closed_window: ClosedWindow,142tz: Option<&Tz>,143) -> PolarsResult<Bounds> {144let start = self.truncate_ms(t, tz)?;145let start = self.offset.add_ms(start, tz)?;146ensure_t_in_or_in_front_of_window(147self.every,148t,149Duration::add_ms,150self.period,151start,152closed_window,153tz,154)155}156157pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {158(boundary.duration() / self.every.duration_ns()159+ self.period.duration_ns() / self.every.duration_ns()) as usize160}161162pub(crate) fn estimate_overlapping_bounds_us(&self, boundary: Bounds) -> usize {163(boundary.duration() / self.every.duration_us()164+ self.period.duration_us() / self.every.duration_us()) as usize165}166167pub(crate) fn estimate_overlapping_bounds_ms(&self, boundary: Bounds) -> usize {168(boundary.duration() / self.every.duration_ms()169+ self.period.duration_ms() / self.every.duration_ms()) as usize170}171172pub fn get_overlapping_bounds_iter<'a>(173&'a self,174boundary: Bounds,175closed_window: ClosedWindow,176tu: TimeUnit,177tz: Option<&'a Tz>,178start_by: StartBy,179) -> PolarsResult<BoundsIter<'a>> 
{180BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)181}182}183184pub struct BoundsIter<'a> {185window: Window,186// wrapping boundary187boundary: Bounds,188// boundary per window iterator189bi: Bounds,190tu: TimeUnit,191tz: Option<&'a Tz>,192}193impl<'a> BoundsIter<'a> {194fn new(195window: Window,196closed_window: ClosedWindow,197boundary: Bounds,198tu: TimeUnit,199tz: Option<&'a Tz>,200start_by: StartBy,201) -> PolarsResult<Self> {202let bi = match start_by {203StartBy::DataPoint => {204let mut boundary = boundary;205let offset_fn = match tu {206TimeUnit::Nanoseconds => Duration::add_ns,207TimeUnit::Microseconds => Duration::add_us,208TimeUnit::Milliseconds => Duration::add_ms,209};210boundary.stop = offset_fn(&window.period, boundary.start, tz)?;211boundary212},213StartBy::WindowBound => match tu {214TimeUnit::Nanoseconds => {215window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?216},217TimeUnit::Microseconds => {218window.get_earliest_bounds_us(boundary.start, closed_window, tz)?219},220TimeUnit::Milliseconds => {221window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?222},223},224_ => {225{226#[allow(clippy::type_complexity)]227let (from, to, offset_fn): (228fn(i64) -> NaiveDateTime,229fn(NaiveDateTime) -> i64,230fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,231) = match tu {232TimeUnit::Nanoseconds => (233timestamp_ns_to_datetime,234datetime_to_timestamp_ns,235Duration::add_ns,236),237TimeUnit::Microseconds => (238timestamp_us_to_datetime,239datetime_to_timestamp_us,240Duration::add_us,241),242TimeUnit::Milliseconds => (243timestamp_ms_to_datetime,244datetime_to_timestamp_ms,245Duration::add_ms,246),247};248// find beginning of the week.249let dt = from(boundary.start);250match tz {251#[cfg(feature = "timezones")]252Some(tz) => {253let dt = tz.from_utc_datetime(&dt);254let dt = dt.beginning_of_week();255let dt = dt.naive_utc();256let start = to(dt);257// adjust start of the week based on given day of the 
week258let start = offset_fn(259&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),260start,261Some(tz),262)?;263// apply the 'offset'264let start = offset_fn(&window.offset, start, Some(tz))?;265// make sure the first datapoint has a chance to be included266// and compute the end of the window defined by the 'period'267ensure_t_in_or_in_front_of_window(268window.every,269boundary.start,270offset_fn,271window.period,272start,273closed_window,274Some(tz),275)?276},277_ => {278let tz = chrono::Utc;279let dt = dt.and_local_timezone(tz).unwrap();280let dt = dt.beginning_of_week();281let dt = dt.naive_utc();282let start = to(dt);283// adjust start of the week based on given day of the week284let start = offset_fn(285&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),286start,287None,288)289.unwrap();290// apply the 'offset'291let start = offset_fn(&window.offset, start, None).unwrap();292// make sure the first datapoint has a chance to be included293// and compute the end of the window defined by the 'period'294ensure_t_in_or_in_front_of_window(295window.every,296boundary.start,297offset_fn,298window.period,299start,300closed_window,301None,302)?303},304}305}306},307};308Ok(Self {309window,310boundary,311bi,312tu,313tz,314})315}316}317318impl Iterator for BoundsIter<'_> {319type Item = Bounds;320321fn next(&mut self) -> Option<Self::Item> {322if self.bi.start < self.boundary.stop {323let out = self.bi;324match self.tu {325// TODO: find some way to propagate error instead of unwrapping?326// Issue is that `next` needs to return `Option`.327TimeUnit::Nanoseconds => {328self.bi.start = self.window.every.add_ns(self.bi.start, self.tz).unwrap();329self.bi.stop = self.window.period.add_ns(self.bi.start, self.tz).unwrap();330},331TimeUnit::Microseconds => {332self.bi.start = self.window.every.add_us(self.bi.start, self.tz).unwrap();333self.bi.stop = self.window.period.add_us(self.bi.start, self.tz).unwrap();334},335TimeUnit::Milliseconds => 
{336self.bi.start = self.window.every.add_ms(self.bi.start, self.tz).unwrap();337self.bi.stop = self.window.period.add_ms(self.bi.start, self.tz).unwrap();338},339}340Some(out)341} else {342None343}344}345}346347348