# Scraped from a web code viewer; navigation residue removed.
# GitHub Repository: freebsd/freebsd-src
# Path: blob/main/tests/sys/netpfil/common/pft_ping.py
1
#!/usr/bin/env python3
2
#
3
# SPDX-License-Identifier: BSD-2-Clause
4
#
5
# Copyright (c) 2017 Kristof Provost <[email protected]>
6
# Copyright (c) 2023 Kajetan Staszkiewicz <[email protected]>
7
#
8
# Redistribution and use in source and binary forms, with or without
9
# modification, are permitted provided that the following conditions
10
# are met:
11
# 1. Redistributions of source code must retain the above copyright
12
# notice, this list of conditions and the following disclaimer.
13
# 2. Redistributions in binary form must reproduce the above copyright
14
# notice, this list of conditions and the following disclaimer in the
15
# documentation and/or other materials provided with the distribution.
16
#
17
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
18
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
21
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
23
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
24
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
26
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
27
# SUCH DAMAGE.
28
#
29
30
import argparse
31
import logging
32
logging.getLogger("scapy").setLevel(logging.CRITICAL)
33
import math
34
import scapy.all as sp
35
import sys
36
import socket
37
38
from copy import copy
39
from sniffer import Sniffer
40
41
# Log records are printed as the bare message text, with no level or
# logger-name prefix.
logging.basicConfig(format='%(message)s')
42
# Module-wide logger; main() switches it to DEBUG when --verbose is given.
LOGGER = logging.getLogger(__name__)
43
44
# Four-byte pattern that every test payload is built from.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')


def build_payload(l):
    """Return a payload of ``l`` bytes made of repeated PAYLOAD_MAGIC.

    The magic pattern is repeated as many whole times as fit into ``l``
    bytes and then completed with a truncated copy, so a sender and a
    checker can rebuild the same payload from its length alone.

    ``l`` must be an integer; for ``l >= 0`` the result is exactly ``l``
    bytes long.
    """
    # divmod() stays in integer arithmetic; the former math.floor(l / pl)
    # went through a float division, which is inexact for very large l.
    whole, rest = divmod(l, len(PAYLOAD_MAGIC))
    return PAYLOAD_MAGIC * whole + PAYLOAD_MAGIC[:rest]
51
52
53
def clean_params(params):
    """Return a shallow copy of ``params`` without address and flags keys.

    The 'src_address', 'dst_address' and 'flags' entries are removed from
    the copy; ``params`` itself is not modified. Callers in this file then
    store their own addresses in the returned dictionary.

    Keys that are already missing are ignored rather than raising KeyError.
    """
    ret = copy(params)
    for key in ('src_address', 'dst_address', 'flags'):
        ret.pop(key, None)
    return ret
60
61
def prepare_ipv6(send_params):
    """Build the scapy IPv6 header described by ``send_params``.

    'dst_address' is always applied. 'src_address', 'hlim' and 'tc' are
    written to the header only when they hold a truthy value; otherwise
    the field is left as scapy created it.
    """
    header = sp.IPv6(dst=send_params.get('dst_address'))
    overrides = (
        ('src', send_params.get('src_address')),
        ('hlim', send_params.get('hlim')),
        ('tc', send_params.get('tc')),
    )
    for field, value in overrides:
        if value:
            setattr(header, field, value)
    return header
74
75
76
def prepare_ipv4(send_params):
    """Build the scapy IPv4 header described by ``send_params``.

    'tc' is written to the ToS field and 'hlim' to the TTL field. Source
    address, flags, ToS and TTL are set only when their value is truthy.
    When 'nop' is truthy the header is created with options='\\x00',
    otherwise with an empty options string.
    """
    # NOTE(review): RFC 791 lists option type 0 as End of Option List and
    # type 1 as No Operation -- confirm that '\x00' is the intended byte.
    options = '\x00' if send_params.get('nop') else ''
    header = sp.IP(dst=send_params.get('dst_address'), options=options)
    overrides = (
        ('src', send_params.get('src_address')),
        ('flags', send_params.get('flags')),
        ('tos', send_params.get('tc')),
        ('ttl', send_params.get('hlim')),
    )
    for field, value in overrides:
        if value:
            setattr(header, field, value)
    return header
