Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/devices/src/virtio/vsock/sys/windows/vsock.rs
5394 views
1
// Copyright 2022 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::cell::RefCell;
6
use std::collections::BTreeMap;
7
use std::collections::HashMap;
8
use std::fmt;
9
use std::fmt::Display;
10
use std::io;
11
use std::io::Read;
12
use std::io::Write;
13
use std::os::windows::io::RawHandle;
14
use std::rc::Rc;
15
use std::result;
16
use std::sync::Arc;
17
use std::thread;
18
19
use anyhow::anyhow;
20
use anyhow::Context;
21
use base::error;
22
use base::info;
23
use base::named_pipes;
24
use base::named_pipes::BlockingMode;
25
use base::named_pipes::FramingMode;
26
use base::named_pipes::OverlappedWrapper;
27
use base::named_pipes::PipeConnection;
28
use base::warn;
29
use base::AsRawDescriptor;
30
use base::Error as SysError;
31
use base::Event;
32
use base::EventExt;
33
use base::WorkerThread;
34
use cros_async::select3;
35
use cros_async::select6;
36
use cros_async::sync::RwLock;
37
use cros_async::AsyncError;
38
use cros_async::EventAsync;
39
use cros_async::Executor;
40
use cros_async::SelectResult;
41
use data_model::Le32;
42
use data_model::Le64;
43
use futures::channel::mpsc;
44
use futures::channel::oneshot;
45
use futures::pin_mut;
46
use futures::select;
47
use futures::select_biased;
48
use futures::stream::FuturesUnordered;
49
use futures::FutureExt;
50
use futures::SinkExt;
51
use futures::StreamExt;
52
use remain::sorted;
53
use serde::Deserialize;
54
use serde::Serialize;
55
use snapshot::AnySnapshot;
56
use thiserror::Error as ThisError;
57
use vm_memory::GuestMemory;
58
use zerocopy::FromBytes;
59
use zerocopy::FromZeros;
60
use zerocopy::IntoBytes;
61
62
use crate::virtio::async_utils;
63
use crate::virtio::copy_config;
64
use crate::virtio::create_stop_oneshot;
65
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_config;
66
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_event;
67
use crate::virtio::vsock::sys::windows::protocol::virtio_vsock_hdr;
68
use crate::virtio::vsock::sys::windows::protocol::vsock_op;
69
use crate::virtio::vsock::sys::windows::protocol::TYPE_STREAM_SOCKET;
70
use crate::virtio::DescriptorChain;
71
use crate::virtio::DeviceType;
72
use crate::virtio::Interrupt;
73
use crate::virtio::Queue;
74
use crate::virtio::StoppedWorker;
75
use crate::virtio::VirtioDevice;
76
use crate::virtio::Writer;
77
use crate::Suspendable;
78
79
/// Errors that the userspace vsock device can produce while servicing its
/// queues and host named-pipe connections.
///
/// Variants must stay in alphabetical order to satisfy `#[sorted]`.
#[sorted]
#[derive(ThisError, Debug)]
pub enum VsockError {
    /// Fetching the next available descriptor chain from a virtqueue failed.
    #[error("Failed to await next descriptor chain from queue: {0}")]
    AwaitQueue(AsyncError),
    /// An `OverlappedWrapper` was missing its event handle.
    #[error("OverlappedWrapper error.")]
    BadOverlappedWrapper,
    /// Cloning an event/descriptor failed.
    #[error("Failed to clone descriptor: {0}")]
    CloneDescriptor(SysError),
    /// Wrapping an event in an `EventAsync` failed.
    #[error("Failed to create EventAsync: {0}")]
    CreateEventAsync(AsyncError),
    #[error("Failed to create wait context: {0}")]
    CreateWaitContext(SysError),
    #[error("Failed to read queue: {0}")]
    ReadQueue(io::Error),
    /// Resetting (un-signaling) an event object failed.
    #[error("Failed to reset event object: {0}")]
    ResetEventObject(SysError),
    #[error("Failed to run executor: {0}")]
    RunExecutor(AsyncError),
    /// Writing guest data to the host named pipe for the given port pair failed.
    #[error("Failed to write to pipe, port {0}: {1}")]
    WriteFailed(PortPair, io::Error),
    #[error("Failed to write queue: {0}")]
    WriteQueue(io::Error),
}
103
/// Result alias for fallible vsock operations in this module.
pub type Result<T> = result::Result<T, VsockError>;

// Vsock has three virt IO queues: rx, tx, and event.
const QUEUE_SIZE: u16 = 256;
const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE, QUEUE_SIZE];

// We overload port numbers so that if message is to be received from
// CONNECTION_EVENT_PORT_NUM (invalid port number), we recognize that a
// new connection was set up.
const CONNECTION_EVENT_PORT_NUM: u32 = u32::MAX;

/// Number of bytes in a kilobyte. Used to simplify and clarify buffer size definitions.
const KB: usize = 1024;

/// Size of the buffer we read into from the host side named pipe. Note that data flows from the
/// host pipe -> this buffer -> rx queue.
/// This should be large enough to facilitate fast transmission of host data, see b/232950349.
const TEMP_READ_BUF_SIZE_BYTES: usize = 4 * KB;

/// In the case where the host side named pipe does not have a specified buffer size, we'll default
/// to telling the guest that this is the amount of extra RX space available (e.g. buf_alloc).
/// This should be larger than the volume of data that the guest will generally send at one time to
/// minimize credit update packets (see MIN_FREE_BUFFER_PCT below).
const DEFAULT_BUF_ALLOC_BYTES: usize = 128 * KB;

/// Minimum free buffer threshold to notify the peer with a credit update
/// message. This is in case we are ingesting many messages without an
/// opportunity to send a message back to the peer with a buffer size update.
/// This value is a percentage of `buf_alloc`.
/// TODO(b/204246759): This value was chosen without much more thought than "it
/// works". It should probably be adjusted, along with DEFAULT_BUF_ALLOC, to a
/// value that makes empirical sense for the packet sizes that we expect to
/// receive.
/// TODO(b/239848326): Replace float with integer, in order to remove risk
/// of losing precision. Ie. change to `10` and perform
/// `FOO * MIN_FREE_BUFFER_PCT / 100`
const MIN_FREE_BUFFER_PCT: f64 = 0.1;

// Number of packets to buffer in the tx processing channels.
const CHANNEL_SIZE: usize = 256;

