Path: blob/master/tools/testing/selftests/drivers/net/lib/py/env.py
49631 views
# SPDX-License-Identifier: GPL-2.0

import os
import time
from pathlib import Path
from lib.py import KsftSkipEx, KsftXfailEx
from lib.py import ksft_setup, wait_file
from lib.py import cmd, ethtool, ip, CmdExitFailure
from lib.py import NetNS, NetdevSimDev
from .remote import Remote


class NetDrvEnvBase:
    """
    Base class for NIC / host environments.

    Attributes:
      src_path: Path of the test source file, as passed to the constructor
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: result of ksft_setup() on os.environ extended with net.config
      dev: device description, must be filled in by the inheriting class
    """
    def __init__(self, src_path):
        """
        Record the test location and load the test environment.

        src_path is the path of the test source file; the optional
        "net.config" file is looked up in the same directory.
        """
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    def _load_env_file(self):
        """
        Assemble the environment used by the test.

        Starts from a copy of os.environ. If a "net.config" file exists next
        to the test source, every KEY=VALUE line in it is added on top.
        Text after a '#' is treated as a comment and blank lines are skipped.

        Returns the result of ksft_setup() for the assembled environment.
        Raises Exception if a non-empty line contains no '='.
        """
        env = os.environ.copy()

        cfg_path = Path(self.src_path).parent.resolve() / "net.config"
        if not cfg_path.exists():
            return ksft_setup(env)

        with open(cfg_path.as_posix(), 'r') as fp:
            for raw_line in fp:
                line = raw_line
                # Strip comments
                pos = line.find("#")
                if pos >= 0:
                    line = line[:pos]
                line = line.strip()
                if not line:
                    continue
                pair = line.split('=', maxsplit=1)
                if len(pair) != 2:
                    # Report the untouched line so the user can find it
                    raise Exception("Can't parse configuration line:", raw_line)
                env[pair[0]] = pair[1]
        return ksft_setup(env)

    def __del__(self):
        """Nothing to release in the base class; subclasses override this."""
        pass

    def __enter__(self):
        """
        Bring the device up and wait for its carrier file to read "1".

        Relies on self.dev having been set by the inheriting class.
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.

        Cleanup is delegated to __del__ so that both paths share one
        implementation.
        """
        self.__del__()


class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Select the device under test.

        If NETIF is present in the environment that interface is used,
        otherwise a netdevsim device is created with **kwargs.

        nsim_test: True if the test only works on netdevsim, False if it
                   does not work on netdevsim, None if it works on both.
                   A mismatch raises KsftXfailEx.
        """
        super().__init__(src_path)

        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        """Remove the netdevsim device, if this env created one."""
        # getattr: __init__ may have raised before _ns was assigned
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None


class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Set up the local device and the remote endpoint.

        If NETIF is present in the environment, the addresses and the remote
        endpoint description (REMOTE_TYPE / REMOTE_ARGS) are taken from the
        environment, after _check_env() validated it. Otherwise a local
        netdevsim pair is created with create_local() and the remote is the
        peer network namespace.

        nsim_test: True if the test only works on netdevsim, False if it
                   does not work on netdevsim, None if it works on both.
                   A mismatch raises KsftXfailEx.
        """
        super().__init__(src_path)

        self._stats_settle_time = None
        # Cache of command availability, filled in by _require_cmd()
        self._required_cmd = {}

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v = { "4": None, "6": None }
        self.remote_addr_v = { "4": None, "6": None }

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever it is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()
        self.remote_dev = ip("-d link show dev " + self.remote_ifname,
                             host=self.remote, json=True)[0]

    def create_local(self):
        """
        Create the local test topology.

        Creates a network namespace, one netdevsim device in the current
        namespace and one in the new namespace, passes both to netdevsim's
        'link_device' control file, then assigns the nsim_v4_pfx /
        nsim_v6_pfx addresses (.1 / ::1 locally, .2 / ::2 on the peer) and
        brings both interfaces up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The namespace files only need to stay open while their fds are
        # referenced by the control write
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the environment for a real (non-netdevsim) device.

        Each entry of vars_needed is a group of alternatives, at least one
        of which has to be present. In addition the local and the remote end
        must both have (or both lack) an address of a given IP version.

        Raises Exception listing the missing groups.
        """
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            for entry in choice:
                if entry in self.env:
                    break
            else:
                # None of the alternatives is set
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface holding the remote address(es).

        Queries the remote host with "ip addr show to <addr>" for each
        configured IP version.

        Returns the interface name.
        Raises Exception if no interface matches, if more than one interface
        matches, or if the IPv4 and IPv6 addresses are on different
        interfaces.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        # Without this check an empty result ends in a bare IndexError /
        # TypeError on the subscripts below
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface matches")
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        """
        Tear down whatever __init__ / create_local() managed to set up.

        Every attribute is reset to None once released, so calling this more
        than once (e.g. from __exit__ and again on garbage collection) is
        harmless.
        """
        # getattr: __init__ may have raised before the attributes were assigned
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test unless both ends have an address of the IP version.

        ipver is the key used in addr_v / remote_addr_v, i.e. "4" or "6".
        Raises KsftSkipEx if either address is missing.
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """Raise KsftXfailEx unless the env runs on a local netdevsim device."""
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm is available, caching the answer.

        key names the cache slot ("local" or "remote" in require_cmd()),
        host is passed through to cmd(). Returns True if "command -v"
        exits with 0.
        """
        cached = self._required_cmd.setdefault(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test unless command comm is available.

        local / remote select where the command has to exist.
        Raises KsftSkipEx if a requested check fails.
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # Drivers without coalescing support just get the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 ms base delay plus the reported period, converted from usecs
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)