diff --git a/tests/atf_python/sys/net/vnet.py b/tests/atf_python/sys/net/vnet.py index 68c8ce4e0cba..7afb5c721bf3 100644 --- a/tests/atf_python/sys/net/vnet.py +++ b/tests/atf_python/sys/net/vnet.py @@ -1,550 +1,554 @@ #!/usr/local/bin/python3 import copy import ipaddress import os import re import socket import sys import time from multiprocessing import connection from multiprocessing import Pipe from multiprocessing import Process from typing import Dict from typing import List from typing import NamedTuple from atf_python.sys.net.tools import ToolsHelper from atf_python.utils import BaseTest from atf_python.utils import libc def run_cmd(cmd: str, verbose=True) -> str: if verbose: print("run: '{}'".format(cmd)) return os.popen(cmd).read() def get_topology_id(test_id: str) -> str: """ Gets a unique topology id based on the pytest test_id. "test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif]" -> "TestIP6Output:test_output6_pktinfo[ipandif]" """ return ":".join(test_id.split("::")[-2:]) def convert_test_name(test_name: str) -> str: """Convert test name to a string that can be used in the file/jail names""" ret = "" for char in test_name: if char.isalnum() or char in ("_", "-", ":"): ret += char elif char in ("["): ret += "_" return ret class VnetInterface(object): # defines from net/if_types.h IFT_LOOP = 0x18 IFT_ETHER = 0x06 def __init__(self, iface_alias: str, iface_name: str): self.name = iface_name self.alias = iface_alias self.vnet_name = "" self.jailed = False self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}} self.prefixes4: List[List[str]] = [] self.prefixes6: List[List[str]] = [] if iface_name.startswith("lo"): self.iftype = self.IFT_LOOP else: self.iftype = self.IFT_ETHER + self.ether = ToolsHelper.get_output("/sbin/ifconfig %s ether | awk '/ether/ { print $2; }'" % iface_name).rstrip() @property def ifindex(self): return socket.if_nametoindex(self.name) @property def first_ipv6(self): d = self.addr_map["inet6"] return d[next(iter(d))] @property def first_ipv4(self): d = self.addr_map["inet"] return d[next(iter(d))] def set_vnet(self, vnet_name: str): self.vnet_name = vnet_name def set_jailed(self, jailed: bool): self.jailed = jailed def run_cmd(self, cmd, verbose=False): if self.vnet_name and not self.jailed: cmd = "/usr/sbin/jexec {} {}".format(self.vnet_name, cmd) return run_cmd(cmd, verbose) @classmethod def setup_loopback(cls, vnet_name: str): lo = VnetInterface("", "lo0") lo.set_vnet(vnet_name) lo.setup_addr("127.0.0.1/8") lo.turn_up() @classmethod def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]: name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip() if not name: raise Exception("Unable to create iface {}".format(iface_name)) - ret = [cls(alias_name, name)] + if1 = cls(alias_name, name) + ret = [if1] if name.startswith("epair"): - ret.append(cls(alias_name, name[:-1] + "b")) + if2 = cls(alias_name, name[:-1] + "b") + if1.epairb = if2 + ret.append(if2); return ret def setup_addr(self, _addr: str): addr = ipaddress.ip_interface(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) else: family = "inet" if self.addr_map[family]: cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr) else: cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) self.run_cmd(cmd) self.addr_map[family][str(addr.ip)] = addr def delete_addr(self, _addr: str): addr = ipaddress.ip_address(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr) else: family = "inet" cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr) self.run_cmd(cmd) del self.addr_map[family][str(addr)] def turn_up(self): cmd = "/sbin/ifconfig {} up".format(self.name) self.run_cmd(cmd) def enable_ipv6(self): cmd = "/usr/sbin/ndp -i {} -- -disabled".format(self.name) self.run_cmd(cmd) def has_tentative(self) -> bool: """True if an interface has some addresses in tenative state""" cmd = "/sbin/ifconfig {} inet6".format(self.name) out = self.run_cmd(cmd, verbose=False) for line in out.splitlines(): if "tentative" in line: return True return False class IfaceFactory(object): INTERFACES_FNAME = "created_ifaces.lst" AUTODELETE_TYPES = ("epair", "gif", "gre", "lo", "tap", "tun") def __init__(self): self.file_name = self.INTERFACES_FNAME def _register_iface(self, iface_name: str): with open(self.file_name, "a") as f: f.write(iface_name + "\n") def _list_ifaces(self) -> List[str]: ret: List[str] = [] try: with open(self.file_name, "r") as f: for line in f: ret.append(line.strip()) except OSError: pass return ret def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]: ifaces = VnetInterface.create_iface(alias_name, iface_name) for iface in ifaces: if not self.is_autodeleted(iface.name): self._register_iface(iface.name) return ifaces @staticmethod def is_autodeleted(iface_name: str) -> bool: if iface_name == "lo0": return False iface_type = re.split(r"\d+", iface_name)[0] return iface_type in IfaceFactory.AUTODELETE_TYPES def cleanup_vnet_interfaces(self, vnet_name: str) -> List[str]: """Destroys""" ifaces_lst = ToolsHelper.get_output( "/usr/sbin/jexec {} /sbin/ifconfig -l".format(vnet_name) ) for iface_name in ifaces_lst.split(): if not self.is_autodeleted(iface_name): if iface_name not in self._list_ifaces(): print("Skipping interface {}:{}".format(vnet_name, iface_name)) continue run_cmd( "/usr/sbin/jexec {} /sbin/ifconfig {} destroy".format(vnet_name, iface_name) ) def cleanup(self): try: os.unlink(self.INTERFACES_FNAME) except OSError: pass class VnetInstance(object): def __init__( self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface] ): self.name = vnet_name self.alias = vnet_alias # reference in the test topology self.jid = jid self.ifaces = ifaces self.iface_alias_map = {} # iface.alias: iface self.iface_map = {} # iface.name: iface for iface in ifaces: iface.set_vnet(vnet_name) iface.set_jailed(True) self.iface_alias_map[iface.alias] = iface self.iface_map[iface.name] = iface # Allow reference to interfce aliases as attributes setattr(self, iface.alias, iface) self.need_dad = False # Disable duplicate address detection by default self.attached = False self.pipe = None self.subprocess = None def run_vnet_cmd(self, cmd, verbose=True): if not self.attached: cmd = "/usr/sbin/jexec {} {}".format(self.name, cmd) return run_cmd(cmd, verbose) def disable_dad(self): self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0") def set_pipe(self, pipe): self.pipe = pipe def set_subprocess(self, p): self.subprocess = p @staticmethod def attach_jid(jid: int): error_code = libc.jail_attach(jid) if error_code != 0: raise Exception("jail_attach() failed: errno {}".format(error_code)) def attach(self): self.attach_jid(self.jid) self.attached = True class VnetFactory(object): JAILS_FNAME = "created_jails.lst" def __init__(self, topology_id: str): self.topology_id = topology_id self.file_name = self.JAILS_FNAME self._vnets: List[str] = [] def _register_vnet(self, vnet_name: str): self._vnets.append(vnet_name) with open(self.file_name, "a") as f: f.write(vnet_name + "\n") @staticmethod def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]: cmd = "/usr/sbin/jexec {} /sbin/ifconfig -l".format(vnet_name) not_matched: List[str] = [] for i in range(50): vnet_ifaces = run_cmd(cmd).strip().split(" ") not_matched = [] for iface_name in ifaces: if iface_name not in vnet_ifaces: not_matched.append(iface_name) if len(not_matched) == 0: return [] time.sleep(0.1) return not_matched def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]): vnet_name = "pytest:{}".format(convert_test_name(self.topology_id)) if self._vnets: # add number to distinguish jails vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1) iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces]) cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format( vnet_name, iface_cmds ) jid = 0 try: jid_str = run_cmd(cmd) jid = int(jid_str) except ValueError: print("Jail creation failed, output: {}".format(jid_str)) raise self._register_vnet(vnet_name) # Run expedited version of routing VnetInterface.setup_loopback(vnet_name) not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces]) if not_found: raise Exception( "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name) ) return VnetInstance(vnet_alias, vnet_name, jid, ifaces) def cleanup(self): iface_factory = IfaceFactory() try: with open(self.file_name) as f: for line in f: vnet_name = line.strip() iface_factory.cleanup_vnet_interfaces(vnet_name) run_cmd("/usr/sbin/jail -r {}".format(vnet_name)) os.unlink(self.JAILS_FNAME) except OSError: pass class SingleInterfaceMap(NamedTuple): ifaces: List[VnetInterface] vnet_aliases: List[str] class ObjectsMap(NamedTuple): iface_map: Dict[str, SingleInterfaceMap] # keyed by ifX vnet_map: Dict[str, VnetInstance] # keyed by vnetX topo_map: Dict # self.TOPOLOGY class VnetTestTemplate(BaseTest): NEED_ROOT: bool = True TOPOLOGY = {} def _require_default_modules(self): libc.kldload("if_epair.ko") self.require_module("if_epair") def _get_vnet_handler(self, vnet_alias: str): handler_name = "{}_handler".format(vnet_alias) return getattr(self, handler_name, None) def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe): """Base Handler to setup given VNET. Can be run in a subprocess. If so, passes control to the special vnetX_handler() after setting up interface addresses """ vnet.attach() print("# setup_vnet({})".format(vnet.name)) if pipe is not None: vnet.set_pipe(pipe) topo = obj_map.topo_map ipv6_ifaces = [] # Disable DAD if not vnet.need_dad: vnet.disable_dad() for iface in vnet.ifaces: # check index of vnet within an interface # as we have prefixes for both ends of the interface iface_map = obj_map.iface_map[iface.alias] idx = iface_map.vnet_aliases.index(vnet.alias) prefixes6 = topo[iface.alias].get("prefixes6", []) prefixes4 = topo[iface.alias].get("prefixes4", []) if prefixes6 or prefixes4: ipv6_ifaces.append(iface) iface.turn_up() if prefixes6: iface.enable_ipv6() for prefix in prefixes6 + prefixes4: if prefix[idx]: iface.setup_addr(prefix[idx]) for iface in ipv6_ifaces: while iface.has_tentative(): time.sleep(0.1) # Run actual handler handler = self._get_vnet_handler(vnet.alias) if handler: # Do unbuffered stdout for children # so the logs are present if the child hangs sys.stdout.reconfigure(line_buffering=True) self.drop_privileges() handler(vnet) def _get_topo_ifmap(self, topo: Dict): iface_factory = IfaceFactory() iface_map: Dict[str, SingleInterfaceMap] = {} iface_aliases = set() for obj_name, obj_data in topo.items(): if obj_name.startswith("vnet"): for iface_alias in obj_data["ifaces"]: iface_aliases.add(iface_alias) for iface_alias in iface_aliases: print("Creating {}".format(iface_alias)) iface_data = topo[iface_alias] iface_type = iface_data.get("type", "epair") ifaces = iface_factory.create_iface(iface_alias, iface_type) smap = SingleInterfaceMap(ifaces, []) iface_map[iface_alias] = smap return iface_map def setup_topology(self, topo: Dict, topology_id: str): """Creates jails & interfaces for the provided topology""" vnet_map = {} vnet_factory = VnetFactory(topology_id) iface_map = self._get_topo_ifmap(topo) for obj_name, obj_data in topo.items(): if obj_name.startswith("vnet"): vnet_ifaces = [] for iface_alias in obj_data["ifaces"]: # epair creates 2 interfaces, grab first _available_ # and map it to the VNET being created idx = len(iface_map[iface_alias].vnet_aliases) iface_map[iface_alias].vnet_aliases.append(obj_name) vnet_ifaces.append(iface_map[iface_alias].ifaces[idx]) vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces) vnet_map[obj_name] = vnet # Allow reference to VNETs as attributes setattr(self, obj_name, vnet) # Debug output print("============= TEST TOPOLOGY =============") for vnet_alias, vnet in vnet_map.items(): print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="") handler = self._get_vnet_handler(vnet.alias) if handler: print(" handler: {}".format(handler.__name__), end="") print() for iface_alias, iface_data in iface_map.items(): vnets = iface_data.vnet_aliases ifaces: List[VnetInterface] = iface_data.ifaces if len(vnets) == 1 and len(ifaces) == 2: print( "# iface {}: {}::{} -> main::{}".format( iface_alias, vnets[0], ifaces[0].name, ifaces[1].name ) ) elif len(vnets) == 2 and len(ifaces) == 2: print( "# iface {}: {}::{} -> {}::{}".format( iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name ) ) else: print( "# iface {}: ifaces: {} vnets: {}".format( iface_alias, vnets, [i.name for i in ifaces] ) ) print() return ObjectsMap(iface_map, vnet_map, topo) def setup_method(self, _method): """Sets up all the required topology and handlers for the given test""" super().setup_method(_method) self._require_default_modules() # TestIP6Output.test_output6_pktinfo[ipandif] topology_id = get_topology_id(self.test_id) topology = self.TOPOLOGY # First, setup kernel objects - interfaces & vnets obj_map = self.setup_topology(topology, topology_id) main_vnet = None # one without subprocess handler for vnet_alias, vnet in obj_map.vnet_map.items(): if self._get_vnet_handler(vnet_alias): # Need subprocess to run parent_pipe, child_pipe = Pipe() p = Process( target=self._setup_vnet, args=( vnet, obj_map, child_pipe, ), ) vnet.set_pipe(parent_pipe) vnet.set_subprocess(p) p.start() else: if main_vnet is not None: raise Exception("there can be only 1 VNET w/o handler") main_vnet = vnet # Main vnet needs to be the last, so all the other subprocesses # are started & their pipe handles collected self.vnet = main_vnet self._setup_vnet(main_vnet, obj_map, None) # Save state for the main handler self.iface_map = obj_map.iface_map self.vnet_map = obj_map.vnet_map self.drop_privileges() def cleanup(self, test_id: str): # pytest test id: file::class::test_name topology_id = get_topology_id(self.test_id) print("============= vnet cleanup =============") print("# topology_id: '{}'".format(topology_id)) VnetFactory(topology_id).cleanup() IfaceFactory().cleanup() def wait_object(self, pipe, timeout=5): if pipe.poll(timeout): return pipe.recv() raise TimeoutError def wait_objects_any(self, pipe_list, timeout=5): objects = connection.wait(pipe_list, timeout) if objects: return objects[0].recv() raise TimeoutError def send_object(self, pipe, obj): pipe.send(obj) def wait(self): while True: time.sleep(1) @property def curvnet(self): pass class SingleVnetTestTemplate(VnetTestTemplate): IPV6_PREFIXES: List[str] = [] IPV4_PREFIXES: List[str] = [] IFTYPE = "epair" def _setup_default_topology(self): topology = copy.deepcopy( { "vnet1": {"ifaces": ["if1"]}, "if1": {"type": self.IFTYPE, "prefixes4": [], "prefixes6": []}, } ) for prefix in self.IPV6_PREFIXES: topology["if1"]["prefixes6"].append((prefix,)) for prefix in self.IPV4_PREFIXES: topology["if1"]["prefixes4"].append((prefix,)) return topology def setup_method(self, method): if not getattr(self, "TOPOLOGY", None): self.TOPOLOGY = self._setup_default_topology() else: names = self.TOPOLOGY.keys() assert len([n for n in names if n.startswith("vnet")]) == 1 super().setup_method(method) diff --git a/tests/sys/netpfil/pf/header.py b/tests/sys/netpfil/pf/header.py index 6832cfe6d42b..a5e36bc85d14 100644 --- a/tests/sys/netpfil/pf/header.py +++ b/tests/sys/netpfil/pf/header.py @@ -1,217 +1,216 @@ # # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2025 Rubicon Communications, LLC (Netgate) # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. import pytest import re from utils import DelayedSend from atf_python.sys.net.tools import ToolsHelper from atf_python.sys.net.vnet import VnetTestTemplate class TestHeader(VnetTestTemplate): REQUIRED_MODULES = [ "pf" ] TOPOLOGY = { "vnet1": {"ifaces": ["if1", "if2"]}, "vnet2": {"ifaces": ["if1", "if2"]}, "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]}, "if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]}, } def vnet2_handler(self, vnet): ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1") ToolsHelper.print_output("/usr/sbin/arp -s 198.51.100.3 00:01:02:03:04:05") ToolsHelper.print_output("/sbin/pfctl -e") ToolsHelper.print_output("/sbin/pfctl -x loud") ToolsHelper.pf_rules([ "pass", ]) @pytest.mark.require_user("root") @pytest.mark.require_progs(["scapy"]) def test_too_many(self): "Verify that we drop packets with silly numbers of headers." - sendif = self.vnet.iface_alias_map["if1"].name + sendif = self.vnet.iface_alias_map["if1"] recvif = self.vnet.iface_alias_map["if2"].name - gw_mac = ToolsHelper.get_output("/sbin/ifconfig %s ether | awk '/ether/ { print $2; }'" % sendif) - gw_mac = re.sub("0a$", "0b", gw_mac) + gw_mac = sendif.epairb.ether ToolsHelper.print_output("/sbin/route add default 192.0.2.1") # Import in the correct vnet, so at to not confuse Scapy import scapy.all as sp # Sanity check, ensure we get replies to normal ping pkt = sp.Ether(dst=gw_mac) \ / sp.IP(dst="198.51.100.3") \ / sp.ICMP(type='echo-request') - s = DelayedSend(pkt, sendif) + s = DelayedSend(pkt, sendif.name) reply = sp.sniff(iface=recvif, timeout=3) print(reply) found = False for r in reply: r.show() icmp = r.getlayer(sp.ICMP) if not icmp: continue assert icmp.type == 8 # 'echo-request' found = True assert found # Up to 19 AH headers will pass pkt = sp.Ether(dst=gw_mac) \ / sp.IP(dst="198.51.100.3") for i in range(0, 18): pkt = pkt / sp.AH(nh=51, payloadlen=1) pkt = pkt / sp.AH(nh=1, payloadlen=1) / sp.ICMP(type='echo-request') - s = DelayedSend(pkt, sendif) + s = DelayedSend(pkt, sendif.name) reply = sp.sniff(iface=recvif, timeout=3) print(reply) found = False for r in reply: r.show() ah = r.getlayer(sp.AH) if not ah: continue found = True assert found # But more will get dropped pkt = sp.Ether(dst=gw_mac) \ / sp.IP(dst="198.51.100.3") for i in range(0, 19): pkt = pkt / sp.AH(nh=51, payloadlen=1) pkt = pkt / sp.AH(nh=1, payloadlen=1) / sp.ICMP(type='echo-request') - s = DelayedSend(pkt, sendif) + s = DelayedSend(pkt, sendif.name) reply = sp.sniff(iface=recvif, timeout=3) print(reply) found = False for r in reply: r.show() ah = r.getlayer(sp.AH) if not ah: continue found = True assert not found class TestHeader6(VnetTestTemplate): REQUIRED_MODULES = [ "pf" ] SKIP_MODULES = [ "ipfilter" ] TOPOLOGY = { "vnet1": {"ifaces": ["if1", "if2"]}, "vnet2": {"ifaces": ["if1", "if2"]}, "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]}, "if2": {"prefixes6": [("2001:db8:1::2/64", "2001:db8:1::1/64")]}, } def vnet2_handler(self, vnet): ToolsHelper.print_output("/sbin/sysctl net.inet6.ip6.forwarding=1") ToolsHelper.print_output("/usr/sbin/ndp -s 2001:db8:1::3 00:01:02:03:04:05") ToolsHelper.print_output("/sbin/pfctl -e") ToolsHelper.print_output("/sbin/pfctl -x loud") ToolsHelper.pf_rules([ "pass", ]) @pytest.mark.require_user("root") @pytest.mark.require_progs(["scapy"]) def test_too_many(self): "Verify that we drop packets with silly numbers of headers." ToolsHelper.print_output("/sbin/ifconfig") - sendif = self.vnet.iface_alias_map["if1"].name + sendif = self.vnet.iface_alias_map["if1"] recvif = self.vnet.iface_alias_map["if2"].name - our_mac = ToolsHelper.get_output("/sbin/ifconfig %s ether | awk '/ether/ { print $2; }'" % sendif) - gw_mac = re.sub("0a$", "0b", our_mac) + our_mac = sendif.ether + gw_mac = sendif.epairb.ether ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") # Import in the correct vnet, so at to not confuse Scapy import scapy.all as sp # Sanity check, ensure we get replies to normal ping pkt = sp.Ether(src=our_mac, dst=gw_mac) \ / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3") \ / sp.ICMPv6EchoRequest() - s = DelayedSend(pkt, sendif) + s = DelayedSend(pkt, sendif.name) reply = sp.sniff(iface=recvif, timeout=3) print(reply) found = False for r in reply: r.show() icmp = r.getlayer(sp.ICMPv6EchoRequest) if not icmp: continue found = True assert found # Up to 19 AH headers will pass pkt = sp.Ether(src=our_mac, dst=gw_mac) \ / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3") for i in range(0, 18): pkt = pkt / sp.AH(nh=51, payloadlen=1) pkt = pkt / sp.AH(nh=58, payloadlen=1) / sp.ICMPv6EchoRequest() - s = DelayedSend(pkt, sendif) + s = DelayedSend(pkt, sendif.name) reply = sp.sniff(iface=recvif, timeout=3) print(reply) found = False for r in reply: r.show() ah = r.getlayer(sp.AH) if not ah: continue found = True assert found # But more will get dropped pkt = sp.Ether(src=our_mac, dst=gw_mac) \ / sp.IPv6(src="2001:db8::2", dst="2001:db8:1::3") for i in range(0, 19): pkt = pkt / sp.AH(nh=51, payloadlen=1) pkt = pkt / sp.AH(nh=58, payloadlen=1) / sp.ICMPv6EchoRequest() - s = DelayedSend(pkt, sendif) + s = DelayedSend(pkt, sendif.name) reply = sp.sniff(iface=recvif, timeout=3) print(reply) found = False for r in reply: r.show() ah = r.getlayer(sp.AH) if not ah: continue found = True assert not found diff --git a/tests/sys/netpfil/pf/icmp.py b/tests/sys/netpfil/pf/icmp.py index 83096886691e..59f2e8190b30 100644 --- a/tests/sys/netpfil/pf/icmp.py +++ b/tests/sys/netpfil/pf/icmp.py @@ -1,254 +1,254 @@ # # SPDX-License-Identifier: BSD-2-Clause # # Copyright (c) 2024 Rubicon Communications, LLC (Netgate) # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions # are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF # SUCH DAMAGE. import pytest import subprocess import re from atf_python.sys.net.tools import ToolsHelper from atf_python.sys.net.vnet import VnetTestTemplate def check(cmd): ps = subprocess.Popen(cmd, shell=True) ret = ps.wait() if ret != 0: raise Exception("Command %s returned %d" % (cmd, ret)) class TestICMP(VnetTestTemplate): REQUIRED_MODULES = [ "pf" ] TOPOLOGY = { "vnet1": {"ifaces": ["if1"]}, "vnet2": {"ifaces": ["if1", "if2"]}, "vnet3": {"ifaces": ["if2"]}, "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]}, "if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]}, } def vnet2_handler(self, vnet): ifname = vnet.iface_alias_map["if1"].name if2name = vnet.iface_alias_map["if2"].name ToolsHelper.print_output("/sbin/pfctl -e") ToolsHelper.pf_rules([ "set reassemble yes", "set state-policy if-bound", "block", "pass inet proto icmp icmp-type echoreq", ]) ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1") ToolsHelper.print_output("/sbin/pfctl -x loud") ToolsHelper.print_output("/sbin/ifconfig %s mtu 1492" % if2name) def vnet3_handler(self, vnet): # Import in the correct vnet, so at to not confuse Scapy import scapy.all as sp ifname = vnet.iface_alias_map["if2"].name ToolsHelper.print_output("/sbin/route add default 198.51.100.1") ToolsHelper.print_output("/sbin/ifconfig %s inet alias 198.51.100.3/24" % ifname) ToolsHelper.print_output("/sbin/ifconfig %s mtu 1492" % ifname) def checkfn(packet): icmp = packet.getlayer(sp.ICMP) if not icmp: return False if icmp.type != 3: return False packet.show() return True sp.sniff(iface=ifname, stop_filter=checkfn) vnet.pipe.send("Got ICMP destination unreachable packet") @pytest.mark.require_user("root") @pytest.mark.require_progs(["scapy"]) def test_inner_match(self): vnet = self.vnet_map["vnet1"] dst_vnet = self.vnet_map["vnet3"] - sendif = vnet.iface_alias_map["if1"].name + sendif = vnet.iface_alias_map["if1"] - our_mac = ToolsHelper.get_output("/sbin/ifconfig %s ether | awk '/ether/ { print $2; }'" % sendif) - dst_mac = re.sub("0a$", "0b", our_mac) + our_mac = sendif.ether + dst_mac = sendif.epairb.ether # Import in the correct vnet, so at to not confuse Scapy import scapy.all as sp ToolsHelper.print_output("/sbin/route add default 192.0.2.1") # Sanity check check("/sbin/ping -c 1 192.0.2.1") check("/sbin/ping -c 1 198.51.100.1") check("/sbin/ping -c 2 198.51.100.3") # Establish a state pkt = sp.Ether(src=our_mac, dst=dst_mac) \ / sp.IP(src="192.0.2.2", dst="198.51.100.2") \ / sp.ICMP(type='echo-request') \ / "PAYLOAD" - sp.sendp(pkt, sendif, verbose=False) + sp.sendp(pkt, sendif.name, verbose=False) # Now try to pass an ICMP error message piggy-backing on that state, but # use a different source address pkt = sp.Ether(src=our_mac, dst=dst_mac) \ / sp.IP(src="192.0.2.2", dst="198.51.100.3") \ / sp.ICMP(type='dest-unreach') \ / sp.IP(src="198.51.100.2", dst="192.0.2.2") \ / sp.ICMP(type='echo-reply') - sp.sendp(pkt, sendif, verbose=False) + sp.sendp(pkt, sendif.name, verbose=False) try: rcvd = self.wait_object(dst_vnet.pipe, timeout=1) if rcvd: raise Exception(rcvd) except TimeoutError as e: # We expect the timeout here. It means we didn't get the destination # unreachable packet in vnet3 pass def check_icmp_echo(self, sp, payload_size): packet = sp.IP(dst="198.51.100.2", flags="DF") \ / sp.ICMP(type='echo-request') \ / sp.raw(bytes.fromhex('f0') * payload_size) p = sp.sr1(packet, iface=self.vnet.iface_alias_map["if1"].name, timeout=3) p.show() ip = p.getlayer(sp.IP) icmp = p.getlayer(sp.ICMP) assert ip assert icmp if payload_size > 1464: # Expect ICMP destination unreachable, fragmentation needed assert ip.src == "192.0.2.1" assert ip.dst == "192.0.2.2" assert icmp.type == 3 # dest-unreach assert icmp.code == 4 assert icmp.nexthopmtu == 1492 else: # Expect echo reply assert ip.src == "198.51.100.2" assert ip.dst == "192.0.2.2" assert icmp.type == 0 # "echo-reply" assert icmp.code == 0 return @pytest.mark.require_user("root") @pytest.mark.require_progs(["scapy"]) def test_fragmentation_needed(self): ToolsHelper.print_output("/sbin/route add default 192.0.2.1") ToolsHelper.print_output("/sbin/ping -c 1 198.51.100.2") ToolsHelper.print_output("/sbin/ping -c 1 -D -s 1472 198.51.100.2") # Import in the correct vnet, so at to not confuse Scapy import scapy.all as sp self.check_icmp_echo(sp, 128) self.check_icmp_echo(sp, 1464) self.check_icmp_echo(sp, 1468) class TestICMP_NAT(VnetTestTemplate): REQUIRED_MODULES = [ "pf" ] TOPOLOGY = { "vnet1": {"ifaces": ["if1"]}, "vnet2": {"ifaces": ["if1", "if2"]}, "vnet3": {"ifaces": ["if2"]}, "if1": {"prefixes4": [("192.0.2.2/24", "192.0.2.1/24")]}, "if2": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]}, } def vnet2_handler(self, vnet): ifname = vnet.iface_alias_map["if1"].name if2name = vnet.iface_alias_map["if2"].name ToolsHelper.print_output("/sbin/pfctl -e") ToolsHelper.pf_rules([ "set reassemble yes", "set state-policy if-bound", "nat on %s inet from 192.0.2.0/24 to any -> (%s)" % (if2name, if2name), "block", "pass inet proto icmp icmp-type echoreq", ]) ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1") ToolsHelper.print_output("/sbin/pfctl -x loud") def vnet3_handler(self, vnet): ifname = vnet.iface_alias_map["if2"].name ToolsHelper.print_output("/sbin/ifconfig %s inet alias 198.51.100.3/24" % ifname) @pytest.mark.require_user("root") @pytest.mark.require_progs(["scapy"]) def test_id_conflict(self): """ Test ICMP echo requests with the same ID from different clients. Windows does this, and it can confuse pf. """ ifname = self.vnet.iface_alias_map["if1"].name ToolsHelper.print_output("/sbin/route add default 192.0.2.1") ToolsHelper.print_output("/sbin/ifconfig %s inet alias 192.0.2.3/24" % ifname) ToolsHelper.print_output("/sbin/ping -c 1 192.0.2.1") ToolsHelper.print_output("/sbin/ping -c 1 198.51.100.1") ToolsHelper.print_output("/sbin/ping -c 1 198.51.100.2") ToolsHelper.print_output("/sbin/ping -c 1 198.51.100.3") # Import in the correct vnet, so at to not confuse Scapy import scapy.all as sp # Address one packet = sp.IP(src="192.0.2.2", dst="198.51.100.2", flags="DF") \ / sp.ICMP(type='echo-request', id=42) \ / sp.raw(bytes.fromhex('f0') * 16) p = sp.sr1(packet, timeout=3) p.show() ip = p.getlayer(sp.IP) icmp = p.getlayer(sp.ICMP) assert ip assert icmp assert ip.dst == "192.0.2.2" assert icmp.id == 42 # Address one packet = sp.IP(src="192.0.2.3", dst="198.51.100.2", flags="DF") \ / sp.ICMP(type='echo-request', id=42) \ / sp.raw(bytes.fromhex('f0') * 16) p = sp.sr1(packet, timeout=3) p.show() ip = p.getlayer(sp.IP) icmp = p.getlayer(sp.ICMP) assert ip assert icmp assert ip.dst == "192.0.2.3" assert icmp.id == 42