Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/fuzzing/src/generators/component_async.rs
3067 views
1
//! For a high-level overview of this fuzz target see `fuzz_async.rs`
2
3
#![expect(missing_docs, reason = "macro-generated code")]
4
5
use arbitrary::{Arbitrary, Unstructured};
6
use indexmap::{IndexMap, IndexSet};
7
8
// Generate bindings for the `fuzz-async` world used by this fuzz target.
//
// NOTE(review): the `wasmtime_fuzz::fuzz::types` items imported below are
// presumably produced by this macro invocation — confirm against the WIT
// definition of the world.
wasmtime::component::bindgen!({
    world: "fuzz-async",
    imports: {
        "wasmtime-fuzz:fuzz/types.get-commands": store,
    },
    exports: { default: async | store },
});
15
16
use wasmtime_fuzz::fuzz::types::{
17
Command, FuturePayload, StreamReadPayload, StreamReadyPayload, StreamWritePayload,
18
};
19
20
/// Soft cap on the number of generated commands.
///
/// Generation of new work stops once the command list reaches this length.
/// The cap is "soft" because a single generation step may push more than one
/// command, and the optional wind-down phase appends further commands.
const SOFT_MAX_COMMANDS: usize = 100;

/// Inclusive upper bound on the item count of a single stream read or write.
const MAX_STREAM_COUNT: u32 = 10;
22
23
/// Structure used for the "component async" fuzzer.
24
///
25
/// This encapsulates a list of commands for the fuzzer to run. Note that the
26
/// commands are not 100% arbitrary but instead they're generated similar to
27
/// wasm instructions where only some sequences of instructions are valid. The
28
/// rest of this module is dedicated to the generation of these commands.
29
#[derive(Debug)]
30
pub struct ComponentAsync {
31
/// A sequence of commands to run, tagged with a scope that they're run
32
/// within.
33
pub commands: Vec<(Scope, Command)>,
34
}
35
36
/// The possible "scopes" that async commands run within.
///
/// The scopes form a linear call chain:
/// `HostCaller -> GuestCaller -> GuestCallee -> HostCallee`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum Scope {
    /// The outermost layer of the host, which controls invocations of the
    /// guests.
    HostCaller,

    /// The first layer of the guest, or the raw exports from the root of the
    /// component.
    ///
    /// This imports functions from the `GuestCallee`.
    GuestCaller,

    /// The second layer of the guest which imports the host functions directly.
    ///
    /// This is then in turn imported by the `GuestCaller`.
    GuestCallee,

    /// The innermost layer of the host which provides imported functions to the
    /// `GuestCallee`.
    HostCallee,
}