96
97
98
def send_icmp_ping(send_params):
    """Send an ICMP or ICMPv6 Echo Request built from ``send_params``.

    A ':' in 'dst_address' selects IPv6, anything else IPv4. The payload
    is build_payload(send_params['length']). When 'frag_length' is truthy
    the datagram is passed through scapy's fragmentation helper first.
    Every resulting frame is sent on the 'sendif' interface.
    """
    payload_len = send_params['length']
    frag_len = send_params['frag_length']
    ether = sp.Ether()

    if ':' in send_params['dst_address']:
        ip6 = prepare_ipv6(send_params)
        # The ICMPv6 echo layer carries the payload in its own data field.
        echo = sp.ICMPv6EchoRequest(data=sp.raw(build_payload(payload_len)))
        if frag_len:
            frames = [
                ether / fragment
                for fragment in sp.fragment6(ip6 / echo, fragSize=frag_len)
            ]
        else:
            frames = [ether / ip6 / echo]
    else:
        ip = prepare_ipv4(send_params)
        echo = sp.ICMP(type='echo-request')
        payload = sp.raw(build_payload(payload_len))
        if frag_len:
            frames = [
                ether / fragment
                for fragment in sp.fragment(ip / echo / payload, fragsize=frag_len)
            ]
        else:
            frames = [ether / ip / echo / payload]

    for frame in frames:
        sp.sendp(frame, iface=send_params['sendif'], verbose=False)
123
124
125
def send_tcp_syn(send_params):
    """Send a single TCP SYN segment built from ``send_params``.

    The segment carries a Timestamp option and an MSS option ('mss', or
    1280 when that is not set). If 'tcpopt_unaligned' is truthy a NOP
    option is placed in front of them. A ':' in 'dst_address' selects
    IPv6, otherwise IPv4 is used. The frame is sent on 'sendif'.
    """
    options = [('Timestamp', (1, 1)), ('MSS', send_params.get('mss') or 1280)]
    if send_params.get('tcpopt_unaligned'):
        options.insert(0, ('NOP', 0))

    ether = sp.Ether()
    if ':' in send_params['dst_address']:
        ip = prepare_ipv6(send_params)
    else:
        ip = prepare_ipv4(send_params)

    syn = sp.TCP(
        sport=send_params.get('sport'),
        dport=send_params.get('dport'),
        flags='S',
        options=options,
        seq=send_params.get('seq'),
    )
    sp.sendp(ether / ip / syn, iface=send_params['sendif'], verbose=False)
143
144
145
def send_udp(send_params):
    """Send a UDP datagram built from ``send_params``.

    A ':' in 'dst_address' selects IPv6, anything else IPv4. The payload
    is build_payload(send_params['length']). When 'frag_length' is truthy
    the datagram is fragmented with the helper matching the address
    family. Every resulting frame is sent on the 'sendif' interface.
    """
    LOGGER.debug('Sending UDP ping')
    payload_len = send_params['length']
    frag_len = send_params['frag_length']
    ether = sp.Ether()
    use_ipv6 = ':' in send_params['dst_address']

    ip = prepare_ipv6(send_params) if use_ipv6 else prepare_ipv4(send_params)
    udp = sp.UDP(
        sport=send_params.get('sport'),
        dport=send_params.get('dport'),
    )
    raw = sp.Raw(load=build_payload(payload_len))

    if not frag_len:
        frames = [ether / ip / udp / raw]
    elif use_ipv6:
        frames = [
            ether / fragment
            for fragment in sp.fragment6(ip / udp / raw, fragSize=frag_len)
        ]
    else:
        frames = [
            ether / fragment
            for fragment in sp.fragment(ip / udp / raw, fragsize=frag_len)
        ]

    for frame in frames:
        sp.sendp(frame, iface=send_params['sendif'], verbose=False)
176
177
178
def send_ping(ping_type, send_params):
    """Send the probe selected by ``ping_type``.

    'icmp' sends an echo request, 'tcpsyn' and 'tcp3way' both send a TCP
    SYN, and 'udp' sends a UDP datagram.

    Raises:
        Exception: if ``ping_type`` is none of the supported values.
    """
    if ping_type == 'icmp':
        send_icmp_ping(send_params)
        return
    if ping_type in ('tcpsyn', 'tcp3way'):
        send_tcp_syn(send_params)
        return
    if ping_type == 'udp':
        send_udp(send_params)
        return
    raise Exception('Unsupported ping type')
