Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netpfil/pf/nat64.py
39536 views
1
#
2
# SPDX-License-Identifier: BSD-2-Clause
3
#
4
# Copyright (c) 2024 Rubicon Communications, LLC (Netgate)
5
#
6
# Redistribution and use in source and binary forms, with or without
7
# modification, are permitted provided that the following conditions
8
# are met:
9
# 1. Redistributions of source code must retain the above copyright
10
# notice, this list of conditions and the following disclaimer.
11
# 2. Redistributions in binary form must reproduce the above copyright
12
# notice, this list of conditions and the following disclaimer in the
13
# documentation and/or other materials provided with the distribution.
14
#
15
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25
# SUCH DAMAGE.
26
27
import pytest
28
import selectors
29
import socket
30
import sys
31
from utils import DelayedSend
32
from atf_python.sys.net.tools import ToolsHelper
33
from atf_python.sys.net.vnet import VnetTestTemplate
34
35
class TestNAT64(VnetTestTemplate):
    """Regression tests for pf's NAT64 (``af-to``) translation.

    Four vnets are chained by three links (see TOPOLOGY):

        vnet1 --if1-- vnet2 --if2-- vnet3 --if3-- vnet4

    * if1 is IPv6 only (2001:db8::/64); the tests add an IPv6 default
      route via 2001:db8::1.
    * vnet2 runs pf with an ``af-to inet from 192.0.2.1`` rule on if1 and
      routes IPv4 via 192.0.2.2.
    * vnet3 forwards IPv4 and runs small TCP (port 1234) and UDP
      (port 4444) responders.
    * vnet4 only installs a default route via 198.51.100.1.

    scapy is imported inside each test rather than at module level; every
    test carries a ``require_progs(["scapy"])`` marker.
    """

    REQUIRED_MODULES = ["pf", "pflog"]
    TOPOLOGY = {
        "vnet1": {"ifaces": ["if1"]},
        "vnet2": {"ifaces": ["if1", "if2"]},
        "vnet3": {"ifaces": ["if2", "if3"]},
        "vnet4": {"ifaces": ["if3"]},
        "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]},
        "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]},
        "if3": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]},
    }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _add_default_route6(self):
        """Install the IPv6 default route (via 2001:db8::1) used by every test."""
        ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1")

    def _send_and_dump(self, pkt, ifname):
        """Dump *pkt*, send it on *ifname*, and show whatever is sniffed for 5s.

        Makes no assertions; callers use it for packets whose only failure
        mode is a problem on the receiving host.
        """
        import scapy.all as sp

        pkt.show()
        sp.hexdump(pkt)

        # DelayedSend comes from utils; presumably it transmits after a short
        # delay so the sniffer below is already listening -- verify in utils.
        sender = DelayedSend(pkt, sendif=ifname)  # noqa: F841

        for seen in sp.sniff(iface=ifname, timeout=5):
            seen.show()

    # ------------------------------------------------------------------
    # Per-vnet setup handlers
    # ------------------------------------------------------------------

    def vnet4_handler(self, vnet):
        """Give vnet4 an IPv4 default route via 198.51.100.1."""
        ToolsHelper.print_output("/sbin/route add default 198.51.100.1")

    def vnet3_handler(self, vnet):
        """Configure vnet3 and serve TCP port 1234 / UDP port 4444 forever.

        Enables IPv4 forwarding, sets the default TTL to 62 and disables UDP
        checksums, then answers every TCP or UDP payload with ``b"foo"``.
        The loop never returns on its own; the sockets and selector are
        closed if it is left through an exception.
        """
        # ``types`` is not among this file's top-level imports, so bring it
        # into scope here; without it accepting a connection raises NameError.
        import types

        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1")
        ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62")
        ToolsHelper.print_output("/sbin/sysctl net.inet.udp.checksum=0")

        with selectors.DefaultSelector() as sel, \
                socket.socket(socket.AF_INET, socket.SOCK_STREAM) as t, \
                socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as u:
            t.bind(("0.0.0.0", 1234))
            t.setblocking(False)
            t.listen()
            # data=None marks the listening socket.
            sel.register(t, selectors.EVENT_READ, data=None)

            u.bind(("0.0.0.0", 4444))
            u.setblocking(False)
            sel.register(u, selectors.EVENT_READ, data="UDP")

            while True:
                for key, mask in sel.select(timeout=20):
                    sock = key.fileobj

                    if key.data is None:
                        conn, addr = sock.accept()
                        print(f"Accepted connection from {addr}")
                        data = types.SimpleNamespace(addr=addr, inb=b"", outb=b"")
                        # Only reads are acted upon. Also asking for write
                        # readiness would wake the loop continuously, since a
                        # connected socket is almost always writable.
                        sel.register(conn, selectors.EVENT_READ, data=data)
                        continue

                    if key.data == "UDP":
                        recv_data, addr = sock.recvfrom(1024)
                        print(f"Received UDP {recv_data} from {addr}")
                        sock.sendto(b"foo", addr)
                        continue

                    # Anything else is an accepted TCP connection.
                    if not mask & selectors.EVENT_READ:
                        print("Unknown event?")
                        continue

                    try:
                        recv_data = sock.recv(1024)
                        print(f"Received TCP {recv_data}")
                        if recv_data:
                            sock.send(b"foo")
                    except ConnectionError as exc:
                        print(f"TCP connection error: {exc}")
                        recv_data = b""

                    if not recv_data:
                        # Peer closed (or reset) the connection: stop polling it.
                        sel.unregister(sock)
                        sock.close()

    def vnet2_handler(self, vnet):
        """Turn vnet2 into the NAT64 gateway.

        Enables IPv6 forwarding, adds an IPv4 default route via 192.0.2.2,
        enables pf and loads a ruleset that blocks by default, passes
        neighbour discovery, and translates IPv6 arriving on if1 to IPv4
        from 192.0.2.1. Finally sends the interface index of ``pflog0`` to
        the test process over the vnet pipe.
        """
        ifname = vnet.iface_alias_map["if1"].name

        ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1")
        ToolsHelper.print_output("/sbin/route add default 192.0.2.2")
        ToolsHelper.print_output("/sbin/pfctl -e")
        ToolsHelper.pf_rules([
            "block",
            "pass inet6 proto icmp6 icmp6-type { neighbrsol, neighbradv }",
            f"pass in on {ifname} inet6 af-to inet from 192.0.2.1",
        ])

        vnet.pipe.send(socket.if_nametoindex("pflog0"))

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_tcp_rst(self):
        """A SYN to a closed port yields RST; a SYN to port 1234 yields SYN|ACK."""
        self._add_default_route6()

        import scapy.all as sp

        # Send a SYN to a port nobody listens on
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") / sp.TCP(dport=1222, flags="S")

        # Bounded wait, like the other tests, so a lost reply fails the test
        # instead of blocking it.
        reply = sp.sr1(packet, timeout=3)
        assert reply is not None, "no reply to SYN on closed port"

        # We expect to get a RST here.
        tcp = reply.getlayer(sp.TCP)
        assert tcp
        assert "R" in tcp.flags

        # Now try to SYN to an open port
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") / sp.TCP(dport=1234, flags="S")
        reply = sp.sr1(packet, timeout=3)
        assert reply is not None, "no reply to SYN on open port"

        tcp = reply.getlayer(sp.TCP)
        assert tcp

        # We don't get RST
        assert "R" not in tcp.flags

        # We do get SYN|ACK
        assert "S" in tcp.flags
        assert "A" in tcp.flags

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_port_closed(self):
        """UDP to a closed port comes back as ICMPv6 destination unreachable, code 4."""
        self._add_default_route6()

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        assert reply is not None, "no reply to UDP on closed port"
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 4
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_address_unreachable(self):
        """UDP to 203.0.113.2 comes back as ICMPv6 destination unreachable, code 0."""
        self._add_default_route6()

        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::203.0.113.2") \
            / sp.UDP(dport=1222) / sp.Raw("bar")
        reply = sp.sr1(packet, timeout=3)
        assert reply is not None, "no reply for unreachable destination"
        reply.show()

        # We expect an ICMPv6 error, not a UDP reply
        assert not reply.getlayer(sp.UDP)
        icmp = reply.getlayer(sp.ICMPv6DestUnreach)
        assert icmp
        assert icmp.type == 1
        assert icmp.code == 0
        udp = reply.getlayer(sp.UDPerror)
        assert udp
        assert udp.dport == 1222

        # Check the hop limit. vnet3 sets net.inet.ip.ttl=62; presumably the
        # error loses one hop on the way back through vnet2.
        ip6 = reply.getlayer(sp.IPv6)
        assert ip6.hlim == 61

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_udp_checksum(self):
        """A translated UDP reply must carry a non-zero checksum."""
        self._add_default_route6()

        import scapy.all as sp

        # Send an outbound UDP packet to establish state
        packet = sp.IPv6(dst="64:ff9b::192.0.2.2") \
            / sp.UDP(sport=3333, dport=4444) / sp.Raw("foo")

        # Get a reply
        # We'll send the reply without UDP checksum on the IPv4 side
        # but that's not valid for IPv6, so expect pf to update the checksum.
        reply = sp.sr1(packet, timeout=5)
        assert reply is not None, "no UDP reply from port 4444"

        udp = reply.getlayer(sp.UDP)
        assert udp
        assert udp.chksum != 0

    def common_test_source_addr(self, packet):
        """Send *packet* and return the ICMPv6 time-exceeded error it triggers.

        Sniffs vnet1's if1 for 5 seconds, asserts that the first
        time-exceeded packet seen has source address 64:ff9b::c000:202
        (192.0.2.2 embedded in the 64:ff9b:: prefix), and returns it. Fails
        the test if no such packet is captured.
        """
        vnet = self.vnet_map["vnet1"]
        sendif = vnet.iface_alias_map["if1"].name

        import scapy.all as sp

        print("Outbound:\n")
        packet.show()

        # DelayedSend comes from utils; presumably it transmits after a short
        # delay so the sniffer below is already listening -- verify in utils.
        sender = DelayedSend(packet)  # noqa: F841

        # We expect an ICMPv6 error here, where we'll verify the source address of
        # the outer packet
        for reply in sp.sniff(iface=sendif, timeout=5):
            print("Reply:\n")
            reply.show()
            if not reply.getlayer(sp.ICMPv6TimeExceeded):
                continue

            ip = reply.getlayer(sp.IPv6)
            assert ip.src == "64:ff9b::c000:202"
            return reply

        # We did not find the packet we expected to see. pytest.fail is used
        # rather than ``assert False`` so the check survives ``python -O``.
        pytest.fail("no ICMPv6 time-exceeded reply captured")

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_source_addr_tcp(self):
        """Source address of the time-exceeded error for a TCP SYN with hlim=2."""
        self._add_default_route6()
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \
            / sp.TCP(sport=1111, dport=2222, flags="S")
        self.common_test_source_addr(packet)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_source_addr_udp(self):
        """Source address of the time-exceeded error for a UDP packet with hlim=2."""
        self._add_default_route6()
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \
            / sp.UDP(sport=1111, dport=2222) / sp.Raw("foo")
        self.common_test_source_addr(packet)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_source_addr_sctp(self):
        """Source address of the time-exceeded error for an SCTP INIT with hlim=2."""
        self._add_default_route6()
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \
            / sp.SCTP(sport=1111, dport=2222) \
            / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1,
                               a_rwnd=1500)
        self.common_test_source_addr(packet)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_source_addr_icmp(self):
        """As the other source-address tests, for an ICMPv6 echo request.

        Additionally checks that the returned error contains an
        ICMPv6EchoRequest layer.
        """
        self._add_default_route6()
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2) \
            / sp.ICMPv6EchoRequest() / sp.Raw("foo")
        reply = self.common_test_source_addr(packet)
        icmp = reply.getlayer(sp.ICMPv6EchoRequest)
        assert icmp

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_bad_len(self):
        """
        PR 288224: we can panic if the IPv6 plen is longer than the packet length.
        """
        self._add_default_route6()
        import scapy.all as sp

        packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=2, plen=512) \
            / sp.ICMPv6EchoRequest() / sp.Raw("foo")
        reply = sp.sr1(packet, timeout=3)
        # We don't expect a reply to a corrupted packet
        assert not reply

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_noip6(self):
        """
        PR 288263: link-local target address in icmp6 ADVERT can cause NULL deref

        Sends two neighbour advertisements whose target address embeds a
        scope id: first the pflog0 interface index reported by vnet2, then
        255. Nothing is asserted about the captured traffic.
        """
        ifname = self.vnet.iface_alias_map["if1"].name
        gw_mac = self.vnet.iface_alias_map["if1"].epairb.ether
        scopeid = self.wait_object(self.vnet_map["vnet2"].pipe)
        self._add_default_route6()

        import scapy.all as sp

        pkt = sp.Ether(dst=gw_mac) \
            / sp.IPv6(dst="64:ff9b::203.0.113.2") \
            / sp.ICMPv6ND_NA(tgt=f"FFA2:{scopeid:x}:2821:125F:1D27:B3B2:3F6F:C43C")
        self._send_and_dump(pkt, ifname)

        # Try scope id that likely doesn't have an interface at all
        pkt = sp.Ether(dst=gw_mac) \
            / sp.IPv6(dst="64:ff9b::203.0.113.2") \
            / sp.ICMPv6ND_NA(tgt=f"FFA2:{255:x}:2821:125F:1D27:B3B2:3F6F:C43C")
        self._send_and_dump(pkt, ifname)

    @pytest.mark.require_user("root")
    @pytest.mark.require_progs(["scapy"])
    def test_ttl_zero(self):
        """
        PR 288274: we can use an mbuf after free on TTL = 0

        Sends an SCTP INIT with hop limit 0. Nothing is asserted about the
        captured traffic.
        """
        ifname = self.vnet.iface_alias_map["if1"].name
        gw_mac = self.vnet.iface_alias_map["if1"].epairb.ether
        self._add_default_route6()

        import scapy.all as sp

        pkt = sp.Ether(dst=gw_mac) \
            / sp.IPv6(dst="64:ff9b::192.0.2.2", hlim=0) \
            / sp.SCTP(sport=1111, dport=2222) \
            / sp.SCTPChunkInit(
                init_tag=1,
                n_in_streams=1,
                n_out_streams=1,
                a_rwnd=1500,
                params=[sp.SCTPChunkParamIPv4Addr()],
            )
        self._send_and_dump(pkt, ifname)
358
359
360