Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -166,36 +166,12 @@
 	uint32_t prod_tail;
 	void *buf;
 
-	/*
-	 * This is a workaround to allow using buf_ring on ARM and ARM64.
-	 * ARM64TODO: Fix buf_ring in a generic way.
-	 * REMARKS: It is suspected that br_cons_head does not require
-	 *   load_acq operation, but this change was extensively tested
-	 *   and confirmed it's working. To be reviewed once again in
-	 *   FreeBSD-12.
-	 *
-	 * Preventing following situation:
-	 *
-	 * Core(0) - buf_ring_enqueue()                   Core(1) - buf_ring_dequeue_sc()
-	 * -----------------------------------------      ----------------------------------------------
-	 *
-	 *                                                cons_head = br->br_cons_head;
-	 * atomic_cmpset_acq_32(&br->br_prod_head, ...));
-	 *                                                buf = br->br_ring[cons_head];      <see <1>>
-	 * br->br_ring[prod_head] = buf;
-	 * atomic_store_rel_32(&br->br_prod_tail, ...);
-	 *                                                prod_tail = br->br_prod_tail;
-	 *                                                if (cons_head == prod_tail)
-	 *                                                        return (NULL);
-	 *                                                <condition is false and code uses invalid(old) buf>
-	 *
-	 * <1> Load (on core 1) from br->br_ring[cons_head] can be reordered (speculative readed) by CPU.
-	 */
-#if defined(__arm__) || defined(__aarch64__)
-	cons_head = atomic_load_acq_32(&br->br_cons_head);
-#else
-	cons_head = br->br_cons_head;
+#ifdef DEBUG_BUFRING
+	if (!mtx_owned(br->br_lock))
+		panic("lock not held on single consumer dequeue");
 #endif
+
+	cons_head = br->br_cons_head;
 	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
 
 	cons_next = (cons_head + 1) & br->br_cons_mask;
@@ -218,8 +194,6 @@
 
 #ifdef DEBUG_BUFRING
 	br->br_ring[cons_head] = NULL;
-	if (!mtx_owned(br->br_lock))
-		panic("lock not held on single consumer dequeue");
 	if (br->br_cons_tail != cons_head)
 		panic("inconsistent list cons_tail=%d cons_head=%d",
 		    br->br_cons_tail, cons_head);
@@ -284,61 +258,48 @@
 static __inline void *
 buf_ring_peek(struct buf_ring *br)
 {
+	uint32_t cons_head, prod_tail;
 
 #ifdef DEBUG_BUFRING
 	if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
 		panic("lock not held on single consumer dequeue");
 #endif
-	/*
-	 * I believe it is safe to not have a memory barrier
-	 * here because we control cons and tail is worst case
-	 * a lagging indicator so we worst case we might
-	 * return NULL immediately after a buffer has been enqueued
-	 */
-	if (br->br_cons_head == br->br_prod_tail)
+
+	cons_head = br->br_cons_head;
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+
+	if (cons_head == prod_tail)
 		return (NULL);
-	return (br->br_ring[br->br_cons_head]);
+	return (br->br_ring[cons_head]);
 }
 
 static __inline void *
 buf_ring_peek_clear_sc(struct buf_ring *br)
 {
-#ifdef DEBUG_BUFRING
+	uint32_t cons_head, prod_tail;
 	void *ret;
 
+#ifdef DEBUG_BUFRING
 	if (!mtx_owned(br->br_lock))
 		panic("lock not held on single consumer dequeue");
-#endif
+#endif
 
-	if (br->br_cons_head == br->br_prod_tail)
-		return (NULL);
+	cons_head = br->br_cons_head;
+	prod_tail = atomic_load_acq_32(&br->br_prod_tail);
 
-#if defined(__arm__) || defined(__aarch64__)
-	/*
-	 * The barrier is required there on ARM and ARM64 to ensure, that
-	 * br->br_ring[br->br_cons_head] will not be fetched before the above
-	 * condition is checked.
-	 * Without the barrier, it is possible, that buffer will be fetched
-	 * before the enqueue will put mbuf into br, then, in the meantime, the
-	 * enqueue will update the array and the br_prod_tail, and the
-	 * conditional check will be true, so we will return previously fetched
-	 * (and invalid) buffer.
-	 */
-	atomic_thread_fence_acq();
-#endif
+	if (cons_head == prod_tail)
+		return (NULL);
 
+	ret = br->br_ring[cons_head];
 #ifdef DEBUG_BUFRING
 	/*
	 * Single consumer, i.e. cons_head will not move while we are
	 * running, so atomic_swap_ptr() is not necessary here.
	 */
-	ret = br->br_ring[br->br_cons_head];
-	br->br_ring[br->br_cons_head] = NULL;
-	return (ret);
-#else
-	return (br->br_ring[br->br_cons_head]);
+	br->br_ring[cons_head] = NULL;
 #endif
+	return (ret);
 }
 
 static __inline int
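For context, the ordering this change relies on is the standard release/acquire
pairing between producer and consumer: buf_ring_enqueue() fills the ring slot
and then publishes it with atomic_store_rel_32(&br->br_prod_tail, ...), so a
consumer that reads br_prod_tail with an acquire load is guaranteed to observe
the slot write before it dereferences the slot. That is why the plain loads of
br_prod_tail in the old peek functions were unsafe on weakly ordered CPUs, and
why the per-architecture workarounds become unnecessary once the acquire load
is used everywhere. Below is a minimal self-contained sketch of the pattern in
C11 atomics; the names (struct ring, slots, ring_publish, ring_peek) are
illustrative only and do not match the FreeBSD buf_ring layout.

#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define RING_SIZE	64		/* must be a power of two */
#define RING_MASK	(RING_SIZE - 1)

struct ring {
	_Atomic uint32_t prod_tail;	/* published by the producer */
	uint32_t	 cons_head;	/* owned by the single consumer */
	void		*slots[RING_SIZE];
};

/*
 * Producer: fill the slot first, then publish it with a release store,
 * so the slot write is ordered before the new prod_tail becomes
 * visible.  This mirrors the br->br_ring[prod_head] = buf /
 * atomic_store_rel_32() sequence in buf_ring_enqueue().
 */
static void
ring_publish(struct ring *r, uint32_t idx, void *buf)
{
	r->slots[idx & RING_MASK] = buf;
	atomic_store_explicit(&r->prod_tail, idx + 1, memory_order_release);
}

/*
 * Single consumer: the acquire load of prod_tail orders the emptiness
 * check before the slot read, which is what the patched functions get
 * from atomic_load_acq_32().  With a plain load, a weakly ordered CPU
 * may fetch the slot speculatively before the comparison and hand back
 * a stale pointer.
 */
static void *
ring_peek(struct ring *r)
{
	uint32_t prod_tail;

	prod_tail = atomic_load_explicit(&r->prod_tail,
	    memory_order_acquire);
	if (r->cons_head == prod_tail)
		return (NULL);
	return (r->slots[r->cons_head & RING_MASK]);
}

Note also that the acquire load closes the hole on every architecture, not
just ARM: the old "lagging indicator" argument in buf_ring_peek() only covered
seeing a stale prod_tail (a harmless spurious NULL), not the slot read being
hoisted above the comparison.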