Page MenuHomeFreeBSD

D1945.id3942.diff
No OneTemporary

D1945.id3942.diff

Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -1,5 +1,5 @@
/*-
- * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * Copyright (c) 2007-2015 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -58,7 +58,70 @@
};
/*
- * multi-producer safe lock-free ring buffer enqueue
+ * Multi-producer safe lock-free ring buffer enqueue
+ *
+ * Most architectures do not support the atomic update of multiple
+ * discontiguous locations. So it is not possible to atomically update
+ * the producer index and ring buffer entry. To side-step this limitation
+ * we split the update into 3 steps:
+ * 1) atomically acquiring an index
+ * 2) updating the corresponding ring entry
+ * 3) making the update available to the consumer
+ * In order to split the index update into an acquire and release
+ * phase there are _two_ producer indexes. 'prod_head' is used for
+ * step 1) and is thus only used by the enqueue itself. 'prod_tail'
+ * is used for step 3) to signal to the consumer that the update is
+ * complete. To guarantee memory ordering the update of 'prod_tail' is
+ * done with an atomic_store_rel_32(...) and the corresponding
+ * initial read of 'prod_tail' by the dequeue functions is done with
+ * an atomic_load_acq_32(...).
+ *
+ * Regarding memory ordering - there are five variables in question:
+ * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}]
+ * It's easiest to examine correctness by considering the consequence of
+ * reading a stale value or having an update become visible prior to
+ * preceding writes.
+ *
+ * - prod_head: this is only read by the enqueue routine, if the latter were to
+ * initially read a stale value for it the cmpxchg (atomic_cmpset_acq_32)
+ * would fail. However, the implied memory barrier in cmpxchg would cause the
+ * subsequent read of prod_head to read the up-to-date value permitting the
+ * cmpxchg to succeed the second time.
+ *
+ * - prod_tail: this value is used by dequeue to determine the effective
+ * producer index. The absence of a memory barrier preceding it would make
+ * it possible for dequeue to read the new value prior to the update to
+ * ring[idx] becoming visible, thus reading a stale value in ring[idx].
+ *
+ * - cons_head: this value is used only by dequeue, it is either updated
+ * atomically (dequeue_mc) or protected by a mutex (dequeue_sc).
+ *
+ * - cons_tail: This is used to communicate the latest consumer index between
+ * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
+ * to fail erroneously. There is no risk of reading a _new_ value of cons_tail
+ * before br_ring[idx] has been read because the value is not updated until
+ * following the read.
+ *
+ * - ring[idx] : Updates to this value need to reach memory before the subsequent
+ * update to prod_tail does.
+ *
+ * Some implementation notes:
+ * - Much like a simpler single-producer single consumer ring buffer,
+ * the producer can not produce faster than the consumer. Hence the
+ * check of 'prod_head' + 1 against 'cons_tail'.
+ *
+ * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
+ * calculate the next index is slightly cheaper than a modulo but
+ * requires the ring to be power-of-2 sized.
+ *
+ * - The critical_enter() / critical_exit() are not required for
+ * correctness. They prevent updates from stalling by having a producer be
+ * preempted after updating 'prod_head' but before updating 'prod_tail'.
+ *
+ * - The "while (br->br_prod_tail != prod_head)"
+ * check ensures in-order completion (probably not strictly necessary,
+ * but makes it easier to reason about) and allows us to update
+ * 'prod_tail' without a cmpxchg / LOCK prefix.
*
*/
static __inline int
@@ -75,21 +138,26 @@
#endif
critical_enter();
do {
+
+ cons_tail = br->br_cons_tail;
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
- cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
- rmb();
- if (prod_head == br->br_prod_head &&
- cons_tail == br->br_cons_tail) {
+ /* We need to ensure that we have up-to-date values,
+ * but there is no apparent load ordering required;
+ * this really just gives us a chance to re-read
+ * the values.
+ */
+ if (prod_head == atomic_load_acq_32(&br->br_prod_head) &&
+ cons_tail == atomic_load_acq_32(&br->br_cons_tail)) {
br->br_drops++;
critical_exit();
return (ENOBUFS);
}
continue;
}
- } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+ } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
@@ -99,11 +167,15 @@
/*
* If there are other enqueues in progress
* that preceeded us, we need to wait for them
- * to complete
- */
+ * to complete
+ * re-ordering of reads would not affect correctness
+ */
while (br->br_prod_tail != prod_head)
- cpu_spinwait();
- atomic_store_rel_int(&br->br_prod_tail, prod_next);
+ cpu_spinwait();
+ /* ensure that the ring update reaches memory before the new
+ * value of prod_tail
+ */
+ atomic_store_rel_32(&br->br_prod_tail, prod_next);
critical_exit();
return (0);
}
@@ -120,28 +192,40 @@
critical_enter();
do {
+ /*
+ * No particular ordering is required between the reads of
+ * prod_tail and cons_head
+ */
cons_head = br->br_cons_head;
- cons_next = (cons_head + 1) & br->br_cons_mask;
-
if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
- } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+ cons_next = (cons_head + 1) & br->br_cons_mask;
+ } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
+ /* ensure that the read completes before either of the
+ * subsequent stores
+ */
buf = br->br_ring[cons_head];
+
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
/*
* If there are other dequeues in progress
* that preceeded us, we need to wait for them
- * to complete
- */
+ * to complete - no memory barrier needed as
+ * re-ordering shouldn't affect correctness or
+ * progress
+ */
while (br->br_cons_tail != cons_head)
cpu_spinwait();
-
- atomic_store_rel_int(&br->br_cons_tail, cons_next);
+ /* rather than doing an atomic_load_acq_ptr() on the ring
+ * we guarantee that the load completes first by ensuring
+ * release semantics on tail - this is cheaper on x86
+ */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
critical_exit();
return (buf);
@@ -161,19 +245,18 @@
#endif
uint32_t prod_tail;
void *buf;
-
+
+ /*
+ * guarantee that we read prod_tail before reading ring[cons_next]
+ * thus assuring that we can't read a stale ring entry value
+ */
+ prod_tail = atomic_load_acq_32(&br->br_prod_tail);
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
-
+ if (cons_head == prod_tail)
+ return (NULL);
cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
-
- if (cons_head == prod_tail)
- return (NULL);
-
-#ifdef PREFETCH_DEFINED
if (cons_next != prod_tail) {
prefetch(br->br_ring[cons_next]);
if (cons_next_next != prod_tail)
@@ -191,7 +274,12 @@
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
#endif
- br->br_cons_tail = cons_next;
+ /* rather than doing an atomic_load_acq_ptr() on the ring
+ * we guarantee that the load completes first by ensuring
+ * release semantics on tail - this is cheaper on x86
+ */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
+
return (buf);
}
@@ -205,9 +293,9 @@
{
uint32_t cons_head, cons_next;
uint32_t prod_tail;
-
- cons_head = br->br_cons_head;
+
prod_tail = br->br_prod_tail;
+ cons_head = br->br_cons_head;
cons_next = (cons_head + 1) & br->br_cons_mask;
if (cons_head == prod_tail)
@@ -216,7 +304,11 @@
#ifdef DEBUG_BUFRING
br->br_ring[cons_head] = NULL;
#endif
- br->br_cons_tail = cons_next;
+ /* rather than doing an atomic_load_acq_ptr() on the ring
+ * we guarantee that the load completes first by ensuring
+ * release semantics on tail - this is cheaper on x86
+ */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
}
/*
@@ -234,6 +326,8 @@
* back (since jhb says the store is probably cheaper),
* if we have to do a multi-queue version we will need
* the compare and an atomic.
+ *
+ * should be a no-op
*/
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
@@ -251,33 +345,37 @@
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
-
+ uint32_t prod_tail;
#ifdef DEBUG_BUFRING
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
#endif
- /*
- * I believe it is safe to not have a memory barrier
- * here because we control cons and tail is worst case
- * a lagging indicator so we worst case we might
- * return NULL immediately after a buffer has been enqueued
- */
- if (br->br_cons_head == br->br_prod_tail)
+
+ /* ensure that prod_tail is read before ring */
+ prod_tail = atomic_load_acq_32(&br->br_prod_tail);
+ if (br->br_cons_head == prod_tail)
return (NULL);
-
+ /* ensure that the ring load completes before
+ * exposing it to any destructive updates
+ */
return (br->br_ring[br->br_cons_head]);
}
static __inline int
buf_ring_full(struct buf_ring *br)
{
-
+ /* br_cons_tail may be stale but the consumer understands that this is
+ * only a point in time snapshot
+ */
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}
static __inline int
buf_ring_empty(struct buf_ring *br)
{
+ /* br_prod_tail may be stale but the consumer understands that this is
+ * only a point in time snapshot
+ */
return (br->br_cons_head == br->br_prod_tail);
}
@@ -285,6 +383,9 @@
static __inline int
buf_ring_count(struct buf_ring *br)
{
+ /* br_cons_tail and br_prod_tail may be stale but the consumer
+ * understands that this is only a point in time snapshot
+ */
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
& br->br_prod_mask);

File Metadata

Mime Type
text/plain
Expires
Wed, Oct 22, 4:57 PM (2 h, 54 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
24060980
Default Alt Text
D1945.id3942.diff (10 KB)

Event Timeline