Index: share/mk/atf.test.mk =================================================================== --- share/mk/atf.test.mk +++ share/mk/atf.test.mk @@ -22,6 +22,7 @@ ATF_TESTS_CXX?= ATF_TESTS_SH?= ATF_TESTS_KSH93?= +ATF_TESTS_PYTEST?= .if !empty(ATF_TESTS_C) PROGS+= ${ATF_TESTS_C} @@ -109,3 +110,49 @@ mv ${.TARGET}.tmp ${.TARGET} .endfor .endif + +.if !empty(ATF_TESTS_PYTEST) +# bsd.prog.mk SCRIPTS interface removes file extension unless +# SCRIPTSNAME is set, which is not possible to do here. +# Work around this by appending another extension (.xtmp) to the +# file name. Use separate loop to avoid dealing with explicitly +# stating expansion for each and every variable. +# +# ATF_TESTS_PYTEST -> contains list of files as is (test_something.py ..) +# _ATF_TESTS_PYTEST -> (test_something.py.xtmp ..) +# +# Former array is iterated to construct Kyuafile, where original file +# names need to be written. +# Latter array is iterated to enable bsd.prog.mk scripts framework - +# namely, installing scripts without .xtmp suffix. Note: this allows to +# not bother about the fact that make target needs to be different from +# the source file. +_TESTS+= ${ATF_TESTS_PYTEST} +_ATF_TESTS_PYTEST= +.for _T in ${ATF_TESTS_PYTEST} +_ATF_TESTS_PYTEST += ${_T}.xtmp +TEST_INTERFACE.${_T}= atf +TEST_METADATA.${_T}+= required_programs="pytest" +.endfor + +SCRIPTS+= ${_ATF_TESTS_PYTEST} +.for _T in ${_ATF_TESTS_PYTEST} +SCRIPTSDIR_${_T}= ${TESTSDIR} +CLEANFILES+= ${_T} ${_T}.tmp +# TODO(jmmv): It seems to me that this SED and SRC functionality should +# exist in bsd.prog.mk along the support for SCRIPTS. Move it there if +# this proves to be useful within the tests. 
+ATF_TESTS_PYTEST_SED_${_T}?= # empty +ATF_TESTS_PYTEST_SRC_${_T}?= ${.CURDIR}/${_T:S,.xtmp$,,} +${_T}: + echo "#!${TESTSBASE}/atf_pytest_wrapper -S ${TESTSBASE}" > ${.TARGET}.tmp +.if empty(ATF_TESTS_PYTEST_SED_${_T}) + cat ${ATF_TESTS_PYTEST_SRC_${_T}} >>${.TARGET}.tmp +.else + cat ${ATF_TESTS_PYTEST_SRC_${_T}} \ + | sed ${ATF_TESTS_PYTEST_SED_${_T}} >>${.TARGET}.tmp +.endif + chmod +x ${.TARGET}.tmp + mv ${.TARGET}.tmp ${.TARGET} +.endfor +.endif Index: tests/Makefile =================================================================== --- tests/Makefile +++ tests/Makefile @@ -4,12 +4,14 @@ TESTSDIR= ${TESTSBASE} -${PACKAGE}FILES+= README +${PACKAGE}FILES+= README __init__.py conftest.py KYUAFILE= yes SUBDIR+= etc SUBDIR+= sys +SUBDIR+= atf_python +SUBDIR+= freebsd_test_suite SUBDIR_PARALLEL= Index: tests/atf_python/Makefile =================================================================== --- /dev/null +++ tests/atf_python/Makefile @@ -0,0 +1,12 @@ +.include + +.PATH: ${.CURDIR} + +FILES= __init__.py atf_pytest.py +SUBDIR= sys + +.include +FILESDIR= ${TESTSBASE}/atf_python + + +.include Index: tests/atf_python/atf_pytest.py =================================================================== --- /dev/null +++ tests/atf_python/atf_pytest.py @@ -0,0 +1,209 @@ +import types +from typing import Any +from typing import Dict +from typing import List +from typing import Tuple + +import pytest + + +class ATFCleanupItem(pytest.Item): + def runtest(self): + """Runs cleanup procedure for the test instead of the test""" + instance = self.parent.cls() + instance.cleanup(self.nodeid) + + def setup_method_noop(self, method): + """Overrides runtest setup method""" + pass + + def teardown_method_noop(self, method): + """Overrides runtest teardown method""" + pass + + +class ATFTestObj(object): + def __init__(self, obj, has_cleanup): + # Use nodeid without name to properly name class-derived tests + self.ident = obj.nodeid.split("::", 1)[1] + self.description = 
self._get_test_description(obj) + self.has_cleanup = has_cleanup + self.obj = obj + + def _get_test_description(self, obj): + """Returns first non-empty line from func docstring or func name""" + docstr = obj.function.__doc__ + if docstr: + for line in docstr.split("\n"): + if line: + return line + return obj.name + + def _convert_marks(self, obj) -> Dict[str, Any]: + wj_func = lambda x: " ".join(x) # noqa: E731 + _map: Dict[str, Dict] = { + "require_user": {"name": "require.user"}, + "require_arch": {"name": "require.arch", "fmt": wj_func}, + "require_diskspace": {"name": "require.diskspace"}, + "require_files": {"name": "require.files", "fmt": wj_func}, + "require_machine": {"name": "require.machine", "fmt": wj_func}, + "require_memory": {"name": "require.memory"}, + "require_progs": {"name": "require.progs", "fmt": wj_func}, + "timeout": {}, + } + ret = {} + for mark in obj.iter_markers(): + if mark.name in _map: + name = _map[mark.name].get("name", mark.name) + if "fmt" in _map[mark.name]: + val = _map[mark.name]["fmt"](mark.args[0]) + else: + val = mark.args[0] + ret[name] = val + return ret + + def as_lines(self) -> List[str]: + """Output test definition in ATF-specific format""" + ret = [] + ret.append("ident: {}".format(self.ident)) + ret.append("descr: {}".format(self._get_test_description(self.obj))) + if self.has_cleanup: + ret.append("has.cleanup: true") + for key, value in self._convert_marks(self.obj).items(): + ret.append("{}: {}".format(key, value)) + return ret + + +class ATFHandler(object): + def __init__(self): + self._tests_state_map: Dict[str, Tuple[str, Any]] = {} + + def override_runtest(self, obj): + # Override basic runtest command + obj.runtest = types.MethodType(ATFCleanupItem.runtest, obj) + # Override class setup/teardown + obj.parent.cls.setup_method = ATFCleanupItem.setup_method_noop + obj.parent.cls.teardown_method = ATFCleanupItem.teardown_method_noop + + def get_object_cleanup_class(self, obj): + if hasattr(obj, "parent") and 
obj.parent is not None: + if hasattr(obj.parent, "cls") and obj.parent.cls is not None: + if hasattr(obj.parent.cls, "cleanup"): + return obj.parent.cls + return None + + def has_object_cleanup(self, obj): + return self.get_object_cleanup_class(obj) is not None + + def list_tests(self, tests: List[str]): + print('Content-Type: application/X-atf-tp; version="1"') + print() + for test_obj in tests: + has_cleanup = self.has_object_cleanup(test_obj) + atf_test = ATFTestObj(test_obj, has_cleanup) + for line in atf_test.as_lines(): + print(line) + print() + + def _extract_report_reason(self, report): + data = report.longrepr + if data is None: + return None + if isinstance(data, tuple): + # ('/path/to/test.py', 23, 'Skipped: unable to test') + reason = data[2] + for prefix in ("Skipped: ",): + if reason.startswith(prefix): + reason = reason[len(prefix):] + return reason + else: + # string/ traceback / exception report. Capture the last line + return str(data).split("\n")[-1] + return None + + def add_report(self, report): + # MAP pytest report state to the atf-desired state + # + # ATF test states: + # (1) expected_death, (2) expected_exit, (3) expected_failure + # (4) expected_signal, (5) expected_timeout, (6) passed + # (7) skipped, (8) failed + # + # Note that ATF doesn't have the concept of "soft xfail" - xpass + # is a failure. It also calls teardown routine in a separate + # process, thus teardown states (pytest-only) are handled as + # body continuation. 
+ + # (stage, state, wasxfail) + + # Just a passing test: WANT: passed + # GOT: (setup, passed, F), (call, passed, F), (teardown, passed, F) + # + # Failing body test: WANT: failed + # GOT: (setup, passed, F), (call, failed, F), (teardown, passed, F) + # + # pytest.skip test decorator: WANT: skipped + # GOT: (setup,skipped, False), (teardown, passed, False) + # + # pytest.skip call inside test function: WANT: skipped + # GOT: (setup, passed, F), (call, skipped, F), (teardown,passed, F) + # + # mark.xfail decorator+pytest.xfail: WANT: expected_failure + # GOT: (setup, passed, F), (call, skipped, T), (teardown, passed, F) + # + # mark.xfail decorator+pass: WANT: failed + # GOT: (setup, passed, F), (call, passed, T), (teardown, passed, F) + + test_name = report.location[2] + stage = report.when + state = report.outcome + reason = self._extract_report_reason(report) + + # We don't care about strict xfail - it gets translated to False + + if stage == "setup": + if state in ("skipped", "failed"): + # failed init -> failed test, skipped setup -> xskip + # for the whole test + self._tests_state_map[test_name] = (state, reason) + elif stage == "call": + # "call" stage shouldn't matter if setup failed + if self._tests_state_map.get(test_name, (None, None))[0] == "failed": + return + if state == "failed": + # Record failure & override "skipped" state + self._tests_state_map[test_name] = (state, reason) + elif state == "skipped": + if hasattr(report, "wasxfail"): + # xfail() called in the test body + state = "expected_failure" + else: + # skip inside the body + pass + self._tests_state_map[test_name] = (state, reason) + elif state == "passed": + if hasattr(report, "wasxfail"): + # the test was expected to fail but didn't + # mark as hard failure + state = "failed" + self._tests_state_map[test_name] = (state, reason) + elif stage == "teardown": + if state == "failed": + # teardown should be empty, as the cleanup + # procedures should be implemented as a separate + # function/method, so mark 
teardown failure as + # global failure + self._tests_state_map[test_name] = (state, reason) + + def write_report(self, path): + if self._tests_state_map: + # If we're executing in ATF mode, there has to be just one test + # Anyway, deterministically pick the first one + first_test = list(self._tests_state_map.keys())[0] + state, reason = self._tests_state_map[first_test] + if state == "passed": + line = state + else: + line = "{}: {}".format(state, reason) + with open(path, mode="w") as f: + print(line, file=f) Index: tests/atf_python/sys/Makefile =================================================================== --- /dev/null +++ tests/atf_python/sys/Makefile @@ -0,0 +1,11 @@ +.include + +.PATH: ${.CURDIR} + +FILES= __init__.py +SUBDIR= net + +.include +FILESDIR= ${TESTSBASE}/atf_python/sys + +.include Index: tests/atf_python/sys/net/Makefile =================================================================== --- /dev/null +++ tests/atf_python/sys/net/Makefile @@ -0,0 +1,10 @@ +.include + +.PATH: ${.CURDIR} + +FILES= __init__.py rtsock.py vnet.py + +.include +FILESDIR= ${TESTSBASE}/atf_python/sys/net + +.include Index: tests/atf_python/sys/net/rtsock.py =================================================================== --- /dev/null +++ tests/atf_python/sys/net/rtsock.py @@ -0,0 +1,477 @@ +#!/usr/local/bin/python3 +import os +import socket +import struct +import sys +import unittest +from ctypes import c_byte +from ctypes import c_char +from ctypes import c_int +from ctypes import c_uint32 +from ctypes import c_ulong +from ctypes import c_ushort +from ctypes import sizeof +from ctypes import Structure +from typing import Dict +from typing import List +from typing import Optional + + +def roundup2(val: int, num: int) -> int: + if val % num: + return (val | (num - 1)) + 1 + else: + return val + + +class RtConst: + RTM_VERSION = 5 + + AF_INET = socket.AF_INET + AF_INET6 = socket.AF_INET6 + AF_LINK = socket.AF_LINK + + RTA_DST = 0x1 + RTA_GATEWAY = 0x2 + RTA_NETMASK 
= 0x4 + RTA_GENMASK = 0x8 + RTA_IFP = 0x10 + RTA_IFA = 0x20 + RTA_AUTHOR = 0x40 + RTA_BRD = 0x80 + + RTM_ADD = 1 + RTM_DELETE = 2 + RTM_CHANGE = 3 + RTM_GET = 4 + + RTF_UP = 0x1 + RTF_GATEWAY = 0x2 + RTF_HOST = 0x4 + RTF_REJECT = 0x8 + RTF_DYNAMIC = 0x10 + RTF_MODIFIED = 0x20 + RTF_DONE = 0x40 + RTF_XRESOLVE = 0x200 + RTF_LLINFO = 0x400 + RTF_LLDATA = 0x400 + RTF_STATIC = 0x800 + RTF_BLACKHOLE = 0x1000 + RTF_PROTO2 = 0x4000 + RTF_PROTO1 = 0x8000 + RTF_PROTO3 = 0x40000 + RTF_FIXEDMTU = 0x80000 + RTF_PINNED = 0x100000 + RTF_LOCAL = 0x200000 + RTF_BROADCAST = 0x400000 + RTF_MULTICAST = 0x800000 + RTF_STICKY = 0x10000000 + RTF_RNH_LOCKED = 0x40000000 + RTF_GWFLAG_COMPAT = 0x80000000 + + @staticmethod + def get_props(prefix: str) -> List[str]: + return [n for n in dir(RtConst) if n.startswith(prefix)] + + @staticmethod + def get_name(prefix: str, value: int) -> str: + props = RtConst.get_props(prefix) + for prop in props: + if getattr(RtConst, prop) == value: + return prop + return "U:" + str(value) + + @staticmethod + def get_bitmask_map(prefix: str, value: int) -> Dict[int, str]: + props = RtConst.get_props(prefix) + propmap = {getattr(RtConst, prop): prop for prop in props} + v = 1 + ret = {} + while value: + if v & value: + if v in propmap: + ret[v] = propmap[v] + else: + ret[v] = hex(v) + value -= v + v *= 2 + return ret + + @staticmethod + def get_bitmask_str(prefix: str, value: int) -> str: + bmap = RtConst.get_bitmask_map(prefix, value) + return ",".join([v for k, v in bmap.items()]) + + +class RtMetrics(Structure): + _fields_ = [ + ("rmx_locks", c_ulong), + ("rmx_mtu", c_ulong), + ("rmx_hopcount", c_ulong), + ("rmx_expire", c_ulong), + ("rmx_recvpipe", c_ulong), + ("rmx_sendpipe", c_ulong), + ("rmx_ssthresh", c_ulong), + ("rmx_rtt", c_ulong), + ("rmx_rttvar", c_ulong), + ("rmx_pksent", c_ulong), + ("rmx_weight", c_ulong), + ("rmx_nhidx", c_ulong), + ("rmx_filler", c_ulong * 2), + ] + + +class RtMsgHdr(Structure): + _fields_ = [ + ("rtm_msglen", c_ushort), + 
("rtm_version", c_byte), + ("rtm_type", c_byte), + ("rtm_index", c_ushort), + ("_rtm_spare1", c_ushort), + ("rtm_flags", c_int), + ("rtm_addrs", c_int), + ("rtm_pid", c_int), + ("rtm_seq", c_int), + ("rtm_errno", c_int), + ("rtm_fmask", c_int), + ("rtm_inits", c_ulong), + ("rtm_rmx", RtMetrics), + ] + + +class SockaddrIn(Structure): + _fields_ = [ + ("sin_len", c_byte), + ("sin_family", c_byte), + ("sin_port", c_ushort), + ("sin_addr", c_uint32), + ("sin_zero", c_char * 8), + ] + + +class SockaddrIn6(Structure): + _fields_ = [ + ("sin6_len", c_byte), + ("sin6_family", c_byte), + ("sin6_port", c_ushort), + ("sin6_flowinfo", c_uint32), + ("sin6_addr", c_byte * 16), + ("sin6_scope_id", c_uint32), + ] + + +class SockaddrDl(Structure): + _fields_ = [ + ("sdl_len", c_byte), + ("sdl_family", c_byte), + ("sdl_index", c_ushort), + ("sdl_type", c_byte), + ("sdl_nlen", c_byte), + ("sdl_alen", c_byte), + ("sdl_slen", c_byte), + ("sdl_data", c_byte * 8), + ] + + +class SaHelper(object): + @staticmethod + def ip_sa(ip: str) -> bytes: + addr_int = int.from_bytes(socket.inet_pton(2, ip), sys.byteorder) + sin = SockaddrIn(sizeof(SockaddrIn), socket.AF_INET, 0, addr_int) + return bytes(sin) + + @staticmethod + def ip6_sa(ip6: str, scopeid: int) -> bytes: + addr_bytes = socket.inet_pton(socket.AF_INET6, ip6) + sin6 = SockaddrIn6(sizeof(SockaddrIn6), socket.AF_INET6, 0, 0, addr_bytes, 0) + return bytes(sin6) + + @staticmethod + def link_sa(ifindex: int = 0, iftype: int = 0) -> bytes: + sa = SockaddrDl(sizeof(SockaddrDl), socket.AF_LINK, c_ushort(ifindex), iftype) + return bytes(sa) + + +class BaseRtsockMessage(object): + def __init__(self, rtm_type): + self.rtm_type = rtm_type + self.ut = unittest.TestCase() + self.sa = SaHelper() + + def assertEqual(self, a, b, msg=None): + self.ut.assertEqual(a, b, msg) + + def assertNotEqual(self, a, b, msg=None): + self.ut.assertNotEqual(a, b, msg) + + +class RtsockRtMessage(BaseRtsockMessage): + messages = [ + RtConst.RTM_ADD, + 
RtConst.RTM_DELETE, + RtConst.RTM_CHANGE, + RtConst.RTM_GET, + ] + + def __init__(self, rtm_type, rtm_seq=1, dst_sa=None, mask_sa=None): + super().__init__(rtm_type) + self.rtm_flags = 0 + self.rtm_seq = rtm_seq + self._attrs = {} + self.rtm_errno = 0 + self.rtm_pid = 0 + self._orig_data = None + if dst_sa: + self.add_sa_attr(RtConst.RTA_DST, dst_sa) + if mask_sa: + self.add_sa_attr(RtConst.RTA_NETMASK, mask_sa) + + def add_sa_attr(self, attr_type, attr_bytes): + self._attrs[attr_type] = attr_bytes + + def add_ip_attr(self, attr_type, ip: str): + self.add_sa_attr(attr_type, self.sa.ip_sa(ip)) + + def add_ip6_attr(self, attr_type, ip6: str, scopeid: int): + self.add_sa_attr(attr_type, self.sa.ip6_sa(ip6, scopeid)) + + def add_link_attr(self, attr_type, ifindex: Optional[int] = 0): + self.add_sa_attr(attr_type, self.sa.link_sa(ifindex)) + + def print_sa_inet(self, sa: bytes): + if len(sa) < 8: + raise Exception("IPv4 sa size too small: {}".format(len(sa))) + addr = socket.inet_ntop(socket.AF_INET, sa[4:8]) + return "{}".format(addr) + + def print_sa_inet6(self, sa: bytes): + if len(sa) < sizeof(SockaddrIn6): + raise Exception("IPv6 sa size too small: {}".format(len(sa))) + addr = socket.inet_ntop(socket.AF_INET6, sa[8:24]) + scopeid = struct.unpack(">I", sa[24:28]) + return "{} scopeid {}".format(addr, scopeid) + + def print_sa_link(self, sa: bytes, hd: Optional[bool] = True): + if len(sa) < sizeof(SockaddrDl): + raise Exception("LINK sa size too small: {}".format(len(sa))) + sdl = SockaddrDl.from_buffer_copy(sa) + if sdl.sdl_index: + ifindex = "link#{} ".format(sdl.sdl_index) + else: + ifindex = "" + if sdl.sdl_nlen: + iface_offset = 8 + if sdl.sdl_nlen + iface_offset > len(sa): + raise Exception( + "LINK sa sdl_nlen {} > total len {}".format(sdl.sdl_nlen, len(sa)) + ) + ifname = "ifname:{} ".format( + bytes.decode(sa[iface_offset:iface_offset + sdl.sdl_nlen]) + ) + else: + ifname = "" + return "{}{}".format(ifindex, ifname) + + def print_sa_unknown(self, sa: 
bytes): + return "unknown_type:{}".format(sa[1]) + + def print_sa(self, sa: bytes, hd: Optional[bool] = False): + if sa[0] != len(sa): + raise Exception("sa size {} != buffer size {}".format(sa[0], len(sa))) + + if len(sa) < 2: + raise Exception( + "sa type {} too short: {}".format( + RtConst.get_name("AF_", sa[1]), len(sa) + ) + ) + + if sa[1] == socket.AF_INET: + text = self.print_sa_inet(sa) + elif sa[1] == socket.AF_INET6: + text = self.print_sa_inet6(sa) + elif sa[1] == socket.AF_LINK: + text = self.print_sa_link(sa) + else: + text = self.print_sa_unknown(sa) + if hd: + dump = " [{!r}]".format(sa) + else: + dump = "" + return "{}{}".format(text, dump) + + def print_message(self): + # RTM_GET: Report Metrics: len 272, pid: 87839, seq 1, errno 0, flags: + if self._orig_data: + rtm_len = len(self._orig_data) + else: + rtm_len = len(bytes(self)) + print( + "{}: len {}, pid: {}, seq {}, errno {}, flags: <{}>".format( + RtConst.get_name("RTM_", self.rtm_type), + rtm_len, + self.rtm_pid, + self.rtm_seq, + self.rtm_errno, + RtConst.get_bitmask_str("RTF_", self.rtm_flags), + ) + ) + rtm_addrs = sum(list(self._attrs.keys())) + print("Addrs: <{}>".format(RtConst.get_bitmask_str("RTA_", rtm_addrs))) + for attr in sorted(self._attrs.keys()): + sa_data = self.print_sa(self._attrs[attr]) + print(" {}: {}".format(RtConst.get_name("RTA_", attr), sa_data)) + + def verify_sa_inet(self, sa_data): + if len(sa_data) < 8: + raise Exception("IPv4 sa size too small: {}".format(sa_data)) + if sa_data[0] > len(sa_data): + raise Exception( + "IPv4 sin_len too big: {} vs sa size {}: {}".format( + sa_data[0], len(sa_data), sa_data + ) + ) + sin = SockaddrIn.from_buffer_copy(sa_data) + self.assertEqual(sin.sin_port, 0) + assert sin.sin_zero == [0] * 8 + + def compare_sa(self, sa_type, sa_data): + if len(sa_data) < 4: + sa_type_name = RtConst.get_name("RTA_", sa_type) + raise Exception( + "sa_len for type {} too short: {}".format(sa_type_name, len(sa_data)) + ) + our_sa = 
self._attrs[sa_type] + self.assertEqual(len(sa_data), len(our_sa)) + self.assertEqual(our_sa, sa_data) + + def verify(self, rtm_type: int, rtm_sa): + assert self.rtm_type == rtm_type + assert self.rtm_errno == 0 + hdr = RtMsgHdr.from_buffer_copy(self._orig_data) + assert hdr._rtm_spare1 == 0 + for sa_type, sa_data in rtm_sa.items(): + if sa_type not in self._attrs: + sa_type_name = RtConst.get_name("RTA_", sa_type) + raise Exception("SA type {} not present".format(sa_type_name)) + self.compare_sa(sa_type, sa_data) + + @classmethod + def from_bytes(cls, data: bytes): + if len(data) < sizeof(RtMsgHdr): + raise Exception( + "messages size {} is less than expected {}".format( + len(data), sizeof(RtMsgHdr) + ) + ) + hdr = RtMsgHdr.from_buffer_copy(data) + + self = cls(hdr.rtm_type) + self.rtm_flags = hdr.rtm_flags + self.rtm_seq = hdr.rtm_seq + self.rtm_errno = hdr.rtm_errno + self.rtm_pid = hdr.rtm_pid + self._orig_data = data + + off = sizeof(RtMsgHdr) + v = 1 + addrs_mask = hdr.rtm_addrs + while addrs_mask: + if addrs_mask & v: + addrs_mask -= v + + if off + data[off] > len(data): + raise Exception( + "SA sizeof for {} > total message length: {}+{} > {}".format( + RtConst.get_name("RTA_", v), off, data[off], len(data) + ) + ) + self._attrs[v] = data[off:off + data[off]] + off += roundup2(data[off], 4) + v *= 2 + return self + + def __bytes__(self): + sz = sizeof(RtMsgHdr) + addrs_mask = 0 + for k, v in self._attrs.items(): + sz += roundup2(len(v), 4) + addrs_mask += k + hdr = RtMsgHdr( + rtm_msglen=sz, + rtm_version=RtConst.RTM_VERSION, + rtm_type=self.rtm_type, + rtm_flags=self.rtm_flags, + rtm_seq=self.rtm_seq, + rtm_addrs=addrs_mask, + ) + buf = bytearray(sz) + buf[0:sizeof(RtMsgHdr)] = hdr + off = sizeof(RtMsgHdr) + for attr in sorted(self._attrs.keys()): + v = self._attrs[attr] + sa_len = len(v) + buf[off:off + sa_len] = v + off += roundup2(len(v), 4) + return bytes(buf) + + +class Rtsock: + def __init__(self): + self.socket = self._setup_rtsock() + self.rtm_seq 
= 1 + self.msgmap = self.build_msgmap() + + def build_msgmap(self): + classes = [RtsockRtMessage] + xmap = {} + for cls in classes: + for message in cls.messages: + xmap[message] = cls + return xmap + + def get_seq(self): + ret = self.rtm_seq + self.rtm_seq += 1 + return ret + + def _setup_rtsock(self) -> socket.socket: + s = socket.socket(socket.AF_ROUTE, socket.SOCK_RAW, socket.AF_UNSPEC) + s.setsockopt(socket.SOL_SOCKET, socket.SO_USELOOPBACK, 1) + return s + + def write_message(self, msg): + print("vvvvvvvv OUT vvvvvvvv") + msg.print_message() + msg_bytes = bytes(msg) + try: + ret = os.write(self.socket.fileno(), msg_bytes) + if ret != -1: + assert ret == len(msg_bytes) + except Exception as e: + print("write({}) -> {}".format(len(msg_bytes), e)) + + def parse_message(self, data: bytes): + if len(data) < 4: + raise Exception("Short read from rtsock: {} bytes".format(len(data))) + rtm_type = data[3] + if rtm_type not in self.msgmap: + return None + + def write_data(self, data: bytes): + self.socket.send(data) + + def read_data(self, seq: Optional[int] = None) -> bytes: + while True: + data = self.socket.recv(4096) + if seq is None: + break + if len(data) >= sizeof(RtMsgHdr): + hdr = RtMsgHdr.from_buffer_copy(data) + if hdr.rtm_seq == seq: + break + return data + + def read_message(self) -> bytes: + data = self.read_data() + return self.parse_message(data) Index: tests/atf_python/sys/net/vnet.py =================================================================== --- /dev/null +++ tests/atf_python/sys/net/vnet.py @@ -0,0 +1,203 @@ +#!/usr/local/bin/python3 +import os +import socket +import time +from ctypes import cdll +from ctypes import get_errno +from ctypes.util import find_library +from typing import List +from typing import Optional + + +def run_cmd(cmd: str) -> str: + print("run: '{}'".format(cmd)) + return os.popen(cmd).read() + + +class VnetInterface(object): + INTERFACES_FNAME = "created_interfaces.lst" + + # defines from net/if_types.h + 
IFT_LOOP = 0x18 + IFT_ETHER = 0x06 + + def __init__(self, iface_name: str): + self.name = iface_name + self.vnet_name = "" + self.jailed = False + if iface_name.startswith("lo"): + self.iftype = self.IFT_LOOP + else: + self.iftype = self.IFT_ETHER + + @property + def ifindex(self): + return socket.if_nametoindex(self.name) + + def set_vnet(self, vnet_name: str): + self.vnet_name = vnet_name + + def set_jailed(self, jailed: bool): + self.jailed = jailed + + def run_cmd(self, cmd): + if self.vnet_name and not self.jailed: + cmd = "jexec {} {}".format(self.vnet_name, cmd) + run_cmd(cmd) + + @staticmethod + def file_append_line(line): + with open(VnetInterface.INTERFACES_FNAME, "a") as f: + f.write(line + "\n") + + @classmethod + def create_iface(cls, iface_name: str): + name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip() + if not name: + raise Exception("Unable to create iface {}".format(iface_name)) + cls.file_append_line(name) + if name.startswith("epair"): + cls.file_append_line(name[:-1] + "b") + return cls(name) + + @staticmethod + def cleanup_ifaces(): + try: + with open(VnetInterface.INTERFACES_FNAME, "r") as f: + for line in f: + run_cmd("/sbin/ifconfig {} destroy".format(line.strip())) + os.unlink(VnetInterface.INTERFACES_FNAME) + except Exception: + pass + + def setup_addr(self, addr: str): + if ":" in addr: + family = "inet6" + else: + family = "inet" + cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr) + self.run_cmd(cmd) + + def delete_addr(self, addr: str): + if ":" in addr: + cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr) + else: + cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr) + self.run_cmd(cmd) + + def turn_up(self): + cmd = "/sbin/ifconfig {} up".format(self.name) + self.run_cmd(cmd) + + def enable_ipv6(self): + cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name) + self.run_cmd(cmd) + + +class VnetInstance(object): + JAILS_FNAME = "created_jails.lst" + + def __init__(self, 
vnet_name: str, jid: int, ifaces: List[VnetInterface]): + self.name = vnet_name + self.jid = jid + self.ifaces = ifaces + for iface in ifaces: + iface.set_vnet(vnet_name) + iface.set_jailed(True) + + def run_vnet_cmd(self, cmd): + if self.name: + cmd = "jexec {} {}".format(self.name, cmd) + return run_cmd(cmd) + + @staticmethod + def wait_interface(vnet_name: str, iface_name: str): + cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name) + for i in range(50): + ifaces = run_cmd(cmd).strip().split(" ") + if iface_name in ifaces: + return True + time.sleep(0.1) + return False + + @staticmethod + def file_append_line(line): + with open(VnetInstance.JAILS_FNAME, "a") as f: + f.write(line + "\n") + + @staticmethod + def cleanup_vnets(): + try: + with open(VnetInstance.JAILS_FNAME) as f: + for line in f: + run_cmd("/usr/sbin/jail -r {}".format(line.strip())) + os.unlink(VnetInstance.JAILS_FNAME) + except Exception: + pass + + @classmethod + def create_with_interfaces(cls, vnet_name: str, ifaces: List[VnetInterface]): + iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces]) + cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format( + vnet_name, iface_cmds + ) + jid_str = run_cmd(cmd) + jid = int(jid_str) + if jid <= 0: + raise Exception("Jail creation failed, output: {}".format(jid_str)) + cls.file_append_line(vnet_name) + + for iface in ifaces: + if cls.wait_interface(vnet_name, iface.name): + continue + raise Exception( + "Interface {} has not appeared in vnet {}".format(iface.name, vnet_name) + ) + return cls(vnet_name, jid, ifaces) + + @staticmethod + def attach_jid(jid: int): + _path: Optional[str] = find_library("c") + if _path is None: + raise Exception("libc not found") + path: str = _path + libc = cdll.LoadLibrary(path) + if libc.jail_attach(jid) != 0: + raise Exception("jail_attach() failed: errno {}".format(get_errno())) + + def attach(self): + self.attach_jid(self.jid) + + +class SingleVnetTestTemplate(object): + num_epairs = 1 + 
IPV6_PREFIXES = [] + IPV4_PREFIXES = [] + + def setup_method(self, method): + test_name = method.__name__ + vnet_name = "jail_{}".format(test_name) + ifaces = [] + for i in range(self.num_epairs): + ifaces.append(VnetInterface.create_iface("epair")) + self.vnet = VnetInstance.create_with_interfaces(vnet_name, ifaces) + self.vnet.attach() + for i, addr in enumerate(self.IPV6_PREFIXES): + if addr: + iface = self.vnet.ifaces[i] + iface.turn_up() + iface.enable_ipv6() + iface.setup_addr(addr) + for i, addr in enumerate(self.IPV4_PREFIXES): + if addr: + iface = self.vnet.ifaces[i] + iface.turn_up() + iface.setup_addr(addr) + + def cleanup(self, nodeid: str): + print("==== vnet cleanup ===") + VnetInstance.cleanup_vnets() + VnetInterface.cleanup_ifaces() + + def run_cmd(self, cmd: str) -> str: + return os.popen(cmd).read() Index: tests/conftest.py =================================================================== --- /dev/null +++ tests/conftest.py @@ -0,0 +1,122 @@ +import pytest + +from atf_python.atf_pytest import ATFHandler + + +PLUGIN_ENABLED = False +DEFAULT_HANDLER = None + + +def get_handler(): + global DEFAULT_HANDLER + if DEFAULT_HANDLER is None: + DEFAULT_HANDLER = ATFHandler() + return DEFAULT_HANDLER + + +def pytest_addoption(parser): + """Add file output""" + # Add meta-values + group = parser.getgroup("general", "Running and selection options") + group.addoption("--atf-var", dest="atf_vars", action="append", default=[]) + group.addoption( + "--atf-source-dir", + type=str, + dest="atf_source_dir", + help="Path to the test source directory", + ) + group.addoption( + "--atf-cleanup", + default=False, + action="store_true", + dest="atf_cleanup", + help="Call cleanup procedure for a given test", + ) + group = parser.getgroup("terminal reporting", "reporting", after="general") + group.addoption( + "--atf", + default=False, + action="store_true", + help="Enable test listing/results output in atf format", + ) + group.addoption( + "--atf-file", + type=str, + 
dest="atf_file", + help="Path to the status file provided by atf runtime", + ) + + +@pytest.mark.trylast +def pytest_configure(config): + if config.option.help: + return + + # Register markings anyway to avoid warnings + config.addinivalue_line("markers", "require_user(name): user to run the test with") + config.addinivalue_line( + "markers", "require_arch(names): List[str] of support archs" + ) + # config.addinivalue_line("markers", "require_config(config): List[Tuple[str,Any]] of k=v pairs") + config.addinivalue_line( + "markers", "require_diskspace(amount): str with required diskspace" + ) + config.addinivalue_line( + "markers", "require_files(space): List[str] with file paths" + ) + config.addinivalue_line( + "markers", "require_machine(names): List[str] of support machine types" + ) + config.addinivalue_line( + "markers", "require_memory(amount): str with required memory" + ) + config.addinivalue_line( + "markers", "require_progs(space): List[str] with file paths" + ) + config.addinivalue_line( + "markers", "timeout(dur): int/float with max duration in sec" + ) + + global PLUGIN_ENABLED + PLUGIN_ENABLED = config.option.atf + if not PLUGIN_ENABLED: + return + get_handler() + + if config.option.collectonly: + # Need to output list of tests to stdout, hence override + # standard reporter plugin + reporter = config.pluginmanager.getplugin("terminalreporter") + if reporter: + config.pluginmanager.unregister(reporter) + + +def pytest_collection_modifyitems(session, config, items): + """If cleanup is requested, replace collected tests with their cleanups (if any)""" + if PLUGIN_ENABLED and config.option.atf_cleanup: + new_items = [] + handler = get_handler() + for obj in items: + if handler.has_object_cleanup(obj): + handler.override_runtest(obj) + new_items.append(obj) + items.clear() + items.extend(new_items) + + +def pytest_collection_finish(session): + if PLUGIN_ENABLED and session.config.option.collectonly: + handler = get_handler() + 
handler.list_tests(session.items) + + +def pytest_runtest_logreport(report): + if PLUGIN_ENABLED: + handler = get_handler() + handler.add_report(report) + + +def pytest_unconfigure(config): + if PLUGIN_ENABLED and config.option.atf_file: + handler = get_handler() + handler.write_report(config.option.atf_file) Index: tests/freebsd_test_suite/Makefile =================================================================== --- /dev/null +++ tests/freebsd_test_suite/Makefile @@ -0,0 +1,11 @@ +.include + +PACKAGE=tests +PROG= atf_pytest_wrapper +MAN= +BINDIR= + +.include +DESTDIR=${TESTSBASE} + +.include Index: tests/freebsd_test_suite/atf_pytest_wrapper.c =================================================================== --- /dev/null +++ tests/freebsd_test_suite/atf_pytest_wrapper.c @@ -0,0 +1,138 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + + +int +main(int argc, char **argv) +{ + char *src_dir = NULL, *dst_file = NULL, *python_path = NULL; + bool FLAG_list = false, FLAG_debug = false; + + if (FLAG_debug) { + fprintf(stderr, "IN: "); + for (int i = 0; i < argc; i++) { + fprintf(stderr, "'%s' ", argv[i]); + } + fprintf(stderr, "\n"); + } + + // atf_wrap ["-S pythonpath"] path_to_script -l + // atf_wrap path_to_script -r /path1 -s /path2 -v k1=v1 -v k2=v2 test_name + // + // skip binary name + argc--; argv++; + // parse "-S path" from kernel + if (argc > 0 && !strncmp(argv[0], "-S ", 3)) { + python_path = &argv[0][3]; + argc--; argv++; + } + + if (argc < 2) + exit(1); + char *script_path = argv[0]; + + // allocate storage for kv pairs + char **kv = calloc(8, sizeof(char *)); + int kv_num = 0, kv_size = 8; + + int c; + while ((c = getopt(argc, argv, "lr:s:v:")) != -1) { + switch (c) { + case 'l': + FLAG_list = true; + break; + case 's': + src_dir = optarg; + break; + case 'r': + dst_file = optarg; + break; + case 'v': + if (kv_num == kv_size) { + kv_size *= 2; + kv = realloc(kv, kv_size * sizeof(char *)); + } + kv[kv_num++] = 
optarg; + break; + } + } + argc -= optind; + argv += optind; + + if (python_path != NULL) { + char *current_path = getenv("PYTHONPATH"); + if (current_path != NULL) { + int total_len = strlen(python_path) + strlen(current_path) + 2; + char *new_path = malloc(total_len); + snprintf(new_path, total_len, "%s:%s", python_path, current_path); + python_path = new_path; + } + setenv("PYTHONPATH", python_path, 1); + } + + int idx = 0, num_args = 20 + 2 * kv_num; + char **xargs = malloc(num_args * sizeof(char *)); + + xargs[idx++] = strdup("pytest"); + xargs[idx++] = strdup("-p"); + xargs[idx++] = strdup("no:cacheprovider"); + xargs[idx++] = strdup("-s"); + xargs[idx++] = strdup("--atf"); + + if (FLAG_list) { + // pytest -p no:cacheprovider --atf --co ${path} + xargs[idx++] = strdup("--co"); + xargs[idx++] = script_path; + xargs[idx++] = NULL; + } else { + /* One argument has to be ready */ + if (argc != 1) + errx(1, "wrong number of argument"); + + bool FLAG_cleanup = false; + char *test_name = *argv; + int tlen = strlen(test_name); + // len(":cleanup") = 8 + if (tlen >= 8 && strcmp(&test_name[tlen - 8], ":cleanup") == 0) { + test_name[tlen - 8] = '\0'; + FLAG_cleanup = true; + } + char test_id[1024]; + snprintf(test_id, sizeof(test_id), "%s::%s", script_path, test_name); + + /* pytest -p no:cacheprovider --atf --atf-file /path /path/to/test::test_name */ + if (FLAG_cleanup) + xargs[idx++] = strdup("--atf-cleanup"); + if (src_dir != NULL) { + xargs[idx++] = strdup("--atf-source-dir"); + xargs[idx++] = src_dir; + } + if (dst_file != NULL) { + xargs[idx++] = strdup("--atf-file"); + xargs[idx++] = dst_file; + } + for (int i = 0; i < kv_num; i++) { + xargs[idx++] = strdup("--atf-var"); + xargs[idx++] = kv[i]; + } + xargs[idx++] = test_id; + xargs[idx++] = NULL; + } + if (FLAG_debug) { + fprintf(stderr, "OUT: "); + for (int i = 0; xargs[i] != NULL; i++) { + xargs[i] = strdup(xargs[i]); + fprintf(stderr, "'%s' ", xargs[i]); + } + fprintf(stderr, "\n"); + } + execvp(xargs[0], 
(char **)xargs); + return (0); +} Index: tests/sys/net/routing/Makefile =================================================================== --- tests/sys/net/routing/Makefile +++ tests/sys/net/routing/Makefile @@ -7,6 +7,7 @@ ATF_TESTS_C += test_rtsock_l3 ATF_TESTS_C += test_rtsock_lladdr +ATF_TESTS_PYTEST = test_pytest.py test_rtsock_multipath.py ${PACKAGE}FILES+= generic_cleanup.sh ${PACKAGE}FILESMODE_generic_cleanup.sh=0555 Index: tests/sys/net/routing/test_pytest.py =================================================================== --- /dev/null +++ tests/sys/net/routing/test_pytest.py @@ -0,0 +1,93 @@ +#!/home/melifaro/atf_wrapper +import pytest + + +@pytest.mark.require_user("root") +@pytest.mark.require_arch(["amd64", "i386"]) +# @pytest.mark.require_diskspace("100M") +@pytest.mark.require_files(["/bin/ls", "/bin/test"]) +@pytest.mark.require_machine(["amd64", "i386"]) +@pytest.mark.require_memory("100M") +@pytest.mark.require_progs(["/bin/ls", "/bin/test"]) +@pytest.mark.timeout("299") +def test_test0_param_check(): + pass + + +@pytest.mark.parametrize("test_input,expected", [("a", "a"), ("bb", "bb")]) +def test_test1(test_input, expected): + """Description 1""" + assert test_input == expected + + +@pytest.mark.require_user("root") +def test_test2(): + """Description 2""" + print("TEST2 -> STDOUT MESSAGE") + print("TEST2 -> STDERR MESSAGE", file=sys.stderr) + assert True + + +def test_test2_cleanup(): + print("CLEANUP CALLED") + + +@pytest.mark.skip(reason="unable to test") +def test_test3(): + """Description 2""" + assert True + + +@pytest.mark.skip() +def test_test31(): + """Description 2""" + assert True + + +def test_test4(): + """Description 2""" + pytest.skip("because I can") + + +@pytest.mark.xfail +def test_test5_marked_xfail_xfailed(): + """Description 2""" + pytest.xfail("because I can") + + +@pytest.mark.xfail +def test_test6_marked_xfail_passed(): + """Description 2""" + pass + + +@pytest.mark.xfail(strict=True) +def 
test_test7_marked_xfail_strict_xfailed(): + """Description 2""" + pytest.xfail("because I can") + pass + + +@pytest.mark.xfail(strict=True) +def test_test8_marked_xfail_strict_xfailed(): + """Description 2""" + pass + + +def test_test9_call_failed(): + """Description 1""" + assert True == False + + +class TestClass: + def test_first(self): + """Dummy description""" + print() + print("stdout check") + + def test_second(self): + pass + + def cleanup(self, nodeid: str): + test_name = nodeid.split("::", 1)[1] + print("CLASS CLEANUP called for {}".format(test_name)) Index: tests/sys/net/routing/test_rtsock_multipath.py =================================================================== --- /dev/null +++ tests/sys/net/routing/test_rtsock_multipath.py @@ -0,0 +1,83 @@ +#!/usr/tests/atf_pytest_wrapper -S /usr/tests +import pytest +from atf_python.sys.net.rtsock import RtConst +from atf_python.sys.net.rtsock import Rtsock +from atf_python.sys.net.rtsock import RtsockRtMessage +from atf_python.sys.net.rtsock import SaHelper +from atf_python.sys.net.vnet import SingleVnetTestTemplate +from atf_python.sys.net.vnet import VnetInstance + + +class BaseIPv4RoutingTestTemplate(SingleVnetTestTemplate): + IPV4_PREFIXES = ["192.0.2.1/24"] + + def setup_method(self, method_name): + super().setup_method(method_name) + self.rtsock = Rtsock() + + +class BaseIPv6RoutingTestTemplate(SingleVnetTestTemplate): + IPV6_PREFIXES = ["2001:DB8::1/64"] + + def setup_method(self, method): + super().setup_method(method) + self.rtsock = Rtsock() + + +class TestRtmGetv4ExactSuccess(BaseIPv4RoutingTestTemplate): + @pytest.mark.require_user("root") + @pytest.mark.require_arch(["amd64", "i386"]) + # @pytest.mark.require_diskspace("100M") + @pytest.mark.require_files(["/bin/ls", "/bin/test"]) + @pytest.mark.require_machine(["amd64", "i386"]) + @pytest.mark.require_memory("100M") + @pytest.mark.require_progs(["/bin/ls", "/bin/test"]) + @pytest.mark.timeout("299") + def test_rtm_get_v4_exact_success(self): + 
"""Tests RTM_GET with exact prefix lookup on an interface prefix""" + sa = SaHelper() + msg = RtsockRtMessage( + RtConst.RTM_GET, + self.rtsock.get_seq(), + sa.ip_sa("192.0.2.0"), + sa.ip_sa("255.255.255.0"), + ) + self.rtsock.write_message(msg) + + iface = self.vnet.ifaces[0] + desired_sa = { + RtConst.RTA_DST: sa.ip_sa("192.0.2.0"), + RtConst.RTA_NETMASK: sa.ip_sa("255.255.255.0"), + RtConst.RTA_GATEWAY: sa.link_sa(ifindex=iface.ifindex, iftype=iface.iftype), + } + + data = self.rtsock.read_data(msg.rtm_seq) + msg = RtsockRtMessage.from_bytes(data) + print("vvvvvvvv IN vvvvvvvv") + msg.print_message() + msg.verify(RtConst.RTM_GET, desired_sa) + + +class TestAddRouteWithRta(BaseIPv4RoutingTestTemplate): + def test_add_route_with_rta(self): + sa = SaHelper() + msg = RtsockRtMessage( + RtConst.RTM_GET, + self.rtsock.get_seq(), + sa.ip_sa("192.0.2.0"), + sa.ip_sa("255.255.255.0"), + ) + self.rtsock.write_message(msg) + + iface = self.vnet.ifaces[0] + desired_sa = { + RtConst.RTA_DST: sa.ip_sa("192.0.2.0"), + RtConst.RTA_NETMASK: sa.ip_sa("255.255.255.0"), + RtConst.RTA_GATEWAY: sa.link_sa(ifindex=iface.ifindex, iftype=iface.iftype), + } + + data = self.rtsock.read_data(msg.rtm_seq) + msg = RtsockRtMessage.from_bytes(data) + print("vvvvvvvv IN vvvvvvvv") + msg.print_message() + msg.verify(RtConst.RTM_GET, desired_sa)