Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/sys/linux/fd_executor.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::future::Future;
6
use std::io;
7
use std::mem;
8
use std::os::fd::AsRawFd;
9
use std::pin::Pin;
10
use std::sync::Arc;
11
use std::sync::Weak;
12
use std::task::Context;
13
use std::task::Poll;
14
use std::task::Waker;
15
16
use base::add_fd_flags;
17
use base::warn;
18
use base::AsRawDescriptor;
19
use base::AsRawDescriptors;
20
use base::Event;
21
use base::EventType;
22
use base::RawDescriptor;
23
use base::WaitContext;
24
use remain::sorted;
25
use slab::Slab;
26
use sync::Mutex;
27
use thiserror::Error as ThisError;
28
29
use crate::common_executor::RawExecutor;
30
use crate::common_executor::RawTaskHandle;
31
use crate::common_executor::Reactor;
32
use crate::waker::WakerToken;
33
use crate::AsyncResult;
34
use crate::IoSource;
35
use crate::TaskHandle;
36
37
#[sorted]
38
#[derive(Debug, ThisError)]
39
pub enum Error {
40
#[error("Couldn't clear the wake eventfd")]
41
CantClearWakeEvent(base::Error),
42
/// Failed to clone the Event for waking the executor.
43
#[error("Failed to clone the Event for waking the executor: {0}")]
44
CloneEvent(base::Error),
45
/// Failed to create the Event for waking the executor.
46
#[error("Failed to create the Event for waking the executor: {0}")]
47
CreateEvent(base::Error),
48
/// Creating a context to wait on FDs failed.
49
#[error("An error creating the fd waiting context: {0}")]
50
CreatingContext(base::Error),
51
/// Failed to copy the FD for the polling context.
52
#[error("Failed to copy the FD for the polling context: {0}")]
53
DuplicatingFd(std::io::Error),
54
#[error("Executor failed")]
55
ExecutorError(anyhow::Error),
56
/// The Executor is gone.
57
#[error("The FDExecutor is gone")]
58
ExecutorGone,
59
/// An error occurred when setting the FD non-blocking.
60
#[error("An error occurred setting the FD non-blocking: {0}.")]
61
SettingNonBlocking(base::Error),
62
/// Failed to submit the waker to the polling context.
63
#[error("An error adding to the Aio context: {0}")]
64
SubmittingWaker(base::Error),
65
/// A Waker was canceled, but the operation isn't running.
66
#[error("Unknown waker")]
67
UnknownWaker,
68
/// WaitContext failure.
69
#[error("WaitContext failure: {0}")]
70
WaitContextError(base::Error),
71
}
72
pub type Result<T> = std::result::Result<T, Error>;
73
74
impl From<Error> for io::Error {
75
fn from(e: Error) -> Self {
76
use Error::*;
77
match e {
78
CantClearWakeEvent(e) => e.into(),
79
CloneEvent(e) => e.into(),
80
CreateEvent(e) => e.into(),
81
DuplicatingFd(e) => e,
82
ExecutorError(e) => io::Error::other(e),
83
ExecutorGone => io::Error::other(e),
84
CreatingContext(e) => e.into(),
85
SettingNonBlocking(e) => e.into(),
86
SubmittingWaker(e) => e.into(),
87
UnknownWaker => io::Error::other(e),
88
WaitContextError(e) => e.into(),
89
}
90
}
91
}
92
93
/// A poll operation that has been submitted and is potentially being waited on.
struct OpData {
    /// The descriptor registered with the poll context. It is an `Arc` clone of the submitting
    /// `RegisteredSource::duped_fd`, so the fd stays open while it is registered.
    file: Arc<std::os::fd::OwnedFd>,
    /// Waker of the task that most recently polled this operation, if any. `is_ready` stores
    /// it; `wait_for_work` and `on_executor_drop` take it and wake it.
    waker: Option<Waker>,
}
98
99
/// The current status of a submitted operation, stored in `EpollReactor::ops` under the
/// operation's token (its slab key).
enum OpStatus {
    /// Registered with the poll context and not yet reported ready.
    Pending(OpData),
    /// Reported ready by `wait_for_work`. The slot is freed later by `is_ready` or
    /// `cancel_operation`.
    Completed,
    /// Special status that identifies the "wake up" eventfd, which is essentially always pending.
    WakeEvent,
}
106
107
/// An IO source previously registered with an `EpollReactor`. Used to initiate asynchronous IO
/// with the associated executor.
pub struct RegisteredSource<F> {
    /// The wrapped IO source.
    pub(crate) source: F,
    /// Weak handle to the executor. Upgrading it fails once the executor has been dropped.
    ex: Weak<RawExecutor<EpollReactor>>,
    /// A clone of `source`'s underlying FD. Allows us to ensure that the FD isn't closed during
    /// the epoll wait call. There are well-defined semantics for closing an FD in an epoll
    /// context, so it might be possible to eliminate this dup if someone thinks hard about it.
    pub(crate) duped_fd: Arc<std::os::fd::OwnedFd>,
}
117
118
impl<F: AsRawDescriptor> RegisteredSource<F> {
119
pub(crate) fn new(raw: &Arc<RawExecutor<EpollReactor>>, f: F) -> Result<Self> {
120
let raw_fd = f.as_raw_descriptor();
121
assert_ne!(raw_fd, -1);
122
123
add_fd_flags(raw_fd, libc::O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
124
125
// SAFETY: The FD is open for the duration of the BorrowedFd lifetime (this line) and not
126
// -1 (checked above).
127
let duped_fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(raw_fd) }
128
.try_clone_to_owned()
129
.map_err(Error::DuplicatingFd)?;
130
Ok(RegisteredSource {
131
source: f,
132
ex: Arc::downgrade(raw),
133
duped_fd: Arc::new(duped_fd),
134
})
135
}
136
137
// Start an asynchronous operation to wait for this source to become readable. The returned
138
// future will not be ready until the source is readable.
139
pub fn wait_readable(&self) -> Result<PendingOperation> {
140
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
141
142
let token = ex
143
.reactor
144
.add_operation(Arc::clone(&self.duped_fd), EventType::Read)?;
145
146
Ok(PendingOperation {
147
token: Some(token),
148
ex: self.ex.clone(),
149
})
150
}
151
152
// Start an asynchronous operation to wait for this source to become writable. The returned
153
// future will not be ready until the source is writable.
154
pub fn wait_writable(&self) -> Result<PendingOperation> {
155
let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
156
157
let token = ex
158
.reactor
159
.add_operation(Arc::clone(&self.duped_fd), EventType::Write)?;
160
161
Ok(PendingOperation {
162
token: Some(token),
163
ex: self.ex.clone(),
164
})
165
}
166
}
167
168
/// A future for a single poll operation, returned by `RegisteredSource::wait_readable` and
/// `RegisteredSource::wait_writable`.
///
/// It holds the `WakerToken` that identifies the operation in the executor's reactor. It resolves
/// to `Ok(())` once the reactor reports the fd ready, or to `Err(Error::ExecutorGone)` if the
/// executor has been dropped. Dropping a `PendingOperation` before `poll` has returned
/// `Ready(Ok(()))` asks the reactor to cancel the operation, if the executor is still alive.
pub struct PendingOperation {
    /// `Some` until `poll` observes completion, `None` afterwards.
    token: Option<WakerToken>,
    /// Weak handle to the executor that owns the operation.
    ex: Weak<RawExecutor<EpollReactor>>,
}
175
176
impl Future for PendingOperation {
177
type Output = Result<()>;
178
179
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
180
let token = self
181
.token
182
.as_ref()
183
.expect("PendingOperation polled after returning Poll::Ready");
184
if let Some(ex) = self.ex.upgrade() {
185
if ex.reactor.is_ready(token, cx) {
186
self.token = None;
187
Poll::Ready(Ok(()))
188
} else {
189
Poll::Pending
190
}
191
} else {
192
Poll::Ready(Err(Error::ExecutorGone))
193
}
194
}
195
}
196
197
impl Drop for PendingOperation {
198
fn drop(&mut self) {
199
if let Some(token) = self.token.take() {
200
if let Some(ex) = self.ex.upgrade() {
201
let _ = ex.reactor.cancel_operation(token);
202
}
203
}
204
}
205
}
206
207
/// `Reactor` that manages async IO work using epoll.
pub struct EpollReactor {
    /// Context that every submitted fd (and `wake_event`) is registered with. The token stored
    /// with each fd is that operation's key in `ops`.
    poll_ctx: WaitContext<usize>,
    /// Status of every submitted operation, keyed by the operation's token.
    ops: Mutex<Slab<OpStatus>>,
    // This event is always present in `poll_ctx` with the special op status `WakeEvent`. It is
    // used by `RawExecutor::wake` to break other threads out of `poll_ctx.wait()` calls (usually
    // to notify them that `queue` has new work).
    wake_event: Event,
}
216
217
impl EpollReactor {
218
fn new() -> Result<Self> {
219
let reactor = EpollReactor {
220
poll_ctx: WaitContext::new().map_err(Error::CreatingContext)?,
221
ops: Mutex::new(Slab::with_capacity(64)),
222
wake_event: {
223
let wake_event = Event::new().map_err(Error::CreateEvent)?;
224
add_fd_flags(wake_event.as_raw_descriptor(), libc::O_NONBLOCK)
225
.map_err(Error::SettingNonBlocking)?;
226
wake_event
227
},
228
};
229
230
// Add the special "wake up" op.
231
{
232
let mut ops = reactor.ops.lock();
233
let entry = ops.vacant_entry();
234
let next_token = entry.key();
235
reactor
236
.poll_ctx
237
.add_for_event(&reactor.wake_event, EventType::Read, next_token)
238
.map_err(Error::SubmittingWaker)?;
239
entry.insert(OpStatus::WakeEvent);
240
}
241
242
Ok(reactor)
243
}
244
245
fn add_operation(
246
&self,
247
file: Arc<std::os::fd::OwnedFd>,
248
event_type: EventType,
249
) -> Result<WakerToken> {
250
let mut ops = self.ops.lock();
251
let entry = ops.vacant_entry();
252
let next_token = entry.key();
253
self.poll_ctx
254
.add_for_event(&base::Descriptor(file.as_raw_fd()), event_type, next_token)
255
.map_err(Error::SubmittingWaker)?;
256
entry.insert(OpStatus::Pending(OpData { file, waker: None }));
257
Ok(WakerToken(next_token))
258
}
259
260
fn is_ready(&self, token: &WakerToken, cx: &mut Context) -> bool {
261
let mut ops = self.ops.lock();
262
263
let op = ops
264
.get_mut(token.0)
265
.expect("`is_ready` called on unknown operation");
266
match op {
267
OpStatus::Pending(data) => {
268
data.waker = Some(cx.waker().clone());
269
false
270
}
271
OpStatus::Completed => {
272
ops.remove(token.0);
273
true
274
}
275
// unreachable because we never create a WakerToken for `wake_event`.
276
OpStatus::WakeEvent => unreachable!(),
277
}
278
}
279
280
// Remove the waker for the given token if it hasn't fired yet.
281
fn cancel_operation(&self, token: WakerToken) -> Result<()> {
282
match self.ops.lock().remove(token.0) {
283
OpStatus::Pending(data) => self
284
.poll_ctx
285
.delete(&base::Descriptor(data.file.as_raw_fd()))
286
.map_err(Error::WaitContextError),
287
OpStatus::Completed => Ok(()),
288
// unreachable because we never create a WakerToken for `wake_event`.
289
OpStatus::WakeEvent => unreachable!(),
290
}
291
}
292
}
293
294
impl Reactor for EpollReactor {
295
fn new() -> std::io::Result<Self> {
296
Ok(EpollReactor::new()?)
297
}
298
299
fn wake(&self) {
300
if let Err(e) = self.wake_event.signal() {
301
warn!("Failed to notify executor that a future is ready: {}", e);
302
}
303
}
304
305
fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
306
// At this point, there are no strong references to the executor (see `on_executor_drop`
307
// docs). That means all the `RegisteredSource::ex` will fail to upgrade and so no more IO
308
// work can be submitted.
309
310
// Wake up any futures still waiting on poll operations as they are just going to get an
311
// ExecutorGone error now.
312
for op in self.ops.lock().drain() {
313
match op {
314
OpStatus::Pending(mut data) => {
315
if let Some(waker) = data.waker.take() {
316
waker.wake();
317
}
318
319
if let Err(e) = self
320
.poll_ctx
321
.delete(&base::Descriptor(data.file.as_raw_fd()))
322
{
323
warn!("Failed to remove file from EpollCtx: {}", e);
324
}
325
}
326
OpStatus::Completed => {}
327
OpStatus::WakeEvent => {}
328
}
329
}
330
331
// Now run the executor one more time to drive any remaining futures to completion.
332
Box::pin(async {})
333
}
334
335
fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
336
let events = self.poll_ctx.wait().map_err(Error::WaitContextError)?;
337
338
// Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
339
// writing to the eventfd.
340
set_processing();
341
for e in events.iter() {
342
let token = e.token;
343
let mut ops = self.ops.lock();
344
345
// The op could have been canceled and removed by another thread so ignore it if it
346
// doesn't exist.
347
if let Some(op) = ops.get_mut(token) {
348
let (file, waker) = match mem::replace(op, OpStatus::Completed) {
349
OpStatus::Pending(OpData { file, waker }) => (file, waker),
350
OpStatus::Completed => panic!("poll operation completed more than once"),
351
OpStatus::WakeEvent => {
352
*op = OpStatus::WakeEvent;
353
match self.wake_event.wait() {
354
Ok(_) => {}
355
Err(e) if e.errno() == libc::EWOULDBLOCK => {}
356
Err(e) => return Err(e.into()),
357
}
358
continue;
359
}
360
};
361
362
mem::drop(ops);
363
364
self.poll_ctx
365
.delete(&base::Descriptor(file.as_raw_fd()))
366
.map_err(Error::WaitContextError)?;
367
368
if let Some(waker) = waker {
369
waker.wake();
370
}
371
}
372
}
373
Ok(())
374
}
375
376
fn new_source<F: AsRawDescriptor>(
377
&self,
378
ex: &Arc<RawExecutor<Self>>,
379
f: F,
380
) -> AsyncResult<IoSource<F>> {
381
Ok(IoSource::Epoll(super::PollSource::new(f, ex)?))
382
}
383
384
fn wrap_task_handle<R>(task: RawTaskHandle<EpollReactor, R>) -> TaskHandle<R> {
385
TaskHandle::Fd(task)
386
}
387
}
388
389
impl AsRawDescriptors for EpollReactor {
390
fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
391
vec![
392
self.poll_ctx.as_raw_descriptor(),
393
self.wake_event.as_raw_descriptor(),
394
]
395
}
396
}
397
398
#[cfg(test)]
mod test {
    use std::cell::RefCell;
    use std::fs::File;
    use std::io::Read;
    use std::io::Write;
    use std::rc::Rc;

