// Path: blob/main/crates/wasi/src/cli/worker_thread_stdin.rs
// 1693 views
//! Handling for standard in using a worker task.1//!2//! Standard input is a global singleton resource for the entire program which3//! needs special care. Currently this implementation adheres to a few4//! constraints which make this nontrivial to implement.5//!6//! * Any number of guest wasm programs can read stdin. While this doesn't make7//! a ton of sense semantically they shouldn't block forever. Instead it's a8//! race to see who actually reads which parts of stdin.9//!10//! * Data from stdin isn't actually read unless requested. This is done to try11//! to be a good neighbor to others running in the process. Under the12//! assumption that most programs have one "thing" which reads stdin the13//! actual consumption of bytes is delayed until the wasm guest is dynamically14//! chosen to be that "thing". Before that data from stdin is not consumed to15//! avoid taking it from other components in the process.16//!17//! * Tokio's documentation indicates that "interactive stdin" is best done with18//! a helper thread to avoid blocking shutdown of the event loop. That's19//! respected here where all stdin reading happens on a blocking helper thread20//! that, at this time, is never shut down.21//!22//! This module is one that's likely to change over time though as new systems23//! 
are encountered along with preexisting bugs.2425use crate::cli::{IsTerminal, StdinStream};26use bytes::{Bytes, BytesMut};27use std::io::Read;28use std::mem;29use std::pin::Pin;30use std::sync::{Condvar, Mutex, OnceLock};31use std::task::{Context, Poll};32use tokio::io::{self, AsyncRead, ReadBuf};33use tokio::sync::Notify;34use tokio::sync::futures::Notified;35use wasmtime_wasi_io::{36poll::Pollable,37streams::{InputStream, StreamError},38};3940// Implementation for tokio::io::Stdin41impl IsTerminal for tokio::io::Stdin {42fn is_terminal(&self) -> bool {43std::io::stdin().is_terminal()44}45}46impl StdinStream for tokio::io::Stdin {47fn p2_stream(&self) -> Box<dyn InputStream> {48Box::new(WasiStdin)49}50fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {51Box::new(WasiStdinAsyncRead::Ready)52}53}5455// Implementation for std::io::Stdin56impl IsTerminal for std::io::Stdin {57fn is_terminal(&self) -> bool {58std::io::IsTerminal::is_terminal(self)59}60}61impl StdinStream for std::io::Stdin {62fn p2_stream(&self) -> Box<dyn InputStream> {63Box::new(WasiStdin)64}65fn async_stream(&self) -> Box<dyn AsyncRead + Send + Sync> {66Box::new(WasiStdinAsyncRead::Ready)67}68}6970#[derive(Default)]71struct GlobalStdin {72state: Mutex<StdinState>,73read_requested: Condvar,74read_completed: Notify,75}7677#[derive(Default, Debug)]78enum StdinState {79#[default]80ReadNotRequested,81ReadRequested,82Data(BytesMut),83Error(std::io::Error),84Closed,85}8687impl GlobalStdin {88fn get() -> &'static GlobalStdin {89static STDIN: OnceLock<GlobalStdin> = OnceLock::new();90STDIN.get_or_init(|| create())91}92}9394fn create() -> GlobalStdin {95std::thread::spawn(|| {96let state = GlobalStdin::get();97loop {98// Wait for a read to be requested, but don't hold the lock across99// the blocking read.100let mut lock = state.state.lock().unwrap();101lock = state102.read_requested103.wait_while(lock, |state| !matches!(state, StdinState::ReadRequested))104.unwrap();105drop(lock);106107let mut bytes 
= BytesMut::zeroed(1024);108let (new_state, done) = match std::io::stdin().read(&mut bytes) {109Ok(0) => (StdinState::Closed, true),110Ok(nbytes) => {111bytes.truncate(nbytes);112(StdinState::Data(bytes), false)113}114Err(e) => (StdinState::Error(e), true),115};116117// After the blocking read completes the state should not have been118// tampered with.119debug_assert!(matches!(120*state.state.lock().unwrap(),121StdinState::ReadRequested122));123*state.state.lock().unwrap() = new_state;124state.read_completed.notify_waiters();125if done {126break;127}128}129});130131GlobalStdin::default()132}133134struct WasiStdin;135136#[async_trait::async_trait]137impl InputStream for WasiStdin {138fn read(&mut self, size: usize) -> Result<Bytes, StreamError> {139let g = GlobalStdin::get();140let mut locked = g.state.lock().unwrap();141match mem::replace(&mut *locked, StdinState::ReadRequested) {142StdinState::ReadNotRequested => {143g.read_requested.notify_one();144Ok(Bytes::new())145}146StdinState::ReadRequested => Ok(Bytes::new()),147StdinState::Data(mut data) => {148let size = data.len().min(size);149let bytes = data.split_to(size);150*locked = if data.is_empty() {151StdinState::ReadNotRequested152} else {153StdinState::Data(data)154};155Ok(bytes.freeze())156}157StdinState::Error(e) => {158*locked = StdinState::Closed;159Err(StreamError::LastOperationFailed(e.into()))160}161StdinState::Closed => {162*locked = StdinState::Closed;163Err(StreamError::Closed)164}165}166}167}168169#[async_trait::async_trait]170impl Pollable for WasiStdin {171async fn ready(&mut self) {172let g = GlobalStdin::get();173174// Scope the synchronous `state.lock()` to this block which does not175// `.await` inside of it.176let notified = {177let mut locked = g.state.lock().unwrap();178match *locked {179// If a read isn't requested yet180StdinState::ReadNotRequested => {181g.read_requested.notify_one();182*locked = StdinState::ReadRequested;183g.read_completed.notified()184}185StdinState::ReadRequested 
=> g.read_completed.notified(),186StdinState::Data(_) | StdinState::Closed | StdinState::Error(_) => return,187}188};189190notified.await;191}192}193194enum WasiStdinAsyncRead {195Ready,196Waiting(Notified<'static>),197}198199impl AsyncRead for WasiStdinAsyncRead {200fn poll_read(201mut self: Pin<&mut Self>,202cx: &mut Context<'_>,203buf: &mut ReadBuf<'_>,204) -> Poll<io::Result<()>> {205let g = GlobalStdin::get();206207// Perform everything below in a `loop` to handle the case that a read208// was stolen by another thread, for example, or perhaps a spurious209// notification to `Notified`.210loop {211// If we were previously blocked on reading a "ready" notification,212// wait for that notification to complete.213if let Some(notified) = self.as_mut().notified_future() {214match notified.poll(cx) {215Poll::Ready(()) => self.set(WasiStdinAsyncRead::Ready),216Poll::Pending => break Poll::Pending,217}218}219220assert!(matches!(*self, WasiStdinAsyncRead::Ready));221222// Once we're in the "ready" state then take a look at the global223// state of stdin.224let mut locked = g.state.lock().unwrap();225match mem::replace(&mut *locked, StdinState::ReadRequested) {226// If data is available then drain what we can into `buf`.227StdinState::Data(mut data) => {228let size = data.len().min(buf.remaining());229let bytes = data.split_to(size);230*locked = if data.is_empty() {231StdinState::ReadNotRequested232} else {233StdinState::Data(data)234};235buf.put_slice(&bytes);236break Poll::Ready(Ok(()));237}238239// If stdin failed to be read then we fail with that error and240// transition to "closed"241StdinState::Error(e) => {242*locked = StdinState::Closed;243break Poll::Ready(Err(e));244}245246// If stdin is closed, keep it closed.247StdinState::Closed => {248*locked = StdinState::Closed;249break Poll::Ready(Ok(()));250}251252// For these states we indicate that a read is requested, if it253// wasn't previously requested, and then we transition to254// `Waiting` below by falling 
through outside this `match`.255StdinState::ReadNotRequested => {256g.read_requested.notify_one();257}258StdinState::ReadRequested => {}259}260261self.set(WasiStdinAsyncRead::Waiting(g.read_completed.notified()));262263// Intentionally drop the lock after the `notified()` future264// creation just above as to work correctly this needs to happen265// within the lock.266drop(locked);267}268}269}270271impl WasiStdinAsyncRead {272fn notified_future(self: Pin<&mut Self>) -> Option<Pin<&mut Notified<'static>>> {273// SAFETY: this is a pin-projection from `self` to the field `Notified`274// internally. Given that `self` is pinned it should be safe to acquire275// a pinned version of the internal field.276unsafe {277match self.get_unchecked_mut() {278WasiStdinAsyncRead::Ready => None,279WasiStdinAsyncRead::Waiting(notified) => Some(Pin::new_unchecked(notified)),280}281}282}283}284285286