Changeset View
Changeset View
Standalone View
Standalone View
tests/sys/netpfil/common/pft_ping.py
Show First 20 Lines • Show All 109 Lines • ▼ Show 20 Lines | def check_ping6_request(args, packet): | ||||
icmp = packet.getlayer(sp.ICMPv6EchoRequest) | icmp = packet.getlayer(sp.ICMPv6EchoRequest) | ||||
if not icmp: | if not icmp: | ||||
return False | return False | ||||
if icmp.data != PAYLOAD_MAGIC: | if icmp.data != PAYLOAD_MAGIC: | ||||
return False | return False | ||||
return True | return True | ||||
def check_ping_reply(args, packet): | |||||
return check_ping4_reply(args, packet) | |||||
def check_ping4_reply(args, packet): | |||||
""" | |||||
Check that this is a reply to the ping request we sent | |||||
""" | |||||
dst_ip = args.to[0] | |||||
ip = packet.getlayer(sp.IP) | |||||
if not ip: | |||||
return False | |||||
if ip.src != dst_ip: | |||||
return False | |||||
icmp = packet.getlayer(sp.ICMP) | |||||
if not icmp: | |||||
return False | |||||
if sp.icmptypes[icmp.type] != 'echo-reply': | |||||
return False | |||||
raw = packet.getlayer(sp.Raw) | |||||
if not raw: | |||||
return False | |||||
if raw.load != PAYLOAD_MAGIC: | |||||
return False | |||||
return True | |||||
def ping(send_if, dst_ip, args): | def ping(send_if, dst_ip, args): | ||||
ether = sp.Ether() | ether = sp.Ether() | ||||
ip = sp.IP(dst=dst_ip) | ip = sp.IP(dst=dst_ip) | ||||
icmp = sp.ICMP(type='echo-request') | icmp = sp.ICMP(type='echo-request') | ||||
raw = sp.raw(PAYLOAD_MAGIC) | raw = sp.raw(PAYLOAD_MAGIC) | ||||
if args.send_tos: | if args.send_tos: | ||||
ip.tos = int(args.send_tos[0]) | ip.tos = int(args.send_tos[0]) | ||||
if args.fromaddr: | |||||
ip.src = args.fromaddr[0] | |||||
req = ether / ip / icmp / raw | req = ether / ip / icmp / raw | ||||
sp.sendp(req, iface=send_if, verbose=False) | sp.sendp(req, iface=send_if, verbose=False) | ||||
def ping6(send_if, dst_ip, args): | def ping6(send_if, dst_ip, args): | ||||
ether = sp.Ether() | ether = sp.Ether() | ||||
ip6 = sp.IPv6(dst=dst_ip) | ip6 = sp.IPv6(dst=dst_ip) | ||||
icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC)) | icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC)) | ||||
if args.fromaddr: | |||||
ip.src = args.fromaddr[0] | |||||
req = ether / ip6 / icmp | req = ether / ip6 / icmp | ||||
sp.sendp(req, iface=send_if, verbose=False) | sp.sendp(req, iface=send_if, verbose=False) | ||||
def check_tcpsyn(args, packet): | def check_tcpsyn(args, packet): | ||||
dst_ip = args.to[0] | dst_ip = args.to[0] | ||||
ip = packet.getlayer(sp.IP) | ip = packet.getlayer(sp.IP) | ||||
if not ip: | if not ip: | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | |||||
def main(): | def main(): | ||||
parser = argparse.ArgumentParser("pft_ping.py", | parser = argparse.ArgumentParser("pft_ping.py", | ||||
description="Ping test tool") | description="Ping test tool") | ||||
parser.add_argument('--sendif', nargs=1, | parser.add_argument('--sendif', nargs=1, | ||||
required=True, | required=True, | ||||
help='The interface through which the packet(s) will be sent') | help='The interface through which the packet(s) will be sent') | ||||
parser.add_argument('--recvif', nargs=1, | parser.add_argument('--recvif', nargs=1, | ||||
help='The interface on which to expect the ICMP echo request') | |||||
parser.add_argument('--replyif', nargs=1, | |||||
help='The interface on which to expect the ICMP echo response') | help='The interface on which to expect the ICMP echo response') | ||||
parser.add_argument('--checkdup', nargs=1, | parser.add_argument('--checkdup', nargs=1, | ||||
help='The interface on which to expect the duplicated ICMP packets') | help='The interface on which to expect the duplicated ICMP packets') | ||||
parser.add_argument('--ip6', action='store_true', | parser.add_argument('--ip6', action='store_true', | ||||
help='Use IPv6') | help='Use IPv6') | ||||
parser.add_argument('--to', nargs=1, | parser.add_argument('--to', nargs=1, | ||||
required=True, | required=True, | ||||
help='The destination IP address for the ICMP echo request') | help='The destination IP address for the ICMP echo request') | ||||
parser.add_argument('--fromaddr', nargs=1, | |||||
help='The source IP address for the ICMP echo request') | |||||
# TCP options | # TCP options | ||||
parser.add_argument('--tcpsyn', action='store_true', | parser.add_argument('--tcpsyn', action='store_true', | ||||
help='Send a TCP SYN packet') | help='Send a TCP SYN packet') | ||||
parser.add_argument('--tcpopt_unaligned', action='store_true', | parser.add_argument('--tcpopt_unaligned', action='store_true', | ||||
help='Include unaligned TCP options') | help='Include unaligned TCP options') | ||||
# Packet settings | # Packet settings | ||||
Show All 12 Lines | def main(): | ||||
sniffer = None | sniffer = None | ||||
if not args.recvif is None: | if not args.recvif is None: | ||||
checkfn=check_ping_request | checkfn=check_ping_request | ||||
if args.tcpsyn: | if args.tcpsyn: | ||||
checkfn=check_tcpsyn | checkfn=check_tcpsyn | ||||
sniffer = Sniffer(args, checkfn) | sniffer = Sniffer(args, checkfn) | ||||
replysniffer = None | |||||
if not args.replyif is None: | |||||
checkfn=check_ping_reply | |||||
replysniffer = Sniffer(args, checkfn, recvif=args.replyif[0]) | |||||
dupsniffer = None | dupsniffer = None | ||||
if args.checkdup is not None: | if args.checkdup is not None: | ||||
dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0]) | dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0]) | ||||
if args.tcpsyn: | if args.tcpsyn: | ||||
tcpsyn(args.sendif[0], args.to[0], args) | tcpsyn(args.sendif[0], args.to[0], args) | ||||
else: | else: | ||||
if args.ip6: | if args.ip6: | ||||
ping6(args.sendif[0], args.to[0], args) | ping6(args.sendif[0], args.to[0], args) | ||||
else: | else: | ||||
ping(args.sendif[0], args.to[0], args) | ping(args.sendif[0], args.to[0], args) | ||||
if dupsniffer: | if dupsniffer: | ||||
dupsniffer.join() | dupsniffer.join() | ||||
if dup_found != 1: | if dup_found != 1: | ||||
sys.exit(1) | sys.exit(1) | ||||
if sniffer: | if sniffer: | ||||
sniffer.join() | sniffer.join() | ||||
if sniffer.foundCorrectPacket: | if sniffer.foundCorrectPacket: | ||||
sys.exit(0) | |||||
else: | |||||
sys.exit(1) | |||||
if replysniffer: | |||||
replysniffer.join() | |||||
if replysniffer.foundCorrectPacket: | |||||
sys.exit(0) | sys.exit(0) | ||||
else: | else: | ||||
sys.exit(1) | sys.exit(1) | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
main() | main() |