diff --git a/tests/sys/netpfil/pf/nat64.py b/tests/sys/netpfil/pf/nat64.py --- a/tests/sys/netpfil/pf/nat64.py +++ b/tests/sys/netpfil/pf/nat64.py @@ -28,19 +28,38 @@ import selectors import socket import sys +import threading +import time from atf_python.sys.net.tools import ToolsHelper from atf_python.sys.net.vnet import VnetTestTemplate +class DelayedSend(threading.Thread): + def __init__(self, packet): + threading.Thread.__init__(self) + self._packet = packet + + self.start() + + def run(self): + import scapy.all as sp + time.sleep(1) + sp.send(self._packet) + class TestNAT64(VnetTestTemplate): REQUIRED_MODULES = [ "pf" ] TOPOLOGY = { "vnet1": {"ifaces": ["if1"]}, "vnet2": {"ifaces": ["if1", "if2"]}, - "vnet3": {"ifaces": ["if2"]}, + "vnet3": {"ifaces": ["if2", "if3"]}, + "vnet4": {"ifaces": ["if3"]}, "if1": {"prefixes6": [("2001:db8::2/64", "2001:db8::1/64")]}, "if2": {"prefixes4": [("192.0.2.1/24", "192.0.2.2/24")]}, + "if3": {"prefixes4": [("198.51.100.1/24", "198.51.100.2/24")]} } + def vnet4_handler(self, vnet): + ToolsHelper.print_output("/sbin/route add default 198.51.100.1") + def vnet3_handler(self, vnet): ToolsHelper.print_output("/sbin/sysctl net.inet.ip.forwarding=1") ToolsHelper.print_output("/sbin/sysctl net.inet.ip.ttl=62") @@ -155,7 +174,7 @@ import scapy.all as sp - packet = sp.IPv6(dst="64:ff9b::198.51.100.3") \ + packet = sp.IPv6(dst="64:ff9b::203.0.113.2") \ / sp.UDP(dport=1222) / sp.Raw("bar") reply = sp.sr1(packet, timeout=3) print(reply.show()) @@ -193,3 +212,74 @@ udp = reply.getlayer(sp.UDP) assert udp assert udp.chksum != 0 + + def common_test_source_addr(self, packet): + vnet = self.vnet_map["vnet1"] + sendif = vnet.iface_alias_map["if1"].name + + import scapy.all as sp + + print("Outbound:\n") + packet.show() + + s = DelayedSend(packet) + + # We expect an ICMPv6 error here, where we'll verify the source address of + # the outer packet + packets = sp.sniff(iface=sendif, timeout=5) + + for reply in packets: + print("Reply:\n") + reply.show() + icmp = reply.getlayer(sp.ICMPv6TimeExceeded) + if not icmp: + continue + + ip = reply.getlayer(sp.IPv6) + assert icmp + assert ip.src == "64:ff9b::c000:202" + return + + # If we don't find the packet we expect to see + assert False + + @pytest.mark.require_user("root") + @pytest.mark.require_progs(["scapy"]) + def test_source_addr_tcp(self): + ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + import scapy.all as sp + + packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \ + / sp.TCP(sport=1111, dport=2222, flags="S") + self.common_test_source_addr(packet) + + @pytest.mark.require_user("root") + @pytest.mark.require_progs(["scapy"]) + def test_source_addr_udp(self): + ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + import scapy.all as sp + + packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \ + / sp.UDP(sport=1111, dport=2222) / sp.Raw("foo") + self.common_test_source_addr(packet) + + @pytest.mark.require_user("root") + @pytest.mark.require_progs(["scapy"]) + def test_source_addr_sctp(self): + ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + import scapy.all as sp + + packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \ + / sp.SCTP(sport=1111, dport=2222) \ + / sp.SCTPChunkInit(init_tag=1, n_in_streams=1, n_out_streams=1, a_rwnd=1500) + self.common_test_source_addr(packet) + + @pytest.mark.require_user("root") + @pytest.mark.require_progs(["scapy"]) + def test_source_addr_icmp(self): + ToolsHelper.print_output("/sbin/route -6 add default 2001:db8::1") + import scapy.all as sp + + packet = sp.IPv6(dst="64:ff9b::198.51.100.2", hlim=1) \ + / sp.ICMPv6EchoRequest() / sp.Raw("foo") + self.common_test_source_addr(packet)