Path: blob/main/crates/wasi/src/p2/host/tcp.rs
3088 views
use crate::p2::bindings::{1sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network},2sockets::tcp::{self, ShutdownType},3};4use crate::p2::{Pollable, SocketResult};5use crate::sockets::{SocketAddrUse, TcpSocket, WasiSocketsCtxView};6use std::net::SocketAddr;7use wasmtime::component::Resource;8use wasmtime_wasi_io::{9poll::DynPollable,10streams::{DynInputStream, DynOutputStream},11};1213impl tcp::Host for WasiSocketsCtxView<'_> {}1415impl crate::p2::host::tcp::tcp::HostTcpSocket for WasiSocketsCtxView<'_> {16async fn start_bind(17&mut self,18this: Resource<TcpSocket>,19network: Resource<Network>,20local_address: IpSocketAddress,21) -> SocketResult<()> {22let network = self.table.get(&network)?;23let local_address: SocketAddr = local_address.into();2425// Ensure that we're allowed to connect to this address.26network27.check_socket_addr(local_address, SocketAddrUse::TcpBind)28.await?;2930// Bind to the address.31self.table.get_mut(&this)?.start_bind(local_address)?;3233Ok(())34}3536fn finish_bind(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {37let socket = self.table.get_mut(&this)?;38socket.finish_bind()?;39Ok(())40}4142async fn start_connect(43&mut self,44this: Resource<TcpSocket>,45network: Resource<Network>,46remote_address: IpSocketAddress,47) -> SocketResult<()> {48let network = self.table.get(&network)?;49let remote_address: SocketAddr = remote_address.into();5051// Ensure that we're allowed to connect to this address.52network53.check_socket_addr(remote_address, SocketAddrUse::TcpConnect)54.await?;5556// Start connection57let socket = self.table.get_mut(&this)?;58let future = socket59.start_connect(&remote_address)?60.connect(remote_address);61socket.set_pending_connect(future)?;6263Ok(())64}6566fn finish_connect(67&mut self,68this: Resource<TcpSocket>,69) -> SocketResult<(Resource<DynInputStream>, Resource<DynOutputStream>)> {70let socket = self.table.get_mut(&this)?;7172let result = socket73.take_pending_connect()?74.ok_or(ErrorCode::WouldBlock)?;75socket.finish_connect(result)?;76let (input, output) = socket.p2_streams()?;77let input = self.table.push_child(input, &this)?;78let output = self.table.push_child(output, &this)?;79Ok((input, output))80}8182fn start_listen(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {83let socket = self.table.get_mut(&this)?;8485socket.start_listen_p2()?;86Ok(())87}8889fn finish_listen(&mut self, this: Resource<TcpSocket>) -> SocketResult<()> {90let socket = self.table.get_mut(&this)?;91socket.finish_listen_p2()?;92Ok(())93}9495fn accept(96&mut self,97this: Resource<TcpSocket>,98) -> SocketResult<(99Resource<TcpSocket>,100Resource<DynInputStream>,101Resource<DynOutputStream>,102)> {103let socket = self.table.get_mut(&this)?;104105let mut tcp_socket = socket.accept()?.ok_or(ErrorCode::WouldBlock)?;106let (input, output) = tcp_socket.p2_streams()?;107108let tcp_socket = self.table.push(tcp_socket)?;109let input_stream = self.table.push_child(input, &tcp_socket)?;110let output_stream = self.table.push_child(output, &tcp_socket)?;111112Ok((tcp_socket, input_stream, output_stream))113}114115fn local_address(&mut self, this: Resource<TcpSocket>) -> SocketResult<IpSocketAddress> {116let socket = self.table.get(&this)?;117Ok(socket.local_address()?.into())118}119120fn remote_address(&mut self, this: Resource<TcpSocket>) -> SocketResult<IpSocketAddress> {121let socket = self.table.get(&this)?;122Ok(socket.remote_address()?.into())123}124125fn is_listening(&mut self, this: Resource<TcpSocket>) -> Result<bool, wasmtime::Error> {126let socket = self.table.get(&this)?;127128Ok(socket.is_listening())129}130131fn address_family(132&mut self,133this: Resource<TcpSocket>,134) -> Result<IpAddressFamily, wasmtime::Error> {135let socket = self.table.get(&this)?;136Ok(socket.address_family().into())137}138139fn set_listen_backlog_size(140&mut self,141this: Resource<TcpSocket>,142value: u64,143) -> SocketResult<()> {144let socket = self.table.get_mut(&this)?;145socket.set_listen_backlog_size(value)?;146Ok(())147}148149fn keep_alive_enabled(&mut self, this: Resource<TcpSocket>) -> SocketResult<bool> {150let socket = self.table.get(&this)?;151Ok(socket.keep_alive_enabled()?)152}153154fn set_keep_alive_enabled(155&mut self,156this: Resource<TcpSocket>,157value: bool,158) -> SocketResult<()> {159let socket = self.table.get(&this)?;160socket.set_keep_alive_enabled(value)?;161Ok(())162}163164fn keep_alive_idle_time(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {165let socket = self.table.get(&this)?;166Ok(socket.keep_alive_idle_time()?)167}168169fn set_keep_alive_idle_time(170&mut self,171this: Resource<TcpSocket>,172value: u64,173) -> SocketResult<()> {174let socket = self.table.get_mut(&this)?;175socket.set_keep_alive_idle_time(value)?;176Ok(())177}178179fn keep_alive_interval(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {180let socket = self.table.get(&this)?;181Ok(socket.keep_alive_interval()?)182}183184fn set_keep_alive_interval(185&mut self,186this: Resource<TcpSocket>,187value: u64,188) -> SocketResult<()> {189let socket = self.table.get(&this)?;190socket.set_keep_alive_interval(value)?;191Ok(())192}193194fn keep_alive_count(&mut self, this: Resource<TcpSocket>) -> SocketResult<u32> {195let socket = self.table.get(&this)?;196Ok(socket.keep_alive_count()?)197}198199fn set_keep_alive_count(&mut self, this: Resource<TcpSocket>, value: u32) -> SocketResult<()> {200let socket = self.table.get(&this)?;201socket.set_keep_alive_count(value)?;202Ok(())203}204205fn hop_limit(&mut self, this: Resource<TcpSocket>) -> SocketResult<u8> {206let socket = self.table.get(&this)?;207Ok(socket.hop_limit()?)208}209210fn set_hop_limit(&mut self, this: Resource<TcpSocket>, value: u8) -> SocketResult<()> {211let socket = self.table.get_mut(&this)?;212socket.set_hop_limit(value)?;213Ok(())214}215216fn receive_buffer_size(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {217let socket = self.table.get(&this)?;218Ok(socket.receive_buffer_size()?)219}220221fn set_receive_buffer_size(222&mut self,223this: Resource<TcpSocket>,224value: u64,225) -> SocketResult<()> {226let socket = self.table.get_mut(&this)?;227socket.set_receive_buffer_size(value)?;228Ok(())229}230231fn send_buffer_size(&mut self, this: Resource<TcpSocket>) -> SocketResult<u64> {232let socket = self.table.get(&this)?;233Ok(socket.send_buffer_size()?)234}235236fn set_send_buffer_size(&mut self, this: Resource<TcpSocket>, value: u64) -> SocketResult<()> {237let socket = self.table.get_mut(&this)?;238socket.set_send_buffer_size(value)?;239Ok(())240}241242fn subscribe(&mut self, this: Resource<TcpSocket>) -> wasmtime::Result<Resource<DynPollable>> {243wasmtime_wasi_io::poll::subscribe(self.table, this)244}245246fn shutdown(247&mut self,248this: Resource<TcpSocket>,249shutdown_type: ShutdownType,250) -> SocketResult<()> {251let socket = self.table.get(&this)?;252253let how = match shutdown_type {254ShutdownType::Receive => std::net::Shutdown::Read,255ShutdownType::Send => std::net::Shutdown::Write,256ShutdownType::Both => std::net::Shutdown::Both,257};258259let state = socket.p2_streaming_state()?;260state.shutdown(how)?;261Ok(())262}263264fn drop(&mut self, this: Resource<TcpSocket>) -> Result<(), wasmtime::Error> {265// As in the filesystem implementation, we assume closing a socket266// doesn't block.267let dropped = self.table.delete(this)?;268drop(dropped);269270Ok(())271}272}273274#[async_trait::async_trait]275impl Pollable for TcpSocket {276async fn ready(&mut self) {277<TcpSocket>::ready(self).await;278}279}280281pub mod sync {282use crate::p2::{283SocketError,284bindings::{285sockets::{286network::Network,287tcp::{self as async_tcp, HostTcpSocket as AsyncHostTcpSocket},288},289sync::sockets::tcp::{290self, Duration, HostTcpSocket, InputStream, IpAddressFamily, IpSocketAddress,291OutputStream, Pollable, ShutdownType, TcpSocket,292},293},294};295use crate::runtime::in_tokio;296use crate::sockets::WasiSocketsCtxView;297use wasmtime::component::Resource;298299impl tcp::Host for WasiSocketsCtxView<'_> {}300301impl HostTcpSocket for WasiSocketsCtxView<'_> {302fn start_bind(303&mut self,304self_: Resource<TcpSocket>,305network: Resource<Network>,306local_address: IpSocketAddress,307) -> Result<(), SocketError> {308in_tokio(async {309AsyncHostTcpSocket::start_bind(self, self_, network, local_address).await310})311}312313fn finish_bind(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {314AsyncHostTcpSocket::finish_bind(self, self_)315}316317fn start_connect(318&mut self,319self_: Resource<TcpSocket>,320network: Resource<Network>,321remote_address: IpSocketAddress,322) -> Result<(), SocketError> {323in_tokio(async {324AsyncHostTcpSocket::start_connect(self, self_, network, remote_address).await325})326}327328fn finish_connect(329&mut self,330self_: Resource<TcpSocket>,331) -> Result<(Resource<InputStream>, Resource<OutputStream>), SocketError> {332AsyncHostTcpSocket::finish_connect(self, self_)333}334335fn start_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {336AsyncHostTcpSocket::start_listen(self, self_)337}338339fn finish_listen(&mut self, self_: Resource<TcpSocket>) -> Result<(), SocketError> {340AsyncHostTcpSocket::finish_listen(self, self_)341}342343fn accept(344&mut self,345self_: Resource<TcpSocket>,346) -> Result<347(348Resource<TcpSocket>,349Resource<InputStream>,350Resource<OutputStream>,351),352SocketError,353> {354AsyncHostTcpSocket::accept(self, self_)355}356357fn local_address(358&mut self,359self_: Resource<TcpSocket>,360) -> Result<IpSocketAddress, SocketError> {361AsyncHostTcpSocket::local_address(self, self_)362}363364fn remote_address(365&mut self,366self_: Resource<TcpSocket>,367) -> Result<IpSocketAddress, SocketError> {368AsyncHostTcpSocket::remote_address(self, self_)369}370371fn is_listening(&mut self, self_: Resource<TcpSocket>) -> wasmtime::Result<bool> {372AsyncHostTcpSocket::is_listening(self, self_)373}374375fn address_family(376&mut self,377self_: Resource<TcpSocket>,378) -> wasmtime::Result<IpAddressFamily> {379AsyncHostTcpSocket::address_family(self, self_)380}381382fn set_listen_backlog_size(383&mut self,384self_: Resource<TcpSocket>,385value: u64,386) -> Result<(), SocketError> {387AsyncHostTcpSocket::set_listen_backlog_size(self, self_, value)388}389390fn keep_alive_enabled(&mut self, self_: Resource<TcpSocket>) -> Result<bool, SocketError> {391AsyncHostTcpSocket::keep_alive_enabled(self, self_)392}393394fn set_keep_alive_enabled(395&mut self,396self_: Resource<TcpSocket>,397value: bool,398) -> Result<(), SocketError> {399AsyncHostTcpSocket::set_keep_alive_enabled(self, self_, value)400}401402fn keep_alive_idle_time(403&mut self,404self_: Resource<TcpSocket>,405) -> Result<Duration, SocketError> {406AsyncHostTcpSocket::keep_alive_idle_time(self, self_)407}408409fn set_keep_alive_idle_time(410&mut self,411self_: Resource<TcpSocket>,412value: Duration,413) -> Result<(), SocketError> {414AsyncHostTcpSocket::set_keep_alive_idle_time(self, self_, value)415}416417fn keep_alive_interval(418&mut self,419self_: Resource<TcpSocket>,420) -> Result<Duration, SocketError> {421AsyncHostTcpSocket::keep_alive_interval(self, self_)422}423424fn set_keep_alive_interval(425&mut self,426self_: Resource<TcpSocket>,427value: Duration,428) -> Result<(), SocketError> {429AsyncHostTcpSocket::set_keep_alive_interval(self, self_, value)430}431432fn keep_alive_count(&mut self, self_: Resource<TcpSocket>) -> Result<u32, SocketError> {433AsyncHostTcpSocket::keep_alive_count(self, self_)434}435436fn set_keep_alive_count(437&mut self,438self_: Resource<TcpSocket>,439value: u32,440) -> Result<(), SocketError> {441AsyncHostTcpSocket::set_keep_alive_count(self, self_, value)442}443444fn hop_limit(&mut self, self_: Resource<TcpSocket>) -> Result<u8, SocketError> {445AsyncHostTcpSocket::hop_limit(self, self_)446}447448fn set_hop_limit(449&mut self,450self_: Resource<TcpSocket>,451value: u8,452) -> Result<(), SocketError> {453AsyncHostTcpSocket::set_hop_limit(self, self_, value)454}455456fn receive_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {457AsyncHostTcpSocket::receive_buffer_size(self, self_)458}459460fn set_receive_buffer_size(461&mut self,462self_: Resource<TcpSocket>,463value: u64,464) -> Result<(), SocketError> {465AsyncHostTcpSocket::set_receive_buffer_size(self, self_, value)466}467468fn send_buffer_size(&mut self, self_: Resource<TcpSocket>) -> Result<u64, SocketError> {469AsyncHostTcpSocket::send_buffer_size(self, self_)470}471472fn set_send_buffer_size(473&mut self,474self_: Resource<TcpSocket>,475value: u64,476) -> Result<(), SocketError> {477AsyncHostTcpSocket::set_send_buffer_size(self, self_, value)478}479480fn subscribe(481&mut self,482self_: Resource<TcpSocket>,483) -> wasmtime::Result<Resource<Pollable>> {484AsyncHostTcpSocket::subscribe(self, self_)485}486487fn shutdown(488&mut self,489self_: Resource<TcpSocket>,490shutdown_type: ShutdownType,491) -> Result<(), SocketError> {492AsyncHostTcpSocket::shutdown(self, self_, shutdown_type.into())493}494495fn drop(&mut self, rep: Resource<TcpSocket>) -> wasmtime::Result<()> {496AsyncHostTcpSocket::drop(self, rep)497}498}499500impl From<ShutdownType> for async_tcp::ShutdownType {501fn from(other: ShutdownType) -> Self {502match other {503ShutdownType::Receive => async_tcp::ShutdownType::Receive,504ShutdownType::Send => async_tcp::ShutdownType::Send,505ShutdownType::Both => async_tcp::ShutdownType::Both,506}507}508}509}510511512