import pytest
import socket
import struct
import subprocess
import time
from pathlib import Path
from atf_python.sys.net.vnet import VnetTestTemplate
class MRouteTestTemplate(VnetTestTemplate):
    """
    Helper class for multicast routing tests.  Test classes should inherit from this one.

    Synchronization between the coordinator (the object whose setup_method()
    bound COORD_SOCK) and the per-vnet handlers is done with short datagrams
    over AF_UNIX sockets:

      * a handler calls jointest(), which binds "<vnet alias>.sock", sends
        b"ok <alias>" to the coordinator and blocks until it receives b"join";
      * the coordinator calls starttest() to send b"join" to every handler and
        waittest() to collect one b"done" per handler;
      * a handler calls donetest() to send b"done" to the coordinator.
    """

    # Filesystem path of the coordinator's datagram socket.
    COORD_SOCK = "coord.sock"

    @staticmethod
    def _msgwait(sock: socket.socket, expected: bytes):
        """Block for one datagram on sock and assert that it equals expected."""
        msg = sock.recv(1024)
        assert msg == expected

    @staticmethod
    def sendmsg(msg: bytes, path: str):
        """Send msg as one datagram to the AF_UNIX socket bound at path."""
        # The context manager closes the socket even if sendto() raises.
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
            s.sendto(msg, path)

    @staticmethod
    def _makesock(path: str):
        """Return a new AF_UNIX datagram socket bound to path."""
        s = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            s.bind(path)
        except OSError:
            # Do not leak the descriptor when the bind fails.
            s.close()
            raise
        return s

    @staticmethod
    def mcast_join_INET6(addr: str, port: int):
        # NOTE(review): empty placeholder; nothing in the visible source calls it.
        pass

    def jointest(self, vnet):
        """Let the coordinator know that we're ready, and wait for go-ahead."""
        # The socket must be bound before "ok" is sent so that the "join"
        # reply has somewhere to go; it is closed once the reply arrives.
        with self._makesock(vnet.alias + ".sock") as coord:
            self.sendmsg(b"ok " + vnet.alias.encode(), self.COORD_SOCK)
            self._msgwait(coord, b"join")

    def donetest(self):
        """Let the coordinator know that we completed successfully."""
        self.sendmsg(b"done", self.COORD_SOCK)

    def starttest(self, vnets: list[str]):
        """Remember the participating vnet aliases and send each one b"join"."""
        self.vnets = vnets
        for vnet in vnets:
            self.sendmsg(b"join", vnet + ".sock")

    def waittest(self):
        """Wait for one b"done" message per vnet passed to starttest()."""
        for _ in self.vnets:
            self._msgwait(self.coord, b"done")

    def setup_method(self, method):
        """Bind the coordinator socket, run the base setup, and collect "ok"s."""
        # Bound before the base-class setup runs -- presumably that is where
        # the vnet handlers get started, so the socket has to exist already.
        self.coord = self._makesock(self.COORD_SOCK)
        super().setup_method(method)

        # Every vnet except our own is expected to report in exactly once.
        received = set()
        vnet_names = set(self.vnet_map.keys()) - {self.vnet.alias}
        while len(received) < len(vnet_names):
            msg = self.coord.recv(1024)
            received.add(msg)
        assert received == {b"ok " + name.encode() for name in vnet_names}
