// Path: blob/main/devices/src/virtio/snd/common_backend/mod.rs
// 5394 views
// Copyright 2021 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34// virtio-sound spec: https://github.com/oasis-tcs/virtio-spec/blob/master/virtio-sound.tex56use std::collections::BTreeMap;7use std::io;8use std::rc::Rc;9use std::sync::Arc;1011use anyhow::anyhow;12use anyhow::Context;13use audio_streams::BoxError;14use base::debug;15use base::error;16use base::warn;17use base::AsRawDescriptor;18use base::Descriptor;19use base::Error as SysError;20use base::Event;21use base::RawDescriptor;22use base::Tube;23use base::WorkerThread;24use cros_async::block_on;25use cros_async::sync::Condvar;26use cros_async::sync::RwLock as AsyncRwLock;27use cros_async::AsyncError;28use cros_async::AsyncTube;29use cros_async::EventAsync;30use cros_async::Executor;31use futures::channel::mpsc;32use futures::channel::oneshot;33use futures::channel::oneshot::Canceled;34use futures::future::FusedFuture;35use futures::join;36use futures::pin_mut;37use futures::select;38use futures::FutureExt;39use serde::Deserialize;40use serde::Serialize;41use snapshot::AnySnapshot;42use thiserror::Error as ThisError;43use vm_memory::GuestMemory;44use zerocopy::IntoBytes;4546use crate::virtio::async_utils;47use crate::virtio::copy_config;48use crate::virtio::device_constants::snd::virtio_snd_config;49use crate::virtio::snd::common_backend::async_funcs::*;50use crate::virtio::snd::common_backend::stream_info::StreamInfo;51use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder;52use crate::virtio::snd::common_backend::stream_info::StreamInfoSnapshot;53use crate::virtio::snd::constants::*;54use crate::virtio::snd::file_backend::create_file_stream_source_generators;55use crate::virtio::snd::file_backend::Error as FileError;56use crate::virtio::snd::layout::*;57use crate::virtio::snd::null_backend::create_null_stream_source_generators;58use crate::virtio::snd::parameters::Parameters;59use 
crate::virtio::snd::parameters::StreamSourceBackend;60use crate::virtio::snd::sys::create_stream_source_generators as sys_create_stream_source_generators;61use crate::virtio::snd::sys::set_audio_thread_priority;62use crate::virtio::snd::sys::SysAsyncStreamObjects;63use crate::virtio::snd::sys::SysAudioStreamSourceGenerator;64use crate::virtio::snd::sys::SysDirectionOutput;65use crate::virtio::DescriptorChain;66use crate::virtio::DeviceType;67use crate::virtio::Interrupt;68use crate::virtio::Queue;69use crate::virtio::VirtioDevice;7071pub mod async_funcs;72pub mod stream_info;7374// control + event + tx + rx queue75pub const MAX_QUEUE_NUM: usize = 4;76pub const MAX_VRING_LEN: u16 = 1024;7778#[derive(ThisError, Debug)]79pub enum Error {80/// next_async failed.81#[error("Failed to read descriptor asynchronously: {0}")]82Async(AsyncError),83/// Creating stream failed.84#[error("Failed to create stream: {0}")]85CreateStream(BoxError),86/// Creating stream failed.87#[error("No stream source found.")]88EmptyStreamSource,89/// Creating kill event failed.90#[error("Failed to create kill event: {0}")]91CreateKillEvent(SysError),92/// Creating WaitContext failed.93#[error("Failed to create wait context: {0}")]94CreateWaitContext(SysError),95#[error("Failed to create file stream source generator")]96CreateFileStreamSourceGenerator(FileError),97/// Cloning kill event failed.98#[error("Failed to clone kill event: {0}")]99CloneKillEvent(SysError),100// Future error.101#[error("Unexpected error. 
Done was not triggered before dropped: {0}")]102DoneNotTriggered(Canceled),103/// Error reading message from queue.104#[error("Failed to read message: {0}")]105ReadMessage(io::Error),106/// Failed writing a response to a control message.107#[error("Failed to write message response: {0}")]108WriteResponse(io::Error),109// Mpsc read error.110#[error("Error in mpsc: {0}")]111MpscSend(futures::channel::mpsc::SendError),112// Oneshot send error.113#[error("Error in oneshot send")]114OneshotSend(()),115/// Failure in communicating with the host116#[error("Failed to send/receive to/from control tube")]117ControlTubeError(base::TubeError),118/// Stream not found.119#[error("stream id ({0}) < num_streams ({1})")]120StreamNotFound(usize, usize),121/// Fetch buffer error122#[error("Failed to get buffer from CRAS: {0}")]123FetchBuffer(BoxError),124/// Invalid buffer size125#[error("Invalid buffer size")]126InvalidBufferSize,127/// IoError128#[error("I/O failed: {0}")]129Io(io::Error),130/// Operation not supported.131#[error("Operation not supported")]132OperationNotSupported,133/// Writing to a buffer in the guest failed.134#[error("failed to write to buffer: {0}")]135WriteBuffer(io::Error),136// Invalid PCM worker state.137#[error("Invalid PCM worker state")]138InvalidPCMWorkerState,139// Invalid backend.140#[error("Backend is not implemented")]141InvalidBackend,142// Failed to generate StreamSource143#[error("Failed to generate stream source: {0}")]144GenerateStreamSource(BoxError),145// PCM worker unexpectedly quitted.146#[error("PCM worker quitted unexpectedly")]147PCMWorkerQuittedUnexpectedly,148}149150pub enum DirectionalStream {151Input(152usize, // `period_size` in `usize`153Box<dyn CaptureBufferReader>,154),155Output(SysDirectionOutput),156}157158#[derive(Copy, Clone, std::cmp::PartialEq, Eq)]159pub enum WorkerStatus {160Pause = 0,161Running = 1,162Quit = 2,163}164165// Stores constant data166#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]167pub struct 
SndData {168pub(crate) jack_info: Vec<virtio_snd_jack_info>,169pub(crate) pcm_info: Vec<virtio_snd_pcm_info>,170pub(crate) chmap_info: Vec<virtio_snd_chmap_info>,171}172173impl SndData {174pub fn pcm_info_len(&self) -> usize {175self.pcm_info.len()176}177178pub fn pcm_info_iter(&self) -> std::slice::Iter<'_, virtio_snd_pcm_info> {179self.pcm_info.iter()180}181}182183const SUPPORTED_FORMATS: u64 = 1 << VIRTIO_SND_PCM_FMT_U8184| 1 << VIRTIO_SND_PCM_FMT_S16185| 1 << VIRTIO_SND_PCM_FMT_S24186| 1 << VIRTIO_SND_PCM_FMT_S32;187const SUPPORTED_FRAME_RATES: u64 = 1 << VIRTIO_SND_PCM_RATE_8000188| 1 << VIRTIO_SND_PCM_RATE_11025189| 1 << VIRTIO_SND_PCM_RATE_16000190| 1 << VIRTIO_SND_PCM_RATE_22050191| 1 << VIRTIO_SND_PCM_RATE_32000192| 1 << VIRTIO_SND_PCM_RATE_44100193| 1 << VIRTIO_SND_PCM_RATE_48000;194195// Response from pcm_worker to pcm_queue196pub struct PcmResponse {197pub(crate) desc_chain: DescriptorChain,198pub(crate) status: virtio_snd_pcm_status, // response to the pcm message199pub(crate) done: Option<oneshot::Sender<()>>, // when pcm response is written to the queue200}201202pub struct VirtioSnd {203control_tube: Option<Tube>,204cfg: virtio_snd_config,205snd_data: SndData,206stream_info_builders: Vec<StreamInfoBuilder>,207avail_features: u64,208acked_features: u64,209queue_sizes: Box<[u16]>,210worker_thread: Option<WorkerThread<WorkerReturn>>,211keep_rds: Vec<Descriptor>,212streams_state: Option<Vec<StreamInfoSnapshot>>,213card_index: usize,214}215216#[derive(Serialize, Deserialize)]217struct VirtioSndSnapshot {218avail_features: u64,219acked_features: u64,220queue_sizes: Vec<u16>,221streams_state: Option<Vec<StreamInfoSnapshot>>,222snd_data: SndData,223}224225impl VirtioSnd {226pub fn new(227base_features: u64,228params: Parameters,229control_tube: Tube,230) -> Result<VirtioSnd, Error> {231let params = resize_parameters_pcm_device_config(params);232let cfg = hardcoded_virtio_snd_config(¶ms);233let snd_data = hardcoded_snd_data(¶ms);234let avail_features = 
base_features;235let mut keep_rds: Vec<RawDescriptor> = Vec::new();236keep_rds.push(control_tube.as_raw_descriptor());237238let stream_info_builders =239create_stream_info_builders(¶ms, &snd_data, &mut keep_rds, params.card_index)?;240241Ok(VirtioSnd {242control_tube: Some(control_tube),243cfg,244snd_data,245stream_info_builders,246avail_features,247acked_features: 0,248queue_sizes: vec![MAX_VRING_LEN; MAX_QUEUE_NUM].into_boxed_slice(),249worker_thread: None,250keep_rds: keep_rds.iter().map(|rd| Descriptor(*rd)).collect(),251streams_state: None,252card_index: params.card_index,253})254}255}256257fn create_stream_source_generators(258params: &Parameters,259snd_data: &SndData,260keep_rds: &mut Vec<RawDescriptor>,261) -> Result<Vec<SysAudioStreamSourceGenerator>, Error> {262let generators = match params.backend {263StreamSourceBackend::NULL => create_null_stream_source_generators(snd_data),264StreamSourceBackend::FILE => {265create_file_stream_source_generators(params, snd_data, keep_rds)266.map_err(Error::CreateFileStreamSourceGenerator)?267}268StreamSourceBackend::Sys(backend) => {269sys_create_stream_source_generators(backend, params, snd_data)270}271};272Ok(generators)273}274275/// Creates [`StreamInfoBuilder`]s by calling [`create_stream_source_generators()`] then zip276/// them with [`crate::virtio::snd::parameters::PCMDeviceParameters`] from the params to set277/// the parameters on each [`StreamInfoBuilder`] (e.g. 
effects).278pub(crate) fn create_stream_info_builders(279params: &Parameters,280snd_data: &SndData,281keep_rds: &mut Vec<RawDescriptor>,282card_index: usize,283) -> Result<Vec<StreamInfoBuilder>, Error> {284Ok(create_stream_source_generators(params, snd_data, keep_rds)?285.into_iter()286.map(Arc::new)287.zip(snd_data.pcm_info_iter())288.map(|(generator, pcm_info)| {289let device_params = params.get_device_params(pcm_info).unwrap_or_default();290StreamInfo::builder(generator, card_index)291.effects(device_params.effects.unwrap_or_default())292})293.collect())294}295296// To be used with hardcoded_snd_data297pub fn hardcoded_virtio_snd_config(params: &Parameters) -> virtio_snd_config {298virtio_snd_config {299jacks: 0.into(),300streams: params.get_total_streams().into(),301chmaps: (params.num_output_devices * 3 + params.num_input_devices).into(),302}303}304305// To be used with hardcoded_virtio_snd_config306pub fn hardcoded_snd_data(params: &Parameters) -> SndData {307let jack_info: Vec<virtio_snd_jack_info> = Vec::new();308let mut pcm_info: Vec<virtio_snd_pcm_info> = Vec::new();309let mut chmap_info: Vec<virtio_snd_chmap_info> = Vec::new();310311for dev in 0..params.num_output_devices {312for _ in 0..params.num_output_streams {313pcm_info.push(virtio_snd_pcm_info {314hdr: virtio_snd_info {315hda_fn_nid: dev.into(),316},317features: 0.into(), /* 1 << VIRTIO_SND_PCM_F_XXX */318formats: SUPPORTED_FORMATS.into(),319rates: SUPPORTED_FRAME_RATES.into(),320direction: VIRTIO_SND_D_OUTPUT,321channels_min: 1,322channels_max: 6,323padding: [0; 5],324});325}326}327for dev in 0..params.num_input_devices {328for _ in 0..params.num_input_streams {329pcm_info.push(virtio_snd_pcm_info {330hdr: virtio_snd_info {331hda_fn_nid: dev.into(),332},333features: 0.into(), /* 1 << VIRTIO_SND_PCM_F_XXX */334formats: SUPPORTED_FORMATS.into(),335rates: SUPPORTED_FRAME_RATES.into(),336direction: VIRTIO_SND_D_INPUT,337channels_min: 1,338channels_max: 2,339padding: [0; 5],340});341}342}343// Use 
stereo channel map.344let mut positions = [VIRTIO_SND_CHMAP_NONE; VIRTIO_SND_CHMAP_MAX_SIZE];345positions[0] = VIRTIO_SND_CHMAP_FL;346positions[1] = VIRTIO_SND_CHMAP_FR;347for dev in 0..params.num_output_devices {348chmap_info.push(virtio_snd_chmap_info {349hdr: virtio_snd_info {350hda_fn_nid: dev.into(),351},352direction: VIRTIO_SND_D_OUTPUT,353channels: 2,354positions,355});356}357for dev in 0..params.num_input_devices {358chmap_info.push(virtio_snd_chmap_info {359hdr: virtio_snd_info {360hda_fn_nid: dev.into(),361},362direction: VIRTIO_SND_D_INPUT,363channels: 2,364positions,365});366}367positions[2] = VIRTIO_SND_CHMAP_RL;368positions[3] = VIRTIO_SND_CHMAP_RR;369for dev in 0..params.num_output_devices {370chmap_info.push(virtio_snd_chmap_info {371hdr: virtio_snd_info {372hda_fn_nid: dev.into(),373},374direction: VIRTIO_SND_D_OUTPUT,375channels: 4,376positions,377});378}379positions[2] = VIRTIO_SND_CHMAP_FC;380positions[3] = VIRTIO_SND_CHMAP_LFE;381positions[4] = VIRTIO_SND_CHMAP_RL;382positions[5] = VIRTIO_SND_CHMAP_RR;383for dev in 0..params.num_output_devices {384chmap_info.push(virtio_snd_chmap_info {385hdr: virtio_snd_info {386hda_fn_nid: dev.into(),387},388direction: VIRTIO_SND_D_OUTPUT,389channels: 6,390positions,391});392}393394SndData {395jack_info,396pcm_info,397chmap_info,398}399}400401fn resize_parameters_pcm_device_config(mut params: Parameters) -> Parameters {402if params.output_device_config.len() > params.num_output_devices as usize {403warn!("Truncating output device config due to length > number of output devices");404}405params406.output_device_config407.resize_with(params.num_output_devices as usize, Default::default);408409if params.input_device_config.len() > params.num_input_devices as usize {410warn!("Truncating input device config due to length > number of input devices");411}412params413.input_device_config414.resize_with(params.num_input_devices as usize, Default::default);415416params417}418419impl VirtioDevice for VirtioSnd {420fn 
keep_rds(&self) -> Vec<RawDescriptor> {421self.keep_rds422.iter()423.map(|descr| descr.as_raw_descriptor())424.collect()425}426427fn device_type(&self) -> DeviceType {428DeviceType::Sound429}430431fn queue_max_sizes(&self) -> &[u16] {432&self.queue_sizes433}434435fn features(&self) -> u64 {436self.avail_features437}438439fn ack_features(&mut self, mut v: u64) {440// Check if the guest is ACK'ing a feature that we didn't claim to have.441let unrequested_features = v & !self.avail_features;442if unrequested_features != 0 {443warn!("virtio_fs got unknown feature ack: {:x}", v);444445// Don't count these features as acked.446v &= !unrequested_features;447}448self.acked_features |= v;449}450451fn read_config(&self, offset: u64, data: &mut [u8]) {452copy_config(data, 0, self.cfg.as_bytes(), offset)453}454455fn activate(456&mut self,457_guest_mem: GuestMemory,458_interrupt: Interrupt,459queues: BTreeMap<usize, Queue>,460) -> anyhow::Result<()> {461if queues.len() != self.queue_sizes.len() {462return Err(anyhow!(463"snd: expected {} queues, got {}",464self.queue_sizes.len(),465queues.len()466));467}468469let snd_data = self.snd_data.clone();470let stream_info_builders = self.stream_info_builders.to_vec();471let streams_state = self.streams_state.take();472let card_index = self.card_index;473let control_tube = self.control_tube.take().unwrap();474self.worker_thread = Some(WorkerThread::start("v_snd_common", move |kill_evt| {475let _thread_priority_handle = set_audio_thread_priority();476if let Err(e) = _thread_priority_handle {477warn!("Failed to set audio thread to real time: {}", e);478};479run_worker(480queues,481snd_data,482kill_evt,483stream_info_builders,484streams_state,485card_index,486control_tube,487)488}));489490Ok(())491}492493fn reset(&mut self) -> anyhow::Result<()> {494if let Some(worker_thread) = self.worker_thread.take() {495let worker = worker_thread.stop();496self.control_tube = Some(worker.control_tube);497}498499Ok(())500}501502fn virtio_sleep(&mut 
self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {503if let Some(worker_thread) = self.worker_thread.take() {504let worker = worker_thread.stop();505self.control_tube = Some(worker.control_tube);506self.snd_data = worker.snd_data;507self.streams_state = Some(worker.streams_state);508return Ok(Some(BTreeMap::from_iter(509worker.queues.into_iter().enumerate(),510)));511}512Ok(None)513}514515fn virtio_wake(516&mut self,517device_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,518) -> anyhow::Result<()> {519match device_state {520None => Ok(()),521Some((mem, interrupt, queues)) => {522// TODO: activate is just what we want at the moment, but we should probably move523// it into a "start workers" function to make it obvious that it isn't strictly524// used for activate events.525self.activate(mem, interrupt, queues)?;526Ok(())527}528}529}530531fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {532let streams_state = if let Some(states) = &self.streams_state {533let mut state_vec = Vec::new();534for state in states {535state_vec.push(state.clone());536}537Some(state_vec)538} else {539None540};541AnySnapshot::to_any(VirtioSndSnapshot {542avail_features: self.avail_features,543acked_features: self.acked_features,544queue_sizes: self.queue_sizes.to_vec(),545streams_state,546snd_data: self.snd_data.clone(),547})548.context("failed to Serialize Sound device")549}550551fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {552let mut deser: VirtioSndSnapshot =553AnySnapshot::from_any(data).context("failed to Deserialize Sound device")?;554anyhow::ensure!(555deser.avail_features == self.avail_features,556"avail features doesn't match on restore: expected: {}, got: {}",557deser.avail_features,558self.avail_features559);560anyhow::ensure!(561deser.queue_sizes == self.queue_sizes.to_vec(),562"queue sizes doesn't match on restore: expected: {:?}, got: {:?}",563deser.queue_sizes,564self.queue_sizes.to_vec()565);566self.acked_features 
= deser.acked_features;567anyhow::ensure!(568deser.snd_data == self.snd_data,569"snd data doesn't match on restore: expected: {:?}, got: {:?}",570deser.snd_data,571self.snd_data572);573self.acked_features = deser.acked_features;574self.streams_state = deser.streams_state.take();575Ok(())576}577}578579#[derive(PartialEq)]580enum LoopState {581Continue,582Break,583}584585fn run_worker(586queues: BTreeMap<usize, Queue>,587snd_data: SndData,588kill_evt: Event,589stream_info_builders: Vec<StreamInfoBuilder>,590streams_state: Option<Vec<StreamInfoSnapshot>>,591card_index: usize,592control_tube: Tube,593) -> WorkerReturn {594let ex = Executor::new().expect("Failed to create an executor");595let control_tube = AsyncTube::new(&ex, control_tube).expect("failed to create async snd tube");596597if snd_data.pcm_info_len() != stream_info_builders.len() {598error!(599"snd: expected {} streams, got {}",600snd_data.pcm_info_len(),601stream_info_builders.len(),602);603}604let streams: Vec<AsyncRwLock<StreamInfo>> = stream_info_builders605.into_iter()606.map(StreamInfoBuilder::build)607.map(AsyncRwLock::new)608.collect();609610let (tx_send, mut tx_recv) = mpsc::unbounded();611let (rx_send, mut rx_recv) = mpsc::unbounded();612let tx_send_clone = tx_send.clone();613let rx_send_clone = rx_send.clone();614let restore_task = ex.spawn_local(async move {615if let Some(states) = &streams_state {616let ex = Executor::new().expect("Failed to create an executor");617for (stream, state) in streams.iter().zip(states.iter()) {618stream.lock().await.restore(state);619if state.state == VIRTIO_SND_R_PCM_START || state.state == VIRTIO_SND_R_PCM_PREPARE620{621stream622.lock()623.await624.prepare(&ex, &tx_send_clone, &rx_send_clone)625.await626.expect("failed to prepare PCM");627}628if state.state == VIRTIO_SND_R_PCM_START {629stream630.lock()631.await632.start()633.await634.expect("failed to start PCM");635}636}637}638streams639});640let streams = ex641.run_until(restore_task)642.expect("failed to 
restore streams");643let streams = Rc::new(AsyncRwLock::new(streams));644645let mut queues: Vec<(Queue, EventAsync)> = queues646.into_values()647.map(|q| {648let e = q.event().try_clone().expect("Failed to clone queue event");649(650q,651EventAsync::new(e, &ex).expect("Failed to create async event for queue"),652)653})654.collect();655656let (ctrl_queue, mut ctrl_queue_evt) = queues.remove(0);657let ctrl_queue = Rc::new(AsyncRwLock::new(ctrl_queue));658let (_event_queue, _event_queue_evt) = queues.remove(0);659let (tx_queue, tx_queue_evt) = queues.remove(0);660let (rx_queue, rx_queue_evt) = queues.remove(0);661662let tx_queue = Rc::new(AsyncRwLock::new(tx_queue));663let rx_queue = Rc::new(AsyncRwLock::new(rx_queue));664665// Exit if the kill event is triggered.666let f_kill = async_utils::await_and_exit(&ex, kill_evt).fuse();667668pin_mut!(f_kill);669670loop {671if run_worker_once(672&ex,673&streams,674&snd_data,675&mut f_kill,676ctrl_queue.clone(),677&mut ctrl_queue_evt,678tx_queue.clone(),679&tx_queue_evt,680tx_send.clone(),681&mut tx_recv,682rx_queue.clone(),683&rx_queue_evt,684rx_send.clone(),685&mut rx_recv,686card_index,687&control_tube,688) == LoopState::Break689{690break;691}692693if let Err(e) = reset_streams(694&ex,695&streams,696&tx_queue,697&mut tx_recv,698&rx_queue,699&mut rx_recv,700) {701error!("Error reset streams: {}", e);702break;703}704}705let streams_state_task = ex.spawn_local(async move {706let mut v = Vec::new();707for stream in streams.read_lock().await.iter() {708v.push(stream.read_lock().await.snapshot());709}710v711});712let streams_state = ex713.run_until(streams_state_task)714.expect("failed to save streams state");715let ctrl_queue = match Rc::try_unwrap(ctrl_queue) {716Ok(q) => q.into_inner(),717Err(_) => panic!("Too many refs to ctrl_queue"),718};719let tx_queue = match Rc::try_unwrap(tx_queue) {720Ok(q) => q.into_inner(),721Err(_) => panic!("Too many refs to tx_queue"),722};723let rx_queue = match Rc::try_unwrap(rx_queue) {724Ok(q) 
=> q.into_inner(),725Err(_) => panic!("Too many refs to rx_queue"),726};727let queues = vec![ctrl_queue, _event_queue, tx_queue, rx_queue];728729WorkerReturn {730control_tube: control_tube.into(),731queues,732snd_data,733streams_state,734}735}736737struct WorkerReturn {738control_tube: Tube,739queues: Vec<Queue>,740snd_data: SndData,741streams_state: Vec<StreamInfoSnapshot>,742}743744async fn notify_reset_signal(reset_signal: &(AsyncRwLock<bool>, Condvar)) {745let (lock, cvar) = reset_signal;746*lock.lock().await = true;747cvar.notify_all();748}749750/// Runs all workers once and exit if any worker exit.751///752/// Returns [`LoopState::Break`] if the worker `f_kill` exits, or something went753/// wrong on shutdown process. The caller should not run the worker again and should exit the main754/// loop.755///756/// If this function returns [`LoopState::Continue`], the caller can continue the main loop by757/// resetting the streams and run the worker again.758fn run_worker_once(759ex: &Executor,760streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,761snd_data: &SndData,762mut f_kill: &mut (impl FusedFuture<Output = anyhow::Result<()>> + Unpin),763ctrl_queue: Rc<AsyncRwLock<Queue>>,764ctrl_queue_evt: &mut EventAsync,765tx_queue: Rc<AsyncRwLock<Queue>>,766tx_queue_evt: &EventAsync,767tx_send: mpsc::UnboundedSender<PcmResponse>,768tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,769rx_queue: Rc<AsyncRwLock<Queue>>,770rx_queue_evt: &EventAsync,771rx_send: mpsc::UnboundedSender<PcmResponse>,772rx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,773card_index: usize,774control_tube: &AsyncTube,775) -> LoopState {776let tx_send2 = tx_send.clone();777let rx_send2 = rx_send.clone();778779let reset_signal = (AsyncRwLock::new(false), Condvar::new());780781let f_host_ctrl = handle_ctrl_tube(streams, control_tube, Some(&reset_signal)).fuse();782783let f_ctrl = 
handle_ctrl_queue(784ex,785streams,786snd_data,787ctrl_queue,788ctrl_queue_evt,789tx_send,790rx_send,791card_index,792Some(&reset_signal),793)794.fuse();795796// TODO(woodychow): Enable this when libcras sends jack connect/disconnect evts797// let f_event = handle_event_queue(798// snd_state,799// event_queue,800// event_queue_evt,801// );802let f_tx = handle_pcm_queue(803streams,804tx_send2,805tx_queue.clone(),806tx_queue_evt,807card_index,808Some(&reset_signal),809)810.fuse();811let f_tx_response = send_pcm_response_worker(tx_queue, tx_recv, Some(&reset_signal)).fuse();812let f_rx = handle_pcm_queue(813streams,814rx_send2,815rx_queue.clone(),816rx_queue_evt,817card_index,818Some(&reset_signal),819)820.fuse();821let f_rx_response = send_pcm_response_worker(rx_queue, rx_recv, Some(&reset_signal)).fuse();822823pin_mut!(824f_host_ctrl,825f_ctrl,826f_tx,827f_tx_response,828f_rx,829f_rx_response830);831832let done = async {833select! {834res = f_host_ctrl => (res.context("error in handling host control command"), LoopState::Continue),835res = f_ctrl => (res.context("error in handling ctrl queue"), LoopState::Continue),836res = f_tx => (res.context("error in handling tx queue"), LoopState::Continue),837res = f_tx_response => (res.context("error in handling tx response"), LoopState::Continue),838res = f_rx => (res.context("error in handling rx queue"), LoopState::Continue),839res = f_rx_response => (res.context("error in handling rx response"), LoopState::Continue),840841// For following workers, do not continue the loop842res = f_kill => (res.context("error in await_and_exit"), LoopState::Break),843}844};845846match ex.run_until(done) {847Ok((res, loop_state)) => {848if let Err(e) = res {849error!("Error in worker: {:#}", e);850}851if loop_state == LoopState::Break {852return LoopState::Break;853}854}855Err(e) => {856error!("Error happened in executor: {}", e);857}858}859860warn!("Shutting down all workers for reset 
procedure");861block_on(notify_reset_signal(&reset_signal));862863let shutdown = async {864loop {865let (res, worker_name) = select!(866res = f_ctrl => (res, "f_ctrl"),867res = f_tx => (res, "f_tx"),868res = f_tx_response => (res, "f_tx_response"),869res = f_rx => (res, "f_rx"),870res = f_rx_response => (res, "f_rx_response"),871complete => break,872);873match res {874Ok(_) => debug!("Worker {} stopped", worker_name),875Err(e) => error!("Worker {} stopped with error {}", worker_name, e),876};877}878};879880if let Err(e) = ex.run_until(shutdown) {881error!("Error happened in executor while shutdown: {}", e);882return LoopState::Break;883}884885LoopState::Continue886}887888fn reset_streams(889ex: &Executor,890streams: &Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,891tx_queue: &Rc<AsyncRwLock<Queue>>,892tx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,893rx_queue: &Rc<AsyncRwLock<Queue>>,894rx_recv: &mut mpsc::UnboundedReceiver<PcmResponse>,895) -> Result<(), AsyncError> {896let reset_signal = (AsyncRwLock::new(false), Condvar::new());897898let do_reset = async {899let streams = streams.read_lock().await;900for stream_info in &*streams {901let mut stream_info = stream_info.lock().await;902if stream_info.state == VIRTIO_SND_R_PCM_START {903if let Err(e) = stream_info.stop().await {904error!("Error on stop while resetting stream: {}", e);905}906}907if stream_info.state == VIRTIO_SND_R_PCM_STOP908|| stream_info.state == VIRTIO_SND_R_PCM_PREPARE909{910if let Err(e) = stream_info.release().await {911error!("Error on release while resetting stream: {}", e);912}913}914stream_info.just_reset = true;915}916917notify_reset_signal(&reset_signal).await;918};919920// Run these in a loop to ensure that they will survive until do_reset is finished921let f_tx_response = async {922while send_pcm_response_worker(tx_queue.clone(), tx_recv, Some(&reset_signal))923.await924.is_err()925{}926};927928let f_rx_response = async {929while send_pcm_response_worker(rx_queue.clone(), rx_recv, 
Some(&reset_signal))930.await931.is_err()932{}933};934935let reset = async {936join!(f_tx_response, f_rx_response, do_reset);937};938939ex.run_until(reset)940}941942#[cfg(test)]943#[allow(clippy::needless_update)]944mod tests {945use audio_streams::StreamEffect;946947use super::*;948use crate::virtio::snd::parameters::PCMDeviceParameters;949950#[test]951fn test_virtio_snd_new() {952let params = Parameters {953num_output_devices: 3,954num_input_devices: 2,955num_output_streams: 3,956num_input_streams: 2,957output_device_config: vec![PCMDeviceParameters {958effects: Some(vec![StreamEffect::EchoCancellation]),959..PCMDeviceParameters::default()960}],961input_device_config: vec![PCMDeviceParameters {962effects: Some(vec![StreamEffect::EchoCancellation]),963..PCMDeviceParameters::default()964}],965..Default::default()966};967968let (t0, _t1) = Tube::pair().expect("failed to create tube");969let res = VirtioSnd::new(123, params, t0).unwrap();970971// Default values972assert_eq!(res.snd_data.jack_info.len(), 0);973assert_eq!(res.acked_features, 0);974assert_eq!(res.worker_thread.is_none(), true);975976assert_eq!(res.avail_features, 123); // avail_features must be equal to the input977assert_eq!(res.cfg.jacks.to_native(), 0);978assert_eq!(res.cfg.streams.to_native(), 13); // (Output = 3*3) + (Input = 2*2)979assert_eq!(res.cfg.chmaps.to_native(), 11); // (Output = 3*3) + (Input = 2*1)980981// Check snd_data.pcm_info982assert_eq!(res.snd_data.pcm_info.len(), 13);983// Check hda_fn_nid (PCM Device number)984let expected_hda_fn_nid = [0, 0, 0, 1, 1, 1, 2, 2, 2, 0, 0, 1, 1];985for (i, pcm_info) in res.snd_data.pcm_info.iter().enumerate() {986assert_eq!(987pcm_info.hdr.hda_fn_nid.to_native(),988expected_hda_fn_nid[i],989"pcm_info index {i} incorrect hda_fn_nid"990);991}992// First 9 devices must be OUTPUT993for i in 0..9 {994assert_eq!(995res.snd_data.pcm_info[i].direction, VIRTIO_SND_D_OUTPUT,996"pcm_info index {i} incorrect direction"997);998}999// Next 4 devices must be 
INPUT1000for i in 9..13 {1001assert_eq!(1002res.snd_data.pcm_info[i].direction, VIRTIO_SND_D_INPUT,1003"pcm_info index {i} incorrect direction"1004);1005}10061007// Check snd_data.chmap_info1008assert_eq!(res.snd_data.chmap_info.len(), 11);1009let expected_hda_fn_nid = [0, 1, 2, 0, 1, 0, 1, 2, 0, 1, 2];1010// Check hda_fn_nid (PCM Device number)1011for (i, chmap_info) in res.snd_data.chmap_info.iter().enumerate() {1012assert_eq!(1013chmap_info.hdr.hda_fn_nid.to_native(),1014expected_hda_fn_nid[i],1015"chmap_info index {i} incorrect hda_fn_nid"1016);1017}1018}10191020#[test]1021fn test_resize_parameters_pcm_device_config_truncate() {1022// If pcm_device_config is larger than number of devices, it will be truncated1023let params = Parameters {1024num_output_devices: 1,1025num_input_devices: 1,1026output_device_config: vec![PCMDeviceParameters::default(); 3],1027input_device_config: vec![PCMDeviceParameters::default(); 3],1028..Parameters::default()1029};1030let params = resize_parameters_pcm_device_config(params);1031assert_eq!(params.output_device_config.len(), 1);1032assert_eq!(params.input_device_config.len(), 1);1033}10341035#[test]1036fn test_resize_parameters_pcm_device_config_extend() {1037let params = Parameters {1038num_output_devices: 3,1039num_input_devices: 2,1040num_output_streams: 3,1041num_input_streams: 2,1042output_device_config: vec![PCMDeviceParameters {1043effects: Some(vec![StreamEffect::EchoCancellation]),1044..PCMDeviceParameters::default()1045}],1046input_device_config: vec![PCMDeviceParameters {1047effects: Some(vec![StreamEffect::EchoCancellation]),1048..PCMDeviceParameters::default()1049}],1050..Default::default()1051};10521053let params = resize_parameters_pcm_device_config(params);10541055// Check output_device_config correctly extended1056assert_eq!(1057params.output_device_config,1058vec![1059PCMDeviceParameters {1060// Keep from the parameters1061effects: 
Some(vec![StreamEffect::EchoCancellation]),1062..PCMDeviceParameters::default()1063},1064PCMDeviceParameters::default(), // Extended with default1065PCMDeviceParameters::default(), // Extended with default1066]1067);10681069// Check input_device_config correctly extended1070assert_eq!(1071params.input_device_config,1072vec![1073PCMDeviceParameters {1074// Keep from the parameters1075effects: Some(vec![StreamEffect::EchoCancellation]),1076..PCMDeviceParameters::default()1077},1078PCMDeviceParameters::default(), // Extended with default1079]1080);1081}1082}108310841085