Path: blob/main/crates/misc/component-async-tests/tests/scenario/streams.rs
3088 views
use {1super::util::{config, make_component},2component_async_tests::{3Ctx, closed_streams,4util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer},5},6futures::{7FutureExt, Sink, SinkExt, Stream, StreamExt,8channel::{mpsc, oneshot},9future,10},11std::{12mem,13ops::DerefMut,14pin::Pin,15sync::{Arc, Mutex},16task::{self, Context, Poll},17time::Duration,18},19wasmtime::{20Engine, Result, Store, StoreContextMut,21component::{22Destination, FutureReader, Lift, Linker, ResourceTable, Source, StreamConsumer,23StreamProducer, StreamReader, StreamResult, VecBuffer,24},25},26wasmtime_wasi::WasiCtxBuilder,27};2829pub struct DirectPipeProducer<S>(S);3031impl<D, S: Stream<Item = u8> + Send + 'static> StreamProducer<D> for DirectPipeProducer<S> {32type Item = u8;33type Buffer = Option<u8>;3435fn poll_produce<'a>(36self: Pin<&mut Self>,37cx: &mut Context<'_>,38store: StoreContextMut<D>,39destination: Destination<'a, Self::Item, Self::Buffer>,40finish: bool,41) -> Poll<Result<StreamResult>> {42// SAFETY: This is a standard pin-projection, and we never move43// out of `self`.44let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };4546match stream.poll_next(cx) {47Poll::Pending => {48if finish {49Poll::Ready(Ok(StreamResult::Cancelled))50} else {51Poll::Pending52}53}54Poll::Ready(Some(item)) => {55let mut destination = destination.as_direct(store, 1);56destination.remaining()[0] = item;57destination.mark_written(1);58Poll::Ready(Ok(StreamResult::Completed))59}60Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),61}62}63}6465pub struct DirectPipeConsumer<S>(S);6667impl<D, S: Sink<u8, Error: std::error::Error + Send + Sync> + Send + 'static> StreamConsumer<D>68for DirectPipeConsumer<S>69{70type Item = u8;7172fn poll_consume(73self: Pin<&mut Self>,74cx: &mut Context<'_>,75store: StoreContextMut<D>,76source: Source<Self::Item>,77finish: bool,78) -> Poll<Result<StreamResult>> {79// SAFETY: This is a standard pin-projection, and we never move80// out of `self`.81let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) };8283let on_pending = || {84if finish {85Poll::Ready(Ok(StreamResult::Cancelled))86} else {87Poll::Pending88}89};9091match sink.as_mut().poll_flush(cx) {92Poll::Pending => on_pending(),93Poll::Ready(result) => {94result?;95match sink.as_mut().poll_ready(cx) {96Poll::Pending => on_pending(),97Poll::Ready(result) => {98result?;99let mut source = source.as_direct(store);100let item = source.remaining()[0];101source.mark_read(1);102sink.start_send(item)?;103Poll::Ready(Ok(StreamResult::Completed))104}105}106}107}108}109}110111#[tokio::test]112pub async fn async_closed_streams() -> Result<()> {113let engine = Engine::new(&config())?;114115let mut store = Store::new(116&engine,117Ctx {118wasi: WasiCtxBuilder::new().inherit_stdio().build(),119table: ResourceTable::default(),120continue_: false,121},122);123124let mut linker = Linker::new(&engine);125126wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;127128let component = make_component(129&engine,130&[test_programs_artifacts::ASYNC_CLOSED_STREAMS_COMPONENT],131)132.await?;133134let instance = linker.instantiate_async(&mut store, &component).await?;135136let values = vec![42_u8, 43, 44];137138let value = 42_u8;139140// First, test stream host->host141for direct_producer in [true, false] {142for direct_consumer in [true, false] {143let (mut input_tx, input_rx) = mpsc::channel(1);144let (output_tx, mut output_rx) = mpsc::channel(1);145let reader = if direct_producer {146StreamReader::new(&mut store, DirectPipeProducer(input_rx))147} else {148StreamReader::new(&mut store, PipeProducer::new(input_rx))149};150if direct_consumer {151reader.pipe(&mut store, DirectPipeConsumer(output_tx));152} else {153reader.pipe(&mut store, PipeConsumer::new(output_tx));154}155156store157.run_concurrent(async |_| {158let (a, b) = future::join(159async {160for &value in &values {161input_tx.send(value).await?;162}163drop(input_tx);164wasmtime::error::Ok(())165},166async {167for &value in &values {168assert_eq!(Some(value), output_rx.next().await);169}170assert!(output_rx.next().await.is_none());171Ok(())172},173)174.await;175176a.and(b)177})178.await??;179}180}181182// Next, test futures host->host183{184let (input_tx, input_rx) = oneshot::channel();185let (output_tx, output_rx) = oneshot::channel();186FutureReader::new(&mut store, OneshotProducer::new(input_rx))187.pipe(&mut store, OneshotConsumer::new(output_tx));188189store190.run_concurrent(async |_| {191_ = input_tx.send(value);192assert_eq!(value, output_rx.await?);193wasmtime::error::Ok(())194})195.await??;196}197198// Next, test stream host->guest199{200let (mut tx, rx) = mpsc::channel(1);201let rx = StreamReader::new(&mut store, PipeProducer::new(rx));202203let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;204205let values = values.clone();206207store208.run_concurrent(async move |accessor| {209let (a, b) = future::join(210async {211for &value in &values {212tx.send(value).await?;213}214drop(tx);215Ok(())216},217closed_streams.local_local_closed().call_read_stream(218accessor,219rx,220values.clone(),221),222)223.await;224225a.and(b)226})227.await??;228}229230// Next, test futures host->guest231{232let (tx, rx) = oneshot::channel();233let rx = FutureReader::new(&mut store, OneshotProducer::new(rx));234let (_, rx_ignored) = oneshot::channel();235let rx_ignored = FutureReader::new(&mut store, OneshotProducer::new(rx_ignored));236237let closed_streams = closed_streams::bindings::ClosedStreams::new(&mut store, &instance)?;238239store240.run_concurrent(async move |accessor| {241_ = tx.send(value);242closed_streams243.local_local_closed()244.call_read_future(accessor, rx, value, rx_ignored)245.await246})247.await??;248}249250Ok(())251}252253mod closed_stream {254wasmtime::component::bindgen!({255path: "wit",256world: "closed-stream-guest",257exports: { default: store | async },258});259}260261#[tokio::test]262pub async fn async_closed_stream() -> Result<()> {263let engine = Engine::new(&config())?;264265let component = make_component(266&engine,267&[test_programs_artifacts::ASYNC_CLOSED_STREAM_COMPONENT],268)269.await?;270271let mut linker = Linker::new(&engine);272273wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;274275let mut store = Store::new(276&engine,277Ctx {278wasi: WasiCtxBuilder::new().inherit_stdio().build(),279table: ResourceTable::default(),280continue_: false,281},282);283284let instance = linker.instantiate_async(&mut store, &component).await?;285let guest = closed_stream::ClosedStreamGuest::new(&mut store, &instance)?;286store287.run_concurrent(async move |accessor| {288let stream = guest.local_local_closed_stream().call_get(accessor).await?;289290let (tx, mut rx) = mpsc::channel(1);291accessor.with(move |store| stream.pipe(store, PipeConsumer::new(tx)));292assert!(rx.next().await.is_none());293294Ok(())295})296.await?297}298299struct VecProducer<T> {300source: Vec<T>,301sleep: Pin<Box<dyn Future<Output = ()> + Send>>,302}303304impl<T> VecProducer<T> {305fn new(source: Vec<T>, delay: bool) -> Self {306Self {307source,308sleep: if delay {309tokio::time::sleep(Duration::from_millis(10)).boxed()310} else {311async {}.boxed()312},313}314}315}316317impl<D, T: Lift + Unpin + 'static> StreamProducer<D> for VecProducer<T> {318type Item = T;319type Buffer = VecBuffer<T>;320321fn poll_produce(322mut self: Pin<&mut Self>,323cx: &mut Context<'_>,324_: StoreContextMut<D>,325mut destination: Destination<Self::Item, Self::Buffer>,326_: bool,327) -> Poll<Result<StreamResult>> {328let sleep = &mut self.as_mut().get_mut().sleep;329task::ready!(sleep.as_mut().poll(cx));330*sleep = async {}.boxed();331332destination.set_buffer(mem::take(&mut self.get_mut().source).into());333Poll::Ready(Ok(StreamResult::Dropped))334}335}336337struct OneAtATime<T> {338destination: Arc<Mutex<Vec<T>>>,339sleep: Pin<Box<dyn Future<Output = ()> + Send>>,340}341342impl<T> OneAtATime<T> {343fn new(destination: Arc<Mutex<Vec<T>>>, delay: bool) -> Self {344Self {345destination,346sleep: if delay {347tokio::time::sleep(Duration::from_millis(10)).boxed()348} else {349async {}.boxed()350},351}352}353}354355impl<D, T: Lift + 'static> StreamConsumer<D> for OneAtATime<T> {356type Item = T;357358fn poll_consume(359mut self: Pin<&mut Self>,360cx: &mut Context<'_>,361store: StoreContextMut<D>,362mut source: Source<Self::Item>,363_: bool,364) -> Poll<Result<StreamResult>> {365let sleep = &mut self.as_mut().get_mut().sleep;366task::ready!(sleep.as_mut().poll(cx));367*sleep = async {}.boxed();368369let value = &mut None;370source.read(store, value)?;371self.destination.lock().unwrap().push(value.take().unwrap());372Poll::Ready(Ok(StreamResult::Completed))373}374}375376mod short_reads {377wasmtime::component::bindgen!({378path: "wit",379world: "short-reads-guest",380exports: { default: async | task_exit },381});382}383384#[tokio::test]385pub async fn async_short_reads() -> Result<()> {386test_async_short_reads(false).await387}388389#[tokio::test]390async fn async_short_reads_with_delay() -> Result<()> {391test_async_short_reads(true).await392}393394async fn test_async_short_reads(delay: bool) -> Result<()> {395use short_reads::exports::local::local::short_reads::Thing;396397let engine = Engine::new(&config())?;398399let component = make_component(400&engine,401&[test_programs_artifacts::ASYNC_SHORT_READS_COMPONENT],402)403.await?;404405let mut linker = Linker::new(&engine);406407wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;408409let mut store = Store::new(410&engine,411Ctx {412wasi: WasiCtxBuilder::new().inherit_stdio().build(),413table: ResourceTable::default(),414continue_: false,415},416);417418let guest =419short_reads::ShortReadsGuest::instantiate_async(&mut store, &component, &linker).await?;420let thing = guest.local_local_short_reads().thing();421422let strings = ["a", "b", "c", "d", "e"];423let mut things = Vec::with_capacity(strings.len());424for string in strings {425things.push(thing.call_constructor(&mut store, string).await?);426}427428store429.run_concurrent(async |store| {430let count = things.len();431let stream =432store.with(|store| StreamReader::new(store, VecProducer::new(things, delay)));433434let (stream, task) = guest435.local_local_short_reads()436.call_short_reads(store, stream)437.await?;438439let received_things = Arc::new(Mutex::new(Vec::<Thing>::with_capacity(count)));440// Read just one item at a time from the guest, forcing it to441// re-take ownership of any unwritten items.442store.with(|store| stream.pipe(store, OneAtATime::new(received_things.clone(), delay)));443444task.block(store).await;445446assert_eq!(count, received_things.lock().unwrap().len());447448let mut received_strings = Vec::with_capacity(strings.len());449let received_things = mem::take(received_things.lock().unwrap().deref_mut());450for it in received_things {451received_strings.push(thing.call_get(store, it).await?.0);452}453454assert_eq!(455&strings[..],456&received_strings457.iter()458.map(|s| s.as_str())459.collect::<Vec<_>>()460);461462wasmtime::error::Ok(())463})464.await?465}466467468