Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netlink/test_rtnl_route.py
39536 views
1
import ipaddress
2
import socket
3
4
import pytest
5
from atf_python.sys.net.tools import ToolsHelper
6
from atf_python.sys.net.vnet import IfaceFactory
7
from atf_python.sys.net.vnet import SingleVnetTestTemplate
8
from atf_python.sys.netlink.attrs import NlAttrIp
9
from atf_python.sys.netlink.attrs import NlAttrU32
10
from atf_python.sys.netlink.base_headers import NlmBaseFlags
11
from atf_python.sys.netlink.base_headers import NlmGetFlags
12
from atf_python.sys.netlink.base_headers import NlmNewFlags
13
from atf_python.sys.netlink.base_headers import NlMsgType
14
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
15
from atf_python.sys.netlink.netlink_route import NetlinkRtMessage
16
from atf_python.sys.netlink.netlink_route import NlRtMsgType
17
from atf_python.sys.netlink.netlink_route import RtattrType
18
from atf_python.sys.netlink.utils import NlConst
19
20
21
class TestRtNlRoute(NetlinkTestTemplate, SingleVnetTestTemplate):
22
IPV6_PREFIXES = ["2001:db8::1/64"]
23
24
def setup_method(self, method):
25
super().setup_method(method)
26
self.setup_netlink(NlConst.NETLINK_ROUTE)
27
28
@pytest.mark.timeout(5)
29
def test_add_route6_ll_gw(self):
30
epair_ifname = self.vnet.iface_alias_map["if1"].name
31
epair_ifindex = socket.if_nametoindex(epair_ifname)
32
33
msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
34
msg.set_request()
35
msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
36
msg.base_hdr.rtm_family = socket.AF_INET6
37
msg.base_hdr.rtm_dst_len = 64
38
msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
39
msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "fe80::1"))
40
msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, epair_ifindex))
41
42
rx_msg = self.get_reply(msg)
43
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
44
assert rx_msg.error_code == 0
45
46
ToolsHelper.print_net_debug()
47
ToolsHelper.print_output("netstat -6onW")
48
49
@pytest.mark.timeout(5)
50
def test_add_route6_ll_if_gw(self):
51
self.require_module("if_tun")
52
tun_ifname = IfaceFactory().create_iface("", "tun")[0].name
53
tun_ifindex = socket.if_nametoindex(tun_ifname)
54
55
msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
56
msg.set_request()
57
msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
58
msg.base_hdr.rtm_family = socket.AF_INET6
59
msg.base_hdr.rtm_dst_len = 64
60
msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "2001:db8:2::"))
61
msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, tun_ifindex))
62
63
rx_msg = self.get_reply(msg)
64
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
65
assert rx_msg.error_code == 0
66
67
ToolsHelper.print_net_debug()
68
ToolsHelper.print_output("netstat -6onW")
69
70
@pytest.mark.timeout(5)
71
def test_add_route4_ll_if_gw(self):
72
self.require_module("if_tun")
73
tun_ifname = IfaceFactory().create_iface("", "tun")[0].name
74
tun_ifindex = socket.if_nametoindex(tun_ifname)
75
76
msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE)
77
msg.set_request()
78
msg.add_nlflags([NlmNewFlags.NLM_F_CREATE])
79
msg.base_hdr.rtm_family = socket.AF_INET
80
msg.base_hdr.rtm_dst_len = 32
81
msg.add_nla(NlAttrIp(RtattrType.RTA_DST, "192.0.2.1"))
82
msg.add_nla(NlAttrU32(RtattrType.RTA_OIF, tun_ifindex))
83
84
rx_msg = self.get_reply(msg)
85
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
86
assert rx_msg.error_code == 0
87
88
ToolsHelper.print_net_debug()
89
ToolsHelper.print_output("netstat -4onW")
90
91
@pytest.mark.timeout(20)
92
def test_buffer_override(self):
93
msg_flags = (
94
NlmBaseFlags.NLM_F_ACK.value
95
| NlmBaseFlags.NLM_F_REQUEST.value
96
| NlmNewFlags.NLM_F_CREATE.value
97
)
98
99
num_routes = 1000
100
base_address = bytearray(ipaddress.ip_address("2001:db8:ffff::").packed)
101
for i in range(num_routes):
102
base_address[7] = i % 256
103
base_address[6] = i // 256
104
prefix_address = ipaddress.IPv6Address(bytes(base_address))
105
106
msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_NEWROUTE.value)
107
msg.nl_hdr.nlmsg_flags = msg_flags
108
msg.base_hdr.rtm_family = socket.AF_INET6
109
msg.base_hdr.rtm_dst_len = 65
110
msg.add_nla(NlAttrIp(RtattrType.RTA_DST, str(prefix_address)))
111
msg.add_nla(NlAttrIp(RtattrType.RTA_GATEWAY, "2001:db8::2"))
112
113
self.write_message(msg, silent=True)
114
rx_msg = self.read_message(silent=True)
115
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
116
assert msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq
117
assert rx_msg.error_code == 0
118
# Now, dump
119
msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value)
120
msg.nl_hdr.nlmsg_flags = (
121
NlmBaseFlags.NLM_F_ACK.value
122
| NlmBaseFlags.NLM_F_REQUEST.value
123
| NlmGetFlags.NLM_F_ROOT.value
124
| NlmGetFlags.NLM_F_MATCH.value
125
)
126
msg.base_hdr.rtm_family = socket.AF_INET6
127
self.write_message(msg)
128
num_received = 0
129
while True:
130
rx_msg = self.read_message(silent=True)
131
if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
132
if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
133
if rx_msg.error_code != 0:
134
raise ValueError(
135
"unable to dump routes: error {}".format(rx_msg.error_code)
136
)
137
if rx_msg.is_type(NlMsgType.NLMSG_DONE):
138
break
139
if rx_msg.is_type(NlRtMsgType.RTM_NEWROUTE):
140
if rx_msg.base_hdr.rtm_dst_len == 65:
141
num_received += 1
142
assert num_routes == num_received
143
144