Create a new IPv6 netisr which expects the frames to have been verified;
it just directly dispatches to the IPv6 input path.
Details
I slightly modified rss-udp-srv [1] to count the flowid of each packet:
diff --git a/rss-udp-srv/udp_srv.c b/rss-udp-srv/udp_srv.c index 8f49f2a..ad75361 100644 --- a/rss-udp-srv/udp_srv.c +++ b/rss-udp-srv/udp_srv.c @@ -1,5 +1,6 @@ #include <stdio.h> #include <stdlib.h> +#include <assert.h> #include <unistd.h> #include <string.h> #include <err.h> @@ -43,6 +44,14 @@ struct udp_srv_thread { struct event *ev_read6, *ev_write6; }; +struct flowid_node { + uint32_t flowid; + int64_t count; +}; + +#define NR_FLOWIDS 10 +struct flowid_node flowids[NR_FLOWIDS]; + static int thr_sock_set_reuseaddr(int fd, int reuse_addr) { @@ -96,7 +105,7 @@ thr_rss_udp_listen_sock_setup(int fd, int af_family, int rss_bucket) return (-1); } -#if 0 +#if 1 if (rss_sock_set_recvrss(fd, af_family, rss_bucket) < 0) { return (-1); } @@ -208,12 +217,13 @@ error: } static void -thr_parse_msghdr(struct msghdr *m) +thr_parse_msghdr(struct msghdr *m, int ver) { const struct cmsghdr *c; uint32_t flowid; uint32_t flowtype; uint32_t flow_rssbucket; + int i; for (c = CMSG_FIRSTHDR(m); c != NULL; c = CMSG_NXTHDR(m, c)) { #if 0 @@ -221,28 +231,72 @@ thr_parse_msghdr(struct msghdr *m) printf(" msghdr type: %d\n", c->cmsg_type); printf(" msghdr len: %d\n", c->cmsg_len); #endif - if (c->cmsg_level != IPPROTO_IP) - continue; - switch (c->cmsg_type) { - case IP_FLOWID: - flowid = *(uint32_t *) CMSG_DATA(c); - break; - case IP_FLOWTYPE: - flowtype = *(uint32_t *) CMSG_DATA(c); - break; - case IP_RSSBUCKETID: - flow_rssbucket = *(uint32_t *) CMSG_DATA(c); - break; + if (ver == 4) { + if (c->cmsg_level != IPPROTO_IP) + continue; + switch (c->cmsg_type) { + case IP_FLOWID: + flowid = *(uint32_t *) CMSG_DATA(c); + break; + case IP_FLOWTYPE: + flowtype = *(uint32_t *) CMSG_DATA(c); + break; + case IP_RSSBUCKETID: + flow_rssbucket = *(uint32_t *) CMSG_DATA(c); + break; + } + } else { + if (c->cmsg_level != IPPROTO_IPV6) + continue; + switch (c->cmsg_type) { + case IPV6_FLOWID: + flowid = *(uint32_t *) CMSG_DATA(c); + break; + case IPV6_FLOWTYPE: + flowtype = *(uint32_t *) CMSG_DATA(c); + break; + case IPV6_RSSBUCKETID: + flow_rssbucket = *(uint32_t *) CMSG_DATA(c); + break; + } + } + } + + if (ver == 4) { + if (flowtype != 7) { + printf("flowtype=%d\n", flowtype); + exit(127); + } + } else { + if (flowtype != 9) { + printf("flowtype=%d\n", flowtype); + exit(127); } } #if 0 - printf(" flowid=0x%08x; flowtype=%d; bucket=%d\n", flowid, flowtype, flow_rssbucket); + printf(" flowid=0x%08x; flowtype=%d; bucket=%d; version=%d\n", flowid, flowtype, flow_rssbucket, ver); #endif + + for (i = 0; i < NR_FLOWIDS && flowids[i].count != -1; i++) { + if (flowid == flowids[i].flowid) { + flowids[i].count++; + break; + } + } + if (i == NR_FLOWIDS) { + fprintf(stderr, "Too many flowids appeared\n"); + exit(127); + } + if (flowids[i].count == -1) { + flowids[i].flowid = flowid; + flowids[i].count = 1; + } } static void thr_ev_timer(int fd, short what, void *arg) { +#if 0 struct udp_srv_thread *th = arg; struct timeval tv; @@ -260,11 +314,12 @@ thr_ev_timer(int fd, short what, void *arg) tv.tv_sec = 1; tv.tv_usec = 0; evtimer_add(th->ev_timer, &tv); +#endif } static void -thr_udp_ev_read(int fd, short what, void *arg) +thr_udp_ev_read(int fd, short what, void *arg, int ver) { struct udp_srv_thread *th = arg; /* XXX should be thread-local, and a larger buffer, and likely a queue .. */ @@ -274,7 +329,7 @@ thr_udp_ev_read(int fd, short what, void *arg) struct sockaddr_storage sin; socklen_t sin_len; -#if 0 +#if 1 /* for the msghdr contents */ struct msghdr m; char msgbuf[2048]; @@ -286,7 +341,7 @@ thr_udp_ev_read(int fd, short what, void *arg) /* Loop read UDP frames until EWOULDBLOCK or 1024 frames */ while (i < 10240) { -#if 0 +#if 1 iov[0].iov_base = buf; iov[0].iov_len = 2048; @@ -299,22 +354,24 @@ thr_udp_ev_read(int fd, short what, void *arg) m.msg_flags = 0; ret = recvmsg(fd, &m, 0); -#endif +#else sin_len = sizeof(sin); ret = recvfrom(fd, buf, 2048, MSG_DONTWAIT, (struct sockaddr *) &sin, &sin_len); +#endif if (ret <= 0) { if (errno != EWOULDBLOCK) warn("%s: recv", __func__); break; } -#if 0 - printf(" recv: len=%d, controllen=%d\n", - (int) ret, - (int) m.msg_controllen); - thr_parse_msghdr(&m); +#if 1 + //printf(" recv: len=%d, controllen=%d\n", + // (int) ret, + // (int) m.msg_controllen); + thr_parse_msghdr(&m, ver); + //printf("%s\n", buf); #endif i++; th->recv_pkts++; @@ -336,9 +393,15 @@ thr_udp_ev_read(int fd, short what, void *arg) } static void +thr_udp_ev_read4(int fd, short what, void *arg) +{ + thr_udp_ev_read(fd, what, arg, 4); +} + +static void thr_udp_ev_read6(int fd, short what, void *arg) { - thr_udp_ev_read(fd, what, arg); + thr_udp_ev_read(fd, what, arg, 6); } static void * @@ -388,7 +451,7 @@ thr_udp_srv_init(void *arg) /* Create read and write readiness events */ th->ev_read = event_new(th->b, th->s4, EV_READ | EV_PERSIST, - thr_udp_ev_read, th); + thr_udp_ev_read4, th); event_add(th->ev_read, NULL); th->ev_read6 = event_new(th->b, th->s6, EV_READ | EV_PERSIST, @@ -428,7 +491,7 @@ main(int argc, char *argv[]) struct in6_addr lcl6_addr; int do_response; - if (argc < 3) { + if (argc < 2) { printf("Usage: %s <response> <ipv4 lcl address>\n", argv[0]); printf(" response: 1 if each RX packet generates a TX response, else 0\n"); printf(" ipv4 lcl address: IPv4 local address to bind to\n"); @@ -438,7 +501,7 @@ main(int argc, char *argv[]) lcl_addr.s_addr = INADDR_ANY; lcl6_addr = in6addr_any; do_response = atoi(argv[1]); - (void) inet_aton(argv[2], &lcl_addr); + //(void) inet_aton(argv[2], &lcl_addr); ncpu = rss_getsysctlint("net.inet.rss.ncpus"); if (ncpu < 0) { @@ -488,6 +551,11 @@ main(int argc, char *argv[]) if (sigemptyset(&sa.sa_mask) == -1 || sigaction(SIGPIPE, &sa, 0) == -1) perror("failed to ignore SIGPIPE; sigaction"); + for (i = 0; i < NR_FLOWIDS; i++) { + flowids[i].flowid = -1; + flowids[i].count = -1; + } + for (i = 0; i < nbuckets; i++) { th[i].tid = i; th[i].rss_bucket = i; @@ -504,6 +572,23 @@ main(int argc, char *argv[]) (void) pthread_create(&th[i].thr, NULL, thr_udp_srv_init, &th[i]); } +#if 0 + while (1) { + for (i = 0; i < nbuckets; i++) { + printf("bucket%d=%lu ", i, th[i].recv_pkts); + } + printf("\n"); + sleep(1); + } +#endif + while (1) { + for (i = 0; i < NR_FLOWIDS && flowids[i].count != -1; i++) + printf("count(flowid=%08x)=%ld ", flowids[i].flowid, + flowids[i].count); + printf("\n"); + sleep(1); + } + /* Wait */ for (i = 0; i < nbuckets; i++) { (void) pthread_join(th[i].thr, NULL);
And I enabled UDP 4-tuple hashing to make sure the RSS hash is
recalculated as the type of RSS_HASHTYPE_RSS_UDP_IPV4 or
RSS_HASHTYPE_RSS_UDP_IPV6 by np_m2cpuid (for testing purpose).
diff --git a/sys/net/rss_config.c b/sys/net/rss_config.c index e7e8eb4..320ed9b 100644 --- a/sys/net/rss_config.c +++ b/sys/net/rss_config.c @@ -483,14 +483,14 @@ rss_gethashconfig(void) return ( RSS_HASHTYPE_RSS_IPV4 | RSS_HASHTYPE_RSS_TCP_IPV4 + | RSS_HASHTYPE_RSS_UDP_IPV4 | RSS_HASHTYPE_RSS_IPV6 | RSS_HASHTYPE_RSS_TCP_IPV6 + | RSS_HASHTYPE_RSS_UDP_IPV6 | RSS_HASHTYPE_RSS_IPV6_EX | RSS_HASHTYPE_RSS_TCP_IPV6_EX #if 0 - | RSS_HASHTYPE_RSS_UDP_IPV4 | RSS_HASHTYPE_RSS_UDP_IPV4_EX - | RSS_HASHTYPE_RSS_UDP_IPV6 | RSS_HASHTYPE_RSS_UDP_IPV6_EX #endif );
And I patched udp6_input() to be able to handle "atomic" fragments
correctly:
diff --git a/sys/netinet6/udp6_usrreq.c b/sys/netinet6/udp6_usrreq.c index 98790a8..da72f00 100644 --- a/sys/netinet6/udp6_usrreq.c +++ b/sys/netinet6/udp6_usrreq.c @@ -207,7 +207,7 @@ udp6_input(struct mbuf **mp, int *offp, int proto) struct sockaddr_in6 fromsa; struct m_tag *fwd_tag; uint16_t uh_sum; - uint8_t nxt; + uint8_t nxt = proto; ifp = m->m_pkthdr.rcvif; ip6 = mtod(m, struct ip6_hdr *); @@ -233,7 +233,6 @@ udp6_input(struct mbuf **mp, int *offp, int proto) plen = ntohs(ip6->ip6_plen) - off + sizeof(*ip6); ulen = ntohs((u_short)uh->uh_ulen); - nxt = ip6->ip6_nxt; cscov_partial = (nxt == IPPROTO_UDPLITE) ? 1 : 0; if (nxt == IPPROTO_UDPLITE) { /* Zero means checksum over the complete packet. */
Then I use pktgen [2] to generate UDP packets with the UDP payload
length varies in the range of 100 to 10000 bytes (packets will be
fragmented when the payload exceeds 800 bytes), but keep the 4-tuple
(saddr, sport, daddr, dport) consistent, and inject them into tap(4).
So the flowid of each packet received by rss-udp-srv should be the
same even if the packet is fragmented.
The script that is used to execute pktgen automatically:
% cat auto #!/bin/sh sudo ./pktgen -i tap0 -6 -n 10000 -l 100 # No fragment, 10000 packets sudo ./pktgen -i tap0 -6 -n 10000 -l 100 -f # atomic fragment sudo ./pktgen -i tap0 -6 -n 10000 -l 1000 # 2 fragments sudo ./pktgen -i tap0 -6 -n 10000 -l 10000 # 13 fragments
The outputs of netstat(1):
% netstat -s -p ip6 ip6: 170000 total packets received 0 with size smaller than minimum 0 with data size < data length 0 with bad options 0 with incorrect version number 160000 fragments received 0 fragments dropped (dup or out of space) 0 fragments dropped after timeout 0 fragments that exceeded limit 30000 packets reassembled ok 170000 packets for this host 0 packets forwarded 0 packets not forwardable 0 redirects sent 11 packets sent from this host 0 packets sent with fabricated ip header 0 output packets dropped due to no bufs, etc. 0 output packets discarded due to no route 0 output datagrams fragmented 0 fragments created 0 datagrams that can't be fragmented 0 packets that violated scope rules 0 multicast packets which we don't join Input histogram: UDP: 10000 fragment: 160000 Mbuf statistics: 10000 one mbuf 160000 one ext mbuf 0 two or more ext mbuf 0 packets whose headers are not contiguous 0 tunneling packets that can't find gif 0 packets discarded because of too many headers 1 failure of source address selection source addresses on a non-outgoing I/F 1 addresses scope=%x Source addresses selection rule applied: 1 same address
160000 fragments were received, 30000 packets are reassembled ok.
It is the expected result.
The outputs of rss-udp-srv:
% ./rss-udp-srv 0 starting: tid=0, rss_bucket=0, cpuid=0 starting: tid=1, rss_bucket=1, cpuid=1 starting: tid=2, rss_bucket=2, cpuid=2 [1] th=0x801615480 [2] th=0x801615500 starting: tid=3, rss_bucket=3, cpuid=3 starting: tid=4, rss_bucket=4, cpuid=0 starting: tid=5, rss_bucket=5, cpuid=1 starting: tid=6, rss_bucket=6, cpuid=2 starting: tid=7, rss_bucket=7, cpuid=3 [6] th=0x801615700 [3] th=0x801615580 [7] th=0x801615780 [0] th=0x801615400 [4] th=0x801615600 [5] th=0x801615680 count(flowid=6e0f86ef)=10000 count(flowid=6e0f86ef)=20000 count(flowid=6e0f86ef)=26160 count(flowid=6e0f86ef)=30000 count(flowid=6e0f86ef)=38865 count(flowid=6e0f86ef)=40000 count(flowid=6e0f86ef)=40000 ^C
40000 UDP packets were received, and their flowids were all the same.
It is the expected result.
So ip6_direct worked correctly!
[1] https://github.com/btw616/ip6_direct_test/tree/master/rss-udp-srv
[2] https://github.com/btw616/ip6_direct_test/tree/master/pktgen
Diff Detail
- Repository
- rS FreeBSD src repository - subversion
- Lint
Lint Passed - Unit
No Test Coverage - Build Status
Buildable 725 Build 725: arc lint + arc unit
Event Timeline
sys/netinet6/frag6.c | ||
---|---|---|
605 | Hi, This bit is fixing a separate bug, right? Are we able to commit just this bit first as a separate fix? |
sys/netinet6/frag6.c | ||
---|---|---|
605 | That bit is not fixing a separate bug. Previously, after the processing /* * Tell launch routine the next header */ *mp = m; *offp = offset; IP6Q_UNLOCK(); return nxt; And the processing of the next header will be invoked by the following while (nxt != IPPROTO_DONE) { ...... nxt = (*inet6sw[ip6_protox[nxt]].pr_input)(&m, &off, nxt); } But now, the processing of the rest headers (headers after the fragment PS. I'm not pretty sure whether it is the best choice to save the protocol Many thanks! ^_^ |
Fix the bug related to atomic fragments in the previous patch
Atomic fragments should also be dispatched to ip6_direct.
sys/netinet6/ip6_input.c | ||
---|---|---|
793 | Previously, M_RTALERT_MLD will be set only when this is an ICMPV6 packet. |
sys/netinet6/frag6.c | ||
---|---|---|
594 | You should just define it as a struct with two uint32_t members. That way (a) it doesn't change size based on different architectures (eg assuming int is a fixed 32 bit size isn't valid!) and (b) it's less error prone to use a struct with named fields. |
sys/netinet6/ip6_input.c | ||
---|---|---|
437 | ok, so what about for direct-dispatch v6 that isn't fragment related? eg, IPSEC tunnel decap, GRE or IPIP tunnel decap, etc? Would there be a hop-by-hop parsing mbuf tag in that instance? Maybe we should handle that case by just assuming it's a fully formed v6 packet that we're not doing incremental parsing and to just reinject it? Would that even work? |
sys/netinet6/ip6_input.c | ||
---|---|---|
437 | I'm not pretty familiar with these things. But by reading their codes, I think they are something different from fragment. Take GRE as an example: When a GRE packet is received, gre_input will be called: struct protosw in6_gre_protosw = { ...... .pr_protocol = IPPROTO_GRE, ...... .pr_input = gre_input, ...... }; int gre_input(struct mbuf **mp, int *offp, int proto) { ...... gh = (struct grehdr *)mtodo(m, *offp); ...... hlen = 2 * sizeof(uint16_t); if (flags & GRE_FLAGS_CP) { ...... hlen += 2 * sizeof(uint16_t); ...... } ...... switch (ntohs(gh->gre_proto)) { ...... case ETHERTYPE_IP: isr = NETISR_IP; af = AF_INET; break; case ETHERTYPE_IPV6: isr = NETISR_IPV6; af = AF_INET6; break; default: goto drop; } m_adj(m, *offp + hlen); ...... if ((ifp->if_flags & IFF_MONITOR) != 0) m_freem(m); else netisr_dispatch(isr, m); ...... } When the GRE parsing is done, the GRE header will be trimmed. And the rest is a new ip or ip6 packet. And gre_input() will and should dispatch it to ip_input() or ip6_input(). |
Right - so it may have a new src/dst L3/L4 which means it needs to go back through the stack.
So as long as it's going back through and getting re-hashed on its trip though, I think we're okay.
How about adding a counter for when we drop frames on that direct dispatch v6 netisr so people can see when things are failing? Then I think we'll be ready.
I think we should make sure that the tag is prepended to mbuf before we reinject it to ip6_direct_input(), that is to say, this tag is mandatory. So, I'm wondering whether it is a better choice to replace current error handling codes with a KASSERT().
sys/netinet6/frag6.c | ||
---|---|---|
591 | I'm wondering whether it is a better choice to replace these error handling codes with a KASSERT(). |
I mean this line!!!
sys/netinet6/ip6_input.c | ||
---|---|---|
436 | I'm wondering whether it is a better choice to replace these error handling codes with a KASSERT(). |
Replace the error handling code in ip6_direct_input() with KASSERT(), because this tag is mandatory.
ok, this looks good. I'll see if I can get gnn / bz to review this before I commit it to -HEAD.
Thanks very much for digging into this!