Path: blob/master/tools/testing/selftests/drivers/net/hw/devmem.py
26295 views
#!/usr/bin/env python31# SPDX-License-Identifier: GPL-2.023from os import path4from lib.py import ksft_run, ksft_exit5from lib.py import ksft_eq, KsftSkipEx6from lib.py import NetDrvEpEnv7from lib.py import bkg, cmd, rand_port, wait_port_listen8from lib.py import ksft_disruptive91011def require_devmem(cfg):12if not hasattr(cfg, "_devmem_probed"):13probe_command = f"{cfg.bin_local} -f {cfg.ifname}"14cfg._devmem_supported = cmd(probe_command, fail=False, shell=True).ret == 015cfg._devmem_probed = True1617if not cfg._devmem_supported:18raise KsftSkipEx("Test requires devmem support")192021@ksft_disruptive22def check_rx(cfg) -> None:23require_devmem(cfg)2425port = rand_port()26socat = f"socat -u - TCP{cfg.addr_ipver}:{cfg.addr}:{port},bind={cfg.remote_addr}:{port}"27listen_cmd = f"{cfg.bin_local} -l -f {cfg.ifname} -s {cfg.addr} -p {port} -c {cfg.remote_addr} -v 7"2829with bkg(listen_cmd, exit_wait=True) as ncdevmem:30wait_port_listen(port)31cmd(f"yes $(echo -e \x01\x02\x03\x04\x05\x06) | \32head -c 1K | {socat}", host=cfg.remote, shell=True)3334ksft_eq(ncdevmem.ret, 0)353637@ksft_disruptive38def check_tx(cfg) -> None:39require_devmem(cfg)4041port = rand_port()42listen_cmd = f"socat -U - TCP{cfg.addr_ipver}-LISTEN:{port}"4344with bkg(listen_cmd) as socat:45wait_port_listen(port)46cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr} -p {port}", host=cfg.remote, shell=True)4748ksft_eq(socat.stdout.strip(), "hello\nworld")495051@ksft_disruptive52def check_tx_chunks(cfg) -> None:53require_devmem(cfg)5455port = rand_port()56listen_cmd = f"socat -U - TCP{cfg.addr_ipver}-LISTEN:{port}"5758with bkg(listen_cmd, exit_wait=True) as socat:59wait_port_listen(port)60cmd(f"echo -e \"hello\\nworld\"| {cfg.bin_remote} -f {cfg.ifname} -s {cfg.addr} -p {port} -z 3", host=cfg.remote, shell=True)6162ksft_eq(socat.stdout.strip(), "hello\nworld")636465def main() -> None:66with NetDrvEpEnv(__file__) as cfg:67cfg.bin_local = path.abspath(path.dirname(__file__) + "/ncdevmem")68cfg.bin_remote = cfg.remote.deploy(cfg.bin_local)6970ksft_run([check_rx, check_tx, check_tx_chunks],71args=(cfg, ))72ksft_exit()737475if __name__ == "__main__":76main()777879