// Source: GitHub repository pola-rs/polars
// Path: blob/main/crates/polars-mem-engine/src/executors/join.rs
1
use polars_ops::frame::DataFrameJoinOps;
2
3
use super::*;
4
5
pub struct JoinExec {
6
input_left: Option<Box<dyn Executor>>,
7
input_right: Option<Box<dyn Executor>>,
8
left_on: Vec<Arc<dyn PhysicalExpr>>,
9
right_on: Vec<Arc<dyn PhysicalExpr>>,
10
parallel: bool,
11
args: JoinArgs,
12
options: Option<JoinTypeOptions>,
13
}
14
15
impl JoinExec {
16
#[allow(clippy::too_many_arguments)]
17
pub(crate) fn new(
18
input_left: Box<dyn Executor>,
19
input_right: Box<dyn Executor>,
20
left_on: Vec<Arc<dyn PhysicalExpr>>,
21
right_on: Vec<Arc<dyn PhysicalExpr>>,
22
parallel: bool,
23
args: JoinArgs,
24
options: Option<JoinTypeOptions>,
25
) -> Self {
26
JoinExec {
27
input_left: Some(input_left),
28
input_right: Some(input_right),
29
left_on,
30
right_on,
31
parallel,
32
args,
33
options,
34
}
35
}
36
}
37
38
impl Executor for JoinExec {
39
fn execute<'a>(&'a mut self, state: &'a mut ExecutionState) -> PolarsResult<DataFrame> {
40
state.should_stop()?;
41
#[cfg(debug_assertions)]
42
{
43
if state.verbose() {
44
eprintln!("run JoinExec")
45
}
46
}
47
if state.verbose() {
48
eprintln!("join parallel: {}", self.parallel);
49
};
50
let mut input_left = self.input_left.take().unwrap();
51
let mut input_right = self.input_right.take().unwrap();
52
53
let (df_left, df_right) = if self.parallel {
54
let mut state_right = state.split();
55
let mut state_left = state.split();
56
state_right.branch_idx += 1;
57
58
POOL.join(
59
move || input_left.execute(&mut state_left),
60
move || input_right.execute(&mut state_right),
61
)
62
} else {
63
(input_left.execute(state), input_right.execute(state))
64
};
65
66
let df_left = df_left?;
67
let df_right = df_right?;
68
69
let profile_name = if state.has_node_timer() {
70
let by = self
71
.left_on
72
.iter()
73
.map(|s| Ok(s.to_field(df_left.schema())?.name))
74
.collect::<PolarsResult<Vec<_>>>()?;
75
let name = comma_delimited("join".to_string(), &by);
76
Cow::Owned(name)
77
} else {
78
Cow::Borrowed("")
79
};
80
81
state.record(
82
|| {
83
let left_on_series = self
84
.left_on
85
.iter()
86
.map(|e| e.evaluate(&df_left, state))
87
.collect::<PolarsResult<Vec<_>>>()?;
88
89
let right_on_series = self
90
.right_on
91
.iter()
92
.map(|e| e.evaluate(&df_right, state))
93
.collect::<PolarsResult<Vec<_>>>()?;
94
95
let df = df_left._join_impl(
96
&df_right,
97
left_on_series
98
.into_iter()
99
.map(|c| c.take_materialized_series())
100
.collect(),
101
right_on_series
102
.into_iter()
103
.map(|c| c.take_materialized_series())
104
.collect(),
105
self.args.clone(),
106
self.options.clone(),
107
true,
108
state.verbose(),
109
);
110
111
if state.verbose() {
112
eprintln!("{:?} join dataframes finished", self.args.how);
113
};
114
df
115
},
116
profile_name,
117
)
118
}
119
}
120
121