Path: blob/main/crates/polars-io/src/file_cache/cache_lock.rs
6939 views
use std::sync::atomic::AtomicBool;1use std::sync::{Arc, LazyLock, RwLock, RwLockReadGuard, RwLockWriteGuard};2use std::time::Duration;34use fs4::fs_std::FileExt;56use super::utils::FILE_CACHE_PREFIX;7use crate::pl_async;89pub(super) static GLOBAL_FILE_CACHE_LOCK: LazyLock<GlobalLock> = LazyLock::new(|| {10let path = FILE_CACHE_PREFIX.join(".process-lock");1112let file = std::fs::OpenOptions::new()13.write(true)14.create(true)15.truncate(false)16.open(path)17.map_err(|err| {18panic!("failed to open/create global file cache lockfile: {err}");19})20.unwrap();2122let at_bool = Arc::new(AtomicBool::new(false));23// Holding this access tracker prevents the background task from24// unlocking the lock.25let access_tracker = AccessTracker(at_bool.clone());26let notify_lock_acquired = Arc::new(tokio::sync::Notify::new());27let notify_lock_acquired_2 = notify_lock_acquired.clone();2829pl_async::get_runtime().spawn(async move {30let access_tracker = at_bool;31let notify_lock_acquired = notify_lock_acquired_2;32let verbose = false;3334loop {35if verbose {36eprintln!("file cache background unlock: waiting for acquisition notification");37}3839notify_lock_acquired.notified().await;4041if verbose {42eprintln!("file cache background unlock: got acquisition notification");43}4445loop {46if !access_tracker.swap(false, std::sync::atomic::Ordering::Relaxed) {47if let Some(unlocked_by_this_call) = GLOBAL_FILE_CACHE_LOCK.try_unlock() {48if unlocked_by_this_call && verbose {49eprintln!(50"file cache background unlock: unlocked global file cache lockfile"51);52}53break;54}55}56tokio::time::sleep(Duration::from_secs(3)).await;57}58}59});6061GlobalLock {62inner: RwLock::new(GlobalLockData { file, state: None }),63access_tracker,64notify_lock_acquired,65}66});6768pub(super) enum LockedState {69/// Shared between threads and other processes.70Shared,71#[allow(dead_code)]72/// Locked exclusively by the eviction task of this process.73Eviction,74}7576#[allow(dead_code)]77pub(super) type GlobalFileCacheGuardAny<'a> = RwLockReadGuard<'a, GlobalLockData>;78pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;7980pub(super) struct GlobalLockData {81file: std::fs::File,82state: Option<LockedState>,83}8485pub(super) struct GlobalLock {86inner: RwLock<GlobalLockData>,87access_tracker: AccessTracker,88notify_lock_acquired: Arc<tokio::sync::Notify>,89}9091/// Tracks access to the global lock:92/// * The inner `bool` is used to delay the background unlock task from unlocking93/// the global lock until 3 seconds after the last lock attempt.94/// * The `Arc` ref-count is used as a semaphore that allows us to block exclusive95/// lock attempts while temporarily releasing the `RwLock`.96#[derive(Clone)]97struct AccessTracker(Arc<AtomicBool>);9899impl Drop for AccessTracker {100fn drop(&mut self) {101self.0.store(true, std::sync::atomic::Ordering::Relaxed);102}103}104105struct NotifyOnDrop(Arc<tokio::sync::Notify>);106107impl Drop for NotifyOnDrop {108fn drop(&mut self) {109self.0.notify_one();110}111}112113impl GlobalLock {114fn get_access_tracker(&self) -> AccessTracker {115let at = self.access_tracker.clone();116at.0.store(true, std::sync::atomic::Ordering::Relaxed);117at118}119120/// Returns121/// * `None` - Could be locked (ambiguous)122/// * `Some(true)` - Unlocked (by this function call)123/// * `Some(false)` - Unlocked (was not locked)124fn try_unlock(&self) -> Option<bool> {125if let Ok(mut this) = self.inner.try_write() {126if Arc::strong_count(&self.access_tracker.0) <= 2 {127return if this.state.take().is_some() {128FileExt::unlock(&this.file).unwrap();129Some(true)130} else {131Some(false)132};133}134}135None136}137138/// Acquire a shared lock.139pub(super) fn lock_shared(&self) -> GlobalFileCacheGuardAny<'_> {140let access_tracker = self.get_access_tracker();141let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());142143{144let this = self.inner.read().unwrap();145146if let Some(LockedState::Shared) = this.state {147return this;148}149}150151{152let mut this = self.inner.write().unwrap();153154if let Some(LockedState::Eviction) = this.state {155FileExt::unlock(&this.file).unwrap();156this.state = None;157}158159if this.state.is_none() {160FileExt::lock_shared(&this.file).unwrap();161this.state = Some(LockedState::Shared);162}163}164165// Safety: Holding the access tracker guard maintains an Arc refcount166// > 2, which prevents automatic unlock.167debug_assert!(Arc::strong_count(&access_tracker.0) > 2);168169{170let this = self.inner.read().unwrap();171172if let Some(LockedState::Eviction) = this.state {173// Try again174drop(this);175return self.lock_shared();176}177178assert!(179this.state.is_some(),180"impl error: global file cache lock was unlocked"181);182this183}184}185186/// Acquire an exclusive lock on the cache directory. Holding this lock freezes187/// all cache operations except for reading from already-opened data files.188#[allow(dead_code)]189pub(super) fn try_lock_eviction(&self) -> Option<GlobalFileCacheGuardExclusive<'_>> {190let access_tracker = self.get_access_tracker();191192if let Ok(mut this) = self.inner.try_write() {193if194// 3:195// * the Lazy<GlobalLock>196// * the global unlock background task197// * this function198Arc::strong_count(&access_tracker.0) > 3 {199return None;200}201202let _notify_on_drop = NotifyOnDrop(self.notify_lock_acquired.clone());203204if let Some(ref state) = this.state {205if matches!(state, LockedState::Eviction) {206return Some(this);207}208}209210if this.state.take().is_some() {211FileExt::unlock(&this.file).unwrap();212}213214if this.file.try_lock_exclusive().is_ok() {215this.state = Some(LockedState::Eviction);216return Some(this);217}218}219None220}221}222223224