Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netinet6/test_ip6_output.py
104924 views
1
import errno
2
import ipaddress
3
import socket
4
import struct
5
import time
6
from ctypes import c_byte
7
from ctypes import c_uint
8
from ctypes import Structure
9
10
import pytest
11
from atf_python.sys.net.rtsock import SaHelper
12
from atf_python.sys.net.tools import ToolsHelper
13
from atf_python.sys.net.vnet import SingleVnetTestTemplate
14
from atf_python.sys.net.vnet import VnetTestTemplate
15
16
17
class In6Pktinfo(Structure):
    """ctypes layout of the IPv6 packet-info record used with IPV6_PKTINFO.

    The structure holds a 16-byte IPv6 address followed by an unsigned
    interface index.  It is used both to build the option value passed to
    setsockopt() and to decode the ancillary data returned by recvmsg().
    """

    _fields_ = [("ipi6_addr", c_byte * 16), ("ipi6_ifindex", c_uint)]
22
23
24
class VerboseSocketServer:
    """UDP/IPv6 listener that reports where each received datagram arrived.

    The socket is created with IPV6_RECVPKTINFO enabled so that recv() can
    return the destination address and the receiving interface alongside the
    payload and the sender address.
    """

    def __init__(self, ip: str, port: int, ifname: str = None):
        """Create the socket, optionally join a multicast group, and bind it.

        Args:
            ip: address to listen on, or the multicast group to join.
            port: UDP port to bind.
            ifname: OS interface name.  When given together with a
                link-local ``ip`` it supplies the bind scope id; together
                with a multicast ``ip`` it is the interface the group is
                joined on (the socket is then bound to ``::``).

        Raises:
            OSError: if a socket option cannot be set or bind() fails.
            ValueError: if ``ip`` is not a valid IP address.
        """
        self.ip = ip
        self.port = port

        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # Ask the kernel to attach packet-info ancillary data to each
            # received datagram; recv() depends on it.
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            addr = ipaddress.ip_address(ip)
            if addr.is_link_local and ifname:
                ifindex = socket.if_nametoindex(ifname)
                addr_tuple = (ip, port, 0, ifindex)
            elif addr.is_multicast and ifname:
                ifindex = socket.if_nametoindex(ifname)
                # Membership request: 16-byte group address + interface index
                mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack(
                    "I", ifindex
                )
                s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
                print("## JOINED group {} % {}".format(ip, ifname))
                addr_tuple = ("::", port, 0, ifindex)
            else:
                addr_tuple = (ip, port, 0, 0)
            print("## Listening on [{}]:{}".format(addr_tuple[0], port))
            s.bind(addr_tuple)
        except Exception:
            # Do not leak the descriptor when setup fails half-way
            s.close()
            raise
        self.socket = s

    def close(self):
        """Close the underlying socket."""
        self.socket.close()

    def recv(self):
        """Block until one datagram arrives and describe it.

        Returns:
            dict with keys ``data`` (payload bytes), ``src_ip`` (sender
            address string), ``dst_ip`` (destination address taken from the
            packet-info ancillary data) and ``dst_iface`` (name of the
            interface the datagram was received on).

        Raises:
            RuntimeError: if the datagram carries no IPV6_PKTINFO ancillary
                data.
        """
        data, ancdata, _msg_flags, address = self.socket.recvmsg(4096, 128)

        # Pick the packet-info control message by level/type instead of
        # assuming it is the first (and only) ancillary item.
        pktinfo_data = next(
            (
                cmsg_data
                for cmsg_level, cmsg_type, cmsg_data in ancdata
                if cmsg_level == socket.IPPROTO_IPV6
                and cmsg_type == socket.IPV6_PKTINFO
            ),
            None,
        )
        if pktinfo_data is None:
            raise RuntimeError("no IPV6_PKTINFO ancillary data received")

        info = In6Pktinfo.from_buffer_copy(pktinfo_data)
        dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
        dst_iface = socket.if_indextoname(info.ipi6_ifindex)

        tx_obj = {
            "data": data,
            "src_ip": address[0],
            "dst_ip": dst_ip,
            "dst_iface": dst_iface,
        }
        return tx_obj
