Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/devices/src/virtio/snd/vios_backend/streams.rs
5394 views
1
// Copyright 2021 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::collections::VecDeque;
6
use std::sync::mpsc::channel;
7
use std::sync::mpsc::Receiver;
8
use std::sync::mpsc::Sender;
9
use std::sync::Arc;
10
use std::thread;
11
use std::time::Duration;
12
use std::time::Instant;
13
14
use base::error;
15
use base::set_rt_prio_limit;
16
use base::set_rt_round_robin;
17
use base::warn;
18
use data_model::Le32;
19
use serde::Deserialize;
20
use serde::Serialize;
21
use sync::Mutex;
22
23
use super::Error as VioSError;
24
use super::Result;
25
use super::SoundError;
26
use super::*;
27
use crate::virtio::snd::common::from_virtio_frame_rate;
28
use crate::virtio::snd::constants::*;
29
use crate::virtio::snd::layout::*;
30
use crate::virtio::DescriptorChain;
31
use crate::virtio::Queue;
32
33
/// Messages that the worker can send to the stream (thread).
34
pub enum StreamMsg {
35
SetParams(DescriptorChain, virtio_snd_pcm_set_params),
36
Prepare(DescriptorChain),
37
Start(DescriptorChain),
38
Stop(DescriptorChain),
39
Release(DescriptorChain),
40
Buffer(DescriptorChain),
41
Break,
42
}
43
44
#[derive(Clone, Serialize, Deserialize)]
45
pub enum StreamState {
46
New,
47
ParamsSet,
48
Prepared,
49
Started,
50
Stopped,
51
Released,
52
}
53
54
pub struct Stream {
55
stream_id: u32,
56
receiver: Receiver<Box<StreamMsg>>,
57
vios_client: Arc<Mutex<VioSClient>>,
58
control_queue: Arc<Mutex<Queue>>,
59
io_queue: Arc<Mutex<Queue>>,
60
capture: bool,
61
current_state: StreamState,
62
period: Duration,
63
start_time: Instant,
64
next_buffer: Duration,
65
buffer_queue: VecDeque<DescriptorChain>,
66
}
67
68
#[derive(Clone, Serialize, Deserialize)]
69
pub struct StreamSnapshot {
70
pub current_state: StreamState,
71
pub period: Duration,
72
pub next_buffer: Duration,
73
}
74
75
impl Stream {
76
/// Start a new stream thread and return its handler.
77
pub fn try_new(
78
stream_id: u32,
79
vios_client: Arc<Mutex<VioSClient>>,
80
control_queue: Arc<Mutex<Queue>>,
81
io_queue: Arc<Mutex<Queue>>,
82
capture: bool,
83
stream_state: Option<StreamSnapshot>,
84
) -> Result<StreamProxy> {
85
let (sender, receiver): (Sender<Box<StreamMsg>>, Receiver<Box<StreamMsg>>) = channel();
86
let thread = thread::Builder::new()
87
.name(format!("v_snd_stream:{stream_id}"))
88
.spawn(move || {
89
try_set_real_time_priority();
90
let (current_state, period, next_buffer) =
91
if let Some(stream_state) = stream_state.clone() {
92
(
93
stream_state.current_state,
94
stream_state.period,
95
stream_state.next_buffer,
96
)
97
} else {
98
(
99
StreamState::New,
100
Duration::from_millis(0),
101
Duration::from_millis(0),
102
)
103
};
104
105
let mut stream = Stream {
106
stream_id,
107
receiver,
108
vios_client: vios_client.clone(),
109
control_queue,
110
io_queue,
111
capture,
112
current_state,
113
period,
114
start_time: Instant::now(),
115
next_buffer,
116
buffer_queue: VecDeque::new(),
117
};
118
119
if let Some(stream_state) = stream_state {
120
if let Err(e) = vios_client
121
.lock()
122
.restore_stream(stream_id, stream_state.current_state)
123
{
124
error!("failed to restore stream params: {}", e);
125
};
126
}
127
if let Err(e) = stream.stream_loop() {
128
error!("virtio-snd: Error in stream {}: {}", stream_id, e);
129
}
130
let state = stream.current_state.clone();
131
StreamSnapshot {
132
current_state: state,
133
period: stream.period,
134
next_buffer: stream.next_buffer,
135
}
136
})
137
.map_err(SoundError::CreateThread)?;
138
Ok(StreamProxy {
139
sender,
140
thread: Some(thread),
141
})
142
}
143
144
fn stream_loop(&mut self) -> Result<()> {
145
loop {
146
if !self.recv_msg()? {
147
break;
148
}
149
self.maybe_process_queued_buffers()?;
150
}
151
Ok(())
152
}
153
154
fn recv_msg(&mut self) -> Result<bool> {
155
let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;
156
let (code, desc, next_state) = match *msg {
157
StreamMsg::SetParams(desc, params) => {
158
let code = match self.vios_client.lock().set_stream_parameters_raw(params) {
159
Ok(()) => {
160
let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;
161
self.period = Duration::from_nanos(
162
(params.period_bytes.to_native() as u64 * 1_000_000_000u64)
163
/ frame_rate
164
/ params.channels as u64
165
/ bytes_per_sample(params.format) as u64,
166
);
167
VIRTIO_SND_S_OK
168
}
169
Err(e) => {
170
error!(
171
"virtio-snd: Error setting parameters for stream {}: {}",
172
self.stream_id, e
173
);
174
vios_error_to_status_code(e)
175
}
176
};
177
(code, desc, StreamState::ParamsSet)
178
}
179
StreamMsg::Prepare(desc) => {
180
let code = match self.vios_client.lock().prepare_stream(self.stream_id) {
181
Ok(()) => VIRTIO_SND_S_OK,
182
Err(e) => {
183
error!(
184
"virtio-snd: Failed to prepare stream {}: {}",
185
self.stream_id, e
186
);
187
vios_error_to_status_code(e)
188
}
189
};
190
(code, desc, StreamState::Prepared)
191
}
192
StreamMsg::Start(desc) => {
193
let code = match self.vios_client.lock().start_stream(self.stream_id) {
194
Ok(()) => VIRTIO_SND_S_OK,
195
Err(e) => {
196
error!(
197
"virtio-snd: Failed to start stream {}: {}",
198
self.stream_id, e
199
);
200
vios_error_to_status_code(e)
201
}
202
};
203
self.start_time = Instant::now();
204
self.next_buffer = Duration::from_millis(0);
205
(code, desc, StreamState::Started)
206
}
207
StreamMsg::Stop(desc) => {
208
let code = match self.vios_client.lock().stop_stream(self.stream_id) {
209
Ok(()) => VIRTIO_SND_S_OK,
210
Err(e) => {
211
error!(
212
"virtio-snd: Failed to stop stream {}: {}",
213
self.stream_id, e
214
);
215
vios_error_to_status_code(e)
216
}
217
};
218
(code, desc, StreamState::Stopped)
219
}
220
StreamMsg::Release(desc) => {
221
let code = match self.vios_client.lock().release_stream(self.stream_id) {
222
Ok(()) => VIRTIO_SND_S_OK,
223
Err(e) => {
224
error!(
225
"virtio-snd: Failed to release stream {}: {}",
226
self.stream_id, e
227
);
228
vios_error_to_status_code(e)
229
}
230
};
231
(code, desc, StreamState::Released)
232
}
233
StreamMsg::Buffer(d) => {
234
// Buffers may arrive while in several states:
235
// - Prepared: Buffer should be queued and played when start cmd arrives
236
// - Started: Buffer should be processed immediately
237
// - Stopped: Buffer should be returned to the guest immediately
238
// Because we may need to wait to process the buffer, we always queue it and
239
// decide what to do with queued buffers after every message.
240
self.buffer_queue.push_back(d);
241
// return here to avoid replying on control queue below
242
return Ok(true);
243
}
244
StreamMsg::Break => {
245
return Ok(false);
246
}
247
};
248
reply_control_op_status(code, desc, &self.control_queue)?;
249
self.current_state = next_state;
250
Ok(true)
251
}
252
253
fn maybe_process_queued_buffers(&mut self) -> Result<()> {
254
match self.current_state {
255
StreamState::Started => {
256
while let Some(mut desc) = self.buffer_queue.pop_front() {
257
let reader = &mut desc.reader;
258
// Ignore the first buffer, it was already read by the time this thread
259
// receives the descriptor
260
reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());
261
let writer = &mut desc.writer;
262
let io_res = if self.capture {
263
let buffer_size =
264
writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();
265
self.vios_client.lock().request_audio_data(
266
self.stream_id,
267
buffer_size,
268
|vslice| writer.write_from_volatile_slice(*vslice),
269
)
270
} else {
271
self.vios_client.lock().inject_audio_data(
272
self.stream_id,
273
reader.available_bytes(),
274
|vslice| reader.read_to_volatile_slice(vslice),
275
)
276
};
277
let (code, latency) = match io_res {
278
Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),
279
Err(e) => {
280
error!(
281
"virtio-snd: Failed IO operation in stream {}: {}",
282
self.stream_id, e
283
);
284
(VIRTIO_SND_S_IO_ERR, 0)
285
}
286
};
287
if let Err(e) = writer.write_obj(virtio_snd_pcm_status {
288
status: Le32::from(code),
289
latency_bytes: Le32::from(latency),
290
}) {
291
error!(
292
"virtio-snd: Failed to write pcm status from stream {} thread: {}",
293
self.stream_id, e
294
);
295
}
296
297
self.next_buffer += self.period;
298
let elapsed = self.start_time.elapsed();
299
if elapsed < self.next_buffer {
300
// Completing an IO request can be considered an elapsed period
301
// notification by the driver, so we must wait the right amount of time to
302
// release the buffer if the sound server client returned too soon.
303
std::thread::sleep(self.next_buffer - elapsed);
304
}
305
{
306
let mut io_queue_lock = self.io_queue.lock();
307
io_queue_lock.add_used(desc);
308
io_queue_lock.trigger_interrupt();
309
}
310
}
311
}
312
StreamState::Stopped | StreamState::Released => {
313
// For some reason playback buffers can arrive after stop and release (maybe because
314
// buffer-ready notifications arrive over eventfds and those are processed in
315
// random order?). The spec requires the device to not confirm the release of a
316
// stream until all IO buffers have been released, but that's impossible to
317
// guarantee if a buffer arrives after release is requested. Luckily it seems to
318
// work fine if the buffer is released after the release command is completed.
319
while let Some(desc) = self.buffer_queue.pop_front() {
320
reply_pcm_buffer_status(VIRTIO_SND_S_OK, 0, desc, &self.io_queue)?;
321
}
322
}
323
StreamState::Prepared => {} // Do nothing, any buffers will be processed after start
324
_ => {
325
if !self.buffer_queue.is_empty() {
326
warn!("virtio-snd: Buffers received while in unexpected state");
327
}
328
}
329
}
330
Ok(())
331
}
332
}
333
334
impl Drop for Stream {
335
fn drop(&mut self) {
336
// Try to stop and release the stream in case it was playing, these operations will fail if
337
// the stream is already released, just ignore that failure
338
let _ = self.vios_client.lock().stop_stream(self.stream_id);
339
let _ = self.vios_client.lock().release_stream(self.stream_id);
340
341
// Also release any pending buffer
342
while let Some(desc) = self.buffer_queue.pop_front() {
343
if let Err(e) = reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, desc, &self.io_queue) {
344
error!(
345
"virtio-snd: Failed to reply buffer on stream {}: {}",
346
self.stream_id, e
347
);
348
}
349
}
350
}
351
}
352
353
/// Basically a proxy to the thread handling a particular stream.
354
pub struct StreamProxy {
355
sender: Sender<Box<StreamMsg>>,
356
thread: Option<thread::JoinHandle<StreamSnapshot>>,
357
}
358
359
impl StreamProxy {
360
/// Access the underlying sender to clone it or send messages
361
pub fn msg_sender(&self) -> &Sender<Box<StreamMsg>> {
362
&self.sender
363
}
364
365
/// Send a message to the stream thread on the other side of this sender
366
pub fn send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()> {
367
sender
368
.send(Box::new(msg))
369
.map_err(SoundError::StreamThreadSend)
370
}
371
372
/// Convenience function to send a message to this stream's thread
373
pub fn send(&self, msg: StreamMsg) -> Result<()> {
374
Self::send_msg(&self.sender, msg)
375
}
376
377
pub fn stop_thread(mut self) -> StreamSnapshot {
378
self.stop_thread_inner().unwrap()
379
}
380
381
fn stop_thread_inner(&mut self) -> Option<StreamSnapshot> {
382
if let Some(th) = self.thread.take() {
383
if let Err(e) = self.send(StreamMsg::Break) {
384
error!(
385
"virtio-snd: Failed to send Break msg to stream thread: {}",
386
e
387
);
388
}
389
match th.join() {
390
Ok(state) => Some(state),
391
Err(e) => panic!("virtio-snd: Panic detected on stream thread: {e:?}"),
392
}
393
} else {
394
None
395
}
396
}
397
}
398
399
impl Drop for StreamProxy {
400
fn drop(&mut self) {
401
let _ = self.stop_thread_inner();
402
}
403
}
404
405
/// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This
406
/// may fail due to insuficient permissions.
407
pub fn try_set_real_time_priority() {
408
const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
409
if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
410
.and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
411
{
412
warn!("Failed to set audio stream thread to real time: {}", e);
413
}
414
}
415
416
/// Gets the appropriate virtio-snd error to return to the driver from a VioSError.
417
pub fn vios_error_to_status_code(e: VioSError) -> u32 {
418
match e {
419
VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,
420
_ => VIRTIO_SND_S_NOT_SUPP,
421
}
422
}
423
424
/// Encapsulates sending the virtio_snd_hdr struct back to the driver.
425
pub fn reply_control_op_status(
426
code: u32,
427
mut desc: DescriptorChain,
428
queue: &Arc<Mutex<Queue>>,
429
) -> Result<()> {
430
let writer = &mut desc.writer;
431
writer
432
.write_obj(virtio_snd_hdr {
433
code: Le32::from(code),
434
})
435
.map_err(SoundError::QueueIO)?;
436
{
437
let mut queue_lock = queue.lock();
438
queue_lock.add_used(desc);
439
queue_lock.trigger_interrupt();
440
}
441
Ok(())
442
}
443
444
/// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.
445
pub fn reply_pcm_buffer_status(
446
status: u32,
447
latency_bytes: u32,
448
mut desc: DescriptorChain,
449
queue: &Arc<Mutex<Queue>>,
450
) -> Result<()> {
451
let writer = &mut desc.writer;
452
if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {
453
writer
454
.consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());
455
}
456
writer
457
.write_obj(virtio_snd_pcm_status {
458
status: Le32::from(status),
459
latency_bytes: Le32::from(latency_bytes),
460
})
461
.map_err(SoundError::QueueIO)?;
462
{
463
let mut queue_lock = queue.lock();
464
queue_lock.add_used(desc);
465
queue_lock.trigger_interrupt();
466
}
467
Ok(())
468
}
469
470
fn bytes_per_sample(format: u8) -> usize {
471
match format {
472
VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,
473
VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,
474
VIRTIO_SND_PCM_FMT_A_LAW => 1usize,
475
VIRTIO_SND_PCM_FMT_S8 => 1usize,
476
VIRTIO_SND_PCM_FMT_U8 => 1usize,
477
VIRTIO_SND_PCM_FMT_S16 => 2usize,
478
VIRTIO_SND_PCM_FMT_U16 => 2usize,
479
VIRTIO_SND_PCM_FMT_S32 => 4usize,
480
VIRTIO_SND_PCM_FMT_U32 => 4usize,
481
VIRTIO_SND_PCM_FMT_FLOAT => 4usize,
482
VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,
483
// VIRTIO_SND_PCM_FMT_DSD_U8
484
// VIRTIO_SND_PCM_FMT_DSD_U16
485
// VIRTIO_SND_PCM_FMT_DSD_U32
486
// VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME
487
// VIRTIO_SND_PCM_FMT_S18_3
488
// VIRTIO_SND_PCM_FMT_U18_3
489
// VIRTIO_SND_PCM_FMT_S20_3
490
// VIRTIO_SND_PCM_FMT_U20_3
491
// VIRTIO_SND_PCM_FMT_S24_3
492
// VIRTIO_SND_PCM_FMT_U24_3
493
// VIRTIO_SND_PCM_FMT_S20
494
// VIRTIO_SND_PCM_FMT_U20
495
// VIRTIO_SND_PCM_FMT_S24
496
// VIRTIO_SND_PCM_FMT_U24
497
_ => {
498
// Some of these formats are not consistently stored in a particular size (24bits is
499
// sometimes stored in a 32bit word) while others are of variable size.
500
// The size per sample estimated here is designed to greatly underestimate the time it
501
// takes to play a buffer and depend instead on timings provided by the sound server if
502
// it supports these formats.
503
warn!(
504
"Unknown sample size for format {}, depending on sound server timing instead.",
505
format
506
);
507
1000usize
508
}
509
}
510
}
511
512