Page MenuHomeFreeBSD

D40488.diff
No OneTemporary

D40488.diff

diff --git a/etc/mtree/BSD.tests.dist b/etc/mtree/BSD.tests.dist
--- a/etc/mtree/BSD.tests.dist
+++ b/etc/mtree/BSD.tests.dist
@@ -450,6 +450,8 @@
..
ifconfig
..
+ ipfw
+ ..
md5
..
mdconfig
diff --git a/sbin/ipfw/ipfw2.h b/sbin/ipfw/ipfw2.h
--- a/sbin/ipfw/ipfw2.h
+++ b/sbin/ipfw/ipfw2.h
@@ -48,6 +48,7 @@
int test_only; /* only check syntax */
int comment_only; /* only print action and comment */
int verbose; /* be verbose on some commands */
+ int debug_only; /* output ioctl i/o on stdout */
/* The options below can have multiple values. */
diff --git a/sbin/ipfw/ipfw2.c b/sbin/ipfw/ipfw2.c
--- a/sbin/ipfw/ipfw2.c
+++ b/sbin/ipfw/ipfw2.c
@@ -587,6 +587,13 @@
return (strcmp(a, b));
}
+/*
+ * Fixed-size record header written to stdout in front of each sockopt
+ * payload when g_co.debug_only is set (see the write(1, ...) calls below).
+ */
+struct debug_header {
+ uint16_t cmd_type;	/* 1, 2 or 3: identifies which sockopt wrapper emitted it */
+ uint16_t spare1;
+ uint32_t opt_name;	/* the optname argument of the wrapper */
+ uint32_t total_len;
+ uint32_t spare2;
+};
/*
* conditionally runs the command.
@@ -597,8 +604,18 @@
{
int i;
+ if (g_co.debug_only) {
+ struct debug_header dbg = {
+ .cmd_type = 1,
+ .opt_name = optname,
+ .total_len = optlen + sizeof(struct debug_header),
+ };
+ write(1, &dbg, sizeof(dbg));
+ write(1, optval, optlen);
+ }
+
if (g_co.test_only)
- return 0;
+ return (0);
if (ipfw_socket == -1)
ipfw_socket = socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
@@ -617,7 +634,7 @@
} else {
i = setsockopt(ipfw_socket, IPPROTO_IP, optname, optval, optlen);
}
- return i;
+ return (i);
}
/*
@@ -634,6 +651,18 @@
do_set3(int optname, ip_fw3_opheader *op3, size_t optlen)
{
+ op3->opcode = optname;
+
+ if (g_co.debug_only) {
+ /*
+  * total_len is header + payload, matching the other debug paths.
+  * (A comma here would store only optlen and spill the sizeof
+  * into the next field, spare2.)
+  */
+ struct debug_header dbg = {
+ .cmd_type = 2,
+ .opt_name = optname,
+ .total_len = optlen + sizeof(struct debug_header),
+ };
+ write(1, &dbg, sizeof(dbg));
+ write(1, op3, optlen);
+ }
+
if (g_co.test_only)
return (0);
@@ -642,7 +671,6 @@
if (ipfw_socket < 0)
err(EX_UNAVAILABLE, "socket");
- op3->opcode = optname;
return (setsockopt(ipfw_socket, IPPROTO_IP, IP_FW3, op3, optlen));
}
@@ -663,6 +691,18 @@
int error;
socklen_t len;
+ op3->opcode = optname;
+
+ if (g_co.debug_only) {
+ struct debug_header dbg = {
+ .cmd_type = 3,
+ .opt_name = optname,
+ .total_len = *optlen + sizeof(struct debug_header),
+ };
+ write(1, &dbg, sizeof(dbg));
+ write(1, op3, *optlen);
+ }
+
if (g_co.test_only)
return (0);
@@ -671,7 +711,6 @@
if (ipfw_socket < 0)
err(EX_UNAVAILABLE, "socket");
- op3->opcode = optname;
len = *optlen;
error = getsockopt(ipfw_socket, IPPROTO_IP, IP_FW3, op3, &len);
diff --git a/sbin/ipfw/main.c b/sbin/ipfw/main.c
--- a/sbin/ipfw/main.c
+++ b/sbin/ipfw/main.c
@@ -277,7 +277,7 @@
optind = optreset = 1; /* restart getopt() */
if (is_ipfw()) {
- while ((ch = getopt(ac, av, "abcdDefhinNp:qs:STtv")) != -1)
+ while ((ch = getopt(ac, av, "abcdDefhinNp:qs:STtvx")) != -1)
switch (ch) {
case 'a':
do_acct = 1;
@@ -354,6 +354,10 @@
g_co.verbose = 1;
break;
+ case 'x': /* debug output */
+ g_co.debug_only = 1;
+ break;
+
default:
free(save_av);
return 1;
diff --git a/sbin/ipfw/tests/Makefile b/sbin/ipfw/tests/Makefile
new file mode 100644
--- /dev/null
+++ b/sbin/ipfw/tests/Makefile
@@ -0,0 +1,5 @@
+PACKAGE= tests
+
+ATF_TESTS_PYTEST+= test_add_rule.py
+
+.include <bsd.test.mk>
diff --git a/sbin/ipfw/tests/test_add_rule.py b/sbin/ipfw/tests/test_add_rule.py
new file mode 100755
--- /dev/null
+++ b/sbin/ipfw/tests/test_add_rule.py
@@ -0,0 +1,400 @@
+import errno
+import json
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+import pytest
+from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
+from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
+from atf_python.sys.netpfil.ipfw.insns import Insn
+from atf_python.sys.netpfil.ipfw.insns import InsnComment
+from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
+from atf_python.sys.netpfil.ipfw.insns import InsnIp
+from atf_python.sys.netpfil.ipfw.insns import InsnIp6
+from atf_python.sys.netpfil.ipfw.insns import InsnPorts
+from atf_python.sys.netpfil.ipfw.insns import InsnProb
+from atf_python.sys.netpfil.ipfw.insns import InsnProto
+from atf_python.sys.netpfil.ipfw.insns import InsnReject
+from atf_python.sys.netpfil.ipfw.insns import InsnTable
+from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
+from atf_python.sys.netpfil.ipfw.ioctl import CTlv
+from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
+from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
+from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
+from atf_python.sys.netpfil.ipfw.ioctl import NTlv
+from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
+from atf_python.sys.netpfil.ipfw.ioctl import RawRule
+from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.utils import BaseTest
+
+
+IPFW_PATH = "/sbin/ipfw"
+
+
+def differ(w_obj, g_obj, w_stack=None, g_stack=None):
+    """Compare a wanted object tree with the one actually produced.
+
+    Both objects must support ``bytes()`` and expose ``obj_list``,
+    ``obj_name``, ``print_obj()`` and ``print_obj_hex()``.
+
+    Returns True when the serialized forms are identical.  Otherwise the
+    first differing / missing / extra child is printed, together with the
+    chain of parent names leading to it, and False is returned.
+
+    ``w_stack`` / ``g_stack`` carry the parent-name chains for the wanted
+    and received trees; callers normally omit them.
+    """
+    # Fresh lists per top-level call: a shared mutable default would leak
+    # chain entries from one comparison into the next.
+    w_stack = [] if w_stack is None else w_stack
+    g_stack = [] if g_stack is None else g_stack
+
+    if bytes(w_obj) == bytes(g_obj):
+        return True
+    num_objects = 0
+    for i, w_child in enumerate(w_obj.obj_list):
+        # ">=": index i is already out of range when i == len(...).
+        if i >= len(g_obj.obj_list):
+            print("MISSING object from chain {}".format(" / ".join(w_stack)))
+            w_child.print_obj()
+            print("==========================")
+            return False
+        g_child = g_obj.obj_list[i]
+        if bytes(w_child) == bytes(g_child):
+            num_objects += 1
+            continue
+        # Descend into the first differing child with extended chains,
+        # without mutating the lists owned by the caller.
+        if not differ(
+            w_child,
+            g_child,
+            w_stack + [w_obj.obj_name],
+            g_stack + [g_obj.obj_name],
+        ):
+            return False
+        break
+    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
+        g_child = g_obj.obj_list[num_objects]
+        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
+        g_child.print_obj()
+        print("==========================")
+        return False
+    print("OBJECTS DIFFER")
+    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
+    w_obj.print_obj()
+    w_obj.print_obj_hex()
+    print("==========================")
+    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
+    g_obj.print_obj()
+    g_obj.print_obj_hex()
+    print("==========================")
+    return False
+
+
+class TestAddRule(BaseTest):
+    """Checks the binary request ipfw(8) builds for various `add` commands.
+
+    Each test describes the expected rule as Python objects, serializes it
+    and compares it with the single request record returned by
+    DebugIoReader for the same command line.
+    """
+
+    def compile_rule(self, out):
+        """Build the expected IP_FW_XADD request from a rule description.
+
+        `out` is a dict with an "insns" list, an optional "rulenum" and an
+        optional "objs" list of named-object TLVs.
+        """
+        tlvs = []
+        if "objs" in out:
+            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
+        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
+        tlvs.append(CTlvRule(obj_list=[rule]))
+        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)
+
+    def verify_rule(self, in_data: str, out_data):
+        """Run ipfw on `in_data` and assert it produced exactly `out_data`."""
+        # Prepare the desired output
+        expected = self.compile_rule(out_data)
+
+        reader = DebugIoReader(IPFW_PATH)
+        ioctls = reader.get_records(in_data)
+        assert len(ioctls) == 1  # Only 1 ioctl request expected
+        got = ioctls[0]
+
+        # Print a readable diff before the assertion fires.
+        if not differ(expected, got):
+            print("=> CMD: {}".format(in_data))
+            print("=> WANTED:")
+            expected.print_obj()
+            print("==========================")
+            print("=> GOT:")
+            got.print_obj()
+            print("==========================")
+        assert bytes(got) == bytes(expected)
+
+    @pytest.mark.parametrize(
+        "rule",
+        [
+            pytest.param(
+                {
+                    "in": "add 200 allow ip from any to any",
+                    "out": {
+                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
+                        "rulenum": 200,
+                    },
+                },
+                id="test_rulenum",
+            ),
+            pytest.param(
+                {
+                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
+                    "out": {
+                        "insns": [
+                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
+                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
+                            InsnEmpty(IpFwOpcode.O_ACCEPT),
+                        ],
+                    },
+                },
+                id="test_or",
+            ),
+            pytest.param(
+                {
+                    "in": "add allow ip from table(AAA) to table(BBB)",
+                    "out": {
+                        "objs": [
+                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
+                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
+                        ],
+                        "insns": [
+                            InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1),
+                            InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2),
+                            InsnEmpty(IpFwOpcode.O_ACCEPT),
+                        ],
+                    },
+                },
+                id="test_tables",
+            ),
+            pytest.param(
+                {
+                    "in": "add allow ip from any to 1.2.3.4 // test comment",
+                    "out": {
+                        "insns": [
+                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
+                            InsnComment(comment="test comment"),
+                            InsnEmpty(IpFwOpcode.O_ACCEPT),
+                        ],
+                    },
+                },
+                id="test_comment",
+            ),
+        ],
+    )
+    def test_add_rule(self, rule):
+        """Tests if the compiled rule is sane and matches the spec"""
+        self.verify_rule(rule["in"], rule["out"])
+
+    @pytest.mark.parametrize(
+        "action",
+        [
+            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
+            pytest.param(
+                (
+                    "abort",
+                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
+                ),
+                id="abort",
+            ),
+            pytest.param(
+                (
+                    "abort6",
+                    Insn(
+                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
+                    ),
+                ),
+                id="abort6",
+            ),
+            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
+            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
+            pytest.param(
+                (
+                    "reject",
+                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
+                ),
+                id="reject",
+            ),
+            pytest.param(
+                (
+                    "reset",
+                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
+                ),
+                id="reset",
+            ),
+            pytest.param(
+                (
+                    "reset6",
+                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
+                ),
+                id="reset6",
+            ),
+            # The "unreach port" case used to be listed twice with the same
+            # id; one copy is enough.
+            pytest.param(
+                (
+                    "unreach port",
+                    InsnReject(
+                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
+                    ),
+                ),
+                id="unreach_port",
+            ),
+            pytest.param(
+                (
+                    "unreach needfrag",
+                    InsnReject(
+                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
+                    ),
+                ),
+                id="unreach_needfrag",
+            ),
+            pytest.param(
+                (
+                    "unreach needfrag 1420",
+                    InsnReject(
+                        IpFwOpcode.O_REJECT,
+                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
+                        mtu=1420,
+                    ),
+                ),
+                id="unreach_needfrag_mtu",
+            ),
+            pytest.param(
+                (
+                    "unreach6 port",
+                    Insn(
+                        IpFwOpcode.O_UNREACH6,
+                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
+                    ),
+                ),
+                id="unreach6_port",
+            ),
+            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
+            # TOK_NAT
+            pytest.param(
+                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
+            ),
+            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
+            pytest.param(
+                ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42"
+            ),
+            pytest.param(
+                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
+            ),
+            pytest.param(
+                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
+            ),
+            pytest.param(
+                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
+            ),
+            pytest.param(
+                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
+            ),
+            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
+            pytest.param(
+                ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420"
+            ),
+            # TOK_FORWARD
+            # TOK_COMMENT
+            pytest.param(
+                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
+                id="setfib_1",
+                marks=pytest.mark.skip("needs net.fibs>1"),
+            ),
+            pytest.param(
+                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
+                id="setdscp_42",
+            ),
+            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
+            pytest.param(
+                ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
+            ),
+        ],
+    )
+    def test_add_action(self, action):
+        """Tests if the rule action is compiled properly"""
+        rule_in = "add {} ip from any to any".format(action[0])
+        rule_out = {"insns": [action[1]]}
+        self.verify_rule(rule_in, rule_out)
+
+    @pytest.mark.parametrize(
+        "insn",
+        [
+            pytest.param(
+                {
+                    "in": "add prob 0.7 allow ip from any to any",
+                    "out": InsnProb(prob=0.7),
+                },
+                id="test_prob",
+            ),
+            pytest.param(
+                {
+                    "in": "add allow tcp from any to any",
+                    "out": InsnProto(arg1=6),
+                },
+                id="test_proto",
+            ),
+            pytest.param(
+                {
+                    "in": "add allow ip from any to any 57",
+                    # port_pairs holds (low, high) tuples, as in
+                    # test_add_ports; a flat [57, 57] is not a valid pair list.
+                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[(57, 57)]),
+                },
+                id="test_ports",
+            ),
+        ],
+    )
+    def test_add_single_instruction(self, insn):
+        """Tests if the compiled rule is sane and matches the spec"""
+
+        # Prepare the desired output
+        out = {
+            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
+        }
+        self.verify_rule(insn["in"], out)
+
+    @pytest.mark.parametrize(
+        "opcode",
+        [
+            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
+            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
+        ],
+    )
+    @pytest.mark.parametrize(
+        "params",
+        [
+            pytest.param(
+                {
+                    "in": "57",
+                    "out": [(57, 57)],
+                },
+                id="test_single",
+            ),
+            pytest.param(
+                {
+                    "in": "57-59",
+                    "out": [(57, 59)],
+                },
+                id="test_range",
+            ),
+            pytest.param(
+                {
+                    "in": "57-59,41",
+                    "out": [(57, 59), (41, 41)],
+                },
+                id="test_ranges",
+            ),
+        ],
+    )
+    def test_add_ports(self, params, opcode):
+        """Tests source and destination port lists and ranges"""
+        if opcode == IpFwOpcode.O_IP_DSTPORT:
+            txt = "add allow ip from any to any " + params["in"]
+        else:
+            txt = "add allow ip from any " + params["in"] + " to any"
+        out = {
+            "insns": [
+                InsnPorts(opcode, port_pairs=params["out"]),
+                InsnEmpty(IpFwOpcode.O_ACCEPT),
+            ]
+        }
+        self.verify_rule(txt, out)
diff --git a/tests/atf_python/sys/Makefile b/tests/atf_python/sys/Makefile
--- a/tests/atf_python/sys/Makefile
+++ b/tests/atf_python/sys/Makefile
@@ -3,7 +3,7 @@
.PATH: ${.CURDIR}
FILES= __init__.py
-SUBDIR= net netlink
+SUBDIR= net netlink netpfil
.include <bsd.own.mk>
FILESDIR= ${TESTSBASE}/atf_python/sys
diff --git a/tests/atf_python/sys/Makefile b/tests/atf_python/sys/netpfil/Makefile
copy from tests/atf_python/sys/Makefile
copy to tests/atf_python/sys/netpfil/Makefile
--- a/tests/atf_python/sys/Makefile
+++ b/tests/atf_python/sys/netpfil/Makefile
@@ -3,9 +3,9 @@
.PATH: ${.CURDIR}
FILES= __init__.py
-SUBDIR= net netlink
+SUBDIR= ipfw
.include <bsd.own.mk>
-FILESDIR= ${TESTSBASE}/atf_python/sys
+FILESDIR= ${TESTSBASE}/atf_python/sys/netpfil
.include <bsd.prog.mk>
diff --git a/tests/atf_python/sys/netpfil/__init__.py b/tests/atf_python/sys/netpfil/__init__.py
new file mode 100644
diff --git a/tests/atf_python/sys/netpfil/ipfw/Makefile b/tests/atf_python/sys/netpfil/ipfw/Makefile
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/Makefile
@@ -0,0 +1,12 @@
+.include <src.opts.mk>
+
+.PATH: ${.CURDIR}
+
+FILES= __init__.py insns.py insn_headers.py ioctl.py ioctl_headers.py \
+ ipfw.py utils.py
+
+.include <bsd.own.mk>
+FILESDIR= ${TESTSBASE}/atf_python/sys/netpfil/ipfw
+
+.include <bsd.prog.mk>
+
diff --git a/tests/atf_python/sys/netpfil/ipfw/__init__.py b/tests/atf_python/sys/netpfil/ipfw/__init__.py
new file mode 100644
diff --git a/tests/atf_python/sys/netpfil/ipfw/insn_headers.py b/tests/atf_python/sys/netpfil/ipfw/insn_headers.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/insn_headers.py
@@ -0,0 +1,198 @@
+from enum import Enum
+
+
+# Numeric opcodes of ipfw instructions, numbered consecutively from
+# O_NOP = 0 up to the O_LAST_OPCODE = 99 sentinel.
+# NOTE(review): presumably mirrors the kernel's opcode enum; the values
+# must be kept in sync by hand -- confirm against the header when updating.
+class IpFwOpcode(Enum):
+ O_NOP = 0
+ O_IP_SRC = 1
+ O_IP_SRC_MASK = 2
+ O_IP_SRC_ME = 3
+ O_IP_SRC_SET = 4
+ O_IP_DST = 5
+ O_IP_DST_MASK = 6
+ O_IP_DST_ME = 7
+ O_IP_DST_SET = 8
+ O_IP_SRCPORT = 9
+ O_IP_DSTPORT = 10
+ O_PROTO = 11
+ O_MACADDR2 = 12
+ O_MAC_TYPE = 13
+ O_LAYER2 = 14
+ O_IN = 15
+ O_FRAG = 16
+ O_RECV = 17
+ O_XMIT = 18
+ O_VIA = 19
+ O_IPOPT = 20
+ O_IPLEN = 21
+ O_IPID = 22
+ O_IPTOS = 23
+ O_IPPRECEDENCE = 24
+ O_IPTTL = 25
+ O_IPVER = 26
+ O_UID = 27
+ O_GID = 28
+ O_ESTAB = 29
+ O_TCPFLAGS = 30
+ O_TCPWIN = 31
+ O_TCPSEQ = 32
+ O_TCPACK = 33
+ O_ICMPTYPE = 34
+ O_TCPOPTS = 35
+ O_VERREVPATH = 36
+ O_VERSRCREACH = 37
+ O_PROBE_STATE = 38
+ O_KEEP_STATE = 39
+ O_LIMIT = 40
+ O_LIMIT_PARENT = 41
+ O_LOG = 42
+ O_PROB = 43
+ O_CHECK_STATE = 44
+ O_ACCEPT = 45
+ O_DENY = 46
+ O_REJECT = 47
+ O_COUNT = 48
+ O_SKIPTO = 49
+ O_PIPE = 50
+ O_QUEUE = 51
+ O_DIVERT = 52
+ O_TEE = 53
+ O_FORWARD_IP = 54
+ O_FORWARD_MAC = 55
+ O_NAT = 56
+ O_REASS = 57
+ O_IPSEC = 58
+ O_IP_SRC_LOOKUP = 59
+ O_IP_DST_LOOKUP = 60
+ O_ANTISPOOF = 61
+ O_JAIL = 62
+ O_ALTQ = 63
+ O_DIVERTED = 64
+ O_TCPDATALEN = 65
+ O_IP6_SRC = 66
+ O_IP6_SRC_ME = 67
+ O_IP6_SRC_MASK = 68
+ O_IP6_DST = 69
+ O_IP6_DST_ME = 70
+ O_IP6_DST_MASK = 71
+ O_FLOW6ID = 72
+ O_ICMP6TYPE = 73
+ O_EXT_HDR = 74
+ O_IP6 = 75
+ O_NETGRAPH = 76
+ O_NGTEE = 77
+ O_IP4 = 78
+ O_UNREACH6 = 79
+ O_TAG = 80
+ O_TAGGED = 81
+ O_SETFIB = 82
+ O_FIB = 83
+ O_SOCKARG = 84
+ O_CALLRETURN = 85
+ O_FORWARD_IP6 = 86
+ O_DSCP = 87
+ O_SETDSCP = 88
+ O_IP_FLOW_LOOKUP = 89
+ O_EXTERNAL_ACTION = 90
+ O_EXTERNAL_INSTANCE = 91
+ O_EXTERNAL_DATA = 92
+ O_SKIP_ACTION = 93
+ O_TCPMSS = 94
+ O_MAC_SRC_LOOKUP = 95
+ O_MAC_DST_LOOKUP = 96
+ O_SETMARK = 97
+ O_MARK = 98
+ O_LAST_OPCODE = 99
+
+
+# Numeric identifiers of the IP_FW3 sub-commands (table, rule, NAT44,
+# NAT64 and NPTv6 operations).  The numbering is not contiguous: 91 and
+# several ranges (118-129, 136-139, 147-149, 156-159) are unused here.
+# NOTE(review): presumably mirrors the IP_FW_* constants of the kernel
+# headers -- confirm when adding new commands.
+class Op3CmdType(Enum):
+ IP_FW_TABLE_XADD = 86
+ IP_FW_TABLE_XDEL = 87
+ IP_FW_TABLE_XGETSIZE = 88
+ IP_FW_TABLE_XLIST = 89
+ IP_FW_TABLE_XDESTROY = 90
+ IP_FW_TABLES_XLIST = 92
+ IP_FW_TABLE_XINFO = 93
+ IP_FW_TABLE_XFLUSH = 94
+ IP_FW_TABLE_XCREATE = 95
+ IP_FW_TABLE_XMODIFY = 96
+ IP_FW_XGET = 97
+ IP_FW_XADD = 98
+ IP_FW_XDEL = 99
+ IP_FW_XMOVE = 100
+ IP_FW_XZERO = 101
+ IP_FW_XRESETLOG = 102
+ IP_FW_SET_SWAP = 103
+ IP_FW_SET_MOVE = 104
+ IP_FW_SET_ENABLE = 105
+ IP_FW_TABLE_XFIND = 106
+ IP_FW_XIFLIST = 107
+ IP_FW_TABLES_ALIST = 108
+ IP_FW_TABLE_XSWAP = 109
+ IP_FW_TABLE_VLIST = 110
+ IP_FW_NAT44_XCONFIG = 111
+ IP_FW_NAT44_DESTROY = 112
+ IP_FW_NAT44_XGETCONFIG = 113
+ IP_FW_NAT44_LIST_NAT = 114
+ IP_FW_NAT44_XGETLOG = 115
+ IP_FW_DUMP_SOPTCODES = 116
+ IP_FW_DUMP_SRVOBJECTS = 117
+ IP_FW_NAT64STL_CREATE = 130
+ IP_FW_NAT64STL_DESTROY = 131
+ IP_FW_NAT64STL_CONFIG = 132
+ IP_FW_NAT64STL_LIST = 133
+ IP_FW_NAT64STL_STATS = 134
+ IP_FW_NAT64STL_RESET_STATS = 135
+ IP_FW_NAT64LSN_CREATE = 140
+ IP_FW_NAT64LSN_DESTROY = 141
+ IP_FW_NAT64LSN_CONFIG = 142
+ IP_FW_NAT64LSN_LIST = 143
+ IP_FW_NAT64LSN_STATS = 144
+ IP_FW_NAT64LSN_LIST_STATES = 145
+ IP_FW_NAT64LSN_RESET_STATS = 146
+ IP_FW_NPTV6_CREATE = 150
+ IP_FW_NPTV6_DESTROY = 151
+ IP_FW_NPTV6_CONFIG = 152
+ IP_FW_NPTV6_LIST = 153
+ IP_FW_NPTV6_STATS = 154
+ IP_FW_NPTV6_RESET_STATS = 155
+ IP_FW_NAT64CLAT_CREATE = 160
+ IP_FW_NAT64CLAT_DESTROY = 161
+ IP_FW_NAT64CLAT_CONFIG = 162
+ IP_FW_NAT64CLAT_LIST = 163
+ IP_FW_NAT64CLAT_STATS = 164
+ IP_FW_NAT64CLAT_RESET_STATS = 165
+
+
+# Codes carried in arg1 of an O_REJECT instruction (see the tests that
+# build Insn/InsnReject with these values).  0-15 are the ICMP
+# "unreachable" codes; 256 and 257 are out-of-band values used for the
+# "reset" and "abort" actions.
+class IcmpRejectCode(Enum):
+ ICMP_UNREACH_NET = 0
+ ICMP_UNREACH_HOST = 1
+ ICMP_UNREACH_PROTOCOL = 2
+ ICMP_UNREACH_PORT = 3
+ ICMP_UNREACH_NEEDFRAG = 4
+ ICMP_UNREACH_SRCFAIL = 5
+ ICMP_UNREACH_NET_UNKNOWN = 6
+ ICMP_UNREACH_HOST_UNKNOWN = 7
+ ICMP_UNREACH_ISOLATED = 8
+ ICMP_UNREACH_NET_PROHIB = 9
+ ICMP_UNREACH_HOST_PROHIB = 10
+ ICMP_UNREACH_TOSNET = 11
+ ICMP_UNREACH_TOSHOST = 12
+ ICMP_UNREACH_FILTER_PROHIB = 13
+ ICMP_UNREACH_HOST_PRECEDENCE = 14
+ ICMP_UNREACH_PRECEDENCE_CUTOFF = 15
+ ICMP_REJECT_RST = 256
+ ICMP_REJECT_ABORT = 257
+
+
+# Codes carried in arg1 of an O_UNREACH6 instruction.  256 and 257 are
+# out-of-band values used for the "reset6" and "abort6" actions.
+# BEYONDSCOPE and NOTNEIGHBOR share the value 2; with Python's Enum the
+# second name (NOTNEIGHBOR) becomes an alias of the first, so lookups by
+# value return ICMP6_DST_UNREACH_BEYONDSCOPE.
+class Icmp6RejectCode(Enum):
+ ICMP6_DST_UNREACH_NOROUTE = 0
+ ICMP6_DST_UNREACH_ADMIN = 1
+ ICMP6_DST_UNREACH_BEYONDSCOPE = 2
+ ICMP6_DST_UNREACH_NOTNEIGHBOR = 2
+ ICMP6_DST_UNREACH_ADDR = 3
+ ICMP6_DST_UNREACH_NOPORT = 4
+ ICMP6_DST_UNREACH_POLICY = 5
+ ICMP6_DST_UNREACH_REJECT = 6
+ ICMP6_DST_UNREACH_SRCROUTE = 7
+ ICMP6_UNREACH_RST = 256
+ ICMP6_UNREACH_ABORT = 257
diff --git a/tests/atf_python/sys/netpfil/ipfw/insns.py b/tests/atf_python/sys/netpfil/ipfw/insns.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/insns.py
@@ -0,0 +1,555 @@
+#!/usr/bin/env python3
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+from atf_python.sys.netpfil.ipfw.insn_headers import IpFwOpcode
+from atf_python.sys.netpfil.ipfw.insn_headers import IcmpRejectCode
+from atf_python.sys.netpfil.ipfw.insn_headers import Icmp6RejectCode
+from atf_python.sys.netpfil.ipfw.utils import AttrDescr
+from atf_python.sys.netpfil.ipfw.utils import enum_or_int
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
+
+
+# Integer opcodes of the instructions that act as rule actions; BaseInsn
+# consults this tuple to set its `is_action` flag.
+insn_actions = tuple(
+    action_opcode.value
+    for action_opcode in (
+        IpFwOpcode.O_CHECK_STATE,
+        IpFwOpcode.O_REJECT,
+        IpFwOpcode.O_UNREACH6,
+        IpFwOpcode.O_ACCEPT,
+        IpFwOpcode.O_DENY,
+        IpFwOpcode.O_COUNT,
+        IpFwOpcode.O_NAT,
+        IpFwOpcode.O_QUEUE,
+        IpFwOpcode.O_PIPE,
+        IpFwOpcode.O_SKIPTO,
+        IpFwOpcode.O_NETGRAPH,
+        IpFwOpcode.O_NGTEE,
+        IpFwOpcode.O_DIVERT,
+        IpFwOpcode.O_TEE,
+        IpFwOpcode.O_CALLRETURN,
+        IpFwOpcode.O_FORWARD_IP,
+        IpFwOpcode.O_FORWARD_IP6,
+        IpFwOpcode.O_SETFIB,
+        IpFwOpcode.O_SETDSCP,
+        IpFwOpcode.O_REASS,
+        IpFwOpcode.O_SETMARK,
+        IpFwOpcode.O_EXTERNAL_ACTION,
+    )
+)
+
+
+class IpFwInsn(Structure):
+    """ctypes layout of the 4-byte header that starts every instruction.
+
+    `length` packs the instruction size in 32-bit words into its low six
+    bits; bits 0x40 and 0x80 are the OR and NOT flags (see BaseInsn).
+    """
+
+    _fields_ = [("opcode", c_uint8), ("length", c_uint8), ("arg1", c_ushort)]
+
+
+class BaseInsn(object):
+    """Common base of the Python model of a single ipfw instruction.
+
+    Holds the opcode (as integer `obj_type` plus optional enum member),
+    the OR / NOT flags, `arg1` and the instruction length in 32-bit words
+    (`ilen`).  Subclasses provide `__bytes__` and `_print_obj_value`.
+    """
+
+    obj_enum_class = IpFwOpcode
+
+    def __init__(self, opcode, is_or, is_not, arg1):
+        # The opcode may be given either as an enum member or as a raw int.
+        if not isinstance(opcode, Enum):
+            self.obj_type = opcode
+            self._enum = enum_from_int(self.obj_enum_class, self.obj_type)
+        else:
+            self.obj_type = opcode.value
+            self._enum = opcode
+        self.is_or = is_or
+        self.is_not = is_not
+        self.arg1 = arg1
+        self.is_action = self.obj_type in insn_actions
+        self.ilen = 1
+        self.obj_list = []
+
+    @property
+    def obj_name(self):
+        """Symbolic opcode name, or "opcode#N" when no enum member is known."""
+        if self._enum is None:
+            return "opcode#{}".format(self.obj_type)
+        return self._enum.name
+
+    @staticmethod
+    def get_insn_len(data: bytes) -> int:
+        """Return the word count stored in the second byte of `data`."""
+        length_byte = struct.unpack("@B", data[1:2])[0]
+        return length_byte & 0x3F
+
+    @classmethod
+    def _validate_len(cls, data, valid_options=None):
+        """Raise ValueError unless `data` is exactly one whole instruction.
+
+        When `valid_options` is given, the byte length must also be one of
+        the listed values.
+        """
+        if len(data) < 4:
+            raise ValueError("opcode too short")
+        _, raw_len = struct.unpack("@BB", data[:2])
+        declared = (raw_len & 0x3F) * 4
+        if len(data) != declared:
+            raise ValueError("wrong length")
+        if not valid_options or len(data) in valid_options:
+            return
+        raise ValueError(
+            "len {} not in {} for {}".format(
+                len(data), valid_options,
+                enum_from_int(cls.obj_enum_class, data[0])
+            )
+        )
+
+    @classmethod
+    def _validate(cls, data):
+        cls._validate_len(data)
+
+    @classmethod
+    def _parse(cls, data):
+        """Build an instance from the 4-byte header at the start of `data`."""
+        hdr = IpFwInsn.from_buffer_copy(data[:4])
+        return cls(
+            opcode=hdr.opcode,
+            is_or=bool(hdr.length & 0x40),
+            is_not=bool(hdr.length & 0x80),
+            arg1=hdr.arg1,
+        )
+
+    @classmethod
+    def from_bytes(cls, data, attr_type_enum):
+        """Validate and parse `data`, then tag the result with its enum."""
+        cls._validate(data)
+        parsed = cls._parse(data)
+        parsed._enum = attr_type_enum
+        return parsed
+
+    def __bytes__(self):
+        raise NotImplementedError()
+
+    def print_obj(self, prepend=""):
+        """Print a one-line human-readable description of the instruction."""
+        or_suffix = " [OR]\\" if self.is_or else ""
+        not_prefix = "[!] " if self.is_not else ""
+        line = "{}{}len={} type={}({}){}{}".format(
+            prepend,
+            not_prefix,
+            len(bytes(self)),
+            self.obj_name,
+            self.obj_type,
+            self._print_obj_value(),
+            or_suffix,
+        )
+        print(line)
+
+    def _print_obj_value(self):
+        raise NotImplementedError()
+
+    def print_obj_hex(self, prepend=""):
+        """Print `prepend`, an empty line, then the serialized bytes in hex."""
+        print(prepend)
+        print()
+        hex_bytes = ["x{:02X}".format(octet) for octet in bytes(self)]
+        print(" ".join(hex_bytes))
+
+    @staticmethod
+    def parse_insns(data, attr_map):
+        """Split `data` into instructions and parse each one.
+
+        `attr_map` maps an integer opcode to a descriptor whose "ad" entry
+        provides the handling class (`cls`) and enum member (`val`);
+        opcodes without an entry are parsed as InsnUnknown.  Raises
+        ValueError on truncated or trailing data.
+        """
+        hdr_size = sizeof(IpFwInsn)
+        parsed = []
+        pos = 0
+        while pos + hdr_size <= len(data):
+            hdr = IpFwInsn.from_buffer_copy(data[pos : pos + hdr_size])
+            end = pos + (hdr.length & 0x3F) * 4
+            if end > len(data):
+                raise ValueError("wrng length")
+            descr = attr_map.get(hdr.opcode, None)
+            if descr is not None:
+                insn_cls = descr["ad"].cls
+                type_enum = descr["ad"].val
+            else:
+                insn_cls = InsnUnknown
+                type_enum = enum_from_int(BaseInsn.obj_enum_class, hdr.opcode)
+            parsed.append(insn_cls.from_bytes(data[pos:end], type_enum))
+            pos = end
+
+        if pos != len(data):
+            raise ValueError("empty space")
+        return parsed
+
+
+class Insn(BaseInsn):
+    """Plain one-word instruction: header only, value carried in arg1."""
+
+    def __init__(self, opcode, is_or=False, is_not=False, arg1=0):
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
+
+    @classmethod
+    def _validate(cls, data):
+        cls._validate_len(data, [4])
+
+    def __bytes__(self):
+        """Serialize the 4-byte header, folding the OR/NOT flags into length."""
+        length = self.ilen
+        if self.is_or:
+            length |= 0x40
+        if self.is_not:
+            # Must be an assignment: a bare `length | 0x80` discards the
+            # result and the NOT flag would never reach the wire.
+            length |= 0x80
+        insn = IpFwInsn(opcode=self.obj_type, length=length, arg1=enum_or_int(self.arg1))
+        return bytes(insn)
+
+    def _print_obj_value(self):
+        return " arg1={}".format(self.arg1)
+
+
+class InsnUnknown(Insn):
+    """Fallback for opcodes without a dedicated class.
+
+    Keeps the raw bytes it was parsed from and returns them unchanged
+    when serialized.
+    """
+
+    @classmethod
+    def _validate(cls, data):
+        # Any whole-instruction length is acceptable here.
+        cls._validate_len(data)
+
+    @classmethod
+    def _parse(cls, data):
+        insn = super()._parse(data)
+        insn._data = data
+        return insn
+
+    def __bytes__(self):
+        return self._data
+
+    def _print_obj_value(self):
+        hex_bytes = ["x{:02X}".format(octet) for octet in self._data]
+        return " " + " ".join(hex_bytes)
+
+
+class InsnEmpty(Insn):
+    """One-word instruction that carries no argument (arg1 must be 0)."""
+
+    @classmethod
+    def _validate(cls, data):
+        cls._validate_len(data, [4])
+        if IpFwInsn.from_buffer_copy(data[:4]).arg1 != 0:
+            raise ValueError("arg1 should be empty")
+
+    def _print_obj_value(self):
+        return ""
+
+
+class InsnComment(Insn):
+    """O_NOP instruction carrying a NUL-terminated comment after the header.
+
+    On the wire the comment is followed by at least one NUL byte and
+    padded with NULs to a multiple of four bytes.
+    """
+
+    def __init__(self, opcode=IpFwOpcode.O_NOP, is_or=False, is_not=False, arg1=0, comment=""):
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
+        self.comment = comment if comment else ""
+
+    @classmethod
+    def _validate(cls, data):
+        cls._validate_len(data)
+        if len(data) > 88:
+            raise ValueError("comment too long")
+
+    @classmethod
+    def _parse(cls, data):
+        self = super()._parse(data)
+        # Comment encoding can be anything,
+        # use utf-8 to ease debugging
+        payload = bytes(data[4:])
+        # Cut at the first NUL.  (Indexing bytes yields ints, so comparing
+        # data[i] with b"\0" never matches; use find() instead.)
+        nul_pos = payload.find(b"\0")
+        if nul_pos != -1:
+            payload = payload[:nul_pos]
+        self.comment = payload.decode("utf-8")
+        return self
+
+    def __bytes__(self):
+        comment_bytes = self.comment.encode("utf-8") + b"\0"
+        if len(comment_bytes) % 4 > 0:
+            comment_bytes += b"\0" * (4 - (len(comment_bytes) % 4))
+        # The header length counts the padded comment words as well;
+        # otherwise the output would fail this class's own _validate_len().
+        self.ilen = 1 + len(comment_bytes) // 4
+        return super().__bytes__() + comment_bytes
+
+    def _print_obj_value(self):
+        return " comment='{}'".format(self.comment)
+
+
+class InsnProto(Insn):
+    """O_PROTO instruction; arg1 holds the IP protocol number."""
+
+    def __init__(self, opcode=IpFwOpcode.O_PROTO, is_or=False, is_not=False, arg1=0):
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
+
+    def _print_obj_value(self):
+        # Show a symbolic name for the few well-known protocols.
+        known_map = {6: "TCP", 17: "UDP", 41: "IPV6"}
+        proto = self.arg1
+        if proto not in known_map:
+            return " proto=#{}".format(proto)
+        return " proto={}".format(known_map[proto])
+
+
+class InsnU32(Insn):
+    """Two-word instruction: header followed by one native-order uint32."""
+
+    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0):
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
+        self.ilen = 2
+        self.u32 = u32
+
+    @classmethod
+    def _validate(cls, data):
+        cls._validate_len(data, [8])
+
+    @classmethod
+    def _parse(cls, data):
+        insn = super()._parse(data[:4])
+        (insn.u32,) = struct.unpack("@I", data[4:8])
+        return insn
+
+    def __bytes__(self):
+        header = super().__bytes__()
+        return header + struct.pack("@I", self.u32)
+
+    def _print_obj_value(self):
+        return " arg1={} u32={}".format(self.arg1, self.u32)
+
+
+class InsnProb(InsnU32):
+    """O_PROB instruction; the probability is stored scaled in `u32`.
+
+    `prob` is a float view on `u32`: u32 == int(prob * 0x7FFFFFFF).
+    """
+
+    def __init__(
+        self,
+        opcode=IpFwOpcode.O_PROB,
+        is_or=False,
+        is_not=False,
+        arg1=0,
+        u32=0,
+        prob=0.0,
+    ):
+        # Forward arg1 and u32 instead of silently dropping them (the
+        # parser constructs instances with arg1 taken from the wire).
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1, u32=u32)
+        # A non-zero prob takes precedence; otherwise keep the given u32.
+        if prob:
+            self.prob = prob
+
+    @property
+    def prob(self):
+        return 1.0 * self.u32 / 0x7FFFFFFF
+
+    @prob.setter
+    def prob(self, prob: float):
+        self.u32 = int(prob * 0x7FFFFFFF)
+
+    def _print_obj_value(self):
+        return " prob={}".format(round(self.prob, 5))
+
+
+class InsnIp(InsnU32):
+    """Instruction carrying one IPv4 address in `u32`.
+
+    `ip` is a dotted-quad string view on `u32`; the address bytes are
+    kept in network order inside the natively packed word.
+    """
+
+    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0, ip=None):
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1, u32=u32)
+        if ip:
+            self.ip = ip
+
+    @property
+    def ip(self):
+        return socket.inet_ntop(socket.AF_INET, struct.pack("@I", self.u32))
+
+    @ip.setter
+    def ip(self, ip: str):
+        ip_bin = socket.inet_pton(socket.AF_INET, ip)
+        self.u32 = struct.unpack("@I", ip_bin)[0]
+
+    def _print_obj_value(self):
+        # print_obj() calls _print_obj_value(); under its former name
+        # (_print_opcode_value) this override was never used.
+        return " ip={}".format(self.ip)
+
+    # Old name kept as an alias for any external caller.
+    _print_opcode_value = _print_obj_value
+
+
+class InsnTable(Insn):
+    """Table lookup instruction: arg1 is the table index.
+
+    An optional uint32 value may follow the header, making the
+    instruction two words long.
+    """
+
+    @classmethod
+    def _validate(cls, data):
+        cls._validate_len(data, [4, 8])
+
+    @classmethod
+    def _parse(cls, data):
+        insn = super()._parse(data)
+
+        if len(data) != 8:
+            insn.val = None
+            return insn
+        insn.val = struct.unpack("@I", data[4:8])[0]
+        insn.ilen = 2
+        return insn
+
+    def __bytes__(self):
+        header = super().__bytes__()
+        value = getattr(self, "val", None)
+        if value is None:
+            return header
+        return header + struct.pack("@I", value)
+
+    def _print_obj_value(self):
+        value = getattr(self, "val", None)
+        if value is None:
+            return " table={}".format(self.arg1)
+        return " table={} value={}".format(self.arg1, value)
+
+
+class InsnReject(Insn):
+    """O_REJECT instruction: arg1 is the reject code.
+
+    An optional uint32 MTU may follow the header, making the instruction
+    two words long.
+    """
+
+    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, mtu=None):
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
+        self.mtu = mtu
+        if mtu is not None:
+            self.ilen = 2
+
+    @classmethod
+    def _validate(cls, data):
+        cls._validate_len(data, [4, 8])
+
+    @classmethod
+    def _parse(cls, data):
+        insn = super()._parse(data)
+
+        if len(data) != 8:
+            insn.mtu = None
+            return insn
+        insn.mtu = struct.unpack("@I", data[4:8])[0]
+        insn.ilen = 2
+        return insn
+
+    def __bytes__(self):
+        header = super().__bytes__()
+        mtu = getattr(self, "mtu", None)
+        if mtu is None:
+            return header
+        return header + struct.pack("@I", mtu)
+
+    def _print_obj_value(self):
+        code = enum_from_int(IcmpRejectCode, self.arg1)
+        mtu = getattr(self, "mtu", None)
+        if mtu is None:
+            return " code={}".format(code)
+        return " code={} mtu={}".format(code, mtu)
+
+
+class InsnPorts(Insn):
+    """Port-list instruction: header followed by (low, high) port pairs.
+
+    Each pair occupies one 32-bit word holding two native-order uint16
+    values; a single port is stored as a pair with low == high.
+    """
+
+    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, port_pairs=None):
+        # arg1 is forwarded rather than dropped; port_pairs defaults to
+        # None to avoid a list shared between instances.
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
+        self.port_pairs = list(port_pairs) if port_pairs else []
+
+    @classmethod
+    def _validate(cls, data):
+        if len(data) < 8:
+            raise ValueError("no ports specified")
+        cls._validate_len(data)
+
+    @classmethod
+    def _parse(cls, data):
+        self = super()._parse(data)
+
+        off = 4
+        port_pairs = []
+        while off + 4 <= len(data):
+            low, high = struct.unpack("@HH", data[off : off + 4])
+            port_pairs.append((low, high))
+            off += 4
+        self.port_pairs = port_pairs
+        return self
+
+    def __bytes__(self):
+        # One header word plus one word per pair -- the exact inverse of
+        # _parse().  The length must be set before the header is packed.
+        self.ilen = 1 + len(self.port_pairs)
+        ret = super().__bytes__()
+        for low, high in self.port_pairs:
+            ret += struct.pack("@HH", low, high)
+        return ret
+
+    def _print_obj_value(self):
+        ret = []
+        for p in self.port_pairs:
+            if p[0] == p[1]:
+                ret.append(str(p[0]))
+            else:
+                ret.append("{}-{}".format(p[0], p[1]))
+        return " ports={}".format(",".join(ret))
+
+
+class IpFwInsnIp6(Structure):
+    """ctypes layout: instruction header, 16-byte address, 16-byte mask."""
+
+    _fields_ = [("o", IpFwInsn), ("addr6", c_byte * 16), ("mask6", c_byte * 16)]
+
+
+class InsnIp6(Insn):
+    """Instruction carrying an IPv6 address and, optionally, a mask.
+
+    Length is 5 words with the address only and 9 words with a mask.
+    `ip6` and `mask6` are kept as text and converted when serializing.
+    """
+
+    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, ip6=None, mask6=None):
+        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
+        self.ip6 = ip6
+        self.mask6 = mask6
+        self.ilen = 5 if mask6 is None else 9
+
+    @classmethod
+    def _validate(cls, data):
+        cls._validate_len(data, [4 + 16, 4 + 16 * 2])
+
+    @classmethod
+    def _parse(cls, data):
+        insn = super()._parse(data)
+        insn.ip6 = socket.inet_ntop(socket.AF_INET6, data[4:20])
+
+        if len(data) != 4 + 16 * 2:
+            insn.mask6 = None
+            insn.ilen = 5
+            return insn
+        insn.mask6 = socket.inet_ntop(socket.AF_INET6, data[20:36])
+        insn.ilen = 9
+        return insn
+
+    def __bytes__(self):
+        packed = super().__bytes__() + socket.inet_pton(socket.AF_INET6, self.ip6)
+        if self.mask6 is None:
+            return packed
+        return packed + socket.inet_pton(socket.AF_INET6, self.mask6)
+
+    def _print_obj_value(self):
+        if not self.mask6:
+            return " ip6={}".format(self.ip6)
+        return " ip6={}/{}".format(self.ip6, self.mask6)
+
+
+# Opcode -> parser-class registry consumed by BaseInsn.parse_insns();
+# opcodes not listed here are parsed as InsnUnknown.  Each opcode is
+# registered once (O_COUNT and O_DIVERT used to be listed twice).
+insn_attrs = prepare_attrs_map(
+    [
+        # Actions
+        AttrDescr(IpFwOpcode.O_CHECK_STATE, Insn),
+        AttrDescr(IpFwOpcode.O_ACCEPT, InsnEmpty),
+        AttrDescr(IpFwOpcode.O_COUNT, InsnEmpty),
+        AttrDescr(IpFwOpcode.O_REJECT, InsnReject),
+        AttrDescr(IpFwOpcode.O_UNREACH6, Insn),
+        AttrDescr(IpFwOpcode.O_DENY, InsnEmpty),
+        AttrDescr(IpFwOpcode.O_DIVERT, Insn),
+        AttrDescr(IpFwOpcode.O_QUEUE, Insn),
+        AttrDescr(IpFwOpcode.O_PIPE, Insn),
+        AttrDescr(IpFwOpcode.O_SKIPTO, Insn),
+        AttrDescr(IpFwOpcode.O_NETGRAPH, Insn),
+        AttrDescr(IpFwOpcode.O_NGTEE, Insn),
+        AttrDescr(IpFwOpcode.O_TEE, Insn),
+        AttrDescr(IpFwOpcode.O_CALLRETURN, Insn),
+        AttrDescr(IpFwOpcode.O_SETFIB, Insn),
+        AttrDescr(IpFwOpcode.O_SETDSCP, Insn),
+        AttrDescr(IpFwOpcode.O_REASS, InsnEmpty),
+        AttrDescr(IpFwOpcode.O_SETMARK, Insn),
+        # Match instructions
+        AttrDescr(IpFwOpcode.O_NOP, InsnComment),
+        AttrDescr(IpFwOpcode.O_PROTO, InsnProto),
+        AttrDescr(IpFwOpcode.O_PROB, InsnProb),
+        AttrDescr(IpFwOpcode.O_IP_DST_ME, InsnEmpty),
+        AttrDescr(IpFwOpcode.O_IP_SRC_ME, InsnEmpty),
+        AttrDescr(IpFwOpcode.O_IP6_DST_ME, InsnEmpty),
+        AttrDescr(IpFwOpcode.O_IP6_SRC_ME, InsnEmpty),
+        AttrDescr(IpFwOpcode.O_IP_SRC, InsnIp),
+        AttrDescr(IpFwOpcode.O_IP_DST, InsnIp),
+        AttrDescr(IpFwOpcode.O_IP6_DST, InsnIp6),
+        AttrDescr(IpFwOpcode.O_IP6_SRC, InsnIp6),
+        AttrDescr(IpFwOpcode.O_IP_SRC_LOOKUP, InsnTable),
+        AttrDescr(IpFwOpcode.O_IP_DST_LOOKUP, InsnTable),
+        AttrDescr(IpFwOpcode.O_IP_SRCPORT, InsnPorts),
+        AttrDescr(IpFwOpcode.O_IP_DSTPORT, InsnPorts),
+        AttrDescr(IpFwOpcode.O_PROBE_STATE, Insn),
+        AttrDescr(IpFwOpcode.O_KEEP_STATE, Insn),
+    ]
+)
diff --git a/tests/atf_python/sys/netpfil/ipfw/ioctl.py b/tests/atf_python/sys/netpfil/ipfw/ioctl.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/ioctl.py
@@ -0,0 +1,505 @@
+#!/usr/bin/env python3
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+import pytest
+from atf_python.sys.netpfil.ipfw.insns import BaseInsn
+from atf_python.sys.netpfil.ipfw.insns import insn_attrs
+from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTableLookupType
+from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTlvType
+from atf_python.sys.netpfil.ipfw.ioctl_headers import Op3CmdType
+from atf_python.sys.netpfil.ipfw.utils import align8
+from atf_python.sys.netpfil.ipfw.utils import AttrDescr
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
+
+
+class IpFw3OpHeader(Structure):
+ """Fixed header that starts every op3 message handled in this module.
+
+ Four unsigned shorts. BaseIpFwMessage writes its message type into
+ `opcode` when serializing and reads it back when parsing.
+ NOTE(review): presumably mirrors the C `ip_fw3_opheader` - confirm.
+ """
+ _fields_ = [
+ ("opcode", c_ushort),
+ ("version", c_ushort),
+ ("reserved1", c_ushort),
+ ("reserved2", c_ushort),
+ ]
+
+
+class IpFwObjTlv(Structure):
+ """Generic TLV header: type, flags and length.
+
+ The parsers in this module treat `length` as the size of the whole TLV,
+ header included (they slice `data[off : off + hdr.length]`).
+ """
+ _fields_ = [
+ ("n_type", c_ushort),
+ ("flags", c_ushort),
+ ("length", c_uint32),
+ ]
+
+
+class BaseTlv(object):
+    """Common base for the TLV objects handled by this module.
+
+    Stores the numeric TLV type (``obj_type``), the matching member of
+    ``obj_enum_class`` when one exists (``_enum``) and a list of child
+    objects (``obj_list``).  Subclasses must implement ``__bytes__`` and
+    normally override ``_validate`` / ``_parse`` / ``_print_obj_value``.
+    """
+
+    obj_enum_class = IpFwTlvType
+
+    def __init__(self, obj_type):
+        """Remember *obj_type*, given either as an Enum member or an int."""
+        if isinstance(obj_type, Enum):
+            self.obj_type = obj_type.value
+            self._enum = obj_type
+        else:
+            self.obj_type = obj_type
+            # None when the raw value has no named member.
+            self._enum = enum_from_int(self.obj_enum_class, obj_type)
+        self.obj_list = []
+
+    def add_obj(self, obj):
+        """Append a child object."""
+        self.obj_list.append(obj)
+
+    @property
+    def len(self):
+        """Length in bytes of the serialized TLV."""
+        return len(bytes(self))
+
+    @property
+    def obj_name(self):
+        """Enum member name, or ``tlv#<type>`` for unnamed types."""
+        if self._enum is not None:
+            return self._enum.name
+        else:
+            return "tlv#{}".format(self.obj_type)
+
+    def print_hdr(self, prepend=""):
+        """Print a one-line summary of this TLV, prefixed with *prepend*."""
+        print(
+            "{}len={} type={}({}){}".format(
+                prepend, self.len, self.obj_name, self.obj_type, self._print_obj_value()
+            )
+        )
+
+    def print_obj(self, prepend=""):
+        """Print this TLV and, one level deeper, each of its children."""
+        self.print_hdr(prepend)
+        prepend = " " + prepend
+        for obj in self.obj_list:
+            obj.print_obj(prepend)
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless *data* is exactly one well-sized TLV."""
+        if len(data) < sizeof(IpFwObjTlv):
+            raise ValueError("TLV too short")
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        if len(data) != hdr.length:
+            raise ValueError("wrong TLV size")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Build an instance from already validated bytes."""
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        return cls(hdr.n_type)
+
+    @classmethod
+    def from_bytes(cls, data, attr_map=None):
+        """Validate *data* and decode it; *attr_map* describes child TLVs."""
+        cls._validate(data)
+        obj = cls._parse(data, attr_map)
+        return obj
+
+    def __bytes__(self):
+        raise NotImplementedError()
+
+    def _print_obj_value(self):
+        """Return the payload (bytes after the TLV header) as a hex dump."""
+        # Work from the serialized form rather than a private ``_data``
+        # attribute: only UnknownTlv sets ``_data``, and for it both are the
+        # same bytes, so any subclass implementing __bytes__ now works here.
+        payload = bytes(self)[sizeof(IpFwObjTlv) :]
+        return " " + " ".join(["x{:02X}".format(b) for b in payload])
+
+    def as_hexdump(self):
+        """Return the whole serialized TLV as a hex dump string."""
+        return " ".join(["x{:02X}".format(b) for b in bytes(self)])
+
+
+class UnknownTlv(BaseTlv):
+    """Opaque TLV of a type this module has no decoder for.
+
+    The raw bytes (header included) are kept in ``_data`` and returned
+    unchanged by ``bytes()``.
+    """
+
+    def __init__(self, obj_type, data):
+        super().__init__(obj_type)
+        self._data = data
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless *data* is exactly one TLV."""
+        # Only the generic header is guaranteed for an unknown TLV, so that
+        # is the minimum size; requiring the (much larger) named-TLV
+        # structure here rejected every short unknown TLV.
+        if len(data) < sizeof(IpFwObjTlv):
+            raise ValueError("TLV size is too short")
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        if len(data) != hdr.length:
+            raise ValueError("wrong TLV size")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Wrap *data* without interpreting the payload."""
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        self = cls(hdr.n_type, data)
+        return self
+
+    def __bytes__(self):
+        return self._data
+
+
+class Tlv(BaseTlv):
+    """TLV base class that adds a parser for a run of back-to-back TLVs."""
+
+    @staticmethod
+    def parse_tlvs(data, attr_map):
+        """Decode the consecutive TLVs stored in *data*.
+
+        *attr_map* maps a TLV type to a description dict holding the
+        ``AttrDescr`` under ``"ad"`` and, optionally, the map for nested
+        TLVs under ``"child"``.  Types missing from the map are decoded as
+        ``UnknownTlv``.  Raises ValueError when a TLV claims more bytes than
+        remain.  Returns the list of decoded objects.
+        """
+        hdr_size = sizeof(IpFwObjTlv)
+        total = len(data)
+        parsed = []
+        pos = 0
+        while pos + hdr_size <= total:
+            hdr = IpFwObjTlv.from_buffer_copy(data[pos : pos + hdr_size])
+            end = pos + hdr.length
+            if end > total:
+                raise ValueError("TLV size do not match")
+            descr = attr_map.get(hdr.n_type, None)
+            if descr is not None:
+                tlv_cls = descr["ad"].cls
+                children = descr.get("child", {})
+            else:
+                # No decoder registered: keep the bytes as an opaque TLV.
+                tlv_cls = UnknownTlv
+                children = {}
+            parsed.append(tlv_cls.from_bytes(data[pos:end], children))
+            pos = end
+        return parsed
+
+
+class IpFwObjNTlv(Structure):
+ """Named-object TLV: generic TLV header, index, set, type and a name.
+
+ The name is a fixed 64-byte character array; NTlv pads shorter names
+ with NUL bytes when serializing.
+ """
+ _fields_ = [
+ ("head", IpFwObjTlv),
+ ("idx", c_ushort),
+ ("n_set", c_uint8),
+ ("n_type", c_uint8),
+ ("spare", c_uint32),
+ ("name", c_char * 64),
+ ]
+
+
+class NTlv(Tlv):
+    """Named-object TLV, serialized with the ``IpFwObjNTlv`` layout."""
+
+    def __init__(self, obj_type, idx=0, n_set=0, n_type=0, name=None):
+        """Store the TLV type plus the index, set, type and name fields."""
+        super().__init__(obj_type)
+        self.n_idx = idx
+        self.n_set = n_set
+        self.n_type = n_type
+        self.n_name = name
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless *data* is exactly one IpFwObjNTlv."""
+        if len(data) != sizeof(IpFwObjNTlv):
+            raise ValueError("TLV size is not correct")
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        if len(data) != hdr.length:
+            raise ValueError("wrong TLV size")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Decode validated bytes; *attr_map* is unused (no children)."""
+        hdr = IpFwObjNTlv.from_buffer_copy(data[: sizeof(IpFwObjNTlv)])
+        name = hdr.name.decode("utf-8")
+        self = cls(hdr.head.n_type, hdr.idx, hdr.n_set, hdr.n_type, name)
+        return self
+
+    def __bytes__(self):
+        # ``name`` defaults to None in __init__; serialize that as an empty
+        # name instead of failing on None.encode().
+        name_bytes = (self.n_name or "").encode("utf-8")
+        # Pad with NULs to the fixed 64-byte field, truncate anything longer.
+        name_bytes = name_bytes.ljust(64, b"\0")[:64]
+        hdr = IpFwObjNTlv(
+            head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)),
+            idx=self.n_idx,
+            n_set=self.n_set,
+            n_type=self.n_type,
+            name=name_bytes,
+        )
+        return bytes(hdr)
+
+    def _print_obj_value(self):
+        """Return the ` type=.. set=.. idx=.. name=..` summary string."""
+        return " " + "type={} set={} idx={} name={}".format(
+            self.n_type, self.n_set, self.n_idx, self.n_name
+        )
+
+
+class IpFwObjCTlv(Structure):
+ """Container TLV header: generic TLV header plus child bookkeeping.
+
+ CTlv fills `count` with the number of children and `objsize` with the
+ serialized size of the first child (0 when there are none).
+ """
+ _fields_ = [
+ ("head", IpFwObjTlv),
+ ("count", c_uint32),
+ ("objsize", c_ushort),
+ ("version", c_uint8),
+ ("flags", c_uint8),
+ ]
+
+
+class CTlv(Tlv):
+    """Container TLV: an ``IpFwObjCTlv`` header followed by child TLVs."""
+
+    def __init__(self, obj_type, obj_list=None):
+        """Create the container, optionally pre-filled from *obj_list*."""
+        super().__init__(obj_type)
+        if obj_list:
+            self.obj_list.extend(obj_list)
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless *data* is exactly one container TLV."""
+        if len(data) < sizeof(IpFwObjCTlv):
+            raise ValueError("TLV too short")
+        hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
+        if len(data) != hdr.head.length:
+            raise ValueError("wrong TLV size")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Decode the children with *attr_map* and check the header count."""
+        hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
+        tlv_list = cls.parse_tlvs(data[sizeof(IpFwObjCTlv) :], attr_map)
+        if len(tlv_list) != hdr.count:
+            raise ValueError("wrong number of objects")
+        self = cls(hdr.head.n_type, obj_list=tlv_list)
+        return self
+
+    def __bytes__(self):
+        children = b"".join(bytes(obj) for obj in self.obj_list)
+        if self.obj_list:
+            objsize = len(bytes(self.obj_list[0]))
+        else:
+            objsize = 0
+        hdr = IpFwObjCTlv(
+            head=IpFwObjTlv(
+                n_type=self.obj_type,
+                # Total size: container header plus all serialized children.
+                # (This used to be the fixed size of an unrelated structure,
+                # so the result failed this class's own _validate().)
+                length=sizeof(IpFwObjCTlv) + len(children),
+            ),
+            count=len(self.obj_list),
+            objsize=objsize,
+        )
+        return bytes(hdr) + children
+
+    def _print_obj_value(self):
+        """Containers print no value of their own, only their children."""
+        return ""
+
+
+class IpFwRule(Structure):
+ """Fixed header placed in front of a rule's instructions.
+
+ `cmd_len` is counted in 32-bit words: the parsers compute the rule size
+ as sizeof(IpFwRule) + cmd_len * 4. RawRule sets `act_ofs` to the word
+ offset at which it found an action instruction.
+ """
+ _fields_ = [
+ ("act_ofs", c_ushort),
+ ("cmd_len", c_ushort),
+ ("spare", c_ushort),
+ ("n_set", c_uint8),
+ ("flags", c_uint8),
+ ("rulenum", c_uint32),
+ ("n_id", c_uint32),
+ ]
+
+
+class RawRule(Tlv):
+    """A single rule: ``IpFwRule`` header followed by its instructions.
+
+    The instructions are kept in ``obj_list``.
+    """
+
+    def __init__(self, obj_type=0, n_set=0, rulenum=0, obj_list=[]):
+        """Store set and rule number, optionally pre-filled from *obj_list*."""
+        super().__init__(obj_type)
+        self.n_set = n_set
+        self.rulenum = rulenum
+        # The default list is only read, never modified.
+        self.obj_list.extend(obj_list or [])
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless *data* is one header plus cmd_len words."""
+        hdr_len = sizeof(IpFwRule)
+        if len(data) < hdr_len:
+            raise ValueError("rule TLV too short")
+        cmd_words = IpFwRule.from_buffer_copy(data[:hdr_len]).cmd_len
+        if hdr_len + cmd_words * 4 != len(data):
+            raise ValueError("rule TLV cmd_len incorrect")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Decode header and instructions; *attr_map* is not used."""
+        hdr_len = sizeof(IpFwRule)
+        hdr = IpFwRule.from_buffer_copy(data[:hdr_len])
+        insns = BaseInsn.parse_insns(data[hdr_len:], insn_attrs)
+        return cls(n_set=hdr.n_set, rulenum=hdr.rulenum, obj_list=insns)
+
+    def __bytes__(self):
+        chunks = []
+        words = 0  # running instruction length, in 32-bit words
+        act_ofs = 0
+        for insn in self.obj_list:
+            # Record where an action starts while no non-zero offset is set.
+            if insn.is_action and act_ofs == 0:
+                act_ofs = words
+            raw = bytes(insn)
+            words += len(raw) // 4
+            chunks.append(raw)
+
+        header = IpFwRule(
+            act_ofs=act_ofs,
+            cmd_len=words,
+            n_set=self.n_set,
+            rulenum=self.rulenum,
+        )
+        return bytes(header) + b"".join(chunks)
+
+    @property
+    def obj_name(self):
+        """Printable name of the rule, ``rule#<rulenum>``."""
+        return "rule#{}".format(self.rulenum)
+
+    def _print_obj_value(self):
+        """Return the ` set=.. cmd_len=..` summary (cmd_len in words)."""
+        total_bytes = sum(len(bytes(insn)) for insn in self.obj_list)
+        return " set={} cmd_len={}".format(self.n_set, total_bytes // 4)
+
+
+class CTlvRule(CTlv):
+    """Container TLV whose payload is a sequence of 8-byte aligned rules."""
+
+    def __init__(self, obj_type=IpFwTlvType.IPFW_TLV_RULE_LIST, obj_list=[]):
+        super().__init__(obj_type, obj_list)
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Decode the container header and every rule that follows it.
+
+        Each rule occupies its own length rounded up to 8 bytes.  Raises
+        ValueError when a rule overruns *data* or bytes are left over.
+        """
+        ctlv_size = sizeof(IpFwObjCTlv)
+        rule_hdr_size = sizeof(IpFwRule)
+        chdr = IpFwObjCTlv.from_buffer_copy(data[:ctlv_size])
+        rules = []
+        pos = ctlv_size
+        while pos + rule_hdr_size <= len(data):
+            rule_hdr = IpFwRule.from_buffer_copy(data[pos : pos + rule_hdr_size])
+            rule_len = rule_hdr_size + rule_hdr.cmd_len * 4
+            if pos + rule_len > len(data):
+                raise ValueError("wrong rule size")
+            rules.append(RawRule.from_bytes(data[pos : pos + rule_len]))
+            pos += align8(rule_len)
+        if pos != len(data):
+            raise ValueError("rule bytes left: off={} len={}".format(pos, len(data)))
+        return cls(chdr.head.n_type, obj_list=rules)
+
+    # XXX: _validate
+
+    def __bytes__(self):
+        padded = []
+        for rule in self.obj_list:
+            raw = bytes(rule)
+            # Pad each rule with NULs up to the next multiple of 8 bytes.
+            padded.append(raw + b"\0" * (-len(raw) % 8))
+        body = b"".join(padded)
+        header = IpFwObjCTlv(
+            head=IpFwObjTlv(
+                n_type=self.obj_type, length=len(body) + sizeof(IpFwObjCTlv)
+            ),
+            count=len(self.obj_list),
+        )
+        return bytes(header) + body
+
+
+class BaseIpFwMessage(object):
+    """Base class for op3 messages: an ``IpFw3OpHeader`` followed by TLVs.
+
+    Subclasses list the message types they handle in ``messages`` and must
+    provide ``attr_map``, the TLV description map used by ``from_bytes``.
+    """
+
+    messages = []
+
+    def __init__(self, msg_type, obj_list=None):
+        """Store *msg_type* (Enum member or int) and optional child TLVs."""
+        if isinstance(msg_type, Enum):
+            self.obj_type = msg_type.value
+            self._enum = msg_type
+        else:
+            self.obj_type = msg_type
+            # None when the value is not one of this class's message types.
+            self._enum = enum_from_int(self.messages, self.obj_type)
+        self.obj_list = []
+        if obj_list:
+            self.obj_list.extend(obj_list)
+
+    @staticmethod
+    def _raw_type(value):
+        """Return the integer behind *value* (Enum member or plain int)."""
+        # Done locally because this module does not import the
+        # enum_or_int() helper that the code used to call.
+        return value.value if isinstance(value, Enum) else value
+
+    def add_obj(self, obj):
+        """Append a child TLV."""
+        self.obj_list.append(obj)
+
+    def get_obj(self, obj_type):
+        """Return the first child whose type is *obj_type*, or None."""
+        obj_type_raw = self._raw_type(obj_type)
+        for obj in self.obj_list:
+            if obj.obj_type == obj_type_raw:
+                return obj
+        return None
+
+    @staticmethod
+    def parse_header(data: bytes):
+        """Return ``(header, header_size)``; ValueError if *data* is short."""
+        if len(data) < sizeof(IpFw3OpHeader):
+            raise ValueError("length less than op3 message header")
+        return IpFw3OpHeader.from_buffer_copy(data), sizeof(IpFw3OpHeader)
+
+    def parse_obj_list(self, data: bytes):
+        """Split *data* into TLVs and add each one as an opaque child.
+
+        Raises ValueError on a truncated header or an implausible length.
+        """
+        hdr_size = sizeof(IpFwObjTlv)
+        off = 0
+        while off < len(data):
+            # from_buffer_copy() raises ValueError when fewer than hdr_size
+            # bytes remain.
+            hdr = IpFwObjTlv.from_buffer_copy(data[off : off + hdr_size])
+            if hdr.length < hdr_size:
+                # A zero length would otherwise never advance ``off``.
+                raise ValueError("TLV too short")
+            if hdr.length + off > len(data):
+                raise ValueError("TLV too big")
+            # UnknownTlv is the class that accepts (type, raw bytes); Tlv's
+            # constructor takes the type only.
+            tlv = UnknownTlv(hdr.n_type, data[off : off + hdr.length])
+            self.add_obj(tlv)
+            off += hdr.length
+
+    def is_type(self, msg_type):
+        """Return True if this message has type *msg_type*."""
+        # The type is stored as ``obj_type``; no ``msg_type`` attribute is
+        # ever set on the instance.
+        return self._raw_type(msg_type) == self.obj_type
+
+    @property
+    def obj_name(self):
+        """Enum member name, or ``msg#<type>`` for unnamed types."""
+        if self._enum is not None:
+            return self._enum.name
+        else:
+            return "msg#{}".format(self.obj_type)
+
+    def print_hdr(self, prepend=""):
+        """Print the serialized length and the name of the message."""
+        print("{}len={}, type={}".format(prepend, len(bytes(self)), self.obj_name))
+
+    @classmethod
+    def from_bytes(cls, data):
+        """Parse a complete message: op3 header, then TLVs per ``attr_map``.
+
+        A header parsing failure is reported together with a hex dump of
+        *data*, then the ValueError is re-raised.
+        """
+        try:
+            hdr, hdrlen = cls.parse_header(data)
+            self = cls(hdr.opcode)
+            self._orig_data = data
+        except ValueError as e:
+            print("Failed to parse op3 header: {}".format(e))
+            # print_as_bytes() requires a description; omitting it raised
+            # TypeError and hid the original error.
+            cls.print_as_bytes(data, "unparsed op3 message")
+            raise
+        tlv_list = Tlv.parse_tlvs(data[hdrlen:], self.attr_map)
+        self.obj_list.extend(tlv_list)
+        return self
+
+    def __bytes__(self):
+        header = bytes(IpFw3OpHeader(opcode=self.obj_type))
+        return header + b"".join(bytes(obj) for obj in self.obj_list)
+
+    def print_obj(self):
+        """Print the message header followed by every child TLV."""
+        self.print_hdr()
+        for obj in self.obj_list:
+            obj.print_obj(" ")
+
+    @staticmethod
+    def print_as_bytes(data: bytes, descr: str):
+        """Hex-dump *data* to stdout, 16 bytes per line, titled *descr*."""
+        print("===vv {} (len:{:3d}) vv===".format(descr, len(data)))
+        step = 16
+        for off in range(0, len(data), step):
+            row = data[off : off + step]
+            print("".join(" {:02X}".format(b) for b in row))
+        print("--------------------")
+
+
+# TLV description map for rule messages: a table-name list container whose
+# children are named TLVs, and a rule list container.
+_tblname_children = [
+    AttrDescr(IpFwTlvType.IPFW_TLV_TBL_NAME, NTlv),
+    AttrDescr(IpFwTlvType.IPFW_TLV_STATE_NAME, NTlv),
+]
+
+rule_attrs = prepare_attrs_map(
+    [
+        AttrDescr(
+            val=IpFwTlvType.IPFW_TLV_TBLNAME_LIST,
+            cls=CTlv,
+            child_map=_tblname_children,
+            is_array=True,
+        ),
+        AttrDescr(val=IpFwTlvType.IPFW_TLV_RULE_LIST, cls=CTlvRule),
+    ]
+)
+
+
+class IpFwXRule(BaseIpFwMessage):
+ """Message class for IP_FW_XADD, decoded with the rule TLV map."""
+ messages = [Op3CmdType.IP_FW_XADD]
+ attr_map = rule_attrs
+
+
+# Message classes grouped by the kind of call that carries them. ipfw.py
+# imports these lists and maps them to debug record types: DO_CMD ->
+# legacy_classes, DO_SET3 -> set3_classes, DO_GET3 -> get3_classes.
+legacy_classes = []
+set3_classes = []
+get3_classes = [IpFwXRule]
diff --git a/tests/atf_python/sys/netpfil/ipfw/ioctl_headers.py b/tests/atf_python/sys/netpfil/ipfw/ioctl_headers.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/ioctl_headers.py
@@ -0,0 +1,90 @@
+from enum import Enum
+
+
+class Op3CmdType(Enum):
+ """Numeric op3 command codes, used as message types by ioctl.py.
+
+ NOTE(review): values presumably mirror the IP_FW_* socket option
+ constants of the kernel header - confirm against ip_fw.h.
+ """
+ IP_FW_TABLE_XADD = 86
+ IP_FW_TABLE_XDEL = 87
+ IP_FW_TABLE_XGETSIZE = 88
+ IP_FW_TABLE_XLIST = 89
+ IP_FW_TABLE_XDESTROY = 90
+ IP_FW_TABLES_XLIST = 92
+ IP_FW_TABLE_XINFO = 93
+ IP_FW_TABLE_XFLUSH = 94
+ IP_FW_TABLE_XCREATE = 95
+ IP_FW_TABLE_XMODIFY = 96
+ IP_FW_XGET = 97
+ IP_FW_XADD = 98
+ IP_FW_XDEL = 99
+ IP_FW_XMOVE = 100
+ IP_FW_XZERO = 101
+ IP_FW_XRESETLOG = 102
+ IP_FW_SET_SWAP = 103
+ IP_FW_SET_MOVE = 104
+ IP_FW_SET_ENABLE = 105
+ IP_FW_TABLE_XFIND = 106
+ IP_FW_XIFLIST = 107
+ IP_FW_TABLES_ALIST = 108
+ IP_FW_TABLE_XSWAP = 109
+ IP_FW_TABLE_VLIST = 110
+ IP_FW_NAT44_XCONFIG = 111
+ IP_FW_NAT44_DESTROY = 112
+ IP_FW_NAT44_XGETCONFIG = 113
+ IP_FW_NAT44_LIST_NAT = 114
+ IP_FW_NAT44_XGETLOG = 115
+ IP_FW_DUMP_SOPTCODES = 116
+ IP_FW_DUMP_SRVOBJECTS = 117
+ IP_FW_NAT64STL_CREATE = 130
+ IP_FW_NAT64STL_DESTROY = 131
+ IP_FW_NAT64STL_CONFIG = 132
+ IP_FW_NAT64STL_LIST = 133
+ IP_FW_NAT64STL_STATS = 134
+ IP_FW_NAT64STL_RESET_STATS = 135
+ IP_FW_NAT64LSN_CREATE = 140
+ IP_FW_NAT64LSN_DESTROY = 141
+ IP_FW_NAT64LSN_CONFIG = 142
+ IP_FW_NAT64LSN_LIST = 143
+ IP_FW_NAT64LSN_STATS = 144
+ IP_FW_NAT64LSN_LIST_STATES = 145
+ IP_FW_NAT64LSN_RESET_STATS = 146
+ IP_FW_NPTV6_CREATE = 150
+ IP_FW_NPTV6_DESTROY = 151
+ IP_FW_NPTV6_CONFIG = 152
+ IP_FW_NPTV6_LIST = 153
+ IP_FW_NPTV6_STATS = 154
+ IP_FW_NPTV6_RESET_STATS = 155
+ IP_FW_NAT64CLAT_CREATE = 160
+ IP_FW_NAT64CLAT_DESTROY = 161
+ IP_FW_NAT64CLAT_CONFIG = 162
+ IP_FW_NAT64CLAT_LIST = 163
+ IP_FW_NAT64CLAT_STATS = 164
+ IP_FW_NAT64CLAT_RESET_STATS = 165
+
+
+class IpFwTableLookupType(Enum):
+ """Numeric codes naming the packet field used for a table lookup.
+
+ NOTE(review): meaning inferred from the member names only; confirm the
+ values against the kernel's lookup type constants.
+ """
+ LOOKUP_DST_IP = 0
+ LOOKUP_SRC_IP = 1
+ LOOKUP_DST_PORT = 2
+ LOOKUP_SRC_PORT = 3
+ LOOKUP_UID = 4
+ LOOKUP_JAIL = 5
+ LOOKUP_DSCP = 6
+ LOOKUP_DST_MAC = 7
+ LOOKUP_SRC_MAC = 8
+ LOOKUP_MARK = 9
+
+
+class IpFwTlvType(Enum):
+ """Numeric TLV type codes; the enum BaseTlv uses to name TLV types.
+
+ The sequence has no member for 13.
+ NOTE(review): values presumably mirror the kernel's IPFW_TLV_*
+ constants - confirm against ip_fw.h.
+ """
+ IPFW_TLV_TBL_NAME = 1
+ IPFW_TLV_TBLNAME_LIST = 2
+ IPFW_TLV_RULE_LIST = 3
+ IPFW_TLV_DYNSTATE_LIST = 4
+ IPFW_TLV_TBL_ENT = 5
+ IPFW_TLV_DYN_ENT = 6
+ IPFW_TLV_RULE_ENT = 7
+ IPFW_TLV_TBLENT_LIST = 8
+ IPFW_TLV_RANGE = 9
+ IPFW_TLV_EACTION = 10
+ IPFW_TLV_COUNTERS = 11
+ IPFW_TLV_OBJDATA = 12
+ IPFW_TLV_STATE_NAME = 14
+ IPFW_TLV_EACTION_BASE = 1000
diff --git a/tests/atf_python/sys/netpfil/ipfw/ipfw.py b/tests/atf_python/sys/netpfil/ipfw/ipfw.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/ipfw.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+from atf_python.sys.netpfil.ipfw.ioctl import get3_classes
+from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes
+from atf_python.sys.netpfil.ipfw.ioctl import set3_classes
+from atf_python.sys.netpfil.ipfw.utils import AttrDescr
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.sys.netpfil.ipfw.utils import enum_or_int
+from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
+
+
+class DebugHeader(Structure):
+ """Header preceding each debug record that ipfw writes to stdout.
+
+ Same field order and widths as `struct debug_header` added to ipfw2.c
+ by this change. DebugIoReader treats `total_len` as the size of the
+ whole record, header included, and `opt_name` as the message type.
+ """
+ _fields_ = [
+ ("cmd_type", c_ushort),
+ ("spare1", c_ushort),
+ ("opt_name", c_uint32),
+ ("total_len", c_uint32),
+ ("spare2", c_uint32),
+ ]
+
+
+class DebugType(Enum):
+ """Values of DebugHeader.cmd_type, one per kind of ipfw call.
+
+ NOTE(review): ipfw2.c in this change writes cmd_type 2 from do_set3();
+ 1 and 3 presumably come from do_cmd() and do_get3() - confirm.
+ """
+ DO_CMD = 1
+ DO_SET3 = 2
+ DO_GET3 = 3
+
+
+class DebugIoReader(object):
+    """Run ipfw and decode the debug records it writes to stdout.
+
+    Each record is a ``DebugHeader`` followed by a payload; the header's
+    ``cmd_type`` and ``opt_name`` select the message class that decodes it.
+    """
+
+    # Debug record type -> list of message classes able to decode it.
+    HANDLER_CLASSES = {
+        DebugType.DO_CMD: legacy_classes,
+        DebugType.DO_SET3: set3_classes,
+        DebugType.DO_GET3: get3_classes,
+    }
+
+    def __init__(self, ipfw_path):
+        """Build the dispatch map and remember the ipfw binary to run."""
+        self._msgmap = self.build_msgmap()
+        self.ipfw_path = ipfw_path
+
+    def build_msgmap(self):
+        """Return ``{cmd_type: {opt_name: message class}}`` as plain ints."""
+        xmap = {}
+        for debug_type, handler_classes in self.HANDLER_CLASSES.items():
+            by_opt = xmap.setdefault(enum_or_int(debug_type), {})
+            for handler_class in handler_classes:
+                for msg in handler_class.messages:
+                    by_opt[enum_or_int(msg)] = handler_class
+        return xmap
+
+    def print_obj_header(self, hdr):
+        """Print a one-line description of the debug header *hdr*."""
+        # Fall back to "#<number>" for record types without a name.
+        debug_type = "#{}".format(hdr.cmd_type)
+        for _type in self.HANDLER_CLASSES.keys():
+            if _type.value == hdr.cmd_type:
+                debug_type = _type.name.lower()
+                break
+        print(
+            "@@ record for {} len={} optname={}".format(
+                debug_type, hdr.total_len, hdr.opt_name
+            )
+        )
+
+    def parse_record(self, data):
+        """Decode one record (header + payload) into a message object.
+
+        Raises ValueError when no message class is registered for the
+        record's ``cmd_type`` / ``opt_name`` pair.
+        """
+        hdr = DebugHeader.from_buffer_copy(data[: sizeof(DebugHeader)])
+        data = data[sizeof(DebugHeader) :]
+        # An unknown cmd_type is reported with the same ValueError as an
+        # unknown opt_name instead of escaping as a bare KeyError.
+        cls = self._msgmap.get(hdr.cmd_type, {}).get(hdr.opt_name)
+        if cls is not None:
+            return cls.from_bytes(data)
+        raise ValueError(
+            "unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name)
+        )
+
+    def get_record_from_stdin(self):
+        """Read and decode the next record from stdin; None at end of input."""
+        # peek() does not consume input and may return more bytes than
+        # requested; from_buffer_copy() only needs at least a full header.
+        data = sys.stdin.buffer.peek(sizeof(DebugHeader))
+        if len(data) == 0:
+            return None
+
+        hdr = DebugHeader.from_buffer_copy(data)
+        data = sys.stdin.buffer.read(hdr.total_len)
+        return self.parse_record(data)
+
+    def get_records_from_buffer(self, data):
+        """Decode every record stored back to back in *data*."""
+        off = 0
+        ret = []
+        while off + sizeof(DebugHeader) <= len(data):
+            hdr = DebugHeader.from_buffer_copy(data[off : off + sizeof(DebugHeader)])
+            ret.append(self.parse_record(data[off : off + hdr.total_len]))
+            off += hdr.total_len
+        return ret
+
+    def run_ipfw(self, cmd: str) -> bytes:
+        """Run ipfw with ``-xqn`` plus the words of *cmd*; return its stdout.
+
+        NOTE(review): the flags presumably select debug output, quiet mode
+        and syntax-check-only mode - confirm against ipfw(8).  The exit
+        status and stderr are not checked.
+        """
+        args = [self.ipfw_path, "-xqn"] + cmd.split()
+        r = subprocess.run(args, capture_output=True)
+        return r.stdout
+
+    def get_records(self, cmd: str):
+        """Run ipfw with *cmd* and return the decoded records it produced."""
+        return self.get_records_from_buffer(self.run_ipfw(cmd))
diff --git a/tests/atf_python/sys/netpfil/ipfw/utils.py b/tests/atf_python/sys/netpfil/ipfw/utils.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/utils.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+
+import os
+import socket
+import struct
+import subprocess
+import sys
+from enum import Enum
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Union
+from typing import Any
+from typing import NamedTuple
+import pytest
+
+
+def roundup2(val: int, num: int) -> int:
+    """Round *val* up to the nearest multiple of *num*.
+
+    Values that are already a multiple are returned unchanged.  Uses
+    remainder arithmetic, so the result is correct for any positive *num*
+    and identical to the former bit-mask form when *num* is a power of two.
+    """
+    remainder = val % num
+    if remainder:
+        return val + (num - remainder)
+    return val
+
+
+def align8(val: int) -> int:
+    """Round *val* up to the next multiple of 8 (unchanged if aligned)."""
+    return (val + 7) & ~7
+
+
+def enum_or_int(val) -> int:
+    """Return ``val.value`` for an Enum member, otherwise *val* itself."""
+    return val.value if isinstance(val, Enum) else val
+
+
+def enum_from_int(enum_class: Enum, val) -> Enum:
+    """Return the member of *enum_class* whose value equals *val*.
+
+    An Enum member passed as *val* is returned as is; None is returned when
+    nothing matches.  *enum_class* may be any iterable of Enum members.
+    """
+    if isinstance(val, Enum):
+        return val
+    return next((member for member in enum_class if val == member.value), None)
+
+
+class AttrDescr(NamedTuple):
+ """One entry of a description list consumed by prepare_attrs_map()."""
+ # Enum member whose .value becomes the key in the prepared map.
+ val: Enum
+ # Class associated with this type; stored in the map via the "ad" entry.
+ cls: Any
+ # Optional list of AttrDescr describing nested objects.
+ child_map: Any = None
+ # Copied into the prepared map under "is_array".
+ is_array: bool = False
+
+
+def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]:
+ """Turn a list of AttrDescr into a lookup dict keyed by `ad.val.value`.
+
+ Each entry is a dict holding the descriptor under "ad"; descriptors with
+ a child_map also get "child", the recursively prepared map of children.
+ Note that the keys are the enum values, whatever their type, even though
+ the return annotation says str.
+ """
+ ret = {}
+ for ad in attrs:
+ ret[ad.val.value] = {"ad": ad}
+ if ad.child_map:
+ ret[ad.val.value]["child"] = prepare_attrs_map(ad.child_map)
+ # NOTE(review): indentation was lost in this copy, so it is unclear
+ # whether "is_array" is set only for entries with a child_map or for
+ # every entry - confirm against the original file.
+ ret[ad.val.value]["is_array"] = ad.is_array
+ return ret
+
+
+

File Metadata

Mime Type
text/plain
Expires
Mon, Oct 20, 6:17 PM (12 h, 20 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
23979163
Default Alt Text
D40488.diff (64 KB)

Event Timeline