Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/net/routing/test_routing_l3.py
39604 views
1
import ipaddress
2
import socket
3
4
import pytest
5
from atf_python.sys.net.rtsock import RtConst
6
from atf_python.sys.net.rtsock import Rtsock
7
from atf_python.sys.net.rtsock import RtsockRtMessage
8
from atf_python.sys.net.rtsock import SaHelper
9
from atf_python.sys.net.tools import ToolsHelper
10
from atf_python.sys.net.vnet import SingleVnetTestTemplate
11
from atf_python.sys.net.vnet import VnetTestTemplate
12
13
14
class TestIfOps(VnetTestTemplate):
15
TOPOLOGY = {
16
"vnet1": {"ifaces": ["if1", "if2"]},
17
"if1": {"prefixes4": [], "prefixes6": []},
18
"if2": {"prefixes4": [], "prefixes6": []},
19
}
20
21
@pytest.mark.parametrize("family", ["inet", "inet6"])
22
@pytest.mark.require_user("root")
23
def test_change_prefix_route(self, family):
24
"""Tests that prefix route changes to the new one upon addr deletion"""
25
vnet = self.vnet_map["vnet1"]
26
first_iface = vnet.iface_alias_map["if1"]
27
second_iface = vnet.iface_alias_map["if2"]
28
if family == "inet":
29
first_addr = ipaddress.ip_interface("192.0.2.1/24")
30
second_addr = ipaddress.ip_interface("192.0.2.2/24")
31
else:
32
first_addr = ipaddress.ip_interface("2001:db8::1/64")
33
second_addr = ipaddress.ip_interface("2001:db8::2/64")
34
35
first_iface.setup_addr(str(first_addr))
36
second_iface.setup_addr(str(second_addr))
37
38
# At this time prefix should be pointing to the first interface
39
routes = ToolsHelper.get_routes(family)
40
px = [r for r in routes if r["destination"] == str(first_addr.network)][0]
41
assert px["interface-name"] == first_iface.name
42
43
# Now delete address from the first interface and verify switchover
44
first_iface.delete_addr(first_addr.ip)
45
46
routes = ToolsHelper.get_routes(family)
47
px = [r for r in routes if r["destination"] == str(first_addr.network)][0]
48
assert px["interface-name"] == second_iface.name
49
50
@pytest.mark.parametrize(
51
"family",
52
[
53
"inet",
54
pytest.param("inet6", marks=pytest.mark.xfail(reason="currently fails")),
55
],
56
)
57
@pytest.mark.require_user("root")
58
def test_change_prefix_route_same_iface(self, family):
59
"""Tests that prefix route changes to the new ifa upon addr deletion"""
60
vnet = self.vnet_map["vnet1"]
61
first_iface = vnet.iface_alias_map["if1"]
62
63
if family == "inet":
64
first_addr = ipaddress.ip_interface("192.0.2.1/24")
65
second_addr = ipaddress.ip_interface("192.0.2.2/24")
66
else:
67
first_addr = ipaddress.ip_interface("2001:db8::1/64")
68
second_addr = ipaddress.ip_interface("2001:db8::2/64")
69
70
first_iface.setup_addr(str(first_addr))
71
first_iface.setup_addr(str(second_addr))
72
73
# At this time prefix should be pointing to the first interface
74
routes = ToolsHelper.get_routes(family)
75
px = [r for r in routes if r["destination"] == str(first_addr.network)][0]
76
assert px["interface-name"] == first_iface.name
77
78
# Now delete address from the first interface and verify switchover
79
first_iface.delete_addr(str(first_addr.ip))
80
81
routes = ToolsHelper.get_routes(family)
82
px = [r for r in routes if r["destination"] == str(first_addr.network)][0]
83
nhop_kidx = px["nhop"]
84
assert px["interface-name"] == first_iface.name
85
nhops = ToolsHelper.get_nhops(family)
86
nh = [nh for nh in nhops if nh["index"] == nhop_kidx][0]
87
assert nh["ifa"] == str(second_addr.ip)
88
89
90
class TestRouteCornerCase1(SingleVnetTestTemplate):
91
@pytest.mark.parametrize("family", ["inet", "inet6"])
92
@pytest.mark.require_user("root")
93
def test_add_direct_route_p2p_wo_ifa(self, family):
94
95
tun_ifname = ToolsHelper.get_output("/sbin/ifconfig tun create").rstrip()
96
tun_ifindex = socket.if_nametoindex(tun_ifname)
97
assert tun_ifindex > 0
98
rtsock = Rtsock()
99
100
if family == "inet":
101
prefix = "172.16.0.0/12"
102
else:
103
prefix = "2a02:6b8::/64"
104
IFT_ETHER = 0x06
105
gw_link = SaHelper.link_sa(ifindex=tun_ifindex, iftype=IFT_ETHER)
106
107
msg = rtsock.new_rtm_add(prefix, gw_link)
108
msg.add_link_attr(RtConst.RTA_IFP, tun_ifindex)
109
rtsock.write_message(msg)
110
111
data = rtsock.read_data(msg.rtm_seq)
112
msg_in = RtsockRtMessage.from_bytes(data)
113
msg_in.print_in_message()
114
115
desired_sa = {
116
RtConst.RTA_DST: msg.get_sa(RtConst.RTA_DST),
117
RtConst.RTA_NETMASK: msg.get_sa(RtConst.RTA_NETMASK),
118
}
119
120
msg_in.verify(msg.rtm_type, desired_sa)
121
122