Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/test-programs/src/bin/async_readiness.rs
1693 views
1
mod bindings {
2
wit_bindgen::generate!({
3
path: "../misc/component-async-tests/wit",
4
world: "readiness-guest",
5
});
6
}
7
8
use {
9
std::{mem, ptr},
10
test_programs::async_::{
11
BLOCKED, CALLBACK_CODE_EXIT, CALLBACK_CODE_WAIT, DROPPED, EVENT_NONE, EVENT_STREAM_READ,
12
EVENT_STREAM_WRITE, context_get, context_set, waitable_join, waitable_set_drop,
13
waitable_set_new,
14
},
15
};
16
17
#[cfg(target_arch = "wasm32")]
18
#[link(wasm_import_module = "[export]local:local/readiness")]
19
unsafe extern "C" {
20
#[link_name = "[task-return][async]start"]
21
fn task_return_start(_: u32, _: *const u8, _: usize);
22
}
23
#[cfg(not(target_arch = "wasm32"))]
24
unsafe extern "C" fn task_return_start(_: u32, _: *const u8, _: usize) {
25
unreachable!()
26
}
27
28
#[cfg(target_arch = "wasm32")]
29
#[link(wasm_import_module = "[export]local:local/readiness")]
30
unsafe extern "C" {
31
#[link_name = "[stream-new-0][async]start"]
32
fn stream_new() -> u64;
33
}
34
#[cfg(not(target_arch = "wasm32"))]
35
unsafe extern "C" fn stream_new() -> u64 {
36
unreachable!()
37
}
38
39
#[cfg(target_arch = "wasm32")]
40
#[link(wasm_import_module = "[export]local:local/readiness")]
41
unsafe extern "C" {
42
#[link_name = "[async-lower][stream-write-0][async]start"]
43
fn stream_write(_: u32, _: *const u8, _: usize) -> u32;
44
}
45
#[cfg(not(target_arch = "wasm32"))]
46
unsafe extern "C" fn stream_write(_: u32, _: *const u8, _: usize) -> u32 {
47
unreachable!()
48
}
49
50
#[cfg(target_arch = "wasm32")]
51
#[link(wasm_import_module = "[export]local:local/readiness")]
52
unsafe extern "C" {
53
#[link_name = "[async-lower][stream-read-0][async]start"]
54
fn stream_read(_: u32, _: *mut u8, _: usize) -> u32;
55
}
56
#[cfg(not(target_arch = "wasm32"))]
57
unsafe extern "C" fn stream_read(_: u32, _: *mut u8, _: usize) -> u32 {
58
unreachable!()
59
}
60
61
#[cfg(target_arch = "wasm32")]
62
#[link(wasm_import_module = "[export]local:local/readiness")]
63
unsafe extern "C" {
64
#[link_name = "[stream-drop-readable-0][async]start"]
65
fn stream_drop_readable(_: u32);
66
}
67
#[cfg(not(target_arch = "wasm32"))]
68
unsafe extern "C" fn stream_drop_readable(_: u32) {
69
unreachable!()
70
}
71
72
#[cfg(target_arch = "wasm32")]
73
#[link(wasm_import_module = "[export]local:local/readiness")]
74
unsafe extern "C" {
75
#[link_name = "[stream-drop-writable-0][async]start"]
76
fn stream_drop_writable(_: u32);
77
}
78
#[cfg(not(target_arch = "wasm32"))]
79
unsafe extern "C" fn stream_drop_writable(_: u32) {
80
unreachable!()
81
}
82
83
static BYTES_TO_WRITE: &[u8] = &[1, 3, 5, 7, 11];
84
85
enum State {
86
S0 {
87
rx: u32,
88
expected: Vec<u8>,
89
},
90
S1 {
91
set: u32,
92
tx: Option<u32>,
93
rx: Option<u32>,
94
expected: Vec<u8>,
95
},
96
}
97
98
#[unsafe(export_name = "[async-lift]local:local/readiness#[async]start")]
99
unsafe extern "C" fn export_start(rx: u32, expected: u32, expected_len: u32) -> u32 {
100
let expected_len = usize::try_from(expected_len).unwrap();
101
102
unsafe {
103
context_set(
104
u32::try_from(Box::into_raw(Box::new(State::S0 {
105
rx,
106
expected: Vec::from_raw_parts(
107
expected as usize as *mut u8,
108
expected_len,
109
expected_len,
110
),
111
})) as usize)
112
.unwrap(),
113
);
114
115
callback_start(EVENT_NONE, 0, 0)
116
}
117
}
118
119
#[unsafe(export_name = "[callback][async-lift]local:local/readiness#[async]start")]
120
unsafe extern "C" fn callback_start(event0: u32, event1: u32, event2: u32) -> u32 {
121
unsafe {
122
let state = &mut *(usize::try_from(context_get()).unwrap() as *mut State);
123
match state {
124
State::S0 { rx, expected } => {
125
assert_eq!(event0, EVENT_NONE);
126
127
// Do a zero-length read to wait until the writer is ready.
128
//
129
// Here we assume specific behavior from the writer, namely:
130
//
131
// - It is not immediately ready to send us anything.
132
//
133
// - When it _is_ ready, it will send us all the bytes it told us to
134
// expect at once.
135
let status = stream_read(*rx, ptr::null_mut(), 0);
136
assert_eq!(status, BLOCKED);
137
138
let set = waitable_set_new();
139
140
waitable_join(*rx, set);
141
142
let tx = {
143
let pair = stream_new();
144
let tx = u32::try_from(pair >> 32).unwrap();
145
let rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();
146
147
// Do a zero-length write to wait until the reader is ready.
148
//
149
// Here we assume specific behavior from the reader, namely:
150
//
151
// - It is not immediately ready to receive anything (indeed, it
152
// can't possibly be ready given that we haven't returned the
153
// read handle to it yet).
154
//
155
// - When it _is_ ready, it will accept all the bytes we told it
156
// to expect at once.
157
let status = stream_write(tx, ptr::null(), 0);
158
assert_eq!(status, BLOCKED);
159
160
waitable_join(tx, set);
161
162
task_return_start(rx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len());
163
164
tx
165
};
166
167
*state = State::S1 {
168
set,
169
tx: Some(tx),
170
rx: Some(*rx),
171
expected: mem::take(expected),
172
};
173
174
CALLBACK_CODE_WAIT | (set << 4)
175
}
176
177
State::S1 {
178
set,
179
tx,
180
rx,
181
expected,
182
} => {
183
if event0 == EVENT_STREAM_READ {
184
let rx = rx.take().unwrap();
185
assert_eq!(event1, rx);
186
assert_eq!(event2, 0);
187
188
// The writer is ready now, so this read should not block.
189
//
190
// As noted above, we we rely on the writer sending us all the
191
// expected bytes at once.
192
let received = &mut vec![0_u8; expected.len()];
193
let status = stream_read(rx, received.as_mut_ptr(), received.len());
194
assert_eq!(
195
status,
196
DROPPED | u32::try_from(received.len() << 4).unwrap()
197
);
198
assert_eq!(received, expected);
199
200
waitable_join(rx, 0);
201
stream_drop_readable(rx);
202
203
if tx.is_none() {
204
waitable_set_drop(*set);
205
206
CALLBACK_CODE_EXIT
207
} else {
208
CALLBACK_CODE_WAIT | (*set << 4)
209
}
210
} else if event0 == EVENT_STREAM_WRITE {
211
let tx = tx.take().unwrap();
212
assert_eq!(event1, tx);
213
assert_eq!(event2, 0);
214
215
// The reader is ready now, so this write should not block.
216
//
217
// As noted above, we we rely on the reader accepting all the
218
// expected bytes at once.
219
let status = stream_write(tx, BYTES_TO_WRITE.as_ptr(), BYTES_TO_WRITE.len());
220
assert_eq!(
221
status,
222
DROPPED | u32::try_from(BYTES_TO_WRITE.len() << 4).unwrap()
223
);
224
225
waitable_join(tx, 0);
226
stream_drop_writable(tx);
227
228
if rx.is_none() {
229
waitable_set_drop(*set);
230
231
CALLBACK_CODE_EXIT
232
} else {
233
CALLBACK_CODE_WAIT | (*set << 4)
234
}
235
} else {
236
unreachable!()
237
}
238
}
239
}
240
}
241
}
242
243
// Unused function; required since this file is built as a `bin`:
244
fn main() {}
245
246