Path: blob/main/crates/polars-io/src/file_cache/file_fetcher.rs
6939 views
use std::sync::Arc;12use polars_error::{PolarsError, PolarsResult};34use super::metadata::FileVersion;5use super::utils::last_modified_u64;6use crate::cloud::PolarsObjectStore;7use crate::pl_async;89pub trait FileFetcher: Send + Sync {10fn get_uri(&self) -> &Arc<str>;11fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata>;12/// Fetches the object to a `local_path`.13fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()>;14fn fetches_as_symlink(&self) -> bool;15}1617pub struct RemoteMetadata {18pub size: u64,19pub(super) version: FileVersion,20}2122/// A struct that fetches data from local disk and stores it into the `cache`.23/// Mostly used for debugging, it only ever gets called if `POLARS_FORCE_ASYNC` is set.24pub(super) struct LocalFileFetcher {25uri: Arc<str>,26path: Box<std::path::Path>,27}2829impl LocalFileFetcher {30pub(super) fn from_uri(uri: Arc<str>) -> Self {31let path = std::path::PathBuf::from(uri.as_ref()).into_boxed_path();32debug_assert_eq!(33path,34std::fs::canonicalize(&path).unwrap().into_boxed_path()35);3637Self { uri, path }38}39}4041impl FileFetcher for LocalFileFetcher {42fn get_uri(&self) -> &Arc<str> {43&self.uri44}4546fn fetches_as_symlink(&self) -> bool {47#[cfg(target_family = "unix")]48{49true50}51#[cfg(not(target_family = "unix"))]52{53false54}55}5657fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {58let metadata = std::fs::metadata(&self.path).map_err(PolarsError::from)?;5960Ok(RemoteMetadata {61size: metadata.len(),62version: FileVersion::Timestamp(last_modified_u64(&metadata)),63})64}6566fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {67#[cfg(target_family = "unix")]68{69std::os::unix::fs::symlink(&self.path, local_path).map_err(PolarsError::from)70}71#[cfg(not(target_family = "unix"))]72{73std::fs::copy(&self.path, local_path).map_err(PolarsError::from)?;74Ok(())75}76}77}7879pub(super) struct CloudFileFetcher {80pub(super) uri: Arc<str>,81pub(super) cloud_path: object_store::path::Path,82pub(super) object_store: PolarsObjectStore,83}8485impl FileFetcher for CloudFileFetcher {86fn get_uri(&self) -> &Arc<str> {87&self.uri88}8990fn fetches_as_symlink(&self) -> bool {91false92}9394fn fetch_metadata(&self) -> PolarsResult<RemoteMetadata> {95let metadata =96pl_async::get_runtime().block_in_place_on(self.object_store.head(&self.cloud_path))?;9798Ok(RemoteMetadata {99size: metadata.size as u64,100version: metadata101.e_tag102.map(|x| FileVersion::ETag(blake3::hash(x.as_bytes()).to_hex()[..32].to_string()))103.unwrap_or_else(|| {104FileVersion::Timestamp(metadata.last_modified.timestamp_millis() as u64)105}),106})107}108109fn fetch(&self, local_path: &std::path::Path) -> PolarsResult<()> {110pl_async::get_runtime().block_in_place_on(async {111let file = &mut tokio::fs::OpenOptions::new()112.write(true)113.truncate(true)114.open(local_path)115.await116.map_err(PolarsError::from)?;117118self.object_store.download(&self.cloud_path, file).await119})120}121}122123124