impl Scope {
    /// Every scope, ordered from the outermost caller to the innermost callee.
    const ALL: &'static [Scope; 4] = &[
        Scope::HostCaller,
        Scope::GuestCaller,
        Scope::GuestCallee,
        Scope::HostCallee,
    ];

    /// Every scope that has a callee, i.e. all scopes except `HostCallee`.
    const CALLERS: &'static [Scope; 3] =
        &[Scope::HostCaller, Scope::GuestCaller, Scope::GuestCallee];

    /// Returns the scope that this scope calls into, or `None` for the
    /// innermost scope.
    fn callee(&self) -> Option<Scope> {
        match self {
            Scope::HostCaller => Some(Scope::GuestCaller),
            Scope::GuestCaller => Some(Scope::GuestCallee),
            Scope::GuestCallee => Some(Scope::HostCallee),
            Scope::HostCallee => None,
        }
    }

    /// Returns the scope that calls into this scope, or `None` for the
    /// outermost scope.
    fn caller(&self) -> Option<Scope> {
        match self {
            Scope::HostCaller => None,
            Scope::GuestCaller => Some(Scope::HostCaller),
            Scope::GuestCallee => Some(Scope::GuestCaller),
            Scope::HostCallee => Some(Scope::GuestCallee),
        }
    }

    /// Returns whether this scope is one of the two host layers.
    fn is_host(&self) -> bool {
        match self {
            Scope::HostCaller | Scope::HostCallee => true,
            Scope::GuestCaller | Scope::GuestCallee => false,
        }
    }
}
93
94
impl Arbitrary<'_> for ComponentAsync {
95
fn arbitrary(u: &mut Unstructured<'_>) -> arbitrary::Result<Self> {
96
let mut state = State::default();
97
let mut ret = Vec::new();
98
99
// While there's more unstructured data, and our list of commands isn't
100
// too long, generate some new commands per-component.
101
while !u.is_empty() && ret.len() < SOFT_MAX_COMMANDS {
102
state.generate(u, false, &mut ret)?;
103
}
104
105
// Optionally, if specified, finish up all async operations.
106
if u.arbitrary()? {
107
while !state.is_empty() {
108
state.generate(u, true, &mut ret)?;
109
}
110
}
111
112
Ok(ComponentAsync { commands: ret })
113
}
114
}
115
116
#[derive(Default)]
117
struct State {
118
next_id: u32,
119
120
/// List of scopes that have an active and pending call to the
121
/// `async-pending` function.
122
async_pending: Vec<(Scope, u32)>,
123
124
/// Deferred work that can happen at any time, for example asserting the
125
/// result of some previous operation.
126
deferred: Vec<(Scope, Command)>,
127
128
/// State associated with futures/streams and their handles within.
129
futures: HandleStates<(), u32>,
130
streams: HandleStates<StreamRead, StreamWrite>,
131
}
132
133
#[derive(Default)]
134
struct HandleStates<R, W> {
135
readers: HalfStates<R>,
136
writers: HalfStates<W>,
137
}
138
139
impl<R, W> HandleStates<R, W> {
140
fn is_empty(&self) -> bool {
141
self.readers.is_empty() && self.writers.is_empty()
142
}
143
}
144
145
/// State management for "half" of a future/stream read/write pair.
146
///
147
/// This tracks all the various states of all handles in the system to be able
148
/// to select amongst them an arbitrary operation to perform. This structure's
149
/// sets are primarily manipulated through helper methods to ensure that the set
150
/// metadata all stays in sync.
151
#[derive(Default)]
152
struct HalfStates<T> {
153
/// All known handles of this type, where they're located, etc.
154
handles: IndexMap<u32, (Scope, HalfState, Transferrable)>,
155
156
/// All handles which can currently be dropped. Handles can't be dropped if
157
/// they're in use, for example.
158
droppable: IndexSet<u32>,
159
160
/// All handles which can be read/written from (depending on handle type).
161
/// Handles where both pairs are in the same component can't be
162
/// read/written to for example.
163
ready: IndexSet<u32>,
164
165
/// All handles which can be transferred somewhere else.
166
///
167
/// Some examples of non-transferrable handles are:
168
///
169
/// * writers
170
/// * handles with an outstanding read
171
/// * host-based handles that have been used at least once (FIXME #12090)
172
transferrable: IndexSet<u32>,
173
174
/// Handles currently being read/written to.
175
///
176
/// Also includes state about the operation, such as whether it's been
177
/// dropped on the other side.
178
in_use: IndexMap<u32, (T, OpState)>,
179
180
/// Handles with a pending operation which can be cancelled.
181
cancellable: IndexSet<u32>,
182
}
183
184
/// Whether a handle currently has an operation in progress.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum HalfState {
    /// No operation is in progress on the handle.
    Idle,
    /// A read or write has been started on the handle and not yet stopped.
    InUse,
}
188
189
/// Whether a handle is allowed to be moved to another scope.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum Transferrable {
    /// The handle may be transferred.
    Yes,
    /// The handle must stay in its current scope.
    No,
}
194
195
/// Whether a freshly started operation may later be cancelled.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum Cancellable {
    /// The operation may be cancelled.
    Yes,
    /// The operation cannot be cancelled.
    No,
}
200
201
/// State of an in-progress read or write.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum OpState {
    /// The operation is still waiting on the other half.
    Pending,
    /// The other half was dropped while this operation was in progress.
    Dropped,
}
206
207
/// Parameters recorded for an in-progress stream read.
#[derive(Default, Copy, Clone, Debug)]
struct StreamRead {
    /// Number of items the read was started with.
    count: u32,
}
211
212
/// Parameters recorded for an in-progress stream write.
#[derive(Default, Copy, Clone, Debug)]
struct StreamWrite {
    /// Item value associated with the write.
    item: u32,
    /// Number of items the write was started with.
    count: u32,
}
217
218
impl<T> HalfStates<T> {
219
fn is_empty(&self) -> bool {
220
self.handles.is_empty()
221
}
222
223
/// Adds a new handle `id` to this set.
224
fn insert(&mut self, id: u32, scope: Scope, transferrable: Transferrable) {
225
let prev = self
226
.handles
227
.insert(id, (scope, HalfState::Idle, transferrable));
228
assert!(prev.is_none());
229
assert!(self.droppable.insert(id));
230
if transferrable == Transferrable::Yes {
231
self.transferrable.insert(id);
232
}
233
}
234
235
/// Removes the handle `id` for closing.
236
fn remove(&mut self, id: u32) -> Scope {
237
let (scope, state, transferrable) = self.handles.swap_remove(&id).unwrap();
238
assert!(matches!(state, HalfState::Idle));
239
self.droppable.swap_remove(&id);
240
self.ready.swap_remove(&id);
241
if transferrable == Transferrable::Yes {
242
assert!(self.transferrable.swap_remove(&id));
243
}
244
scope
245
}
246
247
/// Locks `id` in whatever scope it's currently in for the rest of its
248
/// lifetime, preventing its transfer. This is used as a workaround for
249
/// #12090.
250
fn lock_in_place(&mut self, id: u32) {
251
let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap();
252
assert!(matches!(state, HalfState::Idle));
253
if matches!(transferrable, Transferrable::Yes) {
254
assert!(self.transferrable.swap_remove(&id));
255
*transferrable = Transferrable::No;
256
}
257
}
258
259
/// Starts an operation on the handle `id`.
260
fn start(&mut self, id: u32, cancellable: Cancellable, payload: T) {
261
let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap();
262
assert!(matches!(state, HalfState::Idle));
263
assert!(self.ready.swap_remove(&id));
264
self.droppable.swap_remove(&id);
265
*state = HalfState::InUse;
266
let prev = self.in_use.insert(id, (payload, OpState::Pending));
267
assert!(prev.is_none());
268
if *transferrable == Transferrable::Yes {
269
assert!(self.transferrable.swap_remove(&id));
270
}
271
if cancellable == Cancellable::Yes {
272
assert!(self.cancellable.insert(id));
273
}
274
}
275
276
/// Completes an operation on `id`, returning the state it was started with
277
/// along with whether it was dropped.
278
fn stop(&mut self, id: u32) -> (T, OpState) {
279
let (_scope, state, transferrable) = self.handles.get_mut(&id).unwrap();
280
assert!(matches!(state, HalfState::InUse));
281
*state = HalfState::Idle;
282
let dropped = self.in_use.swap_remove(&id).unwrap();
283
self.cancellable.swap_remove(&id);
284
if *transferrable == Transferrable::Yes {
285
assert!(self.transferrable.insert(id));
286
}
287
assert!(self.droppable.insert(id));
288
if dropped.1 != OpState::Dropped {
289
assert!(self.ready.insert(id));
290
} else {
291
self.lock_in_place(id);
292
}
293
dropped
294
}
295
296
/// Updates to `OpState::Dropped` for an operation-in-progress.
297
fn set_in_use_state_dropped(&mut self, id: u32) {
298
let (_, prev) = self.in_use.get_mut(&id).unwrap();
299
assert_eq!(*prev, OpState::Pending);
300
*prev = OpState::Dropped;
301
302
// This operation is now "cancellable" meaning that at any point in the
303
// future it can be resolved since the other end was dropped.
304
self.cancellable.insert(id);
305
}
306
}
307
308
impl State {
309
fn is_empty(&self) -> bool {
310
let State {
311
next_id: _,
312
async_pending,
313
deferred,
314
futures,
315
streams,
316
} = self;
317
async_pending.is_empty() && deferred.is_empty() && futures.is_empty() && streams.is_empty()
318
}
319
320
fn generate(
321
&mut self,
322
u: &mut Unstructured<'_>,
323
finish: bool,
324
commands: &mut Vec<(Scope, Command)>,
325
) -> arbitrary::Result<()> {
326
let mut choices = Vec::new();
327
328
// If we're not finishing up then have the possibility of
329
// immediately-ready sync/async calls and such sort of miscellaneous
330
// work.
331
if !finish {
332
choices.push(Choice::SyncReadyCall);
333
choices.push(Choice::AsyncReadyCall);
334
choices.push(Choice::FutureNew);
335
choices.push(Choice::StreamNew);
336
}
337
338
// If we're not finishing up, and if we don't have too much pending
339
// work, then possibly make some more pending work.
340
if !finish && self.async_pending.len() < 20 {
341
choices.push(Choice::AsyncPendingCall);
342
}
343
344
// If there's pending work, possibly resolve something.
345
if self.async_pending.len() > 0 {
346
choices.push(Choice::AsyncPendingResolve);
347
}
348
349
// If something has been deferred to later, possibly add that command
350
// into the stream.
351
if self.deferred.len() > 0 {
352
choices.push(Choice::Deferred);
353
}
354
355
// Wrap up work with futures by dropping handles, writing, cancelling,
356
// etc.
357
if self.futures.readers.droppable.len() > 0 {
358
choices.push(Choice::FutureDropReadable);
359
}
360
if self.futures.writers.droppable.len() > 0 {
361
choices.push(Choice::FutureDropWritable);
362
}
363
if self.futures.writers.cancellable.len() > 0 {
364
choices.push(Choice::FutureCancelWrite);
365
}
366
if self.futures.readers.cancellable.len() > 0 {
367
choices.push(Choice::FutureCancelRead);
368
}
369
// If more work is allowed kick of reads/transfers.
370
if !finish {
371
if self.futures.writers.ready.len() > 0 {
372
choices.push(Choice::FutureWrite);
373
}
374
if self.futures.readers.ready.len() > 0 {
375
choices.push(Choice::FutureRead);
376
}
377
if self.futures.readers.transferrable.len() > 0 {
378
choices.push(Choice::FutureReaderTransfer);
379
}
380
}
381
382
// Streams can be dropped at any time and their pending operations can
383
// be ceased at any time.
384
if self.streams.readers.droppable.len() > 0 {
385
choices.push(Choice::StreamDropReadable);
386
}
387
if self.streams.writers.droppable.len() > 0 {
388
choices.push(Choice::StreamDropWritable);
389
}
390
if self.streams.readers.cancellable.len() > 0 {
391
choices.push(Choice::StreamEndRead);
392
}
393
if self.streams.writers.cancellable.len() > 0 {
394
choices.push(Choice::StreamEndWrite);
395
}
396
// If more work is allowed then streams can be moved around and new
397
// reads/writes may be started.
398
if !finish {
399
if self.streams.readers.transferrable.len() > 0 {
400
choices.push(Choice::StreamReaderTransfer);
401
}
402
if self.streams.readers.ready.len() > 0 {
403
choices.push(Choice::StreamRead);
404
}
405
if self.streams.writers.ready.len() > 0 {
406
choices.push(Choice::StreamWrite);
407
}
408
}
409
410
#[derive(Debug)]
411
enum Choice {
412
SyncReadyCall,
413
AsyncReadyCall,
414
AsyncPendingCall,
415
AsyncPendingResolve,
416
Deferred,
417
418
FutureNew,
419
FutureReaderTransfer,
420
FutureRead,
421
FutureWrite,
422
FutureCancelRead,
423
FutureCancelWrite,
424
FutureDropReadable,
425
FutureDropWritable,
426
427
StreamNew,
428
StreamReaderTransfer,
429
StreamDropReadable,
430
StreamDropWritable,
431
StreamRead,
432
StreamWrite,
433
StreamEndRead,
434
StreamEndWrite,
435
}
436
437
match u.choose(&choices)? {
438
Choice::SyncReadyCall => {
439
let caller = *u.choose(Scope::CALLERS)?;
440
commands.push((caller, Command::SyncReadyCall));
441
}
442
Choice::AsyncReadyCall => {
443
let caller = *u.choose(Scope::CALLERS)?;
444
commands.push((caller, Command::AsyncReadyCall));
445
}
446
447
Choice::AsyncPendingCall => {
448
let caller = *u.choose(Scope::CALLERS)?;
449
let id = self.next_id();
450
self.async_pending.push((caller, id));
451
commands.push((caller, Command::AsyncPendingImportCall(id)));
452
}
453
454
Choice::AsyncPendingResolve => {
455
let index = u.int_in_range(0..=self.async_pending.len() - 1)?;
456
let (caller, id) = self.async_pending.swap_remove(index);
457
let callee = caller.callee().unwrap();
458
459
// FIXME(#11833) the host can't cancel calls at this time, so
460
// they can only be completed. Everything else though is
461
// guest-initiated which means that the call can be either
462
// completed or cancelled.
463
let complete = caller == Scope::HostCaller || u.arbitrary()?;
464
465
if complete {
466
commands.push((callee, Command::AsyncPendingExportComplete(id)));
467
self.deferred
468
.push((caller, Command::AsyncPendingImportAssertReady(id)));
469
} else {
470
commands.push((caller, Command::AsyncPendingImportCancel(id)));
471
self.deferred
472
.push((callee, Command::AsyncPendingExportAssertCancelled(id)));
473
}
474
}
475
476
Choice::Deferred => {
477
let index = u.int_in_range(0..=self.deferred.len() - 1)?;
478
let (scope, cmd) = self.deferred.swap_remove(index);
479
commands.push((scope, cmd));
480
}
481
482
Choice::FutureNew => {
483
let scope = *u.choose(Scope::ALL)?;
484
let id = self.next_id();
485
commands.push((scope, Command::FutureNew(id)));
486
self.futures.readers.insert(id, scope, Transferrable::Yes);
487
self.futures.writers.insert(id, scope, Transferrable::No);
488
489
// Future writers cannot be dropped without writing.
490
assert!(self.futures.writers.droppable.swap_remove(&id));
491
}
492
Choice::FutureReaderTransfer => {
493
let set = &mut self.futures.readers.transferrable;
494
let i = u.int_in_range(0..=set.len() - 1)?;
495
let id = *set.get_index(i).unwrap();
496
let scope = &mut self.futures.readers.handles[&id].0;
497
498
enum Action {
499
CallerTake(Scope),
500
GiveCallee(Scope),
501
}
502
503
let action = match (scope.caller(), scope.callee()) {
504
(Some(caller), None) => Action::CallerTake(caller),
505
(None, Some(callee)) => Action::GiveCallee(callee),
506
(Some(caller), Some(callee)) => {
507
if u.arbitrary()? {
508
Action::CallerTake(caller)
509
} else {
510
Action::GiveCallee(callee)
511
}
512
}
513
(None, None) => unreachable!(),
514
};
515
match action {
516
Action::CallerTake(caller) => {
517
commands.push((caller, Command::FutureTake(id)));
518
*scope = caller;
519
}
520
Action::GiveCallee(callee) => {
521
commands.push((*scope, Command::FutureGive(id)));
522
*scope = callee;
523
}
524
}
525
526
// See what scope the reader/writer half are in. Allow
527
// operations if they're in different scopes, but disallow
528
// operations if they're in the same scope.
529
let reader_scope = Some(*scope);
530
let writer_scope = self.futures.writers.handles.get(&id).map(|p| p.0);
531
if reader_scope == writer_scope {
532
self.futures.readers.ready.swap_remove(&id);
533
self.futures.writers.ready.swap_remove(&id);
534
} else {
535
self.futures.readers.ready.insert(id);
536
if writer_scope.is_some() && !self.futures.writers.in_use.contains_key(&id) {
537
self.futures.writers.ready.insert(id);
538
}
539
}
540
}
541
Choice::FutureRead => {
542
let set = &self.futures.readers.ready;
543
let i = u.int_in_range(0..=set.len() - 1)?;
544
let id = *set.get_index(i).unwrap();
545
let scope = self.futures.readers.handles[&id].0;
546
547
if let Some((item, _)) = self.futures.writers.in_use.get(&id) {
548
// If the future has an active write, then this should
549
// complete with that write. The write is then resolved and
550
// the future reader/writer are both gone.
551
let item = *item;
552
commands.push((
553
scope,
554
Command::FutureReadReady(FuturePayload { future: id, item }),
555
));
556
let write_scope = self.futures.writers.handles[&id].0;
557
commands.push((write_scope, Command::FutureWriteAssertComplete(id)));
558
559
self.futures.writers.stop(id);
560
self.futures.readers.remove(id);
561
self.futures.writers.remove(id);
562
} else {
563
// If the write-end is idle, then this should be a pending
564
// future read.
565
//
566
// FIXME(#12090) host reads cannot be cancelled
567
let cancellable = if scope.is_host() {
568
Cancellable::No
569
} else {
570
Cancellable::Yes
571
};
572
self.futures.readers.start(id, cancellable, ());
573
commands.push((scope, Command::FutureReadPending(id)));
574
}
575
}
576
Choice::FutureWrite => {
577
let set = &self.futures.writers.ready;
578
let i = u.int_in_range(0..=set.len() - 1)?;
579
let id = *set.get_index(i).unwrap();
580
let scope = self.futures.writers.handles[&id].0;
581
let item = self.next_id();
582
let payload = FuturePayload { future: id, item };
583
584
if !self.futures.readers.handles.contains_key(&id) {
585
// If the reader is gone then this write should complete
586
// immediately with "dropped" and furthermore the writer
587
// should now be removed.
588
commands.push((scope, Command::FutureWriteDropped(id)));
589
self.futures.writers.remove(id);
590
} else if self.futures.readers.in_use.contains_key(&id) {
591
// If the reader is in-progress then this should complete
592
// the read/write pair. The reader/writer are both removed
593
// as a result.
594
commands.push((scope, Command::FutureWriteReady(payload)));
595
let read_scope = self.futures.readers.handles[&id].0;
596
commands.push((read_scope, Command::FutureReadAssertComplete(payload)));
597
self.futures.readers.stop(id);
598
self.futures.readers.remove(id);
599
self.futures.writers.remove(id);
600
} else {
601
// If the read-end is idle, then this should be a pending
602
// future read.
603
self.futures.writers.start(id, Cancellable::Yes, item);
604
commands.push((scope, Command::FutureWritePending(payload)));
605
}
606
}
607
Choice::FutureCancelWrite => {
608
let set = &self.futures.writers.cancellable;
609
let i = u.int_in_range(0..=set.len() - 1)?;
610
let id = *set.get_index(i).unwrap();
611
let scope = self.futures.writers.handles[&id].0;
612
613
let (_write, state) = self.futures.writers.stop(id);
614
match state {
615
OpState::Pending => {
616
commands.push((scope, Command::FutureCancelWrite(id)));
617
assert!(self.futures.writers.droppable.swap_remove(&id));
618
}
619
OpState::Dropped => {
620
commands.push((scope, Command::FutureWriteAssertDropped(id)));
621
self.futures.writers.remove(id);
622
}
623
}
624
}
625
Choice::FutureCancelRead => {
626
let set = &self.futures.readers.cancellable;
627
let i = u.int_in_range(0..=set.len() - 1)?;
628
let id = *set.get_index(i).unwrap();
629
let scope = self.futures.readers.handles[&id].0;
630
631
let (_read, state) = self.futures.readers.stop(id);
632
match state {
633
OpState::Pending => {
634
commands.push((scope, Command::FutureCancelRead(id)));
635
}
636
// Writers cannot be dropped with futures, so this is not
637
// reachable.
638
OpState::Dropped => unreachable!(),
639
}
640
}
641
Choice::FutureDropReadable => {
642
let set = &self.futures.readers.droppable;
643
let i = u.int_in_range(0..=set.len() - 1)?;
644
let id = *set.get_index(i).unwrap();
645
let scope = self.futures.readers.remove(id);
646
commands.push((scope, Command::FutureDropReadable(id)));
647
648
// If the writer is active then its write is now destined to
649
// finish with "dropped", and otherwise the writer is also now
650
// droppable since the reader handle is gone.
651
if self.futures.writers.in_use.contains_key(&id) {
652
self.futures.writers.set_in_use_state_dropped(id);
653
} else {
654
assert!(self.futures.writers.droppable.insert(id));
655
}
656
}
657
Choice::FutureDropWritable => {
658
let set = &self.futures.writers.droppable;
659
let i = u.int_in_range(0..=set.len() - 1)?;
660
let id = *set.get_index(i).unwrap();
661
let scope = self.futures.writers.remove(id);
662
663
// Writers can't actually be dropped prior to writing so fake
664
// a write by writing a value and asserting that the result is
665
// "dropped".
666
commands.push((scope, Command::FutureWriteDropped(id)));
667
668
assert!(!self.futures.readers.handles.contains_key(&id));
669
}
670
671
Choice::StreamNew => {
672
let scope = *u.choose(Scope::ALL)?;
673
let id = self.next_id();
674
commands.push((scope, Command::StreamNew(id)));
675
self.streams.readers.insert(id, scope, Transferrable::Yes);
676
self.streams.writers.insert(id, scope, Transferrable::No);
677
}
678
Choice::StreamReaderTransfer => {
679
let set = &mut self.streams.readers.transferrable;
680
let i = u.int_in_range(0..=set.len() - 1)?;
681
let id = *set.get_index(i).unwrap();
682
let scope = &mut self.streams.readers.handles[&id].0;
683
684
enum Action {
685
CallerTake(Scope),
686
GiveCallee(Scope),
687
}
688
689
let action = match (scope.caller(), scope.callee()) {
690
(Some(caller), None) => Action::CallerTake(caller),
691
(None, Some(callee)) => Action::GiveCallee(callee),
692
(Some(caller), Some(callee)) => {
693
if u.arbitrary()? {
694
Action::CallerTake(caller)
695
} else {
696
Action::GiveCallee(callee)
697
}
698
}
699
(None, None) => unreachable!(),
700
};
701
match action {
702
Action::CallerTake(caller) => {
703
commands.push((caller, Command::StreamTake(id)));
704
*scope = caller;
705
}
706
Action::GiveCallee(callee) => {
707
commands.push((*scope, Command::StreamGive(id)));
708
*scope = callee;
709
}
710
}
711
712
// See what scope the reader/writer half are in. Allow
713
// operations if they're in different scopes, but disallow
714
// operations if they're in the same scope.
715
//
716
// Note that host<->host reads/writes for streams aren't fuzzed
717
// at this time so that's also explicitly disallowed.
718
let reader_scope = Some(*scope);
719
let writer_scope = self.streams.writers.handles.get(&id).map(|p| p.0);
720
if reader_scope == writer_scope
721
|| reader_scope.is_some_and(|s| s.is_host())
722
== writer_scope.is_some_and(|s| s.is_host())
723
{
724
self.streams.readers.ready.swap_remove(&id);
725
self.streams.writers.ready.swap_remove(&id);
726
} else {
727
self.streams.readers.ready.insert(id);
728
if writer_scope.is_some() && !self.streams.writers.in_use.contains_key(&id) {
729
self.streams.writers.ready.insert(id);
730
}
731
}
732
}
733
Choice::StreamDropReadable => {
734
let set = &self.streams.readers.droppable;
735
let i = u.int_in_range(0..=set.len() - 1)?;
736
let id = *set.get_index(i).unwrap();
737
let scope = self.streams.readers.remove(id);
738
commands.push((scope, Command::StreamDropReadable(id)));
739
740
if self.streams.writers.in_use.contains_key(&id) {
741
self.streams.writers.set_in_use_state_dropped(id);
742
}
743
}
744
Choice::StreamDropWritable => {
745
let set = &self.streams.writers.droppable;
746
let i = u.int_in_range(0..=set.len() - 1)?;
747
let id = *set.get_index(i).unwrap();
748
let scope = self.streams.writers.remove(id);
749
commands.push((scope, Command::StreamDropWritable(id)));
750
751
if self.streams.readers.in_use.contains_key(&id) {
752
self.streams.readers.set_in_use_state_dropped(id);
753
}
754
}
755
Choice::StreamRead => {
756
let set = &self.streams.readers.ready;
757
let i = u.int_in_range(0..=set.len() - 1)?;
758
let id = *set.get_index(i).unwrap();
759
let scope = self.streams.readers.handles[&id].0;
760
let count = u.int_in_range(0..=MAX_STREAM_COUNT)?;
761
762
// FIXME(#12090)
763
if scope.is_host() {
764
self.streams.readers.lock_in_place(id);
765
}
766
767
if !self.streams.writers.handles.contains_key(&id) {
768
// If the write handle is dropped, then this should
769
// immediately report as such.
770
commands.push((
771
scope,
772
Command::StreamReadDropped(StreamReadPayload { stream: id, count }),
773
));
774
// Can't read from this stream again, so it's not ready,
775
// and then we also can't lift/lower it any more so it's
776
// locked in place.
777
assert!(self.streams.readers.ready.swap_remove(&id));
778
self.streams.readers.lock_in_place(id);
779
} else if self.streams.writers.in_use.contains_key(&id) {
780
// If the write handle is active then this read should
781
// complete immediately.
782
let write_count = self.streams.writers.in_use[&id].0.count;
783
let write_scope = self.streams.writers.handles[&id].0;
784
let min = count.min(write_count);
785
786
match (count, write_count) {
787
// Two zero-length operations rendezvousing will leave
788
// the reader blocked but the writer should wake up. A
789
// nonzero-length read and a 0-length write performs
790
// the same way too.
791
(0, 0) | (1.., 0) => {
792
self.streams
793
.readers
794
.start(id, Cancellable::Yes, StreamRead { count });
795
commands.push((
796
scope,
797
Command::StreamReadPending(StreamReadPayload { stream: id, count }),
798
));
799
self.streams.writers.stop(id);
800
commands.push((
801
write_scope,
802
Command::StreamWriteAssertComplete(StreamReadPayload {
803
stream: id,
804
count: min,
805
}),
806
));
807
}
808
809
// A zero-length read with a nonzero-length-write
810
// should wake up just the reader and do nothing to the
811
// writer.
812
(0, 1..) => {
813
commands.push((
814
scope,
815
Command::StreamReadReady(StreamReadyPayload {
816
stream: id,
817
item: 0,
818
ready_count: min,
819
op_count: count,
820
}),
821
));
822
}
823
824
// With two nonzero lengths both operations should complete.
825
(1.., 1..) => {
826
let (write, _) = self.streams.writers.stop(id);
827
commands.push((
828
scope,
829
Command::StreamReadReady(StreamReadyPayload {
830
stream: id,
831
item: write.item,
832
ready_count: min,
833
op_count: count,
834
}),
835
));
836
commands.push((
837
write_scope,
838
Command::StreamWriteAssertComplete(StreamReadPayload {
839
stream: id,
840
count: min,
841
}),
842
));
843
}
844
}
845
} else {
846
// If the write handle is not active then this should be in
847
// a pending state now.
848
self.streams
849
.readers
850
.start(id, Cancellable::Yes, StreamRead { count });
851
commands.push((
852
scope,
853
Command::StreamReadPending(StreamReadPayload { stream: id, count }),
854
));
855
}
856
}
857
Choice::StreamWrite => {
858
let set = &self.streams.writers.ready;
859
let i = u.int_in_range(0..=set.len() - 1)?;
860
let id = *set.get_index(i).unwrap();
861
let scope = self.streams.writers.handles[&id].0;
862
let item = self.next_id();
863
let count = u.int_in_range(0..=MAX_STREAM_COUNT)?;
864
865
// FIXME(#12090)
866
if scope.is_host() {
867
self.streams.writers.lock_in_place(id);
868
}
869
870
if !self.streams.readers.handles.contains_key(&id) {
871
// If the read handle is dropped, then this should
872
// immediately report as such.
873
commands.push((
874
scope,
875
Command::StreamWriteDropped(StreamWritePayload {
876
stream: id,
877
item,
878
count,
879
}),
880
));
881
// Cannot write ever again to this handle so remove it from
882
// the writable set.
883
assert!(self.streams.writers.ready.swap_remove(&id));
884
} else if self.streams.readers.in_use.contains_key(&id) {
885
// If the read handle is active then this write should
886
// complete immediately.
887
let read_count = self.streams.readers.in_use[&id].0.count;
888
let read_scope = self.streams.readers.handles[&id].0;
889
let min = count.min(read_count);
890
891
match (read_count, count) {
892
// A zero-length write, no matter what the read half is
893
// pending as, is always ready and doesn't affect the
894
// reader.
895
(_, 0) => {
896
commands.push((
897
scope,
898
Command::StreamWriteReady(StreamReadyPayload {
899
stream: id,
900
item,
901
op_count: count,
902
ready_count: min,
903
}),
904
));
905
}
906
907
// With a zero-length read and a nonzero-length write
908
// the writer is blocked but the reader is unblocked.
909
(0, 1..) => {
910
self.streams.writers.start(
911
id,
912
Cancellable::Yes,
913
StreamWrite { item, count },
914
);
915
commands.push((
916
scope,
917
Command::StreamWritePending(StreamWritePayload {
918
stream: id,
919
item,
920
count,
921
}),
922
));
923
self.streams.readers.stop(id);
924
commands.push((
925
read_scope,
926
Command::StreamReadAssertComplete(StreamWritePayload {
927
stream: id,
928
item,
929
count: min,
930
}),
931
));
932
}
933
934
// Nonzero sizes means that the write immediately
935
// finishes and the read is also now ready to complete.
936
(1.., 1..) => {
937
commands.push((
938
scope,
939
Command::StreamWriteReady(StreamReadyPayload {
940
stream: id,
941
item,
942
op_count: count,
943
ready_count: min,
944
}),
945
));
946
self.streams.readers.stop(id);
947
commands.push((
948
read_scope,
949
Command::StreamReadAssertComplete(StreamWritePayload {
950
stream: id,
951
item,
952
count: min,
953
}),
954
));
955
}
956
}
957
} else {
958
// If the read handle is not active then this should be in
959
// a pending state now.
960
self.streams
961
.writers
962
.start(id, Cancellable::Yes, StreamWrite { item, count });
963
commands.push((
964
scope,
965
Command::StreamWritePending(StreamWritePayload {
966
stream: id,
967
item,
968
count,
969
}),
970
));
971
}
972
}
973
Choice::StreamEndRead => {
974
let set = &self.streams.readers.cancellable;
975
let i = u.int_in_range(0..=set.len() - 1)?;
976
let id = *set.get_index(i).unwrap();
977
let scope = self.streams.readers.handles[&id].0;
978
979
let (_read, state) = self.streams.readers.stop(id);
980
match state {
981
OpState::Pending => {
982
commands.push((scope, Command::StreamCancelRead(id)));
983
}
984
OpState::Dropped => {
985
commands.push((scope, Command::StreamReadAssertDropped(id)));
986
}
987
}
988
}
989
Choice::StreamEndWrite => {
990
let set = &self.streams.writers.cancellable;
991
let i = u.int_in_range(0..=set.len() - 1)?;
992
let id = *set.get_index(i).unwrap();
993
let scope = self.streams.writers.handles[&id].0;
994
995
let (_write, state) = self.streams.writers.stop(id);
996
match state {
997
OpState::Pending => {
998
commands.push((scope, Command::StreamCancelWrite(id)));
999
}
1000
OpState::Dropped => {
1001
commands.push((
1002
scope,
1003
Command::StreamWriteAssertDropped(StreamReadPayload {
1004
stream: id,
1005
count: 0,
1006
}),
1007
));
1008
}
1009
}
1010
}
1011
}
1012
Ok(())
1013
}
1014
1015
fn next_id(&mut self) -> u32 {
1016
let id = self.next_id;
1017
self.next_id += 1;
1018
id
1019
}
1020
}
1021
1022