Path: blob/main/tests/atf_python/sys/netlink/message.py
39553 views
#!/usr/local/bin/python3
import struct
from ctypes import sizeof
from enum import Enum
from typing import List
from typing import NamedTuple
from typing import Tuple

from atf_python.sys.netlink.attrs import NlAttr
from atf_python.sys.netlink.attrs import NlAttrNested
from atf_python.sys.netlink.base_headers import NlmAckFlags
from atf_python.sys.netlink.base_headers import NlmNewFlags
from atf_python.sys.netlink.base_headers import NlmGetFlags
from atf_python.sys.netlink.base_headers import NlmDeleteFlags
from atf_python.sys.netlink.base_headers import NlmBaseFlags
from atf_python.sys.netlink.base_headers import Nlmsghdr
from atf_python.sys.netlink.base_headers import NlMsgType
from atf_python.sys.netlink.utils import align4
from atf_python.sys.netlink.utils import enum_or_int
from atf_python.sys.netlink.utils import get_bitmask_str


class NlMsgCategory(Enum):
    """Message category used to pick the flag enum for pretty-printing."""

    UNKNOWN = 0
    GET = 1
    NEW = 2
    DELETE = 3
    ACK = 4


class NlMsgProps(NamedTuple):
    """Pairs a message-type enum member with its category."""

    msg: Enum
    category: NlMsgCategory


class BaseNetlinkMessage(object):
    """A netlink message consisting of the generic header only.

    Holds the ``Nlmsghdr``, a list of attributes and the helper object that
    supplies the sequence number and pid written into the header.
    """

    def __init__(self, helper, nlmsg_type):
        """Create a message of ``nlmsg_type`` (enum member or int).

        The header's sequence number comes from ``helper.get_seq()`` and
        its pid from ``helper.pid``.
        """
        self.nlmsg_type = enum_or_int(nlmsg_type)
        self.nla_list = []
        self._orig_data = None
        self.helper = helper
        self.nl_hdr = Nlmsghdr(
            nlmsg_type=self.nlmsg_type,
            nlmsg_seq=helper.get_seq(),
            nlmsg_pid=helper.pid,
        )
        self.base_hdr = None

    def set_request(self, need_ack=True):
        """Set NLM_F_REQUEST and, when ``need_ack`` is true, NLM_F_ACK."""
        self.add_nlflags([NlmBaseFlags.NLM_F_REQUEST])
        if need_ack:
            self.add_nlflags([NlmBaseFlags.NLM_F_ACK])

    def add_nlflags(self, flags: List):
        """OR every flag in ``flags`` (enum members or ints) into the header."""
        int_flags = 0
        for flag in flags:
            int_flags |= enum_or_int(flag)
        self.nl_hdr.nlmsg_flags |= int_flags

    def add_nla(self, nla):
        """Append attribute ``nla`` to the message."""
        self.nla_list.append(nla)

    def _get_nla(self, nla_list, nla_type):
        """Return the first attribute in ``nla_list`` of ``nla_type``, or None."""
        nla_type_raw = enum_or_int(nla_type)
        for nla in nla_list:
            if nla.nla_type == nla_type_raw:
                return nla
        return None

    def get_nla(self, nla_type):
        """Return the first top-level attribute of ``nla_type``, or None."""
        return self._get_nla(self.nla_list, nla_type)

    @staticmethod
    def parse_nl_header(data: bytes):
        """Decode the netlink header at the start of ``data``.

        Returns:
            A ``(Nlmsghdr, header_size)`` tuple.

        Raises:
            ValueError: ``data`` is shorter than ``sizeof(Nlmsghdr)``.
        """
        if len(data) < sizeof(Nlmsghdr):
            raise ValueError("length less than netlink message header")
        return Nlmsghdr.from_buffer_copy(data), sizeof(Nlmsghdr)

    def is_type(self, nlmsg_type):
        """Return True if this message's header type equals ``nlmsg_type``."""
        nlmsg_type_raw = enum_or_int(nlmsg_type)
        return nlmsg_type_raw == self.nl_hdr.nlmsg_type

    def is_reply(self, hdr):
        """Return True if ``hdr`` carries the NLMSG_ERROR message type."""
        return hdr.nlmsg_type == NlMsgType.NLMSG_ERROR.value

    def _get_msg_type(self):
        """Return the raw message type stored in the netlink header."""
        # Defined on the base class because msg_name below relies on it.
        return self.nl_hdr.nlmsg_type

    @property
    def msg_name(self):
        """Generic printable name: ``msg#<numeric type>``."""
        return "msg#{}".format(self._get_msg_type())

    def _get_nl_category(self):
        """Return ACK for NLMSG_ERROR messages, UNKNOWN otherwise."""
        if self.is_reply(self.nl_hdr):
            return NlMsgCategory.ACK
        return NlMsgCategory.UNKNOWN

    def get_nlm_flags_str(self):
        """Render the header flags as text using the category's flag enum.

        UNKNOWN-category messages are rendered with the base flags only,
        through ``self.helper.get_bitmask_str``; the other categories add
        their category-specific flag enum.
        """
        category = self._get_nl_category()
        flags = self.nl_hdr.nlmsg_flags

        category_flags = {
            NlMsgCategory.GET: NlmGetFlags,
            NlMsgCategory.NEW: NlmNewFlags,
            NlMsgCategory.DELETE: NlmDeleteFlags,
            NlMsgCategory.ACK: NlmAckFlags,
        }
        flags_enum = category_flags.get(category)
        if flags_enum is None:
            return self.helper.get_bitmask_str(NlmBaseFlags, flags)
        return get_bitmask_str([NlmBaseFlags, flags_enum], flags)

    def print_nl_header(self, prepend=""):
        """Print a one-line summary of the netlink header to stdout."""
        # len=44, type=RTM_DELROUTE, flags=NLM_F_REQUEST|NLM_F_ACK, seq=1641163704, pid=0  # noqa: E501
        hdr = self.nl_hdr
        print(
            "{}len={}, type={}, flags={}(0x{:X}), seq={}, pid={}".format(
                prepend,
                hdr.nlmsg_len,
                self.msg_name,
                self.get_nlm_flags_str(),
                hdr.nlmsg_flags,
                hdr.nlmsg_seq,
                hdr.nlmsg_pid,
            )
        )

    @classmethod
    def from_bytes(cls, helper, data):
        """Build a message from raw ``data``, decoding only the nl header.

        Raises:
            ValueError: the header cannot be parsed; a hex dump of ``data``
                is printed before the exception is re-raised.
        """
        try:
            hdr, _ = BaseNetlinkMessage.parse_nl_header(data)
            self = cls(helper, hdr.nlmsg_type)
            self._orig_data = data
            self.nl_hdr = hdr
        except ValueError as e:
            print("Failed to parse nl header: {}".format(e))
            # A label is required here: calling without it used to raise
            # TypeError and hide the original parse error.
            cls.print_as_bytes(data, "msg dump")
            raise
        return self

    def print_message(self):
        """Print the message (header only for the base class)."""
        self.print_nl_header()

    @staticmethod
    def print_as_bytes(data: bytes, descr: str = "msg dump"):
        """Hex-dump ``data`` to stdout, 16 bytes per row, labelled ``descr``."""
        print("===vv {} (len:{:3d}) vv===".format(descr, len(data)))
        step = 16
        for off in range(0, len(data), step):
            row = data[off : off + step]
            print("".join(" {:02X}".format(byte) for byte in row))
        print("--------------------")


