Index: sys/sys/buf_ring.h =================================================================== --- sys/sys/buf_ring.h +++ sys/sys/buf_ring.h @@ -1,5 +1,5 @@ /*- - * Copyright (c) 2007-2009 Kip Macy + * Copyright (c) 2007-2015 Kip Macy * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,24 +41,113 @@ #include #endif +struct br_entry_ { + volatile void *bre_ptr; +}; + + struct buf_ring { volatile uint32_t br_prod_head; volatile uint32_t br_prod_tail; int br_prod_size; int br_prod_mask; uint64_t br_drops; + /* cache line aligned to avoid cache line invalidate traffic + * between consumer and producer (false sharing) + */ volatile uint32_t br_cons_head __aligned(CACHE_LINE_SIZE); volatile uint32_t br_cons_tail; int br_cons_size; int br_cons_mask; #ifdef DEBUG_BUFRING struct mtx *br_lock; -#endif - void *br_ring[0] __aligned(CACHE_LINE_SIZE); +#endif + /* cache line aligned to avoid false sharing with other data structures + * located just beyond the end of the ring + */ + struct br_entry_ br_ring[0] __aligned(CACHE_LINE_SIZE); }; /* - * multi-producer safe lock-free ring buffer enqueue + * Many architectures other than x86 permit speculative re-ordering + * of loads. Unfortunately, atomic_load_acq_32() is comparatively + * expensive so we'd rather elide it if possible. + */ +#if defined(__i386__) || defined(__amd64__) +#define ORDERED_LOAD_32(x) (*x) +#else +#define ORDERED_LOAD_32(x) atomic_load_acq_32((x)) +#endif + +/* + * Multi-producer safe lock-free ring buffer enqueue + * + * Most architectures do not support the atomic update of multiple + * discontiguous locations. So it is not possible to atomically update + * the producer index and ring buffer entry. To side-step this limitation + * we split update in to 3 steps: + * 1) atomically acquiring an index + * 2) updating the corresponding ring entry + * 3) making the update available to the consumer + * In order to split the index update in to an acquire and release + * phase there are _two_ producer indexes. 'prod_head' is used for + * step 1) and is thus only used by the enqueue itself. 'prod_tail' + * is used for step 3) to signal to the consumer that the update is + * complete. To guarantee memory ordering the update of 'prod_tail' is + * done with a atomic_store_rel_32(...) and the corresponding + * initial read of 'prod_tail' by the dequeue functions is done with + * an atomic_load_acq_32(...). + * + * Regarding memory ordering - there are five variables in question: + * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}] + * It's easiest examine correctness by considering the consequence of + * reading a stale value or having an update become visible prior to + * preceding writes. + * + * - prod_head: this is only read by the enqueue routine, if the latter were to + * initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32) + * would fail. However, the implied memory barrier in cmpxchg would cause the + * subsequent read of prod_head to read the up-to-date value permitting the + * cmpxchg to succeed the second time. + * + * - prod_tail: This value is used by dequeue to determine the effective + * producer index. On architectures with weaker memory ordering than x86 it + * needs special handling. In enqueue it needs to be updated with + * atomic_store_rel_32() (i.e. a write memory barrier before update) to + * guarantee that the new ring value is committed to memory before it is + * made available by prod_tail. In dequeue to guarantee that it is read before + * br_ring[cons_head] it needs to be read with atomic_load_acq_32(). + * + * - cons_head: this value is used only by dequeue, it is either updated + * atomically (dequeue_mc) or protected by a mutex (dequeue_sc). + * + * - cons_tail: This is used to communicate the latest consumer index between + * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue + * to fail erroneously. To avoid a load being re-ordered after a store (and + * thus permitting enqueue to store a new value before the old one has been + * consumed) it is updated with an atomic_store_rel_32() in deqeueue. + * + * - ring[idx] : Updates to this value need to reach memory before the subsequent + * update to prod_tail does. Reads need to happen before subsequent updates to + * cons_tail. + * + * Some implementation notes: + * - Much like a simpler single-producer single consumer ring buffer, + * the producer can not produce faster than the consumer. Hence the + * check of 'prod_head' + 1 against 'cons_tail'. + * + * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to + * calculate the next index is slightly cheaper than a modulo but + * requires the ring to be power-of-2 sized. + * + * - The critical_enter() / critical_exit() are not required for + * correctness. They prevent updates from stalling by having a producer be + * preempted after updating 'prod_head' but before updating 'prod_tail'. + * + * - The "while (br->br_prod_tail != prod_head)" + * check assures in order completion (probably not strictly necessary, + * but makes it easier to reason about) and allows us to update + * 'prod_tail' without a cmpxchg / LOCK prefix. * */ static __inline int @@ -69,41 +158,48 @@ int i; for (i = br->br_cons_head; i != br->br_prod_head; i = ((i + 1) & br->br_cons_mask)) - if(br->br_ring[i] == buf) + if(br->br_ring[i].bre_ptr == buf) panic("buf=%p already enqueue at %d prod=%d cons=%d", buf, i, br->br_prod_tail, br->br_cons_tail); #endif critical_enter(); do { + prod_head = br->br_prod_head; prod_next = (prod_head + 1) & br->br_prod_mask; cons_tail = br->br_cons_tail; if (prod_next == cons_tail) { - rmb(); - if (prod_head == br->br_prod_head && - cons_tail == br->br_cons_tail) { - br->br_drops++; - critical_exit(); - return (ENOBUFS); - } - continue; + /* ensure that we only return ENOBUFS + * if the latest value matches what we read + */ + if (prod_head != atomic_load_acq_32(&br->br_prod_head) || + cons_tail != atomic_load_acq_32(&br->br_cons_tail)) + continue; + + br->br_drops++; + critical_exit(); + return (ENOBUFS); } - } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next)); + } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next)); #ifdef DEBUG_BUFRING - if (br->br_ring[prod_head] != NULL) + if (br->br_ring[prod_head].bre_ptr != NULL) panic("dangling value in enqueue"); #endif - br->br_ring[prod_head] = buf; + br->br_ring[prod_head].bre_ptr = buf; /* * If there are other enqueues in progress - * that preceeded us, we need to wait for them - * to complete - */ + * that preceded us, we need to wait for them + * to complete + * re-ordering of reads would not effect correctness + */ while (br->br_prod_tail != prod_head) cpu_spinwait(); - atomic_store_rel_int(&br->br_prod_tail, prod_next); + /* ensure that the ring update reaches memory before the new + * value of prod_tail + */ + atomic_store_rel_32(&br->br_prod_tail, prod_next); critical_exit(); return (0); } @@ -116,35 +212,47 @@ buf_ring_dequeue_mc(struct buf_ring *br) { uint32_t cons_head, cons_next; - void *buf; + volatile void *buf; critical_enter(); do { + /* + * prod_tail must be read before br_ring[cons_head] is + * and the atomic_cmpset_acq_32 on br_cons_head should + * enforce that + */ cons_head = br->br_cons_head; - cons_next = (cons_head + 1) & br->br_cons_mask; - if (cons_head == br->br_prod_tail) { critical_exit(); return (NULL); } - } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next)); + cons_next = (cons_head + 1) & br->br_cons_mask; + } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next)); + + /* ensure that the read completes before either of the + * subsequent stores + */ + buf = br->br_ring[cons_head].bre_ptr; + /* guarantee that the load completes before we update cons_tail */ + br->br_ring[cons_head].bre_ptr = NULL; - buf = br->br_ring[cons_head]; -#ifdef DEBUG_BUFRING - br->br_ring[cons_head] = NULL; -#endif /* * If there are other dequeues in progress - * that preceeded us, we need to wait for them - * to complete - */ + * that preceded us, we need to wait for them + * to complete - no memory barrier needed as + * re-ordering shouldn't effect correctness or + * progress + */ while (br->br_cons_tail != cons_head) cpu_spinwait(); - - atomic_store_rel_int(&br->br_cons_tail, cons_next); + /* + * assure that the ring entry is read before + * marking the entry as free by updating cons_tail + */ + atomic_store_rel_32(&br->br_cons_tail, cons_next); critical_exit(); - return (buf); + return ((void *)buf); } /* @@ -158,41 +266,47 @@ uint32_t cons_head, cons_next; #ifdef PREFETCH_DEFINED uint32_t cons_next_next; -#endif uint32_t prod_tail; - void *buf; - +#endif + volatile void *buf; + + /* + * prod_tail tells whether or not br_ring[cons_head] is valid + * thus we must guarantee that it is read first + */ cons_head = br->br_cons_head; - prod_tail = br->br_prod_tail; - + if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail)) + return (NULL); + cons_next = (cons_head + 1) & br->br_cons_mask; #ifdef PREFETCH_DEFINED + /* + * If prod_tail is stale we will prefetch the wrong value - but this is safe + * as cache coherence (should) ensure that the when the value is loaded for + * actual use it is fetched from main memory + */ + prod_tail = br->br_prod_tail; cons_next_next = (cons_head + 2) & br->br_cons_mask; -#endif - - if (cons_head == prod_tail) - return (NULL); - -#ifdef PREFETCH_DEFINED if (cons_next != prod_tail) { - prefetch(br->br_ring[cons_next]); + prefetch(br->br_ring[cons_next].bre_ptr); if (cons_next_next != prod_tail) - prefetch(br->br_ring[cons_next_next]); + prefetch(br->br_ring[cons_next_next].bre_ptr); } #endif br->br_cons_head = cons_next; - buf = br->br_ring[cons_head]; - + buf = br->br_ring[cons_head].bre_ptr; + /* guarantee that the load completes before we update cons_tail */ + br->br_ring[cons_head].bre_ptr = NULL; #ifdef DEBUG_BUFRING - br->br_ring[cons_head] = NULL; if (!mtx_owned(br->br_lock)) panic("lock not held on single consumer dequeue"); if (br->br_cons_tail != cons_head) panic("inconsistent list cons_tail=%d cons_head=%d", br->br_cons_tail, cons_head); #endif - br->br_cons_tail = cons_next; - return (buf); + atomic_store_rel_32(&br->br_cons_tail, cons_next); + + return ((void *)buf); } /* @@ -205,7 +319,7 @@ { uint32_t cons_head, cons_next; uint32_t prod_tail; - + cons_head = br->br_cons_head; prod_tail = br->br_prod_tail; @@ -213,10 +327,17 @@ if (cons_head == prod_tail) return; br->br_cons_head = cons_next; -#ifdef DEBUG_BUFRING - br->br_ring[cons_head] = NULL; -#endif - br->br_cons_tail = cons_next; + + /* + * Storing NULL here serves two purposes: + * 1) it assures that the load of ring[cons_head] has completed + * (only the most perverted architecture or compiler would + * consider re-ordering a = *x; *x = b) + * 2) it allows us to enforce global ordering of the cons_tail + * update with an atomic_store_rel_32 + */ + br->br_ring[cons_head].bre_ptr = NULL; + atomic_store_rel_32(&br->br_cons_tail, cons_next); } /* @@ -234,13 +355,14 @@ * back (since jhb says the store is probably cheaper), * if we have to do a multi-queue version we will need * the compare and an atomic. + * */ static __inline void buf_ring_putback_sc(struct buf_ring *br, void *new) { KASSERT(br->br_cons_head != br->br_prod_tail, ("Buf-Ring has none in putback")) ; - br->br_ring[br->br_cons_head] = new; + br->br_ring[br->br_cons_head].bre_ptr = new; } /* @@ -251,33 +373,40 @@ static __inline void * buf_ring_peek(struct buf_ring *br) { - + uint32_t cons_head; #ifdef DEBUG_BUFRING if ((br->br_lock != NULL) && !mtx_owned(br->br_lock)) panic("lock not held on single consumer dequeue"); #endif + cons_head = br->br_cons_head; /* - * I believe it is safe to not have a memory barrier - * here because we control cons and tail is worst case - * a lagging indicator so we worst case we might - * return NULL immediately after a buffer has been enqueued + * for correctness prod_tail must be read before ring[cons_head] */ - if (br->br_cons_head == br->br_prod_tail) + + if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail)) return (NULL); - - return (br->br_ring[br->br_cons_head]); + + /* ensure that the ring load completes before + * exposing it to any destructive updates + */ + return ((void *)br->br_ring[cons_head].bre_ptr); } static __inline int buf_ring_full(struct buf_ring *br) { - + /* br_cons_tail may be stale but the consumer understands that this is + * only a point in time snapshot + */ return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail); } static __inline int buf_ring_empty(struct buf_ring *br) { + /* br_prod_tail may be stale but the consumer understands that this is + * only a point in time snapshot + */ return (br->br_cons_head == br->br_prod_tail); } @@ -285,6 +414,9 @@ static __inline int buf_ring_count(struct buf_ring *br) { + /* br_cons_tail and br_prod_tail may be stale but the consumer + * understands that this is only a point in time snapshot + */ return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail) & br->br_prod_mask);