// Path: blob/main/crates/polars-io/src/utils/byte_source.rs
// (scraped page metadata: 6939 views)
use std::ops::Range;1use std::sync::Arc;23use polars_core::prelude::PlHashMap;4use polars_error::PolarsResult;5use polars_utils::_limit_path_len_io_err;6use polars_utils::mmap::MemSlice;78use crate::cloud::{9CloudLocation, CloudOptions, ObjectStorePath, PolarsObjectStore, build_object_store,10object_path_from_str,11};1213#[allow(async_fn_in_trait)]14pub trait ByteSource: Send + Sync {15async fn get_size(&self) -> PolarsResult<usize>;16/// # Panics17/// Panics if `range` is not in bounds.18async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice>;19/// Note: This will mutably sort ranges for coalescing.20async fn get_ranges(21&self,22ranges: &mut [Range<usize>],23) -> PolarsResult<PlHashMap<usize, MemSlice>>;24}2526/// Byte source backed by a `MemSlice`, which can potentially be memory-mapped.27pub struct MemSliceByteSource(pub MemSlice);2829impl MemSliceByteSource {30async fn try_new_mmap_from_path(31path: &str,32_cloud_options: Option<&CloudOptions>,33) -> PolarsResult<Self> {34let file = Arc::new(35tokio::fs::File::open(path)36.await37.map_err(|err| _limit_path_len_io_err(path.as_ref(), err))?38.into_std()39.await,40);4142Ok(Self(MemSlice::from_file(file.as_ref())?))43}44}4546impl ByteSource for MemSliceByteSource {47async fn get_size(&self) -> PolarsResult<usize> {48Ok(self.0.as_ref().len())49}5051async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {52let out = self.0.slice(range);53Ok(out)54}5556async fn get_ranges(57&self,58ranges: &mut [Range<usize>],59) -> PolarsResult<PlHashMap<usize, MemSlice>> {60Ok(ranges61.iter()62.map(|x| (x.start, self.0.slice(x.clone())))63.collect())64}65}6667pub struct ObjectStoreByteSource {68store: PolarsObjectStore,69path: ObjectStorePath,70}7172impl ObjectStoreByteSource {73async fn try_new_from_path(74path: &str,75cloud_options: Option<&CloudOptions>,76) -> PolarsResult<Self> {77let (CloudLocation { prefix, .. 
}, store) =78build_object_store(path, cloud_options, false).await?;79let path = object_path_from_str(&prefix)?;8081Ok(Self { store, path })82}83}8485impl ByteSource for ObjectStoreByteSource {86async fn get_size(&self) -> PolarsResult<usize> {87Ok(self.store.head(&self.path).await?.size as usize)88}8990async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {91let bytes = self.store.get_range(&self.path, range).await?;92let mem_slice = MemSlice::from_bytes(bytes);9394Ok(mem_slice)95}9697async fn get_ranges(98&self,99ranges: &mut [Range<usize>],100) -> PolarsResult<PlHashMap<usize, MemSlice>> {101self.store.get_ranges_sort(&self.path, ranges).await102}103}104105/// Dynamic dispatch to async functions.106pub enum DynByteSource {107MemSlice(MemSliceByteSource),108Cloud(ObjectStoreByteSource),109}110111impl DynByteSource {112pub fn variant_name(&self) -> &str {113match self {114Self::MemSlice(_) => "MemSlice",115Self::Cloud(_) => "Cloud",116}117}118}119120impl Default for DynByteSource {121fn default() -> Self {122Self::MemSlice(MemSliceByteSource(MemSlice::default()))123}124}125126impl ByteSource for DynByteSource {127async fn get_size(&self) -> PolarsResult<usize> {128match self {129Self::MemSlice(v) => v.get_size().await,130Self::Cloud(v) => v.get_size().await,131}132}133134async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {135match self {136Self::MemSlice(v) => v.get_range(range).await,137Self::Cloud(v) => v.get_range(range).await,138}139}140141async fn get_ranges(142&self,143ranges: &mut [Range<usize>],144) -> PolarsResult<PlHashMap<usize, MemSlice>> {145match self {146Self::MemSlice(v) => v.get_ranges(ranges).await,147Self::Cloud(v) => v.get_ranges(ranges).await,148}149}150}151152impl From<MemSliceByteSource> for DynByteSource {153fn from(value: MemSliceByteSource) -> Self {154Self::MemSlice(value)155}156}157158impl From<ObjectStoreByteSource> for DynByteSource {159fn from(value: ObjectStoreByteSource) -> Self 
{160Self::Cloud(value)161}162}163164impl From<MemSlice> for DynByteSource {165fn from(value: MemSlice) -> Self {166Self::MemSlice(MemSliceByteSource(value))167}168}169170#[derive(Clone, Debug)]171pub enum DynByteSourceBuilder {172Mmap,173/// Supports both cloud and local files.174ObjectStore,175}176177impl DynByteSourceBuilder {178pub async fn try_build_from_path(179&self,180path: &str,181cloud_options: Option<&CloudOptions>,182) -> PolarsResult<DynByteSource> {183Ok(match self {184Self::Mmap => MemSliceByteSource::try_new_mmap_from_path(path, cloud_options)185.await?186.into(),187Self::ObjectStore => ObjectStoreByteSource::try_new_from_path(path, cloud_options)188.await?189.into(),190})191}192}193194195