Path: blob/main/crates/polars-io/src/file_cache/eviction.rs
6939 views
use std::path::{Path, PathBuf};1use std::sync::Arc;2use std::sync::atomic::AtomicU64;3use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};45use fs4::fs_std::FileExt;6use polars_error::{PolarsError, PolarsResult};78use super::cache_lock::{GLOBAL_FILE_CACHE_LOCK, GlobalFileCacheGuardExclusive};9use super::metadata::EntryMetadata;10use crate::pl_async;1112#[derive(Debug, Clone)]13pub(super) struct EvictionCandidate {14path: PathBuf,15metadata_path: PathBuf,16metadata_last_modified: SystemTime,17ttl: u64,18}1920pub(super) struct EvictionManager {21pub(super) data_dir: Box<Path>,22pub(super) metadata_dir: Box<Path>,23pub(super) files_to_remove: Option<Vec<EvictionCandidate>>,24pub(super) min_ttl: Arc<AtomicU64>,25pub(super) notify_ttl_updated: Arc<tokio::sync::Notify>,26}2728impl EvictionCandidate {29fn update_ttl(&mut self) {30let Ok(metadata_last_modified) =31std::fs::metadata(&self.metadata_path).map(|md| md.modified().unwrap())32else {33self.ttl = 0;34return;35};3637if self.metadata_last_modified == metadata_last_modified {38return;39}4041let Ok(ref mut file) = std::fs::OpenOptions::new()42.read(true)43.open(&self.metadata_path)44else {45self.ttl = 0;46return;47};4849let ttl = EntryMetadata::try_from_reader(file)50.map(|x| x.ttl)51.unwrap_or(0);5253self.metadata_last_modified = metadata_last_modified;54self.ttl = ttl;55}5657fn should_remove(&self, now: &SystemTime) -> bool {58let Ok(metadata) = std::fs::metadata(&self.path) else {59return false;60};6162if let Ok(duration) = now.duration_since(63metadata64.accessed()65.unwrap_or_else(|_| metadata.modified().unwrap()),66) {67duration.as_secs() >= self.ttl68} else {69false70}71}7273fn try_evict(74&mut self,75now: &SystemTime,76verbose: bool,77_guard: &GlobalFileCacheGuardExclusive,78) {79self.update_ttl();80let path = &self.path;8182if !path.exists() {83if verbose {84eprintln!(85"[EvictionManager] evict_files: skipping {} (path no longer exists)",86path.to_str().unwrap()87);88}89return;90}9192let metadata = std::fs::metadata(path).unwrap();9394let since_last_accessed = match now.duration_since(95metadata96.accessed()97.unwrap_or_else(|_| metadata.modified().unwrap()),98) {99Ok(v) => v.as_secs(),100Err(_) => {101if verbose {102eprintln!(103"[EvictionManager] evict_files: skipping {} (last accessed time was updated)",104path.to_str().unwrap()105);106}107return;108},109};110111if since_last_accessed < self.ttl {112if verbose {113eprintln!(114"[EvictionManager] evict_files: skipping {} (last accessed time was updated)",115path.to_str().unwrap()116);117}118return;119}120121{122let file = std::fs::OpenOptions::new().read(true).open(path).unwrap();123124if file.try_lock_exclusive().is_err() {125if verbose {126eprintln!(127"[EvictionManager] evict_files: skipping {} (file is locked)",128self.path.to_str().unwrap()129);130}131return;132}133}134135if let Err(err) = std::fs::remove_file(path) {136if verbose {137eprintln!(138"[EvictionManager] evict_files: error removing file: {} ({})",139path.to_str().unwrap(),140err141);142}143} else if verbose {144eprintln!(145"[EvictionManager] evict_files: removed file at {}",146path.to_str().unwrap()147);148}149}150}151152impl EvictionManager {153/// # Safety154/// The following directories exist:155/// * `self.data_dir`156/// * `self.metadata_dir`157pub(super) fn run_in_background(mut self) {158let verbose = false;159160if verbose {161eprintln!(162"[EvictionManager] creating cache eviction background task, self.min_ttl = {}",163self.min_ttl.load(std::sync::atomic::Ordering::Relaxed)164);165}166167pl_async::get_runtime().spawn(async move {168// Give some time at startup for other code to run.169tokio::time::sleep(Duration::from_secs(3)).await;170let mut last_eviction_time;171172loop {173let this: &'static mut Self = unsafe { std::mem::transmute(&mut self) };174175let result = tokio::task::spawn_blocking(|| this.update_file_list())176.await177.unwrap();178179last_eviction_time = Instant::now();180181match result {182Ok(_) if self.files_to_remove.as_ref().unwrap().is_empty() => {},183Ok(_) => loop {184if let Some(guard) = GLOBAL_FILE_CACHE_LOCK.try_lock_eviction() {185if verbose {186eprintln!(187"[EvictionManager] got exclusive cache lock, evicting {} files",188self.files_to_remove.as_ref().unwrap().len()189);190}191192tokio::task::block_in_place(|| self.evict_files(&guard));193break;194}195tokio::time::sleep(Duration::from_secs(7)).await;196},197Err(err) => {198if verbose {199eprintln!("[EvictionManager] error updating file list: {err}");200}201},202}203204loop {205let min_ttl = self.min_ttl.load(std::sync::atomic::Ordering::Relaxed);206let sleep_interval = std::cmp::max(min_ttl / 4, {207#[cfg(debug_assertions)]208{2093210}211#[cfg(not(debug_assertions))]212{21360214}215});216217let since_last_eviction =218Instant::now().duration_since(last_eviction_time).as_secs();219let sleep_interval = sleep_interval.saturating_sub(since_last_eviction);220let sleep_interval = Duration::from_secs(sleep_interval);221222tokio::select! {223_ = self.notify_ttl_updated.notified() => {224continue;225}226_ = tokio::time::sleep(sleep_interval) => {227break;228}229}230}231}232});233}234235fn update_file_list(&mut self) -> PolarsResult<()> {236let data_files_iter = match std::fs::read_dir(self.data_dir.as_ref()) {237Ok(v) => v,238Err(e) => {239let msg = format!("failed to read data directory: {e}");240241return Err(PolarsError::IO {242error: e.into(),243msg: Some(msg.into()),244});245},246};247248let metadata_files_iter = match std::fs::read_dir(self.metadata_dir.as_ref()) {249Ok(v) => v,250Err(e) => {251let msg = format!("failed to read metadata directory: {e}");252253return Err(PolarsError::IO {254error: e.into(),255msg: Some(msg.into()),256});257},258};259260let mut files_to_remove = Vec::with_capacity(261data_files_iter262.size_hint()263.1264.unwrap_or(data_files_iter.size_hint().0)265+ metadata_files_iter266.size_hint()267.1268.unwrap_or(metadata_files_iter.size_hint().0),269);270271let now = SystemTime::now();272273for file in data_files_iter {274let file = file?;275let path = file.path();276277let hash = path278.file_name()279.unwrap()280.to_str()281.unwrap()282.get(..32)283.unwrap();284let metadata_path = self.metadata_dir.join(hash);285286let mut eviction_candidate = EvictionCandidate {287path,288metadata_path,289metadata_last_modified: UNIX_EPOCH,290ttl: 0,291};292eviction_candidate.update_ttl();293294if eviction_candidate.should_remove(&now) {295files_to_remove.push(eviction_candidate);296}297}298299for file in metadata_files_iter {300let file = file?;301let path = file.path();302let metadata_path = path.clone();303304let mut eviction_candidate = EvictionCandidate {305path,306metadata_path,307metadata_last_modified: UNIX_EPOCH,308ttl: 0,309};310311eviction_candidate.update_ttl();312313if eviction_candidate.should_remove(&now) {314files_to_remove.push(eviction_candidate);315}316}317318self.files_to_remove = Some(files_to_remove);319320Ok(())321}322323/// # Panics324/// Panics if `self.files_to_remove` is `None`.325fn evict_files(&mut self, _guard: &GlobalFileCacheGuardExclusive) {326let verbose = false;327let mut files_to_remove = self.files_to_remove.take().unwrap();328let now = &SystemTime::now();329330for eviction_candidate in files_to_remove.iter_mut() {331eviction_candidate.try_evict(now, verbose, _guard);332}333}334}335336337