# Path: blob/main/tests/sys/netlink/test_rtnl_ifaddr.py
# 39483 views
import ipaddress
import socket
import struct

import pytest
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrIp
from atf_python.sys.netlink.attrs import NlAttrNested
from atf_python.sys.netlink.attrs import NlAttrU32
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import NlmNewFlags
from atf_python.sys.netlink.base_headers import Nlmsghdr
from atf_python.sys.netlink.message import NlMsgType
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink import Nlsock
from atf_python.sys.netlink.netlink_generic import CarpAttrType
from atf_python.sys.netlink.netlink_generic import CarpGenMessage
from atf_python.sys.netlink.netlink_generic import CarpMsgType
from atf_python.sys.netlink.netlink_route import IfaAttrType
from atf_python.sys.netlink.netlink_route import IfaCacheInfo
from atf_python.sys.netlink.netlink_route import IfafAttrType
from atf_python.sys.netlink.netlink_route import IfafFlags6
from atf_python.sys.netlink.netlink_route import IfaFlags
from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.netlink_route import RtScope
from atf_python.sys.netlink.utils import enum_or_int
from atf_python.sys.netlink.utils import NlConst


class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Tests RTM_GETADDR dumps against pre-configured interface addresses"""

    def setup_method(self, method):
        """Selects the prefixes to configure based on the test method name.

        A "4" in the name requests the IPv4 prefix (plus an additional
        169.254.169.254/16 one for the "nofilter" tests), a "6" requests the
        IPv6 prefix.  The attributes are assigned before the parent setup
        runs.
        """
        method_name = method.__name__
        if "4" in method_name:
            if "nofilter" in method_name:
                self.IPV4_PREFIXES = ["192.0.2.1/24", "169.254.169.254/16"]
            else:
                self.IPV4_PREFIXES = ["192.0.2.1/24"]
        if "6" in method_name:
            self.IPV6_PREFIXES = ["2001:db8::1/64"]
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def test_46_nofilter(self):
        """Tests that listing outputs both IPv4/IPv6 and interfaces"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        self.write_message(msg)

        # Collect (ifname, family, scope) for every address in the dump
        ret = []
        for rx_msg in self.read_msg_list(
            msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR
        ):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            scope = rx_msg.base_hdr.ifa_scope
            ret.append((ifname, family, scope))

        def count(ifname, family=None, scope=None):
            """Counts collected entries matching all of the given criteria"""
            return len(
                [
                    r
                    for r in ret
                    if r[0] == ifname
                    and (family is None or r[1] == family)
                    and (scope is None or r[2] == scope)
                ]
            )

        scope_host = RtScope.RT_SCOPE_HOST.value
        scope_link = RtScope.RT_SCOPE_LINK.value
        scope_universe = RtScope.RT_SCOPE_UNIVERSE.value

        ifname = "lo0"
        assert count(ifname, socket.AF_INET, scope_host) == 1
        assert count(ifname, socket.AF_INET6, scope_host) == 1
        assert count(ifname, socket.AF_INET6, scope_link) == 1
        assert count(ifname) == 3

        ifname = self.vnet.iface_alias_map["if1"].name
        assert count(ifname, socket.AF_INET, scope_link) == 1
        assert count(ifname, socket.AF_INET, scope_universe) == 1
        assert count(ifname, socket.AF_INET6, scope_link) == 1
        assert count(ifname, socket.AF_INET6, scope_universe) == 1
        assert count(ifname) == 4

    def test_46_filter_iface(self):
        """Tests that listing outputs both IPv4/IPv6 for the specific interface"""
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(
            msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR
        ):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            ret.append((ifname, family, rx_msg))

        ifname = epair_ifname
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
        assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
        assert len(ret) == 3

    def test_46_filter_family_compat(self):
        """Tests that family filtering works with the stripped header"""

        # The request body is a single family byte instead of the full
        # ifaddrmsg header.  nlmsg_len=17 presumably corresponds to a 16-byte
        # nlmsghdr plus that one byte - confirm against Nlmsghdr.
        hdr = Nlmsghdr(
            nlmsg_len=17,
            nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
            nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
            nlmsg_seq=self.helper.get_seq(),
        )
        data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
        self.nlsock.write_data(data)

        ret = []
        for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
            ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            family = rx_msg.base_hdr.ifa_family
            ret.append((ifname, family, rx_msg))
        # Presumably one IPv4 address on lo0 and one on if1 - only the total
        # is verified here.
        assert len(ret) == 2

    def filter_iface_family(self, family, num_items):
        """Dumps the addresses of the given family on if1.

        Asserts that every returned message matches both the requested
        family and the interface and that exactly num_items messages are
        returned.  Returns the list of received messages.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
        self.write_message(msg)

        ret = []
        for rx_msg in self.read_msg_list(
            msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR
        ):
            assert family == rx_msg.base_hdr.ifa_family
            assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
            ret.append(rx_msg)
        assert len(ret) == num_items
        return ret

    def test_4_broadcast(self):
        """Tests header/attr output for listing IPv4 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET, 1)
        # Should be 192.0.2.1/24
        msg = ret[0]
        # Family and ifindex has been checked already
        assert msg.base_hdr.ifa_prefixlen == 24
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1"
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255"

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

    def test_6_broadcast(self):
        """Tests header/attr output for listing IPv6 ifas on broadcast iface"""
        ret = self.filter_iface_family(socket.AF_INET6, 2)
        # Should be 2001:db8::1/64 and a link-local address, in either order
        if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
            (gmsg, lmsg) = ret
        else:
            (lmsg, gmsg) = ret
        # Start with global ( 2001:db8::1/64 )
        msg = gmsg
        # Family and ifindex has been checked already
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1"
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname

        # Local: fe80::/64
        msg = lmsg
        assert msg.base_hdr.ifa_prefixlen == 64
        # Ignore IFA_FLAGS for now
        assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value

        addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr)
        assert addr.is_link_local
        # Verify that ifindex is not embedded (second 16-bit word must be 0)
        assert struct.unpack("!H", addr.packed[2:4])[0] == 0
        assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
        assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None

        epair_ifname = self.vnet.iface_alias_map["if1"].name
        assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname


