Path: blob/main/crates/polars-stream/src/nodes/rle_id.rs
6939 views
use polars_core::frame::DataFrame;1use polars_core::prelude::{AnyValue, Column, DataType};2use polars_core::scalar::Scalar;3use polars_error::PolarsResult;4use polars_utils::IdxSize;56use super::ComputeNode;7use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};8use crate::execute::StreamingExecutionState;9use crate::graph::PortState;10use crate::pipe::{RecvPort, SendPort};1112pub struct RleIdNode {13index: IdxSize,1415dtype: DataType,16last: Option<AnyValue<'static>>,17}1819impl RleIdNode {20pub fn new(dtype: DataType) -> Self {21Self {22index: 0,23dtype,24last: None,25}26}27}2829impl ComputeNode for RleIdNode {30fn name(&self) -> &str {31"rle_id"32}3334fn update_state(35&mut self,36recv: &mut [PortState],37send: &mut [PortState],38_state: &StreamingExecutionState,39) -> PolarsResult<()> {40assert!(recv.len() == 1 && send.len() == 1);41recv.swap_with_slice(send);42Ok(())43}4445fn spawn<'env, 's>(46&'env mut self,47scope: &'s TaskScope<'s, 'env>,48recv_ports: &mut [Option<RecvPort<'_>>],49send_ports: &mut [Option<SendPort<'_>>],50_state: &'s StreamingExecutionState,51join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,52) {53assert_eq!(recv_ports.len(), 1);54assert_eq!(send_ports.len(), 1);5556let mut recv = recv_ports[0].take().unwrap().serial();57let mut send = send_ports[0].take().unwrap().serial();5859join_handles.push(scope.spawn_task(TaskPriority::High, async move {60let mut lengths = Vec::new();61while let Ok(mut m) = recv.recv().await {62let df = m.df_mut();63if df.height() == 0 {64continue;65}6667assert_eq!(df.width(), 1);68let column = &df[0];6970let name = column.name().clone();7172lengths.clear();73polars_ops::series::rle_lengths(column, &mut lengths)?;7475// If the last value seen is different from this first value here, bump the index76// by 1.77if let Some(last) = self.last.take() {78let fst = Scalar::new(self.dtype.clone(), column.get(0).unwrap().into_static());79let last = Scalar::new(self.dtype.clone(), last);80self.index += IdxSize::from(fst != last);81}82self.last = Some(column.get(column.len() - 1).unwrap().into_static());8384let column = if lengths.len() == 1 {85// If we only have one unique value, just give a Scalar column.86Column::new_scalar(name, Scalar::from(self.index), lengths[0] as usize)87} else {88let mut values = Vec::with_capacity(column.len());89values.extend(std::iter::repeat_n(self.index, lengths[0] as usize));90for length in lengths.iter().skip(1) {91self.index += 1;92values.extend(std::iter::repeat_n(self.index, *length as usize));93}94let mut column = Column::new(name, values);95column.set_sorted_flag(polars_core::series::IsSorted::Ascending);96column97};9899*df = unsafe { DataFrame::new_no_checks(column.len(), vec![column]) };100101if send.send(m).await.is_err() {102break;103}104}105106Ok(())107}));108}109}110111112