Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-utils/src/live_timer.rs
8407 views
1
use std::sync::atomic::{AtomicU64, Ordering};
2
use std::time::Instant;
3
4
/// Flag stored in the top bit of `state_ns`: set while the timer is currently ticking.
const TICKING_BIT: u64 = 1 << 63;
/// Amount added to the packed refcount for each timer handle.
const TIMER_UNIT: u64 = 1 << 32;
/// Amount added to the packed refcount for each session.
const SESSION_UNIT: u64 = 1;
/// Mask that extracts the session count (everything below `TIMER_UNIT`) from the refcount.
const SESSION_MASK: u64 = TIMER_UNIT - 1;
8
9
/// Counts for how much time this timer was 'live'.
10
///
11
/// It is live when there is at least one session, but multiple concurrent
12
/// sessions don't increase the rate at which the timer ticks.
13
///
14
/// Clones of this timer are cheap, and the clones are identical, like an Arc.
15
pub struct LiveTimer {
16
// We use a raw pointer instead of an Arc to ensure starting/stopping sessions only involves
17
// a single atomic operation, while still letting LiveTimerSession be lifetime-free.
18
inner: *mut LiveTimerInner,
19
}
20
21
unsafe impl Send for LiveTimer {}
22
unsafe impl Sync for LiveTimer {}
23
24
impl Default for LiveTimer {
25
fn default() -> Self {
26
Self::new()
27
}
28
}
29
30
impl LiveTimer {
31
pub fn new() -> Self {
32
let inner = LiveTimerInner {
33
base_timestamp: Instant::now(),
34
refcount: AtomicU64::new(TIMER_UNIT),
35
_padding: [0; _],
36
state_ns: AtomicU64::new(0),
37
max_live_ns: AtomicU64::new(0),
38
};
39
40
Self {
41
inner: Box::into_raw(Box::new(inner)),
42
}
43
}
44
45
pub fn start_session(&self) -> LiveTimerSession {
46
unsafe { (&*self.inner).start_session() };
47
LiveTimerSession { inner: self.inner }
48
}
49
50
pub fn total_time_live_ns(&self) -> u64 {
51
unsafe { (&*self.inner).total_time_live_ns() }
52
}
53
}
54
55
impl Clone for LiveTimer {
56
fn clone(&self) -> Self {
57
let inner = unsafe { &*self.inner };
58
inner.refcount.fetch_add(TIMER_UNIT, Ordering::Relaxed);
59
Self { inner: self.inner }
60
}
61
}
62
63
impl Drop for LiveTimer {
64
fn drop(&mut self) {
65
unsafe {
66
let old_rc = (&*self.inner)
67
.refcount
68
.fetch_sub(TIMER_UNIT, Ordering::AcqRel);
69
if old_rc == TIMER_UNIT {
70
drop(Box::from_raw(self.inner))
71
}
72
}
73
}
74
}
75
76
pub struct LiveTimerSession {
77
inner: *mut LiveTimerInner,
78
}
79
80
unsafe impl Send for LiveTimerSession {}
81
unsafe impl Sync for LiveTimerSession {}
82
83
impl Clone for LiveTimerSession {
84
fn clone(&self) -> Self {
85
unsafe { (*self.inner).start_session() };
86
Self { inner: self.inner }
87
}
88
}
89
90
impl Drop for LiveTimerSession {
91
fn drop(&mut self) {
92
unsafe {
93
if (*self.inner).stop_session() {
94
drop(Box::from_raw(self.inner))
95
}
96
}
97
}
98
}
99
100
/// Heap-allocated state shared by every `LiveTimer` handle and `LiveTimerSession`.
///
/// `#[repr(C)]` pins the fields to declaration order. Rust's default representation makes
/// no field-order guarantee, so without it the compiler is free to move `_padding` and
/// leave `refcount` and `state_ns` next to each other, defeating the padding's purpose.
#[repr(C)]
struct LiveTimerInner {
    /// Reference point; all nanosecond values below are measured relative to it.
    base_timestamp: Instant,
    /// Contains two 32-bit refcounts: number of timer references and number of live sessions.
    /// If both are zero this object is destroyed.
    refcount: AtomicU64,
    /// Ensures refcount (commonly modified) is on a different cache line to state_ns.
    /// With declaration order fixed, the two atomics start at least 64 bytes apart.
    _padding: [u8; 64],
    /// Contains the total amount of time the timer was live. Interpreted differently depending on TICKING_BIT:
    ///     0 -> Duration::from_nanos(state)
    ///     1 -> base_timestamp.elapsed() - Duration::from_nanos(state & !TICKING_BIT)
    state_ns: AtomicU64,
    /// Used by `total_time_live_ns` to ensure it returns monotonically nondecreasing values.
    max_live_ns: AtomicU64,
}
114
115
impl LiveTimerInner {
    /// Registers one more session, and starts the clock if there was no session before.
    ///
    /// When the previous session count was zero, `state_ns` is switched from its stopped
    /// form (accumulated live nanoseconds) to its ticking form: the start instant relative
    /// to `base_timestamp` minus the time already accumulated, tagged with `TICKING_BIT`.
    fn start_session(&self) {
        // (1) Acquire to ensure we load state_ns from previous stop_session, if necessary.
        let prev_sessions = self.refcount.fetch_add(SESSION_UNIT, Ordering::Acquire) & SESSION_MASK;
        if prev_sessions == 0 {
            let orig_state_ns = self.state_ns.load(Ordering::Relaxed);
            let start_ns = self.base_timestamp.elapsed().as_nanos() as u64;
            // With zero sessions the state must be in its stopped form.
            debug_assert!(orig_state_ns & TICKING_BIT == 0);
            let new_state_ns = start_ns.saturating_sub(orig_state_ns) | TICKING_BIT;
            self.state_ns.store(new_state_ns, Ordering::Release); // See (2).
        }
    }

