Path: blob/main/devices/src/virtio/snd/vios_backend/worker.rs
5394 views
// Copyright 2021 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::io::Read;5use std::sync::mpsc::Sender;6use std::sync::Arc;7use std::thread;89use base::error;10use base::warn;11use base::Event;12use base::EventToken;13use base::WaitContext;14use data_model::Le32;15use sync::Mutex;16use zerocopy::Immutable;17use zerocopy::IntoBytes;1819use super::super::constants::*;20use super::super::layout::*;21use super::streams::*;22use super::Result;23use super::SoundError;24use super::*;25use crate::virtio::DescriptorChain;26use crate::virtio::Queue;2728pub struct Worker {29// Lock order: Must never hold more than one queue lock at the same time.30pub control_queue: Arc<Mutex<Queue>>,31pub event_queue: Option<Queue>,32vios_client: Arc<Mutex<VioSClient>>,33streams: Vec<StreamProxy>,34pub tx_queue: Arc<Mutex<Queue>>,35pub rx_queue: Arc<Mutex<Queue>>,36io_thread: Option<thread::JoinHandle<Result<()>>>,37io_kill: Event,38// saved_stream_state holds the previous state of streams. When the sound device is newly39// created, this will be empty. It will only contain state if the sound device is put to sleep40// OR if we restore a VM.41pub saved_stream_state: Vec<StreamSnapshot>,42}4344impl Worker {45/// Creates a new virtio-snd worker.46pub fn try_new(47vios_client: Arc<Mutex<VioSClient>>,48control_queue: Arc<Mutex<Queue>>,49event_queue: Queue,50tx_queue: Arc<Mutex<Queue>>,51rx_queue: Arc<Mutex<Queue>>,52saved_stream_state: Vec<StreamSnapshot>,53) -> Result<Worker> {54let num_streams = vios_client.lock().num_streams();55let mut streams: Vec<StreamProxy> = Vec::with_capacity(num_streams as usize);56{57for stream_id in 0..num_streams {58let capture = vios_client59.lock()60.stream_info(stream_id)61.map(|i| i.direction == VIRTIO_SND_D_INPUT)62.unwrap_or(false);63let io_queue = if capture { &rx_queue } else { &tx_queue };64streams.push(Stream::try_new(65stream_id,66vios_client.clone(),67control_queue.clone(),68io_queue.clone(),69capture,70saved_stream_state.get(stream_id as usize).cloned(),71)?);72}73}74let (self_kill_io, kill_io) = Event::new()75.and_then(|e| Ok((e.try_clone()?, e)))76.map_err(SoundError::CreateEvent)?;7778let senders: Vec<Sender<Box<StreamMsg>>> =79streams.iter().map(|sp| sp.msg_sender().clone()).collect();80let tx_queue_thread = tx_queue.clone();81let rx_queue_thread = rx_queue.clone();82let io_thread = thread::Builder::new()83.name("v_snd_io".to_string())84.spawn(move || {85try_set_real_time_priority();8687io_loop(tx_queue_thread, rx_queue_thread, senders, kill_io)88})89.map_err(SoundError::CreateThread)?;90Ok(Worker {91control_queue,92event_queue: Some(event_queue),93vios_client,94streams,95tx_queue,96rx_queue,97io_thread: Some(io_thread),98io_kill: self_kill_io,99saved_stream_state: Vec::new(),100})101}102103/// Emulates the virtio-snd device. It won't return until something is written to the kill_evt104/// event or an unrecoverable error occurs.105pub fn control_loop(&mut self, kill_evt: Event) -> Result<()> {106let event_notifier = self107.vios_client108.lock()109.get_event_notifier()110.map_err(SoundError::ClientEventNotifier)?;111#[derive(EventToken)]112enum Token {113ControlQAvailable,114EventQAvailable,115EventTriggered,116Kill,117}118let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[119(self.control_queue.lock().event(), Token::ControlQAvailable),120(121self.event_queue.as_ref().expect("queue missing").event(),122Token::EventQAvailable,123),124(&event_notifier, Token::EventTriggered),125(&kill_evt, Token::Kill),126])127.map_err(SoundError::WaitCtx)?;128129let mut event_queue = self.event_queue.take().expect("event_queue missing");130'wait: loop {131let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;132133for wait_evt in wait_events.iter().filter(|e| e.is_readable) {134match wait_evt.token {135Token::ControlQAvailable => {136self.control_queue137.lock()138.event()139.wait()140.map_err(SoundError::QueueEvt)?;141self.process_controlq_buffers()?;142}143Token::EventQAvailable => {144// Just read from the event object to make sure the producer of such events145// never blocks. The buffers will only be used when actual virtio-snd146// events are triggered.147event_queue.event().wait().map_err(SoundError::QueueEvt)?;148}149Token::EventTriggered => {150event_notifier.wait().map_err(SoundError::QueueEvt)?;151self.process_event_triggered(&mut event_queue)?;152}153Token::Kill => {154let _ = kill_evt.wait();155break 'wait;156}157}158}159}160self.saved_stream_state = self161.streams162.drain(..)163.map(|stream| stream.stop_thread())164.collect();165self.event_queue = Some(event_queue);166Ok(())167}168169fn stop_io_thread(&mut self) {170if let Err(e) = self.io_kill.signal() {171error!(172"virtio-snd: Failed to send Break msg to stream thread: {}",173e174);175}176if let Some(th) = self.io_thread.take() {177match th.join() {178Err(e) => {179error!("virtio-snd: Panic detected on stream thread: {:?}", e);180}181Ok(r) => {182if let Err(e) = r {183error!("virtio-snd: IO thread exited with and error: {}", e);184}185}186}187}188}189190// Pops and handles all available ontrol queue buffers. Logs minor errors, but returns an191// Err if it encounters an unrecoverable error.192fn process_controlq_buffers(&mut self) -> Result<()> {193while let Some(mut avail_desc) = lock_pop_unlock(&self.control_queue) {194let reader = &mut avail_desc.reader;195let available_bytes = reader.available_bytes();196let Ok(hdr) = reader.peek_obj::<virtio_snd_hdr>() else {197error!(198"virtio-snd: Message received on control queue is too small: {}",199available_bytes200);201return reply_control_op_status(202VIRTIO_SND_S_BAD_MSG,203avail_desc,204&self.control_queue,205);206};207let mut read_buf = vec![0u8; available_bytes];208reader209.read_exact(&mut read_buf)210.map_err(SoundError::QueueIO)?;211let request_type = hdr.code.to_native();212match request_type {213VIRTIO_SND_R_JACK_INFO => {214let (code, info_vec) = {215match self.parse_info_query(&read_buf) {216None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),217Some((start_id, count)) => {218let end_id = start_id.saturating_add(count);219if end_id > self.vios_client.lock().num_jacks() {220error!(221"virtio-snd: Requested info on invalid jacks ids: {}..{}",222start_id,223end_id - 1224);225(VIRTIO_SND_S_NOT_SUPP, Vec::new())226} else {227(228VIRTIO_SND_S_OK,229// Safe to unwrap because we just ensured all the ids are230// valid231(start_id..end_id)232.map(|id| {233self.vios_client.lock().jack_info(id).unwrap()234})235.collect(),236)237}238}239}240};241self.send_info_reply(avail_desc, code, info_vec)?;242}243VIRTIO_SND_R_JACK_REMAP => {244let code = if read_buf.len() != std::mem::size_of::<virtio_snd_jack_remap>() {245error!(246"virtio-snd: The driver sent the wrong number bytes for a jack_remap struct: {}",247read_buf.len()248);249VIRTIO_SND_S_BAD_MSG250} else {251let mut request: virtio_snd_jack_remap = Default::default();252request.as_mut_bytes().copy_from_slice(&read_buf);253let jack_id = request.hdr.jack_id.to_native();254let association = request.association.to_native();255let sequence = request.sequence.to_native();256if let Err(e) =257self.vios_client258.lock()259.remap_jack(jack_id, association, sequence)260{261error!("virtio-snd: Failed to remap jack: {}", e);262vios_error_to_status_code(e)263} else {264VIRTIO_SND_S_OK265}266};267let writer = &mut avail_desc.writer;268writer269.write_obj(virtio_snd_hdr {270code: Le32::from(code),271})272.map_err(SoundError::QueueIO)?;273{274let mut queue_lock = self.control_queue.lock();275queue_lock.add_used(avail_desc);276queue_lock.trigger_interrupt();277}278}279VIRTIO_SND_R_CHMAP_INFO => {280let (code, info_vec) = {281match self.parse_info_query(&read_buf) {282None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),283Some((start_id, count)) => {284let end_id = start_id.saturating_add(count);285let num_chmaps = self.vios_client.lock().num_chmaps();286if end_id > num_chmaps {287error!(288"virtio-snd: Requested info on invalid chmaps ids: {}..{}",289start_id,290end_id - 1291);292(VIRTIO_SND_S_NOT_SUPP, Vec::new())293} else {294(295VIRTIO_SND_S_OK,296// Safe to unwrap because we just ensured all the ids are297// valid298(start_id..end_id)299.map(|id| {300self.vios_client.lock().chmap_info(id).unwrap()301})302.collect(),303)304}305}306}307};308self.send_info_reply(avail_desc, code, info_vec)?;309}310VIRTIO_SND_R_PCM_INFO => {311let (code, info_vec) = {312match self.parse_info_query(&read_buf) {313None => (VIRTIO_SND_S_BAD_MSG, Vec::new()),314Some((start_id, count)) => {315let end_id = start_id.saturating_add(count);316if end_id > self.vios_client.lock().num_streams() {317error!(318"virtio-snd: Requested info on invalid stream ids: {}..{}",319start_id,320end_id - 1321);322(VIRTIO_SND_S_NOT_SUPP, Vec::new())323} else {324(325VIRTIO_SND_S_OK,326// Safe to unwrap because we just ensured all the ids are327// valid328(start_id..end_id)329.map(|id| {330self.vios_client.lock().stream_info(id).unwrap()331})332.collect(),333)334}335}336}337};338self.send_info_reply(avail_desc, code, info_vec)?;339}340VIRTIO_SND_R_PCM_SET_PARAMS => self.process_set_params(avail_desc, &read_buf)?,341VIRTIO_SND_R_PCM_PREPARE => {342self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Prepare(avail_desc))?343}344VIRTIO_SND_R_PCM_RELEASE => {345self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Release(avail_desc))?346}347VIRTIO_SND_R_PCM_START => {348self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Start(avail_desc))?349}350VIRTIO_SND_R_PCM_STOP => {351self.try_parse_pcm_hdr_and_send_msg(&read_buf, StreamMsg::Stop(avail_desc))?352}353_ => {354error!(355"virtio-snd: Unknown control queue mesage code: {}",356request_type357);358reply_control_op_status(359VIRTIO_SND_S_NOT_SUPP,360avail_desc,361&self.control_queue,362)?;363}364}365}366Ok(())367}368369fn process_event_triggered(&mut self, event_queue: &mut Queue) -> Result<()> {370while let Some(evt) = self.vios_client.lock().pop_event() {371if let Some(mut desc) = event_queue.pop() {372let writer = &mut desc.writer;373writer.write_obj(evt).map_err(SoundError::QueueIO)?;374event_queue.add_used(desc);375event_queue.trigger_interrupt();376} else {377warn!("virtio-snd: Dropping event because there are no buffers in virtqueue");378}379}380Ok(())381}382383fn parse_info_query(&mut self, read_buf: &[u8]) -> Option<(u32, u32)> {384if read_buf.len() != std::mem::size_of::<virtio_snd_query_info>() {385error!(386"virtio-snd: The driver sent the wrong number bytes for a pcm_info struct: {}",387read_buf.len()388);389return None;390}391let mut query: virtio_snd_query_info = Default::default();392query.as_mut_bytes().copy_from_slice(read_buf);393let start_id = query.start_id.to_native();394let count = query.count.to_native();395Some((start_id, count))396}397398// Returns Err if it encounters an unrecoverable error, Ok otherwise399fn process_set_params(&mut self, desc: DescriptorChain, read_buf: &[u8]) -> Result<()> {400if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_set_params>() {401error!(402"virtio-snd: The driver sent a buffer of the wrong size for a set_params struct: {}",403read_buf.len()404);405return reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue);406}407let mut params: virtio_snd_pcm_set_params = Default::default();408params.as_mut_bytes().copy_from_slice(read_buf);409let stream_id = params.hdr.stream_id.to_native();410if stream_id < self.vios_client.lock().num_streams() {411self.streams[stream_id as usize].send(StreamMsg::SetParams(desc, params))412} else {413error!(414"virtio-snd: Driver requested operation on invalid stream: {}",415stream_id416);417reply_control_op_status(VIRTIO_SND_S_BAD_MSG, desc, &self.control_queue)418}419}420421// Returns Err if it encounters an unrecoverable error, Ok otherwise422fn try_parse_pcm_hdr_and_send_msg(&mut self, read_buf: &[u8], msg: StreamMsg) -> Result<()> {423if read_buf.len() != std::mem::size_of::<virtio_snd_pcm_hdr>() {424error!(425"virtio-snd: The driver sent a buffer too small to contain a header: {}",426read_buf.len()427);428return reply_control_op_status(429VIRTIO_SND_S_BAD_MSG,430match msg {431StreamMsg::Prepare(d)432| StreamMsg::Start(d)433| StreamMsg::Stop(d)434| StreamMsg::Release(d) => d,435_ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),436},437&self.control_queue,438);439}440let mut pcm_hdr: virtio_snd_pcm_hdr = Default::default();441pcm_hdr.as_mut_bytes().copy_from_slice(read_buf);442let stream_id = pcm_hdr.stream_id.to_native();443if stream_id < self.vios_client.lock().num_streams() {444self.streams[stream_id as usize].send(msg)445} else {446error!(447"virtio-snd: Driver requested operation on invalid stream: {}",448stream_id449);450reply_control_op_status(451VIRTIO_SND_S_BAD_MSG,452match msg {453StreamMsg::Prepare(d)454| StreamMsg::Start(d)455| StreamMsg::Stop(d)456| StreamMsg::Release(d) => d,457_ => panic!("virtio-snd: Can't handle message. This is a BUG!!"),458},459&self.control_queue,460)461}462}463464fn send_info_reply<T: Immutable + IntoBytes>(465&mut self,466mut desc: DescriptorChain,467code: u32,468info_vec: Vec<T>,469) -> Result<()> {470let writer = &mut desc.writer;471writer472.write_obj(virtio_snd_hdr {473code: Le32::from(code),474})475.map_err(SoundError::QueueIO)?;476for info in info_vec {477writer.write_obj(info).map_err(SoundError::QueueIO)?;478}479{480let mut queue_lock = self.control_queue.lock();481queue_lock.add_used(desc);482queue_lock.trigger_interrupt();483}484Ok(())485}486}487488impl Drop for Worker {489fn drop(&mut self) {490self.stop_io_thread();491}492}493494fn io_loop(495tx_queue: Arc<Mutex<Queue>>,496rx_queue: Arc<Mutex<Queue>>,497senders: Vec<Sender<Box<StreamMsg>>>,498kill_evt: Event,499) -> Result<()> {500#[derive(EventToken)]501enum Token {502TxQAvailable,503RxQAvailable,504Kill,505}506let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[507(tx_queue.lock().event(), Token::TxQAvailable),508(rx_queue.lock().event(), Token::RxQAvailable),509(&kill_evt, Token::Kill),510])511.map_err(SoundError::WaitCtx)?;512513'wait: loop {514let wait_events = wait_ctx.wait().map_err(SoundError::WaitCtx)?;515for wait_evt in wait_events.iter().filter(|e| e.is_readable) {516let queue = match wait_evt.token {517Token::TxQAvailable => {518tx_queue519.lock()520.event()521.wait()522.map_err(SoundError::QueueEvt)?;523&tx_queue524}525Token::RxQAvailable => {526rx_queue527.lock()528.event()529.wait()530.map_err(SoundError::QueueEvt)?;531&rx_queue532}533Token::Kill => {534let _ = kill_evt.wait();535break 'wait;536}537};538while let Some(mut avail_desc) = lock_pop_unlock(queue) {539let reader = &mut avail_desc.reader;540let xfer: virtio_snd_pcm_xfer = reader.read_obj().map_err(SoundError::QueueIO)?;541let stream_id = xfer.stream_id.to_native();542if stream_id as usize >= senders.len() {543error!(544"virtio-snd: Driver sent buffer for invalid stream: {}",545stream_id546);547reply_pcm_buffer_status(VIRTIO_SND_S_IO_ERR, 0, avail_desc, queue)?;548} else {549StreamProxy::send_msg(550&senders[stream_id as usize],551StreamMsg::Buffer(avail_desc),552)?;553}554}555}556}557Ok(())558}559560// If queue.lock().pop() is used directly in the condition of a 'while' loop the lock is held over561// the entire loop block. Encapsulating it in this fuction guarantees that the lock is dropped562// immediately after pop() is called, which allows the code to remain somewhat simpler.563fn lock_pop_unlock(queue: &Arc<Mutex<Queue>>) -> Option<DescriptorChain> {564queue.lock().pop()565}566567568