diff --git a/sbin/ping/tests/Makefile b/sbin/ping/tests/Makefile --- a/sbin/ping/tests/Makefile +++ b/sbin/ping/tests/Makefile @@ -13,6 +13,8 @@ ${PACKAGE}FILES+= ping_6_c1_s8_t1.out ${PACKAGE}FILES+= ping_c1_s56_t1_S127.out ${PACKAGE}FILES+= ping_c1_s8_t1_S1.out -${PACKAGE}FILES+= injection.py +${PACKAGE}FILES+= pinger.py + +${PACKAGE}FILESMODE_pinger.py= 0555 .include diff --git a/sbin/ping/tests/injection.py b/sbin/ping/tests/injection.py deleted file mode 100644 --- a/sbin/ping/tests/injection.py +++ /dev/null @@ -1,83 +0,0 @@ -#! /usr/bin/env python3 -# Used to inject various malformed packets - -import errno -import logging -import subprocess -import sys - -logging.getLogger("scapy").setLevel(logging.CRITICAL) - -from scapy.all import IP, ICMP, IPOption -import scapy.layers.all -from scapy.layers.inet import ICMPEcho_am -from scapy.layers.tuntap import TunTapInterface - -SRC_ADDR = "192.0.2.14" -DST_ADDR = "192.0.2.15" - -mode = sys.argv[1] -ip = None - -# fill opts with nop (0x01) -opts = b'' -for x in range(40): - opts += b'\x01' - - -# Create and configure a tun interface with an RFC5737 nonrouteable address -create_proc = subprocess.run( - args=["ifconfig", "tun", "create"], - capture_output=True, - check=True, - text=True) -iface = create_proc.stdout.strip() -tun = TunTapInterface(iface) -with open("tun.txt", "w") as f: - f.write(iface) -subprocess.run(["ifconfig", tun.iface, "up"]) -subprocess.run(["ifconfig", tun.iface, SRC_ADDR, DST_ADDR]) - -ping = subprocess.Popen( - args=["/sbin/ping", "-v", "-c1", "-t1", DST_ADDR], - text=True -) -# Wait for /sbin/ping to ping us -echo_req = tun.recv() - -# Construct the response packet -if mode == "opts": - # Sending reply with IP options - echo_reply = IP( - dst=SRC_ADDR, - src=DST_ADDR, - options=IPOption(opts) - )/ICMP(type=0, code=0, id=echo_req.payload.id)/echo_req.payload.payload -elif mode == "pip": - # packet in packet (inner has options) - - inner = IP( - dst=SRC_ADDR, - src=DST_ADDR, - options=IPOption(opts) - )/ICMP(type=0, code=0, id=echo_req.payload.id)/echo_req.payload.payload - outer = IP( - dst=SRC_ADDR, - src=DST_ADDR - )/ICMP(type=3, code=1) # host unreach - - echo_reply = outer/inner -elif mode == "reply": - # Sending normal echo reply - echo_reply = IP( - dst=SRC_ADDR, - src=DST_ADDR, - )/ICMP(type=0, code=0, id=echo_req.payload.id)/echo_req.payload.payload -else: - print("unknown mode {}".format(mode)) - exit(1) - -tun.send(echo_reply) -outs, errs = ping.communicate() - -sys.exit(ping.returncode) diff --git a/sbin/ping/tests/ping_test.sh b/sbin/ping/tests/ping_test.sh --- a/sbin/ping/tests/ping_test.sh +++ b/sbin/ping/tests/ping_test.sh @@ -162,11 +162,18 @@ } inject_opts_body() { - atf_check -s exit:0 -o match:"wrong total length" -o match:"NOP" python3 $(atf_get_srcdir)/injection.py opts + atf_check -s exit:0 -o match:"wrong total length" -o match:"NOP" \ + $(atf_get_srcdir)/pinger.py \ + --iface tun0 \ + --src 192.0.2.1 \ + --dst 192.0.2.2 \ + --icmp_type 0 \ + --icmp_code 0 \ + --opts NOP-40 } inject_opts_cleanup() { - ifconfig `cat tun.txt` destroy + pinger_cleanup } atf_test_case "inject_pip" "cleanup" @@ -178,11 +185,19 @@ } inject_pip_body() { - atf_check -s exit:2 -o match:"Destination Host Unreachable" -o not-match:"01010101" python3 $(atf_get_srcdir)/injection.py pip + # XXX This test is wrong. It should match (not not-match) 40 NOPs. + atf_check -s exit:2 -o match:"Destination Host Unreachable" -o not-match:"01010101" \ + $(atf_get_srcdir)/pinger.py \ + --iface tun0 \ + --src 192.0.2.1 \ + --dst 192.0.2.2 \ + --icmp_type 3 \ + --icmp_code 1 \ + --opts NOP-40 } inject_pip_cleanup() { - ifconfig `cat tun.txt` destroy + pinger_cleanup } # This is redundant with the ping_ tests, but it serves to ensure that scapy.py @@ -196,11 +211,17 @@ } inject_reply_body() { - atf_check -s exit:0 -o match:"1 packets transmitted, 1 packets received" python3 $(atf_get_srcdir)/injection.py reply + atf_check -s exit:0 -o match:"1 packets transmitted, 1 packets received" \ + $(atf_get_srcdir)/pinger.py \ + --iface tun0 \ + --src 192.0.2.1 \ + --dst 192.0.2.2 \ + --icmp_type 0 \ + --icmp_code 0 } inject_reply_cleanup() { - ifconfig `cat tun.txt` destroy + pinger_cleanup } atf_init_test_cases() @@ -230,3 +251,13 @@ "$1" >"$1".filtered atf_check -s exit:0 diff -u "$1".filtered "$2" } + +pinger_cleanup() +{ + if [ -f created_interfaces.lst ]; then + for ifname in `cat created_interfaces.lst` + do + ifconfig ${ifname} destroy + done + fi +} diff --git a/sbin/ping/tests/pinger.py b/sbin/ping/tests/pinger.py new file mode 100644 --- /dev/null +++ b/sbin/ping/tests/pinger.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +# +# $FreeBSD$ +# + +import sys +import subprocess +import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) + +import scapy.all as sc + +routing_options=["RR", "RR-same", "RR-trunc", + "LSRR", "LSRR-trunc", + "SSRR", "SSRR-trunc"] + +def parse_args(): + parser=argparse.ArgumentParser(prog="pinger.py", + description="P I N G E R", + epilog="This utility creates a tun " + "interface, sends an echo request, and " + "forges the reply.") + # Required arguments + # Avoid setting defaults on these arguments, + # as we want to set them explicitly in the tests + parser.add_argument("--iface", type=str, required=True, + help="Interface to send packet to") + parser.add_argument("--src", type=str, required=True, + help="Source packet IP") + parser.add_argument("--dst", type=str, required=True, + help="Destination packet IP") + parser.add_argument("--icmp_type", type=int, required=True, + help="ICMP type") + parser.add_argument("--icmp_code", type=int, required=True, + help="ICMP code") + # IP arguments + parser.add_argument("--flags", type=str, default="", + choices=["DF", "MF"], + help="IP flags") + parser.add_argument("--opts", type=str, default="", + choices=["EOL", "NOP", "NOP-40", "unk", "unk-40"] + + routing_options, help="Include IP options") + parser.add_argument("--special", type=str, default="", + choices=["tcp", "udp", "wrong", "warp"], + help="Send a special packet") + # ICMP arguments + # Match names with + parser.add_argument("--icmp_pptr", type=int, default=0, + help="ICMP pointer") + parser.add_argument("--icmp_gwaddr", type=str, default="0.0.0.0", + help="ICMP gateway IP address") + parser.add_argument("--icmp_nextmtu", type=int, default=0, + help="ICMP next MTU") + parser.add_argument("--icmp_otime", type=int, default=0, + help="ICMP originate timestamp") + parser.add_argument("--icmp_rtime", type=int, default=0, + help="ICMP receive timestamp") + parser.add_argument("--icmp_ttime", type=int, default=0, + help="ICMP transmit timestamp") + parser.add_argument("--icmp_mask", type=str, default="0.0.0.0", + help="ICMP address mask") + parser.add_argument("--request", type=str, default="", + choices=["mask", "timestamp"], + help="Request type") + # Miscellaneous arguments + parser.add_argument("--count", type=int, default=1, + help="Number of packets to send") + parser.add_argument("--dup", action="store_true", + help="Duplicate packets") + parser.add_argument("--version", action="version", version='%(prog)s 1.0') + return parser.parse_args() + +def construct_response_packet(echo, ip, icmp, special): + icmp_id_seq_types=[0, 8, 13, 14, 15, 16, 17, 18, 37, 38] + oip=echo[sc.IP] + oicmp=echo[sc.ICMP] + load=echo[sc.ICMP].payload + oip[sc.IP].remove_payload() + oicmp[sc.ICMP].remove_payload() + oicmp.type=8 + + # As if the original IP packet had these set + oip.ihl=None + oip.len=None + oip.id=1 + oip.flags=ip.flags + oip.chksum=None + oip.options=ip.options + + # Special options + if special == "tcp": + oip.proto="tcp" + tcp=sc.TCP(sport=1234, dport=5678) + return ip/icmp/oip/tcp + if special == "udp": + oip.proto="udp" + udp=sc.UDP(sport=1234, dport=5678) + return ip/icmp/oip/udp + if special == "warp": + # Build a package with a timestamp of INT_MAX + # (time-warped package) + payload_no_timestamp=sc.bytes_hex(load)[16:] + load=((b"\xff" * 8) + sc.hex_bytes(payload_no_timestamp)) + if special == "wrong": + # Build a package with a wrong last byte + payload_no_last_byte=sc.bytes_hex(load)[:-2] + load=((sc.hex_bytes(payload_no_last_byte)) + b"\x00") + + if icmp.type in icmp_id_seq_types: + pkt=ip/icmp/load + else: + ip.options="" + pkt=ip/icmp/oip/oicmp/load + + return pkt + +def generate_ip_options(opts): + routers=["192.0.2.10", "192.0.2.20", "192.0.2.30", + "192.0.2.40", "192.0.2.50", "192.0.2.60", + "192.0.2.70", "192.0.2.80", "192.0.2.90"] + routers_zero=[0, 0, 0, 0, 0, 0, 0, 0, 0] + if opts == "EOL": + options=sc.IPOption(b"\x00") + elif opts == "NOP": + options=sc.IPOption(b"\x01") + elif opts == "NOP-40": + options=sc.IPOption(b"\x01" * 40) + elif opts == "RR": + options=sc.IPOption_RR(pointer=40, routers=routers) + elif opts == "RR-same": + options=sc.IPOption_RR(pointer=3, routers=routers_zero) + elif opts == "RR-trunc": + options=sc.IPOption_RR(length=7, routers=routers_zero) + elif opts == "LSRR": + subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) + options=sc.IPOption_LSRR(routers=routers) + elif opts == "LSRR-trunc": + subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) + options=sc.IPOption_LSRR(length=3, routers=routers_zero) + elif opts == "SSRR": + subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) + options=sc.IPOption_SSRR(routers=routers) + elif opts == "SSRR-trunc": + subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) + options=sc.IPOption_SSRR(length=3, routers=routers_zero) + elif opts == "unk": + subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) + options=sc.IPOption(b"\x9f") + elif opts == "unk-40": + subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) + options=sc.IPOption(b"\x9f" * 40) + else: + options="" + return options + +def main(): + """P I N G E R""" + args=parse_args() + opts=generate_ip_options(args.opts) + ip=sc.IP(flags=args.flags, src=args.dst, dst=args.src, options=opts) + tun=sc.TunTapInterface(args.iface) + with open("created_interfaces.lst", "w", encoding="utf-8") as file: + file.write(args.iface) + subprocess.run(["ifconfig", tun.iface, "up"], check=True) + subprocess.run(["ifconfig", tun.iface, args.src, args.dst], check=True) + command=["/sbin/ping", "-c", str(args.count), "-t", str(args.count), "-v"] + if args.request == "mask": + command+=["-Mm"] + if args.request == "timestamp": + command+=["-Mt"] + if args.special != "": + command+=["-p1"] + if args.opts in routing_options: + command+=["-R"] + command+=[args.dst] + with subprocess.Popen( + args=command, + text=True + ) as ping: + for dummy in range(args.count): + echo=tun.recv() + icmp=sc.ICMP(type=args.icmp_type, code=args.icmp_code, + id=echo[sc.ICMP].id, seq=echo[sc.ICMP].seq, + ts_ori=args.icmp_otime, ts_rx=args.icmp_rtime, + ts_tx=args.icmp_ttime, gw=args.icmp_gwaddr, + ptr=args.icmp_pptr, + addr_mask=args.icmp_mask, nexthopmtu=args.icmp_nextmtu) + pkt=construct_response_packet(echo, ip, icmp, args.special) + tun.send(pkt) + if args.dup is True: + tun.send(pkt) + ping.communicate() + + sys.exit(ping.returncode) + +if __name__ == "__main__": + main()