Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/tools/testing/selftests/drivers/net/lib/py/env.py
26305 views
1
# SPDX-License-Identifier: GPL-2.0
2
3
import os
4
import time
5
from pathlib import Path
6
from lib.py import KsftSkipEx, KsftXfailEx
7
from lib.py import ksft_setup
8
from lib.py import cmd, ethtool, ip, CmdExitFailure
9
from lib.py import NetNS, NetdevSimDev
10
from .remote import Remote
11
12
13
class NetDrvEnvBase:
    """
    Common base for the NIC / host test environments

    Attributes:
      src_path: Path of the test source file, as handed in by the caller
      test_dir: Path to the source directory of the test
      net_lib_dir: Path to the net/lib directory
      env: Result of ksft_setup() applied to the process environment
           merged with the entries of an optional net.config file
    """
    def __init__(self, src_path):
        self.src_path = Path(src_path)
        self.test_dir = self.src_path.parent.resolve()
        self.net_lib_dir = (Path(__file__).parent / "../../../../net/lib").resolve()

        self.env = self._load_env_file()

    def _load_env_file(self):
        """
        Build the environment used by the test.

        Starts from a copy of os.environ and, if a "net.config" file sits
        next to the test source, overlays every KEY=VALUE line found in it.
        Text following a '#' is treated as a comment, blank lines are
        skipped. The merged dict is passed through ksft_setup() and
        whatever that returns is handed back.

        Raises:
          Exception: a non-empty config line contains no '='.
        """
        merged = os.environ.copy()

        config = Path(self.src_path).parent.resolve() / "net.config"
        if config.exists():
            with open(config.as_posix(), 'r') as cfg:
                for raw_line in cfg:
                    # Drop everything from the first '#' onwards
                    text = raw_line.partition("#")[0].strip()
                    if not text:
                        continue
                    # Only the first '=' separates key from value
                    key, sep, value = text.partition('=')
                    if not sep:
                        raise Exception("Can't parse configuration line:", raw_line)
                    merged[key] = value
        return ksft_setup(merged)
50
51
52
class NetDrvEnv(NetDrvEnvBase):
    """
    Class for a single NIC / host env, with no remote end

    If NETIF is present in the environment the test runs against that
    interface. Otherwise a netdevsim device is created for the test and
    removed again on teardown.

    Attributes:
      dev: Link description of the device under test
      ifname: Interface name taken from dev
      ifindex: Interface index taken from dev
    """
    def __init__(self, src_path, nsim_test=None, **kwargs):
        """
        Args:
          src_path: Path of the test source file (see NetDrvEnvBase)
          nsim_test: None if the test can run anywhere, True if it only
                     works on netdevsim, False if it does not work on
                     netdevsim
          **kwargs: Passed through to NetdevSimDev() when a netdevsim
                    device has to be created

        Raises:
          KsftXfailEx: the nsim_test requirement does not match the
                       environment the test is running in.
        """
        # Has to exist before anything below can raise: __del__ is run
        # for partially constructed objects too and reads this attribute.
        self._ns = None

        super().__init__(src_path)

        if 'NETIF' in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self._ns = NetdevSimDev(**kwargs)
            self.dev = self._ns.nsims[0].dev
        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

    def __enter__(self):
        # Bring the device under test up for the duration of the "with" block
        ip(f"link set dev {self.dev['ifname']} up")

        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # Safe to call more than once, _ns is cleared after removal
        if self._ns:
            self._ns.remove()
            self._ns = None
