Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-time/src/windows/window.rs
8424 views
1
use arrow::legacy::time_zone::Tz;
2
use arrow::temporal_conversions::*;
3
use chrono::NaiveDateTime;
4
#[cfg(feature = "timezones")]
5
use chrono::TimeZone;
6
use now::DateTimeNow;
7
use polars_core::prelude::*;
8
9
use crate::prelude::*;
10
11
/// Ensure that earliest datapoint (`t`) is in, or in front of, first window.
12
///
13
/// For example, if we have:
14
///
15
/// - first datapoint is `2020-01-01 01:00`
16
/// - `every` is `'1d'`
17
/// - `period` is `'2d'`
18
/// - `offset` is `'6h'`
19
///
20
/// then truncating the earliest datapoint by `every` and adding `offset` results
21
/// in the window `[2020-01-01 06:00, 2020-01-03 06:00)`. To give the earliest datapoint
22
/// a chance of being included, we then shift the window back by `every` to
23
/// `[2019-12-31 06:00, 2020-01-02 06:00)`.
24
#[allow(clippy::too_many_arguments)]
25
pub(crate) fn ensure_t_in_or_in_front_of_window(
26
mut every: Duration,
27
t: i64,
28
offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
29
nte_duration_fn: fn(&Duration) -> i64,
30
period: Duration,
31
mut start: i64,
32
closed_window: ClosedWindow,
33
tz: Option<&Tz>,
34
) -> PolarsResult<Bounds> {
35
every.negative = !every.negative;
36
let mut stop = offset_fn(&period, start, tz)?;
37
38
while Bounds::new(start, stop).is_past(t, closed_window) {
39
let mut gap = start - t;
40
if matches!(closed_window, ClosedWindow::Right | ClosedWindow::None) {
41
gap += 1;
42
}
43
debug_assert!(gap >= 1);
44
45
// Ceil division
46
let stride = (gap + nte_duration_fn(&every) - 1) / nte_duration_fn(&every);
47
debug_assert!(stride >= 1);
48
let stride = std::cmp::max(stride, 1);
49
50
start = offset_fn(&(every * stride), start, tz)?;
51
stop = offset_fn(&period, start, tz)?;
52
}
53
Ok(Bounds::new_checked(start, stop))
54
}
55
56
/// Represents a window in time
57
#[derive(Copy, Clone)]
58
pub struct Window {
59
// The ith window start is expressed via this equation:
60
// window_start_i = zero + every * i
61
// window_stop_i = zero + every * i + period
62
pub(crate) every: Duration,
63
pub(crate) period: Duration,
64
pub offset: Duration,
65
}
66
67
impl Window {
68
pub fn new(every: Duration, period: Duration, offset: Duration) -> Self {
69
debug_assert!(!every.negative);
70
Self {
71
every,
72
period,
73
offset,
74
}
75
}
76
77
/// Truncate the given ns timestamp by the window boundary.
78
pub fn truncate_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
79
self.every.truncate_ns(t, tz)
80
}
81
82
/// Truncate the given us timestamp by the window boundary.
83
pub fn truncate_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
84
self.every.truncate_us(t, tz)
85
}
86
87
/// Truncate the given ms timestamp by the window boundary.
88
pub fn truncate_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
89
self.every.truncate_ms(t, tz)
90
}
91
92
/// Round the given ns timestamp by the window boundary.
93
pub fn round_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
94
let t = t + self.every.duration_ns() / 2_i64;
95
self.truncate_ns(t, tz)
96
}
97
98
/// Round the given us timestamp by the window boundary.
99
pub fn round_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
100
let t = t + self.every.duration_ns()
101
/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);
102
self.truncate_us(t, tz)
103
}
104
105
/// Round the given ms timestamp by the window boundary.
106
pub fn round_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
107
let t = t + self.every.duration_ns()
108
/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);
109
self.truncate_ms(t, tz)
110
}
111
112
/// returns the bounds for the earliest window bounds
113
/// that contains the given time t. For underlapping windows that
114
/// do not contain time t, the window directly after time t will be returned.
115
pub fn get_earliest_bounds_ns(
116
&self,
117
t: i64,
118
closed_window: ClosedWindow,
119
tz: Option<&Tz>,
120
) -> PolarsResult<Bounds> {
121
let start = self.truncate_ns(t, tz)?;
122
let start = self.offset.add_ns(start, tz)?;
123
ensure_t_in_or_in_front_of_window(
124
self.every,
125
t,
126
Duration::add_ns,
127
Duration::nte_duration_ns,
128
self.period,
129
start,
130
closed_window,
131
tz,
132
)
133
}
134
135
pub fn get_earliest_bounds_us(
136
&self,
137
t: i64,
138
closed_window: ClosedWindow,
139
tz: Option<&Tz>,
140
) -> PolarsResult<Bounds> {
141
let start = self.truncate_us(t, tz)?;
142
let start = self.offset.add_us(start, tz)?;
143
ensure_t_in_or_in_front_of_window(
144
self.every,
145
t,
146
Duration::add_us,
147
Duration::nte_duration_us,
148
self.period,
149
start,
150
closed_window,
151
tz,
152
)
153
}
154
155
pub fn get_earliest_bounds_ms(
156
&self,
157
t: i64,
158
closed_window: ClosedWindow,
159
tz: Option<&Tz>,
160
) -> PolarsResult<Bounds> {
161
let start = self.truncate_ms(t, tz)?;
162
let start = self.offset.add_ms(start, tz)?;
163
ensure_t_in_or_in_front_of_window(
164
self.every,
165
t,
166
Duration::add_ms,
167
Duration::nte_duration_ms,
168
self.period,
169
start,
170
closed_window,
171
tz,
172
)
173
}
174
175
pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {
176
(boundary.duration() / self.every.duration_ns()
177
+ self.period.duration_ns() / self.every.duration_ns()) as usize
178
}
179
180
pub(crate) fn estimate_overlapping_bounds_us(&self, boundary: Bounds) -> usize {
181
(boundary.duration() / self.every.duration_us()
182
+ self.period.duration_us() / self.every.duration_us()) as usize
183
}
184
185
pub(crate) fn estimate_overlapping_bounds_ms(&self, boundary: Bounds) -> usize {
186
(boundary.duration() / self.every.duration_ms()
187
+ self.period.duration_ms() / self.every.duration_ms()) as usize
188
}
189
190
pub fn get_overlapping_bounds_iter<'a>(
191
&'a self,
192
boundary: Bounds,
193
closed_window: ClosedWindow,
194
tu: TimeUnit,
195
tz: Option<&'a Tz>,
196
start_by: StartBy,
197
) -> PolarsResult<BoundsIter<'a>> {
198
BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)
199
}
200
}
201
202
pub struct BoundsIter<'a> {
203
window: Window,
204
// wrapping boundary
205
boundary: Bounds,
206
// boundary per window iterator
207
bi: Bounds,
208
tu: TimeUnit,
209
tz: Option<&'a Tz>,
210
}
211
impl<'a> BoundsIter<'a> {
212
fn new(
213
window: Window,
214
closed_window: ClosedWindow,
215
boundary: Bounds,
216
tu: TimeUnit,
217
tz: Option<&'a Tz>,
218
start_by: StartBy,
219
) -> PolarsResult<Self> {
220
let bi = match start_by {
221
StartBy::DataPoint => {
222
let mut boundary = boundary;
223
let offset_fn = match tu {
224
TimeUnit::Nanoseconds => Duration::add_ns,
225
TimeUnit::Microseconds => Duration::add_us,
226
TimeUnit::Milliseconds => Duration::add_ms,
227
};
228
boundary.stop = offset_fn(&window.period, boundary.start, tz)?;
229
boundary
230
},
231
StartBy::WindowBound => match tu {
232
TimeUnit::Nanoseconds => {
233
window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?
234
},
235
TimeUnit::Microseconds => {
236
window.get_earliest_bounds_us(boundary.start, closed_window, tz)?
237
},
238
TimeUnit::Milliseconds => {
239
window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?
240
},
241
},
242
_ => {
243
{
244
#[allow(clippy::type_complexity)]
245
let (from, to, offset_fn, nte_duration_fn): (
246
fn(i64) -> NaiveDateTime,
247
fn(NaiveDateTime) -> i64,
248
fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
249
fn(&Duration) -> i64,
250
) = match tu {
251
TimeUnit::Nanoseconds => (
252
timestamp_ns_to_datetime,
253
datetime_to_timestamp_ns,
254
Duration::add_ns,
255
Duration::nte_duration_ns,
256
),
257
TimeUnit::Microseconds => (
258
timestamp_us_to_datetime,
259
datetime_to_timestamp_us,
260
Duration::add_us,
261
Duration::nte_duration_us,
262
),
263
TimeUnit::Milliseconds => (
264
timestamp_ms_to_datetime,
265
datetime_to_timestamp_ms,
266
Duration::add_ms,
267
Duration::nte_duration_ms,
268
),
269
};
270
// find beginning of the week.
271
let dt = from(boundary.start);
272
match tz {
273
#[cfg(feature = "timezones")]
274
Some(tz) => {
275
let dt = tz.from_utc_datetime(&dt);
276
let dt = dt.beginning_of_week();
277
let dt = dt.naive_utc();
278
let start = to(dt);
279
// adjust start of the week based on given day of the week
280
let start = offset_fn(
281
&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
282
start,
283
Some(tz),
284
)?;
285
// apply the 'offset'
286
let start = offset_fn(&window.offset, start, Some(tz))?;
287
// make sure the first datapoint has a chance to be included
288
// and compute the end of the window defined by the 'period'
289
ensure_t_in_or_in_front_of_window(
290
window.every,
291
boundary.start,
292
offset_fn,
293
nte_duration_fn,
294
window.period,
295
start,
296
closed_window,
297
Some(tz),
298
)?
299
},
300
_ => {
301
let tz = chrono::Utc;
302
let dt = dt.and_local_timezone(tz).unwrap();
303
let dt = dt.beginning_of_week();
304
let dt = dt.naive_utc();
305
let start = to(dt);
306
// adjust start of the week based on given day of the week
307
let start = offset_fn(
308
&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
309
start,
310
None,
311
)
312
.unwrap();
313
// apply the 'offset'
314
let start = offset_fn(&window.offset, start, None).unwrap();
315
// make sure the first datapoint has a chance to be included
316
// and compute the end of the window defined by the 'period'
317
ensure_t_in_or_in_front_of_window(
318
window.every,
319
boundary.start,
320
offset_fn,
321
nte_duration_fn,
322
window.period,
323
start,
324
closed_window,
325
None,
326
)?
327
},
328
}
329
}
330
},
331
};
332
Ok(Self {
333
window,
334
boundary,
335
bi,
336
tu,
337
tz,
338
})
339
}
340
}
341
342
impl Iterator for BoundsIter<'_> {
343
type Item = Bounds;
344
345
fn next(&mut self) -> Option<Self::Item> {
346
if self.bi.start < self.boundary.stop {
347
let out = self.bi;
348
match self.tu {
349
// TODO: find some way to propagate error instead of unwrapping?
350
// Issue is that `next` needs to return `Option`.
351
TimeUnit::Nanoseconds => {
352
self.bi.start = self.window.every.add_ns(self.bi.start, self.tz).unwrap();
353
self.bi.stop = self.window.period.add_ns(self.bi.start, self.tz).unwrap();
354
},
355
TimeUnit::Microseconds => {
356
self.bi.start = self.window.every.add_us(self.bi.start, self.tz).unwrap();
357
self.bi.stop = self.window.period.add_us(self.bi.start, self.tz).unwrap();
358
},
359
TimeUnit::Milliseconds => {
360
self.bi.start = self.window.every.add_ms(self.bi.start, self.tz).unwrap();
361
self.bi.stop = self.window.period.add_ms(self.bi.start, self.tz).unwrap();
362
},
363
}
364
Some(out)
365
} else {
366
None
367
}
368
}
369
370
fn nth(&mut self, n: usize) -> Option<Self::Item> {
371
let n: i64 = n.try_into().unwrap();
372
if self.bi.start < self.boundary.stop {
373
match self.tu {
374
TimeUnit::Nanoseconds => {
375
self.bi.start = (self.window.every * n)
376
.add_ns(self.bi.start, self.tz)
377
.unwrap();
378
self.bi.stop = (self.window.period).add_ns(self.bi.start, self.tz).unwrap();
379
},
380
TimeUnit::Microseconds => {
381
self.bi.start = (self.window.every * n)
382
.add_us(self.bi.start, self.tz)
383
.unwrap();
384
self.bi.stop = (self.window.period).add_us(self.bi.start, self.tz).unwrap();
385
},
386
TimeUnit::Milliseconds => {
387
self.bi.start = (self.window.every * n)
388
.add_ms(self.bi.start, self.tz)
389
.unwrap();
390
self.bi.stop = (self.window.period).add_ms(self.bi.start, self.tz).unwrap();
391
},
392
}
393
self.next()
394
} else {
395
None
396
}
397
}
398
}
399
400
impl<'a> BoundsIter<'a> {
401
/// Number of iterations to advance, such that the bounds are on target; or, in
402
/// the case of non-constant duration, close to target.
403
/// Follows the `nth()` convention on Iterator indexing, i.e., a return value of 0
404
/// implies advancing 1 iteration.
405
pub fn get_stride(&self, target: i64) -> usize {
406
let mut stride = 0;
407
if self.bi.start < self.boundary.stop && target > self.bi.start {
408
let gap = target - self.bi.start;
409
match self.tu {
410
TimeUnit::Nanoseconds => {
411
if gap
412
> self.window.every.nte_duration_ns() + self.window.period.nte_duration_ns()
413
{
414
stride = ((gap - self.window.period.nte_duration_ns()) as usize)
415
/ (self.window.every.nte_duration_ns() as usize);
416
}
417
},
418
TimeUnit::Microseconds => {
419
if gap
420
> self.window.every.nte_duration_us() + self.window.period.nte_duration_us()
421
{
422
stride = ((gap - self.window.period.nte_duration_us()) as usize)
423
/ (self.window.every.nte_duration_us() as usize);
424
}
425
},
426
TimeUnit::Milliseconds => {
427
if gap
428
> self.window.every.nte_duration_ms() + self.window.period.nte_duration_ms()
429
{
430
stride = ((gap - self.window.period.nte_duration_ms()) as usize)
431
/ (self.window.every.nte_duration_ms() as usize);
432
}
433
},
434
}
435
}
436
stride
437
}
438
}
439
440