Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/executor.rs
5392 views
1
// Copyright 2024 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::future::Future;
6
use std::pin::Pin;
7
use std::sync::Arc;
8
use std::sync::OnceLock;
9
10
#[cfg(any(target_os = "android", target_os = "linux"))]
11
use base::warn;
12
#[cfg(any(target_os = "android", target_os = "linux"))]
13
use base::AsRawDescriptors;
14
#[cfg(any(target_os = "android", target_os = "linux"))]
15
use base::RawDescriptor;
16
use serde::Deserialize;
17
use serde_keyvalue::argh::FromArgValue;
18
use serde_keyvalue::ErrorKind;
19
use serde_keyvalue::KeyValueDeserializer;
20
21
use crate::common_executor;
22
use crate::common_executor::RawExecutor;
23
#[cfg(any(target_os = "android", target_os = "linux"))]
24
use crate::sys::linux;
25
#[cfg(windows)]
26
use crate::sys::windows;
27
use crate::sys::ExecutorKindSys;
28
use crate::AsyncResult;
29
use crate::IntoAsync;
30
use crate::IoSource;
31
32
cfg_if::cfg_if! {
33
if #[cfg(feature = "tokio")] {
34
use crate::tokio_executor::TokioExecutor;
35
use crate::tokio_executor::TokioTaskHandle;
36
}
37
}
38
39
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
40
pub enum ExecutorKind {
41
SysVariants(ExecutorKindSys),
42
#[cfg(feature = "tokio")]
43
Tokio,
44
}
45
46
/// Wraps a platform-specific executor kind in [`ExecutorKind::SysVariants`].
impl From<ExecutorKindSys> for ExecutorKind {
    fn from(sys_kind: ExecutorKindSys) -> Self {
        Self::SysVariants(sys_kind)
    }
}
51
52
/// If set, [`ExecutorKind::default()`] returns the value of `DEFAULT_EXECUTOR_KIND`.
53
/// If not set, [`ExecutorKind::default()`] returns a statically-chosen default value, and
54
/// [`ExecutorKind::default()`] initializes `DEFAULT_EXECUTOR_KIND` with that value.
55
static DEFAULT_EXECUTOR_KIND: OnceLock<ExecutorKind> = OnceLock::new();
56
57
impl Default for ExecutorKind {
58
fn default() -> Self {
59
#[cfg(any(target_os = "android", target_os = "linux"))]
60
let default_fn = || ExecutorKindSys::Fd.into();
61
#[cfg(windows)]
62
let default_fn = || ExecutorKindSys::Handle.into();
63
*DEFAULT_EXECUTOR_KIND.get_or_init(default_fn)
64
}
65
}
66
67
/// The error type for [`Executor::set_default_executor_kind()`].
68
#[derive(thiserror::Error, Debug)]
69
pub enum SetDefaultExecutorKindError {
70
/// The default executor kind is set more than once.
71
#[error("The default executor kind is already set to {0:?}")]
72
SetMoreThanOnce(ExecutorKind),
73
74
#[cfg(any(target_os = "android", target_os = "linux"))]
75
/// io_uring is unavailable. The reason might be the lack of the kernel support,
76
/// but is not limited to that.
77
#[error("io_uring is unavailable: {0}")]
78
UringUnavailable(linux::uring_executor::Error),
79
}
80
81
impl FromArgValue for ExecutorKind {
82
fn from_arg_value(value: &str) -> std::result::Result<ExecutorKind, String> {
83
// `from_arg_value` returns a `String` as error, but our deserializer API defines its own
84
// error type. Perform parsing from a closure so we can easily map returned errors.
85
let builder = move || {
86
let mut des = KeyValueDeserializer::from(value);
87
88
let kind: ExecutorKind = match (des.parse_identifier()?, des.next_char()) {
89
#[cfg(any(target_os = "android", target_os = "linux"))]
90
("epoll", None) => ExecutorKindSys::Fd.into(),
91
#[cfg(any(target_os = "android", target_os = "linux"))]
92
("uring", None) => ExecutorKindSys::Uring.into(),
93
#[cfg(windows)]
94
("handle", None) => ExecutorKindSys::Handle.into(),
95
#[cfg(windows)]
96
("overlapped", None) => ExecutorKindSys::Overlapped { concurrency: None }.into(),
97
#[cfg(windows)]
98
("overlapped", Some(',')) => {
99
if des.parse_identifier()? != "concurrency" {
100
let kind = ErrorKind::SerdeError("expected `concurrency`".to_string());
101
return Err(des.error_here(kind));
102
}
103
if des.next_char() != Some('=') {
104
return Err(des.error_here(ErrorKind::ExpectedEqual));
105
}
106
let concurrency = des.parse_number()?;
107
ExecutorKindSys::Overlapped {
108
concurrency: Some(concurrency),
109
}
110
.into()
111
}
112
#[cfg(feature = "tokio")]
113
("tokio", None) => ExecutorKind::Tokio,
114
(_identifier, _next) => {
115
let kind = ErrorKind::SerdeError("unexpected kind".to_string());
116
return Err(des.error_here(kind));
117
}
118
};
119
des.finish()?;
120
Ok(kind)
121
};
122
123
builder().map_err(|e| e.to_string())
124
}
125
}
126
127
impl serde::Serialize for ExecutorKind {
    /// Serializes platform-specific kinds through their own `serialize` and the Tokio kind as
    /// the string `"tokio"`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        match self {
            Self::SysVariants(sys_kind) => sys_kind.serialize(serializer),
            #[cfg(feature = "tokio")]
            Self::Tokio => "tokio".serialize(serializer),
        }
    }
}
139
140
impl<'de> Deserialize<'de> for ExecutorKind {
141
fn deserialize<D>(deserializer: D) -> Result<ExecutorKind, D::Error>
142
where
143
D: serde::Deserializer<'de>,
144
{
145
let string = String::deserialize(deserializer)?;
146
ExecutorKind::from_arg_value(&string).map_err(serde::de::Error::custom)
147
}
148
}
149
150
/// Reference to a task managed by the executor.
151
///
152
/// Dropping a `TaskHandle` attempts to cancel the associated task. Call `detach` to allow it to
153
/// continue running the background.
154
///
155
/// `await`ing the `TaskHandle` waits for the task to finish and yields its result.
156
pub enum TaskHandle<R> {
157
#[cfg(any(target_os = "android", target_os = "linux"))]
158
Fd(common_executor::RawTaskHandle<linux::EpollReactor, R>),
159
#[cfg(any(target_os = "android", target_os = "linux"))]
160
Uring(common_executor::RawTaskHandle<linux::UringReactor, R>),
161
#[cfg(windows)]
162
Handle(common_executor::RawTaskHandle<windows::HandleReactor, R>),
163
#[cfg(feature = "tokio")]
164
Tokio(TokioTaskHandle<R>),
165
}
166
167
impl<R: Send + 'static> TaskHandle<R> {
168
pub fn detach(self) {
169
match self {
170
#[cfg(any(target_os = "android", target_os = "linux"))]
171
TaskHandle::Fd(f) => f.detach(),
172
#[cfg(any(target_os = "android", target_os = "linux"))]
173
TaskHandle::Uring(u) => u.detach(),
174
#[cfg(windows)]
175
TaskHandle::Handle(h) => h.detach(),
176
#[cfg(feature = "tokio")]
177
TaskHandle::Tokio(t) => t.detach(),
178
}
179
}
180
181
// Cancel the task and wait for it to stop. Returns the result of the task if it was already
182
// finished.
183
pub async fn cancel(self) -> Option<R> {
184
match self {
185
#[cfg(any(target_os = "android", target_os = "linux"))]
186
TaskHandle::Fd(f) => f.cancel().await,
187
#[cfg(any(target_os = "android", target_os = "linux"))]
188
TaskHandle::Uring(u) => u.cancel().await,
189
#[cfg(windows)]
190
TaskHandle::Handle(h) => h.cancel().await,
191
#[cfg(feature = "tokio")]
192
TaskHandle::Tokio(t) => t.cancel().await,
193
}
194
}
195
}
196
197
impl<R: 'static> Future for TaskHandle<R> {
198
type Output = R;
199
200
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> std::task::Poll<Self::Output> {
201
match self.get_mut() {
202
#[cfg(any(target_os = "android", target_os = "linux"))]
203
TaskHandle::Fd(f) => Pin::new(f).poll(cx),
204
#[cfg(any(target_os = "android", target_os = "linux"))]
205
TaskHandle::Uring(u) => Pin::new(u).poll(cx),
206
#[cfg(windows)]
207
TaskHandle::Handle(h) => Pin::new(h).poll(cx),
208
#[cfg(feature = "tokio")]
209
TaskHandle::Tokio(t) => Pin::new(t).poll(cx),
210
}
211
}
212
}
213
214
pub(crate) trait ExecutorTrait {
215
fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>>;
216
217
fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
218
where
219
F: Future + Send + 'static,
220
F::Output: Send + 'static;
221
222
fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
223
where
224
F: FnOnce() -> R + Send + 'static,
225
R: Send + 'static;
226
227
fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
228
where
229
F: Future + 'static,
230
F::Output: 'static;
231
232
fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output>;
233
}
234
235
/// An executor for scheduling tasks that poll futures to completion.
236
///
237
/// All asynchronous operations must run within an executor, which is capable of spawning futures as
238
/// tasks. This executor also provides a mechanism for performing asynchronous I/O operations.
239
///
240
/// The returned type is a cheap, clonable handle to the underlying executor. Cloning it will only
241
/// create a new reference, not a new executor.
242
///
243
/// Note that language limitations (trait objects can have <=1 non auto trait) require this to be
244
/// represented on the POSIX side as an enum, rather than a trait. This leads to some code &
245
/// interface duplication, but as far as we understand that is unavoidable.
246
///
247
/// See <https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2571401/2..6/cros_async/src/executor.rs#b75>
248
/// for further details.
249
///
250
/// # Examples
251
///
252
/// Concurrently wait for multiple files to become readable/writable and then read/write the data.
253
///
254
/// ```
255
/// use std::cmp::min;
256
/// use std::error::Error;
257
/// use std::fs::{File, OpenOptions};
258
///
259
/// use cros_async::{AsyncResult, Executor, IoSource, complete3};
260
/// const CHUNK_SIZE: usize = 32;
261
///
262
/// // Write all bytes from `data` to `f`.
263
/// async fn write_file(f: &IoSource<File>, mut data: Vec<u8>) -> AsyncResult<()> {
264
/// while data.len() > 0 {
265
/// let (count, mut buf) = f.write_from_vec(None, data).await?;
266
///
267
/// data = buf.split_off(count);
268
/// }
269
///
270
/// Ok(())
271
/// }
272
///
273
/// // Transfer `len` bytes of data from `from` to `to`.
274
/// async fn transfer_data(
275
/// from: IoSource<File>,
276
/// to: IoSource<File>,
277
/// len: usize,
278
/// ) -> AsyncResult<usize> {
279
/// let mut rem = len;
280
///
281
/// while rem > 0 {
282
/// let buf = vec![0u8; min(rem, CHUNK_SIZE)];
283
/// let (count, mut data) = from.read_to_vec(None, buf).await?;
284
///
285
/// if count == 0 {
286
/// // End of file. Return the number of bytes transferred.
287
/// return Ok(len - rem);
288
/// }
289
///
290
/// data.truncate(count);
291
/// write_file(&to, data).await?;
292
///
293
/// rem = rem.saturating_sub(count);
294
/// }
295
///
296
/// Ok(len)
297
/// }
298
///
299
/// #[cfg(any(target_os = "android", target_os = "linux"))]
300
/// # fn do_it() -> Result<(), Box<dyn Error>> {
301
/// let ex = Executor::new()?;
302
///
303
/// let (rx, tx) = base::linux::pipe()?;
304
/// let zero = File::open("/dev/zero")?;
305
/// let zero_bytes = CHUNK_SIZE * 7;
306
/// let zero_to_pipe = transfer_data(
307
/// ex.async_from(zero)?,
308
/// ex.async_from(tx.try_clone()?)?,
309
/// zero_bytes,
310
/// );
311
///
312
/// let rand = File::open("/dev/urandom")?;
313
/// let rand_bytes = CHUNK_SIZE * 19;
314
/// let rand_to_pipe = transfer_data(ex.async_from(rand)?, ex.async_from(tx)?, rand_bytes);
315
///
316
/// let null = OpenOptions::new().write(true).open("/dev/null")?;
317
/// let null_bytes = zero_bytes + rand_bytes;
318
/// let pipe_to_null = transfer_data(ex.async_from(rx)?, ex.async_from(null)?, null_bytes);
319
///
320
/// ex.run_until(complete3(
321
/// async { assert_eq!(pipe_to_null.await.unwrap(), null_bytes) },
322
/// async { assert_eq!(zero_to_pipe.await.unwrap(), zero_bytes) },
323
/// async { assert_eq!(rand_to_pipe.await.unwrap(), rand_bytes) },
324
/// ))?;
325
///
326
/// # Ok(())
327
/// # }
328
/// #[cfg(any(target_os = "android", target_os = "linux"))]
329
/// # do_it().unwrap();
330
/// ```
331
#[derive(Clone)]
332
pub enum Executor {
333
#[cfg(any(target_os = "android", target_os = "linux"))]
334
Fd(Arc<RawExecutor<linux::EpollReactor>>),
335
#[cfg(any(target_os = "android", target_os = "linux"))]
336
Uring(Arc<RawExecutor<linux::UringReactor>>),
337
#[cfg(windows)]
338
Handle(Arc<RawExecutor<windows::HandleReactor>>),
339
#[cfg(windows)]
340
Overlapped(Arc<RawExecutor<windows::HandleReactor>>),
341
#[cfg(feature = "tokio")]
342
Tokio(TokioExecutor),
343
}
344
345
impl Executor {
346
/// Create a new `Executor`.
347
pub fn new() -> AsyncResult<Self> {
348
Executor::with_executor_kind(ExecutorKind::default())
349
}
350
351
/// Create a new `Executor` of the given `ExecutorKind`.
352
pub fn with_executor_kind(kind: ExecutorKind) -> AsyncResult<Self> {
353
Ok(match kind {
354
#[cfg(any(target_os = "android", target_os = "linux"))]
355
ExecutorKind::SysVariants(ExecutorKindSys::Fd) => Executor::Fd(RawExecutor::new()?),
356
#[cfg(any(target_os = "android", target_os = "linux"))]
357
ExecutorKind::SysVariants(ExecutorKindSys::Uring) => {
358
Executor::Uring(RawExecutor::new()?)
359
}
360
#[cfg(windows)]
361
ExecutorKind::SysVariants(ExecutorKindSys::Handle) => {
362
Executor::Handle(RawExecutor::new()?)
363
}
364
#[cfg(windows)]
365
ExecutorKind::SysVariants(ExecutorKindSys::Overlapped { concurrency }) => {
366
let reactor = match concurrency {
367
Some(concurrency) => windows::HandleReactor::new_with(concurrency)?,
368
None => windows::HandleReactor::new()?,
369
};
370
Executor::Overlapped(RawExecutor::new_with(reactor)?)
371
}
372
#[cfg(feature = "tokio")]
373
ExecutorKind::Tokio => Executor::Tokio(TokioExecutor::new()?),
374
})
375
}
376
377
/// Set the default ExecutorKind for [`Self::new()`]. This call is effective only once.
378
pub fn set_default_executor_kind(
379
executor_kind: ExecutorKind,
380
) -> Result<(), SetDefaultExecutorKindError> {
381
#[cfg(any(target_os = "android", target_os = "linux"))]
382
if executor_kind == ExecutorKind::SysVariants(ExecutorKindSys::Uring) {
383
linux::uring_executor::check_uring_availability()
384
.map_err(SetDefaultExecutorKindError::UringUnavailable)?;
385
if !crate::is_uring_stable() {
386
warn!(
387
"Enabling io_uring executor on the kernel version where io_uring is unstable"
388
);
389
}
390
}
391
DEFAULT_EXECUTOR_KIND.set(executor_kind).map_err(|_|
392
// `expect` succeeds since this closure runs only when DEFAULT_EXECUTOR_KIND is set.
393
SetDefaultExecutorKindError::SetMoreThanOnce(
394
*DEFAULT_EXECUTOR_KIND
395
.get()
396
.expect("Failed to get DEFAULT_EXECUTOR_KIND"),
397
))
398
}
399
400
/// Create a new `IoSource<F>` associated with `self`. Callers may then use the returned
401
/// `IoSource` to directly start async operations without needing a separate reference to the
402
/// executor.
403
pub fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
404
match self {
405
#[cfg(any(target_os = "android", target_os = "linux"))]
406
Executor::Fd(ex) => ex.async_from(f),
407
#[cfg(any(target_os = "android", target_os = "linux"))]
408
Executor::Uring(ex) => ex.async_from(f),
409
#[cfg(windows)]
410
Executor::Handle(ex) => ex.async_from(f),
411
#[cfg(windows)]
412
Executor::Overlapped(ex) => ex.async_from(f),
413
#[cfg(feature = "tokio")]
414
Executor::Tokio(ex) => ex.async_from(f),
415
}
416
}
417
418
/// Create a new overlapped `IoSource<F>` associated with `self`. Callers may then use the
419
/// If the executor is not overlapped, then Handle source is returned.
420
/// returned `IoSource` to directly start async operations without needing a separate reference
421
/// to the executor.
422
#[cfg(windows)]
423
pub fn async_overlapped_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
424
match self {
425
Executor::Overlapped(ex) => Ok(IoSource::Overlapped(windows::OverlappedSource::new(
426
f, ex, false,
427
)?)),
428
_ => self.async_from(f),
429
}
430
}
431
432
/// Spawn a new future for this executor to run to completion. Callers may use the returned
433
/// `TaskHandle` to await on the result of `f`. Dropping the returned `TaskHandle` will cancel
434
/// `f`, preventing it from being polled again. To drop a `TaskHandle` without canceling the
435
/// future associated with it use `TaskHandle::detach`.
436
///
437
/// # Examples
438
///
439
/// ```
440
/// # use cros_async::AsyncResult;
441
/// # fn example_spawn() -> AsyncResult<()> {
442
/// # use std::thread;
443
///
444
/// # use cros_async::Executor;
445
/// use futures::executor::block_on;
446
///
447
/// # let ex = Executor::new()?;
448
///
449
/// # // Spawn a thread that runs the executor.
450
/// # let ex2 = ex.clone();
451
/// # thread::spawn(move || ex2.run());
452
///
453
/// let task = ex.spawn(async { 7 + 13 });
454
///
455
/// let result = block_on(task);
456
/// assert_eq!(result, 20);
457
/// # Ok(())
458
/// # }
459
///
460
/// # example_spawn().unwrap();
461
/// ```
462
pub fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
463
where
464
F: Future + Send + 'static,
465
F::Output: Send + 'static,
466
{
467
match self {
468
#[cfg(any(target_os = "android", target_os = "linux"))]
469
Executor::Fd(ex) => ex.spawn(f),
470
#[cfg(any(target_os = "android", target_os = "linux"))]
471
Executor::Uring(ex) => ex.spawn(f),
472
#[cfg(windows)]
473
Executor::Handle(ex) => ex.spawn(f),
474
#[cfg(windows)]
475
Executor::Overlapped(ex) => ex.spawn(f),
476
#[cfg(feature = "tokio")]
477
Executor::Tokio(ex) => ex.spawn(f),
478
}
479
}
480
481
/// Spawn a thread-local task for this executor to drive to completion. Like `spawn` but without
482
/// requiring `Send` on `F` or `F::Output`. This method should only be called from the same
483
/// thread where `run()` or `run_until()` is called.
484
///
485
/// # Panics
486
///
487
/// `Executor::run` and `Executor::run_util` will panic if they try to poll a future that was
488
/// added by calling `spawn_local` from a different thread.
489
///
490
/// # Examples
491
///
492
/// ```
493
/// # use cros_async::AsyncResult;
494
/// # fn example_spawn_local() -> AsyncResult<()> {
495
/// # use cros_async::Executor;
496
///
497
/// # let ex = Executor::new()?;
498
///
499
/// let task = ex.spawn_local(async { 7 + 13 });
500
///
501
/// let result = ex.run_until(task)?;
502
/// assert_eq!(result, 20);
503
/// Ok(())
504
/// # }
505
///
506
/// # example_spawn_local().unwrap();
507
/// ```
508
pub fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
509
where
510
F: Future + 'static,
511
F::Output: 'static,
512
{
513
match self {
514
#[cfg(any(target_os = "android", target_os = "linux"))]
515
Executor::Fd(ex) => ex.spawn_local(f),
516
#[cfg(any(target_os = "android", target_os = "linux"))]
517
Executor::Uring(ex) => ex.spawn_local(f),
518
#[cfg(windows)]
519
Executor::Handle(ex) => ex.spawn_local(f),
520
#[cfg(windows)]
521
Executor::Overlapped(ex) => ex.spawn_local(f),
522
#[cfg(feature = "tokio")]
523
Executor::Tokio(ex) => ex.spawn_local(f),
524
}
525
}
526
527
/// Run the provided closure on a dedicated thread where blocking is allowed.
528
///
529
/// Callers may `await` on the returned `TaskHandle` to wait for the result of `f`. Dropping
530
/// the returned `TaskHandle` may not cancel the operation if it was already started on a
531
/// worker thread.
532
///
533
/// # Panics
534
///
535
/// `await`ing the `TaskHandle` after the `Executor` is dropped will panic if the work was not
536
/// already completed.
537
///
538
/// # Examples
539
///
540
/// ```edition2018
541
/// # use cros_async::Executor;
542
///
543
/// # async fn do_it(ex: &Executor) {
544
/// let res = ex.spawn_blocking(move || {
545
/// // Do some CPU-intensive or blocking work here.
546
///
547
/// 42
548
/// }).await;
549
///
550
/// assert_eq!(res, 42);
551
/// # }
552
///
553
/// # let ex = Executor::new().unwrap();
554
/// # ex.run_until(do_it(&ex)).unwrap();
555
/// ```
556
pub fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
557
where
558
F: FnOnce() -> R + Send + 'static,
559
R: Send + 'static,
560
{
561
match self {
562
#[cfg(any(target_os = "android", target_os = "linux"))]
563
Executor::Fd(ex) => ex.spawn_blocking(f),
564
#[cfg(any(target_os = "android", target_os = "linux"))]
565
Executor::Uring(ex) => ex.spawn_blocking(f),
566
#[cfg(windows)]
567
Executor::Handle(ex) => ex.spawn_blocking(f),
568
#[cfg(windows)]
569
Executor::Overlapped(ex) => ex.spawn_blocking(f),
570
#[cfg(feature = "tokio")]
571
Executor::Tokio(ex) => ex.spawn_blocking(f),
572
}
573
}
574
575
/// Run the executor indefinitely, driving all spawned futures to completion. This method will
576
/// block the current thread and only return in the case of an error.
577
///
578
/// # Panics
579
///
580
/// Once this method has been called on a thread, it may only be called on that thread from that
581
/// point on. Attempting to call it from another thread will panic.
582
///
583
/// # Examples
584
///
585
/// ```
586
/// # use cros_async::AsyncResult;
587
/// # fn example_run() -> AsyncResult<()> {
588
/// use std::thread;
589
///
590
/// use cros_async::Executor;
591
/// use futures::executor::block_on;
592
///
593
/// let ex = Executor::new()?;
594
///
595
/// // Spawn a thread that runs the executor.
596
/// let ex2 = ex.clone();
597
/// thread::spawn(move || ex2.run());
598
///
599
/// let task = ex.spawn(async { 7 + 13 });
600
///
601
/// let result = block_on(task);
602
/// assert_eq!(result, 20);
603
/// # Ok(())
604
/// # }
605
///
606
/// # example_run().unwrap();
607
/// ```
608
pub fn run(&self) -> AsyncResult<()> {
609
self.run_until(std::future::pending())
610
}
611
612
/// Drive all futures spawned in this executor until `f` completes. This method will block the
613
/// current thread only until `f` is complete and there may still be unfinished futures in the
614
/// executor.
615
///
616
/// # Panics
617
///
618
/// Once this method has been called on a thread, from then onwards it may only be called on
619
/// that thread. Attempting to call it from another thread will panic.
620
///
621
/// # Examples
622
///
623
/// ```
624
/// # use cros_async::AsyncResult;
625
/// # fn example_run_until() -> AsyncResult<()> {
626
/// use cros_async::Executor;
627
///
628
/// let ex = Executor::new()?;
629
///
630
/// let task = ex.spawn_local(async { 7 + 13 });
631
///
632
/// let result = ex.run_until(task)?;
633
/// assert_eq!(result, 20);
634
/// # Ok(())
635
/// # }
636
///
637
/// # example_run_until().unwrap();
638
/// ```
639
pub fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
640
match self {
641
#[cfg(any(target_os = "android", target_os = "linux"))]
642
Executor::Fd(ex) => ex.run_until(f),
643
#[cfg(any(target_os = "android", target_os = "linux"))]
644
Executor::Uring(ex) => ex.run_until(f),
645
#[cfg(windows)]
646
Executor::Handle(ex) => ex.run_until(f),
647
#[cfg(windows)]
648
Executor::Overlapped(ex) => ex.run_until(f),
649
#[cfg(feature = "tokio")]
650
Executor::Tokio(ex) => ex.run_until(f),
651
}
652
}
653
}
654
655
#[cfg(any(target_os = "android", target_os = "linux"))]
656
impl AsRawDescriptors for Executor {
657
fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
658
match self {
659
Executor::Fd(ex) => ex.as_raw_descriptors(),
660
Executor::Uring(ex) => ex.as_raw_descriptors(),
661
#[cfg(feature = "tokio")]
662
Executor::Tokio(ex) => ex.as_raw_descriptors(),
663
}
664
}
665
}
666
667