Path: blob/main/crates/polars-stream/src/nodes/joins/mod.rs
6939 views
use std::sync::LazyLock;12use crossbeam_queue::ArrayQueue;3use polars_core::POOL;4use polars_error::PolarsResult;5use polars_utils::itertools::Itertools;6use rayon::prelude::*;78use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};9use crate::async_primitives::connector::{Receiver, connector};10use crate::async_primitives::wait_group::WaitGroup;11use crate::morsel::{Morsel, MorselSeq, SourceToken};12use crate::pipe::RecvPort;1314pub mod cross_join;15pub mod equi_join;16pub mod in_memory;17#[cfg(feature = "semi_anti_join")]18pub mod semi_anti_join;1920static JOIN_SAMPLE_LIMIT: LazyLock<usize> = LazyLock::new(|| {21std::env::var("POLARS_JOIN_SAMPLE_LIMIT")22.map(|limit| limit.parse().unwrap())23.unwrap_or(10_000_000)24});2526// If one side is this much bigger than the other side we'll always use the27// smaller side as the build side without checking cardinalities.28const LOPSIDED_SAMPLE_FACTOR: usize = 10;2930// TODO: improve, generalize this, and move it away from here.31struct BufferedStream {32morsels: ArrayQueue<Morsel>,33post_buffer_offset: MorselSeq,34}3536impl BufferedStream {37pub fn new(morsels: Vec<Morsel>, start_offset: MorselSeq) -> Self {38// Relabel so we can insert into parallel streams later.39let mut seq = start_offset;40let queue = ArrayQueue::new(morsels.len().max(1));41for mut morsel in morsels {42morsel.set_seq(seq);43queue.push(morsel).unwrap();44seq = seq.successor();45}4647Self {48morsels: queue,49post_buffer_offset: seq,50}51}5253pub fn is_empty(&self) -> bool {54self.morsels.is_empty()55}5657#[allow(clippy::needless_lifetimes)]58pub fn reinsert<'s, 'env>(59&'s self,60num_pipelines: usize,61recv_port: Option<RecvPort<'_>>,62scope: &'s TaskScope<'s, 'env>,63join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,64) -> Option<Vec<Receiver<Morsel>>> {65let receivers = if let Some(p) = recv_port {66p.parallel().into_iter().map(Some).collect_vec()67} else {68(0..num_pipelines).map(|_| None).collect_vec()69};7071let source_token = SourceToken::new();72let mut out = Vec::new();73for orig_recv in receivers {74let (mut new_send, new_recv) = connector();75out.push(new_recv);76let source_token = source_token.clone();77join_handles.push(scope.spawn_task(TaskPriority::High, async move {78// Act like an InMemorySource node until cached morsels are consumed.79let wait_group = WaitGroup::default();80loop {81let Some(mut morsel) = self.morsels.pop() else {82break;83};84morsel.replace_source_token(source_token.clone());85morsel.set_consume_token(wait_group.token());86if new_send.send(morsel).await.is_err() {87return Ok(());88}89wait_group.wait().await;90// TODO: Unfortunately we can't actually stop here without91// re-buffering morsels from the stream that comes after.92// if source_token.stop_requested() {93// break;94// }95}9697if let Some(mut recv) = orig_recv {98while let Ok(mut morsel) = recv.recv().await {99if source_token.stop_requested() {100morsel.source_token().stop();101}102morsel.set_seq(morsel.seq().offset_by(self.post_buffer_offset));103if new_send.send(morsel).await.is_err() {104break;105}106}107}108Ok(())109}));110}111Some(out)112}113}114115impl Default for BufferedStream {116fn default() -> Self {117Self {118morsels: ArrayQueue::new(1),119post_buffer_offset: MorselSeq::default(),120}121}122}123124impl Drop for BufferedStream {125fn drop(&mut self) {126POOL.install(|| {127// Parallel drop as the state might be quite big.128(0..self.morsels.len())129.into_par_iter()130.for_each(|_| drop(self.morsels.pop()));131})132}133}134135136