Page Menu
Home
FreeBSD
Search
Configure Global Search
Log In
Files
F132762783
D40488.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
64 KB
Referenced Files
None
Subscribers
None
D40488.diff
View Options
diff --git a/etc/mtree/BSD.tests.dist b/etc/mtree/BSD.tests.dist
--- a/etc/mtree/BSD.tests.dist
+++ b/etc/mtree/BSD.tests.dist
@@ -450,6 +450,8 @@
..
ifconfig
..
+ ipfw
+ ..
md5
..
mdconfig
diff --git a/sbin/ipfw/ipfw2.h b/sbin/ipfw/ipfw2.h
--- a/sbin/ipfw/ipfw2.h
+++ b/sbin/ipfw/ipfw2.h
@@ -48,6 +48,7 @@
int test_only; /* only check syntax */
int comment_only; /* only print action and comment */
int verbose; /* be verbose on some commands */
+ int debug_only; /* output ioctl i/o on stdout */
/* The options below can have multiple values. */
diff --git a/sbin/ipfw/ipfw2.c b/sbin/ipfw/ipfw2.c
--- a/sbin/ipfw/ipfw2.c
+++ b/sbin/ipfw/ipfw2.c
@@ -587,6 +587,13 @@
return (strcmp(a, b));
}
/*
 * Record header written to stdout in front of every socket option payload
 * when debug output (g_co.debug_only) is enabled.
 */
struct debug_header {
	uint16_t	cmd_type;	/* 1, 2 or 3 depending on the emitting helper */
	uint16_t	spare1;		/* left zero by the emitters */
	uint32_t	opt_name;	/* socket option name / op3 opcode */
	uint32_t	total_len;	/* payload length plus sizeof(struct debug_header) */
	uint32_t	spare2;		/* left zero by the emitters */
};
/*
* conditionally runs the command.
@@ -597,8 +604,18 @@
{
int i;
+ if (g_co.debug_only) {
+ struct debug_header dbg = {
+ .cmd_type = 1,
+ .opt_name = optname,
+ .total_len = optlen + sizeof(struct debug_header),
+ };
+ write(1, &dbg, sizeof(dbg));
+ write(1, optval, optlen);
+ }
+
if (g_co.test_only)
- return 0;
+ return (0);
if (ipfw_socket == -1)
ipfw_socket = socket(AF_INET, SOCK_RAW, IPPROTO_RAW);
@@ -617,7 +634,7 @@
} else {
i = setsockopt(ipfw_socket, IPPROTO_IP, optname, optval, optlen);
}
- return i;
+ return (i);
}
/*
@@ -634,6 +651,18 @@
do_set3(int optname, ip_fw3_opheader *op3, size_t optlen)
{
+ op3->opcode = optname;
+
+ if (g_co.debug_only) {
+ struct debug_header dbg = {
+ .cmd_type = 2,
+ .opt_name = optname,
+ .total_len = optlen, sizeof(struct debug_header),
+ };
+ write(1, &dbg, sizeof(dbg));
+ write(1, op3, optlen);
+ }
+
if (g_co.test_only)
return (0);
@@ -642,7 +671,6 @@
if (ipfw_socket < 0)
err(EX_UNAVAILABLE, "socket");
- op3->opcode = optname;
return (setsockopt(ipfw_socket, IPPROTO_IP, IP_FW3, op3, optlen));
}
@@ -663,6 +691,18 @@
int error;
socklen_t len;
+ op3->opcode = optname;
+
+ if (g_co.debug_only) {
+ struct debug_header dbg = {
+ .cmd_type = 3,
+ .opt_name = optname,
+ .total_len = *optlen + sizeof(struct debug_header),
+ };
+ write(1, &dbg, sizeof(dbg));
+ write(1, op3, *optlen);
+ }
+
if (g_co.test_only)
return (0);
@@ -671,7 +711,6 @@
if (ipfw_socket < 0)
err(EX_UNAVAILABLE, "socket");
- op3->opcode = optname;
len = *optlen;
error = getsockopt(ipfw_socket, IPPROTO_IP, IP_FW3, op3, &len);
diff --git a/sbin/ipfw/main.c b/sbin/ipfw/main.c
--- a/sbin/ipfw/main.c
+++ b/sbin/ipfw/main.c
@@ -277,7 +277,7 @@
optind = optreset = 1; /* restart getopt() */
if (is_ipfw()) {
- while ((ch = getopt(ac, av, "abcdDefhinNp:qs:STtv")) != -1)
+ while ((ch = getopt(ac, av, "abcdDefhinNp:qs:STtvx")) != -1)
switch (ch) {
case 'a':
do_acct = 1;
@@ -354,6 +354,10 @@
g_co.verbose = 1;
break;
+ case 'x': /* debug output */
+ g_co.debug_only = 1;
+ break;
+
default:
free(save_av);
return 1;
diff --git a/sbin/ipfw/tests/Makefile b/sbin/ipfw/tests/Makefile
new file mode 100644
--- /dev/null
+++ b/sbin/ipfw/tests/Makefile
@@ -0,0 +1,5 @@
+PACKAGE= tests
+
+ATF_TESTS_PYTEST+= test_add_rule.py
+
+.include <bsd.test.mk>
diff --git a/sbin/ipfw/tests/test_add_rule.py b/sbin/ipfw/tests/test_add_rule.py
new file mode 100755
--- /dev/null
+++ b/sbin/ipfw/tests/test_add_rule.py
@@ -0,0 +1,400 @@
+import errno
+import json
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+import pytest
+from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode
+from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode
+from atf_python.sys.netpfil.ipfw.insns import Insn
+from atf_python.sys.netpfil.ipfw.insns import InsnComment
+from atf_python.sys.netpfil.ipfw.insns import InsnEmpty
+from atf_python.sys.netpfil.ipfw.insns import InsnIp
+from atf_python.sys.netpfil.ipfw.insns import InsnIp6
+from atf_python.sys.netpfil.ipfw.insns import InsnPorts
+from atf_python.sys.netpfil.ipfw.insns import InsnProb
+from atf_python.sys.netpfil.ipfw.insns import InsnProto
+from atf_python.sys.netpfil.ipfw.insns import InsnReject
+from atf_python.sys.netpfil.ipfw.insns import InsnTable
+from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode
+from atf_python.sys.netpfil.ipfw.ioctl import CTlv
+from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule
+from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType
+from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule
+from atf_python.sys.netpfil.ipfw.ioctl import NTlv
+from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType
+from atf_python.sys.netpfil.ipfw.ioctl import RawRule
+from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.utils import BaseTest
+
+
+IPFW_PATH = "/sbin/ipfw"
+
+
def differ(w_obj, g_obj, w_stack=None, g_stack=None):
    """Compare a wanted object tree with the one actually produced.

    Both objects must support ``bytes()`` and expose ``obj_name``,
    ``obj_list``, ``print_obj()`` and ``print_obj_hex()``.  When the
    serialized forms differ, the first point of divergence is printed.

    :param w_obj: the expected ("wanted") object
    :param g_obj: the object that was produced ("got")
    :param w_stack: names of the wanted object's ancestors (diagnostics only)
    :param g_stack: names of the got object's ancestors (diagnostics only)
    :returns: True if both serialize to identical bytes, False otherwise
    """
    # Fresh lists per call: mutable defaults would leak chain names from one
    # comparison into the next.
    w_stack = [] if w_stack is None else w_stack
    g_stack = [] if g_stack is None else g_stack

    if bytes(w_obj) == bytes(g_obj):
        return True

    num_objects = 0
    for i, w_child in enumerate(w_obj.obj_list):
        # ">=": index len(obj_list) is already out of range.
        if i >= len(g_obj.obj_list):
            print("MISSING object from chain {}".format(" / ".join(w_stack)))
            w_child.print_obj()
            print("==========================")
            return False
        g_child = g_obj.obj_list[i]
        if bytes(w_child) == bytes(g_child):
            num_objects += 1
            continue
        # Descend with extended copies so the caller's stacks stay untouched.
        if not differ(
            w_child,
            g_child,
            w_stack + [w_obj.obj_name],
            g_stack + [g_obj.obj_name],
        ):
            return False
        break

    if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list):
        g_child = g_obj.obj_list[num_objects]
        print("EXTRA object from chain {}".format(" / ".join(g_stack)))
        g_child.print_obj()
        print("==========================")
        return False

    print("OBJECTS DIFFER")
    print("WANTED CHAIN: {}".format(" / ".join(w_stack)))
    w_obj.print_obj()
    w_obj.print_obj_hex()
    print("==========================")
    print("GOT CHAIN: {}".format(" / ".join(g_stack)))
    g_obj.print_obj()
    g_obj.print_obj_hex()
    print("==========================")
    return False
+
+
class TestAddRule(BaseTest):
    """Checks that ``ipfw add ...`` command lines compile to the expected rule.

    Each test describes a command line and the instructions it should
    produce; the request captured from the ipfw binary is compared
    byte-for-byte with the expected one.
    """

    def compile_rule(self, out):
        """Build the expected IP_FW_XADD request from an ``out`` spec.

        :param out: dict with ``insns`` (list of instructions) and optional
            ``objs`` (table name TLVs) and ``rulenum`` (defaults to 0)
        """
        tlvs = []
        if "objs" in out:
            tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"]))
        rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"])
        tlvs.append(CTlvRule(obj_list=[rule]))
        return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs)

    def verify_rule(self, in_data: str, out_data):
        """Run ipfw on ``in_data`` and compare its single request to the spec.

        Prints both object trees before failing so the mismatch can be read
        from the test log.
        """
        # Prepare the desired output
        expected = self.compile_rule(out_data)

        reader = DebugIoReader(IPFW_PATH)
        ioctls = reader.get_records(in_data)
        assert len(ioctls) == 1  # Only 1 ioctl request expected
        got = ioctls[0]

        if not differ(expected, got):
            print("=> CMD: {}".format(in_data))
            print("=> WANTED:")
            expected.print_obj()
            print("==========================")
            print("=> GOT:")
            got.print_obj()
            print("==========================")
        assert bytes(got) == bytes(expected)

    @pytest.mark.parametrize(
        "rule",
        [
            pytest.param(
                {
                    "in": "add 200 allow ip from any to any",
                    "out": {
                        "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)],
                        "rulenum": 200,
                    },
                },
                id="test_rulenum",
            ),
            pytest.param(
                {
                    "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True),
                            InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_or",
            ),
            pytest.param(
                {
                    "in": "add allow ip from table(AAA) to table(BBB)",
                    "out": {
                        "objs": [
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"),
                            NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"),
                        ],
                        "insns": [
                            InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1),
                            InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_tables",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to 1.2.3.4 // test comment",
                    "out": {
                        "insns": [
                            InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"),
                            InsnComment(comment="test comment"),
                            InsnEmpty(IpFwOpcode.O_ACCEPT),
                        ],
                    },
                },
                id="test_comment",
            ),
        ],
    )
    def test_add_rule(self, rule):
        """Tests if the compiled rule is sane and matches the spec"""
        self.verify_rule(rule["in"], rule["out"])

    @pytest.mark.parametrize(
        "action",
        [
            pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"),
            pytest.param(
                (
                    "abort",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT),
                ),
                id="abort",
            ),
            pytest.param(
                (
                    "abort6",
                    Insn(
                        IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT
                    ),
                ),
                id="abort6",
            ),
            pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"),
            pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"),
            pytest.param(
                (
                    "reject",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST),
                ),
                id="reject",
            ),
            pytest.param(
                (
                    "reset",
                    Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST),
                ),
                id="reset",
            ),
            pytest.param(
                (
                    "reset6",
                    Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST),
                ),
                id="reset6",
            ),
            # The "unreach port" case used to be listed twice under one id.
            pytest.param(
                (
                    "unreach port",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT
                    ),
                ),
                id="unreach_port",
            ),
            pytest.param(
                (
                    "unreach needfrag",
                    InsnReject(
                        IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG
                    ),
                ),
                id="unreach_needfrag",
            ),
            pytest.param(
                (
                    "unreach needfrag 1420",
                    InsnReject(
                        IpFwOpcode.O_REJECT,
                        arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG,
                        mtu=1420,
                    ),
                ),
                id="unreach_needfrag_mtu",
            ),
            pytest.param(
                (
                    "unreach6 port",
                    Insn(
                        IpFwOpcode.O_UNREACH6,
                        arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT,
                    ),
                ),
                id="unreach6_port",
            ),
            pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"),
            # TOK_NAT
            pytest.param(
                ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42"
            ),
            pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"),
            pytest.param(
                ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42"
            ),
            pytest.param(
                ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42"
            ),
            pytest.param(
                ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42"
            ),
            pytest.param(
                ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42"
            ),
            pytest.param(
                ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd"
            ),
            pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"),
            pytest.param(
                ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420"
            ),
            # TOK_FORWARD
            # TOK_COMMENT
            pytest.param(
                ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)),
                id="setfib_1",
                marks=pytest.mark.skip("needs net.fibs>1"),
            ),
            pytest.param(
                ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)),
                id="setdscp_42",
            ),
            pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"),
            pytest.param(
                ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return"
            ),
        ],
    )
    def test_add_action(self, action):
        """Tests if the rule action is compiled properly"""
        rule_in = "add {} ip from any to any".format(action[0])
        rule_out = {"insns": [action[1]]}
        self.verify_rule(rule_in, rule_out)

    @pytest.mark.parametrize(
        "insn",
        [
            pytest.param(
                {
                    "in": "add prob 0.7 allow ip from any to any",
                    "out": InsnProb(prob=0.7),
                },
                id="test_prob",
            ),
            pytest.param(
                {
                    "in": "add allow tcp from any to any",
                    "out": InsnProto(arg1=6),
                },
                id="test_proto",
            ),
            pytest.param(
                {
                    "in": "add allow ip from any to any 57",
                    # A list of (low, high) tuples, as in test_add_ports.
                    "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[(57, 57)]),
                },
                id="test_ports",
            ),
        ],
    )
    def test_add_single_instruction(self, insn):
        """Tests if the compiled rule is sane and matches the spec"""

        # Prepare the desired output
        out = {
            "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)],
        }
        self.verify_rule(insn["in"], out)

    @pytest.mark.parametrize(
        "opcode",
        [
            pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"),
            pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"),
        ],
    )
    @pytest.mark.parametrize(
        "params",
        [
            pytest.param(
                {
                    "in": "57",
                    "out": [(57, 57)],
                },
                id="test_single",
            ),
            pytest.param(
                {
                    "in": "57-59",
                    "out": [(57, 59)],
                },
                id="test_range",
            ),
            pytest.param(
                {
                    "in": "57-59,41",
                    "out": [(57, 59), (41, 41)],
                },
                id="test_ranges",
            ),
        ],
    )
    def test_add_ports(self, params, opcode):
        """Tests source/destination port lists and ranges"""
        if opcode == IpFwOpcode.O_IP_DSTPORT:
            txt = "add allow ip from any to any " + params["in"]
        else:
            txt = "add allow ip from any " + params["in"] + " to any"
        out = {
            "insns": [
                InsnPorts(opcode, port_pairs=params["out"]),
                InsnEmpty(IpFwOpcode.O_ACCEPT),
            ]
        }
        self.verify_rule(txt, out)
