// devices/src/virtio/vsock/sys/windows/vsock.rs
// Copyright 2022 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::io;
use std::io::Read;
use std::io::Write;
use std::os::windows::io::RawHandle;
use std::rc::Rc;
use std::result;
use std::sync::Arc;
use std::thread;

use anyhow::anyhow;
use anyhow::Context;
use base::error;
use base::info;
use base::named_pipes;
use base::named_pipes::BlockingMode;
use base::named_pipes::FramingMode;
use base::named_pipes::OverlappedWrapper;
use base::named_pipes::PipeConnection;
use base::warn;
use base::AsRawDescriptor;
use base::Error as SysError;
use base::Event;
use base::EventExt;
use base::WorkerThread;
use cros_async::select3;
use cros_async::select6;
use cros_async::sync::RwLock;
use cros_async::AsyncError;
use cros_async::EventAsync;
use cros_async::Executor;
use cros_async::SelectResult;
use data_model::Le32;
use data_model::Le64;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::pin_mut;
use futures::select;
use futures::select_biased;
use futures::stream::FuturesUnordered;
use futures::FutureExt;
use futures::SinkExt;
use futures::StreamExt;
use remain::sorted;
use serde::Deserialize;
use serde::Serialize;
use snapshot::AnySnapshot;
use thiserror::Error as ThisError;
use vm_memory::GuestMemory;
use zerocopy::FromBytes;
use zerocopy::FromZeros;
use zerocopy::IntoBytes;

use crate::virtio::async_utils;
use crate::virtio::copy_config;
use crate::virtio::create_stop_oneshot;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_config;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_event;
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_hdr;
use crate::virtio::vsock::sys::windows::protocol::vsock_op;
use crate::virtio::vsock::sys::windows::protocol::TYPE_STREAM_SOCKET;
use crate::virtio::DescriptorChain;
use crate::virtio::DeviceType;
use crate::virtio::Interrupt;
use crate::virtio::Queue;
use crate::virtio::StoppedWorker;
use crate::virtio::VirtioDevice;
use crate::virtio::Writer;
use crate::Suspendable;

/// Errors produced by the vsock device and its worker tasks.
#[sorted]
#[derive(ThisError, Debug)]
pub enum VsockError {
    #[error("Failed to await next descriptor chain from queue: {0}")]
    AwaitQueue(AsyncError),
    #[error("OverlappedWrapper error.")]
    BadOverlappedWrapper,
    #[error("Failed to clone descriptor: {0}")]
    CloneDescriptor(SysError),
    #[error("Failed to create EventAsync: {0}")]
    CreateEventAsync(AsyncError),
    #[error("Failed to create wait context: {0}")]
    CreateWaitContext(SysError),
    #[error("Failed to read queue: {0}")]
    ReadQueue(io::Error),
    #[error("Failed to reset event object: {0}")]
    ResetEventObject(SysError),
    #[error("Failed to run executor: {0}")]
    RunExecutor(AsyncError),
    #[error("Failed to write to pipe, port {0}: {1}")]
    WriteFailed(PortPair, io::Error),
    #[error("Failed to write queue: {0}")]
    WriteQueue(io::Error),
}
pub type Result<T> = result::Result<T, VsockError>;

// Vsock has three virt IO queues: rx, tx, and event.
const QUEUE_SIZE: u16 = 256;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];
// We overload port numbers so that if message is to be received from
// CONNECTION_EVENT_PORT_NUM (invalid port number), we recognize that a
// new connection was set up.
const CONNECTION_EVENT_PORT_NUM: u32 = u32::MAX;

/// Number of bytes in a kilobyte. Used to simplify and clarify buffer size definitions.
const KB: usize = 1024;

/// Size of the buffer we read into from the host side named pipe.
/// Note that data flows from the
/// host pipe -> this buffer -> rx queue.
/// This should be large enough to facilitate fast transmission of host data, see b/232950349.
const TEMP_READ_BUF_SIZE_BYTES: usize = 4 * KB;

/// In the case where the host side named pipe does not have a specified buffer size, we'll default
/// to telling the guest that this is the amount of extra RX space available (e.g. buf_alloc).
/// This should be larger than the volume of data that the guest will generally send at one time to
/// minimize credit update packets (see MIN_FREE_BUFFER_PCT below).
const DEFAULT_BUF_ALLOC_BYTES: usize = 128 * KB;

/// Minimum free buffer threshold to notify the peer with a credit update
/// message. This is in case we are ingesting many messages without an
/// opportunity to send a message back to the peer with a buffer size update.
/// This value is a percentage of `buf_alloc`.
/// TODO(b/204246759): This value was chosen without much more thought than "it
/// works". It should probably be adjusted, along with DEFAULT_BUF_ALLOC, to a
/// value that makes empirical sense for the packet sizes that we expect to
/// receive.
/// TODO(b/239848326): Replace float with integer, in order to remove risk
/// of losing precision. Ie. change to `10` and perform
/// `FOO * MIN_FREE_BUFFER_PCT / 100`
const MIN_FREE_BUFFER_PCT: f64 = 0.1;

// Number of packets to buffer in the tx processing channels.
const CHANNEL_SIZE: usize = 256;

/// Map of (host port, guest port) pairs to their active connection state,
/// shared between the worker's async tasks.
type VsockConnectionMap = RwLock<HashMap<PortPair, VsockConnection>>;

/// Virtio vsock device, bridging guest vsock ports to host named pipes.
pub struct Vsock {
    // Context ID (CID) assigned to the guest.
    guest_cid: u64,
    // GUID used to build host named-pipe names; if `None`, guest-initiated
    // connections are rejected (see `handle_vsock_connection_request`).
    host_guid: Option<String>,
    // Virtio feature bits offered to the guest.
    features: u64,
    // Virtio feature bits the guest has acknowledged.
    acked_features: u64,
    // Running worker; on stop it yields the paused queues and live connections.
    worker_thread: Option<WorkerThread<Option<(PausedQueues, VsockConnectionMap)>>>,
    /// Stores any active connections when the device sleeps.
    /// This allows us to sleep/wake
    /// without disrupting active connections, which is useful when taking a snapshot.
    sleeping_connections: Option<VsockConnectionMap>,
    /// If true, we should send a TRANSPORT_RESET event to the guest at the next opportunity.
    /// Used to inform the guest all connections are broken when we restore a snapshot.
    needs_transport_reset: bool,
}

/// Snapshotted state of Vsock. These fields are serialized in order to validate they haven't
/// changed when this device is restored.
#[derive(Serialize, Deserialize)]
struct VsockSnapshot {
    guest_cid: u64,
    features: u64,
    acked_features: u64,
}

