Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
bytecodealliance
GitHub Repository: bytecodealliance/wasmtime
Path: blob/main/crates/wasi/src/sockets/tcp.rs
3129 views
1
use crate::p2::P2TcpStreamingState;
2
use crate::runtime::with_ambient_tokio_runtime;
3
use crate::sockets::util::{
4
ErrorCode, get_unicast_hop_limit, is_valid_address_family, is_valid_remote_address,
5
is_valid_unicast_address, receive_buffer_size, send_buffer_size, set_keep_alive_count,
6
set_keep_alive_idle_time, set_keep_alive_interval, set_receive_buffer_size,
7
set_send_buffer_size, set_unicast_hop_limit, tcp_bind,
8
};
9
use crate::sockets::{DEFAULT_TCP_BACKLOG, SocketAddressFamily, WasiSocketsCtx};
10
use io_lifetimes::AsSocketlike as _;
11
use io_lifetimes::views::SocketlikeView;
12
use rustix::io::Errno;
13
use rustix::net::sockopt;
14
use std::fmt::Debug;
15
use std::io;
16
use std::mem;
17
use std::net::SocketAddr;
18
use std::pin::Pin;
19
use std::sync::Arc;
20
use std::task::{Context, Poll, Waker};
21
use std::time::Duration;
22
23
/// The state of a TCP socket.
24
///
25
/// This represents the various states a socket can be in during the
26
/// activities of binding, listening, accepting, and connecting. Note that this
27
/// state machine encompasses both WASIp2 and WASIp3.
28
enum TcpState {
29
/// The initial state for a newly-created socket.
30
///
31
/// From here a socket can transition to `BindStarted`, `ListenStarted`, or
32
/// `Connecting`.
33
Default(tokio::net::TcpSocket),
34
35
/// A state indicating that a bind has been started and must be finished
36
/// subsequently with `finish_bind`.
37
///
38
/// From here a socket can transition to `Bound`.
39
BindStarted(tokio::net::TcpSocket),
40
41
/// Binding finished. The socket has an address but is not yet listening for
42
/// connections.
43
///
44
/// From here a socket can transition to `ListenStarted`, or `Connecting`.
45
Bound(tokio::net::TcpSocket),
46
47
/// Listening on a socket has started and must be completed with
48
/// `finish_listen`.
49
///
50
/// From here a socket can transition to `Listening`.
51
ListenStarted(tokio::net::TcpSocket),
52
53
/// The socket is now listening and waiting for an incoming connection.
54
///
55
/// Sockets will not leave this state.
56
Listening {
57
/// The raw tokio-basd TCP listener managing the underlying socket.
58
listener: Arc<tokio::net::TcpListener>,
59
60
/// The last-accepted connection, set during the `ready` method and read
61
/// during the `accept` method. Note that this is only used for WASIp2
62
/// at this time.
63
pending_accept: Option<io::Result<tokio::net::TcpStream>>,
64
},
65
66
/// An outgoing connection is started.
67
///
68
/// This is created via the `start_connect` method. The payload here is an
69
/// optionally-specified owned future for the result of the connect. In
70
/// WASIp2 the future lives here, but in WASIp3 it lives on the event loop
71
/// so this is `None`.
72
///
73
/// From here a socket can transition to `ConnectReady` or `Connected`.
74
Connecting(Option<Pin<Box<dyn Future<Output = io::Result<tokio::net::TcpStream>> + Send>>>),
75
76
/// A connection via `Connecting` has completed.
77
///
78
/// This is present for WASIp2 where the `Connecting` state stores `Some` of
79
/// a future, and the result of that future is recorded here when it
80
/// finishes as part of the `ready` method.
81
///
82
/// From here a socket can transition to `Connected`.
83
ConnectReady(io::Result<tokio::net::TcpStream>),
84
85
/// A connection has been established.
86
///
87
/// This is created either via `finish_connect` or for freshly accepted
88
/// sockets from a TCP listener.
89
///
90
/// From here a socket can transition to `Receiving` or `P2Streaming`.
91
Connected(Arc<tokio::net::TcpStream>),
92
93
/// A connection has been established and `receive` has been called.
94
///
95
/// A socket will not transition out of this state.
96
#[cfg(feature = "p3")]
97
Receiving(Arc<tokio::net::TcpStream>),
98
99
/// This is a WASIp2-bound socket which stores some extra state for
100
/// read/write streams to handle TCP shutdown.
101
///
102
/// A socket will not transition out of this state.
103
P2Streaming(Box<P2TcpStreamingState>),
104
105
/// This is not actually a socket but a deferred error.
106
///
107
/// This error came out of `accept` and is deferred until the socket is
108
/// operated on.
109
#[cfg(feature = "p3")]
110
Error(io::Error),
111
112
/// The socket is closed and no more operations can be performed.
113
Closed,
114
}
115
116
impl Debug for TcpState {
117
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118
match self {
119
Self::Default(_) => f.debug_tuple("Default").finish(),
120
Self::BindStarted(_) => f.debug_tuple("BindStarted").finish(),
121
Self::Bound(_) => f.debug_tuple("Bound").finish(),
122
Self::ListenStarted { .. } => f.debug_tuple("ListenStarted").finish(),
123
Self::Listening { .. } => f.debug_tuple("Listening").finish(),
124
Self::Connecting(..) => f.debug_tuple("Connecting").finish(),
125
Self::ConnectReady(..) => f.debug_tuple("ConnectReady").finish(),
126
Self::Connected { .. } => f.debug_tuple("Connected").finish(),
127
#[cfg(feature = "p3")]
128
Self::Receiving { .. } => f.debug_tuple("Receiving").finish(),
129
Self::P2Streaming(_) => f.debug_tuple("P2Streaming").finish(),
130
#[cfg(feature = "p3")]
131
Self::Error(..) => f.debug_tuple("Error").finish(),
132
Self::Closed => write!(f, "Closed"),
133
}
134
}
135
}
136
137
/// A host TCP socket, plus associated bookkeeping.
138
pub struct TcpSocket {
139
/// The current state in the bind/listen/accept/connect progression.
140
tcp_state: TcpState,
141
142
/// The desired listen queue size.
143
listen_backlog_size: u32,
144
145
family: SocketAddressFamily,
146
147
options: NonInheritedOptions,
148
}
149
150
impl TcpSocket {
151
/// Create a new socket in the given family.
152
pub(crate) fn new(
153
ctx: &WasiSocketsCtx,
154
family: SocketAddressFamily,
155
) -> Result<Self, ErrorCode> {
156
ctx.allowed_network_uses.check_allowed_tcp()?;
157
158
with_ambient_tokio_runtime(|| {
159
let socket = match family {
160
SocketAddressFamily::Ipv4 => tokio::net::TcpSocket::new_v4()?,
161
SocketAddressFamily::Ipv6 => {
162
let socket = tokio::net::TcpSocket::new_v6()?;
163
sockopt::set_ipv6_v6only(&socket, true)?;
164
socket
165
}
166
};
167
168
Ok(Self::from_state(TcpState::Default(socket), family))
169
})
170
}
171
172
#[cfg(feature = "p3")]
173
pub(crate) fn new_error(err: io::Error, family: SocketAddressFamily) -> Self {
174
TcpSocket::from_state(TcpState::Error(err), family)
175
}
176
177
/// Creates a new socket with the `result` of an accepted socket from a
178
/// `TcpListener`.
179
///
180
/// This will handle the `result` internally and `result` should be the raw
181
/// result from a TCP listen operation.
182
pub(crate) fn new_accept(
183
result: io::Result<tokio::net::TcpStream>,
184
options: &NonInheritedOptions,
185
family: SocketAddressFamily,
186
) -> io::Result<Self> {
187
let client = result.map_err(|err| match Errno::from_io_error(&err) {
188
// From: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-accept#:~:text=WSAEINPROGRESS
189
// > WSAEINPROGRESS: A blocking Windows Sockets 1.1 call is in progress,
190
// > or the service provider is still processing a callback function.
191
//
192
// wasi-sockets doesn't have an equivalent to the EINPROGRESS error,
193
// because in POSIX this error is only returned by a non-blocking
194
// `connect` and wasi-sockets has a different solution for that.
195
#[cfg(windows)]
196
Some(Errno::INPROGRESS) => Errno::INTR.into(),
197
198
// Normalize Linux' non-standard behavior.
199
//
200
// From https://man7.org/linux/man-pages/man2/accept.2.html:
201
// > Linux accept() passes already-pending network errors on the
202
// > new socket as an error code from accept(). This behavior
203
// > differs from other BSD socket implementations. (...)
204
#[cfg(target_os = "linux")]
205
Some(
206
Errno::CONNRESET
207
| Errno::NETRESET
208
| Errno::HOSTUNREACH
209
| Errno::HOSTDOWN
210
| Errno::NETDOWN
211
| Errno::NETUNREACH
212
| Errno::PROTO
213
| Errno::NOPROTOOPT
214
| Errno::NONET
215
| Errno::OPNOTSUPP,
216
) => Errno::CONNABORTED.into(),
217
218
_ => err,
219
})?;
220
options.apply(family, &client);
221
Ok(Self::from_state(
222
TcpState::Connected(Arc::new(client)),
223
family,
224
))
225
}
226
227
/// Create a `TcpSocket` from an existing socket.
228
fn from_state(state: TcpState, family: SocketAddressFamily) -> Self {
229
Self {
230
tcp_state: state,
231
listen_backlog_size: DEFAULT_TCP_BACKLOG,
232
family,
233
options: Default::default(),
234
}
235
}
236
237
pub(crate) fn as_std_view(&self) -> Result<SocketlikeView<'_, std::net::TcpStream>, ErrorCode> {
238
match &self.tcp_state {
239
TcpState::Default(socket)
240
| TcpState::BindStarted(socket)
241
| TcpState::Bound(socket)
242
| TcpState::ListenStarted(socket) => Ok(socket.as_socketlike_view()),
243
TcpState::Connected(stream) => Ok(stream.as_socketlike_view()),
244
#[cfg(feature = "p3")]
245
TcpState::Receiving(stream) => Ok(stream.as_socketlike_view()),
246
TcpState::Listening { listener, .. } => Ok(listener.as_socketlike_view()),
247
TcpState::P2Streaming(state) => Ok(state.stream.as_socketlike_view()),
248
TcpState::Connecting(..) | TcpState::ConnectReady(_) | TcpState::Closed => {
249
Err(ErrorCode::InvalidState)
250
}
251
#[cfg(feature = "p3")]
252
TcpState::Error(err) => Err(err.into()),
253
}
254
}
255
256
pub(crate) fn start_bind(&mut self, addr: SocketAddr) -> Result<(), ErrorCode> {
257
let ip = addr.ip();
258
if !is_valid_unicast_address(ip) || !is_valid_address_family(ip, self.family) {
259
return Err(ErrorCode::InvalidArgument);
260
}
261
match mem::replace(&mut self.tcp_state, TcpState::Closed) {
262
TcpState::Default(sock) => {
263
if let Err(err) = tcp_bind(&sock, addr) {
264
self.tcp_state = TcpState::Default(sock);
265
Err(err)
266
} else {
267
self.tcp_state = TcpState::BindStarted(sock);
268
Ok(())
269
}
270
}
271
tcp_state => {
272
self.tcp_state = tcp_state;
273
Err(ErrorCode::InvalidState)
274
}
275
}
276
}
277
278
pub(crate) fn finish_bind(&mut self) -> Result<(), ErrorCode> {
279
match mem::replace(&mut self.tcp_state, TcpState::Closed) {
280
TcpState::BindStarted(socket) => {
281
self.tcp_state = TcpState::Bound(socket);
282
Ok(())
283
}
284
current_state => {
285
// Reset the state so that the outside world doesn't see this socket as closed
286
self.tcp_state = current_state;
287
Err(ErrorCode::NotInProgress)
288
}
289
}
290
}
291
292
pub(crate) fn start_connect(
293
&mut self,
294
addr: &SocketAddr,
295
) -> Result<tokio::net::TcpSocket, ErrorCode> {
296
match self.tcp_state {
297
TcpState::Default(..) | TcpState::Bound(..) => {}
298
TcpState::Connecting(..) => {
299
return Err(ErrorCode::ConcurrencyConflict);
300
}
301
_ => return Err(ErrorCode::InvalidState),
302
};
303
304
if !is_valid_unicast_address(addr.ip())
305
|| !is_valid_remote_address(*addr)
306
|| !is_valid_address_family(addr.ip(), self.family)
307
{
308
return Err(ErrorCode::InvalidArgument);
309
};
310
311
let (TcpState::Default(tokio_socket) | TcpState::Bound(tokio_socket)) =
312
mem::replace(&mut self.tcp_state, TcpState::Connecting(None))
313
else {
314
unreachable!();
315
};
316
317
Ok(tokio_socket)
318
}
319
320
/// For WASIp2 this is used to record the actual connection future as part
321
/// of `start_connect` within this socket state.
322
pub(crate) fn set_pending_connect(
323
&mut self,
324
future: impl Future<Output = io::Result<tokio::net::TcpStream>> + Send + 'static,
325
) -> Result<(), ErrorCode> {
326
match &mut self.tcp_state {
327
TcpState::Connecting(slot @ None) => {
328
*slot = Some(Box::pin(future));
329
Ok(())
330
}
331
_ => Err(ErrorCode::InvalidState),
332
}
333
}
334
335
/// For WASIp2 this retrieves the result from the future passed to
336
/// `set_pending_connect`.
337
///
338
/// Return states here are:
339
///
340
/// * `Ok(Some(res))` - where `res` is the result of the connect operation.
341
/// * `Ok(None)` - the connect operation isn't ready yet.
342
/// * `Err(e)` - a connect operation is not in progress.
343
pub(crate) fn take_pending_connect(
344
&mut self,
345
) -> Result<Option<io::Result<tokio::net::TcpStream>>, ErrorCode> {
346
match mem::replace(&mut self.tcp_state, TcpState::Connecting(None)) {
347
TcpState::ConnectReady(result) => Ok(Some(result)),
348
TcpState::Connecting(Some(mut future)) => {
349
let mut cx = Context::from_waker(Waker::noop());
350
match with_ambient_tokio_runtime(|| future.as_mut().poll(&mut cx)) {
351
Poll::Ready(result) => Ok(Some(result)),
352
Poll::Pending => {
353
self.tcp_state = TcpState::Connecting(Some(future));
354
Ok(None)
355
}
356
}
357
}
358
current_state => {
359
self.tcp_state = current_state;
360
Err(ErrorCode::NotInProgress)
361
}
362
}
363
}
364
365
pub(crate) fn finish_connect(
366
&mut self,
367
result: io::Result<tokio::net::TcpStream>,
368
) -> Result<(), ErrorCode> {
369
if !matches!(self.tcp_state, TcpState::Connecting(None)) {
370
return Err(ErrorCode::InvalidState);
371
}
372
match result {
373
Ok(stream) => {
374
self.tcp_state = TcpState::Connected(Arc::new(stream));
375
Ok(())
376
}
377
Err(err) => {
378
self.tcp_state = TcpState::Closed;
379
Err(ErrorCode::from(err))
380
}
381
}
382
}
383
384
/// Start listening using p2 semantics. (no implicit bind)
385
pub(crate) fn start_listen_p2(&mut self) -> Result<(), ErrorCode> {
386
match mem::replace(&mut self.tcp_state, TcpState::Closed) {
387
TcpState::Bound(tokio_socket) => {
388
self.tcp_state = TcpState::ListenStarted(tokio_socket);
389
Ok(())
390
}
391
previous_state => {
392
self.tcp_state = previous_state;
393
Err(ErrorCode::InvalidState)
394
}
395
}
396
}
397
398
pub(crate) fn finish_listen_p2(&mut self) -> Result<(), ErrorCode> {
399
let tokio_socket = match mem::replace(&mut self.tcp_state, TcpState::Closed) {
400
TcpState::ListenStarted(tokio_socket) => tokio_socket,
401
previous_state => {
402
self.tcp_state = previous_state;
403
return Err(ErrorCode::NotInProgress);
404
}
405
};
406
407
self.listen_common(tokio_socket)
408
}
409
410
/// Start listening using p3 semantics. (with implicit bind)
411
#[cfg(feature = "p3")]
412
pub(crate) fn listen_p3(&mut self) -> Result<(), ErrorCode> {
413
let tokio_socket = match mem::replace(&mut self.tcp_state, TcpState::Closed) {
414
TcpState::Bound(tokio_socket) => tokio_socket,
415
TcpState::Default(tokio_socket) => {
416
// Some platforms automatically perform an implicit bind as part
417
// of the `listen` syscall. However this is not ubiquitous
418
// behavior:
419
// - Linux mentions it in their docs [0] that they perform an
420
// implicit bind. This behavior has been experimentally verified.
421
// - Windows requires a `bind` before `listen`. This is both
422
// documented [1] and experimentally verified.
423
// - Other platforms (e.g. macOS, FreeBSD) do not explicitly
424
// document it either way and instead leave it up to the
425
// individual protocol to decide [2]. However, experiments
426
// show that MacOS in fact _does_ perform an implicit bind.
427
//
428
// To ensure consistent behavior across all platforms, we
429
// perform the implicit bind ourselves here.
430
//
431
// [0]: https://man7.org/linux/man-pages/man7/ip.7.html
432
// > An ephemeral port is allocated to a socket in the following
433
// > circumstances: (...) listen(2) is called on a stream socket
434
// > that was not previously bound;
435
//
436
// [1]: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-listen
437
// > WSAEINVAL: The socket has not been bound with bind.
438
//
439
// [2]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/listen.html
440
// > EDESTADDRREQ: The socket is not bound to a local address,
441
// > and the protocol does not support listening on an unbound
442
// > socket.
443
let implicit_addr = crate::sockets::util::implicit_bind_addr(self.family);
444
tcp_bind(&tokio_socket, implicit_addr)?;
445
tokio_socket
446
}
447
previous_state => {
448
self.tcp_state = previous_state;
449
return Err(ErrorCode::InvalidState);
450
}
451
};
452
453
self.listen_common(tokio_socket)
454
}
455
456
fn listen_common(&mut self, tokio_socket: tokio::net::TcpSocket) -> Result<(), ErrorCode> {
457
match with_ambient_tokio_runtime(|| tokio_socket.listen(self.listen_backlog_size)) {
458
Ok(listener) => {
459
self.tcp_state = TcpState::Listening {
460
listener: Arc::new(listener),
461
pending_accept: None,
462
};
463
Ok(())
464
}
465
Err(err) => {
466
self.tcp_state = TcpState::Closed;
467
468
Err(match Errno::from_io_error(&err) {
469
// See: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-listen#:~:text=WSAEMFILE
470
// According to the docs, `listen` can return EMFILE on Windows.
471
// This is odd, because we're not trying to create a new socket
472
// or file descriptor of any kind. So we rewrite it to less
473
// surprising error code.
474
//
475
// At the time of writing, this behavior has never been experimentally
476
// observed by any of the wasmtime authors, so we're relying fully
477
// on Microsoft's documentation here.
478
#[cfg(windows)]
479
Some(Errno::MFILE) => Errno::NOBUFS.into(),
480
481
_ => err.into(),
482
})
483
}
484
}
485
}
486
487
pub(crate) fn accept(&mut self) -> Result<Option<Self>, ErrorCode> {
488
let TcpState::Listening {
489
listener,
490
pending_accept,
491
} = &mut self.tcp_state
492
else {
493
return Err(ErrorCode::InvalidState);
494
};
495
496
let result = match pending_accept.take() {
497
Some(result) => result,
498
None => {
499
let mut cx = std::task::Context::from_waker(Waker::noop());
500
match with_ambient_tokio_runtime(|| listener.poll_accept(&mut cx))
501
.map_ok(|(stream, _)| stream)
502
{
503
Poll::Ready(result) => result,
504
Poll::Pending => return Ok(None),
505
}
506
}
507
};
508
509
Ok(Some(Self::new_accept(result, &self.options, self.family)?))
510
}
511
512
#[cfg(feature = "p3")]
513
pub(crate) fn start_receive(&mut self) -> Option<&Arc<tokio::net::TcpStream>> {
514
match mem::replace(&mut self.tcp_state, TcpState::Closed) {
515
TcpState::Connected(stream) => {
516
self.tcp_state = TcpState::Receiving(stream);
517
Some(self.tcp_stream_arc().unwrap())
518
}
519
prev => {
520
self.tcp_state = prev;
521
None
522
}
523
}
524
}
525
526
pub(crate) fn local_address(&self) -> Result<SocketAddr, ErrorCode> {
527
match &self.tcp_state {
528
TcpState::Bound(socket) => Ok(socket.local_addr()?),
529
TcpState::Connected(stream) => Ok(stream.local_addr()?),
530
#[cfg(feature = "p3")]
531
TcpState::Receiving(stream) => Ok(stream.local_addr()?),
532
TcpState::P2Streaming(state) => Ok(state.stream.local_addr()?),
533
TcpState::Listening { listener, .. } => Ok(listener.local_addr()?),
534
#[cfg(feature = "p3")]
535
TcpState::Error(err) => Err(err.into()),
536
_ => Err(ErrorCode::InvalidState),
537
}
538
}
539
540
pub(crate) fn remote_address(&self) -> Result<SocketAddr, ErrorCode> {
541
let stream = self.tcp_stream_arc()?;
542
let addr = stream.peer_addr()?;
543
Ok(addr)
544
}
545
546
pub(crate) fn is_listening(&self) -> bool {
547
matches!(self.tcp_state, TcpState::Listening { .. })
548
}
549
550
pub(crate) fn address_family(&self) -> SocketAddressFamily {
551
self.family
552
}
553
554
pub(crate) fn set_listen_backlog_size(&mut self, value: u64) -> Result<(), ErrorCode> {
555
const MIN_BACKLOG: u32 = 1;
556
const MAX_BACKLOG: u32 = i32::MAX as u32; // OS'es will most likely limit it down even further.
557
558
if value == 0 {
559
return Err(ErrorCode::InvalidArgument);
560
}
561
// Silently clamp backlog size. This is OK for us to do, because operating systems do this too.
562
let value = value
563
.try_into()
564
.unwrap_or(MAX_BACKLOG)
565
.clamp(MIN_BACKLOG, MAX_BACKLOG);
566
match &self.tcp_state {
567
TcpState::Default(..) | TcpState::Bound(..) => {
568
// Socket not listening yet. Stash value for first invocation to `listen`.
569
self.listen_backlog_size = value;
570
Ok(())
571
}
572
TcpState::Listening { listener, .. } => {
573
// Try to update the backlog by calling `listen` again.
574
// Not all platforms support this. We'll only update our own value if the OS supports changing the backlog size after the fact.
575
if rustix::net::listen(&listener, value.try_into().unwrap_or(i32::MAX)).is_err() {
576
return Err(ErrorCode::NotSupported);
577
}
578
self.listen_backlog_size = value;
579
Ok(())
580
}
581
#[cfg(feature = "p3")]
582
TcpState::Error(err) => Err(err.into()),
583
_ => Err(ErrorCode::InvalidState),
584
}
585
}
586
587
pub(crate) fn keep_alive_enabled(&self) -> Result<bool, ErrorCode> {
588
let fd = &*self.as_std_view()?;
589
let v = sockopt::socket_keepalive(fd)?;
590
Ok(v)
591
}
592
593
pub(crate) fn set_keep_alive_enabled(&self, value: bool) -> Result<(), ErrorCode> {
594
let fd = &*self.as_std_view()?;
595
sockopt::set_socket_keepalive(fd, value)?;
596
Ok(())
597
}
598
599
pub(crate) fn keep_alive_idle_time(&self) -> Result<u64, ErrorCode> {
600
let fd = &*self.as_std_view()?;
601
let v = sockopt::tcp_keepidle(fd)?;
602
Ok(v.as_nanos().try_into().unwrap_or(u64::MAX))
603
}
604
605
pub(crate) fn set_keep_alive_idle_time(&mut self, value: u64) -> Result<(), ErrorCode> {
606
let value = {
607
let fd = self.as_std_view()?;
608
set_keep_alive_idle_time(&*fd, value)?
609
};
610
self.options.set_keep_alive_idle_time(value);
611
Ok(())
612
}
613
614
pub(crate) fn keep_alive_interval(&self) -> Result<u64, ErrorCode> {
615
let fd = &*self.as_std_view()?;
616
let v = sockopt::tcp_keepintvl(fd)?;
617
Ok(v.as_nanos().try_into().unwrap_or(u64::MAX))
618
}
619
620
pub(crate) fn set_keep_alive_interval(&self, value: u64) -> Result<(), ErrorCode> {
621
let fd = &*self.as_std_view()?;
622
set_keep_alive_interval(fd, Duration::from_nanos(value))?;
623
Ok(())
624
}
625
626
pub(crate) fn keep_alive_count(&self) -> Result<u32, ErrorCode> {
627
let fd = &*self.as_std_view()?;
628
let v = sockopt::tcp_keepcnt(fd)?;
629
Ok(v)
630
}
631
632
pub(crate) fn set_keep_alive_count(&self, value: u32) -> Result<(), ErrorCode> {
633
let fd = &*self.as_std_view()?;
634
set_keep_alive_count(fd, value)?;
635
Ok(())
636
}
637
638
pub(crate) fn hop_limit(&self) -> Result<u8, ErrorCode> {
639
let fd = &*self.as_std_view()?;
640
let n = get_unicast_hop_limit(fd, self.family)?;
641
Ok(n)
642
}
643
644
pub(crate) fn set_hop_limit(&mut self, value: u8) -> Result<(), ErrorCode> {
645
{
646
let fd = &*self.as_std_view()?;
647
set_unicast_hop_limit(fd, self.family, value)?;
648
}
649
self.options.set_hop_limit(value);
650
Ok(())
651
}
652
653
pub(crate) fn receive_buffer_size(&self) -> Result<u64, ErrorCode> {
654
let fd = &*self.as_std_view()?;
655
let n = receive_buffer_size(fd)?;
656
Ok(n)
657
}
658
659
pub(crate) fn set_receive_buffer_size(&mut self, value: u64) -> Result<(), ErrorCode> {
660
let res = {
661
let fd = &*self.as_std_view()?;
662
set_receive_buffer_size(fd, value)?
663
};
664
self.options.set_receive_buffer_size(res);
665
Ok(())
666
}
667
668
pub(crate) fn send_buffer_size(&self) -> Result<u64, ErrorCode> {
669
let fd = &*self.as_std_view()?;
670
let n = send_buffer_size(fd)?;
671
Ok(n)
672
}
673
674
pub(crate) fn set_send_buffer_size(&mut self, value: u64) -> Result<(), ErrorCode> {
675
let res = {
676
let fd = &*self.as_std_view()?;
677
set_send_buffer_size(fd, value)?
678
};
679
self.options.set_send_buffer_size(res);
680
Ok(())
681
}
682
683
#[cfg(feature = "p3")]
684
pub(crate) fn non_inherited_options(&self) -> &NonInheritedOptions {
685
&self.options
686
}
687
688
#[cfg(feature = "p3")]
689
pub(crate) fn tcp_listener_arc(&self) -> Result<&Arc<tokio::net::TcpListener>, ErrorCode> {
690
match &self.tcp_state {
691
TcpState::Listening { listener, .. } => Ok(listener),
692
#[cfg(feature = "p3")]
693
TcpState::Error(err) => Err(err.into()),
694
_ => Err(ErrorCode::InvalidState),
695
}
696
}
697
698
pub(crate) fn tcp_stream_arc(&self) -> Result<&Arc<tokio::net::TcpStream>, ErrorCode> {
699
match &self.tcp_state {
700
TcpState::Connected(socket) => Ok(socket),
701
#[cfg(feature = "p3")]
702
TcpState::Receiving(socket) => Ok(socket),
703
TcpState::P2Streaming(state) => Ok(&state.stream),
704
#[cfg(feature = "p3")]
705
TcpState::Error(err) => Err(err.into()),
706
_ => Err(ErrorCode::InvalidState),
707
}
708
}
709
710
pub(crate) fn p2_streaming_state(&self) -> Result<&P2TcpStreamingState, ErrorCode> {
711
match &self.tcp_state {
712
TcpState::P2Streaming(state) => Ok(state),
713
#[cfg(feature = "p3")]
714
TcpState::Error(err) => Err(err.into()),
715
_ => Err(ErrorCode::InvalidState),
716
}
717
}
718
719
pub(crate) fn set_p2_streaming_state(
720
&mut self,
721
state: P2TcpStreamingState,
722
) -> Result<(), ErrorCode> {
723
if !matches!(self.tcp_state, TcpState::Connected(_)) {
724
return Err(ErrorCode::InvalidState);
725
}
726
self.tcp_state = TcpState::P2Streaming(Box::new(state));
727
Ok(())
728
}
729
730
/// Used for `Pollable` in the WASIp2 implementation this awaits the socket
731
/// to be connected, if in the connecting state, or for a TCP accept to be
732
/// ready, if this is in the listening state.
733
///
734
/// For all other states this method immediately returns.
735
pub(crate) async fn ready(&mut self) {
736
match &mut self.tcp_state {
737
TcpState::Default(..)
738
| TcpState::BindStarted(..)
739
| TcpState::Bound(..)
740
| TcpState::ListenStarted(..)
741
| TcpState::ConnectReady(..)
742
| TcpState::Closed
743
| TcpState::Connected { .. }
744
| TcpState::Connecting(None)
745
| TcpState::Listening {
746
pending_accept: Some(_),
747
..
748
}
749
| TcpState::P2Streaming(_) => {}
750
751
#[cfg(feature = "p3")]
752
TcpState::Receiving(_) | TcpState::Error(_) => {}
753
754
TcpState::Connecting(Some(future)) => {
755
self.tcp_state = TcpState::ConnectReady(future.as_mut().await);
756
}
757
758
TcpState::Listening {
759
listener,
760
pending_accept: slot @ None,
761
} => {
762
let result = futures::future::poll_fn(|cx| {
763
listener.poll_accept(cx).map_ok(|(stream, _)| stream)
764
})
765
.await;
766
*slot = Some(result);
767
}
768
}
769
}
770
}
771
772
#[cfg(not(target_os = "macos"))]
773
pub use inherits_option::*;
774
#[cfg(not(target_os = "macos"))]
775
mod inherits_option {
776
use crate::sockets::SocketAddressFamily;
777
use tokio::net::TcpStream;
778
779
#[derive(Default, Clone)]
780
pub struct NonInheritedOptions;
781
782
impl NonInheritedOptions {
783
pub fn set_keep_alive_idle_time(&mut self, _value: u64) {}
784
785
pub fn set_hop_limit(&mut self, _value: u8) {}
786
787
pub fn set_receive_buffer_size(&mut self, _value: usize) {}
788
789
pub fn set_send_buffer_size(&mut self, _value: usize) {}
790
791
pub(crate) fn apply(&self, _family: SocketAddressFamily, _stream: &TcpStream) {}
792
}
793
}
794
795
#[cfg(target_os = "macos")]
796
pub use does_not_inherit_options::*;
797
#[cfg(target_os = "macos")]
798
mod does_not_inherit_options {
799
use crate::sockets::SocketAddressFamily;
800
use rustix::net::sockopt;
801
use std::sync::Arc;
802
use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering::Relaxed};
803
use std::time::Duration;
804
use tokio::net::TcpStream;
805
806
// The socket options below are not automatically inherited from the listener
807
// on all platforms. So we keep track of which options have been explicitly
808
// set and manually apply those values to newly accepted clients.
809
#[derive(Default, Clone)]
810
pub struct NonInheritedOptions(Arc<Inner>);
811
812
#[derive(Default)]
813
struct Inner {
814
receive_buffer_size: AtomicUsize,
815
send_buffer_size: AtomicUsize,
816
hop_limit: AtomicU8,
817
keep_alive_idle_time: AtomicU64, // nanoseconds
818
}
819
820
impl NonInheritedOptions {
821
pub fn set_keep_alive_idle_time(&mut self, value: u64) {
822
self.0.keep_alive_idle_time.store(value, Relaxed);
823
}
824
825
pub fn set_hop_limit(&mut self, value: u8) {
826
self.0.hop_limit.store(value, Relaxed);
827
}
828
829
pub fn set_receive_buffer_size(&mut self, value: usize) {
830
self.0.receive_buffer_size.store(value, Relaxed);
831
}
832
833
pub fn set_send_buffer_size(&mut self, value: usize) {
834
self.0.send_buffer_size.store(value, Relaxed);
835
}
836
837
pub(crate) fn apply(&self, family: SocketAddressFamily, stream: &TcpStream) {
838
// Manually inherit socket options from listener. We only have to
839
// do this on platforms that don't already do this automatically
840
// and only if a specific value was explicitly set on the listener.
841
842
let receive_buffer_size = self.0.receive_buffer_size.load(Relaxed);
843
if receive_buffer_size > 0 {
844
// Ignore potential error.
845
_ = sockopt::set_socket_recv_buffer_size(&stream, receive_buffer_size);
846
}
847
848
let send_buffer_size = self.0.send_buffer_size.load(Relaxed);
849
if send_buffer_size > 0 {
850
// Ignore potential error.
851
_ = sockopt::set_socket_send_buffer_size(&stream, send_buffer_size);
852
}
853
854
// For some reason, IP_TTL is inherited, but IPV6_UNICAST_HOPS isn't.
855
if family == SocketAddressFamily::Ipv6 {
856
let hop_limit = self.0.hop_limit.load(Relaxed);
857
if hop_limit > 0 {
858
// Ignore potential error.
859
_ = sockopt::set_ipv6_unicast_hops(&stream, Some(hop_limit));
860
}
861
}
862
863
let keep_alive_idle_time = self.0.keep_alive_idle_time.load(Relaxed);
864
if keep_alive_idle_time > 0 {
865
// Ignore potential error.
866
_ = sockopt::set_tcp_keepidle(&stream, Duration::from_nanos(keep_alive_idle_time));
867
}
868
}
869
}
870
}
871
872