diff --git a/tests/sys/netpfil/pf/Makefile b/tests/sys/netpfil/pf/Makefile index 9bb40b911d4c..115a38666cc7 100644 --- a/tests/sys/netpfil/pf/Makefile +++ b/tests/sys/netpfil/pf/Makefile @@ -1,31 +1,32 @@ # $FreeBSD$ PACKAGE= tests TESTSDIR= ${TESTSBASE}/sys/netpfil/pf TESTS_SUBDIRS+= ioctl ATF_TESTS_SH+= anchor \ pass_block \ forward \ fragmentation \ names \ nat \ set_tos \ src_track \ rdr \ route_to \ synproxy \ set_skip \ pfsync \ table ${PACKAGE}FILES+= utils.subr \ echo_inetd.conf \ + sniffer.py \ pft_ping.py \ CVE-2019-5597.py ${PACKAGE}FILESMODE_pft_ping.py= 0555 ${PACKAGE}FILESMODE_CVE-2019-5597.py= 0555 .include diff --git a/tests/sys/netpfil/pf/pft_ping.py b/tests/sys/netpfil/pf/pft_ping.py index 0b70c2235894..e77d0835134f 100644 --- a/tests/sys/netpfil/pf/pft_ping.py +++ b/tests/sys/netpfil/pf/pft_ping.py @@ -1,156 +1,135 @@ #!/usr/local/bin/python2.7 import argparse import scapy.all as sp import sys -import threading +from sniffer import Sniffer PAYLOAD_MAGIC = 0x42c0ffee -class Sniffer(threading.Thread): - def __init__(self, args, check_function): - threading.Thread.__init__(self) - - self._args = args - self._recvif = args.recvif[0] - self._check_function = check_function - self.foundCorrectPacket = False - - self.start() - - def _checkPacket(self, packet): - ret = self._check_function(self._args, packet) - if ret: - self.foundCorrectPacket = True - return ret - - def run(self): - self.packets = sp.sniff(iface=self._recvif, - stop_filter=self._checkPacket, timeout=3) - def check_ping_request(args, packet): if args.ip6: return check_ping6_request(args, packet) else: return check_ping4_request(args, packet) def check_ping4_request(args, packet): """ Verify that the packet matches what we'd have sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IP) if not ip: return False if ip.dst != dst_ip: return False icmp = packet.getlayer(sp.ICMP) if not icmp: return False if sp.icmptypes[icmp.type] != 'echo-request': return False raw = packet.getlayer(sp.Raw) if not raw: return False if raw.load != str(PAYLOAD_MAGIC): return False # Wait to check expectations until we've established this is the packet we # sent. if args.expect_tos: if ip.tos != int(args.expect_tos[0]): print "Unexpected ToS value %d, expected %s" \ % (ip.tos, args.expect_tos[0]) return False return True def check_ping6_request(args, packet): """ Verify that the packet matches what we'd have sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IPv6) if not ip: return False if ip.dst != dst_ip: return False icmp = packet.getlayer(sp.ICMPv6EchoRequest) if not icmp: return False if icmp.data != str(PAYLOAD_MAGIC): return False return True def ping(send_if, dst_ip, args): ether = sp.Ether() ip = sp.IP(dst=dst_ip) icmp = sp.ICMP(type='echo-request') raw = sp.Raw(str(PAYLOAD_MAGIC)) if args.send_tos: ip.tos = int(args.send_tos[0]) req = ether / ip / icmp / raw sp.sendp(req, iface=send_if, verbose=False) def ping6(send_if, dst_ip, args): ether = sp.Ether() ip6 = sp.IPv6(dst=dst_ip) icmp = sp.ICMPv6EchoRequest(data=PAYLOAD_MAGIC) req = ether / ip6 / icmp sp.sendp(req, iface=send_if, verbose=False) def main(): parser = argparse.ArgumentParser("pft_ping.py", description="Ping test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet(s) will be sent') parser.add_argument('--recvif', nargs=1, help='The interface on which to expect the ICMP echo response') parser.add_argument('--ip6', action='store_true', help='Use IPv6') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address for the ICMP echo request') # Packet settings parser.add_argument('--send-tos', nargs=1, help='Set the ToS value for the transmitted packet') # Expectations parser.add_argument('--expect-tos', nargs=1, help='The expected ToS value in the received packet') args = parser.parse_args() # We may not have a default route. Tell scapy where to start looking for routes sp.conf.iface6 = args.sendif[0] sniffer = None if not args.recvif is None: sniffer = Sniffer(args, check_ping_request) if args.ip6: ping6(args.sendif[0], args.to[0], args) else: ping(args.sendif[0], args.to[0], args) if sniffer: sniffer.join() if sniffer.foundCorrectPacket: sys.exit(0) else: sys.exit(1) if __name__ == '__main__': main() diff --git a/tests/sys/netpfil/pf/sniffer.py b/tests/sys/netpfil/pf/sniffer.py new file mode 100644 index 000000000000..c71f6e1f5729 --- /dev/null +++ b/tests/sys/netpfil/pf/sniffer.py @@ -0,0 +1,25 @@ +# $FreeBSD$ + +import threading +import scapy.all as sp + +class Sniffer(threading.Thread): + def __init__(self, args, check_function): + threading.Thread.__init__(self) + + self._args = args + self._recvif = args.recvif[0] + self._check_function = check_function + self.foundCorrectPacket = False + + self.start() + + def _checkPacket(self, packet): + ret = self._check_function(self._args, packet) + if ret: + self.foundCorrectPacket = True + return ret + + def run(self): + self.packets = sp.sniff(iface=self._recvif, + stop_filter=self._checkPacket, timeout=3)