Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/fuzzing/src/oracles/component_async.rs
3061 views
1
//! For a high-level overview of this fuzz target see `fuzz_async.rs`
2
3
use crate::block_on;
4
use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest;
5
use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command};
6
use crate::generators::component_async::wasmtime_fuzz::fuzz::types;
7
use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope};
8
use futures::channel::oneshot;
9
use std::collections::{HashMap, HashSet};
10
use std::mem;
11
use std::pin::Pin;
12
use std::sync::{Arc, OnceLock, Weak};
13
use std::task::{Context, Poll, Waker};
14
use std::time::Instant;
15
use wasmtime::component::{
16
Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer,
17
FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer,
18
StreamReader, StreamResult, VecBuffer,
19
};
20
use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut};
21
use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};
22
23
static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new();
24
25
/// Initializes state for future fuzz runs.
26
///
27
/// This will create an `Engine` to run this fuzzer within and it will
28
/// additionally precompile the component that will be used for fuzzing.
29
///
30
/// There are a few points of note about this:
31
///
32
/// * The `misc` fuzzer is manually instrumented with this function as the init
33
/// hook to ensure this runs before any other fuzzing.
34
///
35
/// * Compilation of the component takes quite some time with
36
/// fuzzing-instrumented Cranelift. To assist with local development this
37
/// implements a cache which is serialized/deserialized via an env var.
38
pub fn init() {
39
crate::init_fuzzing();
40
41
STATE.get_or_init(|| {
42
let mut config = Config::new();
43
config.wasm_component_model_async(true);
44
let engine = Engine::new(&config).unwrap();
45
let component = compile(&engine);
46
let mut linker = Linker::new(&engine);
47
wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();
48
async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
49
types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();
50
51
let pre = linker.instantiate_pre(&component).unwrap();
52
let pre = FuzzAsyncPre::new(pre).unwrap();
53
54
(engine, pre)
55
});
56
57
fn compile(engine: &Engine) -> Component {
58
let wasm_path = test_programs_artifacts::FUZZ_ASYNC_COMPONENT;
59
let wasm = test_programs_artifacts::fuzz_async_component_bytes!();
60
let wasm = &wasm[..];
61
let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok();
62
if let Some(path) = &cwasm_cache
63
&& let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified())
64
&& let Ok(wasm_mtime) = std::fs::metadata(wasm_path).and_then(|m| m.modified())
65
&& cwasm_mtime > wasm_mtime
66
{
67
log::debug!("Using cached component async cwasm at {path}");
68
unsafe {
69
return Component::deserialize_file(engine, path).unwrap();
70
}
71
}
72
73
let composition = {
74
let mut config = wasm_compose::config::Config::default();
75
let tempdir = tempfile::TempDir::new().unwrap();
76
let path = tempdir.path().join("fuzz-async.wasm");
77
std::fs::write(&path, wasm).unwrap();
78
config.definitions.push(path.clone());
79
80
wasm_compose::composer::ComponentComposer::new(&path, &config)
81
.compose()
82
.unwrap()
83
};
84
let start = Instant::now();
85
let component = Component::new(&engine, &composition).unwrap();
86
if let Some(path) = cwasm_cache {
87
log::debug!("Caching component async cwasm to {path}");
88
std::fs::write(path, &component.serialize().unwrap()).unwrap();
89
} else if start.elapsed() > std::time::Duration::from_secs(1) {
90
eprintln!(
91
"
92
!!!!!!!!!!!!!!!!!!!!!!!!!!
93
94
Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to
95
cache compilation results
96
97
!!!!!!!!!!!!!!!!!!!!!!!!!!
98
"
99
);
100
}
101
return component;
102
}
103
}
104
105
#[derive(Default)]
106
struct Data {
107
ctx: WasiCtx,
108
table: ResourceTable,
109
wakers: HashMap<Scope, Waker>,
110
commands: Vec<(Scope, Command)>,
111
112
guest_caller_stream: Option<StreamReader<Command>>,
113
guest_callee_stream: Option<StreamReader<Command>>,
114
115
host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>,
116
host_pending_async_calls_cancelled: HashSet<u32>,
117
guest_pending_async_calls_ready: HashSet<u32>,
118
119
// State of futures/streams. Note that while #12091 is unresolved an
120
// `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams
121
// and the various halves we're interacting with using traits.
122
host_futures: HashMap<u32, FutureReader<u32>>,
123
host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>,
124
host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>,
125
host_streams: HashMap<u32, StreamReader<u32>>,
126
host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>,
127
host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>,
128
}
129
130
impl WasiView for Data {
131
fn ctx(&mut self) -> WasiCtxView<'_> {
132
WasiCtxView {
133
ctx: &mut self.ctx,
134
table: &mut self.table,
135
}
136
}
137
}
138
139
impl async_test::HostWithStore for HasSelf<Data> {
140
async fn async_ready<T>(_store: &Accessor<T, Self>) {}
141
142
async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) {
143
let (tx, rx) = oneshot::channel();
144
store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx));
145
let record = RecordCancelOnDrop { store, id };
146
rx.await.unwrap();
147
mem::forget(record);
148
149
struct RecordCancelOnDrop<'a, T: 'static> {
150
store: &'a Accessor<T, HasSelf<Data>>,
151
id: u32,
152
}
153
154
impl<T> Drop for RecordCancelOnDrop<'_, T> {
155
fn drop(&mut self) {
156
self.store.with(|mut s| {
157
s.get().host_pending_async_calls_cancelled.insert(self.id);
158
});
159
}
160
}
161
}
162
163
async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {}
164
}
165
166
impl async_test::Host for Data {
167
fn sync_ready(&mut self) {}
168
169
fn future_take(&mut self, id: u32) -> FutureReader<u32> {
170
self.host_futures.remove(&id).unwrap()
171
}
172
173
fn future_receive(&mut self, id: u32, future: FutureReader<u32>) {
174
let prev = self.host_futures.insert(id, future);
175
assert!(prev.is_none());
176
}
177
178
fn stream_take(&mut self, id: u32) -> StreamReader<u32> {
179
self.host_streams.remove(&id).unwrap()
180
}
181
182
fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) {
183
let prev = self.host_streams.insert(id, stream);
184
assert!(prev.is_none());
185
}
186
}
187
188
impl types::HostWithStore for HasSelf<Data> {
189
fn get_commands<T>(
190
mut store: Access<'_, T, Self>,
191
scope: types::Scope,
192
) -> StreamReader<Command> {
193
let data = store.get();
194
match scope {
195
types::Scope::Caller => data.guest_caller_stream.take().unwrap(),
196
types::Scope::Callee => data.guest_callee_stream.take().unwrap(),
197
}
198
}
199
}
200
201
impl types::Host for Data {}
202
203
/// Executes the `input` provided, assuming that `init` has been previously
204
/// executed.
205
pub fn run(mut input: ComponentAsync) {
206
log::debug!("Running component async fuzz test with\n{input:?}");
207
208
// Commands are executed in the order that they're listed in the input, but
209
// to make it easier on the `StreamProducer` implementation below they're
210
// popped off the back. To ensure that they're all delivered in the right
211
// order reverse the list to ensure the correct order is maintained.
212
input.commands.reverse();
213
214
let (engine, pre) = STATE.get().unwrap();
215
let mut store = Store::new(
216
engine,
217
Data {
218
ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(),
219
commands: input.commands,
220
..Data::default()
221
},
222
);
223
224
let guest_caller_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCaller));
225
let guest_callee_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCallee));
226
store.data_mut().guest_caller_stream = Some(guest_caller_stream);
227
store.data_mut().guest_callee_stream = Some(guest_callee_stream);
228
block_on(async {
229
let instance = pre.instantiate_async(&mut store).await.unwrap();
230
let test = instance.wasmtime_fuzz_fuzz_async_test();
231
232
let mut host_caller = SharedStream(Scope::HostCaller);
233
let mut host_callee = SharedStream(Scope::HostCallee);
234
store
235
.run_concurrent(async |store| {
236
// Kick off stream reads in the guest. This function will return
237
// but the tasks in the guest will keep running after they
238
// return to process stream items.
239
test.call_init(store, types::Scope::Caller).await.unwrap();
240
241
// Simultaneously process commands from both host streams. These
242
// will return once the entire command queue is exhausted.
243
futures::join!(
244
async {
245
while let Some(cmd) = host_caller.next(store).await {
246
host_caller_cmd(&test, store, cmd).await;
247
}
248
},
249
async {
250
while let Some(cmd) = host_callee.next(store).await {
251
host_callee_cmd(store, cmd).await;
252
}
253
},
254
);
255
256
// Note that there may still be pending async work in the guest
257
// (or host). It's intentional that it's not cleaned up here to
258
// help test situations where async work is all abruptly
259
// cancelled by just being dropped in the host.
260
})
261
.await
262
.unwrap();
263
});
264
}
265
266
/// See documentation in `fuzz_async.rs` for what's going on here.
267
async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool
268
where
269
F: FnMut(&mut Data) -> bool,
270
{
271
for _ in 0..1000 {
272
let ready = store.with(|mut s| f(s.get()));
273
if ready {
274
return true;
275
}
276
277
crate::YieldN(1).await;
278
}
279
280
return false;
281
}
282
283
async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F)
284
where
285
F: FnMut(&mut Data) -> bool,
286
{
287
assert!(
288
test_property(store, f).await,
289
"timed out waiting for {desc}",
290
);
291
}
292
293
async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) {
294
match cmd {
295
Command::Ack => {}
296
Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(),
297
Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(),
298
Command::AsyncPendingExportComplete(_i) => todo!(),
299
Command::AsyncPendingExportAssertCancelled(_i) => todo!(),
300
Command::AsyncPendingImportCall(i) => {
301
struct RunPendingImport {
302
test: Guest,
303
i: u32,
304
}
305
306
store.spawn(RunPendingImport {
307
test: test.clone(),
308
i,
309
});
310
311
impl AccessorTask<Data> for RunPendingImport {
312
async fn run(self, store: &Accessor<Data>) -> Result<()> {
313
self.test.call_async_pending(store, self.i).await?;
314
store.with(|mut s| {
315
s.get().guest_pending_async_calls_ready.insert(self.i);
316
});
317
Ok(())
318
}
319
}
320
}
321
Command::AsyncPendingImportCancel(_i) => todo!(),
322
Command::AsyncPendingImportAssertReady(i) => {
323
assert!(
324
test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await,
325
"expected async_pending import {i} to be ready",
326
);
327
}
328
329
Command::FutureTake(i) => {
330
let future = test.call_future_take(store, i).await.unwrap();
331
store.with(|mut s| {
332
let prev = s.get().host_futures.insert(i, future);
333
assert!(prev.is_none());
334
});
335
}
336
Command::FutureGive(i) => {
337
let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap());
338
test.call_future_receive(store, i, future).await.unwrap();
339
}
340
Command::StreamTake(i) => {
341
let stream = test.call_stream_take(store, i).await.unwrap();
342
store.with(|mut s| {
343
let prev = s.get().host_streams.insert(i, stream);
344
assert!(prev.is_none());
345
});
346
}
347
Command::StreamGive(i) => {
348
let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap());
349
test.call_stream_receive(store, i, stream).await.unwrap();
350
}
351
352
other => future_or_stream_cmd(store, other).await,
353
}
354
}
355
356
async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) {
357
match cmd {
358
Command::Ack => {}
359
Command::SyncReadyCall => todo!(),
360
Command::AsyncReadyCall => todo!(),
361
Command::AsyncPendingExportComplete(i) => store.with(|mut s| {
362
s.get()
363
.host_pending_async_calls
364
.remove(&i)
365
.unwrap()
366
.send(())
367
.unwrap();
368
}),
369
Command::AsyncPendingExportAssertCancelled(i) => {
370
assert!(
371
test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await,
372
"expected async_pending export {i} to be cancelled",
373
);
374
}
375
Command::AsyncPendingImportCall(_i) => todo!(),
376
Command::AsyncPendingImportCancel(_i) => todo!(),
377
Command::AsyncPendingImportAssertReady(_i) => todo!(),
378
379
other => future_or_stream_cmd(store, other).await,
380
}
381
}
382
383
async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) {
384
match cmd {
385
// These commands should be handled above
386
Command::Ack
387
| Command::SyncReadyCall
388
| Command::AsyncReadyCall
389
| Command::AsyncPendingExportComplete(_)
390
| Command::AsyncPendingExportAssertCancelled(_)
391
| Command::AsyncPendingImportCall(_)
392
| Command::AsyncPendingImportCancel(_)
393
| Command::FutureTake(_)
394
| Command::FutureGive(_)
395
| Command::StreamTake(_)
396
| Command::StreamGive(_)
397
| Command::AsyncPendingImportAssertReady(_) => unreachable!(),
398
399
Command::FutureNew(id) => {
400
store.with(|mut s| {
401
let arc = Arc::new(());
402
let weak = Arc::downgrade(&arc);
403
let future = FutureReader::new(&mut s, HostFutureProducer(id, arc));
404
let data = s.get();
405
let prev = data.host_futures.insert(id, future);
406
assert!(prev.is_none());
407
let prev = data
408
.host_future_producers
409
.insert(id, (HostFutureProducerState::Idle, weak));
410
assert!(prev.is_none());
411
});
412
}
413
Command::FutureDropReadable(id) => {
414
store.with(|mut s| match s.get().host_futures.remove(&id) {
415
Some(mut future) => future.close(&mut s),
416
None => {
417
let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap();
418
state.wake_by_ref();
419
}
420
})
421
}
422
Command::FutureWriteReady(payload) => {
423
await_property(store, "future write should be waiting", |s| {
424
matches!(
425
s.host_future_producers.get(&payload.future),
426
Some((HostFutureProducerState::Waiting(_), _))
427
)
428
})
429
.await;
430
store.with(|mut s| {
431
let state = s
432
.get()
433
.host_future_producers
434
.get_mut(&payload.future)
435
.unwrap();
436
match state {
437
(HostFutureProducerState::Waiting(waker), _) => {
438
waker.wake_by_ref();
439
state.0 = HostFutureProducerState::Writing(payload.item);
440
}
441
(state, _) => panic!("future not waiting: {state:?}"),
442
}
443
})
444
}
445
Command::FutureWritePending(payload) => store.with(|mut s| {
446
let state = s
447
.get()
448
.host_future_producers
449
.get_mut(&payload.future)
450
.unwrap();
451
match state {
452
(HostFutureProducerState::Idle, _) => {
453
state.0 = HostFutureProducerState::Writing(payload.item);
454
}
455
_ => panic!("future not idle"),
456
}
457
}),
458
Command::FutureWriteDropped(id) => store.with(|mut s| {
459
let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
460
assert!(matches!(state, HostFutureProducerState::Idle));
461
assert!(weak.upgrade().is_none());
462
}),
463
Command::FutureReadReady(payload) => {
464
let id = payload.future;
465
store.with(|mut s| {
466
let arc = Arc::new(());
467
let weak = Arc::downgrade(&arc);
468
let data = s.get();
469
let future = data.host_futures.remove(&id).unwrap();
470
let prev = data
471
.host_future_consumers
472
.insert(id, (HostFutureConsumerState::Consuming, weak));
473
assert!(prev.is_none());
474
future.pipe(&mut s, HostFutureConsumer(id, arc));
475
});
476
477
await_property(store, "future should be present", |s| {
478
matches!(
479
s.host_future_consumers[&id],
480
(HostFutureConsumerState::Complete(_), _)
481
)
482
})
483
.await;
484
485
store.with(|mut s| {
486
let (state, _) = s.get().host_future_consumers.remove(&id).unwrap();
487
match state {
488
HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
489
_ => panic!("future not complete"),
490
}
491
});
492
}
493
Command::FutureReadPending(id) => {
494
ensure_future_reading(store, id);
495
store.with(|mut s| {
496
let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
497
state.wake_by_ref();
498
assert!(
499
matches!(state, HostFutureConsumerState::Idle),
500
"bad state: {state:?}",
501
);
502
*state = HostFutureConsumerState::Consuming;
503
})
504
}
505
Command::FutureCancelWrite(id) => store.with(|mut s| {
506
let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap();
507
assert!(matches!(state, HostFutureProducerState::Writing(_)));
508
*state = HostFutureProducerState::Idle;
509
}),
510
Command::FutureCancelRead(id) => store.with(|mut s| {
511
let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();
512
assert!(matches!(state, HostFutureConsumerState::Consuming));
513
*state = HostFutureConsumerState::Idle;
514
}),
515
Command::FutureReadAssertComplete(payload) => {
516
await_property(store, "future read should be complete", |s| {
517
matches!(
518
s.host_future_consumers.get(&payload.future),
519
Some((HostFutureConsumerState::Complete(_), _))
520
)
521
})
522
.await;
523
store.with(|mut s| {
524
let (state, _) = s
525
.get()
526
.host_future_consumers
527
.remove(&payload.future)
528
.unwrap();
529
match state {
530
HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),
531
_ => panic!("future not complete"),
532
}
533
})
534
}
535
Command::FutureWriteAssertComplete(id) => store.with(|mut s| {
536
let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
537
assert!(matches!(state, HostFutureProducerState::Complete));
538
assert!(weak.upgrade().is_none());
539
}),
540
Command::FutureWriteAssertDropped(id) => store.with(|mut s| {
541
let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();
542
assert!(matches!(state, HostFutureProducerState::Writing(_)));
543
assert!(weak.upgrade().is_none());
544
}),
545
546
Command::StreamNew(id) => {
547
store.with(|mut s| {
548
let arc = Arc::new(());
549
let weak = Arc::downgrade(&arc);
550
let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc));
551
let data = s.get();
552
let prev = data.host_streams.insert(id, stream);
553
assert!(prev.is_none());
554
let prev = data
555
.host_stream_producers
556
.insert(id, (HostStreamProducerState::idle(), weak));
557
assert!(prev.is_none());
558
});
559
}
560
Command::StreamDropReadable(id) => {
561
store.with(|mut s| match s.get().host_streams.remove(&id) {
562
Some(mut stream) => {
563
stream.close(&mut s);
564
}
565
None => {
566
let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap();
567
state.wake_by_ref();
568
}
569
})
570
}
571
Command::StreamDropWritable(id) => store.with(|mut s| {
572
let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap();
573
state.wake_by_ref();
574
}),
575
Command::StreamWriteReady(payload) => {
576
let id = payload.stream;
577
store.with(|mut s| {
578
let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
579
state.wake_by_ref();
580
match state.kind {
581
HostStreamProducerStateKind::Idle => {
582
state.kind = HostStreamProducerStateKind::Writing(stream_payload(
583
payload.item,
584
payload.op_count,
585
));
586
}
587
_ => panic!("stream not idle: {state:?}"),
588
}
589
});
590
await_property(store, "stream should complete a write", |s| {
591
matches!(
592
s.host_stream_producers[&id].0.kind,
593
HostStreamProducerStateKind::Wrote(_),
594
)
595
})
596
.await;
597
store.with(|mut s| {
598
let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
599
match state.kind {
600
HostStreamProducerStateKind::Wrote(amt) => {
601
assert_eq!(amt, payload.ready_count);
602
state.kind = HostStreamProducerStateKind::Idle;
603
}
604
_ => panic!("stream not idle: {state:?}"),
605
}
606
});
607
}
608
Command::StreamReadReady(payload) => {
609
let id = payload.stream;
610
ensure_stream_reading(store, id);
611
store.with(|mut s| {
612
let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
613
state.wake_by_ref();
614
state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count);
615
});
616
await_property(store, "stream should complete a read", |s| {
617
matches!(
618
s.host_stream_consumers[&id].0.kind,
619
HostStreamConsumerStateKind::Consumed(_),
620
)
621
})
622
.await;
623
624
store.with(|mut s| {
625
let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
626
match &state.kind {
627
HostStreamConsumerStateKind::Consumed(last_read) => {
628
assert_eq!(
629
*last_read,
630
stream_payload(payload.item, payload.ready_count)
631
);
632
state.kind = HostStreamConsumerStateKind::Idle;
633
}
634
_ => panic!("future not complete"),
635
}
636
});
637
}
638
Command::StreamWritePending(payload) => store.with(|mut s| {
639
let (state, _) = s
640
.get()
641
.host_stream_producers
642
.get_mut(&payload.stream)
643
.unwrap();
644
state.wake_by_ref();
645
match state.kind {
646
HostStreamProducerStateKind::Idle => {
647
state.kind = HostStreamProducerStateKind::Writing(stream_payload(
648
payload.item,
649
payload.count,
650
));
651
}
652
_ => panic!("stream not idle {:?}", state.kind),
653
}
654
}),
655
Command::StreamReadPending(payload) => {
656
ensure_stream_reading(store, payload.stream);
657
store.with(|mut s| {
658
let (state, _) = s
659
.get()
660
.host_stream_consumers
661
.get_mut(&payload.stream)
662
.unwrap();
663
state.wake_by_ref();
664
assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
665
state.kind = HostStreamConsumerStateKind::Consuming(payload.count);
666
})
667
}
668
Command::StreamWriteDropped(payload) => store.with(|mut s| {
669
let (state, weak) = s
670
.get()
671
.host_stream_producers
672
.get_mut(&payload.stream)
673
.unwrap();
674
assert!(matches!(state.kind, HostStreamProducerStateKind::Idle));
675
assert!(weak.upgrade().is_none());
676
}),
677
Command::StreamReadDropped(payload) => {
678
ensure_stream_reading(store, payload.stream);
679
await_property(store, "stream read should get dropped", |s| {
680
let weak = &s.host_stream_consumers[&payload.stream].1;
681
weak.upgrade().is_none()
682
})
683
.await;
684
store.with(|mut s| {
685
let (state, weak) = s
686
.get()
687
.host_stream_consumers
688
.get_mut(&payload.stream)
689
.unwrap();
690
assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));
691
assert!(weak.upgrade().is_none());
692
})
693
}
694
Command::StreamCancelWrite(id) => store.with(|mut s| {
695
let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();
696
assert!(
697
matches!(state.kind, HostStreamProducerStateKind::Writing(_)),
698
"invalid state {state:?}",
699
);
700
state.kind = HostStreamProducerStateKind::Idle;
701
state.wake_by_ref();
702
}),
703
Command::StreamCancelRead(id) => store.with(|mut s| {
704
let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();
705
assert!(matches!(
706
state.kind,
707
HostStreamConsumerStateKind::Consuming(_)
708
));
709
state.kind = HostStreamConsumerStateKind::Idle;
710
}),
711
Command::StreamReadAssertComplete(payload) => store.with(|mut s| {
712
let (state, _) = s
713
.get()
714
.host_stream_consumers
715
.get_mut(&payload.stream)
716
.unwrap();
717
match &state.kind {
718
HostStreamConsumerStateKind::Consumed(last_read) => {
719
assert_eq!(*last_read, stream_payload(payload.item, payload.count));
720
state.kind = HostStreamConsumerStateKind::Idle;
721
}
722
_ => panic!("stream not complete"),
723
}
724
}),
725
Command::StreamWriteAssertComplete(payload) => store.with(|mut s| {
726
let (state, _) = s
727
.get()
728
.host_stream_producers
729
.get_mut(&payload.stream)
730
.unwrap();
731
match state.kind {
732
HostStreamProducerStateKind::Wrote(amt) => {
733
assert_eq!(amt, payload.count);
734
state.kind = HostStreamProducerStateKind::Idle;
735
}
736
_ => panic!("stream not complete: {:?}", state.kind),
737
}
738
}),
739
Command::StreamWriteAssertDropped(payload) => {
740
await_property(store, "stream write should be dropped", |s| {
741
let weak = &s.host_stream_producers[&payload.stream].1;
742
weak.upgrade().is_none()
743
})
744
.await;
745
store.with(|mut s| {
746
let (state, weak) = s
747
.get()
748
.host_stream_producers
749
.get_mut(&payload.stream)
750
.unwrap();
751
assert!(matches!(
752
state.kind,
753
HostStreamProducerStateKind::Writing(_)
754
));
755
assert!(weak.upgrade().is_none());
756
})
757
}
758
Command::StreamReadAssertDropped(id) => {
759
await_property(store, "stream read should be dropped", |s| {
760
let weak = &s.host_stream_consumers[&id].1;
761
weak.upgrade().is_none()
762
})
763
.await;
764
store.with(|mut s| {
765
let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap();
766
assert!(matches!(
767
state.kind,
768
HostStreamConsumerStateKind::Consuming(_),
769
));
770
assert!(weak.upgrade().is_none());
771
})
772
}
773
}
774
}
775
776
fn stream_payload(item: u32, count: u32) -> Vec<u32> {
777
(item..item + count).collect()
778
}
779
780
fn ensure_future_reading(store: &Accessor<Data>, id: u32) {
781
store.with(|mut s| {
782
let data = s.get();
783
if !data.host_futures.contains_key(&id) {
784
return;
785
}
786
log::debug!("future consume: start {id}");
787
let arc = Arc::new(());
788
let weak = Arc::downgrade(&arc);
789
let data = s.get();
790
let future = data.host_futures.remove(&id).unwrap();
791
let prev = data
792
.host_future_consumers
793
.insert(id, (HostFutureConsumerState::Idle, weak));
794
assert!(prev.is_none());
795
future.pipe(&mut s, HostFutureConsumer(id, arc));
796
});
797
}
798
799
fn ensure_stream_reading(store: &Accessor<Data>, id: u32) {
800
store.with(|mut s| {
801
let data = s.get();
802
if !data.host_streams.contains_key(&id) {
803
return;
804
}
805
log::debug!("stream consume: start {id}");
806
let arc = Arc::new(());
807
let weak = Arc::downgrade(&arc);
808
let prev = data.host_stream_consumers.insert(
809
id,
810
(
811
HostStreamConsumerState {
812
kind: HostStreamConsumerStateKind::Idle,
813
waker: None,
814
},
815
weak,
816
),
817
);
818
assert!(prev.is_none());
819
let stream = data.host_streams.remove(&id).unwrap();
820
stream.pipe(&mut s, HostStreamConsumer(id, arc));
821
});
822
}
823
824
struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
825
826
/// Note that this is only created once a read is actually initiated on a
827
/// future. It's also not possible to cancel a host-based read on a future,
828
/// hence why this is simpler than the `HostFutureProducerState` state below.
829
#[derive(Debug)]
830
enum HostFutureConsumerState {
831
Idle,
832
Waiting(Waker),
833
Consuming,
834
Complete(u32),
835
}
836
837
impl HostFutureConsumerState {
838
fn wake_by_ref(&mut self) {
839
if let HostFutureConsumerState::Waiting(waker) = &self {
840
waker.wake_by_ref();
841
*self = HostFutureConsumerState::Idle;
842
}
843
}
844
}
845
846
impl FutureConsumer<Data> for HostFutureConsumer {
847
type Item = u32;
848
849
fn poll_consume(
850
self: Pin<&mut Self>,
851
cx: &mut Context<'_>,
852
mut store: StoreContextMut<'_, Data>,
853
mut source: Source<'_, Self::Item>,
854
finish: bool,
855
) -> Poll<Result<()>> {
856
let state = match store.data_mut().host_future_consumers.get_mut(&self.0) {
857
Some(state) => state,
858
None => {
859
log::debug!("consume: closed {}", self.0);
860
return Poll::Ready(Ok(()));
861
}
862
};
863
match state.0 {
864
HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => {
865
if finish {
866
log::debug!("consume: cancel {}", self.0);
867
state.0 = HostFutureConsumerState::Idle;
868
Poll::Ready(Ok(()))
869
} else {
870
log::debug!("consume: wait {}", self.0);
871
state.0 = HostFutureConsumerState::Waiting(cx.waker().clone());
872
Poll::Pending
873
}
874
}
875
HostFutureConsumerState::Consuming => {
876
log::debug!("consume: done {}", self.0);
877
let mut item = None;
878
source.read(&mut store, &mut item).unwrap();
879
store
880
.data_mut()
881
.host_future_consumers
882
.get_mut(&self.0)
883
.unwrap()
884
.0 = HostFutureConsumerState::Complete(item.unwrap());
885
Poll::Ready(Ok(()))
886
}
887
HostFutureConsumerState::Complete(_) => unreachable!(),
888
}
889
}
890
}
891
892
struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
893
894
#[derive(Debug)]
895
enum HostFutureProducerState {
896
Idle,
897
Waiting(Waker),
898
Writing(u32),
899
Complete,
900
}
901
902
impl FutureProducer<Data> for HostFutureProducer {
903
type Item = u32;
904
905
fn poll_produce(
906
self: Pin<&mut Self>,
907
cx: &mut Context<'_>,
908
mut store: StoreContextMut<'_, Data>,
909
finish: bool,
910
) -> Poll<Result<Option<Self::Item>>> {
911
let state = store
912
.data_mut()
913
.host_future_producers
914
.get_mut(&self.0)
915
.unwrap();
916
match state.0 {
917
HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => {
918
if finish {
919
log::debug!("produce: cancel {}", self.0);
920
state.0 = HostFutureProducerState::Idle;
921
Poll::Ready(Ok(None))
922
} else {
923
log::debug!("produce: wait {}", self.0);
924
state.0 = HostFutureProducerState::Waiting(cx.waker().clone());
925
Poll::Pending
926
}
927
}
928
HostFutureProducerState::Writing(item) => {
929
log::debug!("produce: done {}", self.0);
930
state.0 = HostFutureProducerState::Complete;
931
Poll::Ready(Ok(Some(item)))
932
}
933
HostFutureProducerState::Complete => unreachable!(),
934
}
935
}
936
}
937
938
struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
939
940
#[derive(Debug)]
941
struct HostStreamConsumerState {
942
waker: Option<Waker>,
943
kind: HostStreamConsumerStateKind,
944
}
945
946
#[derive(Debug)]
947
enum HostStreamConsumerStateKind {
948
Idle,
949
Consuming(u32),
950
Consumed(Vec<u32>),
951
}
952
953
impl HostStreamConsumerState {
954
fn wake_by_ref(&mut self) {
955
if let Some(waker) = self.waker.take() {
956
waker.wake();
957
}
958
}
959
}
960
961
impl StreamConsumer<Data> for HostStreamConsumer {
962
type Item = u32;
963
964
fn poll_consume(
965
self: Pin<&mut Self>,
966
cx: &mut Context<'_>,
967
mut store: StoreContextMut<'_, Data>,
968
mut source: Source<'_, Self::Item>,
969
finish: bool,
970
) -> Poll<Result<StreamResult>> {
971
let remaining = source.remaining(&mut store);
972
let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) {
973
Some((state, _)) => state,
974
None => {
975
log::debug!("stream consume: dropped {}", self.0);
976
return Poll::Ready(Ok(StreamResult::Dropped));
977
}
978
};
979
match state.kind {
980
HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => {
981
if finish {
982
log::debug!("stream consume: cancel {}", self.0);
983
state.waker = None;
984
Poll::Ready(Ok(StreamResult::Cancelled))
985
} else {
986
log::debug!("stream consume: wait {}", self.0);
987
state.waker = Some(cx.waker().clone());
988
Poll::Pending
989
}
990
}
991
HostStreamConsumerStateKind::Consuming(amt) => {
992
// The writer is performing a zero-length write. We always
993
// complete that without updating our own state.
994
if remaining == 0 {
995
log::debug!("stream consume: completing zero-length write {}", self.0);
996
return Poll::Ready(Ok(StreamResult::Completed));
997
}
998
999
// If this is a zero-length read then block the writer but update our own state.
1000
if amt == 0 {
1001
log::debug!("stream consume: finishing zero-length read {}", self.0);
1002
state.kind = HostStreamConsumerStateKind::Consumed(Vec::new());
1003
state.waker = Some(cx.waker().clone());
1004
return Poll::Pending;
1005
}
1006
1007
// For non-zero sizes perform the read/copy.
1008
log::debug!("stream consume: done {}", self.0);
1009
let mut dst = Vec::with_capacity(amt as usize);
1010
source.read(&mut store, &mut dst).unwrap();
1011
let state = &mut store
1012
.data_mut()
1013
.host_stream_consumers
1014
.get_mut(&self.0)
1015
.unwrap()
1016
.0;
1017
state.kind = HostStreamConsumerStateKind::Consumed(dst);
1018
state.waker = None;
1019
Poll::Ready(Ok(StreamResult::Completed))
1020
}
1021
}
1022
}
1023
}
1024
1025
impl Drop for HostStreamConsumer {
1026
fn drop(&mut self) {
1027
log::debug!("stream consume: drop {}", self.0);
1028
}
1029
}
1030
1031
struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);
1032
1033
#[derive(Debug)]
1034
struct HostStreamProducerState {
1035
kind: HostStreamProducerStateKind,
1036
waker: Option<Waker>,
1037
}
1038
1039
#[derive(Debug)]
1040
enum HostStreamProducerStateKind {
1041
Idle,
1042
Writing(Vec<u32>),
1043
Wrote(u32),
1044
}
1045
1046
impl HostStreamProducerState {
1047
fn idle() -> Self {
1048
HostStreamProducerState {
1049
kind: HostStreamProducerStateKind::Idle,
1050
waker: None,
1051
}
1052
}
1053
1054
fn wake_by_ref(&mut self) {
1055
if let Some(waker) = self.waker.take() {
1056
waker.wake();
1057
}
1058
}
1059
}
1060
1061
impl StreamProducer<Data> for HostStreamProducer {
1062
type Item = u32;
1063
type Buffer = VecBuffer<u32>;
1064
1065
fn poll_produce(
1066
self: Pin<&mut Self>,
1067
cx: &mut Context<'_>,
1068
mut store: StoreContextMut<'_, Data>,
1069
mut dst: Destination<'_, Self::Item, Self::Buffer>,
1070
finish: bool,
1071
) -> Poll<Result<StreamResult>> {
1072
let remaining = dst.remaining(&mut store);
1073
let data = store.data_mut();
1074
let state = match data.host_stream_producers.get_mut(&self.0) {
1075
Some((state, _)) => state,
1076
None => {
1077
log::debug!("stream produce: dropped {}", self.0);
1078
return Poll::Ready(Ok(StreamResult::Dropped));
1079
}
1080
};
1081
match &mut state.kind {
1082
HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {
1083
if finish {
1084
log::debug!("stream produce: cancel {}", self.0);
1085
state.waker = None;
1086
Poll::Ready(Ok(StreamResult::Cancelled))
1087
} else {
1088
log::debug!("stream produce: wait {}", self.0);
1089
state.waker = Some(cx.waker().clone());
1090
Poll::Pending
1091
}
1092
}
1093
HostStreamProducerStateKind::Writing(buf) => {
1094
// Keep the other side blocked for a zero-length write
1095
// originated from the host.
1096
if buf.len() == 0 {
1097
log::debug!("stream produce: zero-length write {}", self.0);
1098
state.kind = HostStreamProducerStateKind::Wrote(0);
1099
state.waker = Some(cx.waker().clone());
1100
return Poll::Pending;
1101
}
1102
log::debug!("stream produce: write {}", self.0);
1103
match remaining {
1104
Some(amt) => {
1105
// If the guest is doing a zero-length read then we've
1106
// got some data for them. Complete the read but leave
1107
// ourselves in the same `Writing` state as before.
1108
if amt == 0 {
1109
state.waker = None;
1110
return Poll::Ready(Ok(StreamResult::Completed));
1111
}
1112
1113
// Don't let wasmtime buffer up data for us, so truncate
1114
// the buffer we're sending over to the amount that the
1115
// reader is requesting.
1116
if amt < buf.len() {
1117
buf.truncate(amt);
1118
}
1119
}
1120
1121
// At this time host<->host stream reads/writes aren't
1122
// fuzzed since that brings up a bunch of weird edge cases
1123
// which aren't fun to deal with and aren't interesting
1124
// either.
1125
None => unreachable!(),
1126
}
1127
let count = buf.len() as u32;
1128
dst.set_buffer(mem::take(buf).into());
1129
state.kind = HostStreamProducerStateKind::Wrote(count);
1130
state.waker = None;
1131
Poll::Ready(Ok(StreamResult::Completed))
1132
}
1133
}
1134
}
1135
}
1136
1137
impl Drop for HostStreamProducer {
1138
fn drop(&mut self) {
1139
log::debug!("stream produce: drop {}", self.0);
1140
}
1141
}
1142
1143
struct SharedStream(Scope);
1144
1145
impl SharedStream {
1146
async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {
1147
std::future::poll_fn(|cx| {
1148
accessor.with(|mut store| {
1149
self.poll(cx, store.as_context_mut(), false)
1150
.map(|pair| match pair {
1151
(None, StreamResult::Dropped) => None,
1152
(Some(item), StreamResult::Completed) => Some(item),
1153
_ => unreachable!(),
1154
})
1155
})
1156
})
1157
.await
1158
}
1159
1160
fn poll(
1161
&mut self,
1162
cx: &mut Context<'_>,
1163
mut store: StoreContextMut<'_, Data>,
1164
finish: bool,
1165
) -> Poll<(Option<Command>, StreamResult)> {
1166
let data = store.data_mut();
1167
1168
// If no more commands remain then this is a closed and dropped stream.
1169
let Some((scope, command)) = data.commands.last_mut() else {
1170
log::debug!("Stream closed: {:?}", self.0);
1171
return Poll::Ready((None, StreamResult::Dropped));
1172
};
1173
1174
// If the next queued up command is for the scope that this stream is
1175
// attached to then send off the command.
1176
if *scope == self.0 {
1177
let ret = Some(*command);
1178
1179
// All commands are followed up with an "ack", and after the "ack"
1180
// is delivered then the command is popped to move on to the next
1181
// command. The reason for this is to guarantee that a command has
1182
// been processed before moving on to the next command. This helps
1183
// make the fuzzing easier to work with by being able to implicitly
1184
// assume that a command has been processed by the time something
1185
// else is. Otherwise it might be possible that wasmtime has a set
1186
// of commands/callbacks that are all delivered at the same time and
1187
// the component model doesn't specify what order they happen
1188
// within. By forcing an "ack" it ensures a more expected ordering
1189
// of execution to assist with fuzzing without losing really all
1190
// that much coverage.
1191
if matches!(command, Command::Ack) {
1192
data.commands.pop();
1193
} else {
1194
*command = Command::Ack;
1195
}
1196
1197
// After a command was popped other streams may be able to make
1198
// progress so wake them all up.
1199
for (_, waker) in data.wakers.drain() {
1200
waker.wake();
1201
}
1202
log::debug!("Delivering command {ret:?} for {:?}", self.0);
1203
return Poll::Ready((ret, StreamResult::Completed));
1204
}
1205
1206
// The command queue is non-empty and the next command isn't meant for
1207
// us, so someone else needs to drain the queue. Enqueue our waker.
1208
if finish {
1209
Poll::Ready((None, StreamResult::Cancelled))
1210
} else {
1211
data.wakers.insert(self.0, cx.waker().clone());
1212
Poll::Pending
1213
}
1214
}
1215
}
1216
1217
impl StreamProducer<Data> for SharedStream {
1218
type Item = Command;
1219
type Buffer = Option<Command>;
1220
1221
fn poll_produce<'a>(
1222
mut self: Pin<&mut Self>,
1223
cx: &mut Context<'_>,
1224
store: StoreContextMut<'a, Data>,
1225
mut destination: Destination<'a, Self::Item, Self::Buffer>,
1226
finish: bool,
1227
) -> Poll<Result<StreamResult>> {
1228
let (item, result) = std::task::ready!(self.poll(cx, store, finish));
1229
destination.set_buffer(item);
1230
Poll::Ready(Ok(result))
1231
}
1232
}
1233
1234
#[cfg(test)]
1235
mod tests {
1236
use super::{ComponentAsync, Scope, init, run};
1237
use crate::oracles::component_async::types::*;
1238
use crate::test::test_n_times;
1239
use Scope::*;
1240
1241
#[test]
1242
fn smoke() {
1243
init();
1244
1245
test_n_times(50, |c, _| {
1246
run(c);
1247
Ok(())
1248
});
1249
}
1250
1251
// ========================================================================
1252
// A series of fuzz-generated test cases which caused problems during the
1253
// development of this fuzzer. Feel free to delete/edit/etc if the fuzzer
1254
// changes over time.
1255
1256
#[test]
1257
fn simple() {
1258
init();
1259
1260
run(ComponentAsync {
1261
commands: vec![
1262
(GuestCaller, Command::AsyncPendingImportCall(0)),
1263
(GuestCallee, Command::AsyncPendingImportCall(1)),
1264
(GuestCallee, Command::AsyncPendingExportComplete(0)),
1265
(GuestCaller, Command::AsyncPendingImportAssertReady(0)),
1266
(GuestCaller, Command::AsyncPendingImportCall(2)),
1267
],
1268
});
1269
}
1270
1271
#[test]
1272
fn somewhat_larger() {
1273
static COMMANDS: &[(Scope, Command)] = &[
1274
(GuestCallee, Command::FutureNew(0)),
1275
(HostCaller, Command::FutureNew(1)),
1276
(GuestCallee, Command::FutureReadPending(0)),
1277
(GuestCaller, Command::AsyncPendingImportCall(2)),
1278
(GuestCaller, Command::AsyncPendingImportCall(3)),
1279
(GuestCaller, Command::AsyncPendingImportCall(4)),
1280
(GuestCaller, Command::AsyncPendingImportCall(5)),
1281
(GuestCallee, Command::AsyncPendingExportComplete(5)),
1282
(GuestCallee, Command::AsyncPendingExportComplete(3)),
1283
(GuestCallee, Command::AsyncPendingExportComplete(4)),
1284
(GuestCallee, Command::AsyncPendingExportComplete(2)),
1285
(GuestCaller, Command::AsyncPendingImportCall(6)),
1286
(GuestCallee, Command::AsyncPendingExportComplete(6)),
1287
(GuestCaller, Command::AsyncPendingImportCall(7)),
1288
(GuestCallee, Command::AsyncPendingExportComplete(7)),
1289
(GuestCaller, Command::AsyncPendingImportCall(8)),
1290
(GuestCallee, Command::AsyncPendingExportComplete(8)),
1291
(GuestCaller, Command::AsyncPendingImportCall(9)),
1292
(GuestCallee, Command::AsyncPendingExportComplete(9)),
1293
(GuestCaller, Command::AsyncPendingImportCall(10)),
1294
(GuestCallee, Command::AsyncPendingExportComplete(10)),
1295
(GuestCaller, Command::AsyncPendingImportCall(11)),
1296
(GuestCallee, Command::AsyncPendingExportComplete(11)),
1297
(GuestCaller, Command::AsyncPendingImportCall(12)),
1298
(GuestCallee, Command::AsyncPendingExportComplete(12)),
1299
(GuestCaller, Command::AsyncPendingImportCall(13)),
1300
(GuestCallee, Command::AsyncPendingExportComplete(13)),
1301
(GuestCaller, Command::AsyncPendingImportCall(14)),
1302
(GuestCallee, Command::AsyncPendingExportComplete(14)),
1303
(GuestCaller, Command::AsyncPendingImportCall(15)),
1304
(GuestCallee, Command::AsyncPendingExportComplete(15)),
1305
(GuestCaller, Command::AsyncPendingImportCall(16)),
1306
(GuestCallee, Command::AsyncPendingExportComplete(16)),
1307
(GuestCaller, Command::AsyncPendingImportCall(17)),
1308
(GuestCallee, Command::AsyncPendingExportComplete(17)),
1309
(GuestCaller, Command::AsyncPendingImportCall(18)),
1310
(GuestCallee, Command::AsyncPendingExportComplete(18)),
1311
(GuestCaller, Command::AsyncPendingImportCall(19)),
1312
(GuestCallee, Command::AsyncPendingExportComplete(19)),
1313
(GuestCaller, Command::AsyncPendingImportCall(20)),
1314
(GuestCallee, Command::AsyncPendingExportComplete(20)),
1315
(GuestCaller, Command::AsyncPendingImportCall(21)),
1316
(GuestCallee, Command::AsyncPendingExportComplete(21)),
1317
(GuestCaller, Command::AsyncPendingImportCall(22)),
1318
(GuestCallee, Command::AsyncPendingExportComplete(22)),
1319
(GuestCaller, Command::AsyncPendingImportCall(23)),
1320
(GuestCallee, Command::AsyncPendingExportComplete(23)),
1321
(GuestCaller, Command::AsyncPendingImportCall(24)),
1322
(GuestCallee, Command::AsyncPendingExportComplete(24)),
1323
(GuestCaller, Command::AsyncPendingImportCall(25)),
1324
(GuestCallee, Command::AsyncPendingExportComplete(25)),
1325
(GuestCaller, Command::AsyncPendingImportCall(26)),
1326
(GuestCallee, Command::AsyncPendingExportComplete(26)),
1327
(GuestCaller, Command::AsyncPendingImportCall(27)),
1328
(GuestCallee, Command::AsyncPendingExportComplete(27)),
1329
(GuestCaller, Command::AsyncPendingImportCall(28)),
1330
(GuestCallee, Command::AsyncPendingExportComplete(28)),
1331
(GuestCaller, Command::AsyncPendingImportCall(29)),
1332
(GuestCallee, Command::AsyncPendingExportComplete(29)),
1333
(GuestCaller, Command::AsyncPendingImportCall(30)),
1334
(GuestCallee, Command::AsyncPendingExportComplete(30)),
1335
(GuestCaller, Command::AsyncPendingImportCall(31)),
1336
(GuestCallee, Command::AsyncPendingExportComplete(31)),
1337
(GuestCaller, Command::AsyncPendingImportCall(32)),
1338
(GuestCallee, Command::AsyncPendingExportComplete(32)),
1339
(GuestCaller, Command::AsyncPendingImportCall(33)),
1340
(GuestCallee, Command::AsyncPendingExportComplete(33)),
1341
(GuestCaller, Command::AsyncPendingImportCall(34)),
1342
(GuestCallee, Command::AsyncPendingExportComplete(34)),
1343
(GuestCaller, Command::AsyncPendingImportCall(35)),
1344
(GuestCallee, Command::AsyncPendingExportComplete(35)),
1345
(GuestCaller, Command::AsyncPendingImportCall(36)),
1346
(GuestCallee, Command::AsyncPendingExportComplete(36)),
1347
(GuestCaller, Command::AsyncPendingImportCall(37)),
1348
(GuestCallee, Command::AsyncPendingExportComplete(37)),
1349
(GuestCaller, Command::AsyncPendingImportAssertReady(36)),
1350
];
1351
init();
1352
1353
run(ComponentAsync {
1354
commands: COMMANDS.to_vec(),
1355
});
1356
}
1357
1358
#[test]
1359
fn simple_stream1() {
1360
init();
1361
1362
run(ComponentAsync {
1363
commands: vec![
1364
(HostCallee, Command::StreamNew(1)),
1365
(
1366
HostCallee,
1367
Command::StreamReadPending(StreamReadPayload {
1368
stream: 1,
1369
count: 2,
1370
}),
1371
),
1372
(HostCallee, Command::StreamCancelRead(1)),
1373
(GuestCaller, Command::SyncReadyCall),
1374
(
1375
HostCallee,
1376
Command::StreamWritePending(StreamWritePayload {
1377
stream: 1,
1378
item: 3,
1379
count: 2,
1380
}),
1381
),
1382
(HostCallee, Command::StreamCancelWrite(1)),
1383
(HostCallee, Command::StreamDropWritable(1)),
1384
(
1385
HostCallee,
1386
Command::StreamReadDropped(StreamReadPayload {
1387
stream: 1,
1388
count: 1,
1389
}),
1390
),
1391
],
1392
});
1393
}
1394
1395
#[test]
1396
fn simple_stream3() {
1397
init();
1398
1399
run(ComponentAsync {
1400
commands: vec![
1401
(GuestCaller, Command::StreamNew(26)),
1402
(
1403
GuestCaller,
1404
Command::StreamReadPending(StreamReadPayload {
1405
stream: 26,
1406
count: 10,
1407
}),
1408
),
1409
(GuestCaller, Command::StreamDropWritable(26)),
1410
(GuestCaller, Command::StreamReadAssertDropped(26)),
1411
],
1412
});
1413
}
1414
1415
#[test]
1416
fn simple_stream4() {
1417
init();
1418
1419
run(ComponentAsync {
1420
commands: vec![
1421
(GuestCaller, Command::StreamNew(23)),
1422
(
1423
GuestCaller,
1424
Command::StreamWritePending(StreamWritePayload {
1425
stream: 23,
1426
item: 24,
1427
count: 2,
1428
}),
1429
),
1430
(GuestCaller, Command::StreamGive(23)),
1431
(GuestCallee, Command::StreamDropReadable(23)),
1432
(
1433
GuestCaller,
1434
Command::StreamWriteAssertDropped(StreamReadPayload {
1435
stream: 23,
1436
count: 0,
1437
}),
1438
),
1439
],
1440
});
1441
}
1442
1443
#[test]
1444
fn zero_length_behavior() {
1445
init();
1446
1447
run(ComponentAsync {
1448
commands: vec![
1449
(GuestCaller, Command::StreamNew(10)),
1450
(HostCaller, Command::StreamTake(10)),
1451
(
1452
GuestCaller,
1453
Command::StreamWritePending(StreamWritePayload {
1454
stream: 10,
1455
item: 13,
1456
count: 5,
1457
}),
1458
),
1459
(
1460
HostCaller,
1461
Command::StreamReadReady(StreamReadyPayload {
1462
stream: 10,
1463
item: 0,
1464
op_count: 0,
1465
ready_count: 0,
1466
}),
1467
),
1468
(
1469
HostCaller,
1470
Command::StreamReadReady(StreamReadyPayload {
1471
stream: 10,
1472
item: 0,
1473
op_count: 0,
1474
ready_count: 0,
1475
}),
1476
),
1477
],
1478
});
1479
}
1480
}
1481
1482