Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/common_executor.rs
5394 views
1
// Copyright 2023 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::future::Future;
6
use std::io::Result;
7
use std::pin::Pin;
8
use std::sync::atomic::AtomicI32;
9
use std::sync::atomic::Ordering;
10
use std::sync::Arc;
11
use std::sync::Weak;
12
use std::task::Context;
13
use std::task::Poll;
14
15
use async_task::Task;
16
use base::warn;
17
use base::AsRawDescriptor;
18
use base::AsRawDescriptors;
19
use base::RawDescriptor;
20
use futures::task::noop_waker;
21
use pin_utils::pin_mut;
22
use sync::Mutex;
23
24
use crate::queue::RunnableQueue;
25
use crate::waker::WeakWake;
26
use crate::AsyncError;
27
use crate::AsyncResult;
28
use crate::BlockingPool;
29
use crate::DetachedTasks;
30
use crate::ExecutorTrait;
31
use crate::IntoAsync;
32
use crate::IoSource;
33
use crate::TaskHandle;
34
35
/// Abstraction for IO backends.
36
pub trait Reactor: Send + Sync + Sized {
37
fn new() -> Result<Self>;
38
39
/// Called when the executor is being dropped to allow orderly shutdown (e.g. cancelling IO
40
/// work). The returned future will be run to completion.
41
///
42
/// Note that, since this is called from `RawExecutor::drop`, there will not be any
43
/// `Arc<Executor>` left, so weak references to the executor will always fail to upgrade at
44
/// this point. Reactors can potentially make use of this fact to keep more IO work from being
45
/// submitted.
46
fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
47
48
/// Called when an executor run loop starts on a thread.
49
fn on_thread_start(&self) {}
50
51
/// Block until an event occurs (e.g. IO work is ready) or until `wake` is called.
52
///
53
/// As an optimization, `set_processing` should be called immediately after wake up (i.e.
54
/// before any book keeping is done) so that concurrent calls to wakers can safely skip making
55
/// redundant calls to `Reactor::wake`.
56
fn wait_for_work(&self, set_processing: impl Fn()) -> Result<()>;
57
58
/// Wake up any pending `wait_for_work` calls. If there are none pending, then wake up the next
59
/// `wait_for_work` call (necessary to avoid race conditions).
60
fn wake(&self);
61
62
/// Create an `IoSource` for the backend.
63
fn new_source<F: AsRawDescriptor>(
64
&self,
65
ex: &Arc<RawExecutor<Self>>,
66
f: F,
67
) -> AsyncResult<IoSource<F>>;
68
69
fn wrap_task_handle<R>(task: RawTaskHandle<Self, R>) -> TaskHandle<R>;
70
}
71
72
// Executor state: the run loop is inside, or is about to enter, `Reactor::wait_for_work`. A waker
// that observes this value calls `Reactor::wake`.
const WAITING: i32 = 0x1d5b_c019_u32 as i32;

// Executor state: the run loop is processing futures.
const PROCESSING: i32 = 0xd474_77bc_u32 as i32;

