Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/misc/component-async-tests/tests/scenario/transmit.rs
3088 views
1
use std::future::Future;
2
use std::pin::Pin;
3
use std::task::{self, Context, Poll};
4
use std::time::Duration;
5
6
use super::util::{config, make_component, test_run, test_run_with_count};
7
use cancel::exports::local::local::cancel::Mode;
8
use component_async_tests::transmit::bindings::exports::local::local::transmit::Control;
9
use component_async_tests::util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer};
10
use component_async_tests::{Ctx, sleep, transmit};
11
use futures::{
12
FutureExt, SinkExt, StreamExt, TryStreamExt,
13
channel::{mpsc, oneshot},
14
stream::FuturesUnordered,
15
};
16
use wasmtime::component::{
17
Accessor, Component, Destination, FutureConsumer, FutureProducer, FutureReader, HasSelf,
18
Instance, Linker, ResourceTable, Source, StreamConsumer, StreamProducer, StreamReader,
19
StreamResult, Val,
20
};
21
use wasmtime::{AsContextMut, Engine, Result, Store, StoreContextMut, format_err};
22
use wasmtime_wasi::WasiCtxBuilder;
23
24
struct BufferStreamProducer {
25
buffer: Vec<u8>,
26
}
27
28
impl<D> StreamProducer<D> for BufferStreamProducer {
29
type Item = u8;
30
type Buffer = Option<u8>;
31
32
fn poll_produce<'a>(
33
self: Pin<&mut Self>,
34
_: &mut Context<'_>,
35
mut store: StoreContextMut<'a, D>,
36
destination: Destination<'a, Self::Item, Self::Buffer>,
37
_: bool,
38
) -> Poll<Result<StreamResult>> {
39
let me = self.get_mut();
40
let capacity = destination.remaining(store.as_context_mut());
41
if capacity == Some(0) {
42
Poll::Ready(Ok(StreamResult::Completed))
43
} else {
44
assert_eq!(capacity, Some(me.buffer.len()));
45
let mut destination = destination.as_direct(store, me.buffer.len());
46
destination.remaining().copy_from_slice(&me.buffer);
47
destination.mark_written(me.buffer.len());
48
49
Poll::Ready(Ok(StreamResult::Dropped))
50
}
51
}
52
}
53
54
struct BufferStreamConsumer {
55
expected: Vec<u8>,
56
}
57
58
impl<D> StreamConsumer<D> for BufferStreamConsumer {
59
type Item = u8;
60
61
fn poll_consume(
62
self: Pin<&mut Self>,
63
_: &mut Context<'_>,
64
mut store: StoreContextMut<D>,
65
source: Source<Self::Item>,
66
_: bool,
67
) -> Poll<Result<StreamResult>> {
68
let me = self.get_mut();
69
let available = source.remaining(store.as_context_mut());
70
if available == 0 {
71
Poll::Ready(Ok(StreamResult::Completed))
72
} else {
73
assert_eq!(available, me.expected.len());
74
let mut source = source.as_direct(store);
75
assert_eq!(&me.expected, source.remaining());
76
source.mark_read(me.expected.len());
77
78
Poll::Ready(Ok(StreamResult::Dropped))
79
}
80
}
81
}
82
83
struct ValueFutureProducer {
84
value: u8,
85
}
86
87
impl<D> FutureProducer<D> for ValueFutureProducer {
88
type Item = u8;
89
90
fn poll_produce<'a>(
91
self: Pin<&mut Self>,
92
_: &mut Context<'_>,
93
_: StoreContextMut<'a, D>,
94
_: bool,
95
) -> Poll<Result<Option<Self::Item>>> {
96
Poll::Ready(Ok(Some(self.value)))
97
}
98
}
99
100
struct ValueFutureConsumer {
101
expected: u8,
102
}
103
104
impl<D> FutureConsumer<D> for ValueFutureConsumer {
105
type Item = u8;
106
107
fn poll_consume(
108
self: Pin<&mut Self>,
109
_: &mut Context<'_>,
110
store: StoreContextMut<D>,
111
mut source: Source<'_, Self::Item>,
112
_: bool,
113
) -> Poll<Result<()>> {
114
let value = &mut None;
115
source.read(store, value)?;
116
assert_eq!(value.take(), Some(self.expected));
117
Poll::Ready(Ok(()))
118
}
119
}
120
121
struct DelayedStreamProducer<P> {
122
inner: P,
123
sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
124
}
125
126
impl<D, P: StreamProducer<D>> StreamProducer<D> for DelayedStreamProducer<P> {
127
type Item = P::Item;
128
type Buffer = P::Buffer;
129
130
fn poll_produce<'a>(
131
mut self: Pin<&mut Self>,
132
cx: &mut Context<'_>,
133
store: StoreContextMut<'a, D>,
134
destination: Destination<'a, Self::Item, Self::Buffer>,
135
finish: bool,
136
) -> Poll<Result<StreamResult>> {
137
// SAFETY: We never move out of `self`.
138
let sleep = unsafe { &mut self.as_mut().get_unchecked_mut().sleep };
139
task::ready!(sleep.as_mut().poll(cx));
140
*sleep = async {}.boxed();
141
142
// SAFETY: This is a standard pin-projection, and we never move out
143
// of `self`.
144
let inner = unsafe { self.map_unchecked_mut(|v| &mut v.inner) };
145
inner.poll_produce(cx, store, destination, finish)
146
}
147
}
148
149
struct DelayedStreamConsumer<C> {
150
inner: C,
151
sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
152
}
153
154
impl<D, C: StreamConsumer<D>> StreamConsumer<D> for DelayedStreamConsumer<C> {
155
type Item = C::Item;
156
157
fn poll_consume(
158
mut self: Pin<&mut Self>,
159
cx: &mut Context<'_>,
160
store: StoreContextMut<D>,
161
source: Source<Self::Item>,
162
finish: bool,
163
) -> Poll<Result<StreamResult>> {
164
// SAFETY: We never move out of `self`.
165
let sleep = unsafe { &mut self.as_mut().get_unchecked_mut().sleep };
166
task::ready!(sleep.as_mut().poll(cx));
167
*sleep = async {}.boxed();
168
169
// SAFETY: This is a standard pin-projection, and we never move out
170
// of `self`.
171
let inner = unsafe { self.map_unchecked_mut(|v| &mut v.inner) };
172
inner.poll_consume(cx, store, source, finish)
173
}
174
}
175
176
struct DelayedFutureProducer<P> {
177
inner: P,
178
sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
179
}
180
181
impl<D, P: FutureProducer<D>> FutureProducer<D> for DelayedFutureProducer<P> {
182
type Item = P::Item;
183
184
fn poll_produce<'a>(
185
mut self: Pin<&mut Self>,
186
cx: &mut Context<'_>,
187
store: StoreContextMut<'a, D>,
188
finish: bool,
189
) -> Poll<Result<Option<Self::Item>>> {
190
// SAFETY: We never move out of `self`.
191
let sleep = unsafe { &mut self.as_mut().get_unchecked_mut().sleep };
192
task::ready!(sleep.as_mut().poll(cx));
193
*sleep = async {}.boxed();
194
195
// SAFETY: This is a standard pin-projection, and we never move out
196
// of `self`.
197
let inner = unsafe { self.map_unchecked_mut(|v| &mut v.inner) };
198
inner.poll_produce(cx, store, finish)
199
}
200
}
201
202
struct DelayedFutureConsumer<C> {
203
inner: C,
204
sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
205
}
206
207
impl<D, C: FutureConsumer<D>> FutureConsumer<D> for DelayedFutureConsumer<C> {
208
type Item = C::Item;
209
210
fn poll_consume(
211
mut self: Pin<&mut Self>,
212
cx: &mut Context<'_>,
213
store: StoreContextMut<D>,
214
source: Source<'_, Self::Item>,
215
finish: bool,
216
) -> Poll<Result<()>> {
217
// SAFETY: We never move out of `self`.
218
let sleep = unsafe { &mut self.as_mut().get_unchecked_mut().sleep };
219
task::ready!(sleep.as_mut().poll(cx));
220
*sleep = async {}.boxed();
221
222
// SAFETY: This is a standard pin-projection, and we never move out
223
// of `self`.
224
let inner = unsafe { self.map_unchecked_mut(|v| &mut v.inner) };
225
inner.poll_consume(cx, store, source, finish)
226
}
227
}
228
229
struct ProcrastinatingStreamProducer<P>(P);
230
231
impl<D, P: StreamProducer<D>> StreamProducer<D> for ProcrastinatingStreamProducer<P> {
232
type Item = P::Item;
233
type Buffer = P::Buffer;
234
235
fn poll_produce<'a>(
236
self: Pin<&mut Self>,
237
cx: &mut Context<'_>,
238
store: StoreContextMut<'a, D>,
239
destination: Destination<'a, Self::Item, Self::Buffer>,
240
finish: bool,
241
) -> Poll<Result<StreamResult>> {
242
if finish {
243
// SAFETY: This is a standard pin-projection, and we never move out
244
// of `self`.
245
let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
246
producer.poll_produce(cx, store, destination, true)
247
} else {
248
Poll::Pending
249
}
250
}
251
}
252
253
struct ProcrastinatingStreamConsumer<C>(C);
254
255
impl<D, C: StreamConsumer<D>> StreamConsumer<D> for ProcrastinatingStreamConsumer<C> {
256
type Item = C::Item;
257
258
fn poll_consume(
259
self: Pin<&mut Self>,
260
cx: &mut Context<'_>,
261
store: StoreContextMut<D>,
262
source: Source<'_, Self::Item>,
263
finish: bool,
264
) -> Poll<Result<StreamResult>> {
265
if finish {
266
// SAFETY: This is a standard pin-projection, and we never move out
267
// of `self`.
268
let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
269
consumer.poll_consume(cx, store, source, true)
270
} else {
271
Poll::Pending
272
}
273
}
274
}
275
276
struct ProcrastinatingFutureProducer<P>(P);
277
278
impl<D, P: FutureProducer<D>> FutureProducer<D> for ProcrastinatingFutureProducer<P> {
279
type Item = P::Item;
280
281
fn poll_produce<'a>(
282
self: Pin<&mut Self>,
283
cx: &mut Context<'_>,
284
store: StoreContextMut<'a, D>,
285
finish: bool,
286
) -> Poll<Result<Option<Self::Item>>> {
287
if finish {
288
// SAFETY: This is a standard pin-projection, and we never move out
289
// of `self`.
290
let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
291
producer.poll_produce(cx, store, true)
292
} else {
293
Poll::Pending
294
}
295
}
296
}
297
298
struct ProcrastinatingFutureConsumer<C>(C);
299
300
impl<D, C: FutureConsumer<D>> FutureConsumer<D> for ProcrastinatingFutureConsumer<C> {
301
type Item = C::Item;
302
303
fn poll_consume(
304
self: Pin<&mut Self>,
305
cx: &mut Context<'_>,
306
store: StoreContextMut<D>,
307
source: Source<'_, Self::Item>,
308
finish: bool,
309
) -> Poll<Result<()>> {
310
if finish {
311
// SAFETY: This is a standard pin-projection, and we never move out
312
// of `self`.
313
let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
314
consumer.poll_consume(cx, store, source, true)
315
} else {
316
Poll::Pending
317
}
318
}
319
}
320
321
fn sleep() -> Pin<Box<dyn Future<Output = ()> + Send>> {
322
component_async_tests::util::sleep(Duration::from_millis(delay_millis())).boxed()
323
}
324
325
mod readiness {
326
wasmtime::component::bindgen!({
327
path: "wit",
328
world: "readiness-guest",
329
exports: { default: task_exit },
330
});
331
}
332
333
#[tokio::test]
334
pub async fn async_readiness() -> Result<()> {
335
let component = test_programs_artifacts::ASYNC_READINESS_COMPONENT;
336
337
let engine = Engine::new(&config())?;
338
339
let component = make_component(&engine, &[component]).await?;
340
341
let mut linker = Linker::new(&engine);
342
343
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
344
345
let mut store = Store::new(
346
&engine,
347
Ctx {
348
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
349
table: ResourceTable::default(),
350
continue_: false,
351
},
352
);
353
354
let readiness_guest =
355
readiness::ReadinessGuest::instantiate_async(&mut store, &component, &linker).await?;
356
let expected = vec![2u8, 4, 6, 8, 9];
357
let rx = StreamReader::new(
358
&mut store,
359
DelayedStreamProducer {
360
inner: BufferStreamProducer {
361
buffer: expected.clone(),
362
},
363
sleep: sleep(),
364
},
365
);
366
store
367
.run_concurrent(async move |accessor| {
368
let ((rx, expected), task_exit) = readiness_guest
369
.local_local_readiness()
370
.call_start(accessor, rx, expected)
371
.await?;
372
373
accessor.with(|access| {
374
rx.pipe(
375
access,
376
DelayedStreamConsumer {
377
inner: BufferStreamConsumer { expected },
378
sleep: sleep(),
379
},
380
)
381
});
382
383
task_exit.block(accessor).await;
384
385
Ok(())
386
})
387
.await?
388
}
389
390
#[tokio::test]
391
pub async fn async_poll_synchronous() -> Result<()> {
392
test_run(&[test_programs_artifacts::ASYNC_POLL_SYNCHRONOUS_COMPONENT]).await
393
}
394
395
#[tokio::test]
396
pub async fn async_poll_stackless() -> Result<()> {
397
test_run(&[test_programs_artifacts::ASYNC_POLL_STACKLESS_COMPONENT]).await
398
}
399
400
mod cancel {
401
wasmtime::component::bindgen!({
402
path: "wit",
403
world: "cancel-host",
404
exports: { default: async | store | task_exit },
405
});
406
}
407
408
// No-op function; we only test this by composing it in `async_cancel_caller`
409
#[allow(
410
dead_code,
411
reason = "here only to make the `assert_test_exists` macro happy"
412
)]
413
pub fn async_cancel_callee() {}
414
415
#[tokio::test]
416
pub async fn async_cancel_caller() -> Result<()> {
417
test_cancel(Mode::Normal).await
418
}
419
420
#[tokio::test]
421
pub async fn async_cancel_caller_leak_task_after_cancel() -> Result<()> {
422
test_cancel(Mode::LeakTaskAfterCancel).await
423
}
424
425
#[tokio::test]
426
pub async fn async_trap_cancel_guest_after_start_cancelled() -> Result<()> {
427
test_cancel_trap(Mode::TrapCancelGuestAfterStartCancelled).await
428
}
429
430
#[tokio::test]
431
pub async fn async_trap_cancel_guest_after_return_cancelled() -> Result<()> {
432
test_cancel_trap(Mode::TrapCancelGuestAfterReturnCancelled).await
433
}
434
435
#[tokio::test]
436
pub async fn async_trap_cancel_guest_after_return() -> Result<()> {
437
test_cancel_trap(Mode::TrapCancelGuestAfterReturn).await
438
}
439
440
#[tokio::test]
441
pub async fn async_trap_cancel_host_after_return_cancelled() -> Result<()> {
442
test_cancel_trap(Mode::TrapCancelHostAfterReturnCancelled).await
443
}
444
445
#[tokio::test]
446
pub async fn async_trap_cancel_host_after_return() -> Result<()> {
447
test_cancel_trap(Mode::TrapCancelHostAfterReturn).await
448
}
449
450
fn delay_millis() -> u64 {
451
// Miri-based builds are much slower to run, so we delay longer in that case
452
// to ensure that async calls which the test expects to return `BLOCKED`
453
// actually do so.
454
//
455
// TODO: Make this test (more) deterministic so that such tuning is not
456
// necessary.
457
if cfg!(miri) { 1000 } else { 10 }
458
}
459
460
async fn test_cancel_trap(mode: Mode) -> Result<()> {
461
let message = "`subtask.cancel` called after terminal status delivered";
462
let trap = test_cancel(mode).await.unwrap_err();
463
assert!(
464
format!("{trap:?}").contains(message),
465
"expected `{message}`; got `{trap:?}`",
466
);
467
Ok(())
468
}
469
470
async fn test_cancel(mode: Mode) -> Result<()> {
471
let engine = Engine::new(&config())?;
472
473
let component = make_component(
474
&engine,
475
&[
476
test_programs_artifacts::ASYNC_CANCEL_CALLER_COMPONENT,
477
test_programs_artifacts::ASYNC_CANCEL_CALLEE_COMPONENT,
478
],
479
)
480
.await?;
481
482
let mut linker = Linker::new(&engine);
483
484
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
485
sleep::local::local::sleep::add_to_linker::<_, Ctx>(&mut linker, |ctx| ctx)?;
486
487
let mut store = Store::new(
488
&engine,
489
Ctx {
490
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
491
table: ResourceTable::default(),
492
continue_: false,
493
},
494
);
495
496
let cancel_host =
497
cancel::CancelHost::instantiate_async(&mut store, &component, &linker).await?;
498
store
499
.run_concurrent(async move |accessor| {
500
let ((), task) = cancel_host
501
.local_local_cancel()
502
.call_run(accessor, mode, delay_millis())
503
.await?;
504
task.block(accessor).await;
505
Ok::<_, wasmtime::Error>(())
506
})
507
.await??;
508
509
Ok(())
510
}
511
512
#[tokio::test]
513
pub async fn async_intertask_communication() -> Result<()> {
514
test_run_with_count(
515
&[test_programs_artifacts::ASYNC_INTERTASK_COMMUNICATION_COMPONENT],
516
2,
517
)
518
.await
519
}
520
521
#[tokio::test]
522
pub async fn async_transmit_caller() -> Result<()> {
523
test_run(&[
524
test_programs_artifacts::ASYNC_TRANSMIT_CALLER_COMPONENT,
525
test_programs_artifacts::ASYNC_TRANSMIT_CALLEE_COMPONENT,
526
])
527
.await
528
}
529
530
#[tokio::test]
531
pub async fn async_transmit_callee() -> Result<()> {
532
test_transmit(test_programs_artifacts::ASYNC_TRANSMIT_CALLEE_COMPONENT).await
533
}
534
535
pub trait TransmitTest {
536
type Instance: Send + Sync;
537
type Params;
538
type Result: Send + Sync + 'static;
539
540
fn instantiate(
541
store: impl AsContextMut<Data = Ctx>,
542
component: &Component,
543
linker: &Linker<Ctx>,
544
) -> impl Future<Output = Result<Self::Instance>>;
545
546
fn call<'a>(
547
accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,
548
instance: &'a Self::Instance,
549
params: Self::Params,
550
) -> impl Future<Output = Result<Self::Result>> + Send + 'a;
551
552
fn into_params(
553
store: impl AsContextMut<Data = Ctx>,
554
control: StreamReader<Control>,
555
caller_stream: StreamReader<String>,
556
caller_future1: FutureReader<String>,
557
caller_future2: FutureReader<String>,
558
) -> Self::Params;
559
560
fn from_result(
561
store: impl AsContextMut<Data = Ctx>,
562
result: Self::Result,
563
) -> Result<(
564
StreamReader<String>,
565
FutureReader<String>,
566
FutureReader<String>,
567
)>;
568
}
569
570
struct StaticTransmitTest;
571
572
impl TransmitTest for StaticTransmitTest {
573
type Instance = transmit::bindings::TransmitCallee;
574
type Params = (
575
StreamReader<Control>,
576
StreamReader<String>,
577
FutureReader<String>,
578
FutureReader<String>,
579
);
580
type Result = (
581
StreamReader<String>,
582
FutureReader<String>,
583
FutureReader<String>,
584
);
585
586
async fn instantiate(
587
store: impl AsContextMut<Data = Ctx>,
588
component: &Component,
589
linker: &Linker<Ctx>,
590
) -> Result<Self::Instance> {
591
transmit::bindings::TransmitCallee::instantiate_async(store, &component, &linker).await
592
}
593
594
fn call<'a>(
595
accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,
596
instance: &'a Self::Instance,
597
params: Self::Params,
598
) -> impl Future<Output = Result<Self::Result>> + Send + 'a {
599
instance
600
.local_local_transmit()
601
.call_exchange(accessor, params.0, params.1, params.2, params.3)
602
}
603
604
fn into_params(
605
_store: impl AsContextMut<Data = Ctx>,
606
control: StreamReader<Control>,
607
caller_stream: StreamReader<String>,
608
caller_future1: FutureReader<String>,
609
caller_future2: FutureReader<String>,
610
) -> Self::Params {
611
(control, caller_stream, caller_future1, caller_future2)
612
}
613
614
fn from_result(
615
_: impl AsContextMut<Data = Ctx>,
616
result: Self::Result,
617
) -> Result<(
618
StreamReader<String>,
619
FutureReader<String>,
620
FutureReader<String>,
621
)> {
622
Ok(result)
623
}
624
}
625
626
struct DynamicTransmitTest;
627
628
impl TransmitTest for DynamicTransmitTest {
629
type Instance = Instance;
630
type Params = Vec<Val>;
631
type Result = Val;
632
633
async fn instantiate(
634
store: impl AsContextMut<Data = Ctx>,
635
component: &Component,
636
linker: &Linker<Ctx>,
637
) -> Result<Self::Instance> {
638
linker.instantiate_async(store, component).await
639
}
640
641
async fn call<'a>(
642
accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,
643
instance: &'a Self::Instance,
644
params: Self::Params,
645
) -> Result<Self::Result> {
646
let exchange_function = accessor.with(|mut store| {
647
let transmit_instance = instance
648
.get_export_index(store.as_context_mut(), None, "local:local/transmit")
649
.ok_or_else(|| format_err!("can't find `local:local/transmit` in instance"))?;
650
let exchange_function = instance
651
.get_export_index(store.as_context_mut(), Some(&transmit_instance), "exchange")
652
.ok_or_else(|| format_err!("can't find `exchange` in instance"))?;
653
instance
654
.get_func(store.as_context_mut(), exchange_function)
655
.ok_or_else(|| format_err!("can't find `exchange` in instance"))
656
})?;
657
658
let mut results = vec![Val::Bool(false)];
659
exchange_function
660
.call_concurrent(accessor, &params, &mut results)
661
.await?;
662
Ok(results.pop().unwrap())
663
}
664
665
fn into_params(
666
mut store: impl AsContextMut<Data = Ctx>,
667
control: StreamReader<Control>,
668
caller_stream: StreamReader<String>,
669
caller_future1: FutureReader<String>,
670
caller_future2: FutureReader<String>,
671
) -> Self::Params {
672
vec![
673
control.try_into_stream_any(&mut store).unwrap().into(),
674
caller_stream
675
.try_into_stream_any(&mut store)
676
.unwrap()
677
.into(),
678
caller_future1
679
.try_into_future_any(&mut store)
680
.unwrap()
681
.into(),
682
caller_future2
683
.try_into_future_any(&mut store)
684
.unwrap()
685
.into(),
686
]
687
}
688
689
fn from_result(
690
_store: impl AsContextMut<Data = Ctx>,
691
result: Self::Result,
692
) -> Result<(
693
StreamReader<String>,
694
FutureReader<String>,
695
FutureReader<String>,
696
)> {
697
let Val::Tuple(fields) = result else {
698
unreachable!()
699
};
700
let mut fields = fields.into_iter();
701
let Val::Stream(stream) = fields.next().unwrap() else {
702
unreachable!()
703
};
704
let Val::Future(future1) = fields.next().unwrap() else {
705
unreachable!()
706
};
707
let Val::Future(future2) = fields.next().unwrap() else {
708
unreachable!()
709
};
710
let stream = StreamReader::try_from_stream_any(stream).unwrap();
711
let future1 = FutureReader::try_from_future_any(future1).unwrap();
712
let future2 = FutureReader::try_from_future_any(future2).unwrap();
713
Ok((stream, future1, future2))
714
}
715
}
716
717
async fn test_transmit(component: &str) -> Result<()> {
718
test_transmit_with::<StaticTransmitTest>(component).await?;
719
test_transmit_with::<DynamicTransmitTest>(component).await
720
}
721
722
async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Result<()> {
723
let engine = Engine::new(&config())?;
724
725
let component = make_component(&engine, &[component]).await?;
726
727
let mut linker = Linker::new(&engine);
728
729
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
730
731
let mut store = Store::new(
732
&engine,
733
Ctx {
734
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
735
table: ResourceTable::default(),
736
continue_: false,
737
},
738
);
739
740
let test = Test::instantiate(&mut store, &component, &linker).await?;
741
742
enum Event<Test: TransmitTest> {
743
Result(Test::Result),
744
ControlWriteA(mpsc::Sender<Control>),
745
ControlWriteB(mpsc::Sender<Control>),
746
ControlWriteC(mpsc::Sender<Control>),
747
ControlWriteD,
748
WriteA,
749
ReadC(mpsc::Receiver<String>, Option<String>),
750
ReadD(mpsc::Receiver<String>, Option<String>),
751
ReadNone(Option<String>),
752
}
753
754
let (mut control_tx, control_rx) = mpsc::channel(1);
755
let control_rx = StreamReader::new(&mut store, PipeProducer::new(control_rx));
756
let (mut caller_stream_tx, caller_stream_rx) = mpsc::channel(1);
757
let caller_stream_rx = StreamReader::new(&mut store, PipeProducer::new(caller_stream_rx));
758
let (caller_future1_tx, caller_future1_rx) = oneshot::channel();
759
let caller_future1_rx = FutureReader::new(&mut store, OneshotProducer::new(caller_future1_rx));
760
let (_, caller_future2_rx) = oneshot::channel();
761
let caller_future2_rx = FutureReader::new(&mut store, OneshotProducer::new(caller_future2_rx));
762
let (callee_future1_tx, callee_future1_rx) = oneshot::channel();
763
let (callee_stream_tx, callee_stream_rx) = mpsc::channel(1);
764
store
765
.run_concurrent(async |accessor| {
766
let mut caller_future1_tx = Some(caller_future1_tx);
767
let mut callee_future1_tx = Some(callee_future1_tx);
768
let mut callee_future1_rx = Some(callee_future1_rx);
769
let mut callee_stream_tx = Some(callee_stream_tx);
770
let mut callee_stream_rx = Some(callee_stream_rx);
771
let mut complete = false;
772
let mut futures = FuturesUnordered::<
773
Pin<Box<dyn Future<Output = Result<Event<Test>>> + Send>>,
774
>::new();
775
776
futures.push(
777
async move {
778
control_tx.send(Control::ReadStream("a".into())).await?;
779
Ok(Event::ControlWriteA(control_tx))
780
}
781
.boxed(),
782
);
783
784
futures.push(
785
async move {
786
caller_stream_tx.send(String::from("a")).await?;
787
Ok(Event::WriteA)
788
}
789
.boxed(),
790
);
791
792
let params = accessor.with(|s| {
793
Test::into_params(
794
s,
795
control_rx,
796
caller_stream_rx,
797
caller_future1_rx,
798
caller_future2_rx,
799
)
800
});
801
802
futures.push(
803
Test::call(accessor, &test, params)
804
.map(|v| v.map(Event::Result))
805
.boxed(),
806
);
807
808
while let Some(event) = futures.try_next().await? {
809
match event {
810
Event::Result(result) => {
811
accessor.with(|mut store| {
812
let (callee_stream_rx, callee_future1_rx, _) =
813
Test::from_result(&mut store, result)?;
814
callee_stream_rx.pipe(
815
&mut store,
816
PipeConsumer::new(callee_stream_tx.take().unwrap()),
817
);
818
callee_future1_rx.pipe(
819
&mut store,
820
OneshotConsumer::new(callee_future1_tx.take().unwrap()),
821
);
822
wasmtime::error::Ok(())
823
})?;
824
}
825
Event::ControlWriteA(mut control_tx) => {
826
futures.push(
827
async move {
828
control_tx.send(Control::ReadFuture("b".into())).await?;
829
Ok(Event::ControlWriteB(control_tx))
830
}
831
.boxed(),
832
);
833
}
834
Event::WriteA => {
835
_ = caller_future1_tx.take().unwrap().send("b".into());
836
let mut callee_stream_rx = callee_stream_rx.take().unwrap();
837
futures.push(
838
async move {
839
let value = callee_stream_rx.next().await;
840
Ok(Event::ReadC(callee_stream_rx, value))
841
}
842
.boxed(),
843
);
844
}
845
Event::ControlWriteB(mut control_tx) => {
846
futures.push(
847
async move {
848
control_tx.send(Control::WriteStream("c".into())).await?;
849
Ok(Event::ControlWriteC(control_tx))
850
}
851
.boxed(),
852
);
853
}
854
Event::ControlWriteC(mut control_tx) => {
855
futures.push(
856
async move {
857
control_tx.send(Control::WriteFuture("d".into())).await?;
858
Ok(Event::ControlWriteD)
859
}
860
.boxed(),
861
);
862
}
863
Event::ReadC(callee_stream_rx, mut value) => {
864
assert_eq!(value.take().as_deref(), Some("c"));
865
futures.push(
866
callee_future1_rx
867
.take()
868
.unwrap()
869
.map(|v| Event::ReadD(callee_stream_rx, v.ok()))
870
.map(Ok)
871
.boxed(),
872
);
873
}
874
Event::ControlWriteD => {}
875
Event::ReadD(_, None) => unreachable!(),
876
Event::ReadD(mut callee_stream_rx, Some(value)) => {
877
assert_eq!(&value, "d");
878
futures.push(
879
async move { Ok(Event::ReadNone(callee_stream_rx.next().await)) }
880
.boxed(),
881
);
882
}
883
Event::ReadNone(Some(_)) => unreachable!(),
884
Event::ReadNone(None) => {
885
complete = true;
886
}
887
}
888
}
889
890
assert!(complete);
891
892
wasmtime::error::Ok(())
893
})
894
.await??;
895
Ok(())
896
}
897
898
mod synchronous_transmit {
899
wasmtime::component::bindgen!({
900
path: "wit",
901
world: "synchronous-transmit-guest",
902
exports: { default: task_exit },
903
});
904
}
905
906
#[tokio::test]
907
pub async fn async_cancel_transmit() -> Result<()> {
908
test_synchronous_transmit(
909
test_programs_artifacts::ASYNC_CANCEL_TRANSMIT_COMPONENT,
910
true,
911
)
912
.await
913
}
914
915
#[tokio::test]
916
pub async fn async_synchronous_transmit() -> Result<()> {
917
test_synchronous_transmit(
918
test_programs_artifacts::ASYNC_SYNCHRONOUS_TRANSMIT_COMPONENT,
919
false,
920
)
921
.await
922
}
923
924
async fn test_synchronous_transmit(component: &str, procrastinate: bool) -> Result<()> {
925
let engine = Engine::new(&config())?;
926
927
let component = make_component(&engine, &[component]).await?;
928
929
let mut linker = Linker::new(&engine);
930
931
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
932
933
let mut store = Store::new(
934
&engine,
935
Ctx {
936
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
937
table: ResourceTable::default(),
938
continue_: false,
939
},
940
);
941
942
let instance = linker.instantiate_async(&mut store, &component).await?;
943
let guest = synchronous_transmit::SynchronousTransmitGuest::new(&mut store, &instance)?;
944
let stream_expected = vec![2u8, 4, 6, 8, 9];
945
let producer = DelayedStreamProducer {
946
inner: BufferStreamProducer {
947
buffer: stream_expected.clone(),
948
},
949
sleep: sleep(),
950
};
951
let stream = if procrastinate {
952
StreamReader::new(&mut store, ProcrastinatingStreamProducer(producer))
953
} else {
954
StreamReader::new(&mut store, producer)
955
};
956
let future_expected = 10;
957
let producer = DelayedFutureProducer {
958
inner: ValueFutureProducer {
959
value: future_expected,
960
},
961
sleep: sleep(),
962
};
963
let future = if procrastinate {
964
FutureReader::new(&mut store, ProcrastinatingFutureProducer(producer))
965
} else {
966
FutureReader::new(&mut store, producer)
967
};
968
store
969
.run_concurrent(async move |accessor| {
970
let ((stream, stream_expected, future, future_expected), task_exit) = guest
971
.local_local_synchronous_transmit()
972
.call_start(accessor, stream, stream_expected, future, future_expected)
973
.await?;
974
975
accessor.with(|mut access| {
976
let consumer = DelayedStreamConsumer {
977
inner: BufferStreamConsumer {
978
expected: stream_expected,
979
},
980
sleep: sleep(),
981
};
982
if procrastinate {
983
stream.pipe(&mut access, ProcrastinatingStreamConsumer(consumer));
984
} else {
985
stream.pipe(&mut access, consumer);
986
}
987
let consumer = DelayedFutureConsumer {
988
inner: ValueFutureConsumer {
989
expected: future_expected,
990
},
991
sleep: sleep(),
992
};
993
if procrastinate {
994
future.pipe(access, ProcrastinatingFutureConsumer(consumer));
995
} else {
996
future.pipe(access, consumer);
997
}
998
});
999
1000
task_exit.block(accessor).await;
1001
1002
Ok(())
1003
})
1004
.await?
1005
}
1006
1007