diff --git a/share/man/man9/taskqueue.9 b/share/man/man9/taskqueue.9 index 58eb6b7c5571..e13ad9498562 100644 --- a/share/man/man9/taskqueue.9 +++ b/share/man/man9/taskqueue.9 @@ -1,520 +1,544 @@ .\" -*- nroff -*- .\" .\" Copyright (c) 2000 Doug Rabson .\" .\" All rights reserved. .\" .\" This program is free software. .\" .\" Redistribution and use in source and binary forms, with or without .\" modification, are permitted provided that the following conditions .\" are met: .\" 1. Redistributions of source code must retain the above copyright .\" notice, this list of conditions and the following disclaimer. .\" 2. Redistributions in binary form must reproduce the above copyright .\" notice, this list of conditions and the following disclaimer in the .\" documentation and/or other materials provided with the distribution. .\" .\" THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY EXPRESS OR .\" IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES .\" OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. .\" IN NO EVENT SHALL THE DEVELOPERS BE LIABLE FOR ANY DIRECT, INDIRECT, .\" INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT .\" NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, .\" DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY .\" THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT .\" (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF .\" THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. .\" .\" $FreeBSD$ .\" -.Dd September 1, 2021 +.Dd April 25, 2022 .Dt TASKQUEUE 9 .Os .Sh NAME .Nm taskqueue .Nd asynchronous task execution .Sh SYNOPSIS .In sys/param.h .In sys/kernel.h .In sys/malloc.h .In sys/queue.h .In sys/taskqueue.h .Bd -literal typedef void (*task_fn_t)(void *context, int pending); typedef void (*taskqueue_enqueue_fn)(void *context); struct task { STAILQ_ENTRY(task) ta_link; /* link for queue */ u_short ta_pending; /* count times queued */ u_short ta_priority; /* priority of task in queue */ task_fn_t ta_func; /* task handler */ void *ta_context; /* argument for handler */ }; enum taskqueue_callback_type { TASKQUEUE_CALLBACK_TYPE_INIT, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN, }; typedef void (*taskqueue_callback_fn)(void *context); struct timeout_task; .Ed .Ft struct taskqueue * .Fn taskqueue_create "const char *name" "int mflags" "taskqueue_enqueue_fn enqueue" "void *context" .Ft struct taskqueue * .Fn taskqueue_create_fast "const char *name" "int mflags" "taskqueue_enqueue_fn enqueue" "void *context" .Ft int .Fn taskqueue_start_threads "struct taskqueue **tqp" "int count" "int pri" "const char *name" "..." .Ft int .Fo taskqueue_start_threads_cpuset .Fa "struct taskqueue **tqp" "int count" "int pri" "cpuset_t *mask" .Fa "const char *name" "..." .Fc .Ft int .Fo taskqueue_start_threads_in_proc .Fa "struct taskqueue **tqp" "int count" "int pri" "struct proc *proc" .Fa "const char *name" "..." 
.Fc .Ft void .Fn taskqueue_set_callback "struct taskqueue *queue" "enum taskqueue_callback_type cb_type" "taskqueue_callback_fn callback" "void *context" .Ft void .Fn taskqueue_free "struct taskqueue *queue" .Ft int .Fn taskqueue_enqueue "struct taskqueue *queue" "struct task *task" .Ft int +.Fn taskqueue_enqueue_flags "struct taskqueue *queue" "struct task *task" "int flags" +.Ft int .Fn taskqueue_enqueue_timeout "struct taskqueue *queue" "struct timeout_task *timeout_task" "int ticks" .Ft int .Fn taskqueue_enqueue_timeout_sbt "struct taskqueue *queue" "struct timeout_task *timeout_task" "sbintime_t sbt" "sbintime_t pr" "int flags" .Ft int .Fn taskqueue_cancel "struct taskqueue *queue" "struct task *task" "u_int *pendp" .Ft int .Fn taskqueue_cancel_timeout "struct taskqueue *queue" "struct timeout_task *timeout_task" "u_int *pendp" .Ft void .Fn taskqueue_drain "struct taskqueue *queue" "struct task *task" .Ft void .Fn taskqueue_drain_timeout "struct taskqueue *queue" "struct timeout_task *timeout_task" .Ft void .Fn taskqueue_drain_all "struct taskqueue *queue" .Ft void .Fn taskqueue_quiesce "struct taskqueue *queue" .Ft void .Fn taskqueue_block "struct taskqueue *queue" .Ft void .Fn taskqueue_unblock "struct taskqueue *queue" .Ft int .Fn taskqueue_member "struct taskqueue *queue" "struct thread *td" .Ft void .Fn taskqueue_run "struct taskqueue *queue" .Fn TASK_INIT "struct task *task" "int priority" "task_fn_t func" "void *context" .Fn TASK_INITIALIZER "int priority" "task_fn_t func" "void *context" .Fn TASKQUEUE_DECLARE "name" .Fn TASKQUEUE_DEFINE "name" "taskqueue_enqueue_fn enqueue" "void *context" "init" .Fn TASKQUEUE_FAST_DEFINE "name" "taskqueue_enqueue_fn enqueue" "void *context" "init" .Fn TASKQUEUE_DEFINE_THREAD "name" .Fn TASKQUEUE_FAST_DEFINE_THREAD "name" .Fn TIMEOUT_TASK_INIT "struct taskqueue *queue" "struct timeout_task *timeout_task" "int priority" "task_fn_t func" "void *context" .Sh DESCRIPTION These functions provide a simple interface for asynchronous execution of code. .Pp The function .Fn taskqueue_create is used to create new queues. The arguments to .Fn taskqueue_create include a name that should be unique, a set of .Xr malloc 9 flags that specify whether the call to .Fn malloc is allowed to sleep, a function that is called from .Fn taskqueue_enqueue when a task is added to the queue, and a pointer to the memory location where the identity of the thread that services the queue is recorded. .\" XXX The rest of the sentence gets lost in relation to the first part. The function called from .Fn taskqueue_enqueue must arrange for the queue to be processed (for instance by scheduling a software interrupt or waking a kernel thread). The memory location where the thread identity is recorded is used to signal the service thread(s) to terminate: when this value is set to zero and the thread is signaled, it will terminate. If the queue is intended for use in fast interrupt handlers, .Fn taskqueue_create_fast should be used in place of .Fn taskqueue_create . .Pp The function .Fn taskqueue_free should be used to free the memory used by the queue. Any tasks that are on the queue will be executed at this time, after which the thread servicing the queue will be signaled that it should exit. .Pp Once a taskqueue has been created, its threads should be started using .Fn taskqueue_start_threads , .Fn taskqueue_start_threads_cpuset , or .Fn taskqueue_start_threads_in_proc .
.Fn taskqueue_start_threads_cpuset takes a .Va cpuset argument which restricts the threads started for the taskqueue to run only on the given CPUs. .Fn taskqueue_start_threads_in_proc takes a .Va proc argument which causes the threads started for the taskqueue to be assigned to the given kernel process. Callbacks may optionally be registered using .Fn taskqueue_set_callback . Currently, callbacks may be registered for the following purposes: .Bl -tag -width TASKQUEUE_CALLBACK_TYPE_SHUTDOWN .It Dv TASKQUEUE_CALLBACK_TYPE_INIT This callback is called by every thread in the taskqueue, before it executes any tasks. This callback must be set before the taskqueue's threads are started. .It Dv TASKQUEUE_CALLBACK_TYPE_SHUTDOWN This callback is called by every thread in the taskqueue, after it executes its last task. This callback will always be called before the taskqueue structure is reclaimed. .El .Pp To add a task to the list of tasks queued on a taskqueue, call .Fn taskqueue_enqueue with pointers to the queue and task. If the task's .Va ta_pending field is non-zero, then it is simply incremented to reflect the number of times the task was enqueued, up to a cap of USHRT_MAX. Otherwise, the task is added to the list before the first task which has a lower .Va ta_priority value or at the end of the list if no tasks have a lower priority. Enqueueing a task does not perform any memory allocation, which makes it suitable for calling from an interrupt handler. This function will return .Er EPIPE if the queue is being freed. .Pp When a task is executed, it is first removed from the queue, the value of .Va ta_pending is recorded, and then the field is zeroed. The function .Va ta_func from the task structure is called with the value of the field .Va ta_context as its first argument and the value of .Va ta_pending as its second argument. After the function .Va ta_func returns, .Xr wakeup 9 is called on the task pointer passed to .Fn taskqueue_enqueue . .Pp The +.Fn taskqueue_enqueue_flags +function accepts an extra +.Va flags +parameter which specifies a set of optional flags to alter the behavior of +.Fn taskqueue_enqueue . +It may contain one or more of the following flags: +.Bl -tag -width TASKQUEUE_FAIL_IF_CANCELING +.It Dv TASKQUEUE_FAIL_IF_PENDING +.Fn taskqueue_enqueue_flags +fails if the task is already scheduled for execution. +.Er EEXIST +is returned, and the +.Va ta_pending +counter value remains unchanged. +.It Dv TASKQUEUE_FAIL_IF_CANCELING +.Fn taskqueue_enqueue_flags +fails if the task is in the canceling state, and +.Er ECANCELED +is returned. +.El +.Pp +The .Fn taskqueue_enqueue_timeout function is used to schedule the enqueue after the specified number of .Va ticks . The .Fn taskqueue_enqueue_timeout_sbt function provides finer control over the scheduling based on .Va sbt , .Va pr , and .Va flags , as detailed in .Xr callout 9 . If the .Va ticks argument is negative, the already scheduled enqueueing is not re-scheduled. Otherwise, the task is scheduled for enqueueing in the future, after the absolute value of .Va ticks is passed. This function returns -1 if the task is being drained. Otherwise, the number of pending calls is returned. .Pp The .Fn taskqueue_cancel function is used to cancel a task. The .Va ta_pending count is cleared, and the old value is returned in the reference parameter .Fa pendp , if it is .Pf non- Dv NULL . If the task is currently running, .Er EBUSY is returned, otherwise 0.
A blocking version of .Fn taskqueue_cancel that waits for a running task to finish could look like: .Bd -literal -offset indent while (taskqueue_cancel(tq, task, NULL) != 0) taskqueue_drain(tq, task); .Ed .Pp Note that, as with .Fn taskqueue_drain , the caller is responsible for ensuring that the task is not re-enqueued after being canceled. .Pp Similarly, the .Fn taskqueue_cancel_timeout function is used to cancel the scheduled task execution. .Pp The .Fn taskqueue_drain function is used to wait for the task to finish, and the .Fn taskqueue_drain_timeout function is used to wait for the scheduled task to finish. There is no guarantee that the task will not be enqueued after a call to .Fn taskqueue_drain . If the caller wants to put the task into a known state, then before calling .Fn taskqueue_drain the caller should use out-of-band means to ensure that the task will not be enqueued. For example, if the task is enqueued by an interrupt filter, then the interrupt could be disabled. .Pp The .Fn taskqueue_drain_all function is used to wait for all pending and running tasks that are enqueued on the taskqueue to finish. Tasks posted to the taskqueue after .Fn taskqueue_drain_all begins processing, including pending enqueues scheduled by a previous call to .Fn taskqueue_enqueue_timeout , do not extend the wait time of .Fn taskqueue_drain_all and may complete after .Fn taskqueue_drain_all returns. The .Fn taskqueue_quiesce function is used to wait for the queue to become empty and for all running tasks to finish. To avoid blocking indefinitely, the caller must ensure by some mechanism that tasks will eventually stop being posted to the queue. .Pp The .Fn taskqueue_block function blocks the taskqueue. It prevents any enqueued but not running tasks from being executed. Future calls to .Fn taskqueue_enqueue will enqueue tasks, but the tasks will not be run until .Fn taskqueue_unblock is called. Note that .Fn taskqueue_block does not wait for any currently running tasks to finish. Thus, .Fn taskqueue_block does not guarantee that .Fn taskqueue_run is not running after .Fn taskqueue_block returns, but it does guarantee that .Fn taskqueue_run will not be called again until .Fn taskqueue_unblock is called. If the caller requires a guarantee that .Fn taskqueue_run is not running, then this must be arranged by the caller. Note that if .Fn taskqueue_drain is called on a task that is enqueued on a taskqueue that is blocked by .Fn taskqueue_block , then .Fn taskqueue_drain cannot return until the taskqueue is unblocked. This can result in a deadlock if the thread blocked in .Fn taskqueue_drain is the thread that is supposed to call .Fn taskqueue_unblock . Thus, use of .Fn taskqueue_drain after .Fn taskqueue_block is discouraged, because the state of the task cannot be known in advance. The same caveat applies to .Fn taskqueue_drain_all . .Pp The .Fn taskqueue_unblock function unblocks the previously blocked taskqueue. All enqueued tasks can be run after this call. .Pp The .Fn taskqueue_member function returns .No 1 if the given thread .Fa td is part of the given taskqueue .Fa queue and .No 0 otherwise. .Pp The .Fn taskqueue_run function will run all pending tasks in the specified .Fa queue . Normally this function is only used internally. .Pp A convenience macro, .Fn TASK_INIT "task" "priority" "func" "context" , is provided to initialise a .Va task structure. The .Fn TASK_INITIALIZER macro generates an initializer for a task structure.
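.Pp
The following is a minimal sketch, not taken from any existing driver, of the usual pattern: create a private queue, start one service thread, initialise a task with
.Fn TASK_INIT ,
and enqueue it.
The
.Dq my
names are placeholders.
.Bd -literal -offset indent
struct my_softc {
	struct taskqueue	*sc_tq;
	struct task		 sc_task;
};

static void
my_task_fn(void *context, int pending)
{
	struct my_softc *sc = context;

	/* pending counts how many enqueues were coalesced */
	(void)sc;
	(void)pending;
}

static int
my_attach(struct my_softc *sc)
{
	/* private queue serviced by a single dedicated thread */
	sc->sc_tq = taskqueue_create("my taskq", M_WAITOK,
	    taskqueue_thread_enqueue, &sc->sc_tq);
	taskqueue_start_threads(&sc->sc_tq, 1, PWAIT, "my taskq");
	TASK_INIT(&sc->sc_task, 0, my_task_fn, sc);
	return (0);
}

/* later, typically from an interrupt handler */
taskqueue_enqueue(sc->sc_tq, &sc->sc_task);

/* on detach, wait for the task and release the queue */
taskqueue_drain(sc->sc_tq, &sc->sc_task);
taskqueue_free(sc->sc_tq);
.Ed
.Pp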
A macro .Fn TIMEOUT_TASK_INIT "queue" "timeout_task" "priority" "func" "context" initializes the .Va timeout_task structure. The values of .Va priority , .Va func , and .Va context are simply copied into the task structure fields, and the .Va ta_pending field is cleared. .Pp Five macros, .Fn TASKQUEUE_DECLARE "name" , .Fn TASKQUEUE_DEFINE "name" "enqueue" "context" "init" , .Fn TASKQUEUE_FAST_DEFINE "name" "enqueue" "context" "init" , .Fn TASKQUEUE_DEFINE_THREAD "name" , and .Fn TASKQUEUE_FAST_DEFINE_THREAD "name" , are used to declare a reference to a global queue, to define the implementation of the queue, and to define a queue that uses its own thread. The .Fn TASKQUEUE_DEFINE macro arranges to call .Fn taskqueue_create with the values of its .Va name , .Va enqueue and .Va context arguments during system initialisation. After calling .Fn taskqueue_create , the .Va init argument to the macro is executed as a C statement, allowing any further initialisation to be performed (such as registering an interrupt handler, etc.). .Pp The .Fn TASKQUEUE_DEFINE_THREAD macro defines a new taskqueue with its own kernel thread to serve tasks. The variable .Vt struct taskqueue *taskqueue_name is used to enqueue tasks onto the queue. .Pp .Fn TASKQUEUE_FAST_DEFINE and .Fn TASKQUEUE_FAST_DEFINE_THREAD act just like .Fn TASKQUEUE_DEFINE and .Fn TASKQUEUE_DEFINE_THREAD respectively, but the taskqueue is created with .Fn taskqueue_create_fast . .Ss Predefined Task Queues The system provides four global taskqueues, .Va taskqueue_fast , .Va taskqueue_swi , .Va taskqueue_swi_giant , and .Va taskqueue_thread . The .Va taskqueue_fast queue is for swi handlers dispatched from fast interrupt handlers, where sleep mutexes cannot be used. The swi taskqueues are run via a software interrupt mechanism. The .Va taskqueue_swi queue runs without the protection of the .Va Giant kernel lock, and the .Va taskqueue_swi_giant queue runs with the protection of the .Va Giant kernel lock. The thread taskqueue .Va taskqueue_thread runs in a kernel thread context, and tasks run from this thread do not run under the .Va Giant kernel lock. If the caller wants to run under .Va Giant , it should be acquired and released explicitly in the taskqueue handler routine. .Pp To use these queues, call .Fn taskqueue_enqueue with the value of the global taskqueue variable for the queue you wish to use. .Pp The software interrupt queues can be used, for instance, for implementing interrupt handlers which must perform a significant amount of processing in the handler. The hardware interrupt handler would perform minimal processing of the interrupt and then enqueue a task to finish the work. This minimizes the amount of time spent with interrupts disabled. .Pp The thread queue can be used, for instance, by interrupt level routines that need to call kernel functions that do things that can only be done from a thread context (for example, calling .Fn malloc with the M_WAITOK flag). .Pp Note that tasks queued on shared taskqueues such as .Va taskqueue_swi may be delayed an indeterminate amount of time before execution. If queueing delays cannot be tolerated, then a private taskqueue should be created with a dedicated processing thread. .Sh SEE ALSO .Xr callout 9 , .Xr ithread 9 , .Xr kthread 9 , .Xr swi 9 .Sh HISTORY This interface first appeared in .Fx 5.0 . There is a similar facility called work_queue in the Linux kernel. .Sh AUTHORS This manual page was written by .An Doug Rabson .
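For illustration, here is a minimal sketch of how a caller might combine the two new flags documented in the taskqueue.9 changes above; the function and variable names are hypothetical, and the pattern mirrors the LinuxKPI kthread_queue_work() wrapper added later in this patch.

/*
 * Hypothetical helper: enqueue a work item only if it is not already
 * pending and no cancellation of it is in progress.
 */
static bool
my_queue_work(struct taskqueue *tq, struct task *ta)
{
	int error;

	error = taskqueue_enqueue_flags(tq, ta,
	    TASKQUEUE_FAIL_IF_PENDING | TASKQUEUE_FAIL_IF_CANCELING);
	/* EEXIST: already queued; ECANCELED: a cancel is in progress. */
	return (error == 0);
}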
diff --git a/sys/compat/linuxkpi/common/include/linux/kthread.h b/sys/compat/linuxkpi/common/include/linux/kthread.h index f4791d5f0db3..49309cd47a40 100644 --- a/sys/compat/linuxkpi/common/include/linux/kthread.h +++ b/sys/compat/linuxkpi/common/include/linux/kthread.h @@ -1,73 +1,168 @@ /*- * Copyright (c) 2010 Isilon Systems, Inc. * Copyright (c) 2010 iX Systems, Inc. * Copyright (c) 2010 Panasas, Inc. * Copyright (c) 2013-2017 Mellanox Technologies, Ltd. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice unmodified, this list of conditions, and the following * disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * $FreeBSD$ */ #ifndef _LINUXKPI_LINUX_KTHREAD_H_ #define _LINUXKPI_LINUX_KTHREAD_H_ #include -#include +#include +#include #include +#include +#include +#include +#include + +struct task_struct; +struct kthread_work; + +typedef void (*kthread_work_func_t)(struct kthread_work *work); + +struct kthread_worker { + struct task_struct *task; + struct taskqueue *tq; +}; + +struct kthread_work { + struct taskqueue *tq; + struct task task; + kthread_work_func_t func; +}; #define kthread_run(fn, data, fmt, ...) 
({ \ struct task_struct *__task; \ struct thread *__td; \ \ if (kthread_add(linux_kthread_fn, NULL, NULL, &__td, \ RFSTOPPED, 0, fmt, ## __VA_ARGS__)) \ __task = NULL; \ else \ __task = linux_kthread_setup_and_run(__td, fn, data); \ __task; \ }) int linux_kthread_stop(struct task_struct *); bool linux_kthread_should_stop_task(struct task_struct *); bool linux_kthread_should_stop(void); int linux_kthread_park(struct task_struct *); void linux_kthread_parkme(void); bool linux_kthread_should_park(void); void linux_kthread_unpark(struct task_struct *); void linux_kthread_fn(void *); struct task_struct *linux_kthread_setup_and_run(struct thread *, linux_task_fn_t *, void *arg); int linux_in_atomic(void); #define kthread_stop(task) linux_kthread_stop(task) #define kthread_should_stop() linux_kthread_should_stop() #define kthread_should_stop_task(task) linux_kthread_should_stop_task(task) #define kthread_park(task) linux_kthread_park(task) #define kthread_parkme() linux_kthread_parkme() #define kthread_should_park() linux_kthread_should_park() #define kthread_unpark(task) linux_kthread_unpark(task) #define in_atomic() linux_in_atomic() +/* Only kthread_(create|destroy)_worker interface is allowed */ +#define kthread_init_worker(worker) \ + _Static_assert(false, "pre-4.9 worker interface is not supported"); + +task_fn_t lkpi_kthread_work_fn; +task_fn_t lkpi_kthread_worker_init_fn; + +#define kthread_create_worker(flags, fmt, ...) ({ \ + struct kthread_worker *__w; \ + struct task __task; \ + \ + __w = malloc(sizeof(*__w), M_KMALLOC, M_WAITOK | M_ZERO); \ + __w->tq = taskqueue_create("lkpi kthread taskq", M_WAITOK, \ + taskqueue_thread_enqueue, &__w->tq); \ + taskqueue_start_threads(&__w->tq, 1, PWAIT, fmt, ##__VA_ARGS__);\ + TASK_INIT(&__task, 0, lkpi_kthread_worker_init_fn, __w); \ + taskqueue_enqueue(__w->tq, &__task); \ + taskqueue_drain(__w->tq, &__task); \ + __w; \ +}) + +static inline void +kthread_destroy_worker(struct kthread_worker *worker) +{ + taskqueue_drain_all(worker->tq); + taskqueue_free(worker->tq); + free(worker, M_KMALLOC); +} + +static inline void +kthread_init_work(struct kthread_work *work, kthread_work_func_t func) +{ + work->tq = NULL; + work->func = func; + TASK_INIT(&work->task, 0, lkpi_kthread_work_fn, work); +} + +static inline bool +kthread_queue_work(struct kthread_worker *worker, struct kthread_work *work) +{ + int error; + + error = taskqueue_enqueue_flags(worker->tq, &work->task, + TASKQUEUE_FAIL_IF_CANCELING | TASKQUEUE_FAIL_IF_PENDING); + if (error == 0) + work->tq = worker->tq; + return (error == 0); +} + +static inline bool +kthread_cancel_work_sync(struct kthread_work *work) +{ + u_int pending = 0; + + if (work->tq != NULL && + taskqueue_cancel(work->tq, &work->task, &pending) != 0) + taskqueue_drain(work->tq, &work->task); + + return (pending != 0); +} + +static inline void +kthread_flush_work(struct kthread_work *work) +{ + if (work->tq != NULL) + taskqueue_drain(work->tq, &work->task); +} + +static inline void +kthread_flush_worker(struct kthread_worker *worker) +{ + taskqueue_drain_all(worker->tq); +} + #endif /* _LINUXKPI_LINUX_KTHREAD_H_ */ diff --git a/sys/compat/linuxkpi/common/src/linux_kthread.c b/sys/compat/linuxkpi/common/src/linux_kthread.c index 26afe005ea59..19afad6872c3 100644 --- a/sys/compat/linuxkpi/common/src/linux_kthread.c +++ b/sys/compat/linuxkpi/common/src/linux_kthread.c @@ -1,167 +1,183 @@ /*- * Copyright (c) 2017 Hans Petter Selasky * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice unmodified, this list of conditions, and the following * disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include enum { KTHREAD_SHOULD_STOP_MASK = (1 << 0), KTHREAD_SHOULD_PARK_MASK = (1 << 1), KTHREAD_IS_PARKED_MASK = (1 << 2), }; bool linux_kthread_should_stop_task(struct task_struct *task) { return (atomic_read(&task->kthread_flags) & KTHREAD_SHOULD_STOP_MASK); } bool linux_kthread_should_stop(void) { return (atomic_read(¤t->kthread_flags) & KTHREAD_SHOULD_STOP_MASK); } int linux_kthread_stop(struct task_struct *task) { int retval; /* * Assume task is still alive else caller should not call * kthread_stop(): */ atomic_or(KTHREAD_SHOULD_STOP_MASK, &task->kthread_flags); kthread_unpark(task); wake_up_process(task); wait_for_completion(&task->exited); /* * Get return code and free task structure: */ retval = task->task_ret; put_task_struct(task); return (retval); } int linux_kthread_park(struct task_struct *task) { atomic_or(KTHREAD_SHOULD_PARK_MASK, &task->kthread_flags); wake_up_process(task); wait_for_completion(&task->parked); return (0); } void linux_kthread_parkme(void) { struct task_struct *task; task = current; set_task_state(task, TASK_PARKED | TASK_UNINTERRUPTIBLE); while (linux_kthread_should_park()) { while ((atomic_fetch_or(KTHREAD_IS_PARKED_MASK, &task->kthread_flags) & KTHREAD_IS_PARKED_MASK) == 0) complete(&task->parked); schedule(); set_task_state(task, TASK_PARKED | TASK_UNINTERRUPTIBLE); } atomic_andnot(KTHREAD_IS_PARKED_MASK, &task->kthread_flags); set_task_state(task, TASK_RUNNING); } bool linux_kthread_should_park(void) { struct task_struct *task; task = current; return (atomic_read(&task->kthread_flags) & KTHREAD_SHOULD_PARK_MASK); } void linux_kthread_unpark(struct task_struct *task) { atomic_andnot(KTHREAD_SHOULD_PARK_MASK, &task->kthread_flags); if ((atomic_fetch_andnot(KTHREAD_IS_PARKED_MASK, &task->kthread_flags) & KTHREAD_IS_PARKED_MASK) != 0) wake_up_state(task, TASK_PARKED); } struct task_struct * linux_kthread_setup_and_run(struct thread *td, linux_task_fn_t *task_fn, void *arg) { struct task_struct *task; linux_set_current(td); task = td->td_lkpi_task; task->task_fn = task_fn; task->task_data = arg; thread_lock(td); /* make sure the scheduler priority is raised */ sched_prio(td, PI_SWI(SWI_NET)); /* put thread into run-queue */ sched_add(td, SRQ_BORING); return 
(task); } void linux_kthread_fn(void *arg __unused) { struct task_struct *task = current; if (linux_kthread_should_stop_task(task) == 0) task->task_ret = task->task_fn(task->task_data); if (linux_kthread_should_stop_task(task) != 0) { struct thread *td = curthread; /* let kthread_stop() free data */ td->td_lkpi_task = NULL; /* wakeup kthread_stop() */ complete(&task->exited); } kthread_exit(); } + +void +lkpi_kthread_work_fn(void *context, int pending __unused) +{ + struct kthread_work *work = context; + + work->func(work); +} + +void +lkpi_kthread_worker_init_fn(void *context, int pending __unused) +{ + struct kthread_worker *worker = context; + + worker->task = current; +} diff --git a/sys/kern/subr_taskqueue.c b/sys/kern/subr_taskqueue.c index e43b09010761..7ad7c210ceff 100644 --- a/sys/kern/subr_taskqueue.c +++ b/sys/kern/subr_taskqueue.c @@ -1,883 +1,915 @@ /*- * SPDX-License-Identifier: BSD-2-Clause-FreeBSD * * Copyright (c) 2000 Doug Rabson * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. 
*/ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include static MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues"); static void *taskqueue_giant_ih; static void *taskqueue_ih; static void taskqueue_fast_enqueue(void *); static void taskqueue_swi_enqueue(void *); static void taskqueue_swi_giant_enqueue(void *); struct taskqueue_busy { struct task *tb_running; u_int tb_seq; + bool tb_canceling; LIST_ENTRY(taskqueue_busy) tb_link; }; struct taskqueue { STAILQ_HEAD(, task) tq_queue; LIST_HEAD(, taskqueue_busy) tq_active; struct task *tq_hint; u_int tq_seq; int tq_callouts; struct mtx_padalign tq_mutex; taskqueue_enqueue_fn tq_enqueue; void *tq_context; char *tq_name; struct thread **tq_threads; int tq_tcount; int tq_spin; int tq_flags; taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS]; void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS]; }; #define TQ_FLAGS_ACTIVE (1 << 0) #define TQ_FLAGS_BLOCKED (1 << 1) #define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2) #define DT_CALLOUT_ARMED (1 << 0) #define DT_DRAIN_IN_PROGRESS (1 << 1) #define TQ_LOCK(tq) \ do { \ if ((tq)->tq_spin) \ mtx_lock_spin(&(tq)->tq_mutex); \ else \ mtx_lock(&(tq)->tq_mutex); \ } while (0) #define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED) #define TQ_UNLOCK(tq) \ do { \ if ((tq)->tq_spin) \ mtx_unlock_spin(&(tq)->tq_mutex); \ else \ mtx_unlock(&(tq)->tq_mutex); \ } while (0) #define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED) void _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, int priority, task_fn_t func, void *context) { TASK_INIT(&timeout_task->t, priority, func, context); callout_init_mtx(&timeout_task->c, &queue->tq_mutex, CALLOUT_RETURNUNLOCKED); timeout_task->q = queue; timeout_task->f = 0; } static __inline int TQ_SLEEP(struct taskqueue *tq, void *p, const char *wm) { if (tq->tq_spin) return (msleep_spin(p, (struct mtx *)&tq->tq_mutex, wm, 0)); return (msleep(p, &tq->tq_mutex, 0, wm, 0)); } +static struct taskqueue_busy * +task_get_busy(struct taskqueue *queue, struct task *task) +{ + struct taskqueue_busy *tb; + + TQ_ASSERT_LOCKED(queue); + LIST_FOREACH(tb, &queue->tq_active, tb_link) { + if (tb->tb_running == task) + return (tb); + } + return (NULL); +} + static struct taskqueue * _taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context, int mtxflags, const char *mtxname __unused) { struct taskqueue *queue; char *tq_name; tq_name = malloc(TASKQUEUE_NAMELEN, M_TASKQUEUE, mflags | M_ZERO); if (tq_name == NULL) return (NULL); queue = malloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags | M_ZERO); if (queue == NULL) { free(tq_name, M_TASKQUEUE); return (NULL); } snprintf(tq_name, TASKQUEUE_NAMELEN, "%s", (name) ? 
name : "taskqueue"); STAILQ_INIT(&queue->tq_queue); LIST_INIT(&queue->tq_active); queue->tq_enqueue = enqueue; queue->tq_context = context; queue->tq_name = tq_name; queue->tq_spin = (mtxflags & MTX_SPIN) != 0; queue->tq_flags |= TQ_FLAGS_ACTIVE; if (enqueue == taskqueue_fast_enqueue || enqueue == taskqueue_swi_enqueue || enqueue == taskqueue_swi_giant_enqueue || enqueue == taskqueue_thread_enqueue) queue->tq_flags |= TQ_FLAGS_UNLOCKED_ENQUEUE; mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags); return (queue); } struct taskqueue * taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context) { return _taskqueue_create(name, mflags, enqueue, context, MTX_DEF, name); } void taskqueue_set_callback(struct taskqueue *queue, enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, void *context) { KASSERT(((cb_type >= TASKQUEUE_CALLBACK_TYPE_MIN) && (cb_type <= TASKQUEUE_CALLBACK_TYPE_MAX)), ("Callback type %d not valid, must be %d-%d", cb_type, TASKQUEUE_CALLBACK_TYPE_MIN, TASKQUEUE_CALLBACK_TYPE_MAX)); KASSERT((queue->tq_callbacks[cb_type] == NULL), ("Re-initialization of taskqueue callback?")); queue->tq_callbacks[cb_type] = callback; queue->tq_cb_contexts[cb_type] = context; } /* * Signal a taskqueue thread to terminate. */ static void taskqueue_terminate(struct thread **pp, struct taskqueue *tq) { while (tq->tq_tcount > 0 || tq->tq_callouts > 0) { wakeup(tq); TQ_SLEEP(tq, pp, "tq_destroy"); } } void taskqueue_free(struct taskqueue *queue) { TQ_LOCK(queue); queue->tq_flags &= ~TQ_FLAGS_ACTIVE; taskqueue_terminate(queue->tq_threads, queue); KASSERT(LIST_EMPTY(&queue->tq_active), ("Tasks still running?")); KASSERT(queue->tq_callouts == 0, ("Armed timeout tasks")); mtx_destroy(&queue->tq_mutex); free(queue->tq_threads, M_TASKQUEUE); free(queue->tq_name, M_TASKQUEUE); free(queue, M_TASKQUEUE); } static int -taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task) +taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task, int flags) { struct task *ins; struct task *prev; + struct taskqueue_busy *tb; KASSERT(task->ta_func != NULL, ("enqueueing task with NULL func")); + /* + * Ignore canceling task if requested. + */ + if (__predict_false((flags & TASKQUEUE_FAIL_IF_CANCELING) != 0)) { + tb = task_get_busy(queue, task); + if (tb != NULL && tb->tb_canceling) { + TQ_UNLOCK(queue); + return (ECANCELED); + } + } + /* * Count multiple enqueues. */ if (task->ta_pending) { + if (__predict_false((flags & TASKQUEUE_FAIL_IF_PENDING) != 0)) { + TQ_UNLOCK(queue); + return (EEXIST); + } if (task->ta_pending < USHRT_MAX) task->ta_pending++; TQ_UNLOCK(queue); return (0); } /* * Optimise cases when all tasks use small set of priorities. * In case of only one priority we always insert at the end. * In case of two tq_hint typically gives the insertion point. * In case of more then two tq_hint should halve the search. 
*/ prev = STAILQ_LAST(&queue->tq_queue, task, ta_link); if (!prev || prev->ta_priority >= task->ta_priority) { STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link); } else { prev = queue->tq_hint; if (prev && prev->ta_priority >= task->ta_priority) { ins = STAILQ_NEXT(prev, ta_link); } else { prev = NULL; ins = STAILQ_FIRST(&queue->tq_queue); } for (; ins; prev = ins, ins = STAILQ_NEXT(ins, ta_link)) if (ins->ta_priority < task->ta_priority) break; if (prev) { STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link); queue->tq_hint = task; } else STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link); } task->ta_pending = 1; if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) != 0) TQ_UNLOCK(queue); if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) queue->tq_enqueue(queue->tq_context); if ((queue->tq_flags & TQ_FLAGS_UNLOCKED_ENQUEUE) == 0) TQ_UNLOCK(queue); /* Return with lock released. */ return (0); } int -taskqueue_enqueue(struct taskqueue *queue, struct task *task) +taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, int flags) { int res; TQ_LOCK(queue); - res = taskqueue_enqueue_locked(queue, task); + res = taskqueue_enqueue_locked(queue, task, flags); /* The lock is released inside. */ return (res); } +int +taskqueue_enqueue(struct taskqueue *queue, struct task *task) +{ + return (taskqueue_enqueue_flags(queue, task, 0)); +} + static void taskqueue_timeout_func(void *arg) { struct taskqueue *queue; struct timeout_task *timeout_task; timeout_task = arg; queue = timeout_task->q; KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout")); timeout_task->f &= ~DT_CALLOUT_ARMED; queue->tq_callouts--; - taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t); + taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t, 0); /* The lock is released inside. */ } int taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags) { int res; TQ_LOCK(queue); KASSERT(timeout_task->q == NULL || timeout_task->q == queue, ("Migrated queue")); timeout_task->q = queue; res = timeout_task->t.ta_pending; if (timeout_task->f & DT_DRAIN_IN_PROGRESS) { /* Do nothing */ TQ_UNLOCK(queue); res = -1; } else if (sbt == 0) { - taskqueue_enqueue_locked(queue, &timeout_task->t); + taskqueue_enqueue_locked(queue, &timeout_task->t, 0); /* The lock is released inside. */ } else { if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { res++; } else { queue->tq_callouts++; timeout_task->f |= DT_CALLOUT_ARMED; if (sbt < 0) sbt = -sbt; /* Ignore overflow. */ } if (sbt > 0) { if (queue->tq_spin) flags |= C_DIRECT_EXEC; callout_reset_sbt(&timeout_task->c, sbt, pr, taskqueue_timeout_func, timeout_task, flags); } TQ_UNLOCK(queue); } return (res); } int taskqueue_enqueue_timeout(struct taskqueue *queue, struct timeout_task *ttask, int ticks) { return (taskqueue_enqueue_timeout_sbt(queue, ttask, ticks * tick_sbt, 0, C_HARDCLOCK)); } static void taskqueue_task_nop_fn(void *context, int pending) { } /* * Block until all currently queued tasks in this taskqueue * have begun execution. Tasks queued during execution of * this function are ignored. */ static int taskqueue_drain_tq_queue(struct taskqueue *queue) { struct task t_barrier; if (STAILQ_EMPTY(&queue->tq_queue)) return (0); /* * Enqueue our barrier after all current tasks, but with * the highest priority so that newly queued tasks cannot * pass it. 
Because of the high priority, we can not use * taskqueue_enqueue_locked directly (which drops the lock * anyway) so just insert it at tail while we have the * queue lock. */ TASK_INIT(&t_barrier, UCHAR_MAX, taskqueue_task_nop_fn, &t_barrier); STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link); queue->tq_hint = &t_barrier; t_barrier.ta_pending = 1; /* * Once the barrier has executed, all previously queued tasks * have completed or are currently executing. */ while (t_barrier.ta_pending != 0) TQ_SLEEP(queue, &t_barrier, "tq_qdrain"); return (1); } /* * Block until all currently executing tasks for this taskqueue * complete. Tasks that begin execution during the execution * of this function are ignored. */ static int taskqueue_drain_tq_active(struct taskqueue *queue) { struct taskqueue_busy *tb; u_int seq; if (LIST_EMPTY(&queue->tq_active)) return (0); /* Block taskq_terminate().*/ queue->tq_callouts++; /* Wait for any active task with sequence from the past. */ seq = queue->tq_seq; restart: LIST_FOREACH(tb, &queue->tq_active, tb_link) { if ((int)(tb->tb_seq - seq) <= 0) { TQ_SLEEP(queue, tb->tb_running, "tq_adrain"); goto restart; } } /* Release taskqueue_terminate(). */ queue->tq_callouts--; if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) wakeup_one(queue->tq_threads); return (1); } void taskqueue_block(struct taskqueue *queue) { TQ_LOCK(queue); queue->tq_flags |= TQ_FLAGS_BLOCKED; TQ_UNLOCK(queue); } void taskqueue_unblock(struct taskqueue *queue) { TQ_LOCK(queue); queue->tq_flags &= ~TQ_FLAGS_BLOCKED; if (!STAILQ_EMPTY(&queue->tq_queue)) queue->tq_enqueue(queue->tq_context); TQ_UNLOCK(queue); } static void taskqueue_run_locked(struct taskqueue *queue) { struct epoch_tracker et; struct taskqueue_busy tb; struct task *task; bool in_net_epoch; int pending; KASSERT(queue != NULL, ("tq is NULL")); TQ_ASSERT_LOCKED(queue); tb.tb_running = NULL; LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link); in_net_epoch = false; while ((task = STAILQ_FIRST(&queue->tq_queue)) != NULL) { STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link); if (queue->tq_hint == task) queue->tq_hint = NULL; pending = task->ta_pending; task->ta_pending = 0; tb.tb_running = task; tb.tb_seq = ++queue->tq_seq; + tb.tb_canceling = false; TQ_UNLOCK(queue); KASSERT(task->ta_func != NULL, ("task->ta_func is NULL")); if (!in_net_epoch && TASK_IS_NET(task)) { in_net_epoch = true; NET_EPOCH_ENTER(et); } else if (in_net_epoch && !TASK_IS_NET(task)) { NET_EPOCH_EXIT(et); in_net_epoch = false; } task->ta_func(task->ta_context, pending); TQ_LOCK(queue); wakeup(task); } if (in_net_epoch) NET_EPOCH_EXIT(et); LIST_REMOVE(&tb, tb_link); } void taskqueue_run(struct taskqueue *queue) { TQ_LOCK(queue); taskqueue_run_locked(queue); TQ_UNLOCK(queue); } -static int -task_is_running(struct taskqueue *queue, struct task *task) -{ - struct taskqueue_busy *tb; - - TQ_ASSERT_LOCKED(queue); - LIST_FOREACH(tb, &queue->tq_active, tb_link) { - if (tb->tb_running == task) - return (1); - } - return (0); -} - /* * Only use this function in single threaded contexts. It returns * non-zero if the given task is either pending or running. Else the * task is idle and can be queued again or freed. 
*/ int taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task) { int retval; TQ_LOCK(queue); - retval = task->ta_pending > 0 || task_is_running(queue, task); + retval = task->ta_pending > 0 || task_get_busy(queue, task) != NULL; TQ_UNLOCK(queue); return (retval); } static int taskqueue_cancel_locked(struct taskqueue *queue, struct task *task, u_int *pendp) { + struct taskqueue_busy *tb; + int retval = 0; if (task->ta_pending > 0) { STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link); if (queue->tq_hint == task) queue->tq_hint = NULL; } if (pendp != NULL) *pendp = task->ta_pending; task->ta_pending = 0; - return (task_is_running(queue, task) ? EBUSY : 0); + tb = task_get_busy(queue, task); + if (tb != NULL) { + tb->tb_canceling = true; + retval = EBUSY; + } + + return (retval); } int taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp) { int error; TQ_LOCK(queue); error = taskqueue_cancel_locked(queue, task, pendp); TQ_UNLOCK(queue); return (error); } int taskqueue_cancel_timeout(struct taskqueue *queue, struct timeout_task *timeout_task, u_int *pendp) { u_int pending, pending1; int error; TQ_LOCK(queue); pending = !!(callout_stop(&timeout_task->c) > 0); error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1); if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) { timeout_task->f &= ~DT_CALLOUT_ARMED; queue->tq_callouts--; } TQ_UNLOCK(queue); if (pendp != NULL) *pendp = pending + pending1; return (error); } void taskqueue_drain(struct taskqueue *queue, struct task *task) { if (!queue->tq_spin) WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); TQ_LOCK(queue); - while (task->ta_pending != 0 || task_is_running(queue, task)) + while (task->ta_pending != 0 || task_get_busy(queue, task) != NULL) TQ_SLEEP(queue, task, "tq_drain"); TQ_UNLOCK(queue); } void taskqueue_drain_all(struct taskqueue *queue) { if (!queue->tq_spin) WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__); TQ_LOCK(queue); (void)taskqueue_drain_tq_queue(queue); (void)taskqueue_drain_tq_active(queue); TQ_UNLOCK(queue); } void taskqueue_drain_timeout(struct taskqueue *queue, struct timeout_task *timeout_task) { /* * Set flag to prevent timer from re-starting during drain: */ TQ_LOCK(queue); KASSERT((timeout_task->f & DT_DRAIN_IN_PROGRESS) == 0, ("Drain already in progress")); timeout_task->f |= DT_DRAIN_IN_PROGRESS; TQ_UNLOCK(queue); callout_drain(&timeout_task->c); taskqueue_drain(queue, &timeout_task->t); /* * Clear flag to allow timer to re-start: */ TQ_LOCK(queue); timeout_task->f &= ~DT_DRAIN_IN_PROGRESS; TQ_UNLOCK(queue); } void taskqueue_quiesce(struct taskqueue *queue) { int ret; TQ_LOCK(queue); do { ret = taskqueue_drain_tq_queue(queue); if (ret == 0) ret = taskqueue_drain_tq_active(queue); } while (ret != 0); TQ_UNLOCK(queue); } static void taskqueue_swi_enqueue(void *context) { swi_sched(taskqueue_ih, 0); } static void taskqueue_swi_run(void *dummy) { taskqueue_run(taskqueue_swi); } static void taskqueue_swi_giant_enqueue(void *context) { swi_sched(taskqueue_giant_ih, 0); } static void taskqueue_swi_giant_run(void *dummy) { taskqueue_run(taskqueue_swi_giant); } static int _taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, cpuset_t *mask, struct proc *p, const char *name, va_list ap) { char ktname[MAXCOMLEN + 1]; struct thread *td; struct taskqueue *tq; int i, error; if (count <= 0) return (EINVAL); vsnprintf(ktname, sizeof(ktname), name, ap); tq = *tqp; tq->tq_threads = malloc(sizeof(struct thread *) * count, M_TASKQUEUE, M_NOWAIT | M_ZERO); if 
(tq->tq_threads == NULL) { printf("%s: no memory for %s threads\n", __func__, ktname); return (ENOMEM); } for (i = 0; i < count; i++) { if (count == 1) error = kthread_add(taskqueue_thread_loop, tqp, p, &tq->tq_threads[i], RFSTOPPED, 0, "%s", ktname); else error = kthread_add(taskqueue_thread_loop, tqp, p, &tq->tq_threads[i], RFSTOPPED, 0, "%s_%d", ktname, i); if (error) { /* should be ok to continue, taskqueue_free will dtrt */ printf("%s: kthread_add(%s): error %d", __func__, ktname, error); tq->tq_threads[i] = NULL; /* paranoid */ } else tq->tq_tcount++; } if (tq->tq_tcount == 0) { free(tq->tq_threads, M_TASKQUEUE); tq->tq_threads = NULL; return (ENOMEM); } for (i = 0; i < count; i++) { if (tq->tq_threads[i] == NULL) continue; td = tq->tq_threads[i]; if (mask) { error = cpuset_setthread(td->td_tid, mask); /* * Failing to pin is rarely an actual fatal error; * it'll just affect performance. */ if (error) printf("%s: curthread=%llu: can't pin; " "error=%d\n", __func__, (unsigned long long) td->td_tid, error); } thread_lock(td); sched_prio(td, pri); sched_add(td, SRQ_BORING); } return (0); } int taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, const char *name, ...) { va_list ap; int error; va_start(ap, name); error = _taskqueue_start_threads(tqp, count, pri, NULL, NULL, name, ap); va_end(ap); return (error); } int taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri, struct proc *proc, const char *name, ...) { va_list ap; int error; va_start(ap, name); error = _taskqueue_start_threads(tqp, count, pri, NULL, proc, name, ap); va_end(ap); return (error); } int taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri, cpuset_t *mask, const char *name, ...) { va_list ap; int error; va_start(ap, name); error = _taskqueue_start_threads(tqp, count, pri, mask, NULL, name, ap); va_end(ap); return (error); } static inline void taskqueue_run_callback(struct taskqueue *tq, enum taskqueue_callback_type cb_type) { taskqueue_callback_fn tq_callback; TQ_ASSERT_UNLOCKED(tq); tq_callback = tq->tq_callbacks[cb_type]; if (tq_callback != NULL) tq_callback(tq->tq_cb_contexts[cb_type]); } void taskqueue_thread_loop(void *arg) { struct taskqueue **tqp, *tq; tqp = arg; tq = *tqp; taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_INIT); TQ_LOCK(tq); while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) { /* XXX ? */ taskqueue_run_locked(tq); /* * Because taskqueue_run() can drop tq_mutex, we need to * check if the TQ_FLAGS_ACTIVE flag wasn't removed in the * meantime, which means we missed a wakeup. */ if ((tq->tq_flags & TQ_FLAGS_ACTIVE) == 0) break; TQ_SLEEP(tq, tq, "-"); } taskqueue_run_locked(tq); /* * This thread is on its way out, so just drop the lock temporarily * in order to call the shutdown callback. This allows the callback * to look at the taskqueue, even just before it dies. 
*/ TQ_UNLOCK(tq); taskqueue_run_callback(tq, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN); TQ_LOCK(tq); /* rendezvous with thread that asked us to terminate */ tq->tq_tcount--; wakeup_one(tq->tq_threads); TQ_UNLOCK(tq); kthread_exit(); } void taskqueue_thread_enqueue(void *context) { struct taskqueue **tqp, *tq; tqp = context; tq = *tqp; wakeup_any(tq); } TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, NULL, swi_add(NULL, "task queue", taskqueue_swi_run, NULL, SWI_TQ, INTR_MPSAFE, &taskqueue_ih)); TASKQUEUE_DEFINE(swi_giant, taskqueue_swi_giant_enqueue, NULL, swi_add(NULL, "Giant taskq", taskqueue_swi_giant_run, NULL, SWI_TQ_GIANT, 0, &taskqueue_giant_ih)); TASKQUEUE_DEFINE_THREAD(thread); struct taskqueue * taskqueue_create_fast(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context) { return _taskqueue_create(name, mflags, enqueue, context, MTX_SPIN, "fast_taskqueue"); } static void *taskqueue_fast_ih; static void taskqueue_fast_enqueue(void *context) { swi_sched(taskqueue_fast_ih, 0); } static void taskqueue_fast_run(void *dummy) { taskqueue_run(taskqueue_fast); } TASKQUEUE_FAST_DEFINE(fast, taskqueue_fast_enqueue, NULL, swi_add(NULL, "fast taskq", taskqueue_fast_run, NULL, SWI_TQ_FAST, INTR_MPSAFE, &taskqueue_fast_ih)); int taskqueue_member(struct taskqueue *queue, struct thread *td) { int i, j, ret = 0; for (i = 0, j = 0; ; i++) { if (queue->tq_threads[i] == NULL) continue; if (queue->tq_threads[i] == td) { ret = 1; break; } if (++j >= queue->tq_tcount) break; } return (ret); } diff --git a/sys/sys/taskqueue.h b/sys/sys/taskqueue.h index 7e59187e0114..cc98fe5fab85 100644 --- a/sys/sys/taskqueue.h +++ b/sys/sys/taskqueue.h @@ -1,222 +1,228 @@ /*- * SPDX-License-Identifier: BSD-2-Clause-FreeBSD * * Copyright (c) 2000 Doug Rabson * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. 
* * $FreeBSD$ */ #ifndef _SYS_TASKQUEUE_H_ #define _SYS_TASKQUEUE_H_ #ifndef _KERNEL #error "no user-serviceable parts inside" #endif #include #include #include #include struct taskqueue; struct taskqgroup; struct proc; struct thread; struct timeout_task { struct taskqueue *q; struct task t; struct callout c; int f; }; enum taskqueue_callback_type { TASKQUEUE_CALLBACK_TYPE_INIT, TASKQUEUE_CALLBACK_TYPE_SHUTDOWN, }; #define TASKQUEUE_CALLBACK_TYPE_MIN TASKQUEUE_CALLBACK_TYPE_INIT #define TASKQUEUE_CALLBACK_TYPE_MAX TASKQUEUE_CALLBACK_TYPE_SHUTDOWN #define TASKQUEUE_NUM_CALLBACKS TASKQUEUE_CALLBACK_TYPE_MAX + 1 #define TASKQUEUE_NAMELEN 32 +/* taskqueue_enqueue flags */ +#define TASKQUEUE_FAIL_IF_PENDING (1 << 0) +#define TASKQUEUE_FAIL_IF_CANCELING (1 << 1) + typedef void (*taskqueue_callback_fn)(void *context); /* * A notification callback function which is called from * taskqueue_enqueue(). The context argument is given in the call to * taskqueue_create(). This function would normally be used to allow the * queue to arrange to run itself later (e.g., by scheduling a software * interrupt or waking a kernel thread). */ typedef void (*taskqueue_enqueue_fn)(void *context); struct taskqueue *taskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context); int taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, const char *name, ...) __printflike(4, 5); int taskqueue_start_threads_in_proc(struct taskqueue **tqp, int count, int pri, struct proc *p, const char *name, ...) __printflike(5, 6); int taskqueue_start_threads_cpuset(struct taskqueue **tqp, int count, int pri, cpuset_t *mask, const char *name, ...) __printflike(5, 6); int taskqueue_enqueue(struct taskqueue *queue, struct task *task); +int taskqueue_enqueue_flags(struct taskqueue *queue, struct task *task, + int flags); int taskqueue_enqueue_timeout(struct taskqueue *queue, struct timeout_task *timeout_task, int ticks); int taskqueue_enqueue_timeout_sbt(struct taskqueue *queue, struct timeout_task *timeout_task, sbintime_t sbt, sbintime_t pr, int flags); int taskqueue_poll_is_busy(struct taskqueue *queue, struct task *task); int taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp); int taskqueue_cancel_timeout(struct taskqueue *queue, struct timeout_task *timeout_task, u_int *pendp); void taskqueue_drain(struct taskqueue *queue, struct task *task); void taskqueue_drain_timeout(struct taskqueue *queue, struct timeout_task *timeout_task); void taskqueue_drain_all(struct taskqueue *queue); void taskqueue_quiesce(struct taskqueue *queue); void taskqueue_free(struct taskqueue *queue); void taskqueue_run(struct taskqueue *queue); void taskqueue_block(struct taskqueue *queue); void taskqueue_unblock(struct taskqueue *queue); int taskqueue_member(struct taskqueue *queue, struct thread *td); void taskqueue_set_callback(struct taskqueue *queue, enum taskqueue_callback_type cb_type, taskqueue_callback_fn callback, void *context); #define TASK_INITIALIZER(priority, func, context) \ { .ta_priority = (priority), \ .ta_func = (func), \ .ta_context = (context) } /* * Functions for dedicated thread taskqueues */ void taskqueue_thread_loop(void *arg); void taskqueue_thread_enqueue(void *context); /* * Initialise a task structure. 
*/ #define TASK_INIT_FLAGS(task, priority, func, context, flags) do { \ (task)->ta_pending = 0; \ (task)->ta_priority = (priority); \ (task)->ta_flags = (flags); \ (task)->ta_func = (func); \ (task)->ta_context = (context); \ } while (0) #define TASK_INIT(t, p, f, c) TASK_INIT_FLAGS(t, p, f, c, 0) void _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task, int priority, task_fn_t func, void *context); #define TIMEOUT_TASK_INIT(queue, timeout_task, priority, func, context) do { \ _Static_assert((priority) >= 0 && (priority) <= 255, \ "struct task priority is 8 bit in size"); \ _timeout_task_init(queue, timeout_task, priority, func, context); \ } while (0) /* * Declare a reference to a taskqueue. */ #define TASKQUEUE_DECLARE(name) \ extern struct taskqueue *taskqueue_##name /* * Define and initialise a global taskqueue that uses sleep mutexes. */ #define TASKQUEUE_DEFINE(name, enqueue, context, init) \ \ struct taskqueue *taskqueue_##name; \ \ static void \ taskqueue_define_##name(void *arg) \ { \ taskqueue_##name = \ taskqueue_create(#name, M_WAITOK, (enqueue), (context)); \ init; \ } \ \ SYSINIT(taskqueue_##name, SI_SUB_TASKQ, SI_ORDER_SECOND, \ taskqueue_define_##name, NULL); \ \ struct __hack #define TASKQUEUE_DEFINE_THREAD(name) \ TASKQUEUE_DEFINE(name, taskqueue_thread_enqueue, &taskqueue_##name, \ taskqueue_start_threads(&taskqueue_##name, 1, PWAIT, \ "%s taskq", #name)) /* * Define and initialise a global taskqueue that uses spin mutexes. */ #define TASKQUEUE_FAST_DEFINE(name, enqueue, context, init) \ \ struct taskqueue *taskqueue_##name; \ \ static void \ taskqueue_define_##name(void *arg) \ { \ taskqueue_##name = \ taskqueue_create_fast(#name, M_WAITOK, (enqueue), \ (context)); \ init; \ } \ \ SYSINIT(taskqueue_##name, SI_SUB_TASKQ, SI_ORDER_SECOND, \ taskqueue_define_##name, NULL); \ \ struct __hack #define TASKQUEUE_FAST_DEFINE_THREAD(name) \ TASKQUEUE_FAST_DEFINE(name, taskqueue_thread_enqueue, \ &taskqueue_##name, taskqueue_start_threads(&taskqueue_##name, \ 1, PWAIT, "%s taskq", #name)) /* * These queues are serviced by software interrupt handlers. To enqueue * a task, call taskqueue_enqueue(taskqueue_swi, &task) or * taskqueue_enqueue(taskqueue_swi_giant, &task). */ TASKQUEUE_DECLARE(swi_giant); TASKQUEUE_DECLARE(swi); /* * This queue is serviced by a kernel thread. To enqueue a task, call * taskqueue_enqueue(taskqueue_thread, &task). */ TASKQUEUE_DECLARE(thread); /* * Queue for swi handlers dispatched from fast interrupt handlers. * These are necessarily different from the above because the queue * must be locked with spinlocks since sleep mutex's cannot be used * from a fast interrupt handler context. */ TASKQUEUE_DECLARE(fast); struct taskqueue *taskqueue_create_fast(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context); #endif /* !_SYS_TASKQUEUE_H_ */
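For illustration, a hypothetical consumer of the kthread_worker interface added above; my_work_fn, my_attach, my_kick and my_detach are placeholder names, not part of the patch.

/* Assumes the declarations brought in by the LinuxKPI <linux/kthread.h>. */
static struct kthread_worker *my_worker;
static struct kthread_work my_work;

static void
my_work_fn(struct kthread_work *work)
{
	/* Runs in the worker's dedicated taskqueue thread. */
}

static void
my_attach(void)
{
	/* The macro allocates with M_WAITOK, so it cannot fail here. */
	my_worker = kthread_create_worker(0, "my worker");
	kthread_init_work(&my_work, my_work_fn);
}

static void
my_kick(void)
{
	/* Returns false if the work is already pending or being canceled. */
	(void)kthread_queue_work(my_worker, &my_work);
}

static void
my_detach(void)
{
	(void)kthread_cancel_work_sync(&my_work);
	kthread_flush_worker(my_worker);
	kthread_destroy_worker(my_worker);
}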