Path: blob/main/crates/polars-utils/src/fixedringbuffer.rs
6939 views
/// A ring-buffer with a size determined at creation-time.
///
/// This makes it perfectly suited for buffers that produce and consume at different speeds.
///
/// Elements live in the logical index range `start..start + length`, taken modulo `capacity`.
/// Slots outside of that range are uninitialized and must never be read.
pub struct FixedRingBuffer<T> {
    /// Index of the oldest element in `buffer`.
    start: usize,
    /// Number of initialized elements currently stored.
    length: usize,
    /// Pointer to the start of the backing allocation.
    buffer: *mut T,
    /// The wanted fixed capacity in the buffer
    capacity: usize,

    /// The actually allocated capacity, this should not be used for any calculations and is
    /// purely used for the deallocation.
    _buffer_capacity: usize,
}

/// Advance the ring index `x` by `n` positions, wrapping around at `capacity`.
///
/// # Panics
///
/// Panics if `n > capacity`.
#[inline(always)]
const fn wrapping_add(x: usize, n: usize, capacity: usize) -> usize {
    assert!(n <= capacity);

    // `capacity - n <= x` is `x + n >= capacity` written such that it cannot overflow.
    let sub = if capacity - n <= x { capacity } else { 0 };

    x.wrapping_add(n).wrapping_sub(sub)
}

impl<T> FixedRingBuffer<T> {
    /// Create an empty [`FixedRingBuffer`] that can hold at most `capacity` items.
    pub fn new(capacity: usize) -> Self {
        // Take the pointer through `as_mut_ptr` rather than through a leaked, zero-length
        // slice: the resulting pointer has to be valid for the entire allocation, not only for
        // the (empty) initialized part. `ManuallyDrop` hands the allocation over to `Self`,
        // which releases it again in `Drop`.
        let mut buffer = std::mem::ManuallyDrop::new(Vec::<T>::with_capacity(capacity));

        Self {
            start: 0,
            length: 0,

            _buffer_capacity: buffer.capacity(),
            buffer: buffer.as_mut_ptr(),
            capacity,
        }
    }

    /// The number of items currently stored.
    #[inline(always)]
    pub const fn len(&self) -> usize {
        self.length
    }

    /// The maximum number of items that can be stored.
    #[inline(always)]
    pub const fn capacity(&self) -> usize {
        self.capacity
    }

    /// The number of items that can still be pushed before the buffer is full.
    #[inline(always)]
    pub const fn remaining_capacity(&self) -> usize {
        self.capacity - self.len()
    }

    /// Returns `true` if the buffer contains no items.
    #[inline(always)]
    pub const fn is_empty(&self) -> bool {
        self.length == 0
    }

    /// Returns `true` if no more items can be pushed.
    #[inline(always)]
    pub const fn is_full(&self) -> bool {
        self.len() == self.capacity
    }

    /// Get a reference to all elements in the form of two slices.
    ///
    /// These are listed in the order of being pushed into the buffer. The second slice is
    /// empty unless the stored elements wrap around the end of the allocation.
    #[inline]
    pub fn as_slices(&self) -> (&[T], &[T]) {
        // The part running from `start` up to the end of the allocation (at most), followed by
        // whatever wrapped around to the front.
        let head_len = usize::min(self.length, self.capacity - self.start);
        let tail_len = self.length - head_len;

        // SAFETY: Only pick the part that is actually defined. Both ranges lie within the
        // allocation and contain exactly the `length` initialized elements.
        unsafe {
            (
                std::slice::from_raw_parts(self.buffer.add(self.start), head_len),
                std::slice::from_raw_parts(self.buffer, tail_len),
            )
        }
    }

    /// Pop an item at the front of the [`FixedRingBuffer`]
    ///
    /// Returns `None` if the buffer is empty.
    #[inline]
    pub fn pop_front(&mut self) -> Option<T> {
        if self.is_empty() {
            return None;
        }

        // SAFETY: The slot at `start` is initialized because the buffer is not empty, and this
        // value is never read again since `start` moves past it below.
        let item = unsafe { self.buffer.add(self.start).read() };
        self.start = wrapping_add(self.start, 1, self.capacity);
        self.length -= 1;
        Some(item)
    }

