Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/frame/join/asof/mod.rs
6940 views
1
mod default;
2
mod groups;
3
use std::borrow::Cow;
4
use std::cmp::Ordering;
5
6
use default::*;
7
pub use groups::AsofJoinBy;
8
use polars_core::prelude::*;
9
use polars_utils::pl_str::PlSmallStr;
10
#[cfg(feature = "serde")]
11
use serde::{Deserialize, Serialize};
12
13
use super::{_finish_join, build_tables};
14
use crate::frame::IntoDf;
15
use crate::series::SeriesMethods;
16
17
/// Returns `true` when `l` is strictly greater than `r`, or when the two compare
/// equal and `allow_eq` is set.
///
/// Pairs that cannot be ordered (e.g. a floating-point NaN on either side) always
/// yield `false`.
#[inline]
fn ge_allow_eq<T: PartialOrd>(l: &T, r: &T, allow_eq: bool) -> bool {
    let ordering = l.partial_cmp(r);
    ordering == Some(Ordering::Greater) || (allow_eq && ordering == Some(Ordering::Equal))
}
25
26
/// Returns `true` when `l` is strictly less than `r`, or when the two compare
/// equal and `allow_eq` is set.
///
/// Pairs that cannot be ordered (e.g. a floating-point NaN on either side) always
/// yield `false`.
#[inline]
fn lt_allow_eq<T: PartialOrd>(l: &T, r: &T, allow_eq: bool) -> bool {
    if allow_eq {
        matches!(l.partial_cmp(r), Some(Ordering::Less | Ordering::Equal))
    } else {
        l.partial_cmp(r) == Some(Ordering::Less)
    }
}
34
35
trait AsofJoinState<T> {
36
fn next<F: FnMut(IdxSize) -> Option<T>>(
37
&mut self,
38
left_val: &T,
39
right: F,
40
n_right: IdxSize,
41
) -> Option<IdxSize>;
42
43
fn new(allow_eq: bool) -> Self;
44
}
45
46
struct AsofJoinForwardState {
47
scan_offset: IdxSize,
48
allow_eq: bool,
49
}
50
51
impl<T: PartialOrd> AsofJoinState<T> for AsofJoinForwardState {
52
fn new(allow_eq: bool) -> Self {
53
AsofJoinForwardState {
54
scan_offset: Default::default(),
55
allow_eq,
56
}
57
}
58
#[inline]
59
fn next<F: FnMut(IdxSize) -> Option<T>>(
60
&mut self,
61
left_val: &T,
62
mut right: F,
63
n_right: IdxSize,
64
) -> Option<IdxSize> {
65
while (self.scan_offset) < n_right {
66
if let Some(right_val) = right(self.scan_offset) {
67
if ge_allow_eq(&right_val, left_val, self.allow_eq) {
68
return Some(self.scan_offset);
69
}
70
}
71
self.scan_offset += 1;
72
}
73
None
74
}
75
}
76
77
struct AsofJoinBackwardState {
78
// best_bound is the greatest right index <= left_val.
79
best_bound: Option<IdxSize>,
80
scan_offset: IdxSize,
81
allow_eq: bool,
82
}
83
84
impl<T: PartialOrd> AsofJoinState<T> for AsofJoinBackwardState {
85
fn new(allow_eq: bool) -> Self {
86
AsofJoinBackwardState {
87
scan_offset: Default::default(),
88
best_bound: Default::default(),
89
allow_eq,
90
}
91
}
92
#[inline]
93
fn next<F: FnMut(IdxSize) -> Option<T>>(
94
&mut self,
95
left_val: &T,
96
mut right: F,
97
n_right: IdxSize,
98
) -> Option<IdxSize> {
99
while self.scan_offset < n_right {
100
if let Some(right_val) = right(self.scan_offset) {
101
if lt_allow_eq(&right_val, left_val, self.allow_eq) {
102
self.best_bound = Some(self.scan_offset);
103
} else {
104
break;
105
}
106
}
107
self.scan_offset += 1;
108
}
109
self.best_bound
110
}
111
}
112
113
#[derive(Default)]
114
struct AsofJoinNearestState {
115
// best_bound is the nearest value to left_val, with ties broken towards the last element.
116
best_bound: Option<IdxSize>,
117
scan_offset: IdxSize,
118
allow_eq: bool,
119
}
120
121
impl<T: NumericNative> AsofJoinState<T> for AsofJoinNearestState {
122
fn new(allow_eq: bool) -> Self {
123
AsofJoinNearestState {
124
scan_offset: Default::default(),
125
best_bound: Default::default(),
126
allow_eq,
127
}
128
}
129
#[inline]
130
fn next<F: FnMut(IdxSize) -> Option<T>>(
131
&mut self,
132
left_val: &T,
133
mut right: F,
134
n_right: IdxSize,
135
) -> Option<IdxSize> {
136
// Skipping ahead to the first value greater than left_val. This is
137
// cheaper than computing differences.
138
while self.scan_offset < n_right {
139
if let Some(scan_right_val) = right(self.scan_offset) {
140
if lt_allow_eq(&scan_right_val, left_val, self.allow_eq) {
141
self.best_bound = Some(self.scan_offset);
142
} else {
143
// Now we must compute a difference to see if scan_right_val
144
// is closer than our current best bound.
145
let scan_is_better = if let Some(best_idx) = self.best_bound {
146
let best_right_val = unsafe { right(best_idx).unwrap_unchecked() };
147
let best_diff = left_val.abs_diff(best_right_val);
148
let scan_diff = left_val.abs_diff(scan_right_val);
149
150
lt_allow_eq(&scan_diff, &best_diff, self.allow_eq)
151
} else {
152
true
153
};
154
155
if scan_is_better {
156
self.best_bound = Some(self.scan_offset);
157
self.scan_offset += 1;
158
159
// It is possible there are later elements equal to our
160
// scan, so keep going on.
161
while self.scan_offset < n_right {
162
if let Some(next_right_val) = right(self.scan_offset) {
163
if next_right_val == scan_right_val && self.allow_eq {
164
self.best_bound = Some(self.scan_offset);
165
} else {
166
break;
167
}
168
}
169
170
self.scan_offset += 1;
171
}
172
}
173
174
break;
175
}
176
}
177
178
self.scan_offset += 1;
179
}
180
181
self.best_bound
182
}
183
}
184
185
/// Options controlling an as-of join.
#[derive(Clone, Debug, PartialEq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct AsOfOptions {
    /// Which right row is selected for each left row; see [`AsofStrategy`].
    pub strategy: AsofStrategy,
    /// A tolerance in the same unit as the asof column
    /// (presumably the maximum allowed distance between matched keys — confirm in the
    /// join kernels). Only numeric/temporal keys accept a tolerance.
    pub tolerance: Option<Scalar>,
    /// A time duration specified as a string, for example:
    /// - "5m"
    /// - "2h15m"
    /// - "1d6h"
    pub tolerance_str: Option<PlSmallStr>,
    /// Names of the left-hand "by" columns, if any (presumably matches are only made
    /// within equal "by" groups — see `AsofJoinBy`).
    pub left_by: Option<Vec<PlSmallStr>>,
    /// Names of the right-hand "by" columns, if any (presumably paired with `left_by`).
    pub right_by: Option<Vec<PlSmallStr>>,
    /// Allow equal matches, i.e. whether a right key equal to the left key may be
    /// matched.
    pub allow_eq: bool,
    /// Whether to verify that the key columns are sorted
    /// (presumably forwarded to the `check_sortedness` argument of `_join_asof`).
    pub check_sortedness: bool,
}
203
204
fn check_asof_columns(
205
a: &Series,
206
b: &Series,
207
has_tolerance: bool,
208
check_sortedness: bool,
209
by_groups_present: bool,
210
) -> PolarsResult<()> {
211
let dtype_a = a.dtype();
212
let dtype_b = b.dtype();
213
if has_tolerance {
214
polars_ensure!(
215
dtype_a.to_physical().is_primitive_numeric() && dtype_b.to_physical().is_primitive_numeric(),
216
InvalidOperation:
217
"asof join with tolerance is only supported on numeric/temporal keys"
218
);
219
} else {
220
polars_ensure!(
221
dtype_a.to_physical().is_primitive() && dtype_b.to_physical().is_primitive(),
222
InvalidOperation:
223
"asof join is only supported on primitive key types"
224
);
225
}
226
polars_ensure!(
227
dtype_a == dtype_b,
228
ComputeError: "mismatching key dtypes in asof-join: `{}` and `{}`",
229
a.dtype(), b.dtype()
230
);
231
if check_sortedness {
232
if by_groups_present {
233
polars_warn!("Sortedness of columns cannot be checked when 'by' groups provided");
234
} else {
235
a.ensure_sorted_arg("asof_join")?;
236
b.ensure_sorted_arg("asof_join")?;
237
}
238
}
239
Ok(())
240
}
241
242
/// Which right row an as-of join selects for each left row. `Backward` is the default.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum AsofStrategy {
    /// selects the last row in the right DataFrame whose ‘on’ key is less than or equal to the left’s key
    /// (strictly less than when exact matches are not allowed)
    #[default]
    Backward,
    /// selects the first row in the right DataFrame whose ‘on’ key is greater than or equal to the left’s key
    /// (strictly greater than when exact matches are not allowed).
    Forward,
    /// selects the row in the right DataFrame whose 'on' key is nearest to the left's key.
    Nearest,
}
254
255
pub trait AsofJoin: IntoDf {
256
#[doc(hidden)]
257
#[allow(clippy::too_many_arguments)]
258
fn _join_asof(
259
&self,
260
other: &DataFrame,
261
left_key: &Series,
262
right_key: &Series,
263
strategy: AsofStrategy,
264
tolerance: Option<AnyValue<'static>>,
265
suffix: Option<PlSmallStr>,
266
slice: Option<(i64, usize)>,
267
coalesce: bool,
268
allow_eq: bool,
269
check_sortedness: bool,
270
) -> PolarsResult<DataFrame> {
271
let self_df = self.to_df();
272
273
check_asof_columns(
274
left_key,
275
right_key,
276
tolerance.is_some(),
277
check_sortedness,
278
false,
279
)?;
280
let left_key = left_key.to_physical_repr();
281
let right_key = right_key.to_physical_repr();
282
283
let mut take_idx = match left_key.dtype() {
284
DataType::Int64 => {
285
let ca = left_key.i64().unwrap();
286
join_asof_numeric(ca, &right_key, strategy, tolerance, allow_eq)
287
},
288
DataType::Int32 => {
289
let ca = left_key.i32().unwrap();
290
join_asof_numeric(ca, &right_key, strategy, tolerance, allow_eq)
291
},
292
#[cfg(feature = "dtype-i128")]
293
DataType::Int128 => {
294
let ca = left_key.i128().unwrap();
295
join_asof_numeric(ca, &right_key, strategy, tolerance, allow_eq)
296
},
297
DataType::UInt64 => {
298
let ca = left_key.u64().unwrap();
299
join_asof_numeric(ca, &right_key, strategy, tolerance, allow_eq)
300
},
301
DataType::UInt32 => {
302
let ca = left_key.u32().unwrap();
303
join_asof_numeric(ca, &right_key, strategy, tolerance, allow_eq)
304
},
305
DataType::Float32 => {
306
let ca = left_key.f32().unwrap();
307
join_asof_numeric(ca, &right_key, strategy, tolerance, allow_eq)
308
},
309
DataType::Float64 => {
310
let ca = left_key.f64().unwrap();
311
join_asof_numeric(ca, &right_key, strategy, tolerance, allow_eq)
312
},
313
DataType::Boolean => {
314
let ca = left_key.bool().unwrap();
315
join_asof::<BooleanType>(ca, &right_key, strategy, allow_eq)
316
},
317
DataType::Binary => {
318
let ca = left_key.binary().unwrap();
319
join_asof::<BinaryType>(ca, &right_key, strategy, allow_eq)
320
},
321
DataType::String => {
322
let ca = left_key.str().unwrap();
323
let right_binary = right_key.cast(&DataType::Binary).unwrap();
324
join_asof::<BinaryType>(&ca.as_binary(), &right_binary, strategy, allow_eq)
325
},
326
DataType::Int8 | DataType::UInt8 | DataType::Int16 | DataType::UInt16 => {
327
let left_key = left_key.cast(&DataType::Int32).unwrap();
328
let right_key = right_key.cast(&DataType::Int32).unwrap();
329
let ca = left_key.i32().unwrap();
330
join_asof_numeric(ca, &right_key, strategy, tolerance, allow_eq)
331
},
332
dt => polars_bail!(opq = asof_join, dt),
333
}?;
334
try_raise_keyboard_interrupt();
335
336
// Drop right join column.
337
let other = if coalesce && left_key.name() == right_key.name() {
338
Cow::Owned(other.drop(right_key.name())?)
339
} else {
340
Cow::Borrowed(other)
341
};
342
343
let mut left = self_df.clone();
344
if let Some((offset, len)) = slice {
345
left = left.slice(offset, len);
346
take_idx = take_idx.slice(offset, len);
347
}
348
349
// SAFETY: join tuples are in bounds.
350
let right_df = unsafe { other.take_unchecked(&take_idx) };
351
352
_finish_join(left, right_df, suffix)
353
}
354
}
355
356
/// `DataFrame` gets `_join_asof` from the trait's provided (default) method.
impl AsofJoin for DataFrame {}
357
358