GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/cache/src/worker.rs
//! Background worker that watches over the cache.
//!
//! It cleans up old cache, updates statistics and optimizes the cache.
//! We allow losing some messages (it doesn't hurt) and some races,
//! but we guarantee eventual consistency and fault tolerance.
//! Background tasks can be CPU intensive, but the worker thread has low priority.
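//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; assumes a validated `CacheConfig`
//! value named `cache_config` and a hypothetical cache path):
//!
//! ```ignore
//! let worker = Worker::start_new(&cache_config);
//! // Fire-and-forget notifications; events may be dropped when the queue is full.
//! worker.on_cache_get_async("/path/to/cache/entry");
//! worker.on_cache_update_async("/path/to/cache/entry");
//! ```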
#![cfg_attr(
    not(test),
    expect(
        clippy::useless_conversion,
        reason = "cfg(test) and cfg(not(test)) have a different definition \
                  of `SystemTime`, so conversions below are needed in \
                  one mode but not the other, just ignore the lint in this \
                  module in not(test) mode where the conversion isn't required",
    )
)]

use super::{CacheConfig, fs_write_atomic};
use log::{debug, info, trace, warn};
use serde_derive::{Deserialize, Serialize};
use std::cmp;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;

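/// Cheaply cloneable handle to the background cache worker.
///
/// Cloning shares the same bounded event queue sender (and, under `cfg(test)`,
/// the shared worker statistics used by the tests).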
#[derive(Clone)]
pub(super) struct Worker {
    sender: SyncSender<CacheEvent>,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

struct WorkerThread {
    receiver: Receiver<CacheEvent>,
    cache_config: CacheConfig,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    dropped: u32,
    sent: u32,
    handled: u32,
}

#[derive(Debug, Clone)]
enum CacheEvent {
    OnCacheGet(PathBuf),
    OnCacheUpdate(PathBuf),
}

impl Worker {
    pub(super) fn start_new(cache_config: &CacheConfig) -> Self {
        let queue_size = match cache_config.worker_event_queue_size() {
            num if num <= usize::max_value() as u64 => num as usize,
            _ => usize::max_value(),
        };
        let (tx, rx) = sync_channel(queue_size);

        #[cfg(test)]
        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));

        let worker_thread = WorkerThread {
            receiver: rx,
            cache_config: cache_config.clone(),
            #[cfg(test)]
            stats: stats.clone(),
        };

        // When self is dropped, the sender will be dropped, which will cause the
        // channel to hang up and the worker thread to exit -- this happens in the tests.
        // The non-test binary has only a static worker, so Rust doesn't drop it.
        thread::spawn(move || worker_thread.run());

        Self {
            sender: tx,
            #[cfg(test)]
            stats,
        }
    }

    pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    #[inline]
    fn send_cache_event(&self, event: CacheEvent) {
        let sent_event = self.sender.try_send(event.clone());

        if let Err(ref err) = sent_event {
            info!(
                "Failed to asynchronously send message to worker thread, \
                 event: {event:?}, error: {err}"
            );
        }

        #[cfg(test)]
        {
            let mut stats = self
                .stats
                .0
                .lock()
                .expect("Failed to acquire worker stats lock");

            if sent_event.is_ok() {
                stats.sent += 1;
            } else {
                stats.dropped += 1;
            }
        }
    }

    #[cfg(test)]
    pub(super) fn events_dropped(&self) -> u32 {
        let stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        stats.dropped
    }

    #[cfg(test)]
    pub(super) fn wait_for_all_events_handled(&self) {
        let (stats, condvar) = &*self.stats;
        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
        while stats.handled != stats.sent {
            stats = condvar
                .wait(stats)
                .expect("Failed to reacquire worker stats lock");
        }
    }
}

impl fmt::Debug for Worker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Worker").finish()
    }
}

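/// Per-module statistics kept in a `.stats` sidecar file, serialized as TOML
/// by `write_stats_file`. Illustrative contents (hypothetical values, assuming
/// a baseline compression level of 3 and two recorded usages):
///
/// ```toml
/// usages = 2
/// optimized-compression = 3
/// ```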
#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
    pub usages: u64,
    #[serde(rename = "optimized-compression")]
    pub compression_level: i32,
}

impl ModuleCacheStatistics {
    fn default(cache_config: &CacheConfig) -> Self {
        Self {
            usages: 0,
            compression_level: cache_config.baseline_compression_level(),
        }
    }
}

enum CacheEntry {
    Recognized {
        path: PathBuf,
        mtime: SystemTime,
        size: u64,
    },
    Unrecognized {
        path: PathBuf,
        is_dir: bool,
    },
}

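/// Unwraps a `Result`; on `Err` it logs a warning built from the given message
/// and path, then executes the continuation statement (typically `return` or
/// `continue`, or a cleanup block).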
macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Ok(val) => val,
            Err(err) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
                $cont
            }
        }
    };
}

impl WorkerThread {
    fn run(self) {
        debug!("Cache worker thread started.");

        Self::lower_thread_priority();

        #[cfg(test)]
        let (stats, condvar) = &*self.stats;

        for event in self.receiver.iter() {
            match event {
                CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
                CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
            }

            #[cfg(test)]
            {
                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
                stats.handled += 1;
                condvar.notify_all();
            }
        }
    }

    #[cfg(target_os = "fuchsia")]
    fn lower_thread_priority() {
        // TODO This needs to use Fuchsia thread profiles
        // https://fuchsia.dev/fuchsia-src/reference/kernel_objects/profile
        warn!(
            "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
        );
    }

    #[cfg(target_os = "windows")]
    fn lower_thread_priority() {
        use windows_sys::Win32::System::Threading::*;

        // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority
        // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities

        if unsafe { SetThreadPriority(GetCurrentThread(), THREAD_MODE_BACKGROUND_BEGIN) } == 0 {
            warn!(
                "Failed to lower worker thread priority. It might affect application performance."
            );
        }
    }

    #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
    fn lower_thread_priority() {
        // http://man7.org/linux/man-pages/man7/sched.7.html

        const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;

        match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
            Ok(current_nice) => {
                debug!("New nice value of worker thread: {current_nice}");
            }
            Err(err) => {
                warn!(
                    "Failed to lower worker thread priority ({err:?}). It might affect application performance."
                );
            }
        };
    }

    /// Increases the usage counter and recompresses the file
    /// if the usage counter reaches the configurable threshold.
    fn handle_on_cache_get(&self, path: PathBuf) {
        trace!("handle_on_cache_get() for path: {}", path.display());

        // construct .stats file path
        let filename = path.file_name().unwrap().to_str().unwrap();
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // load .stats file (default if none or error)
        let mut stats = read_stats_file(stats_path.as_ref())
            .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));

        // step 1: update the usage counter & write to the disk
        // it's racy, but it's fine (the counter will just be smaller,
        // which sometimes retriggers recompression)
        stats.usages += 1;
        if !write_stats_file(stats_path.as_ref(), &stats) {
            return;
        }

        // step 2: recompress if there's a need
        let opt_compr_lvl = self.cache_config.optimized_compression_level();
        if stats.compression_level >= opt_compr_lvl
            || stats.usages
                < self
                    .cache_config
                    .optimized_compression_usage_counter_threshold()
        {
            return;
        }

        let lock_path = if let Some(p) = acquire_task_fs_lock(
            path.as_ref(),
            self.cache_config.optimizing_compression_task_timeout(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        ) {
            p
        } else {
            return;
        };

        trace!("Trying to recompress file: {}", path.display());

        // recompress, write to another file, rename (it's an atomic file content exchange)
        // and update the stats file
        let compressed_cache_bytes = unwrap_or_warn!(
            fs::read(&path),
            return,
            "Failed to read old cache file",
            path
        );

        let cache_bytes = unwrap_or_warn!(
            zstd::decode_all(&compressed_cache_bytes[..]),
            return,
            "Failed to decompress cached code",
            path
        );

        let recompressed_cache_bytes = unwrap_or_warn!(
            zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
            return,
            "Failed to compress cached code",
            path
        );

        unwrap_or_warn!(
            fs::write(&lock_path, &recompressed_cache_bytes),
            return,
            "Failed to write recompressed cache",
            lock_path
        );

        unwrap_or_warn!(
            fs::rename(&lock_path, &path),
            {
                if let Err(error) = fs::remove_file(&lock_path) {
                    warn!(
                        "Failed to clean up (remove) recompressed cache, path {}, err: {}",
                        lock_path.display(),
                        error
                    );
                }

                return;
            },
            "Failed to rename recompressed cache",
            lock_path
        );

        // update the stats file (reload it! recompression can take some time)
        if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
            if new_stats.compression_level >= opt_compr_lvl {
                // Rare race:
                // two instances with different opt_compr_lvl: we don't know in which order they updated
                // the cache file and the stats file (they are not updated together atomically)
                // Possible solution is to use directories per cache entry, but it complicates the system
                // and is not worth it.
                debug!(
                    "DETECTED task done more than once (or race with a new file): \
                     recompression of {}. Note: if the optimized compression level setting \
                     has changed in the meantime, the stats file might contain an \
                     inconsistent compression level due to the race.",
                    path.display()
                );
            } else {
                new_stats.compression_level = opt_compr_lvl;
                let _ = write_stats_file(stats_path.as_ref(), &new_stats);
            }

            if new_stats.usages < stats.usages {
                debug!(
                    "DETECTED lower usage count (new file or race with counter \
                     increasing): file {}",
                    path.display()
                );
            }
        } else {
            debug!(
                "Can't read stats file again to update compression level (it might have been \
                 cleaned up): file {}",
                stats_path.display()
            );
        }

        trace!("Task finished: recompress file: {}", path.display());
    }

    fn directory(&self) -> &PathBuf {
        self.cache_config
            .directory()
            .expect("CacheConfig should be validated before being passed to a WorkerThread")
    }

    fn handle_on_cache_update(&self, path: PathBuf) {
        trace!("handle_on_cache_update() for path: {}", path.display());

        // ---------------------- step 1: create .stats file

        // construct .stats file path
        let filename = path
            .file_name()
            .expect("Expected valid cache file name")
            .to_str()
            .expect("Expected valid cache file name");
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // create and write stats file
        let mut stats = ModuleCacheStatistics::default(&self.cache_config);
        stats.usages += 1;
        write_stats_file(&stats_path, &stats);

        // ---------------------- step 2: perform cleanup task if needed

        // acquire lock for the cleanup task
        // A lock is proof of a recent cleanup task, so we don't want to delete it.
        // Expired locks will be deleted by the cleanup task.
        let cleanup_file = self.directory().join(".cleanup"); // a non-existing marker file
        if acquire_task_fs_lock(
            &cleanup_file,
            self.cache_config.cleanup_interval(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        )
        .is_none()
        {
            return;
        }

        trace!("Trying to clean up cache");

        let mut cache_index = self.list_cache_contents();
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            // sort by age
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        // later == younger
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        // files from the far future are treated as the oldest recognized files
                        // we want to delete them, so the cache keeps track of recent files
                        // however, we don't delete them unconditionally,
                        // because the .stats file can be overwritten with a meaningful mtime
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                // unrecognized is kind of infinity
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
                (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
            }
        });

        // find the "cut" boundary:
        // - remove unrecognized files anyway,
        // - remove some cache files if some quota has been exceeded
        let mut total_size = 0u64;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;

        let total_size_limit = self.cache_config.files_total_size_soft_limit();
        let file_count_limit = self.cache_config.file_count_soft_limit();
        let tsl_if_deleting = total_size_limit
            .checked_mul(
                self.cache_config
                    .files_total_size_limit_percent_if_deleting() as u64,
            )
            .unwrap()
            / 100;
        let fcl_if_deleting = file_count_limit
            .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
            .unwrap()
            / 100;

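        // Illustrative arithmetic (hypothetical limits, not the defaults): with a
        // 1 GiB total-size soft limit and a 70% "if deleting" percentage,
        // tsl_if_deleting is ~0.7 GiB, so once the 1 GiB soft limit (or the file
        // count limit) is exceeded, recognized entries are trimmed until roughly
        // 0.7 GiB of the most recently used files remain.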
        for (idx, item) in cache_index.iter().enumerate() {
            let size = if let CacheEntry::Recognized { size, .. } = item {
                size
            } else {
                start_delete_idx = Some(idx);
                break;
            };

            total_size += size;
            if start_delete_idx_if_deleting_recognized_items.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting_recognized_items = Some(idx);
            }

            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
        }

        if let Some(idx) = start_delete_idx {
            for item in &cache_index[idx..] {
                let (result, path, entity) = match item {
                    CacheEntry::Recognized { path, .. }
                    | CacheEntry::Unrecognized {
                        path,
                        is_dir: false,
                    } => (fs::remove_file(path), path, "file"),
                    CacheEntry::Unrecognized { path, is_dir: true } => {
                        (fs::remove_dir_all(path), path, "directory")
                    }
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove {} during cleanup, path: {}, err: {}",
                        entity,
                        path.display(),
                        err
                    );
                }
            }
        }

        trace!("Task finished: clean up cache");
    }

    // Be fault tolerant: list as much as you can, and ignore the rest
    fn list_cache_contents(&self) -> Vec<CacheEntry> {
        fn enter_dir(
            vec: &mut Vec<CacheEntry>,
            dir_path: &Path,
            level: u8,
            cache_config: &CacheConfig,
        ) {
            macro_rules! add_unrecognized {
                (file: $path:expr) => {
                    add_unrecognized!(false, $path)
                };
                (dir: $path:expr) => {
                    add_unrecognized!(true, $path)
                };
                ($is_dir:expr, $path:expr) => {
                    vec.push(CacheEntry::Unrecognized {
                        path: $path.to_path_buf(),
                        is_dir: $is_dir,
                    })
                };
            }
            macro_rules! add_unrecognized_and {
                ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
                    $( add_unrecognized!($ty: $path); )*
                    $cont
                }};
            }

            macro_rules! unwrap_or {
                ($result:expr, $cont:stmt, $err_msg:expr) => {
                    unwrap_or!($result, $cont, $err_msg, dir_path)
                };
                ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
                    unwrap_or_warn!(
                        $result,
                        $cont,
                        format!("{}, level: {}", $err_msg, level),
                        $path
                    )
                };
            }

            // If we fail to list a directory, something bad is happening anyway
            // (something is touching our cache or we have a disk failure).
            // Try to delete it, so we can stay within the soft limits of the cache size.
            // This comment applies later in this function, too.
            let it = unwrap_or!(
                fs::read_dir(dir_path),
                add_unrecognized_and!([dir: dir_path], return),
                "Failed to list cache directory, deleting it"
            );

            let mut cache_files = HashMap::new();
            for entry in it {
                // read_dir() returns an iterator over results - in case some of them are errors
                // we don't know their names, so we can't delete them. We don't want to delete
                // the whole directory with good entries too, so we just ignore the erroneous entries.
                let entry = unwrap_or!(
                    entry,
                    continue,
                    "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
                );
                let path = entry.path();
                match (level, path.is_dir()) {
                    (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
                    (0..=1, false) => {
                        if level == 0
                            && path.file_stem() == Some(OsStr::new(".cleanup"))
                            && path.extension().is_some()
                            // assume it's a cleanup lock
                            && !is_fs_lock_expired(
                                Some(&entry),
                                &path,
                                cache_config.cleanup_interval(),
                                cache_config.allowed_clock_drift_for_files_from_future(),
                            )
                        {
                            continue; // skip active lock
                        }
                        add_unrecognized!(file: path);
                    }
                    (2, false) => {
                        match path.extension().and_then(OsStr::to_str) {
                            // mod or stats file
                            None | Some("stats") => {
                                cache_files.insert(path, entry);
                            }

                            Some(ext) => {
                                // check if it's a valid lock
                                let recognized = ext.starts_with("wip-")
                                    && !is_fs_lock_expired(
                                        Some(&entry),
                                        &path,
                                        cache_config.optimizing_compression_task_timeout(),
                                        cache_config.allowed_clock_drift_for_files_from_future(),
                                    );

                                if !recognized {
                                    add_unrecognized!(file: path);
                                }
                            }
                        }
                    }
                    (_, is_dir) => add_unrecognized!(is_dir, path),
                }
            }

            // associate each module with its stats & handle them
            // assumption: just mods and stats
            for (path, entry) in cache_files.iter() {
                let path_buf: PathBuf;
                let (mod_, stats_, is_mod) = match path.extension() {
                    Some(_) => {
                        path_buf = path.with_extension("");
                        (
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            Some((path, entry)),
                            false,
                        )
                    }
                    None => {
                        path_buf = path.with_extension("stats");
                        (
                            Some((path, entry)),
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            true,
                        )
                    }
                };

                // construct a cache entry
                match (mod_, stats_, is_mod) {
                    (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
                        let mod_metadata = unwrap_or!(
                            mod_entry.metadata(),
                            add_unrecognized_and!([file: stats_path, file: mod_path], continue),
                            "Failed to get metadata, deleting BOTH module cache and stats files",
                            mod_path
                        );
                        let stats_mtime = unwrap_or!(
                            stats_entry.metadata().and_then(|m| m.modified()),
                            add_unrecognized_and!(
                                [file: stats_path],
                                unwrap_or!(
                                    mod_metadata.modified(),
                                    add_unrecognized_and!(
                                        [file: stats_path, file: mod_path],
                                        continue
                                    ),
                                    "Failed to get mtime, deleting BOTH module cache and stats \
                                     files",
                                    mod_path
                                )
                            ),
                            "Failed to get metadata/mtime, deleting the file",
                            stats_path
                        );
                        // .into() called for the SystemTimeStub if cfg(test)
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: stats_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (Some(_), Some(_), false) => (), // was or will be handled by the previous branch
                    (Some((mod_path, mod_entry)), None, _) => {
                        let (mod_metadata, mod_mtime) = unwrap_or!(
                            mod_entry
                                .metadata()
                                .and_then(|md| md.modified().map(|mt| (md, mt))),
                            add_unrecognized_and!([file: mod_path], continue),
                            "Failed to get metadata/mtime, deleting the file",
                            mod_path
                        );
                        // .into() called for the SystemTimeStub if cfg(test)
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: mod_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (None, Some((stats_path, _stats_entry)), _) => {
                        debug!("Found orphaned stats file: {}", stats_path.display());
                        add_unrecognized!(file: stats_path);
                    }
                    _ => unreachable!(),
                }
            }
        }

        let mut vec = Vec::new();
        enter_dir(&mut vec, self.directory(), 0, &self.cache_config);
        vec
    }
}

fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
    fs::read_to_string(path)
        .map_err(|err| {
            trace!(
                "Failed to read stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|contents| {
            toml::from_str::<ModuleCacheStatistics>(&contents).map_err(|err| {
                trace!(
                    "Failed to parse stats file, path: {}, err: {}",
                    path.display(),
                    err,
                )
            })
        })
        .ok()
}

fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
    toml::to_string_pretty(&stats)
        .map_err(|err| {
            warn!(
                "Failed to serialize stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|serialized| {
            fs_write_atomic(path, "stats", serialized.as_bytes()).map_err(|_| ())
        })
        .is_ok()
}

/// Tries to acquire a lock for a specific task.
///
/// Returns Some(path) to the lock if it succeeds. The task path must not
/// have an extension and must have a file stem.
///
/// To release a lock, you need to either manually rename or remove it,
/// or wait until it expires and the cleanup task removes it.
///
/// Note: this function is racy. The main idea is: be fault tolerant and
/// never block a task. The price is that, rarely, a task may be done
/// more than once.
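/// For illustration (hypothetical path): given a task path of
/// `<cache-dir>/aa/bb/module-hash`, a successful call creates and returns the
/// lock file path `<cache-dir>/aa/bb/module-hash.wip-<pid>`, where `<pid>` is
/// the current process id.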
fn acquire_task_fs_lock(
    task_path: &Path,
    timeout: Duration,
    allowed_future_drift: Duration,
) -> Option<PathBuf> {
    assert!(task_path.extension().is_none());
    assert!(task_path.file_stem().is_some());

    // list the directory
    let dir_path = task_path.parent()?;
    let it = fs::read_dir(dir_path)
        .map_err(|err| {
            warn!(
                "Failed to list cache directory, path: {}, err: {}",
                dir_path.display(),
                err
            )
        })
        .ok()?;

    // look for existing locks
    for entry in it {
        let entry = entry
            .map_err(|err| {
                warn!(
                    "Failed to list cache directory, path: {}, err: {}",
                    dir_path.display(),
                    err
                )
            })
            .ok()?;

        let path = entry.path();
        if path.is_dir() || path.file_stem() != task_path.file_stem() {
            continue;
        }

        // check extension and mtime
        match path.extension() {
            None => continue,
            Some(ext) => {
                if let Some(ext_str) = ext.to_str() {
                    // if it's None, i.e. not a valid UTF-8 string, then that's not our lock for sure
                    if ext_str.starts_with("wip-")
                        && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
                    {
                        return None;
                    }
                }
            }
        }
    }

    // create the lock
    let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
    let _file = fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&lock_path)
        .map_err(|err| {
            warn!(
                "Failed to create lock file (note: it shouldn't exist): path: {}, err: {}",
                lock_path.display(),
                err
            )
        })
        .ok()?;

    Some(lock_path)
}

// We have either both, or just the path; the dir entry is desirable since on some platforms
// we can get metadata without extra syscalls.
// Furthermore: it's better to use a path we already have instead of allocating a new one
// from the dir entry.
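// Illustrative behavior (hypothetical numbers): with a 30-minute threshold, a lock
// whose mtime is 45 minutes in the past counts as expired, while one 10 minutes old
// does not; a lock whose mtime lies in the future counts as expired only if it is
// further ahead than allowed_future_drift.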
fn is_fs_lock_expired(
    entry: Option<&fs::DirEntry>,
    path: &PathBuf,
    threshold: Duration,
    allowed_future_drift: Duration,
) -> bool {
    let mtime = match entry
        .map_or_else(|| path.metadata(), |e| e.metadata())
        .and_then(|metadata| metadata.modified())
    {
        Ok(mt) => mt,
        Err(err) => {
            warn!(
                "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            return true; // can't read mtime, treat as expired, so this task will not be starved
        }
    };

    // DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
    match SystemTime::now().duration_since(mtime) {
        Ok(elapsed) => elapsed >= threshold,
        Err(err) => {
            trace!(
                "Found mtime in the future, treating as a non-expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            // the lock is expired if the time is too far in the future
            // it is fine to have a network share and unsynchronized clocks,
            // but it's not good when a user changes the time in their system clock
            err.duration() > allowed_future_drift
        }
    }
}

#[cfg(test)]
mod tests;