Path: blob/main/tests/sys/netinet6/test_ip6_output.py
39536 views
import errno1import ipaddress2import socket3import struct4import time5from ctypes import c_byte6from ctypes import c_uint7from ctypes import Structure89import pytest10from atf_python.sys.net.rtsock import SaHelper11from atf_python.sys.net.tools import ToolsHelper12from atf_python.sys.net.vnet import run_cmd13from atf_python.sys.net.vnet import SingleVnetTestTemplate14from atf_python.sys.net.vnet import VnetTestTemplate151617class In6Pktinfo(Structure):18_fields_ = [19("ipi6_addr", c_byte * 16),20("ipi6_ifindex", c_uint),21]222324class VerboseSocketServer:25def __init__(self, ip: str, port: int, ifname: str = None):26self.ip = ip27self.port = port2829s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)30s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)31addr = ipaddress.ip_address(ip)32if addr.is_link_local and ifname:33ifindex = socket.if_nametoindex(ifname)34addr_tuple = (ip, port, 0, ifindex)35elif addr.is_multicast and ifname:36ifindex = socket.if_nametoindex(ifname)37mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack("I", ifindex)38s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)39print("## JOINED group {} % {}".format(ip, ifname))40addr_tuple = ("::", port, 0, ifindex)41else:42addr_tuple = (ip, port, 0, 0)43print("## Listening on [{}]:{}".format(addr_tuple[0], port))44s.bind(addr_tuple)45self.socket = s4647def recv(self):48# data = self.socket.recv(4096)49# print("RX: " + data)50data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)51# Assume ancdata has just 1 item52info = In6Pktinfo.from_buffer_copy(ancdata[0][2])53dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)54dst_iface = socket.if_indextoname(info.ipi6_ifindex)5556tx_obj = {57"data": data,58"src_ip": address[0],59"dst_ip": dst_ip,60"dst_iface": dst_iface,61}62return tx_obj636465class BaseTestIP6Ouput(VnetTestTemplate):66TOPOLOGY = {67"vnet1": {"ifaces": ["if1", "if2", "if3"]},68"vnet2": {"ifaces": ["if1", "if2", "if3"]},69"if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},70"if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},71"if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},72}73DEFAULT_PORT = 453657475def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None):76"""Generic listener that sends first received packet with metadata77back to the sender via pipw78"""79ll_data = ToolsHelper.get_linklocals()80# Start listener81ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)82vnet.pipe.send(ll_data)8384tx_obj = ss.recv()85tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias86vnet.pipe.send(tx_obj)878889class TestIP6Output(BaseTestIP6Ouput):90def vnet2_handler(self, vnet):91ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)92self._vnet2_handler(vnet, ip, None)9394@pytest.mark.require_user("root")95def test_output6_base(self):96"""Tests simple UDP output"""97second_vnet = self.vnet_map["vnet2"]9899# Pick target on if2 vnet2's end100ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])101ip = str(ifaddr.ip)102103s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)104data = bytes("AAAA", "utf-8")105print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))106107# Wait for the child to become ready108self.wait_object(second_vnet.pipe)109s.sendto(data, (ip, self.DEFAULT_PORT))110111# Wait for the received object112rx_obj = self.wait_object(second_vnet.pipe)113assert rx_obj["dst_ip"] == ip114assert rx_obj["dst_iface_alias"] == "if2"115116@pytest.mark.require_user("root")117def test_output6_nhop(self):118"""Tests UDP output with custom nhop set"""119second_vnet = self.vnet_map["vnet2"]120121# Pick target on if2 vnet2's end122ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])123ip_dst = str(ifaddr.ip)124# Pick nexthop on if1125ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])126ip_next = str(ifaddr.ip)127sin6_next = SaHelper.ip6_sa(ip_next, 0)128129s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)130s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)131132# Wait for the child to become ready133self.wait_object(second_vnet.pipe)134data = bytes("AAAA", "utf-8")135s.sendto(data, (ip_dst, self.DEFAULT_PORT))136137# Wait for the received object138rx_obj = self.wait_object(second_vnet.pipe)139assert rx_obj["dst_ip"] == ip_dst140assert rx_obj["dst_iface_alias"] == "if1"141142@pytest.mark.parametrize(143"params",144[145# esrc: src-ip, if: src-interface, esrc: expected-src,146# eif: expected-rx-interface147pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),148pytest.param(149{"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},150id="iponly1",151),152pytest.param(153{154"src": "2001:db8:c::1",155"if": "if3",156"ex": errno.EHOSTUNREACH,157},158id="ipandif",159),160pytest.param(161{162"src": "2001:db8:c::aaaa",163"ex": errno.EADDRNOTAVAIL,164},165id="nolocalip",166),167pytest.param(168{"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"169),170],171)172@pytest.mark.require_user("root")173def test_output6_pktinfo(self, params):174"""Tests simple UDP output"""175second_vnet = self.vnet_map["vnet2"]176vnet = self.vnet177178# Pick target on if2 vnet2's end179ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])180dst_ip = str(ifaddr.ip)181182src_ip = params.get("src", "")183src_ifname = params.get("if", "")184expected_ip = params.get("esrc", "")185expected_ifname = params.get("eif", "")186errno = params.get("ex", 0)187188pktinfo = In6Pktinfo()189if src_ip:190for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):191pktinfo.ipi6_addr[i] = b192if src_ifname:193os_ifname = vnet.iface_alias_map[src_ifname].name194pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)195196# Wait for the child to become ready197self.wait_object(second_vnet.pipe)198data = bytes("AAAA", "utf-8")199200s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)201try:202s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))203aux = (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))204s.sendto(data, (dst_ip, self.DEFAULT_PORT))205except OSError as e:206if not errno:207raise208assert e.errno == errno209print("Correctly raised {}".format(e))210return211212# Wait for the received object213rx_obj = self.wait_object(second_vnet.pipe)214215assert rx_obj["dst_ip"] == dst_ip216if expected_ip:217assert rx_obj["src_ip"] == expected_ip218if expected_ifname:219assert rx_obj["dst_iface_alias"] == expected_ifname220221222class TestIP6OutputLL(BaseTestIP6Ouput):223def vnet2_handler(self, vnet):224"""Generic listener that sends first received packet with metadata225back to the sender via pipw226"""227os_ifname = vnet.iface_alias_map["if2"].name228ll_data = ToolsHelper.get_linklocals()229ll_ip, _ = ll_data[os_ifname][0]230self._vnet2_handler(vnet, ll_ip, os_ifname)231232@pytest.mark.require_user("root")233def test_output6_linklocal(self):234"""Tests simple UDP output"""235second_vnet = self.vnet_map["vnet2"]236237# Wait for the child to become ready238ll_data = self.wait_object(second_vnet.pipe)239240# Pick LL address on if2 vnet2's end241ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0]242# Get local interface scope243os_ifname = self.vnet.iface_alias_map["if2"].name244scopeid = socket.if_nametoindex(os_ifname)245246s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)247data = bytes("AAAA", "utf-8")248target = (ip, self.DEFAULT_PORT, 0, scopeid)249print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))250251s.sendto(data, target)252253# Wait for the received object254rx_obj = self.wait_object(second_vnet.pipe)255assert rx_obj["dst_ip"] == ip256assert rx_obj["dst_iface_alias"] == "if2"257258259class TestIP6OutputNhopLL(BaseTestIP6Ouput):260def vnet2_handler(self, vnet):261"""Generic listener that sends first received packet with metadata262back to the sender via pipw263"""264ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)265self._vnet2_handler(vnet, ip, None)266267@pytest.mark.require_user("root")268def test_output6_nhop_linklocal(self):269"""Tests UDP output with custom link-local nhop set"""270second_vnet = self.vnet_map["vnet2"]271272# Wait for the child to become ready273ll_data = self.wait_object(second_vnet.pipe)274275# Pick target on if2 vnet2's end276ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])277ip_dst = str(ifaddr.ip)278# Pick nexthop on if1279ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0]280# Get local interfaces281os_ifname = self.vnet.iface_alias_map["if1"].name282scopeid = socket.if_nametoindex(os_ifname)283sin6_next = SaHelper.ip6_sa(ip_next, scopeid)284285s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)286s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)287288data = bytes("AAAA", "utf-8")289s.sendto(data, (ip_dst, self.DEFAULT_PORT))290291# Wait for the received object292rx_obj = self.wait_object(second_vnet.pipe)293assert rx_obj["dst_ip"] == ip_dst294assert rx_obj["dst_iface_alias"] == "if1"295296297class TestIP6OutputScope(BaseTestIP6Ouput):298def vnet2_handler(self, vnet):299"""Generic listener that sends first received packet with metadata300back to the sender via pipw301"""302bind_ip, bind_ifp = self.wait_object(vnet.pipe)303if bind_ip is None:304os_ifname = vnet.iface_alias_map[bind_ifp].name305ll_data = ToolsHelper.get_linklocals()306bind_ip, _ = ll_data[os_ifname][0]307if bind_ifp is not None:308bind_ifp = vnet.iface_alias_map[bind_ifp].name309print("## BIND {}%{}".format(bind_ip, bind_ifp))310self._vnet2_handler(vnet, bind_ip, bind_ifp)311312@pytest.mark.parametrize(313"params",314[315# sif/dif: source/destination interface (for link-local addr)316# sip/dip: source/destination ip (for non-LL addr)317# ex: OSError errno that sendto() must raise318pytest.param({"sif": "if2", "dif": "if2"}, id="same"),319pytest.param(320{321"sif": "if1",322"dif": "if2",323"ex": errno.EHOSTUNREACH,324},325id="ll_differentif1",326),327pytest.param(328{329"sif": "if1",330"dip": "2001:db8:b::2",331"ex": errno.EHOSTUNREACH,332},333id="ll_differentif2",334),335pytest.param(336{337"sip": "2001:db8:a::1",338"dif": "if2",339},340id="gu_to_ll",341),342],343)344@pytest.mark.require_user("root")345def test_output6_linklocal_scope(self, params):346"""Tests simple UDP output"""347second_vnet = self.vnet_map["vnet2"]348349src_ifp = params.get("sif")350src_ip = params.get("sip")351dst_ifp = params.get("dif")352dst_ip = params.get("dip")353errno = params.get("ex", 0)354355# Sent ifp/IP to bind on356second_vnet = self.vnet_map["vnet2"]357second_vnet.pipe.send((dst_ip, dst_ifp))358359# Wait for the child to become ready360ll_data = self.wait_object(second_vnet.pipe)361362if dst_ip is None:363# Pick LL address on dst_ifp vnet2's end364dst_ip, _ = ll_data[second_vnet.iface_alias_map[dst_ifp].name][0]365# Get local interface scope366os_ifname = self.vnet.iface_alias_map[dst_ifp].name367scopeid = socket.if_nametoindex(os_ifname)368target = (dst_ip, self.DEFAULT_PORT, 0, scopeid)369else:370target = (dst_ip, self.DEFAULT_PORT, 0, 0)371372# Bind373if src_ip is None:374ll_data = ToolsHelper.get_linklocals()375os_ifname = self.vnet.iface_alias_map[src_ifp].name376src_ip, _ = ll_data[os_ifname][0]377scopeid = socket.if_nametoindex(os_ifname)378src = (src_ip, self.DEFAULT_PORT, 0, scopeid)379else:380src = (src_ip, self.DEFAULT_PORT, 0, 0)381382s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)383s.bind(src)384data = bytes("AAAA", "utf-8")385print("## TX packet {} -> {}".format(src, target))386387try:388s.sendto(data, target)389except OSError as e:390if not errno:391raise392assert e.errno == errno393print("Correctly raised {}".format(e))394return395396# Wait for the received object397rx_obj = self.wait_object(second_vnet.pipe)398assert rx_obj["dst_ip"] == dst_ip399assert rx_obj["src_ip"] == src_ip400# assert rx_obj["dst_iface_alias"] == "if2"401402403class TestIP6OutputMulticast(BaseTestIP6Ouput):404def vnet2_handler(self, vnet):405group = self.wait_object(vnet.pipe)406os_ifname = vnet.iface_alias_map["if2"].name407self._vnet2_handler(vnet, group, os_ifname)408409@pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])410@pytest.mark.require_user("root")411def test_output6_multicast(self, group_scope):412"""Tests simple UDP output"""413second_vnet = self.vnet_map["vnet2"]414415group = "{}::3456".format(group_scope)416second_vnet.pipe.send(group)417418# Pick target on if2 vnet2's end419ip = group420os_ifname = self.vnet.iface_alias_map["if2"].name421ifindex = socket.if_nametoindex(os_ifname)422optval = struct.pack("I", ifindex)423424s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)425s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval)426427data = bytes("AAAA", "utf-8")428429# Wait for the child to become ready430self.wait_object(second_vnet.pipe)431432print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))433s.sendto(data, (ip, self.DEFAULT_PORT))434435# Wait for the received object436rx_obj = self.wait_object(second_vnet.pipe)437assert rx_obj["dst_ip"] == ip438assert rx_obj["dst_iface_alias"] == "if2"439440441class TestIP6OutputLoopback(SingleVnetTestTemplate):442IPV6_PREFIXES = ["2001:db8:a::1/64"]443DEFAULT_PORT = 45365444445@pytest.mark.parametrize(446"source_validation",447[448pytest.param(0, id="no_sav"),449pytest.param(1, id="sav"),450],451)452@pytest.mark.parametrize("scope", ["gu", "ll", "lo"])453def test_output6_self_tcp(self, scope, source_validation):454"""Tests IPv6 TCP connection to the local IPv6 address"""455456ToolsHelper.set_sysctl(457"net.inet6.ip6.source_address_validation", source_validation458)459460if scope == "gu":461ip = "2001:db8:a::1"462addr_tuple = (ip, self.DEFAULT_PORT)463elif scope == "ll":464os_ifname = self.vnet.iface_alias_map["if1"].name465ifindex = socket.if_nametoindex(os_ifname)466ll_data = ToolsHelper.get_linklocals()467ip, _ = ll_data[os_ifname][0]468addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)469elif scope == "lo":470ip = "::1"471ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")472ifindex = socket.if_nametoindex("lo0")473addr_tuple = (ip, self.DEFAULT_PORT)474else:475assert 0 == 1476print("address: {}".format(addr_tuple))477478start = time.perf_counter()479ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)480ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)481ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)482ss.bind(addr_tuple)483ss.listen()484s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)485s.settimeout(2.0)486s.connect(addr_tuple)487conn, from_addr = ss.accept()488duration = time.perf_counter() - start489490assert from_addr[0] == ip491assert duration < 1.0492493@pytest.mark.parametrize(494"source_validation",495[496pytest.param(0, id="no_sav"),497pytest.param(1, id="sav"),498],499)500@pytest.mark.parametrize("scope", ["gu", "ll", "lo"])501def test_output6_self_udp(self, scope, source_validation):502"""Tests IPv6 UDP connection to the local IPv6 address"""503504ToolsHelper.set_sysctl(505"net.inet6.ip6.source_address_validation", source_validation506)507508if scope == "gu":509ip = "2001:db8:a::1"510addr_tuple = (ip, self.DEFAULT_PORT)511elif scope == "ll":512os_ifname = self.vnet.iface_alias_map["if1"].name513ifindex = socket.if_nametoindex(os_ifname)514ll_data = ToolsHelper.get_linklocals()515ip, _ = ll_data[os_ifname][0]516addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)517elif scope == "lo":518ip = "::1"519ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")520ifindex = socket.if_nametoindex("lo0")521addr_tuple = (ip, self.DEFAULT_PORT)522else:523assert 0 == 1524print("address: {}".format(addr_tuple))525526start = time.perf_counter()527ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)528ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)529ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)530ss.bind(addr_tuple)531ss.listen()532s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)533s.settimeout(2.0)534s.connect(addr_tuple)535conn, from_addr = ss.accept()536duration = time.perf_counter() - start537538assert from_addr[0] == ip539assert duration < 1.0540541542