Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/tools/testing/selftests/drivers/net/lib/py/env.py
49631 views
1
# SPDX-License-Identifier: GPL-2.0
2
3
import os
4
import time
5
from pathlib import Path
6
from lib.py import KsftSkipEx, KsftXfailEx
7
from lib.py import ksft_setup, wait_file
8
from lib.py import cmd, ethtool, ip, CmdExitFailure
9
from lib.py import NetNS, NetdevSimDev
10
from .remote import Remote
11
12
13
class NetDrvEnvBase:
    """
    Base class for a NIC / host environments

    Attributes:
      src_path: Path of the test script, as passed to the constructor
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: process environment merged with the test's net.config (if any),
           as returned by ksft_setup()
      dev: device info dict, must be filled in by inheriting classes
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

        # Following attrs must be set by inheriting classes
        self.dev = None

    @staticmethod
    def _parse_config_line(line):
        """
        Parse one line of a net.config file.

        Everything from the first '#' on is treated as a comment.
        Whitespace around the key and the value is ignored, so
        "KEY = value" and "KEY=value" are equivalent. Only the first '='
        separates key from value, the value itself may contain '='.

        Returns a (key, value) tuple, or None if the line holds nothing
        but whitespace and/or a comment.

        Raises Exception if the line has no '=' or has an empty key.
        """
        content = line.split("#", maxsplit=1)[0].strip()
        if not content:
            return None

        key, sep, value = content.partition("=")
        key = key.strip()
        # An empty key can never be a valid environment variable name,
        # report it here rather than let it propagate into the env.
        if not sep or not key:
            raise Exception("Can't parse configuration line:", line)
        return key, value.strip()

    def _load_env_file(self):
        """
        Build the environment for the test.

        Starts from a copy of os.environ and overlays the KEY=VALUE pairs
        from the "net.config" file located next to the test script, if
        that file exists. The result is passed through ksft_setup().
        """
        env = os.environ.copy()

        config = Path(self.src_path).parent.resolve() / "net.config"
        if config.exists():
            with open(config, 'r') as fp:
                for line in fp:
                    pair = self._parse_config_line(line)
                    if pair is not None:
                        env[pair[0]] = pair[1]
        return ksft_setup(env)

    def __del__(self):
        # Nothing to release in the base class, subclasses override this
        pass

    def __enter__(self):
        """
        __enter__ gets called at the start of a "with" block.
        Bring the device up and wait until its carrier file reads "1".
        """
        ip(f"link set dev {self.dev['ifname']} up")
        wait_file(f"/sys/class/net/{self.dev['ifname']}/carrier",
                  lambda x: x.strip() == "1")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        # Run the (subclass) cleanup right away rather than waiting
        # for the object to be garbage collected.
        self.__del__()
69
70
71
class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end

    If NETIF is present in the environment the named interface is used,
    otherwise a netdevsim device is created (extra keyword arguments are
    passed to NetdevSimDev).

    nsim_test restricts where the test may run: True means netdevsim
    only, False means real devices only, None (default) means either.
    A mismatch raises KsftXfailEx.
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        super().__init__(src_path)

        # netdevsim device we created, None when running on a real NIC
        self._ns = None

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __del__(self):
        # __del__ also runs for objects whose __init__ raised before
        # _ns was assigned (e.g. broken net.config), hence the getattr.
        ns = getattr(self, "_ns", None)
        if ns:
            ns.remove()
            self._ns = None
98
99
100
class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates two network namespaces and a pair
    of netdevsim devices.

    nsim_test restricts where the test may run: True means netdevsim
    only, False means real devices only, None (default) means either.
    A mismatch raises KsftXfailEx.
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        super().__init__(src_path)

        self._stats_settle_time = None
        # Cache of command availability checks: {command: {"local"/"remote": bool}}
        self._required_cmd = {}

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        self.addr_v = { "4": None, "6": None }
        self.remote_addr_v = { "4": None, "6": None }

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever it is configured
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" \
            if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()
        self.remote_dev = ip("-d link show dev " + self.remote_ifname,
                             host=self.remote, json=True)[0]

    def create_local(self):
        """
        Create the local test setup: a new netns, one netdevsim in the
        current netns and one in the new netns, linked to each other,
        with the nsim_v4_pfx / nsim_v6_pfx addresses .1 (local) and
        .2 (peer) assigned and both links set up.
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # link_device takes "<netns fd>:<ifindex>" pairs, the namespace
        # files have to stay open while the request is written.
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f"   addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f"   link set dev {self._ns.nsims[0].ifname} up")

        ip(f"   addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f"   link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration needed to run against a real device.

        Raises Exception listing the missing settings if any are absent
        or if the local / remote address families do not match.
        """
        # Each entry is a list of alternatives, at least one of which
        # has to be present in the environment.
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            if not any(entry in self.env for entry in choice):
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface which carries the
        configured remote address(es).

        Raises Exception if no interface matches, if more than one
        interface matches, or if the IPv4 and IPv6 addresses are on
        different interfaces.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        # Without this check an unconfigured remote address would surface
        # as an opaque TypeError / IndexError from the indexing below.
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface has the remote address")
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __del__(self):
        # __del__ also runs for objects whose __init__ raised before the
        # attributes below were assigned (e.g. broken net.config), so
        # look them up with getattr instead of assuming they exist.
        if getattr(self, "_ns", None):
            self._ns.remove()
            self._ns = None
        if getattr(self, "_ns_peer", None):
            self._ns_peer.remove()
            self._ns_peer = None
        if getattr(self, "_netns", None):
            # Drop our reference, then restore the attribute so that
            # running the cleanup again is harmless.
            del self._netns
            self._netns = None
        if getattr(self, "remote", None):
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Raise KsftSkipEx unless both the local and the remote end have
        an address of the given IP version ("4" or "6").
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def require_nsim(self):
        """
        Raise KsftXfailEx unless the environment runs on netdevsim.
        """
        if self._ns is None:
            raise KsftXfailEx("Test only works on netdevsim")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command comm exists on host (local if None).
        The result is cached per command under key ("local" / "remote").
        """
        cached = self._required_cmd.setdefault(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Raise KsftSkipEx if command comm is not available locally
        (when local is set) or on the remote endpoint (when remote is set).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # Devices without coalescing support just get the base delay
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 msec base delay plus the reported stats block period (usecs)
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)
288
289