Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-mem-engine/src/executors/join.rs
6940 views
1
use polars_ops::frame::DataFrameJoinOps;
2
3
use super::*;
4
5
pub struct JoinExec {
6
input_left: Option<Box<dyn Executor>>,
7
input_right: Option<Box<dyn Executor>>,
8
left_on: Vec<Arc<dyn PhysicalExpr>>,
9
right_on: Vec<Arc<dyn PhysicalExpr>>,
10
parallel: bool,
11
args: JoinArgs,
12
options: Option<JoinTypeOptions>,
13
}
14
15
impl JoinExec {
16
#[allow(clippy::too_many_arguments)]
17
pub(crate) fn new(
18
input_left: Box<dyn Executor>,
19
input_right: Box<dyn Executor>,
20
left_on: Vec<Arc<dyn PhysicalExpr>>,
21
right_on: Vec<Arc<dyn PhysicalExpr>>,
22
parallel: bool,
23
args: JoinArgs,
24
options: Option<JoinTypeOptions>,
25
) -> Self {
26
JoinExec {
27
input_left: Some(input_left),
28
input_right: Some(input_right),
29
left_on,
30
right_on,
31
parallel,
32
args,
33
options,
34
}
35
}
36
}
37
38
impl Executor for JoinExec {
39
fn execute<'a>(&'a mut self, state: &'a mut ExecutionState) -> PolarsResult<DataFrame> {
40
state.should_stop()?;
41
#[cfg(debug_assertions)]
42
{
43
if state.verbose() {
44
eprintln!("run JoinExec")
45
}
46
}
47
if state.verbose() {
48
eprintln!("join parallel: {}", self.parallel);
49
};
50
let mut input_left = self.input_left.take().unwrap();
51
let mut input_right = self.input_right.take().unwrap();
52
53
let (df_left, df_right) = if self.parallel {
54
let mut state_right = state.split();
55
let mut state_left = state.split();
56
state_right.branch_idx += 1;
57
58
POOL.join(
59
move || input_left.execute(&mut state_left),
60
move || input_right.execute(&mut state_right),
61
)
62
} else {
63
(input_left.execute(state), input_right.execute(state))
64
};
65
66
let df_left = df_left?;
67
let df_right = df_right?;
68
69
let profile_name = if state.has_node_timer() {
70
let by = self
71
.left_on
72
.iter()
73
.map(|s| Ok(s.to_field(df_left.schema())?.name))
74
.collect::<PolarsResult<Vec<_>>>()?;
75
let name = comma_delimited("join".to_string(), &by);
76
Cow::Owned(name)
77
} else {
78
Cow::Borrowed("")
79
};
80
81
state.record(|| {
82
83
let left_on_series = self
84
.left_on
85
.iter()
86
.map(|e| e.evaluate(&df_left, state))
87
.collect::<PolarsResult<Vec<_>>>()?;
88
89
let right_on_series = self
90
.right_on
91
.iter()
92
.map(|e| e.evaluate(&df_right, state))
93
.collect::<PolarsResult<Vec<_>>>()?;
94
95
// prepare the tolerance
96
// we must ensure that we use the right units
97
#[cfg(feature = "asof_join")]
98
{
99
if let JoinType::AsOf(options) = &mut self.args.how {
100
use polars_core::utils::arrow::temporal_conversions::MILLISECONDS_IN_DAY;
101
if let Some(tol) = &options.tolerance_str {
102
let duration = polars_time::Duration::try_parse(tol)?;
103
polars_ensure!(
104
duration.months() == 0,
105
ComputeError: "cannot use month offset in timedelta of an asof join; \
106
consider using 4 weeks"
107
);
108
let left_asof = df_left.column(left_on_series[0].name())?;
109
use DataType::*;
110
match left_asof.dtype() {
111
Datetime(tu, _) | Duration(tu) => {
112
let tolerance = match tu {
113
TimeUnit::Nanoseconds => duration.duration_ns(),
114
TimeUnit::Microseconds => duration.duration_us(),
115
TimeUnit::Milliseconds => duration.duration_ms(),
116
};
117
options.tolerance = Some(Scalar::from(tolerance))
118
}
119
Date => {
120
let days = (duration.duration_ms() / MILLISECONDS_IN_DAY) as i32;
121
options.tolerance = Some(Scalar::from(days))
122
}
123
Time => {
124
let tolerance = duration.duration_ns();
125
options.tolerance = Some(Scalar::from(tolerance))
126
}
127
_ => {
128
panic!("can only use timedelta string language with Date/Datetime/Duration/Time dtypes")
129
}
130
}
131
}
132
}
133
}
134
135
let df = df_left._join_impl(
136
&df_right,
137
left_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),
138
right_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),
139
self.args.clone(),
140
self.options.clone(),
141
true,
142
state.verbose(),
143
);
144
145
if state.verbose() {
146
eprintln!("{:?} join dataframes finished", self.args.how);
147
};
148
df
149
150
}, profile_name)
151
}
152
}
153
154