Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -1,5 +1,5 @@
 /*-
- * Copyright (c) 2007-2009 Kip Macy
+ * Copyright (c) 2007-2009, 2015 Kip Macy
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -58,7 +58,39 @@
 };
 
 /*
- * multi-producer safe lock-free ring buffer enqueue
+ * Multi-producer safe lock-free ring buffer enqueue
+ *
+ * Most architectures do not support the atomic update of multiple
+ * discontiguous locations, so it is not possible to atomically update
+ * the producer index and the ring buffer entry. To sidestep this
+ * limitation we split the update into three steps:
+ *      1) atomically acquiring an index
+ *      2) updating the corresponding ring entry
+ *      3) making the update available to the consumer
+ * In order to split the index update into an acquire and a release
+ * phase there are _two_ producer indexes. 'prod_head' is used for
+ * step 1) and is thus only used by the enqueue itself. 'prod_tail'
+ * is used for step 3) to signal to the consumer that the update is
+ * complete. To guarantee memory ordering, the update of 'prod_tail'
+ * is done with an atomic_store_rel_32(...) and the corresponding
+ * initial read of 'prod_tail' by the dequeue functions is done with
+ * an atomic_load_acq_32(...).
+ *
+ * Some implementation notes:
+ * - Much like a simpler single-producer, single-consumer ring buffer,
+ *   the producer cannot produce faster than the consumer. Hence the
+ *   check of 'prod_head' + 1 against 'cons_tail'.
+ * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
+ *   calculate the next index is slightly cheaper than a modulo, but
+ *   it requires the ring to be power-of-2 sized.
+ * - The critical_enter() / critical_exit() calls are a performance
+ *   optimization and are not required for correctness. They prevent
+ *   a producer from being preempted after updating 'prod_head' but
+ *   before updating 'prod_tail', which would stall all other enqueues.
+ * - The "while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)"
+ *   check ensures in-order completion (probably not strictly
+ *   necessary, but it makes the code easier to reason about) and
+ *   allows us to update 'prod_tail' without a cmpxchg / LOCK prefix.
  *
  */
 static __inline int
@@ -74,22 +106,28 @@
 	    buf, i, br->br_prod_tail, br->br_cons_tail);
 #endif
 	critical_enter();
+	cons_tail = atomic_load_acq_32(&br->br_cons_tail);
 	do {
+		/* br_cons_tail is up to date by virtue of the previous
+		 * memory barrier or the subsequent atomic_cmpset_acq_32.
+		 */
+		cons_tail = br->br_cons_tail;
 		prod_head = br->br_prod_head;
 		prod_next = (prod_head + 1) & br->br_prod_mask;
-		cons_tail = br->br_cons_tail;
 
 		if (prod_next == cons_tail) {
-			rmb();
-			if (prod_head == br->br_prod_head &&
-			    cons_tail == br->br_cons_tail) {
+			/* This case should be rare, so the overhead of
+			 * the redundant memory barrier is acceptable.
+			 */
+			if (prod_head == atomic_load_acq_32(&br->br_prod_head) &&
+			    cons_tail == atomic_load_acq_32(&br->br_cons_tail)) {
 				br->br_drops++;
 				critical_exit();
 				return (ENOBUFS);
 			}
 			continue;
 		}
-	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+	} while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
 #ifdef DEBUG_BUFRING
 	if (br->br_ring[prod_head] != NULL)
 		panic("dangling value in enqueue");
@@ -101,9 +139,9 @@
 	 * that preceeded us, we need to wait for them
 	 * to complete
 	 */
-	while (br->br_prod_tail != prod_head)
+	while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)
 		cpu_spinwait();
-	atomic_store_rel_int(&br->br_prod_tail, prod_next);
+	atomic_store_rel_32(&br->br_prod_tail, prod_next);
 	critical_exit();
 	return (0);
 }
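The three-step protocol the comment block above describes is easier to see in
isolation. Below is a minimal userspace sketch of the same enqueue scheme,
written against C11 <stdatomic.h> rather than the kernel's atomic(9)
primitives; struct ring, ring_enqueue(), and RING_SIZE are illustrative names
only, and the full-ring double-check is simplified to an immediate ENOBUFS
instead of the retry loop used in the patch.

	#include <errno.h>
	#include <stdatomic.h>
	#include <stdint.h>

	#define RING_SIZE	1024		/* must be a power of 2 */
	#define RING_MASK	(RING_SIZE - 1)

	struct ring {
		_Atomic uint32_t prod_head;	/* step 1: index acquisition */
		_Atomic uint32_t prod_tail;	/* step 3: visible to consumers */
		_Atomic uint32_t cons_head;
		_Atomic uint32_t cons_tail;
		void *slots[RING_SIZE];
	};

	static int
	ring_enqueue(struct ring *r, void *buf)
	{
		uint32_t head, next, cons_tail;

		do {
			head = atomic_load_explicit(&r->prod_head,
			    memory_order_relaxed);
			next = (head + 1) & RING_MASK;
			cons_tail = atomic_load_explicit(&r->cons_tail,
			    memory_order_acquire);
			if (next == cons_tail)
				return (ENOBUFS);	/* ring full */
			/* Step 1: atomically claim slot 'head'. */
		} while (!atomic_compare_exchange_weak_explicit(&r->prod_head,
		    &head, next, memory_order_acquire, memory_order_relaxed));

		/* Step 2: fill the slot; not yet visible to consumers. */
		r->slots[head] = buf;

		/* Wait for earlier producers so completions stay in order. */
		while (atomic_load_explicit(&r->prod_tail,
		    memory_order_acquire) != head)
			;	/* spin; the kernel uses cpu_spinwait() */

		/* Step 3: release the filled slot to consumers. */
		atomic_store_explicit(&r->prod_tail, next, memory_order_release);
		return (0);
	}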
@@ -119,15 +157,18 @@
 	void *buf;
 
 	critical_enter();
+	cons_head = atomic_load_acq_32(&br->br_cons_head);
 	do {
+		/* Subsequent reads have their memory ordering guaranteed
+		 * by the atomic_cmpset_acq_32 below.
+		 */
 		cons_head = br->br_cons_head;
-		cons_next = (cons_head + 1) & br->br_cons_mask;
-
 		if (cons_head == br->br_prod_tail) {
 			critical_exit();
 			return (NULL);
 		}
-	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+		cons_next = (cons_head + 1) & br->br_cons_mask;
+	} while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
 
 	buf = br->br_ring[cons_head];
 #ifdef DEBUG_BUFRING
@@ -138,10 +179,10 @@
 	 * that preceeded us, we need to wait for them
 	 * to complete
 	 */
-	while (br->br_cons_tail != cons_head)
+	while (atomic_load_acq_32(&br->br_cons_tail) != cons_head)
 		cpu_spinwait();
-	atomic_store_rel_int(&br->br_cons_tail, cons_next);
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 	critical_exit();
 
 	return (buf);
@@ -162,18 +203,14 @@
 	uint32_t prod_tail;
 	void *buf;
 
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
 	cons_head = br->br_cons_head;
-	prod_tail = br->br_prod_tail;
+	if (cons_head == prod_tail)
+		return (NULL);
 
 	cons_next = (cons_head + 1) & br->br_cons_mask;
 #ifdef PREFETCH_DEFINED
 	cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
-
-	if (cons_head == prod_tail)
-		return (NULL);
-
-#ifdef PREFETCH_DEFINED
 	if (cons_next != prod_tail) {
 		prefetch(br->br_ring[cons_next]);
 		if (cons_next_next != prod_tail)
@@ -191,7 +228,10 @@
 		panic("inconsistent list cons_tail=%d cons_head=%d",
 		    br->br_cons_tail, cons_head);
 #endif
-	br->br_cons_tail = cons_next;
+	/* Use atomic_store_rel_32 to ensure that the producer has an
+	 * up-to-date view.
+	 */
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 
 	return (buf);
 }
@@ -207,7 +247,7 @@
 	uint32_t prod_tail;
 
 	cons_head = br->br_cons_head;
-	prod_tail = br->br_prod_tail;
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
 
 	cons_next = (cons_head + 1) & br->br_cons_mask;
 
 	if (cons_head == prod_tail)
@@ -216,7 +256,7 @@
 #ifdef DEBUG_BUFRING
 	br->br_ring[cons_head] = NULL;
 #endif
-	br->br_cons_tail = cons_next;
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 }
 
 /*
@@ -256,13 +296,8 @@
 	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
 		panic("lock not held on single consumer dequeue");
 #endif
-	/*
-	 * I believe it is safe to not have a memory barrier
-	 * here because we control cons and tail is worst case
-	 * a lagging indicator so we worst case we might
-	 * return NULL immediately after a buffer has been enqueued
-	 */
-	if (br->br_cons_head == br->br_prod_tail)
+
+	if (br->br_cons_head == atomic_load_acq_32(&br->br_prod_tail))
		return (NULL);
 
 	return (br->br_ring[br->br_cons_head]);
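The consumer side of the same sketch shows the release/acquire pairing the
patch depends on: the acquire load of 'prod_tail' synchronizes with the
producer's release store, so the slot write from step 2 is visible before the
index that advertises it. Single-consumer for brevity; a multi-consumer
variant would claim 'cons_head' with a compare-and-set exactly as 'prod_head'
is claimed above. Same illustrative names and assumptions as the earlier
sketch.

	/* Single-consumer dequeue matching ring_enqueue() above. */
	static void *
	ring_dequeue_sc(struct ring *r)
	{
		uint32_t head, next;
		void *buf;

		head = atomic_load_explicit(&r->cons_head,
		    memory_order_relaxed);
		/* Pairs with the release store of prod_tail in the producer. */
		if (head == atomic_load_explicit(&r->prod_tail,
		    memory_order_acquire))
			return (NULL);		/* ring empty */

		next = (head + 1) & RING_MASK;
		buf = r->slots[head];
		atomic_store_explicit(&r->cons_head, next,
		    memory_order_relaxed);
		/* Release: the producer may reuse the slot only after we
		 * have finished reading it. */
		atomic_store_explicit(&r->cons_tail, next,
		    memory_order_release);
		return (buf);
	}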
@@ -271,13 +306,18 @@
 static __inline int
 buf_ring_full(struct buf_ring *br)
 {
-
+	/* br_cons_tail may be stale, but the caller understands that
+	 * this is only a point-in-time snapshot.
+	 */
 	return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
 }
 
 static __inline int
 buf_ring_empty(struct buf_ring *br)
 {
+	/* br_prod_tail may be stale, but the caller understands that
+	 * this is only a point-in-time snapshot.
+	 */
 	return (br->br_cons_head == br->br_prod_tail);
 }
 
@@ -285,6 +325,9 @@
 static __inline int
 buf_ring_count(struct buf_ring *br)
 {
+	/* br_cons_tail and br_prod_tail may be stale, but the caller
+	 * understands that this is only a point-in-time snapshot.
+	 */
 	return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
 	    & br->br_prod_mask);
 }
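For context, the call pattern this header expects looks roughly like the
sketch below; drv_softc, drv_transmit(), and the init/drain routines are
hypothetical driver boilerplate, shown only to place the concurrent enqueue
and the mutex-serialized single-consumer dequeue. Only the buf_ring_* calls
are real API.

	#include <sys/param.h>
	#include <sys/systm.h>
	#include <sys/malloc.h>
	#include <sys/mbuf.h>
	#include <sys/lock.h>
	#include <sys/mutex.h>
	#include <sys/buf_ring.h>

	struct drv_softc {
		struct buf_ring	*sc_br;
		struct mtx	 sc_br_mtx;
	};

	static void
	drv_ring_init(struct drv_softc *sc)
	{
		mtx_init(&sc->sc_br_mtx, "drv_br", NULL, MTX_DEF);
		/* Size must be a power of 2 (see br_prod_mask above). */
		sc->sc_br = buf_ring_alloc(1024, M_DEVBUF, M_WAITOK,
		    &sc->sc_br_mtx);
	}

	/* Producer: safe to call from any number of threads concurrently. */
	static int
	drv_queue_pkt(struct drv_softc *sc, struct mbuf *m)
	{
		int error;

		error = buf_ring_enqueue(sc->sc_br, m);
		if (error == ENOBUFS)
			m_freem(m);	/* ring full: drop */
		return (error);
	}

	/* Consumer: serialized by the mutex, as the _sc variants require. */
	static void
	drv_drain(struct drv_softc *sc)
	{
		struct mbuf *m;

		mtx_lock(&sc->sc_br_mtx);
		while ((m = buf_ring_dequeue_sc(sc->sc_br)) != NULL)
			drv_transmit(sc, m);	/* hypothetical */
		mtx_unlock(&sc->sc_br_mtx);
	}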