90
91
92
class NetDrvEpEnv(NetDrvEnvBase):
    """
    Class for an environment with a local device and "remote endpoint"
    which can be used to send traffic in.

    For local testing it creates a pair of linked netdevsim devices,
    one in the current network namespace and its peer in a newly
    created one.

    Attributes:
      dev: Link description of the local device under test
      ifname: Local interface name taken from dev
      ifindex: Local interface index taken from dev
      remote: Remote object used to run commands on the remote endpoint
      remote_ifname: Interface name on the remote endpoint
      addr_v: Local addresses, keyed by IP version ("4" / "6")
      remote_addr_v: Remote addresses, keyed by IP version ("4" / "6")
      addr_ipver: "6" if a local IPv6 address is set, "4" otherwise
      addr: Local address for addr_ipver
      remote_addr: Remote address for addr_ipver
      baddr: Local address, IPv6 wrapped in []
      remote_baddr: Remote address, IPv6 wrapped in []
    """

    # Network prefixes used for local tests
    nsim_v4_pfx = "192.0.2."
    nsim_v6_pfx = "2001:db8::"

    def __init__(self, src_path, nsim_test=None):
        """
        Args:
          src_path: Path of the test source file (see NetDrvEnvBase)
          nsim_test: None if the test can run anywhere, True if it only
                     works on netdevsim, False if it does not work on
                     netdevsim

        Raises:
          KsftXfailEx: the nsim_test requirement does not match the
                       environment the test is running in.
          Exception: the environment is incomplete or the remote
                     interface can't be resolved.
        """
        # Teardown state has to exist before anything below can raise:
        # __del__ is run for partially constructed objects too and reads
        # all of these attributes.

        # Things we try to destroy
        self.remote = None
        # These are for local testing state
        self._netns = None
        self._ns = None
        self._ns_peer = None

        super().__init__(src_path)

        self._stats_settle_time = None

        self.addr_v = { "4": None, "6": None }
        self.remote_addr_v = { "4": None, "6": None }

        if "NETIF" in self.env:
            if nsim_test is True:
                raise KsftXfailEx("Test only works on netdevsim")
            self._check_env()

            self.dev = ip("-d link show dev " + self.env['NETIF'], json=True)[0]

            self.addr_v["4"] = self.env.get("LOCAL_V4")
            self.addr_v["6"] = self.env.get("LOCAL_V6")
            self.remote_addr_v["4"] = self.env.get("REMOTE_V4")
            self.remote_addr_v["6"] = self.env.get("REMOTE_V6")
            kind = self.env["REMOTE_TYPE"]
            args = self.env["REMOTE_ARGS"]
        else:
            if nsim_test is False:
                raise KsftXfailEx("Test does not work on netdevsim")

            self.create_local()

            self.dev = self._ns.nsims[0].dev

            self.addr_v["4"] = self.nsim_v4_pfx + "1"
            self.addr_v["6"] = self.nsim_v6_pfx + "1"
            self.remote_addr_v["4"] = self.nsim_v4_pfx + "2"
            self.remote_addr_v["6"] = self.nsim_v6_pfx + "2"
            kind = "netns"
            args = self._netns.name

        self.remote = Remote(kind, args, src_path)

        # IPv6 is preferred whenever a local IPv6 address is available
        self.addr_ipver = "6" if self.addr_v["6"] else "4"
        self.addr = self.addr_v[self.addr_ipver]
        self.remote_addr = self.remote_addr_v[self.addr_ipver]

        # Bracketed addresses, some commands need IPv6 to be inside []
        self.baddr = f"[{self.addr_v['6']}]" if self.addr_v["6"] else self.addr_v["4"]
        self.remote_baddr = f"[{self.remote_addr_v['6']}]" if self.remote_addr_v["6"] else self.remote_addr_v["4"]

        self.ifname = self.dev['ifname']
        self.ifindex = self.dev['ifindex']

        # resolve remote interface name
        self.remote_ifname = self.resolve_remote_ifc()

        # Per-command cache of require_cmd() lookups
        self._required_cmd = {}

    def create_local(self):
        """
        Set up the netdevsim based local test topology.

        Creates a namespace, one netdevsim device outside and one inside
        of it, links the two and configures the nsim_v4_pfx / nsim_v6_pfx
        addresses (.1 / ::1 locally, .2 / ::2 on the peer).
        """
        self._netns = NetNS()
        self._ns = NetdevSimDev()
        self._ns_peer = NetdevSimDev(ns=self._netns)

        # The namespaces are identified by open fds, so the write has to
        # happen while both files are still open
        with open("/proc/self/ns/net") as nsfd0, \
             open("/var/run/netns/" + self._netns.name) as nsfd1:
            ifi0 = self._ns.nsims[0].ifindex
            ifi1 = self._ns_peer.nsims[0].ifindex
            NetdevSimDev.ctrl_write('link_device',
                                    f'{nsfd0.fileno()}:{ifi0} {nsfd1.fileno()}:{ifi1}')

        ip(f" addr add dev {self._ns.nsims[0].ifname} {self.nsim_v4_pfx}1/24")
        ip(f"-6 addr add dev {self._ns.nsims[0].ifname} {self.nsim_v6_pfx}1/64 nodad")
        ip(f" link set dev {self._ns.nsims[0].ifname} up")

        ip(f" addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v4_pfx}2/24", ns=self._netns)
        ip(f"-6 addr add dev {self._ns_peer.nsims[0].ifname} {self.nsim_v6_pfx}2/64 nodad", ns=self._netns)
        ip(f" link set dev {self._ns_peer.nsims[0].ifname} up", ns=self._netns)

    def _check_env(self):
        """
        Validate the configuration needed to reach a real remote endpoint.

        Raises:
          Exception: a required variable is missing, or a local address
                     is configured without its remote counterpart (or
                     the other way around).
        """
        # Each entry is a group of alternatives, one of them must be set
        vars_needed = [
            ["LOCAL_V4", "LOCAL_V6"],
            ["REMOTE_V4", "REMOTE_V6"],
            ["REMOTE_TYPE"],
            ["REMOTE_ARGS"]
        ]
        missing = []

        for choice in vars_needed:
            for entry in choice:
                if entry in self.env:
                    break
            else:
                # Loop ran to completion, none of the alternatives is set
                missing.append(choice)
        # Make sure v4 / v6 configs are symmetric
        if ("LOCAL_V6" in self.env) != ("REMOTE_V6" in self.env):
            missing.append(["LOCAL_V6", "REMOTE_V6"])
        if ("LOCAL_V4" in self.env) != ("REMOTE_V4" in self.env):
            missing.append(["LOCAL_V4", "REMOTE_V4"])
        if missing:
            raise Exception("Invalid environment, missing configuration:", missing,
                            "Please see tools/testing/selftests/drivers/net/README.rst")

    def resolve_remote_ifc(self):
        """
        Find the name of the remote interface carrying the remote address(es).

        Returns:
          The ifname of the matching interface on the remote endpoint.

        Raises:
          Exception: the v4 and v6 addresses are on different interfaces,
                     more than one interface matches, or nothing matches.
        """
        v4 = v6 = None
        if self.remote_addr_v["4"]:
            v4 = ip("addr show to " + self.remote_addr_v["4"], json=True, host=self.remote)
        if self.remote_addr_v["6"]:
            v6 = ip("addr show to " + self.remote_addr_v["6"], json=True, host=self.remote)
        if v4 and v6 and v4[0]["ifname"] != v6[0]["ifname"]:
            raise Exception("Can't resolve remote interface name, v4 and v6 don't match")
        if (v4 and len(v4) > 1) or (v6 and len(v6) > 1):
            raise Exception("Can't resolve remote interface name, multiple interfaces match")
        # Neither lookup produced a result, fail with a readable message
        # rather than an IndexError / TypeError from the indexing below
        if not v4 and not v6:
            raise Exception("Can't resolve remote interface name, no interface matches")
        return v6[0]["ifname"] if v6 else v4[0]["ifname"]

    def __enter__(self):
        return self

    def __exit__(self, ex_type, ex_value, ex_tb):
        """
        __exit__ gets called at the end of a "with" block.
        """
        self.__del__()

    def __del__(self):
        # Every step clears its attribute, so repeated calls are harmless
        if self._ns:
            self._ns.remove()
            self._ns = None
        if self._ns_peer:
            self._ns_peer.remove()
            self._ns_peer = None
        if self._netns:
            del self._netns
            self._netns = None
        if self.remote:
            del self.remote
            self.remote = None

    def require_ipver(self, ipver):
        """
        Skip the test unless both ends have an address of the given IP
        version, @ipver is "4" or "6".
        """
        if not self.addr_v[ipver] or not self.remote_addr_v[ipver]:
            raise KsftSkipEx(f"Test requires IPv{ipver} connectivity")

    def _require_cmd(self, comm, key, host=None):
        """
        Check whether command @comm is available on @host, the result is
        cached under @key so each command is only probed once per key.
        """
        cached = self._required_cmd.get(comm, {})
        if cached.get(key) is None:
            cached[key] = cmd("command -v -- " + comm, fail=False,
                              shell=True, host=host).ret == 0
        self._required_cmd[comm] = cached
        return cached[key]

    def require_cmd(self, comm, local=True, remote=False):
        """
        Skip the test if command @comm is not available locally (@local)
        and / or on the remote endpoint (@remote).
        """
        if local:
            if not self._require_cmd(comm, "local"):
                raise KsftSkipEx("Test requires command: " + comm)
        if remote:
            if not self._require_cmd(comm, "remote", host=self.remote):
                raise KsftSkipEx("Test requires (remote) command: " + comm)

    def wait_hw_stats_settle(self):
        """
        Wait for HW stats to become consistent, some devices DMA HW stats
        periodically so events won't be reflected until next sync.
        Good drivers will tell us via ethtool what their sync period is.
        """
        if self._stats_settle_time is None:
            data = {}
            try:
                data = ethtool("-c " + self.ifname, json=True)[0]
            except CmdExitFailure as e:
                # No coalescing info is fine, anything else is a real failure
                if "Operation not supported" not in e.cmd.stderr:
                    raise

            # 25 ms base, plus the reported period converted from usecs
            # to the seconds time.sleep() expects
            self._stats_settle_time = 0.025 + \
                data.get('stats-block-usecs', 0) / 1000 / 1000

        time.sleep(self._stats_settle_time)
283
284