Path: blob/main/tests/atf_python/sys/netpfil/ipfw/ioctl.py
39536 views
#!/usr/bin/env python3
"""Build and parse the binary TLV messages exchanged with ipfw.

The module defines ctypes ``Structure`` layouts for the wire format and
Python wrapper classes able to parse raw bytes into object trees
(``from_bytes``) and to serialize those trees back (``__bytes__``).

NOTE(review): the structure layouts presumably mirror the kernel's ipfw
headers (``ip_fw.h``) -- confirm against the header when changing a field.
"""
import os
import socket
import struct
import subprocess
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_uint8
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from enum import Enum
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Union

import pytest
from atf_python.sys.netpfil.ipfw.insns import BaseInsn
from atf_python.sys.netpfil.ipfw.insns import insn_attrs
from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTableLookupType
from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTlvType
from atf_python.sys.netpfil.ipfw.ioctl_headers import Op3CmdType
from atf_python.sys.netpfil.ipfw.utils import align8
from atf_python.sys.netpfil.ipfw.utils import AttrDescr
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map


def _enum_or_int(val):
    """Return ``val.value`` for an Enum member, ``val`` itself otherwise."""
    if isinstance(val, Enum):
        return val.value
    return val


def _hexdump(data) -> str:
    """Format every byte of ``data`` as ``xNN`` and join them with spaces."""
    return " ".join(["x{:02X}".format(b) for b in data])


class IpFw3OpHeader(Structure):
    """Fixed header that starts every op3 message."""

    _fields_ = [
        ("opcode", c_ushort),
        ("version", c_ushort),
        ("reserved1", c_ushort),
        ("reserved2", c_ushort),
    ]


class IpFwObjTlv(Structure):
    """Generic TLV header: type, flags and total length (header included)."""

    _fields_ = [
        ("n_type", c_ushort),
        ("flags", c_ushort),
        ("length", c_uint32),
    ]


class BaseTlv(object):
    """Common behaviour of all TLV wrappers.

    Subclasses provide ``__bytes__`` and usually override ``_validate`` /
    ``_parse`` / ``_print_obj_value``.
    """

    # Enum used to translate a raw integer type into a symbolic name.
    obj_enum_class = IpFwTlvType

    def __init__(self, obj_type):
        """Store the TLV type, given either as an Enum member or an int."""
        if isinstance(obj_type, Enum):
            self.obj_type = obj_type.value
            self._enum = obj_type
        else:
            self.obj_type = obj_type
            self._enum = enum_from_int(self.obj_enum_class, obj_type)
        self.obj_list = []

    def add_obj(self, obj):
        """Append a child object."""
        self.obj_list.append(obj)

    @property
    def len(self):
        """Size in bytes of the serialized object."""
        return len(bytes(self))

    @property
    def obj_name(self):
        """Symbolic type name, or ``tlv#<type>`` when the type is unknown."""
        if self._enum is not None:
            return self._enum.name
        return "tlv#{}".format(self.obj_type)

    def print_hdr(self, prepend=""):
        """Print a one-line summary of this object, prefixed by ``prepend``."""
        print(
            "{}len={} type={}({}){}".format(
                prepend,
                self.len,
                self.obj_name,
                self.obj_type,
                self._print_obj_value(),
            )
        )

    def print_obj(self, prepend=""):
        """Print this object and, indented below it, all of its children."""
        self.print_hdr(prepend)
        prepend = " " + prepend
        for obj in self.obj_list:
            obj.print_obj(prepend)

    def print_obj_hex(self, prepend=""):
        """Print ``prepend``, an empty line and the hex dump of the object."""
        print(prepend)
        print()
        print(_hexdump(bytes(self)))

    @classmethod
    def _validate(cls, data):
        """Raise ValueError unless ``data`` is exactly one well-sized TLV."""
        if len(data) < sizeof(IpFwObjTlv):
            raise ValueError("TLV too short")
        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
        if len(data) != hdr.length:
            raise ValueError("wrong TLV size")

    @classmethod
    def _parse(cls, data, attr_map):
        """Build an instance from already validated ``data``."""
        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
        return cls(hdr.n_type)

    @classmethod
    def from_bytes(cls, data, attr_map=None):
        """Validate ``data`` and parse it into an instance of ``cls``.

        Raises:
            ValueError: when ``data`` fails the class' validation.
        """
        cls._validate(data)
        return cls._parse(data, attr_map)

    def __bytes__(self):
        raise NotImplementedError()

    def _print_obj_value(self):
        """Return the payload (everything after the TLV header) as hex."""
        # Serialize instead of reading ``self._data``: only UnknownTlv keeps
        # the raw buffer, every other subclass would hit an AttributeError.
        return " " + _hexdump(bytes(self)[sizeof(IpFwObjTlv) :])

    def as_hexdump(self):
        """Return the serialized object as a hex string."""
        return _hexdump(bytes(self))


