Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/mod.rs
6939 views
1
#[cfg(feature = "cum_agg")]
2
pub mod cum_agg;
3
pub mod dynamic_slice;
4
pub mod filter;
5
pub mod group_by;
6
pub mod in_memory_map;
7
pub mod in_memory_sink;
8
pub mod in_memory_source;
9
pub mod input_independent_select;
10
pub mod io_sinks;
11
pub mod io_sources;
12
pub mod joins;
13
pub mod map;
14
#[cfg(feature = "merge_sorted")]
15
pub mod merge_sorted;
16
pub mod multiplexer;
17
pub mod negative_slice;
18
pub mod ordered_union;
19
pub mod peak_minmax;
20
pub mod reduce;
21
pub mod repeat;
22
pub mod rle;
23
pub mod rle_id;
24
pub mod select;
25
pub mod shift;
26
pub mod simple_projection;
27
pub mod streaming_slice;
28
pub mod top_k;
29
pub mod with_row_index;
30
pub mod zip;
31
32
/// The imports you'll always need for implementing a ComputeNode.
33
mod compute_node_prelude {
34
pub use polars_core::frame::DataFrame;
35
pub use polars_error::PolarsResult;
36
pub use polars_expr::state::ExecutionState;
37
38
pub use super::ComputeNode;
39
pub use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
40
pub use crate::execute::StreamingExecutionState;
41
pub use crate::graph::PortState;
42
pub use crate::morsel::{Morsel, MorselSeq};
43
pub use crate::pipe::{RecvPort, SendPort};
44
}
45
46
use compute_node_prelude::*;
47
48
use crate::execute::StreamingExecutionState;
49
50
pub trait ComputeNode: Send {
51
/// The name of this node.
52
fn name(&self) -> &str;
53
54
/// Update the state of this node given the state of our input and output
55
/// ports. May be called multiple times until fully resolved for each
56
/// execution phase.
57
///
58
/// For each input pipe `recv` will contain a respective state of the
59
/// send port that pipe is connected to when called, and it is expected when
60
/// `update_state` returns it contains your computed receive port state.
61
///
62
/// Similarly, for each output pipe `send` will contain the respective
63
/// state of the input port that pipe is connected to when called, and you
64
/// must update it to contain the desired state of your output port.
65
fn update_state(
66
&mut self,
67
recv: &mut [PortState],
68
send: &mut [PortState],
69
state: &StreamingExecutionState,
70
) -> PolarsResult<()>;
71
72
/// If this node (in its current state) is a pipeline blocker, and whether
73
/// this is memory intensive or not.
74
fn is_memory_intensive_pipeline_blocker(&self) -> bool {
75
false
76
}
77
78
/// Spawn the tasks that this compute node needs to receive input(s),
79
/// process it and send to its output(s). Called once per execution phase.
80
fn spawn<'env, 's>(
81
&'env mut self,
82
scope: &'s TaskScope<'s, 'env>,
83
recv_ports: &mut [Option<RecvPort<'_>>],
84
send_ports: &mut [Option<SendPort<'_>>],
85
state: &'s StreamingExecutionState,
86
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
87
);
88
89
/// Called once after the last execution phase to extract output from
90
/// in-memory nodes.
91
fn get_output(&mut self) -> PolarsResult<Option<DataFrame>> {
92
Ok(None)
93
}
94
}
95
96