Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netlink/test_rtnl_iface.py
39564 views
1
import errno
2
import socket
3
4
import pytest
5
from atf_python.sys.netlink.netlink_route import IflattrType
6
from atf_python.sys.netlink.netlink_route import IflinkInfo
7
from atf_python.sys.netlink.netlink_route import IfLinkInfoDataVlan
8
from atf_python.sys.netlink.netlink_route import NetlinkIflaMessage
9
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
10
from atf_python.sys.netlink.attrs import NlAttrNested
11
from atf_python.sys.netlink.attrs import NlAttrStr
12
from atf_python.sys.netlink.attrs import NlAttrStrn
13
from atf_python.sys.netlink.attrs import NlAttrU16
14
from atf_python.sys.netlink.attrs import NlAttrU32
15
from atf_python.sys.netlink.utils import NlConst
16
from atf_python.sys.netlink.base_headers import NlmBaseFlags
17
from atf_python.sys.netlink.base_headers import NlmNewFlags
18
from atf_python.sys.netlink.base_headers import NlMsgType
19
from atf_python.sys.netlink.netlink_route import NlRtMsgType
20
from atf_python.sys.netlink.netlink_route import rtnl_ifla_attrs
21
from atf_python.sys.net.vnet import SingleVnetTestTemplate
22
from atf_python.sys.net.tools import ToolsHelper
23
24
25
class TestRtNlIface(NetlinkTestTemplate, SingleVnetTestTemplate):
26
def setup_method(self, method):
27
super().setup_method(method)
28
self.setup_netlink(NlConst.NETLINK_ROUTE)
29
30
def get_interface_byname(self, ifname):
31
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
32
msg.nl_hdr.nlmsg_flags = (
33
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
34
)
35
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
36
self.write_message(msg)
37
while True:
38
rx_msg = self.read_message()
39
if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq:
40
if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
41
if rx_msg.error_code != 0:
42
raise ValueError("unable to get interface {}".format(ifname))
43
elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
44
return rx_msg
45
else:
46
raise ValueError("bad message")
47
48
def test_get_iface_byname_error(self):
49
"""Tests error on fetching non-existing interface name"""
50
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
51
msg.nl_hdr.nlmsg_flags = (
52
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
53
)
54
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
55
56
rx_msg = self.get_reply(msg)
57
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
58
assert rx_msg.error_code == errno.ENODEV
59
60
def test_get_iface_byindex_error(self):
61
"""Tests error on fetching non-existing interface index"""
62
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
63
msg.nl_hdr.nlmsg_flags = (
64
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
65
)
66
msg.base_hdr.ifi_index = 2147483647
67
68
rx_msg = self.get_reply(msg)
69
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
70
assert rx_msg.error_code == errno.ENODEV
71
72
@pytest.mark.require_user("root")
73
def test_create_iface_plain(self):
74
"""Tests loopback creation w/o any parameters"""
75
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
76
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
77
msg.nl_hdr.nlmsg_flags = (
78
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
79
)
80
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
81
msg.add_nla(
82
NlAttrNested(
83
IflattrType.IFLA_LINKINFO,
84
[
85
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
86
],
87
)
88
)
89
90
rx_msg = self.get_reply(msg)
91
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
92
assert rx_msg.error_code == 0
93
94
self.get_interface_byname("lo10")
95
96
@pytest.mark.require_user("root")
97
def test_create_iface_plain_retvals(self):
98
"""Tests loopback creation w/o any parameters"""
99
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
100
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
101
msg.nl_hdr.nlmsg_flags = (
102
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
103
)
104
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
105
msg.add_nla(
106
NlAttrNested(
107
IflattrType.IFLA_LINKINFO,
108
[
109
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
110
],
111
)
112
)
113
114
rx_msg = self.get_reply(msg)
115
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
116
assert rx_msg.error_code == 0
117
assert rx_msg.cookie is not None
118
nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
119
nla_map = {n.nla_type: n for n in nla_list}
120
assert IflattrType.IFLA_IFNAME.value in nla_map
121
assert nla_map[IflattrType.IFLA_IFNAME.value].text == "lo10"
122
assert IflattrType.IFLA_NEW_IFINDEX.value in nla_map
123
assert nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32 > 0
124
125
lo_msg = self.get_interface_byname("lo10")
126
assert (
127
lo_msg.base_hdr.ifi_index == nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
128
)
129
130
@pytest.mark.require_user("root")
131
def test_create_iface_attrs(self):
132
"""Tests interface creation with additional properties"""
133
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
134
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
135
msg.nl_hdr.nlmsg_flags = (
136
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
137
)
138
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
139
msg.add_nla(
140
NlAttrNested(
141
IflattrType.IFLA_LINKINFO,
142
[
143
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
144
],
145
)
146
)
147
148
# Custom attributes
149
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
150
msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
151
152
rx_msg = self.get_reply(msg)
153
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
154
assert rx_msg.error_code == 0
155
156
iface_msg = self.get_interface_byname("lo10")
157
assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
158
assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
159
160
@pytest.mark.require_user("root")
161
def test_modify_iface_attrs(self):
162
"""Tests interface modifications"""
163
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
164
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
165
msg.nl_hdr.nlmsg_flags = (
166
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
167
)
168
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
169
msg.add_nla(
170
NlAttrNested(
171
IflattrType.IFLA_LINKINFO,
172
[
173
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
174
],
175
)
176
)
177
178
rx_msg = self.get_reply(msg)
179
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
180
assert rx_msg.error_code == 0
181
182
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
183
msg.nl_hdr.nlmsg_flags = (
184
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
185
)
186
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
187
188
# Custom attributes
189
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description"))
190
msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024))
191
192
rx_msg = self.get_reply(msg)
193
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
194
assert rx_msg.error_code == 0
195
196
iface_msg = self.get_interface_byname("lo10")
197
assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description"
198
assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024
199
200
@pytest.mark.require_user("root")
201
def test_delete_iface(self):
202
"""Tests interface modifications"""
203
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
204
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
205
msg.nl_hdr.nlmsg_flags = (
206
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
207
)
208
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
209
msg.add_nla(
210
NlAttrNested(
211
IflattrType.IFLA_LINKINFO,
212
[
213
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
214
],
215
)
216
)
217
218
rx_msg = self.get_reply(msg)
219
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
220
assert rx_msg.error_code == 0
221
222
iface_msg = self.get_interface_byname("lo10")
223
iface_idx = iface_msg.base_hdr.ifi_index
224
225
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value)
226
msg.nl_hdr.nlmsg_flags = (
227
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
228
)
229
msg.base_hdr.ifi_index = iface_idx
230
# msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10"))
231
232
rx_msg = self.get_reply(msg)
233
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
234
assert rx_msg.error_code == 0
235
236
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
237
msg.nl_hdr.nlmsg_flags = (
238
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
239
)
240
msg.base_hdr.ifi_index = 2147483647
241
242
rx_msg = self.get_reply(msg)
243
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
244
assert rx_msg.error_code == errno.ENODEV
245
246
@pytest.mark.require_user("root")
247
def test_dump_ifaces_many(self):
248
"""Tests if interface dummp is not missing interfaces"""
249
250
ifmap = {}
251
ifmap[socket.if_nametoindex("lo0")] = "lo0"
252
253
for i in range(40):
254
ifname = "lo{}".format(i + 1)
255
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
256
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
257
msg.nl_hdr.nlmsg_flags = (
258
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
259
)
260
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname))
261
msg.add_nla(
262
NlAttrNested(
263
IflattrType.IFLA_LINKINFO,
264
[
265
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"),
266
],
267
)
268
)
269
270
rx_msg = self.get_reply(msg)
271
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
272
nla_list, _ = rx_msg.parse_attrs(bytes(rx_msg.cookie)[4:], rtnl_ifla_attrs)
273
nla_map = {n.nla_type: n for n in nla_list}
274
assert nla_map[IflattrType.IFLA_IFNAME.value].text == ifname
275
ifindex = nla_map[IflattrType.IFLA_NEW_IFINDEX.value].u32
276
assert ifindex > 0
277
assert ifindex not in ifmap
278
ifmap[ifindex] = ifname
279
280
# Dump all interfaces and check if the output matches ifmap
281
kernel_ifmap = {}
282
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value)
283
msg.nl_hdr.nlmsg_flags = (
284
NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
285
)
286
self.write_message(msg)
287
while True:
288
rx_msg = self.read_message()
289
if msg.nl_hdr.nlmsg_seq != rx_msg.nl_hdr.nlmsg_seq:
290
raise ValueError(
291
"unexpected seq {}".format(rx_msg.nl_hdr.nlmsg_seq)
292
)
293
if rx_msg.is_type(NlMsgType.NLMSG_ERROR):
294
raise ValueError("unexpected message {}".format(rx_msg))
295
if rx_msg.is_type(NlMsgType.NLMSG_DONE):
296
break
297
if not rx_msg.is_type(NlRtMsgType.RTM_NEWLINK):
298
raise ValueError("unexpected message {}".format(rx_msg))
299
300
ifindex = rx_msg.base_hdr.ifi_index
301
assert ifindex == rx_msg.base_hdr.ifi_index
302
ifname = rx_msg.get_nla(IflattrType.IFLA_IFNAME).text
303
if ifname.startswith("lo"):
304
kernel_ifmap[ifindex] = ifname
305
assert kernel_ifmap == ifmap
306
307
#
308
# *
309
# * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0},
310
# * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0},
311
# * {{nla_len=8, nla_type=IFLA_LINK}, 2},
312
# * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"},
313
# * {{nla_len=24, nla_type=IFLA_LINKINFO},
314
# * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...},
315
# * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}
316
# */
317
@pytest.mark.require_user("root")
318
def test_create_vlan_plain(self):
319
"""Creates 802.1Q VLAN interface in vlanXX and ifX fashion"""
320
self.require_module("if_vlan")
321
os_ifname = self.vnet.iface_alias_map["if1"].name
322
ifindex = socket.if_nametoindex(os_ifname)
323
324
flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
325
msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value)
326
msg.nl_hdr.nlmsg_flags = (
327
flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value
328
)
329
msg.base_hdr.ifi_index = ifindex
330
331
msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex))
332
msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22"))
333
334
msg.add_nla(
335
NlAttrNested(
336
IflattrType.IFLA_LINKINFO,
337
[
338
NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"),
339
NlAttrNested(
340
IflinkInfo.IFLA_INFO_DATA,
341
[
342
NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22),
343
],
344
),
345
],
346
)
347
)
348
349
rx_msg = self.get_reply(msg)
350
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
351
assert rx_msg.error_code == 0
352
353
ToolsHelper.print_net_debug()
354
self.get_interface_byname("vlan22")
355
# ToolsHelper.print_net_debug()
356
357