Path: blob/main/crates/polars-mem-engine/src/executors/join.rs
6940 views
use polars_ops::frame::DataFrameJoinOps;12use super::*;34pub struct JoinExec {5input_left: Option<Box<dyn Executor>>,6input_right: Option<Box<dyn Executor>>,7left_on: Vec<Arc<dyn PhysicalExpr>>,8right_on: Vec<Arc<dyn PhysicalExpr>>,9parallel: bool,10args: JoinArgs,11options: Option<JoinTypeOptions>,12}1314impl JoinExec {15#[allow(clippy::too_many_arguments)]16pub(crate) fn new(17input_left: Box<dyn Executor>,18input_right: Box<dyn Executor>,19left_on: Vec<Arc<dyn PhysicalExpr>>,20right_on: Vec<Arc<dyn PhysicalExpr>>,21parallel: bool,22args: JoinArgs,23options: Option<JoinTypeOptions>,24) -> Self {25JoinExec {26input_left: Some(input_left),27input_right: Some(input_right),28left_on,29right_on,30parallel,31args,32options,33}34}35}3637impl Executor for JoinExec {38fn execute<'a>(&'a mut self, state: &'a mut ExecutionState) -> PolarsResult<DataFrame> {39state.should_stop()?;40#[cfg(debug_assertions)]41{42if state.verbose() {43eprintln!("run JoinExec")44}45}46if state.verbose() {47eprintln!("join parallel: {}", self.parallel);48};49let mut input_left = self.input_left.take().unwrap();50let mut input_right = self.input_right.take().unwrap();5152let (df_left, df_right) = if self.parallel {53let mut state_right = state.split();54let mut state_left = state.split();55state_right.branch_idx += 1;5657POOL.join(58move || input_left.execute(&mut state_left),59move || input_right.execute(&mut state_right),60)61} else {62(input_left.execute(state), input_right.execute(state))63};6465let df_left = df_left?;66let df_right = df_right?;6768let profile_name = if state.has_node_timer() {69let by = self70.left_on71.iter()72.map(|s| Ok(s.to_field(df_left.schema())?.name))73.collect::<PolarsResult<Vec<_>>>()?;74let name = comma_delimited("join".to_string(), &by);75Cow::Owned(name)76} else {77Cow::Borrowed("")78};7980state.record(|| {8182let left_on_series = self83.left_on84.iter()85.map(|e| e.evaluate(&df_left, state))86.collect::<PolarsResult<Vec<_>>>()?;8788let right_on_series = self89.right_on90.iter()91.map(|e| e.evaluate(&df_right, state))92.collect::<PolarsResult<Vec<_>>>()?;9394// prepare the tolerance95// we must ensure that we use the right units96#[cfg(feature = "asof_join")]97{98if let JoinType::AsOf(options) = &mut self.args.how {99use polars_core::utils::arrow::temporal_conversions::MILLISECONDS_IN_DAY;100if let Some(tol) = &options.tolerance_str {101let duration = polars_time::Duration::try_parse(tol)?;102polars_ensure!(103duration.months() == 0,104ComputeError: "cannot use month offset in timedelta of an asof join; \105consider using 4 weeks"106);107let left_asof = df_left.column(left_on_series[0].name())?;108use DataType::*;109match left_asof.dtype() {110Datetime(tu, _) | Duration(tu) => {111let tolerance = match tu {112TimeUnit::Nanoseconds => duration.duration_ns(),113TimeUnit::Microseconds => duration.duration_us(),114TimeUnit::Milliseconds => duration.duration_ms(),115};116options.tolerance = Some(Scalar::from(tolerance))117}118Date => {119let days = (duration.duration_ms() / MILLISECONDS_IN_DAY) as i32;120options.tolerance = Some(Scalar::from(days))121}122Time => {123let tolerance = duration.duration_ns();124options.tolerance = Some(Scalar::from(tolerance))125}126_ => {127panic!("can only use timedelta string language with Date/Datetime/Duration/Time dtypes")128}129}130}131}132}133134let df = df_left._join_impl(135&df_right,136left_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),137right_on_series.into_iter().map(|c| c.take_materialized_series()).collect(),138self.args.clone(),139self.options.clone(),140true,141state.verbose(),142);143144if state.verbose() {145eprintln!("{:?} join dataframes finished", self.args.how);146};147df148149}, profile_name)150}151}152153154