Path: blob/main/crates/misc/component-async-tests/tests/scenario/transmit.rs
1693 views
use std::future::{self, Future};1use std::pin::Pin;2use std::sync::{Arc, Mutex};3use std::task::{Context, Poll};4use std::time::Duration;56use super::util::{config, make_component, test_run, test_run_with_count};7use anyhow::{Result, anyhow};8use cancel::exports::local::local::cancel::Mode;9use component_async_tests::transmit::bindings::exports::local::local::transmit::Control;10use component_async_tests::util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer};11use component_async_tests::{Ctx, sleep, transmit};12use futures::{13FutureExt, SinkExt, StreamExt, TryStreamExt,14channel::{mpsc, oneshot},15stream::FuturesUnordered,16};17use wasmtime::component::{18Accessor, Component, Destination, FutureReader, HasSelf, Instance, Linker, ResourceTable,19Source, StreamConsumer, StreamProducer, StreamReader, StreamResult, Val,20};21use wasmtime::{AsContextMut, Engine, Store, StoreContextMut, Trap};22use wasmtime_wasi::WasiCtxBuilder;2324mod readiness {25wasmtime::component::bindgen!({26path: "wit",27world: "readiness-guest"28});29}3031struct ReadinessProducer {32buffer: Vec<u8>,33sleep: Pin<Box<dyn Future<Output = ()> + Send>>,34}3536impl<D> StreamProducer<D> for ReadinessProducer {37type Item = u8;38type Buffer = Option<u8>;3940fn poll_produce<'a>(41self: Pin<&mut Self>,42cx: &mut Context<'_>,43mut store: StoreContextMut<'a, D>,44destination: Destination<'a, Self::Item, Self::Buffer>,45finish: bool,46) -> Poll<Result<StreamResult>> {47let me = self.get_mut();4849match me.sleep.as_mut().poll(cx) {50Poll::Pending => {51if finish {52Poll::Ready(Ok(StreamResult::Cancelled))53} else {54Poll::Pending55}56}57Poll::Ready(()) => {58me.sleep = async {}.boxed();59let capacity = destination.remaining(store.as_context_mut());60if capacity == Some(0) {61Poll::Ready(Ok(StreamResult::Completed))62} else {63assert_eq!(capacity, Some(me.buffer.len()));64let mut destination = destination.as_direct(store, me.buffer.len());65destination.remaining().copy_from_slice(&me.buffer);66destination.mark_written(me.buffer.len());6768Poll::Ready(Ok(StreamResult::Dropped))69}70}71}72}73}7475struct ReadinessConsumer {76expected: Vec<u8>,77sleep: Pin<Box<dyn Future<Output = ()> + Send>>,78}7980impl<D> StreamConsumer<D> for ReadinessConsumer {81type Item = u8;8283fn poll_consume(84self: Pin<&mut Self>,85cx: &mut Context<'_>,86mut store: StoreContextMut<D>,87source: Source<Self::Item>,88finish: bool,89) -> Poll<Result<StreamResult>> {90let me = self.get_mut();9192match me.sleep.as_mut().poll(cx) {93Poll::Pending => {94if finish {95Poll::Ready(Ok(StreamResult::Cancelled))96} else {97Poll::Pending98}99}100Poll::Ready(()) => {101me.sleep = async {}.boxed();102let available = source.remaining(store.as_context_mut());103if available == 0 {104Poll::Ready(Ok(StreamResult::Completed))105} else {106assert_eq!(available, me.expected.len());107let mut source = source.as_direct(store);108assert_eq!(&me.expected, source.remaining());109source.mark_read(me.expected.len());110111Poll::Ready(Ok(StreamResult::Dropped))112}113}114}115}116}117118#[tokio::test]119pub async fn async_readiness() -> Result<()> {120let component = test_programs_artifacts::ASYNC_READINESS_COMPONENT;121122let engine = Engine::new(&config())?;123124let component = make_component(&engine, &[component]).await?;125126let mut linker = Linker::new(&engine);127128wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;129130let mut store = Store::new(131&engine,132Ctx {133wasi: WasiCtxBuilder::new().inherit_stdio().build(),134table: ResourceTable::default(),135continue_: false,136wakers: Arc::new(Mutex::new(None)),137},138);139140let instance = linker.instantiate_async(&mut store, &component).await?;141let readiness_guest = readiness::ReadinessGuest::new(&mut store, &instance)?;142let expected = vec![2u8, 4, 6, 8, 9];143let rx = StreamReader::new(144instance,145&mut store,146ReadinessProducer {147buffer: expected.clone(),148sleep: component_async_tests::util::sleep(Duration::from_millis(delay_millis()))149.boxed(),150},151);152let result = instance153.run_concurrent(&mut store, async move |accessor| {154let (rx, expected) = readiness_guest155.local_local_readiness()156.call_start(accessor, rx, expected)157.await?;158159accessor.with(|access| {160rx.pipe(161access,162ReadinessConsumer {163expected,164sleep: component_async_tests::util::sleep(Duration::from_millis(165delay_millis(),166))167.boxed(),168},169)170});171172future::pending::<Result<()>>().await173})174.await;175176// As of this writing, passing a future which never resolves to177// `Instance::run_concurrent` and expecting a `Trap::AsyncDeadlock` is178// the only way to join all tasks for the `Instance`, so that's what we179// do:180assert!(matches!(181result.unwrap_err().downcast::<Trap>(),182Ok(Trap::AsyncDeadlock)183));184185Ok(())186}187188#[tokio::test]189pub async fn async_poll_synchronous() -> Result<()> {190test_run(&[test_programs_artifacts::ASYNC_POLL_SYNCHRONOUS_COMPONENT]).await191}192193#[tokio::test]194pub async fn async_poll_stackless() -> Result<()> {195test_run(&[test_programs_artifacts::ASYNC_POLL_STACKLESS_COMPONENT]).await196}197198mod cancel {199wasmtime::component::bindgen!({200path: "wit",201world: "cancel-host",202exports: { default: async | store },203});204}205206// No-op function; we only test this by composing it in `async_cancel_caller`207#[allow(208dead_code,209reason = "here only to make the `assert_test_exists` macro happy"210)]211pub fn async_cancel_callee() {}212213#[tokio::test]214pub async fn async_cancel_caller() -> Result<()> {215test_cancel(Mode::Normal).await216}217218#[tokio::test]219pub async fn async_cancel_caller_leak_task_after_cancel() -> Result<()> {220test_cancel(Mode::LeakTaskAfterCancel).await221}222223#[tokio::test]224pub async fn async_trap_cancel_guest_after_start_cancelled() -> Result<()> {225test_cancel_trap(Mode::TrapCancelGuestAfterStartCancelled).await226}227228#[tokio::test]229pub async fn async_trap_cancel_guest_after_return_cancelled() -> Result<()> {230test_cancel_trap(Mode::TrapCancelGuestAfterReturnCancelled).await231}232233#[tokio::test]234pub async fn async_trap_cancel_guest_after_return() -> Result<()> {235test_cancel_trap(Mode::TrapCancelGuestAfterReturn).await236}237238#[tokio::test]239pub async fn async_trap_cancel_host_after_return_cancelled() -> Result<()> {240test_cancel_trap(Mode::TrapCancelHostAfterReturnCancelled).await241}242243#[tokio::test]244pub async fn async_trap_cancel_host_after_return() -> Result<()> {245test_cancel_trap(Mode::TrapCancelHostAfterReturn).await246}247248fn delay_millis() -> u64 {249// Miri-based builds are much slower to run, so we delay longer in that case250// to ensure that async calls which the test expects to return `BLOCKED`251// actually do so.252//253// TODO: Make this test (more) deterministic so that such tuning is not254// necessary.255if cfg!(miri) { 1000 } else { 10 }256}257258async fn test_cancel_trap(mode: Mode) -> Result<()> {259let message = "`subtask.cancel` called after terminal status delivered";260let trap = test_cancel(mode).await.unwrap_err();261assert!(262format!("{trap:?}").contains(message),263"expected `{message}`; got `{trap:?}`",264);265Ok(())266}267268async fn test_cancel(mode: Mode) -> Result<()> {269let engine = Engine::new(&config())?;270271let component = make_component(272&engine,273&[274test_programs_artifacts::ASYNC_CANCEL_CALLER_COMPONENT,275test_programs_artifacts::ASYNC_CANCEL_CALLEE_COMPONENT,276],277)278.await?;279280let mut linker = Linker::new(&engine);281282wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;283sleep::local::local::sleep::add_to_linker::<_, Ctx>(&mut linker, |ctx| ctx)?;284285let mut store = Store::new(286&engine,287Ctx {288wasi: WasiCtxBuilder::new().inherit_stdio().build(),289table: ResourceTable::default(),290continue_: false,291wakers: Arc::new(Mutex::new(None)),292},293);294295let instance = linker.instantiate_async(&mut store, &component).await?;296let cancel_host = cancel::CancelHost::new(&mut store, &instance)?;297instance298.run_concurrent(&mut store, async move |accessor| {299cancel_host300.local_local_cancel()301.call_run(accessor, mode, delay_millis())302.await303})304.await??;305306Ok(())307}308309#[tokio::test]310pub async fn async_intertask_communication() -> Result<()> {311test_run_with_count(312&[test_programs_artifacts::ASYNC_INTERTASK_COMMUNICATION_COMPONENT],3132,314)315.await316}317318#[tokio::test]319pub async fn async_transmit_caller() -> Result<()> {320test_run(&[321test_programs_artifacts::ASYNC_TRANSMIT_CALLER_COMPONENT,322test_programs_artifacts::ASYNC_TRANSMIT_CALLEE_COMPONENT,323])324.await325}326327#[tokio::test]328pub async fn async_transmit_callee() -> Result<()> {329test_transmit(test_programs_artifacts::ASYNC_TRANSMIT_CALLEE_COMPONENT).await330}331332pub trait TransmitTest {333type Instance: Send + Sync;334type Params;335type Result: Send + Sync + 'static;336337fn instantiate(338store: impl AsContextMut<Data = Ctx>,339component: &Component,340linker: &Linker<Ctx>,341) -> impl Future<Output = Result<(Self::Instance, Instance)>>;342343fn call<'a>(344accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,345instance: &'a Self::Instance,346params: Self::Params,347) -> impl Future<Output = Result<Self::Result>> + Send + 'a;348349fn into_params(350control: StreamReader<Control>,351caller_stream: StreamReader<String>,352caller_future1: FutureReader<String>,353caller_future2: FutureReader<String>,354) -> Self::Params;355356fn from_result(357store: impl AsContextMut<Data = Ctx>,358instance: Instance,359result: Self::Result,360) -> Result<(361StreamReader<String>,362FutureReader<String>,363FutureReader<String>,364)>;365}366367struct StaticTransmitTest;368369impl TransmitTest for StaticTransmitTest {370type Instance = transmit::bindings::TransmitCallee;371type Params = (372StreamReader<Control>,373StreamReader<String>,374FutureReader<String>,375FutureReader<String>,376);377type Result = (378StreamReader<String>,379FutureReader<String>,380FutureReader<String>,381);382383async fn instantiate(384mut store: impl AsContextMut<Data = Ctx>,385component: &Component,386linker: &Linker<Ctx>,387) -> Result<(Self::Instance, Instance)> {388let instance = linker.instantiate_async(&mut store, component).await?;389let callee = transmit::bindings::TransmitCallee::new(store, &instance)?;390Ok((callee, instance))391}392393fn call<'a>(394accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,395instance: &'a Self::Instance,396params: Self::Params,397) -> impl Future<Output = Result<Self::Result>> + Send + 'a {398instance399.local_local_transmit()400.call_exchange(accessor, params.0, params.1, params.2, params.3)401}402403fn into_params(404control: StreamReader<Control>,405caller_stream: StreamReader<String>,406caller_future1: FutureReader<String>,407caller_future2: FutureReader<String>,408) -> Self::Params {409(control, caller_stream, caller_future1, caller_future2)410}411412fn from_result(413_: impl AsContextMut<Data = Ctx>,414_: Instance,415result: Self::Result,416) -> Result<(417StreamReader<String>,418FutureReader<String>,419FutureReader<String>,420)> {421Ok(result)422}423}424425struct DynamicTransmitTest;426427impl TransmitTest for DynamicTransmitTest {428type Instance = Instance;429type Params = Vec<Val>;430type Result = Val;431432async fn instantiate(433store: impl AsContextMut<Data = Ctx>,434component: &Component,435linker: &Linker<Ctx>,436) -> Result<(Self::Instance, Instance)> {437let instance = linker.instantiate_async(store, component).await?;438Ok((instance, instance))439}440441async fn call<'a>(442accessor: &'a Accessor<Ctx, HasSelf<Ctx>>,443instance: &'a Self::Instance,444params: Self::Params,445) -> Result<Self::Result> {446let exchange_function = accessor.with(|mut store| {447let transmit_instance = instance448.get_export_index(store.as_context_mut(), None, "local:local/transmit")449.ok_or_else(|| anyhow!("can't find `local:local/transmit` in instance"))?;450let exchange_function = instance451.get_export_index(452store.as_context_mut(),453Some(&transmit_instance),454"[async]exchange",455)456.ok_or_else(|| anyhow!("can't find `exchange` in instance"))?;457instance458.get_func(store.as_context_mut(), exchange_function)459.ok_or_else(|| anyhow!("can't find `exchange` in instance"))460})?;461462let mut results = vec![Val::Bool(false)];463exchange_function464.call_concurrent(accessor, ¶ms, &mut results)465.await?;466Ok(results.pop().unwrap())467}468469fn into_params(470control: StreamReader<Control>,471caller_stream: StreamReader<String>,472caller_future1: FutureReader<String>,473caller_future2: FutureReader<String>,474) -> Self::Params {475vec![476control.into_val(),477caller_stream.into_val(),478caller_future1.into_val(),479caller_future2.into_val(),480]481}482483fn from_result(484mut store: impl AsContextMut<Data = Ctx>,485instance: Instance,486result: Self::Result,487) -> Result<(488StreamReader<String>,489FutureReader<String>,490FutureReader<String>,491)> {492let Val::Tuple(fields) = result else {493unreachable!()494};495let stream = StreamReader::from_val(store.as_context_mut(), instance, &fields[0])?;496let future1 = FutureReader::from_val(store.as_context_mut(), instance, &fields[1])?;497let future2 = FutureReader::from_val(store.as_context_mut(), instance, &fields[2])?;498Ok((stream, future1, future2))499}500}501502async fn test_transmit(component: &str) -> Result<()> {503test_transmit_with::<StaticTransmitTest>(component).await?;504test_transmit_with::<DynamicTransmitTest>(component).await505}506507async fn test_transmit_with<Test: TransmitTest + 'static>(component: &str) -> Result<()> {508let engine = Engine::new(&config())?;509510let make_store = || {511Store::new(512&engine,513Ctx {514wasi: WasiCtxBuilder::new().inherit_stdio().build(),515table: ResourceTable::default(),516continue_: false,517wakers: Arc::new(Mutex::new(None)),518},519)520};521522let component = make_component(&engine, &[component]).await?;523524let mut linker = Linker::new(&engine);525526wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;527528let mut store = make_store();529530let (test, instance) = Test::instantiate(&mut store, &component, &linker).await?;531532enum Event<Test: TransmitTest> {533Result(Test::Result),534ControlWriteA(mpsc::Sender<Control>),535ControlWriteB(mpsc::Sender<Control>),536ControlWriteC(mpsc::Sender<Control>),537ControlWriteD,538WriteA,539ReadC(mpsc::Receiver<String>, Option<String>),540ReadD(mpsc::Receiver<String>, Option<String>),541ReadNone(Option<String>),542}543544let (mut control_tx, control_rx) = mpsc::channel(1);545let control_rx = StreamReader::new(instance, &mut store, PipeProducer::new(control_rx));546let (mut caller_stream_tx, caller_stream_rx) = mpsc::channel(1);547let caller_stream_rx =548StreamReader::new(instance, &mut store, PipeProducer::new(caller_stream_rx));549let (caller_future1_tx, caller_future1_rx) = oneshot::channel();550let caller_future1_rx = FutureReader::new(551instance,552&mut store,553OneshotProducer::new(caller_future1_rx),554);555let (_, caller_future2_rx) = oneshot::channel();556let caller_future2_rx = FutureReader::new(557instance,558&mut store,559OneshotProducer::new(caller_future2_rx),560);561let (callee_future1_tx, callee_future1_rx) = oneshot::channel();562let (callee_stream_tx, callee_stream_rx) = mpsc::channel(1);563instance564.run_concurrent(&mut store, async |accessor| {565let mut caller_future1_tx = Some(caller_future1_tx);566let mut callee_future1_tx = Some(callee_future1_tx);567let mut callee_future1_rx = Some(callee_future1_rx);568let mut callee_stream_tx = Some(callee_stream_tx);569let mut callee_stream_rx = Some(callee_stream_rx);570let mut complete = false;571let mut futures = FuturesUnordered::<572Pin<Box<dyn Future<Output = Result<Event<Test>>> + Send>>,573>::new();574575futures.push(576async move {577control_tx.send(Control::ReadStream("a".into())).await?;578Ok(Event::ControlWriteA(control_tx))579}580.boxed(),581);582583futures.push(584async move {585caller_stream_tx.send(String::from("a")).await?;586Ok(Event::WriteA)587}588.boxed(),589);590591futures.push(592Test::call(593accessor,594&test,595Test::into_params(596control_rx,597caller_stream_rx,598caller_future1_rx,599caller_future2_rx,600),601)602.map(|v| v.map(Event::Result))603.boxed(),604);605606while let Some(event) = futures.try_next().await? {607match event {608Event::Result(result) => {609accessor.with(|mut store| {610let (callee_stream_rx, callee_future1_rx, _) =611Test::from_result(&mut store, instance, result)?;612callee_stream_rx.pipe(613&mut store,614PipeConsumer::new(callee_stream_tx.take().unwrap()),615);616callee_future1_rx.pipe(617&mut store,618OneshotConsumer::new(callee_future1_tx.take().unwrap()),619);620anyhow::Ok(())621})?;622}623Event::ControlWriteA(mut control_tx) => {624futures.push(625async move {626control_tx.send(Control::ReadFuture("b".into())).await?;627Ok(Event::ControlWriteB(control_tx))628}629.boxed(),630);631}632Event::WriteA => {633_ = caller_future1_tx.take().unwrap().send("b".into());634let mut callee_stream_rx = callee_stream_rx.take().unwrap();635futures.push(636async move {637let value = callee_stream_rx.next().await;638Ok(Event::ReadC(callee_stream_rx, value))639}640.boxed(),641);642}643Event::ControlWriteB(mut control_tx) => {644futures.push(645async move {646control_tx.send(Control::WriteStream("c".into())).await?;647Ok(Event::ControlWriteC(control_tx))648}649.boxed(),650);651}652Event::ControlWriteC(mut control_tx) => {653futures.push(654async move {655control_tx.send(Control::WriteFuture("d".into())).await?;656Ok(Event::ControlWriteD)657}658.boxed(),659);660}661Event::ReadC(callee_stream_rx, mut value) => {662assert_eq!(value.take().as_deref(), Some("c"));663futures.push(664callee_future1_rx665.take()666.unwrap()667.map(|v| Event::ReadD(callee_stream_rx, v.ok()))668.map(Ok)669.boxed(),670);671}672Event::ControlWriteD => {}673Event::ReadD(_, None) => unreachable!(),674Event::ReadD(mut callee_stream_rx, Some(value)) => {675assert_eq!(&value, "d");676futures.push(677async move { Ok(Event::ReadNone(callee_stream_rx.next().await)) }678.boxed(),679);680}681Event::ReadNone(Some(_)) => unreachable!(),682Event::ReadNone(None) => {683complete = true;684}685}686}687688assert!(complete);689690anyhow::Ok(())691})692.await??;693Ok(())694}695696697