Changeset View
Standalone View
sys/sys/buf_ring.h
/*- | /*- | ||||
* Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org> | * Copyright (c) 2007-2009, 2015 Kip Macy <kmacy@freebsd.org> | ||||
imp: 2007-2015 is project style.
| |||||
Not Done Inline ActionsHappiness from the anonymous nitpickers society. imp: Happiness from the anonymous nitpickers society. | |||||
* All rights reserved. | * All rights reserved. | ||||
* | * | ||||
* Redistribution and use in source and binary forms, with or without | * Redistribution and use in source and binary forms, with or without | ||||
* modification, are permitted provided that the following conditions | * modification, are permitted provided that the following conditions | ||||
* are met: | * are met: | ||||
* 1. Redistributions of source code must retain the above copyright | * 1. Redistributions of source code must retain the above copyright | ||||
* notice, this list of conditions and the following disclaimer. | * notice, this list of conditions and the following disclaimer. | ||||
* 2. Redistributions in binary form must reproduce the above copyright | * 2. Redistributions in binary form must reproduce the above copyright | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | struct buf_ring { | ||||
int br_cons_size; | int br_cons_size; | ||||
int br_cons_mask; | int br_cons_mask; | ||||
#ifdef DEBUG_BUFRING | #ifdef DEBUG_BUFRING | ||||
struct mtx *br_lock; | struct mtx *br_lock; | ||||
#endif | #endif | ||||
void *br_ring[0] __aligned(CACHE_LINE_SIZE); | void *br_ring[0] __aligned(CACHE_LINE_SIZE); | ||||
}; | }; | ||||
/*
 * Multi-producer safe lock-free ring buffer enqueue
 *
 * Most architectures do not support the atomic update of multiple
 * discontiguous locations. So it is not possible to atomically update
 * the producer index and ring buffer entry. To side-step this limitation
 * we split the update into 3 steps:
 *   1) atomically acquiring an index
 *   2) updating the corresponding ring entry
 *   3) making the update available to the consumer
 * In order to split the index update into an acquire and release
 * phase there are _two_ producer indexes. 'prod_head' is used for
 * step 1) and is thus only used by the enqueue itself. 'prod_tail'
 * is used for step 3) to signal to the consumer that the update is
 * complete. To guarantee memory ordering the update of 'prod_tail' is
 * done with an atomic_store_rel_32(...) and the corresponding
 * initial read of 'prod_tail' by the dequeue functions is done with
 * an atomic_load_acq_32(...).
 *
 * Some implementation notes:
 * - Much like a simpler single-producer single-consumer ring buffer,
 *   the producer can not produce faster than the consumer. Hence the
 *   check of 'prod_head' + 1 against 'cons_tail'.
 * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
 *   calculate the next index is slightly cheaper than a modulo but
 *   requires the ring to be power-of-2 sized.
 * - The critical_enter() / critical_exit() are a performance optimization.
 *   They are not required for correctness. They prevent updates from
 *   stalling by having a producer be preempted after updating 'prod_head'
 *   but before updating 'prod_tail'.
 * - The "while (atomic_load_acq_32(&br->br_prod_tail) != prod_head)"
 *   check assures in-order completion (probably not strictly necessary,
 *   but makes it easier to reason about) and allows us to update
 *   'prod_tail' without a cmpxchg / LOCK prefix.
 */
