# Path: blob/main/tests/sys/netlink/test_rtnl_route.py
# 39536 views
"""Route add/dump tests driven over a NETLINK_ROUTE socket."""
import ipaddress
import socket

import pytest
from atf_python.sys.net.tools import ToolsHelper
from atf_python.sys.net.vnet import IfaceFactory
from atf_python.sys.net.vnet import SingleVnetTestTemplate
from atf_python.sys.netlink.attrs import NlAttrIp
from atf_python.sys.netlink.attrs import NlAttrU32
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import NlmGetFlags
from atf_python.sys.netlink.base_headers import NlmNewFlags
from atf_python.sys.netlink.base_headers import NlMsgType
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
from atf_python.sys.netlink.netlink_route import NetlinkRtMessage
from atf_python.sys.netlink.netlink_route import NlRtMsgType
from atf_python.sys.netlink.netlink_route import RtattrType
from atf_python.sys.netlink.utils import NlConst


class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
    """Exercise RTM_NEWROUTE / RTM_GETROUTE requests sent over netlink."""

    # NOTE(review): presumably consumed by SingleVnetTestTemplate to address
    # the test interface -- confirm against the template.
    IPV6_PREFIXES = ["2001:db8::1/64"]

    def setup_method(self, method):
        """Run the base-class setup, then set up netlink for NETLINK_ROUTE."""
        super().setup_method(method)
        self.setup_netlink(NlConst.NETLINK_ROUTE)

    @pytest.mark.timeout(5)
    def test_add_route6_ll_gw(self):
        """Add an IPv6 /64 route via link-local gateway fe80::1 on "if1".

        The request carries RTA_DST, RTA_GATEWAY and RTA_OIF; the reply must
        be an NLMSG_ERROR message with error code 0.
        """
        epair_ifname = self.vnet.iface_alias_map["if1"].name
        epair_ifindex = socket.if_nametoindex(epair_ifname)

        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
        msg.set_request()
        msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
        msg.base_hdr.rtm_family = socket.AF_INET6
        msg.base_hdr.rtm_dst_len = 64
        msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
        msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "fe80::1"))
        msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, epair_ifindex))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0

        # Emit network state into the test output for post-mortem debugging.
        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -6onW")

    @pytest.mark.timeout(5)
    def test_add_route6_ll_if_gw(self):
        """Add an IPv6 /64 route that names only an output interface.

        A tun interface is created for the test and passed as RTA_OIF; no
        RTA_GATEWAY attribute is sent. The reply must be an NLMSG_ERROR
        message with error code 0.
        """
        self.require_module("if_tun")
        tun_ifname = IfaceFactory().create_iface("", "tun")[0].name
        tun_ifindex = socket.if_nametoindex(tun_ifname)

        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
        msg.set_request()
        msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
        msg.base_hdr.rtm_family = socket.AF_INET6
        msg.base_hdr.rtm_dst_len = 64
        msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
        msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, tun_ifindex))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0

        # Emit network state into the test output for post-mortem debugging.
        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -6onW")

    @pytest.mark.timeout(5)
    def test_add_route4_ll_if_gw(self):
        """Add an IPv4 /32 route that names only an output interface.

        IPv4 counterpart of ``test_add_route6_ll_if_gw``: destination
        192.0.2.1/32 with a freshly created tun interface as RTA_OIF and no
        RTA_GATEWAY. The reply must be an NLMSG_ERROR message with error
        code 0.
        """
        self.require_module("if_tun")
        tun_ifname = IfaceFactory().create_iface("", "tun")[0].name
        tun_ifindex = socket.if_nametoindex(tun_ifname)

        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
        msg.set_request()
        msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
        msg.base_hdr.rtm_family = socket.AF_INET
        msg.base_hdr.rtm_dst_len = 32
        msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "192.0.2.1"))
        msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, tun_ifindex))

        rx_msg = self.get_reply(msg)
        assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
        assert rx_msg.error_code == 0

        # Emit network state into the test output for post-mortem debugging.
        ToolsHelper.print_net_debug()
        ToolsHelper.print_output("netstat -4onW")

    @pytest.mark.timeout(20)
    def test_buffer_override(self):
        """Install 1000 IPv6 /65 routes, then dump and count them.

        Each route is added with its own RTM_NEWROUTE request and must be
        acknowledged with error code 0. Afterwards a single RTM_GETROUTE dump
        is issued and every RTM_NEWROUTE reply with a /65 destination is
        counted; the count must equal the number of routes installed.

        Raises:
            ValueError: if the dump is answered with a non-zero error code.
        """
        msg_flags = (
            NlmBaseFlags.NLM_F_ACK.value
            | NlmBaseFlags.NLM_F_REQUEST.value
            | NlmNewFlags.NLM_F_CREATE.value
        )

        num_routes = 1000
        # The dump below recognises this test's routes by this prefix length,
        # so it is kept in a single place.
        prefix_len = 65
        base_address = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed)
        for i in range(num_routes):
            # Spread the route index over bytes 6-7 (the fourth 16-bit group)
            # so that every destination is distinct.
            base_address[6], base_address[7] = divmod(i, 256)
            prefix_address = ipaddress.IPv6Address(bytes(base_address))

            msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value)
            msg.nl_hdr.nlmsg_flags = msg_flags
            msg.base_hdr.rtm_family = socket.AF_INET6
            msg.base_hdr.rtm_dst_len = prefix_len
            msg.add_nla(NlAttrIp(RtattrType.RTA_DST, str(prefix_address)))
            msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))

            self.write_message(msg, silent=True)
            rx_msg = self.read_message(silent=True)
            assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
            assert msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq
            assert rx_msg.error_code == 0

        # Now, dump
        msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value)
        msg.nl_hdr.nlmsg_flags = (
            NlmBaseFlags.NLM_F_ACK.value
            | NlmBaseFlags.NLM_F_REQUEST.value
            | NlmGetFlags.NLM_F_ROOT.value
            | NlmGetFlags.NLM_F_MATCH.value
        )
        msg.base_hdr.rtm_family = socket.AF_INET6
        self.write_message(msg)

        num_received = 0
        while True:
            rx_msg = self.read_message(silent=True)
            # Skip anything that is not a reply to the dump request.
            if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
                continue
            if rx_msg.is_type(NlMsgType.NLMSG_ERROR) and rx_msg.error_code != 0:
                raise ValueError(
                    "unable to dump routes: error {}".format(rx_msg.error_code)
                )
            if rx_msg.is_type(NlMsgType.NLMSG_DONE):
                break
            if (
                rx_msg.is_type(NlRtMsgType.RTM_NEWROUTE)
                and rx_msg.base_hdr.rtm_dst_len == prefix_len
            ):
                num_received += 1
        assert num_routes == num_received