63
64
65
class BaseTestIP6Ouput(VnetTestTemplate):
    """Shared topology and listener logic for the IPv6 output tests.

    Two vnets are connected by three interfaces (if1..if3).  Each interface
    carries one pair of /64 addresses; the tests below treat the second
    address of a pair as the vnet2 end.
    """

    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2", "if3"]},
        "vnet2": {"ifaces": ["if1", "if2", "if3"]},
        "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
        "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
        "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
    }
    DEFAULT_PORT = 45365

    def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None):
        """Generic listener that sends the first received packet, with its
        metadata, back to the sender via the pipe.

        The link-local address map is sent first; the parent side uses its
        arrival as the "listener is ready" signal.
        """
        linklocals = ToolsHelper.get_linklocals()
        # Start listener
        listener = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)
        vnet.pipe.send(linklocals)

        received = listener.recv()
        # Translate the OS interface name into the topology alias (if1, ...)
        rx_ifname = received["dst_iface"]
        received["dst_iface_alias"] = vnet.iface_map[rx_ifname].alias
        vnet.pipe.send(received)
87
88
89
class TestIP6Output(BaseTestIP6Ouput):
    """UDP output tests against a listener bound to vnet2's if2 address."""

    def vnet2_handler(self, vnet):
        """Run the generic listener on the first IPv6 address of vnet2's if2."""
        ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
        self._vnet2_handler(vnet, ip, None)

    @pytest.mark.require_user("root")
    def test_output6_base(self):
        """Tests simple UDP output"""
        second_vnet = self.vnet_map["vnet2"]

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        ip = str(ifaddr.ip)

        data = bytes("AAAA", "utf-8")
        print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))

        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            # Wait for the child to become ready
            self.wait_object(second_vnet.pipe)
            s.sendto(data, (ip, self.DEFAULT_PORT))

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip
        assert rx_obj["dst_iface_alias"] == "if2"

    @pytest.mark.require_user("root")
    def test_output6_nhop(self):
        """Tests UDP output with custom nhop set

        The destination is vnet2's if2 address while IPV6_NEXTHOP points at
        vnet2's if1 address, so the packet is expected to arrive via if1.
        """
        second_vnet = self.vnet_map["vnet2"]

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        ip_dst = str(ifaddr.ip)
        # Pick nexthop on if1
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])
        ip_next = str(ifaddr.ip)
        sin6_next = SaHelper.ip6_sa(ip_next, 0)

        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)

            # Wait for the child to become ready
            self.wait_object(second_vnet.pipe)
            data = bytes("AAAA", "utf-8")
            s.sendto(data, (ip_dst, self.DEFAULT_PORT))

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == ip_dst
        assert rx_obj["dst_iface_alias"] == "if1"

    @pytest.mark.parametrize(
        "params",
        [
            # src: src-ip, if: src-interface, esrc: expected-src,
            # eif: expected-rx-interface,
            # ex: OSError errno that setsockopt()/sendto() must raise
            pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
            pytest.param(
                {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
                id="iponly1",
            ),
            pytest.param(
                {
                    "src": "2001:db8:c::1",
                    "if": "if3",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ipandif",
            ),
            pytest.param(
                {
                    "src": "2001:db8:c::aaaa",
                    "ex": errno.EADDRNOTAVAIL,
                },
                id="nolocalip",
            ),
            pytest.param(
                {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
            ),
        ],
    )
    @pytest.mark.require_user("root")
    def test_output6_pktinfo(self, params):
        """Tests UDP output with IPV6_PKTINFO set on the sending socket.

        Depending on ``params`` the option carries a source address, a source
        interface, both, or neither.  A case either expects delivery (checked
        through ``esrc`` / ``eif``) or expects setsockopt()/sendto() to fail
        with the errno given in ``ex``.
        """
        second_vnet = self.vnet_map["vnet2"]
        vnet = self.vnet

        # Pick target on if2 vnet2's end
        ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
        dst_ip = str(ifaddr.ip)

        src_ip = params.get("src", "")
        src_ifname = params.get("if", "")
        expected_ip = params.get("esrc", "")
        expected_ifname = params.get("eif", "")
        # Named so that it does not shadow the errno module
        expected_errno = params.get("ex", 0)

        pktinfo = In6Pktinfo()
        if src_ip:
            for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
                pktinfo.ipi6_addr[i] = b
        if src_ifname:
            os_ifname = vnet.iface_alias_map[src_ifname].name
            pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)

        # Wait for the child to become ready
        self.wait_object(second_vnet.pipe)
        data = bytes("AAAA", "utf-8")

        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
            try:
                s.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo)
                )
                s.sendto(data, (dst_ip, self.DEFAULT_PORT))
            except OSError as e:
                if not expected_errno:
                    raise
                assert e.errno == expected_errno
                print("Correctly raised {}".format(e))
                return

        # Reaching this point means nothing was raised: a case that expects
        # an error must fail here instead of passing on a delivered packet.
        assert not expected_errno, "expected OSError errno {} was not raised".format(
            expected_errno
        )

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)

        assert rx_obj["dst_ip"] == dst_ip
        if expected_ip:
            assert rx_obj["src_ip"] == expected_ip
        if expected_ifname:
            assert rx_obj["dst_iface_alias"] == expected_ifname