class StdNetlinkMessage(BaseNetlinkMessage):
    """Netlink message with a family-specific base header and attributes.

    ``from_bytes`` calls ``self.parse_base_header``, which is not defined in
    this file and has to be supplied by a subclass. ``msg_props`` reads an
    optional ``messages`` attribute (iterable of ``NlMsgProps``-like items).
    """

    nl_attrs_map = {}

    @classmethod
    def from_bytes(cls, helper, data):
        """Parse the nl header, the base header and all attributes.

        Raises:
            ValueError: any of the three parts is malformed, or bytes are
                left over after the attribute list. Diagnostic dumps are
                printed before the exception is re-raised.
        """
        # Netlink header handling (including its error reporting) is shared
        # with the base class.
        self = super().from_bytes(helper, data)

        offset = align4(sizeof(Nlmsghdr))
        try:
            base_hdr, hdrlen = self.parse_base_header(data[offset:])
            self.base_hdr = base_hdr
            offset += align4(hdrlen)
            # XXX: CAP_ACK
        except ValueError as e:
            print("Failed to parse nl rt header: {}".format(e))
            cls.print_as_bytes(data, "msg dump")
            raise

        orig_offset = offset
        try:
            nla_list, nla_len = self.parse_nla_list(data[offset:])
            offset += nla_len
            if offset != len(data):
                raise ValueError(
                    "{} bytes left at the end of the packet".format(len(data) - offset)
                )  # noqa: E501
            self.nla_list = nla_list
        except ValueError as e:
            print(
                "Failed to parse nla attributes at offset {}: {}".format(orig_offset, e)
            )  # noqa: E501
            cls.print_as_bytes(data, "msg dump")
            cls.print_as_bytes(data[orig_offset:], "failed block")
            raise
        return self

    @staticmethod
    def _parse_nla_header(data: bytes, off: int) -> Tuple[int, int]:
        """Decode and validate the 4-byte attribute header at ``off``.

        Returns:
            ``(nla_len, raw_nla_type)`` as stored in the header.

        Raises:
            ValueError: the declared length is shorter than the header
                itself or runs past the end of ``data``.
        """
        nla_len, raw_nla_type = struct.unpack_from("@HH", data, off)
        if nla_len < 4:
            # Without this check a zero length never advances the offset
            # and the parsing loops would spin forever.
            raise ValueError(
                "attr length {} < than the attribute header length 4".format(nla_len)
            )
        if nla_len + off > len(data):
            raise ValueError(
                "attr length {} > than the remaining length {}".format(
                    nla_len, len(data) - off
                )
            )
        return nla_len, raw_nla_type

    def parse_child(self, data: bytes, attr_key, attr_map):
        """Parse ``data`` as attributes and wrap them in an ``NlAttrNested``."""
        attrs, _ = self.parse_attrs(data, attr_map)
        return NlAttrNested(attr_key, attrs)

    def parse_child_array(self, data: bytes, attr_key, attr_map):
        """Parse ``data`` as an array of nested attribute groups.

        Each array element becomes an ``NlAttrNested`` keyed by its own
        (masked) attribute type; the elements are wrapped in one
        ``NlAttrNested`` keyed by ``attr_key``.

        Raises:
            ValueError: an element header declares an invalid length.
        """
        ret = []
        off = 0
        while len(data) - off >= 4:
            nla_len, raw_nla_type = self._parse_nla_header(data, off)
            # Keep the low 14 bits; the two top bits are not part of the type.
            nla_type = raw_nla_type & 0x3FFF
            val = self.parse_child(data[off + 4 : off + nla_len], nla_type, attr_map)
            ret.append(val)
            off += align4(nla_len)
        return NlAttrNested(attr_key, ret)

    def parse_attrs(self, data: bytes, attr_map):
        """Parse consecutive attributes from ``data`` according to ``attr_map``.

        ``attr_map`` maps a masked attribute type to a dict with an ``"ad"``
        descriptor (providing ``cls`` and ``val``), and optionally
        ``"child"`` (a nested map) and ``"is_array"``. Types missing from
        the map are returned as plain ``NlAttr`` holding the raw payload.

        Returns:
            ``(attributes, consumed)`` where ``consumed`` is the 4-byte
            aligned number of bytes walked.

        Raises:
            ValueError: an attribute header declares an invalid length.
        """
        ret = []
        off = 0
        while len(data) - off >= 4:
            nla_len, raw_nla_type = self._parse_nla_header(data, off)
            # Keep the low 14 bits; the two top bits are not part of the type.
            nla_type = raw_nla_type & 0x3FFF
            if nla_type in attr_map:
                v = attr_map[nla_type]
                # NOTE(review): for nested attributes this result is replaced
                # below; the call is kept in case from_bytes validates the
                # payload -- confirm before removing.
                val = v["ad"].cls.from_bytes(data[off : off + nla_len], v["ad"].val)
                if "child" in v:
                    # nested
                    child_data = data[off + 4 : off + nla_len]
                    if v.get("is_array", False):
                        # Array of nested attributes
                        val = self.parse_child_array(
                            child_data, v["ad"].val, v["child"]
                        )
                    else:
                        val = self.parse_child(child_data, v["ad"].val, v["child"])
            else:
                # unknown attribute
                val = NlAttr(raw_nla_type, data[off + 4 : off + nla_len])
            ret.append(val)
            off += align4(nla_len)
        return ret, off

    def parse_nla_list(self, data: bytes) -> Tuple[List[NlAttr], int]:
        """Parse top-level attributes using the class's ``nl_attrs_map``."""
        return self.parse_attrs(data, self.nl_attrs_map)

    def __bytes__(self):
        """Serialize the message; also updates ``nl_hdr.nlmsg_len``."""
        payload = bytes(self.base_hdr) + b"".join(bytes(nla) for nla in self.nla_list)
        self.nl_hdr.nlmsg_len = len(payload) + sizeof(Nlmsghdr)
        return bytes(self.nl_hdr) + payload

    @property
    def msg_props(self):
        """Return the ``messages`` entry matching this message type, or None."""
        msg_type = self._get_msg_type()
        # ``messages`` is not defined in this file; treat a missing attribute
        # as "no known message types" instead of raising AttributeError.
        for msg_props in getattr(self, "messages", ()):
            if msg_props.msg.value == msg_type:
                return msg_props
        return None

    @property
    def msg_name(self):
        """Symbolic message name when known, else the generic ``msg#N``."""
        msg_props = self.msg_props
        if msg_props is not None:
            return msg_props.msg.name
        return super().msg_name

    def print_base_header(self, hdr, prepend=""):
        """Hook for printing the base header; does nothing here."""
        pass

    def print_message(self):
        """Print the nl header, the base header and every attribute."""
        self.print_nl_header()
        self.print_base_header(self.base_hdr, " ")
        for nla in self.nla_list:
            nla.print_attr(" ")