// Path: blob/main/crates/misc/component-async-tests/tests/scenario/streams.rs
// 1693 views
use {1super::util::{config, make_component},2anyhow::Result,3component_async_tests::{4Ctx, closed_streams,5util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer},6},7futures::{8Sink, SinkExt, Stream, StreamExt,9channel::{mpsc, oneshot},10future,11},12std::{13pin::Pin,14sync::{Arc, Mutex},15task::{Context, Poll},16},17wasmtime::{18Engine, Store, StoreContextMut,19component::{20Destination, FutureReader, Linker, ResourceTable, Source, StreamConsumer,21StreamProducer, StreamReader, StreamResult,22},23},24wasmtime_wasi::WasiCtxBuilder,25};2627pub struct DirectPipeProducer<S>(S);2829impl<D, S: Stream<Item = u8> + Send + 'static> StreamProducer<D> for DirectPipeProducer<S> {30type Item = u8;31type Buffer = Option<u8>;3233fn poll_produce<'a>(34self: Pin<&mut Self>,35cx: &mut Context<'_>,36store: StoreContextMut<D>,37destination: Destination<'a, Self::Item, Self::Buffer>,38finish: bool,39) -> Poll<Result<StreamResult>> {40// SAFETY: This is a standard pin-projection, and we never move41// out of `self`.42let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };4344match stream.poll_next(cx) {45Poll::Pending => {46if finish {47Poll::Ready(Ok(StreamResult::Cancelled))48} else {49Poll::Pending50}51}52Poll::Ready(Some(item)) => {53let mut destination = destination.as_direct(store, 1);54destination.remaining()[0] = item;55destination.mark_written(1);56Poll::Ready(Ok(StreamResult::Completed))57}58Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),59}60}61}6263pub struct DirectPipeConsumer<S>(S);6465impl<D, S: Sink<u8, Error: std::error::Error + Send + Sync> + Send + 'static> StreamConsumer<D>66for DirectPipeConsumer<S>67{68type Item = u8;6970fn poll_consume(71self: Pin<&mut Self>,72cx: &mut Context<'_>,73store: StoreContextMut<D>,74source: Source<Self::Item>,75finish: bool,76) -> Poll<Result<StreamResult>> {77// SAFETY: This is a standard pin-projection, and we never move78// out of `self`.79let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) 
};8081let on_pending = || {82if finish {83Poll::Ready(Ok(StreamResult::Cancelled))84} else {85Poll::Pending86}87};8889match sink.as_mut().poll_flush(cx) {90Poll::Pending => on_pending(),91Poll::Ready(result) => {92result?;93match sink.as_mut().poll_ready(cx) {94Poll::Pending => on_pending(),95Poll::Ready(result) => {96result?;97let mut source = source.as_direct(store);98let item = source.remaining()[0];99source.mark_read(1);100sink.start_send(item)?;101Poll::Ready(Ok(StreamResult::Completed))102}103}104}105}106}107}108109#[tokio::test]110pub async fn async_closed_streams() -> Result<()> {111let engine = Engine::new(&config())?;112113let mut store = Store::new(114&engine,115Ctx {116wasi: WasiCtxBuilder::new().inherit_stdio().build(),117table: ResourceTable::default(),118continue_: false,119wakers: Arc::new(Mutex::new(None)),120},121);122123let mut linker = Linker::new(&engine);124125wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;126127let component = make_component(128&engine,129&[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT],130)131.await?;132133let instance = linker.instantiate_async(&mut store, &component).await?;134135let values = vec![42_u8, 43, 44];136137let value = 42_u8;138139// First, test stream host->host140for direct_producer in [true, false] {141for direct_consumer in [true, false] {142let (mut input_tx, input_rx) = mpsc::channel(1);143let (output_tx, mut output_rx) = mpsc::channel(1);144let reader = if direct_producer {145StreamReader::new(instance, &mut store, DirectPipeProducer(input_rx))146} else {147StreamReader::new(instance, &mut store, PipeProducer::new(input_rx))148};149if direct_consumer {150reader.pipe(&mut store, DirectPipeConsumer(output_tx));151} else {152reader.pipe(&mut store, PipeConsumer::new(output_tx));153}154155instance156.run_concurrent(&mut store, async |_| {157let (a, b) = future::join(158async {159for &value in &values {160input_tx.send(value).await?;161}162drop(input_tx);163anyhow::Ok(())164},165async {166for 
&value in &values {167assert_eq!(Some(value), output_rx.next().await);168}169assert!(output_rx.next().await.is_none());170Ok(())171},172)173.await;174175a.and(b)176})177.await??;178}179}180181// Next, test futures host->host182{183let (input_tx, input_rx) = oneshot::channel();184let (output_tx, output_rx) = oneshot::channel();185FutureReader::new(instance, &mut store, OneshotProducer::new(input_rx))186.pipe(&mut store, OneshotConsumer::new(output_tx));187188instance189.run_concurrent(&mut store, async |_| {190_ = input_tx.send(value);191assert_eq!(value, output_rx.await?);192anyhow::Ok(())193})194.await??;195}196197// Next, test stream host->guest198{199let (mut tx, rx) = mpsc::channel(1);200let rx = StreamReader::new(instance, &mut store, PipeProducer::new(rx));201202let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;203204let values = values.clone();205206instance207.run_concurrent(&mut store, async move |accessor| {208let (a, b) = future::join(209async {210for &value in &values {211tx.send(value).await?;212}213drop(tx);214Ok(())215},216closed_streams.local_local_closed().call_read_stream(217accessor,218rx,219values.clone(),220),221)222.await;223224a.and(b)225})226.await??;227}228229// Next, test futures host->guest230{231let (tx, rx) = oneshot::channel();232let rx = FutureReader::new(instance, &mut store, OneshotProducer::new(rx));233let (_, rx_ignored) = oneshot::channel();234let rx_ignored = FutureReader::new(instance, &mut store, OneshotProducer::new(rx_ignored));235236let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;237238instance239.run_concurrent(&mut store, async move |accessor| {240_ = tx.send(value);241closed_streams242.local_local_closed()243.call_read_future(accessor, rx, value, rx_ignored)244.await245})246.await??;247}248249Ok(())250}251252253