/// Map from (host, guest) port pair to its active connection, shared between
/// the rx and tx paths via an async `RwLock`.
type VsockConnectionMap = RwLock<HashMap<PortPair, VsockConnection>>;
144
145
/// Virtio vsock device, providing a socket-style communication channel between
/// the guest and the host. On Windows, guest-initiated connections are backed
/// by host named pipes derived from `host_guid`.
/// (The previous doc comment mentioned "entropy"; that was a copy/paste error
/// from the rng device.)
pub struct Vsock {
    // Context ID (CID) addressing the guest.
    guest_cid: u64,
    // GUID used to derive host named pipe names for new connections; when
    // `None`, guest-initiated connection requests are rejected.
    host_guid: Option<String>,
    // Virtio feature bits offered to the guest.
    features: u64,
    // Feature bits the guest driver has acknowledged.
    acked_features: u64,
    // Worker servicing the queues; returns the paused queues and live
    // connections (if any) when stopped.
    worker_thread: Option<WorkerThread<Option<(PausedQueues, VsockConnectionMap)>>>,
    /// Stores any active connections when the device sleeps. This allows us to sleep/wake
    /// without disrupting active connections, which is useful when taking a snapshot.
    sleeping_connections: Option<VsockConnectionMap>,
    /// If true, we should send a TRANSPORT_RESET event to the guest at the next opportunity.
    /// Used to inform the guest all connections are broken when we restore a snapshot.
    needs_transport_reset: bool,
}
159
160
/// Snapshotted state of Vsock. These fields are serialized in order to validate they haven't
/// changed when this device is restored.
#[derive(Serialize, Deserialize)]
struct VsockSnapshot {
    // Guest CID at snapshot time; must match the live device on restore.
    guest_cid: u64,
    // Offered feature bits at snapshot time; must match the live device on restore.
    features: u64,
    // Acked feature bits; copied back into the live device on restore.
    acked_features: u64,
}
168
169
impl Vsock {
170
pub fn new(guest_cid: u64, host_guid: Option<String>, base_features: u64) -> Result<Vsock> {
171
Ok(Vsock {
172
guest_cid,
173
host_guid,
174
features: base_features,
175
acked_features: 0,
176
worker_thread: None,
177
sleeping_connections: None,
178
needs_transport_reset: false,
179
})
180
}
181
182
fn get_config(&self) -> virtio_vsock_config {
183
virtio_vsock_config {
184
guest_cid: Le64::from(self.guest_cid),
185
}
186
}
187
188
fn stop_worker(&mut self) -> StoppedWorker<(PausedQueues, VsockConnectionMap)> {
189
if let Some(worker_thread) = self.worker_thread.take() {
190
if let Some(queues_and_conns) = worker_thread.stop() {
191
StoppedWorker::WithQueues(Box::new(queues_and_conns))
192
} else {
193
StoppedWorker::MissingQueues
194
}
195
} else {
196
StoppedWorker::AlreadyStopped
197
}
198
}
199
200
fn start_worker(
201
&mut self,
202
mem: GuestMemory,
203
mut queues: VsockQueues,
204
existing_connections: Option<VsockConnectionMap>,
205
) -> anyhow::Result<()> {
206
let rx_queue = queues.rx;
207
let tx_queue = queues.tx;
208
let event_queue = queues.event;
209
210
let host_guid = self.host_guid.clone();
211
let guest_cid = self.guest_cid;
212
let needs_transport_reset = self.needs_transport_reset;
213
self.needs_transport_reset = false;
214
self.worker_thread = Some(WorkerThread::start(
215
"userspace_virtio_vsock",
216
move |kill_evt| {
217
let mut worker = Worker::new(
218
mem,
219
host_guid,
220
guest_cid,
221
existing_connections,
222
needs_transport_reset,
223
);
224
let result = worker.run(rx_queue, tx_queue, event_queue, kill_evt);
225
226
match result {
227
Err(e) => {
228
error!("userspace vsock worker thread exited with error: {:?}", e);
229
None
230
}
231
Ok(paused_queues_and_connections_option) => {
232
paused_queues_and_connections_option
233
}
234
}
235
},
236
));
237
238
Ok(())
239
}
240
}
241
242
impl VirtioDevice for Vsock {
243
fn keep_rds(&self) -> Vec<RawHandle> {
244
Vec::new()
245
}
246
247
fn read_config(&self, offset: u64, data: &mut [u8]) {
248
copy_config(data, 0, self.get_config().as_bytes(), offset);
249
}
250
251
fn device_type(&self) -> DeviceType {
252
DeviceType::Vsock
253
}
254
255
fn queue_max_sizes(&self) -> &[u16] {
256
QUEUE_SIZES
257
}
258
259
fn features(&self) -> u64 {
260
self.features
261
}
262
263
fn ack_features(&mut self, value: u64) {
264
self.acked_features |= value;
265
}
266
267
fn activate(
268
&mut self,
269
mem: GuestMemory,
270
_interrupt: Interrupt,
271
mut queues: BTreeMap<usize, Queue>,
272
) -> anyhow::Result<()> {
273
if queues.len() != QUEUE_SIZES.len() {
274
return Err(anyhow!(
275
"Failed to activate vsock device. queues.len(): {} != {}",
276
queues.len(),
277
QUEUE_SIZES.len(),
278
));
279
}
280
281
let vsock_queues = VsockQueues {
282
rx: queues.remove(&0).unwrap(),
283
tx: queues.remove(&1).unwrap(),
284
event: queues.remove(&2).unwrap(),
285
};
286
287
self.start_worker(mem, vsock_queues, None)
288
}
289
290
fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
291
match self.stop_worker() {
292
StoppedWorker::WithQueues(paused_queues_and_conns) => {
293
let (queues, sleeping_connections) = *paused_queues_and_conns;
294
self.sleeping_connections = Some(sleeping_connections);
295
Ok(Some(queues.into()))
296
}
297
StoppedWorker::MissingQueues => {
298
anyhow::bail!("vsock queue workers did not stop cleanly")
299
}
300
StoppedWorker::AlreadyStopped => {
301
// The device isn't in the activated state.
302
Ok(None)
303
}
304
}
305
}
306
307
fn virtio_wake(
308
&mut self,
309
queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
310
) -> anyhow::Result<()> {
311
if let Some((mem, _interrupt, queues)) = queues_state {
312
let connections = self.sleeping_connections.take();
313
self.start_worker(
314
mem,
315
queues
316
.try_into()
317
.expect("Failed to convert queue BTreeMap to VsockQueues"),
318
connections,
319
)?;
320
}
321
Ok(())
322
}
323
324
fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
325
AnySnapshot::to_any(VsockSnapshot {
326
guest_cid: self.guest_cid,
327
features: self.features,
328
acked_features: self.acked_features,
329
})
330
.context("failed to serialize vsock snapshot")
331
}
332
333
fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
334
let vsock_snapshot: VsockSnapshot =
335
AnySnapshot::from_any(data).context("error deserializing vsock snapshot")?;
336
anyhow::ensure!(
337
self.guest_cid == vsock_snapshot.guest_cid,
338
"expected guest_cid to match, but they did not. Live: {}, snapshot: {}",
339
self.guest_cid,
340
vsock_snapshot.guest_cid
341
);
342
anyhow::ensure!(
343
self.features == vsock_snapshot.features,
344
"vsock: expected features to match, but they did not. Live: {}, snapshot: {}",
345
self.features,
346
vsock_snapshot.features
347
);
348
self.acked_features = vsock_snapshot.acked_features;
349
self.needs_transport_reset = true;
350
351
Ok(())
352
}
353
}
354
355
/// Identifies one vsock stream by its two endpoint ports. Used as the key in
/// `VsockConnectionMap`.
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct PortPair {
    // Port number on the host side.
    host: u32,
    // Port number on the guest side.
    guest: u32,
}
360
361
impl Display for PortPair {
362
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
363
write!(f, "(host port: {}, guest port: {})", self.host, self.guest)
364
}
365
}
366
367
impl PortPair {
368
fn from_tx_header(header: &virtio_vsock_hdr) -> PortPair {
369
PortPair {
370
host: header.dst_port.to_native(),
371
guest: header.src_port.to_native(),
372
}
373
}
374
}
375
376
/// Per-connection state for one guest<->host vsock stream, backed by a host
/// named pipe.
///
/// Note: variables herein do not have to be atomic because this struct is guarded
/// by a RwLock.
struct VsockConnection {
    // The guest port.
    guest_port: Le32,

    // The actual named (asynchronous) pipe connection.
    pipe: PipeConnection,
    // The overlapped struct contains an event object for the named pipe.
    // This lets us select() on the pipes by waiting on the events.
    // This is for Reads only.
    overlapped: Box<OverlappedWrapper>,
    // Read buffer for the named pipes. These are needed because reads complete
    // asynchronously.
    buffer: Box<[u8; TEMP_READ_BUF_SIZE_BYTES]>,

    // Total free-running count of received bytes.
    recv_cnt: usize,

