Path: blob/main/devices/src/virtio/vhost_user_backend/snd.rs
5394 views
// Copyright 2021 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34pub mod sys;56use std::borrow::Borrow;7use std::rc::Rc;89use anyhow::anyhow;10use anyhow::bail;11use anyhow::Context;12use base::error;13use base::warn;14use cros_async::sync::RwLock as AsyncRwLock;15use cros_async::EventAsync;16use cros_async::Executor;17use futures::channel::mpsc;18use futures::FutureExt;19use hypervisor::ProtectionType;20use serde::Deserialize;21use serde::Serialize;22use snapshot::AnySnapshot;23pub use sys::run_snd_device;24pub use sys::Options;25use vm_memory::GuestMemory;26use vmm_vhost::message::VhostUserProtocolFeatures;27use vmm_vhost::VHOST_USER_F_PROTOCOL_FEATURES;28use zerocopy::IntoBytes;2930use crate::virtio;31use crate::virtio::copy_config;32use crate::virtio::device_constants::snd::virtio_snd_config;33use crate::virtio::snd::common_backend::async_funcs::handle_ctrl_queue;34use crate::virtio::snd::common_backend::async_funcs::handle_pcm_queue;35use crate::virtio::snd::common_backend::async_funcs::send_pcm_response_worker;36use crate::virtio::snd::common_backend::create_stream_info_builders;37use crate::virtio::snd::common_backend::hardcoded_snd_data;38use crate::virtio::snd::common_backend::hardcoded_virtio_snd_config;39use crate::virtio::snd::common_backend::stream_info::StreamInfo;40use crate::virtio::snd::common_backend::stream_info::StreamInfoBuilder;41use crate::virtio::snd::common_backend::stream_info::StreamInfoSnapshot;42use crate::virtio::snd::common_backend::Error;43use crate::virtio::snd::common_backend::PcmResponse;44use crate::virtio::snd::common_backend::SndData;45use crate::virtio::snd::common_backend::MAX_QUEUE_NUM;46use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_PREPARE;47use crate::virtio::snd::constants::VIRTIO_SND_R_PCM_START;48use crate::virtio::snd::parameters::Parameters;49use crate::virtio::vhost_user_backend::handler::DeviceRequestHandler;50use crate::virtio::vhost_user_backend::handler::Error as DeviceError;51use crate::virtio::vhost_user_backend::handler::VhostUserDevice;52use crate::virtio::vhost_user_backend::handler::WorkerState;53use crate::virtio::vhost_user_backend::VhostUserDeviceBuilder;54use crate::virtio::Queue;5556// Async workers:57// 0 - ctrl58// 1 - event59// 2 - tx60// 3 - rx61const PCM_RESPONSE_WORKER_IDX_OFFSET: usize = 2;62struct SndBackend {63ex: Executor,64cfg: virtio_snd_config,65avail_features: u64,66workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; MAX_QUEUE_NUM],67// tx and rx68response_workers: [Option<WorkerState<Rc<AsyncRwLock<Queue>>, Result<(), Error>>>; 2],69snd_data: Rc<SndData>,70streams: Rc<AsyncRwLock<Vec<AsyncRwLock<StreamInfo>>>>,71tx_send: mpsc::UnboundedSender<PcmResponse>,72rx_send: mpsc::UnboundedSender<PcmResponse>,73tx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,74rx_recv: Option<mpsc::UnboundedReceiver<PcmResponse>>,75// Appended to logs for when there are mutliple audio devices.76card_index: usize,77unmap_guest_memory_on_fork: bool,78}7980#[derive(Serialize, Deserialize)]81struct SndBackendSnapshot {82avail_features: u64,83stream_infos: Option<Vec<StreamInfoSnapshot>>,84snd_data: SndData,85}8687impl SndBackend {88pub fn new(89ex: &Executor,90params: Parameters,91#[cfg(windows)] audio_client_guid: Option<String>,92card_index: usize,93) -> anyhow::Result<Self> {94let cfg = hardcoded_virtio_snd_config(¶ms);95let avail_features = virtio::base_features(ProtectionType::Unprotected)96| 1 << VHOST_USER_F_PROTOCOL_FEATURES;9798let snd_data = hardcoded_snd_data(¶ms);99let mut keep_rds = Vec::new();100let builders = create_stream_info_builders(¶ms, &snd_data, &mut keep_rds, card_index)?;101102if snd_data.pcm_info_len() != builders.len() {103error!(104"[Card {}] snd: expected {} stream info builders, got {}",105card_index,106snd_data.pcm_info_len(),107builders.len(),108)109}110111let streams = builders.into_iter();112113#[cfg(windows)]114let streams = streams115.map(|stream_builder| stream_builder.audio_client_guid(audio_client_guid.clone()));116117let streams = streams118.map(StreamInfoBuilder::build)119.map(AsyncRwLock::new)120.collect();121let streams = Rc::new(AsyncRwLock::new(streams));122123let (tx_send, tx_recv) = mpsc::unbounded();124let (rx_send, rx_recv) = mpsc::unbounded();125126#[cfg(any(target_os = "android", target_os = "linux"))]127let unmap_guest_memory_on_fork = params.unmap_guest_memory_on_fork;128#[cfg(not(any(target_os = "android", target_os = "linux")))]129let unmap_guest_memory_on_fork = false;130131Ok(SndBackend {132ex: ex.clone(),133cfg,134avail_features,135workers: Default::default(),136response_workers: Default::default(),137snd_data: Rc::new(snd_data),138streams,139tx_send,140rx_send,141tx_recv: Some(tx_recv),142rx_recv: Some(rx_recv),143card_index,144unmap_guest_memory_on_fork,145})146}147}148149impl VhostUserDeviceBuilder for SndBackend {150fn build(self: Box<Self>, _ex: &Executor) -> anyhow::Result<Box<dyn vmm_vhost::Backend>> {151let handler = DeviceRequestHandler::new(*self);152Ok(Box::new(handler))153}154}155156impl VhostUserDevice for SndBackend {157fn max_queue_num(&self) -> usize {158MAX_QUEUE_NUM159}160161fn features(&self) -> u64 {162self.avail_features163}164165fn protocol_features(&self) -> VhostUserProtocolFeatures {166VhostUserProtocolFeatures::CONFIG167| VhostUserProtocolFeatures::MQ168| VhostUserProtocolFeatures::DEVICE_STATE169}170171fn unmap_guest_memory_on_fork(&self) -> bool {172self.unmap_guest_memory_on_fork173}174175fn read_config(&self, offset: u64, data: &mut [u8]) {176copy_config(data, 0, self.cfg.as_bytes(), offset)177}178179fn reset(&mut self) {180for worker in self.workers.iter_mut().filter_map(Option::take) {181let _ = self.ex.run_until(worker.queue_task.cancel());182}183}184185fn start_queue(186&mut self,187idx: usize,188queue: virtio::Queue,189_mem: GuestMemory,190) -> anyhow::Result<()> {191if self.workers[idx].is_some() {192warn!(193"[Card {}] Starting new queue handler without stopping old handler",194self.card_index195);196self.stop_queue(idx)?;197}198199let kick_evt = queue200.event()201.try_clone()202.with_context(|| format!("[Card {}] failed to clone queue event", self.card_index))?;203let mut kick_evt = EventAsync::new(kick_evt, &self.ex).with_context(|| {204format!(205"[Card {}] failed to create EventAsync for kick_evt",206self.card_index207)208})?;209let queue = Rc::new(AsyncRwLock::new(queue));210let card_index = self.card_index;211let queue_task = match idx {2120 => {213// ctrl queue214let streams = self.streams.clone();215let snd_data = self.snd_data.clone();216let tx_send = self.tx_send.clone();217let rx_send = self.rx_send.clone();218let ctrl_queue = queue.clone();219220let ex_clone = self.ex.clone();221Some(self.ex.spawn_local(async move {222handle_ctrl_queue(223&ex_clone,224&streams,225&snd_data,226ctrl_queue,227&mut kick_evt,228tx_send,229rx_send,230card_index,231None,232)233.await234}))235}236// TODO(woodychow): Add event queue support237//238// Note: Even though we don't support the event queue, we still need to keep track of239// the Queue so we can return it back in stop_queue. As such, we create a do nothing240// future to "run" this queue so that we track a WorkerState for it (which is how241// we return the Queue back).2421 => Some(self.ex.spawn_local(async move { Ok(()) })),2432 | 3 => {244let (send, recv) = if idx == 2 {245(self.tx_send.clone(), self.tx_recv.take())246} else {247(self.rx_send.clone(), self.rx_recv.take())248};249let mut recv = recv.ok_or_else(|| {250anyhow!("[Card {}] queue restart is not supported", self.card_index)251})?;252let streams = Rc::clone(&self.streams);253let queue_pcm_queue = queue.clone();254let queue_task = self.ex.spawn_local(async move {255handle_pcm_queue(&streams, send, queue_pcm_queue, &kick_evt, card_index, None)256.await257});258259let queue_response_queue = queue.clone();260let response_queue_task = self.ex.spawn_local(async move {261send_pcm_response_worker(queue_response_queue, &mut recv, None).await262});263264self.response_workers[idx - PCM_RESPONSE_WORKER_IDX_OFFSET] = Some(WorkerState {265queue_task: response_queue_task,266queue: queue.clone(),267});268269Some(queue_task)270}271_ => bail!(272"[Card {}] attempted to start unknown queue: {}",273self.card_index,274idx275),276};277278if let Some(queue_task) = queue_task {279self.workers[idx] = Some(WorkerState { queue_task, queue });280}281Ok(())282}283284fn stop_queue(&mut self, idx: usize) -> anyhow::Result<virtio::Queue> {285let worker_queue_rc = self286.workers287.get_mut(idx)288.and_then(Option::take)289.map(|worker| {290// Wait for queue_task to be aborted.291let _ = self.ex.run_until(worker.queue_task.cancel());292worker.queue293});294295if idx == 2 || idx == 3 {296if let Some(worker) = self297.response_workers298.get_mut(idx - PCM_RESPONSE_WORKER_IDX_OFFSET)299.and_then(Option::take)300{301// Wait for queue_task to be aborted.302let _ = self.ex.run_until(worker.queue_task.cancel());303}304}305306if let Some(queue_rc) = worker_queue_rc {307match Rc::try_unwrap(queue_rc) {308Ok(queue_mutex) => Ok(queue_mutex.into_inner()),309Err(_) => panic!(310"[Card {}] failed to recover queue from worker",311self.card_index312),313}314} else {315Err(anyhow::Error::new(DeviceError::WorkerNotFound))316}317}318319fn snapshot(&mut self) -> anyhow::Result<AnySnapshot> {320// now_or_never will succeed here because no workers are running.321let stream_info_snaps = if let Some(stream_infos) = &self.streams.lock().now_or_never() {322let mut snaps = Vec::new();323for stream_info in stream_infos.iter() {324snaps.push(325stream_info326.lock()327.now_or_never()328.unwrap_or_else(|| {329panic!(330"[Card {}] failed to lock audio state during snapshot",331self.card_index332)333})334.snapshot(),335);336}337Some(snaps)338} else {339None340};341let snd_data_ref: &SndData = self.snd_data.borrow();342AnySnapshot::to_any(SndBackendSnapshot {343avail_features: self.avail_features,344stream_infos: stream_info_snaps,345snd_data: snd_data_ref.clone(),346})347.with_context(|| {348format!(349"[Card {}] Failed to serialize SndBackendSnapshot",350self.card_index351)352})353}354355fn restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {356let deser: SndBackendSnapshot = AnySnapshot::from_any(data).with_context(|| {357format!(358"[Card {}] Failed to deserialize SndBackendSnapshot",359self.card_index360)361})?;362anyhow::ensure!(363deser.avail_features == self.avail_features,364"[Card {}] avail features doesn't match on restore: expected: {}, got: {}",365self.card_index,366deser.avail_features,367self.avail_features368);369let snd_data = self.snd_data.borrow();370anyhow::ensure!(371&deser.snd_data == snd_data,372"[Card {}] snd data doesn't match on restore: expected: {:?}, got: {:?}",373self.card_index,374deser.snd_data,375snd_data,376);377378let ex_clone = self.ex.clone();379let streams_rc = self.streams.clone();380let tx_send_clone = self.tx_send.clone();381let rx_send_clone = self.rx_send.clone();382383let card_index = self.card_index;384let restore_task =385self.ex.spawn_local(async move {386if let Some(stream_infos) = &deser.stream_infos {387for (stream, stream_info) in388streams_rc.lock().await.iter().zip(stream_infos.iter())389{390stream.lock().await.restore(stream_info);391if stream_info.state == VIRTIO_SND_R_PCM_START392|| stream_info.state == VIRTIO_SND_R_PCM_PREPARE393{394stream395.lock()396.await397.prepare(&ex_clone, &tx_send_clone, &rx_send_clone)398.await399.unwrap_or_else(|_| {400panic!("[Card {card_index}] failed to prepare PCM")401});402}403if stream_info.state == VIRTIO_SND_R_PCM_START {404stream.lock().await.start().await.unwrap_or_else(|_| {405panic!("[Card {card_index}] failed to start PCM")406});407}408}409}410});411self.ex412.run_until(restore_task)413.unwrap_or_else(|_| panic!("[Card {}] failed to restore streams", self.card_index));414Ok(())415}416417fn enter_suspended_state(&mut self) -> anyhow::Result<()> {418// This device has no non-queue workers to stop.419Ok(())420}421}422423424