Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-arrow/src/buffer/immutable.rs
6939 views
1
#![allow(unsafe_op_in_unsafe_fn)]
2
use std::ops::Deref;
3
4
use bytemuck::{Pod, Zeroable};
5
use either::Either;
6
7
use super::IntoIter;
8
use crate::array::{ArrayAccessor, Splitable};
9
use crate::storage::SharedStorage;
10
11
/// [`Buffer`] is a contiguous memory region that can be shared across
/// thread boundaries.
///
/// The easiest way to think about [`Buffer<T>`] is being equivalent to
/// a `Arc<Vec<T>>`, with the following differences:
/// * slicing and cloning is `O(1)`.
/// * it supports external allocated memory
///
/// The easiest way to create one is to use its implementation of `From<Vec<T>>`.
///
/// # Examples
/// ```
/// use polars_arrow::buffer::Buffer;
///
/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
/// assert_eq!(buffer.as_ref(), [1, 2, 3].as_ref());
///
/// // it supports copy-on-write semantics (i.e. back to a `Vec`)
/// let vec: Vec<u32> = buffer.into_mut().right().unwrap();
/// assert_eq!(vec, vec![1, 2, 3]);
///
/// // cloning and slicing is `O(1)` (data is shared)
/// let mut buffer: Buffer<u32> = vec![1, 2, 3].into();
/// let mut sliced = buffer.clone();
/// sliced.slice(1, 1);
/// assert_eq!(sliced.as_ref(), [2].as_ref());
/// // but cloning forbids getting mut since `slice` and `buffer` now share data
/// assert_eq!(buffer.get_mut_slice(), None);
/// ```
#[derive(Clone)]
pub struct Buffer<T> {
    /// The shared backing storage. May hold more elements than this
    /// buffer exposes when the buffer has been sliced.
    /// (Previous comment said "byte buffer", but the storage holds `T`s.)
    storage: SharedStorage<T>,

    /// A pointer into `storage` where this buffer's data starts.
    /// Invariant: `ptr` points inside `storage` and
    /// `offset() + length <= storage.len()` (see `as_slice`).
    ptr: *const T,

    /// The number of `T` elements visible through this buffer.
    length: usize,
}
51
52
// SAFETY: `Buffer` exposes only shared, read-only access to its elements
// (the raw `*const T` field is what suppresses the auto-impls). Sharing or
// sending it is sound under the same bounds a `&[T]`/`Arc<Vec<T>>` would
// require, i.e. `T: Send + Sync`.
unsafe impl<T: Send + Sync> Sync for Buffer<T> {}
unsafe impl<T: Send + Sync> Send for Buffer<T> {}
54
55
impl<T: PartialEq> PartialEq for Buffer<T> {
56
#[inline]
57
fn eq(&self, other: &Self) -> bool {
58
self.deref() == other.deref()
59
}
60
}
61
62
// Full equivalence: delegates entirely to the element-wise `PartialEq` above.
impl<T: Eq> Eq for Buffer<T> {}
63
64
impl<T: std::hash::Hash> std::hash::Hash for Buffer<T> {
65
#[inline]
66
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
67
self.as_slice().hash(state);
68
}
69
}
70
71
impl<T: std::fmt::Debug> std::fmt::Debug for Buffer<T> {
72
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73
std::fmt::Debug::fmt(&**self, f)
74
}
75
}
76
77
impl<T> Default for Buffer<T> {
78
#[inline]
79
fn default() -> Self {
80
Vec::new().into()
81
}
82
}
83
84
impl<T> Buffer<T> {
    /// Creates an empty [`Buffer`].
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Auxiliary method to create a new Buffer spanning all of `storage`
    /// (offset 0, length equal to the storage length).
    pub fn from_storage(storage: SharedStorage<T>) -> Self {
        let ptr = storage.as_ptr();
        let length = storage.len();
        Buffer {
            storage,
            ptr,
            length,
        }
    }

    /// Creates a [`Buffer`] backed by a `'static` slice, without copying.
    pub fn from_static(data: &'static [T]) -> Self {
        Self::from_storage(SharedStorage::from_static(data))
    }

    /// Returns the number of elements in the buffer.
    ///
    /// NOTE(review): the previous doc said "bytes", but `length` counts `T`
    /// elements (see `as_slice`, which uses it as a slice length).
    #[inline]
    pub fn len(&self) -> usize {
        self.length
    }

    /// Returns whether the buffer is empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns whether underlying data is sliced.
    /// If sliced the [`Buffer`] is backed by
    /// more data than the length of `Self`.
    pub fn is_sliced(&self) -> bool {
        self.storage.len() != self.length
    }

