Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
google
GitHub Repository: google/crosvm
Path: blob/main/cros_async/src/sys/windows/handle_executor.rs
5394 views
1
// Copyright 2020 The ChromiumOS Authors
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file.
4
5
use std::collections::HashMap;
6
use std::future::Future;
7
use std::io;
8
use std::mem;
9
use std::pin::Pin;
10
use std::sync::mpsc;
11
use std::sync::Arc;
12
use std::sync::Weak;
13
use std::task::Waker;
14
15
use base::named_pipes::BoxedOverlapped;
16
use base::warn;
17
use base::AsRawDescriptor;
18
use base::Error as SysError;
19
use base::RawDescriptor;
20
use futures::task::Context;
21
use futures::task::Poll;
22
use sync::Mutex;
23
use thiserror::Error as ThisError;
24
use winapi::um::handleapi::INVALID_HANDLE_VALUE;
25
use winapi::um::minwinbase::OVERLAPPED;
26
27
use crate::common_executor;
28
use crate::common_executor::RawExecutor;
29
use crate::common_executor::RawTaskHandle;
30
use crate::sys::windows::io_completion_port::CompletionPacket;
31
use crate::sys::windows::io_completion_port::IoCompletionPort;
32
use crate::waker::WakerToken;
33
use crate::waker::WeakWake;
34
use crate::AsyncError;
35
use crate::AsyncResult;
36
use crate::IoSource;
37
use crate::TaskHandle;
38
39
const DEFAULT_IO_CONCURRENCY: u32 = 1;
40
41
#[derive(Debug, ThisError)]
42
pub enum Error {
43
#[error("IO completion port operation failed: {0}")]
44
IocpOperationFailed(SysError),
45
#[error("Failed to get future from executor run.")]
46
FailedToReadFutureFromWakerChannel(mpsc::RecvError),
47
#[error("executor gone before future was dropped.")]
48
ExecutorGone,
49
#[error("tried to remove overlapped operation but it didn't exist.")]
50
RemoveNonExistentOperation,
51
}
52
53
impl From<Error> for io::Error {
54
fn from(e: Error) -> Self {
55
use Error::*;
56
match e {
57
FailedToReadFutureFromWakerChannel(e) => io::Error::other(e),
58
IocpOperationFailed(e) => io::Error::other(e),
59
ExecutorGone => io::Error::other(e),
60
RemoveNonExistentOperation => io::Error::other(e),
61
}
62
}
63
}
64
65
impl From<Error> for AsyncError {
66
fn from(e: Error) -> Self {
67
AsyncError::SysVariants(e.into())
68
}
69
}
70
71
pub type Result<T> = std::result::Result<T, Error>;
72
73
/// Represents an overlapped operation that running (or has completed but not yet woken).
74
struct OpData {
75
waker: Option<Waker>,
76
}
77
78
/// The current status of a future that is running or has completed on HandleReactor.
79
enum OpStatus {
80
Pending(OpData),
81
Completed(CompletionPacket),
82
}
83
84
pub struct HandleReactor {
85
iocp: IoCompletionPort,
86
overlapped_ops: Mutex<HashMap<WakerToken, OpStatus>>,
87
}
88
89
impl HandleReactor {
90
pub fn new_with(concurrency: u32) -> Result<Self> {
91
let iocp = IoCompletionPort::new(concurrency)?;
92
Ok(Self {
93
iocp,
94
overlapped_ops: Mutex::new(HashMap::with_capacity(64)),
95
})
96
}
97
98
pub fn new() -> Result<Self> {
99
Self::new_with(DEFAULT_IO_CONCURRENCY)
100
}
101
102
/// All descriptors must be first registered with IOCP before any completion packets can be
103
/// received for them.
104
pub(crate) fn register_descriptor(&self, rd: &dyn AsRawDescriptor) -> Result<()> {
105
self.iocp.register_descriptor(rd)
106
}
107
108
/// When an overlapped operation is created, it is registered with the executor here. This way,
109
/// when the executor's run thread picks up the completion events, it can associate them back
110
/// to the correct overlapped operation. Notice that here, no waker is registered. This is
111
/// because the await hasn't happened yet, so there is no waker. Once the await is triggered,
112
/// we'll invoke get_overlapped_op_if_ready which will register the waker.
113
pub(crate) fn register_overlapped_op(&self, token: &WakerToken) {
114
let mut ops = self.overlapped_ops.lock();
115
ops.insert(*token, OpStatus::Pending(OpData { waker: None }));
116
}
117
118
/// Called to register an overlapped IO source with the executor. From here, the source can
119
/// register overlapped operations that will be managed by the executor.
120
#[allow(dead_code)]
121
pub(crate) fn register_overlapped_source(
122
&self,
123
raw: &Arc<RawExecutor<HandleReactor>>,
124
rd: &dyn AsRawDescriptor,
125
) -> Result<RegisteredOverlappedSource> {
126
RegisteredOverlappedSource::new(rd, raw)
127
}
128
129
/// Every time an `OverlappedOperation` is polled, this method will be called. It's a trick to
130
/// register the waker so that completion events can trigger it from the executor's main thread.
131
fn get_overlapped_op_if_ready(
132
&self,
133
token: &WakerToken,
134
cx: &mut Context,
135
) -> Option<CompletionPacket> {
136
let mut ops = self.overlapped_ops.lock();
137
138
if let OpStatus::Pending(data) = ops
139
.get_mut(token)
140
.expect("`get_overlapped_op_if_ready` called on unknown operation")
141
{
142
data.waker = Some(cx.waker().clone());
143
return None;
144
}
145
if let OpStatus::Completed(pkt) = ops.remove(token).unwrap() {
146
return Some(pkt);
147
}
148
unreachable!("OpStatus didn't match any known variant.");
149
}
150
151
/// When an `OverlappedOperation` is dropped, this is called to so we don't leak registered
152
/// operations. It's possible the operation was already removed (e.g. via polling), in which
153
/// case this has no effect.
154
fn remove_overlapped_op(&self, token: &WakerToken) {
155
let mut ops = self.overlapped_ops.lock();
156
if ops.remove(token).is_none() {
157
warn!("Tried to remove non-existent overlapped operation from HandleReactor.");
158
}
159
}
160
}
161
162
impl common_executor::Reactor for HandleReactor {
163
fn new() -> std::io::Result<Self> {
164
Ok(HandleReactor::new()?)
165
}
166
167
fn wake(&self) {
168
self.iocp.wake().expect("wakeup failed on HandleReactor.");
169
}
170
171
fn on_executor_drop<'a>(&'a self) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
172
// TODO: Cancel overlapped ops and/or wait for everything to complete like the linux
173
// reactors?
174
Box::pin(async {})
175
}
176
177
fn wait_for_work(&self, set_processing: impl Fn()) -> std::io::Result<()> {
178
let completion_packets = self.iocp.poll()?;
179
180
set_processing();
181
182
for pkt in completion_packets {
183
if pkt.completion_key as RawDescriptor == INVALID_HANDLE_VALUE {
184
// These completion packets aren't from overlapped operations. They're from
185
// something calling HandleReactor::wake, so they've already enqueued whatever
186
// they think is runnable into the queue. All the packet does is wake up the
187
// executor loop.
188
continue;
189
}
190
191
let mut overlapped_ops = self.overlapped_ops.lock();
192
if let Some(op) = overlapped_ops.get_mut(&WakerToken(pkt.overlapped_ptr)) {
193
let waker = match mem::replace(op, OpStatus::Completed(pkt)) {
194
OpStatus::Pending(OpData { waker }) => waker,
195
OpStatus::Completed(_) => panic!("operation completed more than once"),
196
};
197
drop(overlapped_ops);
198
if let Some(waker) = waker {
199
waker.wake();
200
} else {
201
// We shouldn't ever get a completion packet for an IO operation that hasn't
202
// registered its waker.
203
warn!(
204
"got a completion packet for an IO operation that had no waker.\
205
future may be stalled."
206
)
207
}
208
}
209
}
210
Ok(())
211
}
212
213
fn new_source<F: AsRawDescriptor>(
214
&self,
215
_ex: &Arc<RawExecutor<Self>>,
216
f: F,
217
) -> AsyncResult<IoSource<F>> {
218
Ok(IoSource::Handle(super::HandleSource::new(f)?))
219
}
220
221
fn wrap_task_handle<R>(task: RawTaskHandle<HandleReactor, R>) -> TaskHandle<R> {
222
TaskHandle::Handle(task)
223
}
224
}
225
226
/// Represents a handle that has been registered for overlapped operations with a specific executor.
227
/// From here, the OverlappedSource can register overlapped operations with the executor.
228
pub(crate) struct RegisteredOverlappedSource {
229
ex: Weak<RawExecutor<HandleReactor>>,
230
}
231
232
impl RegisteredOverlappedSource {
233
fn new(
234
rd: &dyn AsRawDescriptor,
235
ex: &Arc<RawExecutor<HandleReactor>>,
236
) -> Result<RegisteredOverlappedSource> {
237
ex.reactor.register_descriptor(rd)?;
238
Ok(Self {
239
ex: Arc::downgrade(ex),
240
})
241
}
242
243
/// Registers an overlapped IO operation with this executor. Call this function with the
244
/// overlapped struct that represents the operation **before** making the overlapped IO call.
245
///
246
/// NOTE: you MUST pass OverlappedOperation::get_overlapped_ptr() as the overlapped IO pointer
247
/// in the IO call.
248
pub fn register_overlapped_operation(
249
&self,
250
offset: Option<u64>,
251
) -> Result<OverlappedOperation> {
252
OverlappedOperation::new(offset, self.ex.clone())
253
}
254
}
255
256
impl WeakWake for HandleReactor {
257
fn wake_by_ref(weak_self: &Weak<Self>) {
258
if let Some(arc_self) = weak_self.upgrade() {
259
common_executor::Reactor::wake(&*arc_self);
260
}
261
}
262
}
263
264
/// Represents a pending overlapped IO operation. This must be used in the following manner or
265
/// undefined behavior will result:
266
/// 1. The reactor in use is a HandleReactor.
267
/// 2. Immediately after the IO syscall, this future MUST be awaited. We rely on the fact that
268
/// the executor cannot poll the IOCP before this future is polled for the first time to
269
/// ensure the waker has been registered. (If the executor polls the IOCP before the waker is
270
/// registered, the future will stall.)
271
pub(crate) struct OverlappedOperation {
272
overlapped: BoxedOverlapped,
273
ex: Weak<RawExecutor<HandleReactor>>,
274
completed: bool,
275
}
276
277
impl OverlappedOperation {
278
fn new(offset: Option<u64>, ex: Weak<RawExecutor<HandleReactor>>) -> Result<Self> {
279
let ret = Self {
280
overlapped: BoxedOverlapped(Box::new(base::create_overlapped(offset, None))),
281
ex,
282
completed: false,
283
};
284
ret.register_op()?;
285
Ok(ret)
286
}
287
288
fn register_op(&self) -> Result<()> {
289
self.ex
290
.upgrade()
291
.ok_or(Error::ExecutorGone)?
292
.reactor
293
.register_overlapped_op(&self.get_token());
294
Ok(())
295
}
296
297
/// Returns a pointer to the overlapped struct representing the operation. This MUST be used
298
/// when making the overlapped IO call or the executor will not be able to wake the right
299
/// future.
300
pub fn get_overlapped(&mut self) -> &mut OVERLAPPED {
301
&mut self.overlapped.0
302
}
303
304
#[inline]
305
pub fn get_token(&self) -> WakerToken {
306
WakerToken((&*self.overlapped.0) as *const _ as usize)
307
}
308
}
309
310
impl Future for OverlappedOperation {
311
type Output = Result<CompletionPacket>;
312
313
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
314
if self.completed {
315
panic!("OverlappedOperation polled after returning Poll::Ready");
316
}
317
if let Some(ex) = self.ex.upgrade() {
318
if let Some(completion_pkt) =
319
ex.reactor.get_overlapped_op_if_ready(&self.get_token(), cx)
320
{
321
self.completed = true;
322
Poll::Ready(Ok(completion_pkt))
323
} else {
324
Poll::Pending
325
}
326
} else {
327
Poll::Ready(Err(Error::ExecutorGone))
328
}
329
}
330
}
331
332
impl Drop for OverlappedOperation {
333
fn drop(&mut self) {
334
if !self.completed {
335
if let Some(ex) = self.ex.upgrade() {
336
ex.reactor.remove_overlapped_op(&self.get_token());
337
}
338
}
339
}
340
}
341
342
#[cfg(test)]
343
mod test {
344
use super::*;
345
const FUT_MSG: i32 = 5;
346
use std::rc::Rc;
347
348
use futures::channel::mpsc as fut_mpsc;
349
use futures::SinkExt;
350
use futures::StreamExt;
351
352
use crate::BlockingPool;
353
use crate::ExecutorTrait;
354
355
#[test]
356
fn run_future() {
357
let (send, recv) = mpsc::channel();
358
async fn this_test(send: mpsc::Sender<i32>) {
359
send.send(FUT_MSG).unwrap();
360
}
361
362
let ex = RawExecutor::<HandleReactor>::new().unwrap();
363
ex.run_until(this_test(send)).unwrap();
364
assert_eq!(recv.recv().unwrap(), FUT_MSG);
365
}
366
367
#[test]
368
fn spawn_future() {
369
let (send, recv) = fut_mpsc::channel(1);
370
let (send_done_signal, receive_done_signal) = mpsc::channel();
371
372
async fn message_sender(mut send: fut_mpsc::Sender<i32>) {
373
send.send(FUT_MSG).await.unwrap();
374
}
375
376
async fn message_receiver(mut recv: fut_mpsc::Receiver<i32>, done: mpsc::Sender<bool>) {
377
assert_eq!(recv.next().await.unwrap(), FUT_MSG);
378
done.send(true).unwrap();
379
}
380
381
let ex = RawExecutor::<HandleReactor>::new().unwrap();
382
ex.spawn(message_sender(send)).detach();
383
ex.run_until(message_receiver(recv, send_done_signal))
384
.unwrap();
385
assert_eq!(receive_done_signal.recv().unwrap(), true);
386
}
387
388
// Dropping a task that owns a BlockingPool shouldn't leak the pool.
389
#[test]
390
fn drop_detached_blocking_pool() {
391
struct Cleanup(BlockingPool);
392
393
impl Drop for Cleanup {
394
fn drop(&mut self) {
395
// Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).
396
self.0
397
.shutdown(Some(
398
std::time::Instant::now() + std::time::Duration::from_secs(1),
399
))
400
.unwrap();
401
}
402
}
403
404
let rc = Rc::new(std::cell::Cell::new(0));
405
{
406
let ex = RawExecutor::<HandleReactor>::new().unwrap();
407
let rc_clone = rc.clone();
408
ex.spawn_local(async move {
409
rc_clone.set(1);
410
let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
411
let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
412
// Spawn a blocking task.
413
let blocking_task = pool.0.spawn(move || {
414
// Rendezvous.
415
assert_eq!(recv.recv(), Ok(()));
416
// Wait for drop.
417
assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
418
});
419
// Make sure it has actually started (using a "rendezvous channel" send).
420
//
421
// Without this step, we'll have a race where we can shutdown the blocking pool
422
// before the worker thread pops off the task.
423
send.send(()).unwrap();
424
// Wait for it to finish
425
blocking_task.await;
426
rc_clone.set(2);
427
})
428
.detach();
429
ex.run_until(async {}).unwrap();
430
// `ex` is dropped here. If everything is working as expected, it should drop all of
431
// its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
432
// `Drop` impl will try to join all the worker threads, which should work because send
433
// half of the channel closed.
434
}
435
assert_eq!(rc.get(), 1);
436
Rc::try_unwrap(rc).expect("Rc had too many refs");
437
}
438
}
439
440