// Path: blob/main/crates/polars-stream/src/nodes/joins/mod.rs
use std::sync::LazyLock;12use crossbeam_queue::ArrayQueue;3use polars_core::POOL;4use polars_error::PolarsResult;5use polars_ooc::AccessPattern::Fifo;6use polars_ooc::{Token, mm};7use polars_utils::itertools::Itertools;8use rayon::prelude::*;910use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};11use crate::async_primitives::wait_group::WaitGroup;12use crate::morsel::{Morsel, MorselSeq, SourceToken};13use crate::pipe::{PortReceiver, RecvPort, port_channel};1415#[cfg(feature = "asof_join")]16pub mod asof_join;17pub mod cross_join;18pub mod equi_join;19pub mod in_memory;20pub mod merge_join;21#[cfg(feature = "semi_anti_join")]22pub mod semi_anti_join;23mod utils;2425static JOIN_SAMPLE_LIMIT: LazyLock<usize> = LazyLock::new(|| {26std::env::var("POLARS_JOIN_SAMPLE_LIMIT")27.map(|limit| limit.parse().unwrap())28.unwrap_or(10_000_000)29});3031// If one side is this much bigger than the other side we'll always use the32// smaller side as the build side without checking cardinalities.33const LOPSIDED_SAMPLE_FACTOR: usize = 10;3435// TODO: improve, generalize this, and move it away from here.36struct BufferedStream {37morsels: ArrayQueue<(Token, MorselSeq)>,38post_buffer_offset: MorselSeq,39}4041impl BufferedStream {42pub fn new(morsels: Vec<Morsel>, start_offset: MorselSeq) -> Self {43// Relabel so we can insert into parallel streams later.44let mut seq = start_offset;45let queue = ArrayQueue::new(morsels.len().max(1));46for morsel in morsels {47let token = mm().store_blocking(morsel.into_df(), Fifo);48queue.push((token, seq)).unwrap();49seq = seq.successor();50}5152Self {53morsels: queue,54post_buffer_offset: seq,55}56}5758pub fn is_empty(&self) -> bool {59self.morsels.is_empty()60}6162#[allow(clippy::needless_lifetimes)]63pub fn reinsert<'s, 'env>(64&'s self,65num_pipelines: usize,66recv_port: Option<RecvPort<'_>>,67scope: &'s TaskScope<'s, 'env>,68join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,69) -> Option<Vec<PortReceiver>> {70let receivers = if 
let Some(p) = recv_port {71p.parallel().into_iter().map(Some).collect_vec()72} else {73(0..num_pipelines).map(|_| None).collect_vec()74};7576let source_token = SourceToken::new();77let mut out = Vec::new();78for orig_recv in receivers {79let (mut new_send, new_recv) = port_channel(None);80out.push(new_recv);81let source_token = source_token.clone();82join_handles.push(scope.spawn_task(TaskPriority::High, async move {83// Act like an InMemorySource node until cached morsels are consumed.84let wait_group = WaitGroup::default();85loop {86let Some((token, seq)) = self.morsels.pop() else {87break;88};89let df = token.into_df().await;90let mut morsel = Morsel::new(df, seq, source_token.clone());91morsel.set_consume_token(wait_group.token());92if new_send.send(morsel).await.is_err() {93return Ok(());94}95wait_group.wait().await;96// TODO: Unfortunately we can't actually stop here without97// re-buffering morsels from the stream that comes after.98// if source_token.stop_requested() {99// break;100// }101}102103if let Some(mut recv) = orig_recv {104while let Ok(mut morsel) = recv.recv().await {105if source_token.stop_requested() {106morsel.source_token().stop();107}108morsel.set_seq(morsel.seq().offset_by(self.post_buffer_offset));109if new_send.send(morsel).await.is_err() {110break;111}112}113}114Ok(())115}));116}117Some(out)118}119}120121impl Default for BufferedStream {122fn default() -> Self {123Self {124morsels: ArrayQueue::new(1),125post_buffer_offset: MorselSeq::default(),126}127}128}129130impl Drop for BufferedStream {131fn drop(&mut self) {132POOL.install(|| {133// Parallel drop as the state might be quite big.134(0..self.morsels.len()).into_par_iter().for_each(|_| {135drop(self.morsels.pop());136});137})138}139}140141142