Index: sys/sys/buf_ring.h =================================================================== --- sys/sys/buf_ring.h +++ sys/sys/buf_ring.h @@ -1,5 +1,5 @@ /*- - * Copyright (c) 2007-2009 Kip Macy + * Copyright (c) 2007-2015 Kip Macy * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -58,7 +58,85 @@ }; /* - * multi-producer safe lock-free ring buffer enqueue + * Many architectures other than x86 permit speculative re-ordering + * of loads. Unfortunately, atomic_load_acq_32() is comparatively + * expensive so we'd rather elide it if possible. + */ +#if defined(__i386__) || defined(__amd64__) +#define ORDERED_LOAD_32(x) (*x) +#else +#define ORDERED_LOAD_32(x) atomic_load_acq_32((x)) +#endif + +/* + * Multi-producer safe lock-free ring buffer enqueue + * + * Most architectures do not support the atomic update of multiple + * discontiguous locations. So it is not possible to atomically update + * the producer index and ring buffer entry. To side-step this limitation + * we split update in to 3 steps: + * 1) atomically acquiring an index + * 2) updating the corresponding ring entry + * 3) making the update available to the consumer + * In order to split the index update in to an acquire and release + * phase there are _two_ producer indexes. 'prod_head' is used for + * step 1) and is thus only used by the enqueue itself. 'prod_tail' + * is used for step 3) to signal to the consumer that the update is + * complete. To guarantee memory ordering the update of 'prod_tail' is + * done with a atomic_store_rel_32(...) and the corresponding + * initial read of 'prod_tail' by the dequeue functions is done with + * an atomic_load_acq_32(...). + * + * Regarding memory ordering - there are five variables in question: + * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}] + * It's easiest examine correctness by considering the consequence of + * reading a stale value or having an update become visible prior to + * preceding writes. + * + * - prod_head: this is only read by the enqueue routine, if the latter were to + * initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32) + * would fail. However, the implied memory barrier in cmpxchg would cause the + * subsequent read of prod_head to read the up-to-date value permitting the + * cmpxchg to succeed the second time. + * + * - prod_tail: This value is used by dequeue to determine the effective + * producer index. On architectures with weaker memory ordering than x86 it + * needs special handling. In enqueue it needs to be updated with + * atomic_store_rel_32() (i.e. a write memory barrier before update) to + * guarantee that the new ring value is committed to memory before it is + * made available by prod_tail. In dequeue to guarantee that it is read before + * br_ring[cons_head] it needs to be read with atomic_load_acq_32(). + * + * - cons_head: this value is used only by dequeue, it is either updated + * atomically (dequeue_mc) or protected by a mutex (dequeue_sc). + * + * - cons_tail: This is used to communicate the latest consumer index between + * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue + * to fail erroneously. To avoid a load being re-ordered after a store (and + * thus permitting enqueue to store a new value before the old one has been + * consumed) it is updated with an atomic_store_rel_32() in deqeueue. + * + * - ring[idx] : Updates to this value need to reach memory before the subsequent + * update to prod_tail does. Reads need to happen before subsequent updates to + * cons_tail. + * + * Some implementation notes: + * - Much like a simpler single-producer single consumer ring buffer, + * the producer can not produce faster than the consumer. Hence the + * check of 'prod_head' + 1 against 'cons_tail'. + * + * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to + * calculate the next index is slightly cheaper than a modulo but + * requires the ring to be power-of-2 sized. + * + * - The critical_enter() / critical_exit() are not required for + * correctness. They prevent updates from stalling by having a producer be + * preempted after updating 'prod_head' but before updating 'prod_tail'. + * + * - The "while (br->br_prod_tail != prod_head)" + * check assures in order completion (probably not strictly necessary, + * but makes it easier to reason about) and allows us to update + * 'prod_tail' without a cmpxchg / LOCK prefix. * */ static __inline int @@ -75,21 +153,24 @@ #endif critical_enter(); do { + prod_head = br->br_prod_head; prod_next = (prod_head + 1) & br->br_prod_mask; cons_tail = br->br_cons_tail; if (prod_next == cons_tail) { - rmb(); - if (prod_head == br->br_prod_head && - cons_tail == br->br_cons_tail) { - br->br_drops++; - critical_exit(); - return (ENOBUFS); - } - continue; + /* ensure that we only return ENOBUFS + * if the latest value matches what we read + */ + if (prod_head != atomic_load_acq_32(&br->br_prod_head) || + cons_tail != atomic_load_acq_32(&br->br_cons_tail)) + continue; + + br->br_drops++; + critical_exit(); + return (ENOBUFS); } - } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next)); + } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next)); #ifdef DEBUG_BUFRING if (br->br_ring[prod_head] != NULL) panic("dangling value in enqueue"); @@ -99,11 +180,15 @@ /* * If there are other enqueues in progress * that preceeded us, we need to wait for them - * to complete - */ + * to complete + * re-ordering of reads would not effect correctness + */ while (br->br_prod_tail != prod_head) - cpu_spinwait(); - atomic_store_rel_int(&br->br_prod_tail, prod_next); + cpu_spinwait(); + /* ensure that the ring update reaches memory before the new + * value of prod_tail + */ + atomic_store_rel_32(&br->br_prod_tail, prod_next); critical_exit(); return (0); } @@ -120,28 +205,39 @@ critical_enter(); do { + /* + * There is no logical ordering required in the ordering of the + * prod_tail and cons_head + */ cons_head = br->br_cons_head; - cons_next = (cons_head + 1) & br->br_cons_mask; - if (cons_head == br->br_prod_tail) { critical_exit(); return (NULL); } - } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next)); + cons_next = (cons_head + 1) & br->br_cons_mask; + } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next)); + /* ensure that the read completes before either of the + * subsequent stores + */ buf = br->br_ring[cons_head]; -#ifdef DEBUG_BUFRING + /* guarantee that the load completes before we update cons_tail */ br->br_ring[cons_head] = NULL; -#endif + /* * If there are other dequeues in progress * that preceeded us, we need to wait for them - * to complete - */ + * to complete - no memory barrier needed as + * re-ordering shouldn't effect correctness or + * progress + */ while (br->br_cons_tail != cons_head) cpu_spinwait(); - - atomic_store_rel_int(&br->br_cons_tail, cons_next); + /* rather than doing a atomic_load_acq_ptr() on the ring + * we guarantee that the load completes first by ensuring + * release semantics on tail - this is cheaper on x86 + */ + atomic_store_rel_32(&br->br_cons_tail, cons_next); critical_exit(); return (buf); @@ -158,22 +254,25 @@ uint32_t cons_head, cons_next; #ifdef PREFETCH_DEFINED uint32_t cons_next_next; -#endif uint32_t prod_tail; +#endif void *buf; - + +retry: + /* + * prod_tail tells whether or not br_ring[cons_head] is valid + * thus we must guarantee that it is read first + */ cons_head = br->br_cons_head; - prod_tail = br->br_prod_tail; - + if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail)) { + if (cons_head != atomic_load_acq_32(&br->br_prod_tail)) + goto retry; + return (NULL); + } cons_next = (cons_head + 1) & br->br_cons_mask; #ifdef PREFETCH_DEFINED + prod_tail = br->br_prod_tail; cons_next_next = (cons_head + 2) & br->br_cons_mask; -#endif - - if (cons_head == prod_tail) - return (NULL); - -#ifdef PREFETCH_DEFINED if (cons_next != prod_tail) { prefetch(br->br_ring[cons_next]); if (cons_next_next != prod_tail) @@ -182,16 +281,17 @@ #endif br->br_cons_head = cons_next; buf = br->br_ring[cons_head]; - -#ifdef DEBUG_BUFRING + /* guarantee that the load completes before we update cons_tail */ br->br_ring[cons_head] = NULL; +#ifdef DEBUG_BUFRING if (!mtx_owned(br->br_lock)) panic("lock not held on single consumer dequeue"); if (br->br_cons_tail != cons_head) panic("inconsistent list cons_tail=%d cons_head=%d", br->br_cons_tail, cons_head); #endif - br->br_cons_tail = cons_next; + atomic_store_rel_32(&br->br_cons_tail, cons_next); + return (buf); } @@ -205,7 +305,7 @@ { uint32_t cons_head, cons_next; uint32_t prod_tail; - + cons_head = br->br_cons_head; prod_tail = br->br_prod_tail; @@ -213,10 +313,17 @@ if (cons_head == prod_tail) return; br->br_cons_head = cons_next; -#ifdef DEBUG_BUFRING + + /* + * Storing NULL here serves two purposes: + * 1) it assures that the load of ring[cons_head] has completed + * (only the most perverted architecture or compiler would + * consider re-ordering a = *x; *x = b) + * 2) it allows us to enforce global ordering of the cons_tail + * update with an atomic_store_rel_32 + */ br->br_ring[cons_head] = NULL; -#endif - br->br_cons_tail = cons_next; + atomic_store_rel_32(&br->br_cons_tail, cons_next); } /* @@ -234,6 +341,8 @@ * back (since jhb says the store is probably cheaper), * if we have to do a multi-queue version we will need * the compare and an atomic. + * + * should be a no-op */ static __inline void buf_ring_putback_sc(struct buf_ring *br, void *new) @@ -251,33 +360,40 @@ static __inline void * buf_ring_peek(struct buf_ring *br) { - + uint32_t cons_head; #ifdef DEBUG_BUFRING if ((br->br_lock != NULL) && !mtx_owned(br->br_lock)) panic("lock not held on single consumer dequeue"); #endif + cons_head = br->br_cons_head; /* - * I believe it is safe to not have a memory barrier - * here because we control cons and tail is worst case - * a lagging indicator so we worst case we might - * return NULL immediately after a buffer has been enqueued + * for correctness prod_tail must be read before ring[cons_head] */ - if (br->br_cons_head == br->br_prod_tail) + + if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail)) return (NULL); - - return (br->br_ring[br->br_cons_head]); + + /* ensure that the ring load completes before + * exposing it to any destructive updates + */ + return (br->br_ring[cons_head]); } static __inline int buf_ring_full(struct buf_ring *br) { - + /* br_cons_tail may be stale but the consumer understands that this is + * only a point in time snapshot + */ return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail); } static __inline int buf_ring_empty(struct buf_ring *br) { + /* br_prod_tail may be stale but the consumer understands that this is + * only a point in time snapshot + */ return (br->br_cons_head == br->br_prod_tail); } @@ -285,6 +401,9 @@ static __inline int buf_ring_count(struct buf_ring *br) { + /* br_cons_tail and br_prod_tail may be stale but the consumer + * understands that this is only a point in time snapshot + */ return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail) & br->br_prod_mask);