Path: blob/main/crates/test-programs/src/bin/async_readiness.rs
1693 views
mod bindings {1wit_bindgen::generate!({2path: "../misc/component-async-tests/wit",3world: "readiness-guest",4});5}67use {8std::{mem, ptr},9test_programs::async_::{10BLOCKED, CALLBACK_CODE_EXIT, CALLBACK_CODE_WAIT, DROPPED, EVENT_NONE, EVENT_STREAM_READ,11EVENT_STREAM_WRITE, context_get, context_set, waitable_join, waitable_set_drop,12waitable_set_new,13},14};1516#[cfg(target_arch = "wasm32")]17#[link(wasm_import_module = "[export]local:local/readiness")]18unsafe extern "C" {19#[link_name = "[task-return][async]start"]20fn task_return_start(_: u32, _: *const u8, _: usize);21}22#[cfg(not(target_arch = "wasm32"))]23unsafe extern "C" fn task_return_start(_: u32, _: *const u8, _: usize) {24unreachable!()25}2627#[cfg(target_arch = "wasm32")]28#[link(wasm_import_module = "[export]local:local/readiness")]29unsafe extern "C" {30#[link_name = "[stream-new-0][async]start"]31fn stream_new() -> u64;32}33#[cfg(not(target_arch = "wasm32"))]34unsafe extern "C" fn stream_new() -> u64 {35unreachable!()36}3738#[cfg(target_arch = "wasm32")]39#[link(wasm_import_module = "[export]local:local/readiness")]40unsafe extern "C" {41#[link_name = "[async-lower][stream-write-0][async]start"]42fn stream_write(_: u32, _: *const u8, _: usize) -> u32;43}44#[cfg(not(target_arch = "wasm32"))]45unsafe extern "C" fn stream_write(_: u32, _: *const u8, _: usize) -> u32 {46unreachable!()47}4849#[cfg(target_arch = "wasm32")]50#[link(wasm_import_module = "[export]local:local/readiness")]51unsafe extern "C" {52#[link_name = "[async-lower][stream-read-0][async]start"]53fn stream_read(_: u32, _: *mut u8, _: usize) -> u32;54}55#[cfg(not(target_arch = "wasm32"))]56unsafe extern "C" fn stream_read(_: u32, _: *mut u8, _: usize) -> u32 {57unreachable!()58}5960#[cfg(target_arch = "wasm32")]61#[link(wasm_import_module = "[export]local:local/readiness")]62unsafe extern "C" {63#[link_name = "[stream-drop-readable-0][async]start"]64fn stream_drop_readable(_: u32);65}66#[cfg(not(target_arch = "wasm32"))]67unsafe extern "C" fn stream_drop_readable(_: u32) {68unreachable!()69}7071#[cfg(target_arch = "wasm32")]72#[link(wasm_import_module = "[export]local:local/readiness")]73unsafe extern "C" {74#[link_name = "[stream-drop-writable-0][async]start"]75fn stream_drop_writable(_: u32);76}77#[cfg(not(target_arch = "wasm32"))]78unsafe extern "C" fn stream_drop_writable(_: u32) {79unreachable!()80}8182static BYTES_TO_WRITE: &[u8] = &[1, 3, 5, 7, 11];8384enum State {85S0 {86rx: u32,87expected: Vec<u8>,88},89S1 {90set: u32,91tx: Option<u32>,92rx: Option<u32>,93expected: Vec<u8>,94},95}9697#[unsafe(export_name = "[async-lift]local:local/readiness#[async]start")]98unsafe extern "C" fn export_start(rx: u32, expected: u32, expected_len: u32) -> u32 {99let expected_len = usize::try_from(expected_len).unwrap();100101unsafe {102context_set(103u32::try_from(Box::into_raw(Box::new(State::S0 {104rx,105expected: Vec::from_raw_parts(106expected as usize as *mut u8,107expected_len,108expected_len,109),110})) as usize)111.unwrap(),112);113114callback_start(EVENT_NONE, 0, 0)115}116}117118#[unsafe(export_name = "[callback][async-lift]local:local/readiness#[async]start")]119unsafe extern "C" fn callback_start(event0: u32, event1: u32, event2: u32) -> u32 {120unsafe {121let state = &mut *(usize::try_from(context_get()).unwrap() as *mut State);122match state {123State::S0 { rx, expected } => {124assert_eq!(event0, EVENT_NONE);125126// Do a zero-length read to wait until the writer is ready.127//128// Here we assume specific behavior from the writer, namely:129//130// - It is not immediately ready to send us anything.131//132// - When it _is_ ready, it will send us all the bytes it told us to133// expect at once.134let status = stream_read(*rx, ptr::null_mut(), 0);135assert_eq!(status, BLOCKED);136137let set = waitable_set_new();138139waitable_join(*rx, set);140141let tx = {142let pair = stream_new();143let tx = u32::try_from(pair >> 32).unwrap();144let rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();145146// Do a zero-length write to wait until the reader is ready.147//148// Here we assume specific behavior from the reader, namely:149//150// - It is not immediately ready to receive anything (indeed, it151// can't possibly be ready given that we haven't returned the152// read handle to it yet).153//154// - When it _is_ ready, it will accept all the bytes we told it155// to expect at once.156let status = stream_write(tx, ptr::null(), 0);157assert_eq!(status, BLOCKED);158159waitable_join(tx, set);160161task_return_start(rx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len());162163tx164};165166*state = State::S1 {167set,168tx: Some(tx),169rx: Some(*rx),170expected: mem::take(expected),171};172173CALLBACK_CODE_WAIT | (set << 4)174}175176State::S1 {177set,178tx,179rx,180expected,181} => {182if event0 == EVENT_STREAM_READ {183let rx = rx.take().unwrap();184assert_eq!(event1, rx);185assert_eq!(event2, 0);186187// The writer is ready now, so this read should not block.188//189// As noted above, we we rely on the writer sending us all the190// expected bytes at once.191let received = &mut vec![0_u8; expected.len()];192let status = stream_read(rx, received.as_mut_ptr(), received.len());193assert_eq!(194status,195DROPPED | u32::try_from(received.len() << 4).unwrap()196);197assert_eq!(received, expected);198199waitable_join(rx, 0);200stream_drop_readable(rx);201202if tx.is_none() {203waitable_set_drop(*set);204205CALLBACK_CODE_EXIT206} else {207CALLBACK_CODE_WAIT | (*set << 4)208}209} else if event0 == EVENT_STREAM_WRITE {210let tx = tx.take().unwrap();211assert_eq!(event1, tx);212assert_eq!(event2, 0);213214// The reader is ready now, so this write should not block.215//216// As noted above, we we rely on the reader accepting all the217// expected bytes at once.218let status = stream_write(tx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len());219assert_eq!(220status,221DROPPED | u32::try_from(BYTES_TO_WRITE.len() << 4).unwrap()222);223224waitable_join(tx, 0);225stream_drop_writable(tx);226227if rx.is_none() {228waitable_set_drop(*set);229230CALLBACK_CODE_EXIT231} else {232CALLBACK_CODE_WAIT | (*set << 4)233}234} else {235unreachable!()236}237}238}239}240}241242// Unused function; required since this file is built as a `bin`:243fn main() {}244245246