Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netpfil/pf/frag6.py
39536 views
1
import pytest
2
import logging
3
import random
4
logging.getLogger("scapy").setLevel(logging.CRITICAL)
5
from utils import DelayedSend
6
from atf_python.sys.net.tools import ToolsHelper
7
from atf_python.sys.net.vnet import VnetTestTemplate
8
9
class TestFrag6(VnetTestTemplate):
    """Malformed and oversized IPv6 fragment tests against pf reassembly.

    All test packets go from 2001:db8::1 to 2001:db8::2, and vnet2_handler
    loads the pf rules the tests exercise.
    """
    REQUIRED_MODULES = ["pf", "dummymbuf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable pf with fragment reassembly and install a dummymbuf rule.

        vnet -- object whose iface_alias_map resolves "if1" to the real
                interface name that is substituted into the dummymbuf rule.
        """
        ifname = vnet.iface_alias_map["if1"].name
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble min-ttl 10",
            "pass",
            "block in inet6 proto icmp6 icmp6-type echoreq",
        ])
        ToolsHelper.print_output("/sbin/pfilctl link -i dummymbuf:inet6 inet6")
        ToolsHelper.print_output("/sbin/sysctl net.dummymbuf.rules=\"inet6 in %s enlarge 3000;\"" % ifname)

    def check_ping_reply(self, packet):
        """Print the packet and return False.

        Not referenced by any test in this class.
        """
        print(packet)
        return False

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_dup_frag_hdr(self):
        """Test packets with duplicate fragment headers

        Sends an echo request that carries two fragment headers and fails if
        any echo reply is captured on if1 during the 3 second sniff.
        """
        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128))

        # Delay the send so the sniffer is running when we transmit.
        s = DelayedSend(packet)

        packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name,
            timeout=3)
        for p in packets:
            assert not p.getlayer(sp.ICMPv6EchoReply)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_overlong(self):
        """Test overly long fragmented packet

        Sends a train of fragments whose data adds up to 65530 bytes, with a
        hop-by-hop header on the first fragment only.  Nothing is asserted;
        the packets are only transmitted.
        """
        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        pkts = []
        frag_id = random.randint(0, 0xffffffff)
        gran = 1200  # data bytes per fragment; a multiple of 8 so offsets are exact

        # One fragment per 'gran' bytes for every start offset up to 65535.
        for i, curr in enumerate(range(0, 65536, gran)):
            more = curr + gran <= 65535
            # The last fragment is cut short so the data ends at byte 65530.
            # NOTE(review): presumably the hop-by-hop header on the first
            # fragment is what pushes the reassembled packet over the limit.
            g = gran if more else 65530 - curr

            pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            if i == 0:
                pkt = pkt / sp.IPv6ExtHdrHopByHop(options=[sp.PadN(optlen=2), sp.Pad1()])
            # Each fragment is filled with its own index as the byte value.
            pkt = pkt / sp.IPv6ExtHdrFragment(id=frag_id, offset=curr // 8, m=more) \
                / bytes([i] * g)
            pkts.append(pkt)

        sp.send(pkts, inter=0.1)
86
87
class TestFrag6HopHyHop(VnetTestTemplate):
    """Hop-by-hop extension header placement tests with pf and forwarding.

    Echo requests from 2001:db8::1 to 2001:db8:1::1 are sent and the test
    checks which of them show up on if2.
    """
    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1", "if2"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
        "if2": {"prefixes6": [("2001:db8:666::1/64", "2001:db8:1::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Enable IPv6 forwarding, add a static NDP entry and load pf rules.

        vnet -- unused here; part of the handler signature.
        """
        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
        ToolsHelper.print_output("/usr/sbin/ndp -s 2001:db8:1::1 00:01:02:03:04:05")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")
        ToolsHelper.pf_rules([
            "scrub fragment reassemble min-ttl 10",
            "pass allow-opts",
        ])

    def _assert_echo_forwarded(self, sp, captured, expect_hbh):
        """Assert that the test echo request is among the captured packets.

        sp         -- the scapy.all module imported by the calling test.
        captured   -- iterable of sniffed packets.
        expect_hbh -- True if every matching echo request must carry a
                      hop-by-hop header, False if none may.

        Packets lacking an IPv6 or ICMPv6 echo request layer are skipped.
        Fails if no echo request is found at all.
        """
        found = False
        for p in captured:
            p.show()
            ip6 = p.getlayer(sp.IPv6)
            hbh = p.getlayer(sp.IPv6ExtHdrHopByHop)
            icmp6 = p.getlayer(sp.ICMPv6EchoRequest)

            if not ip6 or not icmp6:
                continue
            assert ip6.src == "2001:db8::1"
            assert ip6.dst == "2001:db8:1::1"
            if expect_hbh:
                assert hbh
            else:
                assert not hbh
            found = True
        assert found

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_hop_by_hop(self):
        """Verify that we reject non-first hop-by-hop headers"""
        if2 = self.vnet.iface_alias_map["if2"].name
        ToolsHelper.print_output("/sbin/route add -6 default 2001:db8::2")
        ToolsHelper.print_output("/sbin/ping6 -c 1 2001:db8:1::2")

        # Import in the correct vnet, so as to not confuse Scapy
        import scapy.all as sp

        # A hop-by-hop header is accepted if it's the first header
        pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
            / sp.IPv6ExtHdrHopByHop() \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))
        pkt.show()

        # Delay the send so the sniffer is running when we transmit.
        s = DelayedSend(pkt)

        replies = sp.sniff(iface=if2, timeout=3)
        self._assert_echo_forwarded(sp, replies, expect_hbh=True)

        # A hop-by-hop header causes the packet to be dropped if it's not the
        # first extension header
        pkt = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
            / sp.IPv6ExtHdrFragment(offset=0, m=0) \
            / sp.IPv6ExtHdrHopByHop() \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))
        pkt2 = sp.IPv6(src="2001:db8::1", dst="2001:db8:1::1") \
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 30))

        # NOTE(review): presumably re-primes neighbour state before the
        # second capture -- confirm.
        ToolsHelper.print_output("/sbin/ping6 -c 1 2001:db8:1::2")

        # Delay the send so the sniffer is running when we transmit.
        s = DelayedSend([ pkt2, pkt ])
        replies = sp.sniff(iface=if2, timeout=10)
        # Expect to find the packet without the hop-by-hop header, not the
        # one with
        self._assert_echo_forwarded(sp, replies, expect_hbh=False)
