// Source: GitHub repository pola-rs/polars
// Path: blob/main/crates/polars-io/src/file_cache/eviction.rs
1
use std::sync::Arc;
2
use std::sync::atomic::AtomicU64;
3
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
4
5
use fs4::fs_std::FileExt;
6
use polars_error::{PolarsError, PolarsResult};
7
use polars_utils::pl_path::PlRefPath;
8
9
use super::cache_lock::{GLOBAL_FILE_CACHE_LOCK, GlobalFileCacheGuardExclusive};
10
use super::metadata::EntryMetadata;
11
use crate::pl_async;
12
13
#[derive(Debug, Clone)]
14
pub(super) struct EvictionCandidate {
15
path: PlRefPath,
16
metadata_path: PlRefPath,
17
metadata_last_modified: SystemTime,
18
ttl: u64,
19
}
20
21
pub(super) struct EvictionManager {
22
pub(super) data_dir: PlRefPath,
23
pub(super) metadata_dir: PlRefPath,
24
pub(super) files_to_remove: Option<Vec<EvictionCandidate>>,
25
pub(super) min_ttl: Arc<AtomicU64>,
26
pub(super) notify_ttl_updated: Arc<tokio::sync::Notify>,
27
}
28
29
impl EvictionCandidate {
30
fn update_ttl(&mut self) {
31
let Ok(metadata_last_modified) =
32
std::fs::metadata(&self.metadata_path).map(|md| md.modified().unwrap())
33
else {
34
self.ttl = 0;
35
return;
36
};
37
38
if self.metadata_last_modified == metadata_last_modified {
39
return;
40
}
41
42
let Ok(ref mut file) = std::fs::OpenOptions::new()
43
.read(true)
44
.open(&self.metadata_path)
45
else {
46
self.ttl = 0;
47
return;
48
};
49
50
let ttl = EntryMetadata::try_from_reader(file)
51
.map(|x| x.ttl)
52
.unwrap_or(0);
53
54
self.metadata_last_modified = metadata_last_modified;
55
self.ttl = ttl;
56
}
57
58
fn should_remove(&self, now: &SystemTime) -> bool {
59
let Ok(metadata) = std::fs::metadata(&self.path) else {
60
return false;
61
};
62
63
if let Ok(duration) = now.duration_since(
64
metadata
65
.accessed()
66
.unwrap_or_else(|_| metadata.modified().unwrap()),
67
) {
68
duration.as_secs() >= self.ttl
69
} else {
70
false
71
}
72
}
73
74
fn try_evict(
75
&mut self,
76
now: &SystemTime,
77
verbose: bool,
78
_guard: &GlobalFileCacheGuardExclusive,
79
) {
80
self.update_ttl();
81
let path = &self.path;
82
83
if !path.as_std_path().exists() {
84
if verbose {
85
eprintln!(
86
"[EvictionManager] evict_files: skipping {} (path no longer exists)",
87
path
88
);
89
}
90
return;
91
}
92
93
let metadata = std::fs::metadata(path).unwrap();
94
95
let since_last_accessed = match now.duration_since(
96
metadata
97
.accessed()
98
.unwrap_or_else(|_| metadata.modified().unwrap()),
99
) {
100
Ok(v) => v.as_secs(),
101
Err(_) => {
102
if verbose {
103
eprintln!(
104
"[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
105
path
106
);
107
}
108
return;
109
},
110
};
111
112
if since_last_accessed < self.ttl {
113
if verbose {
114
eprintln!(
115
"[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
116
path
117
);
118
}
119
return;
120
}
121
122
{
123
let file = std::fs::OpenOptions::new().read(true).open(path).unwrap();
124
125
if file.try_lock_exclusive().is_err() {
126
if verbose {
127
eprintln!(
128
"[EvictionManager] evict_files: skipping {} (file is locked)",
129
self.path
130
);
131
}
132
return;
133
}
134
}
135
136
if let Err(err) = std::fs::remove_file(path) {
137
if verbose {
138
eprintln!(
139
"[EvictionManager] evict_files: error removing file: {} ({})",
140
path, err
141
);
142
}
143
} else if verbose {
144
eprintln!("[EvictionManager] evict_files: removed file at {}", path);
145
}
146
}
147
}
148
149
impl EvictionManager {
150
/// # Safety
151
/// The following directories exist:
152
/// * `self.data_dir`
153
/// * `self.metadata_dir`
154
pub(super) fn run_in_background(mut self) {
155
let verbose = false;
156
157
if verbose {
158
eprintln!(
159
"[EvictionManager] creating cache eviction background task, self.min_ttl = {}",
160
self.min_ttl.load(std::sync::atomic::Ordering::Relaxed)
161
);
162
}
163
164
pl_async::get_runtime().spawn(async move {
165
// Give some time at startup for other code to run.
166
tokio::time::sleep(Duration::from_secs(3)).await;
167
let mut last_eviction_time;
168
169
loop {
170
let this: &'static mut Self = unsafe { std::mem::transmute(&mut self) };
171
172
let result = tokio::task::spawn_blocking(|| this.update_file_list())
173
.await
174
.unwrap();
175
176
last_eviction_time = Instant::now();
177
178
match result {
179
Ok(_) if self.files_to_remove.as_ref().unwrap().is_empty() => {},
180
Ok(_) => loop {
181
if let Some(guard) = GLOBAL_FILE_CACHE_LOCK.try_lock_eviction() {
182
if verbose {
183
eprintln!(
184
"[EvictionManager] got exclusive cache lock, evicting {} files",
185
self.files_to_remove.as_ref().unwrap().len()
186
);
187
}
188
189
tokio::task::block_in_place(|| self.evict_files(&guard));
190
break;
191
}
192
tokio::time::sleep(Duration::from_secs(7)).await;
193
},
194
Err(err) => {
195
if verbose {
196
eprintln!("[EvictionManager] error updating file list: {err}");
197
}
198
},
199
}
200
201
loop {
202
let min_ttl = self.min_ttl.load(std::sync::atomic::Ordering::Relaxed);
203
let sleep_interval = std::cmp::max(min_ttl / 4, {
204
#[cfg(debug_assertions)]
205
{
206
3
207
}
208
#[cfg(not(debug_assertions))]
209
{
210
60
211
}
212
});
213
214
let since_last_eviction =
215
Instant::now().duration_since(last_eviction_time).as_secs();
216
let sleep_interval = sleep_interval.saturating_sub(since_last_eviction);
217
let sleep_interval = Duration::from_secs(sleep_interval);
218
219
tokio::select! {
220
_ = self.notify_ttl_updated.notified() => {
221
continue;
222
}
223
_ = tokio::time::sleep(sleep_interval) => {
224
break;
225
}
226
}
227
}
228
}
229
});
230
}
231
232
fn update_file_list(&mut self) -> PolarsResult<()> {
233
let data_files_iter = match std::fs::read_dir(self.data_dir.as_std_path()) {
234
Ok(v) => v,
235
Err(e) => {
236
let msg = format!("failed to read data directory: {e}");
237
238
return Err(PolarsError::IO {
239
error: e.into(),
240
msg: Some(msg.into()),
241
});
242
},
243
};
244
245
let metadata_files_iter = match std::fs::read_dir(self.metadata_dir.as_std_path()) {
246
Ok(v) => v,
247
Err(e) => {
248
let msg = format!("failed to read metadata directory: {e}");
249
250
return Err(PolarsError::IO {
251
error: e.into(),
252
msg: Some(msg.into()),
253
});
254
},
255
};
256
257
let mut files_to_remove = Vec::with_capacity(
258
data_files_iter
259
.size_hint()
260
.1
261
.unwrap_or(data_files_iter.size_hint().0)
262
+ metadata_files_iter
263
.size_hint()
264
.1
265
.unwrap_or(metadata_files_iter.size_hint().0),
266
);
267
268
let now = SystemTime::now();
269
270
for file in data_files_iter {
271
let file = file?;
272
let path = PlRefPath::try_from_pathbuf(file.path())?;
273
274
let hash = path
275
.file_name()
276
.unwrap()
277
.to_str()
278
.unwrap()
279
.get(..32)
280
.unwrap();
281
let metadata_path = self.metadata_dir.join(hash);
282
283
let mut eviction_candidate = EvictionCandidate {
284
path,
285
metadata_path,
286
metadata_last_modified: UNIX_EPOCH,
287
ttl: 0,
288
};
289
eviction_candidate.update_ttl();
290
291
if eviction_candidate.should_remove(&now) {
292
files_to_remove.push(eviction_candidate);
293
}
294
}
295
296
for file in metadata_files_iter {
297
let file = file?;
298
let path = PlRefPath::try_from_pathbuf(file.path())?;
299
let metadata_path = path.clone();
300
301
let mut eviction_candidate = EvictionCandidate {
302
path,
303
metadata_path,
304
metadata_last_modified: UNIX_EPOCH,
305
ttl: 0,
306
};
307
308
eviction_candidate.update_ttl();
309
310
if eviction_candidate.should_remove(&now) {
311
files_to_remove.push(eviction_candidate);
312
}
313
}
314
315
self.files_to_remove = Some(files_to_remove);
316
317
Ok(())
318
}
319
320
/// # Panics
321
/// Panics if `self.files_to_remove` is `None`.
322
fn evict_files(&mut self, _guard: &GlobalFileCacheGuardExclusive) {
323
let verbose = false;
324
let mut files_to_remove = self.files_to_remove.take().unwrap();
325
let now = &SystemTime::now();
326
327
for eviction_candidate in files_to_remove.iter_mut() {
328
eviction_candidate.try_evict(now, verbose, _guard);
329
}
330
}
331
}
332
333