Path: blob/main/tests/atf_python/sys/netpfil/ipfw/ipfw.py
39536 views
#!/usr/bin/env python31import os2import socket3import struct4import subprocess5import sys6from ctypes import c_byte7from ctypes import c_char8from ctypes import c_int9from ctypes import c_long10from ctypes import c_uint3211from ctypes import c_uint812from ctypes import c_ulong13from ctypes import c_ushort14from ctypes import sizeof15from ctypes import Structure16from enum import Enum17from typing import Any18from typing import Dict19from typing import List20from typing import NamedTuple21from typing import Optional22from typing import Union2324from atf_python.sys.netpfil.ipfw.ioctl import get3_classes25from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes26from atf_python.sys.netpfil.ipfw.ioctl import set3_classes27from atf_python.sys.netpfil.ipfw.utils import AttrDescr28from atf_python.sys.netpfil.ipfw.utils import enum_from_int29from atf_python.sys.netpfil.ipfw.utils import enum_or_int30from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map313233class DebugHeader(Structure):34_fields_ = [35("cmd_type", c_ushort),36("spare1", c_ushort),37("opt_name", c_uint32),38("total_len", c_uint32),39("spare2", c_uint32),40]414243class DebugType(Enum):44DO_CMD = 145DO_SET3 = 246DO_GET3 = 3474849class DebugIoReader(object):50HANDLER_CLASSES = {51DebugType.DO_CMD: legacy_classes,52DebugType.DO_SET3: set3_classes,53DebugType.DO_GET3: get3_classes,54}5556def __init__(self, ipfw_path):57self._msgmap = self.build_msgmap()58self.ipfw_path = ipfw_path5960def build_msgmap(self):61xmap = {}62for debug_type, handler_classes in self.HANDLER_CLASSES.items():63debug_type = enum_or_int(debug_type)64if debug_type not in xmap:65xmap[debug_type] = {}66for handler_class in handler_classes:67for msg in handler_class.messages:68xmap[debug_type][enum_or_int(msg)] = handler_class69return xmap7071def print_obj_header(self, hdr):72debug_type = "#{}".format(hdr.cmd_type)73for _type in self.HANDLER_CLASSES.keys():74if _type.value == hdr.cmd_type:75debug_type = _type.name.lower()76break77print(78"@@ record for {} len={} optname={}".format(79debug_type, hdr.total_len, hdr.opt_name80)81)8283def parse_record(self, data):84hdr = DebugHeader.from_buffer_copy(data[: sizeof(DebugHeader)])85data = data[sizeof(DebugHeader) :]86cls = self._msgmap[hdr.cmd_type].get(hdr.opt_name)87if cls is not None:88return cls.from_bytes(data)89raise ValueError(90"unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name)91)9293def get_record_from_stdin(self):94data = sys.stdin.buffer.peek(sizeof(DebugHeader))95if len(data) == 0:96return None9798hdr = DebugHeader.from_buffer_copy(data)99data = sys.stdin.buffer.read(hdr.total_len)100return self.parse_record(data)101102def get_records_from_buffer(self, data):103off = 0104ret = []105while off + sizeof(DebugHeader) <= len(data):106hdr = DebugHeader.from_buffer_copy(data[off : off + sizeof(DebugHeader)])107ret.append(self.parse_record(data[off : off + hdr.total_len]))108off += hdr.total_len109return ret110111def run_ipfw(self, cmd: str) -> bytes:112args = [self.ipfw_path, "-xqn"] + cmd.split()113r = subprocess.run(args, capture_output=True)114return r.stdout115116def get_records(self, cmd: str):117return self.get_records_from_buffer(self.run_ipfw(cmd))118119120