Book a Demo!
CoCalc Logo Icon
Store | Features | Docs | Share | Support | News | About | Policies | Sign Up | Sign In
freebsd
GitHub Repository: freebsd/freebsd-src
Path: blob/main/tests/atf_python/sys/netlink/netlink_generic.py
39553 views
1
#!/usr/local/bin/python3
2
import struct
3
from ctypes import c_int64
4
from ctypes import c_long
5
from ctypes import sizeof
6
from ctypes import Structure
7
from enum import Enum
8
9
from atf_python.sys.netlink.attrs import NlAttr
10
from atf_python.sys.netlink.attrs import NlAttrIp4
11
from atf_python.sys.netlink.attrs import NlAttrIp6
12
from atf_python.sys.netlink.attrs import NlAttrNested
13
from atf_python.sys.netlink.attrs import NlAttrS32
14
from atf_python.sys.netlink.attrs import NlAttrStr
15
from atf_python.sys.netlink.attrs import NlAttrU16
16
from atf_python.sys.netlink.attrs import NlAttrU32
17
from atf_python.sys.netlink.attrs import NlAttrU8
18
from atf_python.sys.netlink.base_headers import GenlMsgHdr
19
from atf_python.sys.netlink.message import NlMsgCategory
20
from atf_python.sys.netlink.message import NlMsgProps
21
from atf_python.sys.netlink.message import StdNetlinkMessage
22
from atf_python.sys.netlink.utils import AttrDescr
23
from atf_python.sys.netlink.utils import enum_or_int
24
from atf_python.sys.netlink.utils import prepare_attrs_map
25
26
27
class NetlinkGenlMessage(StdNetlinkMessage):
    """Base class for generic netlink (genl) messages.

    Every message carries a ``GenlMsgHdr`` as its base header.  Subclasses
    override the class attributes below to describe one genl family.
    """

    # NlMsgProps entries for the commands handled by this message class.
    messages = []
    # Attribute map for the family (subclasses assign a prepare_attrs_map()
    # result here).
    nl_attrs_map = {}
    # Generic netlink family name; printed by print_nl_header().
    family_name = None

    def __init__(self, helper, family_id, cmd=0):
        """Create a message and attach a ``GenlMsgHdr`` for ``cmd``.

        ``helper`` and ``family_id`` are forwarded unchanged to
        ``StdNetlinkMessage.__init__``.  ``cmd`` is passed through
        ``enum_or_int`` before being stored in the header.
        """
        super().__init__(helper, family_id)
        self.base_hdr = GenlMsgHdr(cmd=enum_or_int(cmd))

    def parse_base_header(self, data):
        """Parse a ``GenlMsgHdr`` from the start of ``data``.

        Returns a ``(header, consumed_bytes)`` tuple where ``consumed_bytes``
        is ``sizeof(GenlMsgHdr)``.

        Raises ValueError if ``data`` is shorter than the header.
        """
        if len(data) < sizeof(GenlMsgHdr):
            raise ValueError("length less than GenlMsgHdr header")
        ghdr = GenlMsgHdr.from_buffer_copy(data)
        return (ghdr, sizeof(GenlMsgHdr))

    def _get_msg_type(self):
        """Return the command number stored in the genl base header."""
        return self.base_hdr.cmd

    def print_nl_header(self, prepend=""):
        """Print the netlink header fields on one line, prefixed by ``prepend``.

        The family name is printed in place of a message type; flags are
        shown both as a string and in hexadecimal.
        """
        hdr = self.nl_hdr
        print(
            "{}len={}, family={}, flags={}(0x{:X}), seq={}, pid={}".format(
                prepend,
                hdr.nlmsg_len,
                self.family_name,
                self.get_nlm_flags_str(),
                hdr.nlmsg_flags,
                hdr.nlmsg_seq,
                hdr.nlmsg_pid,
            )
        )

    def print_base_header(self, hdr, prepend=""):
        """Print the genl header ``hdr`` on one line, prefixed by ``prepend``.

        The command is shown as ``self.msg_name``, which is not defined in
        this class (presumably provided by the base class -- verify there).
        """
        print(
            "{}cmd={} version={} reserved={}".format(
                prepend, self.msg_name, hdr.version, hdr.reserved
            )
        )
