Path: blob/main/crates/fuzzing/src/oracles/component_async.rs
3061 views
//! For a high-level overview of this fuzz target see `fuzz_async.rs`12use crate::block_on;3use crate::generators::component_async::exports::wasmtime_fuzz::fuzz::async_test::Guest;4use crate::generators::component_async::wasmtime_fuzz::fuzz::async_test::{self, Command};5use crate::generators::component_async::wasmtime_fuzz::fuzz::types;6use crate::generators::component_async::{ComponentAsync, FuzzAsyncPre, Scope};7use futures::channel::oneshot;8use std::collections::{HashMap, HashSet};9use std::mem;10use std::pin::Pin;11use std::sync::{Arc, OnceLock, Weak};12use std::task::{Context, Poll, Waker};13use std::time::Instant;14use wasmtime::component::{15Access, Accessor, AccessorTask, Component, Destination, FutureConsumer, FutureProducer,16FutureReader, HasSelf, Linker, ResourceTable, Source, StreamConsumer, StreamProducer,17StreamReader, StreamResult, VecBuffer,18};19use wasmtime::{AsContextMut, Config, Engine, Result, Store, StoreContextMut};20use wasmtime_wasi::{WasiCtx, WasiCtxView, WasiView};2122static STATE: OnceLock<(Engine, FuzzAsyncPre<Data>)> = OnceLock::new();2324/// Initializes state for future fuzz runs.25///26/// This will create an `Engine` to run this fuzzer within and it will27/// additionally precompile the component that will be used for fuzzing.28///29/// There are a few points of note about this:30///31/// * The `misc` fuzzer is manually instrumented with this function as the init32/// hook to ensure this runs before any other fuzzing.33///34/// * Compilation of the component takes quite some time with35/// fuzzing-instrumented Cranelift. To assist with local development this36/// implements a cache which is serialized/deserialized via an env var.37pub fn init() {38crate::init_fuzzing();3940STATE.get_or_init(|| {41let mut config = Config::new();42config.wasm_component_model_async(true);43let engine = Engine::new(&config).unwrap();44let component = compile(&engine);45let mut linker = Linker::new(&engine);46wasmtime_wasi::p2::add_to_linker_async(&mut linker).unwrap();47async_test::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();48types::add_to_linker::<_, HasSelf<Data>>(&mut linker, |d| d).unwrap();4950let pre = linker.instantiate_pre(&component).unwrap();51let pre = FuzzAsyncPre::new(pre).unwrap();5253(engine, pre)54});5556fn compile(engine: &Engine) -> Component {57let wasm_path = test_programs_artifacts::FUZZ_ASYNC_COMPONENT;58let wasm = test_programs_artifacts::fuzz_async_component_bytes!();59let wasm = &wasm[..];60let cwasm_cache = std::env::var("COMPONENT_ASYNC_CWASM_CACHE").ok();61if let Some(path) = &cwasm_cache62&& let Ok(cwasm_mtime) = std::fs::metadata(&path).and_then(|m| m.modified())63&& let Ok(wasm_mtime) = std::fs::metadata(wasm_path).and_then(|m| m.modified())64&& cwasm_mtime > wasm_mtime65{66log::debug!("Using cached component async cwasm at {path}");67unsafe {68return Component::deserialize_file(engine, path).unwrap();69}70}7172let composition = {73let mut config = wasm_compose::config::Config::default();74let tempdir = tempfile::TempDir::new().unwrap();75let path = tempdir.path().join("fuzz-async.wasm");76std::fs::write(&path, wasm).unwrap();77config.definitions.push(path.clone());7879wasm_compose::composer::ComponentComposer::new(&path, &config)80.compose()81.unwrap()82};83let start = Instant::now();84let component = Component::new(&engine, &composition).unwrap();85if let Some(path) = cwasm_cache {86log::debug!("Caching component async cwasm to {path}");87std::fs::write(path, &component.serialize().unwrap()).unwrap();88} else if start.elapsed() > std::time::Duration::from_secs(1) {89eprintln!(90"91!!!!!!!!!!!!!!!!!!!!!!!!!!9293Component compilation is slow, try setting `COMPONENT_ASYNC_CWASM_CACHE=path` to94cache compilation results9596!!!!!!!!!!!!!!!!!!!!!!!!!!97"98);99}100return component;101}102}103104#[derive(Default)]105struct Data {106ctx: WasiCtx,107table: ResourceTable,108wakers: HashMap<Scope, Waker>,109commands: Vec<(Scope, Command)>,110111guest_caller_stream: Option<StreamReader<Command>>,112guest_callee_stream: Option<StreamReader<Command>>,113114host_pending_async_calls: HashMap<u32, oneshot::Sender<()>>,115host_pending_async_calls_cancelled: HashSet<u32>,116guest_pending_async_calls_ready: HashSet<u32>,117118// State of futures/streams. Note that while #12091 is unresolved an119// `Arc`/`Weak` combo is used to detect when wasmtime drops futures/streams120// and the various halves we're interacting with using traits.121host_futures: HashMap<u32, FutureReader<u32>>,122host_future_producers: HashMap<u32, (HostFutureProducerState, Weak<()>)>,123host_future_consumers: HashMap<u32, (HostFutureConsumerState, Weak<()>)>,124host_streams: HashMap<u32, StreamReader<u32>>,125host_stream_producers: HashMap<u32, (HostStreamProducerState, Weak<()>)>,126host_stream_consumers: HashMap<u32, (HostStreamConsumerState, Weak<()>)>,127}128129impl WasiView for Data {130fn ctx(&mut self) -> WasiCtxView<'_> {131WasiCtxView {132ctx: &mut self.ctx,133table: &mut self.table,134}135}136}137138impl async_test::HostWithStore for HasSelf<Data> {139async fn async_ready<T>(_store: &Accessor<T, Self>) {}140141async fn async_pending<T>(store: &Accessor<T, Self>, id: u32) {142let (tx, rx) = oneshot::channel();143store.with(|mut s| s.get().host_pending_async_calls.insert(id, tx));144let record = RecordCancelOnDrop { store, id };145rx.await.unwrap();146mem::forget(record);147148struct RecordCancelOnDrop<'a, T: 'static> {149store: &'a Accessor<T, HasSelf<Data>>,150id: u32,151}152153impl<T> Drop for RecordCancelOnDrop<'_, T> {154fn drop(&mut self) {155self.store.with(|mut s| {156s.get().host_pending_async_calls_cancelled.insert(self.id);157});158}159}160}161162async fn init<T>(_store: &Accessor<T, Self>, _scope: types::Scope) {}163}164165impl async_test::Host for Data {166fn sync_ready(&mut self) {}167168fn future_take(&mut self, id: u32) -> FutureReader<u32> {169self.host_futures.remove(&id).unwrap()170}171172fn future_receive(&mut self, id: u32, future: FutureReader<u32>) {173let prev = self.host_futures.insert(id, future);174assert!(prev.is_none());175}176177fn stream_take(&mut self, id: u32) -> StreamReader<u32> {178self.host_streams.remove(&id).unwrap()179}180181fn stream_receive(&mut self, id: u32, stream: StreamReader<u32>) {182let prev = self.host_streams.insert(id, stream);183assert!(prev.is_none());184}185}186187impl types::HostWithStore for HasSelf<Data> {188fn get_commands<T>(189mut store: Access<'_, T, Self>,190scope: types::Scope,191) -> StreamReader<Command> {192let data = store.get();193match scope {194types::Scope::Caller => data.guest_caller_stream.take().unwrap(),195types::Scope::Callee => data.guest_callee_stream.take().unwrap(),196}197}198}199200impl types::Host for Data {}201202/// Executes the `input` provided, assuming that `init` has been previously203/// executed.204pub fn run(mut input: ComponentAsync) {205log::debug!("Running component async fuzz test with\n{input:?}");206207// Commands are executed in the order that they're listed in the input, but208// to make it easier on the `StreamProducer` implementation below they're209// popped off the back. To ensure that they're all delivered in the right210// order reverse the list to ensure the correct order is maintained.211input.commands.reverse();212213let (engine, pre) = STATE.get().unwrap();214let mut store = Store::new(215engine,216Data {217ctx: WasiCtx::builder().inherit_stdio().inherit_env().build(),218commands: input.commands,219..Data::default()220},221);222223let guest_caller_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCaller));224let guest_callee_stream = StreamReader::new(&mut store, SharedStream(Scope::GuestCallee));225store.data_mut().guest_caller_stream = Some(guest_caller_stream);226store.data_mut().guest_callee_stream = Some(guest_callee_stream);227block_on(async {228let instance = pre.instantiate_async(&mut store).await.unwrap();229let test = instance.wasmtime_fuzz_fuzz_async_test();230231let mut host_caller = SharedStream(Scope::HostCaller);232let mut host_callee = SharedStream(Scope::HostCallee);233store234.run_concurrent(async |store| {235// Kick off stream reads in the guest. This function will return236// but the tasks in the guest will keep running after they237// return to process stream items.238test.call_init(store, types::Scope::Caller).await.unwrap();239240// Simultaneously process commands from both host streams. These241// will return once the entire command queue is exhausted.242futures::join!(243async {244while let Some(cmd) = host_caller.next(store).await {245host_caller_cmd(&test, store, cmd).await;246}247},248async {249while let Some(cmd) = host_callee.next(store).await {250host_callee_cmd(store, cmd).await;251}252},253);254255// Note that there may still be pending async work in the guest256// (or host). It's intentional that it's not cleaned up here to257// help test situations where async work is all abruptly258// cancelled by just being dropped in the host.259})260.await261.unwrap();262});263}264265/// See documentation in `fuzz_async.rs` for what's going on here.266async fn test_property<F>(store: &Accessor<Data>, mut f: F) -> bool267where268F: FnMut(&mut Data) -> bool,269{270for _ in 0..1000 {271let ready = store.with(|mut s| f(s.get()));272if ready {273return true;274}275276crate::YieldN(1).await;277}278279return false;280}281282async fn await_property<F>(store: &Accessor<Data>, desc: &str, f: F)283where284F: FnMut(&mut Data) -> bool,285{286assert!(287test_property(store, f).await,288"timed out waiting for {desc}",289);290}291292async fn host_caller_cmd(test: &Guest, store: &Accessor<Data>, cmd: Command) {293match cmd {294Command::Ack => {}295Command::SyncReadyCall => test.call_sync_ready(store).await.unwrap(),296Command::AsyncReadyCall => test.call_async_ready(store).await.unwrap(),297Command::AsyncPendingExportComplete(_i) => todo!(),298Command::AsyncPendingExportAssertCancelled(_i) => todo!(),299Command::AsyncPendingImportCall(i) => {300struct RunPendingImport {301test: Guest,302i: u32,303}304305store.spawn(RunPendingImport {306test: test.clone(),307i,308});309310impl AccessorTask<Data> for RunPendingImport {311async fn run(self, store: &Accessor<Data>) -> Result<()> {312self.test.call_async_pending(store, self.i).await?;313store.with(|mut s| {314s.get().guest_pending_async_calls_ready.insert(self.i);315});316Ok(())317}318}319}320Command::AsyncPendingImportCancel(_i) => todo!(),321Command::AsyncPendingImportAssertReady(i) => {322assert!(323test_property(store, |s| s.guest_pending_async_calls_ready.remove(&i)).await,324"expected async_pending import {i} to be ready",325);326}327328Command::FutureTake(i) => {329let future = test.call_future_take(store, i).await.unwrap();330store.with(|mut s| {331let prev = s.get().host_futures.insert(i, future);332assert!(prev.is_none());333});334}335Command::FutureGive(i) => {336let future = store.with(|mut s| s.get().host_futures.remove(&i).unwrap());337test.call_future_receive(store, i, future).await.unwrap();338}339Command::StreamTake(i) => {340let stream = test.call_stream_take(store, i).await.unwrap();341store.with(|mut s| {342let prev = s.get().host_streams.insert(i, stream);343assert!(prev.is_none());344});345}346Command::StreamGive(i) => {347let stream = store.with(|mut s| s.get().host_streams.remove(&i).unwrap());348test.call_stream_receive(store, i, stream).await.unwrap();349}350351other => future_or_stream_cmd(store, other).await,352}353}354355async fn host_callee_cmd(store: &Accessor<Data>, cmd: Command) {356match cmd {357Command::Ack => {}358Command::SyncReadyCall => todo!(),359Command::AsyncReadyCall => todo!(),360Command::AsyncPendingExportComplete(i) => store.with(|mut s| {361s.get()362.host_pending_async_calls363.remove(&i)364.unwrap()365.send(())366.unwrap();367}),368Command::AsyncPendingExportAssertCancelled(i) => {369assert!(370test_property(store, |s| s.host_pending_async_calls_cancelled.remove(&i)).await,371"expected async_pending export {i} to be cancelled",372);373}374Command::AsyncPendingImportCall(_i) => todo!(),375Command::AsyncPendingImportCancel(_i) => todo!(),376Command::AsyncPendingImportAssertReady(_i) => todo!(),377378other => future_or_stream_cmd(store, other).await,379}380}381382async fn future_or_stream_cmd(store: &Accessor<Data>, cmd: Command) {383match cmd {384// These commands should be handled above385Command::Ack386| Command::SyncReadyCall387| Command::AsyncReadyCall388| Command::AsyncPendingExportComplete(_)389| Command::AsyncPendingExportAssertCancelled(_)390| Command::AsyncPendingImportCall(_)391| Command::AsyncPendingImportCancel(_)392| Command::FutureTake(_)393| Command::FutureGive(_)394| Command::StreamTake(_)395| Command::StreamGive(_)396| Command::AsyncPendingImportAssertReady(_) => unreachable!(),397398Command::FutureNew(id) => {399store.with(|mut s| {400let arc = Arc::new(());401let weak = Arc::downgrade(&arc);402let future = FutureReader::new(&mut s, HostFutureProducer(id, arc));403let data = s.get();404let prev = data.host_futures.insert(id, future);405assert!(prev.is_none());406let prev = data407.host_future_producers408.insert(id, (HostFutureProducerState::Idle, weak));409assert!(prev.is_none());410});411}412Command::FutureDropReadable(id) => {413store.with(|mut s| match s.get().host_futures.remove(&id) {414Some(mut future) => future.close(&mut s),415None => {416let (mut state, _weak) = s.get().host_future_consumers.remove(&id).unwrap();417state.wake_by_ref();418}419})420}421Command::FutureWriteReady(payload) => {422await_property(store, "future write should be waiting", |s| {423matches!(424s.host_future_producers.get(&payload.future),425Some((HostFutureProducerState::Waiting(_), _))426)427})428.await;429store.with(|mut s| {430let state = s431.get()432.host_future_producers433.get_mut(&payload.future)434.unwrap();435match state {436(HostFutureProducerState::Waiting(waker), _) => {437waker.wake_by_ref();438state.0 = HostFutureProducerState::Writing(payload.item);439}440(state, _) => panic!("future not waiting: {state:?}"),441}442})443}444Command::FutureWritePending(payload) => store.with(|mut s| {445let state = s446.get()447.host_future_producers448.get_mut(&payload.future)449.unwrap();450match state {451(HostFutureProducerState::Idle, _) => {452state.0 = HostFutureProducerState::Writing(payload.item);453}454_ => panic!("future not idle"),455}456}),457Command::FutureWriteDropped(id) => store.with(|mut s| {458let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();459assert!(matches!(state, HostFutureProducerState::Idle));460assert!(weak.upgrade().is_none());461}),462Command::FutureReadReady(payload) => {463let id = payload.future;464store.with(|mut s| {465let arc = Arc::new(());466let weak = Arc::downgrade(&arc);467let data = s.get();468let future = data.host_futures.remove(&id).unwrap();469let prev = data470.host_future_consumers471.insert(id, (HostFutureConsumerState::Consuming, weak));472assert!(prev.is_none());473future.pipe(&mut s, HostFutureConsumer(id, arc));474});475476await_property(store, "future should be present", |s| {477matches!(478s.host_future_consumers[&id],479(HostFutureConsumerState::Complete(_), _)480)481})482.await;483484store.with(|mut s| {485let (state, _) = s.get().host_future_consumers.remove(&id).unwrap();486match state {487HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),488_ => panic!("future not complete"),489}490});491}492Command::FutureReadPending(id) => {493ensure_future_reading(store, id);494store.with(|mut s| {495let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();496state.wake_by_ref();497assert!(498matches!(state, HostFutureConsumerState::Idle),499"bad state: {state:?}",500);501*state = HostFutureConsumerState::Consuming;502})503}504Command::FutureCancelWrite(id) => store.with(|mut s| {505let (state, _) = s.get().host_future_producers.get_mut(&id).unwrap();506assert!(matches!(state, HostFutureProducerState::Writing(_)));507*state = HostFutureProducerState::Idle;508}),509Command::FutureCancelRead(id) => store.with(|mut s| {510let (state, _) = s.get().host_future_consumers.get_mut(&id).unwrap();511assert!(matches!(state, HostFutureConsumerState::Consuming));512*state = HostFutureConsumerState::Idle;513}),514Command::FutureReadAssertComplete(payload) => {515await_property(store, "future read should be complete", |s| {516matches!(517s.host_future_consumers.get(&payload.future),518Some((HostFutureConsumerState::Complete(_), _))519)520})521.await;522store.with(|mut s| {523let (state, _) = s524.get()525.host_future_consumers526.remove(&payload.future)527.unwrap();528match state {529HostFutureConsumerState::Complete(i) => assert_eq!(i, payload.item),530_ => panic!("future not complete"),531}532})533}534Command::FutureWriteAssertComplete(id) => store.with(|mut s| {535let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();536assert!(matches!(state, HostFutureProducerState::Complete));537assert!(weak.upgrade().is_none());538}),539Command::FutureWriteAssertDropped(id) => store.with(|mut s| {540let (state, weak) = s.get().host_future_producers.remove(&id).unwrap();541assert!(matches!(state, HostFutureProducerState::Writing(_)));542assert!(weak.upgrade().is_none());543}),544545Command::StreamNew(id) => {546store.with(|mut s| {547let arc = Arc::new(());548let weak = Arc::downgrade(&arc);549let stream = StreamReader::new(&mut s, HostStreamProducer(id, arc));550let data = s.get();551let prev = data.host_streams.insert(id, stream);552assert!(prev.is_none());553let prev = data554.host_stream_producers555.insert(id, (HostStreamProducerState::idle(), weak));556assert!(prev.is_none());557});558}559Command::StreamDropReadable(id) => {560store.with(|mut s| match s.get().host_streams.remove(&id) {561Some(mut stream) => {562stream.close(&mut s);563}564None => {565let (mut state, _weak) = s.get().host_stream_consumers.remove(&id).unwrap();566state.wake_by_ref();567}568})569}570Command::StreamDropWritable(id) => store.with(|mut s| {571let (mut state, _weak) = s.get().host_stream_producers.remove(&id).unwrap();572state.wake_by_ref();573}),574Command::StreamWriteReady(payload) => {575let id = payload.stream;576store.with(|mut s| {577let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();578state.wake_by_ref();579match state.kind {580HostStreamProducerStateKind::Idle => {581state.kind = HostStreamProducerStateKind::Writing(stream_payload(582payload.item,583payload.op_count,584));585}586_ => panic!("stream not idle: {state:?}"),587}588});589await_property(store, "stream should complete a write", |s| {590matches!(591s.host_stream_producers[&id].0.kind,592HostStreamProducerStateKind::Wrote(_),593)594})595.await;596store.with(|mut s| {597let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();598match state.kind {599HostStreamProducerStateKind::Wrote(amt) => {600assert_eq!(amt, payload.ready_count);601state.kind = HostStreamProducerStateKind::Idle;602}603_ => panic!("stream not idle: {state:?}"),604}605});606}607Command::StreamReadReady(payload) => {608let id = payload.stream;609ensure_stream_reading(store, id);610store.with(|mut s| {611let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();612state.wake_by_ref();613state.kind = HostStreamConsumerStateKind::Consuming(payload.op_count);614});615await_property(store, "stream should complete a read", |s| {616matches!(617s.host_stream_consumers[&id].0.kind,618HostStreamConsumerStateKind::Consumed(_),619)620})621.await;622623store.with(|mut s| {624let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();625match &state.kind {626HostStreamConsumerStateKind::Consumed(last_read) => {627assert_eq!(628*last_read,629stream_payload(payload.item, payload.ready_count)630);631state.kind = HostStreamConsumerStateKind::Idle;632}633_ => panic!("future not complete"),634}635});636}637Command::StreamWritePending(payload) => store.with(|mut s| {638let (state, _) = s639.get()640.host_stream_producers641.get_mut(&payload.stream)642.unwrap();643state.wake_by_ref();644match state.kind {645HostStreamProducerStateKind::Idle => {646state.kind = HostStreamProducerStateKind::Writing(stream_payload(647payload.item,648payload.count,649));650}651_ => panic!("stream not idle {:?}", state.kind),652}653}),654Command::StreamReadPending(payload) => {655ensure_stream_reading(store, payload.stream);656store.with(|mut s| {657let (state, _) = s658.get()659.host_stream_consumers660.get_mut(&payload.stream)661.unwrap();662state.wake_by_ref();663assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));664state.kind = HostStreamConsumerStateKind::Consuming(payload.count);665})666}667Command::StreamWriteDropped(payload) => store.with(|mut s| {668let (state, weak) = s669.get()670.host_stream_producers671.get_mut(&payload.stream)672.unwrap();673assert!(matches!(state.kind, HostStreamProducerStateKind::Idle));674assert!(weak.upgrade().is_none());675}),676Command::StreamReadDropped(payload) => {677ensure_stream_reading(store, payload.stream);678await_property(store, "stream read should get dropped", |s| {679let weak = &s.host_stream_consumers[&payload.stream].1;680weak.upgrade().is_none()681})682.await;683store.with(|mut s| {684let (state, weak) = s685.get()686.host_stream_consumers687.get_mut(&payload.stream)688.unwrap();689assert!(matches!(state.kind, HostStreamConsumerStateKind::Idle));690assert!(weak.upgrade().is_none());691})692}693Command::StreamCancelWrite(id) => store.with(|mut s| {694let (state, _) = s.get().host_stream_producers.get_mut(&id).unwrap();695assert!(696matches!(state.kind, HostStreamProducerStateKind::Writing(_)),697"invalid state {state:?}",698);699state.kind = HostStreamProducerStateKind::Idle;700state.wake_by_ref();701}),702Command::StreamCancelRead(id) => store.with(|mut s| {703let (state, _) = s.get().host_stream_consumers.get_mut(&id).unwrap();704assert!(matches!(705state.kind,706HostStreamConsumerStateKind::Consuming(_)707));708state.kind = HostStreamConsumerStateKind::Idle;709}),710Command::StreamReadAssertComplete(payload) => store.with(|mut s| {711let (state, _) = s712.get()713.host_stream_consumers714.get_mut(&payload.stream)715.unwrap();716match &state.kind {717HostStreamConsumerStateKind::Consumed(last_read) => {718assert_eq!(*last_read, stream_payload(payload.item, payload.count));719state.kind = HostStreamConsumerStateKind::Idle;720}721_ => panic!("stream not complete"),722}723}),724Command::StreamWriteAssertComplete(payload) => store.with(|mut s| {725let (state, _) = s726.get()727.host_stream_producers728.get_mut(&payload.stream)729.unwrap();730match state.kind {731HostStreamProducerStateKind::Wrote(amt) => {732assert_eq!(amt, payload.count);733state.kind = HostStreamProducerStateKind::Idle;734}735_ => panic!("stream not complete: {:?}", state.kind),736}737}),738Command::StreamWriteAssertDropped(payload) => {739await_property(store, "stream write should be dropped", |s| {740let weak = &s.host_stream_producers[&payload.stream].1;741weak.upgrade().is_none()742})743.await;744store.with(|mut s| {745let (state, weak) = s746.get()747.host_stream_producers748.get_mut(&payload.stream)749.unwrap();750assert!(matches!(751state.kind,752HostStreamProducerStateKind::Writing(_)753));754assert!(weak.upgrade().is_none());755})756}757Command::StreamReadAssertDropped(id) => {758await_property(store, "stream read should be dropped", |s| {759let weak = &s.host_stream_consumers[&id].1;760weak.upgrade().is_none()761})762.await;763store.with(|mut s| {764let (state, weak) = s.get().host_stream_consumers.get_mut(&id).unwrap();765assert!(matches!(766state.kind,767HostStreamConsumerStateKind::Consuming(_),768));769assert!(weak.upgrade().is_none());770})771}772}773}774775fn stream_payload(item: u32, count: u32) -> Vec<u32> {776(item..item + count).collect()777}778779fn ensure_future_reading(store: &Accessor<Data>, id: u32) {780store.with(|mut s| {781let data = s.get();782if !data.host_futures.contains_key(&id) {783return;784}785log::debug!("future consume: start {id}");786let arc = Arc::new(());787let weak = Arc::downgrade(&arc);788let data = s.get();789let future = data.host_futures.remove(&id).unwrap();790let prev = data791.host_future_consumers792.insert(id, (HostFutureConsumerState::Idle, weak));793assert!(prev.is_none());794future.pipe(&mut s, HostFutureConsumer(id, arc));795});796}797798fn ensure_stream_reading(store: &Accessor<Data>, id: u32) {799store.with(|mut s| {800let data = s.get();801if !data.host_streams.contains_key(&id) {802return;803}804log::debug!("stream consume: start {id}");805let arc = Arc::new(());806let weak = Arc::downgrade(&arc);807let prev = data.host_stream_consumers.insert(808id,809(810HostStreamConsumerState {811kind: HostStreamConsumerStateKind::Idle,812waker: None,813},814weak,815),816);817assert!(prev.is_none());818let stream = data.host_streams.remove(&id).unwrap();819stream.pipe(&mut s, HostStreamConsumer(id, arc));820});821}822823struct HostFutureConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);824825/// Note that this is only created once a read is actually initiated on a826/// future. It's also not possible to cancel a host-based read on a future,827/// hence why this is simpler than the `HostFutureProducerState` state below.828#[derive(Debug)]829enum HostFutureConsumerState {830Idle,831Waiting(Waker),832Consuming,833Complete(u32),834}835836impl HostFutureConsumerState {837fn wake_by_ref(&mut self) {838if let HostFutureConsumerState::Waiting(waker) = &self {839waker.wake_by_ref();840*self = HostFutureConsumerState::Idle;841}842}843}844845impl FutureConsumer<Data> for HostFutureConsumer {846type Item = u32;847848fn poll_consume(849self: Pin<&mut Self>,850cx: &mut Context<'_>,851mut store: StoreContextMut<'_, Data>,852mut source: Source<'_, Self::Item>,853finish: bool,854) -> Poll<Result<()>> {855let state = match store.data_mut().host_future_consumers.get_mut(&self.0) {856Some(state) => state,857None => {858log::debug!("consume: closed {}", self.0);859return Poll::Ready(Ok(()));860}861};862match state.0 {863HostFutureConsumerState::Idle | HostFutureConsumerState::Waiting(_) => {864if finish {865log::debug!("consume: cancel {}", self.0);866state.0 = HostFutureConsumerState::Idle;867Poll::Ready(Ok(()))868} else {869log::debug!("consume: wait {}", self.0);870state.0 = HostFutureConsumerState::Waiting(cx.waker().clone());871Poll::Pending872}873}874HostFutureConsumerState::Consuming => {875log::debug!("consume: done {}", self.0);876let mut item = None;877source.read(&mut store, &mut item).unwrap();878store879.data_mut()880.host_future_consumers881.get_mut(&self.0)882.unwrap()883.0 = HostFutureConsumerState::Complete(item.unwrap());884Poll::Ready(Ok(()))885}886HostFutureConsumerState::Complete(_) => unreachable!(),887}888}889}890891struct HostFutureProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);892893#[derive(Debug)]894enum HostFutureProducerState {895Idle,896Waiting(Waker),897Writing(u32),898Complete,899}900901impl FutureProducer<Data> for HostFutureProducer {902type Item = u32;903904fn poll_produce(905self: Pin<&mut Self>,906cx: &mut Context<'_>,907mut store: StoreContextMut<'_, Data>,908finish: bool,909) -> Poll<Result<Option<Self::Item>>> {910let state = store911.data_mut()912.host_future_producers913.get_mut(&self.0)914.unwrap();915match state.0 {916HostFutureProducerState::Idle | HostFutureProducerState::Waiting(_) => {917if finish {918log::debug!("produce: cancel {}", self.0);919state.0 = HostFutureProducerState::Idle;920Poll::Ready(Ok(None))921} else {922log::debug!("produce: wait {}", self.0);923state.0 = HostFutureProducerState::Waiting(cx.waker().clone());924Poll::Pending925}926}927HostFutureProducerState::Writing(item) => {928log::debug!("produce: done {}", self.0);929state.0 = HostFutureProducerState::Complete;930Poll::Ready(Ok(Some(item)))931}932HostFutureProducerState::Complete => unreachable!(),933}934}935}936937struct HostStreamConsumer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);938939#[derive(Debug)]940struct HostStreamConsumerState {941waker: Option<Waker>,942kind: HostStreamConsumerStateKind,943}944945#[derive(Debug)]946enum HostStreamConsumerStateKind {947Idle,948Consuming(u32),949Consumed(Vec<u32>),950}951952impl HostStreamConsumerState {953fn wake_by_ref(&mut self) {954if let Some(waker) = self.waker.take() {955waker.wake();956}957}958}959960impl StreamConsumer<Data> for HostStreamConsumer {961type Item = u32;962963fn poll_consume(964self: Pin<&mut Self>,965cx: &mut Context<'_>,966mut store: StoreContextMut<'_, Data>,967mut source: Source<'_, Self::Item>,968finish: bool,969) -> Poll<Result<StreamResult>> {970let remaining = source.remaining(&mut store);971let state = match store.data_mut().host_stream_consumers.get_mut(&self.0) {972Some((state, _)) => state,973None => {974log::debug!("stream consume: dropped {}", self.0);975return Poll::Ready(Ok(StreamResult::Dropped));976}977};978match state.kind {979HostStreamConsumerStateKind::Idle | HostStreamConsumerStateKind::Consumed(_) => {980if finish {981log::debug!("stream consume: cancel {}", self.0);982state.waker = None;983Poll::Ready(Ok(StreamResult::Cancelled))984} else {985log::debug!("stream consume: wait {}", self.0);986state.waker = Some(cx.waker().clone());987Poll::Pending988}989}990HostStreamConsumerStateKind::Consuming(amt) => {991// The writer is performing a zero-length write. We always992// complete that without updating our own state.993if remaining == 0 {994log::debug!("stream consume: completing zero-length write {}", self.0);995return Poll::Ready(Ok(StreamResult::Completed));996}997998// If this is a zero-length read then block the writer but update our own state.999if amt == 0 {1000log::debug!("stream consume: finishing zero-length read {}", self.0);1001state.kind = HostStreamConsumerStateKind::Consumed(Vec::new());1002state.waker = Some(cx.waker().clone());1003return Poll::Pending;1004}10051006// For non-zero sizes perform the read/copy.1007log::debug!("stream consume: done {}", self.0);1008let mut dst = Vec::with_capacity(amt as usize);1009source.read(&mut store, &mut dst).unwrap();1010let state = &mut store1011.data_mut()1012.host_stream_consumers1013.get_mut(&self.0)1014.unwrap()1015.0;1016state.kind = HostStreamConsumerStateKind::Consumed(dst);1017state.waker = None;1018Poll::Ready(Ok(StreamResult::Completed))1019}1020}1021}1022}10231024impl Drop for HostStreamConsumer {1025fn drop(&mut self) {1026log::debug!("stream consume: drop {}", self.0);1027}1028}10291030struct HostStreamProducer(u32, #[expect(dead_code, reason = "drop-tracking")] Arc<()>);10311032#[derive(Debug)]1033struct HostStreamProducerState {1034kind: HostStreamProducerStateKind,1035waker: Option<Waker>,1036}10371038#[derive(Debug)]1039enum HostStreamProducerStateKind {1040Idle,1041Writing(Vec<u32>),1042Wrote(u32),1043}10441045impl HostStreamProducerState {1046fn idle() -> Self {1047HostStreamProducerState {1048kind: HostStreamProducerStateKind::Idle,1049waker: None,1050}1051}10521053fn wake_by_ref(&mut self) {1054if let Some(waker) = self.waker.take() {1055waker.wake();1056}1057}1058}10591060impl StreamProducer<Data> for HostStreamProducer {1061type Item = u32;1062type Buffer = VecBuffer<u32>;10631064fn poll_produce(1065self: Pin<&mut Self>,1066cx: &mut Context<'_>,1067mut store: StoreContextMut<'_, Data>,1068mut dst: Destination<'_, Self::Item, Self::Buffer>,1069finish: bool,1070) -> Poll<Result<StreamResult>> {1071let remaining = dst.remaining(&mut store);1072let data = store.data_mut();1073let state = match data.host_stream_producers.get_mut(&self.0) {1074Some((state, _)) => state,1075None => {1076log::debug!("stream produce: dropped {}", self.0);1077return Poll::Ready(Ok(StreamResult::Dropped));1078}1079};1080match &mut state.kind {1081HostStreamProducerStateKind::Idle | HostStreamProducerStateKind::Wrote(_) => {1082if finish {1083log::debug!("stream produce: cancel {}", self.0);1084state.waker = None;1085Poll::Ready(Ok(StreamResult::Cancelled))1086} else {1087log::debug!("stream produce: wait {}", self.0);1088state.waker = Some(cx.waker().clone());1089Poll::Pending1090}1091}1092HostStreamProducerStateKind::Writing(buf) => {1093// Keep the other side blocked for a zero-length write1094// originated from the host.1095if buf.len() == 0 {1096log::debug!("stream produce: zero-length write {}", self.0);1097state.kind = HostStreamProducerStateKind::Wrote(0);1098state.waker = Some(cx.waker().clone());1099return Poll::Pending;1100}1101log::debug!("stream produce: write {}", self.0);1102match remaining {1103Some(amt) => {1104// If the guest is doing a zero-length read then we've1105// got some data for them. Complete the read but leave1106// ourselves in the same `Writing` state as before.1107if amt == 0 {1108state.waker = None;1109return Poll::Ready(Ok(StreamResult::Completed));1110}11111112// Don't let wasmtime buffer up data for us, so truncate1113// the buffer we're sending over to the amount that the1114// reader is requesting.1115if amt < buf.len() {1116buf.truncate(amt);1117}1118}11191120// At this time host<->host stream reads/writes aren't1121// fuzzed since that brings up a bunch of weird edge cases1122// which aren't fun to deal with and aren't interesting1123// either.1124None => unreachable!(),1125}1126let count = buf.len() as u32;1127dst.set_buffer(mem::take(buf).into());1128state.kind = HostStreamProducerStateKind::Wrote(count);1129state.waker = None;1130Poll::Ready(Ok(StreamResult::Completed))1131}1132}1133}1134}11351136impl Drop for HostStreamProducer {1137fn drop(&mut self) {1138log::debug!("stream produce: drop {}", self.0);1139}1140}11411142struct SharedStream(Scope);11431144impl SharedStream {1145async fn next(&mut self, accessor: &Accessor<Data>) -> Option<Command> {1146std::future::poll_fn(|cx| {1147accessor.with(|mut store| {1148self.poll(cx, store.as_context_mut(), false)1149.map(|pair| match pair {1150(None, StreamResult::Dropped) => None,1151(Some(item), StreamResult::Completed) => Some(item),1152_ => unreachable!(),1153})1154})1155})1156.await1157}11581159fn poll(1160&mut self,1161cx: &mut Context<'_>,1162mut store: StoreContextMut<'_, Data>,1163finish: bool,1164) -> Poll<(Option<Command>, StreamResult)> {1165let data = store.data_mut();11661167// If no more commands remain then this is a closed and dropped stream.1168let Some((scope, command)) = data.commands.last_mut() else {1169log::debug!("Stream closed: {:?}", self.0);1170return Poll::Ready((None, StreamResult::Dropped));1171};11721173// If the next queued up command is for the scope that this stream is1174// attached to then send off the command.1175if *scope == self.0 {1176let ret = Some(*command);11771178// All commands are followed up with an "ack", and after the "ack"1179// is delivered then the command is popped to move on to the next1180// command. The reason for this is to guarantee that a command has1181// been processed before moving on to the next command. This helps1182// make the fuzzing easier to work with by being able to implicitly1183// assume that a command has been processed by the time something1184// else is. Otherwise it might be possible that wasmtime has a set1185// of commands/callbacks that are all delivered at the same time and1186// the component model doesn't specify what order they happen1187// within. By forcing an "ack" it ensures a more expected ordering1188// of execution to assist with fuzzing without losing really all1189// that much coverage.1190if matches!(command, Command::Ack) {1191data.commands.pop();1192} else {1193*command = Command::Ack;1194}11951196// After a command was popped other streams may be able to make1197// progress so wake them all up.1198for (_, waker) in data.wakers.drain() {1199waker.wake();1200}1201log::debug!("Delivering command {ret:?} for {:?}", self.0);1202return Poll::Ready((ret, StreamResult::Completed));1203}12041205// The command queue is non-empty and the next command isn't meant for1206// us, so someone else needs to drain the queue. Enqueue our waker.1207if finish {1208Poll::Ready((None, StreamResult::Cancelled))1209} else {1210data.wakers.insert(self.0, cx.waker().clone());1211Poll::Pending1212}1213}1214}12151216impl StreamProducer<Data> for SharedStream {1217type Item = Command;1218type Buffer = Option<Command>;12191220fn poll_produce<'a>(1221mut self: Pin<&mut Self>,1222cx: &mut Context<'_>,1223store: StoreContextMut<'a, Data>,1224mut destination: Destination<'a, Self::Item, Self::Buffer>,1225finish: bool,1226) -> Poll<Result<StreamResult>> {1227let (item, result) = std::task::ready!(self.poll(cx, store, finish));1228destination.set_buffer(item);1229Poll::Ready(Ok(result))1230}1231}12321233#[cfg(test)]1234mod tests {1235use super::{ComponentAsync, Scope, init, run};1236use crate::oracles::component_async::types::*;1237use crate::test::test_n_times;1238use Scope::*;12391240#[test]1241fn smoke() {1242init();12431244test_n_times(50, |c, _| {1245run(c);1246Ok(())1247});1248}12491250// ========================================================================1251// A series of fuzz-generated test cases which caused problems during the1252// development of this fuzzer. Feel free to delete/edit/etc if the fuzzer1253// changes over time.12541255#[test]1256fn simple() {1257init();12581259run(ComponentAsync {1260commands: vec![1261(GuestCaller, Command::AsyncPendingImportCall(0)),1262(GuestCallee, Command::AsyncPendingImportCall(1)),1263(GuestCallee, Command::AsyncPendingExportComplete(0)),1264(GuestCaller, Command::AsyncPendingImportAssertReady(0)),1265(GuestCaller, Command::AsyncPendingImportCall(2)),1266],1267});1268}12691270#[test]1271fn somewhat_larger() {1272static COMMANDS: &[(Scope, Command)] = &[1273(GuestCallee, Command::FutureNew(0)),1274(HostCaller, Command::FutureNew(1)),1275(GuestCallee, Command::FutureReadPending(0)),1276(GuestCaller, Command::AsyncPendingImportCall(2)),1277(GuestCaller, Command::AsyncPendingImportCall(3)),1278(GuestCaller, Command::AsyncPendingImportCall(4)),1279(GuestCaller, Command::AsyncPendingImportCall(5)),1280(GuestCallee, Command::AsyncPendingExportComplete(5)),1281(GuestCallee, Command::AsyncPendingExportComplete(3)),1282(GuestCallee, Command::AsyncPendingExportComplete(4)),1283(GuestCallee, Command::AsyncPendingExportComplete(2)),1284(GuestCaller, Command::AsyncPendingImportCall(6)),1285(GuestCallee, Command::AsyncPendingExportComplete(6)),1286(GuestCaller, Command::AsyncPendingImportCall(7)),1287(GuestCallee, Command::AsyncPendingExportComplete(7)),1288(GuestCaller, Command::AsyncPendingImportCall(8)),1289(GuestCallee, Command::AsyncPendingExportComplete(8)),1290(GuestCaller, Command::AsyncPendingImportCall(9)),1291(GuestCallee, Command::AsyncPendingExportComplete(9)),1292(GuestCaller, Command::AsyncPendingImportCall(10)),1293(GuestCallee, Command::AsyncPendingExportComplete(10)),1294(GuestCaller, Command::AsyncPendingImportCall(11)),1295(GuestCallee, Command::AsyncPendingExportComplete(11)),1296(GuestCaller, Command::AsyncPendingImportCall(12)),1297(GuestCallee, Command::AsyncPendingExportComplete(12)),1298(GuestCaller, Command::AsyncPendingImportCall(13)),1299(GuestCallee, Command::AsyncPendingExportComplete(13)),1300(GuestCaller, Command::AsyncPendingImportCall(14)),1301(GuestCallee, Command::AsyncPendingExportComplete(14)),1302(GuestCaller, Command::AsyncPendingImportCall(15)),1303(GuestCallee, Command::AsyncPendingExportComplete(15)),1304(GuestCaller, Command::AsyncPendingImportCall(16)),1305(GuestCallee, Command::AsyncPendingExportComplete(16)),1306(GuestCaller, Command::AsyncPendingImportCall(17)),1307(GuestCallee, Command::AsyncPendingExportComplete(17)),1308(GuestCaller, Command::AsyncPendingImportCall(18)),1309(GuestCallee, Command::AsyncPendingExportComplete(18)),1310(GuestCaller, Command::AsyncPendingImportCall(19)),1311(GuestCallee, Command::AsyncPendingExportComplete(19)),1312(GuestCaller, Command::AsyncPendingImportCall(20)),1313(GuestCallee, Command::AsyncPendingExportComplete(20)),1314(GuestCaller, Command::AsyncPendingImportCall(21)),1315(GuestCallee, Command::AsyncPendingExportComplete(21)),1316(GuestCaller, Command::AsyncPendingImportCall(22)),1317(GuestCallee, Command::AsyncPendingExportComplete(22)),1318(GuestCaller, Command::AsyncPendingImportCall(23)),1319(GuestCallee, Command::AsyncPendingExportComplete(23)),1320(GuestCaller, Command::AsyncPendingImportCall(24)),1321(GuestCallee, Command::AsyncPendingExportComplete(24)),1322(GuestCaller, Command::AsyncPendingImportCall(25)),1323(GuestCallee, Command::AsyncPendingExportComplete(25)),1324(GuestCaller, Command::AsyncPendingImportCall(26)),1325(GuestCallee, Command::AsyncPendingExportComplete(26)),1326(GuestCaller, Command::AsyncPendingImportCall(27)),1327(GuestCallee, Command::AsyncPendingExportComplete(27)),1328(GuestCaller, Command::AsyncPendingImportCall(28)),1329(GuestCallee, Command::AsyncPendingExportComplete(28)),1330(GuestCaller, Command::AsyncPendingImportCall(29)),1331(GuestCallee, Command::AsyncPendingExportComplete(29)),1332(GuestCaller, Command::AsyncPendingImportCall(30)),1333(GuestCallee, Command::AsyncPendingExportComplete(30)),1334(GuestCaller, Command::AsyncPendingImportCall(31)),1335(GuestCallee, Command::AsyncPendingExportComplete(31)),1336(GuestCaller, Command::AsyncPendingImportCall(32)),1337(GuestCallee, Command::AsyncPendingExportComplete(32)),1338(GuestCaller, Command::AsyncPendingImportCall(33)),1339(GuestCallee, Command::AsyncPendingExportComplete(33)),1340(GuestCaller, Command::AsyncPendingImportCall(34)),1341(GuestCallee, Command::AsyncPendingExportComplete(34)),1342(GuestCaller, Command::AsyncPendingImportCall(35)),1343(GuestCallee, Command::AsyncPendingExportComplete(35)),1344(GuestCaller, Command::AsyncPendingImportCall(36)),1345(GuestCallee, Command::AsyncPendingExportComplete(36)),1346(GuestCaller, Command::AsyncPendingImportCall(37)),1347(GuestCallee, Command::AsyncPendingExportComplete(37)),1348(GuestCaller, Command::AsyncPendingImportAssertReady(36)),1349];1350init();13511352run(ComponentAsync {1353commands: COMMANDS.to_vec(),1354});1355}13561357#[test]1358fn simple_stream1() {1359init();13601361run(ComponentAsync {1362commands: vec![1363(HostCallee, Command::StreamNew(1)),1364(1365HostCallee,1366Command::StreamReadPending(StreamReadPayload {1367stream: 1,1368count: 2,1369}),1370),1371(HostCallee, Command::StreamCancelRead(1)),1372(GuestCaller, Command::SyncReadyCall),1373(1374HostCallee,1375Command::StreamWritePending(StreamWritePayload {1376stream: 1,1377item: 3,1378count: 2,1379}),1380),1381(HostCallee, Command::StreamCancelWrite(1)),1382(HostCallee, Command::StreamDropWritable(1)),1383(1384HostCallee,1385Command::StreamReadDropped(StreamReadPayload {1386stream: 1,1387count: 1,1388}),1389),1390],1391});1392}13931394#[test]1395fn simple_stream3() {1396init();13971398run(ComponentAsync {1399commands: vec![1400(GuestCaller, Command::StreamNew(26)),1401(1402GuestCaller,1403Command::StreamReadPending(StreamReadPayload {1404stream: 26,1405count: 10,1406}),1407),1408(GuestCaller, Command::StreamDropWritable(26)),1409(GuestCaller, Command::StreamReadAssertDropped(26)),1410],1411});1412}14131414#[test]1415fn simple_stream4() {1416init();14171418run(ComponentAsync {1419commands: vec![1420(GuestCaller, Command::StreamNew(23)),1421(1422GuestCaller,1423Command::StreamWritePending(StreamWritePayload {1424stream: 23,1425item: 24,1426count: 2,1427}),1428),1429(GuestCaller, Command::StreamGive(23)),1430(GuestCallee, Command::StreamDropReadable(23)),1431(1432GuestCaller,1433Command::StreamWriteAssertDropped(StreamReadPayload {1434stream: 23,1435count: 0,1436}),1437),1438],1439});1440}14411442#[test]1443fn zero_length_behavior() {1444init();14451446run(ComponentAsync {1447commands: vec![1448(GuestCaller, Command::StreamNew(10)),1449(HostCaller, Command::StreamTake(10)),1450(1451GuestCaller,1452Command::StreamWritePending(StreamWritePayload {1453stream: 10,1454item: 13,1455count: 5,1456}),1457),1458(1459HostCaller,1460Command::StreamReadReady(StreamReadyPayload {1461stream: 10,1462item: 0,1463op_count: 0,1464ready_count: 0,1465}),1466),1467(1468HostCaller,1469Command::StreamReadReady(StreamReadyPayload {1470stream: 10,1471item: 0,1472op_count: 0,1473ready_count: 0,1474}),1475),1476],1477});1478}1479}148014811482