Path: blob/main/crates/polars-mem-engine/src/executors/join.rs
8409 views
use polars_ops::frame::DataFrameJoinOps;12use super::*;34pub struct JoinExec {5input_left: Option<Box<dyn Executor>>,6input_right: Option<Box<dyn Executor>>,7left_on: Vec<Arc<dyn PhysicalExpr>>,8right_on: Vec<Arc<dyn PhysicalExpr>>,9parallel: bool,10args: JoinArgs,11options: Option<JoinTypeOptions>,12}1314impl JoinExec {15#[allow(clippy::too_many_arguments)]16pub(crate) fn new(17input_left: Box<dyn Executor>,18input_right: Box<dyn Executor>,19left_on: Vec<Arc<dyn PhysicalExpr>>,20right_on: Vec<Arc<dyn PhysicalExpr>>,21parallel: bool,22args: JoinArgs,23options: Option<JoinTypeOptions>,24) -> Self {25JoinExec {26input_left: Some(input_left),27input_right: Some(input_right),28left_on,29right_on,30parallel,31args,32options,33}34}35}3637impl Executor for JoinExec {38fn execute<'a>(&'a mut self, state: &'a mut ExecutionState) -> PolarsResult<DataFrame> {39state.should_stop()?;40#[cfg(debug_assertions)]41{42if state.verbose() {43eprintln!("run JoinExec")44}45}46if state.verbose() {47eprintln!("join parallel: {}", self.parallel);48};49let mut input_left = self.input_left.take().unwrap();50let mut input_right = self.input_right.take().unwrap();5152let (df_left, df_right) = if self.parallel {53let mut state_right = state.split();54let mut state_left = state.split();55state_right.branch_idx += 1;5657POOL.join(58move || input_left.execute(&mut state_left),59move || input_right.execute(&mut state_right),60)61} else {62(input_left.execute(state), input_right.execute(state))63};6465let df_left = df_left?;66let df_right = df_right?;6768let profile_name = if state.has_node_timer() {69let by = self70.left_on71.iter()72.map(|s| Ok(s.to_field(df_left.schema())?.name))73.collect::<PolarsResult<Vec<_>>>()?;74let name = comma_delimited("join".to_string(), &by);75Cow::Owned(name)76} else {77Cow::Borrowed("")78};7980state.record(81|| {82let left_on_series = self83.left_on84.iter()85.map(|e| e.evaluate(&df_left, state))86.collect::<PolarsResult<Vec<_>>>()?;8788let right_on_series = self89.right_on90.iter()91.map(|e| e.evaluate(&df_right, state))92.collect::<PolarsResult<Vec<_>>>()?;9394let df = df_left._join_impl(95&df_right,96left_on_series97.into_iter()98.map(|c| c.take_materialized_series())99.collect(),100right_on_series101.into_iter()102.map(|c| c.take_materialized_series())103.collect(),104self.args.clone(),105self.options.clone(),106true,107state.verbose(),108);109110if state.verbose() {111eprintln!("{:?} join dataframes finished", self.args.how);112};113df114},115profile_name,116)117}118}119120121