220
221
222
class TestIP6OutputLL(BaseTestIP6Ouput):
    """UDP output towards a link-local address of the peer vnet."""

    def vnet2_handler(self, vnet):
        """Listen in vnet2 on the first link-local address of if2, scoped to
        that interface, and report the first received packet via the pipe.
        """
        os_ifname = vnet.iface_alias_map["if2"].name
        linklocals = ToolsHelper.get_linklocals()
        listen_ip, _ = linklocals[os_ifname][0]
        self._vnet2_handler(vnet, listen_ip, os_ifname)

    @pytest.mark.require_user("root")
    def test_output6_linklocal(self):
        """Tests simple UDP output to a scoped link-local destination"""
        peer = self.vnet_map["vnet2"]

        # The readiness message from the child carries its link-local map
        peer_linklocals = self.wait_object(peer.pipe)

        # Link-local address configured on the peer's end of if2
        peer_ifname = peer.iface_alias_map["if2"].name
        ip, _ = peer_linklocals[peer_ifname][0]

        # The scope id must refer to the local end of the same link
        local_ifname = self.vnet.iface_alias_map["if2"].name
        scopeid = socket.if_nametoindex(local_ifname)

        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        payload = b"AAAA"
        target = (ip, self.DEFAULT_PORT, 0, scopeid)
        print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))

        sock.sendto(payload, target)

        # The child reports what it received
        reply = self.wait_object(peer.pipe)
        assert reply["dst_ip"] == ip
        assert reply["dst_iface_alias"] == "if2"
257
258
259
class TestIP6OutputNhopLL(BaseTestIP6Ouput):
    """UDP output with a link-local IPV6_NEXTHOP."""

    def vnet2_handler(self, vnet):
        """Listen in vnet2 on the first IPv6 address of if2 and report the
        first received packet via the pipe.
        """
        listen_ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
        self._vnet2_handler(vnet, listen_ip, None)

    @pytest.mark.require_user("root")
    def test_output6_nhop_linklocal(self):
        """Tests UDP output with custom link-local nhop set"""
        peer = self.vnet_map["vnet2"]

        # The readiness message from the child carries its link-local map
        peer_linklocals = self.wait_object(peer.pipe)

        # Destination: vnet2's address on if2
        _, peer_if2_prefix = self.TOPOLOGY["if2"]["prefixes6"][0]
        ip_dst = str(ipaddress.ip_interface(peer_if2_prefix).ip)

        # Next hop: vnet2's link-local address on if1 ...
        peer_if1_name = peer.iface_alias_map["if1"].name
        ip_next, _ = peer_linklocals[peer_if1_name][0]
        # ... scoped to the local end of if1
        local_if1_name = self.vnet.iface_alias_map["if1"].name
        scopeid = socket.if_nametoindex(local_if1_name)
        sin6_next = SaHelper.ip6_sa(ip_next, scopeid)

        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)

        payload = b"AAAA"
        sock.sendto(payload, (ip_dst, self.DEFAULT_PORT))

        # The packet must reach the if2 address, but through if1
        reply = self.wait_object(peer.pipe)
        assert reply["dst_ip"] == ip_dst
        assert reply["dst_iface_alias"] == "if1"