class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Common helpers for the RTM_NEWADDR/RTM_DELADDR test classes"""

    def setup_method(self, method):
        """Runs the parent setup and opens a NETLINK_ROUTE socket"""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    def send_check_success(self, msg):
        """Sends msg and asserts the reply is NLMSG_ERROR with error code 0"""
        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0

    @staticmethod
    def get_family_from_ip(ip):
        """Returns AF_INET for a version 4 address, AF_INET6 otherwise"""
        if ip.version == 4:
            return socket.AF_INET
        return socket.AF_INET6

    def create_msg(self, ifa):
        """Builds an RTM_NEWADDR request for ifa on the if1 interface.

        The message has NLM_F_EXCL | NLM_F_CREATE set and the family,
        ifindex and prefix length filled in from ifa (an
        ipaddress.ip_interface).  Address attributes are left to the caller.
        """
        iface = self.vnet.iface_alias_map["if1"]

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        msg.set_request()
        msg.nl_hdr.nlmsg_flags |= (
            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        )
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
        return msg

    def get_ifa_list(self, ifindex=0, family=0):
        """Sends RTM_GETADDR with the given ifindex/family, returns the replies"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = family
        msg.base_hdr.ifa_index = ifindex
        self.write_message(msg)
        return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR)

    def find_msg_by_ifa(self, msg_list, ip):
        """Returns the first message whose IFA_ADDRESS equals ip, or None"""
        ip_str = str(ip)
        for msg in msg_list:
            nla = msg.get_nla(IfaAttrType.IFA_ADDRESS)
            # get_nla() yields None for an absent attribute; treat such a
            # message as a non-match rather than failing on .addr
            if nla is not None and nla.addr == ip_str:
                return msg
        return None

    def setup_dummy_carp(self, ifindex: int, vhid: int):
        """Configures CARP vhid on the interface via the "carp" genl family.

        Requires the carp module and asserts that the CARP_NL_CMD_SET
        request is acknowledged with error code 0.
        """
        self.require_module("carp")

        nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
        family_id = nlsock.get_genl_family_id("carp")

        msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET)
        msg.set_request()
        msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid))
        msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex))
        rx_msg = nlsock.get_reply(msg)

        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0