diff --git a/tests/atf_python/sys/Makefile b/tests/atf_python/sys/Makefile
--- a/tests/atf_python/sys/Makefile
+++ b/tests/atf_python/sys/Makefile
@@ -3,7 +3,7 @@
.PATH: ${.CURDIR}
FILES= __init__.py
-SUBDIR= net netlink
+SUBDIR= net netlink netpfil
.include <bsd.own.mk>
FILESDIR= ${TESTSBASE}/atf_python/sys
diff --git a/tests/atf_python/sys/Makefile b/tests/atf_python/sys/netpfil/Makefile
copy from tests/atf_python/sys/Makefile
copy to tests/atf_python/sys/netpfil/Makefile
--- a/tests/atf_python/sys/Makefile
+++ b/tests/atf_python/sys/netpfil/Makefile
@@ -3,9 +3,9 @@
.PATH: ${.CURDIR}
FILES= __init__.py
-SUBDIR= net netlink
+SUBDIR= ipfw
.include <bsd.own.mk>
-FILESDIR= ${TESTSBASE}/atf_python/sys
+FILESDIR= ${TESTSBASE}/atf_python/sys/netpfil
.include <bsd.prog.mk>
diff --git a/tests/atf_python/sys/netpfil/__init__.py b/tests/atf_python/sys/netpfil/__init__.py
new file mode 100644
diff --git a/tests/atf_python/sys/netpfil/ipfw/Makefile b/tests/atf_python/sys/netpfil/ipfw/Makefile
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/Makefile
@@ -0,0 +1,12 @@
+.include <src.opts.mk>
+
+.PATH: ${.CURDIR}
+
+FILES= __init__.py insns.py insn_headers.py ioctl.py ioctl_headers.py \
+ ipfw.py utils.py
+
+.include <bsd.own.mk>
+FILESDIR= ${TESTSBASE}/atf_python/sys/netpfil/ipfw
+
+.include <bsd.prog.mk>
+
diff --git a/tests/atf_python/sys/netpfil/ipfw/__init__.py b/tests/atf_python/sys/netpfil/ipfw/__init__.py
new file mode 100644
diff --git a/tests/atf_python/sys/netpfil/ipfw/insn_headers.py b/tests/atf_python/sys/netpfil/ipfw/insn_headers.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/insn_headers.py
@@ -0,0 +1,198 @@
+from enum import Enum
+
+
class IpFwOpcode(Enum):
    """Numeric ipfw instruction opcodes.

    NOTE(review): values presumably mirror the kernel's opcode enum in
    ip_fw.h -- confirm against the header when updating.
    """

    O_NOP = 0
    O_IP_SRC = 1
    O_IP_SRC_MASK = 2
    O_IP_SRC_ME = 3
    O_IP_SRC_SET = 4
    O_IP_DST = 5
    O_IP_DST_MASK = 6
    O_IP_DST_ME = 7
    O_IP_DST_SET = 8
    O_IP_SRCPORT = 9
    O_IP_DSTPORT = 10
    O_PROTO = 11
    O_MACADDR2 = 12
    O_MAC_TYPE = 13
    O_LAYER2 = 14
    O_IN = 15
    O_FRAG = 16
    O_RECV = 17
    O_XMIT = 18
    O_VIA = 19
    O_IPOPT = 20
    O_IPLEN = 21
    O_IPID = 22
    O_IPTOS = 23
    O_IPPRECEDENCE = 24
    O_IPTTL = 25
    O_IPVER = 26
    O_UID = 27
    O_GID = 28
    O_ESTAB = 29
    O_TCPFLAGS = 30
    O_TCPWIN = 31
    O_TCPSEQ = 32
    O_TCPACK = 33
    O_ICMPTYPE = 34
    O_TCPOPTS = 35
    O_VERREVPATH = 36
    O_VERSRCREACH = 37
    O_PROBE_STATE = 38
    O_KEEP_STATE = 39
    O_LIMIT = 40
    O_LIMIT_PARENT = 41
    O_LOG = 42
    O_PROB = 43
    O_CHECK_STATE = 44
    O_ACCEPT = 45
    O_DENY = 46
    O_REJECT = 47
    O_COUNT = 48
    O_SKIPTO = 49
    O_PIPE = 50
    O_QUEUE = 51
    O_DIVERT = 52
    O_TEE = 53
    O_FORWARD_IP = 54
    O_FORWARD_MAC = 55
    O_NAT = 56
    O_REASS = 57
    O_IPSEC = 58
    O_IP_SRC_LOOKUP = 59
    O_IP_DST_LOOKUP = 60
    O_ANTISPOOF = 61
    O_JAIL = 62
    O_ALTQ = 63
    O_DIVERTED = 64
    O_TCPDATALEN = 65
    O_IP6_SRC = 66
    O_IP6_SRC_ME = 67
    O_IP6_SRC_MASK = 68
    O_IP6_DST = 69
    O_IP6_DST_ME = 70
    O_IP6_DST_MASK = 71
    O_FLOW6ID = 72
    O_ICMP6TYPE = 73
    O_EXT_HDR = 74
    O_IP6 = 75
    O_NETGRAPH = 76
    O_NGTEE = 77
    O_IP4 = 78
    O_UNREACH6 = 79
    O_TAG = 80
    O_TAGGED = 81
    O_SETFIB = 82
    O_FIB = 83
    O_SOCKARG = 84
    O_CALLRETURN = 85
    O_FORWARD_IP6 = 86
    O_DSCP = 87
    O_SETDSCP = 88
    O_IP_FLOW_LOOKUP = 89
    O_EXTERNAL_ACTION = 90
    O_EXTERNAL_INSTANCE = 91
    O_EXTERNAL_DATA = 92
    O_SKIP_ACTION = 93
    O_TCPMSS = 94
    O_MAC_SRC_LOOKUP = 95
    O_MAC_DST_LOOKUP = 96
    O_SETMARK = 97
    O_MARK = 98
    O_LAST_OPCODE = 99