    /// Unregisters one session, stopping the clock if it was the last one.
    ///
    /// Returns true if this timer should be destroyed.
    ///
    /// The closure passed to `fetch_update` can be invoked more than once when the refcount
    /// changes concurrently, so the three locals below carry state across attempts.
    fn stop_session(&self) -> bool {
        // Whether this call currently has `state_ns` written in its stopped form.
        let mut stopped = false;
        // Stop timestamp; u64::MAX means it has not been captured yet.
        let mut stopped_at_ns = u64::MAX;
        // Value of `state_ns` before this call first stopped it; needed to undo the stop.
        let mut orig_state_ns = u64::MAX;

        // (1) Acquire and Release to ensure we load state_ns from previous start_session, if necessary.
        self.refcount
            .fetch_update(Ordering::Release, Ordering::Acquire, |rc| {
                // Exactly one session and zero timer handles: returning None leaves the
                // refcount untouched and makes `fetch_update` return Err, i.e. "destroy".
                if rc == SESSION_UNIT {
                    return None; // We're the sole reference, can just destroy.
                }

                // Stop or re-start the timer if necessary.
                let should_stop = rc & SESSION_MASK == SESSION_UNIT;
                if should_stop && !stopped {
                    // Capture the original state and the stop time only once, so that
                    // later attempts reuse the same values.
                    if stopped_at_ns == u64::MAX {
                        orig_state_ns = self.state_ns.load(Ordering::Relaxed);
                        stopped_at_ns = self.base_timestamp.elapsed().as_nanos() as u64;
                    }
                    let new_state_ns = stopped_at_ns.saturating_sub(orig_state_ns & !TICKING_BIT);
                    self.state_ns.store(new_state_ns, Ordering::Relaxed);
                    stopped = true;
                } else if !should_stop && stopped {
                    // An earlier attempt stopped the clock, but this session is no longer
                    // the last one: put the ticking state back.
                    self.state_ns.store(orig_state_ns, Ordering::Release); // See (2).
                    stopped = false;
                }

                Some(rc - SESSION_UNIT)
            })
            .is_err()
    }

    /// Returns the total time, in nanoseconds, that the timer has been live so far.
    ///
    /// The returned value is never smaller than the maximum previously recorded in
    /// `max_live_ns`.
    fn total_time_live_ns(&self) -> u64 {
        // (2) Acquire ensures the elapsed() call by this function is sequenced after the elapsed()
        // call that state_ns value was calculated against if it is currently ticking.
        let state_ns = self.state_ns.load(Ordering::Acquire);
        let active_time = if state_ns & TICKING_BIT == 0 {
            // Stopped: the state is the accumulated live time itself.
            state_ns
        } else {
            // Ticking: the state (minus the flag bit) is subtracted from the current offset.
            let now_ns = self.base_timestamp.elapsed().as_nanos() as u64;
            now_ns.saturating_sub(state_ns & !TICKING_BIT)
        };

        // Needed to ensure monotonicity, our load above interleaved with stops/restarts
        // could otherwise be non-monotonic.
        u64::max(
            active_time,
            self.max_live_ns.fetch_max(active_time, Ordering::Relaxed),
        )
    }
}
180
181
impl std::fmt::Debug for LiveTimer {
182
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183
let refcount = unsafe { (&*self.inner).refcount.load(Ordering::Relaxed) };
184
let active_sessions = refcount & SESSION_MASK;
185
let total_time_live_ns = self.total_time_live_ns();
186
187
return std::fmt::Debug::fmt(
188
&display::LiveTimer {
189
active_sessions,
190
total_time_live_ns,
191
},
192
f,
193
);
194
195
mod display {
196
#[derive(Debug)]
197
#[expect(unused)]
198
pub struct LiveTimer {
199
pub active_sessions: u64,
200
pub total_time_live_ns: u64,
201
}
202
}
203
}
204
}
205
206
impl std::fmt::Debug for LiveTimerSession {
207
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208
return std::fmt::Debug::fmt(&display::LiveTimerSession {}, f);
209
210
mod display {
211
#[derive(Debug)]
212
pub struct LiveTimerSession {}
213
}
214
}
215
}
216
217