190
191
192
def check_ipv4(expect_params, packet):
    """Return True if ``packet`` has an IPv4 layer matching ``expect_params``.

    Source and destination address, flags, ToS ('tc') and TTL ('hlim') are
    compared, each only when the expected value is truthy. The first
    mismatch is logged at debug level and ends the check with False.
    """
    ip = packet.getlayer(sp.IP)
    LOGGER.debug(f'Packet: {ip}')
    if not ip:
        LOGGER.debug('Packet is not IPv4!')
        return False

    # (label used in the log message, IP field name, expected value)
    expectations = (
        ('IPv4 source', 'src', expect_params.get('src_address')),
        ('IPv4 destination', 'dst', expect_params.get('dst_address')),
        ('IP flags value', 'flags', expect_params.get('flags')),
        ('ToS value', 'tos', expect_params.get('tc')),
        ('TTL value', 'ttl', expect_params.get('hlim')),
    )
    for label, field, wanted in expectations:
        if not wanted:
            continue
        seen = getattr(ip, field)
        if seen != wanted:
            LOGGER.debug(f'Wrong {label} {seen}, expected {wanted}')
            return False
    return True
219
220
221
def check_ipv6(expect_params, packet):
    """Return True if ``packet`` has an IPv6 layer matching ``expect_params``.

    Source and destination address, hop limit and traffic class are
    compared, each only when the expected value is truthy. The first
    mismatch is logged at debug level and ends the check with False.

    Raises:
        Exception: if a truthy 'flags' value is expected, once the address
            checks have passed.
    """
    ip6 = packet.getlayer(sp.IPv6)
    if not ip6:
        LOGGER.debug('Packet is not IPv6!')
        return False

    def packed(address):
        # Compare the binary form so different spellings of one address
        # are treated as equal.
        return socket.inet_pton(socket.AF_INET6, address)

    for label, field, key in (
        ('source', 'src', 'src_address'),
        ('destination', 'dst', 'dst_address'),
    ):
        wanted = expect_params.get(key)
        if wanted and packed(getattr(ip6, field)) != packed(wanted):
            LOGGER.debug(
                f'Wrong IPv6 {label} {getattr(ip6, field)}, expected {wanted}'
            )
            return False

    # Expecting IPv4-style fragmentation flags here is a caller error.
    if expect_params.get('flags'):
        raise Exception("There's no fragmentation flags in IPv6")

    for label, field, key in (
        ('Hop Limit', 'hlim', 'hlim'),
        ('TC', 'tc', 'tc'),
    ):
        wanted = expect_params.get(key)
        if wanted and getattr(ip6, field) != wanted:
            LOGGER.debug(
                f'Wrong {label} value {getattr(ip6, field)}, expected {wanted}'
            )
            return False
    return True
249
250
251
def check_ping_4(expect_params, packet):
    """Checks shared by the IPv4 echo request and echo reply matchers.

    Returns True when the IPv4 header matches ``expect_params``, an ICMP
    layer is present and the raw payload equals
    build_payload(expect_params['length']). Failures are logged at debug
    level and yield False.
    """
    wanted_len = expect_params['length']
    if not check_ipv4(expect_params, packet):
        return False

    failure = None
    if not packet.getlayer(sp.ICMP):
        failure = 'Packet is not IPv4 ICMP!'
    else:
        payload = packet.getlayer(sp.Raw)
        if not payload:
            failure = 'Packet contains no payload!'
        elif payload.load != build_payload(wanted_len):
            failure = 'Payload magic does not match!'

    if failure is not None:
        LOGGER.debug(failure)
        return False
    return True
