Path: blob/main/devices/src/virtio/snd/vios_backend/streams.rs
5394 views
// Copyright 2021 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::collections::VecDeque;5use std::sync::mpsc::channel;6use std::sync::mpsc::Receiver;7use std::sync::mpsc::Sender;8use std::sync::Arc;9use std::thread;10use std::time::Duration;11use std::time::Instant;1213use base::error;14use base::set_rt_prio_limit;15use base::set_rt_round_robin;16use base::warn;17use data_model::Le32;18use serde::Deserialize;19use serde::Serialize;20use sync::Mutex;2122use super::Error as VioSError;23use super::Result;24use super::SoundError;25use super::*;26use crate::virtio::snd::common::from_virtio_frame_rate;27use crate::virtio::snd::constants::*;28use crate::virtio::snd::layout::*;29use crate::virtio::DescriptorChain;30use crate::virtio::Queue;3132/// Messages that the worker can send to the stream (thread).33pub enum StreamMsg {34SetParams(DescriptorChain, virtio_snd_pcm_set_params),35Prepare(DescriptorChain),36Start(DescriptorChain),37Stop(DescriptorChain),38Release(DescriptorChain),39Buffer(DescriptorChain),40Break,41}4243#[derive(Clone, Serialize, Deserialize)]44pub enum StreamState {45New,46ParamsSet,47Prepared,48Started,49Stopped,50Released,51}5253pub struct Stream {54stream_id: u32,55receiver: Receiver<Box<StreamMsg>>,56vios_client: Arc<Mutex<VioSClient>>,57control_queue: Arc<Mutex<Queue>>,58io_queue: Arc<Mutex<Queue>>,59capture: bool,60current_state: StreamState,61period: Duration,62start_time: Instant,63next_buffer: Duration,64buffer_queue: VecDeque<DescriptorChain>,65}6667#[derive(Clone, Serialize, Deserialize)]68pub struct StreamSnapshot {69pub current_state: StreamState,70pub period: Duration,71pub next_buffer: Duration,72}7374impl Stream {75/// Start a new stream thread and return its handler.76pub fn try_new(77stream_id: u32,78vios_client: Arc<Mutex<VioSClient>>,79control_queue: Arc<Mutex<Queue>>,80io_queue: Arc<Mutex<Queue>>,81capture: bool,82stream_state: Option<StreamSnapshot>,83) -> Result<StreamProxy> {84let (sender, receiver): (Sender<Box<StreamMsg>>, Receiver<Box<StreamMsg>>) = channel();85let thread = thread::Builder::new()86.name(format!("v_snd_stream:{stream_id}"))87.spawn(move || {88try_set_real_time_priority();89let (current_state, period, next_buffer) =90if let Some(stream_state) = stream_state.clone() {91(92stream_state.current_state,93stream_state.period,94stream_state.next_buffer,95)96} else {97(98StreamState::New,99Duration::from_millis(0),100Duration::from_millis(0),101)102};103104let mut stream = Stream {105stream_id,106receiver,107vios_client: vios_client.clone(),108control_queue,109io_queue,110capture,111current_state,112period,113start_time: Instant::now(),114next_buffer,115buffer_queue: VecDeque::new(),116};117118if let Some(stream_state) = stream_state {119if let Err(e) = vios_client120.lock()121.restore_stream(stream_id, stream_state.current_state)122{123error!("failed to restore stream params: {}", e);124};125}126if let Err(e) = stream.stream_loop() {127error!("virtio-snd: Error in stream {}: {}", stream_id, e);128}129let state = stream.current_state.clone();130StreamSnapshot {131current_state: state,132period: stream.period,133next_buffer: stream.next_buffer,134}135})136.map_err(SoundError::CreateThread)?;137Ok(StreamProxy {138sender,139thread: Some(thread),140})141}142143fn stream_loop(&mut self) -> Result<()> {144loop {145if !self.recv_msg()? {146break;147}148self.maybe_process_queued_buffers()?;149}150Ok(())151}152153fn recv_msg(&mut self) -> Result<bool> {154let msg = self.receiver.recv().map_err(SoundError::StreamThreadRecv)?;155let (code, desc, next_state) = match *msg {156StreamMsg::SetParams(desc, params) => {157let code = match self.vios_client.lock().set_stream_parameters_raw(params) {158Ok(()) => {159let frame_rate = from_virtio_frame_rate(params.rate).unwrap_or(0) as u64;160self.period = Duration::from_nanos(161(params.period_bytes.to_native() as u64 * 1_000_000_000u64)162/ frame_rate163/ params.channels as u64164/ bytes_per_sample(params.format) as u64,165);166VIRTIO_SND_S_OK167}168Err(e) => {169error!(170"virtio-snd: Error setting parameters for stream {}: {}",171self.stream_id, e172);173vios_error_to_status_code(e)174}175};176(code, desc, StreamState::ParamsSet)177}178StreamMsg::Prepare(desc) => {179let code = match self.vios_client.lock().prepare_stream(self.stream_id) {180Ok(()) => VIRTIO_SND_S_OK,181Err(e) => {182error!(183"virtio-snd: Failed to prepare stream {}: {}",184self.stream_id, e185);186vios_error_to_status_code(e)187}188};189(code, desc, StreamState::Prepared)190}191StreamMsg::Start(desc) => {192let code = match self.vios_client.lock().start_stream(self.stream_id) {193Ok(()) => VIRTIO_SND_S_OK,194Err(e) => {195error!(196"virtio-snd: Failed to start stream {}: {}",197self.stream_id, e198);199vios_error_to_status_code(e)200}201};202self.start_time = Instant::now();203self.next_buffer = Duration::from_millis(0);204(code, desc, StreamState::Started)205}206StreamMsg::Stop(desc) => {207let code = match self.vios_client.lock().stop_stream(self.stream_id) {208Ok(()) => VIRTIO_SND_S_OK,209Err(e) => {210error!(211"virtio-snd: Failed to stop stream {}: {}",212self.stream_id, e213);214vios_error_to_status_code(e)215}216};217(code, desc, StreamState::Stopped)218}219StreamMsg::Release(desc) => {220let code = match self.vios_client.lock().release_stream(self.stream_id) {221Ok(()) => VIRTIO_SND_S_OK,222Err(e) => {223error!(224"virtio-snd: Failed to release stream {}: {}",225self.stream_id, e226);227vios_error_to_status_code(e)228}229};230(code, desc, StreamState::Released)231}232StreamMsg::Buffer(d) => {233// Buffers may arrive while in several states:234// - Prepared: Buffer should be queued and played when start cmd arrives235// - Started: Buffer should be processed immediately236// - Stopped: Buffer should be returned to the guest immediately237// Because we may need to wait to process the buffer, we always queue it and238// decide what to do with queued buffers after every message.239self.buffer_queue.push_back(d);240// return here to avoid replying on control queue below241return Ok(true);242}243StreamMsg::Break => {244return Ok(false);245}246};247reply_control_op_status(code, desc, &self.control_queue)?;248self.current_state = next_state;249Ok(true)250}251252fn maybe_process_queued_buffers(&mut self) -> Result<()> {253match self.current_state {254StreamState::Started => {255while let Some(mut desc) = self.buffer_queue.pop_front() {256let reader = &mut desc.reader;257// Ignore the first buffer, it was already read by the time this thread258// receives the descriptor259reader.consume(std::mem::size_of::<virtio_snd_pcm_xfer>());260let writer = &mut desc.writer;261let io_res = if self.capture {262let buffer_size =263writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>();264self.vios_client.lock().request_audio_data(265self.stream_id,266buffer_size,267|vslice| writer.write_from_volatile_slice(*vslice),268)269} else {270self.vios_client.lock().inject_audio_data(271self.stream_id,272reader.available_bytes(),273|vslice| reader.read_to_volatile_slice(vslice),274)275};276let (code, latency) = match io_res {277Ok((latency, _)) => (VIRTIO_SND_S_OK, latency),278Err(e) => {279error!(280"virtio-snd: Failed IO operation in stream {}: {}",281self.stream_id, e282);283(VIRTIO_SND_S_IO_ERR, 0)284}285};286if let Err(e) = writer.write_obj(virtio_snd_pcm_status {287status: Le32::from(code),288latency_bytes: Le32::from(latency),289}) {290error!(291"virtio-snd: Failed to write pcm status from stream {} thread: {}",292self.stream_id, e293);294}295296self.next_buffer += self.period;297let elapsed = self.start_time.elapsed();298if elapsed < self.next_buffer {299// Completing an IO request can be considered an elapsed period300// notification by the driver, so we must wait the right amount of time to301// release the buffer if the sound server client returned too soon.302std::thread::sleep(self.next_buffer - elapsed);303}304{305let mut io_queue_lock = self.io_queue.lock();306io_queue_lock.add_used(desc);307io_queue_lock.trigger_interrupt();308}309}310}311StreamState::Stopped | StreamState::Released => {312// For some reason playback buffers can arrive after stop and release (maybe because313// buffer-ready notifications arrive over eventfds and those are processed in314// random order?). The spec requires the device to not confirm the release of a315// stream until all IO buffers have been released, but that's impossible to316// guarantee if a buffer arrives after release is requested. Luckily it seems to317// work fine if the buffer is released after the release command is completed.318while let Some(desc) = self.buffer_queue.pop_front() {319reply_pcm_buffer_status(VIRTIO_SND_S_OK, 0, desc, &self.io_queue)?;320}321}322StreamState::Prepared => {} // Do nothing, any buffers will be processed after start323_ => {324if !self.buffer_queue.is_empty() {325warn!("virtio-snd: Buffers received while in unexpected state");326}327}328}329Ok(())330}331}332333impl Drop for Stream {334fn drop(&mut self) {335// Try to stop and release the stream in case it was playing, these operations will fail if336// the stream is already released, just ignore that failure337let _ = self.vios_client.lock().stop_stream(self.stream_id);338let _ = self.vios_client.lock().release_stream(self.stream_id);339340// Also release any pending buffer341while let Some(desc) = self.buffer_queue.pop_front() {342if let Err(e) = reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, desc, &self.io_queue) {343error!(344"virtio-snd: Failed to reply buffer on stream {}: {}",345self.stream_id, e346);347}348}349}350}351352/// Basically a proxy to the thread handling a particular stream.353pub struct StreamProxy {354sender: Sender<Box<StreamMsg>>,355thread: Option<thread::JoinHandle<StreamSnapshot>>,356}357358impl StreamProxy {359/// Access the underlying sender to clone it or send messages360pub fn msg_sender(&self) -> &Sender<Box<StreamMsg>> {361&self.sender362}363364/// Send a message to the stream thread on the other side of this sender365pub fn send_msg(sender: &Sender<Box<StreamMsg>>, msg: StreamMsg) -> Result<()> {366sender367.send(Box::new(msg))368.map_err(SoundError::StreamThreadSend)369}370371/// Convenience function to send a message to this stream's thread372pub fn send(&self, msg: StreamMsg) -> Result<()> {373Self::send_msg(&self.sender, msg)374}375376pub fn stop_thread(mut self) -> StreamSnapshot {377self.stop_thread_inner().unwrap()378}379380fn stop_thread_inner(&mut self) -> Option<StreamSnapshot> {381if let Some(th) = self.thread.take() {382if let Err(e) = self.send(StreamMsg::Break) {383error!(384"virtio-snd: Failed to send Break msg to stream thread: {}",385e386);387}388match th.join() {389Ok(state) => Some(state),390Err(e) => panic!("virtio-snd: Panic detected on stream thread: {e:?}"),391}392} else {393None394}395}396}397398impl Drop for StreamProxy {399fn drop(&mut self) {400let _ = self.stop_thread_inner();401}402}403404/// Attempts to set the current thread's priority to a value hight enough to handle audio IO. This405/// may fail due to insuficient permissions.406pub fn try_set_real_time_priority() {407const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.408if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))409.and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))410{411warn!("Failed to set audio stream thread to real time: {}", e);412}413}414415/// Gets the appropriate virtio-snd error to return to the driver from a VioSError.416pub fn vios_error_to_status_code(e: VioSError) -> u32 {417match e {418VioSError::ServerIOError(_) => VIRTIO_SND_S_IO_ERR,419_ => VIRTIO_SND_S_NOT_SUPP,420}421}422423/// Encapsulates sending the virtio_snd_hdr struct back to the driver.424pub fn reply_control_op_status(425code: u32,426mut desc: DescriptorChain,427queue: &Arc<Mutex<Queue>>,428) -> Result<()> {429let writer = &mut desc.writer;430writer431.write_obj(virtio_snd_hdr {432code: Le32::from(code),433})434.map_err(SoundError::QueueIO)?;435{436let mut queue_lock = queue.lock();437queue_lock.add_used(desc);438queue_lock.trigger_interrupt();439}440Ok(())441}442443/// Encapsulates sending the virtio_snd_pcm_status struct back to the driver.444pub fn reply_pcm_buffer_status(445status: u32,446latency_bytes: u32,447mut desc: DescriptorChain,448queue: &Arc<Mutex<Queue>>,449) -> Result<()> {450let writer = &mut desc.writer;451if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {452writer453.consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());454}455writer456.write_obj(virtio_snd_pcm_status {457status: Le32::from(status),458latency_bytes: Le32::from(latency_bytes),459})460.map_err(SoundError::QueueIO)?;461{462let mut queue_lock = queue.lock();463queue_lock.add_used(desc);464queue_lock.trigger_interrupt();465}466Ok(())467}468469fn bytes_per_sample(format: u8) -> usize {470match format {471VIRTIO_SND_PCM_FMT_IMA_ADPCM => 1usize,472VIRTIO_SND_PCM_FMT_MU_LAW => 1usize,473VIRTIO_SND_PCM_FMT_A_LAW => 1usize,474VIRTIO_SND_PCM_FMT_S8 => 1usize,475VIRTIO_SND_PCM_FMT_U8 => 1usize,476VIRTIO_SND_PCM_FMT_S16 => 2usize,477VIRTIO_SND_PCM_FMT_U16 => 2usize,478VIRTIO_SND_PCM_FMT_S32 => 4usize,479VIRTIO_SND_PCM_FMT_U32 => 4usize,480VIRTIO_SND_PCM_FMT_FLOAT => 4usize,481VIRTIO_SND_PCM_FMT_FLOAT64 => 8usize,482// VIRTIO_SND_PCM_FMT_DSD_U8483// VIRTIO_SND_PCM_FMT_DSD_U16484// VIRTIO_SND_PCM_FMT_DSD_U32485// VIRTIO_SND_PCM_FMT_IEC958_SUBFRAME486// VIRTIO_SND_PCM_FMT_S18_3487// VIRTIO_SND_PCM_FMT_U18_3488// VIRTIO_SND_PCM_FMT_S20_3489// VIRTIO_SND_PCM_FMT_U20_3490// VIRTIO_SND_PCM_FMT_S24_3491// VIRTIO_SND_PCM_FMT_U24_3492// VIRTIO_SND_PCM_FMT_S20493// VIRTIO_SND_PCM_FMT_U20494// VIRTIO_SND_PCM_FMT_S24495// VIRTIO_SND_PCM_FMT_U24496_ => {497// Some of these formats are not consistently stored in a particular size (24bits is498// sometimes stored in a 32bit word) while others are of variable size.499// The size per sample estimated here is designed to greatly underestimate the time it500// takes to play a buffer and depend instead on timings provided by the sound server if501// it supports these formats.502warn!(503"Unknown sample size for format {}, depending on sound server timing instead.",504format505);5061000usize507}508}509}510511512