Path: blob/main/devices/src/virtio/snd/common_backend/async_funcs.rs
5394 views
// Copyright 2021 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::fmt;5use std::io;6use std::io::Read;7use std::io::Write;8use std::rc::Rc;9use std::sync::atomic::AtomicBool;10use std::sync::atomic::Ordering;11use std::time::Duration;1213use async_trait::async_trait;14use audio_streams::capture::AsyncCaptureBuffer;15use audio_streams::AsyncPlaybackBuffer;16use audio_streams::BoxError;17use base::debug;18use base::error;19use base::info;20use cros_async::sync::Condvar;21use cros_async::sync::RwLock as AsyncRwLock;22use cros_async::AsyncTube;23use cros_async::EventAsync;24use cros_async::Executor;25use cros_async::TimerAsync;26use futures::channel::mpsc;27use futures::channel::oneshot;28use futures::pin_mut;29use futures::select;30use futures::FutureExt;31use futures::SinkExt;32use futures::StreamExt;33use thiserror::Error as ThisError;34use vm_control::SndControlCommand;35use vm_control::VmResponse;36use zerocopy::IntoBytes;3738use super::Error;39use super::SndData;40use super::WorkerStatus;41use crate::virtio::snd::common::*;42use crate::virtio::snd::common_backend::stream_info::SetParams;43use crate::virtio::snd::common_backend::stream_info::StreamInfo;44use crate::virtio::snd::common_backend::DirectionalStream;45use crate::virtio::snd::common_backend::PcmResponse;46use crate::virtio::snd::constants::*;47use crate::virtio::snd::layout::*;48use crate::virtio::DescriptorChain;49use crate::virtio::Queue;50use crate::virtio::Reader;51use crate::virtio::Writer;5253/// Trait to wrap system specific helpers for reading from the start point capture buffer.54#[async_trait(?Send)]55pub trait CaptureBufferReader {56async fn get_next_capture_period(57&mut self,58ex: &Executor,59) -> Result<AsyncCaptureBuffer, BoxError>;60}6162/// Trait to wrap system specific helpers for writing to endpoint playback buffers.63#[async_trait(?Send)]64pub trait PlaybackBufferWriter {65fn new(guest_period_bytes: usize) -> Self66where67Self: Sized;6869/// Returns the period of the endpoint device.70fn endpoint_period_bytes(&self) -> usize;7172/// Read audio samples from the tx virtqueue.73fn copy_to_buffer(74&mut self,75dst_buf: &mut AsyncPlaybackBuffer<'_>,76reader: &mut Reader,77) -> Result<usize, Error> {78dst_buf.copy_from(reader).map_err(Error::Io)79}80}8182#[derive(Debug)]83enum VirtioSndPcmCmd {84SetParams { set_params: SetParams },85Prepare,86Start,87Stop,88Release,89}9091impl fmt::Display for VirtioSndPcmCmd {92fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {93let cmd_code = match self {94VirtioSndPcmCmd::SetParams { set_params: _ } => VIRTIO_SND_R_PCM_SET_PARAMS,95VirtioSndPcmCmd::Prepare => VIRTIO_SND_R_PCM_PREPARE,96VirtioSndPcmCmd::Start => VIRTIO_SND_R_PCM_START,97VirtioSndPcmCmd::Stop => VIRTIO_SND_R_PCM_STOP,98VirtioSndPcmCmd::Release => VIRTIO_SND_R_PCM_RELEASE,99};100f.write_str(get_virtio_snd_r_pcm_cmd_name(cmd_code))101}102}103104#[derive(ThisError, Debug)]105enum VirtioSndPcmCmdError {106#[error("SetParams requires additional parameters")]107SetParams,108#[error("Invalid virtio snd command code")]109InvalidCode,110}111112impl TryFrom<u32> for VirtioSndPcmCmd {113type Error = VirtioSndPcmCmdError;114115fn try_from(code: u32) -> Result<Self, Self::Error> {116match code {117VIRTIO_SND_R_PCM_PREPARE => Ok(VirtioSndPcmCmd::Prepare),118VIRTIO_SND_R_PCM_START => Ok(VirtioSndPcmCmd::Start),119VIRTIO_SND_R_PCM_STOP => Ok(VirtioSndPcmCmd::Stop),120VIRTIO_SND_R_PCM_RELEASE => Ok(VirtioSndPcmCmd::Release),121VIRTIO_SND_R_PCM_SET_PARAMS => Err(VirtioSndPcmCmdError::SetParams),122_ => Err(VirtioSndPcmCmdError::InvalidCode),123}124}125}126127impl VirtioSndPcmCmd {128fn with_set_params_and_direction(129set_params: virtio_snd_pcm_set_params,130dir: u8,131) -> VirtioSndPcmCmd {132let buffer_bytes: u32 = set_params.buffer_bytes.into();133let period_bytes: u32 = set_params.period_bytes.into();134VirtioSndPcmCmd::SetParams {135set_params: SetParams {136channels: set_params.channels,137format: from_virtio_sample_format(set_params.format).unwrap(),138frame_rate: from_virtio_frame_rate(set_params.rate).unwrap(),139buffer_bytes: buffer_bytes as usize,140period_bytes: period_bytes as usize,141dir,142},143}144}145}146147// Returns true if the operation is successful. Returns error if there is148// a runtime/internal error149async fn process_pcm_ctrl(150ex: &Executor,151tx_send: &mpsc::UnboundedSender<PcmResponse>,152rx_send: &mpsc::UnboundedSender<PcmResponse>,153streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,154cmd: VirtioSndPcmCmd,155writer: &mut Writer,156stream_id: usize,157card_index: usize,158) -> Result<(), Error> {159let streams = streams.read_lock().await;160let mut stream = match streams.get(stream_id) {161Some(stream_info) => stream_info.lock().await,162None => {163error!(164"[Card {}] Stream id={} not found for {}. Error code: VIRTIO_SND_S_BAD_MSG",165card_index, stream_id, cmd166);167return writer168.write_obj(VIRTIO_SND_S_BAD_MSG)169.map_err(Error::WriteResponse);170}171};172173debug!("[Card {}] {} for stream id={}", card_index, cmd, stream_id);174175let result = match cmd {176VirtioSndPcmCmd::SetParams { set_params } => {177let result = stream.set_params(set_params).await;178if result.is_ok() {179debug!(180"[Card {}] VIRTIO_SND_R_PCM_SET_PARAMS for stream id={}. Stream info: {:#?}",181card_index, stream_id, *stream182);183}184result185}186VirtioSndPcmCmd::Prepare => stream.prepare(ex, tx_send, rx_send).await,187VirtioSndPcmCmd::Start => stream.start().await,188VirtioSndPcmCmd::Stop => stream.stop().await,189VirtioSndPcmCmd::Release => stream.release().await,190};191match result {192Ok(_) => writer193.write_obj(VIRTIO_SND_S_OK)194.map_err(Error::WriteResponse),195Err(Error::OperationNotSupported) => {196error!(197"[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_NOT_SUPP.",198card_index, cmd, stream_id199);200201writer202.write_obj(VIRTIO_SND_S_NOT_SUPP)203.map_err(Error::WriteResponse)204}205Err(e) => {206// Runtime/internal error would be more appropriate, but there's207// no such error type208error!(209"[Card {}] {} for stream id={} failed. Error code: VIRTIO_SND_S_IO_ERR. Actual error: {}",210card_index, cmd, stream_id, e211);212writer213.write_obj(VIRTIO_SND_S_IO_ERR)214.map_err(Error::WriteResponse)215}216}217}218219async fn write_data(220mut dst_buf: AsyncPlaybackBuffer<'_>,221reader: Option<&mut Reader>,222buffer_writer: &mut Box<dyn PlaybackBufferWriter>,223) -> Result<u32, Error> {224let transferred = match reader {225Some(reader) => buffer_writer.copy_to_buffer(&mut dst_buf, reader)?,226None => dst_buf227.copy_from(&mut io::repeat(0).take(buffer_writer.endpoint_period_bytes() as u64))228.map_err(Error::Io)?,229};230231if transferred != buffer_writer.endpoint_period_bytes() {232error!(233"Bytes written {} != period_bytes {}",234transferred,235buffer_writer.endpoint_period_bytes()236);237Err(Error::InvalidBufferSize)238} else {239dst_buf.commit().await;240Ok(dst_buf.latency_bytes())241}242}243244async fn read_data(245mut src_buf: AsyncCaptureBuffer<'_>,246writer: Option<&mut Writer>,247period_bytes: usize,248) -> Result<u32, Error> {249let transferred = match writer {250Some(writer) => src_buf.copy_to(writer),251None => src_buf.copy_to(&mut io::sink()),252}253.map_err(Error::Io)?;254if transferred != period_bytes {255error!(256"Bytes written {} != period_bytes {}",257transferred, period_bytes258);259Err(Error::InvalidBufferSize)260} else {261src_buf.commit().await;262Ok(src_buf.latency_bytes())263}264}265266impl From<Result<u32, Error>> for virtio_snd_pcm_status {267fn from(res: Result<u32, Error>) -> Self {268match res {269Ok(latency_bytes) => virtio_snd_pcm_status::new(StatusCode::OK, latency_bytes),270Err(e) => {271error!("PCM I/O message failed: {}", e);272virtio_snd_pcm_status::new(StatusCode::IoErr, 0)273}274}275}276}277278// Drain all DescriptorChain in desc_receiver during WorkerStatus::Quit process.279async fn drain_desc_receiver(280desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,281sender: &mut mpsc::UnboundedSender<PcmResponse>,282) -> Result<(), Error> {283let mut o_desc_chain = desc_receiver.next().await;284while let Some(desc_chain) = o_desc_chain {285// From the virtio-snd spec:286// The device MUST complete all pending I/O messages for the specified stream ID.287let status = virtio_snd_pcm_status::new(StatusCode::OK, 0);288// Fetch next DescriptorChain to see if the current one is the last one.289o_desc_chain = desc_receiver.next().await;290let (done, future) = if o_desc_chain.is_none() {291let (done, future) = oneshot::channel();292(Some(done), Some(future))293} else {294(None, None)295};296sender297.send(PcmResponse {298desc_chain,299status,300done,301})302.await303.map_err(Error::MpscSend)?;304305if let Some(f) = future {306// From the virtio-snd spec:307// The device MUST NOT complete the control request (VIRTIO_SND_R_PCM_RELEASE)308// while there are pending I/O messages for the specified stream ID.309f.await.map_err(Error::DoneNotTriggered)?;310};311}312Ok(())313}314315/// Start a pcm worker that receives descriptors containing PCM frames (audio data) from the tx/rx316/// queue, and forward them to CRAS. One pcm worker per stream.317///318/// This worker is started when VIRTIO_SND_R_PCM_PREPARE is called, and returned before319/// VIRTIO_SND_R_PCM_RELEASE is completed for the stream.320pub async fn start_pcm_worker(321ex: Executor,322dstream: DirectionalStream,323mut desc_receiver: mpsc::UnboundedReceiver<DescriptorChain>,324status_mutex: Rc<AsyncRwLock<WorkerStatus>>,325mut sender: mpsc::UnboundedSender<PcmResponse>,326period_dur: Duration,327card_index: usize,328muted: Rc<AtomicBool>,329release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,330) -> Result<(), Error> {331let res = pcm_worker_loop(332ex,333dstream,334&mut desc_receiver,335&status_mutex,336&mut sender,337period_dur,338card_index,339muted,340release_signal,341)342.await;343*status_mutex.lock().await = WorkerStatus::Quit;344if res.is_err() {345error!(346"[Card {}] pcm_worker error: {:#?}. Draining desc_receiver",347card_index,348res.as_ref().err()349);350// On error, guaranteed that desc_receiver has not been drained, so drain it here.351// Note that drain blocks until the stream is release.352drain_desc_receiver(&mut desc_receiver, &mut sender).await?;353}354res355}356357async fn pcm_worker_loop(358ex: Executor,359dstream: DirectionalStream,360desc_receiver: &mut mpsc::UnboundedReceiver<DescriptorChain>,361status_mutex: &Rc<AsyncRwLock<WorkerStatus>>,362sender: &mut mpsc::UnboundedSender<PcmResponse>,363period_dur: Duration,364card_index: usize,365muted: Rc<AtomicBool>,366release_signal: Rc<(AsyncRwLock<bool>, Condvar)>,367) -> Result<(), Error> {368let on_release = async {369await_reset_signal(Some(&*release_signal)).await;370// After receiving release signal, wait for up to 2 periods,371// giving it a chance to respond to the last buffer.372if let Err(e) = TimerAsync::sleep(&ex, period_dur * 2).await {373error!(374"[Card {}] Error on sleep after receiving reset signal: {}",375card_index, e376)377}378}379.fuse();380pin_mut!(on_release);381382match dstream {383DirectionalStream::Output(mut sys_direction_output) => loop {384#[cfg(windows)]385let (mut stream, mut buffer_writer_lock) = (386sys_direction_output387.async_playback_buffer_stream388.lock()389.await,390sys_direction_output.buffer_writer.lock().await,391);392#[cfg(windows)]393let buffer_writer = &mut buffer_writer_lock;394#[cfg(any(target_os = "android", target_os = "linux"))]395let (stream, buffer_writer) = (396&mut sys_direction_output.async_playback_buffer_stream,397&mut sys_direction_output.buffer_writer,398);399400let next_buf = stream.next_playback_buffer(&ex).fuse();401pin_mut!(next_buf);402403let dst_buf = select! {404_ = on_release => {405drain_desc_receiver(desc_receiver, sender).await?;406break Ok(());407},408buf = next_buf => buf.map_err(Error::FetchBuffer)?,409};410let worker_status = status_mutex.lock().await;411match *worker_status {412WorkerStatus::Quit => {413drain_desc_receiver(desc_receiver, sender).await?;414if let Err(e) = write_data(dst_buf, None, buffer_writer).await {415error!(416"[Card {}] Error on write_data after worker quit: {}",417card_index, e418)419}420break Ok(());421}422WorkerStatus::Pause => {423write_data(dst_buf, None, buffer_writer).await?;424}425WorkerStatus::Running => match desc_receiver.try_next() {426Err(e) => {427error!(428"[Card {}] Underrun. No new DescriptorChain while running: {}",429card_index, e430);431write_data(dst_buf, None, buffer_writer).await?;432}433Ok(None) => {434error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);435write_data(dst_buf, None, buffer_writer).await?;436return Err(Error::InvalidPCMWorkerState);437}438Ok(Some(mut desc_chain)) => {439let reader = if muted.load(Ordering::Relaxed) {440None441} else {442// stream_id was already read in handle_pcm_queue443Some(&mut desc_chain.reader)444};445let status = write_data(dst_buf, reader, buffer_writer).await.into();446sender447.send(PcmResponse {448desc_chain,449status,450done: None,451})452.await453.map_err(Error::MpscSend)?;454}455},456}457},458DirectionalStream::Input(period_bytes, mut buffer_reader) => loop {459let next_buf = buffer_reader.get_next_capture_period(&ex).fuse();460pin_mut!(next_buf);461462let src_buf = select! {463_ = on_release => {464drain_desc_receiver(desc_receiver, sender).await?;465break Ok(());466},467buf = next_buf => buf.map_err(Error::FetchBuffer)?,468};469470let worker_status = status_mutex.lock().await;471match *worker_status {472WorkerStatus::Quit => {473drain_desc_receiver(desc_receiver, sender).await?;474if let Err(e) = read_data(src_buf, None, period_bytes).await {475error!(476"[Card {}] Error on read_data after worker quit: {}",477card_index, e478)479}480break Ok(());481}482WorkerStatus::Pause => {483read_data(src_buf, None, period_bytes).await?;484}485WorkerStatus::Running => match desc_receiver.try_next() {486Err(e) => {487error!(488"[Card {}] Overrun. No new DescriptorChain while running: {}",489card_index, e490);491read_data(src_buf, None, period_bytes).await?;492}493Ok(None) => {494error!("[Card {}] Unreachable. status should be Quit when the channel is closed", card_index);495read_data(src_buf, None, period_bytes).await?;496return Err(Error::InvalidPCMWorkerState);497}498Ok(Some(mut desc_chain)) => {499let writer = if muted.load(Ordering::Relaxed) {500None501} else {502Some(&mut desc_chain.writer)503};504let status = read_data(src_buf, writer, period_bytes).await.into();505sender506.send(PcmResponse {507desc_chain,508status,509done: None,510})511.await512.map_err(Error::MpscSend)?;513}514},515}516},517}518}519520// Defer pcm message response to the pcm response worker521async fn defer_pcm_response_to_worker(522desc_chain: DescriptorChain,523status: virtio_snd_pcm_status,524response_sender: &mut mpsc::UnboundedSender<PcmResponse>,525) -> Result<(), Error> {526response_sender527.send(PcmResponse {528desc_chain,529status,530done: None,531})532.await533.map_err(Error::MpscSend)534}535536fn send_pcm_response(537mut desc_chain: DescriptorChain,538queue: &mut Queue,539status: virtio_snd_pcm_status,540) -> Result<(), Error> {541let writer = &mut desc_chain.writer;542543// For rx queue only. Fast forward the unused audio data buffer.544if writer.available_bytes() > std::mem::size_of::<virtio_snd_pcm_status>() {545writer546.consume_bytes(writer.available_bytes() - std::mem::size_of::<virtio_snd_pcm_status>());547}548writer.write_obj(status).map_err(Error::WriteResponse)?;549queue.add_used(desc_chain);550queue.trigger_interrupt();551Ok(())552}553554// Await until reset_signal has been released555async fn await_reset_signal(reset_signal_option: Option<&(AsyncRwLock<bool>, Condvar)>) {556match reset_signal_option {557Some((lock, cvar)) => {558let mut reset = lock.lock().await;559while !*reset {560reset = cvar.wait(reset).await;561}562}563None => futures::future::pending().await,564};565}566567pub async fn send_pcm_response_worker(568queue: Rc<AsyncRwLock<Queue>>,569recv: &mut mpsc::UnboundedReceiver<PcmResponse>,570reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,571) -> Result<(), Error> {572let on_reset = await_reset_signal(reset_signal).fuse();573pin_mut!(on_reset);574575loop {576let next_async = recv.next().fuse();577pin_mut!(next_async);578579let res = select! {580_ = on_reset => break,581res = next_async => res,582};583584if let Some(r) = res {585send_pcm_response(r.desc_chain, &mut *queue.lock().await, r.status)?;586587// Resume pcm_worker588if let Some(done) = r.done {589done.send(()).map_err(Error::OneshotSend)?;590}591} else {592debug!("PcmResponse channel is closed.");593break;594}595}596Ok(())597}598599/// Handle messages from the control tube. This one is not related to virtio spec.600pub async fn handle_ctrl_tube(601streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,602control_tube: &AsyncTube,603reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,604) -> Result<(), Error> {605let on_reset = await_reset_signal(reset_signal).fuse();606pin_mut!(on_reset);607608loop {609let next_async = control_tube.next().fuse();610pin_mut!(next_async);611612let cmd = select! {613_ = on_reset => break,614res = next_async => res,615};616617match cmd {618Ok(cmd) => {619let resp = match cmd {620SndControlCommand::MuteAll(muted) => {621let streams = streams.read_lock().await;622for stream in streams.iter() {623let stream_info = stream.lock().await;624stream_info.muted.store(muted, Ordering::Relaxed);625info!("Stream muted = {:?}", muted);626}627VmResponse::Ok628}629};630control_tube631.send(resp)632.await633.map_err(Error::ControlTubeError)?;634}635Err(e) => {636error!("Failed to read the command: {}", e);637return Err(Error::ControlTubeError(e));638}639}640}641642Ok(())643}644645/// Handle messages from the tx or the rx queue. One invocation is needed for646/// each queue.647pub async fn handle_pcm_queue(648streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,649mut response_sender: mpsc::UnboundedSender<PcmResponse>,650queue: Rc<AsyncRwLock<Queue>>,651queue_event: &EventAsync,652card_index: usize,653reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,654) -> Result<(), Error> {655let on_reset = await_reset_signal(reset_signal).fuse();656pin_mut!(on_reset);657658loop {659// Manual queue.next_async() to avoid holding the mutex660let next_async = async {661loop {662// Check if there are more descriptors available.663if let Some(chain) = queue.lock().await.pop() {664return Ok(chain);665}666queue_event.next_val().await?;667}668}669.fuse();670pin_mut!(next_async);671672let mut desc_chain = select! {673_ = on_reset => break,674res = next_async => res.map_err(Error::Async)?,675};676677let pcm_xfer: virtio_snd_pcm_xfer =678desc_chain.reader.read_obj().map_err(Error::ReadMessage)?;679let stream_id: usize = u32::from(pcm_xfer.stream_id) as usize;680681let streams = streams.read_lock().await;682let stream_info = match streams.get(stream_id) {683Some(stream_info) => stream_info.read_lock().await,684None => {685error!(686"[Card {}] stream_id ({}) >= num_streams ({})",687card_index,688stream_id,689streams.len()690);691defer_pcm_response_to_worker(692desc_chain,693virtio_snd_pcm_status::new(StatusCode::IoErr, 0),694&mut response_sender,695)696.await?;697continue;698}699};700701match stream_info.sender.as_ref() {702Some(mut s) => {703s.send(desc_chain).await.map_err(Error::MpscSend)?;704if *stream_info.status_mutex.lock().await == WorkerStatus::Quit {705// If sender channel is still intact but worker status is quit,706// the worker quitted unexpectedly. Return error to request a reset.707return Err(Error::PCMWorkerQuittedUnexpectedly);708}709}710None => {711if !stream_info.just_reset {712error!(713"[Card {}] stream {} is not ready. state: {}",714card_index,715stream_id,716get_virtio_snd_r_pcm_cmd_name(stream_info.state)717);718}719defer_pcm_response_to_worker(720desc_chain,721virtio_snd_pcm_status::new(StatusCode::IoErr, 0),722&mut response_sender,723)724.await?;725}726};727}728Ok(())729}730731/// Handle all the control messages from the ctrl queue.732pub async fn handle_ctrl_queue(733ex: &Executor,734streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,735snd_data: &SndData,736queue: Rc<AsyncRwLock<Queue>>,737queue_event: &mut EventAsync,738tx_send: mpsc::UnboundedSender<PcmResponse>,739rx_send: mpsc::UnboundedSender<PcmResponse>,740card_index: usize,741reset_signal: Option<&(AsyncRwLock<bool>, Condvar)>,742) -> Result<(), Error> {743let on_reset = await_reset_signal(reset_signal).fuse();744pin_mut!(on_reset);745746let mut queue = queue.lock().await;747loop {748let mut desc_chain = {749let next_async = queue.next_async(queue_event).fuse();750pin_mut!(next_async);751752select! {753_ = on_reset => break,754res = next_async => res.map_err(Error::Async)?,755}756};757758let reader = &mut desc_chain.reader;759let writer = &mut desc_chain.writer;760// Don't advance the reader761let code = reader762.peek_obj::<virtio_snd_hdr>()763.map_err(Error::ReadMessage)?764.code765.into();766767let handle_ctrl_msg = async {768match code {769VIRTIO_SND_R_JACK_INFO => {770let query_info: virtio_snd_query_info =771reader.read_obj().map_err(Error::ReadMessage)?;772let start_id: usize = u32::from(query_info.start_id) as usize;773let count: usize = u32::from(query_info.count) as usize;774if start_id + count > snd_data.jack_info.len() {775error!(776"[Card {}] start_id({}) + count({}) must be smaller than \777the number of jacks ({})",778card_index,779start_id,780count,781snd_data.jack_info.len()782);783return writer784.write_obj(VIRTIO_SND_S_BAD_MSG)785.map_err(Error::WriteResponse);786}787// The response consists of the virtio_snd_hdr structure (contains the request788// status code), followed by the device-writable information structures of the789// item. Each information structure begins with the following common header790writer791.write_obj(VIRTIO_SND_S_OK)792.map_err(Error::WriteResponse)?;793for i in start_id..(start_id + count) {794writer795.write_all(snd_data.jack_info[i].as_bytes())796.map_err(Error::WriteResponse)?;797}798Ok(())799}800VIRTIO_SND_R_PCM_INFO => {801let query_info: virtio_snd_query_info =802reader.read_obj().map_err(Error::ReadMessage)?;803let start_id: usize = u32::from(query_info.start_id) as usize;804let count: usize = u32::from(query_info.count) as usize;805if start_id + count > snd_data.pcm_info.len() {806error!(807"[Card {}] start_id({}) + count({}) must be smaller than \808the number of streams ({})",809card_index,810start_id,811count,812snd_data.pcm_info.len()813);814return writer815.write_obj(VIRTIO_SND_S_BAD_MSG)816.map_err(Error::WriteResponse);817}818// The response consists of the virtio_snd_hdr structure (contains the request819// status code), followed by the device-writable information structures of the820// item. Each information structure begins with the following common header821writer822.write_obj(VIRTIO_SND_S_OK)823.map_err(Error::WriteResponse)?;824for i in start_id..(start_id + count) {825writer826.write_all(snd_data.pcm_info[i].as_bytes())827.map_err(Error::WriteResponse)?;828}829Ok(())830}831VIRTIO_SND_R_CHMAP_INFO => {832let query_info: virtio_snd_query_info =833reader.read_obj().map_err(Error::ReadMessage)?;834let start_id: usize = u32::from(query_info.start_id) as usize;835let count: usize = u32::from(query_info.count) as usize;836if start_id + count > snd_data.chmap_info.len() {837error!(838"[Card {}] start_id({}) + count({}) must be smaller than \839the number of chmaps ({})",840card_index,841start_id,842count,843snd_data.chmap_info.len()844);845return writer846.write_obj(VIRTIO_SND_S_BAD_MSG)847.map_err(Error::WriteResponse);848}849// The response consists of the virtio_snd_hdr structure (contains the request850// status code), followed by the device-writable information structures of the851// item. Each information structure begins with the following common header852writer853.write_obj(VIRTIO_SND_S_OK)854.map_err(Error::WriteResponse)?;855for i in start_id..(start_id + count) {856writer857.write_all(snd_data.chmap_info[i].as_bytes())858.map_err(Error::WriteResponse)?;859}860Ok(())861}862VIRTIO_SND_R_JACK_REMAP => {863unreachable!("remap is unsupported");864}865VIRTIO_SND_R_PCM_SET_PARAMS => {866// Raise VIRTIO_SND_S_BAD_MSG or IO error?867let set_params: virtio_snd_pcm_set_params =868reader.read_obj().map_err(Error::ReadMessage)?;869let stream_id: usize = u32::from(set_params.hdr.stream_id) as usize;870let buffer_bytes: u32 = set_params.buffer_bytes.into();871let period_bytes: u32 = set_params.period_bytes.into();872873let dir = match snd_data.pcm_info.get(stream_id) {874Some(pcm_info) => {875if set_params.channels < pcm_info.channels_min876|| set_params.channels > pcm_info.channels_max877{878error!(879"[Card {}] Number of channels ({}) must be between {} and {}",880card_index,881set_params.channels,882pcm_info.channels_min,883pcm_info.channels_max884);885return writer886.write_obj(VIRTIO_SND_S_NOT_SUPP)887.map_err(Error::WriteResponse);888}889if (u64::from(pcm_info.formats) & (1 << set_params.format)) == 0 {890error!(891"[Card {}] PCM format {} is not supported.",892card_index, set_params.format893);894return writer895.write_obj(VIRTIO_SND_S_NOT_SUPP)896.map_err(Error::WriteResponse);897}898if (u64::from(pcm_info.rates) & (1 << set_params.rate)) == 0 {899error!(900"[Card {}] PCM frame rate {} is not supported.",901card_index, set_params.rate902);903return writer904.write_obj(VIRTIO_SND_S_NOT_SUPP)905.map_err(Error::WriteResponse);906}907908pcm_info.direction909}910None => {911error!(912"[Card {}] stream_id {} < streams {}",913card_index,914stream_id,915snd_data.pcm_info.len()916);917return writer918.write_obj(VIRTIO_SND_S_BAD_MSG)919.map_err(Error::WriteResponse);920}921};922923if set_params.features != 0 {924error!("[Card {}] No feature is supported", card_index);925return writer926.write_obj(VIRTIO_SND_S_NOT_SUPP)927.map_err(Error::WriteResponse);928}929930if buffer_bytes % period_bytes != 0 {931error!(932"[Card {}] buffer_bytes({}) must be dividable by period_bytes({})",933card_index, buffer_bytes, period_bytes934);935return writer936.write_obj(VIRTIO_SND_S_BAD_MSG)937.map_err(Error::WriteResponse);938}939940process_pcm_ctrl(941ex,942&tx_send,943&rx_send,944streams,945VirtioSndPcmCmd::with_set_params_and_direction(set_params, dir),946writer,947stream_id,948card_index,949)950.await951}952VIRTIO_SND_R_PCM_PREPARE953| VIRTIO_SND_R_PCM_START954| VIRTIO_SND_R_PCM_STOP955| VIRTIO_SND_R_PCM_RELEASE => {956let hdr: virtio_snd_pcm_hdr = reader.read_obj().map_err(Error::ReadMessage)?;957let stream_id: usize = u32::from(hdr.stream_id) as usize;958let cmd = match VirtioSndPcmCmd::try_from(code) {959Ok(cmd) => cmd,960Err(err) => {961error!(962"[Card {}] Error converting code to command: {}",963card_index, err964);965return writer966.write_obj(VIRTIO_SND_S_BAD_MSG)967.map_err(Error::WriteResponse);968}969};970process_pcm_ctrl(971ex, &tx_send, &rx_send, streams, cmd, writer, stream_id, card_index,972)973.await974.and(Ok(()))?;975Ok(())976}977c => {978error!("[Card {}] Unrecognized code: {}", card_index, c);979writer980.write_obj(VIRTIO_SND_S_BAD_MSG)981.map_err(Error::WriteResponse)982}983}984};985986handle_ctrl_msg.await?;987queue.add_used(desc_chain);988queue.trigger_interrupt();989}990Ok(())991}992993/// Send events to the audio driver.994pub async fn handle_event_queue(995mut queue: Queue,996mut queue_event: EventAsync,997) -> Result<(), Error> {998loop {999let desc_chain = queue1000.next_async(&mut queue_event)1001.await1002.map_err(Error::Async)?;10031004// TODO(woodychow): Poll and forward events from cras asynchronously (API to be added)1005queue.add_used(desc_chain);1006queue.trigger_interrupt();1007}1008}10091010#[cfg(test)]1011mod tests {1012use std::sync::Arc;10131014use audio_streams::NoopStreamSourceGenerator;1015use base::Tube;10161017use super::*;1018use crate::virtio::snd::common_backend::notify_reset_signal;10191020#[test]1021fn test_handle_ctrl_tube_reset_signal() {1022let ex = Executor::new().expect("Failed to create an executor");1023let result = ex.run_until(async {1024let streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>> = Default::default();1025let (t0, _t1) = Tube::pair().expect("Failed to create tube pairs");1026let t0 = AsyncTube::new(&ex, t0).expect("Failed to create async tube");1027let reset_signal = (AsyncRwLock::new(false), Condvar::new());10281029let handle_future = handle_ctrl_tube(&streams, &t0, Some(&reset_signal));1030let notify_future = notify_reset_signal(&reset_signal);1031let (result, _) = futures::join!(handle_future, notify_future);10321033assert!(1034result.is_ok(),1035"handle_ctrl_tube returns an error after reset signal"1036);1037});10381039assert!(result.is_ok(), "ex.run_until returns an error");1040}10411042fn new_stream() -> StreamInfo {1043let card_index = 0;1044StreamInfo::builder(1045Arc::new(Box::new(NoopStreamSourceGenerator::new())),1046card_index,1047)1048.build()1049}10501051#[test]1052fn test_handle_ctrl_tube_receive_mute_cmd() {1053let ex = Executor::new().expect("Failed to create an executor");1054let result = ex.run_until(async {1055let streams: Vec<AsyncRwLock<StreamInfo>> = vec![AsyncRwLock::new(new_stream())];1056let streams = Rc::new(AsyncRwLock::new(streams));10571058let (t0, t1) = Tube::pair().expect("Failed to create tube pairs");1059let t0 = AsyncTube::new(&ex, t0).expect("Failed to create an async tube");1060let t1 = AsyncTube::new(&ex, t1).expect("Failed to create an async tube");1061let reset_signal = (AsyncRwLock::new(false), Condvar::new());10621063let handle_future = handle_ctrl_tube(&streams, &t0, Some(&reset_signal));1064let tube_future = async {1065let _ = t1.send(&SndControlCommand::MuteAll(true)).await;1066let recv_result = t1.next::<VmResponse>().await;1067notify_reset_signal(&reset_signal).await;1068recv_result1069};1070let (handle_result, tube_result) = futures::join!(handle_future, tube_future);10711072assert!(1073handle_result.is_ok(),1074"handle_ctrl_tube returns an error after reset signal"1075);1076assert!(tube_result.is_ok(), "Failed to receive data from the tube");1077assert!(1078matches!(tube_result.unwrap(), VmResponse::Ok),1079"tube_result is not Ok",1080);10811082let streams = streams.read_lock().await;1083let stream = streams.first().unwrap().lock().await;1084assert!(stream.muted.load(Ordering::Relaxed), "Stream is not muted");1085});10861087assert!(result.is_ok(), "ex.run_until returns an error");1088}1089}109010911092