static __inline int | static __inline int | ||||
buf_ring_enqueue(struct buf_ring *br, void *buf) | buf_ring_enqueue(struct buf_ring *br, void *buf) | ||||
{ | { | ||||
uint32_t prod_head, prod_next, cons_tail; | uint32_t prod_head, prod_next, cons_tail; | ||||
#ifdef DEBUG_BUFRING | #ifdef DEBUG_BUFRING | ||||
int i; | int i; | ||||
for (i = br->br_cons_head; i != br->br_prod_head; | for (i = br->br_cons_head; i != br->br_prod_head; | ||||
i = ((i + 1) & br->br_cons_mask)) | i = ((i + 1) & br->br_cons_mask)) | ||||
if(br->br_ring[i] == buf) | if(br->br_ring[i] == buf) | ||||
panic("buf=%p already enqueue at %d prod=%d cons=%d", | panic("buf=%p already enqueue at %d prod=%d cons=%d", | ||||
buf, i, br->br_prod_tail, br->br_cons_tail); | buf, i, br->br_prod_tail, br->br_cons_tail); | ||||
#endif | #endif | ||||
critical_enter(); | critical_enter(); | ||||
cons_tail = atomic_load_acq_32(&br->br_cons_tail); | |||||
Not Done Inline Actions_acq_32 rpaulo: _acq_32 | |||||
do { | do { | ||||
/* br_cons_tail is up to date by virtue of previous memory | |||||
* barrier or the subsequent atomic_cmpset_acq_32 | |||||
Not Done Inline ActionsThis is the part that I don't understand: cons_tail and prod_head don't share a cache line, so how can the read memory barrier help? rpaulo: This is the part that I don't understand: cons_tail and prod_head don't share a cache line, so… | |||||
Not Done Inline ActionsSo this bit of confusion leads me to believe that the implied specificity of FreeBSD's acquire / release API is a bit of sophistry that actually falls short of the more straightforward rmb()/wmb()/mb() in Linux. So I know of no architecture where memory barriers apply only to specific cache lines. Can you name one? kmacy: So this bit of confusion leads me to believe that the implied specificity of FreeBSD's acquire… | |||||
Not Done Inline ActionsActually, you're right and this seems to work as intended. rpaulo: Actually, you're right and this seems to work as intended. | |||||
*/ | |||||
cons_tail = br->br_cons_tail; | |||||
prod_head = br->br_prod_head; | prod_head = br->br_prod_head; | ||||
Not Done Inline ActionsSimilar race to my previous comment. We must order this read or risk invalidly storing a value. sbahra_repnop.org: Similar race to my previous comment. We must order this read or risk invalidly storing a value. | |||||
prod_next = (prod_head + 1) & br->br_prod_mask; | prod_next = (prod_head + 1) & br->br_prod_mask; | ||||
cons_tail = br->br_cons_tail; | |||||
if (prod_next == cons_tail) { | if (prod_next == cons_tail) { | ||||
rmb(); | /* This case should be rare - so the overhead of the redundant | ||||
if (prod_head == br->br_prod_head && | * memory barrier is OK. | ||||
cons_tail == br->br_cons_tail) { | */ | ||||
if (prod_head == atomic_load_acq_32(&br->br_prod_head) && | |||||
cons_tail == atomic_load_acq_32(&br->br_cons_tail)) { | |||||
br->br_drops++; | br->br_drops++; | ||||
critical_exit(); | critical_exit(); | ||||
return (ENOBUFS); | return (ENOBUFS); | ||||
} | } | ||||
continue; | continue; | ||||
} | } | ||||
} while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next)); | } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next)); | ||||
#ifdef DEBUG_BUFRING | #ifdef DEBUG_BUFRING | ||||
Not Done Inline ActionsIs there a release operation on &br->br_prod_head anywhere in this code? I didn't see one. So, I'm not sure what this is intended to order. alc: Is there a release operation on &br->br_prod_head anywhere in this code? I didn't see one. So… | |||||
if (br->br_ring[prod_head] != NULL) | if (br->br_ring[prod_head] != NULL) | ||||
panic("dangling value in enqueue"); | panic("dangling value in enqueue"); | ||||
#endif | #endif | ||||
br->br_ring[prod_head] = buf; | br->br_ring[prod_head] = buf; | ||||
/* | /* | ||||
* If there are other enqueues in progress | * If there are other enqueues in progress | ||||
* that preceeded us, we need to wait for them | * that preceeded us, we need to wait for them | ||||
* to complete | * to complete | ||||
*/ | */ | ||||
while (br->br_prod_tail != prod_head) | while (atomic_load_acq_32(&br->br_prod_tail) != prod_head) | ||||
cpu_spinwait(); | cpu_spinwait(); | ||||
atomic_store_rel_int(&br->br_prod_tail, prod_next); | atomic_store_rel_32(&br->br_prod_tail, prod_next); | ||||
critical_exit(); | critical_exit(); | ||||
return (0); | return (0); | ||||
} | } | ||||
/*
 * multi-consumer safe dequeue
 *
 */
