Index: stable/12/tools/tools/netmap/lb.c =================================================================== --- stable/12/tools/tools/netmap/lb.c (revision 368033) +++ stable/12/tools/tools/netmap/lb.c (revision 368034) @@ -1,1028 +1,1095 @@ /* * Copyright (C) 2017 Corelight, Inc. and Universita` di Pisa. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. 
*/ /* $FreeBSD$ */ -#include -#include #include -#include +#include #include +#include +#include /* htonl */ +#include +#include +#include +#include +#include +#include #include - -#define NETMAP_WITH_LIBS -#include +#include #include +#include -#include /* htonl */ - -#include - #include "pkt_hash.h" #include "ctrs.h" /* * use our version of header structs, rather than bringing in a ton * of platform specific ones */ #ifndef ETH_ALEN #define ETH_ALEN 6 #endif struct compact_eth_hdr { unsigned char h_dest[ETH_ALEN]; unsigned char h_source[ETH_ALEN]; u_int16_t h_proto; }; struct compact_ip_hdr { u_int8_t ihl:4, version:4; u_int8_t tos; u_int16_t tot_len; u_int16_t id; u_int16_t frag_off; u_int8_t ttl; u_int8_t protocol; u_int16_t check; u_int32_t saddr; u_int32_t daddr; }; struct compact_ipv6_hdr { u_int8_t priority:4, version:4; u_int8_t flow_lbl[3]; u_int16_t payload_len; u_int8_t nexthdr; u_int8_t hop_limit; struct in6_addr saddr; struct in6_addr daddr; }; #define MAX_IFNAMELEN 64 #define MAX_PORTNAMELEN (MAX_IFNAMELEN + 40) #define DEF_OUT_PIPES 2 #define DEF_EXTRA_BUFS 0 #define DEF_BATCH 2048 #define DEF_WAIT_LINK 2 #define DEF_STATS_INT 600 -#define BUF_REVOKE 100 +#define BUF_REVOKE 150 #define STAT_MSG_MAXSIZE 1024 static struct { - char ifname[MAX_IFNAMELEN]; - char base_name[MAX_IFNAMELEN]; + char ifname[MAX_IFNAMELEN + 1]; + char base_name[MAX_IFNAMELEN + 1]; int netmap_fd; uint16_t output_rings; uint16_t num_groups; uint32_t extra_bufs; uint16_t batch; int stdout_interval; int syslog_interval; int wait_link; bool busy_wait; } glob_arg; /* * the overflow queue is a circular queue of buffers */ struct overflow_queue { char name[MAX_IFNAMELEN + 16]; struct netmap_slot *slots; uint32_t head; uint32_t tail; uint32_t n; uint32_t size; }; static struct overflow_queue *freeq; static inline int oq_full(struct overflow_queue *q) { return q->n >= q->size; } static inline int oq_empty(struct overflow_queue *q) { return q->n <= 0; } static inline void oq_enq(struct 
overflow_queue *q, const struct netmap_slot *s) { if (unlikely(oq_full(q))) { D("%s: queue full!", q->name); abort(); } q->slots[q->tail] = *s; q->n++; q->tail++; if (q->tail >= q->size) q->tail = 0; } static inline struct netmap_slot oq_deq(struct overflow_queue *q) { struct netmap_slot s = q->slots[q->head]; if (unlikely(oq_empty(q))) { D("%s: queue empty!", q->name); abort(); } q->n--; q->head++; if (q->head >= q->size) q->head = 0; return s; } static volatile int do_abort = 0; static uint64_t dropped = 0; static uint64_t forwarded = 0; static uint64_t received_bytes = 0; static uint64_t received_pkts = 0; static uint64_t non_ip = 0; static uint32_t freeq_n = 0; struct port_des { char interface[MAX_PORTNAMELEN]; struct my_ctrs ctr; unsigned int last_sync; uint32_t last_tail; struct overflow_queue *oq; - struct nm_desc *nmd; + struct nmport_d *nmd; struct netmap_ring *ring; struct group_des *group; }; static struct port_des *ports; /* each group of pipes receives all the packets */ struct group_des { char pipename[MAX_IFNAMELEN]; struct port_des *ports; int first_id; int nports; int last; int custom_port; }; static struct group_des *groups; /* statistcs */ struct counters { struct timeval ts; struct my_ctrs *ctrs; uint64_t received_pkts; uint64_t received_bytes; uint64_t non_ip; uint32_t freeq_n; int status __attribute__((aligned(64))); #define COUNTERS_EMPTY 0 #define COUNTERS_FULL 1 }; static struct counters counters_buf; static void * print_stats(void *arg) { int npipes = glob_arg.output_rings; int sys_int = 0; (void)arg; struct my_ctrs cur, prev; struct my_ctrs *pipe_prev; pipe_prev = calloc(npipes, sizeof(struct my_ctrs)); if (pipe_prev == NULL) { D("out of memory"); exit(1); } char stat_msg[STAT_MSG_MAXSIZE] = ""; memset(&prev, 0, sizeof(prev)); while (!do_abort) { int j, dosyslog = 0, dostdout = 0, newdata; uint64_t pps = 0, dps = 0, bps = 0, dbps = 0, usec = 0; struct my_ctrs x; counters_buf.status = COUNTERS_EMPTY; newdata = 0; memset(&cur, 0, 
sizeof(cur)); sleep(1); if (counters_buf.status == COUNTERS_FULL) { __sync_synchronize(); newdata = 1; cur.t = counters_buf.ts; if (prev.t.tv_sec || prev.t.tv_usec) { usec = (cur.t.tv_sec - prev.t.tv_sec) * 1000000 + cur.t.tv_usec - prev.t.tv_usec; } } ++sys_int; if (glob_arg.stdout_interval && sys_int % glob_arg.stdout_interval == 0) dostdout = 1; if (glob_arg.syslog_interval && sys_int % glob_arg.syslog_interval == 0) dosyslog = 1; for (j = 0; j < npipes; ++j) { struct my_ctrs *c = &counters_buf.ctrs[j]; cur.pkts += c->pkts; cur.drop += c->drop; cur.drop_bytes += c->drop_bytes; cur.bytes += c->bytes; if (usec) { x.pkts = c->pkts - pipe_prev[j].pkts; x.drop = c->drop - pipe_prev[j].drop; x.bytes = c->bytes - pipe_prev[j].bytes; x.drop_bytes = c->drop_bytes - pipe_prev[j].drop_bytes; pps = (x.pkts*1000000 + usec/2) / usec; dps = (x.drop*1000000 + usec/2) / usec; bps = ((x.bytes*1000000 + usec/2) / usec) * 8; dbps = ((x.drop_bytes*1000000 + usec/2) / usec) * 8; } pipe_prev[j] = *c; if ( (dosyslog || dostdout) && newdata ) snprintf(stat_msg, STAT_MSG_MAXSIZE, "{" "\"ts\":%.6f," "\"interface\":\"%s\"," "\"output_ring\":%" PRIu16 "," "\"packets_forwarded\":%" PRIu64 "," "\"packets_dropped\":%" PRIu64 "," "\"data_forward_rate_Mbps\":%.4f," "\"data_drop_rate_Mbps\":%.4f," "\"packet_forward_rate_kpps\":%.4f," "\"packet_drop_rate_kpps\":%.4f," "\"overflow_queue_size\":%" PRIu32 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0), ports[j].interface, j, c->pkts, c->drop, (double)bps / 1024 / 1024, (double)dbps / 1024 / 1024, (double)pps / 1000, (double)dps / 1000, c->oq_n); if (dosyslog && stat_msg[0]) syslog(LOG_INFO, "%s", stat_msg); if (dostdout && stat_msg[0]) printf("%s\n", stat_msg); } if (usec) { x.pkts = cur.pkts - prev.pkts; x.drop = cur.drop - prev.drop; x.bytes = cur.bytes - prev.bytes; x.drop_bytes = cur.drop_bytes - prev.drop_bytes; pps = (x.pkts*1000000 + usec/2) / usec; dps = (x.drop*1000000 + usec/2) / usec; bps = ((x.bytes*1000000 + usec/2) / usec) * 8; dbps = 
((x.drop_bytes*1000000 + usec/2) / usec) * 8; } if ( (dosyslog || dostdout) && newdata ) snprintf(stat_msg, STAT_MSG_MAXSIZE, "{" "\"ts\":%.6f," "\"interface\":\"%s\"," "\"output_ring\":null," "\"packets_received\":%" PRIu64 "," "\"packets_forwarded\":%" PRIu64 "," "\"packets_dropped\":%" PRIu64 "," "\"non_ip_packets\":%" PRIu64 "," "\"data_forward_rate_Mbps\":%.4f," "\"data_drop_rate_Mbps\":%.4f," "\"packet_forward_rate_kpps\":%.4f," "\"packet_drop_rate_kpps\":%.4f," "\"free_buffer_slots\":%" PRIu32 "}", cur.t.tv_sec + (cur.t.tv_usec / 1000000.0), glob_arg.ifname, received_pkts, cur.pkts, cur.drop, counters_buf.non_ip, (double)bps / 1024 / 1024, (double)dbps / 1024 / 1024, (double)pps / 1000, (double)dps / 1000, counters_buf.freeq_n); if (dosyslog && stat_msg[0]) syslog(LOG_INFO, "%s", stat_msg); if (dostdout && stat_msg[0]) printf("%s\n", stat_msg); prev = cur; } free(pipe_prev); return NULL; } static void free_buffers(void) { int i, tot = 0; struct port_des *rxport = &ports[glob_arg.output_rings]; /* build a netmap free list with the buffers in all the overflow queues */ for (i = 0; i < glob_arg.output_rings + 1; i++) { struct port_des *cp = &ports[i]; struct overflow_queue *q = cp->oq; if (!q) continue; while (q->n) { struct netmap_slot s = oq_deq(q); uint32_t *b = (uint32_t *)NETMAP_BUF(cp->ring, s.buf_idx); *b = rxport->nmd->nifp->ni_bufs_head; rxport->nmd->nifp->ni_bufs_head = s.buf_idx; tot++; } } D("added %d buffers to netmap free list", tot); for (i = 0; i < glob_arg.output_rings + 1; ++i) { - nm_close(ports[i].nmd); + nmport_close(ports[i].nmd); } } static void sigint_h(int sig) { (void)sig; /* UNUSED */ do_abort = 1; signal(SIGINT, SIG_DFL); } static void usage() { printf("usage: lb [options]\n"); printf("where options are:\n"); printf(" -h view help text\n"); printf(" -i iface interface name (required)\n"); printf(" -p [prefix:]npipes add a new group of output pipes\n"); printf(" -B nbufs number of extra buffers (default: %d)\n", DEF_EXTRA_BUFS); 
printf(" -b batch batch size (default: %d)\n", DEF_BATCH); printf(" -w seconds wait for link up (default: %d)\n", DEF_WAIT_LINK); printf(" -W enable busy waiting. this will run your CPU at 100%%\n"); printf(" -s seconds seconds between syslog stats messages (default: 0)\n"); printf(" -o seconds seconds between stdout stats messages (default: 0)\n"); exit(0); } static int parse_pipes(const char *spec) { const char *end = index(spec, ':'); static int max_groups = 0; struct group_des *g; ND("spec %s num_groups %d", spec, glob_arg.num_groups); if (max_groups < glob_arg.num_groups + 1) { size_t size = sizeof(*g) * (glob_arg.num_groups + 1); groups = realloc(groups, size); if (groups == NULL) { D("out of memory"); return 1; } } g = &groups[glob_arg.num_groups]; memset(g, 0, sizeof(*g)); if (end != NULL) { if (end - spec > MAX_IFNAMELEN - 8) { D("name '%s' too long", spec); return 1; } if (end == spec) { D("missing prefix before ':' in '%s'", spec); return 1; } strncpy(g->pipename, spec, end - spec); g->custom_port = 1; end++; } else { /* no prefix, this group will use the * name of the input port. 
* This will be set in init_groups(), * since here the input port may still * be uninitialized */ end = spec; } if (*end == '\0') { g->nports = DEF_OUT_PIPES; } else { g->nports = atoi(end); if (g->nports < 1) { D("invalid number of pipes '%s' (must be at least 1)", end); return 1; } } glob_arg.output_rings += g->nports; glob_arg.num_groups++; return 0; } /* complete the initialization of the groups data structure */ static void init_groups(void) { int i, j, t = 0; struct group_des *g = NULL; for (i = 0; i < glob_arg.num_groups; i++) { g = &groups[i]; g->ports = &ports[t]; for (j = 0; j < g->nports; j++) g->ports[j].group = g; t += g->nports; if (!g->custom_port) strcpy(g->pipename, glob_arg.base_name); for (j = 0; j < i; j++) { struct group_des *h = &groups[j]; if (!strcmp(h->pipename, g->pipename)) g->first_id += h->nports; } } g->last = 1; } + +/* To support packets that span multiple slots (NS_MOREFRAG) we + * need to make sure of the following: + * + * - all fragments of the same packet must go to the same output pipe + * - when dropping, all fragments of the same packet must be dropped + * + * For the former point we remember and reuse the last hash computed + * in each input ring, and only update it when NS_MOREFRAG was not + * set in the last received slot (this marks the start of a new packet). + * + * For the latter point, we only update the output ring head pointer + * when an entire packet has been forwarded. We keep a shadow_head + * pointer to know where to put the next partial fragment and, + * when the need to drop arises, we roll it back to head. + */ +struct morefrag { + uint16_t last_flag; /* for input rings */ + uint32_t last_hash; /* for input rings */ + uint32_t shadow_head; /* for output rings */ +}; + /* push the packet described by slot rs to the group g. * This may cause other buffers to be pushed down the * chain headed by g. * Return a free buffer. 
*/ static uint32_t forward_packet(struct group_des *g, struct netmap_slot *rs) { uint32_t hash = rs->ptr; uint32_t output_port = hash % g->nports; struct port_des *port = &g->ports[output_port]; struct netmap_ring *ring = port->ring; struct overflow_queue *q = port->oq; + struct morefrag *mf = (struct morefrag *)ring->sem; + uint16_t curmf = rs->flags & NS_MOREFRAG; /* Move the packet to the output pipe, unless there is * either no space left on the ring, or there is some * packet still in the overflow queue (since those must * take precedence over the new one) */ - if (ring->head != ring->tail && (q == NULL || oq_empty(q))) { - struct netmap_slot *ts = &ring->slot[ring->head]; + if (mf->shadow_head != ring->tail && (q == NULL || oq_empty(q))) { + struct netmap_slot *ts = &ring->slot[mf->shadow_head]; struct netmap_slot old_slot = *ts; ts->buf_idx = rs->buf_idx; ts->len = rs->len; - ts->flags |= NS_BUF_CHANGED; + ts->flags = rs->flags | NS_BUF_CHANGED; ts->ptr = rs->ptr; - ring->head = nm_ring_next(ring, ring->head); + mf->shadow_head = nm_ring_next(ring, mf->shadow_head); + if (!curmf) { + ring->head = mf->shadow_head; + } + ND("curmf %2x ts->flags %2x shadow_head %3u head %3u tail %3u", + curmf, ts->flags, mf->shadow_head, ring->head, ring->tail); port->ctr.bytes += rs->len; port->ctr.pkts++; forwarded++; return old_slot.buf_idx; } /* use the overflow queue, if available */ if (q == NULL || oq_full(q)) { + uint32_t scan; /* no space left on the ring and no overflow queue * available: we are forced to drop the packet */ + + /* drop previous fragments, if any */ + for (scan = ring->head; scan != mf->shadow_head; + scan = nm_ring_next(ring, scan)) { + struct netmap_slot *ts = &ring->slot[scan]; + dropped++; + port->ctr.drop_bytes += ts->len; + } + mf->shadow_head = ring->head; + dropped++; port->ctr.drop++; port->ctr.drop_bytes += rs->len; return rs->buf_idx; } oq_enq(q, rs); /* * we cannot continue down the chain and we need to * return a free buffer now. 
We take it from the free queue. */ if (oq_empty(freeq)) { /* the free queue is empty. Revoke some buffers * from the longest overflow queue */ uint32_t j; struct port_des *lp = &ports[0]; uint32_t max = lp->oq->n; /* let lp point to the port with the longest queue */ for (j = 1; j < glob_arg.output_rings; j++) { struct port_des *cp = &ports[j]; if (cp->oq->n > max) { lp = cp; max = cp->oq->n; } } /* move the oldest BUF_REVOKE buffers from the * lp queue to the free queue + * + * We cannot revoke a partially received packet. + * To make thinks simple we make sure to leave + * at least NETMAP_MAX_FRAGS slots in the queue. */ - // XXX optimize this cycle - for (j = 0; lp->oq->n && j < BUF_REVOKE; j++) { + for (j = 0; lp->oq->n > NETMAP_MAX_FRAGS && j < BUF_REVOKE; j++) { struct netmap_slot tmp = oq_deq(lp->oq); dropped++; lp->ctr.drop++; lp->ctr.drop_bytes += tmp.len; oq_enq(freeq, &tmp); } ND(1, "revoked %d buffers from %s", j, lq->name); } return oq_deq(freeq).buf_idx; } int main(int argc, char **argv) { int ch; uint32_t i; int rv; unsigned int iter = 0; int poll_timeout = 10; /* default */ glob_arg.ifname[0] = '\0'; glob_arg.output_rings = 0; glob_arg.batch = DEF_BATCH; glob_arg.wait_link = DEF_WAIT_LINK; glob_arg.busy_wait = false; glob_arg.syslog_interval = 0; glob_arg.stdout_interval = 0; while ( (ch = getopt(argc, argv, "hi:p:b:B:s:o:w:W")) != -1) { switch (ch) { case 'i': D("interface is %s", optarg); if (strlen(optarg) > MAX_IFNAMELEN - 8) { D("ifname too long %s", optarg); return 1; } if (strncmp(optarg, "netmap:", 7) && strncmp(optarg, "vale", 4)) { sprintf(glob_arg.ifname, "netmap:%s", optarg); } else { strcpy(glob_arg.ifname, optarg); } break; case 'p': if (parse_pipes(optarg)) { usage(); return 1; } break; case 'B': glob_arg.extra_bufs = atoi(optarg); D("requested %d extra buffers", glob_arg.extra_bufs); break; case 'b': glob_arg.batch = atoi(optarg); D("batch is %d", glob_arg.batch); break; case 'w': glob_arg.wait_link = atoi(optarg); D("link wait for 
up time is %d", glob_arg.wait_link); break; case 'W': glob_arg.busy_wait = true; break; case 'o': glob_arg.stdout_interval = atoi(optarg); break; case 's': glob_arg.syslog_interval = atoi(optarg); break; case 'h': usage(); return 0; break; default: D("bad option %c %s", ch, optarg); usage(); return 1; } } if (glob_arg.ifname[0] == '\0') { D("missing interface name"); usage(); return 1; } - /* extract the base name */ - char *nscan = strncmp(glob_arg.ifname, "netmap:", 7) ? - glob_arg.ifname : glob_arg.ifname + 7; - strncpy(glob_arg.base_name, nscan, MAX_IFNAMELEN - 1); - for (nscan = glob_arg.base_name; *nscan && !index("-*^{}/@", *nscan); nscan++) - ; - *nscan = '\0'; - if (glob_arg.num_groups == 0) parse_pipes(""); if (glob_arg.syslog_interval) { setlogmask(LOG_UPTO(LOG_INFO)); openlog("lb", LOG_CONS | LOG_PID | LOG_NDELAY, LOG_LOCAL1); } uint32_t npipes = glob_arg.output_rings; pthread_t stat_thread; ports = calloc(npipes + 1, sizeof(struct port_des)); if (!ports) { D("failed to allocate the stats array"); return 1; } struct port_des *rxport = &ports[npipes]; + + rxport->nmd = nmport_prepare(glob_arg.ifname); + if (rxport->nmd == NULL) { + D("cannot parse %s", glob_arg.ifname); + return (1); + } + /* extract the base name */ + strncpy(glob_arg.base_name, rxport->nmd->hdr.nr_name, MAX_IFNAMELEN); + init_groups(); memset(&counters_buf, 0, sizeof(counters_buf)); counters_buf.ctrs = calloc(npipes, sizeof(struct my_ctrs)); if (!counters_buf.ctrs) { D("failed to allocate the counters snapshot buffer"); return 1; } - /* we need base_req to specify pipes and extra bufs */ - struct nmreq base_req; - memset(&base_req, 0, sizeof(base_req)); + rxport->nmd->reg.nr_extra_bufs = glob_arg.extra_bufs; - base_req.nr_arg1 = npipes; - base_req.nr_arg3 = glob_arg.extra_bufs; - - rxport->nmd = nm_open(glob_arg.ifname, &base_req, 0, NULL); - - if (rxport->nmd == NULL) { + if (nmport_open_desc(rxport->nmd) < 0) { D("cannot open %s", glob_arg.ifname); return (1); - } else { - 
D("successfully opened %s (tx rings: %u)", glob_arg.ifname, - rxport->nmd->req.nr_tx_slots); } + D("successfully opened %s", glob_arg.ifname); - uint32_t extra_bufs = rxport->nmd->req.nr_arg3; + uint32_t extra_bufs = rxport->nmd->reg.nr_extra_bufs; struct overflow_queue *oq = NULL; /* reference ring to access the buffers */ rxport->ring = NETMAP_RXRING(rxport->nmd->nifp, 0); if (!glob_arg.extra_bufs) goto run; D("obtained %d extra buffers", extra_bufs); if (!extra_bufs) goto run; /* one overflow queue for each output pipe, plus one for the * free extra buffers */ oq = calloc(npipes + 1, sizeof(struct overflow_queue)); if (!oq) { D("failed to allocated overflow queues descriptors"); goto run; } freeq = &oq[npipes]; rxport->oq = freeq; freeq->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); if (!freeq->slots) { D("failed to allocate the free list"); } freeq->size = extra_bufs; snprintf(freeq->name, MAX_IFNAMELEN, "free queue"); /* * the list of buffers uses the first uint32_t in each buffer * as the index of the next buffer. */ uint32_t scan; for (scan = rxport->nmd->nifp->ni_bufs_head; scan; scan = *(uint32_t *)NETMAP_BUF(rxport->ring, scan)) { struct netmap_slot s; s.len = s.flags = 0; s.ptr = 0; s.buf_idx = scan; ND("freeq <- %d", s.buf_idx); oq_enq(freeq, &s); } if (freeq->n != extra_bufs) { D("something went wrong: netmap reported %d extra_bufs, but the free list contained %d", extra_bufs, freeq->n); return 1; } rxport->nmd->nifp->ni_bufs_head = 0; run: atexit(free_buffers); int j, t = 0; for (j = 0; j < glob_arg.num_groups; j++) { struct group_des *g = &groups[j]; int k; for (k = 0; k < g->nports; ++k) { struct port_des *p = &g->ports[k]; snprintf(p->interface, MAX_PORTNAMELEN, "%s%s{%d/xT@%d", (strncmp(g->pipename, "vale", 4) ? 
"netmap:" : ""), g->pipename, g->first_id + k, - rxport->nmd->req.nr_arg2); + rxport->nmd->reg.nr_mem_id); D("opening pipe named %s", p->interface); - p->nmd = nm_open(p->interface, NULL, 0, rxport->nmd); + p->nmd = nmport_open(p->interface); if (p->nmd == NULL) { D("cannot open %s", p->interface); return (1); - } else if (p->nmd->req.nr_arg2 != rxport->nmd->req.nr_arg2) { + } else if (p->nmd->mem != rxport->nmd->mem) { D("failed to open pipe #%d in zero-copy mode, " "please close any application that uses either pipe %s}%d, " "or %s{%d, and retry", k + 1, g->pipename, g->first_id + k, g->pipename, g->first_id + k); return (1); } else { + struct morefrag *mf; + D("successfully opened pipe #%d %s (tx slots: %d)", - k + 1, p->interface, p->nmd->req.nr_tx_slots); + k + 1, p->interface, p->nmd->reg.nr_tx_slots); p->ring = NETMAP_TXRING(p->nmd->nifp, 0); p->last_tail = nm_ring_next(p->ring, p->ring->tail); + mf = (struct morefrag *)p->ring->sem; + mf->last_flag = 0; /* unused */ + mf->last_hash = 0; /* unused */ + mf->shadow_head = p->ring->head; } D("zerocopy %s", (rxport->nmd->mem == p->nmd->mem) ? 
"enabled" : "disabled"); if (extra_bufs) { struct overflow_queue *q = &oq[t + k]; q->slots = calloc(extra_bufs, sizeof(struct netmap_slot)); if (!q->slots) { D("failed to allocate overflow queue for pipe %d", k); /* make all overflow queue management fail */ extra_bufs = 0; } q->size = extra_bufs; snprintf(q->name, sizeof(q->name), "oq %s{%4d", g->pipename, k); p->oq = q; } } t += g->nports; } if (glob_arg.extra_bufs && !extra_bufs) { if (oq) { for (i = 0; i < npipes + 1; i++) { free(oq[i].slots); oq[i].slots = NULL; } free(oq); oq = NULL; } D("*** overflow queues disabled ***"); } sleep(glob_arg.wait_link); /* start stats thread after wait_link */ if (pthread_create(&stat_thread, NULL, print_stats, NULL) == -1) { D("unable to create the stats thread: %s", strerror(errno)); return 1; } struct pollfd pollfd[npipes + 1]; memset(&pollfd, 0, sizeof(pollfd)); signal(SIGINT, sigint_h); /* make sure we wake up as often as needed, even when there are no * packets coming in */ if (glob_arg.syslog_interval > 0 && glob_arg.syslog_interval < poll_timeout) poll_timeout = glob_arg.syslog_interval; if (glob_arg.stdout_interval > 0 && glob_arg.stdout_interval < poll_timeout) poll_timeout = glob_arg.stdout_interval; + /* initialize the morefrag structures for the input rings */ + for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) { + struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i); + struct morefrag *mf = (struct morefrag *)rxring->sem; + + mf->last_flag = 0; + mf->last_hash = 0; + mf->shadow_head = 0; /* unused */ + } + while (!do_abort) { u_int polli = 0; iter++; for (i = 0; i < npipes; ++i) { struct netmap_ring *ring = ports[i].ring; int pending = nm_tx_pending(ring); /* if there are packets pending, we want to be notified when * tail moves, so we let cur=tail */ ring->cur = pending ? 
ring->tail : ring->head; if (!glob_arg.busy_wait && !pending) { /* no need to poll, there are no packets pending */ continue; } pollfd[polli].fd = ports[i].nmd->fd; pollfd[polli].events = POLLOUT; pollfd[polli].revents = 0; ++polli; } pollfd[polli].fd = rxport->nmd->fd; pollfd[polli].events = POLLIN; pollfd[polli].revents = 0; ++polli; - //RD(5, "polling %d file descriptors", polli+1); + ND(5, "polling %d file descriptors", polli); rv = poll(pollfd, polli, poll_timeout); if (rv <= 0) { if (rv < 0 && errno != EAGAIN && errno != EINTR) RD(1, "poll error %s", strerror(errno)); goto send_stats; } /* if there are several groups, try pushing released packets from * upstream groups to the downstream ones. * * It is important to do this before returned slots are reused * for new transmissions. For the same reason, this must be * done starting from the last group going backwards. */ for (i = glob_arg.num_groups - 1U; i > 0; i--) { struct group_des *g = &groups[i - 1]; for (j = 0; j < g->nports; j++) { struct port_des *p = &g->ports[j]; struct netmap_ring *ring = p->ring; uint32_t last = p->last_tail, stop = nm_ring_next(ring, ring->tail); /* slight abuse of the API here: we touch the slot * pointed to by tail */ for ( ; last != stop; last = nm_ring_next(ring, last)) { struct netmap_slot *rs = &ring->slot[last]; // XXX less aggressive? 
rs->buf_idx = forward_packet(g + 1, rs); - rs->flags |= NS_BUF_CHANGED; + rs->flags = NS_BUF_CHANGED; rs->ptr = 0; } p->last_tail = last; } } if (oq) { /* try to push packets from the overflow queues * to the corresponding pipes */ for (i = 0; i < npipes; i++) { struct port_des *p = &ports[i]; struct overflow_queue *q = p->oq; - uint32_t k, lim; + uint32_t k; + int64_t lim; struct netmap_ring *ring; struct netmap_slot *slot; + struct morefrag *mf; if (oq_empty(q)) continue; ring = p->ring; - lim = nm_ring_space(ring); + mf = (struct morefrag *)ring->sem; + lim = ring->tail - mf->shadow_head; if (!lim) continue; + if (lim < 0) + lim += ring->num_slots; if (q->n < lim) lim = q->n; for (k = 0; k < lim; k++) { struct netmap_slot s = oq_deq(q), tmp; tmp.ptr = 0; - slot = &ring->slot[ring->head]; + slot = &ring->slot[mf->shadow_head]; tmp.buf_idx = slot->buf_idx; oq_enq(freeq, &tmp); *slot = s; slot->flags |= NS_BUF_CHANGED; - ring->head = nm_ring_next(ring, ring->head); + mf->shadow_head = nm_ring_next(ring, mf->shadow_head); + if (!(slot->flags & NS_MOREFRAG)) + ring->head = mf->shadow_head; } } } /* push any new packets from the input port to the first group */ int batch = 0; for (i = rxport->nmd->first_rx_ring; i <= rxport->nmd->last_rx_ring; i++) { struct netmap_ring *rxring = NETMAP_RXRING(rxport->nmd->nifp, i); + struct morefrag *mf = (struct morefrag *)rxring->sem; //D("prepare to scan rings"); int next_head = rxring->head; struct netmap_slot *next_slot = &rxring->slot[next_head]; const char *next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); while (!nm_ring_empty(rxring)) { struct netmap_slot *rs = next_slot; struct group_des *g = &groups[0]; ++received_pkts; received_bytes += rs->len; // CHOOSE THE CORRECT OUTPUT PIPE - rs->ptr = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B'); + // If the previous slot had NS_MOREFRAG set, this is another + // fragment of the last packet and it should go to the same + // output pipe as before. 
+ if (!mf->last_flag) { + // 'B' is just a hashing seed + mf->last_hash = pkt_hdr_hash((const unsigned char *)next_buf, 4, 'B'); + } + mf->last_flag = rs->flags & NS_MOREFRAG; + rs->ptr = mf->last_hash; if (rs->ptr == 0) { non_ip++; // XXX ?? } // prefetch the buffer for the next round next_head = nm_ring_next(rxring, next_head); next_slot = &rxring->slot[next_head]; next_buf = NETMAP_BUF(rxring, next_slot->buf_idx); __builtin_prefetch(next_buf); - // 'B' is just a hashing seed rs->buf_idx = forward_packet(g, rs); - rs->flags |= NS_BUF_CHANGED; + rs->flags = NS_BUF_CHANGED; rxring->head = rxring->cur = next_head; batch++; if (unlikely(batch >= glob_arg.batch)) { ioctl(rxport->nmd->fd, NIOCRXSYNC, NULL); batch = 0; } ND(1, "Forwarded Packets: %"PRIu64" Dropped packets: %"PRIu64" Percent: %.2f", forwarded, dropped, ((float)dropped / (float)forwarded * 100)); } } send_stats: if (counters_buf.status == COUNTERS_FULL) continue; /* take a new snapshot of the counters */ gettimeofday(&counters_buf.ts, NULL); for (i = 0; i < npipes; i++) { struct my_ctrs *c = &counters_buf.ctrs[i]; *c = ports[i].ctr; /* * If there are overflow queues, copy the number of them for each * port to the ctrs.oq_n variable for each port. */ if (ports[i].oq != NULL) c->oq_n = ports[i].oq->n; } counters_buf.received_pkts = received_pkts; counters_buf.received_bytes = received_bytes; counters_buf.non_ip = non_ip; if (freeq != NULL) counters_buf.freeq_n = freeq->n; __sync_synchronize(); counters_buf.status = COUNTERS_FULL; } /* * If freeq exists, copy the number to the freeq_n member of the * message struct, otherwise set it to 0. */ if (freeq != NULL) { freeq_n = freeq->n; } else { freeq_n = 0; } pthread_join(stat_thread, NULL); printf("%"PRIu64" packets forwarded. %"PRIu64" packets dropped. 
Total %"PRIu64"\n", forwarded, dropped, forwarded + dropped); return 0; } Index: stable/12/tools/tools/netmap/pkt_hash.c =================================================================== --- stable/12/tools/tools/netmap/pkt_hash.c (revision 368033) +++ stable/12/tools/tools/netmap/pkt_hash.c (revision 368034) @@ -1,396 +1,397 @@ /* ** Copyright (c) 2015, Asim Jamshed, Robin Sommer, Seth Hall ** and the International Computer Science Institute. All rights reserved. ** ** Redistribution and use in source and binary forms, with or without ** modification, are permitted provided that the following conditions are met: ** ** (1) Redistributions of source code must retain the above copyright ** notice, this list of conditions and the following disclaimer. ** ** (2) Redistributions in binary form must reproduce the above copyright ** notice, this list of conditions and the following disclaimer in the ** documentation and/or other materials provided with the distribution. ** ** ** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" ** AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE ** IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ** ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE ** LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR ** CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF ** SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS ** INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN ** CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ** ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE ** POSSIBILITY OF SUCH DAMAGE. 
**/ /* $FreeBSD$ */ + /* for func prototypes */ #include "pkt_hash.h" /* Make Linux headers choose BSD versions of some of the data structures */ #define __FAVOR_BSD /* for types */ #include /* for [n/h]to[h/n][ls] */ #include /* iphdr */ #include /* ipv6hdr */ #include /* tcphdr */ #include /* udphdr */ #include /* eth hdr */ #include /* for memset */ #include #include #include //#include /*---------------------------------------------------------------------*/ /** * * The cache table is used to pick a nice seed for the hash value. It is * * built only once when sym_hash_fn is called for the very first time * */ static void build_sym_key_cache(uint32_t *cache, int cache_len) { static const uint8_t key[] = { 0x50, 0x6d }; uint32_t result = (((uint32_t)key[0]) << 24) | (((uint32_t)key[1]) << 16) | (((uint32_t)key[0]) << 8) | ((uint32_t)key[1]); uint32_t idx = 32; int i; for (i = 0; i < cache_len; i++, idx++) { uint8_t shift = (idx % 8); uint32_t bit; cache[i] = result; bit = ((key[(idx/8) & 1] << shift) & 0x80) ? 
1 : 0; result = ((result << 1) | bit); } } static void build_byte_cache(uint32_t byte_cache[256][4]) { #define KEY_CACHE_LEN 96 int i, j, k; uint32_t key_cache[KEY_CACHE_LEN]; build_sym_key_cache(key_cache, KEY_CACHE_LEN); for (i = 0; i < 4; i++) { for (j = 0; j < 256; j++) { uint8_t b = j; byte_cache[j][i] = 0; for (k = 0; k < 8; k++) { if (b & 0x80) byte_cache[j][i] ^= key_cache[8 * i + k]; b <<= 1U; } } } } /*---------------------------------------------------------------------*/ /** ** Computes symmetric hash based on the 4-tuple header data **/ static uint32_t sym_hash_fn(uint32_t sip, uint32_t dip, uint16_t sp, uint32_t dp) { uint32_t rc = 0; static int first_time = 1; static uint32_t byte_cache[256][4]; uint8_t *sip_b = (uint8_t *)&sip, *dip_b = (uint8_t *)&dip, *sp_b = (uint8_t *)&sp, *dp_b = (uint8_t *)&dp; if (first_time) { build_byte_cache(byte_cache); first_time = 0; } rc = byte_cache[sip_b[3]][0] ^ byte_cache[sip_b[2]][1] ^ byte_cache[sip_b[1]][2] ^ byte_cache[sip_b[0]][3] ^ byte_cache[dip_b[3]][0] ^ byte_cache[dip_b[2]][1] ^ byte_cache[dip_b[1]][2] ^ byte_cache[dip_b[0]][3] ^ byte_cache[sp_b[1]][0] ^ byte_cache[sp_b[0]][1] ^ byte_cache[dp_b[1]][2] ^ byte_cache[dp_b[0]][3]; return rc; } static uint32_t decode_gre_hash(const uint8_t *, uint8_t, uint8_t); /*---------------------------------------------------------------------*/ /** ** Parser + hash function for the IPv4 packet **/ static uint32_t decode_ip_n_hash(const struct ip *iph, uint8_t hash_split, uint8_t seed) { uint32_t rc = 0; if (hash_split == 2) { rc = sym_hash_fn(ntohl(iph->ip_src.s_addr), ntohl(iph->ip_dst.s_addr), ntohs(0xFFFD) + seed, ntohs(0xFFFE) + seed); } else { const struct tcphdr *tcph = NULL; const struct udphdr *udph = NULL; switch (iph->ip_p) { case IPPROTO_TCP: tcph = (const struct tcphdr *)((const uint8_t *)iph + (iph->ip_hl<<2)); rc = sym_hash_fn(ntohl(iph->ip_src.s_addr), ntohl(iph->ip_dst.s_addr), ntohs(tcph->th_sport) + seed, ntohs(tcph->th_dport) + seed); break; case 
IPPROTO_UDP: udph = (const struct udphdr *)((const uint8_t *)iph + (iph->ip_hl<<2)); rc = sym_hash_fn(ntohl(iph->ip_src.s_addr), ntohl(iph->ip_dst.s_addr), ntohs(udph->uh_sport) + seed, ntohs(udph->uh_dport) + seed); break; case IPPROTO_IPIP: /* tunneling */ rc = decode_ip_n_hash((const struct ip *)((const uint8_t *)iph + (iph->ip_hl<<2)), hash_split, seed); break; case IPPROTO_GRE: rc = decode_gre_hash((const uint8_t *)iph + (iph->ip_hl<<2), hash_split, seed); break; case IPPROTO_ICMP: case IPPROTO_ESP: case IPPROTO_PIM: case IPPROTO_IGMP: default: /* ** the hash strength (although weaker but) should still hold ** even with 2 fields **/ rc = sym_hash_fn(ntohl(iph->ip_src.s_addr), ntohl(iph->ip_dst.s_addr), ntohs(0xFFFD) + seed, ntohs(0xFFFE) + seed); break; } } return rc; } /*---------------------------------------------------------------------*/ /** ** Parser + hash function for the IPv6 packet **/ static uint32_t decode_ipv6_n_hash(const struct ip6_hdr *ipv6h, uint8_t hash_split, uint8_t seed) { uint32_t saddr, daddr; uint32_t rc = 0; /* Get only the first 4 octets */ saddr = ipv6h->ip6_src.s6_addr[0] | (ipv6h->ip6_src.s6_addr[1] << 8) | (ipv6h->ip6_src.s6_addr[2] << 16) | (ipv6h->ip6_src.s6_addr[3] << 24); daddr = ipv6h->ip6_dst.s6_addr[0] | (ipv6h->ip6_dst.s6_addr[1] << 8) | (ipv6h->ip6_dst.s6_addr[2] << 16) | (ipv6h->ip6_dst.s6_addr[3] << 24); if (hash_split == 2) { rc = sym_hash_fn(ntohl(saddr), ntohl(daddr), ntohs(0xFFFD) + seed, ntohs(0xFFFE) + seed); } else { const struct tcphdr *tcph = NULL; const struct udphdr *udph = NULL; switch(ntohs(ipv6h->ip6_ctlun.ip6_un1.ip6_un1_nxt)) { case IPPROTO_TCP: tcph = (const struct tcphdr *)(ipv6h + 1); rc = sym_hash_fn(ntohl(saddr), ntohl(daddr), ntohs(tcph->th_sport) + seed, ntohs(tcph->th_dport) + seed); break; case IPPROTO_UDP: udph = (const struct udphdr *)(ipv6h + 1); rc = sym_hash_fn(ntohl(saddr), ntohl(daddr), ntohs(udph->uh_sport) + seed, ntohs(udph->uh_dport) + seed); break; case IPPROTO_IPIP: /* tunneling */ 
rc = decode_ip_n_hash((const struct ip *)(ipv6h + 1), hash_split, seed); break; case IPPROTO_IPV6: /* tunneling */ rc = decode_ipv6_n_hash((const struct ip6_hdr *)(ipv6h + 1), hash_split, seed); break; case IPPROTO_GRE: rc = decode_gre_hash((const uint8_t *)(ipv6h + 1), hash_split, seed); break; case IPPROTO_ICMP: case IPPROTO_ESP: case IPPROTO_PIM: case IPPROTO_IGMP: default: /* ** the hash strength (although weaker but) should still hold ** even with 2 fields **/ rc = sym_hash_fn(ntohl(saddr), ntohl(daddr), ntohs(0xFFFD) + seed, ntohs(0xFFFE) + seed); } } return rc; } /*---------------------------------------------------------------------*/ /** * * A temp solution while hash for other protocols are filled... * * (See decode_vlan_n_hash & pkt_hdr_hash functions). * */ static uint32_t decode_others_n_hash(const struct ether_header *ethh, uint8_t seed) { uint32_t saddr, daddr, rc; saddr = ethh->ether_shost[5] | (ethh->ether_shost[4] << 8) | (ethh->ether_shost[3] << 16) | (ethh->ether_shost[2] << 24); daddr = ethh->ether_dhost[5] | (ethh->ether_dhost[4] << 8) | (ethh->ether_dhost[3] << 16) | (ethh->ether_dhost[2] << 24); rc = sym_hash_fn(ntohl(saddr), ntohl(daddr), ntohs(0xFFFD) + seed, ntohs(0xFFFE) + seed); return rc; } /*---------------------------------------------------------------------*/ /** ** Parser + hash function for VLAN packet **/ static inline uint32_t decode_vlan_n_hash(const struct ether_header *ethh, uint8_t hash_split, uint8_t seed) { uint32_t rc = 0; const struct vlanhdr *vhdr = (const struct vlanhdr *)(ethh + 1); switch (ntohs(vhdr->proto)) { case ETHERTYPE_IP: rc = decode_ip_n_hash((const struct ip *)(vhdr + 1), hash_split, seed); break; case ETHERTYPE_IPV6: rc = decode_ipv6_n_hash((const struct ip6_hdr *)(vhdr + 1), hash_split, seed); break; case ETHERTYPE_ARP: default: /* others */ rc = decode_others_n_hash(ethh, seed); break; } return rc; } /*---------------------------------------------------------------------*/ /** ** General parser + hash 
function... **/ uint32_t pkt_hdr_hash(const unsigned char *buffer, uint8_t hash_split, uint8_t seed) { uint32_t rc = 0; const struct ether_header *ethh = (const struct ether_header *)buffer; switch (ntohs(ethh->ether_type)) { case ETHERTYPE_IP: rc = decode_ip_n_hash((const struct ip *)(ethh + 1), hash_split, seed); break; case ETHERTYPE_IPV6: rc = decode_ipv6_n_hash((const struct ip6_hdr *)(ethh + 1), hash_split, seed); break; case ETHERTYPE_VLAN: rc = decode_vlan_n_hash(ethh, hash_split, seed); break; case ETHERTYPE_ARP: default: /* others */ rc = decode_others_n_hash(ethh, seed); break; } return rc; } /*---------------------------------------------------------------------*/ /** ** Parser + hash function for the GRE packet **/ static uint32_t decode_gre_hash(const uint8_t *grehdr, uint8_t hash_split, uint8_t seed) { uint32_t rc = 0; int len = 4 + 2 * (!!(*grehdr & 1) + /* Checksum */ !!(*grehdr & 2) + /* Routing */ !!(*grehdr & 4) + /* Key */ !!(*grehdr & 8)); /* Sequence Number */ uint16_t proto = ntohs(*(const uint16_t *)(const void *)(grehdr + 2)); switch (proto) { case ETHERTYPE_IP: rc = decode_ip_n_hash((const struct ip *)(grehdr + len), hash_split, seed); break; case ETHERTYPE_IPV6: rc = decode_ipv6_n_hash((const struct ip6_hdr *)(grehdr + len), hash_split, seed); break; case 0x6558: /* Transparent Ethernet Bridging */ rc = pkt_hdr_hash(grehdr + len, hash_split, seed); break; default: /* others */ break; } return rc; } /*---------------------------------------------------------------------*/ Index: stable/12 =================================================================== --- stable/12 (revision 368033) +++ stable/12 (revision 368034) Property changes on: stable/12 ___________________________________________________________________ Modified: svn:mergeinfo ## -0,0 +0,1 ## Merged /head:r367933