Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/lib.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
//! An Executor and future combinators based on operations that block on file descriptors.
6
//!
7
//! This crate is meant to be used with the `futures-rs` crate that provides further combinators
8
//! and utility functions to combine and manage futures. All futures will run until they block on a
9
//! file descriptor becoming readable or writable. Facilities are provided to register future
10
//! wakers based on such events.
11
//!
12
//! # Running top-level futures.
13
//!
14
//! Use helper functions based on the desired behavior of your application.
15
//!
16
//! ## Completing one of several futures.
17
//!
18
//! If there are several top level tasks that should run until any one completes, use the "select"
19
//! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
20
//! function will return when the first future completes. The uncompleted futures will also be
21
//! returned so they can be run further or otherwise cleaned up. These functions are inspired by
22
//! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
23
//! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
24
//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).
25
//!
26
//! ## Completing all of several futures.
27
//!
28
//! If there are several top level tasks that all need to be completed, use the "complete" family
29
//! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
30
//! function will return only once all the futures passed to it have completed. These functions are
31
//! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
32
//! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
33
//! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
34
//! [`complete5`](fn.complete5.html).
35
//!
36
//! # Implementing new FD-based futures.
37
//!
38
//! For URing, implementations should provide an implementation of the `IoSource` trait.
39
//! For the FD executor, new futures can use the existing ability to poll a source to build async
40
//! functionality on top of.
41
//!
42
//! # Implementations
43
//!
44
//! Currently there are two paths for using the asynchronous IO. One uses a WaitContext and drives
45
//! futures based on the FDs signaling they are ready for the operation. This method will exist so
46
//! long as kernels < 5.4 are supported.
47
//! The other method submits operations to io_uring and is signaled when they complete. This is more
48
//! efficient, but only supported on kernel 5.4+.
49
//! If `IoSource::new` is used to interface with async IO, then the correct backend will be chosen
50
//! automatically.
51
//!
52
//! # Examples
53
//!
54
//! See the docs for `IoSource` if support for kernels <5.4 is required. Focus on `UringSource` if
55
//! all systems have support for io_uring.
56
57
mod async_types;
58
pub mod audio_streams_async;
59
mod blocking;
60
mod common_executor;
61
mod complete;
62
mod event;
63
mod executor;
64
mod io_ext;
65
mod io_source;
66
pub mod mem;
67
mod queue;
68
mod select;
69
pub mod sync;
70
pub mod sys;
71
mod timer;
72
#[cfg(feature = "tokio")]
73
mod tokio_executor;
74
mod waker;
75
76
use std::future::Future;
77
use std::pin::Pin;
78
use std::task::Poll;
79
80
pub use async_types::*;
81
pub use base::Event;
82
#[cfg(any(target_os = "android", target_os = "linux"))]
83
pub use blocking::sys::linux::block_on::block_on;
84
pub use blocking::unblock;
85
pub use blocking::unblock_disarm;
86
pub use blocking::BlockingPool;
87
pub use blocking::CancellableBlockingPool;
88
pub use blocking::TimeoutAction;
89
pub use event::EventAsync;
90
pub use executor::Executor;
91
pub use executor::ExecutorKind;
92
pub(crate) use executor::ExecutorTrait;
93
pub use executor::TaskHandle;
94
#[cfg(windows)]
95
pub use futures::executor::block_on;
96
use futures::stream::FuturesUnordered;
97
pub use io_ext::AsyncError;
98
pub use io_ext::AsyncResult;
99
pub use io_ext::AsyncWrapper;
100
pub use io_ext::IntoAsync;
101
pub use io_source::IoSource;
102
pub use mem::BackingMemory;
103
pub use mem::MemRegion;
104
pub use mem::MemRegionIter;
105
pub use mem::VecIoWrapper;
106
use remain::sorted;
107
pub use select::SelectResult;
108
#[cfg(any(target_os = "android", target_os = "linux"))]
109
pub use sys::linux::uring_executor::is_uring_stable;
110
use thiserror::Error as ThisError;
111
pub use timer::TimerAsync;
112
113
#[sorted]
114
#[derive(ThisError, Debug)]
115
pub enum Error {
116
/// Error from EventAsync
117
#[error("Failure in EventAsync: {0}")]
118
EventAsync(base::Error),
119
/// Error from the handle executor.
120
#[cfg(windows)]
121
#[error("Failure in the handle executor: {0}")]
122
HandleExecutor(sys::windows::handle_executor::Error),
123
#[error("IO error: {0}")]
124
Io(std::io::Error),
125
/// Error from the polled(FD) source, which includes error from the FD executor.
126
#[cfg(any(target_os = "android", target_os = "linux"))]
127
#[error("An error with a poll source: {0}")]
128
PollSource(sys::linux::poll_source::Error),
129
/// Error from Timer.
130
#[error("Failure in Timer: {0}")]
131
Timer(base::Error),
132
/// Error from TimerFd.
133
#[error("Failure in TimerAsync: {0}")]
134
TimerAsync(AsyncError),
135
/// Error from the uring executor.
136
#[cfg(any(target_os = "android", target_os = "linux"))]
137
#[error("Failure in the uring executor: {0}")]
138
URingExecutor(sys::linux::uring_executor::Error),
139
}
140
pub type Result<T> = std::result::Result<T, Error>;
141
142
/// Heterogeneous collection of `async_task:Task` that are running in a "detached" state.
143
///
144
/// We keep them around to ensure they are dropped before the executor they are running on.
145
pub(crate) struct DetachedTasks(FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>);
146
147
impl DetachedTasks {
148
pub(crate) fn new() -> Self {
149
DetachedTasks(FuturesUnordered::new())
150
}
151
152
pub(crate) fn push<R: Send + 'static>(&self, task: async_task::Task<R>) {
153
// Convert to fallible, otherwise poll could panic if the `Runnable` is dropped early.
154
let task = task.fallible();
155
self.0.push(Box::pin(async {
156
let _ = task.await;
157
}));
158
}
159
160
/// Polls all the tasks, dropping any that complete.
161
pub(crate) fn poll(&mut self, cx: &mut std::task::Context) {
162
use futures::Stream;
163
while let Poll::Ready(Some(_)) = Pin::new(&mut self.0).poll_next(cx) {}
164
}
165
}
166
167
// Select helpers to run until any future completes.
168
169
/// Creates a combinator that runs the two given futures until one completes, returning a tuple
170
/// containing the result of the finished future and the still pending future.
171
///
172
/// # Example
173
///
174
/// ```
175
/// use cros_async::{SelectResult, select2, block_on};
176
/// use futures::future::pending;
177
/// use futures::pin_mut;
178
///
179
/// let first = async {5};
180
/// let second = async {let () = pending().await;};
181
/// pin_mut!(first);
182
/// pin_mut!(second);
183
/// match block_on(select2(first, second)) {
184
/// (SelectResult::Finished(5), SelectResult::Pending(_second)) => (),
185
/// _ => panic!("Select didn't return the first future"),
186
/// };
187
/// ```
188
pub async fn select2<F1: Future + Unpin, F2: Future + Unpin>(
189
f1: F1,
190
f2: F2,
191
) -> (SelectResult<F1>, SelectResult<F2>) {
192
select::Select2::new(f1, f2).await
193
}
194
195
/// Creates a combinator that runs the three given futures until one or more completes, returning a
196
/// tuple containing the result of the finished future(s) and the still pending future(s).
197
///
198
/// # Example
199
///
200
/// ```
201
/// use cros_async::{SelectResult, select3, block_on};
202
/// use futures::future::pending;
203
/// use futures::pin_mut;
204
///
205
/// let first = async {4};
206
/// let second = async {let () = pending().await;};
207
/// let third = async {5};
208
/// pin_mut!(first);
209
/// pin_mut!(second);
210
/// pin_mut!(third);
211
/// match block_on(select3(first, second, third)) {
212
/// (SelectResult::Finished(4),
213
/// SelectResult::Pending(_second),
214
/// SelectResult::Finished(5)) => (),
215
/// _ => panic!("Select didn't return the futures"),
216
/// };
217
/// ```
218
pub async fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
219
f1: F1,
220
f2: F2,
221
f3: F3,
222
) -> (SelectResult<F1>, SelectResult<F2>, SelectResult<F3>) {
223
select::Select3::new(f1, f2, f3).await
224
}
225
226
/// Creates a combinator that runs the four given futures until one or more completes, returning a
227
/// tuple containing the result of the finished future(s) and the still pending future(s).
228
///
229
/// # Example
230
///
231
/// ```
232
/// use cros_async::{SelectResult, select4, block_on};
233
/// use futures::future::pending;
234
/// use futures::pin_mut;
235
///
236
/// let first = async {4};
237
/// let second = async {let () = pending().await;};
238
/// let third = async {5};
239
/// let fourth = async {let () = pending().await;};
240
/// pin_mut!(first);
241
/// pin_mut!(second);
242
/// pin_mut!(third);
243
/// pin_mut!(fourth);
244
/// match block_on(select4(first, second, third, fourth)) {
245
/// (SelectResult::Finished(4), SelectResult::Pending(_second),
246
/// SelectResult::Finished(5), SelectResult::Pending(_fourth)) => (),
247
/// _ => panic!("Select didn't return the futures"),
248
/// };
249
/// ```
250
pub async fn select4<
251
F1: Future + Unpin,
252
F2: Future + Unpin,
253
F3: Future + Unpin,
254
F4: Future + Unpin,
255
>(
256
f1: F1,
257
f2: F2,
258
f3: F3,
259
f4: F4,
260
) -> (
261
SelectResult<F1>,
262
SelectResult<F2>,
263
SelectResult<F3>,
264
SelectResult<F4>,
265
) {
266
select::Select4::new(f1, f2, f3, f4).await
267
}
268
269
/// Creates a combinator that runs the five given futures until one or more completes, returning a
270
/// tuple containing the result of the finished future(s) and the still pending future(s).
271
///
272
/// # Example
273
///
274
/// ```
275
/// use cros_async::{SelectResult, select5, block_on};
276
/// use futures::future::pending;
277
/// use futures::pin_mut;
278
///
279
/// let first = async {4};
280
/// let second = async {let () = pending().await;};
281
/// let third = async {5};
282
/// let fourth = async {let () = pending().await;};
283
/// let fifth = async {6};
284
/// pin_mut!(first);
285
/// pin_mut!(second);
286
/// pin_mut!(third);
287
/// pin_mut!(fourth);
288
/// pin_mut!(fifth);
289
/// match block_on(select5(first, second, third, fourth, fifth)) {
290
/// (SelectResult::Finished(4), SelectResult::Pending(_second),
291
/// SelectResult::Finished(5), SelectResult::Pending(_fourth),
292
/// SelectResult::Finished(6)) => (),
293
/// _ => panic!("Select didn't return the futures"),
294
/// };
295
/// ```
296
pub async fn select5<
297
F1: Future + Unpin,
298
F2: Future + Unpin,
299
F3: Future + Unpin,
300
F4: Future + Unpin,
301
F5: Future + Unpin,
302
>(
303
f1: F1,
304
f2: F2,
305
f3: F3,
306
f4: F4,
307
f5: F5,
308
) -> (
309
SelectResult<F1>,
310
SelectResult<F2>,
311
SelectResult<F3>,
312
SelectResult<F4>,
313
SelectResult<F5>,
314
) {
315
select::Select5::new(f1, f2, f3, f4, f5).await
316
}
317
318
/// Creates a combinator that runs the six given futures until one or more completes, returning a
319
/// tuple containing the result of the finished future(s) and the still pending future(s).
320
///
321
/// # Example
322
///
323
/// ```
324
/// use cros_async::{SelectResult, select6, block_on};
325
/// use futures::future::pending;
326
/// use futures::pin_mut;
327
///
328
/// let first = async {1};
329
/// let second = async {let () = pending().await;};
330
/// let third = async {3};
331
/// let fourth = async {let () = pending().await;};
332
/// let fifth = async {5};
333
/// let sixth = async {6};
334
/// pin_mut!(first);
335
/// pin_mut!(second);
336
/// pin_mut!(third);
337
/// pin_mut!(fourth);
338
/// pin_mut!(fifth);
339
/// pin_mut!(sixth);
340
/// match block_on(select6(first, second, third, fourth, fifth, sixth)) {
341
/// (SelectResult::Finished(1), SelectResult::Pending(_second),
342
/// SelectResult::Finished(3), SelectResult::Pending(_fourth),
343
/// SelectResult::Finished(5), SelectResult::Finished(6)) => (),
344
/// _ => panic!("Select didn't return the futures"),
345
/// };
346
/// ```
347
pub async fn select6<
348
F1: Future + Unpin,
349
F2: Future + Unpin,
350
F3: Future + Unpin,
351
F4: Future + Unpin,
352
F5: Future + Unpin,
353
F6: Future + Unpin,
354
>(
355
f1: F1,
356
f2: F2,
357
f3: F3,
358
f4: F4,
359
f5: F5,
360
f6: F6,
361
) -> (
362
SelectResult<F1>,
363
SelectResult<F2>,
364
SelectResult<F3>,
365
SelectResult<F4>,
366
SelectResult<F5>,
367
SelectResult<F6>,
368
) {
369
select::Select6::new(f1, f2, f3, f4, f5, f6).await
370
}
371
372
pub async fn select7<
373
F1: Future + Unpin,
374
F2: Future + Unpin,
375
F3: Future + Unpin,
376
F4: Future + Unpin,
377
F5: Future + Unpin,
378
F6: Future + Unpin,
379
F7: Future + Unpin,
380
>(
381
f1: F1,
382
f2: F2,
383
f3: F3,
384
f4: F4,
385
f5: F5,
386
f6: F6,
387
f7: F7,
388
) -> (
389
SelectResult<F1>,
390
SelectResult<F2>,
391
SelectResult<F3>,
392
SelectResult<F4>,
393
SelectResult<F5>,
394
SelectResult<F6>,
395
SelectResult<F7>,
396
) {
397
select::Select7::new(f1, f2, f3, f4, f5, f6, f7).await
398
}
399
400
pub async fn select8<
401
F1: Future + Unpin,
402
F2: Future + Unpin,
403
F3: Future + Unpin,
404
F4: Future + Unpin,
405
F5: Future + Unpin,
406
F6: Future + Unpin,
407
F7: Future + Unpin,
408
F8: Future + Unpin,
409
>(
410
f1: F1,
411
f2: F2,
412
f3: F3,
413
f4: F4,
414
f5: F5,
415
f6: F6,
416
f7: F7,
417
f8: F8,
418
) -> (
419
SelectResult<F1>,
420
SelectResult<F2>,
421
SelectResult<F3>,
422
SelectResult<F4>,
423
SelectResult<F5>,
424
SelectResult<F6>,
425
SelectResult<F7>,
426
SelectResult<F8>,
427
) {
428
select::Select8::new(f1, f2, f3, f4, f5, f6, f7, f8).await
429
}
430
431
pub async fn select9<
432
F1: Future + Unpin,
433
F2: Future + Unpin,
434
F3: Future + Unpin,
435
F4: Future + Unpin,
436
F5: Future + Unpin,
437
F6: Future + Unpin,
438
F7: Future + Unpin,
439
F8: Future + Unpin,
440
F9: Future + Unpin,
441
>(
442
f1: F1,
443
f2: F2,
444
f3: F3,
445
f4: F4,
446
f5: F5,
447
f6: F6,
448
f7: F7,
449
f8: F8,
450
f9: F9,
451
) -> (
452
SelectResult<F1>,
453
SelectResult<F2>,
454
SelectResult<F3>,
455
SelectResult<F4>,
456
SelectResult<F5>,
457
SelectResult<F6>,
458
SelectResult<F7>,
459
SelectResult<F8>,
460
SelectResult<F9>,
461
) {
462
select::Select9::new(f1, f2, f3, f4, f5, f6, f7, f8, f9).await
463
}
464
465
pub async fn select10<
466
F1: Future + Unpin,
467
F2: Future + Unpin,
468
F3: Future + Unpin,
469
F4: Future + Unpin,
470
F5: Future + Unpin,
471
F6: Future + Unpin,
472
F7: Future + Unpin,
473
F8: Future + Unpin,
474
F9: Future + Unpin,
475
F10: Future + Unpin,
476
>(
477
f1: F1,
478
f2: F2,
479
f3: F3,
480
f4: F4,
481
f5: F5,
482
f6: F6,
483
f7: F7,
484
f8: F8,
485
f9: F9,
486
f10: F10,
487
) -> (
488
SelectResult<F1>,
489
SelectResult<F2>,
490
SelectResult<F3>,
491
SelectResult<F4>,
492
SelectResult<F5>,
493
SelectResult<F6>,
494
SelectResult<F7>,
495
SelectResult<F8>,
496
SelectResult<F9>,
497
SelectResult<F10>,
498
) {
499
select::Select10::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10).await
500
}
501
502
pub async fn select11<
503
F1: Future + Unpin,
504
F2: Future + Unpin,
505
F3: Future + Unpin,
506
F4: Future + Unpin,
507
F5: Future + Unpin,
508
F6: Future + Unpin,
509
F7: Future + Unpin,
510
F8: Future + Unpin,
511
F9: Future + Unpin,
512
F10: Future + Unpin,
513
F11: Future + Unpin,
514
>(
515
f1: F1,
516
f2: F2,
517
f3: F3,
518
f4: F4,
519
f5: F5,
520
f6: F6,
521
f7: F7,
522
f8: F8,
523
f9: F9,
524
f10: F10,
525
f11: F11,
526
) -> (
527
SelectResult<F1>,
528
SelectResult<F2>,
529
SelectResult<F3>,
530
SelectResult<F4>,
531
SelectResult<F5>,
532
SelectResult<F6>,
533
SelectResult<F7>,
534
SelectResult<F8>,
535
SelectResult<F9>,
536
SelectResult<F10>,
537
SelectResult<F11>,
538
) {
539
select::Select11::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11).await
540
}
541
542
pub async fn select12<
543
F1: Future + Unpin,
544
F2: Future + Unpin,
545
F3: Future + Unpin,
546
F4: Future + Unpin,
547
F5: Future + Unpin,
548
F6: Future + Unpin,
549
F7: Future + Unpin,
550
F8: Future + Unpin,
551
F9: Future + Unpin,
552
F10: Future + Unpin,
553
F11: Future + Unpin,
554
F12: Future + Unpin,
555
>(
556
f1: F1,
557
f2: F2,
558
f3: F3,
559
f4: F4,
560
f5: F5,
561
f6: F6,
562
f7: F7,
563
f8: F8,
564
f9: F9,
565
f10: F10,
566
f11: F11,
567
f12: F12,
568
) -> (
569
SelectResult<F1>,
570
SelectResult<F2>,
571
SelectResult<F3>,
572
SelectResult<F4>,
573
SelectResult<F5>,
574
SelectResult<F6>,
575
SelectResult<F7>,
576
SelectResult<F8>,
577
SelectResult<F9>,
578
SelectResult<F10>,
579
SelectResult<F11>,
580
SelectResult<F12>,
581
) {
582
select::Select12::new(f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12).await
583
}
584
585
// Combination helpers to run until all futures are complete.
586
587
/// Creates a combinator that runs the two given futures to completion, returning a tuple of the
588
/// outputs each yields.
589
///
590
/// # Example
591
///
592
/// ```
593
/// use cros_async::{complete2, block_on};
594
///
595
/// let first = async {5};
596
/// let second = async {6};
597
/// assert_eq!(block_on(complete2(first, second)), (5,6));
598
/// ```
599
pub async fn complete2<F1, F2>(f1: F1, f2: F2) -> (F1::Output, F2::Output)
600
where
601
F1: Future,
602
F2: Future,
603
{
604
complete::Complete2::new(f1, f2).await
605
}
606
607
/// Creates a combinator that runs the three given futures to completion, returning a tuple of the
608
/// outputs each yields.
609
///
610
/// # Example
611
///
612
/// ```
613
/// use cros_async::{complete3, block_on};
614
///
615
/// let first = async {5};
616
/// let second = async {6};
617
/// let third = async {7};
618
/// assert_eq!(block_on(complete3(first, second, third)), (5,6,7));
619
/// ```
620
pub async fn complete3<F1, F2, F3>(f1: F1, f2: F2, f3: F3) -> (F1::Output, F2::Output, F3::Output)
621
where
622
F1: Future,
623
F2: Future,
624
F3: Future,
625
{
626
complete::Complete3::new(f1, f2, f3).await
627
}
628
629
/// Creates a combinator that runs the four given futures to completion, returning a tuple of the
630
/// outputs each yields.
631
///
632
/// # Example
633
///
634
/// ```
635
/// use cros_async::{complete4, block_on};
636
///
637
/// let first = async {5};
638
/// let second = async {6};
639
/// let third = async {7};
640
/// let fourth = async {8};
641
/// assert_eq!(block_on(complete4(first, second, third, fourth)), (5,6,7,8));
642
/// ```
643
pub async fn complete4<F1, F2, F3, F4>(
644
f1: F1,
645
f2: F2,
646
f3: F3,
647
f4: F4,
648
) -> (F1::Output, F2::Output, F3::Output, F4::Output)
649
where
650
F1: Future,
651
F2: Future,
652
F3: Future,
653
F4: Future,
654
{
655
complete::Complete4::new(f1, f2, f3, f4).await
656
}
657
658
/// Creates a combinator that runs the five given futures to completion, returning a tuple of the
659
/// outputs each yields.
660
///
661
/// # Example
662
///
663
/// ```
664
/// use cros_async::{complete5, block_on};
665
///
666
/// let first = async {5};
667
/// let second = async {6};
668
/// let third = async {7};
669
/// let fourth = async {8};
670
/// let fifth = async {9};
671
/// assert_eq!(block_on(complete5(first, second, third, fourth, fifth)),
672
/// (5,6,7,8,9));
673
/// ```
674
pub async fn complete5<F1, F2, F3, F4, F5>(
675
f1: F1,
676
f2: F2,
677
f3: F3,
678
f4: F4,
679
f5: F5,
680
) -> (F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)
681
where
682
F1: Future,
683
F2: Future,
684
F3: Future,
685
F4: Future,
686
F5: Future,
687
{
688
complete::Complete5::new(f1, f2, f3, f4, f5).await
689
}
690
691