Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-io/src/utils/byte_source.rs
8412 views
1
use std::ops::Range;
2
use std::path::Path;
3
use std::sync::Arc;
4
5
use polars_buffer::Buffer;
6
use polars_core::prelude::PlHashMap;
7
use polars_error::{PolarsResult, feature_gated};
8
use polars_utils::_limit_path_len_io_err;
9
use polars_utils::mmap::MMapSemaphore;
10
use polars_utils::pl_path::PlRefPath;
11
12
use crate::cloud::options::CloudOptions;
13
#[cfg(feature = "cloud")]
14
use crate::cloud::{
15
CloudLocation, ObjectStorePath, PolarsObjectStore, build_object_store, object_path_from_str,
16
};
17
use crate::metrics::IOMetrics;
18
19
#[allow(async_fn_in_trait)]
20
pub trait ByteSource: Send + Sync {
21
async fn get_size(&self) -> PolarsResult<usize>;
22
/// # Panics
23
/// Panics if `range` is not in bounds.
24
async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>>;
25
/// Note: This will mutably sort ranges for coalescing.
26
async fn get_ranges(
27
&self,
28
ranges: &mut [Range<usize>],
29
) -> PolarsResult<PlHashMap<usize, Buffer<u8>>>;
30
}
31
32
/// Byte source backed by a `Buffer`, which can potentially be memory-mapped.
33
pub struct BufferByteSource(pub Buffer<u8>);
34
35
impl BufferByteSource {
36
async fn try_new_mmap_from_path(
37
path: &Path,
38
_cloud_options: Option<&CloudOptions>,
39
) -> PolarsResult<Self> {
40
let file = Arc::new(
41
tokio::fs::File::open(path)
42
.await
43
.map_err(|err| _limit_path_len_io_err(path, err))?
44
.into_std()
45
.await,
46
);
47
48
Ok(Self(Buffer::from_owner(MMapSemaphore::new_from_file(
49
&file,
50
)?)))
51
}
52
}
53
54
impl ByteSource for BufferByteSource {
55
async fn get_size(&self) -> PolarsResult<usize> {
56
Ok(self.0.as_ref().len())
57
}
58
59
async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
60
let out = self.0.clone().sliced(range);
61
Ok(out)
62
}
63
64
async fn get_ranges(
65
&self,
66
ranges: &mut [Range<usize>],
67
) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
68
Ok(ranges
69
.iter()
70
.map(|x| (x.start, self.0.clone().sliced(x.clone())))
71
.collect())
72
}
73
}
74
75
/// A [`ByteSource`] that reads from a single object at `path` through `store`.
///
/// Only available with the `cloud` feature.
#[cfg(feature = "cloud")]
pub struct ObjectStoreByteSource {
    /// Object store used for all requests.
    store: PolarsObjectStore,
    /// Location of the object within `store`.
    path: ObjectStorePath,
}
80
81
#[cfg(feature = "cloud")]
impl ObjectStoreByteSource {
    /// Builds an object store for `path` and records the object path derived
    /// from the resulting location's `prefix`.
    ///
    /// `io_metrics` is forwarded to the store via `set_io_metrics`.
    ///
    /// # Errors
    /// Returns an error if `build_object_store` or `object_path_from_str`
    /// fails.
    async fn try_new_from_path(
        path: PlRefPath,
        cloud_options: Option<&CloudOptions>,
        io_metrics: Option<Arc<IOMetrics>>,
    ) -> PolarsResult<Self> {
        let (location, mut store) = build_object_store(path, cloud_options, false).await?;
        let object_path = object_path_from_str(&location.prefix)?;

        store.set_io_metrics(io_metrics);

        Ok(Self {
            store,
            path: object_path,
        })
    }
}
97
98
#[cfg(feature = "cloud")]
impl ByteSource for ObjectStoreByteSource {
    /// Issues a `head` request for the object and returns its reported size.
    async fn get_size(&self) -> PolarsResult<usize> {
        let meta = self.store.head(&self.path).await?;
        Ok(meta.size as usize)
    }

