Path: blob/main/crates/polars-stream/src/nodes/dynamic_slice.rs
6939 views
use std::sync::Arc;12use polars_core::schema::Schema;34use super::compute_node_prelude::*;5use crate::nodes::in_memory_sink::InMemorySinkNode;6use crate::nodes::negative_slice::NegativeSliceNode;7use crate::nodes::streaming_slice::StreamingSliceNode;89/// A node that will dispatch either to StreamingSlice or NegativeSlice10/// depending on the offset which is dynamically dispatched.11pub enum DynamicSliceNode {12GatheringParams {13offset: InMemorySinkNode,14length: InMemorySinkNode,15},16Streaming(StreamingSliceNode),17Negative(NegativeSliceNode),18}1920impl DynamicSliceNode {21pub fn new(offset_schema: Arc<Schema>, length_schema: Arc<Schema>) -> Self {22assert!(offset_schema.len() == 1);23assert!(length_schema.len() == 1);24Self::GatheringParams {25offset: InMemorySinkNode::new(offset_schema),26length: InMemorySinkNode::new(length_schema),27}28}29}3031impl ComputeNode for DynamicSliceNode {32fn name(&self) -> &str {33"dynamic-slice"34}3536fn update_state(37&mut self,38recv: &mut [PortState],39send: &mut [PortState],40state: &StreamingExecutionState,41) -> PolarsResult<()> {42assert!(recv.len() == 3 && send.len() == 1);4344if recv[1] == PortState::Done && recv[2] == PortState::Done {45if let Self::GatheringParams { offset, length } = self {46let offset = offset.get_output()?.unwrap();47let length = length.get_output()?.unwrap();48let offset_item = offset.get_columns()[0].get(0)?;49let length_item = length.get_columns()[0].get(0)?;50let offset = offset_item.extract::<i64>().unwrap_or(0);51let length = length_item.extract::<usize>().unwrap_or(usize::MAX);52if let Ok(non_neg_offset) = offset.try_into() {53*self = Self::Streaming(StreamingSliceNode::new(non_neg_offset, length));54} else {55*self = Self::Negative(NegativeSliceNode::new(offset, length));56}57}58}5960match self {61Self::GatheringParams { offset, length } => {62offset.update_state(&mut recv[1..2], &mut [], state)?;63length.update_state(&mut recv[2..3], &mut [], state)?;64recv[0] = PortState::Blocked;65send[0] = PortState::Blocked;66},67Self::Streaming(node) => {68node.update_state(&mut recv[0..1], send, state)?;69recv[1] = PortState::Done;70recv[2] = PortState::Done;71},72Self::Negative(node) => {73node.update_state(&mut recv[0..1], send, state)?;74recv[1] = PortState::Done;75recv[2] = PortState::Done;76},77}78Ok(())79}8081fn spawn<'env, 's>(82&'env mut self,83scope: &'s TaskScope<'s, 'env>,84recv_ports: &mut [Option<RecvPort<'_>>],85send_ports: &mut [Option<SendPort<'_>>],86state: &'s StreamingExecutionState,87join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,88) {89assert!(recv_ports.len() == 3 && send_ports.len() == 1);90match self {91Self::GatheringParams { offset, length } => {92assert!(recv_ports[0].is_none());93assert!(send_ports[0].is_none());94if recv_ports[1].is_some() {95offset.spawn(scope, &mut recv_ports[1..2], &mut [], state, join_handles);96}97if recv_ports[2].is_some() {98length.spawn(scope, &mut recv_ports[2..3], &mut [], state, join_handles);99}100},101Self::Streaming(node) => {102node.spawn(103scope,104&mut recv_ports[0..1],105send_ports,106state,107join_handles,108);109assert!(recv_ports[1].is_none());110assert!(recv_ports[2].is_none());111},112Self::Negative(node) => {113node.spawn(114scope,115&mut recv_ports[0..1],116send_ports,117state,118join_handles,119);120assert!(recv_ports[1].is_none());121assert!(recv_ports[2].is_none());122},123}124}125}126127128