Path: blob/main/crates/polars-io/src/cloud/object_store_setup.rs
6939 views
use std::sync::{Arc, LazyLock};12use object_store::ObjectStore;3use object_store::local::LocalFileSystem;4use polars_core::config::{self, verbose_print_sensitive};5use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};6use polars_utils::aliases::PlHashMap;7use polars_utils::pl_str::PlSmallStr;8use polars_utils::{format_pl_smallstr, pl_serialize};9use tokio::sync::RwLock;10use url::Url;1112use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore, parse_url};13use crate::cloud::CloudConfig;1415/// Object stores must be cached. Every object-store will do DNS lookups and16/// get rate limited when querying the DNS (can take up to 5s).17/// Other reasons are connection pools that must be shared between as much as possible.18#[allow(clippy::type_complexity)]19static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =20LazyLock::new(Default::default);2122#[allow(dead_code)]23fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {24polars_bail!(25ComputeError:26"feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,27);28}2930/// Get the key of a url for object store registration.31fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec<u8> {32// We include credentials as they can expire, so users will send new credentials for the same url.33let cloud_options = options.map(34|CloudOptions {35// Destructure to ensure this breaks if anything changes.36max_retries,37#[cfg(feature = "file_cache")]38file_cache_ttl,39config,40#[cfg(feature = "cloud")]41credential_provider,42}| {43CloudOptions2 {44max_retries: *max_retries,45#[cfg(feature = "file_cache")]46file_cache_ttl: *file_cache_ttl,47config: config.clone(),48#[cfg(feature = "cloud")]49credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),50}51},52);5354let cache_key = CacheKey {55url_base: format_pl_smallstr!(56"{}",57&url[url::Position::BeforeScheme..url::Position::AfterPort]58),59cloud_options,60};6162verbose_print_sensitive(|| format!("object store cache key: {} {:?}", url, &cache_key));6364return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();6566#[derive(Clone, Debug, PartialEq, Hash, Eq)]67#[cfg_attr(feature = "serde", derive(serde::Serialize))]68struct CacheKey {69url_base: PlSmallStr,70cloud_options: Option<CloudOptions2>,71}7273/// Variant of CloudOptions for serializing to a cache key. The credential74/// provider is replaced by the function address.75#[derive(Clone, Debug, PartialEq, Hash, Eq)]76#[cfg_attr(feature = "serde", derive(serde::Serialize))]77struct CloudOptions2 {78max_retries: usize,79#[cfg(feature = "file_cache")]80file_cache_ttl: u64,81config: Option<CloudConfig>,82#[cfg(feature = "cloud")]83credential_provider: usize,84}85}8687/// Construct an object_store `Path` from a string without any encoding/decoding.88pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {89object_store::path::Path::parse(path).map_err(to_compute_err)90}9192#[derive(Debug, Clone)]93pub(crate) struct PolarsObjectStoreBuilder {94url: PlSmallStr,95parsed_url: Url,96#[allow(unused)]97scheme: PlSmallStr,98cloud_type: CloudType,99options: Option<CloudOptions>,100}101102impl PolarsObjectStoreBuilder {103pub(super) async fn build_impl(104&self,105// Whether to clear cached credentials for Python credential providers.106clear_cached_credentials: bool,107) -> PolarsResult<Arc<dyn ObjectStore>> {108let options = self109.options110.as_ref()111.unwrap_or_else(|| CloudOptions::default_static_ref());112113let store = match self.cloud_type {114CloudType::Aws => {115#[cfg(feature = "aws")]116{117let store = options118.build_aws(&self.url, clear_cached_credentials)119.await?;120Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)121}122#[cfg(not(feature = "aws"))]123return err_missing_feature("aws", &self.scheme);124},125CloudType::Gcp => {126#[cfg(feature = "gcp")]127{128let store = options.build_gcp(&self.url, clear_cached_credentials)?;129Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)130}131#[cfg(not(feature = "gcp"))]132return err_missing_feature("gcp", &self.scheme);133},134CloudType::Azure => {135{136#[cfg(feature = "azure")]137{138let store = options.build_azure(&self.url, clear_cached_credentials)?;139Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)140}141}142#[cfg(not(feature = "azure"))]143return err_missing_feature("azure", &self.scheme);144},145CloudType::File => {146let local = LocalFileSystem::new();147Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)148},149CloudType::Http => {150{151#[cfg(feature = "http")]152{153let store = options.build_http(&self.url)?;154PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)155}156}157#[cfg(not(feature = "http"))]158return err_missing_feature("http", &cloud_location.scheme);159},160CloudType::Hf => panic!("impl error: unresolved hf:// path"),161}?;162163Ok(store)164}165166/// Note: Use `build_impl` for a non-caching version.167pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {168let opt_cache_key = match &self.cloud_type {169CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(url_and_creds_to_key(170&self.parsed_url,171self.options.as_ref(),172)),173CloudType::File | CloudType::Http | CloudType::Hf => None,174};175176let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {177let cache = OBJECT_STORE_CACHE.read().await;178179if let Some(store) = cache.get(cache_key) {180return Ok(store.clone());181}182183drop(cache);184185let cache = OBJECT_STORE_CACHE.write().await;186187if let Some(store) = cache.get(cache_key) {188return Ok(store.clone());189}190191Some(cache)192} else {193None194};195196let store = self.build_impl(false).await?;197let store = PolarsObjectStore::new_from_inner(store, self);198199if let Some(mut cache) = opt_cache_write_guard {200// Clear the cache if we surpass a certain amount of buckets.201if cache.len() >= 8 {202if config::verbose() {203eprintln!(204"build_object_store: clearing store cache (cache.len(): {})",205cache.len()206);207}208cache.clear()209}210211cache.insert(opt_cache_key.unwrap(), store.clone());212}213214Ok(store)215}216217pub(crate) fn is_azure(&self) -> bool {218matches!(&self.cloud_type, CloudType::Azure)219}220}221222/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.223pub async fn build_object_store(224url: &str,225#[cfg_attr(226not(any(feature = "aws", feature = "gcp", feature = "azure")),227allow(unused_variables)228)]229options: Option<&CloudOptions>,230glob: bool,231) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {232let parsed = parse_url(url).map_err(to_compute_err)?;233let cloud_location = CloudLocation::from_url(&parsed, glob)?;234let cloud_type = CloudType::from_url(&parsed)?;235236let store = PolarsObjectStoreBuilder {237url: url.into(),238parsed_url: parsed,239scheme: cloud_location.scheme.as_str().into(),240cloud_type,241options: options.cloned(),242}243.build()244.await?;245246Ok((cloud_location, store))247}248249mod test {250#[test]251fn test_object_path_from_str() {252use super::object_path_from_str;253254let path = "%25";255let out = object_path_from_str(path).unwrap();256257assert_eq!(out.as_ref(), path);258}259}260261262