Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/metrics.rs
8440 views
1
use std::sync::Arc;
2
3
use polars_utils::live_timer::{LiveTimer, LiveTimerSession};
4
use polars_utils::relaxed_cell::RelaxedCell;
5
6
pub const HEAD_RESPONSE_SIZE_ESTIMATE: u64 = 1;
7
8
#[derive(Debug, Default, Clone)]
9
pub struct IOMetrics {
10
pub io_timer: LiveTimer,
11
pub bytes_requested: RelaxedCell<u64>,
12
pub bytes_received: RelaxedCell<u64>,
13
pub bytes_sent: RelaxedCell<u64>,
14
}
15
16
#[derive(Debug, Clone)]
17
pub struct OptIOMetrics(pub Option<Arc<IOMetrics>>);
18
19
impl OptIOMetrics {
20
pub fn start_io_session(&self) -> Option<LiveTimerSession> {
21
self.0.as_ref().map(|x| x.io_timer.start_session())
22
}
23
24
pub fn add_bytes_requested(&self, bytes_requested: u64) {
25
self.0
26
.as_ref()
27
.map(|x| x.bytes_requested.fetch_add(bytes_requested));
28
}
29
30
pub fn add_bytes_received(&self, bytes_received: u64) {
31
self.0
32
.as_ref()
33
.map(|x| x.bytes_received.fetch_add(bytes_received));
34
}
35
36
pub fn add_bytes_sent(&self, bytes_sent: u64) {
37
self.0.as_ref().map(|x| x.bytes_sent.fetch_add(bytes_sent));
38
}
39
40
pub async fn record_io_read<F, O>(&self, num_bytes: u64, fut: F) -> O
41
where
42
F: Future<Output = O>,
43
{
44
self.add_bytes_requested(num_bytes);
45
46
let io_session = self.start_io_session();
47
48
let out = fut.await;
49
50
drop(io_session);
51
52
self.add_bytes_received(num_bytes);
53
54
out
55
}
56
57
pub async fn record_bytes_tx<F, O>(&self, num_bytes: u64, fut: F) -> O
58
where
59
F: Future<Output = O>,
60
{
61
let io_session = self.start_io_session();
62
63
let out = fut.await;
64
65
drop(io_session);
66
67
self.add_bytes_sent(num_bytes);
68
69
out
70
}
71
}
72
73