Path: blob/main/crates/wasi/src/sockets/tcp.rs
3129 views
use crate::p2::P2TcpStreamingState;1use crate::runtime::with_ambient_tokio_runtime;2use crate::sockets::util::{3ErrorCode, get_unicast_hop_limit, is_valid_address_family, is_valid_remote_address,4is_valid_unicast_address, receive_buffer_size, send_buffer_size, set_keep_alive_count,5set_keep_alive_idle_time, set_keep_alive_interval, set_receive_buffer_size,6set_send_buffer_size, set_unicast_hop_limit, tcp_bind,7};8use crate::sockets::{DEFAULT_TCP_BACKLOG, SocketAddressFamily, WasiSocketsCtx};9use io_lifetimes::AsSocketlike as _;10use io_lifetimes::views::SocketlikeView;11use rustix::io::Errno;12use rustix::net::sockopt;13use std::fmt::Debug;14use std::io;15use std::mem;16use std::net::SocketAddr;17use std::pin::Pin;18use std::sync::Arc;19use std::task::{Context, Poll, Waker};20use std::time::Duration;2122/// The state of a TCP socket.23///24/// This represents the various states a socket can be in during the25/// activities of binding, listening, accepting, and connecting. Note that this26/// state machine encompasses both WASIp2 and WASIp3.27enum TcpState {28/// The initial state for a newly-created socket.29///30/// From here a socket can transition to `BindStarted`, `ListenStarted`, or31/// `Connecting`.32Default(tokio::net::TcpSocket),3334/// A state indicating that a bind has been started and must be finished35/// subsequently with `finish_bind`.36///37/// From here a socket can transition to `Bound`.38BindStarted(tokio::net::TcpSocket),3940/// Binding finished. The socket has an address but is not yet listening for41/// connections.42///43/// From here a socket can transition to `ListenStarted`, or `Connecting`.44Bound(tokio::net::TcpSocket),4546/// Listening on a socket has started and must be completed with47/// `finish_listen`.48///49/// From here a socket can transition to `Listening`.50ListenStarted(tokio::net::TcpSocket),5152/// The socket is now listening and waiting for an incoming connection.53///54/// Sockets will not leave this state.55Listening {56/// The raw tokio-basd TCP listener managing the underlying socket.57listener: Arc<tokio::net::TcpListener>,5859/// The last-accepted connection, set during the `ready` method and read60/// during the `accept` method. Note that this is only used for WASIp261/// at this time.62pending_accept: Option<io::Result<tokio::net::TcpStream>>,63},6465/// An outgoing connection is started.66///67/// This is created via the `start_connect` method. The payload here is an68/// optionally-specified owned future for the result of the connect. In69/// WASIp2 the future lives here, but in WASIp3 it lives on the event loop70/// so this is `None`.71///72/// From here a socket can transition to `ConnectReady` or `Connected`.73Connecting(Option<Pin<Box<dyn Future<Output = io::Result<tokio::net::TcpStream>> + Send>>>),7475/// A connection via `Connecting` has completed.76///77/// This is present for WASIp2 where the `Connecting` state stores `Some` of78/// a future, and the result of that future is recorded here when it79/// finishes as part of the `ready` method.80///81/// From here a socket can transition to `Connected`.82ConnectReady(io::Result<tokio::net::TcpStream>),8384/// A connection has been established.85///86/// This is created either via `finish_connect` or for freshly accepted87/// sockets from a TCP listener.88///89/// From here a socket can transition to `Receiving` or `P2Streaming`.90Connected(Arc<tokio::net::TcpStream>),9192/// A connection has been established and `receive` has been called.93///94/// A socket will not transition out of this state.95#[cfg(feature = "p3")]96Receiving(Arc<tokio::net::TcpStream>),9798/// This is a WASIp2-bound socket which stores some extra state for99/// read/write streams to handle TCP shutdown.100///101/// A socket will not transition out of this state.102P2Streaming(Box<P2TcpStreamingState>),103104/// This is not actually a socket but a deferred error.105///106/// This error came out of `accept` and is deferred until the socket is107/// operated on.108#[cfg(feature = "p3")]109Error(io::Error),110111/// The socket is closed and no more operations can be performed.112Closed,113}114115impl Debug for TcpState {116fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {117match self {118Self::Default(_) => f.debug_tuple("Default").finish(),119Self::BindStarted(_) => f.debug_tuple("BindStarted").finish(),120Self::Bound(_) => f.debug_tuple("Bound").finish(),121Self::ListenStarted { .. } => f.debug_tuple("ListenStarted").finish(),122Self::Listening { .. } => f.debug_tuple("Listening").finish(),123Self::Connecting(..) => f.debug_tuple("Connecting").finish(),124Self::ConnectReady(..) => f.debug_tuple("ConnectReady").finish(),125Self::Connected { .. } => f.debug_tuple("Connected").finish(),126#[cfg(feature = "p3")]127Self::Receiving { .. } => f.debug_tuple("Receiving").finish(),128Self::P2Streaming(_) => f.debug_tuple("P2Streaming").finish(),129#[cfg(feature = "p3")]130Self::Error(..) => f.debug_tuple("Error").finish(),131Self::Closed => write!(f, "Closed"),132}133}134}135136/// A host TCP socket, plus associated bookkeeping.137pub struct TcpSocket {138/// The current state in the bind/listen/accept/connect progression.139tcp_state: TcpState,140141/// The desired listen queue size.142listen_backlog_size: u32,143144family: SocketAddressFamily,145146options: NonInheritedOptions,147}148149impl TcpSocket {150/// Create a new socket in the given family.151pub(crate) fn new(152ctx: &WasiSocketsCtx,153family: SocketAddressFamily,154) -> Result<Self, ErrorCode> {155ctx.allowed_network_uses.check_allowed_tcp()?;156157with_ambient_tokio_runtime(|| {158let socket = match family {159SocketAddressFamily::Ipv4 => tokio::net::TcpSocket::new_v4()?,160SocketAddressFamily::Ipv6 => {161let socket = tokio::net::TcpSocket::new_v6()?;162sockopt::set_ipv6_v6only(&socket, true)?;163socket164}165};166167Ok(Self::from_state(TcpState::Default(socket), family))168})169}170171#[cfg(feature = "p3")]172pub(crate) fn new_error(err: io::Error, family: SocketAddressFamily) -> Self {173TcpSocket::from_state(TcpState::Error(err), family)174}175176/// Creates a new socket with the `result` of an accepted socket from a177/// `TcpListener`.178///179/// This will handle the `result` internally and `result` should be the raw180/// result from a TCP listen operation.181pub(crate) fn new_accept(182result: io::Result<tokio::net::TcpStream>,183options: &NonInheritedOptions,184family: SocketAddressFamily,185) -> io::Result<Self> {186let client = result.map_err(|err| match Errno::from_io_error(&err) {187// From: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-accept#:~:text=WSAEINPROGRESS188// > WSAEINPROGRESS: A blocking Windows Sockets 1.1 call is in progress,189// > or the service provider is still processing a callback function.190//191// wasi-sockets doesn't have an equivalent to the EINPROGRESS error,192// because in POSIX this error is only returned by a non-blocking193// `connect` and wasi-sockets has a different solution for that.194#[cfg(windows)]195Some(Errno::INPROGRESS) => Errno::INTR.into(),196197// Normalize Linux' non-standard behavior.198//199// From https://man7.org/linux/man-pages/man2/accept.2.html:200// > Linux accept() passes already-pending network errors on the201// > new socket as an error code from accept(). This behavior202// > differs from other BSD socket implementations. (...)203#[cfg(target_os = "linux")]204Some(205Errno::CONNRESET206| Errno::NETRESET207| Errno::HOSTUNREACH208| Errno::HOSTDOWN209| Errno::NETDOWN210| Errno::NETUNREACH211| Errno::PROTO212| Errno::NOPROTOOPT213| Errno::NONET214| Errno::OPNOTSUPP,215) => Errno::CONNABORTED.into(),216217_ => err,218})?;219options.apply(family, &client);220Ok(Self::from_state(221TcpState::Connected(Arc::new(client)),222family,223))224}225226/// Create a `TcpSocket` from an existing socket.227fn from_state(state: TcpState, family: SocketAddressFamily) -> Self {228Self {229tcp_state: state,230listen_backlog_size: DEFAULT_TCP_BACKLOG,231family,232options: Default::default(),233}234}235236pub(crate) fn as_std_view(&self) -> Result<SocketlikeView<'_, std::net::TcpStream>, ErrorCode> {237match &self.tcp_state {238TcpState::Default(socket)239| TcpState::BindStarted(socket)240| TcpState::Bound(socket)241| TcpState::ListenStarted(socket) => Ok(socket.as_socketlike_view()),242TcpState::Connected(stream) => Ok(stream.as_socketlike_view()),243#[cfg(feature = "p3")]244TcpState::Receiving(stream) => Ok(stream.as_socketlike_view()),245TcpState::Listening { listener, .. } => Ok(listener.as_socketlike_view()),246TcpState::P2Streaming(state) => Ok(state.stream.as_socketlike_view()),247TcpState::Connecting(..) | TcpState::ConnectReady(_) | TcpState::Closed => {248Err(ErrorCode::InvalidState)249}250#[cfg(feature = "p3")]251TcpState::Error(err) => Err(err.into()),252}253}254255pub(crate) fn start_bind(&mut self, addr: SocketAddr) -> Result<(), ErrorCode> {256let ip = addr.ip();257if !is_valid_unicast_address(ip) || !is_valid_address_family(ip, self.family) {258return Err(ErrorCode::InvalidArgument);259}260match mem::replace(&mut self.tcp_state, TcpState::Closed) {261TcpState::Default(sock) => {262if let Err(err) = tcp_bind(&sock, addr) {263self.tcp_state = TcpState::Default(sock);264Err(err)265} else {266self.tcp_state = TcpState::BindStarted(sock);267Ok(())268}269}270tcp_state => {271self.tcp_state = tcp_state;272Err(ErrorCode::InvalidState)273}274}275}276277pub(crate) fn finish_bind(&mut self) -> Result<(), ErrorCode> {278match mem::replace(&mut self.tcp_state, TcpState::Closed) {279TcpState::BindStarted(socket) => {280self.tcp_state = TcpState::Bound(socket);281Ok(())282}283current_state => {284// Reset the state so that the outside world doesn't see this socket as closed285self.tcp_state = current_state;286Err(ErrorCode::NotInProgress)287}288}289}290291pub(crate) fn start_connect(292&mut self,293addr: &SocketAddr,294) -> Result<tokio::net::TcpSocket, ErrorCode> {295match self.tcp_state {296TcpState::Default(..) | TcpState::Bound(..) => {}297TcpState::Connecting(..) => {298return Err(ErrorCode::ConcurrencyConflict);299}300_ => return Err(ErrorCode::InvalidState),301};302303if !is_valid_unicast_address(addr.ip())304|| !is_valid_remote_address(*addr)305|| !is_valid_address_family(addr.ip(), self.family)306{307return Err(ErrorCode::InvalidArgument);308};309310let (TcpState::Default(tokio_socket) | TcpState::Bound(tokio_socket)) =311mem::replace(&mut self.tcp_state, TcpState::Connecting(None))312else {313unreachable!();314};315316Ok(tokio_socket)317}318319/// For WASIp2 this is used to record the actual connection future as part320/// of `start_connect` within this socket state.321pub(crate) fn set_pending_connect(322&mut self,323future: impl Future<Output = io::Result<tokio::net::TcpStream>> + Send + 'static,324) -> Result<(), ErrorCode> {325match &mut self.tcp_state {326TcpState::Connecting(slot @ None) => {327*slot = Some(Box::pin(future));328Ok(())329}330_ => Err(ErrorCode::InvalidState),331}332}333334/// For WASIp2 this retrieves the result from the future passed to335/// `set_pending_connect`.336///337/// Return states here are:338///339/// * `Ok(Some(res))` - where `res` is the result of the connect operation.340/// * `Ok(None)` - the connect operation isn't ready yet.341/// * `Err(e)` - a connect operation is not in progress.342pub(crate) fn take_pending_connect(343&mut self,344) -> Result<Option<io::Result<tokio::net::TcpStream>>, ErrorCode> {345match mem::replace(&mut self.tcp_state, TcpState::Connecting(None)) {346TcpState::ConnectReady(result) => Ok(Some(result)),347TcpState::Connecting(Some(mut future)) => {348let mut cx = Context::from_waker(Waker::noop());349match with_ambient_tokio_runtime(|| future.as_mut().poll(&mut cx)) {350Poll::Ready(result) => Ok(Some(result)),351Poll::Pending => {352self.tcp_state = TcpState::Connecting(Some(future));353Ok(None)354}355}356}357current_state => {358self.tcp_state = current_state;359Err(ErrorCode::NotInProgress)360}361}362}363364pub(crate) fn finish_connect(365&mut self,366result: io::Result<tokio::net::TcpStream>,367) -> Result<(), ErrorCode> {368if !matches!(self.tcp_state, TcpState::Connecting(None)) {369return Err(ErrorCode::InvalidState);370}371match result {372Ok(stream) => {373self.tcp_state = TcpState::Connected(Arc::new(stream));374Ok(())375}376Err(err) => {377self.tcp_state = TcpState::Closed;378Err(ErrorCode::from(err))379}380}381}382383/// Start listening using p2 semantics. (no implicit bind)384pub(crate) fn start_listen_p2(&mut self) -> Result<(), ErrorCode> {385match mem::replace(&mut self.tcp_state, TcpState::Closed) {386TcpState::Bound(tokio_socket) => {387self.tcp_state = TcpState::ListenStarted(tokio_socket);388Ok(())389}390previous_state => {391self.tcp_state = previous_state;392Err(ErrorCode::InvalidState)393}394}395}396397pub(crate) fn finish_listen_p2(&mut self) -> Result<(), ErrorCode> {398let tokio_socket = match mem::replace(&mut self.tcp_state, TcpState::Closed) {399TcpState::ListenStarted(tokio_socket) => tokio_socket,400previous_state => {401self.tcp_state = previous_state;402return Err(ErrorCode::NotInProgress);403}404};405406self.listen_common(tokio_socket)407}408409/// Start listening using p3 semantics. (with implicit bind)410#[cfg(feature = "p3")]411pub(crate) fn listen_p3(&mut self) -> Result<(), ErrorCode> {412let tokio_socket = match mem::replace(&mut self.tcp_state, TcpState::Closed) {413TcpState::Bound(tokio_socket) => tokio_socket,414TcpState::Default(tokio_socket) => {415// Some platforms automatically perform an implicit bind as part416// of the `listen` syscall. However this is not ubiquitous417// behavior:418// - Linux mentions it in their docs [0] that they perform an419// implicit bind. This behavior has been experimentally verified.420// - Windows requires a `bind` before `listen`. This is both421// documented [1] and experimentally verified.422// - Other platforms (e.g. macOS, FreeBSD) do not explicitly423// document it either way and instead leave it up to the424// individual protocol to decide [2]. However, experiments425// show that MacOS in fact _does_ perform an implicit bind.426//427// To ensure consistent behavior across all platforms, we428// perform the implicit bind ourselves here.429//430// [0]: https://man7.org/linux/man-pages/man7/ip.7.html431// > An ephemeral port is allocated to a socket in the following432// > circumstances: (...) listen(2) is called on a stream socket433// > that was not previously bound;434//435// [1]: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-listen436// > WSAEINVAL: The socket has not been bound with bind.437//438// [2]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/listen.html439// > EDESTADDRREQ: The socket is not bound to a local address,440// > and the protocol does not support listening on an unbound441// > socket.442let implicit_addr = crate::sockets::util::implicit_bind_addr(self.family);443tcp_bind(&tokio_socket, implicit_addr)?;444tokio_socket445}446previous_state => {447self.tcp_state = previous_state;448return Err(ErrorCode::InvalidState);449}450};451452self.listen_common(tokio_socket)453}454455fn listen_common(&mut self, tokio_socket: tokio::net::TcpSocket) -> Result<(), ErrorCode> {456match with_ambient_tokio_runtime(|| tokio_socket.listen(self.listen_backlog_size)) {457Ok(listener) => {458self.tcp_state = TcpState::Listening {459listener: Arc::new(listener),460pending_accept: None,461};462Ok(())463}464Err(err) => {465self.tcp_state = TcpState::Closed;466467Err(match Errno::from_io_error(&err) {468// See: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-listen#:~:text=WSAEMFILE469// According to the docs, `listen` can return EMFILE on Windows.470// This is odd, because we're not trying to create a new socket471// or file descriptor of any kind. So we rewrite it to less472// surprising error code.473//474// At the time of writing, this behavior has never been experimentally475// observed by any of the wasmtime authors, so we're relying fully476// on Microsoft's documentation here.477#[cfg(windows)]478Some(Errno::MFILE) => Errno::NOBUFS.into(),479480_ => err.into(),481})482}483}484}485486pub(crate) fn accept(&mut self) -> Result<Option<Self>, ErrorCode> {487let TcpState::Listening {488listener,489pending_accept,490} = &mut self.tcp_state491else {492return Err(ErrorCode::InvalidState);493};494495let result = match pending_accept.take() {496Some(result) => result,497None => {498let mut cx = std::task::Context::from_waker(Waker::noop());499match with_ambient_tokio_runtime(|| listener.poll_accept(&mut cx))500.map_ok(|(stream, _)| stream)501{502Poll::Ready(result) => result,503Poll::Pending => return Ok(None),504}505}506};507508Ok(Some(Self::new_accept(result, &self.options, self.family)?))509}510511#[cfg(feature = "p3")]512pub(crate) fn start_receive(&mut self) -> Option<&Arc<tokio::net::TcpStream>> {513match mem::replace(&mut self.tcp_state, TcpState::Closed) {514TcpState::Connected(stream) => {515self.tcp_state = TcpState::Receiving(stream);516Some(self.tcp_stream_arc().unwrap())517}518prev => {519self.tcp_state = prev;520None521}522}523}524525pub(crate) fn local_address(&self) -> Result<SocketAddr, ErrorCode> {526match &self.tcp_state {527TcpState::Bound(socket) => Ok(socket.local_addr()?),528TcpState::Connected(stream) => Ok(stream.local_addr()?),529#[cfg(feature = "p3")]530TcpState::Receiving(stream) => Ok(stream.local_addr()?),531TcpState::P2Streaming(state) => Ok(state.stream.local_addr()?),532TcpState::Listening { listener, .. } => Ok(listener.local_addr()?),533#[cfg(feature = "p3")]534TcpState::Error(err) => Err(err.into()),535_ => Err(ErrorCode::InvalidState),536}537}538539pub(crate) fn remote_address(&self) -> Result<SocketAddr, ErrorCode> {540let stream = self.tcp_stream_arc()?;541let addr = stream.peer_addr()?;542Ok(addr)543}544545pub(crate) fn is_listening(&self) -> bool {546matches!(self.tcp_state, TcpState::Listening { .. })547}548549pub(crate) fn address_family(&self) -> SocketAddressFamily {550self.family551}552553pub(crate) fn set_listen_backlog_size(&mut self, value: u64) -> Result<(), ErrorCode> {554const MIN_BACKLOG: u32 = 1;555const MAX_BACKLOG: u32 = i32::MAX as u32; // OS'es will most likely limit it down even further.556557if value == 0 {558return Err(ErrorCode::InvalidArgument);559}560// Silently clamp backlog size. This is OK for us to do, because operating systems do this too.561let value = value562.try_into()563.unwrap_or(MAX_BACKLOG)564.clamp(MIN_BACKLOG, MAX_BACKLOG);565match &self.tcp_state {566TcpState::Default(..) | TcpState::Bound(..) => {567// Socket not listening yet. Stash value for first invocation to `listen`.568self.listen_backlog_size = value;569Ok(())570}571TcpState::Listening { listener, .. } => {572// Try to update the backlog by calling `listen` again.573// Not all platforms support this. We'll only update our own value if the OS supports changing the backlog size after the fact.574if rustix::net::listen(&listener, value.try_into().unwrap_or(i32::MAX)).is_err() {575return Err(ErrorCode::NotSupported);576}577self.listen_backlog_size = value;578Ok(())579}580#[cfg(feature = "p3")]581TcpState::Error(err) => Err(err.into()),582_ => Err(ErrorCode::InvalidState),583}584}585586pub(crate) fn keep_alive_enabled(&self) -> Result<bool, ErrorCode> {587let fd = &*self.as_std_view()?;588let v = sockopt::socket_keepalive(fd)?;589Ok(v)590}591592pub(crate) fn set_keep_alive_enabled(&self, value: bool) -> Result<(), ErrorCode> {593let fd = &*self.as_std_view()?;594sockopt::set_socket_keepalive(fd, value)?;595Ok(())596}597598pub(crate) fn keep_alive_idle_time(&self) -> Result<u64, ErrorCode> {599let fd = &*self.as_std_view()?;600let v = sockopt::tcp_keepidle(fd)?;601Ok(v.as_nanos().try_into().unwrap_or(u64::MAX))602}603604pub(crate) fn set_keep_alive_idle_time(&mut self, value: u64) -> Result<(), ErrorCode> {605let value = {606let fd = self.as_std_view()?;607set_keep_alive_idle_time(&*fd, value)?608};609self.options.set_keep_alive_idle_time(value);610Ok(())611}612613pub(crate) fn keep_alive_interval(&self) -> Result<u64, ErrorCode> {614let fd = &*self.as_std_view()?;615let v = sockopt::tcp_keepintvl(fd)?;616Ok(v.as_nanos().try_into().unwrap_or(u64::MAX))617}618619pub(crate) fn set_keep_alive_interval(&self, value: u64) -> Result<(), ErrorCode> {620let fd = &*self.as_std_view()?;621set_keep_alive_interval(fd, Duration::from_nanos(value))?;622Ok(())623}624625pub(crate) fn keep_alive_count(&self) -> Result<u32, ErrorCode> {626let fd = &*self.as_std_view()?;627let v = sockopt::tcp_keepcnt(fd)?;628Ok(v)629}630631pub(crate) fn set_keep_alive_count(&self, value: u32) -> Result<(), ErrorCode> {632let fd = &*self.as_std_view()?;633set_keep_alive_count(fd, value)?;634Ok(())635}636637pub(crate) fn hop_limit(&self) -> Result<u8, ErrorCode> {638let fd = &*self.as_std_view()?;639let n = get_unicast_hop_limit(fd, self.family)?;640Ok(n)641}642643pub(crate) fn set_hop_limit(&mut self, value: u8) -> Result<(), ErrorCode> {644{645let fd = &*self.as_std_view()?;646set_unicast_hop_limit(fd, self.family, value)?;647}648self.options.set_hop_limit(value);649Ok(())650}651652pub(crate) fn receive_buffer_size(&self) -> Result<u64, ErrorCode> {653let fd = &*self.as_std_view()?;654let n = receive_buffer_size(fd)?;655Ok(n)656}657658pub(crate) fn set_receive_buffer_size(&mut self, value: u64) -> Result<(), ErrorCode> {659let res = {660let fd = &*self.as_std_view()?;661set_receive_buffer_size(fd, value)?662};663self.options.set_receive_buffer_size(res);664Ok(())665}666667pub(crate) fn send_buffer_size(&self) -> Result<u64, ErrorCode> {668let fd = &*self.as_std_view()?;669let n = send_buffer_size(fd)?;670Ok(n)671}672673pub(crate) fn set_send_buffer_size(&mut self, value: u64) -> Result<(), ErrorCode> {674let res = {675let fd = &*self.as_std_view()?;676set_send_buffer_size(fd, value)?677};678self.options.set_send_buffer_size(res);679Ok(())680}681682#[cfg(feature = "p3")]683pub(crate) fn non_inherited_options(&self) -> &NonInheritedOptions {684&self.options685}686687#[cfg(feature = "p3")]688pub(crate) fn tcp_listener_arc(&self) -> Result<&Arc<tokio::net::TcpListener>, ErrorCode> {689match &self.tcp_state {690TcpState::Listening { listener, .. } => Ok(listener),691#[cfg(feature = "p3")]692TcpState::Error(err) => Err(err.into()),693_ => Err(ErrorCode::InvalidState),694}695}696697pub(crate) fn tcp_stream_arc(&self) -> Result<&Arc<tokio::net::TcpStream>, ErrorCode> {698match &self.tcp_state {699TcpState::Connected(socket) => Ok(socket),700#[cfg(feature = "p3")]701TcpState::Receiving(socket) => Ok(socket),702TcpState::P2Streaming(state) => Ok(&state.stream),703#[cfg(feature = "p3")]704TcpState::Error(err) => Err(err.into()),705_ => Err(ErrorCode::InvalidState),706}707}708709pub(crate) fn p2_streaming_state(&self) -> Result<&P2TcpStreamingState, ErrorCode> {710match &self.tcp_state {711TcpState::P2Streaming(state) => Ok(state),712#[cfg(feature = "p3")]713TcpState::Error(err) => Err(err.into()),714_ => Err(ErrorCode::InvalidState),715}716}717718pub(crate) fn set_p2_streaming_state(719&mut self,720state: P2TcpStreamingState,721) -> Result<(), ErrorCode> {722if !matches!(self.tcp_state, TcpState::Connected(_)) {723return Err(ErrorCode::InvalidState);724}725self.tcp_state = TcpState::P2Streaming(Box::new(state));726Ok(())727}728729/// Used for `Pollable` in the WASIp2 implementation this awaits the socket730/// to be connected, if in the connecting state, or for a TCP accept to be731/// ready, if this is in the listening state.732///733/// For all other states this method immediately returns.734pub(crate) async fn ready(&mut self) {735match &mut self.tcp_state {736TcpState::Default(..)737| TcpState::BindStarted(..)738| TcpState::Bound(..)739| TcpState::ListenStarted(..)740| TcpState::ConnectReady(..)741| TcpState::Closed742| TcpState::Connected { .. }743| TcpState::Connecting(None)744| TcpState::Listening {745pending_accept: Some(_),746..747}748| TcpState::P2Streaming(_) => {}749750#[cfg(feature = "p3")]751TcpState::Receiving(_) | TcpState::Error(_) => {}752753TcpState::Connecting(Some(future)) => {754self.tcp_state = TcpState::ConnectReady(future.as_mut().await);755}756757TcpState::Listening {758listener,759pending_accept: slot @ None,760} => {761let result = futures::future::poll_fn(|cx| {762listener.poll_accept(cx).map_ok(|(stream, _)| stream)763})764.await;765*slot = Some(result);766}767}768}769}770771#[cfg(not(target_os = "macos"))]772pub use inherits_option::*;773#[cfg(not(target_os = "macos"))]774mod inherits_option {775use crate::sockets::SocketAddressFamily;776use tokio::net::TcpStream;777778#[derive(Default, Clone)]779pub struct NonInheritedOptions;780781impl NonInheritedOptions {782pub fn set_keep_alive_idle_time(&mut self, _value: u64) {}783784pub fn set_hop_limit(&mut self, _value: u8) {}785786pub fn set_receive_buffer_size(&mut self, _value: usize) {}787788pub fn set_send_buffer_size(&mut self, _value: usize) {}789790pub(crate) fn apply(&self, _family: SocketAddressFamily, _stream: &TcpStream) {}791}792}793794#[cfg(target_os = "macos")]795pub use does_not_inherit_options::*;796#[cfg(target_os = "macos")]797mod does_not_inherit_options {798use crate::sockets::SocketAddressFamily;799use rustix::net::sockopt;800use std::sync::Arc;801use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering::Relaxed};802use std::time::Duration;803use tokio::net::TcpStream;804805// The socket options below are not automatically inherited from the listener806// on all platforms. So we keep track of which options have been explicitly807// set and manually apply those values to newly accepted clients.808#[derive(Default, Clone)]809pub struct NonInheritedOptions(Arc<Inner>);810811#[derive(Default)]812struct Inner {813receive_buffer_size: AtomicUsize,814send_buffer_size: AtomicUsize,815hop_limit: AtomicU8,816keep_alive_idle_time: AtomicU64, // nanoseconds817}818819impl NonInheritedOptions {820pub fn set_keep_alive_idle_time(&mut self, value: u64) {821self.0.keep_alive_idle_time.store(value, Relaxed);822}823824pub fn set_hop_limit(&mut self, value: u8) {825self.0.hop_limit.store(value, Relaxed);826}827828pub fn set_receive_buffer_size(&mut self, value: usize) {829self.0.receive_buffer_size.store(value, Relaxed);830}831832pub fn set_send_buffer_size(&mut self, value: usize) {833self.0.send_buffer_size.store(value, Relaxed);834}835836pub(crate) fn apply(&self, family: SocketAddressFamily, stream: &TcpStream) {837// Manually inherit socket options from listener. We only have to838// do this on platforms that don't already do this automatically839// and only if a specific value was explicitly set on the listener.840841let receive_buffer_size = self.0.receive_buffer_size.load(Relaxed);842if receive_buffer_size > 0 {843// Ignore potential error.844_ = sockopt::set_socket_recv_buffer_size(&stream, receive_buffer_size);845}846847let send_buffer_size = self.0.send_buffer_size.load(Relaxed);848if send_buffer_size > 0 {849// Ignore potential error.850_ = sockopt::set_socket_send_buffer_size(&stream, send_buffer_size);851}852853// For some reason, IP_TTL is inherited, but IPV6_UNICAST_HOPS isn't.854if family == SocketAddressFamily::Ipv6 {855let hop_limit = self.0.hop_limit.load(Relaxed);856if hop_limit > 0 {857// Ignore potential error.858_ = sockopt::set_ipv6_unicast_hops(&stream, Some(hop_limit));859}860}861862let keep_alive_idle_time = self.0.keep_alive_idle_time.load(Relaxed);863if keep_alive_idle_time > 0 {864// Ignore potential error.865_ = sockopt::set_tcp_keepidle(&stream, Duration::from_nanos(keep_alive_idle_time));866}867}868}869}870871872