@classmethod
def get_linklocals(cls):
    """Collect the IPv6 link-local addresses reported by ifconfig.

    Runs ``cls.IFCONFIG_PATH`` through ``cls.get_output()`` and parses the
    text.  A line whose first character is alphanumeric opens a new
    interface section (the name is the text before the first ``:``); every
    other non-blank line is treated as belonging to the current interface.

    Returns:
        Dict mapping interface name to a list of ``(address, scopeid)``
        tuples, one per ``inet6 fe80...`` line.  The ``%zone`` suffix is
        stripped from the address and ``scopeid`` is parsed as a
        hexadecimal integer.  Interfaces without link-local addresses map
        to an empty list.

    Raises:
        ValueError: a link-local ``inet6`` line has no ``scopeid`` keyword
            or its value is not a hexadecimal number.
    """
    ret = {}
    ifname = None
    for line in cls.get_output(cls.IFCONFIG_PATH).splitlines():
        # Blank lines carry no data; indexing line[0] on them would raise.
        if not line.strip():
            continue
        if line[0].isalnum():
            # "lo0: flags=..." starts a new interface section
            ifname = line.split(":")[0]
            ret[ifname] = []
            continue
        words = line.split()
        # Ignore detail lines seen before any interface header and lines
        # too short to hold "<family> <address>".
        if ifname is None or len(words) < 2:
            continue
        if words[0] == "inet6" and words[1].startswith("fe80"):
            # inet6 fe80::1%lo0 prefixlen 64 scopeid 0x2
            ip = words[1].split("%")[0]
            scopeid = int(words[words.index("scopeid") + 1], 16)
            ret[ifname].append((ip, scopeid))
    return ret
def convert_test_name(test_name: str) -> str:
    """Convert test name to a string that can be used in the file/jail names.

    Alphanumeric characters, ``_`` and ``-`` are kept, ``[`` is replaced by
    ``_`` and every other character is dropped, e.g.
    ``test_output6_pktinfo[ipandif]`` -> ``test_output6_pktinfo_ipandif``.

    Args:
        test_name: test name, typically the last ``::`` component of a
            pytest test id.

    Returns:
        The sanitised name; an empty string if nothing survives.
    """
    pieces = []
    for char in test_name:
        if char.isalnum() or char in ("_", "-"):
            pieces.append(char)
        elif char == "[":
            # The opening bracket of a parametrized id becomes a separator;
            # the closing one is dropped like any other character.
            pieces.append("_")
    return "".join(pieces)
def delete_addr(self, addr: str):
    """Remove an address from the interface and forget it in ``addr_map``.

    Args:
        addr: IPv4 or IPv6 address as text.  An optional ``/prefixlen``
            suffix is accepted and ignored.

    Raises:
        ValueError: ``addr`` cannot be parsed as an IP address.
    """
    # ip_interface() accepts both "addr" and "addr/len".  Its .ip attribute
    # is the bare host address, which is what ifconfig needs here and is
    # also the key setup_addr() uses when it records an address.
    # (ip_address() objects have no .ip attribute at all.)
    ip = ipaddress.ip_interface(addr).ip
    if ip.version == 6:
        family = "inet6"
        cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, ip)
    else:
        family = "inet"
        cmd = "/sbin/ifconfig {} -alias {}".format(self.name, ip)
    self.run_cmd(cmd)
    # The address may not have been added through setup_addr(); a missing
    # entry is not an error once the interface itself has been updated.
    self.addr_map[family].pop(str(ip), None)
def cleanup(self):
    """Destroy every interface listed in the registry file, then remove it.

    The registry is the file at ``self.file_name``, one interface name per
    line.  Cleanup is best-effort: a missing or unreadable registry is
    silently ignored.
    """
    try:
        with open(self.file_name, "r") as f:
            for line in f:
                iface_name = line.strip()
                if not iface_name:
                    # A blank line would turn into "ifconfig  destroy"
                    continue
                run_cmd("/sbin/ifconfig {} destroy".format(iface_name))
        # Remove the same file that was just read, not the class default.
        os.unlink(self.file_name)
    except OSError:
        pass
