Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-time/src/upsample.rs
6939 views
1
#[cfg(feature = "timezones")]
2
use polars_core::datatypes::time_zone::parse_time_zone;
3
use polars_core::prelude::*;
4
use polars_ops::prelude::*;
5
use polars_ops::series::SeriesMethods;
6
7
use crate::prelude::*;
8
9
pub trait PolarsUpsample {
10
/// Upsample a [`DataFrame`] at a regular frequency.
11
///
12
/// # Arguments
13
/// * `by` - First group by these columns and then upsample for every group
14
/// * `time_column` - Will be used to determine a date_range.
15
/// Note that this column has to be sorted for the output to make sense.
16
/// * `every` - interval will start 'every' duration
17
/// * `offset` - change the start of the date_range by this offset.
18
///
19
/// The `every` and `offset` arguments are created with
20
/// the following string language:
21
/// - 1ns (1 nanosecond)
22
/// - 1us (1 microsecond)
23
/// - 1ms (1 millisecond)
24
/// - 1s (1 second)
25
/// - 1m (1 minute)
26
/// - 1h (1 hour)
27
/// - 1d (1 calendar day)
28
/// - 1w (1 calendar week)
29
/// - 1mo (1 calendar month)
30
/// - 1q (1 calendar quarter)
31
/// - 1y (1 calendar year)
32
/// - 1i (1 index count)
33
///
34
/// Or combine them:
35
/// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
36
///
37
/// By "calendar day", we mean the corresponding time on the next
38
/// day (which may not be 24 hours, depending on daylight savings).
39
/// Similarly for "calendar week", "calendar month", "calendar quarter",
40
/// and "calendar year".
41
fn upsample<I: IntoVec<PlSmallStr>>(
42
&self,
43
by: I,
44
time_column: &str,
45
every: Duration,
46
) -> PolarsResult<DataFrame>;
47
48
/// Upsample a [`DataFrame`] at a regular frequency.
49
///
50
/// Similar to [`upsample`][PolarsUpsample::upsample], but order of the
51
/// DataFrame is maintained when `by` is specified.
52
///
53
/// # Arguments
54
/// * `by` - First group by these columns and then upsample for every group
55
/// * `time_column` - Will be used to determine a date_range.
56
/// Note that this column has to be sorted for the output to make sense.
57
/// * `every` - interval will start 'every' duration
58
/// * `offset` - change the start of the date_range by this offset.
59
///
60
/// The `every` and `offset` arguments are created with
61
/// the following string language:
62
/// - 1ns (1 nanosecond)
63
/// - 1us (1 microsecond)
64
/// - 1ms (1 millisecond)
65
/// - 1s (1 second)
66
/// - 1m (1 minute)
67
/// - 1h (1 hour)
68
/// - 1d (1 calendar day)
69
/// - 1w (1 calendar week)
70
/// - 1mo (1 calendar month)
71
/// - 1q (1 calendar quarter)
72
/// - 1y (1 calendar year)
73
/// - 1i (1 index count)
74
///
75
/// Or combine them:
76
/// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
77
///
78
/// By "calendar day", we mean the corresponding time on the next
79
/// day (which may not be 24 hours, depending on daylight savings).
80
/// Similarly for "calendar week", "calendar month", "calendar quarter",
81
/// and "calendar year".
82
fn upsample_stable<I: IntoVec<PlSmallStr>>(
83
&self,
84
by: I,
85
time_column: &str,
86
every: Duration,
87
) -> PolarsResult<DataFrame>;
88
}
89
90
impl PolarsUpsample for DataFrame {
91
fn upsample<I: IntoVec<PlSmallStr>>(
92
&self,
93
by: I,
94
time_column: &str,
95
every: Duration,
96
) -> PolarsResult<DataFrame> {
97
let by = by.into_vec();
98
let time_type = self.column(time_column)?.dtype();
99
ensure_duration_matches_dtype(every, time_type, "every")?;
100
upsample_impl(self, by, time_column, every, false)
101
}
102
103
fn upsample_stable<I: IntoVec<PlSmallStr>>(
104
&self,
105
by: I,
106
time_column: &str,
107
every: Duration,
108
) -> PolarsResult<DataFrame> {
109
let by = by.into_vec();
110
let time_type = self.column(time_column)?.dtype();
111
ensure_duration_matches_dtype(every, time_type, "every")?;
112
upsample_impl(self, by, time_column, every, true)
113
}
114
}
115
116
fn upsample_impl(
117
source: &DataFrame,
118
by: Vec<PlSmallStr>,
119
index_column: &str,
120
every: Duration,
121
stable: bool,
122
) -> PolarsResult<DataFrame> {
123
let s = source.column(index_column)?;
124
let time_type = s.dtype();
125
if matches!(time_type, DataType::Date) {
126
let mut df = source.clone();
127
df.apply(index_column, |s| {
128
s.cast(&DataType::Datetime(TimeUnit::Microseconds, None))
129
.unwrap()
130
})
131
.unwrap();
132
let mut out = upsample_impl(&df, by, index_column, every, stable)?;
133
out.apply(index_column, |s| s.cast(time_type).unwrap())
134
.unwrap();
135
Ok(out)
136
} else if matches!(
137
time_type,
138
DataType::UInt32 | DataType::UInt64 | DataType::Int32
139
) {
140
let mut df = source.clone();
141
142
df.apply(index_column, |s| {
143
s.cast(&DataType::Int64)
144
.unwrap()
145
.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
146
.unwrap()
147
})
148
.unwrap();
149
let mut out = upsample_impl(&df, by, index_column, every, stable)?;
150
out.apply(index_column, |s| s.cast(time_type).unwrap())
151
.unwrap();
152
Ok(out)
153
} else if matches!(time_type, DataType::Int64) {
154
let mut df = source.clone();
155
df.apply(index_column, |s| {
156
s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None))
157
.unwrap()
158
})
159
.unwrap();
160
let mut out = upsample_impl(&df, by, index_column, every, stable)?;
161
out.apply(index_column, |s| s.cast(time_type).unwrap())
162
.unwrap();
163
Ok(out)
164
} else if by.is_empty() {
165
let index_column = source.column(index_column)?;
166
upsample_single_impl(source, index_column.as_materialized_series(), every)
167
} else {
168
let gb = if stable {
169
source.group_by_stable(by)
170
} else {
171
source.group_by(by)
172
};
173
// don't parallelize this, this may SO on large data.
174
gb?.apply(|df| {
175
let index_column = df.column(index_column)?;
176
upsample_single_impl(&df, index_column.as_materialized_series(), every)
177
})
178
}
179
}
180
181
fn upsample_single_impl(
182
source: &DataFrame,
183
index_column: &Series,
184
every: Duration,
185
) -> PolarsResult<DataFrame> {
186
index_column.ensure_sorted_arg("upsample")?;
187
let index_col_name = index_column.name();
188
189
use DataType::*;
190
match index_column.dtype() {
191
Datetime(tu, tz) => {
192
let s = index_column.cast(&Int64).unwrap();
193
let ca = s.i64().unwrap();
194
let first = ca.iter().flatten().next();
195
let last = ca.iter().flatten().next_back();
196
match (first, last) {
197
(Some(first), Some(last)) => {
198
let tz = match tz {
199
#[cfg(feature = "timezones")]
200
Some(tz) => Some(parse_time_zone(tz)?),
201
_ => None,
202
};
203
let range = datetime_range_impl(
204
index_col_name.clone(),
205
first,
206
last,
207
every,
208
ClosedWindow::Both,
209
*tu,
210
tz.as_ref(),
211
)?
212
.into_series()
213
.into_frame();
214
range.join(
215
source,
216
[index_col_name.clone()],
217
[index_col_name.clone()],
218
JoinArgs::new(JoinType::Left),
219
None,
220
)
221
},
222
_ => polars_bail!(
223
ComputeError: "cannot determine upsample boundaries: all elements are null"
224
),
225
}
226
},
227
dt => polars_bail!(
228
ComputeError: "upsample not allowed for index column of dtype {}", dt,
229
),
230
}
231
}
232
233