diff --git a/libexec/atf/atf-pytest-wrapper/atf_pytest_wrapper.cpp b/libexec/atf/atf-pytest-wrapper/atf_pytest_wrapper.cpp
index bc7eec3b851d..6baa85999070 100644
--- a/libexec/atf/atf-pytest-wrapper/atf_pytest_wrapper.cpp
+++ b/libexec/atf/atf-pytest-wrapper/atf_pytest_wrapper.cpp
@@ -1,194 +1,209 @@
 #include <cstring>
 #include <iostream>
+#include <map>
 #include <string>
 #include <unistd.h>
 #include <vector>
 
 class Handler {
 private:
     const std::string kPytestName = "pytest";
     const std::string kCleanupSuffix = ":cleanup";
     const std::string kPythonPathEnv = "PYTHONPATH";
+    const std::string kAtfVar = "_ATF_VAR_";
 public:
     // Test listing requested
     bool flag_list = false;
     // Output debug data (will break listing)
     bool flag_debug = false;
     // Cleanup for the test requested
     bool flag_cleanup = false;
     // Test source directory (provided by ATF)
     std::string src_dir;
     // Path to write test status to (provided by ATF)
     std::string dst_file;
     // Path to add to PYTHONPATH (provided by the shebang args)
     std::string python_path;
     // Path to the script (provided by the shebang wrapper)
     std::string script_path;
     // Name of the test to run (provided by ATF)
     std::string test_name;
     // kv pairs (provided by ATF)
-    std::vector<std::string> kv_list;
+    std::map<std::string, std::string> kv_map;
     // our binary name
     std::string binary_name;
 
     static std::vector<std::string> ToVector(int argc, char **argv) {
         std::vector<std::string> ret;
         for (int i = 0; i < argc; i++) {
             ret.emplace_back(std::string(argv[i]));
         }
         return ret;
     }
 
     static void PrintVector(std::string prefix, const std::vector<std::string> &vec) {
         std::cerr << prefix << ": ";
         for (auto &val: vec) {
             std::cerr << "'" << val << "' ";
         }
         std::cerr << std::endl;
     }
 
     void Usage(std::string msg, bool exit_with_error) {
         std::cerr << binary_name << ": ERROR: " << msg << "." << std::endl;
         std::cerr << binary_name << ": See atf-test-program(1) for usage details." << std::endl;
         exit(exit_with_error != 0);
     }
 
     // Parse args received from the OS. There can be multiple valid options:
     // * with shebang args (#!/binary -P/path):
     //  atf_wrap '-P /path' /path/to/script -l
     // * without shebang args
     //  atf_wrap /path/to/script -l
     // Running test:
     //  atf_wrap '-P /path' /path/to/script -r /path1 -s /path2 -vk1=v1 testname
     void Parse(int argc, char **argv) {
         if (flag_debug) {
             PrintVector("IN", ToVector(argc, argv));
         }
 
         // getopt() skips the first argument (as it is typically the binary name).
         // It is possible to have either '-P\s*/path' followed by the script name
         // or just the script name. Parse the kernel-provided arg manually and
         // adjust the array to make getopt work.
         binary_name = std::string(argv[0]);
         argc--; argv++;
 
         // parse -P\s*path from the kernel.
         if (argc > 0 && !strncmp(argv[0], "-P", 2)) {
             char *path = &argv[0][2];
             while (*path == ' ')
                 path++;
             python_path = std::string(path);
             argc--; argv++;
         }
 
         // The next argument is a script name. Copy and keep argc/argv the same.
         // Show usage for empty args
         if (argc == 0) {
             Usage("Must provide a test case name", true);
         }
         script_path = std::string(argv[0]);
 
         int c;
         while ((c = getopt(argc, argv, "lr:s:v:")) != -1) {
             switch (c) {
             case 'l':
                 flag_list = true;
                 break;
             case 's':
                 src_dir = std::string(optarg);
                 break;
             case 'r':
                 dst_file = std::string(optarg);
                 break;
             case 'v':
-                kv_list.emplace_back(std::string(optarg));
+                {
+                    std::string kv = std::string(optarg);
+                    size_t splitter = kv.find("=");
+                    if (splitter == std::string::npos) {
+                        Usage("Unknown variable: " + kv, true);
+                    }
+                    kv_map[kv.substr(0, splitter)] = kv.substr(splitter + 1);
+                }
                 break;
             default:
                 Usage("Unknown option -" + std::string(1, static_cast<char>(c)), true);
             }
         }
         argc -= optind;
         argv += optind;
 
         if (flag_list) {
             return;
         }
         // There should be just one argument with the test name
         if (argc != 1) {
             Usage("Must provide a test case name", true);
         }
         test_name = std::string(argv[0]);
         if (test_name.size() > kCleanupSuffix.size() &&
             std::equal(kCleanupSuffix.rbegin(), kCleanupSuffix.rend(), test_name.rbegin())) {
             test_name = test_name.substr(0, test_name.size() - kCleanupSuffix.size());
             flag_cleanup = true;
         }
     }
 
     std::vector<std::string> BuildArgs() {
         std::vector<std::string> args = {"pytest", "-p", "no:cacheprovider", "-s", "--atf"};
 
         if (flag_list) {
             args.push_back("--co");
             args.push_back(script_path);
             return args;
         }
         if (flag_cleanup) {
             args.push_back("--atf-cleanup");
         }
         // workaround pytest parser bug:
         // https://github.com/pytest-dev/pytest/issues/3097
         // use '--arg=value' format instead of '--arg value' for all
         // path-like options
         if (!src_dir.empty()) {
             args.push_back("--atf-source-dir=" + src_dir);
         }
         if (!dst_file.empty()) {
             args.push_back("--atf-file=" + dst_file);
         }
-        for (auto &pair: kv_list) {
-            args.push_back("--atf-var");
-            args.push_back(pair);
-        }
         // Create nodeid from the test path & name
         args.push_back(script_path + "::" + test_name);
         return args;
     }
 
-    void SetEnv() {
+    void SetPythonPath() {
         if (!python_path.empty()) {
             char *env_path = getenv(kPythonPathEnv.c_str());
             if (env_path != nullptr) {
                 python_path = python_path + ":" + std::string(env_path);
             }
             setenv(kPythonPathEnv.c_str(), python_path.c_str(), 1);
         }
     }
 
+    void SetEnv() {
+        SetPythonPath();
+
+        // Pass ATF kv pairs as env variables to avoid dealing with
+        // pytest parser
+        for (auto [k, v]: kv_map) {
+            setenv((kAtfVar + k).c_str(), v.c_str(), 1);
+        }
+    }
+
     int Run(std::string binary, std::vector<std::string> args) {
         if (flag_debug) {
             PrintVector("OUT", args);
         }
         // allocate array with final NULL
         char **arr = new char*[args.size() + 1]();
         for (unsigned long i = 0; i < args.size(); i++) {
             // work around 'char *const *'
             arr[i] = strdup(args[i].c_str());
         }
         return (execvp(binary.c_str(), arr) != 0);
     }
 
     int Process() {
         SetEnv();
         return Run(kPytestName, BuildArgs());
     }
 };
 
 int main(int argc, char **argv) {
     Handler handler;
 
     handler.Parse(argc, argv);
     return handler.Process();
 }
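To make the new contract concrete: rather than appending "--atf-var k=v" pairs to the pytest command line, the wrapper above exports each pair as an environment variable before exec'ing pytest. A minimal Python sketch of that handoff, assuming a hypothetical set_atf_env() helper (the _ATF_VAR_ prefix mirrors kAtfVar; the sample pair is illustrative):

    import os

    def set_atf_env(kv_map):
        # Mirrors Handler::SetEnv(): every ATF "-vkey=value" option becomes
        # an _ATF_VAR_-prefixed environment variable, so the values never
        # pass through pytest's option parser.
        for key, value in kv_map.items():
            os.environ["_ATF_VAR_" + key] = value

    set_atf_env({"ci": "true"})  # e.g. from `atf_wrap ... -vci=true testname`
    assert os.environ["_ATF_VAR_ci"] == "true"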
diff --git a/tests/atf_python/atf_pytest.py b/tests/atf_python/atf_pytest.py
index 89c0e3a515b9..f72122fb740e 100644
--- a/tests/atf_python/atf_pytest.py
+++ b/tests/atf_python/atf_pytest.py
@@ -1,218 +1,224 @@
 import types
 from typing import Any
 from typing import Dict
 from typing import List
 from typing import NamedTuple
 from typing import Tuple
 
 import pytest
+import os
 
 
 class ATFCleanupItem(pytest.Item):
     def runtest(self):
         """Runs cleanup procedure for the test instead of the test"""
         instance = self.parent.cls()
         instance.cleanup(self.nodeid)
 
     def setup_method_noop(self, method):
         """Overrides runtest setup method"""
         pass
 
     def teardown_method_noop(self, method):
         """Overrides runtest teardown method"""
         pass
 
 
 class ATFTestObj(object):
     def __init__(self, obj, has_cleanup):
         # Use nodeid without name to properly name class-derived tests
         self.ident = obj.nodeid.split("::", 1)[1]
         self.description = self._get_test_description(obj)
         self.has_cleanup = has_cleanup
         self.obj = obj
 
     def _get_test_description(self, obj):
         """Returns first non-empty line from func docstring or func name"""
         docstr = obj.function.__doc__
         if docstr:
             for line in docstr.split("\n"):
                 if line:
                     return line
         return obj.name
 
     def _convert_marks(self, obj) -> Dict[str, Any]:
         wj_func = lambda x: " ".join(x)  # noqa: E731
         _map: Dict[str, Dict] = {
             "require_user": {"name": "require.user"},
             "require_arch": {"name": "require.arch", "fmt": wj_func},
             "require_diskspace": {"name": "require.diskspace"},
             "require_files": {"name": "require.files", "fmt": wj_func},
             "require_machine": {"name": "require.machine", "fmt": wj_func},
             "require_memory": {"name": "require.memory"},
             "require_progs": {"name": "require.progs", "fmt": wj_func},
             "timeout": {},
         }
         ret = {}
         for mark in obj.iter_markers():
             if mark.name in _map:
                 name = _map[mark.name].get("name", mark.name)
                 if "fmt" in _map[mark.name]:
                     val = _map[mark.name]["fmt"](mark.args[0])
                 else:
                     val = mark.args[0]
                 ret[name] = val
         return ret
 
     def as_lines(self) -> List[str]:
         """Output test definition in ATF-specific format"""
         ret = []
         ret.append("ident: {}".format(self.ident))
         ret.append("descr: {}".format(self._get_test_description(self.obj)))
         if self.has_cleanup:
             ret.append("has.cleanup: true")
         for key, value in self._convert_marks(self.obj).items():
             ret.append("{}: {}".format(key, value))
         return ret
 
 
 class ATFHandler(object):
     class ReportState(NamedTuple):
         state: str
         reason: str
 
     def __init__(self):
         self._tests_state_map: Dict[str, ReportState] = {}
 
     def override_runtest(self, obj):
         # Override basic runtest command
         obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj)
         # Override class setup/teardown
         obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop
         obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop
 
     def get_object_cleanup_class(self, obj):
         if hasattr(obj, "parent") and obj.parent is not None:
             if hasattr(obj.parent, "cls") and obj.parent.cls is not None:
                 if hasattr(obj.parent.cls, "cleanup"):
                     return obj.parent.cls
         return None
 
     def has_object_cleanup(self, obj):
         return self.get_object_cleanup_class(obj) is not None
 
     def list_tests(self, tests: List[str]):
         print('Content-Type: application/X-atf-tp; version="1"')
         print()
         for test_obj in tests:
             has_cleanup = self.has_object_cleanup(test_obj)
             atf_test = ATFTestObj(test_obj, has_cleanup)
             for line in atf_test.as_lines():
                 print(line)
             print()
 
     def set_report_state(self, test_name: str, state: str, reason: str):
         self._tests_state_map[test_name] = self.ReportState(state, reason)
 
     def _extract_report_reason(self, report):
         data = report.longrepr
         if data is None:
             return None
         if isinstance(data, Tuple):
             # ('/path/to/test.py', 23, 'Skipped: unable to test')
             reason = data[2]
             prefix = "Skipped: "
             if reason.startswith(prefix):
                 reason = reason[len(prefix):]
             return reason
         else:
             # string / traceback / exception report. Capture the last line
             return str(data).split("\n")[-1]
 
     def add_report(self, report):
         # Map pytest report state to the atf-desired state
         #
         # ATF test states:
         # (1) expected_death, (2) expected_exit, (3) expected_failure
         # (4) expected_signal, (5) expected_timeout, (6) passed
         # (7) skipped, (8) failed
         #
         # Note that ATF doesn't have the concept of "soft xfail" - xpass
         # is a failure. It also calls the teardown routine in a separate
         # process, thus teardown states (pytest-only) are handled as
         # body continuation.
 
         # (stage, state, wasxfail)
         # Just a passing test: WANT: passed
         # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F)
         #
         # Failing body test: WANT: failed
         # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F)
         #
         # pytest.skip test decorator: WANT: skipped
         # GOT: (setup, skipped, F), (teardown, passed, F)
         #
         # pytest.skip call inside test function: WANT: skipped
         # GOT: (setup, passed, F), (call, skipped, F), (teardown, passed, F)
         #
         # mark.xfail decorator + pytest.xfail: WANT: expected_failure
         # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F)
         #
         # mark.xfail decorator + pass: WANT: failed
         # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F)
         test_name = report.location[2]
         stage = report.when
         state = report.outcome
         reason = self._extract_report_reason(report)
 
         # We don't care about strict xfail - it gets translated to False
 
         if stage == "setup":
             if state in ("skipped", "failed"):
                 # failed init -> failed test, skipped setup -> xskip
                 # for the whole test
                 self.set_report_state(test_name, state, reason)
         elif stage == "call":
             # "call" stage shouldn't matter if setup failed
             if test_name in self._tests_state_map:
                 if self._tests_state_map[test_name].state == "failed":
                     return
             if state == "failed":
                 # Record failure & override "skipped" state
                 self.set_report_state(test_name, state, reason)
             elif state == "skipped":
                 if hasattr(report, "wasxfail"):
                     # xfail() called in the test body
                     state = "expected_failure"
                 else:
                     # skip inside the body
                     pass
                 self.set_report_state(test_name, state, reason)
             elif state == "passed":
                 if hasattr(report, "wasxfail"):
                     # the test was expected to fail but didn't -
                     # mark as hard failure
                     state = "failed"
                 self.set_report_state(test_name, state, reason)
         elif stage == "teardown":
             if state == "failed":
                 # teardown should be empty, as the cleanup
                 # procedures should be implemented as a separate
                 # function/method, so mark teardown failure as
                 # global failure
                 self.set_report_state(test_name, state, reason)
 
     def write_report(self, path):
         if self._tests_state_map:
             # If we're executing in ATF mode, there has to be just one test
             # Anyway, deterministically pick the first one
             first_test_name = next(iter(self._tests_state_map))
             test = self._tests_state_map[first_test_name]
             if test.state == "passed":
                 line = test.state
             else:
                 line = "{}: {}".format(test.state, test.reason)
             with open(path, mode="w") as f:
                 print(line, file=f)
+
+    @staticmethod
+    def get_atf_vars() -> Dict[str, str]:
+        px = "_ATF_VAR_"
+        return {k[len(px):]: v for k, v in os.environ.items() if k.startswith(px)}
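The report-state mapping above can be exercised without a live pytest run; a sketch using a stand-in report object (FakeReport and all its values are illustrative, and atf_python must be on PYTHONPATH):

    from atf_python.atf_pytest import ATFHandler

    class FakeReport:
        # Stub mimicking the pytest TestReport attributes add_report() reads
        def __init__(self, when, outcome, longrepr=None):
            self.location = ("test_example.py", 1, "test_something")
            self.when = when
            self.outcome = outcome
            self.longrepr = longrepr

    handler = ATFHandler()
    handler.add_report(FakeReport("setup", "passed"))
    handler.add_report(FakeReport("call", "failed", "AssertionError: boom"))
    handler.add_report(FakeReport("teardown", "passed"))
    handler.write_report("/tmp/status")  # writes "failed: AssertionError: boom"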
diff --git a/tests/conftest.py b/tests/conftest.py
index 193d2adfb5e0..65c8bf5f0d01 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,121 +1,126 @@
 import pytest
 from atf_python.atf_pytest import ATFHandler
+from typing import Dict
 
 PLUGIN_ENABLED = False
 DEFAULT_HANDLER = None
 
 
 def get_handler():
     global DEFAULT_HANDLER
     if DEFAULT_HANDLER is None:
         DEFAULT_HANDLER = ATFHandler()
     return DEFAULT_HANDLER
 
 
 def pytest_addoption(parser):
     """Add file output"""
     # Add meta-values
     group = parser.getgroup("general", "Running and selection options")
-    group.addoption("--atf-var", dest="atf_vars", action="append", default=[])
     group.addoption(
         "--atf-source-dir",
         type=str,
         dest="atf_source_dir",
         help="Path to the test source directory",
     )
     group.addoption(
         "--atf-cleanup",
         default=False,
         action="store_true",
         dest="atf_cleanup",
         help="Call cleanup procedure for a given test",
     )
     group = parser.getgroup("terminal reporting", "reporting", after="general")
     group.addoption(
         "--atf",
         default=False,
         action="store_true",
         help="Enable test listing/results output in atf format",
     )
     group.addoption(
         "--atf-file",
         type=str,
         dest="atf_file",
         help="Path to the status file provided by atf runtime",
     )
 
 
+@pytest.fixture(autouse=True, scope="session")
+def atf_vars() -> Dict[str, str]:
+    return ATFHandler.get_atf_vars()
+
+
 @pytest.mark.trylast
 def pytest_configure(config):
     if config.option.help:
         return
     # Register markings anyway to avoid warnings
     config.addinivalue_line("markers", "require_user(name): user to run the test with")
     config.addinivalue_line(
         "markers", "require_arch(names): List[str] of supported archs"
     )
     # config.addinivalue_line("markers", "require_config(config): List[Tuple[str,Any]] of k=v pairs")
     config.addinivalue_line(
         "markers", "require_diskspace(amount): str with required diskspace"
     )
     config.addinivalue_line(
         "markers", "require_files(space): List[str] with file paths"
     )
     config.addinivalue_line(
         "markers", "require_machine(names): List[str] of supported machine types"
     )
     config.addinivalue_line(
         "markers", "require_memory(amount): str with required memory"
     )
     config.addinivalue_line(
         "markers", "require_progs(space): List[str] with file paths"
     )
     config.addinivalue_line(
         "markers", "timeout(dur): int/float with max duration in sec"
     )
     global PLUGIN_ENABLED
     PLUGIN_ENABLED = config.option.atf
     if not PLUGIN_ENABLED:
         return
     get_handler()
     if config.option.collectonly:
         # Need to output list of tests to stdout, hence override
         # standard reporter plugin
         reporter = config.pluginmanager.getplugin("terminalreporter")
         if reporter:
             config.pluginmanager.unregister(reporter)
 
 
 def pytest_collection_modifyitems(session, config, items):
     """If cleanup is requested, replace collected tests with
     their cleanups (if any)
     """
     if PLUGIN_ENABLED and config.option.atf_cleanup:
         new_items = []
         handler = get_handler()
         for obj in items:
             if handler.has_object_cleanup(obj):
                 handler.override_runtest(obj)
                 new_items.append(obj)
         items.clear()
         items.extend(new_items)
 
 
 def pytest_collection_finish(session):
     if PLUGIN_ENABLED and session.config.option.collectonly:
         handler = get_handler()
         handler.list_tests(session.items)
 
 
 def pytest_runtest_logreport(report):
     if PLUGIN_ENABLED:
         handler = get_handler()
         handler.add_report(report)
 
 
 def pytest_unconfigure(config):
     if PLUGIN_ENABLED and config.option.atf_file:
         handler = get_handler()
         handler.write_report(config.option.atf_file)
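End-to-end, a test module can request the atf_vars fixture by name; a hypothetical example (the test name and the "ci" variable are illustrative):

    import pytest

    def test_ci_only(atf_vars):
        # atf_vars holds the Dict[str, str] recovered from _ATF_VAR_* env
        # vars, e.g. {"ci": "true"} after `atf_wrap ... -vci=true testname`
        if atf_vars.get("ci") != "true":
            pytest.skip("requires ci=true")
        assert atf_vars["ci"] == "true"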