Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/common/audio_streams/src/capture.rs
5394 views
1
// Copyright 2019 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
//! ```
6
//! use audio_streams::{BoxError, capture::CaptureBuffer, SampleFormat, StreamSource,
7
//! NoopStreamSource};
8
//! use std::io::Read;
9
//!
10
//! const buffer_size: usize = 120;
11
//! const num_channels: usize = 2;
12
//!
13
//! # fn main() -> std::result::Result<(),BoxError> {
14
//! let mut stream_source = NoopStreamSource::new();
15
//! let sample_format = SampleFormat::S16LE;
16
//! let frame_size = num_channels * sample_format.sample_bytes();
17
//!
18
//! let (_, mut stream) = stream_source
19
//! .new_capture_stream(num_channels, sample_format, 48000, buffer_size, &[])?;
20
//! // Capture 10 buffers of zeros.
21
//! let mut buf = Vec::new();
22
//! buf.resize(buffer_size * frame_size, 0xa5u8);
23
//! for _ in 0..10 {
24
//! let mut copy_func = |stream_buffer: &mut CaptureBuffer| {
25
//! assert_eq!(stream_buffer.read(&mut buf)?, buffer_size * frame_size);
26
//! Ok(())
27
//! };
28
//! stream.read_capture_buffer(&mut copy_func)?;
29
//! }
30
//! # Ok (())
31
//! # }
32
//! ```
33
34
use std::io;
35
use std::io::Read;
36
use std::io::Write;
37
use std::time::Duration;
38
use std::time::Instant;
39
40
use async_trait::async_trait;
41
use remain::sorted;
42
use thiserror::Error;
43
44
use super::async_api::AudioStreamsExecutor;
45
use super::AsyncBufferCommit;
46
use super::AudioBuffer;
47
use super::BoxError;
48
use super::BufferCommit;
49
use super::NoopBufferCommit;
50
use super::SampleFormat;
51
52
/// `CaptureBufferStream` provides `CaptureBuffer`s to read with audio samples from capture.
53
pub trait CaptureBufferStream: Send {
54
fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError>;
55
56
/// Call `f` with a `CaptureBuffer`, and trigger the buffer done call back after. `f` can read
57
/// the capture data from the given `CaptureBuffer`.
58
fn read_capture_buffer<'b, 's: 'b>(
59
&'s mut self,
60
f: &mut dyn FnMut(&mut CaptureBuffer<'b>) -> Result<(), BoxError>,
61
) -> Result<(), BoxError> {
62
let mut buf = self.next_capture_buffer()?;
63
f(&mut buf)?;
64
buf.commit();
65
Ok(())
66
}
67
}
68
69
impl<S: CaptureBufferStream + ?Sized> CaptureBufferStream for &mut S {
70
fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
71
(**self).next_capture_buffer()
72
}
73
}
74
75
#[async_trait(?Send)]
76
pub trait AsyncCaptureBufferStream: Send {
77
async fn next_capture_buffer<'a>(
78
&'a mut self,
79
_ex: &dyn AudioStreamsExecutor,
80
) -> Result<AsyncCaptureBuffer<'a>, BoxError>;
81
}
82
83
#[async_trait(?Send)]
84
impl<S: AsyncCaptureBufferStream + ?Sized> AsyncCaptureBufferStream for &mut S {
85
async fn next_capture_buffer<'a>(
86
&'a mut self,
87
ex: &dyn AudioStreamsExecutor,
88
) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
89
(**self).next_capture_buffer(ex).await
90
}
91
}
92
93
/// `CaptureBuffer` contains a block of audio samples got from capture stream. It provides
94
/// temporary view to those samples and will notifies capture stream when dropped.
95
/// Note that it'll always send `buffer.len() / frame_size` to drop function when it got destroyed
96
/// since `CaptureBufferStream` assumes that users get all the samples from the buffer.
97
pub struct CaptureBuffer<'a> {
98
buffer: AudioBuffer<'a>,
99
drop: &'a mut dyn BufferCommit,
100
}
101
102
/// Async version of 'CaptureBuffer`
103
pub struct AsyncCaptureBuffer<'a> {
104
buffer: AudioBuffer<'a>,
105
trigger: &'a mut dyn AsyncBufferCommit,
106
}
107
108
/// Errors that are possible from a `CaptureBuffer`.
109
#[sorted]
110
#[derive(Error, Debug)]
111
pub enum CaptureBufferError {
112
#[error("Invalid buffer length")]
113
InvalidLength,
114
}
115
116
impl<'a> CaptureBuffer<'a> {
117
/// Creates a new `CaptureBuffer` that holds a reference to the backing memory specified in
118
/// `buffer`.
119
pub fn new<F>(
120
frame_size: usize,
121
buffer: &'a mut [u8],
122
drop: &'a mut F,
123
) -> Result<Self, CaptureBufferError>
124
where
125
F: BufferCommit,
126
{
127
if buffer.len() % frame_size != 0 {
128
return Err(CaptureBufferError::InvalidLength);
129
}
130
131
Ok(CaptureBuffer {
132
buffer: AudioBuffer {
133
buffer,
134
frame_size,
135
offset: 0,
136
},
137
drop,
138
})
139
}
140
141
/// Returns the number of audio frames that fit in the buffer.
142
pub fn frame_capacity(&self) -> usize {
143
self.buffer.frame_capacity()
144
}
145
146
/// This triggers the callback of `BufferCommit`. This should be called after the data is read
147
/// from the buffer.
148
///
149
/// Always sends `frame_capacity`.
150
pub fn commit(&mut self) {
151
self.drop.commit(self.frame_capacity());
152
}
153
154
pub fn latency_bytes(&self) -> u32 {
155
self.drop.latency_bytes()
156
}
157
158
/// Reads up to `size` bytes directly from this buffer inside of the given callback function.
159
pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
160
self.buffer.read_copy_cb(size, cb)
161
}
162
}
163
164
impl Read for CaptureBuffer<'_> {
165
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
166
self.buffer.read(buf)
167
}
168
}
169
170
impl<'a> AsyncCaptureBuffer<'a> {
171
/// Creates a new `AsyncCaptureBuffer` that holds a reference to the backing memory specified in
172
/// `buffer`.
173
pub fn new<F>(
174
frame_size: usize,
175
buffer: &'a mut [u8],
176
trigger: &'a mut F,
177
) -> Result<Self, CaptureBufferError>
178
where
179
F: AsyncBufferCommit,
180
{
181
if buffer.len() % frame_size != 0 {
182
return Err(CaptureBufferError::InvalidLength);
183
}
184
185
Ok(AsyncCaptureBuffer {
186
buffer: AudioBuffer {
187
buffer,
188
frame_size,
189
offset: 0,
190
},
191
trigger,
192
})
193
}
194
195
/// Returns the number of audio frames that fit in the buffer.
196
pub fn frame_capacity(&self) -> usize {
197
self.buffer.frame_capacity()
198
}
199
200
/// This triggers the callback of `AsyncBufferCommit`. This should be called after the data is
201
/// read from the buffer.
202
///
203
/// Always sends `frame_capacity`.
204
pub async fn commit(&mut self) {
205
self.trigger.commit(self.frame_capacity()).await;
206
}
207
208
pub fn latency_bytes(&self) -> u32 {
209
self.trigger.latency_bytes()
210
}
211
212
/// Reads up to `size` bytes directly from this buffer inside of the given callback function.
213
pub fn copy_cb<F: FnOnce(&[u8])>(&mut self, size: usize, cb: F) -> io::Result<usize> {
214
self.buffer.read_copy_cb(size, cb)
215
}
216
217
/// Copy data to an io::Write
218
pub fn copy_to(&mut self, writer: &mut dyn Write) -> io::Result<usize> {
219
self.buffer.copy_to(writer)
220
}
221
}
222
223
impl Read for AsyncCaptureBuffer<'_> {
224
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
225
self.buffer.read(buf)
226
}
227
}
228
229
/// Stream that provides null capture samples.
230
pub struct NoopCaptureStream {
231
buffer: Vec<u8>,
232
frame_size: usize,
233
interval: Duration,
234
next_frame: Duration,
235
start_time: Option<Instant>,
236
buffer_drop: NoopBufferCommit,
237
}
238
239
impl NoopCaptureStream {
240
pub fn new(
241
num_channels: usize,
242
format: SampleFormat,
243
frame_rate: u32,
244
buffer_size: usize,
245
) -> Self {
246
let frame_size = format.sample_bytes() * num_channels;
247
let interval = Duration::from_millis(buffer_size as u64 * 1000 / frame_rate as u64);
248
NoopCaptureStream {
249
buffer: vec![0; buffer_size * frame_size],
250
frame_size,
251
interval,
252
next_frame: interval,
253
start_time: None,
254
buffer_drop: NoopBufferCommit {
255
which_buffer: false,
256
},
257
}
258
}
259
}
260
261
impl CaptureBufferStream for NoopCaptureStream {
262
fn next_capture_buffer<'b, 's: 'b>(&'s mut self) -> Result<CaptureBuffer<'b>, BoxError> {
263
if let Some(start_time) = self.start_time {
264
let elapsed = start_time.elapsed();
265
if elapsed < self.next_frame {
266
std::thread::sleep(self.next_frame - elapsed);
267
}
268
self.next_frame += self.interval;
269
} else {
270
self.start_time = Some(Instant::now());
271
self.next_frame = self.interval;
272
}
273
Ok(CaptureBuffer::new(
274
self.frame_size,
275
&mut self.buffer,
276
&mut self.buffer_drop,
277
)?)
278
}
279
}
280
281
#[async_trait(?Send)]
282
impl AsyncCaptureBufferStream for NoopCaptureStream {
283
async fn next_capture_buffer<'a>(
284
&'a mut self,
285
ex: &dyn AudioStreamsExecutor,
286
) -> Result<AsyncCaptureBuffer<'a>, BoxError> {
287
if let Some(start_time) = self.start_time {
288
let elapsed = start_time.elapsed();
289
if elapsed < self.next_frame {
290
ex.delay(self.next_frame - elapsed).await?;
291
}
292
self.next_frame += self.interval;
293
} else {
294
self.start_time = Some(Instant::now());
295
self.next_frame = self.interval;
296
}
297
Ok(AsyncCaptureBuffer::new(
298
self.frame_size,
299
&mut self.buffer,
300
&mut self.buffer_drop,
301
)?)
302
}
303
}
304
305
/// Call `f` with a `AsyncCaptureBuffer`, and trigger the buffer done call back after. `f` can read
306
/// the capture data from the given `AsyncCaptureBuffer`.
307
///
308
/// This cannot be a trait method because trait methods with generic parameters are not object safe.
309
pub async fn async_read_capture_buffer<F>(
310
stream: &mut dyn AsyncCaptureBufferStream,
311
f: F,
312
ex: &dyn AudioStreamsExecutor,
313
) -> Result<(), BoxError>
314
where
315
F: FnOnce(&mut AsyncCaptureBuffer) -> Result<(), BoxError>,
316
{
317
let mut buf = stream.next_capture_buffer(ex).await?;
318
f(&mut buf)?;
319
buf.commit().await;
320
Ok(())
321
}
322
323
#[cfg(test)]
324
mod tests {
325
use futures::FutureExt;
326
327
use super::super::async_api::test::TestExecutor;
328
use super::super::*;
329
use super::*;
330
331
#[test]
332
fn invalid_buffer_length() {
333
// Capture buffers can't be created with a size that isn't divisible by the frame size.
334
let mut cp_buf = [0xa5u8; 480 * 2 * 2 + 1];
335
let mut buffer_drop = NoopBufferCommit {
336
which_buffer: false,
337
};
338
assert!(CaptureBuffer::new(2, &mut cp_buf, &mut buffer_drop).is_err());
339
}
340
341
#[test]
342
fn commit() {
343
struct TestCommit {
344
frame_count: usize,
345
}
346
impl BufferCommit for TestCommit {
347
fn commit(&mut self, nwritten: usize) {
348
self.frame_count += nwritten;
349
}
350
}
351
let mut test_commit = TestCommit { frame_count: 0 };
352
{
353
const FRAME_SIZE: usize = 4;
354
let mut buf = [0u8; 480 * FRAME_SIZE];
355
let mut cp_buf = CaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
356
let mut local_buf = [0u8; 240 * FRAME_SIZE];
357
assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
358
cp_buf.commit();
359
}
360
// This should be 480 no matter how many samples are read.
361
assert_eq!(test_commit.frame_count, 480);
362
}
363
364
#[test]
365
fn sixteen_bit_stereo() {
366
let mut server = NoopStreamSource::new();
367
let (_, mut stream) = server
368
.new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
369
.unwrap();
370
let mut copy_func = |b: &mut CaptureBuffer| {
371
assert_eq!(b.buffer.frame_capacity(), 480);
372
let mut pb_buf = [0xa5u8; 480 * 2 * 2];
373
assert_eq!(b.read(&mut pb_buf).unwrap(), 480 * 2 * 2);
374
Ok(())
375
};
376
stream.read_capture_buffer(&mut copy_func).unwrap();
377
}
378
379
#[test]
380
fn consumption_rate() {
381
let mut server = NoopStreamSource::new();
382
let (_, mut stream) = server
383
.new_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[])
384
.unwrap();
385
let start = Instant::now();
386
{
387
let mut copy_func = |b: &mut CaptureBuffer| {
388
let mut cp_buf = [0xa5u8; 480 * 2 * 2];
389
assert_eq!(b.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
390
for buf in cp_buf.iter() {
391
assert_eq!(*buf, 0, "Read samples should all be zeros.");
392
}
393
Ok(())
394
};
395
stream.read_capture_buffer(&mut copy_func).unwrap();
396
}
397
// The second call should block until the first buffer is consumed.
398
let mut assert_func = |_: &mut CaptureBuffer| {
399
let elapsed = start.elapsed();
400
assert!(
401
elapsed > Duration::from_millis(10),
402
"next_capture_buffer didn't block long enough {}",
403
elapsed.subsec_millis()
404
);
405
Ok(())
406
};
407
stream.read_capture_buffer(&mut assert_func).unwrap();
408
}
409
410
#[test]
411
fn async_commit() {
412
struct TestCommit {
413
frame_count: usize,
414
}
415
#[async_trait(?Send)]
416
impl AsyncBufferCommit for TestCommit {
417
async fn commit(&mut self, nwritten: usize) {
418
self.frame_count += nwritten;
419
}
420
}
421
async fn this_test() {
422
let mut test_commit = TestCommit { frame_count: 0 };
423
{
424
const FRAME_SIZE: usize = 4;
425
let mut buf = [0u8; 480 * FRAME_SIZE];
426
let mut cp_buf =
427
AsyncCaptureBuffer::new(FRAME_SIZE, &mut buf, &mut test_commit).unwrap();
428
let mut local_buf = [0u8; 240 * FRAME_SIZE];
429
assert_eq!(cp_buf.read(&mut local_buf).unwrap(), 240 * FRAME_SIZE);
430
cp_buf.commit().await;
431
}
432
// This should be 480 no matter how many samples are read.
433
assert_eq!(test_commit.frame_count, 480);
434
}
435
436
this_test().now_or_never();
437
}
438
439
#[test]
440
fn consumption_rate_async() {
441
async fn this_test(ex: &TestExecutor) {
442
let mut server = NoopStreamSource::new();
443
let (_, mut stream) = server
444
.new_async_capture_stream(2, SampleFormat::S16LE, 48000, 480, &[], ex)
445
.unwrap();
446
let start = Instant::now();
447
{
448
let copy_func = |buf: &mut AsyncCaptureBuffer| {
449
let mut cp_buf = [0xa5u8; 480 * 2 * 2];
450
assert_eq!(buf.read(&mut cp_buf).unwrap(), 480 * 2 * 2);
451
for buf in cp_buf.iter() {
452
assert_eq!(*buf, 0, "Read samples should all be zeros.");
453
}
454
Ok(())
455
};
456
async_read_capture_buffer(&mut *stream, copy_func, ex)
457
.await
458
.unwrap();
459
}
460
// The second call should block until the first buffer is consumed.
461
let assert_func = |_: &mut AsyncCaptureBuffer| {
462
let elapsed = start.elapsed();
463
assert!(
464
elapsed > Duration::from_millis(10),
465
"write_playback_buffer didn't block long enough {}",
466
elapsed.subsec_millis()
467
);
468
Ok(())
469
};
470
async_read_capture_buffer(&mut *stream, assert_func, ex)
471
.await
472
.unwrap();
473
}
474
475
let ex = TestExecutor {};
476
this_test(&ex).now_or_never();
477
}
478
}
479
480