// Path: blob/main/crates/polars-io/src/utils/byte_source.rs
// 8412 views
use std::ops::Range;1use std::path::Path;2use std::sync::Arc;34use polars_buffer::Buffer;5use polars_core::prelude::PlHashMap;6use polars_error::{PolarsResult, feature_gated};7use polars_utils::_limit_path_len_io_err;8use polars_utils::mmap::MMapSemaphore;9use polars_utils::pl_path::PlRefPath;1011use crate::cloud::options::CloudOptions;12#[cfg(feature = "cloud")]13use crate::cloud::{14CloudLocation, ObjectStorePath, PolarsObjectStore, build_object_store, object_path_from_str,15};16use crate::metrics::IOMetrics;1718#[allow(async_fn_in_trait)]19pub trait ByteSource: Send + Sync {20async fn get_size(&self) -> PolarsResult<usize>;21/// # Panics22/// Panics if `range` is not in bounds.23async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>>;24/// Note: This will mutably sort ranges for coalescing.25async fn get_ranges(26&self,27ranges: &mut [Range<usize>],28) -> PolarsResult<PlHashMap<usize, Buffer<u8>>>;29}3031/// Byte source backed by a `Buffer`, which can potentially be memory-mapped.32pub struct BufferByteSource(pub Buffer<u8>);3334impl BufferByteSource {35async fn try_new_mmap_from_path(36path: &Path,37_cloud_options: Option<&CloudOptions>,38) -> PolarsResult<Self> {39let file = Arc::new(40tokio::fs::File::open(path)41.await42.map_err(|err| _limit_path_len_io_err(path, err))?43.into_std()44.await,45);4647Ok(Self(Buffer::from_owner(MMapSemaphore::new_from_file(48&file,49)?)))50}51}5253impl ByteSource for BufferByteSource {54async fn get_size(&self) -> PolarsResult<usize> {55Ok(self.0.as_ref().len())56}5758async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {59let out = self.0.clone().sliced(range);60Ok(out)61}6263async fn get_ranges(64&self,65ranges: &mut [Range<usize>],66) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {67Ok(ranges68.iter()69.map(|x| (x.start, self.0.clone().sliced(x.clone())))70.collect())71}72}7374#[cfg(feature = "cloud")]75pub struct ObjectStoreByteSource {76store: PolarsObjectStore,77path: 
ObjectStorePath,78}7980#[cfg(feature = "cloud")]81impl ObjectStoreByteSource {82async fn try_new_from_path(83path: PlRefPath,84cloud_options: Option<&CloudOptions>,85io_metrics: Option<Arc<IOMetrics>>,86) -> PolarsResult<Self> {87let (CloudLocation { prefix, .. }, mut store) =88build_object_store(path, cloud_options, false).await?;89let path = object_path_from_str(&prefix)?;9091store.set_io_metrics(io_metrics);9293Ok(Self { store, path })94}95}9697#[cfg(feature = "cloud")]98impl ByteSource for ObjectStoreByteSource {99async fn get_size(&self) -> PolarsResult<usize> {100Ok(self.store.head(&self.path).await?.size as usize)101}102103async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {104self.store.get_range(&self.path, range).await105}106107async fn get_ranges(108&self,109ranges: &mut [Range<usize>],110) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {111self.store.get_ranges_sort(&self.path, ranges).await112}113}114115/// Dynamic dispatch to async functions.116pub enum DynByteSource {117Buffer(BufferByteSource),118#[cfg(feature = "cloud")]119Cloud(ObjectStoreByteSource),120}121122impl DynByteSource {123pub fn variant_name(&self) -> &str {124match self {125Self::Buffer(_) => "Buffer",126#[cfg(feature = "cloud")]127Self::Cloud(_) => "Cloud",128}129}130}131132impl Default for DynByteSource {133fn default() -> Self {134Self::Buffer(BufferByteSource(Buffer::new()))135}136}137138impl ByteSource for DynByteSource {139async fn get_size(&self) -> PolarsResult<usize> {140match self {141Self::Buffer(v) => v.get_size().await,142#[cfg(feature = "cloud")]143Self::Cloud(v) => v.get_size().await,144}145}146147async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {148match self {149Self::Buffer(v) => v.get_range(range).await,150#[cfg(feature = "cloud")]151Self::Cloud(v) => v.get_range(range).await,152}153}154155async fn get_ranges(156&self,157ranges: &mut [Range<usize>],158) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {159match self 
{160Self::Buffer(v) => v.get_ranges(ranges).await,161#[cfg(feature = "cloud")]162Self::Cloud(v) => v.get_ranges(ranges).await,163}164}165}166167impl From<BufferByteSource> for DynByteSource {168fn from(value: BufferByteSource) -> Self {169Self::Buffer(value)170}171}172173#[cfg(feature = "cloud")]174impl From<ObjectStoreByteSource> for DynByteSource {175fn from(value: ObjectStoreByteSource) -> Self {176Self::Cloud(value)177}178}179180impl From<Buffer<u8>> for DynByteSource {181fn from(value: Buffer<u8>) -> Self {182Self::Buffer(BufferByteSource(value))183}184}185186#[derive(Clone, Debug)]187pub enum DynByteSourceBuilder {188Mmap,189/// Supports both cloud and local files, requires cloud feature.190ObjectStore,191}192193impl DynByteSourceBuilder {194pub async fn try_build_from_path(195&self,196path: PlRefPath,197cloud_options: Option<&CloudOptions>,198io_metrics: Option<Arc<IOMetrics>>,199) -> PolarsResult<DynByteSource> {200Ok(match self {201Self::Mmap => {202BufferByteSource::try_new_mmap_from_path(path.as_std_path(), cloud_options)203.await?204.into()205},206Self::ObjectStore => feature_gated!("cloud", {207ObjectStoreByteSource::try_new_from_path(path, cloud_options, io_metrics)208.await?209.into()210}),211})212}213}214215216