diff --git a/tests/atf_python/sys/net/vnet.py b/tests/atf_python/sys/net/vnet.py index faae58e95b6f..aca1b53d388c 100644 --- a/tests/atf_python/sys/net/vnet.py +++ b/tests/atf_python/sys/net/vnet.py @@ -1,458 +1,478 @@ #!/usr/local/bin/python3 import copy import ipaddress import os import socket import sys import time from multiprocessing import Pipe from multiprocessing import Process from typing import Dict from typing import List from typing import NamedTuple from atf_python.sys.net.tools import ToolsHelper -from atf_python.utils import libc, BaseTest +from atf_python.utils import BaseTest +from atf_python.utils import libc def run_cmd(cmd: str, verbose=True) -> str: print("run: '{}'".format(cmd)) return os.popen(cmd).read() +def get_topology_id(test_id: str) -> str: + """ + Gets a unique topology id based on the pytest test_id. + "test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif]" -> + "TestIP6Output:test_output6_pktinfo[ipandif]" + """ + return ":".join(test_id.split("::")[-2:]) + + def convert_test_name(test_name: str) -> str: """Convert test name to a string that can be used in the file/jail names""" ret = "" for char in test_name: - if char.isalnum() or char in ("_", "-"): + if char.isalnum() or char in ("_", "-", ":"): ret += char elif char in ("["): ret += "_" return ret class VnetInterface(object): # defines from net/if_types.h IFT_LOOP = 0x18 IFT_ETHER = 0x06 def __init__(self, iface_alias: str, iface_name: str): self.name = iface_name self.alias = iface_alias self.vnet_name = "" self.jailed = False self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}} self.prefixes4: List[List[str]] = [] self.prefixes6: List[List[str]] = [] if iface_name.startswith("lo"): self.iftype = self.IFT_LOOP else: self.iftype = self.IFT_ETHER @property def ifindex(self): return socket.if_nametoindex(self.name) @property def first_ipv6(self): d = self.addr_map["inet6"] return d[next(iter(d))] @property def first_ipv4(self): d = self.addr_map["inet"] return d[next(iter(d))] def set_vnet(self, vnet_name: str): self.vnet_name = vnet_name def set_jailed(self, jailed: bool): self.jailed = jailed def run_cmd( self, cmd, verbose=False, ): if self.vnet_name and not self.jailed: cmd = "jexec {} {}".format(self.vnet_name, cmd) return run_cmd(cmd, verbose) @classmethod def setup_loopback(cls, vnet_name: str): lo = VnetInterface("", "lo0") lo.set_vnet(vnet_name) lo.turn_up() @classmethod def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]: name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip() if not name: raise Exception("Unable to create iface {}".format(iface_name)) ret = [cls(alias_name, name)] if name.startswith("epair"): ret.append(cls(alias_name, name[:-1] + "b")) return ret def setup_addr(self, _addr: str): addr = ipaddress.ip_interface(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) else: family = "inet" if self.addr_map[family]: cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr) else: cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) self.run_cmd(cmd) self.addr_map[family][str(addr.ip)] = addr def delete_addr(self, _addr: str): addr = ipaddress.ip_address(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr) else: family = "inet" cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr) self.run_cmd(cmd) del self.addr_map[family][str(addr)] def turn_up(self): cmd = "/sbin/ifconfig {} up".format(self.name) self.run_cmd(cmd) def enable_ipv6(self): cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name) self.run_cmd(cmd) def has_tentative(self) -> bool: """True if an interface has some addresses in tenative state""" cmd = "/sbin/ifconfig {} inet6".format(self.name) out = self.run_cmd(cmd, verbose=False) for line in out.splitlines(): if "tentative" in line: return True return False class IfaceFactory(object): INTERFACES_FNAME = "created_ifaces.lst" - def __init__(self, test_name: str): - self.test_name = test_name - self.test_id = convert_test_name(test_name) + def __init__(self): self.file_name = self.INTERFACES_FNAME def _register_iface(self, iface_name: str): with open(self.file_name, "a") as f: f.write(iface_name + "\n") def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]: ifaces = VnetInterface.create_iface(alias_name, iface_name) for iface in ifaces: self._register_iface(iface.name) return ifaces def cleanup(self): try: with open(self.file_name, "r") as f: for line in f: run_cmd("/sbin/ifconfig {} destroy".format(line.strip())) os.unlink(self.INTERFACES_FNAME) except Exception: pass class VnetInstance(object): def __init__( self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface] ): self.name = vnet_name self.alias = vnet_alias # reference in the test topology self.jid = jid self.ifaces = ifaces self.iface_alias_map = {} # iface.alias: iface self.iface_map = {} # iface.name: iface for iface in ifaces: iface.set_vnet(vnet_name) iface.set_jailed(True) self.iface_alias_map[iface.alias] = iface self.iface_map[iface.name] = iface self.need_dad = False # Disable duplicate address detection by default self.attached = False self.pipe = None self.subprocess = None def run_vnet_cmd(self, cmd): if not self.attached: cmd = "jexec {} {}".format(self.name, cmd) return run_cmd(cmd) def disable_dad(self): self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0") def set_pipe(self, pipe): self.pipe = pipe def set_subprocess(self, p): self.subprocess = p @staticmethod def attach_jid(jid: int): error_code = libc.jail_attach(jid) if error_code != 0: raise Exception("jail_attach() failed: errno {}".format(error_code)) def attach(self): self.attach_jid(self.jid) self.attached = True class VnetFactory(object): JAILS_FNAME = "created_jails.lst" - def __init__(self, test_name: str): - self.test_name = test_name - self.test_id = convert_test_name(test_name) + def __init__(self, topology_id: str): + self.topology_id = topology_id self.file_name = self.JAILS_FNAME self._vnets: List[str] = [] def _register_vnet(self, vnet_name: str): self._vnets.append(vnet_name) with open(self.file_name, "a") as f: f.write(vnet_name + "\n") @staticmethod def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]: cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name) not_matched: List[str] = [] for i in range(50): vnet_ifaces = run_cmd(cmd).strip().split(" ") not_matched = [] for iface_name in ifaces: if iface_name not in vnet_ifaces: not_matched.append(iface_name) if len(not_matched) == 0: return [] time.sleep(0.1) return not_matched def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]): - vnet_name = "jail_{}".format(self.test_id) + vnet_name = "pytest:{}".format(convert_test_name(self.topology_id)) if self._vnets: # add number to distinguish jails vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1) iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces]) cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format( vnet_name, iface_cmds ) - jid_str = run_cmd(cmd) - jid = int(jid_str) - if jid <= 0: - raise Exception("Jail creation failed, output: {}".format(jid)) + jid = 0 + try: + jid_str = run_cmd(cmd) + jid = int(jid_str) + except ValueError as e: + print("Jail creation failed, output: {}".format(jid_str)) + raise self._register_vnet(vnet_name) # Run expedited version of routing VnetInterface.setup_loopback(vnet_name) not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces]) if not_found: raise Exception( "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name) ) return VnetInstance(vnet_alias, vnet_name, jid, ifaces) def cleanup(self): try: with open(self.file_name) as f: for line in f: - jail_name = line.strip() + vnet_name = line.strip() ToolsHelper.print_output( - "/usr/sbin/jexec {} ifconfig -l".format(jail_name) + "/usr/sbin/jexec {} ifconfig -l".format(vnet_name) ) - run_cmd("/usr/sbin/jail -r {}".format(line.strip())) + run_cmd("/usr/sbin/jail -r {}".format(vnet_name)) os.unlink(self.JAILS_FNAME) except OSError: pass class SingleInterfaceMap(NamedTuple): ifaces: List[VnetInterface] vnet_aliases: List[str] +class ObjectsMap(NamedTuple): + iface_map: Dict[str, SingleInterfaceMap] # keyed by ifX + vnet_map: Dict[str, VnetInstance] # keyed by vnetX + topo_map: Dict # self.TOPOLOGY + + class VnetTestTemplate(BaseTest): TOPOLOGY = {} def _get_vnet_handler(self, vnet_alias: str): handler_name = "{}_handler".format(vnet_alias) return getattr(self, handler_name, None) def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe): """Base Handler to setup given VNET. Can be run in a subprocess. If so, passes control to the special vnetX_handler() after setting up interface addresses """ vnet.attach() print("# setup_vnet({})".format(vnet.name)) + if pipe is not None: + vnet.set_pipe(pipe) - topo = obj_map["topo_map"] + topo = obj_map.topo_map ipv6_ifaces = [] # Disable DAD if not vnet.need_dad: vnet.disable_dad() for iface in vnet.ifaces: # check index of vnet within an interface # as we have prefixes for both ends of the interface - iface_map = obj_map["iface_map"][iface.alias] + iface_map = obj_map.iface_map[iface.alias] idx = iface_map.vnet_aliases.index(vnet.alias) prefixes6 = topo[iface.alias].get("prefixes6", []) prefixes4 = topo[iface.alias].get("prefixes4", []) if prefixes6 or prefixes4: ipv6_ifaces.append(iface) iface.turn_up() if prefixes6: iface.enable_ipv6() for prefix in prefixes6 + prefixes4: iface.setup_addr(prefix[idx]) for iface in ipv6_ifaces: while iface.has_tentative(): time.sleep(0.1) # Run actual handler handler = self._get_vnet_handler(vnet.alias) if handler: # Do unbuffered stdout for children # so the logs are present if the child hangs sys.stdout.reconfigure(line_buffering=True) - handler(vnet, obj_map, pipe) + handler(vnet) - def setup_topology(self, topo: Dict, test_name: str): + def setup_topology(self, topo: Dict, topology_id: str): """Creates jails & interfaces for the provided topology""" iface_map: Dict[str, SingleInterfaceMap] = {} vnet_map = {} - iface_factory = IfaceFactory(test_name) - vnet_factory = VnetFactory(test_name) + iface_factory = IfaceFactory() + vnet_factory = VnetFactory(topology_id) for obj_name, obj_data in topo.items(): if obj_name.startswith("if"): epair_ifaces = iface_factory.create_iface(obj_name, "epair") smap = SingleInterfaceMap(epair_ifaces, []) iface_map[obj_name] = smap for obj_name, obj_data in topo.items(): if obj_name.startswith("vnet"): vnet_ifaces = [] for iface_alias in obj_data["ifaces"]: # epair creates 2 interfaces, grab first _available_ # and map it to the VNET being created idx = len(iface_map[iface_alias].vnet_aliases) iface_map[iface_alias].vnet_aliases.append(obj_name) vnet_ifaces.append(iface_map[iface_alias].ifaces[idx]) vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces) vnet_map[obj_name] = vnet # Debug output print("============= TEST TOPOLOGY =============") for vnet_alias, vnet in vnet_map.items(): print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="") handler = self._get_vnet_handler(vnet.alias) if handler: print(" handler: {}".format(handler.__name__), end="") print() for iface_alias, iface_data in iface_map.items(): vnets = iface_data.vnet_aliases ifaces: List[VnetInterface] = iface_data.ifaces if len(vnets) == 1 and len(ifaces) == 2: print( "# iface {}: {}::{} -> main::{}".format( iface_alias, vnets[0], ifaces[0].name, ifaces[1].name ) ) elif len(vnets) == 2 and len(ifaces) == 2: print( "# iface {}: {}::{} -> {}::{}".format( iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name ) ) else: print( "# iface {}: ifaces: {} vnets: {}".format( iface_alias, vnets, [i.name for i in ifaces] ) ) print() - return {"iface_map": iface_map, "vnet_map": vnet_map, "topo_map": topo} + return ObjectsMap(iface_map, vnet_map, topo) - def setup_method(self, method): + def setup_method(self, _method): """Sets up all the required topology and handlers for the given test""" - # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)' - test_id = os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0] - test_name = test_id.split("::")[-1] - self.check_constraints() + super().setup_method(_method) + # TestIP6Output.test_output6_pktinfo[ipandif] + topology_id = get_topology_id(self.test_id) topology = self.TOPOLOGY # First, setup kernel objects - interfaces & vnets - obj_map = self.setup_topology(topology, test_name) + obj_map = self.setup_topology(topology, topology_id) main_vnet = None # one without subprocess handler - for vnet_alias, vnet in obj_map["vnet_map"].items(): + for vnet_alias, vnet in obj_map.vnet_map.items(): if self._get_vnet_handler(vnet_alias): # Need subprocess to run parent_pipe, child_pipe = Pipe() p = Process( target=self._setup_vnet, args=( vnet, obj_map, child_pipe, ), ) vnet.set_pipe(parent_pipe) vnet.set_subprocess(p) p.start() else: if main_vnet is not None: raise Exception("there can be only 1 VNET w/o handler") main_vnet = vnet # Main vnet needs to be the last, so all the other subprocesses # are started & their pipe handles collected self.vnet = main_vnet self._setup_vnet(main_vnet, obj_map, None) # Save state for the main handler - self.iface_map = obj_map["iface_map"] - self.vnet_map = obj_map["vnet_map"] + self.iface_map = obj_map.iface_map + self.vnet_map = obj_map.vnet_map def cleanup(self, test_id: str): # pytest test id: file::class::test_name - test_name = test_id.split("::")[-1] + topology_id = get_topology_id(self.test_id) print("==== vnet cleanup ===") - print("# test_name: '{}'".format(test_name)) - VnetFactory(test_name).cleanup() - IfaceFactory(test_name).cleanup() + print("# topology_id: '{}'".format(topology_id)) + VnetFactory(topology_id).cleanup() + IfaceFactory().cleanup() def wait_object(self, pipe, timeout=5): if pipe.poll(timeout): return pipe.recv() raise TimeoutError + def send_object(self, pipe, obj): + pipe.send(obj) + @property def curvnet(self): pass class SingleVnetTestTemplate(VnetTestTemplate): IPV6_PREFIXES: List[str] = [] IPV4_PREFIXES: List[str] = [] def setup_method(self, method): topology = copy.deepcopy( { "vnet1": {"ifaces": ["if1"]}, "if1": {"prefixes4": [], "prefixes6": []}, } ) for prefix in self.IPV6_PREFIXES: topology["if1"]["prefixes6"].append((prefix,)) for prefix in self.IPV4_PREFIXES: topology["if1"]["prefixes4"].append((prefix,)) self.TOPOLOGY = topology super().setup_method(method) diff --git a/tests/atf_python/utils.py b/tests/atf_python/utils.py index 12cd56c10149..17824262b1fd 100644 --- a/tests/atf_python/utils.py +++ b/tests/atf_python/utils.py @@ -1,46 +1,56 @@ #!/usr/bin/env python3 import os from ctypes import CDLL from ctypes import get_errno from ctypes.util import find_library from typing import List from typing import Optional import pytest class LibCWrapper(object): def __init__(self): path: Optional[str] = find_library("c") if path is None: raise RuntimeError("libc not found") self._libc = CDLL(path, use_errno=True) def modfind(self, mod_name: str) -> int: if self._libc.modfind(bytes(mod_name, encoding="ascii")) == -1: return get_errno() return 0 def jail_attach(self, jid: int) -> int: if self._libc.jail_attach(jid) != 0: return get_errno() return 0 libc = LibCWrapper() class BaseTest(object): REQUIRED_MODULES: List[str] = [] def _check_modules(self): for mod_name in self.REQUIRED_MODULES: error_code = libc.modfind(mod_name) if error_code != 0: err_str = os.strerror(error_code) pytest.skip( "kernel module '{}' not available: {}".format(mod_name, err_str) ) - def check_constraints(self): + @property + def test_id(self): + # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)' + return os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0] + + def setup_method(self, method): + """Run all pre-requisits for the test execution""" self._check_modules() + + def cleanup(self, test_id: str): + """Cleanup all test resources here""" + pass diff --git a/tests/sys/netinet6/test_ip6_output.py b/tests/sys/netinet6/test_ip6_output.py index 112423698cdf..1166c4c1bb88 100644 --- a/tests/sys/netinet6/test_ip6_output.py +++ b/tests/sys/netinet6/test_ip6_output.py @@ -1,540 +1,540 @@ import errno import ipaddress import socket import struct import time from ctypes import c_byte from ctypes import c_uint from ctypes import Structure import pytest from atf_python.sys.net.rtsock import SaHelper from atf_python.sys.net.tools import ToolsHelper from atf_python.sys.net.vnet import run_cmd from atf_python.sys.net.vnet import SingleVnetTestTemplate from atf_python.sys.net.vnet import VnetTestTemplate class In6Pktinfo(Structure): _fields_ = [ ("ipi6_addr", c_byte * 16), ("ipi6_ifindex", c_uint), ] class VerboseSocketServer: def __init__(self, ip: str, port: int, ifname: str = None): self.ip = ip self.port = port s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1) addr = ipaddress.ip_address(ip) if addr.is_link_local and ifname: ifindex = socket.if_nametoindex(ifname) addr_tuple = (ip, port, 0, ifindex) elif addr.is_multicast and ifname: ifindex = socket.if_nametoindex(ifname) mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack("I", ifindex) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq) print("## JOINED group {} % {}".format(ip, ifname)) addr_tuple = ("::", port, 0, ifindex) else: addr_tuple = (ip, port, 0, 0) print("## Listening on [{}]:{}".format(addr_tuple[0], port)) s.bind(addr_tuple) self.socket = s def recv(self): # data = self.socket.recv(4096) # print("RX: " + data) data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128) # Assume ancdata has just 1 item info = In6Pktinfo.from_buffer_copy(ancdata[0][2]) dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr) dst_iface = socket.if_indextoname(info.ipi6_ifindex) tx_obj = { "data": data, "src_ip": address[0], "dst_ip": dst_ip, "dst_iface": dst_iface, } return tx_obj class BaseTestIP6Ouput(VnetTestTemplate): TOPOLOGY = { "vnet1": {"ifaces": ["if1", "if2", "if3"]}, "vnet2": {"ifaces": ["if1", "if2", "if3"]}, "if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]}, "if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]}, "if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]}, } DEFAULT_PORT = 45365 - def _vnet2_handler(self, vnet, obj_map, pipe, ip: str, os_ifname: str = None): + def _vnet2_handler(self, vnet, ip: str, os_ifname: str = None): """Generic listener that sends first received packet with metadata back to the sender via pipw """ ll_data = ToolsHelper.get_linklocals() # Start listener ss = VerboseSocketServer(ip, self.DEFAULT_PORT, os_ifname) - pipe.send(ll_data) + vnet.pipe.send(ll_data) tx_obj = ss.recv() tx_obj["dst_iface_alias"] = vnet.iface_map[tx_obj["dst_iface"]].alias - pipe.send(tx_obj) + vnet.pipe.send(tx_obj) class TestIP6Output(BaseTestIP6Ouput): - def vnet2_handler(self, vnet, obj_map, pipe): + def vnet2_handler(self, vnet): ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip) - self._vnet2_handler(vnet, obj_map, pipe, ip, None) + self._vnet2_handler(vnet, ip, None) @pytest.mark.require_user("root") def test_output6_base(self): """Tests simple UDP output""" second_vnet = self.vnet_map["vnet2"] # Pick target on if2 vnet2's end ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1]) ip = str(ifaddr.ip) s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) data = bytes("AAAA", "utf-8") print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT)) # Wait for the child to become ready self.wait_object(second_vnet.pipe) s.sendto(data, (ip, self.DEFAULT_PORT)) # Wait for the received object rx_obj = self.wait_object(second_vnet.pipe) assert rx_obj["dst_ip"] == ip assert rx_obj["dst_iface_alias"] == "if2" @pytest.mark.require_user("root") def test_output6_nhop(self): """Tests UDP output with custom nhop set""" second_vnet = self.vnet_map["vnet2"] # Pick target on if2 vnet2's end ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1]) ip_dst = str(ifaddr.ip) # Pick nexthop on if1 ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if1"]["prefixes6"][0][1]) ip_next = str(ifaddr.ip) sin6_next = SaHelper.ip6_sa(ip_next, 0) s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next) # Wait for the child to become ready self.wait_object(second_vnet.pipe) data = bytes("AAAA", "utf-8") s.sendto(data, (ip_dst, self.DEFAULT_PORT)) # Wait for the received object rx_obj = self.wait_object(second_vnet.pipe) assert rx_obj["dst_ip"] == ip_dst assert rx_obj["dst_iface_alias"] == "if1" @pytest.mark.parametrize( "params", [ # esrc: src-ip, if: src-interface, esrc: expected-src, # eif: expected-rx-interface pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"), pytest.param( {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"}, id="iponly1", ), pytest.param( { "src": "2001:db8:c::1", "if": "if3", "ex": errno.EHOSTUNREACH, }, id="ipandif", ), pytest.param( { "src": "2001:db8:c::aaaa", "ex": errno.EADDRNOTAVAIL, }, id="nolocalip", ), pytest.param( {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame" ), ], ) @pytest.mark.require_user("root") def test_output6_pktinfo(self, params): """Tests simple UDP output""" second_vnet = self.vnet_map["vnet2"] vnet = self.vnet # Pick target on if2 vnet2's end ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1]) dst_ip = str(ifaddr.ip) src_ip = params.get("src", "") src_ifname = params.get("if", "") expected_ip = params.get("esrc", "") expected_ifname = params.get("eif", "") errno = params.get("ex", 0) pktinfo = In6Pktinfo() if src_ip: for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)): pktinfo.ipi6_addr[i] = b if src_ifname: os_ifname = vnet.iface_alias_map[src_ifname].name pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname) # Wait for the child to become ready self.wait_object(second_vnet.pipe) data = bytes("AAAA", "utf-8") s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) try: s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo)) aux = (socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo)) s.sendto(data, (dst_ip, self.DEFAULT_PORT)) except OSError as e: if not errno: raise assert e.errno == errno print("Correctly raised {}".format(e)) return # Wait for the received object rx_obj = self.wait_object(second_vnet.pipe) assert rx_obj["dst_ip"] == dst_ip if expected_ip: assert rx_obj["src_ip"] == expected_ip if expected_ifname: assert rx_obj["dst_iface_alias"] == expected_ifname class TestIP6OutputLL(BaseTestIP6Ouput): - def vnet2_handler(self, vnet, obj_map, pipe): + def vnet2_handler(self, vnet): """Generic listener that sends first received packet with metadata back to the sender via pipw """ os_ifname = vnet.iface_alias_map["if2"].name ll_data = ToolsHelper.get_linklocals() ll_ip, _ = ll_data[os_ifname][0] - self._vnet2_handler(vnet, obj_map, pipe, ll_ip, os_ifname) + self._vnet2_handler(vnet, ll_ip, os_ifname) @pytest.mark.require_user("root") def test_output6_linklocal(self): """Tests simple UDP output""" second_vnet = self.vnet_map["vnet2"] # Wait for the child to become ready ll_data = self.wait_object(second_vnet.pipe) # Pick LL address on if2 vnet2's end ip, _ = ll_data[second_vnet.iface_alias_map["if2"].name][0] # Get local interface scope os_ifname = self.vnet.iface_alias_map["if2"].name scopeid = socket.if_nametoindex(os_ifname) s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) data = bytes("AAAA", "utf-8") target = (ip, self.DEFAULT_PORT, 0, scopeid) print("## TX packet to {}%{},{}".format(ip, scopeid, target[1])) s.sendto(data, target) # Wait for the received object rx_obj = self.wait_object(second_vnet.pipe) assert rx_obj["dst_ip"] == ip assert rx_obj["dst_iface_alias"] == "if2" class TestIP6OutputNhopLL(BaseTestIP6Ouput): - def vnet2_handler(self, vnet, obj_map, pipe): + def vnet2_handler(self, vnet): """Generic listener that sends first received packet with metadata back to the sender via pipw """ ip = str(vnet.iface_alias_map["if2"].first_ipv6.ip) - self._vnet2_handler(vnet, obj_map, pipe, ip, None) + self._vnet2_handler(vnet, ip, None) @pytest.mark.require_user("root") def test_output6_nhop_linklocal(self): """Tests UDP output with custom link-local nhop set""" second_vnet = self.vnet_map["vnet2"] # Wait for the child to become ready ll_data = self.wait_object(second_vnet.pipe) # Pick target on if2 vnet2's end ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1]) ip_dst = str(ifaddr.ip) # Pick nexthop on if1 ip_next, _ = ll_data[second_vnet.iface_alias_map["if1"].name][0] # Get local interfaces os_ifname = self.vnet.iface_alias_map["if1"].name scopeid = socket.if_nametoindex(os_ifname) sin6_next = SaHelper.ip6_sa(ip_next, scopeid) s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_NEXTHOP, sin6_next) data = bytes("AAAA", "utf-8") s.sendto(data, (ip_dst, self.DEFAULT_PORT)) # Wait for the received object rx_obj = self.wait_object(second_vnet.pipe) assert rx_obj["dst_ip"] == ip_dst assert rx_obj["dst_iface_alias"] == "if1" class TestIP6OutputScope(BaseTestIP6Ouput): - def vnet2_handler(self, vnet, obj_map, pipe): + def vnet2_handler(self, vnet): """Generic listener that sends first received packet with metadata back to the sender via pipw """ - bind_ip, bind_ifp = self.wait_object(pipe) + bind_ip, bind_ifp = self.wait_object(vnet.pipe) if bind_ip is None: os_ifname = vnet.iface_alias_map[bind_ifp].name ll_data = ToolsHelper.get_linklocals() bind_ip, _ = ll_data[os_ifname][0] if bind_ifp is not None: bind_ifp = vnet.iface_alias_map[bind_ifp].name print("## BIND {}%{}".format(bind_ip, bind_ifp)) - self._vnet2_handler(vnet, obj_map, pipe, bind_ip, bind_ifp) + self._vnet2_handler(vnet, bind_ip, bind_ifp) @pytest.mark.parametrize( "params", [ # sif/dif: source/destination interface (for link-local addr) # sip/dip: source/destination ip (for non-LL addr) # ex: OSError errno that sendto() must raise pytest.param({"sif": "if2", "dif": "if2"}, id="same"), pytest.param( { "sif": "if1", "dif": "if2", "ex": errno.EHOSTUNREACH, }, id="ll_differentif1", ), pytest.param( { "sif": "if1", "dip": "2001:db8:b::2", "ex": errno.EHOSTUNREACH, }, id="ll_differentif2", ), pytest.param( { "sip": "2001:db8:a::1", "dif": "if2", }, id="gu_to_ll", ), ], ) @pytest.mark.require_user("root") def test_output6_linklocal_scope(self, params): """Tests simple UDP output""" second_vnet = self.vnet_map["vnet2"] src_ifp = params.get("sif") src_ip = params.get("sip") dst_ifp = params.get("dif") dst_ip = params.get("dip") errno = params.get("ex", 0) # Sent ifp/IP to bind on second_vnet = self.vnet_map["vnet2"] second_vnet.pipe.send((dst_ip, dst_ifp)) # Wait for the child to become ready ll_data = self.wait_object(second_vnet.pipe) if dst_ip is None: # Pick LL address on dst_ifp vnet2's end dst_ip, _ = ll_data[second_vnet.iface_alias_map[dst_ifp].name][0] # Get local interface scope os_ifname = self.vnet.iface_alias_map[dst_ifp].name scopeid = socket.if_nametoindex(os_ifname) target = (dst_ip, self.DEFAULT_PORT, 0, scopeid) else: target = (dst_ip, self.DEFAULT_PORT, 0, 0) # Bind if src_ip is None: ll_data = ToolsHelper.get_linklocals() os_ifname = self.vnet.iface_alias_map[src_ifp].name src_ip, _ = ll_data[os_ifname][0] scopeid = socket.if_nametoindex(os_ifname) src = (src_ip, self.DEFAULT_PORT, 0, scopeid) else: src = (src_ip, self.DEFAULT_PORT, 0, 0) s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) s.bind(src) data = bytes("AAAA", "utf-8") print("## TX packet {} -> {}".format(src, target)) try: s.sendto(data, target) except OSError as e: if not errno: raise assert e.errno == errno print("Correctly raised {}".format(e)) return # Wait for the received object rx_obj = self.wait_object(second_vnet.pipe) assert rx_obj["dst_ip"] == dst_ip assert rx_obj["src_ip"] == src_ip # assert rx_obj["dst_iface_alias"] == "if2" class TestIP6OutputMulticast(BaseTestIP6Ouput): - def vnet2_handler(self, vnet, obj_map, pipe): - group = self.wait_object(pipe) + def vnet2_handler(self, vnet): + group = self.wait_object(vnet.pipe) os_ifname = vnet.iface_alias_map["if2"].name - self._vnet2_handler(vnet, obj_map, pipe, group, os_ifname) + self._vnet2_handler(vnet, group, os_ifname) @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"]) @pytest.mark.require_user("root") def test_output6_multicast(self, group_scope): """Tests simple UDP output""" second_vnet = self.vnet_map["vnet2"] group = "{}::3456".format(group_scope) second_vnet.pipe.send(group) # Pick target on if2 vnet2's end ip = group os_ifname = self.vnet.iface_alias_map["if2"].name ifindex = socket.if_nametoindex(os_ifname) optval = struct.pack("I", ifindex) s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP) s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, optval) data = bytes("AAAA", "utf-8") # Wait for the child to become ready self.wait_object(second_vnet.pipe) print("## TX packet to {},{}".format(ip, self.DEFAULT_PORT)) s.sendto(data, (ip, self.DEFAULT_PORT)) # Wait for the received object rx_obj = self.wait_object(second_vnet.pipe) assert rx_obj["dst_ip"] == ip assert rx_obj["dst_iface_alias"] == "if2" class TestIP6OutputLoopback(SingleVnetTestTemplate): IPV6_PREFIXES = ["2001:db8:a::1/64"] DEFAULT_PORT = 45365 @pytest.mark.parametrize( "source_validation", [ pytest.param(0, id="no_sav"), pytest.param(1, id="sav", marks=pytest.mark.skip(reason="fails")), ], ) @pytest.mark.parametrize("scope", ["gu", "ll", "lo"]) def test_output6_self_tcp(self, scope, source_validation): """Tests IPv6 TCP connection to the local IPv6 address""" ToolsHelper.set_sysctl( "net.inet6.ip6.source_address_validation", source_validation ) if scope == "gu": ip = "2001:db8:a::1" addr_tuple = (ip, self.DEFAULT_PORT) elif scope == "ll": os_ifname = self.vnet.iface_alias_map["if1"].name ifindex = socket.if_nametoindex(os_ifname) ll_data = ToolsHelper.get_linklocals() ip, _ = ll_data[os_ifname][0] addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex) elif scope == "lo": ip = "::1" ToolsHelper.get_output("route add -6 ::1/128 -iface lo0") ifindex = socket.if_nametoindex("lo0") addr_tuple = (ip, self.DEFAULT_PORT) else: assert 0 == 1 print("address: {}".format(addr_tuple)) start = time.perf_counter() ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1) ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) ss.bind(addr_tuple) ss.listen() s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) s.settimeout(2.0) s.connect(addr_tuple) conn, from_addr = ss.accept() duration = time.perf_counter() - start assert from_addr[0] == ip assert duration < 1.0 @pytest.mark.parametrize( "source_validation", [ pytest.param(0, id="no_sav"), pytest.param(1, id="sav", marks=pytest.mark.skip(reason="fails")), ], ) @pytest.mark.parametrize("scope", ["gu", "ll", "lo"]) def test_output6_self_udp(self, scope, source_validation): """Tests IPv6 UDP connection to the local IPv6 address""" ToolsHelper.set_sysctl( "net.inet6.ip6.source_address_validation", source_validation ) if scope == "gu": ip = "2001:db8:a::1" addr_tuple = (ip, self.DEFAULT_PORT) elif scope == "ll": os_ifname = self.vnet.iface_alias_map["if1"].name ifindex = socket.if_nametoindex(os_ifname) ll_data = ToolsHelper.get_linklocals() ip, _ = ll_data[os_ifname][0] addr_tuple = (ip, self.DEFAULT_PORT, 0, ifindex) elif scope == "lo": ip = "::1" ToolsHelper.get_output("route add -6 ::1/128 -iface lo0") ifindex = socket.if_nametoindex("lo0") addr_tuple = (ip, self.DEFAULT_PORT) else: assert 0 == 1 print("address: {}".format(addr_tuple)) start = time.perf_counter() ss = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) ss.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1) ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) ss.bind(addr_tuple) ss.listen() s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM, socket.IPPROTO_TCP) s.settimeout(2.0) s.connect(addr_tuple) conn, from_addr = ss.accept() duration = time.perf_counter() - start assert from_addr[0] == ip assert duration < 1.0