Path: blob/main/crates/polars-io/src/file_cache/cache.rs
6939 views
use std::path::Path;1use std::sync::atomic::AtomicU64;2use std::sync::{Arc, LazyLock, RwLock};34use polars_core::config;5use polars_error::PolarsResult;6use polars_utils::aliases::PlHashMap;7use polars_utils::plpath::PlPathRef;89use super::entry::{DATA_PREFIX, FileCacheEntry, METADATA_PREFIX};10use super::eviction::EvictionManager;11use super::file_fetcher::FileFetcher;12use super::utils::FILE_CACHE_PREFIX;13use crate::path_utils::ensure_directory_init;1415pub static FILE_CACHE: LazyLock<FileCache> = LazyLock::new(|| {16let prefix = FILE_CACHE_PREFIX.as_ref();17let prefix = Arc::<Path>::from(prefix);1819if config::verbose() {20eprintln!("file cache prefix: {}", prefix.to_str().unwrap());21}2223let min_ttl = Arc::new(AtomicU64::from(get_env_file_cache_ttl()));24let notify_ttl_updated = Arc::new(tokio::sync::Notify::new());2526let metadata_dir = prefix27.as_ref()28.join(std::str::from_utf8(&[METADATA_PREFIX]).unwrap())29.into_boxed_path();30if let Err(err) = ensure_directory_init(&metadata_dir) {31panic!(32"failed to create file cache metadata directory: path = {}, err = {}",33metadata_dir.to_str().unwrap(),34err35)36}3738let data_dir = prefix39.as_ref()40.join(std::str::from_utf8(&[DATA_PREFIX]).unwrap())41.into_boxed_path();4243if let Err(err) = ensure_directory_init(&data_dir) {44panic!(45"failed to create file cache data directory: path = {}, err = {}",46data_dir.to_str().unwrap(),47err48)49}5051EvictionManager {52data_dir,53metadata_dir,54files_to_remove: None,55min_ttl: min_ttl.clone(),56notify_ttl_updated: notify_ttl_updated.clone(),57}58.run_in_background();5960// Safety: We have created the data and metadata directories.61unsafe { FileCache::new_unchecked(prefix, min_ttl, notify_ttl_updated) }62});6364pub struct FileCache {65prefix: Arc<Path>,66entries: Arc<RwLock<PlHashMap<Arc<str>, Arc<FileCacheEntry>>>>,67min_ttl: Arc<AtomicU64>,68notify_ttl_updated: Arc<tokio::sync::Notify>,69}7071impl FileCache {72/// # Safety73/// The following directories exist:74/// * `{prefix}/{METADATA_PREFIX}/`75/// * `{prefix}/{DATA_PREFIX}/`76unsafe fn new_unchecked(77prefix: Arc<Path>,78min_ttl: Arc<AtomicU64>,79notify_ttl_updated: Arc<tokio::sync::Notify>,80) -> Self {81Self {82prefix,83entries: Default::default(),84min_ttl,85notify_ttl_updated,86}87}8889/// If `uri` is a local path, it must be an absolute path. This is not exposed90/// for now - initialize entries using `init_entries_from_uri_list` instead.91pub(super) fn init_entry(92&self,93uri: Arc<str>,94get_file_fetcher: &dyn Fn() -> PolarsResult<Arc<dyn FileFetcher>>,95ttl: u64,96) -> PolarsResult<Arc<FileCacheEntry>> {97let verbose = config::verbose();9899#[cfg(debug_assertions)]100{101// Local paths must be absolute or else the cache would be wrong.102if !PlPathRef::new(uri.as_ref()).is_cloud_url() {103let path = Path::new(uri.as_ref());104assert_eq!(path, std::fs::canonicalize(path).unwrap().as_path());105}106}107108if self109.min_ttl110.fetch_min(ttl, std::sync::atomic::Ordering::Relaxed)111< ttl112{113self.notify_ttl_updated.notify_one();114}115116{117let entries = self.entries.read().unwrap();118119if let Some(entry) = entries.get(uri.as_ref()) {120if verbose {121eprintln!(122"[file_cache] init_entry: return existing entry for uri = {}",123uri.clone()124);125}126entry.update_ttl(ttl);127return Ok(entry.clone());128}129}130131let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();132133{134let mut entries = self.entries.write().unwrap();135136// May have been raced137if let Some(entry) = entries.get(uri.as_ref()) {138if verbose {139eprintln!(140"[file_cache] init_entry: return existing entry for uri = {} (lost init race)",141uri.clone()142);143}144entry.update_ttl(ttl);145return Ok(entry.clone());146}147148if verbose {149eprintln!(150"[file_cache] init_entry: creating new entry for uri = {uri}, hash = {uri_hash}"151);152}153154let entry = Arc::new(FileCacheEntry::new(155uri.clone(),156uri_hash,157self.prefix.clone(),158get_file_fetcher()?,159ttl,160));161entries.insert(uri, entry.clone());162Ok(entry)163}164}165166/// This function can accept relative local paths.167pub fn get_entry(&self, addr: PlPathRef<'_>) -> Option<Arc<FileCacheEntry>> {168match addr {169PlPathRef::Local(p) => {170let p = std::fs::canonicalize(p).unwrap();171self.entries172.read()173.unwrap()174.get(p.to_str().unwrap())175.map(Arc::clone)176},177PlPathRef::Cloud(p) => self.entries.read().unwrap().get(p.uri()).map(Arc::clone),178}179}180}181182pub fn get_env_file_cache_ttl() -> u64 {183std::env::var("POLARS_FILE_CACHE_TTL")184.map(|x| x.parse::<u64>().expect("integer"))185.unwrap_or(60 * 60)186}187188189