Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
torvalds
GitHub Repository: torvalds/linux
Path: blob/master/tools/testing/selftests/drivers/net/hw/uso.py
171056 views
1
#!/usr/bin/env python3
2
# SPDX-License-Identifier: GPL-2.0
3
4
"""Test USO
5
6
Sends large UDP datagrams with UDP_SEGMENT and verifies that the peer
7
receives the expected total payload and that the NIC transmitted at least
8
the expected number of segments.
9
"""
10
import random
11
import socket
12
import string
13
14
from lib.py import ksft_run, ksft_exit, KsftSkipEx
15
from lib.py import ksft_eq, ksft_ge, ksft_variants, KsftNamedVariant
16
from lib.py import NetDrvEpEnv
17
from lib.py import bkg, defer, ethtool, ip, rand_port, wait_port_listen
18
19
# python doesn't expose this constant, so we need to hardcode it to enable UDP
20
# segmentation for large payloads
21
UDP_SEGMENT = 103
22
23
24
def _send_uso(cfg, ipver, mss, total_payload, port):
25
if ipver == "4":
26
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
27
dst = (cfg.remote_addr_v["4"], port)
28
else:
29
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
30
dst = (cfg.remote_addr_v["6"], port)
31
32
sock.setsockopt(socket.IPPROTO_UDP, UDP_SEGMENT, mss)
33
payload = ''.join(random.choice(string.ascii_lowercase)
34
for _ in range(total_payload))
35
sock.sendto(payload.encode(), dst)
36
sock.close()
37
38
39
def _get_tx_packets(cfg):
40
stats = ip(f"-s link show dev {cfg.ifname}", json=True)[0]
41
return stats['stats64']['tx']['packets']
42
43
44
def _test_uso(cfg, ipver, mss, total_payload):
45
cfg.require_ipver(ipver)
46
cfg.require_cmd("socat", remote=True)
47
48
features = ethtool(f"-k {cfg.ifname}", json=True)
49
uso_was_on = features[0]["tx-udp-segmentation"]["active"]
50
51
try:
52
ethtool(f"-K {cfg.ifname} tx-udp-segmentation on")
53
except Exception as exc:
54
raise KsftSkipEx(
55
"Device does not support tx-udp-segmentation") from exc
56
if not uso_was_on:
57
defer(ethtool, f"-K {cfg.ifname} tx-udp-segmentation off")
58
59
expected_segs = (total_payload + mss - 1) // mss
60
61
port = rand_port(stype=socket.SOCK_DGRAM)
62
rx_cmd = f"socat -{ipver} -T 2 -u UDP-LISTEN:{port},reuseport STDOUT"
63
64
tx_before = _get_tx_packets(cfg)
65
66
with bkg(rx_cmd, host=cfg.remote, exit_wait=True) as rx:
67
wait_port_listen(port, proto="udp", host=cfg.remote)
68
_send_uso(cfg, ipver, mss, total_payload, port)
69
70
ksft_eq(len(rx.stdout), total_payload,
71
comment=f"Received {len(rx.stdout)}B, expected {total_payload}B")
72
73
cfg.wait_hw_stats_settle()
74
75
tx_after = _get_tx_packets(cfg)
76
tx_delta = tx_after - tx_before
77
78
ksft_ge(tx_delta, expected_segs,
79
comment=f"Expected >= {expected_segs} tx packets, got {tx_delta}")
80
81
82
def _uso_variants():
83
for ipver in ["4", "6"]:
84
yield KsftNamedVariant(f"v{ipver}_partial", ipver, 1400, 1400 * 10 + 500)
85
yield KsftNamedVariant(f"v{ipver}_exact", ipver, 1400, 1400 * 5)
86
87
88
@ksft_variants(_uso_variants())
89
def test_uso(cfg, ipver, mss, total_payload):
90
"""Send a USO datagram and verify the peer receives the expected segments."""
91
_test_uso(cfg, ipver, mss, total_payload)
92
93
94
def main() -> None:
95
"""Run USO tests."""
96
with NetDrvEpEnv(__file__) as cfg:
97
ksft_run([test_uso],
98
args=(cfg, ))
99
ksft_exit()
100
101
102
if __name__ == "__main__":
103
main()
104
105