diff --git a/tests/atf_python/sys/net/tools.py b/tests/atf_python/sys/net/tools.py index c67941b414fc..23bb5f4b4128 100644 --- a/tests/atf_python/sys/net/tools.py +++ b/tests/atf_python/sys/net/tools.py @@ -1,73 +1,86 @@ #!/usr/local/bin/python3 import json import os import socket import time from ctypes import cdll from ctypes import get_errno from ctypes.util import find_library from typing import List from typing import Optional class ToolsHelper(object): NETSTAT_PATH = "/usr/bin/netstat" IFCONFIG_PATH = "/sbin/ifconfig" @classmethod def get_output(cls, cmd: str, verbose=False) -> str: if verbose: print("run: '{}'".format(cmd)) return os.popen(cmd).read() @classmethod def print_output(cls, cmd: str, verbose=True): if verbose: print("======= {} =====".format(cmd)) print(cls.get_output(cmd)) if verbose: print() @classmethod def print_net_debug(cls): cls.print_output("ifconfig") cls.print_output("netstat -rnW") @classmethod def set_sysctl(cls, oid, val): cls.get_output("sysctl {}={}".format(oid, val)) @classmethod def get_routes(cls, family: str, fibnum: int = 0): family_key = {"inet": "-4", "inet6": "-6"}.get(family) out = cls.get_output( - "{} {} -rn -F {} --libxo json".format(cls.NETSTAT_PATH, family_key, fibnum) + "{} {} -rnW -F {} --libxo json".format(cls.NETSTAT_PATH, family_key, fibnum) ) js = json.loads(out) js = js["statistics"]["route-information"]["route-table"]["rt-family"] if js: return js[0]["rt-entry"] else: return [] + @classmethod + def get_nhops(cls, family: str, fibnum: int = 0): + family_key = {"inet": "-4", "inet6": "-6"}.get(family) + out = cls.get_output( + "{} {} -onW -F {} --libxo json".format(cls.NETSTAT_PATH, family_key, fibnum) + ) + js = json.loads(out) + js = js["statistics"]["route-nhop-information"]["nhop-table"]["rt-family"] + if js: + return js[0]["nh-entry"] + else: + return [] + @classmethod def get_linklocals(cls): ret = {} ifname = None ips = [] for line in cls.get_output(cls.IFCONFIG_PATH).splitlines(): if line[0].isalnum(): if ifname: ret[ifname] = ips ips = [] ifname = line.split(":")[0] else: words = line.split() if words[0] == "inet6" and words[1].startswith("fe80"): # inet6 fe80::1%lo0 prefixlen 64 scopeid 0x2 ip = words[1].split("%")[0] scopeid = int(words[words.index("scopeid") + 1], 16) ips.append((ip, scopeid)) if ifname: ret[ifname] = ips return ret diff --git a/tests/atf_python/sys/net/vnet.py b/tests/atf_python/sys/net/vnet.py index 663f7695a0cc..0d9f969b28d9 100644 --- a/tests/atf_python/sys/net/vnet.py +++ b/tests/atf_python/sys/net/vnet.py @@ -1,460 +1,464 @@ #!/usr/local/bin/python3 import copy import ipaddress import os import socket import sys import time from ctypes import cdll from ctypes import get_errno from ctypes.util import find_library from multiprocessing import Pipe from multiprocessing import Process from typing import Dict from typing import List from typing import NamedTuple from typing import Optional from atf_python.sys.net.tools import ToolsHelper def run_cmd(cmd: str, verbose=True) -> str: print("run: '{}'".format(cmd)) return os.popen(cmd).read() def convert_test_name(test_name: str) -> str: """Convert test name to a string that can be used in the file/jail names""" ret = "" for char in test_name: if char.isalnum() or char in ("_", "-"): ret += char elif char in ("["): ret += "_" return ret class VnetInterface(object): # defines from net/if_types.h IFT_LOOP = 0x18 IFT_ETHER = 0x06 def __init__(self, iface_alias: str, iface_name: str): self.name = iface_name self.alias = iface_alias self.vnet_name = "" self.jailed = False self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}} self.prefixes4: List[List[str]] = [] self.prefixes6: List[List[str]] = [] if iface_name.startswith("lo"): self.iftype = self.IFT_LOOP else: self.iftype = self.IFT_ETHER @property def ifindex(self): return socket.if_nametoindex(self.name) @property def first_ipv6(self): d = self.addr_map["inet6"] return d[next(iter(d))] @property def first_ipv4(self): d = self.addr_map["inet"] return d[next(iter(d))] def set_vnet(self, vnet_name: str): self.vnet_name = vnet_name def set_jailed(self, jailed: bool): self.jailed = jailed def run_cmd( self, cmd, verbose=False, ): if self.vnet_name and not self.jailed: cmd = "jexec {} {}".format(self.vnet_name, cmd) return run_cmd(cmd, verbose) @classmethod def setup_loopback(cls, vnet_name: str): lo = VnetInterface("", "lo0") lo.set_vnet(vnet_name) lo.turn_up() @classmethod def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]: name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip() if not name: raise Exception("Unable to create iface {}".format(iface_name)) ret = [cls(alias_name, name)] if name.startswith("epair"): ret.append(cls(alias_name, name[:-1] + "b")) return ret def setup_addr(self, _addr: str): addr = ipaddress.ip_interface(_addr) if addr.version == 6: family = "inet6" + cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) else: family = "inet" - cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) + if self.addr_map[family]: + cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr) + else: + cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) self.run_cmd(cmd) - self.addr_map[family][str(addr)] = addr + self.addr_map[family][str(addr.ip)] = addr def delete_addr(self, _addr: str): addr = ipaddress.ip_address(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr) else: family = "inet" cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr) self.run_cmd(cmd) del self.addr_map[family][str(addr)] def turn_up(self): cmd = "/sbin/ifconfig {} up".format(self.name) self.run_cmd(cmd) def enable_ipv6(self): cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name) self.run_cmd(cmd) def has_tentative(self) -> bool: """True if an interface has some addresses in tenative state""" cmd = "/sbin/ifconfig {} inet6".format(self.name) out = self.run_cmd(cmd, verbose=False) for line in out.splitlines(): if "tentative" in line: return True return False class IfaceFactory(object): INTERFACES_FNAME = "created_ifaces.lst" def __init__(self, test_name: str): self.test_name = test_name test_id = convert_test_name(test_name) self.file_name = self.INTERFACES_FNAME def _register_iface(self, iface_name: str): with open(self.file_name, "a") as f: f.write(iface_name + "\n") def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]: ifaces = VnetInterface.create_iface(alias_name, iface_name) for iface in ifaces: self._register_iface(iface.name) return ifaces def cleanup(self): try: with open(self.file_name, "r") as f: for line in f: run_cmd("/sbin/ifconfig {} destroy".format(line.strip())) os.unlink(self.INTERFACES_FNAME) except Exception: pass class VnetInstance(object): def __init__( self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface] ): self.name = vnet_name self.alias = vnet_alias # reference in the test topology self.jid = jid self.ifaces = ifaces self.iface_alias_map = {} # iface.alias: iface self.iface_map = {} # iface.name: iface for iface in ifaces: iface.set_vnet(vnet_name) iface.set_jailed(True) self.iface_alias_map[iface.alias] = iface self.iface_map[iface.name] = iface self.need_dad = False # Disable duplicate address detection by default self.attached = False self.pipe = None self.subprocess = None def run_vnet_cmd(self, cmd): if not self.attached: cmd = "jexec {} {}".format(self.name, cmd) return run_cmd(cmd) def disable_dad(self): self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0") def set_pipe(self, pipe): self.pipe = pipe def set_subprocess(self, p): self.subprocess = p @staticmethod def attach_jid(jid: int): _path: Optional[str] = find_library("c") if _path is None: raise Exception("libc not found") path: str = _path libc = cdll.LoadLibrary(path) if libc.jail_attach(jid) != 0: raise Exception("jail_attach() failed: errno {}".format(get_errno())) def attach(self): self.attach_jid(self.jid) self.attached = True class VnetFactory(object): JAILS_FNAME = "created_jails.lst" def __init__(self, test_name: str): self.test_name = test_name self.test_id = convert_test_name(test_name) self.file_name = self.JAILS_FNAME self._vnets: List[str] = [] def _register_vnet(self, vnet_name: str): self._vnets.append(vnet_name) with open(self.file_name, "a") as f: f.write(vnet_name + "\n") @staticmethod def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]: cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name) not_matched: List[str] = [] for i in range(50): vnet_ifaces = run_cmd(cmd).strip().split(" ") not_matched = [] for iface_name in ifaces: if iface_name not in vnet_ifaces: not_matched.append(iface_name) if len(not_matched) == 0: return [] time.sleep(0.1) return not_matched def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]): vnet_name = "jail_{}".format(self.test_id) if self._vnets: # add number to distinguish jails vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1) iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces]) cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format( vnet_name, iface_cmds ) jid_str = run_cmd(cmd) jid = int(jid_str) if jid <= 0: raise Exception("Jail creation failed, output: {}".format(jid)) self._register_vnet(vnet_name) # Run expedited version of routing VnetInterface.setup_loopback(vnet_name) not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces]) if not_found: raise Exception( "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name) ) return VnetInstance(vnet_alias, vnet_name, jid, ifaces) def cleanup(self): try: with open(self.file_name) as f: for line in f: jail_name = line.strip() ToolsHelper.print_output( "/usr/sbin/jexec {} ifconfig -l".format(jail_name) ) run_cmd("/usr/sbin/jail -r {}".format(line.strip())) os.unlink(self.JAILS_FNAME) except OSError: pass class SingleInterfaceMap(NamedTuple): ifaces: List[VnetInterface] vnet_aliases: List[str] class VnetTestTemplate(object): TOPOLOGY = {} def _get_vnet_handler(self, vnet_alias: str): handler_name = "{}_handler".format(vnet_alias) return getattr(self, handler_name, None) def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe): """Base Handler to setup given VNET. Can be run in a subprocess. If so, passes control to the special vnetX_handler() after setting up interface addresses """ vnet.attach() print("# setup_vnet({})".format(vnet.name)) topo = obj_map["topo_map"] ipv6_ifaces = [] # Disable DAD if not vnet.need_dad: vnet.disable_dad() for iface in vnet.ifaces: # check index of vnet within an interface # as we have prefixes for both ends of the interface iface_map = obj_map["iface_map"][iface.alias] idx = iface_map.vnet_aliases.index(vnet.alias) prefixes6 = topo[iface.alias].get("prefixes6", []) prefixes4 = topo[iface.alias].get("prefixes4", []) if prefixes6 or prefixes4: ipv6_ifaces.append(iface) iface.turn_up() if prefixes6: iface.enable_ipv6() for prefix in prefixes6 + prefixes4: iface.setup_addr(prefix[idx]) for iface in ipv6_ifaces: while iface.has_tentative(): time.sleep(0.1) # Run actual handler handler = self._get_vnet_handler(vnet.alias) if handler: # Do unbuffered stdout for children # so the logs are present if the child hangs sys.stdout.reconfigure(line_buffering=True) handler(vnet, obj_map, pipe) def setup_topology(self, topo: Dict, test_name: str): """Creates jails & interfaces for the provided topology""" iface_map: Dict[str, SingleInterfaceMap] = {} vnet_map = {} iface_factory = IfaceFactory(test_name) vnet_factory = VnetFactory(test_name) for obj_name, obj_data in topo.items(): if obj_name.startswith("if"): epair_ifaces = iface_factory.create_iface(obj_name, "epair") smap = SingleInterfaceMap(epair_ifaces, []) iface_map[obj_name] = smap for obj_name, obj_data in topo.items(): if obj_name.startswith("vnet"): vnet_ifaces = [] for iface_alias in obj_data["ifaces"]: # epair creates 2 interfaces, grab first _available_ # and map it to the VNET being created idx = len(iface_map[iface_alias].vnet_aliases) iface_map[iface_alias].vnet_aliases.append(obj_name) vnet_ifaces.append(iface_map[iface_alias].ifaces[idx]) vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces) vnet_map[obj_name] = vnet # Debug output print("============= TEST TOPOLOGY =============") for vnet_alias, vnet in vnet_map.items(): print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="") handler = self._get_vnet_handler(vnet.alias) if handler: print(" handler: {}".format(handler.__name__), end="") print() for iface_alias, iface_data in iface_map.items(): vnets = iface_data.vnet_aliases ifaces: List[VnetInterface] = iface_data.ifaces if len(vnets) == 1 and len(ifaces) == 2: print( "# iface {}: {}::{} -> main::{}".format( iface_alias, vnets[0], ifaces[0].name, ifaces[1].name ) ) elif len(vnets) == 2 and len(ifaces) == 2: print( "# iface {}: {}::{} -> {}::{}".format( iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name ) ) else: print( "# iface {}: ifaces: {} vnets: {}".format( iface_alias, vnets, [i.name for i in ifaces] ) ) print() return {"iface_map": iface_map, "vnet_map": vnet_map, "topo_map": topo} def setup_method(self, method): """Sets up all the required topology and handlers for the given test""" # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)' test_id = os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0] test_name = test_id.split("::")[-1] topology = self.TOPOLOGY # First, setup kernel objects - interfaces & vnets obj_map = self.setup_topology(topology, test_name) main_vnet = None # one without subprocess handler for vnet_alias, vnet in obj_map["vnet_map"].items(): if self._get_vnet_handler(vnet_alias): # Need subprocess to run parent_pipe, child_pipe = Pipe() p = Process( target=self._setup_vnet, args=( vnet, obj_map, child_pipe, ), ) vnet.set_pipe(parent_pipe) vnet.set_subprocess(p) p.start() else: if main_vnet is not None: raise Exception("there can be only 1 VNET w/o handler") main_vnet = vnet # Main vnet needs to be the last, so all the other subprocesses # are started & their pipe handles collected self.vnet = main_vnet self._setup_vnet(main_vnet, obj_map, None) # Save state for the main handler self.iface_map = obj_map["iface_map"] self.vnet_map = obj_map["vnet_map"] def cleanup(self, test_id: str): # pytest test id: file::class::test_name test_name = test_id.split("::")[-1] print("==== vnet cleanup ===") print("# test_name: '{}'".format(test_name)) VnetFactory(test_name).cleanup() IfaceFactory(test_name).cleanup() def wait_object(self, pipe, timeout=5): if pipe.poll(timeout): return pipe.recv() raise TimeoutError @property def curvnet(self): pass class SingleVnetTestTemplate(VnetTestTemplate): IPV6_PREFIXES: List[str] = [] IPV4_PREFIXES: List[str] = [] def setup_method(self, method): topology = copy.deepcopy( { "vnet1": {"ifaces": ["if1"]}, "if1": {"prefixes4": [], "prefixes6": []}, } ) for prefix in self.IPV6_PREFIXES: topology["if1"]["prefixes6"].append((prefix,)) for prefix in self.IPV4_PREFIXES: topology["if1"]["prefixes4"].append((prefix,)) self.TOPOLOGY = topology super().setup_method(method) diff --git a/tests/sys/net/routing/Makefile b/tests/sys/net/routing/Makefile index d71ba828f958..45034ff211b1 100644 --- a/tests/sys/net/routing/Makefile +++ b/tests/sys/net/routing/Makefile @@ -1,21 +1,22 @@ # $FreeBSD$ PACKAGE= tests WARNS?= 1 TESTSDIR= ${TESTSBASE}/sys/net/routing ATF_TESTS_C += test_rtsock_l3 ATF_TESTS_C += test_rtsock_lladdr +ATF_TESTS_PYTEST += test_routing_l3.py ATF_TESTS_PYTEST += test_rtsock_multipath.py ${PACKAGE}FILES+= generic_cleanup.sh ${PACKAGE}FILESMODE_generic_cleanup.sh=0555 # Most of the tests operates on a common IPv4/IPv6 prefix, # so running them in parallel will lead to weird results. TEST_METADATA+= is_exclusive=true CFLAGS+= -I${.CURDIR:H:H:H} .include diff --git a/tests/sys/net/routing/test_routing_l3.py b/tests/sys/net/routing/test_routing_l3.py new file mode 100755 index 000000000000..74017ae0459c --- /dev/null +++ b/tests/sys/net/routing/test_routing_l3.py @@ -0,0 +1,81 @@ +import ipaddress + +import pytest +from atf_python.sys.net.tools import ToolsHelper +from atf_python.sys.net.vnet import VnetTestTemplate + + +class TestIfOps(VnetTestTemplate): + TOPOLOGY = { + "vnet1": {"ifaces": ["if1", "if2"]}, + "if1": {"prefixes4": [], "prefixes6": []}, + "if2": {"prefixes4": [], "prefixes6": []}, + } + + @pytest.mark.parametrize("family", ["inet", "inet6"]) + @pytest.mark.require_user("root") + def test_change_prefix_route(self, family): + """Tests that prefix route changes to the new one upon addr deletion""" + vnet = self.vnet_map["vnet1"] + first_iface = vnet.iface_alias_map["if1"] + second_iface = vnet.iface_alias_map["if2"] + if family == "inet": + first_addr = ipaddress.ip_interface("192.0.2.1/24") + second_addr = ipaddress.ip_interface("192.0.2.2/24") + else: + first_addr = ipaddress.ip_interface("2001:db8::1/64") + second_addr = ipaddress.ip_interface("2001:db8::2/64") + + first_iface.setup_addr(str(first_addr)) + second_iface.setup_addr(str(second_addr)) + + # At this time prefix should be pointing to the first interface + routes = ToolsHelper.get_routes(family) + px = [r for r in routes if r["destination"] == str(first_addr.network)][0] + assert px["interface-name"] == first_iface.name + + # Now delete address from the first interface and verify switchover + first_iface.delete_addr(first_addr.ip) + + routes = ToolsHelper.get_routes(family) + px = [r for r in routes if r["destination"] == str(first_addr.network)][0] + assert px["interface-name"] == second_iface.name + + @pytest.mark.parametrize( + "family", + [ + "inet", + pytest.param("inet6", marks=pytest.mark.xfail(reason="currently fails")), + ], + ) + @pytest.mark.require_user("root") + def test_change_prefix_route_same_iface(self, family): + """Tests that prefix route changes to the new ifa upon addr deletion""" + vnet = self.vnet_map["vnet1"] + first_iface = vnet.iface_alias_map["if1"] + + if family == "inet": + first_addr = ipaddress.ip_interface("192.0.2.1/24") + second_addr = ipaddress.ip_interface("192.0.2.2/24") + else: + first_addr = ipaddress.ip_interface("2001:db8::1/64") + second_addr = ipaddress.ip_interface("2001:db8::2/64") + + first_iface.setup_addr(str(first_addr)) + first_iface.setup_addr(str(second_addr)) + + # At this time prefix should be pointing to the first interface + routes = ToolsHelper.get_routes(family) + px = [r for r in routes if r["destination"] == str(first_addr.network)][0] + assert px["interface-name"] == first_iface.name + + # Now delete address from the first interface and verify switchover + first_iface.delete_addr(str(first_addr.ip)) + + routes = ToolsHelper.get_routes(family) + px = [r for r in routes if r["destination"] == str(first_addr.network)][0] + nhop_kidx = px["nhop"] + assert px["interface-name"] == first_iface.name + nhops = ToolsHelper.get_nhops(family) + nh = [nh for nh in nhops if nh["index"] == nhop_kidx][0] + assert nh["ifa"] == str(second_addr.ip)