Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/misc/component-async-tests/tests/scenario/streams.rs
3088 views
1
use {
2
super::util::{config, make_component},
3
component_async_tests::{
4
Ctx, closed_streams,
5
util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer},
6
},
7
futures::{
8
FutureExt, Sink, SinkExt, Stream, StreamExt,
9
channel::{mpsc, oneshot},
10
future,
11
},
12
std::{
13
mem,
14
ops::DerefMut,
15
pin::Pin,
16
sync::{Arc, Mutex},
17
task::{self, Context, Poll},
18
time::Duration,
19
},
20
wasmtime::{
21
Engine, Result, Store, StoreContextMut,
22
component::{
23
Destination, FutureReader, Lift, Linker, ResourceTable, Source, StreamConsumer,
24
StreamProducer, StreamReader, StreamResult, VecBuffer,
25
},
26
},
27
wasmtime_wasi::WasiCtxBuilder,
28
};
29
30
pub struct DirectPipeProducer<S>(S);
31
32
impl<D, S: Stream<Item = u8> + Send + 'static> StreamProducer<D> for DirectPipeProducer<S> {
33
type Item = u8;
34
type Buffer = Option<u8>;
35
36
fn poll_produce<'a>(
37
self: Pin<&mut Self>,
38
cx: &mut Context<'_>,
39
store: StoreContextMut<D>,
40
destination: Destination<'a, Self::Item, Self::Buffer>,
41
finish: bool,
42
) -> Poll<Result<StreamResult>> {
43
// SAFETY: This is a standard pin-projection, and we never move
44
// out of `self`.
45
let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
46
47
match stream.poll_next(cx) {
48
Poll::Pending => {
49
if finish {
50
Poll::Ready(Ok(StreamResult::Cancelled))
51
} else {
52
Poll::Pending
53
}
54
}
55
Poll::Ready(Some(item)) => {
56
let mut destination = destination.as_direct(store, 1);
57
destination.remaining()[0] = item;
58
destination.mark_written(1);
59
Poll::Ready(Ok(StreamResult::Completed))
60
}
61
Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
62
}
63
}
64
}
65
66
pub struct DirectPipeConsumer<S>(S);
67
68
impl<D, S: Sink<u8, Error: std::error::Error + Send + Sync> + Send + 'static> StreamConsumer<D>
69
for DirectPipeConsumer<S>
70
{
71
type Item = u8;
72
73
fn poll_consume(
74
self: Pin<&mut Self>,
75
cx: &mut Context<'_>,
76
store: StoreContextMut<D>,
77
source: Source<Self::Item>,
78
finish: bool,
79
) -> Poll<Result<StreamResult>> {
80
// SAFETY: This is a standard pin-projection, and we never move
81
// out of `self`.
82
let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
83
84
let on_pending = || {
85
if finish {
86
Poll::Ready(Ok(StreamResult::Cancelled))
87
} else {
88
Poll::Pending
89
}
90
};
91
92
match sink.as_mut().poll_flush(cx) {
93
Poll::Pending => on_pending(),
94
Poll::Ready(result) => {
95
result?;
96
match sink.as_mut().poll_ready(cx) {
97
Poll::Pending => on_pending(),
98
Poll::Ready(result) => {
99
result?;
100
let mut source = source.as_direct(store);
101
let item = source.remaining()[0];
102
source.mark_read(1);
103
sink.start_send(item)?;
104
Poll::Ready(Ok(StreamResult::Completed))
105
}
106
}
107
}
108
}
109
}
110
}
111
112
#[tokio::test]
113
pub async fn async_closed_streams() -> Result<()> {
114
let engine = Engine::new(&config())?;
115
116
let mut store = Store::new(
117
&engine,
118
Ctx {
119
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
120
table: ResourceTable::default(),
121
continue_: false,
122
},
123
);
124
125
let mut linker = Linker::new(&engine);
126
127
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
128
129
let component = make_component(
130
&engine,
131
&[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT],
132
)
133
.await?;
134
135
let instance = linker.instantiate_async(&mut store, &component).await?;
136
137
let values = vec![42_u8, 43, 44];
138
139
let value = 42_u8;
140
141
// First, test stream host->host
142
for direct_producer in [true, false] {
143
for direct_consumer in [true, false] {
144
let (mut input_tx, input_rx) = mpsc::channel(1);
145
let (output_tx, mut output_rx) = mpsc::channel(1);
146
let reader = if direct_producer {
147
StreamReader::new(&mut store, DirectPipeProducer(input_rx))
148
} else {
149
StreamReader::new(&mut store, PipeProducer::new(input_rx))
150
};
151
if direct_consumer {
152
reader.pipe(&mut store, DirectPipeConsumer(output_tx));
153
} else {
154
reader.pipe(&mut store, PipeConsumer::new(output_tx));
155
}
156
157
store
158
.run_concurrent(async |_| {
159
let (a, b) = future::join(
160
async {
161
for &value in &values {
162
input_tx.send(value).await?;
163
}
164
drop(input_tx);
165
wasmtime::error::Ok(())
166
},
167
async {
168
for &value in &values {
169
assert_eq!(Some(value), output_rx.next().await);
170
}
171
assert!(output_rx.next().await.is_none());
172
Ok(())
173
},
174
)
175
.await;
176
177
a.and(b)
178
})
179
.await??;
180
}
181
}
182
183
// Next, test futures host->host
184
{
185
let (input_tx, input_rx) = oneshot::channel();
186
let (output_tx, output_rx) = oneshot::channel();
187
FutureReader::new(&mut store, OneshotProducer::new(input_rx))
188
.pipe(&mut store, OneshotConsumer::new(output_tx));
189
190
store
191
.run_concurrent(async |_| {
192
_ = input_tx.send(value);
193
assert_eq!(value, output_rx.await?);
194
wasmtime::error::Ok(())
195
})
196
.await??;
197
}
198
199
// Next, test stream host->guest
200
{
201
let (mut tx, rx) = mpsc::channel(1);
202
let rx = StreamReader::new(&mut store, PipeProducer::new(rx));
203
204
let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
205
206
let values = values.clone();
207
208
store
209
.run_concurrent(async move |accessor| {
210
let (a, b) = future::join(
211
async {
212
for &value in &values {
213
tx.send(value).await?;
214
}
215
drop(tx);
216
Ok(())
217
},
218
closed_streams.local_local_closed().call_read_stream(
219
accessor,
220
rx,
221
values.clone(),
222
),
223
)
224
.await;
225
226
a.and(b)
227
})
228
.await??;
229
}
230
231
// Next, test futures host->guest
232
{
233
let (tx, rx) = oneshot::channel();
234
let rx = FutureReader::new(&mut store, OneshotProducer::new(rx));
235
let (_, rx_ignored) = oneshot::channel();
236
let rx_ignored = FutureReader::new(&mut store, OneshotProducer::new(rx_ignored));
237
238
let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;
239
240
store
241
.run_concurrent(async move |accessor| {
242
_ = tx.send(value);
243
closed_streams
244
.local_local_closed()
245
.call_read_future(accessor, rx, value, rx_ignored)
246
.await
247
})
248
.await??;
249
}
250
251
Ok(())
252
}
253
254
mod closed_stream {
255
wasmtime::component::bindgen!({
256
path: "wit",
257
world: "closed-stream-guest",
258
exports: { default: store | async },
259
});
260
}
261
262
#[tokio::test]
263
pub async fn async_closed_stream() -> Result<()> {
264
let engine = Engine::new(&config())?;
265
266
let component = make_component(
267
&engine,
268
&[test_programs_artifacts::ASYNC_CLOSED_STREAM_COMPONENT],
269
)
270
.await?;
271
272
let mut linker = Linker::new(&engine);
273
274
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
275
276
let mut store = Store::new(
277
&engine,
278
Ctx {
279
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
280
table: ResourceTable::default(),
281
continue_: false,
282
},
283
);
284
285
let instance = linker.instantiate_async(&mut store, &component).await?;
286
let guest = closed_stream::ClosedStreamGuest::new(&mut store, &instance)?;
287
store
288
.run_concurrent(async move |accessor| {
289
let stream = guest.local_local_closed_stream().call_get(accessor).await?;
290
291
let (tx, mut rx) = mpsc::channel(1);
292
accessor.with(move |store| stream.pipe(store, PipeConsumer::new(tx)));
293
assert!(rx.next().await.is_none());
294
295
Ok(())
296
})
297
.await?
298
}
299
300
struct VecProducer<T> {
301
source: Vec<T>,
302
sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
303
}
304
305
impl<T> VecProducer<T> {
306
fn new(source: Vec<T>, delay: bool) -> Self {
307
Self {
308
source,
309
sleep: if delay {
310
tokio::time::sleep(Duration::from_millis(10)).boxed()
311
} else {
312
async {}.boxed()
313
},
314
}
315
}
316
}
317
318
impl<D, T: Lift + Unpin + 'static> StreamProducer<D> for VecProducer<T> {
319
type Item = T;
320
type Buffer = VecBuffer<T>;
321
322
fn poll_produce(
323
mut self: Pin<&mut Self>,
324
cx: &mut Context<'_>,
325
_: StoreContextMut<D>,
326
mut destination: Destination<Self::Item, Self::Buffer>,
327
_: bool,
328
) -> Poll<Result<StreamResult>> {
329
let sleep = &mut self.as_mut().get_mut().sleep;
330
task::ready!(sleep.as_mut().poll(cx));
331
*sleep = async {}.boxed();
332
333
destination.set_buffer(mem::take(&mut self.get_mut().source).into());
334
Poll::Ready(Ok(StreamResult::Dropped))
335
}
336
}
337
338
struct OneAtATime<T> {
339
destination: Arc<Mutex<Vec<T>>>,
340
sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
341
}
342
343
impl<T> OneAtATime<T> {
344
fn new(destination: Arc<Mutex<Vec<T>>>, delay: bool) -> Self {
345
Self {
346
destination,
347
sleep: if delay {
348
tokio::time::sleep(Duration::from_millis(10)).boxed()
349
} else {
350
async {}.boxed()
351
},
352
}
353
}
354
}
355
356
impl<D, T: Lift + 'static> StreamConsumer<D> for OneAtATime<T> {
357
type Item = T;
358
359
fn poll_consume(
360
mut self: Pin<&mut Self>,
361
cx: &mut Context<'_>,
362
store: StoreContextMut<D>,
363
mut source: Source<Self::Item>,
364
_: bool,
365
) -> Poll<Result<StreamResult>> {
366
let sleep = &mut self.as_mut().get_mut().sleep;
367
task::ready!(sleep.as_mut().poll(cx));
368
*sleep = async {}.boxed();
369
370
let value = &mut None;
371
source.read(store, value)?;
372
self.destination.lock().unwrap().push(value.take().unwrap());
373
Poll::Ready(Ok(StreamResult::Completed))
374
}
375
}
376
377
mod short_reads {
378
wasmtime::component::bindgen!({
379
path: "wit",
380
world: "short-reads-guest",
381
exports: { default: async | task_exit },
382
});
383
}
384
385
#[tokio::test]
386
pub async fn async_short_reads() -> Result<()> {
387
test_async_short_reads(false).await
388
}
389
390
#[tokio::test]
391
async fn async_short_reads_with_delay() -> Result<()> {
392
test_async_short_reads(true).await
393
}
394
395
async fn test_async_short_reads(delay: bool) -> Result<()> {
396
use short_reads::exports::local::local::short_reads::Thing;
397
398
let engine = Engine::new(&config())?;
399
400
let component = make_component(
401
&engine,
402
&[test_programs_artifacts::ASYNC_SHORT_READS_COMPONENT],
403
)
404
.await?;
405
406
let mut linker = Linker::new(&engine);
407
408
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
409
410
let mut store = Store::new(
411
&engine,
412
Ctx {
413
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
414
table: ResourceTable::default(),
415
continue_: false,
416
},
417
);
418
419
let guest =
420
short_reads::ShortReadsGuest::instantiate_async(&mut store, &component, &linker).await?;
421
let thing = guest.local_local_short_reads().thing();
422
423
let strings = ["a", "b", "c", "d", "e"];
424
let mut things = Vec::with_capacity(strings.len());
425
for string in strings {
426
things.push(thing.call_constructor(&mut store, string).await?);
427
}
428
429
store
430
.run_concurrent(async |store| {
431
let count = things.len();
432
let stream =
433
store.with(|store| StreamReader::new(store, VecProducer::new(things, delay)));
434
435
let (stream, task) = guest
436
.local_local_short_reads()
437
.call_short_reads(store, stream)
438
.await?;
439
440
let received_things = Arc::new(Mutex::new(Vec::<Thing>::with_capacity(count)));
441
// Read just one item at a time from the guest, forcing it to
442
// re-take ownership of any unwritten items.
443
store.with(|store| stream.pipe(store, OneAtATime::new(received_things.clone(), delay)));
444
445
task.block(store).await;
446
447
assert_eq!(count, received_things.lock().unwrap().len());
448
449
let mut received_strings = Vec::with_capacity(strings.len());
450
let received_things = mem::take(received_things.lock().unwrap().deref_mut());
451
for it in received_things {
452
received_strings.push(thing.call_get(store, it).await?.0);
453
}
454
455
assert_eq!(
456
&strings[..],
457
&received_strings
458
.iter()
459
.map(|s| s.as_str())
460
.collect::<Vec<_>>()
461
);
462
463
wasmtime::error::Ok(())
464
})
465
.await?
466
}
467
468