Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/test-programs/src/bin/async_transmit_caller.rs
1693 views
1
mod bindings {
2
wit_bindgen::generate!({
3
path: "../misc/component-async-tests/wit",
4
world: "transmit-caller",
5
});
6
7
use super::Component;
8
export!(Component);
9
}
10
11
use {
12
bindings::{
13
exports::local::local::run::Guest,
14
local::local::transmit::{self, Control},
15
wit_future, wit_stream,
16
},
17
futures::{FutureExt, StreamExt, future, stream::FuturesUnordered},
18
std::{
19
future::{Future, IntoFuture},
20
pin::{Pin, pin},
21
task::Poll,
22
},
23
wit_bindgen::{FutureWriteCancel, StreamResult},
24
};
25
26
struct Component;
27
28
impl Guest for Component {
29
async fn run() {
30
let (mut control_tx, control_rx) = wit_stream::new();
31
let (mut caller_stream_tx, caller_stream_rx) = wit_stream::new();
32
let (mut caller_future_tx1, caller_future_rx1) = wit_future::new(|| todo!());
33
let (caller_future_tx2, caller_future_rx2) = wit_future::new(|| String::new());
34
35
let (mut callee_stream_rx, mut callee_future_rx1, callee_future_rx2) = transmit::exchange(
36
control_rx,
37
caller_stream_rx,
38
caller_future_rx1,
39
caller_future_rx2,
40
)
41
.await;
42
43
// Tell peer to read from its end of the stream and assert that the result matches an expected value.
44
assert!(
45
control_tx
46
.write_one(Control::ReadStream("a".into()))
47
.await
48
.is_none()
49
);
50
assert!(caller_stream_tx.write_one("a".into()).await.is_none());
51
52
// Start writing another value, but cancel the write before telling the peer to read.
53
{
54
let send = Box::pin(caller_stream_tx.write_one("b".into()));
55
assert!(poll(send).await.is_err());
56
}
57
58
// Tell the peer to read an expected value again, which should _not_ match the value provided in the
59
// canceled write above.
60
assert!(
61
control_tx
62
.write_one(Control::ReadStream("c".into()))
63
.await
64
.is_none()
65
);
66
assert!(caller_stream_tx.write_one("c".into()).await.is_none());
67
68
// Tell the peer to do a zero-length read, do a zero-length write; assert the latter completes, then do a
69
// non-zero-length write, assert that it does _not_ complete, then tell the peer to do a non-zero-length
70
// read and assert that the write completes.
71
assert!(
72
control_tx
73
.write_one(Control::ReadStreamZero)
74
.await
75
.is_none()
76
);
77
{
78
assert_eq!(
79
caller_stream_tx.write(Vec::new()).await.0,
80
StreamResult::Complete(0)
81
);
82
83
let send = Box::pin(caller_stream_tx.write_one("d".into()));
84
let Err(send) = poll(send).await else {
85
panic!()
86
};
87
88
let mut futures = FuturesUnordered::new();
89
futures.push(Box::pin(send.map(|v| {
90
assert!(v.is_none());
91
})) as Pin<Box<dyn Future<Output = _>>>);
92
futures.push(Box::pin(
93
control_tx
94
.write_one(Control::ReadStream("d".into()))
95
.map(|v| {
96
assert!(v.is_none());
97
}),
98
));
99
while let Some(()) = futures.next().await {}
100
}
101
102
// Start writing a value to the future, but cancel the write before telling the peer to read.
103
{
104
let send = Box::pin(caller_future_tx1.write("x".into()));
105
match poll(send).await {
106
Ok(_) => panic!(),
107
Err(mut send) => {
108
caller_future_tx1 = match send.as_mut().cancel() {
109
FutureWriteCancel::AlreadySent => unreachable!(),
110
FutureWriteCancel::Dropped(_) => unreachable!(),
111
FutureWriteCancel::Cancelled(_, writer) => writer,
112
}
113
}
114
}
115
}
116
117
// Tell the peer to read an expected value again, which should _not_ match the value provided in the
118
// canceled write above.
119
assert!(
120
control_tx
121
.write_one(Control::ReadFuture("y".into()))
122
.await
123
.is_none()
124
);
125
caller_future_tx1.write("y".into()).await.unwrap();
126
127
// Tell the peer to write a value to its end of the stream, then read from our end and assert the value
128
// matches.
129
assert!(
130
control_tx
131
.write_one(Control::WriteStream("a".into()))
132
.await
133
.is_none()
134
);
135
assert_eq!(callee_stream_rx.next().await, Some("a".into()));
136
137
// Start reading a value from the stream, but cancel the read before telling the peer to write.
138
{
139
let next = Box::pin(callee_stream_rx.read(Vec::with_capacity(1)));
140
assert!(poll(next).await.is_err());
141
}
142
143
// Once again, tell the peer to write a value to its end of the stream, then read from our end and assert
144
// the value matches.
145
assert!(
146
control_tx
147
.write_one(Control::WriteStream("b".into()))
148
.await
149
.is_none()
150
);
151
assert_eq!(callee_stream_rx.next().await, Some("b".into()));
152
153
// Tell the peer to do a zero-length write, assert that the read does _not_ complete, then tell the peer to
154
// do a non-zero-length write and assert that the read completes.
155
assert!(
156
control_tx
157
.write_one(Control::WriteStreamZero)
158
.await
159
.is_none()
160
);
161
{
162
let next = Box::pin(callee_stream_rx.next());
163
let Err(next) = poll(next).await else {
164
panic!()
165
};
166
167
let mut futures = FuturesUnordered::new();
168
futures.push(Box::pin(next.map(|v| {
169
assert_eq!(v, Some("c".into()));
170
})) as Pin<Box<dyn Future<Output = _>>>);
171
futures.push(Box::pin(
172
control_tx
173
.write_one(Control::WriteStream("c".into()))
174
.map(|v| {
175
assert!(v.is_none());
176
}),
177
));
178
while let Some(()) = futures.next().await {}
179
}
180
181
// Start reading a value from the future, but cancel the read before telling the peer to write.
182
{
183
let next = Box::pin(callee_future_rx1.into_future());
184
match poll(next).await {
185
Ok(_) => panic!(),
186
Err(mut next) => callee_future_rx1 = next.as_mut().cancel().unwrap_err(),
187
}
188
}
189
190
// Tell the peer to write a value to its end of the future, then read from our end and assert the value
191
// matches.
192
assert!(
193
control_tx
194
.write_one(Control::WriteFuture("b".into()))
195
.await
196
.is_none()
197
);
198
assert_eq!(callee_future_rx1.into_future().await, "b");
199
200
// Start writing a value to the stream, but drop the stream without telling the peer to read.
201
let send = Box::pin(caller_stream_tx.write_one("d".into()));
202
assert!(poll(send).await.is_err());
203
drop(caller_stream_tx);
204
205
// Start reading a value from the stream, but drop the stream without telling the peer to write.
206
let next = Box::pin(callee_stream_rx.next());
207
assert!(poll(next).await.is_err());
208
drop(callee_stream_rx);
209
210
// Start writing a value to the future, but drop the write without telling the peer to read.
211
{
212
let send = pin!(caller_future_tx2.write("x".into()));
213
assert!(poll(send).await.is_err());
214
}
215
216
// Start reading a value from the future, but drop the read without telling the peer to write.
217
{
218
let next = Box::pin(callee_future_rx2.into_future());
219
assert!(poll(next).await.is_err());
220
}
221
}
222
}
223
224
async fn poll<T, F: Future<Output = T> + Unpin>(fut: F) -> Result<T, F> {
225
let mut fut = Some(fut);
226
future::poll_fn(move |cx| {
227
let mut fut = fut.take().unwrap();
228
Poll::Ready(match fut.poll_unpin(cx) {
229
Poll::Ready(v) => Ok(v),
230
Poll::Pending => Err(fut),
231
})
232
})
233
.await
234
}
235
236
// Unused function; required since this file is built as a `bin`:
237
fn main() {}
238
239