Page Menu
Home
FreeBSD
Search
Configure Global Search
Log In
Files
F132631805
D1945.id3938.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
10 KB
Referenced Files
None
Subscribers
None
D1945.id3938.diff
View Options
Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -1,5 +1,5 @@
/*-
- * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * Copyright (c) 2007-2015 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,9 @@
#include <sys/mutex.h>
#endif
+/* workaround sloppy API that doesn't actually conform to its name */
+#define atomic_load_acq_ptr_workaround(x) (void *)atomic_load_acq_ptr((uintptr_t *)(uintptr_t)(x))
+
struct buf_ring {
volatile uint32_t br_prod_head;
volatile uint32_t br_prod_tail;
@@ -58,7 +61,70 @@
};
/*
- * multi-producer safe lock-free ring buffer enqueue
+ * Multi-producer safe lock-free ring buffer enqueue
+ *
+ * Most architectures do not support the atomic update of multiple
+ * discontiguous locations. So it is not possible to atomically update
+ * the producer index and ring buffer entry. To side-step this limitation
+ * we split update in to 3 steps:
+ * 1) atomically acquiring an index
+ * 2) updating the corresponding ring entry
+ * 3) making the update available to the consumer
+ * In order to split the index update in to an acquire and release
+ * phase there are _two_ producer indexes. 'prod_head' is used for
+ * step 1) and is thus only used by the enqueue itself. 'prod_tail'
+ * is used for step 3) to signal to the consumer that the update is
+ * complete. To guarantee memory ordering the update of 'prod_tail' is
+ * done with a atomic_store_rel_32(...) and the corresponding
+ * initial read of 'prod_tail' by the dequeue functions is done with
+ * an atomic_load_acq_32(...).
+ *
+ * Regarding memory ordering - there are five variables in question:
+ * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}]
+ * It's easiest examine correctness by considering the consequence of
+ * reading a stale value or having an update become visible prior to
+ * preceding writes.
+ *
+ * - prod_head: this is only read by the enqueue routine, if the latter were to
+ * initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32)
+ * would fail. However, the implied memory barrier in cmpxchg would cause the
+ * subsequent read of prod_head to read the up-to-date value permitting the
+ * cmpxchg to succeed the second time.
+ *
+ * - prod_tail: this value is used by dequeue to determine the effective
+ * producer index. The absence of a memory barrier preceding it would make
+ * it possible for dequeue to read the new value prior to the update to
+ * ring[idx] becomings visible, thus reading a stale value in ring[idx].
+ *
+ * - cons_head: this value is used only by dequeue, it is either updated
+ * atomically (dequeue_mc) or protected by a mutex (dequeue_sc).
+ *
+ * - cons_tail: This is used to communicate the latest consumer index between
+ * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
+ * to fail erroneously. There is no risk of reading a _new_ value of cons_tail
+ * before br_ring[idx] has been read because the value is not updated until
+ * following the read.
+ *
+ * - ring[idx] : Updates to this value need to reach memory before the subsequent
+ * update to prod_tail does.
+ *
+ * Some implementation notes:
+ * - Much like a simpler single-producer single consumer ring buffer,
+ * the producer can not produce faster than the consumer. Hence the
+ * check of 'prod_head' + 1 against 'cons_tail'.
+ *
+ * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
+ * calculate the next index is slightly cheaper than a modulo but
+ * requires the ring to be power-of-2 sized.
+ *
+ * - The critical_enter() / critical_exit() are not required for
+ * correctness. They prevent updates from stalling by having a producer be
+ * preempted after updating 'prod_head' but before updating 'prod_tail'.
+ *
+ * - The "while (br->br_prod_tail != prod_head)"
+ * check assures in order completion (probably not strictly necessary,
+ * but makes it easier to reason about) and allows us to update
+ * 'prod_tail' without a cmpxchg / LOCK prefix.
*
*/
static __inline int
@@ -75,21 +141,26 @@
#endif
critical_enter();
do {
+
+ cons_tail = br->br_cons_tail;
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
- cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
- rmb();
- if (prod_head == br->br_prod_head &&
- cons_tail == br->br_cons_tail) {
+ /* We need to assure that we have up to date values
+ * but there is no apparent load ordering required
+ * this really just gives us a chance to re-read
+ * the values
+ */
+ if (prod_head == atomic_load_acq_32(&br->br_prod_head) &&
+ cons_tail == atomic_load_acq_32(&br->br_cons_tail)) {
br->br_drops++;
critical_exit();
return (ENOBUFS);
}
continue;
}
- } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+ } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
@@ -99,11 +170,15 @@
/*
* If there are other enqueues in progress
* that preceeded us, we need to wait for them
- * to complete
- */
+ * to complete
+ * re-ordering of reads would not effect correctness
+ */
while (br->br_prod_tail != prod_head)
- cpu_spinwait();
- atomic_store_rel_int(&br->br_prod_tail, prod_next);
+ cpu_spinwait();
+ /* ensure that the ring update reaches memory before the new
+ * value of prod_tail
+ */
+ atomic_store_rel_32(&br->br_prod_tail, prod_next);
critical_exit();
return (0);
}
@@ -120,28 +195,42 @@
critical_enter();
do {
+ /*
+ * There is no logical ordering required in the ordering of the
+ * prod_tail and cons_head
+ */
cons_head = br->br_cons_head;
- cons_next = (cons_head + 1) & br->br_cons_mask;
-
if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
- } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+ cons_next = (cons_head + 1) & br->br_cons_mask;
+ } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
+
+ /* ensure that the read completes before either of the
+ * subsequent stores
+ */
+ buf = atomic_load_acq_ptr_workaround(&br->br_ring[cons_head]);
- buf = br->br_ring[cons_head];
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
/*
* If there are other dequeues in progress
* that preceeded us, we need to wait for them
- * to complete
- */
+ * to complete - no memory barrier needed as
+ * re-ordering shouldn't effect correctness or
+ * progress
+ */
while (br->br_cons_tail != cons_head)
cpu_spinwait();
-
- atomic_store_rel_int(&br->br_cons_tail, cons_next);
+#ifdef DEBUG_BUFRING
+ /* a memory barrier is only needed here if we wish to guarantee the
+ * the NULL store completes first
+ */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
+#endif
+ br->br_cons_tail = cons_next;
critical_exit();
return (buf);
@@ -162,18 +251,17 @@
uint32_t prod_tail;
void *buf;
+ /*
+ * guarantee that we read prod_tail before reading any of the subsequent
+ * values - (not convinced this is needed for actual correctness)
+ */
+ prod_tail = atomic_load_acq_32(&br->br_prod_tail);
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
-
+ if (cons_head == prod_tail)
+ return (NULL);
cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
-
- if (cons_head == prod_tail)
- return (NULL);
-
-#ifdef PREFETCH_DEFINED
if (cons_next != prod_tail) {
prefetch(br->br_ring[cons_next]);
if (cons_next_next != prod_tail)
@@ -181,7 +269,8 @@
}
#endif
br->br_cons_head = cons_next;
- buf = br->br_ring[cons_head];
+ /* assure the read completes before we allow the producer to overwrite */
+ buf = atomic_load_acq_ptr_workaround(&br->br_ring[cons_head]);
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
@@ -190,8 +279,10 @@
if (br->br_cons_tail != cons_head)
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
#endif
- br->br_cons_tail = cons_next;
+ /* no release required as there are no updates that have to precede this */
+ br->br_cons_tail = cons_next;
return (buf);
}
@@ -205,16 +296,19 @@
{
uint32_t cons_head, cons_next;
uint32_t prod_tail;
-
- cons_head = br->br_cons_head;
+
+ /* the lock's load acquire should make this safe */
prod_tail = br->br_prod_tail;
+ cons_head = br->br_cons_head;
cons_next = (cons_head + 1) & br->br_cons_mask;
if (cons_head == prod_tail)
return;
br->br_cons_head = cons_next;
+ /* we only need to guarantee memory ordering when storing to br_ring */
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
#endif
br->br_cons_tail = cons_next;
}
@@ -256,12 +350,7 @@
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
#endif
- /*
- * I believe it is safe to not have a memory barrier
- * here because we control cons and tail is worst case
- * a lagging indicator so we worst case we might
- * return NULL immediately after a buffer has been enqueued
- */
+
if (br->br_cons_head == br->br_prod_tail)
return (NULL);
@@ -271,13 +360,18 @@
static __inline int
buf_ring_full(struct buf_ring *br)
{
-
+ /* br_cons_tail may be stale but the consumer understands that this is
+ * only a point in time snapshot
+ */
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}
static __inline int
buf_ring_empty(struct buf_ring *br)
{
+ /* br_prod_tail may be stale but the consumer understands that this is
+ * only a point in time snapshot
+ */
return (br->br_cons_head == br->br_prod_tail);
}
@@ -285,6 +379,9 @@
static __inline int
buf_ring_count(struct buf_ring *br)
{
+ /* br_cons_tail and br_prod_tail may be stale but the consumer
+ * understands that this is only a point in time snapshot
+ */
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
& br->br_prod_mask);
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Oct 19, 2:39 PM (7 h, 44 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
23928989
Default Alt Text
D1945.id3938.diff (10 KB)
Attached To
Mode
D1945: Buf ring cleanups
Attached
Detach File
Event Timeline
Log In to Comment