class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps):
    """Address add/delete tests on the standard broadcast interface"""

    def test_add_4(self):
        """Tests IPv4 address addition to the standard broadcast interface"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)

    # Each tuple is (prefix length, whether IFA_BROADCAST is omitted from the
    # request, broadcast address expected in the dump)
    @pytest.mark.parametrize(
        "brd",
        [
            pytest.param((32, True, "192.0.2.1"), id="auto_32"),
            pytest.param((31, True, "255.255.255.255"), id="auto_31"),
            pytest.param((30, True, "192.0.2.3"), id="auto_30"),
            pytest.param((30, False, "192.0.2.2"), id="custom_30"),
            pytest.param((24, False, "192.0.2.7"), id="custom_24"),
        ],
    )
    def test_add_4_brd(self, brd):
        """Tests proper broadcast setup when adding IPv4 ifa"""
        plen, auto_brd, ifa_brd_str = brd
        ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen))
        iface = self.vnet.iface_alias_map["if1"]
        ifa_brd = ipaddress.ip_address(ifa_brd_str)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if not auto_brd:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)

    def test_add_6(self):
        """Tests IPv6 address addition to the standard broadcast interface"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)

    def test_add_4_carp(self):
        """Tests IPv4 address addition with a CARP vhid attached"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]
        vhid = 77

        self.setup_dummy_carp(iface.ifindex, vhid)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_carp(self):
        """Tests IPv6 address addition with a CARP vhid attached"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]
        vhid = 77

        self.setup_dummy_carp(iface.ifindex, vhid)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid

    def test_add_6_lifetime(self):
        """Tests IPv6 address addition with preferred/valid lifetimes"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]
        pref_time = 43200
        valid_time = 86400

        ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time)

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci
        # Accept values up to 5 below the requested ones - presumably the
        # remaining lifetime counts down between the add and the dump
        assert pref_time - 5 <= ci.ifa_prefered <= pref_time
        assert valid_time - 5 <= ci.ifa_valid <= valid_time
        assert ci.cstamp > 0
        assert ci.tstamp > 0
        assert ci.tstamp >= ci.cstamp

    @pytest.mark.parametrize(
        "flags_str",
        [
            "autoconf",
            "deprecated",
            "autoconf,deprecated",
            "prefer_source",
        ],
    )
    def test_add_6_flags(self, flags_str):
        """Tests IPv6 address addition with IFA_FLAGS / IFAF_FLAGS set"""
        ifa = ipaddress.ip_interface("2001:db8::1/64")
        iface = self.vnet.iface_alias_map["if1"]

        # Per flag name: "nl" is the IFA_FLAGS bit, "f" the IFAF_FLAGS bit
        flags_map = {
            "autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF},
            "deprecated": {
                "nl": IfaFlags.IFA_F_DEPRECATED,
                "f": IfafFlags6.IN6_IFF_DEPRECATED,
            },
            "prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE},
        }
        nl_flags = 0
        f_flags = 0

        for flag_str in flags_str.split(","):
            d = flags_map.get(flag_str, {})
            nl_flags |= enum_or_int(d.get("nl", 0))
            f_flags |= enum_or_int(d.get("f", 0))

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags))
        attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)]
        msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags
        ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
        assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags

    def test_add_4_empty_message(self):
        """Tests correct failure w/ empty message"""
        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
        msg.set_request()
        msg.nl_hdr.nlmsg_flags |= (
            NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
        )

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    def test_add_4_empty_ifindex(self):
        """Tests correct failure w/ empty ifindex"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address

        msg = self.create_msg(ifa)
        msg.base_hdr.ifa_index = 0
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    def test_add_4_empty_addr(self):
        """Tests correct failure w/ empty address"""
        ifa = ipaddress.ip_interface("192.0.2.1/24")
        ifa_brd = ifa.network.broadcast_address

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code != 0

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the standard broadcast interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        ifa_brd = ifa.network.broadcast_address
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        # The address to delete is passed either as IFA_LOCAL or IFA_ADDRESS
        if tlv == "local":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if tlv == "address":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is None