+
+
class Op3CmdType(Enum):
    """Numeric command codes carried in the op3 request header.

    NOTE(review): values presumably mirror the IP_FW_* socket option
    constants from ip_fw.h -- confirm against the header when updating.
    """

    IP_FW_TABLE_XADD = 86
    IP_FW_TABLE_XDEL = 87
    IP_FW_TABLE_XGETSIZE = 88
    IP_FW_TABLE_XLIST = 89
    IP_FW_TABLE_XDESTROY = 90
    IP_FW_TABLES_XLIST = 92
    IP_FW_TABLE_XINFO = 93
    IP_FW_TABLE_XFLUSH = 94
    IP_FW_TABLE_XCREATE = 95
    IP_FW_TABLE_XMODIFY = 96
    IP_FW_XGET = 97
    IP_FW_XADD = 98
    IP_FW_XDEL = 99
    IP_FW_XMOVE = 100
    IP_FW_XZERO = 101
    IP_FW_XRESETLOG = 102
    IP_FW_SET_SWAP = 103
    IP_FW_SET_MOVE = 104
    IP_FW_SET_ENABLE = 105
    IP_FW_TABLE_XFIND = 106
    IP_FW_XIFLIST = 107
    IP_FW_TABLES_ALIST = 108
    IP_FW_TABLE_XSWAP = 109
    IP_FW_TABLE_VLIST = 110
    IP_FW_NAT44_XCONFIG = 111
    IP_FW_NAT44_DESTROY = 112
    IP_FW_NAT44_XGETCONFIG = 113
    IP_FW_NAT44_LIST_NAT = 114
    IP_FW_NAT44_XGETLOG = 115
    IP_FW_DUMP_SOPTCODES = 116
    IP_FW_DUMP_SRVOBJECTS = 117
    IP_FW_NAT64STL_CREATE = 130
    IP_FW_NAT64STL_DESTROY = 131
    IP_FW_NAT64STL_CONFIG = 132
    IP_FW_NAT64STL_LIST = 133
    IP_FW_NAT64STL_STATS = 134
    IP_FW_NAT64STL_RESET_STATS = 135
    IP_FW_NAT64LSN_CREATE = 140
    IP_FW_NAT64LSN_DESTROY = 141
    IP_FW_NAT64LSN_CONFIG = 142
    IP_FW_NAT64LSN_LIST = 143
    IP_FW_NAT64LSN_STATS = 144
    IP_FW_NAT64LSN_LIST_STATES = 145
    IP_FW_NAT64LSN_RESET_STATS = 146
    IP_FW_NPTV6_CREATE = 150
    IP_FW_NPTV6_DESTROY = 151
    IP_FW_NPTV6_CONFIG = 152
    IP_FW_NPTV6_LIST = 153
    IP_FW_NPTV6_STATS = 154
    IP_FW_NPTV6_RESET_STATS = 155
    IP_FW_NAT64CLAT_CREATE = 160
    IP_FW_NAT64CLAT_DESTROY = 161
    IP_FW_NAT64CLAT_CONFIG = 162
    IP_FW_NAT64CLAT_LIST = 163
    IP_FW_NAT64CLAT_STATS = 164
    IP_FW_NAT64CLAT_RESET_STATS = 165
+
+
class IcmpRejectCode(Enum):
    """Codes used as arg1 of O_REJECT.

    Values 0-15 carry ICMP_UNREACH_* names; 256 and 257 select the
    RST and ABORT variants.
    """

    ICMP_UNREACH_NET = 0
    ICMP_UNREACH_HOST = 1
    ICMP_UNREACH_PROTOCOL = 2
    ICMP_UNREACH_PORT = 3
    ICMP_UNREACH_NEEDFRAG = 4
    ICMP_UNREACH_SRCFAIL = 5
    ICMP_UNREACH_NET_UNKNOWN = 6
    ICMP_UNREACH_HOST_UNKNOWN = 7
    ICMP_UNREACH_ISOLATED = 8
    ICMP_UNREACH_NET_PROHIB = 9
    ICMP_UNREACH_HOST_PROHIB = 10
    ICMP_UNREACH_TOSNET = 11
    ICMP_UNREACH_TOSHOST = 12
    ICMP_UNREACH_FILTER_PROHIB = 13
    ICMP_UNREACH_HOST_PRECEDENCE = 14
    ICMP_UNREACH_PRECEDENCE_CUTOFF = 15
    ICMP_REJECT_RST = 256
    ICMP_REJECT_ABORT = 257
+
+
class Icmp6RejectCode(Enum):
    """Codes used as arg1 of O_UNREACH6.

    Values 0-7 carry ICMP6_DST_UNREACH_* names; 256 and 257 select the
    RST and ABORT variants.
    """

    ICMP6_DST_UNREACH_NOROUTE = 0
    ICMP6_DST_UNREACH_ADMIN = 1
    ICMP6_DST_UNREACH_BEYONDSCOPE = 2
    # Same value as BEYONDSCOPE: Enum turns this name into an alias of it.
    ICMP6_DST_UNREACH_NOTNEIGHBOR = 2
    ICMP6_DST_UNREACH_ADDR = 3
    ICMP6_DST_UNREACH_NOPORT = 4
    ICMP6_DST_UNREACH_POLICY = 5
    ICMP6_DST_UNREACH_REJECT = 6
    ICMP6_DST_UNREACH_SRCROUTE = 7
    ICMP6_UNREACH_RST = 256
    ICMP6_UNREACH_ABORT = 257
diff --git a/tests/atf_python/sys/netpfil/ipfw/insns.py b/tests/atf_python/sys/netpfil/ipfw/insns.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/insns.py
@@ -0,0 +1,555 @@
+#!/usr/bin/env python3
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+from atf_python.sys.netpfil.ipfw.insn_headers import IpFwOpcode
+from atf_python.sys.netpfil.ipfw.insn_headers import IcmpRejectCode
+from atf_python.sys.netpfil.ipfw.insn_headers import Icmp6RejectCode
+from atf_python.sys.netpfil.ipfw.utils import AttrDescr
+from atf_python.sys.netpfil.ipfw.utils import enum_or_int
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
+
+
# Integer opcodes that BaseInsn treats as rule actions (sets ``is_action``).
insn_actions = tuple(
    opcode.value
    for opcode in (
        IpFwOpcode.O_CHECK_STATE,
        IpFwOpcode.O_REJECT,
        IpFwOpcode.O_UNREACH6,
        IpFwOpcode.O_ACCEPT,
        IpFwOpcode.O_DENY,
        IpFwOpcode.O_COUNT,
        IpFwOpcode.O_NAT,
        IpFwOpcode.O_QUEUE,
        IpFwOpcode.O_PIPE,
        IpFwOpcode.O_SKIPTO,
        IpFwOpcode.O_NETGRAPH,
        IpFwOpcode.O_NGTEE,
        IpFwOpcode.O_DIVERT,
        IpFwOpcode.O_TEE,
        IpFwOpcode.O_CALLRETURN,
        IpFwOpcode.O_FORWARD_IP,
        IpFwOpcode.O_FORWARD_IP6,
        IpFwOpcode.O_SETFIB,
        IpFwOpcode.O_SETDSCP,
        IpFwOpcode.O_REASS,
        IpFwOpcode.O_SETMARK,
        IpFwOpcode.O_EXTERNAL_ACTION,
    )
)
+
+
class IpFwInsn(Structure):
    """4-byte header that starts every serialized instruction."""

    _fields_ = [
        ("opcode", c_uint8),
        # Low 6 bits: instruction length in 32-bit words.
        # 0x40 marks an OR block member, 0x80 marks negation.
        ("length", c_uint8),
        ("arg1", c_ushort),
    ]
+
+
class BaseInsn(object):
    """Common state, validation and parsing for all ipfw instructions.

    Subclasses provide ``__bytes__`` and ``_print_obj_value`` and may
    override ``_validate`` / ``_parse`` for their payload.
    """

    obj_enum_class = IpFwOpcode

    def __init__(self, opcode, is_or, is_not, arg1):
        """
        :param opcode: enum member or plain integer opcode
        :param is_or: instruction is part of an OR block (flag 0x40)
        :param is_not: instruction is negated (flag 0x80)
        :param arg1: 16-bit argument stored in the header
        """
        if isinstance(opcode, Enum):
            self.obj_type = opcode.value
            self._enum = opcode
        else:
            self.obj_type = opcode
            self._enum = enum_from_int(self.obj_enum_class, self.obj_type)
        self.is_or = is_or
        self.is_not = is_not
        self.arg1 = arg1
        self.is_action = self.obj_type in insn_actions
        self.ilen = 1  # length in 32-bit words, header included
        self.obj_list = []

    @property
    def obj_name(self):
        """Enum name of the opcode, or ``opcode#N`` when it is unknown."""
        if self._enum is not None:
            return self._enum.name
        else:
            return "opcode#{}".format(self.obj_type)

    @staticmethod
    def get_insn_len(data: bytes) -> int:
        """Return the length (in 32-bit words) stored in a serialized header."""
        (opcode_len,) = struct.unpack("@B", data[1:2])
        return opcode_len & 0x3F

    @classmethod
    def _validate_len(cls, data, valid_options=None):
        """Check that ``data`` is exactly as long as its header claims.

        :param valid_options: optional list of acceptable byte lengths
        :raises ValueError: on any mismatch
        """
        if len(data) < 4:
            raise ValueError("opcode too short")
        _, opcode_len = struct.unpack("@BB", data[:2])
        if len(data) != ((opcode_len & 0x3F) * 4):
            raise ValueError("wrong length")
        if valid_options and len(data) not in valid_options:
            raise ValueError(
                "len {} not in {} for {}".format(
                    len(data),
                    valid_options,
                    enum_from_int(cls.obj_enum_class, data[0]),
                )
            )

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data)

    @classmethod
    def _parse(cls, data):
        """Build an instance from the 4-byte header at the start of ``data``."""
        insn = IpFwInsn.from_buffer_copy(data[:4])
        is_or = (insn.length & 0x40) != 0
        is_not = (insn.length & 0x80) != 0
        return cls(opcode=insn.opcode, is_or=is_or, is_not=is_not, arg1=insn.arg1)

    @classmethod
    def from_bytes(cls, data, attr_type_enum):
        """Validate and parse one serialized instruction.

        :param attr_type_enum: enum member (or None) recorded on the result
        """
        cls._validate(data)
        opcode = cls._parse(data)
        opcode._enum = attr_type_enum
        return opcode

    def __bytes__(self):
        raise NotImplementedError()

    def print_obj(self, prepend=""):
        """Print a one-line human-readable description of the instruction."""
        is_or = ""
        if self.is_or:
            is_or = " [OR]\\"
        is_not = ""
        if self.is_not:
            is_not = "[!] "
        print(
            "{}{}len={} type={}({}){}{}".format(
                prepend,
                is_not,
                len(bytes(self)),
                self.obj_name,
                self.obj_type,
                self._print_obj_value(),
                is_or,
            )
        )

    def _print_obj_value(self):
        raise NotImplementedError()

    def print_obj_hex(self, prepend=""):
        """Print the serialized instruction as space-separated hex bytes."""
        print(prepend)
        print()
        print(" ".join(["x{:02X}".format(b) for b in bytes(self)]))

    @staticmethod
    def parse_insns(data, attr_map):
        """Split ``data`` into instructions and parse each one.

        :param attr_map: maps integer opcode to an entry whose ``["ad"]``
            has ``cls`` (parser class) and ``val`` (enum member)
        :returns: list of parsed instruction objects
        :raises ValueError: if an instruction is empty, overruns the
            buffer, or trailing bytes remain
        """
        ret = []
        off = 0
        while off + sizeof(IpFwInsn) <= len(data):
            hdr = IpFwInsn.from_buffer_copy(data[off : off + sizeof(IpFwInsn)])
            insn_len = (hdr.length & 0x3F) * 4
            # A zero length would never advance ``off`` and loop forever.
            if insn_len == 0:
                raise ValueError("zero-length instruction")
            if off + insn_len > len(data):
                raise ValueError("wrong length")
            attr = attr_map.get(hdr.opcode, None)
            if attr is None:
                cls = InsnUnknown
                type_enum = enum_from_int(BaseInsn.obj_enum_class, hdr.opcode)
            else:
                cls = attr["ad"].cls
                type_enum = attr["ad"].val
            insn = cls.from_bytes(data[off : off + insn_len], type_enum)
            ret.append(insn)
            off += insn_len

        if off != len(data):
            raise ValueError("empty space")
        return ret