static __inline void * | static __inline void * | ||||
buf_ring_dequeue_mc(struct buf_ring *br) | buf_ring_dequeue_mc(struct buf_ring *br) | ||||
{ | { | ||||
uint32_t cons_head, cons_next; | uint32_t cons_head, cons_next; | ||||
void *buf; | void *buf; | ||||
critical_enter(); | critical_enter(); | ||||
cons_head = atomic_load_acq_32(&br->br_cons_head); | |||||
do { | do { | ||||
/* subsequent reads should have memory ordering guaranteed by | |||||
* the atomic_cmpset_acq_32 below | |||||
*/ | |||||
cons_head = br->br_cons_head; | cons_head = br->br_cons_head; | ||||
cons_next = (cons_head + 1) & br->br_cons_mask; | |||||
if (cons_head == br->br_prod_tail) { | if (cons_head == br->br_prod_tail) { | ||||
Not Done Inline ActionsIf br_prod_tail is re-ordered prior to load of br_cons_head, bad things will happen. Let's assume (a, b) = (cons_head, prod_tail). Initial state: (0, 0), indicating empty. T0: Producer transitions us from (0,0) to (0, 1). This scenario was also confirmed with fault injection on Power (RMO). sbahra_repnop.org: If br_prod_tail is re-ordered prior to load of br_cons_head, bad things will happen. Let's… | |||||
critical_exit(); | critical_exit(); | ||||
return (NULL); | return (NULL); | ||||
} | } | ||||
} while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next)); | cons_next = (cons_head + 1) & br->br_cons_mask; | ||||
} while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next)); | |||||
buf = br->br_ring[cons_head]; | buf = br->br_ring[cons_head]; | ||||
#ifdef DEBUG_BUFRING | #ifdef DEBUG_BUFRING | ||||
br->br_ring[cons_head] = NULL; | br->br_ring[cons_head] = NULL; | ||||
#endif | #endif | ||||
/* | /* | ||||
* If there are other dequeues in progress | * If there are other dequeues in progress | ||||
Not Done Inline ActionsThat is guaranteed by the release semantics on the store in line 252. Preceding (by program order) loads as well as stores are ordered by a release store. alc: That is guaranteed by the release semantics on the store in line 252. Preceding (by program… | |||||
* that preceeded us, we need to wait for them | * that preceeded us, we need to wait for them | ||||
* to complete | * to complete | ||||
*/ | */ | ||||
while (br->br_cons_tail != cons_head) | while (atomic_load_acq_32(&br->br_cons_tail) != cons_head) | ||||
cpu_spinwait(); | cpu_spinwait(); | ||||
atomic_store_rel_int(&br->br_cons_tail, cons_next); | atomic_store_rel_32(&br->br_cons_tail, cons_next); | ||||
critical_exit(); | critical_exit(); | ||||
return (buf); | return (buf); | ||||
} | } | ||||
/*
 * single-consumer dequeue
 * use where dequeue is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void * | static __inline void * | ||||
buf_ring_dequeue_sc(struct buf_ring *br) | buf_ring_dequeue_sc(struct buf_ring *br) | ||||
{ | { | ||||
uint32_t cons_head, cons_next; | uint32_t cons_head, cons_next; | ||||
#ifdef PREFETCH_DEFINED | #ifdef PREFETCH_DEFINED | ||||
uint32_t cons_next_next; | uint32_t cons_next_next; | ||||
#endif | #endif | ||||
uint32_t prod_tail; | uint32_t prod_tail; | ||||
void *buf; | void *buf; | ||||
prod_tail = atomic_load_acq_32(&br->br_prod_tail); | |||||
cons_head = br->br_cons_head; | cons_head = br->br_cons_head; | ||||
prod_tail = br->br_prod_tail; | |||||
cons_next = (cons_head + 1) & br->br_cons_mask; | |||||
#ifdef PREFETCH_DEFINED | |||||
cons_next_next = (cons_head + 2) & br->br_cons_mask; | |||||
#endif | |||||
if (cons_head == prod_tail) | if (cons_head == prod_tail) | ||||
return (NULL); | return (NULL); | ||||
cons_next = (cons_head + 1) & br->br_cons_mask; | |||||
#ifdef PREFETCH_DEFINED | #ifdef PREFETCH_DEFINED | ||||
cons_next_next = (cons_head + 2) & br->br_cons_mask; | |||||
if (cons_next != prod_tail) { | if (cons_next != prod_tail) { | ||||
prefetch(br->br_ring[cons_next]); | prefetch(br->br_ring[cons_next]); | ||||
if (cons_next_next != prod_tail) | if (cons_next_next != prod_tail) | ||||
prefetch(br->br_ring[cons_next_next]); | prefetch(br->br_ring[cons_next_next]); | ||||
} | } | ||||
#endif | #endif | ||||
br->br_cons_head = cons_next; | br->br_cons_head = cons_next; | ||||
buf = br->br_ring[cons_head]; | buf = br->br_ring[cons_head]; | ||||
#ifdef DEBUG_BUFRING | #ifdef DEBUG_BUFRING | ||||
br->br_ring[cons_head] = NULL; | br->br_ring[cons_head] = NULL; | ||||
Not Done Inline ActionsSame comment as before. alc: Same comment as before. | |||||
if (!mtx_owned(br->br_lock)) | if (!mtx_owned(br->br_lock)) | ||||
panic("lock not held on single consumer dequeue"); | panic("lock not held on single consumer dequeue"); | ||||
if (br->br_cons_tail != cons_head) | if (br->br_cons_tail != cons_head) | ||||
panic("inconsistent list cons_tail=%d cons_head=%d", | panic("inconsistent list cons_tail=%d cons_head=%d", | ||||
br->br_cons_tail, cons_head); | br->br_cons_tail, cons_head); | ||||
#endif | #endif | ||||
br->br_cons_tail = cons_next; | /* do atomic_store_rel_32 to assure that producer has | ||||
* up to date view | |||||
*/ | |||||
atomic_store_rel_32(&br->br_cons_tail, cons_next); | |||||
return (buf); | return (buf); | ||||
} | } | ||||
/*
 * single-consumer advance after a peek
 * use where it is protected by a lock
 * e.g. a network driver's tx queue lock
 */
