Path: blob/main/crates/polars-io/src/file_cache/utils.rs
8421 views
use std::sync::{Arc, LazyLock};1use std::time::UNIX_EPOCH;23use polars_error::{PolarsError, PolarsResult};4use polars_utils::pl_path::{CloudScheme, PlRefPath};56use super::cache::{FILE_CACHE, get_env_file_cache_ttl};7use super::entry::FileCacheEntry;8use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};9use crate::cloud::{CloudLocation, CloudOptions, build_object_store, object_path_from_str};10use crate::path_utils::{POLARS_TEMP_DIR_BASE_PATH, ensure_directory_init};1112pub static FILE_CACHE_PREFIX: LazyLock<PlRefPath> = LazyLock::new(|| {13let path = PlRefPath::try_from_path(&POLARS_TEMP_DIR_BASE_PATH.join("file-cache/")).unwrap();1415if let Err(err) = ensure_directory_init(path.as_ref()) {16panic!(17"failed to create file cache directory: path = {}, err = {}",18path, err19);20}2122path23});2425pub(super) fn last_modified_u64(metadata: &std::fs::Metadata) -> u64 {26u64::try_from(27metadata28.modified()29.unwrap()30.duration_since(UNIX_EPOCH)31.unwrap()32.as_millis(),33)34.unwrap()35}3637pub(super) fn update_last_accessed(file: &std::fs::File) {38let file_metadata = file.metadata().unwrap();3940if let Err(e) = file.set_times(41std::fs::FileTimes::new()42.set_modified(file_metadata.modified().unwrap())43.set_accessed(std::time::SystemTime::now()),44) {45panic!("failed to update file last accessed time: {e}");46}47}4849pub async fn init_entries_from_uri_list(50uri_list: impl ExactSizeIterator<Item = PlRefPath> + Send + 'static,51cloud_options: Option<&CloudOptions>,52) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {53init_entries_from_uri_list_impl(Box::new(uri_list), cloud_options).await54}5556async fn init_entries_from_uri_list_impl(57uri_list: Box<dyn ExactSizeIterator<Item = PlRefPath> + Send + 'static>,58cloud_options: Option<&CloudOptions>,59) -> PolarsResult<Vec<Arc<FileCacheEntry>>> {60#[allow(clippy::len_zero)]61if uri_list.len() == 0 {62return Ok(Default::default());63}6465let mut uri_list = uri_list.peekable();6667let first_uri = uri_list.peek().unwrap().clone();6869let file_cache_ttl = cloud_options70.map(|x| x.file_cache_ttl)71.unwrap_or_else(get_env_file_cache_ttl);7273if first_uri.has_scheme() {74let shared_object_store = if !matches!(75first_uri.scheme(),76Some(CloudScheme::Http | CloudScheme::Https) // Object stores for http are tied to the path.77) {78let (_, object_store) = build_object_store(first_uri, cloud_options, false).await?;79Some(object_store)80} else {81None82};8384futures::future::try_join_all(uri_list.map(|uri| {85let shared_object_store = shared_object_store.clone();8687async move {88let object_store = if let Some(shared_object_store) = shared_object_store.clone() {89shared_object_store90} else {91let (_, object_store) =92build_object_store(uri.clone(), cloud_options, false).await?;93object_store94};9596FILE_CACHE.init_entry(97uri.clone(),98&|| {99let CloudLocation { prefix, .. } =100CloudLocation::new(uri.clone(), false).unwrap();101let cloud_path = object_path_from_str(&prefix)?;102let object_store = object_store.clone();103104Ok(Arc::new(CloudFileFetcher {105uri: uri.clone(),106object_store,107cloud_path,108}))109},110file_cache_ttl,111)112}113}))114.await115} else {116let mut out = Vec::with_capacity(uri_list.len());117for uri in uri_list {118let uri = tokio::fs::canonicalize(uri.as_str()).await.map_err(|err| {119let msg = Some(format!("{}: {}", err, uri).into());120PolarsError::IO {121error: err.into(),122msg,123}124})?;125let uri = PlRefPath::try_from_pathbuf(uri)?;126127out.push(FILE_CACHE.init_entry(128uri.clone(),129&|| Ok(Arc::new(LocalFileFetcher::from_uri(uri.clone()))),130file_cache_ttl,131)?)132}133Ok(out)134}135}136137138