Changeset View
Standalone View
sbin/ping/tests/pinger.py
- This file was added.
#!/usr/bin/env python | |||||
asomers: s/python/python3/ | |||||
# | |||||
# $FreeBSD$ | |||||
# | |||||
import sys | |||||
import subprocess | |||||
import argparse | |||||
import logging | |||||
logging.getLogger("scapy").setLevel(logging.CRITICAL) | |||||
import scapy.all as sc | |||||
routing_options=["RR", "RR-same", "RR-trunc", | |||||
"LSRR", "LSRR-trunc", | |||||
"SSRR", "SSRR-trunc"] | |||||
def parse_args(): | |||||
parser=argparse.ArgumentParser(prog="pinger.py", | |||||
description="P I N G E R", | |||||
epilog="This utility creates a tun " | |||||
"interface, sends an echo request, and " | |||||
"forges the reply.") | |||||
# Required arguments | |||||
# Avoid setting defaults on these arguments, | |||||
# as we want to set them explicitly in the tests | |||||
parser.add_argument("--iface", type=str, required=True, | |||||
asomersUnsubmitted Done Inline ActionsSpecifying the iface on the command line will fail if an interface named tun0 already exists for some reason. Better to do a plain ifconfig tun create and use whatever ifconfig returns. asomers: Specifying the iface on the command line will fail if an interface named `tun0` already exists… | |||||
jlduran_gmail.comAuthorUnsubmitted Done Inline ActionsYes, If you don't mind, I had this planned for the next commit, where I put these tests inside a VNET jail each, and extend tests/sys/common/vnet.subr to create tun interfaces: jlduran_gmail.com: Yes, If you don't mind, I had this planned for the next commit, where I put these tests inside… | |||||
asomersUnsubmitted Not Done Inline ActionsWhy bother with vnet? Is that really necessary for these tests? asomers: Why bother with vnet? Is that really necessary for these tests? | |||||
jlduran_gmail.comAuthorUnsubmitted Not Done Inline ActionsMy original thought was, VNET jails are cheap, and allow us to remove the "exclusivity clause" from the Makefile, maybe to allow running these tests in parallel while using the same IP address in all of them. jlduran_gmail.com: My original thought was, VNET jails are cheap, and allow us to remove the "exclusivity clause"… | |||||
asomersUnsubmitted Not Done Inline ActionsAhh, removing the "exclusivity clause" is a definite benefit. That makes sense. asomers: Ahh, removing the "exclusivity clause" is a definite benefit. That makes sense. | |||||
help="Interface to send packet to") | |||||
parser.add_argument("--src", type=str, required=True, | |||||
help="Source packet IP") | |||||
parser.add_argument("--dst", type=str, required=True, | |||||
help="Destination packet IP") | |||||
parser.add_argument("--icmp_type", type=int, required=True, | |||||
help="ICMP type") | |||||
parser.add_argument("--icmp_code", type=int, required=True, | |||||
help="ICMP code") | |||||
# IP arguments | |||||
parser.add_argument("--flags", type=str, default="", | |||||
choices=["MF", "DF", "evil"], | |||||
asomersUnsubmitted Done Inline ActionsProbably not necessary to test the Evil bit. asomers: Probably not necessary to test the Evil bit. | |||||
jlduran_gmail.comAuthorUnsubmitted Done Inline ActionsI'll remove it. I only plan to expose a minor bug using the Don't Fragment bit. jlduran_gmail.com: I'll remove it. I only plan to expose a minor bug using the Don't Fragment bit.
The original… | |||||
help="IP flags") | |||||
parser.add_argument("--opts", type=str, default="", | |||||
choices=["EOL", "NOP", "NOP-40", "unk", "unk-40"] + | |||||
routing_options, help="Include IP options") | |||||
parser.add_argument("--special", type=str, default="", | |||||
choices=["tcp", "udp", "wrong", "warp"], | |||||
help="Send a special packet") | |||||
# ICMP arguments | |||||
# Match names with <netinet/ip_icmp.h> | |||||
parser.add_argument("--icmp_pptr", type=int, default=0, | |||||
help="ICMP pointer") | |||||
parser.add_argument("--icmp_gwaddr", type=str, default="0.0.0.0", | |||||
help="ICMP gateway IP address") | |||||
parser.add_argument("--icmp_nextmtu", type=int, default=0, | |||||
help="ICMP next MTU") | |||||
parser.add_argument("--icmp_otime", type=int, default=0, | |||||
help="ICMP originate timestamp") | |||||
parser.add_argument("--icmp_rtime", type=int, default=0, | |||||
help="ICMP receive timestamp") | |||||
parser.add_argument("--icmp_ttime", type=int, default=0, | |||||
help="ICMP transmit timestamp") | |||||
parser.add_argument("--icmp_mask", type=str, default="0.0.0.0", | |||||
help="ICMP address mask") | |||||
parser.add_argument("--request", type=str, default="echo", | |||||
help="Request type (echo, mask, timestamp)") | |||||
# Miscellaneous arguments | |||||
parser.add_argument("--count", type=int, default=1, | |||||
help="Number of packets to send") | |||||
parser.add_argument("--dup", action="store_true", | |||||
help="Duplicate packets") | |||||
parser.add_argument("--version", action="version", version='%(prog)s 1.0') | |||||
return parser.parse_args() | |||||
def construct_response_packet(echo, ip, icmp, special): | |||||
icmp_id_seq_types=[0, 8, 13, 14, 15, 16, 17, 18, 37, 38] | |||||
asomersUnsubmitted Not Done Inline ActionsDoes Scapy have definitions for these magic numbers? asomers: Does Scapy have definitions for these magic numbers? | |||||
jlduran_gmail.comAuthorUnsubmitted Not Done Inline ActionsNot that I'm aware. In fact, this line was taken from scapy: jlduran_gmail.com: Not that I'm aware. In fact, this line was taken from scapy:
https://github. | |||||
asomersUnsubmitted Not Done Inline ActionsLame. It would be good to fix that some day, but no need to do it now. asomers: Lame. It would be good to fix that some day, but no need to do it now. | |||||
jlduran_gmail.comAuthorUnsubmitted Not Done Inline ActionsAfter your suggestion, I tried to essentially create a table from <netinet/ip_icmp.h>, but given most of those codes are already deprecated, and we are even missing the names for ICMP codes 37 and 38, I gave up. jlduran_gmail.com: After your suggestion, I tried to essentially create a table from `<netinet/ip_icmp.h>`, but… | |||||
oip=echo[sc.IP] | |||||
oicmp=echo[sc.ICMP] | |||||
load=echo[sc.ICMP].payload | |||||
oip[sc.IP].remove_payload() | |||||
oicmp[sc.ICMP].remove_payload() | |||||
oicmp.type=8 | |||||
# As if the original IP packet had these set | |||||
oip.ihl=None | |||||
oip.len=None | |||||
oip.id=1 | |||||
oip.flags=ip.flags | |||||
oip.chksum=None | |||||
oip.options=ip.options | |||||
# Special options | |||||
if special == "tcp": | |||||
oip.proto="tcp" | |||||
tcp=sc.TCP(sport=1234, dport=5678) | |||||
return ip/icmp/oip/tcp | |||||
if special == "udp": | |||||
oip.proto="udp" | |||||
udp=sc.UDP(sport=1234, dport=5678) | |||||
return ip/icmp/oip/udp | |||||
if special == "warp": | |||||
# Build a package with a timestamp of INT_MAX | |||||
# (time-warped package) | |||||
payload_no_timestamp=sc.bytes_hex(load)[16:] | |||||
load=((b"\xff" * 8) + sc.hex_bytes(payload_no_timestamp)) | |||||
if special == "wrong": | |||||
# Build a package with a wrong last byte | |||||
payload_no_last_byte=sc.bytes_hex(load)[:-2] | |||||
load=((sc.hex_bytes(payload_no_last_byte)) + b"\x00") | |||||
if icmp.type in icmp_id_seq_types: | |||||
pkt=ip/icmp/load | |||||
else: | |||||
ip.options="" | |||||
pkt=ip/icmp/oip/oicmp/load | |||||
return pkt | |||||
def generate_ip_options(opts): | |||||
routers=["192.0.2.10", "192.0.2.20", "192.0.2.30", | |||||
"192.0.2.40", "192.0.2.50", "192.0.2.60", | |||||
"192.0.2.70", "192.0.2.80", "192.0.2.90"] | |||||
routers_zero=[0, 0, 0, 0, 0, 0, 0, 0, 0] | |||||
if opts == "EOL": | |||||
options=sc.IPOption(b"\x00") | |||||
elif opts == "NOP": | |||||
options=sc.IPOption(b"\x01") | |||||
elif opts == "NOP-40": | |||||
options=sc.IPOption(b"\x01" * 40) | |||||
elif opts == "RR": | |||||
options=sc.IPOption_RR(pointer=40, routers=routers) | |||||
elif opts == "RR-same": | |||||
options=sc.IPOption_RR(pointer=3, routers=routers_zero) | |||||
elif opts == "RR-trunc": | |||||
options=sc.IPOption_RR(length=7, routers=routers_zero) | |||||
elif opts == "LSRR": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options=sc.IPOption_LSRR(routers=routers) | |||||
elif opts == "LSRR-trunc": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options=sc.IPOption_LSRR(length=3, routers=routers_zero) | |||||
elif opts == "SSRR": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options=sc.IPOption_SSRR(routers=routers) | |||||
elif opts == "SSRR-trunc": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options=sc.IPOption_SSRR(length=3, routers=routers_zero) | |||||
elif opts == "unk": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options=sc.IPOption(b"\x9f") | |||||
elif opts == "unk-40": | |||||
subprocess.run(["sysctl", "net.inet.ip.process_options=0"], check=True) | |||||
options=sc.IPOption(b"\x9f" * 40) | |||||
else: | |||||
options="" | |||||
return options | |||||
def main(): | |||||
"""P I N G E R""" | |||||
args=parse_args() | |||||
opts=generate_ip_options(args.opts) | |||||
ip=sc.IP(flags=args.flags, src=args.dst, dst=args.src, options=opts) | |||||
tun=sc.TunTapInterface(args.iface) | |||||
with open("created_interfaces.lst", "w", encoding="utf-8") as file: | |||||
file.write(args.iface) | |||||
asomersUnsubmitted Done Inline Actions@melifaro note that this line only works because atf-sh runs each test case in its own temporary directory. I don't think the pytest Kyua wrapper does that. asomers: @melifaro note that this line only works because atf-sh runs each test case in its own… | |||||
melifaroUnsubmitted Done Inline Actionsatf-sh is just an utility wrapper that implements kyua/atf protocol. melifaro: atf-sh is just an utility wrapper that implements kyua/atf protocol.
All isolation/priv drops… | |||||
jlduran_gmail.comAuthorUnsubmitted Done Inline ActionsThank you! I'll study those pytests! jlduran_gmail.com: Thank you! I'll study those pytests!
At the moment, I fail to see the advantage of one over the… | |||||
jlduran_gmail.comAuthorUnsubmitted Done Inline ActionsFor self reference: D31084 (should become a wiki!) jlduran_gmail.com: For self reference: D31084 (should become a wiki!) | |||||
subprocess.run(["ifconfig", tun.iface, "up"], check=True) | |||||
subprocess.run(["ifconfig", tun.iface, args.src, args.dst], check=True) | |||||
command=["/sbin/ping", "-c", str(args.count), "-t", str(args.count), "-v"] | |||||
if args.request == "mask": | |||||
command+=["-Mm"] | |||||
if args.request == "timestamp": | |||||
command+=["-Mt"] | |||||
if args.special != "": | |||||
command+=["-p1"] | |||||
if args.opts in routing_options: | |||||
command+=["-R"] | |||||
command+=[args.dst] | |||||
with subprocess.Popen( | |||||
args=command, | |||||
text=True | |||||
) as ping: | |||||
for dummy in range(args.count): | |||||
echo=tun.recv() | |||||
icmp=sc.ICMP(type=args.icmp_type, code=args.icmp_code, | |||||
id=echo[sc.ICMP].id, seq=echo[sc.ICMP].seq, | |||||
ts_ori=args.icmp_otime, ts_rx=args.icmp_rtime, | |||||
ts_tx=args.icmp_ttime, gw=args.icmp_gwaddr, | |||||
ptr=args.icmp_pptr, | |||||
addr_mask=args.icmp_mask, nexthopmtu=args.icmp_nextmtu) | |||||
pkt=construct_response_packet(echo, ip, icmp, args.special) | |||||
tun.send(pkt) | |||||
if args.dup is True: | |||||
tun.send(pkt) | |||||
ping.communicate() | |||||
sys.exit(ping.returncode) | |||||
if __name__ == "__main__": | |||||
main() |
s/python/python3/