// NOTE: non-code page chrome from the web scrape was removed and replaced by this
// provenance comment. Source: GitHub repository google/crosvm,
// path: devices/src/virtio/block/asynchronous.rs (blob/main).
1
// Copyright 2021 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::cell::RefCell;
6
use std::collections::BTreeMap;
7
use std::collections::BTreeSet;
8
use std::io;
9
use std::io::Write;
10
use std::mem::size_of;
11
#[cfg(windows)]
12
use std::num::NonZeroU32;
13
use std::rc::Rc;
14
use std::result;
15
use std::sync::atomic::AtomicU64;
16
use std::sync::atomic::Ordering;
17
use std::sync::Arc;
18
use std::time::Duration;
19
20
use anyhow::Context;
21
use base::debug;
22
use base::error;
23
use base::info;
24
use base::warn;
25
use base::AsRawDescriptor;
26
use base::Error as SysError;
27
use base::Event;
28
use base::RawDescriptor;
29
use base::Result as SysResult;
30
use base::Timer;
31
use base::Tube;
32
use base::TubeError;
33
use base::WorkerThread;
34
use cros_async::sync::RwLock as AsyncRwLock;
35
use cros_async::AsyncError;
36
use cros_async::AsyncTube;
37
use cros_async::EventAsync;
38
use cros_async::Executor;
39
use cros_async::ExecutorKind;
40
use cros_async::TimerAsync;
41
use data_model::Le16;
42
use data_model::Le32;
43
use data_model::Le64;
44
use disk::AsyncDisk;
45
use disk::DiskFile;
46
use futures::channel::mpsc;
47
use futures::channel::oneshot;
48
use futures::pin_mut;
49
use futures::stream::FuturesUnordered;
50
use futures::stream::StreamExt;
51
use futures::FutureExt;
52
use remain::sorted;
53
use snapshot::AnySnapshot;
54
use thiserror::Error as ThisError;
55
use virtio_sys::virtio_config::VIRTIO_F_RING_PACKED;
56
use vm_control::DiskControlCommand;
57
use vm_control::DiskControlResult;
58
use vm_memory::GuestMemory;
59
use zerocopy::IntoBytes;
60
61
use crate::virtio::async_utils;
62
use crate::virtio::block::sys::*;
63
use crate::virtio::block::DiskOption;
64
use crate::virtio::copy_config;
65
use crate::virtio::device_constants::block::virtio_blk_config;
66
use crate::virtio::device_constants::block::virtio_blk_discard_write_zeroes;
67
use crate::virtio::device_constants::block::virtio_blk_req_header;
68
use crate::virtio::device_constants::block::VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP;
69
use crate::virtio::device_constants::block::VIRTIO_BLK_F_BLK_SIZE;
70
use crate::virtio::device_constants::block::VIRTIO_BLK_F_DISCARD;
71
use crate::virtio::device_constants::block::VIRTIO_BLK_F_FLUSH;
72
use crate::virtio::device_constants::block::VIRTIO_BLK_F_MQ;
73
use crate::virtio::device_constants::block::VIRTIO_BLK_F_RO;
74
use crate::virtio::device_constants::block::VIRTIO_BLK_F_SEG_MAX;
75
use crate::virtio::device_constants::block::VIRTIO_BLK_F_WRITE_ZEROES;
76
use crate::virtio::device_constants::block::VIRTIO_BLK_S_IOERR;
77
use crate::virtio::device_constants::block::VIRTIO_BLK_S_OK;
78
use crate::virtio::device_constants::block::VIRTIO_BLK_S_UNSUPP;
79
use crate::virtio::device_constants::block::VIRTIO_BLK_T_DISCARD;
80
use crate::virtio::device_constants::block::VIRTIO_BLK_T_FLUSH;
81
use crate::virtio::device_constants::block::VIRTIO_BLK_T_GET_ID;
82
use crate::virtio::device_constants::block::VIRTIO_BLK_T_IN;
83
use crate::virtio::device_constants::block::VIRTIO_BLK_T_OUT;
84
use crate::virtio::device_constants::block::VIRTIO_BLK_T_WRITE_ZEROES;
85
use crate::virtio::DescriptorChain;
86
use crate::virtio::DeviceType;
87
use crate::virtio::Interrupt;
88
use crate::virtio::Queue;
89
use crate::virtio::Reader;
90
use crate::virtio::VirtioDevice;
91
use crate::virtio::Writer;
92
use crate::PciAddress;
93
94
// Default virtqueue size (entries per queue) and number of queues, used when
// the disk options do not specify them.
const DEFAULT_QUEUE_SIZE: u16 = 256;
const DEFAULT_NUM_QUEUES: u16 = 16;

// A virtio-blk sector is always 512 bytes (1 << 9), regardless of the
// device's advertised block size.
const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// Maximum sector count accepted in a single discard / write-zeroes segment,
// advertised to the guest via the config space.
const MAX_DISCARD_SECTORS: u32 = u32::MAX;
const MAX_WRITE_ZEROES_SECTORS: u32 = u32::MAX;
// Arbitrary limits for number of discard/write zeroes segments.
const MAX_DISCARD_SEG: u32 = 32;
const MAX_WRITE_ZEROES_SEG: u32 = 32;
// Hard-coded to 64 KiB (in 512-byte sectors) for now,
// but this should probably be based on cluster size for qcow.
const DISCARD_SECTOR_ALIGNMENT: u32 = 128;
108
109
#[sorted]
110
#[derive(ThisError, Debug)]
111
enum ExecuteError {
112
#[error("failed to copy ID string: {0}")]
113
CopyId(io::Error),
114
#[error("failed to perform discard or write zeroes; sector={sector} num_sectors={num_sectors} flags={flags}; {ioerr:?}")]
115
DiscardWriteZeroes {
116
ioerr: Option<disk::Error>,
117
sector: u64,
118
num_sectors: u32,
119
flags: u32,
120
},
121
#[error("failed to flush: {0}")]
122
Flush(disk::Error),
123
#[error("not enough space in descriptor chain to write status")]
124
MissingStatus,
125
#[error("out of range")]
126
OutOfRange,
127
#[error("failed to read message: {0}")]
128
Read(io::Error),
129
#[error("io error reading {length} bytes from sector {sector}: {desc_error}")]
130
ReadIo {
131
length: usize,
132
sector: u64,
133
desc_error: disk::Error,
134
},
135
#[error("read only; request_type={request_type}")]
136
ReadOnly { request_type: u32 },
137
#[error("failed to recieve command message: {0}")]
138
ReceivingCommand(TubeError),
139
#[error("failed to send command response: {0}")]
140
SendingResponse(TubeError),
141
#[error("couldn't reset the timer: {0}")]
142
TimerReset(base::Error),
143
#[error("too many segments: {0} > {0}")]
144
TooManySegments(usize, usize),
145
#[error("unsupported ({0})")]
146
Unsupported(u32),
147
#[error("io error writing {length} bytes from sector {sector}: {desc_error}")]
148
WriteIo {
149
length: usize,
150
sector: u64,
151
desc_error: disk::Error,
152
},
153
#[error("failed to write request status: {0}")]
154
WriteStatus(io::Error),
155
}
156
157
// Severity at which a failed request is logged; disk I/O failures are logged
// at debug level to avoid flooding the logs (see `ExecuteError::log_level`).
enum LogLevel {
    Debug,
    Error,
}
161
162
impl ExecuteError {
163
fn status(&self) -> u8 {
164
match self {
165
ExecuteError::CopyId(_) => VIRTIO_BLK_S_IOERR,
166
ExecuteError::DiscardWriteZeroes { .. } => VIRTIO_BLK_S_IOERR,
167
ExecuteError::Flush(_) => VIRTIO_BLK_S_IOERR,
168
ExecuteError::MissingStatus => VIRTIO_BLK_S_IOERR,
169
ExecuteError::OutOfRange => VIRTIO_BLK_S_IOERR,
170
ExecuteError::Read(_) => VIRTIO_BLK_S_IOERR,
171
ExecuteError::ReadIo { .. } => VIRTIO_BLK_S_IOERR,
172
ExecuteError::ReadOnly { .. } => VIRTIO_BLK_S_IOERR,
173
ExecuteError::ReceivingCommand(_) => VIRTIO_BLK_S_IOERR,
174
ExecuteError::SendingResponse(_) => VIRTIO_BLK_S_IOERR,
175
ExecuteError::TimerReset(_) => VIRTIO_BLK_S_IOERR,
176
ExecuteError::TooManySegments(_, _) => VIRTIO_BLK_S_IOERR,
177
ExecuteError::WriteIo { .. } => VIRTIO_BLK_S_IOERR,
178
ExecuteError::WriteStatus(_) => VIRTIO_BLK_S_IOERR,
179
ExecuteError::Unsupported(_) => VIRTIO_BLK_S_UNSUPP,
180
}
181
}
182
183
fn log_level(&self) -> LogLevel {
184
match self {
185
// Log disk I/O errors at debug level to avoid flooding the logs.
186
ExecuteError::ReadIo { .. }
187
| ExecuteError::WriteIo { .. }
188
| ExecuteError::Flush { .. }
189
| ExecuteError::DiscardWriteZeroes { .. } => LogLevel::Debug,
190
// Log all other failures as errors.
191
_ => LogLevel::Error,
192
}
193
}
194
}
195
196
/// Errors that happen in block outside of executing a request.
/// This includes errors during resize and flush operations.
#[sorted]
#[derive(ThisError, Debug)]
enum ControlError {
    /// `fdatasync` on the disk image failed during the periodic flush.
    #[error("failed to fdatasync the disk: {0}")]
    FdatasyncDisk(disk::Error),
    /// Waiting on the async flush timer failed.
    #[error("couldn't get a value from a timer for flushing: {0}")]
    FlushTimer(AsyncError),
}
206
207
/// Maximum length of the virtio-block ID string field.
const ID_LEN: usize = 20;