class UnknownTlv(BaseTlv):
    """TLV of a type without a dedicated class; keeps the raw bytes as is."""

    def __init__(self, obj_type, data):
        super().__init__(obj_type)
        self._data = data

    @classmethod
    def _validate(cls, data):
        """Raise ValueError unless ``data`` is exactly one well-sized TLV."""
        # Only the generic header is required here: an opaque TLV may be far
        # smaller than a named TLV.
        if len(data) < sizeof(IpFwObjTlv):
            raise ValueError("TLV size is too short")
        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
        if len(data) != hdr.length:
            raise ValueError("wrong TLV size")

    @classmethod
    def _parse(cls, data, attr_map):
        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
        return cls(hdr.n_type, data)

    def __bytes__(self):
        return self._data


class Tlv(BaseTlv):
    """TLV base class that knows how to parse a sequence of child TLVs."""

    @staticmethod
    def parse_tlvs(data, attr_map):
        """Parse back-to-back TLVs from ``data``.

        Args:
            data: buffer holding zero or more consecutive TLVs.
            attr_map: mapping of TLV type to a descriptor dict with an
                ``"ad"`` entry (whose ``cls`` is the wrapper class) and an
                optional ``"child"`` map for nested TLVs.  Types missing
                from the map are parsed as ``UnknownTlv``.

        Returns:
            List of parsed objects in buffer order.  Trailing bytes shorter
            than a TLV header are ignored.

        Raises:
            ValueError: when a TLV length is inconsistent with the buffer.
        """
        hdr_size = sizeof(IpFwObjTlv)
        off = 0
        ret = []
        while off + hdr_size <= len(data):
            hdr = IpFwObjTlv.from_buffer_copy(data[off : off + hdr_size])
            if hdr.length < hdr_size:
                # A length below the header size can never be valid and a
                # zero length would stop the offset from advancing.
                raise ValueError("TLV too short")
            if off + hdr.length > len(data):
                raise ValueError("TLV size do not match")
            obj_data = data[off : off + hdr.length]
            obj_descr = attr_map.get(hdr.n_type, None)
            if obj_descr is None:
                cls = UnknownTlv
                child_map = {}
            else:
                cls = obj_descr["ad"].cls
                child_map = obj_descr.get("child", {})
            ret.append(cls.from_bytes(obj_data, child_map))
            off += hdr.length
        return ret


class IpFwObjNTlv(Structure):
    """Named-object TLV layout: header, index, set, type and a 64-byte name."""

    _fields_ = [
        ("head", IpFwObjTlv),
        ("idx", c_ushort),
        ("n_set", c_uint8),
        ("n_type", c_uint8),
        ("spare", c_uint32),
        ("name", c_char * 64),
    ]


