diff --git a/tests/atf_python/atf_pytest.py b/tests/atf_python/atf_pytest.py --- a/tests/atf_python/atf_pytest.py +++ b/tests/atf_python/atf_pytest.py @@ -3,6 +3,7 @@ from typing import Dict from typing import List from typing import NamedTuple +from typing import Optional from typing import Tuple import pytest @@ -51,10 +52,32 @@ return line return obj.name + @staticmethod + def _convert_user_mark(mark, obj, ret: Dict): + username = mark.args[0] + if username == "unprivileged": + # Special unprivileged user requested. + # First, require the unprivileged-user config option presence + key = "require.config" + if key not in ret: + ret[key] = "unprivileged_user" + else: + ret[key] = "{} {}".format(ret[key], "unprivileged_user") + # Check if the framework requires root + test_cls = ATFHandler.get_test_class(obj) + if test_cls and getattr(test_cls, "NEED_ROOT", False): + # Yes, so we ask kyua to run us under root instead + # It is up to the implementation to switch back to the desired + # user + ret["require.user"] = "root" + else: + ret["require.user"] = username + + def _convert_marks(self, obj) -> Dict[str, Any]: wj_func = lambda x: " ".join(x) # noqa: E731 _map: Dict[str, Dict] = { - "require_user": {"name": "require.user"}, + "require_user": {"handler": self._convert_user_mark}, "require_arch": {"name": "require.arch", "fmt": wj_func}, "require_diskspace": {"name": "require.diskspace"}, "require_files": {"name": "require.files", "fmt": wj_func}, @@ -66,6 +89,9 @@ ret = {} for mark in obj.iter_markers(): if mark.name in _map: + if "handler" in _map[mark.name]: + _map[mark.name]["handler"](mark, obj, ret) + continue name = _map[mark.name].get("name", mark.name) if "fmt" in _map[mark.name]: val = _map[mark.name]["fmt"](mark.args[0]) @@ -91,8 +117,24 @@ state: str reason: str - def __init__(self): + def __init__(self, report_file_name: Optional[str]): self._tests_state_map: Dict[str, ReportStatus] = {} + self._report_file_name = report_file_name + self._report_file_handle = None + + def setup_configure(self): + fname = self._report_file_name + if fname: + self._report_file_handle = open(fname, mode="w") + + def setup_method_pre(self, item): + """Called before actually running the test setup_method""" + # Check if we need to manually drop the privileges + for mark in item.iter_markers(): + if mark.name == "require_user": + cls = self.get_test_class(item) + cls.TARGET_USER = mark.args[0] + break def override_runtest(self, obj): # Override basic runtest command @@ -220,7 +262,9 @@ # global failure self.set_report_state(test_name, state, reason) - def write_report(self, path): + def write_report(self): + if self._report_file_handle is None: + return if self._tests_state_map: # If we're executing in ATF mode, there has to be just one test # Anyway, deterministically pick the first one @@ -230,8 +274,8 @@ line = test.state else: line = "{}: {}".format(test.state, test.reason) - with open(path, mode="w") as f: - print(line, file=f) + print(line, file=self._report_file_handle) + self._report_file_handle.close() @staticmethod def get_atf_vars() -> Dict[str, str]: diff --git a/tests/atf_python/sys/net/vnet.py b/tests/atf_python/sys/net/vnet.py --- a/tests/atf_python/sys/net/vnet.py +++ b/tests/atf_python/sys/net/vnet.py @@ -300,6 +300,7 @@ class VnetTestTemplate(BaseTest): + NEED_ROOT: bool = True TOPOLOGY = {} def _get_vnet_handler(self, vnet_alias: str): @@ -345,6 +346,7 @@ # Do unbuffered stdout for children # so the logs are present if the child hangs sys.stdout.reconfigure(line_buffering=True) + self.drop_privileges() handler(vnet) def setup_topology(self, topo: Dict, topology_id: str): @@ -436,6 +438,7 @@ # Save state for the main handler self.iface_map = obj_map.iface_map self.vnet_map = obj_map.vnet_map + self.drop_privileges() def cleanup(self, test_id: str): # pytest test id: file::class::test_name diff --git a/tests/atf_python/utils.py b/tests/atf_python/utils.py --- a/tests/atf_python/utils.py +++ b/tests/atf_python/utils.py @@ -1,8 +1,10 @@ #!/usr/bin/env python3 import os +import pwd from ctypes import CDLL from ctypes import get_errno from ctypes.util import find_library +from typing import Dict from typing import List from typing import Optional @@ -31,6 +33,8 @@ class BaseTest(object): + NEED_ROOT: bool = False # True if the class needs root privileges for the setup + TARGET_USER = None # Set to the target user by the framework REQUIRED_MODULES: List[str] = [] def _check_modules(self): @@ -41,9 +45,26 @@ pytest.skip( "kernel module '{}' not available: {}".format(mod_name, err_str) ) + @property + def atf_vars(self) -> Dict[str, str]: + px = "_ATF_VAR_" + return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)} + + def drop_privileges_user(self, user: str): + uid = pwd.getpwnam(user)[2] + print("Dropping privs to {}/{}".format(user, uid)) + os.setuid(uid) + + def drop_privileges(self): + if self.TARGET_USER: + if self.TARGET_USER == "unprivileged": + user = self.atf_vars["unprivileged-user"] + else: + user = self.TARGET_USER + self.drop_privileges_user(user) @property - def test_id(self): + def test_id(self) -> str: # 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)' return os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0] diff --git a/tests/conftest.py b/tests/conftest.py --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,10 +7,14 @@ DEFAULT_HANDLER = None +def set_handler(config): + global DEFAULT_HANDLER, PLUGIN_ENABLED + DEFAULT_HANDLER = ATFHandler(report_file_name=config.option.atf_file) + PLUGIN_ENABLED = True + return DEFAULT_HANDLER + + def get_handler(): - global DEFAULT_HANDLER - if DEFAULT_HANDLER is None: - DEFAULT_HANDLER = ATFHandler() return DEFAULT_HANDLER @@ -81,11 +85,9 @@ "markers", "timeout(dur): int/float with max duration in sec" ) - global PLUGIN_ENABLED - PLUGIN_ENABLED = config.option.atf - if not PLUGIN_ENABLED: + if not config.option.atf: return - get_handler() + handler = set_handler(config) if config.option.collectonly: # Need to output list of tests to stdout, hence override @@ -93,6 +95,8 @@ reporter = config.pluginmanager.getplugin("terminalreporter") if reporter: config.pluginmanager.unregister(reporter) + else: + handler.setup_configure() def pytest_collection_modifyitems(session, config, items): @@ -114,6 +118,12 @@ handler.list_tests(session.items) +def pytest_runtest_setup(item): + if PLUGIN_ENABLED: + handler = get_handler() + handler.setup_method_pre(item) + + def pytest_runtest_logreport(report): if PLUGIN_ENABLED: handler = get_handler() @@ -123,4 +133,4 @@ def pytest_unconfigure(config): if PLUGIN_ENABLED and config.option.atf_file: handler = get_handler() - handler.write_report(config.option.atf_file) + handler.write_report()