@staticmethod
def _wait_interfaces(
    vnet_name: str, ifaces: List[str], attempts: int = 50, delay: float = 0.1
) -> List[str]:
    """Poll a jail until all the given interfaces are visible inside it.

    Repeatedly runs ``jexec <vnet_name> /sbin/ifconfig -l`` and compares
    the reported names with ``ifaces``.

    Args:
        vnet_name: jail name passed to ``jexec``.
        ifaces: interface names expected inside the jail.
        attempts: maximum number of polls.
        delay: seconds to sleep between two polls.

    Returns:
        The names still missing after the last poll; an empty list when
        every interface showed up (or ``ifaces`` is empty).
    """
    if not ifaces:
        return []
    cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
    not_matched = list(ifaces)
    for attempt in range(attempts):
        vnet_ifaces = set(run_cmd(cmd).split())
        not_matched = [name for name in ifaces if name not in vnet_ifaces]
        if not not_matched:
            return []
        # No point in sleeping once the attempts are exhausted
        if attempt + 1 < attempts:
            time.sleep(delay)
    return not_matched
def cleanup(self):
    """Remove every jail listed in the registry file, then remove the file.

    The registry is the file at ``self.file_name``, one jail name per line.
    Before a jail is removed, the interfaces still present in it are
    printed for debugging.  A missing or unreadable registry is ignored.
    """
    try:
        with open(self.file_name) as f:
            for line in f:
                jail_name = line.strip()
                if not jail_name:
                    # A blank line would produce malformed jexec/jail commands
                    continue
                ToolsHelper.print_output(
                    "/usr/sbin/jexec {} ifconfig -l".format(jail_name)
                )
                run_cmd("/usr/sbin/jail -r {}".format(jail_name))
        # Remove the same file that was just read, not the class default.
        os.unlink(self.file_name)
    except OSError:
        pass
def setup_topology(self, topo: Dict, test_name: str):
    """Creates jails & interfaces for the provided topology.

    Keys of ``topo`` starting with ``if`` are created as ``epair``
    interfaces through ``IfaceFactory``; keys starting with ``vnet`` become
    jails through ``VnetFactory``.  The n-th vnet that lists an interface
    alias receives the n-th element of the interface list created for that
    alias.

    Args:
        topo: topology description, e.g.
            ``{"vnet1": {"ifaces": ["if1"]}, "if1": {...}}``.
        test_name: name handed to both factories.

    Returns:
        Dict with three keys: ``iface_map`` (alias ->
        ``{"ifaces": [...], "vnets": [...]}``), ``vnet_map`` (alias -> vnet
        object) and ``topo_map`` (``topo`` itself).
    """
    iface_map = {}
    vnet_map = {}
    iface_factory = IfaceFactory(test_name)
    vnet_factory = VnetFactory(test_name)
    # Pass 1: create all interfaces so vnets can reference them by alias
    for obj_name in topo:
        if obj_name.startswith("if"):
            ifaces = iface_factory.create_iface(obj_name, "epair")
            iface_map[obj_name] = {"ifaces": ifaces, "vnets": []}
    # Pass 2: create the vnets, handing each one the next free end
    for obj_name, obj_data in topo.items():
        if obj_name.startswith("vnet"):
            ifaces = []
            for iface_alias in obj_data["ifaces"]:
                idx = len(iface_map[iface_alias]["vnets"])
                iface_map[iface_alias]["vnets"].append(obj_name)
                ifaces.append(iface_map[iface_alias]["ifaces"][idx])
            vnet = vnet_factory.create_vnet(obj_name, ifaces)
            vnet_map[obj_name] = vnet
    # Debug output
    print("============= TEST TOPOLOGY =============")
    for vnet in vnet_map.values():
        print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="")
        handler = self._get_vnet_handler(vnet.alias)
        if handler:
            print(" handler: {}".format(handler.__name__), end="")
        print()
    for iface_alias, iface_data in iface_map.items():
        vnets: List[str] = iface_data["vnets"]
        ifaces: List[VnetInterface] = iface_data["ifaces"]
        if len(vnets) == 1 and len(ifaces) == 2:
            # Only one end was handed to a vnet; the other end was not
            print(
                "# iface {}: {}::{} -> main::{}".format(
                    iface_alias, vnets[0], ifaces[0].name, ifaces[1].name
                )
            )
        elif len(vnets) == 2 and len(ifaces) == 2:
            print(
                "# iface {}: {}::{} -> {}::{}".format(
                    iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name
                )
            )
        else:
            # Arguments follow the label order: interface names, then vnets
            print(
                "# iface {}: ifaces: {} vnets: {}".format(
                    iface_alias, [i.name for i in ifaces], vnets
                )
            )
    print()
    return {"iface_map": iface_map, "vnet_map": vnet_map, "topo_map": topo}
