Changeset View
Changeset View
Standalone View
Standalone View
tests/sys/net/routing/netlink.py
- This file was added.
#!/usr/local/bin/python3 | |||||
from ctypes import * | |||||
import socket | |||||
import os | |||||
import sys | |||||
import unittest | |||||
import struct | |||||
from enum import Enum, auto | |||||
from typing import List, Callable, Dict, NamedTuple, Optional | |||||
def roundup2(val: int, num: int) -> int: | |||||
if val % num: | |||||
return (val | (num - 1)) + 1 | |||||
else: | |||||
return val | |||||
def align4(val: int) -> int: | |||||
return roundup2(val, 4) | |||||
class SockaddrNl(Structure): | |||||
_fields_ = [ | |||||
("nl_len", c_ubyte), | |||||
("nl_family", c_ubyte), | |||||
("nl_pad", c_ushort), | |||||
("nl_pid", c_uint), | |||||
("nl_groups", c_uint), | |||||
] | |||||
class Nlmsghdr(Structure): | |||||
_fields_ = [ | |||||
("nlmsg_len", c_uint), | |||||
("nlmsg_type", c_ushort), | |||||
("nlmsg_flags", c_ushort), | |||||
("nlmsg_seq", c_uint), | |||||
("nlmsg_pid", c_uint), | |||||
] | |||||
class Nlmsgerr(Structure): | |||||
_fields_ = [ | |||||
("error", c_int), | |||||
("msg", Nlmsghdr), | |||||
] | |||||
class RtattrType(Enum): | |||||
RTA_UNSPEC = 0 | |||||
RTA_DST = auto() | |||||
RTA_SRC = auto() | |||||
RTA_IIF = auto() | |||||
RTA_OIF = auto() | |||||
RTA_GATEWAY = auto() | |||||
RTA_PRIORITY = auto() | |||||
RTA_PREFSRC = auto() | |||||
RTA_METRICS = auto() | |||||
RTA_MULTIPATH = auto() | |||||
RTA_PROTOINFO = auto() | |||||
RTA_FLOW = auto() | |||||
RTA_CACHEINFO = auto() | |||||
RTA_SESSION = auto() | |||||
RTA_MP_ALGO = auto() | |||||
RTA_TABLE = auto() | |||||
RTA_MARK = auto() | |||||
RTA_MFC_STATS = auto() | |||||
RTA_VIA = auto() | |||||
RTA_NEWDST = auto() | |||||
RTA_PREF = auto() | |||||
RTA_ENCAP_TYPE = auto() | |||||
RTA_ENCAP = auto() | |||||
RTA_EXPIRES = auto() | |||||
RTA_PAD = auto() | |||||
RTA_UID = auto() | |||||
RTA_TTL_PROPAGATE = auto() | |||||
RTA_IP_PROTO = auto() | |||||
RTA_SPORT = auto() | |||||
RTA_DPORT = auto() | |||||
RTA_NH_ID = auto() | |||||
class NlMsgType(Enum): | |||||
NLMSG_NOOP = 1 | |||||
NLMSG_ERROR = 2 | |||||
NLMSG_DONE = 3 | |||||
NLMSG_OVERRUN = 4 | |||||
class NlRtMsgType(Enum): | |||||
RTM_NEWLINK = 16 | |||||
RTM_DELLINK = 17 | |||||
RTM_GETLINK = 18 | |||||
RTM_SETLINK = 19 | |||||
RTM_NEWADDR = 20 | |||||
RTM_DELADDR = 21 | |||||
RTM_GETADDR = 22 | |||||
RTM_NEWROUTE = 24 | |||||
RTM_DELROUTE = 25 | |||||
RTM_GETROUTE = 26 | |||||
RTM_NEWNEIGH = 28 | |||||
RTM_DELNEIGH = 27 | |||||
RTM_GETNEIGH = 28 | |||||
RTM_NEWRULE = 32 | |||||
RTM_DELRULE = 33 | |||||
RTM_GETRULE = 34 | |||||
RTM_NEWQDISC = 36 | |||||
RTM_DELQDISC = 37 | |||||
RTM_GETQDISC = 38 | |||||
RTM_NEWTCLASS = 40 | |||||
RTM_DELTCLASS = 41 | |||||
RTM_GETTCLASS = 42 | |||||
RTM_NEWTFILTER = 44 | |||||
RTM_DELTFILTER = 45 | |||||
RTM_GETTFILTER = 46 | |||||
RTM_NEWACTION = 48 | |||||
RTM_DELACTION = 49 | |||||
RTM_GETACTION = 50 | |||||
RTM_NEWPREFIX = 52 | |||||
RTM_GETMULTICAST = 58 | |||||
RTM_GETANYCAST = 62 | |||||
RTM_NEWNEIGHTBL = 64 | |||||
RTM_GETNEIGHTBL = 66 | |||||
RTM_SETNEIGHTBL = 67 | |||||
RTM_NEWNDUSEROPT = 68 | |||||
RTM_NEWADDRLABEL = 72 | |||||
RTM_DELADDRLABEL = 73 | |||||
RTM_GETADDRLABEL = 74 | |||||
RTM_GETDCB = 78 | |||||
RTM_SETDCB = 79 | |||||
RTM_NEWNETCONF = 80 | |||||
RTM_GETNETCONF = 82 | |||||
RTM_NEWMDB = 84 | |||||
RTM_DELMDB = 85 | |||||
RTM_GETMDB = 86 | |||||
RTM_NEWNSID = 88 | |||||
RTM_DELNSID = 89 | |||||
RTM_GETNSID = 90 | |||||
RTM_NEWSTATS = 92 | |||||
RTM_GETSTATS = 94 | |||||
class RtAttr(Structure): | |||||
_fields_ = [ | |||||
("rta_len", c_ushort), | |||||
("rta_type", c_ushort), | |||||
] | |||||
class RtMsgHdr(Structure): | |||||
_fields_ = [ | |||||
("rtm_family", c_ubyte), | |||||
("rtm_dst_len", c_ubyte), | |||||
("rtm_src_len", c_ubyte), | |||||
("rtm_tos", c_ubyte), | |||||
("rtm_table", c_ubyte), | |||||
("rtm_protocol", c_ubyte), | |||||
("rtm_scope", c_ubyte), | |||||
("rtm_type", c_ubyte), | |||||
("rtm_flags", c_uint), | |||||
] | |||||
class RtMsgFlags(Enum): | |||||
RTM_F_NOTIFY = 0x100 | |||||
RTM_F_CLONED = 0x200 | |||||
RTM_F_EQUALIZE = 0x400 | |||||
RTM_F_PREFIX = 0x800 | |||||
RTM_F_LOOKUP_TABLE = 0x1000 | |||||
RTM_F_FIB_MATCH = 0x2000 | |||||
RTM_F_OFFLOAD = 0x4000 | |||||
RTM_F_TRAP = 0x8000 | |||||
RTM_F_OFFLOAD_FAILED = 0x20000000 | |||||
class AddressFamilyLinux(Enum): | |||||
AF_INET = socket.AF_INET | |||||
AF_INET6 = socket.AF_INET6 | |||||
AF_NETLINK = 16 | |||||
class AddressFamilyBsd(Enum): | |||||
AF_INET = socket.AF_INET | |||||
AF_INET6 = socket.AF_INET6 | |||||
AF_NETLINK = 38 | |||||
class NlmBaseFlags(Enum): | |||||
NLM_F_REQUEST = 0x01 | |||||
NLM_F_MULTI = 0x02 | |||||
NLM_F_ACK = 0x04 | |||||
NLM_F_ECHO = 0x08 | |||||
NLM_F_DUMP_INTR = 0x10 | |||||
NLM_F_DUMP_FILTERED = 0x20 | |||||
# XXX: in python3.8 it is possible to | |||||
# class NlmGetFlags(Enum, NlmBaseFlags): | |||||
class NlmGetFlags(Enum): | |||||
NLM_F_ROOT = 0x100 | |||||
NLM_F_MATCH = 0x200 | |||||
NLM_F_ATOMIC = 0x400 | |||||
class NlmNewFlags(Enum): | |||||
NLM_F_REPLACE = 0x100 | |||||
NLM_F_EXCL = 0x200 | |||||
NLM_F_CREATE = 0x400 | |||||
NLM_F_APPEND = 0x800 | |||||
class NlmDeleteFlags(Enum): | |||||
NLM_F_NONREC = 0x100 | |||||
class NlmAckFlags(Enum): | |||||
NLM_F_CAPPED = 0x100 | |||||
NLM_F_ACK_TLVS = 0x200 | |||||
class RtScope(Enum): | |||||
RT_SCOPE_UNIVERSE = 0 | |||||
RT_SCOPE_SITE = 200 | |||||
RT_SCOPE_LINK = 253 | |||||
RT_SCOPE_HOST = 254 | |||||
RT_SCOPE_NOWHERE = 255 | |||||
class RtType(Enum): | |||||
RTN_UNSPEC = 0 | |||||
RTN_UNICAST = auto() | |||||
RTN_LOCAL = auto() | |||||
RTN_BROADCAST = auto() | |||||
RTN_ANYCAST = auto() | |||||
RTN_MULTICAST = auto() | |||||
RTN_BLACKHOLE = auto() | |||||
RTN_UNREACHABLE = auto() | |||||
RTN_PROHIBIT = auto() | |||||
RTN_THROW = auto() | |||||
RTN_NAT = auto() | |||||
RTN_XRESOLVE = auto() | |||||
class RtProto(Enum): | |||||
RTPROT_UNSPEC = 0 | |||||
RTPROT_REDIRECT = 1 | |||||
RTPROT_KERNEL = 2 | |||||
RTPROT_BOOT = 3 | |||||
RTPROT_STATIC = 4 | |||||
RTPROT_GATED = 8 | |||||
RTPROT_RA = 9 | |||||
RTPROT_MRT = 10 | |||||
RTPROT_ZEBRA = 11 | |||||
RTPROT_BIRD = 12 | |||||
RTPROT_DNROUTED = 13 | |||||
RTPROT_XORP = 14 | |||||
RTPROT_NTK = 15 | |||||
RTPROT_DHCP = 16 | |||||
RTPROT_MROUTED = 17 | |||||
RTPROT_KEEPALIVED = 18 | |||||
RTPROT_BABEL = 42 | |||||
RTPROT_OPENR = 99 | |||||
RTPROT_BGP = 186 | |||||
RTPROT_ISIS = 187 | |||||
RTPROT_OSPF = 188 | |||||
RTPROT_RIP = 189 | |||||
RTPROT_EIGRP = 192 | |||||
class NlRtaxType(Enum): | |||||
RTAX_UNSPEC = 0 | |||||
RTAX_LOCK = auto() | |||||
RTAX_MTU = auto() | |||||
RTAX_WINDOW = auto() | |||||
RTAX_RTT = auto() | |||||
RTAX_RTTVAR = auto() | |||||
RTAX_SSTHRESH = auto() | |||||
RTAX_CWND = auto() | |||||
RTAX_ADVMSS = auto() | |||||
RTAX_REORDERING = auto() | |||||
RTAX_HOPLIMIT = auto() | |||||
RTAX_INITCWND = auto() | |||||
RTAX_FEATURES = auto() | |||||
RTAX_RTO_MIN = auto() | |||||
RTAX_INITRWND = auto() | |||||
RTAX_QUICKACK = auto() | |||||
RTAX_CC_ALGO = auto() | |||||
RTAX_FASTOPEN_NO_COOKIE = auto() | |||||
class NlRtGroup(Enum): | |||||
RTNLGRP_NONE = 0 | |||||
RTNLGRP_LINK = auto() | |||||
RTNLGRP_NOTIFY = auto() | |||||
RTNLGRP_NEIGH = auto() | |||||
RTNLGRP_TC = auto() | |||||
RTNLGRP_IPV4_IFADDR = auto() | |||||
RTNLGRP_IPV4_MROUTE = auto() | |||||
RTNLGRP_IPV4_ROUTE = auto() | |||||
RTNLGRP_IPV4_RULE = auto() | |||||
RTNLGRP_IPV6_IFADDR = auto() | |||||
RTNLGRP_IPV6_MROUTE = auto() | |||||
RTNLGRP_IPV6_ROUTE = auto() | |||||
RTNLGRP_IPV6_IFINFO = auto() | |||||
RTNLGRP_DECnet_IFADDR = auto() | |||||
RTNLGRP_NOP2 = auto() | |||||
RTNLGRP_DECnet_ROUTE = auto() | |||||
RTNLGRP_DECnet_RULE = auto() | |||||
RTNLGRP_NOP4 = auto() | |||||
RTNLGRP_IPV6_PREFIX = auto() | |||||
RTNLGRP_IPV6_RULE = auto() | |||||
RTNLGRP_ND_USEROPT = auto() | |||||
RTNLGRP_PHONET_IFADDR = auto() | |||||
RTNLGRP_PHONET_ROUTE = auto() | |||||
RTNLGRP_DCB = auto() | |||||
RTNLGRP_IPV4_NETCONF = auto() | |||||
RTNLGRP_IPV6_NETCONF = auto() | |||||
RTNLGRP_MDB = auto() | |||||
RTNLGRP_MPLS_ROUTE = auto() | |||||
RTNLGRP_NSID = auto() | |||||
RTNLGRP_MPLS_NETCONF = auto() | |||||
RTNLGRP_IPV4_MROUTE_R = auto() | |||||
RTNLGRP_IPV6_MROUTE_R = auto() | |||||
RTNLGRP_NEXTHOP = auto() | |||||
RTNLGRP_BRVLAN = auto() | |||||
class IfinfoMsg(Structure): | |||||
_fields_ = [ | |||||
("ifi_family", c_ubyte), | |||||
("__ifi_pad", c_ubyte), | |||||
("ifi_type", c_ushort), | |||||
("ifi_index", c_int), | |||||
("ifi_flags", c_uint), | |||||
("ifi_change", c_uint), | |||||
] | |||||
class IflattrType(Enum): | |||||
IFLA_UNSPEC = 0 | |||||
IFLA_ADDRESS = auto() | |||||
IFLA_BROADCAST = auto() | |||||
IFLA_IFNAME = auto() | |||||
IFLA_MTU = auto() | |||||
IFLA_LINK = auto() | |||||
IFLA_QDISC = auto() | |||||
IFLA_STATS = auto() | |||||
IFLA_COST = auto() | |||||
IFLA_PRIORITY = auto() | |||||
IFLA_MASTER = auto() | |||||
IFLA_WIRELESS = auto() | |||||
IFLA_PROTINFO = auto() | |||||
IFLA_TXQLEN = auto() | |||||
IFLA_MAP = auto() | |||||
IFLA_WEIGHT = auto() | |||||
IFLA_OPERSTATE = auto() | |||||
IFLA_LINKMODE = auto() | |||||
IFLA_LINKINFO = auto() | |||||
IFLA_NET_NS_PID = auto() | |||||
IFLA_IFALIAS = auto() | |||||
IFLA_NUM_VF = auto() | |||||
IFLA_VFINFO_LIST = auto() | |||||
IFLA_STATS64 = auto() | |||||
IFLA_VF_PORTS = auto() | |||||
IFLA_PORT_SELF = auto() | |||||
IFLA_AF_SPEC = auto() | |||||
IFLA_GROUP = auto() | |||||
IFLA_NET_NS_FD = auto() | |||||
IFLA_EXT_MASK = auto() | |||||
IFLA_PROMISCUITY = auto() | |||||
IFLA_NUM_TX_QUEUES = auto() | |||||
IFLA_NUM_RX_QUEUES = auto() | |||||
IFLA_CARRIER = auto() | |||||
IFLA_PHYS_PORT_ID = auto() | |||||
IFLA_CARRIER_CHANGES = auto() | |||||
IFLA_PHYS_SWITCH_ID = auto() | |||||
IFLA_LINK_NETNSID = auto() | |||||
IFLA_PHYS_PORT_NAME = auto() | |||||
IFLA_PROTO_DOWN = auto() | |||||
IFLA_GSO_MAX_SEGS = auto() | |||||
IFLA_GSO_MAX_SIZE = auto() | |||||
IFLA_PAD = auto() | |||||
IFLA_XDP = auto() | |||||
IFLA_EVENT = auto() | |||||
IFLA_NEW_NETNSID = auto() | |||||
IFLA_IF_NETNSID = auto() | |||||
IFLA_CARRIER_UP_COUNT = auto() | |||||
IFLA_CARRIER_DOWN_COUNT = auto() | |||||
IFLA_NEW_IFINDEX = auto() | |||||
IFLA_MIN_MTU = auto() | |||||
IFLA_MAX_MTU = auto() | |||||
IFLA_PROP_LIST = auto() | |||||
IFLA_ALT_IFNAME = auto() | |||||
IFLA_PERM_ADDRESS = auto() | |||||
IFLA_PROTO_DOWN_REASON = auto() | |||||
class IfaddrMsg(Structure): | |||||
_fields_ = [ | |||||
("ifa_family", c_ubyte), | |||||
("ifa_prefixlen", c_ubyte), | |||||
("ifa_flags", c_ubyte), | |||||
("ifa_scope", c_ubyte), | |||||
("ifa_index", c_uint), | |||||
] | |||||
class IfattrType(Enum): | |||||
IFA_UNSPEC = 0 | |||||
IFA_ADDRESS = auto() | |||||
IFA_LOCAL = auto() | |||||
IFA_LABEL = auto() | |||||
IFA_BROADCAST = auto() | |||||
IFA_ANYCAST = auto() | |||||
IFA_CACHEINFO = auto() | |||||
IFA_MULTICAST = auto() | |||||
IFA_FLAGS = auto() | |||||
IFA_RT_PRIORITY = auto() | |||||
IFA_TARGET_NETNSID = auto() | |||||
class NlConst(): | |||||
AF_NETLINK = 38 | |||||
NETLINK_ROUTE = 0 | |||||
class NlHelper(): | |||||
def __init__(self): | |||||
self._pmap = {} | |||||
self._af_cls = self.get_af_cls() | |||||
def get_af_cls(self): | |||||
if sys.platform.startswith("freebsd"): | |||||
cls = AddressFamilyBsd | |||||
else: | |||||
cls = AddressFamilyLinux | |||||
return cls | |||||
def get_propmap(self, cls): | |||||
if cls not in self._pmap: | |||||
ret = {} | |||||
for prop in dir(cls): | |||||
if not prop.startswith("_"): | |||||
ret[getattr(cls, prop).value] = prop | |||||
self._pmap[cls] = ret | |||||
return self._pmap[cls] | |||||
def get_name_propmap(self, cls): | |||||
ret = {} | |||||
for prop in dir(cls): | |||||
if not prop.startswith("_"): | |||||
ret[prop] = getattr(cls, prop).value | |||||
return ret | |||||
def get_attr_byval(self, cls, attr_val): | |||||
propmap = self.get_propmap(cls) | |||||
return propmap.get(attr_val) | |||||
def get_nlmsg_name(self, val): | |||||
for cls in [NlRtMsgType, NlMsgType]: | |||||
v = self.get_attr_byval(cls, val) | |||||
if v is not None: | |||||
return v | |||||
return "msg#{}".format(val) | |||||
def get_af_name(self, family): | |||||
v = self.get_attr_byval(self._af_cls, family) | |||||
if v is not None: | |||||
return v | |||||
return "af#{}".format(family) | |||||
def get_af_value(self, family_str: str) -> int: | |||||
propmap = self.get_name_propmap(self._af_cls) | |||||
return propmap.get(family_str) | |||||
def get_rta_name(self, val): | |||||
return self.get_attr_byval(RtattrType, val) | |||||
def get_bitmask_map(self, cls, val): | |||||
propmap = self.get_propmap(cls) | |||||
v = 1 | |||||
ret = {} | |||||
while val: | |||||
if v & val: | |||||
if v in propmap: | |||||
ret[v] = propmap[v] | |||||
else: | |||||
ret[v] = hex(v) | |||||
val -= v | |||||
v *= 2 | |||||
return ret | |||||
def get_bitmask_str(self, cls, val): | |||||
bmap = self.get_bitmask_map(cls, val) | |||||
return ",".join([v for k, v in bmap.items()]) | |||||
def get_nlm_flags_str(self, msg_str: str, reply: bool, val): | |||||
if reply: | |||||
return self.get_bitmask_str(NlmAckFlags, val) | |||||
if msg_str.startswith("RTM_GET"): | |||||
return self.get_bitmask_str(NlmGetFlags, val) | |||||
elif msg_str.startswith("RTM_DEL"): | |||||
return self.get_bitmask_str(NlmDeleteFlags, val) | |||||
elif msg_str.startswith("RTM_NEW"): | |||||
return self.get_bitmask_str(NlmNewFlags, val) | |||||
else: | |||||
return self.get_bitmask_str(NlmBaseFlags, val) | |||||
class BaseRtAttr(object): | |||||
def __init__(self, parent, rta_type, rta_len, data=None): | |||||
self.parent = parent | |||||
self.helper = parent.helper | |||||
self.attr_enum = parent.attr_enum | |||||
self.rta_type = rta_type & 0x3f | |||||
self.is_nested = rta_type & (1 << 15) | |||||
self.network_byte_order = rta_type & (1 << 14) | |||||
self.rta_len = rta_len | |||||
self.rta_type_str = self.helper.get_attr_byval(self.attr_enum, self.rta_type) # noqa: E501 | |||||
if data is not None: | |||||
self._validate(data) | |||||
self._parse(data) | |||||
self._orig_data = data | |||||
def print_attribute(self, prepend=""): | |||||
if self.rta_type_str: | |||||
type_str = self.rta_type_str | |||||
else: | |||||
type_str = "rta#{}".format(self.rta_type) | |||||
print("{}rta_len={} rta_type={}({}){}".format(prepend, | |||||
self.rta_len, | |||||
type_str, | |||||
self.rta_type, | |||||
self._print_attr_value()) | |||||
) | |||||
def _print_attr_value(self): | |||||
return " [" + " ".join(["{:02X}".format(b) for b in self._orig_data[4:]]) + "]" # noqa: E501 | |||||
@classmethod | |||||
def from_bytes(cls, parent, data): | |||||
if len(data) < sizeof(RtAttr): | |||||
raise ValueError("length less than rtattr header") | |||||
rta_hdr = RtAttr.from_buffer_copy(data) | |||||
self = cls(parent, rta_hdr.rta_type, rta_hdr.rta_len, data[:rta_hdr.rta_len]) # noqa: E501 | |||||
# XXX: nested | |||||
return self | |||||
def __bytes__(self): | |||||
ret = self._orig_data | |||||
if align4(len(ret)) != len(ret): | |||||
ret += bytes(align4(len(ret)) - len(ret)) | |||||
return ret | |||||
def _validate(self, data): | |||||
pass | |||||
def _parse(self, data): | |||||
pass | |||||
class RtAttrIp(BaseRtAttr): | |||||
def _validate(self, data): | |||||
data_len = len(data) - 4 | |||||
if data_len != 4 and data_len != 16: | |||||
raise ValueError("Error validating attr {}: rta_len is not valid".format( # noqa: E501 | |||||
self.rta_type_str)) | |||||
def _parse(self, data): | |||||
data_len = len(data) - 4 | |||||
if data_len == 4: | |||||
self.family = socket.AF_INET | |||||
self.addr = socket.inet_ntop(self.family, data[4:8]) | |||||
else: | |||||
self.family = socket.AF_INET6 | |||||
self.addr = socket.inet_ntop(self.family, data[4:20]) | |||||
def _print_attr_value(self): | |||||
return " addr={}".format(self.addr) | |||||
class RtAttrU32(BaseRtAttr): | |||||
def _validate(self, data): | |||||
if len(data) != 8: | |||||
raise ValueError("Error validating attr {}: rta_len is not valid".format( # noqa: E501 | |||||
self.rta_type_str)) | |||||
def _parse(self, data): | |||||
self.value = struct.unpack("@I", data[4:8])[0] | |||||
def _print_attr_value(self): | |||||
return " value={}".format(self.value) | |||||
class RtAttrIfindex(RtAttrU32): | |||||
def _print_attr_value(self): | |||||
try: | |||||
ifname = socket.if_indextoname(self.value) | |||||
return " iface={}(#{})".format(ifname, self.value) | |||||
except OSError as e: | |||||
pass | |||||
return " iface=if#{}".format(self.value) | |||||
class RtAttrTable(RtAttrU32): | |||||
def _print_attr_value(self): | |||||
return " rtable={}".format(self.value) | |||||
class RtAttrNhId(RtAttrU32): | |||||
def _print_attr_value(self): | |||||
return " nh_id={}".format(self.value) | |||||
class RtAttrVia(BaseRtAttr): | |||||
def _validate(self, data): | |||||
data_len = len(data) - 4 | |||||
if data_len == 0: | |||||
raise ValueError("Error validating attr {}: empty data".format(self.rta_type_str)) # noqa: E501 | |||||
family = int(data_len[0]) | |||||
if family not in (socket.AF_INET, socket.AF_INET6): | |||||
raise ValueError("Error validating attr {}: unsupported AF {}".format( # noqa: E501 | |||||
self.rta_type_str, family)) | |||||
if family == socket.AF_INET: | |||||
expected_len = 1 + 4 | |||||
else: | |||||
expected_len = 1 + 16 | |||||
if data_len != expected_len: | |||||
raise ValueError("Error validating attr {}: expected len {} got {}".format( # noqa: E501 | |||||
self.rta_type_str, expected_len, data_len)) | |||||
def _parse(self, data): | |||||
data_len = len(data) - 4 | |||||
self.family = int(data_len[0]) | |||||
if self.family == socket.AF_INET: | |||||
self.addr = socket.inet_ntop(self.family, data[5:9]) | |||||
else: | |||||
self.addr = socket.inet_ntop(self.family, data[5:21]) | |||||
def _print_attr_value(self): | |||||
return ", via={}".format(self.addr) | |||||
class RtAttrStr(BaseRtAttr): | |||||
def _validate(self, data): | |||||
try: | |||||
s = data[4:].decode("utf-8") | |||||
except Exception as e: | |||||
raise ValueError("wrong utf-8 string") | |||||
def _parse(self, data): | |||||
self.str = data[4:].decode("utf-8") | |||||
def _print_attr_value(self): | |||||
return " str=\"{}\"".format(self.str) | |||||
rta_class_map = { | |||||
"RTA_DST": RtAttrIp, | |||||
"RTA_SRC": RtAttrIp, | |||||
"RTA_IIF": RtAttrIfindex, | |||||
"RTA_OIF": RtAttrIfindex, | |||||
"RTA_GATEWAY": RtAttrIp, | |||||
"RTA_TABLE": RtAttrTable, | |||||
"RTA_VIA": RtAttrVia, | |||||
"RTA_NH_ID": RtAttrNhId, | |||||
} | |||||
ifla_class_map = { | |||||
"IFLA_MTU": RtAttrU32, | |||||
} | |||||
ifa_class_map = { | |||||
"IFA_ADDRESS": RtAttrIp, | |||||
"IFA_LOCAL": RtAttrIp, | |||||
"IFA_LABEL": RtAttrStr, | |||||
"IFA_BROADCAST": RtAttrIp, | |||||
"IFA_ANYCAST": RtAttrIp, | |||||
"IFA_FLAGS": RtAttrU32, | |||||
} | |||||
class BaseNetlinkMessage(object): | |||||
def __init__(self, helper, nlmsg_type): | |||||
self.nlmsg_type = nlmsg_type | |||||
self.ut = unittest.TestCase() | |||||
self.rta_list = [] | |||||
self._orig_data = None | |||||
self.helper = helper | |||||
self.nl_hdr = Nlmsghdr(nlmsg_type=nlmsg_type) | |||||
def assertEqual(self, a, b, msg=None): | |||||
self.ut.assertEqual(a, b, msg) | |||||
def assertNotEqual(self, a, b, msg=None): | |||||
self.ut.assertNotEqual(a, b, msg) | |||||
@staticmethod | |||||
def parse_nl_header(data: bytes): | |||||
if len(data) < sizeof(Nlmsghdr): | |||||
raise ValueError("length less than netlink message header") | |||||
return Nlmsghdr.from_buffer_copy(data), sizeof(Nlmsghdr) | |||||
def is_reply(self, hdr): | |||||
return hdr.nlmsg_type == NlMsgType.NLMSG_ERROR.value | |||||
def print_nl_header(self, hdr, prepend=""): | |||||
# len=44, type=RTM_DELROUTE, flags=NLM_F_REQUEST|NLM_F_ACK, seq=1641163704, pid=0 # noqa: E501 | |||||
is_reply = self.is_reply(hdr) | |||||
msg_name = self.helper.get_nlmsg_name(hdr.nlmsg_type) | |||||
print("{}len={}, type={}, flags={}(0x{:X}), seq={}, pid={}".format( | |||||
prepend, | |||||
hdr.nlmsg_len, | |||||
msg_name, | |||||
self.helper.get_nlm_flags_str(msg_name, is_reply, hdr.nlmsg_flags), # noqa: E501 | |||||
hdr.nlmsg_flags, | |||||
hdr.nlmsg_seq, | |||||
hdr.nlmsg_pid | |||||
)) | |||||
@classmethod | |||||
def from_bytes(cls, helper, data): | |||||
try: | |||||
hdr, hdrlen = BaseNetlinkMessage.parse_nl_header(data) | |||||
self = cls(helper, hdr.nlmsg_type) | |||||
self._orig_data = data | |||||
self.nl_hdr = hdr | |||||
except ValueError as e: | |||||
print("Failed to parse nl header: {}".format(e)) | |||||
cls.print_as_bytes(data) | |||||
raise | |||||
return self | |||||
def print_message(self): | |||||
self.print_nl_header(self.nl_hdr) | |||||
@staticmethod | |||||
def print_as_bytes(data: bytes, descr: str): | |||||
print("===vv {} (len:{:3d}) vv===".format(descr, len(data))) | |||||
off = 0 | |||||
step = 16 | |||||
while off < len(data): | |||||
for i in range(step): | |||||
if off + i < len(data): | |||||
print(" {:02X}".format(data[off + i]), end="") | |||||
print("") | |||||
off += step | |||||
print("--------------------") | |||||
class NetlinkErrorMessage(BaseNetlinkMessage): | |||||
messages = [NlMsgType.NLMSG_ERROR.value] | |||||
def __init__(self, helper, nlmsg_type, error): | |||||
super().__init__(helper, nlmsg_type) | |||||
self.err_hdr = Nlmsgerr() | |||||
def print_error_header(self, errhdr, prepend=""): | |||||
print("{}error={}, ".format(prepend), end="") | |||||
self.print_nl_header(errhdr.msg, prepend) | |||||
def print_message(self, prepend=""): | |||||
self.print_nl_header(self.nl_nhr, prepend) | |||||
self.print_error_header(self.err_hdr, prepend + " ") | |||||
class BaseNetlinkRtMessage(BaseNetlinkMessage): | |||||
attr_class_map = {} | |||||
attr_enum = None | |||||
def __init__(self, helper, nlm_type): | |||||
super().__init__(helper, nlm_type) | |||||
self.base_hdr = None | |||||
def parse_rta_list(self, data: bytes) -> List[BaseRtAttr]: | |||||
ret = [] | |||||
offset = 0 | |||||
while offset < len(data): | |||||
# print("OFFSET={}".format(offset)) | |||||
if offset + 4 > len(data): | |||||
raise ValueError("only {} bytes remaining".format(len(data) - offset)) # noqa: E501 | |||||
rta_hdr = RtAttr.from_buffer_copy(data[offset:]) | |||||
rta_type_str = self.helper.get_attr_byval(self.attr_enum, rta_hdr.rta_type) # noqa: E501 | |||||
cls = self.attr_class_map.get(rta_type_str, BaseRtAttr) | |||||
rta = cls.from_bytes(self, data[offset:]) | |||||
offset += align4(rta.rta_len) | |||||
if rta.rta_len == 0: | |||||
raise ValueError("empty rta len, {} bytes remaining".format(len(data) - offset)) # noqa: E501 | |||||
ret.append(rta) | |||||
return ret, offset | |||||
@classmethod | |||||
def from_bytes(cls, helper, data): | |||||
try: | |||||
hdr, hdrlen = BaseNetlinkMessage.parse_nl_header(data) | |||||
self = cls(helper, hdr.nlmsg_type) | |||||
self._orig_data = data | |||||
self.nl_hdr = hdr | |||||
except ValueError as e: | |||||
print("Failed to parse nl header: {}".format(e)) | |||||
cls.print_as_bytes(data) | |||||
raise | |||||
offset = align4(hdrlen) | |||||
try: | |||||
base_hdr, hdrlen = self.parse_base_header(data[offset:]) | |||||
self.base_hdr = base_hdr | |||||
offset += align4(hdrlen) | |||||
except ValueError as e: | |||||
print("Failed to parse nl rt header: {}".format(e)) | |||||
cls.print_as_bytes(data) | |||||
raise | |||||
orig_offset = offset | |||||
try: | |||||
rta_list, rta_len = self.parse_rta_list(data[offset:]) | |||||
offset += rta_len | |||||
if offset != len(data): | |||||
raise ValueError("{} bytes left at the end of the packet".format(len(data) - offset)) # noqa: E501 | |||||
self.rta_list = rta_list | |||||
except ValueError as e: | |||||
print("Failed to parse nl rta attributes at offset {}: {}".format(orig_offset, e)) # noqa: E501 | |||||
cls.print_as_bytes(data, "msg dump") | |||||
cls.print_as_bytes(data[orig_offset:], "failed block") | |||||
raise | |||||
return self | |||||
def __bytes__(self): | |||||
ret = bytes() | |||||
for rta in self.rta_list: | |||||
ret += bytes(rta) | |||||
ret = bytes(self.base_hdr) + ret | |||||
self.nl_hdr.nlmsg_len = len(ret) + sizeof(Nlmsghdr) | |||||
return bytes(self.nl_hdr) + ret | |||||
def print_message(self): | |||||
self.print_nl_header(self.nl_hdr) | |||||
self.print_base_header(self.base_hdr, " ") | |||||
for rta in self.rta_list: | |||||
rta.print_attribute(" ") | |||||
class NetlinkRtMessage(BaseNetlinkRtMessage): | |||||
messages = [ | |||||
NlRtMsgType.RTM_NEWROUTE.value, | |||||
NlRtMsgType.RTM_DELROUTE.value, | |||||
NlRtMsgType.RTM_GETROUTE.value, | |||||
] | |||||
attr_class_map = rta_class_map | |||||
attr_enum = RtattrType | |||||
def __init__(self, helper, nlm_type): | |||||
super().__init__(helper, nlm_type) | |||||
self.base_hdr = RtMsgHdr() | |||||
def parse_base_header(self, data): | |||||
if len(data) < sizeof(RtMsgHdr): | |||||
raise ValueError("length less than rtmsg header") | |||||
rtm_hdr = RtMsgHdr.from_buffer_copy(data) | |||||
return (rtm_hdr, sizeof(RtMsgHdr)) | |||||
def print_base_header(self, hdr, prepend=""): | |||||
family = self.helper.get_af_name(hdr.rtm_family) | |||||
print("{}family={}, dst_len={}, src_len={}, tos={}, table={}, protocol={}({}), scope={}({}), type={}({}), flags={}({})".format( # noqa: E501 | |||||
prepend, | |||||
family, | |||||
hdr.rtm_dst_len, | |||||
hdr.rtm_src_len, | |||||
hdr.rtm_tos, | |||||
hdr.rtm_table, | |||||
self.helper.get_attr_byval(RtProto, hdr.rtm_protocol), | |||||
hdr.rtm_protocol, | |||||
self.helper.get_attr_byval(RtScope, hdr.rtm_scope), | |||||
hdr.rtm_scope, | |||||
self.helper.get_attr_byval(RtType, hdr.rtm_type), | |||||
hdr.rtm_type, | |||||
self.helper.get_bitmask_str(RtMsgFlags, hdr.rtm_flags), | |||||
hdr.rtm_flags)) | |||||
class NetlinkIflaMessage(BaseNetlinkRtMessage): | |||||
messages = [ | |||||
NlRtMsgType.RTM_NEWLINK.value, | |||||
NlRtMsgType.RTM_DELLINK.value, | |||||
NlRtMsgType.RTM_GETLINK.value, | |||||
] | |||||
attr_class_map = ifla_class_map | |||||
attr_enum = IflattrType | |||||
def __init__(self, helper, nlm_type): | |||||
super().__init__(helper, nlm_type) | |||||
self.base_hdr = IfinfoMsg() | |||||
def parse_base_header(self, data): | |||||
if len(data) < sizeof(IfinfoMsg): | |||||
raise ValueError("length less than IfinfoMsg header") | |||||
rtm_hdr = IfinfoMsg.from_buffer_copy(data) | |||||
return (rtm_hdr, sizeof(IfinfoMsg)) | |||||
def print_base_header(self, hdr, prepend=""): | |||||
family = self.helper.get_af_name(hdr.ifi_family) | |||||
print("{}family={}, ifi_type={}, ifi_index={}, ifi_flags={}, ifi_change={}".format( # noqa: E501 | |||||
prepend, | |||||
family, | |||||
hdr.ifi_type, | |||||
hdr.ifi_index, | |||||
hdr.ifi_flags, | |||||
hdr.ifi_change)) | |||||
class NetlinkIfaMessage(BaseNetlinkRtMessage): | |||||
messages = [ | |||||
NlRtMsgType.RTM_NEWADDR.value, | |||||
NlRtMsgType.RTM_DELADDR.value, | |||||
NlRtMsgType.RTM_GETADDR.value, | |||||
] | |||||
attr_class_map = ifa_class_map | |||||
attr_enum = IfattrType | |||||
def __init__(self, helper, nlm_type): | |||||
super().__init__(helper, nlm_type) | |||||
self.base_hdr = IfaddrMsg() | |||||
def parse_base_header(self, data): | |||||
if len(data) < sizeof(IfaddrMsg): | |||||
raise ValueError("length less than IfaddrMsg header") | |||||
rtm_hdr = IfaddrMsg.from_buffer_copy(data) | |||||
return (rtm_hdr, sizeof(IfaddrMsg)) | |||||
def print_base_header(self, hdr, prepend=""): | |||||
family = self.helper.get_af_name(hdr.ifa_family) | |||||
print("{}family={}, ifa_prefixlen={}, ifa_flags={}, ifa_scope={}, ifa_index={}".format( # noqa: E501 | |||||
prepend, | |||||
family, | |||||
hdr.ifa_prefixlen, | |||||
hdr.ifa_flags, | |||||
hdr.ifa_scope, | |||||
hdr.ifa_index)) | |||||
class Nlsock(): | |||||
def __init__(self, helper): | |||||
self.helper = helper | |||||
self.sock_fd = self._setup_netlink() | |||||
self._data = bytes() | |||||
self.rtm_seq = 1 | |||||
self.pid = os.getpid() | |||||
self.msgmap = self.build_msgmap() | |||||
self.set_groups(NlRtGroup.RTNLGRP_IPV4_ROUTE.value | NlRtGroup.RTNLGRP_IPV6_ROUTE.value) # noqa: E501 | |||||
def build_msgmap(self): | |||||
classes = [NetlinkRtMessage, NetlinkIfaMessage, NetlinkErrorMessage] | |||||
xmap = {} | |||||
for cls in classes: | |||||
for message in cls.messages: | |||||
xmap[message] = cls | |||||
return xmap | |||||
def get_seq(self): | |||||
ret = self.rtm_seq | |||||
self.rtm_seq += 1 | |||||
return ret | |||||
def _setup_netlink(self) -> int: | |||||
family = self.helper.get_af_value("AF_NETLINK") | |||||
s = socket.socket(family, socket.SOCK_RAW, NlConst.NETLINK_ROUTE) | |||||
return s | |||||
def set_groups(self, mask: int): | |||||
self.sock_fd.setsockopt(socket.SOL_SOCKET, 1, mask) | |||||
# snl = SockaddrNl(nl_len = sizeof(SockaddrNl), nl_family=38, | |||||
# nl_pid=self.pid, nl_groups=mask) | |||||
# xbuffer = create_string_buffer(sizeof(SockaddrNl)) | |||||
# memmove(xbuffer, addressof(snl), sizeof(SockaddrNl)) | |||||
# k = struct.pack("@BBHII", 12, 38, 0, self.pid, mask) | |||||
# self.sock_fd.bind(k) | |||||
def write_message(self, msg): | |||||
print("vvvvvvvv OUT vvvvvvvv") | |||||
msg.print_message() | |||||
msg_bytes = bytes(msg) | |||||
try: | |||||
ret = os.write(self.sock_fd.fileno(), bytes(msg)) | |||||
except Exception as e: | |||||
print("write({}) -> {}".format(len(msg_bytes), e)) | |||||
def parse_message(self, data: bytes): | |||||
if len(data) < sizeof(Nlmsghdr): | |||||
raise Exception("Short read from nl: {} bytes".format(len(data))) | |||||
hdr = Nlmsghdr.from_buffer_copy(data) | |||||
nlmsg_type = hdr.nlmsg_type | |||||
cls = self.msgmap.get(nlmsg_type) | |||||
if not cls: | |||||
cls = BaseNetlinkMessage | |||||
return cls.from_bytes(self.helper, data) | |||||
def write_data(self, data: bytes): | |||||
self.sock_fd.send(data) | |||||
def read_data(self): | |||||
while True: | |||||
data = self.sock_fd.recv(65535) | |||||
self._data += data | |||||
if len(self._data) >= sizeof(Nlmsghdr): | |||||
break | |||||
if seq is None: | |||||
break | |||||
hdr = Nlmsghdr.from_buffer_copy(data) | |||||
if hdr.nlmsg_pid == self.pid and hdr.nlmsg_seq == seq: | |||||
break | |||||
return data | |||||
def read_message(self) -> bytes: | |||||
if len(self._data) < sizeof(Nlmsghdr): | |||||
self.read_data() | |||||
hdr = Nlmsghdr.from_buffer_copy(self._data) | |||||
while (hdr.nlmsg_len > len(self._data)): | |||||
self.read_data() | |||||
raw_msg = self._data[:hdr.nlmsg_len] | |||||
self._data = self._data[hdr.nlmsg_len:] | |||||
return self.parse_message(raw_msg) | |||||
def fill_msg_seq(self, msg): | |||||
msg.nl_hdr.nlmsg_seq = self.get_seq() | |||||
msg.nl_hdr.nlmsg_pid = self.pid | |||||
def request_ifaddrs(self, family): | |||||
msg = NetlinkIfaMessage(self.helper, NlRtMsgType.RTM_GETADDR.value) | |||||
flags = NlmGetFlags.NLM_F_ROOT.value | NlmGetFlags.NLM_F_MATCH.value | |||||
self.fill_msg_seq(msg) | |||||
msg.base_hdr.ifa_family = family | |||||
msg.nl_hdr.nlmsg_flags = flags | NlmBaseFlags.NLM_F_REQUEST.value | |||||
msg_bytes = bytes(msg) | |||||
x = self.parse_message(msg_bytes) | |||||
x.print_message() | |||||
print(msg_bytes) | |||||
# Skip family for now | |||||
self.write_data(msg_bytes) | |||||
def request_routes(self, family): | |||||
msg = NetlinkRtMessage(self.helper, NlRtMsgType.RTM_GETROUTE.value) | |||||
flags = NlmGetFlags.NLM_F_ROOT.value | NlmGetFlags.NLM_F_MATCH.value | |||||
self.fill_msg_seq(msg) | |||||
msg.base_hdr.rtm_family = family | |||||
msg.nl_hdr.nlmsg_flags = flags | NlmBaseFlags.NLM_F_REQUEST.value | |||||
msg_bytes = bytes(msg) | |||||
x = self.parse_message(msg_bytes) | |||||
x.print_message() | |||||
print(msg_bytes) | |||||
# Skip family for now | |||||
self.write_data(msg_bytes) | |||||
def main(): | |||||
helper = NlHelper() | |||||
nl = Nlsock(helper) | |||||
# nl.request_ifaddrs(socket.AF_INET) | |||||
nl.request_routes(0) | |||||
while True: | |||||
msg = nl.read_message() | |||||
print("") | |||||
msg.print_message() | |||||
pass | |||||
if __name__ == "__main__": | |||||
main() |