GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/execute.rs
use std::sync::Arc;

use crossbeam_channel::Sender;
use polars_core::POOL;
use polars_core::frame::DataFrame;
use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_utils::aliases::PlHashSet;
use polars_utils::relaxed_cell::RelaxedCell;
use slotmap::{SecondaryMap, SparseSecondaryMap};
use tokio::task::JoinHandle;

use crate::async_executor;
use crate::graph::{Graph, GraphNode, GraphNodeKey, LogicalPipeKey, PortState};
use crate::pipe::PhysicalPipe;

#[derive(Clone)]
pub struct StreamingExecutionState {
    /// The number of parallel pipelines we have within each stream.
    pub num_pipelines: usize,

    /// The ExecutionState passed to any non-streaming operations.
    pub in_memory_exec_state: ExecutionState,

    query_tasks_send: Sender<JoinHandle<PolarsResult<()>>>,
    subphase_tasks_send: Sender<JoinHandle<PolarsResult<()>>>,
}

impl StreamingExecutionState {
    /// Spawns a task which is awaited at the end of the query.
    #[expect(unused)]
    pub fn spawn_query_task<F: Future<Output = PolarsResult<()>> + Send + 'static>(&self, fut: F) {
        self.query_tasks_send
            .send(polars_io::pl_async::get_runtime().spawn(fut))
            .unwrap();
    }

    /// Spawns a task which is awaited at the end of the current subphase. That is,
    /// if called inside `update_state` it is awaited after the state update, and
    /// if called inside `spawn` it is awaited after the execution of that phase is
    /// complete.
    pub fn spawn_subphase_task<F: Future<Output = PolarsResult<()>> + Send + 'static>(
        &self,
        fut: F,
    ) {
        self.subphase_tasks_send
            .send(polars_io::pl_async::get_runtime().spawn(fut))
            .unwrap();
    }
}
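
// Illustrative sketch, assuming `state` is the `&StreamingExecutionState` handed
// to a node: work can be offloaded like this from `update_state` or `spawn`, and
// the executor awaits it before the current subphase is considered finished:
//
//     state.spawn_subphase_task(async move {
//         // ... e.g. flush some buffered IO for this phase ...
//         Ok(())
//     });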

/// Finds all runnable pipeline blockers in the graph, that is, nodes which:
/// - Only have blocked output ports.
/// - Have at least one ready input port connected to a ready output port.
fn find_runnable_pipeline_blockers(graph: &Graph) -> Vec<GraphNodeKey> {
    let mut blockers = Vec::new();
    for (node_key, node) in graph.nodes.iter() {
        // TODO: how does the multiplexer fit into this?
        let only_has_blocked_outputs = node
            .outputs
            .iter()
            .all(|o| graph.pipes[*o].send_state == PortState::Blocked);
        if !only_has_blocked_outputs {
            continue;
        }

        let has_input_ready = node.inputs.iter().any(|i| {
            graph.pipes[*i].send_state == PortState::Ready
                && graph.pipes[*i].recv_state == PortState::Ready
        });
        if has_input_ready {
            blockers.push(node_key);
        }
    }
    blockers
}
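
// A pipeline blocker is typically a node such as a sort or group-by that must
// consume all of its input before producing output: while it is still ingesting
// it reports its outputs as Blocked and its inputs as Ready, which is exactly
// the pattern matched above.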

/// Given a set of nodes, expand this set with all nodes which are inputs to the
/// set and whose connecting pipe is ready on both sides, recursively.
///
/// Returns the set of nodes as well as the pipes connecting them.
fn expand_ready_subgraph(
    graph: &Graph,
    mut nodes: Vec<GraphNodeKey>,
) -> (PlHashSet<GraphNodeKey>, Vec<LogicalPipeKey>) {
    let mut in_subgraph: PlHashSet<GraphNodeKey> = nodes.iter().copied().collect();
    let mut pipes = Vec::with_capacity(nodes.len());
    while let Some(node_key) = nodes.pop() {
        let node = &graph.nodes[node_key];
        for input_pipe_key in &node.inputs {
            let input_pipe = &graph.pipes[*input_pipe_key];
            if input_pipe.send_state == PortState::Ready
                && input_pipe.recv_state == PortState::Ready
            {
                pipes.push(*input_pipe_key);
                if in_subgraph.insert(input_pipe.sender) {
                    nodes.push(input_pipe.sender);
                }
            }
        }
    }

    (in_subgraph, pipes)
}

/// Finds a part of the graph which we can run.
fn find_runnable_subgraph(graph: &mut Graph) -> (PlHashSet<GraphNodeKey>, Vec<LogicalPipeKey>) {
    // Find pipeline blockers, choose a subset with at most one memory intensive
    // pipeline blocker, and return the subgraph needed to feed them.
    let blockers = find_runnable_pipeline_blockers(graph);
    let (mut expensive, cheap): (Vec<_>, Vec<_>) = blockers.into_iter().partition(|b| {
        graph.nodes[*b]
            .compute
            .is_memory_intensive_pipeline_blocker()
    });

    // TODO: choose which expensive pipeline blocker to run more intelligently.
    expensive.sort_by_key(|node_key| {
        // Prefer to run nodes whose outputs are ready to be consumed.
        // outputs_ready_to_receive
        graph.nodes[*node_key]
            .outputs
            .iter()
            .filter(|o| graph.pipes[**o].recv_state == PortState::Ready)
            .count()
    });

    let mut to_run = cheap;
    if let Some(node) = expensive.pop() {
        to_run.push(node);
    }
    expand_ready_subgraph(graph, to_run)
}
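
// Presumably, allowing at most one memory-intensive pipeline blocker per phase
// bounds peak memory usage: such nodes buffer large amounts of data while they
// run, so running several of them concurrently could multiply the footprint.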

/// Re-uses the memory for a vec while clearing it. Allows casting the type of
/// the vec at the same time. The stdlib specializes collect() to re-use the
/// memory.
fn reuse_vec<T, U>(v: Vec<T>) -> Vec<U> {
    v.into_iter().filter_map(|_| None).collect()
}
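
// In `run_subgraph` below this is applied to the port and pipe vectors after each
// node is spawned: collecting into a fresh `Vec<U>` yields an empty vector with a
// new element type (and lifetime), releasing the borrows the old elements held,
// while the in-place collect specialization lets the allocation be reused.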

/// Runs the given subgraph. Assumes the set of pipes is correct for the subgraph.
fn run_subgraph(
    graph: &mut Graph,
    nodes: &PlHashSet<GraphNodeKey>,
    pipes: &[LogicalPipeKey],
    pipe_seq_offsets: &mut SecondaryMap<LogicalPipeKey, Arc<RelaxedCell<u64>>>,
    state: &StreamingExecutionState,
) -> PolarsResult<()> {
    // Construct physical pipes for the logical pipes we'll use.
    let mut physical_pipes = SecondaryMap::new();
    for pipe_key in pipes.iter().copied() {
        let seq_offset = pipe_seq_offsets
            .entry(pipe_key)
            .unwrap()
            .or_default()
            .clone();
        physical_pipes.insert(pipe_key, PhysicalPipe::new(state.num_pipelines, seq_offset));
    }
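
    // Note: `pipe_seq_offsets` is owned by `execute_graph` and persists across
    // phases, so a pipe re-created in a later phase continues from the sequence
    // offset it previously reached (presumably keeping morsel sequence numbers
    // monotonically increasing across phases).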

    // We do a topological sort of the graph: we want to spawn each node,
    // starting with the sinks and moving backwards. This order is important
    // for the initialization of physical pipes - the receive port must be
    // initialized first.
    let mut ready = Vec::new();
    let mut num_send_ports_not_yet_ready = SecondaryMap::new();
    for node_key in nodes {
        let node = &graph.nodes[*node_key];
        let num_outputs_in_subgraph = node
            .outputs
            .iter()
            .filter(|o| physical_pipes.contains_key(**o))
            .count();
        num_send_ports_not_yet_ready.insert(*node_key, num_outputs_in_subgraph);
        if num_outputs_in_subgraph == 0 {
            ready.push(*node_key);
        }
    }

    async_executor::task_scope(|scope| {
        // Using SlotMap::iter_mut we can get simultaneous mutable references. By storing them and
        // removing the references from the secondary map as we do our topological sort we ensure
        // they are unique.
        let mut node_refs: SecondaryMap<GraphNodeKey, &mut GraphNode> =
            graph.nodes.iter_mut().collect();

        // Initialize tasks.
        let mut join_handles = Vec::new();
        let mut input_pipes = Vec::new();
        let mut output_pipes = Vec::new();
        let mut recv_ports = Vec::new();
        let mut send_ports = Vec::new();
        while let Some(node_key) = ready.pop() {
            let node = node_refs.remove(node_key).unwrap();

            // Temporarily remove the physical pipes from the SecondaryMap so that we can mutably
            // borrow them simultaneously.
            for input in &node.inputs {
                input_pipes.push(physical_pipes.remove(*input));
            }
            for output in &node.outputs {
                output_pipes.push(physical_pipes.remove(*output));
            }

            // Construct the receive/send ports.
            for input_pipe in &mut input_pipes {
                recv_ports.push(input_pipe.as_mut().map(|p| p.recv_port()));
            }
            for output_pipe in &mut output_pipes {
                send_ports.push(output_pipe.as_mut().map(|p| p.send_port()));
            }

            // Spawn a task per pipeline.
            node.compute.spawn(
                scope,
                &mut recv_ports[..],
                &mut send_ports[..],
                state,
                &mut join_handles,
            );

            // Ensure the ports were consumed.
            assert!(recv_ports.iter().all(|p| p.is_none()));
            assert!(send_ports.iter().all(|p| p.is_none()));

            // Reuse the port vectors, clearing the borrows they hold on input_/output_pipes.
            recv_ports = reuse_vec(recv_ports);
            send_ports = reuse_vec(send_ports);

            // Re-insert the physical pipes into the SecondaryMap.
            for (input, input_pipe) in node.inputs.iter().zip(input_pipes.drain(..)) {
                if let Some(pipe) = input_pipe {
                    physical_pipes.insert(*input, pipe);

                    // For all the receive ports we just initialized inside spawn(), decrement
                    // the num_send_ports_not_yet_ready for the node it was connected to and mark
                    // the node as ready to spawn if all its send ports are connected to
                    // initialized recv ports.
                    let sender = graph.pipes[*input].sender;
                    if let Some(count) = num_send_ports_not_yet_ready.get_mut(sender) {
                        if *count > 0 {
                            *count -= 1;
                            if *count == 0 {
                                ready.push(sender);
                            }
                        }
                    }
                }
            }
            for (output, output_pipe) in node.outputs.iter().zip(output_pipes.drain(..)) {
                if let Some(pipe) = output_pipe {
                    physical_pipes.insert(*output, pipe);
                }
            }

            // Reuse the pipe vectors, clearing the borrows they hold, for the next iteration.
            input_pipes = reuse_vec(input_pipes);
            output_pipes = reuse_vec(output_pipes);
        }

        // Spawn tasks for all the physical pipes (no-op on most, but needed for
        // those with distributors or linearizers).
        for pipe in physical_pipes.values_mut() {
            pipe.spawn(scope, &mut join_handles);
        }

        // Wait until all tasks are done.
        // Only now do we turn on/off wait statistics tracking to reduce noise
        // from task startup.
        if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
            async_executor::track_task_wait_statistics(true);
        }
        let ret = polars_io::pl_async::get_runtime().block_on(async move {
            for handle in join_handles {
                handle.await?;
            }
            PolarsResult::Ok(())
        });
        if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
            async_executor::track_task_wait_statistics(false);
        }
        ret
    })?;

    Ok(())
}

pub fn execute_graph(
    graph: &mut Graph,
) -> PolarsResult<SparseSecondaryMap<GraphNodeKey, DataFrame>> {
    // Get the number of threads from the rayon thread-pool as that respects our config.
    let num_pipelines = POOL.current_num_threads();
    async_executor::set_num_threads(num_pipelines);

    let (query_tasks_send, query_tasks_recv) = crossbeam_channel::unbounded();
    let (subphase_tasks_send, subphase_tasks_recv) = crossbeam_channel::unbounded();

    let state = StreamingExecutionState {
        num_pipelines,
        in_memory_exec_state: ExecutionState::default(),
        query_tasks_send,
        subphase_tasks_send,
    };

    // Ensure everything is properly connected.
    for (node_key, node) in &graph.nodes {
        for (i, input) in node.inputs.iter().enumerate() {
            assert!(graph.pipes[*input].receiver == node_key);
            assert!(graph.pipes[*input].recv_port == i);
        }
        for (i, output) in node.outputs.iter().enumerate() {
            assert!(graph.pipes[*output].sender == node_key);
            assert!(graph.pipes[*output].send_port == i);
        }
    }

    let mut pipe_seq_offsets = SecondaryMap::new();
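    // Run the query as a sequence of phases: each iteration updates all node and
    // port states, drains pending subphase tasks, picks a runnable subgraph, and
    // runs it until the phase completes. We stop once no runnable nodes remain.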
    loop {
        // Update the states.
        if polars_core::config::verbose() {
            eprintln!("polars-stream: updating graph state");
        }
        graph.update_all_states(&state)?;
        polars_io::pl_async::get_runtime().block_on(async {
            while let Ok(handle) = subphase_tasks_recv.try_recv() {
                handle.await.unwrap()?;
            }
            PolarsResult::Ok(())
        })?;

        // Find a subgraph to run.
        let (nodes, pipes) = find_runnable_subgraph(graph);
        if polars_core::config::verbose() {
            for node in &nodes {
                eprintln!(
                    "polars-stream: running {} in subgraph",
                    graph.nodes[*node].compute.name()
                );
            }
        }

        if nodes.is_empty() {
            break;
        }

        // Run the subgraph until phase completion.
        run_subgraph(graph, &nodes, &pipes, &mut pipe_seq_offsets, &state)?;
        polars_io::pl_async::get_runtime().block_on(async {
            while let Ok(handle) = subphase_tasks_recv.try_recv() {
                handle.await.unwrap()?;
            }
            PolarsResult::Ok(())
        })?;
        if polars_core::config::verbose() {
            eprintln!("polars-stream: done running graph phase");
        }
    }

    // Ensure everything is done.
    for pipe in graph.pipes.values() {
        assert!(pipe.send_state == PortState::Done && pipe.recv_state == PortState::Done);
    }

    // Finalize query tasks.
    polars_io::pl_async::get_runtime().block_on(async {
        while let Ok(handle) = query_tasks_recv.try_recv() {
            handle.await.unwrap()?;
        }
        PolarsResult::Ok(())
    })?;

    // Extract output from in-memory nodes.
    let mut out = SparseSecondaryMap::new();
    for (node_key, node) in graph.nodes.iter_mut() {
        if let Some(df) = node.compute.get_output()? {
            out.insert(node_key, df);
        }
    }

    Ok(out)
}