// bytecodealliance/wasmtime: crates/wasi/src/p2/pipe.rs
//! Virtual pipes.
//!
//! These types provide easy implementations of `WasiFile` that mimic much of the behavior of Unix
//! pipes. These are particularly helpful for redirecting WASI stdio handles to destinations other
//! than OS files.
//!
//! Some convenience constructors are included for common backing types like `Vec<u8>` and `String`,
//! but the virtual pipes can be instantiated with any `Read` or `Write` type.
//!
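//! A minimal usage sketch of the in-memory pipes defined below (the trait methods come from
//! `wasmtime_wasi_io::streams`; the exact import path for the pipe types is an assumption and
//! may differ from how the crate re-exports them):
//!
//! ```ignore
//! use wasmtime_wasi_io::streams::{InputStream, OutputStream};
//! // Assumed path; these types are defined in this module.
//! use wasmtime_wasi::p2::pipe::{MemoryInputPipe, MemoryOutputPipe};
//!
//! let mut stdin = MemoryInputPipe::new("hello, world");
//! let mut stdout = MemoryOutputPipe::new(4096);
//!
//! // Reads drain the shared buffer; once it is empty, `read` returns `StreamError::Closed`.
//! let chunk = stdin.read(5).unwrap();
//! assert_eq!(&chunk[..], b"hello");
//!
//! // Writes append to the buffer as long as they stay within the configured capacity.
//! stdout.write(chunk).unwrap();
//! assert_eq!(&stdout.contents()[..], b"hello");
//! ```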
use anyhow::anyhow;
use bytes::Bytes;
use std::pin::{Pin, pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::io::{self, AsyncRead, AsyncWrite};
use tokio::sync::mpsc;
use wasmtime_wasi_io::{
    poll::Pollable,
    streams::{InputStream, OutputStream, StreamError},
};

pub use crate::p2::write_stream::AsyncWriteStream;
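
// `AsyncWriteStream` (re-exported above from `write_stream`) adapts a `tokio::io::AsyncWrite`
// into an `OutputStream` with a fixed write budget. A minimal sketch of the call pattern, as
// exercised by the tests at the bottom of this file (an async runtime context is assumed):
//
//     let mut writer = AsyncWriteStream::new(1024, tokio::io::sink());
//     let budget = writer.write_ready().await?; // bytes accepted without backpressure
//     writer.write(Bytes::from_static(b"hello"))?;
//     writer.flush()?;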

#[derive(Debug, Clone)]
pub struct MemoryInputPipe {
    buffer: Arc<Mutex<Bytes>>,
}

impl MemoryInputPipe {
    pub fn new(bytes: impl Into<Bytes>) -> Self {
        Self {
            buffer: Arc::new(Mutex::new(bytes.into())),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.buffer.lock().unwrap().is_empty()
    }
}

#[async_trait::async_trait]
impl InputStream for MemoryInputPipe {
    fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
        let mut buffer = self.buffer.lock().unwrap();
        if buffer.is_empty() {
            return Err(StreamError::Closed);
        }

        let size = size.min(buffer.len());
        let read = buffer.split_to(size);
        Ok(read)
    }
}

#[async_trait::async_trait]
impl Pollable for MemoryInputPipe {
    async fn ready(&mut self) {}
}

impl AsyncRead for MemoryInputPipe {
    fn poll_read(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let mut buffer = self.buffer.lock().unwrap();
        let size = buf.remaining().min(buffer.len());
        let read = buffer.split_to(size);
        buf.put_slice(&read);
        Poll::Ready(Ok(()))
    }
}

#[derive(Debug, Clone)]
pub struct MemoryOutputPipe {
    capacity: usize,
    buffer: Arc<Mutex<bytes::BytesMut>>,
}

impl MemoryOutputPipe {
    pub fn new(capacity: usize) -> Self {
        MemoryOutputPipe {
            capacity,
            buffer: std::sync::Arc::new(std::sync::Mutex::new(bytes::BytesMut::new())),
        }
    }

    pub fn contents(&self) -> bytes::Bytes {
        self.buffer.lock().unwrap().clone().freeze()
    }

    pub fn try_into_inner(self) -> Option<bytes::BytesMut> {
        std::sync::Arc::into_inner(self.buffer).map(|m| m.into_inner().unwrap())
    }
}
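
// A minimal sketch of the capacity contract exercised by the `OutputStream` impl below, using
// only items defined in this file: `check_write` reports the remaining budget, writes that fit
// are buffered, and a write past the budget traps.
//
//     let mut pipe = MemoryOutputPipe::new(8);
//     assert_eq!(pipe.check_write().unwrap(), 8);
//     pipe.write(Bytes::from_static(b"12345678")).unwrap();
//     assert!(matches!(pipe.check_write(), Err(StreamError::Closed)));
//     assert!(pipe.write(Bytes::from_static(b"!")).is_err());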

#[async_trait::async_trait]
impl OutputStream for MemoryOutputPipe {
    fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
        let mut buf = self.buffer.lock().unwrap();
        if bytes.len() > self.capacity - buf.len() {
            return Err(StreamError::Trap(anyhow!(
                "write beyond capacity of MemoryOutputPipe"
            )));
        }
        buf.extend_from_slice(bytes.as_ref());
        // Always ready for writing
        Ok(())
    }
    fn flush(&mut self) -> Result<(), StreamError> {
        // This stream is always flushed
        Ok(())
    }
    fn check_write(&mut self) -> Result<usize, StreamError> {
        let consumed = self.buffer.lock().unwrap().len();
        if consumed < self.capacity {
            Ok(self.capacity - consumed)
        } else {
            // Since the buffer is full, no more bytes will ever be written
            Err(StreamError::Closed)
        }
    }
}

#[async_trait::async_trait]
impl Pollable for MemoryOutputPipe {
    async fn ready(&mut self) {}
}

impl AsyncWrite for MemoryOutputPipe {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let mut buffer = self.buffer.lock().unwrap();
        let amt = buf.len().min(self.capacity - buffer.len());
        buffer.extend_from_slice(&buf[..amt]);
        Poll::Ready(Ok(amt))
    }
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

/// Provides an [`InputStream`] impl from a [`tokio::io::AsyncRead`] impl
pub struct AsyncReadStream {
    closed: bool,
    buffer: Option<Result<Bytes, StreamError>>,
    receiver: mpsc::Receiver<Result<Bytes, StreamError>>,
    join_handle: Option<crate::runtime::AbortOnDropJoinHandle<()>>,
}

impl AsyncReadStream {
    /// Create an [`AsyncReadStream`]. In order to use the [`InputStream`] impl
    /// provided by this struct, the argument must impl [`tokio::io::AsyncRead`].
    pub fn new<T: AsyncRead + Send + 'static>(reader: T) -> Self {
        let (sender, receiver) = mpsc::channel(1);
        let join_handle = crate::runtime::spawn(async move {
            let mut reader = pin!(reader);
            loop {
                use tokio::io::AsyncReadExt;
                let mut buf = bytes::BytesMut::with_capacity(4096);
                let sent = match reader.read_buf(&mut buf).await {
                    Ok(nbytes) if nbytes == 0 => sender.send(Err(StreamError::Closed)).await,
                    Ok(_) => sender.send(Ok(buf.freeze())).await,
                    Err(e) => {
                        sender
                            .send(Err(StreamError::LastOperationFailed(e.into())))
                            .await
                    }
                };
                if sent.is_err() {
                    // no more receiver - stop trying to read
                    break;
                }
            }
        });
        AsyncReadStream {
            closed: false,
            buffer: None,
            receiver,
            join_handle: Some(join_handle),
        }
    }
    pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if self.buffer.is_some() || self.closed {
            return Poll::Ready(());
        }
        match self.receiver.poll_recv(cx) {
            Poll::Ready(Some(res)) => {
                self.buffer = Some(res);
                Poll::Ready(())
            }
            Poll::Ready(None) => {
                panic!("no more sender for an open AsyncReadStream - should be impossible")
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
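
// A minimal sketch of driving an `AsyncReadStream` through its `Pollable`/`InputStream` impls
// below (an async context and the imports at the top of this file are assumed):
//
//     let mut stream = AsyncReadStream::new(tokio::io::empty());
//     stream.ready().await; // wait for the background task to produce a result
//     match stream.read(4096) {
//         Ok(bytes) => { /* `bytes` may be empty if nothing has arrived yet */ }
//         Err(StreamError::Closed) => { /* the underlying reader reached EOF */ }
//         Err(e) => { /* LastOperationFailed or Trap */ }
//     }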

#[async_trait::async_trait]
impl InputStream for AsyncReadStream {
    fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {
        use mpsc::error::TryRecvError;

        match self.buffer.take() {
            Some(Ok(mut bytes)) => {
                // TODO: de-duplicate the buffer management with the case below
                let len = bytes.len().min(size);
                let rest = bytes.split_off(len);
                if !rest.is_empty() {
                    self.buffer = Some(Ok(rest));
                }
                return Ok(bytes);
            }
            Some(Err(e)) => {
                self.closed = true;
                return Err(e);
            }
            None => {}
        }

        match self.receiver.try_recv() {
            Ok(Ok(mut bytes)) => {
                let len = bytes.len().min(size);
                let rest = bytes.split_off(len);
                if !rest.is_empty() {
                    self.buffer = Some(Ok(rest));
                }

                Ok(bytes)
            }
            Ok(Err(e)) => {
                self.closed = true;
                Err(e)
            }
            Err(TryRecvError::Empty) => Ok(Bytes::new()),
            Err(TryRecvError::Disconnected) => Err(StreamError::Trap(anyhow!(
                "AsyncReadStream sender died - should be impossible"
            ))),
        }
    }

    async fn cancel(&mut self) {
        match self.join_handle.take() {
            Some(task) => _ = task.cancel().await,
            None => {}
        }
    }
}

#[async_trait::async_trait]
impl Pollable for AsyncReadStream {
    async fn ready(&mut self) {
        std::future::poll_fn(|cx| self.poll_ready(cx)).await
    }
}
263
264
/// An output stream that consumes all input written to it, and is always ready.
265
#[derive(Copy, Clone)]
266
pub struct SinkOutputStream;
267
268
#[async_trait::async_trait]
269
impl OutputStream for SinkOutputStream {
270
fn write(&mut self, _buf: Bytes) -> Result<(), StreamError> {
271
Ok(())
272
}
273
fn flush(&mut self) -> Result<(), StreamError> {
274
// This stream is always flushed
275
Ok(())
276
}
277
278
fn check_write(&mut self) -> Result<usize, StreamError> {
279
// This stream is always ready for writing.
280
Ok(usize::MAX)
281
}
282
}
283
284
#[async_trait::async_trait]
285
impl Pollable for SinkOutputStream {
286
async fn ready(&mut self) {}
287
}
288
289
/// A stream that is ready immediately, but will always report that it's closed.
290
#[derive(Copy, Clone)]
291
pub struct ClosedInputStream;
292
293
#[async_trait::async_trait]
294
impl InputStream for ClosedInputStream {
295
fn read(&mut self, _size: usize) -> Result<Bytes, StreamError> {
296
Err(StreamError::Closed)
297
}
298
}
299
300
#[async_trait::async_trait]
301
impl Pollable for ClosedInputStream {
302
async fn ready(&mut self) {}
303
}
304
305
/// An output stream that is always closed.
306
#[derive(Copy, Clone)]
307
pub struct ClosedOutputStream;
308
309
#[async_trait::async_trait]
310
impl OutputStream for ClosedOutputStream {
311
fn write(&mut self, _: Bytes) -> Result<(), StreamError> {
312
Err(StreamError::Closed)
313
}
314
fn flush(&mut self) -> Result<(), StreamError> {
315
Err(StreamError::Closed)
316
}
317
318
fn check_write(&mut self) -> Result<usize, StreamError> {
319
Err(StreamError::Closed)
320
}
321
}
322
323
#[async_trait::async_trait]
324
impl Pollable for ClosedOutputStream {
325
async fn ready(&mut self) {}
326
}

#[cfg(test)]
mod test {
    use super::*;
    use std::time::Duration;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

    // This is a gross way to handle CI running under qemu for non-x86 architectures.
    #[cfg(not(target_arch = "x86_64"))]
    const TEST_ITERATIONS: usize = 10;

    #[cfg(target_arch = "x86_64")]
    const TEST_ITERATIONS: usize = 100;

    async fn resolves_immediately<F, O>(fut: F) -> O
    where
        F: futures::Future<Output = O>,
    {
        // The input `fut` should resolve immediately, but in case it
        // accidentally doesn't, don't hang the test indefinitely. Provide a
        // generous timeout to account for CI sensitivity and various systems.
        tokio::time::timeout(Duration::from_secs(2), fut)
            .await
            .expect("operation timed out")
    }

    async fn never_resolves<F: futures::Future>(fut: F) {
        // The input `fut` should never resolve, so only give it a small window
        // of budget before we time out. If `fut` is actually resolved this
        // should show up as a flaky test.
        tokio::time::timeout(Duration::from_millis(10), fut)
            .await
            .err()
            .expect("operation should time out");
    }

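    // Helper: builds a unidirectional in-memory pipe out of `tokio::io::duplex` by discarding
    // the read half of one end and the write half of the other, leaving a single reader/writer
    // pair backed by a buffer of `size` bytes.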
    pub fn simplex(size: usize) -> (impl AsyncRead, impl AsyncWrite) {
        let (a, b) = tokio::io::duplex(size);
        let (_read_half, write_half) = tokio::io::split(a);
        let (read_half, _write_half) = tokio::io::split(b);
        (read_half, write_half)
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn empty_read_stream() {
        let mut reader = AsyncReadStream::new(tokio::io::empty());

        // In a multi-threaded context, the value of state is not deterministic -- the spawned
        // reader task may run on a different thread.
        match reader.read(10) {
            // The reader task ran before we tried to read, and noticed that the input was empty.
            Err(StreamError::Closed) => {}

            // The reader task hasn't run yet. Call `ready` to await and fill the buffer.
            Ok(bs) => {
                assert!(bs.is_empty());
                resolves_immediately(reader.ready()).await;
                assert!(matches!(reader.read(0), Err(StreamError::Closed)));
            }
            res => panic!("unexpected: {res:?}"),
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn infinite_read_stream() {
        let mut reader = AsyncReadStream::new(tokio::io::repeat(0));

        let bs = reader.read(10).unwrap();
        if bs.is_empty() {
            // Reader task hasn't run yet. Call `ready` to await and fill the buffer.
            resolves_immediately(reader.ready()).await;
            // Now a read should succeed
            let bs = reader.read(10).unwrap();
            assert_eq!(bs.len(), 10);
        } else {
            assert_eq!(bs.len(), 10);
        }

        // Subsequent reads should succeed
        let bs = reader.read(10).unwrap();
        assert_eq!(bs.len(), 10);

        // Even 0-length reads should succeed and show it's open
        let bs = reader.read(0).unwrap();
        assert_eq!(bs.len(), 0);
    }

    async fn finite_async_reader(contents: &[u8]) -> impl AsyncRead + Send + 'static + use<> {
        let (r, mut w) = simplex(contents.len());
        w.write_all(contents).await.unwrap();
        r
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn finite_read_stream() {
        let mut reader = AsyncReadStream::new(finite_async_reader(&[1; 123]).await);

        let bs = reader.read(123).unwrap();
        if bs.is_empty() {
            // Reader task hasn't run yet. Call `ready` to await and fill the buffer.
            resolves_immediately(reader.ready()).await;
            // Now a read should succeed
            let bs = reader.read(123).unwrap();
            assert_eq!(bs.len(), 123);
        } else {
            assert_eq!(bs.len(), 123);
        }

        // The AsyncRead should be empty now, but we have a race where the reader task hasn't
        // yet sent that to the AsyncReadStream.
        match reader.read(0) {
            Err(StreamError::Closed) => {} // Correct!
            Ok(bs) => {
                assert!(bs.is_empty());
                // Need to await to give this side time to catch up
                resolves_immediately(reader.ready()).await;
                // Now a read should show closed
                assert!(matches!(reader.read(0), Err(StreamError::Closed)));
            }
            res => panic!("unexpected: {res:?}"),
        }
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    // Test that you can write items into the stream, and they get read out in the order they were
    // written, with the proper indications of readiness for reading:
    async fn multiple_chunks_read_stream() {
        let (r, mut w) = simplex(1024);
        let mut reader = AsyncReadStream::new(r);

        w.write_all(&[123]).await.unwrap();

        let bs = reader.read(1).unwrap();
        if bs.is_empty() {
            // Reader task hasn't run yet. Call `ready` to await and fill the buffer.
            resolves_immediately(reader.ready()).await;
            // Now a read should succeed
            let bs = reader.read(1).unwrap();
            assert_eq!(*bs, [123u8]);
        } else {
            assert_eq!(*bs, [123u8]);
        }

        // The stream should be empty and open now:
        let bs = reader.read(1).unwrap();
        assert!(bs.is_empty());

        // We can wait on readiness and it will time out:
        never_resolves(reader.ready()).await;

        // Still open and empty:
        let bs = reader.read(1).unwrap();
        assert!(bs.is_empty());

        // Put something else in the stream:
        w.write_all(&[45]).await.unwrap();

        // Wait readiness (yes we could possibly win the race and read it out faster, leaving that
        // out of the test for simplicity)
        resolves_immediately(reader.ready()).await;

        // read the something else back out:
        let bs = reader.read(1).unwrap();
        assert_eq!(*bs, [45u8]);

        // nothing else in there:
        let bs = reader.read(1).unwrap();
        assert!(bs.is_empty());

        // We can wait on readiness and it will time out:
        never_resolves(reader.ready()).await;

        // nothing else in there:
        let bs = reader.read(1).unwrap();
        assert!(bs.is_empty());

        // Now close the pipe:
        drop(w);

        // Wait readiness (yes we could possibly win the race and read it out faster, leaving that
        // out of the test for simplicity)
        resolves_immediately(reader.ready()).await;

        // empty and now closed:
        assert!(matches!(reader.read(1), Err(StreamError::Closed)));
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    // At the moment we are restricting AsyncReadStream from buffering more than 4k. This isn't a
    // suitable design for all applications, and we will probably make a knob or change the
    // behavior at some point, but this test shows the behavior as it is implemented:
    async fn backpressure_read_stream() {
        let (r, mut w) = simplex(16 * 1024); // Make sure this buffer isn't a bottleneck
        let mut reader = AsyncReadStream::new(r);

        let writer_task = tokio::task::spawn(async move {
            // Write twice as much as we can buffer up in an AsyncReadStream:
            w.write_all(&[123; 8192]).await.unwrap();
            w
        });

        resolves_immediately(reader.ready()).await;

        // Now we expect the reader task has sent 4k from the stream to the reader.
        // Try to read out one bigger than the buffer available:
        let bs = reader.read(4097).unwrap();
        assert_eq!(bs.len(), 4096);

        // Allow the crank to turn more:
        resolves_immediately(reader.ready()).await;

        // Again we expect the reader task has sent 4k from the stream to the reader.
        // Try to read out one bigger than the buffer available:
        let bs = reader.read(4097).unwrap();
        assert_eq!(bs.len(), 4096);

        // The writer task is now finished - join with it:
        let w = resolves_immediately(writer_task).await;

        // And close the pipe:
        drop(w);

        // Allow the crank to turn more:
        resolves_immediately(reader.ready()).await;

        // Now we expect the reader to be empty, and the stream closed:
        assert!(matches!(reader.read(4097), Err(StreamError::Closed)));
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn sink_write_stream() {
        let mut writer = AsyncWriteStream::new(2048, tokio::io::sink());
        let chunk = Bytes::from_static(&[0; 1024]);

        let readiness = resolves_immediately(writer.write_ready())
            .await
            .expect("write_ready does not trap");
        assert_eq!(readiness, 2048);
        // I can write whatever:
        writer.write(chunk.clone()).expect("write does not error");

        // This may consume 1k of the buffer:
        let readiness = resolves_immediately(writer.write_ready())
            .await
            .expect("write_ready does not trap");
        assert!(
            readiness == 1024 || readiness == 2048,
            "readiness should be 1024 or 2048, got {readiness}"
        );

        if readiness == 1024 {
            writer.write(chunk.clone()).expect("write does not error");

            let readiness = resolves_immediately(writer.write_ready())
                .await
                .expect("write_ready does not trap");
            assert!(
                readiness == 1024 || readiness == 2048,
                "readiness should be 1024 or 2048, got {readiness}"
            );
        }
    }
589
590
#[test_log::test(tokio::test(flavor = "multi_thread"))]
591
async fn closed_write_stream() {
592
// Run many times because the test is nondeterministic:
593
for n in 0..TEST_ITERATIONS {
594
closed_write_stream_(n).await
595
}
596
}
597
#[tracing::instrument]
598
async fn closed_write_stream_(n: usize) {
599
let (reader, writer) = simplex(1);
600
let mut writer = AsyncWriteStream::new(1024, writer);
601
602
// Drop the reader to allow the worker to transition to the closed state eventually.
603
drop(reader);
604
605
// First the api is going to report the last operation failed, then subsequently
606
// it will be reported as closed. We set this flag once we see LastOperationFailed.
607
let mut should_be_closed = false;
608
609
// Write some data to the stream to ensure we have data that cannot be flushed.
610
let chunk = Bytes::from_static(&[0; 1]);
611
writer
612
.write(chunk.clone())
613
.expect("first write should succeed");
614
615
// The rest of this test should be valid whether or not we check write readiness:
616
let mut write_ready_res = None;
617
if n % 2 == 0 {
618
let r = resolves_immediately(writer.write_ready()).await;
619
// Check write readiness:
620
match r {
621
// worker hasn't processed write yet:
622
Ok(1023) => {}
623
// worker reports failure:
624
Err(StreamError::LastOperationFailed(_)) => {
625
tracing::debug!("discovered stream failure in first write_ready");
626
should_be_closed = true;
627
}
628
r => panic!("unexpected write_ready: {r:?}"),
629
}
630
write_ready_res = Some(r);
631
}
632
633
// When we drop the simplex reader, that causes the simplex writer to return BrokenPipe on
634
// its write. Now that the buffering crank has turned, our next write will give BrokenPipe.
635
let flush_res = writer.flush();
636
match flush_res {
637
// worker reports failure:
638
Err(StreamError::LastOperationFailed(_)) => {
639
tracing::debug!("discovered stream failure trying to flush");
640
assert!(!should_be_closed);
641
should_be_closed = true;
642
}
643
// Already reported failure, now closed
644
Err(StreamError::Closed) => {
645
assert!(
646
should_be_closed,
647
"expected a LastOperationFailed before we see Closed. {write_ready_res:?}"
648
);
649
}
650
// Also possible the worker hasn't processed write yet:
651
Ok(()) => {}
652
Err(e) => panic!("unexpected flush error: {e:?} {write_ready_res:?}"),
653
}
654
655
// Waiting for the flush to complete should always indicate that the channel has been
656
// closed.
657
match resolves_immediately(writer.write_ready()).await {
658
// worker reports failure:
659
Err(StreamError::LastOperationFailed(_)) => {
660
tracing::debug!("discovered stream failure trying to flush");
661
assert!(!should_be_closed);
662
}
663
// Already reported failure, now closed
664
Err(StreamError::Closed) => {
665
assert!(should_be_closed);
666
}
667
r => {
668
panic!(
669
"stream should be reported closed by the end of write_ready after flush, got {r:?}. {write_ready_res:?} {flush_res:?}"
670
)
671
}
672
}
673
}

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn multiple_chunks_write_stream() {
        // Run many times because the test is nondeterministic:
        for n in 0..TEST_ITERATIONS {
            multiple_chunks_write_stream_aux(n).await
        }
    }
    #[tracing::instrument]
    async fn multiple_chunks_write_stream_aux(_: usize) {
        use std::ops::Deref;

        let (mut reader, writer) = simplex(1024);
        let mut writer = AsyncWriteStream::new(1024, writer);

        // Write a chunk:
        let chunk = Bytes::from_static(&[123; 1]);

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        writer.write(chunk.clone()).expect("write does not trap");

        // At this point the message will either be waiting for the worker to process the write, or
        // it will be buffered in the simplex channel.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert!(matches!(permit, 1023 | 1024));

        let mut read_buf = vec![0; chunk.len()];
        let read_len = reader.read_exact(&mut read_buf).await.unwrap();
        assert_eq!(read_len, chunk.len());
        assert_eq!(read_buf.as_slice(), chunk.deref());

        // Write a second, different chunk:
        let chunk2 = Bytes::from_static(&[45; 1]);

        // We're only guaranteed to see a consistent write budget if we flush.
        writer.flush().expect("channel is still alive");

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        writer.write(chunk2.clone()).expect("write does not trap");

        // At this point the message will either be waiting for the worker to process the write, or
        // it will be buffered in the simplex channel.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert!(matches!(permit, 1023 | 1024));

        let mut read2_buf = vec![0; chunk2.len()];
        let read2_len = reader.read_exact(&mut read2_buf).await.unwrap();
        assert_eq!(read2_len, chunk2.len());
        assert_eq!(read2_buf.as_slice(), chunk2.deref());

        // We're only guaranteed to see a consistent write budget if we flush.
        writer.flush().expect("channel is still alive");

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn backpressure_write_stream() {
        // Run many times because the test is nondeterministic:
        for n in 0..TEST_ITERATIONS {
            backpressure_write_stream_aux(n).await
        }
    }
    #[tracing::instrument]
    async fn backpressure_write_stream_aux(_: usize) {
        use futures::future::poll_immediate;

        // The channel can buffer up to 1k, plus another 1k in the stream, before not
        // accepting more input:
        let (mut reader, writer) = simplex(1024);
        let mut writer = AsyncWriteStream::new(1024, writer);

        let chunk = Bytes::from_static(&[0; 1024]);

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        writer.write(chunk.clone()).expect("write succeeds");

        // We might still be waiting for the worker to process the message, or the worker may have
        // processed it and released all the budget back to us.
        let permit = poll_immediate(writer.write_ready()).await;
        assert!(matches!(permit, None | Some(Ok(1024))));

        // Given a little time, the worker will process the message and release all the budget
        // back.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        // Now fill the buffer between here and the writer task. This should always indicate
        // back-pressure because now both buffers (simplex and worker) are full.
        writer.write(chunk.clone()).expect("write does not trap");

        // Try shoving even more down there, and it shouldn't accept more input:
        writer
            .write(chunk.clone())
            .err()
            .expect("unpermitted write does trap");

        // No amount of waiting will resolve the situation, as nothing is emptying the simplex
        // buffer.
        never_resolves(writer.write_ready()).await;

        // There is 2k buffered between the simplex and worker buffers. I should be able to read
        // all of it out:
        let mut buf = [0; 2048];
        reader.read_exact(&mut buf).await.unwrap();

        // and no more:
        never_resolves(reader.read(&mut buf)).await;

        // Now the backpressure should be cleared, and an additional write should be accepted.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("ready is ok");
        assert_eq!(permit, 1024);

        // and the write succeeds:
        writer.write(chunk.clone()).expect("write does not trap");
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn backpressure_write_stream_with_flush() {
        for n in 0..TEST_ITERATIONS {
            backpressure_write_stream_with_flush_aux(n).await;
        }
    }

    async fn backpressure_write_stream_with_flush_aux(_: usize) {
        // The channel can buffer up to 1k, plus another 1k in the stream, before not
        // accepting more input:
        let (mut reader, writer) = simplex(1024);
        let mut writer = AsyncWriteStream::new(1024, writer);

        let chunk = Bytes::from_static(&[0; 1024]);

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write should be ready");
        assert_eq!(permit, 1024);

        writer.write(chunk.clone()).expect("write succeeds");

        writer.flush().expect("flush succeeds");

        // Waiting for write_ready to resolve after a flush should always show that we have the
        // full budget available, as the message will have flushed to the simplex channel.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("write_ready succeeds");
        assert_eq!(permit, 1024);

        // Write enough to fill the simplex buffer:
        writer.write(chunk.clone()).expect("write does not trap");

        // Writes should be refused until this flush succeeds.
        writer.flush().expect("flush succeeds");

        // Try shoving even more down there, and it shouldn't accept more input:
        writer
            .write(chunk.clone())
            .err()
            .expect("unpermitted write does trap");

        // No amount of waiting will resolve the situation, as nothing is emptying the simplex
        // buffer.
        never_resolves(writer.write_ready()).await;

        // There is 2k buffered between the simplex and worker buffers. I should be able to read
        // all of it out:
        let mut buf = [0; 2048];
        reader.read_exact(&mut buf).await.unwrap();

        // and no more:
        never_resolves(reader.read(&mut buf)).await;

        // Now the backpressure should be cleared, and an additional write should be accepted.
        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("ready is ok");
        assert_eq!(permit, 1024);

        // and the write succeeds:
        writer.write(chunk.clone()).expect("write does not trap");

        writer.flush().expect("flush succeeds");

        let permit = resolves_immediately(writer.write_ready())
            .await
            .expect("ready is ok");
        assert_eq!(permit, 1024);
    }
}