Path: blob/main/crates/fuzzing/src/generators/component_async.rs
3067 views
//! For a high-level overview of this fuzz target see `fuzz_async.rs`12#![expect(missing_docs, reason = "macro-generated code")]34use arbitrary::{Arbitrary, Unstructured};5use indexmap::{IndexMap, IndexSet};67wasmtime::component::bindgen!({8world: "fuzz-async",9imports: {10"wasmtime-fuzz:fuzz/types.get-commands": store,11},12exports: { default: async | store },13});1415use wasmtime_fuzz::fuzz::types::{16Command, FuturePayload, StreamReadPayload, StreamReadyPayload, StreamWritePayload,17};1819const SOFT_MAX_COMMANDS: usize = 100;20const MAX_STREAM_COUNT: u32 = 10;2122/// Structure used for the "component async" fuzzer.23///24/// This encapsulates a list of commands for the fuzzer to run. Note that the25/// commands are not 100% arbitrary but instead they're generated similar to26/// wasm instructions where only some sequences of instructions are valid. The27/// rest of this module is dedicated to the generation of these commands.28#[derive(Debug)]29pub struct ComponentAsync {30/// A sequence of commands to run, tagged with a scope that they're run31/// within.32pub commands: Vec<(Scope, Command)>,33}3435/// The possible "scopes" that async commands run within.36#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]37pub enum Scope {38/// The outermost layer of the host, which controls invocations of the39/// guests.40HostCaller,4142/// The first layer of the guest, or the raw exports from the root of the43/// component.44///45/// This imports functions from the `GuestCallee`.46GuestCaller,4748/// The second layer of the guest which imports the host functions directly.49///50/// This is then in turn imported by the `GuestCaller`.51GuestCallee,5253/// The innermost layer of the host which provides imported functions to the54/// `GuestCallee`.55HostCallee,56}5758impl Scope {59const ALL: &[Scope; 4] = &[60Scope::HostCaller,61Scope::GuestCaller,62Scope::GuestCallee,63Scope::HostCallee,64];65const CALLERS: &[Scope; 3] = &[Scope::HostCaller, Scope::GuestCaller, Scope::GuestCallee];6667fn callee(&self) -> Option<Scope> {68match self {69Scope::HostCaller => Some(Scope::GuestCaller),70Scope::GuestCaller => Some(Scope::GuestCallee),71Scope::GuestCallee => Some(Scope::HostCallee),72Scope::HostCallee => None,73}74}7576fn caller(&self) -> Option<Scope> {77match self {78Scope::HostCaller => None,79Scope::GuestCaller => Some(Scope::HostCaller),80Scope::GuestCallee => Some(Scope::GuestCaller),81Scope::HostCallee => Some(Scope::GuestCallee),82}83}8485fn is_host(&self) -> bool {86match self {87Scope::HostCaller | Scope::HostCallee => true,88Scope::GuestCaller | Scope::GuestCallee => false,89}90}91}9293impl Arbitrary<'_> for ComponentAsync {94fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {95let mut state = State::default();96let mut ret = Vec::new();9798// While there's more unstructured data, and our list of commands isn't99// too long, generate some new commands per-component.100while !u.is_empty() && ret.len() < SOFT_MAX_COMMANDS {101state.generate(u, false, &mut ret)?;102}103104// Optionally, if specified, finish up all async operations.105if u.arbitrary()? {106while !state.is_empty() {107state.generate(u, true, &mut ret)?;108}109}110111Ok(ComponentAsync { commands: ret })112}113}114115#[derive(Default)]116struct State {117next_id: u32,118119/// List of scopes that have an active and pending call to the120/// `async-pending` function.121async_pending: Vec<(Scope, u32)>,122123/// Deferred work that can happen at any time, for example asserting the124/// result of some previous operation.125deferred: Vec<(Scope, Command)>,126127/// State associated with futures/streams and their handles within.128futures: HandleStates<(), u32>,129streams: HandleStates<StreamRead, StreamWrite>,130}131132#[derive(Default)]133struct HandleStates<R, W> {134readers: HalfStates<R>,135writers: HalfStates<W>,136}137138impl<R, W> HandleStates<R, W> {139fn is_empty(&self) -> bool {140self.readers.is_empty() && self.writers.is_empty()141}142}143144/// State management for "half" of a future/stream read/write pair.145///146/// This tracks all the various states of all handles in the system to be able147/// to select amongst them an arbitrary operation to perform. This structure's148/// sets are primarily manipulated through helper methods to ensure that the set149/// metadata all stays in sync.150#[derive(Default)]151struct HalfStates<T> {152/// All known handles of this type, where they're located, etc.153handles: IndexMap<u32, (Scope, HalfState, Transferrable)>,154155/// All handles which can currently be dropped. Handles can't be dropped if156/// they're in use, for example.157droppable: IndexSet<u32>,158159/// All handles which can be read/written from (depending on handle type).160/// Handles where both pairs are in the same component can't be161/// read/written to for example.162ready: IndexSet<u32>,163164/// All handles which can be transferred somewhere else.165///166/// Some examples of non-transferrable handles are:167///168/// * writers169/// * handles with an outstanding read170/// * host-based handles that have been used at least once (FIXME #12090)171transferrable: IndexSet<u32>,172173/// Handles currently being read/written to.174///175/// Also includes state about the operation, such as whether it's been176/// dropped on the other side.177in_use: IndexMap<u32, (T, OpState)>,178179/// Handles with a pending operation which can be cancelled.180cancellable: IndexSet<u32>,181}182183enum HalfState {184Idle,185InUse,186}187188#[derive(Copy, Clone, PartialEq, Debug)]189enum Transferrable {190Yes,191No,192}193194#[derive(Copy, Clone, PartialEq, Debug)]195enum Cancellable {196Yes,197No,198}199200#[derive(Copy, Clone, PartialEq, Debug)]201enum OpState {202Pending,203Dropped,204}205206#[derive(Default, Copy, Clone)]207struct StreamRead {208count: u32,209}210211#[derive(Default, Copy, Clone)]212struct StreamWrite {213item: u32,214count: u32,215}216217impl<T> HalfStates<T> {218fn is_empty(&self) -> bool {219self.handles.is_empty()220}221222/// Adds a new handle `id` to this set.223fn insert(&mut self, id: u32, scope: Scope, transferrable: Transferrable) {224let prev = self225.handles226.insert(id, (scope, HalfState::Idle, transferrable));227assert!(prev.is_none());228assert!(self.droppable.insert(id));229if transferrable == Transferrable::Yes {230self.transferrable.insert(id);231}232}233234/// Removes the handle `id` for closing.235fn remove(&mut self, id: u32) -> Scope {236let (scope, state, transferrable) = self.handles.swap_remove(&id).unwrap();237assert!(matches!(state, HalfState::Idle));238self.droppable.swap_remove(&id);239self.ready.swap_remove(&id);240if transferrable == Transferrable::Yes {241assert!(self.transferrable.swap_remove(&id));242}243scope244}245246/// Locks `id` in whatever scope it's currently in for the rest of its247/// lifetime, preventing its transfer. This is used as a workaround for248/// #12090.249fn lock_in_place(&mut self, id: u32) {250let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap();251assert!(matches!(state, HalfState::Idle));252if matches!(transferrable, Transferrable::Yes) {253assert!(self.transferrable.swap_remove(&id));254*transferrable = Transferrable::No;255}256}257258/// Starts an operation on the handle `id`.259fn start(&mut self, id: u32, cancellable: Cancellable, payload: T) {260let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap();261assert!(matches!(state, HalfState::Idle));262assert!(self.ready.swap_remove(&id));263self.droppable.swap_remove(&id);264*state = HalfState::InUse;265let prev = self.in_use.insert(id, (payload, OpState::Pending));266assert!(prev.is_none());267if *transferrable == Transferrable::Yes {268assert!(self.transferrable.swap_remove(&id));269}270if cancellable == Cancellable::Yes {271assert!(self.cancellable.insert(id));272}273}274275/// Completes an operation on `id`, returning the state it was started with276/// along with whether it was dropped.277fn stop(&mut self, id: u32) -> (T, OpState) {278let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap();279assert!(matches!(state, HalfState::InUse));280*state = HalfState::Idle;281let dropped = self.in_use.swap_remove(&id).unwrap();282self.cancellable.swap_remove(&id);283if *transferrable == Transferrable::Yes {284assert!(self.transferrable.insert(id));285}286assert!(self.droppable.insert(id));287if dropped.1 != OpState::Dropped {288assert!(self.ready.insert(id));289} else {290self.lock_in_place(id);291}292dropped293}294295/// Updates to `OpState::Dropped` for an operation-in-progress.296fn set_in_use_state_dropped(&mut self, id: u32) {297let (_, prev) = self.in_use.get_mut(&id).unwrap();298assert_eq!(*prev, OpState::Pending);299*prev = OpState::Dropped;300301// This operation is now "cancellable" meaning that at any point in the302// future it can be resolved since the other end was dropped.303self.cancellable.insert(id);304}305}306307impl State {308fn is_empty(&self) -> bool {309let State {310next_id: _,311async_pending,312deferred,313futures,314streams,315} = self;316async_pending.is_empty() && deferred.is_empty() && futures.is_empty() && streams.is_empty()317}318319fn generate(320&mut self,321u: &mut Unstructured<'_>,322finish: bool,323commands: &mut Vec<(Scope, Command)>,324) -> arbitrary::Result<()> {325let mut choices = Vec::new();326327// If we're not finishing up then have the possibility of328// immediately-ready sync/async calls and such sort of miscellaneous329// work.330if !finish {331choices.push(Choice::SyncReadyCall);332choices.push(Choice::AsyncReadyCall);333choices.push(Choice::FutureNew);334choices.push(Choice::StreamNew);335}336337// If we're not finishing up, and if we don't have too much pending338// work, then possibly make some more pending work.339if !finish && self.async_pending.len() < 20 {340choices.push(Choice::AsyncPendingCall);341}342343// If there's pending work, possibly resolve something.344if self.async_pending.len() > 0 {345choices.push(Choice::AsyncPendingResolve);346}347348// If something has been deferred to later, possibly add that command349// into the stream.350if self.deferred.len() > 0 {351choices.push(Choice::Deferred);352}353354// Wrap up work with futures by dropping handles, writing, cancelling,355// etc.356if self.futures.readers.droppable.len() > 0 {357choices.push(Choice::FutureDropReadable);358}359if self.futures.writers.droppable.len() > 0 {360choices.push(Choice::FutureDropWritable);361}362if self.futures.writers.cancellable.len() > 0 {363choices.push(Choice::FutureCancelWrite);364}365if self.futures.readers.cancellable.len() > 0 {366choices.push(Choice::FutureCancelRead);367}368// If more work is allowed kick of reads/transfers.369if !finish {370if self.futures.writers.ready.len() > 0 {371choices.push(Choice::FutureWrite);372}373if self.futures.readers.ready.len() > 0 {374choices.push(Choice::FutureRead);375}376if self.futures.readers.transferrable.len() > 0 {377choices.push(Choice::FutureReaderTransfer);378}379}380381// Streams can be dropped at any time and their pending operations can382// be ceased at any time.383if self.streams.readers.droppable.len() > 0 {384choices.push(Choice::StreamDropReadable);385}386if self.streams.writers.droppable.len() > 0 {387choices.push(Choice::StreamDropWritable);388}389if self.streams.readers.cancellable.len() > 0 {390choices.push(Choice::StreamEndRead);391}392if self.streams.writers.cancellable.len() > 0 {393choices.push(Choice::StreamEndWrite);394}395// If more work is allowed then streams can be moved around and new396// reads/writes may be started.397if !finish {398if self.streams.readers.transferrable.len() > 0 {399choices.push(Choice::StreamReaderTransfer);400}401if self.streams.readers.ready.len() > 0 {402choices.push(Choice::StreamRead);403}404if self.streams.writers.ready.len() > 0 {405choices.push(Choice::StreamWrite);406}407}408409#[derive(Debug)]410enum Choice {411SyncReadyCall,412AsyncReadyCall,413AsyncPendingCall,414AsyncPendingResolve,415Deferred,416417FutureNew,418FutureReaderTransfer,419FutureRead,420FutureWrite,421FutureCancelRead,422FutureCancelWrite,423FutureDropReadable,424FutureDropWritable,425426StreamNew,427StreamReaderTransfer,428StreamDropReadable,429StreamDropWritable,430StreamRead,431StreamWrite,432StreamEndRead,433StreamEndWrite,434}435436match u.choose(&choices)? {437Choice::SyncReadyCall => {438let caller = *u.choose(Scope::CALLERS)?;439commands.push((caller, Command::SyncReadyCall));440}441Choice::AsyncReadyCall => {442let caller = *u.choose(Scope::CALLERS)?;443commands.push((caller, Command::AsyncReadyCall));444}445446Choice::AsyncPendingCall => {447let caller = *u.choose(Scope::CALLERS)?;448let id = self.next_id();449self.async_pending.push((caller, id));450commands.push((caller, Command::AsyncPendingImportCall(id)));451}452453Choice::AsyncPendingResolve => {454let index = u.int_in_range(0..=self.async_pending.len() - 1)?;455let (caller, id) = self.async_pending.swap_remove(index);456let callee = caller.callee().unwrap();457458// FIXME(#11833) the host can't cancel calls at this time, so459// they can only be completed. Everything else though is460// guest-initiated which means that the call can be either461// completed or cancelled.462let complete = caller == Scope::HostCaller || u.arbitrary()?;463464if complete {465commands.push((callee, Command::AsyncPendingExportComplete(id)));466self.deferred467.push((caller, Command::AsyncPendingImportAssertReady(id)));468} else {469commands.push((caller, Command::AsyncPendingImportCancel(id)));470self.deferred471.push((callee, Command::AsyncPendingExportAssertCancelled(id)));472}473}474475Choice::Deferred => {476let index = u.int_in_range(0..=self.deferred.len() - 1)?;477let (scope, cmd) = self.deferred.swap_remove(index);478commands.push((scope, cmd));479}480481Choice::FutureNew => {482let scope = *u.choose(Scope::ALL)?;483let id = self.next_id();484commands.push((scope, Command::FutureNew(id)));485self.futures.readers.insert(id, scope, Transferrable::Yes);486self.futures.writers.insert(id, scope, Transferrable::No);487488// Future writers cannot be dropped without writing.489assert!(self.futures.writers.droppable.swap_remove(&id));490}491Choice::FutureReaderTransfer => {492let set = &mut self.futures.readers.transferrable;493let i = u.int_in_range(0..=set.len() - 1)?;494let id = *set.get_index(i).unwrap();495let scope = &mut self.futures.readers.handles[&id].0;496497enum Action {498CallerTake(Scope),499GiveCallee(Scope),500}501502let action = match (scope.caller(), scope.callee()) {503(Some(caller), None) => Action::CallerTake(caller),504(None, Some(callee)) => Action::GiveCallee(callee),505(Some(caller), Some(callee)) => {506if u.arbitrary()? {507Action::CallerTake(caller)508} else {509Action::GiveCallee(callee)510}511}512(None, None) => unreachable!(),513};514match action {515Action::CallerTake(caller) => {516commands.push((caller, Command::FutureTake(id)));517*scope = caller;518}519Action::GiveCallee(callee) => {520commands.push((*scope, Command::FutureGive(id)));521*scope = callee;522}523}524525// See what scope the reader/writer half are in. Allow526// operations if they're in different scopes, but disallow527// operations if they're in the same scope.528let reader_scope = Some(*scope);529let writer_scope = self.futures.writers.handles.get(&id).map(|p| p.0);530if reader_scope == writer_scope {531self.futures.readers.ready.swap_remove(&id);532self.futures.writers.ready.swap_remove(&id);533} else {534self.futures.readers.ready.insert(id);535if writer_scope.is_some() && !self.futures.writers.in_use.contains_key(&id) {536self.futures.writers.ready.insert(id);537}538}539}540Choice::FutureRead => {541let set = &self.futures.readers.ready;542let i = u.int_in_range(0..=set.len() - 1)?;543let id = *set.get_index(i).unwrap();544let scope = self.futures.readers.handles[&id].0;545546if let Some((item, _)) = self.futures.writers.in_use.get(&id) {547// If the future has an active write, then this should548// complete with that write. The write is then resolved and549// the future reader/writer are both gone.550let item = *item;551commands.push((552scope,553Command::FutureReadReady(FuturePayload { future: id, item }),554));555let write_scope = self.futures.writers.handles[&id].0;556commands.push((write_scope, Command::FutureWriteAssertComplete(id)));557558self.futures.writers.stop(id);559self.futures.readers.remove(id);560self.futures.writers.remove(id);561} else {562// If the write-end is idle, then this should be a pending563// future read.564//565// FIXME(#12090) host reads cannot be cancelled566let cancellable = if scope.is_host() {567Cancellable::No568} else {569Cancellable::Yes570};571self.futures.readers.start(id, cancellable, ());572commands.push((scope, Command::FutureReadPending(id)));573}574}575Choice::FutureWrite => {576let set = &self.futures.writers.ready;577let i = u.int_in_range(0..=set.len() - 1)?;578let id = *set.get_index(i).unwrap();579let scope = self.futures.writers.handles[&id].0;580let item = self.next_id();581let payload = FuturePayload { future: id, item };582583if !self.futures.readers.handles.contains_key(&id) {584// If the reader is gone then this write should complete585// immediately with "dropped" and furthermore the writer586// should now be removed.587commands.push((scope, Command::FutureWriteDropped(id)));588self.futures.writers.remove(id);589} else if self.futures.readers.in_use.contains_key(&id) {590// If the reader is in-progress then this should complete591// the read/write pair. The reader/writer are both removed592// as a result.593commands.push((scope, Command::FutureWriteReady(payload)));594let read_scope = self.futures.readers.handles[&id].0;595commands.push((read_scope, Command::FutureReadAssertComplete(payload)));596self.futures.readers.stop(id);597self.futures.readers.remove(id);598self.futures.writers.remove(id);599} else {600// If the read-end is idle, then this should be a pending601// future read.602self.futures.writers.start(id, Cancellable::Yes, item);603commands.push((scope, Command::FutureWritePending(payload)));604}605}606Choice::FutureCancelWrite => {607let set = &self.futures.writers.cancellable;608let i = u.int_in_range(0..=set.len() - 1)?;609let id = *set.get_index(i).unwrap();610let scope = self.futures.writers.handles[&id].0;611612let (_write, state) = self.futures.writers.stop(id);613match state {614OpState::Pending => {615commands.push((scope, Command::FutureCancelWrite(id)));616assert!(self.futures.writers.droppable.swap_remove(&id));617}618OpState::Dropped => {619commands.push((scope, Command::FutureWriteAssertDropped(id)));620self.futures.writers.remove(id);621}622}623}624Choice::FutureCancelRead => {625let set = &self.futures.readers.cancellable;626let i = u.int_in_range(0..=set.len() - 1)?;627let id = *set.get_index(i).unwrap();628let scope = self.futures.readers.handles[&id].0;629630let (_read, state) = self.futures.readers.stop(id);631match state {632OpState::Pending => {633commands.push((scope, Command::FutureCancelRead(id)));634}635// Writers cannot be dropped with futures, so this is not636// reachable.637OpState::Dropped => unreachable!(),638}639}640Choice::FutureDropReadable => {641let set = &self.futures.readers.droppable;642let i = u.int_in_range(0..=set.len() - 1)?;643let id = *set.get_index(i).unwrap();644let scope = self.futures.readers.remove(id);645commands.push((scope, Command::FutureDropReadable(id)));646647// If the writer is active then its write is now destined to648// finish with "dropped", and otherwise the writer is also now649// droppable since the reader handle is gone.650if self.futures.writers.in_use.contains_key(&id) {651self.futures.writers.set_in_use_state_dropped(id);652} else {653assert!(self.futures.writers.droppable.insert(id));654}655}656Choice::FutureDropWritable => {657let set = &self.futures.writers.droppable;658let i = u.int_in_range(0..=set.len() - 1)?;659let id = *set.get_index(i).unwrap();660let scope = self.futures.writers.remove(id);661662// Writers can't actually be dropped prior to writing so fake663// a write by writing a value and asserting that the result is664// "dropped".665commands.push((scope, Command::FutureWriteDropped(id)));666667assert!(!self.futures.readers.handles.contains_key(&id));668}669670Choice::StreamNew => {671let scope = *u.choose(Scope::ALL)?;672let id = self.next_id();673commands.push((scope, Command::StreamNew(id)));674self.streams.readers.insert(id, scope, Transferrable::Yes);675self.streams.writers.insert(id, scope, Transferrable::No);676}677Choice::StreamReaderTransfer => {678let set = &mut self.streams.readers.transferrable;679let i = u.int_in_range(0..=set.len() - 1)?;680let id = *set.get_index(i).unwrap();681let scope = &mut self.streams.readers.handles[&id].0;682683enum Action {684CallerTake(Scope),685GiveCallee(Scope),686}687688let action = match (scope.caller(), scope.callee()) {689(Some(caller), None) => Action::CallerTake(caller),690(None, Some(callee)) => Action::GiveCallee(callee),691(Some(caller), Some(callee)) => {692if u.arbitrary()? {693Action::CallerTake(caller)694} else {695Action::GiveCallee(callee)696}697}698(None, None) => unreachable!(),699};700match action {701Action::CallerTake(caller) => {702commands.push((caller, Command::StreamTake(id)));703*scope = caller;704}705Action::GiveCallee(callee) => {706commands.push((*scope, Command::StreamGive(id)));707*scope = callee;708}709}710711// See what scope the reader/writer half are in. Allow712// operations if they're in different scopes, but disallow713// operations if they're in the same scope.714//715// Note that host<->host reads/writes for streams aren't fuzzed716// at this time so that's also explicitly disallowed.717let reader_scope = Some(*scope);718let writer_scope = self.streams.writers.handles.get(&id).map(|p| p.0);719if reader_scope == writer_scope720|| reader_scope.is_some_and(|s| s.is_host())721== writer_scope.is_some_and(|s| s.is_host())722{723self.streams.readers.ready.swap_remove(&id);724self.streams.writers.ready.swap_remove(&id);725} else {726self.streams.readers.ready.insert(id);727if writer_scope.is_some() && !self.streams.writers.in_use.contains_key(&id) {728self.streams.writers.ready.insert(id);729}730}731}732Choice::StreamDropReadable => {733let set = &self.streams.readers.droppable;734let i = u.int_in_range(0..=set.len() - 1)?;735let id = *set.get_index(i).unwrap();736let scope = self.streams.readers.remove(id);737commands.push((scope, Command::StreamDropReadable(id)));738739if self.streams.writers.in_use.contains_key(&id) {740self.streams.writers.set_in_use_state_dropped(id);741}742}743Choice::StreamDropWritable => {744let set = &self.streams.writers.droppable;745let i = u.int_in_range(0..=set.len() - 1)?;746let id = *set.get_index(i).unwrap();747let scope = self.streams.writers.remove(id);748commands.push((scope, Command::StreamDropWritable(id)));749750if self.streams.readers.in_use.contains_key(&id) {751self.streams.readers.set_in_use_state_dropped(id);752}753}754Choice::StreamRead => {755let set = &self.streams.readers.ready;756let i = u.int_in_range(0..=set.len() - 1)?;757let id = *set.get_index(i).unwrap();758let scope = self.streams.readers.handles[&id].0;759let count = u.int_in_range(0..=MAX_STREAM_COUNT)?;760761// FIXME(#12090)762if scope.is_host() {763self.streams.readers.lock_in_place(id);764}765766if !self.streams.writers.handles.contains_key(&id) {767// If the write handle is dropped, then this should768// immediately report as such.769commands.push((770scope,771Command::StreamReadDropped(StreamReadPayload { stream: id, count }),772));773// Can't read from this stream again, so it's not ready,774// and then we also can't lift/lower it any more so it's775// locked in place.776assert!(self.streams.readers.ready.swap_remove(&id));777self.streams.readers.lock_in_place(id);778} else if self.streams.writers.in_use.contains_key(&id) {779// If the write handle is active then this read should780// complete immediately.781let write_count = self.streams.writers.in_use[&id].0.count;782let write_scope = self.streams.writers.handles[&id].0;783let min = count.min(write_count);784785match (count, write_count) {786// Two zero-length operations rendezvousing will leave787// the reader blocked but the writer should wake up. A788// nonzero-length read and a 0-length write performs789// the same way too.790(0, 0) | (1.., 0) => {791self.streams792.readers793.start(id, Cancellable::Yes, StreamRead { count });794commands.push((795scope,796Command::StreamReadPending(StreamReadPayload { stream: id, count }),797));798self.streams.writers.stop(id);799commands.push((800write_scope,801Command::StreamWriteAssertComplete(StreamReadPayload {802stream: id,803count: min,804}),805));806}807808// A zero-length read with a nonzero-length-write809// should wake up just the reader and do nothing to the810// writer.811(0, 1..) => {812commands.push((813scope,814Command::StreamReadReady(StreamReadyPayload {815stream: id,816item: 0,817ready_count: min,818op_count: count,819}),820));821}822823// With two nonzero lengths both operations should complete.824(1.., 1..) => {825let (write, _) = self.streams.writers.stop(id);826commands.push((827scope,828Command::StreamReadReady(StreamReadyPayload {829stream: id,830item: write.item,831ready_count: min,832op_count: count,833}),834));835commands.push((836write_scope,837Command::StreamWriteAssertComplete(StreamReadPayload {838stream: id,839count: min,840}),841));842}843}844} else {845// If the write handle is not active then this should be in846// a pending state now.847self.streams848.readers849.start(id, Cancellable::Yes, StreamRead { count });850commands.push((851scope,852Command::StreamReadPending(StreamReadPayload { stream: id, count }),853));854}855}856Choice::StreamWrite => {857let set = &self.streams.writers.ready;858let i = u.int_in_range(0..=set.len() - 1)?;859let id = *set.get_index(i).unwrap();860let scope = self.streams.writers.handles[&id].0;861let item = self.next_id();862let count = u.int_in_range(0..=MAX_STREAM_COUNT)?;863864// FIXME(#12090)865if scope.is_host() {866self.streams.writers.lock_in_place(id);867}868869if !self.streams.readers.handles.contains_key(&id) {870// If the read handle is dropped, then this should871// immediately report as such.872commands.push((873scope,874Command::StreamWriteDropped(StreamWritePayload {875stream: id,876item,877count,878}),879));880// Cannot write ever again to this handle so remove it from881// the writable set.882assert!(self.streams.writers.ready.swap_remove(&id));883} else if self.streams.readers.in_use.contains_key(&id) {884// If the read handle is active then this write should885// complete immediately.886let read_count = self.streams.readers.in_use[&id].0.count;887let read_scope = self.streams.readers.handles[&id].0;888let min = count.min(read_count);889890match (read_count, count) {891// A zero-length write, no matter what the read half is892// pending as, is always ready and doesn't affect the893// reader.894(_, 0) => {895commands.push((896scope,897Command::StreamWriteReady(StreamReadyPayload {898stream: id,899item,900op_count: count,901ready_count: min,902}),903));904}905906// With a zero-length read and a nonzero-length write907// the writer is blocked but the reader is unblocked.908(0, 1..) => {909self.streams.writers.start(910id,911Cancellable::Yes,912StreamWrite { item, count },913);914commands.push((915scope,916Command::StreamWritePending(StreamWritePayload {917stream: id,918item,919count,920}),921));922self.streams.readers.stop(id);923commands.push((924read_scope,925Command::StreamReadAssertComplete(StreamWritePayload {926stream: id,927item,928count: min,929}),930));931}932933// Nonzero sizes means that the write immediately934// finishes and the read is also now ready to complete.935(1.., 1..) => {936commands.push((937scope,938Command::StreamWriteReady(StreamReadyPayload {939stream: id,940item,941op_count: count,942ready_count: min,943}),944));945self.streams.readers.stop(id);946commands.push((947read_scope,948Command::StreamReadAssertComplete(StreamWritePayload {949stream: id,950item,951count: min,952}),953));954}955}956} else {957// If the read handle is not active then this should be in958// a pending state now.959self.streams960.writers961.start(id, Cancellable::Yes, StreamWrite { item, count });962commands.push((963scope,964Command::StreamWritePending(StreamWritePayload {965stream: id,966item,967count,968}),969));970}971}972Choice::StreamEndRead => {973let set = &self.streams.readers.cancellable;974let i = u.int_in_range(0..=set.len() - 1)?;975let id = *set.get_index(i).unwrap();976let scope = self.streams.readers.handles[&id].0;977978let (_read, state) = self.streams.readers.stop(id);979match state {980OpState::Pending => {981commands.push((scope, Command::StreamCancelRead(id)));982}983OpState::Dropped => {984commands.push((scope, Command::StreamReadAssertDropped(id)));985}986}987}988Choice::StreamEndWrite => {989let set = &self.streams.writers.cancellable;990let i = u.int_in_range(0..=set.len() - 1)?;991let id = *set.get_index(i).unwrap();992let scope = self.streams.writers.handles[&id].0;993994let (_write, state) = self.streams.writers.stop(id);995match state {996OpState::Pending => {997commands.push((scope, Command::StreamCancelWrite(id)));998}999OpState::Dropped => {1000commands.push((1001scope,1002Command::StreamWriteAssertDropped(StreamReadPayload {1003stream: id,1004count: 0,1005}),1006));1007}1008}1009}1010}1011Ok(())1012}10131014fn next_id(&mut self) -> u32 {1015let id = self.next_id;1016self.next_id += 1;1017id1018}1019}102010211022