Changeset View
Standalone View
sbin/ping/tests/pinger.py
- This file was added.
#!/usr/bin/env python3 | |||||
asomers: s/python/python3/ | |||||
import sys | |||||
import subprocess | |||||
import argparse | |||||
import logging | |||||
logging.getLogger("scapy").setLevel(logging.CRITICAL) | |||||
import scapy.all as sc | |||||
routing_options = [ | |||||
"RR", | |||||
"RR-same", | |||||
"RR-trunc", | |||||
"LSRR", | |||||
"LSRR-trunc", | |||||
"SSRR", | |||||
"SSRR-trunc", | |||||
] | |||||
def parse_args(): | |||||
parser = argparse.ArgumentParser( | |||||
prog="pinger.py", | |||||
description="P I N G E R", | |||||
epilog="This utility creates a tun interface, " | |||||
"sends an echo request, and forges the reply.", | |||||
Done Inline ActionsSpecifying the iface on the command line will fail if an interface named tun0 already exists for some reason. Better to do a plain ifconfig tun create and use whatever ifconfig returns. asomers: Specifying the iface on the command line will fail if an interface named `tun0` already exists… | |||||
Done Inline ActionsYes, If you don't mind, I had this planned for the next commit, where I put these tests inside a VNET jail each, and extend tests/sys/common/vnet.subr to create tun interfaces: jlduran_gmail.com: Yes, If you don't mind, I had this planned for the next commit, where I put these tests inside… | |||||
Not Done Inline ActionsWhy bother with vnet? Is that really necessary for these tests? asomers: Why bother with vnet? Is that really necessary for these tests? | |||||
Not Done Inline ActionsMy original thought was, VNET jails are cheap, and allow us to remove the "exclusivity clause" from the Makefile, maybe to allow running these tests in parallel while using the same IP address in all of them. jlduran_gmail.com: My original thought was, VNET jails are cheap, and allow us to remove the "exclusivity clause"… | |||||
Not Done Inline ActionsAhh, removing the "exclusivity clause" is a definite benefit. That makes sense. asomers: Ahh, removing the "exclusivity clause" is a definite benefit. That makes sense. | |||||
) | |||||
# Required arguments | |||||
# Avoid setting defaults on these arguments, | |||||
# as we want to set them explicitly in the tests | |||||
parser.add_argument( | |||||
"--iface", type=str, required=True, help="Interface to send packet to" | |||||
) | |||||
parser.add_argument( | |||||
"--src", type=str, required=True, help="Source packet IP" | |||||
) | |||||
parser.add_argument( | |||||
"--dst", type=str, required=True, help="Destination packet IP" | |||||
Done Inline ActionsProbably not necessary to test the Evil bit. asomers: Probably not necessary to test the Evil bit. | |||||
Done Inline ActionsI'll remove it. I only plan to expose a minor bug using the Don't Fragment bit. jlduran_gmail.com: I'll remove it. I only plan to expose a minor bug using the Don't Fragment bit.
The original… | |||||
) | |||||
parser.add_argument( | |||||
"--icmp_type", type=int, required=True, help="ICMP type" | |||||
) | |||||
parser.add_argument( | |||||
"--icmp_code", type=int, required=True, help="ICMP code" | |||||
) | |||||
# IP arguments | |||||
parser.add_argument( | |||||
"--flags", type=str, default="", choices=["DF", "MF"], help="IP flags" | |||||
) | |||||
parser.add_argument( | |||||
"--opts", | |||||
type=str, | |||||
default="", | |||||
choices=["EOL", "NOP", "NOP-40", "unk", "unk-40"] + routing_options, | |||||
help="Include IP options", | |||||
) | |||||
parser.add_argument( | |||||
"--special", | |||||
type=str, | |||||
default="", | |||||
choices=["tcp", "udp", "wrong", "warp"], | |||||
help="Send a special packet", | |||||
) | |||||
# ICMP arguments | |||||
# Match names with <netinet/ip_icmp.h> | |||||
parser.add_argument( | |||||
"--icmp_pptr", type=int, default=0, help="ICMP pointer" | |||||
) | |||||
parser.add_argument( | |||||
"--icmp_gwaddr", | |||||
type=str, | |||||
default="0.0.0.0", | |||||
help="ICMP gateway IP address", | |||||
Not Done Inline ActionsDoes Scapy have definitions for these magic numbers? asomers: Does Scapy have definitions for these magic numbers? | |||||
Not Done Inline ActionsNot that I'm aware. In fact, this line was taken from scapy: jlduran_gmail.com: Not that I'm aware. In fact, this line was taken from scapy:
https://github. | |||||
Not Done Inline ActionsLame. It would be good to fix that some day, but no need to do it now. asomers: Lame. It would be good to fix that some day, but no need to do it now. | |||||
Not Done Inline ActionsAfter your suggestion, I tried to essentially create a table from <netinet/ip_icmp.h>, but given most of those codes are already deprecated, and we are even missing the names for ICMP codes 37 and 38, I gave up. jlduran_gmail.com: After your suggestion, I tried to essentially create a table from `<netinet/ip_icmp.h>`, but… | |||||
) | |||||
parser.add_argument( | |||||
"--icmp_nextmtu", type=int, default=0, help="ICMP next MTU" | |||||
) | |||||
parser.add_argument( | |||||
"--icmp_otime", type=int, default=0, help="ICMP originate timestamp" | |||||
) | |||||
parser.add_argument( | |||||
"--icmp_rtime", type=int, default=0, help="ICMP receive timestamp" | |||||
) | |||||
parser.add_argument( | |||||
"--icmp_ttime", type=int, default=0, help="ICMP transmit timestamp" | |||||
) | |||||
parser.add_argument( | |||||
"--icmp_mask", type=str, default="0.0.0.0", help="ICMP address mask" | |||||
) | |||||
parser.add_argument( | |||||
"--request", | |||||
type=str, | |||||
default="", | |||||
choices=["mask", "timestamp"], | |||||
help="Request type", | |||||
) | |||||
# Miscellaneous arguments | |||||
parser.add_argument( | |||||
"--count", type=int, default=1, help="Number of packets to send" | |||||
) | |||||
parser.add_argument("--dup", action="store_true", help="Duplicate packets") | |||||
parser.add_argument("--version", action="version", version="%(prog)s 1.0") | |||||
return parser.parse_args() | |||||
def construct_response_packet(echo, ip, icmp, special): | |||||
icmp_id_seq_types = [0, 8, 13, 14, 15, 16, 17, 18, 37, 38] | |||||
oip = echo[sc.IP] | |||||
oicmp = echo[sc.ICMP] | |||||
load = echo[sc.ICMP].payload | |||||
oip[sc.IP].remove_payload() | |||||
oicmp[sc.ICMP].remove_payload() | |||||
oicmp.type = 8 | |||||
# As if the original IP packet had these set | |||||
oip.ihl = None | |||||
oip.len = None | |||||
oip.id = 1 | |||||
oip.flags = ip.flags | |||||
oip.chksum = None | |||||
oip.options = ip.options | |||||
# Special options | |||||
if special == "tcp": | |||||
oip.proto = "tcp" | |||||
tcp = sc.TCP(sport=1234, dport=5678) | |||||
return ip / icmp / oip / tcp | |||||
if special == "udp": | |||||
oip.proto = "udp" | |||||
udp = sc.UDP(sport=1234, dport=5678) | |||||
return ip / icmp / oip / udp | |||||
if special == "warp": | |||||
# Build a package with a timestamp of INT_MAX | |||||
# (time-warped package) | |||||
payload_no_timestamp = sc.bytes_hex(load)[16:] | |||||
load = (b"\xff" * 8) + sc.hex_bytes(payload_no_timestamp) | |||||
if special == "wrong": | |||||
# Build a package with a wrong last byte | |||||
payload_no_last_byte = sc.bytes_hex(load)[:-2] | |||||
load = (sc.hex_bytes(payload_no_last_byte)) + b"\x00" | |||||
if icmp.type in icmp_id_seq_types: | |||||
pkt = ip / icmp / load | |||||
else: | |||||
ip.options = "" | |||||
pkt = ip / icmp / oip / oicmp / load | |||||
return pkt | |||||
def generate_ip_options(opts): | |||||
routers = [ | |||||
"192.0.2.10", | |||||
"192.0.2.20", | |||||
"192.0.2.30", | |||||
"192.0.2.40", | |||||
"192.0.2.50", | |||||
"192.0.2.60", | |||||
"192.0.2.70", | |||||
"192.0.2.80", | |||||
"192.0.2.90", | |||||
Done Inline Actions@melifaro note that this line only works because atf-sh runs each test case in its own temporary directory. I don't think the pytest Kyua wrapper does that. asomers: @melifaro note that this line only works because atf-sh runs each test case in its own… | |||||
Done Inline Actionsatf-sh is just an utility wrapper that implements kyua/atf protocol. melifaro: atf-sh is just an utility wrapper that implements kyua/atf protocol.
All isolation/priv drops… | |||||
Done Inline ActionsThank you! I'll study those pytests! jlduran_gmail.com: Thank you! I'll study those pytests!
At the moment, I fail to see the advantage of one over the… | |||||
Done Inline ActionsFor self reference: D31084 (should become a wiki!) jlduran_gmail.com: For self reference: D31084 (should become a wiki!) | |||||
] | |||||
routers_zero = [0, 0, 0, 0, 0, 0, 0, 0, 0] | |||||
if opts == "EOL": | |||||
options = sc.IPOption(b"\x00") | |||||
elif opts == "NOP": | |||||
options = sc.IPOption(b"\x01") | |||||
elif opts == "NOP-40": | |||||
options = sc.IPOption(b"\x01" * 40) | |||||
elif opts == "RR": | |||||
options = sc.IPOption_RR(pointer=40, routers=routers) | |||||
elif opts == "RR-same": | |||||
options = sc.IPOption_RR(pointer=3, routers=routers_zero) | |||||
elif opts == "RR-trunc": | |||||
options = sc.IPOption_RR(length=7, routers=routers_zero) | |||||
elif opts == "LSRR": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options = sc.IPOption_LSRR(routers=routers) | |||||
elif opts == "LSRR-trunc": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options = sc.IPOption_LSRR(length=3, routers=routers_zero) | |||||
elif opts == "SSRR": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options = sc.IPOption_SSRR(routers=routers) | |||||
elif opts == "SSRR-trunc": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options = sc.IPOption_SSRR(length=3, routers=routers_zero) | |||||
elif opts == "unk": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options = sc.IPOption(b"\x9f") | |||||
elif opts == "unk-40": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options = sc.IPOption(b"\x9f" * 40) | |||||
else: | |||||
options = "" | |||||
return options | |||||
def main(): | |||||
"""P I N G E R""" | |||||
args = parse_args() | |||||
opts = generate_ip_options(args.opts) | |||||
ip = sc.IP(flags=args.flags, src=args.dst, dst=args.src, options=opts) | |||||
tun = sc.TunTapInterface(args.iface) | |||||
subprocess.run(["ifconfig", tun.iface, "up"], check=True) | |||||
subprocess.run(["ifconfig", tun.iface, args.src, args.dst], check=True) | |||||
command = [ | |||||
"/sbin/ping", | |||||
"-c", | |||||
str(args.count), | |||||
"-t", | |||||
str(args.count), | |||||
"-v", | |||||
] | |||||
if args.request == "mask": | |||||
command += ["-Mm"] | |||||
if args.request == "timestamp": | |||||
command += ["-Mt"] | |||||
if args.special != "": | |||||
command += ["-p1"] | |||||
if args.opts in routing_options: | |||||
command += ["-R"] | |||||
command += [args.dst] | |||||
with subprocess.Popen(args=command, text=True) as ping: | |||||
for dummy in range(args.count): | |||||
echo = tun.recv() | |||||
icmp = sc.ICMP( | |||||
type=args.icmp_type, | |||||
code=args.icmp_code, | |||||
id=echo[sc.ICMP].id, | |||||
seq=echo[sc.ICMP].seq, | |||||
ts_ori=args.icmp_otime, | |||||
ts_rx=args.icmp_rtime, | |||||
ts_tx=args.icmp_ttime, | |||||
gw=args.icmp_gwaddr, | |||||
ptr=args.icmp_pptr, | |||||
addr_mask=args.icmp_mask, | |||||
nexthopmtu=args.icmp_nextmtu, | |||||
) | |||||
pkt = construct_response_packet(echo, ip, icmp, args.special) | |||||
tun.send(pkt) | |||||
if args.dup is True: | |||||
tun.send(pkt) | |||||
ping.communicate() | |||||
sys.exit(ping.returncode) | |||||
if __name__ == "__main__": | |||||
main() |
s/python/python3/