Path: blob/main/crates/test-programs/src/bin/async_transmit_caller.rs
1693 views
mod bindings {1wit_bindgen::generate!({2path: "../misc/component-async-tests/wit",3world: "transmit-caller",4});56use super::Component;7export!(Component);8}910use {11bindings::{12exports::local::local::run::Guest,13local::local::transmit::{self, Control},14wit_future, wit_stream,15},16futures::{FutureExt, StreamExt, future, stream::FuturesUnordered},17std::{18future::{Future, IntoFuture},19pin::{Pin, pin},20task::Poll,21},22wit_bindgen::{FutureWriteCancel, StreamResult},23};2425struct Component;2627impl Guest for Component {28async fn run() {29let (mut control_tx, control_rx) = wit_stream::new();30let (mut caller_stream_tx, caller_stream_rx) = wit_stream::new();31let (mut caller_future_tx1, caller_future_rx1) = wit_future::new(|| todo!());32let (caller_future_tx2, caller_future_rx2) = wit_future::new(|| String::new());3334let (mut callee_stream_rx, mut callee_future_rx1, callee_future_rx2) = transmit::exchange(35control_rx,36caller_stream_rx,37caller_future_rx1,38caller_future_rx2,39)40.await;4142// Tell peer to read from its end of the stream and assert that the result matches an expected value.43assert!(44control_tx45.write_one(Control::ReadStream("a".into()))46.await47.is_none()48);49assert!(caller_stream_tx.write_one("a".into()).await.is_none());5051// Start writing another value, but cancel the write before telling the peer to read.52{53let send = Box::pin(caller_stream_tx.write_one("b".into()));54assert!(poll(send).await.is_err());55}5657// Tell the peer to read an expected value again, which should _not_ match the value provided in the58// canceled write above.59assert!(60control_tx61.write_one(Control::ReadStream("c".into()))62.await63.is_none()64);65assert!(caller_stream_tx.write_one("c".into()).await.is_none());6667// Tell the peer to do a zero-length read, do a zero-length write; assert the latter completes, then do a68// non-zero-length write, assert that it does _not_ complete, then tell the peer to do a non-zero-length69// read and assert that the write completes.70assert!(71control_tx72.write_one(Control::ReadStreamZero)73.await74.is_none()75);76{77assert_eq!(78caller_stream_tx.write(Vec::new()).await.0,79StreamResult::Complete(0)80);8182let send = Box::pin(caller_stream_tx.write_one("d".into()));83let Err(send) = poll(send).await else {84panic!()85};8687let mut futures = FuturesUnordered::new();88futures.push(Box::pin(send.map(|v| {89assert!(v.is_none());90})) as Pin<Box<dyn Future<Output = _>>>);91futures.push(Box::pin(92control_tx93.write_one(Control::ReadStream("d".into()))94.map(|v| {95assert!(v.is_none());96}),97));98while let Some(()) = futures.next().await {}99}100101// Start writing a value to the future, but cancel the write before telling the peer to read.102{103let send = Box::pin(caller_future_tx1.write("x".into()));104match poll(send).await {105Ok(_) => panic!(),106Err(mut send) => {107caller_future_tx1 = match send.as_mut().cancel() {108FutureWriteCancel::AlreadySent => unreachable!(),109FutureWriteCancel::Dropped(_) => unreachable!(),110FutureWriteCancel::Cancelled(_, writer) => writer,111}112}113}114}115116// Tell the peer to read an expected value again, which should _not_ match the value provided in the117// canceled write above.118assert!(119control_tx120.write_one(Control::ReadFuture("y".into()))121.await122.is_none()123);124caller_future_tx1.write("y".into()).await.unwrap();125126// Tell the peer to write a value to its end of the stream, then read from our end and assert the value127// matches.128assert!(129control_tx130.write_one(Control::WriteStream("a".into()))131.await132.is_none()133);134assert_eq!(callee_stream_rx.next().await, Some("a".into()));135136// Start reading a value from the stream, but cancel the read before telling the peer to write.137{138let next = Box::pin(callee_stream_rx.read(Vec::with_capacity(1)));139assert!(poll(next).await.is_err());140}141142// Once again, tell the peer to write a value to its end of the stream, then read from our end and assert143// the value matches.144assert!(145control_tx146.write_one(Control::WriteStream("b".into()))147.await148.is_none()149);150assert_eq!(callee_stream_rx.next().await, Some("b".into()));151152// Tell the peer to do a zero-length write, assert that the read does _not_ complete, then tell the peer to153// do a non-zero-length write and assert that the read completes.154assert!(155control_tx156.write_one(Control::WriteStreamZero)157.await158.is_none()159);160{161let next = Box::pin(callee_stream_rx.next());162let Err(next) = poll(next).await else {163panic!()164};165166let mut futures = FuturesUnordered::new();167futures.push(Box::pin(next.map(|v| {168assert_eq!(v, Some("c".into()));169})) as Pin<Box<dyn Future<Output = _>>>);170futures.push(Box::pin(171control_tx172.write_one(Control::WriteStream("c".into()))173.map(|v| {174assert!(v.is_none());175}),176));177while let Some(()) = futures.next().await {}178}179180// Start reading a value from the future, but cancel the read before telling the peer to write.181{182let next = Box::pin(callee_future_rx1.into_future());183match poll(next).await {184Ok(_) => panic!(),185Err(mut next) => callee_future_rx1 = next.as_mut().cancel().unwrap_err(),186}187}188189// Tell the peer to write a value to its end of the future, then read from our end and assert the value190// matches.191assert!(192control_tx193.write_one(Control::WriteFuture("b".into()))194.await195.is_none()196);197assert_eq!(callee_future_rx1.into_future().await, "b");198199// Start writing a value to the stream, but drop the stream without telling the peer to read.200let send = Box::pin(caller_stream_tx.write_one("d".into()));201assert!(poll(send).await.is_err());202drop(caller_stream_tx);203204// Start reading a value from the stream, but drop the stream without telling the peer to write.205let next = Box::pin(callee_stream_rx.next());206assert!(poll(next).await.is_err());207drop(callee_stream_rx);208209// Start writing a value to the future, but drop the write without telling the peer to read.210{211let send = pin!(caller_future_tx2.write("x".into()));212assert!(poll(send).await.is_err());213}214215// Start reading a value from the future, but drop the read without telling the peer to write.216{217let next = Box::pin(callee_future_rx2.into_future());218assert!(poll(next).await.is_err());219}220}221}222223async fn poll<T, F: Future<Output = T> + Unpin>(fut: F) -> Result<T, F> {224let mut fut = Some(fut);225future::poll_fn(move |cx| {226let mut fut = fut.take().unwrap();227Poll::Ready(match fut.poll_unpin(cx) {228Poll::Ready(v) => Ok(v),229Poll::Pending => Err(fut),230})231})232.await233}234235// Unused function; required since this file is built as a `bin`:236fn main() {}237238239