static __inline void | static __inline void | ||||
buf_ring_advance_sc(struct buf_ring *br) | buf_ring_advance_sc(struct buf_ring *br) | ||||
{ | { | ||||
uint32_t cons_head, cons_next; | uint32_t cons_head, cons_next; | ||||
uint32_t prod_tail; | uint32_t prod_tail; | ||||
cons_head = br->br_cons_head; | cons_head = br->br_cons_head; | ||||
prod_tail = br->br_prod_tail; | prod_tail = atomic_load_acq_32(&br->br_prod_tail); | ||||
cons_next = (cons_head + 1) & br->br_cons_mask; | cons_next = (cons_head + 1) & br->br_cons_mask; | ||||
if (cons_head == prod_tail) | if (cons_head == prod_tail) | ||||
return; | return; | ||||
br->br_cons_head = cons_next; | br->br_cons_head = cons_next; | ||||
#ifdef DEBUG_BUFRING | #ifdef DEBUG_BUFRING | ||||
br->br_ring[cons_head] = NULL; | br->br_ring[cons_head] = NULL; | ||||
#endif | #endif | ||||
br->br_cons_tail = cons_next; | atomic_store_rel_32(&br->br_cons_tail, cons_next); | ||||
} | } | ||||
/*
 * Used to return a buffer (most likely already there)
 * to the top of the ring. The caller should *not*
 * have used any dequeue to pull it out of the ring
 * but instead should have used the peek() function.
 * This is normally used where the transmit queue
 * of a driver is full, and an mbuf must be returned.
 * Most likely what's in the ring-buffer is what
 * is being put back (since it was not removed), but
 * sometimes the lower transmit function may have
 * done a pullup or other function that will have
 * changed it. As an optimization we always put it
 * back (since jhb says the store is probably cheaper),
 * if we have to do a multi-queue version we will need
 * the compare and an atomic.
Show All 14 Lines | |||||
static __inline void * | static __inline void * | ||||
buf_ring_peek(struct buf_ring *br) | buf_ring_peek(struct buf_ring *br) | ||||
{ | { | ||||
#ifdef DEBUG_BUFRING | #ifdef DEBUG_BUFRING | ||||
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock)) | if ((br->br_lock != NULL) && !mtx_owned(br->br_lock)) | ||||
panic("lock not held on single consumer dequeue"); | panic("lock not held on single consumer dequeue"); | ||||
#endif | #endif | ||||
/* | |||||
* I believe it is safe to not have a memory barrier | if (br->br_cons_head == atomic_load_acq_32(&br->br_prod_tail)) | ||||
* here because we control cons and tail is worst case | |||||
* a lagging indicator so we worst case we might | |||||
* return NULL immediately after a buffer has been enqueued | |||||
*/ | |||||
if (br->br_cons_head == br->br_prod_tail) | |||||
return (NULL); | return (NULL); | ||||
return (br->br_ring[br->br_cons_head]); | return (br->br_ring[br->br_cons_head]); | ||||
} | } | ||||
static __inline int | static __inline int | ||||
buf_ring_full(struct buf_ring *br) | buf_ring_full(struct buf_ring *br) | ||||
{ | { | ||||
/* br_cons_tail may be stale but the consumer understands that this is | |||||
* only a point in time snapshot | |||||
*/ | |||||
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail); | return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail); | ||||
} | } | ||||
static __inline int | static __inline int | ||||
buf_ring_empty(struct buf_ring *br) | buf_ring_empty(struct buf_ring *br) | ||||
{ | { | ||||
/* br_prod_tail may be stale but the consumer understands that this is | |||||
* only a point in time snapshot | |||||
*/ | |||||
return (br->br_cons_head == br->br_prod_tail); | return (br->br_cons_head == br->br_prod_tail); | ||||
} | } | ||||
static __inline int | static __inline int | ||||
buf_ring_count(struct buf_ring *br) | buf_ring_count(struct buf_ring *br) | ||||
{ | { | ||||
/* br_cons_tail and br_prod_tail may be stale but the consumer | |||||
* understands that this is only a point in time snapshot | |||||
*/ | |||||
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail) | return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail) | ||||
& br->br_prod_mask); | & br->br_prod_mask); | ||||
} | } | ||||
/* Allocate/free a ring of 'count' (power-of-2) entries; 'lock' is the
 * consumer lock checked under DEBUG_BUFRING (may be NULL).
 */
struct buf_ring *buf_ring_alloc(int count, struct malloc_type *type, int flags,
    struct mtx *);
void buf_ring_free(struct buf_ring *br, struct malloc_type *type);

#endif
2007-2015 is project style.