diff --git a/etc/mtree/BSD.tests.dist b/etc/mtree/BSD.tests.dist --- a/etc/mtree/BSD.tests.dist +++ b/etc/mtree/BSD.tests.dist @@ -450,6 +450,8 @@ .. ifconfig .. + ipfw + .. md5 .. mdconfig diff --git a/sbin/ipfw/ipfw2.h b/sbin/ipfw/ipfw2.h --- a/sbin/ipfw/ipfw2.h +++ b/sbin/ipfw/ipfw2.h @@ -48,6 +48,7 @@ int test_only; /* only check syntax */ int comment_only; /* only print action and comment */ int verbose; /* be verbose on some commands */ + int debug_only; /* output ioctl i/o on stdout */ /* The options below can have multiple values. */ diff --git a/sbin/ipfw/ipfw2.c b/sbin/ipfw/ipfw2.c --- a/sbin/ipfw/ipfw2.c +++ b/sbin/ipfw/ipfw2.c @@ -587,6 +587,13 @@ return (strcmp(a, b)); } +struct debug_header { + uint16_t cmd_type; + uint16_t spare1; + uint32_t opt_name; + uint32_t total_len; + uint32_t spare2; +}; /* * conditionally runs the command. @@ -597,8 +604,18 @@ { int i; + if (g_co.debug_only) { + struct debug_header dbg = { + .cmd_type = 1, + .opt_name = optname, + .total_len = optlen + sizeof(struct debug_header), + }; + write(1, &dbg, sizeof(dbg)); + write(1, optval, optlen); + } + if (g_co.test_only) - return 0; + return (0); if (ipfw_socket == -1) ipfw_socket = socket(AF_INET, SOCK_RAW, IPPROTO_RAW); @@ -617,7 +634,7 @@ } else { i = setsockopt(ipfw_socket, IPPROTO_IP, optname, optval, optlen); } - return i; + return (i); } /* @@ -634,6 +651,18 @@ do_set3(int optname, ip_fw3_opheader *op3, size_t optlen) { + op3->opcode = optname; + + if (g_co.debug_only) { + struct debug_header dbg = { + .cmd_type = 2, + .opt_name = optname, + .total_len = optlen, sizeof(struct debug_header), + }; + write(1, &dbg, sizeof(dbg)); + write(1, op3, optlen); + } + if (g_co.test_only) return (0); @@ -642,7 +671,6 @@ if (ipfw_socket < 0) err(EX_UNAVAILABLE, "socket"); - op3->opcode = optname; return (setsockopt(ipfw_socket, IPPROTO_IP, IP_FW3, op3, optlen)); } @@ -663,6 +691,18 @@ int error; socklen_t len; + op3->opcode = optname; + + if (g_co.debug_only) { + struct debug_header dbg = { + .cmd_type = 3, + .opt_name = optname, + .total_len = *optlen + sizeof(struct debug_header), + }; + write(1, &dbg, sizeof(dbg)); + write(1, op3, *optlen); + } + if (g_co.test_only) return (0); @@ -671,7 +711,6 @@ if (ipfw_socket < 0) err(EX_UNAVAILABLE, "socket"); - op3->opcode = optname; len = *optlen; error = getsockopt(ipfw_socket, IPPROTO_IP, IP_FW3, op3, &len); diff --git a/sbin/ipfw/main.c b/sbin/ipfw/main.c --- a/sbin/ipfw/main.c +++ b/sbin/ipfw/main.c @@ -277,7 +277,7 @@ optind = optreset = 1; /* restart getopt() */ if (is_ipfw()) { - while ((ch = getopt(ac, av, "abcdDefhinNp:qs:STtv")) != -1) + while ((ch = getopt(ac, av, "abcdDefhinNp:qs:STtvx")) != -1) switch (ch) { case 'a': do_acct = 1; @@ -354,6 +354,10 @@ g_co.verbose = 1; break; + case 'x': /* debug output */ + g_co.debug_only = 1; + break; + default: free(save_av); return 1; diff --git a/sbin/ipfw/tests/Makefile b/sbin/ipfw/tests/Makefile new file mode 100644 --- /dev/null +++ b/sbin/ipfw/tests/Makefile @@ -0,0 +1,5 @@ +PACKAGE= tests + +ATF_TESTS_PYTEST+= test_add_rule.py + +.include diff --git a/sbin/ipfw/tests/test_add_rule.py b/sbin/ipfw/tests/test_add_rule.py new file mode 100755 --- /dev/null +++ b/sbin/ipfw/tests/test_add_rule.py @@ -0,0 +1,282 @@ +import errno +import json +import os +import socket +import struct +import subprocess +import sys +from ctypes import c_byte +from ctypes import c_char +from ctypes import c_int +from ctypes import c_long +from ctypes import c_uint32 +from ctypes import c_uint8 +from ctypes import c_ulong +from ctypes import c_ushort +from ctypes import sizeof +from ctypes import Structure +from enum import Enum +from typing import Any +from typing import Dict +from typing import List +from typing import NamedTuple +from typing import Optional +from typing import Union + +import pytest +from atf_python.sys.netpfil.ipfw.insns import Insn +from atf_python.sys.netpfil.ipfw.insns import InsnEmpty +from atf_python.sys.netpfil.ipfw.insns import InsnIp +from atf_python.sys.netpfil.ipfw.insns import InsnIp6 +from atf_python.sys.netpfil.ipfw.insns import InsnPorts +from atf_python.sys.netpfil.ipfw.insns import InsnProb +from atf_python.sys.netpfil.ipfw.insns import InsnProto +from atf_python.sys.netpfil.ipfw.insns import InsnTable +from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode +from atf_python.sys.netpfil.ipfw.ioctl import CTlv +from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule +from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType +from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule +from atf_python.sys.netpfil.ipfw.ioctl import NTlv +from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType +from atf_python.sys.netpfil.ipfw.ioctl import RawRule +from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader +from atf_python.sys.netpfil.ipfw.utils import enum_from_int +from atf_python.utils import BaseTest + +IPFW_PATH = "/usr/obj/usr/home/melifaro/free/head/amd64.amd64/sbin/ipfw/ipfw" + + +def differ(w_obj, g_obj, w_stack=[], g_stack=[]): + if bytes(w_obj) == bytes(g_obj): + return True + num_objects = 0 + for i, w_child in enumerate(w_obj.obj_list): + if i > len(g_obj.obj_list): + print("MISSING object from chain {}".format(" / ".join(w_stack))) + w_child.print_obj() + print("==========================") + return False + g_child = g_obj.obj_list[i] + if bytes(w_child) == bytes(g_child): + num_objects += 1 + continue + w_stack.append(w_obj.obj_name) + g_stack.append(g_obj.obj_name) + if not differ(w_child, g_child, w_stack, g_stack): + return False + break + if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list): + g_child = g_obj.obj_list[num_objects] + print("EXTRA object from chain {}".format(" / ".join(g_stack))) + g_child.print_obj() + print("==========================") + return False + print("OBJECTS DIFFER") + print("WANTED CHAIN: {}".format(" / ".join(w_stack))) + w_obj.print_obj() + print("==========================") + print("GOT CHAIN: {}".format(" / ".join(g_stack))) + g_obj.print_obj() + print("==========================") + return False + + +class TestAddRule(BaseTest): + def compile_rule(self, out): + tlvs = [] + if "objs" in out: + tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"])) + rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"]) + tlvs.append(CTlvRule(obj_list=[rule])) + return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs) + + def verify_rule(self, in_data: str, out_data): + # Prepare the desired output + expected = self.compile_rule(out_data) + + reader = DebugIoReader(IPFW_PATH) + ioctls = reader.get_records(in_data) + assert len(ioctls) == 1 # Only 1 ioctl request expected + got = ioctls[0] + + if not differ(expected, got): + print("=> CMD: {}".format(in_data)) + print("=> WANTED:") + expected.print_obj() + print("==========================") + print("=> GOT:") + got.print_obj() + print("==========================") + assert bytes(got) == bytes(expected) + + @pytest.mark.parametrize( + "rule", + [ + pytest.param( + { + "in": "add 200 allow ip from any to any", + "out": { + "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)], + "rulenum": 200, + }, + }, + id="test_rulenum", + ), + pytest.param( + { + "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any", + "out": { + "insns": [ + InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True), + InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"), + InsnEmpty(IpFwOpcode.O_ACCEPT), + ], + }, + }, + id="test_or", + ), + pytest.param( + { + "in": "add allow ip from table(AAA) to table(BBB)", + "out": { + "objs": [ + NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"), + NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"), + ], + "insns": [ + InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1), + InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2), + InsnEmpty(IpFwOpcode.O_ACCEPT), + ], + }, + }, + id="test_tables", + ), + pytest.param( + { + "in": "add allow ip from any to any keep-state :default", + "out": { + "objs": [ + NTlv( + IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name=":default" + ), + ], + "insns": [ + Insn(IpFwOpcode.O_CHECK_STATE, arg1=1), + Insn(IpFwOpcode.O_KEEP_STATE, arg1=1), + InsnEmpty(IpFwOpcode.O_ACCEPT), + ], + }, + }, + id="test_keep_state", + ), + ], + ) + def test_add_rule(self, rule): + """Tests if the compiled rule is sane and matches the spec""" + self.verify_rule(rule["in"], rule["out"]) + + @pytest.mark.parametrize( + "insn", + [ + pytest.param( + { + "in": "add prob 0.7 allow ip from any to any", + "out": InsnProb(prob=0.7), + }, + id="test_prob", + ), + pytest.param( + { + "in": "add allow tcp from any to any", + "out": InsnProto(arg1=6), + }, + id="test_proto", + ), + pytest.param( + { + "in": "add allow ip from any to any 57", + "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]), + }, + id="test_ports", + ), + ], + ) + def test_add_single_instruction(self, insn): + """Tests if the compiled rule is sane and matches the spec""" + + # Prepare the desired output + out = { + "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)], + } + self.verify_rule(insn["in"], out) + + @pytest.mark.parametrize( + "opcode", + [ + pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"), + pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"), + ], + ) + @pytest.mark.parametrize( + "params", + [ + pytest.param( + { + "in": "57", + "out": [(57, 57)], + }, + id="test_single", + ), + pytest.param( + { + "in": "57-59", + "out": [(57, 59)], + }, + id="test_range", + ), + pytest.param( + { + "in": "57-59,41", + "out": [(57, 59), (41, 41)], + }, + id="test_ranges", + ), + ], + ) + def test_add_ports(self, params, opcode): + if opcode == IpFwOpcode.O_IP_DSTPORT: + txt = "add allow ip from any to any " + params["in"] + else: + txt = "add allow ip from any " + params["in"] + " to any" + out = { + "insns": [ + InsnPorts(opcode, port_pairs=params["out"]), + InsnEmpty(IpFwOpcode.O_ACCEPT), + ] + } + self.verify_rule(txt, out) + + +tests = [ + { + "in": "add 200 prob 0.7 allow tcp from 2a02::1 to { table(AAA) or table(BBB) } 57", + "out": { + "objs": [ + NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"), + NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"), + ], + "insns": [ + InsnProb(prob=0.7), + InsnProto(arg1=6), + InsnIp6(IpFwOpcode.O_IP6_SRC, ip6="2a02::1"), + InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=1, is_or=True), + InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2), + InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]), + InsnEmpty(IpFwOpcode.O_ACCEPT), + ], + "rulenum": 200, + }, + }, +] diff --git a/tests/atf_python/sys/Makefile b/tests/atf_python/sys/Makefile --- a/tests/atf_python/sys/Makefile +++ b/tests/atf_python/sys/Makefile @@ -3,7 +3,7 @@ .PATH: ${.CURDIR} FILES= __init__.py -SUBDIR= net netlink +SUBDIR= net netlink netpfil .include FILESDIR= ${TESTSBASE}/atf_python/sys diff --git a/tests/atf_python/sys/Makefile b/tests/atf_python/sys/netpfil/Makefile copy from tests/atf_python/sys/Makefile copy to tests/atf_python/sys/netpfil/Makefile --- a/tests/atf_python/sys/Makefile +++ b/tests/atf_python/sys/netpfil/Makefile @@ -3,9 +3,9 @@ .PATH: ${.CURDIR} FILES= __init__.py -SUBDIR= net netlink +SUBDIR= ipfw .include -FILESDIR= ${TESTSBASE}/atf_python/sys +FILESDIR= ${TESTSBASE}/atf_python/sys/netpfil .include diff --git a/tests/atf_python/sys/netpfil/__init__.py b/tests/atf_python/sys/netpfil/__init__.py new file mode 100644 diff --git a/tests/atf_python/sys/netpfil/ipfw/Makefile b/tests/atf_python/sys/netpfil/ipfw/Makefile new file mode 100644 --- /dev/null +++ b/tests/atf_python/sys/netpfil/ipfw/Makefile @@ -0,0 +1,11 @@ +.include + +.PATH: ${.CURDIR} + +FILES= __init__.py insns.py ioctl.py ipfw.py utils.py + +.include +FILESDIR= ${TESTSBASE}/atf_python/sys/netpfil/ipfw + +.include + diff --git a/tests/atf_python/sys/netpfil/ipfw/__init__.py b/tests/atf_python/sys/netpfil/ipfw/__init__.py new file mode 100644 diff --git a/tests/atf_python/sys/netpfil/ipfw/insns.py b/tests/atf_python/sys/netpfil/ipfw/insns.py new file mode 100644 --- /dev/null +++ b/tests/atf_python/sys/netpfil/ipfw/insns.py @@ -0,0 +1,551 @@ +#!/usr/bin/env python3 +import os +import socket +import struct +import subprocess +import sys +from ctypes import c_byte +from ctypes import c_char +from ctypes import c_int +from ctypes import c_long +from ctypes import c_uint32 +from ctypes import c_uint8 +from ctypes import c_ulong +from ctypes import c_ushort +from ctypes import sizeof +from ctypes import Structure +from enum import Enum +from typing import Any +from typing import Dict +from typing import List +from typing import NamedTuple +from typing import Optional +from typing import Union + +from atf_python.sys.netpfil.ipfw.utils import AttrDescr +from atf_python.sys.netpfil.ipfw.utils import enum_from_int +from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map + + +class IpFwOpcode(Enum): + O_NOP = 0 + + O_IP_SRC = 1 # u32 = IP + O_IP_SRC_MASK = 2 # ip = IP/mask + O_IP_SRC_ME = 3 # none + O_IP_SRC_SET = 4 # u32=base, arg1=len, bitmap + O_IP_DST = 5 # u32 = IP + O_IP_DST_MASK = 6 # ip = IP/mask + O_IP_DST_ME = 7 # none + O_IP_DST_SET = 8 # u32=base, arg1=len, bitmap + O_IP_SRCPORT = 9 # (n)port list:mask 4 byte ea + O_IP_DSTPORT = 10 # (n)port list:mask 4 byte ea + O_PROTO = 11 # arg1=protocol + O_MACADDR2 = 12 # 2 mac addr:mask + O_MAC_TYPE = 13 # same as srcport + O_LAYER2 = 14 # none + O_IN = 15 # none + O_FRAG = 16 # none + O_RECV = 17 # none + O_XMIT = 18 # none + O_VIA = 19 # none + O_IPOPT = 20 # arg1 = 2*u8 bitmap + O_IPLEN = 21 # arg1 = len + O_IPID = 22 # arg1 = id + O_IPTOS = 23 # arg1 = id + O_IPPRECEDENCE = 24 # arg1 = precedence << 5 + O_IPTTL = 25 # arg1 = TTL + O_IPVER = 26 # arg1 = version + O_UID = 27 # u32 = id + O_GID = 28 # u32 = id + O_ESTAB = 29 # none (tcp established) + O_TCPFLAGS = 30 # arg1 = 2*u8 bitmap + O_TCPWIN = 31 # arg1 = desired win + O_TCPSEQ = 32 # u32 = desired seq. + O_TCPACK = 33 # u32 = desired seq. + O_ICMPTYPE = 34 # u32 = icmp bitmap + O_TCPOPTS = 35 # arg1 = 2*u8 bitmap + O_VERREVPATH = 36 # none + O_VERSRCREACH = 37 # none + O_PROBE_STATE = 38 # none + O_KEEP_STATE = 39 # none + O_LIMIT = 40 # ipfw_insn_limit + O_LIMIT_PARENT = 41 # dyn_type, not an opcode. + O_LOG = 42 # ipfw_insn_log + O_PROB = 43 # u32 = match probability + O_CHECK_STATE = 44 # none + O_ACCEPT = 45 # none + O_DENY = 46 # none + O_REJECT = 47 # arg1=icmp arg (same as deny) + O_COUNT = 48 # none + O_SKIPTO = 49 # arg1=next rule number + O_PIPE = 50 # arg1=pipe number + O_QUEUE = 51 # arg1=queue number + O_DIVERT = 52 # arg1=port number + O_TEE = 53 # arg1=port number + O_FORWARD_IP = 54 # fwd sockaddr + O_FORWARD_MAC = 55 # fwd mac + O_NAT = 56 # nope + O_REASS = 57 # none + O_IPSEC = 58 # has ipsec history + O_IP_SRC_LOOKUP = 59 # arg1=table number, u32=value + O_IP_DST_LOOKUP = 60 # arg1=table number, u32=value + O_ANTISPOOF = 61 # none + O_JAIL = 62 # u32 = id + O_ALTQ = 63 # u32 = altq classif. qid + O_DIVERTED = 64 # arg1=bitmap (1:loop, 2:out) + O_TCPDATALEN = 65 # arg1 = tcp data len + O_IP6_SRC = 66 # address without mask + O_IP6_SRC_ME = 67 # my addresses + O_IP6_SRC_MASK = 68 # address with the mask + O_IP6_DST = 69 + O_IP6_DST_ME = 70 + O_IP6_DST_MASK = 71 + O_FLOW6ID = 72 # for flow id tag in the ipv6 pkt + O_ICMP6TYPE = 73 # icmp6 packet type filtering + O_EXT_HDR = 75 # filtering for ipv6 extension header + O_IP6 = 76 + O_NETGRAPH = 77 # send to ng_ipfw + O_NGTEE = 78 # copy to ng_ipfw + O_IP4 = 79 + O_UNREACH6 = 80 # arg1=icmpv6 code arg (deny) + O_TAG = 81 # arg1=tag number + O_TAGGED = 82 # arg1=tag number + O_SETFIB = 83 # arg1=FIB number + O_FIB = 84 # arg1=FIB desired fib number + O_SOCKARG = 85 # socket argument + O_CALLRETURN = 86 # arg1=called rule number + O_FORWARD_IP6 = 87 # fwd sockaddr_in6 + O_DSCP = 88 # 2 u32 = DSCP mask + O_SETDSCP = 89 # arg1=DSCP value + O_IP_FLOW_LOOKUP = 90 # arg1=table number, u32=value + O_EXTERNAL_ACTION = 91 # arg1=id of external action handler + O_EXTERNAL_INSTANCE = 92 # arg1=id of eaction handler instance + O_EXTERNAL_DATA = 93 # variable length data + O_SKIP_ACTION = 94 # none + O_TCPMSS = 95 # arg1=MSS value + O_MAC_SRC_LOOKUP = 96 # arg1=table number, u32=value + O_MAC_DST_LOOKUP = 97 # arg1=table number, u32=value + O_SETMARK = 98 # u32 = value + O_MARK = 99 # 2 u32 = value, bitmask + + O_LAST_OPCODE = 100 # not an opcode! + + +insn_actions = ( + IpFwOpcode.O_CHECK_STATE.value, + IpFwOpcode.O_REJECT.value, + IpFwOpcode.O_UNREACH6.value, + IpFwOpcode.O_ACCEPT.value, + IpFwOpcode.O_DENY.value, + IpFwOpcode.O_REJECT.value, + IpFwOpcode.O_COUNT.value, + IpFwOpcode.O_NAT.value, + IpFwOpcode.O_QUEUE.value, + IpFwOpcode.O_PIPE.value, + IpFwOpcode.O_SKIPTO.value, + IpFwOpcode.O_NETGRAPH.value, + IpFwOpcode.O_NGTEE.value, + IpFwOpcode.O_DIVERT.value, + IpFwOpcode.O_TEE.value, + IpFwOpcode.O_CALLRETURN.value, + IpFwOpcode.O_FORWARD_IP.value, + IpFwOpcode.O_FORWARD_IP6.value, + IpFwOpcode.O_SETFIB.value, + IpFwOpcode.O_SETDSCP.value, + IpFwOpcode.O_REASS.value, + IpFwOpcode.O_SETMARK.value, + IpFwOpcode.O_EXTERNAL_ACTION.value, +) + + +class IpFwInsn(Structure): + _fields_ = [ + ("opcode", c_uint8), + ("length", c_uint8), + ("arg1", c_ushort), + ] + + +class BaseInsn(object): + obj_enum_class = IpFwOpcode + + def __init__(self, opcode, is_or, is_not, arg1): + if isinstance(opcode, Enum): + self.obj_type = opcode.value + self._enum = opcode + else: + self.obj_type = opcode + self._enum = enum_from_int(self.obj_enum_class, self.obj_type) + self.is_or = is_or + self.is_not = is_not + self.arg1 = arg1 + self.is_action = self.obj_type in insn_actions + self.ilen = 1 + self.obj_list = [] + + @property + def obj_name(self): + if self._enum is not None: + return self._enum.name + else: + return "opcode#{}".format(self.obj_type) + + @staticmethod + def get_insn_len(data: bytes) -> int: + (opcode_len,) = struct.unpack("@B", data[1:2]) + return opcode_len & 0x3F + + @classmethod + def _validate_len(cls, data, valid_options=None): + if len(data) < 4: + raise ValueError("opcode too short") + opcode_type, opcode_len = struct.unpack("@BB", data[:2]) + if len(data) != ((opcode_len & 0x3F) * 4): + raise ValueError("wrong length") + if valid_options and len(data) not in valid_options: + raise ValueError( + "len {} not in {} for opcode#{}".format( + len(data), valid_options, data[0] + ) + ) + + @classmethod + def _validate(cls, data): + cls._validate_len(data) + + @classmethod + def _parse(cls, data): + insn = IpFwInsn.from_buffer_copy(data[:4]) + is_or = (insn.length & 0x40) != 0 + is_not = (insn.length & 0x80) != 0 + return cls(opcode=insn.opcode, is_or=is_or, is_not=is_not, arg1=insn.arg1) + + @classmethod + def from_bytes(cls, data, attr_type_enum): + cls._validate(data) + opcode = cls._parse(data) + opcode._enum = attr_type_enum + return opcode + + def __bytes__(self): + raise NotImplementedError() + + def print_obj(self, prepend=""): + is_or = "" + if self.is_or: + is_or = " [OR]\\" + is_not = "" + if self.is_not: + is_not = "[!] " + print( + "{}{}len={} type={}({}){}{}".format( + prepend, + is_not, + len(bytes(self)), + self.obj_name, + self.obj_type, + self._print_obj_value(), + is_or, + ) + ) + + def _print_obj_value(self): + raise NotImplementedError() + + @staticmethod + def parse_insns(data, attr_map): + ret = [] + off = 0 + while off + sizeof(IpFwInsn) <= len(data): + hdr = IpFwInsn.from_buffer_copy(data[off : off + sizeof(IpFwInsn)]) + insn_len = (hdr.length & 0x3F) * 4 + if off + insn_len > len(data): + raise ValueError("wrng length") + # print("GET insn type {} len {}".format(hdr.opcode, insn_len)) + attr = attr_map.get(hdr.opcode, None) + if attr is None: + cls = InsnUnknown + type_enum = enum_from_int(BaseInsn.obj_enum_class, hdr.opcode) + else: + cls = attr["ad"].cls + type_enum = attr["ad"].val + insn = cls.from_bytes(data[off : off + insn_len], type_enum) + ret.append(insn) + off += insn_len + + if off != len(data): + raise ValueError("empty space") + return ret + + +class Insn(BaseInsn): + def __init__(self, opcode, is_or=False, is_not=False, arg1=0): + super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) + + @classmethod + def _validate(cls, data): + cls._validate_len(data, [4]) + + def __bytes__(self): + length = self.ilen + if self.is_or: + length |= 0x40 + if self.is_not: + length | 0x80 + insn = IpFwInsn(opcode=self.obj_type, length=length, arg1=self.arg1) + return bytes(insn) + + def _print_obj_value(self): + return " arg1={}".format(self.arg1) + + +class InsnUnknown(Insn): + @classmethod + def _validate(cls, data): + cls._validate_len(data) + + @classmethod + def _parse(cls, data): + self = super()._parse(data) + self._data = data + return self + + def __bytes__(self): + return self._data + + def _print_obj_value(self): + return " " + " ".join(["x{:02X}".format(b) for b in self._data]) + + +class InsnEmpty(Insn): + @classmethod + def _validate(cls, data): + cls._validate_len(data, [4]) + insn = IpFwInsn.from_buffer_copy(data[:4]) + if insn.arg1 != 0: + raise ValueError("arg1 should be empty") + + def _print_obj_value(self): + return "" + + +class InsnProto(Insn): + def __init__(self, opcode=IpFwOpcode.O_PROTO, is_or=False, is_not=False, arg1=0): + super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) + + def _print_obj_value(self): + known_map = {6: "TCP", 17: "UDP", 41: "IPV6"} + proto = self.arg1 + if proto in known_map: + return " proto={}".format(known_map[proto]) + else: + return " proto=#{}".format(proto) + + +class InsnU32(Insn): + def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0): + super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) + self.u32 = u32 + self.ilen = 2 + + @classmethod + def _validate(cls, data): + cls._validate_len(data, [8]) + + @classmethod + def _parse(cls, data): + self = super()._parse(data[:4]) + self.u32 = struct.unpack("@I", data[4:8])[0] + return self + + def __bytes__(self): + return super().__bytes__() + struct.pack("@I", self.u32) + + def _print_obj_value(self): + return " arg1={} u32={}".format(self.arg1, self.u32) + + +class InsnProb(InsnU32): + def __init__( + self, + opcode=IpFwOpcode.O_PROB, + is_or=False, + is_not=False, + arg1=0, + u32=0, + prob=0.0, + ): + super().__init__(opcode, is_or=is_or, is_not=is_not) + self.prob = prob + + @property + def prob(self): + return 1.0 * self.u32 / 0x7FFFFFFF + + @prob.setter + def prob(self, prob: float): + self.u32 = int(prob * 0x7FFFFFFF) + + def _print_obj_value(self): + return " prob={}".format(round(self.prob, 5)) + + +class InsnIp(InsnU32): + def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0, ip=None): + super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1, u32=u32) + if ip: + self.ip = ip + + @property + def ip(self): + return socket.inet_ntop(socket.AF_INET, struct.pack("@I", self.u32)) + + @ip.setter + def ip(self, ip: str): + ip_bin = socket.inet_pton(socket.AF_INET, ip) + self.u32 = struct.unpack("@I", ip_bin)[0] + + def _print_opcode_value(self): + return " ip={}".format(self.ip) + + +class InsnTable(Insn): + @classmethod + def _validate(cls, data): + cls._validate_len(data, [4, 8]) + + @classmethod + def _parse(cls, data): + self = super()._parse(data) + + if len(data) == 8: + (self.val,) = struct.unpack("@I", data[4:8]) + self.ilen = 2 + else: + self.val = None + return self + + def __bytes__(self): + ret = super().__bytes__() + if getattr(self, "val", None) is not None: + ret += struct.pack("@I", self.val) + return ret + + def _print_obj_value(self): + if getattr(self, "val", None) is not None: + return " table={} value={}".format(self.arg1, self.val) + else: + return " table={}".format(self.arg1) + + +class InsnPorts(Insn): + def __init__(self, opcode, is_or=False, is_not=False, arg1=0, port_pairs=[]): + super().__init__(opcode, is_or=is_or, is_not=is_not) + self.port_pairs = [] + if port_pairs: + self.port_pairs = port_pairs + + @classmethod + def _validate(cls, data): + if len(data) < 8: + raise ValueError("no ports specified") + cls._validate_len(data) + + @classmethod + def _parse(cls, data): + self = super()._parse(data) + + off = 4 + port_pairs = [] + while off + 4 <= len(data): + low, high = struct.unpack("@HH", data[off : off + 4]) + port_pairs.append((low, high)) + off += 4 + self.port_pairs = port_pairs + return self + + def __bytes__(self): + ret = super().__bytes__() + if getattr(self, "val", None) is not None: + ret += struct.pack("@I", self.val) + return ret + + def _print_obj_value(self): + ret = [] + for p in self.port_pairs: + if p[0] == p[1]: + ret.append(str(p[0])) + else: + ret.append("{}-{}".format(p[0], p[1])) + return " ports={}".format(",".join(ret)) + + +class IpFwInsnIp6(Structure): + _fields_ = [ + ("o", IpFwInsn), + ("addr6", c_byte * 16), + ("mask6", c_byte * 16), + ] + + +class InsnIp6(Insn): + def __init__(self, opcode, is_or=False, is_not=False, arg1=0, ip6=None, mask6=None): + super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) + self.ip6 = ip6 + self.mask6 = mask6 + if mask6 is not None: + self.ilen = 9 + else: + self.ilen = 5 + + @classmethod + def _validate(cls, data): + cls._validate_len(data, [4 + 16, 4 + 16 * 2]) + + @classmethod + def _parse(cls, data): + self = super()._parse(data) + self.ip6 = socket.inet_ntop(socket.AF_INET6, data[4:20]) + + if len(data) == 4 + 16 * 2: + self.mask6 = socket.inet_ntop(socket.AF_INET6, data[20:36]) + self.ilen = 9 + else: + self.mask6 = None + self.ilen = 5 + return self + + def __bytes__(self): + ret = super().__bytes__() + socket.inet_pton(socket.AF_INET6, self.ip6) + if self.mask6 is not None: + ret += socket.inet_pton(socket.AF_INET6, self.mask6) + return ret + + def _print_obj_value(self): + if self.mask6: + return " ip6={}/{}".format(self.ip6, self.mask6) + else: + return " ip6={}".format(self.ip6) + + +insn_attrs = prepare_attrs_map( + [ + AttrDescr(IpFwOpcode.O_NOP, Insn), + AttrDescr(IpFwOpcode.O_PROTO, InsnProto), + AttrDescr(IpFwOpcode.O_PROB, InsnProb), + AttrDescr(IpFwOpcode.O_IP_DST_ME, InsnEmpty), + AttrDescr(IpFwOpcode.O_IP_SRC_ME, InsnEmpty), + AttrDescr(IpFwOpcode.O_IP6_DST_ME, InsnEmpty), + AttrDescr(IpFwOpcode.O_IP6_SRC_ME, InsnEmpty), + AttrDescr(IpFwOpcode.O_ACCEPT, InsnEmpty), + AttrDescr(IpFwOpcode.O_IP_SRC, InsnIp), + AttrDescr(IpFwOpcode.O_IP_DST, InsnIp), + AttrDescr(IpFwOpcode.O_IP6_DST, InsnIp6), + AttrDescr(IpFwOpcode.O_IP6_SRC, InsnIp6), + AttrDescr(IpFwOpcode.O_IP_SRC_LOOKUP, InsnTable), + AttrDescr(IpFwOpcode.O_IP_DST_LOOKUP, InsnTable), + AttrDescr(IpFwOpcode.O_IP_SRCPORT, InsnPorts), + AttrDescr(IpFwOpcode.O_IP_DSTPORT, InsnPorts), + ] +) diff --git a/tests/atf_python/sys/netpfil/ipfw/ioctl.py b/tests/atf_python/sys/netpfil/ipfw/ioctl.py new file mode 100644 --- /dev/null +++ b/tests/atf_python/sys/netpfil/ipfw/ioctl.py @@ -0,0 +1,570 @@ +#!/usr/bin/env python3 +import os +import socket +import struct +import subprocess +import sys +from ctypes import c_byte +from ctypes import c_char +from ctypes import c_int +from ctypes import c_long +from ctypes import c_uint32 +from ctypes import c_uint8 +from ctypes import c_ulong +from ctypes import c_ushort +from ctypes import sizeof +from ctypes import Structure +from enum import Enum +from typing import Any +from typing import Dict +from typing import List +from typing import NamedTuple +from typing import Optional +from typing import Union + +import pytest +from atf_python.sys.netpfil.ipfw.insns import BaseInsn +from atf_python.sys.netpfil.ipfw.insns import insn_attrs +from atf_python.sys.netpfil.ipfw.utils import align8 +from atf_python.sys.netpfil.ipfw.utils import AttrDescr +from atf_python.sys.netpfil.ipfw.utils import enum_from_int +from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map + + +class IpFw3OpHeader(Structure): + _fields_ = [ + ("opcode", c_ushort), + ("version", c_ushort), + ("reserved1", c_ushort), + ("reserved2", c_ushort), + ] + + +class Op3CmdType(Enum): + IP_FW_TABLE_XADD = 86 + IP_FW_TABLE_XDEL = 87 + IP_FW_TABLE_XGETSIZE = 88 + IP_FW_TABLE_XLIST = 89 + IP_FW_TABLE_XDESTROY = 90 + IP_FW_TABLES_XLIST = 92 + IP_FW_TABLE_XINFO = 93 + IP_FW_TABLE_XFLUSH = 94 + IP_FW_TABLE_XCREATE = 95 + IP_FW_TABLE_XMODIFY = 96 + IP_FW_XGET = 97 + IP_FW_XADD = 98 + IP_FW_XDEL = 99 + IP_FW_XMOVE = 100 + IP_FW_XZERO = 101 + IP_FW_XRESETLOG = 102 + IP_FW_SET_SWAP = 103 + IP_FW_SET_MOVE = 104 + IP_FW_SET_ENABLE = 105 + IP_FW_TABLE_XFIND = 106 + IP_FW_XIFLIST = 107 + IP_FW_TABLES_ALIST = 108 + IP_FW_TABLE_XSWAP = 109 + IP_FW_TABLE_VLIST = 110 + + IP_FW_NAT44_XCONFIG = 111 + IP_FW_NAT44_DESTROY = 112 + IP_FW_NAT44_XGETCONFIG = 113 + IP_FW_NAT44_LIST_NAT = 114 + IP_FW_NAT44_XGETLOG = 115 + + IP_FW_DUMP_SOPTCODES = 116 + IP_FW_DUMP_SRVOBJECTS = 117 + + IP_FW_NAT64STL_CREATE = 130 + IP_FW_NAT64STL_DESTROY = 131 + IP_FW_NAT64STL_CONFIG = 132 + IP_FW_NAT64STL_LIST = 133 + IP_FW_NAT64STL_STATS = 134 + IP_FW_NAT64STL_RESET_STATS = 135 + + IP_FW_NAT64LSN_CREATE = 140 + IP_FW_NAT64LSN_DESTROY = 141 + IP_FW_NAT64LSN_CONFIG = 142 + IP_FW_NAT64LSN_LIST = 143 + IP_FW_NAT64LSN_STATS = 144 + IP_FW_NAT64LSN_LIST_STATES = 145 + IP_FW_NAT64LSN_RESET_STATS = 146 + + IP_FW_NPTV6_CREATE = 150 + IP_FW_NPTV6_DESTROY = 151 + IP_FW_NPTV6_CONFIG = 152 + IP_FW_NPTV6_LIST = 153 + IP_FW_NPTV6_STATS = 154 + IP_FW_NPTV6_RESET_STATS = 155 + + IP_FW_NAT64CLAT_CREATE = 160 + IP_FW_NAT64CLAT_DESTROY = 161 + IP_FW_NAT64CLAT_CONFIG = 162 + IP_FW_NAT64CLAT_LIST = 163 + IP_FW_NAT64CLAT_STATS = 164 + IP_FW_NAT64CLAT_RESET_STATS = 165 + + +class IpFwTableLookupType(Enum): + LOOKUP_DST_IP = 0 + LOOKUP_SRC_IP = 1 + LOOKUP_DST_PORT = 2 + LOOKUP_SRC_PORT = 3 + LOOKUP_UID = 4 + LOOKUP_JAIL = 5 + LOOKUP_DSCP = 6 + LOOKUP_DST_MAC = 7 + LOOKUP_SRC_MAC = 8 + LOOKUP_MARK = 9 + + +class IpFwObjTlv(Structure): + _fields_ = [ + ("n_type", c_ushort), + ("flags", c_ushort), + ("length", c_uint32), + ] + + +class IpFwTlvType(Enum): + IPFW_TLV_TBL_NAME = 1 + IPFW_TLV_TBLNAME_LIST = 2 + IPFW_TLV_RULE_LIST = 3 + IPFW_TLV_DYNSTATE_LIST = 4 + IPFW_TLV_TBL_ENT = 5 + IPFW_TLV_DYN_ENT = 6 + IPFW_TLV_RULE_ENT = 7 + IPFW_TLV_TBLENT_LIST = 8 + IPFW_TLV_RANGE = 9 + IPFW_TLV_EACTION = 10 + IPFW_TLV_COUNTERS = 11 + IPFW_TLV_OBJDATA = 12 + IPFW_TLV_STATE_NAME = 14 + + +class BaseTlv(object): + obj_enum_class = IpFwTlvType + + def __init__(self, obj_type): + if isinstance(obj_type, Enum): + self.obj_type = obj_type.value + self._enum = obj_type + else: + self.obj_type = obj_type + self._enum = enum_from_int(self.obj_enum_class, obj_type) + self.obj_list = [] + + def add_obj(self, obj): + self.obj_list.append(obj) + + @property + def len(self): + return len(bytes(self)) + + @property + def obj_name(self): + if self._enum is not None: + return self._enum.name + else: + return "tlv#{}".format(self.obj_type) + + def print_hdr(self, prepend=""): + print( + "{}len={} type={}({}){}".format( + prepend, self.len, self.obj_name, self.obj_type, self._print_obj_value() + ) + ) + + def print_obj(self, prepend=""): + self.print_hdr(prepend) + prepend = " " + prepend + for obj in self.obj_list: + obj.print_obj(prepend) + + @classmethod + def _validate(cls, data): + if len(data) < sizeof(IpFwObjTlv): + raise ValueError("TLV too short") + hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)]) + if len(data) != hdr.length: + raise ValueError("wrong TLV size") + + @classmethod + def _parse(cls, data): + hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)]) + return cls(hdr.n_type) + + @classmethod + def from_bytes(cls, data, attr_map=None): + cls._validate(data) + obj = cls._parse(data, attr_map) + return obj + + @staticmethod + def parse_tlvs(data, attr_map): + # print("PARSING " + " ".join(["x{:02X}".format(b) for b in data])) + off = 0 + ret = [] + while off + sizeof(IpFwObjTlv) <= len(data): + hdr = IpFwObjTlv.from_buffer_copy(data[off : off + sizeof(IpFwObjTlv)]) + if off + hdr.length > len(data): + raise ValueError("TLV size do not match") + obj_data = data[off : off + hdr.length] + obj_descr = attr_map.get(hdr.n_type, None) + if obj_descr is None: + # raise ValueError("unknown child TLV {}".format(hdr.n_type)) + cls = BaseTlv + child_map = {} + else: + cls = obj_descr["ad"].cls + child_map = obj_descr.get("child", {}) + # print("FOUND OBJECT type {}".format(cls)) + # print() + obj = cls.from_bytes(obj_data, child_map) + ret.append(obj) + off += hdr.length + return ret + + def __bytes__(self): + raise NotImplementedError() + + def _print_obj_value(self): + return " " + " ".join( + ["x{:02X}".format(b) for b in self._data[sizeof(IpFwObjTlv) :]] + ) + + def as_hexdump(self): + return " ".join(["x{:02X}".format(b) for b in bytes(self)]) + + +class IpFwObjNTlv(Structure): + _fields_ = [ + ("head", IpFwObjTlv), + ("idx", c_ushort), + ("n_set", c_uint8), + ("n_type", c_uint8), + ("spare", c_uint32), + ("name", c_char * 64), + ] + + +class NTlv(BaseTlv): + def __init__(self, obj_type, idx=0, n_set=0, n_type=0, name=None): + super().__init__(obj_type) + self.n_idx = idx + self.n_set = n_set + self.n_type = n_type + self.n_name = name + + @classmethod + def _validate(cls, data): + if len(data) != sizeof(IpFwObjNTlv): + raise ValueError("TLV size is not correct") + hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)]) + if len(data) != hdr.length: + raise ValueError("wrong TLV size") + + @classmethod + def _parse(cls, data, attr_map): + hdr = IpFwObjNTlv.from_buffer_copy(data[: sizeof(IpFwObjNTlv)]) + name = hdr.name.decode("utf-8") + self = cls(hdr.head.n_type, hdr.idx, hdr.n_set, hdr.n_type, name) + return self + + def __bytes__(self): + name_bytes = self.n_name.encode("utf-8") + if len(name_bytes) < 64: + name_bytes += b"\0" * (64 - len(name_bytes)) + hdr = IpFwObjNTlv( + head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)), + idx=self.n_idx, + n_set=self.n_set, + n_type=self.n_type, + name=name_bytes[:64], + ) + return bytes(hdr) + + def _print_obj_value(self): + return " " + "type={} set={} idx={} name={}".format( + self.n_type, self.n_set, self.n_idx, self.n_name + ) + + +class IpFwObjCTlv(Structure): + _fields_ = [ + ("head", IpFwObjTlv), + ("count", c_uint32), + ("objsize", c_ushort), + ("version", c_uint8), + ("flags", c_uint8), + ] + + +class CTlv(BaseTlv): + def __init__(self, obj_type, obj_list=[]): + super().__init__(obj_type) + if obj_list: + self.obj_list.extend(obj_list) + + @classmethod + def _validate(cls, data): + if len(data) < sizeof(IpFwObjCTlv): + raise ValueError("TLV too short") + hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)]) + if len(data) != hdr.head.length: + raise ValueError("wrong TLV size") + + @classmethod + def _parse(cls, data, attr_map): + hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)]) + tlv_list = cls.parse_tlvs(data[sizeof(IpFwObjCTlv) :], attr_map) + if len(tlv_list) != hdr.count: + raise ValueError("wrong number of objects") + self = cls(hdr.head.n_type, obj_list=tlv_list) + return self + + def __bytes__(self): + ret = b"" + for obj in self.obj_list: + ret += bytes(obj) + length = len(ret) + sizeof(IpFwObjCTlv) + if self.obj_list: + objsize = len(bytes(self.obj_list[0])) + else: + objsize = 0 + hdr = IpFwObjCTlv( + head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)), + count=len(self.obj_list), + objsize=objsize, + ) + return bytes(hdr) + ret + + def _print_obj_value(self): + return "" + + +class IpFwRule(Structure): + _fields_ = [ + ("act_ofs", c_ushort), + ("cmd_len", c_ushort), + ("spare", c_ushort), + ("n_set", c_uint8), + ("flags", c_uint8), + ("rulenum", c_uint32), + ("n_id", c_uint32), + ] + + +class RawRule(BaseTlv): + def __init__(self, obj_type=0, n_set=0, rulenum=0, obj_list=[]): + super().__init__(obj_type) + self.n_set = n_set + self.rulenum = rulenum + if obj_list: + self.obj_list.extend(obj_list) + + @classmethod + def _validate(cls, data): + min_size = sizeof(IpFwRule) + if len(data) < min_size: + raise ValueError("rule TLV too short") + rule = IpFwRule.from_buffer_copy(data[:min_size]) + if len(data) != min_size + rule.cmd_len * 4: + raise ValueError("rule TLV cmd_len incorrect") + + @classmethod + def _parse(cls, data, attr_map): + hdr = IpFwRule.from_buffer_copy(data[: sizeof(IpFwRule)]) + self = cls( + n_set=hdr.n_set, + rulenum=hdr.rulenum, + obj_list=BaseInsn.parse_insns(data[sizeof(IpFwRule) :], insn_attrs), + ) + return self + + def __bytes__(self): + act_ofs = 0 + cmd_len = 0 + ret = b"" + for obj in self.obj_list: + if obj.is_action and act_ofs == 0: + act_ofs = cmd_len + obj_bytes = bytes(obj) + cmd_len += len(obj_bytes) // 4 + ret += obj_bytes + + hdr = IpFwRule( + act_ofs=act_ofs, + cmd_len=cmd_len, + n_set=self.n_set, + rulenum=self.rulenum, + ) + return bytes(hdr) + ret + + @property + def obj_name(self): + return "rule#{}".format(self.rulenum) + + def _print_obj_value(self): + cmd_len = sum([len(bytes(obj)) for obj in self.obj_list]) // 4 + return " set={} cmd_len={}".format(self.n_set, cmd_len) + + +class CTlvRule(CTlv): + def __init__(self, obj_type=IpFwTlvType.IPFW_TLV_RULE_LIST, obj_list=[]): + super().__init__(obj_type, obj_list) + + @classmethod + def _parse(cls, data, attr_map): + chdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)]) + rule_list = [] + off = sizeof(IpFwObjCTlv) + while off + sizeof(IpFwRule) <= len(data): + hdr = IpFwRule.from_buffer_copy(data[off : off + sizeof(IpFwRule)]) + rule_len = sizeof(IpFwRule) + hdr.cmd_len * 4 + # print("FOUND RULE len={} cmd_len={}".format(rule_len, hdr.cmd_len)) + if off + rule_len > len(data): + raise ValueError("wrong rule size") + rule = RawRule.from_bytes(data[off : off + rule_len]) + rule_list.append(rule) + off += align8(rule_len) + if off != len(data): + raise ValueError("rule bytes left: off={} len={}".format(off, len(data))) + return cls(chdr.head.n_type, obj_list=rule_list) + + # XXX: _validate + + def __bytes__(self): + ret = b"" + for rule in self.obj_list: + rule_bytes = bytes(rule) + remainder = len(rule_bytes) % 8 + if remainder > 0: + rule_bytes += b"\0" * (8 - remainder) + ret += rule_bytes + hdr = IpFwObjCTlv( + head=IpFwObjTlv( + n_type=self.obj_type, length=len(ret) + sizeof(IpFwObjCTlv) + ), + count=len(self.obj_list), + ) + return bytes(hdr) + ret + + +class BaseIpFwMessage(object): + messages = [] + + def __init__(self, msg_type, obj_list=[]): + if isinstance(msg_type, Enum): + self.obj_type = msg_type.value + self._enum = msg_type + else: + self.obj_type = msg_type + self._enum = enum_from_int(self.messages, self.obj_type) + self.obj_list = [] + if obj_list: + self.obj_list.extend(obj_list) + + def add_obj(self, obj): + self.obj_list.append(obj) + + def get_obj(self, obj_type): + obj_type_raw = enum_or_int(obj_type) + for obj in self.obj_list: + if obj.obj_type == obj_type_raw: + return obj + return None + + @staticmethod + def parse_header(data: bytes): + if len(data) < sizeof(IpFw3OpHeader): + raise ValueError("length less than op3 message header") + return IpFw3OpHeader.from_buffer_copy(data), sizeof(IpFw3OpHeader) + + def parse_obj_list(self, data: bytes): + off = 0 + while off < len(data): + # print("PARSE off={} rem={}".format(off, len(data) - off)) + hdr = IpFwObjTlv.from_buffer_copy(data[off : off + sizeof(IpFwObjTlv)]) + # print(" tlv len {}".format(hdr.length)) + if hdr.length + off > len(data): + raise ValueError("TLV too big") + tlv = BaseTlv(hdr.n_type, data[off : off + hdr.length]) + self.add_obj(tlv) + off += hdr.length + + def is_type(self, msg_type): + return enum_or_int(msg_type) == self.msg_type + + @property + def obj_name(self): + if self._enum is not None: + return self._enum.name + else: + return "msg#{}".format(self.obj_type) + + def print_hdr(self, prepend=""): + print("{}len={}, type={}".format(prepend, len(bytes(self)), self.obj_name)) + + @classmethod + def from_bytes(cls, data): + try: + hdr, hdrlen = cls.parse_header(data) + self = cls(hdr.opcode) + self._orig_data = data + except ValueError as e: + print("Failed to parse op3 header: {}".format(e)) + cls.print_as_bytes(data) + raise + tlv_list = BaseTlv.parse_tlvs(data[hdrlen:], self.attr_map) + self.obj_list.extend(tlv_list) + return self + + def __bytes__(self): + ret = bytes(IpFw3OpHeader(opcode=self.obj_type)) + for obj in self.obj_list: + ret += bytes(obj) + return ret + + def print_obj(self): + self.print_hdr() + for obj in self.obj_list: + obj.print_obj(" ") + + @staticmethod + def print_as_bytes(data: bytes, descr: str): + print("===vv {} (len:{:3d}) vv===".format(descr, len(data))) + off = 0 + step = 16 + while off < len(data): + for i in range(step): + if off + i < len(data): + print(" {:02X}".format(data[off + i]), end="") + print("") + off += step + print("--------------------") + + +rule_attrs = prepare_attrs_map( + [ + AttrDescr( + IpFwTlvType.IPFW_TLV_TBLNAME_LIST, + CTlv, + [ + AttrDescr(IpFwTlvType.IPFW_TLV_TBL_NAME, NTlv), + ], + True, + ), + AttrDescr(IpFwTlvType.IPFW_TLV_RULE_LIST, CTlvRule), + ] +) + + +class IpFwXRule(BaseIpFwMessage): + messages = [Op3CmdType.IP_FW_XADD] + attr_map = rule_attrs + + +legacy_classes = [] +set3_classes = [] +get3_classes = [IpFwXRule] diff --git a/tests/atf_python/sys/netpfil/ipfw/ipfw.py b/tests/atf_python/sys/netpfil/ipfw/ipfw.py new file mode 100644 --- /dev/null +++ b/tests/atf_python/sys/netpfil/ipfw/ipfw.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +import os +import socket +import struct +import subprocess +import sys +from ctypes import c_byte +from ctypes import c_char +from ctypes import c_int +from ctypes import c_long +from ctypes import c_uint32 +from ctypes import c_uint8 +from ctypes import c_ulong +from ctypes import c_ushort +from ctypes import sizeof +from ctypes import Structure +from enum import Enum +from typing import Any +from typing import Dict +from typing import List +from typing import NamedTuple +from typing import Optional +from typing import Union + +from atf_python.sys.netpfil.ipfw.ioctl import get3_classes +from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes +from atf_python.sys.netpfil.ipfw.ioctl import set3_classes +from atf_python.sys.netpfil.ipfw.utils import AttrDescr +from atf_python.sys.netpfil.ipfw.utils import enum_from_int +from atf_python.sys.netpfil.ipfw.utils import enum_or_int +from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map + + +class DebugHeader(Structure): + _fields_ = [ + ("cmd_type", c_ushort), + ("spare1", c_ushort), + ("opt_name", c_uint32), + ("total_len", c_uint32), + ("spare2", c_uint32), + ] + + +class DebugType(Enum): + DO_CMD = 1 + DO_SET3 = 2 + DO_GET3 = 3 + + +class DebugIoReader(object): + HANDLER_CLASSES = { + DebugType.DO_CMD: legacy_classes, + DebugType.DO_SET3: set3_classes, + DebugType.DO_GET3: get3_classes, + } + + def __init__(self, ipfw_path): + self._msgmap = self.build_msgmap() + self.ipfw_path = ipfw_path + + def build_msgmap(self): + xmap = {} + for debug_type, handler_classes in self.HANDLER_CLASSES.items(): + debug_type = enum_or_int(debug_type) + if debug_type not in xmap: + xmap[debug_type] = {} + for handler_class in handler_classes: + for msg in handler_class.messages: + xmap[debug_type][enum_or_int(msg)] = handler_class + return xmap + + def print_obj_header(self, hdr): + debug_type = "#{}".format(hdr.cmd_type) + for _type in self.HANDLER_CLASSES.keys(): + if _type.value == hdr.cmd_type: + debug_type = _type.name.lower() + break + print( + "@@ record for {} len={} optname={}".format( + debug_type, hdr.total_len, hdr.opt_name + ) + ) + + def parse_record(self, data): + hdr = DebugHeader.from_buffer_copy(data[: sizeof(DebugHeader)]) + data = data[sizeof(DebugHeader) :] + cls = self._msgmap[hdr.cmd_type].get(hdr.opt_name) + if cls is not None: + return cls.from_bytes(data) + raise ValueError( + "unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name) + ) + + def get_record_from_stdin(self): + data = sys.stdin.buffer.peek(sizeof(DebugHeader)) + if len(data) == 0: + return None + + hdr = DebugHeader.from_buffer_copy(data) + data = sys.stdin.buffer.read(hdr.total_len) + return self.parse_record(data) + + def get_records_from_buffer(self, data): + off = 0 + ret = [] + while off + sizeof(DebugHeader) <= len(data): + hdr = DebugHeader.from_buffer_copy(data[off : off + sizeof(DebugHeader)]) + ret.append(self.parse_record(data[off : off + hdr.total_len])) + off += hdr.total_len + return ret + + def run_ipfw(self, cmd: str) -> bytes: + args = [self.ipfw_path, "-xqn"] + cmd.split() + r = subprocess.run(args, capture_output=True) + return r.stdout + + def get_records(self, cmd: str): + return self.get_records_from_buffer(self.run_ipfw(cmd)) diff --git a/tests/atf_python/sys/netpfil/ipfw/utils.py b/tests/atf_python/sys/netpfil/ipfw/utils.py new file mode 100644 --- /dev/null +++ b/tests/atf_python/sys/netpfil/ipfw/utils.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 + +import os +import socket +import struct +import subprocess +import sys +from enum import Enum +from typing import Dict +from typing import List +from typing import Optional +from typing import Union +from typing import Any +from typing import NamedTuple +import pytest + + +def roundup2(val: int, num: int) -> int: + if val % num: + return (val | (num - 1)) + 1 + else: + return val + + +def align8(val: int) -> int: + return roundup2(val, 8) + + +def enum_or_int(val) -> int: + if isinstance(val, Enum): + return val.value + return val + + +def enum_from_int(enum_class: Enum, val: int) -> Enum: + for item in enum_class: + if val == item.value: + return item + return None + + +class AttrDescr(NamedTuple): + val: Enum + cls: Any + child_map: Any = None + is_array: bool = False + + +def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]: + ret = {} + for ad in attrs: + ret[ad.val.value] = {"ad": ad} + if ad.child_map: + ret[ad.val.value]["child"] = prepare_attrs_map(ad.child_map) + ret[ad.val.value]["is_array"] = ad.is_array + return ret + + +