D1945.id3966.diff

Index: sys/sys/buf_ring.h
===================================================================
--- sys/sys/buf_ring.h
+++ sys/sys/buf_ring.h
@@ -1,5 +1,5 @@
/*-
- * Copyright (c) 2007-2009 Kip Macy <kmacy@freebsd.org>
+ * Copyright (c) 2007-2015 Kip Macy <kmacy@freebsd.org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -58,7 +58,85 @@
};
/*
- * multi-producer safe lock-free ring buffer enqueue
+ * Many architectures other than x86 permit speculative re-ordering
+ * of loads. Unfortunately, atomic_load_acq_32() is comparatively
+ * expensive, so we'd rather elide it if possible.
+ */
+#if defined(__i386__) || defined(__amd64__)
+#define ORDERED_LOAD_32(x) (*(x))
+#else
+#define ORDERED_LOAD_32(x) atomic_load_acq_32((x))
+#endif
+
+/*
+ * Multi-producer safe lock-free ring buffer enqueue
+ *
+ * Most architectures do not support the atomic update of multiple
+ * discontiguous locations. So it is not possible to atomically update
+ * the producer index and ring buffer entry. To side-step this limitation
+ * we split the update into 3 steps:
+ * 1) atomically acquiring an index
+ * 2) updating the corresponding ring entry
+ * 3) making the update available to the consumer
+ * In order to split the index update into an acquire and release
+ * phase there are _two_ producer indexes. 'prod_head' is used for
+ * step 1) and is thus only used by the enqueue itself. 'prod_tail'
+ * is used for step 3) to signal to the consumer that the update is
+ * complete. To guarantee memory ordering the update of 'prod_tail' is
+ * done with an atomic_store_rel_32(...) and the corresponding
+ * initial read of 'prod_tail' by the dequeue functions is done with
+ * an atomic_load_acq_32(...).
+ *
+ * Regarding memory ordering - there are five variables in question:
+ * (br_) prod_head, prod_tail, cons_head, cons_tail, ring[idx={cons, prod}]
+ * It's easiest to examine correctness by considering the consequence of
+ * reading a stale value or having an update become visible prior to
+ * preceding writes.
+ *
+ * - prod_head: this is only read by the enqueue routine; if the latter were to
+ * initially read a stale value for it, the cmpxchg (atomic_cmpset_acq_32)
+ * would fail. However, the implied memory barrier in cmpxchg would cause the
+ * subsequent read of prod_head to read the up-to-date value permitting the
+ * cmpxchg to succeed the second time.
+ *
+ * - prod_tail: This value is used by dequeue to determine the effective
+ * producer index. On architectures with weaker memory ordering than x86 it
+ * needs special handling. In enqueue it needs to be updated with
+ * atomic_store_rel_32() (i.e. a write memory barrier before update) to
+ * guarantee that the new ring value is committed to memory before it is
+ * made available by prod_tail. In dequeue, to guarantee that it is read before
+ * br_ring[cons_head], it needs to be read with atomic_load_acq_32().
+ *
+ * - cons_head: this value is used only by dequeue, it is either updated
+ * atomically (dequeue_mc) or protected by a mutex (dequeue_sc).
+ *
+ * - cons_tail: This is used to communicate the latest consumer index between
+ * dequeue and enqueue. Reading a stale value in enqueue can cause an enqueue
+ * to fail erroneously. To avoid a load being re-ordered after a store (and
+ * thus permitting enqueue to store a new value before the old one has been
+ * consumed), it is updated with an atomic_store_rel_32() in dequeue.
+ *
+ * - ring[idx] : Updates to this value need to reach memory before the subsequent
+ * update to prod_tail does. Reads need to happen before subsequent updates to
+ * cons_tail.
+ *
+ * Some implementation notes:
+ * - Much like a simpler single-producer, single-consumer ring buffer,
+ * the producer cannot overrun the consumer. Hence the
+ * check of 'prod_head' + 1 against 'cons_tail'.
+ *
+ * - The use of "prod_next = (prod_head + 1) & br->br_prod_mask" to
+ * calculate the next index is slightly cheaper than a modulo but
+ * requires the ring to be power-of-2 sized.
+ *
+ * - The critical_enter() / critical_exit() are not required for
+ * correctness. They prevent other updates from stalling when a producer is
+ * preempted after updating 'prod_head' but before updating 'prod_tail'.
+ *
+ * - The "while (br->br_prod_tail != prod_head)"
+ * check assures in-order completion (probably not strictly necessary,
+ * but makes it easier to reason about) and allows us to update
+ * 'prod_tail' without a cmpxchg / LOCK prefix.
*
*/
static __inline int
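The release/acquire pairing described in the comment above reduces to the sketch below. It is illustrative only and not part of this patch; the 'toy_ring' structure and function names are invented, and the real code operates on the br_-prefixed fields of struct buf_ring.

	#include <sys/types.h>
	#include <machine/atomic.h>

	struct toy_ring {
		volatile uint32_t	prod_tail;	/* published producer index */
		void			*slot[16];	/* ring entries */
	};

	/* Producer: write the entry first, then release-publish the index. */
	static __inline void
	toy_publish(struct toy_ring *r, uint32_t idx, uint32_t next, void *buf)
	{
		r->slot[idx] = buf;
		atomic_store_rel_32(&r->prod_tail, next);
	}

	/* Consumer: acquire-load the index; the entry is then safe to read. */
	static __inline void *
	toy_consume(struct toy_ring *r, uint32_t head)
	{
		if (head == atomic_load_acq_32(&r->prod_tail))
			return (NULL);		/* nothing published yet */
		return (r->slot[head]);
	}

The enqueue below is essentially the producer half of this sketch, with the cmpxchg loop on prod_head reserving the slot first.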
@@ -75,21 +153,24 @@
#endif
critical_enter();
do {
+
prod_head = br->br_prod_head;
prod_next = (prod_head + 1) & br->br_prod_mask;
cons_tail = br->br_cons_tail;
if (prod_next == cons_tail) {
- rmb();
- if (prod_head == br->br_prod_head &&
- cons_tail == br->br_cons_tail) {
- br->br_drops++;
- critical_exit();
- return (ENOBUFS);
- }
- continue;
+ /*
+ * Ensure that we only return ENOBUFS
+ * if the latest value matches what we read.
+ */
+ if (prod_head != atomic_load_acq_32(&br->br_prod_head) ||
+ cons_tail != atomic_load_acq_32(&br->br_cons_tail))
+ continue;
+
+ br->br_drops++;
+ critical_exit();
+ return (ENOBUFS);
}
- } while (!atomic_cmpset_acq_int(&br->br_prod_head, prod_head, prod_next));
+ } while (!atomic_cmpset_acq_32(&br->br_prod_head, prod_head, prod_next));
#ifdef DEBUG_BUFRING
if (br->br_ring[prod_head] != NULL)
panic("dangling value in enqueue");
@@ -99,11 +180,15 @@
/*
* If there are other enqueues in progress
* that preceeded us, we need to wait for them
- * to complete
- */
+ * to complete.
+ * Re-ordering of reads would not affect correctness.
+ */
while (br->br_prod_tail != prod_head)
- cpu_spinwait();
- atomic_store_rel_int(&br->br_prod_tail, prod_next);
+ cpu_spinwait();
+ /*
+ * Ensure that the ring update reaches memory before the new
+ * value of prod_tail.
+ */
+ atomic_store_rel_32(&br->br_prod_tail, prod_next);
critical_exit();
return (0);
}
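As a usage note, producers typically treat ENOBUFS from buf_ring_enqueue() as a drop. A rough sketch of a caller follows; 'txr' and 'm' are hypothetical driver locals and the surrounding error handling is trimmed.

	error = buf_ring_enqueue(txr->br, m);
	if (error == ENOBUFS) {
		/* Ring full: free the buffer and report the drop upward. */
		m_freem(m);
		return (error);
	}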
@@ -120,28 +205,39 @@
critical_enter();
do {
+ /*
+ * There is no ordering requirement between the reads of
+ * prod_tail and cons_head.
+ */
cons_head = br->br_cons_head;
- cons_next = (cons_head + 1) & br->br_cons_mask;
-
if (cons_head == br->br_prod_tail) {
critical_exit();
return (NULL);
}
- } while (!atomic_cmpset_acq_int(&br->br_cons_head, cons_head, cons_next));
+ cons_next = (cons_head + 1) & br->br_cons_mask;
+ } while (!atomic_cmpset_acq_32(&br->br_cons_head, cons_head, cons_next));
+ /*
+ * Ensure that the read completes before either of the
+ * subsequent stores.
+ */
buf = br->br_ring[cons_head];
-#ifdef DEBUG_BUFRING
+ /* guarantee that the load completes before we update cons_tail */
br->br_ring[cons_head] = NULL;
-#endif
+
/*
* If there are other dequeues in progress
* that preceeded us, we need to wait for them
- * to complete
- */
+ * to complete - no memory barrier needed as
+ * re-ordering shouldn't affect correctness or
+ * progress
+ */
while (br->br_cons_tail != cons_head)
cpu_spinwait();
-
- atomic_store_rel_int(&br->br_cons_tail, cons_next);
+ /*
+ * Rather than doing an atomic_load_acq_ptr() on the ring
+ * we guarantee that the load completes first by ensuring
+ * release semantics on tail - this is cheaper on x86.
+ */
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
critical_exit();
return (buf);
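The comment above contrasts the release store on br_cons_tail with the costlier per-slot acquire load it avoids. That rejected alternative would look roughly like the snippet below; it is shown only to illustrate the design choice, not as part of the patch.

	/*
	 * Acquire load of the slot itself, instead of relying on the
	 * release store to br_cons_tail.
	 */
	buf = (void *)atomic_load_acq_ptr(
	    (volatile uintptr_t *)&br->br_ring[cons_head]);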
@@ -158,22 +254,25 @@
uint32_t cons_head, cons_next;
#ifdef PREFETCH_DEFINED
uint32_t cons_next_next;
-#endif
uint32_t prod_tail;
+#endif
void *buf;
-
+
+retry:
+ /*
+ * prod_tail tells whether or not br_ring[cons_head] is valid;
+ * thus we must guarantee that it is read first.
+ */
cons_head = br->br_cons_head;
- prod_tail = br->br_prod_tail;
-
+ if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail)) {
+ if (cons_head != atomic_load_acq_32(&br->br_prod_tail))
+ goto retry;
+ return (NULL);
+ }
cons_next = (cons_head + 1) & br->br_cons_mask;
#ifdef PREFETCH_DEFINED
+ prod_tail = br->br_prod_tail;
cons_next_next = (cons_head + 2) & br->br_cons_mask;
-#endif
-
- if (cons_head == prod_tail)
- return (NULL);
-
-#ifdef PREFETCH_DEFINED
if (cons_next != prod_tail) {
prefetch(br->br_ring[cons_next]);
if (cons_next_next != prod_tail)
@@ -182,16 +281,17 @@
#endif
br->br_cons_head = cons_next;
buf = br->br_ring[cons_head];
-
-#ifdef DEBUG_BUFRING
+ /* guarantee that the load completes before we update cons_tail */
br->br_ring[cons_head] = NULL;
+#ifdef DEBUG_BUFRING
if (!mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
if (br->br_cons_tail != cons_head)
panic("inconsistent list cons_tail=%d cons_head=%d",
br->br_cons_tail, cons_head);
#endif
- br->br_cons_tail = cons_next;
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
+
return (buf);
}
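As with the other _sc routines, correctness here depends on the caller serializing consumers; that is what the DEBUG_BUFRING mtx_owned() check asserts. A rough drain loop might look like the following, where the mutex and the drain_one() handler are hypothetical.

	mtx_lock(&sc->br_mtx);
	while ((buf = buf_ring_dequeue_sc(sc->br)) != NULL)
		drain_one(sc, buf);
	mtx_unlock(&sc->br_mtx);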
@@ -205,7 +305,7 @@
{
uint32_t cons_head, cons_next;
uint32_t prod_tail;
-
+
cons_head = br->br_cons_head;
prod_tail = br->br_prod_tail;
@@ -213,10 +313,17 @@
if (cons_head == prod_tail)
return;
br->br_cons_head = cons_next;
-#ifdef DEBUG_BUFRING
+
+ /*
+ * Storing NULL here serves two purposes:
+ * 1) it assures that the load of ring[cons_head] has completed
+ * (only the most perverted architecture or compiler would
+ * consider re-ordering a = *x; *x = b)
+ * 2) it allows us to enforce global ordering of the cons_tail
+ * update with an atomic_store_rel_32
+ */
br->br_ring[cons_head] = NULL;
-#endif
- br->br_cons_tail = cons_next;
+ atomic_store_rel_32(&br->br_cons_tail, cons_next);
}
/*
@@ -234,6 +341,8 @@
* back (since jhb says the store is probably cheaper),
* if we have to do a multi-queue version we will need
* the compare and an atomic.
+ *
+ * should be a no-op
*/
static __inline void
buf_ring_putback_sc(struct buf_ring *br, void *new)
@@ -251,33 +360,40 @@
static __inline void *
buf_ring_peek(struct buf_ring *br)
{
-
+ uint32_t cons_head;
#ifdef DEBUG_BUFRING
if ((br->br_lock != NULL) && !mtx_owned(br->br_lock))
panic("lock not held on single consumer dequeue");
#endif
+ cons_head = br->br_cons_head;
/*
- * I believe it is safe to not have a memory barrier
- * here because we control cons and tail is worst case
- * a lagging indicator so we worst case we might
- * return NULL immediately after a buffer has been enqueued
+ * For correctness, prod_tail must be read before ring[cons_head].
*/
- if (br->br_cons_head == br->br_prod_tail)
+
+ if (cons_head == ORDERED_LOAD_32(&br->br_prod_tail))
return (NULL);
-
- return (br->br_ring[br->br_cons_head]);
+
+ /*
+ * Ensure that the ring load completes before
+ * exposing it to any destructive updates.
+ */
+ return (br->br_ring[cons_head]);
}
static __inline int
buf_ring_full(struct buf_ring *br)
{
-
+ /*
+ * br_cons_tail may be stale, but the caller understands that this is
+ * only a point-in-time snapshot.
+ */
return (((br->br_prod_head + 1) & br->br_prod_mask) == br->br_cons_tail);
}
static __inline int
buf_ring_empty(struct buf_ring *br)
{
+ /*
+ * br_prod_tail may be stale, but the caller understands that this is
+ * only a point-in-time snapshot.
+ */
return (br->br_cons_head == br->br_prod_tail);
}
@@ -285,6 +401,9 @@
static __inline int
buf_ring_count(struct buf_ring *br)
{
+ /*
+ * br_cons_tail and br_prod_tail may be stale, but the caller
+ * understands that this is only a point-in-time snapshot.
+ */
return ((br->br_prod_size + br->br_prod_tail - br->br_cons_tail)
& br->br_prod_mask);

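For completeness, a sketch of typical setup and inspection of a ring follows. It assumes the existing buf_ring_alloc() prototype declared elsewhere in this header (count, malloc type, flags, optional lock); 'sc' is a hypothetical softc, and the count must be a power of two for the mask arithmetic used above.

	/*
	 * Power-of-2 sized ring; the lock backs the DEBUG_BUFRING checks
	 * and serializes the single-consumer callers.
	 */
	mtx_init(&sc->br_mtx, "buf_ring", NULL, MTX_DEF);
	sc->br = buf_ring_alloc(4096, M_DEVBUF, M_WAITOK, &sc->br_mtx);

	/*
	 * buf_ring_empty() and buf_ring_count() are advisory,
	 * point-in-time snapshots only.
	 */
	if (!buf_ring_empty(sc->br))
		printf("roughly %d entries queued\n", buf_ring_count(sc->br));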