Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/metrics.rs
7884 views
1
use std::sync::Arc;
2
use std::time::Duration;
3
4
use slotmap::{SecondaryMap, SlotMap};
5
6
use crate::LogicalPipe;
7
use crate::async_executor::TaskMetrics;
8
use crate::graph::{GraphNodeKey, LogicalPipeKey};
9
use crate::pipe::PipeMetrics;
10
11
/// Aggregated execution metrics for a single graph node.
///
/// Task counters are folded in from `TaskMetrics`, morsel/row counters from
/// `PipeMetrics`, and state-update counters are recorded directly through
/// `start_state_update` / `stop_state_update`.
#[derive(Debug, Default, Clone)]
pub struct NodeMetrics {
    /// Sum of the `total_polls` counters of every task folded into this node.
    pub total_polls: u64,
    /// Sum of the `total_stolen_polls` counters of every folded task.
    pub total_stolen_polls: u64,
    /// Sum of the `total_poll_time_ns` counters of every folded task.
    pub total_poll_time_ns: u64,
    /// Largest `max_poll_time_ns` seen among the folded tasks.
    pub max_poll_time_ns: u64,

    /// Number of completed state updates (one per `stop_state_update` call).
    pub total_state_updates: u64,
    /// Accumulated duration of all completed state updates, in nanoseconds.
    pub total_state_update_time_ns: u64,
    /// Duration of the longest completed state update, in nanoseconds.
    pub max_state_update_time_ns: u64,

    /// Sum of `morsels_sent` over the pipe metrics folded in on the send side.
    pub morsels_sent: u64,
    /// Sum of `rows_sent` over the pipe metrics folded in on the send side.
    pub rows_sent: u64,
    /// Largest `largest_morsel_sent` among the folded send-side pipe metrics.
    pub largest_morsel_sent: u64,
    /// Sum of `morsels_received` over the folded receive-side pipe metrics.
    pub morsels_received: u64,
    /// Sum of `rows_received` over the folded receive-side pipe metrics.
    pub rows_received: u64,
    /// Largest `largest_morsel_received` among the folded receive-side pipe
    /// metrics.
    pub largest_morsel_received: u64,

    /// `true` between `start_state_update` and the matching
    /// `stop_state_update`.
    pub state_update_in_progress: bool,
    /// Number of folded tasks whose `done` flag was still unset when they
    /// were folded in.
    pub num_running_tasks: u32,
    /// The `is_done` value reported by the most recent `stop_state_update`.
    pub done: bool,
}
33
34
impl NodeMetrics {
35
fn add_task(&mut self, task_metrics: &TaskMetrics) {
36
self.total_polls += task_metrics.total_polls.load();
37
self.total_stolen_polls += task_metrics.total_stolen_polls.load();
38
self.total_poll_time_ns += task_metrics.total_poll_time_ns.load();
39
self.max_poll_time_ns = self
40
.max_poll_time_ns
41
.max(task_metrics.max_poll_time_ns.load());
42
self.num_running_tasks += (!task_metrics.done.load()) as u32;
43
}
44
45
fn start_state_update(&mut self) {
46
self.state_update_in_progress = true;
47
}
48
49
fn stop_state_update(&mut self, time: Duration, is_done: bool) {
50
let time_ns = time.as_nanos() as u64;
51
self.total_state_updates += 1;
52
self.total_state_update_time_ns += time_ns;
53
self.max_state_update_time_ns = self.max_state_update_time_ns.max(time_ns);
54
self.state_update_in_progress = false;
55
self.done = is_done;
56
}
57
58
fn add_send_metrics(&mut self, pipe_metrics: &PipeMetrics) {
59
self.morsels_sent += pipe_metrics.morsels_sent.load();
60
self.rows_sent += pipe_metrics.rows_sent.load();
61
self.largest_morsel_sent = self
62
.largest_morsel_sent
63
.max(pipe_metrics.largest_morsel_sent.load());
64
}
65
66
fn add_recv_metrics(&mut self, pipe_metrics: &PipeMetrics) {
67
self.morsels_received += pipe_metrics.morsels_received.load();
68
self.rows_received += pipe_metrics.rows_received.load();
69
self.largest_morsel_received = self
70
.largest_morsel_received
71
.max(pipe_metrics.largest_morsel_received.load());
72
}
73
}
74
75
#[derive(Default, Clone)]
76
pub struct GraphMetrics {
77
node_metrics: SecondaryMap<GraphNodeKey, NodeMetrics>,
78
in_progress_task_metrics: SecondaryMap<GraphNodeKey, Vec<Arc<TaskMetrics>>>,
79
in_progress_pipe_metrics: SecondaryMap<LogicalPipeKey, Vec<Arc<PipeMetrics>>>,
80
}
81
82
impl GraphMetrics {
83
pub fn add_task(&mut self, key: GraphNodeKey, task_metrics: Arc<TaskMetrics>) {
84
self.in_progress_task_metrics
85
.entry(key)
86
.unwrap()
87
.or_default()
88
.push(task_metrics);
89
}
90
91
pub fn add_pipe(&mut self, key: LogicalPipeKey, pipe_metrics: Arc<PipeMetrics>) {
92
self.in_progress_pipe_metrics
93
.entry(key)
94
.unwrap()
95
.or_default()
96
.push(pipe_metrics);
97
}
98
99
pub fn start_state_update(&mut self, key: GraphNodeKey) {
100
self.node_metrics
101
.entry(key)
102
.unwrap()
103
.or_default()
104
.start_state_update();
105
}
106
107
pub fn stop_state_update(&mut self, key: GraphNodeKey, time: Duration, is_done: bool) {
108
self.node_metrics[key].stop_state_update(time, is_done);
109
}
110
111
pub fn flush(&mut self, pipes: &SlotMap<LogicalPipeKey, LogicalPipe>) {
112
for (key, in_progress_task_metrics) in self.in_progress_task_metrics.iter_mut() {
113
let this_node_metrics = self.node_metrics.entry(key).unwrap().or_default();
114
this_node_metrics.num_running_tasks = 0;
115
for task_metrics in in_progress_task_metrics.drain(..) {
116
this_node_metrics.add_task(&task_metrics);
117
}
118
}
119
120
for (key, in_progress_pipe_metrics) in self.in_progress_pipe_metrics.iter_mut() {
121
for pipe_metrics in in_progress_pipe_metrics.drain(..) {
122
let pipe = &pipes[key];
123
self.node_metrics
124
.entry(pipe.receiver)
125
.unwrap()
126
.or_default()
127
.add_recv_metrics(&pipe_metrics);
128
self.node_metrics
129
.entry(pipe.sender)
130
.unwrap()
131
.or_default()
132
.add_send_metrics(&pipe_metrics);
133
}
134
}
135
}
136
137
pub fn get(&self, key: GraphNodeKey) -> Option<&NodeMetrics> {
138
self.node_metrics.get(key)
139
}
140
141
pub fn iter(&self) -> slotmap::secondary::Iter<'_, GraphNodeKey, NodeMetrics> {
142
self.node_metrics.iter()
143
}
144
}
145
146