Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/misc/component-async-tests/tests/scenario/streams.rs
1693 views
1
use {
2
super::util::{config, make_component},
3
anyhow::Result,
4
component_async_tests::{
5
Ctx, closed_streams,
6
util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer},
7
},
8
futures::{
9
Sink, SinkExt, Stream, StreamExt,
10
channel::{mpsc, oneshot},
11
future,
12
},
13
std::{
14
pin::Pin,
15
sync::{Arc, Mutex},
16
task::{Context, Poll},
17
},
18
wasmtime::{
19
Engine, Store, StoreContextMut,
20
component::{
21
Destination, FutureReader, Linker, ResourceTable, Source, StreamConsumer,
22
StreamProducer, StreamReader, StreamResult,
23
},
24
},
25
wasmtime_wasi::WasiCtxBuilder,
26
};
27
28
pub struct DirectPipeProducer<S>(S);
29
30
impl<D, S: Stream<Item = u8> + Send + 'static> StreamProducer<D> for DirectPipeProducer<S> {
31
type Item = u8;
32
type Buffer = Option<u8>;
33
34
fn poll_produce<'a>(
35
self: Pin<&mut Self>,
36
cx: &mut Context<'_>,
37
store: StoreContextMut<D>,
38
destination: Destination<'a, Self::Item, Self::Buffer>,
39
finish: bool,
40
) -> Poll<Result<StreamResult>> {
41
// SAFETY: This is a standard pin-projection, and we never move
42
// out of `self`.
43
let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
44
45
match stream.poll_next(cx) {
46
Poll::Pending => {
47
if finish {
48
Poll::Ready(Ok(StreamResult::Cancelled))
49
} else {
50
Poll::Pending
51
}
52
}
53
Poll::Ready(Some(item)) => {
54
let mut destination = destination.as_direct(store, 1);
55
destination.remaining()[0] = item;
56
destination.mark_written(1);
57
Poll::Ready(Ok(StreamResult::Completed))
58
}
59
Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
60
}
61
}
62
}
63
64
pub struct DirectPipeConsumer<S>(S);
65
66
impl<D, S: Sink<u8, Error: std::error::Error + Send + Sync> + Send + 'static> StreamConsumer<D>
67
for DirectPipeConsumer<S>
68
{
69
type Item = u8;
70
71
fn poll_consume(
72
self: Pin<&mut Self>,
73
cx: &mut Context<'_>,
74
store: StoreContextMut<D>,
75
source: Source<Self::Item>,
76
finish: bool,
77
) -> Poll<Result<StreamResult>> {
78
// SAFETY: This is a standard pin-projection, and we never move
79
// out of `self`.
80
let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
81
82
let on_pending = || {
83
if finish {
84
Poll::Ready(Ok(StreamResult::Cancelled))
85
} else {
86
Poll::Pending
87
}
88
};
89
90
match sink.as_mut().poll_flush(cx) {
91
Poll::Pending => on_pending(),
92
Poll::Ready(result) => {
93
result?;
94
match sink.as_mut().poll_ready(cx) {
95
Poll::Pending => on_pending(),
96
Poll::Ready(result) => {
97
result?;
98
let mut source = source.as_direct(store);
99
let item = source.remaining()[0];
100
source.mark_read(1);
101
sink.start_send(item)?;
102
Poll::Ready(Ok(StreamResult::Completed))
103
}
104
}
105
}
106
}
107
}
108
}
109
110
#[tokio::test]
111
pub async fn async_closed_streams() -> Result<()> {
112
let engine = Engine::new(&config())?;
113
114
let mut store = Store::new(
115
&engine,
116
Ctx {
117
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
118
table: ResourceTable::default(),
119
continue_: false,
120
wakers: Arc::new(Mutex::new(None)),
121
},
122
);
123
124
let mut linker = Linker::new(&engine);
125
126
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
127
128
let component = make_component(
129
&engine,
130
&[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT],
131
)
132
.await?;
133
134
let instance = linker.instantiate_async(&mut store, &component).await?;
135
136
let values = vec![42_u8, 43, 44];
137
138
let value = 42_u8;
139
140
// First, test stream host->host
141
for direct_producer in [true, false] {
142
for direct_consumer in [true, false] {
143
let (mut input_tx, input_rx) = mpsc::channel(1);
144
let (output_tx, mut output_rx) = mpsc::channel(1);
145
let reader = if direct_producer {
146
StreamReader::new(instance, &mut store, DirectPipeProducer(input_rx))
147
} else {
148
StreamReader::new(instance, &mut store, PipeProducer::new(input_rx))
149
};
150
if direct_consumer {
151
reader.pipe(&mut store, DirectPipeConsumer(output_tx));
152
} else {
153
reader.pipe(&mut store, PipeConsumer::new(output_tx));
154
}
155
156
instance
157
.run_concurrent(&mut store, async |_| {
158
let (a, b) = future::join(
159
async {
160
for &value in &values {
161
input_tx.send(value).await?;
162
}
163
drop(input_tx);
164
anyhow::Ok(())
165
},
166
async {
167
for &value in &values {
168
assert_eq!(Some(value), output_rx.next().await);
169
}
170
assert!(output_rx.next().await.is_none());
171
Ok(())
172
},
173
)
174
.await;
175
176
a.and(b)
177
})
178
.await??;
179
}
180
}
181
182
// Next, test futures host->host
183
{
184
let (input_tx, input_rx) = oneshot::channel();
185
let (output_tx, output_rx) = oneshot::channel();
186
FutureReader::new(instance, &mut store, OneshotProducer::new(input_rx))
187
.pipe(&mut store, OneshotConsumer::new(output_tx));
188
189
instance
190
.run_concurrent(&mut store, async |_| {
191
_ = input_tx.send(value);
192
assert_eq!(value, output_rx.await?);
193
anyhow::Ok(())
194
})
195
.await??;
196
}
197
198
// Next, test stream host->guest
199
{
200
let (mut tx, rx) = mpsc::channel(1);
201
let rx = StreamReader::new(instance, &mut store, PipeProducer::new(rx));
202
203
let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
204
205
let values = values.clone();
206
207
instance
208
.run_concurrent(&mut store, async move |accessor| {
209
let (a, b) = future::join(
210
async {
211
for &value in &values {
212
tx.send(value).await?;
213
}
214
drop(tx);
215
Ok(())
216
},
217
closed_streams.local_local_closed().call_read_stream(
218
accessor,
219
rx,
220
values.clone(),
221
),
222
)
223
.await;
224
225
a.and(b)
226
})
227
.await??;
228
}
229
230
// Next, test futures host->guest
231
{
232
let (tx, rx) = oneshot::channel();
233
let rx = FutureReader::new(instance, &mut store, OneshotProducer::new(rx));
234
let (_, rx_ignored) = oneshot::channel();
235
let rx_ignored = FutureReader::new(instance, &mut store, OneshotProducer::new(rx_ignored));
236
237
let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
238
239
instance
240
.run_concurrent(&mut store, async move |accessor| {
241
_ = tx.send(value);
242
closed_streams
243
.local_local_closed()
244
.call_read_future(accessor, rx, value, rx_ignored)
245
.await
246
})
247
.await??;
248
}
249
250
Ok(())
251
}
252
253