/// Virtio block device identifier.
/// This is an ASCII string terminated by a \0, unless all 20 bytes are used,
/// in which case the \0 terminator is omitted.
/// Returned to the guest in response to VIRTIO_BLK_T_GET_ID requests.
type BlockId = [u8; ID_LEN];
214
215
/// Tracks the state of an asynchronous disk.
struct DiskState {
    // The async disk backing this device; all request I/O goes through it.
    disk_image: Box<dyn AsyncDisk>,
    // When true, write-type requests are rejected (see `execute_request`).
    read_only: bool,
    // When true, discard requests punch holes; otherwise they are ignored.
    sparse: bool,
    // Device ID string returned for VIRTIO_BLK_T_GET_ID requests.
    id: BlockId,
    /// A DiskState is owned by each worker's executor and cannot be shared by workers, thus
    /// `worker_shared_state` holds the state shared by workers in Arc.
    worker_shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
}
225
226
/// Disk state which can be modified by other worker threads
struct WorkerSharedState {
    // Current disk size in bytes; updated by `resize` and read when
    // validating request ranges.
    disk_size: Arc<AtomicU64>,
}
230
231
async fn process_one_request(
232
avail_desc: &mut DescriptorChain,
233
disk_state: &AsyncRwLock<DiskState>,
234
flush_timer: &RefCell<TimerAsync<Timer>>,
235
flush_timer_armed: &RefCell<bool>,
236
) -> result::Result<usize, ExecuteError> {
237
let reader = &mut avail_desc.reader;
238
let writer = &mut avail_desc.writer;
239
240
// The last byte of the buffer is virtio_blk_req::status.
241
// Split it into a separate Writer so that status_writer is the final byte and
242
// the original writer is left with just the actual block I/O data.
243
let available_bytes = writer.available_bytes();
244
let status_offset = available_bytes
245
.checked_sub(1)
246
.ok_or(ExecuteError::MissingStatus)?;
247
let mut status_writer = writer.split_at(status_offset);
248
249
let status = match BlockAsync::execute_request(
250
reader,
251
writer,
252
disk_state,
253
flush_timer,
254
flush_timer_armed,
255
)
256
.await
257
{
258
Ok(()) => VIRTIO_BLK_S_OK,
259
Err(e) => {
260
match e.log_level() {
261
LogLevel::Debug => debug!("failed executing disk request: {:#}", e),
262
LogLevel::Error => error!("failed executing disk request: {:#}", e),
263
}
264
e.status()
265
}
266
};
267
268
status_writer
269
.write_all(&[status])
270
.map_err(ExecuteError::WriteStatus)?;
271
Ok(available_bytes)
272
}
273
274
/// Process one descriptor chain asynchronously.
275
async fn process_one_chain(
276
queue: &RefCell<Queue>,
277
mut avail_desc: DescriptorChain,
278
disk_state: &AsyncRwLock<DiskState>,
279
flush_timer: &RefCell<TimerAsync<Timer>>,
280
flush_timer_armed: &RefCell<bool>,
281
) {
282
let len = match process_one_request(&mut avail_desc, disk_state, flush_timer, flush_timer_armed)
283
.await
284
{
285
Ok(len) => len,
286
Err(e) => {
287
error!("block: failed to handle request: {:#}", e);
288
0
289
}
290
};
291
292
let mut queue = queue.borrow_mut();
293
queue.add_used_with_bytes_written(avail_desc, len as u32);
294
queue.trigger_interrupt();
295
}
296
297
// There is one async task running `handle_queue` per virtio queue in use.
// Receives messages from the guest and queues a task to complete the operations with the async
// executor.
//
// Runs until `stop_rx` fires, at which point all in-flight request tasks are
// drained and the `Queue` is returned to the caller.
async fn handle_queue(
    disk_state: Rc<AsyncRwLock<DiskState>>,
    queue: Queue,
    evt: EventAsync,
    flush_timer: Rc<RefCell<TimerAsync<Timer>>>,
    flush_timer_armed: Rc<RefCell<bool>>,
    mut stop_rx: oneshot::Receiver<()>,
) -> Queue {
    let queue = RefCell::new(queue);
    // In-flight `process_one_chain` futures, one per popped descriptor chain.
    let mut background_tasks = FuturesUnordered::new();
    let evt_future = evt.next_val().fuse();
    pin_mut!(evt_future);
    loop {
        // Wait for the next signal from `evt` and process `background_tasks` in the meantime.
        //
        // NOTE: We can't call `evt.next_val()` directly in the `select!` expression. That would
        // create a new future each time, which, in the completion-based async backends like
        // io_uring, means we'd submit a new syscall each time (i.e. a race condition on the
        // eventfd).
        futures::select! {
            _ = background_tasks.next() => continue,
            res = evt_future => {
                // Re-arm the event future before acting on the result so the
                // next kick is never missed.
                evt_future.set(evt.next_val().fuse());
                if let Err(e) = res {
                    error!("Failed to read the next queue event: {:#}", e);
                    continue;
                }
            }
            _ = stop_rx => {
                // Process all the descriptors we've already popped from the queue so that we leave
                // the queue in a consistent state.
                background_tasks.collect::<()>().await;
                return queue.into_inner();
            }
        };
        // Drain every available descriptor chain, spawning one request task each.
        while let Some(descriptor_chain) = queue.borrow_mut().pop() {
            background_tasks.push(process_one_chain(
                &queue,
                descriptor_chain,
                &disk_state,
                &flush_timer,
                &flush_timer_armed,
            ));
        }
    }
}
346
347
async fn handle_command_tube(
348
command_tube: &Option<AsyncTube>,
349
interrupt: &RefCell<Option<Interrupt>>,
350
disk_state: Rc<AsyncRwLock<DiskState>>,
351
) -> Result<(), ExecuteError> {
352
let command_tube = match command_tube {
353
Some(c) => c,
354
None => {
355
futures::future::pending::<()>().await;
356
return Ok(());
357
}
358
};
359
loop {
360
match command_tube.next().await {
361
Ok(command) => {
362
let resp = match command {
363
DiskControlCommand::Resize { new_size } => resize(&disk_state, new_size).await,
364
};
365
366
let resp_clone = resp.clone();
367
command_tube
368
.send(resp_clone)
369
.await
370
.map_err(ExecuteError::SendingResponse)?;
371
if let DiskControlResult::Ok = resp {
372
if let Some(interrupt) = &*interrupt.borrow() {
373
interrupt.signal_config_changed();
374
}
375
}
376
}
377
Err(e) => return Err(ExecuteError::ReceivingCommand(e)),
378
}
379
}
380
}
381
382
async fn resize(disk_state: &AsyncRwLock<DiskState>, new_size: u64) -> DiskControlResult {
383
// Acquire exclusive, mutable access to the state so the virtqueue task won't be able to read
384
// the state while resizing.
385
let disk_state = disk_state.lock().await;
386
// Prevent any other worker threads won't be able to do IO.
387
let worker_shared_state = Arc::clone(&disk_state.worker_shared_state);
388
let worker_shared_state = worker_shared_state.lock().await;
389
390
if disk_state.read_only {
391
error!("Attempted to resize read-only block device");
392
return DiskControlResult::Err(SysError::new(libc::EROFS));
393
}
394
395
info!("Resizing block device to {} bytes", new_size);
396
397
if let Err(e) = disk_state.disk_image.set_len(new_size) {
398
error!("Resizing disk failed! {:#}", e);
399
return DiskControlResult::Err(SysError::new(libc::EIO));
400
}
401
402
// Allocate new space if the disk image is not sparse.
403
if !disk_state.sparse {
404
if let Err(e) = disk_state.disk_image.allocate(0, new_size) {
405
error!("Allocating disk space after resize failed! {:#}", e);
406
return DiskControlResult::Err(SysError::new(libc::EIO));
407
}
408
}
409
410
if let Ok(new_disk_size) = disk_state.disk_image.get_len() {
411
worker_shared_state
412
.disk_size
413
.store(new_disk_size, Ordering::Release);
414
}
415
DiskControlResult::Ok
416
}
417
418
/// Periodically flushes the disk when the given timer fires.
419
async fn flush_disk(
420
disk_state: Rc<AsyncRwLock<DiskState>>,
421
timer: TimerAsync<Timer>,
422
armed: Rc<RefCell<bool>>,
423
) -> Result<(), ControlError> {
424
loop {
425
timer.wait().await.map_err(ControlError::FlushTimer)?;
426
if !*armed.borrow() {
427
continue;
428
}
429
430
// Reset armed before calling fdatasync to guarantee that IO requests that started after we
431
// call fdatasync will be committed eventually.
432
*armed.borrow_mut() = false;
433
434
disk_state
435
.read_lock()
436
.await
437
.disk_image
438
.fdatasync()
439
.await
440
.map_err(ControlError::FdatasyncDisk)?;
441
}
442
}
443
444
// Commands sent from the device to a worker thread over its mpsc channel.
enum WorkerCmd {
    // Start handling the given queue at `index`.
    StartQueue {
        index: usize,
        queue: Queue,
    },
    // Stop handling the queue at `index`, draining in-flight requests first.
    StopQueue {
        index: usize,
        // Once the queue is stopped, it will be sent back over `response_tx`.
        // `None` indicates that there was no queue at the given index.
        response_tx: oneshot::Sender<Option<Queue>>,
    },
    // Stop all queues without recovering the queues' state and without completing any queued up
    // work.
    AbortQueues {
        // Once the queues are stopped, a `()` value will be sent back over `response_tx`.
        response_tx: oneshot::Sender<()>,
    },
}
462
463
// The main worker thread. Initializes the asynchronous worker tasks and passes them to the executor
// to be processed.
//
// `disk_state` is wrapped by `AsyncRwLock`, which provides both shared and exclusive locks. It's
// because the state can be read from the virtqueue task while the control task is processing a
// resizing command.
async fn run_worker(
    ex: &Executor,
    disk_state: &Rc<AsyncRwLock<DiskState>>,
    control_tube: &Option<AsyncTube>,
    mut worker_rx: mpsc::UnboundedReceiver<WorkerCmd>,
    kill_evt: Event,
) -> anyhow::Result<()> {
    // One flush timer per disk.
    let timer = Timer::new().expect("Failed to create a timer");
    let flush_timer_armed = Rc::new(RefCell::new(false));

    // Handles control requests.
    let control_interrupt = RefCell::new(None);
    let control = handle_command_tube(control_tube, &control_interrupt, disk_state.clone()).fuse();
    pin_mut!(control);

    // Handle all the queues in one sub-select call.
    let flush_timer = Rc::new(RefCell::new(
        TimerAsync::new(
            // Call try_clone() to share the same underlying FD with the `flush_disk` task.
            timer.try_clone().expect("Failed to clone flush_timer"),
            ex,
        )
        .expect("Failed to create an async timer"),
    ));

    // Flushes the disk periodically.
    let flush_timer2 = TimerAsync::new(timer, ex).expect("Failed to create an async timer");
    let disk_flush = flush_disk(disk_state.clone(), flush_timer2, flush_timer_armed.clone()).fuse();
    pin_mut!(disk_flush);

    // Exit if the kill event is triggered.
    let kill = async_utils::await_and_exit(ex, kill_evt).fuse();
    pin_mut!(kill);

    // Running queue handlers.
    let mut queue_handlers = FuturesUnordered::new();
    // Async stop functions for queue handlers, by queue index.
    let mut queue_handler_stop_fns = std::collections::BTreeMap::new();

    loop {
        futures::select! {
            _ = queue_handlers.next() => continue,
            r = disk_flush => return r.context("failed to flush a disk"),
            r = control => return r.context("failed to handle a control request"),
            r = kill => return r.context("failed to wait on the kill event"),
            worker_cmd = worker_rx.next() => {
                match worker_cmd {
                    None => anyhow::bail!("worker control channel unexpectedly closed"),
                    Some(WorkerCmd::StartQueue{index, queue}) => {
                        // The first queue started supplies the interrupt used by the
                        // control task for config-change notifications.
                        if control_interrupt.borrow().is_none() {
                            *control_interrupt.borrow_mut() = Some(queue.interrupt().clone());
                        }

                        let (tx, rx) = oneshot::channel();
                        let kick_evt = queue.event().try_clone().expect("Failed to clone queue event");
                        // `remote_handle` lets us keep polling the handler inside
                        // `queue_handlers` while retrieving its return value (the
                        // Queue) from the handle when we ask it to stop.
                        let (handle_queue_future, remote_handle) = handle_queue(
                            Rc::clone(disk_state),
                            queue,
                            EventAsync::new(kick_evt, ex).expect("Failed to create async event for queue"),
                            Rc::clone(&flush_timer),
                            Rc::clone(&flush_timer_armed),
                            rx,
                        ).remote_handle();
                        let old_stop_fn = queue_handler_stop_fns.insert(index, move || {
                            // Ask the handler to stop.
                            tx.send(()).unwrap_or_else(|_| panic!("queue handler channel closed early"));
                            // Wait for its return value.
                            remote_handle
                        });

                        // If there was already a handler for this index, stop it before adding the
                        // new handler future.
                        if let Some(stop_fn) = old_stop_fn {
                            warn!("Starting new queue handler without stopping old handler");
                            // Unfortunately we can't just do `stop_fn().await` because the actual
                            // work we are waiting on is in `queue_handlers`. So, run both.
                            let mut fut = stop_fn().fuse();
                            loop {
                                futures::select! {
                                    _ = queue_handlers.next() => continue,
                                    _queue = fut => break,
                                }
                            }
                        }

                        queue_handlers.push(handle_queue_future);
                    }
                    Some(WorkerCmd::StopQueue{index, response_tx}) => {
                        match queue_handler_stop_fns.remove(&index) {
                            Some(stop_fn) => {
                                // NOTE: This await is blocking the select loop. If we want to
                                // support stopping queues concurrently, then it needs to be moved.
                                // For now, keep it simple.
                                //
                                // Unfortunately we can't just do `stop_fn().await` because the
                                // actual work we are waiting on is in `queue_handlers`. So, run
                                // both.
                                let mut fut = stop_fn().fuse();
                                let queue = loop {
                                    futures::select! {
                                        _ = queue_handlers.next() => continue,
                                        queue = fut => break queue,
                                    }
                                };

                                // If this is the last queue, drop references to the interrupt so
                                // that, when queues are started up again, we'll use the new
                                // interrupt passed with the first queue.
                                if queue_handlers.is_empty() {
                                    *control_interrupt.borrow_mut() = None;
                                }

                                let _ = response_tx.send(Some(queue));
                            }
                            None => { let _ = response_tx.send(None); },
                        }

                    }
                    Some(WorkerCmd::AbortQueues{response_tx}) => {
                        // Dropping the handler futures and stop functions cancels
                        // the handlers without draining any in-flight work.
                        queue_handlers.clear();
                        queue_handler_stop_fns.clear();

                        *control_interrupt.borrow_mut() = None;

                        let _ = response_tx.send(());
                    }
                }
            }
        };
    }
}
601
602
/// Virtio device for exposing block level read/write operations on a host file.
pub struct BlockAsync {
    // We need to make boot_index public bc the field is used by the main crate to determine boot
    // order
    boot_index: Option<usize>,
    // `None` iff `self.worker_per_queue == false` and the worker thread is running.
    disk_image: Option<Box<dyn DiskFile>>,
    // Current disk size in bytes, shared with the workers via `shared_state`.
    disk_size: Arc<AtomicU64>,
    // Virtio feature bits offered to the guest (see `build_avail_features`).
    avail_features: u64,
    read_only: bool,
    sparse: bool,
    // Maximum number of data segments per request, derived from the queue size.
    seg_max: u32,
    block_size: u32,
    // Device ID string reported for VIRTIO_BLK_T_GET_ID requests.
    id: BlockId,
    // Tube for disk control commands (e.g. resize); moved into the worker when
    // the first worker is started.
    control_tube: Option<Tube>,
    queue_sizes: Vec<u16>,
    pub(super) executor_kind: ExecutorKind,
    // If `worker_per_queue == true`, `worker_threads` contains the worker for each running queue
    // by index. Otherwise, contains the monolithic worker for all queues at index 0.
    //
    // Once a thread is started, we never stop it, except when `BlockAsync` itself is dropped. That
    // is because we cannot easily convert the `AsyncDisk` back to a `DiskFile` when backed by
    // Overlapped I/O on Windows because the file becomes permanently associated with the IOCP
    // instance of the async executor.
    worker_threads: BTreeMap<usize, (WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)>,
    shared_state: Arc<AsyncRwLock<WorkerSharedState>>,
    // Whether to run worker threads in parallel for each queue
    worker_per_queue: bool,
    // Indices of running queues.
    // TODO: The worker already tracks this. Only need it here to stop queues on sleep. Maybe add a
    // worker cmd to stop all at once, then we can delete this field.
    activated_queues: BTreeSet<usize>,
    #[cfg(windows)]
    pub(super) io_concurrency: u32,
    pci_address: Option<PciAddress>,
}
638
639
impl BlockAsync {
640
/// Create a new virtio block device that operates on the given AsyncDisk.
641
pub fn new(
642
base_features: u64,
643
disk_image: Box<dyn DiskFile>,
644
disk_option: &DiskOption,
645
control_tube: Option<Tube>,
646
queue_size: Option<u16>,
647
num_queues: Option<u16>,
648
) -> SysResult<BlockAsync> {
649
let read_only = disk_option.read_only;
650
let sparse = disk_option.sparse;
651
let block_size = disk_option.block_size;
652
let packed_queue = disk_option.packed_queue;
653
let id = disk_option.id.unwrap_or_default();
654
let mut worker_per_queue = disk_option.multiple_workers;
655
// Automatically disable multiple workers if the disk image can't be cloned.
656
if worker_per_queue && disk_image.try_clone().is_err() {
657
base::warn!("multiple workers requested, but not supported by disk image type");
658
worker_per_queue = false;
659
}
660
let executor_kind = disk_option.async_executor.unwrap_or_default();
661
let boot_index = disk_option.bootindex;
662
#[cfg(windows)]
663
let io_concurrency = disk_option.io_concurrency.get();
664
665
if block_size % SECTOR_SIZE as u32 != 0 {
666
error!(
667
"Block size {} is not a multiple of {}.",
668
block_size, SECTOR_SIZE,
669
);
670
return Err(SysError::new(libc::EINVAL));
671
}
672
let disk_size = disk_image.get_len()?;
673
if disk_size % block_size as u64 != 0 {
674
warn!(
675
"Disk size {} is not a multiple of block size {}; \
676
the remainder will not be visible to the guest.",
677
disk_size, block_size,
678
);
679
}
680
let num_queues = num_queues.unwrap_or(DEFAULT_NUM_QUEUES);
681
let multi_queue = match num_queues {
682
0 => panic!("Number of queues cannot be zero for a block device"),
683
1 => false,
684
_ => true,
685
};
686
let q_size = queue_size.unwrap_or(DEFAULT_QUEUE_SIZE);
687
if !q_size.is_power_of_two() {
688
error!("queue size {} is not a power of 2.", q_size);
689
return Err(SysError::new(libc::EINVAL));
690
}
691
let queue_sizes = vec![q_size; num_queues as usize];
692
693
let avail_features =
694
Self::build_avail_features(base_features, read_only, sparse, multi_queue, packed_queue);
695
696
let seg_max = get_seg_max(q_size);
697
698
let disk_size = Arc::new(AtomicU64::new(disk_size));
699
let shared_state = Arc::new(AsyncRwLock::new(WorkerSharedState {
700
disk_size: disk_size.clone(),
701
}));
702
703
Ok(BlockAsync {
704
disk_image: Some(disk_image),
705
disk_size,
706
avail_features,
707
read_only,
708
sparse,
709
seg_max,
710
block_size,
711
id,
712
queue_sizes,
713
worker_threads: BTreeMap::new(),
714
shared_state,
715
worker_per_queue,
716
control_tube,
717
executor_kind,
718
activated_queues: BTreeSet::new(),
719
boot_index,
720
#[cfg(windows)]
721
io_concurrency,
722
pci_address: disk_option.pci_address,
723
})
724
}
725
726
/// Returns the feature flags given the specified attributes.
727
fn build_avail_features(
728
base_features: u64,
729
read_only: bool,
730
sparse: bool,
731
multi_queue: bool,
732
packed_queue: bool,
733
) -> u64 {
734
let mut avail_features = base_features;
735
if read_only {
736
avail_features |= 1 << VIRTIO_BLK_F_RO;
737
} else {
738
if sparse {
739
avail_features |= 1 << VIRTIO_BLK_F_DISCARD;
740
}
741
avail_features |= 1 << VIRTIO_BLK_F_FLUSH;
742
avail_features |= 1 << VIRTIO_BLK_F_WRITE_ZEROES;
743
}
744
avail_features |= 1 << VIRTIO_BLK_F_SEG_MAX;
745
avail_features |= 1 << VIRTIO_BLK_F_BLK_SIZE;
746
if multi_queue {
747
avail_features |= 1 << VIRTIO_BLK_F_MQ;
748
}
749
if packed_queue {
750
avail_features |= 1 << VIRTIO_F_RING_PACKED;
751
}
752
avail_features
753
}
754
755
    // Execute a single block device request.
    // `writer` includes the data region only; the status byte is not included.
    // It is up to the caller to convert the result of this function into a status byte
    // and write it to the expected location in guest memory.
    async fn execute_request(
        reader: &mut Reader,
        writer: &mut Writer,
        disk_state: &AsyncRwLock<DiskState>,
        flush_timer: &RefCell<TimerAsync<Timer>>,
        flush_timer_armed: &RefCell<bool>,
    ) -> result::Result<(), ExecuteError> {
        // Acquire immutable access to prevent tasks from resizing disk.
        let disk_state = disk_state.read_lock().await;
        // Acquire immutable access to prevent other worker threads from resizing disk.
        let worker_shared_state = disk_state.worker_shared_state.read_lock().await;

        let req_header: virtio_blk_req_header = reader.read_obj().map_err(ExecuteError::Read)?;

        let req_type = req_header.req_type.to_native();
        let sector = req_header.sector.to_native();

        // Write-type requests are rejected on a read-only disk; reads and
        // GET_ID are always allowed.
        if disk_state.read_only && req_type != VIRTIO_BLK_T_IN && req_type != VIRTIO_BLK_T_GET_ID {
            return Err(ExecuteError::ReadOnly {
                request_type: req_type,
            });
        }

        /// Check that a request accesses only data within the disk's current size.
        /// All parameters are in units of bytes.
        fn check_range(
            io_start: u64,
            io_length: u64,
            disk_size: u64,
        ) -> result::Result<(), ExecuteError> {
            let io_end = io_start
                .checked_add(io_length)
                .ok_or(ExecuteError::OutOfRange)?;
            if io_end > disk_size {
                Err(ExecuteError::OutOfRange)
            } else {
                Ok(())
            }
        }

        let disk_size = worker_shared_state.disk_size.load(Ordering::Relaxed);
        match req_type {
            // Guest read: copy data from the disk into the descriptor chain.
            VIRTIO_BLK_T_IN => {
                let data_len = writer.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                // NOTE(review): checked_shl only fails for shift amounts >= the
                // bit width; bits shifted out of the top are silently discarded,
                // so this never returns None for SECTOR_SHIFT = 9. check_range
                // below still bounds the access to the disk size — confirm this
                // is the intended behavior for absurdly large sector values.
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                writer
                    .write_all_from_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::ReadIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;
            }
            // Guest write: copy data from the descriptor chain to the disk.
            VIRTIO_BLK_T_OUT => {
                let data_len = reader.available_bytes();
                if data_len == 0 {
                    return Ok(());
                }
                let offset = sector
                    .checked_shl(u32::from(SECTOR_SHIFT))
                    .ok_or(ExecuteError::OutOfRange)?;
                check_range(offset, data_len as u64, disk_size)?;
                let disk_image = &disk_state.disk_image;
                reader
                    .read_exact_to_at_fut(&**disk_image, data_len, offset)
                    .await
                    .map_err(|desc_error| ExecuteError::WriteIo {
                        length: data_len,
                        sector,
                        desc_error,
                    })?;

                // Arm the background flush timer so this write is eventually
                // committed even if the guest never sends a FLUSH.
                if !*flush_timer_armed.borrow() {
                    *flush_timer_armed.borrow_mut() = true;

                    let flush_delay = Duration::from_secs(60);
                    flush_timer
                        .borrow_mut()
                        .reset_oneshot(flush_delay)
                        .map_err(ExecuteError::TimerReset)?;
                }
            }
            // Discard / write-zeroes: the payload is a list of
            // virtio_blk_discard_write_zeroes segments.
            VIRTIO_BLK_T_DISCARD | VIRTIO_BLK_T_WRITE_ZEROES => {
                if req_type == VIRTIO_BLK_T_DISCARD && !disk_state.sparse {
                    // Discard is a hint; if this is a non-sparse disk, just ignore it.
                    return Ok(());
                }

                // Enforce the advertised per-request segment limit up front.
                let seg_count =
                    reader.available_bytes() / size_of::<virtio_blk_discard_write_zeroes>();
                let seg_max = if req_type == VIRTIO_BLK_T_DISCARD {
                    MAX_DISCARD_SEG as usize
                } else {
                    MAX_WRITE_ZEROES_SEG as usize
                };
                if seg_count > seg_max {
                    return Err(ExecuteError::TooManySegments(seg_count, seg_max));
                }

                while reader.available_bytes() >= size_of::<virtio_blk_discard_write_zeroes>() {
                    let seg: virtio_blk_discard_write_zeroes =
                        reader.read_obj().map_err(ExecuteError::Read)?;

                    let sector = seg.sector.to_native();
                    let num_sectors = seg.num_sectors.to_native();
                    let flags = seg.flags.to_native();

                    // Only write-zeroes may carry the UNMAP flag; any other
                    // flag bit is a protocol violation.
                    let valid_flags = if req_type == VIRTIO_BLK_T_WRITE_ZEROES {
                        VIRTIO_BLK_DISCARD_WRITE_ZEROES_FLAG_UNMAP
                    } else {
                        0
                    };

                    if (flags & !valid_flags) != 0 {
                        return Err(ExecuteError::DiscardWriteZeroes {
                            ioerr: None,
                            sector,
                            num_sectors,
                            flags,
                        });
                    }

                    let offset = sector
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    let length = u64::from(num_sectors)
                        .checked_shl(u32::from(SECTOR_SHIFT))
                        .ok_or(ExecuteError::OutOfRange)?;
                    check_range(offset, length, disk_size)?;

                    if req_type == VIRTIO_BLK_T_DISCARD {
                        // Since Discard is just a hint and some filesystems may not implement
                        // FALLOC_FL_PUNCH_HOLE, ignore punch_hole errors.
                        let _ = disk_state.disk_image.punch_hole(offset, length).await;
                    } else {
                        disk_state
                            .disk_image
                            .write_zeroes_at(offset, length)
                            .await
                            .map_err(|e| ExecuteError::DiscardWriteZeroes {
                                ioerr: Some(e),
                                sector,
                                num_sectors,
                                flags,
                            })?;
                    }
                }
            }
            // Explicit flush from the guest; the pending background flush (if
            // any) becomes redundant and is cancelled.
            VIRTIO_BLK_T_FLUSH => {
                disk_state
                    .disk_image
                    .fdatasync()
                    .await
                    .map_err(ExecuteError::Flush)?;

                if *flush_timer_armed.borrow() {
                    flush_timer
                        .borrow_mut()
                        .clear()
                        .map_err(ExecuteError::TimerReset)?;
                    *flush_timer_armed.borrow_mut() = false;
                }
            }
            // Report the device ID string.
            VIRTIO_BLK_T_GET_ID => {
                writer
                    .write_all(&disk_state.id)
                    .map_err(ExecuteError::CopyId)?;
            }
            t => return Err(ExecuteError::Unsupported(t)),
        };
        Ok(())
    }
939
940
    /// Builds and returns the config structure used to specify block features.
    fn build_config_space(
        disk_size: u64,
        seg_max: u32,
        block_size: u32,
        num_queues: u16,
    ) -> virtio_blk_config {
        virtio_blk_config {
            // If the image is not a multiple of the sector size, the tail bits are not exposed.
            capacity: Le64::from(disk_size >> SECTOR_SHIFT),
            seg_max: Le32::from(seg_max),
            blk_size: Le32::from(block_size),
            num_queues: Le16::from(num_queues),
            max_discard_sectors: Le32::from(MAX_DISCARD_SECTORS),
            discard_sector_alignment: Le32::from(DISCARD_SECTOR_ALIGNMENT),
            max_write_zeroes_sectors: Le32::from(MAX_WRITE_ZEROES_SECTORS),
            // Advertise that write-zeroes requests may carry the UNMAP flag
            // (accepted by `execute_request`).
            write_zeroes_may_unmap: 1,
            max_discard_seg: Le32::from(MAX_DISCARD_SEG),
            max_write_zeroes_seg: Le32::from(MAX_WRITE_ZEROES_SEG),
            ..Default::default()
        }
    }
962
963
/// Get the worker for a queue, starting it if necessary.
///
/// Returns the `(thread handle, command sender)` pair for the worker that owns queue
/// `idx`. When `worker_per_queue` is false, all queues share the single worker at key 0.
// NOTE: Can't use `BTreeMap::entry` because it requires an exclusive ref for the whole branch.
#[allow(clippy::map_entry)]
fn start_worker(
    &mut self,
    idx: usize,
) -> anyhow::Result<&(WorkerThread<()>, mpsc::UnboundedSender<WorkerCmd>)> {
    // One worker per queue, or a single shared worker keyed by 0.
    let key = if self.worker_per_queue { idx } else { 0 };
    if self.worker_threads.contains_key(&key) {
        return Ok(self.worker_threads.get(&key).unwrap());
    }

    let ex = self.create_executor();
    // The control tube is moved into the (first) worker; `self.control_tube` is None afterwards.
    let control_tube = self.control_tube.take();
    // Per-queue workers each need their own clone of the disk image; the shared-worker case
    // moves the image out of `self` instead.
    let disk_image = if self.worker_per_queue {
        self.disk_image
            .as_ref()
            .context("Failed to ref a disk image")?
            .try_clone()
            .context("Failed to clone a disk image")?
    } else {
        self.disk_image
            .take()
            .context("Failed to take a disk image")?
    };
    // Copy the plain-data state the worker thread needs before moving into the closure.
    let read_only = self.read_only;
    let sparse = self.sparse;
    let id = self.id;
    let worker_shared_state = self.shared_state.clone();

    // Channel used to send `WorkerCmd`s (start/stop/abort queue) to the worker.
    let (worker_tx, worker_rx) = mpsc::unbounded();
    let worker_thread = WorkerThread::start("virtio_blk", move |kill_evt| {
        let async_control =
            control_tube.map(|c| AsyncTube::new(&ex, c).expect("failed to create async tube"));

        let async_image = match disk_image.to_async_disk(&ex) {
            Ok(d) => d,
            Err(e) => panic!("Failed to create async disk {e:#}"),
        };

        // Per-worker view of the disk, shared by the queue handlers on this executor.
        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: async_image,
            read_only,
            sparse,
            id,
            worker_shared_state,
        }));