class NTlv(Tlv):
    """Wrapper for a fixed-size named-object TLV (``IpFwObjNTlv``)."""

    def __init__(self, obj_type, idx=0, n_set=0, n_type=0, name=None):
        super().__init__(obj_type)
        self.n_idx = idx
        self.n_set = n_set
        self.n_type = n_type
        self.n_name = name

    @classmethod
    def _validate(cls, data):
        """Raise ValueError unless ``data`` is exactly one ``IpFwObjNTlv``."""
        if len(data) != sizeof(IpFwObjNTlv):
            raise ValueError("TLV size is not correct")
        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
        if len(data) != hdr.length:
            raise ValueError("wrong TLV size")

    @classmethod
    def _parse(cls, data, attr_map):
        hdr = IpFwObjNTlv.from_buffer_copy(data[: sizeof(IpFwObjNTlv)])
        name = hdr.name.decode("utf-8")
        return cls(hdr.head.n_type, hdr.idx, hdr.n_set, hdr.n_type, name)

    def __bytes__(self):
        # ``name`` defaults to None in the constructor; serialize it as an
        # empty name instead of failing on ``None.encode``.
        name_bytes = (self.n_name or "").encode("utf-8")
        if len(name_bytes) < 64:
            name_bytes += b"\0" * (64 - len(name_bytes))
        hdr = IpFwObjNTlv(
            head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)),
            idx=self.n_idx,
            n_set=self.n_set,
            n_type=self.n_type,
            name=name_bytes[:64],
        )
        return bytes(hdr)

    def _print_obj_value(self):
        return " " + "type={} set={} idx={} name={}".format(
            self.n_type, self.n_set, self.n_idx, self.n_name
        )


class IpFwObjCTlv(Structure):
    """Container TLV layout: header, child count, child size, version, flags."""

    _fields_ = [
        ("head", IpFwObjTlv),
        ("count", c_uint32),
        ("objsize", c_ushort),
        ("version", c_uint8),
        ("flags", c_uint8),
    ]


class CTlv(Tlv):
    """Container TLV holding a list of child TLVs."""

    def __init__(self, obj_type, obj_list=None):
        super().__init__(obj_type)
        if obj_list:
            self.obj_list.extend(obj_list)

    @classmethod
    def _validate(cls, data):
        """Raise ValueError unless ``data`` is exactly one container TLV."""
        if len(data) < sizeof(IpFwObjCTlv):
            raise ValueError("TLV too short")
        hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
        if len(data) != hdr.head.length:
            raise ValueError("wrong TLV size")

    @classmethod
    def _parse(cls, data, attr_map):
        hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
        tlv_list = cls.parse_tlvs(data[sizeof(IpFwObjCTlv) :], attr_map)
        if len(tlv_list) != hdr.count:
            raise ValueError("wrong number of objects")
        return cls(hdr.head.n_type, obj_list=tlv_list)

    def __bytes__(self):
        children = [bytes(obj) for obj in self.obj_list]
        payload = b"".join(children)
        # ``objsize`` is taken from the first child; 0 for an empty container.
        objsize = len(children[0]) if children else 0
        hdr = IpFwObjCTlv(
            # The header length must cover the container header plus all
            # children, otherwise _validate() rejects our own output.
            head=IpFwObjTlv(
                n_type=self.obj_type, length=len(payload) + sizeof(IpFwObjCTlv)
            ),
            count=len(self.obj_list),
            objsize=objsize,
        )
        return bytes(hdr) + payload

    def _print_obj_value(self):
        return ""


class IpFwRule(Structure):
    """Fixed rule header that precedes the rule's instructions."""

    _fields_ = [
        ("act_ofs", c_ushort),
        ("cmd_len", c_ushort),
        ("spare", c_ushort),
        ("n_set", c_uint8),
        ("flags", c_uint8),
        ("rulenum", c_uint32),
        ("n_id", c_uint32),
    ]


