Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
pola-rs
GitHub Repository: pola-rs/polars
Path: blob/main/crates/polars-utils/src/fixedringbuffer.rs
6939 views
1
/// A ring-buffer with a size determined at creation-time
2
///
3
/// This makes it perfectly suited for buffers that produce and consume at different speeds.
4
pub struct FixedRingBuffer<T> {
5
start: usize,
6
length: usize,
7
buffer: *mut T,
8
/// The wanted fixed capacity in the buffer
9
capacity: usize,
10
11
/// The actually allocated capacity, this should not be used for any calculations and it purely
12
/// used for the deallocation.
13
_buffer_capacity: usize,
14
}
15
16
#[inline(always)]
17
const fn wrapping_add(x: usize, n: usize, capacity: usize) -> usize {
18
assert!(n <= capacity);
19
20
let sub = if capacity - n <= x { capacity } else { 0 };
21
22
x.wrapping_add(n).wrapping_sub(sub)
23
}
24
25
impl<T> FixedRingBuffer<T> {
26
pub fn new(capacity: usize) -> Self {
27
let buffer = Vec::with_capacity(capacity);
28
29
Self {
30
start: 0,
31
length: 0,
32
33
_buffer_capacity: buffer.capacity(),
34
buffer: buffer.leak() as *mut [T] as *mut T,
35
capacity,
36
}
37
}
38
39
#[inline(always)]
40
pub const fn len(&self) -> usize {
41
self.length
42
}
43
44
#[inline(always)]
45
pub const fn capacity(&self) -> usize {
46
self.capacity
47
}
48
49
#[inline(always)]
50
pub const fn remaining_capacity(&self) -> usize {
51
self.capacity - self.len()
52
}
53
54
#[inline(always)]
55
pub const fn is_empty(&self) -> bool {
56
self.length == 0
57
}
58
59
#[inline(always)]
60
pub const fn is_full(&self) -> bool {
61
self.len() == self.capacity
62
}
63
64
/// Get a reference to all elements in the form of two slices.
65
///
66
/// These are in the listed in the order of being pushed into the buffer.
67
#[inline]
68
pub fn as_slices(&self) -> (&[T], &[T]) {
69
// SAFETY: Only pick the part that is actually defined
70
if self.capacity - self.length > self.start {
71
(
72
unsafe {
73
std::slice::from_raw_parts(self.buffer.wrapping_add(self.start), self.length)
74
},
75
&[],
76
)
77
} else {
78
(
79
unsafe {
80
std::slice::from_raw_parts(
81
self.buffer.wrapping_add(self.start),
82
self.capacity - self.start,
83
)
84
},
85
unsafe {
86
std::slice::from_raw_parts(
87
self.buffer,
88
wrapping_add(self.start, self.length, self.capacity),
89
)
90
},
91
)
92
}
93
}
94
95
/// Pop an item at the front of the [`FixedRingBuffer`]
96
#[inline]
97
pub fn pop_front(&mut self) -> Option<T> {
98
if self.is_empty() {
99
return None;
100
}
101
102
// SAFETY: This value is never read again
103
let item = unsafe { self.buffer.wrapping_add(self.start).read() };
104
self.start = wrapping_add(self.start, 1, self.capacity);
105
self.length -= 1;
106
Some(item)
107
}
108
109
/// Push an item into the [`FixedRingBuffer`]
110
///
111
/// Returns `None` if there is no more space
112
#[inline]
113
pub fn push(&mut self, value: T) -> Option<()> {
114
if self.is_full() {
115
return None;
116
}
117
118
let offset = wrapping_add(self.start, self.len(), self.capacity);
119
120
unsafe { self.buffer.wrapping_add(offset).write(value) };
121
self.length += 1;
122
123
Some(())
124
}
125
}
126
127
impl<T: Copy> FixedRingBuffer<T> {
128
/// Add at most `num` items of `value` into the [`FixedRingBuffer`]
129
///
130
/// This returns the amount of items actually added.
131
pub fn fill_repeat(&mut self, value: T, num: usize) -> usize {
132
if num == 0 || self.is_full() {
133
return 0;
134
}
135
136
let num = usize::min(num, self.remaining_capacity());
137
138
let start = wrapping_add(self.start, self.len(), self.capacity);
139
let end = wrapping_add(start, num, self.capacity);
140
141
if start < end {
142
unsafe { std::slice::from_raw_parts_mut(self.buffer.wrapping_add(start), num) }
143
.fill(value);
144
} else {
145
unsafe {
146
std::slice::from_raw_parts_mut(
147
self.buffer.wrapping_add(start),
148
self.capacity - start,
149
)
150
}
151
.fill(value);
152
153
if end != 0 {
154
unsafe { std::slice::from_raw_parts_mut(self.buffer, end) }.fill(value);
155
}
156
}
157
158
self.length += num;
159
160
num
161
}
162
}
163
164
impl<T> Drop for FixedRingBuffer<T> {
165
fn drop(&mut self) {
166
for i in 0..self.length {
167
let offset = wrapping_add(self.start, i, self.capacity);
168
unsafe { self.buffer.wrapping_add(offset).read() };
169
}
170
171
unsafe { Vec::from_raw_parts(self.buffer, 0, self._buffer_capacity) };
172
}
173
}
174
175
#[cfg(test)]
176
mod tests {
177
use super::*;
178
179
#[test]
180
fn basic() {
181
let mut frb = FixedRingBuffer::new(256);
182
183
assert!(frb.pop_front().is_none());
184
185
frb.push(1).unwrap();
186
frb.push(3).unwrap();
187
188
assert_eq!(frb.pop_front(), Some(1));
189
assert_eq!(frb.pop_front(), Some(3));
190
assert_eq!(frb.pop_front(), None);
191
192
assert!(!frb.is_full());
193
assert_eq!(frb.fill_repeat(42, 300), 256);
194
assert!(frb.is_full());
195
196
for _ in 0..256 {
197
assert_eq!(frb.pop_front(), Some(42));
198
assert!(!frb.is_full());
199
}
200
assert_eq!(frb.pop_front(), None);
201
}
202
203
#[test]
204
fn boxed() {
205
let mut frb = FixedRingBuffer::new(256);
206
207
assert!(frb.pop_front().is_none());
208
209
frb.push(Box::new(1)).unwrap();
210
frb.push(Box::new(3)).unwrap();
211
212
assert_eq!(frb.pop_front(), Some(Box::new(1)));
213
assert_eq!(frb.pop_front(), Some(Box::new(3)));
214
assert_eq!(frb.pop_front(), None);
215
}
216
}
217
218