// Path: blob/main/crates/polars-ops/src/frame/join/asof/groups.rs
// 6940 views
use std::hash::Hash;12use num_traits::Zero;3use polars_core::hashing::_HASHMAP_INIT_SIZE;4use polars_core::prelude::*;5use polars_core::series::BitRepr;6use polars_core::utils::flatten::flatten_nullable;7use polars_core::utils::split_and_flatten;8use polars_core::{POOL, with_match_physical_float_polars_type};9use polars_utils::abs_diff::AbsDiff;10use polars_utils::hashing::{DirtyHash, hash_to_partition};11use polars_utils::nulls::IsNull;12use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};13use rayon::prelude::*;1415use super::*;16use crate::frame::join::{prepare_binary, prepare_keys_multiple};1718fn compute_len_offsets<I: IntoIterator<Item = usize>>(iter: I) -> Vec<usize> {19let mut cumlen = 0;20iter.into_iter()21.map(|l| {22let offset = cumlen;23cumlen += l;24offset25})26.collect()27}2829#[inline(always)]30fn materialize_nullable(idx: Option<IdxSize>) -> NullableIdxSize {31match idx {32Some(t) => NullableIdxSize::from(t),33None => NullableIdxSize::null(),34}35}3637fn asof_in_group<'a, T, A, F>(38left_val: T::Physical<'a>,39right_val_arr: &'a T::Array,40right_grp_idxs: &[IdxSize],41group_states: &mut PlHashMap<IdxSize, A>,42filter: F,43allow_eq: bool,44) -> Option<IdxSize>45where46T: PolarsDataType,47A: AsofJoinState<T::Physical<'a>>,48F: Fn(T::Physical<'a>, T::Physical<'a>) -> bool,49{50// We use the index of the first element in a group as an identifier to51// associate with the group state.52let id = right_grp_idxs.first()?;53let grp_state = group_states.entry(*id).or_insert_with(|| A::new(allow_eq));5455unsafe {56let r_grp_idx = grp_state.next(57&left_val,58|i| {59// SAFETY: the group indices are valid, and next() only calls with60// i < right_grp_idxs.len().61right_val_arr.get_unchecked(*right_grp_idxs.get_unchecked(i as usize) as usize)62},63right_grp_idxs.len() as IdxSize,64)?;6566// SAFETY: r_grp_idx is valid, as is r_idx (which must be non-null) if67// we get here.68let r_idx = *right_grp_idxs.get_unchecked(r_grp_idx as usize);69let right_val = 
right_val_arr.value_unchecked(r_idx as usize);70filter(left_val, right_val).then_some(r_idx)71}72}7374fn asof_join_by_numeric<T, S, A, F>(75by_left: &ChunkedArray<S>,76by_right: &ChunkedArray<S>,77left_asof: &ChunkedArray<T>,78right_asof: &ChunkedArray<T>,79filter: F,80allow_eq: bool,81) -> PolarsResult<IdxArr>82where83T: PolarsDataType,84S: PolarsNumericType,85S::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,86<S::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,87A: for<'a> AsofJoinState<T::Physical<'a>>,88F: Sync + for<'a> Fn(T::Physical<'a>, T::Physical<'a>) -> bool,89{90let (left_asof, right_asof) = POOL.join(|| left_asof.rechunk(), || right_asof.rechunk());91let left_val_arr = left_asof.downcast_as_array();92let right_val_arr = right_asof.downcast_as_array();9394let n_threads = POOL.current_num_threads();95// `strict` is false so that we always flatten. Even if there are more chunks than threads.96let split_by_left = split_and_flatten(by_left, n_threads);97let split_by_right = split_and_flatten(by_right, n_threads);98let offsets = compute_len_offsets(split_by_left.iter().map(|s| s.len()));99100// TODO: handle nulls more efficiently. 
Right now we just join on the value101// ignoring the validity mask, and ignore the nulls later.102let right_slices = split_by_right103.iter()104.map(|ca| {105assert_eq!(ca.chunks().len(), 1);106ca.downcast_iter().next().unwrap().values_iter().copied()107})108.collect();109let hash_tbls = build_tables(right_slices, false);110let n_tables = hash_tbls.len();111112// Now we probe the right hand side for each left hand side.113let out = split_by_left114.into_par_iter()115.zip(offsets)116.map(|(by_left, offset)| {117let mut results = Vec::with_capacity(by_left.len());118let mut group_states: PlHashMap<IdxSize, A> =119PlHashMap::with_capacity(_HASHMAP_INIT_SIZE);120121assert_eq!(by_left.chunks().len(), 1);122let by_left_chunk = by_left.downcast_iter().next().unwrap();123for (rel_idx_left, opt_by_left_k) in by_left_chunk.iter().enumerate() {124let Some(by_left_k) = opt_by_left_k else {125results.push(NullableIdxSize::null());126continue;127};128let by_left_k = by_left_k.to_total_ord();129let idx_left = (rel_idx_left + offset) as IdxSize;130let Some(left_val) = left_val_arr.get(idx_left as usize) else {131results.push(NullableIdxSize::null());132continue;133};134135let group_probe_table = unsafe {136hash_tbls.get_unchecked(hash_to_partition(by_left_k.dirty_hash(), n_tables))137};138let Some(right_grp_idxs) = group_probe_table.get(&by_left_k) else {139results.push(NullableIdxSize::null());140continue;141};142let id = asof_in_group::<T, A, &F>(143left_val,144right_val_arr,145right_grp_idxs.as_slice(),146&mut group_states,147&filter,148allow_eq,149);150results.push(materialize_nullable(id));151}152results153});154155let bufs = POOL.install(|| out.collect::<Vec<_>>());156Ok(flatten_nullable(&bufs))157}158159fn asof_join_by_binary<B, T, A, F>(160by_left: &ChunkedArray<B>,161by_right: &ChunkedArray<B>,162left_asof: &ChunkedArray<T>,163right_asof: &ChunkedArray<T>,164filter: F,165allow_eq: bool,166) -> IdxArr167where168B: PolarsDataType,169for<'b> <B::Array as 
StaticArray>::ValueT<'b>: AsRef<[u8]>,170T: PolarsDataType,171A: for<'a> AsofJoinState<T::Physical<'a>>,172F: Sync + for<'a> Fn(T::Physical<'a>, T::Physical<'a>) -> bool,173{174let (left_asof, right_asof) = POOL.join(|| left_asof.rechunk(), || right_asof.rechunk());175let left_val_arr = left_asof.downcast_as_array();176let right_val_arr = right_asof.downcast_as_array();177178let (prep_by_left, prep_by_right, _, _) = prepare_binary::<B>(by_left, by_right, false);179let offsets = compute_len_offsets(prep_by_left.iter().map(|s| s.len()));180let hash_tbls = build_tables(prep_by_right, false);181let n_tables = hash_tbls.len();182183// Now we probe the right hand side for each left hand side.184let iter = prep_by_left185.into_par_iter()186.zip(offsets)187.map(|(by_left, offset)| {188let mut results = Vec::with_capacity(by_left.len());189let mut group_states: PlHashMap<_, A> = PlHashMap::with_capacity(_HASHMAP_INIT_SIZE);190191for (rel_idx_left, by_left_k) in by_left.iter().enumerate() {192let idx_left = (rel_idx_left + offset) as IdxSize;193let Some(left_val) = left_val_arr.get(idx_left as usize) else {194results.push(NullableIdxSize::null());195continue;196};197198let group_probe_table = unsafe {199hash_tbls.get_unchecked(hash_to_partition(by_left_k.dirty_hash(), n_tables))200};201let Some(right_grp_idxs) = group_probe_table.get(by_left_k) else {202results.push(NullableIdxSize::null());203continue;204};205let id = asof_in_group::<T, A, &F>(206left_val,207right_val_arr,208right_grp_idxs.as_slice(),209&mut group_states,210&filter,211allow_eq,212);213214results.push(materialize_nullable(id));215}216results217});218let bufs = POOL.install(|| iter.collect::<Vec<_>>());219flatten_nullable(&bufs)220}221222#[allow(clippy::too_many_arguments)]223fn dispatch_join_by_type<T, A, F>(224left_asof: &ChunkedArray<T>,225right_asof: &ChunkedArray<T>,226left_by: &mut DataFrame,227right_by: &mut DataFrame,228filter: F,229allow_eq: bool,230) -> PolarsResult<IdxArr>231where232T: 
PolarsDataType,233A: for<'a> AsofJoinState<T::Physical<'a>>,234F: Sync + for<'a> Fn(T::Physical<'a>, T::Physical<'a>) -> bool,235{236let out = if left_by.width() == 1 {237let left_by_s = left_by.get_columns()[0].to_physical_repr();238let right_by_s = right_by.get_columns()[0].to_physical_repr();239let left_dtype = left_by_s.dtype();240let right_dtype = right_by_s.dtype();241polars_ensure!(left_dtype == right_dtype,242ComputeError: "mismatching dtypes in 'by' parameter of asof-join: `{left_dtype}` and `{right_dtype}`",243);244match left_dtype {245DataType::String => {246let left_by = &left_by_s.str().unwrap().as_binary();247let right_by = right_by_s.str().unwrap().as_binary();248asof_join_by_binary::<BinaryType, T, A, F>(249left_by, &right_by, left_asof, right_asof, filter, allow_eq,250)251},252DataType::Binary => {253let left_by = &left_by_s.binary().unwrap();254let right_by = right_by_s.binary().unwrap();255asof_join_by_binary::<BinaryType, T, A, F>(256left_by, right_by, left_asof, right_asof, filter, allow_eq,257)258},259x if x.is_float() => {260with_match_physical_float_polars_type!(left_by_s.dtype(), |$T| {261let left_by: &ChunkedArray<$T> = left_by_s.as_materialized_series().as_ref().as_ref().as_ref();262let right_by: &ChunkedArray<$T> = right_by_s.as_materialized_series().as_ref().as_ref().as_ref();263asof_join_by_numeric::<T, $T, A, F>(264left_by, right_by, left_asof, right_asof, filter, allow_eq265)?266})267},268_ => {269let left_by = left_by_s.bit_repr();270let right_by = right_by_s.bit_repr();271272let (Some(left_by), Some(right_by)) = (left_by, right_by) else {273polars_bail!(nyi = "Dispatch join for {left_dtype} and {right_dtype}");274};275276use BitRepr as B;277match (left_by, right_by) {278(B::U32(left_by), B::U32(right_by)) => {279asof_join_by_numeric::<T, UInt32Type, A, F>(280&left_by, &right_by, left_asof, right_asof, filter, allow_eq,281)?282},283(B::U64(left_by), B::U64(right_by)) => {284asof_join_by_numeric::<T, UInt64Type, A, F>(285&left_by, 
&right_by, left_asof, right_asof, filter, allow_eq,286)?287},288#[cfg(feature = "dtype-i128")]289(B::I128(left_by), B::I128(right_by)) => {290asof_join_by_numeric::<T, Int128Type, A, F>(291&left_by, &right_by, left_asof, right_asof, filter, allow_eq,292)?293},294// We have already asserted that the datatypes are the same.295_ => unreachable!(),296}297},298}299} else {300for (lhs, rhs) in left_by.get_columns().iter().zip(right_by.get_columns()) {301polars_ensure!(lhs.dtype() == rhs.dtype(),302ComputeError: "mismatching dtypes in 'by' parameter of asof-join: `{}` and `{}`", lhs.dtype(), rhs.dtype()303);304}305306// TODO: @scalar-opt.307let left_by_series: Vec<_> = left_by.materialized_column_iter().cloned().collect();308let right_by_series: Vec<_> = right_by.materialized_column_iter().cloned().collect();309let lhs_keys = prepare_keys_multiple(&left_by_series, false)?;310let rhs_keys = prepare_keys_multiple(&right_by_series, false)?;311asof_join_by_binary::<BinaryOffsetType, T, A, F>(312&lhs_keys, &rhs_keys, left_asof, right_asof, filter, allow_eq,313)314};315Ok(out)316}317318#[allow(clippy::too_many_arguments)]319fn dispatch_join_strategy<T: PolarsDataType>(320left_asof: &ChunkedArray<T>,321right_asof: &Series,322left_by: &mut DataFrame,323right_by: &mut DataFrame,324strategy: AsofStrategy,325allow_eq: bool,326) -> PolarsResult<IdxArr>327where328for<'a> T::Physical<'a>: PartialOrd,329{330let right_asof = left_asof.unpack_series_matching_type(right_asof)?;331332let filter = |_a: T::Physical<'_>, _b: T::Physical<'_>| true;333match strategy {334AsofStrategy::Backward => dispatch_join_by_type::<T, AsofJoinBackwardState, _>(335left_asof, right_asof, left_by, right_by, filter, allow_eq,336),337AsofStrategy::Forward => dispatch_join_by_type::<T, AsofJoinForwardState, _>(338left_asof, right_asof, left_by, right_by, filter, allow_eq,339),340AsofStrategy::Nearest => unimplemented!(),341}342}343344#[allow(clippy::too_many_arguments)]345fn dispatch_join_strategy_numeric<T: 
PolarsNumericType>(346left_asof: &ChunkedArray<T>,347right_asof: &Series,348left_by: &mut DataFrame,349right_by: &mut DataFrame,350strategy: AsofStrategy,351tolerance: Option<AnyValue<'static>>,352allow_eq: bool,353) -> PolarsResult<IdxArr> {354let right_ca = left_asof.unpack_series_matching_type(right_asof)?;355356if let Some(tol) = tolerance {357let native_tolerance: T::Native = tol.try_extract()?;358let abs_tolerance = native_tolerance.abs_diff(T::Native::zero());359let filter = |a: T::Native, b: T::Native| a.abs_diff(b) <= abs_tolerance;360match strategy {361AsofStrategy::Backward => dispatch_join_by_type::<T, AsofJoinBackwardState, _>(362left_asof, right_ca, left_by, right_by, filter, allow_eq,363),364AsofStrategy::Forward => dispatch_join_by_type::<T, AsofJoinForwardState, _>(365left_asof, right_ca, left_by, right_by, filter, allow_eq,366),367AsofStrategy::Nearest => dispatch_join_by_type::<T, AsofJoinNearestState, _>(368left_asof, right_ca, left_by, right_by, filter, allow_eq,369),370}371} else {372let filter = |_a: T::Physical<'_>, _b: T::Physical<'_>| true;373match strategy {374AsofStrategy::Backward => dispatch_join_by_type::<T, AsofJoinBackwardState, _>(375left_asof, right_ca, left_by, right_by, filter, allow_eq,376),377AsofStrategy::Forward => dispatch_join_by_type::<T, AsofJoinForwardState, _>(378left_asof, right_ca, left_by, right_by, filter, allow_eq,379),380AsofStrategy::Nearest => dispatch_join_by_type::<T, AsofJoinNearestState, _>(381left_asof, right_ca, left_by, right_by, filter, allow_eq,382),383}384}385}386387#[allow(clippy::too_many_arguments)]388fn dispatch_join_type(389left_asof: &Series,390right_asof: &Series,391left_by: &mut DataFrame,392right_by: &mut DataFrame,393strategy: AsofStrategy,394tolerance: Option<AnyValue<'static>>,395allow_eq: bool,396) -> PolarsResult<IdxArr> {397match left_asof.dtype() {398DataType::Int64 => {399let ca = left_asof.i64().unwrap();400dispatch_join_strategy_numeric(401ca, right_asof, left_by, right_by, 
strategy, tolerance, allow_eq,402)403},404DataType::Int32 => {405let ca = left_asof.i32().unwrap();406dispatch_join_strategy_numeric(407ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,408)409},410DataType::UInt64 => {411let ca = left_asof.u64().unwrap();412dispatch_join_strategy_numeric(413ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,414)415},416DataType::UInt32 => {417let ca = left_asof.u32().unwrap();418dispatch_join_strategy_numeric(419ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,420)421},422#[cfg(feature = "dtype-i128")]423DataType::Int128 => {424let ca = left_asof.i128().unwrap();425dispatch_join_strategy_numeric(426ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,427)428},429DataType::Float32 => {430let ca = left_asof.f32().unwrap();431dispatch_join_strategy_numeric(432ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,433)434},435DataType::Float64 => {436let ca = left_asof.f64().unwrap();437dispatch_join_strategy_numeric(438ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,439)440},441DataType::Boolean => {442let ca = left_asof.bool().unwrap();443dispatch_join_strategy::<BooleanType>(444ca, right_asof, left_by, right_by, strategy, allow_eq,445)446},447DataType::Binary => {448let ca = left_asof.binary().unwrap();449dispatch_join_strategy::<BinaryType>(450ca, right_asof, left_by, right_by, strategy, allow_eq,451)452},453DataType::String => {454let ca = left_asof.str().unwrap();455let right_binary = right_asof.cast(&DataType::Binary).unwrap();456dispatch_join_strategy::<BinaryType>(457&ca.as_binary(),458&right_binary,459left_by,460right_by,461strategy,462allow_eq,463)464},465DataType::Int8 | DataType::UInt8 | DataType::Int16 | DataType::UInt16 => {466let left_asof = left_asof.cast(&DataType::Int32).unwrap();467let right_asof = right_asof.cast(&DataType::Int32).unwrap();468let ca = 
left_asof.i32().unwrap();469dispatch_join_strategy_numeric(470ca,471&right_asof,472left_by,473right_by,474strategy,475tolerance,476allow_eq,477)478},479dt => polars_bail!(opq = asof_join, dt),480}481}482483pub trait AsofJoinBy: IntoDf {484#[allow(clippy::too_many_arguments)]485#[doc(hidden)]486fn _join_asof_by(487&self,488other: &DataFrame,489left_on: &Series,490right_on: &Series,491left_by: Vec<PlSmallStr>,492right_by: Vec<PlSmallStr>,493strategy: AsofStrategy,494tolerance: Option<AnyValue<'static>>,495suffix: Option<PlSmallStr>,496slice: Option<(i64, usize)>,497coalesce: bool,498allow_eq: bool,499check_sortedness: bool,500) -> PolarsResult<DataFrame> {501let (self_sliced_slot, left_slice_s); // Keeps temporaries alive.502let (self_df, other_df, left_key, right_key);503if let Some((offset, len)) = slice {504self_sliced_slot = self.to_df().slice(offset, len);505left_slice_s = left_on.slice(offset, len);506left_key = &left_slice_s;507right_key = right_on;508self_df = &self_sliced_slot;509other_df = other;510} else {511self_df = self.to_df();512other_df = other;513left_key = left_on;514right_key = right_on;515}516517let left_asof = left_key.to_physical_repr();518let right_asof = right_key.to_physical_repr();519let right_asof_name = right_asof.name();520let left_asof_name = left_asof.name();521check_asof_columns(522&left_asof,523&right_asof,524tolerance.is_some(),525check_sortedness,526!(left_by.is_empty() && right_by.is_empty()),527)?;528529let mut left_by = self_df.select(left_by)?;530let mut right_by = other_df.select(right_by)?;531532unsafe {533for (l, r) in left_by534.get_columns_mut()535.iter_mut()536.zip(right_by.get_columns_mut().iter_mut())537{538*l = l.to_physical_repr();539*r = r.to_physical_repr();540}541}542543let right_join_tuples = dispatch_join_type(544&left_asof,545&right_asof,546&mut left_by,547&mut right_by,548strategy,549tolerance,550allow_eq,551)?;552553let mut drop_these = right_by.get_column_names();554if coalesce && left_asof_name == 
right_asof_name {555drop_these.push(right_asof_name);556}557558let cols = other_df559.get_columns()560.iter()561.filter(|s| !drop_these.contains(&s.name()))562.cloned()563.collect();564let proj_other_df = unsafe { DataFrame::new_no_checks(other_df.height(), cols) };565566let left = self_df.clone();567568// SAFETY: join tuples are in bounds.569let right_df = unsafe {570proj_other_df.take_unchecked(&IdxCa::with_chunk(PlSmallStr::EMPTY, right_join_tuples))571};572573_finish_join(left, right_df, suffix)574}575576/// This is similar to a left-join except that we match on nearest key577/// rather than equal keys. The keys must be sorted to perform an asof join.578/// This is a special implementation of an asof join that searches for the579/// nearest keys within a subgroup set by `by`.580#[allow(clippy::too_many_arguments)]581fn join_asof_by<I, S>(582&self,583other: &DataFrame,584left_on: &str,585right_on: &str,586left_by: I,587right_by: I,588strategy: AsofStrategy,589tolerance: Option<AnyValue<'static>>,590allow_eq: bool,591check_sortedness: bool,592) -> PolarsResult<DataFrame>593where594I: IntoIterator<Item = S>,595S: AsRef<str>,596{597let self_df = self.to_df();598let left_by = left_by.into_iter().map(|s| s.as_ref().into()).collect();599let right_by = right_by.into_iter().map(|s| s.as_ref().into()).collect();600let left_key = self_df.column(left_on)?.as_materialized_series();601let right_key = other.column(right_on)?.as_materialized_series();602self_df._join_asof_by(603other,604left_key,605right_key,606left_by,607right_by,608strategy,609tolerance,610None,611None,612true,613allow_eq,614check_sortedness,615)616}617}618619impl AsofJoinBy for DataFrame {}620621#[cfg(test)]622mod test {623use super::*;624625#[test]626fn test_asof_by() -> PolarsResult<()> {627let a = df![628"a" => [-1, 2, 3, 3, 3, 4],629"b" => ["a", "b", "c", "d", "e", "f"]630]?;631632let b = df![633"a" => [1, 2, 3, 3],634"b" => ["a", "b", "c", "d"],635"right_vals" => [1, 2, 3, 4]636]?;637638let out = 
a.join_asof_by(639&b,640"a",641"a",642["b"],643["b"],644AsofStrategy::Backward,645None,646true,647true,648)?;649assert_eq!(out.get_column_names(), &["a", "b", "right_vals"]);650let out = out.column("right_vals").unwrap();651let out = out.i32().unwrap();652assert_eq!(653Vec::from(out),654&[None, Some(2), Some(3), Some(4), None, None]655);656Ok(())657}658659#[test]660fn test_asof_by2() -> PolarsResult<()> {661let trades = df![662"time" => [23i64, 38, 48, 48, 48],663"ticker" => ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],664"groups_numeric" => [1, 1, 2, 2, 3],665"bid" => [51.95, 51.95, 720.77, 720.92, 98.0]666]?;667668let quotes = df![669"time" => [23i64,67023,67130,67241,67348,67449,67572,67675],677"ticker" => ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"],678"groups_numeric" => [2, 1, 1, 1, 2, 3, 2, 1],679"bid" => [720.5, 51.95, 51.97, 51.99, 720.5, 97.99, 720.5, 52.01]680681]?;682683let out = trades.join_asof_by(684"es,685"time",686"time",687["ticker"],688["ticker"],689AsofStrategy::Backward,690None,691true,692true,693)?;694let a = out.column("bid_right").unwrap();695let a = a.f64().unwrap();696let expected = &[Some(51.95), Some(51.97), Some(720.5), Some(720.5), None];697698assert_eq!(Vec::from(a), expected);699700let out = trades.join_asof_by(701"es,702"time",703"time",704["groups_numeric"],705["groups_numeric"],706AsofStrategy::Backward,707None,708true,709true,710)?;711let a = out.column("bid_right").unwrap();712let a = a.f64().unwrap();713714assert_eq!(Vec::from(a), expected);715716Ok(())717}718719#[test]720fn test_asof_by3() -> PolarsResult<()> {721let a = df![722"a" => [ -1, 2, 2, 3, 3, 3, 4],723"b" => ["a", "a", "b", "c", "d", "e", "f"]724]?;725726let b = df![727"a" => [ 1, 3, 2, 3, 2],728"b" => ["a", "a", "b", "c", "d"],729"right_vals" => [ 1, 3, 2, 3, 4]730]?;731732let out = a.join_asof_by(733&b,734"a",735"a",736["b"],737["b"],738AsofStrategy::Forward,739None,740true,741true,742)?;743assert_eq!(out.get_column_names(), &["a", "b", 
"right_vals"]);744let out = out.column("right_vals").unwrap();745let out = out.i32().unwrap();746assert_eq!(747Vec::from(out),748&[Some(1), Some(3), Some(2), Some(3), None, None, None]749);750751let out = a.join_asof_by(752&b,753"a",754"a",755["b"],756["b"],757AsofStrategy::Forward,758Some(AnyValue::Int32(1)),759true,760true,761)?;762assert_eq!(out.get_column_names(), &["a", "b", "right_vals"]);763let out = out.column("right_vals").unwrap();764let out = out.i32().unwrap();765assert_eq!(766Vec::from(out),767&[None, Some(3), Some(2), Some(3), None, None, None]768);769770Ok(())771}772773#[test]774fn test_asof_by4() -> PolarsResult<()> {775let trades = df![776"time" => [23i64, 38, 48, 48, 48],777"ticker" => ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],778"groups_numeric" => [1, 1, 2, 2, 3],779"bid" => [51.95, 51.95, 720.77, 720.92, 98.0]780]?;781782let quotes = df![783"time" => [23i64, 23, 30, 41, 48, 49, 72, 75],784"ticker" => ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"],785"bid" => [720.5, 51.95, 51.97, 51.99, 720.5, 97.99, 720.5, 52.01],786"groups_numeric" => [2, 1, 1, 1, 2, 3, 2, 1],787788]?;789/*790trades:791shape: (5, 4)792┌──────┬────────┬────────────────┬────────┐793│ time ┆ ticker ┆ groups_numeric ┆ bid │794│ --- ┆ --- ┆ --- ┆ --- │795│ i64 ┆ str ┆ i32 ┆ f64 │796╞══════╪════════╪════════════════╪════════╡797│ 23 ┆ MSFT ┆ 1 ┆ 51.95 │798│ 38 ┆ MSFT ┆ 1 ┆ 51.95 │799│ 48 ┆ GOOG ┆ 2 ┆ 720.77 │800│ 48 ┆ GOOG ┆ 2 ┆ 720.92 │801│ 48 ┆ AAPL ┆ 3 ┆ 98.0 │802└──────┴────────┴────────────────┴────────┘803quotes:804shape: (8, 4)805┌──────┬────────┬───────┬────────────────┐806│ time ┆ ticker ┆ bid ┆ groups_numeric │807│ --- ┆ --- ┆ --- ┆ --- │808│ i64 ┆ str ┆ f64 ┆ i32 │809╞══════╪════════╪═══════╪════════════════╡810│ 23 ┆ GOOG ┆ 720.5 ┆ 2 │811│ 23 ┆ MSFT ┆ 51.95 ┆ 1 │812│ 30 ┆ MSFT ┆ 51.97 ┆ 1 │813│ 41 ┆ MSFT ┆ 51.99 ┆ 1 │814│ 48 ┆ GOOG ┆ 720.5 ┆ 2 │815│ 49 ┆ AAPL ┆ 97.99 ┆ 3 │816│ 72 ┆ GOOG ┆ 720.5 ┆ 2 │817│ 75 ┆ MSFT ┆ 52.01 ┆ 1 
│818└──────┴────────┴───────┴────────────────┘819*/820821let out = trades.join_asof_by(822"es,823"time",824"time",825["ticker"],826["ticker"],827AsofStrategy::Forward,828None,829true,830true,831)?;832let a = out.column("bid_right").unwrap();833let a = a.f64().unwrap();834let expected = &[835Some(51.95),836Some(51.99),837Some(720.5),838Some(720.5),839Some(97.99),840];841842assert_eq!(Vec::from(a), expected);843844let out = trades.join_asof_by(845"es,846"time",847"time",848["groups_numeric"],849["groups_numeric"],850AsofStrategy::Forward,851None,852true,853true,854)?;855let a = out.column("bid_right").unwrap();856let a = a.f64().unwrap();857858assert_eq!(Vec::from(a), expected);859860Ok(())861}862}863864865