/*1* Copyright © 2016 Mozilla Foundation2*3* This program is made available under an ISC-style license. See the4* accompanying file LICENSE for details.5*/67#ifndef CUBEB_RING_BUFFER_H8#define CUBEB_RING_BUFFER_H910#include "cubeb_utils.h"11#include <algorithm>12#include <atomic>13#include <cstdint>14#include <memory>15#include <thread>1617/**18* Single producer single consumer lock-free and wait-free ring buffer.19*20* This data structure allows producing data from one thread, and consuming it21* on another thread, safely and without explicit synchronization. If used on22* two threads, this data structure uses atomics for thread safety. It is23* possible to disable the use of atomics at compile time and only use this data24* structure on one thread.25*26* The role for the producer and the consumer must be constant, i.e., the27* producer should always be on one thread and the consumer should always be on28* another thread.29*30* Some words about the inner workings of this class:31* - Capacity is fixed. Only one allocation is performed, in the constructor.32* When reading and writing, the return value of the method allows checking if33* the ring buffer is empty or full.34* - We always keep the read index at least one element ahead of the write35* index, so we can distinguish between an empty and a full ring buffer: an36* empty ring buffer is when the write index is at the same position as the37* read index. A full buffer is when the write index is exactly one position38* before the read index.39* - We synchronize updates to the read index after having read the data, and40* the write index after having written the data. This means that the each41* thread can only touch a portion of the buffer that is not touched by the42* other thread.43* - Callers are expected to provide buffers. When writing to the queue,44* elements are copied into the internal storage from the buffer passed in.45* When reading from the queue, the user is expected to provide a buffer.46* Because this is a ring buffer, data might not be contiguous in memory,47* providing an external buffer to copy into is an easy way to have linear48* data for further processing.49*/50template <typename T> class ring_buffer_base {51public:52/**53* Constructor for a ring buffer.54*55* This performs an allocation, but is the only allocation that will happen56* for the life time of a `ring_buffer_base`.57*58* @param capacity The maximum number of element this ring buffer will hold.59*/60ring_buffer_base(int capacity)61/* One more element to distinguish from empty and full buffer. */62: capacity_(capacity + 1)63{64assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&65"buffer too large for the type of index used.");66assert(capacity_ > 0);6768data_.reset(new T[storage_capacity()]);69/* If this queue is using atomics, initializing those members as the last70* action in the constructor acts as a full barrier, and allow capacity() to71* be thread-safe. */72write_index_ = 0;73read_index_ = 0;74}75/**76* Push `count` zero or default constructed elements in the array.77*78* Only safely called on the producer thread.79*80* @param count The number of elements to enqueue.81* @return The number of element enqueued.82*/83int enqueue_default(int count) { return enqueue(nullptr, count); }84/**85* @brief Put an element in the queue86*87* Only safely called on the producer thread.88*89* @param element The element to put in the queue.90*91* @return 1 if the element was inserted, 0 otherwise.92*/93int enqueue(T & element) { return enqueue(&element, 1); }94/**95* Push `count` elements in the ring buffer.96*97* Only safely called on the producer thread.98*99* @param elements a pointer to a buffer containing at least `count` elements.100* If `elements` is nullptr, zero or default constructed elements are101* enqueued.102* @param count The number of elements to read from `elements`103* @return The number of elements successfully coped from `elements` and104* inserted into the ring buffer.105*/106int enqueue(T * elements, int count)107{108#ifndef NDEBUG109assert_correct_thread(producer_id);110#endif111112int wr_idx = write_index_.load(std::memory_order_relaxed);113int rd_idx = read_index_.load(std::memory_order_acquire);114115if (full_internal(rd_idx, wr_idx)) {116return 0;117}118119int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);120121/* First part, from the write index to the end of the array. */122int first_part = std::min(storage_capacity() - wr_idx, to_write);123/* Second part, from the beginning of the array */124int second_part = to_write - first_part;125126if (elements) {127Copy(data_.get() + wr_idx, elements, first_part);128Copy(data_.get(), elements + first_part, second_part);129} else {130ConstructDefault(data_.get() + wr_idx, first_part);131ConstructDefault(data_.get(), second_part);132}133134write_index_.store(increment_index(wr_idx, to_write),135std::memory_order_release);136137return to_write;138}139/**140* Retrieve at most `count` elements from the ring buffer, and copy them to141* `elements`, if non-null.142*143* Only safely called on the consumer side.144*145* @param elements A pointer to a buffer with space for at least `count`146* elements. If `elements` is `nullptr`, `count` element will be discarded.147* @param count The maximum number of elements to dequeue.148* @return The number of elements written to `elements`.149*/150int dequeue(T * elements, int count)151{152#ifndef NDEBUG153assert_correct_thread(consumer_id);154#endif155156int rd_idx = read_index_.load(std::memory_order_relaxed);157int wr_idx = write_index_.load(std::memory_order_acquire);158159if (empty_internal(rd_idx, wr_idx)) {160return 0;161}162163int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);164165int first_part = std::min(storage_capacity() - rd_idx, to_read);166int second_part = to_read - first_part;167168if (elements) {169Copy(elements, data_.get() + rd_idx, first_part);170Copy(elements + first_part, data_.get(), second_part);171}172173read_index_.store(increment_index(rd_idx, to_read),174std::memory_order_release);175176return to_read;177}178/**179* Get the number of available element for consuming.180*181* Only safely called on the consumer thread.182*183* @return The number of available elements for reading.184*/185int available_read() const186{187#ifndef NDEBUG188assert_correct_thread(consumer_id);189#endif190return available_read_internal(191read_index_.load(std::memory_order_relaxed),192write_index_.load(std::memory_order_acquire));193}194/**195* Get the number of available elements for consuming.196*197* Only safely called on the producer thread.198*199* @return The number of empty slots in the buffer, available for writing.200*/201int available_write() const202{203#ifndef NDEBUG204assert_correct_thread(producer_id);205#endif206return available_write_internal(207read_index_.load(std::memory_order_acquire),208write_index_.load(std::memory_order_relaxed));209}210/**211* Get the total capacity, for this ring buffer.212*213* Can be called safely on any thread.214*215* @return The maximum capacity of this ring buffer.216*/217int capacity() const { return storage_capacity() - 1; }218/**219* Reset the consumer and producer thread identifier, in case the thread are220* being changed. This has to be externally synchronized. This is no-op when221* asserts are disabled.222*/223void reset_thread_ids()224{225#ifndef NDEBUG226consumer_id = producer_id = std::thread::id();227#endif228}229230private:231/** Return true if the ring buffer is empty.232*233* @param read_index the read index to consider234* @param write_index the write index to consider235* @return true if the ring buffer is empty, false otherwise.236**/237bool empty_internal(int read_index, int write_index) const238{239return write_index == read_index;240}241/** Return true if the ring buffer is full.242*243* This happens if the write index is exactly one element behind the read244* index.245*246* @param read_index the read index to consider247* @param write_index the write index to consider248* @return true if the ring buffer is full, false otherwise.249**/250bool full_internal(int read_index, int write_index) const251{252return (write_index + 1) % storage_capacity() == read_index;253}254/**255* Return the size of the storage. It is one more than the number of elements256* that can be stored in the buffer.257*258* @return the number of elements that can be stored in the buffer.259*/260int storage_capacity() const { return capacity_; }261/**262* Returns the number of elements available for reading.263*264* @return the number of available elements for reading.265*/266int available_read_internal(int read_index, int write_index) const267{268if (write_index >= read_index) {269return write_index - read_index;270} else {271return write_index + storage_capacity() - read_index;272}273}274/**275* Returns the number of empty elements, available for writing.276*277* @return the number of elements that can be written into the array.278*/279int available_write_internal(int read_index, int write_index) const280{281/* We substract one element here to always keep at least one sample282* free in the buffer, to distinguish between full and empty array. */283int rv = read_index - write_index - 1;284if (write_index >= read_index) {285rv += storage_capacity();286}287return rv;288}289/**290* Increments an index, wrapping it around the storage.291*292* @param index a reference to the index to increment.293* @param increment the number by which `index` is incremented.294* @return the new index.295*/296int increment_index(int index, int increment) const297{298assert(increment >= 0);299return (index + increment) % storage_capacity();300}301/**302* @brief This allows checking that enqueue (resp. dequeue) are always called303* by the right thread.304*305* @param id the id of the thread that has called the calling method first.306*/307#ifndef NDEBUG308static void assert_correct_thread(std::thread::id & id)309{310if (id == std::thread::id()) {311id = std::this_thread::get_id();312return;313}314assert(id == std::this_thread::get_id());315}316#endif317/** Index at which the oldest element is at, in samples. */318std::atomic<int> read_index_;319/** Index at which to write new elements. `write_index` is always at320* least one element ahead of `read_index_`. */321std::atomic<int> write_index_;322/** Maximum number of elements that can be stored in the ring buffer. */323const int capacity_;324/** Data storage */325std::unique_ptr<T[]> data_;326#ifndef NDEBUG327/** The id of the only thread that is allowed to read from the queue. */328mutable std::thread::id consumer_id;329/** The id of the only thread that is allowed to write from the queue. */330mutable std::thread::id producer_id;331#endif332};333334/**335* Adapter for `ring_buffer_base` that exposes an interface in frames.336*/337template <typename T> class audio_ring_buffer_base {338public:339/**340* @brief Constructor.341*342* @param channel_count Number of channels.343* @param capacity_in_frames The capacity in frames.344*/345audio_ring_buffer_base(int channel_count, int capacity_in_frames)346: channel_count(channel_count),347ring_buffer(frames_to_samples(capacity_in_frames))348{349assert(channel_count > 0);350}351/**352* @brief Enqueue silence.353*354* Only safely called on the producer thread.355*356* @param frame_count The number of frames of silence to enqueue.357* @return The number of frames of silence actually written to the queue.358*/359int enqueue_default(int frame_count)360{361return samples_to_frames(362ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));363}364/**365* @brief Enqueue `frames_count` frames of audio.366*367* Only safely called from the producer thread.368*369* @param [in] frames If non-null, the frames to enqueue.370* Otherwise, silent frames are enqueued.371* @param frame_count The number of frames to enqueue.372*373* @return The number of frames enqueued374*/375376int enqueue(T * frames, int frame_count)377{378return samples_to_frames(379ring_buffer.enqueue(frames, frames_to_samples(frame_count)));380}381382/**383* @brief Removes `frame_count` frames from the buffer, and384* write them to `frames` if it is non-null.385*386* Only safely called on the consumer thread.387*388* @param frames If non-null, the frames are copied to `frames`.389* Otherwise, they are dropped.390* @param frame_count The number of frames to remove.391*392* @return The number of frames actually dequeud.393*/394int dequeue(T * frames, int frame_count)395{396return samples_to_frames(397ring_buffer.dequeue(frames, frames_to_samples(frame_count)));398}399/**400* Get the number of available frames of audio for consuming.401*402* Only safely called on the consumer thread.403*404* @return The number of available frames of audio for reading.405*/406int available_read() const407{408return samples_to_frames(ring_buffer.available_read());409}410/**411* Get the number of available frames of audio for consuming.412*413* Only safely called on the producer thread.414*415* @return The number of empty slots in the buffer, available for writing.416*/417int available_write() const418{419return samples_to_frames(ring_buffer.available_write());420}421/**422* Get the total capacity, for this ring buffer.423*424* Can be called safely on any thread.425*426* @return The maximum capacity of this ring buffer.427*/428int capacity() const { return samples_to_frames(ring_buffer.capacity()); }429430private:431/**432* @brief Frames to samples conversion.433*434* @param frames The number of frames.435*436* @return A number of samples.437*/438int frames_to_samples(int frames) const { return frames * channel_count; }439/**440* @brief Samples to frames conversion.441*442* @param samples The number of samples.443*444* @return A number of frames.445*/446int samples_to_frames(int samples) const { return samples / channel_count; }447/** Number of channels of audio that will stream through this ring buffer. */448int channel_count;449/** The underlying ring buffer that is used to store the data. */450ring_buffer_base<T> ring_buffer;451};452453/**454* Lock-free instantiation of the `ring_buffer_base` type. This is safe to use455* from two threads, one producer, one consumer (that never change role),456* without explicit synchronization.457*/458template <typename T> using lock_free_queue = ring_buffer_base<T>;459/**460* Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use461* from two threads, one producer, one consumer (that never change role),462* without explicit synchronization.463*/464template <typename T>465using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;466467#endif // CUBEB_RING_BUFFER_H468469470