Book a Demo!
CoCalc Logo Icon
Store · Features · Docs · Share · Support · News · About · Policies · Sign Up · Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/nodes/mod.rs
8458 views
1
pub mod callback_sink;
2
#[cfg(feature = "cum_agg")]
3
pub mod cum_agg;
4
#[cfg(feature = "dynamic_group_by")]
5
pub mod dynamic_group_by;
6
pub mod dynamic_slice;
7
#[cfg(feature = "ewma")]
8
pub mod ewm;
9
pub mod filter;
10
pub mod gather_every;
11
pub mod group_by;
12
pub mod in_memory_map;
13
pub mod in_memory_sink;
14
pub mod in_memory_source;
15
pub mod input_independent_select;
16
pub mod io_sinks;
17
pub mod io_sources;
18
pub mod joins;
19
pub mod map;
20
#[cfg(feature = "merge_sorted")]
21
pub mod merge_sorted;
22
pub mod multiplexer;
23
pub mod negative_slice;
24
pub mod ordered_union;
25
pub mod peak_minmax;
26
pub mod reduce;
27
pub mod repeat;
28
pub mod rle;
29
pub mod rle_id;
30
#[cfg(feature = "dynamic_group_by")]
31
pub mod rolling_group_by;
32
pub mod select;
33
pub mod shift;
34
pub mod simple_projection;
35
pub mod sorted_group_by;
36
pub mod streaming_slice;
37
pub mod top_k;
38
pub mod unordered_union;
39
pub mod with_row_index;
40
pub mod zip;
41
42
/// The imports you'll always need for implementing a ComputeNode.
43
mod compute_node_prelude {
44
pub use polars_core::frame::DataFrame;
45
pub use polars_error::PolarsResult;
46
pub use polars_expr::state::ExecutionState;
47
48
pub use super::ComputeNode;
49
pub use crate::async_executor::{JoinHandle, TaskPriority, TaskScope};
50
pub use crate::execute::StreamingExecutionState;
51
pub use crate::graph::PortState;
52
pub use crate::morsel::{Morsel, MorselSeq};
53
pub use crate::pipe::{PortReceiver, PortSender, RecvPort, SendPort};
54
}
55
56
use compute_node_prelude::*;
57
58
use crate::execute::StreamingExecutionState;
59
use crate::metrics::MetricsBuilder;
60
61
pub trait ComputeNode: Send {
62
/// The name of this node.
63
fn name(&self) -> &str;
64
65
/// Update the state of this node given the state of our input and output
66
/// ports. May be called multiple times until fully resolved for each
67
/// execution phase.
68
///
69
/// For each input pipe `recv` will contain a respective state of the
70
/// send port that pipe is connected to when called, and it is expected when
71
/// `update_state` returns it contains your computed receive port state.
72
///
73
/// Similarly, for each output pipe `send` will contain the respective
74
/// state of the input port that pipe is connected to when called, and you
75
/// must update it to contain the desired state of your output port.
76
fn update_state(
77
&mut self,
78
recv: &mut [PortState],
79
send: &mut [PortState],
80
state: &StreamingExecutionState,
81
) -> PolarsResult<()>;
82
83
/// If this node (in its current state) is a pipeline blocker, and whether
84
/// this is memory intensive or not.
85
fn is_memory_intensive_pipeline_blocker(&self) -> bool {
86
false
87
}
88
89
/// Spawn the tasks that this compute node needs to receive input(s),
90
/// process it and send to its output(s). Called once per execution phase.
91
fn spawn<'env, 's>(
92
&'env mut self,
93
scope: &'s TaskScope<'s, 'env>,
94
recv_ports: &mut [Option<RecvPort<'_>>],
95
send_ports: &mut [Option<SendPort<'_>>],
96
state: &'s StreamingExecutionState,
97
join_handles: &mut Vec<JoinHandle<PolarsResult<()>>>,
98
);
99
100
fn set_metrics_builder(&mut self, _metrics_builder: MetricsBuilder) {}
101
102
/// Called once after the last execution phase to extract output from
103
/// in-memory nodes.
104
fn get_output(&mut self) -> PolarsResult<Option<DataFrame>> {
105
Ok(None)
106
}
107
}
108
109