Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/test-programs/src/bin/async_cancel_transmit.rs
3068 views
1
mod bindings {
2
wit_bindgen::generate!({
3
path: "../misc/component-async-tests/wit",
4
world: "synchronous-transmit-guest",
5
});
6
}
7
8
use {
9
std::{
10
mem::{self, ManuallyDrop},
11
slice,
12
},
13
test_programs::async_::{
14
BLOCKED, CALLBACK_CODE_EXIT, CALLBACK_CODE_YIELD, COMPLETED, DROPPED, EVENT_NONE,
15
context_get, context_set,
16
},
17
};
18
19
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[task-return]start`.
    ///
    /// `callback_start` passes, in order: a stream handle, a pointer/length pair describing a
    /// byte buffer, a future handle, and a single byte.
    #[link_name = "[task-return]start"]
    fn task_return_start(
        stream: u32,
        bytes: *const u8,
        bytes_len: usize,
        future: u32,
        byte: u8,
    );
}

/// Stand-in for `task_return_start` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn task_return_start(
    _stream: u32,
    _bytes: *const u8,
    _bytes_len: usize,
    _future: u32,
    _byte: u8,
) {
    unreachable!()
}
29
30
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[stream-new-0]start`.
    ///
    /// Returns two handles packed into one `u64`: `callback_start` writes to the handle in the
    /// upper 32 bits and hands the one in the lower 32 bits back through `task_return_start`.
    #[link_name = "[stream-new-0]start"]
    fn stream_new() -> u64;
}

/// Stand-in for `stream_new` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_new() -> u64 {
    unreachable!()
}
40
41
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[async-lower][stream-write-0]start`.
    ///
    /// Takes a stream handle plus a pointer/length pair for the bytes to write and returns a
    /// status code, which `callback_start` compares against `BLOCKED`.
    #[link_name = "[async-lower][stream-write-0]start"]
    fn stream_write(handle: u32, bytes: *const u8, bytes_len: usize) -> u32;
}

/// Stand-in for `stream_write` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_write(_handle: u32, _bytes: *const u8, _bytes_len: usize) -> u32 {
    unreachable!()
}
51
52
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[async-lower][stream-read-0]start`.
    ///
    /// Takes a stream handle plus a pointer/length pair for the destination buffer and returns
    /// a status code, which `callback_start` compares against `BLOCKED`.
    #[link_name = "[async-lower][stream-read-0]start"]
    fn stream_read(handle: u32, buffer: *mut u8, buffer_len: usize) -> u32;
}

/// Stand-in for `stream_read` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_read(_handle: u32, _buffer: *mut u8, _buffer_len: usize) -> u32 {
    unreachable!()
}
62
63
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[stream-cancel-write-0]start`.
    ///
    /// Takes the handle being written to and returns a status word; `callback_start` expects
    /// `DROPPED` OR-ed with the number of bytes written shifted left by four.
    #[link_name = "[stream-cancel-write-0]start"]
    fn stream_cancel_write(handle: u32) -> u32;
}

/// Stand-in for `stream_cancel_write` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_cancel_write(_handle: u32) -> u32 {
    unreachable!()
}
73
74
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[stream-cancel-read-0]start`.
    ///
    /// Takes the handle being read from and returns a status word; `callback_start` expects
    /// `DROPPED` OR-ed with the number of bytes received shifted left by four.
    #[link_name = "[stream-cancel-read-0]start"]
    fn stream_cancel_read(handle: u32) -> u32;
}

/// Stand-in for `stream_cancel_read` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_cancel_read(_handle: u32) -> u32 {
    unreachable!()
}
84
85
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[stream-drop-readable-0]start`; takes the stream handle that was read from.
    #[link_name = "[stream-drop-readable-0]start"]
    fn stream_drop_readable(handle: u32);
}

/// Stand-in for `stream_drop_readable` on non-`wasm32` targets, where the import is not
/// declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_drop_readable(_handle: u32) {
    unreachable!()
}
95
96
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[stream-drop-writable-0]start`; takes the stream handle that was written to.
    #[link_name = "[stream-drop-writable-0]start"]
    fn stream_drop_writable(handle: u32);
}

/// Stand-in for `stream_drop_writable` on non-`wasm32` targets, where the import is not
/// declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn stream_drop_writable(_handle: u32) {
    unreachable!()
}
106
107
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[future-new-1]start`.
    ///
    /// Returns two handles packed into one `u64`: `callback_start` writes to the handle in the
    /// upper 32 bits and hands the one in the lower 32 bits back through `task_return_start`.
    #[link_name = "[future-new-1]start"]
    fn future_new() -> u64;
}

/// Stand-in for `future_new` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_new() -> u64 {
    unreachable!()
}
117
118
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[async-lower][future-write-1]start`.
    ///
    /// Takes a future handle and a pointer to the byte to send and returns a status code,
    /// which `callback_start` compares against `BLOCKED`.
    #[link_name = "[async-lower][future-write-1]start"]
    fn future_write(handle: u32, value: *const u8) -> u32;
}

