Changeset View
Changeset View
Standalone View
Standalone View
sys/kern/subr_taskqueue.c
Context not available. | |||||
TAILQ_ENTRY(taskqueue_busy) tb_link; | TAILQ_ENTRY(taskqueue_busy) tb_link; | ||||
}; | }; | ||||
struct task * const TB_DRAIN_WAITER = (struct task *)0x1; | |||||
struct taskqueue { | struct taskqueue { | ||||
STAILQ_HEAD(, task) tq_queue; | STAILQ_HEAD(, task) tq_queue; | ||||
taskqueue_enqueue_fn tq_enqueue; | taskqueue_enqueue_fn tq_enqueue; | ||||
Context not available. | |||||
/* Return with lock released. */ | /* Return with lock released. */ | ||||
return (0); | return (0); | ||||
} | } | ||||
int | int | ||||
taskqueue_enqueue(struct taskqueue *queue, struct task *task) | taskqueue_enqueue(struct taskqueue *queue, struct task *task) | ||||
{ | { | ||||
Context not available. | |||||
} | } | ||||
static void | static void | ||||
taskqueue_drain_running(struct taskqueue *queue) | taskqueue_task_nop_fn(void *context, int pending) | ||||
{ | { | ||||
} | |||||
while (!TAILQ_EMPTY(&queue->tq_active)) | /* | ||||
TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex, | * Block until all currently queued tasks in this taskqueue | ||||
PWAIT, "-", 0); | * have begun execution. Tasks queued during execution of | ||||
* this function are ignored. | |||||
*/ | |||||
static void | |||||
taskqueue_drain_tq_queue(struct taskqueue *queue) | |||||
{ | |||||
struct task t_barrier; | |||||
if (STAILQ_EMPTY(&queue->tq_queue)) | |||||
return; | |||||
/* | |||||
* Enqueue our barrier with the lowest possible priority | |||||
* so we are inserted after all current tasks. | |||||
*/ | |||||
TASK_INIT(&t_barrier, 0, taskqueue_task_nop_fn, &t_barrier); | |||||
taskqueue_enqueue_locked(queue, &t_barrier); | |||||
/* | |||||
* Raise the barrier's priority so newly queued tasks cannot | |||||
* pass it. | |||||
*/ | |||||
t_barrier.ta_priority = USHRT_MAX; | |||||
/* | |||||
* Once the barrier has executed, all previously queued tasks | |||||
* have completed or are currently executing. | |||||
*/ | |||||
while (t_barrier.ta_pending != 0) | |||||
TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); | |||||
} | } | ||||
/* | |||||
* Block until all currently executing tasks for this taskqueue | |||||
* complete. Tasks that begin execution during the execution | |||||
* of this function are ignored. | |||||
*/ | |||||
static void | |||||
taskqueue_drain_tq_active(struct taskqueue *queue) | |||||
{ | |||||
struct taskqueue_busy tb_marker, *tb_first; | |||||
if (TAILQ_EMPTY(&queue->tq_active)) | |||||
return; | |||||
/* Block taskq_terminate().*/ | |||||
queue->tq_callouts++; | |||||
/* | |||||
* Wait for all currently executing taskqueue threads | |||||
* to go idle. | |||||
*/ | |||||
tb_marker.tb_running = TB_DRAIN_WAITER; | |||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); | |||||
while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) | |||||
TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); | |||||
TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); | |||||
/* | |||||
* Wakeup any other drain waiter that happend to queue up | |||||
* without any interveining active thread. | |||||
jhb: s/interveining/intervening/ | |||||
Not Done Inline ActionsThanks for catching that. Happened was misspelled too. Both fixed. gibbs: Thanks for catching that. Happened was misspelled too. Both fixed. | |||||
*/ | |||||
tb_first = TAILQ_FIRST(&queue->tq_active); | |||||
if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) | |||||
wakeup(tb_first); | |||||
/* Release taskqueue_terminate().*/ | |||||
queue->tq_callouts--; | |||||
if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) | |||||
wakeup_one(queue->tq_threads); | |||||
} | |||||
void | void | ||||
taskqueue_block(struct taskqueue *queue) | taskqueue_block(struct taskqueue *queue) | ||||
{ | { | ||||
Context not available. | |||||
taskqueue_run_locked(struct taskqueue *queue) | taskqueue_run_locked(struct taskqueue *queue) | ||||
{ | { | ||||
struct taskqueue_busy tb; | struct taskqueue_busy tb; | ||||
struct taskqueue_busy *tb_first; | |||||
struct task *task; | struct task *task; | ||||
int pending; | int pending; | ||||
TQ_ASSERT_LOCKED(queue); | TQ_ASSERT_LOCKED(queue); | ||||
tb.tb_running = NULL; | tb.tb_running = NULL; | ||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); | |||||
while (STAILQ_FIRST(&queue->tq_queue)) { | while (STAILQ_FIRST(&queue->tq_queue)) { | ||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); | |||||
/* | /* | ||||
* Carefully remove the first task from the queue and | * Carefully remove the first task from the queue and | ||||
* zero its pending count. | * zero its pending count. | ||||
Context not available. | |||||
TQ_LOCK(queue); | TQ_LOCK(queue); | ||||
tb.tb_running = NULL; | tb.tb_running = NULL; | ||||
wakeup(task); | wakeup(task); | ||||
TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); | |||||
tb_first = TAILQ_FIRST(&queue->tq_active); | |||||
if (tb_first != NULL && | |||||
tb_first->tb_running == TB_DRAIN_WAITER) | |||||
wakeup(tb_first); | |||||
} | } | ||||
TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); | |||||
if (TAILQ_EMPTY(&queue->tq_active)) | |||||
wakeup(&queue->tq_active); | |||||
} | } | ||||
void | void | ||||
Context not available. | |||||
void | void | ||||
taskqueue_drain_all(struct taskqueue *queue) | taskqueue_drain_all(struct taskqueue *queue) | ||||
{ | { | ||||
struct task *task; | |||||
if (!queue->tq_spin) | if (!queue->tq_spin) | ||||
WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); | WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); | ||||
TQ_LOCK(queue); | TQ_LOCK(queue); | ||||
task = STAILQ_LAST(&queue->tq_queue, task, ta_link); | taskqueue_drain_tq_queue(queue); | ||||
if (task != NULL) | taskqueue_drain_tq_active(queue); | ||||
while (task->ta_pending != 0) | |||||
TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); | |||||
taskqueue_drain_running(queue); | |||||
KASSERT(STAILQ_EMPTY(&queue->tq_queue), | |||||
("taskqueue queue is not empty after draining")); | |||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
} | } | ||||
Context not available. |
s/interveining/intervening/