295
296
297
class TestIP6OutputScope(BaseTestIP6Ouput):
    """UDP output between combinations of link-local and global addresses."""

    def vnet2_handler(self, vnet):
        """Listen on the address/interface requested by the parent.

        The parent first sends a ``(bind_ip, bind_ifp)`` tuple over the pipe.
        A ``bind_ip`` of None means "use the first link-local address of
        ``bind_ifp``"; ``bind_ifp`` is an interface alias (or None) and is
        translated to the OS interface name before the generic listener is
        started.
        """
        bind_ip, bind_ifp = self.wait_object(vnet.pipe)
        if bind_ip is None:
            os_ifname = vnet.iface_alias_map[bind_ifp].name
            ll_data = ToolsHelper.get_linklocals()
            bind_ip, _ = ll_data[os_ifname][0]
        if bind_ifp is not None:
            bind_ifp = vnet.iface_alias_map[bind_ifp].name
        print("## BIND {}%{}".format(bind_ip, bind_ifp))
        self._vnet2_handler(vnet, bind_ip, bind_ifp)

    @pytest.mark.parametrize(
        "params",
        [
            # sif/dif: source/destination interface (for link-local addr)
            # sip/dip: source/destination ip (for non-LL addr)
            # ex: OSError errno that sendto() must raise
            pytest.param({"sif": "if2", "dif": "if2"}, id="same"),
            pytest.param(
                {
                    "sif": "if1",
                    "dif": "if2",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ll_differentif1",
            ),
            pytest.param(
                {
                    "sif": "if1",
                    "dip": "2001:db8:b::2",
                    "ex": errno.EHOSTUNREACH,
                },
                id="ll_differentif2",
            ),
            pytest.param(
                {
                    "sip": "2001:db8:a::1",
                    "dif": "if2",
                },
                id="gu_to_ll",
            ),
        ],
    )
    @pytest.mark.require_user("root")
    def test_output6_linklocal_scope(self, params):
        """Tests UDP output from a bound source towards a scoped destination.

        The sending socket is bound either to the link-local address of
        ``sif`` or to the address ``sip``; the destination is either the
        peer's link-local address on ``dif`` or the address ``dip``.  A case
        either expects delivery or expects sendto() to fail with ``ex``.
        """
        second_vnet = self.vnet_map["vnet2"]

        src_ifp = params.get("sif")
        src_ip = params.get("sip")
        dst_ifp = params.get("dif")
        dst_ip = params.get("dip")
        # Named so that it does not shadow the errno module
        expected_errno = params.get("ex", 0)

        # Send ifp/IP to bind on
        second_vnet.pipe.send((dst_ip, dst_ifp))

        # Wait for the child to become ready
        peer_ll_data = self.wait_object(second_vnet.pipe)

        if dst_ip is None:
            # Pick LL address on dst_ifp vnet2's end
            dst_ip, _ = peer_ll_data[second_vnet.iface_alias_map[dst_ifp].name][0]
            # Get local interface scope
            os_ifname = self.vnet.iface_alias_map[dst_ifp].name
            scopeid = socket.if_nametoindex(os_ifname)
            target = (dst_ip, self.DEFAULT_PORT, 0, scopeid)
        else:
            target = (dst_ip, self.DEFAULT_PORT, 0, 0)

        # Bind
        if src_ip is None:
            local_ll_data = ToolsHelper.get_linklocals()
            os_ifname = self.vnet.iface_alias_map[src_ifp].name
            src_ip, _ = local_ll_data[os_ifname][0]
            scopeid = socket.if_nametoindex(os_ifname)
            src = (src_ip, self.DEFAULT_PORT, 0, scopeid)
        else:
            src = (src_ip, self.DEFAULT_PORT, 0, 0)

        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as s:
            s.bind(src)
            data = bytes("AAAA", "utf-8")
            print("## TX packet {} -> {}".format(src, target))

            try:
                s.sendto(data, target)
            except OSError as e:
                if not expected_errno:
                    raise
                assert e.errno == expected_errno
                print("Correctly raised {}".format(e))
                return

        # Reaching this point means sendto() succeeded: a case that expects
        # an error must fail here instead of passing on a delivered packet.
        assert not expected_errno, "expected OSError errno {} was not raised".format(
            expected_errno
        )

        # Wait for the received object
        rx_obj = self.wait_object(second_vnet.pipe)
        assert rx_obj["dst_ip"] == dst_ip
        assert rx_obj["src_ip"] == src_ip
        # assert rx_obj["dst_iface_alias"] == "if2"
401
402
403
class TestIP6OutputMulticast(BaseTestIP6Ouput):
    """UDP output to multicast groups of several scopes."""

    def vnet2_handler(self, vnet):
        """Receive the group address from the parent, then run the generic
        listener for that group on vnet2's if2.
        """
        mcast_group = self.wait_object(vnet.pipe)
        listen_ifname = vnet.iface_alias_map["if2"].name
        self._vnet2_handler(vnet, mcast_group, listen_ifname)

    @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
    @pytest.mark.require_user("root")
    def test_output6_multicast(self, group_scope):
        """Tests UDP output to a multicast group via an explicit interface"""
        peer = self.vnet_map["vnet2"]

        # Tell the child which group to join
        mcast_group = "{}::3456".format(group_scope)
        peer.pipe.send(mcast_group)

        # Outgoing multicast interface: the local end of if2
        local_ifname = self.vnet.iface_alias_map["if2"].name
        out_if = struct.pack("I", socket.if_nametoindex(local_ifname))

        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, out_if)

        payload = b"AAAA"

        # Block until the child has joined the group and is listening
        self.wait_object(peer.pipe)

        print("## TX packet to {},{}".format(mcast_group, self.DEFAULT_PORT))
        sock.sendto(payload, (mcast_group, self.DEFAULT_PORT))

        # The child reports what it received
        reply = self.wait_object(peer.pipe)
        assert reply["dst_ip"] == mcast_group
        assert reply["dst_iface_alias"] == "if2"
