diff --git a/share/man/man9/taskqueue.9 b/share/man/man9/taskqueue.9
--- a/share/man/man9/taskqueue.9
+++ b/share/man/man9/taskqueue.9
@@ -28,7 +28,7 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd September 1, 2021
+.Dd April 25, 2022
 .Dt TASKQUEUE 9
 .Os
 .Sh NAME
@@ -85,6 +85,8 @@
 .Ft int
 .Fn taskqueue_enqueue "struct taskqueue *queue" "struct task *task"
 .Ft int
+.Fn taskqueue_enqueue_flags "struct taskqueue *queue" "struct task *task" "int flags"
+.Ft int
 .Fn taskqueue_enqueue_timeout "struct taskqueue *queue" "struct timeout_task *timeout_task" "int ticks"
 .Ft int
 .Fn taskqueue_enqueue_timeout_sbt "struct taskqueue *queue" "struct timeout_task *timeout_task" "sbintime_t sbt" "sbintime_t pr" "int flags"
@@ -225,6 +227,28 @@
 .Fn taskqueue_enqueue .
 .Pp
 The
+.Fn taskqueue_enqueue_flags
+function accepts an extra
+.Va flags
+parameter, which specifies a set of optional flags that alter the behavior of
+.Fn taskqueue_enqueue .
+It may contain one or more of the following flags:
+.Bl -tag -width TASKQUEUE_FAIL_IF_CANCELING
+.It Dv TASKQUEUE_FAIL_IF_PENDING
+.Fn taskqueue_enqueue_flags
+fails if the task is already scheduled for execution.
+.Er EEXIST
+is returned and the
+.Va ta_pending
+counter value remains unchanged.
+.It Dv TASKQUEUE_FAIL_IF_CANCELING
+.Fn taskqueue_enqueue_flags
+fails if the task is in the canceling state, and
+.Er ECANCELED
+is returned.
+.El
+.Pp
+The
 .Fn taskqueue_enqueue_timeout
 function is used to schedule the enqueue after the specified number of
 .Va ticks .
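As an illustration of the semantics documented above, a minimal consumer could look like the sketch below. The example_* names and the surrounding driver context are hypothetical; only taskqueue_enqueue_flags() and the two flag macros come from this patch.

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/errno.h>
#include <sys/taskqueue.h>

static void
example_task_fn(void *context __unused, int pending __unused)
{
	/* The deferred work runs here, in a taskqueue thread. */
}

/* Assumes TASK_INIT(task, 0, example_task_fn, sc) was done at attach time. */
static int
example_schedule(struct taskqueue *tq, struct task *task)
{
	int error;

	error = taskqueue_enqueue_flags(tq, task,
	    TASKQUEUE_FAIL_IF_PENDING | TASKQUEUE_FAIL_IF_CANCELING);
	switch (error) {
	case 0:		/* newly scheduled */
		break;
	case EEXIST:	/* already pending, ta_pending left unchanged */
		break;
	case ECANCELED:	/* the running instance is being canceled */
		break;
	}
	return (error);
}

With both flags set, the call either schedules the task or fails cleanly: it never bumps ta_pending and never re-schedules a task that a concurrent taskqueue_cancel() caught running.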
diff --git a/sys/compat/linuxkpi/common/include/linux/kthread.h b/sys/compat/linuxkpi/common/include/linux/kthread.h
--- a/sys/compat/linuxkpi/common/include/linux/kthread.h
+++ b/sys/compat/linuxkpi/common/include/linux/kthread.h
@@ -33,8 +33,29 @@
 
 #include <linux/sched.h>
 
-#include <sys/unistd.h>
+#include <sys/param.h>
+#include <sys/kernel.h>
 #include <sys/kthread.h>
+#include <sys/malloc.h>
+#include <sys/queue.h>
+#include <sys/taskqueue.h>
+#include <sys/unistd.h>
+
+struct task_struct;
+struct kthread_work;
+
+typedef void (*kthread_work_func_t)(struct kthread_work *work);
+
+struct kthread_worker {
+	struct task_struct *task;
+	struct taskqueue *tq;
+};
+
+struct kthread_work {
+	struct taskqueue *tq;
+	struct task task;
+	kthread_work_func_t func;
+};
 
 #define kthread_run(fn, data, fmt, ...) ({ \
 	struct task_struct *__task; \
@@ -70,4 +91,78 @@
 
 #define in_atomic() linux_in_atomic()
 
+/* Only kthread_(create|destroy)_worker interface is allowed */
+#define kthread_init_worker(worker) \
+	_Static_assert(false, "pre-4.9 worker interface is not supported");
+
+task_fn_t lkpi_kthread_work_fn;
+task_fn_t lkpi_kthread_worker_init_fn;
+
+#define kthread_create_worker(flags, fmt, ...) ({			\
+	struct kthread_worker *__w;					\
+	struct task __task;						\
+									\
+	__w = malloc(sizeof(*__w), M_KMALLOC, M_WAITOK | M_ZERO);	\
+	__w->tq = taskqueue_create("lkpi kthread taskq", M_WAITOK,	\
+	    taskqueue_thread_enqueue, &__w->tq);			\
+	taskqueue_start_threads(&__w->tq, 1, PWAIT, fmt, ##__VA_ARGS__);\
+	TASK_INIT(&__task, 0, lkpi_kthread_worker_init_fn, __w);	\
+	taskqueue_enqueue(__w->tq, &__task);				\
+	taskqueue_drain(__w->tq, &__task);				\
+	__w;								\
+})
+
+static inline void
+kthread_destroy_worker(struct kthread_worker *worker)
+{
+	taskqueue_drain_all(worker->tq);
+	taskqueue_free(worker->tq);
+	free(worker, M_KMALLOC);
+}
+
+static inline void
+kthread_init_work(struct kthread_work *work, kthread_work_func_t func)
+{
+	work->tq = NULL;
+	work->func = func;
+	TASK_INIT(&work->task, 0, lkpi_kthread_work_fn, work);
+}
+
+static inline bool
+kthread_queue_work(struct kthread_worker *worker, struct kthread_work *work)
+{
+	int error;
+
+	error = taskqueue_enqueue_flags(worker->tq, &work->task,
+	    TASKQUEUE_FAIL_IF_CANCELING | TASKQUEUE_FAIL_IF_PENDING);
+	if (error == 0)
+		work->tq = worker->tq;
+	return (error == 0);
+}
+
+static inline bool
+kthread_cancel_work_sync(struct kthread_work *work)
+{
+	u_int pending = 0;
+
+	if (work->tq != NULL &&
+	    taskqueue_cancel(work->tq, &work->task, &pending) != 0)
+		taskqueue_drain(work->tq, &work->task);
+
+	return (pending != 0);
+}
+
+static inline void
+kthread_flush_work(struct kthread_work *work)
+{
+	if (work->tq != NULL)
+		taskqueue_drain(work->tq, &work->task);
+}
+
+static inline void
+kthread_flush_worker(struct kthread_worker *worker)
+{
+	taskqueue_drain_all(worker->tq);
+}
+
 #endif /* _LINUXKPI_LINUX_KTHREAD_H_ */
diff --git a/sys/compat/linuxkpi/common/src/linux_kthread.c b/sys/compat/linuxkpi/common/src/linux_kthread.c
--- a/sys/compat/linuxkpi/common/src/linux_kthread.c
+++ b/sys/compat/linuxkpi/common/src/linux_kthread.c
@@ -165,3 +165,19 @@
 	}
 	kthread_exit();
 }
+
+void
+lkpi_kthread_work_fn(void *context, int pending __unused)
+{
+	struct kthread_work *work = context;
+
+	work->func(work);
+}
+
+void
+lkpi_kthread_worker_init_fn(void *context, int pending __unused)
+{
+	struct kthread_worker *worker = context;
+
+	worker->task = current;
+}
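The header and the two task functions above implement Linux's post-4.9 kthread_worker interface on top of a single-threaded taskqueue. The sketch below shows how a LinuxKPI consumer would drive it; the my_* names are hypothetical. Note one deliberate difference from Linux: kthread_create_worker() here allocates with M_WAITOK and therefore cannot return an error pointer.

#include <linux/kernel.h>
#include <linux/kthread.h>

struct my_softc {
	struct kthread_work work;
};

static void
my_work_fn(struct kthread_work *work)
{
	struct my_softc *sc = container_of(work, struct my_softc, work);

	/* Runs in the worker's taskqueue thread. */
	(void)sc;
}

static void
my_roundtrip(struct my_softc *sc)
{
	struct kthread_worker *worker;

	worker = kthread_create_worker(0, "my worker");
	kthread_init_work(&sc->work, my_work_fn);

	if (!kthread_queue_work(worker, &sc->work)) {
		/* Already pending or being canceled; nothing was queued. */
	}

	kthread_flush_work(&sc->work);	/* wait for this work item */
	kthread_destroy_worker(worker);	/* drain all work, free the queue */
}

kthread_queue_work() returns false exactly when taskqueue_enqueue_flags() reports EEXIST or ECANCELED, matching Linux's contract of returning false when the work item was already queued.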
diff --git a/sys/kern/subr_taskqueue.c b/sys/kern/subr_taskqueue.c
--- a/sys/kern/subr_taskqueue.c
+++ b/sys/kern/subr_taskqueue.c
@@ -59,6 +59,7 @@
 struct taskqueue_busy {
 	struct task *tb_running;
 	u_int tb_seq;
+	bool tb_canceling;
 	LIST_ENTRY(taskqueue_busy) tb_link;
 };
 
@@ -125,6 +126,19 @@
 	return (msleep(p, &tq->tq_mutex, 0, wm, 0));
 }
 
+static struct taskqueue_busy *
+task_get_busy(struct taskqueue *queue, struct task *task)
+{
+	struct taskqueue_busy *tb;
+
+	TQ_ASSERT_LOCKED(queue);
+	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
+		if (tb->tb_running == task)
+			return (tb);
+	}
+	return (NULL);
+}
+
 static struct taskqueue *
 _taskqueue_create(const char *name, int mflags,
     taskqueue_enqueue_fn enqueue, void *context,
@@ -217,16 +231,32 @@
 }
 
 static int
-taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
+taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags)
 {
 	struct task *ins;
 	struct task *prev;
+	struct taskqueue_busy *tb;
 
 	KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func"));
+
+	/*
+	 * Ignore a canceling task if requested.
+	 */
+	if (__predict_false((flags & TASKQUEUE_FAIL_IF_CANCELING) != 0)) {
+		tb = task_get_busy(queue, task);
+		if (tb != NULL && tb->tb_canceling) {
+			TQ_UNLOCK(queue);
+			return (ECANCELED);
+		}
+	}
+
 	/*
 	 * Count multiple enqueues.
 	 */
 	if (task->ta_pending) {
+		if (__predict_false((flags & TASKQUEUE_FAIL_IF_PENDING) != 0)) {
+			TQ_UNLOCK(queue);
+			return (EEXIST);
+		}
 		if (task->ta_pending < USHRT_MAX)
 			task->ta_pending++;
 		TQ_UNLOCK(queue);
@@ -274,17 +304,23 @@
 }
 
 int
-taskqueue_enqueue(struct taskqueue *queue, struct task *task)
+taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags)
 {
 	int res;
 
 	TQ_LOCK(queue);
-	res = taskqueue_enqueue_locked(queue, task);
+	res = taskqueue_enqueue_locked(queue, task, flags);
 	/* The lock is released inside. */
 
 	return (res);
 }
 
+int
+taskqueue_enqueue(struct taskqueue *queue, struct task *task)
+{
+	return (taskqueue_enqueue_flags(queue, task, 0));
+}
+
 static void
 taskqueue_timeout_func(void *arg)
 {
@@ -296,7 +332,7 @@
 	KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
 	timeout_task->f &= ~DT_CALLOUT_ARMED;
 	queue->tq_callouts--;
-	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
+	taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, 0);
 	/* The lock is released inside. */
 }
 
@@ -316,7 +352,7 @@
 		TQ_UNLOCK(queue);
 		res = -1;
 	} else if (sbt == 0) {
-		taskqueue_enqueue_locked(queue, &timeout_task->t);
+		taskqueue_enqueue_locked(queue, &timeout_task->t, 0);
 		/* The lock is released inside. */
 	} else {
 		if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
@@ -464,6 +500,7 @@
 		task->ta_pending = 0;
 		tb.tb_running = task;
 		tb.tb_seq = ++queue->tq_seq;
+		tb.tb_canceling = false;
 		TQ_UNLOCK(queue);
 
 		KASSERT(task->ta_func != NULL, ("task->ta_func is NULL"));
@@ -493,19 +530,6 @@
 	TQ_UNLOCK(queue);
 }
 
-static int
-task_is_running(struct taskqueue *queue, struct task *task)
-{
-	struct taskqueue_busy *tb;
-
-	TQ_ASSERT_LOCKED(queue);
-	LIST_FOREACH(tb, &queue->tq_active, tb_link) {
-		if (tb->tb_running == task)
-			return (1);
-	}
-	return (0);
-}
-
 /*
  * Only use this function in single threaded contexts. It returns
  * non-zero if the given task is either pending or running. Else the
@@ -517,7 +541,7 @@
 	int retval;
 
 	TQ_LOCK(queue);
-	retval = task->ta_pending > 0 || task_is_running(queue, task);
+	retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL;
 	TQ_UNLOCK(queue);
 
 	return (retval);
@@ -527,6 +551,8 @@
 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
     u_int *pendp)
 {
+	struct taskqueue_busy *tb;
+	int retval = 0;
 
 	if (task->ta_pending > 0) {
 		STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
@@ -536,7 +562,13 @@
 	if (pendp != NULL)
 		*pendp = task->ta_pending;
 	task->ta_pending = 0;
-	return (task_is_running(queue, task) ? EBUSY : 0);
+	tb = task_get_busy(queue, task);
+	if (tb != NULL) {
+		tb->tb_canceling = true;
+		retval = EBUSY;
+	}
+
+	return (retval);
 }
 
 int
@@ -580,7 +612,7 @@
 	WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
 
 	TQ_LOCK(queue);
-	while (task->ta_pending != 0 || task_is_running(queue, task))
+	while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL)
 		TQ_SLEEP(queue, task, "tq_drain");
 	TQ_UNLOCK(queue);
 }
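The core change above is the tb_canceling marker: taskqueue_cancel_locked() now tags the busy entry of a running task, and taskqueue_enqueue_locked() consults it through task_get_busy(), which also replaces the narrower task_is_running(). The effect shows up in the usual cancel-then-drain stop path, sketched below with a hypothetical example_stop(): while the drain is waiting, a racing taskqueue_enqueue_flags() call with TASKQUEUE_FAIL_IF_CANCELING gets ECANCELED instead of silently re-scheduling the task.

static void
example_stop(struct taskqueue *tq, struct task *task)
{
	u_int pending;

	/*
	 * A non-zero return (EBUSY) means an instance is still running;
	 * it is now also marked as canceling, so cooperating enqueuers
	 * back off until the running instance finishes.
	 */
	if (taskqueue_cancel(tq, task, &pending) != 0)
		taskqueue_drain(tq, task);
}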
diff --git a/sys/sys/taskqueue.h b/sys/sys/taskqueue.h
--- a/sys/sys/taskqueue.h
+++ b/sys/sys/taskqueue.h
@@ -61,6 +61,10 @@
 #define	TASKQUEUE_NUM_CALLBACKS	TASKQUEUE_CALLBACK_TYPE_MAX + 1
 #define	TASKQUEUE_NAMELEN	32
 
+/* taskqueue_enqueue flags */
+#define	TASKQUEUE_FAIL_IF_PENDING	(1 << 0)
+#define	TASKQUEUE_FAIL_IF_CANCELING	(1 << 1)
+
 typedef void (*taskqueue_callback_fn)(void *context);
 
 /*
@@ -82,6 +86,8 @@
 int	taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count,
 	    int pri, cpuset_t *mask, const char *name, ...) __printflike(5, 6);
 int	taskqueue_enqueue(struct taskqueue *queue, struct task *task);
+int	taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task,
+	    int flags);
 int	taskqueue_enqueue_timeout(struct taskqueue *queue,
 	    struct timeout_task *timeout_task, int ticks);
 int	taskqueue_enqueue_timeout_sbt(struct taskqueue *queue,
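Taken together, the flag definitions above and the tb_canceling machinery close a specific race. The timeline below is an illustrative annotation only, not part of the patch:

/*
 * Thread A (stop path)                 Thread B (producer)
 * --------------------                 -------------------
 * taskqueue_cancel(tq, t, &p)
 *   task is running: tb_canceling is
 *   set to true, EBUSY is returned
 *                                      taskqueue_enqueue_flags(tq, t,
 *                                          TASKQUEUE_FAIL_IF_CANCELING)
 *                                        sees tb_canceling, returns
 *                                        ECANCELED; t is not re-queued
 * taskqueue_drain(tq, t)
 *   returns once the running instance
 *   finishes
 */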