/// Stand-in for `future_write` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_write(_handle: u32, _value: *const u8) -> u32 {
    unreachable!()
}
128
129
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[async-lower][future-read-1]start`.
    ///
    /// Takes a future handle and a pointer to the byte that receives the value and returns a
    /// status code, which `callback_start` compares against `BLOCKED`.
    #[link_name = "[async-lower][future-read-1]start"]
    fn future_read(handle: u32, destination: *mut u8) -> u32;
}

/// Stand-in for `future_read` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_read(_handle: u32, _destination: *mut u8) -> u32 {
    unreachable!()
}
139
140
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[future-cancel-write-1]start`.
    ///
    /// Takes the future handle being written to and returns a status code, which
    /// `callback_start` expects to be `COMPLETED`.
    #[link_name = "[future-cancel-write-1]start"]
    fn future_cancel_write(handle: u32) -> u32;
}

/// Stand-in for `future_cancel_write` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_cancel_write(_handle: u32) -> u32 {
    unreachable!()
}
150
151
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[future-cancel-read-1]start`.
    ///
    /// Takes the future handle being read from and returns a status code, which
    /// `callback_start` expects to be `COMPLETED`.
    #[link_name = "[future-cancel-read-1]start"]
    fn future_cancel_read(handle: u32) -> u32;
}

/// Stand-in for `future_cancel_read` on non-`wasm32` targets, where the import is not declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_cancel_read(_handle: u32) -> u32 {
    unreachable!()
}
161
162
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[future-drop-readable-1]start`; takes the future handle that was read from.
    #[link_name = "[future-drop-readable-1]start"]
    fn future_drop_readable(handle: u32);
}

/// Stand-in for `future_drop_readable` on non-`wasm32` targets, where the import is not
/// declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_drop_readable(_handle: u32) {
    unreachable!()
}
172
173
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "[export]local:local/synchronous-transmit")]
unsafe extern "C" {
    /// Import `[future-drop-writable-1]start`; takes the future handle that was written to.
    #[link_name = "[future-drop-writable-1]start"]
    fn future_drop_writable(handle: u32);
}

/// Stand-in for `future_drop_writable` on non-`wasm32` targets, where the import is not
/// declared.
///
/// Panics via `unreachable!()` if it is ever called.
#[cfg(not(target_arch = "wasm32"))]
unsafe extern "C" fn future_drop_writable(_handle: u32) {
    unreachable!()
}
183
184
/// Payload that `callback_start` writes to the stream it creates; the same pointer and length
/// are also passed to `task_return_start`.
static STREAM_BYTES_TO_WRITE: &[u8] = &[1, 3, 5, 7, 11];