439
440
441
class TestIP6OutputLoopback(SingleVnetTestTemplate):
    """Traffic from the host to one of its own IPv6 addresses.

    Every test runs for a global ("gu"), a link-local ("ll") and the loopback
    ("lo") address, with net.inet6.ip6.source_address_validation both
    disabled and enabled.
    """

    IPV6_PREFIXES = ["2001:db8:a::1/64"]
    DEFAULT_PORT = 45365

    def _pick_local_address(self, scope):
        """Return ``(ip, addr_tuple)`` for the local address of ``scope``.

        ``addr_tuple`` is the socket address used for both bind() and the
        client side; for the link-local case it carries the interface index
        as scope id.

        Raises:
            ValueError: if ``scope`` is not one of "gu", "ll", "lo".
        """
        if scope == "gu":
            ip = "2001:db8:a::1"
            return ip, (ip, self.DEFAULT_PORT)
        if scope == "ll":
            os_ifname = self.vnet.iface_alias_map["if1"].name
            ifindex = socket.if_nametoindex(os_ifname)
            ll_data = ToolsHelper.get_linklocals()
            ip, _ = ll_data[os_ifname][0]
            return ip, (ip, self.DEFAULT_PORT, 0, ifindex)
        if scope == "lo":
            ip = "::1"
            # Add a host route for ::1 through lo0 before using the address
            ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
            return ip, (ip, self.DEFAULT_PORT)
        raise ValueError("unknown scope: {}".format(scope))

    @pytest.mark.parametrize(
        "source_validation",
        [
            pytest.param(0, id="no_sav"),
            pytest.param(1, id="sav"),
        ],
    )
    @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
    def test_output6_self_tcp(self, scope, source_validation):
        """Tests IPv6 TCP connection to the local IPv6 address"""

        ToolsHelper.set_sysctl(
            "net.inet6.ip6.source_address_validation", source_validation
        )

        ip, addr_tuple = self._pick_local_address(scope)
        print("address: {}".format(addr_tuple))

        start = time.perf_counter()
        with socket.socket(
            socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP
        ) as ss:
            ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            ss.bind(addr_tuple)
            ss.listen()
            with socket.socket(
                socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP
            ) as s:
                s.settimeout(2.0)
                s.connect(addr_tuple)
                conn, from_addr = ss.accept()
                # Stop the clock before any cleanup work
                duration = time.perf_counter() - start
                conn.close()

        assert from_addr[0] == ip
        assert duration < 1.0

    @pytest.mark.parametrize(
        "source_validation",
        [
            pytest.param(0, id="no_sav"),
            pytest.param(1, id="sav"),
        ],
    )
    @pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
    def test_output6_self_udp(self, scope, source_validation):
        """Tests IPv6 UDP datagram delivery to the local IPv6 address

        A datagram socket is bound to the local address, a second datagram
        socket sends one packet to it, and the receiver checks the payload
        and that the source address equals the destination address.
        """

        ToolsHelper.set_sysctl(
            "net.inet6.ip6.source_address_validation", source_validation
        )

        ip, addr_tuple = self._pick_local_address(scope)
        print("address: {}".format(addr_tuple))

        data = bytes("AAAA", "utf-8")

        start = time.perf_counter()
        with socket.socket(
            socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
        ) as ss:
            ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            # Fail with a timeout instead of hanging if nothing is delivered
            ss.settimeout(2.0)
            ss.bind(addr_tuple)
            with socket.socket(
                socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP
            ) as s:
                s.settimeout(2.0)
                s.sendto(data, addr_tuple)
                rx_data, from_addr = ss.recvfrom(4096)
                duration = time.perf_counter() - start

        assert rx_data == data
        assert from_addr[0] == ip
        assert duration < 1.0
540
541