Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/in_memory_sink.rs
6939 views
1
use std::sync::Arc;
2
3
use parking_lot::Mutex;
4
use polars_core::schema::Schema;
5
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
6
7
use super::compute_node_prelude::*;
8
use crate::utils::in_memory_linearize::linearize;
9
10
pub struct InMemorySinkNode {
11
morsels_per_pipe: Mutex<Vec<Vec<(MorselSeq, DataFrame)>>>,
12
schema: Arc<Schema>,
13
}
14
15
impl InMemorySinkNode {
16
pub fn new(schema: Arc<Schema>) -> Self {
17
Self {
18
morsels_per_pipe: Mutex::default(),
19
schema,
20
}
21
}
22
}
23
24
impl ComputeNode for InMemorySinkNode {
25
fn name(&self) -> &str {
26
"in-memory-sink"
27
}
28
29
fn update_state(
30
&mut self,
31
recv: &mut [PortState],
32
send: &mut [PortState],
33
_state: &StreamingExecutionState,
34
) -> PolarsResult<()> {
35
assert!(send.is_empty());
36
assert!(recv.len() == 1);
37
38
// We are always ready to receive, unless the sender is done, then we're
39
// also done.
40
if recv[0] != PortState::Done {
41
recv[0] = PortState::Ready;
42
}
43
Ok(())
44
}
45
46
fn is_memory_intensive_pipeline_blocker(&self) -> bool {
47
true
48
}
49
50
fn spawn<'env, 's>(
51
&'env mut self,
52
scope: &'s TaskScope<'s, 'env>,
53
recv_ports: &mut [Option<RecvPort<'_>>],
54
send_ports: &mut [Option<SendPort<'_>>],
55
_state: &'s StreamingExecutionState,
56
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
57
) {
58
assert!(recv_ports.len() == 1 && send_ports.is_empty());
59
let receivers = recv_ports[0].take().unwrap().parallel();
60
61
for mut recv in receivers {
62
let slf = &*self;
63
join_handles.push(scope.spawn_task(TaskPriority::High, async move {
64
let mut morsels = Vec::new();
65
while let Ok(mut morsel) = recv.recv().await {
66
morsel.take_consume_token();
67
morsels.push((morsel.seq(), morsel.into_df()));
68
}
69
70
slf.morsels_per_pipe.lock().push(morsels);
71
Ok(())
72
}));
73
}
74
}
75
76
fn get_output(&mut self) -> PolarsResult<Option<DataFrame>> {
77
let morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());
78
let dataframes = linearize(morsels_per_pipe);
79
if dataframes.is_empty() {
80
Ok(Some(DataFrame::empty_with_schema(&self.schema)))
81
} else {
82
Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))
83
}
84
}
85
}
86
87