impl Vsock {
    /// Creates a new, inactive vsock device. The worker is not started until `activate`.
    pub fn new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock> {
        Ok(Vsock {
            guest_cid,
            host_guid,
            features: base_features,
            acked_features: 0,
            worker_thread: None,
            sleeping_connections: None,
            needs_transport_reset: false,
        })
    }

    // Builds the device config space exposed to the guest (just the guest CID).
    fn get_config(&self) -> virtio_vsock_config {
        virtio_vsock_config {
            guest_cid: Le64::from(self.guest_cid),
        }
    }

    // Stops the worker thread, if any, and reports whether it returned its
    // queues and connection map (needed for sleep/snapshot).
    fn stop_worker(&mut self) -> StoppedWorker<(PausedQueues, VsockConnectionMap)> {
        if let Some(worker_thread) = self.worker_thread.take() {
            if let Some(queues_and_conns) = worker_thread.stop() {
                StoppedWorker::WithQueues(Box::new(queues_and_conns))
            } else {
                StoppedWorker::MissingQueues
            }
        } else {
            StoppedWorker::AlreadyStopped
        }
    }

    // Spawns the worker thread that services the rx/tx/event queues.
    // `existing_connections` restores connections preserved across a sleep.
    fn start_worker(
        &mut self,
        mem: GuestMemory,
        mut queues: VsockQueues,
        existing_connections: Option<VsockConnectionMap>,
    ) -> anyhow::Result<()> {
        let rx_queue = queues.rx;
        let tx_queue = queues.tx;
        let event_queue = queues.event;

        let host_guid = self.host_guid.clone();
        let guest_cid = self.guest_cid;
        // Consume the pending reset flag; the worker will deliver the
        // TRANSPORT_RESET event exactly once.
        let needs_transport_reset = self.needs_transport_reset;
        self.needs_transport_reset = false;
        self.worker_thread = Some(WorkerThread::start(
            "userspace_virtio_vsock",
            move |kill_evt| {
                let mut worker =
                    Worker::new(
                        mem,
                        host_guid,
                        guest_cid,
                        existing_connections,
                        needs_transport_reset,
                    );
                let result = worker.run(rx_queue, tx_queue, event_queue, kill_evt);

                match result {
                    Err(e) => {
                        error!("userspace vsock worker thread exited with error: {:?}", e);
                        None
                    }
                    Ok(paused_queues_and_connections_option) => {
                        paused_queues_and_connections_option
                    }
                }
            },
        ));

        Ok(())
    }
}

impl VirtioDevice for Vsock {
    fn keep_rds(&self) -> Vec<RawHandle> {
        Vec::new()
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        copy_config(data, 0, self.get_config().as_bytes(), offset);
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Vsock
    }

    fn queue_max_sizes(&self) -> &[u16] {
        QUEUE_SIZES
    }

    fn features(&self) -> u64 {
        self.features
    }

    fn ack_features(&mut self, value: u64) {
        self.acked_features |= value;
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        _interrupt: Interrupt,
        mut queues: BTreeMap<usize, Queue>,
    ) -> anyhow::Result<()> {
        if queues.len() != QUEUE_SIZES.len() {
            return Err(anyhow!(
                "Failed to activate vsock device. queues.len(): {} != {}",
                queues.len(),
                QUEUE_SIZES.len(),
            ));
        }

        // Queue order is fixed by the virtio-vsock spec: 0 = rx, 1 = tx, 2 = event.
        let vsock_queues = VsockQueues {
            rx: queues.remove(&0).unwrap(),
            tx: queues.remove(&1).unwrap(),
            event: queues.remove(&2).unwrap(),
        };

        self.start_worker(mem, vsock_queues, None)
    }

    fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
        match self.stop_worker() {
            StoppedWorker::WithQueues(paused_queues_and_conns) => {
                let (queues, sleeping_connections) = *paused_queues_and_conns;
                // Keep the live connections so wake can resume them untouched.
                self.sleeping_connections = Some(sleeping_connections);
                Ok(Some(queues.into()))
            }
            StoppedWorker::MissingQueues => {
                anyhow::bail!("vsock queue workers did not stop cleanly")
            }
            StoppedWorker::AlreadyStopped => {
                // The device isn't in the activated state.
                Ok(None)
            }
        }
    }

    fn virtio_wake(
        &mut self,
        queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
    ) -> anyhow::Result<()> {
        if let Some((mem, _interrupt, queues)) = queues_state {
            let connections = self.sleeping_connections.take();
            self.start_worker(
                mem,
                queues
                    .try_into()
                    .expect("Failed to convert queue BTreeMap to VsockQueues"),
                connections,
            )?;
        }
        Ok(())
    }

    fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
        AnySnapshot::to_any(VsockSnapshot {
            guest_cid: self.guest_cid,
            features: self.features,
            acked_features: self.acked_features,
        })
        .context("failed to serialize vsock snapshot")
    }

    fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
        let vsock_snapshot: VsockSnapshot =
            AnySnapshot::from_any(data).context("error deserializing vsock snapshot")?;
        // guest_cid and features are device identity: they must match the
        // snapshot exactly; only acked_features is actually restored.
        anyhow::ensure!(
            self.guest_cid == vsock_snapshot.guest_cid,
            "expected guest_cid to match, but they did not. Live: {}, snapshot: {}",
            self.guest_cid,
            vsock_snapshot.guest_cid
        );
        anyhow::ensure!(
            self.features == vsock_snapshot.features,
            "vsock: expected features to match, but they did not. Live: {}, snapshot: {}",
            self.features,
            vsock_snapshot.features
        );
        self.acked_features = vsock_snapshot.acked_features;
        // Connections cannot survive a restore; tell the guest on next start.
        self.needs_transport_reset = true;

        Ok(())
    }
}

/// A (host port, guest port) pair uniquely identifying one vsock connection.
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct PortPair {
    host: u32,
    guest: u32,
}

impl Display for PortPair {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "(host port: {}, guest port: {})", self.host, self.guest)
    }
}

impl PortPair {
    // For packets sent by the guest (tx), dst is the host side and src is the
    // guest side.
    fn from_tx_header(header: &virtio_vsock_hdr) -> PortPair {
        PortPair {
            host: header.dst_port.to_native(),
            guest: header.src_port.to_native(),
        }
    }
}

// Note: variables herein do not have to be atomic because this struct is guarded
// by a RwLock.
struct VsockConnection {
    // The guest port.
    guest_port: Le32,

    // The actual named (asynchronous) pipe connection.
    pipe: PipeConnection,
    // The overlapped struct contains an event object for the named pipe.
    // This lets us select() on the pipes by waiting on the events.
    // This is for Reads only.
    overlapped: Box<OverlappedWrapper>,
    // Read buffer for the named pipes. These are needed because reads complete
    // asynchronously.
    buffer: Box<[u8; TEMP_READ_BUF_SIZE_BYTES]>,

    // Total free-running count of received bytes.
    recv_cnt: usize,

