Path: blob/main/tests/sys/netpfil/common/sniffer.py
39534 views
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <[email protected]>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import threading
import scapy.all as sp
import sys

class Sniffer(threading.Thread):
    """Thread that captures packets with scapy and counts the matching ones.

    Every packet handed to the check function that yields a truthy result
    bumps ``correctPackets``. The raw result of the capture is stored in
    ``packets`` once ``run()`` has finished sniffing.
    """

    def __init__(self, args, check_function, recvif, timeout=3, defrag=False):
        """Configure the sniffer, start the thread and wait for the capture.

        The constructor does not return until scapy has invoked the
        started-callback, so callers can begin sending traffic right after
        constructing the object.

        Args:
            args: Opaque value forwarded as the first argument of
                ``check_function``.
            check_function: Callable invoked as ``check_function(args, packet)``;
                a truthy return marks the packet as correct.
            recvif: Interface handed to ``sp.sniff`` as ``iface``.
            timeout: Value handed to ``sp.sniff`` as ``timeout``.
            defrag: ``'IPv4'`` or ``'IPv6'`` to check packets only after the
                capture has ended (with reassembly); any other value checks
                packets while they are being captured.

        Raises:
            Exception: If the started-callback has not fired within
                30 seconds of starting the thread.
        """
        super().__init__()

        self.correctPackets = 0
        self._args = args
        self._check_function = check_function
        self._recvif = recvif
        self._timeout = timeout
        self._defrag = defrag
        # Starts at zero so acquire() blocks until _startedCb() releases it.
        self._sem = threading.Semaphore(0)

        self.start()
        started = self._sem.acquire(timeout=30)
        if not started:
            raise Exception("Failed to start sniffer")

    def _checkPacket(self, packet):
        """Run the check function on *packet* and count it when it matches.

        Returns the check function's result unchanged, which lets this method
        double as scapy's ``stop_filter``.
        """
        matched = self._check_function(self._args, packet)
        if matched:
            self.correctPackets += 1
        return matched

    def _startedCb(self):
        """Unblock the constructor; passed to scapy as ``started_callback``."""
        self._sem.release()

    def run(self):
        """Thread body: capture on the interface and evaluate the packets."""
        self.packets = []
        common = dict(iface=self._recvif, timeout=self._timeout,
                      started_callback=self._startedCb)

        if self._defrag == 'IPv4':
            session = sp.IPSession
        elif self._defrag == 'IPv6':
            session = sp.DefaultSession
        else:
            # No reassembly requested: check packets as they arrive and let
            # a truthy result end the capture early.
            self.packets = sp.sniff(stop_filter=self._checkPacket, **common)
            return

        # When reassembling, a matching packet cannot end the capture early
        # because what arrives on the wire is still fragmented. Let the
        # sniffer run to its timeout and inspect the result afterwards.
        self.packets = sp.sniff(session=session, **common)

        if self._defrag == 'IPv6':
            captured = sp.defragment6(self.packets)
        else:
            captured = self.packets

        for pkt in captured:
            self._checkPacket(pkt)