Path: blob/main/tests/sys/netlink/test_rtnl_iface.py
39564 views
import errno1import socket23import pytest4from atf_python.sys.netlink.netlink_route import IflattrType5from atf_python.sys.netlink.netlink_route import IflinkInfo6from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan7from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage8from atf_python.sys.netlink.netlink import NetlinkTestTemplate9from atf_python.sys.netlink.attrs import NlAttrNested10from atf_python.sys.netlink.attrs import NlAttrStr11from atf_python.sys.netlink.attrs import NlAttrStrn12from atf_python.sys.netlink.attrs import NlAttrU1613from atf_python.sys.netlink.attrs import NlAttrU3214from atf_python.sys.netlink.utils import NlConst15from atf_python.sys.netlink.base_headers import NlmBaseFlags16from atf_python.sys.netlink.base_headers import NlmNewFlags17from atf_python.sys.netlink.base_headers import NlMsgType18from atf_python.sys.netlink.netlink_route import NlRtMsgType19from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs20from atf_python.sys.net.vnet import SingleVnetTestTemplate21from atf_python.sys.net.tools import ToolsHelper222324class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):25def setup_method(self, method):26super().setup_method(method)27self.setup_netlink(NlConst.NETLINK_ROUTE)2829def get_interface_byname(self, ifname):30msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)31msg.nl_hdr.nlmsg_flags = (32NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value33)34msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))35self.write_message(msg)36while True:37rx_msg = self.read_message()38if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:39if rx_msg.is_type(NlMsgType.NLMSG_ERROR):40if rx_msg.error_code != 0:41raise ValueError("unable to get interface {}".format(ifname))42elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):43return rx_msg44else:45raise ValueError("bad message")4647def test_get_iface_byname_error(self):48"""Tests error on fetching non-existing interface name"""49msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)50msg.nl_hdr.nlmsg_flags = (51NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value52)53msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))5455rx_msg = self.get_reply(msg)56assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)57assert rx_msg.error_code == errno.ENODEV5859def test_get_iface_byindex_error(self):60"""Tests error on fetching non-existing interface index"""61msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)62msg.nl_hdr.nlmsg_flags = (63NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value64)65msg.base_hdr.ifi_index = 21474836476667rx_msg = self.get_reply(msg)68assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)69assert rx_msg.error_code == errno.ENODEV7071@pytest.mark.require_user("root")72def test_create_iface_plain(self):73"""Tests loopback creation w/o any parameters"""74flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value75msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)76msg.nl_hdr.nlmsg_flags = (77flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value78)79msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))80msg.add_nla(81NlAttrNested(82IflattrType.IFLA_LINKINFO,83[84NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),85],86)87)8889rx_msg = self.get_reply(msg)90assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)91assert rx_msg.error_code == 09293self.get_interface_byname("lo10")9495@pytest.mark.require_user("root")96def test_create_iface_plain_retvals(self):97"""Tests loopback creation w/o any parameters"""98flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value99msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)100msg.nl_hdr.nlmsg_flags = (101flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value102)103msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))104msg.add_nla(105NlAttrNested(106IflattrType.IFLA_LINKINFO,107[108NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),109],110)111)112113rx_msg = self.get_reply(msg)114assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)115assert rx_msg.error_code == 0116assert rx_msg.cookie is not None117nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)118nla_map = {n.nla_type: n for n in nla_list}119assert IflattrType.IFLA_IFNAME.value in nla_map120assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"121assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map122assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0123124lo_msg = self.get_interface_byname("lo10")125assert (126lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32127)128129@pytest.mark.require_user("root")130def test_create_iface_attrs(self):131"""Tests interface creation with additional properties"""132flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value133msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)134msg.nl_hdr.nlmsg_flags = (135flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value136)137msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))138msg.add_nla(139NlAttrNested(140IflattrType.IFLA_LINKINFO,141[142NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),143],144)145)146147# Custom attributes148msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))149msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))150151rx_msg = self.get_reply(msg)152assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)153assert rx_msg.error_code == 0154155iface_msg = self.get_interface_byname("lo10")156assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"157assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024158159@pytest.mark.require_user("root")160def test_modify_iface_attrs(self):161"""Tests interface modifications"""162flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value163msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)164msg.nl_hdr.nlmsg_flags = (165flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value166)167msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))168msg.add_nla(169NlAttrNested(170IflattrType.IFLA_LINKINFO,171[172NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),173],174)175)176177rx_msg = self.get_reply(msg)178assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)179assert rx_msg.error_code == 0180181msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)182msg.nl_hdr.nlmsg_flags = (183NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value184)185msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))186187# Custom attributes188msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))189msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))190191rx_msg = self.get_reply(msg)192assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)193assert rx_msg.error_code == 0194195iface_msg = self.get_interface_byname("lo10")196assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"197assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024198199@pytest.mark.require_user("root")200def test_delete_iface(self):201"""Tests interface modifications"""202flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value203msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)204msg.nl_hdr.nlmsg_flags = (205flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value206)207msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))208msg.add_nla(209NlAttrNested(210IflattrType.IFLA_LINKINFO,211[212NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),213],214)215)216217rx_msg = self.get_reply(msg)218assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)219assert rx_msg.error_code == 0220221iface_msg = self.get_interface_byname("lo10")222iface_idx = iface_msg.base_hdr.ifi_index223224msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)225msg.nl_hdr.nlmsg_flags = (226NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value227)228msg.base_hdr.ifi_index = iface_idx229# msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))230231rx_msg = self.get_reply(msg)232assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)233assert rx_msg.error_code == 0234235msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)236msg.nl_hdr.nlmsg_flags = (237NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value238)239msg.base_hdr.ifi_index = 2147483647240241rx_msg = self.get_reply(msg)242assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)243assert rx_msg.error_code == errno.ENODEV244245@pytest.mark.require_user("root")246def test_dump_ifaces_many(self):247"""Tests if interface dummp is not missing interfaces"""248249ifmap = {}250ifmap[socket.if_nametoindex("lo0")] = "lo0"251252for i in range(40):253ifname = "lo{}".format(i + 1)254flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value255msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)256msg.nl_hdr.nlmsg_flags = (257flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value258)259msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))260msg.add_nla(261NlAttrNested(262IflattrType.IFLA_LINKINFO,263[264NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),265],266)267)268269rx_msg = self.get_reply(msg)270assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)271nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)272nla_map = {n.nla_type: n for n in nla_list}273assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname274ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32275assert ifindex > 0276assert ifindex not in ifmap277ifmap[ifindex] = ifname278279# Dump all interfaces and check if the output matches ifmap280kernel_ifmap = {}281msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)282msg.nl_hdr.nlmsg_flags = (283NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value284)285self.write_message(msg)286while True:287rx_msg = self.read_message()288if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:289raise ValueError(290"unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)291)292if rx_msg.is_type(NlMsgType.NLMSG_ERROR):293raise ValueError("unexpected message {}".format(rx_msg))294if rx_msg.is_type(NlMsgType.NLMSG_DONE):295break296if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):297raise ValueError("unexpected message {}".format(rx_msg))298299ifindex = rx_msg.base_hdr.ifi_index300assert ifindex == rx_msg.base_hdr.ifi_index301ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text302if ifname.startswith("lo"):303kernel_ifmap[ifindex] = ifname304assert kernel_ifmap == ifmap305306#307# *308# * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},309# * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},310# * {{nla_len=8, nla_type=IFLA_LINK}, 2},311# * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},312# * {{nla_len=24, nla_type=IFLA_LINKINFO},313# * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},314# * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}315# */316@pytest.mark.require_user("root")317def test_create_vlan_plain(self):318"""Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""319self.require_module("if_vlan")320os_ifname = self.vnet.iface_alias_map["if1"].name321ifindex = socket.if_nametoindex(os_ifname)322323flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value324msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)325msg.nl_hdr.nlmsg_flags = (326flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value327)328msg.base_hdr.ifi_index = ifindex329330msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))331msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))332333msg.add_nla(334NlAttrNested(335IflattrType.IFLA_LINKINFO,336[337NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),338NlAttrNested(339IflinkInfo.IFLA_INFO_DATA,340[341NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),342],343),344],345)346)347348rx_msg = self.get_reply(msg)349assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)350assert rx_msg.error_code == 0351352ToolsHelper.print_net_debug()353self.get_interface_byname("vlan22")354# ToolsHelper.print_net_debug()355356357