Page MenuHomeFreeBSD

D1247.id2625.diff
No OneTemporary

D1247.id2625.diff

Index: share/man/man9/taskqueue.9
===================================================================
--- share/man/man9/taskqueue.9
+++ share/man/man9/taskqueue.9
@@ -285,10 +285,9 @@
.Fn taskqueue_drain_all
function is used to wait for all pending and running tasks that
are enqueued on the taskqueue to finish.
-The caller must arrange that the tasks are not re-enqueued.
-Note that
+Tasks posted to the taskqueue after
.Fn taskqueue_drain_all
-currently does not handle tasks with delayed enqueueing.
+begins processing are ignored.
.Pp
The
.Fn taskqueue_block
Index: sys/kern/subr_taskqueue.c
===================================================================
--- sys/kern/subr_taskqueue.c
+++ sys/kern/subr_taskqueue.c
@@ -56,6 +56,8 @@
TAILQ_ENTRY(taskqueue_busy) tb_link;
};
+struct task * const TB_DRAIN_WAITER = (struct task *)0x1;
+
struct taskqueue {
STAILQ_HEAD(, task) tq_queue;
taskqueue_enqueue_fn tq_enqueue;
@@ -241,6 +243,7 @@
/* Return with lock released. */
return (0);
}
+
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
@@ -302,14 +305,84 @@
}
static void
-taskqueue_drain_running(struct taskqueue *queue)
+taskqueue_task_nop_fn(void *context, int pending)
{
+}
- while (!TAILQ_EMPTY(&queue->tq_active))
- TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex,
- PWAIT, "-", 0);
+/*
+ * Block until all currently queued tasks in this taskqueue
+ * have begun execution. Tasks queued during execution of
+ * this function are ignored.
+ */
+static void
+taskqueue_drain_tq_queue(struct taskqueue *queue)
+{
+ struct task t_barrier;
+
+ if (STAILQ_EMPTY(&queue->tq_queue))
+ return;
+
+ /*
+ * Enqueue our barrier with the lowest possible priority
+ * so we are inserted after all current tasks.
+ */
+ TASK_INIT(&t_barrier, 0, taskqueue_task_nop_fn, &t_barrier);
+ taskqueue_enqueue_locked(queue, &t_barrier);
+
+ /*
+ * Raise the barrier's priority so newly queued tasks cannot
+ * pass it.
+ */
+ t_barrier.ta_priority = USHRT_MAX;
+
+ /*
+ * Once the barrier has executed, all previously queued tasks
+ * have completed or are currently executing.
+ */
+ while (t_barrier.ta_pending != 0)
+ TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0);
}
+/*
+ * Block until all currently executing tasks for this taskqueue
+ * complete. Tasks that begin execution during the execution
+ * of this function are ignored.
+ */
+static void
+taskqueue_drain_tq_active(struct taskqueue *queue)
+{
+ struct taskqueue_busy tb_marker, *tb_first;
+
+ if (TAILQ_EMPTY(&queue->tq_active))
+ return;
+
+ /* Block taskq_terminate().*/
+ queue->tq_callouts++;
+
+ /*
+ * Wait for all currently executing taskqueue threads
+ * to go idle.
+ */
+ tb_marker.tb_running = TB_DRAIN_WAITER;
+ TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link);
+ while (TAILQ_FIRST(&queue->tq_active) != &tb_marker)
+ TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0);
+ TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link);
+
+ /*
+ * Wakeup any other drain waiter that happend to queue up
+ * without any interveining active thread.
+ */
+ tb_first = TAILQ_FIRST(&queue->tq_active);
+ if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER)
+ wakeup(tb_first);
+
+ /* Release taskqueue_terminate().*/
+ queue->tq_callouts--;
+ if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
+ wakeup_one(queue->tq_threads);
+}
+
void
taskqueue_block(struct taskqueue *queue)
{
@@ -334,14 +407,16 @@
taskqueue_run_locked(struct taskqueue *queue)
{
struct taskqueue_busy tb;
+ struct taskqueue_busy *tb_first;
struct task *task;
int pending;
TQ_ASSERT_LOCKED(queue);
tb.tb_running = NULL;
- TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
while (STAILQ_FIRST(&queue->tq_queue)) {
+ TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link);
+
/*
* Carefully remove the first task from the queue and
* zero its pending count.
@@ -358,10 +433,13 @@
TQ_LOCK(queue);
tb.tb_running = NULL;
wakeup(task);
+
+ TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
+ tb_first = TAILQ_FIRST(&queue->tq_active);
+ if (tb_first != NULL &&
+ tb_first->tb_running == TB_DRAIN_WAITER)
+ wakeup(tb_first);
}
- TAILQ_REMOVE(&queue->tq_active, &tb, tb_link);
- if (TAILQ_EMPTY(&queue->tq_active))
- wakeup(&queue->tq_active);
}
void
@@ -448,19 +526,13 @@
void
taskqueue_drain_all(struct taskqueue *queue)
{
- struct task *task;
if (!queue->tq_spin)
WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
TQ_LOCK(queue);
- task = STAILQ_LAST(&queue->tq_queue, task, ta_link);
- if (task != NULL)
- while (task->ta_pending != 0)
- TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0);
- taskqueue_drain_running(queue);
- KASSERT(STAILQ_EMPTY(&queue->tq_queue),
- ("taskqueue queue is not empty after draining"));
+ taskqueue_drain_tq_queue(queue);
+ taskqueue_drain_tq_active(queue);
TQ_UNLOCK(queue);
}

File Metadata

Mime Type
text/plain
Expires
Thu, Nov 28, 3:58 AM (19 h, 10 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
14901486
Default Alt Text
D1247.id2625.diff (4 KB)

Event Timeline