diff --git a/include/os/linux/spl/sys/taskq.h b/include/os/linux/spl/sys/taskq.h index 2a6cd8283d16..6c1b4377a98a 100644 --- a/include/os/linux/spl/sys/taskq.h +++ b/include/os/linux/spl/sys/taskq.h @@ -1,169 +1,170 @@ /* * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC. * Copyright (C) 2007 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Brian Behlendorf . * UCRL-CODE-235197 * * This file is part of the SPL, Solaris Porting Layer. * * The SPL is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2 of the License, or (at your * option) any later version. * * The SPL is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. * * You should have received a copy of the GNU General Public License along * with the SPL. If not, see . */ #ifndef _SPL_TASKQ_H #define _SPL_TASKQ_H #include #include #include #include #include #include #include #include #include #define TASKQ_NAMELEN 31 #define TASKQ_PREPOPULATE 0x00000001 #define TASKQ_CPR_SAFE 0x00000002 #define TASKQ_DYNAMIC 0x00000004 #define TASKQ_THREADS_CPU_PCT 0x00000008 #define TASKQ_DC_BATCH 0x00000010 #define TASKQ_ACTIVE 0x80000000 /* * Flags for taskq_dispatch. TQ_SLEEP/TQ_NOSLEEP should be same as * KM_SLEEP/KM_NOSLEEP. TQ_NOQUEUE/TQ_NOALLOC are set particularly * large so as not to conflict with already used GFP_* defines. */ #define TQ_SLEEP 0x00000000 #define TQ_NOSLEEP 0x00000001 #define TQ_PUSHPAGE 0x00000002 #define TQ_NOQUEUE 0x01000000 #define TQ_NOALLOC 0x02000000 #define TQ_NEW 0x04000000 #define TQ_FRONT 0x08000000 /* * Reserved taskqid values. */ #define TASKQID_INVALID ((taskqid_t)0) #define TASKQID_INITIAL ((taskqid_t)1) /* * spin_lock(lock) and spin_lock_nested(lock,0) are equivalent, * so TQ_LOCK_DYNAMIC must not evaluate to 0 */ typedef enum tq_lock_role { TQ_LOCK_GENERAL = 0, TQ_LOCK_DYNAMIC = 1, } tq_lock_role_t; typedef unsigned long taskqid_t; typedef void (task_func_t)(void *); typedef struct taskq { spinlock_t tq_lock; /* protects taskq_t */ char *tq_name; /* taskq name */ int tq_instance; /* instance of tq_name */ struct list_head tq_thread_list; /* list of all threads */ struct list_head tq_active_list; /* list of active threads */ int tq_nactive; /* # of active threads */ int tq_nthreads; /* # of existing threads */ int tq_nspawn; /* # of threads being spawned */ int tq_maxthreads; /* # of threads maximum */ /* If PERCPU flag is set, percent of NCPUs to have as threads */ int tq_cpu_pct; int tq_pri; /* priority */ int tq_minalloc; /* min taskq_ent_t pool size */ int tq_maxalloc; /* max taskq_ent_t pool size */ int tq_nalloc; /* cur taskq_ent_t pool size */ uint_t tq_flags; /* flags */ taskqid_t tq_next_id; /* next pend/work id */ taskqid_t tq_lowest_id; /* lowest pend/work id */ struct list_head tq_free_list; /* free taskq_ent_t's */ struct list_head tq_pend_list; /* pending taskq_ent_t's */ struct list_head tq_prio_list; /* priority taskq_ent_t's */ struct list_head tq_delay_list; /* delayed taskq_ent_t's */ struct list_head tq_taskqs; /* all taskq_t's */ spl_wait_queue_head_t tq_work_waitq; /* new work waitq */ spl_wait_queue_head_t tq_wait_waitq; /* wait waitq */ tq_lock_role_t tq_lock_class; /* class when taking tq_lock */ /* list node for the cpu hotplug callback */ struct hlist_node tq_hp_cb_node; boolean_t tq_hp_support; + unsigned long lastshouldstop; /* when to purge dynamic */ } taskq_t; typedef struct taskq_ent { spinlock_t tqent_lock; spl_wait_queue_head_t tqent_waitq; struct timer_list tqent_timer; struct list_head tqent_list; taskqid_t tqent_id; task_func_t *tqent_func; void *tqent_arg; taskq_t *tqent_taskq; uintptr_t tqent_flags; unsigned long tqent_birth; } taskq_ent_t; #define TQENT_FLAG_PREALLOC 0x1 #define TQENT_FLAG_CANCEL 0x2 typedef struct taskq_thread { struct list_head tqt_thread_list; struct list_head tqt_active_list; struct task_struct *tqt_thread; taskq_t *tqt_tq; taskqid_t tqt_id; taskq_ent_t *tqt_task; uintptr_t tqt_flags; } taskq_thread_t; /* Global system-wide dynamic task queue available for all consumers */ extern taskq_t *system_taskq; /* Global dynamic task queue for long delay */ extern taskq_t *system_delay_taskq; /* List of all taskqs */ extern struct list_head tq_list; extern struct rw_semaphore tq_list_sem; extern taskqid_t taskq_dispatch(taskq_t *, task_func_t, void *, uint_t); extern taskqid_t taskq_dispatch_delay(taskq_t *, task_func_t, void *, uint_t, clock_t); extern void taskq_dispatch_ent(taskq_t *, task_func_t, void *, uint_t, taskq_ent_t *); extern int taskq_empty_ent(taskq_ent_t *); extern void taskq_init_ent(taskq_ent_t *); extern taskq_t *taskq_create(const char *, int, pri_t, int, int, uint_t); extern void taskq_destroy(taskq_t *); extern void taskq_wait_id(taskq_t *, taskqid_t); extern void taskq_wait_outstanding(taskq_t *, taskqid_t); extern void taskq_wait(taskq_t *); extern int taskq_cancel_id(taskq_t *, taskqid_t); extern int taskq_member(taskq_t *, kthread_t *); extern taskq_t *taskq_of_curthread(void); #define taskq_create_proc(name, nthreads, pri, min, max, proc, flags) \ taskq_create(name, nthreads, pri, min, max, flags) #define taskq_create_sysdc(name, nthreads, min, max, proc, dc, flags) \ ((void) sizeof (dc), \ taskq_create(name, nthreads, maxclsyspri, min, max, flags)) int spl_taskq_init(void); void spl_taskq_fini(void); #endif /* _SPL_TASKQ_H */ diff --git a/man/man4/spl.4 b/man/man4/spl.4 index 02efaf16dc3a..82455fb53254 100644 --- a/man/man4/spl.4 +++ b/man/man4/spl.4 @@ -1,196 +1,211 @@ .\" .\" The contents of this file are subject to the terms of the Common Development .\" and Distribution License (the "License"). You may not use this file except .\" in compliance with the License. You can obtain a copy of the license at .\" usr/src/OPENSOLARIS.LICENSE or https://opensource.org/licenses/CDDL-1.0. .\" .\" See the License for the specific language governing permissions and .\" limitations under the License. When distributing Covered Code, include this .\" CDDL HEADER in each file and include the License file at .\" usr/src/OPENSOLARIS.LICENSE. If applicable, add the following below this .\" CDDL HEADER, with the fields enclosed by brackets "[]" replaced with your .\" own identifying information: .\" Portions Copyright [yyyy] [name of copyright owner] .\" .\" Copyright 2013 Turbo Fredriksson . All rights reserved. .\" .Dd August 24, 2020 .Dt SPL 4 .Os . .Sh NAME .Nm spl .Nd parameters of the SPL kernel module . .Sh DESCRIPTION .Bl -tag -width Ds .It Sy spl_kmem_cache_kmem_threads Ns = Ns Sy 4 Pq uint The number of threads created for the spl_kmem_cache task queue. This task queue is responsible for allocating new slabs for use by the kmem caches. For the majority of systems and workloads only a small number of threads are required. . .It Sy spl_kmem_cache_reclaim Ns = Ns Sy 0 Pq uint When this is set it prevents Linux from being able to rapidly reclaim all the memory held by the kmem caches. This may be useful in circumstances where it's preferable that Linux reclaim memory from some other subsystem first. Setting this will increase the likelihood out of memory events on a memory constrained system. . .It Sy spl_kmem_cache_obj_per_slab Ns = Ns Sy 8 Pq uint The preferred number of objects per slab in the cache. In general, a larger value will increase the caches memory footprint while decreasing the time required to perform an allocation. Conversely, a smaller value will minimize the footprint and improve cache reclaim time but individual allocations may take longer. . .It Sy spl_kmem_cache_max_size Ns = Ns Sy 32 Po 64-bit Pc or Sy 4 Po 32-bit Pc Pq uint The maximum size of a kmem cache slab in MiB. This effectively limits the maximum cache object size to .Sy spl_kmem_cache_max_size Ns / Ns Sy spl_kmem_cache_obj_per_slab . .Pp Caches may not be created with object sized larger than this limit. . .It Sy spl_kmem_cache_slab_limit Ns = Ns Sy 16384 Pq uint For small objects the Linux slab allocator should be used to make the most efficient use of the memory. However, large objects are not supported by the Linux slab and therefore the SPL implementation is preferred. This value is used to determine the cutoff between a small and large object. .Pp Objects of size .Sy spl_kmem_cache_slab_limit or smaller will be allocated using the Linux slab allocator, large objects use the SPL allocator. A cutoff of 16K was determined to be optimal for architectures using 4K pages. . .It Sy spl_kmem_alloc_warn Ns = Ns Sy 32768 Pq uint As a general rule .Fn kmem_alloc allocations should be small, preferably just a few pages, since they must by physically contiguous. Therefore, a rate limited warning will be printed to the console for any .Fn kmem_alloc which exceeds a reasonable threshold. .Pp The default warning threshold is set to eight pages but capped at 32K to accommodate systems using large pages. This value was selected to be small enough to ensure the largest allocations are quickly noticed and fixed. But large enough to avoid logging any warnings when a allocation size is larger than optimal but not a serious concern. Since this value is tunable, developers are encouraged to set it lower when testing so any new largish allocations are quickly caught. These warnings may be disabled by setting the threshold to zero. . .It Sy spl_kmem_alloc_max Ns = Ns Sy KMALLOC_MAX_SIZE Ns / Ns Sy 4 Pq uint Large .Fn kmem_alloc allocations will fail if they exceed .Sy KMALLOC_MAX_SIZE . Allocations which are marginally smaller than this limit may succeed but should still be avoided due to the expense of locating a contiguous range of free pages. Therefore, a maximum kmem size with reasonable safely margin of 4x is set. .Fn kmem_alloc allocations larger than this maximum will quickly fail. .Fn vmem_alloc allocations less than or equal to this value will use .Fn kmalloc , but shift to .Fn vmalloc when exceeding this value. . .It Sy spl_kmem_cache_magazine_size Ns = Ns Sy 0 Pq uint Cache magazines are an optimization designed to minimize the cost of allocating memory. They do this by keeping a per-cpu cache of recently freed objects, which can then be reallocated without taking a lock. This can improve performance on highly contended caches. However, because objects in magazines will prevent otherwise empty slabs from being immediately released this may not be ideal for low memory machines. .Pp For this reason, .Sy spl_kmem_cache_magazine_size can be used to set a maximum magazine size. When this value is set to 0 the magazine size will be automatically determined based on the object size. Otherwise magazines will be limited to 2-256 objects per magazine (i.e per cpu). Magazines may never be entirely disabled in this implementation. . .It Sy spl_hostid Ns = Ns Sy 0 Pq ulong The system hostid, when set this can be used to uniquely identify a system. By default this value is set to zero which indicates the hostid is disabled. It can be explicitly enabled by placing a unique non-zero value in .Pa /etc/hostid . . .It Sy spl_hostid_path Ns = Ns Pa /etc/hostid Pq charp The expected path to locate the system hostid when specified. This value may be overridden for non-standard configurations. . .It Sy spl_panic_halt Ns = Ns Sy 0 Pq uint Cause a kernel panic on assertion failures. When not enabled, the thread is halted to facilitate further debugging. .Pp Set to a non-zero value to enable. . .It Sy spl_taskq_kick Ns = Ns Sy 0 Pq uint Kick stuck taskq to spawn threads. When writing a non-zero value to it, it will scan all the taskqs. If any of them have a pending task more than 5 seconds old, it will kick it to spawn more threads. This can be used if you find a rare deadlock occurs because one or more taskqs didn't spawn a thread when it should. . .It Sy spl_taskq_thread_bind Ns = Ns Sy 0 Pq int Bind taskq threads to specific CPUs. When enabled all taskq threads will be distributed evenly across the available CPUs. By default, this behavior is disabled to allow the Linux scheduler the maximum flexibility to determine where a thread should run. . .It Sy spl_taskq_thread_dynamic Ns = Ns Sy 1 Pq int Allow dynamic taskqs. When enabled taskqs which set the .Sy TASKQ_DYNAMIC flag will by default create only a single thread. New threads will be created on demand up to a maximum allowed number to facilitate the completion of outstanding tasks. Threads which are no longer needed will be promptly destroyed. By default this behavior is enabled but it can be disabled to aid performance analysis or troubleshooting. . .It Sy spl_taskq_thread_priority Ns = Ns Sy 1 Pq int Allow newly created taskq threads to set a non-default scheduler priority. When enabled, the priority specified when a taskq is created will be applied to all threads created by that taskq. When disabled all threads will use the default Linux kernel thread priority. By default, this behavior is enabled. . .It Sy spl_taskq_thread_sequential Ns = Ns Sy 4 Pq int The number of items a taskq worker thread must handle without interruption before requesting a new worker thread be spawned. This is used to control how quickly taskqs ramp up the number of threads processing the queue. Because Linux thread creation and destruction are relatively inexpensive a small default value has been selected. This means that normally threads will be created aggressively which is desirable. Increasing this value will result in a slower thread creation rate which may be preferable for some configurations. . .It Sy spl_max_show_tasks Ns = Ns Sy 512 Pq uint The maximum number of tasks per pending list in each taskq shown in .Pa /proc/spl/taskq{,-all} . Write .Sy 0 to turn off the limit. The proc file will walk the lists with lock held, reading it could cause a lock-up if the list grow too large without limiting the output. "(truncated)" will be shown if the list is larger than the limit. +. +.It Sy spl_taskq_thread_timeout_ms Ns = Ns Sy 10000 Pq uint +(Linux-only) +How long a taskq has to have had no work before we tear it down. +Previously, we would tear down a dynamic taskq worker as soon +as we noticed it had no work, but it was observed that this led +to a lot of churn in tearing down things we then immediately +spawned anew. +In practice, it seems any nonzero value will remove the vast +majority of this churn, while the nontrivially larger value +was chosen to help filter out the little remaining churn on +a mostly idle system. +Setting this value to +.Sy 0 +will revert to the previous behavior. .El diff --git a/module/os/linux/spl/spl-taskq.c b/module/os/linux/spl/spl-taskq.c index 84497359ce2e..d18f935b167c 100644 --- a/module/os/linux/spl/spl-taskq.c +++ b/module/os/linux/spl/spl-taskq.c @@ -1,1434 +1,1466 @@ /* * Copyright (C) 2007-2010 Lawrence Livermore National Security, LLC. * Copyright (C) 2007 The Regents of the University of California. * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER). * Written by Brian Behlendorf . * UCRL-CODE-235197 * * This file is part of the SPL, Solaris Porting Layer. * * The SPL is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by the * Free Software Foundation; either version 2 of the License, or (at your * option) any later version. * * The SPL is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * for more details. * * You should have received a copy of the GNU General Public License along * with the SPL. If not, see . * * Solaris Porting Layer (SPL) Task Queue Implementation. */ #include #include #include #include #include #ifdef HAVE_CPU_HOTPLUG #include #endif static int spl_taskq_thread_bind = 0; module_param(spl_taskq_thread_bind, int, 0644); MODULE_PARM_DESC(spl_taskq_thread_bind, "Bind taskq thread to CPU by default"); +static uint_t spl_taskq_thread_timeout_ms = 10000; +/* BEGIN CSTYLED */ +module_param(spl_taskq_thread_timeout_ms, uint, 0644); +/* END CSTYLED */ +MODULE_PARM_DESC(spl_taskq_thread_timeout_ms, + "Time to require a dynamic thread be idle before it gets cleaned up"); static int spl_taskq_thread_dynamic = 1; module_param(spl_taskq_thread_dynamic, int, 0444); MODULE_PARM_DESC(spl_taskq_thread_dynamic, "Allow dynamic taskq threads"); static int spl_taskq_thread_priority = 1; module_param(spl_taskq_thread_priority, int, 0644); MODULE_PARM_DESC(spl_taskq_thread_priority, "Allow non-default priority for taskq threads"); static uint_t spl_taskq_thread_sequential = 4; /* BEGIN CSTYLED */ module_param(spl_taskq_thread_sequential, uint, 0644); /* END CSTYLED */ MODULE_PARM_DESC(spl_taskq_thread_sequential, "Create new taskq threads after N sequential tasks"); /* * Global system-wide dynamic task queue available for all consumers. This * taskq is not intended for long-running tasks; instead, a dedicated taskq * should be created. */ taskq_t *system_taskq; EXPORT_SYMBOL(system_taskq); /* Global dynamic task queue for long delay */ taskq_t *system_delay_taskq; EXPORT_SYMBOL(system_delay_taskq); /* Private dedicated taskq for creating new taskq threads on demand. */ static taskq_t *dynamic_taskq; static taskq_thread_t *taskq_thread_create(taskq_t *); #ifdef HAVE_CPU_HOTPLUG /* Multi-callback id for cpu hotplugging. */ static int spl_taskq_cpuhp_state; #endif /* List of all taskqs */ LIST_HEAD(tq_list); struct rw_semaphore tq_list_sem; static uint_t taskq_tsd; static int task_km_flags(uint_t flags) { if (flags & TQ_NOSLEEP) return (KM_NOSLEEP); if (flags & TQ_PUSHPAGE) return (KM_PUSHPAGE); return (KM_SLEEP); } /* * taskq_find_by_name - Find the largest instance number of a named taskq. */ static int taskq_find_by_name(const char *name) { struct list_head *tql = NULL; taskq_t *tq; list_for_each_prev(tql, &tq_list) { tq = list_entry(tql, taskq_t, tq_taskqs); if (strcmp(name, tq->tq_name) == 0) return (tq->tq_instance); } return (-1); } /* * NOTE: Must be called with tq->tq_lock held, returns a list_t which * is not attached to the free, work, or pending taskq lists. */ static taskq_ent_t * task_alloc(taskq_t *tq, uint_t flags, unsigned long *irqflags) { taskq_ent_t *t; int count = 0; ASSERT(tq); retry: /* Acquire taskq_ent_t's from free list if available */ if (!list_empty(&tq->tq_free_list) && !(flags & TQ_NEW)) { t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); ASSERT(!(t->tqent_flags & TQENT_FLAG_CANCEL)); ASSERT(!timer_pending(&t->tqent_timer)); list_del_init(&t->tqent_list); return (t); } /* Free list is empty and memory allocations are prohibited */ if (flags & TQ_NOALLOC) return (NULL); /* Hit maximum taskq_ent_t pool size */ if (tq->tq_nalloc >= tq->tq_maxalloc) { if (flags & TQ_NOSLEEP) return (NULL); /* * Sleep periodically polling the free list for an available * taskq_ent_t. Dispatching with TQ_SLEEP should always succeed * but we cannot block forever waiting for an taskq_ent_t to * show up in the free list, otherwise a deadlock can happen. * * Therefore, we need to allocate a new task even if the number * of allocated tasks is above tq->tq_maxalloc, but we still * end up delaying the task allocation by one second, thereby * throttling the task dispatch rate. */ spin_unlock_irqrestore(&tq->tq_lock, *irqflags); schedule_timeout(HZ / 100); spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class); if (count < 100) { count++; goto retry; } } spin_unlock_irqrestore(&tq->tq_lock, *irqflags); t = kmem_alloc(sizeof (taskq_ent_t), task_km_flags(flags)); spin_lock_irqsave_nested(&tq->tq_lock, *irqflags, tq->tq_lock_class); if (t) { taskq_init_ent(t); tq->tq_nalloc++; } return (t); } /* * NOTE: Must be called with tq->tq_lock held, expects the taskq_ent_t * to already be removed from the free, work, or pending taskq lists. */ static void task_free(taskq_t *tq, taskq_ent_t *t) { ASSERT(tq); ASSERT(t); ASSERT(list_empty(&t->tqent_list)); ASSERT(!timer_pending(&t->tqent_timer)); kmem_free(t, sizeof (taskq_ent_t)); tq->tq_nalloc--; } /* * NOTE: Must be called with tq->tq_lock held, either destroys the * taskq_ent_t if too many exist or moves it to the free list for later use. */ static void task_done(taskq_t *tq, taskq_ent_t *t) { ASSERT(tq); ASSERT(t); /* Wake tasks blocked in taskq_wait_id() */ wake_up_all(&t->tqent_waitq); list_del_init(&t->tqent_list); if (tq->tq_nalloc <= tq->tq_minalloc) { t->tqent_id = TASKQID_INVALID; t->tqent_func = NULL; t->tqent_arg = NULL; t->tqent_flags = 0; list_add_tail(&t->tqent_list, &tq->tq_free_list); } else { task_free(tq, t); } } /* * When a delayed task timer expires remove it from the delay list and * add it to the priority list in order for immediate processing. */ static void task_expire_impl(taskq_ent_t *t) { taskq_ent_t *w; taskq_t *tq = t->tqent_taskq; struct list_head *l = NULL; unsigned long flags; spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); if (t->tqent_flags & TQENT_FLAG_CANCEL) { ASSERT(list_empty(&t->tqent_list)); spin_unlock_irqrestore(&tq->tq_lock, flags); return; } t->tqent_birth = jiffies; DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); /* * The priority list must be maintained in strict task id order * from lowest to highest for lowest_id to be easily calculable. */ list_del(&t->tqent_list); list_for_each_prev(l, &tq->tq_prio_list) { w = list_entry(l, taskq_ent_t, tqent_list); if (w->tqent_id < t->tqent_id) { list_add(&t->tqent_list, l); break; } } if (l == &tq->tq_prio_list) list_add(&t->tqent_list, &tq->tq_prio_list); spin_unlock_irqrestore(&tq->tq_lock, flags); wake_up(&tq->tq_work_waitq); } static void task_expire(spl_timer_list_t tl) { struct timer_list *tmr = (struct timer_list *)tl; taskq_ent_t *t = from_timer(t, tmr, tqent_timer); task_expire_impl(t); } /* * Returns the lowest incomplete taskqid_t. The taskqid_t may * be queued on the pending list, on the priority list, on the * delay list, or on the work list currently being handled, but * it is not 100% complete yet. */ static taskqid_t taskq_lowest_id(taskq_t *tq) { taskqid_t lowest_id = tq->tq_next_id; taskq_ent_t *t; taskq_thread_t *tqt; if (!list_empty(&tq->tq_pend_list)) { t = list_entry(tq->tq_pend_list.next, taskq_ent_t, tqent_list); lowest_id = MIN(lowest_id, t->tqent_id); } if (!list_empty(&tq->tq_prio_list)) { t = list_entry(tq->tq_prio_list.next, taskq_ent_t, tqent_list); lowest_id = MIN(lowest_id, t->tqent_id); } if (!list_empty(&tq->tq_delay_list)) { t = list_entry(tq->tq_delay_list.next, taskq_ent_t, tqent_list); lowest_id = MIN(lowest_id, t->tqent_id); } if (!list_empty(&tq->tq_active_list)) { tqt = list_entry(tq->tq_active_list.next, taskq_thread_t, tqt_active_list); ASSERT(tqt->tqt_id != TASKQID_INVALID); lowest_id = MIN(lowest_id, tqt->tqt_id); } return (lowest_id); } /* * Insert a task into a list keeping the list sorted by increasing taskqid. */ static void taskq_insert_in_order(taskq_t *tq, taskq_thread_t *tqt) { taskq_thread_t *w; struct list_head *l = NULL; ASSERT(tq); ASSERT(tqt); list_for_each_prev(l, &tq->tq_active_list) { w = list_entry(l, taskq_thread_t, tqt_active_list); if (w->tqt_id < tqt->tqt_id) { list_add(&tqt->tqt_active_list, l); break; } } if (l == &tq->tq_active_list) list_add(&tqt->tqt_active_list, &tq->tq_active_list); } /* * Find and return a task from the given list if it exists. The list * must be in lowest to highest task id order. */ static taskq_ent_t * taskq_find_list(taskq_t *tq, struct list_head *lh, taskqid_t id) { struct list_head *l = NULL; taskq_ent_t *t; list_for_each(l, lh) { t = list_entry(l, taskq_ent_t, tqent_list); if (t->tqent_id == id) return (t); if (t->tqent_id > id) break; } return (NULL); } /* * Find an already dispatched task given the task id regardless of what * state it is in. If a task is still pending it will be returned. * If a task is executing, then -EBUSY will be returned instead. * If the task has already been run then NULL is returned. */ static taskq_ent_t * taskq_find(taskq_t *tq, taskqid_t id) { taskq_thread_t *tqt; struct list_head *l = NULL; taskq_ent_t *t; t = taskq_find_list(tq, &tq->tq_delay_list, id); if (t) return (t); t = taskq_find_list(tq, &tq->tq_prio_list, id); if (t) return (t); t = taskq_find_list(tq, &tq->tq_pend_list, id); if (t) return (t); list_for_each(l, &tq->tq_active_list) { tqt = list_entry(l, taskq_thread_t, tqt_active_list); if (tqt->tqt_id == id) { /* * Instead of returning tqt_task, we just return a non * NULL value to prevent misuse, since tqt_task only * has two valid fields. */ return (ERR_PTR(-EBUSY)); } } return (NULL); } /* * Theory for the taskq_wait_id(), taskq_wait_outstanding(), and * taskq_wait() functions below. * * Taskq waiting is accomplished by tracking the lowest outstanding task * id and the next available task id. As tasks are dispatched they are * added to the tail of the pending, priority, or delay lists. As worker * threads become available the tasks are removed from the heads of these * lists and linked to the worker threads. This ensures the lists are * kept sorted by lowest to highest task id. * * Therefore the lowest outstanding task id can be quickly determined by * checking the head item from all of these lists. This value is stored * with the taskq as the lowest id. It only needs to be recalculated when * either the task with the current lowest id completes or is canceled. * * By blocking until the lowest task id exceeds the passed task id the * taskq_wait_outstanding() function can be easily implemented. Similarly, * by blocking until the lowest task id matches the next task id taskq_wait() * can be implemented. * * Callers should be aware that when there are multiple worked threads it * is possible for larger task ids to complete before smaller ones. Also * when the taskq contains delay tasks with small task ids callers may * block for a considerable length of time waiting for them to expire and * execute. */ static int taskq_wait_id_check(taskq_t *tq, taskqid_t id) { int rc; unsigned long flags; spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); rc = (taskq_find(tq, id) == NULL); spin_unlock_irqrestore(&tq->tq_lock, flags); return (rc); } /* * The taskq_wait_id() function blocks until the passed task id completes. * This does not guarantee that all lower task ids have completed. */ void taskq_wait_id(taskq_t *tq, taskqid_t id) { wait_event(tq->tq_wait_waitq, taskq_wait_id_check(tq, id)); } EXPORT_SYMBOL(taskq_wait_id); static int taskq_wait_outstanding_check(taskq_t *tq, taskqid_t id) { int rc; unsigned long flags; spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); rc = (id < tq->tq_lowest_id); spin_unlock_irqrestore(&tq->tq_lock, flags); return (rc); } /* * The taskq_wait_outstanding() function will block until all tasks with a * lower taskqid than the passed 'id' have been completed. Note that all * task id's are assigned monotonically at dispatch time. Zero may be * passed for the id to indicate all tasks dispatch up to this point, * but not after, should be waited for. */ void taskq_wait_outstanding(taskq_t *tq, taskqid_t id) { id = id ? id : tq->tq_next_id - 1; wait_event(tq->tq_wait_waitq, taskq_wait_outstanding_check(tq, id)); } EXPORT_SYMBOL(taskq_wait_outstanding); static int taskq_wait_check(taskq_t *tq) { int rc; unsigned long flags; spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); rc = (tq->tq_lowest_id == tq->tq_next_id); spin_unlock_irqrestore(&tq->tq_lock, flags); return (rc); } /* * The taskq_wait() function will block until the taskq is empty. * This means that if a taskq re-dispatches work to itself taskq_wait() * callers will block indefinitely. */ void taskq_wait(taskq_t *tq) { wait_event(tq->tq_wait_waitq, taskq_wait_check(tq)); } EXPORT_SYMBOL(taskq_wait); int taskq_member(taskq_t *tq, kthread_t *t) { return (tq == (taskq_t *)tsd_get_by_thread(taskq_tsd, t)); } EXPORT_SYMBOL(taskq_member); taskq_t * taskq_of_curthread(void) { return (tsd_get(taskq_tsd)); } EXPORT_SYMBOL(taskq_of_curthread); /* * Cancel an already dispatched task given the task id. Still pending tasks * will be immediately canceled, and if the task is active the function will * block until it completes. Preallocated tasks which are canceled must be * freed by the caller. */ int taskq_cancel_id(taskq_t *tq, taskqid_t id) { taskq_ent_t *t; int rc = ENOENT; unsigned long flags; ASSERT(tq); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); t = taskq_find(tq, id); if (t && t != ERR_PTR(-EBUSY)) { list_del_init(&t->tqent_list); t->tqent_flags |= TQENT_FLAG_CANCEL; /* * When canceling the lowest outstanding task id we * must recalculate the new lowest outstanding id. */ if (tq->tq_lowest_id == t->tqent_id) { tq->tq_lowest_id = taskq_lowest_id(tq); ASSERT3S(tq->tq_lowest_id, >, t->tqent_id); } /* * The task_expire() function takes the tq->tq_lock so drop * drop the lock before synchronously cancelling the timer. */ if (timer_pending(&t->tqent_timer)) { spin_unlock_irqrestore(&tq->tq_lock, flags); del_timer_sync(&t->tqent_timer); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); } if (!(t->tqent_flags & TQENT_FLAG_PREALLOC)) task_done(tq, t); rc = 0; } spin_unlock_irqrestore(&tq->tq_lock, flags); if (t == ERR_PTR(-EBUSY)) { taskq_wait_id(tq, id); rc = EBUSY; } return (rc); } EXPORT_SYMBOL(taskq_cancel_id); static int taskq_thread_spawn(taskq_t *tq); taskqid_t taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags) { taskq_ent_t *t; taskqid_t rc = TASKQID_INVALID; unsigned long irqflags; ASSERT(tq); ASSERT(func); spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class); /* Taskq being destroyed and all tasks drained */ if (!(tq->tq_flags & TASKQ_ACTIVE)) goto out; /* Do not queue the task unless there is idle thread for it */ ASSERT(tq->tq_nactive <= tq->tq_nthreads); if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) { /* Dynamic taskq may be able to spawn another thread */ if (!(tq->tq_flags & TASKQ_DYNAMIC) || taskq_thread_spawn(tq) == 0) goto out; } if ((t = task_alloc(tq, flags, &irqflags)) == NULL) goto out; spin_lock(&t->tqent_lock); /* Queue to the front of the list to enforce TQ_NOQUEUE semantics */ if (flags & TQ_NOQUEUE) list_add(&t->tqent_list, &tq->tq_prio_list); /* Queue to the priority list instead of the pending list */ else if (flags & TQ_FRONT) list_add_tail(&t->tqent_list, &tq->tq_prio_list); else list_add_tail(&t->tqent_list, &tq->tq_pend_list); t->tqent_id = rc = tq->tq_next_id; tq->tq_next_id++; t->tqent_func = func; t->tqent_arg = arg; t->tqent_taskq = tq; t->tqent_timer.function = NULL; t->tqent_timer.expires = 0; t->tqent_birth = jiffies; DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); spin_unlock(&t->tqent_lock); wake_up(&tq->tq_work_waitq); out: /* Spawn additional taskq threads if required. */ if (!(flags & TQ_NOQUEUE) && tq->tq_nactive == tq->tq_nthreads) (void) taskq_thread_spawn(tq); spin_unlock_irqrestore(&tq->tq_lock, irqflags); return (rc); } EXPORT_SYMBOL(taskq_dispatch); taskqid_t taskq_dispatch_delay(taskq_t *tq, task_func_t func, void *arg, uint_t flags, clock_t expire_time) { taskqid_t rc = TASKQID_INVALID; taskq_ent_t *t; unsigned long irqflags; ASSERT(tq); ASSERT(func); spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class); /* Taskq being destroyed and all tasks drained */ if (!(tq->tq_flags & TASKQ_ACTIVE)) goto out; if ((t = task_alloc(tq, flags, &irqflags)) == NULL) goto out; spin_lock(&t->tqent_lock); /* Queue to the delay list for subsequent execution */ list_add_tail(&t->tqent_list, &tq->tq_delay_list); t->tqent_id = rc = tq->tq_next_id; tq->tq_next_id++; t->tqent_func = func; t->tqent_arg = arg; t->tqent_taskq = tq; t->tqent_timer.function = task_expire; t->tqent_timer.expires = (unsigned long)expire_time; add_timer(&t->tqent_timer); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); spin_unlock(&t->tqent_lock); out: /* Spawn additional taskq threads if required. */ if (tq->tq_nactive == tq->tq_nthreads) (void) taskq_thread_spawn(tq); spin_unlock_irqrestore(&tq->tq_lock, irqflags); return (rc); } EXPORT_SYMBOL(taskq_dispatch_delay); void taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags, taskq_ent_t *t) { unsigned long irqflags; ASSERT(tq); ASSERT(func); spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class); /* Taskq being destroyed and all tasks drained */ if (!(tq->tq_flags & TASKQ_ACTIVE)) { t->tqent_id = TASKQID_INVALID; goto out; } if ((flags & TQ_NOQUEUE) && (tq->tq_nactive == tq->tq_nthreads)) { /* Dynamic taskq may be able to spawn another thread */ if (!(tq->tq_flags & TASKQ_DYNAMIC) || taskq_thread_spawn(tq) == 0) goto out2; flags |= TQ_FRONT; } spin_lock(&t->tqent_lock); /* * Make sure the entry is not on some other taskq; it is important to * ASSERT() under lock */ ASSERT(taskq_empty_ent(t)); /* * Mark it as a prealloc'd task. This is important * to ensure that we don't free it later. */ t->tqent_flags |= TQENT_FLAG_PREALLOC; /* Queue to the priority list instead of the pending list */ if (flags & TQ_FRONT) list_add_tail(&t->tqent_list, &tq->tq_prio_list); else list_add_tail(&t->tqent_list, &tq->tq_pend_list); t->tqent_id = tq->tq_next_id; tq->tq_next_id++; t->tqent_func = func; t->tqent_arg = arg; t->tqent_taskq = tq; t->tqent_birth = jiffies; DTRACE_PROBE1(taskq_ent__birth, taskq_ent_t *, t); spin_unlock(&t->tqent_lock); wake_up(&tq->tq_work_waitq); out: /* Spawn additional taskq threads if required. */ if (tq->tq_nactive == tq->tq_nthreads) (void) taskq_thread_spawn(tq); out2: spin_unlock_irqrestore(&tq->tq_lock, irqflags); } EXPORT_SYMBOL(taskq_dispatch_ent); int taskq_empty_ent(taskq_ent_t *t) { return (list_empty(&t->tqent_list)); } EXPORT_SYMBOL(taskq_empty_ent); void taskq_init_ent(taskq_ent_t *t) { spin_lock_init(&t->tqent_lock); init_waitqueue_head(&t->tqent_waitq); timer_setup(&t->tqent_timer, NULL, 0); INIT_LIST_HEAD(&t->tqent_list); t->tqent_id = 0; t->tqent_func = NULL; t->tqent_arg = NULL; t->tqent_flags = 0; t->tqent_taskq = NULL; } EXPORT_SYMBOL(taskq_init_ent); /* * Return the next pending task, preference is given to tasks on the * priority list which were dispatched with TQ_FRONT. */ static taskq_ent_t * taskq_next_ent(taskq_t *tq) { struct list_head *list; if (!list_empty(&tq->tq_prio_list)) list = &tq->tq_prio_list; else if (!list_empty(&tq->tq_pend_list)) list = &tq->tq_pend_list; else return (NULL); return (list_entry(list->next, taskq_ent_t, tqent_list)); } /* * Spawns a new thread for the specified taskq. */ static void taskq_thread_spawn_task(void *arg) { taskq_t *tq = (taskq_t *)arg; unsigned long flags; if (taskq_thread_create(tq) == NULL) { /* restore spawning count if failed */ spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); tq->tq_nspawn--; spin_unlock_irqrestore(&tq->tq_lock, flags); } } /* * Spawn addition threads for dynamic taskqs (TASKQ_DYNAMIC) the current * number of threads is insufficient to handle the pending tasks. These * new threads must be created by the dedicated dynamic_taskq to avoid * deadlocks between thread creation and memory reclaim. The system_taskq * which is also a dynamic taskq cannot be safely used for this. */ static int taskq_thread_spawn(taskq_t *tq) { int spawning = 0; if (!(tq->tq_flags & TASKQ_DYNAMIC)) return (0); if ((tq->tq_nthreads + tq->tq_nspawn < tq->tq_maxthreads) && (tq->tq_flags & TASKQ_ACTIVE)) { spawning = (++tq->tq_nspawn); taskq_dispatch(dynamic_taskq, taskq_thread_spawn_task, tq, TQ_NOSLEEP); } return (spawning); } /* * Threads in a dynamic taskq should only exit once it has been completely * drained and no other threads are actively servicing tasks. This prevents * threads from being created and destroyed more than is required. * * The first thread is the thread list is treated as the primary thread. * There is nothing special about the primary thread but in order to avoid * all the taskq pids from changing we opt to make it long running. */ static int taskq_thread_should_stop(taskq_t *tq, taskq_thread_t *tqt) { if (!(tq->tq_flags & TASKQ_DYNAMIC)) return (0); if (list_first_entry(&(tq->tq_thread_list), taskq_thread_t, tqt_thread_list) == tqt) return (0); - return + int no_work = ((tq->tq_nspawn == 0) && /* No threads are being spawned */ (tq->tq_nactive == 0) && /* No threads are handling tasks */ (tq->tq_nthreads > 1) && /* More than 1 thread is running */ (!taskq_next_ent(tq)) && /* There are no pending tasks */ (spl_taskq_thread_dynamic)); /* Dynamic taskqs are allowed */ + + /* + * If we would have said stop before, let's instead wait a bit, maybe + * we'll see more work come our way soon... + */ + if (no_work) { + /* if it's 0, we want the old behavior. */ + /* if the taskq is being torn down, we also want to go away. */ + if (spl_taskq_thread_timeout_ms == 0 || + !(tq->tq_flags & TASKQ_ACTIVE)) + return (1); + unsigned long lasttime = tq->lastshouldstop; + if (lasttime > 0) { + if (time_after(jiffies, lasttime + + msecs_to_jiffies(spl_taskq_thread_timeout_ms))) + return (1); + else + return (0); + } else { + tq->lastshouldstop = jiffies; + } + } else { + tq->lastshouldstop = 0; + } + return (0); } static int taskq_thread(void *args) { DECLARE_WAITQUEUE(wait, current); sigset_t blocked; taskq_thread_t *tqt = args; taskq_t *tq; taskq_ent_t *t; int seq_tasks = 0; unsigned long flags; taskq_ent_t dup_task = {}; ASSERT(tqt); ASSERT(tqt->tqt_tq); tq = tqt->tqt_tq; current->flags |= PF_NOFREEZE; (void) spl_fstrans_mark(); sigfillset(&blocked); sigprocmask(SIG_BLOCK, &blocked, NULL); flush_signals(current); tsd_set(taskq_tsd, tq); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); /* * If we are dynamically spawned, decrease spawning count. Note that * we could be created during taskq_create, in which case we shouldn't * do the decrement. But it's fine because taskq_create will reset * tq_nspawn later. */ if (tq->tq_flags & TASKQ_DYNAMIC) tq->tq_nspawn--; /* Immediately exit if more threads than allowed were created. */ if (tq->tq_nthreads >= tq->tq_maxthreads) goto error; tq->tq_nthreads++; list_add_tail(&tqt->tqt_thread_list, &tq->tq_thread_list); wake_up(&tq->tq_wait_waitq); set_current_state(TASK_INTERRUPTIBLE); while (!kthread_should_stop()) { if (list_empty(&tq->tq_pend_list) && list_empty(&tq->tq_prio_list)) { if (taskq_thread_should_stop(tq, tqt)) { wake_up_all(&tq->tq_wait_waitq); break; } add_wait_queue_exclusive(&tq->tq_work_waitq, &wait); spin_unlock_irqrestore(&tq->tq_lock, flags); schedule(); seq_tasks = 0; spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); remove_wait_queue(&tq->tq_work_waitq, &wait); } else { __set_current_state(TASK_RUNNING); } if ((t = taskq_next_ent(tq)) != NULL) { list_del_init(&t->tqent_list); /* * A TQENT_FLAG_PREALLOC task may be reused or freed * during the task function call. Store tqent_id and * tqent_flags here. * * Also use an on stack taskq_ent_t for tqt_task * assignment in this case; we want to make sure * to duplicate all fields, so the values are * correct when it's accessed via DTRACE_PROBE*. */ tqt->tqt_id = t->tqent_id; tqt->tqt_flags = t->tqent_flags; if (t->tqent_flags & TQENT_FLAG_PREALLOC) { dup_task = *t; t = &dup_task; } tqt->tqt_task = t; taskq_insert_in_order(tq, tqt); tq->tq_nactive++; spin_unlock_irqrestore(&tq->tq_lock, flags); DTRACE_PROBE1(taskq_ent__start, taskq_ent_t *, t); /* Perform the requested task */ t->tqent_func(t->tqent_arg); DTRACE_PROBE1(taskq_ent__finish, taskq_ent_t *, t); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); tq->tq_nactive--; list_del_init(&tqt->tqt_active_list); tqt->tqt_task = NULL; /* For prealloc'd tasks, we don't free anything. */ if (!(tqt->tqt_flags & TQENT_FLAG_PREALLOC)) task_done(tq, t); /* * When the current lowest outstanding taskqid is * done calculate the new lowest outstanding id */ if (tq->tq_lowest_id == tqt->tqt_id) { tq->tq_lowest_id = taskq_lowest_id(tq); ASSERT3S(tq->tq_lowest_id, >, tqt->tqt_id); } /* Spawn additional taskq threads if required. */ if ((++seq_tasks) > spl_taskq_thread_sequential && taskq_thread_spawn(tq)) seq_tasks = 0; tqt->tqt_id = TASKQID_INVALID; tqt->tqt_flags = 0; wake_up_all(&tq->tq_wait_waitq); } else { if (taskq_thread_should_stop(tq, tqt)) break; } set_current_state(TASK_INTERRUPTIBLE); } __set_current_state(TASK_RUNNING); tq->tq_nthreads--; list_del_init(&tqt->tqt_thread_list); error: kmem_free(tqt, sizeof (taskq_thread_t)); spin_unlock_irqrestore(&tq->tq_lock, flags); tsd_set(taskq_tsd, NULL); thread_exit(); return (0); } static taskq_thread_t * taskq_thread_create(taskq_t *tq) { static int last_used_cpu = 0; taskq_thread_t *tqt; tqt = kmem_alloc(sizeof (*tqt), KM_PUSHPAGE); INIT_LIST_HEAD(&tqt->tqt_thread_list); INIT_LIST_HEAD(&tqt->tqt_active_list); tqt->tqt_tq = tq; tqt->tqt_id = TASKQID_INVALID; tqt->tqt_thread = spl_kthread_create(taskq_thread, tqt, "%s", tq->tq_name); if (tqt->tqt_thread == NULL) { kmem_free(tqt, sizeof (taskq_thread_t)); return (NULL); } if (spl_taskq_thread_bind) { last_used_cpu = (last_used_cpu + 1) % num_online_cpus(); kthread_bind(tqt->tqt_thread, last_used_cpu); } if (spl_taskq_thread_priority) set_user_nice(tqt->tqt_thread, PRIO_TO_NICE(tq->tq_pri)); wake_up_process(tqt->tqt_thread); return (tqt); } taskq_t * taskq_create(const char *name, int threads_arg, pri_t pri, int minalloc, int maxalloc, uint_t flags) { taskq_t *tq; taskq_thread_t *tqt; int count = 0, rc = 0, i; unsigned long irqflags; int nthreads = threads_arg; ASSERT(name != NULL); ASSERT(minalloc >= 0); ASSERT(!(flags & (TASKQ_CPR_SAFE))); /* Unsupported */ /* Scale the number of threads using nthreads as a percentage */ if (flags & TASKQ_THREADS_CPU_PCT) { ASSERT(nthreads <= 100); ASSERT(nthreads >= 0); nthreads = MIN(threads_arg, 100); nthreads = MAX(nthreads, 0); nthreads = MAX((num_online_cpus() * nthreads) /100, 1); } tq = kmem_alloc(sizeof (*tq), KM_PUSHPAGE); if (tq == NULL) return (NULL); tq->tq_hp_support = B_FALSE; #ifdef HAVE_CPU_HOTPLUG if (flags & TASKQ_THREADS_CPU_PCT) { tq->tq_hp_support = B_TRUE; if (cpuhp_state_add_instance_nocalls(spl_taskq_cpuhp_state, &tq->tq_hp_cb_node) != 0) { kmem_free(tq, sizeof (*tq)); return (NULL); } } #endif spin_lock_init(&tq->tq_lock); INIT_LIST_HEAD(&tq->tq_thread_list); INIT_LIST_HEAD(&tq->tq_active_list); tq->tq_name = kmem_strdup(name); tq->tq_nactive = 0; tq->tq_nthreads = 0; tq->tq_nspawn = 0; tq->tq_maxthreads = nthreads; tq->tq_cpu_pct = threads_arg; tq->tq_pri = pri; tq->tq_minalloc = minalloc; tq->tq_maxalloc = maxalloc; tq->tq_nalloc = 0; tq->tq_flags = (flags | TASKQ_ACTIVE); tq->tq_next_id = TASKQID_INITIAL; tq->tq_lowest_id = TASKQID_INITIAL; + tq->lastshouldstop = 0; INIT_LIST_HEAD(&tq->tq_free_list); INIT_LIST_HEAD(&tq->tq_pend_list); INIT_LIST_HEAD(&tq->tq_prio_list); INIT_LIST_HEAD(&tq->tq_delay_list); init_waitqueue_head(&tq->tq_work_waitq); init_waitqueue_head(&tq->tq_wait_waitq); tq->tq_lock_class = TQ_LOCK_GENERAL; INIT_LIST_HEAD(&tq->tq_taskqs); if (flags & TASKQ_PREPOPULATE) { spin_lock_irqsave_nested(&tq->tq_lock, irqflags, tq->tq_lock_class); for (i = 0; i < minalloc; i++) task_done(tq, task_alloc(tq, TQ_PUSHPAGE | TQ_NEW, &irqflags)); spin_unlock_irqrestore(&tq->tq_lock, irqflags); } if ((flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) nthreads = 1; for (i = 0; i < nthreads; i++) { tqt = taskq_thread_create(tq); if (tqt == NULL) rc = 1; else count++; } /* Wait for all threads to be started before potential destroy */ wait_event(tq->tq_wait_waitq, tq->tq_nthreads == count); /* * taskq_thread might have touched nspawn, but we don't want them to * because they're not dynamically spawned. So we reset it to 0 */ tq->tq_nspawn = 0; if (rc) { taskq_destroy(tq); tq = NULL; } else { down_write(&tq_list_sem); tq->tq_instance = taskq_find_by_name(name) + 1; list_add_tail(&tq->tq_taskqs, &tq_list); up_write(&tq_list_sem); } return (tq); } EXPORT_SYMBOL(taskq_create); void taskq_destroy(taskq_t *tq) { struct task_struct *thread; taskq_thread_t *tqt; taskq_ent_t *t; unsigned long flags; ASSERT(tq); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); tq->tq_flags &= ~TASKQ_ACTIVE; spin_unlock_irqrestore(&tq->tq_lock, flags); #ifdef HAVE_CPU_HOTPLUG if (tq->tq_hp_support) { VERIFY0(cpuhp_state_remove_instance_nocalls( spl_taskq_cpuhp_state, &tq->tq_hp_cb_node)); } #endif /* * When TASKQ_ACTIVE is clear new tasks may not be added nor may * new worker threads be spawned for dynamic taskq. */ if (dynamic_taskq != NULL) taskq_wait_outstanding(dynamic_taskq, 0); taskq_wait(tq); /* remove taskq from global list used by the kstats */ down_write(&tq_list_sem); list_del(&tq->tq_taskqs); up_write(&tq_list_sem); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); /* wait for spawning threads to insert themselves to the list */ while (tq->tq_nspawn) { spin_unlock_irqrestore(&tq->tq_lock, flags); schedule_timeout_interruptible(1); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); } /* * Signal each thread to exit and block until it does. Each thread * is responsible for removing itself from the list and freeing its * taskq_thread_t. This allows for idle threads to opt to remove * themselves from the taskq. They can be recreated as needed. */ while (!list_empty(&tq->tq_thread_list)) { tqt = list_entry(tq->tq_thread_list.next, taskq_thread_t, tqt_thread_list); thread = tqt->tqt_thread; spin_unlock_irqrestore(&tq->tq_lock, flags); kthread_stop(thread); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); } while (!list_empty(&tq->tq_free_list)) { t = list_entry(tq->tq_free_list.next, taskq_ent_t, tqent_list); ASSERT(!(t->tqent_flags & TQENT_FLAG_PREALLOC)); list_del_init(&t->tqent_list); task_free(tq, t); } ASSERT0(tq->tq_nthreads); ASSERT0(tq->tq_nalloc); ASSERT0(tq->tq_nspawn); ASSERT(list_empty(&tq->tq_thread_list)); ASSERT(list_empty(&tq->tq_active_list)); ASSERT(list_empty(&tq->tq_free_list)); ASSERT(list_empty(&tq->tq_pend_list)); ASSERT(list_empty(&tq->tq_prio_list)); ASSERT(list_empty(&tq->tq_delay_list)); spin_unlock_irqrestore(&tq->tq_lock, flags); kmem_strfree(tq->tq_name); kmem_free(tq, sizeof (taskq_t)); } EXPORT_SYMBOL(taskq_destroy); static unsigned int spl_taskq_kick = 0; /* * 2.6.36 API Change * module_param_cb is introduced to take kernel_param_ops and * module_param_call is marked as obsolete. Also set and get operations * were changed to take a 'const struct kernel_param *'. */ static int #ifdef module_param_cb param_set_taskq_kick(const char *val, const struct kernel_param *kp) #else param_set_taskq_kick(const char *val, struct kernel_param *kp) #endif { int ret; taskq_t *tq = NULL; taskq_ent_t *t; unsigned long flags; ret = param_set_uint(val, kp); if (ret < 0 || !spl_taskq_kick) return (ret); /* reset value */ spl_taskq_kick = 0; down_read(&tq_list_sem); list_for_each_entry(tq, &tq_list, tq_taskqs) { spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); /* Check if the first pending is older than 5 seconds */ t = taskq_next_ent(tq); if (t && time_after(jiffies, t->tqent_birth + 5*HZ)) { (void) taskq_thread_spawn(tq); printk(KERN_INFO "spl: Kicked taskq %s/%d\n", tq->tq_name, tq->tq_instance); } spin_unlock_irqrestore(&tq->tq_lock, flags); } up_read(&tq_list_sem); return (ret); } #ifdef module_param_cb static const struct kernel_param_ops param_ops_taskq_kick = { .set = param_set_taskq_kick, .get = param_get_uint, }; module_param_cb(spl_taskq_kick, ¶m_ops_taskq_kick, &spl_taskq_kick, 0644); #else module_param_call(spl_taskq_kick, param_set_taskq_kick, param_get_uint, &spl_taskq_kick, 0644); #endif MODULE_PARM_DESC(spl_taskq_kick, "Write nonzero to kick stuck taskqs to spawn more threads"); #ifdef HAVE_CPU_HOTPLUG /* * This callback will be called exactly once for each core that comes online, * for each dynamic taskq. We attempt to expand taskqs that have * TASKQ_THREADS_CPU_PCT set. We need to redo the percentage calculation every * time, to correctly determine whether or not to add a thread. */ static int spl_taskq_expand(unsigned int cpu, struct hlist_node *node) { taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node); unsigned long flags; int err = 0; ASSERT(tq); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); if (!(tq->tq_flags & TASKQ_ACTIVE)) { spin_unlock_irqrestore(&tq->tq_lock, flags); return (err); } ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT); int nthreads = MIN(tq->tq_cpu_pct, 100); nthreads = MAX(((num_online_cpus() + 1) * nthreads) / 100, 1); tq->tq_maxthreads = nthreads; if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) && tq->tq_maxthreads > tq->tq_nthreads) { spin_unlock_irqrestore(&tq->tq_lock, flags); taskq_thread_t *tqt = taskq_thread_create(tq); if (tqt == NULL) err = -1; return (err); } spin_unlock_irqrestore(&tq->tq_lock, flags); return (err); } /* * While we don't support offlining CPUs, it is possible that CPUs will fail * to online successfully. We do need to be able to handle this case * gracefully. */ static int spl_taskq_prepare_down(unsigned int cpu, struct hlist_node *node) { taskq_t *tq = list_entry(node, taskq_t, tq_hp_cb_node); unsigned long flags; ASSERT(tq); spin_lock_irqsave_nested(&tq->tq_lock, flags, tq->tq_lock_class); if (!(tq->tq_flags & TASKQ_ACTIVE)) goto out; ASSERT(tq->tq_flags & TASKQ_THREADS_CPU_PCT); int nthreads = MIN(tq->tq_cpu_pct, 100); nthreads = MAX(((num_online_cpus()) * nthreads) / 100, 1); tq->tq_maxthreads = nthreads; if (!((tq->tq_flags & TASKQ_DYNAMIC) && spl_taskq_thread_dynamic) && tq->tq_maxthreads < tq->tq_nthreads) { ASSERT3U(tq->tq_maxthreads, ==, tq->tq_nthreads - 1); taskq_thread_t *tqt = list_entry(tq->tq_thread_list.next, taskq_thread_t, tqt_thread_list); struct task_struct *thread = tqt->tqt_thread; spin_unlock_irqrestore(&tq->tq_lock, flags); kthread_stop(thread); return (0); } out: spin_unlock_irqrestore(&tq->tq_lock, flags); return (0); } #endif int spl_taskq_init(void) { init_rwsem(&tq_list_sem); tsd_create(&taskq_tsd, NULL); #ifdef HAVE_CPU_HOTPLUG spl_taskq_cpuhp_state = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "fs/spl_taskq:online", spl_taskq_expand, spl_taskq_prepare_down); #endif system_taskq = taskq_create("spl_system_taskq", MAX(boot_ncpus, 64), maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC); if (system_taskq == NULL) return (-ENOMEM); system_delay_taskq = taskq_create("spl_delay_taskq", MAX(boot_ncpus, 4), maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE|TASKQ_DYNAMIC); if (system_delay_taskq == NULL) { #ifdef HAVE_CPU_HOTPLUG cpuhp_remove_multi_state(spl_taskq_cpuhp_state); #endif taskq_destroy(system_taskq); return (-ENOMEM); } dynamic_taskq = taskq_create("spl_dynamic_taskq", 1, maxclsyspri, boot_ncpus, INT_MAX, TASKQ_PREPOPULATE); if (dynamic_taskq == NULL) { #ifdef HAVE_CPU_HOTPLUG cpuhp_remove_multi_state(spl_taskq_cpuhp_state); #endif taskq_destroy(system_taskq); taskq_destroy(system_delay_taskq); return (-ENOMEM); } /* * This is used to annotate tq_lock, so * taskq_dispatch -> taskq_thread_spawn -> taskq_dispatch * does not trigger a lockdep warning re: possible recursive locking */ dynamic_taskq->tq_lock_class = TQ_LOCK_DYNAMIC; return (0); } void spl_taskq_fini(void) { taskq_destroy(dynamic_taskq); dynamic_taskq = NULL; taskq_destroy(system_delay_taskq); system_delay_taskq = NULL; taskq_destroy(system_taskq); system_taskq = NULL; tsd_destroy(&taskq_tsd); #ifdef HAVE_CPU_HOTPLUG cpuhp_remove_multi_state(spl_taskq_cpuhp_state); spl_taskq_cpuhp_state = 0; #endif }