class SingleVnetTestTemplate(VnetTestTemplate):
    """Test template with one vnet (``vnet1``) and one interface (``if1``).

    Subclasses list the addresses to configure in ``IPV6_PREFIXES`` and
    ``IPV4_PREFIXES``.
    """

    IPV6_PREFIXES: List[str] = []
    IPV4_PREFIXES: List[str] = []

    def setup_method(self, method):
        """Build the one-vnet topology and defer to the base template."""
        # Every prefix is wrapped in a 1-tuple, as the topology stores one
        # tuple per prefix entry.
        v4_entries = [(prefix,) for prefix in self.IPV4_PREFIXES]
        v6_entries = [(prefix,) for prefix in self.IPV6_PREFIXES]
        self.TOPOLOGY = {
            "vnet1": {"ifaces": ["if1"]},
            "if1": {"prefixes4": v4_entries, "prefixes6": v6_entries},
        }
        super().setup_method(method)
class VerboseSocketServer:
    """UDP/IPv6 listener that reports where each received packet arrived.

    The socket enables ``IPV6_RECVPKTINFO`` so that ``recv()`` can return
    the destination address and receiving interface of a datagram.
    """

    def __init__(self, ip: str, port: int, ifname: str = None):
        """Create and bind the listening socket.

        Args:
            ip: IPv6 address to listen on.  With ``ifname`` set, a
                link-local address is bound with that interface's index as
                scope, and a multicast address is joined on that interface
                while the socket binds to ``::``.
            port: UDP port to bind.
            ifname: OS interface name, or ``None``.
        """
        self.ip = ip
        self.port = port

        s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_RECVPKTINFO, 1)
            addr = ipaddress.ip_address(ip)
            if addr.is_link_local and ifname:
                ifindex = socket.if_nametoindex(ifname)
                addr_tuple = (ip, port, 0, ifindex)
            elif addr.is_multicast and ifname:
                ifindex = socket.if_nametoindex(ifname)
                # Join request: 16-byte group address + interface index
                mreq = socket.inet_pton(socket.AF_INET6, ip) + struct.pack(
                    "I", ifindex
                )
                s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)
                print("## JOINED group {} % {}".format(ip, ifname))
                addr_tuple = ("::", port, 0, ifindex)
            else:
                addr_tuple = (ip, port, 0, 0)
            print("## Listening on [{}]:{}".format(addr_tuple[0], port))
            s.bind(addr_tuple)
        except Exception:
            # Do not leak the descriptor when setup fails half-way
            s.close()
            raise
        self.socket = s

    def recv(self):
        """Receive one datagram together with its destination metadata.

        Returns:
            Dict with keys ``data`` (payload bytes), ``src_ip`` (sender
            address), ``dst_ip`` (destination address from the pktinfo
            control message) and ``dst_iface`` (name of the receiving
            interface).

        Raises:
            RuntimeError: no ``IPV6_PKTINFO`` control message came with
                the datagram.
        """
        data, ancdata, msg_flags, address = self.socket.recvmsg(4096, 128)
        # Pick the pktinfo control message explicitly instead of assuming
        # it is the first (or only) ancillary item.
        for cmsg_level, cmsg_type, cmsg_data in ancdata:
            if cmsg_level == socket.IPPROTO_IPV6 and cmsg_type == socket.IPV6_PKTINFO:
                info = In6Pktinfo.from_buffer_copy(cmsg_data)
                break
        else:
            raise RuntimeError("no IPV6_PKTINFO ancillary data received")
        dst_ip = socket.inet_ntop(socket.AF_INET6, info.ipi6_addr)
        dst_iface = socket.if_indextoname(info.ipi6_ifindex)

        tx_obj = {
            "data": data,
            "src_ip": address[0],
            "dst_ip": dst_ip,
            "dst_iface": dst_iface,
        }
        return tx_obj