    /// Delegates to the store's `get_range` for this object.
    async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
        let Self { store, path } = self;
        store.get_range(path, range).await
    }

    /// Delegates to the store's `get_ranges_sort` for this object.
    async fn get_ranges(
        &self,
        ranges: &mut [Range<usize>],
    ) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
        let Self { store, path } = self;
        store.get_ranges_sort(path, ranges).await
    }
}
115
116
/// Dynamic dispatch to async functions.
117
pub enum DynByteSource {
118
Buffer(BufferByteSource),
119
#[cfg(feature = "cloud")]
120
Cloud(ObjectStoreByteSource),
121
}
122
123
impl DynByteSource {
124
pub fn variant_name(&self) -> &str {
125
match self {
126
Self::Buffer(_) => "Buffer",
127
#[cfg(feature = "cloud")]
128
Self::Cloud(_) => "Cloud",
129
}
130
}
131
}
132
133
impl Default for DynByteSource {
134
fn default() -> Self {
135
Self::Buffer(BufferByteSource(Buffer::new()))
136
}
137
}
138
139
impl ByteSource for DynByteSource {
140
async fn get_size(&self) -> PolarsResult<usize> {
141
match self {
142
Self::Buffer(v) => v.get_size().await,
143
#[cfg(feature = "cloud")]
144
Self::Cloud(v) => v.get_size().await,
145
}
146
}
147
148
async fn get_range(&self, range: Range<usize>) -> PolarsResult<Buffer<u8>> {
149
match self {
150
Self::Buffer(v) => v.get_range(range).await,
151
#[cfg(feature = "cloud")]
152
Self::Cloud(v) => v.get_range(range).await,
153
}
154
}
155
156
async fn get_ranges(
157
&self,
158
ranges: &mut [Range<usize>],
159
) -> PolarsResult<PlHashMap<usize, Buffer<u8>>> {
160
match self {
161
Self::Buffer(v) => v.get_ranges(ranges).await,
162
#[cfg(feature = "cloud")]
163
Self::Cloud(v) => v.get_ranges(ranges).await,
164
}
165
}
166
}
167
168
impl From<BufferByteSource> for DynByteSource {
169
fn from(value: BufferByteSource) -> Self {
170
Self::Buffer(value)
171
}
172
}
173
174
#[cfg(feature = "cloud")]
impl From<ObjectStoreByteSource> for DynByteSource {
    /// Wraps an object-store-backed source in the `Cloud` variant.
    fn from(source: ObjectStoreByteSource) -> Self {
        DynByteSource::Cloud(source)
    }
}
180
181
impl From<Buffer<u8>> for DynByteSource {
182
fn from(value: Buffer<u8>) -> Self {
183
Self::Buffer(BufferByteSource(value))
184
}
185
}
186
187
/// Selects which kind of [`DynByteSource`] to construct for a path.
#[derive(Clone, Debug)]
pub enum DynByteSourceBuilder {
    /// Build a buffer-backed source from a memory-mapped file.
    Mmap,
    /// Supports both cloud and local files, requires cloud feature.
    ObjectStore,
}
193
194
impl DynByteSourceBuilder {
195
pub async fn try_build_from_path(
196
&self,
197
path: PlRefPath,
198
cloud_options: Option<&CloudOptions>,
199
io_metrics: Option<Arc<IOMetrics>>,
200
) -> PolarsResult<DynByteSource> {
201
Ok(match self {
202
Self::Mmap => {
203
BufferByteSource::try_new_mmap_from_path(path.as_std_path(), cloud_options)
204
.await?
205
.into()
206
},
207
Self::ObjectStore => feature_gated!("cloud", {
208
ObjectStoreByteSource::try_new_from_path(path, cloud_options, io_metrics)
209
.await?
210
.into()
211
}),
212
})
213
}
214
}
215
216