// Executor state: one or more futures may be ready to make progress, which makes the main loop
// return directly to PROCESSING instead of going to WAITING.
const WOKEN: i32 = 0x3e4d_3276_u32 as i32;
82
83
pub struct RawExecutor<Re: Reactor + 'static> {
84
pub reactor: Re,
85
queue: RunnableQueue,
86
blocking_pool: BlockingPool,
87
state: AtomicI32,
88
detached_tasks: Mutex<DetachedTasks>,
89
}
90
91
impl<Re: Reactor> RawExecutor<Re> {
92
pub fn new_with(reactor: Re) -> AsyncResult<Arc<Self>> {
93
Ok(Arc::new(RawExecutor {
94
reactor,
95
queue: RunnableQueue::new(),
96
blocking_pool: Default::default(),
97
state: AtomicI32::new(PROCESSING),
98
detached_tasks: Mutex::new(DetachedTasks::new()),
99
}))
100
}
101
102
pub fn new() -> AsyncResult<Arc<Self>> {
103
Self::new_with(Re::new().map_err(AsyncError::Io)?)
104
}
105
106
fn wake(&self) {
107
let oldstate = self.state.swap(WOKEN, Ordering::AcqRel);
108
if oldstate == WAITING {
109
self.reactor.wake();
110
}
111
}
112
113
fn run_internal<F: Future>(&self, cx: &mut Context, done: F) -> AsyncResult<F::Output> {
114
self.reactor.on_thread_start();
115
116
pin_mut!(done);
117
118
loop {
119
self.state.store(PROCESSING, Ordering::Release);
120
for runnable in self.queue.iter() {
121
runnable.run();
122
}
123
124
if let Ok(mut tasks) = self.detached_tasks.try_lock() {
125
tasks.poll(cx);
126
}
127
128
if let Poll::Ready(val) = done.as_mut().poll(cx) {
129
return Ok(val);
130
}
131
132
let oldstate = self.state.compare_exchange(
133
PROCESSING,
134
WAITING,
135
Ordering::AcqRel,
136
Ordering::Acquire,
137
);
138
if let Err(oldstate) = oldstate {
139
debug_assert_eq!(oldstate, WOKEN);
140
// One or more futures have become runnable.
141
continue;
142
}
143
144
self.reactor
145
.wait_for_work(|| self.state.store(PROCESSING, Ordering::Release))
146
.map_err(AsyncError::Io)?;
147
}
148
}
149
}
150
151
impl<Re: Reactor + 'static> ExecutorTrait for Arc<RawExecutor<Re>> {
152
fn async_from<'a, F: IntoAsync + 'a>(&self, f: F) -> AsyncResult<IoSource<F>> {
153
self.reactor.new_source(self, f)
154
}
155
156
fn spawn<F>(&self, f: F) -> TaskHandle<F::Output>
157
where
158
F: Future + Send + 'static,
159
F::Output: Send + 'static,
160
{
161
let raw = Arc::downgrade(self);
162
let schedule = move |runnable| {
163
if let Some(r) = raw.upgrade() {
164
r.queue.push_back(runnable);
165
r.wake();
166
}
167
};
168
let (runnable, task) = async_task::spawn(f, schedule);
169
runnable.schedule();
170
Re::wrap_task_handle(RawTaskHandle {
171
task,
172
raw: Arc::downgrade(self),
173
})
174
}
175
176
fn spawn_local<F>(&self, f: F) -> TaskHandle<F::Output>
177
where
178
F: Future + 'static,
179
F::Output: 'static,
180
{
181
let raw = Arc::downgrade(self);
182
let schedule = move |runnable| {
183
if let Some(r) = raw.upgrade() {
184
r.queue.push_back(runnable);
185
r.wake();
186
}
187
};
188
let (runnable, task) = async_task::spawn_local(f, schedule);
189
runnable.schedule();
190
Re::wrap_task_handle(RawTaskHandle {
191
task,
192
raw: Arc::downgrade(self),
193
})
194
}
195
196
fn spawn_blocking<F, R>(&self, f: F) -> TaskHandle<R>
197
where
198
F: FnOnce() -> R + Send + 'static,
199
R: Send + 'static,
200
{
201
self.spawn(self.blocking_pool.spawn(f))
202
}
203
204
fn run_until<F: Future>(&self, f: F) -> AsyncResult<F::Output> {
205
let waker = super::waker::new_waker(Arc::downgrade(self));
206
let mut ctx = Context::from_waker(&waker);
207
208
self.run_internal(&mut ctx, f)
209
}
210
}
211
212
impl<Re: Reactor + AsRawDescriptors> AsRawDescriptors for RawExecutor<Re> {
213
fn as_raw_descriptors(&self) -> Vec<RawDescriptor> {
214
self.reactor.as_raw_descriptors()
215
}
216
}
217
218
impl<Re: Reactor> WeakWake for RawExecutor<Re> {
219
fn wake_by_ref(weak_self: &Weak<Self>) {
220
if let Some(arc_self) = weak_self.upgrade() {
221
RawExecutor::wake(&arc_self);
222
}
223
}
224
}
225
226
impl<Re: Reactor> Drop for RawExecutor<Re> {
227
fn drop(&mut self) {
228
let final_future = self.reactor.on_executor_drop();
229
230
let waker = noop_waker();
231
let mut cx = Context::from_waker(&waker);
232
if let Err(e) = self.run_internal(&mut cx, final_future) {
233
warn!("Failed to drive RawExecutor to completion: {}", e);
234
}
235
}
236
}
237
238
pub struct RawTaskHandle<Re: Reactor + 'static, R> {
239
task: Task<R>,
240
raw: Weak<RawExecutor<Re>>,
241
}
242
243
impl<Re: Reactor, R: Send + 'static> RawTaskHandle<Re, R> {
    /// Gives up the handle and hands the task to the executor's detached-task set, which the
    /// executor's run loop polls.
    ///
    /// If the executor no longer exists, the task is dropped together with the handle.
    pub fn detach(self) {
        match self.raw.upgrade() {
            Some(executor) => executor.detached_tasks.lock().push(self.task),
            None => {}
        }
    }

    /// Cancels the task and resolves to whatever `async_task::Task::cancel` yields for it.
    pub async fn cancel(self) -> Option<R> {
        let outcome = self.task.cancel().await;
        outcome
    }
}
254
255
impl<Re: Reactor, R: 'static> Future for RawTaskHandle<Re, R> {
256
type Output = R;
257
258
fn poll(
259
mut self: std::pin::Pin<&mut Self>,
260
cx: &mut std::task::Context,
261
) -> std::task::Poll<Self::Output> {
262
Pin::new(&mut self.task).poll(cx)
263
}
264
}
265
266