Path: blob/main/crates/polars-stream/src/nodes/simple_projection.rs
8424 views
use std::sync::Arc;12use polars_core::schema::Schema;3use polars_utils::pl_str::PlSmallStr;45use super::compute_node_prelude::*;67pub struct SimpleProjectionNode {8columns: Vec<PlSmallStr>,9input_schema: Arc<Schema>,10}1112impl SimpleProjectionNode {13pub fn new(columns: Vec<PlSmallStr>, input_schema: Arc<Schema>) -> Self {14Self {15columns,16input_schema,17}18}19}2021impl ComputeNode for SimpleProjectionNode {22fn name(&self) -> &str {23"simple-projection"24}2526fn update_state(27&mut self,28recv: &mut [PortState],29send: &mut [PortState],30_state: &StreamingExecutionState,31) -> PolarsResult<()> {32assert!(recv.len() == 1 && send.len() == 1);33recv.swap_with_slice(send);34Ok(())35}3637fn spawn<'env, 's>(38&'env mut self,39scope: &'s TaskScope<'s, 'env>,40recv_ports: &mut [Option<RecvPort<'_>>],41send_ports: &mut [Option<SendPort<'_>>],42_state: &'s StreamingExecutionState,43join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,44) {45assert!(recv_ports.len() == 1 && send_ports.len() == 1);46let receivers = recv_ports[0].take().unwrap().parallel();47let senders = send_ports[0].take().unwrap().parallel();4849for (mut recv, mut send) in receivers.into_iter().zip(senders) {50let slf = &*self;51join_handles.push(scope.spawn_task(TaskPriority::High, async move {52while let Ok(morsel) = recv.recv().await {53let morsel = morsel.try_map(|df| unsafe {54df.with_schema(slf.input_schema.clone())55.select_unchecked(slf.columns.as_slice())56})?;5758if send.send(morsel).await.is_err() {59break;60}61}6263Ok(())64}));65}66}67}686970