Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-stream/src/metrics.rs
8433 views
1
use std::sync::Arc;
2
use std::time::Duration;
3
4
pub use polars_io::metrics::{IOMetrics, OptIOMetrics};
5
use slotmap::{SecondaryMap, SlotMap};
6
7
use crate::LogicalPipe;
8
use crate::async_executor::TaskMetrics;
9
use crate::graph::{GraphNodeKey, LogicalPipeKey};
10
use crate::pipe::PipeMetrics;
11
12
/// Aggregated runtime counters for a single graph node.
///
/// All fields start at zero / `false` (via `Default`) and are updated by the
/// private helpers in `impl NodeMetrics`, which fold in values read from
/// task, pipe and I/O metric handles.
#[derive(Debug, Default, Clone)]
pub struct NodeMetrics {
    /// Sum of the `total_polls` counters of all tasks folded in so far.
    pub total_polls: u64,
    /// Sum of the `total_stolen_polls` counters of all tasks folded in so far.
    pub total_stolen_polls: u64,
    /// Sum of the `total_poll_time_ns` counters of all tasks folded in so far.
    pub total_poll_time_ns: u64,
    /// Largest `max_poll_time_ns` seen across all tasks folded in so far.
    pub max_poll_time_ns: u64,

    /// Number of completed state updates.
    pub total_state_updates: u64,
    /// Accumulated duration of all completed state updates, in nanoseconds.
    pub total_state_update_time_ns: u64,
    /// Duration of the longest completed state update, in nanoseconds.
    pub max_state_update_time_ns: u64,

    /// Sum of `morsels_sent` over the pipes for which this node is the sender.
    pub morsels_sent: u64,
    /// Sum of `rows_sent` over the pipes for which this node is the sender.
    pub rows_sent: u64,
    /// Largest `largest_morsel_sent` over the pipes for which this node is the sender.
    pub largest_morsel_sent: u64,
    /// Sum of `morsels_received` over the pipes for which this node is the receiver.
    pub morsels_received: u64,
    /// Sum of `rows_received` over the pipes for which this node is the receiver.
    pub rows_received: u64,
    /// Largest `largest_morsel_received` over the pipes for which this node is the receiver.
    pub largest_morsel_received: u64,

    /// Accumulated I/O timer reading (`total_time_live_ns`) of all I/O handles folded in.
    pub io_total_active_ns: u64,
    /// Sum of `bytes_requested` of all I/O handles folded in.
    pub io_total_bytes_requested: u64,
    /// Sum of `bytes_received` of all I/O handles folded in.
    pub io_total_bytes_received: u64,
    /// Sum of `bytes_sent` of all I/O handles folded in.
    pub io_total_bytes_sent: u64,

