Path: blob/main/crates/polars-stream/src/nodes/in_memory_sink.rs
6939 views
use std::sync::Arc;12use parking_lot::Mutex;3use polars_core::schema::Schema;4use polars_core::utils::accumulate_dataframes_vertical_unchecked;56use super::compute_node_prelude::*;7use crate::utils::in_memory_linearize::linearize;89pub struct InMemorySinkNode {10morsels_per_pipe: Mutex<Vec<Vec<(MorselSeq, DataFrame)>>>,11schema: Arc<Schema>,12}1314impl InMemorySinkNode {15pub fn new(schema: Arc<Schema>) -> Self {16Self {17morsels_per_pipe: Mutex::default(),18schema,19}20}21}2223impl ComputeNode for InMemorySinkNode {24fn name(&self) -> &str {25"in-memory-sink"26}2728fn update_state(29&mut self,30recv: &mut [PortState],31send: &mut [PortState],32_state: &StreamingExecutionState,33) -> PolarsResult<()> {34assert!(send.is_empty());35assert!(recv.len() == 1);3637// We are always ready to receive, unless the sender is done, then we're38// also done.39if recv[0] != PortState::Done {40recv[0] = PortState::Ready;41}42Ok(())43}4445fn is_memory_intensive_pipeline_blocker(&self) -> bool {46true47}4849fn spawn<'env, 's>(50&'env mut self,51scope: &'s TaskScope<'s, 'env>,52recv_ports: &mut [Option<RecvPort<'_>>],53send_ports: &mut [Option<SendPort<'_>>],54_state: &'s StreamingExecutionState,55join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,56) {57assert!(recv_ports.len() == 1 && send_ports.is_empty());58let receivers = recv_ports[0].take().unwrap().parallel();5960for mut recv in receivers {61let slf = &*self;62join_handles.push(scope.spawn_task(TaskPriority::High, async move {63let mut morsels = Vec::new();64while let Ok(mut morsel) = recv.recv().await {65morsel.take_consume_token();66morsels.push((morsel.seq(), morsel.into_df()));67}6869slf.morsels_per_pipe.lock().push(morsels);70Ok(())71}));72}73}7475fn get_output(&mut self) -> PolarsResult<Option<DataFrame>> {76let morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());77let dataframes = linearize(morsels_per_pipe);78if dataframes.is_empty() {79Ok(Some(DataFrame::empty_with_schema(&self.schema)))80} else {81Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))82}83}84}858687