Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/in_memory_map.rs
6939 views
1
use std::sync::Arc;
2
3
use polars_core::schema::Schema;
4
use polars_plan::plans::DataFrameUdf;
5
6
use super::compute_node_prelude::*;
7
use super::in_memory_sink::InMemorySinkNode;
8
use super::in_memory_source::InMemorySourceNode;
9
10
pub enum InMemoryMapNode {
11
Sink {
12
sink_node: InMemorySinkNode,
13
map: Arc<dyn DataFrameUdf>,
14
},
15
Source(InMemorySourceNode),
16
Done,
17
}
18
19
impl InMemoryMapNode {
20
pub fn new(input_schema: Arc<Schema>, map: Arc<dyn DataFrameUdf>) -> Self {
21
Self::Sink {
22
sink_node: InMemorySinkNode::new(input_schema),
23
map,
24
}
25
}
26
}
27
28
impl ComputeNode for InMemoryMapNode {
29
fn name(&self) -> &str {
30
"in-memory-map"
31
}
32
33
fn update_state(
34
&mut self,
35
recv: &mut [PortState],
36
send: &mut [PortState],
37
state: &StreamingExecutionState,
38
) -> PolarsResult<()> {
39
assert!(recv.len() == 1 && send.len() == 1);
40
41
// If the output doesn't want any more data, transition to being done.
42
if send[0] == PortState::Done && !matches!(self, Self::Done) {
43
*self = Self::Done;
44
}
45
46
// If the input is done, transition to being a source.
47
if let Self::Sink { sink_node, map } = self {
48
if recv[0] == PortState::Done {
49
let df = sink_node.get_output()?;
50
let source_node = InMemorySourceNode::new(
51
Arc::new(map.call_udf(df.unwrap())?),
52
MorselSeq::default(),
53
);
54
*self = Self::Source(source_node);
55
}
56
}
57
58
match self {
59
Self::Sink { sink_node, .. } => {
60
sink_node.update_state(recv, &mut [], state)?;
61
send[0] = PortState::Blocked;
62
},
63
Self::Source(source_node) => {
64
recv[0] = PortState::Done;
65
source_node.update_state(&mut [], send, state)?;
66
},
67
Self::Done => {
68
recv[0] = PortState::Done;
69
send[0] = PortState::Done;
70
},
71
}
72
Ok(())
73
}
74
75
fn is_memory_intensive_pipeline_blocker(&self) -> bool {
76
matches!(self, Self::Sink { .. })
77
}
78
79
fn spawn<'env, 's>(
80
&'env mut self,
81
scope: &'s TaskScope<'s, 'env>,
82
recv_ports: &mut [Option<RecvPort<'_>>],
83
send_ports: &mut [Option<SendPort<'_>>],
84
state: &'s StreamingExecutionState,
85
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
86
) {
87
match self {
88
Self::Sink { sink_node, .. } => {
89
sink_node.spawn(scope, recv_ports, &mut [], state, join_handles)
90
},
91
Self::Source(source) => source.spawn(scope, &mut [], send_ports, state, join_handles),
92
Self::Done => unreachable!(),
93
}
94
}
95
}
96
97