Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netinet/ip_mroute.py
178478 views
1
#
2
# Copyright (c) 2025 Stormshield
3
#
4
# SPDX-License-Identifier: BSD-2-Clause
5
#
6
7
import pytest
8
import socket
9
import struct
10
import subprocess
11
import time
12
from pathlib import Path
13
14
from atf_python.sys.net.vnet import VnetTestTemplate
15
16
17
class MRouteTestTemplate(VnetTestTemplate):
    """
    Helper class for multicast routing tests.  Test classes should inherit
    from this one.

    Synchronization between the coordinator (the side that runs
    setup_method() and the test method) and the per-vnet handlers is done
    with short datagrams over unix-domain sockets.  All socket paths are
    relative, so they are created in the current working directory.
    """
    # Path of the socket on which the coordinator receives "ok"/"done".
    COORD_SOCK = "coord.sock"

    @staticmethod
    def _msgwait(sock: socket.socket, expected: bytes):
        """Block for one datagram on sock and assert that it equals expected."""
        msg = sock.recv(1024)
        assert msg == expected

    @staticmethod
    def sendmsg(msg: bytes, path: str):
        """Send msg as a single datagram to the unix socket bound at path."""
        # The context manager closes the socket even if sendto() raises
        # (e.g. because nothing is bound at path yet).
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
            s.sendto(msg, path)

    @staticmethod
    def _makesock(path: str):
        """Return a unix datagram socket bound to path; the caller owns it."""
        s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            s.bind(path)
        except OSError:
            # Don't leak the descriptor when the bind fails.
            s.close()
            raise
        return s

    @staticmethod
    def mcast_join_INET6(addr: str, port: int):
        """Placeholder that does nothing; kept so existing callers still work."""

    def jointest(self, vnet):
        """Let the coordinator know that we're ready, and wait for go-ahead."""
        # Bind our own socket before announcing readiness, so that the
        # coordinator's "join" reply always has somewhere to land.
        with self._makesock(vnet.alias + ".sock") as coord:
            self.sendmsg(b"ok " + vnet.alias.encode(), self.COORD_SOCK)
            self._msgwait(coord, b"join")

    def donetest(self):
        """Let the coordinator know that we completed successfully."""
        self.sendmsg(b"done", self.COORD_SOCK)

    def starttest(self, vnets: list[str]):
        """Send the go-ahead to every vnet in vnets and remember the list."""
        self.vnets = vnets
        for vnet in vnets:
            self.sendmsg(b"join", vnet + ".sock")

    def waittest(self):
        """Wait for one "done" message per vnet passed to starttest()."""
        for _ in self.vnets:
            self._msgwait(self.coord, b"done")

    def setup_method(self, method):
        """Bind the coordinator socket, run the base setup, await all hosts."""
        # The coordinator socket must exist before the base class setup
        # runs, since handlers report to it via jointest().
        self.coord = self._makesock(self.COORD_SOCK)
        super().setup_method(method)

        # Loop until all other hosts have sent the ok message.
        vnet_names = set(self.vnet_map) - {self.vnet.alias}
        expected = {b"ok " + name.encode() for name in vnet_names}
        received = set()
        while len(received) < len(vnet_names):
            received.add(self.coord.recv(1024))
        assert received == expected