    use futures::future::Either;

    use super::*;
    use crate::BlockingPool;
    use crate::ExecutorTrait;

    // Checks two things. First, a poll operation that is not ready (nothing is ever written to
    // the pipe) can lose a `select` and be dropped cleanly. Second, the executor can drive a
    // trivial future to completion.
    #[test]
    fn test_it() {
        async fn do_test(ex: &Arc<RawExecutor<EpollReactor>>) {
            let (r, _w) = base::pipe().unwrap();
            let done = Box::pin(async { 5usize });
            let source = RegisteredSource::new(ex, r).unwrap();
            let pending = source.wait_readable().unwrap();
            match futures::future::select(pending, done).await {
                Either::Right((5, pending)) => std::mem::drop(pending),
                _ => panic!("unexpected select result"),
            }
        }

        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        ex.run_until(do_test(&ex)).unwrap();

        // Example of starting the framework and running a future:
        async fn my_async(x: Rc<RefCell<u64>>) {
            x.replace(4);
        }

        let x = Rc::new(RefCell::new(0));
        {
            let ex = RawExecutor::<EpollReactor>::new().unwrap();
            ex.run_until(my_async(x.clone())).unwrap();
        }
        assert_eq!(*x.borrow(), 4);
    }

    // Checks that detached tasks still run to completion when the executor is dropped. It also
    // checks that a `PendingOperation` whose executor is gone resolves to
    // `Error::ExecutorGone`.
    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0x66ae_cb65_12fb_d260;

