diff --git a/sbin/ipfw/tests/test_add_rule.py b/sbin/ipfw/tests/test_add_rule.py index 60c8cebaceaa..c2c4bf0b360c 100755 --- a/sbin/ipfw/tests/test_add_rule.py +++ b/sbin/ipfw/tests/test_add_rule.py @@ -1,492 +1,493 @@ import errno import json import os import socket import struct import subprocess import sys from ctypes import c_byte from ctypes import c_char from ctypes import c_int from ctypes import c_long from ctypes import c_uint32 from ctypes import c_uint8 from ctypes import c_ulong from ctypes import c_ushort from ctypes import sizeof from ctypes import Structure from enum import Enum from typing import Any from typing import Dict from typing import List from typing import NamedTuple from typing import Optional from typing import Union import pytest from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode from atf_python.sys.netpfil.ipfw.insns import Insn from atf_python.sys.netpfil.ipfw.insns import InsnComment from atf_python.sys.netpfil.ipfw.insns import InsnEmpty from atf_python.sys.netpfil.ipfw.insns import InsnIp from atf_python.sys.netpfil.ipfw.insns import InsnIp6 from atf_python.sys.netpfil.ipfw.insns import InsnPorts from atf_python.sys.netpfil.ipfw.insns import InsnProb from atf_python.sys.netpfil.ipfw.insns import InsnProto from atf_python.sys.netpfil.ipfw.insns import InsnReject from atf_python.sys.netpfil.ipfw.insns import InsnTable +from atf_python.sys.netpfil.ipfw.insns import InsnU32 from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode from atf_python.sys.netpfil.ipfw.ioctl import CTlv from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule from atf_python.sys.netpfil.ipfw.ioctl import NTlv from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType from atf_python.sys.netpfil.ipfw.ioctl import RawRule from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader from atf_python.sys.netpfil.ipfw.utils import enum_from_int from atf_python.utils import BaseTest IPFW_PATH = "/sbin/ipfw" def differ(w_obj, g_obj, w_stack=[], g_stack=[]): if bytes(w_obj) == bytes(g_obj): return True num_objects = 0 for i, w_child in enumerate(w_obj.obj_list): if i >= len(g_obj.obj_list): print("MISSING object from chain {}".format(" / ".join(w_stack))) w_child.print_obj() print("==========================") return False g_child = g_obj.obj_list[i] if bytes(w_child) == bytes(g_child): num_objects += 1 continue w_stack.append(w_obj.obj_name) g_stack.append(g_obj.obj_name) if not differ(w_child, g_child, w_stack, g_stack): return False break if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list): g_child = g_obj.obj_list[num_objects] print("EXTRA object from chain {}".format(" / ".join(g_stack))) g_child.print_obj() print("==========================") return False print("OBJECTS DIFFER") print("WANTED CHAIN: {}".format(" / ".join(w_stack))) w_obj.print_obj() w_obj.print_obj_hex() print("==========================") print("GOT CHAIN: {}".format(" / ".join(g_stack))) g_obj.print_obj() g_obj.print_obj_hex() print("==========================") return False class TestAddRule(BaseTest): def compile_rule(self, out): tlvs = [] if "objs" in out: tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"])) rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"]) tlvs.append(CTlvRule(obj_list=[rule])) return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs) def verify_rule(self, in_data: str, out_data): # Prepare the desired output expected = self.compile_rule(out_data) reader = DebugIoReader(IPFW_PATH) ioctls = reader.get_records(in_data) assert len(ioctls) == 1 # Only 1 ioctl request expected got = ioctls[0] if not differ(expected, got): print("=> CMD: {}".format(in_data)) print("=> WANTED:") expected.print_obj() print("==========================") print("=> GOT:") got.print_obj() print("==========================") assert bytes(got) == bytes(expected) @pytest.mark.parametrize( "rule", [ pytest.param( { "in": "add 200 allow ip from any to any", "out": { "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)], "rulenum": 200, }, }, id="test_rulenum", ), pytest.param( { "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any", "out": { "insns": [ InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True), InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"), InsnEmpty(IpFwOpcode.O_ACCEPT), ], }, }, id="test_or", ), pytest.param( { "in": "add allow ip from table(AAA) to table(BBB)", "out": { "objs": [ NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"), NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"), ], "insns": [ - InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1), - InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2), + InsnU32(IpFwOpcode.O_IP_SRC_LOOKUP, u32=1), + InsnU32(IpFwOpcode.O_IP_DST_LOOKUP, u32=2), InsnEmpty(IpFwOpcode.O_ACCEPT), ], }, }, id="test_tables", ), pytest.param( { "in": "add allow ip from any to 1.2.3.4 // test comment", "out": { "insns": [ InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), InsnComment(comment="test comment"), InsnEmpty(IpFwOpcode.O_ACCEPT), ], }, }, id="test_comment", ), pytest.param( { "in": "add tcp-setmss 123 ip from any to 1.2.3.4", "out": { "objs": [ NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"), ], "insns": [ InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), - Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1), + InsnU32(IpFwOpcode.O_EXTERNAL_ACTION, u32=1), Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123), ], }, }, id="test_eaction_tcp-setmss", ), pytest.param( { "in": "add eaction ntpv6 AAA ip from any to 1.2.3.4", "out": { "objs": [ NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="ntpv6"), NTlv(0, idx=2, name="AAA"), ], "insns": [ InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), - Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1), - Insn(IpFwOpcode.O_EXTERNAL_INSTANCE, arg1=2), + InsnU32(IpFwOpcode.O_EXTERNAL_ACTION, u32=1), + InsnU32(IpFwOpcode.O_EXTERNAL_INSTANCE, u32=2), ], }, }, id="test_eaction_ntp", ), pytest.param( { "in": "add // test comment", "out": { "insns": [ InsnComment(comment="test comment"), Insn(IpFwOpcode.O_COUNT), ], }, }, id="test_action_comment", ), pytest.param( { "in": "add check-state :OUT // test comment", "out": { "objs": [ NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"), ], "insns": [ InsnComment(comment="test comment"), - Insn(IpFwOpcode.O_CHECK_STATE, arg1=1), + InsnU32(IpFwOpcode.O_CHECK_STATE, u32=1), ], }, }, id="test_check_state", ), pytest.param( { "in": "add allow tcp from any to any keep-state :OUT", "out": { "objs": [ NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="OUT"), ], "insns": [ - Insn(IpFwOpcode.O_PROBE_STATE, arg1=1), + InsnU32(IpFwOpcode.O_PROBE_STATE, u32=1), Insn(IpFwOpcode.O_PROTO, arg1=6), - Insn(IpFwOpcode.O_KEEP_STATE, arg1=1), + InsnU32(IpFwOpcode.O_KEEP_STATE, u32=1), InsnEmpty(IpFwOpcode.O_ACCEPT), ], }, }, id="test_keep_state", ), pytest.param( { "in": "add allow tcp from any to any record-state", "out": { "objs": [ NTlv(IpFwTlvType.IPFW_TLV_STATE_NAME, idx=1, name="default"), ], "insns": [ Insn(IpFwOpcode.O_PROTO, arg1=6), - Insn(IpFwOpcode.O_KEEP_STATE, arg1=1), + InsnU32(IpFwOpcode.O_KEEP_STATE, u32=1), InsnEmpty(IpFwOpcode.O_ACCEPT), ], }, }, id="test_record_state", ), ], ) def test_add_rule(self, rule): """Tests if the compiled rule is sane and matches the spec""" self.verify_rule(rule["in"], rule["out"]) @pytest.mark.parametrize( "action", [ pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"), pytest.param( ( "abort", Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT), ), id="abort", ), pytest.param( ( "abort6", Insn( IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT ), ), id="abort6", ), pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"), pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"), pytest.param( ( "reject", Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST), ), id="reject", ), pytest.param( ( "reset", Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST), ), id="reset", ), pytest.param( ( "reset6", Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST), ), id="reset6", ), pytest.param( ( "unreach port", InsnReject( IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT ), ), id="unreach_port", ), pytest.param( ( "unreach port", InsnReject( IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT ), ), id="unreach_port", ), pytest.param( ( "unreach needfrag", InsnReject( IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG ), ), id="unreach_needfrag", ), pytest.param( ( "unreach needfrag 1420", InsnReject( IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG, mtu=1420, ), ), id="unreach_needfrag_mtu", ), pytest.param( ( "unreach6 port", Insn( IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT, ), ), id="unreach6_port", ), pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"), # TOK_NAT pytest.param( ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42" ), pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"), pytest.param( - ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42" + ("skipto 42", InsnU32(IpFwOpcode.O_SKIPTO, u32=42)), id="skipto_42" ), pytest.param( ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42" ), pytest.param( ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42" ), pytest.param( ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42" ), pytest.param( ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd" ), pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"), pytest.param( - ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420" + ("call 420", InsnU32(IpFwOpcode.O_CALLRETURN, u32=420)), id="call_420" ), # TOK_FORWARD pytest.param( ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)), id="setfib_1", marks=pytest.mark.skip("needs net.fibs>1"), ), pytest.param( ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)), id="setdscp_42", ), pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"), pytest.param( - ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return" + ("return", InsnU32(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return" ), ], ) def test_add_action(self, action): """Tests if the rule action is compiled properly""" rule_in = "add {} ip from any to any".format(action[0]) rule_out = {"insns": [action[1]]} self.verify_rule(rule_in, rule_out) @pytest.mark.parametrize( "insn", [ pytest.param( { "in": "add prob 0.7 allow ip from any to any", "out": InsnProb(prob=0.7), }, id="test_prob", ), pytest.param( { "in": "add allow tcp from any to any", "out": InsnProto(arg1=6), }, id="test_proto", ), pytest.param( { "in": "add allow ip from any to any 57", "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]), }, id="test_ports", ), ], ) def test_add_single_instruction(self, insn): """Tests if the compiled rule is sane and matches the spec""" # Prepare the desired output out = { "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)], } self.verify_rule(insn["in"], out) @pytest.mark.parametrize( "opcode", [ pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"), pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"), ], ) @pytest.mark.parametrize( "params", [ pytest.param( { "in": "57", "out": [(57, 57)], }, id="test_single", ), pytest.param( { "in": "57-59", "out": [(57, 59)], }, id="test_range", ), pytest.param( { "in": "57-59,41", "out": [(57, 59), (41, 41)], }, id="test_ranges", ), ], ) def test_add_ports(self, params, opcode): if opcode == IpFwOpcode.O_IP_DSTPORT: txt = "add allow ip from any to any " + params["in"] else: txt = "add allow ip from any " + params["in"] + " to any" out = { "insns": [ InsnPorts(opcode, port_pairs=params["out"]), InsnEmpty(IpFwOpcode.O_ACCEPT), ] } self.verify_rule(txt, out) diff --git a/tests/atf_python/sys/netpfil/ipfw/insns.py b/tests/atf_python/sys/netpfil/ipfw/insns.py index 12f145f49393..f8a56de901ae 100644 --- a/tests/atf_python/sys/netpfil/ipfw/insns.py +++ b/tests/atf_python/sys/netpfil/ipfw/insns.py @@ -1,555 +1,558 @@ #!/usr/bin/env python3 import os import socket import struct import subprocess import sys from ctypes import c_byte from ctypes import c_char from ctypes import c_int from ctypes import c_long from ctypes import c_uint32 from ctypes import c_uint8 from ctypes import c_ulong from ctypes import c_ushort from ctypes import sizeof from ctypes import Structure from enum import Enum from typing import Any from typing import Dict from typing import List from typing import NamedTuple from typing import Optional from typing import Union from atf_python.sys.netpfil.ipfw.insn_headers import IpFwOpcode from atf_python.sys.netpfil.ipfw.insn_headers import IcmpRejectCode from atf_python.sys.netpfil.ipfw.insn_headers import Icmp6RejectCode from atf_python.sys.netpfil.ipfw.utils import AttrDescr from atf_python.sys.netpfil.ipfw.utils import enum_or_int from atf_python.sys.netpfil.ipfw.utils import enum_from_int from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map insn_actions = ( IpFwOpcode.O_CHECK_STATE.value, IpFwOpcode.O_REJECT.value, IpFwOpcode.O_UNREACH6.value, IpFwOpcode.O_ACCEPT.value, IpFwOpcode.O_DENY.value, IpFwOpcode.O_COUNT.value, IpFwOpcode.O_NAT.value, IpFwOpcode.O_QUEUE.value, IpFwOpcode.O_PIPE.value, IpFwOpcode.O_SKIPTO.value, IpFwOpcode.O_NETGRAPH.value, IpFwOpcode.O_NGTEE.value, IpFwOpcode.O_DIVERT.value, IpFwOpcode.O_TEE.value, IpFwOpcode.O_CALLRETURN.value, IpFwOpcode.O_FORWARD_IP.value, IpFwOpcode.O_FORWARD_IP6.value, IpFwOpcode.O_SETFIB.value, IpFwOpcode.O_SETDSCP.value, IpFwOpcode.O_REASS.value, IpFwOpcode.O_SETMARK.value, IpFwOpcode.O_EXTERNAL_ACTION.value, ) class IpFwInsn(Structure): _fields_ = [ ("opcode", c_uint8), ("length", c_uint8), ("arg1", c_ushort), ] class BaseInsn(object): obj_enum_class = IpFwOpcode def __init__(self, opcode, is_or, is_not, arg1): if isinstance(opcode, Enum): self.obj_type = opcode.value self._enum = opcode else: self.obj_type = opcode self._enum = enum_from_int(self.obj_enum_class, self.obj_type) self.is_or = is_or self.is_not = is_not self.arg1 = arg1 self.is_action = self.obj_type in insn_actions self.ilen = 1 self.obj_list = [] @property def obj_name(self): if self._enum is not None: return self._enum.name else: return "opcode#{}".format(self.obj_type) @staticmethod def get_insn_len(data: bytes) -> int: (opcode_len,) = struct.unpack("@B", data[1:2]) return opcode_len & 0x3F @classmethod def _validate_len(cls, data, valid_options=None): if len(data) < 4: raise ValueError("opcode too short") opcode_type, opcode_len = struct.unpack("@BB", data[:2]) if len(data) != ((opcode_len & 0x3F) * 4): raise ValueError("wrong length") if valid_options and len(data) not in valid_options: raise ValueError( "len {} not in {} for {}".format( len(data), valid_options, enum_from_int(cls.obj_enum_class, data[0]) ) ) @classmethod def _validate(cls, data): cls._validate_len(data) @classmethod def _parse(cls, data): insn = IpFwInsn.from_buffer_copy(data[:4]) is_or = (insn.length & 0x40) != 0 is_not = (insn.length & 0x80) != 0 return cls(opcode=insn.opcode, is_or=is_or, is_not=is_not, arg1=insn.arg1) @classmethod def from_bytes(cls, data, attr_type_enum): cls._validate(data) opcode = cls._parse(data) opcode._enum = attr_type_enum return opcode def __bytes__(self): raise NotImplementedError() def print_obj(self, prepend=""): is_or = "" if self.is_or: is_or = " [OR]\\" is_not = "" if self.is_not: is_not = "[!] " print( "{}{}len={} type={}({}){}{}".format( prepend, is_not, len(bytes(self)), self.obj_name, self.obj_type, self._print_obj_value(), is_or, ) ) def _print_obj_value(self): raise NotImplementedError() def print_obj_hex(self, prepend=""): print(prepend) print() print(" ".join(["x{:02X}".format(b) for b in bytes(self)])) @staticmethod def parse_insns(data, attr_map): ret = [] off = 0 while off + sizeof(IpFwInsn) <= len(data): hdr = IpFwInsn.from_buffer_copy(data[off : off + sizeof(IpFwInsn)]) insn_len = (hdr.length & 0x3F) * 4 if off + insn_len > len(data): raise ValueError("wrng length") # print("GET insn type {} len {}".format(hdr.opcode, insn_len)) attr = attr_map.get(hdr.opcode, None) if attr is None: cls = InsnUnknown type_enum = enum_from_int(BaseInsn.obj_enum_class, hdr.opcode) else: cls = attr["ad"].cls type_enum = attr["ad"].val insn = cls.from_bytes(data[off : off + insn_len], type_enum) ret.append(insn) off += insn_len if off != len(data): raise ValueError("empty space") return ret class Insn(BaseInsn): def __init__(self, opcode, is_or=False, is_not=False, arg1=0): super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) @classmethod def _validate(cls, data): cls._validate_len(data, [4]) def __bytes__(self): length = self.ilen if self.is_or: length |= 0x40 if self.is_not: length | 0x80 insn = IpFwInsn(opcode=self.obj_type, length=length, arg1=enum_or_int(self.arg1)) return bytes(insn) def _print_obj_value(self): return " arg1={}".format(self.arg1) class InsnUnknown(Insn): @classmethod def _validate(cls, data): cls._validate_len(data) @classmethod def _parse(cls, data): self = super()._parse(data) self._data = data return self def __bytes__(self): return self._data def _print_obj_value(self): return " " + " ".join(["x{:02X}".format(b) for b in self._data]) class InsnEmpty(Insn): @classmethod def _validate(cls, data): cls._validate_len(data, [4]) insn = IpFwInsn.from_buffer_copy(data[:4]) if insn.arg1 != 0: raise ValueError("arg1 should be empty") def _print_obj_value(self): return "" class InsnComment(Insn): def __init__(self, opcode=IpFwOpcode.O_NOP, is_or=False, is_not=False, arg1=0, comment=""): super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) if comment: self.comment = comment else: self.comment = "" @classmethod def _validate(cls, data): cls._validate_len(data) if len(data) > 88: raise ValueError("comment too long") @classmethod def _parse(cls, data): self = super()._parse(data) # Comment encoding can be anything, # use utf-8 to ease debugging max_len = 0 for b in range(4, len(data)): if data[b] == b"\0": break max_len += 1 self.comment = data[4:max_len].decode("utf-8") return self def __bytes__(self): ret = super().__bytes__() comment_bytes = self.comment.encode("utf-8") + b"\0" if len(comment_bytes) % 4 > 0: comment_bytes += b"\0" * (4 - (len(comment_bytes) % 4)) ret += comment_bytes return ret def _print_obj_value(self): return " comment='{}'".format(self.comment) class InsnProto(Insn): def __init__(self, opcode=IpFwOpcode.O_PROTO, is_or=False, is_not=False, arg1=0): super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) def _print_obj_value(self): known_map = {6: "TCP", 17: "UDP", 41: "IPV6"} proto = self.arg1 if proto in known_map: return " proto={}".format(known_map[proto]) else: return " proto=#{}".format(proto) class InsnU32(Insn): def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0): super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) self.u32 = u32 self.ilen = 2 @classmethod def _validate(cls, data): cls._validate_len(data, [8]) @classmethod def _parse(cls, data): self = super()._parse(data[:4]) self.u32 = struct.unpack("@I", data[4:8])[0] return self def __bytes__(self): return super().__bytes__() + struct.pack("@I", self.u32) def _print_obj_value(self): return " arg1={} u32={}".format(self.arg1, self.u32) class InsnProb(InsnU32): def __init__( self, opcode=IpFwOpcode.O_PROB, is_or=False, is_not=False, arg1=0, u32=0, prob=0.0, ): super().__init__(opcode, is_or=is_or, is_not=is_not) self.prob = prob @property def prob(self): return 1.0 * self.u32 / 0x7FFFFFFF @prob.setter def prob(self, prob: float): self.u32 = int(prob * 0x7FFFFFFF) def _print_obj_value(self): return " prob={}".format(round(self.prob, 5)) class InsnIp(InsnU32): def __init__(self, opcode, is_or=False, is_not=False, arg1=0, u32=0, ip=None): super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1, u32=u32) if ip: self.ip = ip @property def ip(self): return socket.inet_ntop(socket.AF_INET, struct.pack("@I", self.u32)) @ip.setter def ip(self, ip: str): ip_bin = socket.inet_pton(socket.AF_INET, ip) self.u32 = struct.unpack("@I", ip_bin)[0] def _print_opcode_value(self): return " ip={}".format(self.ip) class InsnTable(Insn): @classmethod def _validate(cls, data): cls._validate_len(data, [4, 8]) @classmethod def _parse(cls, data): self = super()._parse(data) if len(data) == 8: (self.val,) = struct.unpack("@I", data[4:8]) self.ilen = 2 else: self.val = None return self def __bytes__(self): ret = super().__bytes__() if getattr(self, "val", None) is not None: ret += struct.pack("@I", self.val) return ret def _print_obj_value(self): if getattr(self, "val", None) is not None: return " table={} value={}".format(self.arg1, self.val) else: return " table={}".format(self.arg1) class InsnReject(Insn): def __init__(self, opcode, is_or=False, is_not=False, arg1=0, mtu=None): super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) self.mtu = mtu if self.mtu is not None: self.ilen = 2 @classmethod def _validate(cls, data): cls._validate_len(data, [4, 8]) @classmethod def _parse(cls, data): self = super()._parse(data) if len(data) == 8: (self.mtu,) = struct.unpack("@I", data[4:8]) self.ilen = 2 else: self.mtu = None return self def __bytes__(self): ret = super().__bytes__() if getattr(self, "mtu", None) is not None: ret += struct.pack("@I", self.mtu) return ret def _print_obj_value(self): code = enum_from_int(IcmpRejectCode, self.arg1) if getattr(self, "mtu", None) is not None: return " code={} mtu={}".format(code, self.mtu) else: return " code={}".format(code) class InsnPorts(Insn): def __init__(self, opcode, is_or=False, is_not=False, arg1=0, port_pairs=[]): super().__init__(opcode, is_or=is_or, is_not=is_not) self.port_pairs = [] if port_pairs: self.port_pairs = port_pairs @classmethod def _validate(cls, data): if len(data) < 8: raise ValueError("no ports specified") cls._validate_len(data) @classmethod def _parse(cls, data): self = super()._parse(data) off = 4 port_pairs = [] while off + 4 <= len(data): low, high = struct.unpack("@HH", data[off : off + 4]) port_pairs.append((low, high)) off += 4 self.port_pairs = port_pairs return self def __bytes__(self): ret = super().__bytes__() if getattr(self, "val", None) is not None: ret += struct.pack("@I", self.val) return ret def _print_obj_value(self): ret = [] for p in self.port_pairs: if p[0] == p[1]: ret.append(str(p[0])) else: ret.append("{}-{}".format(p[0], p[1])) return " ports={}".format(",".join(ret)) class IpFwInsnIp6(Structure): _fields_ = [ ("o", IpFwInsn), ("addr6", c_byte * 16), ("mask6", c_byte * 16), ] class InsnIp6(Insn): def __init__(self, opcode, is_or=False, is_not=False, arg1=0, ip6=None, mask6=None): super().__init__(opcode, is_or=is_or, is_not=is_not, arg1=arg1) self.ip6 = ip6 self.mask6 = mask6 if mask6 is not None: self.ilen = 9 else: self.ilen = 5 @classmethod def _validate(cls, data): cls._validate_len(data, [4 + 16, 4 + 16 * 2]) @classmethod def _parse(cls, data): self = super()._parse(data) self.ip6 = socket.inet_ntop(socket.AF_INET6, data[4:20]) if len(data) == 4 + 16 * 2: self.mask6 = socket.inet_ntop(socket.AF_INET6, data[20:36]) self.ilen = 9 else: self.mask6 = None self.ilen = 5 return self def __bytes__(self): ret = super().__bytes__() + socket.inet_pton(socket.AF_INET6, self.ip6) if self.mask6 is not None: ret += socket.inet_pton(socket.AF_INET6, self.mask6) return ret def _print_obj_value(self): if self.mask6: return " ip6={}/{}".format(self.ip6, self.mask6) else: return " ip6={}".format(self.ip6) insn_attrs = prepare_attrs_map( [ - AttrDescr(IpFwOpcode.O_CHECK_STATE, Insn), + AttrDescr(IpFwOpcode.O_CHECK_STATE, InsnU32), AttrDescr(IpFwOpcode.O_ACCEPT, InsnEmpty), AttrDescr(IpFwOpcode.O_COUNT, InsnEmpty), AttrDescr(IpFwOpcode.O_REJECT, InsnReject), AttrDescr(IpFwOpcode.O_UNREACH6, Insn), AttrDescr(IpFwOpcode.O_DENY, InsnEmpty), AttrDescr(IpFwOpcode.O_DIVERT, Insn), AttrDescr(IpFwOpcode.O_COUNT, InsnEmpty), AttrDescr(IpFwOpcode.O_QUEUE, Insn), AttrDescr(IpFwOpcode.O_PIPE, Insn), - AttrDescr(IpFwOpcode.O_SKIPTO, Insn), + AttrDescr(IpFwOpcode.O_SKIPTO, InsnU32), AttrDescr(IpFwOpcode.O_NETGRAPH, Insn), AttrDescr(IpFwOpcode.O_NGTEE, Insn), AttrDescr(IpFwOpcode.O_DIVERT, Insn), AttrDescr(IpFwOpcode.O_TEE, Insn), - AttrDescr(IpFwOpcode.O_CALLRETURN, Insn), + AttrDescr(IpFwOpcode.O_CALLRETURN, InsnU32), AttrDescr(IpFwOpcode.O_SETFIB, Insn), AttrDescr(IpFwOpcode.O_SETDSCP, Insn), AttrDescr(IpFwOpcode.O_REASS, InsnEmpty), - AttrDescr(IpFwOpcode.O_SETMARK, Insn), + AttrDescr(IpFwOpcode.O_SETMARK, InsnU32), + + AttrDescr(IpFwOpcode.O_EXTERNAL_ACTION, InsnU32), + AttrDescr(IpFwOpcode.O_EXTERNAL_INSTANCE, InsnU32), AttrDescr(IpFwOpcode.O_NOP, InsnComment), AttrDescr(IpFwOpcode.O_PROTO, InsnProto), AttrDescr(IpFwOpcode.O_PROB, InsnProb), AttrDescr(IpFwOpcode.O_IP_DST_ME, InsnEmpty), AttrDescr(IpFwOpcode.O_IP_SRC_ME, InsnEmpty), AttrDescr(IpFwOpcode.O_IP6_DST_ME, InsnEmpty), AttrDescr(IpFwOpcode.O_IP6_SRC_ME, InsnEmpty), AttrDescr(IpFwOpcode.O_IP_SRC, InsnIp), AttrDescr(IpFwOpcode.O_IP_DST, InsnIp), AttrDescr(IpFwOpcode.O_IP6_DST, InsnIp6), AttrDescr(IpFwOpcode.O_IP6_SRC, InsnIp6), - AttrDescr(IpFwOpcode.O_IP_SRC_LOOKUP, InsnTable), - AttrDescr(IpFwOpcode.O_IP_DST_LOOKUP, InsnTable), + AttrDescr(IpFwOpcode.O_IP_SRC_LOOKUP, InsnU32), + AttrDescr(IpFwOpcode.O_IP_DST_LOOKUP, InsnU32), AttrDescr(IpFwOpcode.O_IP_SRCPORT, InsnPorts), AttrDescr(IpFwOpcode.O_IP_DSTPORT, InsnPorts), - AttrDescr(IpFwOpcode.O_PROBE_STATE, Insn), - AttrDescr(IpFwOpcode.O_KEEP_STATE, Insn), + AttrDescr(IpFwOpcode.O_PROBE_STATE, InsnU32), + AttrDescr(IpFwOpcode.O_KEEP_STATE, InsnU32), ] )