Path: blob/main/crates/polars-ops/src/frame/join/hash_join/mod.rs
6940 views
#![allow(unsafe_op_in_unsafe_fn)]1pub(super) mod single_keys;2mod single_keys_dispatch;3mod single_keys_inner;4mod single_keys_left;5mod single_keys_outer;6#[cfg(feature = "semi_anti_join")]7mod single_keys_semi_anti;8pub(super) mod sort_merge;9use arrow::array::ArrayRef;10use polars_core::POOL;11use polars_core::utils::_set_partition_size;12use polars_utils::index::ChunkId;13pub(super) use single_keys::*;14pub use single_keys_dispatch::SeriesJoin;15#[cfg(feature = "asof_join")]16pub(super) use single_keys_dispatch::prepare_binary;17use single_keys_inner::*;18use single_keys_left::*;19use single_keys_outer::*;20#[cfg(feature = "semi_anti_join")]21use single_keys_semi_anti::*;22pub(crate) use sort_merge::*;2324pub use super::*;25#[cfg(feature = "chunked_ids")]26use crate::chunked_array::gather::chunked::TakeChunkedHorPar;2728pub fn default_join_ids() -> ChunkJoinOptIds {29#[cfg(feature = "chunked_ids")]30{31Either::Left(vec![])32}33#[cfg(not(feature = "chunked_ids"))]34{35vec![]36}37}3839macro_rules! det_hash_prone_order {40($self:expr, $other:expr) => {{41// The shortest relation will be used to create a hash table.42if $self.len() > $other.len() {43($self, $other, false)44} else {45($other, $self, true)46}47}};48}4950#[cfg(feature = "performant")]51use arrow::legacy::conversion::primitive_to_vec;52pub(super) use det_hash_prone_order;5354pub trait JoinDispatch: IntoDf {55/// # Safety56/// Join tuples must be in bounds57#[cfg(feature = "chunked_ids")]58unsafe fn create_left_df_chunked(59&self,60chunk_ids: &[ChunkId],61left_join: bool,62was_sliced: bool,63) -> DataFrame {64let df_self = self.to_df();6566let left_join_no_duplicate_matches =67left_join && !was_sliced && chunk_ids.len() == df_self.height();6869if left_join_no_duplicate_matches {70df_self.clone()71} else {72// left join keys are in ascending order73let sorted = if left_join {74IsSorted::Ascending75} else {76IsSorted::Not77};78df_self._take_chunked_unchecked_hor_par(chunk_ids, sorted)79}80}8182/// # Safety83/// Join tuples must be in bounds84unsafe fn _create_left_df_from_slice(85&self,86join_tuples: &[IdxSize],87left_join: bool,88was_sliced: bool,89sorted_tuple_idx: bool,90) -> DataFrame {91let df_self = self.to_df();9293let left_join_no_duplicate_matches =94sorted_tuple_idx && left_join && !was_sliced && join_tuples.len() == df_self.height();9596if left_join_no_duplicate_matches {97df_self.clone()98} else {99let sorted = if sorted_tuple_idx {100IsSorted::Ascending101} else {102IsSorted::Not103};104105df_self._take_unchecked_slice_sorted(join_tuples, true, sorted)106}107}108109#[cfg(feature = "semi_anti_join")]110/// # Safety111/// `idx` must be in bounds112unsafe fn _finish_anti_semi_join(113&self,114mut idx: &[IdxSize],115slice: Option<(i64, usize)>,116) -> DataFrame {117let ca_self = self.to_df();118if let Some((offset, len)) = slice {119idx = slice_slice(idx, offset, len);120}121// idx from anti-semi join should always be sorted122ca_self._take_unchecked_slice_sorted(idx, true, IsSorted::Ascending)123}124125#[cfg(feature = "semi_anti_join")]126fn _semi_anti_join_from_series(127&self,128s_left: &Series,129s_right: &Series,130slice: Option<(i64, usize)>,131anti: bool,132nulls_equal: bool,133) -> PolarsResult<DataFrame> {134let ca_self = self.to_df();135136let idx = s_left.hash_join_semi_anti(s_right, anti, nulls_equal)?;137// SAFETY:138// indices are in bounds139Ok(unsafe { ca_self._finish_anti_semi_join(&idx, slice) })140}141fn _full_join_from_series(142&self,143other: &DataFrame,144s_left: &Series,145s_right: &Series,146args: JoinArgs,147) -> PolarsResult<DataFrame> {148let df_self = self.to_df();149150// Get the indexes of the joined relations151let (mut join_idx_l, mut join_idx_r) =152s_left.hash_join_outer(s_right, args.validation, args.nulls_equal)?;153154try_raise_keyboard_interrupt();155if let Some((offset, len)) = args.slice {156let (offset, len) = slice_offsets(offset, len, join_idx_l.len());157join_idx_l.slice(offset, len);158join_idx_r.slice(offset, len);159}160let idx_ca_l = IdxCa::with_chunk("a".into(), join_idx_l);161let idx_ca_r = IdxCa::with_chunk("b".into(), join_idx_r);162163let (df_left, df_right) = if args.maintain_order != MaintainOrderJoin::None {164let mut df = DataFrame::new(vec![165idx_ca_l.into_series().into(),166idx_ca_r.into_series().into(),167])?;168169let options = SortMultipleOptions::new()170.with_order_descending(false)171.with_maintain_order(true)172.with_nulls_last(true);173174let columns = match args.maintain_order {175MaintainOrderJoin::Left => vec!["a"],176MaintainOrderJoin::LeftRight => vec!["a", "b"],177MaintainOrderJoin::Right => vec!["b"],178MaintainOrderJoin::RightLeft => vec!["b", "a"],179_ => unreachable!(),180};181182df.sort_in_place(columns, options)?;183184let join_tuples_left = df.column("a").unwrap().idx().unwrap();185let join_tuples_right = df.column("b").unwrap().idx().unwrap();186POOL.join(187|| unsafe { df_self.take_unchecked(join_tuples_left) },188|| unsafe { other.take_unchecked(join_tuples_right) },189)190} else {191POOL.join(192|| unsafe { df_self.take_unchecked(&idx_ca_l) },193|| unsafe { other.take_unchecked(&idx_ca_r) },194)195};196197let coalesce = args.coalesce.coalesce(&JoinType::Full);198let out = _finish_join(df_left, df_right, args.suffix.clone());199if coalesce {200Ok(_coalesce_full_join(201out?,202&[s_left.name().clone()],203&[s_right.name().clone()],204args.suffix,205df_self,206))207} else {208out209}210}211}212213impl JoinDispatch for DataFrame {}214215216