176
177
class TestFrag6_Overlap(VnetTestTemplate):
    """Overlapping IPv6 fragment test against pf's fragment reassembly."""
    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Turn pf on with loud debugging and a reassemble-and-pass ruleset."""
        for command in ("/sbin/pfctl -e", "/sbin/pfctl -x loud"):
            ToolsHelper.print_output(command)
        ruleset = [
            "scrub fragment reassemble",
            "pass",
        ]
        ToolsHelper.pf_rules(ruleset)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_overlap(self):
        """Ensure we discard packets with overlapping fragments"""
        # Scapy is imported only here, in the correct vnet, so as not to
        # confuse it.
        import scapy.all as sp

        echo = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 90))
        )
        pieces = sp.fragment6(echo, 128)
        assert len(pieces) == 3
        first, second, last = pieces

        # Extra fragment reusing the id and next-header of the real first
        # fragment, so it overlaps the genuine fragments.
        template = first.getlayer(sp.IPv6ExtHdrFragment)
        overlap = (
            sp.IPv6(src="2001:db8::1", dst="2001:db8::2")
            / sp.IPv6ExtHdrFragment(offset=4, m=1, id=template.id, nh=template.nh)
            / sp.raw(bytes.fromhex('f00f') * 4)
        )
        train = [first, second, overlap, last]

        # The transmit is delayed so that the sniffer below is already up.
        s = DelayedSend(train)

        iface = self.vnet.iface_alias_map["if1"].name
        sniffed = sp.sniff(iface=iface, timeout=3)
        for captured in sniffed:
            captured.show()
            # Any echo reply means the overlapping train was accepted.
            assert not captured.getlayer(sp.ICMPv6EchoReply)
221
222
class TestFrag6_RouteTo(VnetTestTemplate):
    """Packet-too-big test for fragments matched by a pf route-to rule."""
    REQUIRED_MODULES = ["pf"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2"]},
        "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]},
        "if2": {"prefixes6": [("2001:db8:1::1/64", "2001:db8:1::2/64")]},
    }

    def vnet2_handler(self, vnet):
        """Load the route-to ruleset, shrink if2's MTU and enable forwarding.

        vnet -- object whose iface_alias_map resolves "if2" to the real
                interface name used in the rule and the ifconfig call.
        """
        if2name = vnet.iface_alias_map["if2"].name

        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.print_output("/sbin/pfctl -x loud")

        route_rule = "pass in route-to (%s 2001:db8:1::2) from 2001:db8::1 to 2001:db8:666::1" % if2name
        ToolsHelper.pf_rules([
            "scrub fragment reassemble",
            route_rule,
        ])

        # if2 gets an MTU of 1300, smaller than the 1320 used by the test
        # when it fragments its packet.
        ToolsHelper.print_output("/sbin/ifconfig %s mtu 1300" % if2name)
        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")

    def vnet3_handler(self, vnet):
        """Do nothing."""
        pass

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_too_big(self):
        """Expect a packet-too-big error (MTU 1300) for an oversized echo

        A 3000 byte echo request to 2001:db8:666::1 is sent as fragments and
        the single reply must be an ICMPv6 packet-too-big from 2001:db8::2
        quoting the original source and destination.
        """
        ToolsHelper.print_output("/sbin/route add -6 default 2001:db8::2")

        # Scapy is imported only here, in the correct vnet, so as not to
        # confuse it.
        import scapy.all as sp

        request = (
            sp.IPv6(dst="2001:db8:666::1")
            / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f0') * 3000))
        )
        fragments = sp.fragment6(request, 1320)

        reply = sp.sr1(fragments, timeout=3)
        if reply:
            reply.show()

        assert reply

        # Outer header of the reply.
        ip6 = reply.getlayer(sp.IPv6)
        assert ip6
        assert ip6.src == "2001:db8::2"
        assert ip6.dst == "2001:db8::1"

        # The error itself, which has to report the MTU set on if2.
        icmp6 = reply.getlayer(sp.ICMPv6PacketTooBig)
        assert icmp6
        assert icmp6.mtu == 1300

        # The IPv6 header quoted inside the error.
        err_ip6 = reply.getlayer(sp.IPerror6)
        assert err_ip6
        assert err_ip6.src == "2001:db8::1"
        assert err_ip6.dst == "2001:db8:666::1"
277
278