// Path: blob/main/crates/misc/component-async-tests/tests/scenario/transmit.rs
// 3088 views
use std::future::Future;1use std::pin::Pin;2use std::task::{self, Context, Poll};3use std::time::Duration;45use super::util::{config, make_component, test_run, test_run_with_count};6use cancel::exports::local::local::cancel::Mode;7use component_async_tests::transmit::bindings::exports::local::local::transmit::Control;8use component_async_tests::util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer};9use component_async_tests::{Ctx, sleep, transmit};10use futures::{11FutureExt, SinkExt, StreamExt, TryStreamExt,12channel::{mpsc, oneshot},13stream::FuturesUnordered,14};15use wasmtime::component::{16Accessor, Component, Destination, FutureConsumer, FutureProducer, FutureReader, HasSelf,17Instance, Linker, ResourceTable, Source, StreamConsumer, StreamProducer, StreamReader,18StreamResult, Val,19};20use wasmtime::{AsContextMut, Engine, Result, Store, StoreContextMut, format_err};21use wasmtime_wasi::WasiCtxBuilder;2223struct BufferStreamProducer {24buffer: Vec<u8>,25}2627impl<D> StreamProducer<D> for BufferStreamProducer {28type Item = u8;29type Buffer = Option<u8>;3031fn poll_produce<'a>(32self: Pin<&mut Self>,33_: &mut Context<'_>,34mut store: StoreContextMut<'a, D>,35destination: Destination<'a, Self::Item, Self::Buffer>,36_: bool,37) -> Poll<Result<StreamResult>> {38let me = self.get_mut();39let capacity = destination.remaining(store.as_context_mut());40if capacity == Some(0) {41Poll::Ready(Ok(StreamResult::Completed))42} else {43assert_eq!(capacity, Some(me.buffer.len()));44let mut destination = destination.as_direct(store, me.buffer.len());45destination.remaining().copy_from_slice(&me.buffer);46destination.mark_written(me.buffer.len());4748Poll::Ready(Ok(StreamResult::Dropped))49}50}51}5253struct BufferStreamConsumer {54expected: Vec<u8>,55}5657impl<D> StreamConsumer<D> for BufferStreamConsumer {58type Item = u8;5960fn poll_consume(61self: Pin<&mut Self>,62_: &mut Context<'_>,63mut store: StoreContextMut<D>,64source: Source<Self::Item>,65_: bool,66) 
-> Poll<Result<StreamResult>> {67let me = self.get_mut();68let available = source.remaining(store.as_context_mut());69if available == 0 {70Poll::Ready(Ok(StreamResult::Completed))71} else {72assert_eq!(available, me.expected.len());73let mut source = source.as_direct(store);74assert_eq!(&me.expected, source.remaining());75source.mark_read(me.expected.len());7677Poll::Ready(Ok(StreamResult::Dropped))78}79}80}8182struct ValueFutureProducer {83value: u8,84}8586impl<D> FutureProducer<D> for ValueFutureProducer {87type Item = u8;8889fn poll_produce<'a>(90self: Pin<&mut Self>,91_: &mut Context<'_>,92_: StoreContextMut<'a, D>,93_: bool,94) -> Poll<Result<Option<Self::Item>>> {95Poll::Ready(Ok(Some(self.value)))96}97}9899struct ValueFutureConsumer {100expected: u8,101}102103impl<D> FutureConsumer<D> for ValueFutureConsumer {104type Item = u8;105106fn poll_consume(107self: Pin<&mut Self>,108_: &mut Context<'_>,109store: StoreContextMut<D>,110mut source: Source<'_, Self::Item>,111_: bool,112) -> Poll<Result<()>> {113let value = &mut None;114source.read(store, value)?;115assert_eq!(value.take(), Some(self.expected));116Poll::Ready(Ok(()))117}118}119120struct DelayedStreamProducer<P> {121inner: P,122sleep: Pin<Box<dyn Future<Output = ()> + Send>>,123}124125impl<D, P: StreamProducer<D>> StreamProducer<D> for DelayedStreamProducer<P> {126type Item = P::Item;127type Buffer = P::Buffer;128129fn poll_produce<'a>(130mut self: Pin<&mut Self>,131cx: &mut Context<'_>,132store: StoreContextMut<'a, D>,133destination: Destination<'a, Self::Item, Self::Buffer>,134finish: bool,135) -> Poll<Result<StreamResult>> {136// SAFETY: We never move out of `self`.137let sleep = unsafe { &mut self.as_mut().get_unchecked_mut().sleep };138task::ready!(sleep.as_mut().poll(cx));139*sleep = async {}.boxed();140141// SAFETY: This is a standard pin-projection, and we never move out142// of `self`.143let inner = unsafe { self.map_unchecked_mut(|v| &mut v.inner) };144inner.poll_produce(cx, store, destination, 
finish)145}146}147148struct DelayedStreamConsumer<C> {149inner: C,150sleep: Pin<Box<dyn Future<Output = ()> + Send>>,151}152153impl<D, C: StreamConsumer<D>> StreamConsumer<D> for DelayedStreamConsumer<C> {154type Item = C::Item;155156fn poll_consume(157mut self: Pin<&mut Self>,158cx: &mut Context<'_>,159store: StoreContextMut<D>,160source: Source<Self::Item>,161finish: bool,162) -> Poll<Result<StreamResult>> {163// SAFETY: We never move out of `self`.164let sleep = unsafe { &mut self.as_mut().get_unchecked_mut().sleep };165task::ready!(sleep.as_mut().poll(cx));166*sleep = async {}.boxed();167168// SAFETY: This is a standard pin-projection, and we never move out169// of `self`.170let inner = unsafe { self.map_unchecked_mut(|v| &mut v.inner) };171inner.poll_consume(cx, store, source, finish)172}173}174175struct DelayedFutureProducer<P> {176inner: P,177sleep: Pin<Box<dyn Future<Output = ()> + Send>>,178}179180impl<D, P: FutureProducer<D>> FutureProducer<D> for DelayedFutureProducer<P> {181type Item = P::Item;182183fn poll_produce<'a>(184mut self: Pin<&mut Self>,185cx: &mut Context<'_>,186store: StoreContextMut<'a, D>,187finish: bool,188) -> Poll<Result<Option<Self::Item>>> {189// SAFETY: We never move out of `self`.190let sleep = unsafe { &mut self.as_mut().get_unchecked_mut().sleep };191task::ready!(sleep.as_mut().poll(cx));192*sleep = async {}.boxed();193194// SAFETY: This is a standard pin-projection, and we never move out195// of `self`.196let inner = unsafe { self.map_unchecked_mut(|v| &mut v.inner) };197inner.poll_produce(cx, store, finish)198}199}200201struct DelayedFutureConsumer<C> {202inner: C,203sleep: Pin<Box<dyn Future<Output = ()> + Send>>,204}205206impl<D, C: FutureConsumer<D>> FutureConsumer<D> for DelayedFutureConsumer<C> {207type Item = C::Item;208209fn poll_consume(210mut self: Pin<&mut Self>,211cx: &mut Context<'_>,212store: StoreContextMut<D>,213source: Source<'_, Self::Item>,214finish: bool,215) -> Poll<Result<()>> {216// SAFETY: We never move 
out of `self`.217let sleep = unsafe { &mut self.as_mut().get_unchecked_mut().sleep };218task::ready!(sleep.as_mut().poll(cx));219*sleep = async {}.boxed();220221// SAFETY: This is a standard pin-projection, and we never move out222// of `self`.223let inner = unsafe { self.map_unchecked_mut(|v| &mut v.inner) };224inner.poll_consume(cx, store, source, finish)225}226}227228struct ProcrastinatingStreamProducer<P>(P);229230impl<D, P: StreamProducer<D>> StreamProducer<D> for ProcrastinatingStreamProducer<P> {231type Item = P::Item;232type Buffer = P::Buffer;233234fn poll_produce<'a>(235self: Pin<&mut Self>,236cx: &mut Context<'_>,237store: StoreContextMut<'a, D>,238destination: Destination<'a, Self::Item, Self::Buffer>,239finish: bool,240) -> Poll<Result<StreamResult>> {241if finish {242// SAFETY: This is a standard pin-projection, and we never move out243// of `self`.244let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };245producer.poll_produce(cx, store, destination, true)246} else {247Poll::Pending248}249}250}251252struct ProcrastinatingStreamConsumer<C>(C);253254impl<D, C: StreamConsumer<D>> StreamConsumer<D> for ProcrastinatingStreamConsumer<C> {255type Item = C::Item;256257fn poll_consume(258self: Pin<&mut Self>,259cx: &mut Context<'_>,260store: StoreContextMut<D>,261source: Source<'_, Self::Item>,262finish: bool,263) -> Poll<Result<StreamResult>> {264if finish {265// SAFETY: This is a standard pin-projection, and we never move out266// of `self`.267let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };268consumer.poll_consume(cx, store, source, true)269} else {270Poll::Pending271}272}273}274275struct ProcrastinatingFutureProducer<P>(P);276277impl<D, P: FutureProducer<D>> FutureProducer<D> for ProcrastinatingFutureProducer<P> {278type Item = P::Item;279280fn poll_produce<'a>(281self: Pin<&mut Self>,282cx: &mut Context<'_>,283store: StoreContextMut<'a, D>,284finish: bool,285) -> Poll<Result<Option<Self::Item>>> {286if finish {287// SAFETY: This 
is a standard pin-projection, and we never move out288// of `self`.289let producer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };290producer.poll_produce(cx, store, true)291} else {292Poll::Pending293}294}295}296297struct ProcrastinatingFutureConsumer<C>(C);298299impl<D, C: FutureConsumer<D>> FutureConsumer<D> for ProcrastinatingFutureConsumer<C> {300type Item = C::Item;301302fn poll_consume(303self: Pin<&mut Self>,304cx: &mut Context<'_>,305store: StoreContextMut<D>,306source: Source<'_, Self::Item>,307finish: bool,308) -> Poll<Result<()>> {309if finish {310// SAFETY: This is a standard pin-projection, and we never move out311// of `self`.312let consumer = unsafe { self.map_unchecked_mut(|v| &mut v.0) };313consumer.poll_consume(cx, store, source, true)314} else {315Poll::Pending316}317}318}319320fn sleep() -> Pin<Box<dyn Future<Output = ()> + Send>> {321component_async_tests::util::sleep(Duration::from_millis(delay_millis())).boxed()322}323324mod readiness {325wasmtime::component::bindgen!({326path: "wit",327world: "readiness-guest",328exports: { default: task_exit },329});330}331332#[tokio::test]333pub async fn async_readiness() -> Result<()> {334let component = test_programs_artifacts::ASYNC_READINESS_COMPONENT;335336let engine = Engine::new(&config())?;337338let component = make_component(&engine, &[component]).await?;339340let mut linker = Linker::new(&engine);341342wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;343344let mut store = Store::new(345&engine,346Ctx {347wasi: WasiCtxBuilder::new().inherit_stdio().build(),348table: ResourceTable::default(),349continue_: false,350},351);352353let readiness_guest =354readiness::ReadinessGuest::instantiate_async(&mut store, &component, &linker).await?;355let expected = vec![2u8, 4, 6, 8, 9];356let rx = StreamReader::new(357&mut store,358DelayedStreamProducer {359inner: BufferStreamProducer {360buffer: expected.clone(),361},362sleep: sleep(),363},364);365store366.run_concurrent(async move |accessor| {367let 
((rx, expected), task_exit) = readiness_guest368.local_local_readiness()369.call_start(accessor, rx, expected)370.await?;371372accessor.with(|access| {373rx.pipe(374access,375DelayedStreamConsumer {376inner: BufferStreamConsumer { expected },377sleep: sleep(),378},379)380});381382task_exit.block(accessor).await;383384Ok(())385})386.await?387}388389#[tokio::test]390pub async fn async_poll_synchronous() -> Result<()> {391test_run(&[test_programs_artifacts::ASYNC_POLL_SYNCHRONOUS_COMPONENT]).await392}393394#[tokio::test]395pub async fn async_poll_stackless() -> Result<()> {396test_run(&[test_programs_artifacts::ASYNC_POLL_STACKLESS_COMPONENT]).await397}398399mod cancel {400wasmtime::component::bindgen!({401path: "wit",402world: "cancel-host",403exports: { default: async | store | task_exit },404});405}406407// No-op function; we only test this by composing it in `async_cancel_caller`408#[allow(409dead_code,410reason = "here only to make the `assert_test_exists` macro happy"411)]412pub fn async_cancel_callee() {}413414#[tokio::test]415pub async fn async_cancel_caller() -> Result<()> {416test_cancel(Mode::Normal).await417}418419#[tokio::test]420pub async fn async_cancel_caller_leak_task_after_cancel() -> Result<()> {421test_cancel(Mode::LeakTaskAfterCancel).await422}423424#[tokio::test]425pub async fn async_trap_cancel_guest_after_start_cancelled() -> Result<()> {426test_cancel_trap(Mode::TrapCancelGuestAfterStartCancelled).await427}428429#[tokio::test]430pub async fn async_trap_cancel_guest_after_return_cancelled() -> Result<()> {431test_cancel_trap(Mode::TrapCancelGuestAfterReturnCancelled).await432}433434#[tokio::test]435pub async fn async_trap_cancel_guest_after_return() -> Result<()> {436test_cancel_trap(Mode::TrapCancelGuestAfterReturn).await437}438439#[tokio::test]440pub async fn async_trap_cancel_host_after_return_cancelled() -> Result<()> {441test_cancel_trap(Mode::TrapCancelHostAfterReturnCancelled).await442}443444#[tokio::test]445pub async fn 
async_trap_cancel_host_after_return() -> Result<()> {446test_cancel_trap(Mode::TrapCancelHostAfterReturn).await447}448449fn delay_millis() -> u64 {450// Miri-based builds are much slower to run, so we delay longer in that case451// to ensure that async calls which the test expects to return `BLOCKED`452// actually do so.453//454// TODO: Make this test (more) deterministic so that such tuning is not455// necessary.456if cfg!(miri) { 1000 } else { 10 }457}458459async fn test_cancel_trap(mode: Mode) -> Result<()> {460let message = "`subtask.cancel` called after terminal status delivered";461let trap = test_cancel(mode).await.unwrap_err();462assert!(463format!("{trap:?}").contains(message),464"expected `{message}`; got `{trap:?}`",465);466Ok(())467}468469async fn test_cancel(mode: Mode) -> Result<()> {470let engine = Engine::new(&config())?;471472let component = make_component(473&engine,474&[475test_programs_artifacts::ASYNC_CANCEL_CALLER_COMPONENT,476test_programs_artifacts::ASYNC_CANCEL_CALLEE_COMPONENT,477],478)479.await?;480481let mut linker = Linker::new(&engine);482483wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;484sleep::local::local::sleep::add_to_linker::<_, Ctx>(&mut linker, |ctx| ctx)?;485486let mut store = Store::new(487&engine,488Ctx {489wasi: WasiCtxBuilder::new().inherit_stdio().build(),490table: ResourceTable::default(),491continue_: false,492},493);494495let cancel_host =496cancel::CancelHost::instantiate_async(&mut store, &component, &linker).await?;497store498.run_concurrent(async move |accessor| {499let ((), task) = cancel_host500.local_local_cancel()501.call_run(accessor, mode, delay_millis())502.await?;503task.block(accessor).await;504Ok::<_, wasmtime::Error>(())505})506.await??;507508Ok(())509}510511#[tokio::test]512pub async fn async_intertask_communication() -> Result<()> {513test_run_with_count(514&[test_programs_artifacts::ASYNC_INTERTASK_COMMUNICATION_COMPONENT],5152,516)517.await518}519520#[tokio::test]521pub async fn 
async_transmit_caller() -> Result<()> {522test_run(&[523test_programs_artifacts::ASYNC_TRANSMIT_CALLER_COMPONENT,524test_programs_artifacts::ASYNC_TRANSMIT_CALLEE_COMPONENT,525])526.await527}528529#[tokio::test]530pub async fn async_transmit_callee() -> Result<()> {531test_transmit(test_programs_artifacts::ASYNC_TRANSMIT_CALLEE_COMPONENT).await532}533534pub trait TransmitTest {535type Instance: Send + Sync;536type Params;537type Result: Send + Sync + 'static;538539fn instantiate(540store: impl AsContextMut<Data = Ctx>,541component: &Component,542linker: &Linker<Ctx>,543) -> impl Future<Output = Result<Self::Instance>>;544545fn call<'a>(546accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,547instance: &'a Self::Instance,548params: Self::Params,549) -> impl Future<Output = Result<Self::Result>> + Send + 'a;550551fn into_params(552store: impl AsContextMut<Data = Ctx>,553control: StreamReader<Control>,554caller_stream: StreamReader<String>,555caller_future1: FutureReader<String>,556caller_future2: FutureReader<String>,557) -> Self::Params;558559fn from_result(560store: impl AsContextMut<Data = Ctx>,561result: Self::Result,562) -> Result<(563StreamReader<String>,564FutureReader<String>,565FutureReader<String>,566)>;567}568569struct StaticTransmitTest;570571impl TransmitTest for StaticTransmitTest {572type Instance = transmit::bindings::TransmitCallee;573type Params = (574StreamReader<Control>,575StreamReader<String>,576FutureReader<String>,577FutureReader<String>,578);579type Result = (580StreamReader<String>,581FutureReader<String>,582FutureReader<String>,583);584585async fn instantiate(586store: impl AsContextMut<Data = Ctx>,587component: &Component,588linker: &Linker<Ctx>,589) -> Result<Self::Instance> {590transmit::bindings::TransmitCallee::instantiate_async(store, &component, &linker).await591}592593fn call<'a>(594accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,595instance: &'a Self::Instance,596params: Self::Params,597) -> impl Future<Output = Result<Self::Result>> + Send + 'a 
{598instance599.local_local_transmit()600.call_exchange(accessor, params.0, params.1, params.2, params.3)601}602603fn into_params(604_store: impl AsContextMut<Data = Ctx>,605control: StreamReader<Control>,606caller_stream: StreamReader<String>,607caller_future1: FutureReader<String>,608caller_future2: FutureReader<String>,609) -> Self::Params {610(control, caller_stream, caller_future1, caller_future2)611}612613fn from_result(614_: impl AsContextMut<Data = Ctx>,615result: Self::Result,616) -> Result<(617StreamReader<String>,618FutureReader<String>,619FutureReader<String>,620)> {621Ok(result)622}623}624625struct DynamicTransmitTest;626627impl TransmitTest for DynamicTransmitTest {628type Instance = Instance;629type Params = Vec<Val>;630type Result = Val;631632async fn instantiate(633store: impl AsContextMut<Data = Ctx>,634component: &Component,635linker: &Linker<Ctx>,636) -> Result<Self::Instance> {637linker.instantiate_async(store, component).await638}639640async fn call<'a>(641accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,642instance: &'a Self::Instance,643params: Self::Params,644) -> Result<Self::Result> {645let exchange_function = accessor.with(|mut store| {646let transmit_instance = instance647.get_export_index(store.as_context_mut(), None, "local:local/transmit")648.ok_or_else(|| format_err!("can't find `local:local/transmit` in instance"))?;649let exchange_function = instance650.get_export_index(store.as_context_mut(), Some(&transmit_instance), "exchange")651.ok_or_else(|| format_err!("can't find `exchange` in instance"))?;652instance653.get_func(store.as_context_mut(), exchange_function)654.ok_or_else(|| format_err!("can't find `exchange` in instance"))655})?;656657let mut results = vec![Val::Bool(false)];658exchange_function659.call_concurrent(accessor, ¶ms, &mut results)660.await?;661Ok(results.pop().unwrap())662}663664fn into_params(665mut store: impl AsContextMut<Data = Ctx>,666control: StreamReader<Control>,667caller_stream: 
StreamReader<String>,668caller_future1: FutureReader<String>,669caller_future2: FutureReader<String>,670) -> Self::Params {671vec![672control.try_into_stream_any(&mut store).unwrap().into(),673caller_stream674.try_into_stream_any(&mut store)675.unwrap()676.into(),677caller_future1678.try_into_future_any(&mut store)679.unwrap()680.into(),681caller_future2682.try_into_future_any(&mut store)683.unwrap()684.into(),685]686}687688fn from_result(689_store: impl AsContextMut<Data = Ctx>,690result: Self::Result,691) -> Result<(692StreamReader<String>,693FutureReader<String>,694FutureReader<String>,695)> {696let Val::Tuple(fields) = result else {697unreachable!()698};699let mut fields = fields.into_iter();700let Val::Stream(stream) = fields.next().unwrap() else {701unreachable!()702};703let Val::Future(future1) = fields.next().unwrap() else {704unreachable!()705};706let Val::Future(future2) = fields.next().unwrap() else {707unreachable!()708};709let stream = StreamReader::try_from_stream_any(stream).unwrap();710let future1 = FutureReader::try_from_future_any(future1).unwrap();711let future2 = FutureReader::try_from_future_any(future2).unwrap();712Ok((stream, future1, future2))713}714}715716async fn test_transmit(component: &str) -> Result<()> {717test_transmit_with::<StaticTransmitTest>(component).await?;718test_transmit_with::<DynamicTransmitTest>(component).await719}720721async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Result<()> {722let engine = Engine::new(&config())?;723724let component = make_component(&engine, &[component]).await?;725726let mut linker = Linker::new(&engine);727728wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;729730let mut store = Store::new(731&engine,732Ctx {733wasi: WasiCtxBuilder::new().inherit_stdio().build(),734table: ResourceTable::default(),735continue_: false,736},737);738739let test = Test::instantiate(&mut store, &component, &linker).await?;740741enum Event<Test: TransmitTest> 
{742Result(Test::Result),743ControlWriteA(mpsc::Sender<Control>),744ControlWriteB(mpsc::Sender<Control>),745ControlWriteC(mpsc::Sender<Control>),746ControlWriteD,747WriteA,748ReadC(mpsc::Receiver<String>, Option<String>),749ReadD(mpsc::Receiver<String>, Option<String>),750ReadNone(Option<String>),751}752753let (mut control_tx, control_rx) = mpsc::channel(1);754let control_rx = StreamReader::new(&mut store, PipeProducer::new(control_rx));755let (mut caller_stream_tx, caller_stream_rx) = mpsc::channel(1);756let caller_stream_rx = StreamReader::new(&mut store, PipeProducer::new(caller_stream_rx));757let (caller_future1_tx, caller_future1_rx) = oneshot::channel();758let caller_future1_rx = FutureReader::new(&mut store, OneshotProducer::new(caller_future1_rx));759let (_, caller_future2_rx) = oneshot::channel();760let caller_future2_rx = FutureReader::new(&mut store, OneshotProducer::new(caller_future2_rx));761let (callee_future1_tx, callee_future1_rx) = oneshot::channel();762let (callee_stream_tx, callee_stream_rx) = mpsc::channel(1);763store764.run_concurrent(async |accessor| {765let mut caller_future1_tx = Some(caller_future1_tx);766let mut callee_future1_tx = Some(callee_future1_tx);767let mut callee_future1_rx = Some(callee_future1_rx);768let mut callee_stream_tx = Some(callee_stream_tx);769let mut callee_stream_rx = Some(callee_stream_rx);770let mut complete = false;771let mut futures = FuturesUnordered::<772Pin<Box<dyn Future<Output = Result<Event<Test>>> + Send>>,773>::new();774775futures.push(776async move {777control_tx.send(Control::ReadStream("a".into())).await?;778Ok(Event::ControlWriteA(control_tx))779}780.boxed(),781);782783futures.push(784async move {785caller_stream_tx.send(String::from("a")).await?;786Ok(Event::WriteA)787}788.boxed(),789);790791let params = accessor.with(|s| {792Test::into_params(793s,794control_rx,795caller_stream_rx,796caller_future1_rx,797caller_future2_rx,798)799});800801futures.push(802Test::call(accessor, &test, params)803.map(|v| 
v.map(Event::Result))804.boxed(),805);806807while let Some(event) = futures.try_next().await? {808match event {809Event::Result(result) => {810accessor.with(|mut store| {811let (callee_stream_rx, callee_future1_rx, _) =812Test::from_result(&mut store, result)?;813callee_stream_rx.pipe(814&mut store,815PipeConsumer::new(callee_stream_tx.take().unwrap()),816);817callee_future1_rx.pipe(818&mut store,819OneshotConsumer::new(callee_future1_tx.take().unwrap()),820);821wasmtime::error::Ok(())822})?;823}824Event::ControlWriteA(mut control_tx) => {825futures.push(826async move {827control_tx.send(Control::ReadFuture("b".into())).await?;828Ok(Event::ControlWriteB(control_tx))829}830.boxed(),831);832}833Event::WriteA => {834_ = caller_future1_tx.take().unwrap().send("b".into());835let mut callee_stream_rx = callee_stream_rx.take().unwrap();836futures.push(837async move {838let value = callee_stream_rx.next().await;839Ok(Event::ReadC(callee_stream_rx, value))840}841.boxed(),842);843}844Event::ControlWriteB(mut control_tx) => {845futures.push(846async move {847control_tx.send(Control::WriteStream("c".into())).await?;848Ok(Event::ControlWriteC(control_tx))849}850.boxed(),851);852}853Event::ControlWriteC(mut control_tx) => {854futures.push(855async move {856control_tx.send(Control::WriteFuture("d".into())).await?;857Ok(Event::ControlWriteD)858}859.boxed(),860);861}862Event::ReadC(callee_stream_rx, mut value) => {863assert_eq!(value.take().as_deref(), Some("c"));864futures.push(865callee_future1_rx866.take()867.unwrap()868.map(|v| Event::ReadD(callee_stream_rx, v.ok()))869.map(Ok)870.boxed(),871);872}873Event::ControlWriteD => {}874Event::ReadD(_, None) => unreachable!(),875Event::ReadD(mut callee_stream_rx, Some(value)) => {876assert_eq!(&value, "d");877futures.push(878async move { Ok(Event::ReadNone(callee_stream_rx.next().await)) }879.boxed(),880);881}882Event::ReadNone(Some(_)) => unreachable!(),883Event::ReadNone(None) => {884complete = 
true;885}886}887}888889assert!(complete);890891wasmtime::error::Ok(())892})893.await??;894Ok(())895}896897mod synchronous_transmit {898wasmtime::component::bindgen!({899path: "wit",900world: "synchronous-transmit-guest",901exports: { default: task_exit },902});903}904905#[tokio::test]906pub async fn async_cancel_transmit() -> Result<()> {907test_synchronous_transmit(908test_programs_artifacts::ASYNC_CANCEL_TRANSMIT_COMPONENT,909true,910)911.await912}913914#[tokio::test]915pub async fn async_synchronous_transmit() -> Result<()> {916test_synchronous_transmit(917test_programs_artifacts::ASYNC_SYNCHRONOUS_TRANSMIT_COMPONENT,918false,919)920.await921}922923async fn test_synchronous_transmit(component: &str, procrastinate: bool) -> Result<()> {924let engine = Engine::new(&config())?;925926let component = make_component(&engine, &[component]).await?;927928let mut linker = Linker::new(&engine);929930wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;931932let mut store = Store::new(933&engine,934Ctx {935wasi: WasiCtxBuilder::new().inherit_stdio().build(),936table: ResourceTable::default(),937continue_: false,938},939);940941let instance = linker.instantiate_async(&mut store, &component).await?;942let guest = synchronous_transmit::SynchronousTransmitGuest::new(&mut store, &instance)?;943let stream_expected = vec![2u8, 4, 6, 8, 9];944let producer = DelayedStreamProducer {945inner: BufferStreamProducer {946buffer: stream_expected.clone(),947},948sleep: sleep(),949};950let stream = if procrastinate {951StreamReader::new(&mut store, ProcrastinatingStreamProducer(producer))952} else {953StreamReader::new(&mut store, producer)954};955let future_expected = 10;956let producer = DelayedFutureProducer {957inner: ValueFutureProducer {958value: future_expected,959},960sleep: sleep(),961};962let future = if procrastinate {963FutureReader::new(&mut store, ProcrastinatingFutureProducer(producer))964} else {965FutureReader::new(&mut store, producer)966};967store968.run_concurrent(async 
move |accessor| {969let ((stream, stream_expected, future, future_expected), task_exit) = guest970.local_local_synchronous_transmit()971.call_start(accessor, stream, stream_expected, future, future_expected)972.await?;973974accessor.with(|mut access| {975let consumer = DelayedStreamConsumer {976inner: BufferStreamConsumer {977expected: stream_expected,978},979sleep: sleep(),980};981if procrastinate {982stream.pipe(&mut access, ProcrastinatingStreamConsumer(consumer));983} else {984stream.pipe(&mut access, consumer);985}986let consumer = DelayedFutureConsumer {987inner: ValueFutureConsumer {988expected: future_expected,989},990sleep: sleep(),991};992if procrastinate {993future.pipe(access, ProcrastinatingFutureConsumer(consumer));994} else {995future.pipe(access, consumer);996}997});998999task_exit.block(accessor).await;10001001Ok(())1002})1003.await?1004}100510061007