Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/io_uring/tests/uring.rs
5394 views
1
// Copyright 2022 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
#![cfg(any(target_os = "android", target_os = "linux"))]
6
7
use std::collections::BTreeSet;
8
use std::fs::File;
9
use std::fs::OpenOptions;
10
use std::io::IoSlice;
11
use std::io::IoSliceMut;
12
use std::io::Read;
13
use std::io::Seek;
14
use std::io::SeekFrom;
15
use std::io::Write;
16
use std::mem;
17
use std::os::unix::io::AsRawFd;
18
use std::os::unix::io::RawFd;
19
use std::path::Path;
20
use std::path::PathBuf;
21
use std::pin::Pin;
22
use std::sync::atomic::AtomicUsize;
23
use std::sync::atomic::Ordering;
24
use std::sync::mpsc::channel;
25
use std::sync::Arc;
26
use std::sync::Barrier;
27
use std::thread;
28
use std::time::Duration;
29
30
use base::pipe;
31
use base::EventType;
32
use base::IoBufMut;
33
use base::WaitContext;
34
use io_uring::Error;
35
use io_uring::URingAllowlist;
36
use io_uring::URingContext;
37
use io_uring::UserData;
38
use libc::EACCES;
39
use sync::Condvar;
40
use sync::Mutex;
41
use tempfile::tempfile;
42
use tempfile::TempDir;
43
44
/// Returns a new path made of `path` with `name` appended as an extra component.
///
/// `path` itself is left untouched; the result is a freshly allocated `PathBuf`.
fn append_file_name(path: &Path, name: &str) -> PathBuf {
    path.join(name)
}
49
50
// TODO(b/315998194): Add safety comment
51
#[allow(clippy::undocumented_unsafe_blocks)]
52
unsafe fn add_one_read(
53
uring: &URingContext,
54
ptr: *mut u8,
55
len: usize,
56
fd: RawFd,
57
offset: Option<u64>,
58
user_data: UserData,
59
) -> Result<(), Error> {
60
uring.add_readv(
61
Pin::from(vec![IoBufMut::from_raw_parts(ptr, len)].into_boxed_slice()),
62
fd,
63
offset,
64
user_data,
65
)
66
}
67
68
// TODO(b/315998194): Add safety comment
69
#[allow(clippy::undocumented_unsafe_blocks)]
70
unsafe fn add_one_write(
71
uring: &URingContext,
72
ptr: *const u8,
73
len: usize,
74
fd: RawFd,
75
offset: Option<u64>,
76
user_data: UserData,
77
) -> Result<(), Error> {
78
uring.add_writev(
79
Pin::from(vec![IoBufMut::from_raw_parts(ptr as *mut u8, len)].into_boxed_slice()),
80
fd,
81
offset,
82
user_data,
83
)
84
}
85
86
fn create_test_file(size: u64) -> std::fs::File {
87
let f = tempfile().unwrap();
88
f.set_len(size).unwrap();
89
f
90
}
91
92
#[test]
// Queue as many reads as possible and then collect the completions.
fn read_parallel() {
    const QUEUE_SIZE: usize = 10;
    const BUF_SIZE: usize = 0x1000;

    let uring = URingContext::new(QUEUE_SIZE, None).unwrap();
    let mut buf = [0u8; BUF_SIZE * QUEUE_SIZE];
    let f = create_test_file((BUF_SIZE * QUEUE_SIZE) as u64);

    // Check that the whole file can be read and that wrapping of the queues is handled by
    // queueing many more reads (64x) than the queue depth.
    for i in 0..QUEUE_SIZE * 64 {
        let index = i as u64;
        // Each read targets its own BUF_SIZE slice of `buf` and the matching file region.
        let offset = (i % QUEUE_SIZE) * BUF_SIZE;

        // Keep trying until this read has actually been queued. Previously a read that hit
        // `NoSpace` was silently dropped after reaping one completion.
        loop {
            // TODO(b/315998194): Add safety comment
            #[allow(clippy::undocumented_unsafe_blocks)]
            let queued = unsafe {
                add_one_read(
                    &uring,
                    buf[offset..].as_mut_ptr(),
                    BUF_SIZE,
                    f.as_raw_fd(),
                    Some(offset as u64),
                    index,
                )
            };

            match queued {
                Ok(()) => break,
                Err(Error::NoSpace) => {
                    // The submit queue is full: reap one completion, then retry the add.
                    let _ = uring.wait().unwrap().next().unwrap();
                }
                Err(e) => panic!("unexpected error from add_one_read: {e}"),
            }
        }
    }
}
127
128
#[test]
// Issue single-buffer readv operations one at a time and check each completion.
fn read_readv() {
    let queue_size = 128;

    let uring = URingContext::new(queue_size, None).unwrap();
    let mut buf = [0u8; 0x1000];
    let f = create_test_file(0x1000 * 2);
    let fd = f.as_raw_fd();

    // Check that the whole file can be read and that wrapping of the queues is handled by
    // issuing double the queue depth of reads.
    for i in 0..queue_size * 2 {
        let index = i as u64;
        // Alternate between the first and the second 0x1000-byte half of the file.
        let file_offset = (index % 2) * 0x1000;

        // SAFETY:
        // Safe to transmute from IoSliceMut to iovec.
        let io_vecs = unsafe {
            vec![IoSliceMut::new(&mut buf)]
                .into_iter()
                .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
        };

        // SAFETY:
        // Safe because the `wait` call below waits until the kernel is done with `buf`.
        unsafe {
            uring
                .add_readv_iter(io_vecs, fd, Some(file_offset), index)
                .unwrap();
        }
        let (user_data_ret, res) = uring.wait().unwrap().next().unwrap();

        assert_eq!(user_data_ret, index);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }
}
159
160
#[test]
// Issue one readv that scatters into three separate buffers and check the total byte count.
fn readv_vec() {
    let queue_size = 128;
    const BUF_SIZE: usize = 0x2000;

    let uring = URingContext::new(queue_size, None).unwrap();
    let mut buf = [0u8; BUF_SIZE];
    let mut buf2 = [0u8; BUF_SIZE];
    let mut buf3 = [0u8; BUF_SIZE];

    let slices = vec![
        IoSliceMut::new(&mut buf),
        IoSliceMut::new(&mut buf2),
        IoSliceMut::new(&mut buf3),
    ];
    // SAFETY:
    // Safe to transmute from IoSliceMut to iovec.
    let io_vecs = unsafe {
        slices
            .into_iter()
            .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
            .collect::<Vec<libc::iovec>>()
    };

    let total_len: usize = io_vecs.iter().map(|iovec| iovec.iov_len).sum();
    // The file is twice as long as the combined buffers, so the read can fill all of them.
    let f = create_test_file(total_len as u64 * 2);

    // SAFETY:
    // Safe because the `wait` call below waits until the kernel is done with the buffers.
    unsafe {
        uring
            .add_readv_iter(io_vecs.into_iter(), f.as_raw_fd(), Some(0), 55)
            .unwrap();
    }
    let (user_data_ret, res) = uring.wait().unwrap().next().unwrap();

    assert_eq!(user_data_ret, 55);
    assert_eq!(res.unwrap(), total_len as u32);
}
194
195
#[test]
// Queue a single 4096-byte write at offset 0 and check its completion.
fn write_one_block() {
    let uring = URingContext::new(16, None).unwrap();
    let buf = [0u8; 4096];
    let mut f = create_test_file(0);
    // Pre-populate the file with two blocks of zeroes.
    for _ in 0..2 {
        f.write_all(&buf).unwrap();
    }

    // SAFETY:
    // Safe because the `wait` call below waits until the kernel is done with `buf`.
    unsafe {
        add_one_write(&uring, buf.as_ptr(), buf.len(), f.as_raw_fd(), Some(0), 55).unwrap();
    }

    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 55_u64);
    assert_eq!(res.unwrap(), buf.len() as u32);
}
220
221
#[test]
// Submit one write and observe its completion through a `WaitContext` watching the uring.
fn write_one_submit_poll() {
    let uring = URingContext::new(16, None).unwrap();
    let buf = [0u8; 4096];
    let mut f = create_test_file(0);
    // Pre-populate the file with two blocks of zeroes.
    for _ in 0..2 {
        f.write_all(&buf).unwrap();
    }

    let ctx: WaitContext<u64> = WaitContext::build_with(&[(&uring, 1)]).unwrap();
    {
        // Test that the uring context isn't readable before any events are complete.
        let events = ctx.wait_timeout(Duration::from_millis(1)).unwrap();
        assert!(events.iter().next().is_none());
    }

    // SAFETY:
    // Safe because the `wait` call below waits until the kernel is done with `buf`.
    unsafe {
        add_one_write(&uring, buf.as_ptr(), buf.len(), f.as_raw_fd(), Some(0), 55).unwrap();
    }
    uring.submit().unwrap();

    // Poll for completion with epoll.
    let events = ctx.wait().unwrap();
    let event = events.iter().next().unwrap();
    assert!(event.is_readable);
    assert_eq!(event.token, 1);

    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 55_u64);
    assert_eq!(res.unwrap(), buf.len() as u32);
}
259
260
#[test]
// Gather-write three distinctly patterned buffers at OFFSET and read them back in order.
fn writev_vec() {
    let queue_size = 128;
    const BUF_SIZE: usize = 0x2000;
    const OFFSET: u64 = 0x2000;

    let uring = URingContext::new(queue_size, None).unwrap();
    let buf = [0xaau8; BUF_SIZE];
    let buf2 = [0xffu8; BUF_SIZE];
    let buf3 = [0x55u8; BUF_SIZE];

    let slices = vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)];
    // SAFETY:
    // Safe to transmute from IoSlice to iovec.
    let io_vecs = unsafe {
        slices
            .into_iter()
            .map(|slice| std::mem::transmute::<IoSlice, libc::iovec>(slice))
            .collect::<Vec<libc::iovec>>()
    };

    let total_len: usize = io_vecs.iter().map(|iovec| iovec.iov_len).sum();
    let mut f = create_test_file(total_len as u64 * 2);

    // SAFETY:
    // Safe because the `wait` call below waits until the kernel is done with the buffers.
    unsafe {
        uring
            .add_writev_iter(io_vecs.into_iter(), f.as_raw_fd(), Some(OFFSET), 55)
            .unwrap();
    }
    let (user_data_ret, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data_ret, 55);
    assert_eq!(res.unwrap(), total_len as u32);

    // The three buffers must appear back to back, in order, starting at OFFSET.
    let mut read_back = [0u8; BUF_SIZE];
    f.seek(SeekFrom::Start(OFFSET)).unwrap();
    for expected in [0xaa_u8, 0xff, 0x55] {
        f.read_exact(&mut read_back).unwrap();
        assert!(read_back.iter().all(|&b| b == expected));
    }
}
300
301
#[test]
// Grow a file with fallocate, queue a few writes followed by an fsync, then punch a hole over
// the grown region while keeping the file size.
fn fallocate_fsync() {
    let tempdir = TempDir::new().unwrap();
    let file_path = append_file_name(tempdir.path(), "test");

    {
        // Create the file with one 4096-byte block of zeroes, then close it.
        let initial = [0u8; 4096];
        let mut new_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create_new(true)
            .open(&file_path)
            .unwrap();
        new_file.write_all(&initial).unwrap();
    }

    let init_size = std::fs::metadata(&file_path).unwrap().len() as usize;
    let set_size = init_size + 1024 * 1024 * 50;
    let f = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&file_path)
        .unwrap();

    let uring = URingContext::new(16, None).unwrap();
    uring
        .add_fallocate(f.as_raw_fd(), 0, set_size as u64, 0, 66)
        .unwrap();
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 66_u64);
    match res {
        // Skip on kernels that don't support fallocate.
        Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => return,
        Err(e) => panic!("Unexpected fallocate error: {e}"),
        Ok(val) => assert_eq!(val, 0_u32),
    }

    // Add a few writes and then fsync.
    let buf = [0u8; 4096];
    let mut pending = BTreeSet::new();
    for (offset, user_data) in [(0_u64, 67_u64), (4096, 68), (8192, 69)] {
        // TODO(b/315998194): Add safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        unsafe {
            add_one_write(
                &uring,
                buf.as_ptr(),
                buf.len(),
                f.as_raw_fd(),
                Some(offset),
                user_data,
            )
            .unwrap();
        }
        pending.insert(user_data);
    }
    uring.add_fsync(f.as_raw_fd(), 70).unwrap();
    pending.insert(70);

    // Collect completions, giving up after at most five calls to `wait`.
    for _ in 0..5 {
        if pending.is_empty() {
            break;
        }
        for (user_data, res) in uring.wait().unwrap() {
            assert!(res.is_ok());
            assert!(pending.contains(&user_data));
            pending.remove(&user_data);
        }
    }
    assert!(pending.is_empty());

    // Punch a hole over everything past the initial contents without changing the file size.
    let punch_mode = (libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE) as u32;
    uring
        .add_fallocate(
            f.as_raw_fd(),
            init_size as u64,
            (set_size - init_size) as u64,
            punch_mode,
            68,
        )
        .unwrap();
    let (user_data, res) = uring.wait().unwrap().next().unwrap();
    assert_eq!(user_data, 68_u64);
    assert_eq!(res.unwrap(), 0_u32);

    drop(f); // Close to ensure directory entries for metadata are updated.

    let new_size = std::fs::metadata(&file_path).unwrap().len() as usize;
    assert_eq!(new_size, set_size);
}
407
408
#[test]
// Polling /dev/zero for readability should complete with a result of 1.
fn dev_zero_readable() {
    let zero = File::open(Path::new("/dev/zero")).unwrap();
    let uring = URingContext::new(16, None).unwrap();

    let fd = zero.as_raw_fd();
    uring.add_poll_fd(fd, EventType::Read, 454).unwrap();

    let mut completions = uring.wait().unwrap();
    let (user_data, res) = completions.next().unwrap();
    assert_eq!(user_data, 454_u64);
    assert_eq!(res.unwrap(), 1_u32);
}
419
420
#[test]
// Queue more operations than there are completion entries and check that `wait` still hands
// back every completion across two calls.
fn queue_many_ebusy_retry() {
    let num_entries = 16;
    let f = File::open(Path::new("/dev/zero")).unwrap();
    let fd = f.as_raw_fd();
    let uring = URingContext::new(num_entries, None).unwrap();

    // Fill the submit ring three times over, submitting after each full batch.
    for sqe_batch in 0..3 {
        for i in 0..num_entries {
            let user_data = (sqe_batch * num_entries + i) as u64;
            uring.add_poll_fd(fd, EventType::Read, user_data).unwrap();
        }
        uring.submit().unwrap();
    }

    // Adding more than the number of cqes will cause the uring to return ebusy, make sure that
    // is handled cleanly and wait still returns the completed entries.
    uring
        .add_poll_fd(fd, EventType::Read, (num_entries * 3) as u64)
        .unwrap();

    // The first wait call should return the cqes that are already filled.
    {
        let mut results = uring.wait().unwrap();
        for _ in 0..num_entries * 2 {
            let (_, res) = results.next().unwrap();
            assert_eq!(res.unwrap(), 1_u32);
        }
        assert!(results.next().is_none());
    }

    // The second will finish submitting any more sqes and return the rest.
    let mut results = uring.wait().unwrap();
    for _ in 0..num_entries + 1 {
        let (_, res) = results.next().unwrap();
        assert_eq!(res.unwrap(), 1_u32);
    }
    assert!(results.next().is_none());
}
458
459
#[test]
// A NOP submitted from another thread must wake a thread blocked in `wait`, even while that
// thread's own pipe read cannot complete yet.
fn wake_with_nop() {
    const PIPE_READ: UserData = 0;
    const NOP: UserData = 1;
    const BUF_DATA: [u8; 16] = [0xf4; 16];

    let uring = URingContext::new(4, None).map(Arc::new).unwrap();
    let (pipe_out, mut pipe_in) = pipe().unwrap();
    let (tx, rx) = channel();

    let waiter_uring = uring.clone();
    let wait_thread = thread::spawn(move || {
        let mut buf = [0u8; BUF_DATA.len()];
        // TODO(b/315998194): Add safety comment
        #[allow(clippy::undocumented_unsafe_blocks)]
        unsafe {
            add_one_read(
                &waiter_uring,
                buf.as_mut_ptr(),
                buf.len(),
                pipe_out.as_raw_fd(),
                Some(0),
                PIPE_READ,
            )
            .unwrap();
        }

        // This is still a bit racy as the other thread may end up adding the NOP before we make
        // the syscall but I'm not aware of a mechanism that will notify the other thread
        // exactly when we make the syscall.
        tx.send(()).unwrap();
        let (user_data, result) = waiter_uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, NOP);
        assert_eq!(result.unwrap(), 0);

        // Tell the main thread the NOP was seen, then wait for the pipe read to finish.
        tx.send(()).unwrap();
        let (user_data, result) = waiter_uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, PIPE_READ);
        assert_eq!(result.unwrap(), buf.len() as u32);
        assert_eq!(&buf, &BUF_DATA);
    });

    let handshake_timeout = Duration::from_secs(10);

    // Wait until the other thread is about to make the syscall.
    rx.recv_timeout(handshake_timeout).unwrap();

    // Now add a NOP operation. This should wake up the other thread even though it cannot yet
    // read from the pipe.
    uring.add_nop(NOP).unwrap();
    uring.submit().unwrap();

    // Wait for the other thread to process the NOP result.
    rx.recv_timeout(handshake_timeout).unwrap();

    // Now write to the pipe to finish the uring read.
    pipe_in.write_all(&BUF_DATA).unwrap();

    wait_thread.join().unwrap();
}
519
520
#[test]
// Several threads reap completions from one shared uring; every user_data value must be seen
// exactly once.
fn complete_from_any_thread() {
    let num_entries = 16;
    let total_ops = num_entries * 3;
    let uring = URingContext::new(num_entries, None).map(Arc::new).unwrap();

    // Fill the submit ring three times over, submitting after each full batch.
    for sqe_batch in 0..3 {
        for i in 0..num_entries {
            uring.add_nop((sqe_batch * num_entries + i) as u64).unwrap();
        }
        uring.submit().unwrap();
    }

    // Spawn a bunch of threads that pull cqes out of the uring and make sure none of them see a
    // duplicate.
    const NUM_THREADS: usize = 7;
    let completed = Arc::new(Mutex::new(BTreeSet::new()));
    let cv = Arc::new(Condvar::new());
    let barrier = Arc::new(Barrier::new(NUM_THREADS));

    let mut threads = Vec::with_capacity(NUM_THREADS);
    for _ in 0..NUM_THREADS {
        let uring = uring.clone();
        let completed = completed.clone();
        let barrier = barrier.clone();
        let cv = cv.clone();
        let worker = thread::spawn(move || {
            // Start all workers together.
            barrier.wait();

            'wait: while completed.lock().len() < total_ops {
                for (user_data, result) in uring.wait().unwrap() {
                    assert_eq!(result.unwrap(), 0);

                    // `insert` returns false for a value that is already present, i.e. a
                    // completion that was delivered twice.
                    let mut seen = completed.lock();
                    assert!(seen.insert(user_data));
                    if seen.len() >= total_ops {
                        break 'wait;
                    }
                }
            }

            cv.notify_one();
        });
        threads.push(worker);
    }

    // Wait until all the operations have completed.
    {
        let mut seen = completed.lock();
        while seen.len() < total_ops {
            seen = cv.wait(seen);
        }
    }

    // Let the OS clean up the still-waiting threads after the test run.
}
574
575
#[test]
// Several threads queue and submit NOPs on one shared uring while the test thread reaps every
// completion.
fn submit_from_any_thread() {
    const NUM_THREADS: usize = 7;
    const ITERATIONS: usize = 113;
    const NUM_ENTRIES: usize = 16;
    const TOTAL_OPS: usize = NUM_THREADS * ITERATIONS;

    // Blocks the calling submitter until the in-flight count is no larger than NUM_ENTRIES.
    fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
        let mut outstanding = in_flight.lock();
        while *outstanding > NUM_ENTRIES as isize {
            outstanding = cv.wait(outstanding);
        }
    }

    let uring = URingContext::new(NUM_ENTRIES, None).map(Arc::new).unwrap();
    let in_flight = Arc::new(Mutex::new(0_isize));
    let cv = Arc::new(Condvar::new());

    let mut threads = Vec::with_capacity(NUM_THREADS);
    for idx in 0..NUM_THREADS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        let submitter = thread::spawn(move || {
            for iter in 0..ITERATIONS {
                let user_data = ((idx * NUM_THREADS) + iter) as UserData;
                loop {
                    match uring.add_nop(user_data) {
                        Ok(()) => *in_flight.lock() += 1,
                        Err(Error::NoSpace) => {
                            // No room in the submit queue: wait for completions, then retry.
                            wait_for_completion_thread(&in_flight, &cv);
                            continue;
                        }
                        Err(e) => panic!("Failed to add nop: {e}"),
                    }

                    // We don't need to wait for the completion queue if the submit fails with
                    // EBUSY because we already added the operation to the submit queue. It will
                    // get added eventually.
                    match uring.submit() {
                        Ok(()) | Err(Error::RingEnter(libc::EBUSY)) => break,
                        Err(e) => panic!("Failed to submit ops: {e}"),
                    }
                }
            }
        });
        threads.push(submitter);
    }

    let mut completed = 0;
    while completed < TOTAL_OPS {
        for (_, res) in uring.wait().unwrap() {
            assert_eq!(res.unwrap(), 0);
            completed += 1;

            // Decrement under the lock, but release it before notifying the submitters.
            let notify_submitters = {
                let mut outstanding = in_flight.lock();
                *outstanding -= 1;
                *outstanding <= NUM_ENTRIES as isize
            };
            if notify_submitters {
                cv.notify_all();
            }

            if completed >= TOTAL_OPS {
                break;
            }
        }
    }

    for t in threads {
        t.join().unwrap();
    }

    // Make sure we didn't submit more entries than expected.
    assert_eq!(*in_flight.lock(), 0);
    assert_eq!(uring.submit_ring.lock().added, 0);
    assert_eq!(uring.complete_ring.num_ready(), 0);
}
652
653
// TODO(b/183722981): Fix and re-enable test
#[test]
#[ignore]
// Several submitter threads queue NOPs while several completer threads reap them, all sharing
// one uring.
fn multi_thread_submit_and_complete() {
    const NUM_SUBMITTERS: usize = 7;
    const NUM_COMPLETERS: usize = 3;
    const ITERATIONS: usize = 113;
    const NUM_ENTRIES: usize = 16;
    const TOTAL_OPS: usize = NUM_SUBMITTERS * ITERATIONS;

    // Blocks the calling submitter until the in-flight count is no larger than NUM_ENTRIES.
    fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
        let mut outstanding = in_flight.lock();
        while *outstanding > NUM_ENTRIES as isize {
            outstanding = cv.wait(outstanding);
        }
    }

    let uring = URingContext::new(NUM_ENTRIES, None).map(Arc::new).unwrap();
    let in_flight = Arc::new(Mutex::new(0_isize));
    let cv = Arc::new(Condvar::new());

    // Submitter handles come first in `threads`, completer handles after them.
    let mut threads = Vec::with_capacity(NUM_SUBMITTERS + NUM_COMPLETERS);
    for idx in 0..NUM_SUBMITTERS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        let submitter = thread::spawn(move || {
            for iter in 0..ITERATIONS {
                let user_data = ((idx * NUM_SUBMITTERS) + iter) as UserData;
                loop {
                    match uring.add_nop(user_data) {
                        Ok(()) => *in_flight.lock() += 1,
                        Err(Error::NoSpace) => {
                            // No room in the submit queue: wait for completions, then retry.
                            wait_for_completion_thread(&in_flight, &cv);
                            continue;
                        }
                        Err(e) => panic!("Failed to add nop: {e}"),
                    }

                    // We don't need to wait for the completion queue if the submit fails with
                    // EBUSY because we already added the operation to the submit queue. It will
                    // get added eventually.
                    match uring.submit() {
                        Ok(()) | Err(Error::RingEnter(libc::EBUSY)) => break,
                        Err(e) => panic!("Failed to submit ops: {e}"),
                    }
                }
            }
        });
        threads.push(submitter);
    }

    let completed = Arc::new(AtomicUsize::new(0));
    for _ in 0..NUM_COMPLETERS {
        let uring = uring.clone();
        let in_flight = in_flight.clone();
        let cv = cv.clone();
        let completed = completed.clone();
        let completer = thread::spawn(move || {
            while completed.load(Ordering::Relaxed) < TOTAL_OPS {
                for (_, res) in uring.wait().unwrap() {
                    assert_eq!(res.unwrap(), 0);
                    completed.fetch_add(1, Ordering::Relaxed);

                    // Decrement under the lock, but release it before notifying the submitters.
                    let notify_submitters = {
                        let mut outstanding = in_flight.lock();
                        *outstanding -= 1;
                        *outstanding <= NUM_ENTRIES as isize
                    };
                    if notify_submitters {
                        cv.notify_all();
                    }

                    if completed.load(Ordering::Relaxed) >= TOTAL_OPS {
                        break;
                    }
                }
            }
        });
        threads.push(completer);
    }

    // Join only the submitters; the completer handles stay in `threads`.
    for t in threads.drain(..NUM_SUBMITTERS) {
        t.join().unwrap();
    }

    // Now that all submitters are finished, add NOPs to wake up any completers blocked on the
    // syscall.
    for i in 0..NUM_COMPLETERS {
        let wake_id = (TOTAL_OPS + i) as UserData;
        uring.add_nop(wake_id).unwrap();
    }
    uring.submit().unwrap();

    for t in threads {
        t.join().unwrap();
    }

    // Make sure we didn't submit more entries than expected. Only the last few NOPs added to
    // wake up the completer threads may still be in the completion ring.
    assert!(uring.complete_ring.num_ready() <= NUM_COMPLETERS as u32);
    assert_eq!(
        in_flight.lock().unsigned_abs() as u32 + uring.complete_ring.num_ready(),
        NUM_COMPLETERS as u32
    );
    assert_eq!(uring.submit_ring.lock().added, 0);
}
758
759
#[test]
// With an allowlist that permits only Readv, a read must succeed and a write must be rejected
// with EACCES without touching the file.
fn restrict_ops() {
    const TEST_DATA: &[u8; 4] = b"foo!";

    let queue_size = 128;

    // Allow only Readv operation
    let mut restriction = URingAllowlist::new();
    restriction.allow_submit_operation(io_uring::URingOperation::Readv);

    let uring = URingContext::new(queue_size, Some(&restriction)).unwrap();

    let mut buf = [0u8; 4];
    let mut f = create_test_file(4);
    f.write_all(TEST_DATA).unwrap();

    // add_read, which submits Readv, should succeed

    // TODO(b/315998194): Add safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    unsafe {
        add_one_read(
            &uring,
            buf.as_mut_ptr(),
            buf.len(),
            f.as_raw_fd(),
            Some(0),
            0,
        )
        .unwrap();
    }
    let (_, read_res) = uring.wait().unwrap().next().unwrap();
    assert!(read_res.is_ok(), "uring read should succeed");
    assert_eq!(&buf, TEST_DATA, "file should be read to buf");
    drop(f);

    // add_write should be rejected.

    let buf: [u8; 4] = *TEST_DATA; // fake data, which should not be written
    let mut f = create_test_file(4);

    // TODO(b/315998194): Add safety comment
    #[allow(clippy::undocumented_unsafe_blocks)]
    unsafe {
        add_one_write(&uring, buf.as_ptr(), buf.len(), f.as_raw_fd(), Some(0), 0).unwrap();
    }
    let (_, write_res) = uring.wait().unwrap().next().unwrap();
    assert!(write_res.is_err(), "uring write should fail");
    assert_eq!(
        write_res.unwrap_err().raw_os_error(),
        Some(EACCES),
        "the error should be permission denied"
    );

    let mut result_f = vec![];
    f.seek(SeekFrom::Start(0)).unwrap(); // rewind to read from the beginning
    f.read_to_end(&mut result_f).unwrap();
    assert_eq!(
        result_f.as_slice(),
        &[0, 0, 0, 0],
        "file should not be written and should stay empty"
    );
}
829
830