        if let Err(err_string) = ex
            .run_until(async {
                let r = run_worker(&ex, &disk_state, &async_control, worker_rx, kill_evt).await;
                // Flush any in-memory disk image state to file.
                if let Err(e) = disk_state.lock().await.disk_image.flush().await {
                    error!("failed to flush disk image when stopping worker: {e:?}");
                }
                r
            })
            .expect("run_until failed")
        {
            error!("{:#}", err_string);
        }
    });
    // We checked above that `key` was vacant, so Occupied is impossible here; going through
    // `entry` lets us return a reference to the freshly inserted pair.
    match self.worker_threads.entry(key) {
        std::collections::btree_map::Entry::Occupied(_) => unreachable!(),
        std::collections::btree_map::Entry::Vacant(e) => {
            Ok(e.insert((worker_thread, worker_tx)))
        }
    }
}
1032
1033
pub fn start_queue(
1034
&mut self,
1035
idx: usize,
1036
queue: Queue,
1037
_mem: GuestMemory,
1038
) -> anyhow::Result<()> {
1039
let (_, worker_tx) = self.start_worker(idx)?;
1040
worker_tx
1041
.unbounded_send(WorkerCmd::StartQueue { index: idx, queue })
1042
.expect("worker channel closed early");
1043
self.activated_queues.insert(idx);
1044
Ok(())
1045
}
1046
1047
pub fn stop_queue(&mut self, idx: usize) -> anyhow::Result<Queue> {
1048
// TODO: Consider stopping the worker thread if this is the last queue managed by it. Then,
1049
// simplify `virtio_sleep` and/or `reset` methods.
1050
let (_, worker_tx) = self
1051
.worker_threads
1052
.get(if self.worker_per_queue { &idx } else { &0 })
1053
.context("worker not found")?;
1054
let (response_tx, response_rx) = oneshot::channel();
1055
worker_tx
1056
.unbounded_send(WorkerCmd::StopQueue {
1057
index: idx,
1058
response_tx,
1059
})
1060
.expect("worker channel closed early");
1061
let queue = cros_async::block_on(async {
1062
response_rx
1063
.await
1064
.expect("response_rx closed early")
1065
.context("queue not found")
1066
})?;
1067
self.activated_queues.remove(&idx);
1068
Ok(queue)
1069
}
1070
}
1071
1072
impl VirtioDevice for BlockAsync {
    fn keep_rds(&self) -> Vec<RawDescriptor> {
        // Descriptors that must stay open for the device to keep working: the disk
        // image's backing descriptors and the disk-control tube, when still held here.
        let mut keep_rds = Vec::new();

        if let Some(disk_image) = &self.disk_image {
            keep_rds.extend(disk_image.as_raw_descriptors());
        }

        if let Some(control_tube) = &self.control_tube {
            keep_rds.push(control_tube.as_raw_descriptor());
        }

        keep_rds
    }

