GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-buffer/src/storage.rs
use std::any::Any;
use std::marker::PhantomData;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::process::abort;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, Ordering};

use bytemuck::Pod;

// Allows us to transmute between types while also keeping the original
// stats and drop method of the Vec around.
struct VecVTable {
    size: usize,
    align: usize,
    drop_buffer: unsafe fn(*mut (), usize),
}

impl VecVTable {
    const fn new<T>() -> Self {
        unsafe fn drop_buffer<T>(ptr: *mut (), cap: usize) {
            unsafe { drop(Vec::from_raw_parts(ptr.cast::<T>(), 0, cap)) }
        }

        Self {
            size: size_of::<T>(),
            align: align_of::<T>(),
            drop_buffer: drop_buffer::<T>,
        }
    }

    fn new_static<T>() -> &'static Self {
        const { &Self::new::<T>() }
    }
}

enum BackingStorage {
    Vec {
        original_capacity: usize, // Elements, not bytes.
        vtable: &'static VecVTable,
    },
    ForeignOwner(Box<dyn Any + Send + 'static>),

    /// Backed by some external method which we do not need to take care of,
    /// but we still should refcount and drop the SharedStorageInner.
    External,

    /// Both the backing storage and the SharedStorageInner are leaked, no
    /// refcounting is done. This technically should be a flag on
    /// SharedStorageInner instead of being here, but that would add 8 more
    /// bytes to SharedStorageInner, so here it is.
    Leaked,
}

struct SharedStorageInner<T> {
    ref_count: AtomicU64,
    ptr: *mut T,
    length_in_bytes: usize,
    backing: BackingStorage,
    // https://github.com/rust-lang/rfcs/blob/master/text/0769-sound-generic-drop.md#phantom-data
    phantom: PhantomData<T>,
}

unsafe impl<T: Sync + Send> Sync for SharedStorageInner<T> {}

impl<T> SharedStorageInner<T> {
    pub fn from_vec(mut v: Vec<T>) -> Self {
        let length_in_bytes = v.len() * size_of::<T>();
        let original_capacity = v.capacity();
        let ptr = v.as_mut_ptr();
        core::mem::forget(v);
        Self {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: BackingStorage::Vec {
                original_capacity,
                vtable: VecVTable::new_static::<T>(),
            },
            phantom: PhantomData,
        }
    }
}

impl<T> Drop for SharedStorageInner<T> {
    fn drop(&mut self) {
        match core::mem::replace(&mut self.backing, BackingStorage::External) {
            BackingStorage::ForeignOwner(o) => drop(o),
            BackingStorage::Vec {
                original_capacity,
                vtable,
            } => unsafe {
                // Drop the elements in our slice.
                if std::mem::needs_drop::<T>() {
                    core::ptr::drop_in_place(core::ptr::slice_from_raw_parts_mut(
                        self.ptr,
                        self.length_in_bytes / size_of::<T>(),
                    ));
                }

                // Free the buffer.
                if original_capacity > 0 {
                    (vtable.drop_buffer)(self.ptr.cast(), original_capacity);
                }
            },
            BackingStorage::External | BackingStorage::Leaked => {},
        }
    }
}

#[repr(transparent)]
pub struct SharedStorage<T> {
    inner: NonNull<SharedStorageInner<T>>,
    phantom: PhantomData<SharedStorageInner<T>>,
}

unsafe impl<T: Sync + Send> Send for SharedStorage<T> {}
unsafe impl<T: Sync + Send> Sync for SharedStorage<T> {}

impl<T> Default for SharedStorage<T> {
    fn default() -> Self {
        Self::empty()
    }
}

impl<T> SharedStorage<T> {
    /// Creates an empty SharedStorage.
    pub const fn empty() -> Self {
        assert!(align_of::<T>() <= 1 << 30);
        static INNER: SharedStorageInner<()> = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr: core::ptr::without_provenance_mut(1 << 30), // Very overaligned for any T.
            length_in_bytes: 0,
            backing: BackingStorage::Leaked,
            phantom: PhantomData,
        };

        Self {
            inner: NonNull::new(&raw const INNER as *mut SharedStorageInner<T>).unwrap(),
            phantom: PhantomData,
        }
    }

    /// Creates a SharedStorage backed by this static slice.
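    ///
    /// # Example
    ///
    /// An illustrative usage sketch (not a doctest; assumes the type is
    /// importable as `polars_buffer::SharedStorage`, per this file's path):
    ///
    /// ```ignore
    /// use polars_buffer::SharedStorage;
    ///
    /// static DATA: [u32; 3] = [1, 2, 3];
    /// let ss = SharedStorage::from_static(&DATA);
    /// assert_eq!(&*ss, &[1, 2, 3]); // Derefs to the underlying slice.
    /// ```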
    pub fn from_static(slice: &'static [T]) -> Self {
        // SAFETY: the slice has a static lifetime.
        unsafe { Self::from_slice_unchecked(slice) }
    }

    /// Creates a SharedStorage backed by this slice.
    ///
    /// # Safety
    /// You must ensure this SharedStorage or any of its clones does not outlive
    /// this slice.
    pub unsafe fn from_slice_unchecked(slice: &[T]) -> Self {
        #[expect(clippy::manual_slice_size_calculation)]
        let length_in_bytes = slice.len() * size_of::<T>();
        let ptr = slice.as_ptr().cast_mut();
        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: BackingStorage::External,
            phantom: PhantomData,
        };
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        }
    }

    /// Calls f with a `SharedStorage` backed by this slice.
    ///
    /// Aborts if any clones of the SharedStorage still live when `f` returns.
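    ///
    /// # Example
    ///
    /// A usage sketch (illustrative, not a doctest): the closure receives a
    /// clone of the storage and must not let any clone outlive the call.
    ///
    /// ```ignore
    /// let data = vec![1u8, 2, 3];
    /// let sum = SharedStorage::with_slice(&data, |ss| ss.iter().copied().sum::<u8>());
    /// assert_eq!(sum, 6);
    /// ```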
    pub fn with_slice<R, F: FnOnce(SharedStorage<T>) -> R>(slice: &[T], f: F) -> R {
        struct AbortIfNotExclusive<T>(SharedStorage<T>);
        impl<T> Drop for AbortIfNotExclusive<T> {
            fn drop(&mut self) {
                if !self.0.is_exclusive() {
                    abort()
                }
            }
        }

        unsafe {
            let ss = AbortIfNotExclusive(Self::from_slice_unchecked(slice));
            f(ss.0.clone())
        }
    }

    /// Calls f with a `SharedStorage` backed by this vec.
    ///
    /// # Panics
    /// Panics if any clones of the SharedStorage still live when `f` returns.
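    ///
    /// # Example
    ///
    /// A usage sketch (illustrative, not a doctest): the vec is taken over for
    /// the duration of `f` and handed back afterwards.
    ///
    /// ```ignore
    /// let mut v = vec![1u32, 2, 3];
    /// let len = SharedStorage::with_vec(&mut v, |ss| ss.len());
    /// assert_eq!(len, 3);
    /// assert_eq!(v, [1, 2, 3]); // Restored when `f` returned.
    /// ```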
    pub fn with_vec<R, F: FnOnce(SharedStorage<T>) -> R>(vec: &mut Vec<T>, f: F) -> R {
        // TODO: this function is intended to allow exclusive conversion back to
        // a vec, but we need some kind of weak reference for this (that is, two
        // tiers of 'is_exclusive', one for access and one for keeping the inner
        // state alive).
        struct RestoreVec<'a, T>(&'a mut Vec<T>, SharedStorage<T>);
        impl<'a, T> Drop for RestoreVec<'a, T> {
            fn drop(&mut self) {
                *self.0 = self.1.try_take_vec().unwrap();
            }
        }

        let tmp = core::mem::take(vec);
        let ss = RestoreVec(vec, Self::from_vec(tmp));
        f(ss.1.clone())
    }

    /// # Safety
    /// The slice must be valid as long as owner lives.
    pub unsafe fn from_slice_with_owner<O: Send + 'static>(slice: &[T], owner: O) -> Self {
        #[expect(clippy::manual_slice_size_calculation)]
        let length_in_bytes = slice.len() * size_of::<T>();
        let ptr = slice.as_ptr().cast_mut();
        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: BackingStorage::ForeignOwner(Box::new(owner)),
            phantom: PhantomData,
        };
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        }
    }

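    /// Creates a SharedStorage backed by `owner`, a value that can be viewed
    /// as a slice and is kept alive for as long as any clone of the storage
    /// exists.
    ///
    /// # Example
    ///
    /// An illustrative usage sketch (not a doctest):
    ///
    /// ```ignore
    /// // Any `Send + AsRef<[T]> + 'static` value works as an owner.
    /// let owner: Box<[u16]> = vec![10u16, 20, 30].into_boxed_slice();
    /// let ss = SharedStorage::from_owner(owner);
    /// assert_eq!(&*ss, &[10, 20, 30]);
    /// ```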
    pub fn from_owner<O: Send + AsRef<[T]> + 'static>(owner: O) -> Self {
        let owner = Box::new(owner);
        let slice: &[T] = (*owner).as_ref();
        #[expect(clippy::manual_slice_size_calculation)]
        let length_in_bytes = slice.len() * size_of::<T>();
        let ptr = slice.as_ptr().cast_mut();
        let inner = SharedStorageInner {
            ref_count: AtomicU64::new(1),
            ptr,
            length_in_bytes,
            backing: BackingStorage::ForeignOwner(owner),
            phantom: PhantomData,
        };
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(inner))).unwrap(),
            phantom: PhantomData,
        }
    }

    pub fn from_vec(v: Vec<T>) -> Self {
        Self {
            inner: NonNull::new(Box::into_raw(Box::new(SharedStorageInner::from_vec(v)))).unwrap(),
            phantom: PhantomData,
        }
    }

    /// Leaks this SharedStorage such that it and its inner value are never
    /// dropped. In return no refcounting needs to be performed.
    ///
    /// The SharedStorage must be exclusive.
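    ///
    /// # Example
    ///
    /// An illustrative sketch (not a doctest): after `leak`, clones and drops
    /// skip the refcount entirely and the backing allocation is never freed.
    ///
    /// ```ignore
    /// let mut ss = SharedStorage::from_vec(vec![1u8, 2, 3]);
    /// ss.leak();
    /// let alias = ss.clone(); // No refcount traffic for leaked storage.
    /// assert_eq!(&*alias, &[1, 2, 3]);
    /// ```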
    pub fn leak(&mut self) {
        assert!(self.is_exclusive());
        unsafe {
            let inner = &mut *self.inner.as_ptr();
            core::mem::forget(core::mem::replace(
                &mut inner.backing,
                BackingStorage::Leaked,
            ));
        }
    }

    /// # Safety
    /// The caller is responsible for ensuring the resulting slice is valid and aligned for U.
    pub unsafe fn transmute_unchecked<U>(self) -> SharedStorage<U> {
        let storage = SharedStorage {
            inner: self.inner.cast(),
            phantom: PhantomData,
        };
        std::mem::forget(self);
        storage
    }
}

pub struct SharedStorageAsVecMut<'a, T> {
    ss: &'a mut SharedStorage<T>,
    vec: ManuallyDrop<Vec<T>>,
}

impl<T> Deref for SharedStorageAsVecMut<'_, T> {
    type Target = Vec<T>;

    fn deref(&self) -> &Self::Target {
        &self.vec
    }
}

impl<T> DerefMut for SharedStorageAsVecMut<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.vec
    }
}

impl<T> Drop for SharedStorageAsVecMut<'_, T> {
    fn drop(&mut self) {
        unsafe {
            // Restore the SharedStorage.
            let vec = ManuallyDrop::take(&mut self.vec);
            let inner = self.ss.inner.as_ptr();
            inner.write(SharedStorageInner::from_vec(vec));
        }
    }
}

impl<T> SharedStorage<T> {
    #[inline(always)]
    pub const fn len(&self) -> usize {
        self.inner().length_in_bytes / size_of::<T>()
    }

    #[inline(always)]
    pub const fn is_empty(&self) -> bool {
        self.inner().length_in_bytes == 0
    }

    #[inline(always)]
    pub const fn as_ptr(&self) -> *const T {
        self.inner().ptr
    }

    #[inline(always)]
    pub fn is_exclusive(&self) -> bool {
        // Ordering semantics copied from Arc<T>.
        self.inner().ref_count.load(Ordering::Acquire) == 1
    }

    /// Gets the reference count of this storage.
    ///
    /// Because this function takes a shared reference it should not be used
    /// in cases where we are checking if the refcount is one for safety;
    /// someone else could increment it in the meantime.
    #[inline(always)]
    pub fn refcount(&self) -> u64 {
        // Ordering semantics copied from Arc<T>.
        self.inner().ref_count.load(Ordering::Acquire)
    }

    pub fn try_as_mut_slice(&mut self) -> Option<&mut [T]> {
        // We don't know if what we're created from may be mutated unless we're
        // backed by an exclusive Vec. Perhaps in the future we can add a
        // mutability bit?
        let inner = self.inner();
        let may_mut = inner.ref_count.load(Ordering::Acquire) == 1
            && matches!(inner.backing, BackingStorage::Vec { .. });
        may_mut.then(|| {
            let inner = self.inner();
            let len = inner.length_in_bytes / size_of::<T>();
            unsafe { core::slice::from_raw_parts_mut(inner.ptr, len) }
        })
    }

    /// Try to take the vec backing this SharedStorage, leaving this as an empty slice.
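    ///
    /// # Example
    ///
    /// An illustrative sketch (not a doctest): taking the vec only succeeds
    /// while the storage is exclusive and vec-backed.
    ///
    /// ```ignore
    /// let mut ss = SharedStorage::from_vec(vec![1u32, 2, 3]);
    /// let alias = ss.clone();
    /// assert!(ss.try_take_vec().is_none()); // Shared: the vec stays put.
    /// drop(alias);
    /// assert_eq!(ss.try_take_vec().unwrap(), [1, 2, 3]);
    /// assert!(ss.is_empty()); // Left behind as an empty slice.
    /// ```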
    pub fn try_take_vec(&mut self) -> Option<Vec<T>> {
        // If there are other references we can't get an exclusive reference.
        if !self.is_exclusive() {
            return None;
        }

        let ret;
        unsafe {
            let inner = &mut *self.inner.as_ptr();

            // We may only go back to a Vec if we originally came from a Vec
            // where the desired size/align matches the original.
            let BackingStorage::Vec {
                original_capacity,
                vtable,
            } = &mut inner.backing
            else {
                return None;
            };

            if vtable.size != size_of::<T>() || vtable.align != align_of::<T>() {
                return None;
            }

            // Steal vec from inner.
            let len = inner.length_in_bytes / size_of::<T>();
            ret = Vec::from_raw_parts(inner.ptr, len, *original_capacity);
            *original_capacity = 0;
            inner.length_in_bytes = 0;
        }
        Some(ret)
    }

    /// Attempts to view this SharedStorage as a mutable Vec. Returns `None` if
    /// it can't be converted to a Vec; otherwise the Vec is written back into
    /// this storage when the returned guard is dropped.
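    ///
    /// # Example
    ///
    /// An illustrative usage sketch (not a doctest):
    ///
    /// ```ignore
    /// let mut ss = SharedStorage::from_vec(vec![1u32, 2, 3]);
    /// if let Some(mut vec) = ss.try_as_mut_vec() {
    ///     vec.push(4); // Derefs to the backing Vec<u32>.
    /// } // Dropping the guard restores the vec into the storage.
    /// assert_eq!(&*ss, &[1, 2, 3, 4]);
    /// ```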
    pub fn try_as_mut_vec(&mut self) -> Option<SharedStorageAsVecMut<'_, T>> {
        Some(SharedStorageAsVecMut {
            vec: ManuallyDrop::new(self.try_take_vec()?),
            ss: self,
        })
    }

    pub fn try_into_vec(mut self) -> Result<Vec<T>, Self> {
        self.try_take_vec().ok_or(self)
    }

    #[inline(always)]
    const fn inner(&self) -> &SharedStorageInner<T> {
        unsafe { &*self.inner.as_ptr() }
    }

    /// # Safety
    /// May only be called once.
    #[cold]
    unsafe fn drop_slow(&mut self) {
        unsafe { drop(Box::from_raw(self.inner.as_ptr())) }
    }
}

impl<T: Pod> SharedStorage<T> {
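    /// Tries to reinterpret this storage as elements of type `U`, returning
    /// the storage unchanged as the error if the byte length or the pointer
    /// alignment does not fit `U`.
    ///
    /// # Example
    ///
    /// An illustrative sketch (not a doctest):
    ///
    /// ```ignore
    /// let ss = SharedStorage::from_vec(vec![0u8; 8]);
    /// match ss.try_transmute::<u32>() {
    ///     Ok(ints) => assert_eq!(ints.len(), 2),    // 8 bytes viewed as two u32s.
    ///     Err(bytes) => assert_eq!(bytes.len(), 8), // E.g. pointer not 4-byte aligned.
    /// }
    /// ```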
    pub fn try_transmute<U: Pod>(self) -> Result<SharedStorage<U>, Self> {
        let inner = self.inner();

        // The length of the array in bytes must be a multiple of the target size.
        // We can skip this check if the size of U divides the size of T.
        if !size_of::<T>().is_multiple_of(size_of::<U>())
            && !inner.length_in_bytes.is_multiple_of(size_of::<U>())
        {
            return Err(self);
        }

        // The pointer must be properly aligned for U.
        // We can skip this check if the alignment of U divides the alignment of T.
        if !align_of::<T>().is_multiple_of(align_of::<U>()) && !inner.ptr.cast::<U>().is_aligned() {
            return Err(self);
        }

        Ok(unsafe { self.transmute_unchecked::<U>() })
    }
}

impl SharedStorage<u8> {
    /// Create a [`SharedStorage<u8>`][SharedStorage] from a [`Vec`] of [`Pod`].
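    ///
    /// # Example
    ///
    /// An illustrative sketch (not a doctest):
    ///
    /// ```ignore
    /// let bytes = SharedStorage::bytes_from_pod_vec(vec![1u32, 2]);
    /// assert_eq!(bytes.len(), 8); // Two u32s viewed as 8 bytes.
    /// ```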
    pub fn bytes_from_pod_vec<T: Pod>(v: Vec<T>) -> Self {
        // This can't fail; bytes are compatible with everything.
        SharedStorage::from_vec(v)
            .try_transmute::<u8>()
            .unwrap_or_else(|_| unreachable!())
    }
}

impl<T> Deref for SharedStorage<T> {
    type Target = [T];

    #[inline]
    fn deref(&self) -> &Self::Target {
        unsafe {
            let inner = self.inner();
            let len = inner.length_in_bytes / size_of::<T>();
            core::slice::from_raw_parts(inner.ptr, len)
        }
    }
}

impl<T> Clone for SharedStorage<T> {
    fn clone(&self) -> Self {
        let inner = self.inner();
        if !matches!(inner.backing, BackingStorage::Leaked) {
            // Ordering semantics copied from Arc<T>.
            inner.ref_count.fetch_add(1, Ordering::Relaxed);
        }
        Self {
            inner: self.inner,
            phantom: PhantomData,
        }
    }
}

impl<T> Drop for SharedStorage<T> {
    fn drop(&mut self) {
        let inner = self.inner();
        if matches!(inner.backing, BackingStorage::Leaked) {
            return;
        }

        // Ordering semantics copied from Arc<T>.
        if inner.ref_count.fetch_sub(1, Ordering::Release) == 1 {
            std::sync::atomic::fence(Ordering::Acquire);
            unsafe {
                self.drop_slow();
            }
        }
    }
}