Path: blob/master/tools/testing/selftests/bpf/generate_udp_fragments.py
26285 views
#!/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
This script helps generate fragmented UDP packets.

While it is technically possible to dynamically generate
fragmented packets in C, it is much harder to read and write
said code. `scapy` is relatively industry standard and really
easy to read / write.

So we choose to write this script that generates a valid C
header. Rerun script and commit generated file after any
modifications.
"""

# NOTE(review): argparse is not referenced anywhere in this file; the import
# is kept untouched in case something out of view relies on it.
import argparse
import os


# These constants must stay in sync with `ip_check_defrag.c`
VETH1_ADDR = "172.16.1.200"
VETH0_ADDR6 = "fc00::100"
VETH1_ADDR6 = "fc00::200"
CLIENT_PORT = 48878
SERVER_PORT = 48879
MAGIC_MESSAGE = "THIS IS THE ORIGINAL MESSAGE, PLEASE REASSEMBLE ME"


def print_header(f):
    """Write the license line, generated-file notice and include guard to `f`.

    `f` is any object with text-mode `write`/`writelines` (the script passes
    an open file).
    """
    f.writelines(
        [
            "// SPDX-License-Identifier: GPL-2.0\n",
            "/* DO NOT EDIT -- this file is generated */\n",
            "\n",
            "#ifndef _IP_CHECK_DEFRAG_FRAGS_H\n",
            "#define _IP_CHECK_DEFRAG_FRAGS_H\n",
            "\n",
            "#include <stdint.h>\n",
            "\n",
        ]
    )


def print_frags(f, frags, v6):
    """Write one `static uint8_t` C array per fragment to `f`.

    `frags` is a sequence of bytes-like objects. Arrays are named
    `frag_<idx>` when `v6` is false and `frag6_<idx>` when it is true,
    where `<idx>` is the position of the fragment in `frags`.
    """
    # 10 bytes per line to keep width in check
    bytes_per_line = 10
    suffix = "6" if v6 else ""

    for idx, frag in enumerate(frags):
        f.write(f"static uint8_t frag{suffix}_{idx}[] = {{\n")
        for start in range(0, len(frag), bytes_per_line):
            # Iterating a bytes slice yields ints, which hex() renders as 0x..
            row = ", ".join(hex(b) for b in frag[start : start + bytes_per_line])
            f.write(f"\t{row},\n")
        f.write("};\n")


def print_trailer(f):
    """Write the `#endif` that closes the include guard opened by print_header."""
    f.write("\n")
    f.write("#endif /* _IP_CHECK_DEFRAG_FRAGS_H */\n")


def main(f):
    """Build the fragmented IPv4 and IPv6 UDP packets and write the C header to `f`."""
    # Imported here rather than at module level so the formatting helpers can
    # be imported without scapy installed; the names are listed explicitly
    # instead of pulling in everything with a wildcard import.
    from scapy.all import IP, IPv6, IPv6ExtHdrFragment, Raw, UDP, fragment6

    # srcip of 0 is filled in by IP_HDRINCL
    sip = "0.0.0.0"
    sip6 = VETH0_ADDR6
    dip = VETH1_ADDR
    dip6 = VETH1_ADDR6
    sport = CLIENT_PORT
    dport = SERVER_PORT
    payload = MAGIC_MESSAGE.encode()

    # Disable UDPv4 checksums to keep code simpler
    pkt = (
        IP(src=sip, dst=dip)
        / UDP(sport=sport, dport=dport, chksum=0)
        / Raw(load=payload)
    )
    # UDPv6 requires a checksum
    # Also pin the ipv6 fragment header ID, otherwise it's a random value
    pkt6 = (
        IPv6(src=sip6, dst=dip6)
        / IPv6ExtHdrFragment(id=0xBEEF)
        / UDP(sport=sport, dport=dport)
        / Raw(load=payload)
    )

    # The loop variable is deliberately not called `f`, so it cannot be
    # confused with the output file handle.
    frags = [piece.build() for piece in pkt.fragment(24)]
    frags6 = [piece.build() for piece in fragment6(pkt6, 72)]

    print_header(f)
    print_frags(f, frags, False)
    print_frags(f, frags6, True)
    print_trailer(f)


if __name__ == "__main__":
    # The header is written next to this script, whatever the current directory.
    script_dir = os.path.dirname(os.path.realpath(__file__))
    header_path = os.path.join(script_dir, "ip_check_defrag_frags.h")
    with open(header_path, "w") as out:
        main(out)