    fn features(&self) -> u64 {
        self.avail_features
    }

    fn device_type(&self) -> DeviceType {
        DeviceType::Block
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.queue_sizes
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        // Rebuild the config space on every read so that an updated `disk_size`
        // (an atomic shared with the workers, e.g. after a resize) is reflected.
        let config_space = {
            let disk_size = self.disk_size.load(Ordering::Acquire);
            Self::build_config_space(
                disk_size,
                self.seg_max,
                self.block_size,
                self.queue_sizes.len() as u16,
            )
        };
        copy_config(data, 0, config_space.as_bytes(), offset);
    }

    fn activate(
        &mut self,
        mem: GuestMemory,
        _interrupt: Interrupt,
        queues: BTreeMap<usize, Queue>,
    ) -> anyhow::Result<()> {
        // Hand each queue to its worker; `start_queue` spawns workers lazily.
        for (i, q) in queues {
            self.start_queue(i, q, mem.clone())?;
        }
        Ok(())
    }

    fn reset(&mut self) -> anyhow::Result<()> {
        // Tell every worker to abort its queues and wait for each acknowledgment.
        // The worker threads themselves stay alive so the device can be re-activated.
        for (_, (_, worker_tx)) in self.worker_threads.iter_mut() {
            let (response_tx, response_rx) = oneshot::channel();
            worker_tx
                .unbounded_send(WorkerCmd::AbortQueues { response_tx })
                .expect("worker channel closed early");
            cros_async::block_on(async { response_rx.await.expect("response_rx closed early") });
        }
        self.activated_queues.clear();
        Ok(())
    }

