// Source: GitHub repository bytecodealliance/wasmtime
// Path: blob/main/tests/all/async_functions.rs
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use wasmtime::*;
8
9
fn async_store() -> Store<()> {
10
Store::new(&Engine::default(), ())
11
}
12
13
async fn run_smoke_test(store: &mut Store<()>, func: Func) {
14
func.call_async(&mut *store, &[], &mut []).await.unwrap();
15
func.call_async(&mut *store, &[], &mut []).await.unwrap();
16
}
17
18
async fn run_smoke_typed_test(store: &mut Store<()>, func: Func) {
19
let func = func.typed::<(), ()>(&store).unwrap();
20
func.call_async(&mut *store, ()).await.unwrap();
21
func.call_async(&mut *store, ()).await.unwrap();
22
}
23
24
#[tokio::test]
25
async fn smoke() {
26
let mut store = async_store();
27
let func_ty = FuncType::new(store.engine(), None, None);
28
let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
29
Box::new(async { Ok(()) })
30
});
31
run_smoke_test(&mut store, func).await;
32
run_smoke_typed_test(&mut store, func).await;
33
34
let func = Func::wrap_async(&mut store, move |_caller, _: ()| Box::new(async { Ok(()) }));
35
run_smoke_test(&mut store, func).await;
36
run_smoke_typed_test(&mut store, func).await;
37
}
38
39
#[tokio::test]
40
async fn smoke_host_func() -> Result<()> {
41
let mut store = async_store();
42
let mut linker = Linker::new(store.engine());
43
44
linker.func_new_async(
45
"",
46
"first",
47
FuncType::new(store.engine(), None, None),
48
move |_caller, _params, _results| Box::new(async { Ok(()) }),
49
)?;
50
51
linker.func_wrap_async("", "second", move |_caller, _: ()| {
52
Box::new(async { Ok(()) })
53
})?;
54
55
let func = linker
56
.get(&mut store, "", "first")
57
.unwrap()
58
.into_func()
59
.unwrap();
60
run_smoke_test(&mut store, func).await;
61
run_smoke_typed_test(&mut store, func).await;
62
63
let func = linker
64
.get(&mut store, "", "second")
65
.unwrap()
66
.into_func()
67
.unwrap();
68
run_smoke_test(&mut store, func).await;
69
run_smoke_typed_test(&mut store, func).await;
70
71
Ok(())
72
}
73
74
#[tokio::test]
75
async fn smoke_with_suspension() {
76
let mut store = async_store();
77
let func_ty = FuncType::new(store.engine(), None, None);
78
let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
79
Box::new(async {
80
tokio::task::yield_now().await;
81
Ok(())
82
})
83
});
84
run_smoke_test(&mut store, func).await;
85
run_smoke_typed_test(&mut store, func).await;
86
87
let func = Func::wrap_async(&mut store, move |_caller, _: ()| {
88
Box::new(async {
89
tokio::task::yield_now().await;
90
Ok(())
91
})
92
});
93
run_smoke_test(&mut store, func).await;
94
run_smoke_typed_test(&mut store, func).await;
95
}
96
97
#[tokio::test]
98
async fn smoke_host_func_with_suspension() -> Result<()> {
99
let mut store = async_store();
100
let mut linker = Linker::new(store.engine());
101
102
linker.func_new_async(
103
"",
104
"first",
105
FuncType::new(store.engine(), None, None),
106
move |_caller, _params, _results| {
107
Box::new(async {
108
tokio::task::yield_now().await;
109
Ok(())
110
})
111
},
112
)?;
113
114
linker.func_wrap_async("", "second", move |_caller, _: ()| {
115
Box::new(async {
116
tokio::task::yield_now().await;
117
Ok(())
118
})
119
})?;
120
121
let func = linker
122
.get(&mut store, "", "first")
123
.unwrap()
124
.into_func()
125
.unwrap();
126
run_smoke_test(&mut store, func).await;
127
run_smoke_typed_test(&mut store, func).await;
128
129
let func = linker
130
.get(&mut store, "", "second")
131
.unwrap()
132
.into_func()
133
.unwrap();
134
run_smoke_test(&mut store, func).await;
135
run_smoke_typed_test(&mut store, func).await;
136
137
Ok(())
138
}
139
140
#[tokio::test]
141
#[cfg_attr(miri, ignore)]
142
async fn recursive_call() {
143
let mut store = async_store();
144
let func_ty = FuncType::new(store.engine(), None, None);
145
let async_wasm_func = Func::new_async(&mut store, func_ty, |_caller, _params, _results| {
146
Box::new(async {
147
tokio::task::yield_now().await;
148
Ok(())
149
})
150
});
151
152
// Create an imported function which recursively invokes another wasm
153
// function asynchronously, although this one is just our own host function
154
// which suffices for this test.
155
let func_ty = FuncType::new(store.engine(), None, None);
156
let func2 = Func::new_async(&mut store, func_ty, move |mut caller, _params, _results| {
157
Box::new(async move {
158
async_wasm_func
159
.call_async(&mut caller, &[], &mut [])
160
.await?;
161
Ok(())
162
})
163
});
164
165
// Create an instance which calls an async import twice.
166
let module = Module::new(
167
store.engine(),
168
"
169
(module
170
(import \"\" \"\" (func))
171
(func (export \"\")
172
;; call imported function which recursively does an async
173
;; call
174
call 0
175
;; do it again, and our various pointers all better align
176
call 0))
177
",
178
)
179
.unwrap();
180
181
let instance = Instance::new_async(&mut store, &module, &[func2.into()])
182
.await
183
.unwrap();
184
let func = instance.get_func(&mut store, "").unwrap();
185
func.call_async(&mut store, &[], &mut []).await.unwrap();
186
}
187
188
#[tokio::test]
189
#[cfg_attr(miri, ignore)]
190
async fn suspend_while_suspending() {
191
let mut store = async_store();
192
193
// Create a synchronous function which calls our asynchronous function and
194
// runs it locally. This shouldn't generally happen but we know everything
195
// is synchronous in this test so it's fine for us to do this.
196
//
197
// The purpose of this test is intended to stress various cases in how
198
// we manage pointers in ways that are not necessarily common but are still
199
// possible in safe code.
200
let func_ty = FuncType::new(store.engine(), None, None);
201
let async_thunk = Func::new_async(&mut store, func_ty, |_caller, _params, _results| {
202
Box::new(async { Ok(()) })
203
});
204
let func_ty = FuncType::new(store.engine(), None, None);
205
let sync_call_async_thunk =
206
Func::new(&mut store, func_ty, move |mut caller, _params, _results| {
207
let mut future = Box::pin(async_thunk.call_async(&mut caller, &[], &mut []));
208
let poll = future
209
.as_mut()
210
.poll(&mut Context::from_waker(Waker::noop()));
211
assert!(poll.is_ready());
212
Ok(())
213
});
214
215
// A small async function that simply awaits once to pump the loops and
216
// then finishes.
217
let func_ty = FuncType::new(store.engine(), None, None);
218
let async_import = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
219
Box::new(async move {
220
tokio::task::yield_now().await;
221
Ok(())
222
})
223
});
224
225
let module = Module::new(
226
store.engine(),
227
"
228
(module
229
(import \"\" \"\" (func $sync_call_async_thunk))
230
(import \"\" \"\" (func $async_import))
231
(func (export \"\")
232
;; Set some store-local state and pointers
233
call $sync_call_async_thunk
234
;; .. and hopefully it's all still configured correctly
235
call $async_import))
236
",
237
)
238
.unwrap();
239
let instance = Instance::new_async(
240
&mut store,
241
&module,
242
&[sync_call_async_thunk.into(), async_import.into()],
243
)
244
.await
245
.unwrap();
246
let func = instance.get_func(&mut store, "").unwrap();
247
func.call_async(&mut store, &[], &mut []).await.unwrap();
248
}
249
250
#[tokio::test]
251
async fn cancel_during_run() {
252
let mut store = Store::new(&Engine::default(), 0);
253
254
let func_ty = FuncType::new(store.engine(), None, None);
255
let async_thunk = Func::new_async(&mut store, func_ty, move |mut caller, _params, _results| {
256
assert_eq!(*caller.data(), 0);
257
*caller.data_mut() = 1;
258
let dtor = SetOnDrop(caller);
259
Box::new(async move {
260
// SetOnDrop is not destroyed when dropping the reference of it
261
// here. Instead, it is moved into the future where it's forced
262
// to live in and will be destroyed at the end of the future.
263
let _ = &dtor;
264
tokio::task::yield_now().await;
265
Ok(())
266
})
267
});
268
// Shouldn't have called anything yet...
269
assert_eq!(*store.data(), 0);
270
271
// Create our future, but as per async conventions this still doesn't
272
// actually do anything. No wasm or host function has been called yet.
273
let future = Box::pin(async_thunk.call_async(&mut store, &[], &mut []));
274
275
// Push the future forward one tick, which actually runs the host code in
276
// our async func. Our future is designed to be pending once, however.
277
let future = PollOnce::new(future).await;
278
279
// Now that our future is running (on a separate, now-suspended fiber), drop
280
// the future and that should deallocate all the Rust bits as well.
281
drop(future);
282
assert_eq!(*store.data(), 2);
283
284
struct SetOnDrop<'a>(Caller<'a, usize>);
285
286
impl Drop for SetOnDrop<'_> {
287
fn drop(&mut self) {
288
assert_eq!(*self.0.data(), 1);
289
*self.0.data_mut() = 2;
290
}
291
}
292
}
293
294
#[tokio::test]
295
#[cfg_attr(miri, ignore)]
296
async fn iloop_with_fuel() {
297
let engine = Engine::new(Config::new().consume_fuel(true)).unwrap();
298
let mut store = Store::new(&engine, ());
299
store.set_fuel(10_000).unwrap();
300
store.fuel_async_yield_interval(Some(100)).unwrap();
301
let module = Module::new(
302
&engine,
303
"
304
(module
305
(func (loop br 0))
306
(start 0)
307
)
308
",
309
)
310
.unwrap();
311
let instance = Instance::new_async(&mut store, &module, &[]);
312
313
// This should yield a bunch of times but eventually finish
314
let (_, pending) = CountPending::new(Box::pin(instance)).await;
315
assert_eq!(pending, 99);
316
}
317
318
#[tokio::test]
319
#[cfg_attr(miri, ignore)]
320
async fn fuel_eventually_finishes() {
321
let engine = Engine::new(Config::new().consume_fuel(true)).unwrap();
322
let mut store = Store::new(&engine, ());
323
store.set_fuel(u64::MAX).unwrap();
324
store.fuel_async_yield_interval(Some(10)).unwrap();
325
let module = Module::new(
326
&engine,
327
"
328
(module
329
(func
330
(local i32)
331
i32.const 100
332
local.set 0
333
(loop
334
local.get 0
335
i32.const -1
336
i32.add
337
local.tee 0
338
br_if 0)
339
)
340
(start 0)
341
)
342
",
343
)
344
.unwrap();
345
let instance = Instance::new_async(&mut store, &module, &[]);
346
instance.await.unwrap();
347
}
348
349
#[tokio::test]
350
async fn async_with_pooling_stacks() {
351
let mut pool = crate::small_pool_config();
352
pool.total_stacks(1)
353
.max_memory_size(1 << 16)
354
.table_elements(0);
355
let mut config = Config::new();
356
config.allocation_strategy(InstanceAllocationStrategy::Pooling(pool));
357
config.memory_guard_size(0);
358
config.memory_reservation(1 << 16);
359
360
let engine = Engine::new(&config).unwrap();
361
let mut store = Store::new(&engine, ());
362
let func_ty = FuncType::new(store.engine(), None, None);
363
let func = Func::new_async(&mut store, func_ty, move |_caller, _params, _results| {
364
Box::new(async { Ok(()) })
365
});
366
367
run_smoke_test(&mut store, func).await;
368
run_smoke_typed_test(&mut store, func).await;
369
}
370
371
#[tokio::test]
372
async fn async_host_func_with_pooling_stacks() -> Result<()> {
373
let mut pooling = crate::small_pool_config();
374
pooling
375
.total_stacks(1)
376
.max_memory_size(1 << 16)
377
.table_elements(0);
378
let mut config = Config::new();
379
config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
380
config.memory_guard_size(0);
381
config.memory_reservation(1 << 16);
382
383
let mut store = Store::new(&Engine::new(&config)?, ());
384
let mut linker = Linker::new(store.engine());
385
linker.func_new_async(
386
"",
387
"",
388
FuncType::new(store.engine(), None, None),
389
move |_caller, _params, _results| Box::new(async { Ok(()) }),
390
)?;
391
392
let func = linker.get(&mut store, "", "").unwrap().into_func().unwrap();
393
run_smoke_test(&mut store, func).await;
394
run_smoke_typed_test(&mut store, func).await;
395
Ok(())
396
}
397
398
#[tokio::test]
399
#[cfg_attr(miri, ignore)]
400
async fn async_mpk_protection() -> Result<()> {
401
let _ = env_logger::try_init();
402
403
// Construct a pool with MPK protection enabled; note that the MPK
404
// protection is configured in `small_pool_config`.
405
let mut pooling = crate::small_pool_config();
406
pooling
407
.total_memories(10)
408
.total_stacks(2)
409
.max_memory_size(1 << 16)
410
.table_elements(0);
411
let mut config = Config::new();
412
config.allocation_strategy(InstanceAllocationStrategy::Pooling(pooling));
413
config.memory_reservation(1 << 26);
414
config.epoch_interruption(true);
415
let engine = Engine::new(&config)?;
416
417
// Craft a module that loops for several iterations and checks whether it
418
// has access to its memory range (0x0-0x10000).
419
const WAT: &str = "
420
(module
421
(func $start
422
(local $i i32)
423
(local.set $i (i32.const 3))
424
(loop $cont
425
(drop (i32.load (i32.const 0)))
426
(drop (i32.load (i32.const 0xfffc)))
427
(br_if $cont (local.tee $i (i32.sub (local.get $i) (i32.const 1))))))
428
(memory 1)
429
(start $start))
430
";
431
432
// Start two instances of the module in separate fibers, `a` and `b`.
433
async fn run_instance(engine: &Engine, name: &str) -> Instance {
434
let mut store = Store::new(&engine, ());
435
store.set_epoch_deadline(0);
436
store.epoch_deadline_async_yield_and_update(0);
437
let module = Module::new(store.engine(), WAT).unwrap();
438
println!("[{name}] building instance");
439
Instance::new_async(&mut store, &module, &[]).await.unwrap()
440
}
441
let mut a = Box::pin(run_instance(&engine, "a"));
442
let mut b = Box::pin(run_instance(&engine, "b"));
443
444
// Alternately poll each instance until completion. This should exercise
445
// fiber suspensions requiring the `Store` to appropriately save and restore
446
// the PKRU context between suspensions (see `AsyncCx::block_on`).
447
for i in 0..10 {
448
if i % 2 == 0 {
449
match PollOnce::new(a).await {
450
Ok(_) => {
451
println!("[a] done");
452
break;
453
}
454
Err(a_) => {
455
println!("[a] not done");
456
a = a_;
457
}
458
}
459
} else {
460
match PollOnce::new(b).await {
461
Ok(_) => {
462
println!("[b] done");
463
break;
464
}
465
Err(b_) => {
466
println!("[b] not done");
467
b = b_;
468
}
469
}
470
}
471
}
472
473
Ok(())
474
}
475
476
/// This will execute the `future` provided to completion and each invocation of
477
/// `poll` for the future will be executed on a separate thread.
478
pub async fn execute_across_threads<F>(future: F) -> F::Output
479
where
480
F: Future + Send + 'static,
481
F::Output: Send,
482
{
483
let mut future = Box::pin(future);
484
loop {
485
let once = PollOnce::new(future);
486
let handle = tokio::runtime::Handle::current();
487
let result = std::thread::spawn(move || handle.block_on(once))
488
.join()
489
.unwrap();
490
match result {
491
Ok(val) => break val,
492
Err(f) => future = f,
493
}
494
}
495
}
496
497
#[tokio::test]
498
#[cfg_attr(miri, ignore)]
499
async fn resume_separate_thread() {
500
// This test will poll the following future on two threads. Simulating a
501
// trap requires accessing TLS info, so that should be preserved correctly.
502
execute_across_threads(async {
503
let mut store = async_store();
504
let module = Module::new(
505
store.engine(),
506
"
507
(module
508
(import \"\" \"\" (func))
509
(start 0)
510
)
511
",
512
)
513
.unwrap();
514
let func = Func::wrap_async(&mut store, |_, _: ()| {
515
Box::new(async {
516
tokio::task::yield_now().await;
517
Err::<(), _>(format_err!("test"))
518
})
519
});
520
let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
521
assert!(result.is_err());
522
})
523
.await;
524
}
525
526
#[tokio::test]
527
#[cfg_attr(miri, ignore)]
528
async fn resume_separate_thread2() {
529
// This test will poll the following future on two threads. Catching a
530
// signal requires looking up TLS information to determine whether it's a
531
// trap to handle or not, so that must be preserved correctly across threads.
532
execute_across_threads(async {
533
let mut store = async_store();
534
let module = Module::new(
535
store.engine(),
536
"
537
(module
538
(import \"\" \"\" (func))
539
(func $start
540
call 0
541
unreachable)
542
(start $start)
543
)
544
",
545
)
546
.unwrap();
547
let func = Func::wrap_async(&mut store, |_, _: ()| {
548
Box::new(async {
549
tokio::task::yield_now().await;
550
})
551
});
552
let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
553
assert!(result.is_err());
554
})
555
.await;
556
}
557
558
#[tokio::test]
559
#[cfg_attr(miri, ignore)]
560
async fn resume_separate_thread3() {
561
let _ = env_logger::try_init();
562
563
// This test doesn't actually do anything with cross-thread polls, but
564
// instead it deals with scheduling futures at "odd" times.
565
//
566
// First we'll set up a *synchronous* call which will initialize TLS info.
567
// This call is simply to a host-defined function, but it still has the same
568
// "enter into wasm" semantics since it's just calling a trampoline. In this
569
// situation we'll set up the TLS info so it's in place while the body of
570
// the function executes...
571
let mut store = Store::new(&Engine::default(), None);
572
let f = Func::wrap(&mut store, move |mut caller: Caller<'_, _>| -> Result<()> {
573
// ... and the execution of this host-defined function (while the TLS
574
// info is initialized), will set up a recursive call into wasm. This
575
// recursive call will be done asynchronously so we can suspend it
576
// halfway through.
577
let f = async {
578
let mut store = async_store();
579
let module = Module::new(
580
store.engine(),
581
"
582
(module
583
(import \"\" \"\" (func))
584
(start 0)
585
)
586
",
587
)
588
.unwrap();
589
let func = Func::wrap_async(&mut store, |_, _: ()| {
590
Box::new(async {
591
tokio::task::yield_now().await;
592
})
593
});
594
drop(Instance::new_async(&mut store, &module, &[func.into()]).await);
595
unreachable!()
596
};
597
let mut future = Box::pin(f);
598
let poll = future
599
.as_mut()
600
.poll(&mut Context::from_waker(Waker::noop()));
601
assert!(poll.is_pending());
602
603
// ... so at this point our call into wasm is suspended. The call into
604
// wasm will have overwritten TLS info, and we sure hope that the
605
// information is restored at this point. Note that we squirrel away the
606
// future somewhere else to get dropped later. If we were to drop it
607
// here then we would reenter the future's suspended stack to clean it
608
// up, which would do more alterations of TLS information we're not
609
// testing here.
610
*caller.data_mut() = Some(future);
611
612
// ... all in all this function will need access to the original TLS
613
// information to raise the trap. This TLS information should be
614
// restored even though the asynchronous execution is suspended.
615
bail!("")
616
});
617
assert!(f.call(&mut store, &[], &mut []).is_err());
618
}
619
620
#[tokio::test]
621
#[cfg_attr(miri, ignore)]
622
async fn resume_separate_thread_tls() {
623
static COUNTER: AtomicUsize = AtomicUsize::new(0);
624
625
struct IncOnDrop;
626
impl Drop for IncOnDrop {
627
fn drop(&mut self) {
628
COUNTER.fetch_add(1, Ordering::SeqCst);
629
}
630
}
631
632
thread_local!(static FOO: IncOnDrop = IncOnDrop);
633
634
// This test will poll the following future on two threads.
635
// We verify that TLS destructors are run correctly.
636
execute_across_threads(async move {
637
let mut store = async_store();
638
let module = Module::new(
639
store.engine(),
640
"
641
(module
642
(import \"\" \"\" (func))
643
(start 0)
644
)
645
",
646
)
647
.unwrap();
648
let func = Func::wrap_async(&mut store, |_, _: ()| {
649
Box::new(async {
650
tokio::task::yield_now().await;
651
FOO.with(|_f| {});
652
Err::<(), _>(format_err!("test"))
653
})
654
});
655
let result = Instance::new_async(&mut store, &module, &[func.into()]).await;
656
assert!(result.is_err());
657
})
658
.await;
659
660
assert_eq!(COUNTER.load(Ordering::SeqCst), 1);
661
}
662
663
#[tokio::test]
664
#[cfg_attr(miri, ignore)]
665
async fn recursive_async() -> Result<()> {
666
let _ = env_logger::try_init();
667
let mut store = async_store();
668
let m = Module::new(
669
store.engine(),
670
"(module
671
(func (export \"overflow\") call 0)
672
(func (export \"normal\"))
673
)",
674
)?;
675
let i = Instance::new_async(&mut store, &m, &[]).await?;
676
let overflow = i.get_typed_func::<(), ()>(&mut store, "overflow")?;
677
let normal = i.get_typed_func::<(), ()>(&mut store, "normal")?;
678
let f2 = Func::wrap_async(&mut store, move |mut caller, _: ()| {
679
let normal = normal.clone();
680
let overflow = overflow.clone();
681
Box::new(async move {
682
// recursive async calls shouldn't immediately stack overflow...
683
normal.call_async(&mut caller, ()).await?;
684
685
// ... but calls that actually stack overflow should indeed stack
686
// overflow
687
let err = overflow
688
.call_async(&mut caller, ())
689
.await
690
.unwrap_err()
691
.downcast::<Trap>()?;
692
assert_eq!(err, Trap::StackOverflow);
693
Ok(())
694
})
695
});
696
f2.call_async(&mut store, &[], &mut []).await?;
697
Ok(())
698
}
699
700
#[tokio::test]
701
#[cfg_attr(miri, ignore)]
702
async fn linker_module_command() -> Result<()> {
703
let mut store = async_store();
704
let mut linker = Linker::new(store.engine());
705
706
let module1 = Module::new(
707
store.engine(),
708
r#"
709
(module
710
(global $g (mut i32) (i32.const 0))
711
712
(func (export "_start"))
713
714
(func (export "g") (result i32)
715
global.get $g
716
i32.const 1
717
global.set $g)
718
)
719
"#,
720
)?;
721
722
let module2 = Module::new(
723
store.engine(),
724
r#"
725
(module
726
(import "" "g" (func (result i32)))
727
728
(func (export "get") (result i32)
729
call 0)
730
)
731
"#,
732
)?;
733
734
linker.module_async(&mut store, "", &module1).await?;
735
let instance = linker.instantiate_async(&mut store, &module2).await?;
736
let f = instance.get_typed_func::<(), i32>(&mut store, "get")?;
737
assert_eq!(f.call_async(&mut store, ()).await?, 0);
738
assert_eq!(f.call_async(&mut store, ()).await?, 0);
739
740
Ok(())
741
}
742
743
#[tokio::test]
744
#[cfg_attr(miri, ignore)]
745
async fn linker_module_reactor() -> Result<()> {
746
let mut store = async_store();
747
let mut linker = Linker::new(store.engine());
748
let module1 = Module::new(
749
store.engine(),
750
r#"
751
(module
752
(global $g (mut i32) (i32.const 0))
753
754
(func (export "g") (result i32)
755
global.get $g
756
i32.const 1
757
global.set $g)
758
)
759
"#,
760
)?;
761
let module2 = Module::new(
762
store.engine(),
763
r#"
764
(module
765
(import "" "g" (func (result i32)))
766
767
(func (export "get") (result i32)
768
call 0)
769
)
770
"#,
771
)?;
772
773
linker.module_async(&mut store, "", &module1).await?;
774
let instance = linker.instantiate_async(&mut store, &module2).await?;
775
let f = instance.get_typed_func::<(), i32>(&mut store, "get")?;
776
assert_eq!(f.call_async(&mut store, ()).await?, 0);
777
assert_eq!(f.call_async(&mut store, ()).await?, 1);
778
779
Ok(())
780
}

/// Future adapter that counts how many times the wrapped future returned
/// `Poll::Pending` before it completed.
///
/// Resolves to `(output, pending_count)`.
pub struct CountPending<F> {
    future: F,
    yields: usize,
}

impl<F> CountPending<F> {
    /// Wraps `future` with the pending counter starting at zero.
    pub fn new(future: F) -> CountPending<F> {
        CountPending { future, yields: 0 }
    }
}

impl<F> Future for CountPending<F>
where
    F: Future + Unpin,
{
    type Output = (F::Output, usize);

    /// Forwards the poll to the inner future, bumping the counter on every
    /// `Pending` and attaching the final count to the output on `Ready`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.future).poll(cx) {
            Poll::Pending => {
                self.yields += 1;
                Poll::Pending
            }
            Poll::Ready(e) => Poll::Ready((e, self.yields)),
        }
    }
}

/// Future adapter that polls the wrapped future exactly once.
///
/// Resolves to `Ok(output)` if the inner future completed on that poll, or
/// to `Err(future)` — handing the still-pending future back to the caller —
/// if it did not.
pub struct PollOnce<F>(Option<F>);

impl<F> PollOnce<F> {
    /// Wraps `future` so that awaiting the adapter drives it by a single poll.
    pub fn new(future: F) -> PollOnce<F> {
        PollOnce(Some(future))
    }
}

impl<F> Future for PollOnce<F>
where
    F: Future + Unpin,
{
    type Output = Result<F::Output, F>;

    /// Polls the inner future once; this adapter itself is always `Ready`.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already resolved, because the
    /// inner future has been moved out by then.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut future = self
            .0
            .take()
            .expect("`PollOnce` polled again after it already resolved");
        match Pin::new(&mut future).poll(cx) {
            Poll::Pending => Poll::Ready(Err(future)),
            Poll::Ready(val) => Poll::Ready(Ok(val)),
        }
    }
}
832
833
#[tokio::test]
834
#[cfg_attr(miri, ignore)]
835
async fn non_stacky_async_activations() -> Result<()> {
836
let engine = Engine::default();
837
let mut store1: Store<Option<Pin<Box<dyn Future<Output = Result<()>> + Send>>>> =
838
Store::new(&engine, None);
839
let mut linker1 = Linker::new(&engine);
840
841
let module1 = Module::new(
842
&engine,
843
r#"
844
(module $m1
845
(import "" "host_capture_stack" (func $host_capture_stack))
846
(import "" "start_async_instance" (func $start_async_instance))
847
(func $capture_stack (export "capture_stack")
848
call $host_capture_stack
849
)
850
(func $run_sync (export "run_sync")
851
call $start_async_instance
852
)
853
)
854
"#,
855
)?;
856
857
let module2 = Module::new(
858
&engine,
859
r#"
860
(module $m2
861
(import "" "yield" (func $yield))
862
863
(func $run_async (export "run_async")
864
call $yield
865
)
866
)
867
"#,
868
)?;
869
870
let stacks = Arc::new(Mutex::new(vec![]));
871
fn capture_stack(stacks: &Arc<Mutex<Vec<WasmBacktrace>>>, store: impl AsContext) {
872
let mut stacks = stacks.lock().unwrap();
873
stacks.push(wasmtime::WasmBacktrace::force_capture(store));
874
}
875
876
linker1.func_wrap_async("", "host_capture_stack", {
877
let stacks = stacks.clone();
878
move |caller, _: ()| {
879
capture_stack(&stacks, &caller);
880
Box::new(async { Ok(()) })
881
}
882
})?;
883
884
linker1.func_wrap_async("", "start_async_instance", {
885
let stacks = stacks.clone();
886
move |mut caller, _: ()| {
887
let stacks = stacks.clone();
888
capture_stack(&stacks, &caller);
889
890
let module2 = module2.clone();
891
let mut store2 = Store::new(caller.engine(), ());
892
let mut linker2 = Linker::<()>::new(caller.engine());
893
linker2
894
.func_wrap_async("", "yield", {
895
let stacks = stacks.clone();
896
move |caller, _: ()| {
897
let stacks = stacks.clone();
898
Box::new(async move {
899
capture_stack(&stacks, &caller);
900
tokio::task::yield_now().await;
901
capture_stack(&stacks, &caller);
902
Ok(())
903
})
904
}
905
})
906
.unwrap();
907
908
Box::new(async move {
909
let future = PollOnce::new(Box::pin({
910
let stacks = stacks.clone();
911
async move {
912
let instance2 = linker2.instantiate_async(&mut store2, &module2).await?;
913
914
instance2
915
.get_func(&mut store2, "run_async")
916
.unwrap()
917
.call_async(&mut store2, &[], &mut [])
918
.await?;
919
920
capture_stack(&stacks, &store2);
921
wasmtime::error::Ok(())
922
}
923
}) as _)
924
.await
925
.err()
926
.unwrap();
927
capture_stack(&stacks, &caller);
928
*caller.data_mut() = Some(future);
929
Ok(())
930
})
931
}
932
})?;
933
934
let instance1 = linker1.instantiate_async(&mut store1, &module1).await?;
935
instance1
936
.get_typed_func::<(), ()>(&mut store1, "run_sync")?
937
.call_async(&mut store1, ())
938
.await?;
939
let future = store1.data_mut().take().unwrap();
940
future.await?;
941
942
instance1
943
.get_typed_func::<(), ()>(&mut store1, "capture_stack")?
944
.call_async(&mut store1, ())
945
.await?;
946
947
let stacks = stacks.lock().unwrap();
948
eprintln!("stacks = {stacks:#?}");
949
950
assert_eq!(stacks.len(), 6);
951
for (actual, expected) in stacks.iter().zip(vec![
952
vec!["run_sync"],
953
vec!["run_async"],
954
vec!["run_sync"],
955
vec!["run_async"],
956
vec![],
957
vec!["capture_stack"],
958
]) {
959
eprintln!("expected = {expected:?}");
960
eprintln!("actual = {actual:?}");
961
assert_eq!(actual.frames().len(), expected.len());
962
for (actual, expected) in actual.frames().iter().zip(expected) {
963
assert_eq!(actual.func_name(), Some(expected));
964
}
965
}
966
967
Ok(())
968
}
969
970
#[tokio::test]
971
#[cfg_attr(miri, ignore)]
972
async fn gc_preserves_externref_on_historical_async_stacks() -> Result<()> {
973
let _ = env_logger::try_init();
974
975
let engine = Engine::default();
976
977
let module = Module::new(
978
&engine,
979
r#"
980
(module $m1
981
(import "" "gc" (func $gc))
982
(import "" "recurse" (func $recurse (param i32)))
983
(import "" "test" (func $test (param i32 externref)))
984
(func (export "run") (param i32 externref)
985
local.get 0
986
if
987
local.get 0
988
i32.const -1
989
i32.add
990
call $recurse
991
else
992
call $gc
993
end
994
995
local.get 0
996
local.get 1
997
call $test
998
)
999
)
1000
"#,
1001
)?;
1002
1003
type F = TypedFunc<(i32, Option<Rooted<ExternRef>>), ()>;
1004
1005
let mut store = Store::new(&engine, None);
1006
let mut linker = Linker::<Option<F>>::new(&engine);
1007
linker.func_wrap_async("", "gc", |mut cx: Caller<'_, _>, ()| {
1008
Box::new(async move { cx.gc_async(None).await })
1009
})?;
1010
linker.func_wrap(
1011
"",
1012
"test",
1013
|cx: Caller<'_, _>, val: i32, handle: Option<Rooted<ExternRef>>| -> Result<()> {
1014
assert_eq!(
1015
handle.unwrap().data(&cx)?.unwrap().downcast_ref(),
1016
Some(&val)
1017
);
1018
Ok(())
1019
},
1020
)?;
1021
linker.func_wrap_async(
1022
"",
1023
"recurse",
1024
|mut cx: Caller<'_, Option<F>>, (val,): (i32,)| {
1025
let func = cx.data().clone().unwrap();
1026
Box::new(async move {
1027
let r = Some(ExternRef::new_async(&mut cx, val).await?);
1028
Ok(func.call_async(&mut cx, (val, r)).await)
1029
})
1030
},
1031
)?;
1032
let instance = linker.instantiate_async(&mut store, &module).await?;
1033
let func: F = instance.get_typed_func(&mut store, "run")?;
1034
*store.data_mut() = Some(func.clone());
1035
1036
let r = Some(ExternRef::new_async(&mut store, 5).await?);
1037
func.call_async(&mut store, (5, r)).await?;
1038
1039
Ok(())
1040
}
1041
1042
#[tokio::test]
1043
#[cfg_attr(miri, ignore)]
1044
async fn async_gc_with_func_new_and_func_wrap() -> Result<()> {
1045
let _ = env_logger::try_init();
1046
1047
let mut config = Config::new();
1048
config.wasm_gc(true);
1049
let engine = Engine::new(&config)?;
1050
1051
let module = Module::new(
1052
&engine,
1053
r#"
1054
(module $m1
1055
(import "" "a" (func $a (result externref structref arrayref)))
1056
(import "" "b" (func $b (result externref structref arrayref)))
1057
1058
(table 2 funcref)
1059
(elem (i32.const 0) func $a $b)
1060
1061
(func (export "a")
1062
(call $call (i32.const 0))
1063
)
1064
(func (export "b")
1065
(call $call (i32.const 1))
1066
)
1067
1068
(func $call (param i32)
1069
(local $cnt i32)
1070
(loop $l
1071
(call_indirect (result externref structref arrayref) (local.get 0))
1072
drop
1073
drop
1074
drop
1075
1076
(local.set $cnt (i32.add (local.get $cnt) (i32.const 1)))
1077
1078
(if (i32.lt_u (local.get $cnt) (i32.const 5000))
1079
(then (br $l)))
1080
)
1081
)
1082
)
1083
"#,
1084
)?;
1085
1086
let mut linker = Linker::<()>::new(&engine);
1087
linker.func_wrap_async("", "a", |mut cx: Caller<'_, _>, ()| {
1088
Box::new(async move {
1089
let externref = ExternRef::new_async(&mut cx, 100).await?;
1090
1091
let struct_ty = StructType::new(cx.engine(), [])?;
1092
let struct_pre = StructRefPre::new(&mut cx, struct_ty);
1093
let structref = StructRef::new_async(&mut cx, &struct_pre, &[]).await?;
1094
1095
let array_ty = ArrayType::new(
1096
cx.engine(),
1097
FieldType::new(Mutability::Var, ValType::I32.into()),
1098
);
1099
let array_pre = ArrayRefPre::new(&mut cx, array_ty);
1100
let arrayref = ArrayRef::new_fixed_async(&mut cx, &array_pre, &[]).await?;
1101
1102
Ok((Some(externref), Some(structref), Some(arrayref)))
1103
})
1104
})?;
1105
let ty = FuncType::new(
1106
&engine,
1107
[],
1108
[ValType::EXTERNREF, ValType::STRUCTREF, ValType::ARRAYREF],
1109
);
1110
linker.func_new_async("", "b", ty, |mut cx, _, results| {
1111
Box::new(async move {
1112
results[0] = ExternRef::new_async(&mut cx, 100).await?.into();
1113
1114
let struct_ty = StructType::new(cx.engine(), [])?;
1115
let struct_pre = StructRefPre::new(&mut cx, struct_ty);
1116
results[1] = StructRef::new_async(&mut cx, &struct_pre, &[])
1117
.await?
1118
.into();
1119
1120
let array_ty = ArrayType::new(
1121
cx.engine(),
1122
FieldType::new(Mutability::Var, ValType::I32.into()),
1123
);
1124
let array_pre = ArrayRefPre::new(&mut cx, array_ty);
1125
results[2] = ArrayRef::new_fixed_async(&mut cx, &array_pre, &[])
1126
.await?
1127
.into();
1128
1129
Ok(())
1130
})
1131
})?;
1132
1133
let mut store = Store::new(&engine, ());
1134
let instance = linker.instantiate_async(&mut store, &module).await?;
1135
let a = instance.get_typed_func::<(), ()>(&mut store, "a")?;
1136
a.call_async(&mut store, ()).await?;
1137
1138
let mut store = Store::new(&engine, ());
1139
let instance = linker.instantiate_async(&mut store, &module).await?;
1140
let b = instance.get_typed_func::<(), ()>(&mut store, "b")?;
1141
b.call_async(&mut store, ()).await?;
1142
1143
Ok(())
1144
}
1145
1146