Index: sys/kern/subr_taskqueue.c
===================================================================
--- sys/kern/subr_taskqueue.c
+++ sys/kern/subr_taskqueue.c
@@ -34,12 +34,14 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -62,6 +64,7 @@
 	STAILQ_HEAD(, task)	tq_queue;
 	taskqueue_enqueue_fn	tq_enqueue;
 	void			*tq_context;
+	char			*tq_name;
 	TAILQ_HEAD(, taskqueue_busy) tq_active;
 	struct mtx		tq_mutex;
 	struct thread		**tq_threads;
@@ -119,11 +122,17 @@
 }
 
 static struct taskqueue *
-_taskqueue_create(const char *name __unused, int mflags,
+_taskqueue_create(const char *name, int mflags,
 		 taskqueue_enqueue_fn enqueue, void *context,
-		 int mtxflags, const char *mtxname)
+		 int mtxflags, const char *mtxname __unused)
 {
 	struct taskqueue *queue;
+	char *tq_name = NULL;
+
+	if (name != NULL)
+		tq_name = strndup(name, 32, M_TASKQUEUE);
+	if (tq_name == NULL)
+		tq_name = strndup("taskqueue", 32, M_TASKQUEUE);
 
 	queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO);
 	if (!queue)
@@ -133,6 +142,7 @@
 	TAILQ_INIT(&queue->tq_active);
 	queue->tq_enqueue = enqueue;
 	queue->tq_context = context;
+	queue->tq_name = tq_name;
 	queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
 	queue->tq_flags |= TQ_FLAGS_ACTIVE;
 	if (enqueue == taskqueue_fast_enqueue ||
@@ -140,7 +150,7 @@
 	    enqueue == taskqueue_swi_giant_enqueue ||
 	    enqueue == taskqueue_thread_enqueue)
 		queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE;
-	mtx_init(&queue->tq_mutex, mtxname, NULL, mtxflags);
+	mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
 
 	return queue;
 }
@@ -149,8 +159,9 @@
 taskqueue_create(const char *name, int mflags,
 		 taskqueue_enqueue_fn enqueue, void *context)
 {
+
 	return _taskqueue_create(name, mflags, enqueue, context,
-			MTX_DEF, "taskqueue");
+			MTX_DEF, name);
 }
 
 void
@@ -194,6 +205,7 @@
 	KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks"));
 	mtx_destroy(&queue->tq_mutex);
 	free(queue->tq_threads, M_TASKQUEUE);
+	free(queue->tq_name, M_TASKQUEUE);
 	free(queue, M_TASKQUEUE);
 }
 
@@ -203,6 +215,7 @@
 	struct task *ins;
 	struct task *prev;
 
+	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
 	/*
 	 * Count multiple enqueues.
 	 */
@@ -410,6 +423,7 @@
 	struct task *task;
 	int pending;
 
+	KASSERT(queue != NULL, ("tq is NULL"));
 	TQ_ASSERT_LOCKED(queue);
 	tb.tb_running = NULL;
 
@@ -421,12 +435,14 @@
 		 * zero its pending count.
 		 */
 		task = STAILQ_FIRST(&queue->tq_queue);
+		KASSERT(task != NULL, ("task is NULL"));
 		STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
 		pending = task->ta_pending;
 		task->ta_pending = 0;
 		tb.tb_running = task;
 		TQ_UNLOCK(queue);
 
+		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
 		task->ta_func(task->ta_context, pending);
 
 		TQ_LOCK(queue);
@@ -680,6 +696,7 @@
 	taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT);
 	TQ_LOCK(tq);
 	while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
+		/* XXX ? */
 		taskqueue_run_locked(tq);
 		/*
 		 * Because taskqueue_run() can drop tq_mutex, we need to
@@ -779,3 +796,317 @@
 	}
 	return (ret);
 }
+
+struct taskqgroup_cpu {
+	LIST_HEAD(, grouptask)	tgc_tasks;
+	struct taskqueue	*tgc_taskq;
+	int			tgc_cnt;
+	int			tgc_cpu;
+};
+
+struct taskqgroup {
+	struct taskqgroup_cpu	tqg_queue[MAXCPU];
+	struct mtx		tqg_lock;
+	char			*tqg_name;
+	int			tqg_adjusting;
+	int			tqg_stride;
+	int			tqg_cnt;
+};
+
+struct taskq_bind_task {
+	struct task	bt_task;
+	int		bt_cpuid;
+};
+
+static void
+taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx)
+{
+	struct taskqgroup_cpu *qcpu;
+
+	qcpu = &qgroup->tqg_queue[idx];
+	LIST_INIT(&qcpu->tgc_tasks);
+	qcpu->tgc_taskq = taskqueue_create_fast(NULL, M_WAITOK,
+	    taskqueue_thread_enqueue, &qcpu->tgc_taskq);
+	taskqueue_start_threads(&qcpu->tgc_taskq, 1, PI_SOFT,
+	    "%s_%d", qgroup->tqg_name, idx);
+	qcpu->tgc_cpu = idx * qgroup->tqg_stride;
+}
+
+static void
+taskqgroup_cpu_remove(struct taskqgroup *qgroup, int idx)
+{
+
+	taskqueue_free(qgroup->tqg_queue[idx].tgc_taskq);
+}
+
+/*
+ * Find the taskq with the fewest tasks that does not already service
+ * this uniq identifier.
+ */
+static int
+taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
+{
+	struct grouptask *n;
+	int i, idx, mincnt;
+	int strict;
+
+	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
+	if (qgroup->tqg_cnt == 0)
+		return (0);
+	idx = -1;
+	mincnt = INT_MAX;
+	/*
+	 * Two passes: first scan for a queue with the fewest tasks that
+	 * does not already service this uniq id.  If that fails, simply
+	 * find the queue with the fewest total tasks.
+	 */
+	for (strict = 1; mincnt == INT_MAX; strict = 0) {
+		for (i = 0; i < qgroup->tqg_cnt; i++) {
+			if (qgroup->tqg_queue[i].tgc_cnt > mincnt)
+				continue;
+			if (strict) {
+				LIST_FOREACH(n,
+				    &qgroup->tqg_queue[i].tgc_tasks, gt_list)
+					if (n->gt_uniq == uniq)
+						break;
+				if (n != NULL)
+					continue;
+			}
+			mincnt = qgroup->tqg_queue[i].tgc_cnt;
+			idx = i;
+		}
+	}
+	if (idx == -1)
+		panic("taskqgroup_find: Failed to pick a qid.");
+
+	return (idx);
+}
+
+void
+taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
+    void *uniq, int irq, char *name)
+{
+	cpuset_t mask;
+	int qid;
+
+	gtask->gt_uniq = uniq;
+	gtask->gt_name = name;
+	gtask->gt_irq = irq;
+	mtx_lock(&qgroup->tqg_lock);
+	qid = taskqgroup_find(qgroup, uniq);
+	qgroup->tqg_queue[qid].tgc_cnt++;
+	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
+	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
+	if (irq != -1 && smp_started) {
+		CPU_ZERO(&mask);
+		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
+		mtx_unlock(&qgroup->tqg_lock);
+		intr_setaffinity(irq, &mask);
+	} else
+		mtx_unlock(&qgroup->tqg_lock);
+}
+
+int
+taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
+    void *uniq, int cpu, int irq, char *name)
+{
+	cpuset_t mask;
+	int i, qid;
+
+	qid = -1;
+	gtask->gt_uniq = uniq;
+	gtask->gt_name = name;
+	gtask->gt_irq = irq;
+	mtx_lock(&qgroup->tqg_lock);
+	for (i = 0; i < qgroup->tqg_cnt; i++)
+		if (qgroup->tqg_queue[i].tgc_cpu == cpu) {
+			qid = i;
+			break;
+		}
+	if (qid == -1) {
+		mtx_unlock(&qgroup->tqg_lock);
+		return (EINVAL);
+	}
+	qgroup->tqg_queue[qid].tgc_cnt++;
+	LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask, gt_list);
+	gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
+	if (irq != -1 && smp_started) {
+		CPU_ZERO(&mask);
+		CPU_SET(qgroup->tqg_queue[qid].tgc_cpu, &mask);
+		mtx_unlock(&qgroup->tqg_lock);
+		intr_setaffinity(irq, &mask);
+	} else
+		mtx_unlock(&qgroup->tqg_lock);
+	return (0);
+}
+
+void
+taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
+{
+	int i;
+
+	mtx_lock(&qgroup->tqg_lock);
+	for (i = 0; i < qgroup->tqg_cnt; i++)
+		if (qgroup->tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
+			break;
+	if (i == qgroup->tqg_cnt)
+		panic("taskqgroup_detach: task not in group\n");
+	qgroup->tqg_queue[i].tgc_cnt--;
+	LIST_REMOVE(gtask, gt_list);
+	mtx_unlock(&qgroup->tqg_lock);
+	gtask->gt_taskqueue = NULL;
+}
+
+static void
+taskqgroup_binder(void *ctx, int pending)
+{
+	struct taskq_bind_task *task = (struct taskq_bind_task *)ctx;
+	cpuset_t mask;
+	int error;
+
+	CPU_ZERO(&mask);
+	CPU_SET(task->bt_cpuid, &mask);
+	error = cpuset_setthread(curthread->td_tid, &mask);
+	if (error)
+		printf("taskqgroup_binder: setaffinity failed: %d\n",
+		    error);
+	free(task, M_DEVBUF);
+}
+
+static void
+taskqgroup_bind(struct taskqgroup *qgroup)
+{
+	struct taskq_bind_task *task;
+	int i;
+
+	/*
+	 * Bind taskqueue threads to specific CPUs, if they have been assigned
+	 * one.
+	 */
+	for (i = 0; i < qgroup->tqg_cnt; i++) {
+		task = malloc(sizeof(*task), M_DEVBUF, M_NOWAIT);
+		if (task == NULL)
+			continue;
+		TASK_INIT(&task->bt_task, 0, taskqgroup_binder, task);
+		task->bt_cpuid = qgroup->tqg_queue[i].tgc_cpu;
+		taskqueue_enqueue_fast(qgroup->tqg_queue[i].tgc_taskq,
+		    &task->bt_task);
+	}
+}
+
+static int
+_taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
+{
+	LIST_HEAD(, grouptask) gtask_head = LIST_HEAD_INITIALIZER(NULL);
+	cpuset_t mask;
+	struct grouptask *gtask;
+	int i, old_cnt, qid;
+
+	mtx_assert(&qgroup->tqg_lock, MA_OWNED);
+
+	if (cnt < 1 || cnt * stride > mp_ncpus || !smp_started) {
+		printf("taskqgroup_adjust failed cnt: %d stride: %d mp_ncpus: %d smp_started: %d\n",
+		    cnt, stride, mp_ncpus, smp_started);
+		return (EINVAL);
+	}
+	if (qgroup->tqg_adjusting) {
+		printf("taskqgroup_adjust failed: adjusting\n");
+		return (EBUSY);
+	}
+	qgroup->tqg_adjusting = 1;
+	old_cnt = qgroup->tqg_cnt;
+	mtx_unlock(&qgroup->tqg_lock);
+	/*
+	 * Set up queue for tasks added before boot.
+	 */
+	if (old_cnt == 0) {
+		LIST_SWAP(&gtask_head, &qgroup->tqg_queue[0].tgc_tasks,
+		    grouptask, gt_list);
+		qgroup->tqg_queue[0].tgc_cnt = 0;
+	}
+
+	/*
+	 * If new taskq threads have been added, create them.
+	 */
+	for (i = old_cnt; i < cnt; i++)
+		taskqgroup_cpu_create(qgroup, i);
+	mtx_lock(&qgroup->tqg_lock);
+	qgroup->tqg_cnt = cnt;
+	qgroup->tqg_stride = stride;
+
+	/*
+	 * Adjust drivers to use new taskqs.
+	 */
+	for (i = 0; i < old_cnt; i++) {
+		while ((gtask = LIST_FIRST(&qgroup->tqg_queue[i].tgc_tasks))) {
+			LIST_REMOVE(gtask, gt_list);
+			qgroup->tqg_queue[i].tgc_cnt--;
+			LIST_INSERT_HEAD(&gtask_head, gtask, gt_list);
+		}
+	}
+
+	while ((gtask = LIST_FIRST(&gtask_head))) {
+		LIST_REMOVE(gtask, gt_list);
+		qid = taskqgroup_find(qgroup, gtask->gt_uniq);
+		qgroup->tqg_queue[qid].tgc_cnt++;
+		LIST_INSERT_HEAD(&qgroup->tqg_queue[qid].tgc_tasks, gtask,
+		    gt_list);
+		gtask->gt_taskqueue = qgroup->tqg_queue[qid].tgc_taskq;
+	}
+	/*
+	 * Set new CPU and IRQ affinity.
+	 */
+	for (i = 0; i < cnt; i++) {
+		qgroup->tqg_queue[i].tgc_cpu = i * qgroup->tqg_stride;
+		CPU_ZERO(&mask);
+		CPU_SET(qgroup->tqg_queue[i].tgc_cpu, &mask);
+		LIST_FOREACH(gtask, &qgroup->tqg_queue[i].tgc_tasks, gt_list) {
+			if (gtask->gt_irq == -1)
+				continue;
+			intr_setaffinity(gtask->gt_irq, &mask);
+		}
+	}
+	mtx_unlock(&qgroup->tqg_lock);
+
+	/*
+	 * If the taskq thread count has been reduced, free the excess queues.
+	 */
+	for (i = cnt; i < old_cnt; i++)
+		taskqgroup_cpu_remove(qgroup, i);
+
+	mtx_lock(&qgroup->tqg_lock);
+	qgroup->tqg_adjusting = 0;
+
+	taskqgroup_bind(qgroup);
+
+	return (0);
+}
+
+int
+taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride)
+{
+	int error;
+
+	mtx_lock(&qgroup->tqg_lock);
+	error = _taskqgroup_adjust(qgroup, cnt, stride);
+	mtx_unlock(&qgroup->tqg_lock);
+
+	return (error);
+}
+
+struct taskqgroup *
+taskqgroup_create(char *name)
+{
+	struct taskqgroup *qgroup;
+
+	qgroup = malloc(sizeof(*qgroup), M_TASKQUEUE, M_WAITOK | M_ZERO);
+	mtx_init(&qgroup->tqg_lock, "taskqgroup", NULL, MTX_DEF);
+	qgroup->tqg_name = name;
+	LIST_INIT(&qgroup->tqg_queue[0].tgc_tasks);
+
+	return (qgroup);
+}
+
+void
+taskqgroup_destroy(struct taskqgroup *qgroup)
+{
+
+}
Index: sys/sys/_task.h
===================================================================
--- sys/sys/_task.h
+++ sys/sys/_task.h
@@ -51,4 +51,13 @@
 	void	*ta_context;	/* (c) argument for handler */
 };
 
+struct grouptask {
+	struct task		gt_task;
+	void			*gt_taskqueue;
+	LIST_ENTRY(grouptask)	gt_list;
+	void			*gt_uniq;
+	char			*gt_name;
+	int			gt_irq;
+};
+
 #endif /* !_SYS__TASK_H_ */
Index: sys/sys/taskqueue.h
===================================================================
--- sys/sys/taskqueue.h
+++ sys/sys/taskqueue.h
@@ -39,6 +39,7 @@
 #include
 
 struct taskqueue;
+struct taskqgroup;
 struct thread;
 
 struct timeout_task {
@@ -143,7 +144,7 @@
 	init;							\
 }								\
 								\
-SYSINIT(taskqueue_##name, SI_SUB_CONFIGURE, SI_ORDER_SECOND,	\
+SYSINIT(taskqueue_##name, SI_SUB_INIT_IF, SI_ORDER_SECOND,	\
     taskqueue_define_##name, NULL);				\
 								\
 struct __hack
@@ -168,7 +169,7 @@
 	init;							\
 }								\
 								\
-SYSINIT(taskqueue_##name, SI_SUB_CONFIGURE, SI_ORDER_SECOND,	\
+SYSINIT(taskqueue_##name, SI_SUB_INIT_IF, SI_ORDER_SECOND,	\
     taskqueue_define_##name, NULL);				\
 								\
 struct __hack
@@ -203,4 +204,52 @@
     taskqueue_enqueue_fn enqueue, void *context);
 
+/*
+ * Taskqueue groups.  A taskqgroup manages a dynamic group of per-CPU
+ * taskqueue threads and IRQ binding for device and other tasks.
+ */
+void	taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask,
+	    void *uniq, int irq, char *name);
+int	taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask,
+	    void *uniq, int cpu, int irq, char *name);
+void	taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask);
+struct taskqgroup *taskqgroup_create(char *name);
+void	taskqgroup_destroy(struct taskqgroup *qgroup);
+int	taskqgroup_adjust(struct taskqgroup *qgroup, int cnt, int stride);
+
+#define	GROUPTASK_INIT(gtask, priority, func, context)	\
+	TASK_INIT(&(gtask)->gt_task, priority, func, context)
+
+#define	GROUPTASK_ENQUEUE(gtask)	\
+	taskqueue_enqueue((gtask)->gt_taskqueue, &(gtask)->gt_task)
+
+#define	TASKQGROUP_DECLARE(name)	\
+extern struct taskqgroup *qgroup_##name
+
+#define	TASKQGROUP_DEFINE(name, cnt, stride)			\
+								\
+struct taskqgroup *qgroup_##name;				\
+								\
+static void							\
+taskqgroup_define_##name(void *arg)				\
+{								\
+	qgroup_##name = taskqgroup_create(#name);		\
+}								\
+								\
+SYSINIT(taskqgroup_##name, SI_SUB_CONFIGURE, SI_ORDER_SECOND,	\
+    taskqgroup_define_##name, NULL);				\
+								\
+static void							\
+taskqgroup_adjust_##name(void *arg)				\
+{								\
+	taskqgroup_adjust(qgroup_##name, (cnt), (stride));	\
+}								\
+								\
+SYSINIT(taskqgroup_adj_##name, SI_SUB_SMP, SI_ORDER_ANY,	\
+    taskqgroup_adjust_##name, NULL);				\
+								\
+struct __hack
+
+TASKQGROUP_DECLARE(net);
+
 #endif /* !_SYS_TASKQUEUE_H_ */
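
For reviewers, a minimal sketch of how a driver might consume the new grouptask KPI follows; it is not part of the diff. The foo_* names, the softc layout, and the "foo rx" label are hypothetical; the uniq argument here is simply the softc pointer, and qgroup_net is the group declared by TASKQGROUP_DECLARE(net) above, whose matching TASKQGROUP_DEFINE() is assumed to live elsewhere in the larger patch series.

/*
 * Hypothetical driver-side usage of the grouptask KPI (illustrative only).
 */
#include <sys/param.h>
#include <sys/queue.h>
#include <sys/_task.h>
#include <sys/taskqueue.h>

struct foo_softc {
	struct grouptask	sc_rx_gtask;	/* deferred RX work */
	int			sc_irq;		/* interrupt number, or -1 */
};

static void
foo_rx_task(void *context, int pending)
{
	struct foo_softc *sc = context;

	/* Drain the RX ring here; runs in a qgroup_net taskqueue thread. */
	(void)sc;
	(void)pending;
}

static int
foo_attach(struct foo_softc *sc)
{

	/* Initialize the embedded task and hand it to the "net" group. */
	GROUPTASK_INIT(&sc->sc_rx_gtask, 0, foo_rx_task, sc);
	taskqgroup_attach(qgroup_net, &sc->sc_rx_gtask, sc, sc->sc_irq,
	    "foo rx");
	return (0);
}

static void
foo_intr(void *arg)
{
	struct foo_softc *sc = arg;

	/* Defer the heavy lifting to the group's per-CPU taskqueue. */
	GROUPTASK_ENQUEUE(&sc->sc_rx_gtask);
}

static void
foo_detach(struct foo_softc *sc)
{

	taskqgroup_detach(qgroup_net, &sc->sc_rx_gtask);
}

Per the patch, taskqgroup_attach() picks the least-loaded per-CPU taskqueue for the given uniq identifier and, when irq is not -1 and SMP is up, pins that IRQ to the chosen CPU, so the deferred work and its interrupt stay on the same core.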