class RawRule(Tlv):
    """A single rule: an ``IpFwRule`` header followed by its instructions."""

    def __init__(self, obj_type=0, n_set=0, rulenum=0, obj_list=None):
        super().__init__(obj_type)
        self.n_set = n_set
        self.rulenum = rulenum
        if obj_list:
            self.obj_list.extend(obj_list)

    @classmethod
    def _validate(cls, data):
        """Raise ValueError unless ``data`` is a header plus ``cmd_len`` words."""
        min_size = sizeof(IpFwRule)
        if len(data) < min_size:
            raise ValueError("rule TLV too short")
        rule = IpFwRule.from_buffer_copy(data[:min_size])
        # cmd_len counts 4-byte words.
        if len(data) != min_size + rule.cmd_len * 4:
            raise ValueError("rule TLV cmd_len incorrect")

    @classmethod
    def _parse(cls, data, attr_map):
        hdr = IpFwRule.from_buffer_copy(data[: sizeof(IpFwRule)])
        return cls(
            n_set=hdr.n_set,
            rulenum=hdr.rulenum,
            obj_list=BaseInsn.parse_insns(data[sizeof(IpFwRule) :], insn_attrs),
        )

    def __bytes__(self):
        act_ofs = 0
        cmd_len = 0
        chunks = []
        for obj in self.obj_list:
            # act_ofs is the word offset of the first action instruction.
            if obj.is_action and act_ofs == 0:
                act_ofs = cmd_len
            obj_bytes = bytes(obj)
            cmd_len += len(obj_bytes) // 4
            chunks.append(obj_bytes)

        hdr = IpFwRule(
            act_ofs=act_ofs,
            cmd_len=cmd_len,
            n_set=self.n_set,
            rulenum=self.rulenum,
        )
        return bytes(hdr) + b"".join(chunks)

    @property
    def obj_name(self):
        return "rule#{}".format(self.rulenum)

    def _print_obj_value(self):
        cmd_len = sum(len(bytes(obj)) for obj in self.obj_list) // 4
        return " set={} cmd_len={}".format(self.n_set, cmd_len)


class CTlvRule(CTlv):
    """Container TLV whose children are rules, each padded to 8 bytes."""

    def __init__(self, obj_type=IpFwTlvType.IPFW_TLV_RULE_LIST, obj_list=None):
        super().__init__(obj_type, obj_list)

    @classmethod
    def _parse(cls, data, attr_map):
        chdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
        rule_list = []
        off = sizeof(IpFwObjCTlv)
        while off + sizeof(IpFwRule) <= len(data):
            hdr = IpFwRule.from_buffer_copy(data[off : off + sizeof(IpFwRule)])
            rule_len = sizeof(IpFwRule) + hdr.cmd_len * 4
            if off + rule_len > len(data):
                raise ValueError("wrong rule size")
            rule_list.append(RawRule.from_bytes(data[off : off + rule_len]))
            # The next rule starts at the following 8-byte aligned offset.
            off += align8(rule_len)
        if off != len(data):
            raise ValueError("rule bytes left: off={} len={}".format(off, len(data)))
        return cls(chdr.head.n_type, obj_list=rule_list)

    # XXX: _validate

    def __bytes__(self):
        chunks = []
        for rule in self.obj_list:
            rule_bytes = bytes(rule)
            remainder = len(rule_bytes) % 8
            if remainder > 0:
                # Zero-pad every rule up to a multiple of 8 bytes.
                rule_bytes += b"\0" * (8 - remainder)
            chunks.append(rule_bytes)
        payload = b"".join(chunks)
        hdr = IpFwObjCTlv(
            head=IpFwObjTlv(
                n_type=self.obj_type, length=len(payload) + sizeof(IpFwObjCTlv)
            ),
            count=len(self.obj_list),
        )
        return bytes(hdr) + payload


