Path: blob/main/crates/polars-io/src/cloud/polars_object_store.rs
8433 views
use std::fmt::Display;1use std::ops::Range;2use std::sync::Arc;34use futures::{Stream, StreamExt as _, TryStreamExt as _};5use hashbrown::hash_map::RawEntryMut;6use object_store::path::Path;7use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt};8use polars_buffer::Buffer;9use polars_core::prelude::{InitHashMaps, PlHashMap};10use polars_error::{PolarsError, PolarsResult};11use polars_utils::pl_path::PlRefPath;12use tokio::io::AsyncWriteExt;1314use crate::metrics::HEAD_RESPONSE_SIZE_ESTIMATE;15use crate::pl_async::{16self, MAX_BUDGET_PER_REQUEST, get_concurrency_limit, get_download_chunk_size,17tune_with_concurrency_budget, with_concurrency_budget,18};1920#[derive(Debug)]21pub struct PolarsObjectStoreError {22pub base_url: PlRefPath,23pub source: object_store::Error,24}2526impl PolarsObjectStoreError {27pub fn from_url(base_url: &PlRefPath) -> impl FnOnce(object_store::Error) -> Self {28|error| Self {29base_url: base_url.clone(),30source: error,31}32}33}3435impl Display for PolarsObjectStoreError {36fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {37write!(38f,39"object-store error: {} (path: {})",40self.source, &self.base_url41)42}43}4445impl std::error::Error for PolarsObjectStoreError {46fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {47Some(&self.source)48}49}5051impl From<PolarsObjectStoreError> for std::io::Error {52fn from(value: PolarsObjectStoreError) -> Self {53std::io::Error::other(value)54}55}5657impl From<PolarsObjectStoreError> for PolarsError {58fn from(value: PolarsObjectStoreError) -> Self {59PolarsError::IO {60error: Arc::new(value.into()),61msg: None,62}63}64}6566mod inner {6768use std::borrow::Cow;69use std::future::Future;70use std::sync::Arc;7172use object_store::ObjectStore;73use polars_core::config;74use polars_error::{PolarsError, PolarsResult};75use polars_utils::relaxed_cell::RelaxedCell;7677use crate::cloud::{ObjectStoreErrorContext, PolarsObjectStoreBuilder};78use crate::metrics::{IOMetrics, OptIOMetrics};7980#[derive(Debug)]81struct Inner {82store: tokio::sync::RwLock<Arc<dyn ObjectStore>>,83builder: PolarsObjectStoreBuilder,84}8586/// Polars wrapper around [`ObjectStore`] functionality. This struct is cheaply cloneable.87#[derive(Clone, Debug)]88pub struct PolarsObjectStore {89inner: Arc<Inner>,90/// Avoid contending the Mutex `lock()` until the first re-build.91initial_store: std::sync::Arc<dyn ObjectStore>,92/// Used for interior mutability. Doesn't need to be shared with other threads so it's not93/// inside `Arc<>`.94rebuilt: RelaxedCell<bool>,95io_metrics: OptIOMetrics,96}9798impl PolarsObjectStore {99pub(crate) fn new_from_inner(100store: Arc<dyn ObjectStore>,101builder: PolarsObjectStoreBuilder,102) -> Self {103let initial_store = store.clone();104Self {105inner: Arc::new(Inner {106store: tokio::sync::RwLock::new(store),107builder,108}),109initial_store,110rebuilt: RelaxedCell::from(false),111io_metrics: OptIOMetrics(None),112}113}114115pub fn set_io_metrics(&mut self, io_metrics: Option<Arc<IOMetrics>>) -> &mut Self {116self.io_metrics = OptIOMetrics(io_metrics);117self118}119120pub fn io_metrics(&self) -> &OptIOMetrics {121&self.io_metrics122}123124/// Gets the underlying [`ObjectStore`] implementation.125async fn to_dyn_object_store(&self) -> Cow<'_, Arc<dyn ObjectStore>> {126if !self.rebuilt.load() {127Cow::Borrowed(&self.initial_store)128} else {129Cow::Owned(self.inner.store.read().await.clone())130}131}132133pub async fn rebuild_inner(134&self,135from_version: &Arc<dyn ObjectStore>,136) -> PolarsResult<Arc<dyn ObjectStore>> {137let mut current_store = self.inner.store.write().await;138139// If this does not eq, then `inner` was already re-built by another thread.140if Arc::ptr_eq(&*current_store, from_version) {141*current_store =142self.inner143.builder144.clone()145.build_impl(true)146.await147.map_err(|e| {148e.wrap_msg(|e| format!("attempt to rebuild object store failed: {e}"))149})?;150}151152self.rebuilt.store(true);153154Ok((*current_store).clone())155}156157pub async fn exec_with_rebuild_retry_on_err<'s, 'f, Fn, Fut, O>(158&'s self,159mut func: Fn,160) -> PolarsResult<O>161where162Fn: FnMut(Cow<'s, Arc<dyn ObjectStore>>) -> Fut + 'f,163Fut: Future<Output = object_store::Result<O>>,164{165let store = self.to_dyn_object_store().await;166167let out = func(store.clone()).await;168169let orig_err = match out {170Ok(v) => return Ok(v),171Err(e) => e,172};173174if config::verbose() {175eprintln!(176"[PolarsObjectStore]: got error: {}, will rebuild store and retry",177&orig_err178);179}180181let store = self182.rebuild_inner(&store)183.await184.map_err(|e| e.wrap_msg(|e| format!("{e}; original error: {orig_err}")))?;185186func(Cow::Owned(store)).await.map_err(|e| {187let e: PolarsError = self.error_context().attach_err_info(e).into();188189if self.inner.builder.is_azure()190&& std::env::var("POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY").as_deref()191!= Ok("1")192{193// Note: This error is intended for Python audiences. The logic for retrieving194// these keys exist only on the Python side.195e.wrap_msg(|e| {196format!(197"{e}; note: if you are using Python, consider setting \198POLARS_AUTO_USE_AZURE_STORAGE_ACCOUNT_KEY=1 if you would like polars to try to retrieve \199and use the storage account keys from Azure CLI to authenticate"200)201})202} else {203e204}205})206}207208pub fn error_context(&self) -> ObjectStoreErrorContext {209ObjectStoreErrorContext::new(self.inner.builder.path().clone())210}211}212}213214#[derive(Clone)]215pub struct ObjectStoreErrorContext {216path: PlRefPath,217}218219impl ObjectStoreErrorContext {220pub fn new(path: PlRefPath) -> Self {221Self { path }222}223224pub fn attach_err_info(self, err: object_store::Error) -> PolarsObjectStoreError {225let ObjectStoreErrorContext { path } = self;226227PolarsObjectStoreError {228base_url: path,229source: err,230}231}232}233234pub use inner::PolarsObjectStore;235236pub type ObjectStorePath = object_store::path::Path;237238impl PolarsObjectStore {239pub fn build_buffered_ranges_stream<'a, T: Iterator<Item = Range<usize>>>(240&'a self,241path: &'a Path,242ranges: T,243) -> impl Stream<Item = PolarsResult<Buffer<u8>>> + use<'a, T> {244futures::stream::iter(ranges.map(move |range| async move {245if range.is_empty() {246return Ok(Buffer::new());247}248249let out = self250.io_metrics()251.record_io_read(252range.len() as u64,253self.exec_with_rebuild_retry_on_err(|s| async move {254s.get_range(path, range.start as u64..range.end as u64)255.await256}),257)258.await?;259260Ok(Buffer::from_owner(out))261}))262// Add a limit locally as this gets run inside a single `tune_with_concurrency_budget`.263.buffered(get_concurrency_limit() as usize)264}265266pub async fn get_range(&self, path: &Path, range: Range<usize>) -> PolarsResult<Buffer<u8>> {267if range.is_empty() {268return Ok(Buffer::new());269}270271let parts = split_range(range.clone());272273if parts.len() == 1 {274let out = tune_with_concurrency_budget(1, move || async move {275let bytes = self276.io_metrics()277.record_io_read(278range.len() as u64,279self.exec_with_rebuild_retry_on_err(|s| async move {280s.get_range(path, range.start as u64..range.end as u64)281.await282}),283)284.await?;285286PolarsResult::Ok(Buffer::from_owner(bytes))287})288.await?;289290Ok(out)291} else {292let parts = tune_with_concurrency_budget(293parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,294|| {295self.build_buffered_ranges_stream(path, parts)296.try_collect::<Vec<Buffer<u8>>>()297},298)299.await?;300301let mut combined = Vec::with_capacity(range.len());302303for part in parts {304combined.extend_from_slice(&part)305}306307assert_eq!(combined.len(), range.len());308309PolarsResult::Ok(Buffer::from_vec(combined))310}311}312313/// Fetch byte ranges into a HashMap keyed by the range start. This will mutably sort the314/// `ranges` slice for coalescing.315///316/// # Panics317/// Panics if the same range start is used by more than 1 range.318pub async fn get_ranges_sort(319&self,320path: &Path,321ranges: &mut [Range<usize>],322) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {323if ranges.is_empty() {324return Ok(Default::default());325}326327ranges.sort_unstable_by_key(|x| x.start);328329let ranges_len = ranges.len();330let (merged_ranges, merged_ends): (Vec<_>, Vec<_>) = merge_ranges(ranges).unzip();331332let mut out = PlHashMap::with_capacity(ranges_len);333334let mut stream = self.build_buffered_ranges_stream(path, merged_ranges.iter().cloned());335336tune_with_concurrency_budget(337merged_ranges.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,338|| async {339let mut len = 0;340let mut current_offset = 0;341let mut ends_iter = merged_ends.iter();342343let mut splitted_parts = vec![];344345while let Some(bytes) = stream.try_next().await? {346len += bytes.len();347let end = *ends_iter.next().unwrap();348349if end == 0 {350splitted_parts.push(bytes);351continue;352}353354let full_range = ranges[current_offset..end]355.iter()356.cloned()357.reduce(|l, r| l.start.min(r.start)..l.end.max(r.end))358.unwrap();359360let bytes = if splitted_parts.is_empty() {361bytes362} else {363let mut out = Vec::with_capacity(full_range.len());364365for x in splitted_parts.drain(..) {366out.extend_from_slice(&x);367}368369out.extend_from_slice(&bytes);370Buffer::from(out)371};372373assert_eq!(bytes.len(), full_range.len());374375for range in &ranges[current_offset..end] {376let slice = bytes377.clone()378.sliced(range.start - full_range.start..range.end - full_range.start);379380match out.raw_entry_mut().from_key(&range.start) {381RawEntryMut::Vacant(slot) => {382slot.insert(range.start, slice);383},384RawEntryMut::Occupied(mut slot) => {385if slot.get_mut().len() < slice.len() {386*slot.get_mut() = slice;387}388},389}390}391392current_offset = end;393}394395assert!(splitted_parts.is_empty());396397PolarsResult::Ok(pl_async::Size::from(len as u64))398},399)400.await?;401402Ok(out)403}404405pub async fn download(&self, path: &Path, file: &mut tokio::fs::File) -> PolarsResult<()> {406let size = self.head(path).await?.size;407let parts = split_range(0..size as usize);408409tune_with_concurrency_budget(410parts.len().clamp(0, MAX_BUDGET_PER_REQUEST) as u32,411|| async {412let mut stream = self.build_buffered_ranges_stream(path, parts);413let mut len = 0;414while let Some(bytes) = stream.try_next().await? {415len += bytes.len();416file.write_all(&bytes).await?;417}418419assert_eq!(len, size as usize);420421PolarsResult::Ok(pl_async::Size::from(len as u64))422},423)424.await?;425426// Dropping is delayed for tokio async files so we need to explicitly427// flush here (https://github.com/tokio-rs/tokio/issues/2307#issuecomment-596336451).428file.sync_all().await.map_err(PolarsError::from)?;429430Ok(())431}432433/// Fetch the metadata of the parquet file, do not memoize it.434pub async fn head(&self, path: &Path) -> PolarsResult<ObjectMeta> {435with_concurrency_budget(1, || {436self.exec_with_rebuild_retry_on_err(|s| {437async move {438let head_result = self439.io_metrics()440.record_io_read(HEAD_RESPONSE_SIZE_ESTIMATE, s.head(path))441.await;442443if head_result.is_err() {444// Pre-signed URLs forbid the HEAD method, but we can still retrieve the header445// information with a range 0-1 request.446let get_range_0_1_result = self447.io_metrics()448.record_io_read(449HEAD_RESPONSE_SIZE_ESTIMATE + 1,450s.get_opts(451path,452object_store::GetOptions {453range: Some((0..1).into()),454..Default::default()455},456),457)458.await;459460if let Ok(v) = get_range_0_1_result {461return Ok(v.meta);462}463}464465let out = head_result?;466467Ok(out)468}469})470})471.await472}473}474475/// Splits a single range into multiple smaller ranges, which can be downloaded concurrently for476/// much higher throughput.477fn split_range(range: Range<usize>) -> impl ExactSizeIterator<Item = Range<usize>> {478let chunk_size = get_download_chunk_size();479480// Calculate n_parts such that we are as close as possible to the `chunk_size`.481let n_parts = [482(range.len().div_ceil(chunk_size)).max(1),483(range.len() / chunk_size).max(1),484]485.into_iter()486.min_by_key(|x| (range.len() / *x).abs_diff(chunk_size))487.unwrap();488489let chunk_size = (range.len() / n_parts).max(1);490491assert_eq!(n_parts, (range.len() / chunk_size).max(1));492let bytes_rem = range.len() % chunk_size;493494(0..n_parts).map(move |part_no| {495let (start, end) = if part_no == 0 {496// Download remainder length in the first chunk since it starts downloading first.497let end = range.start + chunk_size + bytes_rem;498let end = if end > range.end { range.end } else { end };499(range.start, end)500} else {501let start = bytes_rem + range.start + part_no * chunk_size;502(start, start + chunk_size)503};504505start..end506})507}508509/// Note: For optimal performance, `ranges` should be sorted. More generally,510/// ranges placed next to each other should also be close in range value.511///512/// # Returns513/// `[(range1, end1), (range2, end2)]`, where:514/// * `range1` contains bytes for the ranges from `ranges[0..end1]`515/// * `range2` contains bytes for the ranges from `ranges[end1..end2]`516/// * etc..517///518/// Note that if an end value is 0, it means the range is a splitted part and should be combined.519fn merge_ranges(ranges: &[Range<usize>]) -> impl Iterator<Item = (Range<usize>, usize)> + '_ {520let chunk_size = get_download_chunk_size();521522let mut current_merged_range = ranges.first().map_or(0..0, Clone::clone);523// Number of fetched bytes excluding excess.524let mut current_n_bytes = current_merged_range.len();525526(0..ranges.len())527.filter_map(move |current_idx| {528let current_idx = 1 + current_idx;529530if current_idx == ranges.len() {531// No more items - flush current state.532Some((current_merged_range.clone(), current_idx))533} else {534let range = ranges[current_idx].clone();535536let new_merged = current_merged_range.start.min(range.start)537..current_merged_range.end.max(range.end);538539// E.g.:540// |--------|541// oo // range1542// oo // range2543// ^^^ // distance = 3, is_overlapping = false544// E.g.:545// |--------|546// ooooo // range1547// ooooo // range2548// ^^ // distance = 2, is_overlapping = true549let (distance, is_overlapping) = {550let l = current_merged_range.end.min(range.end);551let r = current_merged_range.start.max(range.start);552553(r.abs_diff(l), r < l)554};555556let should_merge = is_overlapping || {557let leq_current_len_dist_to_chunk_size = new_merged.len().abs_diff(chunk_size)558<= current_merged_range.len().abs_diff(chunk_size);559let gap_tolerance =560(current_n_bytes.max(range.len()) / 8).clamp(1024 * 1024, 8 * 1024 * 1024);561562leq_current_len_dist_to_chunk_size && distance <= gap_tolerance563};564565if should_merge {566// Merge to existing range567current_merged_range = new_merged;568current_n_bytes += if is_overlapping {569range.len() - distance570} else {571range.len()572};573None574} else {575let out = (current_merged_range.clone(), current_idx);576current_merged_range = range;577current_n_bytes = current_merged_range.len();578Some(out)579}580}581})582.flat_map(|x| {583// Split large individual ranges within the list of ranges.584let (range, end) = x;585let split = split_range(range);586let len = split.len();587588split589.enumerate()590.map(move |(i, range)| (range, if 1 + i == len { end } else { 0 }))591})592}593594#[cfg(test)]595mod tests {596597#[test]598fn test_split_range() {599use super::{get_download_chunk_size, split_range};600601let chunk_size = get_download_chunk_size();602603assert_eq!(chunk_size, 64 * 1024 * 1024);604605#[allow(clippy::single_range_in_vec_init)]606{607// Round-trip empty ranges.608assert_eq!(split_range(0..0).collect::<Vec<_>>(), [0..0]);609assert_eq!(split_range(3..3).collect::<Vec<_>>(), [3..3]);610}611612// Threshold to start splitting to 2 ranges613//614// n - chunk_size == chunk_size - n / 2615// n + n / 2 == 2 * chunk_size616// 3 * n == 4 * chunk_size617// n = 4 * chunk_size / 3618let n = 4 * chunk_size / 3;619620#[allow(clippy::single_range_in_vec_init)]621{622assert_eq!(split_range(0..n).collect::<Vec<_>>(), [0..89478485]);623}624625assert_eq!(626split_range(0..n + 1).collect::<Vec<_>>(),627[0..44739243, 44739243..89478486]628);629630// Threshold to start splitting to 3 ranges631//632// n / 2 - chunk_size == chunk_size - n / 3633// n / 2 + n / 3 == 2 * chunk_size634// 5 * n == 12 * chunk_size635// n == 12 * chunk_size / 5636let n = 12 * chunk_size / 5;637638assert_eq!(639split_range(0..n).collect::<Vec<_>>(),640[0..80530637, 80530637..161061273]641);642643assert_eq!(644split_range(0..n + 1).collect::<Vec<_>>(),645[0..53687092, 53687092..107374183, 107374183..161061274]646);647}648649#[test]650fn test_merge_ranges() {651use super::{get_download_chunk_size, merge_ranges};652653let chunk_size = get_download_chunk_size();654655assert_eq!(chunk_size, 64 * 1024 * 1024);656657// Round-trip empty slice658assert_eq!(merge_ranges(&[]).collect::<Vec<_>>(), []);659660// We have 1 tiny request followed by 1 huge request. They are combined as it reduces the661// `abs_diff()` to the `chunk_size`, but afterwards they are split to 2 evenly sized662// requests.663assert_eq!(664merge_ranges(&[0..1, 1..127 * 1024 * 1024]).collect::<Vec<_>>(),665[(0..66584576, 0), (66584576..133169152, 2)]666);667668// <= 1MiB gap, merge669assert_eq!(670merge_ranges(&[0..1, 1024 * 1024 + 1..1024 * 1024 + 2]).collect::<Vec<_>>(),671[(0..1048578, 2)]672);673674// > 1MiB gap, do not merge675assert_eq!(676merge_ranges(&[0..1, 1024 * 1024 + 2..1024 * 1024 + 3]).collect::<Vec<_>>(),677[(0..1, 1), (1048578..1048579, 2)]678);679680// <= 12.5% gap, merge681assert_eq!(682merge_ranges(&[0..8, 10..11]).collect::<Vec<_>>(),683[(0..11, 2)]684);685686// <= 12.5% gap relative to RHS, merge687assert_eq!(688merge_ranges(&[0..1, 3..11]).collect::<Vec<_>>(),689[(0..11, 2)]690);691692// Overlapping range, merge693assert_eq!(694merge_ranges(&[0..80 * 1024 * 1024, 10 * 1024 * 1024..70 * 1024 * 1024])695.collect::<Vec<_>>(),696[(0..80 * 1024 * 1024, 2)]697);698}699}700701702