Create a new IPv6 netisr which expects the frames to have been verified;
it just directly dispatches to the IPv6 input path.
Details
I slightly modified rss-udp-srv [1] to count the flowid of each packet:
diff --git a/rss-udp-srv/udp_srv.c b/rss-udp-srv/udp_srv.c index 8f49f2a..ad75361 100644 --- a/rss-udp-srv/udp_srv.c +++ b/rss-udp-srv/udp_srv.c @@ -1,5 +1,6 @@ #include <stdio.h> #include <stdlib.h> +#include <assert.h> #include <unistd.h> #include <string.h> #include <err.h> @@ -43,6 +44,14 @@ struct udp_srv_thread { struct event *ev_read6, *ev_write6; }; +struct flowid_node { + uint32_t flowid; + int64_t count; +}; + +#define NR_FLOWIDS 10 +struct flowid_node flowids[NR_FLOWIDS]; + static int thr_sock_set_reuseaddr(int fd, int reuse_addr) { @@ -96,7 +105,7 @@ thr_rss_udp_listen_sock_setup(int fd, int af_family, int rss_bucket) return (-1); } -#if 0 +#if 1 if (rss_sock_set_recvrss(fd, af_family, rss_bucket) < 0) { return (-1); } @@ -208,12 +217,13 @@ error: } static void -thr_parse_msghdr(struct msghdr *m) +thr_parse_msghdr(struct msghdr *m, int ver) { const struct cmsghdr *c; uint32_t flowid; uint32_t flowtype; uint32_t flow_rssbucket; + int i; for (c = CMSG_FIRSTHDR(m); c != NULL; c = CMSG_NXTHDR(m, c)) { #if 0 @@ -221,28 +231,72 @@ thr_parse_msghdr(struct msghdr *m) printf(" msghdr type: %d\n", c->cmsg_type); printf(" msghdr len: %d\n", c->cmsg_len); #endif - if (c->cmsg_level != IPPROTO_IP) - continue; - switch (c->cmsg_type) { - case IP_FLOWID: - flowid = *(uint32_t *) CMSG_DATA(c); - break; - case IP_FLOWTYPE: - flowtype = *(uint32_t *) CMSG_DATA(c); - break; - case IP_RSSBUCKETID: - flow_rssbucket = *(uint32_t *) CMSG_DATA(c); - break; + if (ver == 4) { + if (c->cmsg_level != IPPROTO_IP) + continue; + switch (c->cmsg_type) { + case IP_FLOWID: + flowid = *(uint32_t *) CMSG_DATA(c); + break; + case IP_FLOWTYPE: + flowtype = *(uint32_t *) CMSG_DATA(c); + break; + case IP_RSSBUCKETID: + flow_rssbucket = *(uint32_t *) CMSG_DATA(c); + break; + } + } else { + if (c->cmsg_level != IPPROTO_IPV6) + continue; + switch (c->cmsg_type) { + case IPV6_FLOWID: + flowid = *(uint32_t *) CMSG_DATA(c); + break; + case IPV6_FLOWTYPE: + flowtype = *(uint32_t *) CMSG_DATA(c); + 
break; + case IPV6_RSSBUCKETID: + flow_rssbucket = *(uint32_t *) CMSG_DATA(c); + break; + } + } + } + + if (ver == 4) { + if (flowtype != 7) { + printf("flowtype=%d\n", flowtype); + exit(127); + } + } else { + if (flowtype != 9) { + printf("flowtype=%d\n", flowtype); + exit(127); } } #if 0 - printf(" flowid=0x%08x; flowtype=%d; bucket=%d\n", flowid, flowtype, flow_rssbucket); + printf(" flowid=0x%08x; flowtype=%d; bucket=%d; version=%d\n", flowid, flowtype, flow_rssbucket, ver); #endif + + for (i = 0; i < NR_FLOWIDS && flowids[i].count != -1; i++) { + if (flowid == flowids[i].flowid) { + flowids[i].count++; + break; + } + } + if (i == NR_FLOWIDS) { + fprintf(stderr, "Too many flowids appeared\n"); + exit(127); + } + if (flowids[i].count == -1) { + flowids[i].flowid = flowid; + flowids[i].count = 1; + } } static void thr_ev_timer(int fd, short what, void *arg) { +#if 0 struct udp_srv_thread *th = arg; struct timeval tv; @@ -260,11 +314,12 @@ thr_ev_timer(int fd, short what, void *arg) tv.tv_sec = 1; tv.tv_usec = 0; evtimer_add(th->ev_timer, &tv); +#endif } static void -thr_udp_ev_read(int fd, short what, void *arg) +thr_udp_ev_read(int fd, short what, void *arg, int ver) { struct udp_srv_thread *th = arg; /* XXX should be thread-local, and a larger buffer, and likely a queue .. 
*/ @@ -274,7 +329,7 @@ thr_udp_ev_read(int fd, short what, void *arg) struct sockaddr_storage sin; socklen_t sin_len; -#if 0 +#if 1 /* for the msghdr contents */ struct msghdr m; char msgbuf[2048]; @@ -286,7 +341,7 @@ thr_udp_ev_read(int fd, short what, void *arg) /* Loop read UDP frames until EWOULDBLOCK or 1024 frames */ while (i < 10240) { -#if 0 +#if 1 iov[0].iov_base = buf; iov[0].iov_len = 2048; @@ -299,22 +354,24 @@ thr_udp_ev_read(int fd, short what, void *arg) m.msg_flags = 0; ret = recvmsg(fd, &m, 0); -#endif +#else sin_len = sizeof(sin); ret = recvfrom(fd, buf, 2048, MSG_DONTWAIT, (struct sockaddr *) &sin, &sin_len); +#endif if (ret <= 0) { if (errno != EWOULDBLOCK) warn("%s: recv", __func__); break; } -#if 0 - printf(" recv: len=%d, controllen=%d\n", - (int) ret, - (int) m.msg_controllen); - thr_parse_msghdr(&m); +#if 1 + //printf(" recv: len=%d, controllen=%d\n", + // (int) ret, + // (int) m.msg_controllen); + thr_parse_msghdr(&m, ver); + //printf("%s\n", buf); #endif i++; th->recv_pkts++; @@ -336,9 +393,15 @@ thr_udp_ev_read(int fd, short what, void *arg) } static void +thr_udp_ev_read4(int fd, short what, void *arg) +{ + thr_udp_ev_read(fd, what, arg, 4); +} + +static void thr_udp_ev_read6(int fd, short what, void *arg) { - thr_udp_ev_read(fd, what, arg); + thr_udp_ev_read(fd, what, arg, 6); } static void * @@ -388,7 +451,7 @@ thr_udp_srv_init(void *arg) /* Create read and write readiness events */ th->ev_read = event_new(th->b, th->s4, EV_READ | EV_PERSIST, - thr_udp_ev_read, th); + thr_udp_ev_read4, th); event_add(th->ev_read, NULL); th->ev_read6 = event_new(th->b, th->s6, EV_READ | EV_PERSIST, @@ -428,7 +491,7 @@ main(int argc, char *argv[]) struct in6_addr lcl6_addr; int do_response; - if (argc < 3) { + if (argc < 2) { printf("Usage: %s <response> <ipv4 lcl address>\n", argv[0]); printf(" response: 1 if each RX packet generates a TX response, else 0\n"); printf(" ipv4 lcl address: IPv4 local address to bind to\n"); @@ -438,7 +501,7 @@ main(int 
argc, char *argv[]) lcl_addr.s_addr = INADDR_ANY; lcl6_addr = in6addr_any; do_response = atoi(argv[1]); - (void) inet_aton(argv[2], &lcl_addr); + //(void) inet_aton(argv[2], &lcl_addr); ncpu = rss_getsysctlint("net.inet.rss.ncpus"); if (ncpu < 0) { @@ -488,6 +551,11 @@ main(int argc, char *argv[]) if (sigemptyset(&sa.sa_mask) == -1 || sigaction(SIGPIPE, &sa, 0) == -1) perror("failed to ignore SIGPIPE; sigaction"); + for (i = 0; i < NR_FLOWIDS; i++) { + flowids[i].flowid = -1; + flowids[i].count = -1; + } + for (i = 0; i < nbuckets; i++) { th[i].tid = i; th[i].rss_bucket = i; @@ -504,6 +572,23 @@ main(int argc, char *argv[]) (void) pthread_create(&th[i].thr, NULL, thr_udp_srv_init, &th[i]); } +#if 0 + while (1) { + for (i = 0; i < nbuckets; i++) { + printf("bucket%d=%lu ", i, th[i].recv_pkts); + } + printf("\n"); + sleep(1); + } +#endif + while (1) { + for (i = 0; i < NR_FLOWIDS && flowids[i].count != -1; i++) + printf("count(flowid=%08x)=%ld ", flowids[i].flowid, + flowids[i].count); + printf("\n"); + sleep(1); + } + /* Wait */ for (i = 0; i < nbuckets; i++) { (void) pthread_join(th[i].thr, NULL);
And I enabled UDP 4-tuple hashing to make sure the RSS hash is
recalculated with the type RSS_HASHTYPE_RSS_UDP_IPV4 or
RSS_HASHTYPE_RSS_UDP_IPV6 by np_m2cpuid (for testing purposes).
diff --git a/sys/net/rss_config.c b/sys/net/rss_config.c index e7e8eb4..320ed9b 100644 --- a/sys/net/rss_config.c +++ b/sys/net/rss_config.c @@ -483,14 +483,14 @@ rss_gethashconfig(void) return ( RSS_HASHTYPE_RSS_IPV4 | RSS_HASHTYPE_RSS_TCP_IPV4 + | RSS_HASHTYPE_RSS_UDP_IPV4 | RSS_HASHTYPE_RSS_IPV6 | RSS_HASHTYPE_RSS_TCP_IPV6 + | RSS_HASHTYPE_RSS_UDP_IPV6 | RSS_HASHTYPE_RSS_IPV6_EX | RSS_HASHTYPE_RSS_TCP_IPV6_EX #if 0 - | RSS_HASHTYPE_RSS_UDP_IPV4 | RSS_HASHTYPE_RSS_UDP_IPV4_EX - | RSS_HASHTYPE_RSS_UDP_IPV6 | RSS_HASHTYPE_RSS_UDP_IPV6_EX #endif );
And I patched udp6_input() to be able to handle "atomic" fragments
correctly:
diff --git a/sys/netinet6/udp6_usrreq.c b/sys/netinet6/udp6_usrreq.c index 98790a8..da72f00 100644 --- a/sys/netinet6/udp6_usrreq.c +++ b/sys/netinet6/udp6_usrreq.c @@ -207,7 +207,7 @@ udp6_input(struct mbuf **mp, int *offp, int proto) struct sockaddr_in6 fromsa; struct m_tag *fwd_tag; uint16_t uh_sum; - uint8_t nxt; + uint8_t nxt = proto; ifp = m->m_pkthdr.rcvif; ip6 = mtod(m, struct ip6_hdr *); @@ -233,7 +233,6 @@ udp6_input(struct mbuf **mp, int *offp, int proto) plen = ntohs(ip6->ip6_plen) - off + sizeof(*ip6); ulen = ntohs((u_short)uh->uh_ulen); - nxt = ip6->ip6_nxt; cscov_partial = (nxt == IPPROTO_UDPLITE) ? 1 : 0; if (nxt == IPPROTO_UDPLITE) { /* Zero means checksum over the complete packet. */
Then I used pktgen [2] to generate UDP packets whose UDP payload
length varies from 100 to 10000 bytes (packets are
fragmented when the payload exceeds 800 bytes), while keeping the 4-tuple
(saddr, sport, daddr, dport) constant, and injected them into tap(4).
So the flowid of each packet received by rss-udp-srv should be the
same even if the packet is fragmented.
The script that is used to execute pktgen automatically:
% cat auto #!/bin/sh sudo ./pktgen -i tap0 -6 -n 10000 -l 100 # No fragment, 10000 packets sudo ./pktgen -i tap0 -6 -n 10000 -l 100 -f # atomic fragment sudo ./pktgen -i tap0 -6 -n 10000 -l 1000 # 2 fragments sudo ./pktgen -i tap0 -6 -n 10000 -l 10000 # 13 fragments
The outputs of netstat(1):
% netstat -s -p ip6 ip6: 170000 total packets received 0 with size smaller than minimum 0 with data size < data length 0 with bad options 0 with incorrect version number 160000 fragments received 0 fragments dropped (dup or out of space) 0 fragments dropped after timeout 0 fragments that exceeded limit 30000 packets reassembled ok 170000 packets for this host 0 packets forwarded 0 packets not forwardable 0 redirects sent 11 packets sent from this host 0 packets sent with fabricated ip header 0 output packets dropped due to no bufs, etc. 0 output packets discarded due to no route 0 output datagrams fragmented 0 fragments created 0 datagrams that can't be fragmented 0 packets that violated scope rules 0 multicast packets which we don't join Input histogram: UDP: 10000 fragment: 160000 Mbuf statistics: 10000 one mbuf 160000 one ext mbuf 0 two or more ext mbuf 0 packets whose headers are not contiguous 0 tunneling packets that can't find gif 0 packets discarded because of too many headers 1 failure of source address selection source addresses on a non-outgoing I/F 1 addresses scope=%x Source addresses selection rule applied: 1 same address
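160000 fragments were received and 30000 packets were reassembled successfully
(10000 atomic fragments + 2 × 10000 + 13 × 10000 = 160000 fragments, and one
reassembled packet per datagram in the three fragmented runs: 3 × 10000 = 30000).
This is the expected result.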
The outputs of rss-udp-srv:
% ./rss-udp-srv 0 starting: tid=0, rss_bucket=0, cpuid=0 starting: tid=1, rss_bucket=1, cpuid=1 starting: tid=2, rss_bucket=2, cpuid=2 [1] th=0x801615480 [2] th=0x801615500 starting: tid=3, rss_bucket=3, cpuid=3 starting: tid=4, rss_bucket=4, cpuid=0 starting: tid=5, rss_bucket=5, cpuid=1 starting: tid=6, rss_bucket=6, cpuid=2 starting: tid=7, rss_bucket=7, cpuid=3 [6] th=0x801615700 [3] th=0x801615580 [7] th=0x801615780 [0] th=0x801615400 [4] th=0x801615600 [5] th=0x801615680 count(flowid=6e0f86ef)=10000 count(flowid=6e0f86ef)=20000 count(flowid=6e0f86ef)=26160 count(flowid=6e0f86ef)=30000 count(flowid=6e0f86ef)=38865 count(flowid=6e0f86ef)=40000 count(flowid=6e0f86ef)=40000 ^C
40000 UDP packets were received (10000 from each of the four pktgen runs), and
their flowids were all the same. This is the expected result.
So ip6_direct worked correctly!
[1] https://github.com/btw616/ip6_direct_test/tree/master/rss-udp-srv
[2] https://github.com/btw616/ip6_direct_test/tree/master/pktgen
Diff Detail
- Repository
- rS FreeBSD src repository - subversion
- Lint
Lint Passed - Unit
No Test Coverage - Build Status
Buildable 725 Build 725: arc lint + arc unit
Event Timeline
sys/netinet6/frag6.c | ||
---|---|---|
605 | Hi, this bit is fixing a separate bug, right? Are we able to commit just this bit first as a separate fix? |
sys/netinet6/frag6.c | ||
---|---|---|
605 | That bit is not fixing a separate bug. Previously, after the processing, the code did: /* * Tell launch routine the next header */ *mp = m; *offp = offset; IP6Q_UNLOCK(); return nxt; and the processing of the next header was then invoked by the following loop: while (nxt != IPPROTO_DONE) { ...... nxt = (*inet6sw[ip6_protox[nxt]].pr_input)(&m, &off, nxt); } But now, the processing of the remaining headers (headers after the fragment PS. I'm not quite sure whether it is the best choice to save the protocol Many thanks! ^_^ |
Fix the bug related to atomic fragments in the previous patch
Atomic fragments should also be dispatched to ip6_direct.
sys/netinet6/ip6_input.c | ||
---|---|---|
793 | Previously, M_RTALERT_MLD was set only when this was an ICMPv6 packet. |
sys/netinet6/frag6.c | ||
---|---|---|
594 | You should just define it as a struct with two uint32_t members. That way (a) it doesn't change size on different architectures (e.g. assuming int is a fixed 32-bit size isn't valid!) and (b) it's less error-prone to use a struct with named fields. |
sys/netinet6/ip6_input.c | ||
---|---|---|
437 | ok, so what about for direct-dispatch v6 that isn't fragment related? eg, IPSEC tunnel decap, GRE or IPIP tunnel decap, etc? Would there be a hop-by-hop parsing mbuf tag in that instance? Maybe we should handle that case by just assuming it's a fully formed v6 packet that we're not doing incremental parsing and to just reinject it? Would that even work? |
sys/netinet6/ip6_input.c | ||
---|---|---|
437 | I'm not very familiar with these things, but from reading their code, I think they are different from fragments. Take GRE as an example: when a GRE packet is received, gre_input() will be called: struct protosw in6_gre_protosw = { ...... .pr_protocol = IPPROTO_GRE, ...... .pr_input = gre_input, ...... }; int gre_input(struct mbuf **mp, int *offp, int proto) { ...... gh = (struct grehdr *)mtodo(m, *offp); ...... hlen = 2 * sizeof(uint16_t); if (flags & GRE_FLAGS_CP) { ...... hlen += 2 * sizeof(uint16_t); ...... } ...... switch (ntohs(gh->gre_proto)) { ...... case ETHERTYPE_IP: isr = NETISR_IP; af = AF_INET; break; case ETHERTYPE_IPV6: isr = NETISR_IPV6; af = AF_INET6; break; default: goto drop; } m_adj(m, *offp + hlen); ...... if ((ifp->if_flags & IFF_MONITOR) != 0) m_freem(m); else netisr_dispatch(isr, m); ...... } When GRE parsing is done, the GRE header is trimmed, and the rest is a new IP or IPv6 packet, which gre_input() will (and should) dispatch to ip_input() or ip6_input(). |
Right - so it may have a new src/dst L3/L4 which means it needs to go back through the stack.
So as long as it's going back through and getting re-hashed on its trip through, I think we're okay.
How about adding a counter for when we drop frames on that direct dispatch v6 netisr so people can see when things are failing? Then I think we'll be ready.
I think we should make sure that the tag is prepended to the mbuf before we reinject it into ip6_direct_input(); that is to say, this tag is mandatory. So I'm wondering whether it would be better to replace the current error handling code with a KASSERT().
sys/netinet6/frag6.c | ||
---|---|---|
591 | I'm wondering whether it is a better choice to replace these error handling codes with a KASSERT(). |
I mean this line!!!
sys/netinet6/ip6_input.c | ||
---|---|---|
436 | I'm wondering whether it is a better choice to replace these error handling codes with a KASSERT(). |
Replace the error handling code in ip6_direct_input() with KASSERT(), because this tag is mandatory.
ok, this looks good. I'll see if I can get gnn / bz to review this before I commit it to -HEAD.
Thanks very much for digging into this!
commit aaa46574b048573f1c81fde37cacc5f6003bcaa9
Author: Adrian Chadd <adrian@FreeBSD.org>
Date: Fri Nov 6 23:07:43 2015 +0000
[netinet6]: Create a new IPv6 netisr which expects the frames to have been verified. This is required for fragments and encapsulated data (eg tunneling) to be redistributed to the RSS bucket based on the eventual IPv6 header and protocol (TCP, UDP, etc) header. * Add an mbuf tag with the state of IPv6 options parsing before the frame is queued into the direct dispatch handler; * Continue processing and complete the frame reception in the correct RSS bucket / netisr context. Testing results are in the phabricator review. Differential Revision: https://reviews.freebsd.org/D3563 Submitted by: Tiwei Bie <btw@mail.ustc.edu.cn>
Notes:
svn path=/head/; revision=290471