Index: include/Makefile =================================================================== --- include/Makefile +++ include/Makefile @@ -18,9 +18,9 @@ inttypes.h iso646.h kenv.h langinfo.h libgen.h limits.h link.h \ locale.h malloc.h malloc_np.h memory.h monetary.h mpool.h mqueue.h \ ndbm.h netconfig.h \ - netdb.h nl_types.h nlist.h nss.h nsswitch.h paths.h \ - printf.h proc_service.h pthread.h \ - pthread_np.h pwd.h ranlib.h readpassphrase.h regex.h \ + netdb.h nl_types.h nlist.h nss.h nsswitch.h paths.h printf.h \ + proc_service.h pthread.h pthread_np.h pthread_workqueue.h \ + pwd.h ranlib.h readpassphrase.h regex.h \ res_update.h resolv.h runetype.h search.h semaphore.h setjmp.h \ signal.h spawn.h stab.h stdalign.h stdbool.h stddef.h \ stdnoreturn.h stdio.h stdlib.h string.h stringlist.h \ Index: include/pthread_workqueue.h =================================================================== --- /dev/null +++ include/pthread_workqueue.h @@ -0,0 +1,75 @@ +/*- + * Copyright (c) 2009-2014, Stacey Son + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice unmodified, this list of conditions, and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * $FreeBSD$ + */ + +#include + +struct _pthread_workqueue; + +typedef struct _pthread_workqueue * pthread_workqueue_t; +typedef void * pthread_workitem_handle_t; + +/* Pad size to 64 bytes. */ +typedef struct { + uint32_t sig; + int queueprio; + int overcommit; + unsigned int pad[13]; +} pthread_workqueue_attr_t; + +/* Work queue priority attributes. */ +#define WORKQ_HIGH_PRIOQUEUE 0 /* Assign to high priority queue. */ +#define WORKQ_DEFAULT_PRIOQUEUE 1 /* Assign to default priority queue. */ +#define WORKQ_LOW_PRIOQUEUE 2 /* Assign to low priority queue. 
*/ +#define WORKQ_BG_PRIOQUEUE 3 /* background priority queue */ + +#define WORKQ_NUM_PRIOQUEUE 4 + +extern __int32_t workq_targetconc[WORKQ_NUM_PRIOQUEUE]; + +int pthread_workqueue_init_np(void); +int pthread_workqueue_create_np(pthread_workqueue_t * workqp, + const pthread_workqueue_attr_t * attr); +int pthread_workqueue_additem_np(pthread_workqueue_t workq, + void *( *workitem_func)(void *), void * workitem_arg, + pthread_workitem_handle_t * itemhandlep, unsigned int *gencountp); +int pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp); +int pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr); +int pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr, + int qprio); +int pthread_workqueue_attr_getovercommit_np( + const pthread_workqueue_attr_t * attr, int * ocommp); +int pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr, + int ocomm); +int pthread_workqueue_requestconcurrency_np(pthread_workqueue_t workq, + int queue, int request_concurrency); +int pthread_workqueue_getovercommit_np(pthread_workqueue_t workq, + unsigned int *ocommp); +void pthread_workqueue_atfork_child(void); +void pthread_workqueue_atfork_parent(void); +void pthread_workqueue_atfork_prepare(void); +__END_DECLS Index: lib/libc/sys/Symbol.map =================================================================== --- lib/libc/sys/Symbol.map +++ lib/libc/sys/Symbol.map @@ -398,6 +398,8 @@ mknodat; stat; statfs; + thr_stack; + thr_workq; }; FBSDprivate_1.0 { @@ -979,10 +981,14 @@ __sys_thr_self; _thr_set_name; __sys_thr_set_name; + _thr_stack; + __sys_thr_stack; _thr_suspend; __sys_thr_suspend; _thr_wake; __sys_thr_wake; + _thr_workq; + __sys_thr_workq; _ktimer_create; __sys_ktimer_create; _ktimer_delete; Index: lib/libthr/pthread.map =================================================================== --- lib/libthr/pthread.map +++ lib/libthr/pthread.map @@ -316,6 +316,39 @@ pthread_getthreadid_np; }; + +FBSD_1.3 { + _pthread_workqueue_additem_np; + _pthread_workqueue_atfork_child; + _pthread_workqueue_atfork_parent; + _pthread_workqueue_atfork_prepare; + _pthread_workqueue_attr_destroy_np; + _pthread_workqueue_attr_getovercommit_np; + _pthread_workqueue_attr_getqueuepriority_np; + _pthread_workqueue_attr_init_np; + _pthread_workqueue_attr_setovercommit_np; + _pthread_workqueue_attr_setqueuepriority_np; + _pthread_workqueue_create_np; + _pthread_workqueue_getovercommit_np; + _pthread_workqueue_init_np; + _pthread_workqueue_requestconcurrency_np; + + pthread_workqueue_additem_np; + pthread_workqueue_atfork_child; + pthread_workqueue_atfork_parent; + pthread_workqueue_atfork_prepare; + pthread_workqueue_attr_destroy_np; + pthread_workqueue_attr_getovercommit_np; + pthread_workqueue_attr_getqueuepriority_np; + pthread_workqueue_attr_init_np; + pthread_workqueue_attr_setovercommit_np; + pthread_workqueue_attr_setqueuepriority_np; + pthread_workqueue_create_np; + pthread_workqueue_getovercommit_np; + pthread_workqueue_init_np; + pthread_workqueue_requestconcurrency_np; +}; + FBSD_1.4 { pthread_mutex_consistent; pthread_mutexattr_getrobust; Index: lib/libthr/thread/Makefile.inc =================================================================== --- lib/libthr/thread/Makefile.inc +++ lib/libthr/thread/Makefile.inc @@ -56,5 +56,6 @@ thr_suspend_np.c \ thr_switch_np.c \ thr_symbols.c \ + thr_workq.c \ thr_umtx.c \ thr_yield.c Index: lib/libthr/thread/thr_init.c =================================================================== --- lib/libthr/thread/thr_init.c +++ 
lib/libthr/thread/thr_init.c @@ -59,7 +59,7 @@ #include "libc_private.h" #include "thr_private.h" -char *_usrstack; +static char *_usrstack; struct pthread *_thr_initial; int _libthr_debug; int _thread_event_mask; @@ -113,8 +113,6 @@ int _thr_is_smp = 0; size_t _thr_guard_default; -size_t _thr_stack_default = THR_STACK_DEFAULT; -size_t _thr_stack_initial = THR_STACK_INITIAL; int _thr_page_size; int _thr_spinloops; int _thr_yieldloops; @@ -386,9 +384,8 @@ * resource limits, so this stack needs an explicitly mapped * red zone to protect the thread stack that is just beyond. */ - if (mmap(_usrstack - _thr_stack_initial - - _thr_guard_default, _thr_guard_default, 0, MAP_ANON, - -1, 0) == MAP_FAILED) + if (mmap(_usrstack - THR_STACK_INITIAL - _thr_guard_default, + _thr_guard_default, PROT_NONE, MAP_ANON, -1, 0) == MAP_FAILED) PANIC("Cannot allocate red zone for initial thread"); /* @@ -400,8 +397,8 @@ * actually free() it; it just puts it in the free * stack queue for later reuse. */ - thread->attr.stackaddr_attr = _usrstack - _thr_stack_initial; - thread->attr.stacksize_attr = _thr_stack_initial; + thread->attr.stackaddr_attr = _usrstack - THR_STACK_INITIAL; + thread->attr.stacksize_attr = THR_STACK_INITIAL; thread->attr.guardsize_attr = _thr_guard_default; thread->attr.flags |= THR_STACK_USER; @@ -470,7 +467,7 @@ if (env_bigstack != NULL || env_splitstack == NULL) { if (getrlimit(RLIMIT_STACK, &rlim) == -1) PANIC("Cannot get stack rlimit"); - _thr_stack_initial = rlim.rlim_cur; + // XXX - _thr_stack_initial = rlim.rlim_cur; } len = sizeof(_thr_is_smp); sysctlbyname("kern.smp.cpus", &_thr_is_smp, &len, NULL, 0); @@ -478,7 +475,7 @@ _thr_page_size = getpagesize(); _thr_guard_default = _thr_page_size; _pthread_attr_default.guardsize_attr = _thr_guard_default; - _pthread_attr_default.stacksize_attr = _thr_stack_default; + _pthread_attr_default.stacksize_attr = THR_STACK_DEFAULT; env = getenv("LIBPTHREAD_SPINLOOPS"); if (env) _thr_spinloops = atoi(env); Index: lib/libthr/thread/thr_private.h =================================================================== --- lib/libthr/thread/thr_private.h +++ lib/libthr/thread/thr_private.h @@ -713,8 +713,6 @@ * Global variables for the pthread kernel. 
*/ -extern char *_usrstack __hidden; - /* For debugger */ extern int _libthr_debug; extern int _thread_event_mask; @@ -745,8 +743,6 @@ extern int _thr_is_smp __hidden; extern size_t _thr_guard_default __hidden; -extern size_t _thr_stack_default __hidden; -extern size_t _thr_stack_initial __hidden; extern int _thr_page_size __hidden; extern int _thr_spinloops __hidden; extern int _thr_yieldloops __hidden; Index: lib/libthr/thread/thr_stack.c =================================================================== --- lib/libthr/thread/thr_stack.c +++ lib/libthr/thread/thr_stack.c @@ -84,7 +84,7 @@ * | Red Zone (guard page) | red zone for 2nd thread * | | * +-----------------------------------+ - * | stack 2 - _thr_stack_default | top of 2nd thread stack + * | stack 2 - THR_STACK_DEFAULT | top of 2nd thread stack * | | * | | * | | @@ -95,7 +95,7 @@ * | Red Zone | red zone for 1st thread * | | * +-----------------------------------+ - * | stack 1 - _thr_stack_default | top of 1st thread stack + * | stack 1 - THR_STACK_DEFAULT | top of 1st thread stack * | | * | | * | | @@ -106,7 +106,7 @@ * | Red Zone | * | | red zone for main thread * +-----------------------------------+ - * | USRSTACK - _thr_stack_initial | top of main thread stack + * | KERN_USRSTACK - THR_STACK_INITIAL | top of main thread stack * | | ^ * | | | * | | | @@ -117,7 +117,6 @@ * high memory * */ -static char *last_stack = NULL; /* * Round size up to the nearest multiple of @@ -194,11 +193,10 @@ struct stack *spare_stack; size_t stacksize; size_t guardsize; - char *stackaddr; /* * Round up stack size to nearest multiple of _thr_page_size so - * that mmap() * will work. If the stack size is not an even + * that thr_stack() will work. If the stack size is not an even * multiple, we end up initializing things such that there is * unused space above the beginning of the stack, so the stack * sits snugly against its guard. @@ -211,7 +209,7 @@ /* * Use the garbage collector lock for synchronization of the - * spare stack lists and allocations from usrstack. + * spare stack lists. */ THREAD_LIST_WRLOCK(curthread); /* @@ -246,43 +244,10 @@ THREAD_LIST_UNLOCK(curthread); } else { - /* - * Allocate a stack from or below usrstack, depending - * on the LIBPTHREAD_BIGSTACK_MAIN env variable. - */ - if (last_stack == NULL) - last_stack = _usrstack - _thr_stack_initial - - _thr_guard_default; - - /* Allocate a new stack. */ - stackaddr = last_stack - stacksize - guardsize; - - /* - * Even if stack allocation fails, we don't want to try to - * use this location again, so unconditionally decrement - * last_stack. Under normal operating conditions, the most - * likely reason for an mmap() error is a stack overflow of - * the adjacent thread stack. - */ - last_stack -= (stacksize + guardsize); - - /* Release the lock before mmap'ing it. 
*/ + /* thr_stack() can block so release the lock */ THREAD_LIST_UNLOCK(curthread); - /* Map the stack and guard page together, and split guard - page from allocated space: */ - if ((stackaddr = mmap(stackaddr, stacksize + guardsize, - _rtld_get_stack_prot(), MAP_STACK, - -1, 0)) != MAP_FAILED && - (guardsize == 0 || - mprotect(stackaddr, guardsize, PROT_NONE) == 0)) { - stackaddr += guardsize; - } else { - if (stackaddr != MAP_FAILED) - munmap(stackaddr, stacksize + guardsize); - stackaddr = NULL; - } - attr->stackaddr_attr = stackaddr; + attr->stackaddr_attr = thr_stack(stacksize, guardsize); } if (attr->stackaddr_attr != NULL) return (0); Index: lib/libthr/thread/thr_workq.c =================================================================== --- /dev/null +++ lib/libthr/thread/thr_workq.c @@ -0,0 +1,1134 @@ +/*- + * Copyright (c) 2009-2014, Stacey Son + * Copyright (c) 2000-2008, Apple Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice unmodified, this list of conditions, and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + * $FreeBSD$ + */ + +#include "namespace.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "un-namespace.h" + +#include "thr_private.h" + +typedef struct _pthread_workitem { + TAILQ_ENTRY(_pthread_workitem) item_entry; /* pthread_workitem + list in prio */ + void *(*func)(void *); + void *func_arg; + struct _pthread_workqueue *workq; + unsigned int flags; + unsigned int gencount; +} *pthread_workitem_t; + +typedef struct _pthread_workqueue_head { + TAILQ_HEAD(, _pthread_workqueue) wqhead; + struct _pthread_workqueue * next_workq; +} * pthread_workqueue_head_t; + +struct _pthread_workqueue { + struct pthread *mthread; /* main thread */ + unsigned int sig; /* Unique signature for this structure */ + struct umutex lock; /* Used for internal mutex on structure */ + TAILQ_ENTRY(_pthread_workqueue) wq_list; /* workqueue list in prio */ + TAILQ_HEAD(, _pthread_workitem) item_listhead; /* pthread_workitem + list in prio */ + TAILQ_HEAD(, _pthread_workitem) item_kernhead; /* pthread_workitem + list in prio */ + unsigned int flags; + size_t stacksize; + int istimeshare; + int importance; + int affinity; /* XXX - not used yet */ + int queueprio; + int barrier_count; + int kq_count; + void (*term_callback)(struct _pthread_workqueue *,void *); + void *term_callarg; + pthread_workqueue_head_t headp; + int overcommit; +#if ! defined(__x86_64__) + unsigned int rev2[11]; +#endif +}; + +/* + * Workqueue flags. + */ +#define PTHREAD_WORKQ_IN_CREATION 0x0001 +#define PTHREAD_WORKQ_IN_TERMINATE 0x0002 +#define PTHREAD_WORKQ_BARRIER_ON 0x0004 +#define PTHREAD_WORKQ_TERM_ON 0x0008 +#define PTHREAD_WORKQ_DESTROYED 0x0010 +#define PTHREAD_WORKQ_REQUEUED 0x0020 +#define PTHREAD_WORKQ_SUSPEND 0x0040 + +/* + * Workitem flags. + */ +#define PTH_WQITEM_INKERNEL_QUEUE 0x0001 +#define PTH_WQITEM_RUNNING 0x0002 +#define PTH_WQITEM_COMPLETED 0x0004 +#define PTH_WQITEM_REMOVED 0x0008 +#define PTH_WQITEM_BARRIER 0x0010 +#define PTH_WQITEM_DESTROY 0x0020 +#define PTH_WQITEM_NOTINLIST 0x0040 +#define PTH_WQITEM_APPLIED 0x0080 +#define PTH_WQITEM_KERN_COUNT 0x0100 + +/* + * Signatures/magic numbers. + */ +#define PTHREAD_WORKQUEUE_SIG 0xBEBEBEBE +#define PTHREAD_WORKQUEUE_ATTR_SIG 0xBEBEBEBE + +/* + * Memory pool sizes. 
+ */ +#define WORKITEM_POOL_SIZE 1000 +#define WORKQUEUE_POOL_SIZE 100 + +static pthread_spinlock_t __workqueue_list_lock; +static int kernel_workq_setup = 0; +static int workq_id = 0; +static int wqreadyprio = 0; /* current highest prio queue ready with items */ +static pthread_workqueue_attr_t _pthread_wq_attr_default = { + .sig = 0, + .queueprio = 0, + .overcommit = 0, +}; +static volatile int32_t kernel_workq_count = 0; +static volatile int32_t user_workq_count = 0; + +static TAILQ_HEAD(__pthread_workqueue_pool, _pthread_workqueue) + __pthread_workqueue_pool_head = TAILQ_HEAD_INITIALIZER( + __pthread_workqueue_pool_head); +static TAILQ_HEAD(__pthread_workitem_pool, _pthread_workitem) + __pthread_workitem_pool_head = TAILQ_HEAD_INITIALIZER( + __pthread_workitem_pool_head); + + +struct _pthread_workqueue_head __pthread_workq0_head; +struct _pthread_workqueue_head __pthread_workq1_head; +struct _pthread_workqueue_head __pthread_workq2_head; +pthread_workqueue_head_t __pthread_wq_head_tbl[WORKQ_OS_NUMPRIOS] = { + &__pthread_workq0_head, + &__pthread_workq1_head, + &__pthread_workq2_head +}; + +static void workqueue_list_lock(void); +static void workqueue_list_unlock(void); +static pthread_workitem_t alloc_workitem(void); +static void free_workitem(pthread_workitem_t witem); +static pthread_workqueue_t alloc_workqueue(void); +static void free_workqueue(pthread_workqueue_t wq); +static void _pthread_wqthread(void *arg); +static void _pthread_newthread(void *arg); +static void _pthread_exitthread(void *arg); +static void pick_nextworkqueue_droplock(void); + +int _pthread_workqueue_init_np(void); +int _pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp); +int _pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr); +int _pthread_workqueue_attr_getqueuepriority_np( + const pthread_workqueue_attr_t * attr, int * qpriop); +int _pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr, + int qprio); +int _pthread_workqueue_attr_getovercommit_np( + const pthread_workqueue_attr_t * attr, int * ocommp); +int _pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr, + int ocomm); +int _pthread_workqueue_create_np(pthread_workqueue_t * workqp, + const pthread_workqueue_attr_t * attr); +int _pthread_workqueue_additem_np(pthread_workqueue_t workq, + void *( *workitem_func)(void *), void * workitem_arg, + pthread_workitem_handle_t * itemhandlep, + unsigned int *gencountp); +int _pthread_workqueue_requestconcurrency_np(int queue, + int request_concurrency); +int _pthread_workqueue_getovercommit_np(pthread_workqueue_t workq, + unsigned int *ocommp); +void _pthread_workqueue_atfork_prepare(void); +void _pthread_workqueue_atfork_parent(void); +void _pthread_workqueue_atfork_child(void); + +__weak_reference(_pthread_workqueue_init_np, pthread_workqueue_init_np); +__weak_reference(_pthread_workqueue_attr_init_np, + pthread_workqueue_attr_init_np); +__weak_reference(_pthread_workqueue_attr_destroy_np, + pthread_workqueue_attr_destroy_np); +__weak_reference(_pthread_workqueue_attr_getqueuepriority_np, + pthread_workqueue_attr_getqueuepriority_np); +__weak_reference(_pthread_workqueue_attr_setqueuepriority_np, + pthread_workqueue_attr_setqueuepriority_np); +__weak_reference(_pthread_workqueue_attr_getovercommit_np, + pthread_workqueue_attr_getovercommit_np); +__weak_reference(_pthread_workqueue_attr_setovercommit_np, + pthread_workqueue_attr_setovercommit_np); +__weak_reference(_pthread_workqueue_getovercommit_np, + pthread_workqueue_getovercommit_np); 
+__weak_reference(_pthread_workqueue_create_np, pthread_workqueue_create_np); +__weak_reference(_pthread_workqueue_additem_np, pthread_workqueue_additem_np); +__weak_reference(_pthread_workqueue_requestconcurrency_np, + pthread_workqueue_requestconcurrency_np); +__weak_reference(_pthread_workqueue_atfork_prepare, + pthread_workqueue_atfork_prepare); +__weak_reference(_pthread_workqueue_atfork_parent, + pthread_workqueue_atfork_parent); +__weak_reference(_pthread_workqueue_atfork_child, + pthread_workqueue_atfork_child); + +/* + * dispatch_atfork_{prepare(void), parent(void), child(void)}} are provided by + * libdispatch which may not be linked. + */ +__attribute__((weak)) void dispatch_atfork_prepare(void); +__attribute__((weak)) void dispatch_atfork_parent(void); +__attribute__((weak)) void dispatch_atfork_child(void); + +#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 2) +#define ATOMIC_INC(p) __sync_add_and_fetch((p), 1) +#define ATOMIC_DEC(p) __sync_sub_and_fetch((p), 1) +#else +#define ATOMIC_INC(p) atomic_fetchadd_int(p, 1) +#define ATOMIC_DEC(p) atomic_fetchadd_int(p, -1) +#endif + +static void +workqueue_list_lock(void) +{ + + _pthread_spin_lock(&__workqueue_list_lock); +} + +static void +workqueue_list_unlock(void) +{ + + _pthread_spin_unlock(&__workqueue_list_lock); +} + +/* + * Round up size to the nearest multiple of _thr_page_size. + */ +static size_t +round_up(size_t size) +{ + + if (size % _thr_page_size != 0) + size = ((size / _thr_page_size) + 1) * + _thr_page_size; + return (size); +} + +static int +thr_workq_init(int *retid, struct pthread_attr *attr) +{ + struct twq_param twq; + + twq.twq_retid = retid; + twq.twq_workqfunc = _pthread_wqthread; + twq.twq_newtdfunc = _pthread_newthread; + twq.twq_exitfunc = _pthread_exitthread; + twq.twq_stacksize = round_up(attr->stacksize_attr); + twq.twq_guardsize = round_up(attr->guardsize_attr); + + return (thr_workq(WQOPS_INIT, &twq)); +} + +static int +thr_workq_thread_return(void) +{ + struct twq_param twq; + + twq.twq_id = workq_id; + return (thr_workq(WQOPS_THREAD_RETURN, &twq)); +} + +static int +thr_workq_queue_add(pthread_workitem_t witem, int affinity, int prio) +{ + struct twq_param twq; + + twq.twq_id = workq_id; + twq.twq_add_item = (void *)witem; + twq.twq_add_affin = affinity; + twq.twq_add_prio = prio; + + return (thr_workq(WQOPS_QUEUE_ADD, &twq)); +} + +static int +thr_workq_thread_setconc(int queue, int request_concurrency) +{ + struct twq_param twq; + + twq.twq_id = workq_id; + twq.twq_setconc_prio = queue; + twq.twq_setconc_conc = request_concurrency; + + return (thr_workq(WQOPS_THREAD_SETCONC, &twq)); +} + +static void +workqueue_exit(pthread_t self, pthread_workqueue_t workq, + pthread_workitem_t item) +{ + pthread_workitem_t baritem; + pthread_workqueue_head_t headp; + void (*func)(pthread_workqueue_t, void *); + + workqueue_list_lock(); + TAILQ_REMOVE(&workq->item_kernhead, item, item_entry); + workq->kq_count--; + + item->flags = 0; + free_workitem(item); + + if ((workq->flags & PTHREAD_WORKQ_BARRIER_ON) == + PTHREAD_WORKQ_BARRIER_ON) { + workq->barrier_count--; + + if (workq->barrier_count <= 0 ) { + /* Need to remove barrier item from the list. */ + baritem = TAILQ_FIRST(&workq->item_listhead); + + /* + * If the front item is a barrier and call back is + * registered, run that. 
+ */ + if (((baritem->flags & PTH_WQITEM_BARRIER) == + PTH_WQITEM_BARRIER) && (baritem->func != NULL)){ + + workqueue_list_unlock(); + func = (void (*)(pthread_workqueue_t, void *)) + baritem->func; + (*func)(workq, baritem->func_arg); + workqueue_list_lock(); + } + TAILQ_REMOVE(&workq->item_listhead, baritem, + item_entry); + baritem->flags = 0; + free_workitem(baritem); + workq->flags &= ~PTHREAD_WORKQ_BARRIER_ON; + if ((workq->flags & PTHREAD_WORKQ_TERM_ON) != 0) { + headp = __pthread_wq_head_tbl[workq->queueprio]; + workq->flags |= PTHREAD_WORKQ_DESTROYED; + if (headp->next_workq == workq) { + headp->next_workq = + TAILQ_NEXT(workq, wq_list); + if (headp->next_workq == NULL) { + headp->next_workq = + TAILQ_FIRST(&headp->wqhead); + if (headp->next_workq == workq) + headp->next_workq=NULL; + } + } + TAILQ_REMOVE(&headp->wqhead, workq, wq_list); + workq->sig = 0; + if (workq->term_callback != NULL) { + workqueue_list_unlock(); + (*workq->term_callback)(workq, + workq->term_callarg); + workqueue_list_lock(); + } + free_workqueue(workq); + } else { + /* + * If there are higher prio item then reset + * to wqreadyprio. + */ + if ((workq->queueprio < wqreadyprio) && + (!(TAILQ_EMPTY(&workq->item_listhead)))) + wqreadyprio = workq->queueprio; + } + } + } + + pick_nextworkqueue_droplock(); + + (void)thr_workq_thread_return(); + + _pthread_exit(NULL); +} + + +/* XXX need to compare notes to thr_create()'s version */ +static void +_pthread_wqthread(void *arg) +{ + pthread_workitem_t item = (pthread_workitem_t)arg; + pthread_workqueue_t workq; + pthread_t self = _pthread_self(); + + /* debug serialization */ + THR_LOCK(self); + THR_UNLOCK(self); + + workq = item->workq; + ATOMIC_DEC(&kernel_workq_count); + + (*item->func)(item->func_arg); + + workqueue_exit(self, workq, item); + + /* NOT REACHED */ +} + +static void +_pthread_newthread(void *arg) +{ + pthread_workitem_t item = (pthread_workitem_t)arg; + pthread_workqueue_t workq; + struct pthread *newthread, *mthread; + int i; + + /* + * This thread has been initiated by the kernel but we need to allocate + * the user land now including the TLS. + */ + + workq = item->workq; + mthread = workq->mthread; + + if ((newthread = _thr_alloc(mthread)) == NULL) + _pthread_exit(NULL); /* XXX Return some error code? */ + + /* + * Init the thread structure. + */ + + /* Use the default thread attributes. */ + newthread->attr = _pthread_attr_default; + + newthread->magic = THR_MAGIC; + newthread->start_routine = item->func; + newthread->arg = item->func_arg; + newthread->cancel_enable = 1; + newthread->cancel_async = 0; + /* Initialize the mutex queue: */ + for (i = 0; i < TMQ_NITEMS; ++i) { + TAILQ_INIT(&newthread->mq[i]); + } + newthread->refcount = 1; + + /* + * This thread's stack will be recycled in the kernel so record + * its address as NULL. + */ + newthread->attr.stackaddr_attr = NULL; + + /* + * Get the Thread ID and set the automatic TLS. + * XXX It seems we could reduce this to one syscall. + */ + (void)thr_self(&newthread->tid); + _tcb_set(newthread->tcb); + + _thr_link(mthread, newthread); + + if (SHOULD_REPORT_EVENT(mthread, TD_CREATE)) { + THR_THREAD_LOCK(mthread, newthread); + _thr_report_creation(mthread, newthread); + THR_THREAD_UNLOCK(mthread, newthread); + } + + THR_LOCK(newthread); + THR_UNLOCK(newthread); + + /* + * Put the new thread to work. 
+ */ + ATOMIC_DEC(&kernel_workq_count); + + (*item->func)(item->func_arg); + + workqueue_exit(newthread, workq, item); + + /* NOT REACHED */ +} + +static void +_pthread_exitthread(void *arg) +{ + + /* + * If the thread gets started with this start function it means + * we are shutting down so just exit. + */ + _pthread_exit(NULL); +} + +static int +_pthread_work_internal_init(void) +{ + int i; + pthread_workqueue_head_t headp; + pthread_workitem_t witemp; + pthread_workqueue_t wq; + pthread_t curthread = _get_curthread(); + + if (kernel_workq_setup == 0) { + + _pthread_wq_attr_default.queueprio = WORKQ_DEFAULT_PRIOQUEUE; + _pthread_wq_attr_default.sig = PTHREAD_WORKQUEUE_ATTR_SIG; + + for( i = 0; i< WORKQ_OS_NUMPRIOS; i++) { + headp = __pthread_wq_head_tbl[i]; + TAILQ_INIT(&headp->wqhead); + headp->next_workq = 0; + } + + /* create work item and workqueue pools */ + witemp = (struct _pthread_workitem *)malloc( + sizeof(struct _pthread_workitem) * WORKITEM_POOL_SIZE); + if (witemp == NULL) + return (ENOMEM); + bzero(witemp, (sizeof(struct _pthread_workitem) * + WORKITEM_POOL_SIZE)); + for (i = 0; i < WORKITEM_POOL_SIZE; i++) { + TAILQ_INSERT_TAIL(&__pthread_workitem_pool_head, + &witemp[i], item_entry); + } + wq = (struct _pthread_workqueue *)malloc( + sizeof(struct _pthread_workqueue) * WORKQUEUE_POOL_SIZE); + if (wq == NULL) { + free(witemp); + return (ENOMEM); + } + bzero(wq, (sizeof(struct _pthread_workqueue) * + WORKQUEUE_POOL_SIZE)); + for (i = 0; i < WORKQUEUE_POOL_SIZE; i++) { + TAILQ_INSERT_TAIL(&__pthread_workqueue_pool_head, + &wq[i], wq_list); + } + + /* XXX need to use the workqueue attr's. */ + if (thr_workq_init(&workq_id, &curthread->attr)) { + free(wq); + free(witemp); + return (errno); + } + + kernel_workq_setup = 1; + } + + return (0); +} + +static void +_pthread_workq_init(pthread_workqueue_t wq, + const pthread_workqueue_attr_t * attr) +{ + + bzero(wq, sizeof(struct _pthread_workqueue)); + if (attr != NULL) { + wq->queueprio = attr->queueprio; + wq->overcommit = attr->overcommit; + } else { + wq->queueprio = WORKQ_DEFAULT_PRIOQUEUE; + wq->overcommit = 0; + } + wq->flags = 0; + TAILQ_INIT(&wq->item_listhead); + TAILQ_INIT(&wq->item_kernhead); + wq->wq_list.tqe_next = 0; + wq->wq_list.tqe_prev = 0; + wq->sig = PTHREAD_WORKQUEUE_SIG; + wq->headp = __pthread_wq_head_tbl[wq->queueprio]; + wq->mthread = _get_curthread(); + wq->affinity = -1; /* XXX not used yet. */ +} + +int +_pthread_workqueue_init_np(void) +{ + int ret; + + _thr_check_init(); + _pthread_spin_init(&__workqueue_list_lock, PTHREAD_PROCESS_PRIVATE); + workqueue_list_lock(); + /* XXX - _pthread_attr_init(&_pthread_attr_default); */ + ret =_pthread_work_internal_init(); + workqueue_list_unlock(); + + return(ret); +} + +/* + * Pthread Workqueue Attributes. + */ +int +_pthread_workqueue_attr_init_np(pthread_workqueue_attr_t * attrp) +{ + + attrp->queueprio = WORKQ_DEFAULT_PRIOQUEUE; + attrp->sig = PTHREAD_WORKQUEUE_ATTR_SIG; + attrp->overcommit = 0; + return (0); +} + +int +_pthread_workqueue_attr_destroy_np(pthread_workqueue_attr_t * attr) +{ + + if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) + return (0); + else + return (EINVAL); /* Not an attribute struct. */ +} + +int +_pthread_workqueue_attr_getqueuepriority_np( + const pthread_workqueue_attr_t * attr, int * qpriop) +{ + + if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) { + *qpriop = attr->queueprio; + return (0); + } else + return (EINVAL); /* Not an atribute struct. 
*/ +} + +int +_pthread_workqueue_attr_setqueuepriority_np(pthread_workqueue_attr_t * attr, + int qprio) +{ + + if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) { + switch(qprio) { + case WORKQ_HIGH_PRIOQUEUE: + case WORKQ_DEFAULT_PRIOQUEUE: + case WORKQ_LOW_PRIOQUEUE: + attr->queueprio = qprio; + return (0); + default: + return (EINVAL); + } + } else + return (EINVAL); +} + +int +_pthread_workqueue_attr_getovercommit_np(const pthread_workqueue_attr_t * attr, + int * ocommp) +{ + + if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) { + *ocommp = attr->overcommit; + return (0); + } else + return (EINVAL); /* Not an attribute struct. */ +} + +int +_pthread_workqueue_attr_setovercommit_np(pthread_workqueue_attr_t * attr, + int ocomm) +{ + + if (attr->sig == PTHREAD_WORKQUEUE_ATTR_SIG) { + attr->overcommit = ocomm; + return (0); + } else + return (EINVAL); +} + + +static int +valid_workq(pthread_workqueue_t workq) +{ + + if (workq->sig == PTHREAD_WORKQUEUE_SIG) + return (1); + else + return (0); +} + +int +_pthread_workqueue_getovercommit_np(pthread_workqueue_t workq, + unsigned int *ocommp) +{ + + if (valid_workq(workq) == 0) + return (EINVAL); + + if (ocommp != NULL) + *ocommp = workq->overcommit; + + return (0); +} + +/* + * Pthread Workqueue support routines. + */ +int +_pthread_workqueue_create_np(pthread_workqueue_t * workqp, + const pthread_workqueue_attr_t * attr) +{ + pthread_workqueue_t wq; + pthread_workqueue_head_t headp; + int error; + + if ((attr != NULL) && (attr->sig != PTHREAD_WORKQUEUE_ATTR_SIG)) + return (EINVAL); + + _thr_check_init(); + + workqueue_list_lock(); + if (kernel_workq_setup == 0) { + error = _pthread_work_internal_init(); + if (error) { + workqueue_list_unlock(); + return (error); + } + } + + wq = alloc_workqueue(); + if (wq == NULL) { + workqueue_list_unlock(); + return (ENOMEM); + } + + _pthread_workq_init(wq, attr); + + headp = __pthread_wq_head_tbl[wq->queueprio]; + TAILQ_INSERT_TAIL(&headp->wqhead, wq, wq_list); + if (headp->next_workq == NULL) + headp->next_workq = TAILQ_FIRST(&headp->wqhead); + workqueue_list_unlock(); + + *workqp = wq; + + return (0); +} + +/* + * alloc_workitem() is called with the list lock held. It will drop the lock + * if we need to actually alocate memory. + */ +static pthread_workitem_t +alloc_workitem(void) +{ + pthread_workitem_t witem; + + if (TAILQ_EMPTY(&__pthread_workitem_pool_head)) { + workqueue_list_unlock(); + witem = malloc(sizeof(struct _pthread_workitem)); + if (witem == NULL) + return (NULL); + witem->gencount = 0; + workqueue_list_lock(); + } else { + witem = TAILQ_FIRST(&__pthread_workitem_pool_head); + TAILQ_REMOVE(&__pthread_workitem_pool_head, witem, item_entry); + } + return (witem); +} + +/* + * free_workitem() is called with the list lock held. + */ +static void +free_workitem(pthread_workitem_t witem) +{ + + witem->gencount++; + TAILQ_INSERT_TAIL(&__pthread_workitem_pool_head, witem, item_entry); +} + +/* + * alloc_workqueue() is called with list lock held. + */ +static pthread_workqueue_t +alloc_workqueue(void) +{ + pthread_workqueue_t wq; + + if (TAILQ_EMPTY(&__pthread_workqueue_pool_head)) { + workqueue_list_unlock(); + wq = malloc(sizeof(struct _pthread_workqueue)); + if (wq == NULL) + return (NULL); + workqueue_list_lock(); + } else { + wq = TAILQ_FIRST(&__pthread_workqueue_pool_head); + TAILQ_REMOVE(&__pthread_workqueue_pool_head, wq, wq_list); + } + user_workq_count++; + return(wq); +} + +/* + * free_workqueue() is called with list lock held. 
+ */ +static void +free_workqueue(pthread_workqueue_t wq) +{ + + user_workq_count--; + TAILQ_INSERT_TAIL(&__pthread_workqueue_pool_head, wq, wq_list); +} + +static int +post_nextworkitem(pthread_workqueue_t workq) +{ + int error, prio; + pthread_workitem_t witem; + pthread_workqueue_head_t headp; + void (*func)(pthread_workqueue_t, void *); + + if ((workq->flags & PTHREAD_WORKQ_SUSPEND) == PTHREAD_WORKQ_SUSPEND) + return (0); + + if (TAILQ_EMPTY(&workq->item_listhead)) + return (0); + + if ((workq->flags & PTHREAD_WORKQ_BARRIER_ON) == + PTHREAD_WORKQ_BARRIER_ON) + return (0); + + witem = TAILQ_FIRST(&workq->item_listhead); + headp = workq->headp; + + if ((witem->flags & PTH_WQITEM_BARRIER) == PTH_WQITEM_BARRIER) { + if ((witem->flags & PTH_WQITEM_APPLIED) != 0) + return (0); + + /* + * Also barrier when nothing needs to be handled and + * nothing to wait for. + */ + if (workq->kq_count != 0) { + witem->flags |= PTH_WQITEM_APPLIED; + workq->flags |= PTHREAD_WORKQ_BARRIER_ON; + workq->barrier_count = workq->kq_count; + + return (1); + } else { + if (witem->func != NULL) { + /* We are going to drop list lock. */ + witem->flags |= PTH_WQITEM_APPLIED; + workq->flags |= PTHREAD_WORKQ_BARRIER_ON; + workqueue_list_unlock(); + + func = (void (*)(pthread_workqueue_t, void *)) + witem->func; + (*func)(workq, witem->func_arg); + + workqueue_list_lock(); + workq->flags &= ~PTHREAD_WORKQ_BARRIER_ON; + } + TAILQ_REMOVE(&workq->item_listhead, witem, item_entry); + witem->flags = 0; + free_workitem(witem); + + return (1); + } + } else if ((witem->flags & PTH_WQITEM_DESTROY) == PTH_WQITEM_DESTROY) { + if ((witem->flags & PTH_WQITEM_APPLIED) != 0) + return (0); + witem->flags |= PTH_WQITEM_APPLIED; + workq->flags |= + (PTHREAD_WORKQ_BARRIER_ON | PTHREAD_WORKQ_TERM_ON); + workq->barrier_count = workq->kq_count; + workq->term_callback = + (void (*)(struct _pthread_workqueue *,void *))witem->func; + workq->term_callarg = witem->func_arg; + TAILQ_REMOVE(&workq->item_listhead, witem, item_entry); + + if ((TAILQ_EMPTY(&workq->item_listhead)) && + (workq->kq_count == 0)) { + witem->flags = 0; + free_workitem(witem); + workq->flags |= PTHREAD_WORKQ_DESTROYED; + + headp = __pthread_wq_head_tbl[workq->queueprio]; + if (headp->next_workq == workq) { + headp->next_workq = TAILQ_NEXT(workq, wq_list); + if (headp->next_workq == NULL) { + headp->next_workq = + TAILQ_FIRST(&headp->wqhead); + if (headp->next_workq == workq) + headp->next_workq = NULL; + } + } + workq->sig = 0; + TAILQ_REMOVE(&headp->wqhead, workq, wq_list); + if (workq->term_callback != NULL) { + workqueue_list_unlock(); + (*workq->term_callback)(workq, + workq->term_callarg); + workqueue_list_lock(); + } + free_workqueue(workq); + return (1); + } else + TAILQ_INSERT_HEAD(&workq->item_listhead, witem, + item_entry); + + return (1); + + } else { + TAILQ_REMOVE(&workq->item_listhead, witem, item_entry); + if ((witem->flags & PTH_WQITEM_KERN_COUNT) == 0) { + workq->kq_count++; + witem->flags |= PTH_WQITEM_KERN_COUNT; + } + ATOMIC_INC(&kernel_workq_count); + + workqueue_list_unlock(); + + prio = workq->queueprio; + if (workq->overcommit != 0) + prio |= WORKQUEUE_OVERCOMMIT; + + if ((error = thr_workq_queue_add(witem, + workq->affinity, prio)) == -1) { + ATOMIC_DEC(&kernel_workq_count); + + workqueue_list_lock(); + TAILQ_REMOVE(&workq->item_kernhead, witem, item_entry); + TAILQ_INSERT_HEAD(&workq->item_listhead, witem, + item_entry); + if ((workq->flags & (PTHREAD_WORKQ_BARRIER_ON | + PTHREAD_WORKQ_TERM_ON)) != 0) + workq->flags |= PTHREAD_WORKQ_REQUEUED; + } 
else + workqueue_list_lock(); + + return (1); + } + + /* NOT REACHED. */ + + PANIC("Error in logic for post_nextworkitem()"); + + return (0); +} + +/* + * pick_nextworkqueue_droplock() is called with the list lock held and + * drops the lock. + */ +static void +pick_nextworkqueue_droplock(void) +{ + int i, curwqprio, val, found; + pthread_workqueue_head_t headp; + pthread_workqueue_t workq; + pthread_workqueue_t nworkq = NULL; + + +loop: + while (kernel_workq_count < WORKQ_OS_ELEM_MAX) { + found = 0; + for (i = 0; i < WORKQ_OS_NUMPRIOS; i++) { + /* There is nothing else higher to run. */ + wqreadyprio = i; + headp = __pthread_wq_head_tbl[i]; + + if (TAILQ_EMPTY(&headp->wqhead)) + continue; + workq = headp->next_workq; + if (workq == NULL) + workq = TAILQ_FIRST(&headp->wqhead); + curwqprio = workq->queueprio; + nworkq = workq; + while (kernel_workq_count < WORKQ_OS_ELEM_MAX) { + headp->next_workq = TAILQ_NEXT(workq, wq_list); + if (headp->next_workq == NULL) + headp->next_workq = + TAILQ_FIRST(&headp->wqhead); + + val = post_nextworkitem(workq); + if (val != 0) { + /* + * Things could have change so let's + * reassess. If kernel queu is full + * then skip. + */ + if (kernel_workq_count >= + WORKQ_OS_ELEM_MAX) + break; + /* + * If anything with higher prio arrived + * then reevaluate. + */ + if (wqreadyprio < curwqprio) + goto loop; /* re-evaluate */ + /* + * We can post some more work items. + */ + found = 1; + } + + /* + * We cannot use workq here as it could be + * freed. + */ + if (TAILQ_EMPTY(&headp->wqhead)) + break; + /* + * If we found nothing to run and only one + * workqueue in the list, skip. + */ + if ((val == 0) && (workq == headp->next_workq)) + break; + workq = headp->next_workq; + if (workq == NULL) + workq = TAILQ_FIRST(&headp->wqhead); + if (val != 0) + nworkq = workq; + /* + * If we found nothing to run then back to workq + * where we started. + */ + if ((val == 0) && (workq == nworkq)) + break; + } + if (kernel_workq_count >= WORKQ_OS_ELEM_MAX) + break; + } + /* Nothing found to run? */ + if (found == 0) + break; + + } + workqueue_list_unlock(); +} + + +int +_pthread_workqueue_additem_np(pthread_workqueue_t workq, + void *( *workitem_func)(void *), void * workitem_arg, + pthread_workitem_handle_t * itemhandlep, unsigned int *gencountp) +{ + pthread_workitem_t witem; + + if (valid_workq(workq) == 0) + return (EINVAL); + + workqueue_list_lock(); + /* + * Allocate the workitem here as it can drop the lock. Also we can + * evaluate the workqueue state only once. + */ + witem = alloc_workitem(); + if (witem == NULL) { + workqueue_list_unlock(); + return (ENOMEM); + } + witem->func = workitem_func; + witem->func_arg = workitem_arg; + witem->flags = 0; + witem->workq = workq; + witem->item_entry.tqe_next = 0; + witem->item_entry.tqe_prev = 0; + + /* alloc_workitem() can drop the lock, check the state. */ + if ((workq->flags & + (PTHREAD_WORKQ_IN_TERMINATE | PTHREAD_WORKQ_DESTROYED)) != 0) { + free_workitem(witem); + workqueue_list_unlock(); + *itemhandlep = 0; + return (ESRCH); + } + + if (itemhandlep != NULL) + *itemhandlep = (pthread_workitem_handle_t *)witem; + if (gencountp != NULL) + *gencountp = witem->gencount; + TAILQ_INSERT_TAIL(&workq->item_listhead, witem, item_entry); + if (((workq->flags & PTHREAD_WORKQ_BARRIER_ON) == 0) && + (workq->queueprio < wqreadyprio)) + wqreadyprio = workq->queueprio; + + pick_nextworkqueue_droplock(); + + return (0); +} + +/* + * Pthread Workqueue support routines. 
+ */ +int +_pthread_workqueue_requestconcurrency_np(int queue, int request_concurrency) +{ + int error = 0; + + if (queue < 0 || queue > WORKQ_OS_NUMPRIOS) + return (EINVAL); + + error = thr_workq_thread_setconc(queue, request_concurrency); + + if (error == -1) + return (errno); + + return (0); +} + +void +_pthread_workqueue_atfork_prepare(void) +{ + + if (dispatch_atfork_prepare != 0) + dispatch_atfork_prepare(); +} + +void +_pthread_workqueue_atfork_parent(void) +{ + + if (dispatch_atfork_parent != 0) + dispatch_atfork_parent(); +} + +void +_pthread_workqueue_atfork_child(void) +{ + + (void)_pthread_spin_destroy(&__workqueue_list_lock); + (void)_pthread_spin_init(&__workqueue_list_lock, + PTHREAD_PROCESS_PRIVATE); + if (kernel_workq_setup != 0) { + kernel_workq_setup = 0; + _pthread_work_internal_init(); + } + if (dispatch_atfork_child != 0) + dispatch_atfork_child(); +} + Index: share/man/man3/Makefile =================================================================== --- share/man/man3/Makefile +++ share/man/man3/Makefile @@ -274,6 +274,7 @@ pthread_suspend_np.3 \ pthread_switch_add_np.3 \ pthread_testcancel.3 \ + pthread_workqueue_np.3 \ pthread_yield.3 PTHREAD_MLINKS= pthread_affinity_np.3 pthread_getaffinity_np.3 \ @@ -336,6 +337,15 @@ PTHREAD_MLINKS+=pthread_testcancel.3 pthread_setcancelstate.3 \ pthread_testcancel.3 pthread_setcanceltype.3 PTHREAD_MLINKS+=pthread_join.3 pthread_timedjoin_np.3 +PTHREAD_MLINKS+=pthread_workqueue_np.3 pthread_workqueue_init_np.3 \ + pthread_workqueue_np.3 pthread_workqueue_create_np.3 \ + pthread_workqueue_np.3 pthread_workqueue_additem_np.3 \ + pthread_workqueue_np.3 pthread_workqueue_attr_init_np.3 \ + pthread_workqueue_np.3 pthread_workqueue_attr_destroy_np.3 \ + pthread_workqueue_np.3 pthread_workqueue_attr_getqueuepriority_np.3 \ + pthread_workqueue_np.3 pthread_workqueue_attr_setqueuepriority_np.3 \ + pthread_workqueue_np.3 pthread_workqueue_attr_getovercommit_np.3 \ + pthread_workqueue_np.3 pthread_workqueue_attr_setovercommit_np.3 .endif .include Index: share/man/man3/pthread_workqueue_np.3 =================================================================== --- /dev/null +++ share/man/man3/pthread_workqueue_np.3 @@ -0,0 +1,489 @@ +.\" Copyright (C) 2009-2015 sson@FreeBSD.org +.\" All rights reserved. +.\" +.\" Redistribution and use in source and binary forms, with or without +.\" modification, are permitted provided that the following conditions +.\" are met: +.\" 1. Redistributions of source code must retain the above copyright +.\" notice(s), this list of conditions and the following disclaimer as +.\" the first lines of this file unmodified other than the possible +.\" addition of one or more copyright notices. +.\" 2. Redistributions in binary form must reproduce the above copyright +.\" notice(s), this list of conditions and the following disclaimer in +.\" the documentation and/or other materials provided with the +.\" distribution. +.\" +.\" THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER(S) ``AS IS'' AND ANY +.\" EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +.\" IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +.\" PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER(S) BE +.\" LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +.\" CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +.\" SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +.\" BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +.\" WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +.\" OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +.\" EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +.\" +.\" $FreeBSD$ +.Dd May 1, 2014 +.Dt PTHREAD_WORKQUEUE 3 +.Os +.Sh NAME +.Nm pthread_workqueue_init_np , +.Nm pthread_workqueue_create_np , +.Nm pthread_workqueue_additem_np +.Nd thread workqueue operations +.Pp +.Nm pthread_workqueue_attr_init_np , +.Nm pthread_workqueue_attr_destroy_np , +.Nm pthread_workqueue_attr_getovercommit_np , +.Nm pthread_workqueue_attr_setovercommit_np , +.Nm pthread_workqueue_attr_getqueuepriority_np , +.Nm pthread_workqueue_attr_setqueuepriority_np +.Nd thread workqueue attribute operations +.Sh LIBRARY +.Lb libpthread +.Sh SYNOPSIS +.In pthread_np.h +.Ft int +.Fn pthread_workqueue_init_np "void" +.Ft int +.Fn pthread_workqueue_create_np "pthread_workqueue_t *workqp" "const pthread_workqueue_attr_t * attr" +.Ft int +.Fn pthread_workqueue_additem_np "pthread_workqueue_t workq" "void *( *workitem_func)(void *)" "void * workitem_arg" "pthread_workitem_handle_t * itemhandlep" "unsigned int *gencountp" +.Ft int +.Fn pthread_workqueue_attr_init_np "pthread_workqueue_attr_t *attr" +.Ft int +.Fn pthread_workqueue_attr_destroy_np "pthread_workqueue_attr_t *attr" +.Ft int +.Fn pthread_workqueue_attr_getovercommit_np "pthread_workqueue_attr_t *attr" "int *ocommp" +.Ft int +.Fn pthread_workqueue_attr_setovercommit_np "pthread_workqueue_attr_t *attr" "int ocomm" +.Ft int +.Fn pthread_workqueue_attr_getqueuepriority_np "pthread_workqueue_attr_t *attr" "int *qpriop" +.Ft int +.Fn pthread_workqueue_attr_setqueuepriority_np "pthread_workqueue_attr_t *attr" "int qprio" +.Sh DESCRIPTION +The +.Fn pthread_workqueue_*_np +functions are used to create and submit work items to a thread pool. +The size of the thread pool is managed by the kernel based on physical +resources and these tunable +.Xr sysctl 3 +MIBs: +.Bl -tag -width "Va kern.wq_reduce_pool_window_usecs" +.It Va kern.wq_yielded_threshold +Maximum number of threads to yield within the window. +.It Va kern.wq_yielded_window_usecs +Yielded thread window size given in microseconds. +.It Va kern.wq_stalled_window_usecs +Number of microseconds until a thread is considered stalled. +.It Va kern.wq_reduce_pool_window_usecs +Number of microseconds while a thread is idle until it is +removed from the thread pool. +.It Va kern.wq_max_timer_interval_usecs +Number of microseconds to wait to check for stalled or idle threads. +.It Va kern.wq_max_threads +Maximum number of threads in the thread pool. +.El +.Pp +The user may create multiple work queues of different priority and +manually overcommit the available resources. +.Pp +.Fn pthread_workqueue_init_np +allocates and initializes the thread workqueue subsystem. +.Pp +.Fn pthread_workqueue_create_np +creates a new thread workqueue with the attributes given by +.Fa attr . +If +.Fa attr +is NULL then the default attributes are used. +A workqueue handle is returned in the +.Fa workqp +parameter. +.Pp +Thread workqueue attributes are used to specify parameters to +.Fn pthread_workqueue_create_np . 
+One attribute object can be used in multiple calls to +.Fn pthread_workqueue_create_np , +with or without modifications between calls. +.Pp +.Fn pthread_workqueue_additem_np +is used to submit work items to the thread pool specified by the +.Fa workq +parameter. +The work item function and function argument are given by +.Fa workitem_func +and +.Fa workitem_arg . +The work item handle is returned in +.Fa itemhandlep . +.Pp +The +.Fn pthread_workqueue_attr_init_np +function initializes +.Fa attr +with all the default thread workqueue attributes. +.Pp +The +.Fn pthread_workqueue_attr_destroy_np +function destroys +.Fa attr . +.Pp +The +.Fn pthread_workqueue_attr_set*_np +functions set the attribute that corresponds to each function name. +.Fn pthread_workqueue_attr_setovercommit_np +can be used to set the overcommit flag. +When the overcommit flag is set, more threads will be started as +needed. +This might overcommit the physical resources of the system. +.Fn pthread_workqueue_attr_setqueuepriority_np +sets the queue priority attribute of the thread work queue and must be +set to one of these values: +.Bl -tag -width "Va WORKQ_DEFAULT_PRIOQUEUE" +.It Va WORKQ_HIGH_PRIOQUEUE +Queued work items with this attribute will be given higher priority by +the thread scheduler. +.It Va WORKQ_DEFAULT_PRIOQUEUE +Queued work items in the queue with this attribute are given the default +priority. +.It Va WORKQ_LOW_PRIOQUEUE +Queued work items in the queue with this attribute will be given lower priority +by the thread scheduler. +.El +.Pp +The +.Fn pthread_workqueue_attr_get*_np +functions copy the value of the attribute that corresponds to each function name +to the location pointed to by the second function parameter. +.Sh RETURN VALUES +If successful, these functions return 0. +Otherwise, an error number is returned to indicate the error. +.Sh ERRORS +The +.Fn pthread_workqueue_init_np +function will fail with: +.Bl -tag -width Er +.It Bq Er ENOMEM +Out of memory. +.El +.Pp +The +.Fn pthread_workqueue_create_np +function will fail with: +.Bl -tag -width Er +.It Bq Er ENOMEM +Out of memory. +.El +.Pp +The +.Fn pthread_workqueue_additem_np +function will fail with: +.Bl -tag -width Er +.It Bq Er EINVAL +Invalid workqueue handle. +.It Bq Er ENOMEM +Out of memory. +.It Bq Er ESRCH +Cannot find workqueue. +.El +.Pp +The +.Fn pthread_workqueue_attr_init_np +function will fail if: +.Bl -tag -width Er +.It Bq Er ENOMEM +Out of memory. +.El +.Pp +The +.Fn pthread_workqueue_attr_destroy_np +function will fail if: +.Bl -tag -width Er +.It Bq Er EINVAL +Invalid value for +.Fa attr . +.El +.Pp +The +.Fn pthread_workqueue_attr_setqueuepriority_np +function will fail if: +.Bl -tag -width Er +.It Bq Er EINVAL +Invalid value for +.Fa attr +or for +.Fa qprio. +.El +.Pp +The +.Fn pthread_workqueue_attr_setovercommit_np , +.Fn pthread_workqueue_attr_getovercommit_np +and +.Fn pthread_workqueue_attr_getqueuepriority_np +functions will fail if: +.Bl -tag -width Er +.It Bq Er EINVAL +Invalid value for +.Fa attr . +.El +.Sh SEE ALSO +.Xr pthread 3 , +.Xr sysctl 3 +.Sh BUGS +There is currently no way to remove or destroy work queues or pending +work items other than exiting the process. +.Sh HISTORY +This thread workqueues code was created to support Grand Central Dispatch (GCD +or libdispatch) and first appeared in +.Fx 10.1 . +.Sh AUTHORS +.An "Stacey Son" Aq sson@FreeBSD.org . +.\" Copyright (C) 2009-2015 sson@FreeBSD.org +.\" All rights reserved. 
+.\" +.\" Redistribution and use in source and binary forms, with or without +.\" modification, are permitted provided that the following conditions +.\" are met: +.\" 1. Redistributions of source code must retain the above copyright +.\" notice(s), this list of conditions and the following disclaimer as +.\" the first lines of this file unmodified other than the possible +.\" addition of one or more copyright notices. +.\" 2. Redistributions in binary form must reproduce the above copyright +.\" notice(s), this list of conditions and the following disclaimer in +.\" the documentation and/or other materials provided with the +.\" distribution. +.\" +.\" THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER(S) ``AS IS'' AND ANY +.\" EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +.\" IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +.\" PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER(S) BE +.\" LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +.\" CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +.\" SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +.\" BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +.\" WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +.\" OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, +.\" EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +.\" +.\" $FreeBSD$ +.Dd May 1, 2014 +.Dt PTHREAD_WORKQUEUE 3 +.Os +.Sh NAME +.Nm pthread_workqueue_init_np , +.Nm pthread_workqueue_create_np , +.Nm pthread_workqueue_additem_np +.Nd thread workqueue operations +.Pp +.Nm pthread_workqueue_attr_init_np , +.Nm pthread_workqueue_attr_destroy_np , +.Nm pthread_workqueue_attr_getovercommit_np , +.Nm pthread_workqueue_attr_setovercommit_np , +.Nm pthread_workqueue_attr_getqueuepriority_np , +.Nm pthread_workqueue_attr_setqueuepriority_np +.Nd thread workqueue attribute operations +.Sh LIBRARY +.Lb libpthread +.Sh SYNOPSIS +.In pthread_np.h +.Ft int +.Fn pthread_workqueue_init_np "void" +.Ft int +.Fn pthread_workqueue_create_np "pthread_workqueue_t *workqp" "const pthread_workqueue_attr_t * attr" +.Ft int +.Fn pthread_workqueue_additem_np "pthread_workqueue_t workq" "void *( *workitem_func)(void *)" "void * workitem_arg" "pthread_workitem_handle_t * itemhandlep" "unsigned int *gencountp" +.Ft int +.Fn pthread_workqueue_attr_init_np "pthread_workqueue_attr_t *attr" +.Ft int +.Fn pthread_workqueue_attr_destroy_np "pthread_workqueue_attr_t *attr" +.Ft int +.Fn pthread_workqueue_attr_getovercommit_np "pthread_workqueue_attr_t *attr" "int *ocommp" +.Ft int +.Fn pthread_workqueue_attr_setovercommit_np "pthread_workqueue_attr_t *attr" "int ocomm" +.Ft int +.Fn pthread_workqueue_attr_getqueuepriority_np "pthread_workqueue_attr_t *attr" "int *qpriop" +.Ft int +.Fn pthread_workqueue_attr_setqueuepriority_np "pthread_workqueue_attr_t *attr" "int qprio" +.Sh DESCRIPTION +The +.Fn pthread_workqueue_*_np +functions are used to create and submit work items to a thread pool. +The size of the thread pool is managed by the kernel based on physical +resources and these tunable +.Xr sysctl 3 +MIBs: +.Bl -tag -width "Va kern.wq_reduce_pool_window_usecs" +.It Va kern.wq_yielded_threshold +Maximum number of threads to yield within the window. +.It Va kern.wq_yielded_window_usecs +Yielded thread window size given in microseconds. +.It Va kern.wq_stalled_window_usecs +Number of microseconds until a thread is considered stalled. 
+.It Va kern.wq_reduce_pool_window_usecs +Number of microseconds while a thread is idle until it is +removed from the thread pool. +.It Va kern.wq_max_timer_interval_usecs +Number of microseconds to wait to check for stalled or idle threads. +.It Va kern.wq_max_threads +Maximum number of threads in the thread pool. +.El +.Pp +The user may create multiple work queues of different priority and +manually overcommit the available resources. +.Pp +.Fn pthread_workqueue_init_np +allocates and initializes the thread workqueue subsystem. +.Pp +.Fn pthread_workqueue_create_np +creates a new thread workqueue with the attributes given by +.Fa attr . +If +.Fa attr +is +.Dv NULL +then the default attributes are used. +A workqueue handle is returned in the +.Fa workqp +parameter. +.Pp +Thread workqueue attributes are used to specify parameters to +.Fn pthread_workqueue_create_np . +One attribute object can be used in multiple calls to +.Fn pthread_workqueue_create_np , +with or without modifications between calls. +.Pp +.Fn pthread_workqueue_additem_np +is used to submit work items to the thread pool specified by the +.Fa workq +parameter. +The work item function and function argument are given by +.Fa workitem_func +and +.Fa workitem_arg . +The work item handle is returned in +.Fa itemhandlep . +.Pp +The +.Fn pthread_workqueue_attr_init_np +function initializes +.Fa attr +with all the default thread workqueue attributes. +.Pp +The +.Fn pthread_workqueue_attr_destroy_np +function destroys +.Fa attr . +.Pp +The +.Fn pthread_workqueue_attr_set*_np +functions set the attribute that corresponds to each function name. +.Fn pthread_workqueue_attr_setovercommit_np +can be used to set the overcommit flag. +When the overcommit flag is set, more threads will be started as needed. +This might overcommit the physical resources of the system. +.Fn pthread_workqueue_attr_setqueuepriority_np +sets the queue priority attribute of the thread work queue and must be +set to one of these values: +.Bl -tag -width "Va WORKQ_DEFAULT_PRIOQUEUE" +.It Va WORKQ_HIGH_PRIOQUEUE +Queued work items with this attribute will be given higher priority by +the thread scheduler. +.It Va WORKQ_DEFAULT_PRIOQUEUE +Queued work items in the queue with this attribute are given the default +priority. +.It Va WORKQ_LOW_PRIOQUEUE +Queued work items in the queue with this attribute will be given lower priority +by the thread scheduler. +.El +.Pp +The +.Fn pthread_workqueue_attr_get*_np +functions copy the value of the attribute that corresponds to each function name +to the location pointed to by the second function parameter. +.Sh RETURN VALUES +If successful, these functions return 0. +Otherwise, an error number is returned to indicate the error. +.Sh ERRORS +The +.Fn pthread_workqueue_init_np +function will fail with: +.Bl -tag -width Er +.It Bq Er ENOMEM +Out of memory. +.El +.Pp +The +.Fn pthread_workqueue_create_np +function will fail with: +.Bl -tag -width Er +.It Bq Er ENOMEM +Out of memory. +.El +.Pp +The +.Fn pthread_workqueue_additem_np +function will fail with: +.Bl -tag -width Er +.It Bq Er EINVAL +Invalid workqueue handle. +.It Bq Er ENOMEM +Out of memory. +.It Bq Er ESRCH +Cannot find workqueue. +.El +.Pp +The +.Fn pthread_workqueue_attr_init_np +function will fail if: +.Bl -tag -width Er +.It Bq Er ENOMEM +Out of memory. +.El +.Pp +The +.Fn pthread_workqueue_attr_destroy_np +function will fail if: +.Bl -tag -width Er +.It Bq Er EINVAL +Invalid value for +.Fa attr . 
+.El
+.Pp
+The
+.Fn pthread_workqueue_attr_setqueuepriority_np
+function will fail if:
+.Bl -tag -width Er
+.It Bq Er EINVAL
+Invalid value for
+.Fa attr
+or for
+.Fa qprio .
+.El
+.Pp
+The
+.Fn pthread_workqueue_attr_setovercommit_np ,
+.Fn pthread_workqueue_attr_getovercommit_np
+and
+.Fn pthread_workqueue_attr_getqueuepriority_np
+functions will fail if:
+.Bl -tag -width Er
+.It Bq Er EINVAL
+Invalid value for
+.Fa attr .
+.El
+.Sh SEE ALSO
+.Xr pthread 3 ,
+.Xr sysctl 3
+.Sh BUGS
+There is currently no way to remove or destroy work queues or pending
+work items other than exiting the process.
+.Sh HISTORY
+The thread workqueue code was created to support Grand Central Dispatch
+(GCD or libdispatch) and first appeared in
+.Fx 10.1 .
+.Sh AUTHORS
+.An Stacey Son Aq Mt sson@FreeBSD.org
Index: sys/conf/NOTES
===================================================================
--- sys/conf/NOTES
+++ sys/conf/NOTES
@@ -1178,6 +1178,9 @@
 # POSIX message queue options
 P1003_1B_MQUEUE
+
+# PThread WorkQueue support
+options THRWORKQ

 #####################################################################
 # SECURITY POLICY PARAMETERS
Index: sys/conf/files
===================================================================
--- sys/conf/files
+++ sys/conf/files
@@ -3688,6 +3688,7 @@
 kern/kern_tc.c standard
 kern/kern_thr.c standard
 kern/kern_thread.c standard
+kern/kern_thrworkq.c standard
 kern/kern_time.c standard
 kern/kern_timeout.c standard
 kern/kern_umtx.c standard
Index: sys/conf/options
===================================================================
--- sys/conf/options
+++ sys/conf/options
@@ -226,6 +226,9 @@
 P1003_1B_SEMAPHORES opt_posix.h
 _KPOSIX_PRIORITY_SCHEDULING opt_posix.h
+# PThread WorkQueue Option
+THRWORKQ opt_thrworkq.h
+
 # Do we want the config file compiled into the kernel?
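As a usage illustration for the manual page above (an editorial sketch, not part of the patch): the fragment below creates a default-priority work queue and submits a single item through the documented interfaces. Error checking is omitted, the sleep is only a crude way to let the pool run the item, and it assumes the pthread_workqueue.h header added by this change, with the program linked against the threads library that exports these symbols.

	/*
	 * Hedged example: one work item on a default-priority queue.
	 */
	#include <pthread_workqueue.h>
	#include <stdio.h>
	#include <unistd.h>

	static void *
	work_fn(void *arg)
	{
		/* Runs on one of the kernel-managed pool threads. */
		printf("work item ran, arg=%p\n", arg);
		return (NULL);
	}

	int
	main(void)
	{
		pthread_workqueue_attr_t attr;
		pthread_workqueue_t wq;
		pthread_workitem_handle_t item;
		unsigned int gen;

		pthread_workqueue_init_np();
		pthread_workqueue_attr_init_np(&attr);
		pthread_workqueue_attr_setqueuepriority_np(&attr,
		    WORKQ_DEFAULT_PRIOQUEUE);
		pthread_workqueue_create_np(&wq, &attr);
		pthread_workqueue_additem_np(wq, work_fn, NULL, &item, &gen);
		sleep(1);	/* crude: give the pool time to run the item */
		pthread_workqueue_attr_destroy_np(&attr);
		return (0);
	}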
INCLUDE_CONFIG_FILE opt_config.h Index: sys/kern/init_sysent.c =================================================================== --- sys/kern/init_sysent.c +++ sys/kern/init_sysent.c @@ -518,8 +518,8 @@ { AS(thr_set_name_args), (sy_call_t *)sys_thr_set_name, AUE_NULL, NULL, 0, 0, SYF_CAPENABLED, SY_THR_STATIC }, /* 464 = thr_set_name */ { AS(aio_fsync_args), (sy_call_t *)sys_aio_fsync, AUE_AIO_FSYNC, NULL, 0, 0, SYF_CAPENABLED, SY_THR_STATIC }, /* 465 = aio_fsync */ { AS(rtprio_thread_args), (sy_call_t *)sys_rtprio_thread, AUE_RTPRIO, NULL, 0, 0, SYF_CAPENABLED, SY_THR_STATIC }, /* 466 = rtprio_thread */ - { 0, (sy_call_t *)nosys, AUE_NULL, NULL, 0, 0, 0, SY_THR_ABSENT }, /* 467 = nosys */ - { 0, (sy_call_t *)nosys, AUE_NULL, NULL, 0, 0, 0, SY_THR_ABSENT }, /* 468 = nosys */ + { AS(thr_stack_args), (sy_call_t *)sys_thr_stack, AUE_NULL, NULL, 0, 0, 0, SY_THR_STATIC }, /* 467 = thr_stack */ + { AS(thr_workq_args), (sy_call_t *)sys_thr_workq, AUE_NULL, NULL, 0, 0, 0, SY_THR_STATIC }, /* 468 = thr_workq */ { 0, (sy_call_t *)nosys, AUE_NULL, NULL, 0, 0, 0, SY_THR_ABSENT }, /* 469 = __getpath_fromfd */ { 0, (sy_call_t *)nosys, AUE_NULL, NULL, 0, 0, 0, SY_THR_ABSENT }, /* 470 = __getpath_fromaddr */ { AS(sctp_peeloff_args), (sy_call_t *)lkmressys, AUE_NULL, NULL, 0, 0, SYF_CAPENABLED, SY_THR_ABSENT }, /* 471 = sctp_peeloff */ Index: sys/kern/kern_exec.c =================================================================== --- sys/kern/kern_exec.c +++ sys/kern/kern_exec.c @@ -31,6 +31,7 @@ #include "opt_compat.h" #include "opt_hwpmc_hooks.h" #include "opt_ktrace.h" +#include "opt_thrworkq.h" #include "opt_vm.h" #include @@ -65,6 +66,9 @@ #include #include #include +#ifdef THRWORKQ +#include +#endif #include #include #include @@ -214,6 +218,7 @@ int error; error = pre_execve(td, &oldvmspace); + if (error != 0) return (error); error = exec_copyin_args(&args, uap->fname, UIO_USERSPACE, @@ -297,6 +302,10 @@ error = ERESTART; PROC_UNLOCK(p); } +#ifdef THRWORKQ + if (error == 0) + thrworkq_exit(p); +#endif KASSERT(error != 0 || (td->td_pflags & TDP_EXECVMSPC) == 0, ("nested execve")); *oldvmspace = p->p_vmspace; Index: sys/kern/kern_exit.c =================================================================== --- sys/kern/kern_exit.c +++ sys/kern/kern_exit.c @@ -39,6 +39,7 @@ #include "opt_compat.h" #include "opt_ktrace.h" +#include "opt_thrworkq.h" #include #include @@ -72,6 +73,9 @@ #include #include #include +#ifdef THRWORKQ +#include +#endif #ifdef KTRACE #include #endif @@ -205,6 +209,13 @@ panic("Going nowhere without my init!"); } +#ifdef THRWORKQ + /* + * Check if this process has a thread workqueue. + */ + thrworkq_exit(p); +#endif + /* * Deref SU mp, since the thread does not return to userspace. */ Index: sys/kern/kern_mutex.c =================================================================== --- sys/kern/kern_mutex.c +++ sys/kern/kern_mutex.c @@ -40,6 +40,7 @@ #include "opt_ddb.h" #include "opt_hwpmc_hooks.h" #include "opt_sched.h" +#include "opt_thrworkq.h" #include #include @@ -1133,6 +1134,10 @@ blocked_lock.mtx_lock = 0xdeadc0de; /* Always blocked. 
*/ mtx_init(&proc0.p_mtx, "process lock", NULL, MTX_DEF | MTX_DUPOK); mtx_init(&proc0.p_slock, "process slock", NULL, MTX_SPIN); +#ifdef THRWORKQ + mtx_init(&proc0.p_twqlock, "thr workq lock", NULL, MTX_DEF | MTX_DUPOK); + proc0.p_twq = NULL; +#endif /* THRWORKQ */ mtx_init(&proc0.p_statmtx, "pstatl", NULL, MTX_SPIN); mtx_init(&proc0.p_itimmtx, "pitiml", NULL, MTX_SPIN); mtx_init(&proc0.p_profmtx, "pprofl", NULL, MTX_SPIN); Index: sys/kern/kern_proc.c =================================================================== --- sys/kern/kern_proc.c +++ sys/kern/kern_proc.c @@ -37,6 +37,7 @@ #include "opt_ktrace.h" #include "opt_kstack_pages.h" #include "opt_stack.h" +#include "opt_thrworkq.h" #include #include @@ -256,6 +257,10 @@ cv_init(&p->p_pwait, "ppwait"); cv_init(&p->p_dbgwait, "dbgwait"); TAILQ_INIT(&p->p_threads); /* all threads in proc */ +#ifdef THRWORKQ + mtx_init(&p->p_twqlock, "thr workq lock", NULL, MTX_DEF | MTX_DUPOK); + p->p_twq = NULL; +#endif /* THRQORKQ */ EVENTHANDLER_INVOKE(process_init, p); p->p_stats = pstats_alloc(); p->p_pgrp = NULL; Index: sys/kern/kern_synch.c =================================================================== --- sys/kern/kern_synch.c +++ sys/kern/kern_synch.c @@ -417,6 +417,11 @@ SCHED_STAT_INC(sched_switch_stats[flags & SW_TYPE_MASK]); #endif /* + * Do the context switch callback before blocking. + */ + if (td->td_cswitchcb != NULL) + (*td->td_cswitchcb)(SWCB_BLOCK, td); + /* * Compute the amount of time during which the current * thread was running, and add that to its total so far. */ @@ -439,6 +444,11 @@ CTR4(KTR_PROC, "mi_switch: new thread %ld (td_sched %p, pid %ld, %s)", td->td_tid, td_get_sched(td), td->td_proc->p_pid, td->td_name); + /* + * Do the context switch callback for unblocking. + */ + if (td->td_cswitchcb != NULL) + (*td->td_cswitchcb)(SWCB_UNBLOCK, td); /* * If the last thread was exiting, finish cleaning it up. */ Index: sys/kern/kern_thr.c =================================================================== --- sys/kern/kern_thr.c +++ sys/kern/kern_thr.c @@ -29,9 +29,12 @@ #include "opt_compat.h" #include "opt_posix.h" +#include "opt_thrworkq.h" + #include #include #include +#include #include #include #include @@ -52,6 +55,7 @@ #include #include #include +#include #include #include @@ -59,8 +63,25 @@ #include +#include +#include +#include +#include + #include +/* + * Default stack guard size for thread. If set to zero then no + * guard page. + */ +#define THR_GUARD_DEFAULT PAGE_SIZE + +/* + * XXX - These should most likely be sysctl parameters. + */ +static vm_size_t thr_stack_default = THR_STACK_DEFAULT; +static vm_size_t thr_stack_initial = THR_STACK_INITIAL; + static SYSCTL_NODE(_kern, OID_AUTO, threads, CTLFLAG_RW, 0, "thread allocation"); @@ -312,8 +333,15 @@ umtx_thread_exit(td); - /* Signal userland that it can free the stack. */ +#ifdef THRWORKQ + if (td->td_reuse_stack != NULL) { + thrworkq_reusestack(td->td_proc, td->td_reuse_stack); + td->td_reuse_stack = NULL; + } +#endif + if ((void *)uap->state != NULL) { + /* Signal userland that it can free the stack. */ suword_lwpid(uap->state, 1); kern_umtx_wake(td, uap->state, INT_MAX, 0); } @@ -599,6 +627,137 @@ } int +sys_thr_stack(struct thread *td, struct thr_stack_args *uap) +{ + vm_size_t stacksz, guardsz; + void *addr; + int error; + + /* Round up to the nearest page size. 
*/ + stacksz = (vm_size_t)round_page(uap->stacksize); + guardsz = (vm_size_t)round_page(uap->guardsize); + + if (stacksz == 0) + stacksz = thr_stack_default; + + error = kern_thr_stack(td->td_proc, &addr, stacksz, guardsz); + + td->td_retval[0] = (register_t) addr; + + return (error); +} + +/* + * kern_thr_stack() maps a new thread stack in the process. It returns + * the stack address in the 'addr' arg. + * + * Base address of the last stack allocated (including its red zone, if + * there is one). Stacks are allocated contiguously, starting beyond the + * top of the main stack. When a new stack is created, a red zone is + * typically created (actually, the red zone is mapped with PROT_NONE) above + * the top of the stack, such that the stack will not be able to grow all + * the way to the bottom of the next stack. This isn't fool-proof. It is + * possible for a stack to grow by a large amount, such that it grows into + * the next stack, and as long as the memory within the red zone is never + * accessed, nothing will prevent one thread stack from trouncing all over + * the next. + * + * low memory + * . . . . . . . . . . . . . . . . . . + * | | + * | stack 3 | start of 3rd thread stack + * +-----------------------------------+ + * | | + * | Red Zone (guard page) | red zone for 2nd thread + * | | + * +-----------------------------------+ + * | stack 2 - thr_stack_default | top of 2nd thread stack + * | | + * | | + * | | + * | | + * | stack 2 | + * +-----------------------------------+ <-- start of 2nd thread stack + * | | + * | Red Zone (guard page) | red zone for 1st thread + * | | + * +-----------------------------------+ + * | stack 1 - thr_stack_default | top of 1st thread stack + * | | + * | | + * | | + * | | + * | stack 1 | + * +-----------------------------------+ <-- start of 1st thread stack + * | | (initial value of p->p_thrstack) + * | Red Zone (guard page) | + * | | red zone for main thread + * +-----------------------------------+ + * | ->sv_usrstack - thr_stack_initial | top of main thread stack + * | | ^ + * | | | + * | | | + * | | | stack growth + * | | + * +-----------------------------------+ <-- start of main thread stack + * (p->p_sysent->sv_usrstack) + * high memory + * + * XXX - This code assumes that the stack always grows down in address space. + */ +int +kern_thr_stack(struct proc *p, void **addr, vm_size_t stacksz, + vm_size_t guardsz) +{ + vm_offset_t stackaddr; + vm_map_t map; + int error; + + KASSERT(stacksz != 0, ("[%s: %d] stacksz = 0", __FILE__, __LINE__)); + + *addr = NULL; + + PROC_LOCK(p); + if (p->p_thrstack == 0) { + /* Compute the start of the first thread stack. */ + p->p_thrstack = p->p_sysent->sv_usrstack - + (vm_offset_t)(thr_stack_initial + THR_GUARD_DEFAULT); + } + + stackaddr = p->p_thrstack - (vm_offset_t)(stacksz + guardsz); + + /* + * Compute the next stack location unconditionally. Under normal + * operating conditions, the most likely reason for no being able + * to map the thread stack is a stack overflow of the adjacent + * thread stack. 
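+	 * For example (sizes arbitrary): a request for a 1 MB stack with a
+	 * 4 KB guard reserves 1 MB + 4 KB of address space immediately below
+	 * the previously allocated stack, maps the low 4 KB PROT_NONE, and
+	 * returns the address just above the guard page to the caller.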
+ */ + p->p_thrstack -= (vm_offset_t)(stacksz + guardsz); + PROC_UNLOCK(p); + + map = &p->p_vmspace->vm_map; + error = vm_mmap(map, &stackaddr, (stacksz + guardsz), VM_PROT_ALL, + PROT_READ | PROT_WRITE, MAP_STACK, OBJT_DEFAULT, NULL, 0); + if (error) + return (error); + + if (guardsz != 0) { + error = vm_map_protect(map, stackaddr, stackaddr + guardsz, + PROT_NONE, 0); + if (error) { + /* unmap memory */ + (void) vm_map_remove(map, stackaddr, stackaddr + + (stacksz + guardsz)); + + return (error); + } + } + + *addr = (void *)(stackaddr + guardsz); + return (0); +} + +int kern_thr_alloc(struct proc *p, int pages, struct thread **ntd) { Index: sys/kern/kern_thread.c =================================================================== --- sys/kern/kern_thread.c +++ sys/kern/kern_thread.c @@ -194,6 +194,9 @@ td->td_tid = tid_alloc(); + td->td_cswitchcb = NULL; + td->td_threadlist = NULL; + td->td_reuse_stack = NULL; /* * Note that td_critnest begins life as 1 because the thread is not * running and is thereby implicitly waiting to be on the receiving @@ -824,6 +827,12 @@ if (td2 == td) continue; thread_lock(td2); + /* a workq thread may not actually be runnable */ + if (td2->td_state == TDS_INACTIVE && (td2->td_flags & TDF_WORKQ)) { + thread_unlock(td2); + thread_stopped(p); + continue; + } td2->td_flags |= TDF_ASTPENDING | TDF_NEEDSUSPCHK; if (TD_IS_INHIBITED(td2)) { wakeup_swapper |= weed_inhib(mode, td2, p); Index: sys/kern/kern_thrworkq.c =================================================================== --- /dev/null +++ sys/kern/kern_thrworkq.c @@ -0,0 +1,1863 @@ +/*- + * Copyright (c) 2009-2014, Stacey Son + * Coryright (c) 2000-2009, Apple, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice unmodified, this list of conditions, and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * $FreeBSD$ + * + */ + +#include "opt_thrworkq.h" + +#ifdef THRWORKQ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#if !(defined(__i386__) || defined(__x86_64__) || defined(__sparc64__) || \ + defined(__sparc__) || defined(__ia64__)) +/* + * XXX atomic.h for each arch that doesn't have atomic_*_64() should maybe + * have something like the following. 
+ */ +static struct mtx atomic_mtx; +MTX_SYSINIT(atomic, &atomic_mtx, "atomic_mtx", MTX_DEF); + +static __inline u_int32_t +atomic_cmpset_64(volatile u_int64_t *p, volatile u_int64_t cmpval, + volatile u_int64_t newval) +{ + int ret; + + mtx_lock(&atomic_mtx); + if (*p == cmpval) { + *p = newval; + ret = 1; + } else { + ret = 0; + } + mtx_unlock(&atomic_mtx); + + return (ret); +} +#endif + +struct threadlist { + TAILQ_ENTRY(threadlist) th_entry; + struct thread *th_thread; + int th_flags; + uint16_t th_affinity_tag; + uint16_t th_priority; + struct thrworkq *th_workq; + stack_t th_stack; +}; + +/* + * threadlist flags. + */ +#define TH_LIST_INITED 0x01 +#define TH_LIST_RUNNING 0x02 +#define TH_LIST_BLOCKED 0x04 +#define TH_LIST_UNSCHEDULED 0x08 +#define TH_LIST_BUSY 0x10 +#define TH_LIST_SBUSY 0x20 + +struct workitem { + TAILQ_ENTRY(workitem) wi_entry; + void *wi_item; + uint32_t wi_affinity; +}; + +struct workitemlist { + TAILQ_HEAD(, workitem) wl_itemlist; + TAILQ_HEAD(, workitem) wl_freelist; +}; + +struct thrworkq { + void *wq_workqfunc; /* user workq function */ + void *wq_newtdfunc; /* funciton called on new td start up */ + void *wq_exitfunc; /* funciton called on td shutdown */ + char *wq_ptlsbase; /* parent TLS base */ + struct thread *wq_pthread; /* parent thread */ + size_t wq_stacksize; /* stack size of each worker thread */ + size_t wq_guardsize; /* stack guard size. Usually a page. */ + struct workitem wq_array[WORKQ_OS_ELEM_MAX * WORKQ_OS_NUMPRIOS]; + struct proc *wq_proc; + struct proc *wq_atimer_thread; /* timer kernel thread */ + struct cv wq_atimer_cv; /* timer condition var */ + struct callout *wq_atimer_call; + int wq_flags; + int wq_itemcount; + uint64_t wq_thread_yielded_timestamp; + uint32_t wq_thread_yielded_count; + uint32_t wq_timer_interval; + uint32_t wq_affinity_max; + uint32_t wq_threads_scheduled; + uint32_t wq_nthreads; /* num of thread in workq */ + uint32_t wq_thidlecount; /* idle threads waiting */ + /* requested concurrency for each priority level */ + uint32_t wq_reqconc[WORKQ_OS_NUMPRIOS]; + /* priority based item list */ + struct workitemlist wq_list[WORKQ_OS_NUMPRIOS]; + uint32_t wq_list_bitmap; + TAILQ_HEAD(, threadlist) wq_thrunlist; /* workq threads working. */ + TAILQ_HEAD(, threadlist) wq_thidlelist; /* workq threads idle. */ + void **wq_stacklist; /* recycled stacks FIFO. */ + uint32_t wq_stacktop; /* top of stack list FIFO. */ + uint32_t wq_maxthreads; /* max num of threads for + this workq. */ + uint32_t *wq_thactive_count[WORKQ_OS_NUMPRIOS]; + uint32_t *wq_thscheduled_count[WORKQ_OS_NUMPRIOS]; + uint64_t *wq_lastblocked_ts[WORKQ_OS_NUMPRIOS]; +}; + +/* + * Workqueue flags (wq_flags). + */ +#define WQ_LIST_INITED 0x01 +#define WQ_ATIMER_RUNNING 0x02 +#define WQ_EXITING 0x04 + +/* + * Upcall types for twq_set_upcall(). 
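+ * WQ_UPCALL_NEWTD runs the workqueue's thread-start function the first
+ * time a newly created worker is scheduled, WQ_UPCALL_WORKQ hands a work
+ * item to a worker that is being re-used, and WQ_UPCALL_EXIT directs a
+ * worker to clean up and exit (see twq_set_upcall() and twq_runitem()).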
+ */ +#define WQ_UPCALL_NEWTD 1 +#define WQ_UPCALL_WORKQ 2 +#define WQ_UPCALL_EXIT 3 + +#define WORKQUEUE_LOCK(p) mtx_lock(&(p)->p_twqlock) +#define WORKQUEUE_UNLOCK(p) mtx_unlock(&(p)->p_twqlock) +#define WORKQUEUE_ASSERT_LOCKED(p) mtx_assert(&(p)->p_twqlock, MA_OWNED) + +#define WQ_TIMER_NEEDED(wq, start_timer) do { \ + int oldflags = wq->wq_flags; \ + \ + if ( !(oldflags & (WQ_EXITING | WQ_ATIMER_RUNNING))) { \ + if (atomic_cmpset_32(&wq->wq_flags, oldflags, \ + oldflags | WQ_ATIMER_RUNNING)) \ + start_timer = 1; \ + } \ +} while (0) + +static MALLOC_DEFINE(M_THRWORKQ, "thr_workq", "Thread Workqueue"); + +static int twq_additem(struct thrworkq *wq, int prio, void *item, + int affinity); +static int twq_removeitem(struct thrworkq *wq, int prio, void *item); +static int twq_run_nextitem(struct proc *p, struct thrworkq *wq, + struct thread *td, void *oc_item, int oc_prio, + int oc_affinity); +static void twq_runitem(struct proc *p, void *item, struct thread *td, + struct threadlist *tl, int wake_thread); +static int twq_unpark(struct thread *td, int timedout); +static int twq_addnewthread(struct thrworkq *wq); +static void twq_removethread(struct threadlist *tl); +static void twq_add_timer(void *arg); +static void twq_interval_timer_start(struct thrworkq *wq); +static void twq_callback(int type, struct thread *td); +static int twq_thr_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp); +static int twq_init_workqueue(struct proc *p, struct twq_param *arg); +static int twq_timer_work(struct proc *p, struct thrworkq *wq, + int *start_timer); + +/* + * Thread work queue tunable paramaters defaults. + */ +#define WORKQUEUE_MAXTHREADS 512 /* Max num of threads / workQ */ +#define WQ_YIELDED_THRESHOLD 2000 /* Max num of threads to yield + in window */ +#define WQ_YIELDED_WINDOW_USECS 30000 /* Yield window interval size */ +#define WQ_STALLED_WINDOW_USECS 200 /* Useconds until thread is + considered stalled */ +#define WQ_REDUCE_POOL_WINDOW_USECS 5000000 /* Useconds until idle thread + is removed */ +#define WQ_MAX_TIMER_INTERVAL_USECS 50000 /* Useconds to wait to check for + stalled or idle threads */ + +/* + * Thread work queue tunable parameters. + */ +static uint32_t wq_yielded_threshold = WQ_YIELDED_THRESHOLD; +static uint32_t wq_yielded_window_usecs = WQ_YIELDED_WINDOW_USECS; +static uint32_t wq_stalled_window_usecs = WQ_STALLED_WINDOW_USECS; +static uint32_t wq_reduce_pool_window_usecs = WQ_REDUCE_POOL_WINDOW_USECS; +static uint32_t wq_max_timer_interval_usecs = WQ_MAX_TIMER_INTERVAL_USECS; +static uint32_t wq_max_threads = WORKQUEUE_MAXTHREADS; +extern int max_threads_per_proc; + +SYSCTL_INT(_kern, OID_AUTO, wq_yielded_threshold, CTLFLAG_RW, + &wq_yielded_threshold, 0, "Max number of threads to yield in window"); +SYSCTL_INT(_kern, OID_AUTO, wq_yielded_window_usecs, CTLFLAG_RW, + &wq_yielded_window_usecs, 0, "Size of yielded window in useconds"); +SYSCTL_INT(_kern, OID_AUTO, wq_stalled_window_usecs, CTLFLAG_RW, + &wq_stalled_window_usecs, 0, "Useconds until thread is stalled"); +SYSCTL_INT(_kern, OID_AUTO, wq_reduce_pool_window_usecs, CTLFLAG_RW, + &wq_reduce_pool_window_usecs, 0, "Useconds until idle thread is removed"); +SYSCTL_INT(_kern, OID_AUTO, wq_max_timer_interval_usecs, CTLFLAG_RW, + &wq_max_timer_interval_usecs, 0, + "Useconds between stalled/idle thread checks"); +SYSCTL_INT(_kern, OID_AUTO, wq_max_threads, CTLFLAG_RW, + &wq_max_threads, 0, "Max num of threads per workq"); + +/* + * Set up callback from mi_switch(). 
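+ * The callback is twq_callback() below; mi_switch() invokes it with
+ * SWCB_BLOCK just before the thread blocks and with SWCB_UNBLOCK when it
+ * is unblocked.  Passing NULL detaches the thread from the workqueue's
+ * active-thread accounting.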
+ */ +static void +twq_set_schedcallback(struct thread *td, mi_switchcb_t cswitchcb) +{ + + td->td_cswitchcb = cswitchcb; +} + +static void +twq_set_upcall(struct threadlist *tl, int which, void *item) +{ + struct thrworkq *wq = tl->th_workq; + void *func; + + /* XXX should thread sched lock be held?? */ + + KASSERT(wq != NULL, ("[%s: %d] twq_set_upcall: wq == NULL", __FILE__, + __LINE__)); + + switch (which) { + + case WQ_UPCALL_NEWTD: + func = wq->wq_newtdfunc; + break; + + case WQ_UPCALL_WORKQ: + func = wq->wq_workqfunc; + break; + + case WQ_UPCALL_EXIT: + func = wq->wq_exitfunc; + break; + + default: + panic("twq_set_upcall: unknown upcall type"); + } + + cpu_set_upcall_kse(tl->th_thread, func, item, &tl->th_stack); +} + +static void +twq_schedthr(struct thread *newtd) +{ + + thread_lock(newtd); + TD_SET_CAN_RUN(newtd); + sched_add(newtd, SRQ_BORING); + thread_unlock(newtd); +} + + +static uint64_t +twq_microuptime(void) +{ + struct timeval t; + + microuptime(&t); + return ((u_int64_t)t.tv_sec * 1000000 + (u_int64_t)t.tv_usec); +} + +static uint32_t +twq_usecstoticks(uint32_t usec) +{ + struct timeval tv; + uint32_t tticks; + + tv.tv_sec = usec / 1000000; + tv.tv_usec = usec - (tv.tv_sec * 1000000); + tticks = tvtohz(&tv); + + return (tticks); +} + +static void +twq_interval_timer_start(struct thrworkq *wq) +{ + uint32_t deadline; + + if (wq->wq_timer_interval == 0) + wq->wq_timer_interval = wq_stalled_window_usecs; + else { + + wq->wq_timer_interval = wq->wq_timer_interval * 2; + if (wq->wq_timer_interval > wq_max_timer_interval_usecs) + wq->wq_timer_interval = wq_max_timer_interval_usecs; + } + + deadline = twq_usecstoticks(wq->wq_timer_interval); + callout_reset_curcpu(wq->wq_atimer_call, deadline, twq_add_timer, + wq); +} + +static int +twq_thr_busy(uint64_t cur_ts, uint64_t *lastblocked_tsp) +{ + uint64_t lastblocked_ts; + uint64_t elapsed; + + /* + * The timestap is updated atomically w/o holding the workqueue + * lock so we need to do an atomic read of the 64 bits so that + * we don't see a mismatched pair of 32 bit reads. We accomplish + * this in an architecturally independent fashion by using + * atomic_cmpset_64 to write back the value we grabbed. If it + * succeeds then we have a good timestamp to evalute. If it fails + * we straddled grabbing the timestamp while it was being updated. + * Treat a failed update as a busy thread since it implies we are + * about to see a really fresh timestamp anyway. + * + */ + lastblocked_ts = *lastblocked_tsp; + + if (!atomic_cmpset_64(lastblocked_tsp, lastblocked_ts, lastblocked_ts)) + return (1); + + if (lastblocked_ts >= cur_ts) { + /* + * Because the update of the timestamp when a thread blocks + * isn't serialized against us looking at it (i.e. we don't + * hold the workq lock) it's possible to have a timestamp that + * matches the current time or that even looks to be in the + * future relative to when we grabbed the current time. + * Just treat this as a busy thread since it must have just + * blocked. + */ + return (1); + } + + /* Timestamps are in usecs. 
*/ + elapsed = cur_ts - lastblocked_ts; + + if (elapsed < (uint64_t)wq_stalled_window_usecs) + return (1); + + return (0); +} + +static void +twq_add_timer(void *arg) +{ + struct thrworkq *wq = (struct thrworkq *)arg; + + + cv_signal(&wq->wq_atimer_cv); +} + +static int +twq_timer_work(struct proc *p, struct thrworkq *wq, int *start_timer) +{ + int retval; + int add_thread; + uint32_t busycount; + uint32_t priority; + uint32_t affinity_tag; + uint32_t i; + uint64_t curtime; + + + WORKQUEUE_ASSERT_LOCKED(p); + + retval = 1; + add_thread = 0; + + /* + * Check to see if the stall frequency was beyond our tolerance + * or we have work on the queue, but haven't scheduled any new + * work within our acceptable time interval because there were + * no idle threads left to schedule. + */ + if (wq->wq_itemcount) { + + for (priority = 0; priority < WORKQ_OS_NUMPRIOS; priority++) { + if (wq->wq_list_bitmap & (1 << priority)) + break; + } + + KASSERT(priority < WORKQ_OS_NUMPRIOS, + ("[%s: %d] priority >= WORKQ_OS_NUMPRIOS", __FILE__, + __LINE__)); + + curtime = twq_microuptime(); + busycount = 0; + + for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; + affinity_tag++) { + /* + * if we have no idle threads, we can try to + * add them if needed. + */ + if (wq->wq_thidlecount == 0) + add_thread = 1; + + /* + * Look for first affinity group that is + * currently not active. i.e. no active + * threads at this priority level or higher + * and has not been active recently at this + * priority level or higher. + */ + for (i = 0; i <= priority; i++) { + if (wq->wq_thactive_count[i][affinity_tag]) { + add_thread = 0; + break; + } + if (wq->wq_thscheduled_count[i][affinity_tag]) { + if (twq_thr_busy(curtime, + &wq->wq_lastblocked_ts[i] + [affinity_tag])) { + add_thread = 0; + busycount++; + break; + } + } + } + if (add_thread) { + retval = twq_addnewthread(wq); + break; + } + } + if (wq->wq_itemcount) { + /* + * As long as we have threads to schedule, and + * we successfully scheduled new work, keep + * trying. + */ + while (wq->wq_thidlecount && + !(wq->wq_flags & WQ_EXITING)) { + /* + * twq_run_nextitem is + * responsible for dropping the + * workqueue lock in all cases. + */ + retval = twq_run_nextitem(p, wq, NULL, 0, 0, 0); + WORKQUEUE_LOCK(p); + + if (retval == 0) + break; + } + if ( !(wq->wq_flags & WQ_EXITING) && wq->wq_itemcount) { + if (wq->wq_thidlecount == 0 && retval && + add_thread) + return (1); + + if (wq->wq_thidlecount == 0 || busycount) + WQ_TIMER_NEEDED(wq, *start_timer); + } + } + } + + return (0); +} + + +static void +twq_timer_kthread(void *arg) +{ + struct thrworkq *wq = (struct thrworkq *)arg; + struct proc *p; + int start_timer; + + p = wq->wq_proc; + + while (1) { + + WORKQUEUE_LOCK(p); + + cv_wait(&wq->wq_atimer_cv, &p->p_twqlock); + + start_timer = 0; + + /* + * The workq lock will protect us from seeing WQ_EXITING change + * state, but we still need to update this atomically in case + * someone else tries to start the timer just as we're + * releasing it. 
+ */ + while ( !(atomic_cmpset_32(&wq->wq_flags, wq->wq_flags, + (wq->wq_flags & ~WQ_ATIMER_RUNNING)))); + + while (!(wq->wq_flags & WQ_EXITING) && + twq_timer_work(p, wq, &start_timer)); + + if ( !(wq->wq_flags & WQ_ATIMER_RUNNING)) + wq->wq_timer_interval = 0; + + if (wq->wq_flags & WQ_EXITING) + break; + + WORKQUEUE_UNLOCK(p); + + if (start_timer) + twq_interval_timer_start(wq); + + } + + wq->wq_atimer_thread = NULL; + WORKQUEUE_UNLOCK(p); + kproc_exit(0); +} + +/* + * thrworkq_thread_yielded is called when an user thread voluntary yields. + */ +void +thrworkq_thread_yielded(void) +{ + struct thrworkq *wq; + struct proc *p = curproc; + + if ((wq = p->p_twq) == NULL || wq->wq_itemcount == 0) + return; + + WORKQUEUE_LOCK(p); + + if (wq->wq_itemcount) { + uint64_t curtime; + uint64_t elapsed; + + if (wq->wq_thread_yielded_count++ == 0) + wq->wq_thread_yielded_timestamp = twq_microuptime(); + + if (wq->wq_thread_yielded_count < wq_yielded_threshold) { + WORKQUEUE_UNLOCK(p); + return; + } + + wq->wq_thread_yielded_count = 0; + + curtime = twq_microuptime(); + elapsed = curtime - wq->wq_thread_yielded_timestamp; + + if (elapsed < wq_yielded_window_usecs) { + + /* + * We have 'wq_yielded_threadhold' or more threads + * yielding within a 'wq_yielded_window_usecs' period + * of time. Let's see if we need to add a thread or + * assign some work. + */ + + if (wq->wq_thidlecount == 0) { + (void) twq_addnewthread(wq); + /* + * 'twq_addnewthread' drops the workqueue + * lock when creating the new thread and then + * retakes it before returning. This window + * allows other threads to process work on the + * queue, so we need to recheck for available + * work if none found, we just return. The + * newly created thread will eventually get + * used (if it hasn't already). + */ + if (wq->wq_itemcount == 0) { + WORKQUEUE_UNLOCK(p); + return; + } + } + if (wq->wq_thidlecount) { + uint32_t priority; + uint32_t affinity = -1; + void *item; + struct workitem *witem = NULL; + struct workitemlist *wl = NULL; + struct thread *td; + struct threadlist *tl; + + /* + * We have an idle thread. Let's assign some + * work. + */ + + td = curthread; + if ((tl = td->td_threadlist)) + affinity = tl->th_affinity_tag; + + for (priority = 0; + priority < WORKQ_OS_NUMPRIOS; priority++) { + if (wq->wq_list_bitmap & + (1 << priority)) { + + wl = (struct workitemlist *) + &wq->wq_list[priority]; + break; + } + } + KASSERT(wl != NULL, ("[%s: %d] wl == NULL", + __FILE__, __LINE__)); + KASSERT(!(TAILQ_EMPTY(&wl->wl_itemlist)), + ("[%s: %d] wl_itemlist not empty", + __FILE__, __LINE__)); + + witem = TAILQ_FIRST(&wl->wl_itemlist); + TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry); + + if (TAILQ_EMPTY(&wl->wl_itemlist)) + wq->wq_list_bitmap &= ~(1 << priority); + wq->wq_itemcount--; + + item = witem->wi_item; + witem->wi_item = (void *)0; + witem->wi_affinity = 0; + + TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, + wi_entry); + + (void)twq_run_nextitem(p, wq, + NULL, item, priority, affinity); + /* + * twq_run_nextitem is responsible for + * dropping the workqueue lock in all cases. + */ + + return; + } + } + } + WORKQUEUE_UNLOCK(p); +} + +/* + * Callback for miswitch(). It is called before and after a context switch. 
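+ * SWCB_BLOCK decrements the active count for the thread's (priority,
+ * affinity) bucket; if this was the last active thread there, the
+ * last-blocked timestamp is updated and, when work is still queued, the
+ * stall timer is kicked.  SWCB_UNBLOCK increments the count again and must
+ * not take the workqueue lock, since it can be called from interrupt
+ * context.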
+ */ +static void +twq_callback(int type, struct thread *td) +{ + struct threadlist *tl; + struct thrworkq *wq; + + tl = td->td_threadlist; + wq = tl->th_workq; + + switch (type) { + + case SWCB_BLOCK: + { + uint32_t old_activecount; + + old_activecount = atomic_fetchadd_32( + &wq->wq_thactive_count[tl->th_priority] + [tl->th_affinity_tag], -1); + if (old_activecount == 1) { + int start_timer = 0; + uint64_t curtime; + uint64_t *lastblocked_ptr; + + /* + * We were the last active thread on this + * affinity set and we've got work to do. + */ + lastblocked_ptr = + &wq->wq_lastblocked_ts[tl->th_priority] + [tl->th_affinity_tag]; + curtime = twq_microuptime(); + + /* + * If we collide with another thread trying + * to update the last_blocked (really + * unlikely since another thread would have to + * get scheduled and then block after we start + * down this path), it's not a problem. Either + * timestamp is adequate, so no need to retry. + */ + (void)atomic_cmpset_64(lastblocked_ptr, + *lastblocked_ptr, curtime); + if (wq->wq_itemcount) + WQ_TIMER_NEEDED(wq, start_timer); + + if (start_timer) + twq_interval_timer_start(wq); + } + } + break; + + case SWCB_UNBLOCK: + /* + * We cannot take the workqueue_lock here. An UNBLOCK can occur + * from a timer event whichis run from an interrupt context. If + * the workqueue_lock is already held by this processor, we'll + * deadlock. The thread lock for this thread being UNBLOCKED + * is also held. + */ + atomic_add_32(&wq->wq_thactive_count[tl->th_priority] + [tl->th_affinity_tag], 1); + break; + } +} + +static void +twq_removethread(struct threadlist *tl) +{ + struct thrworkq *wq; + struct thread *td; + + wq = tl->th_workq; + + TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry); + + wq->wq_nthreads--; + wq->wq_thidlecount--; + + + WORKQUEUE_UNLOCK(curproc); + + td = tl->th_thread; + + thread_lock(td); + /* + * Recycle this thread's stack. Done when the thread exits. + */ + td->td_reuse_stack = tl->th_stack.ss_sp; + + twq_set_schedcallback(td, NULL); + + /* + * Clear the threadlist pointer so blocked thread on wakeup for + * termination will not access the thread list as it is going to + * be freed. + */ + td->td_threadlist = NULL; + + /* + * Set to call the exit upcall to clean up and exit. + */ + twq_set_upcall(tl, WQ_UPCALL_EXIT, NULL); + + thread_unlock(td); + + free(tl, M_THRWORKQ); +} + +/* + * twq_addnewthread() is called with the workqueue lock held. + */ +static int +twq_addnewthread(struct thrworkq *wq) +{ + int error; + struct threadlist *tl; + void *stackaddr = NULL; + struct thread *newtd, *td; + struct proc *p = wq->wq_proc; + int try; + + WORKQUEUE_ASSERT_LOCKED(p); + + if (wq->wq_nthreads >= wq->wq_maxthreads + /* || wq->wq_nthreads >= (max_threads_per_proc - 20) */) + return (0); + wq->wq_nthreads++; + + td = wq->wq_pthread; + + /* + * See if we have a stack we can reuse. + */ + if (wq->wq_stacktop > 0) { + wq->wq_stacktop--; + stackaddr = wq->wq_stacklist[wq->wq_stacktop]; + KASSERT(stackaddr != NULL, ("[%s:%d] stackaddr = NULL", + __FILE__, __LINE__)); + wq->wq_stacklist[wq->wq_stacktop] = NULL; + } + WORKQUEUE_UNLOCK(p); + + /* + * If needed, map a new thread stack and guard page. + */ + if (stackaddr == NULL) + for (try = 0; try < 3; try++) { + error = kern_thr_stack(p, &stackaddr, wq->wq_stacksize, + wq->wq_guardsize); + if (error == 0) + break; + if (error != ENOMEM) + goto failed; + } + + newtd = thread_alloc(0); + if (newtd == NULL) { + /* Save the stack address so we can reuse it. 
*/ + thrworkq_reusestack(p, stackaddr); + goto failed; + } + + bzero(&newtd->td_startzero, + __rangeof(struct thread, td_startzero, td_endzero)); + bcopy(&td->td_startcopy, &newtd->td_startcopy, + __rangeof(struct thread, td_startcopy, td_endcopy)); + newtd->td_proc = p; + newtd->td_ucred = crhold(td->td_ucred); + + cpu_set_upcall(newtd, td); + + /* + * Allocate thread list and init. + */ + tl = (struct threadlist *) malloc(sizeof(struct threadlist), + M_THRWORKQ, M_WAITOK | M_ZERO); + + tl->th_thread = newtd; + tl->th_workq = wq; + + tl->th_affinity_tag = -1; + tl->th_priority = WORKQ_OS_NUMPRIOS; + + tl->th_stack.ss_sp = stackaddr; + tl->th_stack.ss_size = wq->wq_stacksize; + + tl->th_flags = TH_LIST_INITED | TH_LIST_UNSCHEDULED; + + newtd->td_threadlist = (void *)tl; + + PROC_LOCK(p); + p->p_flag |= P_HADTHREADS; + newtd->td_sigmask = td->td_sigmask; + thread_link(newtd, p); + bcopy(p->p_comm, newtd->td_name, sizeof(newtd->td_name)); + thread_lock(td); + sched_fork_thread(td, newtd); + thread_unlock(td); + /* + * tell suspend handling code that if this thread is inactive + * to simply skip it + */ + newtd->td_flags |= TDF_WORKQ; + if (P_SHOULDSTOP(p)) + newtd->td_flags |= TDF_ASTPENDING | TDF_NEEDSUSPCHK; + PROC_UNLOCK(p); + + tidhash_add(newtd); + + /* + * We don't add the new thread to the scheduler yet until we find some + * work for it to do. + */ + + WORKQUEUE_LOCK(p); + TAILQ_INSERT_TAIL(&wq->wq_thidlelist, tl, th_entry); + wq->wq_thidlecount++; + + return (1); + +failed: + WORKQUEUE_LOCK(p); + wq->wq_nthreads--; + + return (0); + +} + +static int +twq_init_workqueue(struct proc *p, struct twq_param *arg) +{ + struct thrworkq *wq; + uint32_t i, j; + size_t wq_size; + char *ptr, *nptr; + struct workitem *witem; + struct workitemlist *wl; + struct callout *calloutp; + int error; + void *ssptr; + uint32_t maxthreads; + + /* 'smp_cpus' is the number of cpus running. */ + wq_size = sizeof(struct thrworkq) + + (smp_cpus * WORKQ_OS_NUMPRIOS * sizeof(uint32_t)) + + (smp_cpus * WORKQ_OS_NUMPRIOS * sizeof(uint32_t)) + + (smp_cpus * WORKQ_OS_NUMPRIOS * sizeof(uint64_t)) + + sizeof(uint64_t); + + ptr = malloc(wq_size, M_THRWORKQ, M_WAITOK | M_ZERO); + maxthreads = wq_max_threads; + ssptr = malloc(sizeof(void *) * maxthreads, M_THRWORKQ, + M_WAITOK | M_ZERO); + calloutp = (struct callout *)malloc(sizeof(struct callout), M_THRWORKQ, + M_WAITOK | M_ZERO); + + WORKQUEUE_LOCK(p); + if (p->p_twq != NULL) { + WORKQUEUE_UNLOCK(p); + free(ptr, M_THRWORKQ); + free(ssptr, M_THRWORKQ); + free(calloutp, M_THRWORKQ); + return (EINVAL); + } + + /* + * Initialize workqueue information. 
+ */ + wq = (struct thrworkq *)ptr; + wq->wq_flags = WQ_LIST_INITED; + wq->wq_proc = p; + wq->wq_affinity_max = smp_cpus; + wq->wq_workqfunc = arg->twq_workqfunc; + wq->wq_newtdfunc = arg->twq_newtdfunc; + wq->wq_exitfunc = arg->twq_exitfunc; + if (arg->twq_stacksize == 0) + wq->wq_stacksize = THR_STACK_DEFAULT; + else + wq->wq_stacksize = round_page(arg->twq_stacksize); + wq->wq_guardsize = round_page(arg->twq_guardsize); + wq->wq_pthread = curthread; + + wq->wq_stacklist = ssptr; + wq->wq_stacktop = 0; + wq->wq_maxthreads = maxthreads; + + for (i = 0; i < WORKQ_OS_NUMPRIOS; i++) { + wl = (struct workitemlist *)&wq->wq_list[i]; + TAILQ_INIT(&wl->wl_itemlist); + TAILQ_INIT(&wl->wl_freelist); + + for (j = 0; j < WORKQ_OS_ELEM_MAX; j++) { + witem = &wq->wq_array[(i * WORKQ_OS_ELEM_MAX) + j]; + TAILQ_INSERT_TAIL(&wl->wl_freelist, witem, wi_entry); + } + wq->wq_reqconc[i] = wq->wq_affinity_max; + } + nptr = ptr + sizeof(struct thrworkq); + + for (i = 0; i < WORKQ_OS_NUMPRIOS; i++) { + wq->wq_thactive_count[i] = (uint32_t *)nptr; + nptr += (smp_cpus * sizeof(uint32_t)); + } + for (i = 0; i < WORKQ_OS_NUMPRIOS; i++) { + wq->wq_thscheduled_count[i] = (uint32_t *)nptr; + nptr += (smp_cpus * sizeof(uint32_t)); + } + + /* + * Align nptr on a 64 bit boundary so we can do atomic + * operations on the timestamps. (We allocated an extra + * uint64_t space above so we have room for this adjustment.) + */ + nptr += (sizeof(uint64_t) - 1); + nptr = (char *)((long)nptr & ~(sizeof(uint64_t) - 1)); + + for (i = 0; i < WORKQ_OS_NUMPRIOS; i++) { + wq->wq_lastblocked_ts[i] = (uint64_t *)nptr; + nptr += (smp_cpus * sizeof(uint64_t)); + } + TAILQ_INIT(&wq->wq_thrunlist); + TAILQ_INIT(&wq->wq_thidlelist); + + cv_init(&wq->wq_atimer_cv, "twq_atimer_cv"); + wq->wq_atimer_call = calloutp; + callout_init(wq->wq_atimer_call, CALLOUT_MPSAFE); + + PROC_LOCK(p); + p->p_twq = wq; + PROC_UNLOCK(p); + WORKQUEUE_UNLOCK(p); + + error = kproc_create(twq_timer_kthread, (void *)wq, + &wq->wq_atimer_thread, RFHIGHPID, 0, "twq %d", p->p_pid); + if (error) + panic("twq_init_workqueue: kproc_create returned %d", error); + + return (0); +} + +/* + * thr_workq() system call. + */ +int +sys_thr_workq(struct thread *td, struct thr_workq_args *uap) +{ + struct twq_param arg; + struct proc *p = td->td_proc; + int cmd = uap->cmd; + int prio, reqconc, affinity; + int error = 0; + void *oc_item = NULL; + struct thrworkq *wq; + + error = copyin(uap->args, &arg, sizeof(arg)); + if (error) + return (error); + + /* + * Affinity is not used yet. + */ + affinity = -1; + + switch (cmd) { + + case WQOPS_INIT: + /* + * Return the PID for the handle for now. If we decide to + * support multiple workqueues per process then we will need + * to do something different. + */ + error = suword(arg.twq_retid, p->p_pid); + if (error) + return (error); + return (twq_init_workqueue(p, &arg)); + + case WQOPS_QUEUE_ADD: + WORKQUEUE_LOCK(p); + if ((wq = p->p_twq) == NULL || arg.twq_id != p->p_pid) { + WORKQUEUE_UNLOCK(p); + return (EINVAL); + } + + prio = arg.twq_add_prio; + /* affinity = arg.twq_add_affin; XXX Not yet used. */ + + /* + * Add item to workqueue. If the WORKQUEUE_OVERCOMMIT flag + * is set we want to commit the item to a thread even if we + * have to start a new one. 
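+		 * If no idle worker exists and one cannot be created, the
+		 * overcommitted item is queued like a regular item (oc_item
+		 * is cleared below) rather than being dropped.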
+ */ + if (prio & WORKQUEUE_OVERCOMMIT) { + prio &= ~WORKQUEUE_OVERCOMMIT; + oc_item = arg.twq_add_item; + } + if ((prio < 0) || (prio >= WORKQ_OS_NUMPRIOS)) { + WORKQUEUE_UNLOCK(p); + return (EINVAL); + } + + if (wq->wq_thidlecount == 0 && + (oc_item || (wq->wq_nthreads < wq->wq_affinity_max))) { + (void) twq_addnewthread(wq); + + /* + * If we can start a new thread then this work item + * will have to wait on the queue. + */ + if (wq->wq_thidlecount == 0) + oc_item = NULL; + } + if (oc_item == NULL) + error = twq_additem(wq, prio, arg.twq_add_item, + affinity); + + /* twq_run_nextitem() drops the workqueue lock. */ + (void)twq_run_nextitem(p, wq, NULL, oc_item, prio, affinity); + + return (error); + + case WQOPS_QUEUE_REMOVE: + WORKQUEUE_LOCK(p); + if ((wq = p->p_twq) == NULL || arg.twq_id != p->p_pid) { + WORKQUEUE_UNLOCK(p); + return (EINVAL); + } + + prio = arg.twq_rm_prio; + /* affinity = arg.twq_add_affin; Not yet used. */ + + /* + * Remove item from workqueue. + */ + if ((prio < 0) || (prio >= WORKQ_OS_NUMPRIOS)) { + WORKQUEUE_UNLOCK(p); + return (EINVAL); + } + + + error = twq_removeitem(wq, prio, arg.twq_rm_item); + + /* + * twq_run_nextitem() drops the workqueue lock. See if + * we can assign a work item to an idle thread. + */ + (void)twq_run_nextitem(p, wq, NULL, NULL, prio, affinity); + + return (error); + + case WQOPS_THREAD_RETURN: + WORKQUEUE_LOCK(p); + if ((wq = p->p_twq) == NULL || arg.twq_id != p->p_pid) { + WORKQUEUE_UNLOCK(p); + return (EINVAL); + } + + if (td->td_threadlist == NULL) { + WORKQUEUE_UNLOCK(p); + return (EINVAL); + } + + /* + * twq_run_nextitem() drops the workqueue lock. Assign + * any pending work to this (or other idle) thread. + */ + (void)twq_run_nextitem(p, wq, td, NULL, 0, -1); + + return (0); + + case WQOPS_THREAD_SETCONC: + WORKQUEUE_LOCK(p); + if ((wq = p->p_twq) == NULL || arg.twq_id != p->p_pid) { + WORKQUEUE_UNLOCK(p); + return (EINVAL); + } + + prio = arg.twq_setconc_prio; + if ((prio < 0) || (prio > WORKQ_OS_NUMPRIOS)) { + WORKQUEUE_UNLOCK(p); + return (EINVAL); + } + + reqconc = arg.twq_setconc_conc; + + + if (prio < WORKQ_OS_NUMPRIOS) + wq->wq_reqconc[prio] = reqconc; + else { + for (prio = 0; prio < WORKQ_OS_NUMPRIOS; prio++) + wq->wq_reqconc[prio] = reqconc; + } + + /* + * twq_run_nextitem() drops the workqueue lock. See if + * we can assign a work item to an idle thread. + */ + (void)twq_run_nextitem(p, wq, NULL, NULL, 0, -1); + + return (0); + + default: + return (EINVAL); + } +} + +/* + * + */ +void +thrworkq_reusestack(struct proc *p, void *stackaddr) +{ + struct thrworkq *wq; + + WORKQUEUE_LOCK(p); + /* Recycle its stack. */ + if ((wq = p->p_twq) != NULL) + wq->wq_stacklist[wq->wq_stacktop++] = stackaddr; + WORKQUEUE_UNLOCK(p); +} + +/* + * thrworkq_exit is called when a process is about to exit (or has exec'ed). + */ +void +thrworkq_exit(struct proc *p) +{ + struct thrworkq *wq; + struct threadlist *tl, *tlist; + struct thread *td; + + KASSERT(p != 0, ("[%s: %d] thrworkq_exit: p = NULL", + __FILE__, __LINE__)); + + WORKQUEUE_LOCK(p); + if ((wq = p->p_twq) == NULL) { + WORKQUEUE_UNLOCK(p); + return; + } + p->p_twq = NULL; + + /* + * We now arm the timer in the callback function w/o + * holding the workq lock. WQ_ATIMER_RUNNING via + * atomic_cmpset in order to insure only a single timer is + * running and to notice that WQ_EXITING has been set been + * set (we don't want to start a timer once WQ_EXITING is + * posted). + * + * So once we have successfully set WQ_EXITING, we cannot fire + * up a new timer. 
Therefore no need to clear the timer state + * atomically from the flags. + * + * Since we always hold the workq_lock when dropping + * WQ_ATIMER_RUNNING the check for and sleep until clear is + * protected. + */ + + while ( !(atomic_cmpset_32(&wq->wq_flags, wq->wq_flags, + (wq->wq_flags | WQ_EXITING)))); + + if (wq->wq_flags & WQ_ATIMER_RUNNING) { + if (callout_stop(wq->wq_atimer_call) != 0) + wq->wq_flags &= ~WQ_ATIMER_RUNNING; + } + + /* Wait for timer thread to die. */ + if (wq->wq_atimer_thread) { + cv_signal(&wq->wq_atimer_cv); + if (msleep(wq->wq_atimer_thread, &p->p_twqlock, PWAIT, + "twq_atimer", 60 * hz)) + printf("thr workq timer thread didn't die."); + else + cv_destroy(&wq->wq_atimer_cv); + } + WORKQUEUE_UNLOCK(p); + + TAILQ_FOREACH_SAFE(tl, &wq->wq_thrunlist, th_entry, tlist) { + + td = tl->th_thread; + + thread_lock(td); + twq_set_schedcallback(td, NULL); + td->td_threadlist = NULL; + thread_unlock(td); + + TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry); + + free(tl, M_THRWORKQ); + } + TAILQ_FOREACH_SAFE(tl, &wq->wq_thidlelist, th_entry, tlist) { + + td = tl->th_thread; + + thread_lock(td); + twq_set_schedcallback(td, NULL); + td->td_threadlist = NULL; + twq_set_upcall(tl, WQ_UPCALL_EXIT, NULL); + thread_unlock(td); + + if (tl->th_flags & TH_LIST_UNSCHEDULED) { + /* + * Schedule unscheduled the thread so it can exit. + */ + tl->th_flags &= ~TH_LIST_UNSCHEDULED; + twq_schedthr(td); + } + + TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry); + + free(tl, M_THRWORKQ); + } + + callout_drain(wq->wq_atimer_call); + free(wq->wq_atimer_call, M_THRWORKQ); + + free(wq->wq_stacklist, M_THRWORKQ); + + free(wq, M_THRWORKQ); +} + +/* + * Add item to workqueue. Workqueue lock must be held. + */ +static int +twq_additem(struct thrworkq *wq, int prio, void *item, int affinity) +{ + struct workitem *witem; + struct workitemlist *wl; + + WORKQUEUE_ASSERT_LOCKED(wq->wq_proc); + + wl = (struct workitemlist *)&wq->wq_list[prio]; + + if (TAILQ_EMPTY(&wl->wl_freelist)) + return (ENOMEM); + + witem = (struct workitem *)TAILQ_FIRST(&wl->wl_freelist); + TAILQ_REMOVE(&wl->wl_freelist, witem, wi_entry); + + witem->wi_item = item; + witem->wi_affinity = affinity; + TAILQ_INSERT_TAIL(&wl->wl_itemlist, witem, wi_entry); + + wq->wq_list_bitmap |= (1 << prio); + + wq->wq_itemcount++; + + return (0); +} + +/* + * Remove item from workqueue. Workqueue lock must be held. + */ +static int +twq_removeitem(struct thrworkq *wq, int prio, void *item) +{ + struct workitem *witem; + struct workitemlist *wl; + int error = ESRCH; + + WORKQUEUE_ASSERT_LOCKED(wq->wq_proc); + + wl = (struct workitemlist *)&wq->wq_list[prio]; + + TAILQ_FOREACH(witem, &wl->wl_itemlist, wi_entry) { + if (witem->wi_item == item) { + TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry); + + if (TAILQ_EMPTY(&wl->wl_itemlist)) + wq->wq_list_bitmap &= ~(1 << prio); + wq->wq_itemcount--; + + witem->wi_item = NULL; + witem->wi_affinity = 0; + TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry); + + error = 0; + break; + } + } + return (error); +} + +/* + * twq_run_nextitem is called with the workqueue lock held and + * must drop it in all cases. 
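+ * It returns 1 when a work item was handed to a thread and 0 when there is
+ * no work, no usable thread, or the calling worker was parked; callers such
+ * as twq_timer_work() use this to decide whether to keep dispatching.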
+ */ +static int +twq_run_nextitem(struct proc *p, struct thrworkq *wq, + struct thread *thread, void * oc_item, int oc_prio, int oc_affinity) +{ + struct workitem *witem = NULL; + void *item = 0; + struct thread *th_to_run = NULL; + struct thread *th_to_park = NULL; + int wake_thread = 0; + uint32_t priority, orig_priority; + uint32_t affinity_tag, orig_affinity_tag; + uint32_t i; + uint32_t activecount, busycount; + uint32_t us_to_wait; + int start_timer = 0; + int adjust_counters = 1; + uint64_t curtime; + int error; + struct threadlist *tl = NULL; + struct threadlist *ttl = NULL; + struct workitemlist *wl = NULL; + + WORKQUEUE_ASSERT_LOCKED(p); + /* + * From here until we drop the workq lock we can't be pre-empted. + * This is important since we have to independently update the priority + * and affinity that the thread is associated with and these values are + * used to index the multi-dimensional counter arrays in + * 'twq_callback'. + */ + if (oc_item) { + uint32_t min_scheduled = 0; + uint32_t scheduled_count; + uint32_t active_count; + uint32_t t_affinity = 0; + + priority = oc_prio; + item = oc_item; + + if ((affinity_tag = oc_affinity) == (uint32_t)-1) { + /* + * CPU affinity is not assigned yet. + */ + for (affinity_tag = 0; + affinity_tag < wq->wq_reqconc[priority]; + affinity_tag++) { + /* + * Look for the affinity group with the + * least number of threads. + */ + scheduled_count = 0; + active_count = 0; + + for (i = 0; i <= priority; i++) { + scheduled_count += + wq->wq_thscheduled_count[i][affinity_tag]; + active_count += + wq->wq_thactive_count[i][affinity_tag]; + } + if (active_count == 0) { + t_affinity = affinity_tag; + break; + } + if (affinity_tag == 0 || + scheduled_count < min_scheduled) { + min_scheduled = scheduled_count; + t_affinity = affinity_tag; + } + } + affinity_tag = t_affinity; + } + goto grab_idle_thread; + } + if (wq->wq_itemcount == 0) { + if ((th_to_park = thread) == NULL) + goto out_of_work; + goto parkit; + } + for (priority = 0; priority < WORKQ_OS_NUMPRIOS; priority++) { + if (wq->wq_list_bitmap & (1 << priority)) { + wl = (struct workitemlist *)&wq->wq_list[priority]; + break; + } + } + KASSERT(wl != NULL, ("[%s:%d] workq list is NULL", __FILE__, __LINE__)); + KASSERT(!(TAILQ_EMPTY(&wl->wl_itemlist)), + ("[%s:%d] workq list is empty", __FILE__, __LINE__)); + + curtime = twq_microuptime(); + + if (thread != NULL) { + tl = thread->td_threadlist; + KASSERT(tl != NULL, ("[%s:%d] tl = NULL", __FILE__, __LINE__)); + affinity_tag = tl->th_affinity_tag; + + /* + * Check to see if the affinity group this thread is + * associated with is still within the bounds of the + * specified concurrency for the priority level we're + * considering running work for. + */ + if (affinity_tag < wq->wq_reqconc[priority]) { + /* + * We're a worker thread from the pool. Currently + * we are considered 'active' which means we're counted + * in 'wq_thactive_count'. Add up the active counts + * of all the priority levels up to and including + * the one we want to schedule. + */ + for (activecount = 0, i = 0; i <= priority; i++) { + uint32_t acount; + + acount = + wq->wq_thactive_count[i][affinity_tag]; + if (acount == 0 && + wq->wq_thscheduled_count[i][affinity_tag]){ + if (twq_thr_busy(curtime, + &wq->wq_lastblocked_ts[i] + [affinity_tag])) + acount = 1; + } + activecount += acount; + } + if (activecount == 1) { + /* + * We're the only active thread associated + * with our affinity group at this priority + * level and higher so pick up some work and + * keep going. 
+ */ + th_to_run = thread; + goto pick_up_work; + } + } + + /* + * There's more than one thread running in this affinity group + * or the concurrency level has been cut back for this priority. + * Let's continue on and look for an 'empty' group to run this + * work item in. + */ + } + busycount = 0; + + for (affinity_tag = 0; affinity_tag < wq->wq_reqconc[priority]; + affinity_tag++) { + /* + * Look for first affinity group that is currently not active + * (i.e. no active threads at this priority level of higher + * and no threads that have run recently). + */ + for (activecount = 0, i = 0; i <= priority; i++) { + if ((activecount = + wq->wq_thactive_count[i][affinity_tag]) != 0) + break; + + if (wq->wq_thscheduled_count[i][affinity_tag] != 0) { + if (twq_thr_busy(curtime, + &wq->wq_lastblocked_ts[i][affinity_tag])) { + busycount++; + break; + } + } + } + if (activecount == 0 && busycount == 0) + break; + } + if (affinity_tag >= wq->wq_reqconc[priority]) { + /* + * We've already got at least 1 thread per affinity group in + * the active state. + */ + if (busycount) { + /* + * We found at least 1 thread in the 'busy' state. + * Make sure we start the timer because if they are the + * threads keeping us from scheduling this workitem then + * we won't get a callback to kick off the timer. We + * need to start i now. + */ + WQ_TIMER_NEEDED(wq, start_timer); + } + + if (thread != NULL) { + /* + * Go park this one for later. + */ + th_to_park = thread; + goto parkit; + } + goto out_of_work; + } + if (thread != NULL) { + /* + * We're overbooked on the affinity group this thread is + * currently associated with but we have work to do and + * at least 1 idle processor. Therefore, we we'll just + * retarget this thread to the new affinity group. + */ + th_to_run = thread; + goto pick_up_work; + } + if (wq->wq_thidlecount == 0) { + /* + * We don't have a thread to schedule but we have work to + * do and at least 1 affinity group doesn't currently have + * an active thread. + */ + WQ_TIMER_NEEDED(wq, start_timer); + goto no_thread_to_run; + } + +grab_idle_thread: + /* + * We've got a candidate (affinity group with no currently active + * threads) to start a new thread on. We already know there is both + * work available and an idle thread, so activate a thread and then + * fall into the code that pulls a new workitem (pick_up_work). 
+ */ + TAILQ_FOREACH(ttl, &wq->wq_thidlelist, th_entry) { + if (ttl->th_affinity_tag == affinity_tag || + ttl->th_affinity_tag == (uint16_t)-1) { + TAILQ_REMOVE(&wq->wq_thidlelist, ttl, th_entry); + tl = ttl; + + break; + } + } + if (tl == NULL) { + tl = TAILQ_FIRST(&wq->wq_thidlelist); + TAILQ_REMOVE(&wq->wq_thidlelist, tl, th_entry); + } + wq->wq_thidlecount--; + + TAILQ_INSERT_TAIL(&wq->wq_thrunlist, tl, th_entry); + + if ((tl->th_flags & TH_LIST_UNSCHEDULED) == TH_LIST_UNSCHEDULED) { + tl->th_flags &= ~TH_LIST_UNSCHEDULED; + tl->th_flags |= TH_LIST_SBUSY; + + thread_lock(tl->th_thread); + twq_set_schedcallback(tl->th_thread, twq_callback); + thread_unlock(tl->th_thread); + + } else if ((tl->th_flags & TH_LIST_BLOCKED) == TH_LIST_BLOCKED) { + tl->th_flags &= ~TH_LIST_BLOCKED; + tl->th_flags |= TH_LIST_BUSY; + wake_thread = 1; + } + tl->th_flags |= TH_LIST_RUNNING; + + wq->wq_threads_scheduled++; + wq->wq_thscheduled_count[priority][affinity_tag]++; + atomic_add_32(&wq->wq_thactive_count[priority][affinity_tag], 1); + + adjust_counters = 0; + th_to_run = tl->th_thread; + +pick_up_work: + if (item == 0) { + witem = TAILQ_FIRST(&wl->wl_itemlist); + TAILQ_REMOVE(&wl->wl_itemlist, witem, wi_entry); + + if (TAILQ_EMPTY(&wl->wl_itemlist)) + wq->wq_list_bitmap &= ~(1 << priority); + wq->wq_itemcount--; + + item = witem->wi_item; + witem->wi_item = (void *)0; + witem->wi_affinity = 0; + TAILQ_INSERT_HEAD(&wl->wl_freelist, witem, wi_entry); + } + orig_priority = tl->th_priority; + orig_affinity_tag = tl->th_affinity_tag; + + tl->th_priority = priority; + tl->th_affinity_tag = affinity_tag; + + if (adjust_counters && + (orig_priority != priority || orig_affinity_tag != affinity_tag)) { + /* + * We need to adjust these counters based on this thread's + * new disposition w/r to affinity and priority. + */ + atomic_add_int(&wq->wq_thactive_count[orig_priority] + [orig_affinity_tag], -1); + atomic_add_int(&wq->wq_thactive_count[priority][affinity_tag], + 1); + wq->wq_thscheduled_count[orig_priority][orig_affinity_tag]--; + wq->wq_thscheduled_count[priority][affinity_tag]++; + } + wq->wq_thread_yielded_count = 0; + + WORKQUEUE_UNLOCK(p); + + if (orig_affinity_tag != affinity_tag) { + /* + * This thread's affinity does not match the affinity group + * it's being placed on (it's either a brand new thread or + * we're retargeting an existing thread to a new group). + * An affinity tag of 0 means no affinity but we want our + * tags to be 0 based because they are used to index arrays + * so keep it 0 based on internally and bump by 1 when + * calling out to set it. + */ + + /* XXX - Not used yet. */ +#if 0 + CPU_ZERO(&mask); + CPU_SET(affinity_tag, &mask); + cpuset_setthread(th_to_run->td_tid, &mask); +#endif + ; + } + if (orig_priority != priority) { + /* + * XXX Set thread priority. + * + * Can't simply just set thread priority here since: + * + * (1) The thread priority of TIMESHARE priority class is + * adjusted by the scheduler and there doesn't seem to be + * a per-thread 'nice' parameter. + * + * (2) We really shouldn't use the REALTIME class since + * thread workqueues doesn't require the process to have + * privilege. + * + * (3) Could maybe use IDLE priority class for + * WORKQ_LOW_PRIOQUUE. + * + * Need to figure out something here. + */ + ; + } + twq_runitem(p, item, th_to_run, tl, wake_thread); + + return (1); + +out_of_work: + /* + * We have no work to do or we are fully booked w/r to running threads. 
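+	 * Both out_of_work and no_thread_to_run share this exit path: drop
+	 * the workqueue lock, start the stall timer if one was requested
+	 * above, and return 0.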
+ */ +no_thread_to_run: + WORKQUEUE_UNLOCK(p); + + if (start_timer) + twq_interval_timer_start(wq); + + return (0); + +parkit: + /* + * This is a workqueue thread with no more work to do. + * Park it for now. + */ + + KASSERT(th_to_park == curthread, + ("[%s, %d] twq_run_nextitem: th_to_park is not current thread", + __FILE__, __LINE__)); + + tl = th_to_park->td_threadlist; + if (tl == 0) + panic("wq thread with no threadlist "); + + TAILQ_REMOVE(&wq->wq_thrunlist, tl, th_entry); + tl->th_flags &= ~TH_LIST_RUNNING; + + tl->th_flags |= TH_LIST_BLOCKED; + TAILQ_INSERT_HEAD(&wq->wq_thidlelist, tl, th_entry); + + thread_lock(th_to_park); + twq_set_schedcallback(th_to_park, NULL); + thread_unlock(th_to_park); + + atomic_add_32(&wq->wq_thactive_count[tl->th_priority] + [tl->th_affinity_tag], -1); + wq->wq_thscheduled_count[tl->th_priority][tl->th_affinity_tag]--; + wq->wq_threads_scheduled--; + + if (wq->wq_thidlecount < 100) + us_to_wait = wq_reduce_pool_window_usecs - + (wq->wq_thidlecount * (wq_reduce_pool_window_usecs / 100)); + else + us_to_wait = wq_reduce_pool_window_usecs / 100; + + wq->wq_thidlecount++; + + if (start_timer) + twq_interval_timer_start(wq); + + /* + * XXX I may be imaging things but it seems that only one + * thread will get unparked when a bunch are parked. Need + * to look at this closer. + */ +sleep_again: + error = msleep(tl, &p->p_twqlock, PCATCH, "twq_thpark", + twq_usecstoticks(us_to_wait)); + if (error == 0 || error == EWOULDBLOCK) { + if (twq_unpark(th_to_park, error == EWOULDBLOCK) != 0) + goto sleep_again; + } else + WORKQUEUE_UNLOCK(p); + + return (0); /* returning to system call handler */ +} + +static int +twq_unpark(struct thread *td, int timedout) +{ + struct threadlist *tl; + struct proc *p = curproc; + struct thrworkq *wq = p->p_twq; + + KASSERT(td == curthread, ("[%s: %d] twq_unpark: td != curthread", + __FILE__, __LINE__)); + WORKQUEUE_ASSERT_LOCKED(p); + + if (wq == NULL || (tl = td->td_threadlist) == NULL) { + WORKQUEUE_UNLOCK(p); + return (0); + } + + if (timedout) { + if ( !(tl->th_flags & TH_LIST_RUNNING)) { + /* + * The timer popped us out and we've not been + * moved off of the idle list so we should now + * self-destruct. + * + * twq_removethread() consumes the workq lock. + */ + twq_removethread(tl); + return (0); + } + + while (tl->th_flags & TH_LIST_BUSY) { + + /* + * The timer woke us up, but we have already started to + * make this a runnable thread, but have not yet + * finished that process so wait for the normal wakeup. + * Set the timer again in case we miss the wakeup in + * a race condition. + */ + /* Keep the workq lock held. */ + return (1); + } + } + + KASSERT(((tl->th_flags & (TH_LIST_RUNNING | TH_LIST_BUSY)) == + TH_LIST_RUNNING), ("[%s: %d] twq_unpark: !TH_LIST_RUNNING", + __FILE__, __LINE__)); + + /* + * A normal wakeup of this thread occurred. 
+ * No need for any synchronization with the + * timer and twq_runitem + */ + thread_lock(td); + twq_set_schedcallback(td, twq_callback); + thread_unlock(td); + + WORKQUEUE_UNLOCK(p); + return (0); +} + +static void +twq_runitem(struct proc *p, void *item, struct thread *td, + struct threadlist *tl, int wake_thread) +{ + + KASSERT(p->p_twq != NULL, ("[%s: %d] twq_runitem: wq = NULL", + __FILE__, __LINE__)); + + if (wake_thread) { + twq_set_upcall(tl, WQ_UPCALL_WORKQ, item); + WORKQUEUE_LOCK(p); + tl->th_flags &= ~TH_LIST_BUSY; + wakeup(tl); + WORKQUEUE_UNLOCK(p); + } else { + twq_set_upcall(tl, WQ_UPCALL_NEWTD, item); + WORKQUEUE_LOCK(p); + if (tl->th_flags & TH_LIST_SBUSY) { + tl->th_flags &= ~TH_LIST_SBUSY; + twq_schedthr(td); + } + WORKQUEUE_UNLOCK(p); + } +} + +#else /* ! THRWORKQ */ + +#include + +int +sys_thr_workq(struct thread *td, struct thr_workq_args *uap) +{ + + return (ENOSYS); +} + +#endif /* ! THRWORKQ */ Index: sys/kern/p1003_1b.c =================================================================== --- sys/kern/p1003_1b.c +++ sys/kern/p1003_1b.c @@ -37,6 +37,7 @@ __FBSDID("$FreeBSD$"); #include "opt_posix.h" +#include "opt_thrworkq.h" #include #include @@ -52,6 +53,9 @@ #include #include #include +#ifdef THRWORKQ +#include +#endif MALLOC_DEFINE(M_P31B, "p1003.1b", "Posix 1003.1B"); @@ -292,6 +296,9 @@ sys_sched_yield(struct thread *td, struct sched_yield_args *uap) { +#ifdef THRWORKQ + thrworkq_thread_yielded(); +#endif sched_relinquish(curthread); return 0; } Index: sys/kern/syscalls.c =================================================================== --- sys/kern/syscalls.c +++ sys/kern/syscalls.c @@ -473,8 +473,8 @@ "thr_set_name", /* 464 = thr_set_name */ "aio_fsync", /* 465 = aio_fsync */ "rtprio_thread", /* 466 = rtprio_thread */ - "#467", /* 467 = nosys */ - "#468", /* 468 = nosys */ + "thr_stack", /* 467 = thr_stack */ + "thr_workq", /* 468 = thr_workq */ "#469", /* 469 = __getpath_fromfd */ "#470", /* 470 = __getpath_fromaddr */ "sctp_peeloff", /* 471 = sctp_peeloff */ Index: sys/kern/systrace_args.c =================================================================== --- sys/kern/systrace_args.c +++ sys/kern/systrace_args.c @@ -2487,6 +2487,22 @@ *n_args = 3; break; } + /* thr_stack */ + case 467: { + struct thr_stack_args *p = params; + uarg[0] = p->stacksize; /* size_t */ + uarg[1] = p->guardsize; /* size_t */ + *n_args = 2; + break; + } + /* thr_workq */ + case 468: { + struct thr_workq_args *p = params; + iarg[0] = p->cmd; /* int */ + uarg[1] = (intptr_t) p->args; /* struct twq_param * */ + *n_args = 2; + break; + } /* sctp_peeloff */ case 471: { struct sctp_peeloff_args *p = params; @@ -7308,6 +7324,32 @@ break; }; break; + /* thr_stack */ + case 467: + switch(ndx) { + case 0: + p = "size_t"; + break; + case 1: + p = "size_t"; + break; + default: + break; + }; + break; + /* thr_workq */ + case 468: + switch(ndx) { + case 0: + p = "int"; + break; + case 1: + p = "struct twq_param *"; + break; + default: + break; + }; + break; /* sctp_peeloff */ case 471: switch(ndx) { @@ -10187,6 +10229,16 @@ if (ndx == 0 || ndx == 1) p = "int"; break; + /* thr_stack */ + case 467: + if (ndx == 0 || ndx == 1) + p = "caddr_t"; + break; + /* thr_workq */ + case 468: + if (ndx == 0 || ndx == 1) + p = "int"; + break; /* sctp_peeloff */ case 471: if (ndx == 0 || ndx == 1) Index: sys/sys/proc.h =================================================================== --- sys/sys/proc.h +++ sys/sys/proc.h @@ -182,6 +182,10 @@ struct thread; struct trapframe; struct turnstile; +struct 
thrworkq; +struct threadlist; + +typedef void (*mi_switchcb_t)(int, struct thread *); /* * XXX: Does this belong in resource.h or resourcevar.h instead? @@ -339,7 +343,11 @@ struct proc *td_rfppwait_p; /* (k) The vforked child */ struct vm_page **td_ma; /* (k) uio pages held */ int td_ma_cnt; /* (k) size of *td_ma */ + mi_switchcb_t td_cswitchcb; /* (k) context switch callback. */ + struct threadlist *td_threadlist; /* (?) thread workq thread list. */ + void *td_reuse_stack; /* (?) reuse workq thread stack. */ void *td_emuldata; /* Emulator state data */ + void *td_machdata; /* (k) mach state. */ int td_lastcpu; /* (t) Last cpu we were on. */ int td_oncpu; /* (t) Which cpu we are on. */ void *td_lkpi_task; /* LinuxKPI task struct pointer */ @@ -404,7 +412,7 @@ #define TDF_THRWAKEUP 0x00100000 /* Libthr thread must not suspend itself. */ #define TDF_SEINTR 0x00200000 /* EINTR on stop attempts. */ #define TDF_SWAPINREQ 0x00400000 /* Swapin request due to wakeup. */ -#define TDF_UNUSED23 0x00800000 /* --available-- */ +#define TDF_WORKQ 0x00800000 /* a workq thread */ #define TDF_SCHED0 0x01000000 /* Reserved for scheduler private use */ #define TDF_SCHED1 0x02000000 /* Reserved for scheduler private use */ #define TDF_SCHED2 0x04000000 /* Reserved for scheduler private use */ @@ -465,6 +473,7 @@ #define TDP_UIOHELD 0x10000000 /* Current uio has pages held in td_ma */ #define TDP_FORKING 0x20000000 /* Thread is being created through fork() */ #define TDP_EXECVMSPC 0x40000000 /* Execve destroyed old vmspace */ +#define TDP_UNUSUED32 0x80000000 /* Mach initialization done */ /* * Reasons that the current thread can not be run yet. @@ -660,6 +669,10 @@ */ LIST_ENTRY(proc) p_orphan; /* (e) List of orphan processes. */ LIST_HEAD(, proc) p_orphans; /* (e) Pointer to list of orphans. */ + vm_offset_t p_thrstack; /* ( ) next addr for thread stack */ + struct mtx p_twqlock; /* (n) thread workqueue lock. */ + struct thrworkq *p_twq; /* (^) thread workqueue. */ + void *p_machdata; /* (c) Mach state data. */ }; #define p_session p_pgrp->pg_session @@ -773,6 +786,9 @@ #define SW_VOL 0x0100 /* Voluntary switch. */ #define SW_INVOL 0x0200 /* Involuntary switch. */ #define SW_PREEMPT 0x0400 /* The invol switch is a preemption */ +/* Callback type. */ +#define SWCB_BLOCK 1 /* Thread is about to block. */ +#define SWCB_UNBLOCK 2 /* Thread was just unblocked. */ /* How values for thread_single(). 
*/ #define SINGLE_NO_EXIT 0 Index: sys/sys/syscall.h =================================================================== --- sys/sys/syscall.h +++ sys/sys/syscall.h @@ -389,6 +389,8 @@ #define SYS_thr_set_name 464 #define SYS_aio_fsync 465 #define SYS_rtprio_thread 466 +#define SYS_thr_stack 467 +#define SYS_thr_workq 468 #define SYS_sctp_peeloff 471 #define SYS_sctp_generic_sendmsg 472 #define SYS_sctp_generic_sendmsg_iov 473 Index: sys/sys/syscall.mk =================================================================== --- sys/sys/syscall.mk +++ sys/sys/syscall.mk @@ -318,6 +318,8 @@ thr_set_name.o \ aio_fsync.o \ rtprio_thread.o \ + thr_stack.o \ + thr_workq.o \ sctp_peeloff.o \ sctp_generic_sendmsg.o \ sctp_generic_sendmsg_iov.o \ Index: sys/sys/syscallsubr.h =================================================================== --- sys/sys/syscallsubr.h +++ sys/sys/syscallsubr.h @@ -270,6 +270,8 @@ int kern_thr_alloc(struct proc *, int pages, struct thread **); int kern_thr_exit(struct thread *td); int kern_thr_new(struct thread *td, struct thr_param *param); +int kern_thr_stack(struct proc *p, void **addr, vm_size_t stacksz, + vm_size_t guardsz); int kern_thr_suspend(struct thread *td, struct timespec *tsp); int kern_truncate(struct thread *td, char *path, enum uio_seg pathseg, off_t length); Index: sys/sys/sysproto.h =================================================================== --- sys/sys/sysproto.h +++ sys/sys/sysproto.h @@ -1314,6 +1314,14 @@ char lwpid_l_[PADL_(lwpid_t)]; lwpid_t lwpid; char lwpid_r_[PADR_(lwpid_t)]; char rtp_l_[PADL_(struct rtprio *)]; struct rtprio * rtp; char rtp_r_[PADR_(struct rtprio *)]; }; +struct thr_stack_args { + char stacksize_l_[PADL_(size_t)]; size_t stacksize; char stacksize_r_[PADR_(size_t)]; + char guardsize_l_[PADL_(size_t)]; size_t guardsize; char guardsize_r_[PADR_(size_t)]; +}; +struct thr_workq_args { + char cmd_l_[PADL_(int)]; int cmd; char cmd_r_[PADR_(int)]; + char args_l_[PADL_(struct twq_param *)]; struct twq_param * args; char args_r_[PADR_(struct twq_param *)]; +}; struct sctp_peeloff_args { char sd_l_[PADL_(int)]; int sd; char sd_r_[PADR_(int)]; char name_l_[PADL_(uint32_t)]; uint32_t name; char name_r_[PADR_(uint32_t)]; @@ -2055,6 +2063,8 @@ int sys_thr_set_name(struct thread *, struct thr_set_name_args *); int sys_aio_fsync(struct thread *, struct aio_fsync_args *); int sys_rtprio_thread(struct thread *, struct rtprio_thread_args *); +int sys_thr_stack(struct thread *, struct thr_stack_args *); +int sys_thr_workq(struct thread *, struct thr_workq_args *); int sys_sctp_peeloff(struct thread *, struct sctp_peeloff_args *); int sys_sctp_generic_sendmsg(struct thread *, struct sctp_generic_sendmsg_args *); int sys_sctp_generic_sendmsg_iov(struct thread *, struct sctp_generic_sendmsg_iov_args *); @@ -2945,6 +2955,8 @@ #define SYS_AUE_thr_set_name AUE_NULL #define SYS_AUE_aio_fsync AUE_AIO_FSYNC #define SYS_AUE_rtprio_thread AUE_RTPRIO +#define SYS_AUE_thr_stack AUE_NULL +#define SYS_AUE_thr_workq AUE_NULL #define SYS_AUE_sctp_peeloff AUE_SCTP_PEELOFF #define SYS_AUE_sctp_generic_sendmsg AUE_SCTP_GENERIC_SENDMSG #define SYS_AUE_sctp_generic_sendmsg_iov AUE_SCTP_GENERIC_SENDMSG_IOV Index: sys/sys/thr.h =================================================================== --- sys/sys/thr.h +++ sys/sys/thr.h @@ -44,6 +44,11 @@ /* Create the system scope thread. */ #define THR_SYSTEM_SCOPE 0x0002 +/* Default thread stack size. */ +#define THR_STACK_DEFAULT (sizeof(void *) / 4 * 1024 * 1024) +/* Initial (main) thread's stack size. 
*/ +#define THR_STACK_INITIAL (THR_STACK_DEFAULT * 2) + struct thr_param { void (*start_func)(void *); /* thread entry function. */ void *arg; /* argument for entry function. */ @@ -79,6 +84,7 @@ int thr_suspend(const struct timespec *timeout); int thr_wake(long id); int thr_set_name(long id, const char *name); +void *thr_stack(size_t stacksize, size_t guardsize); __END_DECLS #endif /* !_KERNEL */ Index: sys/sys/thrworkq.h =================================================================== --- /dev/null +++ sys/sys/thrworkq.h @@ -0,0 +1,124 @@ +/*- + * Copyright (c) 2009-2014 Stacey Son + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice unmodified, this list of conditions, and the following + * disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + * $FreeBSD$ + * + */ + +#ifndef _SYS_THRWORKQ_H_ +#define _SYS_THRWORKQ_H_ + +/* + * thr_workq() system call commands. + */ +#define WQOPS_INIT 1 +#define WQOPS_QUEUE_ADD 2 +#define WQOPS_QUEUE_REMOVE 3 +#define WQOPS_THREAD_RETURN 4 +#define WQOPS_THREAD_SETCONC 5 + +/* + * Workqueue priority flags. + */ +#define WORKQUEUE_OVERCOMMIT 0x10000 /* Attempt to start new thread if + can't run immediately even if + it requires overcommitting + resources. */ + +/* + * Kernel workqueue limits and sizing defaults. + */ +#define WORKQ_OS_ELEM_MAX 64 /* Max number of work items pending in + workq. */ +#define WORKQ_OS_NUMPRIOS 3 /* Number of workq priority levels. 
*/ + +struct wqa_init { + int *retid; /* workqueue ID returned */ + void (*workqfunc)(void *); /* workq entry function */ + void (*newtdfunc)(void *); /* new thread startup function */ + void (*exitfunc)(void *); /* thread shutdown function */ + size_t stacksize; /* per worker thread stack size */ + size_t guardsize; /* per worker thread stack guard size */ +}; + +struct wqa_qadd { + void *item; /* work item (arg to workq func) */ + int prio; /* item priority */ + int affin; /* item CPU affinity */ +}; + +struct wqa_qrm { + void *item; /* work item */ + int prio; /* item priority */ +}; + +struct wqa_setconc { + int prio; /* priority queue */ + int conc; /* request concurrency */ +}; + +struct twq_param { + int twq_id; + union { + struct wqa_init init; + struct wqa_qadd qadd; + struct wqa_qrm qrm; + struct wqa_setconc setconc; + } a; + +#define twq_retid a.init.retid +#define twq_workqfunc a.init.workqfunc +#define twq_newtdfunc a.init.newtdfunc +#define twq_exitfunc a.init.exitfunc +#define twq_stacksize a.init.stacksize +#define twq_guardsize a.init.guardsize + +#define twq_add_item a.qadd.item +#define twq_add_prio a.qadd.prio +#define twq_add_affin a.qadd.affin + +#define twq_rm_item a.qrm.item +#define twq_rm_prio a.qrm.prio + +#define twq_setconc_prio a.setconc.prio +#define twq_setconc_conc a.setconc.conc +}; + +#ifdef _KERNEL +#include + +extern void thrworkq_exit(struct proc *p); +extern int thrworkq_newthread(struct thread *td, void *func, void *arg, + stack_t *stack); +extern void thrworkq_reusestack(struct proc *p, void *stackaddr); +extern void thrworkq_thread_yielded(void); + +#else + +int thr_workq(int cmd, struct twq_param *args); + +#endif /* _KERNEL */ + +#endif /* ! _SYS_THRWORKQ_H_ */
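
Usage note (illustrative, not part of the patch): below is a minimal userland sketch of how the thr_workq() interface declared in sys/sys/thrworkq.h might be driven, using the twq_* accessor macros of struct twq_param. The callback bodies, the stack and guard sizes, and the assumption that thr_workq() returns 0 on success are examples only, not the libthr implementation.

#include <sys/types.h>
#include <string.h>
#include <sys/thrworkq.h>

/* Illustrative worker callbacks; roles follow the struct wqa_init comments. */
static void	wq_workitem(void *item)   { /* run one queued work item */ }
static void	wq_newthread(void *item)  { /* startup of a newly created worker */ }
static void	wq_exitthread(void *item) { /* worker thread shutdown */ }

static int
twq_example(void)
{
	struct twq_param twq;
	static int token;	/* opaque pointer handed back to wq_workitem() */
	int wqid;

	/* WQOPS_INIT: register the callbacks and per-worker stack sizes. */
	memset(&twq, 0, sizeof(twq));
	twq.twq_retid = &wqid;			/* workqueue ID returned here */
	twq.twq_workqfunc = wq_workitem;
	twq.twq_newtdfunc = wq_newthread;
	twq.twq_exitfunc = wq_exitthread;
	twq.twq_stacksize = 512 * 1024;		/* example value */
	twq.twq_guardsize = 4096;		/* example value */
	if (thr_workq(WQOPS_INIT, &twq) != 0)
		return (-1);

	/* WQOPS_QUEUE_ADD: queue a single work item on the new workqueue. */
	memset(&twq, 0, sizeof(twq));
	twq.twq_id = wqid;
	twq.twq_add_item = &token;
	twq.twq_add_prio = 1;			/* one of the WORKQ_OS_NUMPRIOS levels (0..2) */
	twq.twq_add_affin = 0;			/* no particular CPU affinity */
	return (thr_workq(WQOPS_QUEUE_ADD, &twq));
}

Each WQOPS_* command uses its own arm of the union in struct twq_param, so a single parameter block can be reused across calls as shown.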