Path: blob/main/crates/polars-ops/src/frame/join/general.rs
6940 views
use polars_utils::format_pl_smallstr;12use super::*;3use crate::series::coalesce_columns;45pub fn _join_suffix_name(name: &str, suffix: &str) -> PlSmallStr {6format_pl_smallstr!("{name}{suffix}")7}89fn get_suffix(suffix: Option<PlSmallStr>) -> PlSmallStr {10suffix.unwrap_or_else(|| PlSmallStr::from_static("_right"))11}1213/// Renames the columns on the right to not clash with the left using a specified or otherwise default suffix14/// and then merges the right dataframe into the left15#[doc(hidden)]16pub fn _finish_join(17mut df_left: DataFrame,18mut df_right: DataFrame,19suffix: Option<PlSmallStr>,20) -> PolarsResult<DataFrame> {21let mut left_names = PlHashSet::with_capacity(df_left.width());2223df_left.get_columns().iter().for_each(|series| {24left_names.insert(series.name());25});2627let mut rename_strs = Vec::with_capacity(df_right.width());28let right_names = df_right.schema();2930for name in right_names.iter_names() {31if left_names.contains(name) {32rename_strs.push(name.clone())33}34}3536let suffix = get_suffix(suffix);3738for name in rename_strs {39let new_name = _join_suffix_name(name.as_str(), suffix.as_str());40// Safety: IR resolving should guarantee this passes41df_right.rename(&name, new_name.clone()).unwrap();42}4344drop(left_names);45// Safety: IR resolving should guarantee this passes46unsafe { df_left.hstack_mut_unchecked(df_right.get_columns()) };47Ok(df_left)48}4950pub fn _coalesce_full_join(51mut df: DataFrame,52keys_left: &[PlSmallStr],53keys_right: &[PlSmallStr],54suffix: Option<PlSmallStr>,55df_left: &DataFrame,56) -> DataFrame {57// No need to allocate the schema because we already58// know for certain that the column name for left is `name`59// and for right is `name + suffix`60let schema_left = if keys_left == keys_right {61Arc::new(Schema::default())62} else {63df_left.schema().clone()64};6566let schema = df.schema().clone();67let mut to_remove = Vec::with_capacity(keys_right.len());6869// SAFETY: we maintain invariants.70let columns = unsafe { df.get_columns_mut() };71let suffix = get_suffix(suffix);72for (l, r) in keys_left.iter().zip(keys_right.iter()) {73let pos_l = schema.get_full(l.as_str()).unwrap().0;7475let r = if l == r || schema_left.contains(r.as_str()) {76_join_suffix_name(r.as_str(), suffix.as_str())77} else {78r.clone()79};80let pos_r = schema.get_full(&r).unwrap().0;8182let l = columns[pos_l].clone();83let r = columns[pos_r].clone();8485columns[pos_l] = coalesce_columns(&[l, r]).unwrap();86to_remove.push(pos_r);87}88// sort in reverse order, so the indexes remain correct if we remove.89to_remove.sort_by(|a, b| b.cmp(a));90for pos in to_remove {91let _ = columns.remove(pos);92}93df.clear_schema();94df95}9697#[cfg(feature = "chunked_ids")]98pub(crate) fn create_chunked_index_mapping(chunks: &[ArrayRef], len: usize) -> Vec<ChunkId> {99let mut vals = Vec::with_capacity(len);100101for (chunk_i, chunk) in chunks.iter().enumerate() {102vals.extend(103(0..chunk.len()).map(|array_i| ChunkId::store(chunk_i as IdxSize, array_i as IdxSize)),104)105}106107vals108}109110111