Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/graph.rs
6939 views
1
use polars_error::PolarsResult;
2
use slotmap::{Key, SecondaryMap, SlotMap};
3
4
use crate::execute::StreamingExecutionState;
5
use crate::nodes::ComputeNode;
6
7
slotmap::new_key_type! {
    /// Key identifying a [`GraphNode`] stored in [`Graph::nodes`].
    pub struct GraphNodeKey;
    /// Key identifying a [`LogicalPipe`] stored in [`Graph::pipes`].
    pub struct LogicalPipeKey;
}
11
12
/// Represents the compute graph.
13
///
14
/// The `nodes` perform computation and the `pipes` form the connections between nodes
15
/// that data is sent through.
16
#[derive(Default)]
17
pub struct Graph {
18
pub nodes: SlotMap<GraphNodeKey, GraphNode>,
19
pub pipes: SlotMap<LogicalPipeKey, LogicalPipe>,
20
}
21
22
impl Graph {
23
/// Allocate the needed `capacity` for the `Graph`.
24
pub fn with_capacity(capacity: usize) -> Self {
25
Self {
26
nodes: SlotMap::with_capacity_and_key(capacity),
27
pipes: SlotMap::with_capacity_and_key(capacity),
28
}
29
}
30
31
/// Add a new `GraphNode` to the `Graph` and connect the inputs and outputs
32
/// to their respective `LogicalPipe`s.
33
pub fn add_node<N: ComputeNode + 'static>(
34
&mut self,
35
node: N,
36
inputs: impl IntoIterator<Item = (GraphNodeKey, usize)>,
37
) -> GraphNodeKey {
38
// Add the GraphNode.
39
let node_key = self.nodes.insert(GraphNode {
40
compute: Box::new(node),
41
inputs: Vec::new(),
42
outputs: Vec::new(),
43
});
44
45
// Create and add pipes that connect input to output.
46
for (recv_port, (sender, send_port)) in inputs.into_iter().enumerate() {
47
let pipe = LogicalPipe {
48
sender,
49
send_port,
50
send_state: PortState::Blocked,
51
receiver: node_key,
52
recv_port,
53
recv_state: PortState::Blocked,
54
};
55
56
// Add the pipe.
57
let pipe_key = self.pipes.insert(pipe);
58
59
// And connect input to output.
60
self.nodes[node_key].inputs.push(pipe_key);
61
if self.nodes[sender].outputs.len() <= send_port {
62
self.nodes[sender]
63
.outputs
64
.resize(send_port + 1, LogicalPipeKey::null());
65
}
66
assert!(self.nodes[sender].outputs[send_port].is_null());
67
self.nodes[sender].outputs[send_port] = pipe_key;
68
}
69
70
node_key
71
}
72
73
/// Updates all the nodes' states until a fixed point is reached.
74
pub fn update_all_states(&mut self, state: &StreamingExecutionState) -> PolarsResult<()> {
75
let mut to_update: Vec<_> = self.nodes.keys().collect();
76
let mut scheduled_for_update: SecondaryMap<GraphNodeKey, ()> =
77
self.nodes.keys().map(|k| (k, ())).collect();
78
79
let verbose = std::env::var("POLARS_VERBOSE_STATE_UPDATE").as_deref() == Ok("1");
80
81
let mut recv_state = Vec::new();
82
let mut send_state = Vec::new();
83
while let Some(node_key) = to_update.pop() {
84
scheduled_for_update.remove(node_key);
85
let node = &mut self.nodes[node_key];
86
87
// Get the states of nodes this node is connected to.
88
recv_state.clear();
89
send_state.clear();
90
recv_state.extend(node.inputs.iter().map(|i| self.pipes[*i].send_state));
91
send_state.extend(node.outputs.iter().map(|o| self.pipes[*o].recv_state));
92
93
// Compute the new state of this node given its environment.
94
if verbose {
95
eprintln!(
96
"updating {}, before: {recv_state:?} {send_state:?}",
97
node.compute.name()
98
);
99
}
100
node.compute
101
.update_state(&mut recv_state, &mut send_state, state)?;
102
if verbose {
103
eprintln!(
104
"updating {}, after: {recv_state:?} {send_state:?}",
105
node.compute.name()
106
);
107
}
108
109
// Propagate information.
110
for (input, state) in node.inputs.iter().zip(recv_state.iter()) {
111
let pipe = &mut self.pipes[*input];
112
if pipe.recv_state != *state {
113
assert!(
114
pipe.recv_state != PortState::Done,
115
"implementation error: state transition from Done to Blocked/Ready attempted"
116
);
117
pipe.recv_state = *state;
118
if scheduled_for_update.insert(pipe.sender, ()).is_none() {
119
to_update.push(pipe.sender);
120
}
121
}
122
}
123
124
for (output, state) in node.outputs.iter().zip(send_state.iter()) {
125
let pipe = &mut self.pipes[*output];
126
if pipe.send_state != *state {
127
assert!(
128
pipe.send_state != PortState::Done,
129
"implementation error: state transition from Done to Blocked/Ready attempted"
130
);
131
pipe.send_state = *state;
132
if scheduled_for_update.insert(pipe.receiver, ()).is_none() {
133
to_update.push(pipe.receiver);
134
}
135
}
136
}
137
}
138
Ok(())
139
}
140
}
141
142
/// A node in the graph represents a computation performed on the stream of morsels
143
/// that flow through it.
144
pub struct GraphNode {
145
pub compute: Box<dyn ComputeNode>,
146
pub inputs: Vec<LogicalPipeKey>,
147
pub outputs: Vec<LogicalPipeKey>,
148
}
149
150
/// A pipe sends data between nodes.
151
#[allow(unused)] // TODO: remove.
152
pub struct LogicalPipe {
153
// Node that we send data to.
154
pub sender: GraphNodeKey,
155
// Output location:
156
// graph[x].output[i].send_port == i
157
pub send_port: usize,
158
pub send_state: PortState,
159
160
// Node that we receive data from.
161
pub receiver: GraphNodeKey,
162
// Input location:
163
// graph[x].inputs[i].recv_port == i
164
pub recv_port: usize,
165
pub recv_state: PortState,
166
}
167
168
/// The state of one end of a [`LogicalPipe`].
///
/// The derived ordering follows declaration order:
/// `Blocked < Ready < Done`.
#[derive(Copy, Clone, PartialEq, Eq, Debug, PartialOrd, Ord)]
pub enum PortState {
    /// The state both ends of a newly created pipe start in.
    // NOTE(review): presumably means no data can be transferred right now;
    // confirm against `ComputeNode::update_state`.
    Blocked,
    // NOTE(review): presumably means the port can transfer data; confirm
    // against `ComputeNode::update_state`.
    Ready,
    /// Terminal state: `Graph::update_all_states` asserts that a port which
    /// is `Done` is never changed to another state.
    Done,
}
174
175