Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/atf_python/sys/netpfil/ipfw/ipfw.py
39536 views
1
#!/usr/bin/env python3
2
import os
3
import socket
4
import struct
5
import subprocess
6
import sys
7
from ctypes import c_byte
8
from ctypes import c_char
9
from ctypes import c_int
10
from ctypes import c_long
11
from ctypes import c_uint32
12
from ctypes import c_uint8
13
from ctypes import c_ulong
14
from ctypes import c_ushort
15
from ctypes import sizeof
16
from ctypes import Structure
17
from enum import Enum
18
from typing import Any
19
from typing import Dict
20
from typing import List
21
from typing import NamedTuple
22
from typing import Optional
23
from typing import Union
24
25
from atf_python.sys.netpfil.ipfw.ioctl import get3_classes
26
from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes
27
from atf_python.sys.netpfil.ipfw.ioctl import set3_classes
28
from atf_python.sys.netpfil.ipfw.utils import AttrDescr
29
from atf_python.sys.netpfil.ipfw.utils import enum_from_int
30
from atf_python.sys.netpfil.ipfw.utils import enum_or_int
31
from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
32
33
34
class DebugHeader(Structure):
35
_fields_ = [
36
("cmd_type", c_ushort),
37
("spare1", c_ushort),
38
("opt_name", c_uint32),
39
("total_len", c_uint32),
40
("spare2", c_uint32),
41
]
42
43
44
class DebugType(Enum):
45
DO_CMD = 1
46
DO_SET3 = 2
47
DO_GET3 = 3
48
49
50
class DebugIoReader(object):
51
HANDLER_CLASSES = {
52
DebugType.DO_CMD: legacy_classes,
53
DebugType.DO_SET3: set3_classes,
54
DebugType.DO_GET3: get3_classes,
55
}
56
57
def __init__(self, ipfw_path):
58
self._msgmap = self.build_msgmap()
59
self.ipfw_path = ipfw_path
60
61
def build_msgmap(self):
62
xmap = {}
63
for debug_type, handler_classes in self.HANDLER_CLASSES.items():
64
debug_type = enum_or_int(debug_type)
65
if debug_type not in xmap:
66
xmap[debug_type] = {}
67
for handler_class in handler_classes:
68
for msg in handler_class.messages:
69
xmap[debug_type][enum_or_int(msg)] = handler_class
70
return xmap
71
72
def print_obj_header(self, hdr):
73
debug_type = "#{}".format(hdr.cmd_type)
74
for _type in self.HANDLER_CLASSES.keys():
75
if _type.value == hdr.cmd_type:
76
debug_type = _type.name.lower()
77
break
78
print(
79
"@@ record for {} len={} optname={}".format(
80
debug_type, hdr.total_len, hdr.opt_name
81
)
82
)
83
84
def parse_record(self, data):
85
hdr = DebugHeader.from_buffer_copy(data[: sizeof(DebugHeader)])
86
data = data[sizeof(DebugHeader) :]
87
cls = self._msgmap[hdr.cmd_type].get(hdr.opt_name)
88
if cls is not None:
89
return cls.from_bytes(data)
90
raise ValueError(
91
"unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name)
92
)
93
94
def get_record_from_stdin(self):
95
data = sys.stdin.buffer.peek(sizeof(DebugHeader))
96
if len(data) == 0:
97
return None
98
99
hdr = DebugHeader.from_buffer_copy(data)
100
data = sys.stdin.buffer.read(hdr.total_len)
101
return self.parse_record(data)
102
103
def get_records_from_buffer(self, data):
104
off = 0
105
ret = []
106
while off + sizeof(DebugHeader) <= len(data):
107
hdr = DebugHeader.from_buffer_copy(data[off : off + sizeof(DebugHeader)])
108
ret.append(self.parse_record(data[off : off + hdr.total_len]))
109
off += hdr.total_len
110
return ret
111
112
def run_ipfw(self, cmd: str) -> bytes:
113
args = [self.ipfw_path, "-xqn"] + cmd.split()
114
r = subprocess.run(args, capture_output=True)
115
return r.stdout
116
117
def get_records(self, cmd: str):
118
return self.get_records_from_buffer(self.run_ipfw(cmd))
119
120