Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netinet6/test_ip6_output.py
39536 views
1
import errno
2
import ipaddress
3
import socket
4
import struct
5
import time
6
from ctypes import c_byte
7
from ctypes import c_uint
8
from ctypes import Structure
9
10
import pytest
11
from atf_python.sys.net.rtsock import SaHelper
12
from atf_python.sys.net.tools import ToolsHelper
13
from atf_python.sys.net.vnet import run_cmd
14
from atf_python.sys.net.vnet import SingleVnetTestTemplate
15
from atf_python.sys.net.vnet import VnetTestTemplate
16
17
18
class In6Pktinfo(Structure):
19
_fields_ = [
20
("ipi6_addr", c_byte * 16),
21
("ipi6_ifindex", c_uint),
22
]
23
24
25
class VerboseSocketServer:
26
def __init__(self, ip: str, port: int, ifname: str = None):
27
self.ip = ip
28
self.port = port
29
30
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
31
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
32
addr = ipaddress.ip_address(ip)
33
if addr.is_link_local and ifname:
34
ifindex = socket.if_nametoindex(ifname)
35
addr_tuple = (ip, port, 0, ifindex)
36
elif addr.is_multicast and ifname:
37
ifindex = socket.if_nametoindex(ifname)
38
mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack("I", ifindex)
39
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
40
print("## JOINED group {} % {}".format(ip, ifname))
41
addr_tuple = ("::", port, 0, ifindex)
42
else:
43
addr_tuple = (ip, port, 0, 0)
44
print("## Listening on [{}]:{}".format(addr_tuple[0], port))
45
s.bind(addr_tuple)
46
self.socket = s
47
48
def recv(self):
49
# data = self.socket.recv(4096)
50
# print("RX: " + data)
51
data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)
52
# Assume ancdata has just 1 item
53
info = In6Pktinfo.from_buffer_copy(ancdata[0][2])
54
dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
55
dst_iface = socket.if_indextoname(info.ipi6_ifindex)
56
57
tx_obj = {
58
"data": data,
59
"src_ip": address[0],
60
"dst_ip": dst_ip,
61
"dst_iface": dst_iface,
62
}
63
return tx_obj
64
65
66
class BaseTestIP6Ouput(VnetTestTemplate):
67
TOPOLOGY = {
68
"vnet1": {"ifaces": ["if1", "if2", "if3"]},
69
"vnet2": {"ifaces": ["if1", "if2", "if3"]},
70
"if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
71
"if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
72
"if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
73
}
74
DEFAULT_PORT = 45365
75
76
def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None):
77
"""Generic listener that sends first received packet with metadata
78
back to the sender via pipw
79
"""
80
ll_data = ToolsHelper.get_linklocals()
81
# Start listener
82
ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname)
83
vnet.pipe.send(ll_data)
84
85
tx_obj = ss.recv()
86
tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias
87
vnet.pipe.send(tx_obj)
88
89
90
class TestIP6Output(BaseTestIP6Ouput):
91
def vnet2_handler(self, vnet):
92
ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
93
self._vnet2_handler(vnet, ip, None)
94
95
@pytest.mark.require_user("root")
96
def test_output6_base(self):
97
"""Tests simple UDP output"""
98
second_vnet = self.vnet_map["vnet2"]
99
100
# Pick target on if2 vnet2's end
101
ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
102
ip = str(ifaddr.ip)
103
104
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
105
data = bytes("AAAA", "utf-8")
106
print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
107
108
# Wait for the child to become ready
109
self.wait_object(second_vnet.pipe)
110
s.sendto(data, (ip, self.DEFAULT_PORT))
111
112
# Wait for the received object
113
rx_obj = self.wait_object(second_vnet.pipe)
114
assert rx_obj["dst_ip"] == ip
115
assert rx_obj["dst_iface_alias"] == "if2"
116
117
@pytest.mark.require_user("root")
118
def test_output6_nhop(self):
119
"""Tests UDP output with custom nhop set"""
120
second_vnet = self.vnet_map["vnet2"]
121
122
# Pick target on if2 vnet2's end
123
ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
124
ip_dst = str(ifaddr.ip)
125
# Pick nexthop on if1
126
ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1])
127
ip_next = str(ifaddr.ip)
128
sin6_next = SaHelper.ip6_sa(ip_next, 0)
129
130
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
131
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)
132
133
# Wait for the child to become ready
134
self.wait_object(second_vnet.pipe)
135
data = bytes("AAAA", "utf-8")
136
s.sendto(data, (ip_dst, self.DEFAULT_PORT))
137
138
# Wait for the received object
139
rx_obj = self.wait_object(second_vnet.pipe)
140
assert rx_obj["dst_ip"] == ip_dst
141
assert rx_obj["dst_iface_alias"] == "if1"
142
143
@pytest.mark.parametrize(
144
"params",
145
[
146
# esrc: src-ip, if: src-interface, esrc: expected-src,
147
# eif: expected-rx-interface
148
pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
149
pytest.param(
150
{"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
151
id="iponly1",
152
),
153
pytest.param(
154
{
155
"src": "2001:db8:c::1",
156
"if": "if3",
157
"ex": errno.EHOSTUNREACH,
158
},
159
id="ipandif",
160
),
161
pytest.param(
162
{
163
"src": "2001:db8:c::aaaa",
164
"ex": errno.EADDRNOTAVAIL,
165
},
166
id="nolocalip",
167
),
168
pytest.param(
169
{"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
170
),
171
],
172
)
173
@pytest.mark.require_user("root")
174
def test_output6_pktinfo(self, params):
175
"""Tests simple UDP output"""
176
second_vnet = self.vnet_map["vnet2"]
177
vnet = self.vnet
178
179
# Pick target on if2 vnet2's end
180
ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
181
dst_ip = str(ifaddr.ip)
182
183
src_ip = params.get("src", "")
184
src_ifname = params.get("if", "")
185
expected_ip = params.get("esrc", "")
186
expected_ifname = params.get("eif", "")
187
errno = params.get("ex", 0)
188
189
pktinfo = In6Pktinfo()
190
if src_ip:
191
for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
192
pktinfo.ipi6_addr[i] = b
193
if src_ifname:
194
os_ifname = vnet.iface_alias_map[src_ifname].name
195
pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)
196
197
# Wait for the child to become ready
198
self.wait_object(second_vnet.pipe)
199
data = bytes("AAAA", "utf-8")
200
201
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
202
try:
203
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
204
aux = (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
205
s.sendto(data, (dst_ip, self.DEFAULT_PORT))
206
except OSError as e:
207
if not errno:
208
raise
209
assert e.errno == errno
210
print("Correctly raised {}".format(e))
211
return
212
213
# Wait for the received object
214
rx_obj = self.wait_object(second_vnet.pipe)
215
216
assert rx_obj["dst_ip"] == dst_ip
217
if expected_ip:
218
assert rx_obj["src_ip"] == expected_ip
219
if expected_ifname:
220
assert rx_obj["dst_iface_alias"] == expected_ifname
221
222
223
class TestIP6OutputLL(BaseTestIP6Ouput):
224
def vnet2_handler(self, vnet):
225
"""Generic listener that sends first received packet with metadata
226
back to the sender via pipw
227
"""
228
os_ifname = vnet.iface_alias_map["if2"].name
229
ll_data = ToolsHelper.get_linklocals()
230
ll_ip, _ = ll_data[os_ifname][0]
231
self._vnet2_handler(vnet, ll_ip, os_ifname)
232
233
@pytest.mark.require_user("root")
234
def test_output6_linklocal(self):
235
"""Tests simple UDP output"""
236
second_vnet = self.vnet_map["vnet2"]
237
238
# Wait for the child to become ready
239
ll_data = self.wait_object(second_vnet.pipe)
240
241
# Pick LL address on if2 vnet2's end
242
ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0]
243
# Get local interface scope
244
os_ifname = self.vnet.iface_alias_map["if2"].name
245
scopeid = socket.if_nametoindex(os_ifname)
246
247
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
248
data = bytes("AAAA", "utf-8")
249
target = (ip, self.DEFAULT_PORT, 0, scopeid)
250
print("## TX packet to {}%{},{}".format(ip, scopeid, target[1]))
251
252
s.sendto(data, target)
253
254
# Wait for the received object
255
rx_obj = self.wait_object(second_vnet.pipe)
256
assert rx_obj["dst_ip"] == ip
257
assert rx_obj["dst_iface_alias"] == "if2"
258
259
260
class TestIP6OutputNhopLL(BaseTestIP6Ouput):
261
def vnet2_handler(self, vnet):
262
"""Generic listener that sends first received packet with metadata
263
back to the sender via pipw
264
"""
265
ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip)
266
self._vnet2_handler(vnet, ip, None)
267
268
@pytest.mark.require_user("root")
269
def test_output6_nhop_linklocal(self):
270
"""Tests UDP output with custom link-local nhop set"""
271
second_vnet = self.vnet_map["vnet2"]
272
273
# Wait for the child to become ready
274
ll_data = self.wait_object(second_vnet.pipe)
275
276
# Pick target on if2 vnet2's end
277
ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
278
ip_dst = str(ifaddr.ip)
279
# Pick nexthop on if1
280
ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0]
281
# Get local interfaces
282
os_ifname = self.vnet.iface_alias_map["if1"].name
283
scopeid = socket.if_nametoindex(os_ifname)
284
sin6_next = SaHelper.ip6_sa(ip_next, scopeid)
285
286
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0)
287
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next)
288
289
data = bytes("AAAA", "utf-8")
290
s.sendto(data, (ip_dst, self.DEFAULT_PORT))
291
292
# Wait for the received object
293
rx_obj = self.wait_object(second_vnet.pipe)
294
assert rx_obj["dst_ip"] == ip_dst
295
assert rx_obj["dst_iface_alias"] == "if1"
296
297
298
class TestIP6OutputScope(BaseTestIP6Ouput):
299
def vnet2_handler(self, vnet):
300
"""Generic listener that sends first received packet with metadata
301
back to the sender via pipw
302
"""
303
bind_ip, bind_ifp = self.wait_object(vnet.pipe)
304
if bind_ip is None:
305
os_ifname = vnet.iface_alias_map[bind_ifp].name
306
ll_data = ToolsHelper.get_linklocals()
307
bind_ip, _ = ll_data[os_ifname][0]
308
if bind_ifp is not None:
309
bind_ifp = vnet.iface_alias_map[bind_ifp].name
310
print("## BIND {}%{}".format(bind_ip, bind_ifp))
311
self._vnet2_handler(vnet, bind_ip, bind_ifp)
312
313
@pytest.mark.parametrize(
314
"params",
315
[
316
# sif/dif: source/destination interface (for link-local addr)
317
# sip/dip: source/destination ip (for non-LL addr)
318
# ex: OSError errno that sendto() must raise
319
pytest.param({"sif": "if2", "dif": "if2"}, id="same"),
320
pytest.param(
321
{
322
"sif": "if1",
323
"dif": "if2",
324
"ex": errno.EHOSTUNREACH,
325
},
326
id="ll_differentif1",
327
),
328
pytest.param(
329
{
330
"sif": "if1",
331
"dip": "2001:db8:b::2",
332
"ex": errno.EHOSTUNREACH,
333
},
334
id="ll_differentif2",
335
),
336
pytest.param(
337
{
338
"sip": "2001:db8:a::1",
339
"dif": "if2",
340
},
341
id="gu_to_ll",
342
),
343
],
344
)
345
@pytest.mark.require_user("root")
346
def test_output6_linklocal_scope(self, params):
347
"""Tests simple UDP output"""
348
second_vnet = self.vnet_map["vnet2"]
349
350
src_ifp = params.get("sif")
351
src_ip = params.get("sip")
352
dst_ifp = params.get("dif")
353
dst_ip = params.get("dip")
354
errno = params.get("ex", 0)
355
356
# Sent ifp/IP to bind on
357
second_vnet = self.vnet_map["vnet2"]
358
second_vnet.pipe.send((dst_ip, dst_ifp))
359
360
# Wait for the child to become ready
361
ll_data = self.wait_object(second_vnet.pipe)
362
363
if dst_ip is None:
364
# Pick LL address on dst_ifp vnet2's end
365
dst_ip, _ = ll_data[second_vnet.iface_alias_map[dst_ifp].name][0]
366
# Get local interface scope
367
os_ifname = self.vnet.iface_alias_map[dst_ifp].name
368
scopeid = socket.if_nametoindex(os_ifname)
369
target = (dst_ip, self.DEFAULT_PORT, 0, scopeid)
370
else:
371
target = (dst_ip, self.DEFAULT_PORT, 0, 0)
372
373
# Bind
374
if src_ip is None:
375
ll_data = ToolsHelper.get_linklocals()
376
os_ifname = self.vnet.iface_alias_map[src_ifp].name
377
src_ip, _ = ll_data[os_ifname][0]
378
scopeid = socket.if_nametoindex(os_ifname)
379
src = (src_ip, self.DEFAULT_PORT, 0, scopeid)
380
else:
381
src = (src_ip, self.DEFAULT_PORT, 0, 0)
382
383
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
384
s.bind(src)
385
data = bytes("AAAA", "utf-8")
386
print("## TX packet {} -> {}".format(src, target))
387
388
try:
389
s.sendto(data, target)
390
except OSError as e:
391
if not errno:
392
raise
393
assert e.errno == errno
394
print("Correctly raised {}".format(e))
395
return
396
397
# Wait for the received object
398
rx_obj = self.wait_object(second_vnet.pipe)
399
assert rx_obj["dst_ip"] == dst_ip
400
assert rx_obj["src_ip"] == src_ip
401
# assert rx_obj["dst_iface_alias"] == "if2"
402
403
404
class TestIP6OutputMulticast(BaseTestIP6Ouput):
405
def vnet2_handler(self, vnet):
406
group = self.wait_object(vnet.pipe)
407
os_ifname = vnet.iface_alias_map["if2"].name
408
self._vnet2_handler(vnet, group, os_ifname)
409
410
@pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
411
@pytest.mark.require_user("root")
412
def test_output6_multicast(self, group_scope):
413
"""Tests simple UDP output"""
414
second_vnet = self.vnet_map["vnet2"]
415
416
group = "{}::3456".format(group_scope)
417
second_vnet.pipe.send(group)
418
419
# Pick target on if2 vnet2's end
420
ip = group
421
os_ifname = self.vnet.iface_alias_map["if2"].name
422
ifindex = socket.if_nametoindex(os_ifname)
423
optval = struct.pack("I", ifindex)
424
425
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
426
s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval)
427
428
data = bytes("AAAA", "utf-8")
429
430
# Wait for the child to become ready
431
self.wait_object(second_vnet.pipe)
432
433
print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT))
434
s.sendto(data, (ip, self.DEFAULT_PORT))
435
436
# Wait for the received object
437
rx_obj = self.wait_object(second_vnet.pipe)
438
assert rx_obj["dst_ip"] == ip
439
assert rx_obj["dst_iface_alias"] == "if2"
440
441
442
class TestIP6OutputLoopback(SingleVnetTestTemplate):
443
IPV6_PREFIXES = ["2001:db8:a::1/64"]
444
DEFAULT_PORT = 45365
445
446
@pytest.mark.parametrize(
447
"source_validation",
448
[
449
pytest.param(0, id="no_sav"),
450
pytest.param(1, id="sav"),
451
],
452
)
453
@pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
454
def test_output6_self_tcp(self, scope, source_validation):
455
"""Tests IPv6 TCP connection to the local IPv6 address"""
456
457
ToolsHelper.set_sysctl(
458
"net.inet6.ip6.source_address_validation", source_validation
459
)
460
461
if scope == "gu":
462
ip = "2001:db8:a::1"
463
addr_tuple = (ip, self.DEFAULT_PORT)
464
elif scope == "ll":
465
os_ifname = self.vnet.iface_alias_map["if1"].name
466
ifindex = socket.if_nametoindex(os_ifname)
467
ll_data = ToolsHelper.get_linklocals()
468
ip, _ = ll_data[os_ifname][0]
469
addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
470
elif scope == "lo":
471
ip = "::1"
472
ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
473
ifindex = socket.if_nametoindex("lo0")
474
addr_tuple = (ip, self.DEFAULT_PORT)
475
else:
476
assert 0 == 1
477
print("address: {}".format(addr_tuple))
478
479
start = time.perf_counter()
480
ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
481
ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
482
ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
483
ss.bind(addr_tuple)
484
ss.listen()
485
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
486
s.settimeout(2.0)
487
s.connect(addr_tuple)
488
conn, from_addr = ss.accept()
489
duration = time.perf_counter() - start
490
491
assert from_addr[0] == ip
492
assert duration < 1.0
493
494
@pytest.mark.parametrize(
495
"source_validation",
496
[
497
pytest.param(0, id="no_sav"),
498
pytest.param(1, id="sav"),
499
],
500
)
501
@pytest.mark.parametrize("scope", ["gu", "ll", "lo"])
502
def test_output6_self_udp(self, scope, source_validation):
503
"""Tests IPv6 UDP connection to the local IPv6 address"""
504
505
ToolsHelper.set_sysctl(
506
"net.inet6.ip6.source_address_validation", source_validation
507
)
508
509
if scope == "gu":
510
ip = "2001:db8:a::1"
511
addr_tuple = (ip, self.DEFAULT_PORT)
512
elif scope == "ll":
513
os_ifname = self.vnet.iface_alias_map["if1"].name
514
ifindex = socket.if_nametoindex(os_ifname)
515
ll_data = ToolsHelper.get_linklocals()
516
ip, _ = ll_data[os_ifname][0]
517
addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex)
518
elif scope == "lo":
519
ip = "::1"
520
ToolsHelper.get_output("route add -6 ::1/128 -iface lo0")
521
ifindex = socket.if_nametoindex("lo0")
522
addr_tuple = (ip, self.DEFAULT_PORT)
523
else:
524
assert 0 == 1
525
print("address: {}".format(addr_tuple))
526
527
start = time.perf_counter()
528
ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
529
ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
530
ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
531
ss.bind(addr_tuple)
532
ss.listen()
533
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP)
534
s.settimeout(2.0)
535
s.connect(addr_tuple)
536
conn, from_addr = ss.accept()
537
duration = time.perf_counter() - start
538
539
assert from_addr[0] == ip
540
assert duration < 1.0
541
542