Path: blob/main/crates/polars-io/src/file_cache/eviction.rs
8422 views
use std::sync::Arc;1use std::sync::atomic::AtomicU64;2use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};34use fs4::fs_std::FileExt;5use polars_error::{PolarsError, PolarsResult};6use polars_utils::pl_path::PlRefPath;78use super::cache_lock::{GLOBAL_FILE_CACHE_LOCK, GlobalFileCacheGuardExclusive};9use super::metadata::EntryMetadata;10use crate::pl_async;1112#[derive(Debug, Clone)]13pub(super) struct EvictionCandidate {14path: PlRefPath,15metadata_path: PlRefPath,16metadata_last_modified: SystemTime,17ttl: u64,18}1920pub(super) struct EvictionManager {21pub(super) data_dir: PlRefPath,22pub(super) metadata_dir: PlRefPath,23pub(super) files_to_remove: Option<Vec<EvictionCandidate>>,24pub(super) min_ttl: Arc<AtomicU64>,25pub(super) notify_ttl_updated: Arc<tokio::sync::Notify>,26}2728impl EvictionCandidate {29fn update_ttl(&mut self) {30let Ok(metadata_last_modified) =31std::fs::metadata(&self.metadata_path).map(|md| md.modified().unwrap())32else {33self.ttl = 0;34return;35};3637if self.metadata_last_modified == metadata_last_modified {38return;39}4041let Ok(ref mut file) = std::fs::OpenOptions::new()42.read(true)43.open(&self.metadata_path)44else {45self.ttl = 0;46return;47};4849let ttl = EntryMetadata::try_from_reader(file)50.map(|x| x.ttl)51.unwrap_or(0);5253self.metadata_last_modified = metadata_last_modified;54self.ttl = ttl;55}5657fn should_remove(&self, now: &SystemTime) -> bool {58let Ok(metadata) = std::fs::metadata(&self.path) else {59return false;60};6162if let Ok(duration) = now.duration_since(63metadata64.accessed()65.unwrap_or_else(|_| metadata.modified().unwrap()),66) {67duration.as_secs() >= self.ttl68} else {69false70}71}7273fn try_evict(74&mut self,75now: &SystemTime,76verbose: bool,77_guard: &GlobalFileCacheGuardExclusive,78) {79self.update_ttl();80let path = &self.path;8182if !path.as_std_path().exists() {83if verbose {84eprintln!(85"[EvictionManager] evict_files: skipping {} (path no longer exists)",86path87);88}89return;90}9192let metadata = std::fs::metadata(path).unwrap();9394let since_last_accessed = match now.duration_since(95metadata96.accessed()97.unwrap_or_else(|_| metadata.modified().unwrap()),98) {99Ok(v) => v.as_secs(),100Err(_) => {101if verbose {102eprintln!(103"[EvictionManager] evict_files: skipping {} (last accessed time was updated)",104path105);106}107return;108},109};110111if since_last_accessed < self.ttl {112if verbose {113eprintln!(114"[EvictionManager] evict_files: skipping {} (last accessed time was updated)",115path116);117}118return;119}120121{122let file = std::fs::OpenOptions::new().read(true).open(path).unwrap();123124if file.try_lock_exclusive().is_err() {125if verbose {126eprintln!(127"[EvictionManager] evict_files: skipping {} (file is locked)",128self.path129);130}131return;132}133}134135if let Err(err) = std::fs::remove_file(path) {136if verbose {137eprintln!(138"[EvictionManager] evict_files: error removing file: {} ({})",139path, err140);141}142} else if verbose {143eprintln!("[EvictionManager] evict_files: removed file at {}", path);144}145}146}147148impl EvictionManager {149/// # Safety150/// The following directories exist:151/// * `self.data_dir`152/// * `self.metadata_dir`153pub(super) fn run_in_background(mut self) {154let verbose = false;155156if verbose {157eprintln!(158"[EvictionManager] creating cache eviction background task, self.min_ttl = {}",159self.min_ttl.load(std::sync::atomic::Ordering::Relaxed)160);161}162163pl_async::get_runtime().spawn(async move {164// Give some time at startup for other code to run.165tokio::time::sleep(Duration::from_secs(3)).await;166let mut last_eviction_time;167168loop {169let this: &'static mut Self = unsafe { std::mem::transmute(&mut self) };170171let result = tokio::task::spawn_blocking(|| this.update_file_list())172.await173.unwrap();174175last_eviction_time = Instant::now();176177match result {178Ok(_) if self.files_to_remove.as_ref().unwrap().is_empty() => {},179Ok(_) => loop {180if let Some(guard) = GLOBAL_FILE_CACHE_LOCK.try_lock_eviction() {181if verbose {182eprintln!(183"[EvictionManager] got exclusive cache lock, evicting {} files",184self.files_to_remove.as_ref().unwrap().len()185);186}187188tokio::task::block_in_place(|| self.evict_files(&guard));189break;190}191tokio::time::sleep(Duration::from_secs(7)).await;192},193Err(err) => {194if verbose {195eprintln!("[EvictionManager] error updating file list: {err}");196}197},198}199200loop {201let min_ttl = self.min_ttl.load(std::sync::atomic::Ordering::Relaxed);202let sleep_interval = std::cmp::max(min_ttl / 4, {203#[cfg(debug_assertions)]204{2053206}207#[cfg(not(debug_assertions))]208{20960210}211});212213let since_last_eviction =214Instant::now().duration_since(last_eviction_time).as_secs();215let sleep_interval = sleep_interval.saturating_sub(since_last_eviction);216let sleep_interval = Duration::from_secs(sleep_interval);217218tokio::select! {219_ = self.notify_ttl_updated.notified() => {220continue;221}222_ = tokio::time::sleep(sleep_interval) => {223break;224}225}226}227}228});229}230231fn update_file_list(&mut self) -> PolarsResult<()> {232let data_files_iter = match std::fs::read_dir(self.data_dir.as_std_path()) {233Ok(v) => v,234Err(e) => {235let msg = format!("failed to read data directory: {e}");236237return Err(PolarsError::IO {238error: e.into(),239msg: Some(msg.into()),240});241},242};243244let metadata_files_iter = match std::fs::read_dir(self.metadata_dir.as_std_path()) {245Ok(v) => v,246Err(e) => {247let msg = format!("failed to read metadata directory: {e}");248249return Err(PolarsError::IO {250error: e.into(),251msg: Some(msg.into()),252});253},254};255256let mut files_to_remove = Vec::with_capacity(257data_files_iter258.size_hint()259.1260.unwrap_or(data_files_iter.size_hint().0)261+ metadata_files_iter262.size_hint()263.1264.unwrap_or(metadata_files_iter.size_hint().0),265);266267let now = SystemTime::now();268269for file in data_files_iter {270let file = file?;271let path = PlRefPath::try_from_pathbuf(file.path())?;272273let hash = path274.file_name()275.unwrap()276.to_str()277.unwrap()278.get(..32)279.unwrap();280let metadata_path = self.metadata_dir.join(hash);281282let mut eviction_candidate = EvictionCandidate {283path,284metadata_path,285metadata_last_modified: UNIX_EPOCH,286ttl: 0,287};288eviction_candidate.update_ttl();289290if eviction_candidate.should_remove(&now) {291files_to_remove.push(eviction_candidate);292}293}294295for file in metadata_files_iter {296let file = file?;297let path = PlRefPath::try_from_pathbuf(file.path())?;298let metadata_path = path.clone();299300let mut eviction_candidate = EvictionCandidate {301path,302metadata_path,303metadata_last_modified: UNIX_EPOCH,304ttl: 0,305};306307eviction_candidate.update_ttl();308309if eviction_candidate.should_remove(&now) {310files_to_remove.push(eviction_candidate);311}312}313314self.files_to_remove = Some(files_to_remove);315316Ok(())317}318319/// # Panics320/// Panics if `self.files_to_remove` is `None`.321fn evict_files(&mut self, _guard: &GlobalFileCacheGuardExclusive) {322let verbose = false;323let mut files_to_remove = self.files_to_remove.take().unwrap();324let now = &SystemTime::now();325326for eviction_candidate in files_to_remove.iter_mut() {327eviction_candidate.try_evict(now, verbose, _guard);328}329}330}331332333