diff --git a/sbin/ipfw/tests/test_add_rule.py b/sbin/ipfw/tests/test_add_rule.py index 65b4e7d33646..42c594a83bcf 100755 --- a/sbin/ipfw/tests/test_add_rule.py +++ b/sbin/ipfw/tests/test_add_rule.py @@ -1,400 +1,433 @@ import errno import json import os import socket import struct import subprocess import sys from ctypes import c_byte from ctypes import c_char from ctypes import c_int from ctypes import c_long from ctypes import c_uint32 from ctypes import c_uint8 from ctypes import c_ulong from ctypes import c_ushort from ctypes import sizeof from ctypes import Structure from enum import Enum from typing import Any from typing import Dict from typing import List from typing import NamedTuple from typing import Optional from typing import Union import pytest from atf_python.sys.netpfil.ipfw.insns import Icmp6RejectCode from atf_python.sys.netpfil.ipfw.insns import IcmpRejectCode from atf_python.sys.netpfil.ipfw.insns import Insn from atf_python.sys.netpfil.ipfw.insns import InsnComment from atf_python.sys.netpfil.ipfw.insns import InsnEmpty from atf_python.sys.netpfil.ipfw.insns import InsnIp from atf_python.sys.netpfil.ipfw.insns import InsnIp6 from atf_python.sys.netpfil.ipfw.insns import InsnPorts from atf_python.sys.netpfil.ipfw.insns import InsnProb from atf_python.sys.netpfil.ipfw.insns import InsnProto from atf_python.sys.netpfil.ipfw.insns import InsnReject from atf_python.sys.netpfil.ipfw.insns import InsnTable from atf_python.sys.netpfil.ipfw.insns import IpFwOpcode from atf_python.sys.netpfil.ipfw.ioctl import CTlv from atf_python.sys.netpfil.ipfw.ioctl import CTlvRule from atf_python.sys.netpfil.ipfw.ioctl import IpFwTlvType from atf_python.sys.netpfil.ipfw.ioctl import IpFwXRule from atf_python.sys.netpfil.ipfw.ioctl import NTlv from atf_python.sys.netpfil.ipfw.ioctl import Op3CmdType from atf_python.sys.netpfil.ipfw.ioctl import RawRule from atf_python.sys.netpfil.ipfw.ipfw import DebugIoReader from atf_python.sys.netpfil.ipfw.utils import enum_from_int from atf_python.utils import BaseTest IPFW_PATH = "/sbin/ipfw" def differ(w_obj, g_obj, w_stack=[], g_stack=[]): if bytes(w_obj) == bytes(g_obj): return True num_objects = 0 for i, w_child in enumerate(w_obj.obj_list): if i > len(g_obj.obj_list): print("MISSING object from chain {}".format(" / ".join(w_stack))) w_child.print_obj() print("==========================") return False g_child = g_obj.obj_list[i] if bytes(w_child) == bytes(g_child): num_objects += 1 continue w_stack.append(w_obj.obj_name) g_stack.append(g_obj.obj_name) if not differ(w_child, g_child, w_stack, g_stack): return False break if num_objects == len(w_obj.obj_list) and num_objects < len(g_obj.obj_list): g_child = g_obj.obj_list[num_objects] print("EXTRA object from chain {}".format(" / ".join(g_stack))) g_child.print_obj() print("==========================") return False print("OBJECTS DIFFER") print("WANTED CHAIN: {}".format(" / ".join(w_stack))) w_obj.print_obj() w_obj.print_obj_hex() print("==========================") print("GOT CHAIN: {}".format(" / ".join(g_stack))) g_obj.print_obj() g_obj.print_obj_hex() print("==========================") return False class TestAddRule(BaseTest): def compile_rule(self, out): tlvs = [] if "objs" in out: tlvs.append(CTlv(IpFwTlvType.IPFW_TLV_TBLNAME_LIST, out["objs"])) rule = RawRule(rulenum=out.get("rulenum", 0), obj_list=out["insns"]) tlvs.append(CTlvRule(obj_list=[rule])) return IpFwXRule(Op3CmdType.IP_FW_XADD, tlvs) def verify_rule(self, in_data: str, out_data): # Prepare the desired output expected = self.compile_rule(out_data) reader = DebugIoReader(IPFW_PATH) ioctls = reader.get_records(in_data) assert len(ioctls) == 1 # Only 1 ioctl request expected got = ioctls[0] if not differ(expected, got): print("=> CMD: {}".format(in_data)) print("=> WANTED:") expected.print_obj() print("==========================") print("=> GOT:") got.print_obj() print("==========================") assert bytes(got) == bytes(expected) @pytest.mark.parametrize( "rule", [ pytest.param( { "in": "add 200 allow ip from any to any", "out": { "insns": [InsnEmpty(IpFwOpcode.O_ACCEPT)], "rulenum": 200, }, }, id="test_rulenum", ), pytest.param( { "in": "add allow ip from { 1.2.3.4 or 2.3.4.5 } to any", "out": { "insns": [ InsnIp(IpFwOpcode.O_IP_SRC, ip="1.2.3.4", is_or=True), InsnIp(IpFwOpcode.O_IP_SRC, ip="2.3.4.5"), InsnEmpty(IpFwOpcode.O_ACCEPT), ], }, }, id="test_or", ), pytest.param( { "in": "add allow ip from table(AAA) to table(BBB)", "out": { "objs": [ NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=1, name="AAA"), NTlv(IpFwTlvType.IPFW_TLV_TBL_NAME, idx=2, name="BBB"), ], "insns": [ InsnTable(IpFwOpcode.O_IP_SRC_LOOKUP, arg1=1), InsnTable(IpFwOpcode.O_IP_DST_LOOKUP, arg1=2), InsnEmpty(IpFwOpcode.O_ACCEPT), ], }, }, id="test_tables", ), pytest.param( { "in": "add allow ip from any to 1.2.3.4 // test comment", "out": { "insns": [ InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), InsnComment(comment="test comment"), InsnEmpty(IpFwOpcode.O_ACCEPT), ], }, }, id="test_comment", ), + pytest.param( + { + "in": "add tcp-setmss 123 ip from any to 1.2.3.4", + "out": { + "objs": [ + NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="tcp-setmss"), + ], + "insns": [ + InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), + Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1), + Insn(IpFwOpcode.O_EXTERNAL_DATA, arg1=123), + ], + }, + }, + id="test_eaction_tcp-setmss", + ), + pytest.param( + { + "in": "add eaction ntpv6 AAA ip from any to 1.2.3.4", + "out": { + "objs": [ + NTlv(IpFwTlvType.IPFW_TLV_EACTION, idx=1, name="ntpv6"), + NTlv(0, idx=2, name="AAA"), + ], + "insns": [ + InsnIp(IpFwOpcode.O_IP_DST, ip="1.2.3.4"), + Insn(IpFwOpcode.O_EXTERNAL_ACTION, arg1=1), + Insn(IpFwOpcode.O_EXTERNAL_INSTANCE, arg1=2), + ], + }, + }, + id="test_eaction_ntp", + ), ], ) def test_add_rule(self, rule): """Tests if the compiled rule is sane and matches the spec""" self.verify_rule(rule["in"], rule["out"]) @pytest.mark.parametrize( "action", [ pytest.param(("allow", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="test_allow"), pytest.param( ( "abort", Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_ABORT), ), id="abort", ), pytest.param( ( "abort6", Insn( IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_ABORT ), ), id="abort6", ), pytest.param(("accept", InsnEmpty(IpFwOpcode.O_ACCEPT)), id="accept"), pytest.param(("deny", InsnEmpty(IpFwOpcode.O_DENY)), id="deny"), pytest.param( ( "reject", Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_HOST), ), id="reject", ), pytest.param( ( "reset", Insn(IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_REJECT_RST), ), id="reset", ), pytest.param( ( "reset6", Insn(IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_UNREACH_RST), ), id="reset6", ), pytest.param( ( "unreach port", InsnReject( IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT ), ), id="unreach_port", ), pytest.param( ( "unreach port", InsnReject( IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_PORT ), ), id="unreach_port", ), pytest.param( ( "unreach needfrag", InsnReject( IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG ), ), id="unreach_needfrag", ), pytest.param( ( "unreach needfrag 1420", InsnReject( IpFwOpcode.O_REJECT, arg1=IcmpRejectCode.ICMP_UNREACH_NEEDFRAG, mtu=1420, ), ), id="unreach_needfrag_mtu", ), pytest.param( ( "unreach6 port", Insn( IpFwOpcode.O_UNREACH6, arg1=Icmp6RejectCode.ICMP6_DST_UNREACH_NOPORT, ), ), id="unreach6_port", ), pytest.param(("count", InsnEmpty(IpFwOpcode.O_COUNT)), id="count"), # TOK_NAT pytest.param( ("queue 42", Insn(IpFwOpcode.O_QUEUE, arg1=42)), id="queue_42" ), pytest.param(("pipe 42", Insn(IpFwOpcode.O_PIPE, arg1=42)), id="pipe_42"), pytest.param( ("skipto 42", Insn(IpFwOpcode.O_SKIPTO, arg1=42)), id="skipto_42" ), pytest.param( ("netgraph 42", Insn(IpFwOpcode.O_NETGRAPH, arg1=42)), id="netgraph_42" ), pytest.param( ("ngtee 42", Insn(IpFwOpcode.O_NGTEE, arg1=42)), id="ngtee_42" ), pytest.param( ("divert 42", Insn(IpFwOpcode.O_DIVERT, arg1=42)), id="divert_42" ), pytest.param( ("divert natd", Insn(IpFwOpcode.O_DIVERT, arg1=8668)), id="divert_natd" ), pytest.param(("tee 42", Insn(IpFwOpcode.O_TEE, arg1=42)), id="tee_42"), pytest.param( ("call 420", Insn(IpFwOpcode.O_CALLRETURN, arg1=420)), id="call_420" ), # TOK_FORWARD # TOK_COMMENT pytest.param( ("setfib 1", Insn(IpFwOpcode.O_SETFIB, arg1=1 | 0x8000)), id="setfib_1", marks=pytest.mark.skip("needs net.fibs>1"), ), pytest.param( ("setdscp 42", Insn(IpFwOpcode.O_SETDSCP, arg1=42 | 0x8000)), id="setdscp_42", ), pytest.param(("reass", InsnEmpty(IpFwOpcode.O_REASS)), id="reass"), pytest.param( ("return", InsnEmpty(IpFwOpcode.O_CALLRETURN, is_not=True)), id="return" ), ], ) def test_add_action(self, action): """Tests if the rule action is compiled properly""" rule_in = "add {} ip from any to any".format(action[0]) rule_out = {"insns": [action[1]]} self.verify_rule(rule_in, rule_out) @pytest.mark.parametrize( "insn", [ pytest.param( { "in": "add prob 0.7 allow ip from any to any", "out": InsnProb(prob=0.7), }, id="test_prob", ), pytest.param( { "in": "add allow tcp from any to any", "out": InsnProto(arg1=6), }, id="test_proto", ), pytest.param( { "in": "add allow ip from any to any 57", "out": InsnPorts(IpFwOpcode.O_IP_DSTPORT, port_pairs=[57, 57]), }, id="test_ports", ), ], ) def test_add_single_instruction(self, insn): """Tests if the compiled rule is sane and matches the spec""" # Prepare the desired output out = { "insns": [insn["out"], InsnEmpty(IpFwOpcode.O_ACCEPT)], } self.verify_rule(insn["in"], out) @pytest.mark.parametrize( "opcode", [ pytest.param(IpFwOpcode.O_IP_SRCPORT, id="src"), pytest.param(IpFwOpcode.O_IP_DSTPORT, id="dst"), ], ) @pytest.mark.parametrize( "params", [ pytest.param( { "in": "57", "out": [(57, 57)], }, id="test_single", ), pytest.param( { "in": "57-59", "out": [(57, 59)], }, id="test_range", ), pytest.param( { "in": "57-59,41", "out": [(57, 59), (41, 41)], }, id="test_ranges", ), ], ) def test_add_ports(self, params, opcode): if opcode == IpFwOpcode.O_IP_DSTPORT: txt = "add allow ip from any to any " + params["in"] else: txt = "add allow ip from any " + params["in"] + " to any" out = { "insns": [ InsnPorts(opcode, port_pairs=params["out"]), InsnEmpty(IpFwOpcode.O_ACCEPT), ] } self.verify_rule(txt, out) diff --git a/tests/atf_python/sys/netpfil/ipfw/ioctl.py b/tests/atf_python/sys/netpfil/ipfw/ioctl.py index 45ba96a7fb70..4c6d3f234c6c 100644 --- a/tests/atf_python/sys/netpfil/ipfw/ioctl.py +++ b/tests/atf_python/sys/netpfil/ipfw/ioctl.py @@ -1,505 +1,511 @@ #!/usr/bin/env python3 import os import socket import struct import subprocess import sys from ctypes import c_byte from ctypes import c_char from ctypes import c_int from ctypes import c_long from ctypes import c_uint32 from ctypes import c_uint8 from ctypes import c_ulong from ctypes import c_ushort from ctypes import sizeof from ctypes import Structure from enum import Enum from typing import Any from typing import Dict from typing import List from typing import NamedTuple from typing import Optional from typing import Union import pytest from atf_python.sys.netpfil.ipfw.insns import BaseInsn from atf_python.sys.netpfil.ipfw.insns import insn_attrs from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTableLookupType from atf_python.sys.netpfil.ipfw.ioctl_headers import IpFwTlvType from atf_python.sys.netpfil.ipfw.ioctl_headers import Op3CmdType from atf_python.sys.netpfil.ipfw.utils import align8 from atf_python.sys.netpfil.ipfw.utils import AttrDescr from atf_python.sys.netpfil.ipfw.utils import enum_from_int from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map class IpFw3OpHeader(Structure): _fields_ = [ ("opcode", c_ushort), ("version", c_ushort), ("reserved1", c_ushort), ("reserved2", c_ushort), ] class IpFwObjTlv(Structure): _fields_ = [ ("n_type", c_ushort), ("flags", c_ushort), ("length", c_uint32), ] class BaseTlv(object): obj_enum_class = IpFwTlvType def __init__(self, obj_type): if isinstance(obj_type, Enum): self.obj_type = obj_type.value self._enum = obj_type else: self.obj_type = obj_type self._enum = enum_from_int(self.obj_enum_class, obj_type) self.obj_list = [] def add_obj(self, obj): self.obj_list.append(obj) @property def len(self): return len(bytes(self)) @property def obj_name(self): if self._enum is not None: return self._enum.name else: return "tlv#{}".format(self.obj_type) def print_hdr(self, prepend=""): print( "{}len={} type={}({}){}".format( prepend, self.len, self.obj_name, self.obj_type, self._print_obj_value() ) ) def print_obj(self, prepend=""): self.print_hdr(prepend) prepend = " " + prepend for obj in self.obj_list: obj.print_obj(prepend) + def print_obj_hex(self, prepend=""): + print(prepend) + print() + print(" ".join(["x{:02X}".format(b) for b in bytes(self)])) + @classmethod def _validate(cls, data): if len(data) < sizeof(IpFwObjTlv): raise ValueError("TLV too short") hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)]) if len(data) != hdr.length: raise ValueError("wrong TLV size") @classmethod def _parse(cls, data, attr_map): hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)]) return cls(hdr.n_type) @classmethod def from_bytes(cls, data, attr_map=None): cls._validate(data) obj = cls._parse(data, attr_map) return obj def __bytes__(self): raise NotImplementedError() def _print_obj_value(self): return " " + " ".join( ["x{:02X}".format(b) for b in self._data[sizeof(IpFwObjTlv) :]] ) def as_hexdump(self): return " ".join(["x{:02X}".format(b) for b in bytes(self)]) class UnknownTlv(BaseTlv): def __init__(self, obj_type, data): super().__init__(obj_type) self._data = data @classmethod def _validate(cls, data): if len(data) < sizeof(IpFwObjNTlv): raise ValueError("TLV size is too short") hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)]) if len(data) != hdr.length: raise ValueError("wrong TLV size") @classmethod def _parse(cls, data, attr_map): hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)]) self = cls(hdr.n_type, data) return self def __bytes__(self): return self._data class Tlv(BaseTlv): @staticmethod def parse_tlvs(data, attr_map): # print("PARSING " + " ".join(["x{:02X}".format(b) for b in data])) off = 0 ret = [] while off + sizeof(IpFwObjTlv) <= len(data): hdr = IpFwObjTlv.from_buffer_copy(data[off : off + sizeof(IpFwObjTlv)]) if off + hdr.length > len(data): raise ValueError("TLV size do not match") obj_data = data[off : off + hdr.length] obj_descr = attr_map.get(hdr.n_type, None) if obj_descr is None: # raise ValueError("unknown child TLV {}".format(hdr.n_type)) cls = UnknownTlv child_map = {} else: cls = obj_descr["ad"].cls child_map = obj_descr.get("child", {}) # print("FOUND OBJECT type {}".format(cls)) # print() obj = cls.from_bytes(obj_data, child_map) ret.append(obj) off += hdr.length return ret class IpFwObjNTlv(Structure): _fields_ = [ ("head", IpFwObjTlv), ("idx", c_ushort), ("n_set", c_uint8), ("n_type", c_uint8), ("spare", c_uint32), ("name", c_char * 64), ] class NTlv(Tlv): def __init__(self, obj_type, idx=0, n_set=0, n_type=0, name=None): super().__init__(obj_type) self.n_idx = idx self.n_set = n_set self.n_type = n_type self.n_name = name @classmethod def _validate(cls, data): if len(data) != sizeof(IpFwObjNTlv): raise ValueError("TLV size is not correct") hdr = IpFwObjTlv.from_buffer_copy(data[: sizeof(IpFwObjTlv)]) if len(data) != hdr.length: raise ValueError("wrong TLV size") @classmethod def _parse(cls, data, attr_map): hdr = IpFwObjNTlv.from_buffer_copy(data[: sizeof(IpFwObjNTlv)]) name = hdr.name.decode("utf-8") self = cls(hdr.head.n_type, hdr.idx, hdr.n_set, hdr.n_type, name) return self def __bytes__(self): name_bytes = self.n_name.encode("utf-8") if len(name_bytes) < 64: name_bytes += b"\0" * (64 - len(name_bytes)) hdr = IpFwObjNTlv( head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)), idx=self.n_idx, n_set=self.n_set, n_type=self.n_type, name=name_bytes[:64], ) return bytes(hdr) def _print_obj_value(self): return " " + "type={} set={} idx={} name={}".format( self.n_type, self.n_set, self.n_idx, self.n_name ) class IpFwObjCTlv(Structure): _fields_ = [ ("head", IpFwObjTlv), ("count", c_uint32), ("objsize", c_ushort), ("version", c_uint8), ("flags", c_uint8), ] class CTlv(Tlv): def __init__(self, obj_type, obj_list=[]): super().__init__(obj_type) if obj_list: self.obj_list.extend(obj_list) @classmethod def _validate(cls, data): if len(data) < sizeof(IpFwObjCTlv): raise ValueError("TLV too short") hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)]) if len(data) != hdr.head.length: raise ValueError("wrong TLV size") @classmethod def _parse(cls, data, attr_map): hdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)]) tlv_list = cls.parse_tlvs(data[sizeof(IpFwObjCTlv) :], attr_map) if len(tlv_list) != hdr.count: raise ValueError("wrong number of objects") self = cls(hdr.head.n_type, obj_list=tlv_list) return self def __bytes__(self): ret = b"" for obj in self.obj_list: ret += bytes(obj) length = len(ret) + sizeof(IpFwObjCTlv) if self.obj_list: objsize = len(bytes(self.obj_list[0])) else: objsize = 0 hdr = IpFwObjCTlv( head=IpFwObjTlv(n_type=self.obj_type, length=sizeof(IpFwObjNTlv)), count=len(self.obj_list), objsize=objsize, ) return bytes(hdr) + ret def _print_obj_value(self): return "" class IpFwRule(Structure): _fields_ = [ ("act_ofs", c_ushort), ("cmd_len", c_ushort), ("spare", c_ushort), ("n_set", c_uint8), ("flags", c_uint8), ("rulenum", c_uint32), ("n_id", c_uint32), ] class RawRule(Tlv): def __init__(self, obj_type=0, n_set=0, rulenum=0, obj_list=[]): super().__init__(obj_type) self.n_set = n_set self.rulenum = rulenum if obj_list: self.obj_list.extend(obj_list) @classmethod def _validate(cls, data): min_size = sizeof(IpFwRule) if len(data) < min_size: raise ValueError("rule TLV too short") rule = IpFwRule.from_buffer_copy(data[:min_size]) if len(data) != min_size + rule.cmd_len * 4: raise ValueError("rule TLV cmd_len incorrect") @classmethod def _parse(cls, data, attr_map): hdr = IpFwRule.from_buffer_copy(data[: sizeof(IpFwRule)]) self = cls( n_set=hdr.n_set, rulenum=hdr.rulenum, obj_list=BaseInsn.parse_insns(data[sizeof(IpFwRule) :], insn_attrs), ) return self def __bytes__(self): act_ofs = 0 cmd_len = 0 ret = b"" for obj in self.obj_list: if obj.is_action and act_ofs == 0: act_ofs = cmd_len obj_bytes = bytes(obj) cmd_len += len(obj_bytes) // 4 ret += obj_bytes hdr = IpFwRule( act_ofs=act_ofs, cmd_len=cmd_len, n_set=self.n_set, rulenum=self.rulenum, ) return bytes(hdr) + ret @property def obj_name(self): return "rule#{}".format(self.rulenum) def _print_obj_value(self): cmd_len = sum([len(bytes(obj)) for obj in self.obj_list]) // 4 return " set={} cmd_len={}".format(self.n_set, cmd_len) class CTlvRule(CTlv): def __init__(self, obj_type=IpFwTlvType.IPFW_TLV_RULE_LIST, obj_list=[]): super().__init__(obj_type, obj_list) @classmethod def _parse(cls, data, attr_map): chdr = IpFwObjCTlv.from_buffer_copy(data[: sizeof(IpFwObjCTlv)]) rule_list = [] off = sizeof(IpFwObjCTlv) while off + sizeof(IpFwRule) <= len(data): hdr = IpFwRule.from_buffer_copy(data[off : off + sizeof(IpFwRule)]) rule_len = sizeof(IpFwRule) + hdr.cmd_len * 4 # print("FOUND RULE len={} cmd_len={}".format(rule_len, hdr.cmd_len)) if off + rule_len > len(data): raise ValueError("wrong rule size") rule = RawRule.from_bytes(data[off : off + rule_len]) rule_list.append(rule) off += align8(rule_len) if off != len(data): raise ValueError("rule bytes left: off={} len={}".format(off, len(data))) return cls(chdr.head.n_type, obj_list=rule_list) # XXX: _validate def __bytes__(self): ret = b"" for rule in self.obj_list: rule_bytes = bytes(rule) remainder = len(rule_bytes) % 8 if remainder > 0: rule_bytes += b"\0" * (8 - remainder) ret += rule_bytes hdr = IpFwObjCTlv( head=IpFwObjTlv( n_type=self.obj_type, length=len(ret) + sizeof(IpFwObjCTlv) ), count=len(self.obj_list), ) return bytes(hdr) + ret class BaseIpFwMessage(object): messages = [] def __init__(self, msg_type, obj_list=[]): if isinstance(msg_type, Enum): self.obj_type = msg_type.value self._enum = msg_type else: self.obj_type = msg_type self._enum = enum_from_int(self.messages, self.obj_type) self.obj_list = [] if obj_list: self.obj_list.extend(obj_list) def add_obj(self, obj): self.obj_list.append(obj) def get_obj(self, obj_type): obj_type_raw = enum_or_int(obj_type) for obj in self.obj_list: if obj.obj_type == obj_type_raw: return obj return None @staticmethod def parse_header(data: bytes): if len(data) < sizeof(IpFw3OpHeader): raise ValueError("length less than op3 message header") return IpFw3OpHeader.from_buffer_copy(data), sizeof(IpFw3OpHeader) def parse_obj_list(self, data: bytes): off = 0 while off < len(data): # print("PARSE off={} rem={}".format(off, len(data) - off)) hdr = IpFwObjTlv.from_buffer_copy(data[off : off + sizeof(IpFwObjTlv)]) # print(" tlv len {}".format(hdr.length)) if hdr.length + off > len(data): raise ValueError("TLV too big") tlv = Tlv(hdr.n_type, data[off : off + hdr.length]) self.add_obj(tlv) off += hdr.length def is_type(self, msg_type): return enum_or_int(msg_type) == self.msg_type @property def obj_name(self): if self._enum is not None: return self._enum.name else: return "msg#{}".format(self.obj_type) def print_hdr(self, prepend=""): print("{}len={}, type={}".format(prepend, len(bytes(self)), self.obj_name)) @classmethod def from_bytes(cls, data): try: hdr, hdrlen = cls.parse_header(data) self = cls(hdr.opcode) self._orig_data = data except ValueError as e: print("Failed to parse op3 header: {}".format(e)) cls.print_as_bytes(data) raise tlv_list = Tlv.parse_tlvs(data[hdrlen:], self.attr_map) self.obj_list.extend(tlv_list) return self def __bytes__(self): ret = bytes(IpFw3OpHeader(opcode=self.obj_type)) for obj in self.obj_list: ret += bytes(obj) return ret def print_obj(self): self.print_hdr() for obj in self.obj_list: obj.print_obj(" ") @staticmethod def print_as_bytes(data: bytes, descr: str): print("===vv {} (len:{:3d}) vv===".format(descr, len(data))) off = 0 step = 16 while off < len(data): for i in range(step): if off + i < len(data): print(" {:02X}".format(data[off + i]), end="") print("") off += step print("--------------------") rule_attrs = prepare_attrs_map( [ AttrDescr( IpFwTlvType.IPFW_TLV_TBLNAME_LIST, CTlv, [ AttrDescr(IpFwTlvType.IPFW_TLV_TBL_NAME, NTlv), AttrDescr(IpFwTlvType.IPFW_TLV_STATE_NAME, NTlv), + AttrDescr(IpFwTlvType.IPFW_TLV_EACTION, NTlv), ], True, ), AttrDescr(IpFwTlvType.IPFW_TLV_RULE_LIST, CTlvRule), ] ) class IpFwXRule(BaseIpFwMessage): messages = [Op3CmdType.IP_FW_XADD] attr_map = rule_attrs legacy_classes = [] set3_classes = [] get3_classes = [IpFwXRule]