Changeset View
Changeset View
Standalone View
Standalone View
sys/kern/subr_taskqueue.c
Show First 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | |||||
static void taskqueue_swi_enqueue(void *); | static void taskqueue_swi_enqueue(void *); | ||||
static void taskqueue_swi_giant_enqueue(void *); | static void taskqueue_swi_giant_enqueue(void *); | ||||
struct taskqueue_busy { | struct taskqueue_busy { | ||||
struct task *tb_running; | struct task *tb_running; | ||||
TAILQ_ENTRY(taskqueue_busy) tb_link; | TAILQ_ENTRY(taskqueue_busy) tb_link; | ||||
}; | }; | ||||
struct task * const TB_DRAIN_WAITER = (struct task *)0x1; | |||||
struct taskqueue { | struct taskqueue { | ||||
STAILQ_HEAD(, task) tq_queue; | STAILQ_HEAD(, task) tq_queue; | ||||
taskqueue_enqueue_fn tq_enqueue; | taskqueue_enqueue_fn tq_enqueue; | ||||
void *tq_context; | void *tq_context; | ||||
TAILQ_HEAD(, taskqueue_busy) tq_active; | TAILQ_HEAD(, taskqueue_busy) tq_active; | ||||
struct mtx tq_mutex; | struct mtx tq_mutex; | ||||
struct thread **tq_threads; | struct thread **tq_threads; | ||||
int tq_tcount; | int tq_tcount; | ||||
▲ Show 20 Lines • Show All 169 Lines • ▼ Show 20 Lines | taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) | ||||
if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) | if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) | ||||
queue->tq_enqueue(queue->tq_context); | queue->tq_enqueue(queue->tq_context); | ||||
if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) | if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) | ||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
/* Return with lock released. */ | /* Return with lock released. */ | ||||
return (0); | return (0); | ||||
} | } | ||||
int | int | ||||
taskqueue_enqueue(struct taskqueue *queue, struct task *task) | taskqueue_enqueue(struct taskqueue *queue, struct task *task) | ||||
{ | { | ||||
int res; | int res; | ||||
TQ_LOCK(queue); | TQ_LOCK(queue); | ||||
res = taskqueue_enqueue_locked(queue, task); | res = taskqueue_enqueue_locked(queue, task); | ||||
/* The lock is released inside. */ | /* The lock is released inside. */ | ||||
▲ Show 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | if (ticks > 0) { | ||||
taskqueue_timeout_func, timeout_task); | taskqueue_timeout_func, timeout_task); | ||||
} | } | ||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
} | } | ||||
return (res); | return (res); | ||||
} | } | ||||
static void | static void | ||||
taskqueue_drain_running(struct taskqueue *queue) | taskqueue_task_nop_fn(void *context, int pending) | ||||
{ | { | ||||
} | |||||
while (!TAILQ_EMPTY(&queue->tq_active)) | /* | ||||
TQ_SLEEP(queue, &queue->tq_active, &queue->tq_mutex, | * Block until all currently queued tasks in this taskqueue | ||||
PWAIT, "-", 0); | * have begun execution. Tasks queued during execution of | ||||
* this function are ignored. | |||||
*/ | |||||
static void | |||||
taskqueue_drain_tq_queue(struct taskqueue *queue) | |||||
{ | |||||
struct task t_barrier; | |||||
if (STAILQ_EMPTY(&queue->tq_queue)) | |||||
return; | |||||
/* | |||||
* Enqueue our barrier with the lowest possible priority | |||||
* so we are inserted after all current tasks. | |||||
*/ | |||||
TASK_INIT(&t_barrier, 0, taskqueue_task_nop_fn, &t_barrier); | |||||
taskqueue_enqueue_locked(queue, &t_barrier); | |||||
/* | |||||
* Raise the barrier's priority so newly queued tasks cannot | |||||
* pass it. | |||||
*/ | |||||
t_barrier.ta_priority = USHRT_MAX; | |||||
/* | |||||
* Once the barrier has executed, all previously queued tasks | |||||
* have completed or are currently executing. | |||||
*/ | |||||
while (t_barrier.ta_pending != 0) | |||||
TQ_SLEEP(queue, &t_barrier, &queue->tq_mutex, PWAIT, "-", 0); | |||||
} | } | ||||
/* | |||||
* Block until all currently executing tasks for this taskqueue | |||||
* complete. Tasks that begin execution during the execution | |||||
* of this function are ignored. | |||||
*/ | |||||
static void | |||||
taskqueue_drain_tq_active(struct taskqueue *queue) | |||||
{ | |||||
struct taskqueue_busy tb_marker, *tb_first; | |||||
if (TAILQ_EMPTY(&queue->tq_active)) | |||||
return; | |||||
/* Block taskq_terminate().*/ | |||||
queue->tq_callouts++; | |||||
/* | |||||
* Wait for all currently executing taskqueue threads | |||||
* to go idle. | |||||
*/ | |||||
tb_marker.tb_running = TB_DRAIN_WAITER; | |||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb_marker, tb_link); | |||||
while (TAILQ_FIRST(&queue->tq_active) != &tb_marker) | |||||
TQ_SLEEP(queue, &tb_marker, &queue->tq_mutex, PWAIT, "-", 0); | |||||
TAILQ_REMOVE(&queue->tq_active, &tb_marker, tb_link); | |||||
/* | |||||
* Wakeup any other drain waiter that happened to queue up | |||||
* without any intervening active thread. | |||||
jhb: s/interveining/intervening/ | |||||
Not Done Inline ActionsThanks for catching that. Happened was misspelled too. Both fixed. gibbs: Thanks for catching that. Happened was misspelled too. Both fixed. | |||||
*/ | |||||
tb_first = TAILQ_FIRST(&queue->tq_active); | |||||
if (tb_first != NULL && tb_first->tb_running == TB_DRAIN_WAITER) | |||||
wakeup(tb_first); | |||||
/* Release taskqueue_terminate(). */ | |||||
queue->tq_callouts--; | |||||
if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) | |||||
wakeup_one(queue->tq_threads); | |||||
} | |||||
void | void | ||||
taskqueue_block(struct taskqueue *queue) | taskqueue_block(struct taskqueue *queue) | ||||
{ | { | ||||
TQ_LOCK(queue); | TQ_LOCK(queue); | ||||
queue->tq_flags |= TQ_FLAGS_BLOCKED; | queue->tq_flags |= TQ_FLAGS_BLOCKED; | ||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
} | } | ||||
void | void | ||||
taskqueue_unblock(struct taskqueue *queue) | taskqueue_unblock(struct taskqueue *queue) | ||||
{ | { | ||||
TQ_LOCK(queue); | TQ_LOCK(queue); | ||||
queue->tq_flags &= ~TQ_FLAGS_BLOCKED; | queue->tq_flags &= ~TQ_FLAGS_BLOCKED; | ||||
if (!STAILQ_EMPTY(&queue->tq_queue)) | if (!STAILQ_EMPTY(&queue->tq_queue)) | ||||
queue->tq_enqueue(queue->tq_context); | queue->tq_enqueue(queue->tq_context); | ||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
} | } | ||||
static void | static void | ||||
taskqueue_run_locked(struct taskqueue *queue) | taskqueue_run_locked(struct taskqueue *queue) | ||||
{ | { | ||||
struct taskqueue_busy tb; | struct taskqueue_busy tb; | ||||
struct taskqueue_busy *tb_first; | |||||
struct task *task; | struct task *task; | ||||
int pending; | int pending; | ||||
TQ_ASSERT_LOCKED(queue); | TQ_ASSERT_LOCKED(queue); | ||||
tb.tb_running = NULL; | tb.tb_running = NULL; | ||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); | |||||
while (STAILQ_FIRST(&queue->tq_queue)) { | while (STAILQ_FIRST(&queue->tq_queue)) { | ||||
TAILQ_INSERT_TAIL(&queue->tq_active, &tb, tb_link); | |||||
/* | /* | ||||
* Carefully remove the first task from the queue and | * Carefully remove the first task from the queue and | ||||
* zero its pending count. | * zero its pending count. | ||||
*/ | */ | ||||
task = STAILQ_FIRST(&queue->tq_queue); | task = STAILQ_FIRST(&queue->tq_queue); | ||||
STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); | STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); | ||||
pending = task->ta_pending; | pending = task->ta_pending; | ||||
task->ta_pending = 0; | task->ta_pending = 0; | ||||
tb.tb_running = task; | tb.tb_running = task; | ||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
task->ta_func(task->ta_context, pending); | task->ta_func(task->ta_context, pending); | ||||
TQ_LOCK(queue); | TQ_LOCK(queue); | ||||
tb.tb_running = NULL; | tb.tb_running = NULL; | ||||
wakeup(task); | wakeup(task); | ||||
} | |||||
TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); | TAILQ_REMOVE(&queue->tq_active, &tb, tb_link); | ||||
if (TAILQ_EMPTY(&queue->tq_active)) | tb_first = TAILQ_FIRST(&queue->tq_active); | ||||
wakeup(&queue->tq_active); | if (tb_first != NULL && | ||||
tb_first->tb_running == TB_DRAIN_WAITER) | |||||
wakeup(tb_first); | |||||
} | } | ||||
} | |||||
void | void | ||||
taskqueue_run(struct taskqueue *queue) | taskqueue_run(struct taskqueue *queue) | ||||
{ | { | ||||
TQ_LOCK(queue); | TQ_LOCK(queue); | ||||
taskqueue_run_locked(queue); | taskqueue_run_locked(queue); | ||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
▲ Show 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | taskqueue_drain(struct taskqueue *queue, struct task *task) | ||||
while (task->ta_pending != 0 || task_is_running(queue, task)) | while (task->ta_pending != 0 || task_is_running(queue, task)) | ||||
TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); | TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); | ||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
} | } | ||||
void | void | ||||
taskqueue_drain_all(struct taskqueue *queue) | taskqueue_drain_all(struct taskqueue *queue) | ||||
{ | { | ||||
struct task *task; | |||||
if (!queue->tq_spin) | if (!queue->tq_spin) | ||||
WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); | WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); | ||||
TQ_LOCK(queue); | TQ_LOCK(queue); | ||||
task = STAILQ_LAST(&queue->tq_queue, task, ta_link); | taskqueue_drain_tq_queue(queue); | ||||
if (task != NULL) | taskqueue_drain_tq_active(queue); | ||||
while (task->ta_pending != 0) | |||||
TQ_SLEEP(queue, task, &queue->tq_mutex, PWAIT, "-", 0); | |||||
taskqueue_drain_running(queue); | |||||
KASSERT(STAILQ_EMPTY(&queue->tq_queue), | |||||
("taskqueue queue is not empty after draining")); | |||||
TQ_UNLOCK(queue); | TQ_UNLOCK(queue); | ||||
} | } | ||||
void | void | ||||
taskqueue_drain_timeout(struct taskqueue *queue, | taskqueue_drain_timeout(struct taskqueue *queue, | ||||
struct timeout_task *timeout_task) | struct timeout_task *timeout_task) | ||||
{ | { | ||||
▲ Show 20 Lines • Show All 250 Lines • Show Last 20 Lines |
s/interveining/intervening/