Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
stenzek
GitHub Repository: stenzek/duckstation
Path: blob/master/dep/cubeb/src/cubeb_ringbuffer.h
4246 views
1
/*
2
* Copyright © 2016 Mozilla Foundation
3
*
4
* This program is made available under an ISC-style license. See the
5
* accompanying file LICENSE for details.
6
*/
7
8
#ifndef CUBEB_RING_BUFFER_H
9
#define CUBEB_RING_BUFFER_H
10
11
#include "cubeb_utils.h"
12
#include <algorithm>
13
#include <atomic>
14
#include <cstdint>
15
#include <memory>
16
#include <thread>
17
18
/**
19
* Single producer single consumer lock-free and wait-free ring buffer.
20
*
21
* This data structure allows producing data from one thread, and consuming it
22
* on another thread, safely and without explicit synchronization. If used on
23
* two threads, this data structure uses atomics for thread safety. It is
24
* possible to disable the use of atomics at compile time and only use this data
25
* structure on one thread.
26
*
27
* The role for the producer and the consumer must be constant, i.e., the
28
* producer should always be on one thread and the consumer should always be on
29
* another thread.
30
*
31
* Some words about the inner workings of this class:
32
* - Capacity is fixed. Only one allocation is performed, in the constructor.
33
* When reading and writing, the return value of the method allows checking if
34
* the ring buffer is empty or full.
35
* - We always keep the read index at least one element ahead of the write
36
* index, so we can distinguish between an empty and a full ring buffer: an
37
* empty ring buffer is when the write index is at the same position as the
38
* read index. A full buffer is when the write index is exactly one position
39
* before the read index.
40
* - We synchronize updates to the read index after having read the data, and
41
* the write index after having written the data. This means that the each
42
* thread can only touch a portion of the buffer that is not touched by the
43
* other thread.
44
* - Callers are expected to provide buffers. When writing to the queue,
45
* elements are copied into the internal storage from the buffer passed in.
46
* When reading from the queue, the user is expected to provide a buffer.
47
* Because this is a ring buffer, data might not be contiguous in memory,
48
* providing an external buffer to copy into is an easy way to have linear
49
* data for further processing.
50
*/
51
template <typename T> class ring_buffer_base {
52
public:
53
/**
54
* Constructor for a ring buffer.
55
*
56
* This performs an allocation, but is the only allocation that will happen
57
* for the life time of a `ring_buffer_base`.
58
*
59
* @param capacity The maximum number of element this ring buffer will hold.
60
*/
61
ring_buffer_base(int capacity)
62
/* One more element to distinguish from empty and full buffer. */
63
: capacity_(capacity + 1)
64
{
65
assert(storage_capacity() < std::numeric_limits<int>::max() / 2 &&
66
"buffer too large for the type of index used.");
67
assert(capacity_ > 0);
68
69
data_.reset(new T[storage_capacity()]);
70
/* If this queue is using atomics, initializing those members as the last
71
* action in the constructor acts as a full barrier, and allow capacity() to
72
* be thread-safe. */
73
write_index_ = 0;
74
read_index_ = 0;
75
}
76
/**
77
* Push `count` zero or default constructed elements in the array.
78
*
79
* Only safely called on the producer thread.
80
*
81
* @param count The number of elements to enqueue.
82
* @return The number of element enqueued.
83
*/
84
int enqueue_default(int count) { return enqueue(nullptr, count); }
85
/**
86
* @brief Put an element in the queue
87
*
88
* Only safely called on the producer thread.
89
*
90
* @param element The element to put in the queue.
91
*
92
* @return 1 if the element was inserted, 0 otherwise.
93
*/
94
int enqueue(T & element) { return enqueue(&element, 1); }
95
/**
96
* Push `count` elements in the ring buffer.
97
*
98
* Only safely called on the producer thread.
99
*
100
* @param elements a pointer to a buffer containing at least `count` elements.
101
* If `elements` is nullptr, zero or default constructed elements are
102
* enqueued.
103
* @param count The number of elements to read from `elements`
104
* @return The number of elements successfully coped from `elements` and
105
* inserted into the ring buffer.
106
*/
107
int enqueue(T * elements, int count)
108
{
109
#ifndef NDEBUG
110
assert_correct_thread(producer_id);
111
#endif
112
113
int wr_idx = write_index_.load(std::memory_order_relaxed);
114
int rd_idx = read_index_.load(std::memory_order_acquire);
115
116
if (full_internal(rd_idx, wr_idx)) {
117
return 0;
118
}
119
120
int to_write = std::min(available_write_internal(rd_idx, wr_idx), count);
121
122
/* First part, from the write index to the end of the array. */
123
int first_part = std::min(storage_capacity() - wr_idx, to_write);
124
/* Second part, from the beginning of the array */
125
int second_part = to_write - first_part;
126
127
if (elements) {
128
Copy(data_.get() + wr_idx, elements, first_part);
129
Copy(data_.get(), elements + first_part, second_part);
130
} else {
131
ConstructDefault(data_.get() + wr_idx, first_part);
132
ConstructDefault(data_.get(), second_part);
133
}
134
135
write_index_.store(increment_index(wr_idx, to_write),
136
std::memory_order_release);
137
138
return to_write;
139
}
140
/**
141
* Retrieve at most `count` elements from the ring buffer, and copy them to
142
* `elements`, if non-null.
143
*
144
* Only safely called on the consumer side.
145
*
146
* @param elements A pointer to a buffer with space for at least `count`
147
* elements. If `elements` is `nullptr`, `count` element will be discarded.
148
* @param count The maximum number of elements to dequeue.
149
* @return The number of elements written to `elements`.
150
*/
151
int dequeue(T * elements, int count)
152
{
153
#ifndef NDEBUG
154
assert_correct_thread(consumer_id);
155
#endif
156
157
int rd_idx = read_index_.load(std::memory_order_relaxed);
158
int wr_idx = write_index_.load(std::memory_order_acquire);
159
160
if (empty_internal(rd_idx, wr_idx)) {
161
return 0;
162
}
163
164
int to_read = std::min(available_read_internal(rd_idx, wr_idx), count);
165
166
int first_part = std::min(storage_capacity() - rd_idx, to_read);
167
int second_part = to_read - first_part;
168
169
if (elements) {
170
Copy(elements, data_.get() + rd_idx, first_part);
171
Copy(elements + first_part, data_.get(), second_part);
172
}
173
174
read_index_.store(increment_index(rd_idx, to_read),
175
std::memory_order_release);
176
177
return to_read;
178
}
179
/**
180
* Get the number of available element for consuming.
181
*
182
* Only safely called on the consumer thread.
183
*
184
* @return The number of available elements for reading.
185
*/
186
int available_read() const
187
{
188
#ifndef NDEBUG
189
assert_correct_thread(consumer_id);
190
#endif
191
return available_read_internal(
192
read_index_.load(std::memory_order_relaxed),
193
write_index_.load(std::memory_order_acquire));
194
}
195
/**
196
* Get the number of available elements for consuming.
197
*
198
* Only safely called on the producer thread.
199
*
200
* @return The number of empty slots in the buffer, available for writing.
201
*/
202
int available_write() const
203
{
204
#ifndef NDEBUG
205
assert_correct_thread(producer_id);
206
#endif
207
return available_write_internal(
208
read_index_.load(std::memory_order_acquire),
209
write_index_.load(std::memory_order_relaxed));
210
}
211
/**
212
* Get the total capacity, for this ring buffer.
213
*
214
* Can be called safely on any thread.
215
*
216
* @return The maximum capacity of this ring buffer.
217
*/
218
int capacity() const { return storage_capacity() - 1; }
219
/**
220
* Reset the consumer and producer thread identifier, in case the thread are
221
* being changed. This has to be externally synchronized. This is no-op when
222
* asserts are disabled.
223
*/
224
void reset_thread_ids()
225
{
226
#ifndef NDEBUG
227
consumer_id = producer_id = std::thread::id();
228
#endif
229
}
230
231
private:
232
/** Return true if the ring buffer is empty.
233
*
234
* @param read_index the read index to consider
235
* @param write_index the write index to consider
236
* @return true if the ring buffer is empty, false otherwise.
237
**/
238
bool empty_internal(int read_index, int write_index) const
239
{
240
return write_index == read_index;
241
}
242
/** Return true if the ring buffer is full.
243
*
244
* This happens if the write index is exactly one element behind the read
245
* index.
246
*
247
* @param read_index the read index to consider
248
* @param write_index the write index to consider
249
* @return true if the ring buffer is full, false otherwise.
250
**/
251
bool full_internal(int read_index, int write_index) const
252
{
253
return (write_index + 1) % storage_capacity() == read_index;
254
}
255
/**
256
* Return the size of the storage. It is one more than the number of elements
257
* that can be stored in the buffer.
258
*
259
* @return the number of elements that can be stored in the buffer.
260
*/
261
int storage_capacity() const { return capacity_; }
262
/**
263
* Returns the number of elements available for reading.
264
*
265
* @return the number of available elements for reading.
266
*/
267
int available_read_internal(int read_index, int write_index) const
268
{
269
if (write_index >= read_index) {
270
return write_index - read_index;
271
} else {
272
return write_index + storage_capacity() - read_index;
273
}
274
}
275
/**
276
* Returns the number of empty elements, available for writing.
277
*
278
* @return the number of elements that can be written into the array.
279
*/
280
int available_write_internal(int read_index, int write_index) const
281
{
282
/* We substract one element here to always keep at least one sample
283
* free in the buffer, to distinguish between full and empty array. */
284
int rv = read_index - write_index - 1;
285
if (write_index >= read_index) {
286
rv += storage_capacity();
287
}
288
return rv;
289
}
290
/**
291
* Increments an index, wrapping it around the storage.
292
*
293
* @param index a reference to the index to increment.
294
* @param increment the number by which `index` is incremented.
295
* @return the new index.
296
*/
297
int increment_index(int index, int increment) const
298
{
299
assert(increment >= 0);
300
return (index + increment) % storage_capacity();
301
}
302
/**
303
* @brief This allows checking that enqueue (resp. dequeue) are always called
304
* by the right thread.
305
*
306
* @param id the id of the thread that has called the calling method first.
307
*/
308
#ifndef NDEBUG
309
static void assert_correct_thread(std::thread::id & id)
310
{
311
if (id == std::thread::id()) {
312
id = std::this_thread::get_id();
313
return;
314
}
315
assert(id == std::this_thread::get_id());
316
}
317
#endif
318
/** Index at which the oldest element is at, in samples. */
319
std::atomic<int> read_index_;
320
/** Index at which to write new elements. `write_index` is always at
321
* least one element ahead of `read_index_`. */
322
std::atomic<int> write_index_;
323
/** Maximum number of elements that can be stored in the ring buffer. */
324
const int capacity_;
325
/** Data storage */
326
std::unique_ptr<T[]> data_;
327
#ifndef NDEBUG
328
/** The id of the only thread that is allowed to read from the queue. */
329
mutable std::thread::id consumer_id;
330
/** The id of the only thread that is allowed to write from the queue. */
331
mutable std::thread::id producer_id;
332
#endif
333
};
334
335
/**
336
* Adapter for `ring_buffer_base` that exposes an interface in frames.
337
*/
338
template <typename T> class audio_ring_buffer_base {
339
public:
340
/**
341
* @brief Constructor.
342
*
343
* @param channel_count Number of channels.
344
* @param capacity_in_frames The capacity in frames.
345
*/
346
audio_ring_buffer_base(int channel_count, int capacity_in_frames)
347
: channel_count(channel_count),
348
ring_buffer(frames_to_samples(capacity_in_frames))
349
{
350
assert(channel_count > 0);
351
}
352
/**
353
* @brief Enqueue silence.
354
*
355
* Only safely called on the producer thread.
356
*
357
* @param frame_count The number of frames of silence to enqueue.
358
* @return The number of frames of silence actually written to the queue.
359
*/
360
int enqueue_default(int frame_count)
361
{
362
return samples_to_frames(
363
ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
364
}
365
/**
366
* @brief Enqueue `frames_count` frames of audio.
367
*
368
* Only safely called from the producer thread.
369
*
370
* @param [in] frames If non-null, the frames to enqueue.
371
* Otherwise, silent frames are enqueued.
372
* @param frame_count The number of frames to enqueue.
373
*
374
* @return The number of frames enqueued
375
*/
376
377
int enqueue(T * frames, int frame_count)
378
{
379
return samples_to_frames(
380
ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
381
}
382
383
/**
384
* @brief Removes `frame_count` frames from the buffer, and
385
* write them to `frames` if it is non-null.
386
*
387
* Only safely called on the consumer thread.
388
*
389
* @param frames If non-null, the frames are copied to `frames`.
390
* Otherwise, they are dropped.
391
* @param frame_count The number of frames to remove.
392
*
393
* @return The number of frames actually dequeud.
394
*/
395
int dequeue(T * frames, int frame_count)
396
{
397
return samples_to_frames(
398
ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
399
}
400
/**
401
* Get the number of available frames of audio for consuming.
402
*
403
* Only safely called on the consumer thread.
404
*
405
* @return The number of available frames of audio for reading.
406
*/
407
int available_read() const
408
{
409
return samples_to_frames(ring_buffer.available_read());
410
}
411
/**
412
* Get the number of available frames of audio for consuming.
413
*
414
* Only safely called on the producer thread.
415
*
416
* @return The number of empty slots in the buffer, available for writing.
417
*/
418
int available_write() const
419
{
420
return samples_to_frames(ring_buffer.available_write());
421
}
422
/**
423
* Get the total capacity, for this ring buffer.
424
*
425
* Can be called safely on any thread.
426
*
427
* @return The maximum capacity of this ring buffer.
428
*/
429
int capacity() const { return samples_to_frames(ring_buffer.capacity()); }
430
431
private:
432
/**
433
* @brief Frames to samples conversion.
434
*
435
* @param frames The number of frames.
436
*
437
* @return A number of samples.
438
*/
439
int frames_to_samples(int frames) const { return frames * channel_count; }
440
/**
441
* @brief Samples to frames conversion.
442
*
443
* @param samples The number of samples.
444
*
445
* @return A number of frames.
446
*/
447
int samples_to_frames(int samples) const { return samples / channel_count; }
448
/** Number of channels of audio that will stream through this ring buffer. */
449
int channel_count;
450
/** The underlying ring buffer that is used to store the data. */
451
ring_buffer_base<T> ring_buffer;
452
};
453
454
/**
455
* Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
456
* from two threads, one producer, one consumer (that never change role),
457
* without explicit synchronization.
458
*/
459
template <typename T> using lock_free_queue = ring_buffer_base<T>;
460
/**
461
* Lock-free instantiation of the `audio_ring_buffer` type. This is safe to use
462
* from two threads, one producer, one consumer (that never change role),
463
* without explicit synchronization.
464
*/
465
template <typename T>
466
using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
467
468
#endif // CUBEB_RING_BUFFER_H
469
470