// GitHub Repository: pola-rs/polars
// Path: blob/main/crates/polars-stream/src/nodes/streaming_slice.rs
use super::compute_node_prelude::*;
2
3
/// A node that will pass-through up to `length` rows, starting at `start_offset`.
/// Since `start_offset` must be non-negative this can be done in a streaming
/// manner.
pub struct StreamingSliceNode {
    /// Number of rows at the beginning of the stream that are skipped.
    start_offset: usize,
    /// Maximum number of rows passed through after the skipped prefix.
    length: usize,
    /// Number of input rows seen so far; starts at zero and grows by the
    /// height of every morsel that is received.
    stream_offset: usize,
}
11
12
impl StreamingSliceNode {
13
pub fn new(start_offset: usize, length: usize) -> Self {
14
Self {
15
start_offset,
16
length,
17
stream_offset: 0,
18
}
19
}
20
}
21
22
impl ComputeNode for StreamingSliceNode {
23
fn name(&self) -> &str {
24
"streaming-slice"
25
}
26
27
fn update_state(
28
&mut self,
29
recv: &mut [PortState],
30
send: &mut [PortState],
31
_state: &StreamingExecutionState,
32
) -> PolarsResult<()> {
33
if self.stream_offset >= self.start_offset.saturating_add(self.length) || self.length == 0 {
34
recv[0] = PortState::Done;
35
send[0] = PortState::Done;
36
} else {
37
recv.swap_with_slice(send);
38
}
39
Ok(())
40
}
41
42
fn spawn<'env, 's>(
43
&'env mut self,
44
scope: &'s TaskScope<'s, 'env>,
45
recv_ports: &mut [Option<RecvPort<'_>>],
46
send_ports: &mut [Option<SendPort<'_>>],
47
_state: &'s StreamingExecutionState,
48
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
49
) {
50
assert!(recv_ports.len() == 1 && send_ports.len() == 1);
51
let mut recv = recv_ports[0].take().unwrap().serial();
52
let mut send = send_ports[0].take().unwrap().serial();
53
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
54
let stop_offset = self.start_offset.saturating_add(self.length);
55
56
while let Ok(morsel) = recv.recv().await {
57
let morsel = morsel.map(|df| {
58
let height = df.height();
59
60
// Calculate start/stop offsets within df and update global offset.
61
let relative_start_offset = self
62
.start_offset
63
.saturating_sub(self.stream_offset)
64
.min(height);
65
let relative_stop_offset =
66
stop_offset.saturating_sub(self.stream_offset).min(height);
67
self.stream_offset += height;
68
69
let new_height = relative_stop_offset.saturating_sub(relative_start_offset);
70
if new_height != height {
71
df.slice(relative_start_offset as i64, new_height)
72
} else {
73
df
74
}
75
});
76
77
// Technically not necessary, but it's nice to already tell the
78
// source to stop producing more morsels as we won't be
79
// interested in the results anyway.
80
if self.stream_offset >= stop_offset {
81
morsel.source_token().stop();
82
}
83
84
if morsel.df().height() > 0 && send.send(morsel).await.is_err() {
85
break;
86
}
87
88
if self.stream_offset >= stop_offset {
89
break;
90
}
91
}
92
93
Ok(())
94
}))
95
}
96
}
97
98