Path: blob/main/crates/polars-io/src/file_cache/utils.rs
6939 views
use std::path::Path;1use std::sync::{Arc, LazyLock};2use std::time::UNIX_EPOCH;34use polars_error::{PolarsError, PolarsResult};5use polars_utils::plpath::{CloudScheme, PlPathRef};67use super::cache::{FILE_CACHE, get_env_file_cache_ttl};8use super::entry::FileCacheEntry;9use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};10use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};11use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init};12use crate::pl_async;1314pub static FILE_CACHE_PREFIX: LazyLock<Box<Path>> = LazyLock::new(|| {15let path = POLARS_TEMP_DIR_BASE_PATH16.join("file-cache/")17.into_boxed_path();1819if let Err(err) = ensure_directory_init(path.as_ref()) {20panic!(21"failed to create file cache directory: path = {}, err = {}",22path.to_str().unwrap(),23err24);25}2627path28});2930pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {31u64::try_from(32metadata33.modified()34.unwrap()35.duration_since(UNIX_EPOCH)36.unwrap()37.as_millis(),38)39.unwrap()40}4142pub(super) fn update_last_accessed(file: &std::fs::File) {43let file_metadata = file.metadata().unwrap();4445if let Err(e) = file.set_times(46std::fs::FileTimes::new()47.set_modified(file_metadata.modified().unwrap())48.set_accessed(std::time::SystemTime::now()),49) {50panic!("failed to update file last accessed time: {e}");51}52}5354pub fn init_entries_from_uri_list(55mut uri_list: impl ExactSizeIterator<Item = Arc<str>>,56cloud_options: Option<&CloudOptions>,57) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {58init_entries_from_uri_list_impl(&mut uri_list, cloud_options)59}6061fn init_entries_from_uri_list_impl(62uri_list: &mut dyn ExactSizeIterator<Item = Arc<str>>,63cloud_options: Option<&CloudOptions>,64) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {65#[expect(clippy::len_zero)]66if uri_list.len() == 0 {67return Ok(Default::default());68}6970let mut uri_list = uri_list.peekable();7172let first_uri = PlPathRef::new(uri_list.peek().unwrap().as_ref());7374let file_cache_ttl = cloud_options75.map(|x| x.file_cache_ttl)76.unwrap_or_else(get_env_file_cache_ttl);7778if first_uri.is_cloud_url() {79let shared_object_store = (!matches!(80first_uri.scheme(),81Some(CloudScheme::Http | CloudScheme::Https) // Object stores for http are tied to the path.82))83.then(|| {84pl_async::get_runtime().block_in_place_on(async {85let (_, object_store) =86build_object_store(first_uri.to_str(), cloud_options, false).await?;8788PolarsResult::Ok(object_store)89})90})91.transpose()?;9293pl_async::get_runtime().block_in_place_on(async {94futures::future::try_join_all(uri_list.map(|uri| {95let shared_object_store = shared_object_store.clone();9697async move {98let object_store =99if let Some(shared_object_store) = shared_object_store.clone() {100shared_object_store101} else {102let (_, object_store) =103build_object_store(&uri, cloud_options, false).await?;104object_store105};106107FILE_CACHE.init_entry(108uri.clone(),109&|| {110let CloudLocation { prefix, .. } =111CloudLocation::new(uri.as_ref(), false).unwrap();112let cloud_path = object_path_from_str(&prefix)?;113114let uri = uri.clone();115let object_store = object_store.clone();116117Ok(Arc::new(CloudFileFetcher {118uri,119object_store,120cloud_path,121}))122},123file_cache_ttl,124)125}126}))127.await128})129} else {130uri_list131.map(|uri| {132let uri = std::fs::canonicalize(uri.as_ref()).map_err(|err| {133let msg = Some(format!("{}: {}", err, uri.as_ref()).into());134PolarsError::IO {135error: err.into(),136msg,137}138})?;139let uri = Arc::<str>::from(uri.to_str().unwrap());140141FILE_CACHE.init_entry(142uri.clone(),143&|| Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),144file_cache_ttl,145)146})147.collect::<PolarsResult<Vec<_>>>()148}149}150151152