class BaseIpFwMessage(object):
    """An op3 message: an ``IpFw3OpHeader`` followed by a list of TLVs.

    Subclasses set ``messages`` (the opcodes they represent) and
    ``attr_map`` (how to parse their TLVs).
    """

    messages = []
    # Default to an empty map so that from_bytes() also works on classes
    # without a dedicated map; every TLV is then parsed as UnknownTlv.
    attr_map = {}

    def __init__(self, msg_type, obj_list=None):
        """Store the opcode, given either as an Enum member or an int."""
        if isinstance(msg_type, Enum):
            self.obj_type = msg_type.value
            self._enum = msg_type
        else:
            self.obj_type = msg_type
            self._enum = enum_from_int(self.messages, self.obj_type)
        self.obj_list = []
        if obj_list:
            self.obj_list.extend(obj_list)

    def add_obj(self, obj):
        """Append a TLV to the message."""
        self.obj_list.append(obj)

    def get_obj(self, obj_type):
        """Return the first TLV of type ``obj_type`` (Enum or int), or None."""
        obj_type_raw = _enum_or_int(obj_type)
        for obj in self.obj_list:
            if obj.obj_type == obj_type_raw:
                return obj
        return None

    @staticmethod
    def parse_header(data: bytes):
        """Return ``(header, header_size)`` parsed from the start of ``data``.

        Raises:
            ValueError: when ``data`` is shorter than the header.
        """
        if len(data) < sizeof(IpFw3OpHeader):
            raise ValueError("length less than op3 message header")
        return IpFw3OpHeader.from_buffer_copy(data), sizeof(IpFw3OpHeader)

    def parse_obj_list(self, data: bytes):
        """Split ``data`` into raw TLVs and append them to the message.

        The TLVs are not interpreted; each one is stored as ``UnknownTlv``.

        Raises:
            ValueError: when a TLV header is truncated or its length is
                inconsistent with the buffer.
        """
        hdr_size = sizeof(IpFwObjTlv)
        off = 0
        while off < len(data):
            hdr = IpFwObjTlv.from_buffer_copy(data[off : off + hdr_size])
            if hdr.length < hdr_size:
                # Guards against a zero length, which would loop forever.
                raise ValueError("TLV too short")
            if hdr.length + off > len(data):
                raise ValueError("TLV too big")
            self.add_obj(UnknownTlv(hdr.n_type, data[off : off + hdr.length]))
            off += hdr.length

    def is_type(self, msg_type):
        """Tell whether the message opcode equals ``msg_type`` (Enum or int)."""
        return _enum_or_int(msg_type) == self.obj_type

    @property
    def obj_name(self):
        """Symbolic opcode name, or ``msg#<opcode>`` when it is unknown."""
        if self._enum is not None:
            return self._enum.name
        return "msg#{}".format(self.obj_type)

    def print_hdr(self, prepend=""):
        """Print a one-line summary of the message."""
        print("{}len={}, type={}".format(prepend, len(bytes(self)), self.obj_name))

    @classmethod
    def from_bytes(cls, data):
        """Parse a complete message from ``data``.

        Raises:
            ValueError: when the header or any TLV is malformed.  A header
                failure is also reported on stdout with a dump of ``data``.
        """
        try:
            hdr, hdrlen = cls.parse_header(data)
        except ValueError as e:
            print("Failed to parse op3 header: {}".format(e))
            cls.print_as_bytes(data, "invalid op3 message")
            raise
        self = cls(hdr.opcode)
        self._orig_data = data
        tlv_list = Tlv.parse_tlvs(data[hdrlen:], self.attr_map)
        self.obj_list.extend(tlv_list)
        return self

    def __bytes__(self):
        ret = bytes(IpFw3OpHeader(opcode=self.obj_type))
        return ret + b"".join(bytes(obj) for obj in self.obj_list)

    def print_obj(self):
        """Print the message summary followed by all of its TLVs."""
        self.print_hdr()
        for obj in self.obj_list:
            obj.print_obj(" ")

    @staticmethod
    def print_as_bytes(data: bytes, descr: str = ""):
        """Print ``data`` as rows of 16 hex bytes, framed by ``descr``."""
        print("===vv {} (len:{:3d}) vv===".format(descr, len(data)))
        step = 16
        for off in range(0, len(data), step):
            print("".join(" {:02X}".format(b) for b in data[off : off + step]))
        print("--------------------")


# TLV layout of a rule message: a list of named objects followed by a list
# of rules.
rule_attrs = prepare_attrs_map(
    [
        AttrDescr(
            IpFwTlvType.IPFW_TLV_TBLNAME_LIST,
            CTlv,
            [
                AttrDescr(IpFwTlvType.IPFW_TLV_TBL_NAME, NTlv),
                AttrDescr(IpFwTlvType.IPFW_TLV_STATE_NAME, NTlv),
                AttrDescr(IpFwTlvType.IPFW_TLV_EACTION, NTlv),
            ],
            True,
        ),
        AttrDescr(IpFwTlvType.IPFW_TLV_RULE_LIST, CTlvRule),
    ]
)


class IpFwXRule(BaseIpFwMessage):
    """Message wrapper for the ``IP_FW_XADD`` opcode."""

    messages = [Op3CmdType.IP_FW_XADD]
    attr_map = rule_attrs


legacy_classes = []
set3_classes = []
get3_classes = [IpFwXRule]