Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/cache_lock.rs
6939 views
1
use std::sync::atomic::AtomicBool;
2
use std::sync::{Arc, LazyLock, RwLock, RwLockReadGuard, RwLockWriteGuard};
3
use std::time::Duration;
4
5
use fs4::fs_std::FileExt;
6
7
use super::utils::FILE_CACHE_PREFIX;
8
use crate::pl_async;
9
10
pub(super) static GLOBAL_FILE_CACHE_LOCK: LazyLock<GlobalLock> = LazyLock::new(|| {
11
let path = FILE_CACHE_PREFIX.join(".process-lock");
12
13
let file = std::fs::OpenOptions::new()
14
.write(true)
15
.create(true)
16
.truncate(false)
17
.open(path)
18
.map_err(|err| {
19
panic!("failed to open/create global file cache lockfile: {err}");
20
})
21
.unwrap();
22
23
let at_bool = Arc::new(AtomicBool::new(false));
24
// Holding this access tracker prevents the background task from
25
// unlocking the lock.
26
let access_tracker = AccessTracker(at_bool.clone());
27
let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());
28
let notify_lock_acquired_2 = notify_lock_acquired.clone();
29
30
pl_async::get_runtime().spawn(async move {
31
let access_tracker = at_bool;
32
let notify_lock_acquired = notify_lock_acquired_2;
33
let verbose = false;
34
35
loop {
36
if verbose {
37
eprintln!("file cache background unlock: waiting for acquisition notification");
38
}
39
40
notify_lock_acquired.notified().await;
41
42
if verbose {
43
eprintln!("file cache background unlock: got acquisition notification");
44
}
45
46
loop {
47
if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {
48
if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {
49
if unlocked_by_this_call && verbose {
50
eprintln!(
51
"file cache background unlock: unlocked global file cache lockfile"
52
);
53
}
54
break;
55
}
56
}
57
tokio::time::sleep(Duration::from_secs(3)).await;
58
}
59
}
60
});
61
62
GlobalLock {
63
inner: RwLock::new(GlobalLockData { file, state: None }),
64
access_tracker,
65
notify_lock_acquired,
66
}
67
});
68
69
pub(super) enum LockedState {
70
/// Shared between threads and other processes.
71
Shared,
72
#[allow(dead_code)]
73
/// Locked exclusively by the eviction task of this process.
74
Eviction,
75
}
76
77
#[allow(dead_code)]
78
pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;
79
pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;
80
81
pub(super) struct GlobalLockData {
82
file: std::fs::File,
83
state: Option<LockedState>,
84
}
85
86
pub(super) struct GlobalLock {
87
inner: RwLock<GlobalLockData>,
88
access_tracker: AccessTracker,
89
notify_lock_acquired: Arc<tokio::sync::Notify>,
90
}
91
92
/// Tracks access to the global lock:
/// * The inner `bool` is used to delay the background unlock task from unlocking
///   the global lock until 3 seconds after the last lock attempt.
/// * The `Arc` ref-count is used as a semaphore that allows us to block exclusive
///   lock attempts while temporarily releasing the `RwLock`.
#[derive(Clone)]
struct AccessTracker(Arc<AtomicBool>);

impl Drop for AccessTracker {
    /// Marks the lock as accessed when a tracker handle goes away, so the
    /// end of a lock attempt also counts as an access.
    fn drop(&mut self) {
        self.0.store(true, std::sync::atomic::Ordering::Relaxed);
    }
}
105
106
struct NotifyOnDrop(Arc<tokio::sync::Notify>);
107
108
impl Drop for NotifyOnDrop {
109
fn drop(&mut self) {
110
self.0.notify_one();
111
}
112
}
113
114
impl GlobalLock {
115
fn get_access_tracker(&self) -> AccessTracker {
116
let at = self.access_tracker.clone();
117
at.0.store(true, std::sync::atomic::Ordering::Relaxed);
118
at
119
}
120
121
/// Returns
122
/// * `None` - Could be locked (ambiguous)
123
/// * `Some(true)` - Unlocked (by this function call)
124
/// * `Some(false)` - Unlocked (was not locked)
125
fn try_unlock(&self) -> Option<bool> {
126
if let Ok(mut this) = self.inner.try_write() {
127
if Arc::strong_count(&self.access_tracker.0) <= 2 {
128
return if this.state.take().is_some() {
129
FileExt::unlock(&this.file).unwrap();
130
Some(true)
131
} else {
132
Some(false)
133
};
134
}
135
}
136
None
137
}
138
139
/// Acquire a shared lock.
140
pub(super) fn lock_shared(&self) -> GlobalFileCacheGuardAny<'_> {
141
let access_tracker = self.get_access_tracker();
142
let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
143
144
{
145
let this = self.inner.read().unwrap();
146
147
if let Some(LockedState::Shared) = this.state {
148
return this;
149
}
150
}
151
152
{
153
let mut this = self.inner.write().unwrap();
154
155
if let Some(LockedState::Eviction) = this.state {
156
FileExt::unlock(&this.file).unwrap();
157
this.state = None;
158
}
159
160
if this.state.is_none() {
161
FileExt::lock_shared(&this.file).unwrap();
162
this.state = Some(LockedState::Shared);
163
}
164
}
165
166
// Safety: Holding the access tracker guard maintains an Arc refcount
167
// > 2, which prevents automatic unlock.
168
debug_assert!(Arc::strong_count(&access_tracker.0) > 2);
169
170
{
171
let this = self.inner.read().unwrap();
172
173
if let Some(LockedState::Eviction) = this.state {
174
// Try again
175
drop(this);
176
return self.lock_shared();
177
}
178
179
assert!(
180
this.state.is_some(),
181
"impl error: global file cache lock was unlocked"
182
);
183
this
184
}
185
}
186
187
/// Acquire an exclusive lock on the cache directory. Holding this lock freezes
188
/// all cache operations except for reading from already-opened data files.
189
#[allow(dead_code)]
190
pub(super) fn try_lock_eviction(&self) -> Option<GlobalFileCacheGuardExclusive<'_>> {
191
let access_tracker = self.get_access_tracker();
192
193
if let Ok(mut this) = self.inner.try_write() {
194
if
195
// 3:
196
// * the Lazy<GlobalLock>
197
// * the global unlock background task
198
// * this function
199
Arc::strong_count(&access_tracker.0) > 3 {
200
return None;
201
}
202
203
let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());
204
205
if let Some(ref state) = this.state {
206
if matches!(state, LockedState::Eviction) {
207
return Some(this);
208
}
209
}
210
211
if this.state.take().is_some() {
212
FileExt::unlock(&this.file).unwrap();
213
}
214
215
if this.file.try_lock_exclusive().is_ok() {
216
this.state = Some(LockedState::Eviction);
217
return Some(this);
218
}
219
}
220
None
221
}
222
}
223
224