+
+
class Insn(BaseInsn):
    """Plain instruction: the 4-byte header and nothing else."""

    def __init__(self, opcode, is_or=False, is_not=False, arg1=0):
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data, [4])

    def __bytes__(self):
        """Serialize the header, folding the OR/NOT flags into ``length``."""
        length = self.ilen
        if self.is_or:
            length |= 0x40
        if self.is_not:
            # Must assign: a bare ``length | 0x80`` silently drops the flag.
            length |= 0x80
        insn = IpFwInsn(opcode=self.obj_type, length=length, arg1=enum_or_int(self.arg1))
        return bytes(insn)

    def _print_obj_value(self):
        return " arg1={}".format(self.arg1)
+
+
class InsnUnknown(Insn):
    """Instruction with no dedicated class; keeps the raw bytes verbatim."""

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data)

    @classmethod
    def _parse(cls, data):
        self = super()._parse(data)
        self._data = data
        return self

    def __bytes__(self):
        # Objects built directly (not via _parse) have no raw copy; fall back
        # to the plain header instead of raising AttributeError.
        raw = getattr(self, "_data", None)
        if raw is None:
            return super().__bytes__()
        return raw

    def _print_obj_value(self):
        return " " + " ".join(["x{:02X}".format(b) for b in bytes(self)])
+
+
class InsnEmpty(Insn):
    """Header-only instruction whose arg1 must be zero."""

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data, [4])
        insn = IpFwInsn.from_buffer_copy(data[:4])
        if insn.arg1 != 0:
            raise ValueError("arg1 should be empty")

    def _print_obj_value(self):
        return ""
+
+
class InsnComment(Insn):
    """Instruction carrying a NUL-terminated comment padded to 4 bytes."""

    def __init__(self, opcode=IpFwOpcode.O_NOP, is_or=False, is_not=False, arg1=0, comment=""):
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
        self.comment = comment or ""
        self.ilen = 1 + len(self._payload()) // 4

    def _payload(self):
        """Return the encoded comment, NUL-terminated and padded to 4 bytes."""
        raw = self.comment.encode("utf-8") + b"\0"
        return raw + b"\0" * (-len(raw) % 4)

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data)
        if len(data) > 88:
            raise ValueError("comment too long")

    @classmethod
    def _parse(cls, data):
        self = super()._parse(data)
        # Comment encoding can be anything,
        # use utf-8 to ease debugging
        # Indexing bytes yields ints, so cut at the first NUL with partition()
        # rather than comparing each element to b"\0".
        text, _, _ = bytes(data[4:]).partition(b"\0")
        self.comment = text.decode("utf-8")
        self.ilen = 1 + len(self._payload()) // 4
        return self

    def __bytes__(self):
        payload = self._payload()
        # Keep the header length in sync with the (possibly changed) comment.
        self.ilen = 1 + len(payload) // 4
        return super().__bytes__() + payload

    def _print_obj_value(self):
        return " comment='{}'".format(self.comment)
+
+
class InsnProto(Insn):
    """Protocol match; arg1 holds the IP protocol number."""

    def __init__(self, opcode=IpFwOpcode.O_PROTO, is_or=False, is_not=False, arg1=0):
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)

    def _print_obj_value(self):
        known_map = {6: "TCP", 17: "UDP", 41: "IPV6"}
        proto = self.arg1
        if proto in known_map:
            return " proto={}".format(known_map[proto])
        else:
            return " proto=#{}".format(proto)
+
+
class InsnU32(Insn):
    """Instruction with one native-endian 32-bit value after the header."""

    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0):
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
        self.u32 = u32
        self.ilen = 2

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data, [8])

    @classmethod
    def _parse(cls, data):
        self = super()._parse(data[:4])
        self.u32 = struct.unpack("@I", data[4:8])[0]
        return self

    def __bytes__(self):
        return super().__bytes__() + struct.pack("@I", self.u32)

    def _print_obj_value(self):
        return " arg1={} u32={}".format(self.arg1, self.u32)
+
+
class InsnProb(InsnU32):
    """Match probability, stored as ``prob * 0x7FFFFFFF`` in the u32 slot."""

    def __init__(
        self,
        opcode=IpFwOpcode.O_PROB,
        is_or=False,
        is_not=False,
        arg1=0,
        u32=0,
        prob=0.0,
    ):
        # Forward arg1/u32 instead of dropping them on the floor.
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1, u32=u32)
        # Only an explicit probability overrides the raw u32 value.
        if prob:
            self.prob = prob

    @property
    def prob(self):
        return 1.0 * self.u32 / 0x7FFFFFFF

    @prob.setter
    def prob(self, prob: float):
        self.u32 = int(prob * 0x7FFFFFFF)

    def _print_obj_value(self):
        return " prob={}".format(round(self.prob, 5))
+
+
class InsnIp(InsnU32):
    """IPv4 address match; the address is kept in the u32 slot."""

    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0, ip=None):
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1, u32=u32)
        if ip:
            self.ip = ip

    @property
    def ip(self):
        return socket.inet_ntop(socket.AF_INET, struct.pack("@I", self.u32))

    @ip.setter
    def ip(self, ip: str):
        ip_bin = socket.inet_pton(socket.AF_INET, ip)
        self.u32 = struct.unpack("@I", ip_bin)[0]

    def _print_obj_value(self):
        # print_obj() calls _print_obj_value(); the old method name was
        # never invoked, so the address was not shown.
        return " ip={}".format(self.ip)

    # Old name kept as an alias for any external caller.
    _print_opcode_value = _print_obj_value
+
+
class InsnTable(Insn):
    """Table lookup; arg1 is the table index, optionally followed by a value."""

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data, [4, 8])

    @classmethod
    def _parse(cls, data):
        self = super()._parse(data)

        if len(data) == 8:
            (self.val,) = struct.unpack("@I", data[4:8])
            self.ilen = 2
        else:
            self.val = None
        return self

    def __bytes__(self):
        ret = super().__bytes__()
        # ``val`` exists only on parsed objects, hence getattr().
        if getattr(self, "val", None) is not None:
            ret += struct.pack("@I", self.val)
        return ret

    def _print_obj_value(self):
        if getattr(self, "val", None) is not None:
            return " table={} value={}".format(self.arg1, self.val)
        else:
            return " table={}".format(self.arg1)
+
+
class InsnReject(Insn):
    """Reject action; arg1 is the reject code, optionally followed by an MTU."""

    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, mtu=None):
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
        self.mtu = mtu
        if self.mtu is not None:
            self.ilen = 2

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data, [4, 8])

    @classmethod
    def _parse(cls, data):
        self = super()._parse(data)

        if len(data) == 8:
            (self.mtu,) = struct.unpack("@I", data[4:8])
            self.ilen = 2
        else:
            self.mtu = None
        return self

    def __bytes__(self):
        ret = super().__bytes__()
        if getattr(self, "mtu", None) is not None:
            ret += struct.pack("@I", self.mtu)
        return ret

    def _print_obj_value(self):
        # arg1 may be an Enum member (as in the tests) or an int (when
        # parsed); normalise it the same way Insn.__bytes__ does.
        code = enum_from_int(IcmpRejectCode, enum_or_int(self.arg1))
        if getattr(self, "mtu", None) is not None:
            return " code={} mtu={}".format(code, self.mtu)
        else:
            return " code={}".format(code)
