Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/joins/mod.rs
6939 views
1
use std::sync::LazyLock;
2
3
use crossbeam_queue::ArrayQueue;
4
use polars_core::POOL;
5
use polars_error::PolarsResult;
6
use polars_utils::itertools::Itertools;
7
use rayon::prelude::*;
8
9
use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
10
use crate::async_primitives::connector::{Receiver, connector};
11
use crate::async_primitives::wait_group::WaitGroup;
12
use crate::morsel::{Morsel, MorselSeq, SourceToken};
13
use crate::pipe::RecvPort;
14
15
pub mod cross_join;
16
pub mod equi_join;
17
pub mod in_memory;
18
#[cfg(feature = "semi_anti_join")]
19
pub mod semi_anti_join;
20
21
/// Limit used when sampling join inputs (NOTE(review): the unit — presumably
/// rows — is defined at the use sites; confirm there).
///
/// Read once, on first access, from the `POLARS_JOIN_SAMPLE_LIMIT` environment
/// variable. Falls back to `10_000_000` when the variable is unset or is not
/// valid Unicode.
///
/// # Panics
///
/// On first access, if the variable is set but does not parse as a `usize`.
static JOIN_SAMPLE_LIMIT: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("POLARS_JOIN_SAMPLE_LIMIT")
        .map(|limit| {
            limit.parse::<usize>().unwrap_or_else(|err| {
                panic!(
                    "POLARS_JOIN_SAMPLE_LIMIT must be a non-negative integer, got {:?}: {}",
                    limit, err
                )
            })
        })
        .unwrap_or(10_000_000)
});
26
27
/// Size ratio at which the inputs count as lopsided.
///
/// When one side is this many times bigger than the other, the smaller side is
/// always picked as the build side and the cardinality check is skipped. The
/// exact comparison lives at the use site.
const LOPSIDED_SAMPLE_FACTOR: usize = 10;
30
31
// TODO: improve, generalize this, and move it away from here.
32
struct BufferedStream {
33
morsels: ArrayQueue<Morsel>,
34
post_buffer_offset: MorselSeq,
35
}
36
37
impl BufferedStream {
38
pub fn new(morsels: Vec<Morsel>, start_offset: MorselSeq) -> Self {
39
// Relabel so we can insert into parallel streams later.
40
let mut seq = start_offset;
41
let queue = ArrayQueue::new(morsels.len().max(1));
42
for mut morsel in morsels {
43
morsel.set_seq(seq);
44
queue.push(morsel).unwrap();
45
seq = seq.successor();
46
}
47
48
Self {
49
morsels: queue,
50
post_buffer_offset: seq,
51
}
52
}
53
54
pub fn is_empty(&self) -> bool {
55
self.morsels.is_empty()
56
}
57
58
#[allow(clippy::needless_lifetimes)]
59
pub fn reinsert<'s, 'env>(
60
&'s self,
61
num_pipelines: usize,
62
recv_port: Option<RecvPort<'_>>,
63
scope: &'s TaskScope<'s, 'env>,
64
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
65
) -> Option<Vec<Receiver<Morsel>>> {
66
let receivers = if let Some(p) = recv_port {
67
p.parallel().into_iter().map(Some).collect_vec()
68
} else {
69
(0..num_pipelines).map(|_| None).collect_vec()
70
};
71
72
let source_token = SourceToken::new();
73
let mut out = Vec::new();
74
for orig_recv in receivers {
75
let (mut new_send, new_recv) = connector();
76
out.push(new_recv);
77
let source_token = source_token.clone();
78
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
79
// Act like an InMemorySource node until cached morsels are consumed.
80
let wait_group = WaitGroup::default();
81
loop {
82
let Some(mut morsel) = self.morsels.pop() else {
83
break;
84
};
85
morsel.replace_source_token(source_token.clone());
86
morsel.set_consume_token(wait_group.token());
87
if new_send.send(morsel).await.is_err() {
88
return Ok(());
89
}
90
wait_group.wait().await;
91
// TODO: Unfortunately we can't actually stop here without
92
// re-buffering morsels from the stream that comes after.
93
// if source_token.stop_requested() {
94
// break;
95
// }
96
}
97
98
if let Some(mut recv) = orig_recv {
99
while let Ok(mut morsel) = recv.recv().await {
100
if source_token.stop_requested() {
101
morsel.source_token().stop();
102
}
103
morsel.set_seq(morsel.seq().offset_by(self.post_buffer_offset));
104
if new_send.send(morsel).await.is_err() {
105
break;
106
}
107
}
108
}
109
Ok(())
110
}));
111
}
112
Some(out)
113
}
114
}
115
116
impl Default for BufferedStream {
117
fn default() -> Self {
118
Self {
119
morsels: ArrayQueue::new(1),
120
post_buffer_offset: MorselSeq::default(),
121
}
122
}
123
}
124
125
impl Drop for BufferedStream {
126
fn drop(&mut self) {
127
POOL.install(|| {
128
// Parallel drop as the state might be quite big.
129
(0..self.morsels.len())
130
.into_par_iter()
131
.for_each(|_| drop(self.morsels.pop()));
132
})
133
}
134
}
135
136