Changeset View
Changeset View
Standalone View
Standalone View
tests/atf_python/atf_pytest.py
import types | import types | ||||
from typing import Any | from typing import Any | ||||
from typing import Dict | from typing import Dict | ||||
from typing import List | from typing import List | ||||
from typing import NamedTuple | from typing import NamedTuple | ||||
from typing import Optional | |||||
from typing import Tuple | from typing import Tuple | ||||
import pytest | import pytest | ||||
import os | import os | ||||
def nodeid_to_method_name(nodeid: str) -> str: | def nodeid_to_method_name(nodeid: str) -> str: | ||||
"""file_name.py::ClassName::method_name[parametrize] -> method_name""" | """file_name.py::ClassName::method_name[parametrize] -> method_name""" | ||||
Show All 32 Lines | def _get_test_description(self, obj): | ||||
"""Returns first non-empty line from func docstring or func name""" | """Returns first non-empty line from func docstring or func name""" | ||||
docstr = obj.function.__doc__ | docstr = obj.function.__doc__ | ||||
if docstr: | if docstr: | ||||
for line in docstr.split("\n"): | for line in docstr.split("\n"): | ||||
if line: | if line: | ||||
return line | return line | ||||
return obj.name | return obj.name | ||||
@staticmethod | |||||
def _convert_user_mark(mark, obj, ret: Dict): | |||||
username = mark.args[0] | |||||
if username == "unprivileged": | |||||
# Special unprivileged user requested. | |||||
# First, require the unprivileged-user config option presence | |||||
key = "require.config" | |||||
if key not in ret: | |||||
ret[key] = "unprivileged_user" | |||||
else: | |||||
ret[key] = "{} {}".format(ret[key], "unprivileged_user") | |||||
# Check if the framework requires root | |||||
test_cls = ATFHandler.get_test_class(obj) | |||||
if test_cls and getattr(test_cls, "NEED_ROOT", False): | |||||
# Yes, so we ask kyua to run us under root instead | |||||
# It is up to the implementation to switch back to the desired | |||||
# user | |||||
ret["require.user"] = "root" | |||||
else: | |||||
ret["require.user"] = username | |||||
def _convert_marks(self, obj) -> Dict[str, Any]: | def _convert_marks(self, obj) -> Dict[str, Any]: | ||||
Lint: PEP8 E303: too many blank lines (2) | |||||
wj_func = lambda x: " ".join(x) # noqa: E731 | wj_func = lambda x: " ".join(x) # noqa: E731 | ||||
Lint: PEP8 E731 do not assign a lambda expression, use a def Lint: PEP8 E731: do not assign a lambda expression, use a def | |||||
_map: Dict[str, Dict] = { | _map: Dict[str, Dict] = { | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
"require_user": {"name": "require.user"}, | "require_user": {"handler": self._convert_user_mark}, | ||||
"require_arch": {"name": "require.arch", "fmt": wj_func}, | "require_arch": {"name": "require.arch", "fmt": wj_func}, | ||||
"require_diskspace": {"name": "require.diskspace"}, | "require_diskspace": {"name": "require.diskspace"}, | ||||
"require_files": {"name": "require.files", "fmt": wj_func}, | "require_files": {"name": "require.files", "fmt": wj_func}, | ||||
"require_machine": {"name": "require.machine", "fmt": wj_func}, | "require_machine": {"name": "require.machine", "fmt": wj_func}, | ||||
"require_memory": {"name": "require.memory"}, | "require_memory": {"name": "require.memory"}, | ||||
"require_progs": {"name": "require.progs", "fmt": wj_func}, | "require_progs": {"name": "require.progs", "fmt": wj_func}, | ||||
"timeout": {}, | "timeout": {}, | ||||
} | } | ||||
ret = {} | ret = {} | ||||
for mark in obj.iter_markers(): | for mark in obj.iter_markers(): | ||||
if mark.name in _map: | if mark.name in _map: | ||||
if "handler" in _map[mark.name]: | |||||
_map[mark.name]["handler"](mark, obj, ret) | |||||
continue | |||||
name = _map[mark.name].get("name", mark.name) | name = _map[mark.name].get("name", mark.name) | ||||
if "fmt" in _map[mark.name]: | if "fmt" in _map[mark.name]: | ||||
val = _map[mark.name]["fmt"](mark.args[0]) | val = _map[mark.name]["fmt"](mark.args[0]) | ||||
else: | else: | ||||
val = mark.args[0] | val = mark.args[0] | ||||
ret[name] = val | ret[name] = val | ||||
return ret | return ret | ||||
def as_lines(self) -> List[str]: | def as_lines(self) -> List[str]: | ||||
"""Output test definition in ATF-specific format""" | """Output test definition in ATF-specific format""" | ||||
ret = [] | ret = [] | ||||
ret.append("ident: {}".format(self.ident)) | ret.append("ident: {}".format(self.ident)) | ||||
ret.append("descr: {}".format(self._get_test_description(self.obj))) | ret.append("descr: {}".format(self._get_test_description(self.obj))) | ||||
if self.has_cleanup: | if self.has_cleanup: | ||||
ret.append("has.cleanup: true") | ret.append("has.cleanup: true") | ||||
for key, value in self._convert_marks(self.obj).items(): | for key, value in self._convert_marks(self.obj).items(): | ||||
ret.append("{}: {}".format(key, value)) | ret.append("{}: {}".format(key, value)) | ||||
return ret | return ret | ||||
class ATFHandler(object): | class ATFHandler(object): | ||||
class ReportState(NamedTuple): | class ReportState(NamedTuple): | ||||
state: str | state: str | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
reason: str | reason: str | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
def __init__(self): | def __init__(self, report_file_name: Optional[str]): | ||||
self._tests_state_map: Dict[str, ReportStatus] = {} | self._tests_state_map: Dict[str, ReportStatus] = {} | ||||
Lint: PEP8 E701 multiple statements on one line (colon) Lint: PEP8 E701: multiple statements on one line (colon) | |||||
self._report_file_name = report_file_name | |||||
self._report_file_handle = None | |||||
def setup_configure(self): | |||||
fname = self._report_file_name | |||||
if fname: | |||||
self._report_file_handle = open(fname, mode="w") | |||||
def setup_method_pre(self, item): | |||||
"""Called before actually running the test setup_method""" | |||||
# Check if we need to manually drop the privileges | |||||
for mark in item.iter_markers(): | |||||
if mark.name == "require_user": | |||||
cls = self.get_test_class(item) | |||||
cls.TARGET_USER = mark.args[0] | |||||
break | |||||
def override_runtest(self, obj): | def override_runtest(self, obj): | ||||
# Override basic runtest command | # Override basic runtest command | ||||
obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj) | obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj) | ||||
# Override class setup/teardown | # Override class setup/teardown | ||||
obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop | obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop | ||||
obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop | obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop | ||||
@staticmethod | @staticmethod | ||||
▲ Show 20 Lines • Show All 110 Lines • ▼ Show 20 Lines | def add_report(self, report): | ||||
elif stage == "teardown": | elif stage == "teardown": | ||||
if state == "failed": | if state == "failed": | ||||
# teardown should be empty, as the cleanup | # teardown should be empty, as the cleanup | ||||
# procedures should be implemented as a separate | # procedures should be implemented as a separate | ||||
# function/method, so mark teardown failure as | # function/method, so mark teardown failure as | ||||
# global failure | # global failure | ||||
self.set_report_state(test_name, state, reason) | self.set_report_state(test_name, state, reason) | ||||
def write_report(self, path): | def write_report(self): | ||||
if self._report_file_handle is None: | |||||
return | |||||
if self._tests_state_map: | if self._tests_state_map: | ||||
# If we're executing in ATF mode, there has to be just one test | # If we're executing in ATF mode, there has to be just one test | ||||
# Anyway, deterministically pick the first one | # Anyway, deterministically pick the first one | ||||
first_test_name = next(iter(self._tests_state_map)) | first_test_name = next(iter(self._tests_state_map)) | ||||
test = self._tests_state_map[first_test_name] | test = self._tests_state_map[first_test_name] | ||||
if test.state == "passed": | if test.state == "passed": | ||||
line = test.state | line = test.state | ||||
else: | else: | ||||
line = "{}: {}".format(test.state, test.reason) | line = "{}: {}".format(test.state, test.reason) | ||||
with open(path, mode="w") as f: | print(line, file=self._report_file_handle) | ||||
print(line, file=f) | self._report_file_handle.close() | ||||
@staticmethod | @staticmethod | ||||
def get_atf_vars() -> Dict[str, str]: | def get_atf_vars() -> Dict[str, str]: | ||||
px = "_ATF_VAR_" | px = "_ATF_VAR_" | ||||
return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)} | return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)} | ||||
Lint: PEP8 E501 line too long (82 > 79 characters) Lint: PEP8 E501: line too long (82 > 79 characters) |
too many blank lines (2)