Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-time/src/upsample.rs
8415 views
1
#[cfg(feature = "timezones")]
2
use polars_core::datatypes::time_zone::parse_time_zone;
3
use polars_core::prelude::*;
4
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
5
use polars_ops::prelude::*;
6
use polars_ops::series::SeriesMethods;
7
8
use crate::prelude::*;
9
10
pub trait PolarsUpsample {
11
/// Upsample a [`DataFrame`] at a regular frequency.
12
///
13
/// # Arguments
14
/// * `by` - First group by these columns and then upsample for every group
15
/// * `time_column` - Will be used to determine a date_range.
16
/// Note that this column has to be sorted for the output to make sense.
17
/// * `every` - interval will start 'every' duration
18
/// * `offset` - change the start of the date_range by this offset.
19
///
20
/// The `every` and `offset` arguments are created with
21
/// the following string language:
22
/// - 1ns (1 nanosecond)
23
/// - 1us (1 microsecond)
24
/// - 1ms (1 millisecond)
25
/// - 1s (1 second)
26
/// - 1m (1 minute)
27
/// - 1h (1 hour)
28
/// - 1d (1 calendar day)
29
/// - 1w (1 calendar week)
30
/// - 1mo (1 calendar month)
31
/// - 1q (1 calendar quarter)
32
/// - 1y (1 calendar year)
33
/// - 1i (1 index count)
34
///
35
/// Or combine them:
36
/// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
37
///
38
/// By "calendar day", we mean the corresponding time on the next
39
/// day (which may not be 24 hours, depending on daylight savings).
40
/// Similarly for "calendar week", "calendar month", "calendar quarter",
41
/// and "calendar year".
42
fn upsample<I: IntoVec<PlSmallStr>>(
43
&self,
44
by: I,
45
time_column: &str,
46
every: Duration,
47
) -> PolarsResult<DataFrame>;
48
49
/// Upsample a [`DataFrame`] at a regular frequency.
50
///
51
/// Similar to [`upsample`][PolarsUpsample::upsample], but order of the
52
/// DataFrame is maintained when `by` is specified.
53
///
54
/// # Arguments
55
/// * `by` - First group by these columns and then upsample for every group
56
/// * `time_column` - Will be used to determine a date_range.
57
/// Note that this column has to be sorted for the output to make sense.
58
/// * `every` - interval will start 'every' duration
59
/// * `offset` - change the start of the date_range by this offset.
60
///
61
/// The `every` and `offset` arguments are created with
62
/// the following string language:
63
/// - 1ns (1 nanosecond)
64
/// - 1us (1 microsecond)
65
/// - 1ms (1 millisecond)
66
/// - 1s (1 second)
67
/// - 1m (1 minute)
68
/// - 1h (1 hour)
69
/// - 1d (1 calendar day)
70
/// - 1w (1 calendar week)
71
/// - 1mo (1 calendar month)
72
/// - 1q (1 calendar quarter)
73
/// - 1y (1 calendar year)
74
/// - 1i (1 index count)
75
///
76
/// Or combine them:
77
/// "3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
78
///
79
/// By "calendar day", we mean the corresponding time on the next
80
/// day (which may not be 24 hours, depending on daylight savings).
81
/// Similarly for "calendar week", "calendar month", "calendar quarter",
82
/// and "calendar year".
83
fn upsample_stable<I: IntoVec<PlSmallStr>>(
84
&self,
85
by: I,
86
time_column: &str,
87
every: Duration,
88
) -> PolarsResult<DataFrame>;
89
}
90
91
impl PolarsUpsample for DataFrame {
92
fn upsample<I: IntoVec<PlSmallStr>>(
93
&self,
94
by: I,
95
time_column: &str,
96
every: Duration,
97
) -> PolarsResult<DataFrame> {
98
let by = by.into_vec();
99
let time_type = self.column(time_column)?.dtype();
100
ensure_duration_matches_dtype(every, time_type, "every")?;
101
upsample_impl(self, by, time_column, every, false)
102
}
103
104
fn upsample_stable<I: IntoVec<PlSmallStr>>(
105
&self,
106
by: I,
107
time_column: &str,
108
every: Duration,
109
) -> PolarsResult<DataFrame> {
110
let by = by.into_vec();
111
let time_type = self.column(time_column)?.dtype();
112
ensure_duration_matches_dtype(every, time_type, "every")?;
113
upsample_impl(self, by, time_column, every, true)
114
}
115
}
116
117
fn upsample_impl(
118
source: &DataFrame,
119
by: Vec<PlSmallStr>,
120
index_column: &str,
121
every: Duration,
122
stable: bool,
123
) -> PolarsResult<DataFrame> {
124
let s = source.column(index_column)?;
125
let original_type = s.dtype();
126
127
let needs_cast = matches!(
128
original_type,
129
DataType::Date | DataType::UInt32 | DataType::UInt64 | DataType::Int32 | DataType::Int64
130
);
131
132
let mut df = source.clone();
133
134
if needs_cast {
135
df.try_apply(index_column, |s| match s.dtype() {
136
#[cfg(feature = "dtype-date")]
137
DataType::Date => s.cast(&DataType::Datetime(TimeUnit::Microseconds, None)),
138
DataType::UInt32 | DataType::UInt64 | DataType::Int32 => s
139
.cast(&DataType::Int64)?
140
.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
141
DataType::Int64 => s.cast(&DataType::Datetime(TimeUnit::Nanoseconds, None)),
142
_ => Ok(s.clone()),
143
})?;
144
}
145
146
let mut out = upsample_core(&df, by, index_column, every, stable)?;
147
148
if needs_cast {
149
out.try_apply(index_column, |s| s.cast(original_type))?;
150
}
151
152
Ok(out)
153
}
154
155
fn upsample_core(
156
source: &DataFrame,
157
by: Vec<PlSmallStr>,
158
index_column: &str,
159
every: Duration,
160
stable: bool,
161
) -> PolarsResult<DataFrame> {
162
if by.is_empty() {
163
let index_column = source.column(index_column)?;
164
return upsample_single_impl(source, index_column.as_materialized_series(), every);
165
}
166
167
let source_schema = source.schema();
168
169
let group_keys_df = source.select(by)?;
170
let group_keys_schema = group_keys_df.schema();
171
172
let groups = if stable {
173
group_keys_df.group_by_stable(group_keys_schema.iter_names_cloned())
174
} else {
175
group_keys_df.group_by(group_keys_schema.iter_names_cloned())
176
}?
177
.into_groups();
178
179
let non_group_keys_df = unsafe {
180
source.select_unchecked(
181
source_schema
182
.iter_names()
183
.filter(|name| !group_keys_schema.contains(name.as_str())),
184
)?
185
};
186
187
let upsample_index_col_idx: Option<usize> = non_group_keys_df.schema().index_of(index_column);
188
189
// don't parallelize this, this may SO on large data.
190
let dfs: Vec<DataFrame> = groups
191
.iter()
192
.map(|g| {
193
let first_idx = g.first();
194
195
let mut non_group_keys_df = unsafe { non_group_keys_df.gather_group_unchecked(&g) };
196
197
if let Some(i) = upsample_index_col_idx {
198
non_group_keys_df = upsample_single_impl(
199
&non_group_keys_df,
200
non_group_keys_df.columns()[i].as_materialized_series(),
201
every,
202
)?
203
}
204
205
let mut out = non_group_keys_df;
206
207
let group_keys_df = group_keys_df.new_from_index(first_idx as usize, out.height());
208
209
let out_cols = unsafe { out.columns_mut() };
210
211
out_cols.reserve(group_keys_df.width());
212
out_cols.extend(group_keys_df.into_columns());
213
214
Ok(out)
215
})
216
.collect::<PolarsResult<_>>()?;
217
218
Ok(unsafe {
219
accumulate_dataframes_vertical_unchecked(dfs)
220
.select_unchecked(source_schema.iter_names())?
221
.with_schema(source_schema.clone())
222
})
223
}
224
225
fn upsample_single_impl(
226
source: &DataFrame,
227
index_column: &Series,
228
every: Duration,
229
) -> PolarsResult<DataFrame> {
230
index_column.ensure_sorted_arg("upsample")?;
231
let index_col_name = index_column.name();
232
233
use DataType::*;
234
match index_column.dtype() {
235
#[cfg(any(feature = "dtype-date", feature = "dtype-datetime"))]
236
Datetime(tu, tz) => {
237
let s = index_column.cast(&Int64).unwrap();
238
let ca = s.i64().unwrap();
239
let first = ca.iter().flatten().next();
240
let last = ca.iter().flatten().next_back();
241
match (first, last) {
242
(Some(first), Some(last)) => {
243
let tz = match tz {
244
#[cfg(feature = "timezones")]
245
Some(tz) => Some(parse_time_zone(tz)?),
246
_ => None,
247
};
248
let range = datetime_range_impl(
249
index_col_name.clone(),
250
first,
251
last,
252
every,
253
ClosedWindow::Both,
254
*tu,
255
tz.as_ref(),
256
)?
257
.into_series()
258
.into_frame();
259
range.join(
260
source,
261
[index_col_name.clone()],
262
[index_col_name.clone()],
263
JoinArgs::new(JoinType::Left),
264
None,
265
)
266
},
267
_ => polars_bail!(
268
ComputeError: "cannot determine upsample boundaries: all elements are null"
269
),
270
}
271
},
272
dt => polars_bail!(
273
ComputeError: "upsample not allowed for index column of dtype {}", dt,
274
),
275
}
276
}
277
278