// crates/polars-io/src/file_cache/cache.rs
use std::sync::atomic::AtomicU64;1use std::sync::{Arc, LazyLock, RwLock};23use polars_core::config;4use polars_error::PolarsResult;5use polars_utils::aliases::PlHashMap;6use polars_utils::pl_path::PlRefPath;78use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};9use super::eviction::EvictionManager;10use super::file_fetcher::FileFetcher;11use super::utils::FILE_CACHE_PREFIX;12use crate::path_utils::ensure_directory_init;1314pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {15let prefix = FILE_CACHE_PREFIX.clone();1617if config::verbose() {18eprintln!("file cache prefix: {}", prefix);19}2021let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));22let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());2324let metadata_dir = prefix.join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap());25if let Err(err) = ensure_directory_init(metadata_dir.as_std_path()) {26panic!(27"failed to create file cache metadata directory: path = {}, err = {}",28metadata_dir, err29)30}3132let data_dir = prefix.join(std::str::from_utf8(&[DATA_PREFIX]).unwrap());3334if let Err(err) = ensure_directory_init(data_dir.as_std_path()) {35panic!(36"failed to create file cache data directory: path = {}, err = {}",37data_dir, err38)39}4041EvictionManager {42data_dir,43metadata_dir,44files_to_remove: None,45min_ttl: min_ttl.clone(),46notify_ttl_updated: notify_ttl_updated.clone(),47}48.run_in_background();4950// Safety: We have created the data and metadata directories.51unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }52});5354pub struct FileCache {55prefix: PlRefPath,56entries: Arc<RwLock<PlHashMap<PlRefPath, Arc<FileCacheEntry>>>>,57min_ttl: Arc<AtomicU64>,58notify_ttl_updated: Arc<tokio::sync::Notify>,59}6061impl FileCache {62/// # Safety63/// The following directories exist:64/// * `{prefix}/{METADATA_PREFIX}/`65/// * `{prefix}/{DATA_PREFIX}/`66unsafe fn new_unchecked(67prefix: PlRefPath,68min_ttl: Arc<AtomicU64>,69notify_ttl_updated: 
Arc<tokio::sync::Notify>,70) -> Self {71Self {72prefix,73entries: Default::default(),74min_ttl,75notify_ttl_updated,76}77}7879/// If `uri` is a local path, it must be an absolute path. This is not exposed80/// for now - initialize entries using `init_entries_from_uri_list` instead.81pub(super) fn init_entry(82&self,83uri: PlRefPath,84get_file_fetcher: &dyn Fn() -> PolarsResult<Arc<dyn FileFetcher>>,85ttl: u64,86) -> PolarsResult<Arc<FileCacheEntry>> {87let verbose = config::verbose();8889// Local paths must be absolute or else the cache would be wrong.90if !uri.has_scheme() {91debug_assert_eq!(92std::fs::canonicalize(uri.as_str())93.ok()94.and_then(|x| PlRefPath::try_from_pathbuf(x).ok())95.as_ref(),96Some(&uri)97)98}99100if self101.min_ttl102.fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)103< ttl104{105self.notify_ttl_updated.notify_one();106}107108{109let entries = self.entries.read().unwrap();110111if let Some(entry) = entries.get(&uri) {112if verbose {113eprintln!(114"[file_cache] init_entry: return existing entry for uri = {}",115uri.clone()116);117}118entry.update_ttl(ttl);119return Ok(entry.clone());120}121}122123let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();124125{126let mut entries = self.entries.write().unwrap();127128// May have been raced129if let Some(entry) = entries.get(&uri) {130if verbose {131eprintln!(132"[file_cache] init_entry: return existing entry for uri = {} (lost init race)",133uri.clone()134);135}136entry.update_ttl(ttl);137return Ok(entry.clone());138}139140if verbose {141eprintln!(142"[file_cache] init_entry: creating new entry for uri = {uri}, hash = {uri_hash}"143);144}145146let entry = Arc::new(FileCacheEntry::new(147uri.clone(),148uri_hash,149self.prefix.clone(),150get_file_fetcher()?,151ttl,152));153entries.insert(uri.clone(), entry.clone());154Ok(entry)155}156}157158/// This function can accept relative local paths.159pub fn get_entry(&self, path: PlRefPath) -> Option<Arc<FileCacheEntry>> {160if 
path.has_scheme() {161self.entries.read().unwrap().get(&path).cloned()162} else {163let p =164PlRefPath::try_from_pathbuf(std::fs::canonicalize(path.as_str()).unwrap()).unwrap();165self.entries.read().unwrap().get(&p).cloned()166}167}168}169170pub fn get_env_file_cache_ttl() -> u64 {171std::env::var("POLARS_FILE_CACHE_TTL")172.map(|x| x.parse::<u64>().expect("integer"))173.unwrap_or(60 * 60)174}175176177