// Path: blob/main/crates/polars-ops/src/frame/join/asof/groups.rs
// 8446 views
use std::hash::Hash;12use num_traits::Zero;3use polars_core::hashing::_HASHMAP_INIT_SIZE;4use polars_core::prelude::*;5use polars_core::series::BitRepr;6use polars_core::utils::flatten::flatten_nullable;7use polars_core::utils::split_and_flatten;8use polars_core::{POOL, with_match_physical_float_polars_type};9use polars_utils::abs_diff::AbsDiff;10use polars_utils::hashing::{DirtyHash, hash_to_partition};11use polars_utils::nulls::IsNull;12use polars_utils::total_ord::{ToTotalOrd, TotalEq, TotalHash};13use rayon::prelude::*;1415use super::*;16use crate::frame::join::{prepare_binary, prepare_keys_multiple};1718fn compute_len_offsets<I: IntoIterator<Item = usize>>(iter: I) -> Vec<usize> {19let mut cumlen = 0;20iter.into_iter()21.map(|l| {22let offset = cumlen;23cumlen += l;24offset25})26.collect()27}2829#[inline(always)]30fn materialize_nullable(idx: Option<IdxSize>) -> NullableIdxSize {31match idx {32Some(t) => NullableIdxSize::from(t),33None => NullableIdxSize::null(),34}35}3637fn asof_in_group<'a, T, A, F>(38left_val: T::Physical<'a>,39right_val_arr: &'a T::Array,40right_grp_idxs: &[IdxSize],41group_states: &mut PlHashMap<IdxSize, A>,42filter: F,43allow_eq: bool,44) -> Option<IdxSize>45where46T: PolarsDataType,47A: AsofJoinState<T::Physical<'a>>,48F: Fn(T::Physical<'a>, T::Physical<'a>) -> bool,49{50// We use the index of the first element in a group as an identifier to51// associate with the group state.52let id = right_grp_idxs.first()?;53let grp_state = group_states.entry(*id).or_insert_with(|| A::new(allow_eq));5455unsafe {56let r_grp_idx = grp_state.next(57&left_val,58|i| {59// SAFETY: the group indices are valid, and next() only calls with60// i < right_grp_idxs.len().61right_val_arr.get_unchecked(*right_grp_idxs.get_unchecked(i as usize) as usize)62},63right_grp_idxs.len() as IdxSize,64)?;6566// SAFETY: r_grp_idx is valid, as is r_idx (which must be non-null) if67// we get here.68let r_idx = *right_grp_idxs.get_unchecked(r_grp_idx as usize);69let right_val = 
right_val_arr.value_unchecked(r_idx as usize);70filter(left_val, right_val).then_some(r_idx)71}72}7374fn asof_join_by_numeric<T, S, A, F>(75by_left: &ChunkedArray<S>,76by_right: &ChunkedArray<S>,77left_asof: &ChunkedArray<T>,78right_asof: &ChunkedArray<T>,79filter: F,80allow_eq: bool,81) -> PolarsResult<IdxArr>82where83T: PolarsDataType,84S: PolarsNumericType,85S::Native: TotalHash + TotalEq + DirtyHash + ToTotalOrd,86<S::Native as ToTotalOrd>::TotalOrdItem: Send + Sync + Copy + Hash + Eq + DirtyHash + IsNull,87A: for<'a> AsofJoinState<T::Physical<'a>>,88F: Sync + for<'a> Fn(T::Physical<'a>, T::Physical<'a>) -> bool,89{90let (left_asof, right_asof) = POOL.join(|| left_asof.rechunk(), || right_asof.rechunk());91let left_val_arr = left_asof.downcast_as_array();92let right_val_arr = right_asof.downcast_as_array();9394let n_threads = POOL.current_num_threads();95// `strict` is false so that we always flatten. Even if there are more chunks than threads.96let split_by_left = split_and_flatten(by_left, n_threads);97let split_by_right = split_and_flatten(by_right, n_threads);98let offsets = compute_len_offsets(split_by_left.iter().map(|s| s.len()));99100// TODO: handle nulls more efficiently. 
Right now we just join on the value101// ignoring the validity mask, and ignore the nulls later.102let right_slices = split_by_right103.iter()104.map(|ca| {105assert_eq!(ca.chunks().len(), 1);106ca.downcast_iter().next().unwrap().values_iter().copied()107})108.collect();109let hash_tbls = build_tables(right_slices, false);110let n_tables = hash_tbls.len();111112// Now we probe the right hand side for each left hand side.113let out = split_by_left114.into_par_iter()115.zip(offsets)116.map(|(by_left, offset)| {117let mut results = Vec::with_capacity(by_left.len());118let mut group_states: PlHashMap<IdxSize, A> =119PlHashMap::with_capacity(_HASHMAP_INIT_SIZE);120121assert_eq!(by_left.chunks().len(), 1);122let by_left_chunk = by_left.downcast_iter().next().unwrap();123for (rel_idx_left, opt_by_left_k) in by_left_chunk.iter().enumerate() {124let Some(by_left_k) = opt_by_left_k else {125results.push(NullableIdxSize::null());126continue;127};128let by_left_k = by_left_k.to_total_ord();129let idx_left = (rel_idx_left + offset) as IdxSize;130let Some(left_val) = left_val_arr.get(idx_left as usize) else {131results.push(NullableIdxSize::null());132continue;133};134135let group_probe_table = unsafe {136hash_tbls.get_unchecked(hash_to_partition(by_left_k.dirty_hash(), n_tables))137};138let Some(right_grp_idxs) = group_probe_table.get(&by_left_k) else {139results.push(NullableIdxSize::null());140continue;141};142let id = asof_in_group::<T, A, &F>(143left_val,144right_val_arr,145right_grp_idxs.as_slice(),146&mut group_states,147&filter,148allow_eq,149);150results.push(materialize_nullable(id));151}152results153});154155let bufs = POOL.install(|| out.collect::<Vec<_>>());156Ok(flatten_nullable(&bufs))157}158159fn asof_join_by_binary<B, T, A, F>(160by_left: &ChunkedArray<B>,161by_right: &ChunkedArray<B>,162left_asof: &ChunkedArray<T>,163right_asof: &ChunkedArray<T>,164filter: F,165allow_eq: bool,166) -> IdxArr167where168B: PolarsDataType,169for<'b> <B::Array as 
StaticArray>::ValueT<'b>: AsRef<[u8]>,170T: PolarsDataType,171A: for<'a> AsofJoinState<T::Physical<'a>>,172F: Sync + for<'a> Fn(T::Physical<'a>, T::Physical<'a>) -> bool,173{174let (left_asof, right_asof) = POOL.join(|| left_asof.rechunk(), || right_asof.rechunk());175let left_val_arr = left_asof.downcast_as_array();176let right_val_arr = right_asof.downcast_as_array();177178let (prep_by_left, prep_by_right, _, _) = prepare_binary::<B>(by_left, by_right, false);179let offsets = compute_len_offsets(prep_by_left.iter().map(|s| s.len()));180let hash_tbls = build_tables(prep_by_right, false);181let n_tables = hash_tbls.len();182183// Now we probe the right hand side for each left hand side.184let iter = prep_by_left185.into_par_iter()186.zip(offsets)187.map(|(by_left, offset)| {188let mut results = Vec::with_capacity(by_left.len());189let mut group_states: PlHashMap<_, A> = PlHashMap::with_capacity(_HASHMAP_INIT_SIZE);190191for (rel_idx_left, by_left_k) in by_left.iter().enumerate() {192let idx_left = (rel_idx_left + offset) as IdxSize;193let Some(left_val) = left_val_arr.get(idx_left as usize) else {194results.push(NullableIdxSize::null());195continue;196};197198let group_probe_table = unsafe {199hash_tbls.get_unchecked(hash_to_partition(by_left_k.dirty_hash(), n_tables))200};201let Some(right_grp_idxs) = group_probe_table.get(by_left_k) else {202results.push(NullableIdxSize::null());203continue;204};205let id = asof_in_group::<T, A, &F>(206left_val,207right_val_arr,208right_grp_idxs.as_slice(),209&mut group_states,210&filter,211allow_eq,212);213214results.push(materialize_nullable(id));215}216results217});218let bufs = POOL.install(|| iter.collect::<Vec<_>>());219flatten_nullable(&bufs)220}221222#[allow(clippy::too_many_arguments)]223fn dispatch_join_by_type<T, A, F>(224left_asof: &ChunkedArray<T>,225right_asof: &ChunkedArray<T>,226left_by: &mut DataFrame,227right_by: &mut DataFrame,228filter: F,229allow_eq: bool,230) -> PolarsResult<IdxArr>231where232T: 
PolarsDataType,233A: for<'a> AsofJoinState<T::Physical<'a>>,234F: Sync + for<'a> Fn(T::Physical<'a>, T::Physical<'a>) -> bool,235{236let out = if left_by.width() == 1 {237let left_by_s = left_by.columns()[0].to_physical_repr();238let right_by_s = right_by.columns()[0].to_physical_repr();239let left_dtype = left_by_s.dtype();240let right_dtype = right_by_s.dtype();241polars_ensure!(left_dtype == right_dtype,242ComputeError: "mismatching dtypes in 'by' parameter of asof-join: `{left_dtype}` and `{right_dtype}`",243);244match left_dtype {245DataType::String => {246let left_by = &left_by_s.str().unwrap().as_binary();247let right_by = right_by_s.str().unwrap().as_binary();248asof_join_by_binary::<BinaryType, T, A, F>(249left_by, &right_by, left_asof, right_asof, filter, allow_eq,250)251},252DataType::Binary => {253let left_by = &left_by_s.binary().unwrap();254let right_by = right_by_s.binary().unwrap();255asof_join_by_binary::<BinaryType, T, A, F>(256left_by, right_by, left_asof, right_asof, filter, allow_eq,257)258},259x if x.is_float() => {260with_match_physical_float_polars_type!(left_by_s.dtype(), |$T| {261let left_by: &ChunkedArray<$T> = left_by_s.as_materialized_series().as_ref().as_ref().as_ref();262let right_by: &ChunkedArray<$T> = right_by_s.as_materialized_series().as_ref().as_ref().as_ref();263asof_join_by_numeric::<T, $T, A, F>(264left_by, right_by, left_asof, right_asof, filter, allow_eq265)?266})267},268_ => {269let left_by = left_by_s.bit_repr();270let right_by = right_by_s.bit_repr();271272let (Some(left_by), Some(right_by)) = (left_by, right_by) else {273polars_bail!(nyi = "Dispatch join for {left_dtype} and {right_dtype}");274};275276use BitRepr as B;277match (left_by, right_by) {278(B::U8(left_by), B::U8(right_by)) => {279asof_join_by_numeric::<T, UInt8Type, A, F>(280&left_by, &right_by, left_asof, right_asof, filter, allow_eq,281)?282},283(B::U16(left_by), B::U16(right_by)) => {284asof_join_by_numeric::<T, UInt16Type, A, F>(285&left_by, &right_by, 
left_asof, right_asof, filter, allow_eq,286)?287},288(B::U32(left_by), B::U32(right_by)) => {289asof_join_by_numeric::<T, UInt32Type, A, F>(290&left_by, &right_by, left_asof, right_asof, filter, allow_eq,291)?292},293(B::U64(left_by), B::U64(right_by)) => {294asof_join_by_numeric::<T, UInt64Type, A, F>(295&left_by, &right_by, left_asof, right_asof, filter, allow_eq,296)?297},298#[cfg(feature = "dtype-u128")]299(B::U128(left_by), B::U128(right_by)) => {300asof_join_by_numeric::<T, UInt128Type, A, F>(301&left_by, &right_by, left_asof, right_asof, filter, allow_eq,302)?303},304// We have already asserted that the datatypes are the same.305_ => unreachable!(),306}307},308}309} else {310for (lhs, rhs) in left_by.columns().iter().zip(right_by.columns()) {311polars_ensure!(lhs.dtype() == rhs.dtype(),312ComputeError: "mismatching dtypes in 'by' parameter of asof-join: `{}` and `{}`", lhs.dtype(), rhs.dtype()313);314}315316// TODO: @scalar-opt.317let left_by_series: Vec<_> = left_by.materialized_column_iter().cloned().collect();318let right_by_series: Vec<_> = right_by.materialized_column_iter().cloned().collect();319let lhs_keys = prepare_keys_multiple(&left_by_series, false)?;320let rhs_keys = prepare_keys_multiple(&right_by_series, false)?;321asof_join_by_binary::<BinaryOffsetType, T, A, F>(322&lhs_keys, &rhs_keys, left_asof, right_asof, filter, allow_eq,323)324};325Ok(out)326}327328#[allow(clippy::too_many_arguments)]329fn dispatch_join_strategy<T: PolarsDataType>(330left_asof: &ChunkedArray<T>,331right_asof: &Series,332left_by: &mut DataFrame,333right_by: &mut DataFrame,334strategy: AsofStrategy,335allow_eq: bool,336) -> PolarsResult<IdxArr>337where338for<'a> T::Physical<'a>: TotalOrd,339{340let right_asof = left_asof.unpack_series_matching_type(right_asof)?;341342let filter = |_a: T::Physical<'_>, _b: T::Physical<'_>| true;343match strategy {344AsofStrategy::Backward => dispatch_join_by_type::<T, AsofJoinBackwardState, _>(345left_asof, right_asof, left_by, right_by, 
filter, allow_eq,346),347AsofStrategy::Forward => dispatch_join_by_type::<T, AsofJoinForwardState, _>(348left_asof, right_asof, left_by, right_by, filter, allow_eq,349),350AsofStrategy::Nearest => unimplemented!(),351}352}353354#[allow(clippy::too_many_arguments)]355fn dispatch_join_strategy_numeric<T: PolarsNumericType>(356left_asof: &ChunkedArray<T>,357right_asof: &Series,358left_by: &mut DataFrame,359right_by: &mut DataFrame,360strategy: AsofStrategy,361tolerance: Option<AnyValue<'static>>,362allow_eq: bool,363) -> PolarsResult<IdxArr> {364let right_ca = left_asof.unpack_series_matching_type(right_asof)?;365366if let Some(tol) = tolerance {367let native_tolerance: T::Native = tol.try_extract()?;368let abs_tolerance = native_tolerance.abs_diff(T::Native::zero());369let filter = |a: T::Native, b: T::Native| a.abs_diff(b) <= abs_tolerance;370match strategy {371AsofStrategy::Backward => dispatch_join_by_type::<T, AsofJoinBackwardState, _>(372left_asof, right_ca, left_by, right_by, filter, allow_eq,373),374AsofStrategy::Forward => dispatch_join_by_type::<T, AsofJoinForwardState, _>(375left_asof, right_ca, left_by, right_by, filter, allow_eq,376),377AsofStrategy::Nearest => dispatch_join_by_type::<T, AsofJoinNearestState, _>(378left_asof, right_ca, left_by, right_by, filter, allow_eq,379),380}381} else {382let filter = |_a: T::Physical<'_>, _b: T::Physical<'_>| true;383match strategy {384AsofStrategy::Backward => dispatch_join_by_type::<T, AsofJoinBackwardState, _>(385left_asof, right_ca, left_by, right_by, filter, allow_eq,386),387AsofStrategy::Forward => dispatch_join_by_type::<T, AsofJoinForwardState, _>(388left_asof, right_ca, left_by, right_by, filter, allow_eq,389),390AsofStrategy::Nearest => dispatch_join_by_type::<T, AsofJoinNearestState, _>(391left_asof, right_ca, left_by, right_by, filter, allow_eq,392),393}394}395}396397#[allow(clippy::too_many_arguments)]398fn dispatch_join_type(399left_asof: &Series,400right_asof: &Series,401left_by: &mut 
DataFrame,402right_by: &mut DataFrame,403strategy: AsofStrategy,404tolerance: Option<AnyValue<'static>>,405allow_eq: bool,406) -> PolarsResult<IdxArr> {407match left_asof.dtype() {408DataType::Int64 => {409let ca = left_asof.i64().unwrap();410dispatch_join_strategy_numeric(411ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,412)413},414DataType::Int32 => {415let ca = left_asof.i32().unwrap();416dispatch_join_strategy_numeric(417ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,418)419},420DataType::UInt64 => {421let ca = left_asof.u64().unwrap();422dispatch_join_strategy_numeric(423ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,424)425},426DataType::UInt32 => {427let ca = left_asof.u32().unwrap();428dispatch_join_strategy_numeric(429ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,430)431},432#[cfg(feature = "dtype-i128")]433DataType::Int128 => {434let ca = left_asof.i128().unwrap();435dispatch_join_strategy_numeric(436ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,437)438},439#[cfg(feature = "dtype-u128")]440DataType::UInt128 => {441let ca = left_asof.u128().unwrap();442dispatch_join_strategy_numeric(443ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,444)445},446#[cfg(feature = "dtype-f16")]447DataType::Float16 => {448let ca = left_asof.f16().unwrap();449dispatch_join_strategy_numeric(450ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,451)452},453DataType::Float32 => {454let ca = left_asof.f32().unwrap();455dispatch_join_strategy_numeric(456ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,457)458},459DataType::Float64 => {460let ca = left_asof.f64().unwrap();461dispatch_join_strategy_numeric(462ca, right_asof, left_by, right_by, strategy, tolerance, allow_eq,463)464},465DataType::Boolean => {466let ca = left_asof.bool().unwrap();467dispatch_join_strategy::<BooleanType>(468ca, right_asof, left_by, right_by, strategy, 
allow_eq,469)470},471DataType::Binary => {472let ca = left_asof.binary().unwrap();473dispatch_join_strategy::<BinaryType>(474ca, right_asof, left_by, right_by, strategy, allow_eq,475)476},477DataType::String => {478let ca = left_asof.str().unwrap();479let right_binary = right_asof.cast(&DataType::Binary).unwrap();480dispatch_join_strategy::<BinaryType>(481&ca.as_binary(),482&right_binary,483left_by,484right_by,485strategy,486allow_eq,487)488},489DataType::Int8 | DataType::UInt8 | DataType::Int16 | DataType::UInt16 => {490let left_asof = left_asof.cast(&DataType::Int32).unwrap();491let right_asof = right_asof.cast(&DataType::Int32).unwrap();492let ca = left_asof.i32().unwrap();493dispatch_join_strategy_numeric(494ca,495&right_asof,496left_by,497right_by,498strategy,499tolerance,500allow_eq,501)502},503dt => polars_bail!(opq = asof_join, dt),504}505}506507pub trait AsofJoinBy: IntoDf {508#[allow(clippy::too_many_arguments)]509#[doc(hidden)]510fn _join_asof_by(511&self,512other: &DataFrame,513left_on: &Series,514right_on: &Series,515left_by: Vec<PlSmallStr>,516right_by: Vec<PlSmallStr>,517strategy: AsofStrategy,518tolerance: Option<AnyValue<'static>>,519suffix: Option<PlSmallStr>,520slice: Option<(i64, usize)>,521coalesce: bool,522allow_eq: bool,523check_sortedness: bool,524) -> PolarsResult<DataFrame> {525let (self_sliced_slot, left_slice_s); // Keeps temporaries alive.526let (self_df, other_df, left_key, right_key);527if let Some((offset, len)) = slice {528self_sliced_slot = self.to_df().slice(offset, len);529left_slice_s = left_on.slice(offset, len);530left_key = &left_slice_s;531right_key = right_on;532self_df = &self_sliced_slot;533other_df = other;534} else {535self_df = self.to_df();536other_df = other;537left_key = left_on;538right_key = right_on;539}540541let left_asof = left_key.to_physical_repr();542let right_asof = right_key.to_physical_repr();543let right_asof_name = right_asof.name();544let left_asof_name = 
left_asof.name();545check_asof_columns(546&left_asof,547&right_asof,548tolerance.is_some(),549check_sortedness,550!(left_by.is_empty() && right_by.is_empty()),551)?;552553let mut left_by = self_df.select(left_by)?;554let mut right_by = other_df.select(right_by)?;555556for (l, r) in unsafe { left_by.columns_mut() }557.iter_mut()558.zip(unsafe { right_by.columns_mut() }.iter_mut())559{560*l = l.to_physical_repr();561*r = r.to_physical_repr();562}563564let right_join_tuples = dispatch_join_type(565&left_asof,566&right_asof,567&mut left_by,568&mut right_by,569strategy,570tolerance,571allow_eq,572)?;573574let mut drop_these = right_by.get_column_names();575if coalesce && left_asof_name == right_asof_name {576drop_these.push(right_asof_name);577}578579let cols = other_df580.columns()581.iter()582.filter(|s| !drop_these.contains(&s.name()))583.cloned()584.collect();585let proj_other_df = unsafe { DataFrame::new_unchecked(other_df.height(), cols) };586587let left = self_df.clone();588589// SAFETY: join tuples are in bounds.590let right_df = unsafe {591proj_other_df.take_unchecked(&IdxCa::with_chunk(PlSmallStr::EMPTY, right_join_tuples))592};593594_finish_join(left, right_df, suffix)595}596597/// This is similar to a left-join except that we match on nearest key598/// rather than equal keys. 
The keys must be sorted to perform an asof join.599/// This is a special implementation of an asof join that searches for the600/// nearest keys within a subgroup set by `by`.601#[allow(clippy::too_many_arguments)]602fn join_asof_by<I, S>(603&self,604other: &DataFrame,605left_on: &str,606right_on: &str,607left_by: I,608right_by: I,609strategy: AsofStrategy,610tolerance: Option<AnyValue<'static>>,611allow_eq: bool,612check_sortedness: bool,613) -> PolarsResult<DataFrame>614where615I: IntoIterator<Item = S>,616S: AsRef<str>,617{618let self_df = self.to_df();619let left_by = left_by.into_iter().map(|s| s.as_ref().into()).collect();620let right_by = right_by.into_iter().map(|s| s.as_ref().into()).collect();621let left_key = self_df.column(left_on)?.as_materialized_series();622let right_key = other.column(right_on)?.as_materialized_series();623self_df._join_asof_by(624other,625left_key,626right_key,627left_by,628right_by,629strategy,630tolerance,631None,632None,633true,634allow_eq,635check_sortedness,636)637}638}639640impl AsofJoinBy for DataFrame {}641642#[cfg(test)]643mod test {644use super::*;645646#[test]647fn test_asof_by() -> PolarsResult<()> {648let a = df![649"a" => [-1, 2, 3, 3, 3, 4],650"b" => ["a", "b", "c", "d", "e", "f"]651]?;652653let b = df![654"a" => [1, 2, 3, 3],655"b" => ["a", "b", "c", "d"],656"right_vals" => [1, 2, 3, 4]657]?;658659let out = a.join_asof_by(660&b,661"a",662"a",663["b"],664["b"],665AsofStrategy::Backward,666None,667true,668true,669)?;670assert_eq!(out.get_column_names(), &["a", "b", "right_vals"]);671let out = out.column("right_vals").unwrap();672let out = out.i32().unwrap();673assert_eq!(674Vec::from(out),675&[None, Some(2), Some(3), Some(4), None, None]676);677Ok(())678}679680#[test]681fn test_asof_by2() -> PolarsResult<()> {682let trades = df![683"time" => [23i64, 38, 48, 48, 48],684"ticker" => ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],685"groups_numeric" => [1, 1, 2, 2, 3],686"bid" => [51.95, 51.95, 720.77, 720.92, 
98.0]687]?;688689let quotes = df![690"time" => [23i64,69123,69230,69341,69448,69549,69672,69775],698"ticker" => ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"],699"groups_numeric" => [2, 1, 1, 1, 2, 3, 2, 1],700"bid" => [720.5, 51.95, 51.97, 51.99, 720.5, 97.99, 720.5, 52.01]701702]?;703704let out = trades.join_asof_by(705"es,706"time",707"time",708["ticker"],709["ticker"],710AsofStrategy::Backward,711None,712true,713true,714)?;715let a = out.column("bid_right").unwrap();716let a = a.f64().unwrap();717let expected = &[Some(51.95), Some(51.97), Some(720.5), Some(720.5), None];718719assert_eq!(Vec::from(a), expected);720721let out = trades.join_asof_by(722"es,723"time",724"time",725["groups_numeric"],726["groups_numeric"],727AsofStrategy::Backward,728None,729true,730true,731)?;732let a = out.column("bid_right").unwrap();733let a = a.f64().unwrap();734735assert_eq!(Vec::from(a), expected);736737Ok(())738}739740#[test]741fn test_asof_by3() -> PolarsResult<()> {742let a = df![743"a" => [ -1, 2, 2, 3, 3, 3, 4],744"b" => ["a", "a", "b", "c", "d", "e", "f"]745]?;746747let b = df![748"a" => [ 1, 3, 2, 3, 2],749"b" => ["a", "a", "b", "c", "d"],750"right_vals" => [ 1, 3, 2, 3, 4]751]?;752753let out = a.join_asof_by(754&b,755"a",756"a",757["b"],758["b"],759AsofStrategy::Forward,760None,761true,762true,763)?;764assert_eq!(out.get_column_names(), &["a", "b", "right_vals"]);765let out = out.column("right_vals").unwrap();766let out = out.i32().unwrap();767assert_eq!(768Vec::from(out),769&[Some(1), Some(3), Some(2), Some(3), None, None, None]770);771772let out = a.join_asof_by(773&b,774"a",775"a",776["b"],777["b"],778AsofStrategy::Forward,779Some(AnyValue::Int32(1)),780true,781true,782)?;783assert_eq!(out.get_column_names(), &["a", "b", "right_vals"]);784let out = out.column("right_vals").unwrap();785let out = out.i32().unwrap();786assert_eq!(787Vec::from(out),788&[None, Some(3), Some(2), Some(3), None, None, None]789);790791Ok(())792}793794#[test]795fn 
test_asof_by4() -> PolarsResult<()> {796let trades = df![797"time" => [23i64, 38, 48, 48, 48],798"ticker" => ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],799"groups_numeric" => [1, 1, 2, 2, 3],800"bid" => [51.95, 51.95, 720.77, 720.92, 98.0]801]?;802803let quotes = df![804"time" => [23i64, 23, 30, 41, 48, 49, 72, 75],805"ticker" => ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"],806"bid" => [720.5, 51.95, 51.97, 51.99, 720.5, 97.99, 720.5, 52.01],807"groups_numeric" => [2, 1, 1, 1, 2, 3, 2, 1],808809]?;810/*811trades:812shape: (5, 4)813┌──────┬────────┬────────────────┬────────┐814│ time ┆ ticker ┆ groups_numeric ┆ bid │815│ --- ┆ --- ┆ --- ┆ --- │816│ i64 ┆ str ┆ i32 ┆ f64 │817╞══════╪════════╪════════════════╪════════╡818│ 23 ┆ MSFT ┆ 1 ┆ 51.95 │819│ 38 ┆ MSFT ┆ 1 ┆ 51.95 │820│ 48 ┆ GOOG ┆ 2 ┆ 720.77 │821│ 48 ┆ GOOG ┆ 2 ┆ 720.92 │822│ 48 ┆ AAPL ┆ 3 ┆ 98.0 │823└──────┴────────┴────────────────┴────────┘824quotes:825shape: (8, 4)826┌──────┬────────┬───────┬────────────────┐827│ time ┆ ticker ┆ bid ┆ groups_numeric │828│ --- ┆ --- ┆ --- ┆ --- │829│ i64 ┆ str ┆ f64 ┆ i32 │830╞══════╪════════╪═══════╪════════════════╡831│ 23 ┆ GOOG ┆ 720.5 ┆ 2 │832│ 23 ┆ MSFT ┆ 51.95 ┆ 1 │833│ 30 ┆ MSFT ┆ 51.97 ┆ 1 │834│ 41 ┆ MSFT ┆ 51.99 ┆ 1 │835│ 48 ┆ GOOG ┆ 720.5 ┆ 2 │836│ 49 ┆ AAPL ┆ 97.99 ┆ 3 │837│ 72 ┆ GOOG ┆ 720.5 ┆ 2 │838│ 75 ┆ MSFT ┆ 52.01 ┆ 1 │839└──────┴────────┴───────┴────────────────┘840*/841842let out = trades.join_asof_by(843"es,844"time",845"time",846["ticker"],847["ticker"],848AsofStrategy::Forward,849None,850true,851true,852)?;853let a = out.column("bid_right").unwrap();854let a = a.f64().unwrap();855let expected = &[856Some(51.95),857Some(51.99),858Some(720.5),859Some(720.5),860Some(97.99),861];862863assert_eq!(Vec::from(a), expected);864865let out = trades.join_asof_by(866"es,867"time",868"time",869["groups_numeric"],870["groups_numeric"],871AsofStrategy::Forward,872None,873true,874true,875)?;876let a = out.column("bid_right").unwrap();877let a = 
a.f64().unwrap();878879assert_eq!(Vec::from(a), expected);880881Ok(())882}883}884885886