    // Total free-running count of received bytes that the peer has been informed of.
    prev_recv_cnt: usize,

    // Total auxiliary buffer space available to receive packets from the driver, not including
    // the virtqueue itself. For us, this is tx buffer on the named pipe into which we drain
    // packets for the connection.
    // Note that if the named pipe has a grow on demand TX buffer,
    // we use DEFAULT_BUF_ALLOC instead.
    buf_alloc: usize,

    // Peer (driver) total free-running count of received bytes.
    peer_recv_cnt: usize,

    // Peer (driver) total rx buffer allocated.
    peer_buf_alloc: usize,

    // Total free-running count of transmitted bytes.
    tx_cnt: usize,

    // State tracking for full buffer condition. Currently just used for logging. If the peer's
    // buffer does not have space for a maximum-sized message (TEMP_READ_BUF_SIZE_BYTES), this
    // gets set to `true`. Once there's enough space in the buffer, this gets unset.
    is_buffer_full: bool,
}

/// Shared state for the async tasks servicing the vsock queues.
struct Worker {
    mem: GuestMemory,
    host_guid: Option<String>,
    guest_cid: u64,
    // Map of host port to a VsockConnection.
    connections: VsockConnectionMap,
    // Signaled whenever `connections` changes so the rx task can rebuild its
    // set of pipe-event futures.
    connection_event: Event,
    // Channel used to push device events (e.g. TRANSPORT_RESET) to the event queue task.
    device_event_queue_tx: mpsc::Sender<virtio_vsock_event>,
    // Receiver half; taken by the event queue task when it starts.
    device_event_queue_rx: Option<mpsc::Receiver<virtio_vsock_event>>,
    // If true, send a transport reset event to the guest once the worker runs
    // (set after snapshot restore).
    send_protocol_reset: bool,
}

impl Worker {
    fn new(
        mem: GuestMemory,
        host_guid: Option<String>,
        guest_cid: u64,
        existing_connections: Option<VsockConnectionMap>,
        send_protocol_reset: bool,
    ) -> Worker {
        // Buffer size here is arbitrary, but must be at least one since we need
        // to be able to write a reset event to the channel when the device
        // worker is brought up on a VM restore.
        // Note that we send exactly one
        // message per VM session, so we should never see these messages backing
        // up.
        let (device_event_queue_tx, device_event_queue_rx) = mpsc::channel(4);

        Worker {
            mem,
            host_guid,
            guest_cid,
            connections: existing_connections.unwrap_or_default(),
            connection_event: Event::new().unwrap(),
            device_event_queue_tx,
            device_event_queue_rx: Some(device_event_queue_rx),
            send_protocol_reset,
        }
    }

