Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netpfil/pf/header.py
39535 views
1
#
2
# SPDX-License-Identifier: BSD-2-Clause
3
#
4
# Copyright (c) 2025 Rubicon Communications, LLC (Netgate)
5
#
6
# Redistribution and use in source and binary forms, with or without
7
# modification, are permitted provided that the following conditions
8
# are met:
9
# 1. Redistributions of source code must retain the above copyright
10
# notice, this list of conditions and the following disclaimer.
11
# 2. Redistributions in binary form must reproduce the above copyright
12
# notice, this list of conditions and the following disclaimer in the
13
# documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
27
import pytest
28
import re
29
from utils import DelayedSend
30
from atf_python.sys.net.tools import ToolsHelper
31
from atf_python.sys.net.vnet import VnetTestTemplate
32
33
class TestHeader(VnetTestTemplate):
34
REQUIRED_MODULES = [ "pf" ]
35
TOPOLOGY = {
36
"vnet1": {"ifaces": ["if1", "if2"]},
37
"vnet2": {"ifaces": ["if1", "if2"]},
38
"if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]},
39
"if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]},
40
}
41
42
def vnet2_handler(self, vnet):
43
ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
44
ToolsHelper.print_output("/usr/sbin/arp -s 198.51.100.3 00:01:02:03:04:05")
45
ToolsHelper.print_output("/sbin/pfctl -e")
46
ToolsHelper.print_output("/sbin/pfctl -x loud")
47
ToolsHelper.pf_rules([
48
"pass",
49
])
50
51
@pytest.mark.require_user("root")
52
@pytest.mark.require_progs(["scapy"])
53
def test_too_many(self):
54
"Verify that we drop packets with silly numbers of headers."
55
56
sendif = self.vnet.iface_alias_map["if1"]
57
recvif = self.vnet.iface_alias_map["if2"].name
58
gw_mac = sendif.epairb.ether
59
60
ToolsHelper.print_output("/sbin/route add default 192.0.2.1")
61
62
# Import in the correct vnet, so at to not confuse Scapy
63
import scapy.all as sp
64
65
# Sanity check, ensure we get replies to normal ping
66
pkt = sp.Ether(dst=gw_mac) \
67
/ sp.IP(dst="198.51.100.3") \
68
/ sp.ICMP(type='echo-request')
69
s = DelayedSend(pkt, sendif.name)
70
reply = sp.sniff(iface=recvif, timeout=3)
71
print(reply)
72
73
found = False
74
for r in reply:
75
r.show()
76
77
icmp = r.getlayer(sp.ICMP)
78
if not icmp:
79
continue
80
assert icmp.type == 8 # 'echo-request'
81
found = True
82
assert found
83
84
# Up to 19 AH headers will pass
85
pkt = sp.Ether(dst=gw_mac) \
86
/ sp.IP(dst="198.51.100.3")
87
for i in range(0, 18):
88
pkt = pkt / sp.AH(nh=51, payloadlen=1)
89
pkt = pkt / sp.AH(nh=1, payloadlen=1) / sp.ICMP(type='echo-request')
90
91
s = DelayedSend(pkt, sendif.name)
92
reply = sp.sniff(iface=recvif, timeout=3)
93
print(reply)
94
found = False
95
for r in reply:
96
r.show()
97
98
ah = r.getlayer(sp.AH)
99
if not ah:
100
continue
101
found = True
102
assert found
103
104
# But more will get dropped
105
pkt = sp.Ether(dst=gw_mac) \
106
/ sp.IP(dst="198.51.100.3")
107
for i in range(0, 19):
108
pkt = pkt / sp.AH(nh=51, payloadlen=1)
109
pkt = pkt / sp.AH(nh=1, payloadlen=1) / sp.ICMP(type='echo-request')
110
111
s = DelayedSend(pkt, sendif.name)
112
reply = sp.sniff(iface=recvif, timeout=3)
113
print(reply)
114
115
found = False
116
for r in reply:
117
r.show()
118
119
ah = r.getlayer(sp.AH)
120
if not ah:
121
continue
122
found = True
123
assert not found
124
125
class TestHeader6(VnetTestTemplate):
126
REQUIRED_MODULES = [ "pf" ]
127
SKIP_MODULES = [ "ipfilter" ]
128
TOPOLOGY = {
129
"vnet1": {"ifaces": ["if1", "if2"]},
130
"vnet2": {"ifaces": ["if1", "if2"]},
131
"if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
132
"if2": {"prefixes6": [("2001:db8:1::2/64", "2001:db8:1::1/64")]},
133
}
134
135
def vnet2_handler(self, vnet):
136
ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
137
ToolsHelper.print_output("/usr/sbin/ndp -s 2001:db8:1::3 00:01:02:03:04:05")
138
ToolsHelper.print_output("/sbin/pfctl -e")
139
ToolsHelper.print_output("/sbin/pfctl -x loud")
140
ToolsHelper.pf_rules([
141
"pass",
142
])
143
144
@pytest.mark.require_user("root")
145
@pytest.mark.require_progs(["scapy"])
146
def test_too_many(self):
147
"Verify that we drop packets with silly numbers of headers."
148
ToolsHelper.print_output("/sbin/ifconfig")
149
150
sendif = self.vnet.iface_alias_map["if1"]
151
recvif = self.vnet.iface_alias_map["if2"].name
152
our_mac = sendif.ether
153
gw_mac = sendif.epairb.ether
154
155
ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")
156
157
# Import in the correct vnet, so at to not confuse Scapy
158
import scapy.all as sp
159
160
# Sanity check, ensure we get replies to normal ping
161
pkt = sp.Ether(src=our_mac, dst=gw_mac) \
162
/ sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3") \
163
/ sp.ICMPv6EchoRequest()
164
s = DelayedSend(pkt, sendif.name)
165
reply = sp.sniff(iface=recvif, timeout=3)
166
print(reply)
167
168
found = False
169
for r in reply:
170
r.show()
171
172
icmp = r.getlayer(sp.ICMPv6EchoRequest)
173
if not icmp:
174
continue
175
found = True
176
assert found
177
178
# Up to 19 AH headers will pass
179
pkt = sp.Ether(src=our_mac, dst=gw_mac) \
180
/ sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3")
181
for i in range(0, 18):
182
pkt = pkt / sp.AH(nh=51, payloadlen=1)
183
pkt = pkt / sp.AH(nh=58, payloadlen=1) / sp.ICMPv6EchoRequest()
184
s = DelayedSend(pkt, sendif.name)
185
reply = sp.sniff(iface=recvif, timeout=3)
186
print(reply)
187
188
found = False
189
for r in reply:
190
r.show()
191
192
ah = r.getlayer(sp.AH)
193
if not ah:
194
continue
195
found = True
196
assert found
197
198
# But more will get dropped
199
pkt = sp.Ether(src=our_mac, dst=gw_mac) \
200
/ sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3")
201
for i in range(0, 19):
202
pkt = pkt / sp.AH(nh=51, payloadlen=1)
203
pkt = pkt / sp.AH(nh=58, payloadlen=1) / sp.ICMPv6EchoRequest()
204
s = DelayedSend(pkt, sendif.name)
205
reply = sp.sniff(iface=recvif, timeout=3)
206
print(reply)
207
208
found = False
209
for r in reply:
210
r.show()
211
212
ah = r.getlayer(sp.AH)
213
if not ah:
214
continue
215
found = True
216
assert not found
217
218