Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/ordered_union.rs
6939 views
1
use super::compute_node_prelude::*;
2
3
/// A node that first passes through all data from the first input, then the
4
/// second input, etc.
5
pub struct OrderedUnionNode {
6
cur_input_idx: usize,
7
max_morsel_seq_sent: MorselSeq,
8
morsel_offset: MorselSeq,
9
}
10
11
impl OrderedUnionNode {
12
pub fn new() -> Self {
13
Self {
14
cur_input_idx: 0,
15
max_morsel_seq_sent: MorselSeq::new(0),
16
morsel_offset: MorselSeq::new(0),
17
}
18
}
19
}
20
21
impl ComputeNode for OrderedUnionNode {
22
fn name(&self) -> &str {
23
"ordered-union"
24
}
25
26
fn update_state(
27
&mut self,
28
recv: &mut [PortState],
29
send: &mut [PortState],
30
_state: &StreamingExecutionState,
31
) -> PolarsResult<()> {
32
assert!(self.cur_input_idx <= recv.len() && send.len() == 1);
33
34
// Skip inputs that are done.
35
while self.cur_input_idx < recv.len() && recv[self.cur_input_idx] == PortState::Done {
36
self.cur_input_idx += 1;
37
}
38
39
// Act like a normal pass-through node for the current input, or mark
40
// ourselves as done if all inputs are handled.
41
if self.cur_input_idx < recv.len() {
42
core::mem::swap(&mut recv[self.cur_input_idx], &mut send[0]);
43
} else {
44
send[0] = PortState::Done;
45
}
46
47
// Mark all inputs after the current one as blocked.
48
for r in recv.iter_mut().skip(self.cur_input_idx + 1) {
49
*r = PortState::Blocked;
50
}
51
52
// Set the morsel offset one higher than any sent so far.
53
self.morsel_offset = self.max_morsel_seq_sent.successor();
54
Ok(())
55
}
56
57
fn spawn<'env, 's>(
58
&'env mut self,
59
scope: &'s TaskScope<'s, 'env>,
60
recv_ports: &mut [Option<RecvPort<'_>>],
61
send_ports: &mut [Option<SendPort<'_>>],
62
_state: &'s StreamingExecutionState,
63
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
64
) {
65
let ready_count = recv_ports.iter().filter(|r| r.is_some()).count();
66
assert!(ready_count == 1 && send_ports.len() == 1);
67
let receivers = recv_ports[self.cur_input_idx].take().unwrap().parallel();
68
let senders = send_ports[0].take().unwrap().parallel();
69
70
let mut inner_handles = Vec::new();
71
for (mut recv, mut send) in receivers.into_iter().zip(senders) {
72
let morsel_offset = self.morsel_offset;
73
inner_handles.push(scope.spawn_task(TaskPriority::High, async move {
74
let mut max_seq = MorselSeq::new(0);
75
while let Ok(mut morsel) = recv.recv().await {
76
// Ensure the morsel sequence id stream is monotonic.
77
let seq = morsel.seq().offset_by(morsel_offset);
78
max_seq = max_seq.max(seq);
79
80
morsel.set_seq(seq);
81
if send.send(morsel).await.is_err() {
82
break;
83
}
84
}
85
max_seq
86
}));
87
}
88
89
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
90
// Update our global maximum.
91
for handle in inner_handles {
92
self.max_morsel_seq_sent = self.max_morsel_seq_sent.max(handle.await);
93
}
94
Ok(())
95
}));
96
}
97
}
98
99