Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi/src/p2/host/tcp.rs
3088 views
1
use crate::p2::bindings::{
2
sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network},
3
sockets::tcp::{self, ShutdownType},
4
};
5
use crate::p2::{Pollable, SocketResult};
6
use crate::sockets::{SocketAddrUse, TcpSocket, WasiSocketsCtxView};
7
use std::net::SocketAddr;
8
use wasmtime::component::Resource;
9
use wasmtime_wasi_io::{
10
poll::DynPollable,
11
streams::{DynInputStream, DynOutputStream},
12
};
13
14
impl tcp::Host for WasiSocketsCtxView<'_> {}
15
16
impl crate::p2::host::tcp::tcp::HostTcpSocket for WasiSocketsCtxView<'_> {
17
async fn start_bind(
18
&mut self,
19
this: Resource<TcpSocket>,
20
network: Resource<Network>,
21
local_address: IpSocketAddress,
22
) -> SocketResult<()> {
23
let network = self.table.get(&network)?;
24
let local_address: SocketAddr = local_address.into();
25
26
// Ensure that we're allowed to connect to this address.
27
network
28
.check_socket_addr(local_address, SocketAddrUse::TcpBind)
29
.await?;
30
31
// Bind to the address.
32
self.table.get_mut(&this)?.start_bind(local_address)?;
33
34
Ok(())
35
}
36
37
fn finish_bind(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
38
let socket = self.table.get_mut(&this)?;
39
socket.finish_bind()?;
40
Ok(())
41
}
42
43
async fn start_connect(
44
&mut self,
45
this: Resource<TcpSocket>,
46
network: Resource<Network>,
47
remote_address: IpSocketAddress,
48
) -> SocketResult<()> {
49
let network = self.table.get(&network)?;
50
let remote_address: SocketAddr = remote_address.into();
51
52
// Ensure that we're allowed to connect to this address.
53
network
54
.check_socket_addr(remote_address, SocketAddrUse::TcpConnect)
55
.await?;
56
57
// Start connection
58
let socket = self.table.get_mut(&this)?;
59
let future = socket
60
.start_connect(&remote_address)?
61
.connect(remote_address);
62
socket.set_pending_connect(future)?;
63
64
Ok(())
65
}
66
67
fn finish_connect(
68
&mut self,
69
this: Resource<TcpSocket>,
70
) -> SocketResult<(Resource<DynInputStream>, Resource<DynOutputStream>)> {
71
let socket = self.table.get_mut(&this)?;
72
73
let result = socket
74
.take_pending_connect()?
75
.ok_or(ErrorCode::WouldBlock)?;
76
socket.finish_connect(result)?;
77
let (input, output) = socket.p2_streams()?;
78
let input = self.table.push_child(input, &this)?;
79
let output = self.table.push_child(output, &this)?;
80
Ok((input, output))
81
}
82
83
fn start_listen(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
84
let socket = self.table.get_mut(&this)?;
85
86
socket.start_listen_p2()?;
87
Ok(())
88
}
89
90
fn finish_listen(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {
91
let socket = self.table.get_mut(&this)?;
92
socket.finish_listen_p2()?;
93
Ok(())
94
}
95
96
fn accept(
97
&mut self,
98
this: Resource<TcpSocket>,
99
) -> SocketResult<(
100
Resource<TcpSocket>,
101
Resource<DynInputStream>,
102
Resource<DynOutputStream>,
103
)> {
104
let socket = self.table.get_mut(&this)?;
105
106
let mut tcp_socket = socket.accept()?.ok_or(ErrorCode::WouldBlock)?;
107
let (input, output) = tcp_socket.p2_streams()?;
108
109
let tcp_socket = self.table.push(tcp_socket)?;
110
let input_stream = self.table.push_child(input, &tcp_socket)?;
111
let output_stream = self.table.push_child(output, &tcp_socket)?;
112
113
Ok((tcp_socket, input_stream, output_stream))
114
}
115
116
fn local_address(&mut self, this: Resource<TcpSocket>) -> SocketResult<IpSocketAddress> {
117
let socket = self.table.get(&this)?;
118
Ok(socket.local_address()?.into())
119
}
120
121
fn remote_address(&mut self, this: Resource<TcpSocket>) -> SocketResult<IpSocketAddress> {
122
let socket = self.table.get(&this)?;
123
Ok(socket.remote_address()?.into())
124
}
125
126
fn is_listening(&mut self, this: Resource<TcpSocket>) -> Result<bool, wasmtime::Error> {
127
let socket = self.table.get(&this)?;
128
129
Ok(socket.is_listening())
130
}
131
132
fn address_family(
133
&mut self,
134
this: Resource<TcpSocket>,
135
) -> Result<IpAddressFamily, wasmtime::Error> {
136
let socket = self.table.get(&this)?;
137
Ok(socket.address_family().into())
138
}
139
140
fn set_listen_backlog_size(
141
&mut self,
142
this: Resource<TcpSocket>,
143
value: u64,
144
) -> SocketResult<()> {
145
let socket = self.table.get_mut(&this)?;
146
socket.set_listen_backlog_size(value)?;
147
Ok(())
148
}
149
150
fn keep_alive_enabled(&mut self, this: Resource<TcpSocket>) -> SocketResult<bool> {
151
let socket = self.table.get(&this)?;
152
Ok(socket.keep_alive_enabled()?)
153
}
154
155
fn set_keep_alive_enabled(
156
&mut self,
157
this: Resource<TcpSocket>,
158
value: bool,
159
) -> SocketResult<()> {
160
let socket = self.table.get(&this)?;
161
socket.set_keep_alive_enabled(value)?;
162
Ok(())
163
}
164
165
fn keep_alive_idle_time(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
166
let socket = self.table.get(&this)?;
167
Ok(socket.keep_alive_idle_time()?)
168
}
169
170
fn set_keep_alive_idle_time(
171
&mut self,
172
this: Resource<TcpSocket>,
173
value: u64,
174
) -> SocketResult<()> {
175
let socket = self.table.get_mut(&this)?;
176
socket.set_keep_alive_idle_time(value)?;
177
Ok(())
178
}
179
180
fn keep_alive_interval(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
181
let socket = self.table.get(&this)?;
182
Ok(socket.keep_alive_interval()?)
183
}
184
185
fn set_keep_alive_interval(
186
&mut self,
187
this: Resource<TcpSocket>,
188
value: u64,
189
) -> SocketResult<()> {
190
let socket = self.table.get(&this)?;
191
socket.set_keep_alive_interval(value)?;
192
Ok(())
193
}
194
195
fn keep_alive_count(&mut self, this: Resource<TcpSocket>) -> SocketResult<u32> {
196
let socket = self.table.get(&this)?;
197
Ok(socket.keep_alive_count()?)
198
}
199
200
fn set_keep_alive_count(&mut self, this: Resource<TcpSocket>, value: u32) -> SocketResult<()> {
201
let socket = self.table.get(&this)?;
202
socket.set_keep_alive_count(value)?;
203
Ok(())
204
}
205
206
fn hop_limit(&mut self, this: Resource<TcpSocket>) -> SocketResult<u8> {
207
let socket = self.table.get(&this)?;
208
Ok(socket.hop_limit()?)
209
}
210
211
fn set_hop_limit(&mut self, this: Resource<TcpSocket>, value: u8) -> SocketResult<()> {
212
let socket = self.table.get_mut(&this)?;
213
socket.set_hop_limit(value)?;
214
Ok(())
215
}
216
217
fn receive_buffer_size(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
218
let socket = self.table.get(&this)?;
219
Ok(socket.receive_buffer_size()?)
220
}
221
222
fn set_receive_buffer_size(
223
&mut self,
224
this: Resource<TcpSocket>,
225
value: u64,
226
) -> SocketResult<()> {
227
let socket = self.table.get_mut(&this)?;
228
socket.set_receive_buffer_size(value)?;
229
Ok(())
230
}
231
232
fn send_buffer_size(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {
233
let socket = self.table.get(&this)?;
234
Ok(socket.send_buffer_size()?)
235
}
236
237
fn set_send_buffer_size(&mut self, this: Resource<TcpSocket>, value: u64) -> SocketResult<()> {
238
let socket = self.table.get_mut(&this)?;
239
socket.set_send_buffer_size(value)?;
240
Ok(())
241
}
242
243
fn subscribe(&mut self, this: Resource<TcpSocket>) -> wasmtime::Result<Resource<DynPollable>> {
244
wasmtime_wasi_io::poll::subscribe(self.table, this)
245
}
246
247
fn shutdown(
248
&mut self,
249
this: Resource<TcpSocket>,
250
shutdown_type: ShutdownType,
251
) -> SocketResult<()> {
252
let socket = self.table.get(&this)?;
253
254
let how = match shutdown_type {
255
ShutdownType::Receive => std::net::Shutdown::Read,
256
ShutdownType::Send => std::net::Shutdown::Write,
257
ShutdownType::Both => std::net::Shutdown::Both,
258
};
259
260
let state = socket.p2_streaming_state()?;
261
state.shutdown(how)?;
262
Ok(())
263
}
264
265
fn drop(&mut self, this: Resource<TcpSocket>) -> Result<(), wasmtime::Error> {
266
// As in the filesystem implementation, we assume closing a socket
267
// doesn't block.
268
let dropped = self.table.delete(this)?;
269
drop(dropped);
270
271
Ok(())
272
}
273
}
274
275
#[async_trait::async_trait]
276
impl Pollable for TcpSocket {
277
async fn ready(&mut self) {
278
<TcpSocket>::ready(self).await;
279
}
280
}
281
282
pub mod sync {
283
use crate::p2::{
284
SocketError,
285
bindings::{
286
sockets::{
287
network::Network,
288
tcp::{self as async_tcp, HostTcpSocket as AsyncHostTcpSocket},
289
},
290
sync::sockets::tcp::{
291
self, Duration, HostTcpSocket, InputStream, IpAddressFamily, IpSocketAddress,
292
OutputStream, Pollable, ShutdownType, TcpSocket,
293
},
294
},
295
};
296
use crate::runtime::in_tokio;
297
use crate::sockets::WasiSocketsCtxView;
298
use wasmtime::component::Resource;
299
300
impl tcp::Host for WasiSocketsCtxView<'_> {}
301
302
impl HostTcpSocket for WasiSocketsCtxView<'_> {
303
fn start_bind(
304
&mut self,
305
self_: Resource<TcpSocket>,
306
network: Resource<Network>,
307
local_address: IpSocketAddress,
308
) -> Result<(), SocketError> {
309
in_tokio(async {
310
AsyncHostTcpSocket::start_bind(self, self_, network, local_address).await
311
})
312
}
313
314
fn finish_bind(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
315
AsyncHostTcpSocket::finish_bind(self, self_)
316
}
317
318
fn start_connect(
319
&mut self,
320
self_: Resource<TcpSocket>,
321
network: Resource<Network>,
322
remote_address: IpSocketAddress,
323
) -> Result<(), SocketError> {
324
in_tokio(async {
325
AsyncHostTcpSocket::start_connect(self, self_, network, remote_address).await
326
})
327
}
328
329
fn finish_connect(
330
&mut self,
331
self_: Resource<TcpSocket>,
332
) -> Result<(Resource<InputStream>, Resource<OutputStream>), SocketError> {
333
AsyncHostTcpSocket::finish_connect(self, self_)
334
}
335
336
fn start_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
337
AsyncHostTcpSocket::start_listen(self, self_)
338
}
339
340
fn finish_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {
341
AsyncHostTcpSocket::finish_listen(self, self_)
342
}
343
344
fn accept(
345
&mut self,
346
self_: Resource<TcpSocket>,
347
) -> Result<
348
(
349
Resource<TcpSocket>,
350
Resource<InputStream>,
351
Resource<OutputStream>,
352
),
353
SocketError,
354
> {
355
AsyncHostTcpSocket::accept(self, self_)
356
}
357
358
fn local_address(
359
&mut self,
360
self_: Resource<TcpSocket>,
361
) -> Result<IpSocketAddress, SocketError> {
362
AsyncHostTcpSocket::local_address(self, self_)
363
}
364
365
fn remote_address(
366
&mut self,
367
self_: Resource<TcpSocket>,
368
) -> Result<IpSocketAddress, SocketError> {
369
AsyncHostTcpSocket::remote_address(self, self_)
370
}
371
372
fn is_listening(&mut self, self_: Resource<TcpSocket>) -> wasmtime::Result<bool> {
373
AsyncHostTcpSocket::is_listening(self, self_)
374
}
375
376
fn address_family(
377
&mut self,
378
self_: Resource<TcpSocket>,
379
) -> wasmtime::Result<IpAddressFamily> {
380
AsyncHostTcpSocket::address_family(self, self_)
381
}
382
383
fn set_listen_backlog_size(
384
&mut self,
385
self_: Resource<TcpSocket>,
386
value: u64,
387
) -> Result<(), SocketError> {
388
AsyncHostTcpSocket::set_listen_backlog_size(self, self_, value)
389
}
390
391
fn keep_alive_enabled(&mut self, self_: Resource<TcpSocket>) -> Result<bool, SocketError> {
392
AsyncHostTcpSocket::keep_alive_enabled(self, self_)
393
}
394
395
fn set_keep_alive_enabled(
396
&mut self,
397
self_: Resource<TcpSocket>,
398
value: bool,
399
) -> Result<(), SocketError> {
400
AsyncHostTcpSocket::set_keep_alive_enabled(self, self_, value)
401
}
402
403
fn keep_alive_idle_time(
404
&mut self,
405
self_: Resource<TcpSocket>,
406
) -> Result<Duration, SocketError> {
407
AsyncHostTcpSocket::keep_alive_idle_time(self, self_)
408
}
409
410
fn set_keep_alive_idle_time(
411
&mut self,
412
self_: Resource<TcpSocket>,
413
value: Duration,
414
) -> Result<(), SocketError> {
415
AsyncHostTcpSocket::set_keep_alive_idle_time(self, self_, value)
416
}
417
418
fn keep_alive_interval(
419
&mut self,
420
self_: Resource<TcpSocket>,
421
) -> Result<Duration, SocketError> {
422
AsyncHostTcpSocket::keep_alive_interval(self, self_)
423
}
424
425
fn set_keep_alive_interval(
426
&mut self,
427
self_: Resource<TcpSocket>,
428
value: Duration,
429
) -> Result<(), SocketError> {
430
AsyncHostTcpSocket::set_keep_alive_interval(self, self_, value)
431
}
432
433
fn keep_alive_count(&mut self, self_: Resource<TcpSocket>) -> Result<u32, SocketError> {
434
AsyncHostTcpSocket::keep_alive_count(self, self_)
435
}
436
437
fn set_keep_alive_count(
438
&mut self,
439
self_: Resource<TcpSocket>,
440
value: u32,
441
) -> Result<(), SocketError> {
442
AsyncHostTcpSocket::set_keep_alive_count(self, self_, value)
443
}
444
445
fn hop_limit(&mut self, self_: Resource<TcpSocket>) -> Result<u8, SocketError> {
446
AsyncHostTcpSocket::hop_limit(self, self_)
447
}
448
449
fn set_hop_limit(
450
&mut self,
451
self_: Resource<TcpSocket>,
452
value: u8,
453
) -> Result<(), SocketError> {
454
AsyncHostTcpSocket::set_hop_limit(self, self_, value)
455
}
456
457
fn receive_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
458
AsyncHostTcpSocket::receive_buffer_size(self, self_)
459
}
460
461
fn set_receive_buffer_size(
462
&mut self,
463
self_: Resource<TcpSocket>,
464
value: u64,
465
) -> Result<(), SocketError> {
466
AsyncHostTcpSocket::set_receive_buffer_size(self, self_, value)
467
}
468
469
fn send_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {
470
AsyncHostTcpSocket::send_buffer_size(self, self_)
471
}
472
473
fn set_send_buffer_size(
474
&mut self,
475
self_: Resource<TcpSocket>,
476
value: u64,
477
) -> Result<(), SocketError> {
478
AsyncHostTcpSocket::set_send_buffer_size(self, self_, value)
479
}
480
481
fn subscribe(
482
&mut self,
483
self_: Resource<TcpSocket>,
484
) -> wasmtime::Result<Resource<Pollable>> {
485
AsyncHostTcpSocket::subscribe(self, self_)
486
}
487
488
fn shutdown(
489
&mut self,
490
self_: Resource<TcpSocket>,
491
shutdown_type: ShutdownType,
492
) -> Result<(), SocketError> {
493
AsyncHostTcpSocket::shutdown(self, self_, shutdown_type.into())
494
}
495
496
fn drop(&mut self, rep: Resource<TcpSocket>) -> wasmtime::Result<()> {
497
AsyncHostTcpSocket::drop(self, rep)
498
}
499
}
500
501
impl From<ShutdownType> for async_tcp::ShutdownType {
502
fn from(other: ShutdownType) -> Self {
503
match other {
504
ShutdownType::Receive => async_tcp::ShutdownType::Receive,
505
ShutdownType::Send => async_tcp::ShutdownType::Send,
506
ShutdownType::Both => async_tcp::ShutdownType::Both,
507
}
508
}
509
}
510
}
511
512