Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/misc/component-async-tests/tests/scenario/round_trip.rs
1693 views
1
use std::sync::{
2
Arc, Mutex,
3
atomic::{AtomicU32, Ordering::Relaxed},
4
};
5
use std::time::Duration;
6
7
use super::util::{config, make_component};
8
use anyhow::{Result, anyhow};
9
use component_async_tests::Ctx;
10
use component_async_tests::util::sleep;
11
use futures::{
12
FutureExt,
13
channel::oneshot,
14
stream::{FuturesUnordered, TryStreamExt},
15
};
16
use wasmtime::component::{Accessor, AccessorTask, HasSelf, Instance, Linker, ResourceTable, Val};
17
use wasmtime::{Engine, Store};
18
use wasmtime_wasi::WasiCtxBuilder;
19
20
#[tokio::test]
21
pub async fn async_round_trip_stackful() -> Result<()> {
22
test_round_trip_uncomposed(test_programs_artifacts::ASYNC_ROUND_TRIP_STACKFUL_COMPONENT).await
23
}
24
25
#[tokio::test]
26
pub async fn async_round_trip_synchronous() -> Result<()> {
27
test_round_trip_uncomposed(test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT)
28
.await
29
}
30
31
#[tokio::test]
32
pub async fn async_round_trip_wait() -> Result<()> {
33
test_round_trip_uncomposed(test_programs_artifacts::ASYNC_ROUND_TRIP_WAIT_COMPONENT).await
34
}
35
36
#[tokio::test]
37
pub async fn async_round_trip_stackless_plus_stackless() -> Result<()> {
38
test_round_trip_composed(
39
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
40
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
41
)
42
.await
43
}
44
45
#[tokio::test]
46
async fn async_round_trip_synchronous_plus_stackless() -> Result<()> {
47
test_round_trip_composed(
48
test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT,
49
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
50
)
51
.await
52
}
53
54
#[tokio::test]
55
async fn async_round_trip_stackless_plus_synchronous() -> Result<()> {
56
test_round_trip_composed(
57
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
58
test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT,
59
)
60
.await
61
}
62
63
#[tokio::test]
64
async fn async_round_trip_synchronous_plus_synchronous() -> Result<()> {
65
test_round_trip_composed(
66
test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT,
67
test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT,
68
)
69
.await
70
}
71
72
#[tokio::test]
73
async fn async_round_trip_wait_plus_wait() -> Result<()> {
74
test_round_trip_composed(
75
test_programs_artifacts::ASYNC_ROUND_TRIP_WAIT_COMPONENT,
76
test_programs_artifacts::ASYNC_ROUND_TRIP_WAIT_COMPONENT,
77
)
78
.await
79
}
80
81
#[tokio::test]
82
async fn async_round_trip_synchronous_plus_wait() -> Result<()> {
83
test_round_trip_composed(
84
test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT,
85
test_programs_artifacts::ASYNC_ROUND_TRIP_WAIT_COMPONENT,
86
)
87
.await
88
}
89
90
#[tokio::test]
91
async fn async_round_trip_wait_plus_synchronous() -> Result<()> {
92
test_round_trip_composed(
93
test_programs_artifacts::ASYNC_ROUND_TRIP_WAIT_COMPONENT,
94
test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT,
95
)
96
.await
97
}
98
99
#[tokio::test]
100
async fn async_round_trip_stackless_plus_wait() -> Result<()> {
101
test_round_trip_composed(
102
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
103
test_programs_artifacts::ASYNC_ROUND_TRIP_WAIT_COMPONENT,
104
)
105
.await
106
}
107
108
#[tokio::test]
109
async fn async_round_trip_wait_plus_stackless() -> Result<()> {
110
test_round_trip_composed(
111
test_programs_artifacts::ASYNC_ROUND_TRIP_WAIT_COMPONENT,
112
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
113
)
114
.await
115
}
116
117
#[tokio::test]
118
async fn async_round_trip_stackful_plus_stackful() -> Result<()> {
119
test_round_trip_composed(
120
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKFUL_COMPONENT,
121
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKFUL_COMPONENT,
122
)
123
.await
124
}
125
126
#[tokio::test]
127
async fn async_round_trip_stackful_plus_stackless() -> Result<()> {
128
test_round_trip_composed(
129
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKFUL_COMPONENT,
130
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
131
)
132
.await
133
}
134
135
#[tokio::test]
136
async fn async_round_trip_stackless_plus_stackful() -> Result<()> {
137
test_round_trip_composed(
138
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
139
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKFUL_COMPONENT,
140
)
141
.await
142
}
143
144
#[tokio::test]
145
async fn async_round_trip_synchronous_plus_stackful() -> Result<()> {
146
test_round_trip_composed(
147
test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT,
148
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKFUL_COMPONENT,
149
)
150
.await
151
}
152
153
#[tokio::test]
154
async fn async_round_trip_stackful_plus_synchronous() -> Result<()> {
155
test_round_trip_composed(
156
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKFUL_COMPONENT,
157
test_programs_artifacts::ASYNC_ROUND_TRIP_SYNCHRONOUS_COMPONENT,
158
)
159
.await
160
}
161
162
#[tokio::test]
163
pub async fn async_round_trip_stackless() -> Result<()> {
164
test_round_trip_uncomposed(test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT).await
165
}
166
167
#[tokio::test]
168
pub async fn async_round_trip_stackless_joined() -> Result<()> {
169
tokio::join!(
170
async {
171
test_round_trip_uncomposed(
172
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
173
)
174
.await
175
.unwrap()
176
},
177
async {
178
test_round_trip_uncomposed(
179
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_COMPONENT,
180
)
181
.await
182
.unwrap()
183
},
184
);
185
186
Ok(())
187
}
188
189
#[tokio::test]
190
pub async fn async_round_trip_stackless_sync_import() -> Result<()> {
191
test_round_trip_uncomposed(
192
test_programs_artifacts::ASYNC_ROUND_TRIP_STACKLESS_SYNC_IMPORT_COMPONENT,
193
)
194
.await
195
}
196
197
pub async fn test_round_trip(
198
components: &[&str],
199
inputs_and_outputs: &[(&str, &str)],
200
) -> Result<()> {
201
let engine = Engine::new(&config())?;
202
203
let make_store = || {
204
Store::new(
205
&engine,
206
Ctx {
207
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
208
table: ResourceTable::default(),
209
continue_: false,
210
wakers: Arc::new(Mutex::new(None)),
211
},
212
)
213
};
214
215
let component = make_component(&engine, components).await?;
216
217
// On miri, we only use one call style per test since they take so long to
218
// run. On non-miri, we use every call style for each test.
219
static CALL_STYLE_COUNTER: AtomicU32 = AtomicU32::new(0);
220
let call_style = CALL_STYLE_COUNTER.fetch_add(1, Relaxed) % 5;
221
222
// First, test the `wasmtime-wit-bindgen` static API:
223
{
224
let mut linker = Linker::new(&engine);
225
226
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
227
component_async_tests::round_trip::bindings::local::local::baz::add_to_linker::<_, Ctx>(
228
&mut linker,
229
|ctx| ctx,
230
)?;
231
232
let mut store = make_store();
233
234
let instance = linker.instantiate_async(&mut store, &component).await?;
235
let round_trip =
236
component_async_tests::round_trip::bindings::RoundTrip::new(&mut store, &instance)?;
237
238
if call_style == 0 || !cfg!(miri) {
239
// Run the test using `Instance::run_concurrent`:
240
instance
241
.run_concurrent(&mut store, {
242
let inputs_and_outputs = inputs_and_outputs
243
.iter()
244
.map(|(a, b)| (String::from(*a), String::from(*b)))
245
.collect::<Vec<_>>();
246
247
async move |accessor| {
248
let mut futures = FuturesUnordered::new();
249
for (input, output) in &inputs_and_outputs {
250
let output = output.clone();
251
futures.push(
252
round_trip
253
.local_local_baz()
254
.call_foo(accessor, input.clone())
255
.map(move |v| v.map(move |v| (v, output)))
256
.boxed(),
257
);
258
}
259
260
while let Some((actual, expected)) = futures.try_next().await? {
261
assert_eq!(expected, actual);
262
}
263
264
Ok::<_, wasmtime::Error>(())
265
}
266
})
267
.await??;
268
269
instance.assert_concurrent_state_empty(&mut store);
270
}
271
272
if call_style == 1 || !cfg!(miri) {
273
// And again using `Instance::spawn`:
274
struct Task {
275
instance: Instance,
276
inputs_and_outputs: Vec<(String, String)>,
277
tx: oneshot::Sender<()>,
278
}
279
280
impl AccessorTask<Ctx, HasSelf<Ctx>, Result<()>> for Task {
281
async fn run(self, accessor: &Accessor<Ctx>) -> Result<()> {
282
let round_trip = accessor.with(|mut store| {
283
component_async_tests::round_trip::bindings::RoundTrip::new(
284
&mut store,
285
&self.instance,
286
)
287
})?;
288
289
let mut futures = FuturesUnordered::new();
290
for (input, output) in &self.inputs_and_outputs {
291
let output = output.clone();
292
futures.push(
293
round_trip
294
.local_local_baz()
295
.call_foo(accessor, input.clone())
296
.map(move |v| v.map(move |v| (v, output)))
297
.boxed(),
298
);
299
}
300
301
while let Some((actual, expected)) = futures.try_next().await? {
302
assert_eq!(expected, actual);
303
}
304
305
_ = self.tx.send(());
306
307
Ok(())
308
}
309
}
310
311
let (tx, rx) = oneshot::channel();
312
instance.spawn(
313
&mut store,
314
Task {
315
instance,
316
inputs_and_outputs: inputs_and_outputs
317
.iter()
318
.map(|(a, b)| (String::from(*a), String::from(*b)))
319
.collect::<Vec<_>>(),
320
tx,
321
},
322
);
323
324
instance
325
.run_concurrent(&mut store, async |_| rx.await)
326
.await??;
327
328
instance.assert_concurrent_state_empty(&mut store);
329
}
330
331
if call_style == 2 || !cfg!(miri) {
332
// And again using `TypedFunc::call_async`-based bindings:
333
let round_trip =
334
component_async_tests::round_trip::non_concurrent_export_bindings::RoundTrip::new(
335
&mut store, &instance,
336
)?;
337
338
for (input, expected) in inputs_and_outputs {
339
assert_eq!(
340
*expected,
341
&round_trip
342
.local_local_baz()
343
.call_foo(&mut store, input)
344
.await?
345
);
346
}
347
348
instance.assert_concurrent_state_empty(&mut store);
349
}
350
}
351
352
// Now do it again using the dynamic API (except for WASI, where we stick with the static API):
353
{
354
let mut linker = Linker::new(&engine);
355
356
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
357
linker
358
.root()
359
.instance("local:local/baz")?
360
.func_new_concurrent("[async]foo", |_, params, results| {
361
Box::pin(async move {
362
sleep(Duration::from_millis(10)).await;
363
let Some(Val::String(s)) = params.into_iter().next() else {
364
unreachable!()
365
};
366
results[0] = Val::String(format!("{s} - entered host - exited host"));
367
Ok(())
368
})
369
})?;
370
371
let mut store = make_store();
372
373
let instance = linker.instantiate_async(&mut store, &component).await?;
374
let baz_instance = instance
375
.get_export_index(&mut store, None, "local:local/baz")
376
.ok_or_else(|| anyhow!("can't find `local:local/baz` in instance"))?;
377
let foo_function = instance
378
.get_export_index(&mut store, Some(&baz_instance), "[async]foo")
379
.ok_or_else(|| anyhow!("can't find `foo` in instance"))?;
380
let foo_function = instance
381
.get_func(&mut store, foo_function)
382
.ok_or_else(|| anyhow!("can't find `foo` in instance"))?;
383
384
if call_style == 3 || !cfg!(miri) {
385
instance
386
.run_concurrent(&mut store, async |store| {
387
// Start three concurrent calls and then join them all:
388
let mut futures = FuturesUnordered::new();
389
for (input, output) in inputs_and_outputs {
390
let output = (*output).to_owned();
391
futures.push(async move {
392
let mut results = vec![Val::Bool(false)];
393
foo_function
394
.call_concurrent(
395
store,
396
&[Val::String((*input).to_owned())],
397
&mut results,
398
)
399
.await?;
400
anyhow::Ok((results, output))
401
});
402
}
403
404
while let Some((actual, expected)) = futures.try_next().await? {
405
let Some(Val::String(actual)) = actual.into_iter().next() else {
406
unreachable!()
407
};
408
assert_eq!(expected, actual);
409
}
410
anyhow::Ok(())
411
})
412
.await??;
413
414
instance.assert_concurrent_state_empty(&mut store);
415
}
416
417
if call_style == 4 || !cfg!(miri) {
418
// Now do it again using `Func::call_async`:
419
for (input, expected) in inputs_and_outputs {
420
let mut results = [Val::Bool(false)];
421
foo_function
422
.call_async(
423
&mut store,
424
&[Val::String((*input).to_owned())],
425
&mut results,
426
)
427
.await?;
428
let Val::String(actual) = &results[0] else {
429
unreachable!()
430
};
431
assert_eq!(*expected, actual);
432
foo_function.post_return_async(&mut store).await?;
433
}
434
435
instance.assert_concurrent_state_empty(&mut store);
436
}
437
}
438
439
Ok(())
440
}
441
442
pub async fn test_round_trip_uncomposed(component: &str) -> Result<()> {
443
test_round_trip(
444
&[component],
445
&[
446
(
447
"hello, world!",
448
"hello, world! - entered guest - entered host - exited host - exited guest",
449
),
450
(
451
"¡hola, mundo!",
452
"¡hola, mundo! - entered guest - entered host - exited host - exited guest",
453
),
454
(
455
"hi y'all!",
456
"hi y'all! - entered guest - entered host - exited host - exited guest",
457
),
458
],
459
)
460
.await
461
}
462
463
pub async fn test_round_trip_composed(a: &str, b: &str) -> Result<()> {
464
test_round_trip(
465
&[a, b],
466
&[
467
(
468
"hello, world!",
469
"hello, world! - entered guest - entered guest - entered host \
470
- exited host - exited guest - exited guest",
471
),
472
(
473
"¡hola, mundo!",
474
"¡hola, mundo! - entered guest - entered guest - entered host \
475
- exited host - exited guest - exited guest",
476
),
477
(
478
"hi y'all!",
479
"hi y'all! - entered guest - entered guest - entered host \
480
- exited host - exited guest - exited guest",
481
),
482
],
483
)
484
.await
485
}
486
487