        async fn write_value(mut tx: File) {
            let buf = VALUE.to_ne_bytes();
            tx.write_all(&buf[..]).expect("Failed to write to pipe");
        }

        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Task completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from task: {e}"),
            }
        }

        let (mut rx, tx) = base::pipe().expect("Pipe failed");

        let ex = RawExecutor::<EpollReactor>::new().unwrap();

        let source = RegisteredSource::new(&ex, tx.try_clone().unwrap()).unwrap();
        let op = source.wait_writable().unwrap();

        ex.spawn_local(write_value(tx)).detach();
        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It should still run until the write to the pipe is complete.
        mem::drop(ex);

        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(u64::from_ne_bytes(buf), VALUE);
    }

    // Dropping a task that owns a BlockingPool shouldn't leak the pool.
    #[test]
    fn drop_detached_blocking_pool() {
        struct Cleanup(BlockingPool);

        impl Drop for Cleanup {
            fn drop(&mut self) {
                // Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).
                self.0
                    .shutdown(Some(
                        std::time::Instant::now() + std::time::Duration::from_secs(1),
                    ))
                    .unwrap();
            }
        }

        let rc = Rc::new(std::cell::Cell::new(0));
        {
            let ex = RawExecutor::<EpollReactor>::new().unwrap();
            let rc_clone = rc.clone();
            ex.spawn_local(async move {
                rc_clone.set(1);
                let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
                let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
                // Spawn a blocking task.
                let blocking_task = pool.0.spawn(move || {
                    // Rendezvous.
                    assert_eq!(recv.recv(), Ok(()));
                    // Wait for drop.
                    assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
                });
                // Make sure it has actually started (using a "rendezvous channel" send).
                //
                // Without this step, we'll have a race where we can shutdown the blocking pool
                // before the worker thread pops off the task.
                send.send(()).unwrap();
                // Wait for it to finish
                blocking_task.await;
                rc_clone.set(2);
            })
            .detach();
            ex.run_until(async {}).unwrap();
            // `ex` is dropped here. If everything is working as expected, it should drop all of
            // its tasks, including `send` and `pool` (in that order, which is important). The
            // `Drop` impl of `pool` (`Cleanup`) joins all the worker threads. That succeeds
            // because the send half of the channel has been closed by then.
        }
        // The task was dropped while awaiting `blocking_task`, so it never set the value to 2.
        assert_eq!(rc.get(), 1);
        // The task's `rc_clone` must have been dropped too, i.e. the task wasn't leaked.
        Rc::try_unwrap(rc).expect("Rc had too many refs");
    }

    // Test the waker implementation. This code path doesn't get hit by `IoSource`, only by
    // backend-agnostic libraries, like `BlockingPool` and `futures::channel`.
    #[test]
    fn test_non_io_waker() {
        use std::task::Poll;

        // A future that returns `Pending` once and has a helper thread wake it after `ms`
        // milliseconds. The next poll returns `Ready`.
        struct Sleep(Option<u64>);

        impl Future for Sleep {
            type Output = ();

            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
                if let Some(ms) = self.0.take() {
                    let waker = cx.waker().clone();
                    std::thread::spawn(move || {
                        std::thread::sleep(std::time::Duration::from_millis(ms));
                        waker.wake();
                    });
                    Poll::Pending
                } else {
                    Poll::Ready(())
                }
            }
        }

        let ex = RawExecutor::<EpollReactor>::new().unwrap();
        ex.run_until(async move {
            // Test twice because there was once a bug where the second time panic'd.
            Sleep(Some(1)).await;
            Sleep(Some(1)).await;
        })
        .unwrap();
    }
}
563
564