# Path: blob/master/tools/testing/selftests/drivers/net/lib/py/env.py
# 26305 views
# SPDX-License-Identifier: GPL-2.0

import os
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


class NetDrvEnvBase:
    """
    Base class for NIC / host environments

    Attributes:
      src_path: Path of the test source file, as passed to the constructor
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: Environment mapping returned by ksft_setup(), built from the
           process environment plus the optional net.config file
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

    def _load_env_file(self):
        """
        Build the test environment.

        Starts from a copy of os.environ and, if a "net.config" file exists
        next to the test source, overlays the KEY=VALUE pairs found in it.
        Everything after a '#' on a line is treated as a comment, blank
        lines are skipped.

        Returns the result of ksft_setup() on the merged environment.
        Raises Exception if a non-empty line does not contain '='.
        """
        env = os.environ.copy()

        config_path = self.src_path.parent.resolve() / "net.config"
        if config_path.exists():
            with open(config_path.as_posix(), 'r') as fp:
                for raw_line in fp:
                    # Strip comments and surrounding whitespace
                    line = raw_line.split("#", maxsplit=1)[0].strip()
                    if not line:
                        continue
                    pair = line.split('=', maxsplit=1)
                    if len(pair) != 2:
                        # Report the unmodified line to make it easy to find
                        raise Exception("Can't parse configuration line:", raw_line)
                    env[pair[0]] = pair[1]
        return ksft_setup(env)


class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Use the interface named by NETIF in the environment if it is set,
        otherwise create a NetdevSimDev (kwargs are passed to it).

        nsim_test: True  - test only works on netdevsim,
                   False - test does not work on netdevsim,
                   None  - no restriction.
        Raises KsftXfailEx when the restriction does not match the env.
        """
        super().__init__(src_path)

        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        """Bring the device under test up and return the environment."""
        ip(f"link set dev {self.dev['ifname']} up")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        """Remove the netdevsim device, if this env created one."""
        # __del__ may run on a partially constructed object (e.g. when the
        # base class __init__ raised), so don't assume _ns exists.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Use the NETIF / LOCAL_* / REMOTE_* configuration from the environment
        if NETIF is set, otherwise create a local netdevsim based setup.

        nsim_test: True  - test only works on netdevsim,
                   False - test does not work on netdevsim,
                   None  - no restriction.
        Raises KsftXfailEx when the restriction does not match the env,
        Exception when the environment configuration is incomplete.
        """
        super().__init__(src_path)

        self._stats_settle_time = None

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v = {"4": None, "6": None}
        self.remote_addr_v = {"4": None, "6": None}

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # Prefer IPv6 when it is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Cache of command availability checks, see _require_cmd()
        self._required_cmd = {}

    def create_local(self):
        """
        Create the local test setup: a network namespace, one netdevsim
        device in the current namespace and one in the new namespace,
        linked together, with the nsim_v4_pfx / nsim_v6_pfx addresses
        assigned (".1" / "::1" locally, ".2" / "::2" on the peer).
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The namespace files have to stay open while their fds are
        # written to the netdevsim control file
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the environment configuration for a real (non-netdevsim)
        device. Each entry of vars_needed is a list of alternatives, at
        least one of which must be set.

        Raises Exception listing the missing / asymmetric variables.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = [choice for choice in vars_needed
                   if not any(entry in self.env for entry in choice)]

        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface which carries the configured
        remote address(es).

        Returns the interface name.
        Raises Exception if the v4 and v6 addresses are on different
        interfaces, if an address matches multiple interfaces, or if no
        interface matches at all.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if not v4 and not v6:
            # Fail with a readable message rather than a TypeError / IndexError
            # from indexing an empty result below
            raise Exception("Can't resolve remote interface name, no interface matches")
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        """Tear down whatever local state and remote handle was created."""
        # __del__ may run on a partially constructed object (e.g. when the
        # base class __init__ raised), so don't assume the attributes exist.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            # Drop our reference, then restore the attribute for later checks
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test (raise KsftSkipEx) unless both the local and the
        remote address of the given IP version ("4" or "6") are configured.
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm is available on host, caching the answer
        under key (callers in this class use "local" / "remote").

        Returns True if "command -v" succeeded for comm.
        """
        cached = self._required_cmd.setdefault(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test (raise KsftSkipEx) if command comm is not available
        locally (when local is set) or on the remote (when remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # Devices without coalescing support just get the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec base delay plus the reported period (usecs -> secs)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)