diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -46,6 +46,7 @@
 #include
 #include
 #include
+#include <sys/smp.h>
 #include
 #include
 #include
@@ -172,6 +173,10 @@
 SYSCTL_INT(_vfs_aio, OID_AUTO, max_buf_aio, CTLFLAG_RW, &max_buf_aio, 0,
     "Maximum buf aio requests per process");
 
+static unsigned int num_aio_sc = 1;
+SYSCTL_UINT(_vfs_aio, OID_AUTO, num_aio_sc, CTLFLAG_RDTUN, &num_aio_sc, 0,
+    "Number of AIO software contexts");
+
 /*
  * Though redundant with vfs.aio.max_aio_queue_per_proc, POSIX requires
  * sysconf(3) to support AIO_LISTIO_MAX, and we implement that with
@@ -301,10 +306,25 @@
 	int	(*store_aiocb)(struct aiocb **ujobp, struct aiocb *ujob);
 };
 
-static TAILQ_HEAD(,aioproc) aio_freeproc;	/* (c) Idle daemons */
-static struct sema aio_newproc_sem;
-static struct mtx aio_job_mtx;
-static TAILQ_HEAD(,kaiocb) aio_jobs;		/* (c) Async job list */
+struct aio_softc {
+	TAILQ_HEAD(,aioproc) aio_freeproc;	/* (c) Idle daemons */
+	struct sema aio_newproc_sem;
+	struct mtx aio_job_mtx;
+	TAILQ_HEAD(,kaiocb) aio_jobs;		/* (c) Async job list */
+};
+
+
+struct aio_softc *aio_scs;
+
+static __noinline struct aio_softc *
+aio_proc_to_softc(struct proc *p)
+{
+	unsigned int idx;
+
+	idx = p->p_pid % num_aio_sc;
+	return (&aio_scs[idx]);
+}
+
 static struct unrhdr *aiod_unr;
 
 static void	aio_biocleanup(struct bio *bp);
@@ -315,7 +335,7 @@
 static void	aio_process_sync(struct kaiocb *job);
 static void	aio_process_mlock(struct kaiocb *job);
 static void	aio_schedule_fsync(void *context, int pending);
-static int	aio_newproc(int *);
+static int	aio_newproc(int *, struct proc *);
 int		aio_aqueue(struct thread *td, struct aiocb *ujob,
 		    struct aioliojob *lio, int type, struct aiocb_ops *ops);
 static int	aio_queue_file(struct file *fp, struct kaiocb *job);
@@ -328,7 +348,7 @@
 static void	aio_bio_done_notify(struct proc *userp, struct kaiocb *job);
 static bool	aio_clear_cancel_function_locked(struct kaiocb *job);
 static int	aio_kick(struct proc *userp);
-static void	aio_kick_nowait(struct proc *userp);
+static void	aio_kick_nowait(struct aio_softc *sc, struct proc *userp);
 static void	aio_kick_helper(void *context, int pending);
 static int	filt_aioattach(struct knote *kn);
 static void	filt_aiodetach(struct knote *kn);
@@ -399,6 +419,18 @@
 static int
 aio_onceonly(void)
 {
+	struct aio_softc *sc;
+	int i;
+
+	/*
+	 * Autotune the context count to 2x the number of cores
+	 * to reduce the chance of a collision between two active
+	 * processes using AIO.
+	 */
+
+	if (num_aio_sc == 0) {
+		num_aio_sc = 2 * mp_ncpus;
+	}
 
 	exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
 	    EVENTHANDLER_PRI_ANY);
@@ -406,10 +438,14 @@
 	    NULL, EVENTHANDLER_PRI_ANY);
 	kqueue_add_filteropts(EVFILT_AIO, &aio_filtops);
 	kqueue_add_filteropts(EVFILT_LIO, &lio_filtops);
-	TAILQ_INIT(&aio_freeproc);
-	sema_init(&aio_newproc_sem, 0, "aio_new_proc");
-	mtx_init(&aio_job_mtx, "aio_job", NULL, MTX_DEF);
-	TAILQ_INIT(&aio_jobs);
+	aio_scs = malloc(sizeof(*aio_scs) * num_aio_sc, M_AIO, M_WAITOK | M_ZERO);
+	for (i = 0; i < num_aio_sc; i++) {
+		sc = &aio_scs[i];
+		TAILQ_INIT(&sc->aio_freeproc);
+		sema_init(&sc->aio_newproc_sem, 0, "aio_new_proc");
+		mtx_init(&sc->aio_job_mtx, "aio_job", NULL, MTX_DEF);
+		TAILQ_INIT(&sc->aio_jobs);
+	}
 	aiod_unr = new_unrhdr(1, INT_MAX, NULL);
 	kaio_zone = uma_zcreate("AIO", sizeof(struct kaioinfo), NULL, NULL,
 	    NULL, NULL, UMA_ALIGN_PTR, 0);
@@ -459,8 +495,8 @@
 		uma_zfree(kaio_zone, ki);
 	}
 
-	while (num_aio_procs < MIN(target_aio_procs, max_aio_procs))
-		aio_newproc(NULL);
+	while (atomic_load_acq_int(&num_aio_procs) < MIN(target_aio_procs, max_aio_procs))
+		aio_newproc(NULL, p);
 }
 
 static int
@@ -688,20 +724,20 @@
 /*
  * Select a job to run (called by an AIO daemon).
  */
 static struct kaiocb *
-aio_selectjob(struct aioproc *aiop)
+aio_selectjob(struct aio_softc *sc, struct aioproc *aiop)
 {
 	struct kaiocb *job;
 	struct kaioinfo *ki;
 	struct proc *userp;
 
-	mtx_assert(&aio_job_mtx, MA_OWNED);
+	mtx_assert(&sc->aio_job_mtx, MA_OWNED);
 restart:
-	TAILQ_FOREACH(job, &aio_jobs, list) {
+	TAILQ_FOREACH(job, &sc->aio_jobs, list) {
 		userp = job->userproc;
 		ki = userp->p_aioinfo;
 		if (ki->kaio_active_count < max_aio_per_proc) {
-			TAILQ_REMOVE(&aio_jobs, job, list);
+			TAILQ_REMOVE(&sc->aio_jobs, job, list);
 			if (!aio_clear_cancel_function(job))
 				goto restart;
 
@@ -1056,12 +1092,16 @@
 	vmspace_switch_aio(job->userproc->p_vmspace);
 }
 
+struct aiod_args {
+	struct aio_softc *sc;
+	int id;
+};
 /*
  * The AIO daemon, most of the actual work is done in aio_process_*,
  * but the setup (and address space mgmt) is done in this routine.
  */
 static void
-aio_daemon(void *_id)
+aio_daemon(void *_args)
 {
 	struct kaiocb *job;
 	struct aioproc *aiop;
@@ -1069,7 +1109,9 @@
 	struct proc *p;
 	struct vmspace *myvm;
 	struct thread *td = curthread;
-	int id = (intptr_t)_id;
+	struct aiod_args *args = _args;
+	struct aio_softc *sc = args->sc;
+	int id = args->id;
 
 	/*
 	 * Grab an extra reference on the daemon's vmspace so that it
@@ -1093,28 +1135,28 @@
 	 * Wakeup parent process.  (Parent sleeps to keep from blasting away
 	 * and creating too many daemons.)
 	 */
-	sema_post(&aio_newproc_sem);
+	sema_post(&sc->aio_newproc_sem);
 
-	mtx_lock(&aio_job_mtx);
+	mtx_lock(&sc->aio_job_mtx);
 	for (;;) {
 		/*
 		 * Take daemon off of free queue
 		 */
 		if (aiop->aioprocflags & AIOP_FREE) {
-			TAILQ_REMOVE(&aio_freeproc, aiop, list);
+			TAILQ_REMOVE(&sc->aio_freeproc, aiop, list);
 			aiop->aioprocflags &= ~AIOP_FREE;
 		}
 
 		/*
 		 * Check for jobs.
 		 */
-		while ((job = aio_selectjob(aiop)) != NULL) {
-			mtx_unlock(&aio_job_mtx);
+		while ((job = aio_selectjob(sc, aiop)) != NULL) {
+			mtx_unlock(&sc->aio_job_mtx);
 
 			ki = job->userproc->p_aioinfo;
 			job->handle_fn(job);
 
-			mtx_lock(&aio_job_mtx);
+			mtx_lock(&sc->aio_job_mtx);
 			/* Decrement the active job count. */
 			ki->kaio_active_count--;
 		}
@@ -1123,9 +1165,9 @@
 		 * Disconnect from user address space.
 		 */
 		if (p->p_vmspace != myvm) {
-			mtx_unlock(&aio_job_mtx);
+			mtx_unlock(&sc->aio_job_mtx);
 			vmspace_switch_aio(myvm);
-			mtx_lock(&aio_job_mtx);
+			mtx_lock(&sc->aio_job_mtx);
 			/*
 			 * We have to restart to avoid race, we only sleep if
 			 * no job can be selected.
@@ -1133,24 +1175,24 @@
 			continue;
 		}
 
-		mtx_assert(&aio_job_mtx, MA_OWNED);
+		mtx_assert(&sc->aio_job_mtx, MA_OWNED);
-		TAILQ_INSERT_HEAD(&aio_freeproc, aiop, list);
+		TAILQ_INSERT_HEAD(&sc->aio_freeproc, aiop, list);
 		aiop->aioprocflags |= AIOP_FREE;
 
 		/*
 		 * If daemon is inactive for a long time, allow it to exit,
 		 * thereby freeing resources.
 		 */
-		if (msleep(p, &aio_job_mtx, PRIBIO, "aiordy",
-		    aiod_lifetime) == EWOULDBLOCK && TAILQ_EMPTY(&aio_jobs) &&
+		if (msleep(p, &sc->aio_job_mtx, PRIBIO, "aiordy",
+		    aiod_lifetime) == EWOULDBLOCK && TAILQ_EMPTY(&sc->aio_jobs) &&
 		    (aiop->aioprocflags & AIOP_FREE) &&
-		    num_aio_procs > target_aio_procs)
+		    atomic_load_acq_int(&num_aio_procs) > target_aio_procs)
 			break;
 	}
 
-	TAILQ_REMOVE(&aio_freeproc, aiop, list);
-	num_aio_procs--;
-	mtx_unlock(&aio_job_mtx);
+	TAILQ_REMOVE(&sc->aio_freeproc, aiop, list);
+	atomic_subtract_int(&num_aio_procs, 1);
+	mtx_unlock(&sc->aio_job_mtx);
 	free(aiop, M_AIO);
 	free_unr(aiod_unr, id);
 	vmspace_free(myvm);
@@ -1168,25 +1210,29 @@
  * AIO daemon modifies its environment itself.
  */
 static int
-aio_newproc(int *start)
+aio_newproc(int *start, struct proc *userproc)
 {
+	struct aiod_args args;
+	struct aio_softc *sc;
 	int error;
 	struct proc *p;
 	int id;
 
 	id = alloc_unr(aiod_unr);
-	error = kproc_create(aio_daemon, (void *)(intptr_t)id, &p,
+	args.id = id;
+	args.sc = sc = aio_proc_to_softc(userproc);
+	error = kproc_create(aio_daemon, (void *)&args, &p,
 	    RFNOWAIT, 0, "aiod%d", id);
 	if (error == 0) {
 		/*
 		 * Wait until daemon is started.
 		 */
-		sema_wait(&aio_newproc_sem);
-		mtx_lock(&aio_job_mtx);
-		num_aio_procs++;
+		sema_wait(&sc->aio_newproc_sem);
+		mtx_lock(&sc->aio_job_mtx);
+		atomic_add_int(&num_aio_procs, 1);
 		if (start != NULL)
 			(*start)--;
-		mtx_unlock(&aio_job_mtx);
+		mtx_unlock(&sc->aio_job_mtx);
 	} else {
 		free_unr(aiod_unr, id);
 	}
@@ -1507,7 +1553,7 @@
 	int opcode;
 	int error;
 	int fd, kqfd;
-	int jid;
+	u_long jid;
 	u_short evflags;
 
 	if (p->p_aioinfo == NULL)
@@ -1629,10 +1675,8 @@
 
 	job->fd_file = fp;
 
-	mtx_lock(&aio_job_mtx);
-	jid = jobrefid++;
-	job->seqno = jobseqno++;
-	mtx_unlock(&aio_job_mtx);
+	jid = atomic_fetchadd_long(&jobrefid, 1);
+	job->seqno = atomic_fetchadd_64(&jobseqno, 1);
 	error = ops->store_kernelinfo(ujob, jid);
 	if (error) {
 		error = EINVAL;
@@ -1745,28 +1789,32 @@
 static void
 aio_cancel_daemon_job(struct kaiocb *job)
 {
+	struct aio_softc *sc;
 
-	mtx_lock(&aio_job_mtx);
+	sc = aio_proc_to_softc(job->userproc);
+	mtx_lock(&sc->aio_job_mtx);
 	if (!aio_cancel_cleared(job))
-		TAILQ_REMOVE(&aio_jobs, job, list);
-	mtx_unlock(&aio_job_mtx);
+		TAILQ_REMOVE(&sc->aio_jobs, job, list);
+	mtx_unlock(&sc->aio_job_mtx);
 	aio_cancel(job);
 }
 
 void
 aio_schedule(struct kaiocb *job, aio_handle_fn_t *func)
 {
+	struct aio_softc *sc;
 
-	mtx_lock(&aio_job_mtx);
+	sc = aio_proc_to_softc(job->userproc);
+	mtx_lock(&sc->aio_job_mtx);
 	if (!aio_set_cancel_function(job, aio_cancel_daemon_job)) {
-		mtx_unlock(&aio_job_mtx);
+		mtx_unlock(&sc->aio_job_mtx);
 		aio_cancel(job);
 		return;
 	}
 
 	job->handle_fn = func;
-	TAILQ_INSERT_TAIL(&aio_jobs, job, list);
-	aio_kick_nowait(job->userproc);
-	mtx_unlock(&aio_job_mtx);
+	TAILQ_INSERT_TAIL(&sc->aio_jobs, job, list);
+	aio_kick_nowait(sc, job->userproc);
+	mtx_unlock(&sc->aio_job_mtx);
 }
 
@@ -1845,17 +1893,17 @@
 }
 
 static void
-aio_kick_nowait(struct proc *userp)
+aio_kick_nowait(struct aio_softc *sc, struct proc *userp)
 {
 	struct kaioinfo *ki = userp->p_aioinfo;
 	struct aioproc *aiop;
 
-	mtx_assert(&aio_job_mtx, MA_OWNED);
-	if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
-		TAILQ_REMOVE(&aio_freeproc, aiop, list);
+	mtx_assert(&sc->aio_job_mtx, MA_OWNED);
+	if ((aiop = TAILQ_FIRST(&sc->aio_freeproc)) != NULL) {
+		TAILQ_REMOVE(&sc->aio_freeproc, aiop, list);
 		aiop->aioprocflags &= ~AIOP_FREE;
 		wakeup(aiop->aioproc);
-	} else if (num_aio_resv_start + num_aio_procs < max_aio_procs &&
+	} else if (num_aio_resv_start + atomic_load_acq_int(&num_aio_procs) < max_aio_procs &&
 	    ki->kaio_active_count + num_aio_resv_start < max_aio_per_proc) {
 		taskqueue_enqueue(taskqueue_aiod_kick, &ki->kaio_task);
 	}
@@ -1866,20 +1914,22 @@
 {
 	struct kaioinfo *ki = userp->p_aioinfo;
 	struct aioproc *aiop;
+	struct aio_softc *sc;
 	int error, ret = 0;
 
-	mtx_assert(&aio_job_mtx, MA_OWNED);
+	sc = aio_proc_to_softc(userp);
+	mtx_assert(&sc->aio_job_mtx, MA_OWNED);
 retryproc:
-	if ((aiop = TAILQ_FIRST(&aio_freeproc)) != NULL) {
-		TAILQ_REMOVE(&aio_freeproc, aiop, list);
+	if ((aiop = TAILQ_FIRST(&sc->aio_freeproc)) != NULL) {
+		TAILQ_REMOVE(&sc->aio_freeproc, aiop, list);
 		aiop->aioprocflags &= ~AIOP_FREE;
 		wakeup(aiop->aioproc);
-	} else if (num_aio_resv_start + num_aio_procs < max_aio_procs &&
+	} else if (num_aio_resv_start + atomic_load_acq_int(&num_aio_procs) < max_aio_procs &&
 	    ki->kaio_active_count + num_aio_resv_start < max_aio_per_proc) {
 		num_aio_resv_start++;
-		mtx_unlock(&aio_job_mtx);
-		error = aio_newproc(&num_aio_resv_start);
-		mtx_lock(&aio_job_mtx);
+		mtx_unlock(&sc->aio_job_mtx);
+		error = aio_newproc(&num_aio_resv_start, userp);
+		mtx_lock(&sc->aio_job_mtx);
 		if (error) {
 			num_aio_resv_start--;
 			goto retryproc;
@@ -1894,13 +1944,17 @@
 aio_kick_helper(void *context, int pending)
 {
 	struct proc *userp = context;
+	struct aio_softc *sc;
 
-	mtx_lock(&aio_job_mtx);
+	sc = aio_proc_to_softc(userp);
+	if (sc == NULL)
+		panic("null sc");
+	mtx_lock(&sc->aio_job_mtx);
 	while (--pending >= 0) {
 		if (aio_kick(userp))
			break;
 	}
-	mtx_unlock(&aio_job_mtx);
+	mtx_unlock(&sc->aio_job_mtx);
 }
 
 /*
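
Note on the approach, for reviewers: the patch replaces the single global aio_job_mtx/aio_jobs pair with an array of aio_softc instances and selects one per process by PID modulo the context count, so two unrelated processes submitting AIO normally contend on different locks. The sketch below is a standalone userland illustration of that sharding scheme, not kernel code and not part of the patch: it uses pthread mutexes in place of mtx(9), and the names (struct shard, pid_to_shard, job_enqueue) are invented here to mirror the patch's aio_softc, aio_proc_to_softc() and aio_schedule().

/*
 * Userland sketch of PID-based lock sharding: each shard owns its own
 * mutex and job list, and a job is always queued under the shard picked
 * by the submitting process' PID.
 */
#include <sys/types.h>
#include <sys/queue.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

struct job {
	pid_t			owner;	/* submitting process */
	TAILQ_ENTRY(job)	list;
};

struct shard {
	pthread_mutex_t		lock;	/* stands in for aio_job_mtx */
	TAILQ_HEAD(, job)	jobs;	/* stands in for aio_jobs */
};

static struct shard *shards;
static unsigned int nshards;

/* Same selection rule as aio_proc_to_softc(): PID modulo shard count. */
static struct shard *
pid_to_shard(pid_t pid)
{
	return (&shards[(unsigned int)pid % nshards]);
}

static void
shards_init(unsigned int n)
{
	unsigned int i;

	nshards = n;
	shards = calloc(nshards, sizeof(*shards));
	if (shards == NULL)
		exit(1);
	for (i = 0; i < nshards; i++) {
		pthread_mutex_init(&shards[i].lock, NULL);
		TAILQ_INIT(&shards[i].jobs);
	}
}

/* Enqueue under the owning shard's lock only, as aio_schedule() does. */
static void
job_enqueue(struct job *j)
{
	struct shard *sh = pid_to_shard(j->owner);

	pthread_mutex_lock(&sh->lock);
	TAILQ_INSERT_TAIL(&sh->jobs, j, list);
	pthread_mutex_unlock(&sh->lock);
}

int
main(void)
{
	struct job j = { .owner = getpid() };

	shards_init(4);		/* the patch defaults to 2 * mp_ncpus when tuned to 0 */
	job_enqueue(&j);
	printf("pid %d -> shard %u\n", (int)getpid(),
	    (unsigned int)getpid() % nshards);
	return (0);
}

The modulo selection keeps the lookup O(1) and lock-free; the trade-off is that all jobs of one process still land on a single shard, which matches the patch's per-process accounting (kaio_active_count, max_aio_per_proc).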