class MRouteINETTestTemplate(MRouteTestTemplate):
    """Helpers for IPv4 multicast routing tests (pimd plus UDP multicast sockets)."""

    @staticmethod
    def run_pimd(ident: str, ifaces: list[str], rpaddr: str, group: str, fib=0):
        """
        Write a pimd configuration file and start pimd under setfib.

        The configuration "pimd-<ident>.conf" is written to the current
        directory: all interfaces are disabled with "no phyint", each entry of
        ifaces is re-enabled, and rpaddr is set as the rendezvous point
        address for group.

        Returns the subprocess.Popen object for the started process; its
        stdout and stderr are discarded.
        """
        conf = f"pimd-{ident}.conf"
        lines = ["no phyint\n"]
        lines.extend(f"phyint {iface} enable\n" for iface in ifaces)
        lines.append(f"rp-address {rpaddr} {group}\n")
        # The file is closed (and therefore flushed) before pimd is started.
        with open(conf, "w") as conf_file:
            conf_file.writelines(lines)

        # Build argv directly instead of formatting a string and re-splitting
        # it on whitespace, so that no argument can be broken apart.
        argv = [
            "setfib", str(fib),
            "pimd", "-i", ident, "-f", conf, "-p", f"pimd-{ident}.pid", "-n",
        ]
        return subprocess.Popen(argv, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL)

    @staticmethod
    def mcast_join(addr: str, port: int):
        """
        Return a UDP socket that joined group addr and is bound to (addr, port).

        The membership request leaves the interface address as INADDR_ANY.
        The caller owns the returned socket.
        """
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # Group address followed by the (wildcard) interface address.
            mreq = struct.pack("4si", socket.inet_aton(addr), socket.INADDR_ANY)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
            s.bind((addr, port))
        except Exception:
            # Do not leak the descriptor when the join or bind fails.
            s.close()
            raise
        # NOTE(review): presumably gives the membership time to propagate
        # before the caller starts sending or receiving -- confirm.
        time.sleep(1)
        return s

    @staticmethod
    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
        """Send msg to (addr, port) out of interface iface with a TTL of 64."""
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as s:
            # Two wildcard addresses followed by the interface index select
            # the outgoing interface by index.
            mreqn = struct.pack("iii", socket.INADDR_ANY, socket.INADDR_ANY,
                                socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, mreqn)
            s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 64)
            s.sendto(msg, (addr, port))

    def setup_method(self, method):
        """Require the "ip_mroute" module, then run the common setup."""
        # require_module() comes from a base class; presumably it skips the
        # test when the module is unavailable -- confirm.
        self.require_module("ip_mroute")
        super().setup_method(method)
class MRouteINET6TestTemplate(MRouteTestTemplate):
    """Helpers for IPv6 multicast routing tests (ip6_mrouted plus UDP multicast sockets)."""

    @staticmethod
    def run_ip6_mrouted(ident: str, ifaces: list[str], fib=0):
        """
        Start the "ip6_mrouted" helper located next to this file, under setfib.

        Each entry of ifaces is passed as a separate "-i <iface>" option.
        ident is accepted but not used by this helper.

        Returns the subprocess.Popen object for the started process; its
        stdout and stderr are discarded.
        """
        exepath = Path(__file__).parent / "ip6_mrouted"
        # Build argv directly: formatting a command string and splitting it on
        # whitespace would break if the executable path contained a space.
        argv = ["setfib", str(fib), str(exepath)]
        for iface in ifaces:
            argv += ["-i", iface]
        return subprocess.Popen(argv, stdout=subprocess.DEVNULL,
                                stderr=subprocess.DEVNULL)

    @staticmethod
    def mcast_join(addr: str, port: int, iface: str):
        """
        Return a UDP socket that joined group addr on iface, bound to (addr, port).

        The caller owns the returned socket.
        """
        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            # 16-byte group address followed by the interface index.
            mreq = struct.pack("16si", socket.inet_pton(socket.AF_INET6, addr),
                               socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
            s.bind((addr, port))
        except Exception:
            # Do not leak the descriptor when the join or bind fails.
            s.close()
            raise
        # NOTE(review): presumably gives the membership time to propagate
        # before the caller starts sending or receiving -- confirm.
        time.sleep(1)
        return s

    @staticmethod
    def mcast_sendto(addr: str, port: int, iface: str, msg: bytes):
        """Send msg to (addr, port) out of interface iface with a hop limit of 64."""
        with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as s:
            # The outgoing interface is selected by its index.
            mreq = struct.pack("i", socket.if_nametoindex(iface))
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, mreq)
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, 64)
            s.sendto(msg, (addr, port))

    def setup_method(self, method):
        """Require the "ip6_mroute" module, then run the common setup."""
        # require_module() comes from a base class; presumably it skips the
        # test when the module is unavailable -- confirm.
        self.require_module("ip6_mroute")
        super().setup_method(method)