74
75
76
class MRouteINETTestTemplate(MRouteTestTemplate):
    """IPv4 helpers: start pimd and join/send to IPv4 multicast groups."""

    @staticmethod
    def run_pimd(ident: str, ifaces: list[str], rpaddr: str, group: str, fib=0):
        """
        Write pimd-<ident>.conf and start pimd under "setfib <fib>".

        The generated configuration disables all interfaces by default,
        enables each interface in ifaces, and declares rpaddr as the
        rendezvous point for group.  Returns the subprocess.Popen object;
        pimd's stdout and stderr are discarded.
        """
        conf = f"pimd-{ident}.conf"
        with open(conf, "w") as conf_file:
            conf_file.write("no phyint\n")
            for iface in ifaces:
                conf_file.write(f"phyint {iface} enable\n")
            conf_file.write(f"rp-address {rpaddr} {group}\n")

        # Build the argument vector directly rather than splitting a
        # formatted string, so a value containing whitespace cannot be
        # broken into several arguments.
        cmd = ["setfib", str(fib), "pimd", "-i", ident, "-f", conf,
               "-p", f"pimd-{ident}.pid", "-n"]
        return subprocess.Popen(cmd, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL)

    @staticmethod
    def mcast_join(addr: str, port: int):
        """
        Return a UDP socket that joined group addr and is bound to
        (addr, port).  The caller owns the returned socket.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # Group address followed by INADDR_ANY as the interface address.
            mreq = struct.pack("4si", socket.inet_aton(addr), socket.INADDR_ANY)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            s.bind((addr, port))
        except Exception:
            # Don't leak the descriptor if the join or bind fails.
            s.close()
            raise
        time.sleep(1)  # Give the kernel a bit of time to join the group.
        return s

    @staticmethod
    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
        """Send msg to (addr, port) out of interface iface with a TTL of 64."""
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
                           socket.IPPROTO_UDP) as s:
            # Two wildcard addresses followed by the interface index, so the
            # outgoing interface is selected by index rather than by address.
            mreqn = struct.pack("iii", socket.INADDR_ANY, socket.INADDR_ANY,
                                socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, mreqn)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64)
            s.sendto(msg, (addr, port))

    def setup_method(self, method):
        """Require the ip_mroute kernel module, then run the common setup."""
        self.require_module("ip_mroute")
        super().setup_method(method)
112
113
114
class MRouteINET6TestTemplate(MRouteTestTemplate):
    """IPv6 helpers: start ip6_mrouted and join/send to IPv6 groups."""

    @staticmethod
    def run_ip6_mrouted(ident: str, ifaces: list[str], fib=0):
        """
        Start the ip6_mrouted helper found next to this file under
        "setfib <fib>", passing one "-i <iface>" pair per entry of ifaces.

        ident is accepted for symmetry with the IPv4 template but is not
        used.  Returns the subprocess.Popen object; the helper's stdout and
        stderr are discarded.
        """
        exepath = Path(__file__).parent / "ip6_mrouted"
        # Build the argument vector directly rather than splitting a
        # formatted string, so a test directory whose path contains
        # whitespace does not get broken into several arguments.
        cmd = ["setfib", str(fib), str(exepath)]
        for iface in ifaces:
            cmd += ["-i", iface]
        return subprocess.Popen(cmd, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL)

    @staticmethod
    def mcast_join(addr: str, port: int, iface: str):
        """
        Return a UDP socket that joined group addr on interface iface and
        is bound to (addr, port).  The caller owns the returned socket.
        """
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # 16-byte group address followed by the interface index.
            mreq = struct.pack("16si", socket.inet_pton(socket.AF_INET6, addr),
                               socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
            s.bind((addr, port))
        except Exception:
            # Don't leak the descriptor if the join or bind fails.
            s.close()
            raise
        time.sleep(1)  # Give the kernel a bit of time to join the group.
        return s

    @staticmethod
    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
        """Send msg to (addr, port) out of iface with a hop limit of 64."""
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM,
                           socket.IPPROTO_UDP) as s:
            # The outgoing interface is selected by its index.
            mreq = struct.pack("i", socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, mreq)
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 64)
            s.sendto(msg, (addr, port))

    def setup_method(self, method):
        """Require the ip6_mroute kernel module, then run the common setup."""
        self.require_module("ip6_mroute")
        super().setup_method(method)
145
146
147
class Test1RBasicINET(MRouteINETTestTemplate):
    """
    Two hosts on separate IPv4 subnets exchange multicast datagrams through
    a single router running pimd.
    """

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "if1": {"prefixes4": [("192.168.1.1/24", "192.168.1.2/24")]},
        "if2": {"prefixes4": [("192.168.2.1/24", "192.168.2.2/24")]},
    }
    MULTICAST_ADDR = "239.0.0.1"

    def setup_method(self, method):
        """Bring up the topology, then start pimd on both router interfaces."""
        # The base class creates the VNETs and starts the handlers.
        super().setup_method(method)

        alias_map = self.vnet.iface_alias_map
        router_ifaces = [alias_map[alias].name for alias in ("if1", "if2")]
        rp_group = self.MULTICAST_ADDR + "/32"
        self.pimd = self.run_pimd("test", router_ifaces, "127.0.0.1", rp_group)
        # pimd needs a moment before it is ready to route.
        time.sleep(3)

    def vnet_host1_handler(self, vnet):
        """Host 1: receive the greeting from host 2, then answer it."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port)

        # Host 2 speaks first; reply once its message has arrived.
        self._msgwait(self.sock, b"Hello, Multicast!")
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Goodbye, Multicast!")
        # Our own socket is a member of the group, so the reply is expected
        # on it as well.
        self._msgwait(self.sock, b"Goodbye, Multicast!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Host 2: send the greeting, then wait for host 1's answer."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port)

        # Speak first, see our own datagram, then wait for host 1's reply.
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Hello, Multicast!")
        self._msgwait(self.sock, b"Hello, Multicast!")
        self._msgwait(self.sock, b"Goodbye, Multicast!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    @pytest.mark.timeout(30)
    def test(self):
        """Release both hosts and wait until each reports completion."""
        hosts = ["vnet_host1", "vnet_host2"]
        self.starttest(hosts)
        self.waittest()
197
198
199
class Test1RCrissCrossINET(MRouteINETTestTemplate):
    """
    One router, four hosts, two FIBs: if1/if2 are placed in FIB 0 and
    if3/if4 in FIB 1 on the router, with one pimd instance per FIB.  Each
    pair of hosts exchanges messages tagged with its own FIB number.
    """

    # NOTE(review): the "fib" tuples are presumably (router side, host side)
    # -- confirm against VnetTestTemplate.
    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "vnet_host3": {"ifaces": ["if3"]},
        "vnet_host4": {"ifaces": ["if4"]},
        "if1": {
            "prefixes4": [("192.168.1.1/24", "192.168.1.2/24")],
            "prefixes6": [],
            "fib": (0, 0),
        },
        "if2": {
            "prefixes4": [("192.168.2.1/24", "192.168.2.2/24")],
            "prefixes6": [],
            "fib": (0, 0),
        },
        "if3": {
            "prefixes4": [("192.168.3.1/24", "192.168.3.2/24")],
            "prefixes6": [],
            "fib": (1, 0),
        },
        "if4": {
            "prefixes4": [("192.168.4.1/24", "192.168.4.2/24")],
            "prefixes6": [],
            "fib": (1, 0),
        },
    }
    MULTICAST_ADDR = "239.0.0.1"

    def setup_method(self, method):
        """Bring up the topology, then start one pimd per FIB."""
        # The base class creates the VNETs and starts the handlers.
        super().setup_method(method)

        rp_group = self.MULTICAST_ADDR + "/32"

        # FIB 0 serves if1 and if2.
        fib0_ifaces = [self.vnet.iface_alias_map[alias].name
                       for alias in ("if1", "if2")]
        self.pimd0 = self.run_pimd("test0", fib0_ifaces, "127.0.0.1",
                                   rp_group, fib=0)

        # FIB 1 serves if3 and if4.
        fib1_ifaces = [self.vnet.iface_alias_map[alias].name
                       for alias in ("if3", "if4")]
        self.pimd1 = self.run_pimd("test1", fib1_ifaces, "127.0.0.1",
                                   rp_group, fib=1)

        # Both daemons need a moment before they are ready to route.
        time.sleep(3)

    def vnet_host1_handler(self, vnet):
        """FIB 0 responder: wait for host 2's greeting and answer it."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """FIB 0 initiator: greet, then expect our own copy and the reply."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port)
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host3_handler(self, vnet):
        """FIB 1 responder: wait for host 4's greeting and answer it."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Goodbye, Multicast on FIB 1!")
        self.donetest()

    def vnet_host4_handler(self, vnet):
        """FIB 1 initiator: greet, then expect our own copy and the reply."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port)
        # Extra pause before the first send on this FIB.
        time.sleep(1)
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 1!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 1!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    @pytest.mark.timeout(30)
    def test(self):
        """Release all four hosts and wait until each reports completion."""
        hosts = ["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"]
        self.starttest(hosts)
        self.waittest()
