Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/tools/testing/selftests/drivers/net/hw/xdp_metadata.py
171056 views
1
#!/usr/bin/env python3
2
# SPDX-License-Identifier: GPL-2.0
3
4
"""
5
Tests for XDP metadata kfuncs (e.g. bpf_xdp_metadata_rx_hash).
6
7
These tests load device-bound XDP programs from xdp_metadata.bpf.o
8
that call metadata kfuncs, send traffic, and verify the extracted
9
metadata via BPF maps.
10
"""
11
from lib.py import ksft_run, ksft_eq, ksft_exit, ksft_ge, ksft_ne, ksft_pr
12
from lib.py import KsftNamedVariant, ksft_variants
13
from lib.py import CmdExitFailure, KsftSkipEx, NetDrvEpEnv
14
from lib.py import NetdevFamily
15
from lib.py import bkg, cmd, rand_port, wait_port_listen
16
from lib.py import ip, bpftool, defer
17
from lib.py import bpf_map_set, bpf_map_dump, bpf_prog_map_ids
18
19
20
def _load_xdp_metadata_prog(cfg, prog_name, bpf_file="xdp_metadata.bpf.o"):
21
"""Load a device-bound XDP metadata program and return prog/map info.
22
23
Returns:
24
dict with 'id', 'name', and 'maps' (name -> map_id).
25
"""
26
abs_path = cfg.net_lib_dir / bpf_file
27
pin_dir = "/sys/fs/bpf/xdp_metadata_test"
28
29
cmd(f"rm -rf {pin_dir}", shell=True, fail=False)
30
cmd(f"mkdir -p {pin_dir}", shell=True)
31
32
try:
33
bpftool(f"prog loadall {abs_path} {pin_dir} type xdp "
34
f"xdpmeta_dev {cfg.ifname}")
35
except CmdExitFailure as e:
36
cmd(f"rm -rf {pin_dir}", shell=True, fail=False)
37
raise KsftSkipEx(
38
f"Failed to load device-bound XDP program '{prog_name}'"
39
) from e
40
defer(cmd, f"rm -rf {pin_dir}", shell=True, fail=False)
41
42
pin_path = f"{pin_dir}/{prog_name}"
43
ip(f"link set dev {cfg.ifname} xdpdrv pinned {pin_path}")
44
defer(ip, f"link set dev {cfg.ifname} xdpdrv off")
45
46
xdp_info = ip(f"-d link show dev {cfg.ifname}", json=True)[0]
47
prog_id = xdp_info["xdp"]["prog"]["id"]
48
49
return {"id": prog_id,
50
"name": xdp_info["xdp"]["prog"]["name"],
51
"maps": bpf_prog_map_ids(prog_id)}
52
53
54
def _send_probe(cfg, port, proto="tcp"):
55
"""Send a single payload from the remote end using socat.
56
57
Args:
58
cfg: Configuration object containing network settings.
59
port: Port number for the exchange.
60
proto: Protocol to use, either "tcp" or "udp".
61
"""
62
cfg.require_cmd("socat", remote=True)
63
64
if proto == "tcp":
65
rx_cmd = f"socat -{cfg.addr_ipver} -T 2 TCP-LISTEN:{port},reuseport STDOUT"
66
tx_cmd = f"echo -n rss_hash_test | socat -t 2 -u STDIN TCP:{cfg.baddr}:{port}"
67
else:
68
rx_cmd = f"socat -{cfg.addr_ipver} -T 2 -u UDP-RECV:{port},reuseport STDOUT"
69
tx_cmd = f"echo -n rss_hash_test | socat -t 2 -u STDIN UDP:{cfg.baddr}:{port}"
70
71
with bkg(rx_cmd, exit_wait=True):
72
wait_port_listen(port, proto=proto)
73
cmd(tx_cmd, host=cfg.remote, shell=True)
74
75
76
# BPF map keys matching the enums in xdp_metadata.bpf.c
77
_SETUP_KEY_PORT = 1
78
79
_RSS_KEY_HASH = 0
80
_RSS_KEY_TYPE = 1
81
_RSS_KEY_PKT_CNT = 2
82
_RSS_KEY_ERR_CNT = 3
83
84
XDP_RSS_L4 = 0x8 # BIT(3) from enum xdp_rss_hash_type
85
86
87
@ksft_variants([
88
KsftNamedVariant("tcp", "tcp"),
89
KsftNamedVariant("udp", "udp"),
90
])
91
def test_xdp_rss_hash(cfg, proto):
92
"""Test RSS hash metadata extraction via bpf_xdp_metadata_rx_hash().
93
94
This test will only run on devices that support xdp-rx-metadata-features.
95
96
Loads the xdp_rss_hash program from xdp_metadata, sends a packet using
97
the specified protocol, and verifies that the program extracted a non-zero
98
hash with an L4 hash type.
99
"""
100
dev_info = cfg.netnl.dev_get({"ifindex": cfg.ifindex})
101
rx_meta = dev_info.get("xdp-rx-metadata-features", [])
102
if "hash" not in rx_meta:
103
raise KsftSkipEx("device does not support XDP rx hash metadata")
104
105
prog_info = _load_xdp_metadata_prog(cfg, "xdp_rss_hash")
106
107
port = rand_port()
108
bpf_map_set("map_xdp_setup", _SETUP_KEY_PORT, port)
109
110
rss_map_id = prog_info["maps"]["map_rss"]
111
112
_send_probe(cfg, port, proto=proto)
113
114
rss = bpf_map_dump(rss_map_id)
115
116
pkt_cnt = rss.get(_RSS_KEY_PKT_CNT, 0)
117
err_cnt = rss.get(_RSS_KEY_ERR_CNT, 0)
118
hash_val = rss.get(_RSS_KEY_HASH, 0)
119
hash_type = rss.get(_RSS_KEY_TYPE, 0)
120
121
ksft_ge(pkt_cnt, 1, comment="should have received at least one packet")
122
ksft_eq(err_cnt, 0, comment=f"RSS hash error count: {err_cnt}")
123
124
ksft_ne(hash_val, 0,
125
f"RSS hash should be non-zero for {proto.upper()} traffic")
126
ksft_pr(f" RSS hash: {hash_val:#010x}")
127
128
ksft_pr(f" RSS hash type: {hash_type:#06x}")
129
ksft_ne(hash_type & XDP_RSS_L4, 0,
130
f"RSS hash type should include L4 for {proto.upper()} traffic")
131
132
133
def main():
134
"""Run XDP metadata kfunc tests against a real device."""
135
with NetDrvEpEnv(__file__) as cfg:
136
cfg.netnl = NetdevFamily()
137
ksft_run(
138
[
139
test_xdp_rss_hash,
140
],
141
args=(cfg,))
142
ksft_exit()
143
144
145
if __name__ == "__main__":
146
main()
147
148