+
+
class InsnPorts(Insn):
    """Port match: the header followed by one (low, high) u16 pair per range."""

    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, port_pairs=None):
        """
        :param port_pairs: iterable of ``(low, high)`` tuples; a flat list
            of ints (``[low, high, ...]``) is accepted too
        """
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
        self.port_pairs = self._normalize_pairs(port_pairs)
        self.ilen = 1 + len(self.port_pairs)

    @staticmethod
    def _normalize_pairs(port_pairs):
        """Return ``port_pairs`` as a fresh list of ``(low, high)`` tuples."""
        items = list(port_pairs or [])
        if items and all(isinstance(p, int) for p in items):
            if len(items) % 2:
                raise ValueError("odd number of ports in flat port list")
            return [(items[i], items[i + 1]) for i in range(0, len(items), 2)]
        return [tuple(p) for p in items]

    @classmethod
    def _validate(cls, data):
        if len(data) < 8:
            raise ValueError("no ports specified")
        cls._validate_len(data)

    @classmethod
    def _parse(cls, data):
        self = super()._parse(data)

        off = 4
        port_pairs = []
        while off + 4 <= len(data):
            low, high = struct.unpack("@HH", data[off : off + 4])
            port_pairs.append((low, high))
            off += 4
        self.port_pairs = port_pairs
        self.ilen = 1 + len(port_pairs)
        return self

    def __bytes__(self):
        # Mirror _parse(): one 4-byte (low, high) record per pair, with the
        # header length counting those records.
        pairs = self._normalize_pairs(self.port_pairs)
        self.ilen = 1 + len(pairs)
        ret = super().__bytes__()
        for low, high in pairs:
            ret += struct.pack("@HH", low, high)
        return ret

    def _print_obj_value(self):
        ret = []
        for low, high in self._normalize_pairs(self.port_pairs):
            if low == high:
                ret.append(str(low))
            else:
                ret.append("{}-{}".format(low, high))
        return " ports={}".format(",".join(ret))
+
+
class IpFwInsnIp6(Structure):
    """Instruction header followed by a 16-byte address and a 16-byte mask."""

    _fields_ = [
        ("o", IpFwInsn),
        ("addr6", c_byte * 16),
        ("mask6", c_byte * 16),
    ]
+
+
class InsnIp6(Insn):
    """IPv6 address match with an optional mask (both in text form)."""

    def __init__(self, opcode, is_or=False, is_not=False, arg1=0, ip6=None, mask6=None):
        super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1)
        self.ip6 = ip6
        self.mask6 = mask6
        # 1 header word + 4 words per 16-byte address/mask.
        if mask6 is not None:
            self.ilen = 9
        else:
            self.ilen = 5

    @classmethod
    def _validate(cls, data):
        cls._validate_len(data, [4 + 16, 4 + 16 * 2])

    @classmethod
    def _parse(cls, data):
        self = super()._parse(data)
        self.ip6 = socket.inet_ntop(socket.AF_INET6, data[4:20])

        if len(data) == 4 + 16 * 2:
            self.mask6 = socket.inet_ntop(socket.AF_INET6, data[20:36])
            self.ilen = 9
        else:
            self.mask6 = None
            self.ilen = 5
        return self

    def __bytes__(self):
        ret = super().__bytes__() + socket.inet_pton(socket.AF_INET6, self.ip6)
        if self.mask6 is not None:
            ret += socket.inet_pton(socket.AF_INET6, self.mask6)
        return ret

    def _print_obj_value(self):
        if self.mask6:
            return " ip6={}/{}".format(self.ip6, self.mask6)
        else:
            return " ip6={}".format(self.ip6)
