Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/rle_id.rs
6939 views
1
use polars_core::frame::DataFrame;
2
use polars_core::prelude::{AnyValue, Column, DataType};
3
use polars_core::scalar::Scalar;
4
use polars_error::PolarsResult;
5
use polars_utils::IdxSize;
6
7
use super::ComputeNode;
8
use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
9
use crate::execute::StreamingExecutionState;
10
use crate::graph::PortState;
11
use crate::pipe::{RecvPort, SendPort};
12
13
pub struct RleIdNode {
14
index: IdxSize,
15
16
dtype: DataType,
17
last: Option<AnyValue<'static>>,
18
}
19
20
impl RleIdNode {
21
pub fn new(dtype: DataType) -> Self {
22
Self {
23
index: 0,
24
dtype,
25
last: None,
26
}
27
}
28
}
29
30
impl ComputeNode for RleIdNode {
31
fn name(&self) -> &str {
32
"rle_id"
33
}
34
35
fn update_state(
36
&mut self,
37
recv: &mut [PortState],
38
send: &mut [PortState],
39
_state: &StreamingExecutionState,
40
) -> PolarsResult<()> {
41
assert!(recv.len() == 1 && send.len() == 1);
42
recv.swap_with_slice(send);
43
Ok(())
44
}
45
46
fn spawn<'env, 's>(
47
&'env mut self,
48
scope: &'s TaskScope<'s, 'env>,
49
recv_ports: &mut [Option<RecvPort<'_>>],
50
send_ports: &mut [Option<SendPort<'_>>],
51
_state: &'s StreamingExecutionState,
52
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
53
) {
54
assert_eq!(recv_ports.len(), 1);
55
assert_eq!(send_ports.len(), 1);
56
57
let mut recv = recv_ports[0].take().unwrap().serial();
58
let mut send = send_ports[0].take().unwrap().serial();
59
60
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
61
let mut lengths = Vec::new();
62
while let Ok(mut m) = recv.recv().await {
63
let df = m.df_mut();
64
if df.height() == 0 {
65
continue;
66
}
67
68
assert_eq!(df.width(), 1);
69
let column = &df[0];
70
71
let name = column.name().clone();
72
73
lengths.clear();
74
polars_ops::series::rle_lengths(column, &mut lengths)?;
75
76
// If the last value seen is different from this first value here, bump the index
77
// by 1.
78
if let Some(last) = self.last.take() {
79
let fst = Scalar::new(self.dtype.clone(), column.get(0).unwrap().into_static());
80
let last = Scalar::new(self.dtype.clone(), last);
81
self.index += IdxSize::from(fst != last);
82
}
83
self.last = Some(column.get(column.len() - 1).unwrap().into_static());
84
85
let column = if lengths.len() == 1 {
86
// If we only have one unique value, just give a Scalar column.
87
Column::new_scalar(name, Scalar::from(self.index), lengths[0] as usize)
88
} else {
89
let mut values = Vec::with_capacity(column.len());
90
values.extend(std::iter::repeat_n(self.index, lengths[0] as usize));
91
for length in lengths.iter().skip(1) {
92
self.index += 1;
93
values.extend(std::iter::repeat_n(self.index, *length as usize));
94
}
95
let mut column = Column::new(name, values);
96
column.set_sorted_flag(polars_core::series::IsSorted::Ascending);
97
column
98
};
99
100
*df = unsafe { DataFrame::new_no_checks(column.len(), vec![column]) };
101
102
if send.send(m).await.is_err() {
103
break;
104
}
105
}
106
107
Ok(())
108
}));
109
}
110
}
111
112