Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/with_row_index.rs
6939 views
1
use polars_core::prelude::*;
2
use polars_core::utils::Container;
3
use polars_utils::pl_str::PlSmallStr;
4
5
use super::compute_node_prelude::*;
6
use crate::DEFAULT_DISTRIBUTOR_BUFFER_SIZE;
7
use crate::async_primitives::distributor_channel::distributor_channel;
8
use crate::async_primitives::wait_group::WaitGroup;
9
10
pub struct WithRowIndexNode {
11
name: PlSmallStr,
12
offset: IdxSize,
13
}
14
15
impl WithRowIndexNode {
16
pub fn new(name: PlSmallStr, offset: Option<IdxSize>) -> Self {
17
Self {
18
name,
19
offset: offset.unwrap_or(0),
20
}
21
}
22
}
23
24
impl ComputeNode for WithRowIndexNode {
25
fn name(&self) -> &str {
26
"with-row-index"
27
}
28
29
fn update_state(
30
&mut self,
31
recv: &mut [PortState],
32
send: &mut [PortState],
33
_state: &StreamingExecutionState,
34
) -> PolarsResult<()> {
35
assert!(recv.len() == 1 && send.len() == 1);
36
recv.swap_with_slice(send);
37
Ok(())
38
}
39
40
fn spawn<'env, 's>(
41
&'env mut self,
42
scope: &'s TaskScope<'s, 'env>,
43
recv_ports: &mut [Option<RecvPort<'_>>],
44
send_ports: &mut [Option<SendPort<'_>>],
45
_state: &'s StreamingExecutionState,
46
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
47
) {
48
assert!(recv_ports.len() == 1 && send_ports.len() == 1);
49
let mut receiver = recv_ports[0].take().unwrap().serial();
50
let senders = send_ports[0].take().unwrap().parallel();
51
52
let (mut distributor, distr_receivers) =
53
distributor_channel(senders.len(), *DEFAULT_DISTRIBUTOR_BUFFER_SIZE);
54
55
let name = self.name.clone();
56
57
// To figure out the correct offsets we need to be serial.
58
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
59
while let Ok(morsel) = receiver.recv().await {
60
let offset = self.offset;
61
self.offset = self
62
.offset
63
.checked_add(morsel.df().len().try_into().unwrap())
64
.unwrap();
65
if distributor.send((morsel, offset)).await.is_err() {
66
break;
67
}
68
}
69
70
Ok(())
71
}));
72
73
// But adding the new row index column can be done in parallel.
74
for (mut send, mut recv) in senders.into_iter().zip(distr_receivers) {
75
let name = name.clone();
76
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
77
let wait_group = WaitGroup::default();
78
while let Ok((morsel, offset)) = recv.recv().await {
79
let mut morsel =
80
morsel.try_map(|df| df.with_row_index(name.clone(), Some(offset)))?;
81
morsel.set_consume_token(wait_group.token());
82
if send.send(morsel).await.is_err() {
83
break;
84
}
85
wait_group.wait().await;
86
}
87
88
Ok(())
89
}));
90
}
91
}
92
}
93
94