Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/misc/component-async-tests/tests/scenario/transmit.rs
1693 views
1
use std::future::{self, Future};
2
use std::pin::Pin;
3
use std::sync::{Arc, Mutex};
4
use std::task::{Context, Poll};
5
use std::time::Duration;
6
7
use super::util::{config, make_component, test_run, test_run_with_count};
8
use anyhow::{Result, anyhow};
9
use cancel::exports::local::local::cancel::Mode;
10
use component_async_tests::transmit::bindings::exports::local::local::transmit::Control;
11
use component_async_tests::util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer};
12
use component_async_tests::{Ctx, sleep, transmit};
13
use futures::{
14
FutureExt, SinkExt, StreamExt, TryStreamExt,
15
channel::{mpsc, oneshot},
16
stream::FuturesUnordered,
17
};
18
use wasmtime::component::{
19
Accessor, Component, Destination, FutureReader, HasSelf, Instance, Linker, ResourceTable,
20
Source, StreamConsumer, StreamProducer, StreamReader, StreamResult, Val,
21
};
22
use wasmtime::{AsContextMut, Engine, Store, StoreContextMut, Trap};
23
use wasmtime_wasi::WasiCtxBuilder;
24
25
mod readiness {
26
wasmtime::component::bindgen!({
27
path: "wit",
28
world: "readiness-guest"
29
});
30
}
31
32
/// Host-side stream producer used by the `async_readiness` test.
///
/// Its `StreamProducer` implementation waits for `sleep` to resolve and then
/// writes the whole of `buffer` into the destination in one go.
struct ReadinessProducer {
    /// Bytes handed to the reader once the producer is ready.
    buffer: Vec<u8>,
    /// Future which must resolve before any data is produced.
    sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
}
36
37
impl<D> StreamProducer<D> for ReadinessProducer {
38
type Item = u8;
39
type Buffer = Option<u8>;
40
41
fn poll_produce<'a>(
42
self: Pin<&mut Self>,
43
cx: &mut Context<'_>,
44
mut store: StoreContextMut<'a, D>,
45
destination: Destination<'a, Self::Item, Self::Buffer>,
46
finish: bool,
47
) -> Poll<Result<StreamResult>> {
48
let me = self.get_mut();
49
50
match me.sleep.as_mut().poll(cx) {
51
Poll::Pending => {
52
if finish {
53
Poll::Ready(Ok(StreamResult::Cancelled))
54
} else {
55
Poll::Pending
56
}
57
}
58
Poll::Ready(()) => {
59
me.sleep = async {}.boxed();
60
let capacity = destination.remaining(store.as_context_mut());
61
if capacity == Some(0) {
62
Poll::Ready(Ok(StreamResult::Completed))
63
} else {
64
assert_eq!(capacity, Some(me.buffer.len()));
65
let mut destination = destination.as_direct(store, me.buffer.len());
66
destination.remaining().copy_from_slice(&me.buffer);
67
destination.mark_written(me.buffer.len());
68
69
Poll::Ready(Ok(StreamResult::Dropped))
70
}
71
}
72
}
73
}
74
}
75
76
/// Host-side stream consumer used by the `async_readiness` test.
///
/// Its `StreamConsumer` implementation waits for `sleep` to resolve and then
/// checks that the bytes offered by the source equal `expected`.
struct ReadinessConsumer {
    /// Bytes the consumer expects to read from the stream.
    expected: Vec<u8>,
    /// Future which must resolve before any data is consumed.
    sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
}
80
81
impl<D> StreamConsumer<D> for ReadinessConsumer {
82
type Item = u8;
83
84
fn poll_consume(
85
self: Pin<&mut Self>,
86
cx: &mut Context<'_>,
87
mut store: StoreContextMut<D>,
88
source: Source<Self::Item>,
89
finish: bool,
90
) -> Poll<Result<StreamResult>> {
91
let me = self.get_mut();
92
93
match me.sleep.as_mut().poll(cx) {
94
Poll::Pending => {
95
if finish {
96
Poll::Ready(Ok(StreamResult::Cancelled))
97
} else {
98
Poll::Pending
99
}
100
}
101
Poll::Ready(()) => {
102
me.sleep = async {}.boxed();
103
let available = source.remaining(store.as_context_mut());
104
if available == 0 {
105
Poll::Ready(Ok(StreamResult::Completed))
106
} else {
107
assert_eq!(available, me.expected.len());
108
let mut source = source.as_direct(store);
109
assert_eq!(&me.expected, source.remaining());
110
source.mark_read(me.expected.len());
111
112
Poll::Ready(Ok(StreamResult::Dropped))
113
}
114
}
115
}
116
}
117
}
118
119
#[tokio::test]
120
pub async fn async_readiness() -> Result<()> {
121
let component = test_programs_artifacts::ASYNC_READINESS_COMPONENT;
122
123
let engine = Engine::new(&config())?;
124
125
let component = make_component(&engine, &[component]).await?;
126
127
let mut linker = Linker::new(&engine);
128
129
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
130
131
let mut store = Store::new(
132
&engine,
133
Ctx {
134
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
135
table: ResourceTable::default(),
136
continue_: false,
137
wakers: Arc::new(Mutex::new(None)),
138
},
139
);
140
141
let instance = linker.instantiate_async(&mut store, &component).await?;
142
let readiness_guest = readiness::ReadinessGuest::new(&mut store, &instance)?;
143
let expected = vec![2u8, 4, 6, 8, 9];
144
let rx = StreamReader::new(
145
instance,
146
&mut store,
147
ReadinessProducer {
148
buffer: expected.clone(),
149
sleep: component_async_tests::util::sleep(Duration::from_millis(delay_millis()))
150
.boxed(),
151
},
152
);
153
let result = instance
154
.run_concurrent(&mut store, async move |accessor| {
155
let (rx, expected) = readiness_guest
156
.local_local_readiness()
157
.call_start(accessor, rx, expected)
158
.await?;
159
160
accessor.with(|access| {
161
rx.pipe(
162
access,
163
ReadinessConsumer {
164
expected,
165
sleep: component_async_tests::util::sleep(Duration::from_millis(
166
delay_millis(),
167
))
168
.boxed(),
169
},
170
)
171
});
172
173
future::pending::<Result<()>>().await
174
})
175
.await;
176
177
// As of this writing, passing a future which never resolves to
178
// `Instance::run_concurrent` and expecting a `Trap::AsyncDeadlock` is
179
// the only way to join all tasks for the `Instance`, so that's what we
180
// do:
181
assert!(matches!(
182
result.unwrap_err().downcast::<Trap>(),
183
Ok(Trap::AsyncDeadlock)
184
));
185
186
Ok(())
187
}
188
189
#[tokio::test]
190
pub async fn async_poll_synchronous() -> Result<()> {
191
test_run(&[test_programs_artifacts::ASYNC_POLL_SYNCHRONOUS_COMPONENT]).await
192
}
193
194
#[tokio::test]
195
pub async fn async_poll_stackless() -> Result<()> {
196
test_run(&[test_programs_artifacts::ASYNC_POLL_STACKLESS_COMPONENT]).await
197
}
198
199
mod cancel {
200
wasmtime::component::bindgen!({
201
path: "wit",
202
world: "cancel-host",
203
exports: { default: async | store },
204
});
205
}
206
207
// No-op function; we only test this by composing it in `async_cancel_caller`
#[allow(
    dead_code,
    reason = "here only to make the `assert_test_exists` macro happy"
)]
pub fn async_cancel_callee() {}
213
214
#[tokio::test]
215
pub async fn async_cancel_caller() -> Result<()> {
216
test_cancel(Mode::Normal).await
217
}
218
219
#[tokio::test]
220
pub async fn async_cancel_caller_leak_task_after_cancel() -> Result<()> {
221
test_cancel(Mode::LeakTaskAfterCancel).await
222
}
223
224
#[tokio::test]
225
pub async fn async_trap_cancel_guest_after_start_cancelled() -> Result<()> {
226
test_cancel_trap(Mode::TrapCancelGuestAfterStartCancelled).await
227
}
228
229
#[tokio::test]
230
pub async fn async_trap_cancel_guest_after_return_cancelled() -> Result<()> {
231
test_cancel_trap(Mode::TrapCancelGuestAfterReturnCancelled).await
232
}
233
234
#[tokio::test]
235
pub async fn async_trap_cancel_guest_after_return() -> Result<()> {
236
test_cancel_trap(Mode::TrapCancelGuestAfterReturn).await
237
}
238
239
#[tokio::test]
240
pub async fn async_trap_cancel_host_after_return_cancelled() -> Result<()> {
241
test_cancel_trap(Mode::TrapCancelHostAfterReturnCancelled).await
242
}
243
244
#[tokio::test]
245
pub async fn async_trap_cancel_host_after_return() -> Result<()> {
246
test_cancel_trap(Mode::TrapCancelHostAfterReturn).await
247
}
248
249
/// Returns the delay, in milliseconds, used by the tests in this file:
/// `1000` when built with `cfg(miri)`, `10` otherwise.
fn delay_millis() -> u64 {
    // Miri-based builds are much slower to run, so we delay longer in that case
    // to ensure that async calls which the test expects to return `BLOCKED`
    // actually do so.
    //
    // TODO: Make this test (more) deterministic so that such tuning is not
    // necessary.
    if cfg!(miri) { 1000 } else { 10 }
}
258
259
/// Runs `test_cancel` with `mode` and requires it to fail with an error whose
/// `Debug` rendering mentions that `subtask.cancel` was called after the
/// terminal status had been delivered.
///
/// # Panics
///
/// Panics if `test_cancel` succeeds or if the error does not contain the
/// expected message.
async fn test_cancel_trap(mode: Mode) -> Result<()> {
    let message = "`subtask.cancel` called after terminal status delivered";
    let trap = test_cancel(mode).await.unwrap_err();
    assert!(
        format!("{trap:?}").contains(message),
        "expected `{message}`; got `{trap:?}`",
    );
    Ok(())
}
268
269
async fn test_cancel(mode: Mode) -> Result<()> {
270
let engine = Engine::new(&config())?;
271
272
let component = make_component(
273
&engine,
274
&[
275
test_programs_artifacts::ASYNC_CANCEL_CALLER_COMPONENT,
276
test_programs_artifacts::ASYNC_CANCEL_CALLEE_COMPONENT,
277
],
278
)
279
.await?;
280
281
let mut linker = Linker::new(&engine);
282
283
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
284
sleep::local::local::sleep::add_to_linker::<_, Ctx>(&mut linker, |ctx| ctx)?;
285
286
let mut store = Store::new(
287
&engine,
288
Ctx {
289
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
290
table: ResourceTable::default(),
291
continue_: false,
292
wakers: Arc::new(Mutex::new(None)),
293
},
294
);
295
296
let instance = linker.instantiate_async(&mut store, &component).await?;
297
let cancel_host = cancel::CancelHost::new(&mut store, &instance)?;
298
instance
299
.run_concurrent(&mut store, async move |accessor| {
300
cancel_host
301
.local_local_cancel()
302
.call_run(accessor, mode, delay_millis())
303
.await
304
})
305
.await??;
306
307
Ok(())
308
}
309
310
#[tokio::test]
311
pub async fn async_intertask_communication() -> Result<()> {
312
test_run_with_count(
313
&[test_programs_artifacts::ASYNC_INTERTASK_COMMUNICATION_COMPONENT],
314
2,
315
)
316
.await
317
}
318
319
#[tokio::test]
320
pub async fn async_transmit_caller() -> Result<()> {
321
test_run(&[
322
test_programs_artifacts::ASYNC_TRANSMIT_CALLER_COMPONENT,
323
test_programs_artifacts::ASYNC_TRANSMIT_CALLEE_COMPONENT,
324
])
325
.await
326
}
327
328
#[tokio::test]
329
pub async fn async_transmit_callee() -> Result<()> {
330
test_transmit(test_programs_artifacts::ASYNC_TRANSMIT_CALLEE_COMPONENT).await
331
}
332
333
pub trait TransmitTest {
334
type Instance: Send + Sync;
335
type Params;
336
type Result: Send + Sync + 'static;
337
338
fn instantiate(
339
store: impl AsContextMut<Data = Ctx>,
340
component: &Component,
341
linker: &Linker<Ctx>,
342
) -> impl Future<Output = Result<(Self::Instance, Instance)>>;
343
344
fn call<'a>(
345
accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,
346
instance: &'a Self::Instance,
347
params: Self::Params,
348
) -> impl Future<Output = Result<Self::Result>> + Send + 'a;
349
350
fn into_params(
351
control: StreamReader<Control>,
352
caller_stream: StreamReader<String>,
353
caller_future1: FutureReader<String>,
354
caller_future2: FutureReader<String>,
355
) -> Self::Params;
356
357
fn from_result(
358
store: impl AsContextMut<Data = Ctx>,
359
instance: Instance,
360
result: Self::Result,
361
) -> Result<(
362
StreamReader<String>,
363
FutureReader<String>,
364
FutureReader<String>,
365
)>;
366
}
367
368
struct StaticTransmitTest;
369
370
impl TransmitTest for StaticTransmitTest {
371
type Instance = transmit::bindings::TransmitCallee;
372
type Params = (
373
StreamReader<Control>,
374
StreamReader<String>,
375
FutureReader<String>,
376
FutureReader<String>,
377
);
378
type Result = (
379
StreamReader<String>,
380
FutureReader<String>,
381
FutureReader<String>,
382
);
383
384
async fn instantiate(
385
mut store: impl AsContextMut<Data = Ctx>,
386
component: &Component,
387
linker: &Linker<Ctx>,
388
) -> Result<(Self::Instance, Instance)> {
389
let instance = linker.instantiate_async(&mut store, component).await?;
390
let callee = transmit::bindings::TransmitCallee::new(store, &instance)?;
391
Ok((callee, instance))
392
}
393
394
fn call<'a>(
395
accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,
396
instance: &'a Self::Instance,
397
params: Self::Params,
398
) -> impl Future<Output = Result<Self::Result>> + Send + 'a {
399
instance
400
.local_local_transmit()
401
.call_exchange(accessor, params.0, params.1, params.2, params.3)
402
}
403
404
fn into_params(
405
control: StreamReader<Control>,
406
caller_stream: StreamReader<String>,
407
caller_future1: FutureReader<String>,
408
caller_future2: FutureReader<String>,
409
) -> Self::Params {
410
(control, caller_stream, caller_future1, caller_future2)
411
}
412
413
fn from_result(
414
_: impl AsContextMut<Data = Ctx>,
415
_: Instance,
416
result: Self::Result,
417
) -> Result<(
418
StreamReader<String>,
419
FutureReader<String>,
420
FutureReader<String>,
421
)> {
422
Ok(result)
423
}
424
}
425
426
struct DynamicTransmitTest;
427
428
impl TransmitTest for DynamicTransmitTest {
429
type Instance = Instance;
430
type Params = Vec<Val>;
431
type Result = Val;
432
433
async fn instantiate(
434
store: impl AsContextMut<Data = Ctx>,
435
component: &Component,
436
linker: &Linker<Ctx>,
437
) -> Result<(Self::Instance, Instance)> {
438
let instance = linker.instantiate_async(store, component).await?;
439
Ok((instance, instance))
440
}
441
442
async fn call<'a>(
443
accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,
444
instance: &'a Self::Instance,
445
params: Self::Params,
446
) -> Result<Self::Result> {
447
let exchange_function = accessor.with(|mut store| {
448
let transmit_instance = instance
449
.get_export_index(store.as_context_mut(), None, "local:local/transmit")
450
.ok_or_else(|| anyhow!("can't find `local:local/transmit` in instance"))?;
451
let exchange_function = instance
452
.get_export_index(
453
store.as_context_mut(),
454
Some(&transmit_instance),
455
"[async]exchange",
456
)
457
.ok_or_else(|| anyhow!("can't find `exchange` in instance"))?;
458
instance
459
.get_func(store.as_context_mut(), exchange_function)
460
.ok_or_else(|| anyhow!("can't find `exchange` in instance"))
461
})?;
462
463
let mut results = vec![Val::Bool(false)];
464
exchange_function
465
.call_concurrent(accessor, &params, &mut results)
466
.await?;
467
Ok(results.pop().unwrap())
468
}
469
470
fn into_params(
471
control: StreamReader<Control>,
472
caller_stream: StreamReader<String>,
473
caller_future1: FutureReader<String>,
474
caller_future2: FutureReader<String>,
475
) -> Self::Params {
476
vec![
477
control.into_val(),
478
caller_stream.into_val(),
479
caller_future1.into_val(),
480
caller_future2.into_val(),
481
]
482
}
483
484
fn from_result(
485
mut store: impl AsContextMut<Data = Ctx>,
486
instance: Instance,
487
result: Self::Result,
488
) -> Result<(
489
StreamReader<String>,
490
FutureReader<String>,
491
FutureReader<String>,
492
)> {
493
let Val::Tuple(fields) = result else {
494
unreachable!()
495
};
496
let stream = StreamReader::from_val(store.as_context_mut(), instance, &fields[0])?;
497
let future1 = FutureReader::from_val(store.as_context_mut(), instance, &fields[1])?;
498
let future2 = FutureReader::from_val(store.as_context_mut(), instance, &fields[2])?;
499
Ok((stream, future1, future2))
500
}
501
}
502
503
async fn test_transmit(component: &str) -> Result<()> {
504
test_transmit_with::<StaticTransmitTest>(component).await?;
505
test_transmit_with::<DynamicTransmitTest>(component).await
506
}
507
508
async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Result<()> {
509
let engine = Engine::new(&config())?;
510
511
let make_store = || {
512
Store::new(
513
&engine,
514
Ctx {
515
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
516
table: ResourceTable::default(),
517
continue_: false,
518
wakers: Arc::new(Mutex::new(None)),
519
},
520
)
521
};
522
523
let component = make_component(&engine, &[component]).await?;
524
525
let mut linker = Linker::new(&engine);
526
527
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
528
529
let mut store = make_store();
530
531
let (test, instance) = Test::instantiate(&mut store, &component, &linker).await?;
532
533
enum Event<Test: TransmitTest> {
534
Result(Test::Result),
535
ControlWriteA(mpsc::Sender<Control>),
536
ControlWriteB(mpsc::Sender<Control>),
537
ControlWriteC(mpsc::Sender<Control>),
538
ControlWriteD,
539
WriteA,
540
ReadC(mpsc::Receiver<String>, Option<String>),
541
ReadD(mpsc::Receiver<String>, Option<String>),
542
ReadNone(Option<String>),
543
}
544
545
let (mut control_tx, control_rx) = mpsc::channel(1);
546
let control_rx = StreamReader::new(instance, &mut store, PipeProducer::new(control_rx));
547
let (mut caller_stream_tx, caller_stream_rx) = mpsc::channel(1);
548
let caller_stream_rx =
549
StreamReader::new(instance, &mut store, PipeProducer::new(caller_stream_rx));
550
let (caller_future1_tx, caller_future1_rx) = oneshot::channel();
551
let caller_future1_rx = FutureReader::new(
552
instance,
553
&mut store,
554
OneshotProducer::new(caller_future1_rx),
555
);
556
let (_, caller_future2_rx) = oneshot::channel();
557
let caller_future2_rx = FutureReader::new(
558
instance,
559
&mut store,
560
OneshotProducer::new(caller_future2_rx),
561
);
562
let (callee_future1_tx, callee_future1_rx) = oneshot::channel();
563
let (callee_stream_tx, callee_stream_rx) = mpsc::channel(1);
564
instance
565
.run_concurrent(&mut store, async |accessor| {
566
let mut caller_future1_tx = Some(caller_future1_tx);
567
let mut callee_future1_tx = Some(callee_future1_tx);
568
let mut callee_future1_rx = Some(callee_future1_rx);
569
let mut callee_stream_tx = Some(callee_stream_tx);
570
let mut callee_stream_rx = Some(callee_stream_rx);
571
let mut complete = false;
572
let mut futures = FuturesUnordered::<
573
Pin<Box<dyn Future<Output = Result<Event<Test>>> + Send>>,
574
>::new();
575
576
futures.push(
577
async move {
578
control_tx.send(Control::ReadStream("a".into())).await?;
579
Ok(Event::ControlWriteA(control_tx))
580
}
581
.boxed(),
582
);
583
584
futures.push(
585
async move {
586
caller_stream_tx.send(String::from("a")).await?;
587
Ok(Event::WriteA)
588
}
589
.boxed(),
590
);
591
592
futures.push(
593
Test::call(
594
accessor,
595
&test,
596
Test::into_params(
597
control_rx,
598
caller_stream_rx,
599
caller_future1_rx,
600
caller_future2_rx,
601
),
602
)
603
.map(|v| v.map(Event::Result))
604
.boxed(),
605
);
606
607
while let Some(event) = futures.try_next().await? {
608
match event {
609
Event::Result(result) => {
610
accessor.with(|mut store| {
611
let (callee_stream_rx, callee_future1_rx, _) =
612
Test::from_result(&mut store, instance, result)?;
613
callee_stream_rx.pipe(
614
&mut store,
615
PipeConsumer::new(callee_stream_tx.take().unwrap()),
616
);
617
callee_future1_rx.pipe(
618
&mut store,
619
OneshotConsumer::new(callee_future1_tx.take().unwrap()),
620
);
621
anyhow::Ok(())
622
})?;
623
}
624
Event::ControlWriteA(mut control_tx) => {
625
futures.push(
626
async move {
627
control_tx.send(Control::ReadFuture("b".into())).await?;
628
Ok(Event::ControlWriteB(control_tx))
629
}
630
.boxed(),
631
);
632
}
633
Event::WriteA => {
634
_ = caller_future1_tx.take().unwrap().send("b".into());
635
let mut callee_stream_rx = callee_stream_rx.take().unwrap();
636
futures.push(
637
async move {
638
let value = callee_stream_rx.next().await;
639
Ok(Event::ReadC(callee_stream_rx, value))
640
}
641
.boxed(),
642
);
643
}
644
Event::ControlWriteB(mut control_tx) => {
645
futures.push(
646
async move {
647
control_tx.send(Control::WriteStream("c".into())).await?;
648
Ok(Event::ControlWriteC(control_tx))
649
}
650
.boxed(),
651
);
652
}
653
Event::ControlWriteC(mut control_tx) => {
654
futures.push(
655
async move {
656
control_tx.send(Control::WriteFuture("d".into())).await?;
657
Ok(Event::ControlWriteD)
658
}
659
.boxed(),
660
);
661
}
662
Event::ReadC(callee_stream_rx, mut value) => {
663
assert_eq!(value.take().as_deref(), Some("c"));
664
futures.push(
665
callee_future1_rx
666
.take()
667
.unwrap()
668
.map(|v| Event::ReadD(callee_stream_rx, v.ok()))
669
.map(Ok)
670
.boxed(),
671
);
672
}
673
Event::ControlWriteD => {}
674
Event::ReadD(_, None) => unreachable!(),
675
Event::ReadD(mut callee_stream_rx, Some(value)) => {
676
assert_eq!(&value, "d");
677
futures.push(
678
async move { Ok(Event::ReadNone(callee_stream_rx.next().await)) }
679
.boxed(),
680
);
681
}
682
Event::ReadNone(Some(_)) => unreachable!(),
683
Event::ReadNone(None) => {
684
complete = true;
685
}
686
}
687
}
688
689
assert!(complete);
690
691
anyhow::Ok(())
692
})
693
.await??;
694
Ok(())
695
}
696
697