267
268
269
def check_ping_request_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 ICMP Echo Request."""
    if check_ping_4(expect_params, packet):
        icmp_type = packet.getlayer(sp.ICMP).type
        if sp.icmptypes[icmp_type] == 'echo-request':
            return True
        LOGGER.debug('Packet is not IPv4 ICMP Echo Request!')
    return False
277
278
279
def check_ping_reply_4(expect_params, packet):
    """Return True if ``packet`` is the expected IPv4 ICMP Echo Reply."""
    if check_ping_4(expect_params, packet):
        icmp_type = packet.getlayer(sp.ICMP).type
        if sp.icmptypes[icmp_type] == 'echo-reply':
            return True
        LOGGER.debug('Packet is not IPv4 ICMP Echo Reply!')
    return False
287
288
289
def check_ping_request_6(expect_params, packet):
    """Return True if ``packet`` is the expected ICMPv6 Echo Request.

    The IPv6 header must match ``expect_params`` and the echo data must
    equal build_payload(expect_params['length']).
    """
    wanted_len = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False

    echo = packet.getlayer(sp.ICMPv6EchoRequest)
    if not echo:
        failure = 'Packet is not IPv6 ICMP Echo Request!'
    elif echo.data != build_payload(wanted_len):
        failure = 'Payload magic does not match!'
    else:
        return True

    LOGGER.debug(failure)
    return False
301
302
303
def check_ping_reply_6(expect_params, packet):
    """Return True if ``packet`` is the expected ICMPv6 Echo Reply.

    The IPv6 header must match ``expect_params`` and the echo data must
    equal build_payload(expect_params['length']).
    """
    wanted_len = expect_params['length']
    if not check_ipv6(expect_params, packet):
        return False

    echo = packet.getlayer(sp.ICMPv6EchoReply)
    if not echo:
        failure = 'Packet is not IPv6 ICMP Echo Reply!'
    elif echo.data != build_payload(wanted_len):
        failure = 'Payload magic does not match!'
    else:
        return True

    LOGGER.debug(failure)
    return False
315
316
317
def check_ping_request(args, packet):
    """Match an echo request, picking the IPv4 or IPv6 checker.

    The family is IPv6 when the expected source or destination address
    contains ':'; otherwise the IPv4 checker is used.

    Raises:
        Exception: if neither address is given in args['expect_params'].
    """
    addresses = (
        args['expect_params'].get('src_address'),
        args['expect_params'].get('dst_address'),
    )
    if not any(addresses):
        raise Exception('Source or destination address must be given to match the ping request!')
    if any(address and ':' in address for address in addresses):
        return check_ping_request_6(args['expect_params'], packet)
    return check_ping_request_4(args['expect_params'], packet)
329
330
331
def check_ping_reply(args, packet):
    """Match an echo reply, picking the IPv4 or IPv6 checker.

    The family is IPv6 when the expected source or destination address
    contains ':'; otherwise the IPv4 checker is used.

    Raises:
        Exception: if neither address is given in args['expect_params'].
    """
    addresses = (
        args['expect_params'].get('src_address'),
        args['expect_params'].get('dst_address'),
    )
    if not any(addresses):
        raise Exception('Source or destination address must be given to match the ping reply!')
    if any(address and ':' in address for address in addresses):
        return check_ping_reply_6(args['expect_params'], packet)
    return check_ping_reply_4(args['expect_params'], packet)
343
344
345
def check_tcp(expect_params, packet):
    """Return True if the TCP layer of ``packet`` matches ``expect_params``.

    Checked in this order: presence of a TCP layer, the checksum, the
    flags ('tcp_flags'), the sequence number ('seq') and the MSS option
    ('mss'). Flags, sequence number and MSS are only compared when the
    expected value is truthy. Mismatches are logged at debug level.

    For 'seq' the expected value is compared with the segment's own
    sequence number when 'tcp_flags' is 'S', and with its acknowledgement
    number minus one when 'tcp_flags' is 'SA'.

    The packet is left unmodified: the captured checksum is put back
    after the recomputation.

    Raises:
        Exception: if 'seq' is expected together with 'tcp_flags' other
            than 'S' or 'SA', since there is no rule for which field to
            compare.
    """
    tcp_flags = expect_params.get('tcp_flags')
    mss = expect_params.get('mss')
    seq = expect_params.get('seq')
    tcp = packet.getlayer(sp.TCP)
    if not tcp:
        LOGGER.debug('Packet is not TCP!')
        return False

    # Clear the checksum and rebuild the frame so that the re-parsed copy
    # carries a freshly computed checksum to compare against. The captured
    # value is always restored; otherwise a second check of the same
    # packet would compare None with the computed checksum and fail.
    chksum = tcp.chksum
    tcp.chksum = None
    try:
        newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    finally:
        tcp.chksum = chksum
    new_chksum = newpacket[sp.TCP].chksum
    if new_chksum and chksum != new_chksum:
        LOGGER.debug(f'Wrong TCP checksum {chksum}, expected {new_chksum}!')
        return False

    if tcp_flags and tcp.flags != tcp_flags:
        LOGGER.debug(f'Wrong TCP flags {tcp.flags}, expected {tcp_flags}!')
        return False

    if seq:
        if tcp_flags == 'S':
            tcp_seq = tcp.seq
        elif tcp_flags == 'SA':
            # A SYN+ACK acknowledges the SYN's sequence number plus one.
            tcp_seq = tcp.ack - 1
        else:
            # Previously this fell through and failed with an
            # UnboundLocalError on tcp_seq.
            raise Exception(
                f'Cannot check TCP Sequence Number for TCP flags {tcp_flags!r}'
            )
        if seq != tcp_seq:
            LOGGER.debug(f'Wrong TCP Sequence Number {tcp_seq}, expected {seq}')
            return False

    if mss:
        # Only MSS options that are present are compared; a segment
        # without an MSS option passes this step.
        for option in tcp.options:
            if option[0] == 'MSS':
                if option[1] != mss:
                    LOGGER.debug(f'Wrong TCP MSS {option[1]}, expected {mss}')
                    return False
    return True
378
379
380
def check_udp(expect_params, packet):
    """Return True if ``packet`` is the expected UDP datagram.

    The packet must have a UDP layer, a raw payload equal to
    build_payload(expect_params['length']) and a UDP checksum equal to
    the one recomputed from the packet. Mismatches are logged at debug
    level.

    The packet is left unmodified: the captured checksum is put back
    after the recomputation.
    """
    expect_length = expect_params['length']
    udp = packet.getlayer(sp.UDP)
    if not udp:
        LOGGER.debug('Packet is not UDP!')
        return False
    raw = packet.getlayer(sp.Raw)
    if not raw:
        LOGGER.debug('Packet contains no payload!')
        return False
    if raw.load != build_payload(expect_length):
        LOGGER.debug(f'Payload magic does not match len {len(raw.load)} vs {expect_length}!')
        return False

    # Clear the checksum and rebuild the frame so that the re-parsed copy
    # carries a freshly computed checksum to compare against. The captured
    # value is always restored; otherwise a second check of the same
    # packet would compare None with the computed checksum and fail.
    orig_chksum = udp.chksum
    udp.chksum = None
    try:
        newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
    finally:
        udp.chksum = orig_chksum
    new_chksum = newpacket[sp.UDP].chksum
    if new_chksum and orig_chksum != new_chksum:
        LOGGER.debug(f'Wrong UDP checksum {orig_chksum}, expected {new_chksum}!')
        return False

    return True
402
403
404
def check_tcp_syn_request_4(expect_params, packet):
    """Return True if ``packet`` is the expected TCP SYN over IPv4."""
    if check_ipv4(expect_params, packet):
        syn_expect = expect_params | {'tcp_flags': 'S'}
        if check_tcp(syn_expect, packet):
            return True
    return False
410
411
412
def check_tcp_syn_reply_4(send_params, expect_params, packet):
    """Return True if ``packet`` is the expected TCP SYN+ACK over IPv4.

    ``send_params`` is part of the signature but is not used by the check.
    """
    if check_ipv4(expect_params, packet):
        synack_expect = expect_params | {'tcp_flags': 'SA'}
        if check_tcp(synack_expect, packet):
            return True
    return False
418
419
420
def check_tcp_3way_4(args, packet):
    """Complete an IPv4 three-way handshake when ``packet`` is the SYN+ACK.

    The SYN+ACK is expected to come from the address the SYN was sent to
    and to go to the address the SYN was sent from. When it matches, the
    closing ACK is sent on send_params['sendif'] and True is returned;
    otherwise nothing is sent and False is returned.
    """
    send_params = args['send_params']

    # Expectations for the SYN+ACK: the sender's addresses, swapped.
    synack_expect = clean_params(args['expect_params'])
    synack_expect.update(
        src_address=send_params['dst_address'],
        dst_address=send_params['src_address'],
    )

    is_synack = (
        check_ipv4(synack_expect, packet) and
        check_tcp(synack_expect | {'tcp_flags': 'SA'}, packet)
    )
    if not is_synack:
        return False

    ether = sp.Ether()
    ip_sa = packet.getlayer(sp.IP)
    tcp_sa = packet.getlayer(sp.TCP)

    # Reply along the reverse path of the sniffed SYN+ACK.
    reply_params = clean_params(send_params)
    reply_params.update(src_address=ip_sa.dst, dst_address=ip_sa.src)
    ip_a = prepare_ipv4(reply_params)
    tcp_a = sp.TCP(
        sport=tcp_sa.dport,
        dport=tcp_sa.sport,
        flags='A',
        seq=tcp_sa.ack,
        ack=tcp_sa.seq + 1,
    )
    sp.sendp(ether / ip_a / tcp_a, iface=send_params['sendif'], verbose=False)
    return True
448
449
450
def check_udp_request_4(expect_params, packet):
    """Return True if ``packet`` is the expected UDP datagram over IPv4."""
    if check_ipv4(expect_params, packet):
        if check_udp(expect_params, packet):
            return True
    return False
456
457
458
def check_tcp_syn_request_6(expect_params, packet):
    """Return True if ``packet`` is the expected TCP SYN over IPv6."""
    if check_ipv6(expect_params, packet):
        syn_expect = expect_params | {'tcp_flags': 'S'}
        if check_tcp(syn_expect, packet):
            return True
    return False
464
465
466
def check_tcp_syn_reply_6(expect_params, packet):
    """Return True if ``packet`` is the expected TCP SYN+ACK over IPv6."""
    if check_ipv6(expect_params, packet):
        synack_expect = expect_params | {'tcp_flags': 'SA'}
        if check_tcp(synack_expect, packet):
            return True
    return False
472
473
474
def check_tcp_3way_6(args, packet):
    """Complete an IPv6 three-way handshake when ``packet`` is the SYN+ACK.

    The SYN+ACK is expected to come from the address the SYN was sent to
    and to go to the address the SYN was sent from. When it matches, the
    closing ACK is sent on send_params['sendif'] and True is returned;
    otherwise nothing is sent and False is returned.
    """
    send_params = args['send_params']

    # Expectations for the SYN+ACK: the sender's addresses, swapped.
    synack_expect = clean_params(args['expect_params'])
    synack_expect.update(
        src_address=send_params['dst_address'],
        dst_address=send_params['src_address'],
    )

    is_synack = (
        check_ipv6(synack_expect, packet) and
        check_tcp(synack_expect | {'tcp_flags': 'SA'}, packet)
    )
    if not is_synack:
        return False

    ether = sp.Ether()
    ip6_sa = packet.getlayer(sp.IPv6)
    tcp_sa = packet.getlayer(sp.TCP)

    # Reply along the reverse path of the sniffed SYN+ACK.
    reply_params = clean_params(send_params)
    reply_params.update(src_address=ip6_sa.dst, dst_address=ip6_sa.src)
    ip_a = prepare_ipv6(reply_params)
    tcp_a = sp.TCP(
        sport=tcp_sa.dport,
        dport=tcp_sa.sport,
        flags='A',
        seq=tcp_sa.ack,
        ack=tcp_sa.seq + 1,
    )
    sp.sendp(ether / ip_a / tcp_a, iface=send_params['sendif'], verbose=False)
    return True
502
503
504
def check_udp_request_6(expect_params, packet):
    """Return True if ``packet`` is the expected UDP datagram over IPv6."""
    if check_ipv6(expect_params, packet):
        if check_udp(expect_params, packet):
            return True
    return False
510
511
def check_tcp_syn_request(args, packet):
    """Match a TCP SYN, picking the IPv4 or IPv6 checker.

    The family is IPv6 when the expected source or destination address
    contains ':'; otherwise the IPv4 checker is used.

    Raises:
        Exception: if neither address is given in args['expect_params'].
    """
    expect_params = args['expect_params']
    addresses = (
        expect_params.get('src_address'),
        expect_params.get('dst_address'),
    )
    if not any(addresses):
        raise Exception('Source or destination address must be given to match the tcp syn request!')
    if any(address and ':' in address for address in addresses):
        return check_tcp_syn_request_6(expect_params, packet)
    return check_tcp_syn_request_4(expect_params, packet)
524
525
526
def check_tcp_syn_reply(args, packet):
    """Match a TCP SYN+ACK, picking the IPv4 or IPv6 checker.

    The family is IPv6 when the expected source or destination address
    contains ':'; otherwise the IPv4 checker is used.

    Raises:
        Exception: if neither address is given in args['expect_params'].
    """
    expect_params = args['expect_params']
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        raise Exception('Source or destination address must be given to match the tcp syn reply!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_tcp_syn_reply_6(expect_params, packet)
    # Unlike its IPv6 counterpart, check_tcp_syn_reply_4 takes send_params
    # as its first argument; calling it with only two arguments raised a
    # TypeError.
    return check_tcp_syn_reply_4(args.get('send_params'), expect_params, packet)
539
540
def check_tcp_3way(args, packet):
    """Run the three-way handshake check for the matching address family.

    The family is IPv6 when the expected source or destination address
    contains ':'; otherwise the IPv4 variant is used. The whole ``args``
    dictionary is handed on, as the variants also need the send
    parameters.

    Raises:
        Exception: if neither address is given in args['expect_params'].
    """
    expect_params = args['expect_params']
    addresses = (
        expect_params.get('src_address'),
        expect_params.get('dst_address'),
    )
    if not any(addresses):
        raise Exception('Source or destination address must be given to match the tcp syn reply!')
    if any(address and ':' in address for address in addresses):
        return check_tcp_3way_6(args, packet)
    return check_tcp_3way_4(args, packet)
553
554
555
def check_udp_request(args, packet):
    """Match a UDP datagram, picking the IPv4 or IPv6 checker.

    The family is IPv6 when the expected source or destination address
    contains ':'; otherwise the IPv4 checker is used.

    Raises:
        Exception: if neither address is given in args['expect_params'].
    """
    expect_params = args['expect_params']
    src_address = expect_params.get('src_address')
    dst_address = expect_params.get('dst_address')
    if not (src_address or dst_address):
        # The message used to name the TCP SYN request, a leftover from
        # the function this one was copied from.
        raise Exception('Source or destination address must be given to match the udp request!')
    if (
        (src_address and ':' in src_address) or
        (dst_address and ':' in dst_address)
    ):
        return check_udp_request_6(expect_params, packet)
    return check_udp_request_4(expect_params, packet)
568
569
570
def setup_sniffer(
    recvif, ping_type, sniff_type, expect_params, defrag, send_params,
):
    """Create a Sniffer on ``recvif`` with the check function for the test.

    The check function is selected by the (ping_type, sniff_type) pair.
    It is handed to Sniffer together with a dictionary holding
    'send_params' and 'expect_params', the interface and ``defrag``.

    Raises:
        Exception: if the ping/sniff type combination has no check
            function.
    """
    combination = (ping_type, sniff_type)
    if combination == ('icmp', 'request'):
        checkfn = check_ping_request
    elif combination == ('icmp', 'reply'):
        checkfn = check_ping_reply
    elif combination == ('tcpsyn', 'request'):
        checkfn = check_tcp_syn_request
    elif combination == ('tcpsyn', 'reply'):
        checkfn = check_tcp_syn_reply
    elif combination == ('tcp3way', 'reply'):
        checkfn = check_tcp_3way
    elif combination == ('udp', 'request'):
        checkfn = check_udp_request
    else:
        raise Exception('Unspported ping and sniff type combination')

    sniffer_args = {'send_params': send_params, 'expect_params': expect_params}
    return Sniffer(sniffer_args, checkfn, recvif, defrag=defrag)
592
593
594
def parse_args():
    """Define the pft_ping.py command line and return the parsed arguments.

    Options fall into three sets: what to send and where to sniff
    (top-level options), values placed in transmitted packets
    (``--send-*``) and values expected in sniffed packets
    (``--expect-*``).
    """
    parser = argparse.ArgumentParser("pft_ping.py",
        description="Ping test tool")

    # Parameters of sent ping request
    parser.add_argument('--sendif', required=True,
        help='The interface through which the packet(s) will be sent')
    parser.add_argument('--to', required=True,
        help='The destination IP address for the ping request')
    parser.add_argument('--ping-type',
        choices=('icmp', 'tcpsyn', 'tcp3way', 'udp'),
        help='Type of ping: ICMP (default) or TCP SYN or 3-way TCP handshake',
        default='icmp')
    parser.add_argument('--fromaddr',
        help='The source IP address for the ping request')

    # Where to look for packets to analyze.
    # The '+' format is ugly as it mixes positional with optional syntax.
    # But we have no positional parameters so I guess it's fine to use it.
    parser.add_argument('--recvif', nargs='+',
        help='The interfaces on which to expect the ping request')
    parser.add_argument('--replyif', nargs='+',
        help='The interfaces which to expect the ping response')

    # Packet settings
    parser_send = parser.add_argument_group('Values set in transmitted packets')
    parser_send.add_argument('--send-flags', type=str,
        help='IPv4 fragmentation flags')
    parser_send.add_argument('--send-frag-length', type=int,
        help='Force IP fragmentation with given fragment length')
    parser_send.add_argument('--send-hlim', type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_send.add_argument('--send-mss', type=int,
        help='TCP Maximum Segment Size')
    parser_send.add_argument('--send-seq', type=int,
        help='TCP sequence number')
    parser_send.add_argument('--send-sport', type=int,
        help='TCP source port')
    parser_send.add_argument('--send-dport', type=int, default=9,
        help='TCP destination port')
    parser_send.add_argument('--send-length', type=int, default=len(PAYLOAD_MAGIC),
        help='ICMP Echo Request payload size')
    parser_send.add_argument('--send-tc', type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')
    parser_send.add_argument('--send-tcpopt-unaligned', action='store_true',
        help='Include unaligned TCP options')
    parser_send.add_argument('--send-nop', action='store_true',
        help='Include a NOP IPv4 option')

    # Expectations
    parser_expect = parser.add_argument_group('Values expected in sniffed packets')
    parser_expect.add_argument('--expect-flags', type=str,
        help='IPv4 fragmentation flags')
    parser_expect.add_argument('--expect-hlim', type=int,
        help='IPv6 Hop Limit or IPv4 Time To Live')
    parser_expect.add_argument('--expect-mss', type=int,
        help='TCP Maximum Segment Size')
    # This option used to be registered on parser_send, which listed it
    # under the transmitted-packet values in the --help output.
    parser_expect.add_argument('--expect-seq', type=int,
        help='TCP sequence number')
    parser_expect.add_argument('--expect-tc', type=int,
        help='IPv6 Traffic Class or IPv4 DiffServ / ToS')

    parser.add_argument('-v', '--verbose', action='store_true',
        help=('Enable verbose logging. Apart of potentially useful information '
            'you might see warnings from parsing packets like NDP or other '
            'packets not related to the test being run. Use only when '
            'developing because real tests expect empty stderr and stdout.'))

    return parser.parse_args()
663
664
665
def main():
    """Run one ping test and return its exit status.

    Sniffers are installed on the requested interfaces, the probe is
    sent, and each sniffer is then joined. The returned integer has bit N
    set when sniffer number N did not see exactly one matching packet, so
    0 means every sniffer succeeded.

    Raises:
        Exception: if neither --recvif nor --replyif was given.
    """
    args = parse_args()

    if args.verbose:
        LOGGER.setLevel(logging.DEBUG)

    # Split parameters into send and expect parameters. Parameters might be
    # missing from the command line, always fill the dictionaries with None.
    given = vars(args)
    param_names = (
        'flags', 'hlim', 'length', 'mss', 'seq', 'tc', 'frag_length',
        'sport', 'dport',
    )
    send_params = {
        name: given.get(f'send_{name}') or None for name in param_names
    }
    expect_params = {
        name: given.get(f'expect_{name}') or None for name in param_names
    }

    # The payload length has no --expect-* option: sniffers look for the
    # length that was sent.
    expect_params['length'] = send_params['length']
    send_params.update(
        tcpopt_unaligned=args.send_tcpopt_unaligned,
        nop=args.send_nop,
        src_address=args.fromaddr or None,
        dst_address=args.to,
        sendif=args.sendif,
    )

    # We may not have a default route. Tell scapy where to start looking for routes
    sp.conf.iface6 = args.sendif

    # Configuration sanity checking.
    if not (args.replyif or args.recvif):
        raise Exception('With no reply or recv interface specified no traffic '
            'can be sniffed and verified!'
        )

    # Sniffers only reassemble fragments when fragments are being sent.
    if not send_params['frag_length']:
        defrag = False
    elif any(
        address and ':' in address
        for address in (send_params['src_address'], send_params['dst_address'])
    ):
        defrag = 'IPv6'
    else:
        defrag = 'IPv4'

    sniffers = []

    if args.recvif:
        # Requests are recognised by their destination only.
        request_expect = {
            **expect_params, 'src_address': None, 'dst_address': args.to,
        }
        for iface in args.recvif:
            LOGGER.debug(f'Installing receive sniffer on {iface}')
            sniffers.append(setup_sniffer(
                iface, args.ping_type, 'request',
                request_expect, defrag, send_params,
            ))

    if args.replyif:
        # Replies are recognised by their source only.
        reply_expect = {
            **expect_params, 'src_address': args.to, 'dst_address': None,
        }
        for iface in args.replyif:
            LOGGER.debug(f'Installing reply sniffer on {iface}')
            sniffers.append(setup_sniffer(
                iface, args.ping_type, 'reply',
                reply_expect, defrag, send_params,
            ))

    LOGGER.debug(f'Installed {len(sniffers)} sniffers')

    send_ping(args.ping_type, send_params)

    err = 0
    for index, sniffer in enumerate(sniffers):
        sniffer.join()
        if sniffer.correctPackets == 1:
            LOGGER.debug(f'Expected ping has been sniffed on {sniffer._recvif}.')
            continue
        # Set a bit in err for each failed sniffer.
        err |= 1 << index
        if sniffer.correctPackets > 1:
            LOGGER.debug(f'Duplicated ping has been sniffed on {sniffer._recvif}!')
        else:
            LOGGER.debug(f'Expected ping has not been sniffed on {sniffer._recvif}!')

    return err
755
756
757
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == '__main__':
    raise SystemExit(main())
759
760