// Path: blob/main/crates/wasi/src/p2/host/udp.rs
// 3078 views
use crate::p2::bindings::sockets::network::{ErrorCode, IpAddressFamily, IpSocketAddress, Network};1use crate::p2::bindings::sockets::udp;2use crate::p2::udp::{IncomingDatagramStream, OutgoingDatagramStream, SendState};3use crate::p2::{Pollable, SocketError, SocketResult};4use crate::sockets::util::{is_valid_address_family, is_valid_remote_address};5use crate::sockets::{6MAX_UDP_DATAGRAM_SIZE, SocketAddrUse, SocketAddressFamily, UdpSocket, WasiSocketsCtxView,7};8use async_trait::async_trait;9use std::net::SocketAddr;10use tokio::io::Interest;11use wasmtime::component::Resource;12use wasmtime::format_err;13use wasmtime_wasi_io::poll::DynPollable;1415impl udp::Host for WasiSocketsCtxView<'_> {}1617impl udp::HostUdpSocket for WasiSocketsCtxView<'_> {18async fn start_bind(19&mut self,20this: Resource<udp::UdpSocket>,21network: Resource<Network>,22local_address: IpSocketAddress,23) -> SocketResult<()> {24let local_address = SocketAddr::from(local_address);25let check = self.table.get(&network)?.socket_addr_check.clone();26check.check(local_address, SocketAddrUse::UdpBind).await?;2728let socket = self.table.get_mut(&this)?;29socket.bind(local_address)?;30socket.set_socket_addr_check(Some(check));3132Ok(())33}3435fn finish_bind(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<()> {36self.table.get_mut(&this)?.finish_bind()?;37Ok(())38}3940async fn stream(41&mut self,42this: Resource<udp::UdpSocket>,43remote_address: Option<IpSocketAddress>,44) -> SocketResult<(45Resource<udp::IncomingDatagramStream>,46Resource<udp::OutgoingDatagramStream>,47)> {48let has_active_streams = self49.table50.iter_children(&this)?51.any(|c| c.is::<IncomingDatagramStream>() || c.is::<OutgoingDatagramStream>());5253if has_active_streams {54return Err(SocketError::trap(format_err!(55"UDP streams not dropped yet"56)));57}5859let socket = self.table.get_mut(&this)?;60let remote_address = remote_address.map(SocketAddr::from);6162if !socket.is_bound() {63return 
Err(ErrorCode::InvalidState.into());64}6566// We disconnect & (re)connect in two distinct steps for two reasons:67// - To leave our socket instance in a consistent state in case the68// connect fails.69// - When reconnecting to a different address, Linux sometimes fails70// if there isn't a disconnect in between.7172// Step #1: Disconnect73if socket.is_connected() {74socket.disconnect()?;75}7677// Step #2: (Re)connect78if let Some(connect_addr) = remote_address {79let Some(check) = socket.socket_addr_check() else {80return Err(ErrorCode::InvalidState.into());81};82check.check(connect_addr, SocketAddrUse::UdpConnect).await?;83socket.connect_p2(connect_addr)?;84}8586let incoming_stream = IncomingDatagramStream {87inner: socket.socket().clone(),88remote_address,89};90let outgoing_stream = OutgoingDatagramStream {91inner: socket.socket().clone(),92remote_address,93family: socket.address_family(),94send_state: SendState::Idle,95socket_addr_check: socket.socket_addr_check().cloned(),96};9798Ok((99self.table.push_child(incoming_stream, &this)?,100self.table.push_child(outgoing_stream, &this)?,101))102}103104fn local_address(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<IpSocketAddress> {105let socket = self.table.get(&this)?;106Ok(socket.local_address()?.into())107}108109fn remote_address(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<IpSocketAddress> {110let socket = self.table.get(&this)?;111Ok(socket.remote_address()?.into())112}113114fn address_family(115&mut self,116this: Resource<udp::UdpSocket>,117) -> Result<IpAddressFamily, wasmtime::Error> {118let socket = self.table.get(&this)?;119Ok(socket.address_family().into())120}121122fn unicast_hop_limit(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u8> {123let socket = self.table.get(&this)?;124Ok(socket.unicast_hop_limit()?)125}126127fn set_unicast_hop_limit(128&mut self,129this: Resource<udp::UdpSocket>,130value: u8,131) -> SocketResult<()> {132let socket = 
self.table.get(&this)?;133socket.set_unicast_hop_limit(value)?;134Ok(())135}136137fn receive_buffer_size(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u64> {138let socket = self.table.get(&this)?;139Ok(socket.receive_buffer_size()?)140}141142fn set_receive_buffer_size(143&mut self,144this: Resource<udp::UdpSocket>,145value: u64,146) -> SocketResult<()> {147let socket = self.table.get(&this)?;148socket.set_receive_buffer_size(value)?;149Ok(())150}151152fn send_buffer_size(&mut self, this: Resource<udp::UdpSocket>) -> SocketResult<u64> {153let socket = self.table.get(&this)?;154Ok(socket.send_buffer_size()?)155}156157fn set_send_buffer_size(&mut self, this: Resource<UdpSocket>, value: u64) -> SocketResult<()> {158let socket = self.table.get(&this)?;159socket.set_send_buffer_size(value)?;160Ok(())161}162163fn subscribe(&mut self, this: Resource<UdpSocket>) -> wasmtime::Result<Resource<DynPollable>> {164wasmtime_wasi_io::poll::subscribe(self.table, this)165}166167fn drop(&mut self, this: Resource<udp::UdpSocket>) -> Result<(), wasmtime::Error> {168// As in the filesystem implementation, we assume closing a socket169// doesn't block.170let dropped = self.table.delete(this)?;171drop(dropped);172173Ok(())174}175}176177#[async_trait]178impl Pollable for UdpSocket {179async fn ready(&mut self) {180// None of the socket-level operations block natively181}182}183184impl udp::HostIncomingDatagramStream for WasiSocketsCtxView<'_> {185fn receive(186&mut self,187this: Resource<udp::IncomingDatagramStream>,188max_results: u64,189) -> SocketResult<Vec<udp::IncomingDatagram>> {190// Returns Ok(None) when the message was dropped.191fn recv_one(192stream: &IncomingDatagramStream,193) -> SocketResult<Option<udp::IncomingDatagram>> {194let mut buf = [0; MAX_UDP_DATAGRAM_SIZE];195let (size, received_addr) = stream.inner.try_recv_from(&mut buf)?;196debug_assert!(size <= buf.len());197198match stream.remote_address {199Some(connected_addr) if connected_addr != received_addr => 
{200// Normally, this should have already been checked for us by the OS.201return Ok(None);202}203_ => {}204}205206Ok(Some(udp::IncomingDatagram {207data: buf[..size].into(),208remote_address: received_addr.into(),209}))210}211212let stream = self.table.get(&this)?;213let max_results: usize = max_results.try_into().unwrap_or(usize::MAX);214215if max_results == 0 {216return Ok(vec![]);217}218219let mut datagrams = vec![];220221while datagrams.len() < max_results {222match recv_one(stream) {223Ok(Some(datagram)) => {224datagrams.push(datagram);225}226Ok(None) => {227// Message was dropped228}229Err(_) if datagrams.len() > 0 => {230return Ok(datagrams);231}232Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => {233return Ok(datagrams);234}235Err(e) => {236return Err(e);237}238}239}240241Ok(datagrams)242}243244fn subscribe(245&mut self,246this: Resource<udp::IncomingDatagramStream>,247) -> wasmtime::Result<Resource<DynPollable>> {248wasmtime_wasi_io::poll::subscribe(self.table, this)249}250251fn drop(&mut self, this: Resource<udp::IncomingDatagramStream>) -> Result<(), wasmtime::Error> {252// As in the filesystem implementation, we assume closing a socket253// doesn't block.254let dropped = self.table.delete(this)?;255drop(dropped);256257Ok(())258}259}260261#[async_trait]262impl Pollable for IncomingDatagramStream {263async fn ready(&mut self) {264// FIXME: Add `Interest::ERROR` when we update to tokio 1.32.265self.inner266.ready(Interest::READABLE)267.await268.expect("failed to await UDP socket readiness");269}270}271272impl udp::HostOutgoingDatagramStream for WasiSocketsCtxView<'_> {273fn check_send(&mut self, this: Resource<udp::OutgoingDatagramStream>) -> SocketResult<u64> {274let stream = self.table.get_mut(&this)?;275276let permit = match stream.send_state {277SendState::Idle => {278const PERMIT: usize = 16;279stream.send_state = SendState::Permitted(PERMIT);280PERMIT281}282SendState::Permitted(n) => n,283SendState::Waiting => 
0,284};285286Ok(permit.try_into().unwrap())287}288289async fn send(290&mut self,291this: Resource<udp::OutgoingDatagramStream>,292datagrams: Vec<udp::OutgoingDatagram>,293) -> SocketResult<u64> {294async fn send_one(295stream: &OutgoingDatagramStream,296datagram: &udp::OutgoingDatagram,297) -> SocketResult<()> {298if datagram.data.len() > MAX_UDP_DATAGRAM_SIZE {299return Err(ErrorCode::DatagramTooLarge.into());300}301302let provided_addr = datagram.remote_address.map(SocketAddr::from);303let addr = match (stream.remote_address, provided_addr) {304(None, Some(addr)) => {305let Some(check) = stream.socket_addr_check.as_ref() else {306return Err(ErrorCode::InvalidState.into());307};308check309.check(addr, SocketAddrUse::UdpOutgoingDatagram)310.await?;311addr312}313(Some(addr), None) => addr,314(Some(connected_addr), Some(provided_addr)) if connected_addr == provided_addr => {315connected_addr316}317_ => return Err(ErrorCode::InvalidArgument.into()),318};319320if !is_valid_remote_address(addr) || !is_valid_address_family(addr.ip(), stream.family)321{322return Err(ErrorCode::InvalidArgument.into());323}324325if stream.remote_address == Some(addr) {326stream.inner.try_send(&datagram.data)?;327} else {328stream.inner.try_send_to(&datagram.data, addr)?;329}330331Ok(())332}333334let stream = self.table.get_mut(&this)?;335336match stream.send_state {337SendState::Permitted(n) if n >= datagrams.len() => {338stream.send_state = SendState::Idle;339}340SendState::Permitted(_) => {341return Err(SocketError::trap(wasmtime::format_err!(342"unpermitted: argument exceeds permitted size"343)));344}345SendState::Idle | SendState::Waiting => {346return Err(SocketError::trap(wasmtime::format_err!(347"unpermitted: must call check-send first"348)));349}350}351352if datagrams.is_empty() {353return Ok(0);354}355356let mut count = 0;357358for datagram in datagrams {359match send_one(stream, &datagram).await {360Ok(_) => count += 1,361Err(_) if count > 0 => {362// WIT: "If at least one 
datagram has been sent successfully, this function never returns an error."363return Ok(count);364}365Err(e) if matches!(e.downcast_ref(), Some(ErrorCode::WouldBlock)) => {366stream.send_state = SendState::Waiting;367return Ok(count);368}369Err(e) => {370return Err(e);371}372}373}374375Ok(count)376}377378fn subscribe(379&mut self,380this: Resource<udp::OutgoingDatagramStream>,381) -> wasmtime::Result<Resource<DynPollable>> {382wasmtime_wasi_io::poll::subscribe(self.table, this)383}384385fn drop(&mut self, this: Resource<udp::OutgoingDatagramStream>) -> Result<(), wasmtime::Error> {386// As in the filesystem implementation, we assume closing a socket387// doesn't block.388let dropped = self.table.delete(this)?;389drop(dropped);390391Ok(())392}393}394395#[async_trait]396impl Pollable for OutgoingDatagramStream {397async fn ready(&mut self) {398match self.send_state {399SendState::Idle | SendState::Permitted(_) => {}400SendState::Waiting => {401// FIXME: Add `Interest::ERROR` when we update to tokio 1.32.402self.inner403.ready(Interest::WRITABLE)404.await405.expect("failed to await UDP socket readiness");406self.send_state = SendState::Idle;407}408}409}410}411412impl From<SocketAddressFamily> for IpAddressFamily {413fn from(family: SocketAddressFamily) -> IpAddressFamily {414match family {415SocketAddressFamily::Ipv4 => IpAddressFamily::Ipv4,416SocketAddressFamily::Ipv6 => IpAddressFamily::Ipv6,417}418}419}420421pub mod sync {422use wasmtime::component::Resource;423424use crate::p2::{425SocketError,426bindings::{427sockets::{428network::Network,429udp::{430self as async_udp,431HostIncomingDatagramStream as AsyncHostIncomingDatagramStream,432HostOutgoingDatagramStream as AsyncHostOutgoingDatagramStream,433HostUdpSocket as AsyncHostUdpSocket, IncomingDatagramStream,434OutgoingDatagramStream,435},436},437sync::sockets::udp::{438self, HostIncomingDatagramStream, HostOutgoingDatagramStream, HostUdpSocket,439IncomingDatagram, IpAddressFamily, IpSocketAddress, 
OutgoingDatagram, Pollable,440UdpSocket,441},442},443};444use crate::runtime::in_tokio;445use crate::sockets::WasiSocketsCtxView;446447impl udp::Host for WasiSocketsCtxView<'_> {}448449impl HostUdpSocket for WasiSocketsCtxView<'_> {450fn start_bind(451&mut self,452self_: Resource<UdpSocket>,453network: Resource<Network>,454local_address: IpSocketAddress,455) -> Result<(), SocketError> {456in_tokio(async {457AsyncHostUdpSocket::start_bind(self, self_, network, local_address).await458})459}460461fn finish_bind(&mut self, self_: Resource<UdpSocket>) -> Result<(), SocketError> {462AsyncHostUdpSocket::finish_bind(self, self_)463}464465fn stream(466&mut self,467self_: Resource<UdpSocket>,468remote_address: Option<IpSocketAddress>,469) -> Result<470(471Resource<IncomingDatagramStream>,472Resource<OutgoingDatagramStream>,473),474SocketError,475> {476in_tokio(async { AsyncHostUdpSocket::stream(self, self_, remote_address).await })477}478479fn local_address(480&mut self,481self_: Resource<UdpSocket>,482) -> Result<IpSocketAddress, SocketError> {483AsyncHostUdpSocket::local_address(self, self_)484}485486fn remote_address(487&mut self,488self_: Resource<UdpSocket>,489) -> Result<IpSocketAddress, SocketError> {490AsyncHostUdpSocket::remote_address(self, self_)491}492493fn address_family(494&mut self,495self_: Resource<UdpSocket>,496) -> wasmtime::Result<IpAddressFamily> {497AsyncHostUdpSocket::address_family(self, self_)498}499500fn unicast_hop_limit(&mut self, self_: Resource<UdpSocket>) -> Result<u8, SocketError> {501AsyncHostUdpSocket::unicast_hop_limit(self, self_)502}503504fn set_unicast_hop_limit(505&mut self,506self_: Resource<UdpSocket>,507value: u8,508) -> Result<(), SocketError> {509AsyncHostUdpSocket::set_unicast_hop_limit(self, self_, value)510}511512fn receive_buffer_size(&mut self, self_: Resource<UdpSocket>) -> Result<u64, SocketError> {513AsyncHostUdpSocket::receive_buffer_size(self, self_)514}515516fn set_receive_buffer_size(517&mut self,518self_: 
Resource<UdpSocket>,519value: u64,520) -> Result<(), SocketError> {521AsyncHostUdpSocket::set_receive_buffer_size(self, self_, value)522}523524fn send_buffer_size(&mut self, self_: Resource<UdpSocket>) -> Result<u64, SocketError> {525AsyncHostUdpSocket::send_buffer_size(self, self_)526}527528fn set_send_buffer_size(529&mut self,530self_: Resource<UdpSocket>,531value: u64,532) -> Result<(), SocketError> {533AsyncHostUdpSocket::set_send_buffer_size(self, self_, value)534}535536fn subscribe(537&mut self,538self_: Resource<UdpSocket>,539) -> wasmtime::Result<Resource<Pollable>> {540AsyncHostUdpSocket::subscribe(self, self_)541}542543fn drop(&mut self, rep: Resource<UdpSocket>) -> wasmtime::Result<()> {544AsyncHostUdpSocket::drop(self, rep)545}546}547548impl HostIncomingDatagramStream for WasiSocketsCtxView<'_> {549fn receive(550&mut self,551self_: Resource<IncomingDatagramStream>,552max_results: u64,553) -> Result<Vec<IncomingDatagram>, SocketError> {554Ok(555AsyncHostIncomingDatagramStream::receive(self, self_, max_results)?556.into_iter()557.map(Into::into)558.collect(),559)560}561562fn subscribe(563&mut self,564self_: Resource<IncomingDatagramStream>,565) -> wasmtime::Result<Resource<Pollable>> {566AsyncHostIncomingDatagramStream::subscribe(self, self_)567}568569fn drop(&mut self, rep: Resource<IncomingDatagramStream>) -> wasmtime::Result<()> {570AsyncHostIncomingDatagramStream::drop(self, rep)571}572}573574impl From<async_udp::IncomingDatagram> for IncomingDatagram {575fn from(other: async_udp::IncomingDatagram) -> Self {576let async_udp::IncomingDatagram {577data,578remote_address,579} = other;580Self {581data,582remote_address,583}584}585}586587impl HostOutgoingDatagramStream for WasiSocketsCtxView<'_> {588fn check_send(589&mut self,590self_: Resource<OutgoingDatagramStream>,591) -> Result<u64, SocketError> {592AsyncHostOutgoingDatagramStream::check_send(self, self_)593}594595fn send(596&mut self,597self_: Resource<OutgoingDatagramStream>,598datagrams: 
Vec<OutgoingDatagram>,599) -> Result<u64, SocketError> {600let datagrams = datagrams.into_iter().map(Into::into).collect();601in_tokio(async { AsyncHostOutgoingDatagramStream::send(self, self_, datagrams).await })602}603604fn subscribe(605&mut self,606self_: Resource<OutgoingDatagramStream>,607) -> wasmtime::Result<Resource<Pollable>> {608AsyncHostOutgoingDatagramStream::subscribe(self, self_)609}610611fn drop(&mut self, rep: Resource<OutgoingDatagramStream>) -> wasmtime::Result<()> {612AsyncHostOutgoingDatagramStream::drop(self, rep)613}614}615616impl From<OutgoingDatagram> for async_udp::OutgoingDatagram {617fn from(other: OutgoingDatagram) -> Self {618let OutgoingDatagram {619data,620remote_address,621} = other;622Self {623data,624remote_address,625}626}627}628}629630631