diff --git a/tests/atf_python/Makefile b/tests/atf_python/Makefile index 26d419743257..1a2fec387eda 100644 --- a/tests/atf_python/Makefile +++ b/tests/atf_python/Makefile @@ -1,12 +1,12 @@ .include .PATH: ${.CURDIR} -FILES= __init__.py atf_pytest.py +FILES= __init__.py atf_pytest.py utils.py SUBDIR= sys .include FILESDIR= ${TESTSBASE}/atf_python .include diff --git a/tests/atf_python/sys/net/tools.py b/tests/atf_python/sys/net/tools.py index 23bb5f4b4128..567d9d4b21ac 100644 --- a/tests/atf_python/sys/net/tools.py +++ b/tests/atf_python/sys/net/tools.py @@ -1,86 +1,79 @@ #!/usr/local/bin/python3 import json import os -import socket -import time -from ctypes import cdll -from ctypes import get_errno -from ctypes.util import find_library -from typing import List -from typing import Optional class ToolsHelper(object): NETSTAT_PATH = "/usr/bin/netstat" IFCONFIG_PATH = "/sbin/ifconfig" @classmethod def get_output(cls, cmd: str, verbose=False) -> str: if verbose: print("run: '{}'".format(cmd)) return os.popen(cmd).read() @classmethod def print_output(cls, cmd: str, verbose=True): if verbose: print("======= {} =====".format(cmd)) print(cls.get_output(cmd)) if verbose: print() @classmethod def print_net_debug(cls): cls.print_output("ifconfig") cls.print_output("netstat -rnW") @classmethod def set_sysctl(cls, oid, val): cls.get_output("sysctl {}={}".format(oid, val)) @classmethod def get_routes(cls, family: str, fibnum: int = 0): family_key = {"inet": "-4", "inet6": "-6"}.get(family) out = cls.get_output( "{} {} -rnW -F {} --libxo json".format(cls.NETSTAT_PATH, family_key, fibnum) ) js = json.loads(out) js = js["statistics"]["route-information"]["route-table"]["rt-family"] if js: return js[0]["rt-entry"] else: return [] @classmethod def get_nhops(cls, family: str, fibnum: int = 0): family_key = {"inet": "-4", "inet6": "-6"}.get(family) out = cls.get_output( "{} {} -onW -F {} --libxo json".format(cls.NETSTAT_PATH, family_key, fibnum) ) js = json.loads(out) js = js["statistics"]["route-nhop-information"]["nhop-table"]["rt-family"] if js: return js[0]["nh-entry"] else: return [] @classmethod def get_linklocals(cls): ret = {} ifname = None ips = [] for line in cls.get_output(cls.IFCONFIG_PATH).splitlines(): if line[0].isalnum(): if ifname: ret[ifname] = ips ips = [] ifname = line.split(":")[0] else: words = line.split() if words[0] == "inet6" and words[1].startswith("fe80"): # inet6 fe80::1%lo0 prefixlen 64 scopeid 0x2 ip = words[1].split("%")[0] scopeid = int(words[words.index("scopeid") + 1], 16) ips.append((ip, scopeid)) if ifname: ret[ifname] = ips return ret diff --git a/tests/atf_python/sys/net/vnet.py b/tests/atf_python/sys/net/vnet.py index 0d9f969b28d9..faae58e95b6f 100644 --- a/tests/atf_python/sys/net/vnet.py +++ b/tests/atf_python/sys/net/vnet.py @@ -1,464 +1,458 @@ #!/usr/local/bin/python3 import copy import ipaddress import os import socket import sys import time -from ctypes import cdll -from ctypes import get_errno -from ctypes.util import find_library from multiprocessing import Pipe from multiprocessing import Process from typing import Dict from typing import List from typing import NamedTuple -from typing import Optional from atf_python.sys.net.tools import ToolsHelper +from atf_python.utils import libc, BaseTest def run_cmd(cmd: str, verbose=True) -> str: print("run: '{}'".format(cmd)) return os.popen(cmd).read() def convert_test_name(test_name: str) -> str: """Convert test name to a string that can be used in the file/jail names""" ret = "" for char in test_name: if char.isalnum() or char in ("_", "-"): ret += char elif char in ("["): ret += "_" return ret class VnetInterface(object): # defines from net/if_types.h IFT_LOOP = 0x18 IFT_ETHER = 0x06 def __init__(self, iface_alias: str, iface_name: str): self.name = iface_name self.alias = iface_alias self.vnet_name = "" self.jailed = False self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}} self.prefixes4: List[List[str]] = [] self.prefixes6: List[List[str]] = [] if iface_name.startswith("lo"): self.iftype = self.IFT_LOOP else: self.iftype = self.IFT_ETHER @property def ifindex(self): return socket.if_nametoindex(self.name) @property def first_ipv6(self): d = self.addr_map["inet6"] return d[next(iter(d))] @property def first_ipv4(self): d = self.addr_map["inet"] return d[next(iter(d))] def set_vnet(self, vnet_name: str): self.vnet_name = vnet_name def set_jailed(self, jailed: bool): self.jailed = jailed def run_cmd( self, cmd, verbose=False, ): if self.vnet_name and not self.jailed: cmd = "jexec {} {}".format(self.vnet_name, cmd) return run_cmd(cmd, verbose) @classmethod def setup_loopback(cls, vnet_name: str): lo = VnetInterface("", "lo0") lo.set_vnet(vnet_name) lo.turn_up() @classmethod def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]: name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip() if not name: raise Exception("Unable to create iface {}".format(iface_name)) ret = [cls(alias_name, name)] if name.startswith("epair"): ret.append(cls(alias_name, name[:-1] + "b")) return ret def setup_addr(self, _addr: str): addr = ipaddress.ip_interface(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) else: family = "inet" if self.addr_map[family]: cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr) else: cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) self.run_cmd(cmd) self.addr_map[family][str(addr.ip)] = addr def delete_addr(self, _addr: str): addr = ipaddress.ip_address(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr) else: family = "inet" cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr) self.run_cmd(cmd) del self.addr_map[family][str(addr)] def turn_up(self): cmd = "/sbin/ifconfig {} up".format(self.name) self.run_cmd(cmd) def enable_ipv6(self): cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name) self.run_cmd(cmd) def has_tentative(self) -> bool: """True if an interface has some addresses in tenative state""" cmd = "/sbin/ifconfig {} inet6".format(self.name) out = self.run_cmd(cmd, verbose=False) for line in out.splitlines(): if "tentative" in line: return True return False class IfaceFactory(object): INTERFACES_FNAME = "created_ifaces.lst" def __init__(self, test_name: str): self.test_name = test_name - test_id = convert_test_name(test_name) + self.test_id = convert_test_name(test_name) self.file_name = self.INTERFACES_FNAME def _register_iface(self, iface_name: str): with open(self.file_name, "a") as f: f.write(iface_name + "\n") def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]: ifaces = VnetInterface.create_iface(alias_name, iface_name) for iface in ifaces: self._register_iface(iface.name) return ifaces def cleanup(self): try: with open(self.file_name, "r") as f: for line in f: run_cmd("/sbin/ifconfig {} destroy".format(line.strip())) os.unlink(self.INTERFACES_FNAME) except Exception: pass class VnetInstance(object): def __init__( self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface] ): self.name = vnet_name self.alias = vnet_alias # reference in the test topology self.jid = jid self.ifaces = ifaces self.iface_alias_map = {} # iface.alias: iface self.iface_map = {} # iface.name: iface for iface in ifaces: iface.set_vnet(vnet_name) iface.set_jailed(True) self.iface_alias_map[iface.alias] = iface self.iface_map[iface.name] = iface self.need_dad = False # Disable duplicate address detection by default self.attached = False self.pipe = None self.subprocess = None def run_vnet_cmd(self, cmd): if not self.attached: cmd = "jexec {} {}".format(self.name, cmd) return run_cmd(cmd) def disable_dad(self): self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0") def set_pipe(self, pipe): self.pipe = pipe def set_subprocess(self, p): self.subprocess = p @staticmethod def attach_jid(jid: int): - _path: Optional[str] = find_library("c") - if _path is None: - raise Exception("libc not found") - path: str = _path - libc = cdll.LoadLibrary(path) - if libc.jail_attach(jid) != 0: - raise Exception("jail_attach() failed: errno {}".format(get_errno())) + error_code = libc.jail_attach(jid) + if error_code != 0: + raise Exception("jail_attach() failed: errno {}".format(error_code)) def attach(self): self.attach_jid(self.jid) self.attached = True class VnetFactory(object): JAILS_FNAME = "created_jails.lst" def __init__(self, test_name: str): self.test_name = test_name self.test_id = convert_test_name(test_name) self.file_name = self.JAILS_FNAME self._vnets: List[str] = [] def _register_vnet(self, vnet_name: str): self._vnets.append(vnet_name) with open(self.file_name, "a") as f: f.write(vnet_name + "\n") @staticmethod def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]: cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name) not_matched: List[str] = [] for i in range(50): vnet_ifaces = run_cmd(cmd).strip().split(" ") not_matched = [] for iface_name in ifaces: if iface_name not in vnet_ifaces: not_matched.append(iface_name) if len(not_matched) == 0: return [] time.sleep(0.1) return not_matched def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]): vnet_name = "jail_{}".format(self.test_id) if self._vnets: # add number to distinguish jails vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1) iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces]) cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format( vnet_name, iface_cmds ) jid_str = run_cmd(cmd) jid = int(jid_str) if jid <= 0: raise Exception("Jail creation failed, output: {}".format(jid)) self._register_vnet(vnet_name) # Run expedited version of routing VnetInterface.setup_loopback(vnet_name) not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces]) if not_found: raise Exception( "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name) ) return VnetInstance(vnet_alias, vnet_name, jid, ifaces) def cleanup(self): try: with open(self.file_name) as f: for line in f: jail_name = line.strip() ToolsHelper.print_output( "/usr/sbin/jexec {} ifconfig -l".format(jail_name) ) run_cmd("/usr/sbin/jail -r {}".format(line.strip())) os.unlink(self.JAILS_FNAME) except OSError: pass class SingleInterfaceMap(NamedTuple): ifaces: List[VnetInterface] vnet_aliases: List[str] -class VnetTestTemplate(object): +class VnetTestTemplate(BaseTest): TOPOLOGY = {} def _get_vnet_handler(self, vnet_alias: str): handler_name = "{}_handler".format(vnet_alias) return getattr(self, handler_name, None) def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe): """Base Handler to setup given VNET. Can be run in a subprocess. If so, passes control to the special vnetX_handler() after setting up interface addresses """ vnet.attach() print("# setup_vnet({})".format(vnet.name)) topo = obj_map["topo_map"] ipv6_ifaces = [] # Disable DAD if not vnet.need_dad: vnet.disable_dad() for iface in vnet.ifaces: # check index of vnet within an interface # as we have prefixes for both ends of the interface iface_map = obj_map["iface_map"][iface.alias] idx = iface_map.vnet_aliases.index(vnet.alias) prefixes6 = topo[iface.alias].get("prefixes6", []) prefixes4 = topo[iface.alias].get("prefixes4", []) if prefixes6 or prefixes4: ipv6_ifaces.append(iface) iface.turn_up() if prefixes6: iface.enable_ipv6() for prefix in prefixes6 + prefixes4: iface.setup_addr(prefix[idx]) for iface in ipv6_ifaces: while iface.has_tentative(): time.sleep(0.1) # Run actual handler handler = self._get_vnet_handler(vnet.alias) if handler: # Do unbuffered stdout for children # so the logs are present if the child hangs sys.stdout.reconfigure(line_buffering=True) handler(vnet, obj_map, pipe) def setup_topology(self, topo: Dict, test_name: str): """Creates jails & interfaces for the provided topology""" iface_map: Dict[str, SingleInterfaceMap] = {} vnet_map = {} iface_factory = IfaceFactory(test_name) vnet_factory = VnetFactory(test_name) for obj_name, obj_data in topo.items(): if obj_name.startswith("if"): epair_ifaces = iface_factory.create_iface(obj_name, "epair") smap = SingleInterfaceMap(epair_ifaces, []) iface_map[obj_name] = smap for obj_name, obj_data in topo.items(): if obj_name.startswith("vnet"): vnet_ifaces = [] for iface_alias in obj_data["ifaces"]: # epair creates 2 interfaces, grab first _available_ # and map it to the VNET being created idx = len(iface_map[iface_alias].vnet_aliases) iface_map[iface_alias].vnet_aliases.append(obj_name) vnet_ifaces.append(iface_map[iface_alias].ifaces[idx]) vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces) vnet_map[obj_name] = vnet # Debug output print("============= TEST TOPOLOGY =============") for vnet_alias, vnet in vnet_map.items(): print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="") handler = self._get_vnet_handler(vnet.alias) if handler: print(" handler: {}".format(handler.__name__), end="") print() for iface_alias, iface_data in iface_map.items(): vnets = iface_data.vnet_aliases ifaces: List[VnetInterface] = iface_data.ifaces if len(vnets) == 1 and len(ifaces) == 2: print( "# iface {}: {}::{} -> main::{}".format( iface_alias, vnets[0], ifaces[0].name, ifaces[1].name ) ) elif len(vnets) == 2 and len(ifaces) == 2: print( "# iface {}: {}::{} -> {}::{}".format( iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name ) ) else: print( "# iface {}: ifaces: {} vnets: {}".format( iface_alias, vnets, [i.name for i in ifaces] ) ) print() return {"iface_map": iface_map, "vnet_map": vnet_map, "topo_map": topo} def setup_method(self, method): """Sets up all the required topology and handlers for the given test""" # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)' test_id = os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0] test_name = test_id.split("::")[-1] + self.check_constraints() topology = self.TOPOLOGY # First, setup kernel objects - interfaces & vnets obj_map = self.setup_topology(topology, test_name) main_vnet = None # one without subprocess handler for vnet_alias, vnet in obj_map["vnet_map"].items(): if self._get_vnet_handler(vnet_alias): # Need subprocess to run parent_pipe, child_pipe = Pipe() p = Process( target=self._setup_vnet, args=( vnet, obj_map, child_pipe, ), ) vnet.set_pipe(parent_pipe) vnet.set_subprocess(p) p.start() else: if main_vnet is not None: raise Exception("there can be only 1 VNET w/o handler") main_vnet = vnet # Main vnet needs to be the last, so all the other subprocesses # are started & their pipe handles collected self.vnet = main_vnet self._setup_vnet(main_vnet, obj_map, None) # Save state for the main handler self.iface_map = obj_map["iface_map"] self.vnet_map = obj_map["vnet_map"] def cleanup(self, test_id: str): # pytest test id: file::class::test_name test_name = test_id.split("::")[-1] print("==== vnet cleanup ===") print("# test_name: '{}'".format(test_name)) VnetFactory(test_name).cleanup() IfaceFactory(test_name).cleanup() def wait_object(self, pipe, timeout=5): if pipe.poll(timeout): return pipe.recv() raise TimeoutError @property def curvnet(self): pass class SingleVnetTestTemplate(VnetTestTemplate): IPV6_PREFIXES: List[str] = [] IPV4_PREFIXES: List[str] = [] def setup_method(self, method): topology = copy.deepcopy( { "vnet1": {"ifaces": ["if1"]}, "if1": {"prefixes4": [], "prefixes6": []}, } ) for prefix in self.IPV6_PREFIXES: topology["if1"]["prefixes6"].append((prefix,)) for prefix in self.IPV4_PREFIXES: topology["if1"]["prefixes4"].append((prefix,)) self.TOPOLOGY = topology super().setup_method(method) diff --git a/tests/atf_python/utils.py b/tests/atf_python/utils.py new file mode 100644 index 000000000000..12cd56c10149 --- /dev/null +++ b/tests/atf_python/utils.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python3 +import os +from ctypes import CDLL +from ctypes import get_errno +from ctypes.util import find_library +from typing import List +from typing import Optional + +import pytest + + +class LibCWrapper(object): + def __init__(self): + path: Optional[str] = find_library("c") + if path is None: + raise RuntimeError("libc not found") + self._libc = CDLL(path, use_errno=True) + + def modfind(self, mod_name: str) -> int: + if self._libc.modfind(bytes(mod_name, encoding="ascii")) == -1: + return get_errno() + return 0 + + def jail_attach(self, jid: int) -> int: + if self._libc.jail_attach(jid) != 0: + return get_errno() + return 0 + + +libc = LibCWrapper() + + +class BaseTest(object): + REQUIRED_MODULES: List[str] = [] + + def _check_modules(self): + for mod_name in self.REQUIRED_MODULES: + error_code = libc.modfind(mod_name) + if error_code != 0: + err_str = os.strerror(error_code) + pytest.skip( + "kernel module '{}' not available: {}".format(mod_name, err_str) + ) + + def check_constraints(self): + self._check_modules() diff --git a/tests/sys/Makefile b/tests/sys/Makefile index 2ba60f41b76c..14d96e053b64 100644 --- a/tests/sys/Makefile +++ b/tests/sys/Makefile @@ -1,53 +1,54 @@ # $FreeBSD$ .include TESTSDIR= ${TESTSBASE}/sys TESTS_SUBDIRS+= acl TESTS_SUBDIRS+= aio TESTS_SUBDIRS+= ${_audit} TESTS_SUBDIRS+= auditpipe TESTS_SUBDIRS+= capsicum TESTS_SUBDIRS+= ${_cddl} TESTS_SUBDIRS+= devrandom TESTS_SUBDIRS+= fifo TESTS_SUBDIRS+= file TESTS_SUBDIRS+= fs TESTS_SUBDIRS+= geom TESTS_SUBDIRS+= kern TESTS_SUBDIRS+= kqueue TESTS_SUBDIRS+= mac TESTS_SUBDIRS+= mqueue TESTS_SUBDIRS+= net TESTS_SUBDIRS+= ${_netgraph} TESTS_SUBDIRS+= netinet TESTS_SUBDIRS+= netinet6 TESTS_SUBDIRS+= netipsec +TESTS_SUBDIRS+= netlink TESTS_SUBDIRS+= netmap TESTS_SUBDIRS+= netpfil TESTS_SUBDIRS+= opencrypto TESTS_SUBDIRS+= posixshm TESTS_SUBDIRS+= sys TESTS_SUBDIRS+= vfs TESTS_SUBDIRS+= vm TESTS_SUBDIRS+= vmm .if ${MK_AUDIT} != "no" _audit= audit .endif .if ${MK_CDDL} != "no" _cddl= cddl .endif .if ${MK_NETGRAPH} != "no" _netgraph= netgraph .endif # Items not integrated into kyua runs by default SUBDIR+= pjdfstest SUBDIR+= common .include diff --git a/tests/sys/netlink/test_rtnl_iface.py b/tests/sys/netlink/test_rtnl_iface.py index 38a3075f09c9..3340eaa4d16d 100644 --- a/tests/sys/netlink/test_rtnl_iface.py +++ b/tests/sys/netlink/test_rtnl_iface.py @@ -1,281 +1,283 @@ import errno import socket import pytest from atf_python.sys.net.netlink import IflattrType from atf_python.sys.net.netlink import IflinkInfo from atf_python.sys.net.netlink import IfLinkInfoDataVlan from atf_python.sys.net.netlink import NetlinkIflaMessage from atf_python.sys.net.netlink import NlAttrNested from atf_python.sys.net.netlink import NlAttrStr from atf_python.sys.net.netlink import NlAttrStrn from atf_python.sys.net.netlink import NlAttrU16 from atf_python.sys.net.netlink import NlAttrU32 from atf_python.sys.net.netlink import NlConst from atf_python.sys.net.netlink import NlHelper from atf_python.sys.net.netlink import NlmBaseFlags from atf_python.sys.net.netlink import NlmNewFlags from atf_python.sys.net.netlink import NlMsgType from atf_python.sys.net.netlink import NlRtMsgType from atf_python.sys.net.netlink import Nlsock from atf_python.sys.net.vnet import SingleVnetTestTemplate class TestRtNlIface(SingleVnetTestTemplate): + REQUIRED_MODULES = ["netlink"] + def setup_method(self, method): super().setup_method(method) self.helper = NlHelper() self.nlsock = Nlsock(NlConst.NETLINK_ROUTE, self.helper) def write_message(self, msg): print("") print("============= >> TX MESSAGE =============") msg.print_message() self.nlsock.write_data(bytes(msg)) msg.print_as_bytes(bytes(msg), "-- DATA --") def read_message(self): msg = self.nlsock.read_message() print("") print("============= << RX MESSAGE =============") msg.print_message() return msg def get_reply(self, tx_msg): self.write_message(tx_msg) while True: rx_msg = self.read_message() if tx_msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq: return rx_msg def get_interface_byname(self, ifname): msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) msg.nl_hdr.nlmsg_flags = ( NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, ifname)) self.write_message(msg) while True: rx_msg = self.read_message() if msg.nl_hdr.nlmsg_seq == rx_msg.nl_hdr.nlmsg_seq: if rx_msg.is_type(NlMsgType.NLMSG_ERROR): if rx_msg.error_code != 0: raise ValueError("unable to get interface {}".format(ifname)) elif rx_msg.is_type(NlRtMsgType.RTM_NEWLINK): return rx_msg else: raise ValueError("bad message") def test_get_iface_byname_error(self): """Tests error on fetching non-existing interface name""" msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) msg.nl_hdr.nlmsg_flags = ( NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == errno.ENODEV def test_get_iface_byindex_error(self): """Tests error on fetching non-existing interface index""" msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) msg.nl_hdr.nlmsg_flags = ( NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.base_hdr.ifi_index = 2147483647 rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == errno.ENODEV @pytest.mark.require_user("root") def test_create_iface_plain(self): """Tests loopback creation w/o any parameters""" flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) msg.nl_hdr.nlmsg_flags = ( flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) msg.add_nla( NlAttrNested( IflattrType.IFLA_LINKINFO, [ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), ], ) ) rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == 0 self.get_interface_byname("lo10") @pytest.mark.require_user("root") def test_create_iface_attrs(self): """Tests interface creation with additional properties""" flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) msg.nl_hdr.nlmsg_flags = ( flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) msg.add_nla( NlAttrNested( IflattrType.IFLA_LINKINFO, [ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), ], ) ) # Custom attributes msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description")) msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024)) rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == 0 iface_msg = self.get_interface_byname("lo10") assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description" assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024 @pytest.mark.require_user("root") def test_modify_iface_attrs(self): """Tests interface modifications""" flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) msg.nl_hdr.nlmsg_flags = ( flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) msg.add_nla( NlAttrNested( IflattrType.IFLA_LINKINFO, [ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), ], ) ) rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == 0 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) msg.nl_hdr.nlmsg_flags = ( NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) # Custom attributes msg.add_nla(NlAttrStr(IflattrType.IFLA_IFALIAS, "test description")) msg.add_nla(NlAttrU32(IflattrType.IFLA_MTU, 1024)) rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == 0 iface_msg = self.get_interface_byname("lo10") assert iface_msg.get_nla(IflattrType.IFLA_IFALIAS).text == "test description" assert iface_msg.get_nla(IflattrType.IFLA_MTU).u32 == 1024 @pytest.mark.require_user("root") def test_delete_iface(self): """Tests interface modifications""" flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) msg.nl_hdr.nlmsg_flags = ( flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) msg.add_nla( NlAttrNested( IflattrType.IFLA_LINKINFO, [ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "lo"), ], ) ) rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == 0 iface_msg = self.get_interface_byname("lo10") iface_idx = iface_msg.base_hdr.ifi_index msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_DELLINK.value) msg.nl_hdr.nlmsg_flags = ( NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.base_hdr.ifi_index = iface_idx # msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "lo10")) rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == 0 msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_GETLINK.value) msg.nl_hdr.nlmsg_flags = ( NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.base_hdr.ifi_index = 2147483647 rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == errno.ENODEV # # * # * {len=76, type=RTM_NEWLINK, flags=NLM_F_REQUEST|NLM_F_ACK|NLM_F_EXCL|NLM_F_CREATE, seq=1662892737, pid=0}, # * {ifi_family=AF_UNSPEC, ifi_type=ARPHRD_NETROM, ifi_index=0, ifi_flags=0, ifi_change=0}, # * [ # * {{nla_len=8, nla_type=IFLA_LINK}, 2}, # * {{nla_len=12, nla_type=IFLA_IFNAME}, "xvlan22"}, # * {{nla_len=24, nla_type=IFLA_LINKINFO}, # * [ # * {{nla_len=8, nla_type=IFLA_INFO_KIND}, "vlan"...}, # * {{nla_len=12, nla_type=IFLA_INFO_DATA}, "\x06\x00\x01\x00\x16\x00\x00\x00"}]}]}, iov_len=76}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, 0) = 76 # */ @pytest.mark.skip(reason="vlan support needs more work") @pytest.mark.require_user("root") def test_create_vlan_plain(self): """Creates 802.1Q VLAN interface in vlanXX and ifX fashion""" os_ifname = self.vnet.iface_alias_map["if1"].name ifindex = socket.if_nametoindex(os_ifname) flags = NlmNewFlags.NLM_F_EXCL.value | NlmNewFlags.NLM_F_CREATE.value msg = NetlinkIflaMessage(self.helper, NlRtMsgType.RTM_NEWLINK.value) msg.nl_hdr.nlmsg_flags = ( flags | NlmBaseFlags.NLM_F_ACK.value | NlmBaseFlags.NLM_F_REQUEST.value ) msg.add_nla(NlAttrU32(IflattrType.IFLA_LINK, ifindex)) msg.add_nla(NlAttrStr(IflattrType.IFLA_IFNAME, "vlan22")) msg.add_nla( NlAttrNested( IflattrType.IFLA_LINKINFO, [ NlAttrStrn(IflinkInfo.IFLA_INFO_KIND, "vlan"), NlAttrNested( IflinkInfo.IFLA_INFO_DATA, [ NlAttrU16(IfLinkInfoDataVlan.IFLA_VLAN_ID, 22), ], ), ], ) ) rx_msg = self.get_reply(msg) assert rx_msg.is_type(NlMsgType.NLMSG_ERROR) assert rx_msg.error_code == 0 self.get_interface_byname("vlan22") # ToolsHelper.print_net_debug()