// Source: GitHub repository bytecodealliance/wasmtime
// Path: blob/main/tests/all/async_functions.rs
1
use anyhow::{anyhow, bail};
2
use std::future::Future;
3
use std::pin::Pin;
4
use std::sync::{Arc, Mutex};
5
use std::task::{Context, Poll, Waker};
6
use wasmtime::*;
7
8
fn async_store() -> Store<()> {
9
Store::new(&Engine::new(Config::new().async_support(true)).unwrap(), ())
10
}
11
12
async fn run_smoke_test(store: &mut Store<()>, func: Func) {
13
func.call_async(&mut *store, &[], &mut []).await.unwrap();
14
func.call_async(&mut *store, &[], &mut []).await.unwrap();
15
}
16
17
async fn run_smoke_typed_test(store: &mut Store<()>, func: Func) {
18
let func = func.typed::<(), ()>(&store).unwrap();
19
func.call_async(&mut *store, ()).await.unwrap();
20
func.call_async(&mut *store, ()).await.unwrap();
21
}
22
23
#[tokio::test]
24
async fn smoke() {
25
let mut store = async_store();
26
let func_ty = FuncType::new(store.engine(), None, None);
27
let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
28
Box::new(async { Ok(()) })
29
});
30
run_smoke_test(&mut store, func).await;
31
run_smoke_typed_test(&mut store, func).await;
32
33
let func = Func::wrap_async(&mut store, move |_caller, _: ()| Box::new(async { Ok(()) }));
34
run_smoke_test(&mut store, func).await;
35
run_smoke_typed_test(&mut store, func).await;
36
}
37
38
#[tokio::test]
39
async fn smoke_host_func() -> Result<()> {
40
let mut store = async_store();
41
let mut linker = Linker::new(store.engine());
42
43
linker.func_new_async(
44
"",
45
"first",
46
FuncType::new(store.engine(), None, None),
47
move |_caller, _params, _results| Box::new(async { Ok(()) }),
48
)?;
49
50
linker.func_wrap_async("", "second", move |_caller, _: ()| {
51
Box::new(async { Ok(()) })
52
})?;
53
54
let func = linker
55
.get(&mut store, "", "first")
56
.unwrap()
57
.into_func()
58
.unwrap();
59
run_smoke_test(&mut store, func).await;
60
run_smoke_typed_test(&mut store, func).await;
61
62
let func = linker
63
.get(&mut store, "", "second")
64
.unwrap()
65
.into_func()
66
.unwrap();
67
run_smoke_test(&mut store, func).await;
68
run_smoke_typed_test(&mut store, func).await;
69
70
Ok(())
71
}
72
73
#[tokio::test]
74
async fn smoke_with_suspension() {
75
let mut store = async_store();
76
let func_ty = FuncType::new(store.engine(), None, None);
77
let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
78
Box::new(async {
79
tokio::task::yield_now().await;
80
Ok(())
81
})
82
});
83
run_smoke_test(&mut store, func).await;
84
run_smoke_typed_test(&mut store, func).await;
85
86
let func = Func::wrap_async(&mut store, move |_caller, _: ()| {
87
Box::new(async {
88
tokio::task::yield_now().await;
89
Ok(())
90
})
91
});
92
run_smoke_test(&mut store, func).await;
93
run_smoke_typed_test(&mut store, func).await;
94
}
95
96
#[tokio::test]
97
async fn smoke_host_func_with_suspension() -> Result<()> {
98
let mut store = async_store();
99
let mut linker = Linker::new(store.engine());
100
101
linker.func_new_async(
102
"",
103
"first",
104
FuncType::new(store.engine(), None, None),
105
move |_caller, _params, _results| {
106
Box::new(async {
107
tokio::task::yield_now().await;
108
Ok(())
109
})
110
},
111
)?;
112
113
linker.func_wrap_async("", "second", move |_caller, _: ()| {
114
Box::new(async {
115
tokio::task::yield_now().await;
116
Ok(())
117
})
118
})?;
119
120
let func = linker
121
.get(&mut store, "", "first")
122
.unwrap()
123
.into_func()
124
.unwrap();
125
run_smoke_test(&mut store, func).await;
126
run_smoke_typed_test(&mut store, func).await;
127
128
let func = linker
129
.get(&mut store, "", "second")
130
.unwrap()
131
.into_func()
132
.unwrap();
133
run_smoke_test(&mut store, func).await;
134
run_smoke_typed_test(&mut store, func).await;
135
136
Ok(())
137
}
138
139
#[tokio::test]
140
#[cfg_attr(miri, ignore)]
141
async fn recursive_call() {
142
let mut store = async_store();
143
let func_ty = FuncType::new(store.engine(), None, None);
144
let async_wasm_func = Func::new_async(&mut store, func_ty, |_caller, _params, _results| {
145
Box::new(async {
146
tokio::task::yield_now().await;
147
Ok(())
148
})
149
});
150
151
// Create an imported function which recursively invokes another wasm
152
// function asynchronously, although this one is just our own host function
153
// which suffices for this test.
154
let func_ty = FuncType::new(store.engine(), None, None);
155
let func2 = Func::new_async(&mut store, func_ty, move |mut caller, _params, _results| {
156
Box::new(async move {
157
async_wasm_func
158
.call_async(&mut caller, &[], &mut [])
159
.await?;
160
Ok(())
161
})
162
});
163
164
// Create an instance which calls an async import twice.
165
let module = Module::new(
166
store.engine(),
167
"
168
(module
169
(import \"\" \"\" (func))
170
(func (export \"\")
171
;; call imported function which recursively does an async
172
;; call
173
call 0
174
;; do it again, and our various pointers all better align
175
call 0))
176
",
177
)
178
.unwrap();
179
180
let instance = Instance::new_async(&mut store, &module, &[func2.into()])
181
.await
182
.unwrap();
183
let func = instance.get_func(&mut store, "").unwrap();
184
func.call_async(&mut store, &[], &mut []).await.unwrap();
185
}
186
187
#[tokio::test]
188
#[cfg_attr(miri, ignore)]
189
async fn suspend_while_suspending() {
190
let mut store = async_store();
191
192
// Create a synchronous function which calls our asynchronous function and
193
// runs it locally. This shouldn't generally happen but we know everything
194
// is synchronous in this test so it's fine for us to do this.
195
//
196
// The purpose of this test is intended to stress various cases in how
197
// we manage pointers in ways that are not necessarily common but are still
198
// possible in safe code.
199
let func_ty = FuncType::new(store.engine(), None, None);
200
let async_thunk = Func::new_async(&mut store, func_ty, |_caller, _params, _results| {
201
Box::new(async { Ok(()) })
202
});
203
let func_ty = FuncType::new(store.engine(), None, None);
204
let sync_call_async_thunk =
205
Func::new(&mut store, func_ty, move |mut caller, _params, _results| {
206
let mut future = Box::pin(async_thunk.call_async(&mut caller, &[], &mut []));
207
let poll = future
208
.as_mut()
209
.poll(&mut Context::from_waker(Waker::noop()));
210
assert!(poll.is_ready());
211
Ok(())
212
});
213
214
// A small async function that simply awaits once to pump the loops and
215
// then finishes.
216
let func_ty = FuncType::new(store.engine(), None, None);
217
let async_import = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
218
Box::new(async move {
219
tokio::task::yield_now().await;
220
Ok(())
221
})
222
});
223
224
let module = Module::new(
225
store.engine(),
226
"
227
(module
228
(import \"\" \"\" (func $sync_call_async_thunk))
229
(import \"\" \"\" (func $async_import))
230
(func (export \"\")
231
;; Set some store-local state and pointers
232
call $sync_call_async_thunk
233
;; .. and hopefully it's all still configured correctly
234
call $async_import))
235
",
236
)
237
.unwrap();
238
let instance = Instance::new_async(
239
&mut store,
240
&module,
241
&[sync_call_async_thunk.into(), async_import.into()],
242
)
243
.await
244
.unwrap();
245
let func = instance.get_func(&mut store, "").unwrap();
246
func.call_async(&mut store, &[], &mut []).await.unwrap();
247
}
248
249
#[tokio::test]
250
async fn cancel_during_run() {
251
let mut store = Store::new(&Engine::new(Config::new().async_support(true)).unwrap(), 0);
252
253
let func_ty = FuncType::new(store.engine(), None, None);
254
let async_thunk = Func::new_async(&mut store, func_ty, move |mut caller, _params, _results| {
255
assert_eq!(*caller.data(), 0);
256
*caller.data_mut() = 1;
257
let dtor = SetOnDrop(caller);
258
Box::new(async move {
259
// SetOnDrop is not destroyed when dropping the reference of it
260
// here. Instead, it is moved into the future where it's forced
261
// to live in and will be destroyed at the end of the future.
262
let _ = &dtor;
263
tokio::task::yield_now().await;
264
Ok(())
265
})
266
});
267
// Shouldn't have called anything yet...
268
assert_eq!(*store.data(), 0);
269
270
// Create our future, but as per async conventions this still doesn't
271
// actually do anything. No wasm or host function has been called yet.
272
let future = Box::pin(async_thunk.call_async(&mut store, &[], &mut []));
273
274
// Push the future forward one tick, which actually runs the host code in
275
// our async func. Our future is designed to be pending once, however.
276
let future = PollOnce::new(future).await;
277
278
// Now that our future is running (on a separate, now-suspended fiber), drop
279
// the future and that should deallocate all the Rust bits as well.
280
drop(future);
281
assert_eq!(*store.data(), 2);
282
283
struct SetOnDrop<'a>(Caller<'a, usize>);
284
285
impl Drop for SetOnDrop<'_> {
286
fn drop(&mut self) {
287
assert_eq!(*self.0.data(), 1);
288
*self.0.data_mut() = 2;
289
}
290
}
291
}
292
293
#[tokio::test]
294
#[cfg_attr(miri, ignore)]
295
async fn iloop_with_fuel() {
296
let engine = Engine::new(Config::new().async_support(true).consume_fuel(true)).unwrap();
297
let mut store = Store::new(&engine, ());
298
store.set_fuel(10_000).unwrap();
299
store.fuel_async_yield_interval(Some(100)).unwrap();
300
let module = Module::new(
301
&engine,
302
"
303
(module
304
(func (loop br 0))
305
(start 0)
306
)
307
",
308
)
309
.unwrap();
310
let instance = Instance::new_async(&mut store, &module, &[]);
311
312
// This should yield a bunch of times but eventually finish
313
let (_, pending) = CountPending::new(Box::pin(instance)).await;
314
assert_eq!(pending, 99);
315
}
316
317
#[tokio::test]
318
#[cfg_attr(miri, ignore)]
319
async fn fuel_eventually_finishes() {
320
let engine = Engine::new(Config::new().async_support(true).consume_fuel(true)).unwrap();
321
let mut store = Store::new(&engine, ());
322
store.set_fuel(u64::MAX).unwrap();
323
store.fuel_async_yield_interval(Some(10)).unwrap();
324
let module = Module::new(
325
&engine,
326
"
327
(module
328
(func
329
(local i32)
330
i32.const 100
331
local.set 0
332
(loop
333
local.get 0
334
i32.const -1
335
i32.add
336
local.tee 0
337
br_if 0)
338
)
339
(start 0)
340
)
341
",
342
)
343
.unwrap();
344
let instance = Instance::new_async(&mut store, &module, &[]);
345
instance.await.unwrap();
346
}
347
348
#[tokio::test]
349
async fn async_with_pooling_stacks() {
350
let mut pool = crate::small_pool_config();
351
pool.total_stacks(1)
352
.max_memory_size(1 << 16)
353
.table_elements(0);
354
let mut config = Config::new();
355
config.async_support(true);
356
config.allocation_strategy(InstanceAllocationStrategy::Pooling(pool));
357
config.memory_guard_size(0);
358
config.memory_reservation(1 << 16);
359
360
let engine = Engine::new(&config).unwrap();
361
let mut store = Store::new(&engine, ());
362
let func_ty = FuncType::new(store.engine(), None, None);
363
let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
364
Box::new(async { Ok(()) })
365
});
366
367
run_smoke_test(&mut store, func).await;
368
run_smoke_typed_test(&mut store, func).await;
369
}
370
371
#[tokio::test]
372
async fn async_host_func_with_pooling_stacks() -> Result<()> {
373
let mut pooling = crate::small_pool_config();
374
pooling
375
.total_stacks(1)
376
.max_memory_size(1 << 16)
377
.table_elements(0);
378
let mut config = Config::new();
379
config.async_support(true);
380
config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
381
config.memory_guard_size(0);
382
config.memory_reservation(1 << 16);
383
384
let mut store = Store::new(&Engine::new(&config)?, ());
385
let mut linker = Linker::new(store.engine());
386
linker.func_new_async(
387
"",
388
"",
389
FuncType::new(store.engine(), None, None),
390
move |_caller, _params, _results| Box::new(async { Ok(()) }),
391
)?;
392
393
let func = linker.get(&mut store, "", "").unwrap().into_func().unwrap();
394
run_smoke_test(&mut store, func).await;
395
run_smoke_typed_test(&mut store, func).await;
396
Ok(())
397
}
398
399
#[tokio::test]
400
#[cfg_attr(miri, ignore)]
401
async fn async_mpk_protection() -> Result<()> {
402
let _ = env_logger::try_init();
403
404
// Construct a pool with MPK protection enabled; note that the MPK
405
// protection is configured in `small_pool_config`.
406
let mut pooling = crate::small_pool_config();
407
pooling
408
.total_memories(10)
409
.total_stacks(2)
410
.max_memory_size(1 << 16)
411
.table_elements(0);
412
let mut config = Config::new();
413
config.async_support(true);
414
config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
415
config.memory_reservation(1 << 26);
416
config.epoch_interruption(true);
417
let engine = Engine::new(&config)?;
418
419
// Craft a module that loops for several iterations and checks whether it
420
// has access to its memory range (0x0-0x10000).
421
const WAT: &str = "
422
(module
423
(func $start
424
(local $i i32)
425
(local.set $i (i32.const 3))
426
(loop $cont
427
(drop (i32.load (i32.const 0)))
428
(drop (i32.load (i32.const 0xfffc)))
429
(br_if $cont (local.tee $i (i32.sub (local.get $i) (i32.const 1))))))
430
(memory 1)
431
(start $start))
432
";
433
434
// Start two instances of the module in separate fibers, `a` and `b`.
435
async fn run_instance(engine: &Engine, name: &str) -> Instance {
436
let mut store = Store::new(&engine, ());
437
store.set_epoch_deadline(0);
438
store.epoch_deadline_async_yield_and_update(0);
439
let module = Module::new(store.engine(), WAT).unwrap();
440
println!("[{name}] building instance");
441
Instance::new_async(&mut store, &module, &[]).await.unwrap()
442
}
443
let mut a = Box::pin(run_instance(&engine, "a"));
444
let mut b = Box::pin(run_instance(&engine, "b"));
445
446
// Alternately poll each instance until completion. This should exercise
447
// fiber suspensions requiring the `Store` to appropriately save and restore
448
// the PKRU context between suspensions (see `AsyncCx::block_on`).
449
for i in 0..10 {
450
if i % 2 == 0 {
451
match PollOnce::new(a).await {
452
Ok(_) => {
453
println!("[a] done");
454
break;
455
}
456
Err(a_) => {
457
println!("[a] not done");
458
a = a_;
459
}
460
}
461
} else {
462
match PollOnce::new(b).await {
463
Ok(_) => {
464
println!("[b] done");
465
break;
466
}
467
Err(b_) => {
468
println!("[b] not done");
469
b = b_;
470
}
471
}
472
}
473
}
474
475
Ok(())
476
}
477
478
/// This will execute the `future` provided to completion and each invocation of
479
/// `poll` for the future will be executed on a separate thread.
480
pub async fn execute_across_threads<F>(future: F) -> F::Output
481
where
482
F: Future + Send + 'static,
483
F::Output: Send,
484
{
485
let mut future = Box::pin(future);
486
loop {
487
let once = PollOnce::new(future);
488
let handle = tokio::runtime::Handle::current();
489
let result = std::thread::spawn(move || handle.block_on(once))
490
.join()
491
.unwrap();
492
match result {
493
Ok(val) => break val,
494
Err(f) => future = f,
495
}
496
}
497
}
498
499
#[tokio::test]
500
#[cfg_attr(miri, ignore)]
501
async fn resume_separate_thread() {
502
// This test will poll the following future on two threads. Simulating a
503
// trap requires accessing TLS info, so that should be preserved correctly.
504
execute_across_threads(async {
505
let mut store = async_store();
506
let module = Module::new(
507
store.engine(),
508
"
509
(module
510
(import \"\" \"\" (func))
511
(start 0)
512
)
513
",
514
)
515
.unwrap();
516
let func = Func::wrap_async(&mut store, |_, _: ()| {
517
Box::new(async {
518
tokio::task::yield_now().await;
519
Err::<(), _>(anyhow!("test"))
520
})
521
});
522
let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
523
assert!(result.is_err());
524
})
525
.await;
526
}
527
528
#[tokio::test]
529
#[cfg_attr(miri, ignore)]
530
async fn resume_separate_thread2() {
531
// This test will poll the following future on two threads. Catching a
532
// signal requires looking up TLS information to determine whether it's a
533
// trap to handle or not, so that must be preserved correctly across threads.
534
execute_across_threads(async {
535
let mut store = async_store();
536
let module = Module::new(
537
store.engine(),
538
"
539
(module
540
(import \"\" \"\" (func))
541
(func $start
542
call 0
543
unreachable)
544
(start $start)
545
)
546
",
547
)
548
.unwrap();
549
let func = Func::wrap_async(&mut store, |_, _: ()| {
550
Box::new(async {
551
tokio::task::yield_now().await;
552
})
553
});
554
let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
555
assert!(result.is_err());
556
})
557
.await;
558
}
559
560
#[tokio::test]
561
#[cfg_attr(miri, ignore)]
562
async fn resume_separate_thread3() {
563
let _ = env_logger::try_init();
564
565
// This test doesn't actually do anything with cross-thread polls, but
566
// instead it deals with scheduling futures at "odd" times.
567
//
568
// First we'll set up a *synchronous* call which will initialize TLS info.
569
// This call is simply to a host-defined function, but it still has the same
570
// "enter into wasm" semantics since it's just calling a trampoline. In this
571
// situation we'll set up the TLS info so it's in place while the body of
572
// the function executes...
573
let mut store = Store::new(&Engine::default(), None);
574
let f = Func::wrap(&mut store, move |mut caller: Caller<'_, _>| -> Result<()> {
575
// ... and the execution of this host-defined function (while the TLS
576
// info is initialized), will set up a recursive call into wasm. This
577
// recursive call will be done asynchronously so we can suspend it
578
// halfway through.
579
let f = async {
580
let mut store = async_store();
581
let module = Module::new(
582
store.engine(),
583
"
584
(module
585
(import \"\" \"\" (func))
586
(start 0)
587
)
588
",
589
)
590
.unwrap();
591
let func = Func::wrap_async(&mut store, |_, _: ()| {
592
Box::new(async {
593
tokio::task::yield_now().await;
594
})
595
});
596
drop(Instance::new_async(&mut store, &module, &[func.into()]).await);
597
unreachable!()
598
};
599
let mut future = Box::pin(f);
600
let poll = future
601
.as_mut()
602
.poll(&mut Context::from_waker(Waker::noop()));
603
assert!(poll.is_pending());
604
605
// ... so at this point our call into wasm is suspended. The call into
606
// wasm will have overwritten TLS info, and we sure hope that the
607
// information is restored at this point. Note that we squirrel away the
608
// future somewhere else to get dropped later. If we were to drop it
609
// here then we would reenter the future's suspended stack to clean it
610
// up, which would do more alterations of TLS information we're not
611
// testing here.
612
*caller.data_mut() = Some(future);
613
614
// ... all in all this function will need access to the original TLS
615
// information to raise the trap. This TLS information should be
616
// restored even though the asynchronous execution is suspended.
617
bail!("")
618
});
619
assert!(f.call(&mut store, &[], &mut []).is_err());
620
}
621
622
#[tokio::test]
623
#[cfg_attr(miri, ignore)]
624
async fn recursive_async() -> Result<()> {
625
let _ = env_logger::try_init();
626
let mut store = async_store();
627
let m = Module::new(
628
store.engine(),
629
"(module
630
(func (export \"overflow\") call 0)
631
(func (export \"normal\"))
632
)",
633
)?;
634
let i = Instance::new_async(&mut store, &m, &[]).await?;
635
let overflow = i.get_typed_func::<(), ()>(&mut store, "overflow")?;
636
let normal = i.get_typed_func::<(), ()>(&mut store, "normal")?;
637
let f2 = Func::wrap_async(&mut store, move |mut caller, _: ()| {
638
let normal = normal.clone();
639
let overflow = overflow.clone();
640
Box::new(async move {
641
// recursive async calls shouldn't immediately stack overflow...
642
normal.call_async(&mut caller, ()).await?;
643
644
// ... but calls that actually stack overflow should indeed stack
645
// overflow
646
let err = overflow
647
.call_async(&mut caller, ())
648
.await
649
.unwrap_err()
650
.downcast::<Trap>()?;
651
assert_eq!(err, Trap::StackOverflow);
652
Ok(())
653
})
654
});
655
f2.call_async(&mut store, &[], &mut []).await?;
656
Ok(())
657
}
658
659
#[tokio::test]
660
#[cfg_attr(miri, ignore)]
661
async fn linker_module_command() -> Result<()> {
662
let mut store = async_store();
663
let mut linker = Linker::new(store.engine());
664
665
let module1 = Module::new(
666
store.engine(),
667
r#"
668
(module
669
(global $g (mut i32) (i32.const 0))
670
671
(func (export "_start"))
672
673
(func (export "g") (result i32)
674
global.get $g
675
i32.const 1
676
global.set $g)
677
)
678
"#,
679
)?;
680
681
let module2 = Module::new(
682
store.engine(),
683
r#"
684
(module
685
(import "" "g" (func (result i32)))
686
687
(func (export "get") (result i32)
688
call 0)
689
)
690
"#,
691
)?;
692
693
linker.module_async(&mut store, "", &module1).await?;
694
let instance = linker.instantiate_async(&mut store, &module2).await?;
695
let f = instance.get_typed_func::<(), i32>(&mut store, "get")?;
696
assert_eq!(f.call_async(&mut store, ()).await?, 0);
697
assert_eq!(f.call_async(&mut store, ()).await?, 0);
698
699
Ok(())
700
}
701
702
#[tokio::test]
703
#[cfg_attr(miri, ignore)]
704
async fn linker_module_reactor() -> Result<()> {
705
let mut store = async_store();
706
let mut linker = Linker::new(store.engine());
707
let module1 = Module::new(
708
store.engine(),
709
r#"
710
(module
711
(global $g (mut i32) (i32.const 0))
712
713
(func (export "g") (result i32)
714
global.get $g
715
i32.const 1
716
global.set $g)
717
)
718
"#,
719
)?;
720
let module2 = Module::new(
721
store.engine(),
722
r#"
723
(module
724
(import "" "g" (func (result i32)))
725
726
(func (export "get") (result i32)
727
call 0)
728
)
729
"#,
730
)?;
731
732
linker.module_async(&mut store, "", &module1).await?;
733
let instance = linker.instantiate_async(&mut store, &module2).await?;
734
let f = instance.get_typed_func::<(), i32>(&mut store, "get")?;
735
assert_eq!(f.call_async(&mut store, ()).await?, 0);
736
assert_eq!(f.call_async(&mut store, ()).await?, 1);
737
738
Ok(())
739
}
740
741
/// Future adapter that counts how many times the wrapped future reported
/// `Poll::Pending` before it finally completed.
///
/// Awaiting a `CountPending` resolves to `(output, pending_count)` where
/// `output` is the wrapped future's output and `pending_count` is the number
/// of polls that returned `Poll::Pending`.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled or awaited"]
pub struct CountPending<F> {
    /// The wrapped future being driven.
    future: F,
    /// Number of `Poll::Pending` results observed so far.
    yields: usize,
}

impl<F> CountPending<F> {
    /// Wraps `future` with the pending counter starting at zero.
    pub fn new(future: F) -> CountPending<F> {
        CountPending { future, yields: 0 }
    }
}

impl<F> Future for CountPending<F>
where
    F: Future + Unpin,
{
    type Output = (F::Output, usize);

    /// Polls the wrapped future once, bumping the counter on `Pending` and
    /// attaching the final count to the output on `Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `F: Unpin` makes `Self: Unpin`, so plain mutable access is allowed.
        let this = self.get_mut();
        match Pin::new(&mut this.future).poll(cx) {
            Poll::Pending => {
                this.yields += 1;
                Poll::Pending
            }
            Poll::Ready(output) => Poll::Ready((output, this.yields)),
        }
    }
}
768
769
/// Future adapter that polls the wrapped future exactly once.
///
/// Awaiting a `PollOnce` always completes on its first poll:
///
/// * `Ok(output)` if the wrapped future was ready, or
/// * `Err(future)` handing the still-pending future back to the caller.
///
/// # Panics
///
/// Polling a `PollOnce` again after it has completed panics, because the
/// wrapped future has already been moved out.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled or awaited"]
pub struct PollOnce<F>(Option<F>);

impl<F> PollOnce<F> {
    /// Wraps `future` so that it will be polled a single time.
    pub fn new(future: F) -> PollOnce<F> {
        PollOnce(Some(future))
    }
}

impl<F> Future for PollOnce<F>
where
    F: Future + Unpin,
{
    type Output = Result<F::Output, F>;

    /// Takes the wrapped future, polls it once, and resolves immediately with
    /// either its output or the future itself.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut future = self
            .0
            .take()
            .expect("`PollOnce` polled after completion");
        match Pin::new(&mut future).poll(cx) {
            Poll::Pending => Poll::Ready(Err(future)),
            Poll::Ready(val) => Poll::Ready(Ok(val)),
        }
    }
}
791
792
#[tokio::test]
793
#[cfg_attr(miri, ignore)]
794
async fn non_stacky_async_activations() -> Result<()> {
795
let mut config = Config::new();
796
config.async_support(true);
797
let engine = Engine::new(&config)?;
798
let mut store1: Store<Option<Pin<Box<dyn Future<Output = Result<()>> + Send>>>> =
799
Store::new(&engine, None);
800
let mut linker1 = Linker::new(&engine);
801
802
let module1 = Module::new(
803
&engine,
804
r#"
805
(module $m1
806
(import "" "host_capture_stack" (func $host_capture_stack))
807
(import "" "start_async_instance" (func $start_async_instance))
808
(func $capture_stack (export "capture_stack")
809
call $host_capture_stack
810
)
811
(func $run_sync (export "run_sync")
812
call $start_async_instance
813
)
814
)
815
"#,
816
)?;
817
818
let module2 = Module::new(
819
&engine,
820
r#"
821
(module $m2
822
(import "" "yield" (func $yield))
823
824
(func $run_async (export "run_async")
825
call $yield
826
)
827
)
828
"#,
829
)?;
830
831
let stacks = Arc::new(Mutex::new(vec![]));
832
fn capture_stack(stacks: &Arc<Mutex<Vec<WasmBacktrace>>>, store: impl AsContext) {
833
let mut stacks = stacks.lock().unwrap();
834
stacks.push(wasmtime::WasmBacktrace::force_capture(store));
835
}
836
837
linker1.func_wrap_async("", "host_capture_stack", {
838
let stacks = stacks.clone();
839
move |caller, _: ()| {
840
capture_stack(&stacks, &caller);
841
Box::new(async { Ok(()) })
842
}
843
})?;
844
845
linker1.func_wrap_async("", "start_async_instance", {
846
let stacks = stacks.clone();
847
move |mut caller, _: ()| {
848
let stacks = stacks.clone();
849
capture_stack(&stacks, &caller);
850
851
let module2 = module2.clone();
852
let mut store2 = Store::new(caller.engine(), ());
853
let mut linker2 = Linker::<()>::new(caller.engine());
854
linker2
855
.func_wrap_async("", "yield", {
856
let stacks = stacks.clone();
857
move |caller, _: ()| {
858
let stacks = stacks.clone();
859
Box::new(async move {
860
capture_stack(&stacks, &caller);
861
tokio::task::yield_now().await;
862
capture_stack(&stacks, &caller);
863
Ok(())
864
})
865
}
866
})
867
.unwrap();
868
869
Box::new(async move {
870
let future = PollOnce::new(Box::pin({
871
let stacks = stacks.clone();
872
async move {
873
let instance2 = linker2.instantiate_async(&mut store2, &module2).await?;
874
875
instance2
876
.get_func(&mut store2, "run_async")
877
.unwrap()
878
.call_async(&mut store2, &[], &mut [])
879
.await?;
880
881
capture_stack(&stacks, &store2);
882
anyhow::Ok(())
883
}
884
}) as _)
885
.await
886
.err()
887
.unwrap();
888
capture_stack(&stacks, &caller);
889
*caller.data_mut() = Some(future);
890
Ok(())
891
})
892
}
893
})?;
894
895
let instance1 = linker1.instantiate_async(&mut store1, &module1).await?;
896
instance1
897
.get_typed_func::<(), ()>(&mut store1, "run_sync")?
898
.call_async(&mut store1, ())
899
.await?;
900
let future = store1.data_mut().take().unwrap();
901
future.await?;
902
903
instance1
904
.get_typed_func::<(), ()>(&mut store1, "capture_stack")?
905
.call_async(&mut store1, ())
906
.await?;
907
908
let stacks = stacks.lock().unwrap();
909
eprintln!("stacks = {stacks:#?}");
910
911
assert_eq!(stacks.len(), 6);
912
for (actual, expected) in stacks.iter().zip(vec![
913
vec!["run_sync"],
914
vec!["run_async"],
915
vec!["run_sync"],
916
vec!["run_async"],
917
vec![],
918
vec!["capture_stack"],
919
]) {
920
eprintln!("expected = {expected:?}");
921
eprintln!("actual = {actual:?}");
922
assert_eq!(actual.frames().len(), expected.len());
923
for (actual, expected) in actual.frames().iter().zip(expected) {
924
assert_eq!(actual.func_name(), Some(expected));
925
}
926
}
927
928
Ok(())
929
}
930
931
#[tokio::test]
932
#[cfg_attr(miri, ignore)]
933
async fn gc_preserves_externref_on_historical_async_stacks() -> Result<()> {
934
let _ = env_logger::try_init();
935
936
let mut config = Config::new();
937
config.async_support(true);
938
let engine = Engine::new(&config)?;
939
940
let module = Module::new(
941
&engine,
942
r#"
943
(module $m1
944
(import "" "gc" (func $gc))
945
(import "" "recurse" (func $recurse (param i32)))
946
(import "" "test" (func $test (param i32 externref)))
947
(func (export "run") (param i32 externref)
948
local.get 0
949
if
950
local.get 0
951
i32.const -1
952
i32.add
953
call $recurse
954
else
955
call $gc
956
end
957
958
local.get 0
959
local.get 1
960
call $test
961
)
962
)
963
"#,
964
)?;
965
966
type F = TypedFunc<(i32, Option<Rooted<ExternRef>>), ()>;
967
968
let mut store = Store::new(&engine, None);
969
let mut linker = Linker::<Option<F>>::new(&engine);
970
linker.func_wrap_async("", "gc", |mut cx: Caller<'_, _>, ()| {
971
Box::new(async move { cx.gc_async(None).await })
972
})?;
973
linker.func_wrap(
974
"",
975
"test",
976
|cx: Caller<'_, _>, val: i32, handle: Option<Rooted<ExternRef>>| -> Result<()> {
977
assert_eq!(
978
handle.unwrap().data(&cx)?.unwrap().downcast_ref(),
979
Some(&val)
980
);
981
Ok(())
982
},
983
)?;
984
linker.func_wrap_async(
985
"",
986
"recurse",
987
|mut cx: Caller<'_, Option<F>>, (val,): (i32,)| {
988
let func = cx.data().clone().unwrap();
989
Box::new(async move {
990
let r = Some(ExternRef::new_async(&mut cx, val).await?);
991
Ok(func.call_async(&mut cx, (val, r)).await)
992
})
993
},
994
)?;
995
let instance = linker.instantiate_async(&mut store, &module).await?;
996
let func: F = instance.get_typed_func(&mut store, "run")?;
997
*store.data_mut() = Some(func.clone());
998
999
let r = Some(ExternRef::new_async(&mut store, 5).await?);
1000
func.call_async(&mut store, (5, r)).await?;
1001
1002
Ok(())
1003
}
1004
1005
#[tokio::test]
1006
#[cfg_attr(miri, ignore)]
1007
async fn async_gc_with_func_new_and_func_wrap() -> Result<()> {
1008
let _ = env_logger::try_init();
1009
1010
let mut config = Config::new();
1011
config.async_support(true);
1012
config.wasm_gc(true);
1013
let engine = Engine::new(&config)?;
1014
1015
let module = Module::new(
1016
&engine,
1017
r#"
1018
(module $m1
1019
(import "" "a" (func $a (result externref structref arrayref)))
1020
(import "" "b" (func $b (result externref structref arrayref)))
1021
1022
(table 2 funcref)
1023
(elem (i32.const 0) func $a $b)
1024
1025
(func (export "a")
1026
(call $call (i32.const 0))
1027
)
1028
(func (export "b")
1029
(call $call (i32.const 1))
1030
)
1031
1032
(func $call (param i32)
1033
(local $cnt i32)
1034
(loop $l
1035
(call_indirect (result externref structref arrayref) (local.get 0))
1036
drop
1037
drop
1038
drop
1039
1040
(local.set $cnt (i32.add (local.get $cnt) (i32.const 1)))
1041
1042
(if (i32.lt_u (local.get $cnt) (i32.const 5000))
1043
(then (br $l)))
1044
)
1045
)
1046
)
1047
"#,
1048
)?;
1049
1050
let mut linker = Linker::<()>::new(&engine);
1051
linker.func_wrap_async("", "a", |mut cx: Caller<'_, _>, ()| {
1052
Box::new(async move {
1053
let externref = ExternRef::new_async(&mut cx, 100).await?;
1054
1055
let struct_ty = StructType::new(cx.engine(), [])?;
1056
let struct_pre = StructRefPre::new(&mut cx, struct_ty);
1057
let structref = StructRef::new_async(&mut cx, &struct_pre, &[]).await?;
1058
1059
let array_ty = ArrayType::new(
1060
cx.engine(),
1061
FieldType::new(Mutability::Var, ValType::I32.into()),
1062
);
1063
let array_pre = ArrayRefPre::new(&mut cx, array_ty);
1064
let arrayref = ArrayRef::new_fixed_async(&mut cx, &array_pre, &[]).await?;
1065
1066
Ok((Some(externref), Some(structref), Some(arrayref)))
1067
})
1068
})?;
1069
let ty = FuncType::new(
1070
&engine,
1071
[],
1072
[ValType::EXTERNREF, ValType::STRUCTREF, ValType::ARRAYREF],
1073
);
1074
linker.func_new_async("", "b", ty, |mut cx, _, results| {
1075
Box::new(async move {
1076
results[0] = ExternRef::new_async(&mut cx, 100).await?.into();
1077
1078
let struct_ty = StructType::new(cx.engine(), [])?;
1079
let struct_pre = StructRefPre::new(&mut cx, struct_ty);
1080
results[1] = StructRef::new_async(&mut cx, &struct_pre, &[])
1081
.await?
1082
.into();
1083
1084
let array_ty = ArrayType::new(
1085
cx.engine(),
1086
FieldType::new(Mutability::Var, ValType::I32.into()),
1087
);
1088
let array_pre = ArrayRefPre::new(&mut cx, array_ty);
1089
results[2] = ArrayRef::new_fixed_async(&mut cx, &array_pre, &[])
1090
.await?
1091
.into();
1092
1093
Ok(())
1094
})
1095
})?;
1096
1097
let mut store = Store::new(&engine, ());
1098
let instance = linker.instantiate_async(&mut store, &module).await?;
1099
let a = instance.get_typed_func::<(), ()>(&mut store, "a")?;
1100
a.call_async(&mut store, ()).await?;
1101
1102
let mut store = Store::new(&engine, ());
1103
let instance = linker.instantiate_async(&mut store, &module).await?;
1104
let b = instance.get_typed_func::<(), ()>(&mut store, "b")?;
1105
b.call_async(&mut store, ()).await?;
1106
1107
Ok(())
1108
}
1109
1110