// Path: blob/main/crates/polars-stream/src/nodes/ordered_union.rs
// 6939 views
use super::compute_node_prelude::*;12/// A node that first passes through all data from the first input, then the3/// second input, etc.4pub struct OrderedUnionNode {5cur_input_idx: usize,6max_morsel_seq_sent: MorselSeq,7morsel_offset: MorselSeq,8}910impl OrderedUnionNode {11pub fn new() -> Self {12Self {13cur_input_idx: 0,14max_morsel_seq_sent: MorselSeq::new(0),15morsel_offset: MorselSeq::new(0),16}17}18}1920impl ComputeNode for OrderedUnionNode {21fn name(&self) -> &str {22"ordered-union"23}2425fn update_state(26&mut self,27recv: &mut [PortState],28send: &mut [PortState],29_state: &StreamingExecutionState,30) -> PolarsResult<()> {31assert!(self.cur_input_idx <= recv.len() && send.len() == 1);3233// Skip inputs that are done.34while self.cur_input_idx < recv.len() && recv[self.cur_input_idx] == PortState::Done {35self.cur_input_idx += 1;36}3738// Act like a normal pass-through node for the current input, or mark39// ourselves as done if all inputs are handled.40if self.cur_input_idx < recv.len() {41core::mem::swap(&mut recv[self.cur_input_idx], &mut send[0]);42} else {43send[0] = PortState::Done;44}4546// Mark all inputs after the current one as blocked.47for r in recv.iter_mut().skip(self.cur_input_idx + 1) {48*r = PortState::Blocked;49}5051// Set the morsel offset one higher than any sent so far.52self.morsel_offset = self.max_morsel_seq_sent.successor();53Ok(())54}5556fn spawn<'env, 's>(57&'env mut self,58scope: &'s TaskScope<'s, 'env>,59recv_ports: &mut [Option<RecvPort<'_>>],60send_ports: &mut [Option<SendPort<'_>>],61_state: &'s StreamingExecutionState,62join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,63) {64let ready_count = recv_ports.iter().filter(|r| r.is_some()).count();65assert!(ready_count == 1 && send_ports.len() == 1);66let receivers = recv_ports[self.cur_input_idx].take().unwrap().parallel();67let senders = send_ports[0].take().unwrap().parallel();6869let mut inner_handles = Vec::new();70for (mut recv, mut send) in 
receivers.into_iter().zip(senders) {71let morsel_offset = self.morsel_offset;72inner_handles.push(scope.spawn_task(TaskPriority::High, async move {73let mut max_seq = MorselSeq::new(0);74while let Ok(mut morsel) = recv.recv().await {75// Ensure the morsel sequence id stream is monotonic.76let seq = morsel.seq().offset_by(morsel_offset);77max_seq = max_seq.max(seq);7879morsel.set_seq(seq);80if send.send(morsel).await.is_err() {81break;82}83}84max_seq85}));86}8788join_handles.push(scope.spawn_task(TaskPriority::High, async move {89// Update our global maximum.90for handle in inner_handles {91self.max_morsel_seq_sent = self.max_morsel_seq_sent.max(handle.await);92}93Ok(())94}));95}96}979899