diff --git a/tests/sys/net/pcp.py b/tests/sys/net/pcp.py index cea88faaf438..c0b6d4efc3b0 100644 --- a/tests/sys/net/pcp.py +++ b/tests/sys/net/pcp.py @@ -1,74 +1,74 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2021 Rubicon Communications, LLC (Netgate). # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. import argparse import logging logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import sys import os curdir = os.path.dirname(os.path.realpath(__file__)) netpfil_common = curdir + "/../netpfil/common" sys.path.append(netpfil_common) from sniffer import Sniffer def check_pcp(args, packet): vlan = packet.getlayer(sp.Dot1Q) if vlan is None: return False if not packet.getlayer(sp.BOOTP): return False if vlan.prio == int(args.expect_pcp[0]): return True return False def main(): parser = argparse.ArgumentParser("pcp.py", description="PCP test tool") parser.add_argument('--recvif', nargs=1, required=True, help='The interface where to look for packets to check') parser.add_argument('--expect-pcp', nargs=1, help='The expected PCP value on VLAN packets') args = parser.parse_args() - sniffer = Sniffer(args, check_pcp, recvif=args.recvif[0], timeout=20) + sniffer = Sniffer(args, check_pcp, args.recvif[0], timeout=20) sniffer.join() - if sniffer.foundCorrectPacket: + if sniffer.correctPackets: sys.exit(0) sys.exit(1) if __name__ == '__main__': main() diff --git a/tests/sys/net/stp.py b/tests/sys/net/stp.py index 3e7d011efdd1..dc6634fb7279 100644 --- a/tests/sys/net/stp.py +++ b/tests/sys/net/stp.py @@ -1,114 +1,114 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2021 Kristof Provost # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # import argparse import logging logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import sys import os curdir = os.path.dirname(os.path.realpath(__file__)) netpfil_common = curdir + "/../netpfil/common" sys.path.append(netpfil_common) from sniffer import Sniffer def check_stp(args, packet): stp = packet.getlayer(sp.STP) if stp is None: return False if stp.rootmac != "00:0c:29:01:01:01": return False # Ensure we don't get confused by valid STP packets generated by if_bridge if (stp.maxage >= 6 and stp.maxage <= 40) and \ (stp.hellotime >= 1 and stp.hellotime <= 2) and \ (stp.fwddelay >= 4 and stp.fwddelay <= 30): return False print("This packet should have been dropped") print(packet.show()) return True def invalid_stp(send_if): llc = sp.Ether(src="00:0c:29:0b:91:0a", dst="01:80:C2:00:00:00") \ / sp.LLC() # Bad maxage stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=41, hellotime=2, fwddelay=30) sp.sendp(stp, iface=send_if, verbose=False) stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=5, hellotime=2, fwddelay=30) sp.sendp(stp, iface=send_if, verbose=False) # Bad hellotime stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=40, hellotime=3, fwddelay=30) sp.sendp(stp, iface=send_if, verbose=False) stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=40, hellotime=1, fwddelay=30) sp.sendp(stp, iface=send_if, verbose=False) # Bad fwddelay stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=40, hellotime=2, fwddelay=31) sp.sendp(stp, iface=send_if, verbose=False) stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=40, hellotime=2, fwddelay=3) sp.sendp(stp, iface=send_if, verbose=False) def main(): parser = argparse.ArgumentParser("stp.py", description="STP test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet(s) will be sent') parser.add_argument('--recvif', nargs=1, help='The interface on which to expect the ICMP echo request') args = parser.parse_args() - sniffer = Sniffer(args, check_stp) + sniffer = Sniffer(args, check_stp, args.recvif[0]) invalid_stp(args.sendif[0]) sniffer.join() # The 'correct' packet is a corrupt STP packet, so it shouldn't turn up. - if sniffer.foundCorrectPacket: + if sniffer.correctPackets: sys.exit(1) if __name__ == '__main__': main() diff --git a/tests/sys/netpfil/common/pft_icmp_check.py b/tests/sys/netpfil/common/pft_icmp_check.py index e3c5b927aa63..070465a198f7 100644 --- a/tests/sys/netpfil/common/pft_icmp_check.py +++ b/tests/sys/netpfil/common/pft_icmp_check.py @@ -1,112 +1,112 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2021 Rubicon Communications, LLC (Netgate) # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # import argparse import logging logging.getLogger("scapy").setLevel(logging.CRITICAL) import random import scapy.all as sp import socket import sys from sniffer import Sniffer PAYLOAD_MAGIC = bytes.fromhex('42c0ffee') def ping(send_if, dst_ip, args): ether = sp.Ether() ip = sp.IP(dst=dst_ip, src=args.fromaddr[0]) icmp = sp.ICMP(type='echo-request') raw = sp.raw(PAYLOAD_MAGIC * 250) # We want 1000 bytes payload, -ish ip.flags = 2 # Don't fragment icmp.seq = random.randint(0, 65535) args.icmp_seq = icmp.seq req = ether / ip / icmp / raw sp.sendp(req, iface=send_if, verbose=False) def check_icmp_too_big(args, packet): """ Verify that this is an ICMP packet too big error, and that the IP addresses in the payload packet match expectations. """ icmp = packet.getlayer(sp.ICMP) if not icmp: return False if not icmp.type == 3: return False ip = packet.getlayer(sp.IPerror) if not ip: return False if ip.src != args.fromaddr[0]: print("Incorrect src addr %s" % ip.src) return False if ip.dst != args.to[0]: print("Incorrect dst addr %s" % ip.dst) return False icmp2 = packet.getlayer(sp.ICMPerror) if not icmp2: print("IPerror doesn't contain ICMP") return False if icmp2.seq != args.icmp_seq: print("Incorrect icmp seq %d != %d" % (icmp2.seq, args.icmp_seq)) return False return True def main(): parser = argparse.ArgumentParser("pft_icmp_check.py", description="ICMP error validation tool") parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--fromaddr', nargs=1, required=True, help='The source IP address') parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet(s) will be sent') parser.add_argument('--recvif', nargs=1, help='The interface on which to expect the ICMP error') args = parser.parse_args() sniffer = None if not args.recvif is None: - sniffer = Sniffer(args, check_icmp_too_big) + sniffer = Sniffer(args, check_icmp_too_big, args.recvif[0]) ping(args.sendif[0], args.to[0], args) if sniffer: sniffer.join() - if sniffer.foundCorrectPacket: + if sniffer.correctPackets: sys.exit(0) else: sys.exit(1) if __name__ == '__main__': main() diff --git a/tests/sys/netpfil/common/pft_ping.py b/tests/sys/netpfil/common/pft_ping.py index 9cc7c5d5c5c0..20d4164c6e3e 100644 --- a/tests/sys/netpfil/common/pft_ping.py +++ b/tests/sys/netpfil/common/pft_ping.py @@ -1,334 +1,334 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2017 Kristof Provost # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # import argparse import logging logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer PAYLOAD_MAGIC = bytes.fromhex('42c0ffee') dup_found = 0 def check_dup(args, packet): """ Verify that this is an ICMP packet, and that we only see one """ global dup_found icmp = packet.getlayer(sp.ICMP) if not icmp: return False raw = packet.getlayer(sp.Raw) if not raw: return False if raw.load != PAYLOAD_MAGIC: return False dup_found = dup_found + 1 return False def check_ping_request(args, packet): if args.ip6: return check_ping6_request(args, packet) else: return check_ping4_request(args, packet) def check_ping4_request(args, packet): """ Verify that the packet matches what we'd have sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IP) if not ip: return False if ip.dst != dst_ip: return False icmp = packet.getlayer(sp.ICMP) if not icmp: return False if sp.icmptypes[icmp.type] != 'echo-request': return False raw = packet.getlayer(sp.Raw) if not raw: return False if raw.load != PAYLOAD_MAGIC: return False # Wait to check expectations until we've established this is the packet we # sent. if args.expect_tos: if ip.tos != int(args.expect_tos[0]): print("Unexpected ToS value %d, expected %d" \ % (ip.tos, int(args.expect_tos[0]))) return False return True def check_ping6_request(args, packet): """ Verify that the packet matches what we'd have sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IPv6) if not ip: return False if ip.dst != dst_ip: return False icmp = packet.getlayer(sp.ICMPv6EchoRequest) if not icmp: return False if icmp.data != PAYLOAD_MAGIC: return False return True def check_ping_reply(args, packet): if args.ip6: return check_ping6_reply(args, packet) else: return check_ping4_reply(args, packet) def check_ping4_reply(args, packet): """ Check that this is a reply to the ping request we sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IP) if not ip: return False if ip.src != dst_ip: return False icmp = packet.getlayer(sp.ICMP) if not icmp: return False if sp.icmptypes[icmp.type] != 'echo-reply': return False raw = packet.getlayer(sp.Raw) if not raw: return False if raw.load != PAYLOAD_MAGIC: return False return True def check_ping6_reply(args, packet): """ Check that this is a reply to the ping request we sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IPv6) if not ip: return False if ip.src != dst_ip: return False icmp = packet.getlayer(sp.ICMPv6EchoReply) if not icmp: print("No echo reply!") return False if icmp.data != PAYLOAD_MAGIC: print("data mismatch") return False return True def ping(send_if, dst_ip, args): ether = sp.Ether() ip = sp.IP(dst=dst_ip) icmp = sp.ICMP(type='echo-request') raw = sp.raw(PAYLOAD_MAGIC) if args.send_tos: ip.tos = int(args.send_tos[0]) if args.fromaddr: ip.src = args.fromaddr[0] req = ether / ip / icmp / raw sp.sendp(req, iface=send_if, verbose=False) def ping6(send_if, dst_ip, args): ether = sp.Ether() ip6 = sp.IPv6(dst=dst_ip) icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC)) if args.fromaddr: ip.src = args.fromaddr[0] req = ether / ip6 / icmp sp.sendp(req, iface=send_if, verbose=False) def check_tcpsyn(args, packet): dst_ip = args.to[0] ip = packet.getlayer(sp.IP) if not ip: return False if ip.dst != dst_ip: return False tcp = packet.getlayer(sp.TCP) if not tcp: return False # Verify IP checksum chksum = ip.chksum ip.chksum = None new_chksum = sp.IP(sp.raw(ip)).chksum if chksum != new_chksum: print("Expected IP checksum %x but found %x\n" % (new_cshkum, chksum)) return False # Verify TCP checksum chksum = tcp.chksum packet_raw = sp.raw(packet) tcp.chksum = None newpacket = sp.Ether(sp.raw(packet[sp.Ether])) new_chksum = newpacket[sp.TCP].chksum if chksum != new_chksum: print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum)) return False return True def tcpsyn(send_if, dst_ip, args): opts=[('Timestamp', (1, 1)), ('MSS', 1280)] if args.tcpopt_unaligned: opts = [('NOP', 0 )] + opts ether = sp.Ether() ip = sp.IP(dst=dst_ip) tcp = sp.TCP(dport=666, flags='S', options=opts) req = ether / ip / tcp sp.sendp(req, iface=send_if, verbose=False) def main(): parser = argparse.ArgumentParser("pft_ping.py", description="Ping test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet(s) will be sent') parser.add_argument('--recvif', nargs=1, help='The interface on which to expect the ICMP echo request') parser.add_argument('--replyif', nargs=1, help='The interface on which to expect the ICMP echo response') parser.add_argument('--checkdup', nargs=1, help='The interface on which to expect the duplicated ICMP packets') parser.add_argument('--ip6', action='store_true', help='Use IPv6') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address for the ICMP echo request') parser.add_argument('--fromaddr', nargs=1, help='The source IP address for the ICMP echo request') # TCP options parser.add_argument('--tcpsyn', action='store_true', help='Send a TCP SYN packet') parser.add_argument('--tcpopt_unaligned', action='store_true', help='Include unaligned TCP options') # Packet settings parser.add_argument('--send-tos', nargs=1, help='Set the ToS value for the transmitted packet') # Expectations parser.add_argument('--expect-tos', nargs=1, help='The expected ToS value in the received packet') args = parser.parse_args() # We may not have a default route. Tell scapy where to start looking for routes sp.conf.iface6 = args.sendif[0] sniffer = None if not args.recvif is None: checkfn=check_ping_request if args.tcpsyn: checkfn=check_tcpsyn - sniffer = Sniffer(args, checkfn) + sniffer = Sniffer(args, checkfn, args.recvif[0]) replysniffer = None if not args.replyif is None: checkfn=check_ping_reply - replysniffer = Sniffer(args, checkfn, recvif=args.replyif[0]) + replysniffer = Sniffer(args, checkfn, args.replyif[0]) dupsniffer = None if args.checkdup is not None: - dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0]) + dupsniffer = Sniffer(args, check_dup, args.checkdup[0]) if args.tcpsyn: tcpsyn(args.sendif[0], args.to[0], args) else: if args.ip6: ping6(args.sendif[0], args.to[0], args) else: ping(args.sendif[0], args.to[0], args) if dupsniffer: dupsniffer.join() if dup_found != 1: sys.exit(1) if sniffer: sniffer.join() - if sniffer.foundCorrectPacket: + if sniffer.correctPackets: sys.exit(0) else: sys.exit(1) if replysniffer: replysniffer.join() - if replysniffer.foundCorrectPacket: + if replysniffer.correctPackets: sys.exit(0) else: sys.exit(1) if __name__ == '__main__': main() diff --git a/tests/sys/netpfil/common/sniffer.py b/tests/sys/netpfil/common/sniffer.py index 5e09a2e4db37..cee6f73e22dc 100644 --- a/tests/sys/netpfil/common/sniffer.py +++ b/tests/sys/netpfil/common/sniffer.py @@ -1,67 +1,61 @@ # $FreeBSD$ # # SPDX-License-Identifier: BSD-2-Clause-FreeBSD # # Copyright (c) 2017 Kristof Provost # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # import threading import scapy.all as sp import sys class Sniffer(threading.Thread): - def __init__(self, args, check_function, recvif=None, timeout=3): + def __init__(self, args, check_function, recvif, timeout=3): threading.Thread.__init__(self) self._sem = threading.Semaphore(0) self._args = args self._timeout = timeout - if recvif is not None: - self._recvif = recvif - else: - self._recvif = args.recvif[0] + self._recvif = recvif self._check_function = check_function - self.foundCorrectPacket = False + self.correctPackets = 0 self.start() if not self._sem.acquire(timeout=30): raise Exception("Failed to start sniffer") def _checkPacket(self, packet): ret = self._check_function(self._args, packet) if ret: - self.foundCorrectPacket = True + self.correctPackets += 1 return ret def _startedCb(self): self._sem.release() def run(self): self.packets = [] - try: - self.packets = sp.sniff(iface=self._recvif, - stop_filter=self._checkPacket, timeout=self._timeout, - started_callback=self._startedCb) - except Exception as e: - print(e, file=sys.stderr) + self.packets = sp.sniff(iface=self._recvif, + stop_filter=self._checkPacket, timeout=self._timeout, + started_callback=self._startedCb) diff --git a/tests/sys/netpfil/pf/CVE-2019-5598.py b/tests/sys/netpfil/pf/CVE-2019-5598.py index 603a1aef376f..b72c04c5e19b 100644 --- a/tests/sys/netpfil/pf/CVE-2019-5598.py +++ b/tests/sys/netpfil/pf/CVE-2019-5598.py @@ -1,92 +1,92 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause-FreeBSD # # Copyright (c) 2019 Kristof Provost # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. import argparse import logging logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import sys from sniffer import Sniffer def check_icmp_error(args, packet): ip = packet.getlayer(sp.IP) if not ip: return False if ip.dst != args.to[0]: return False icmp = packet.getlayer(sp.ICMP) if not icmp: return False if icmp.type != 3 or icmp.code != 3: return False return True def main(): parser = argparse.ArgumentParser("CVE-2019-icmp.py", description="CVE-2019-icmp test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') args = parser.parse_args() # Send the allowed packet to establish state udp = sp.Ether() / \ sp.IP(src=args.src[0], dst=args.to[0]) / \ sp.UDP(dport=53, sport=1234) sp.sendp(udp, iface=args.sendif[0], verbose=False) # Start sniffing on recvif - sniffer = Sniffer(args, check_icmp_error) + sniffer = Sniffer(args, check_icmp_error, args.recvif[0]) # Send the bad error packet icmp_reachable = sp.Ether() / \ sp.IP(src=args.src[0], dst=args.to[0]) / \ sp.ICMP(type=3, code=3) / \ sp.IP(src="4.3.2.1", dst="1.2.3.4") / \ sp.UDP(dport=53, sport=1234) sp.sendp(icmp_reachable, iface=args.sendif[0], verbose=False) sniffer.join() - if sniffer.foundCorrectPacket: + if sniffer.correctPackets: sys.exit(1) sys.exit(0) if __name__ == '__main__': main()