Path: blob/main/crates/polars-stream/src/nodes/input_independent_select.rs
6939 views
use std::sync::Arc;12use polars_core::POOL;3use polars_core::prelude::IntoColumn;4use rayon::iter::IntoParallelRefIterator;5use rayon::prelude::*;67use super::compute_node_prelude::*;8use crate::expression::StreamExpr;9use crate::nodes::in_memory_source::InMemorySourceNode;1011pub enum InputIndependentSelectNode {12ToSelect { selectors: Vec<StreamExpr> },13Source(InMemorySourceNode),14Done,15}1617impl InputIndependentSelectNode {18pub fn new(selectors: Vec<StreamExpr>) -> Self {19Self::ToSelect { selectors }20}21}2223impl ComputeNode for InputIndependentSelectNode {24fn name(&self) -> &str {25"input-independent-select"26}2728fn update_state(29&mut self,30recv: &mut [PortState],31send: &mut [PortState],32state: &StreamingExecutionState,33) -> PolarsResult<()> {34assert!(recv.is_empty() && send.len() == 1);35if send[0] == PortState::Done {36*self = Self::Done;37return Ok(());38}3940POOL.install(|| {41if let Self::ToSelect { selectors } = self {42let empty_df = DataFrame::empty();43let state = ExecutionState::new();44let selected: Result<Vec<_>, _> = selectors45.par_iter()46.map(|selector| {47let s = selector.evaluate_blocking(&empty_df, &state)?;48PolarsResult::Ok(s.into_column())49})50.collect();51let ret = DataFrame::new_with_broadcast(selected?)?;52let src_node = InMemorySourceNode::new(Arc::new(ret), MorselSeq::default());53*self = InputIndependentSelectNode::Source(src_node);54}55PolarsResult::Ok(())56})?;5758match self {59Self::ToSelect { .. } => unreachable!(),60Self::Source(src) => src.update_state(recv, send, state),61Self::Done => {62send[0] = PortState::Done;63Ok(())64},65}66}6768fn spawn<'env, 's>(69&'env mut self,70scope: &'s TaskScope<'s, 'env>,71recv_ports: &mut [Option<RecvPort<'_>>],72send_ports: &mut [Option<SendPort<'_>>],73state: &'s StreamingExecutionState,74join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,75) {76assert!(recv_ports.is_empty() && send_ports.len() == 1);77let Self::Source(src) = self else {78unreachable!()79};80src.spawn(scope, recv_ports, send_ports, state, join_handles);81}82}838485