Path: blob/main/crates/test-programs/src/bin/async_cancel_transmit.rs
3068 views
mod bindings {1wit_bindgen::generate!({2path: "../misc/component-async-tests/wit",3world: "synchronous-transmit-guest",4});5}67use {8std::{9mem::{self, ManuallyDrop},10slice,11},12test_programs::async_::{13BLOCKED, CALLBACK_CODE_EXIT, CALLBACK_CODE_YIELD, COMPLETED, DROPPED, EVENT_NONE,14context_get, context_set,15},16};1718#[cfg(target_arch = "wasm32")]19#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]20unsafe extern "C" {21#[link_name = "[task-return]start"]22fn task_return_start(_: u32, _: *const u8, _: usize, _: u32, _: u8);23}24#[cfg(not(target_arch = "wasm32"))]25unsafe extern "C" fn task_return_start(_: u32, _: *const u8, _: usize, _: u32, _: u8) {26unreachable!()27}2829#[cfg(target_arch = "wasm32")]30#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]31unsafe extern "C" {32#[link_name = "[stream-new-0]start"]33fn stream_new() -> u64;34}35#[cfg(not(target_arch = "wasm32"))]36unsafe extern "C" fn stream_new() -> u64 {37unreachable!()38}3940#[cfg(target_arch = "wasm32")]41#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]42unsafe extern "C" {43#[link_name = "[async-lower][stream-write-0]start"]44fn stream_write(_: u32, _: *const u8, _: usize) -> u32;45}46#[cfg(not(target_arch = "wasm32"))]47unsafe extern "C" fn stream_write(_: u32, _: *const u8, _: usize) -> u32 {48unreachable!()49}5051#[cfg(target_arch = "wasm32")]52#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]53unsafe extern "C" {54#[link_name = "[async-lower][stream-read-0]start"]55fn stream_read(_: u32, _: *mut u8, _: usize) -> u32;56}57#[cfg(not(target_arch = "wasm32"))]58unsafe extern "C" fn stream_read(_: u32, _: *mut u8, _: usize) -> u32 {59unreachable!()60}6162#[cfg(target_arch = "wasm32")]63#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]64unsafe extern "C" {65#[link_name = "[stream-cancel-write-0]start"]66fn stream_cancel_write(_: u32) -> u32;67}68#[cfg(not(target_arch = "wasm32"))]69unsafe extern "C" fn stream_cancel_write(_: u32) -> u32 {70unreachable!()71}7273#[cfg(target_arch = "wasm32")]74#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]75unsafe extern "C" {76#[link_name = "[stream-cancel-read-0]start"]77fn stream_cancel_read(_: u32) -> u32;78}79#[cfg(not(target_arch = "wasm32"))]80unsafe extern "C" fn stream_cancel_read(_: u32) -> u32 {81unreachable!()82}8384#[cfg(target_arch = "wasm32")]85#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]86unsafe extern "C" {87#[link_name = "[stream-drop-readable-0]start"]88fn stream_drop_readable(_: u32);89}90#[cfg(not(target_arch = "wasm32"))]91unsafe extern "C" fn stream_drop_readable(_: u32) {92unreachable!()93}9495#[cfg(target_arch = "wasm32")]96#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]97unsafe extern "C" {98#[link_name = "[stream-drop-writable-0]start"]99fn stream_drop_writable(_: u32);100}101#[cfg(not(target_arch = "wasm32"))]102unsafe extern "C" fn stream_drop_writable(_: u32) {103unreachable!()104}105106#[cfg(target_arch = "wasm32")]107#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]108unsafe extern "C" {109#[link_name = "[future-new-1]start"]110fn future_new() -> u64;111}112#[cfg(not(target_arch = "wasm32"))]113unsafe extern "C" fn future_new() -> u64 {114unreachable!()115}116117#[cfg(target_arch = "wasm32")]118#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]119unsafe extern "C" {120#[link_name = "[async-lower][future-write-1]start"]121fn future_write(_: u32, _: *const u8) -> u32;122}123#[cfg(not(target_arch = "wasm32"))]124unsafe extern "C" fn future_write(_: u32, _: *const u8) -> u32 {125unreachable!()126}127128#[cfg(target_arch = "wasm32")]129#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]130unsafe extern "C" {131#[link_name = "[async-lower][future-read-1]start"]132fn future_read(_: u32, _: *mut u8) -> u32;133}134#[cfg(not(target_arch = "wasm32"))]135unsafe extern "C" fn future_read(_: u32, _: *mut u8) -> u32 {136unreachable!()137}138139#[cfg(target_arch = "wasm32")]140#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]141unsafe extern "C" {142#[link_name = "[future-cancel-write-1]start"]143fn future_cancel_write(_: u32) -> u32;144}145#[cfg(not(target_arch = "wasm32"))]146unsafe extern "C" fn future_cancel_write(_: u32) -> u32 {147unreachable!()148}149150#[cfg(target_arch = "wasm32")]151#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]152unsafe extern "C" {153#[link_name = "[future-cancel-read-1]start"]154fn future_cancel_read(_: u32) -> u32;155}156#[cfg(not(target_arch = "wasm32"))]157unsafe extern "C" fn future_cancel_read(_: u32) -> u32 {158unreachable!()159}160161#[cfg(target_arch = "wasm32")]162#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]163unsafe extern "C" {164#[link_name = "[future-drop-readable-1]start"]165fn future_drop_readable(_: u32);166}167#[cfg(not(target_arch = "wasm32"))]168unsafe extern "C" fn future_drop_readable(_: u32) {169unreachable!()170}171172#[cfg(target_arch = "wasm32")]173#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]174unsafe extern "C" {175#[link_name = "[future-drop-writable-1]start"]176fn future_drop_writable(_: u32);177}178#[cfg(not(target_arch = "wasm32"))]179unsafe extern "C" fn future_drop_writable(_: u32) {180unreachable!()181}182183static STREAM_BYTES_TO_WRITE: &[u8] = &[1, 3, 5, 7, 11];184static FUTURE_BYTE_TO_WRITE: u8 = 13;185186enum State {187S0 {188stream: u32,189stream_expected: Vec<u8>,190future: u32,191future_expected: u8,192},193S1 {194stream_read_buffer: *mut u8,195stream_tx: u32,196stream: u32,197stream_expected: Vec<u8>,198future_read_buffer: *mut u8,199future_tx: u32,200future: u32,201future_expected: u8,202},203}204205#[unsafe(export_name = "[async-lift]local:local/synchronous-transmit#start")]206unsafe extern "C" fn export_start(207stream: u32,208stream_expected: u32,209stream_expected_len: u32,210future: u32,211future_expected: u8,212) -> u32 {213let stream_expected_len = usize::try_from(stream_expected_len).unwrap();214215unsafe {216context_set(217u32::try_from(Box::into_raw(Box::new(State::S0 {218stream,219stream_expected: Vec::from_raw_parts(220stream_expected as usize as *mut u8,221stream_expected_len,222stream_expected_len,223),224future,225future_expected,226})) as usize)227.unwrap(),228);229230callback_start(EVENT_NONE, 0, 0)231}232}233234#[unsafe(export_name = "[callback][async-lift]local:local/synchronous-transmit#start")]235unsafe extern "C" fn callback_start(event0: u32, _event1: u32, _event2: u32) -> u32 {236unsafe {237let state = &mut *(usize::try_from(context_get()).unwrap() as *mut State);238match state {239&mut State::S0 {240stream,241ref mut stream_expected,242future,243future_expected,244} => {245assert_eq!(event0, EVENT_NONE);246247// Here we assume specific behavior from the writers, namely:248//249// - They will not send us anything until after we cancel the250// reads, and even then there will be a delay.251//252// - When they _do_ send, they will send us all the bytes it253// told us to expect at once.254let stream_read_buffer =255ManuallyDrop::new(vec![0_u8; stream_expected.len()]).as_mut_ptr();256let status = stream_read(stream, stream_read_buffer, stream_expected.len());257assert_eq!(status, BLOCKED);258259let future_read_buffer = Box::into_raw(Box::new(0_u8));260let status = future_read(future, future_read_buffer);261assert_eq!(status, BLOCKED);262263let pair = stream_new();264let stream_tx = u32::try_from(pair >> 32).unwrap();265let stream_rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();266267let pair = future_new();268let future_tx = u32::try_from(pair >> 32).unwrap();269let future_rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();270271task_return_start(272stream_rx,273STREAM_BYTES_TO_WRITE.as_ptr(),274STREAM_BYTES_TO_WRITE.len(),275future_rx,276FUTURE_BYTE_TO_WRITE,277);278279// Here we assume specific behavior from the readers, namely:280//281// - They will not read anything until after we cancel the282// write, and even then there will be a delay.283//284// - When they _do_ read, they will accept all the bytes we told285// it to expect at once.286let status = stream_write(287stream_tx,288STREAM_BYTES_TO_WRITE.as_ptr(),289STREAM_BYTES_TO_WRITE.len(),290);291assert_eq!(status, BLOCKED);292293let status = future_write(future_tx, &FUTURE_BYTE_TO_WRITE);294assert_eq!(status, BLOCKED);295296*state = State::S1 {297stream_read_buffer,298stream_tx,299stream,300stream_expected: mem::take(stream_expected),301future_read_buffer,302future_tx,303future,304future_expected,305};306307CALLBACK_CODE_YIELD308}309310&mut State::S1 {311stream_read_buffer,312stream_tx,313stream,314ref mut stream_expected,315future_read_buffer,316future_tx,317future,318future_expected,319} => {320// Now we synchronously cancel everything and expect that the321// reads and writes complete.322323let status = stream_cancel_read(stream);324assert_eq!(325status,326DROPPED | u32::try_from(stream_expected.len() << 4).unwrap()327);328let received = Box::from_raw(slice::from_raw_parts_mut(329stream_read_buffer,330stream_expected.len(),331));332assert_eq!(&received[..], stream_expected);333stream_drop_readable(stream);334335let status = stream_cancel_write(stream_tx);336assert_eq!(337status,338DROPPED | u32::try_from(STREAM_BYTES_TO_WRITE.len() << 4).unwrap()339);340stream_drop_writable(stream_tx);341342let status = future_cancel_read(future);343assert_eq!(status, COMPLETED);344let received = Box::from_raw(future_read_buffer);345assert_eq!(*received, future_expected);346future_drop_readable(future);347348let status = future_cancel_write(future_tx);349assert_eq!(status, COMPLETED);350future_drop_writable(future_tx);351352CALLBACK_CODE_EXIT353}354}355}356}357358// Unused function; required since this file is built as a `bin`:359fn main() {}360361362