Index: share/man/man9/taskqueue.9 =================================================================== --- share/man/man9/taskqueue.9 +++ share/man/man9/taskqueue.9 @@ -28,7 +28,7 @@ .\" .\" $FreeBSD$ .\" -.Dd May 24, 2014 +.Dd December 31, 2014 .Dt TASKQUEUE 9 .Os .Sh NAME @@ -285,10 +285,16 @@ .Fn taskqueue_drain_all function is used to wait for all pending and running tasks that are enqueued on the taskqueue to finish. -The caller must arrange that the tasks are not re-enqueued. -Note that +Tasks posted to the taskqueue after .Fn taskqueue_drain_all -currently does not handle tasks with delayed enqueueing. +begins processing, +including pending enqueues scheduled by a previous call to +.Fn taskqueue_enqueue_timeout , +do not extend the wait time of +.Fn taskqueue_drain_all +and may complete after +.Fn taskqueue_drain_all +returns. .Pp The .Fn taskqueue_block Index: sys/kern/subr_taskqueue.c =================================================================== --- sys/kern/subr_taskqueue.c +++ sys/kern/subr_taskqueue.c @@ -56,6 +56,8 @@ TAILQ_ENTRY(taskqueue_busy) tb_link; }; +struct task * const TB_DRAIN_WAITER = (struct task *)0x1; + struct taskqueue { STAILQ_HEAD(, task) tq_queue; taskqueue_enqueue_fn tq_enqueue; @@ -241,6 +243,7 @@ /* Return with lock released. */ return (0); } + int taskqueue_enqueue(struct taskqueue *queue, struct task *task) { @@ -302,12 +305,82 @@ } static void -taskqueue_drain_running(struct taskqueue *queue) +taskqueue_task_nop_fn(void *context, int pending) +{ +} + +/* + * Block until all currently queued tasks in this taskqueue + * have begun execution. Tasks queued during execution of + * this function are ignored. 
+ */ +static void +taskqueue_drain_tq_queue(struct taskqueue *queue) { + struct task t_barrier; - while (!TAILQ_EMPTY(&queue->tq_active)) - TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex, - PWAIT, "-", 0); + if (STAILQ_EMPTY(&queue->tq_queue)) + return; + + /* + * Enqueue our barrier with the lowest possible priority + * so we are inserted after all current tasks. + */ + TASK_INIT(&t_barrier, 0, taskqueue_task_nop_fn, &t_barrier); + taskqueue_enqueue_locked(queue, &t_barrier); + + /* + * Raise the barrier's priority so newly queued tasks cannot + * pass it. + */ + t_barrier.ta_priority = USHRT_MAX; + + /* + * Once the barrier has executed, all previously queued tasks + * have completed or are currently executing. + */ + while (t_barrier.ta_pending != 0) + TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); +} + +/* + * Block until all currently executing tasks for this taskqueue + * complete. Tasks that begin execution during the execution + * of this function are ignored. + */ +static void +taskqueue_drain_tq_active(struct taskqueue *queue) +{ + struct taskqueue_busy tb_marker, *tb_first; + + if (TAILQ_EMPTY(&queue->tq_active)) + return; + + /* Block taskqueue_terminate(). */ + queue->tq_callouts++; + + /* + * Wait for all currently executing taskqueue threads + * to go idle. + */ + tb_marker.tb_running = TB_DRAIN_WAITER; + TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); + while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) + TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); + TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); + + /* + * Wake up any other drain waiter that happened to queue up + * without any intervening active thread. + */ + tb_first = TAILQ_FIRST(&queue->tq_active); + if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) + wakeup(tb_first); + + /* Release taskqueue_terminate(). 
*/ + queue->tq_callouts--; + if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) + wakeup_one(queue->tq_threads); } void @@ -334,14 +407,16 @@ taskqueue_run_locked(struct taskqueue *queue) { struct taskqueue_busy tb; + struct taskqueue_busy *tb_first; struct task *task; int pending; TQ_ASSERT_LOCKED(queue); tb.tb_running = NULL; - TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); while (STAILQ_FIRST(&queue->tq_queue)) { + TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); + /* * Carefully remove the first task from the queue and * zero its pending count. @@ -358,10 +433,13 @@ TQ_LOCK(queue); tb.tb_running = NULL; wakeup(task); + + TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); + tb_first = TAILQ_FIRST(&queue->tq_active); + if (tb_first != NULL && + tb_first->tb_running == TB_DRAIN_WAITER) + wakeup(tb_first); } - TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); - if (TAILQ_EMPTY(&queue->tq_active)) - wakeup(&queue->tq_active); } void @@ -448,19 +526,13 @@ void taskqueue_drain_all(struct taskqueue *queue) { - struct task *task; if (!queue->tq_spin) WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); TQ_LOCK(queue); - task = STAILQ_LAST(&queue->tq_queue, task, ta_link); - if (task != NULL) - while (task->ta_pending != 0) - TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); - taskqueue_drain_running(queue); - KASSERT(STAILQ_EMPTY(&queue->tq_queue), - ("taskqueue queue is not empty after draining")); + taskqueue_drain_tq_queue(queue); + taskqueue_drain_tq_active(queue); TQ_UNLOCK(queue); }