Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-ops/src/frame/join/general.rs
6940 views
1
use polars_utils::format_pl_smallstr;
2
3
use super::*;
4
use crate::series::coalesce_columns;
5
6
pub fn _join_suffix_name(name: &str, suffix: &str) -> PlSmallStr {
7
format_pl_smallstr!("{name}{suffix}")
8
}
9
10
fn get_suffix(suffix: Option<PlSmallStr>) -> PlSmallStr {
11
suffix.unwrap_or_else(|| PlSmallStr::from_static("_right"))
12
}
13
14
/// Renames the columns on the right to not clash with the left using a specified or otherwise default suffix
15
/// and then merges the right dataframe into the left
16
#[doc(hidden)]
17
pub fn _finish_join(
18
mut df_left: DataFrame,
19
mut df_right: DataFrame,
20
suffix: Option<PlSmallStr>,
21
) -> PolarsResult<DataFrame> {
22
let mut left_names = PlHashSet::with_capacity(df_left.width());
23
24
df_left.get_columns().iter().for_each(|series| {
25
left_names.insert(series.name());
26
});
27
28
let mut rename_strs = Vec::with_capacity(df_right.width());
29
let right_names = df_right.schema();
30
31
for name in right_names.iter_names() {
32
if left_names.contains(name) {
33
rename_strs.push(name.clone())
34
}
35
}
36
37
let suffix = get_suffix(suffix);
38
39
for name in rename_strs {
40
let new_name = _join_suffix_name(name.as_str(), suffix.as_str());
41
// Safety: IR resolving should guarantee this passes
42
df_right.rename(&name, new_name.clone()).unwrap();
43
}
44
45
drop(left_names);
46
// Safety: IR resolving should guarantee this passes
47
unsafe { df_left.hstack_mut_unchecked(df_right.get_columns()) };
48
Ok(df_left)
49
}
50
51
pub fn _coalesce_full_join(
52
mut df: DataFrame,
53
keys_left: &[PlSmallStr],
54
keys_right: &[PlSmallStr],
55
suffix: Option<PlSmallStr>,
56
df_left: &DataFrame,
57
) -> DataFrame {
58
// No need to allocate the schema because we already
59
// know for certain that the column name for left is `name`
60
// and for right is `name + suffix`
61
let schema_left = if keys_left == keys_right {
62
Arc::new(Schema::default())
63
} else {
64
df_left.schema().clone()
65
};
66
67
let schema = df.schema().clone();
68
let mut to_remove = Vec::with_capacity(keys_right.len());
69
70
// SAFETY: we maintain invariants.
71
let columns = unsafe { df.get_columns_mut() };
72
let suffix = get_suffix(suffix);
73
for (l, r) in keys_left.iter().zip(keys_right.iter()) {
74
let pos_l = schema.get_full(l.as_str()).unwrap().0;
75
76
let r = if l == r || schema_left.contains(r.as_str()) {
77
_join_suffix_name(r.as_str(), suffix.as_str())
78
} else {
79
r.clone()
80
};
81
let pos_r = schema.get_full(&r).unwrap().0;
82
83
let l = columns[pos_l].clone();
84
let r = columns[pos_r].clone();
85
86
columns[pos_l] = coalesce_columns(&[l, r]).unwrap();
87
to_remove.push(pos_r);
88
}
89
// sort in reverse order, so the indexes remain correct if we remove.
90
to_remove.sort_by(|a, b| b.cmp(a));
91
for pos in to_remove {
92
let _ = columns.remove(pos);
93
}
94
df.clear_schema();
95
df
96
}
97
98
#[cfg(feature = "chunked_ids")]
99
pub(crate) fn create_chunked_index_mapping(chunks: &[ArrayRef], len: usize) -> Vec<ChunkId> {
100
let mut vals = Vec::with_capacity(len);
101
102
for (chunk_i, chunk) in chunks.iter().enumerate() {
103
vals.extend(
104
(0..chunk.len()).map(|array_i| ChunkId::store(chunk_i as IdxSize, array_i as IdxSize)),
105
)
106
}
107
108
vals
109
}
110
111