Path: blob/main/crates/wasi/src/sockets/tcp.rs
1692 views
use crate::p2::P2TcpStreamingState;1use crate::runtime::with_ambient_tokio_runtime;2use crate::sockets::util::{3ErrorCode, get_unicast_hop_limit, is_valid_address_family, is_valid_remote_address,4is_valid_unicast_address, receive_buffer_size, send_buffer_size, set_keep_alive_count,5set_keep_alive_idle_time, set_keep_alive_interval, set_receive_buffer_size,6set_send_buffer_size, set_unicast_hop_limit, tcp_bind,7};8use crate::sockets::{DEFAULT_TCP_BACKLOG, SocketAddressFamily, WasiSocketsCtx};9use io_lifetimes::AsSocketlike as _;10use io_lifetimes::views::SocketlikeView;11use rustix::io::Errno;12use rustix::net::sockopt;13use std::fmt::Debug;14use std::io;15use std::mem;16use std::net::SocketAddr;17use std::pin::Pin;18use std::sync::Arc;19use std::task::{Context, Poll, Waker};20use std::time::Duration;2122/// The state of a TCP socket.23///24/// This represents the various states a socket can be in during the25/// activities of binding, listening, accepting, and connecting. Note that this26/// state machine encompasses both WASIp2 and WASIp3.27enum TcpState {28/// The initial state for a newly-created socket.29///30/// From here a socket can transition to `BindStarted`, `ListenStarted`, or31/// `Connecting`.32Default(tokio::net::TcpSocket),3334/// A state indicating that a bind has been started and must be finished35/// subsequently with `finish_bind`.36///37/// From here a socket can transition to `Bound`.38BindStarted(tokio::net::TcpSocket),3940/// Binding finished. The socket has an address but is not yet listening for41/// connections.42///43/// From here a socket can transition to `ListenStarted`, or `Connecting`.44Bound(tokio::net::TcpSocket),4546/// Listening on a socket has started and must be completed with47/// `finish_listen`.48///49/// From here a socket can transition to `Listening`.50ListenStarted(tokio::net::TcpSocket),5152/// The socket is now listening and waiting for an incoming connection.53///54/// Sockets will not leave this state.55Listening {56/// The raw tokio-basd TCP listener managing the underyling socket.57listener: Arc<tokio::net::TcpListener>,5859/// The last-accepted connection, set during the `ready` method and read60/// during the `accept` method. Note that this is only used for WASIp261/// at this time.62pending_accept: Option<io::Result<tokio::net::TcpStream>>,63},6465/// An outgoing connection is started.66///67/// This is created via the `start_connect` method. The payload here is an68/// optionally-specified owned future for the result of the connect. In69/// WASIp2 the future lives here, but in WASIp3 it lives on the event loop70/// so this is `None`.71///72/// From here a socket can transition to `ConnectReady` or `Connected`.73Connecting(Option<Pin<Box<dyn Future<Output = io::Result<tokio::net::TcpStream>> + Send>>>),7475/// A connection via `Connecting` has completed.76///77/// This is present for WASIp2 where the `Connecting` state stores `Some` of78/// a future, and the result of that future is recorded here when it79/// finishes as part of the `ready` method.80///81/// From here a socket can transition to `Connected`.82ConnectReady(io::Result<tokio::net::TcpStream>),8384/// A connection has been established.85///86/// This is created either via `finish_connect` or for freshly accepted87/// sockets from a TCP listener.88///89/// From here a socket can transition to `Receiving` or `P2Streaming`.90Connected(Arc<tokio::net::TcpStream>),9192/// A connection has been established and `receive` has been called.93///94/// A socket will not transition out of this state.95#[cfg(feature = "p3")]96Receiving(Arc<tokio::net::TcpStream>),9798/// This is a WASIp2-bound socket which stores some extra state for99/// read/write streams to handle TCP shutdown.100///101/// A socket will not transition out of this state.102P2Streaming(Box<P2TcpStreamingState>),103104/// This is not actually a socket but a deferred error.105///106/// This error came out of `accept` and is deferred until the socket is107/// operated on.108#[cfg(feature = "p3")]109Error(io::Error),110111/// The socket is closed and no more operations can be performed.112Closed,113}114115impl Debug for TcpState {116fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {117match self {118Self::Default(_) => f.debug_tuple("Default").finish(),119Self::BindStarted(_) => f.debug_tuple("BindStarted").finish(),120Self::Bound(_) => f.debug_tuple("Bound").finish(),121Self::ListenStarted { .. } => f.debug_tuple("ListenStarted").finish(),122Self::Listening { .. } => f.debug_tuple("Listening").finish(),123Self::Connecting(..) => f.debug_tuple("Connecting").finish(),124Self::ConnectReady(..) => f.debug_tuple("ConnectReady").finish(),125Self::Connected { .. } => f.debug_tuple("Connected").finish(),126#[cfg(feature = "p3")]127Self::Receiving { .. } => f.debug_tuple("Receiving").finish(),128Self::P2Streaming(_) => f.debug_tuple("P2Streaming").finish(),129#[cfg(feature = "p3")]130Self::Error(..) => f.debug_tuple("Error").finish(),131Self::Closed => write!(f, "Closed"),132}133}134}135136/// A host TCP socket, plus associated bookkeeping.137pub struct TcpSocket {138/// The current state in the bind/listen/accept/connect progression.139tcp_state: TcpState,140141/// The desired listen queue size.142listen_backlog_size: u32,143144family: SocketAddressFamily,145146options: NonInheritedOptions,147}148149impl TcpSocket {150/// Create a new socket in the given family.151pub(crate) fn new(152ctx: &WasiSocketsCtx,153family: SocketAddressFamily,154) -> Result<Self, ErrorCode> {155ctx.allowed_network_uses.check_allowed_tcp()?;156157with_ambient_tokio_runtime(|| {158let socket = match family {159SocketAddressFamily::Ipv4 => tokio::net::TcpSocket::new_v4()?,160SocketAddressFamily::Ipv6 => {161let socket = tokio::net::TcpSocket::new_v6()?;162sockopt::set_ipv6_v6only(&socket, true)?;163socket164}165};166167Ok(Self::from_state(TcpState::Default(socket), family))168})169}170171#[cfg(feature = "p3")]172pub(crate) fn new_error(err: io::Error, family: SocketAddressFamily) -> Self {173TcpSocket::from_state(TcpState::Error(err), family)174}175176/// Creates a new socket with the `result` of an accepted socket from a177/// `TcpListener`.178///179/// This will handle the `result` internally and `result` should be the raw180/// result from a TCP listen operation.181pub(crate) fn new_accept(182result: io::Result<tokio::net::TcpStream>,183options: &NonInheritedOptions,184family: SocketAddressFamily,185) -> io::Result<Self> {186let client = result.map_err(|err| match Errno::from_io_error(&err) {187// From: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-accept#:~:text=WSAEINPROGRESS188// > WSAEINPROGRESS: A blocking Windows Sockets 1.1 call is in progress,189// > or the service provider is still processing a callback function.190//191// wasi-sockets doesn't have an equivalent to the EINPROGRESS error,192// because in POSIX this error is only returned by a non-blocking193// `connect` and wasi-sockets has a different solution for that.194#[cfg(windows)]195Some(Errno::INPROGRESS) => Errno::INTR.into(),196197// Normalize Linux' non-standard behavior.198//199// From https://man7.org/linux/man-pages/man2/accept.2.html:200// > Linux accept() passes already-pending network errors on the201// > new socket as an error code from accept(). This behavior202// > differs from other BSD socket implementations. (...)203#[cfg(target_os = "linux")]204Some(205Errno::CONNRESET206| Errno::NETRESET207| Errno::HOSTUNREACH208| Errno::HOSTDOWN209| Errno::NETDOWN210| Errno::NETUNREACH211| Errno::PROTO212| Errno::NOPROTOOPT213| Errno::NONET214| Errno::OPNOTSUPP,215) => Errno::CONNABORTED.into(),216217_ => err,218})?;219options.apply(family, &client);220Ok(Self::from_state(221TcpState::Connected(Arc::new(client)),222family,223))224}225226/// Create a `TcpSocket` from an existing socket.227fn from_state(state: TcpState, family: SocketAddressFamily) -> Self {228Self {229tcp_state: state,230listen_backlog_size: DEFAULT_TCP_BACKLOG,231family,232options: Default::default(),233}234}235236pub(crate) fn as_std_view(&self) -> Result<SocketlikeView<'_, std::net::TcpStream>, ErrorCode> {237match &self.tcp_state {238TcpState::Default(socket)239| TcpState::BindStarted(socket)240| TcpState::Bound(socket)241| TcpState::ListenStarted(socket) => Ok(socket.as_socketlike_view()),242TcpState::Connected(stream) => Ok(stream.as_socketlike_view()),243#[cfg(feature = "p3")]244TcpState::Receiving(stream) => Ok(stream.as_socketlike_view()),245TcpState::Listening { listener, .. } => Ok(listener.as_socketlike_view()),246TcpState::P2Streaming(state) => Ok(state.stream.as_socketlike_view()),247TcpState::Connecting(..) | TcpState::ConnectReady(_) | TcpState::Closed => {248Err(ErrorCode::InvalidState)249}250#[cfg(feature = "p3")]251TcpState::Error(err) => Err(err.into()),252}253}254255pub(crate) fn start_bind(&mut self, addr: SocketAddr) -> Result<(), ErrorCode> {256let ip = addr.ip();257if !is_valid_unicast_address(ip) || !is_valid_address_family(ip, self.family) {258return Err(ErrorCode::InvalidArgument);259}260match mem::replace(&mut self.tcp_state, TcpState::Closed) {261TcpState::Default(sock) => {262if let Err(err) = tcp_bind(&sock, addr) {263self.tcp_state = TcpState::Default(sock);264Err(err)265} else {266self.tcp_state = TcpState::BindStarted(sock);267Ok(())268}269}270tcp_state => {271self.tcp_state = tcp_state;272Err(ErrorCode::InvalidState)273}274}275}276277pub(crate) fn finish_bind(&mut self) -> Result<(), ErrorCode> {278match mem::replace(&mut self.tcp_state, TcpState::Closed) {279TcpState::BindStarted(socket) => {280self.tcp_state = TcpState::Bound(socket);281Ok(())282}283current_state => {284// Reset the state so that the outside world doesn't see this socket as closed285self.tcp_state = current_state;286Err(ErrorCode::NotInProgress)287}288}289}290291pub(crate) fn start_connect(292&mut self,293addr: &SocketAddr,294) -> Result<tokio::net::TcpSocket, ErrorCode> {295match self.tcp_state {296TcpState::Default(..) | TcpState::Bound(..) => {}297TcpState::Connecting(..) => {298return Err(ErrorCode::ConcurrencyConflict);299}300_ => return Err(ErrorCode::InvalidState),301};302303if !is_valid_unicast_address(addr.ip())304|| !is_valid_remote_address(*addr)305|| !is_valid_address_family(addr.ip(), self.family)306{307return Err(ErrorCode::InvalidArgument);308};309310let (TcpState::Default(tokio_socket) | TcpState::Bound(tokio_socket)) =311mem::replace(&mut self.tcp_state, TcpState::Connecting(None))312else {313unreachable!();314};315316Ok(tokio_socket)317}318319/// For WASIp2 this is used to record the actual connection future as part320/// of `start_connect` within this socket state.321pub(crate) fn set_pending_connect(322&mut self,323future: impl Future<Output = io::Result<tokio::net::TcpStream>> + Send + 'static,324) -> Result<(), ErrorCode> {325match &mut self.tcp_state {326TcpState::Connecting(slot @ None) => {327*slot = Some(Box::pin(future));328Ok(())329}330_ => Err(ErrorCode::InvalidState),331}332}333334/// For WASIp2 this retreives the result from the future passed to335/// `set_pending_connect`.336///337/// Return states here are:338///339/// * `Ok(Some(res))` - where `res` is the result of the connect operation.340/// * `Ok(None)` - the connect operation isn't ready yet.341/// * `Err(e)` - a connect operation is not in progress.342pub(crate) fn take_pending_connect(343&mut self,344) -> Result<Option<io::Result<tokio::net::TcpStream>>, ErrorCode> {345match mem::replace(&mut self.tcp_state, TcpState::Connecting(None)) {346TcpState::ConnectReady(result) => Ok(Some(result)),347TcpState::Connecting(Some(mut future)) => {348let mut cx = Context::from_waker(Waker::noop());349match with_ambient_tokio_runtime(|| future.as_mut().poll(&mut cx)) {350Poll::Ready(result) => Ok(Some(result)),351Poll::Pending => {352self.tcp_state = TcpState::Connecting(Some(future));353Ok(None)354}355}356}357current_state => {358self.tcp_state = current_state;359Err(ErrorCode::NotInProgress)360}361}362}363364pub(crate) fn finish_connect(365&mut self,366result: io::Result<tokio::net::TcpStream>,367) -> Result<(), ErrorCode> {368if !matches!(self.tcp_state, TcpState::Connecting(None)) {369return Err(ErrorCode::InvalidState);370}371match result {372Ok(stream) => {373self.tcp_state = TcpState::Connected(Arc::new(stream));374Ok(())375}376Err(err) => {377self.tcp_state = TcpState::Closed;378Err(ErrorCode::from(err))379}380}381}382383pub(crate) fn start_listen(&mut self) -> Result<(), ErrorCode> {384match mem::replace(&mut self.tcp_state, TcpState::Closed) {385TcpState::Bound(tokio_socket) => {386self.tcp_state = TcpState::ListenStarted(tokio_socket);387Ok(())388}389previous_state => {390self.tcp_state = previous_state;391Err(ErrorCode::InvalidState)392}393}394}395396pub(crate) fn finish_listen(&mut self) -> Result<(), ErrorCode> {397let tokio_socket = match mem::replace(&mut self.tcp_state, TcpState::Closed) {398TcpState::ListenStarted(tokio_socket) => tokio_socket,399previous_state => {400self.tcp_state = previous_state;401return Err(ErrorCode::NotInProgress);402}403};404405match with_ambient_tokio_runtime(|| tokio_socket.listen(self.listen_backlog_size)) {406Ok(listener) => {407self.tcp_state = TcpState::Listening {408listener: Arc::new(listener),409pending_accept: None,410};411Ok(())412}413Err(err) => {414self.tcp_state = TcpState::Closed;415416Err(match Errno::from_io_error(&err) {417// See: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-listen#:~:text=WSAEMFILE418// According to the docs, `listen` can return EMFILE on Windows.419// This is odd, because we're not trying to create a new socket420// or file descriptor of any kind. So we rewrite it to less421// surprising error code.422//423// At the time of writing, this behavior has never been experimentally424// observed by any of the wasmtime authors, so we're relying fully425// on Microsoft's documentation here.426#[cfg(windows)]427Some(Errno::MFILE) => Errno::NOBUFS.into(),428429_ => err.into(),430})431}432}433}434435pub(crate) fn accept(&mut self) -> Result<Option<Self>, ErrorCode> {436let TcpState::Listening {437listener,438pending_accept,439} = &mut self.tcp_state440else {441return Err(ErrorCode::InvalidState);442};443444let result = match pending_accept.take() {445Some(result) => result,446None => {447let mut cx = std::task::Context::from_waker(Waker::noop());448match with_ambient_tokio_runtime(|| listener.poll_accept(&mut cx))449.map_ok(|(stream, _)| stream)450{451Poll::Ready(result) => result,452Poll::Pending => return Ok(None),453}454}455};456457Ok(Some(Self::new_accept(result, &self.options, self.family)?))458}459460#[cfg(feature = "p3")]461pub(crate) fn start_receive(&mut self) -> Option<&Arc<tokio::net::TcpStream>> {462match mem::replace(&mut self.tcp_state, TcpState::Closed) {463TcpState::Connected(stream) => {464self.tcp_state = TcpState::Receiving(stream);465Some(self.tcp_stream_arc().unwrap())466}467prev => {468self.tcp_state = prev;469None470}471}472}473474pub(crate) fn local_address(&self) -> Result<SocketAddr, ErrorCode> {475match &self.tcp_state {476TcpState::Bound(socket) => Ok(socket.local_addr()?),477TcpState::Connected(stream) => Ok(stream.local_addr()?),478#[cfg(feature = "p3")]479TcpState::Receiving(stream) => Ok(stream.local_addr()?),480TcpState::P2Streaming(state) => Ok(state.stream.local_addr()?),481TcpState::Listening { listener, .. } => Ok(listener.local_addr()?),482#[cfg(feature = "p3")]483TcpState::Error(err) => Err(err.into()),484_ => Err(ErrorCode::InvalidState),485}486}487488pub(crate) fn remote_address(&self) -> Result<SocketAddr, ErrorCode> {489let stream = self.tcp_stream_arc()?;490let addr = stream.peer_addr()?;491Ok(addr)492}493494pub(crate) fn is_listening(&self) -> bool {495matches!(self.tcp_state, TcpState::Listening { .. })496}497498pub(crate) fn address_family(&self) -> SocketAddressFamily {499self.family500}501502pub(crate) fn set_listen_backlog_size(&mut self, value: u64) -> Result<(), ErrorCode> {503const MIN_BACKLOG: u32 = 1;504const MAX_BACKLOG: u32 = i32::MAX as u32; // OS'es will most likely limit it down even further.505506if value == 0 {507return Err(ErrorCode::InvalidArgument);508}509// Silently clamp backlog size. This is OK for us to do, because operating systems do this too.510let value = value511.try_into()512.unwrap_or(MAX_BACKLOG)513.clamp(MIN_BACKLOG, MAX_BACKLOG);514match &self.tcp_state {515TcpState::Default(..) | TcpState::Bound(..) => {516// Socket not listening yet. Stash value for first invocation to `listen`.517self.listen_backlog_size = value;518Ok(())519}520TcpState::Listening { listener, .. } => {521// Try to update the backlog by calling `listen` again.522// Not all platforms support this. We'll only update our own value if the OS supports changing the backlog size after the fact.523if rustix::net::listen(&listener, value.try_into().unwrap_or(i32::MAX)).is_err() {524return Err(ErrorCode::NotSupported);525}526self.listen_backlog_size = value;527Ok(())528}529#[cfg(feature = "p3")]530TcpState::Error(err) => Err(err.into()),531_ => Err(ErrorCode::InvalidState),532}533}534535pub(crate) fn keep_alive_enabled(&self) -> Result<bool, ErrorCode> {536let fd = &*self.as_std_view()?;537let v = sockopt::socket_keepalive(fd)?;538Ok(v)539}540541pub(crate) fn set_keep_alive_enabled(&self, value: bool) -> Result<(), ErrorCode> {542let fd = &*self.as_std_view()?;543sockopt::set_socket_keepalive(fd, value)?;544Ok(())545}546547pub(crate) fn keep_alive_idle_time(&self) -> Result<u64, ErrorCode> {548let fd = &*self.as_std_view()?;549let v = sockopt::tcp_keepidle(fd)?;550Ok(v.as_nanos().try_into().unwrap_or(u64::MAX))551}552553pub(crate) fn set_keep_alive_idle_time(&mut self, value: u64) -> Result<(), ErrorCode> {554let value = {555let fd = self.as_std_view()?;556set_keep_alive_idle_time(&*fd, value)?557};558self.options.set_keep_alive_idle_time(value);559Ok(())560}561562pub(crate) fn keep_alive_interval(&self) -> Result<u64, ErrorCode> {563let fd = &*self.as_std_view()?;564let v = sockopt::tcp_keepintvl(fd)?;565Ok(v.as_nanos().try_into().unwrap_or(u64::MAX))566}567568pub(crate) fn set_keep_alive_interval(&self, value: u64) -> Result<(), ErrorCode> {569let fd = &*self.as_std_view()?;570set_keep_alive_interval(fd, Duration::from_nanos(value))?;571Ok(())572}573574pub(crate) fn keep_alive_count(&self) -> Result<u32, ErrorCode> {575let fd = &*self.as_std_view()?;576let v = sockopt::tcp_keepcnt(fd)?;577Ok(v)578}579580pub(crate) fn set_keep_alive_count(&self, value: u32) -> Result<(), ErrorCode> {581let fd = &*self.as_std_view()?;582set_keep_alive_count(fd, value)?;583Ok(())584}585586pub(crate) fn hop_limit(&self) -> Result<u8, ErrorCode> {587let fd = &*self.as_std_view()?;588let n = get_unicast_hop_limit(fd, self.family)?;589Ok(n)590}591592pub(crate) fn set_hop_limit(&mut self, value: u8) -> Result<(), ErrorCode> {593{594let fd = &*self.as_std_view()?;595set_unicast_hop_limit(fd, self.family, value)?;596}597self.options.set_hop_limit(value);598Ok(())599}600601pub(crate) fn receive_buffer_size(&self) -> Result<u64, ErrorCode> {602let fd = &*self.as_std_view()?;603let n = receive_buffer_size(fd)?;604Ok(n)605}606607pub(crate) fn set_receive_buffer_size(&mut self, value: u64) -> Result<(), ErrorCode> {608let res = {609let fd = &*self.as_std_view()?;610set_receive_buffer_size(fd, value)?611};612self.options.set_receive_buffer_size(res);613Ok(())614}615616pub(crate) fn send_buffer_size(&self) -> Result<u64, ErrorCode> {617let fd = &*self.as_std_view()?;618let n = send_buffer_size(fd)?;619Ok(n)620}621622pub(crate) fn set_send_buffer_size(&mut self, value: u64) -> Result<(), ErrorCode> {623let res = {624let fd = &*self.as_std_view()?;625set_send_buffer_size(fd, value)?626};627self.options.set_send_buffer_size(res);628Ok(())629}630631#[cfg(feature = "p3")]632pub(crate) fn non_inherited_options(&self) -> &NonInheritedOptions {633&self.options634}635636#[cfg(feature = "p3")]637pub(crate) fn tcp_listener_arc(&self) -> Result<&Arc<tokio::net::TcpListener>, ErrorCode> {638match &self.tcp_state {639TcpState::Listening { listener, .. } => Ok(listener),640#[cfg(feature = "p3")]641TcpState::Error(err) => Err(err.into()),642_ => Err(ErrorCode::InvalidState),643}644}645646pub(crate) fn tcp_stream_arc(&self) -> Result<&Arc<tokio::net::TcpStream>, ErrorCode> {647match &self.tcp_state {648TcpState::Connected(socket) => Ok(socket),649#[cfg(feature = "p3")]650TcpState::Receiving(socket) => Ok(socket),651TcpState::P2Streaming(state) => Ok(&state.stream),652#[cfg(feature = "p3")]653TcpState::Error(err) => Err(err.into()),654_ => Err(ErrorCode::InvalidState),655}656}657658pub(crate) fn p2_streaming_state(&self) -> Result<&P2TcpStreamingState, ErrorCode> {659match &self.tcp_state {660TcpState::P2Streaming(state) => Ok(state),661#[cfg(feature = "p3")]662TcpState::Error(err) => Err(err.into()),663_ => Err(ErrorCode::InvalidState),664}665}666667pub(crate) fn set_p2_streaming_state(668&mut self,669state: P2TcpStreamingState,670) -> Result<(), ErrorCode> {671if !matches!(self.tcp_state, TcpState::Connected(_)) {672return Err(ErrorCode::InvalidState);673}674self.tcp_state = TcpState::P2Streaming(Box::new(state));675Ok(())676}677678/// Used for `Pollable` in the WASIp2 implementation this awaits the socket679/// to be connected, if in the connecting state, or for a TCP accept to be680/// ready, if this is in the listening state.681///682/// For all other states this method immediately returns.683pub(crate) async fn ready(&mut self) {684match &mut self.tcp_state {685TcpState::Default(..)686| TcpState::BindStarted(..)687| TcpState::Bound(..)688| TcpState::ListenStarted(..)689| TcpState::ConnectReady(..)690| TcpState::Closed691| TcpState::Connected { .. }692| TcpState::Connecting(None)693| TcpState::Listening {694pending_accept: Some(_),695..696}697| TcpState::P2Streaming(_) => {}698699#[cfg(feature = "p3")]700TcpState::Receiving(_) | TcpState::Error(_) => {}701702TcpState::Connecting(Some(future)) => {703self.tcp_state = TcpState::ConnectReady(future.as_mut().await);704}705706TcpState::Listening {707listener,708pending_accept: slot @ None,709} => {710let result = futures::future::poll_fn(|cx| {711listener.poll_accept(cx).map_ok(|(stream, _)| stream)712})713.await;714*slot = Some(result);715}716}717}718}719720#[cfg(not(target_os = "macos"))]721pub use inherits_option::*;722#[cfg(not(target_os = "macos"))]723mod inherits_option {724use crate::sockets::SocketAddressFamily;725use tokio::net::TcpStream;726727#[derive(Default, Clone)]728pub struct NonInheritedOptions;729730impl NonInheritedOptions {731pub fn set_keep_alive_idle_time(&mut self, _value: u64) {}732733pub fn set_hop_limit(&mut self, _value: u8) {}734735pub fn set_receive_buffer_size(&mut self, _value: usize) {}736737pub fn set_send_buffer_size(&mut self, _value: usize) {}738739pub(crate) fn apply(&self, _family: SocketAddressFamily, _stream: &TcpStream) {}740}741}742743#[cfg(target_os = "macos")]744pub use does_not_inherit_options::*;745#[cfg(target_os = "macos")]746mod does_not_inherit_options {747use crate::sockets::SocketAddressFamily;748use rustix::net::sockopt;749use std::sync::Arc;750use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering::Relaxed};751use std::time::Duration;752use tokio::net::TcpStream;753754// The socket options below are not automatically inherited from the listener755// on all platforms. So we keep track of which options have been explicitly756// set and manually apply those values to newly accepted clients.757#[derive(Default, Clone)]758pub struct NonInheritedOptions(Arc<Inner>);759760#[derive(Default)]761struct Inner {762receive_buffer_size: AtomicUsize,763send_buffer_size: AtomicUsize,764hop_limit: AtomicU8,765keep_alive_idle_time: AtomicU64, // nanoseconds766}767768impl NonInheritedOptions {769pub fn set_keep_alive_idle_time(&mut self, value: u64) {770self.0.keep_alive_idle_time.store(value, Relaxed);771}772773pub fn set_hop_limit(&mut self, value: u8) {774self.0.hop_limit.store(value, Relaxed);775}776777pub fn set_receive_buffer_size(&mut self, value: usize) {778self.0.receive_buffer_size.store(value, Relaxed);779}780781pub fn set_send_buffer_size(&mut self, value: usize) {782self.0.send_buffer_size.store(value, Relaxed);783}784785pub(crate) fn apply(&self, family: SocketAddressFamily, stream: &TcpStream) {786// Manually inherit socket options from listener. We only have to787// do this on platforms that don't already do this automatically788// and only if a specific value was explicitly set on the listener.789790let receive_buffer_size = self.0.receive_buffer_size.load(Relaxed);791if receive_buffer_size > 0 {792// Ignore potential error.793_ = sockopt::set_socket_recv_buffer_size(&stream, receive_buffer_size);794}795796let send_buffer_size = self.0.send_buffer_size.load(Relaxed);797if send_buffer_size > 0 {798// Ignore potential error.799_ = sockopt::set_socket_send_buffer_size(&stream, send_buffer_size);800}801802// For some reason, IP_TTL is inherited, but IPV6_UNICAST_HOPS isn't.803if family == SocketAddressFamily::Ipv6 {804let hop_limit = self.0.hop_limit.load(Relaxed);805if hop_limit > 0 {806// Ignore potential error.807_ = sockopt::set_ipv6_unicast_hops(&stream, Some(hop_limit));808}809}810811let keep_alive_idle_time = self.0.keep_alive_idle_time.load(Relaxed);812if keep_alive_idle_time > 0 {813// Ignore potential error.814_ = sockopt::set_tcp_keepidle(&stream, Duration::from_nanos(keep_alive_idle_time));815}816}817}818}819820821