// Path: blob/main/crates/polars-io/src/cloud/object_store_setup.rs
// 8420 views
use std::sync::{Arc, LazyLock};12use object_store::ObjectStore;3use object_store::local::LocalFileSystem;4use polars_core::config::{self, verbose, verbose_print_sensitive};5use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};6use polars_utils::aliases::PlHashMap;7use polars_utils::pl_path::{PlPath, PlRefPath};8use polars_utils::pl_str::PlSmallStr;9use polars_utils::{format_pl_smallstr, pl_serialize};10use tokio::sync::RwLock;1112use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};13use crate::cloud::{CloudConfig, CloudRetryConfig};1415/// Object stores must be cached. Every object-store will do DNS lookups and16/// get rate limited when querying the DNS (can take up to 5s).17/// Other reasons are connection pools that must be shared between as much as possible.18#[allow(clippy::type_complexity)]19static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =20LazyLock::new(Default::default);2122#[allow(dead_code)]23fn err_missing_feature(24feature: &str,25cloud_type: &CloudType,26) -> PolarsResult<Arc<dyn ObjectStore>> {27polars_bail!(28ComputeError:29"feature '{}' must be enabled in order to use '{:?}' cloud urls",30feature,31cloud_type,32);33}3435/// Get the key of a url for object store registration.36fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> Vec<u8> {37// We include credentials as they can expire, so users will send new credentials for the same url.38let cloud_options = options.map(39|CloudOptions {40// Destructure to ensure this breaks if anything changes.41#[cfg(feature = "file_cache")]42file_cache_ttl,43config,44retry_config,45#[cfg(feature = "cloud")]46credential_provider,47}| {48CloudOptionsKey {49#[cfg(feature = "file_cache")]50file_cache_ttl: *file_cache_ttl,51config: config.clone(),52retry_config: *retry_config,53#[cfg(feature = "cloud")]54credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),55}56},57);5859let cache_key = CacheKey 
{60url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),61cloud_options,62};6364verbose_print_sensitive(|| {65format!(66"object store cache key for path at '{}': {:?}",67path, &cache_key68)69});7071return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();7273#[derive(Clone, Debug, PartialEq, Hash, Eq)]74#[cfg_attr(feature = "serde", derive(serde::Serialize))]75struct CacheKey {76url_base: PlSmallStr,77cloud_options: Option<CloudOptionsKey>,78}7980/// Variant of CloudOptions for serializing to a cache key. The credential81/// provider is replaced by the function address.82#[derive(Clone, Debug, PartialEq, Hash, Eq)]83#[cfg_attr(feature = "serde", derive(serde::Serialize))]84struct CloudOptionsKey {85#[cfg(feature = "file_cache")]86file_cache_ttl: u64,87config: Option<CloudConfig>,88retry_config: CloudRetryConfig,89#[cfg(feature = "cloud")]90credential_provider: usize,91}92}9394/// Construct an object_store `Path` from a string without any encoding/decoding.95pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {96object_store::path::Path::parse(path).map_err(to_compute_err)97}9899#[derive(Debug, Clone)]100pub(crate) struct PolarsObjectStoreBuilder {101path: PlRefPath,102cloud_type: CloudType,103options: Option<CloudOptions>,104}105106impl PolarsObjectStoreBuilder {107pub(super) fn path(&self) -> &PlRefPath {108&self.path109}110111pub(super) async fn build_impl(112&self,113// Whether to clear cached credentials for Python credential providers.114clear_cached_credentials: bool,115) -> PolarsResult<Arc<dyn ObjectStore>> {116let options = self117.options118.as_ref()119.unwrap_or_else(|| CloudOptions::default_static_ref());120121if let Some(options) = &self.options122&& verbose()123{124eprintln!(125"build object-store: file_cache_ttl: {}",126options.file_cache_ttl127)128}129130let store = match self.cloud_type {131CloudType::Aws => {132#[cfg(feature = "aws")]133{134let store = 
options135.build_aws(self.path.clone(), clear_cached_credentials)136.await?;137Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)138}139#[cfg(not(feature = "aws"))]140return err_missing_feature("aws", &self.cloud_type);141},142CloudType::Gcp => {143#[cfg(feature = "gcp")]144{145let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;146147Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)148}149#[cfg(not(feature = "gcp"))]150return err_missing_feature("gcp", &self.cloud_type);151},152CloudType::Azure => {153{154#[cfg(feature = "azure")]155{156let store =157options.build_azure(self.path.clone(), clear_cached_credentials)?;158Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)159}160}161#[cfg(not(feature = "azure"))]162return err_missing_feature("azure", &self.cloud_type);163},164CloudType::File => {165let local = LocalFileSystem::new();166Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)167},168CloudType::Http => {169{170#[cfg(feature = "http")]171{172let store = options.build_http(self.path.clone())?;173PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)174}175}176#[cfg(not(feature = "http"))]177return err_missing_feature("http", &cloud_location.scheme);178},179CloudType::Hf => panic!("impl error: unresolved hf:// path"),180}?;181182Ok(store)183}184185/// Note: Use `build_impl` for a non-caching version.186pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {187let opt_cache_key = match &self.cloud_type {188CloudType::Aws | CloudType::Gcp | CloudType::Azure => {189Some(path_and_creds_to_key(&self.path, self.options.as_ref()))190},191CloudType::File | CloudType::Http | CloudType::Hf => None,192};193194let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {195let cache = OBJECT_STORE_CACHE.read().await;196197if let Some(store) = cache.get(cache_key) {198return Ok(store.clone());199}200201drop(cache);202203let cache = OBJECT_STORE_CACHE.write().await;204205if 
let Some(store) = cache.get(cache_key) {206return Ok(store.clone());207}208209Some(cache)210} else {211None212};213214let store = self.build_impl(false).await?;215let store = PolarsObjectStore::new_from_inner(store, self);216217if let Some(mut cache) = opt_cache_write_guard {218// Clear the cache if we surpass a certain amount of buckets.219if cache.len() >= 8 {220if config::verbose() {221eprintln!(222"build_object_store: clearing store cache (cache.len(): {})",223cache.len()224);225}226cache.clear()227}228229cache.insert(opt_cache_key.unwrap(), store.clone());230}231232Ok(store)233}234235pub(crate) fn is_azure(&self) -> bool {236matches!(&self.cloud_type, CloudType::Azure)237}238}239240/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.241pub async fn build_object_store(242path: PlRefPath,243#[cfg_attr(244not(any(feature = "aws", feature = "gcp", feature = "azure")),245allow(unused_variables)246)]247options: Option<&CloudOptions>,248glob: bool,249) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {250let path = path.to_absolute_path()?.into_owned();251252let cloud_type = path253.scheme()254.map_or(CloudType::File, CloudType::from_cloud_scheme);255let cloud_location = CloudLocation::new(path.clone(), glob)?;256257let store = PolarsObjectStoreBuilder {258path,259cloud_type,260options: options.cloned(),261}262.build()263.await?;264265Ok((cloud_location, store))266}267268mod test {269#[test]270fn test_object_path_from_str() {271use super::object_path_from_str;272273let path = "%25";274let out = object_path_from_str(path).unwrap();275276assert_eq!(out.as_ref(), path);277}278}279280281