Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi-common/src/tokio/sched/unix.rs
2465 views
1
use crate::{
2
Error,
3
sched::{
4
Poll,
5
subscription::{RwEventFlags, Subscription},
6
},
7
};
8
use std::future::Future;
9
use std::pin::Pin;
10
use std::task::{Context, Poll as FPoll};
11
12
/// A set of boxed, pinned futures that all produce a `T`.
///
/// Futures are stored in insertion order in the wrapped `Vec`.
struct FirstReady<'a, T>(Vec<Pin<Box<dyn Future<Output = T> + Send + 'a>>>);

impl<'a, T> FirstReady<'a, T> {
    /// Creates a set containing no futures.
    fn new() -> Self {
        Self(Vec::new())
    }

    /// Boxes and pins `f`, then appends it after the futures already stored.
    fn push(&mut self, f: impl Future<Output = T> + Send + 'a) {
        let boxed: Pin<Box<dyn Future<Output = T> + Send + 'a>> = Box::pin(f);
        self.0.push(boxed);
    }
}
22
23
impl<'a, T> Future for FirstReady<'a, T> {
24
type Output = T;
25
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> FPoll<T> {
26
let mut result = FPoll::Pending;
27
for f in self.as_mut().0.iter_mut() {
28
match f.as_mut().poll(cx) {
29
FPoll::Ready(r) => match result {
30
// First ready gets to set the result. But, continue the loop so all futures
31
// which are ready simultaneously (often on first poll) get to report their
32
// readiness.
33
FPoll::Pending => {
34
result = FPoll::Ready(r);
35
}
36
_ => {}
37
},
38
_ => continue,
39
}
40
}
41
return result;
42
}
43
}
44
45
pub async fn poll_oneoff<'a>(poll: &mut Poll<'a>) -> Result<(), Error> {
46
if poll.is_empty() {
47
return Ok(());
48
}
49
50
let duration = poll
51
.earliest_clock_deadline()
52
.map(|sub| sub.duration_until());
53
54
let mut futures = FirstReady::new();
55
for s in poll.rw_subscriptions() {
56
match s {
57
Subscription::Read(f) => {
58
futures.push(async move {
59
f.file
60
.readable()
61
.await
62
.map_err(|e| e.context("readable future"))?;
63
f.complete(
64
f.file
65
.num_ready_bytes()
66
.map_err(|e| e.context("read num_ready_bytes"))?,
67
RwEventFlags::empty(),
68
);
69
Ok::<(), Error>(())
70
});
71
}
72
73
Subscription::Write(f) => {
74
futures.push(async move {
75
f.file
76
.writable()
77
.await
78
.map_err(|e| e.context("writable future"))?;
79
f.complete(0, RwEventFlags::empty());
80
Ok(())
81
});
82
}
83
Subscription::MonotonicClock { .. } => unreachable!(),
84
}
85
}
86
if let Some(Some(remaining_duration)) = duration {
87
match tokio::time::timeout(remaining_duration, futures).await {
88
Ok(r) => r?,
89
Err(_deadline_elapsed) => {}
90
}
91
} else {
92
futures.await?;
93
}
94
95
Ok(())
96
}
97
98