Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netlink/test_rtnl_ifaddr.py
39483 views
1
import ipaddress
2
import socket
3
import struct
4
5
import pytest
6
from atf_python.sys.net.vnet import SingleVnetTestTemplate
7
from atf_python.sys.netlink.attrs import NlAttr
8
from atf_python.sys.netlink.attrs import NlAttrIp
9
from atf_python.sys.netlink.attrs import NlAttrNested
10
from atf_python.sys.netlink.attrs import NlAttrU32
11
from atf_python.sys.netlink.base_headers import NlmBaseFlags
12
from atf_python.sys.netlink.base_headers import NlmNewFlags
13
from atf_python.sys.netlink.base_headers import Nlmsghdr
14
from atf_python.sys.netlink.message import NlMsgType
15
from atf_python.sys.netlink.netlink import NetlinkTestTemplate
16
from atf_python.sys.netlink.netlink import Nlsock
17
from atf_python.sys.netlink.netlink_generic import CarpAttrType
18
from atf_python.sys.netlink.netlink_generic import CarpGenMessage
19
from atf_python.sys.netlink.netlink_generic import CarpMsgType
20
from atf_python.sys.netlink.netlink_route import IfaAttrType
21
from atf_python.sys.netlink.netlink_route import IfaCacheInfo
22
from atf_python.sys.netlink.netlink_route import IfafAttrType
23
from atf_python.sys.netlink.netlink_route import IfafFlags6
24
from atf_python.sys.netlink.netlink_route import IfaFlags
25
from atf_python.sys.netlink.netlink_route import NetlinkIfaMessage
26
from atf_python.sys.netlink.netlink_route import NlRtMsgType
27
from atf_python.sys.netlink.netlink_route import RtScope
28
from atf_python.sys.netlink.utils import enum_or_int
29
from atf_python.sys.netlink.utils import NlConst
30
31
32
class TestRtNlIfaddrList(NetlinkTestTemplate, SingleVnetTestTemplate):
33
def setup_method(self, method):
34
method_name = method.__name__
35
if "4" in method_name:
36
if "nofilter" in method_name:
37
self.IPV4_PREFIXES = ["192.0.2.1/24", "169.254.169.254/16"]
38
else:
39
self.IPV4_PREFIXES = ["192.0.2.1/24"]
40
if "6" in method_name:
41
self.IPV6_PREFIXES = ["2001:db8::1/64"]
42
super().setup_method(method)
43
self.setup_netlink(NlConst.NETLINK_ROUTE)
44
45
def test_46_nofilter(self):
46
"""Tests that listing outputs both IPv4/IPv6 and interfaces"""
47
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
48
msg.set_request()
49
self.write_message(msg)
50
51
ret = []
52
for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
53
ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
54
family = rx_msg.base_hdr.ifa_family
55
scope = rx_msg.base_hdr.ifa_scope
56
ret.append((ifname, family, scope))
57
58
ifname = "lo0"
59
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1
60
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_HOST.value]) == 1
61
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
62
assert len([r for r in ret if r[0] == ifname]) == 3
63
64
ifname = self.vnet.iface_alias_map["if1"].name
65
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
66
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1
67
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_LINK.value]) == 1
68
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6 and r[2] == RtScope.RT_SCOPE_UNIVERSE.value]) == 1
69
assert len([r for r in ret if r[0] == ifname]) == 4
70
71
def test_46_filter_iface(self):
72
"""Tests that listing outputs both IPv4/IPv6 for the specific interface"""
73
epair_ifname = self.vnet.iface_alias_map["if1"].name
74
75
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
76
msg.set_request()
77
msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
78
self.write_message(msg)
79
80
ret = []
81
for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
82
ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
83
family = rx_msg.base_hdr.ifa_family
84
ret.append((ifname, family, rx_msg))
85
86
ifname = epair_ifname
87
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET]) == 1
88
assert len([r for r in ret if r[0] == ifname and r[1] == socket.AF_INET6]) == 2
89
assert len(ret) == 3
90
91
def test_46_filter_family_compat(self):
92
"""Tests that family filtering works with the stripped header"""
93
94
hdr = Nlmsghdr(
95
nlmsg_len=17,
96
nlmsg_type=NlRtMsgType.RTM_GETADDR.value,
97
nlmsg_flags=NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value,
98
nlmsg_seq=self.helper.get_seq(),
99
)
100
data = bytes(hdr) + struct.pack("@B", socket.AF_INET)
101
self.nlsock.write_data(data)
102
103
ret = []
104
for rx_msg in self.read_msg_list(hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
105
ifname = socket.if_indextoname(rx_msg.base_hdr.ifa_index)
106
family = rx_msg.base_hdr.ifa_family
107
ret.append((ifname, family, rx_msg))
108
assert len(ret) == 2
109
110
def filter_iface_family(self, family, num_items):
111
"""Tests that listing outputs IPv4 for the specific interface"""
112
epair_ifname = self.vnet.iface_alias_map["if1"].name
113
114
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
115
msg.set_request()
116
msg.base_hdr.ifa_family = family
117
msg.base_hdr.ifa_index = socket.if_nametoindex(epair_ifname)
118
self.write_message(msg)
119
120
ret = []
121
for rx_msg in self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR):
122
assert family == rx_msg.base_hdr.ifa_family
123
assert epair_ifname == socket.if_indextoname(rx_msg.base_hdr.ifa_index)
124
ret.append(rx_msg)
125
assert len(ret) == num_items
126
return ret
127
128
def test_4_broadcast(self):
129
"""Tests header/attr output for listing IPv4 ifas on broadcast iface"""
130
ret = self.filter_iface_family(socket.AF_INET, 1)
131
# Should be 192.0.2.1/24
132
msg = ret[0]
133
# Family and ifindex has been checked already
134
assert msg.base_hdr.ifa_prefixlen == 24
135
# Ignore IFA_FLAGS for now
136
assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
137
138
assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "192.0.2.1"
139
assert msg.get_nla(IfaAttrType.IFA_LOCAL).addr == "192.0.2.1"
140
assert msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == "192.0.2.255"
141
142
epair_ifname = self.vnet.iface_alias_map["if1"].name
143
assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
144
145
def test_6_broadcast(self):
146
"""Tests header/attr output for listing IPv6 ifas on broadcast iface"""
147
ret = self.filter_iface_family(socket.AF_INET6, 2)
148
# Should be 192.0.2.1/24
149
if ret[0].base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value:
150
(gmsg, lmsg) = ret
151
else:
152
(lmsg, gmsg) = ret
153
# Start with global ( 2001:db8::1/64 )
154
msg = gmsg
155
# Family and ifindex has been checked already
156
assert msg.base_hdr.ifa_prefixlen == 64
157
# Ignore IFA_FLAGS for now
158
assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
159
160
assert msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == "2001:db8::1"
161
assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
162
assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None
163
164
epair_ifname = self.vnet.iface_alias_map["if1"].name
165
assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
166
167
# Local: fe80::/64
168
msg = lmsg
169
assert msg.base_hdr.ifa_prefixlen == 64
170
# Ignore IFA_FLAGS for now
171
assert msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_LINK.value
172
173
addr = ipaddress.ip_address(msg.get_nla(IfaAttrType.IFA_ADDRESS).addr)
174
assert addr.is_link_local
175
# Verify that ifindex is not emmbedded
176
assert struct.unpack("!H", addr.packed[2:4])[0] == 0
177
assert msg.get_nla(IfaAttrType.IFA_LOCAL) is None
178
assert msg.get_nla(IfaAttrType.IFA_BROADCAST) is None
179
180
epair_ifname = self.vnet.iface_alias_map["if1"].name
181
assert msg.get_nla(IfaAttrType.IFA_LABEL).text == epair_ifname
182
183
184
class RtnlIfaOps(NetlinkTestTemplate, SingleVnetTestTemplate):
185
def setup_method(self, method):
186
super().setup_method(method)
187
self.setup_netlink(NlConst.NETLINK_ROUTE)
188
189
def send_check_success(self, msg):
190
rx_msg = self.get_reply(msg)
191
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
192
assert rx_msg.error_code == 0
193
194
@staticmethod
195
def get_family_from_ip(ip):
196
if ip.version == 4:
197
return socket.AF_INET
198
return socket.AF_INET6
199
200
def create_msg(self, ifa):
201
iface = self.vnet.iface_alias_map["if1"]
202
203
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
204
msg.set_request()
205
msg.nl_hdr.nlmsg_flags |= (
206
NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
207
)
208
msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
209
msg.base_hdr.ifa_index = iface.ifindex
210
msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
211
return msg
212
213
def get_ifa_list(self, ifindex=0, family=0):
214
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value)
215
msg.set_request()
216
msg.base_hdr.ifa_family = family
217
msg.base_hdr.ifa_index = ifindex
218
self.write_message(msg)
219
return self.read_msg_list(msg.nl_hdr.nlmsg_seq, NlRtMsgType.RTM_NEWADDR)
220
221
def find_msg_by_ifa(self, msg_list, ip):
222
for msg in msg_list:
223
if msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ip):
224
return msg
225
return None
226
227
def setup_dummy_carp(self, ifindex: int, vhid: int):
228
self.require_module("carp")
229
230
nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
231
family_id = nlsock.get_genl_family_id("carp")
232
233
msg = CarpGenMessage(self.helper, family_id, CarpMsgType.CARP_NL_CMD_SET)
234
msg.set_request()
235
msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_VHID, vhid))
236
msg.add_nla(NlAttrU32(CarpAttrType.CARP_NL_IFINDEX, ifindex))
237
rx_msg = nlsock.get_reply(msg)
238
239
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
240
assert rx_msg.error_code == 0
241
242
243
class TestRtNlIfaddrOpsBroadcast(RtnlIfaOps):
244
def test_add_4(self):
245
"""Tests IPv4 address addition to the standard broadcast interface"""
246
ifa = ipaddress.ip_interface("192.0.2.1/24")
247
ifa_brd = ifa.network.broadcast_address
248
iface = self.vnet.iface_alias_map["if1"]
249
250
msg = self.create_msg(ifa)
251
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
252
msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
253
254
self.send_check_success(msg)
255
256
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
257
assert len(lst) == 1
258
rx_msg = lst[0]
259
260
assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
261
assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
262
263
assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
264
assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
265
assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
266
267
@pytest.mark.parametrize(
268
"brd",
269
[
270
pytest.param((32, True, "192.0.2.1"), id="auto_32"),
271
pytest.param((31, True, "255.255.255.255"), id="auto_31"),
272
pytest.param((30, True, "192.0.2.3"), id="auto_30"),
273
pytest.param((30, False, "192.0.2.2"), id="custom_30"),
274
pytest.param((24, False, "192.0.2.7"), id="custom_24"),
275
],
276
)
277
def test_add_4_brd(self, brd):
278
"""Tests proper broadcast setup when adding IPv4 ifa"""
279
plen, auto_brd, ifa_brd_str = brd
280
ifa = ipaddress.ip_interface("192.0.2.1/{}".format(plen))
281
iface = self.vnet.iface_alias_map["if1"]
282
ifa_brd = ipaddress.ip_address(ifa_brd_str)
283
284
msg = self.create_msg(ifa)
285
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
286
if not auto_brd:
287
msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
288
289
self.send_check_success(msg)
290
291
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
292
assert len(lst) == 1
293
rx_msg = lst[0]
294
295
assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
296
assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
297
298
assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
299
assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
300
assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
301
302
def test_add_6(self):
303
ifa = ipaddress.ip_interface("2001:db8::1/64")
304
iface = self.vnet.iface_alias_map["if1"]
305
306
msg = self.create_msg(ifa)
307
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
308
309
self.send_check_success(msg)
310
311
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
312
assert len(lst) == 2
313
rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
314
assert rx_msg_gu is not None
315
316
assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
317
assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
318
assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
319
320
def test_add_4_carp(self):
321
ifa = ipaddress.ip_interface("192.0.2.1/24")
322
ifa_brd = ifa.network.broadcast_address
323
iface = self.vnet.iface_alias_map["if1"]
324
vhid = 77
325
326
self.setup_dummy_carp(iface.ifindex, vhid)
327
328
msg = self.create_msg(ifa)
329
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
330
msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
331
attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
332
msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
333
334
self.send_check_success(msg)
335
336
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
337
assert len(lst) == 1
338
rx_msg = lst[0]
339
340
assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
341
assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
342
343
assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
344
assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
345
assert rx_msg.get_nla(IfaAttrType.IFA_BROADCAST).addr == str(ifa_brd)
346
ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
347
assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid
348
349
def test_add_6_carp(self):
350
ifa = ipaddress.ip_interface("2001:db8::1/64")
351
iface = self.vnet.iface_alias_map["if1"]
352
vhid = 77
353
354
self.setup_dummy_carp(iface.ifindex, vhid)
355
356
msg = self.create_msg(ifa)
357
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
358
attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_VHID, vhid)]
359
msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
360
361
self.send_check_success(msg)
362
363
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
364
assert len(lst) == 2
365
rx_msg_gu = self.find_msg_by_ifa(lst, ifa.ip)
366
assert rx_msg_gu is not None
367
368
assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
369
assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
370
assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
371
ifa_bsd = rx_msg_gu.get_nla(IfaAttrType.IFA_FREEBSD)
372
assert ifa_bsd.get_nla(IfafAttrType.IFAF_VHID).u32 == vhid
373
374
def test_add_6_lifetime(self):
375
ifa = ipaddress.ip_interface("2001:db8::1/64")
376
iface = self.vnet.iface_alias_map["if1"]
377
pref_time = 43200
378
valid_time = 86400
379
380
ci = IfaCacheInfo(ifa_prefered=pref_time, ifa_valid=valid_time)
381
382
msg = self.create_msg(ifa)
383
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
384
msg.add_nla(NlAttr(IfaAttrType.IFA_CACHEINFO, bytes(ci)))
385
386
self.send_check_success(msg)
387
388
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
389
assert len(lst) == 2
390
rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
391
assert rx_msg is not None
392
393
ci = rx_msg.get_nla(IfaAttrType.IFA_CACHEINFO).ci
394
assert pref_time - 5 <= ci.ifa_prefered <= pref_time
395
assert valid_time - 5 <= ci.ifa_valid <= valid_time
396
assert ci.cstamp > 0
397
assert ci.tstamp > 0
398
assert ci.tstamp >= ci.cstamp
399
400
@pytest.mark.parametrize(
401
"flags_str",
402
[
403
"autoconf",
404
"deprecated",
405
"autoconf,deprecated",
406
"prefer_source",
407
],
408
)
409
def test_add_6_flags(self, flags_str):
410
ifa = ipaddress.ip_interface("2001:db8::1/64")
411
iface = self.vnet.iface_alias_map["if1"]
412
413
flags_map = {
414
"autoconf": {"nl": 0, "f": IfafFlags6.IN6_IFF_AUTOCONF},
415
"deprecated": {
416
"nl": IfaFlags.IFA_F_DEPRECATED,
417
"f": IfafFlags6.IN6_IFF_DEPRECATED,
418
},
419
"prefer_source": {"nl": 0, "f": IfafFlags6.IN6_IFF_PREFER_SOURCE},
420
}
421
nl_flags = 0
422
f_flags = 0
423
424
for flag_str in flags_str.split(","):
425
d = flags_map.get(flag_str, {})
426
nl_flags |= enum_or_int(d.get("nl", 0))
427
f_flags |= enum_or_int(d.get("f", 0))
428
429
msg = self.create_msg(ifa)
430
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
431
msg.add_nla(NlAttrU32(IfaAttrType.IFA_FLAGS, nl_flags))
432
attrs_bsd = [NlAttrU32(IfafAttrType.IFAF_FLAGS, f_flags)]
433
msg.add_nla(NlAttrNested(IfaAttrType.IFA_FREEBSD, attrs_bsd))
434
435
self.send_check_success(msg)
436
437
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
438
assert len(lst) == 2
439
rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
440
assert rx_msg is not None
441
442
assert rx_msg.get_nla(IfaAttrType.IFA_FLAGS).u32 == nl_flags
443
ifa_bsd = rx_msg.get_nla(IfaAttrType.IFA_FREEBSD)
444
assert ifa_bsd.get_nla(IfafAttrType.IFAF_FLAGS).u32 == f_flags
445
446
def test_add_4_empty_message(self):
447
"""Tests correct failure w/ empty message"""
448
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_NEWADDR.value)
449
msg.set_request()
450
msg.nl_hdr.nlmsg_flags |= (
451
NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value
452
)
453
454
rx_msg = self.get_reply(msg)
455
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
456
assert rx_msg.error_code != 0
457
458
def test_add_4_empty_ifindex(self):
459
"""Tests correct failure w/ empty ifindex"""
460
ifa = ipaddress.ip_interface("192.0.2.1/24")
461
ifa_brd = ifa.network.broadcast_address
462
463
msg = self.create_msg(ifa)
464
msg.base_hdr.ifa_index = 0
465
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
466
msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
467
468
rx_msg = self.get_reply(msg)
469
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
470
assert rx_msg.error_code != 0
471
472
def test_add_4_empty_addr(self):
473
"""Tests correct failure w/ empty address"""
474
ifa = ipaddress.ip_interface("192.0.2.1/24")
475
ifa_brd = ifa.network.broadcast_address
476
477
msg = self.create_msg(ifa)
478
msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
479
480
rx_msg = self.get_reply(msg)
481
assert rx_msg.is_type(NlMsgType.NLMSG_ERROR)
482
assert rx_msg.error_code != 0
483
484
@pytest.mark.parametrize(
485
"ifa_str",
486
[
487
pytest.param("192.0.2.1/32", id="ipv4_host"),
488
pytest.param("192.0.2.1/24", id="ipv4_prefix"),
489
pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
490
pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
491
],
492
)
493
@pytest.mark.parametrize(
494
"tlv",
495
[
496
pytest.param("local", id="ifa_local"),
497
pytest.param("address", id="ifa_address"),
498
],
499
)
500
def test_del(self, tlv, ifa_str):
501
"""Tests address deletion from the standard broadcast interface"""
502
ifa = ipaddress.ip_interface(ifa_str)
503
ifa_brd = ifa.network.broadcast_address
504
iface = self.vnet.iface_alias_map["if1"]
505
506
msg = self.create_msg(ifa)
507
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
508
msg.add_nla(NlAttrIp(IfaAttrType.IFA_BROADCAST, str(ifa_brd)))
509
510
self.send_check_success(msg)
511
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
512
rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
513
assert rx_msg is not None
514
515
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
516
msg.set_request()
517
msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
518
msg.base_hdr.ifa_index = iface.ifindex
519
msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
520
521
if tlv == "local":
522
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
523
if tlv == "address":
524
msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
525
526
self.send_check_success(msg)
527
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
528
rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
529
assert rx_msg is None
530
531
532
class TestRtNlIfaddrOpsP2p(RtnlIfaOps):
533
IFTYPE = "gif"
534
535
@pytest.mark.parametrize(
536
"ifa_pair",
537
[
538
pytest.param(["192.0.2.1/24", "192.0.2.2"], id="dst_inside_24"),
539
pytest.param(["192.0.2.1/30", "192.0.2.2"], id="dst_inside_30"),
540
pytest.param(["192.0.2.1/31", "192.0.2.2"], id="dst_inside_31"),
541
pytest.param(["192.0.2.1/32", "192.0.2.2"], id="dst_outside_32"),
542
pytest.param(["192.0.2.1/30", "192.0.2.100"], id="dst_outside_30"),
543
],
544
)
545
def test_add_4(self, ifa_pair):
546
"""Tests IPv4 address addition to the p2p interface"""
547
ifa = ipaddress.ip_interface(ifa_pair[0])
548
peer_ip = ipaddress.ip_address(ifa_pair[1])
549
iface = self.vnet.iface_alias_map["if1"]
550
551
msg = self.create_msg(ifa)
552
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
553
msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
554
555
self.send_check_success(msg)
556
557
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
558
assert len(lst) == 1
559
rx_msg = lst[0]
560
561
assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
562
assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
563
564
assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
565
assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)
566
567
@pytest.mark.parametrize(
568
"ifa_pair",
569
[
570
pytest.param(
571
["2001:db8::1/64", "2001:db8::2"],
572
id="dst_inside_64",
573
marks=pytest.mark.xfail(reason="currently fails"),
574
),
575
pytest.param(
576
["2001:db8::1/127", "2001:db8::2"],
577
id="dst_inside_127",
578
marks=pytest.mark.xfail(reason="currently fails"),
579
),
580
pytest.param(["2001:db8::1/128", "2001:db8::2"], id="dst_outside_128"),
581
pytest.param(
582
["2001:db8::1/64", "2001:db8:2::2"],
583
id="dst_outside_64",
584
marks=pytest.mark.xfail(reason="currently fails"),
585
),
586
],
587
)
588
def test_add_6(self, ifa_pair):
589
"""Tests IPv6 address addition to the p2p interface"""
590
ifa = ipaddress.ip_interface(ifa_pair[0])
591
peer_ip = ipaddress.ip_address(ifa_pair[1])
592
iface = self.vnet.iface_alias_map["if1"]
593
594
msg = self.create_msg(ifa)
595
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
596
msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
597
598
self.send_check_success(msg)
599
600
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
601
assert len(lst) == 2
602
rx_msg_gu = self.find_msg_by_ifa(lst, peer_ip)
603
assert rx_msg_gu is not None
604
605
assert rx_msg_gu.base_hdr.ifa_prefixlen == ifa.network.prefixlen
606
assert rx_msg_gu.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
607
assert rx_msg_gu.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
608
assert rx_msg_gu.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(peer_ip)
609
610
@pytest.mark.parametrize(
611
"ifa_pair",
612
[
613
pytest.param(["192.0.2.1/30", "192.0.2.2"], id="ipv4_dst_inside_30"),
614
pytest.param(["192.0.2.1/32", "192.0.2.2"], id="ipv4_dst_outside_32"),
615
pytest.param(["2001:db8::1/128", "2001:db8::2"], id="ip6_dst_outside_128"),
616
],
617
)
618
@pytest.mark.parametrize(
619
"tlv_pair",
620
[
621
pytest.param(["a", ""], id="ifa_addr=addr"),
622
pytest.param(["", "a"], id="ifa_local=addr"),
623
pytest.param(["a", "a"], id="ifa_addr=addr,ifa_local=addr"),
624
],
625
)
626
def test_del(self, tlv_pair, ifa_pair):
627
"""Tests address deletion from the P2P interface"""
628
ifa = ipaddress.ip_interface(ifa_pair[0])
629
peer_ip = ipaddress.ip_address(ifa_pair[1])
630
iface = self.vnet.iface_alias_map["if1"]
631
ifa_addr_str, ifa_local_str = tlv_pair
632
633
msg = self.create_msg(ifa)
634
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
635
msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
636
637
self.send_check_success(msg)
638
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
639
rx_msg = self.find_msg_by_ifa(lst, peer_ip)
640
assert rx_msg is not None
641
642
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
643
msg.set_request()
644
msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
645
msg.base_hdr.ifa_index = iface.ifindex
646
msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
647
648
if "a" in ifa_addr_str:
649
msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
650
if "p" in ifa_addr_str:
651
msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(peer_ip)))
652
if "a" in ifa_local_str:
653
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
654
if "p" in ifa_local_str:
655
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(peer_ip)))
656
657
self.send_check_success(msg)
658
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
659
rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
660
assert rx_msg is None
661
662
663
class TestRtNlAddIfaddrLo(RtnlIfaOps):
664
IFTYPE = "lo"
665
666
@pytest.mark.parametrize(
667
"ifa_str",
668
[
669
pytest.param("192.0.2.1/24", id="prefix"),
670
pytest.param("192.0.2.1/32", id="host"),
671
],
672
)
673
def test_add_4(self, ifa_str):
674
"""Tests IPv4 address addition to the loopback interface"""
675
ifa = ipaddress.ip_interface(ifa_str)
676
iface = self.vnet.iface_alias_map["if1"]
677
678
msg = self.create_msg(ifa)
679
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
680
681
self.send_check_success(msg)
682
683
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
684
assert len(lst) == 1
685
rx_msg = lst[0]
686
687
assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
688
assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
689
690
assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
691
assert rx_msg.get_nla(IfaAttrType.IFA_LOCAL).addr == str(ifa.ip)
692
693
@pytest.mark.parametrize(
694
"ifa_str",
695
[
696
pytest.param("2001:db8::1/64", id="gu_prefix"),
697
pytest.param("2001:db8::1/128", id="gu_host"),
698
],
699
)
700
def test_add_6(self, ifa_str):
701
"""Tests IPv6 address addition to the loopback interface"""
702
ifa = ipaddress.ip_interface(ifa_str)
703
iface = self.vnet.iface_alias_map["if1"]
704
705
msg = self.create_msg(ifa)
706
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
707
708
self.send_check_success(msg)
709
710
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
711
assert len(lst) == 2 # link-local should be auto-created as well
712
rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
713
assert rx_msg is not None
714
715
assert rx_msg.base_hdr.ifa_prefixlen == ifa.network.prefixlen
716
assert rx_msg.base_hdr.ifa_scope == RtScope.RT_SCOPE_UNIVERSE.value
717
assert rx_msg.get_nla(IfaAttrType.IFA_ADDRESS).addr == str(ifa.ip)
718
719
@pytest.mark.parametrize(
720
"ifa_str",
721
[
722
pytest.param("192.0.2.1/32", id="ipv4_host"),
723
pytest.param("192.0.2.1/24", id="ipv4_prefix"),
724
pytest.param("2001:db8::1/64", id="ipv6_gu_prefix"),
725
pytest.param("2001:db8::1/128", id="ipv6_gu_host"),
726
],
727
)
728
@pytest.mark.parametrize(
729
"tlv",
730
[
731
pytest.param("local", id="ifa_local"),
732
pytest.param("address", id="ifa_address"),
733
],
734
)
735
def test_del(self, tlv, ifa_str):
736
"""Tests address deletion from the loopback interface"""
737
ifa = ipaddress.ip_interface(ifa_str)
738
iface = self.vnet.iface_alias_map["if1"]
739
740
msg = self.create_msg(ifa)
741
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
742
743
self.send_check_success(msg)
744
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
745
rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
746
assert rx_msg is not None
747
748
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_DELADDR.value)
749
msg.set_request()
750
msg.base_hdr.ifa_family = self.get_family_from_ip(ifa.ip)
751
msg.base_hdr.ifa_index = iface.ifindex
752
msg.base_hdr.ifa_prefixlen = ifa.network.prefixlen
753
754
if tlv == "local":
755
msg.add_nla(NlAttrIp(IfaAttrType.IFA_LOCAL, str(ifa.ip)))
756
if tlv == "address":
757
msg.add_nla(NlAttrIp(IfaAttrType.IFA_ADDRESS, str(ifa.ip)))
758
759
self.send_check_success(msg)
760
lst = self.get_ifa_list(iface.ifindex, self.get_family_from_ip(ifa.ip))
761
rx_msg = self.find_msg_by_ifa(lst, ifa.ip)
762
assert rx_msg is None
763
764