diff --git a/tests/atf_python/atf_pytest.py b/tests/atf_python/atf_pytest.py index d530c7b4515c..db7244d3234b 100644 --- a/tests/atf_python/atf_pytest.py +++ b/tests/atf_python/atf_pytest.py @@ -1,239 +1,283 @@ import types from typing import Any from typing import Dict from typing import List from typing import NamedTuple +from typing import Optional from typing import Tuple import pytest import os def nodeid_to_method_name(nodeid: str) -> str: """file_name.py::ClassName::method_name[parametrize] -> method_name""" return nodeid.split("::")[-1].split("[")[0] class ATFCleanupItem(pytest.Item): def runtest(self): """Runs cleanup procedure for the test instead of the test itself""" instance = self.parent.cls() cleanup_name = "cleanup_{}".format(nodeid_to_method_name(self.nodeid)) if hasattr(instance, cleanup_name): cleanup = getattr(instance, cleanup_name) cleanup(self.nodeid) elif hasattr(instance, "cleanup"): instance.cleanup(self.nodeid) def setup_method_noop(self, method): """Overrides runtest setup method""" pass def teardown_method_noop(self, method): """Overrides runtest teardown method""" pass class ATFTestObj(object): def __init__(self, obj, has_cleanup): # Use nodeid without name to properly name class-derived tests self.ident = obj.nodeid.split("::", 1)[1] self.description = self._get_test_description(obj) self.has_cleanup = has_cleanup self.obj = obj def _get_test_description(self, obj): """Returns first non-empty line from func docstring or func name""" docstr = obj.function.__doc__ if docstr: for line in docstr.split("\n"): if line: return line return obj.name + @staticmethod + def _convert_user_mark(mark, obj, ret: Dict): + username = mark.args[0] + if username == "unprivileged": + # Special unprivileged user requested. + # First, require the unprivileged-user config option presence + key = "require.config" + if key not in ret: + ret[key] = "unprivileged_user" + else: + ret[key] = "{} {}".format(ret[key], "unprivileged_user") + # Check if the framework requires root + test_cls = ATFHandler.get_test_class(obj) + if test_cls and getattr(test_cls, "NEED_ROOT", False): + # Yes, so we ask kyua to run us under root instead + # It is up to the implementation to switch back to the desired + # user + ret["require.user"] = "root" + else: + ret["require.user"] = username + + def _convert_marks(self, obj) -> Dict[str, Any]: wj_func = lambda x: " ".join(x) # noqa: E731 _map: Dict[str, Dict] = { - "require_user": {"name": "require.user"}, + "require_user": {"handler": self._convert_user_mark}, "require_arch": {"name": "require.arch", "fmt": wj_func}, "require_diskspace": {"name": "require.diskspace"}, "require_files": {"name": "require.files", "fmt": wj_func}, "require_machine": {"name": "require.machine", "fmt": wj_func}, "require_memory": {"name": "require.memory"}, "require_progs": {"name": "require.progs", "fmt": wj_func}, "timeout": {}, } ret = {} for mark in obj.iter_markers(): if mark.name in _map: + if "handler" in _map[mark.name]: + _map[mark.name]["handler"](mark, obj, ret) + continue name = _map[mark.name].get("name", mark.name) if "fmt" in _map[mark.name]: val = _map[mark.name]["fmt"](mark.args[0]) else: val = mark.args[0] ret[name] = val return ret def as_lines(self) -> List[str]: """Output test definition in ATF-specific format""" ret = [] ret.append("ident: {}".format(self.ident)) ret.append("descr: {}".format(self._get_test_description(self.obj))) if self.has_cleanup: ret.append("has.cleanup: true") for key, value in self._convert_marks(self.obj).items(): ret.append("{}: {}".format(key, value)) return ret class ATFHandler(object): class ReportState(NamedTuple): state: str reason: str - def __init__(self): + def __init__(self, report_file_name: Optional[str]): self._tests_state_map: Dict[str, ReportStatus] = {} + self._report_file_name = report_file_name + self._report_file_handle = None + + def setup_configure(self): + fname = self._report_file_name + if fname: + self._report_file_handle = open(fname, mode="w") + + def setup_method_pre(self, item): + """Called before actually running the test setup_method""" + # Check if we need to manually drop the privileges + for mark in item.iter_markers(): + if mark.name == "require_user": + cls = self.get_test_class(item) + cls.TARGET_USER = mark.args[0] + break def override_runtest(self, obj): # Override basic runtest command obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj) # Override class setup/teardown obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop @staticmethod def get_test_class(obj): if hasattr(obj, "parent") and obj.parent is not None: if hasattr(obj.parent, "cls"): return obj.parent.cls def has_object_cleanup(self, obj): cls = self.get_test_class(obj) if cls is not None: method_name = nodeid_to_method_name(obj.nodeid) cleanup_name = "cleanup_{}".format(method_name) if hasattr(cls, "cleanup") or hasattr(cls, cleanup_name): return True return False def list_tests(self, tests: List[str]): print('Content-Type: application/X-atf-tp; version="1"') print() for test_obj in tests: has_cleanup = self.has_object_cleanup(test_obj) atf_test = ATFTestObj(test_obj, has_cleanup) for line in atf_test.as_lines(): print(line) print() def set_report_state(self, test_name: str, state: str, reason: str): self._tests_state_map[test_name] = self.ReportState(state, reason) def _extract_report_reason(self, report): data = report.longrepr if data is None: return None if isinstance(data, Tuple): # ('/path/to/test.py', 23, 'Skipped: unable to test') reason = data[2] for prefix in "Skipped: ": if reason.startswith(prefix): reason = reason[len(prefix):] return reason else: # string/ traceback / exception report. Capture the last line return str(data).split("\n")[-1] return None def add_report(self, report): # MAP pytest report state to the atf-desired state # # ATF test states: # (1) expected_death, (2) expected_exit, (3) expected_failure # (4) expected_signal, (5) expected_timeout, (6) passed # (7) skipped, (8) failed # # Note that ATF don't have the concept of "soft xfail" - xpass # is a failure. It also calls teardown routine in a separate # process, thus teardown states (pytest-only) are handled as # body continuation. # (stage, state, wasxfail) # Just a passing test: WANT: passed # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F) # # Failing body test: WHAT: failed # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F) # # pytest.skip test decorator: WANT: skipped # GOT: (setup,skipped, False), (teardown, passed, False) # # pytest.skip call inside test function: WANT: skipped # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F) # # mark.xfail decorator+pytest.xfail: WANT: expected_failure # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F) # # mark.xfail decorator+pass: WANT: failed # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F) test_name = report.location[2] stage = report.when state = report.outcome reason = self._extract_report_reason(report) # We don't care about strict xfail - it gets translated to False if stage == "setup": if state in ("skipped", "failed"): # failed init -> failed test, skipped setup -> xskip # for the whole test self.set_report_state(test_name, state, reason) elif stage == "call": # "call" stage shouldn't matter if setup failed if test_name in self._tests_state_map: if self._tests_state_map[test_name].state == "failed": return if state == "failed": # Record failure & override "skipped" state self.set_report_state(test_name, state, reason) elif state == "skipped": if hasattr(reason, "wasxfail"): # xfail() called in the test body state = "expected_failure" else: # skip inside the body pass self.set_report_state(test_name, state, reason) elif state == "passed": if hasattr(reason, "wasxfail"): # the test was expected to fail but didn't # mark as hard failure state = "failed" self.set_report_state(test_name, state, reason) elif stage == "teardown": if state == "failed": # teardown should be empty, as the cleanup # procedures should be implemented as a separate # function/method, so mark teardown failure as # global failure self.set_report_state(test_name, state, reason) - def write_report(self, path): + def write_report(self): + if self._report_file_handle is None: + return if self._tests_state_map: # If we're executing in ATF mode, there has to be just one test # Anyway, deterministically pick the first one first_test_name = next(iter(self._tests_state_map)) test = self._tests_state_map[first_test_name] if test.state == "passed": line = test.state else: line = "{}: {}".format(test.state, test.reason) - with open(path, mode="w") as f: - print(line, file=f) + print(line, file=self._report_file_handle) + self._report_file_handle.close() @staticmethod def get_atf_vars() -> Dict[str, str]: px = "_ATF_VAR_" return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)} diff --git a/tests/atf_python/sys/net/vnet.py b/tests/atf_python/sys/net/vnet.py index 1f61269ffe6c..c0e0a24f6687 100644 --- a/tests/atf_python/sys/net/vnet.py +++ b/tests/atf_python/sys/net/vnet.py @@ -1,507 +1,510 @@ #!/usr/local/bin/python3 import copy import ipaddress import re import os import socket import sys import time from multiprocessing import Pipe from multiprocessing import Process from typing import Dict from typing import List from typing import NamedTuple from atf_python.sys.net.tools import ToolsHelper from atf_python.utils import BaseTest from atf_python.utils import libc def run_cmd(cmd: str, verbose=True) -> str: print("run: '{}'".format(cmd)) return os.popen(cmd).read() def get_topology_id(test_id: str) -> str: """ Gets a unique topology id based on the pytest test_id. "test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif]" -> "TestIP6Output:test_output6_pktinfo[ipandif]" """ return ":".join(test_id.split("::")[-2:]) def convert_test_name(test_name: str) -> str: """Convert test name to a string that can be used in the file/jail names""" ret = "" for char in test_name: if char.isalnum() or char in ("_", "-", ":"): ret += char elif char in ("["): ret += "_" return ret class VnetInterface(object): # defines from net/if_types.h IFT_LOOP = 0x18 IFT_ETHER = 0x06 def __init__(self, iface_alias: str, iface_name: str): self.name = iface_name self.alias = iface_alias self.vnet_name = "" self.jailed = False self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}} self.prefixes4: List[List[str]] = [] self.prefixes6: List[List[str]] = [] if iface_name.startswith("lo"): self.iftype = self.IFT_LOOP else: self.iftype = self.IFT_ETHER @property def ifindex(self): return socket.if_nametoindex(self.name) @property def first_ipv6(self): d = self.addr_map["inet6"] return d[next(iter(d))] @property def first_ipv4(self): d = self.addr_map["inet"] return d[next(iter(d))] def set_vnet(self, vnet_name: str): self.vnet_name = vnet_name def set_jailed(self, jailed: bool): self.jailed = jailed def run_cmd( self, cmd, verbose=False, ): if self.vnet_name and not self.jailed: cmd = "jexec {} {}".format(self.vnet_name, cmd) return run_cmd(cmd, verbose) @classmethod def setup_loopback(cls, vnet_name: str): lo = VnetInterface("", "lo0") lo.set_vnet(vnet_name) lo.setup_addr("127.0.0.1/8") lo.turn_up() @classmethod def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]: name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip() if not name: raise Exception("Unable to create iface {}".format(iface_name)) ret = [cls(alias_name, name)] if name.startswith("epair"): ret.append(cls(alias_name, name[:-1] + "b")) return ret def setup_addr(self, _addr: str): addr = ipaddress.ip_interface(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) else: family = "inet" if self.addr_map[family]: cmd = "/sbin/ifconfig {} alias {}".format(self.name, addr) else: cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) self.run_cmd(cmd) self.addr_map[family][str(addr.ip)] = addr def delete_addr(self, _addr: str): addr = ipaddress.ip_address(_addr) if addr.version == 6: family = "inet6" cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr) else: family = "inet" cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr) self.run_cmd(cmd) del self.addr_map[family][str(addr)] def turn_up(self): cmd = "/sbin/ifconfig {} up".format(self.name) self.run_cmd(cmd) def enable_ipv6(self): cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name) self.run_cmd(cmd) def has_tentative(self) -> bool: """True if an interface has some addresses in tenative state""" cmd = "/sbin/ifconfig {} inet6".format(self.name) out = self.run_cmd(cmd, verbose=False) for line in out.splitlines(): if "tentative" in line: return True return False class IfaceFactory(object): INTERFACES_FNAME = "created_ifaces.lst" AUTODELETE_TYPES = ("epair", "lo", "tap", "tun") def __init__(self): self.file_name = self.INTERFACES_FNAME def _register_iface(self, iface_name: str): with open(self.file_name, "a") as f: f.write(iface_name + "\n") def _list_ifaces(self) -> List[str]: ret: List[str] = [] try: with open(self.file_name, "r") as f: for line in f: ret.append(line.strip()) except OSError: pass return ret def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]: ifaces = VnetInterface.create_iface(alias_name, iface_name) for iface in ifaces: if not self.is_autodeleted(iface.name): self._register_iface(iface.name) return ifaces @staticmethod def is_autodeleted(iface_name: str) -> bool: iface_type = re.split(r"\d+", iface_name)[0] return iface_type in IfaceFactory.AUTODELETE_TYPES def cleanup_vnet_interfaces(self, vnet_name: str) -> List[str]: """Destroys""" ifaces_lst = ToolsHelper.get_output( "/usr/sbin/jexec {} ifconfig -l".format(vnet_name) ) for iface_name in ifaces_lst.split(): if not self.is_autodeleted(iface_name): if iface_name not in self._list_ifaces(): print("Skipping interface {}:{}".format(vnet_name, iface_name)) continue run_cmd( "/usr/sbin/jexec {} ifconfig {} destroy".format(vnet_name, iface_name) ) def cleanup(self): try: os.unlink(self.INTERFACES_FNAME) except OSError: pass class VnetInstance(object): def __init__( self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface] ): self.name = vnet_name self.alias = vnet_alias # reference in the test topology self.jid = jid self.ifaces = ifaces self.iface_alias_map = {} # iface.alias: iface self.iface_map = {} # iface.name: iface for iface in ifaces: iface.set_vnet(vnet_name) iface.set_jailed(True) self.iface_alias_map[iface.alias] = iface self.iface_map[iface.name] = iface self.need_dad = False # Disable duplicate address detection by default self.attached = False self.pipe = None self.subprocess = None def run_vnet_cmd(self, cmd): if not self.attached: cmd = "jexec {} {}".format(self.name, cmd) return run_cmd(cmd) def disable_dad(self): self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0") def set_pipe(self, pipe): self.pipe = pipe def set_subprocess(self, p): self.subprocess = p @staticmethod def attach_jid(jid: int): error_code = libc.jail_attach(jid) if error_code != 0: raise Exception("jail_attach() failed: errno {}".format(error_code)) def attach(self): self.attach_jid(self.jid) self.attached = True class VnetFactory(object): JAILS_FNAME = "created_jails.lst" def __init__(self, topology_id: str): self.topology_id = topology_id self.file_name = self.JAILS_FNAME self._vnets: List[str] = [] def _register_vnet(self, vnet_name: str): self._vnets.append(vnet_name) with open(self.file_name, "a") as f: f.write(vnet_name + "\n") @staticmethod def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]: cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name) not_matched: List[str] = [] for i in range(50): vnet_ifaces = run_cmd(cmd).strip().split(" ") not_matched = [] for iface_name in ifaces: if iface_name not in vnet_ifaces: not_matched.append(iface_name) if len(not_matched) == 0: return [] time.sleep(0.1) return not_matched def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]): vnet_name = "pytest:{}".format(convert_test_name(self.topology_id)) if self._vnets: # add number to distinguish jails vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1) iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces]) cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format( vnet_name, iface_cmds ) jid = 0 try: jid_str = run_cmd(cmd) jid = int(jid_str) except ValueError: print("Jail creation failed, output: {}".format(jid_str)) raise self._register_vnet(vnet_name) # Run expedited version of routing VnetInterface.setup_loopback(vnet_name) not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces]) if not_found: raise Exception( "Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name) ) return VnetInstance(vnet_alias, vnet_name, jid, ifaces) def cleanup(self): iface_factory = IfaceFactory() try: with open(self.file_name) as f: for line in f: vnet_name = line.strip() iface_factory.cleanup_vnet_interfaces(vnet_name) run_cmd("/usr/sbin/jail -r {}".format(vnet_name)) os.unlink(self.JAILS_FNAME) except OSError: pass class SingleInterfaceMap(NamedTuple): ifaces: List[VnetInterface] vnet_aliases: List[str] class ObjectsMap(NamedTuple): iface_map: Dict[str, SingleInterfaceMap] # keyed by ifX vnet_map: Dict[str, VnetInstance] # keyed by vnetX topo_map: Dict # self.TOPOLOGY class VnetTestTemplate(BaseTest): + NEED_ROOT: bool = True TOPOLOGY = {} def _get_vnet_handler(self, vnet_alias: str): handler_name = "{}_handler".format(vnet_alias) return getattr(self, handler_name, None) def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe): """Base Handler to setup given VNET. Can be run in a subprocess. If so, passes control to the special vnetX_handler() after setting up interface addresses """ vnet.attach() print("# setup_vnet({})".format(vnet.name)) if pipe is not None: vnet.set_pipe(pipe) topo = obj_map.topo_map ipv6_ifaces = [] # Disable DAD if not vnet.need_dad: vnet.disable_dad() for iface in vnet.ifaces: # check index of vnet within an interface # as we have prefixes for both ends of the interface iface_map = obj_map.iface_map[iface.alias] idx = iface_map.vnet_aliases.index(vnet.alias) prefixes6 = topo[iface.alias].get("prefixes6", []) prefixes4 = topo[iface.alias].get("prefixes4", []) if prefixes6 or prefixes4: ipv6_ifaces.append(iface) iface.turn_up() if prefixes6: iface.enable_ipv6() for prefix in prefixes6 + prefixes4: iface.setup_addr(prefix[idx]) for iface in ipv6_ifaces: while iface.has_tentative(): time.sleep(0.1) # Run actual handler handler = self._get_vnet_handler(vnet.alias) if handler: # Do unbuffered stdout for children # so the logs are present if the child hangs sys.stdout.reconfigure(line_buffering=True) + self.drop_privileges() handler(vnet) def setup_topology(self, topo: Dict, topology_id: str): """Creates jails & interfaces for the provided topology""" iface_map: Dict[str, SingleInterfaceMap] = {} vnet_map = {} iface_factory = IfaceFactory() vnet_factory = VnetFactory(topology_id) for obj_name, obj_data in topo.items(): if obj_name.startswith("if"): epair_ifaces = iface_factory.create_iface(obj_name, "epair") smap = SingleInterfaceMap(epair_ifaces, []) iface_map[obj_name] = smap for obj_name, obj_data in topo.items(): if obj_name.startswith("vnet"): vnet_ifaces = [] for iface_alias in obj_data["ifaces"]: # epair creates 2 interfaces, grab first _available_ # and map it to the VNET being created idx = len(iface_map[iface_alias].vnet_aliases) iface_map[iface_alias].vnet_aliases.append(obj_name) vnet_ifaces.append(iface_map[iface_alias].ifaces[idx]) vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces) vnet_map[obj_name] = vnet # Debug output print("============= TEST TOPOLOGY =============") for vnet_alias, vnet in vnet_map.items(): print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="") handler = self._get_vnet_handler(vnet.alias) if handler: print(" handler: {}".format(handler.__name__), end="") print() for iface_alias, iface_data in iface_map.items(): vnets = iface_data.vnet_aliases ifaces: List[VnetInterface] = iface_data.ifaces if len(vnets) == 1 and len(ifaces) == 2: print( "# iface {}: {}::{} -> main::{}".format( iface_alias, vnets[0], ifaces[0].name, ifaces[1].name ) ) elif len(vnets) == 2 and len(ifaces) == 2: print( "# iface {}: {}::{} -> {}::{}".format( iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name ) ) else: print( "# iface {}: ifaces: {} vnets: {}".format( iface_alias, vnets, [i.name for i in ifaces] ) ) print() return ObjectsMap(iface_map, vnet_map, topo) def setup_method(self, _method): """Sets up all the required topology and handlers for the given test""" super().setup_method(_method) # TestIP6Output.test_output6_pktinfo[ipandif] topology_id = get_topology_id(self.test_id) topology = self.TOPOLOGY # First, setup kernel objects - interfaces & vnets obj_map = self.setup_topology(topology, topology_id) main_vnet = None # one without subprocess handler for vnet_alias, vnet in obj_map.vnet_map.items(): if self._get_vnet_handler(vnet_alias): # Need subprocess to run parent_pipe, child_pipe = Pipe() p = Process( target=self._setup_vnet, args=( vnet, obj_map, child_pipe, ), ) vnet.set_pipe(parent_pipe) vnet.set_subprocess(p) p.start() else: if main_vnet is not None: raise Exception("there can be only 1 VNET w/o handler") main_vnet = vnet # Main vnet needs to be the last, so all the other subprocesses # are started & their pipe handles collected self.vnet = main_vnet self._setup_vnet(main_vnet, obj_map, None) # Save state for the main handler self.iface_map = obj_map.iface_map self.vnet_map = obj_map.vnet_map + self.drop_privileges() def cleanup(self, test_id: str): # pytest test id: file::class::test_name topology_id = get_topology_id(self.test_id) print("==== vnet cleanup ===") print("# topology_id: '{}'".format(topology_id)) VnetFactory(topology_id).cleanup() IfaceFactory().cleanup() def wait_object(self, pipe, timeout=5): if pipe.poll(timeout): return pipe.recv() raise TimeoutError def send_object(self, pipe, obj): pipe.send(obj) @property def curvnet(self): pass class SingleVnetTestTemplate(VnetTestTemplate): IPV6_PREFIXES: List[str] = [] IPV4_PREFIXES: List[str] = [] def setup_method(self, method): topology = copy.deepcopy( { "vnet1": {"ifaces": ["if1"]}, "if1": {"prefixes4": [], "prefixes6": []}, } ) for prefix in self.IPV6_PREFIXES: topology["if1"]["prefixes6"].append((prefix,)) for prefix in self.IPV4_PREFIXES: topology["if1"]["prefixes4"].append((prefix,)) self.TOPOLOGY = topology super().setup_method(method) diff --git a/tests/atf_python/utils.py b/tests/atf_python/utils.py index fddfadac9a56..c8146b943ce9 100644 --- a/tests/atf_python/utils.py +++ b/tests/atf_python/utils.py @@ -1,52 +1,73 @@ #!/usr/bin/env python3 import os +import pwd from ctypes import CDLL from ctypes import get_errno from ctypes.util import find_library +from typing import Dict from typing import List from typing import Optional import pytest class LibCWrapper(object): def __init__(self): path: Optional[str] = find_library("c") if path is None: raise RuntimeError("libc not found") self._libc = CDLL(path, use_errno=True) def modfind(self, mod_name: str) -> int: if self._libc.modfind(bytes(mod_name, encoding="ascii")) == -1: return get_errno() return 0 def jail_attach(self, jid: int) -> int: if self._libc.jail_attach(jid) != 0: return get_errno() return 0 libc = LibCWrapper() class BaseTest(object): + NEED_ROOT: bool = False # True if the class needs root privileges for the setup + TARGET_USER = None # Set to the target user by the framework REQUIRED_MODULES: List[str] = [] def _check_modules(self): for mod_name in self.REQUIRED_MODULES: error_code = libc.modfind(mod_name) if error_code != 0: err_str = os.strerror(error_code) pytest.skip( "kernel module '{}' not available: {}".format(mod_name, err_str) ) + @property + def atf_vars(self) -> Dict[str, str]: + px = "_ATF_VAR_" + return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)} + + def drop_privileges_user(self, user: str): + uid = pwd.getpwnam(user)[2] + print("Dropping privs to {}/{}".format(user, uid)) + os.setuid(uid) + + def drop_privileges(self): + if self.TARGET_USER: + if self.TARGET_USER == "unprivileged": + user = self.atf_vars["unprivileged-user"] + else: + user = self.TARGET_USER + self.drop_privileges_user(user) @property - def test_id(self): + def test_id(self) -> str: # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)' return os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0] def setup_method(self, method): """Run all pre-requisits for the test execution""" self._check_modules() diff --git a/tests/conftest.py b/tests/conftest.py index b5ce5ae2286b..687f6bde375e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,126 +1,136 @@ import pytest from atf_python.atf_pytest import ATFHandler from typing import Dict PLUGIN_ENABLED = False DEFAULT_HANDLER = None +def set_handler(config): + global DEFAULT_HANDLER, PLUGIN_ENABLED + DEFAULT_HANDLER = ATFHandler(report_file_name=config.option.atf_file) + PLUGIN_ENABLED = True + return DEFAULT_HANDLER + + def get_handler(): - global DEFAULT_HANDLER - if DEFAULT_HANDLER is None: - DEFAULT_HANDLER = ATFHandler() return DEFAULT_HANDLER def pytest_addoption(parser): """Add file output""" # Add meta-values group = parser.getgroup("general", "Running and selection options") group.addoption( "--atf-source-dir", type=str, dest="atf_source_dir", help="Path to the test source directory", ) group.addoption( "--atf-cleanup", default=False, action="store_true", dest="atf_cleanup", help="Call cleanup procedure for a given test", ) group = parser.getgroup("terminal reporting", "reporting", after="general") group.addoption( "--atf", default=False, action="store_true", help="Enable test listing/results output in atf format", ) group.addoption( "--atf-file", type=str, dest="atf_file", help="Path to the status file provided by atf runtime", ) @pytest.fixture(autouse=True, scope="session") def atf_vars() -> Dict[str, str]: return ATFHandler.get_atf_vars() @pytest.hookimpl(trylast=True) def pytest_configure(config): if config.option.help: return # Register markings anyway to avoid warnings config.addinivalue_line("markers", "require_user(name): user to run the test with") config.addinivalue_line( "markers", "require_arch(names): List[str] of support archs" ) # config.addinivalue_line("markers", "require_config(config): List[Tuple[str,Any]] of k=v pairs") config.addinivalue_line( "markers", "require_diskspace(amount): str with required diskspace" ) config.addinivalue_line( "markers", "require_files(space): List[str] with file paths" ) config.addinivalue_line( "markers", "require_machine(names): List[str] of support machine types" ) config.addinivalue_line( "markers", "require_memory(amount): str with required memory" ) config.addinivalue_line( "markers", "require_progs(space): List[str] with file paths" ) config.addinivalue_line( "markers", "timeout(dur): int/float with max duration in sec" ) - global PLUGIN_ENABLED - PLUGIN_ENABLED = config.option.atf - if not PLUGIN_ENABLED: + if not config.option.atf: return - get_handler() + handler = set_handler(config) if config.option.collectonly: # Need to output list of tests to stdout, hence override # standard reporter plugin reporter = config.pluginmanager.getplugin("terminalreporter") if reporter: config.pluginmanager.unregister(reporter) + else: + handler.setup_configure() def pytest_collection_modifyitems(session, config, items): """If cleanup is requested, replace collected tests with their cleanups (if any)""" if PLUGIN_ENABLED and config.option.atf_cleanup: new_items = [] handler = get_handler() for obj in items: if handler.has_object_cleanup(obj): handler.override_runtest(obj) new_items.append(obj) items.clear() items.extend(new_items) def pytest_collection_finish(session): if PLUGIN_ENABLED and session.config.option.collectonly: handler = get_handler() handler.list_tests(session.items) +def pytest_runtest_setup(item): + if PLUGIN_ENABLED: + handler = get_handler() + handler.setup_method_pre(item) + + def pytest_runtest_logreport(report): if PLUGIN_ENABLED: handler = get_handler() handler.add_report(report) def pytest_unconfigure(config): if PLUGIN_ENABLED and config.option.atf_file: handler = get_handler() - handler.write_report(config.option.atf_file) + handler.write_report()