    // Total free-running count of received bytes that the peer has been informed of.
    prev_recv_cnt: usize,

    // Total auxiliary buffer space available to receive packets from the driver, not including
    // the virtqueue itself. For us, this is tx buffer on the named pipe into which we drain
    // packets for the connection. Note that if the named pipe has a grow on demand TX buffer,
    // we use DEFAULT_BUF_ALLOC instead.
    buf_alloc: usize,

    // Peer (driver) total free-running count of received bytes.
    peer_recv_cnt: usize,

    // Peer (driver) total rx buffer allocated.
    peer_buf_alloc: usize,

    // Total free-running count of transmitted bytes.
    tx_cnt: usize,

    // State tracking for full buffer condition. Currently just used for logging. If the peer's
    // buffer does not have space for a maximum-sized message (TEMP_READ_BUF_SIZE_BYTES), this
    // gets set to `true`. Once there's enough space in the buffer, this gets unset.
    is_buffer_full: bool,
}
418
419
/// State for the vsock worker, which services the rx/tx/event queues and the
/// host named-pipe connections.
struct Worker {
    // Guest memory backing the virtqueues.
    mem: GuestMemory,
    // GUID used to name host pipes; `None` disables guest-initiated connections.
    host_guid: Option<String>,
    // The guest's CID, used as the destination CID on rx packets.
    guest_cid: u64,
    // Map of host port to a VsockConnection.
    connections: VsockConnectionMap,
    // Signaled when `connections` changes so the rx loop can rebuild the set of
    // pipe events it waits on.
    connection_event: Event,
    // Sender for device events (e.g. transport reset) destined for the event queue.
    device_event_queue_tx: mpsc::Sender<virtio_vsock_event>,
    // Receiver half of the device event channel; presumably taken by the event
    // queue processing task (not visible in this chunk).
    device_event_queue_rx: Option<mpsc::Receiver<virtio_vsock_event>>,
    // If true, emit a transport reset event to the guest when the worker starts
    // (set after a snapshot restore).
    send_protocol_reset: bool,
}
430
431
impl Worker {
432
fn new(
433
mem: GuestMemory,
434
host_guid: Option<String>,
435
guest_cid: u64,
436
existing_connections: Option<VsockConnectionMap>,
437
send_protocol_reset: bool,
438
) -> Worker {
439
// Buffer size here is arbitrary, but must be at least one since we need
440
// to be able to write a reset event to the channel when the device
441
// worker is brought up on a VM restore. Note that we send exactly one
442
// message per VM session, so we should never see these messages backing
443
// up.
444
let (device_event_queue_tx, device_event_queue_rx) = mpsc::channel(4);
445
446
Worker {
447
mem,
448
host_guid,
449
guest_cid,
450
connections: existing_connections.unwrap_or_default(),
451
connection_event: Event::new().unwrap(),
452
device_event_queue_tx,
453
device_event_queue_rx: Some(device_event_queue_rx),
454
send_protocol_reset,
455
}
456
}
457
458
/// Services the rx queue: waits on every connection's overlapped-read event
/// (plus `connection_event`), and for each pipe whose read completed, forwards
/// the data to the guest as a VIRTIO_VSOCK_OP_RW packet and re-arms the
/// overlapped read. Runs until `stop_rx` fires or a fatal event error occurs.
async fn process_rx_queue(
    &self,
    recv_queue: Arc<RwLock<Queue>>,
    mut rx_queue_evt: EventAsync,
    ex: &Executor,
    mut stop_rx: oneshot::Receiver<()>,
) -> Result<()> {
    'connections_changed: loop {
        // Run continuously until exit evt

        // TODO(b/200810561): Optimize this FuturesUnordered code.
        // Set up the EventAsyncs to select on
        let futures = FuturesUnordered::new();
        // This needs to be its own scope since it holds a RwLock on `self.connections`.
        {
            let connections = self.connections.read_lock().await;
            for (port, connection) in connections.iter() {
                let h_evt = connection
                    .overlapped
                    .get_h_event_ref()
                    .ok_or_else(|| {
                        error!("Missing h_event in OverlappedWrapper.");
                        VsockError::BadOverlappedWrapper
                    })
                    .unwrap()
                    .try_clone()
                    .map_err(|e| {
                        error!("Could not clone h_event.");
                        VsockError::CloneDescriptor(e)
                    })?;
                let evt_async = EventAsync::new(h_evt, ex).map_err(|e| {
                    error!("Could not create EventAsync.");
                    VsockError::CreateEventAsync(e)
                })?;
                futures.push(wait_event_and_return_port_pair(evt_async, *port));
            }
        }
        // Also wake when a new connection is registered so the wait set above
        // can be rebuilt to include it.
        let connection_evt_clone = self.connection_event.try_clone().map_err(|e| {
            error!("Could not clone connection_event.");
            VsockError::CloneDescriptor(e)
        })?;
        let connection_evt_async = EventAsync::new(connection_evt_clone, ex).map_err(|e| {
            error!("Could not create EventAsync.");
            VsockError::CreateEventAsync(e)
        })?;
        futures.push(wait_event_and_return_port_pair(
            connection_evt_async,
            PortPair {
                host: CONNECTION_EVENT_PORT_NUM,
                guest: 0,
            },
        ));

        // Wait to service the sockets. Note that for fairness, it is critical that we service
        // all ready sockets in a single wakeup to avoid starvation. This is why ready_chunks
        // is used, as it returns all currently *ready* futures from the stream.
        //
        // The expect here only triggers if the FuturesUnordered stream is exhausted. This never
        // happens because it has at least one item, and we only request chunks once.
        let futures_len = futures.len();
        let mut ready_chunks = futures.ready_chunks(futures_len);
        let ports = select_biased! {
            ports = ready_chunks.next() => {
                ports.expect("failed to wait on vsock sockets")
            }
            _ = stop_rx => {
                break;
            }
        };

        for port in ports {
            if port.host == CONNECTION_EVENT_PORT_NUM {
                // New connection event. Setup futures again.
                if let Err(e) = self.connection_event.reset() {
                    error!("vsock: port: {}: could not reset connection_event.", port);
                    return Err(VsockError::ResetEventObject(e));
                }
                continue 'connections_changed;
            }
            let mut connections = self.connections.lock().await;
            let connection = if let Some(conn) = connections.get_mut(&port) {
                conn
            } else {
                // We could have been scheduled to run the rx queue *before* the connection was
                // closed. In that case, we do nothing. The code which closed the connection
                // (e.g. in response to a message in the tx/rx queues) will handle notifying
                // the guest/host as required.
                continue 'connections_changed;
            };

            // Check if the peer has enough space in their buffer to
            // receive the maximum amount of data that we could possibly
            // read from the host pipe. If not, we should continue to
            // process other sockets as each socket has an independent
            // buffer.
            //
            // NOTE(review): this subtraction can underflow (usize) if the peer
            // ever reports inconsistent credit (tx_cnt - peer_recv_cnt >
            // peer_buf_alloc); presumably the protocol guarantees it cannot —
            // confirm.
            let peer_free_buf_size =
                connection.peer_buf_alloc - (connection.tx_cnt - connection.peer_recv_cnt);
            if peer_free_buf_size < TEMP_READ_BUF_SIZE_BYTES {
                if !connection.is_buffer_full {
                    warn!(
                        "vsock: port {}: Peer has insufficient free buffer space ({} bytes available)",
                        port, peer_free_buf_size
                    );
                    connection.is_buffer_full = true;
                }
                continue;
            } else if connection.is_buffer_full {
                connection.is_buffer_full = false;
            }

            let pipe_connection = &mut connection.pipe;
            let overlapped = &mut connection.overlapped;
            let guest_port = connection.guest_port;
            let buffer = &mut connection.buffer;

            // Un-signal the read event before collecting the result so the next
            // overlapped read can re-signal it.
            match overlapped.get_h_event_ref() {
                Some(h_event) => {
                    if let Err(e) = h_event.reset() {
                        error!(
                            "vsock: port: {}: Could not reset event in OverlappedWrapper.",
                            port
                        );
                        return Err(VsockError::ResetEventObject(e));
                    }
                }
                None => {
                    error!(
                        "vsock: port: {}: missing h_event in OverlappedWrapper.",
                        port
                    );
                    return Err(VsockError::BadOverlappedWrapper);
                }
            }

            let data_size = match pipe_connection.get_overlapped_result(&mut *overlapped) {
                Ok(size) => size as usize,
                Err(e) => {
                    error!("vsock: port {}: Failed to read from pipe {}", port, e);
                    // TODO(b/237278629): Close the connection if we fail to read.
                    continue 'connections_changed;
                }
            };

            // Build the RW packet header carrying our current credit info
            // (buf_alloc / fwd_cnt) back to the guest.
            let response_header = virtio_vsock_hdr {
                src_cid: 2.into(),              // Host CID
                dst_cid: self.guest_cid.into(), // Guest CID
                src_port: Le32::from(port.host),
                dst_port: guest_port,
                len: Le32::from(data_size as u32),
                type_: TYPE_STREAM_SOCKET.into(),
                op: vsock_op::VIRTIO_VSOCK_OP_RW.into(),
                buf_alloc: Le32::from(connection.buf_alloc as u32),
                fwd_cnt: Le32::from(connection.recv_cnt as u32),
                ..Default::default()
            };

            connection.prev_recv_cnt = connection.recv_cnt;

            // We have to only write to the queue once, so we construct a new buffer
            // with the concatenated header and data.
            const HEADER_SIZE: usize = std::mem::size_of::<virtio_vsock_hdr>();
            let data_read = &buffer[..data_size];
            let mut header_and_data = vec![0u8; HEADER_SIZE + data_size];
            header_and_data[..HEADER_SIZE].copy_from_slice(response_header.as_bytes());
            header_and_data[HEADER_SIZE..].copy_from_slice(data_read);
            {
                let mut recv_queue_lock = recv_queue.lock().await;
                let write_fut = self
                    .write_bytes_to_queue(
                        &mut recv_queue_lock,
                        &mut rx_queue_evt,
                        &header_and_data[..],
                    )
                    .fuse();
                pin_mut!(write_fut);
                // If `stop_rx` is fired but the virt queue is full, this loop will break
                // without draining the `header_and_data`.
                select_biased! {
                    write = write_fut => {},
                    _ = stop_rx => {
                        break;
                    }
                }
            }

            connection.tx_cnt += data_size;

            // Start reading again so we receive the message and
            // event signal immediately.

            // SAFETY:
            // Unsafe because the read could happen at any time
            // after this function is called. We ensure safety
            // by allocating the buffer and overlapped struct
            // on the heap.
            unsafe {
                match pipe_connection.read_overlapped(&mut buffer[..], &mut *overlapped) {
                    Ok(()) => {}
                    Err(e) => {
                        error!("vsock: port {}: Failed to read from pipe {}", port, e);
                    }
                }
            }
        }
    }
    Ok(())
}
665
666
async fn process_tx_queue(
667
&self,
668
mut queue: Queue,
669
mut queue_evt: EventAsync,
670
mut process_packets_queue: mpsc::Sender<(virtio_vsock_hdr, Vec<u8>)>,
671
mut stop_rx: oneshot::Receiver<()>,
672
) -> Result<Queue> {
673
loop {
674
// Run continuously until exit evt
675
let mut avail_desc = match queue
676
.next_async_interruptable(&mut queue_evt, &mut stop_rx)
677
.await
678
{
679
Ok(Some(d)) => d,
680
Ok(None) => break,
681
Err(e) => {
682
error!("vsock: Failed to read descriptor {}", e);
683
return Err(VsockError::AwaitQueue(e));
684
}
685
};
686
687
let reader = &mut avail_desc.reader;
688
while reader.available_bytes() >= std::mem::size_of::<virtio_vsock_hdr>() {
689
let header = match reader.read_obj::<virtio_vsock_hdr>() {
690
Ok(hdr) => hdr,
691
Err(e) => {
692
error!("vsock: Error while reading header: {}", e);
693
break;
694
}
695
};
696
697
let len = header.len.to_native() as usize;
698
if reader.available_bytes() < len {
699
error!("vsock: Error reading packet data");
700
break;
701
}
702
703
let mut data = vec![0_u8; len];
704
if len > 0 {
705
if let Err(e) = reader.read_exact(&mut data) {
706
error!("vosck: failed to read data from tx packet: {:?}", e);
707
}
708
}
709
710
if let Err(e) = process_packets_queue.send((header, data)).await {
711
error!(
712
"Error while sending packet to queue, dropping packet: {:?}",
713
e
714
)
715
};
716
}
717
718
queue.add_used(avail_desc);
719
queue.trigger_interrupt();
720
}
721
722
Ok(queue)
723
}
724
725
fn calculate_buf_alloc_from_pipe(pipe: &PipeConnection, port: PortPair) -> usize {
726
match pipe.get_info() {
727
Ok(info) => {
728
if info.outgoing_buffer_size > 0 {
729
info.outgoing_buffer_size as usize
730
} else {
731
info!(
732
"vsock: port {}: using default extra rx buffer size \
733
(named pipe does not have an explicit buffer size)",
734
port
735
);
736
737
// A zero buffer size implies that the buffer grows as
738
// needed. We set our own cap here for flow control
739
// purposes.
740
DEFAULT_BUF_ALLOC_BYTES
741
}
742
}
743
Err(e) => {
744
error!(
745
"vsock: port {}: failed to get named pipe info, using default \
746
buf size. Error: {}",
747
port, e
748
);
749
DEFAULT_BUF_ALLOC_BYTES
750
}
751
}
752
}
753
754
/// Processes a connection request and returns whether to return a response (true), or reset
/// (false).
///
/// On success the new `VsockConnection` (with an overlapped read already
/// started) is inserted into `self.connections`, and `self.connection_event`
/// is signaled so the rx loop rebuilds its wait set.
async fn handle_vsock_connection_request(&self, header: virtio_vsock_hdr) -> bool {
    let port = PortPair::from_tx_header(&header);
    info!("vsock: port {}: Received connection request", port);

    if self.connections.read_lock().await.contains_key(&port) {
        // Connection exists, nothing for us to do.
        warn!(
            "vsock: port {}: accepting connection request on already connected port",
            port
        );
        return true;
    }

    if self.host_guid.is_none() {
        error!(
            "vsock: port {}: Cannot accept guest-initiated connections \
                without host-guid, rejecting connection",
            port,
        );
        return false;
    }

    // We have a new connection to establish.
    let mut overlapped_wrapper =
        Box::new(OverlappedWrapper::new(/* include_event= */ true).unwrap());
    // The host side is expected to have already created a server pipe with
    // this name; otherwise the connect fails and we reject below.
    let pipe_result = named_pipes::create_client_pipe(
        get_pipe_name(
            self.host_guid.as_ref().unwrap(),
            header.dst_port.to_native(),
        )
        .as_str(),
        &FramingMode::Byte,
        &BlockingMode::Wait,
        true, /* overlapped */
    );

    match pipe_result {
        Ok(mut pipe_connection) => {
            let mut buffer = Box::new([0u8; TEMP_READ_BUF_SIZE_BYTES]);
            info!("vsock: port {}: created client pipe", port);

            // SAFETY:
            // Unsafe because the read could happen at any time
            // after this function is called. We ensure safety
            // by allocating the buffer and overlapped struct
            // on the heap.
            unsafe {
                match pipe_connection.read_overlapped(&mut buffer[..], &mut overlapped_wrapper)
                {
                    Ok(()) => {}
                    Err(e) => {
                        error!("vsock: port {}: Failed to read from pipe {}", port, e);
                        return false;
                    }
                }
            }
            info!("vsock: port {}: started read on client pipe", port);

            let buf_alloc = Self::calculate_buf_alloc_from_pipe(&pipe_connection, port);
            let connection = VsockConnection {
                guest_port: header.src_port,
                pipe: pipe_connection,
                overlapped: overlapped_wrapper,
                // Seed credit accounting from the request header.
                peer_buf_alloc: header.buf_alloc.to_native() as usize,
                peer_recv_cnt: header.fwd_cnt.to_native() as usize,
                buf_alloc,
                buffer,
                // The connection has just been made, so we haven't received
                // anything yet.
                recv_cnt: 0_usize,
                prev_recv_cnt: 0_usize,
                tx_cnt: 0_usize,
                is_buffer_full: false,
            };
            self.connections.lock().await.insert(port, connection);
            self.connection_event.signal().unwrap_or_else(|_| {
                panic!("Failed to signal new connection event for vsock port {port}.")
            });
            info!("vsock: port {}: signaled connection ready", port);
            true
        }
        Err(e) => {
            info!(
                "vsock: No waiting pipe connection on port {}, \
                    not connecting (err: {:?})",
                port, e
            );
            false
        }
    }
}
847
848
/// Writes a guest tx payload to the named pipe backing `header`'s connection,
/// waits asynchronously for the overlapped write to finish, and updates the
/// connection's credit accounting (`peer_*` counters from the header,
/// `recv_cnt` from the bytes written).
///
/// Packets for unknown ports are dropped (logged, `Ok(())`); pipe failures
/// surface as `VsockError::WriteFailed`.
async fn handle_vsock_guest_data(
    &self,
    header: virtio_vsock_hdr,
    data: &[u8],
    ex: &Executor,
) -> Result<()> {
    let port = PortPair::from_tx_header(&header);
    let mut overlapped_wrapper = OverlappedWrapper::new(/* include_event= */ true).unwrap();
    // Scope the connections lock: it must be released while we await the
    // write completion below.
    {
        let mut connections = self.connections.lock().await;
        if let Some(connection) = connections.get_mut(&port) {
            // Update peer buffer/recv counters
            connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
            connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;

            let pipe = &mut connection.pipe;
            // We have to provide a OVERLAPPED struct to write to the pipe.
            //
            // SAFETY: safe because data & overlapped_wrapper live until the
            // overlapped operation completes (we wait on completion below).
            if let Err(e) = unsafe { pipe.write_overlapped(data, &mut overlapped_wrapper) } {
                return Err(VsockError::WriteFailed(port, e));
            }
        } else {
            error!(
                "vsock: Guest attempted to send data packet over unconnected \
                    port ({}), dropping packet",
                port
            );
            return Ok(());
        }
    }
    if let Some(write_completed_event) = overlapped_wrapper.get_h_event_ref() {
        // Don't block the executor while the write completes. This time should
        // always be negligible, but will sometimes be non-zero in cases where
        // traffic is high on the NamedPipe, especially a duplex pipe.
        if let Ok(cloned_event) = write_completed_event.try_clone() {
            if let Ok(async_event) = EventAsync::new_without_reset(cloned_event, ex) {
                let _ = async_event.next_val().await;
            } else {
                error!(
                    "vsock: port {}: Failed to convert write event to async",
                    port
                );
            }
        } else {
            error!(
                "vsock: port {}: Failed to clone write completion event",
                port
            );
        }
    } else {
        error!(
            "vsock: port: {}: Failed to get overlapped event for write",
            port
        );
    }

    // Re-acquire the lock to collect the write result. The connection may
    // have been removed while we were waiting.
    let mut connections = self.connections.lock().await;
    if let Some(connection) = connections.get_mut(&port) {
        let pipe = &mut connection.pipe;
        match pipe.get_overlapped_result(&mut overlapped_wrapper) {
            Ok(len) => {
                // We've received bytes from the guest, account for them in our
                // received bytes counter.
                connection.recv_cnt += len as usize;

                if len != data.len() as u32 {
                    return Err(VsockError::WriteFailed(
                        port,
                        std::io::Error::other(format!(
                            "port {} failed to write correct number of bytes:
                                (expected: {}, wrote: {})",
                            port,
                            data.len(),
                            len
                        )),
                    ));
                }
            }
            Err(e) => {
                return Err(VsockError::WriteFailed(port, e));
            }
        }
    } else {
        error!(
            "vsock: Guest attempted to send data packet over unconnected \
                port ({}), dropping packet",
            port
        );
    }
    Ok(())
}
941
942
/// Fans incoming tx packets out to per-port processing tasks. The first packet
/// seen for a port creates a bounded channel plus a
/// `process_tx_packets_for_port` future; when that future completes (the
/// connection closed), the port's channel is removed. Exits when the incoming
/// packet channel closes or `stop_rx` fires.
async fn process_tx_packets(
    &self,
    send_queue: &Arc<RwLock<Queue>>,
    rx_queue_evt: Event,
    mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
    ex: &Executor,
    mut stop_rx: oneshot::Receiver<()>,
) {
    // Per-port senders feeding the per-port futures below.
    let mut packet_queues = HashMap::new();
    let mut futures = FuturesUnordered::new();
    // Push a pending future that will never complete into FuturesUnordered.
    // This will keep us from spinning on spurious notifications when we
    // don't have any open connections.
    futures.push(std::future::pending::<PortPair>().left_future());

    let mut stop_future = FuturesUnordered::new();
    stop_future.push(stop_rx);
    loop {
        let (new_packet, connection, stop_rx_res) =
            select3(packet_recv_queue.next(), futures.next(), stop_future.next()).await;
        match connection {
            SelectResult::Finished(Some(port)) => {
                // A per-port future finished: that connection is closed.
                packet_queues.remove(&port);
            }
            SelectResult::Finished(_) => {
                // This is only triggered when FuturesUnordered completes
                // all pending futures. Right now, this can never happen, as
                // we have a pending future that we push that will never
                // complete.
            }
            SelectResult::Pending(_) => {
                // Nothing to do.
            }
        };
        match new_packet {
            SelectResult::Finished(Some(packet)) => {
                let port = PortPair::from_tx_header(&packet.0);
                let queue = packet_queues.entry(port).or_insert_with(|| {
                    let (send, recv) = mpsc::channel(CHANNEL_SIZE);
                    let event_async = EventAsync::new(
                        rx_queue_evt.try_clone().expect("Failed to clone event"),
                        ex,
                    )
                    .expect("Failed to set up the rx queue event");
                    futures.push(
                        self.process_tx_packets_for_port(
                            port,
                            recv,
                            send_queue,
                            event_async,
                            ex,
                        )
                        .right_future(),
                    );
                    send
                });
                // Try to send the packet. Do not block other ports if the queue is full.
                if let Err(e) = queue.try_send(packet) {
                    error!(
                        "vsock: port {}: error sending packet to queue, dropping packet: {:?}",
                        port, e
                    )
                }
            }
            SelectResult::Finished(_) => {
                // Triggers when the channel is closed; no more packets coming.
                packet_recv_queue.close();
                return;
            }
            SelectResult::Pending(_) => {
                // Nothing to do.
            }
        }
        match stop_rx_res {
            SelectResult::Finished(_) => {
                break;
            }
            SelectResult::Pending(_) => {
                // Nothing to do.
            }
        }
    }
}
1025
1026
async fn process_tx_packets_for_port(
1027
&self,
1028
port: PortPair,
1029
mut packet_recv_queue: mpsc::Receiver<(virtio_vsock_hdr, Vec<u8>)>,
1030
send_queue: &Arc<RwLock<Queue>>,
1031
mut rx_queue_evt: EventAsync,
1032
ex: &Executor,
1033
) -> PortPair {
1034
while let Some((header, data)) = packet_recv_queue.next().await {
1035
if !self
1036
.handle_tx_packet(header, &data, send_queue, &mut rx_queue_evt, ex)
1037
.await
1038
{
1039
packet_recv_queue.close();
1040
if let Ok(Some(_)) = packet_recv_queue.try_next() {
1041
warn!("vsock: closing port {} with unprocessed packets", port);
1042
} else {
1043
info!("vsock: closing port {} cleanly", port)
1044
}
1045
break;
1046
}
1047
}
1048
port
1049
}
1050
1051
/// Processes one TX packet from the guest, dispatching on the vsock `op`
/// field in its header.
///
/// Returns `true` if the connection remains open afterwards, `false` if it
/// was closed (shutdown/reset) or should be treated as closed (errors,
/// unknown port).
async fn handle_tx_packet(
    &self,
    header: virtio_vsock_hdr,
    data: &[u8],
    send_queue: &Arc<RwLock<Queue>>,
    rx_queue_evt: &mut EventAsync,
    ex: &Executor,
) -> bool {
    let mut is_open = true;
    let port = PortPair::from_tx_header(&header);
    match header.op.to_native() {
        vsock_op::VIRTIO_VSOCK_OP_INVALID => {
            error!("vsock: Invalid Operation requested, dropping packet");
        }
        // Guest-initiated connection: attempt the connect, then reply with
        // either RESPONSE (plus our buffer credit) or RST.
        vsock_op::VIRTIO_VSOCK_OP_REQUEST => {
            let (resp_op, buf_alloc, fwd_cnt) =
                if self.handle_vsock_connection_request(header).await {
                    let connections = self.connections.read_lock().await;

                    connections.get(&port).map_or_else(
                        || {
                            warn!("vsock: port: {} connection closed during connect", port);
                            is_open = false;
                            (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
                        },
                        |conn| {
                            (
                                vsock_op::VIRTIO_VSOCK_OP_RESPONSE,
                                conn.buf_alloc as u32,
                                conn.recv_cnt as u32,
                            )
                        },
                    )
                } else {
                    is_open = false;
                    (vsock_op::VIRTIO_VSOCK_OP_RST, 0, 0)
                };

            // Reply headers swap src/dst so the packet flows back to the guest.
            let response_header = virtio_vsock_hdr {
                src_cid: { header.dst_cid },
                dst_cid: { header.src_cid },
                src_port: { header.dst_port },
                dst_port: { header.src_port },
                len: 0.into(),
                type_: TYPE_STREAM_SOCKET.into(),
                op: resp_op.into(),
                buf_alloc: Le32::from(buf_alloc),
                fwd_cnt: Le32::from(fwd_cnt),
                ..Default::default()
            };
            // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
            // bytes.
            self.write_bytes_to_queue(
                &mut *send_queue.lock().await,
                rx_queue_evt,
                response_header.as_bytes(),
            )
            .await
            .expect("vsock: failed to write to queue");
            info!(
                "vsock: port {}: replied {} to connection request",
                port,
                if resp_op == vsock_op::VIRTIO_VSOCK_OP_RESPONSE {
                    "success"
                } else {
                    "reset"
                },
            );
        }
        vsock_op::VIRTIO_VSOCK_OP_RESPONSE => {
            // TODO(b/237811512): Implement this for host-initiated connections
        }
        vsock_op::VIRTIO_VSOCK_OP_RST => {
            // TODO(b/237811512): Implement this for host-initiated connections
        }
        vsock_op::VIRTIO_VSOCK_OP_SHUTDOWN => {
            // While the header provides flags to specify tx/rx-specific shutdown,
            // we only support full shutdown.
            // TODO(b/237811512): Provide an optimal way to notify host of shutdowns
            // while still maintaining easy reconnections.
            let mut connections = self.connections.lock().await;
            if connections.remove(&port).is_some() {
                let mut response = virtio_vsock_hdr {
                    src_cid: { header.dst_cid },
                    dst_cid: { header.src_cid },
                    src_port: { header.dst_port },
                    dst_port: { header.src_port },
                    len: 0.into(),
                    type_: TYPE_STREAM_SOCKET.into(),
                    op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
                    // There is no buffer on a closed connection
                    buf_alloc: 0.into(),
                    // There is no fwd_cnt anymore on a closed connection
                    fwd_cnt: 0.into(),
                    ..Default::default()
                };
                // Safe because virtio_vsock_hdr is a simple data struct and converts cleanly to
                // bytes
                self.write_bytes_to_queue(
                    &mut *send_queue.lock().await,
                    rx_queue_evt,
                    response.as_mut_bytes(),
                )
                .await
                .expect("vsock: failed to write to queue");
                self.connection_event
                    .signal()
                    .expect("vsock: failed to write to event");
                info!("vsock: port: {}: disconnected by the guest", port);
            } else {
                error!("vsock: Attempted to close unopened port: {}", port);
            }
            is_open = false;
        }
        // Guest payload data for an established connection.
        vsock_op::VIRTIO_VSOCK_OP_RW => {
            match self.handle_vsock_guest_data(header, data, ex).await {
                Ok(()) => {
                    if self
                        .check_free_buffer_threshold(header)
                        .await
                        .unwrap_or(false)
                    {
                        // Send a credit update if we're below the minimum free
                        // buffer size. We skip this if the connection is closed,
                        // which could've happened if we were closed on the other
                        // end.
                        info!(
                            "vsock: port {}: Buffer below threshold; sending credit update.",
                            port
                        );
                        self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
                            .await;
                    }
                }
                Err(e) => {
                    error!("vsock: port {}: resetting connection: {}", port, e);
                    self.send_vsock_reset(send_queue, rx_queue_evt, header)
                        .await;
                    is_open = false;
                }
            }
        }
        // An update from our peer with their buffer state, which they are sending
        // (probably) due to a credit request *we* made.
        vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE => {
            let port = PortPair::from_tx_header(&header);
            let mut connections = self.connections.lock().await;
            if let Some(connection) = connections.get_mut(&port) {
                connection.peer_recv_cnt = header.fwd_cnt.to_native() as usize;
                connection.peer_buf_alloc = header.buf_alloc.to_native() as usize;
            } else {
                error!("vsock: port {}: got credit update on unknown port", port);
                is_open = false;
            }
        }
        // A request from our peer to get *our* buffer state. We reply to the RX queue.
        vsock_op::VIRTIO_VSOCK_OP_CREDIT_REQUEST => {
            info!(
                "vsock: port {}: Got credit request from peer; sending credit update.",
                port,
            );
            self.send_vsock_credit_update(send_queue, rx_queue_evt, header)
                .await;
        }
        _ => {
            error!(
                "vsock: port {}: unknown operation requested, dropping packet",
                port
            );
        }
    }
    is_open
}
1224
1225
// Checks if how much free buffer our peer thinks that *we* have available
1226
// is below our threshold percentage. If the connection is closed, returns `None`.
1227
async fn check_free_buffer_threshold(&self, header: virtio_vsock_hdr) -> Option<bool> {
1228
let mut connections = self.connections.lock().await;
1229
let port = PortPair::from_tx_header(&header);
1230
connections.get_mut(&port).map(|connection| {
1231
let threshold: usize = (MIN_FREE_BUFFER_PCT * connection.buf_alloc as f64) as usize;
1232
connection.buf_alloc - (connection.recv_cnt - connection.prev_recv_cnt) < threshold
1233
})
1234
}
1235
1236
async fn send_vsock_credit_update(
1237
&self,
1238
send_queue: &Arc<RwLock<Queue>>,
1239
rx_queue_evt: &mut EventAsync,
1240
header: virtio_vsock_hdr,
1241
) {
1242
let mut connections = self.connections.lock().await;
1243
let port = PortPair::from_tx_header(&header);
1244
1245
if let Some(connection) = connections.get_mut(&port) {
1246
let mut response = virtio_vsock_hdr {
1247
src_cid: { header.dst_cid },
1248
dst_cid: { header.src_cid },
1249
src_port: { header.dst_port },
1250
dst_port: { header.src_port },
1251
len: 0.into(),
1252
type_: TYPE_STREAM_SOCKET.into(),
1253
op: vsock_op::VIRTIO_VSOCK_OP_CREDIT_UPDATE.into(),
1254
buf_alloc: Le32::from(connection.buf_alloc as u32),
1255
fwd_cnt: Le32::from(connection.recv_cnt as u32),
1256
..Default::default()
1257
};
1258
1259
connection.prev_recv_cnt = connection.recv_cnt;
1260
1261
// Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1262
// to bytes
1263
self.write_bytes_to_queue(
1264
&mut *send_queue.lock().await,
1265
rx_queue_evt,
1266
response.as_mut_bytes(),
1267
)
1268
.await
1269
.unwrap_or_else(|_| panic!("vsock: port {port}: failed to write to queue"));
1270
} else {
1271
error!(
1272
"vsock: port {}: error sending credit update on unknown port",
1273
port
1274
);
1275
}
1276
}
1277
1278
async fn send_vsock_reset(
1279
&self,
1280
send_queue: &Arc<RwLock<Queue>>,
1281
rx_queue_evt: &mut EventAsync,
1282
header: virtio_vsock_hdr,
1283
) {
1284
let mut connections = self.connections.lock().await;
1285
let port = PortPair::from_tx_header(&header);
1286
if let Some(connection) = connections.remove(&port) {
1287
let mut response = virtio_vsock_hdr {
1288
src_cid: { header.dst_cid },
1289
dst_cid: { header.src_cid },
1290
src_port: { header.dst_port },
1291
dst_port: { header.src_port },
1292
len: 0.into(),
1293
type_: TYPE_STREAM_SOCKET.into(),
1294
op: vsock_op::VIRTIO_VSOCK_OP_RST.into(),
1295
buf_alloc: Le32::from(connection.buf_alloc as u32),
1296
fwd_cnt: Le32::from(connection.recv_cnt as u32),
1297
..Default::default()
1298
};
1299
1300
// Safe because virtio_vsock_hdr is a simple data struct and converts cleanly
1301
// to bytes
1302
self.write_bytes_to_queue(
1303
&mut *send_queue.lock().await,
1304
rx_queue_evt,
1305
response.as_mut_bytes(),
1306
)
1307
.await
1308
.expect("failed to write to queue");
1309
} else {
1310
error!("vsock: port {}: error closing unknown port", port);
1311
}
1312
}
1313
1314
async fn write_bytes_to_queue(
1315
&self,
1316
queue: &mut Queue,
1317
queue_evt: &mut EventAsync,
1318
bytes: &[u8],
1319
) -> Result<()> {
1320
let mut avail_desc = match queue.next_async(queue_evt).await {
1321
Ok(d) => d,
1322
Err(e) => {
1323
error!("vsock: failed to read descriptor {}", e);
1324
return Err(VsockError::AwaitQueue(e));
1325
}
1326
};
1327
self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
1328
}
1329
1330
async fn write_bytes_to_queue_interruptable(
1331
&self,
1332
queue: &mut Queue,
1333
queue_evt: &mut EventAsync,
1334
bytes: &[u8],
1335
mut stop_rx: &mut oneshot::Receiver<()>,
1336
) -> Result<()> {
1337
let mut avail_desc = match queue.next_async_interruptable(queue_evt, stop_rx).await {
1338
Ok(d) => match d {
1339
Some(desc) => desc,
1340
None => return Ok(()),
1341
},
1342
Err(e) => {
1343
error!("vsock: failed to read descriptor {}", e);
1344
return Err(VsockError::AwaitQueue(e));
1345
}
1346
};
1347
self.write_bytes_to_queue_inner(queue, avail_desc, bytes)
1348
}
1349
1350
fn write_bytes_to_queue_inner(
1351
&self,
1352
queue: &mut Queue,
1353
mut desc_chain: DescriptorChain,
1354
bytes: &[u8],
1355
) -> Result<()> {
1356
let writer = &mut desc_chain.writer;
1357
let res = writer.write_all(bytes);
1358
1359
if let Err(e) = res {
1360
error!(
1361
"vsock: failed to write {} bytes to queue, err: {:?}",
1362
bytes.len(),
1363
e
1364
);
1365
return Err(VsockError::WriteQueue(e));
1366
}
1367
1368
if writer.bytes_written() > 0 {
1369
queue.add_used(desc_chain);
1370
queue.trigger_interrupt();
1371
Ok(())
1372
} else {
1373
error!("vsock: failed to write bytes to queue");
1374
Err(VsockError::WriteQueue(std::io::Error::other(
1375
"failed to write bytes to queue",
1376
)))
1377
}
1378
}
1379
1380
/// Forwards device events arriving on `vsock_event_receiver` into the
/// guest's event virtqueue.
///
/// Stops — returning the queue for pause/resume — when `stop_rx` fires or
/// the event channel is closed.
async fn process_event_queue(
    &self,
    mut queue: Queue,
    mut queue_evt: EventAsync,
    mut stop_rx: oneshot::Receiver<()>,
    mut vsock_event_receiver: mpsc::Receiver<virtio_vsock_event>,
) -> Result<Queue> {
    loop {
        // Biased select: prefer delivering a pending event over stopping.
        let vsock_event = select_biased! {
            vsock_event = vsock_event_receiver.next() => {
                vsock_event
            }
            _ = stop_rx => {
                break;
            }
        };
        // None means the sender side of the event channel was dropped.
        let vsock_event = match vsock_event {
            Some(event) => event,
            None => break,
        };
        // The wait for a queue descriptor is also interruptable by stop_rx
        // so shutdown isn't blocked on the guest posting buffers.
        self.write_bytes_to_queue_interruptable(
            &mut queue,
            &mut queue_evt,
            vsock_event.as_bytes(),
            &mut stop_rx,
        )
        .await?;
    }
    Ok(queue)
}
1410
1411
fn run(
1412
mut self,
1413
rx_queue: Queue,
1414
tx_queue: Queue,
1415
event_queue: Queue,
1416
kill_evt: Event,
1417
) -> Result<Option<(PausedQueues, VsockConnectionMap)>> {
1418
let rx_queue_evt = rx_queue
1419
.event()
1420
.try_clone()
1421
.map_err(VsockError::CloneDescriptor)?;
1422
1423
// Note that this mutex won't ever be contended because the HandleExecutor is single
1424
// threaded. We need the mutex for compile time correctness, but technically it is not
1425
// actually providing mandatory locking, at least not at the moment. If we later use a
1426
// multi-threaded executor, then this lock will be important.
1427
let rx_queue_arc = Arc::new(RwLock::new(rx_queue));
1428
1429
// Run executor / create futures in a scope, preventing extra reference to `rx_queue_arc`.
1430
let res = {
1431
let ex = Executor::new().unwrap();
1432
1433
let rx_evt_async = EventAsync::new(
1434
rx_queue_evt
1435
.try_clone()
1436
.map_err(VsockError::CloneDescriptor)?,
1437
&ex,
1438
)
1439
.expect("Failed to set up the rx queue event");
1440
let mut stop_queue_oneshots = Vec::new();
1441
1442
let vsock_event_receiver = self
1443
.device_event_queue_rx
1444
.take()
1445
.expect("event queue rx must be present");
1446
1447
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1448
let rx_handler =
1449
self.process_rx_queue(rx_queue_arc.clone(), rx_evt_async, &ex, stop_rx);
1450
let rx_handler = rx_handler.fuse();
1451
pin_mut!(rx_handler);
1452
1453
let (send, recv) = mpsc::channel(CHANNEL_SIZE);
1454
1455
let tx_evt_async = EventAsync::new(
1456
tx_queue
1457
.event()
1458
.try_clone()
1459
.map_err(VsockError::CloneDescriptor)?,
1460
&ex,
1461
)
1462
.expect("Failed to set up the tx queue event");
1463
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1464
let tx_handler = self.process_tx_queue(tx_queue, tx_evt_async, send, stop_rx);
1465
let tx_handler = tx_handler.fuse();
1466
pin_mut!(tx_handler);
1467
1468
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1469
let packet_handler =
1470
self.process_tx_packets(&rx_queue_arc, rx_queue_evt, recv, &ex, stop_rx);
1471
let packet_handler = packet_handler.fuse();
1472
pin_mut!(packet_handler);
1473
1474
let event_evt_async = EventAsync::new(
1475
event_queue
1476
.event()
1477
.try_clone()
1478
.map_err(VsockError::CloneDescriptor)?,
1479
&ex,
1480
)
1481
.expect("Failed to set up the event queue event");
1482
let stop_rx = create_stop_oneshot(&mut stop_queue_oneshots);
1483
let event_handler = self.process_event_queue(
1484
event_queue,
1485
event_evt_async,
1486
stop_rx,
1487
vsock_event_receiver,
1488
);
1489
let event_handler = event_handler.fuse();
1490
pin_mut!(event_handler);
1491
1492
let kill_evt = EventAsync::new(kill_evt, &ex).expect("Failed to set up the kill event");
1493
let kill_handler = kill_evt.next_val();
1494
pin_mut!(kill_handler);
1495
1496
let mut device_event_queue_tx = self.device_event_queue_tx.clone();
1497
if self.send_protocol_reset {
1498
ex.run_until(async move { device_event_queue_tx.send(
1499
virtio_vsock_event {
1500
id: virtio_sys::virtio_vsock::virtio_vsock_event_id_VIRTIO_VSOCK_EVENT_TRANSPORT_RESET
1501
.into(),
1502
}).await
1503
}).expect("failed to write to empty mpsc queue.");
1504
}
1505
1506
ex.run_until(async {
1507
select! {
1508
_ = kill_handler.fuse() => (),
1509
_ = rx_handler => return Err(anyhow!("rx_handler stopped unexpetedly")),
1510
_ = tx_handler => return Err(anyhow!("tx_handler stop unexpectedly.")),
1511
_ = packet_handler => return Err(anyhow!("packet_handler stop unexpectedly.")),
1512
_ = event_handler => return Err(anyhow!("event_handler stop unexpectedly.")),
1513
}
1514
// kill_evt has fired
1515
1516
for stop_tx in stop_queue_oneshots {
1517
if stop_tx.send(()).is_err() {
1518
return Err(anyhow!("failed to request stop for queue future"));
1519
}
1520
}
1521
1522
rx_handler.await.context("Failed to stop rx handler.")?;
1523
packet_handler.await;
1524
1525
Ok((
1526
tx_handler.await.context("Failed to stop tx handler.")?,
1527
event_handler
1528
.await
1529
.context("Failed to stop event handler.")?,
1530
))
1531
})
1532
};
1533
1534
// At this point, a request to stop this worker has been sent or an error has happened in
1535
// one of the futures, which will stop this worker anyways.
1536
1537
let queues_and_connections = match res {
1538
Ok(main_future_res) => match main_future_res {
1539
Ok((tx_queue, event_handler_queue)) => {
1540
let rx_queue = match Arc::try_unwrap(rx_queue_arc) {
1541
Ok(queue_rw_lock) => queue_rw_lock.into_inner(),
1542
Err(_) => panic!("failed to recover queue from worker"),
1543
};
1544
let paused_queues = PausedQueues::new(rx_queue, tx_queue, event_handler_queue);
1545
Some((paused_queues, self.connections))
1546
}
1547
Err(e) => {
1548
error!("Error happened in a vsock future: {}", e);
1549
None
1550
}
1551
},
1552
Err(e) => {
1553
error!("error happened in executor: {}", e);
1554
None
1555
}
1556
};
1557
1558
Ok(queues_and_connections)
1559
}
1560
}
1561
1562
/// Queues & events for the vsock device.
struct VsockQueues {
    // Queue index 0: device-to-driver traffic.
    rx: Queue,
    // Queue index 1: driver-to-device traffic.
    tx: Queue,
    // Queue index 2: device events (e.g. transport reset).
    event: Queue,
}
1568
1569
impl TryFrom<BTreeMap<usize, Queue>> for VsockQueues {
1570
type Error = anyhow::Error;
1571
fn try_from(mut queues: BTreeMap<usize, Queue>) -> result::Result<Self, Self::Error> {
1572
if queues.len() < 3 {
1573
anyhow::bail!(
1574
"{} queues were found, but an activated vsock must have at 3 active queues.",
1575
queues.len()
1576
);
1577
}
1578
1579
Ok(VsockQueues {
1580
rx: queues.remove(&0).context("the rx queue is required.")?,
1581
tx: queues.remove(&1).context("the tx queue is required.")?,
1582
event: queues.remove(&2).context("the event queue is required.")?,
1583
})
1584
}
1585
}
1586
1587
impl From<PausedQueues> for BTreeMap<usize, Queue> {
    /// Re-keys the paused queues by their virtqueue index
    /// (0 = rx, 1 = tx, 2 = event) for device re-activation.
    fn from(queues: PausedQueues) -> Self {
        BTreeMap::from([
            (0, queues.rx_queue),
            (1, queues.tx_queue),
            (2, queues.event_queue),
        ])
    }
}
1596
1597
/// The rx/tx/event virtqueues handed back by a stopped worker so the device
/// can be re-activated later.
struct PausedQueues {
    rx_queue: Queue,
    tx_queue: Queue,
    event_queue: Queue,
}
1602
1603
impl PausedQueues {
1604
fn new(rx_queue: Queue, tx_queue: Queue, event_queue: Queue) -> Self {
1605
PausedQueues {
1606
rx_queue,
1607
tx_queue,
1608
event_queue,
1609
}
1610
}
1611
}
1612
1613
/// Builds the Windows named-pipe path for a vsock port:
/// `\\.\pipe\<guid>\vsock-<pipe>`.
fn get_pipe_name(guid: &str, pipe: u32) -> String {
    // Raw string avoids doubling every backslash; output is identical.
    format!(r"\\.\pipe\{guid}\vsock-{pipe}")
}
1616
1617
/// Waits for `evt` to be signaled, then resolves to `pair` so the caller can
/// tell which connection's event completed.
async fn wait_event_and_return_port_pair(evt: EventAsync, pair: PortPair) -> PortPair {
    // This doesn't reset the event since we have to call GetOverlappedResult
    // on the OVERLAPPED struct first before resetting it.
    let _ = evt.get_io_source_ref().wait_for_handle().await;
    pair
}
1623
1624