class TestRtNlIfaddrOpsP2p(RtnlIfaOps):
    """Address add/delete tests on a point-to-point (gif) interface"""

    IFTYPE = "gif"

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"),
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"),
            pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"),
            pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"),
        ],
    )
    def test_add_4(self, ifa_pair):
        """Tests IPv4 address addition to the p2p interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(
                ["2001:db8::1/64", "2001:db8::2"],
                id="dst_inside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(
                ["2001:db8::1/127", "2001:db8::2"],
                id="dst_inside_127",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"),
            pytest.param(
                ["2001:db8::1/64", "2001:db8:2::2"],
                id="dst_outside_64",
                marks=pytest.mark.xfail(reason="currently fails"),
            ),
        ],
    )
    def test_add_6(self, ifa_pair):
        """Tests IPv6 address addition to the p2p interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2
        rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg_gu is not None

        assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
        assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)

    @pytest.mark.parametrize(
        "ifa_pair",
        [
            pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"),
            pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"),
            pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"),
        ],
    )
    # Each pair is (IFA_ADDRESS selector, IFA_LOCAL selector): "a" puts the
    # local address into that attribute, "p" the peer address, "" omits it
    @pytest.mark.parametrize(
        "tlv_pair",
        [
            pytest.param(["a", ""], id="ifa_addr=addr"),
            pytest.param(["", "a"], id="ifa_local=addr"),
            pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"),
        ],
    )
    def test_del(self, tlv_pair, ifa_pair):
        """Tests address deletion from the P2P interface"""
        ifa = ipaddress.ip_interface(ifa_pair[0])
        peer_ip = ipaddress.ip_address(ifa_pair[1])
        iface = self.vnet.iface_alias_map["if1"]
        ifa_addr_str, ifa_local_str = tlv_pair

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, peer_ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        if "a" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
        if "p" in ifa_addr_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
        if "a" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if "p" in ifa_local_str:
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        # The dump reports the peer in IFA_ADDRESS on this interface (this is
        # what the pre-deletion lookup matched on), so the removal has to be
        # verified by peer_ip: a lookup by ifa.ip alone would come back empty
        # even if nothing was deleted.
        assert self.find_msg_by_ifa(lst, peer_ip) is None
        assert self.find_msg_by_ifa(lst, ifa.ip) is None


class TestRtNlAddIfaddrLo(RtnlIfaOps):
    """Address add/delete tests on a loopback interface"""

    IFTYPE = "lo"

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/24", id="prefix"),
            pytest.param("192.0.2.1/32", id="host"),
        ],
    )
    def test_add_4(self, ifa_str):
        """Tests IPv4 address addition to the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 1
        rx_msg = lst[0]

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value

        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
        assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("2001:db8::1/64", id="gu_prefix"),
            pytest.param("2001:db8::1/128", id="gu_host"),
        ],
    )
    def test_add_6(self, ifa_str):
        """Tests IPv6 address addition to the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)

        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        assert len(lst) == 2  # link-local should be auto-created as well
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
        assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
        assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)

    @pytest.mark.parametrize(
        "ifa_str",
        [
            pytest.param("192.0.2.1/32", id="ipv4_host"),
            pytest.param("192.0.2.1/24", id="ipv4_prefix"),
            pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
            pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
        ],
    )
    @pytest.mark.parametrize(
        "tlv",
        [
            pytest.param("local", id="ifa_local"),
            pytest.param("address", id="ifa_address"),
        ],
    )
    def test_del(self, tlv, ifa_str):
        """Tests address deletion from the loopback interface"""
        ifa = ipaddress.ip_interface(ifa_str)
        iface = self.vnet.iface_alias_map["if1"]

        msg = self.create_msg(ifa)
        msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is not None

        msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
        msg.set_request()
        msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
        msg.base_hdr.ifa_index = iface.ifindex
        msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen

        # The address to delete is passed either as IFA_LOCAL or IFA_ADDRESS
        if tlv == "local":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
        if tlv == "address":
            msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))

        self.send_check_success(msg)
        lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
        rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
        assert rx_msg is None