    fn virtio_sleep(&mut self) -> anyhow::Result<Option<BTreeMap<usize, Queue>>> {
        // Reclaim the queues from workers.
        let mut queues = BTreeMap::new();
        for index in self.activated_queues.clone() {
            queues.insert(index, self.stop_queue(index)?);
        }
        if queues.is_empty() {
            return Ok(None); // Not activated.
        }
        Ok(Some(queues))
    }

    fn virtio_wake(
        &mut self,
        queues_state: Option<(GuestMemory, Interrupt, BTreeMap<usize, Queue>)>,
    ) -> anyhow::Result<()> {
        // Restart each queue that was reclaimed by `virtio_sleep`, if any.
        if let Some((mem, _interrupt, queues)) = queues_state {
            for (i, q) in queues {
                self.start_queue(i, q, mem.clone())?
            }
        }
        Ok(())
    }

    fn virtio_snapshot(&mut self) -> anyhow::Result<AnySnapshot> {
        // `virtio_sleep` ensures there is no pending state, except for the `Queue`s, which are
        // handled at a higher layer.
        AnySnapshot::to_any(())
    }

    fn virtio_restore(&mut self, data: AnySnapshot) -> anyhow::Result<()> {
        // Snapshot payload is the unit value; just validate that it round-trips.
        let () = AnySnapshot::from_any(data)?;
        Ok(())
    }