    /// Push an item into the [`FixedRingBuffer`]
    ///
    /// Returns `None` if there is no more space
    #[inline]
    pub fn push(&mut self, value: T) -> Option<()> {
        if self.is_full() {
            return None;
        }

        let offset = wrapping_add(self.start, self.len(), self.capacity);

        // SAFETY: `offset < capacity` and the slot is vacant as the buffer is not full. `write`
        // does not drop the (uninitialized) previous content.
        unsafe { self.buffer.add(offset).write(value) };
        self.length += 1;

        Some(())
    }
}

impl<T: Copy> FixedRingBuffer<T> {
    /// Add at most `num` items of `value` into the [`FixedRingBuffer`]
    ///
    /// This returns the amount of items actually added.
    pub fn fill_repeat(&mut self, value: T, num: usize) -> usize {
        if num == 0 || self.is_full() {
            return 0;
        }

        let num = usize::min(num, self.remaining_capacity());

        // First vacant slot. The buffer is not full, so `capacity > 0` and `start < capacity`.
        let start = wrapping_add(self.start, self.len(), self.capacity);

        // Number of items that fit before the end of the allocation, the rest wraps around.
        let head_len = usize::min(num, self.capacity - start);
        let tail_len = num - head_len;

        // SAFETY: Both ranges are within the allocation and only cover vacant slots because
        // `num <= remaining_capacity()`. The slots are uninitialized, so they are written
        // through raw pointers instead of through a `&mut [T]`.
        unsafe {
            let head = self.buffer.add(start);
            for i in 0..head_len {
                head.add(i).write(value);
            }
            for i in 0..tail_len {
                self.buffer.add(i).write(value);
            }
        }

        self.length += num;

        num
    }
}

impl<T> Drop for FixedRingBuffer<T> {
    fn drop(&mut self) {
        for i in 0..self.length {
            let offset = wrapping_add(self.start, i, self.capacity);
            // SAFETY: Every slot in the logical range `start..start + length` is initialized
            // and is dropped exactly once here.
            unsafe { std::ptr::drop_in_place(self.buffer.add(offset)) };
        }

        // SAFETY: `buffer` and `_buffer_capacity` originate from the `Vec` allocated in `new`.
        // A length of 0 makes sure the `Vec` only frees the memory and does not drop elements.
        drop(unsafe { Vec::from_raw_parts(self.buffer, 0, self._buffer_capacity) });
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn basic() {
        let mut frb = FixedRingBuffer::new(256);

        assert!(frb.pop_front().is_none());

        frb.push(1).unwrap();
        frb.push(3).unwrap();

        assert_eq!(frb.pop_front(), Some(1));
        assert_eq!(frb.pop_front(), Some(3));
        assert_eq!(frb.pop_front(), None);

        assert!(!frb.is_full());
        assert_eq!(frb.fill_repeat(42, 300), 256);
        assert!(frb.is_full());

        for _ in 0..256 {
            assert_eq!(frb.pop_front(), Some(42));
            assert!(!frb.is_full());
        }
        assert_eq!(frb.pop_front(), None);
    }

    #[test]
    fn boxed() {
        let mut frb = FixedRingBuffer::new(256);

        assert!(frb.pop_front().is_none());

        frb.push(Box::new(1)).unwrap();
        frb.push(Box::new(3)).unwrap();

        assert_eq!(frb.pop_front(), Some(Box::new(1)));
        assert_eq!(frb.pop_front(), Some(Box::new(3)));
        assert_eq!(frb.pop_front(), None);
    }

    #[test]
    fn wrap_around() {
        let mut frb = FixedRingBuffer::new(4);

        for i in 0..4 {
            frb.push(i).unwrap();
        }
        assert!(frb.push(4).is_none());

        assert_eq!(frb.pop_front(), Some(0));
        assert_eq!(frb.pop_front(), Some(1));
        frb.push(4).unwrap();

        let (head, tail) = frb.as_slices();
        assert_eq!(head, &[2, 3][..]);
        assert_eq!(tail, &[4][..]);

        assert_eq!(frb.fill_repeat(7, 10), 1);
        let (head, tail) = frb.as_slices();
        assert_eq!(head, &[2, 3][..]);
        assert_eq!(tail, &[4, 7][..]);
    }
}