66
67
68
# Family name used by NetlinkGenlCtrlMessage and as a handler_classes key.
GenlCtrlFamilyName = "nlctrl"
69
70
71
class GenlCtrlMsgType(Enum):
    """Command numbers (CTRL_CMD_*) of the "nlctrl" generic netlink family."""

    CTRL_CMD_UNSPEC = 0
    CTRL_CMD_NEWFAMILY = 1
    CTRL_CMD_DELFAMILY = 2
    CTRL_CMD_GETFAMILY = 3
    CTRL_CMD_NEWOPS = 4
    CTRL_CMD_DELOPS = 5
    CTRL_CMD_GETOPS = 6
    CTRL_CMD_NEWMCAST_GRP = 7
    CTRL_CMD_DELMCAST_GRP = 8
    CTRL_CMD_GETMCAST_GRP = 9
    CTRL_CMD_GETPOLICY = 10
83
84
85
class GenlCtrlAttrType(Enum):
    """Top-level attribute types (CTRL_ATTR_*) of the "nlctrl" family."""

    CTRL_ATTR_FAMILY_ID = 1
    CTRL_ATTR_FAMILY_NAME = 2
    CTRL_ATTR_VERSION = 3
    CTRL_ATTR_HDRSIZE = 4
    CTRL_ATTR_MAXATTR = 5
    CTRL_ATTR_OPS = 6
    CTRL_ATTR_MCAST_GROUPS = 7
    CTRL_ATTR_POLICY = 8
    CTRL_ATTR_OP_POLICY = 9
    CTRL_ATTR_OP = 10
96
97
98
class GenlCtrlAttrOpType(Enum):
    """Attribute types nested inside ``CTRL_ATTR_OPS`` entries."""

    CTRL_ATTR_OP_ID = 1
    CTRL_ATTR_OP_FLAGS = 2
101
102
103
class GenlCtrlAttrMcastGroupsType(Enum):
    """Attribute types nested inside ``CTRL_ATTR_MCAST_GROUPS`` entries."""

    CTRL_ATTR_MCAST_GRP_NAME = 1
    CTRL_ATTR_MCAST_GRP_ID = 2
106
107
108
# Attribute map for the "nlctrl" family: the top-level CTRL_ATTR_* types plus
# the attribute types nested inside CTRL_ATTR_OPS and CTRL_ATTR_MCAST_GROUPS.
# CTRL_ATTR_POLICY, CTRL_ATTR_OP_POLICY and CTRL_ATTR_OP have no entry here.
# NOTE(review): the trailing ``True`` on the two nested descriptors presumably
# marks them as arrays of nested entries -- confirm against AttrDescr.
genl_ctrl_attrs = prepare_attrs_map(
    [
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16),
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_NAME, NlAttrStr),
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32),
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32),
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32),
        AttrDescr(
            GenlCtrlAttrType.CTRL_ATTR_OPS,
            NlAttrNested,
            [
                AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_ID, NlAttrU32),
                AttrDescr(GenlCtrlAttrOpType.CTRL_ATTR_OP_FLAGS, NlAttrU32),
            ],
            True,
        ),
        AttrDescr(
            GenlCtrlAttrType.CTRL_ATTR_MCAST_GROUPS,
            NlAttrNested,
            [
                AttrDescr(
                    GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_NAME, NlAttrStr
                ),
                AttrDescr(
                    GenlCtrlAttrMcastGroupsType.CTRL_ATTR_MCAST_GRP_ID, NlAttrU32
                ),
            ],
            True,
        ),
    ]
)
139
140
141
class NetlinkGenlCtrlMessage(NetlinkGenlMessage):
    """Generic netlink message for the "nlctrl" family.

    Declares the NEWFAMILY, GETFAMILY and DELFAMILY commands and uses
    ``genl_ctrl_attrs`` as its attribute map.
    """

    messages = [
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_NEWFAMILY, NlMsgCategory.NEW),
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_GETFAMILY, NlMsgCategory.GET),
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_DELFAMILY, NlMsgCategory.DELETE),
    ]
    nl_attrs_map = genl_ctrl_attrs
    family_name = GenlCtrlFamilyName
149
150
151
# Family name used by CarpGenMessage and as a handler_classes key.
CarpFamilyName = "carp"
152
153
154
class CarpMsgType(Enum):
    """Command numbers (CARP_NL_CMD_*) of the "carp" generic netlink family."""

    CARP_NL_CMD_UNSPEC = 0
    CARP_NL_CMD_GET = 1
    CARP_NL_CMD_SET = 2
