Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/joins/mod.rs
8475 views
1
use std::sync::LazyLock;
2
3
use crossbeam_queue::ArrayQueue;
4
use polars_core::POOL;
5
use polars_error::PolarsResult;
6
use polars_ooc::AccessPattern::Fifo;
7
use polars_ooc::{Token, mm};
8
use polars_utils::itertools::Itertools;
9
use rayon::prelude::*;
10
11
use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
12
use crate::async_primitives::wait_group::WaitGroup;
13
use crate::morsel::{Morsel, MorselSeq, SourceToken};
14
use crate::pipe::{PortReceiver, RecvPort, port_channel};
15
16
#[cfg(feature = "asof_join")]
17
pub mod asof_join;
18
pub mod cross_join;
19
pub mod equi_join;
20
pub mod in_memory;
21
pub mod merge_join;
22
#[cfg(feature = "semi_anti_join")]
23
pub mod semi_anti_join;
24
mod utils;
25
26
/// Join sample limit, read once from the `POLARS_JOIN_SAMPLE_LIMIT`
/// environment variable on first access.
///
/// Falls back to `10_000_000` when the variable cannot be read (it is unset or
/// does not contain valid Unicode).
///
/// # Panics
///
/// Panics on first access if the variable is set to a value that does not
/// parse as a `usize`. This is deliberate: a mistyped override should be
/// noticed rather than silently replaced by the default.
static JOIN_SAMPLE_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("POLARS_JOIN_SAMPLE_LIMIT")
        .map(|limit| {
            limit
                .parse::<usize>()
                // Name the offending variable so the panic is actionable.
                .expect("POLARS_JOIN_SAMPLE_LIMIT must be a valid unsigned integer")
        })
        .unwrap_or(10_000_000)
});
31
32
/// Size ratio between the two join inputs above which the smaller input is
/// always picked as the build side, without checking cardinalities.
const LOPSIDED_SAMPLE_FACTOR: usize = 10;
35
36
// TODO: improve, generalize this, and move it away from here.
37
struct BufferedStream {
38
morsels: ArrayQueue<(Token, MorselSeq)>,
39
post_buffer_offset: MorselSeq,
40
}
41
42
impl BufferedStream {
43
pub fn new(morsels: Vec<Morsel>, start_offset: MorselSeq) -> Self {
44
// Relabel so we can insert into parallel streams later.
45
let mut seq = start_offset;
46
let queue = ArrayQueue::new(morsels.len().max(1));
47
for morsel in morsels {
48
let token = mm().store_blocking(morsel.into_df(), Fifo);
49
queue.push((token, seq)).unwrap();
50
seq = seq.successor();
51
}
52
53
Self {
54
morsels: queue,
55
post_buffer_offset: seq,
56
}
57
}
58
59
pub fn is_empty(&self) -> bool {
60
self.morsels.is_empty()
61
}
62
63
#[allow(clippy::needless_lifetimes)]
64
pub fn reinsert<'s, 'env>(
65
&'s self,
66
num_pipelines: usize,
67
recv_port: Option<RecvPort<'_>>,
68
scope: &'s TaskScope<'s, 'env>,
69
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
70
) -> Option<Vec<PortReceiver>> {
71
let receivers = if let Some(p) = recv_port {
72
p.parallel().into_iter().map(Some).collect_vec()
73
} else {
74
(0..num_pipelines).map(|_| None).collect_vec()
75
};
76
77
let source_token = SourceToken::new();
78
let mut out = Vec::new();
79
for orig_recv in receivers {
80
let (mut new_send, new_recv) = port_channel(None);
81
out.push(new_recv);
82
let source_token = source_token.clone();
83
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
84
// Act like an InMemorySource node until cached morsels are consumed.
85
let wait_group = WaitGroup::default();
86
loop {
87
let Some((token, seq)) = self.morsels.pop() else {
88
break;
89
};
90
let df = token.into_df().await;
91
let mut morsel = Morsel::new(df, seq, source_token.clone());
92
morsel.set_consume_token(wait_group.token());
93
if new_send.send(morsel).await.is_err() {
94
return Ok(());
95
}
96
wait_group.wait().await;
97
// TODO: Unfortunately we can't actually stop here without
98
// re-buffering morsels from the stream that comes after.
99
// if source_token.stop_requested() {
100
// break;
101
// }
102
}
103
104
if let Some(mut recv) = orig_recv {
105
while let Ok(mut morsel) = recv.recv().await {
106
if source_token.stop_requested() {
107
morsel.source_token().stop();
108
}
109
morsel.set_seq(morsel.seq().offset_by(self.post_buffer_offset));
110
if new_send.send(morsel).await.is_err() {
111
break;
112
}
113
}
114
}
115
Ok(())
116
}));
117
}
118
Some(out)
119
}
120
}
121
122
impl Default for BufferedStream {
123
fn default() -> Self {
124
Self {
125
morsels: ArrayQueue::new(1),
126
post_buffer_offset: MorselSeq::default(),
127
}
128
}
129
}
130
131
impl Drop for BufferedStream {
132
fn drop(&mut self) {
133
POOL.install(|| {
134
// Parallel drop as the state might be quite big.
135
(0..self.morsels.len()).into_par_iter().for_each(|_| {
136
drop(self.morsels.pop());
137
});
138
})
139
}
140
}
141
142