    /// Expands this slice to the maximum allowed by the underlying storage.
    /// Only expands towards the end, the offset isn't changed. That is, element
    /// i before and after this operation refer to the same element.
    pub fn expand_end_to_storage(self) -> Self {
        // SAFETY: by the struct invariant `ptr` points into `storage`, so
        // `offset_from` relates two pointers into the same allocation and
        // the result is non-negative.
        unsafe {
            let offset = self.ptr.offset_from(self.storage.as_ptr()) as usize;
            Self {
                ptr: self.ptr,
                // New length: everything from our offset to the end of storage.
                length: self.storage.len() - offset,
                storage: self.storage,
            }
        }
    }

    /// Returns the slice of elements visible through this buffer.
    #[inline]
    pub fn as_slice(&self) -> &[T] {
        // SAFETY:
        // invariant of this struct `offset + length <= data.len()`
        debug_assert!(self.offset() + self.length <= self.storage.len());
        unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
    }

    /// Returns a reference to the element at `index`, without bounds checks.
    /// (Previous doc was a copy-paste of `as_slice`'s.)
    ///
    /// # Safety
    /// `index` must be smaller than `len`
    #[inline]
    pub(super) unsafe fn get_unchecked(&self, index: usize) -> &T {
        // SAFETY:
        // invariant of this function
        debug_assert!(index < self.length);
        unsafe { &*self.ptr.add(index) }
    }

    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
    /// Doing so allows the same memory region to be shared between buffers.
    /// # Panics
    /// Panics iff `offset + length` is larger than `len`.
    // NOTE(review): `offset + length` can wrap in release builds for
    // adversarial inputs, which would let the assert pass spuriously;
    // `offset.checked_add(length)` would be strictly safer — confirm
    // whether callers guarantee non-overflowing arguments.
    #[inline]
    pub fn sliced(self, offset: usize, length: usize) -> Self {
        assert!(
            offset + length <= self.len(),
            "the offset of the new Buffer cannot exceed the existing length"
        );
        // SAFETY: we just checked bounds
        unsafe { self.sliced_unchecked(offset, length) }
    }

    /// Slices this buffer starting at `offset`.
    /// # Panics
    /// Panics iff `offset + length` is larger than `len`.
    #[inline]
    pub fn slice(&mut self, offset: usize, length: usize) {
        assert!(
            offset + length <= self.len(),
            "the offset of the new Buffer cannot exceed the existing length"
        );
        // SAFETY: we just checked bounds
        unsafe { self.slice_unchecked(offset, length) }
    }

    /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
    /// Doing so allows the same memory region to be shared between buffers.
    ///
    /// # Safety
    /// The caller must ensure `offset + length <= self.len()`
    #[inline]
    #[must_use]
    pub unsafe fn sliced_unchecked(mut self, offset: usize, length: usize) -> Self {
        debug_assert!(offset + length <= self.len());

        self.slice_unchecked(offset, length);
        self
    }