158
159
160
class CarpAttrType(Enum):
    """Attribute types (CARP_NL_*) of the "carp" generic netlink family."""

    CARP_NL_UNSPEC = 0
    CARP_NL_VHID = 1
    CARP_NL_STATE = 2
    CARP_NL_ADVBASE = 3
    CARP_NL_ADVSKEW = 4
    CARP_NL_KEY = 5
    CARP_NL_IFINDEX = 6
    CARP_NL_ADDR = 7
    CARP_NL_ADDR6 = 8
    CARP_NL_IFNAME = 9
171
172
173
# Attribute map for the "carp" family: one descriptor per CarpAttrType member
# except CARP_NL_UNSPEC.  CARP_NL_KEY uses the plain NlAttr class rather than
# one of the typed attribute classes.
carp_gen_attrs = prepare_attrs_map(
    [
        AttrDescr(CarpAttrType.CARP_NL_VHID, NlAttrU32),
        AttrDescr(CarpAttrType.CARP_NL_STATE, NlAttrU32),
        AttrDescr(CarpAttrType.CARP_NL_ADVBASE, NlAttrS32),
        AttrDescr(CarpAttrType.CARP_NL_ADVSKEW, NlAttrS32),
        AttrDescr(CarpAttrType.CARP_NL_KEY, NlAttr),
        AttrDescr(CarpAttrType.CARP_NL_IFINDEX, NlAttrU32),
        AttrDescr(CarpAttrType.CARP_NL_ADDR, NlAttrIp4),
        AttrDescr(CarpAttrType.CARP_NL_ADDR6, NlAttrIp6),
        AttrDescr(CarpAttrType.CARP_NL_IFNAME, NlAttrStr),
    ]
)
186
187
188
class CarpGenMessage(NetlinkGenlMessage):
    """Generic netlink message for the "carp" family.

    Declares the GET and SET commands (SET is registered under
    ``NlMsgCategory.NEW``) and uses ``carp_gen_attrs`` as its attribute map.
    """

    messages = [
        NlMsgProps(CarpMsgType.CARP_NL_CMD_GET, NlMsgCategory.GET),
        NlMsgProps(CarpMsgType.CARP_NL_CMD_SET, NlMsgCategory.NEW),
    ]
    nl_attrs_map = carp_gen_attrs
    family_name = CarpFamilyName
195
196
197
# Family name shared by KtestInfoMessage and KtestMsgMessage; also a
# handler_classes key.
KtestFamilyName = "ktest"
198
199
200
class KtestMsgType(Enum):
    """Command numbers (KTEST_CMD_*) of the "ktest" generic netlink family."""

    KTEST_CMD_UNSPEC = 0
    KTEST_CMD_LIST = 1
    KTEST_CMD_RUN = 2
    KTEST_CMD_NEWTEST = 3
    KTEST_CMD_NEWMESSAGE = 4
206
207
208
class KtestAttrType(Enum):
    """Attribute types (KTEST_ATTR_*) referenced by ``ktest_info_attrs``."""

    KTEST_ATTR_MOD_NAME = 1
    KTEST_ATTR_TEST_NAME = 2
    KTEST_ATTR_TEST_DESCR = 3
    KTEST_ATTR_TEST_META = 4
213
214
215
class KtestLogMsgType(Enum):
    """KTEST_MSG_* constants: START, END, LOG and FAIL (values 1-4)."""

    KTEST_MSG_START = 1
    KTEST_MSG_END = 2
    KTEST_MSG_LOG = 3
    KTEST_MSG_FAIL = 4
220
221
222
class KtestMsgAttrType(Enum):
    """Attribute types (KTEST_MSG_ATTR_*) referenced by ``ktest_msg_attrs``."""

    KTEST_MSG_ATTR_TS = 1
    KTEST_MSG_ATTR_FUNC = 2
    KTEST_MSG_ATTR_FILE = 3
    KTEST_MSG_ATTR_LINE = 4
    KTEST_MSG_ATTR_TEXT = 5
    KTEST_MSG_ATTR_LEVEL = 6
    KTEST_MSG_ATTR_META = 7
230
231
232
class timespec(Structure):
    """ctypes layout of a timestamp: ``tv_sec`` followed by ``tv_nsec``.

    NOTE(review): ``tv_sec`` is fixed at 64 bits while ``tv_nsec`` uses the
    platform ``long``; presumably this mirrors the kernel's
    ``struct timespec`` -- confirm for 32-bit targets.
    """

    _fields_ = [
        ("tv_sec", c_int64),
        ("tv_nsec", c_long),
    ]
