// Path: crates/polars-io/src/file_cache/entry.rs
use std::io::{Seek, SeekFrom};1use std::path::{Path, PathBuf};2use std::sync::atomic::AtomicU64;3use std::sync::{Arc, LazyLock, Mutex};45use fs4::fs_std::FileExt;6use polars_core::config;7use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};89use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};10use super::file_fetcher::{FileFetcher, RemoteMetadata};11use super::file_lock::{FileLock, FileLockAnyGuard};12use super::metadata::{EntryMetadata, FileVersion};13use super::utils::update_last_accessed;1415pub(super) const DATA_PREFIX: u8 = b'd';16pub(super) const METADATA_PREFIX: u8 = b'm';1718struct CachedData {19last_modified: u64,20metadata: Arc<EntryMetadata>,21data_file_path: PathBuf,22}2324struct Inner {25uri: Arc<str>,26uri_hash: String,27path_prefix: Arc<Path>,28metadata: FileLock<PathBuf>,29cached_data: Option<CachedData>,30ttl: Arc<AtomicU64>,31file_fetcher: Arc<dyn FileFetcher>,32}3334struct EntryData {35uri: Arc<str>,36inner: Mutex<Inner>,37ttl: Arc<AtomicU64>,38}3940pub struct FileCacheEntry(EntryData);4142impl EntryMetadata {43fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {44self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size45}46}4748impl Inner {49fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {50let verbose = config::verbose();5152{53let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();54// We want to use an exclusive lock here to avoid an API call in the case where only the55// local TTL was updated.56let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();57update_last_accessed(metadata_file);5859if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {60let data_file_path = self.get_cached_data_file_path();6162if metadata.compare_local_state(data_file_path).is_ok() {63if verbose {64eprintln!(65"[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}",66self.uri.clone()67);68}69return 
Ok(finish_open(data_file_path, metadata_file));70}71}72}7374if verbose {75eprintln!(76"[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",77self.uri.clone()78);79}8081self.try_open_check_latest()82}8384fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {85let verbose = config::verbose();86let remote_metadata = &self.file_fetcher.fetch_metadata()?;87let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();8889{90let metadata_file = &mut self.metadata.acquire_shared().unwrap();91update_last_accessed(metadata_file);9293if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {94if metadata.matches_remote_metadata(remote_metadata) {95let data_file_path = self.get_cached_data_file_path();9697if metadata.compare_local_state(data_file_path).is_ok() {98if verbose {99eprintln!(100"[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",101self.uri.clone()102);103}104return Ok(finish_open(data_file_path, metadata_file));105}106}107}108}109110let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();111let metadata = self112.try_get_metadata(metadata_file, &cache_guard)113// Safety: `metadata_file` is an exclusive guard.114.unwrap_or_else(|_| {115Arc::new(EntryMetadata::new(116self.uri.clone(),117self.ttl.load(std::sync::atomic::Ordering::Relaxed),118))119});120121if metadata.matches_remote_metadata(remote_metadata) {122let data_file_path = self.get_cached_data_file_path();123124if metadata.compare_local_state(data_file_path).is_ok() {125if verbose {126eprintln!(127"[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",128self.uri.clone()129);130}131return Ok(finish_open(data_file_path, metadata_file));132}133}134135if verbose {136eprintln!(137"[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = 
{}",138self.uri.clone(),139remote_metadata.version,140remote_metadata.size141);142}143144let data_file_path = &get_data_file_path(145self.path_prefix.to_str().unwrap().as_bytes(),146self.uri_hash.as_bytes(),147&remote_metadata.version,148);149// Remove the file if it exists, since it doesn't match the metadata.150// This could be left from an aborted process.151let _ = std::fs::remove_file(data_file_path);152if !self.file_fetcher.fetches_as_symlink() {153let file = std::fs::OpenOptions::new()154.write(true)155.create(true)156.truncate(true)157.open(data_file_path)158.map_err(PolarsError::from)?;159160// * Some(true) => always raise161// * Some(false) => never raise162// * None => do not raise if fallocate() is not permitted, otherwise raise.163static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {164let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {165Ok("1") => Some(false),166Ok("0") => Some(true),167Err(_) => None,168Ok(v) => {169panic!("invalid value {v} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR")170},171};172if config::verbose() {173eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {v:?}");174}175v176});177178// Initialize it to get the verbose print179let raise_alloc_err = *RAISE_ALLOC_ERROR;180181file.lock_exclusive().unwrap();182if let Err(e) = file.allocate(remote_metadata.size) {183let msg = format!(184"failed to reserve {} bytes on disk to download uri = {}: {:?}",185remote_metadata.size,186self.uri.as_ref(),187e188);189190if raise_alloc_err == Some(true)191|| (raise_alloc_err.is_none() && file.allocate(1).is_ok())192{193polars_bail!(ComputeError: msg)194} else if config::verbose() {195eprintln!("[file_cache]: warning: {msg}")196}197}198}199self.file_fetcher.fetch(data_file_path)?;200201// Don't do this on windows as it will break setting last accessed times.202#[cfg(target_family = "unix")]203if !self.file_fetcher.fetches_as_symlink() {204let mut perms = 
std::fs::metadata(data_file_path.clone())205.unwrap()206.permissions();207perms.set_readonly(true);208std::fs::set_permissions(data_file_path, perms).unwrap();209}210211let data_file_metadata = std::fs::metadata(data_file_path).unwrap();212let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);213let local_size = data_file_metadata.len();214215if local_size != remote_metadata.size {216polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);217}218219let mut metadata = metadata;220let metadata = Arc::make_mut(&mut metadata);221metadata.local_last_modified = local_last_modified;222metadata.local_size = local_size;223metadata.remote_version = remote_metadata.version.clone();224225if let Err(e) = metadata.compare_local_state(data_file_path) {226panic!("metadata mismatch after file fetch: {e}");227}228229let data_file = finish_open(data_file_path, metadata_file);230231metadata_file.set_len(0).unwrap();232metadata_file.seek(SeekFrom::Start(0)).unwrap();233metadata234.try_write(&mut **metadata_file)235.map_err(to_compute_err)?;236237Ok(data_file)238}239240/// Try to read the metadata from disk. 
If `F` is an exclusive guard, this241/// will update the TTL stored in the metadata file if it does not match.242fn try_get_metadata<F: FileLockAnyGuard>(243&mut self,244metadata_file: &mut F,245_cache_guard: &cache_lock::GlobalFileCacheGuardAny,246) -> PolarsResult<Arc<EntryMetadata>> {247let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());248let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);249250for _ in 0..2 {251if let Some(ref cached) = self.cached_data {252if cached.last_modified == last_modified {253if cached.metadata.ttl != ttl {254polars_bail!(ComputeError: "TTL mismatch");255}256257if cached.metadata.uri != self.uri {258unimplemented!(259"hash collision: uri1 = {}, uri2 = {}, hash = {}",260cached.metadata.uri,261self.uri,262self.uri_hash,263);264}265266return Ok(cached.metadata.clone());267}268}269270// Ensure cache is unset if read fails271self.cached_data = None;272273let mut metadata =274EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;275276// Note this means if multiple processes on the same system set a277// different TTL for the same path, the metadata file will constantly278// get overwritten.279if metadata.ttl != ttl {280if F::IS_EXCLUSIVE {281metadata.ttl = ttl;282metadata_file.set_len(0).unwrap();283metadata_file.seek(SeekFrom::Start(0)).unwrap();284metadata285.try_write(&mut **metadata_file)286.map_err(to_compute_err)?;287} else {288polars_bail!(ComputeError: "TTL mismatch");289}290}291292let metadata = Arc::new(metadata);293let data_file_path = get_data_file_path(294self.path_prefix.to_str().unwrap().as_bytes(),295self.uri_hash.as_bytes(),296&metadata.remote_version,297);298self.cached_data = Some(CachedData {299last_modified,300metadata,301data_file_path,302});303}304305unreachable!();306}307308/// # Panics309/// Panics if `self.cached_data` is `None`.310fn get_cached_data_file_path(&self) -> &Path 
{311&self.cached_data.as_ref().unwrap().data_file_path312}313}314315impl FileCacheEntry {316pub(crate) fn new(317uri: Arc<str>,318uri_hash: String,319path_prefix: Arc<Path>,320file_fetcher: Arc<dyn FileFetcher>,321file_cache_ttl: u64,322) -> Self {323let metadata = FileLock::from(get_metadata_file_path(324path_prefix.to_str().unwrap().as_bytes(),325uri_hash.as_bytes(),326));327328debug_assert!(329Arc::ptr_eq(&uri, file_fetcher.get_uri()),330"impl error: entry uri != file_fetcher uri"331);332333let ttl = Arc::new(AtomicU64::from(file_cache_ttl));334335Self(EntryData {336uri: uri.clone(),337inner: Mutex::new(Inner {338uri,339uri_hash,340path_prefix,341metadata,342cached_data: None,343ttl: ttl.clone(),344file_fetcher,345}),346ttl,347})348}349350pub fn uri(&self) -> &Arc<str> {351&self.0.uri352}353354/// Directly returns the cached file if it finds one without checking if355/// there is a newer version on the remote. This does not make any API calls356/// if it finds a cached file, otherwise it simply downloads the file.357pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {358self.0.inner.lock().unwrap().try_open_assume_latest()359}360361/// Returns the cached file after ensuring it is up to date against the remote362/// This will always perform at least 1 API call for fetching metadata.363pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {364self.0.inner.lock().unwrap().try_open_check_latest()365}366367pub fn update_ttl(&self, ttl: u64) {368self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);369}370}371372fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {373let file = {374#[cfg(not(target_family = "windows"))]375{376std::fs::OpenOptions::new()377.read(true)378.open(data_file_path)379.unwrap()380}381// windows requires write access to update the last accessed time382#[cfg(target_family = 
"windows")]383{384std::fs::OpenOptions::new()385.read(true)386.write(true)387.open(data_file_path)388.unwrap()389}390};391update_last_accessed(&file);392if FileExt::try_lock_shared(&file).is_err() {393panic!(394"finish_open: could not acquire shared lock on data file at {}",395data_file_path.to_str().unwrap()396);397}398file399}400401/// `[prefix]/d/[uri hash][last modified]`402fn get_data_file_path(403path_prefix: &[u8],404uri_hash: &[u8],405remote_version: &FileVersion,406) -> PathBuf {407let owned;408let path = [409path_prefix,410&[b'/', DATA_PREFIX, b'/'],411uri_hash,412match remote_version {413FileVersion::Timestamp(v) => {414owned = Some(format!("{v:013x}"));415owned.as_deref().unwrap()416},417FileVersion::ETag(v) => v.as_str(),418FileVersion::Uninitialized => panic!("impl error: version not initialized"),419}420.as_bytes(),421]422.concat();423PathBuf::from(String::from_utf8(path).unwrap())424}425426/// `[prefix]/m/[uri hash]`427fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {428let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();429PathBuf::from(String::from_utf8(bytes).unwrap())430}431432433