class Test1RBasicINET(MRouteINETTestTemplate):
    """Basic multicast routing setup with 2 hosts connected via a router."""

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "if1": {"prefixes4": [("192.168.1.1/24", "192.168.1.2/24")]},
        "if2": {"prefixes4": [("192.168.2.1/24", "192.168.2.2/24")]},
    }
    MULTICAST_ADDR = "239.0.0.1"

    def setup_method(self, method):
        """Run the common setup, then start pimd on both interfaces of our vnet."""
        super().setup_method(method)
        alias_map = self.vnet.iface_alias_map
        router_ifaces = [alias_map[alias].name for alias in ["if1", "if2"]]
        group = self.MULTICAST_ADDR + "/32"
        self.pimd = self.run_pimd("test", router_ifaces, "127.0.0.1", group)
        # NOTE(review): presumably lets pimd finish starting up -- confirm.
        time.sleep(3)

    def vnet_host1_handler(self, vnet):
        """Wait for the hello, answer with a goodbye, and expect to see that goodbye too."""
        self.jointest(vnet)
        self.sock = rx = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self._msgwait(rx, b"Hello, Multicast!")
        ifname = vnet.ifaces[0].name
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, ifname, b"Goodbye, Multicast!")
        self._msgwait(rx, b"Goodbye, Multicast!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Send the hello, then expect to receive the hello followed by the goodbye."""
        self.jointest(vnet)
        self.sock = rx = self.mcast_join(self.MULTICAST_ADDR, 12345)
        ifname = vnet.ifaces[0].name
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, ifname, b"Hello, Multicast!")
        for expected in (b"Hello, Multicast!", b"Goodbye, Multicast!"):
            self._msgwait(rx, expected)
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    @pytest.mark.timeout(30)
    def test(self):
        """Release both host handlers and wait for each of them to report done."""
        hosts = ["vnet_host1", "vnet_host2"]
        self.starttest(hosts)
        self.waittest()
