Index: sys/sys/buf_ring.h =================================================================== --- sys/sys/buf_ring.h +++ sys/sys/buf_ring.h @@ -1,5 +1,5 @@ /*- - * Copyright (c) 2007-2009 Kip Macy + * Copyright (c) 2007-2015 Kip Macy * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,6 +41,9 @@ #include #endif +/* workaround sloppy API that doesn't actually conform to its name */ +#define atomic_load_acq_ptr_workaround(x) (void *)atomic_load_acq_ptr((uintptr_t *)(uintptr_t)(x)) + struct buf_ring { volatile uint32_t br_prod_head; volatile uint32_t br_prod_tail; @@ -58,7 +61,70 @@ }; /* - * multi-producer safe lock-free ring buffer enqueue + * Multi-producer safe lock-free ring buffer enqueue + * + * Most architectures do not support the atomic update of multiple + * discontiguous locations. So it is not possible to atomically update + * the producer index and ring buffer entry. To side-step this limitation + * we split update in to 3 steps: + * 1) atomically acquiring an index + * 2) updating the corresponding ring entry + * 3) making the update available to the consumer + * In order to split the index update in to an acquire and release + * phase there are _two_ producer indexes. 'prod_head' is used for + * step 1) and is thus only used by the enqueue itself. 'prod_tail' + * is used for step 3) to signal to the consumer that the update is + * complete. To guarantee memory ordering the update of 'prod_tail' is + * done with a atomic_store_rel_32(...) and the corresponding + * initial read of 'prod_tail' by the dequeue functions is done with + * an atomic_load_acq_32(...). + * + * Regarding memory ordering - there are five variables in question: + * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}] + * It's easiest examine correctness by considering the consequence of + * reading a stale value or having an update become visible prior to + * preceding writes. + * + * - prod_head: this is only read by the enqueue routine, if the latter were to + * initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32) + * would fail. However, the implied memory barrier in cmpxchg would cause the + * subsequent read of prod_head to read the up-to-date value permitting the + * cmpxchg to succeed the second time. + * + * - prod_tail: this value is used by dequeue to determine the effective + * producer index. The absence of a memory barrier preceding it would make + * it possible for dequeue to read the new value prior to the update to + * ring[idx] becomings visible, thus reading a stale value in ring[idx]. + * + * - cons_head: this value is used only by dequeue, it is either updated + * atomically (dequeue_mc) or protected by a mutex (dequeue_sc). + * + * - cons_tail: This is used to communicate the latest consumer index between + * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue + * to fail erroneously. There is no risk of reading a _new_ value of cons_tail + * before br_ring[idx] has been read because the value is not updated until + * following the read. + * + * - ring[idx] : Updates to this value need to reach memory before the subsequent + * update to prod_tail does. + * + * Some implementation notes: + * - Much like a simpler single-producer single consumer ring buffer, + * the producer can not produce faster than the consumer. Hence the + * check of 'prod_head' + 1 against 'cons_tail'. + * + * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to + * calculate the next index is slightly cheaper than a modulo but + * requires the ring to be power-of-2 sized. + * + * - The critical_enter() / critical_exit() are not required for + * correctness. They prevent updates from stalling by having a producer be + * preempted after updating 'prod_head' but before updating 'prod_tail'. + * + * - The "while (br->br_prod_tail != prod_head)" + * check assures in order completion (probably not strictly necessary, + * but makes it easier to reason about) and allows us to update + * 'prod_tail' without a cmpxchg / LOCK prefix. * */ static __inline int @@ -75,21 +141,26 @@ #endif critical_enter(); do { + + cons_tail = br->br_cons_tail; prod_head = br->br_prod_head; prod_next = (prod_head + 1) & br->br_prod_mask; - cons_tail = br->br_cons_tail; if (prod_next == cons_tail) { - rmb(); - if (prod_head == br->br_prod_head && - cons_tail == br->br_cons_tail) { + /* We need to assure that we have up to date values + * but there is no apparent load ordering required + * this really just gives us a chance to re-read + * the values + */ + if (prod_head == atomic_load_acq_32(&br->br_prod_head) && + cons_tail == atomic_load_acq_32(&br->br_cons_tail)) { br->br_drops++; critical_exit(); return (ENOBUFS); } continue; } - } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next)); + } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next)); #ifdef DEBUG_BUFRING if (br->br_ring[prod_head] != NULL) panic("dangling value in enqueue"); @@ -99,11 +170,15 @@ /* * If there are other enqueues in progress * that preceeded us, we need to wait for them - * to complete - */ + * to complete + * re-ordering of reads would not effect correctness + */ while (br->br_prod_tail != prod_head) - cpu_spinwait(); - atomic_store_rel_int(&br->br_prod_tail, prod_next); + cpu_spinwait(); + /* ensure that the ring update reaches memory before the new + * value of prod_tail + */ + atomic_store_rel_32(&br->br_prod_tail, prod_next); critical_exit(); return (0); } @@ -120,28 +195,42 @@ critical_enter(); do { + /* + * There is no logical ordering required in the ordering of the + * prod_tail and cons_head + */ cons_head = br->br_cons_head; - cons_next = (cons_head + 1) & br->br_cons_mask; - if (cons_head == br->br_prod_tail) { critical_exit(); return (NULL); } - } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next)); + cons_next = (cons_head + 1) & br->br_cons_mask; + } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next)); + + /* ensure that the read completes before either of the + * subsequent stores + */ + buf = atomic_load_acq_ptr_workaround(&br->br_ring[cons_head]); - buf = br->br_ring[cons_head]; #ifdef DEBUG_BUFRING br->br_ring[cons_head] = NULL; #endif /* * If there are other dequeues in progress * that preceeded us, we need to wait for them - * to complete - */ + * to complete - no memory barrier needed as + * re-ordering shouldn't effect correctness or + * progress + */ while (br->br_cons_tail != cons_head) cpu_spinwait(); - - atomic_store_rel_int(&br->br_cons_tail, cons_next); +#ifdef DEBUG_BUFRING + /* a memory barrier is only needed here if we wish to guarantee the + * the NULL store completes first + */ + atomic_store_rel_32(&br->br_cons_tail, cons_next); +#endif + br->br_cons_tail = cons_next; critical_exit(); return (buf); @@ -161,19 +250,18 @@ #endif uint32_t prod_tail; void *buf; - + + /* + * guarantee that we read prod_tail before reading ring[cons_next] + * thus assuring that we can't read a stale ring entry value + */ + prod_tail = atomic_load_acq_32(&br->br_prod_tail); cons_head = br->br_cons_head; - prod_tail = br->br_prod_tail; - + if (cons_head == prod_tail) + return (NULL); cons_next = (cons_head + 1) & br->br_cons_mask; #ifdef PREFETCH_DEFINED cons_next_next = (cons_head + 2) & br->br_cons_mask; -#endif - - if (cons_head == prod_tail) - return (NULL); - -#ifdef PREFETCH_DEFINED if (cons_next != prod_tail) { prefetch(br->br_ring[cons_next]); if (cons_next_next != prod_tail) @@ -181,7 +269,8 @@ } #endif br->br_cons_head = cons_next; - buf = br->br_ring[cons_head]; + /* assure the read completes before we allow the producer to overwrite */ + buf = atomic_load_acq_ptr_workaround(&br->br_ring[cons_head]); #ifdef DEBUG_BUFRING br->br_ring[cons_head] = NULL; @@ -190,8 +279,10 @@ if (br->br_cons_tail != cons_head) panic("inconsistent list cons_tail=%d cons_head=%d", br->br_cons_tail, cons_head); + atomic_store_rel_32(&br->br_cons_tail, cons_next); #endif - br->br_cons_tail = cons_next; + /* no release required as there are no updates that have to precede this */ + br->br_cons_tail = cons_next; return (buf); } @@ -205,16 +296,18 @@ { uint32_t cons_head, cons_next; uint32_t prod_tail; - - cons_head = br->br_cons_head; + prod_tail = br->br_prod_tail; + cons_head = br->br_cons_head; cons_next = (cons_head + 1) & br->br_cons_mask; if (cons_head == prod_tail) return; br->br_cons_head = cons_next; + /* we only need to guarantee memory ordering when storing to br_ring */ #ifdef DEBUG_BUFRING br->br_ring[cons_head] = NULL; + atomic_store_rel_32(&br->br_cons_tail, cons_next); #endif br->br_cons_tail = cons_next; } @@ -234,6 +327,8 @@ * back (since jhb says the store is probably cheaper), * if we have to do a multi-queue version we will need * the compare and an atomic. + * + * should be a no-op */ static __inline void buf_ring_putback_sc(struct buf_ring *br, void *new) @@ -251,33 +346,37 @@ static __inline void * buf_ring_peek(struct buf_ring *br) { - + uint32_t prod_tail; #ifdef DEBUG_BUFRING if ((br->br_lock != NULL) && !mtx_owned(br->br_lock)) panic("lock not held on single consumer dequeue"); #endif - /* - * I believe it is safe to not have a memory barrier - * here because we control cons and tail is worst case - * a lagging indicator so we worst case we might - * return NULL immediately after a buffer has been enqueued - */ - if (br->br_cons_head == br->br_prod_tail) + + /* ensure that prod_tail is read before ring */ + prod_tail = atomic_load_acq_32(&br->br_prod_tail); + if (br->br_cons_head == prod_tail) return (NULL); - - return (br->br_ring[br->br_cons_head]); + /* ensure that the ring load completes before + * exposing it to any destructive updates + */ + return (atomic_load_acq_ptr_workaround(&br->br_ring[br->br_cons_head])); } static __inline int buf_ring_full(struct buf_ring *br) { - + /* br_cons_tail may be stale but the consumer understands that this is + * only a point in time snapshot + */ return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail); } static __inline int buf_ring_empty(struct buf_ring *br) { + /* br_prod_tail may be stale but the consumer understands that this is + * only a point in time snapshot + */ return (br->br_cons_head == br->br_prod_tail); } @@ -285,6 +384,9 @@ static __inline int buf_ring_count(struct buf_ring *br) { + /* br_cons_tail and br_prod_tail may be stale but the consumer + * understands that this is only a point in time snapshot + */ return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail) & br->br_prod_mask);