Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/file_cache/eviction.rs
6939 views
1
use std::path::{Path, PathBuf};
2
use std::sync::Arc;
3
use std::sync::atomic::AtomicU64;
4
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
5
6
use fs4::fs_std::FileExt;
7
use polars_error::{PolarsError, PolarsResult};
8
9
use super::cache_lock::{GLOBAL_FILE_CACHE_LOCK, GlobalFileCacheGuardExclusive};
10
use super::metadata::EntryMetadata;
11
use crate::pl_async;
12
13
#[derive(Debug, Clone)]
14
pub(super) struct EvictionCandidate {
15
path: PathBuf,
16
metadata_path: PathBuf,
17
metadata_last_modified: SystemTime,
18
ttl: u64,
19
}
20
21
pub(super) struct EvictionManager {
22
pub(super) data_dir: Box<Path>,
23
pub(super) metadata_dir: Box<Path>,
24
pub(super) files_to_remove: Option<Vec<EvictionCandidate>>,
25
pub(super) min_ttl: Arc<AtomicU64>,
26
pub(super) notify_ttl_updated: Arc<tokio::sync::Notify>,
27
}
28
29
impl EvictionCandidate {
30
fn update_ttl(&mut self) {
31
let Ok(metadata_last_modified) =
32
std::fs::metadata(&self.metadata_path).map(|md| md.modified().unwrap())
33
else {
34
self.ttl = 0;
35
return;
36
};
37
38
if self.metadata_last_modified == metadata_last_modified {
39
return;
40
}
41
42
let Ok(ref mut file) = std::fs::OpenOptions::new()
43
.read(true)
44
.open(&self.metadata_path)
45
else {
46
self.ttl = 0;
47
return;
48
};
49
50
let ttl = EntryMetadata::try_from_reader(file)
51
.map(|x| x.ttl)
52
.unwrap_or(0);
53
54
self.metadata_last_modified = metadata_last_modified;
55
self.ttl = ttl;
56
}
57
58
fn should_remove(&self, now: &SystemTime) -> bool {
59
let Ok(metadata) = std::fs::metadata(&self.path) else {
60
return false;
61
};
62
63
if let Ok(duration) = now.duration_since(
64
metadata
65
.accessed()
66
.unwrap_or_else(|_| metadata.modified().unwrap()),
67
) {
68
duration.as_secs() >= self.ttl
69
} else {
70
false
71
}
72
}
73
74
fn try_evict(
75
&mut self,
76
now: &SystemTime,
77
verbose: bool,
78
_guard: &GlobalFileCacheGuardExclusive,
79
) {
80
self.update_ttl();
81
let path = &self.path;
82
83
if !path.exists() {
84
if verbose {
85
eprintln!(
86
"[EvictionManager] evict_files: skipping {} (path no longer exists)",
87
path.to_str().unwrap()
88
);
89
}
90
return;
91
}
92
93
let metadata = std::fs::metadata(path).unwrap();
94
95
let since_last_accessed = match now.duration_since(
96
metadata
97
.accessed()
98
.unwrap_or_else(|_| metadata.modified().unwrap()),
99
) {
100
Ok(v) => v.as_secs(),
101
Err(_) => {
102
if verbose {
103
eprintln!(
104
"[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
105
path.to_str().unwrap()
106
);
107
}
108
return;
109
},
110
};
111
112
if since_last_accessed < self.ttl {
113
if verbose {
114
eprintln!(
115
"[EvictionManager] evict_files: skipping {} (last accessed time was updated)",
116
path.to_str().unwrap()
117
);
118
}
119
return;
120
}
121
122
{
123
let file = std::fs::OpenOptions::new().read(true).open(path).unwrap();
124
125
if file.try_lock_exclusive().is_err() {
126
if verbose {
127
eprintln!(
128
"[EvictionManager] evict_files: skipping {} (file is locked)",
129
self.path.to_str().unwrap()
130
);
131
}
132
return;
133
}
134
}
135
136
if let Err(err) = std::fs::remove_file(path) {
137
if verbose {
138
eprintln!(
139
"[EvictionManager] evict_files: error removing file: {} ({})",
140
path.to_str().unwrap(),
141
err
142
);
143
}
144
} else if verbose {
145
eprintln!(
146
"[EvictionManager] evict_files: removed file at {}",
147
path.to_str().unwrap()
148
);
149
}
150
}
151
}
152
153
impl EvictionManager {
154
/// # Safety
155
/// The following directories exist:
156
/// * `self.data_dir`
157
/// * `self.metadata_dir`
158
pub(super) fn run_in_background(mut self) {
159
let verbose = false;
160
161
if verbose {
162
eprintln!(
163
"[EvictionManager] creating cache eviction background task, self.min_ttl = {}",
164
self.min_ttl.load(std::sync::atomic::Ordering::Relaxed)
165
);
166
}
167
168
pl_async::get_runtime().spawn(async move {
169
// Give some time at startup for other code to run.
170
tokio::time::sleep(Duration::from_secs(3)).await;
171
let mut last_eviction_time;
172
173
loop {
174
let this: &'static mut Self = unsafe { std::mem::transmute(&mut self) };
175
176
let result = tokio::task::spawn_blocking(|| this.update_file_list())
177
.await
178
.unwrap();
179
180
last_eviction_time = Instant::now();
181
182
match result {
183
Ok(_) if self.files_to_remove.as_ref().unwrap().is_empty() => {},
184
Ok(_) => loop {
185
if let Some(guard) = GLOBAL_FILE_CACHE_LOCK.try_lock_eviction() {
186
if verbose {
187
eprintln!(
188
"[EvictionManager] got exclusive cache lock, evicting {} files",
189
self.files_to_remove.as_ref().unwrap().len()
190
);
191
}
192
193
tokio::task::block_in_place(|| self.evict_files(&guard));
194
break;
195
}
196
tokio::time::sleep(Duration::from_secs(7)).await;
197
},
198
Err(err) => {
199
if verbose {
200
eprintln!("[EvictionManager] error updating file list: {err}");
201
}
202
},
203
}
204
205
loop {
206
let min_ttl = self.min_ttl.load(std::sync::atomic::Ordering::Relaxed);
207
let sleep_interval = std::cmp::max(min_ttl / 4, {
208
#[cfg(debug_assertions)]
209
{
210
3
211
}
212
#[cfg(not(debug_assertions))]
213
{
214
60
215
}
216
});
217
218
let since_last_eviction =
219
Instant::now().duration_since(last_eviction_time).as_secs();
220
let sleep_interval = sleep_interval.saturating_sub(since_last_eviction);
221
let sleep_interval = Duration::from_secs(sleep_interval);
222
223
tokio::select! {
224
_ = self.notify_ttl_updated.notified() => {
225
continue;
226
}
227
_ = tokio::time::sleep(sleep_interval) => {
228
break;
229
}
230
}
231
}
232
}
233
});
234
}
235
236
fn update_file_list(&mut self) -> PolarsResult<()> {
237
let data_files_iter = match std::fs::read_dir(self.data_dir.as_ref()) {
238
Ok(v) => v,
239
Err(e) => {
240
let msg = format!("failed to read data directory: {e}");
241
242
return Err(PolarsError::IO {
243
error: e.into(),
244
msg: Some(msg.into()),
245
});
246
},
247
};
248
249
let metadata_files_iter = match std::fs::read_dir(self.metadata_dir.as_ref()) {
250
Ok(v) => v,
251
Err(e) => {
252
let msg = format!("failed to read metadata directory: {e}");
253
254
return Err(PolarsError::IO {
255
error: e.into(),
256
msg: Some(msg.into()),
257
});
258
},
259
};
260
261
let mut files_to_remove = Vec::with_capacity(
262
data_files_iter
263
.size_hint()
264
.1
265
.unwrap_or(data_files_iter.size_hint().0)
266
+ metadata_files_iter
267
.size_hint()
268
.1
269
.unwrap_or(metadata_files_iter.size_hint().0),
270
);
271
272
let now = SystemTime::now();
273
274
for file in data_files_iter {
275
let file = file?;
276
let path = file.path();
277
278
let hash = path
279
.file_name()
280
.unwrap()
281
.to_str()
282
.unwrap()
283
.get(..32)
284
.unwrap();
285
let metadata_path = self.metadata_dir.join(hash);
286
287
let mut eviction_candidate = EvictionCandidate {
288
path,
289
metadata_path,
290
metadata_last_modified: UNIX_EPOCH,
291
ttl: 0,
292
};
293
eviction_candidate.update_ttl();
294
295
if eviction_candidate.should_remove(&now) {
296
files_to_remove.push(eviction_candidate);
297
}
298
}
299
300
for file in metadata_files_iter {
301
let file = file?;
302
let path = file.path();
303
let metadata_path = path.clone();
304
305
let mut eviction_candidate = EvictionCandidate {
306
path,
307
metadata_path,
308
metadata_last_modified: UNIX_EPOCH,
309
ttl: 0,
310
};
311
312
eviction_candidate.update_ttl();
313
314
if eviction_candidate.should_remove(&now) {
315
files_to_remove.push(eviction_candidate);
316
}
317
}
318
319
self.files_to_remove = Some(files_to_remove);
320
321
Ok(())
322
}
323
324
/// # Panics
325
/// Panics if `self.files_to_remove` is `None`.
326
fn evict_files(&mut self, _guard: &GlobalFileCacheGuardExclusive) {
327
let verbose = false;
328
let mut files_to_remove = self.files_to_remove.take().unwrap();
329
let now = &SystemTime::now();
330
331
for eviction_candidate in files_to_remove.iter_mut() {
332
eviction_candidate.try_evict(now, verbose, _guard);
333
}
334
}
335
}
336
337