Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/cloud/object_store_setup.rs
6939 views
1
use std::sync::{Arc, LazyLock};
2
3
use object_store::ObjectStore;
4
use object_store::local::LocalFileSystem;
5
use polars_core::config::{self, verbose_print_sensitive};
6
use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7
use polars_utils::aliases::PlHashMap;
8
use polars_utils::pl_str::PlSmallStr;
9
use polars_utils::{format_pl_smallstr, pl_serialize};
10
use tokio::sync::RwLock;
11
use url::Url;
12
13
use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore, parse_url};
14
use crate::cloud::CloudConfig;
15
16
/// Object stores must be cached. Every object-store will do DNS lookups and
17
/// get rate limited when querying the DNS (can take up to 5s).
18
/// Other reasons are connection pools that must be shared between as much as possible.
19
#[allow(clippy::type_complexity)]
20
static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21
LazyLock::new(Default::default);
22
23
#[allow(dead_code)]
24
fn err_missing_feature(feature: &str, scheme: &str) -> PolarsResult<Arc<dyn ObjectStore>> {
25
polars_bail!(
26
ComputeError:
27
"feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
28
);
29
}
30
31
/// Get the key of a url for object store registration.
32
fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> Vec<u8> {
33
// We include credentials as they can expire, so users will send new credentials for the same url.
34
let cloud_options = options.map(
35
|CloudOptions {
36
// Destructure to ensure this breaks if anything changes.
37
max_retries,
38
#[cfg(feature = "file_cache")]
39
file_cache_ttl,
40
config,
41
#[cfg(feature = "cloud")]
42
credential_provider,
43
}| {
44
CloudOptions2 {
45
max_retries: *max_retries,
46
#[cfg(feature = "file_cache")]
47
file_cache_ttl: *file_cache_ttl,
48
config: config.clone(),
49
#[cfg(feature = "cloud")]
50
credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
51
}
52
},
53
);
54
55
let cache_key = CacheKey {
56
url_base: format_pl_smallstr!(
57
"{}",
58
&url[url::Position::BeforeScheme..url::Position::AfterPort]
59
),
60
cloud_options,
61
};
62
63
verbose_print_sensitive(|| format!("object store cache key: {} {:?}", url, &cache_key));
64
65
return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
66
67
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
68
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
69
struct CacheKey {
70
url_base: PlSmallStr,
71
cloud_options: Option<CloudOptions2>,
72
}
73
74
/// Variant of CloudOptions for serializing to a cache key. The credential
75
/// provider is replaced by the function address.
76
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
77
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
78
struct CloudOptions2 {
79
max_retries: usize,
80
#[cfg(feature = "file_cache")]
81
file_cache_ttl: u64,
82
config: Option<CloudConfig>,
83
#[cfg(feature = "cloud")]
84
credential_provider: usize,
85
}
86
}
87
88
/// Construct an object_store `Path` from a string without any encoding/decoding.
89
pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
90
object_store::path::Path::parse(path).map_err(to_compute_err)
91
}
92
93
#[derive(Debug, Clone)]
94
pub(crate) struct PolarsObjectStoreBuilder {
95
url: PlSmallStr,
96
parsed_url: Url,
97
#[allow(unused)]
98
scheme: PlSmallStr,
99
cloud_type: CloudType,
100
options: Option<CloudOptions>,
101
}
102
103
impl PolarsObjectStoreBuilder {
104
pub(super) async fn build_impl(
105
&self,
106
// Whether to clear cached credentials for Python credential providers.
107
clear_cached_credentials: bool,
108
) -> PolarsResult<Arc<dyn ObjectStore>> {
109
let options = self
110
.options
111
.as_ref()
112
.unwrap_or_else(|| CloudOptions::default_static_ref());
113
114
let store = match self.cloud_type {
115
CloudType::Aws => {
116
#[cfg(feature = "aws")]
117
{
118
let store = options
119
.build_aws(&self.url, clear_cached_credentials)
120
.await?;
121
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
122
}
123
#[cfg(not(feature = "aws"))]
124
return err_missing_feature("aws", &self.scheme);
125
},
126
CloudType::Gcp => {
127
#[cfg(feature = "gcp")]
128
{
129
let store = options.build_gcp(&self.url, clear_cached_credentials)?;
130
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
131
}
132
#[cfg(not(feature = "gcp"))]
133
return err_missing_feature("gcp", &self.scheme);
134
},
135
CloudType::Azure => {
136
{
137
#[cfg(feature = "azure")]
138
{
139
let store = options.build_azure(&self.url, clear_cached_credentials)?;
140
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
141
}
142
}
143
#[cfg(not(feature = "azure"))]
144
return err_missing_feature("azure", &self.scheme);
145
},
146
CloudType::File => {
147
let local = LocalFileSystem::new();
148
Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
149
},
150
CloudType::Http => {
151
{
152
#[cfg(feature = "http")]
153
{
154
let store = options.build_http(&self.url)?;
155
PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
156
}
157
}
158
#[cfg(not(feature = "http"))]
159
return err_missing_feature("http", &cloud_location.scheme);
160
},
161
CloudType::Hf => panic!("impl error: unresolved hf:// path"),
162
}?;
163
164
Ok(store)
165
}
166
167
/// Note: Use `build_impl` for a non-caching version.
168
pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
169
let opt_cache_key = match &self.cloud_type {
170
CloudType::Aws | CloudType::Gcp | CloudType::Azure => Some(url_and_creds_to_key(
171
&self.parsed_url,
172
self.options.as_ref(),
173
)),
174
CloudType::File | CloudType::Http | CloudType::Hf => None,
175
};
176
177
let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
178
let cache = OBJECT_STORE_CACHE.read().await;
179
180
if let Some(store) = cache.get(cache_key) {
181
return Ok(store.clone());
182
}
183
184
drop(cache);
185
186
let cache = OBJECT_STORE_CACHE.write().await;
187
188
if let Some(store) = cache.get(cache_key) {
189
return Ok(store.clone());
190
}
191
192
Some(cache)
193
} else {
194
None
195
};
196
197
let store = self.build_impl(false).await?;
198
let store = PolarsObjectStore::new_from_inner(store, self);
199
200
if let Some(mut cache) = opt_cache_write_guard {
201
// Clear the cache if we surpass a certain amount of buckets.
202
if cache.len() >= 8 {
203
if config::verbose() {
204
eprintln!(
205
"build_object_store: clearing store cache (cache.len(): {})",
206
cache.len()
207
);
208
}
209
cache.clear()
210
}
211
212
cache.insert(opt_cache_key.unwrap(), store.clone());
213
}
214
215
Ok(store)
216
}
217
218
pub(crate) fn is_azure(&self) -> bool {
219
matches!(&self.cloud_type, CloudType::Azure)
220
}
221
}
222
223
/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
224
pub async fn build_object_store(
225
url: &str,
226
#[cfg_attr(
227
not(any(feature = "aws", feature = "gcp", feature = "azure")),
228
allow(unused_variables)
229
)]
230
options: Option<&CloudOptions>,
231
glob: bool,
232
) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
233
let parsed = parse_url(url).map_err(to_compute_err)?;
234
let cloud_location = CloudLocation::from_url(&parsed, glob)?;
235
let cloud_type = CloudType::from_url(&parsed)?;
236
237
let store = PolarsObjectStoreBuilder {
238
url: url.into(),
239
parsed_url: parsed,
240
scheme: cloud_location.scheme.as_str().into(),
241
cloud_type,
242
options: options.cloned(),
243
}
244
.build()
245
.await?;
246
247
Ok((cloud_location, store))
248
}
249
250
#[cfg(test)]
mod test {
    use super::object_path_from_str;

    /// `object_path_from_str` must neither percent-decode nor re-encode its input.
    #[test]
    fn test_object_path_from_str() {
        for path in ["%25", "dir/file%20name.parquet", "a/b%2Fc"] {
            let out = object_path_from_str(path).unwrap();
            assert_eq!(out.as_ref(), path);
        }
    }
}
261
262