    /// Drains data arriving on host named pipes into the guest's rx queue.
    /// Restarts its future set whenever the connection map changes.
    async fn process_rx_queue(
        &self,
        recv_queue: Arc<RwLock<Queue>>,
        mut rx_queue_evt: EventAsync,
        ex: &Executor,
        mut stop_rx: oneshot::Receiver<()>,
    ) -> Result<()> {
        'connections_changed: loop {
            // Run continuously until exit evt

            // TODO(b/200810561): Optimize this FuturesUnordered code.
            // Set up the EventAsyncs to select on
            let futures = FuturesUnordered::new();
            // This needs to be its own scope since it holds a RwLock on `self.connections`.
            {
                let connections = self.connections.read_lock().await;
                for (port, connection) in connections.iter() {
                    let h_evt = connection
                        .overlapped
                        .get_h_event_ref()
                        .ok_or_else(|| {
                            error!("Missing h_event in OverlappedWrapper.");
                            VsockError::BadOverlappedWrapper
                        })
                        .unwrap()
                        .try_clone()
                        .map_err(|e| {
                            error!("Could not clone h_event.");
                            VsockError::CloneDescriptor(e)
                        })?;
                    let evt_async = EventAsync::new(h_evt, ex).map_err(|e| {
                        error!("Could not create EventAsync.");
                        VsockError::CreateEventAsync(e)
                    })?;
                    futures.push(wait_event_and_return_port_pair(evt_async, *port));
                }
            }
            // Also watch the connection event, reported with a sentinel port
            // (CONNECTION_EVENT_PORT_NUM) so we know to rebuild the future set.
            let connection_evt_clone = self.connection_event.try_clone().map_err(|e| {
                error!("Could not clone connection_event.");
                VsockError::CloneDescriptor(e)
            })?;
            let connection_evt_async = EventAsync::new(connection_evt_clone, ex).map_err(|e| {
                error!("Could not create EventAsync.");
                VsockError::CreateEventAsync(e)
            })?;
            futures.push(wait_event_and_return_port_pair(
                connection_evt_async,
                PortPair {
                    host:
                        CONNECTION_EVENT_PORT_NUM,
                    guest: 0,
                },
            ));

            // Wait to service the sockets. Note that for fairness, it is critical that we service
            // all ready sockets in a single wakeup to avoid starvation. This is why ready_chunks
            // is used, as it returns all currently *ready* futures from the stream.
            //
            // The expect here only triggers if the FuturesUnordered stream is exhausted. This never
            // happens because it has at least one item, and we only request chunks once.
            let futures_len = futures.len();
            let mut ready_chunks = futures.ready_chunks(futures_len);
            let ports = select_biased! {
                ports = ready_chunks.next() => {
                    ports.expect("failed to wait on vsock sockets")
                }
                _ = stop_rx => {
                    break;
                }
            };

            for port in ports {
                if port.host == CONNECTION_EVENT_PORT_NUM {
                    // New connection event. Setup futures again.
                    if let Err(e) = self.connection_event.reset() {
                        error!("vsock: port: {}: could not reset connection_event.", port);
                        return Err(VsockError::ResetEventObject(e));
                    }
                    continue 'connections_changed;
                }
                let mut connections = self.connections.lock().await;
                let connection = if let Some(conn) = connections.get_mut(&port) {
                    conn
                } else {
                    // We could have been scheduled to run the rx queue *before* the connection was
                    // closed. In that case, we do nothing. The code which closed the connection
                    // (e.g. in response to a message in the tx/rx queues) will handle notifying
                    // the guest/host as required.
                    continue 'connections_changed;
                };

                // Check if the peer has enough space in their buffer to
                // receive the maximum amount of data that we could possibly
                // read from the host pipe.
                // If not, we should continue to
                // process other sockets as each socket has an independent
                // buffer.
                // NOTE(review): both subtractions are on usize; this relies on the
                // invariants peer_recv_cnt <= tx_cnt <= peer_recv_cnt + peer_buf_alloc
                // to avoid wrapping — TODO confirm these hold across credit updates.
                let peer_free_buf_size =
                    connection.peer_buf_alloc - (connection.tx_cnt - connection.peer_recv_cnt);
                if peer_free_buf_size < TEMP_READ_BUF_SIZE_BYTES {
                    if !connection.is_buffer_full {
                        warn!(
                            "vsock: port {}: Peer has insufficient free buffer space ({} bytes available)",
                            port, peer_free_buf_size
                        );
                        connection.is_buffer_full = true;
                    }
                    continue;
                } else if connection.is_buffer_full {
                    connection.is_buffer_full = false;
                }

                let pipe_connection = &mut connection.pipe;
                let overlapped = &mut connection.overlapped;
                let guest_port = connection.guest_port;
                let buffer = &mut connection.buffer;

                // Reset the read event before collecting the overlapped result so a
                // stale signal can't wake us again for the same completion.
                match overlapped.get_h_event_ref() {
                    Some(h_event) => {
                        if let Err(e) = h_event.reset() {
                            error!(
                                "vsock: port: {}: Could not reset event in OverlappedWrapper.",
                                port
                            );
                            return Err(VsockError::ResetEventObject(e));
                        }
                    }
                    None => {
                        error!(
                            "vsock: port: {}: missing h_event in OverlappedWrapper.",
                            port
                        );
                        return Err(VsockError::BadOverlappedWrapper);
                    }
                }

                let data_size = match pipe_connection.get_overlapped_result(&mut *overlapped) {
                    Ok(size) => size as usize,
                    Err(e) => {
                        error!("vsock: port {}: Failed to read from pipe {}", port, e);
                        // TODO(b/237278629): Close the connection if we fail to read.
                        continue 'connections_changed;
                    }
                };

                let response_header = virtio_vsock_hdr {
                    src_cid: 2.into(),              // Host CID
                    dst_cid: self.guest_cid.into(), // Guest CID
                    src_port: Le32::from(port.host),
                    dst_port: guest_port,
                    len: Le32::from(data_size as u32),
                    type_: TYPE_STREAM_SOCKET.into(),
                    op: vsock_op::VIRTIO_VSOCK_OP_RW.into(),
                    buf_alloc: Le32::from(connection.buf_alloc as u32),
                    fwd_cnt: Le32::from(connection.recv_cnt as u32),
                    ..Default::default()
                };

                connection.prev_recv_cnt = connection.recv_cnt;

                // We have to only write to the queue once, so we construct a new buffer
                // with
                // the concatenated header and data.
                const HEADER_SIZE: usize = std::mem::size_of::<virtio_vsock_hdr>();
                let data_read = &buffer[..data_size];
                let mut header_and_data = vec![0u8; HEADER_SIZE + data_size];
                header_and_data[..HEADER_SIZE].copy_from_slice(response_header.as_bytes());
                header_and_data[HEADER_SIZE..].copy_from_slice(data_read);
                {
                    let mut recv_queue_lock = recv_queue.lock().await;
                    let write_fut = self
                        .write_bytes_to_queue(
                            &mut recv_queue_lock,
                            &mut rx_queue_evt,
                            &header_and_data[..],
                        )
                        .fuse();
                    pin_mut!(write_fut);
                    // If `stop_rx` is fired but the virt queue is full, this loop will break
                    // without draining the `header_and_data`.
                    select_biased! {
                        write = write_fut => {},
                        _ = stop_rx => {
                            break;
                        }
                    }
                }

                connection.tx_cnt += data_size;

                // Start reading again so we receive the message and
                // event signal immediately.

                // SAFETY:
                // Unsafe because the read could happen at any time
                // after this function is called. We ensure safety
                // by allocating the buffer and overlapped struct
                // on the heap.
                unsafe {
                    match pipe_connection.read_overlapped(&mut buffer[..], &mut *overlapped) {
                        Ok(()) => {}
                        Err(e) => {
                            error!("vsock: port {}: Failed to read from pipe {}", port, e);
                        }
                    }
                }
            }
        }
        Ok(())
    }

    /// Reads guest packets from the tx queue and forwards (header, data) pairs
    /// to the tx processing task. Returns the queue when stopped.
    async fn process_tx_queue(
        &self,
        mut queue: Queue,
        mut queue_evt: EventAsync,
        mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>,
        mut stop_rx: oneshot::Receiver<()>,
    ) -> Result<Queue> {
        loop {
            // Run continuously until exit evt
            let mut avail_desc = match queue
                .next_async_interruptable(&mut queue_evt, &mut stop_rx)
                .await
            {
                Ok(Some(d)) => d,
                Ok(None) => break,
                Err(e) => {
                    error!("vsock: Failed to read descriptor {}", e);
                    return Err(VsockError::AwaitQueue(e));
                }
            };

            // A single descriptor chain may carry several packets back to back.
            let reader = &mut avail_desc.reader;
            while reader.available_bytes() >= std::mem::size_of::<virtio_vsock_hdr>() {
                let header = match
reader.read_obj::<virtio_vsock_hdr>() {689Ok(hdr) => hdr,690Err(e) => {691error!("vsock: Error while reading header: {}", e);692break;693}694};695696let len = header.len.to_native() as usize;697if reader.available_bytes() < len {698error!("vsock: Error reading packet data");699break;700}701702let mut data = vec![0_u8; len];703if len > 0 {704if let Err(e) = reader.read_exact(&mut data) {705error!("vosck: failed to read data from tx packet: {:?}", e);706}707}708709if let Err(e) = process_packets_queue.send((header, data)).await {710error!(711"Error while sending packet to queue, dropping packet: {:?}",712e713)714};715}716717queue.add_used(avail_desc);718queue.trigger_interrupt();719}720721Ok(queue)722}723724fn calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize {725match pipe.get_info() {726Ok(info) => {727if info.outgoing_buffer_size > 0 {728info.outgoing_buffer_size as usize729} else {730info!(731"vsock: port {}: using default extra rx buffer size \732(named pipe does not have an explicit buffer size)",733port734);735736// A zero buffer size implies that the buffer grows as737// needed. We set our own cap here for flow control738// purposes.739DEFAULT_BUF_ALLOC_BYTES740}741}742Err(e) => {743error!(744"vsock: port {}: failed to get named pipe info, using default \745buf size. 
Error: {}",746port, e747);748DEFAULT_BUF_ALLOC_BYTES749}750}751}752753/// Processes a connection request and returns whether to return a response (true), or reset754/// (false).755async fn handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool {756let port = PortPair::from_tx_header(&header);757info!("vsock: port {}: Received connection request", port);758759if self.connections.read_lock().await.contains_key(&port) {760// Connection exists, nothing for us to do.761warn!(762"vsock: port {}: accepting connection request on already connected port",763port764);765return true;766}767768if self.host_guid.is_none() {769error!(770"vsock: port {}: Cannot accept guest-initiated connections \771without host-guid, rejecting connection",772port,773);774return false;775}776777// We have a new connection to establish.778let mut overlapped_wrapper =779Box::new(OverlappedWrapper::new(/* include_event= */ true).unwrap());780let pipe_result = named_pipes::create_client_pipe(781get_pipe_name(782self.host_guid.as_ref().unwrap(),783header.dst_port.to_native(),784)785.as_str(),786&FramingMode::Byte,787&BlockingMode::Wait,788true, /* overlapped */789);790791match pipe_result {792Ok(mut pipe_connection) => {793let mut buffer = Box::new([0u8; TEMP_READ_BUF_SIZE_BYTES]);794info!("vsock: port {}: created client pipe", port);795796// SAFETY:797// Unsafe because the read could happen at any time798// after this function is called. 
                // We ensure safety
                // by allocating the buffer and overlapped struct
                // on the heap.
                unsafe {
                    match pipe_connection.read_overlapped(&mut buffer[..], &mut overlapped_wrapper)
                    {
                        Ok(()) => {}
                        Err(e) => {
                            error!("vsock: port {}: Failed to read from pipe {}", port, e);
                            return false;
                        }
                    }
                }
                info!("vsock: port {}: started read on client pipe", port);

                let buf_alloc = Self::calculate_buf_alloc_from_pipe(&pipe_connection, port);
                let connection = VsockConnection {
                    guest_port: header.src_port,
                    pipe: pipe_connection,
                    overlapped: overlapped_wrapper,
                    peer_buf_alloc: header.buf_alloc.to_native() as usize,
                    peer_recv_cnt: header.fwd_cnt.to_native() as usize,
                    buf_alloc,
                    buffer,
                    // The connection has just been made, so we haven't received
                    // anything yet.
                    recv_cnt: 0_usize,
                    prev_recv_cnt: 0_usize,
                    tx_cnt: 0_usize,
                    is_buffer_full: false,
                };
                self.connections.lock().await.insert(port, connection);
                // Wake the rx task so it adds this connection to its future set.
                self.connection_event.signal().unwrap_or_else(|_| {
                    panic!("Failed to signal new connection event for vsock port {port}.")
                });
                info!("vsock: port {}: signaled connection ready", port);
                true
            }
            Err(e) => {
                info!(
                    "vsock: No waiting pipe connection on port {}, \
                    not connecting (err: {:?})",
                    port, e
                );
                false
            }
        }
    }

    /// Writes a guest data (RW) packet's payload to the connection's host pipe
    /// and updates the flow-control counters. Returns an error if the write
    /// fails or is short, which the caller treats as a connection reset.
    async fn handle_vsock_guest_data(
        &self,
        header: virtio_vsock_hdr,
        data: &[u8],
        ex: &Executor,
    ) -> Result<()> {
        let port = PortPair::from_tx_header(&header);
        let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
        {
            let mut connections = self.connections.lock().await;
            if let Some(connection) = connections.get_mut(&port) {
                // Update peer buffer/recv counters
                connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
                connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;

                let pipe = &mut connection.pipe;
                // We have to provide a OVERLAPPED struct to write to the pipe.
                //
                // SAFETY: safe because data &
                // overlapped_wrapper live until the
                // overlapped operation completes (we wait on completion below).
                if let Err(e) = unsafe { pipe.write_overlapped(data, &mut overlapped_wrapper) } {
                    return Err(VsockError::WriteFailed(port, e));
                }
            } else {
                error!(
                    "vsock: Guest attempted to send data packet over unconnected \
                    port ({}), dropping packet",
                    port
                );
                return Ok(());
            }
        }
        // Note: the connections lock is deliberately released while awaiting
        // write completion so other tasks aren't blocked.
        if let Some(write_completed_event) = overlapped_wrapper.get_h_event_ref() {
            // Don't block the executor while the write completes. This time should
            // always be negligible, but will sometimes be non-zero in cases where
            // traffic is high on the NamedPipe, especially a duplex pipe.
            if let Ok(cloned_event) = write_completed_event.try_clone() {
                if let Ok(async_event) = EventAsync::new_without_reset(cloned_event, ex) {
                    let _ = async_event.next_val().await;
                } else {
                    error!(
                        "vsock: port {}: Failed to convert write event to async",
                        port
                    );
                }
            } else {
                error!(
                    "vsock: port {}: Failed to clone write completion event",
                    port
                );
            }
        } else {
            error!(
                "vsock: port: {}: Failed to get overlapped event for write",
                port
            );
        }

        let mut connections = self.connections.lock().await;
        if let Some(connection) = connections.get_mut(&port) {
            let pipe = &mut connection.pipe;
            match pipe.get_overlapped_result(&mut overlapped_wrapper) {
                Ok(len) => {
                    // We've received bytes from the guest, account for them in our
                    // received bytes counter.
                    connection.recv_cnt += len as usize;

                    if len != data.len() as u32 {
                        return Err(VsockError::WriteFailed(
                            port,
                            std::io::Error::other(format!(
                                "port {} failed to write correct number of bytes:
                                (expected: {}, wrote: {})",
                                port,
                                data.len(),
                                len
                            )),
                        ));
                    }
                }
                Err(e) => {
                    return Err(VsockError::WriteFailed(port, e));
                }
            }
        } else {
            error!(
                "vsock: Guest attempted to send data packet over unconnected \
                port ({}), dropping packet",
                port
            );
        }
        Ok(())
    }

    async fn
    process_tx_packets(
        // Demultiplexes guest tx packets onto per-port channels, spawning one
        // `process_tx_packets_for_port` future per active port.
        &self,
        send_queue: &Arc<RwLock<Queue>>,
        rx_queue_evt: Event,
        mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
        ex: &Executor,
        mut stop_rx: oneshot::Receiver<()>,
    ) {
        let mut packet_queues = HashMap::new();
        let mut futures = FuturesUnordered::new();
        // Push a pending future that will never complete into FuturesUnordered.
        // This will keep us from spinning on spurious notifications when we
        // don't have any open connections.
        futures.push(std::future::pending::<PortPair>().left_future());

        let mut stop_future = FuturesUnordered::new();
        stop_future.push(stop_rx);
        loop {
            let (new_packet, connection, stop_rx_res) =
                select3(packet_recv_queue.next(), futures.next(), stop_future.next()).await;
            match connection {
                SelectResult::Finished(Some(port)) => {
                    // A per-port future finished; its port is closed.
                    packet_queues.remove(&port);
                }
                SelectResult::Finished(_) => {
                    // This is only triggered when FuturesUnordered completes
                    // all pending futures. Right now, this can never happen, as
                    // we have a pending future that we push that will never
                    // complete.
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            };
            match new_packet {
                SelectResult::Finished(Some(packet)) => {
                    let port = PortPair::from_tx_header(&packet.0);
                    // Lazily create the per-port channel and processing future.
                    let queue = packet_queues.entry(port).or_insert_with(|| {
                        let (send, recv) = mpsc::channel(CHANNEL_SIZE);
                        let event_async = EventAsync::new(
                            rx_queue_evt.try_clone().expect("Failed to clone event"),
                            ex,
                        )
                        .expect("Failed to set up the rx queue event");
                        futures.push(
                            self.process_tx_packets_for_port(
                                port,
                                recv,
                                send_queue,
                                event_async,
                                ex,
                            )
                            .right_future(),
                        );
                        send
                    });
                    // Try to send the packet.
                    // Do not block other ports if the queue is full.
                    if let Err(e) = queue.try_send(packet) {
                        error!(
                            "vsock: port {}: error sending packet to queue, dropping packet: {:?}",
                            port, e
                        )
                    }
                }
                SelectResult::Finished(_) => {
                    // Triggers when the channel is closed; no more packets coming.
                    packet_recv_queue.close();
                    return;
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            }
            match stop_rx_res {
                SelectResult::Finished(_) => {
                    break;
                }
                SelectResult::Pending(_) => {
                    // Nothing to do.
                }
            }
        }
    }

    /// Handles all packets for a single port until the connection is closed.
    /// Returns the port so the caller can clean up its channel map.
    async fn process_tx_packets_for_port(
        &self,
        port: PortPair,
        mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
        send_queue: &Arc<RwLock<Queue>>,
        mut rx_queue_evt: EventAsync,
        ex: &Executor,
    ) -> PortPair {
        while let Some((header, data)) = packet_recv_queue.next().await {
            if !self
                .handle_tx_packet(header, &data, send_queue, &mut rx_queue_evt, ex)
                .await
            {
                // Connection is closing: stop accepting packets and report
                // whether any were left unprocessed.
                packet_recv_queue.close();
                if let Ok(Some(_)) = packet_recv_queue.try_next() {
                    warn!("vsock: closing port {} with unprocessed packets", port);
                } else {
                    info!("vsock: closing port {} cleanly", port)
                }
                break;
            }
        }
        port
    }

    /// Dispatches one guest tx packet by op code. Returns `false` when the
    /// connection should be considered closed.
    async fn handle_tx_packet(
        &self,
        header: virtio_vsock_hdr,
        data: &[u8],
        send_queue: &Arc<RwLock<Queue>>,
        rx_queue_evt: &mut EventAsync,
        ex: &Executor,
    ) -> bool {
        let mut is_open = true;
        let port = PortPair::from_tx_header(&header);
        match header.op.to_native() {
            vsock_op::VIRTIO_VSOCK_OP_INVALID => {
                error!("vsock: Invalid Operation requested, dropping packet");
            }
            vsock_op::VIRTIO_VSOCK_OP_REQUEST => {
                let (resp_op, buf_alloc, fwd_cnt) =
                    if self.handle_vsock_connection_request(header).await {
                        let connections = self.connections.read_lock().await;

                        connections.get(&port).map_or_else(
                            || {
                                warn!("vsock: port: {} connection closed during connect", port);
                                is_open =
false;1073(vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)1074},1075|conn| {1076(1077vsock_op::VIRTIO_VSOCK_OP_RESPONSE,1078conn.buf_alloc as u32,1079conn.recv_cnt as u32,1080)1081},1082)1083} else {1084is_open = false;1085(vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)1086};10871088let response_header = virtio_vsock_hdr {1089src_cid: { header.dst_cid },1090dst_cid: { header.src_cid },1091src_port: { header.dst_port },1092dst_port: { header.src_port },1093len: 0.into(),1094type_: TYPE_STREAM_SOCKET.into(),1095op: resp_op.into(),1096buf_alloc: Le32::from(buf_alloc),1097fwd_cnt: Le32::from(fwd_cnt),1098..Default::default()1099};1100// Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to1101// bytes.1102self.write_bytes_to_queue(1103&mut *send_queue.lock().await,1104rx_queue_evt,1105response_header.as_bytes(),1106)1107.await1108.expect("vsock: failed to write to queue");1109info!(1110"vsock: port {}: replied {} to connection request",1111port,1112if resp_op == vsock_op::VIRTIO_VSOCK_OP_RESPONSE {1113"success"1114} else {1115"reset"1116},1117);1118}1119vsock_op::VIRTIO_VSOCK_OP_RESPONSE => {1120// TODO(b/237811512): Implement this for host-initiated connections1121}1122vsock_op::VIRTIO_VSOCK_OP_RST => {1123// TODO(b/237811512): Implement this for host-initiated connections1124}1125vsock_op::VIRTIO_VSOCK_OP_SHUTDOWN => {1126// While the header provides flags to specify tx/rx-specific shutdown,1127// we only support full shutdown.1128// TODO(b/237811512): Provide an optimal way to notify host of shutdowns1129// while still maintaining easy reconnections.1130let mut connections = self.connections.lock().await;1131if connections.remove(&port).is_some() {1132let mut response = virtio_vsock_hdr {1133src_cid: { header.dst_cid },1134dst_cid: { header.src_cid },1135src_port: { header.dst_port },1136dst_port: { header.src_port },1137len: 0.into(),1138type_: TYPE_STREAM_SOCKET.into(),1139op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),1140// There is no buffer on a closed 
connection1141buf_alloc: 0.into(),1142// There is no fwd_cnt anymore on a closed connection1143fwd_cnt: 0.into(),1144..Default::default()1145};1146// Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to1147// bytes1148self.write_bytes_to_queue(1149&mut *send_queue.lock().await,1150rx_queue_evt,1151response.as_mut_bytes(),1152)1153.await1154.expect("vsock: failed to write to queue");1155self.connection_event1156.signal()1157.expect("vsock: failed to write to event");1158info!("vsock: port: {}: disconnected by the guest", port);1159} else {1160error!("vsock: Attempted to close unopened port: {}", port);1161}1162is_open = false;1163}1164vsock_op::VIRTIO_VSOCK_OP_RW => {1165match self.handle_vsock_guest_data(header, data, ex).await {1166Ok(()) => {1167if self1168.check_free_buffer_threshold(header)1169.await1170.unwrap_or(false)1171{1172// Send a credit update if we're below the minimum free1173// buffer size. We skip this if the connection is closed,1174// which could've happened if we were closed on the other1175// end.1176info!(1177"vsock: port {}: Buffer below threshold; sending credit update.",1178port1179);1180self.send_vsock_credit_update(send_queue, rx_queue_evt, header)1181.await;1182}1183}1184Err(e) => {1185error!("vsock: port {}: resetting connection: {}", port, e);1186self.send_vsock_reset(send_queue, rx_queue_evt, header)1187.await;1188is_open = false;1189}1190}1191}1192// An update from our peer with their buffer state, which they are sending1193// (probably) due to a a credit request *we* made.1194vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE => {1195let port = PortPair::from_tx_header(&header);1196let mut connections = self.connections.lock().await;1197if let Some(connection) = connections.get_mut(&port) {1198connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;1199connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;1200} else {1201error!("vsock: port {}: got credit update on unknown port", port);1202is_open = 
false;1203}1204}1205// A request from our peer to get *our* buffer state. We reply to the RX queue.1206vsock_op::VIRTIO_VSOCK_OP_CREDIT_REQUEST => {1207info!(1208"vsock: port {}: Got credit request from peer; sending credit update.",1209port,1210);1211self.send_vsock_credit_update(send_queue, rx_queue_evt, header)1212.await;1213}1214_ => {1215error!(1216"vsock: port {}: unknown operation requested, dropping packet",1217port1218);1219}1220}1221is_open1222}12231224// Checks if how much free buffer our peer thinks that *we* have available1225// is below our threshold percentage. If the connection is closed, returns `None`.1226async fn check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool> {1227let mut connections = self.connections.lock().await;1228let port = PortPair::from_tx_header(&header);1229connections.get_mut(&port).map(|connection| {1230let threshold: usize = (MIN_FREE_BUFFER_PCT * connection.buf_alloc as f64) as usize;1231connection.buf_alloc - (connection.recv_cnt - connection.prev_recv_cnt) < threshold1232})1233}12341235async fn send_vsock_credit_update(1236&self,1237send_queue: &Arc<RwLock<Queue>>,1238rx_queue_evt: &mut EventAsync,1239header: virtio_vsock_hdr,1240) {1241let mut connections = self.connections.lock().await;1242let port = PortPair::from_tx_header(&header);12431244if let Some(connection) = connections.get_mut(&port) {1245let mut response = virtio_vsock_hdr {1246src_cid: { header.dst_cid },1247dst_cid: { header.src_cid },1248src_port: { header.dst_port },1249dst_port: { header.src_port },1250len: 0.into(),1251type_: TYPE_STREAM_SOCKET.into(),1252op: vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE.into(),1253buf_alloc: Le32::from(connection.buf_alloc as u32),1254fwd_cnt: Le32::from(connection.recv_cnt as u32),1255..Default::default()1256};12571258connection.prev_recv_cnt = connection.recv_cnt;12591260// Safe because virtio_vsock_hdr is a simple data struct and converts cleanly1261// to bytes1262self.write_bytes_to_queue(1263&mut 
*send_queue.lock().await,1264rx_queue_evt,1265response.as_mut_bytes(),1266)1267.await1268.unwrap_or_else(|_| panic!("vsock: port {port}: failed to write to queue"));1269} else {1270error!(1271"vsock: port {}: error sending credit update on unknown port",1272port1273);1274}1275}12761277async fn send_vsock_reset(1278&self,1279send_queue: &Arc<RwLock<Queue>>,1280rx_queue_evt: &mut EventAsync,1281header: virtio_vsock_hdr,1282) {1283let mut connections = self.connections.lock().await;1284let port = PortPair::from_tx_header(&header);1285if let Some(connection) = connections.remove(&port) {1286let mut response = virtio_vsock_hdr {1287src_cid: { header.dst_cid },1288dst_cid: { header.src_cid },1289src_port: { header.dst_port },1290dst_port: { header.src_port },1291len: 0.into(),1292type_: TYPE_STREAM_SOCKET.into(),1293op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),1294buf_alloc: Le32::from(connection.buf_alloc as u32),1295fwd_cnt: Le32::from(connection.recv_cnt as u32),1296..Default::default()1297};12981299// Safe because virtio_vsock_hdr is a simple data struct and converts cleanly1300// to bytes1301self.write_bytes_to_queue(1302&mut *send_queue.lock().await,1303rx_queue_evt,1304response.as_mut_bytes(),1305)1306.await1307.expect("failed to write to queue");1308} else {1309error!("vsock: port {}: error closing unknown port", port);1310}1311}13121313async fn write_bytes_to_queue(1314&self,1315queue: &mut Queue,1316queue_evt: &mut EventAsync,1317bytes: &[u8],1318) -> Result<()> {1319let mut avail_desc = match queue.next_async(queue_evt).await {1320Ok(d) => d,1321Err(e) => {1322error!("vsock: failed to read descriptor {}", e);1323return Err(VsockError::AwaitQueue(e));1324}1325};1326self.write_bytes_to_queue_inner(queue, avail_desc, bytes)1327}13281329async fn write_bytes_to_queue_interruptable(1330&self,1331queue: &mut Queue,1332queue_evt: &mut EventAsync,1333bytes: &[u8],1334mut stop_rx: &mut oneshot::Receiver<()>,1335) -> Result<()> {1336let mut avail_desc = match 
queue.next_async_interruptable(queue_evt, stop_rx).await {1337Ok(d) => match d {1338Some(desc) => desc,1339None => return Ok(()),1340},1341Err(e) => {1342error!("vsock: failed to read descriptor {}", e);1343return Err(VsockError::AwaitQueue(e));1344}1345};1346self.write_bytes_to_queue_inner(queue, avail_desc, bytes)1347}13481349fn write_bytes_to_queue_inner(1350&self,1351queue: &mut Queue,1352mut desc_chain: DescriptorChain,1353bytes: &[u8],1354) -> Result<()> {1355let writer = &mut desc_chain.writer;1356let res = writer.write_all(bytes);13571358if let Err(e) = res {1359error!(1360"vsock: failed to write {} bytes to queue, err: {:?}",1361bytes.len(),1362e1363);1364return Err(VsockError::WriteQueue(e));1365}13661367if writer.bytes_written() > 0 {1368queue.add_used(desc_chain);1369queue.trigger_interrupt();1370Ok(())1371} else {1372error!("vsock: failed to write bytes to queue");1373Err(VsockError::WriteQueue(std::io::Error::other(1374"failed to write bytes to queue",1375)))1376}1377}13781379async fn process_event_queue(1380&self,1381mut queue: Queue,1382mut queue_evt: EventAsync,1383mut stop_rx: oneshot::Receiver<()>,1384mut vsock_event_receiver: mpsc::Receiver<virtio_vsock_event>,1385) -> Result<Queue> {1386loop {1387let vsock_event = select_biased! 
{1388vsock_event = vsock_event_receiver.next() => {1389vsock_event1390}1391_ = stop_rx => {1392break;1393}1394};1395let vsock_event = match vsock_event {1396Some(event) => event,1397None => break,1398};1399self.write_bytes_to_queue_interruptable(1400&mut queue,1401&mut queue_evt,1402vsock_event.as_bytes(),1403&mut stop_rx,1404)1405.await?;1406}1407Ok(queue)1408}14091410fn run(1411mut self,1412rx_queue: Queue,1413tx_queue: Queue,1414event_queue: Queue,1415kill_evt: Event,1416) -> Result<Option<(PausedQueues, VsockConnectionMap)>> {1417let rx_queue_evt = rx_queue1418.event()1419.try_clone()1420.map_err(VsockError::CloneDescriptor)?;14211422// Note that this mutex won't ever be contended because the HandleExecutor is single1423// threaded. We need the mutex for compile time correctness, but technically it is not1424// actually providing mandatory locking, at least not at the moment. If we later use a1425// multi-threaded executor, then this lock will be important.1426let rx_queue_arc = Arc::new(RwLock::new(rx_queue));14271428// Run executor / create futures in a scope, preventing extra reference to `rx_queue_arc`.1429let res = {1430let ex = Executor::new().unwrap();14311432let rx_evt_async = EventAsync::new(1433rx_queue_evt1434.try_clone()1435.map_err(VsockError::CloneDescriptor)?,1436&ex,1437)1438.expect("Failed to set up the rx queue event");1439let mut stop_queue_oneshots = Vec::new();14401441let vsock_event_receiver = self1442.device_event_queue_rx1443.take()1444.expect("event queue rx must be present");14451446let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);1447let rx_handler =1448self.process_rx_queue(rx_queue_arc.clone(), rx_evt_async, &ex, stop_rx);1449let rx_handler = rx_handler.fuse();1450pin_mut!(rx_handler);14511452let (send, recv) = mpsc::channel(CHANNEL_SIZE);14531454let tx_evt_async = EventAsync::new(1455tx_queue1456.event()1457.try_clone()1458.map_err(VsockError::CloneDescriptor)?,1459&ex,1460)1461.expect("Failed to set up the tx queue 
event");1462let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);1463let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send, stop_rx);1464let tx_handler = tx_handler.fuse();1465pin_mut!(tx_handler);14661467let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);1468let packet_handler =1469self.process_tx_packets(&rx_queue_arc, rx_queue_evt, recv, &ex, stop_rx);1470let packet_handler = packet_handler.fuse();1471pin_mut!(packet_handler);14721473let event_evt_async = EventAsync::new(1474event_queue1475.event()1476.try_clone()1477.map_err(VsockError::CloneDescriptor)?,1478&ex,1479)1480.expect("Failed to set up the event queue event");1481let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);1482let event_handler = self.process_event_queue(1483event_queue,1484event_evt_async,1485stop_rx,1486vsock_event_receiver,1487);1488let event_handler = event_handler.fuse();1489pin_mut!(event_handler);14901491let kill_evt = EventAsync::new(kill_evt, &ex).expect("Failed to set up the kill event");1492let kill_handler = kill_evt.next_val();1493pin_mut!(kill_handler);14941495let mut device_event_queue_tx = self.device_event_queue_tx.clone();1496if self.send_protocol_reset {1497ex.run_until(async move { device_event_queue_tx.send(1498virtio_vsock_event {1499id: virtio_sys::virtio_vsock::virtio_vsock_event_id_VIRTIO_VSOCK_EVENT_TRANSPORT_RESET1500.into(),1501}).await1502}).expect("failed to write to empty mpsc queue.");1503}15041505ex.run_until(async {1506select! 
{1507_ = kill_handler.fuse() => (),1508_ = rx_handler => return Err(anyhow!("rx_handler stopped unexpetedly")),1509_ = tx_handler => return Err(anyhow!("tx_handler stop unexpectedly.")),1510_ = packet_handler => return Err(anyhow!("packet_handler stop unexpectedly.")),1511_ = event_handler => return Err(anyhow!("event_handler stop unexpectedly.")),1512}1513// kill_evt has fired15141515for stop_tx in stop_queue_oneshots {1516if stop_tx.send(()).is_err() {1517return Err(anyhow!("failed to request stop for queue future"));1518}1519}15201521rx_handler.await.context("Failed to stop rx handler.")?;1522packet_handler.await;15231524Ok((1525tx_handler.await.context("Failed to stop tx handler.")?,1526event_handler1527.await1528.context("Failed to stop event handler.")?,1529))1530})1531};15321533// At this point, a request to stop this worker has been sent or an error has happened in1534// one of the futures, which will stop this worker anyways.15351536let queues_and_connections = match res {1537Ok(main_future_res) => match main_future_res {1538Ok((tx_queue, event_handler_queue)) => {1539let rx_queue = match Arc::try_unwrap(rx_queue_arc) {1540Ok(queue_rw_lock) => queue_rw_lock.into_inner(),1541Err(_) => panic!("failed to recover queue from worker"),1542};1543let paused_queues = PausedQueues::new(rx_queue, tx_queue, event_handler_queue);1544Some((paused_queues, self.connections))1545}1546Err(e) => {1547error!("Error happened in a vsock future: {}", e);1548None1549}1550},1551Err(e) => {1552error!("error happened in executor: {}", e);1553None1554}1555};15561557Ok(queues_and_connections)1558}1559}15601561/// Queues & events for the vsock device.1562struct VsockQueues {1563rx: Queue,1564tx: Queue,1565event: Queue,1566}15671568impl TryFrom<BTreeMap<usize, Queue>> for VsockQueues {1569type Error = anyhow::Error;1570fn try_from(mut queues: BTreeMap<usize, Queue>) -> result::Result<Self, Self::Error> {1571if queues.len() < 3 {1572anyhow::bail!(1573"{} queues were found, but an 
activated vsock must have at 3 active queues.",1574queues.len()1575);1576}15771578Ok(VsockQueues {1579rx: queues.remove(&0).context("the rx queue is required.")?,1580tx: queues.remove(&1).context("the tx queue is required.")?,1581event: queues.remove(&2).context("the event queue is required.")?,1582})1583}1584}15851586impl From<PausedQueues> for BTreeMap<usize, Queue> {1587fn from(queues: PausedQueues) -> Self {1588let mut ret = BTreeMap::new();1589ret.insert(0, queues.rx_queue);1590ret.insert(1, queues.tx_queue);1591ret.insert(2, queues.event_queue);1592ret1593}1594}15951596struct PausedQueues {1597rx_queue: Queue,1598tx_queue: Queue,1599event_queue: Queue,1600}16011602impl PausedQueues {1603fn new(rx_queue: Queue, tx_queue: Queue, event_queue: Queue) -> Self {1604PausedQueues {1605rx_queue,1606tx_queue,1607event_queue,1608}1609}1610}16111612fn get_pipe_name(guid: &str, pipe: u32) -> String {1613format!("\\\\.\\pipe\\{guid}\\vsock-{pipe}")1614}16151616async fn wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair {1617// This doesn't reset the event since we have to call GetOverlappedResult1618// on the OVERLAPPED struct first before resetting it.1619let _ = evt.get_io_source_ref().wait_for_handle().await;1620pair1621}162216231624