237
238
239
class NlAttrTS(NlAttr):
    """Netlink attribute whose payload is a single ``timespec`` structure."""

    # Payload size in bytes, excluding the attribute header.
    DATA_LEN = sizeof(timespec)

    def __init__(self, nla_type, val):
        """Keep ``val`` as ``self.ts`` and initialise the base attribute.

        The base class receives an empty payload; the serialised payload is
        produced from ``self.ts`` in ``__bytes__``.
        """
        self.ts = val
        super().__init__(nla_type, b"")

    @property
    def nla_len(self):
        """Attribute length: header length plus the fixed payload length."""
        return NlAttr.HDR_LEN + self.DATA_LEN

    def _print_attr_value(self):
        """Return the two timestamp fields formatted as text."""
        return " tv_sec={} tv_nsec={}".format(self.ts.tv_sec, self.ts.tv_nsec)

    @staticmethod
    def _validate(data):
        """Assert that ``data`` is exactly one header plus one timespec.

        Both the buffer length and the length field unpacked from the header
        must equal ``NlAttr.HDR_LEN + DATA_LEN``; a mismatch raises
        AssertionError.
        """
        expected_len = NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
        assert len(data) == expected_len
        nla_len, _ = struct.unpack("@HH", data[: NlAttr.HDR_LEN])
        assert nla_len == expected_len

    @classmethod
    def _parse(cls, data):
        """Build an instance from raw attribute bytes (header + payload)."""
        # Only the type is needed here; the length field is checked separately
        # in _validate().
        _, nla_type = struct.unpack("@HH", data[: NlAttr.HDR_LEN])
        val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN :])
        return cls(nla_type, val)

    def __bytes__(self):
        """Serialise via the base ``_to_bytes`` with the raw timespec bytes."""
        return self._to_bytes(bytes(self.ts))
267
268
269
# Attribute map used by KtestInfoMessage: all three entries are strings.
# KtestAttrType.KTEST_ATTR_TEST_META has no descriptor here.
ktest_info_attrs = prepare_attrs_map(
    [
        AttrDescr(KtestAttrType.KTEST_ATTR_MOD_NAME, NlAttrStr),
        AttrDescr(KtestAttrType.KTEST_ATTR_TEST_NAME, NlAttrStr),
        AttrDescr(KtestAttrType.KTEST_ATTR_TEST_DESCR, NlAttrStr),
    ]
)
276
277
278
# Attribute map used by KtestMsgMessage.  The timestamp attribute is decoded
# with NlAttrTS; KtestMsgAttrType.KTEST_MSG_ATTR_META has no descriptor here.
ktest_msg_attrs = prepare_attrs_map(
    [
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC, NlAttrStr),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FILE, NlAttrStr),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LINE, NlAttrU32),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT, NlAttrStr),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL, NlAttrU8),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TS, NlAttrTS),
    ]
)
288
289
290
class KtestInfoMessage(NetlinkGenlMessage):
    """"ktest" family message for the LIST, RUN and NEWTEST commands.

    Uses ``ktest_info_attrs`` as its attribute map.
    """

    messages = [
        NlMsgProps(KtestMsgType.KTEST_CMD_LIST, NlMsgCategory.GET),
        NlMsgProps(KtestMsgType.KTEST_CMD_RUN, NlMsgCategory.NEW),
        NlMsgProps(KtestMsgType.KTEST_CMD_NEWTEST, NlMsgCategory.NEW),
    ]
    nl_attrs_map = ktest_info_attrs
    family_name = KtestFamilyName
298
299
300
class KtestMsgMessage(NetlinkGenlMessage):
    """"ktest" family message for the NEWMESSAGE command.

    Uses ``ktest_msg_attrs`` as its attribute map.
    """

    messages = [
        NlMsgProps(KtestMsgType.KTEST_CMD_NEWMESSAGE, NlMsgCategory.NEW),
    ]
    nl_attrs_map = ktest_msg_attrs
    family_name = KtestFamilyName
306
307
308
# Maps each generic netlink family name to the list of message classes
# defined for it in this module.  "ktest" has two classes because its
# commands are split between KtestInfoMessage and KtestMsgMessage.
handler_classes = {
    CarpFamilyName: [CarpGenMessage],
    GenlCtrlFamilyName: [NetlinkGenlCtrlMessage],
    KtestFamilyName: [KtestInfoMessage, KtestMsgMessage],
}
313
314