Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/utils/byte_source.rs
6939 views
1
use std::ops::Range;
2
use std::sync::Arc;
3
4
use polars_core::prelude::PlHashMap;
5
use polars_error::PolarsResult;
6
use polars_utils::_limit_path_len_io_err;
7
use polars_utils::mmap::MemSlice;
8
9
use crate::cloud::{
10
CloudLocation, CloudOptions, ObjectStorePath, PolarsObjectStore, build_object_store,
11
object_path_from_str,
12
};
13
14
#[allow(async_fn_in_trait)]
15
pub trait ByteSource: Send + Sync {
16
async fn get_size(&self) -> PolarsResult<usize>;
17
/// # Panics
18
/// Panics if `range` is not in bounds.
19
async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice>;
20
/// Note: This will mutably sort ranges for coalescing.
21
async fn get_ranges(
22
&self,
23
ranges: &mut [Range<usize>],
24
) -> PolarsResult<PlHashMap<usize, MemSlice>>;
25
}
26
27
/// Byte source backed by a `MemSlice`, which can potentially be memory-mapped.
28
pub struct MemSliceByteSource(pub MemSlice);
29
30
impl MemSliceByteSource {
31
async fn try_new_mmap_from_path(
32
path: &str,
33
_cloud_options: Option<&CloudOptions>,
34
) -> PolarsResult<Self> {
35
let file = Arc::new(
36
tokio::fs::File::open(path)
37
.await
38
.map_err(|err| _limit_path_len_io_err(path.as_ref(), err))?
39
.into_std()
40
.await,
41
);
42
43
Ok(Self(MemSlice::from_file(file.as_ref())?))
44
}
45
}
46
47
impl ByteSource for MemSliceByteSource {
48
async fn get_size(&self) -> PolarsResult<usize> {
49
Ok(self.0.as_ref().len())
50
}
51
52
async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
53
let out = self.0.slice(range);
54
Ok(out)
55
}
56
57
async fn get_ranges(
58
&self,
59
ranges: &mut [Range<usize>],
60
) -> PolarsResult<PlHashMap<usize, MemSlice>> {
61
Ok(ranges
62
.iter()
63
.map(|x| (x.start, self.0.slice(x.clone())))
64
.collect())
65
}
66
}
67
68
pub struct ObjectStoreByteSource {
69
store: PolarsObjectStore,
70
path: ObjectStorePath,
71
}
72
73
impl ObjectStoreByteSource {
74
async fn try_new_from_path(
75
path: &str,
76
cloud_options: Option<&CloudOptions>,
77
) -> PolarsResult<Self> {
78
let (CloudLocation { prefix, .. }, store) =
79
build_object_store(path, cloud_options, false).await?;
80
let path = object_path_from_str(&prefix)?;
81
82
Ok(Self { store, path })
83
}
84
}
85
86
impl ByteSource for ObjectStoreByteSource {
87
async fn get_size(&self) -> PolarsResult<usize> {
88
Ok(self.store.head(&self.path).await?.size as usize)
89
}
90
91
async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
92
let bytes = self.store.get_range(&self.path, range).await?;
93
let mem_slice = MemSlice::from_bytes(bytes);
94
95
Ok(mem_slice)
96
}
97
98
async fn get_ranges(
99
&self,
100
ranges: &mut [Range<usize>],
101
) -> PolarsResult<PlHashMap<usize, MemSlice>> {
102
self.store.get_ranges_sort(&self.path, ranges).await
103
}
104
}
105
106
/// Dynamic dispatch to async functions.
107
pub enum DynByteSource {
108
MemSlice(MemSliceByteSource),
109
Cloud(ObjectStoreByteSource),
110
}
111
112
impl DynByteSource {
113
pub fn variant_name(&self) -> &str {
114
match self {
115
Self::MemSlice(_) => "MemSlice",
116
Self::Cloud(_) => "Cloud",
117
}
118
}
119
}
120
121
impl Default for DynByteSource {
122
fn default() -> Self {
123
Self::MemSlice(MemSliceByteSource(MemSlice::default()))
124
}
125
}
126
127
impl ByteSource for DynByteSource {
128
async fn get_size(&self) -> PolarsResult<usize> {
129
match self {
130
Self::MemSlice(v) => v.get_size().await,
131
Self::Cloud(v) => v.get_size().await,
132
}
133
}
134
135
async fn get_range(&self, range: Range<usize>) -> PolarsResult<MemSlice> {
136
match self {
137
Self::MemSlice(v) => v.get_range(range).await,
138
Self::Cloud(v) => v.get_range(range).await,
139
}
140
}
141
142
async fn get_ranges(
143
&self,
144
ranges: &mut [Range<usize>],
145
) -> PolarsResult<PlHashMap<usize, MemSlice>> {
146
match self {
147
Self::MemSlice(v) => v.get_ranges(ranges).await,
148
Self::Cloud(v) => v.get_ranges(ranges).await,
149
}
150
}
151
}
152
153
impl From<MemSliceByteSource> for DynByteSource {
    /// Wraps `value` in the `MemSlice` variant.
    fn from(value: MemSliceByteSource) -> Self {
        Self::MemSlice(value)
    }
}
158
159
impl From<ObjectStoreByteSource> for DynByteSource {
    /// Wraps `value` in the `Cloud` variant.
    fn from(value: ObjectStoreByteSource) -> Self {
        Self::Cloud(value)
    }
}
164
165
impl From<MemSlice> for DynByteSource {
166
fn from(value: MemSlice) -> Self {
167
Self::MemSlice(MemSliceByteSource(value))
168
}
169
}
170
171
/// Selects which kind of byte source is built for a path.
#[derive(Clone, Debug)]
pub enum DynByteSourceBuilder {
    /// Build an in-memory source from a file on disk.
    Mmap,
    /// Supports both cloud and local files.
    ObjectStore,
}
177
178
impl DynByteSourceBuilder {
179
pub async fn try_build_from_path(
180
&self,
181
path: &str,
182
cloud_options: Option<&CloudOptions>,
183
) -> PolarsResult<DynByteSource> {
184
Ok(match self {
185
Self::Mmap => MemSliceByteSource::try_new_mmap_from_path(path, cloud_options)
186
.await?
187
.into(),
188
Self::ObjectStore => ObjectStoreByteSource::try_new_from_path(path, cloud_options)
189
.await?
190
.into(),
191
})
192
}
193
}
194
195