Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/cloud/object_store_setup.rs
8420 views
1
use std::sync::{Arc, LazyLock};
2
3
use object_store::ObjectStore;
4
use object_store::local::LocalFileSystem;
5
use polars_core::config::{self, verbose, verbose_print_sensitive};
6
use polars_error::{PolarsError, PolarsResult, polars_bail, to_compute_err};
7
use polars_utils::aliases::PlHashMap;
8
use polars_utils::pl_path::{PlPath, PlRefPath};
9
use polars_utils::pl_str::PlSmallStr;
10
use polars_utils::{format_pl_smallstr, pl_serialize};
11
use tokio::sync::RwLock;
12
13
use super::{CloudLocation, CloudOptions, CloudType, PolarsObjectStore};
14
use crate::cloud::{CloudConfig, CloudRetryConfig};
15
16
/// Object stores must be cached. Every object-store will do DNS lookups and
17
/// get rate limited when querying the DNS (can take up to 5s).
18
/// Other reasons are connection pools that must be shared between as much as possible.
19
#[allow(clippy::type_complexity)]
20
static OBJECT_STORE_CACHE: LazyLock<RwLock<PlHashMap<Vec<u8>, PolarsObjectStore>>> =
21
LazyLock::new(Default::default);
22
23
#[allow(dead_code)]
24
fn err_missing_feature(
25
feature: &str,
26
cloud_type: &CloudType,
27
) -> PolarsResult<Arc<dyn ObjectStore>> {
28
polars_bail!(
29
ComputeError:
30
"feature '{}' must be enabled in order to use '{:?}' cloud urls",
31
feature,
32
cloud_type,
33
);
34
}
35
36
/// Get the key of a url for object store registration.
37
fn path_and_creds_to_key(path: &PlPath, options: Option<&CloudOptions>) -> Vec<u8> {
38
// We include credentials as they can expire, so users will send new credentials for the same url.
39
let cloud_options = options.map(
40
|CloudOptions {
41
// Destructure to ensure this breaks if anything changes.
42
#[cfg(feature = "file_cache")]
43
file_cache_ttl,
44
config,
45
retry_config,
46
#[cfg(feature = "cloud")]
47
credential_provider,
48
}| {
49
CloudOptionsKey {
50
#[cfg(feature = "file_cache")]
51
file_cache_ttl: *file_cache_ttl,
52
config: config.clone(),
53
retry_config: *retry_config,
54
#[cfg(feature = "cloud")]
55
credential_provider: credential_provider.as_ref().map_or(0, |x| x.func_addr()),
56
}
57
},
58
);
59
60
let cache_key = CacheKey {
61
url_base: format_pl_smallstr!("{}", &path.as_str()[..path.authority_end_position()]),
62
cloud_options,
63
};
64
65
verbose_print_sensitive(|| {
66
format!(
67
"object store cache key for path at '{}': {:?}",
68
path, &cache_key
69
)
70
});
71
72
return pl_serialize::serialize_to_bytes::<_, false>(&cache_key).unwrap();
73
74
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
75
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
76
struct CacheKey {
77
url_base: PlSmallStr,
78
cloud_options: Option<CloudOptionsKey>,
79
}
80
81
/// Variant of CloudOptions for serializing to a cache key. The credential
82
/// provider is replaced by the function address.
83
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
84
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
85
struct CloudOptionsKey {
86
#[cfg(feature = "file_cache")]
87
file_cache_ttl: u64,
88
config: Option<CloudConfig>,
89
retry_config: CloudRetryConfig,
90
#[cfg(feature = "cloud")]
91
credential_provider: usize,
92
}
93
}
94
95
/// Construct an object_store `Path` from a string without any encoding/decoding.
96
pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
97
object_store::path::Path::parse(path).map_err(to_compute_err)
98
}
99
100
#[derive(Debug, Clone)]
101
pub(crate) struct PolarsObjectStoreBuilder {
102
path: PlRefPath,
103
cloud_type: CloudType,
104
options: Option<CloudOptions>,
105
}
106
107
impl PolarsObjectStoreBuilder {
108
pub(super) fn path(&self) -> &PlRefPath {
109
&self.path
110
}
111
112
pub(super) async fn build_impl(
113
&self,
114
// Whether to clear cached credentials for Python credential providers.
115
clear_cached_credentials: bool,
116
) -> PolarsResult<Arc<dyn ObjectStore>> {
117
let options = self
118
.options
119
.as_ref()
120
.unwrap_or_else(|| CloudOptions::default_static_ref());
121
122
if let Some(options) = &self.options
123
&& verbose()
124
{
125
eprintln!(
126
"build object-store: file_cache_ttl: {}",
127
options.file_cache_ttl
128
)
129
}
130
131
let store = match self.cloud_type {
132
CloudType::Aws => {
133
#[cfg(feature = "aws")]
134
{
135
let store = options
136
.build_aws(self.path.clone(), clear_cached_credentials)
137
.await?;
138
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
139
}
140
#[cfg(not(feature = "aws"))]
141
return err_missing_feature("aws", &self.cloud_type);
142
},
143
CloudType::Gcp => {
144
#[cfg(feature = "gcp")]
145
{
146
let store = options.build_gcp(self.path.clone(), clear_cached_credentials)?;
147
148
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
149
}
150
#[cfg(not(feature = "gcp"))]
151
return err_missing_feature("gcp", &self.cloud_type);
152
},
153
CloudType::Azure => {
154
{
155
#[cfg(feature = "azure")]
156
{
157
let store =
158
options.build_azure(self.path.clone(), clear_cached_credentials)?;
159
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
160
}
161
}
162
#[cfg(not(feature = "azure"))]
163
return err_missing_feature("azure", &self.cloud_type);
164
},
165
CloudType::File => {
166
let local = LocalFileSystem::new();
167
Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
168
},
169
CloudType::Http => {
170
{
171
#[cfg(feature = "http")]
172
{
173
let store = options.build_http(self.path.clone())?;
174
PolarsResult::Ok(Arc::new(store) as Arc<dyn ObjectStore>)
175
}
176
}
177
#[cfg(not(feature = "http"))]
178
return err_missing_feature("http", &cloud_location.scheme);
179
},
180
CloudType::Hf => panic!("impl error: unresolved hf:// path"),
181
}?;
182
183
Ok(store)
184
}
185
186
/// Note: Use `build_impl` for a non-caching version.
187
pub(super) async fn build(self) -> PolarsResult<PolarsObjectStore> {
188
let opt_cache_key = match &self.cloud_type {
189
CloudType::Aws | CloudType::Gcp | CloudType::Azure => {
190
Some(path_and_creds_to_key(&self.path, self.options.as_ref()))
191
},
192
CloudType::File | CloudType::Http | CloudType::Hf => None,
193
};
194
195
let opt_cache_write_guard = if let Some(cache_key) = opt_cache_key.as_deref() {
196
let cache = OBJECT_STORE_CACHE.read().await;
197
198
if let Some(store) = cache.get(cache_key) {
199
return Ok(store.clone());
200
}
201
202
drop(cache);
203
204
let cache = OBJECT_STORE_CACHE.write().await;
205
206
if let Some(store) = cache.get(cache_key) {
207
return Ok(store.clone());
208
}
209
210
Some(cache)
211
} else {
212
None
213
};
214
215
let store = self.build_impl(false).await?;
216
let store = PolarsObjectStore::new_from_inner(store, self);
217
218
if let Some(mut cache) = opt_cache_write_guard {
219
// Clear the cache if we surpass a certain amount of buckets.
220
if cache.len() >= 8 {
221
if config::verbose() {
222
eprintln!(
223
"build_object_store: clearing store cache (cache.len(): {})",
224
cache.len()
225
);
226
}
227
cache.clear()
228
}
229
230
cache.insert(opt_cache_key.unwrap(), store.clone());
231
}
232
233
Ok(store)
234
}
235
236
pub(crate) fn is_azure(&self) -> bool {
237
matches!(&self.cloud_type, CloudType::Azure)
238
}
239
}
240
241
/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
242
pub async fn build_object_store(
243
path: PlRefPath,
244
#[cfg_attr(
245
not(any(feature = "aws", feature = "gcp", feature = "azure")),
246
allow(unused_variables)
247
)]
248
options: Option<&CloudOptions>,
249
glob: bool,
250
) -> PolarsResult<(CloudLocation, PolarsObjectStore)> {
251
let path = path.to_absolute_path()?.into_owned();
252
253
let cloud_type = path
254
.scheme()
255
.map_or(CloudType::File, CloudType::from_cloud_scheme);
256
let cloud_location = CloudLocation::new(path.clone(), glob)?;
257
258
let store = PolarsObjectStoreBuilder {
259
path,
260
cloud_type,
261
options: options.cloned(),
262
}
263
.build()
264
.await?;
265
266
Ok((cloud_location, store))
267
}
268
269
// Only compile the test module for test builds.
#[cfg(test)]
mod test {
    use super::object_path_from_str;

    /// A percent-encoded input must come back unchanged, i.e. it is neither
    /// decoded nor encoded a second time.
    #[test]
    fn test_object_path_from_str() {
        let path = "%25";
        let out = object_path_from_str(path).unwrap();

        assert_eq!(out.as_ref(), path);
    }
}
280
281