    /// `true` between a `start_state_update` and the matching `stop_state_update`.
    pub state_update_in_progress: bool,
    /// Number of folded-in tasks whose `done` flag was not yet set when read.
    pub num_running_tasks: u32,
    /// The `is_done` value reported by the most recent `stop_state_update`.
    pub done: bool,
}
39
40
impl NodeMetrics {
41
fn add_task(&mut self, task_metrics: &TaskMetrics) {
42
self.total_polls += task_metrics.total_polls.load();
43
self.total_stolen_polls += task_metrics.total_stolen_polls.load();
44
self.total_poll_time_ns += task_metrics.total_poll_time_ns.load();
45
self.max_poll_time_ns = self
46
.max_poll_time_ns
47
.max(task_metrics.max_poll_time_ns.load());
48
self.num_running_tasks += (!task_metrics.done.load()) as u32;
49
}
50
51
fn add_io(&mut self, io_metrics: &IOMetrics) {
52
self.io_total_active_ns += io_metrics.io_timer.total_time_live_ns();
53
self.io_total_bytes_requested += io_metrics.bytes_requested.load();
54
self.io_total_bytes_received += io_metrics.bytes_received.load();
55
self.io_total_bytes_sent += io_metrics.bytes_sent.load();
56
}
57
58
fn start_state_update(&mut self) {
59
self.state_update_in_progress = true;
60
}
61
62
fn stop_state_update(&mut self, time: Duration, is_done: bool) {
63
let time_ns = time.as_nanos() as u64;
64
self.total_state_updates += 1;
65
self.total_state_update_time_ns += time_ns;
66
self.max_state_update_time_ns = self.max_state_update_time_ns.max(time_ns);
67
self.state_update_in_progress = false;
68
self.done = is_done;
69
}
70
71
fn add_send_metrics(&mut self, pipe_metrics: &PipeMetrics) {
72
self.morsels_sent += pipe_metrics.morsels_sent.load();
73
self.rows_sent += pipe_metrics.rows_sent.load();
74
self.largest_morsel_sent = self
75
.largest_morsel_sent
76
.max(pipe_metrics.largest_morsel_sent.load());
77
}
78
79
fn add_recv_metrics(&mut self, pipe_metrics: &PipeMetrics) {
80
self.morsels_received += pipe_metrics.morsels_received.load();
81
self.rows_received += pipe_metrics.rows_received.load();
82
self.largest_morsel_received = self
83
.largest_morsel_received
84
.max(pipe_metrics.largest_morsel_received.load());
85
}
86
}
87
88
#[derive(Default, Clone)]
89
pub struct GraphMetrics {
90
node_metrics: SecondaryMap<GraphNodeKey, NodeMetrics>,
91
in_progress_io_metrics: SecondaryMap<GraphNodeKey, Vec<Arc<IOMetrics>>>,
92
in_progress_task_metrics: SecondaryMap<GraphNodeKey, Vec<Arc<TaskMetrics>>>,
93
in_progress_pipe_metrics: SecondaryMap<LogicalPipeKey, Vec<Arc<PipeMetrics>>>,
94
}
95
96
impl GraphMetrics {
97
pub fn add_task(&mut self, key: GraphNodeKey, task_metrics: Arc<TaskMetrics>) {
98
self.in_progress_task_metrics
99
.entry(key)
100
.unwrap()
101
.or_default()
102
.push(task_metrics);
103
}
104
105
pub fn add_pipe(&mut self, key: LogicalPipeKey, pipe_metrics: Arc<PipeMetrics>) {
106
self.in_progress_pipe_metrics
107
.entry(key)
108
.unwrap()
109
.or_default()
110
.push(pipe_metrics);
111
}
112
113
pub fn start_state_update(&mut self, key: GraphNodeKey) {
114
self.node_metrics
115
.entry(key)
116
.unwrap()
117
.or_default()
118
.start_state_update();
119
}
120
121
pub fn stop_state_update(&mut self, key: GraphNodeKey, time: Duration, is_done: bool) {
122
self.node_metrics[key].stop_state_update(time, is_done);
123
}
124
125
pub fn flush(&mut self, pipes: &SlotMap<LogicalPipeKey, LogicalPipe>) {
126
for (key, in_progress_task_metrics) in self.in_progress_task_metrics.iter_mut() {
127
let this_node_metrics = self.node_metrics.entry(key).unwrap().or_default();
128
this_node_metrics.num_running_tasks = 0;
129
for task_metrics in in_progress_task_metrics.drain(..) {
130
this_node_metrics.add_task(&task_metrics);
131
}
132
}
133
134
for (key, in_progress_io_metrics) in self.in_progress_io_metrics.iter_mut() {
135
let this_node_metrics = self.node_metrics.entry(key).unwrap().or_default();
136
this_node_metrics.num_running_tasks = 0;
137
for io_metrics in in_progress_io_metrics.drain(..) {
138
this_node_metrics.add_io(&io_metrics);
139
}
140
}
141
142
for (key, in_progress_pipe_metrics) in self.in_progress_pipe_metrics.iter_mut() {
143
for pipe_metrics in in_progress_pipe_metrics.drain(..) {
144
let pipe = &pipes[key];
145
self.node_metrics
146
.entry(pipe.receiver)
147
.unwrap()
148
.or_default()
149
.add_recv_metrics(&pipe_metrics);
150
self.node_metrics
151
.entry(pipe.sender)
152
.unwrap()
153
.or_default()
154
.add_send_metrics(&pipe_metrics);
155
}
156
}
157
}
158
159
pub fn get(&self, key: GraphNodeKey) -> Option<&NodeMetrics> {
160
self.node_metrics.get(key)
161
}
162
163
pub fn iter(&self) -> slotmap::secondary::Iter<'_, GraphNodeKey, NodeMetrics> {
164
self.node_metrics.iter()
165
}
166
}
167
168
pub struct MetricsBuilder {
169
pub graph_key: GraphNodeKey,
170
pub graph_metrics: Arc<parking_lot::Mutex<GraphMetrics>>,
171
}
172
173
impl MetricsBuilder {
174
pub fn new_io_metrics(&self) -> Arc<IOMetrics> {
175
let io_metrics: Arc<IOMetrics> = Default::default();
176
177
self.graph_metrics
178
.lock()
179
.in_progress_io_metrics
180
.entry(self.graph_key)
181
.unwrap()
182
.or_default()
183
.push(Arc::clone(&io_metrics));
184
185
io_metrics
186
}
187
}
188
189