Index: share/doc/notes/tcp_backpressure_design.txt
===================================================================
--- /dev/null
+++ share/doc/notes/tcp_backpressure_design.txt
@@ -0,0 +1,88 @@
+inpcb portion of implementation:
+
+private:
+  - initialize an inpcb tailq per-cpu
+  - on non-iflib systems initialize a taskqueue thread per-cpu
+
+inp_rexmt_fn() task function:
+  - call the inp_rexmt function for each inpcb in the tailq
+  - inp_rexmt, or the function it is a wrapper for, should just
+    call inp_rexmt_enqueue when ENOBUFS is returned so that inpcbs
+    will end up being serviced round robin for all participating
+    interfaces
+  - It would be faster to skip further inp_rexmt (tcp_output) calls for a
+    queue that had already returned ENOBUFS.  However, that would require
+    callers that check the return code of tcp_output to handle an ENOBUFS
+    return.  Consider this a future optimization pending indication that
+    much time is spent in the txq overrun state.
+
+public:
+inp_rexmt_enqueue():
+  - initialize inp_rexmt to the passed function
+  - enqueue the inpcb on the tailq for the cpu corresponding to the flowid
+
+inp_rexmt_start(uint32_t qid, uint32_t nqs):
+  - wake up the taskq threads corresponding to qid
+
+
+Driver portion of implementation:
+
+Backpressure adds a new state machine to driver packet processing:
+
+
+       !full ifq                        ifq >= half full
+        _________                          _________
+       |         |                        |         |
+     __\ /___    |      full ifq       ____\ /____   |
+    |  OPEN  |---|-------------------->|  CLOSED  |--|
+     --------                           ----------  |
+       / \                                           |
+        |             ifq < half full                |
+        |---------------------------------------------
+
+
+The term ifq refers to any software queue, including struct ifq, buf_ring and
+mp_ring.  When a queue becomes full the driver will return ENOBUFS on any
+further requests until the software ring transitions to less than half full.
+When the driver enqueues the packet that transitions the software queue back
+to the OPEN state, it calls inp_rexmt_start with a cpuset indicating the cpus
+that map to the software ring whose state has just transitioned.  For example,
+a hypothetical driver has 4 hw tx queues paired with 4 sw queues feeding them
+on a system with 8 logical cores.  If the software queue that is used by cores
+4 and 5 transitions from CLOSED to OPEN, the driver will call inp_rexmt_start
+with a cpuset corresponding to 4 and 5.
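+
+A minimal sketch of what the driver side might look like, for a hypothetical
+"xx" driver that tracks its soft queue state itself (all xx_* names and txq
+fields are illustrative only; drivers using buf_ring or iflib would instead
+get the equivalent behavior from the buf_ring.h and iflib.c changes below):
+
+	/* transmit entry point: refuse new work while the soft queue is closed */
+	static int
+	xx_transmit(struct ifnet *ifp, struct mbuf *m)
+	{
+		struct xx_txq *txq = xx_mbuf_to_txq(ifp, m);
+
+		if (txq->closed || buf_ring_enqueue(txq->br, m) != 0) {
+			/* full ifq: OPEN -> CLOSED */
+			txq->closed = TRUE;
+			m_freem(m);
+			return (ENOBUFS);
+		}
+		return (0);
+	}
+
+	/* drain/completion path, called with the tx lock held */
+	static void
+	xx_txq_drain(struct xx_txq *txq)
+	{
+		struct mbuf *m;
+
+		while ((m = buf_ring_dequeue_sc(txq->br)) != NULL)
+			xx_encap(txq, m);
+		/* ifq < half full: CLOSED -> OPEN, wake the rexmt tasks */
+		if (txq->closed &&
+		    buf_ring_count(txq->br) < (txq->br_size >> 1)) {
+			txq->closed = FALSE;
+			if_rexmt_start(txq->qid, txq->nqs);
+		}
+	}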
+
+
+TCP portion of implementation:
+
+Legacy TCP behavior of tcp_output:
+
+	switch (error) {
+	<..>
+	case ENOBUFS:
+		if (!tcp_timer_active(tp, TT_REXMT) &&
+		    !tcp_timer_active(tp, TT_PERSIST))
+			tcp_timer_activate(tp, TT_REXMT, tp->t_rxtcur);
+		tp->snd_cwnd = tp->t_maxseg;
+		return (0);
+	<...>
+
+With backpressure supported, the inpcb is simply put at the end of a tailq
+corresponding to the current cpu:
+	switch (error) {
+	<..>
+	case ENOBUFS:
+		inp_rexmt_enqueue(tp->t_inpcb, tcp_output_rexmt);
+		return (0);
+	<...>
+
+where tcp_output_rexmt is a wrapper for tcp_output():
+
+void
+tcp_output_rexmt(struct inpcb *inp)
+{
+
+	(void)tcp_output(inp->inp_ppcb);
+}
Index: sys/dev/bxe/bxe.c
===================================================================
--- sys/dev/bxe/bxe.c
+++ sys/dev/bxe/bxe.c
@@ -6810,7 +6810,7 @@
 
 #if __FreeBSD_version >= 800000
         fp->tx_br = buf_ring_alloc(BXE_BR_SIZE, M_DEVBUF,
-                                   M_NOWAIT, &fp->tx_mtx);
+                                   M_NOWAIT, &fp->tx_mtx, i, sc->num_queues);
         if (fp->tx_br == NULL) {
             BLOGE(sc, "buf_ring alloc fail for fp[%02d]\n", i);
             goto bxe_alloc_fp_buffers_error;
Index: sys/dev/cxgb/cxgb_sge.c
===================================================================
--- sys/dev/cxgb/cxgb_sge.c
+++ sys/dev/cxgb/cxgb_sge.c
@@ -2421,13 +2421,18 @@
 {
 	struct sge_qset *q = &sc->sge.qs[id];
 	int i, ret = 0;
+	int nqsets;
 
 	MTX_INIT(&q->lock, q->namebuf, NULL, MTX_DEF);
 	q->port = pi;
 	q->adap = sc;
+	for (nqsets = i = 0; i < sc->params.nports; i++) {
+		struct port_info *ptmp = &sc->port[i];
+		nqsets += ptmp->nqsets;
+	}
 
 	if ((q->txq[TXQ_ETH].txq_mr = buf_ring_alloc(cxgb_txq_buf_ring_size,
-	    M_DEVBUF, M_WAITOK, &q->lock)) == NULL) {
+	    M_DEVBUF, M_WAITOK, &q->lock, id, nqsets)) == NULL) {
 		device_printf(sc->dev, "failed to allocate mbuf ring\n");
 		goto err;
 	}
Index: sys/dev/cxgb/ulp/iw_cxgb/iw_cxgb_resource.c
===================================================================
--- sys/dev/cxgb/ulp/iw_cxgb/iw_cxgb_resource.c
+++ sys/dev/cxgb/ulp/iw_cxgb/iw_cxgb_resource.c
@@ -94,7 +94,7 @@
 	u32 rarray[16];
 
 	mtx_init(fifo_lock, "cxio fifo", NULL, MTX_DEF|MTX_DUPOK);
-	*fifo = buf_ring_alloc(nr, M_DEVBUF, M_NOWAIT, fifo_lock);
+	*fifo = buf_ring_alloc(nr, M_DEVBUF, M_NOWAIT, fifo_lock, 0, 0);
 	if (*fifo == NULL)
 		return (-ENOMEM);
 #if 0
@@ -154,7 +154,7 @@
 
 	mtx_init(&rdev_p->rscp->qpid_fifo_lock, "qpid fifo", NULL, MTX_DEF);
 	rdev_p->rscp->qpid_fifo = buf_ring_alloc(T3_MAX_NUM_QP, M_DEVBUF,
-	    M_NOWAIT, &rdev_p->rscp->qpid_fifo_lock);
+	    M_NOWAIT, &rdev_p->rscp->qpid_fifo_lock, 0, 0);
 	if (rdev_p->rscp->qpid_fifo == NULL)
 		return (-ENOMEM);
Index: sys/dev/e1000/if_em.c
===================================================================
--- sys/dev/e1000/if_em.c
+++ sys/dev/e1000/if_em.c
@@ -3288,7 +3288,7 @@
 
 #if __FreeBSD_version >= 800000
 		/* Allocate a buf ring */
 		txr->br = buf_ring_alloc(4096, M_DEVBUF,
-		    M_WAITOK, &txr->tx_mtx);
+		    M_WAITOK, &txr->tx_mtx, i, adapter->num_queues);
 #endif
 	}
Index: sys/dev/e1000/if_igb.c
===================================================================
--- sys/dev/e1000/if_igb.c
+++ sys/dev/e1000/if_igb.c
@@ -3375,7 +3375,8 @@
 
 #ifndef IGB_LEGACY_TX
 		/* Allocate a buf ring */
 		txr->br = buf_ring_alloc(igb_buf_ring_size, M_DEVBUF,
-		    M_WAITOK, &txr->tx_mtx);
+		    M_WAITOK, &txr->tx_mtx, i,
+		    adapter->num_queues);
 #endif
 	}
Index: sys/dev/ifmlx4/en_tx.c
===================================================================
--- sys/dev/ifmlx4/en_tx.c
+++ sys/dev/ifmlx4/en_tx.c
@@ -89,7 +89,8 @@
 
 	/* Allocate the buf ring */
 	ring->br = buf_ring_alloc(MLX4_EN_DEF_TX_QUEUE_SIZE, M_DEVBUF,
-	    M_WAITOK, &ring->tx_lock.m);
+	    M_WAITOK, &ring->tx_lock.m, queue_idx,
+	    priv->tx_ring_num);
 	if (ring->br == NULL) {
 		en_err(priv, "Failed allocating tx_info ring\n");
 		return -ENOMEM;
Index: sys/dev/ixgbe/ix_txrx.c
===================================================================
--- sys/dev/ixgbe/ix_txrx.c
+++ sys/dev/ixgbe/ix_txrx.c
@@ -2222,8 +2222,9 @@
 	}
 #ifndef IXGBE_LEGACY_TX
 	/* Allocate a buf ring */
-	txr->br = buf_ring_alloc(IXGBE_BR_SIZE, M_DEVBUF,
-	    M_WAITOK, &txr->tx_mtx);
+	txr->br = buf_ring_alloc(IXGBE_BR_SIZE, M_DEVBUF,
+	    M_WAITOK, &txr->tx_mtx, i,
+	    adapter->num_queues);
 	if (txr->br == NULL) {
 		device_printf(dev,
 		    "Critical Failure setting up buf ring\n");
Index: sys/dev/mxge/if_mxge.c
===================================================================
--- sys/dev/mxge/if_mxge.c
+++ sys/dev/mxge/if_mxge.c
@@ -4435,7 +4435,7 @@
 		mtx_init(&ss->tx.mtx, ss->tx.mtx_name, NULL, MTX_DEF);
 #ifdef IFNET_BUF_RING
 		ss->tx.br = buf_ring_alloc(2048, M_DEVBUF, M_WAITOK,
-		    &ss->tx.mtx);
+		    &ss->tx.mtx, i, sc->num_slices);
 #endif
 	}
 
Index: sys/dev/oce/oce_queue.c
===================================================================
--- sys/dev/oce/oce_queue.c
+++ sys/dev/oce/oce_queue.c
@@ -45,7 +45,7 @@
  *****************************************************/
 
 static struct oce_wq *oce_wq_init(POCE_SOFTC sc,
-    uint32_t q_len, uint32_t wq_type);
+    uint32_t q_len, uint32_t wq_type, int idx);
 static int oce_wq_create(struct oce_wq *wq, struct oce_eq *eq);
 static void oce_wq_free(struct oce_wq *wq);
 static void oce_wq_del(struct oce_wq *wq);
@@ -94,7 +94,7 @@
 	/* alloc TX/RX queues */
 	for_all_wq_queues(sc, wq, i) {
 		sc->wq[i] = oce_wq_init(sc, sc->tx_ring_size,
-		    NIC_WQ_TYPE_STANDARD);
+		    NIC_WQ_TYPE_STANDARD, i);
 		if (!sc->wq[i])
 			goto error;
 
@@ -202,7 +202,7 @@
  * @returns the pointer to the WQ created or NULL on failure
 */
 static struct
-oce_wq *oce_wq_init(POCE_SOFTC sc, uint32_t q_len, uint32_t wq_type)
+oce_wq *oce_wq_init(POCE_SOFTC sc, uint32_t q_len, uint32_t wq_type, int idx)
 {
 	struct oce_wq *wq;
 	int rc = 0, i;
@@ -258,7 +258,7 @@
 #if __FreeBSD_version >= 800000
 	/* Allocate buf ring for multiqueue*/
 	wq->br = buf_ring_alloc(4096, M_DEVBUF,
-	    M_WAITOK, &wq->tx_lock.mutex);
+	    M_WAITOK, &wq->tx_lock.mutex, idx, sc->nwqs);
 	if (!wq->br)
 		goto free_wq;
 #endif
Index: sys/dev/virtio/network/if_vtnet.c
===================================================================
--- sys/dev/virtio/network/if_vtnet.c
+++ sys/dev/virtio/network/if_vtnet.c
@@ -702,8 +702,10 @@
 vtnet_init_txq(struct vtnet_softc *sc, int id)
 {
 	struct vtnet_txq *txq;
+	int npairs;
 
 	txq = &sc->vtnet_txqs[id];
+	npairs = sc->vtnet_max_vq_pairs;
 
 	snprintf(txq->vtntx_name, sizeof(txq->vtntx_name), "%s-tx%d",
 	    device_get_nameunit(sc->vtnet_dev), id);
@@ -718,7 +720,7 @@
 
 #ifndef VTNET_LEGACY_TX
 	txq->vtntx_br = buf_ring_alloc(VTNET_DEFAULT_BUFRING_SIZE, M_DEVBUF,
-	    M_NOWAIT, &txq->vtntx_mtx);
+	    M_NOWAIT, &txq->vtntx_mtx, id, npairs);
 	if (txq->vtntx_br == NULL)
 		return (ENOMEM);
Index: sys/dev/vmware/vmxnet3/if_vmx.c
===================================================================
--- sys/dev/vmware/vmxnet3/if_vmx.c
+++ sys/dev/vmware/vmxnet3/if_vmx.c
@@ -990,7 +990,7 @@
 	TASK_INIT(&txq->vxtxq_defrtask, 0, vmxnet3_txq_tq_deferred, txq);
 
 	txq->vxtxq_br = buf_ring_alloc(VMXNET3_DEF_BUFRING_SIZE, M_DEVBUF,
-	    M_NOWAIT, &txq->vxtxq_mtx);
+	    M_NOWAIT, &txq->vxtxq_mtx, q, sc->vmx_max_ntxqueues);
 	if (txq->vxtxq_br == NULL)
 		return (ENOMEM);
 #endif
Index: sys/dev/vxge/vxge.c
===================================================================
---
sys/dev/vxge/vxge.c +++ sys/dev/vxge/vxge.c @@ -2295,7 +2295,8 @@ } #if __FreeBSD_version >= 800000 vpath->br = buf_ring_alloc(VXGE_DEFAULT_BR_SIZE, M_DEVBUF, - M_WAITOK, &vpath->mtx_tx); + M_WAITOK, &vpath->mtx_tx, i, + vdev->no_of_vpath); if (vpath->br == NULL) { err = ENOMEM; break; Index: sys/kern/subr_bufring.c =================================================================== --- sys/kern/subr_bufring.c +++ sys/kern/subr_bufring.c @@ -50,7 +50,7 @@ static struct buf_ring * -buf_ring_alloc_(int count, struct malloc_type *type, int flags, struct mtx *lock, int brflags) +buf_ring_alloc_(int count, struct malloc_type *type, int flags, struct mtx *lock, int brflags, int id, int nqs) { struct buf_ring *br; int alloc_count; @@ -70,22 +70,24 @@ br->br_prod_mask = br->br_cons_mask = count-1; br->br_prod_head = br->br_cons_head = 0; br->br_prod_tail = br->br_cons_tail = 0; + br->br_id = id; + br->br_nqs = nqs; return (br); } struct buf_ring * -buf_ring_alloc(int count, struct malloc_type *type, int flags, struct mtx *lock) +buf_ring_alloc(int count, struct malloc_type *type, int flags, struct mtx *lock, int id, int nqs) { - return (buf_ring_alloc_(count, type, flags, lock, 0)); + return (buf_ring_alloc_(count, type, flags, lock, 0, id, nqs)); } struct buf_ring * buf_ring_aligned_alloc(int count, struct malloc_type *type, int flags, struct mtx *lock) { - return (buf_ring_alloc_(count, type, flags, lock, BR_FLAGS_ALIGNED)); + return (buf_ring_alloc_(count, type, flags, lock, BR_FLAGS_ALIGNED, 0, 0)); } void Index: sys/net/altq/if_altq.h =================================================================== --- sys/net/altq/if_altq.h +++ sys/net/altq/if_altq.h @@ -44,6 +44,7 @@ struct mbuf *ifq_tail; int ifq_len; int ifq_maxlen; + int ifq_closed; struct mtx ifq_mtx; /* driver owned queue (used for bulk dequeue and prepend) UNLOCKED */ Index: sys/net/if.c =================================================================== --- sys/net/if.c +++ sys/net/if.c @@ -81,6 +81,7 @@ #include #include #include +#include #include #include #ifdef INET @@ -3424,6 +3425,16 @@ (*(ifp)->if_start)(ifp); } +void +if_rexmt_start(int qid, int nqs) +{ + +#if defined(INET6) || defined(INET) + inp_rexmt_start(qid, nqs); +#endif +} + + /* * Backwards compatibility interface for drivers * that have not implemented it Index: sys/net/iflib.h =================================================================== --- sys/net/iflib.h +++ sys/net/iflib.h @@ -208,7 +208,7 @@ if_softc_ctx_t iflib_get_softc_ctx(if_ctx_t ctx); if_shared_ctx_t iflib_get_sctx(if_ctx_t ctx); -void iflib_set_mac(if_ctx_t ctx, uint8_t mac[ETHER_ADDR_LEN]); +void iflib_set_mac(if_ctx_t ctx, uint8_t mac[6/*ETHER_ADDR_LEN*/]); @@ -245,6 +245,7 @@ void iflib_irq_free(if_ctx_t ctx, if_irq_t irq); +void iflib_io_tqg_attach(struct grouptask *gtask, void *uniq, int cpu, char *name); void iflib_tx_intr_deferred(if_ctx_t ctx, int txqid); void iflib_rx_intr_deferred(if_ctx_t ctx, int rxqid); Index: sys/net/iflib.c =================================================================== --- sys/net/iflib.c +++ sys/net/iflib.c @@ -52,6 +52,7 @@ #include #include +#include #include #include @@ -288,12 +289,13 @@ iflib_sd_t ift_sds; int ift_nbr; struct mp_ring **ift_br; - struct grouptask ift_task; - int ift_qstatus; + struct grouptask ift_task; + int ift_qstatus; int ift_active; - int ift_watchdog_time; + int ift_watchdog_time; struct iflib_filter_info ift_filter_info; - iflib_dma_info_t ift_ifdi; + iflib_dma_info_t ift_ifdi; + int ift_closed; }; struct iflib_fl { @@ -2290,6 
+2292,10 @@ int i, count, pkt_sent, bytes_sent, mcast_sent, avail; avail = IDXDIFF(pidx, cidx, r->size); + if (avail < (r->size >> 1)) { + txq->ift_closed = FALSE; + inp_rexmt_start(txq->ift_id, ctx->ifc_softc_ctx.isc_nqsets); + } if (ctx->ifc_flags & IFC_QFLUSH) { DBG_COUNTER_INC(txq_drain_flushing); for (i = 0; i < avail; i++) { @@ -2505,6 +2511,26 @@ m_freem(m); return (0); } + + qidx = 0; + if ((NQSETS(ctx) > 1) && M_HASHTYPE_GET(m)) + qidx = QIDX(ctx, m); + /* + * XXX calculate buf_ring based on flowid (divvy up bits?) + */ + txq = &ctx->ifc_txqs[qidx]; + + + if (txq->ift_closed) { + while (m != NULL) { + next = m->m_nextpkt; + m->m_nextpkt = NULL; + m_freem(m); + m = next; + } + return (ENOBUFS); + } + qidx = count = 0; mp = marr; next = m; @@ -2526,17 +2552,11 @@ next = next->m_nextpkt; mp[i]->m_nextpkt = NULL; } - if ((NQSETS(ctx) > 1) && M_HASHTYPE_GET(m)) - qidx = QIDX(ctx, m); - /* - * XXX calculate buf_ring based on flowid (divvy up bits?) - */ - txq = &ctx->ifc_txqs[qidx]; - DBG_COUNTER_INC(tx_seen); err = mp_ring_enqueue(txq->ift_br[0], (void **)mp, count, IFLIB_BUDGET); /* drain => err = iflib_txq_transmit(ifp, txq, m); */ if (err) { + txq->ift_closed = TRUE; for (i = 0; i < count; i++) m_freem(mp[i]); mp_ring_check_drainage(txq->ift_br[0], BATCH_SIZE); @@ -3717,6 +3737,14 @@ } void +iflib_io_tqg_attach(struct grouptask *gt, void *uniq, int cpu, char *name) +{ + + taskqgroup_attach_cpu(gctx->igc_io_tqg, gt, uniq, cpu, -1, name); +} + + +void iflib_link_state_change(if_ctx_t ctx, int link_state) { if_t ifp = ctx->ifc_ifp; Index: sys/net/ifq.h =================================================================== --- sys/net/ifq.h +++ sys/net/ifq.h @@ -57,6 +57,7 @@ struct mbuf *ifq_tail; int ifq_len; int ifq_maxlen; + int ifq_closed; struct mtx ifq_mtx; }; @@ -71,6 +72,7 @@ #define IF_UNLOCK(ifq) mtx_unlock(&(ifq)->ifq_mtx) #define IF_LOCK_ASSERT(ifq) mtx_assert(&(ifq)->ifq_mtx, MA_OWNED) #define _IF_QFULL(ifq) ((ifq)->ifq_len >= (ifq)->ifq_maxlen) +#define _IF_QCLOSED(ifq) ((ifq)->ifq_closed == TRUE) #define _IF_QLEN(ifq) ((ifq)->ifq_len) #define _IF_ENQUEUE(ifq, m) do { \ @@ -162,12 +164,14 @@ #define IFQ_ENQUEUE(ifq, m, err) \ do { \ IF_LOCK(ifq); \ - if (ALTQ_IS_ENABLED(ifq)) \ + if (ALTQ_IS_ENABLED(ifq)) { \ ALTQ_ENQUEUE(ifq, m, NULL, err); \ - else { \ - if (_IF_QFULL(ifq)) { \ + (ifq)->ifq_closed = TRUE; \ + } else { \ + if (_IF_QCLOSED(ifq) || _IF_QFULL(ifq)) { \ m_freem(m); \ (err) = ENOBUFS; \ + (ifq)->ifq_closed = TRUE; \ } else { \ _IF_ENQUEUE(ifq, m); \ (err) = 0; \ @@ -182,8 +186,14 @@ (m) = tbr_dequeue_ptr(ifq, ALTDQ_REMOVE); \ else if (ALTQ_IS_ENABLED(ifq)) \ ALTQ_DEQUEUE(ifq, m); \ - else \ + else { \ _IF_DEQUEUE(ifq, m); \ + if (_IF_QCLOSED(ifq) && \ + (ifq)->ifq_len < ((ifq)->ifq_maxlen >> 1)) { \ + (ifq)->ifq_closed = FALSE; \ + if_rexmt_start(0, 1); \ + } \ + } \ } while (0) #define IFQ_DEQUEUE(ifq, m) \ @@ -216,6 +226,7 @@ ALTQ_PURGE(ifq); \ } else \ _IF_DRAIN(ifq); \ + (ifq)->ifq_closed = FALSE; \ } while (0) #define IFQ_PURGE(ifq) \ @@ -329,7 +340,6 @@ error = buf_ring_enqueue(br, m); if (error) m_freem(m); - return (error); } Index: sys/netinet/in_pcb.h =================================================================== --- sys/netinet/in_pcb.h +++ sys/netinet/in_pcb.h @@ -161,6 +161,7 @@ * (p) - Protected by the pcbinfo lock for the inpcb * (l) - Protected by the pcblist lock for the inpcb * (h) - Protected by the pcbhash lock for the inpcb + * (r) - Protected by the rexmt lock * (s) - Protected by another subsystem's locks * (x) - Undefined locking * 
@@ -240,6 +241,8 @@
 	struct llentry	*inp_lle;	/* cached L2 information */
 	struct rtentry	*inp_rt;	/* cached L3 information */
 	struct rwlock	inp_lock;
+	STAILQ_ENTRY(inpcb) inp_rexmt_entry;	/* (i/) */
+	void	(*inp_rexmt) (struct inpcb *);
 };
 #define	inp_fport	inp_inc.inc_fport
 #define	inp_lport	inp_inc.inc_lport
@@ -609,6 +612,7 @@
 #define	INP_RSS_BUCKET_SET	0x00000080 /* IP_RSS_LISTEN_BUCKET is set */
 #define	INP_RECVFLOWID		0x00000100 /* populate recv datagram with flow info */
 #define	INP_RECVRSSBUCKETID	0x00000200 /* populate recv datagram with bucket id */
+#define	INP_IN_REXMTQ		0x00000400 /* inpcb is referenced by the pcpu rexmit q */
 
 /*
  * Flags passed to in_pcblookup*() functions.
@@ -728,6 +732,9 @@
 struct sockaddr *
 	in_sockaddr(in_port_t port, struct in_addr *addr);
 void	in_pcbsosetlabel(struct socket *so);
+void	inp_rexmt_enqueue(struct inpcb *inp, void (*inp_rexmt) (struct inpcb *));
+void	inp_rexmt_start(uint32_t qid, uint32_t nqs);
+void	inp_rexmt_stop(struct inpcb *inp);
 #endif /* _KERNEL */
 
 #endif /* !_NETINET_IN_PCB_H_ */
Index: sys/netinet/in_pcb.c
===================================================================
--- sys/netinet/in_pcb.c
+++ sys/netinet/in_pcb.c
@@ -2387,6 +2387,159 @@
 	*fp = inp->inp_fport;
 }
 
+
+/* backpressure start */
+#include
+#include
+
+#include
+
+STAILQ_HEAD(inp_rexmt_head, inpcb) *inp_rexmt_list, *inp_rexmt_worklist;
+static struct mtx *inp_rexmt_lock;
+static struct grouptask *inp_rexmt_gt;
+static counter_u64_t inp_rexmt_count;
+SYSCTL_COUNTER_U64(_net_inet_ip, OID_AUTO, rexmt, CTLFLAG_RD, &inp_rexmt_count,
+    "Number of times inpcb was enqueued for rexmit");
+#define	INP_REXMT_LOCK(i)	mtx_lock(&inp_rexmt_lock[i])
+#define	INP_REXMT_UNLOCK(i)	mtx_unlock(&inp_rexmt_lock[i])
+
+void inp_rexmt_fn(void *context __unused, int pending __unused);
+
+static void
+inp_rexmt_init(const void *arg __unused)
+{
+	int i;
+	struct grouptask *gt;
+
+	inp_rexmt_list = malloc(sizeof(struct inp_rexmt_head)*mp_ncpus, M_PCB, M_WAITOK|M_ZERO);
+	inp_rexmt_worklist = malloc(sizeof(struct inp_rexmt_head)*mp_ncpus, M_PCB, M_WAITOK|M_ZERO);
+	inp_rexmt_lock = malloc(sizeof(struct mtx)*mp_ncpus, M_PCB, M_WAITOK|M_ZERO);
+	inp_rexmt_gt = malloc(sizeof(struct grouptask)*mp_ncpus, M_PCB, M_WAITOK|M_ZERO);
+	inp_rexmt_count = counter_u64_alloc(M_WAITOK|M_ZERO);
+	for (i = 0; i < mp_ncpus; i++) {
+		STAILQ_INIT(&inp_rexmt_list[i]);
+		STAILQ_INIT(&inp_rexmt_worklist[i]);
+		mtx_init(&inp_rexmt_lock[i], "rexmt", NULL, MTX_DEF);
+		gt = &inp_rexmt_gt[i];
+		GROUPTASK_INIT(gt, 0, inp_rexmt_fn, NULL);
+		iflib_io_tqg_attach(gt, gt, i, "rexmt");
+	}
+}
+/* must be post-SI_SUB_SMP */
+SYSINIT(inp_rexmt_init, SI_SUB_LAST, SI_ORDER_ANY,
+	inp_rexmt_init, NULL);
+
+void
+inp_rexmt_enqueue(struct inpcb *inp, void (*inp_rexmt) (struct inpcb *))
+{
+	int cpuid;
+
+	INP_WLOCK_ASSERT(inp);
+	if (inp->inp_flags2 & INP_IN_REXMTQ)
+		return;
+
+	counter_u64_add(inp_rexmt_count, 1);
+	inp->inp_flags2 |= INP_IN_REXMTQ;
+	inp->inp_rexmt = inp_rexmt;
+	in_pcbref(inp);
+
+	/*
+	 * Having a consistent reversible mapping is more important than
+	 * optimal cpu locality - so we don't go the route of:
+	 * cpuid = rss_hash2cpuid(inp->inp_flowid, inp->inp_flowtype);
+	 */
+	cpuid = inp->inp_flowid % mp_ncpus;
+
+	INP_REXMT_LOCK(cpuid);
+	/*
+	 * If performance is an issue on systems with multiple interfaces
+	 * it would make sense to hash the inp based on its ifp so that
+	 * rexmt_start only wakes up inpcbs on the caller's interface.
+	 */
+	STAILQ_INSERT_TAIL(&inp_rexmt_list[cpuid], inp, inp_rexmt_entry);
+	INP_REXMT_UNLOCK(cpuid);
+}
+
+void
+inp_rexmt_fn(void *context __unused, int pending __unused)
+{
+	struct inpcb *inp;
+	int cpuid;
+	struct inp_rexmt_head *worklist, *list;
+	void (*inp_rexmt) (struct inpcb *);
+
+	cpuid = curcpu;
+	worklist = &inp_rexmt_worklist[cpuid];
+	list = &inp_rexmt_list[cpuid];
+	if (STAILQ_EMPTY(worklist)) {
+		INP_REXMT_LOCK(cpuid);
+		STAILQ_SWAP(worklist, list, inpcb);
+		INP_REXMT_UNLOCK(cpuid);
+		/* spurious wakeup - nothing queued for this cpu */
+		if (STAILQ_EMPTY(worklist))
+			return;
+	}
+
+	inp = STAILQ_FIRST(worklist);
+	STAILQ_REMOVE_HEAD(worklist, inp_rexmt_entry);
+	INP_WLOCK(inp);
+	inp_rexmt = inp->inp_rexmt;
+	inp->inp_flags2 &= ~INP_IN_REXMTQ;
+	inp->inp_rexmt = NULL;
+	if (inp_rexmt != NULL)
+		inp_rexmt(inp);
+	if (!in_pcbrele_wlocked(inp))
+		INP_WUNLOCK(inp);
+	/* reschedule task to continue work */
+	if (!STAILQ_EMPTY(worklist))
+		GROUPTASK_ENQUEUE(&inp_rexmt_gt[cpuid]);
+}
+
+void
+inp_rexmt_stop(struct inpcb *inp)
+{
+
+	INP_WLOCK_ASSERT(inp);
+
+	/* we rely on the tq thread to call rele on the inpcb */
+	if ((inp->inp_flags2 & INP_IN_REXMTQ) == 0)
+		return;
+
+	inp->inp_rexmt = NULL;
+}
+
+void
+inp_rexmt_start(uint32_t qid, uint32_t nqs)
+{
+	int i, start, count;
+
+	MPASS(nqs > 0);
+
+	if (nqs == 1) {
+		count = mp_ncpus;
+		start = 0;
+	} else if (nqs == mp_ncpus) {
+		count = 1;
+		start = qid;
+	} else if (nqs < mp_ncpus && (mp_ncpus % nqs) == 0) {
+		count = mp_ncpus/nqs;
+		start = count*qid;
+	} else if (nqs > mp_ncpus && (nqs % mp_ncpus) == 0) {
+		count = 1;
+		start = qid/mp_ncpus;
+	} else {
+		/* XXX chance for future optimization
+		 * there isn't a trivial way to map queues to
+		 * cpus in this case
+		 */
+		count = mp_ncpus;
+		start = 0;
+	}
+
+	for (i = start; i < start + count; i++)
+		GROUPTASK_ENQUEUE(&inp_rexmt_gt[i]);
+}
+
+/* backpressure end */
+
+
 struct inpcb *
 so_sotoinpcb(struct socket *so)
 {
Index: sys/netinet/tcp_output.c
===================================================================
--- sys/netinet/tcp_output.c
+++ sys/netinet/tcp_output.c
@@ -126,6 +126,13 @@
     &VNET_NAME(tcp_autosndbuf_max), 0,
     "Max size of automatic send buffer");
 
+VNET_DEFINE(int, tcp_output_enobufs) = 0;
+#define	V_tcp_output_enobufs	VNET(tcp_output_enobufs)
+SYSCTL_INT(_net_inet_tcp, OID_AUTO, tcp_output_enobufs, CTLFLAG_VNET | CTLFLAG_RW,
+    &VNET_NAME(tcp_output_enobufs), 0,
+    "number of times ENOBUFS returned");
+
+
 static void inline	hhook_run_tcp_est_out(struct tcpcb *tp,
 			    struct tcphdr *th, struct tcpopt *to,
 			    long len, int tso);
@@ -164,6 +171,14 @@
 		CC_ALGO(tp)->after_idle(tp->ccv);
 }
 
+
+static void
+tcp_rexmt_output(struct inpcb *inp)
+{
+
+	(void) tcp_output(inp->inp_ppcb);
+}
+
 /*
  * Tcp output routine: figure out what should be sent and send it.
*/ @@ -1498,10 +1513,8 @@ tp->t_softerror = error; return (error); case ENOBUFS: - if (!tcp_timer_active(tp, TT_REXMT) && - !tcp_timer_active(tp, TT_PERSIST)) - tcp_timer_activate(tp, TT_REXMT, tp->t_rxtcur); - tp->snd_cwnd = tp->t_maxseg; + atomic_add_int(&V_tcp_output_enobufs, 1); + inp_rexmt_enqueue(tp->t_inpcb, tcp_rexmt_output); return (0); case EMSGSIZE: /* Index: sys/ofed/drivers/net/mlx4/en_tx.c =================================================================== --- sys/ofed/drivers/net/mlx4/en_tx.c +++ sys/ofed/drivers/net/mlx4/en_tx.c @@ -89,7 +89,8 @@ /* Allocate the buf ring */ ring->br = buf_ring_alloc(MLX4_EN_DEF_TX_QUEUE_SIZE, M_DEVBUF, - M_WAITOK, &ring->tx_lock.m); + M_WAITOK, &ring->tx_lock.m, queue_idx, + priv->tx_ring_num); if (ring->br == NULL) { en_err(priv, "Failed allocating tx_info ring\n"); return -ENOMEM; Index: sys/sys/buf_ring.h =================================================================== --- sys/sys/buf_ring.h +++ sys/sys/buf_ring.h @@ -41,6 +41,8 @@ #include #endif +void if_rexmt_start(int qid, int nqs); + /* cache line align buf ring entries */ #define BR_FLAGS_ALIGNED 0x1 @@ -61,6 +63,9 @@ volatile uint32_t br_cons_tail; int br_cons_size; int br_cons_mask; + int br_id; + int br_nqs; + int br_closed; #ifdef DEBUG_BUFRING struct mtx *br_lock; #endif @@ -70,6 +75,9 @@ struct br_entry_ br_ring[0] __aligned(CACHE_LINE_SIZE); }; + +static __inline int buf_ring_count(struct buf_ring *br); + /* * ring entry accessors to allow us to make ring entry * alignment determined at runtime @@ -191,6 +199,10 @@ buf, i, br->br_prod_tail, br->br_cons_tail); #endif critical_enter(); + if (br->br_closed == TRUE) { + critical_exit(); + return (ENOBUFS); + } do { prod_head = br->br_prod_head; @@ -206,6 +218,7 @@ continue; br->br_drops++; + br->br_closed = TRUE; critical_exit(); return (ENOBUFS); } @@ -333,7 +346,10 @@ br->br_cons_tail, cons_head); #endif atomic_store_rel_32(&br->br_cons_tail, cons_next); - + if (br->br_closed == TRUE && buf_ring_count(br) < (br->br_prod_size >> 1)) { + br->br_closed = FALSE; + if_rexmt_start(br->br_id, br->br_nqs); + } return ((void *)(uintptr_t)buf); } @@ -366,6 +382,10 @@ */ br->br_ring[cons_head].bre_ptr = NULL; atomic_store_rel_32(&br->br_cons_tail, cons_next); + if (br->br_closed == TRUE && buf_ring_count(br) < (br->br_prod_size >> 1)) { + br->br_closed = FALSE; + if_rexmt_start(br->br_id, br->br_nqs); + } } /* @@ -451,7 +471,7 @@ } struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags, - struct mtx *); + struct mtx *, int id, int nqs); struct buf_ring *buf_ring_aligned_alloc(int count, struct malloc_type *type, int flags, struct mtx *); void buf_ring_free(struct buf_ring *br, struct malloc_type *type);