head/sys/kern/subr_taskqueue.c
/* ... first 255 lines elided; the listing resumes at the tail of taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) ... */
        if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0)
                TQ_UNLOCK(queue);
        /* Return with lock released. */
        return (0);
}

/*
 * Enqueue a group task.  A task that is already pending is left where
 * it is, so repeated enqueues of the same task coalesce into a single
 * execution.
 */
int
grouptaskqueue_enqueue(struct taskqueue *queue, struct task *task)
{

        TQ_LOCK(queue);
        if (task->ta_pending) {
                TQ_UNLOCK(queue);
                return (0);
        }
        STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
        task->ta_pending = 1;
        TQ_UNLOCK(queue);
        if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0)
                queue->tq_enqueue(queue->tq_context);
        return (0);
}
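
For illustration, a hedged sketch of how an interrupt path might rely on the coalescing above (not part of this change; it assumes the companion header change embeds the grouptask's struct task as gt_task, which the gt_* fields used later in this file suggest):

static void
my_rx_kick(struct grouptask *gtask)
{

        /* Duplicate enqueues while the task is pending collapse into one run. */
        grouptaskqueue_enqueue(gtask->gt_taskqueue, &gtask->gt_task);
        grouptaskqueue_enqueue(gtask->gt_taskqueue, &gtask->gt_task);
}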

int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
        int res;

        TQ_LOCK(queue);
        res = taskqueue_enqueue_locked(queue, task);
        /* The lock is released inside. */
/* ... 512 lines elided; the listing resumes inside taskqueue_member() ... */

        for (i = 0, j = 0; ; i++) {
                if (queue->tq_threads[i] == td) {
                        ret = 1;
                        break;
                }
                if (++j >= queue->tq_tcount)
                        break;
        }
        return (ret);
}

struct taskqgroup_cpu {
        LIST_HEAD(, grouptask)  tgc_tasks;      /* tasks serviced here */
        struct taskqueue        *tgc_taskq;     /* backing taskqueue */
        int                     tgc_cnt;        /* attached task count */
        int                     tgc_cpu;        /* CPU bound to this queue */
};

struct taskqgroup {
        struct taskqgroup_cpu   tqg_queue[MAXCPU];
        struct mtx              tqg_lock;
        char                    *tqg_name;
        int                     tqg_adjusting;
        int                     tqg_stride;
        int                     tqg_cnt;
};

struct taskq_bind_task {
        struct task     bt_task;
        int             bt_cpuid;
};

static void
taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
{
        struct taskqgroup_cpu *qcpu;
        int i, j;

        qcpu = &qgroup->tqg_queue[idx];
        LIST_INIT(&qcpu->tgc_tasks);
        qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK,
            taskqueue_thread_enqueue, &qcpu->tgc_taskq);
        taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
            "%s_%d", qgroup->tqg_name, idx);
        for (i = CPU_FIRST(), j = 0; j < idx * qgroup->tqg_stride;
            j++, i = CPU_NEXT(i)) {
                /*
                 * Walk to the (idx * tqg_stride)'th CPU, wrapping
                 * around the set of active CPUs as needed.
                 */
        }
        qcpu->tgc_cpu = i;
}
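
To make the stride walk above concrete, here is a small standalone analogue (not kernel code; CPU_FIRST()/CPU_NEXT() are modeled as a simple modular increment over a dense set of CPUs, which ignores sparse CPU numbering):

#include <stdio.h>

/* Model CPU_NEXT() over a dense set of ncpus CPUs: wrap to 0. */
static int
cpu_next(int cpu, int ncpus)
{

        return ((cpu + 1) % ncpus);
}

int
main(void)
{
        int ncpus = 8, stride = 3;

        /* Queues 0..7 land on CPUs 0, 3, 6, 1, 4, 7, 2, 5. */
        for (int idx = 0; idx < ncpus; idx++) {
                int cpu = 0;

                for (int j = 0; j < idx * stride; j++)
                        cpu = cpu_next(cpu, ncpus);
                printf("queue %d -> cpu %d\n", idx, cpu);
        }
        return (0);
}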

static void
taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
{

        taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
}

/*
 * Find the taskq with the fewest tasks that does not already service
 * this uniq identifier.
 */
static int
taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
{
        struct grouptask *n;
        int i, idx, mincnt;
        int strict;

        mtx_assert(&qgroup->tqg_lock, MA_OWNED);
        if (qgroup->tqg_cnt == 0)
                return (0);
        idx = -1;
        mincnt = INT_MAX;
        /*
         * Two passes: first scan for the queue with the fewest tasks
         * that does not already service this uniq id.  If that fails,
         * simply find the queue with the fewest total tasks.
         */
        for (strict = 1; mincnt == INT_MAX; strict = 0) {
                for (i = 0; i < qgroup->tqg_cnt; i++) {
                        if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
                                continue;
                        if (strict) {
                                LIST_FOREACH(n,
                                    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
                                        if (n->gt_uniq == uniq)
                                                break;
                                if (n != NULL)
                                        continue;
                        }
                        mincnt = qgroup->tqg_queue[i].tgc_cnt;
                        idx = i;
                }
        }
        if (idx == -1)
                panic("taskqgroup_find: Failed to pick a qid.");
        return (idx);
}
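
The two-pass selection can be sketched standalone (hypothetical arrays stand in for tqg_queue, and the uniq check is reduced to a per-queue flag):

#include <limits.h>
#include <stdio.h>

/*
 * Pick the least-loaded queue, preferring one that does not already
 * service the caller's uniq id, mirroring taskqgroup_find().
 */
static int
pick_queue(const int *cnt, const int *has_uniq, int nqueues)
{
        int idx = -1, mincnt = INT_MAX;

        for (int strict = 1; mincnt == INT_MAX; strict = 0)
                for (int i = 0; i < nqueues; i++) {
                        if (cnt[i] > mincnt)
                                continue;
                        if (strict && has_uniq[i])
                                continue;
                        mincnt = cnt[i];
                        idx = i;
                }
        return (idx);
}

int
main(void)
{
        int cnt[] = { 2, 1, 3 };
        int has_uniq[] = { 0, 1, 0 };   /* queue 1 already has this uniq */

        /* Queue 1 is least loaded, but the strict pass skips it: 0 wins. */
        printf("picked queue %d\n", pick_queue(cnt, has_uniq, 3));
        return (0);
}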

void
taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int irq, char *name)
{
        cpuset_t mask;
        int qid;

        gtask->gt_uniq = uniq;
        gtask->gt_name = name;
        gtask->gt_irq = irq;
        gtask->gt_cpu = -1;
        mtx_lock(&qgroup->tqg_lock);
        qid = taskqgroup_find(qgroup, uniq);
        qgroup->tqg_queue[qid].tgc_cnt++;
        LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
        gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        if (irq != -1 && smp_started) {
                CPU_ZERO(&mask);
                CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
                mtx_unlock(&qgroup->tqg_lock);
                intr_setaffinity(irq, &mask);
        } else
                mtx_unlock(&qgroup->tqg_lock);
}
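
A hedged sketch of a typical attach (hypothetical driver code: GROUPTASK_INIT() is assumed to come from the companion taskqueue.h change, and qgroup_net, struct my_softc, and my_rx_task are made up for the example):

static struct taskqgroup *qgroup_net;

static void my_rx_task(void *arg, int pending);

static void
my_attach(struct my_softc *sc, int irq)
{

        GROUPTASK_INIT(&sc->sc_rx_gtask, 0, my_rx_task, sc);
        /*
         * Passing sc as uniq spreads this softc's tasks across queues;
         * the IRQ is steered to the chosen queue's CPU.
         */
        taskqgroup_attach(qgroup_net, &sc->sc_rx_gtask, sc, irq, "my rx");
}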

int
taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
    void *uniq, int cpu, int irq, char *name)
{
        cpuset_t mask;
        int i, qid;

        qid = -1;
        gtask->gt_uniq = uniq;
        gtask->gt_name = name;
        gtask->gt_irq = irq;
        gtask->gt_cpu = cpu;
        mtx_lock(&qgroup->tqg_lock);
        if (smp_started) {
                for (i = 0; i < qgroup->tqg_cnt; i++)
                        if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
                                qid = i;
                                break;
                        }
                if (qid == -1) {
                        mtx_unlock(&qgroup->tqg_lock);
                        return (EINVAL);
                }
        } else
                qid = 0;
        qgroup->tqg_queue[qid].tgc_cnt++;
        LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
        gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        if (irq != -1 && smp_started) {
                CPU_ZERO(&mask);
                CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
                mtx_unlock(&qgroup->tqg_lock);
                intr_setaffinity(irq, &mask);
        } else
                mtx_unlock(&qgroup->tqg_lock);
        return (0);
}

void
taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
{
        int i;

        mtx_lock(&qgroup->tqg_lock);
        for (i = 0; i < qgroup->tqg_cnt; i++)
                if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
                        break;
        if (i == qgroup->tqg_cnt)
                panic("taskqgroup_detach: task not in group");
        qgroup->tqg_queue[i].tgc_cnt--;
        LIST_REMOVE(gtask, gt_list);
        mtx_unlock(&qgroup->tqg_lock);
        gtask->gt_taskqueue = NULL;
}

static void
taskqgroup_binder(void *ctx, int pending)
{
        struct taskq_bind_task *task = (struct taskq_bind_task *)ctx;
        cpuset_t mask;
        int error;

        CPU_ZERO(&mask);
        CPU_SET(task->bt_cpuid, &mask);
        error = cpuset_setthread(curthread->td_tid, &mask);
        thread_lock(curthread);
        sched_bind(curthread, task->bt_cpuid);
        thread_unlock(curthread);

        if (error)
                printf("taskqgroup_binder: setaffinity failed: %d\n",
                    error);
        free(task, M_DEVBUF);
}

static void
taskqgroup_bind(struct taskqgroup *qgroup)
{
        struct taskq_bind_task *task;
        int i;

        /*
         * Bind taskqueue threads to specific CPUs, if they have been
         * assigned one.
         */
        for (i = 0; i < qgroup->tqg_cnt; i++) {
                task = malloc(sizeof(*task), M_DEVFUF, M_NOWAIT);
                if (task == NULL)
                        continue;       /* M_NOWAIT can fail; skip this queue */
                TASK_INIT(&task->bt_task, 0, taskqgroup_binder, task);
                task->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
                taskqueue_enqueue(qgroup->tqg_queue[i].tgc_taskq,
                    &task->bt_task);
        }
}
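
The binder idiom above, where each worker pins itself by running a bind task on its own queue, has a familiar userspace analogue. A minimal pthread sketch (Linux/glibc pthread_setaffinity_np(), used purely for illustration):

#define _GNU_SOURCE
#include <pthread.h>
#include <sched.h>
#include <stdio.h>

/* Runs on the worker thread itself, like taskqgroup_binder(). */
static void
bind_self(int cpuid)
{
        cpu_set_t mask;

        CPU_ZERO(&mask);
        CPU_SET(cpuid, &mask);
        if (pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask) != 0)
                fprintf(stderr, "bind_self: setaffinity failed\n");
}

As in the kernel version, the affinity call executes in the target thread's own context, so no cross-thread handshake is needed.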

static int
_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
        LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
        cpuset_t mask;
        struct grouptask *gtask;
        int i, k, old_cnt, qid, cpu;

        mtx_assert(&qgroup->tqg_lock, MA_OWNED);

        if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
                printf("taskqgroup_adjust failed cnt: %d stride: %d "
                    "mp_ncpus: %d smp_started: %d\n", cnt, stride, mp_ncpus,
                    smp_started);
                return (EINVAL);
        }
        if (qgroup->tqg_adjusting) {
                printf("taskqgroup_adjust failed: adjusting\n");
                return (EBUSY);
        }
        qgroup->tqg_adjusting = 1;
        old_cnt = qgroup->tqg_cnt;
        mtx_unlock(&qgroup->tqg_lock);
        /*
         * Set up queue for tasks added before boot.
         */
        if (old_cnt == 0) {
                LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
                    grouptask, gt_list);
                qgroup->tqg_queue[0].tgc_cnt = 0;
        }
        /*
         * If new taskq threads have been added, create their queues.
         */
        for (i = old_cnt; i < cnt; i++)
                taskqgroup_cpu_create(qgroup, i);
        mtx_lock(&qgroup->tqg_lock);
        qgroup->tqg_cnt = cnt;
        qgroup->tqg_stride = stride;

        /*
         * Adjust drivers to use new taskqs.
         */
        for (i = 0; i < old_cnt; i++) {
                while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
                        LIST_REMOVE(gtask, gt_list);
                        qgroup->tqg_queue[i].tgc_cnt--;
                        LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
                }
        }

        while ((gtask = LIST_FIRST(&gtask_head))) {
                LIST_REMOVE(gtask, gt_list);
                if (gtask->gt_cpu == -1)
                        qid = taskqgroup_find(qgroup, gtask->gt_uniq);
                else {
                        qid = -1;
                        for (i = 0; i < qgroup->tqg_cnt; i++)
                                if (qgroup->tqg_queue[i].tgc_cpu ==
                                    gtask->gt_cpu) {
                                        qid = i;
                                        break;
                                }
                        /*
                         * The requested CPU may no longer have a queue
                         * after a shrink; fall back to the best fit.
                         */
                        if (qid == -1)
                                qid = taskqgroup_find(qgroup, gtask->gt_uniq);
                }
                qgroup->tqg_queue[qid].tgc_cnt++;
                LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
                    gt_list);
                gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
        }
        /*
         * Set new CPU and IRQ affinity.
         */
        cpu = CPU_FIRST();
        for (i = 0; i < cnt; i++) {
                qgroup->tqg_queue[i].tgc_cpu = cpu;
                for (k = 0; k < qgroup->tqg_stride; k++)
                        cpu = CPU_NEXT(cpu);
                CPU_ZERO(&mask);
                CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
                LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
                        if (gtask->gt_irq == -1)
                                continue;
                        intr_setaffinity(gtask->gt_irq, &mask);
                }
        }
        mtx_unlock(&qgroup->tqg_lock);

        /*
         * If the taskq thread count has been reduced, free the extra queues.
         */
        for (i = cnt; i < old_cnt; i++)
                taskqgroup_cpu_remove(qgroup, i);

        mtx_lock(&qgroup->tqg_lock);
        qgroup->tqg_adjusting = 0;

        taskqgroup_bind(qgroup);

        return (0);
}

int
taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
{
        int error;

        mtx_lock(&qgroup->tqg_lock);
        error = _taskqgroup_adjust(qgroup, cnt, stride);
        mtx_unlock(&qgroup->tqg_lock);

        return (error);
}
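
A hypothetical call site, once SMP is up, sizes the group to one queue per CPU (qgroup_net as in the attach sketch above):

        int error;

        error = taskqgroup_adjust(qgroup_net, mp_ncpus, 1);     /* stride 1 */
        if (error != 0)
                printf("qgroup_net adjust failed: %d\n", error);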

struct taskqgroup *
taskqgroup_create(char *name)
{
        struct taskqgroup *qgroup;

        qgroup = malloc(sizeof(*qgroup), M_TASKQUEUE, M_WAITOK | M_ZERO);
        mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
        qgroup->tqg_name = name;
        LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);

        return (qgroup);
}

void
taskqgroup_destroy(struct taskqgroup *qgroup)
{

        /* Not yet implemented. */
}