    fn pci_address(&self) -> Option<PciAddress> {
        self.pci_address
    }

    fn bootorder_fw_cfg(&self, pci_slot: u8) -> Option<(Vec<u8>, usize)> {
        // Only emits a boot-order entry when a boot index was configured for this disk.
        self.boot_index
            .map(|s| (format!("scsi@{pci_slot}/disk@0,0").as_bytes().to_vec(), s))
    }
}
1180
1181
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::mem::size_of_val;
    use std::sync::atomic::AtomicU64;

    use data_model::Le32;
    use data_model::Le64;
    use disk::SingleFileDisk;
    use hypervisor::ProtectionType;
    use tempfile::tempfile;
    use tempfile::TempDir;
    use vm_memory::GuestAddress;

    use super::*;
    use crate::suspendable_virtio_tests;
    use crate::virtio::base_features;
    use crate::virtio::descriptor_utils::create_descriptor_chain;
    use crate::virtio::descriptor_utils::DescriptorType;
    use crate::virtio::QueueConfig;

    #[test]
    fn read_size() {
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut num_sectors = [0u8; 4];
        b.read_config(0, &mut num_sectors);
        // size is 0x1000, so num_sectors is 8 (4096/512).
        assert_eq!([0x08, 0x00, 0x00, 0x00], num_sectors);
        let mut msw_sectors = [0u8; 4];
        b.read_config(4, &mut msw_sectors);
        // size is 0x1000, so msw_sectors is 0.
        assert_eq!([0x00, 0x00, 0x00, 0x00], msw_sectors);
    }

    #[test]
    fn read_block_size() {
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            block_size: 4096,
            sparse: false,
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        let mut blk_size = [0u8; 4];
        b.read_config(20, &mut blk_size);
        // blk_size should be 4096 (0x1000).
        assert_eq!([0x00, 0x10, 0x00, 0x00], blk_size);
    }

    #[test]
    fn read_features() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");

        // Feature bits 0-23 and 50-127 are specific for the device type, but
        // at the moment crosvm only supports 64 bits of feature bits.
        const DEVICE_FEATURE_BITS: u64 = 0xffffff;

        // read-write block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption::default();
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // writable device should set VIRTIO_BLK_F_FLUSH + VIRTIO_BLK_F_DISCARD
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ
            assert_eq!(0x7244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-write block device, non-sparse
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                sparse: false,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // non-sparse writable device should set VIRTIO_BLK_F_FLUSH
            // + VIRTIO_BLK_F_WRITE_ZEROES + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX
            // + VIRTIO_BLK_F_MQ, but not VIRTIO_BLK_F_DISCARD.
            assert_eq!(0x5244, b.features() & DEVICE_FEATURE_BITS);
        }

        // read-only block device
        {
            let f = File::create(&path).unwrap();
            let features = base_features(ProtectionType::Unprotected);
            let disk_option = DiskOption {
                read_only: true,
                ..Default::default()
            };
            let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
            // read-only device should set VIRTIO_BLK_F_RO
            // + VIRTIO_BLK_F_BLK_SIZE + VIRTIO_BLK_F_SEG_MAX + VIRTIO_BLK_F_MQ
            assert_eq!(0x1064, b.features() & DEVICE_FEATURE_BITS);
        }
    }

    #[test]
    fn check_pci_address_configurability() {
        let f = tempfile().unwrap();

        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            pci_address: Some(PciAddress {
                bus: 0,
                dev: 1,
                func: 1,
            }),
            ..Default::default()
        };
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();

        assert_eq!(b.pci_address(), disk_option.pci_address);
    }

    #[test]
    fn check_runtime_blk_queue_configurability() {
        let tempdir = TempDir::new().unwrap();
        let mut path = tempdir.path().to_owned();
        path.push("disk_image");
        let features = base_features(ProtectionType::Unprotected);

        // Default case
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(features, Box::new(f), &disk_option, None, None, None).unwrap();
        assert_eq!(
            [DEFAULT_QUEUE_SIZE; DEFAULT_NUM_QUEUES as usize],
            b.queue_max_sizes()
        );

        // Single queue of size 128
        let f = File::create(&path).unwrap();
        let disk_option = DiskOption::default();
        let b = BlockAsync::new(
            features,
            Box::new(f),
            &disk_option,
            None,
            Some(128),
            Some(1),
        )
        .unwrap();
        assert_eq!([128; 1], b.queue_max_sizes());
        // Single queue device should not set VIRTIO_BLK_F_MQ
        assert_eq!(0, b.features() & (1 << VIRTIO_BLK_F_MQ) as u64);
    }

    #[test]
    fn read_last_sector() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");

        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100), // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (1 sector of data)
                (DescriptorType::Writable, 512),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Default::default(),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        // Regions are laid out contiguously: status follows the 512-byte data buffer.
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);
    }

    #[test]
    fn read_beyond_last_sector() {
        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();
        let mem = Rc::new(
            GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
                .expect("Creating guest memory failed."),
        );

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_IN),
            reserved: Le32::from(0),
            sector: Le64::from(7), // Disk is 8 sectors long, so this is the last valid sector.
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100), // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (2 sectors of data - overlap the end of the disk).
                (DescriptorType::Writable, 512 * 2),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let ex = Executor::new().expect("creating an executor failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));
        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: Default::default(),
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 512 * 2) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_IOERR);
    }

    #[test]
    fn get_id() {
        let ex = Executor::new().expect("creating an executor failed");

        let f = tempfile().unwrap();
        let disk_size = 0x1000;
        f.set_len(disk_size).unwrap();

        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        let req_hdr = virtio_blk_req_header {
            req_type: Le32::from(VIRTIO_BLK_T_GET_ID),
            reserved: Le32::from(0),
            sector: Le64::from(0),
        };
        mem.write_obj_at_addr(req_hdr, GuestAddress(0x1000))
            .expect("writing req failed");

        let mut avail_desc = create_descriptor_chain(
            &mem,
            GuestAddress(0x100), // Place descriptor chain at 0x100.
            GuestAddress(0x1000), // Describe buffer at 0x1000.
            vec![
                // Request header
                (DescriptorType::Readable, size_of_val(&req_hdr) as u32),
                // I/O buffer (20 bytes for serial)
                (DescriptorType::Writable, 20),
                // Request status
                (DescriptorType::Writable, 1),
            ],
            0,
        )
        .expect("create_descriptor_chain failed");

        let af = SingleFileDisk::new(f, &ex).expect("Failed to create SFD");
        let timer = Timer::new().expect("Failed to create a timer");
        let flush_timer = Rc::new(RefCell::new(
            TimerAsync::new(timer, &ex).expect("Failed to create an async timer"),
        ));
        let flush_timer_armed = Rc::new(RefCell::new(false));

        let id = b"a20-byteserialnumber";

        let disk_state = Rc::new(AsyncRwLock::new(DiskState {
            disk_image: Box::new(af),
            read_only: false,
            sparse: true,
            id: *id,
            worker_shared_state: Arc::new(AsyncRwLock::new(WorkerSharedState {
                disk_size: Arc::new(AtomicU64::new(disk_size)),
            })),
        }));

        let fut = process_one_request(
            &mut avail_desc,
            &disk_state,
            &flush_timer,
            &flush_timer_armed,
        );

        ex.run_until(fut)
            .expect("running executor failed")
            .expect("execute failed");

        // The status byte immediately follows the 20-byte serial buffer (regions are
        // contiguous). Reading at `+ 512` would hit untouched zeroed guest memory,
        // which happens to equal VIRTIO_BLK_S_OK and made this assertion vacuous.
        let status_offset = GuestAddress((0x1000 + size_of_val(&req_hdr) + 20) as u64);
        let status = mem.read_obj_from_addr::<u8>(status_offset).unwrap();
        assert_eq!(status, VIRTIO_BLK_S_OK);

        let id_offset = GuestAddress(0x1000 + size_of_val(&req_hdr) as u64);
        let returned_id = mem.read_obj_from_addr::<[u8; 20]>(id_offset).unwrap();
        assert_eq!(returned_id, *id);
    }

    #[test]
    fn reset_and_reactivate_single_worker() {
        reset_and_reactivate(false, None);
    }

    #[test]
    fn reset_and_reactivate_multiple_workers() {
        reset_and_reactivate(true, None);
    }

    #[test]
    #[cfg(windows)]
    fn reset_and_reactivate_overlapped_io() {
        reset_and_reactivate(
            false,
            Some(
                cros_async::sys::windows::ExecutorKindSys::Overlapped { concurrency: None }.into(),
            ),
        );
    }

    fn reset_and_reactivate(
        enables_multiple_workers: bool,
        async_executor: Option<cros_async::ExecutorKind>,
    ) {
        // Create an empty disk image
        let f = tempfile::NamedTempFile::new().unwrap();
        f.as_file().set_len(0x1000).unwrap();
        // Close the file so that it is possible for the disk implementation to take exclusive
        // access when opening it.
        let path: tempfile::TempPath = f.into_temp_path();

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube.
        // NOTE: We don't want to drop the vmm half of the tube. That would cause the worker
        // thread to fail immediately, which isn't what we want to test in this case.
        let (_control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            path: path.to_path_buf(),
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: enables_multiple_workers,
            async_executor,
            ..Default::default()
        };
        let disk_image = disk_option.open().unwrap();
        let mut b = BlockAsync::new(
            features,
            disk_image,
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");
        // assert resources are consumed
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );

        // reset and assert resources are still not back (should be in the worker thread)
        assert!(b.reset().is_ok(), "reset should succeed");
        if !enables_multiple_workers {
            assert!(
                b.disk_image.is_none(),
                "BlockAsync should not have a disk image"
            );
        }
        assert!(
            b.control_tube.is_none(),
            "BlockAsync should not have a control tube"
        );
        assert_eq!(
            b.worker_threads.len(),
            if enables_multiple_workers { 2 } else { 1 }
        );
        assert_eq!(b.id, *b"Block serial number\0");

        // re-activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("re-activate should succeed");
    }

    #[test]
    fn resize_with_single_worker() {
        resize(false);
    }

    #[test]
    fn resize_with_multiple_workers() {
        // Test resize handled by one worker affect the whole state
        resize(true);
    }

    fn resize(enables_multiple_workers: bool) {
        // disk image size constants
        let original_size = 0x1000;
        let resized_size = 0x2000;

        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(original_size).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);
        assert_eq!(disk_image.get_len().unwrap(), original_size);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a control tube
        let (control_tube, control_tube_device) = Tube::pair().unwrap();

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            multiple_workers: enables_multiple_workers,
            ..Default::default()
        };
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            Some(control_tube_device),
            None,
            None,
        )
        .unwrap();

        let interrupt = Interrupt::new_for_test();

        // activate with queues of an arbitrary size.
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt.clone(), BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        // assert the original size first
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            original_size,
            "disk_size should be the original size first"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // original_size (0x1000) >> SECTOR_SHIFT (9) = 0x8
            [0x8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the original capacity first"
        );

        // assert resize works
        control_tube
            .send(&DiskControlCommand::Resize {
                new_size: resized_size,
            })
            .unwrap();
        assert_eq!(
            control_tube.recv::<DiskControlResult>().unwrap(),
            DiskControlResult::Ok,
            "resize command should succeed"
        );
        assert_eq!(
            b.disk_size.load(Ordering::Acquire),
            resized_size,
            "disk_size should be resized to the new size"
        );
        assert_eq!(
            disk_image.get_len().unwrap(),
            resized_size,
            "underlying disk image should be resized to the new size"
        );
        let mut capacity = [0u8; 8];
        b.read_config(0, &mut capacity);
        assert_eq!(
            capacity,
            // resized_size (0x2000) >> SECTOR_SHIFT (9) = 0x10
            [0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00],
            "read_config should read the resized capacity"
        );
        // Wait until the blk signals the interrupt
        interrupt
            .get_interrupt_evt()
            .wait()
            .expect("interrupt should be signaled");

        assert_eq!(
            interrupt.read_interrupt_status(),
            crate::virtio::INTERRUPT_STATUS_CONFIG_CHANGED as u8,
            "INTERRUPT_STATUS_CONFIG_CHANGED should be signaled"
        );
    }

    #[test]
    fn run_worker_threads() {
        // Create an empty duplicable disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create an empty guest memory
        let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)])
            .expect("Creating guest memory failed.");

        // Create a BlockAsync to test with single worker thread
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption::default();
        let mut b = BlockAsync::new(
            features,
            disk_image.try_clone().unwrap(),
            &disk_option,
            None,
            None,
            None,
        )
        .unwrap();

        // activate with queues of an arbitrary size.
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem.clone(), interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 1, "1 threads should be spawned.");
        drop(b);

        // Create a BlockAsync to test with multiple worker threads
        let features = base_features(ProtectionType::Unprotected);
        let disk_option = DiskOption {
            read_only: true,
            sparse: false,
            multiple_workers: true,
            ..DiskOption::default()
        };
        let mut b = BlockAsync::new(features, disk_image, &disk_option, None, None, None).unwrap();

        // activate should succeed
        let interrupt = Interrupt::new_for_test();
        let mut q0 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q0.set_ready(true);
        let q0 = q0
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        let mut q1 = QueueConfig::new(DEFAULT_QUEUE_SIZE, 0);
        q1.set_ready(true);
        let q1 = q1
            .activate(&mem, Event::new().unwrap(), interrupt.clone())
            .expect("QueueConfig::activate");

        b.activate(mem, interrupt, BTreeMap::from([(0, q0), (1, q1)]))
            .expect("activate should succeed");

        assert_eq!(b.worker_threads.len(), 2, "2 threads should be spawned.");
    }

    struct BlockContext {}

    fn modify_device(_block_context: &mut BlockContext, b: &mut BlockAsync) {
        b.avail_features = !b.avail_features;
    }

    fn create_device() -> (BlockContext, BlockAsync) {
        // Create an empty disk image
        let f = tempfile().unwrap();
        f.set_len(0x1000).unwrap();
        let disk_image: Box<dyn DiskFile> = Box::new(f);

        // Create a BlockAsync to test
        let features = base_features(ProtectionType::Unprotected);
        let id = b"Block serial number\0";
        let disk_option = DiskOption {
            read_only: true,
            id: Some(*id),
            sparse: false,
            multiple_workers: true,
            ..Default::default()
        };
        (
            BlockContext {},
            BlockAsync::new(
                features,
                disk_image.try_clone().unwrap(),
                &disk_option,
                None,
                None,
                None,
            )
            .unwrap(),
        )
    }

    #[cfg(any(target_os = "android", target_os = "linux"))]
    suspendable_virtio_tests!(asyncblock, create_device, 2, modify_device);
}
1918
1919