Path: blob/main/crates/test-programs/src/sockets.rs
3054 views
use crate::wasi::clocks::monotonic_clock;1use crate::wasi::io::poll::{self, Pollable};2use crate::wasi::io::streams::{InputStream, OutputStream, StreamError};3use crate::wasi::random;4use crate::wasi::sockets::instance_network;5use crate::wasi::sockets::ip_name_lookup;6use crate::wasi::sockets::network::{7ErrorCode, IpAddress, IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, Ipv6SocketAddress,8Network,9};10use crate::wasi::sockets::tcp::TcpSocket;11use crate::wasi::sockets::udp::{12IncomingDatagram, IncomingDatagramStream, OutgoingDatagram, OutgoingDatagramStream, UdpSocket,13};14use crate::wasi::sockets::{tcp_create_socket, udp_create_socket};15use std::ops::Range;1617const TIMEOUT_NS: u64 = 1_000_000_000;1819pub fn supports_ipv6() -> bool {20std::env::var("DISABLE_IPV6").is_err()21}2223impl Pollable {24pub fn block_until(&self, timeout: &Pollable) -> Result<(), ErrorCode> {25let ready = poll::poll(&[self, timeout]);26assert!(ready.len() > 0);27match ready[0] {280 => Ok(()),291 => Err(ErrorCode::Timeout),30_ => unreachable!(),31}32}33}3435impl InputStream {36pub fn blocking_read_to_end(&self) -> Result<Vec<u8>, crate::wasi::io::error::Error> {37let mut data = vec![];38loop {39match self.blocking_read(1024 * 1024) {40Ok(chunk) => data.extend(chunk),41Err(StreamError::Closed) => return Ok(data),42Err(StreamError::LastOperationFailed(e)) => return Err(e),43}44}45}46}4748impl OutputStream {49pub fn blocking_write_util(&self, mut bytes: &[u8]) -> Result<(), StreamError> {50let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);51let pollable = self.subscribe();5253while !bytes.is_empty() {54pollable.block_until(&timeout).expect("write timed out");5556let permit = self.check_write()?;5758let len = bytes.len().min(permit as usize);59let (chunk, rest) = bytes.split_at(len);6061self.write(chunk)?;6263self.blocking_flush()?;6465bytes = rest;66}67Ok(())68}69}7071impl Network {72pub fn default() -> Network {73instance_network::instance_network()74}7576pub fn blocking_resolve_addresses(&self, name: &str) -> Result<Vec<IpAddress>, ErrorCode> {77let stream = ip_name_lookup::resolve_addresses(&self, name)?;7879let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);80let pollable = stream.subscribe();8182let mut addresses = vec![];8384loop {85match stream.resolve_next_address() {86Ok(Some(addr)) => {87addresses.push(addr);88}89Ok(None) => match addresses[..] {90[] => return Err(ErrorCode::NameUnresolvable),91_ => return Ok(addresses),92},93Err(ErrorCode::WouldBlock) => {94pollable.block_until(&timeout)?;95}96Err(err) => return Err(err),97}98}99}100101/// Same as `Network::blocking_resolve_addresses` but ignores post validation errors102///103/// The ignored error codes signal that the input passed validation104/// and a lookup was actually attempted, but failed. These are ignored to105/// make the CI tests less flaky.106pub fn permissive_blocking_resolve_addresses(107&self,108name: &str,109) -> Result<Vec<IpAddress>, ErrorCode> {110match self.blocking_resolve_addresses(name) {111Err(ErrorCode::NameUnresolvable | ErrorCode::TemporaryResolverFailure) => Ok(vec![]),112r => r,113}114}115}116117impl TcpSocket {118pub fn new(address_family: IpAddressFamily) -> Result<TcpSocket, ErrorCode> {119tcp_create_socket::create_tcp_socket(address_family)120}121122pub fn blocking_bind(123&self,124network: &Network,125local_address: IpSocketAddress,126) -> Result<(), ErrorCode> {127let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);128let sub = self.subscribe();129130self.start_bind(&network, local_address)?;131132loop {133match self.finish_bind() {134Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?,135result => return result,136}137}138}139140pub fn blocking_listen(&self) -> Result<(), ErrorCode> {141let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);142let sub = self.subscribe();143144self.start_listen()?;145146loop {147match self.finish_listen() {148Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?,149result => return result,150}151}152}153154pub fn blocking_connect(155&self,156network: &Network,157remote_address: IpSocketAddress,158) -> Result<(InputStream, OutputStream), ErrorCode> {159let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);160let sub = self.subscribe();161162self.start_connect(&network, remote_address)?;163164loop {165match self.finish_connect() {166Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?,167result => return result,168}169}170}171172pub fn blocking_accept(&self) -> Result<(TcpSocket, InputStream, OutputStream), ErrorCode> {173let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);174let sub = self.subscribe();175176loop {177match self.accept() {178Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?,179result => return result,180}181}182}183}184185impl UdpSocket {186pub fn new(address_family: IpAddressFamily) -> Result<UdpSocket, ErrorCode> {187udp_create_socket::create_udp_socket(address_family)188}189190pub fn blocking_bind(191&self,192network: &Network,193local_address: IpSocketAddress,194) -> Result<(), ErrorCode> {195let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);196let sub = self.subscribe();197198self.start_bind(&network, local_address)?;199200loop {201match self.finish_bind() {202Err(ErrorCode::WouldBlock) => sub.block_until(&timeout)?,203result => return result,204}205}206}207208pub fn blocking_bind_unspecified(&self, network: &Network) -> Result<(), ErrorCode> {209let ip = IpAddress::new_unspecified(self.address_family());210let port = 0;211212self.blocking_bind(network, IpSocketAddress::new(ip, port))213}214}215216impl OutgoingDatagramStream {217fn blocking_check_send(&self, timeout: &Pollable) -> Result<u64, ErrorCode> {218let sub = self.subscribe();219220loop {221match self.check_send() {222Ok(0) => sub.block_until(timeout)?,223result => return result,224}225}226}227228pub fn blocking_send(&self, mut datagrams: &[OutgoingDatagram]) -> Result<(), ErrorCode> {229let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);230231while !datagrams.is_empty() {232let permit = self.blocking_check_send(&timeout)?;233let chunk_len = datagrams.len().min(permit as usize);234match self.send(&datagrams[..chunk_len]) {235Ok(0) => {}236Ok(packets_sent) => {237let packets_sent = packets_sent as usize;238datagrams = &datagrams[packets_sent..];239}240Err(err) => return Err(err),241}242}243244Ok(())245}246}247248impl IncomingDatagramStream {249pub fn blocking_receive(&self, count: Range<u64>) -> Result<Vec<IncomingDatagram>, ErrorCode> {250let timeout = monotonic_clock::subscribe_duration(TIMEOUT_NS);251let pollable = self.subscribe();252let mut datagrams = vec![];253254loop {255match self.receive(count.end - datagrams.len() as u64) {256Ok(mut chunk) => {257datagrams.append(&mut chunk);258259if datagrams.len() >= count.start as usize {260return Ok(datagrams);261} else {262pollable.block_until(&timeout)?;263}264}265Err(err) => return Err(err),266}267}268}269}270271impl IpAddress {272pub const IPV4_BROADCAST: IpAddress = IpAddress::Ipv4((255, 255, 255, 255));273274pub const IPV4_LOOPBACK: IpAddress = IpAddress::Ipv4((127, 0, 0, 1));275pub const IPV6_LOOPBACK: IpAddress = IpAddress::Ipv6((0, 0, 0, 0, 0, 0, 0, 1));276277pub const IPV4_UNSPECIFIED: IpAddress = IpAddress::Ipv4((0, 0, 0, 0));278pub const IPV6_UNSPECIFIED: IpAddress = IpAddress::Ipv6((0, 0, 0, 0, 0, 0, 0, 0));279280pub const IPV4_MAPPED_LOOPBACK: IpAddress =281IpAddress::Ipv6((0, 0, 0, 0, 0, 0xFFFF, 0x7F00, 0x0001));282283pub const fn new_loopback(family: IpAddressFamily) -> IpAddress {284match family {285IpAddressFamily::Ipv4 => Self::IPV4_LOOPBACK,286IpAddressFamily::Ipv6 => Self::IPV6_LOOPBACK,287}288}289290pub const fn new_unspecified(family: IpAddressFamily) -> IpAddress {291match family {292IpAddressFamily::Ipv4 => Self::IPV4_UNSPECIFIED,293IpAddressFamily::Ipv6 => Self::IPV6_UNSPECIFIED,294}295}296297pub const fn family(&self) -> IpAddressFamily {298match self {299IpAddress::Ipv4(_) => IpAddressFamily::Ipv4,300IpAddress::Ipv6(_) => IpAddressFamily::Ipv6,301}302}303}304305impl PartialEq for IpAddress {306fn eq(&self, other: &Self) -> bool {307match (self, other) {308(Self::Ipv4(left), Self::Ipv4(right)) => left == right,309(Self::Ipv6(left), Self::Ipv6(right)) => left == right,310_ => false,311}312}313}314315impl IpSocketAddress {316pub const fn new(ip: IpAddress, port: u16) -> IpSocketAddress {317match ip {318IpAddress::Ipv4(addr) => IpSocketAddress::Ipv4(Ipv4SocketAddress {319port,320address: addr,321}),322IpAddress::Ipv6(addr) => IpSocketAddress::Ipv6(Ipv6SocketAddress {323port,324address: addr,325flow_info: 0,326scope_id: 0,327}),328}329}330331pub const fn ip(&self) -> IpAddress {332match self {333IpSocketAddress::Ipv4(addr) => IpAddress::Ipv4(addr.address),334IpSocketAddress::Ipv6(addr) => IpAddress::Ipv6(addr.address),335}336}337338pub const fn port(&self) -> u16 {339match self {340IpSocketAddress::Ipv4(addr) => addr.port,341IpSocketAddress::Ipv6(addr) => addr.port,342}343}344345pub const fn family(&self) -> IpAddressFamily {346match self {347IpSocketAddress::Ipv4(_) => IpAddressFamily::Ipv4,348IpSocketAddress::Ipv6(_) => IpAddressFamily::Ipv6,349}350}351}352353impl PartialEq for Ipv4SocketAddress {354fn eq(&self, other: &Self) -> bool {355self.port == other.port && self.address == other.address356}357}358359impl PartialEq for Ipv6SocketAddress {360fn eq(&self, other: &Self) -> bool {361self.port == other.port362&& self.flow_info == other.flow_info363&& self.address == other.address364&& self.scope_id == other.scope_id365}366}367368impl PartialEq for IpSocketAddress {369fn eq(&self, other: &Self) -> bool {370match (self, other) {371(Self::Ipv4(l0), Self::Ipv4(r0)) => l0 == r0,372(Self::Ipv6(l0), Self::Ipv6(r0)) => l0 == r0,373_ => false,374}375}376}377378fn generate_random_u16(range: Range<u16>) -> u16 {379let start = range.start as u64;380let end = range.end as u64;381let port = start + (random::random::get_random_u64() % (end - start));382port as u16383}384385/// Execute the inner function with a randomly generated port.386/// To prevent random failures, we make a few attempts before giving up.387pub fn attempt_random_port<F>(388local_address: IpAddress,389mut f: F,390) -> Result<IpSocketAddress, ErrorCode>391where392F: FnMut(IpSocketAddress) -> Result<(), ErrorCode>,393{394const MAX_ATTEMPTS: u32 = 10;395let mut i = 0;396loop {397i += 1;398399let port: u16 = generate_random_u16(1024..u16::MAX);400let sock_addr = IpSocketAddress::new(local_address, port);401402match f(sock_addr) {403Ok(_) => return Ok(sock_addr),404Err(e) if i >= MAX_ATTEMPTS => return Err(e),405// Try again if the port is already taken. This can sometimes show up as `AccessDenied` on Windows.406Err(ErrorCode::AddressInUse | ErrorCode::AccessDenied) => {}407Err(e) => return Err(e),408}409}410}411412413