Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -75,35 +75,69 @@
 #endif
	critical_enter();
	do {
-		prod_head = br->br_prod_head;
+		/*
+		 * load_acq(prod_head) preceding load(cons_tail) is
+		 * mandatory: cons_tail MUST NOT be 'older' than prod_head.
+		 * Otherwise the following can happen:
+		 * a) initial state: cons_tail: 1, prod_head: 0, ring is full.
+		 * b) cpu0: reads cons_tail: 1
+		 * c) cpu1: runs consumer and sets cons_tail: 2
+		 * d) cpu1: runs producer and sets prod_head: 1,
+		 *    ring is full again.
+		 * e) cpu0: reads prod_head: 1, the 'ring is full' check fails.
+		 * f) cpu0: cmpset(prod_head) succeeds.
+		 * g) ring is corrupted.
+		 */
+		prod_head = atomic_load_acq_32(&br->br_prod_head);
		prod_next = (prod_head + 1) & br->br_prod_mask;
-		cons_tail = br->br_cons_tail;
-
-		if (prod_next == cons_tail) {
-			rmb();
-			if (prod_head == br->br_prod_head &&
-			    cons_tail == br->br_cons_tail) {
-				br->br_drops++;
-				critical_exit();
-				return (ENOBUFS);
-			}
-			continue;
+		/*
+		 * load_acq(cons_tail) is required: this is how we
+		 * synchronize with consumers.
+		 */
+		cons_tail = atomic_load_acq_32(&br->br_cons_tail);
+
+		if (prod_next == cons_tail) {	/* ring is full */
+			/*
+			 * prod_head can be 'older' than cons_tail (see
+			 * above), so this can be a false positive 'ring
+			 * is full' error.  Check it.
+			 */
+			if (prod_head != atomic_load_acq_32(&br->br_prod_head))
+				continue;
+			br->br_drops++;
+			critical_exit();
+			return (ENOBUFS);
		}
-	} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+	} while (!atomic_cmpset_32(&br->br_prod_head, prod_head, prod_next));
 #ifdef DEBUG_BUFRING
	if (br->br_ring[prod_head] != NULL)
		panic("dangling value in enqueue");
 #endif
	br->br_ring[prod_head] = buf;
-
	/*
-	 * If there are other enqueues in progress
-	 * that preceded us, we need to wait for them
-	 * to complete
+	 * If there are other enqueues in progress that preceded us, we need
+	 * to wait for them to complete.
+	 * load_acq(prod_tail) is required for a proper synchronization chain
+	 * between the consumer and multiple producers.
+	 * Consider the following scenario:
+	 * a) initial state: prod_head, prod_tail, cons_head, cons_tail are 0,
+	 *    ring is empty.
+	 * b) cpu0: runs producer, grabs prod_head: 0 and sets prod_head: 1
+	 * c) cpu1: runs producer, grabs prod_head: 1 and sets prod_head: 2
+	 * d) cpu1: stores data to ring[1] and waits for prod_tail: 1
+	 * e) cpu0: stores data to ring[0], sets prod_tail: 1
+	 * f) cpu1: sees prod_tail: 1.  If load(prod_tail) were done without
+	 *    an _acq barrier, there would be no guarantee that the store to
+	 *    ring[0] is visible.
+	 * g) cpu1: executes store_rel(prod_tail: 2)
+	 * h) cpu2: runs consumer, executes load_acq(prod_tail) and sees
+	 *    prod_tail: 2.  The acq/rel semantics in consumer/producer
+	 *    guarantee a consistent read from ring[1] but _do not guarantee_
+	 *    a consistent read from ring[0] (if load(prod_tail) in f were
+	 *    unordered).
+	 * i) cpu2: reads junk from ring[0]
	 */
-	while (br->br_prod_tail != prod_head)
+	while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)
		cpu_spinwait();
-	atomic_store_rel_int(&br->br_prod_tail, prod_next);
+	atomic_store_rel_32(&br->br_prod_tail, prod_next);
	critical_exit();
	return (0);
 }
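
For readers who want to experiment with the ordering argument above outside the
kernel, here is a minimal userspace sketch of this enqueue path using C11
<stdatomic.h> in place of the FreeBSD atomic(9) primitives (load_acq -> an
acquire load, store_rel -> a release store, cmpset -> a relaxed
compare-exchange).  The struct ring, RING_SIZE, and function names are
illustrative assumptions, not the kernel's, and the drop counter and
critical_enter()/critical_exit() bracketing are omitted:

#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define RING_SIZE 1024	/* must be a power of two (assumed) */

struct ring {
	_Atomic uint32_t prod_head;
	_Atomic uint32_t prod_tail;
	_Atomic uint32_t cons_head;
	_Atomic uint32_t cons_tail;
	void *slots[RING_SIZE];
};

static int
ring_enqueue(struct ring *r, void *buf)
{
	uint32_t head, next, cons_tail;

	do {
		/*
		 * Acquire load of prod_head first: the cons_tail read
		 * below must not be older than prod_head (scenario a-g).
		 */
		head = atomic_load_explicit(&r->prod_head,
		    memory_order_acquire);
		next = (head + 1) & (RING_SIZE - 1);
		cons_tail = atomic_load_explicit(&r->cons_tail,
		    memory_order_acquire);
		if (next == cons_tail) {
			/* 'ring is full' may be a false positive; re-check */
			if (head != atomic_load_explicit(&r->prod_head,
			    memory_order_acquire))
				continue;
			return (ENOBUFS);
		}
	} while (!atomic_compare_exchange_strong_explicit(&r->prod_head,
	    &head, next, memory_order_relaxed, memory_order_relaxed));

	r->slots[head] = buf;	/* published below by the release store */

	/* wait for preceding producers; pairs with their release store */
	while (atomic_load_explicit(&r->prod_tail,
	    memory_order_acquire) != head)
		;	/* spin */
	atomic_store_explicit(&r->prod_tail, next, memory_order_release);
	return (0);
}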
@@ -115,19 +149,45 @@
 static __inline void *
 buf_ring_dequeue_mc(struct buf_ring *br)
 {
-	uint32_t cons_head, cons_next;
+	uint32_t cons_head, cons_next, prod_tail;
	void *buf;

	critical_enter();
	do {
-		cons_head = br->br_cons_head;
-		cons_next = (cons_head + 1) & br->br_cons_mask;
-
-		if (cons_head == br->br_prod_tail) {
+		/*
+		 * load_acq(cons_head) preceding load(prod_tail) is
+		 * mandatory: prod_tail MUST NOT be 'older' than cons_head.
+		 * Otherwise the following can happen:
+		 * a) initial state: prod_tail: 0, cons_head: 0, ring is empty.
+		 * b) cpu0: reads prod_tail: 0
+		 * c) cpu1: runs producer and sets prod_tail: 1
+		 * d) cpu1: runs consumer and sets cons_head: 1,
+		 *    ring is empty again.
+		 * e) cpu0: reads cons_head: 1, the 'ring is empty' check fails.
+		 * f) cpu0: cmpset(cons_head) succeeds.
+		 * g) cpu0: reads junk from the empty ring
+		 */
+		cons_head = atomic_load_acq_32(&br->br_cons_head);
+		/*
+		 * load_acq(prod_tail) is required: this is how we synchronize
+		 * with producers, so that the later br->br_ring[cons_head]
+		 * load is consistent.
+		 */
+		prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+
+		if (cons_head == prod_tail) {	/* ring is empty */
+			/*
+			 * Since cons_head can be 'older' than prod_tail, this
+			 * can be a false positive 'ring is empty'.  We can
+			 * check it here or just wait for the next
+			 * buf_ring_dequeue_mc() call.
+			 */
			critical_exit();
			return (NULL);
		}
-	} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+
+		cons_next = (cons_head + 1) & br->br_cons_mask;
+	} while (!atomic_cmpset_32(&br->br_cons_head, cons_head, cons_next));

	buf = br->br_ring[cons_head];
 #ifdef DEBUG_BUFRING
@@ -140,10 +200,8 @@
	 */
	while (br->br_cons_tail != cons_head)
		cpu_spinwait();
-
-	atomic_store_rel_int(&br->br_cons_tail, cons_next);
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
	critical_exit();
-
	return (buf);
 }
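
The multi-consumer dequeue path translates the same way; again a sketch
building on the hypothetical struct ring above, not the kernel code.  The CAS
claims a slot index, and the possibly-stale 'ring is empty' case simply
returns NULL, as the comment above allows:

static void *
ring_dequeue_mc(struct ring *r)
{
	uint32_t head, next, prod_tail;
	void *buf;

	do {
		/*
		 * Acquire load of cons_head first: the prod_tail read
		 * below must not be older than cons_head (scenario a-g).
		 */
		head = atomic_load_explicit(&r->cons_head,
		    memory_order_acquire);
		/* pairs with the producer's release store of prod_tail */
		prod_tail = atomic_load_explicit(&r->prod_tail,
		    memory_order_acquire);
		if (head == prod_tail)	/* possibly a false 'empty' */
			return (NULL);
		next = (head + 1) & (RING_SIZE - 1);
	} while (!atomic_compare_exchange_strong_explicit(&r->cons_head,
	    &head, next, memory_order_relaxed, memory_order_relaxed));

	buf = r->slots[head];

	/* wait for preceding consumers, then mark our slot consumed */
	while (atomic_load_explicit(&r->cons_tail,
	    memory_order_relaxed) != head)
		;	/* spin */
	atomic_store_explicit(&r->cons_tail, next, memory_order_release);
	return (buf);
}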
@@ -155,62 +213,41 @@
 static __inline void *
 buf_ring_dequeue_sc(struct buf_ring *br)
 {
-	uint32_t cons_head, cons_next;
+	uint32_t cons_head, cons_next, prod_tail;
 #ifdef PREFETCH_DEFINED
	uint32_t cons_next_next;
 #endif
-	uint32_t prod_tail;
	void *buf;

	/*
-	 * This is a workaround to allow using buf_ring on ARM and ARM64.
-	 * ARM64TODO: Fix buf_ring in a generic way.
-	 * REMARKS: It is suspected that br_cons_head does not require
-	 *   load_acq operation, but this change was extensively tested
-	 *   and confirmed it's working. To be reviewed once again in
-	 *   FreeBSD-12.
-	 *
-	 * Preventing following situation:
-
-	 * Core(0) - buf_ring_enqueue()                   Core(1) - buf_ring_dequeue_sc()
-	 * -----------------------------------------     ----------------------------------------------
-	 *
-	 *                                                cons_head = br->br_cons_head;
-	 * atomic_cmpset_acq_32(&br->br_prod_head, ...));
-	 *                                                buf = br->br_ring[cons_head];     <1>
-	 * br->br_ring[prod_head] = buf;
-	 * atomic_store_rel_32(&br->br_prod_tail, ...);
-	 *                                                prod_tail = br->br_prod_tail;
-	 *                                                if (cons_head == prod_tail)
-	 *                                                        return (NULL);
-	 *
-	 * <1> Load (on core 1) from br->br_ring[cons_head] can be reordered (speculative readed) by CPU.
-	 */
-#if defined(__arm__) || defined(__aarch64__)
-	cons_head = atomic_load_acq_32(&br->br_cons_head);
-#else
-	cons_head = br->br_cons_head;
-#endif
+	 * The buf_ring_dequeue_mc() fatal scenario (see above) is not
+	 * possible with a single consumer, so we read prod_tail first in
+	 * order to avoid the false positive 'ring is empty' check.
+	 * load_acq(prod_tail) is necessary for the later
+	 * br->br_ring[cons_head] load.
+	 */
	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
-
-	cons_next = (cons_head + 1) & br->br_cons_mask;
-#ifdef PREFETCH_DEFINED
-	cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
+	cons_head = br->br_cons_head;

	if (cons_head == prod_tail)
		return (NULL);

-#ifdef PREFETCH_DEFINED
+	cons_next = (cons_head + 1) & br->br_cons_mask;
+#ifdef PREFETCH_DEFINED
	if (cons_next != prod_tail) {
		prefetch(br->br_ring[cons_next]);
+		cons_next_next = (cons_head + 2) & br->br_cons_mask;
		if (cons_next_next != prod_tail)
			prefetch(br->br_ring[cons_next_next]);
	}
 #endif
-	br->br_cons_head = cons_next;
+	/*
+	 * The order and atomicity of the next two operations are
+	 * unimportant: store(cons_head) is protected by the lock (single
+	 * consumer case), and load(ring[cons_head]) is ordered by the later
+	 * store_rel(cons_tail).
+	 */
	buf = br->br_ring[cons_head];
+	br->br_cons_head = cons_next;

 #ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
@@ -220,7 +257,7 @@
		panic("inconsistent list cons_tail=%d cons_head=%d",
		    br->br_cons_tail, cons_head);
 #endif
-	br->br_cons_tail = cons_next;
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);

	return (buf);
 }
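
The single-consumer variant, in the same C11 sketch: prod_tail is read first
with acquire semantics (no false 'empty' is possible, and the slot read is
ordered), cons_head needs no ordering since only one consumer writes it, and
the final release store is what lets producers reuse the slot.  Same
illustrative assumptions as above:

static void *
ring_dequeue_sc(struct ring *r)
{
	uint32_t head, prod_tail;
	void *buf;

	/*
	 * prod_tail first, with acquire: avoids the false 'ring is empty'
	 * and orders the slots[] read below against the producer's stores.
	 */
	prod_tail = atomic_load_explicit(&r->prod_tail,
	    memory_order_acquire);
	head = atomic_load_explicit(&r->cons_head, memory_order_relaxed);
	if (head == prod_tail)
		return (NULL);

	buf = r->slots[head];
	atomic_store_explicit(&r->cons_head, (head + 1) & (RING_SIZE - 1),
	    memory_order_relaxed);
	/*
	 * Release store of cons_tail: producers may only observe the slot
	 * as free after our slots[head] read has completed.
	 */
	atomic_store_explicit(&r->cons_tail, (head + 1) & (RING_SIZE - 1),
	    memory_order_release);
	return (buf);
}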
@@ -233,19 +270,25 @@
 buf_ring_advance_sc(struct buf_ring *br)
 {
	uint32_t cons_head, cons_next;
-	uint32_t prod_tail;

+	/*
+	 * atomic_load_acq_32(&br->br_prod_tail) would be superfluous here:
+	 * it was already done by the preceding buf_ring_peek().
+	 */
	cons_head = br->br_cons_head;
-	prod_tail = br->br_prod_tail;
-
-	cons_next = (cons_head + 1) & br->br_cons_mask;
-	if (cons_head == prod_tail)
+	if (cons_head == br->br_prod_tail) {
+#ifdef DEBUG_BUFRING
+		panic("advance on empty ring, no previous peek?");
+#endif
		return;
+	}
+
+	cons_next = (cons_head + 1) & br->br_cons_mask;
	br->br_cons_head = cons_next;
 #ifdef DEBUG_BUFRING
	br->br_ring[cons_head] = NULL;
 #endif
-	br->br_cons_tail = cons_next;
+	atomic_store_rel_32(&br->br_cons_tail, cons_next);
 }

 /*
@@ -280,48 +323,46 @@
 static __inline void *
 buf_ring_peek(struct buf_ring *br)
 {
+	uint32_t cons_head, prod_tail;
+
 #ifdef DEBUG_BUFRING
	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
 #endif
-	/*
-	 * I believe it is safe to not have a memory barrier
-	 * here because we control cons and tail is worst case
-	 * a lagging indicator so we worst case we might
-	 * return NULL immediately after a buffer has been enqueued
-	 */
-	if (br->br_cons_head == br->br_prod_tail)
+	/* see buf_ring_dequeue_sc() comment */
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+	cons_head = br->br_cons_head;
+
+	if (cons_head == prod_tail)
		return (NULL);

-	return (br->br_ring[br->br_cons_head]);
+	return (br->br_ring[cons_head]);
 }

 static __inline void *
 buf_ring_peek_clear_sc(struct buf_ring *br)
 {
+	uint32_t cons_head, prod_tail;
+
 #ifdef DEBUG_BUFRING
	void *ret;

	if (!mtx_owned(br->br_lock))
		panic("lock not held on single consumer dequeue");
 #endif
-	/*
-	 * I believe it is safe to not have a memory barrier
-	 * here because we control cons and tail is worst case
-	 * a lagging indicator so we worst case we might
-	 * return NULL immediately after a buffer has been enqueued
-	 */
-	if (br->br_cons_head == br->br_prod_tail)
-		return (NULL);
+	/* see buf_ring_dequeue_sc() comment */
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+	cons_head = br->br_cons_head;

+	if (cons_head == prod_tail)
+		return (NULL);
 #ifdef DEBUG_BUFRING
	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
-	ret = br->br_ring[br->br_cons_head];
-	br->br_ring[br->br_cons_head] = NULL;
+	ret = br->br_ring[cons_head];
+	br->br_ring[cons_head] = NULL;
	return (ret);
 #else
	return (br->br_ring[br->br_cons_head]);
@@ -354,6 +395,4 @@
	    struct mtx *);
 void buf_ring_free(struct buf_ring *br, struct malloc_type *type);

-
-
 #endif
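
To round out the sketch, here is the peek/advance split under the same
assumptions, mirroring the reasoning in buf_ring_peek() and
buf_ring_advance_sc() above: the acquire load of prod_tail happens in the
peek, so the advance can re-read it relaxed:

static void *
ring_peek(struct ring *r)
{
	/* acquire here covers the later ring_advance_sc() as well */
	uint32_t prod_tail = atomic_load_explicit(&r->prod_tail,
	    memory_order_acquire);
	uint32_t head = atomic_load_explicit(&r->cons_head,
	    memory_order_relaxed);

	return (head == prod_tail ? NULL : r->slots[head]);
}

static void
ring_advance_sc(struct ring *r)
{
	uint32_t head = atomic_load_explicit(&r->cons_head,
	    memory_order_relaxed);
	uint32_t next = (head + 1) & (RING_SIZE - 1);

	/* a relaxed re-read suffices; the acquire was done in ring_peek() */
	if (head == atomic_load_explicit(&r->prod_tail,
	    memory_order_relaxed))
		return;
	atomic_store_explicit(&r->cons_head, next, memory_order_relaxed);
	atomic_store_explicit(&r->cons_tail, next, memory_order_release);
}

Typical single-consumer use would be: buf = ring_peek(r); if (buf != NULL)
{ ... consume buf ...; ring_advance_sc(r); } so the slot is only recycled
after consumption, matching the release store of cons_tail above.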