Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/execute.rs
8406 views
1
use std::sync::Arc;
2
3
use crossbeam_channel::Sender;
4
use parking_lot::Mutex;
5
use polars_core::POOL;
6
use polars_core::frame::DataFrame;
7
use polars_error::PolarsResult;
8
use polars_expr::state::ExecutionState;
9
use polars_utils::aliases::PlHashSet;
10
use polars_utils::relaxed_cell::RelaxedCell;
11
use polars_utils::reuse_vec::reuse_vec;
12
use slotmap::{SecondaryMap, SparseSecondaryMap};
13
use tokio::task::JoinHandle;
14
15
use crate::async_executor;
16
use crate::graph::{Graph, GraphNode, GraphNodeKey, LogicalPipeKey, PortState};
17
use crate::metrics::{GraphMetrics, MetricsBuilder};
18
use crate::pipe::PhysicalPipe;
19
20
/// Shared execution context handed (cloned) to every streaming node while a
/// query is running.
#[derive(Clone)]
pub struct StreamingExecutionState {
    /// The number of parallel pipelines we have within each stream.
    pub num_pipelines: usize,

    /// The ExecutionState passed to any non-streaming operations.
    pub in_memory_exec_state: ExecutionState,

    // Channels on which spawned tokio task handles are registered. Handles sent
    // on `query_tasks_send` are awaited once at the end of the whole query;
    // handles sent on `subphase_tasks_send` are awaited at the end of the
    // current subphase (see `spawn_subphase_task`). Receivers live in
    // `execute_graph`.
    query_tasks_send: Sender<JoinHandle<PolarsResult<()>>>,
    subphase_tasks_send: Sender<JoinHandle<PolarsResult<()>>>,
}
impl StreamingExecutionState {
33
/// Spawns a task which is awaited at the end of the query.
34
#[allow(unused)]
35
pub fn spawn_query_task<F: Future<Output = PolarsResult<()>> + Send + 'static>(&self, fut: F) {
36
self.query_tasks_send
37
.send(polars_io::pl_async::get_runtime().spawn(fut))
38
.unwrap();
39
}
40
41
/// Spawns a task which is awaited at the end of the current subphase. That is
42
/// if called inside `update_state` it is awaited after the state update, and
43
/// if called inside `spawn` it is awaited after the execution of that phase is
44
/// complete.
45
pub fn spawn_subphase_task<F: Future<Output = PolarsResult<()>> + Send + 'static>(
46
&self,
47
fut: F,
48
) {
49
self.subphase_tasks_send
50
.send(polars_io::pl_async::get_runtime().spawn(fut))
51
.unwrap();
52
}
53
}
54
55
/// Finds all runnable pipeline blockers in the graph, that is, nodes which:
56
/// - Only have blocked output ports.
57
/// - Have at least one ready input port connected to a ready output port.
58
fn find_runnable_pipeline_blockers(graph: &Graph) -> Vec<GraphNodeKey> {
59
let mut blockers = Vec::new();
60
for (node_key, node) in graph.nodes.iter() {
61
// TODO: how does the multiplexer fit into this?
62
let only_has_blocked_outputs = node
63
.outputs
64
.iter()
65
.all(|o| graph.pipes[*o].send_state == PortState::Blocked);
66
if !only_has_blocked_outputs {
67
continue;
68
}
69
70
let has_input_ready = node.inputs.iter().any(|i| {
71
graph.pipes[*i].send_state == PortState::Ready
72
&& graph.pipes[*i].recv_state == PortState::Ready
73
});
74
if has_input_ready {
75
blockers.push(node_key);
76
}
77
}
78
blockers
79
}
80
81
/// Given a set of nodes expand this set with all nodes which are inputs to the
82
/// set and whose connecting pipe is ready on both sides, recursively.
83
///
84
/// Returns the set of nodes as well as the pipes connecting them.
85
fn expand_ready_subgraph(
86
graph: &Graph,
87
mut nodes: Vec<GraphNodeKey>,
88
) -> (PlHashSet<GraphNodeKey>, Vec<LogicalPipeKey>) {
89
let mut in_subgraph: PlHashSet<GraphNodeKey> = nodes.iter().copied().collect();
90
let mut pipes = Vec::with_capacity(nodes.len());
91
while let Some(node_key) = nodes.pop() {
92
let node = &graph.nodes[node_key];
93
for input_pipe_key in &node.inputs {
94
let input_pipe = &graph.pipes[*input_pipe_key];
95
if input_pipe.send_state == PortState::Ready
96
&& input_pipe.recv_state == PortState::Ready
97
{
98
pipes.push(*input_pipe_key);
99
if in_subgraph.insert(input_pipe.sender) {
100
nodes.push(input_pipe.sender);
101
}
102
}
103
}
104
}
105
106
(in_subgraph, pipes)
107
}
108
109
/// Finds a part of the graph which we can run.
110
fn find_runnable_subgraph(graph: &mut Graph) -> (PlHashSet<GraphNodeKey>, Vec<LogicalPipeKey>) {
111
// Find pipeline blockers, choose a subset with at most one memory intensive
112
// pipeline blocker, and return the subgraph needed to feed them.
113
let blockers = find_runnable_pipeline_blockers(graph);
114
let (expensive, cheap): (Vec<_>, Vec<_>) = blockers.into_iter().partition(|b| {
115
graph.nodes[*b]
116
.compute
117
.is_memory_intensive_pipeline_blocker()
118
});
119
120
// If all expensive pipeline blockers left are sinks (InMemorySink), we're not
121
// gaining anything by only running a subset.
122
let only_expensive_sinks_left = expensive
123
.iter()
124
.all(|node_key| graph.nodes[*node_key].outputs.is_empty());
125
126
let mut to_run = cheap;
127
if only_expensive_sinks_left {
128
to_run.extend(expensive);
129
} else {
130
// TODO: choose which expensive pipeline blocker(s) to run more intelligently.
131
let best = expensive.into_iter().max_by_key(|node_key| {
132
// Prefer to run nodes whose outputs are ready to be consumed. Also
133
// prefer to run nodes which have outputs over in-memory sinks.
134
let num_outputs = graph.nodes[*node_key].outputs.len();
135
let num_outputs_ready_to_recv = graph.nodes[*node_key]
136
.outputs
137
.iter()
138
.filter(|o| graph.pipes[**o].recv_state == PortState::Ready)
139
.count();
140
(num_outputs_ready_to_recv, num_outputs)
141
});
142
to_run.extend(best);
143
}
144
145
expand_ready_subgraph(graph, to_run)
146
}
147
148
/// Runs the given subgraph. Assumes the set of pipes is correct for the subgraph.
///
/// Spawns one set of tasks per node (plus tasks for some pipes), then blocks
/// until every spawned task has finished, i.e. one execution phase.
fn run_subgraph(
    graph: &mut Graph,
    nodes: &PlHashSet<GraphNodeKey>,
    pipes: &[LogicalPipeKey],
    // Per-pipe sequence offsets; `&mut` so they persist across phases — each
    // phase reuses (and presumably advances) the offset of a pipe it ran before.
    pipe_seq_offsets: &mut SecondaryMap<LogicalPipeKey, Arc<RelaxedCell<u64>>>,
    state: &StreamingExecutionState,
    metrics: Option<Arc<Mutex<GraphMetrics>>>,
) -> PolarsResult<()> {
    // Construct physical pipes for the logical pipes we'll use.
    let mut physical_pipes = SecondaryMap::new();
    for pipe_key in pipes.iter().copied() {
        // entry() on a SecondaryMap returns None only for a removed key slot;
        // unwrap is the existing invariant here. or_default() creates the
        // shared offset cell on first use of this pipe.
        let seq_offset = pipe_seq_offsets
            .entry(pipe_key)
            .unwrap()
            .or_default()
            .clone();
        physical_pipes.insert(
            pipe_key,
            PhysicalPipe::new(state.num_pipelines, pipe_key, seq_offset, metrics.clone()),
        );
    }

    // We do a topological sort of the graph: we want to spawn each node,
    // starting with the sinks and moving backwards. This order is important
    // for the initialization of physical pipes - the receive port must be
    // initialized first.
    let mut ready = Vec::new();
    let mut num_send_ports_not_yet_ready = SecondaryMap::new();
    for node_key in nodes {
        let node = &graph.nodes[*node_key];
        // Count only outputs that are part of this subgraph; outputs whose
        // pipe wasn't selected don't gate spawn order.
        let num_outputs_in_subgraph = node
            .outputs
            .iter()
            .filter(|o| physical_pipes.contains_key(**o))
            .count();
        num_send_ports_not_yet_ready.insert(*node_key, num_outputs_in_subgraph);
        if num_outputs_in_subgraph == 0 {
            // Sinks (within this subgraph) can be spawned immediately.
            ready.push(*node_key);
        }
    }

    async_executor::task_scope(|scope| {
        // Using SlotMap::iter_mut we can get simultaneous mutable references. By storing them and
        // removing the references from the secondary map as we do our topological sort we ensure
        // they are unique.
        let mut node_refs: SecondaryMap<GraphNodeKey, &mut GraphNode> =
            graph.nodes.iter_mut().collect();

        // Initialize tasks. The four Vecs below are scratch buffers reused
        // across loop iterations via reuse_vec.
        let mut join_handles = Vec::new();
        let mut input_pipes = Vec::new();
        let mut output_pipes = Vec::new();
        let mut recv_ports = Vec::new();
        let mut send_ports = Vec::new();
        while let Some(node_key) = ready.pop() {
            let node = node_refs.remove(node_key).unwrap();

            // Temporarily remove the physical pipes from the SecondaryMap so that we can mutably
            // borrow them simultaneously. Entries are None for ports whose pipe
            // is not part of this subgraph.
            for input in &node.inputs {
                input_pipes.push(physical_pipes.remove(*input));
            }
            for output in &node.outputs {
                output_pipes.push(physical_pipes.remove(*output));
            }

            // Construct the receive/send ports.
            for input_pipe in &mut input_pipes {
                recv_ports.push(input_pipe.as_mut().map(|p| p.recv_port()));
            }
            for output_pipe in &mut output_pipes {
                send_ports.push(output_pipe.as_mut().map(|p| p.send_port()));
            }

            // Spawn the tasks. Remember where this node's handles start so the
            // metrics block below can attribute exactly the new ones.
            let pre_spawn_offset = join_handles.len();

            if let Some(graph_metrics) = metrics.clone() {
                node.compute.set_metrics_builder(MetricsBuilder {
                    graph_key: node_key,
                    graph_metrics,
                });
            }

            node.compute.spawn(
                scope,
                &mut recv_ports[..],
                &mut send_ports[..],
                state,
                &mut join_handles,
            );
            if let Some(lock) = metrics.as_ref() {
                let mut m = lock.lock();
                // Register every task this node just spawned under its key.
                for handle in &join_handles[pre_spawn_offset..] {
                    m.add_task(node_key, handle.metrics().unwrap().clone());
                }
            }

            // Ensure the ports were consumed (spawn() must take every Some).
            assert!(recv_ports.iter().all(|p| p.is_none()));
            assert!(send_ports.iter().all(|p| p.is_none()));

            // Reuse the port vectors, clearing the borrow it has on input_/output_pipes.
            recv_ports = reuse_vec(recv_ports);
            send_ports = reuse_vec(send_ports);

            // Re-insert the physical pipes into the SecondaryMap.
            for (input, input_pipe) in node.inputs.iter().zip(input_pipes.drain(..)) {
                if let Some(pipe) = input_pipe {
                    physical_pipes.insert(*input, pipe);

                    // For all the receive ports we just initialized inside spawn(), decrement
                    // the num_send_ports_not_yet_ready for the node it was connected to and mark
                    // the node as ready to spawn if all its send ports are connected to
                    // initialized recv ports.
                    let sender = graph.pipes[*input].sender;
                    if let Some(count) = num_send_ports_not_yet_ready.get_mut(sender) {
                        if *count > 0 {
                            *count -= 1;
                            if *count == 0 {
                                ready.push(sender);
                            }
                        }
                    }
                }
            }
            for (output, output_pipe) in node.outputs.iter().zip(output_pipes.drain(..)) {
                if let Some(pipe) = output_pipe {
                    physical_pipes.insert(*output, pipe);
                }
            }

            // Reuse the pipe vectors, clearing the borrow it has for next iteration.
            input_pipes = reuse_vec(input_pipes);
            output_pipes = reuse_vec(output_pipes);
        }

        // Spawn tasks for all the physical pipes (no-op on most, but needed for
        // those with distributors or linearizers).
        for pipe in physical_pipes.values_mut() {
            pipe.spawn(scope, &mut join_handles);
        }

        // Wait until all tasks are done, propagating the first task error.
        polars_io::pl_async::get_runtime().block_on(async move {
            for handle in join_handles {
                handle.await?;
            }
            PolarsResult::Ok(())
        })
    })?;

    Ok(())
}
pub fn execute_graph(
305
graph: &mut Graph,
306
metrics: Option<Arc<Mutex<GraphMetrics>>>,
307
) -> PolarsResult<SparseSecondaryMap<GraphNodeKey, DataFrame>> {
308
// Get the number of threads from the rayon thread-pool as that respects our config.
309
let num_pipelines = POOL.current_num_threads();
310
async_executor::set_num_threads(num_pipelines);
311
312
let (query_tasks_send, query_tasks_recv) = crossbeam_channel::unbounded();
313
let (subphase_tasks_send, subphase_tasks_recv) = crossbeam_channel::unbounded();
314
315
let state = StreamingExecutionState {
316
num_pipelines,
317
in_memory_exec_state: ExecutionState::default(),
318
query_tasks_send,
319
subphase_tasks_send,
320
};
321
322
// Ensure everything is properly connected.
323
for (node_key, node) in &graph.nodes {
324
for (i, input) in node.inputs.iter().enumerate() {
325
assert!(graph.pipes[*input].receiver == node_key);
326
assert!(graph.pipes[*input].recv_port == i);
327
}
328
for (i, output) in node.outputs.iter().enumerate() {
329
assert!(graph.pipes[*output].sender == node_key);
330
assert!(graph.pipes[*output].send_port == i);
331
}
332
}
333
334
let mut pipe_seq_offsets = SecondaryMap::new();
335
loop {
336
// Update the states.
337
if polars_core::config::verbose() {
338
eprintln!("polars-stream: updating graph state");
339
}
340
graph.update_all_states(&state, metrics.as_deref())?;
341
342
if let Some(m) = metrics.as_ref() {
343
m.lock().flush(&graph.pipes);
344
}
345
346
polars_io::pl_async::get_runtime().block_on(async {
347
// TODO: track this in metrics.
348
while let Ok(handle) = subphase_tasks_recv.try_recv() {
349
handle.await.unwrap()?;
350
}
351
PolarsResult::Ok(())
352
})?;
353
354
// Find a subgraph to run.
355
let (nodes, pipes) = find_runnable_subgraph(graph);
356
if polars_core::config::verbose() {
357
for node in &nodes {
358
eprintln!(
359
"polars-stream: running {} in subgraph",
360
graph.nodes[*node].compute.name()
361
);
362
}
363
}
364
365
if nodes.is_empty() {
366
break;
367
}
368
369
// Run the subgraph until phase completion.
370
run_subgraph(
371
graph,
372
&nodes,
373
&pipes,
374
&mut pipe_seq_offsets,
375
&state,
376
metrics.clone(),
377
)?;
378
polars_io::pl_async::get_runtime().block_on(async {
379
// TODO: track this in metrics.
380
while let Ok(handle) = subphase_tasks_recv.try_recv() {
381
handle.await.unwrap()?;
382
}
383
PolarsResult::Ok(())
384
})?;
385
if polars_core::config::verbose() {
386
eprintln!("polars-stream: done running graph phase");
387
}
388
}
389
390
// Ensure everything is done.
391
for pipe in graph.pipes.values() {
392
assert!(pipe.send_state == PortState::Done && pipe.recv_state == PortState::Done);
393
}
394
395
// Finalize query tasks.
396
polars_io::pl_async::get_runtime().block_on(async {
397
// TODO: track this in metrics.
398
while let Ok(handle) = query_tasks_recv.try_recv() {
399
handle.await.unwrap()?;
400
}
401
PolarsResult::Ok(())
402
})?;
403
404
// Extract output from in-memory nodes.
405
let mut out = SparseSecondaryMap::new();
406
for (node_key, node) in graph.nodes.iter_mut() {
407
if let Some(df) = node.compute.get_output()? {
408
out.insert(node_key, df);
409
}
410
}
411
412
Ok(out)
413
}
414
415