Path: blob/main/tests/sys/netinet6/frag6/sniffer.py
39488 views
1import threading2import logging3logging.getLogger("scapy").setLevel(logging.CRITICAL)4import scapy.all as sp56class Sniffer(threading.Thread):7def __init__(self, args, check_function):8threading.Thread.__init__(self)910self._args = args11self._recvif = args.recvif[0]12self._check_function = check_function13self.foundCorrectPacket = False14self._endme = False1516self.start()1718def _checkPacket(self, packet):19ret = self._check_function(self._args, packet)20if ret:21self.foundCorrectPacket = True22return ret2324def setEnd(self):25self._endme = True2627def stopFilter(self, pkt):28if pkt is not None:29self._checkPacket(pkt)30if self.foundCorrectPacket or self._endme:31return True32else:33return False3435def run(self):36while True:37self.packets = sp.sniff(iface=self._recvif, store=False,38stop_filter=self.stopFilter, timeout=90)39if self.stopFilter(None):40break414243