Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/sys/netinet6/frag6/sniffer.py
39488 views
1
2
import threading
3
import logging
4
logging.getLogger("scapy").setLevel(logging.CRITICAL)
5
import scapy.all as sp
6
7
class Sniffer(threading.Thread):
    """Thread that sniffs one interface until a matching packet is seen.

    The thread starts itself at the end of construction.  Every sniffed
    packet is handed to ``check_function(args, packet)``; once that
    returns a truthy value ``foundCorrectPacket`` is set to True and
    sniffing stops.  ``setEnd()`` asks the thread to stop without a match.
    """

    def __init__(self, args, check_function):
        """Store the configuration and start the thread immediately.

        args           -- object exposing ``recvif`` (an indexable whose
                          first element is the interface to sniff on); it
                          is also passed unchanged to ``check_function``.
        check_function -- callable ``(args, packet)`` returning a truthy
                          value for the packet being waited for.
        """
        threading.Thread.__init__(self)

        # NOTE(review): threading.Thread keeps a private ``_args`` of its
        # own; overwriting it looks harmless because run() is overridden
        # below -- confirm if this class is ever given a ``target``.
        self._args = args
        self._recvif = args.recvif[0]
        self._check_function = check_function
        self.foundCorrectPacket = False
        self._endme = False

        self.start()

    def _checkPacket(self, packet):
        """Run the check function on ``packet`` and record a match.

        Returns whatever the check function returned.  The match flag is
        only ever set, never cleared.
        """
        matched = self._check_function(self._args, packet)
        if matched:
            self.foundCorrectPacket = True
        return matched

    def setEnd(self):
        """Request termination; honoured the next time stopFilter runs."""
        self._endme = True

    def stopFilter(self, pkt):
        """Return True when sniffing should stop, False otherwise.

        Used as the ``stop_filter`` callback for ``sp.sniff`` and called
        directly with ``None`` from run() to poll the flags without
        checking a packet.
        """
        if pkt is not None:
            self._checkPacket(pkt)
        return bool(self.foundCorrectPacket or self._endme)

    def run(self):
        """Sniff in rounds of up to 90 seconds until a stop condition holds."""
        finished = False
        while not finished:
            # store=False: packets are inspected by the stop filter only.
            self.packets = sp.sniff(
                iface=self._recvif,
                store=False,
                stop_filter=self.stopFilter,
                timeout=90,
            )
            # A round may end on timeout alone; re-check the flags before
            # starting another one.
            finished = self.stopFilter(None)
42
43