@pytest.mark.parametrize(
    "params",
    [
        # src: src-ip, if: src-interface, esrc: expected-src,
        # eif: expected-rx-interface, ex: expected errno
        pytest.param({"esrc": "2001:db8:b::1", "eif": "if2"}, id="empty"),
        pytest.param(
            {"src": "2001:db8:c::1", "esrc": "2001:db8:c::1", "eif": "if2"},
            id="iponly1",
        ),
        pytest.param(
            {
                "src": "2001:db8:c::1",
                "if": "if3",
                "ex": errno.EHOSTUNREACH,
            },
            id="ipandif",
        ),
        pytest.param(
            {
                "src": "2001:db8:c::aaaa",
                "ex": errno.EADDRNOTAVAIL,
            },
            id="nolocalip",
        ),
        pytest.param(
            {"if": "if2", "src": "2001:db8:b::1", "eif": "if2"}, id="ifsame"
        ),
    ],
)
@pytest.mark.require_user("root")
def test_output6_pktinfo(self, params):
    """Tests UDP output with the IPV6_PKTINFO socket option set.

    ``params`` selects the source address and/or interface placed into the
    pktinfo structure and states either the expected source address and
    receiving interface, or the errno the send is expected to fail with.
    """
    second_vnet = self.vnet_map["vnet2"]
    vnet = self.vnet

    # Pick target on if2 vnet2's end
    ifaddr = ipaddress.ip_interface(self.TOPOLOGY["if2"]["prefixes6"][0][1])
    dst_ip = str(ifaddr.ip)

    src_ip = params.get("src", "")
    src_ifname = params.get("if", "")
    expected_ip = params.get("esrc", "")
    expected_ifname = params.get("eif", "")
    # Named so that it does not shadow the imported errno module
    expected_errno = params.get("ex", 0)

    pktinfo = In6Pktinfo()
    if src_ip:
        for i, b in enumerate(socket.inet_pton(socket.AF_INET6, src_ip)):
            pktinfo.ipi6_addr[i] = b
    if src_ifname:
        os_ifname = vnet.iface_alias_map[src_ifname].name
        pktinfo.ipi6_ifindex = socket.if_nametoindex(os_ifname)

    # Wait for the child to become ready
    self.wait_object(second_vnet.pipe)
    data = bytes("AAAA", "utf-8")

    with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, 0) as s:
        try:
            s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_PKTINFO, bytes(pktinfo))
            s.sendto(data, (dst_ip, self.DEFAULT_PORT))
        except OSError as e:
            if not expected_errno:
                raise
            assert e.errno == expected_errno
            print("Correctly raised {}".format(e))
            return

    # A case that expects a failure must not pass when the send succeeds
    assert not expected_errno, "expected errno {} was not raised".format(
        expected_errno
    )

    # Wait for the received object
    rx_obj = self.wait_object(second_vnet.pipe)

    assert rx_obj["dst_ip"] == dst_ip
    if expected_ip:
        assert rx_obj["src_ip"] == expected_ip
    if expected_ifname:
        assert rx_obj["dst_iface_alias"] == expected_ifname
class TestIP6OutputMulticast(BaseTestIP6Ouput):
    """UDP output towards IPv6 multicast groups of several scopes."""

    def vnet2_handler(self, vnet, obj_map, pipe):
        """Listen on if2 for the group name received over the pipe."""
        mcast_group = self.wait_object(pipe)
        listen_ifname = vnet.iface_alias_map["if2"].name
        self._vnet2_handler(vnet, obj_map, pipe, mcast_group, listen_ifname)

    @pytest.mark.parametrize("group_scope", ["ff02", "ff05", "ff08", "ff0e"])
    @pytest.mark.require_user("root")
    def test_output6_multicast(self, group_scope):
        """Sends a datagram to a multicast group via the local if2."""
        peer = self.vnet_map["vnet2"]

        # Tell the listener which group to join
        target = "{}::3456".format(group_scope)
        peer.pipe.send(target)

        # Outgoing multicast interface: the local end of if2
        local_ifname = self.vnet.iface_alias_map["if2"].name
        packed_ifindex = struct.pack("I", socket.if_nametoindex(local_ifname))

        sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, packed_ifindex)

        payload = "AAAA".encode("utf-8")

        # Wait for the child to become ready
        self.wait_object(peer.pipe)

        print("## TX packet to {},{}".format(target, self.DEFAULT_PORT))
        sock.sendto(payload, (target, self.DEFAULT_PORT))

        # Wait for the received object
        received = self.wait_object(peer.pipe)
        assert received["dst_ip"] == target
        assert received["dst_iface_alias"] == "if2"