289
290
291
class Test1RBasicINET6(MRouteINET6TestTemplate):
    """
    Two hosts on separate IPv6 subnets exchange multicast datagrams through
    a single router running ip6_mrouted.
    """

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "if1": {
            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")]
        },
        "if2": {
            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")]
        },
    }
    MULTICAST_ADDR = "ff05::1"

    def setup_method(self, method):
        """Bring up the topology, then start ip6_mrouted on the router."""
        # The base class creates the VNETs and starts the handlers.
        super().setup_method(method)

        alias_map = self.vnet.iface_alias_map
        router_ifaces = [alias_map[alias].name for alias in ("if1", "if2")]
        self.mrouted = self.run_ip6_mrouted("test", router_ifaces)
        # Give ip6_mrouted a moment to start up.
        time.sleep(1)

    def vnet_host1_handler(self, vnet):
        """Host 1: receive the greeting from host 2, then answer it."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port, vnet.ifaces[0].name)

        # Host 2 speaks first; reply once its message has arrived.
        self._msgwait(self.sock, b"Hello, Multicast!")
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Goodbye, Multicast!")
        # Our own socket is a member of the group, so the reply is expected
        # on it as well.
        self._msgwait(self.sock, b"Goodbye, Multicast!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Host 2: send the greeting, then wait for host 1's answer."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port, vnet.ifaces[0].name)

        # Speak first, see our own datagram, then wait for host 1's reply.
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Hello, Multicast!")
        self._msgwait(self.sock, b"Hello, Multicast!")
        self._msgwait(self.sock, b"Goodbye, Multicast!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.timeout(30)
    def test(self):
        """Release both hosts and wait until each reports completion."""
        hosts = ["vnet_host1", "vnet_host2"]
        self.starttest(hosts)
        self.waittest()
344
345
346
class Test1RCrissCrossINET6(MRouteINET6TestTemplate):
    """
    One router, four hosts, two FIBs: if1/if2 are placed in FIB 0 and
    if3/if4 in FIB 1 on the router, with one ip6_mrouted instance per FIB.
    Each pair of hosts exchanges messages tagged with its own FIB number.
    """

    # NOTE(review): the "fib" tuples are presumably (router side, host side)
    # -- confirm against VnetTestTemplate.
    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "vnet_host3": {"ifaces": ["if3"]},
        "vnet_host4": {"ifaces": ["if4"]},
        "if1": {
            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")],
            "fib": (0, 0),
        },
        "if2": {
            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")],
            "fib": (0, 0),
        },
        "if3": {
            "prefixes6": [("2001:db8:0:3::1/64", "2001:db8:0:3::2/64")],
            "fib": (1, 0),
        },
        "if4": {
            "prefixes6": [("2001:db8:0:4::1/64", "2001:db8:0:4::2/64")],
            "fib": (1, 0),
        },
    }
    MULTICAST_ADDR = "ff05::1"

    def setup_method(self, method):
        """Bring up the topology, then start one ip6_mrouted per FIB."""
        # The base class creates the VNETs and starts the handlers.
        super().setup_method(method)

        # NOTE(review): despite the "pimd" attribute names, both attributes
        # below hold ip6_mrouted processes; the names are kept unchanged.

        # FIB 0 serves if1 and if2.
        fib0_ifaces = [self.vnet.iface_alias_map[alias].name
                       for alias in ("if1", "if2")]
        self.pimd0 = self.run_ip6_mrouted("test0", fib0_ifaces, fib=0)

        # FIB 1 serves if3 and if4.
        fib1_ifaces = [self.vnet.iface_alias_map[alias].name
                       for alias in ("if3", "if4")]
        self.pimd1 = self.run_ip6_mrouted("test1", fib1_ifaces, fib=1)

        # Give ip6_mrouted a bit of time to get itself together.
        time.sleep(1)

    def vnet_host1_handler(self, vnet):
        """FIB 0 responder: wait for host 2's greeting and answer it."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port, vnet.ifaces[0].name)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """FIB 0 initiator: greet, then expect our own copy and the reply."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port, vnet.ifaces[0].name)
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 0!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 0!")
        self.donetest()

    def vnet_host3_handler(self, vnet):
        """FIB 1 responder: wait for host 4's greeting and answer it."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port, vnet.ifaces[0].name)
        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Goodbye, Multicast on FIB 1!")
        self.donetest()

    def vnet_host4_handler(self, vnet):
        """FIB 1 initiator: greet, then expect our own copy and the reply."""
        self.jointest(vnet)

        group, port = self.MULTICAST_ADDR, 12345
        self.sock = self.mcast_join(group, port, vnet.ifaces[0].name)
        # Extra pause before the first send on this FIB.
        time.sleep(1)
        self.mcast_sendto(group, port, vnet.ifaces[0].name,
                          b"Hello, Multicast on FIB 1!")
        self._msgwait(self.sock, b"Hello, Multicast on FIB 1!")
        self._msgwait(self.sock, b"Goodbye, Multicast on FIB 1!")
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.timeout(30)
    def test(self):
        """Release all four hosts and wait until each reports completion."""
        hosts = ["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"]
        self.starttest(hosts)
        self.waittest()
429
430