// Source: GitHub repository bytecodealliance/wasmtime
// Path: blob/main/tests/all/component_model/async.rs
use crate::async_functions::{PollOnce, execute_across_threads};
use std::pin::Pin;
use std::task::{Context, Poll};
use wasmtime::Result;
use wasmtime::{AsContextMut, Config, Engine, Store, StoreContextMut, Trap, component::*};
use wasmtime_component_util::REALLOC_AND_FREE;
7
8
/// This is super::func::thunks, except with an async store.
9
#[tokio::test]
10
#[cfg_attr(miri, ignore)]
11
async fn smoke() -> Result<()> {
12
let component = r#"
13
(component
14
(core module $m
15
(func (export "thunk"))
16
(func (export "thunk-trap") unreachable)
17
)
18
(core instance $i (instantiate $m))
19
(func (export "thunk")
20
(canon lift (core func $i "thunk"))
21
)
22
(func (export "thunk-trap")
23
(canon lift (core func $i "thunk-trap"))
24
)
25
)
26
"#;
27
28
let engine = super::async_engine();
29
let component = Component::new(&engine, component)?;
30
let mut store = Store::new(&engine, ());
31
let instance = Linker::new(&engine)
32
.instantiate_async(&mut store, &component)
33
.await?;
34
35
let thunk = instance.get_typed_func::<(), ()>(&mut store, "thunk")?;
36
37
thunk.call_async(&mut store, ()).await?;
38
39
let err = instance
40
.get_typed_func::<(), ()>(&mut store, "thunk-trap")?
41
.call_async(&mut store, ())
42
.await
43
.unwrap_err();
44
assert_eq!(err.downcast::<Trap>()?, Trap::UnreachableCodeReached);
45
46
Ok(())
47
}
48
49
/// Handle an import function, created using component::Linker::func_wrap_async.
50
#[tokio::test]
51
#[cfg_attr(miri, ignore)]
52
async fn smoke_func_wrap() -> Result<()> {
53
let component = r#"
54
(component
55
(type $f (func))
56
(import "i" (func $f))
57
58
(core module $m
59
(import "imports" "i" (func $i))
60
(func (export "thunk") call $i)
61
)
62
63
(core func $f (canon lower (func $f)))
64
(core instance $i (instantiate $m
65
(with "imports" (instance
66
(export "i" (func $f))
67
))
68
))
69
(func (export "thunk")
70
(canon lift (core func $i "thunk"))
71
)
72
)
73
"#;
74
75
let engine = super::async_engine();
76
let component = Component::new(&engine, component)?;
77
let mut store = Store::new(&engine, ());
78
let mut linker = Linker::new(&engine);
79
let mut root = linker.root();
80
root.func_wrap_async("i", |_: StoreContextMut<()>, _: ()| {
81
Box::new(async { Ok(()) })
82
})?;
83
84
let instance = linker.instantiate_async(&mut store, &component).await?;
85
86
let thunk = instance.get_typed_func::<(), ()>(&mut store, "thunk")?;
87
88
thunk.call_async(&mut store, ()).await?;
89
90
Ok(())
91
}
92
93
// This test stresses TLS management in combination with the `realloc` option
94
// for imported functions. This will create an async computation which invokes a
95
// component that invokes an imported function. The imported function returns a
96
// list which will require invoking malloc.
97
//
98
// As an added stressor all polls are sprinkled across threads through
99
// `execute_across_threads`. Yields are injected liberally by configuring 1
100
// fuel consumption to trigger a yield.
101
//
102
// Overall a yield should happen during malloc which should be an "interesting
103
// situation" with respect to the runtime.
104
#[tokio::test]
105
#[cfg_attr(miri, ignore)]
106
async fn resume_separate_thread() -> Result<()> {
107
let mut config = wasmtime_test_util::component::config();
108
config.consume_fuel(true);
109
let engine = Engine::new(&config)?;
110
let component = format!(
111
r#"
112
(component
113
(import "yield" (func $yield (result (list u8))))
114
(core module $libc
115
(memory (export "memory") 1)
116
{REALLOC_AND_FREE}
117
)
118
(core instance $libc (instantiate $libc))
119
120
(core func $yield
121
(canon lower
122
(func $yield)
123
(memory $libc "memory")
124
(realloc (func $libc "realloc"))
125
)
126
)
127
128
(core module $m
129
(import "" "yield" (func $yield (param i32)))
130
(import "libc" "memory" (memory 0))
131
(func $start
132
i32.const 8
133
call $yield
134
)
135
(start $start)
136
)
137
(core instance (instantiate $m
138
(with "" (instance (export "yield" (func $yield))))
139
(with "libc" (instance $libc))
140
))
141
)
142
"#
143
);
144
let component = Component::new(&engine, component)?;
145
let mut linker = Linker::new(&engine);
146
linker
147
.root()
148
.func_wrap_async("yield", |_: StoreContextMut<()>, _: ()| {
149
Box::new(async {
150
tokio::task::yield_now().await;
151
Ok((vec![1u8, 2u8],))
152
})
153
})?;
154
155
execute_across_threads(async move {
156
let mut store = Store::new(&engine, ());
157
store.set_fuel(u64::MAX).unwrap();
158
store.fuel_async_yield_interval(Some(1)).unwrap();
159
linker.instantiate_async(&mut store, &component).await?;
160
Ok::<_, wasmtime::Error>(())
161
})
162
.await?;
163
Ok(())
164
}
165
166
// This test is intended to stress TLS management in the component model around
167
// the management of the `realloc` function. This creates an async computation
168
// representing the execution of a component model function where entry into the
169
// component uses `realloc` and then the component runs. This async computation
170
// is then polled iteratively with another "wasm activation" (in this case a
171
// core wasm function) on the stack. The poll-per-call should work and nothing
172
// should in theory have problems here.
173
//
174
// As an added stressor all polls are sprinkled across threads through
175
// `execute_across_threads`. Yields are injected liberally by configuring 1
176
// fuel consumption to trigger a yield.
177
//
178
// Overall a yield should happen during malloc which should be an "interesting
179
// situation" with respect to the runtime.
180
#[tokio::test]
181
#[cfg_attr(miri, ignore)]
182
async fn poll_through_wasm_activation() -> Result<()> {
183
let mut config = wasmtime_test_util::component::config();
184
config.consume_fuel(true);
185
let engine = Engine::new(&config)?;
186
let component = format!(
187
r#"
188
(component
189
(core module $m
190
{REALLOC_AND_FREE}
191
(memory (export "memory") 1)
192
(func (export "run") (param i32 i32)
193
)
194
)
195
(core instance $i (instantiate $m))
196
(func (export "run") (param "x" (list u8))
197
(canon lift (core func $i "run")
198
(memory $i "memory")
199
(realloc (func $i "realloc"))))
200
)
201
"#
202
);
203
let component = Component::new(&engine, component)?;
204
let linker = Linker::new(&engine);
205
206
let invoke_component = {
207
let engine = engine.clone();
208
async move {
209
let mut store = Store::new(&engine, ());
210
store.set_fuel(u64::MAX).unwrap();
211
store.fuel_async_yield_interval(Some(1)).unwrap();
212
let instance = linker.instantiate_async(&mut store, &component).await?;
213
let func = instance.get_typed_func::<(Vec<u8>,), ()>(&mut store, "run")?;
214
func.call_async(&mut store, (vec![1, 2, 3],)).await?;
215
Ok::<_, wasmtime::Error>(())
216
}
217
};
218
219
execute_across_threads(async move {
220
let mut store = Store::new(&engine, Some(Box::pin(invoke_component)));
221
let poll_once = wasmtime::Func::wrap_async(&mut store, |mut cx, _: ()| {
222
let invoke_component = cx.data_mut().take().unwrap();
223
Box::new(async move {
224
match PollOnce::new(invoke_component).await {
225
Ok(result) => {
226
result?;
227
Ok(1)
228
}
229
Err(future) => {
230
*cx.data_mut() = Some(future);
231
Ok(0)
232
}
233
}
234
})
235
});
236
let poll_once = poll_once.typed::<(), i32>(&mut store)?;
237
while poll_once.call_async(&mut store, ()).await? != 1 {
238
// loop around to call again
239
}
240
Ok::<_, wasmtime::Error>(())
241
})
242
.await?;
243
Ok(())
244
}
245
246
/// Test async drop method for host resources.
247
#[tokio::test]
248
#[cfg_attr(miri, ignore)]
249
async fn drop_resource_async() -> Result<()> {
250
use std::sync::Arc;
251
use std::sync::Mutex;
252
253
let engine = super::async_engine();
254
let c = Component::new(
255
&engine,
256
r#"
257
(component
258
(import "t" (type $t (sub resource)))
259
260
(core func $drop (canon resource.drop $t))
261
262
(core module $m
263
(import "" "drop" (func $drop (param i32)))
264
(func (export "f") (param i32)
265
(call $drop (local.get 0))
266
)
267
)
268
(core instance $i (instantiate $m
269
(with "" (instance
270
(export "drop" (func $drop))
271
))
272
))
273
274
(func (export "f") (param "x" (own $t))
275
(canon lift (core func $i "f")))
276
)
277
"#,
278
)?;
279
280
struct MyType;
281
282
let mut store = Store::new(&engine, ());
283
let mut linker = Linker::new(&engine);
284
285
let drop_status = Arc::new(Mutex::new("not dropped"));
286
let ds = drop_status.clone();
287
288
linker
289
.root()
290
.resource_async("t", ResourceType::host::<MyType>(), move |_, _| {
291
let ds = ds.clone();
292
Box::new(async move {
293
*ds.lock().unwrap() = "before yield";
294
tokio::task::yield_now().await;
295
*ds.lock().unwrap() = "after yield";
296
Ok(())
297
})
298
})?;
299
let i = linker.instantiate_async(&mut store, &c).await?;
300
let f = i.get_typed_func::<(Resource<MyType>,), ()>(&mut store, "f")?;
301
302
execute_across_threads(async move {
303
let resource = Resource::new_own(100);
304
f.call_async(&mut store, (resource,)).await?;
305
Ok::<_, wasmtime::Error>(())
306
})
307
.await?;
308
309
assert_eq!("after yield", *drop_status.lock().unwrap());
310
311
Ok(())
312
}
313
314
/// Test task deletion in three situations, for every combination of lift/lower/(guest/host):
315
/// 1. An explicit thread calls task.return
316
/// 2. An explicit thread suspends indefinitely
317
/// 3. An explicit thread yield loops indefinitely
318
#[tokio::test]
319
#[cfg_attr(miri, ignore)]
320
async fn task_deletion() -> Result<()> {
321
let mut config = Config::new();
322
config.wasm_component_model_async(true);
323
config.wasm_component_model_threading(true);
324
config.wasm_component_model_async_stackful(true);
325
config.wasm_component_model_async_builtins(true);
326
let engine = Engine::new(&config)?;
327
let component = Component::new(
328
&engine,
329
r#"(component
330
(component $C
331
(core module $Memory (memory (export "mem") 1))
332
(core instance $memory (instantiate $Memory))
333
;; Defines the table for the thread start functions
334
(core module $libc
335
(table (export "__indirect_function_table") 3 funcref))
336
(core module $CM
337
(import "" "mem" (memory 1))
338
(import "" "task.return" (func $task-return (param i32)))
339
(import "" "task.cancel" (func $task-cancel))
340
(import "" "thread.new-indirect" (func $thread-new-indirect (param i32 i32) (result i32)))
341
(import "" "thread.suspend" (func $thread-suspend (result i32)))
342
(import "" "thread.suspend-cancellable" (func $thread-suspend-cancellable (result i32)))
343
(import "" "thread.yield-to" (func $thread-yield-to (param i32) (result i32)))
344
(import "" "thread.yield-to-cancellable" (func $thread-yield-to-cancellable (param i32) (result i32)))
345
(import "" "thread.switch-to" (func $thread-switch-to (param i32) (result i32)))
346
(import "" "thread.switch-to-cancellable" (func $thread-switch-to-cancellable (param i32) (result i32)))
347
(import "" "thread.yield" (func $thread-yield (result i32)))
348
(import "" "thread.yield-cancellable" (func $thread-yield-cancellable (result i32)))
349
(import "" "thread.index" (func $thread-index (result i32)))
350
(import "" "thread.resume-later" (func $thread-resume-later (param i32)))
351
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
352
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
353
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
354
(import "libc" "__indirect_function_table" (table $indirect-function-table 3 funcref))
355
356
;; Indices into the function table for the thread start functions
357
(global $call-return-ftbl-idx i32 (i32.const 0))
358
(global $suspend-ftbl-idx i32 (i32.const 1))
359
(global $yield-loop-ftbl-idx i32 (i32.const 2))
360
361
(func $call-return (param i32)
362
(call $task-return (local.get 0)))
363
364
(func $suspend (param i32)
365
(drop (call $thread-suspend)))
366
367
(func $yield-loop (param i32)
368
(loop $top
369
(drop (call $thread-yield))
370
(br $top)))
371
372
(func (export "explicit-thread-calls-return-stackful")
373
(call $thread-resume-later
374
(call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42))))
375
376
(func (export "explicit-thread-calls-return-stackless") (result i32)
377
(call $thread-resume-later
378
(call $thread-new-indirect (global.get $call-return-ftbl-idx) (i32.const 42)))
379
(i32.const 0 (; EXIT ;)))
380
381
(func (export "cb") (param i32 i32 i32) (result i32)
382
(unreachable))
383
384
(func (export "explicit-thread-suspends-sync") (result i32)
385
(call $thread-resume-later
386
(call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
387
(i32.const 42))
388
389
(func (export "explicit-thread-suspends-stackful")
390
(call $thread-resume-later
391
(call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
392
(call $task-return (i32.const 42)))
393
394
(func (export "explicit-thread-suspends-stackless") (result i32)
395
(call $thread-resume-later
396
(call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
397
(call $task-return (i32.const 42))
398
(i32.const 0))
399
400
(func (export "explicit-thread-yield-loops-sync") (result i32)
401
(call $thread-resume-later
402
(call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
403
(i32.const 42))
404
405
(func (export "explicit-thread-yield-loops-stackful")
406
(call $thread-resume-later
407
(call $thread-new-indirect (global.get $yield-loop-ftbl-idx) (i32.const 42)))
408
(call $task-return (i32.const 42)))
409
410
(func (export "explicit-thread-yield-loops-stackless") (result i32)
411
(call $thread-resume-later
412
(call $thread-new-indirect (global.get $suspend-ftbl-idx) (i32.const 42)))
413
(call $task-return (i32.const 42))
414
(i32.const 0 (; EXIT ;)))
415
416
;; Initialize the function table that will be used by thread.new-indirect
417
(elem (table $indirect-function-table) (i32.const 0 (; call-return-ftbl-idx ;)) func $call-return)
418
(elem (table $indirect-function-table) (i32.const 1 (; suspend-ftbl-idx ;)) func $suspend)
419
(elem (table $indirect-function-table) (i32.const 2 (; yield-loop-ftbl-idx ;)) func $yield-loop)
420
)
421
422
;; Instantiate the libc module to get the table
423
(core instance $libc (instantiate $libc))
424
;; Get access to `thread.new-indirect` that uses the table from libc
425
(core type $start-func-ty (func (param i32)))
426
(alias core export $libc "__indirect_function_table" (core table $indirect-function-table))
427
428
(core func $task-return (canon task.return (result u32)))
429
(core func $task-cancel (canon task.cancel))
430
(core func $thread-new-indirect
431
(canon thread.new-indirect $start-func-ty (table $indirect-function-table)))
432
(core func $thread-yield (canon thread.yield))
433
(core func $thread-yield-cancellable (canon thread.yield cancellable))
434
(core func $thread-index (canon thread.index))
435
(core func $thread-yield-to (canon thread.yield-to))
436
(core func $thread-yield-to-cancellable (canon thread.yield-to cancellable))
437
(core func $thread-resume-later (canon thread.resume-later))
438
(core func $thread-switch-to (canon thread.switch-to))
439
(core func $thread-switch-to-cancellable (canon thread.switch-to cancellable))
440
(core func $thread-suspend (canon thread.suspend))
441
(core func $thread-suspend-cancellable (canon thread.suspend cancellable))
442
(core func $waitable-set.new (canon waitable-set.new))
443
(core func $waitable.join (canon waitable.join))
444
(core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem")))
445
446
;; Instantiate the main module
447
(core instance $cm (
448
instantiate $CM
449
(with "" (instance
450
(export "mem" (memory $memory "mem"))
451
(export "task.return" (func $task-return))
452
(export "task.cancel" (func $task-cancel))
453
(export "thread.new-indirect" (func $thread-new-indirect))
454
(export "thread.index" (func $thread-index))
455
(export "thread.yield-to" (func $thread-yield-to))
456
(export "thread.yield-to-cancellable" (func $thread-yield-to-cancellable))
457
(export "thread.yield" (func $thread-yield))
458
(export "thread.yield-cancellable" (func $thread-yield-cancellable))
459
(export "thread.switch-to" (func $thread-switch-to))
460
(export "thread.switch-to-cancellable" (func $thread-switch-to-cancellable))
461
(export "thread.suspend" (func $thread-suspend))
462
(export "thread.suspend-cancellable" (func $thread-suspend-cancellable))
463
(export "thread.resume-later" (func $thread-resume-later))
464
(export "waitable.join" (func $waitable.join))
465
(export "waitable-set.wait" (func $waitable-set.wait))
466
(export "waitable-set.new" (func $waitable-set.new))))
467
(with "libc" (instance $libc))))
468
469
(func (export "explicit-thread-calls-return-stackful") async (result u32)
470
(canon lift (core func $cm "explicit-thread-calls-return-stackful") async))
471
(func (export "explicit-thread-calls-return-stackless") async (result u32)
472
(canon lift (core func $cm "explicit-thread-calls-return-stackless") async (callback (func $cm "cb"))))
473
(func (export "explicit-thread-suspends-sync") async (result u32)
474
(canon lift (core func $cm "explicit-thread-suspends-sync")))
475
(func (export "explicit-thread-suspends-stackful") async (result u32)
476
(canon lift (core func $cm "explicit-thread-suspends-stackful") async))
477
(func (export "explicit-thread-suspends-stackless") async (result u32)
478
(canon lift (core func $cm "explicit-thread-suspends-stackless") async (callback (func $cm "cb"))))
479
(func (export "explicit-thread-yield-loops-sync") async (result u32)
480
(canon lift (core func $cm "explicit-thread-yield-loops-sync")))
481
(func (export "explicit-thread-yield-loops-stackful") async (result u32)
482
(canon lift (core func $cm "explicit-thread-yield-loops-stackful") async))
483
(func (export "explicit-thread-yield-loops-stackless") async (result u32)
484
(canon lift (core func $cm "explicit-thread-yield-loops-stackless") async (callback (func $cm "cb"))))
485
)
486
487
(component $D
488
(import "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful async (result u32)))
489
(import "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless async (result u32)))
490
(import "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync async (result u32)))
491
(import "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful async (result u32)))
492
(import "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless async (result u32)))
493
(import "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync async (result u32)))
494
(import "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful async (result u32)))
495
(import "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless async (result u32)))
496
497
(core module $Memory (memory (export "mem") 1))
498
(core instance $memory (instantiate $Memory))
499
(core module $DM
500
(import "" "mem" (memory 1))
501
(import "" "subtask.cancel" (func $subtask.cancel (param i32) (result i32)))
502
;; sync lowered
503
(import "" "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful (result i32)))
504
(import "" "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless (result i32)))
505
(import "" "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync (result i32)))
506
(import "" "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful (result i32)))
507
(import "" "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless (result i32)))
508
(import "" "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync (result i32)))
509
(import "" "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful (result i32)))
510
(import "" "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless (result i32)))
511
;; async lowered
512
(import "" "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async (param i32) (result i32)))
513
(import "" "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async (param i32) (result i32)))
514
(import "" "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async (param i32) (result i32)))
515
(import "" "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async (param i32) (result i32)))
516
(import "" "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async (param i32) (result i32)))
517
(import "" "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async (param i32) (result i32)))
518
(import "" "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async (param i32) (result i32)))
519
(import "" "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async (param i32) (result i32)))
520
(import "" "waitable.join" (func $waitable.join (param i32 i32)))
521
(import "" "waitable-set.new" (func $waitable-set.new (result i32)))
522
(import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32)))
523
(import "" "thread.yield" (func $thread-yield (result i32)))
524
525
(func $check (param i32)
526
(if (i32.ne (local.get 0) (i32.const 42))
527
(then unreachable))
528
)
529
530
(func $check-async (param i32)
531
(local $retp i32) (local $ws i32) (local $ws-retp i32)
532
(local.set $retp (i32.const 8))
533
(local.set $ws-retp (i32.const 16))
534
(local.set $ws (call $waitable-set.new))
535
536
(if (i32.eq (i32.and (local.get 0) (i32.const 0xF)) (i32.const 2 (; RETURNED ;)))
537
(then (call $check (i32.load (local.get $retp))))
538
(else
539
(call $waitable.join (i32.shr_u (local.get 0) (i32.const 4)) (local.get $ws))
540
(drop (call $waitable-set.wait (local.get $ws) (local.get $ws-retp)))
541
(call $check (i32.load (local.get $retp)))))
542
)
543
544
(func $run (export "run") (result i32)
545
(local $retp i32)
546
(local.set $retp (i32.const 8))
547
(call $check (call $explicit-thread-calls-return-stackless))
548
(call $check (call $explicit-thread-calls-return-stackful))
549
(call $check (call $explicit-thread-suspends-sync))
550
(call $check (call $explicit-thread-suspends-stackful))
551
(call $check (call $explicit-thread-suspends-stackless))
552
(call $check (call $explicit-thread-yield-loops-sync))
553
(call $check (call $explicit-thread-yield-loops-stackful))
554
(call $check (call $explicit-thread-yield-loops-stackless))
555
556
(call $check-async (call $explicit-thread-calls-return-stackless-async (local.get $retp)))
557
(call $check-async (call $explicit-thread-calls-return-stackful-async (local.get $retp)))
558
(call $check-async (call $explicit-thread-suspends-sync-async (local.get $retp)))
559
(call $check-async (call $explicit-thread-suspends-stackful-async (local.get $retp)))
560
(call $check-async (call $explicit-thread-suspends-stackless-async (local.get $retp)))
561
(call $check-async (call $explicit-thread-yield-loops-sync-async (local.get $retp)))
562
(call $check-async (call $explicit-thread-yield-loops-stackful-async (local.get $retp)))
563
(call $check-async (call $explicit-thread-yield-loops-stackless-async (local.get $retp)))
564
565
(i32.const 42)
566
)
567
)
568
569
(core func $waitable-set.new (canon waitable-set.new))
570
(core func $waitable-set.wait (canon waitable-set.wait (memory $memory "mem")))
571
(core func $waitable.join (canon waitable.join))
572
(core func $subtask.cancel (canon subtask.cancel async))
573
(core func $thread.yield (canon thread.yield))
574
;; sync lowered
575
(canon lower (func $explicit-thread-calls-return-stackful) (memory $memory "mem") (core func $explicit-thread-calls-return-stackful'))
576
(canon lower (func $explicit-thread-calls-return-stackless) (memory $memory "mem") (core func $explicit-thread-calls-return-stackless'))
577
(canon lower (func $explicit-thread-suspends-sync) (memory $memory "mem") (core func $explicit-thread-suspends-sync'))
578
(canon lower (func $explicit-thread-suspends-stackful) (memory $memory "mem") (core func $explicit-thread-suspends-stackful'))
579
(canon lower (func $explicit-thread-suspends-stackless) (memory $memory "mem") (core func $explicit-thread-suspends-stackless'))
580
(canon lower (func $explicit-thread-yield-loops-sync) (memory $memory "mem") (core func $explicit-thread-yield-loops-sync'))
581
(canon lower (func $explicit-thread-yield-loops-stackful) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful'))
582
(canon lower (func $explicit-thread-yield-loops-stackless) (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless'))
583
;; async lowered
584
(canon lower (func $explicit-thread-calls-return-stackful) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackful-async'))
585
(canon lower (func $explicit-thread-calls-return-stackless) async (memory $memory "mem") (core func $explicit-thread-calls-return-stackless-async'))
586
(canon lower (func $explicit-thread-suspends-sync) async (memory $memory "mem") (core func $explicit-thread-suspends-sync-async'))
587
(canon lower (func $explicit-thread-suspends-stackful) async (memory $memory "mem") (core func $explicit-thread-suspends-stackful-async'))
588
(canon lower (func $explicit-thread-suspends-stackless) async (memory $memory "mem") (core func $explicit-thread-suspends-stackless-async'))
589
(canon lower (func $explicit-thread-yield-loops-sync) async (memory $memory "mem") (core func $explicit-thread-yield-loops-sync-async'))
590
(canon lower (func $explicit-thread-yield-loops-stackful) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackful-async'))
591
(canon lower (func $explicit-thread-yield-loops-stackless) async (memory $memory "mem") (core func $explicit-thread-yield-loops-stackless-async'))
592
(core instance $dm (instantiate $DM (with "" (instance
593
(export "mem" (memory $memory "mem"))
594
(export "explicit-thread-calls-return-stackful" (func $explicit-thread-calls-return-stackful'))
595
(export "explicit-thread-calls-return-stackless" (func $explicit-thread-calls-return-stackless'))
596
(export "explicit-thread-suspends-sync" (func $explicit-thread-suspends-sync'))
597
(export "explicit-thread-suspends-stackful" (func $explicit-thread-suspends-stackful'))
598
(export "explicit-thread-suspends-stackless" (func $explicit-thread-suspends-stackless'))
599
(export "explicit-thread-yield-loops-sync" (func $explicit-thread-yield-loops-sync'))
600
(export "explicit-thread-yield-loops-stackful" (func $explicit-thread-yield-loops-stackful'))
601
(export "explicit-thread-yield-loops-stackless" (func $explicit-thread-yield-loops-stackless'))
602
(export "explicit-thread-calls-return-stackful-async" (func $explicit-thread-calls-return-stackful-async'))
603
(export "explicit-thread-calls-return-stackless-async" (func $explicit-thread-calls-return-stackless-async'))
604
(export "explicit-thread-suspends-sync-async" (func $explicit-thread-suspends-sync-async'))
605
(export "explicit-thread-suspends-stackful-async" (func $explicit-thread-suspends-stackful-async'))
606
(export "explicit-thread-suspends-stackless-async" (func $explicit-thread-suspends-stackless-async'))
607
(export "explicit-thread-yield-loops-sync-async" (func $explicit-thread-yield-loops-sync-async'))
608
(export "explicit-thread-yield-loops-stackful-async" (func $explicit-thread-yield-loops-stackful-async'))
609
(export "explicit-thread-yield-loops-stackless-async" (func $explicit-thread-yield-loops-stackless-async'))
610
(export "waitable.join" (func $waitable.join))
611
(export "waitable-set.new" (func $waitable-set.new))
612
(export "waitable-set.wait" (func $waitable-set.wait))
613
(export "subtask.cancel" (func $subtask.cancel))
614
(export "thread.yield" (func $thread.yield))
615
))))
616
(func (export "run") async (result u32) (canon lift (core func $dm "run")))
617
)
618
619
(instance $c (instantiate $C))
620
(instance $d (instantiate $D
621
(with "explicit-thread-calls-return-stackful" (func $c "explicit-thread-calls-return-stackful"))
622
(with "explicit-thread-calls-return-stackless" (func $c "explicit-thread-calls-return-stackless"))
623
(with "explicit-thread-suspends-sync" (func $c "explicit-thread-suspends-sync"))
624
(with "explicit-thread-suspends-stackful" (func $c "explicit-thread-suspends-stackful"))
625
(with "explicit-thread-suspends-stackless" (func $c "explicit-thread-suspends-stackless"))
626
(with "explicit-thread-yield-loops-sync" (func $c "explicit-thread-yield-loops-sync"))
627
(with "explicit-thread-yield-loops-stackful" (func $c "explicit-thread-yield-loops-stackful"))
628
(with "explicit-thread-yield-loops-stackless" (func $c "explicit-thread-yield-loops-stackless"))
629
))
630
(func (export "run") (alias export $d "run"))
631
(func (export "explicit-thread-calls-return-stackful") (alias export $c "explicit-thread-calls-return-stackful"))
632
(func (export "explicit-thread-calls-return-stackless") (alias export $c "explicit-thread-calls-return-stackless"))
633
(func (export "explicit-thread-suspends-sync") (alias export $c "explicit-thread-suspends-sync"))
634
(func (export "explicit-thread-suspends-stackful") (alias export $c "explicit-thread-suspends-stackful"))
635
(func (export "explicit-thread-suspends-stackless") (alias export $c "explicit-thread-suspends-stackless"))
636
(func (export "explicit-thread-yield-loops-sync") (alias export $c "explicit-thread-yield-loops-sync"))
637
(func (export "explicit-thread-yield-loops-stackful") (alias export $c "explicit-thread-yield-loops-stackful"))
638
(func (export "explicit-thread-yield-loops-stackless") (alias export $c "explicit-thread-yield-loops-stackless"))
639
)
640
"#,
641
)?
642
.serialize()?;
643
644
let component = unsafe { Component::deserialize(&engine, &component)? };
645
let mut store = Store::new(&engine, ());
646
let instance = Linker::new(&engine)
647
.instantiate_async(&mut store, &component)
648
.await?;
649
let funcs = vec![
650
"run",
651
"explicit-thread-calls-return-stackful",
652
"explicit-thread-calls-return-stackless",
653
"explicit-thread-suspends-sync",
654
"explicit-thread-suspends-stackful",
655
"explicit-thread-suspends-stackless",
656
"explicit-thread-yield-loops-sync",
657
"explicit-thread-yield-loops-stackful",
658
"explicit-thread-yield-loops-stackless",
659
];
660
for func in funcs {
661
let func = instance.get_typed_func::<(), (u32,)>(&mut store, func)?;
662
assert_eq!(func.call_async(&mut store, ()).await?, (42,));
663
}
664
665
Ok(())
666
}
667
668
#[tokio::test]
669
#[cfg_attr(miri, ignore)]
670
async fn cancel_host_future() -> Result<()> {
671
let mut config = Config::new();
672
config.wasm_component_model_async(true);
673
let engine = Engine::new(&config)?;
674
675
let component = Component::new(
676
&engine,
677
r#"
678
(component
679
(core module $libc (memory (export "memory") 1))
680
(core instance $libc (instantiate $libc))
681
(core module $m
682
(import "" "future.read" (func $future.read (param i32 i32) (result i32)))
683
(import "" "future.cancel-read" (func $future.cancel-read (param i32) (result i32)))
684
(memory (export "memory") 1)
685
686
(func (export "run") (param i32)
687
;; read/cancel attempt 1
688
(call $future.read (local.get 0) (i32.const 100))
689
i32.const -1 ;; BLOCKED
690
i32.ne
691
if unreachable end
692
693
(call $future.cancel-read (local.get 0))
694
i32.const 2 ;; CANCELLED
695
i32.ne
696
if unreachable end
697
698
;; read/cancel attempt 2
699
(call $future.read (local.get 0) (i32.const 100))
700
i32.const -1 ;; BLOCKED
701
i32.ne
702
if unreachable end
703
704
(call $future.cancel-read (local.get 0))
705
i32.const 2 ;; CANCELLED
706
i32.ne
707
if unreachable end
708
)
709
)
710
711
(type $f (future u32))
712
(core func $future.read (canon future.read $f async (memory $libc "memory")))
713
(core func $future.cancel-read (canon future.cancel-read $f))
714
715
(core instance $i (instantiate $m
716
(with "" (instance
717
(export "future.read" (func $future.read))
718
(export "future.cancel-read" (func $future.cancel-read))
719
))
720
))
721
722
(func (export "run") async (param "f" $f)
723
(canon lift
724
(core func $i "run")
725
(memory $libc "memory")
726
)
727
)
728
)
729
"#,
730
)?;
731
732
let mut store = Store::new(&engine, ());
733
let instance = Linker::new(&engine)
734
.instantiate_async(&mut store, &component)
735
.await?;
736
let func = instance.get_typed_func::<(FutureReader<u32>,), ()>(&mut store, "run")?;
737
let reader = FutureReader::new(&mut store, MyFutureReader);
738
func.call_async(&mut store, (reader,)).await?;
739
740
return Ok(());
741
742
struct MyFutureReader;
743
744
impl FutureProducer<()> for MyFutureReader {
745
type Item = u32;
746
747
fn poll_produce(
748
self: Pin<&mut Self>,
749
_cx: &mut Context<'_>,
750
_store: StoreContextMut<()>,
751
finish: bool,
752
) -> Poll<Result<Option<Self::Item>>> {
753
if finish {
754
Poll::Ready(Ok(None))
755
} else {
756
Poll::Pending
757
}
758
}
759
}
760
}
761
762
#[tokio::test]
763
#[cfg_attr(miri, ignore)]
764
async fn run_wasm_in_call_async() -> Result<()> {
765
_ = env_logger::try_init();
766
767
let mut config = Config::new();
768
config.wasm_component_model_async(true);
769
let engine = Engine::new(&config)?;
770
771
let a = Component::new(
772
&engine,
773
r#"
774
(component
775
(type $t (func async))
776
(import "a" (func $f (type $t)))
777
(core func $f (canon lower (func $f)))
778
(core module $a
779
(import "" "f" (func $f))
780
(func (export "run") call $f)
781
)
782
(core instance $a (instantiate $a
783
(with "" (instance (export "f" (func $f))))
784
))
785
(func (export "run") (type $t)
786
(canon lift (core func $a "run")))
787
)
788
"#,
789
)?;
790
let b = Component::new(
791
&engine,
792
r#"
793
(component
794
(type $t (func async))
795
(core module $a
796
(func (export "run"))
797
)
798
(core instance $a (instantiate $a))
799
(func (export "run") (type $t)
800
(canon lift (core func $a "run")))
801
)
802
"#,
803
)?;
804
805
type State = Option<Instance>;
806
807
let mut linker = Linker::new(&engine);
808
linker
809
.root()
810
.func_wrap_concurrent("a", |accessor: &Accessor<State>, (): ()| {
811
Box::pin(async move {
812
let func = accessor.with(|mut access| {
813
access
814
.get()
815
.unwrap()
816
.get_typed_func::<(), ()>(&mut access, "run")
817
})?;
818
func.call_concurrent(accessor, ()).await?;
819
Ok(())
820
})
821
})?;
822
let mut store = Store::new(&engine, None);
823
let instance_a = linker.instantiate_async(&mut store, &a).await?;
824
let instance_b = linker.instantiate_async(&mut store, &b).await?;
825
*store.data_mut() = Some(instance_b);
826
let run = instance_a.get_typed_func::<(), ()>(&mut store, "run")?;
827
run.call_async(&mut store, ()).await?;
828
Ok(())
829
}
830
831
#[tokio::test]
832
#[cfg_attr(miri, ignore)]
833
async fn require_concurrency_support() -> Result<()> {
834
let mut config = Config::new();
835
config.concurrency_support(false);
836
let engine = Engine::new(&config)?;
837
838
let mut store = Store::new(&engine, ());
839
840
assert!(
841
store
842
.run_concurrent(async |_| wasmtime::error::Ok(()))
843
.await
844
.is_err()
845
);
846
847
let ok = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
848
StreamReader::<u32>::new(&mut store, Vec::new());
849
}));
850
assert!(ok.is_err());
851
852
let ok = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
853
FutureReader::new(&mut store, async { wasmtime::error::Ok(0) })
854
}));
855
assert!(ok.is_err());
856
857
let mut linker = Linker::<()>::new(&engine);
858
let mut root = linker.root();
859
860
assert!(
861
root.func_wrap_concurrent::<(), (), _>("f1", |_, _| { todo!() })
862
.is_err()
863
);
864
assert!(
865
root.func_new_concurrent("f2", |_, _, _, _| { todo!() })
866
.is_err()
867
);
868
assert!(
869
root.resource_concurrent("f3", ResourceType::host::<u32>(), |_, _| { todo!() })
870
.is_err()
871
);
872
873
Ok(())
874
}
875
876
#[tokio::test]
877
#[cfg_attr(miri, ignore)]
878
async fn cancel_host_task_does_not_leak() -> Result<()> {
879
let mut config = Config::new();
880
config.wasm_component_model_async(true);
881
let engine = Engine::new(&config)?;
882
883
let mut store = Store::new(&engine, ());
884
let component = Component::new(
885
&engine,
886
r#"(component
887
(import "f" (func $f async))
888
889
(core module $m
890
(import "" "f" (func $f (result i32)))
891
(import "" "cancel" (func $cancel (param i32) (result i32)))
892
(import "" "drop" (func $drop (param i32)))
893
(func (export "run")
894
(local i32)
895
896
;; start the subtask, asserting it's `STARTED`
897
call $f
898
local.tee 0
899
i32.const 0xf
900
i32.and
901
i32.const 1 ;; STARTED
902
i32.ne
903
if unreachable end
904
905
;; extract the task id
906
local.get 0
907
i32.const 4
908
i32.shr_u
909
local.set 0
910
911
;; cancel the subtask asserting it's `RETURN_CANCELLED`
912
local.get 0
913
call $cancel
914
i32.const 4 ;; RETURN_CANCELLED
915
i32.ne
916
if unreachable end
917
918
;; drop the subtask
919
local.get 0
920
call $drop
921
)
922
)
923
(core func $f (canon lower (func $f) async))
924
(core func $cancel (canon subtask.cancel))
925
(core func $drop (canon subtask.drop))
926
(core instance $i (instantiate $m
927
(with "" (instance
928
(export "f" (func $f))
929
(export "cancel" (func $cancel))
930
(export "drop" (func $drop))
931
))
932
))
933
934
(func (export "f") async
935
(canon lift (core func $i "run")))
936
937
938
)"#,
939
)?;
940
941
let mut linker = Linker::new(&engine);
942
linker.root().func_wrap_concurrent("f", |_, ()| {
943
Box::pin(async move {
944
std::future::pending::<()>().await;
945
Ok(())
946
})
947
})?;
948
let instance = linker.instantiate_async(&mut store, &component).await?;
949
let func = instance.get_typed_func::<(), ()>(&mut store, "f")?;
950
store
951
.run_concurrent(async |store| -> wasmtime::Result<()> {
952
func.call_concurrent(store, ()).await?;
953
954
for _ in 0..5 {
955
tokio::task::yield_now().await;
956
}
957
Ok(())
958
})
959
.await??;
960
961
// The host task was cancelled, nothing should remain.
962
store.assert_concurrent_state_empty();
963
964
Ok(())
965
}
966
967
#[tokio::test]
968
#[cfg_attr(miri, ignore)]
969
async fn sync_lower_async_host_does_not_leak() -> Result<()> {
970
let mut config = Config::new();
971
config.wasm_component_model_async(true);
972
let engine = Engine::new(&config)?;
973
974
let mut store = Store::new(&engine, 0);
975
let component = Component::new(
976
&engine,
977
r#"(component
978
(import "f" (func $f async))
979
980
(core module $m
981
(import "" "f" (func $f))
982
(func (export "run")
983
(local $c i32)
984
985
;; call the host 100 times
986
loop
987
call $f
988
(local.tee $c (i32.add (local.get $c) (i32.const 1)))
989
i32.const 100
990
i32.ne
991
if br 1 end
992
end
993
)
994
)
995
(core func $f (canon lower (func $f) ))
996
(core instance $i (instantiate $m
997
(with "" (instance
998
(export "f" (func $f))
999
))
1000
))
1001
1002
(func (export "f") async
1003
(canon lift (core func $i "run")))
1004
1005
1006
)"#,
1007
)?;
1008
1009
let mut linker = Linker::<usize>::new(&engine);
1010
linker
1011
.root()
1012
.func_wrap_concurrent("f", |accessor, (): ()| {
1013
Box::pin(async move {
1014
// Ensure that this doesn't hit the fast path of "ready on
1015
// first poll"
1016
for _ in 0..5 {
1017
tokio::task::yield_now().await;
1018
}
1019
1020
// Keep track of the maximum size of the table in
1021
// concurrent_state.
1022
accessor.with(|mut s| {
1023
let cur = s.as_context_mut().concurrent_state_table_size();
1024
let max = s.data_mut();
1025
*max = (*max).max(cur);
1026
});
1027
Ok(())
1028
})
1029
})?;
1030
let instance = linker.instantiate_async(&mut store, &component).await?;
1031
let func = instance.get_typed_func::<(), ()>(&mut store, "f")?;
1032
func.call_async(&mut store, ()).await?;
1033
1034
// First-level assertion: nothing should remain after the guest has exited.
1035
store.assert_concurrent_state_empty();
1036
1037
// Second-level assertion: state should be incrementally cleaned up along
1038
// the way as the guest calls the host. Things shouldn't leak until the
1039
// guest exits at the end, for example.
1040
assert!(
1041
*store.data() < 100,
1042
"the store peaked at over 100 items in the concurrent table which \
1043
indicates that something isn't getting cleaned up between executions \
1044
of the host"
1045
);
1046
1047
Ok(())
1048
}
1049
1050