Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/common/audio_streams/src/audio_streams.rs
5394 views
1
// Copyright 2019 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
//! Provides an interface for playing and recording audio.
6
//!
7
//! When implementing an audio playback system, the `StreamSource` trait is implemented.
8
//! Implementors of this trait allow creation of `PlaybackBufferStream` objects. The
9
//! `PlaybackBufferStream` provides the actual audio buffers to be filled with audio samples. These
10
//! buffers can be filled with `write_playback_buffer`.
11
//!
12
//! Users playing audio fill the provided buffers with audio. When a `PlaybackBuffer` is dropped,
13
//! the samples written to it are committed to the `PlaybackBufferStream` it came from.
14
//!
15
//! ```
16
//! use audio_streams::{BoxError, PlaybackBuffer, SampleFormat, StreamSource, NoopStreamSource};
17
//! use std::io::Write;
18
//!
19
//! const buffer_size: usize = 120;
20
//! const num_channels: usize = 2;
21
//!
22
//! # fn main() -> std::result::Result<(), BoxError> {
23
//! let mut stream_source = NoopStreamSource::new();
24
//! let sample_format = SampleFormat::S16LE;
25
//! let frame_size = num_channels * sample_format.sample_bytes();
26
//!
27
//! let (_, mut stream) = stream_source
28
//! .new_playback_stream(num_channels, sample_format, 48000, buffer_size)?;
29
//! // Play 10 buffers of DC.
30
//! let mut buf = Vec::new();
31
//! buf.resize(buffer_size * frame_size, 0xa5u8);
32
//! for _ in 0..10 {
33
//! let mut copy_cb = |stream_buffer: &mut PlaybackBuffer| {
34
//! assert_eq!(stream_buffer.write(&buf)?, buffer_size * frame_size);
35
//! Ok(())
36
//! };
37
//! stream.write_playback_buffer(&mut copy_cb)?;
38
//! }
39
//! # Ok (())
40
//! # }
41
//! ```
42
pub mod async_api;
43
44
use std::cmp::min;
45
use std::error;
46
use std::fmt;
47
use std::fmt::Display;
48
use std::io;
49
use std::io::Read;
50
use std::io::Write;
51
#[cfg(unix)]
52
use std::os::unix::io::RawFd as RawDescriptor;
53
#[cfg(windows)]
54
use std::os::windows::io::RawHandle as RawDescriptor;
55
use std::result::Result;
56
use std::str::FromStr;
57
use std::time::Duration;
58
use std::time::Instant;
59
60
pub use async_api::AsyncStream;
61
pub use async_api::AudioStreamsExecutor;
62
use async_trait::async_trait;
63
use remain::sorted;
64
use serde::Deserialize;
65
use serde::Serialize;
66
use thiserror::Error;
67
68
#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
69
pub enum SampleFormat {
70
U8,
71
S16LE,
72
S24LE,
73
S32LE,
74
}
75
76
impl SampleFormat {
77
pub fn sample_bytes(self) -> usize {
78
use SampleFormat::*;
79
match self {
80
U8 => 1,
81
S16LE => 2,
82
S24LE => 4, // Not a typo, S24_LE samples are stored in 4 byte chunks.
83
S32LE => 4,
84
}
85
}
86
}
87
88
impl Display for SampleFormat {
89
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
90
use SampleFormat::*;
91
match self {
92
U8 => write!(f, "Unsigned 8 bit"),
93
S16LE => write!(f, "Signed 16 bit Little Endian"),
94
S24LE => write!(f, "Signed 24 bit Little Endian"),
95
S32LE => write!(f, "Signed 32 bit Little Endian"),
96
}
97
}
98
}
99
100
impl FromStr for SampleFormat {
101
type Err = SampleFormatError;
102
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
103
match s {
104
"U8" => Ok(SampleFormat::U8),
105
"S16_LE" => Ok(SampleFormat::S16LE),
106
"S24_LE" => Ok(SampleFormat::S24LE),
107
"S32_LE" => Ok(SampleFormat::S32LE),
108
_ => Err(SampleFormatError::InvalidSampleFormat),
109
}
110
}
111
}
112
113
/// Errors that are possible from a `SampleFormat`.
114
#[sorted]
115
#[derive(Error, Debug)]
116
pub enum SampleFormatError {
117
#[error("Must be in [U8, S16_LE, S24_LE, S32_LE]")]
118
InvalidSampleFormat,
119
}
120
121
/// Valid directions of an audio stream.
122
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
123
pub enum StreamDirection {
124
Playback,
125
Capture,
126
}
127
128
/// Valid effects for an audio stream.
129
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Deserialize, Serialize)]
130
pub enum StreamEffect {
131
#[default]
132
NoEffect,
133
#[serde(alias = "aec")]
134
EchoCancellation,
135
}
136
137
pub mod capture;
138
pub mod shm_streams;
139
140
/// Errors that can pass across threads.
141
pub type BoxError = Box<dyn error::Error + Send + Sync>;
142
143
/// Errors that are possible from a `StreamEffect`.
144
#[sorted]
145
#[derive(Error, Debug)]
146
pub enum StreamEffectError {
147
#[error("Must be in [EchoCancellation, aec]")]
148
InvalidEffect,
149
}
150
151
impl FromStr for StreamEffect {
152
type Err = StreamEffectError;
153
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
154
match s {
155
"EchoCancellation" | "aec" => Ok(StreamEffect::EchoCancellation),
156
_ => Err(StreamEffectError::InvalidEffect),
157
}
158
}
159
}
160
161
#[sorted]
162
#[derive(Error, Debug)]
163
pub enum Error {
164
#[error("Unimplemented")]
165
Unimplemented,
166
}
167
168
/// `StreamSourceGenerator` is a trait used to abstract types that create [`StreamSource`].
169
/// It can be used when multiple types of `StreamSource` are needed.
170
pub trait StreamSourceGenerator: Sync + Send {
171
fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError>;
172
}
173
174
/// `StreamSource` creates streams for playback or capture of audio.
175
#[async_trait(?Send)]
176
pub trait StreamSource: Send {
177
/// Returns a stream control and buffer generator object. These are separate as the buffer
178
/// generator might want to be passed to the audio stream.
179
#[allow(clippy::type_complexity)]
180
fn new_playback_stream(
181
&mut self,
182
num_channels: usize,
183
format: SampleFormat,
184
frame_rate: u32,
185
buffer_size: usize,
186
) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError>;
187
188
/// Returns a stream control and async buffer generator object. These are separate as the buffer
189
/// generator might want to be passed to the audio stream.
190
#[allow(clippy::type_complexity)]
191
fn new_async_playback_stream(
192
&mut self,
193
_num_channels: usize,
194
_format: SampleFormat,
195
_frame_rate: u32,
196
_buffer_size: usize,
197
_ex: &dyn AudioStreamsExecutor,
198
) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
199
Err(Box::new(Error::Unimplemented))
200
}
201
202
/// Returns a stream control and async buffer generator object asynchronously.
203
/// Default implementation calls and blocks on `new_async_playback_stream()`.
204
#[allow(clippy::type_complexity)]
205
async fn async_new_async_playback_stream(
206
&mut self,
207
num_channels: usize,
208
format: SampleFormat,
209
frame_rate: u32,
210
buffer_size: usize,
211
ex: &dyn AudioStreamsExecutor,
212
) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
213
self.new_async_playback_stream(num_channels, format, frame_rate, buffer_size, ex)
214
}
215
216
/// Returns a stream control and buffer generator object. These are separate as the buffer
217
/// generator might want to be passed to the audio stream.
218
/// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
219
#[allow(clippy::type_complexity)]
220
fn new_capture_stream(
221
&mut self,
222
num_channels: usize,
223
format: SampleFormat,
224
frame_rate: u32,
225
buffer_size: usize,
226
_effects: &[StreamEffect],
227
) -> Result<
228
(
229
Box<dyn StreamControl>,
230
Box<dyn capture::CaptureBufferStream>,
231
),
232
BoxError,
233
> {
234
Ok((
235
Box::new(NoopStreamControl::new()),
236
Box::new(capture::NoopCaptureStream::new(
237
num_channels,
238
format,
239
frame_rate,
240
buffer_size,
241
)),
242
))
243
}
244
245
/// Returns a stream control and async buffer generator object. These are separate as the buffer
246
/// generator might want to be passed to the audio stream.
247
/// Default implementation returns `NoopStreamControl` and `NoopCaptureStream`.
248
#[allow(clippy::type_complexity)]
249
fn new_async_capture_stream(
250
&mut self,
251
num_channels: usize,
252
format: SampleFormat,
253
frame_rate: u32,
254
buffer_size: usize,
255
_effects: &[StreamEffect],
256
_ex: &dyn AudioStreamsExecutor,
257
) -> Result<
258
(
259
Box<dyn StreamControl>,
260
Box<dyn capture::AsyncCaptureBufferStream>,
261
),
262
BoxError,
263
> {
264
Ok((
265
Box::new(NoopStreamControl::new()),
266
Box::new(capture::NoopCaptureStream::new(
267
num_channels,
268
format,
269
frame_rate,
270
buffer_size,
271
)),
272
))
273
}
274
275
/// Returns a stream control and async buffer generator object asynchronously.
276
/// Default implementation calls and blocks on `new_async_capture_stream()`.
277
#[allow(clippy::type_complexity)]
278
async fn async_new_async_capture_stream(
279
&mut self,
280
num_channels: usize,
281
format: SampleFormat,
282
frame_rate: u32,
283
buffer_size: usize,
284
effects: &[StreamEffect],
285
ex: &dyn AudioStreamsExecutor,
286
) -> Result<
287
(
288
Box<dyn StreamControl>,
289
Box<dyn capture::AsyncCaptureBufferStream>,
290
),
291
BoxError,
292
> {
293
self.new_async_capture_stream(num_channels, format, frame_rate, buffer_size, effects, ex)
294
}
295
296
/// Returns any open file descriptors needed by the implementor. The FD list helps users of the
297
/// StreamSource enter Linux jails making sure not to close needed FDs.
298
fn keep_rds(&self) -> Option<Vec<RawDescriptor>> {
299
None
300
}
301
}
302
303
/// `PlaybackBufferStream` provides `PlaybackBuffer`s to fill with audio samples for playback.
304
pub trait PlaybackBufferStream: Send {
305
fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError>;
306
307
/// Call `f` with a `PlaybackBuffer`, and trigger the buffer done call back after. `f` should
308
/// write playback data to the given `PlaybackBuffer`.
309
fn write_playback_buffer<'b, 's: 'b>(
310
&'s mut self,
311
f: &mut dyn FnMut(&mut PlaybackBuffer<'b>) -> Result<(), BoxError>,
312
) -> Result<(), BoxError> {
313
let mut buf = self.next_playback_buffer()?;
314
f(&mut buf)?;
315
buf.commit();
316
Ok(())
317
}
318
}
319
320
impl<S: PlaybackBufferStream + ?Sized> PlaybackBufferStream for &mut S {
321
fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
322
(**self).next_playback_buffer()
323
}
324
}
325
326
/// `PlaybackBufferStream` provides `PlaybackBuffer`s asynchronously to fill with audio samples for
327
/// playback.
328
#[async_trait(?Send)]
329
pub trait AsyncPlaybackBufferStream: Send {
330
async fn next_playback_buffer<'a>(
331
&'a mut self,
332
_ex: &dyn AudioStreamsExecutor,
333
) -> Result<AsyncPlaybackBuffer<'a>, BoxError>;
334
}
335
336
#[async_trait(?Send)]
337
impl<S: AsyncPlaybackBufferStream + ?Sized> AsyncPlaybackBufferStream for &mut S {
338
async fn next_playback_buffer<'a>(
339
&'a mut self,
340
ex: &dyn AudioStreamsExecutor,
341
) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
342
(**self).next_playback_buffer(ex).await
343
}
344
}
345
346
/// Call `f` with a `AsyncPlaybackBuffer`, and trigger the buffer done call back after. `f` should
347
/// write playback data to the given `AsyncPlaybackBuffer`.
348
///
349
/// This cannot be a trait method because trait methods with generic parameters are not object safe.
350
pub async fn async_write_playback_buffer<F>(
351
stream: &mut dyn AsyncPlaybackBufferStream,
352
f: F,
353
ex: &dyn AudioStreamsExecutor,
354
) -> Result<(), BoxError>
355
where
356
F: for<'a> FnOnce(&'a mut AsyncPlaybackBuffer) -> Result<(), BoxError>,
357
{
358
let mut buf = stream.next_playback_buffer(ex).await?;
359
f(&mut buf)?;
360
buf.commit().await;
361
Ok(())
362
}
363
364
/// `StreamControl` provides a way to set the volume and mute states of a stream. `StreamControl`
365
/// is separate from the stream so it can be owned by a different thread if needed.
366
pub trait StreamControl: Send + Sync {
367
fn set_volume(&mut self, _scaler: f64) {}
368
fn set_mute(&mut self, _mute: bool) {}
369
}
370
371
/// `BufferCommit` is a cleanup funcion that must be called before dropping the buffer,
372
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
373
pub trait BufferCommit {
374
/// `write_playback_buffer` or `read_capture_buffer` would trigger this automatically. `nframes`
375
/// indicates the number of audio frames that were read or written to the device.
376
fn commit(&mut self, nframes: usize);
377
/// `latency_bytes` the current device latency.
378
/// For playback it means how many bytes need to be consumed
379
/// before the current playback buffer will be played.
380
/// For capture it means the latency in terms of bytes that the capture buffer was recorded.
381
fn latency_bytes(&self) -> u32 {
382
0
383
}
384
}
385
386
/// `AsyncBufferCommit` is a cleanup funcion that must be called before dropping the buffer,
387
/// allowing arbitrary code to be run after the buffer is filled or read by the user.
388
#[async_trait(?Send)]
389
pub trait AsyncBufferCommit {
390
/// `async_write_playback_buffer` or `async_read_capture_buffer` would trigger this
391
/// automatically. `nframes` indicates the number of audio frames that were read or written to
392
/// the device.
393
async fn commit(&mut self, nframes: usize);
394
/// `latency_bytes` the current device latency.
395
/// For playback it means how many bytes need to be consumed
396
/// before the current playback buffer will be played.
397
/// For capture it means the latency in terms of bytes that the capture buffer was recorded.
398
fn latency_bytes(&self) -> u32 {
399
0
400
}
401
}
402
403
/// Errors that are possible from a `PlaybackBuffer`.
404
#[sorted]
405
#[derive(Error, Debug)]
406
pub enum PlaybackBufferError {
407
#[error("Invalid buffer length")]
408
InvalidLength,
409
#[error("Slicing of playback buffer out of bounds")]
410
SliceOutOfBounds,
411
}
412
413
/// `AudioBuffer` is one buffer that holds buffer_size audio frames.
414
/// It is the inner data of `PlaybackBuffer` and `CaptureBuffer`.
415
struct AudioBuffer<'a> {
416
buffer: &'a mut [u8],
417
offset: usize, // Read or Write offset in frames.
418
frame_size: usize, // Size of a frame in bytes.
419
}
420
421
impl AudioBuffer<'_> {
422
/// Returns the number of audio frames that fit in the buffer.
423
pub fn frame_capacity(&self) -> usize {
424
self.buffer.len() / self.frame_size
425
}
426
427
fn calc_len(&self, size: usize) -> usize {
428
min(
429
size / self.frame_size * self.frame_size,
430
self.buffer.len() - self.offset,
431
)
432
}
433
434
/// Writes up to `size` bytes directly to this buffer inside of the given callback function.
435
pub fn write_copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
436
// only write complete frames.
437
let len = self.calc_len(size);
438
cb(&mut self.buffer[self.offset..(self.offset + len)]);
439
self.offset += len;
440
Ok(len)
441
}
442
443
/// Reads up to `size` bytes directly from this buffer inside of the given callback function.
444
pub fn read_copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
445
let len = self.calc_len(size);
446
cb(&self.buffer[self.offset..(self.offset + len)]);
447
self.offset += len;
448
Ok(len)
449
}
450
451
/// Copy data from an io::Reader
452
pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
453
let bytes = reader.read(&mut self.buffer[self.offset..])?;
454
self.offset += bytes;
455
Ok(bytes)
456
}
457
458
/// Copy data to an io::Write
459
pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
460
let bytes = writer.write(&self.buffer[self.offset..])?;
461
self.offset += bytes;
462
Ok(bytes)
463
}
464
}
465
466
impl Write for AudioBuffer<'_> {
467
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
468
let written = (&mut self.buffer[self.offset..]).write(&buf[..buf.len()])?;
469
self.offset += written;
470
Ok(written)
471
}
472
473
fn flush(&mut self) -> io::Result<()> {
474
Ok(())
475
}
476
}
477
478
impl Read for AudioBuffer<'_> {
479
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
480
let len = buf.len() / self.frame_size * self.frame_size;
481
let written = (&mut buf[..len]).write(&self.buffer[self.offset..])?;
482
self.offset += written;
483
Ok(written)
484
}
485
}
486
487
/// `PlaybackBuffer` is one buffer that holds buffer_size audio frames. It is used to temporarily
488
/// allow access to an audio buffer and notifes the owning stream of write completion when dropped.
489
pub struct PlaybackBuffer<'a> {
490
buffer: AudioBuffer<'a>,
491
drop: &'a mut dyn BufferCommit,
492
}
493
494
impl<'a> PlaybackBuffer<'a> {
495
/// Creates a new `PlaybackBuffer` that holds a reference to the backing memory specified in
496
/// `buffer`.
497
pub fn new<F>(
498
frame_size: usize,
499
buffer: &'a mut [u8],
500
drop: &'a mut F,
501
) -> Result<Self, PlaybackBufferError>
502
where
503
F: BufferCommit,
504
{
505
if buffer.len() % frame_size != 0 {
506
return Err(PlaybackBufferError::InvalidLength);
507
}
508
509
Ok(PlaybackBuffer {
510
buffer: AudioBuffer {
511
buffer,
512
offset: 0,
513
frame_size,
514
},
515
drop,
516
})
517
}
518
519
/// Returns the number of audio frames that fit in the buffer.
520
pub fn frame_capacity(&self) -> usize {
521
self.buffer.frame_capacity()
522
}
523
524
/// This triggers the commit of `BufferCommit`. This should be called after the data is copied
525
/// to the buffer.
526
pub fn commit(&mut self) {
527
self.drop
528
.commit(self.buffer.offset / self.buffer.frame_size);
529
}
530
531
/// It returns how many bytes need to be consumed
532
/// before the current playback buffer will be played.
533
pub fn latency_bytes(&self) -> u32 {
534
self.drop.latency_bytes()
535
}
536
537
/// Writes up to `size` bytes directly to this buffer inside of the given callback function.
538
pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
539
self.buffer.write_copy_cb(size, cb)
540
}
541
}
542
543
impl Write for PlaybackBuffer<'_> {
544
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
545
self.buffer.write(buf)
546
}
547
548
fn flush(&mut self) -> io::Result<()> {
549
self.buffer.flush()
550
}
551
}
552
553
/// `AsyncPlaybackBuffer` is the async version of `PlaybackBuffer`.
554
pub struct AsyncPlaybackBuffer<'a> {
555
buffer: AudioBuffer<'a>,
556
trigger: &'a mut dyn AsyncBufferCommit,
557
}
558
559
impl<'a> AsyncPlaybackBuffer<'a> {
560
/// Creates a new `AsyncPlaybackBuffer` that holds a reference to the backing memory specified
561
/// in `buffer`.
562
pub fn new<F>(
563
frame_size: usize,
564
buffer: &'a mut [u8],
565
trigger: &'a mut F,
566
) -> Result<Self, PlaybackBufferError>
567
where
568
F: AsyncBufferCommit,
569
{
570
if buffer.len() % frame_size != 0 {
571
return Err(PlaybackBufferError::InvalidLength);
572
}
573
574
Ok(AsyncPlaybackBuffer {
575
buffer: AudioBuffer {
576
buffer,
577
offset: 0,
578
frame_size,
579
},
580
trigger,
581
})
582
}
583
584
/// Returns the number of audio frames that fit in the buffer.
585
pub fn frame_capacity(&self) -> usize {
586
self.buffer.frame_capacity()
587
}
588
589
/// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
590
/// copied to the buffer.
591
pub async fn commit(&mut self) {
592
self.trigger
593
.commit(self.buffer.offset / self.buffer.frame_size)
594
.await;
595
}
596
597
/// It returns the latency in terms of bytes that the capture buffer was recorded.
598
pub fn latency_bytes(&self) -> u32 {
599
self.trigger.latency_bytes()
600
}
601
602
/// Writes up to `size` bytes directly to this buffer inside of the given callback function.
603
pub fn copy_cb<F: FnOnce(&mut [u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
604
self.buffer.write_copy_cb(size, cb)
605
}
606
607
/// Copy data from an io::Reader
608
pub fn copy_from(&mut self, reader: &mut dyn Read) -> io::Result<usize> {
609
self.buffer.copy_from(reader)
610
}
611
}
612
613
impl Write for AsyncPlaybackBuffer<'_> {
614
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
615
self.buffer.write(buf)
616
}
617
618
fn flush(&mut self) -> io::Result<()> {
619
self.buffer.flush()
620
}
621
}
622
/// Stream that accepts playback samples but drops them.
623
pub struct NoopStream {
624
buffer: Vec<u8>,
625
frame_size: usize,
626
interval: Duration,
627
next_frame: Duration,
628
start_time: Option<Instant>,
629
buffer_drop: NoopBufferCommit,
630
}
631
632
/// NoopStream data that is needed from the buffer complete callback.
633
struct NoopBufferCommit {
634
which_buffer: bool,
635
}
636
637
impl BufferCommit for NoopBufferCommit {
638
fn commit(&mut self, _nwritten: usize) {
639
// When a buffer completes, switch to the other one.
640
self.which_buffer ^= true;
641
}
642
}
643
644
#[async_trait(?Send)]
645
impl AsyncBufferCommit for NoopBufferCommit {
646
async fn commit(&mut self, _nwritten: usize) {
647
// When a buffer completes, switch to the other one.
648
self.which_buffer ^= true;
649
}
650
}
651
652
impl NoopStream {
653
pub fn new(
654
num_channels: usize,
655
format: SampleFormat,
656
frame_rate: u32,
657
buffer_size: usize,
658
) -> Self {
659
let frame_size = format.sample_bytes() * num_channels;
660
let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
661
NoopStream {
662
buffer: vec![0; buffer_size * frame_size],
663
frame_size,
664
interval,
665
next_frame: interval,
666
start_time: None,
667
buffer_drop: NoopBufferCommit {
668
which_buffer: false,
669
},
670
}
671
}
672
}
673
674
impl PlaybackBufferStream for NoopStream {
675
fn next_playback_buffer<'b, 's: 'b>(&'s mut self) -> Result<PlaybackBuffer<'b>, BoxError> {
676
if let Some(start_time) = self.start_time {
677
let elapsed = start_time.elapsed();
678
if elapsed < self.next_frame {
679
std::thread::sleep(self.next_frame - elapsed);
680
}
681
self.next_frame += self.interval;
682
} else {
683
self.start_time = Some(Instant::now());
684
self.next_frame = self.interval;
685
}
686
Ok(PlaybackBuffer::new(
687
self.frame_size,
688
&mut self.buffer,
689
&mut self.buffer_drop,
690
)?)
691
}
692
}
693
694
#[async_trait(?Send)]
695
impl AsyncPlaybackBufferStream for NoopStream {
696
async fn next_playback_buffer<'a>(
697
&'a mut self,
698
ex: &dyn AudioStreamsExecutor,
699
) -> Result<AsyncPlaybackBuffer<'a>, BoxError> {
700
if let Some(start_time) = self.start_time {
701
let elapsed = start_time.elapsed();
702
if elapsed < self.next_frame {
703
ex.delay(self.next_frame - elapsed).await?;
704
}
705
self.next_frame += self.interval;
706
} else {
707
self.start_time = Some(Instant::now());
708
self.next_frame = self.interval;
709
}
710
Ok(AsyncPlaybackBuffer::new(
711
self.frame_size,
712
&mut self.buffer,
713
&mut self.buffer_drop,
714
)?)
715
}
716
}
717
718
/// No-op control for `NoopStream`s.
719
#[derive(Default)]
720
pub struct NoopStreamControl;
721
722
impl NoopStreamControl {
723
pub fn new() -> Self {
724
NoopStreamControl {}
725
}
726
}
727
728
impl StreamControl for NoopStreamControl {}
729
730
/// Source of `NoopStream` and `NoopStreamControl` objects.
731
#[derive(Default)]
732
pub struct NoopStreamSource;
733
734
impl NoopStreamSource {
735
pub fn new() -> Self {
736
NoopStreamSource {}
737
}
738
}
739
740
impl StreamSource for NoopStreamSource {
741
#[allow(clippy::type_complexity)]
742
fn new_playback_stream(
743
&mut self,
744
num_channels: usize,
745
format: SampleFormat,
746
frame_rate: u32,
747
buffer_size: usize,
748
) -> Result<(Box<dyn StreamControl>, Box<dyn PlaybackBufferStream>), BoxError> {
749
Ok((
750
Box::new(NoopStreamControl::new()),
751
Box::new(NoopStream::new(
752
num_channels,
753
format,
754
frame_rate,
755
buffer_size,
756
)),
757
))
758
}
759
760
#[allow(clippy::type_complexity)]
761
fn new_async_playback_stream(
762
&mut self,
763
num_channels: usize,
764
format: SampleFormat,
765
frame_rate: u32,
766
buffer_size: usize,
767
_ex: &dyn AudioStreamsExecutor,
768
) -> Result<(Box<dyn StreamControl>, Box<dyn AsyncPlaybackBufferStream>), BoxError> {
769
Ok((
770
Box::new(NoopStreamControl::new()),
771
Box::new(NoopStream::new(
772
num_channels,
773
format,
774
frame_rate,
775
buffer_size,
776
)),
777
))
778
}
779
}
780
781
/// `NoopStreamSourceGenerator` is a struct that implements [`StreamSourceGenerator`]
782
/// to generate [`NoopStreamSource`].
783
pub struct NoopStreamSourceGenerator;
784
785
impl NoopStreamSourceGenerator {
786
pub fn new() -> Self {
787
NoopStreamSourceGenerator {}
788
}
789
}
790
791
impl Default for NoopStreamSourceGenerator {
792
fn default() -> Self {
793
Self::new()
794
}
795
}
796
797
impl StreamSourceGenerator for NoopStreamSourceGenerator {
798
fn generate(&self) -> Result<Box<dyn StreamSource>, BoxError> {
799
Ok(Box::new(NoopStreamSource))
800
}
801
}
802
803
#[cfg(test)]
804
mod tests {
805
use futures::FutureExt;
806
use io::Write;
807
808
use super::async_api::test::TestExecutor;
809
use super::*;
810
811
#[test]
812
fn invalid_buffer_length() {
813
// Playback buffers can't be created with a size that isn't divisible by the frame size.
814
let mut pb_buf = [0xa5u8; 480 * 2 * 2 + 1];
815
let mut buffer_drop = NoopBufferCommit {
816
which_buffer: false,
817
};
818
assert!(PlaybackBuffer::new(2, &mut pb_buf, &mut buffer_drop).is_err());
819
}
820
821
#[test]
822
fn audio_buffer_copy_from() {
823
const PERIOD_SIZE: usize = 8192;
824
const NUM_CHANNELS: usize = 6;
825
const FRAME_SIZE: usize = NUM_CHANNELS * 2;
826
let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
827
let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
828
let mut aud_buf = AudioBuffer {
829
buffer: &mut dst_buf,
830
offset: 0,
831
frame_size: FRAME_SIZE,
832
};
833
aud_buf
834
.copy_from(&mut &src_buf[..])
835
.expect("all data should be copied.");
836
assert_eq!(dst_buf, src_buf);
837
}
838
839
#[test]
840
fn audio_buffer_copy_from_repeat() {
841
const PERIOD_SIZE: usize = 8192;
842
const NUM_CHANNELS: usize = 6;
843
const FRAME_SIZE: usize = NUM_CHANNELS * 2;
844
let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
845
let mut aud_buf = AudioBuffer {
846
buffer: &mut dst_buf,
847
offset: 0,
848
frame_size: FRAME_SIZE,
849
};
850
let bytes = aud_buf
851
.copy_from(&mut io::repeat(1))
852
.expect("all data should be copied.");
853
assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
854
assert_eq!(dst_buf, [1u8; PERIOD_SIZE * FRAME_SIZE]);
855
}
856
857
#[test]
858
fn audio_buffer_copy_to() {
859
const PERIOD_SIZE: usize = 8192;
860
const NUM_CHANNELS: usize = 6;
861
const FRAME_SIZE: usize = NUM_CHANNELS * 2;
862
let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
863
let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
864
let mut aud_buf = AudioBuffer {
865
buffer: &mut src_buf,
866
offset: 0,
867
frame_size: FRAME_SIZE,
868
};
869
aud_buf
870
.copy_to(&mut &mut dst_buf[..])
871
.expect("all data should be copied.");
872
assert_eq!(dst_buf, src_buf);
873
}
874
875
#[test]
876
fn audio_buffer_copy_to_sink() {
877
const PERIOD_SIZE: usize = 8192;
878
const NUM_CHANNELS: usize = 6;
879
const FRAME_SIZE: usize = NUM_CHANNELS * 2;
880
let mut src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
881
let mut aud_buf = AudioBuffer {
882
buffer: &mut src_buf,
883
offset: 0,
884
frame_size: FRAME_SIZE,
885
};
886
let bytes = aud_buf
887
.copy_to(&mut io::sink())
888
.expect("all data should be copied.");
889
assert_eq!(bytes, PERIOD_SIZE * FRAME_SIZE);
890
}
891
892
#[test]
893
fn io_copy_audio_buffer() {
894
const PERIOD_SIZE: usize = 8192;
895
const NUM_CHANNELS: usize = 6;
896
const FRAME_SIZE: usize = NUM_CHANNELS * 2;
897
let mut dst_buf = [0u8; PERIOD_SIZE * FRAME_SIZE];
898
let src_buf = [0xa5u8; PERIOD_SIZE * FRAME_SIZE];
899
let mut aud_buf = AudioBuffer {
900
buffer: &mut dst_buf,
901
offset: 0,
902
frame_size: FRAME_SIZE,
903
};
904
io::copy(&mut &src_buf[..], &mut aud_buf).expect("all data should be copied.");
905
assert_eq!(dst_buf, src_buf);
906
}
907
908
#[test]
909
fn commit() {
910
struct TestCommit {
911
frame_count: usize,
912
}
913
impl BufferCommit for TestCommit {
914
fn commit(&mut self, nwritten: usize) {
915
self.frame_count += nwritten;
916
}
917
}
918
let mut test_commit = TestCommit { frame_count: 0 };
919
{
920
const FRAME_SIZE: usize = 4;
921
let mut buf = [0u8; 480 * FRAME_SIZE];
922
let mut pb_buf = PlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
923
pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
924
pb_buf.commit();
925
}
926
assert_eq!(test_commit.frame_count, 480);
927
}
928
929
#[test]
930
fn sixteen_bit_stereo() {
931
let mut server = NoopStreamSource::new();
932
let (_, mut stream) = server
933
.new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
934
.unwrap();
935
let mut copy_cb = |buf: &mut PlaybackBuffer| {
936
assert_eq!(buf.buffer.frame_capacity(), 480);
937
let pb_buf = [0xa5u8; 480 * 2 * 2];
938
assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
939
Ok(())
940
};
941
stream.write_playback_buffer(&mut copy_cb).unwrap();
942
}
943
944
#[test]
945
fn consumption_rate() {
946
let mut server = NoopStreamSource::new();
947
let (_, mut stream) = server
948
.new_playback_stream(2, SampleFormat::S16LE, 48000, 480)
949
.unwrap();
950
let start = Instant::now();
951
{
952
let mut copy_cb = |buf: &mut PlaybackBuffer| {
953
let pb_buf = [0xa5u8; 480 * 2 * 2];
954
assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
955
Ok(())
956
};
957
stream.write_playback_buffer(&mut copy_cb).unwrap();
958
}
959
// The second call should block until the first buffer is consumed.
960
let mut assert_cb = |_: &mut PlaybackBuffer| {
961
let elapsed = start.elapsed();
962
assert!(
963
elapsed > Duration::from_millis(10),
964
"next_playback_buffer didn't block long enough {}",
965
elapsed.subsec_millis()
966
);
967
Ok(())
968
};
969
stream.write_playback_buffer(&mut assert_cb).unwrap();
970
}
971
972
#[test]
973
fn async_commit() {
974
struct TestCommit {
975
frame_count: usize,
976
}
977
#[async_trait(?Send)]
978
impl AsyncBufferCommit for TestCommit {
979
async fn commit(&mut self, nwritten: usize) {
980
self.frame_count += nwritten;
981
}
982
}
983
async fn this_test() {
984
let mut test_commit = TestCommit { frame_count: 0 };
985
{
986
const FRAME_SIZE: usize = 4;
987
let mut buf = [0u8; 480 * FRAME_SIZE];
988
let mut pb_buf =
989
AsyncPlaybackBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
990
pb_buf.write_all(&[0xa5u8; 480 * FRAME_SIZE]).unwrap();
991
pb_buf.commit().await;
992
}
993
assert_eq!(test_commit.frame_count, 480);
994
}
995
996
this_test().now_or_never();
997
}
998
999
#[test]
1000
fn consumption_rate_async() {
1001
async fn this_test(ex: &TestExecutor) {
1002
let mut server = NoopStreamSource::new();
1003
let (_, mut stream) = server
1004
.new_async_playback_stream(2, SampleFormat::S16LE, 48000, 480, ex)
1005
.unwrap();
1006
let start = Instant::now();
1007
{
1008
let copy_func = |buf: &mut AsyncPlaybackBuffer| {
1009
let pb_buf = [0xa5u8; 480 * 2 * 2];
1010
assert_eq!(buf.write(&pb_buf).unwrap(), 480 * 2 * 2);
1011
Ok(())
1012
};
1013
async_write_playback_buffer(&mut *stream, copy_func, ex)
1014
.await
1015
.unwrap();
1016
}
1017
// The second call should block until the first buffer is consumed.
1018
let assert_func = |_: &mut AsyncPlaybackBuffer| {
1019
let elapsed = start.elapsed();
1020
assert!(
1021
elapsed > Duration::from_millis(10),
1022
"write_playback_buffer didn't block long enough {}",
1023
elapsed.subsec_millis()
1024
);
1025
Ok(())
1026
};
1027
1028
async_write_playback_buffer(&mut *stream, assert_func, ex)
1029
.await
1030
.unwrap();
1031
}
1032
1033
let ex = TestExecutor {};
1034
this_test(&ex).now_or_never();
1035
}
1036
1037
#[test]
1038
fn generate_noop_stream_source() {
1039
let generator: Box<dyn StreamSourceGenerator> = Box::new(NoopStreamSourceGenerator::new());
1040
generator
1041
.generate()
1042
.expect("failed to generate stream source");
1043
}
1044
}
1045
1046