Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-time/src/windows/window.rs
6939 views
1
use arrow::legacy::time_zone::Tz;
2
use arrow::temporal_conversions::*;
3
use chrono::NaiveDateTime;
4
#[cfg(feature = "timezones")]
5
use chrono::TimeZone;
6
use now::DateTimeNow;
7
use polars_core::prelude::*;
8
9
use crate::prelude::*;
10
11
/// Ensure that earliest datapoint (`t`) is in, or in front of, first window.
12
///
13
/// For example, if we have:
14
///
15
/// - first datapoint is `2020-01-01 01:00`
16
/// - `every` is `'1d'`
17
/// - `period` is `'2d'`
18
/// - `offset` is `'6h'`
19
///
20
/// then truncating the earliest datapoint by `every` and adding `offset` results
21
/// in the window `[2020-01-01 06:00, 2020-01-03 06:00)`. To give the earliest datapoint
22
/// a chance of being included, we then shift the window back by `every` to
23
/// `[2019-12-31 06:00, 2020-01-02 06:00)`.
24
pub(crate) fn ensure_t_in_or_in_front_of_window(
25
mut every: Duration,
26
t: i64,
27
offset_fn: fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
28
period: Duration,
29
mut start: i64,
30
closed_window: ClosedWindow,
31
tz: Option<&Tz>,
32
) -> PolarsResult<Bounds> {
33
every.negative = !every.negative;
34
let mut stop = offset_fn(&period, start, tz)?;
35
while Bounds::new(start, stop).is_past(t, closed_window) {
36
start = offset_fn(&every, start, tz)?;
37
stop = offset_fn(&period, start, tz)?;
38
}
39
Ok(Bounds::new_checked(start, stop))
40
}
41
42
/// Represents a window in time
43
#[derive(Copy, Clone)]
44
pub struct Window {
45
// The ith window start is expressed via this equation:
46
// window_start_i = zero + every * i
47
// window_stop_i = zero + every * i + period
48
every: Duration,
49
period: Duration,
50
pub offset: Duration,
51
}
52
53
impl Window {
54
pub fn new(every: Duration, period: Duration, offset: Duration) -> Self {
55
debug_assert!(!every.negative);
56
Self {
57
every,
58
period,
59
offset,
60
}
61
}
62
63
/// Truncate the given ns timestamp by the window boundary.
64
pub fn truncate_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
65
self.every.truncate_ns(t, tz)
66
}
67
68
/// Truncate the given us timestamp by the window boundary.
69
pub fn truncate_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
70
self.every.truncate_us(t, tz)
71
}
72
73
/// Truncate the given ms timestamp by the window boundary.
74
pub fn truncate_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
75
self.every.truncate_ms(t, tz)
76
}
77
78
/// Round the given ns timestamp by the window boundary.
79
pub fn round_ns(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
80
let t = t + self.every.duration_ns() / 2_i64;
81
self.truncate_ns(t, tz)
82
}
83
84
/// Round the given us timestamp by the window boundary.
85
pub fn round_us(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
86
let t = t + self.every.duration_ns()
87
/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);
88
self.truncate_us(t, tz)
89
}
90
91
/// Round the given ms timestamp by the window boundary.
92
pub fn round_ms(&self, t: i64, tz: Option<&Tz>) -> PolarsResult<i64> {
93
let t = t + self.every.duration_ns()
94
/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);
95
self.truncate_ms(t, tz)
96
}
97
98
/// returns the bounds for the earliest window bounds
99
/// that contains the given time t. For underlapping windows that
100
/// do not contain time t, the window directly after time t will be returned.
101
pub fn get_earliest_bounds_ns(
102
&self,
103
t: i64,
104
closed_window: ClosedWindow,
105
tz: Option<&Tz>,
106
) -> PolarsResult<Bounds> {
107
let start = self.truncate_ns(t, tz)?;
108
let start = self.offset.add_ns(start, tz)?;
109
ensure_t_in_or_in_front_of_window(
110
self.every,
111
t,
112
Duration::add_ns,
113
self.period,
114
start,
115
closed_window,
116
tz,
117
)
118
}
119
120
pub fn get_earliest_bounds_us(
121
&self,
122
t: i64,
123
closed_window: ClosedWindow,
124
tz: Option<&Tz>,
125
) -> PolarsResult<Bounds> {
126
let start = self.truncate_us(t, tz)?;
127
let start = self.offset.add_us(start, tz)?;
128
ensure_t_in_or_in_front_of_window(
129
self.every,
130
t,
131
Duration::add_us,
132
self.period,
133
start,
134
closed_window,
135
tz,
136
)
137
}
138
139
pub fn get_earliest_bounds_ms(
140
&self,
141
t: i64,
142
closed_window: ClosedWindow,
143
tz: Option<&Tz>,
144
) -> PolarsResult<Bounds> {
145
let start = self.truncate_ms(t, tz)?;
146
let start = self.offset.add_ms(start, tz)?;
147
ensure_t_in_or_in_front_of_window(
148
self.every,
149
t,
150
Duration::add_ms,
151
self.period,
152
start,
153
closed_window,
154
tz,
155
)
156
}
157
158
pub(crate) fn estimate_overlapping_bounds_ns(&self, boundary: Bounds) -> usize {
159
(boundary.duration() / self.every.duration_ns()
160
+ self.period.duration_ns() / self.every.duration_ns()) as usize
161
}
162
163
pub(crate) fn estimate_overlapping_bounds_us(&self, boundary: Bounds) -> usize {
164
(boundary.duration() / self.every.duration_us()
165
+ self.period.duration_us() / self.every.duration_us()) as usize
166
}
167
168
pub(crate) fn estimate_overlapping_bounds_ms(&self, boundary: Bounds) -> usize {
169
(boundary.duration() / self.every.duration_ms()
170
+ self.period.duration_ms() / self.every.duration_ms()) as usize
171
}
172
173
pub fn get_overlapping_bounds_iter<'a>(
174
&'a self,
175
boundary: Bounds,
176
closed_window: ClosedWindow,
177
tu: TimeUnit,
178
tz: Option<&'a Tz>,
179
start_by: StartBy,
180
) -> PolarsResult<BoundsIter<'a>> {
181
BoundsIter::new(*self, closed_window, boundary, tu, tz, start_by)
182
}
183
}
184
185
pub struct BoundsIter<'a> {
186
window: Window,
187
// wrapping boundary
188
boundary: Bounds,
189
// boundary per window iterator
190
bi: Bounds,
191
tu: TimeUnit,
192
tz: Option<&'a Tz>,
193
}
194
impl<'a> BoundsIter<'a> {
195
fn new(
196
window: Window,
197
closed_window: ClosedWindow,
198
boundary: Bounds,
199
tu: TimeUnit,
200
tz: Option<&'a Tz>,
201
start_by: StartBy,
202
) -> PolarsResult<Self> {
203
let bi = match start_by {
204
StartBy::DataPoint => {
205
let mut boundary = boundary;
206
let offset_fn = match tu {
207
TimeUnit::Nanoseconds => Duration::add_ns,
208
TimeUnit::Microseconds => Duration::add_us,
209
TimeUnit::Milliseconds => Duration::add_ms,
210
};
211
boundary.stop = offset_fn(&window.period, boundary.start, tz)?;
212
boundary
213
},
214
StartBy::WindowBound => match tu {
215
TimeUnit::Nanoseconds => {
216
window.get_earliest_bounds_ns(boundary.start, closed_window, tz)?
217
},
218
TimeUnit::Microseconds => {
219
window.get_earliest_bounds_us(boundary.start, closed_window, tz)?
220
},
221
TimeUnit::Milliseconds => {
222
window.get_earliest_bounds_ms(boundary.start, closed_window, tz)?
223
},
224
},
225
_ => {
226
{
227
#[allow(clippy::type_complexity)]
228
let (from, to, offset_fn): (
229
fn(i64) -> NaiveDateTime,
230
fn(NaiveDateTime) -> i64,
231
fn(&Duration, i64, Option<&Tz>) -> PolarsResult<i64>,
232
) = match tu {
233
TimeUnit::Nanoseconds => (
234
timestamp_ns_to_datetime,
235
datetime_to_timestamp_ns,
236
Duration::add_ns,
237
),
238
TimeUnit::Microseconds => (
239
timestamp_us_to_datetime,
240
datetime_to_timestamp_us,
241
Duration::add_us,
242
),
243
TimeUnit::Milliseconds => (
244
timestamp_ms_to_datetime,
245
datetime_to_timestamp_ms,
246
Duration::add_ms,
247
),
248
};
249
// find beginning of the week.
250
let dt = from(boundary.start);
251
match tz {
252
#[cfg(feature = "timezones")]
253
Some(tz) => {
254
let dt = tz.from_utc_datetime(&dt);
255
let dt = dt.beginning_of_week();
256
let dt = dt.naive_utc();
257
let start = to(dt);
258
// adjust start of the week based on given day of the week
259
let start = offset_fn(
260
&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
261
start,
262
Some(tz),
263
)?;
264
// apply the 'offset'
265
let start = offset_fn(&window.offset, start, Some(tz))?;
266
// make sure the first datapoint has a chance to be included
267
// and compute the end of the window defined by the 'period'
268
ensure_t_in_or_in_front_of_window(
269
window.every,
270
boundary.start,
271
offset_fn,
272
window.period,
273
start,
274
closed_window,
275
Some(tz),
276
)?
277
},
278
_ => {
279
let tz = chrono::Utc;
280
let dt = dt.and_local_timezone(tz).unwrap();
281
let dt = dt.beginning_of_week();
282
let dt = dt.naive_utc();
283
let start = to(dt);
284
// adjust start of the week based on given day of the week
285
let start = offset_fn(
286
&Duration::parse(&format!("{}d", start_by.weekday().unwrap())),
287
start,
288
None,
289
)
290
.unwrap();
291
// apply the 'offset'
292
let start = offset_fn(&window.offset, start, None).unwrap();
293
// make sure the first datapoint has a chance to be included
294
// and compute the end of the window defined by the 'period'
295
ensure_t_in_or_in_front_of_window(
296
window.every,
297
boundary.start,
298
offset_fn,
299
window.period,
300
start,
301
closed_window,
302
None,
303
)?
304
},
305
}
306
}
307
},
308
};
309
Ok(Self {
310
window,
311
boundary,
312
bi,
313
tu,
314
tz,
315
})
316
}
317
}
318
319
impl Iterator for BoundsIter<'_> {
320
type Item = Bounds;
321
322
fn next(&mut self) -> Option<Self::Item> {
323
if self.bi.start < self.boundary.stop {
324
let out = self.bi;
325
match self.tu {
326
// TODO: find some way to propagate error instead of unwrapping?
327
// Issue is that `next` needs to return `Option`.
328
TimeUnit::Nanoseconds => {
329
self.bi.start = self.window.every.add_ns(self.bi.start, self.tz).unwrap();
330
self.bi.stop = self.window.period.add_ns(self.bi.start, self.tz).unwrap();
331
},
332
TimeUnit::Microseconds => {
333
self.bi.start = self.window.every.add_us(self.bi.start, self.tz).unwrap();
334
self.bi.stop = self.window.period.add_us(self.bi.start, self.tz).unwrap();
335
},
336
TimeUnit::Milliseconds => {
337
self.bi.start = self.window.every.add_ms(self.bi.start, self.tz).unwrap();
338
self.bi.stop = self.window.period.add_ms(self.bi.start, self.tz).unwrap();
339
},
340
}
341
Some(out)
342
} else {
343
None
344
}
345
}
346
}
347
348