diff --git a/tests/sys/common/divert.py b/tests/sys/common/divert.py index 1861f87402ad..f23fbe857cbb 100755 --- a/tests/sys/common/divert.py +++ b/tests/sys/common/divert.py @@ -1,83 +1,85 @@ #!/usr/bin/env python # - # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2020 Alexander V. Chernikov # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import socket +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sc import argparse IPPROTO_DIVERT = 258 def parse_args(): parser = argparse.ArgumentParser(description='divert socket tester') parser.add_argument('--dip', type=str, help='destination packet IP') parser.add_argument('--divert_port', type=int, default=6668, help='divert port to use') parser.add_argument('--test_name', type=str, required=True, help='test name to run') return parser.parse_args() def ipdivert_ip_output_remote_success(args): packet = sc.IP(dst=args.dip) / sc.ICMP(type='echo-request') with socket.socket(socket.AF_INET, socket.SOCK_RAW, IPPROTO_DIVERT) as s: s.bind(('0.0.0.0', args.divert_port)) s.sendto(bytes(packet), ('0.0.0.0', 0)) def ipdivert_ip6_output_remote_success(args): packet = sc.IPv6(dst=args.dip) / sc.ICMPv6EchoRequest() with socket.socket(socket.AF_INET, socket.SOCK_RAW, IPPROTO_DIVERT) as s: s.bind(('0.0.0.0', args.divert_port)) s.sendto(bytes(packet), ('0.0.0.0', 0)) def ipdivert_ip_input_local_success(args): """Sends IPv4 packet to OS stack as inbound local packet.""" packet = sc.IP(dst=args.dip) / sc.ICMP(type='echo-request') with socket.socket(socket.AF_INET, socket.SOCK_RAW, IPPROTO_DIVERT) as s: s.bind(('0.0.0.0', args.divert_port)) s.sendto(bytes(packet), (args.dip, 0)) # XXX: IPv6 local divert is currently not supported # TODO: add IPv4 ifname output verification def main(): args = parse_args() test_ptr = globals()[args.test_name] test_ptr(args) if __name__ == '__main__': main() diff --git a/tests/sys/common/sender.py b/tests/sys/common/sender.py index 483210e54fcb..2ff699a8ef6d 100755 --- a/tests/sys/common/sender.py +++ b/tests/sys/common/sender.py @@ -1,201 +1,203 @@ #!/usr/bin/env python # - # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2020 Alexander V. Chernikov # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # from functools import partial import socket +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sc import argparse import time def parse_args(): parser = argparse.ArgumentParser(description='divert socket tester') parser.add_argument('--dip', type=str, help='destination packet IP') parser.add_argument('--sip', type=str, help='source packet IP') parser.add_argument('--dmac', type=str, help='packet dst mac') parser.add_argument('--smac', type=str, help='packet src mac') parser.add_argument('--iface', type=str, help='interface to use') parser.add_argument('--test_name', type=str, required=True, help='test name to run') return parser.parse_args() def send_packet(args, pkt): sc.sendp(pkt, iface=args.iface, verbose=False) def is_icmp6_echo_request(pkt): return pkt.type == 0x86DD and pkt.payload.nh == 58 and \ pkt.payload.payload.type == 128 def check_forwarded_ip_packet(orig_pkt, fwd_pkt): """ Checks that forwarded ICMP packet @fwd_ptk is the same as @orig_pkt. Assumes router-on-the-stick forwarding behaviour: * src/dst macs are swapped * TTL is decremented """ # Check ether fields assert orig_pkt.src == fwd_pkt.dst assert orig_pkt.dst == fwd_pkt.src assert len(orig_pkt) == len(fwd_pkt) # Check IP fwd_ip = fwd_pkt[sc.IP] orig_ip = orig_pkt[sc.IP] assert orig_ip.src == orig_ip.src assert orig_ip.dst == fwd_ip.dst assert orig_ip.ttl == fwd_ip.ttl + 1 # Check ICMP fwd_icmp = fwd_ip[sc.ICMP] orig_icmp = orig_ip[sc.ICMP] assert bytes(orig_ip.payload) == bytes(fwd_ip.payload) def fwd_ip_icmp_fast(args): """ Sends ICMP packet via args.iface interface. Receives and checks the forwarded packet. Assumes forwarding router decrements TTL """ def filter_f(x): return x.src == args.dmac and x.type == 0x0800 e = sc.Ether(src=args.smac, dst=args.dmac) ip = sc.IP(src=args.sip, dst=args.dip) icmp = sc.ICMP(type='echo-request') pkt = e / ip / icmp send_cb = partial(send_packet, args, pkt) packets = sc.sniff(iface=args.iface, started_callback=send_cb, stop_filter=filter_f, lfilter=filter_f, timeout=5) assert len(packets) > 0 fwd_pkt = packets[-1] try: check_forwarded_ip_packet(pkt, fwd_pkt) except Exception as e: print('Original packet:') pkt.show() print('Forwarded packet:') fwd_pkt.show() for a_packet in packets: a_packet.summary() raise Exception from e def fwd_ip_icmp_slow(args): """ Sends ICMP packet via args.iface interface. Forces slow path processing by introducing IP option. Receives and checks the forwarded packet. Assumes forwarding router decrements TTL """ def filter_f(x): return x.src == args.dmac and x.type == 0x0800 e = sc.Ether(src=args.smac, dst=args.dmac) # Add IP option to switch to 'normal' IP processing stream_id = sc.IPOption_Stream_Id(security=0xFFFF) ip = sc.IP(src=args.sip, dst=args.dip, options=[sc.IPOption_Stream_Id(security=0xFFFF)]) icmp = sc.ICMP(type='echo-request') pkt = e / ip / icmp send_cb = partial(send_packet, args, pkt) packets = sc.sniff(iface=args.iface, started_callback=send_cb, stop_filter=filter_f, lfilter=filter_f, timeout=5) assert len(packets) > 0 check_forwarded_ip_packet(pkt, packets[-1]) def check_forwarded_ip6_packet(orig_pkt, fwd_pkt): """ Checks that forwarded ICMP packet @fwd_ptk is the same as @orig_pkt. Assumes router-on-the-stick forwarding behaviour: * src/dst macs are swapped * TTL is decremented """ # Check ether fields assert orig_pkt.src == fwd_pkt.dst assert orig_pkt.dst == fwd_pkt.src assert len(orig_pkt) == len(fwd_pkt) # Check IP fwd_ip = fwd_pkt[sc.IPv6] orig_ip = orig_pkt[sc.IPv6] assert orig_ip.src == orig_ip.src assert orig_ip.dst == fwd_ip.dst assert orig_ip.hlim == fwd_ip.hlim + 1 # Check ICMPv6 assert bytes(orig_ip.payload) == bytes(fwd_ip.payload) def fwd_ip6_icmp(args): """ Sends ICMPv6 packet via args.iface interface. Receives and checks the forwarded packet. Assumes forwarding router decrements TTL """ def filter_f(x): return x.src == args.dmac and is_icmp6_echo_request(x) e = sc.Ether(src=args.smac, dst=args.dmac) ip = sc.IPv6(src=args.sip, dst=args.dip) icmp = sc.ICMPv6EchoRequest() pkt = e / ip / icmp send_cb = partial(send_packet, args, pkt) packets = sc.sniff(iface=args.iface, started_callback=send_cb, stop_filter=filter_f, lfilter=filter_f, timeout=5) assert len(packets) > 0 fwd_pkt = packets[-1] try: check_forwarded_ip6_packet(pkt, fwd_pkt) except Exception as e: print('Original packet:') pkt.show() print('Forwarded packet:') fwd_pkt.show() for idx, a_packet in enumerate(packets): print('{}: {}'.format(idx, a_packet.summary())) raise Exception from e def main(): args = parse_args() test_ptr = globals()[args.test_name] test_ptr(args) if __name__ == '__main__': main() diff --git a/tests/sys/net/stp.py b/tests/sys/net/stp.py index 4c4c0af4c728..3e7d011efdd1 100644 --- a/tests/sys/net/stp.py +++ b/tests/sys/net/stp.py @@ -1,112 +1,114 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2021 Kristof Provost # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import sys import os curdir = os.path.dirname(os.path.realpath(__file__)) netpfil_common = curdir + "/../netpfil/common" sys.path.append(netpfil_common) from sniffer import Sniffer def check_stp(args, packet): stp = packet.getlayer(sp.STP) if stp is None: return False if stp.rootmac != "00:0c:29:01:01:01": return False # Ensure we don't get confused by valid STP packets generated by if_bridge if (stp.maxage >= 6 and stp.maxage <= 40) and \ (stp.hellotime >= 1 and stp.hellotime <= 2) and \ (stp.fwddelay >= 4 and stp.fwddelay <= 30): return False print("This packet should have been dropped") print(packet.show()) return True def invalid_stp(send_if): llc = sp.Ether(src="00:0c:29:0b:91:0a", dst="01:80:C2:00:00:00") \ / sp.LLC() # Bad maxage stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=41, hellotime=2, fwddelay=30) sp.sendp(stp, iface=send_if, verbose=False) stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=5, hellotime=2, fwddelay=30) sp.sendp(stp, iface=send_if, verbose=False) # Bad hellotime stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=40, hellotime=3, fwddelay=30) sp.sendp(stp, iface=send_if, verbose=False) stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=40, hellotime=1, fwddelay=30) sp.sendp(stp, iface=send_if, verbose=False) # Bad fwddelay stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=40, hellotime=2, fwddelay=31) sp.sendp(stp, iface=send_if, verbose=False) stp = llc / sp.STP(proto=0, rootid=32768, rootmac="00:0c:29:01:01:01", \ bridgeid=32768, bridgemac="00:0c:29:01:01:01", \ portid=0x8007, maxage=40, hellotime=2, fwddelay=3) sp.sendp(stp, iface=send_if, verbose=False) def main(): parser = argparse.ArgumentParser("stp.py", description="STP test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet(s) will be sent') parser.add_argument('--recvif', nargs=1, help='The interface on which to expect the ICMP echo request') args = parser.parse_args() sniffer = Sniffer(args, check_stp) invalid_stp(args.sendif[0]) sniffer.join() # The 'correct' packet is a corrupt STP packet, so it shouldn't turn up. if sniffer.foundCorrectPacket: sys.exit(1) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/exthdr.py b/tests/sys/netinet6/exthdr.py index 7c09f44c5b58..52739a9cfa11 100644 --- a/tests/sys/netinet6/exthdr.py +++ b/tests/sys/netinet6/exthdr.py @@ -1,273 +1,275 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys import frag6.sniffer as Sniffer from time import sleep def check_icmp6_error_dst_unreach_noport(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6DestUnreach) if not icmp6: return False # ICMP6_DST_UNREACH_NOPORT 4 if icmp6.code != 4: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Dest Unreach / Port Unreach so leave it. #icmp6.display() return True def check_icmp6_error_paramprob_header(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6ParamProblem) if not icmp6: return False # ICMP6_PARAMPROB_HEADER 0 if icmp6.code != 0: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Param Prob so leave it. #icmp6.display() return True def check_tcp_rst(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False tcp = packet.getlayer(sp.TCP) if not tcp: return False # Is TCP RST? if tcp.flags & 0x04: #tcp.display() return True return False def addExt(ext, h): if h is None: return ext if ext is None: ext = h else: ext = ext / h return ext def getExtHdrs(args): ext = None # XXX-TODO Try to put them in an order which could make sense # in real life packets and according to the RFCs. if args.hbh: hbh = sp.IPv6ExtHdrHopByHop(options = \ sp.PadN(optdata="\x00\x00\x00\x00\x00\x00")) ext = addExt(ext, hbh) if args.rh: rh = sp.IPv6ExtHdrRouting(type = 0) ext = addExt(ext, rh) if args.frag6: frag6 = sp.IPv6ExtHdrFragment(offset=0, m=0, id=0x1234) ext = addExt(ext, frag6) if args.esp: # XXX TODO esp = None ext = addExt(ext, esp) if args.ah: # XXX TODO ah = None ext = addExt(ext, ah) if args.dest: dest = sp.IPv6ExtHdrDestOpt(options = \ sp.PadN(optdata="\x00\x00\x00\x00\x00\x00")) ext = addExt(ext, dest) if args.mobi: # XXX TODO mobi = None ext = addExt(ext, mobi) if args.hip: # XXX TODO hip = None ext = addExt(ext, hip) if args.shim6: # XXX TODO shim6 = None ext = addExt(ext, shim6) if args.proto253: # XXX TODO tft = None ext = addExt(ext, tft) if args.proto254: # XXX TODO tff = None ext = addExt(ext, tff) if args.hbhbad: hbhbad = sp.IPv6ExtHdrHopByHop(options = \ sp.PadN(optdata="\x00\x00\x00\x00\x00\x00")) ext = addExt(ext, hbhbad) return ext def main(): parser = argparse.ArgumentParser("exthdr.py", description="IPv6 extension header test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') # Extension Headers # See https://www.iana.org/assignments/ipv6-parameters/ipv6-parameters.xhtml parser.add_argument('--hbh', required=False, action='store_true', help='Add IPv6 Hop-by-Hop Option') parser.add_argument('--hbhbad', required=False, action='store_true', help='Add IPv6 Hop-by-Hop Option at an invalid position') parser.add_argument('--rh', required=False, action='store_true', help='Add Routing Header for IPv6') parser.add_argument('--frag6', required=False, action='store_true', help='Add Fragment Header for IPv6') parser.add_argument('--esp', required=False, action='store_true', help='Add Encapsulating Security Payload') parser.add_argument('--ah', required=False, action='store_true', help='Add Authentication Header') parser.add_argument('--dest', required=False, action='store_true', help='Add Destination Options for IPv6') parser.add_argument('--mobi', required=False, action='store_true', help='Add Mobility Header') parser.add_argument('--hip', required=False, action='store_true', help='Add Host Identity Protocol') parser.add_argument('--shim6', required=False, action='store_true', help='Add Shim6 Protocol') parser.add_argument('--proto253', required=False, action='store_true', help='Use for experimentation and testing (253)') parser.add_argument('--proto254', required=False, action='store_true', help='Use for experimentation and testing (254)') args = parser.parse_args() if args.hbhbad: ok = 0 else: ok = 1 ######################################################################## # # Send IPv6 packets with one or more extension headers (combinations # mmight not always make sense depending what user tells us). # We are trying to cover the basic loop and passing mbufs on # and making sure m_pullup() works. # Try for at least UDP and TCP upper layer payloads. # # Expectations: no panics # We are not testing for any other outcome here. # data = "6" * 88 udp = sp.UDP(dport=3456, sport=6543) / data tcp = sp.TCP(dport=4567, sport=7654) ip6 = sp.Ether() / sp.IPv6(src=args.src[0], dst=args.to[0]) for ulp in [ udp, tcp ]: ext = getExtHdrs(args) if ext is not None: pkt = ip6 / ext / ulp else: pkt = ip6 / ulp if args.debug : pkt.display() if not ok: sc = check_icmp6_error_paramprob_header; elif ulp == udp: sc = check_icmp6_error_dst_unreach_noport; elif ulp == tcp: sc = check_tcp_rst; else: sys.exit(2) # Start sniffing on recvif sniffer = Sniffer.Sniffer(args, sc) sp.sendp(pkt, iface=args.sendif[0], verbose=False) sleep(0.10) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(not ok) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_01.py b/tests/sys/netinet6/frag6/frag6_01.py index 5a6b327337c6..efa99ce65759 100644 --- a/tests/sys/netinet6/frag6/frag6_01.py +++ b/tests/sys/netinet6/frag6/frag6_01.py @@ -1,115 +1,117 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6ParamProblem) if not icmp6: return False # ICMP6_PARAMPROB_HEADER 0 if icmp6.code != 0: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Param Prob so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ######################################################################## # # A single start fragment with zero length IPv6 header (jumbo). # Make sure we do hit the Fragment case, which is tricky as the # jumbogram needs to be > 64k. # # A: Jumbo-Fragment not allowed. # R: ICMPv6 param problem. # #data = "6" * (65536 - 2 - 6 - 8 - 8) data = "6" * 65512 ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0], plen=0) / \ sp.IPv6ExtHdrHopByHop(options=sp.Jumbo(jumboplen=65536)) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=6) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) # We should only need to sleep 0.10 but it seems scapy # takes time for this one. sleep(75) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_02.py b/tests/sys/netinet6/frag6/frag6_02.py index 945b428c3dac..794801b4a819 100644 --- a/tests/sys/netinet6/frag6/frag6_02.py +++ b/tests/sys/netinet6/frag6/frag6_02.py @@ -1,109 +1,111 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6ParamProblem) if not icmp6: return False # ICMP6_PARAMPROB_HEADER 0 if icmp6.code != 0: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Param Prob so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ######################################################################## # # A single start fragment with payload length not % 8. # # A: Error handling in code. # R: ICMPv6 param problem. # data = "6" * 1287 ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=5) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sleep(0.10) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_03.py b/tests/sys/netinet6/frag6/frag6_03.py index 341de7915bd8..434dfe554a5a 100644 --- a/tests/sys/netinet6/frag6/frag6_03.py +++ b/tests/sys/netinet6/frag6/frag6_03.py @@ -1,134 +1,136 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6DestUnreach) if not icmp6: return False # ICMP6_DST_UNREACH_NOPORT 4 if icmp6.code != 4: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Dest Unreach / Port Unreach so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ######################################################################## # # Atomic fragment. # # A: Nothing listening on UDP port. # R: ICMPv6 dst unreach, unreach port. # ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=0, id=3) / \ sp.UDP(dport=3456, sport=6543) if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sleep(0.10) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # ## # # Atomic fragment with extension header. # # A: Nothing listening on UDP port. # R: ICMPv6 dst unreach, unreach port. # # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrDestOpt(options = \ sp.PadN(optdata="\x00\x00\x00\x00\x00\x00")) / \ sp.IPv6ExtHdrFragment(offset=0, m=0, id=0x3001) / \ sp.UDP(dport=3456, sport=6543) if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sleep(0.10) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_04.py b/tests/sys/netinet6/frag6/frag6_04.py index 6c35d4858ccb..8f0a20e003da 100644 --- a/tests/sys/netinet6/frag6/frag6_04.py +++ b/tests/sys/netinet6/frag6/frag6_04.py @@ -1,106 +1,108 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6ParamProblem) if not icmp6: return False # ICMP6_PARAMPROB_HEADER 0 if icmp6.code != 0: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Param Prob so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ######################################################################## # # 0-byte first fragment. # # A: 0-byte fragment payload not allowed. Discarded. # R: ICMPv6 param prob, paramprob header. # ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=4) if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sleep(0.10) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_05.py b/tests/sys/netinet6/frag6/frag6_05.py index d67c35581bbf..f9bc947d5465 100644 --- a/tests/sys/netinet6/frag6/frag6_05.py +++ b/tests/sys/netinet6/frag6/frag6_05.py @@ -1,84 +1,86 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from time import sleep def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # Sysctl set to accept (no|maximum 10) fragments. # # A: Discarded. # R: Silence (statistics only) or ICMPv6 timeout expiry. # data = "6" * 1280 bfid = 0x5001 for i in range(20): fid = bfid + i ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=fid) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) # Wait for possible expiry to happen. sleep(75) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_06.py b/tests/sys/netinet6/frag6/frag6_06.py index 42c8b02042cd..ca4e9b1a5ca0 100644 --- a/tests/sys/netinet6/frag6/frag6_06.py +++ b/tests/sys/netinet6/frag6/frag6_06.py @@ -1,81 +1,83 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from time import sleep def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # Sysctl set to accept (no|maximum 10) fragments on a reassembly queue. # # A: Discarded. # R: Silence (statistics only) or ICMPv6 timeout expiry. # data = "66666666" for i in range(20): foffset=(i * (int)(16 / 8)) ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=foffset, m=1, id=6) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_07.py b/tests/sys/netinet6/frag6/frag6_07.py index c84a783137d3..231f49eac2e0 100644 --- a/tests/sys/netinet6/frag6/frag6_07.py +++ b/tests/sys/netinet6/frag6/frag6_07.py @@ -1,178 +1,180 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6ParamProblem) if not icmp6: return False # ICMP6_PARAMPROB_HEADER 0 if icmp6.code != 0: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Param Prob so leave it. #icmp6.display() return True def check_icmp6_error_2(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6TimeExceeded) if not icmp6: return False # ICMP6_TIME_EXCEED_REASSEMBLY 1 if icmp6.code != 1: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Time Exceeded / Frag reassembly so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) sniffer2 = Sniffer(args, check_icmp6_error_2) ######################################################################## # # Two fragments with payload and offset set to add up to >64k. # # Make a first fragment arrive and a second to explode everything. # # A: Reassembly failure. # R: ICMPv6 param prob, param header. # R: ICMPv6 timeout (1st frag, off=0) # data = "6" * 1280 ip6f01 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=7) / \ sp.UDP(dport=3456, sport=6543) / \ data ip6f02 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0x1fff, m=1, id=7) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() ip6f02.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sp.sendp(ip6f02, iface=args.sendif[0], verbose=False) sleep(1.00) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # ## # # A fragment with payload and offset set to add up to >64k. # # Try again with the first packet to make things explode. # # A: Reassembly failure. # R: ICMPv6 param prob, param header. # # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ip6f01 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0x1fff, m=1, id=0x7001) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sleep(0.10) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) # Wait for expiry from first test run. sleep(75) sniffer2.setEnd() sniffer2.join() if not sniffer2.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_08.py b/tests/sys/netinet6/frag6/frag6_08.py index fa17e1e5c774..25f57f702e71 100644 --- a/tests/sys/netinet6/frag6/frag6_08.py +++ b/tests/sys/netinet6/frag6/frag6_08.py @@ -1,152 +1,154 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6ParamProblem) if not icmp6: return False # ICMP6_PARAMPROB_HEADER 0 if icmp6.code != 0: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Param Prob so leave it. #icmp6.display() return True def check_icmp6_error_2(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6TimeExceeded) if not icmp6: return False # ICMP6_TIME_EXCEED_REASSEMBLY 1 if icmp6.code != 1: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Time Exceeded / Frag reassembly so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) sniffer2 = Sniffer(args, check_icmp6_error_2) ######################################################################## # # A fragment with payload and offset set to add up to >64k when # another frag with offset=0 arrives and has an unfrag part. # This is us checking for all fragments queued already when the # one with off=0 arrives. Note: unless the off=0 has its own problem # it will be queued and off!=0 ones might be expunged with param prob. # # A: Reassembly failure, timeout after # R: ICMPv6 param prob, param header (1st frag) # R: ICMPv6 time exceeded (2nd frag, as off=0) # data = "6" * 15 ip6f01 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0x1ffc, m=0, id=8) / \ sp.UDP(dport=3456, sport=6543) / \ data data = "6" * 8 ip6f02 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrDestOpt(options = \ sp.PadN(optdata="\x00\x00\x00\x00\x00\x00")) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=8) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() ip6f02.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sp.sendp(ip6f02, iface=args.sendif[0], verbose=False) sleep(1.00) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sleep(75) sniffer2.setEnd() sniffer2.join() if not sniffer2.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_09.py b/tests/sys/netinet6/frag6/frag6_09.py index 6e9771bc7d58..63ec646e1175 100644 --- a/tests/sys/netinet6/frag6/frag6_09.py +++ b/tests/sys/netinet6/frag6/frag6_09.py @@ -1,109 +1,111 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6TimeExceeded) if not icmp6: return False # ICMP6_TIME_EXCEED_REASSEMBLY 1 if icmp6.code != 1: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Time Exceeded / Frag reassembly so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ######################################################################## # # A single start fragment. # # A: Waiting for more data. # R: Timeout / Expiry. # ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=2) / \ sp.UDP(dport=3456, sport=6543) if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) # Wait for ICMPv6 error generation on timeout. sleep(75) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_10.py b/tests/sys/netinet6/frag6/frag6_10.py index 02d25bd96450..fcd331190c02 100644 --- a/tests/sys/netinet6/frag6/frag6_10.py +++ b/tests/sys/netinet6/frag6/frag6_10.py @@ -1,81 +1,83 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from time import sleep def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # A single middle fragment. # # A: Waiting for more data. # R: Timeout / Expiry. # ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=161, m=1, id=7) / \ sp.UDP(dport=3456, sport=6543) if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) # We do not generate ICMPv6 for non-off=0-segments. # Wait for expiry. sleep(75) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_11.py b/tests/sys/netinet6/frag6/frag6_11.py index a5c8e1918430..6b9643337597 100644 --- a/tests/sys/netinet6/frag6/frag6_11.py +++ b/tests/sys/netinet6/frag6/frag6_11.py @@ -1,81 +1,83 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from time import sleep def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # A single last fragment. # # A: Waiting for more data. # R: Timeout / Expiry. # ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=321, m=0, id=8) / \ sp.UDP(dport=3456, sport=6543) if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) # Wait for expiration to happen. We will not see an ICMPv6 as there # is no frag with offset=0. sleep(75) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_12.py b/tests/sys/netinet6/frag6/frag6_12.py index 68ea180599e1..a683782f2b69 100644 --- a/tests/sys/netinet6/frag6/frag6_12.py +++ b/tests/sys/netinet6/frag6/frag6_12.py @@ -1,111 +1,113 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6TimeExceeded) if not icmp6: return False # ICMP6_TIME_EXCEED_REASSEMBLY 1 if icmp6.code != 1: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Time Exceeded / Frag reassembly so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ######################################################################## # # A single start fragment with payload. # # A: Waiting for more data. # R: Timeout / Expiry. # data = "6" * 1280 ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=3) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) # Wait for ICMPv6 error generation on timeout. sleep(75) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_13.py b/tests/sys/netinet6/frag6/frag6_13.py index da3b2afe239f..e377a4272fa1 100644 --- a/tests/sys/netinet6/frag6/frag6_13.py +++ b/tests/sys/netinet6/frag6/frag6_13.py @@ -1,122 +1,124 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from time import sleep def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # Two fragments with different ECN (Traffic Clas) bits to trigger # error cases. # # A: Reassembly failure. # R: ip6f02 dropped / Timeout (not waiting for). # data = "6" * 8 # IPTOS_ECN_NOTECT ip6f01 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0], tc=0x00) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=13) / \ sp.UDP(dport=3456, sport=6543) / \ data # IPTOS_ECN_CE ip6f02 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0], tc=0x03) / \ sp.IPv6ExtHdrFragment(offset=16, m=0, id=13) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() ip6f02.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sp.sendp(ip6f02, iface=args.sendif[0], verbose=False) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # ## # # Two fragments with different ECN (Traffic Clas) bits to trigger # error cases. # # A: Reassembly failure. # R: ip6f02 dropped / Timeout (not waiting for). # # IPTOS_ECN_ECT1 ip6f01 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0], tc=0x01) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=0x1301) / \ sp.UDP(dport=3456, sport=6543) / \ data # IPTOS_ECN_NOTECT ip6f02 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0], tc=0x00) / \ sp.IPv6ExtHdrFragment(offset=16, m=0, id=0x1301) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() ip6f02.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sp.sendp(ip6f02, iface=args.sendif[0], verbose=False) # Wait for expiry. sleep(75) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_14.py b/tests/sys/netinet6/frag6/frag6_14.py index 915571cfc36f..b53a65e67529 100644 --- a/tests/sys/netinet6/frag6/frag6_14.py +++ b/tests/sys/netinet6/frag6/frag6_14.py @@ -1,137 +1,139 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from time import sleep def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # Send multiple fragments, with an exact overlap in a middle one, # not finishing the full packet (and ignoring the content anyway). # # A: Reassembly failure. # R: dup dropped silently / Timeout (not waiting for). # data = "6" * 8 ip6f01 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=14) / \ sp.UDP(dport=3456, sport=6543) / \ data ip6f02 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0x1000, m=0, id=14) / \ sp.UDP(dport=3456, sport=6543) / \ data ip6f03 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=16, m=1, id=14) / \ sp.UDP(dport=3456, sport=6543) / \ data ip6f04 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=16, m=1, id=14) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() ip6f02.display() ip6f03.display() ip6f04.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sp.sendp(ip6f02, iface=args.sendif[0], verbose=False) sp.sendp(ip6f03, iface=args.sendif[0], verbose=False) sp.sendp(ip6f04, iface=args.sendif[0], verbose=False) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # ## # # Send multiple fragments, with a partial overlap on the first one # not finishing the full packet (and ignoring the content anyway). # The second packet needs to be the first one in the fragment chain # in order to trigger the 2nd case to test. # # A: Reassembly failure. # R: dup dropped silently / Timeout (not waiting for). # data = "6" * 8 ip6f01 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=10, m=1, id=0x1401) / \ sp.UDP(dport=3456, sport=6543) / \ data ip6f02 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=9, m=0, id=0x1401) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() ip6f02.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sp.sendp(ip6f02, iface=args.sendif[0], verbose=False) # Wait for expiry. sleep(75) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_15.py b/tests/sys/netinet6/frag6/frag6_15.py index f9922aaa9717..fff5c1efbe17 100644 --- a/tests/sys/netinet6/frag6/frag6_15.py +++ b/tests/sys/netinet6/frag6/frag6_15.py @@ -1,105 +1,107 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from time import sleep def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # Sysctl set to accept maximum 3 segments on a fragmented packet. # The 4th packet will flush the entire q6. # # A: 4 Discarded. # R: Silence (statistics only) no ICMPv6 as we skip the off=0 segment. # data = "66666666" for i in range(4): foffset=16 + (i * (0x100 + (int)(16 / 8))) ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=foffset, m=1, id=15) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # ## # # Sysctl set to accept maximum 3 segments on a fragmented packet. # The 4th packet will flush the entire q6. # This time we play proper offset/length games on the packets in order # to trigger the 2nd test case, with the last packet still having m=1. # # A: 4 Discarded. # R: ICMPv6 timeout expired. # data = "66666666" for i in range(4): foffset=(i * (int)(16 / 8)) ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=foffset, m=1, id=0x1501) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_16.py b/tests/sys/netinet6/frag6/frag6_16.py index 42ac8e2b4675..bf5b78cb6d6b 100644 --- a/tests/sys/netinet6/frag6/frag6_16.py +++ b/tests/sys/netinet6/frag6/frag6_16.py @@ -1,132 +1,134 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6DestUnreach) if not icmp6: return False # ICMP6_DST_UNREACH_NOPORT 4 if icmp6.code != 4: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Dest Unreach / Port Unreach so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ######################################################################## # # A single fragmented packet with upper layer data in multiple segments # to pass fragmentation. # We need to do a bit of a dance to get the UDP checksum right. # # A: 1 reassembled packet # R: Statistics and ICMPv6 error (non-fragmentation) as no port open. # data = "6" * 1280 dataall = data * 30 ip6f01 = \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.UDP(dport=3456, sport=6543) / \ dataall ip6fd = sp.IPv6(sp.raw(ip6f01)) ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=16) / \ sp.UDP(dport=3456, sport=6543, len=ip6fd.len, chksum=ip6fd.chksum) / \ data if args.debug : ip6f01.display() sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) foffset=(int)(1288/8) mbit=1 for i in range(1,30): if i == 29: mbit=0 ip6f0n = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=foffset, m=mbit, id=16, nh=socket.IPPROTO_UDP) / \ data if args.debug : ip6f0n.display() sp.sendp(ip6f0n, iface=args.sendif[0], verbose=False) foffset += (int)(1280/8) sleep(0.10) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_17.py b/tests/sys/netinet6/frag6/frag6_17.py index a4d6deea0a24..b4a1a1898c2e 100644 --- a/tests/sys/netinet6/frag6/frag6_17.py +++ b/tests/sys/netinet6/frag6/frag6_17.py @@ -1,90 +1,92 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse import random as random +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # Send a sample of pseudo-random fragments into the system in order # to test vnet teardown. # # A: Cleaned up and freed # R: No panic (ignoring everything else) # random.seed() packets = []; for i in range(0,127): fid=random.randint(0,0xffff) foffset=random.randint(0,0xffff) fm=random.randint(0,1) fsrc=sp.RandIP6() ip6f01 = sp.Ether() / \ sp.IPv6(src=fsrc, dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=foffset, m=fm, id=fid) / \ sp.UDP(dport=3456, sport=6543) if args.debug: ip6f01.display() packets.append(ip6f01) for p in packets: sp.sendp(p, iface=args.sendif[0], verbose=False) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_18.py b/tests/sys/netinet6/frag6/frag6_18.py index a4d6deea0a24..b4a1a1898c2e 100644 --- a/tests/sys/netinet6/frag6/frag6_18.py +++ b/tests/sys/netinet6/frag6/frag6_18.py @@ -1,90 +1,92 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse import random as random +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # Send a sample of pseudo-random fragments into the system in order # to test vnet teardown. # # A: Cleaned up and freed # R: No panic (ignoring everything else) # random.seed() packets = []; for i in range(0,127): fid=random.randint(0,0xffff) foffset=random.randint(0,0xffff) fm=random.randint(0,1) fsrc=sp.RandIP6() ip6f01 = sp.Ether() / \ sp.IPv6(src=fsrc, dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=foffset, m=fm, id=fid) / \ sp.UDP(dport=3456, sport=6543) if args.debug: ip6f01.display() packets.append(ip6f01) for p in packets: sp.sendp(p, iface=args.sendif[0], verbose=False) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_19.py b/tests/sys/netinet6/frag6/frag6_19.py index 9248f5f40c43..f8613f757573 100644 --- a/tests/sys/netinet6/frag6/frag6_19.py +++ b/tests/sys/netinet6/frag6/frag6_19.py @@ -1,83 +1,85 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # Send a sample of sequeneced ID fragments into the system in order # to test bucket distribution. # # A: No overflow at V_ip6_maxfragsperpacket == 64. # R: Stats only, timeout and no ICMPv6 (all ignored). # packets = []; data = "66666666" for i in range(0,127): ip6f01 = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=1, m=1, id=i) / \ data if args.debug: ip6f01.display() packets.append(ip6f01) for p in packets: sp.sendp(p, iface=args.sendif[0], verbose=False) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/frag6_20.py b/tests/sys/netinet6/frag6/frag6_20.py index 4e935a1f640a..6dd4c2379357 100755 --- a/tests/sys/netinet6/frag6/frag6_20.py +++ b/tests/sys/netinet6/frag6/frag6_20.py @@ -1,137 +1,139 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer from time import sleep def check_icmp6_error(args, packet): ip6 = packet.getlayer(sp.IPv6) if not ip6: return False oip6 = sp.IPv6(src=args.src[0], dst=args.to[0]) if ip6.dst != oip6.src: return False icmp6 = packet.getlayer(sp.ICMPv6TimeExceeded) if not icmp6: return False # ICMP6_TIME_EXCEED_REASSEMBLY 1 if icmp6.code != 1: return False # Should we check the payload as well? # We are running in a very isolated environment and nothing else # should trigger an ICMPv6 Time Exceeded / Frag reassembly so leave it. #icmp6.display() return True def main(): parser = argparse.ArgumentParser("frag6.py", description="IPv6 fragementation test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() # Start sniffing on recvif sniffer = Sniffer(args, check_icmp6_error) ######################################################################## # # Send a proper first fragment (off=0) and a second fragment which # just fits the 64k. The re-send the first fragment with an extra # unfragmentable part making the 64k to exceed the limit. # This is to make sure we don't allow to update meta-data for a # 1st fragmented packet should a second arrive but given the # fragmentable part is an exact duplicate only that fragment # will be silently discarded. # # A: Reassembly failure, timeout after # R: ICMPv6 time exceeded / statistics for the duplicate # data = "6" * 8 ip6f00 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=20) / \ sp.UDP(dport=3456, sport=6543) / \ data data = "6" * 15 ip6f01 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrFragment(offset=0x1ffc, m=0, id=20) / \ sp.UDP(dport=3456, sport=6543) / \ data data = "6" * 8 ip6f02 = \ sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.IPv6ExtHdrDestOpt(options = \ sp.PadN(optdata="\x00\x00\x00\x00\x00\x00")) / \ sp.IPv6ExtHdrFragment(offset=0, m=1, id=20) / \ sp.UDP(dport=3456, sport=6543) / \ data if args.debug : ip6f00.display() ip6f01.display() ip6f02.display() sp.sendp(ip6f00, iface=args.sendif[0], verbose=False) sp.sendp(ip6f01, iface=args.sendif[0], verbose=False) sp.sendp(ip6f02, iface=args.sendif[0], verbose=False) sleep(75) sniffer.setEnd() sniffer.join() if not sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/frag6/sniffer.py b/tests/sys/netinet6/frag6/sniffer.py index 2c5f460488db..7e1d283dbf91 100644 --- a/tests/sys/netinet6/frag6/sniffer.py +++ b/tests/sys/netinet6/frag6/sniffer.py @@ -1,40 +1,42 @@ # $FreeBSD$ import threading +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp class Sniffer(threading.Thread): def __init__(self, args, check_function): threading.Thread.__init__(self) self._args = args self._recvif = args.recvif[0] self._check_function = check_function self.foundCorrectPacket = False self._endme = False self.start() def _checkPacket(self, packet): ret = self._check_function(self._args, packet) if ret: self.foundCorrectPacket = True return ret def setEnd(self): self._endme = True def stopFilter(self, pkt): if pkt is not None: self._checkPacket(pkt) if self.foundCorrectPacket or self._endme: return True else: return False def run(self): while True: self.packets = sp.sniff(iface=self._recvif, store=False, stop_filter=self.stopFilter, timeout=90) if self.stopFilter(None): break diff --git a/tests/sys/netinet6/mld.py b/tests/sys/netinet6/mld.py index b1fcf2f8e50a..1f43cee5d6ad 100644 --- a/tests/sys/netinet6/mld.py +++ b/tests/sys/netinet6/mld.py @@ -1,76 +1,78 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys import binascii def main(): parser = argparse.ArgumentParser("scapyi386.py", description="IPv6 Ethernet Dest MAC test") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') parser.add_argument('--mldraw01', required=False, action='store_true', help='Multicast Listener Query Raw01') args = parser.parse_args() pkt = None if args.mldraw01: pkt = sp.Ether() / \ sp.IPv6(dst="ff02::1", hlim=1, nh=0) / \ sp.IPv6ExtHdrHopByHop(options = sp.RouterAlert(value=0)) / \ sp.ICMPv6MLQuery() if pkt is None: sys.exit(1) if args.debug: pkt.display() sp.sendp(pkt, iface=args.sendif[0], verbose=False) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/redirect.py b/tests/sys/netinet6/redirect.py index 7066f8066518..4f785c615f62 100644 --- a/tests/sys/netinet6/redirect.py +++ b/tests/sys/netinet6/redirect.py @@ -1,84 +1,86 @@ #!/usr/bin/env python # - # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2020 Alexander V. Chernikov # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sc import socket import sys import fcntl import struct def parse_args(): parser = argparse.ArgumentParser(description='ICMPv6 redirect generator') parser.add_argument('--smac', type=str, required=True, help='eth source mac') parser.add_argument('--dmac', type=str, required=True, help='eth dest mac') parser.add_argument('--sip', type=str, required=True, help='remote router ll source ip') parser.add_argument('--dip', type=str, required=True, help='local router ip') parser.add_argument('--iface', type=str, required=True, help='ifname to send packet to') parser.add_argument('--route', type=str, required=True, help='destination IP to redirect') parser.add_argument('--gw', type=str, required=True, help='redirect GW') return parser.parse_args() def construct_icmp6_redirect(smac, dmac, sip, dip, route_dst, route_gw): e = sc.Ether(src=smac, dst=dmac) l3 = sc.IPv6(src=sip, dst=dip) icmp6 = sc.ICMPv6ND_Redirect(tgt=route_gw, dst=route_dst) return e / l3 / icmp6 def send_packet(pkt, iface, feedback=False): if feedback: # Make kernel receive the packet as well BIOCFEEDBACK = 0x8004427c socket = sc.conf.L2socket(iface=args.iface) fcntl.ioctl(socket.ins, BIOCFEEDBACK, struct.pack('I', 1)) sc.sendp(pkt, socket=socket, verbose=True) else: sc.sendp(pkt, iface=iface, verbose=False) def main(): args = parse_args() pkt = construct_icmp6_redirect(args.smac, args.dmac, args.sip, args.dip, args.route, args.gw) send_packet(pkt, args.iface) if __name__ == '__main__': main() diff --git a/tests/sys/netinet6/scapyi386.py b/tests/sys/netinet6/scapyi386.py index 745c01b45881..ac0c877380c4 100644 --- a/tests/sys/netinet6/scapyi386.py +++ b/tests/sys/netinet6/scapyi386.py @@ -1,85 +1,87 @@ #!/usr/bin/env python #- # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2019 Netflix, Inc. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # # $FreeBSD$ # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys def main(): parser = argparse.ArgumentParser("scapyi386.py", description="IPv6 Ethernet Dest MAC test") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') parser.add_argument('--debug', required=False, action='store_true', help='Enable test debugging') args = parser.parse_args() ######################################################################## # # A test case to check that IPv6 packets are sent with a proper # (resolved) Ethernet Destination MAC address instead of the BCAST one. # This was needed as test cases did not work properly on i386 due to a # scapy BPF parsing bug. (See PR 239380 and duplicates). # bcmac = sp.Ether(dst="ff:ff:ff:ff:ff:ff").dst data = "6" * 88 pkt = sp.Ether() / \ sp.IPv6(src=args.src[0], dst=args.to[0]) / \ sp.UDP(dport=3456, sport=6543) / \ data sp.sendp(pkt, iface=args.sendif[0], verbose=False) eth = pkt.getlayer(sp.Ether) if eth is None: print("No Ether in packet") pkt.display() sys.exit(1) if eth.dst == bcmac: print("Broadcast dMAC on packet") eth.display() sys.exit(1) sys.exit(0) if __name__ == '__main__': main() diff --git a/tests/sys/netpfil/common/pft_ping.py b/tests/sys/netpfil/common/pft_ping.py index de673f026c77..9cc7c5d5c5c0 100644 --- a/tests/sys/netpfil/common/pft_ping.py +++ b/tests/sys/netpfil/common/pft_ping.py @@ -1,332 +1,334 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2017 Kristof Provost # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. # import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import socket import sys from sniffer import Sniffer PAYLOAD_MAGIC = bytes.fromhex('42c0ffee') dup_found = 0 def check_dup(args, packet): """ Verify that this is an ICMP packet, and that we only see one """ global dup_found icmp = packet.getlayer(sp.ICMP) if not icmp: return False raw = packet.getlayer(sp.Raw) if not raw: return False if raw.load != PAYLOAD_MAGIC: return False dup_found = dup_found + 1 return False def check_ping_request(args, packet): if args.ip6: return check_ping6_request(args, packet) else: return check_ping4_request(args, packet) def check_ping4_request(args, packet): """ Verify that the packet matches what we'd have sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IP) if not ip: return False if ip.dst != dst_ip: return False icmp = packet.getlayer(sp.ICMP) if not icmp: return False if sp.icmptypes[icmp.type] != 'echo-request': return False raw = packet.getlayer(sp.Raw) if not raw: return False if raw.load != PAYLOAD_MAGIC: return False # Wait to check expectations until we've established this is the packet we # sent. if args.expect_tos: if ip.tos != int(args.expect_tos[0]): print("Unexpected ToS value %d, expected %d" \ % (ip.tos, int(args.expect_tos[0]))) return False return True def check_ping6_request(args, packet): """ Verify that the packet matches what we'd have sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IPv6) if not ip: return False if ip.dst != dst_ip: return False icmp = packet.getlayer(sp.ICMPv6EchoRequest) if not icmp: return False if icmp.data != PAYLOAD_MAGIC: return False return True def check_ping_reply(args, packet): if args.ip6: return check_ping6_reply(args, packet) else: return check_ping4_reply(args, packet) def check_ping4_reply(args, packet): """ Check that this is a reply to the ping request we sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IP) if not ip: return False if ip.src != dst_ip: return False icmp = packet.getlayer(sp.ICMP) if not icmp: return False if sp.icmptypes[icmp.type] != 'echo-reply': return False raw = packet.getlayer(sp.Raw) if not raw: return False if raw.load != PAYLOAD_MAGIC: return False return True def check_ping6_reply(args, packet): """ Check that this is a reply to the ping request we sent """ dst_ip = args.to[0] ip = packet.getlayer(sp.IPv6) if not ip: return False if ip.src != dst_ip: return False icmp = packet.getlayer(sp.ICMPv6EchoReply) if not icmp: print("No echo reply!") return False if icmp.data != PAYLOAD_MAGIC: print("data mismatch") return False return True def ping(send_if, dst_ip, args): ether = sp.Ether() ip = sp.IP(dst=dst_ip) icmp = sp.ICMP(type='echo-request') raw = sp.raw(PAYLOAD_MAGIC) if args.send_tos: ip.tos = int(args.send_tos[0]) if args.fromaddr: ip.src = args.fromaddr[0] req = ether / ip / icmp / raw sp.sendp(req, iface=send_if, verbose=False) def ping6(send_if, dst_ip, args): ether = sp.Ether() ip6 = sp.IPv6(dst=dst_ip) icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC)) if args.fromaddr: ip.src = args.fromaddr[0] req = ether / ip6 / icmp sp.sendp(req, iface=send_if, verbose=False) def check_tcpsyn(args, packet): dst_ip = args.to[0] ip = packet.getlayer(sp.IP) if not ip: return False if ip.dst != dst_ip: return False tcp = packet.getlayer(sp.TCP) if not tcp: return False # Verify IP checksum chksum = ip.chksum ip.chksum = None new_chksum = sp.IP(sp.raw(ip)).chksum if chksum != new_chksum: print("Expected IP checksum %x but found %x\n" % (new_cshkum, chksum)) return False # Verify TCP checksum chksum = tcp.chksum packet_raw = sp.raw(packet) tcp.chksum = None newpacket = sp.Ether(sp.raw(packet[sp.Ether])) new_chksum = newpacket[sp.TCP].chksum if chksum != new_chksum: print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum)) return False return True def tcpsyn(send_if, dst_ip, args): opts=[('Timestamp', (1, 1)), ('MSS', 1280)] if args.tcpopt_unaligned: opts = [('NOP', 0 )] + opts ether = sp.Ether() ip = sp.IP(dst=dst_ip) tcp = sp.TCP(dport=666, flags='S', options=opts) req = ether / ip / tcp sp.sendp(req, iface=send_if, verbose=False) def main(): parser = argparse.ArgumentParser("pft_ping.py", description="Ping test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet(s) will be sent') parser.add_argument('--recvif', nargs=1, help='The interface on which to expect the ICMP echo request') parser.add_argument('--replyif', nargs=1, help='The interface on which to expect the ICMP echo response') parser.add_argument('--checkdup', nargs=1, help='The interface on which to expect the duplicated ICMP packets') parser.add_argument('--ip6', action='store_true', help='Use IPv6') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address for the ICMP echo request') parser.add_argument('--fromaddr', nargs=1, help='The source IP address for the ICMP echo request') # TCP options parser.add_argument('--tcpsyn', action='store_true', help='Send a TCP SYN packet') parser.add_argument('--tcpopt_unaligned', action='store_true', help='Include unaligned TCP options') # Packet settings parser.add_argument('--send-tos', nargs=1, help='Set the ToS value for the transmitted packet') # Expectations parser.add_argument('--expect-tos', nargs=1, help='The expected ToS value in the received packet') args = parser.parse_args() # We may not have a default route. Tell scapy where to start looking for routes sp.conf.iface6 = args.sendif[0] sniffer = None if not args.recvif is None: checkfn=check_ping_request if args.tcpsyn: checkfn=check_tcpsyn sniffer = Sniffer(args, checkfn) replysniffer = None if not args.replyif is None: checkfn=check_ping_reply replysniffer = Sniffer(args, checkfn, recvif=args.replyif[0]) dupsniffer = None if args.checkdup is not None: dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0]) if args.tcpsyn: tcpsyn(args.sendif[0], args.to[0], args) else: if args.ip6: ping6(args.sendif[0], args.to[0], args) else: ping(args.sendif[0], args.to[0], args) if dupsniffer: dupsniffer.join() if dup_found != 1: sys.exit(1) if sniffer: sniffer.join() if sniffer.foundCorrectPacket: sys.exit(0) else: sys.exit(1) if replysniffer: replysniffer.join() if replysniffer.foundCorrectPacket: sys.exit(0) else: sys.exit(1) if __name__ == '__main__': main() diff --git a/tests/sys/netpfil/pf/CVE-2019-5597.py b/tests/sys/netpfil/pf/CVE-2019-5597.py index 1050af506f8d..1b25809cbedf 100644 --- a/tests/sys/netpfil/pf/CVE-2019-5597.py +++ b/tests/sys/netpfil/pf/CVE-2019-5597.py @@ -1,61 +1,63 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause-FreeBSD # # Copyright (c) 2019 Synacktiv # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. import random +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import sys UDP_PROTO = 17 AH_PROTO = 51 FRAG_PROTO = 44 def main(): intf = sys.argv[1] ipv6_src = sys.argv[2] ipv6_dst = sys.argv[3] ipv6_main = sp.IPv6(dst=ipv6_dst, src=ipv6_src) padding = 8 fid = random.randint(0,100000) frag_0 = sp.IPv6ExtHdrFragment(id=fid, nh=UDP_PROTO, m=1, offset=0) foff_1 = (int)(padding/8) frag_1 = sp.IPv6ExtHdrFragment(id=fid, nh=UDP_PROTO, m=0, offset=foff_1) pkt1_opts = sp.AH(nh=AH_PROTO, payloadlen=200) \ / sp.Raw('XXXX' * 199) \ / sp.AH(nh=FRAG_PROTO, payloadlen=1) \ / frag_1 pkt0 = sp.Ether() / ipv6_main / frag_0 / sp.Raw('A' * padding) pkt1 = sp.Ether() / ipv6_main / pkt1_opts / sp.Raw('B' * padding) sp.sendp(pkt0, iface=intf, verbose=False) sp.sendp(pkt1, iface=intf, verbose=False) if __name__ == '__main__': main() diff --git a/tests/sys/netpfil/pf/CVE-2019-5598.py b/tests/sys/netpfil/pf/CVE-2019-5598.py index ac1e4f3438f5..603a1aef376f 100644 --- a/tests/sys/netpfil/pf/CVE-2019-5598.py +++ b/tests/sys/netpfil/pf/CVE-2019-5598.py @@ -1,90 +1,92 @@ #!/usr/bin/env python3 # # SPDX-License-Identifier: BSD-2-Clause-FreeBSD # # Copyright (c) 2019 Kristof Provost # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. import argparse +import logging +logging.getLogger("scapy").setLevel(logging.CRITICAL) import scapy.all as sp import sys from sniffer import Sniffer def check_icmp_error(args, packet): ip = packet.getlayer(sp.IP) if not ip: return False if ip.dst != args.to[0]: return False icmp = packet.getlayer(sp.ICMP) if not icmp: return False if icmp.type != 3 or icmp.code != 3: return False return True def main(): parser = argparse.ArgumentParser("CVE-2019-icmp.py", description="CVE-2019-icmp test tool") parser.add_argument('--sendif', nargs=1, required=True, help='The interface through which the packet will be sent') parser.add_argument('--recvif', nargs=1, required=True, help='The interface on which to check for the packet') parser.add_argument('--src', nargs=1, required=True, help='The source IP address') parser.add_argument('--to', nargs=1, required=True, help='The destination IP address') args = parser.parse_args() # Send the allowed packet to establish state udp = sp.Ether() / \ sp.IP(src=args.src[0], dst=args.to[0]) / \ sp.UDP(dport=53, sport=1234) sp.sendp(udp, iface=args.sendif[0], verbose=False) # Start sniffing on recvif sniffer = Sniffer(args, check_icmp_error) # Send the bad error packet icmp_reachable = sp.Ether() / \ sp.IP(src=args.src[0], dst=args.to[0]) / \ sp.ICMP(type=3, code=3) / \ sp.IP(src="4.3.2.1", dst="1.2.3.4") / \ sp.UDP(dport=53, sport=1234) sp.sendp(icmp_reachable, iface=args.sendif[0], verbose=False) sniffer.join() if sniffer.foundCorrectPacket: sys.exit(1) sys.exit(0) if __name__ == '__main__': main()