class Test1RCrissCrossINET(MRouteINETTestTemplate):
    """
    Test a router connected to four hosts, with pairs of interfaces
    in different FIBs.
    """

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "vnet_host3": {"ifaces": ["if3"]},
        "vnet_host4": {"ifaces": ["if4"]},
        "if1": {
            "prefixes4": [("192.168.1.1/24", "192.168.1.2/24")],
            "prefixes6": [],
            "fib": (0, 0),
        },
        "if2": {
            "prefixes4": [("192.168.2.1/24", "192.168.2.2/24")],
            "prefixes6": [],
            "fib": (0, 0),
        },
        "if3": {
            "prefixes4": [("192.168.3.1/24", "192.168.3.2/24")],
            "prefixes6": [],
            "fib": (1, 0),
        },
        "if4": {
            "prefixes4": [("192.168.4.1/24", "192.168.4.2/24")],
            "prefixes6": [],
            "fib": (1, 0),
        },
    }
    MULTICAST_ADDR = "239.0.0.1"

    def setup_method(self, method):
        """Run the common setup, then start one pimd per FIB (if1/if2 and if3/if4)."""
        super().setup_method(method)
        alias_map = self.vnet.iface_alias_map
        group = self.MULTICAST_ADDR + "/32"

        fib0_ifaces = [alias_map[alias].name for alias in ["if1", "if2"]]
        self.pimd0 = self.run_pimd("test0", fib0_ifaces, "127.0.0.1", group, fib=0)

        fib1_ifaces = [alias_map[alias].name for alias in ["if3", "if4"]]
        self.pimd1 = self.run_pimd("test1", fib1_ifaces, "127.0.0.1", group, fib=1)

        # NOTE(review): presumably lets both daemons finish starting -- confirm.
        time.sleep(3)

    def _reply_to_hello(self, vnet, hello: bytes, goodbye: bytes):
        """Join the group, wait for hello, send goodbye, and report done."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        self._msgwait(self.sock, hello)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, goodbye)
        self.donetest()

    def _send_hello(self, vnet, hello: bytes, goodbye: bytes, settle: int = 0):
        """
        Join the group, optionally sleep for settle seconds, send hello, then
        expect hello followed by goodbye on our own socket and report done.
        """
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345)
        if settle:
            time.sleep(settle)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, hello)
        self._msgwait(self.sock, hello)
        self._msgwait(self.sock, goodbye)
        self.donetest()

    def vnet_host1_handler(self, vnet):
        """FIB 0 responder: answers host2's hello."""
        self._reply_to_hello(vnet,
                             b"Hello, Multicast on FIB 0!",
                             b"Goodbye, Multicast on FIB 0!")

    def vnet_host2_handler(self, vnet):
        """FIB 0 initiator: sends the hello and waits for the goodbye."""
        self._send_hello(vnet,
                         b"Hello, Multicast on FIB 0!",
                         b"Goodbye, Multicast on FIB 0!")

    def vnet_host3_handler(self, vnet):
        """FIB 1 responder: answers host4's hello."""
        self._reply_to_hello(vnet,
                             b"Hello, Multicast on FIB 1!",
                             b"Goodbye, Multicast on FIB 1!")

    def vnet_host4_handler(self, vnet):
        """FIB 1 initiator: like host2, but sleeps one second before sending."""
        self._send_hello(vnet,
                         b"Hello, Multicast on FIB 1!",
                         b"Goodbye, Multicast on FIB 1!",
                         settle=1)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["pimd"])
    @pytest.mark.timeout(30)
    def test(self):
        """Release all four host handlers and wait for each of them to report done."""
        hosts = ["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"]
        self.starttest(hosts)
        self.waittest()
class Test1RBasicINET6(MRouteINET6TestTemplate):
    """Basic multicast routing setup with 2 hosts connected via a router."""

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "if1": {
            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")]
        },
        "if2": {
            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")]
        },
    }
    MULTICAST_ADDR = "ff05::1"

    def setup_method(self, method):
        """Run the common setup, then start ip6_mrouted on both interfaces of our vnet."""
        super().setup_method(method)
        alias_map = self.vnet.iface_alias_map
        router_ifaces = [alias_map[alias].name for alias in ["if1", "if2"]]
        self.mrouted = self.run_ip6_mrouted("test", router_ifaces)
        # NOTE(review): presumably lets the daemon finish starting -- confirm.
        time.sleep(1)

    def vnet_host1_handler(self, vnet):
        """Wait for the hello, answer with a goodbye, and expect to see that goodbye too."""
        self.jointest(vnet)
        self.sock = rx = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
        self._msgwait(rx, b"Hello, Multicast!")
        ifname = vnet.ifaces[0].name
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, ifname, b"Goodbye, Multicast!")
        self._msgwait(rx, b"Goodbye, Multicast!")
        self.donetest()

    def vnet_host2_handler(self, vnet):
        """Send the hello, then expect to receive the hello followed by the goodbye."""
        self.jointest(vnet)
        self.sock = rx = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
        ifname = vnet.ifaces[0].name
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, ifname, b"Hello, Multicast!")
        for expected in (b"Hello, Multicast!", b"Goodbye, Multicast!"):
            self._msgwait(rx, expected)
        self.donetest()

    @pytest.mark.require_user("root")
    @pytest.mark.timeout(30)
    def test(self):
        """Release both host handlers and wait for each of them to report done."""
        hosts = ["vnet_host1", "vnet_host2"]
        self.starttest(hosts)
        self.waittest()
