// Path: blob/main/crates/polars-io/src/file_cache/entry.rs
// 8327 views
use std::io::{Seek, SeekFrom};1use std::path::{Path, PathBuf};2use std::sync::atomic::AtomicU64;3use std::sync::{Arc, LazyLock, Mutex};45use fs4::fs_std::FileExt;6use polars_core::config;7use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};8use polars_utils::pl_path::PlRefPath;910use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};11use super::file_fetcher::{FileFetcher, RemoteMetadata};12use super::file_lock::{FileLock, FileLockAnyGuard};13use super::metadata::{EntryMetadata, FileVersion};14use super::utils::update_last_accessed;1516pub(super) const DATA_PREFIX: u8 = b'd';17pub(super) const METADATA_PREFIX: u8 = b'm';1819struct CachedData {20last_modified: u64,21metadata: Arc<EntryMetadata>,22data_file_path: PathBuf,23}2425struct Inner {26uri: PlRefPath,27uri_hash: String,28path_prefix: PlRefPath,29metadata: FileLock<PathBuf>,30cached_data: Option<CachedData>,31ttl: Arc<AtomicU64>,32file_fetcher: Arc<dyn FileFetcher>,33}3435struct EntryData {36uri: PlRefPath,37inner: Mutex<Inner>,38ttl: Arc<AtomicU64>,39}4041pub struct FileCacheEntry(EntryData);4243impl EntryMetadata {44fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {45self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size46}47}4849impl Inner {50fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {51let verbose = config::verbose();5253{54let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();55// We want to use an exclusive lock here to avoid an API call in the case where only the56// local TTL was updated.57let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();58update_last_accessed(metadata_file);5960if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {61let data_file_path = self.get_cached_data_file_path();6263if metadata.compare_local_state(data_file_path).is_ok() {64if verbose {65eprintln!(66"[file_cache::entry] try_open_assume_latest: opening already fetched file for uri 
= {}",67self.uri.clone()68);69}70return Ok(finish_open(data_file_path, metadata_file));71}72}73}7475if verbose {76eprintln!(77"[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",78self.uri.clone()79);80}8182self.try_open_check_latest()83}8485fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {86let verbose = config::verbose();87let remote_metadata = &self.file_fetcher.fetch_metadata()?;88let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_shared();8990{91let metadata_file = &mut self.metadata.acquire_shared().unwrap();92update_last_accessed(metadata_file);9394if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {95if metadata.matches_remote_metadata(remote_metadata) {96let data_file_path = self.get_cached_data_file_path();9798if metadata.compare_local_state(data_file_path).is_ok() {99if verbose {100eprintln!(101"[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}",102self.uri.clone()103);104}105return Ok(finish_open(data_file_path, metadata_file));106}107}108}109}110111let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();112let metadata = self113.try_get_metadata(metadata_file, &cache_guard)114// Safety: `metadata_file` is an exclusive guard.115.unwrap_or_else(|_| {116Arc::new(EntryMetadata::new(117self.uri.clone(),118self.ttl.load(std::sync::atomic::Ordering::Relaxed),119))120});121122if metadata.matches_remote_metadata(remote_metadata) {123let data_file_path = self.get_cached_data_file_path();124125if metadata.compare_local_state(data_file_path).is_ok() {126if verbose {127eprintln!(128"[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",129self.uri.clone()130);131}132return Ok(finish_open(data_file_path, metadata_file));133}134}135136if verbose {137eprintln!(138"[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = 
{}",139self.uri.clone(),140remote_metadata.version,141remote_metadata.size142);143}144145let data_file_path = &get_data_file_path(146self.path_prefix.as_bytes(),147self.uri_hash.as_bytes(),148&remote_metadata.version,149);150// Remove the file if it exists, since it doesn't match the metadata.151// This could be left from an aborted process.152let _ = std::fs::remove_file(data_file_path);153if !self.file_fetcher.fetches_as_symlink() {154let file = std::fs::OpenOptions::new()155.write(true)156.create(true)157.truncate(true)158.open(data_file_path)159.map_err(PolarsError::from)?;160161// * Some(true) => always raise162// * Some(false) => never raise163// * None => do not raise if fallocate() is not permitted, otherwise raise.164static RAISE_ALLOC_ERROR: LazyLock<Option<bool>> = LazyLock::new(|| {165let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {166Ok("1") => Some(false),167Ok("0") => Some(true),168Err(_) => None,169Ok(v) => {170panic!("invalid value {v} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR")171},172};173if config::verbose() {174eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {v:?}");175}176v177});178179// Initialize it to get the verbose print180let raise_alloc_err = *RAISE_ALLOC_ERROR;181182file.lock_exclusive().unwrap();183if let Err(e) = file.allocate(remote_metadata.size) {184let msg = format!(185"failed to reserve {} bytes on disk to download uri = {}: {:?}",186remote_metadata.size, &self.uri, e187);188189if raise_alloc_err == Some(true)190|| (raise_alloc_err.is_none() && file.allocate(1).is_ok())191{192polars_bail!(ComputeError: msg)193} else if config::verbose() {194eprintln!("[file_cache]: warning: {msg}")195}196}197}198self.file_fetcher.fetch(data_file_path)?;199200// Don't do this on windows as it will break setting last accessed times.201#[cfg(target_family = "unix")]202if !self.file_fetcher.fetches_as_symlink() {203let mut perms = 
std::fs::metadata(data_file_path.clone())204.unwrap()205.permissions();206perms.set_readonly(true);207std::fs::set_permissions(data_file_path, perms).unwrap();208}209210let data_file_metadata = std::fs::metadata(data_file_path).unwrap();211let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);212let local_size = data_file_metadata.len();213214if local_size != remote_metadata.size {215polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);216}217218let mut metadata = metadata;219let metadata = Arc::make_mut(&mut metadata);220metadata.local_last_modified = local_last_modified;221metadata.local_size = local_size;222metadata.remote_version = remote_metadata.version.clone();223224if let Err(e) = metadata.compare_local_state(data_file_path) {225panic!("metadata mismatch after file fetch: {e}");226}227228let data_file = finish_open(data_file_path, metadata_file);229230metadata_file.set_len(0).unwrap();231metadata_file.seek(SeekFrom::Start(0)).unwrap();232metadata233.try_write(&mut **metadata_file)234.map_err(to_compute_err)?;235236Ok(data_file)237}238239/// Try to read the metadata from disk. 
If `F` is an exclusive guard, this240/// will update the TTL stored in the metadata file if it does not match.241fn try_get_metadata<F: FileLockAnyGuard>(242&mut self,243metadata_file: &mut F,244_cache_guard: &cache_lock::GlobalFileCacheGuardAny,245) -> PolarsResult<Arc<EntryMetadata>> {246let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());247let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);248249for _ in 0..2 {250if let Some(ref cached) = self.cached_data {251if cached.last_modified == last_modified {252if cached.metadata.ttl != ttl {253polars_bail!(ComputeError: "TTL mismatch");254}255256if cached.metadata.uri != self.uri {257unimplemented!(258"hash collision: uri1 = {}, uri2 = {}, hash = {}",259cached.metadata.uri,260self.uri,261self.uri_hash,262);263}264265return Ok(cached.metadata.clone());266}267}268269// Ensure cache is unset if read fails270self.cached_data = None;271272let mut metadata =273EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;274275// Note this means if multiple processes on the same system set a276// different TTL for the same path, the metadata file will constantly277// get overwritten.278if metadata.ttl != ttl {279if F::IS_EXCLUSIVE {280metadata.ttl = ttl;281metadata_file.set_len(0).unwrap();282metadata_file.seek(SeekFrom::Start(0)).unwrap();283metadata284.try_write(&mut **metadata_file)285.map_err(to_compute_err)?;286} else {287polars_bail!(ComputeError: "TTL mismatch");288}289}290291let metadata = Arc::new(metadata);292let data_file_path = get_data_file_path(293self.path_prefix.as_bytes(),294self.uri_hash.as_bytes(),295&metadata.remote_version,296);297self.cached_data = Some(CachedData {298last_modified,299metadata,300data_file_path,301});302}303304unreachable!();305}306307/// # Panics308/// Panics if `self.cached_data` is `None`.309fn get_cached_data_file_path(&self) -> &Path {310&self.cached_data.as_ref().unwrap().data_file_path311}312}313314impl 
FileCacheEntry {315pub(crate) fn new(316uri: PlRefPath,317uri_hash: String,318path_prefix: PlRefPath,319file_fetcher: Arc<dyn FileFetcher>,320file_cache_ttl: u64,321) -> Self {322let metadata = FileLock::from(get_metadata_file_path(323path_prefix.as_bytes(),324uri_hash.as_bytes(),325));326327debug_assert!(328PlRefPath::ptr_eq(&uri, file_fetcher.get_uri()),329"impl error: entry uri != file_fetcher uri"330);331332let ttl = Arc::new(AtomicU64::from(file_cache_ttl));333334Self(EntryData {335uri: uri.clone(),336inner: Mutex::new(Inner {337uri,338uri_hash,339path_prefix,340metadata,341cached_data: None,342ttl: ttl.clone(),343file_fetcher,344}),345ttl,346})347}348349pub fn uri(&self) -> &PlRefPath {350&self.0.uri351}352353/// Directly returns the cached file if it finds one without checking if354/// there is a newer version on the remote. This does not make any API calls355/// if it finds a cached file, otherwise it simply downloads the file.356pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {357self.0.inner.lock().unwrap().try_open_assume_latest()358}359360/// Returns the cached file after ensuring it is up to date against the remote361/// This will always perform at least 1 API call for fetching metadata.362pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {363self.0.inner.lock().unwrap().try_open_check_latest()364}365366pub fn update_ttl(&self, ttl: u64) {367self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);368}369}370371fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {372let file = {373#[cfg(not(target_family = "windows"))]374{375std::fs::OpenOptions::new()376.read(true)377.open(data_file_path)378.unwrap()379}380// windows requires write access to update the last accessed time381#[cfg(target_family = "windows")]382{383std::fs::OpenOptions::new()384.read(true)385.write(true)386.open(data_file_path)387.unwrap()388}389};390update_last_accessed(&file);391if 
FileExt::try_lock_shared(&file).is_err() {392panic!(393"finish_open: could not acquire shared lock on data file at {:?}",394data_file_path395);396}397file398}399400/// `[prefix]/d/[uri hash][last modified]`401fn get_data_file_path(402path_prefix: &[u8],403uri_hash: &[u8],404remote_version: &FileVersion,405) -> PathBuf {406let owned;407let path = [408path_prefix,409&[b'/', DATA_PREFIX, b'/'],410uri_hash,411match remote_version {412FileVersion::Timestamp(v) => {413owned = Some(format!("{v:013x}"));414owned.as_deref().unwrap()415},416FileVersion::ETag(v) => v.as_str(),417FileVersion::Uninitialized => panic!("impl error: version not initialized"),418}419.as_bytes(),420]421.concat();422PathBuf::from(String::from_utf8(path).unwrap())423}424425/// `[prefix]/m/[uri hash]`426fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {427let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();428PathBuf::from(String::from_utf8(bytes).unwrap())429}430431432