+
+
# Opcode -> parser class registry consumed by BaseInsn.parse_insns().
# Opcodes not listed here are parsed as InsnUnknown.
insn_attrs = prepare_attrs_map(
    [
        # Actions (O_COUNT and O_DIVERT were previously listed twice).
        AttrDescr(IpFwOpcode.O_CHECK_STATE, Insn),
        AttrDescr(IpFwOpcode.O_ACCEPT, InsnEmpty),
        AttrDescr(IpFwOpcode.O_COUNT, InsnEmpty),
        AttrDescr(IpFwOpcode.O_REJECT, InsnReject),
        AttrDescr(IpFwOpcode.O_UNREACH6, Insn),
        AttrDescr(IpFwOpcode.O_DENY, InsnEmpty),
        AttrDescr(IpFwOpcode.O_DIVERT, Insn),
        AttrDescr(IpFwOpcode.O_QUEUE, Insn),
        AttrDescr(IpFwOpcode.O_PIPE, Insn),
        AttrDescr(IpFwOpcode.O_SKIPTO, Insn),
        AttrDescr(IpFwOpcode.O_NETGRAPH, Insn),
        AttrDescr(IpFwOpcode.O_NGTEE, Insn),
        AttrDescr(IpFwOpcode.O_TEE, Insn),
        AttrDescr(IpFwOpcode.O_CALLRETURN, Insn),
        AttrDescr(IpFwOpcode.O_SETFIB, Insn),
        AttrDescr(IpFwOpcode.O_SETDSCP, Insn),
        AttrDescr(IpFwOpcode.O_REASS, InsnEmpty),
        AttrDescr(IpFwOpcode.O_SETMARK, Insn),
        # Everything else.
        AttrDescr(IpFwOpcode.O_NOP, InsnComment),
        AttrDescr(IpFwOpcode.O_PROTO, InsnProto),
        AttrDescr(IpFwOpcode.O_PROB, InsnProb),
        AttrDescr(IpFwOpcode.O_IP_DST_ME, InsnEmpty),
        AttrDescr(IpFwOpcode.O_IP_SRC_ME, InsnEmpty),
        AttrDescr(IpFwOpcode.O_IP6_DST_ME, InsnEmpty),
        AttrDescr(IpFwOpcode.O_IP6_SRC_ME, InsnEmpty),
        AttrDescr(IpFwOpcode.O_IP_SRC, InsnIp),
        AttrDescr(IpFwOpcode.O_IP_DST, InsnIp),
        AttrDescr(IpFwOpcode.O_IP6_DST, InsnIp6),
        AttrDescr(IpFwOpcode.O_IP6_SRC, InsnIp6),
        AttrDescr(IpFwOpcode.O_IP_SRC_LOOKUP, InsnTable),
        AttrDescr(IpFwOpcode.O_IP_DST_LOOKUP, InsnTable),
        AttrDescr(IpFwOpcode.O_IP_SRCPORT, InsnPorts),
        AttrDescr(IpFwOpcode.O_IP_DSTPORT, InsnPorts),
        AttrDescr(IpFwOpcode.O_PROBE_STATE, Insn),
        AttrDescr(IpFwOpcode.O_KEEP_STATE, Insn),
    ]
)
diff --git a/tests/atf_python/sys/netpfil/ipfw/ioctl.py b/tests/atf_python/sys/netpfil/ipfw/ioctl.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/ioctl.py
@@ -0,0 +1,505 @@
+#!/usr/bin/env python3
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+import pytest
+from atf_python.sys.netpfil.ipfw.insns import BaseInsn
+from atf_python.sys.netpfil.ipfw.insns import insn_attrs
+from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTableLookupType
+from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTlvType
+from atf_python.sys.netpfil.ipfw.ioctl_headers import Op3CmdType
+from atf_python.sys.netpfil.ipfw.utils import align8
+from atf_python.sys.netpfil.ipfw.utils import AttrDescr
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
+
+
+# ctypes layout of the header found at the start of every op3 message.
+# BaseIpFwMessage reads it from the first bytes of a record and uses the
+# "opcode" field as the message type.
+class IpFw3OpHeader(Structure):
+ _fields_ = [
+ ("opcode", c_ushort),
+ ("version", c_ushort),
+ ("reserved1", c_ushort),
+ ("reserved2", c_ushort),
+ ]
+
+
+# ctypes layout of the generic TLV header. "length" is the size in bytes of
+# the whole TLV, header included: the parsers in this module compare it with
+# the length of the complete TLV buffer and advance by it.
+class IpFwObjTlv(Structure):
+ _fields_ = [
+ ("n_type", c_ushort),
+ ("flags", c_ushort),
+ ("length", c_uint32),
+ ]
+
+
+class BaseTlv(object):
+    """Common base of the TLV objects.
+
+    Stores the numeric TLV type, the matching enum member (None when the
+    value is not part of ``obj_enum_class``) and a list of child objects.
+    Subclasses provide ``__bytes__`` and, where needed, their own
+    ``_validate`` / ``_parse`` / ``_print_obj_value``.
+    """
+
+    obj_enum_class = IpFwTlvType
+
+    def __init__(self, obj_type):
+        # Accept either an enum member or a raw integer type.
+        given_enum = isinstance(obj_type, Enum)
+        self.obj_type = obj_type.value if given_enum else obj_type
+        if given_enum:
+            self._enum = obj_type
+        else:
+            self._enum = enum_from_int(self.obj_enum_class, obj_type)
+        self.obj_list = []
+
+    def add_obj(self, obj):
+        """Append a child object."""
+        self.obj_list.append(obj)
+
+    @property
+    def len(self):
+        """Size in bytes of the serialized object."""
+        serialized = bytes(self)
+        return len(serialized)
+
+    @property
+    def obj_name(self):
+        """Enum member name, or "tlv#<type>" when the type is not in the enum."""
+        if self._enum is None:
+            return "tlv#{}".format(self.obj_type)
+        return self._enum.name
+
+    def print_hdr(self, prepend=""):
+        """Print a one-line summary of this object, prefixed with prepend."""
+        line = "{}len={} type={}({}){}".format(
+            prepend, self.len, self.obj_name, self.obj_type, self._print_obj_value()
+        )
+        print(line)
+
+    def print_obj(self, prepend=""):
+        """Print this object followed by its children, one level deeper."""
+        self.print_hdr(prepend)
+        child_prepend = " " + prepend
+        for child in self.obj_list:
+            child.print_obj(child_prepend)
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless data is exactly one TLV of the stated length."""
+        hdr_size = sizeof(IpFwObjTlv)
+        if len(data) < hdr_size:
+            raise ValueError("TLV too short")
+        hdr = IpFwObjTlv.from_buffer_copy(data[:hdr_size])
+        if hdr.length != len(data):
+            raise ValueError("wrong TLV size")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Build an instance from the TLV header only; attr_map is unused here."""
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        return cls(hdr.n_type)
+
+    @classmethod
+    def from_bytes(cls, data, attr_map=None):
+        """Validate data, then parse it into an instance of cls."""
+        cls._validate(data)
+        return cls._parse(data, attr_map)
+
+    def __bytes__(self):
+        raise NotImplementedError()
+
+    def _print_obj_value(self):
+        """Hex dump of the bytes after the TLV header (reads ``self._data``)."""
+        payload = self._data[sizeof(IpFwObjTlv) :]
+        return " " + " ".join(["x{:02X}".format(b) for b in payload])
+
+    def as_hexdump(self):
+        """Hex dump of the complete serialized object."""
+        serialized = bytes(self)
+        return " ".join(["x{:02X}".format(b) for b in serialized])
+
+
+class UnknownTlv(BaseTlv):
+    """TLV kept as raw bytes because no specific class describes its type."""
+
+    def __init__(self, obj_type, data):
+        super().__init__(obj_type)
+        # Complete serialized TLV, header included; returned as-is by __bytes__.
+        self._data = data
+
+    @classmethod
+    def _validate(cls, data):
+        """Check that data holds exactly one TLV.
+
+        Raises:
+            ValueError: data is shorter than a TLV header, or its length
+                differs from the length stated in the header.
+        """
+        # Only the generic header is required: an unknown TLV may be far
+        # shorter than the named-object TLV structure.
+        if len(data) < sizeof(IpFwObjTlv):
+            raise ValueError("TLV size is too short")
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        if len(data) != hdr.length:
+            raise ValueError("wrong TLV size")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Wrap the raw TLV bytes; attr_map is unused."""
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        return cls(hdr.n_type, data)
+
+    def __bytes__(self):
+        return self._data
+
+
+class Tlv(BaseTlv):
+    """TLV base class able to split a buffer into a list of TLV objects."""
+
+    @staticmethod
+    def parse_tlvs(data, attr_map):
+        """Parse consecutive TLVs from data.
+
+        Args:
+            data: bytes holding zero or more back-to-back TLVs.
+            attr_map: map of TLV type -> descriptor, as built by
+                prepare_attrs_map(); None is treated as an empty map.
+
+        Returns:
+            List of parsed objects. Types missing from attr_map become
+            UnknownTlv. Trailing bytes shorter than a TLV header are ignored.
+
+        Raises:
+            ValueError: a TLV states a length smaller than its own header or
+                larger than the remaining data.
+        """
+        attr_map = attr_map or {}
+        hdr_size = sizeof(IpFwObjTlv)
+        off = 0
+        ret = []
+        while off + hdr_size <= len(data):
+            hdr = IpFwObjTlv.from_buffer_copy(data[off : off + hdr_size])
+            if hdr.length < hdr_size:
+                # A zero or undersized length would never advance "off".
+                raise ValueError("TLV size is too short")
+            if off + hdr.length > len(data):
+                raise ValueError("TLV size do not match")
+            obj_data = data[off : off + hdr.length]
+            obj_descr = attr_map.get(hdr.n_type)
+            if obj_descr is None:
+                cls = UnknownTlv
+                child_map = {}
+            else:
+                cls = obj_descr["ad"].cls
+                child_map = obj_descr.get("child", {})
+            ret.append(cls.from_bytes(obj_data, child_map))
+            off += hdr.length
+        return ret
+
+
+# ctypes layout of a named-object TLV: the generic TLV header followed by
+# an index, set, type and a fixed 64-byte name field.
+class IpFwObjNTlv(Structure):
+ _fields_ = [
+ ("head", IpFwObjTlv),
+ ("idx", c_ushort),
+ ("n_set", c_uint8),
+ ("n_type", c_uint8),
+ ("spare", c_uint32),
+ ("name", c_char * 64),
+ ]
+
+
+class NTlv(Tlv):
+    """Named-object TLV, serialized with the fixed-size IpFwObjNTlv layout."""
+
+    def __init__(self, obj_type, idx=0, n_set=0, n_type=0, name=None):
+        super().__init__(obj_type)
+        self.n_idx = idx
+        self.n_set = n_set
+        self.n_type = n_type
+        self.n_name = name
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless data is exactly one IpFwObjNTlv."""
+        if len(data) != sizeof(IpFwObjNTlv):
+            raise ValueError("TLV size is not correct")
+        hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)])
+        if len(data) != hdr.length:
+            raise ValueError("wrong TLV size")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Build an instance from serialized bytes; attr_map is unused."""
+        hdr = IpFwObjNTlv.from_buffer_copy(data[: sizeof(IpFwObjNTlv)])
+        name = hdr.name.decode("utf-8")
+        return cls(hdr.head.n_type, hdr.idx, hdr.n_set, hdr.n_type, name)
+
+    def __bytes__(self):
+        # A name left at its None default serializes as an empty name
+        # instead of failing on None.encode().
+        name_bytes = (self.n_name or "").encode("utf-8")
+        # The name field is exactly 64 bytes: NUL-pad short names, cut long ones.
+        name_bytes = name_bytes.ljust(64, b"\0")[:64]
+        hdr = IpFwObjNTlv(
+            head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)),
+            idx=self.n_idx,
+            n_set=self.n_set,
+            n_type=self.n_type,
+            name=name_bytes,
+        )
+        return bytes(hdr)
+
+    def _print_obj_value(self):
+        return " " + "type={} set={} idx={} name={}".format(
+            self.n_type, self.n_set, self.n_idx, self.n_name
+        )
+
+
+# ctypes layout of a container TLV header: the generic TLV header followed
+# by the number of contained objects and the size of one object.
+class IpFwObjCTlv(Structure):
+ _fields_ = [
+ ("head", IpFwObjTlv),
+ ("count", c_uint32),
+ ("objsize", c_ushort),
+ ("version", c_uint8),
+ ("flags", c_uint8),
+ ]
+
+
+class CTlv(Tlv):
+    """Container TLV: an IpFwObjCTlv header followed by child TLVs."""
+
+    def __init__(self, obj_type, obj_list=None):
+        super().__init__(obj_type)
+        if obj_list:
+            self.obj_list.extend(obj_list)
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless data is one container of the stated length."""
+        if len(data) < sizeof(IpFwObjCTlv):
+            raise ValueError("TLV too short")
+        hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
+        if len(data) != hdr.head.length:
+            raise ValueError("wrong TLV size")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Parse the container and its children.
+
+        Raises:
+            ValueError: the number of parsed children differs from the
+                count stored in the container header.
+        """
+        hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
+        tlv_list = cls.parse_tlvs(data[sizeof(IpFwObjCTlv) :], attr_map)
+        if len(tlv_list) != hdr.count:
+            raise ValueError("wrong number of objects")
+        return cls(hdr.head.n_type, obj_list=tlv_list)
+
+    def __bytes__(self):
+        payload = b"".join(bytes(obj) for obj in self.obj_list)
+        # objsize is taken from the first child; 0 for an empty container.
+        objsize = len(bytes(self.obj_list[0])) if self.obj_list else 0
+        hdr = IpFwObjCTlv(
+            # The header length covers the container header plus all children,
+            # which is exactly what _validate() expects on the way back in.
+            head=IpFwObjTlv(
+                n_type=self.obj_type, length=sizeof(IpFwObjCTlv) + len(payload)
+            ),
+            count=len(self.obj_list),
+            objsize=objsize,
+        )
+        return bytes(hdr) + payload
+
+    def _print_obj_value(self):
+        return ""
+
+
+# ctypes layout of the fixed rule header. The parsers in this module treat
+# cmd_len as a count of 32-bit words following the header (they multiply it
+# by 4 to get the instruction bytes).
+class IpFwRule(Structure):
+ _fields_ = [
+ ("act_ofs", c_ushort),
+ ("cmd_len", c_ushort),
+ ("spare", c_ushort),
+ ("n_set", c_uint8),
+ ("flags", c_uint8),
+ ("rulenum", c_uint32),
+ ("n_id", c_uint32),
+ ]
+
+
+class RawRule(Tlv):
+    """A single rule: an IpFwRule header followed by its instructions."""
+
+    def __init__(self, obj_type=0, n_set=0, rulenum=0, obj_list=None):
+        super().__init__(obj_type)
+        self.n_set = n_set
+        self.rulenum = rulenum
+        if obj_list:
+            self.obj_list.extend(obj_list)
+
+    @classmethod
+    def _validate(cls, data):
+        """Raise ValueError unless data is the header plus cmd_len 32-bit words."""
+        min_size = sizeof(IpFwRule)
+        if len(data) < min_size:
+            raise ValueError("rule TLV too short")
+        rule = IpFwRule.from_buffer_copy(data[:min_size])
+        if len(data) != min_size + rule.cmd_len * 4:
+            raise ValueError("rule TLV cmd_len incorrect")
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Parse the rule header and its instructions; attr_map is unused."""
+        hdr = IpFwRule.from_buffer_copy(data[: sizeof(IpFwRule)])
+        return cls(
+            n_set=hdr.n_set,
+            rulenum=hdr.rulenum,
+            obj_list=BaseInsn.parse_insns(data[sizeof(IpFwRule) :], insn_attrs),
+        )
+
+    def __bytes__(self):
+        # act_ofs is the word offset of the first action instruction. None
+        # marks "not found yet", so a first action at offset 0 is not
+        # overwritten by a later action.
+        act_ofs = None
+        cmd_len = 0
+        chunks = []
+        for obj in self.obj_list:
+            if act_ofs is None and obj.is_action:
+                act_ofs = cmd_len
+            obj_bytes = bytes(obj)
+            cmd_len += len(obj_bytes) // 4
+            chunks.append(obj_bytes)
+
+        hdr = IpFwRule(
+            act_ofs=act_ofs or 0,
+            cmd_len=cmd_len,
+            n_set=self.n_set,
+            rulenum=self.rulenum,
+        )
+        return bytes(hdr) + b"".join(chunks)
+
+    @property
+    def obj_name(self):
+        return "rule#{}".format(self.rulenum)
+
+    def _print_obj_value(self):
+        cmd_len = sum(len(bytes(obj)) for obj in self.obj_list) // 4
+        return " set={} cmd_len={}".format(self.n_set, cmd_len)
+
+
+class CTlvRule(CTlv):
+    """Container TLV whose payload is a sequence of 8-byte-aligned raw rules."""
+
+    def __init__(self, obj_type=IpFwTlvType.IPFW_TLV_RULE_LIST, obj_list=None):
+        super().__init__(obj_type, obj_list)
+
+    @classmethod
+    def _parse(cls, data, attr_map):
+        """Parse the container header and every rule that follows it.
+
+        Raises:
+            ValueError: a rule extends past the end of data, or bytes are
+                left over after the last rule.
+        """
+        chdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)])
+        rule_list = []
+        off = sizeof(IpFwObjCTlv)
+        while off + sizeof(IpFwRule) <= len(data):
+            hdr = IpFwRule.from_buffer_copy(data[off : off + sizeof(IpFwRule)])
+            rule_len = sizeof(IpFwRule) + hdr.cmd_len * 4
+            if off + rule_len > len(data):
+                raise ValueError("wrong rule size")
+            rule_list.append(RawRule.from_bytes(data[off : off + rule_len]))
+            # Each rule starts on an 8-byte boundary.
+            off += align8(rule_len)
+        if off != len(data):
+            raise ValueError("rule bytes left: off={} len={}".format(off, len(data)))
+        return cls(chdr.head.n_type, obj_list=rule_list)
+
+    # XXX: _validate
+
+    def __bytes__(self):
+        chunks = []
+        for rule in self.obj_list:
+            rule_bytes = bytes(rule)
+            # Zero-pad to the same 8-byte alignment that _parse() skips over.
+            chunks.append(rule_bytes.ljust(align8(len(rule_bytes)), b"\0"))
+        ret = b"".join(chunks)
+        hdr = IpFwObjCTlv(
+            head=IpFwObjTlv(
+                n_type=self.obj_type, length=len(ret) + sizeof(IpFwObjCTlv)
+            ),
+            count=len(self.obj_list),
+        )
+        return bytes(hdr) + ret
+
+
+class BaseIpFwMessage(object):
+    """Base class for op3 messages: an IpFw3OpHeader followed by TLVs.
+
+    Subclasses list the opcodes they handle in ``messages`` and supply the
+    ``attr_map`` used to decode the TLVs that follow the header.
+    """
+
+    messages = []
+    # Fallback for classes without their own map: Tlv.parse_tlvs() then
+    # keeps every TLV as an UnknownTlv.
+    attr_map = {}
+
+    def __init__(self, msg_type, obj_list=None):
+        if isinstance(msg_type, Enum):
+            self.obj_type = msg_type.value
+            self._enum = msg_type
+        else:
+            self.obj_type = msg_type
+            self._enum = enum_from_int(self.messages, self.obj_type)
+        self.obj_list = []
+        if obj_list:
+            self.obj_list.extend(obj_list)
+
+    @staticmethod
+    def _raw_type(val):
+        """Return the integer behind an enum member, or val itself."""
+        return val.value if isinstance(val, Enum) else val
+
+    def add_obj(self, obj):
+        """Append a TLV object to the message."""
+        self.obj_list.append(obj)
+
+    def get_obj(self, obj_type):
+        """Return the first object of the given type (enum or int), or None."""
+        obj_type_raw = self._raw_type(obj_type)
+        for obj in self.obj_list:
+            if obj.obj_type == obj_type_raw:
+                return obj
+        return None
+
+    @staticmethod
+    def parse_header(data: bytes):
+        """Return (header, header_size) parsed from the start of data.
+
+        Raises:
+            ValueError: data is shorter than the op3 header.
+        """
+        if len(data) < sizeof(IpFw3OpHeader):
+            raise ValueError("length less than op3 message header")
+        return IpFw3OpHeader.from_buffer_copy(data), sizeof(IpFw3OpHeader)
+
+    def parse_obj_list(self, data: bytes):
+        """Split data into raw TLVs and append them to the message.
+
+        Raises:
+            ValueError: a TLV header is truncated, or a TLV states a length
+                smaller than its header or larger than the remaining data.
+        """
+        hdr_size = sizeof(IpFwObjTlv)
+        off = 0
+        while off < len(data):
+            if off + hdr_size > len(data):
+                raise ValueError("TLV too short")
+            hdr = IpFwObjTlv.from_buffer_copy(data[off : off + hdr_size])
+            if hdr.length < hdr_size:
+                # Would otherwise never advance "off".
+                raise ValueError("TLV too short")
+            if hdr.length + off > len(data):
+                raise ValueError("TLV too big")
+            # UnknownTlv is the TLV class that keeps the raw bytes.
+            self.add_obj(UnknownTlv(hdr.n_type, data[off : off + hdr.length]))
+            off += hdr.length
+
+    def is_type(self, msg_type):
+        """Tell whether this message has the given type (enum or int)."""
+        return self._raw_type(msg_type) == self.obj_type
+
+    @property
+    def obj_name(self):
+        if self._enum is not None:
+            return self._enum.name
+        else:
+            return "msg#{}".format(self.obj_type)
+
+    def print_hdr(self, prepend=""):
+        print("{}len={}, type={}".format(prepend, len(bytes(self)), self.obj_name))
+
+    @classmethod
+    def from_bytes(cls, data):
+        """Parse an op3 message: the header, then TLVs decoded via attr_map.
+
+        Raises:
+            ValueError: the header or any TLV is malformed. On a header
+                failure the raw bytes are printed before re-raising.
+        """
+        try:
+            hdr, hdrlen = cls.parse_header(data)
+        except ValueError as e:
+            print("Failed to parse op3 header: {}".format(e))
+            cls.print_as_bytes(data, "invalid op3 message")
+            raise
+        self = cls(hdr.opcode)
+        self._orig_data = data
+        tlv_list = Tlv.parse_tlvs(data[hdrlen:], self.attr_map)
+        self.obj_list.extend(tlv_list)
+        return self
+
+    def __bytes__(self):
+        ret = bytes(IpFw3OpHeader(opcode=self.obj_type))
+        for obj in self.obj_list:
+            ret += bytes(obj)
+        return ret
+
+    def print_obj(self):
+        """Print the message header line followed by every contained object."""
+        self.print_hdr()
+        for obj in self.obj_list:
+            obj.print_obj(" ")
+
+    @staticmethod
+    def print_as_bytes(data: bytes, descr: str):
+        """Print data as a hex dump, 16 bytes per line, under a descr banner."""
+        print("===vv {} (len:{:3d}) vv===".format(descr, len(data)))
+        step = 16
+        for off in range(0, len(data), step):
+            print("".join(" {:02X}".format(b) for b in data[off : off + step]))
+        print("--------------------")
+
+
+# TLV decoding map used for rule messages (see IpFwXRule.attr_map): a name
+# list container holding NTlv entries, plus the rule list container.
+rule_attrs = prepare_attrs_map(
+ [
+ AttrDescr(
+ IpFwTlvType.IPFW_TLV_TBLNAME_LIST,
+ CTlv,
+ [
+ AttrDescr(IpFwTlvType.IPFW_TLV_TBL_NAME, NTlv),
+ AttrDescr(IpFwTlvType.IPFW_TLV_STATE_NAME, NTlv),
+ ],
+ True,
+ ),
+ AttrDescr(IpFwTlvType.IPFW_TLV_RULE_LIST, CTlvRule),
+ ]
+)
+
+
+# Message class for the IP_FW_XADD opcode; its TLVs are decoded with
+# rule_attrs.
+class IpFwXRule(BaseIpFwMessage):
+ messages = [Op3CmdType.IP_FW_XADD]
+ attr_map = rule_attrs
+
+
+# Message classes grouped by the kind of call that carries them; ipfw.py
+# imports these lists to map each debug record type to its parsers.
+legacy_classes = []
+set3_classes = []
+get3_classes = [IpFwXRule]
diff --git a/tests/atf_python/sys/netpfil/ipfw/ioctl_headers.py b/tests/atf_python/sys/netpfil/ipfw/ioctl_headers.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/ioctl_headers.py
@@ -0,0 +1,90 @@
+from enum import Enum
+
+
+# Opcode values found in the op3 message header.
+# NOTE(review): presumably these mirror the kernel's IP_FW_* option numbers;
+# confirm against ip_fw.h before changing any value.
+class Op3CmdType(Enum):
+ IP_FW_TABLE_XADD = 86
+ IP_FW_TABLE_XDEL = 87
+ IP_FW_TABLE_XGETSIZE = 88
+ IP_FW_TABLE_XLIST = 89
+ IP_FW_TABLE_XDESTROY = 90
+ IP_FW_TABLES_XLIST = 92
+ IP_FW_TABLE_XINFO = 93
+ IP_FW_TABLE_XFLUSH = 94
+ IP_FW_TABLE_XCREATE = 95
+ IP_FW_TABLE_XMODIFY = 96
+ IP_FW_XGET = 97
+ IP_FW_XADD = 98
+ IP_FW_XDEL = 99
+ IP_FW_XMOVE = 100
+ IP_FW_XZERO = 101
+ IP_FW_XRESETLOG = 102
+ IP_FW_SET_SWAP = 103
+ IP_FW_SET_MOVE = 104
+ IP_FW_SET_ENABLE = 105
+ IP_FW_TABLE_XFIND = 106
+ IP_FW_XIFLIST = 107
+ IP_FW_TABLES_ALIST = 108
+ IP_FW_TABLE_XSWAP = 109
+ IP_FW_TABLE_VLIST = 110
+ IP_FW_NAT44_XCONFIG = 111
+ IP_FW_NAT44_DESTROY = 112
+ IP_FW_NAT44_XGETCONFIG = 113
+ IP_FW_NAT44_LIST_NAT = 114
+ IP_FW_NAT44_XGETLOG = 115
+ IP_FW_DUMP_SOPTCODES = 116
+ IP_FW_DUMP_SRVOBJECTS = 117
+ IP_FW_NAT64STL_CREATE = 130
+ IP_FW_NAT64STL_DESTROY = 131
+ IP_FW_NAT64STL_CONFIG = 132
+ IP_FW_NAT64STL_LIST = 133
+ IP_FW_NAT64STL_STATS = 134
+ IP_FW_NAT64STL_RESET_STATS = 135
+ IP_FW_NAT64LSN_CREATE = 140
+ IP_FW_NAT64LSN_DESTROY = 141
+ IP_FW_NAT64LSN_CONFIG = 142
+ IP_FW_NAT64LSN_LIST = 143
+ IP_FW_NAT64LSN_STATS = 144
+ IP_FW_NAT64LSN_LIST_STATES = 145
+ IP_FW_NAT64LSN_RESET_STATS = 146
+ IP_FW_NPTV6_CREATE = 150
+ IP_FW_NPTV6_DESTROY = 151
+ IP_FW_NPTV6_CONFIG = 152
+ IP_FW_NPTV6_LIST = 153
+ IP_FW_NPTV6_STATS = 154
+ IP_FW_NPTV6_RESET_STATS = 155
+ IP_FW_NAT64CLAT_CREATE = 160
+ IP_FW_NAT64CLAT_DESTROY = 161
+ IP_FW_NAT64CLAT_CONFIG = 162
+ IP_FW_NAT64CLAT_LIST = 163
+ IP_FW_NAT64CLAT_STATS = 164
+ IP_FW_NAT64CLAT_RESET_STATS = 165
+
+
+# Numeric identifiers of the table lookup key kinds.
+# NOTE(review): values presumably mirror the kernel header; confirm before
+# editing.
+class IpFwTableLookupType(Enum):
+ LOOKUP_DST_IP = 0
+ LOOKUP_SRC_IP = 1
+ LOOKUP_DST_PORT = 2
+ LOOKUP_SRC_PORT = 3
+ LOOKUP_UID = 4
+ LOOKUP_JAIL = 5
+ LOOKUP_DSCP = 6
+ LOOKUP_DST_MAC = 7
+ LOOKUP_SRC_MAC = 8
+ LOOKUP_MARK = 9
+
+
+# TLV type numbers, i.e. the values of the n_type field in a TLV header.
+# Value 13 is not defined in this enum.
+class IpFwTlvType(Enum):
+ IPFW_TLV_TBL_NAME = 1
+ IPFW_TLV_TBLNAME_LIST = 2
+ IPFW_TLV_RULE_LIST = 3
+ IPFW_TLV_DYNSTATE_LIST = 4
+ IPFW_TLV_TBL_ENT = 5
+ IPFW_TLV_DYN_ENT = 6
+ IPFW_TLV_RULE_ENT = 7
+ IPFW_TLV_TBLENT_LIST = 8
+ IPFW_TLV_RANGE = 9
+ IPFW_TLV_EACTION = 10
+ IPFW_TLV_COUNTERS = 11
+ IPFW_TLV_OBJDATA = 12
+ IPFW_TLV_STATE_NAME = 14
+ IPFW_TLV_EACTION_BASE = 1000
diff --git a/tests/atf_python/sys/netpfil/ipfw/ipfw.py b/tests/atf_python/sys/netpfil/ipfw/ipfw.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/ipfw.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+import os
+import socket
+import struct
+import subprocess
+import sys
+from ctypes import c_byte
+from ctypes import c_char
+from ctypes import c_int
+from ctypes import c_long
+from ctypes import c_uint32
+from ctypes import c_uint8
+from ctypes import c_ulong
+from ctypes import c_ushort
+from ctypes import sizeof
+from ctypes import Structure
+from enum import Enum
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import Union
+
+from atf_python.sys.netpfil.ipfw.ioctl import get3_classes
+from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes
+from atf_python.sys.netpfil.ipfw.ioctl import set3_classes
+from atf_python.sys.netpfil.ipfw.utils import AttrDescr
+from atf_python.sys.netpfil.ipfw.utils import enum_from_int
+from atf_python.sys.netpfil.ipfw.utils import enum_or_int
+from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
+
+
+# ctypes mirror of "struct debug_header" that ipfw(8) writes to stdout
+# before each payload when debug output is enabled (see sbin/ipfw/ipfw2.c).
+# The readers below treat total_len as the size of the whole record, this
+# header included.
+class DebugHeader(Structure):
+ _fields_ = [
+ ("cmd_type", c_ushort),
+ ("spare1", c_ushort),
+ ("opt_name", c_uint32),
+ ("total_len", c_uint32),
+ ("spare2", c_uint32),
+ ]
+
+
+# Values of DebugHeader.cmd_type. ipfw2.c writes 1 from do_cmd() and 2 from
+# do_set3(); 3 is presumably written by do_get3() (not visible here).
+class DebugType(Enum):
+ DO_CMD = 1
+ DO_SET3 = 2
+ DO_GET3 = 3
+
+
+class DebugIoReader(object):
+    """Reads the debug records emitted by ipfw(8) and parses them.
+
+    Each record is a DebugHeader followed by the payload of the call; the
+    (cmd_type, opt_name) pair from the header selects the message class.
+    """
+
+    HANDLER_CLASSES = {
+        DebugType.DO_CMD: legacy_classes,
+        DebugType.DO_SET3: set3_classes,
+        DebugType.DO_GET3: get3_classes,
+    }
+
+    def __init__(self, ipfw_path):
+        self._msgmap = self.build_msgmap()
+        self.ipfw_path = ipfw_path
+
+    def build_msgmap(self):
+        """Return {cmd_type: {opt_name: handler class}} from HANDLER_CLASSES."""
+        xmap = {}
+        for debug_type, handler_classes in self.HANDLER_CLASSES.items():
+            per_type = xmap.setdefault(enum_or_int(debug_type), {})
+            for handler_class in handler_classes:
+                for msg in handler_class.messages:
+                    per_type[enum_or_int(msg)] = handler_class
+        return xmap
+
+    def print_obj_header(self, hdr):
+        """Print a one-line description of a record header."""
+        debug_type = "#{}".format(hdr.cmd_type)
+        for _type in self.HANDLER_CLASSES.keys():
+            if _type.value == hdr.cmd_type:
+                debug_type = _type.name.lower()
+                break
+        print(
+            "@@ record for {} len={} optname={}".format(
+                debug_type, hdr.total_len, hdr.opt_name
+            )
+        )
+
+    def parse_record(self, data):
+        """Parse one complete record (header plus payload).
+
+        Raises:
+            ValueError: the record is shorter than a header, or no handler
+                is registered for its cmd_type/opt_name pair.
+        """
+        hdr_size = sizeof(DebugHeader)
+        if len(data) < hdr_size:
+            raise ValueError("record shorter than debug header")
+        hdr = DebugHeader.from_buffer_copy(data[:hdr_size])
+        # An unknown cmd_type must end in the ValueError below, not a KeyError.
+        cls = self._msgmap.get(hdr.cmd_type, {}).get(hdr.opt_name)
+        if cls is not None:
+            return cls.from_bytes(data[hdr_size:])
+        raise ValueError(
+            "unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name)
+        )
+
+    def get_record_from_stdin(self):
+        """Read and parse the next record from stdin; None at end of input.
+
+        Raises:
+            ValueError: the header or the payload is truncated, or the
+                header states a length smaller than the header itself.
+        """
+        hdr_size = sizeof(DebugHeader)
+        # read() returns the requested amount unless EOF is hit, whereas
+        # peek() may hand back fewer bytes than requested.
+        hdr_bytes = sys.stdin.buffer.read(hdr_size)
+        if len(hdr_bytes) == 0:
+            return None
+        if len(hdr_bytes) < hdr_size:
+            raise ValueError("truncated debug header")
+
+        hdr = DebugHeader.from_buffer_copy(hdr_bytes)
+        if hdr.total_len < hdr_size:
+            raise ValueError("debug record length too small")
+        payload = sys.stdin.buffer.read(hdr.total_len - hdr_size)
+        if len(payload) != hdr.total_len - hdr_size:
+            raise ValueError("truncated debug record")
+        return self.parse_record(hdr_bytes + payload)
+
+    def get_records_from_buffer(self, data):
+        """Parse every record in data and return them as a list.
+
+        Raises:
+            ValueError: a record states a length smaller than its header
+                or larger than the remaining data.
+        """
+        hdr_size = sizeof(DebugHeader)
+        off = 0
+        ret = []
+        while off + hdr_size <= len(data):
+            hdr = DebugHeader.from_buffer_copy(data[off : off + hdr_size])
+            if hdr.total_len < hdr_size:
+                # A zero length would never advance "off".
+                raise ValueError("debug record length too small")
+            if off + hdr.total_len > len(data):
+                raise ValueError("truncated debug record")
+            ret.append(self.parse_record(data[off : off + hdr.total_len]))
+            off += hdr.total_len
+        return ret
+
+    def run_ipfw(self, cmd: str) -> bytes:
+        """Run ipfw with "-xqn" plus the whitespace-split cmd; return stdout."""
+        args = [self.ipfw_path, "-xqn"] + cmd.split()
+        r = subprocess.run(args, capture_output=True)
+        return r.stdout
+
+    def get_records(self, cmd: str):
+        """Run ipfw with cmd and parse the records it printed."""
+        return self.get_records_from_buffer(self.run_ipfw(cmd))
diff --git a/tests/atf_python/sys/netpfil/ipfw/utils.py b/tests/atf_python/sys/netpfil/ipfw/utils.py
new file mode 100644
--- /dev/null
+++ b/tests/atf_python/sys/netpfil/ipfw/utils.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+
+import os
+import socket
+import struct
+import subprocess
+import sys
+from enum import Enum
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Union
+from typing import Any
+from typing import NamedTuple
+import pytest
+
+
+def roundup2(val: int, num: int) -> int:
+    """Round val up to the next multiple of num; exact multiples are unchanged.
+
+    NOTE(review): the bit trick assumes num is a power of two.
+    """
+    if val % num == 0:
+        return val
+    return (val | (num - 1)) + 1
+
+
+def align8(val: int) -> int:
+    """Round val up to a multiple of 8."""
+    aligned = roundup2(val, 8)
+    return aligned
+
+
+def enum_or_int(val) -> int:
+    """Return the value of an Enum member; anything else is returned as is."""
+    return val.value if isinstance(val, Enum) else val
+
+
+def enum_from_int(enum_class: Enum, val) -> Enum:
+    """Return the member of enum_class whose value equals val, else None.
+
+    An Enum member passed as val is returned unchanged. enum_class only
+    needs to be iterable over members.
+    """
+    if isinstance(val, Enum):
+        return val
+    return next((member for member in enum_class if val == member.value), None)
+
+
+# Describes one entry of an attribute map (consumed by prepare_attrs_map).
+class AttrDescr(NamedTuple):
+ # enum member whose .value becomes the map key
+ val: Enum
+ # class used to parse objects of this type
+ cls: Any
+ # optional list of AttrDescr describing nested objects
+ child_map: Any = None
+ # stored in the map as "is_array"
+ is_array: bool = False
+
+
+# Build a lookup map from a list of AttrDescr: each key is ad.val.value and
+# each entry holds the descriptor under "ad". Descriptors with a child_map
+# get a recursively built "child" map.
+# NOTE(review): the keys are enum values, so the Dict[str, Dict] return
+# annotation looks inaccurate.
+# NOTE(review): indentation was lost in this rendering, so it is unclear
+# whether "is_array" is stored for every entry or only when child_map is
+# set. Check against the original file.
+def prepare_attrs_map(attrs: List[AttrDescr]) -> Dict[str, Dict]:
+ ret = {}
+ for ad in attrs:
+ ret[ad.val.value] = {"ad": ad}
+ if ad.child_map:
+ ret[ad.val.value]["child"] = prepare_attrs_map(ad.child_map)
+ ret[ad.val.value]["is_array"] = ad.is_array
+ return ret
+
+
+
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mon, Oct 20, 6:17 PM (12 h, 20 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
23979163
Default Alt Text
D40488.diff (64 KB)
Attached To
Mode
D40488: ipfw(8): add ioctl/instruction generation tests
Attached
Detach File
Event Timeline
Log In to Comment