Path: blob/main/devices/src/virtio/snd/vios_backend/shm_vios.rs
5394 views
// Copyright 2020 The ChromiumOS Authors1// Use of this source code is governed by a BSD-style license that can be2// found in the LICENSE file.34use std::collections::HashMap;5use std::collections::VecDeque;6use std::fs::File;7use std::io::Error as IOError;8use std::io::ErrorKind as IOErrorKind;9use std::io::Seek;10use std::io::SeekFrom;11use std::path::Path;12use std::path::PathBuf;13use std::sync::mpsc::channel;14use std::sync::mpsc::Receiver;15use std::sync::mpsc::RecvError;16use std::sync::mpsc::Sender;17use std::sync::Arc;1819use base::error;20use base::AsRawDescriptor;21use base::Error as BaseError;22use base::Event;23use base::EventToken;24use base::FromRawDescriptor;25use base::IntoRawDescriptor;26use base::MemoryMapping;27use base::MemoryMappingBuilder;28use base::MmapError;29use base::RawDescriptor;30use base::SafeDescriptor;31use base::ScmSocket;32use base::UnixSeqpacket;33use base::VolatileMemory;34use base::VolatileMemoryError;35use base::VolatileSlice;36use base::WaitContext;37use base::WorkerThread;38use remain::sorted;39use serde::Deserialize;40use serde::Serialize;41use sync::Mutex;42use thiserror::Error as ThisError;43use zerocopy::FromBytes;44use zerocopy::Immutable;45use zerocopy::IntoBytes;46use zerocopy::KnownLayout;4748use crate::virtio::snd::constants::*;49use crate::virtio::snd::layout::*;50use crate::virtio::snd::vios_backend::streams::StreamState;5152pub type Result<T> = std::result::Result<T, Error>;5354#[sorted]55#[derive(ThisError, Debug)]56pub enum Error {57#[error("Error memory mapping client_shm: {0}")]58BaseMmapError(BaseError),59#[error("Sender was dropped without sending buffer status, the recv thread may have exited")]60BufferStatusSenderLost(RecvError),61#[error("Command failed with status {0}")]62CommandFailed(u32),63#[error("Error duplicating file descriptor: {0}")]64DupError(BaseError),65#[error("Failed to create Recv event: {0}")]66EventCreateError(BaseError),67#[error("Failed to dup Recv event: {0}")]68EventDupError(BaseError),69#[error("Failed to signal event: {0}")]70EventWriteError(BaseError),71#[error("Failed to get size of tx shared memory: {0}")]72FileSizeError(IOError),73#[error("Error accessing guest's shared memory: {0}")]74GuestMmapError(MmapError),75#[error("No jack with id {0}")]76InvalidJackId(u32),77#[error("No stream with id {0}")]78InvalidStreamId(u32),79#[error("IO buffer operation failed: status = {0}")]80IOBufferError(u32),81#[error("No PCM streams available")]82NoStreamsAvailable,83#[error("Insuficient space for the new buffer in the queue's buffer area")]84OutOfSpace,85#[error("Platform not supported")]86PlatformNotSupported,87#[error("{0}")]88ProtocolError(ProtocolErrorKind),89#[error("Failed to connect to VioS server {1}: {0:?}")]90ServerConnectionError(IOError, PathBuf),91#[error("Failed to communicate with VioS server: {0:?}")]92ServerError(IOError),93#[error("Failed to communicate with VioS server: {0:?}")]94ServerIOError(IOError),95#[error("Error accessing VioS server's shared memory: {0}")]96ServerMmapError(MmapError),97#[error("Failed to duplicate UnixSeqpacket: {0}")]98UnixSeqpacketDupError(IOError),99#[error("Unsupported frame rate: {0}")]100UnsupportedFrameRate(u32),101#[error("Error accessing volatile memory: {0}")]102VolatileMemoryError(VolatileMemoryError),103#[error("Failed to create Recv thread's WaitContext: {0}")]104WaitContextCreateError(BaseError),105#[error("Error waiting for events")]106WaitError(BaseError),107#[error("Invalid operation for stream direction: {0}")]108WrongDirection(u8),109#[error("Set saved params should only be used while restoring the device")]110WrongSetParams,111}112113#[derive(ThisError, Debug)]114pub enum ProtocolErrorKind {115#[error("The server sent a config of the wrong size: {0}")]116UnexpectedConfigSize(usize),117#[error("Received {1} file descriptors from the server, expected {0}")]118UnexpectedNumberOfFileDescriptors(usize, usize), // expected, received119#[error("Server's version ({0}) doesn't match client's")]120VersionMismatch(u32),121#[error("Received a msg with an unexpected size: expected {0}, received {1}")]122UnexpectedMessageSize(usize, usize), // expected, received123}124125/// The client for the VioS backend126///127/// Uses a protocol equivalent to virtio-snd over a shared memory file and a unix socket for128/// notifications. It's thread safe, it can be encapsulated in an Arc smart pointer and shared129/// between threads.130pub struct VioSClient {131// These mutexes should almost never be held simultaneously. If at some point they have to the132// locking order should match the order in which they are declared here.133config: VioSConfig,134jacks: Vec<virtio_snd_jack_info>,135streams: Vec<virtio_snd_pcm_info>,136chmaps: Vec<virtio_snd_chmap_info>,137// The control socket is used from multiple threads to send and wait for a reply, which needs138// to happen atomically, hence the need for a mutex instead of just sharing clones of the139// socket.140control_socket: Mutex<UnixSeqpacket>,141event_socket: UnixSeqpacket,142// These are thread safe and don't require locking143tx: IoBufferQueue,144rx: IoBufferQueue,145// This is accessed by the recv_thread and whatever thread processes the events146events: Arc<Mutex<VecDeque<virtio_snd_event>>>,147event_notifier: Event,148// These are accessed by the recv_thread and the stream threads149tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,150rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,151recv_thread_state: Arc<Mutex<ThreadFlags>>,152recv_thread: Mutex<Option<WorkerThread<()>>>,153// Params are required to be stored for snapshot/restore. On restore, we don't have the params154// locally available as the VM is started anew, so they need to be restored.155params: HashMap<u32, virtio_snd_pcm_set_params>,156}157158#[derive(Serialize, Deserialize)]159pub struct VioSClientSnapshot {160config: VioSConfig,161jacks: Vec<virtio_snd_jack_info>,162streams: Vec<virtio_snd_pcm_info>,163chmaps: Vec<virtio_snd_chmap_info>,164params: HashMap<u32, virtio_snd_pcm_set_params>,165}166167impl VioSClient {168/// Create a new client given the path to the audio server's socket.169pub fn try_new<P: AsRef<Path>>(server: P) -> Result<VioSClient> {170let client_socket = ScmSocket::try_from(171UnixSeqpacket::connect(server.as_ref())172.map_err(|e| Error::ServerConnectionError(e, server.as_ref().into()))?,173)174.map_err(|e| Error::ServerConnectionError(e, server.as_ref().into()))?;175let mut config: VioSConfig = Default::default();176const NUM_FDS: usize = 5;177let (recv_size, mut safe_fds) = client_socket178.recv_with_fds(config.as_mut_bytes(), NUM_FDS)179.map_err(Error::ServerError)?;180181if recv_size != std::mem::size_of::<VioSConfig>() {182return Err(Error::ProtocolError(183ProtocolErrorKind::UnexpectedConfigSize(recv_size),184));185}186187if config.version != VIOS_VERSION {188return Err(Error::ProtocolError(ProtocolErrorKind::VersionMismatch(189config.version,190)));191}192193fn pop<T: FromRawDescriptor>(194safe_fds: &mut Vec<SafeDescriptor>,195expected: usize,196received: usize,197) -> Result<T> {198// SAFETY:199// Safe because we transfer ownership from the SafeDescriptor to T200unsafe {201Ok(T::from_raw_descriptor(202safe_fds203.pop()204.ok_or(Error::ProtocolError(205ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(206expected, received,207),208))?209.into_raw_descriptor(),210))211}212}213214let fd_count = safe_fds.len();215let rx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;216let tx_shm_file = pop::<File>(&mut safe_fds, NUM_FDS, fd_count)?;217let rx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;218let tx_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;219let event_socket = pop::<UnixSeqpacket>(&mut safe_fds, NUM_FDS, fd_count)?;220221if !safe_fds.is_empty() {222return Err(Error::ProtocolError(223ProtocolErrorKind::UnexpectedNumberOfFileDescriptors(NUM_FDS, fd_count),224));225}226227let tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =228Arc::new(Mutex::new(HashMap::new()));229let rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>> =230Arc::new(Mutex::new(HashMap::new()));231let recv_thread_state = Arc::new(Mutex::new(ThreadFlags {232reporting_events: false,233}));234235let mut client = VioSClient {236config,237jacks: Vec::new(),238streams: Vec::new(),239chmaps: Vec::new(),240control_socket: Mutex::new(client_socket.into_inner()),241event_socket,242tx: IoBufferQueue::new(tx_socket, tx_shm_file)?,243rx: IoBufferQueue::new(rx_socket, rx_shm_file)?,244events: Arc::new(Mutex::new(VecDeque::new())),245event_notifier: Event::new().map_err(Error::EventCreateError)?,246tx_subscribers,247rx_subscribers,248recv_thread_state,249recv_thread: Mutex::new(None),250params: HashMap::new(),251};252client.request_and_cache_info()?;253Ok(client)254}255256/// Get the number of jacks257pub fn num_jacks(&self) -> u32 {258self.config.jacks259}260261/// Get the number of pcm streams262pub fn num_streams(&self) -> u32 {263self.config.streams264}265266/// Get the number of channel maps267pub fn num_chmaps(&self) -> u32 {268self.config.chmaps269}270271/// Get the configuration information on a jack272pub fn jack_info(&self, idx: u32) -> Option<virtio_snd_jack_info> {273self.jacks.get(idx as usize).copied()274}275276/// Get the configuration information on a pcm stream277pub fn stream_info(&self, idx: u32) -> Option<virtio_snd_pcm_info> {278self.streams.get(idx as usize).cloned()279}280281/// Get the configuration information on a channel map282pub fn chmap_info(&self, idx: u32) -> Option<virtio_snd_chmap_info> {283self.chmaps.get(idx as usize).copied()284}285286/// Starts the background thread that receives release messages from the server. If the thread287/// was already started this function does nothing.288/// This thread must be started prior to attempting any stream IO operation or the calling289/// thread would block.290pub fn start_bg_thread(&self) -> Result<()> {291if self.recv_thread.lock().is_some() {292return Ok(());293}294let tx_socket = self.tx.try_clone_socket()?;295let rx_socket = self.rx.try_clone_socket()?;296let event_socket = self297.event_socket298.try_clone()299.map_err(Error::UnixSeqpacketDupError)?;300let mut opt = self.recv_thread.lock();301// The lock on recv_thread was released above to avoid holding more than one lock at a time302// while duplicating the fds. So we have to check the condition again.303if opt.is_none() {304let tx_subscribers = self.tx_subscribers.clone();305let rx_subscribers = self.rx_subscribers.clone();306let event_notifier = self307.event_notifier308.try_clone()309.map_err(Error::EventDupError)?;310let events = self.events.clone();311let recv_thread_state = self.recv_thread_state.clone();312*opt = Some(WorkerThread::start("shm_vios", move |kill_event| {313if let Err(e) = run_recv_thread(314kill_event,315tx_subscribers,316rx_subscribers,317event_notifier,318events,319recv_thread_state,320tx_socket,321rx_socket,322event_socket,323) {324error!("virtio-snd shm_vios worker failed: {e:#}");325}326}));327}328Ok(())329}330331/// Stops the background thread.332pub fn stop_bg_thread(&self) -> Result<()> {333if let Some(recv_thread) = self.recv_thread.lock().take() {334recv_thread.stop();335}336Ok(())337}338339/// Gets an Event object that will trigger every time an event is received from the server340pub fn get_event_notifier(&self) -> Result<Event> {341// Let the background thread know that there is at least one consumer of events342self.recv_thread_state.lock().reporting_events = true;343self.event_notifier344.try_clone()345.map_err(Error::EventDupError)346}347348/// Retrieves one event. Callers should have received a notification through the event notifier349/// before calling this function.350pub fn pop_event(&self) -> Option<virtio_snd_event> {351self.events.lock().pop_front()352}353354/// Remap a jack. This should only be called if the jack announces support for the operation355/// through the features field in the corresponding virtio_snd_jack_info struct.356pub fn remap_jack(&self, jack_id: u32, association: u32, sequence: u32) -> Result<()> {357if jack_id >= self.config.jacks {358return Err(Error::InvalidJackId(jack_id));359}360let msg = virtio_snd_jack_remap {361hdr: virtio_snd_jack_hdr {362hdr: virtio_snd_hdr {363code: VIRTIO_SND_R_JACK_REMAP.into(),364},365jack_id: jack_id.into(),366},367association: association.into(),368sequence: sequence.into(),369};370let control_socket_lock = self.control_socket.lock();371send_cmd(&control_socket_lock, msg)372}373374/// Configures a stream with the given parameters.375pub fn set_stream_parameters(376&mut self,377stream_id: u32,378params: VioSStreamParams,379) -> Result<()> {380self.streams381.get(stream_id as usize)382.ok_or(Error::InvalidStreamId(stream_id))?;383let raw_params: virtio_snd_pcm_set_params = (stream_id, params).into();384// Old value is not needed and is dropped385let _ = self.params.insert(stream_id, raw_params);386let control_socket_lock = self.control_socket.lock();387send_cmd(&control_socket_lock, raw_params)388}389390/// Configures a stream with the given parameters.391pub fn set_stream_parameters_raw(392&mut self,393raw_params: virtio_snd_pcm_set_params,394) -> Result<()> {395let stream_id = raw_params.hdr.stream_id.to_native();396// Old value is not needed and is dropped397let _ = self.params.insert(stream_id, raw_params);398self.streams399.get(stream_id as usize)400.ok_or(Error::InvalidStreamId(stream_id))?;401let control_socket_lock = self.control_socket.lock();402send_cmd(&control_socket_lock, raw_params)403}404405/// Send the PREPARE_STREAM command to the server.406pub fn prepare_stream(&self, stream_id: u32) -> Result<()> {407self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_PREPARE)408}409410/// Send the RELEASE_STREAM command to the server.411pub fn release_stream(&self, stream_id: u32) -> Result<()> {412self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_RELEASE)413}414415/// Send the START_STREAM command to the server.416pub fn start_stream(&self, stream_id: u32) -> Result<()> {417self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_START)418}419420/// Send the STOP_STREAM command to the server.421pub fn stop_stream(&self, stream_id: u32) -> Result<()> {422self.common_stream_op(stream_id, VIRTIO_SND_R_PCM_STOP)423}424425/// Send audio frames to the server. Blocks the calling thread until the server acknowledges426/// the data.427pub fn inject_audio_data<R, Cb: FnOnce(VolatileSlice) -> R>(428&self,429stream_id: u32,430size: usize,431callback: Cb,432) -> Result<(u32, R)> {433if self434.streams435.get(stream_id as usize)436.ok_or(Error::InvalidStreamId(stream_id))?437.direction438!= VIRTIO_SND_D_OUTPUT439{440return Err(Error::WrongDirection(VIRTIO_SND_D_OUTPUT));441}442self.streams443.get(stream_id as usize)444.ok_or(Error::InvalidStreamId(stream_id))?;445let dst_offset = self.tx.allocate_buffer(size)?;446let buffer_slice = self.tx.buffer_at(dst_offset, size)?;447let ret = callback(buffer_slice);448// Register to receive the status before sending the buffer to the server449let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();450self.tx_subscribers.lock().insert(dst_offset, sender);451self.tx.send_buffer(stream_id, dst_offset, size)?;452let (_, latency) = await_status(receiver)?;453Ok((latency, ret))454}455456/// Request audio frames from the server. It blocks until the data is available.457pub fn request_audio_data<R, Cb: FnOnce(&VolatileSlice) -> R>(458&self,459stream_id: u32,460size: usize,461callback: Cb,462) -> Result<(u32, R)> {463if self464.streams465.get(stream_id as usize)466.ok_or(Error::InvalidStreamId(stream_id))?467.direction468!= VIRTIO_SND_D_INPUT469{470return Err(Error::WrongDirection(VIRTIO_SND_D_INPUT));471}472let src_offset = self.rx.allocate_buffer(size)?;473// Register to receive the status before sending the buffer to the server474let (sender, receiver): (Sender<BufferReleaseMsg>, Receiver<BufferReleaseMsg>) = channel();475self.rx_subscribers.lock().insert(src_offset, sender);476self.rx.send_buffer(stream_id, src_offset, size)?;477// Make sure no mutexes are held while awaiting for the buffer to be written to478let (recv_size, latency) = await_status(receiver)?;479let buffer_slice = self.rx.buffer_at(src_offset, recv_size)?;480Ok((latency, callback(&buffer_slice)))481}482483/// Get a list of file descriptors used by the implementation.484pub fn keep_rds(&self) -> Vec<RawDescriptor> {485let control_desc = self.control_socket.lock().as_raw_descriptor();486let event_desc = self.event_socket.as_raw_descriptor();487let event_notifier = self.event_notifier.as_raw_descriptor();488let mut ret = vec![control_desc, event_desc, event_notifier];489ret.append(&mut self.tx.keep_rds());490ret.append(&mut self.rx.keep_rds());491ret492}493494fn common_stream_op(&self, stream_id: u32, op: u32) -> Result<()> {495self.streams496.get(stream_id as usize)497.ok_or(Error::InvalidStreamId(stream_id))?;498let msg = virtio_snd_pcm_hdr {499hdr: virtio_snd_hdr { code: op.into() },500stream_id: stream_id.into(),501};502let control_socket_lock = self.control_socket.lock();503send_cmd(&control_socket_lock, msg)504}505506fn request_and_cache_info(&mut self) -> Result<()> {507self.request_and_cache_jacks_info()?;508self.request_and_cache_streams_info()?;509self.request_and_cache_chmaps_info()?;510Ok(())511}512513fn request_info<T: IntoBytes + FromBytes + Default + Copy + Clone>(514&self,515req_code: u32,516count: usize,517) -> Result<Vec<T>> {518let info_size = std::mem::size_of::<T>();519let status_size = std::mem::size_of::<virtio_snd_hdr>();520let req = virtio_snd_query_info {521hdr: virtio_snd_hdr {522code: req_code.into(),523},524start_id: 0u32.into(),525count: (count as u32).into(),526size: (std::mem::size_of::<virtio_snd_query_info>() as u32).into(),527};528let control_socket_lock = self.control_socket.lock();529seq_socket_send(&control_socket_lock, req.as_bytes())?;530let reply = control_socket_lock531.recv_as_vec()532.map_err(Error::ServerIOError)?;533let mut status: virtio_snd_hdr = Default::default();534status535.as_mut_bytes()536.copy_from_slice(&reply[0..status_size]);537if status.code.to_native() != VIRTIO_SND_S_OK {538return Err(Error::CommandFailed(status.code.to_native()));539}540if reply.len() != status_size + count * info_size {541return Err(Error::ProtocolError(542ProtocolErrorKind::UnexpectedMessageSize(count * info_size, reply.len()),543));544}545Ok(reply[status_size..]546.chunks(info_size)547.map(|info_buffer| T::read_from_bytes(info_buffer).unwrap())548.collect())549}550551fn request_and_cache_jacks_info(&mut self) -> Result<()> {552let num_jacks = self.config.jacks as usize;553if num_jacks == 0 {554return Ok(());555}556self.jacks = self.request_info(VIRTIO_SND_R_JACK_INFO, num_jacks)?;557Ok(())558}559560fn request_and_cache_streams_info(&mut self) -> Result<()> {561let num_streams = self.config.streams as usize;562if num_streams == 0 {563return Ok(());564}565self.streams = self.request_info(VIRTIO_SND_R_PCM_INFO, num_streams)?;566Ok(())567}568569fn request_and_cache_chmaps_info(&mut self) -> Result<()> {570let num_chmaps = self.config.chmaps as usize;571if num_chmaps == 0 {572return Ok(());573}574self.chmaps = self.request_info(VIRTIO_SND_R_CHMAP_INFO, num_chmaps)?;575Ok(())576}577578pub fn snapshot(&self) -> VioSClientSnapshot {579VioSClientSnapshot {580config: self.config,581jacks: self.jacks.clone(),582streams: self.streams.clone(),583chmaps: self.chmaps.clone(),584params: self.params.clone(),585}586}587588// Function called `restore` to signify it will happen as part of the snapshot/restore flow. No589// data is actually restored in the case of VioSClient.590pub fn restore(&mut self, data: VioSClientSnapshot) -> anyhow::Result<()> {591anyhow::ensure!(592data.config == self.config,593"config doesn't match on restore: expected: {:?}, got: {:?}",594data.config,595self.config596);597self.jacks = data.jacks;598self.streams = data.streams;599self.chmaps = data.chmaps;600self.params = data.params;601Ok(())602}603604pub fn restore_stream(&mut self, stream_id: u32, state: StreamState) -> Result<()> {605if let Some(params) = self.params.get(&stream_id).cloned() {606self.set_stream_parameters_raw(params)?;607}608match state {609StreamState::Started => {610// If state != prepared, start will always fail.611// As such, it is fine to only print the first error without returning, as the612// second action will then fail.613if let Err(e) = self.prepare_stream(stream_id) {614error!("failed to prepare stream: {}", e);615};616self.start_stream(stream_id)617}618StreamState::Prepared => self.prepare_stream(stream_id),619// Nothing to do here620_ => Ok(()),621}622}623}624625#[derive(Clone, Copy)]626struct ThreadFlags {627reporting_events: bool,628}629630#[derive(EventToken)]631enum Token {632Notification,633TxBufferMsg,634RxBufferMsg,635EventMsg,636}637638fn recv_buffer_status_msg(639socket: &UnixSeqpacket,640subscribers: &Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,641) -> Result<()> {642let mut msg: IoStatusMsg = Default::default();643let size = socket644.recv(msg.as_mut_bytes())645.map_err(Error::ServerIOError)?;646if size != std::mem::size_of::<IoStatusMsg>() {647return Err(Error::ProtocolError(648ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<IoStatusMsg>(), size),649));650}651let mut status = msg.status.status.into();652if status == u32::MAX {653// Anyone waiting for this would continue to wait for as long as status is654// u32::MAX655status -= 1;656}657let latency = msg.status.latency_bytes.into();658let offset = msg.buffer_offset as usize;659let consumed_len = msg.consumed_len as usize;660let promise_opt = subscribers.lock().remove(&offset);661match promise_opt {662None => error!(663"Received an unexpected buffer status message: {}. This is a BUG!!",664offset665),666Some(sender) => {667if let Err(e) = sender.send(BufferReleaseMsg {668status,669latency,670consumed_len,671}) {672error!("Failed to notify waiting thread: {:?}", e);673}674}675}676Ok(())677}678679fn recv_event(socket: &UnixSeqpacket) -> Result<virtio_snd_event> {680let mut msg: virtio_snd_event = Default::default();681let size = socket682.recv(msg.as_mut_bytes())683.map_err(Error::ServerIOError)?;684if size != std::mem::size_of::<virtio_snd_event>() {685return Err(Error::ProtocolError(686ProtocolErrorKind::UnexpectedMessageSize(std::mem::size_of::<virtio_snd_event>(), size),687));688}689Ok(msg)690}691692fn run_recv_thread(693kill_event: Event,694tx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,695rx_subscribers: Arc<Mutex<HashMap<usize, Sender<BufferReleaseMsg>>>>,696event_notifier: Event,697event_queue: Arc<Mutex<VecDeque<virtio_snd_event>>>,698state: Arc<Mutex<ThreadFlags>>,699tx_socket: UnixSeqpacket,700rx_socket: UnixSeqpacket,701event_socket: UnixSeqpacket,702) -> Result<()> {703let wait_ctx: WaitContext<Token> = WaitContext::build_with(&[704(&tx_socket, Token::TxBufferMsg),705(&rx_socket, Token::RxBufferMsg),706(&event_socket, Token::EventMsg),707(&kill_event, Token::Notification),708])709.map_err(Error::WaitContextCreateError)?;710let mut running = true;711while running {712let events = wait_ctx.wait().map_err(Error::WaitError)?;713for evt in events {714match evt.token {715Token::TxBufferMsg => recv_buffer_status_msg(&tx_socket, &tx_subscribers)?,716Token::RxBufferMsg => recv_buffer_status_msg(&rx_socket, &rx_subscribers)?,717Token::EventMsg => {718let evt = recv_event(&event_socket)?;719let state_cpy = *state.lock();720if state_cpy.reporting_events {721event_queue.lock().push_back(evt);722event_notifier.signal().map_err(Error::EventWriteError)?;723} // else just drop the events724}725Token::Notification => {726// Just consume the notification and check for termination on the next727// iteration728if let Err(e) = kill_event.wait() {729error!("Failed to consume notification from recv thread: {:?}", e);730}731running = false;732}733}734}735}736Ok(())737}738739fn await_status(promise: Receiver<BufferReleaseMsg>) -> Result<(usize, u32)> {740let BufferReleaseMsg {741status,742latency,743consumed_len,744} = promise.recv().map_err(Error::BufferStatusSenderLost)?;745if status == VIRTIO_SND_S_OK {746Ok((consumed_len, latency))747} else {748Err(Error::IOBufferError(status))749}750}751752struct IoBufferQueue {753socket: UnixSeqpacket,754file: File,755mmap: MemoryMapping,756size: usize,757next: Mutex<usize>,758}759760impl IoBufferQueue {761fn new(socket: UnixSeqpacket, mut file: File) -> Result<IoBufferQueue> {762let size = file.seek(SeekFrom::End(0)).map_err(Error::FileSizeError)? as usize;763764let mmap = MemoryMappingBuilder::new(size)765.from_file(&file)766.build()767.map_err(Error::ServerMmapError)?;768769Ok(IoBufferQueue {770socket,771file,772mmap,773size,774next: Mutex::new(0),775})776}777778fn allocate_buffer(&self, size: usize) -> Result<usize> {779if size > self.size {780return Err(Error::OutOfSpace);781}782let mut next_lock = self.next.lock();783let offset = if size > self.size - *next_lock {784// Can't fit the new buffer at the end of the area, so put it at the beginning7850786} else {787*next_lock788};789*next_lock = offset + size;790Ok(offset)791}792793fn buffer_at(&self, offset: usize, len: usize) -> Result<VolatileSlice> {794self.mmap795.get_slice(offset, len)796.map_err(Error::VolatileMemoryError)797}798799fn try_clone_socket(&self) -> Result<UnixSeqpacket> {800self.socket801.try_clone()802.map_err(Error::UnixSeqpacketDupError)803}804805fn send_buffer(&self, stream_id: u32, offset: usize, size: usize) -> Result<()> {806let msg = IoTransferMsg::new(stream_id, offset, size);807seq_socket_send(&self.socket, msg.as_bytes())808}809810fn keep_rds(&self) -> Vec<RawDescriptor> {811vec![812self.file.as_raw_descriptor(),813self.socket.as_raw_descriptor(),814]815}816}817818/// Groups the parameters used to configure a stream prior to using it.819pub struct VioSStreamParams {820pub buffer_bytes: u32,821pub period_bytes: u32,822pub features: u32,823pub channels: u8,824pub format: u8,825pub rate: u8,826}827828impl From<(u32, VioSStreamParams)> for virtio_snd_pcm_set_params {829fn from(val: (u32, VioSStreamParams)) -> Self {830virtio_snd_pcm_set_params {831hdr: virtio_snd_pcm_hdr {832hdr: virtio_snd_hdr {833code: VIRTIO_SND_R_PCM_SET_PARAMS.into(),834},835stream_id: val.0.into(),836},837buffer_bytes: val.1.buffer_bytes.into(),838period_bytes: val.1.period_bytes.into(),839features: val.1.features.into(),840channels: val.1.channels,841format: val.1.format,842rate: val.1.rate,843padding: 0u8,844}845}846}847848fn send_cmd<T: Immutable + IntoBytes>(control_socket: &UnixSeqpacket, data: T) -> Result<()> {849seq_socket_send(control_socket, data.as_bytes())?;850recv_cmd_status(control_socket)851}852853fn recv_cmd_status(control_socket: &UnixSeqpacket) -> Result<()> {854let mut status: virtio_snd_hdr = Default::default();855control_socket856.recv(status.as_mut_bytes())857.map_err(Error::ServerIOError)?;858if status.code.to_native() == VIRTIO_SND_S_OK {859Ok(())860} else {861Err(Error::CommandFailed(status.code.to_native()))862}863}864865fn seq_socket_send(socket: &UnixSeqpacket, data: &[u8]) -> Result<()> {866loop {867let send_res = socket.send(data);868if let Err(e) = send_res {869match e.kind() {870// Retry if interrupted871IOErrorKind::Interrupted => continue,872_ => return Err(Error::ServerIOError(e)),873}874}875// Success876break;877}878Ok(())879}880881const VIOS_VERSION: u32 = 2;882883#[repr(C)]884#[derive(885Copy,886Clone,887Default,888FromBytes,889Immutable,890IntoBytes,891KnownLayout,892Serialize,893Deserialize,894PartialEq,895Eq,896Debug,897)]898struct VioSConfig {899version: u32,900jacks: u32,901streams: u32,902chmaps: u32,903}904905struct BufferReleaseMsg {906status: u32,907latency: u32,908consumed_len: usize,909}910911#[repr(C)]912#[derive(Copy, Clone, FromBytes, Immutable, IntoBytes, KnownLayout)]913struct IoTransferMsg {914io_xfer: virtio_snd_pcm_xfer,915buffer_offset: u32,916buffer_len: u32,917}918919impl IoTransferMsg {920fn new(stream_id: u32, buffer_offset: usize, buffer_len: usize) -> IoTransferMsg {921IoTransferMsg {922io_xfer: virtio_snd_pcm_xfer {923stream_id: stream_id.into(),924},925buffer_offset: buffer_offset as u32,926buffer_len: buffer_len as u32,927}928}929}930931#[repr(C)]932#[derive(Copy, Clone, Default, FromBytes, Immutable, IntoBytes, KnownLayout)]933struct IoStatusMsg {934status: virtio_snd_pcm_status,935buffer_offset: u32,936consumed_len: u32,937}938939940