/// Single byte that `callback_start` writes to the future it creates and also passes to
/// `task_return_start`.
static FUTURE_BYTE_TO_WRITE: u8 = 13;
186
187
/// Per-task state for the `start` export, boxed by `export_start` and reached through the
/// context slot by `callback_start`.
enum State {
    /// Initial state: nothing has been started yet.
    S0 {
        /// Handle of the incoming stream to read from.
        stream: u32,
        /// Bytes that are expected to arrive on `stream`.
        stream_expected: Vec<u8>,
        /// Handle of the incoming future to read from.
        future: u32,
        /// Byte that is expected to arrive on `future`.
        future_expected: u8,
    },
    /// All four reads and writes have been started and reported `BLOCKED`.
    S1 {
        /// Start of the heap buffer handed to `stream_read`; it is `stream_expected.len()`
        /// bytes long.
        stream_read_buffer: *mut u8,
        /// Handle of the locally created stream that is being written to.
        stream_tx: u32,
        /// Handle of the incoming stream that is being read from.
        stream: u32,
        /// Bytes that are expected to arrive on `stream`.
        stream_expected: Vec<u8>,
        /// Heap-allocated byte handed to `future_read`.
        future_read_buffer: *mut u8,
        /// Handle of the locally created future that is being written to.
        future_tx: u32,
        /// Handle of the incoming future that is being read from.
        future: u32,
        /// Byte that is expected to arrive on `future`.
        future_expected: u8,
    },
}
205
206
#[unsafe(export_name = "[async-lift]local:local/synchronous-transmit#start")]
207
unsafe extern "C" fn export_start(
208
stream: u32,
209
stream_expected: u32,
210
stream_expected_len: u32,
211
future: u32,
212
future_expected: u8,
213
) -> u32 {
214
let stream_expected_len = usize::try_from(stream_expected_len).unwrap();
215
216
unsafe {
217
context_set(
218
u32::try_from(Box::into_raw(Box::new(State::S0 {
219
stream,
220
stream_expected: Vec::from_raw_parts(
221
stream_expected as usize as *mut u8,
222
stream_expected_len,
223
stream_expected_len,
224
),
225
future,
226
future_expected,
227
})) as usize)
228
.unwrap(),
229
);
230
231
callback_start(EVENT_NONE, 0, 0)
232
}
233
}
234
235
#[unsafe(export_name = "[callback][async-lift]local:local/synchronous-transmit#start")]
236
unsafe extern "C" fn callback_start(event0: u32, _event1: u32, _event2: u32) -> u32 {
237
unsafe {
238
let state = &mut *(usize::try_from(context_get()).unwrap() as *mut State);
239
match state {
240
&mut State::S0 {
241
stream,
242
ref mut stream_expected,
243
future,
244
future_expected,
245
} => {
246
assert_eq!(event0, EVENT_NONE);
247
248
// Here we assume specific behavior from the writers, namely:
249
//
250
// - They will not send us anything until after we cancel the
251
// reads, and even then there will be a delay.
252
//
253
// - When they _do_ send, they will send us all the bytes it
254
// told us to expect at once.
255
let stream_read_buffer =
256
ManuallyDrop::new(vec![0_u8; stream_expected.len()]).as_mut_ptr();
257
let status = stream_read(stream, stream_read_buffer, stream_expected.len());
258
assert_eq!(status, BLOCKED);
259
260
let future_read_buffer = Box::into_raw(Box::new(0_u8));
261
let status = future_read(future, future_read_buffer);
262
assert_eq!(status, BLOCKED);
263
264
let pair = stream_new();
265
let stream_tx = u32::try_from(pair >> 32).unwrap();
266
let stream_rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();
267
268
let pair = future_new();
269
let future_tx = u32::try_from(pair >> 32).unwrap();
270
let future_rx = u32::try_from(pair & 0xFFFFFFFF_u64).unwrap();
271
272
task_return_start(
273
stream_rx,
274
STREAM_BYTES_TO_WRITE.as_ptr(),
275
STREAM_BYTES_TO_WRITE.len(),
276
future_rx,
277
FUTURE_BYTE_TO_WRITE,
278
);
279
280
// Here we assume specific behavior from the readers, namely:
281
//
282
// - They will not read anything until after we cancel the
283
// write, and even then there will be a delay.
284
//
285
// - When they _do_ read, they will accept all the bytes we told
286
// it to expect at once.
287
let status = stream_write(
288
stream_tx,
289
STREAM_BYTES_TO_WRITE.as_ptr(),
290
STREAM_BYTES_TO_WRITE.len(),
291
);
292
assert_eq!(status, BLOCKED);
293
294
let status = future_write(future_tx, &FUTURE_BYTE_TO_WRITE);
295
assert_eq!(status, BLOCKED);
296
297
*state = State::S1 {
298
stream_read_buffer,
299
stream_tx,
300
stream,
301
stream_expected: mem::take(stream_expected),
302
future_read_buffer,
303
future_tx,
304
future,
305
future_expected,
306
};
307
308
CALLBACK_CODE_YIELD
309
}
310
311
&mut State::S1 {
312
stream_read_buffer,
313
stream_tx,
314
stream,
315
ref mut stream_expected,
316
future_read_buffer,
317
future_tx,
318
future,
319
future_expected,
320
} => {
321
// Now we synchronously cancel everything and expect that the
322
// reads and writes complete.
323
324
let status = stream_cancel_read(stream);
325
assert_eq!(
326
status,
327
DROPPED | u32::try_from(stream_expected.len() << 4).unwrap()
328
);
329
let received = Box::from_raw(slice::from_raw_parts_mut(
330
stream_read_buffer,
331
stream_expected.len(),
332
));
333
assert_eq!(&received[..], stream_expected);
334
stream_drop_readable(stream);
335
336
let status = stream_cancel_write(stream_tx);
337
assert_eq!(
338
status,
339
DROPPED | u32::try_from(STREAM_BYTES_TO_WRITE.len() << 4).unwrap()
340
);
341
stream_drop_writable(stream_tx);
342
343
let status = future_cancel_read(future);
344
assert_eq!(status, COMPLETED);
345
let received = Box::from_raw(future_read_buffer);
346
assert_eq!(*received, future_expected);
347
future_drop_readable(future);
348
349
let status = future_cancel_write(future_tx);
350
assert_eq!(status, COMPLETED);
351
future_drop_writable(future_tx);
352
353
CALLBACK_CODE_EXIT
354
}
355
}
356
}
357
}
358
359
// This file is built as a `bin`, which requires a `main`; the function itself is unused.
fn main() {}
361
362