    /// Slices this buffer starting at `offset`.
    ///
    /// # Safety
    /// The caller must ensure `offset + length <= self.len()`
    #[inline]
    pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
        // Advance the data pointer; the prefix stays allocated (owned by
        // `storage`) even though it is no longer reachable through `self`.
        self.ptr = self.ptr.add(offset);
        self.length = length;
    }

    /// Returns a pointer to the start of the storage underlying this buffer.
    #[inline]
    pub(crate) fn storage_ptr(&self) -> *const T {
        self.storage.as_ptr()
    }

    /// Returns the start offset of this buffer within the underlying storage.
    #[inline]
    pub fn offset(&self) -> usize {
        // SAFETY: `ptr` points into `storage` (struct invariant), so both
        // pointers derive from the same allocation and the distance is
        // non-negative.
        unsafe {
            let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
            debug_assert!(ret <= self.storage.len());
            ret
        }
    }

    /// # Safety
    /// The caller must ensure that the buffer was properly initialized up to `len`.
    #[inline]
    pub unsafe fn set_len(&mut self, len: usize) {
        self.length = len;
    }

    /// Returns a mutable reference to its underlying [`Vec`], if possible.
    ///
    /// This operation returns [`Either::Right`] iff this [`Buffer`]:
    /// * has no alive clones
    /// * has not been imported from the C data interface (FFI)
    #[inline]
    pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
        // We lose information if the data is sliced.
        if self.is_sliced() {
            return Either::Left(self);
        }
        match self.storage.try_into_vec() {
            Ok(v) => Either::Right(v),
            Err(slf) => {
                // Conversion failed: put the storage back so the caller
                // receives the buffer unchanged.
                self.storage = slf;
                Either::Left(self)
            },
        }
    }

    /// Returns a mutable reference to its slice, if possible.
    ///
    /// This operation returns [`Some`] iff this [`Buffer`]:
    /// * has no alive clones
    /// * has not been imported from the C data interface (FFI)
    #[inline]
    pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
        let offset = self.offset();
        let slice = self.storage.try_as_mut_slice()?;
        // SAFETY: struct invariant `offset + length <= storage.len()`,
        // so the range is in bounds of the full storage slice.
        Some(unsafe { slice.get_unchecked_mut(offset..offset + self.length) })
    }

    /// Since this takes a shared reference to self, beware that others might
    /// increment this after you've checked it's equal to 1.
    pub fn storage_refcount(&self) -> u64 {
        self.storage.refcount()
    }
}
272
273
impl<T: Pod> Buffer<T> {
    /// Reinterprets this buffer of `T`s as a buffer of `U`s, if the
    /// underlying storage permits it.
    ///
    /// On failure the original buffer is returned unchanged in `Err`.
    /// The conditions under which `SharedStorage::try_transmute` fails are
    /// defined there — presumably alignment/size compatibility; confirm in
    /// `crate::storage`.
    ///
    /// # Panics
    /// Panics if `U` is zero-sized, or if `length * size_of::<T>()`
    /// overflows `usize`.
    pub fn try_transmute<U: Pod>(mut self) -> Result<Buffer<U>, Self> {
        // A ZST `U` would make the length division below meaningless.
        assert_ne!(size_of::<U>(), 0);
        let ptr = self.ptr as *const U;
        let length = self.length;
        match self.storage.try_transmute() {
            Err(v) => {
                // Restore the storage so the caller gets `self` back intact.
                self.storage = v;
                Err(self)
            },
            Ok(storage) => Ok(Buffer {
                storage,
                ptr,
                // New element count: total byte count divided by `U`'s size.
                length: length.checked_mul(size_of::<T>()).expect("overflow") / size_of::<U>(),
            }),
        }
    }
}
291
292
impl<T: Clone> Buffer<T> {
293
pub fn make_mut(self) -> Vec<T> {
294
match self.into_mut() {
295
Either::Right(v) => v,
296
Either::Left(same) => same.as_slice().to_vec(),
297
}
298
}
299
}
300
301
impl<T: Zeroable + Copy> Buffer<T> {
    /// Creates a buffer holding `len` zeroed elements.
    pub fn zeroed(len: usize) -> Self {
        Self::from(vec![T::zeroed(); len])
    }
}
306
307
impl<T> From<Vec<T>> for Buffer<T> {
308
#[inline]
309
fn from(v: Vec<T>) -> Self {
310
Self::from_storage(SharedStorage::from_vec(v))
311
}
312
}
313
314
impl<T> Deref for Buffer<T> {
315
type Target = [T];
316
317
#[inline(always)]
318
fn deref(&self) -> &[T] {
319
self.as_slice()
320
}
321
}
322
323
impl<T> AsRef<[T]> for Buffer<T> {
324
#[inline(always)]
325
fn as_ref(&self) -> &[T] {
326
self.as_slice()
327
}
328
}
329
330
impl<T> FromIterator<T> for Buffer<T> {
331
#[inline]
332
fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
333
Vec::from_iter(iter).into()
334
}
335
}
336
337
impl<T: Copy> IntoIterator for Buffer<T> {
338
type Item = T;
339
340
type IntoIter = IntoIter<T>;
341
342
fn into_iter(self) -> Self::IntoIter {
343
IntoIter::new(self)
344
}
345
}
346
347
// SAFETY: `value_unchecked` and `len` read the same `ptr`/`length` pair,
// so every `index < len()` is in bounds of the slice `as_slice` exposes.
// (The exact contract of `ArrayAccessor` is defined in `crate::array` —
// confirm there if extending this impl.)
unsafe impl<'a, T: 'a> ArrayAccessor<'a> for Buffer<T> {
    type Item = &'a T;

    /// # Safety
    /// `index` must be smaller than `self.len()`.
    unsafe fn value_unchecked(&'a self, index: usize) -> Self::Item {
        unsafe { &*self.ptr.add(index) }
    }

    fn len(&self) -> usize {
        // Qualified call: disambiguates from this trait method itself.
        Buffer::len(self)
    }
}
358
359
impl<T> Splitable for Buffer<T> {
    /// A split point is valid anywhere in `0..=len` (splitting at `len`
    /// yields an empty right half).
    #[inline(always)]
    fn check_bound(&self, offset: usize) -> bool {
        offset <= self.len()
    }

    /// Splits into `[0, offset)` and `[offset, len)` halves that share the
    /// same storage (each clone bumps the storage refcount).
    ///
    /// # Safety
    /// Caller must have validated `offset` via `check_bound`,
    /// i.e. `offset <= self.length`.
    unsafe fn _split_at_unchecked(&self, offset: usize) -> (Self, Self) {
        let storage = &self.storage;

        (
            // Left half: same start, first `offset` elements.
            Self {
                storage: storage.clone(),
                ptr: self.ptr,
                length: offset,
            },
            // Right half: starts `offset` elements in.
            // NOTE(review): `wrapping_add` (rather than `add`) presumably
            // sidesteps `ptr::add`'s provenance requirements when the result
            // is one-past-the-end or the buffer is empty — confirm intent.
            Self {
                storage: storage.clone(),
                ptr: self.ptr.wrapping_add(offset),
                length: self.length - offset,
            },
        )
    }
}
382
383