// Path: blob/main/crates/polars-stream/src/nodes/in_memory_map.rs
// 6939 views
use std::sync::Arc;12use polars_core::schema::Schema;3use polars_plan::plans::DataFrameUdf;45use super::compute_node_prelude::*;6use super::in_memory_sink::InMemorySinkNode;7use super::in_memory_source::InMemorySourceNode;89pub enum InMemoryMapNode {10Sink {11sink_node: InMemorySinkNode,12map: Arc<dyn DataFrameUdf>,13},14Source(InMemorySourceNode),15Done,16}1718impl InMemoryMapNode {19pub fn new(input_schema: Arc<Schema>, map: Arc<dyn DataFrameUdf>) -> Self {20Self::Sink {21sink_node: InMemorySinkNode::new(input_schema),22map,23}24}25}2627impl ComputeNode for InMemoryMapNode {28fn name(&self) -> &str {29"in-memory-map"30}3132fn update_state(33&mut self,34recv: &mut [PortState],35send: &mut [PortState],36state: &StreamingExecutionState,37) -> PolarsResult<()> {38assert!(recv.len() == 1 && send.len() == 1);3940// If the output doesn't want any more data, transition to being done.41if send[0] == PortState::Done && !matches!(self, Self::Done) {42*self = Self::Done;43}4445// If the input is done, transition to being a source.46if let Self::Sink { sink_node, map } = self {47if recv[0] == PortState::Done {48let df = sink_node.get_output()?;49let source_node = InMemorySourceNode::new(50Arc::new(map.call_udf(df.unwrap())?),51MorselSeq::default(),52);53*self = Self::Source(source_node);54}55}5657match self {58Self::Sink { sink_node, .. } => {59sink_node.update_state(recv, &mut [], state)?;60send[0] = PortState::Blocked;61},62Self::Source(source_node) => {63recv[0] = PortState::Done;64source_node.update_state(&mut [], send, state)?;65},66Self::Done => {67recv[0] = PortState::Done;68send[0] = PortState::Done;69},70}71Ok(())72}7374fn is_memory_intensive_pipeline_blocker(&self) -> bool {75matches!(self, Self::Sink { .. 
})76}7778fn spawn<'env, 's>(79&'env mut self,80scope: &'s TaskScope<'s, 'env>,81recv_ports: &mut [Option<RecvPort<'_>>],82send_ports: &mut [Option<SendPort<'_>>],83state: &'s StreamingExecutionState,84join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,85) {86match self {87Self::Sink { sink_node, .. } => {88sink_node.spawn(scope, recv_ports, &mut [], state, join_handles)89},90Self::Source(source) => source.spawn(scope, &mut [], send_ports, state, join_handles),91Self::Done => unreachable!(),92}93}94}959697