class Test1RCrissCrossINET6(MRouteINET6TestTemplate):
    """
    Test a router connected to four hosts, with pairs of interfaces
    in different FIBs.
    """

    TOPOLOGY = {
        "vnet_router": {"ifaces": ["if1", "if2", "if3", "if4"]},
        "vnet_host1": {"ifaces": ["if1"]},
        "vnet_host2": {"ifaces": ["if2"]},
        "vnet_host3": {"ifaces": ["if3"]},
        "vnet_host4": {"ifaces": ["if4"]},
        "if1": {
            "prefixes6": [("2001:db8:0:1::1/64", "2001:db8:0:1::2/64")],
            "fib": (0, 0),
        },
        "if2": {
            "prefixes6": [("2001:db8:0:2::1/64", "2001:db8:0:2::2/64")],
            "fib": (0, 0),
        },
        "if3": {
            "prefixes6": [("2001:db8:0:3::1/64", "2001:db8:0:3::2/64")],
            "fib": (1, 0),
        },
        "if4": {
            "prefixes6": [("2001:db8:0:4::1/64", "2001:db8:0:4::2/64")],
            "fib": (1, 0),
        },
    }
    MULTICAST_ADDR = "ff05::1"

    def setup_method(self, method):
        """Run the common setup, then start one ip6_mrouted per FIB (if1/if2 and if3/if4)."""
        super().setup_method(method)
        alias_map = self.vnet.iface_alias_map

        # The attributes are named pimd0/pimd1 but hold ip6_mrouted processes.
        fib0_ifaces = [alias_map[alias].name for alias in ["if1", "if2"]]
        self.pimd0 = self.run_ip6_mrouted("test0", fib0_ifaces, fib=0)

        fib1_ifaces = [alias_map[alias].name for alias in ["if3", "if4"]]
        self.pimd1 = self.run_ip6_mrouted("test1", fib1_ifaces, fib=1)

        # NOTE(review): presumably lets both daemons finish starting -- confirm.
        time.sleep(1)

    def _reply_to_hello(self, vnet, hello: bytes, goodbye: bytes):
        """Join the group, wait for hello, send goodbye, and report done."""
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
        self._msgwait(self.sock, hello)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, goodbye)
        self.donetest()

    def _send_hello(self, vnet, hello: bytes, goodbye: bytes, settle: int = 0):
        """
        Join the group, optionally sleep for settle seconds, send hello, then
        expect hello followed by goodbye on our own socket and report done.
        """
        self.jointest(vnet)
        self.sock = self.mcast_join(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name)
        if settle:
            time.sleep(settle)
        self.mcast_sendto(self.MULTICAST_ADDR, 12345, vnet.ifaces[0].name, hello)
        self._msgwait(self.sock, hello)
        self._msgwait(self.sock, goodbye)
        self.donetest()

    def vnet_host1_handler(self, vnet):
        """FIB 0 responder: answers host2's hello."""
        self._reply_to_hello(vnet,
                             b"Hello, Multicast on FIB 0!",
                             b"Goodbye, Multicast on FIB 0!")

    def vnet_host2_handler(self, vnet):
        """FIB 0 initiator: sends the hello and waits for the goodbye."""
        self._send_hello(vnet,
                         b"Hello, Multicast on FIB 0!",
                         b"Goodbye, Multicast on FIB 0!")

    def vnet_host3_handler(self, vnet):
        """FIB 1 responder: answers host4's hello."""
        self._reply_to_hello(vnet,
                             b"Hello, Multicast on FIB 1!",
                             b"Goodbye, Multicast on FIB 1!")

    def vnet_host4_handler(self, vnet):
        """FIB 1 initiator: like host2, but sleeps one second before sending."""
        self._send_hello(vnet,
                         b"Hello, Multicast on FIB 1!",
                         b"Goodbye, Multicast on FIB 1!",
                         settle=1)

    @pytest.mark.require_user("root")
    @pytest.mark.timeout(30)
    def test(self):
        """Release all four host handlers and wait for each of them to report done."""
        hosts = ["vnet_host1", "vnet_host2", "vnet_host3", "vnet_host4"]
        self.starttest(hosts)
        self.waittest()