Page Menu
Home
FreeBSD
Search
Configure Global Search
Log In
Files
F149997777
D37876.id114546.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
12 KB
Referenced Files
None
Subscribers
None
D37876.id114546.diff
View Options
diff --git a/sbin/ping/tests/Makefile b/sbin/ping/tests/Makefile
--- a/sbin/ping/tests/Makefile
+++ b/sbin/ping/tests/Makefile
@@ -13,6 +13,8 @@
${PACKAGE}FILES+= ping_6_c1_s8_t1.out
${PACKAGE}FILES+= ping_c1_s56_t1_S127.out
${PACKAGE}FILES+= ping_c1_s8_t1_S1.out
-${PACKAGE}FILES+= injection.py
+${PACKAGE}FILES+= pinger.py
+
+${PACKAGE}FILESMODE_pinger.py= 0555
.include <bsd.test.mk>
diff --git a/sbin/ping/tests/injection.py b/sbin/ping/tests/injection.py
deleted file mode 100644
--- a/sbin/ping/tests/injection.py
+++ /dev/null
@@ -1,83 +0,0 @@
-#! /usr/bin/env python3
-# Used to inject various malformed packets
-
-import errno
-import logging
-import subprocess
-import sys
-
-logging.getLogger("scapy").setLevel(logging.CRITICAL)
-
-from scapy.all import IP, ICMP, IPOption
-import scapy.layers.all
-from scapy.layers.inet import ICMPEcho_am
-from scapy.layers.tuntap import TunTapInterface
-
-SRC_ADDR = "192.0.2.14"
-DST_ADDR = "192.0.2.15"
-
-mode = sys.argv[1]
-ip = None
-
-# fill opts with nop (0x01)
-opts = b''
-for x in range(40):
- opts += b'\x01'
-
-
-# Create and configure a tun interface with an RFC5737 nonrouteable address
-create_proc = subprocess.run(
- args=["ifconfig", "tun", "create"],
- capture_output=True,
- check=True,
- text=True)
-iface = create_proc.stdout.strip()
-tun = TunTapInterface(iface)
-with open("tun.txt", "w") as f:
- f.write(iface)
-subprocess.run(["ifconfig", tun.iface, "up"])
-subprocess.run(["ifconfig", tun.iface, SRC_ADDR, DST_ADDR])
-
-ping = subprocess.Popen(
- args=["/sbin/ping", "-v", "-c1", "-t1", DST_ADDR],
- text=True
-)
-# Wait for /sbin/ping to ping us
-echo_req = tun.recv()
-
-# Construct the response packet
-if mode == "opts":
- # Sending reply with IP options
- echo_reply = IP(
- dst=SRC_ADDR,
- src=DST_ADDR,
- options=IPOption(opts)
- )/ICMP(type=0, code=0, id=echo_req.payload.id)/echo_req.payload.payload
-elif mode == "pip":
- # packet in packet (inner has options)
-
- inner = IP(
- dst=SRC_ADDR,
- src=DST_ADDR,
- options=IPOption(opts)
- )/ICMP(type=0, code=0, id=echo_req.payload.id)/echo_req.payload.payload
- outer = IP(
- dst=SRC_ADDR,
- src=DST_ADDR
- )/ICMP(type=3, code=1) # host unreach
-
- echo_reply = outer/inner
-elif mode == "reply":
- # Sending normal echo reply
- echo_reply = IP(
- dst=SRC_ADDR,
- src=DST_ADDR,
- )/ICMP(type=0, code=0, id=echo_req.payload.id)/echo_req.payload.payload
-else:
- print("unknown mode {}".format(mode))
- exit(1)
-
-tun.send(echo_reply)
-outs, errs = ping.communicate()
-
-sys.exit(ping.returncode)
diff --git a/sbin/ping/tests/ping_test.sh b/sbin/ping/tests/ping_test.sh
--- a/sbin/ping/tests/ping_test.sh
+++ b/sbin/ping/tests/ping_test.sh
@@ -162,11 +162,18 @@
}
inject_opts_body()
{
- atf_check -s exit:0 -o match:"wrong total length" -o match:"NOP" python3 $(atf_get_srcdir)/injection.py opts
+ atf_check -s exit:0 -o match:"wrong total length" -o match:"NOP" \
+ $(atf_get_srcdir)/pinger.py \
+ --iface tun0 \
+ --src 192.0.2.1 \
+ --dst 192.0.2.2 \
+ --icmp_type 0 \
+ --icmp_code 0 \
+ --opts NOP-40
}
inject_opts_cleanup()
{
- ifconfig `cat tun.txt` destroy
+ pinger_cleanup
}
atf_test_case "inject_pip" "cleanup"
@@ -178,11 +185,19 @@
}
inject_pip_body()
{
- atf_check -s exit:2 -o match:"Destination Host Unreachable" -o not-match:"01010101" python3 $(atf_get_srcdir)/injection.py pip
+ # XXX This test is wrong. It should match (not not-match) 40 NOPs.
+ atf_check -s exit:2 -o match:"Destination Host Unreachable" -o not-match:"01010101" \
+ $(atf_get_srcdir)/pinger.py \
+ --iface tun0 \
+ --src 192.0.2.1 \
+ --dst 192.0.2.2 \
+ --icmp_type 3 \
+ --icmp_code 1 \
+ --opts NOP-40
}
inject_pip_cleanup()
{
- ifconfig `cat tun.txt` destroy
+ pinger_cleanup
}
# This is redundant with the ping_ tests, but it serves to ensure that scapy.py
@@ -196,11 +211,17 @@
}
inject_reply_body()
{
- atf_check -s exit:0 -o match:"1 packets transmitted, 1 packets received" python3 $(atf_get_srcdir)/injection.py reply
+ atf_check -s exit:0 -o match:"1 packets transmitted, 1 packets received" \
+ $(atf_get_srcdir)/pinger.py \
+ --iface tun0 \
+ --src 192.0.2.1 \
+ --dst 192.0.2.2 \
+ --icmp_type 0 \
+ --icmp_code 0
}
inject_reply_cleanup()
{
- ifconfig `cat tun.txt` destroy
+ pinger_cleanup
}
atf_init_test_cases()
@@ -230,3 +251,16 @@
"$1" >"$1".filtered
atf_check -s exit:0 diff -u "$1".filtered "$2"
}
+
+pinger_cleanup()  # Shared cleanup for the inject_* test cases: destroy recorded interfaces, remove scratch files
+{
+ if [ -f created_interfaces.lst ]; then  # list file written by pinger.py
+ for ifname in `cat created_interfaces.lst`
+ do
+ ifconfig ${ifname} destroy
+ done
+ rm created_interfaces.lst  # drop the list once its interfaces have been handled
+ fi
+ rm -f std.out  # presumably left behind by other tests in this file; creator not visible in this hunk
+ rm -f std.out.filtered
+}
diff --git a/sbin/ping/tests/pinger.py b/sbin/ping/tests/pinger.py
new file mode 100644
--- /dev/null
+++ b/sbin/ping/tests/pinger.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python
+#
+# $FreeBSD$
+#
+
+import sys
+import subprocess
+import argparse
+import logging
+logging.getLogger("scapy").setLevel(logging.CRITICAL)
+
+import scapy.all as sc
+
+routing_options=["RR", "RR-same", "RR-trunc",  # --opts values for which main() adds "-R" to the ping command
+ "LSRR", "LSRR-trunc",
+ "SSRR", "SSRR-trunc"]
+
+def parse_args():  # Declare pinger.py's command-line interface and return the parsed argparse namespace
+ parser=argparse.ArgumentParser(prog="pinger.py",
+ description="P I N G E R",
+ epilog="This utility creates a tun "
+ "interface, sends an echo request, and "
+ "forges the reply.")
+ # Required arguments
+ # Deliberately no defaults on these arguments:
+ # the tests are expected to set each of them explicitly
+ parser.add_argument("--iface", type=str, required=True,
+ help="Interface to send packet to")
+ parser.add_argument("--src", type=str, required=True,
+ help="Source packet IP")
+ parser.add_argument("--dst", type=str, required=True,
+ help="Destination packet IP")
+ parser.add_argument("--icmp_type", type=int, required=True,
+ help="ICMP type")
+ parser.add_argument("--icmp_code", type=int, required=True,
+ help="ICMP code")
+ # IP arguments (optional; each defaults to "")
+ parser.add_argument("--flags", type=str, default="",
+ choices=["MF", "DF", "evil"],
+ help="IP flags")
+ parser.add_argument("--opts", type=str, default="",
+ choices=["EOL", "NOP", "NOP-40", "unk", "unk-40"] +
+ routing_options, help="Include IP options")
+ parser.add_argument("--special", type=str, default="",
+ choices=["tcp", "udp", "wrong", "warp"],
+ help="Send a special packet")
+ # ICMP arguments
+ # Option names mirror the field names in <netinet/ip_icmp.h>
+ parser.add_argument("--icmp_pptr", type=int, default=0,
+ help="ICMP pointer")
+ parser.add_argument("--icmp_gwaddr", type=str, default="0.0.0.0",
+ help="ICMP gateway IP address")
+ parser.add_argument("--icmp_nextmtu", type=int, default=0,
+ help="ICMP next MTU")
+ parser.add_argument("--icmp_otime", type=int, default=0,
+ help="ICMP originate timestamp")
+ parser.add_argument("--icmp_rtime", type=int, default=0,
+ help="ICMP receive timestamp")
+ parser.add_argument("--icmp_ttime", type=int, default=0,
+ help="ICMP transmit timestamp")
+ parser.add_argument("--icmp_mask", type=str, default="0.0.0.0",
+ help="ICMP address mask")
+ parser.add_argument("--request", type=str, default="echo",
+ help="Request type (echo, mask, timestamp)")  # not restricted via choices=; main() only tests for "mask" and "timestamp"
+ # Miscellaneous arguments
+ parser.add_argument("--count", type=int, default=1,
+ help="Number of packets to send")
+ parser.add_argument("--dup", action="store_true",
+ help="Duplicate packets")
+ parser.add_argument("--version", action="version", version='%(prog)s 1.0')
+ return parser.parse_args()
+
+def construct_response_packet(echo, ip, icmp, special):  # Build the forged reply: ip/icmp followed by the request payload or by the quoted original headers
+ icmp_id_seq_types=[0, 8, 13, 14, 15, 16, 17, 18, 37, 38]  # ICMP types answered below with ip/icmp/load; every other type quotes oip/oicmp
+ oip=echo[sc.IP]
+ oicmp=echo[sc.ICMP]
+ load=echo[sc.ICMP].payload
+ oip[sc.IP].remove_payload()
+ oicmp[sc.ICMP].remove_payload()
+ oicmp.type=8  # quoted ICMP header is unconditionally marked type 8 (echo request)
+
+ # Rewrite the quoted IP header as if the original packet had these set
+ oip.ihl=None
+ oip.len=None
+ oip.id=1
+ oip.flags=ip.flags
+ oip.chksum=None
+ oip.options=ip.options
+
+ # Special packets selected by the "special" argument
+ if special == "tcp":
+ oip.proto="tcp"
+ tcp=sc.TCP(sport=1234, dport=5678)
+ return ip/icmp/oip/tcp
+ if special == "udp":
+ oip.proto="udp"
+ udp=sc.UDP(sport=1234, dport=5678)
+ return ip/icmp/oip/udp
+ if special == "warp":
+ # Build a packet whose leading 8-byte timestamp is overwritten with 0xff bytes
+ # (time-warped packet)
+ payload_no_timestamp=sc.bytes_hex(load)[16:]
+ load=((b"\xff" * 8) + sc.hex_bytes(payload_no_timestamp))
+ if special == "wrong":
+ # Build a packet with a wrong last byte (replaced with 0x00)
+ payload_no_last_byte=sc.bytes_hex(load)[:-2]
+ load=((sc.hex_bytes(payload_no_last_byte)) + b"\x00")
+
+ if icmp.type in icmp_id_seq_types:
+ pkt=ip/icmp/load
+ else:
+ ip.options=""  # clears the options on the caller's ip object; oip already copied them above
+ pkt=ip/icmp/oip/oicmp/load
+
+ return pkt
+
+def generate_ip_options(opts):  # Translate an --opts keyword into the value main() passes as sc.IP(options=...)
+ routers=["192.0.2.10", "192.0.2.20", "192.0.2.30",
+ "192.0.2.40", "192.0.2.50", "192.0.2.60",
+ "192.0.2.70", "192.0.2.80", "192.0.2.90"]
+ routers_zero=[0, 0, 0, 0, 0, 0, 0, 0, 0]  # nine zero entries, used by the "-same" and "-trunc" variants
+ if opts == "EOL":
+ options=sc.IPOption(b"\x00")
+ elif opts == "NOP":
+ options=sc.IPOption(b"\x01")
+ elif opts == "NOP-40":
+ options=sc.IPOption(b"\x01" * 40)
+ elif opts == "RR":
+ options=sc.IPOption_RR(pointer=40, routers=routers)
+ elif opts == "RR-same":
+ options=sc.IPOption_RR(pointer=3, routers=routers_zero)
+ elif opts == "RR-trunc":
+ options=sc.IPOption_RR(length=7, routers=routers_zero)
+ elif opts == "LSRR":
+ subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True)  # side effect: changes a system-wide sysctl; nothing visible in this diff restores it
+ options=sc.IPOption_LSRR(routers=routers)
+ elif opts == "LSRR-trunc":
+ subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True)
+ options=sc.IPOption_LSRR(length=3, routers=routers_zero)
+ elif opts == "SSRR":
+ subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True)
+ options=sc.IPOption_SSRR(routers=routers)
+ elif opts == "SSRR-trunc":
+ subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True)
+ options=sc.IPOption_SSRR(length=3, routers=routers_zero)
+ elif opts == "unk":
+ subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True)
+ options=sc.IPOption(b"\x9f")
+ elif opts == "unk-40":
+ subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True)
+ options=sc.IPOption(b"\x9f" * 40)
+ else:
+ options=""  # any other value, including the "" default of --opts, yields no IP options
+ return options
+
+def main():  # Configure the tun interface, run /sbin/ping against it, forge the replies, exit with ping's status
+ """P I N G E R"""
+ args=parse_args()
+ opts=generate_ip_options(args.opts)
+ ip=sc.IP(flags=args.flags, src=args.dst, dst=args.src, options=opts)  # src/dst are swapped: this is the header of the forged reply
+ tun=sc.TunTapInterface(args.iface)
+ with open("created_interfaces.lst", "w", encoding="utf-8") as file:  # recorded so pinger_cleanup in ping_test.sh can destroy the interface
+ file.write(args.iface)
+ subprocess.run(["ifconfig", tun.iface, "up"], check=True)
+ subprocess.run(["ifconfig", tun.iface, args.src, args.dst], check=True)
+ command=["/sbin/ping", "-c", str(args.count), "-t", str(args.count), "-v"]  # note: -t is given the same value as -c (args.count)
+ if args.request == "mask":
+ command+=["-Mm"]
+ if args.request == "timestamp":
+ command+=["-Mt"]
+ if args.special != "":
+ command+=["-p1"]
+ if args.opts in routing_options:
+ command+=["-R"]
+ command+=[args.dst]
+ with subprocess.Popen(
+ args=command,
+ text=True
+ ) as ping:
+ for dummy in range(args.count):  # one forged reply per expected request
+ echo=tun.recv()  # wait for ping's request to arrive on the tun interface
+ icmp=sc.ICMP(type=args.icmp_type, code=args.icmp_code,
+ id=echo[sc.ICMP].id, seq=echo[sc.ICMP].seq,
+ ts_ori=args.icmp_otime, ts_rx=args.icmp_rtime,
+ ts_tx=args.icmp_ttime, gw=args.icmp_gwaddr,
+ ptr=args.icmp_pptr,
+ addr_mask=args.icmp_mask, nexthopmtu=args.icmp_nextmtu)
+ pkt=construct_response_packet(echo, ip, icmp, args.special)
+ tun.send(pkt)
+ if args.dup is True:  # --dup: send the identical reply a second time
+ tun.send(pkt)
+ ping.communicate()
+
+ sys.exit(ping.returncode)  # propagate ping's exit status, which the tests check via atf_check -s exit:N
+
+if __name__ == "__main__":  # run only when executed as a script
+ main()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Mar 29, 4:09 PM (12 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
30538178
Default Alt Text
D37876.id114546.diff (12 KB)
Attached To
Mode
D37876: ping_tests: Introduce a script to test ping
Attached
Detach File
Event Timeline
Log In to Comment