GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/cloud/polars_object_store.rs

use std::fmt::Display;
use std::ops::Range;
use std::sync::Arc;

use futures::{Stream, StreamExt as _, TryStreamExt as _};
use hashbrown::hash_map::RawEntryMut;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};
use polars_buffer::Buffer;
use polars_core::prelude::{InitHashMaps, PlHashMap};
use polars_error::{PolarsError, PolarsResult};
use polars_utils::pl_path::PlRefPath;
use tokio::io::AsyncWriteExt;

use crate::metrics::HEAD_RESPONSE_SIZE_ESTIMATE;
use crate::pl_async::{
    self, MAX_BUDGET_PER_REQUEST, get_concurrency_limit, get_download_chunk_size,
    tune_with_concurrency_budget, with_concurrency_budget,
};

#[derive(Debug)]
pub struct PolarsObjectStoreError {
    pub base_url: PlRefPath,
    pub source: object_store::Error,
}

impl PolarsObjectStoreError {
    pub fn from_url(base_url: &PlRefPath) -> impl FnOnce(object_store::Error) -> Self {
        |error| Self {
            base_url: base_url.clone(),
            source: error,
        }
    }
}

impl Display for PolarsObjectStoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "object-store error: {} (path: {})",
            self.source, &self.base_url
        )
    }
}

impl std::error::Error for PolarsObjectStoreError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        Some(&self.source)
    }
}

impl From<PolarsObjectStoreError> for std::io::Error {
    fn from(value: PolarsObjectStoreError) -> Self {
        std::io::Error::other(value)
    }
}

impl From<PolarsObjectStoreError> for PolarsError {
    fn from(value: PolarsObjectStoreError) -> Self {
        PolarsError::IO {
            error: Arc::new(value.into()),
            msg: None,
        }
    }
}

mod inner {

    use std::borrow::Cow;
    use std::future::Future;
    use std::sync::Arc;

    use object_store::ObjectStore;
    use polars_core::config;
    use polars_error::{PolarsError, PolarsResult};
    use polars_utils::relaxed_cell::RelaxedCell;

    use crate::cloud::{ObjectStoreErrorContext, PolarsObjectStoreBuilder};
    use crate::metrics::{IOMetrics, OptIOMetrics};

    #[derive(Debug)]
    struct Inner {
        store: tokio::sync::RwLock<Arc<dyn ObjectStore>>,
        builder: PolarsObjectStoreBuilder,
    }

    /// Polars wrapper around [`ObjectStore`] functionality. This struct is cheaply cloneable.
    #[derive(Clone, Debug)]
    pub struct PolarsObjectStore {
        inner: Arc<Inner>,
        /// Avoid contending the `RwLock` until the first re-build.
        initial_store: std::sync::Arc<dyn ObjectStore>,
        /// Used for interior mutability. Doesn't need to be shared with other threads so it's not
        /// inside `Arc<>`.
        rebuilt: RelaxedCell<bool>,
        io_metrics: OptIOMetrics,
    }

    impl PolarsObjectStore {
        pub(crate) fn new_from_inner(
            store: Arc<dyn ObjectStore>,
            builder: PolarsObjectStoreBuilder,
        ) -> Self {
            let initial_store = store.clone();
            Self {
                inner: Arc::new(Inner {
                    store: tokio::sync::RwLock::new(store),
                    builder,
                }),
                initial_store,
                rebuilt: RelaxedCell::from(false),
                io_metrics: OptIOMetrics(None),
            }
        }

        pub fn set_io_metrics(&mut self, io_metrics: Option<Arc<IOMetrics>>) -> &mut Self {
            self.io_metrics = OptIOMetrics(io_metrics);
            self
        }

        pub fn io_metrics(&self) -> &OptIOMetrics {
            &self.io_metrics
        }

        /// Gets the underlying [`ObjectStore`] implementation.
        async fn to_dyn_object_store(&self) -> Cow<'_, Arc<dyn ObjectStore>> {
            if !self.rebuilt.load() {
                Cow::Borrowed(&self.initial_store)
            } else {
                Cow::Owned(self.inner.store.read().await.clone())
            }
        }

        pub async fn rebuild_inner(
            &self,
            from_version: &Arc<dyn ObjectStore>,
        ) -> PolarsResult<Arc<dyn ObjectStore>> {
            let mut current_store = self.inner.store.write().await;

            // If these are not pointer-equal, `inner` was already re-built by another thread.
            if Arc::ptr_eq(&*current_store, from_version) {
                *current_store =
                    self.inner
                        .builder
                        .clone()
                        .build_impl(true)
                        .await
                        .map_err(|e| {
                            e.wrap_msg(|e| format!("attempt to rebuild object store failed: {e}"))
                        })?;
            }

            self.rebuilt.store(true);

            Ok((*current_store).clone())
        }

        pub async fn exec_with_rebuild_retry_on_err<'s, 'f, Fn, Fut, O>(
            &'s self,
            mut func: Fn,
        ) -> PolarsResult<O>
        where
            Fn: FnMut(Cow<'s, Arc<dyn ObjectStore>>) -> Fut + 'f,
            Fut: Future<Output = object_store::Result<O>>,
        {
            let store = self.to_dyn_object_store().await;

            let out = func(store.clone()).await;

            let orig_err = match out {
                Ok(v) => return Ok(v),
                Err(e) => e,
            };

            if config::verbose() {
                eprintln!(
                    "[PolarsObjectStore]: got error: {}, will rebuild store and retry",
                    &orig_err
                );
            }

            let store = self
                .rebuild_inner(&store)
                .await
                .map_err(|e| e.wrap_msg(|e| format!("{e}; original error: {orig_err}")))?;

            func(Cow::Owned(store)).await.map_err(|e| {
                let e: PolarsError = self.error_context().attach_err_info(e).into();

                if self.inner.builder.is_azure()
                    && std::env::var("POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY").as_deref()
                        != Ok("1")
                {
                    // Note: This error is intended for Python audiences. The logic for retrieving
                    // these keys exists only on the Python side.
                    e.wrap_msg(|e| {
                        format!(
                            "{e}; note: if you are using Python, consider setting \
                            POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY=1 if you would like polars to try to retrieve \
                            and use the storage account keys from Azure CLI to authenticate"
                        )
                    })
                } else {
                    e
                }
            })
        }

        pub fn error_context(&self) -> ObjectStoreErrorContext {
            ObjectStoreErrorContext::new(self.inner.builder.path().clone())
        }
    }
}

#[derive(Clone)]
pub struct ObjectStoreErrorContext {
    path: PlRefPath,
}

impl ObjectStoreErrorContext {
    pub fn new(path: PlRefPath) -> Self {
        Self { path }
    }

    pub fn attach_err_info(self, err: object_store::Error) -> PolarsObjectStoreError {
        let ObjectStoreErrorContext { path } = self;

        PolarsObjectStoreError {
            base_url: path,
            source: err,
        }
    }
}
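
// Illustrative sketch (added for this rendering; not part of the upstream file): how an
// `object_store::Error` is attached to its base path and surfaced as a `PolarsError`. The
// function name is hypothetical and only demonstrates the conversion chain
// `ObjectStoreErrorContext` -> `PolarsObjectStoreError` -> `PolarsError` defined above.
#[allow(dead_code)]
fn example_wrap_object_store_error(path: PlRefPath, err: object_store::Error) -> PolarsError {
    // Attach the path for context, then convert via `From<PolarsObjectStoreError> for PolarsError`.
    ObjectStoreErrorContext::new(path).attach_err_info(err).into()
}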

pub use inner::PolarsObjectStore;

pub type ObjectStorePath = object_store::path::Path;

impl PolarsObjectStore {
    pub fn build_buffered_ranges_stream<'a, T: Iterator<Item = Range<usize>>>(
        &'a self,
        path: &'a Path,
        ranges: T,
    ) -> impl Stream<Item = PolarsResult<Buffer<u8>>> + use<'a, T> {
        futures::stream::iter(ranges.map(move |range| async move {
            if range.is_empty() {
                return Ok(Buffer::new());
            }

            let out = self
                .io_metrics()
                .record_io_read(
                    range.len() as u64,
                    self.exec_with_rebuild_retry_on_err(|s| async move {
                        s.get_range(path, range.start as u64..range.end as u64)
                            .await
                    }),
                )
                .await?;

            Ok(Buffer::from_owner(out))
        }))
        // Add a limit locally as this gets run inside a single `tune_with_concurrency_budget`.
        .buffered(get_concurrency_limit() as usize)
    }

    pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
        if range.is_empty() {
            return Ok(Buffer::new());
        }

        let parts = split_range(range.clone());

        if parts.len() == 1 {
            let out = tune_with_concurrency_budget(1, move || async move {
                let bytes = self
                    .io_metrics()
                    .record_io_read(
                        range.len() as u64,
                        self.exec_with_rebuild_retry_on_err(|s| async move {
                            s.get_range(path, range.start as u64..range.end as u64)
                                .await
                        }),
                    )
                    .await?;

                PolarsResult::Ok(Buffer::from_owner(bytes))
            })
            .await?;

            Ok(out)
        } else {
            let parts = tune_with_concurrency_budget(
                parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
                || {
                    self.build_buffered_ranges_stream(path, parts)
                        .try_collect::<Vec<Buffer<u8>>>()
                },
            )
            .await?;

            let mut combined = Vec::with_capacity(range.len());

            for part in parts {
                combined.extend_from_slice(&part)
            }

            assert_eq!(combined.len(), range.len());

            PolarsResult::Ok(Buffer::from_vec(combined))
        }
    }

    /// Fetch byte ranges into a HashMap keyed by the range start. This will mutably sort the
    /// `ranges` slice for coalescing.
    ///
    /// # Panics
    /// Panics if the same range start is used by more than 1 range.
    pub async fn get_ranges_sort(
        &self,
        path: &Path,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
        if ranges.is_empty() {
            return Ok(Default::default());
        }

        ranges.sort_unstable_by_key(|x| x.start);

        let ranges_len = ranges.len();
        let (merged_ranges, merged_ends): (Vec<_>, Vec<_>) = merge_ranges(ranges).unzip();

        let mut out = PlHashMap::with_capacity(ranges_len);

        let mut stream = self.build_buffered_ranges_stream(path, merged_ranges.iter().cloned());

        tune_with_concurrency_budget(
            merged_ranges.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
            || async {
                let mut len = 0;
                let mut current_offset = 0;
                let mut ends_iter = merged_ends.iter();

                let mut splitted_parts = vec![];

                while let Some(bytes) = stream.try_next().await? {
                    len += bytes.len();
                    let end = *ends_iter.next().unwrap();

                    if end == 0 {
                        splitted_parts.push(bytes);
                        continue;
                    }

                    let full_range = ranges[current_offset..end]
                        .iter()
                        .cloned()
                        .reduce(|l, r| l.start.min(r.start)..l.end.max(r.end))
                        .unwrap();

                    let bytes = if splitted_parts.is_empty() {
                        bytes
                    } else {
                        let mut out = Vec::with_capacity(full_range.len());

                        for x in splitted_parts.drain(..) {
                            out.extend_from_slice(&x);
                        }

                        out.extend_from_slice(&bytes);
                        Buffer::from(out)
                    };

                    assert_eq!(bytes.len(), full_range.len());

                    for range in &ranges[current_offset..end] {
                        let slice = bytes
                            .clone()
                            .sliced(range.start - full_range.start..range.end - full_range.start);

                        match out.raw_entry_mut().from_key(&range.start) {
                            RawEntryMut::Vacant(slot) => {
                                slot.insert(range.start, slice);
                            },
                            RawEntryMut::Occupied(mut slot) => {
                                if slot.get_mut().len() < slice.len() {
                                    *slot.get_mut() = slice;
                                }
                            },
                        }
                    }

                    current_offset = end;
                }

                assert!(splitted_parts.is_empty());

                PolarsResult::Ok(pl_async::Size::from(len as u64))
            },
        )
        .await?;

        Ok(out)
    }

    pub async fn download(&self, path: &Path, file: &mut tokio::fs::File) -> PolarsResult<()> {
        let size = self.head(path).await?.size;
        let parts = split_range(0..size as usize);

        tune_with_concurrency_budget(
            parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,
            || async {
                let mut stream = self.build_buffered_ranges_stream(path, parts);
                let mut len = 0;
                while let Some(bytes) = stream.try_next().await? {
                    len += bytes.len();
                    file.write_all(&bytes).await?;
                }

                assert_eq!(len, size as usize);

                PolarsResult::Ok(pl_async::Size::from(len as u64))
            },
        )
        .await?;

        // Dropping is delayed for tokio async files so we need to explicitly
        // flush here (https://github.com/tokio-rs/tokio/issues/2307#issuecomment-596336451).
        file.sync_all().await.map_err(PolarsError::from)?;

        Ok(())
    }

    /// Fetch the metadata of the file at `path`; the result is not memoized.
    pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {
        with_concurrency_budget(1, || {
            self.exec_with_rebuild_retry_on_err(|s| {
                async move {
                    let head_result = self
                        .io_metrics()
                        .record_io_read(HEAD_RESPONSE_SIZE_ESTIMATE, s.head(path))
                        .await;

                    if head_result.is_err() {
                        // Pre-signed URLs forbid the HEAD method, but we can still retrieve the
                        // header information with a range 0-1 request.
                        let get_range_0_1_result = self
                            .io_metrics()
                            .record_io_read(
                                HEAD_RESPONSE_SIZE_ESTIMATE + 1,
                                s.get_opts(
                                    path,
                                    object_store::GetOptions {
                                        range: Some((0..1).into()),
                                        ..Default::default()
                                    },
                                ),
                            )
                            .await;

                        if let Ok(v) = get_range_0_1_result {
                            return Ok(v.meta);
                        }
                    }

                    let out = head_result?;

                    Ok(out)
                }
            })
        })
        .await
    }
}
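
// Illustrative sketch (added for this rendering; not part of the upstream file): a minimal read
// of a byte range through `PolarsObjectStore::get_range`. The function name and the fixed
// `0..1024` range are assumptions for demonstration only; larger ranges are split via
// `split_range` below and downloaded concurrently.
#[allow(dead_code)]
async fn example_read_prefix(store: &PolarsObjectStore, path: &Path) -> PolarsResult<Buffer<u8>> {
    // Fetch the first KiB of the object; an empty range would short-circuit to an empty buffer.
    store.get_range(path, 0..1024).await
}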

/// Splits a single range into multiple smaller ranges, which can be downloaded concurrently for
/// much higher throughput.
fn split_range(range: Range<usize>) -> impl ExactSizeIterator<Item = Range<usize>> {
    let chunk_size = get_download_chunk_size();

    // Calculate n_parts such that we are as close as possible to the `chunk_size`.
    let n_parts = [
        (range.len().div_ceil(chunk_size)).max(1),
        (range.len() / chunk_size).max(1),
    ]
    .into_iter()
    .min_by_key(|x| (range.len() / *x).abs_diff(chunk_size))
    .unwrap();

    let chunk_size = (range.len() / n_parts).max(1);

    assert_eq!(n_parts, (range.len() / chunk_size).max(1));
    let bytes_rem = range.len() % chunk_size;

    (0..n_parts).map(move |part_no| {
        let (start, end) = if part_no == 0 {
            // Download remainder length in the first chunk since it starts downloading first.
            let end = range.start + chunk_size + bytes_rem;
            let end = if end > range.end { range.end } else { end };
            (range.start, end)
        } else {
            let start = bytes_rem + range.start + part_no * chunk_size;
            (start, start + chunk_size)
        };

        start..end
    })
}
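
// Worked example (added for this rendering; not part of the upstream file): with the default
// 64 MiB download chunk size, a 100 MiB range prefers 2 parts over 1 (two ~50 MiB parts are
// closer to 64 MiB than a single 100 MiB part), so `0..104857600` splits into `0..52428800`
// and `52428800..104857600`; any remainder bytes are absorbed by the first part.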

/// Note: For optimal performance, `ranges` should be sorted. More generally, ranges placed next
/// to each other in the slice should also be close to each other in value.
///
/// # Returns
/// `[(range1, end1), (range2, end2)]`, where:
/// * `range1` contains bytes for the ranges from `ranges[0..end1]`
/// * `range2` contains bytes for the ranges from `ranges[end1..end2]`
/// * etc.
///
/// Note that if an end value is 0, the range is a split piece of a larger merged range and should
/// be combined with the pieces that follow it.
fn merge_ranges(ranges: &[Range<usize>]) -> impl Iterator<Item = (Range<usize>, usize)> + '_ {
    let chunk_size = get_download_chunk_size();

    let mut current_merged_range = ranges.first().map_or(0..0, Clone::clone);
    // Number of fetched bytes excluding excess.
    let mut current_n_bytes = current_merged_range.len();

    (0..ranges.len())
        .filter_map(move |current_idx| {
            let current_idx = 1 + current_idx;

            if current_idx == ranges.len() {
                // No more items - flush current state.
                Some((current_merged_range.clone(), current_idx))
            } else {
                let range = ranges[current_idx].clone();

                let new_merged = current_merged_range.start.min(range.start)
                    ..current_merged_range.end.max(range.end);

                // E.g.:
                // |--------|
                // oo           // range1
                //      oo      // range2
                //   ^^^        // distance = 3, is_overlapping = false
                // E.g.:
                // |--------|
                // ooooo        // range1
                //    ooooo     // range2
                //    ^^        // distance = 2, is_overlapping = true
                let (distance, is_overlapping) = {
                    let l = current_merged_range.end.min(range.end);
                    let r = current_merged_range.start.max(range.start);

                    (r.abs_diff(l), r < l)
                };

                let should_merge = is_overlapping || {
                    let leq_current_len_dist_to_chunk_size = new_merged.len().abs_diff(chunk_size)
                        <= current_merged_range.len().abs_diff(chunk_size);
                    let gap_tolerance =
                        (current_n_bytes.max(range.len()) / 8).clamp(1024 * 1024, 8 * 1024 * 1024);

                    leq_current_len_dist_to_chunk_size && distance <= gap_tolerance
                };

                if should_merge {
                    // Merge to existing range
                    current_merged_range = new_merged;
                    current_n_bytes += if is_overlapping {
                        range.len() - distance
                    } else {
                        range.len()
                    };
                    None
                } else {
                    let out = (current_merged_range.clone(), current_idx);
                    current_merged_range = range;
                    current_n_bytes = current_merged_range.len();
                    Some(out)
                }
            }
        })
        .flat_map(|x| {
            // Split large individual ranges within the list of ranges.
            let (range, end) = x;
            let split = split_range(range);
            let len = split.len();

            split
                .enumerate()
                .map(move |(i, range)| (range, if 1 + i == len { end } else { 0 }))
        })
}

#[cfg(test)]
mod tests {

    #[test]
    fn test_split_range() {
        use super::{get_download_chunk_size, split_range};

        let chunk_size = get_download_chunk_size();

        assert_eq!(chunk_size, 64 * 1024 * 1024);

        #[allow(clippy::single_range_in_vec_init)]
        {
            // Round-trip empty ranges.
            assert_eq!(split_range(0..0).collect::<Vec<_>>(), [0..0]);
            assert_eq!(split_range(3..3).collect::<Vec<_>>(), [3..3]);
        }

        // Threshold to start splitting into 2 ranges
        //
        // n - chunk_size == chunk_size - n / 2
        // n + n / 2 == 2 * chunk_size
        // 3 * n == 4 * chunk_size
        // n == 4 * chunk_size / 3
        let n = 4 * chunk_size / 3;

        #[allow(clippy::single_range_in_vec_init)]
        {
            assert_eq!(split_range(0..n).collect::<Vec<_>>(), [0..89478485]);
        }

        assert_eq!(
            split_range(0..n + 1).collect::<Vec<_>>(),
            [0..44739243, 44739243..89478486]
        );

        // Threshold to start splitting into 3 ranges
        //
        // n / 2 - chunk_size == chunk_size - n / 3
        // n / 2 + n / 3 == 2 * chunk_size
        // 5 * n == 12 * chunk_size
        // n == 12 * chunk_size / 5
        let n = 12 * chunk_size / 5;

        assert_eq!(
            split_range(0..n).collect::<Vec<_>>(),
            [0..80530637, 80530637..161061273]
        );

        assert_eq!(
            split_range(0..n + 1).collect::<Vec<_>>(),
            [0..53687092, 53687092..107374183, 107374183..161061274]
        );
    }

    #[test]
    fn test_merge_ranges() {
        use super::{get_download_chunk_size, merge_ranges};

        let chunk_size = get_download_chunk_size();

        assert_eq!(chunk_size, 64 * 1024 * 1024);

        // Round-trip empty slice
        assert_eq!(merge_ranges(&[]).collect::<Vec<_>>(), []);

        // We have 1 tiny request followed by 1 huge request. They are combined because that
        // reduces the `abs_diff()` to the `chunk_size`, but afterwards they are split into 2
        // evenly sized requests.
        assert_eq!(
            merge_ranges(&[0..1, 1..127 * 1024 * 1024]).collect::<Vec<_>>(),
            [(0..66584576, 0), (66584576..133169152, 2)]
        );

        // <= 1MiB gap, merge
        assert_eq!(
            merge_ranges(&[0..1, 1024 * 1024 + 1..1024 * 1024 + 2]).collect::<Vec<_>>(),
            [(0..1048578, 2)]
        );

        // > 1MiB gap, do not merge
        assert_eq!(
            merge_ranges(&[0..1, 1024 * 1024 + 2..1024 * 1024 + 3]).collect::<Vec<_>>(),
            [(0..1, 1), (1048578..1048579, 2)]
        );

        // <= 12.5% gap, merge
        assert_eq!(
            merge_ranges(&[0..8, 10..11]).collect::<Vec<_>>(),
            [(0..11, 2)]
        );

        // <= 12.5% gap relative to RHS, merge
        assert_eq!(
            merge_ranges(&[0..1, 3..11]).collect::<Vec<_>>(),
            [(0..11, 2)]
        );

        // Overlapping range, merge
        assert_eq!(
            merge_ranges(&[0..80 * 1024 * 1024, 10 * 1024 * 1024..70 * 1024 * 1024])
                .collect::<Vec<_>>(),
            [(0..80 * 1024 * 1024, 2)]
        );
    }
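
    // Illustrative addition (not in the upstream test suite): the parts produced by `split_range`
    // tile the input range exactly, with no gaps or overlaps between consecutive parts.
    #[test]
    fn test_split_range_tiles_input() {
        use super::split_range;

        let range = 0..100 * 1024 * 1024;
        let parts = split_range(range.clone()).collect::<Vec<_>>();

        assert_eq!(parts.first().unwrap().start, range.start);
        assert_eq!(parts.last().unwrap().end, range.end);

        // Each part starts exactly where the previous one ended.
        for pair in parts.windows(2) {
            assert_eq!(pair[0].end, pair[1].start);
        }
    }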
}