Path: blob/main/crates/polars-stream/src/nodes/with_row_index.rs
6939 views
use polars_core::prelude::*;1use polars_core::utils::Container;2use polars_utils::pl_str::PlSmallStr;34use super::compute_node_prelude::*;5use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;6use crate::async_primitives::distributor_channel::distributor_channel;7use crate::async_primitives::wait_group::WaitGroup;89pub struct WithRowIndexNode {10name: PlSmallStr,11offset: IdxSize,12}1314impl WithRowIndexNode {15pub fn new(name: PlSmallStr, offset: Option<IdxSize>) -> Self {16Self {17name,18offset: offset.unwrap_or(0),19}20}21}2223impl ComputeNode for WithRowIndexNode {24fn name(&self) -> &str {25"with-row-index"26}2728fn update_state(29&mut self,30recv: &mut [PortState],31send: &mut [PortState],32_state: &StreamingExecutionState,33) -> PolarsResult<()> {34assert!(recv.len() == 1 && send.len() == 1);35recv.swap_with_slice(send);36Ok(())37}3839fn spawn<'env, 's>(40&'env mut self,41scope: &'s TaskScope<'s, 'env>,42recv_ports: &mut [Option<RecvPort<'_>>],43send_ports: &mut [Option<SendPort<'_>>],44_state: &'s StreamingExecutionState,45join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,46) {47assert!(recv_ports.len() == 1 && send_ports.len() == 1);48let mut receiver = recv_ports[0].take().unwrap().serial();49let senders = send_ports[0].take().unwrap().parallel();5051let (mut distributor, distr_receivers) =52distributor_channel(senders.len(), *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);5354let name = self.name.clone();5556// To figure out the correct offsets we need to be serial.57join_handles.push(scope.spawn_task(TaskPriority::High, async move {58while let Ok(morsel) = receiver.recv().await {59let offset = self.offset;60self.offset = self61.offset62.checked_add(morsel.df().len().try_into().unwrap())63.unwrap();64if distributor.send((morsel, offset)).await.is_err() {65break;66}67}6869Ok(())70}));7172// But adding the new row index column can be done in parallel.73for (mut send, mut recv) in senders.into_iter().zip(distr_receivers) {74let name = name.clone();75join_handles.push(scope.spawn_task(TaskPriority::High, async move {76let wait_group = WaitGroup::default();77while let Ok((morsel, offset)) = recv.recv().await {78let mut morsel =79morsel.try_map(|df| df.with_row_index(name.clone(), Some(offset)))?;80morsel.set_consume_token(wait_group.token());81if send.send(morsel).await.is_err() {82break;83}84wait_group.wait().await;85}8687Ok(())88}));89}90}91}929394