Path: blob/main/crates/polars-stream/src/nodes/simple_projection.rs
6939 views
use std::sync::Arc;12use polars_core::schema::Schema;3use polars_utils::pl_str::PlSmallStr;45use super::compute_node_prelude::*;67pub struct SimpleProjectionNode {8columns: Vec<PlSmallStr>,9input_schema: Arc<Schema>,10}1112impl SimpleProjectionNode {13pub fn new(columns: Vec<PlSmallStr>, input_schema: Arc<Schema>) -> Self {14Self {15columns,16input_schema,17}18}19}2021impl ComputeNode for SimpleProjectionNode {22fn name(&self) -> &str {23"simple-projection"24}2526fn update_state(27&mut self,28recv: &mut [PortState],29send: &mut [PortState],30_state: &StreamingExecutionState,31) -> PolarsResult<()> {32assert!(recv.len() == 1 && send.len() == 1);33recv.swap_with_slice(send);34Ok(())35}3637fn spawn<'env, 's>(38&'env mut self,39scope: &'s TaskScope<'s, 'env>,40recv_ports: &mut [Option<RecvPort<'_>>],41send_ports: &mut [Option<SendPort<'_>>],42_state: &'s StreamingExecutionState,43join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,44) {45assert!(recv_ports.len() == 1 && send_ports.len() == 1);46let receivers = recv_ports[0].take().unwrap().parallel();47let senders = send_ports[0].take().unwrap().parallel();4849for (mut recv, mut send) in receivers.into_iter().zip(senders) {50let slf = &*self;51join_handles.push(scope.spawn_task(TaskPriority::High, async move {52while let Ok(morsel) = recv.recv().await {53let morsel = morsel.try_map(|df| {54// TODO: can this be unchecked?55let check_duplicates = true;56df._select_with_schema_impl(57slf.columns.as_slice(),58&slf.input_schema,59check_duplicates,60)61})?;6263if send.send(morsel).await.is_err() {64break;65}66}6768Ok(())69}));70}71}72}737475