Path: blob/main/crates/misc/component-async-tests/src/util.rs
1692 views
use anyhow::Result;1use futures::{Sink, Stream, channel::oneshot};2use std::{3marker::PhantomData,4pin::Pin,5task::{Context, Poll},6thread,7};8use wasmtime::{9StoreContextMut,10component::{11Accessor, Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,12StreamProducer, StreamResult,13},14};1516pub async fn sleep(duration: std::time::Duration) {17if cfg!(miri) {18// TODO: We should be able to use `tokio::time::sleep` here, but as of19// this writing the miri-compatible version of `wasmtime-fiber` uses20// threads behind the scenes, which means thread-local storage is not21// preserved when we switch fibers, and that confuses Tokio. If we ever22// fix that we can stop using our own, special version of `sleep` and23// switch back to the Tokio version.2425let (tx, rx) = oneshot::channel();26let handle = thread::spawn(move || {27thread::sleep(duration);28_ = tx.send(());29});30_ = rx.await;31_ = handle.join();32} else {33tokio::time::sleep(duration).await;34}35}3637pub struct PipeProducer<S>(S);3839impl<S> PipeProducer<S> {40pub fn new(rx: S) -> Self {41Self(rx)42}43}4445impl<D, T: Send + Sync + Lower + 'static, S: Stream<Item = T> + Send + 'static> StreamProducer<D>46for PipeProducer<S>47{48type Item = T;49type Buffer = Option<T>;5051fn poll_produce<'a>(52self: Pin<&mut Self>,53cx: &mut Context<'_>,54_: StoreContextMut<D>,55mut destination: Destination<'a, Self::Item, Self::Buffer>,56finish: bool,57) -> Poll<Result<StreamResult>> {58// SAFETY: This is a standard pin-projection, and we never move59// out of `self`.60let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };6162match stream.poll_next(cx) {63Poll::Pending => {64if finish {65Poll::Ready(Ok(StreamResult::Cancelled))66} else {67Poll::Pending68}69}70Poll::Ready(Some(item)) => {71destination.set_buffer(Some(item));72Poll::Ready(Ok(StreamResult::Completed))73}74Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),75}76}77}7879pub struct PipeConsumer<T, S>(S, PhantomData<fn() -> T>);8081impl<T, S> PipeConsumer<T, S> {82pub fn new(tx: S) -> Self {83Self(tx, PhantomData)84}85}8687impl<D, T: Lift + 'static, S: Sink<T, Error: std::error::Error + Send + Sync> + Send + 'static>88StreamConsumer<D> for PipeConsumer<T, S>89{90type Item = T;9192fn poll_consume(93self: Pin<&mut Self>,94cx: &mut Context<'_>,95store: StoreContextMut<D>,96mut source: Source<Self::Item>,97finish: bool,98) -> Poll<Result<StreamResult>> {99// SAFETY: This is a standard pin-projection, and we never move100// out of `self`.101let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) };102103let on_pending = || {104if finish {105Poll::Ready(Ok(StreamResult::Cancelled))106} else {107Poll::Pending108}109};110111match sink.as_mut().poll_flush(cx) {112Poll::Pending => on_pending(),113Poll::Ready(result) => {114result?;115match sink.as_mut().poll_ready(cx) {116Poll::Pending => on_pending(),117Poll::Ready(result) => {118result?;119let item = &mut None;120source.read(store, item)?;121sink.start_send(item.take().unwrap())?;122Poll::Ready(Ok(StreamResult::Completed))123}124}125}126}127}128}129130pub struct OneshotProducer<T>(oneshot::Receiver<T>);131132impl<T> OneshotProducer<T> {133pub fn new(rx: oneshot::Receiver<T>) -> Self {134Self(rx)135}136}137138impl<D, T: Send + 'static> FutureProducer<D> for OneshotProducer<T> {139type Item = T;140141async fn produce(self, _: &Accessor<D>) -> Result<T> {142Ok(self.0.await?)143}144}145146pub struct OneshotConsumer<T>(oneshot::Sender<T>);147148impl<T> OneshotConsumer<T> {149pub fn new(tx: oneshot::Sender<T>) -> Self {150Self(tx)151}152}153154impl<D, T: Send + 'static> FutureConsumer<D> for OneshotConsumer<T> {155type Item = T;156157async fn consume(self, _: &Accessor<D>, value: T) -> Result<()> {158_ = self.0.send(value);159Ok(())160}161}162163164