Path: blob/main/crates/polars-stream/src/nodes/streaming_slice.rs
6939 views
use super::compute_node_prelude::*;12/// A node that will pass-through up to length rows, starting at start_offset.3/// Since start_offset must be non-negative this can be done in a streaming4/// manner.5pub struct StreamingSliceNode {6start_offset: usize,7length: usize,8stream_offset: usize,9}1011impl StreamingSliceNode {12pub fn new(start_offset: usize, length: usize) -> Self {13Self {14start_offset,15length,16stream_offset: 0,17}18}19}2021impl ComputeNode for StreamingSliceNode {22fn name(&self) -> &str {23"streaming-slice"24}2526fn update_state(27&mut self,28recv: &mut [PortState],29send: &mut [PortState],30_state: &StreamingExecutionState,31) -> PolarsResult<()> {32if self.stream_offset >= self.start_offset.saturating_add(self.length) || self.length == 0 {33recv[0] = PortState::Done;34send[0] = PortState::Done;35} else {36recv.swap_with_slice(send);37}38Ok(())39}4041fn spawn<'env, 's>(42&'env mut self,43scope: &'s TaskScope<'s, 'env>,44recv_ports: &mut [Option<RecvPort<'_>>],45send_ports: &mut [Option<SendPort<'_>>],46_state: &'s StreamingExecutionState,47join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,48) {49assert!(recv_ports.len() == 1 && send_ports.len() == 1);50let mut recv = recv_ports[0].take().unwrap().serial();51let mut send = send_ports[0].take().unwrap().serial();52join_handles.push(scope.spawn_task(TaskPriority::High, async move {53let stop_offset = self.start_offset.saturating_add(self.length);5455while let Ok(morsel) = recv.recv().await {56let morsel = morsel.map(|df| {57let height = df.height();5859// Calculate start/stop offsets within df and update global offset.60let relative_start_offset = self61.start_offset62.saturating_sub(self.stream_offset)63.min(height);64let relative_stop_offset =65stop_offset.saturating_sub(self.stream_offset).min(height);66self.stream_offset += height;6768let new_height = relative_stop_offset.saturating_sub(relative_start_offset);69if new_height != height {70df.slice(relative_start_offset as i64, new_height)71} else {72df73}74});7576// Technically not necessary, but it's nice to already tell the77// source to stop producing more morsels as we won't be78// interested in the results anyway.79if self.stream_offset >= stop_offset {80morsel.source_token().stop();81}8283if morsel.df().height() > 0 && send.send(morsel).await.is_err() {84break;85}8687if self.stream_offset >= stop_offset {88break;89}90}9192Ok(())93}))94}95}969798