diff --git a/share/man/man4/aio.4 b/share/man/man4/aio.4
--- a/share/man/man4/aio.4
+++ b/share/man/man4/aio.4
@@ -27,7 +27,7 @@
 .\"
 .\" $FreeBSD$
 .\"
-.Dd January 2, 2021
+.Dd December 5, 2021
 .Dt AIO 4
 .Os
 .Sh NAME
@@ -170,6 +170,7 @@
 to, its
 .Va sigev_notify_kevent_flags
 field may contain
+.Dv AIO_KEVENT_FLAG_REAP ,
 .Dv EV_ONESHOT ,
 .Dv EV_CLEAR , and/or
 .Dv EV_DISPATCH , and its
@@ -185,8 +186,30 @@
 .It Va udata Ta
 value stored in
 .Va aio_sigevent.sigev_value
+.It Va data Ta
+if
+.Dv AIO_KEVENT_FLAG_REAP
+is specified,
+the error code or the return status, depending on whether the
+.Dv EV_ERROR
+flag is set
 .El
 .Pp
+If
+.Dv AIO_KEVENT_FLAG_REAP
+is specified in
+.Va sigev_notify_kevent_flags ,
+then it is not necessary to call
+.Xr aio_error 2
+and
+.Xr aio_return 2
+in the common case.
+Note that it is still necessary to query the status
+of individual I/O operations explicitly with
+.Xr aio_error 2
+after a failed call to
+.Xr lio_listio 2 .
+.Pp
 For
 .Dv SIGEV_SIGNO
 and
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -496,12 +496,30 @@
 	struct kaioinfo *ki;
 	struct aioliojob *lj;
 	struct proc *p;
+	struct thread *td;
 
 	p = job->userproc;
-	MPASS(curproc == p);
 	ki = p->p_aioinfo;
 	MPASS(ki != NULL);
 
+	/*
+	 * The thread argument here is used to find the owning process
+	 * and is also passed to fo_close(), which may pass it to various
+	 * places such as devsw close() routines.  Because of that, we
+	 * need a thread pointer from the process owning the job that is
+	 * persistent and won't disappear out from under us or move to
+	 * another process.
+	 *
+	 * We use the thread that submitted the I/O when freeing jobs
+	 * asynchronously (i.e., AIO_KEVENT_FLAG_REAP), and we know that is
+	 * safe because aio_thread_exit() waits for submitted I/Os to finish.
+	 * We don't want to make aio_thread_exit() wait for a user to call
+	 * aio_return(), though, so we drop that reference as soon as possible
+	 * (namely, after updating the submitting thread's rusage counters),
+	 * and use the calling thread here in that case.
+	 */
+	td = job->td ? job->td : curthread;
+
 	AIO_LOCK_ASSERT(ki, MA_OWNED);
 	MPASS(job->jobflags & KAIOCB_FINISHED);
 
@@ -521,7 +539,7 @@
 		if (lj->lioj_count == 0) {
 			TAILQ_REMOVE(&ki->kaio_liojoblist, lj, lioj_list);
 			/* lio is going away, we need to destroy any knotes */
-			knlist_delete(&lj->klist, curthread, 1);
+			knlist_delete(&lj->klist, td, 1);
 			PROC_LOCK(p);
 			sigqueue_take(&lj->lioj_ksi);
 			PROC_UNLOCK(p);
@@ -530,35 +548,15 @@
 	}
 
 	/* job is going away, we need to destroy any knotes */
-	knlist_delete(&job->klist, curthread, 1);
+	knlist_delete(&job->klist, td, 1);
 	PROC_LOCK(p);
 	sigqueue_take(&job->ksi);
 	PROC_UNLOCK(p);
 
 	AIO_UNLOCK(ki);
 
-	/*
-	 * The thread argument here is used to find the owning process
-	 * and is also passed to fo_close() which may pass it to various
-	 * places such as devsw close() routines.  Because of that, we
-	 * need a thread pointer from the process owning the job that is
-	 * persistent and won't disappear out from under us or move to
-	 * another process.
-	 *
-	 * Currently, all the callers of this function call it to remove
-	 * a kaiocb from the current process' job list either via a
-	 * syscall or due to the current process calling exit() or
-	 * execve().  Thus, we know that p == curproc.  We also know that
-	 * curthread can't exit since we are curthread.
-	 *
-	 * Therefore, we use curthread as the thread to pass to
-	 * knlist_delete().  This does mean that it is possible for the
-	 * thread pointer at close time to differ from the thread pointer
-	 * at open time, but this is already true of file descriptors in
-	 * a multithreaded process.
-	 */
 	if (job->fd_file)
-		fdrop(job->fd_file, curthread);
+		fdrop(job->fd_file, td);
 	crfree(job->cred);
 	if (job->uiop != &job->uio)
 		free(job->uiop, M_IOV);
@@ -970,6 +968,18 @@
 		    &ki->kaio_sync_task);
 	}
 
+	/*
+	 * If asynchronous reap was requested, the result was stored in the
+	 * knote data member by KNOTE_UNLOCKED() above, and we can clear our
+	 * knote list and free the job eagerly while it still has a reference
+	 * to the submitting thread (needed for fdrop()).
+	 */
+	if (job->uaiocb.aio_sigevent.sigev_notify == SIGEV_KEVENT &&
+	    (job->uaiocb.aio_sigevent.sigev_notify_kevent_flags & AIO_KEVENT_FLAG_REAP)) {
+		knlist_clear(&job->klist, 1);
+		aio_free_entry(job);
+	}
+
 	/*
 	 * Drop our reference to the submitting thread.  This allows it to
 	 * exit.
@@ -1074,6 +1084,12 @@
 	job->uaiocb._aiocb_private.error = error;
 	job->uaiocb._aiocb_private.status = status;
 
+	/* Write the result to the user space object. */
+	if (curproc->p_vmspace != job->userproc->p_vmspace)
+		aio_switch_vmspace(job);
+	job->ops->store_status(job->ujob, status);
+	job->ops->store_error(job->ujob, error);
+
 	/*
 	 * Transfer the resource usage delta to the submitting thread's
 	 * counters.
@@ -1633,6 +1649,12 @@
 	/* Save userspace address of the job info. */
 	job->ujob = ujob;
 
+	/*
+	 * Record the ops table so that worker daemons can write results to
+	 * user space with the correct ABI.
+	 */
+	job->ops = ops;
+
 	/*
 	 * Validate the opcode and fetch the file object for the specified
 	 * file descriptor.
@@ -1707,7 +1729,7 @@
 	if (job->uaiocb.aio_sigevent.sigev_notify != SIGEV_KEVENT)
 		goto no_kqueue;
 	evflags = job->uaiocb.aio_sigevent.sigev_notify_kevent_flags;
-	if ((evflags & ~(EV_CLEAR | EV_DISPATCH | EV_ONESHOT)) != 0) {
+	if ((evflags & ~(EV_CLEAR | EV_DISPATCH | EV_ONESHOT | AIO_KEVENT_FLAG_REAP)) != 0) {
 		error = EINVAL;
 		goto err3;
 	}
@@ -1758,6 +1780,8 @@
 	/*
 	 * Take a reference to the submitting thread, so that worker daemons
 	 * can update this thread's resource usage counters.
+	 *
+	 * It is also needed to free resources asynchronously, if requested.
 	 */
 	job->td = td;
 	atomic_add_int(&td->td_aio_count, 1);
@@ -1994,8 +2018,6 @@
 		td->td_retval[0] = status;
 		aio_free_entry(job);
 		AIO_UNLOCK(ki);
-		ops->store_error(ujob, error);
-		ops->store_status(ujob, status);
 	} else {
 		error = EINVAL;
 		AIO_UNLOCK(ki);
@@ -2180,41 +2202,12 @@
 static int
 kern_aio_error(struct thread *td, struct aiocb *ujob, struct aiocb_ops *ops)
 {
-	struct proc *p = td->td_proc;
-	struct kaiocb *job;
-	struct kaioinfo *ki;
-	int status;
-
-	ki = p->p_aioinfo;
-	if (ki == NULL) {
-		td->td_retval[0] = EINVAL;
-		return (0);
-	}
-
-	AIO_LOCK(ki);
-	TAILQ_FOREACH(job, &ki->kaio_all, allist) {
-		if (job->ujob == ujob) {
-			if (job->jobflags & KAIOCB_FINISHED)
-				td->td_retval[0] =
-				    job->uaiocb._aiocb_private.error;
-			else
-				td->td_retval[0] = EINPROGRESS;
-			AIO_UNLOCK(ki);
-			return (0);
-		}
-	}
-	AIO_UNLOCK(ki);
-
 	/*
-	 * Hack for failure of aio_aqueue.
+	 * Return the value most recently stored in the user space object,
+	 * so that it remains available after a partial failure of lio_listio().
 	 */
-	status = ops->fetch_status(ujob);
-	if (status == -1) {
-		td->td_retval[0] = ops->fetch_error(ujob);
-		return (0);
-	}
+	td->td_retval[0] = ops->fetch_error(ujob);
 
-	td->td_retval[0] = EINVAL;
 	return (0);
 }
@@ -2622,8 +2615,6 @@
 		aio_free_entry(job);
 		AIO_UNLOCK(ki);
 		ops->store_aiocb(ujobp, ujob);
-		ops->store_error(ujob, error);
-		ops->store_status(ujob, status);
 	} else
 		AIO_UNLOCK(ki);
@@ -2693,6 +2684,17 @@
 	kn->kn_ptr.p_aio = job;
 	kn->kn_flags &= ~EV_FLAG1;
 
+	/*
+	 * Since AIO_KEVENT_FLAG_REAP (EV_FLAG2) is overloaded with EV_ERROR,
+	 * we clear it from the flags (it remains visible inside the job
+	 * object, where it will affect our behavior), but implicitly enable
+	 * EV_ONESHOT.
+	 */
+	if (kn->kn_flags & AIO_KEVENT_FLAG_REAP) {
+		kn->kn_flags &= ~AIO_KEVENT_FLAG_REAP;
+		kn->kn_flags |= EV_ONESHOT;
+	}
+
 	knlist_add(&job->klist, kn, 0);
 
 	return (0);
@@ -2712,15 +2714,37 @@
 }
 
 /* kqueue filter function */
-/*ARGSUSED*/
 static int
 filt_aio(struct knote *kn, long hint)
 {
 	struct kaiocb *job = kn->kn_ptr.p_aio;
 
-	kn->kn_data = job->uaiocb._aiocb_private.error;
+	if (job->uaiocb.aio_sigevent.sigev_notify_kevent_flags & AIO_KEVENT_FLAG_REAP) {
+		/*
+		 * When asynchronous reap is requested, "data" is set to either
+		 * the error or the result, with a flag to say which it is.
+		 * The user does not have to call aio_return().
+		 */
+		if (job->uaiocb._aiocb_private.error > 0) {
+			kn->kn_flags |= EV_ERROR;
+			kn->kn_data = job->uaiocb._aiocb_private.error;
+		} else {
+			kn->kn_data = job->uaiocb._aiocb_private.status;
+		}
+	} else {
+		/*
+		 * The user must call aio_return() to fetch the result and free
+		 * kernel resources.  To avoid the need to call aio_error(),
+		 * the error is written into "data".
+		 *
+		 * XXX This behavior is old but undocumented.
+		 */
+		kn->kn_data = job->uaiocb._aiocb_private.error;
+	}
+
 	if (!(job->jobflags & KAIOCB_FINISHED))
 		return (0);
+	kn->kn_flags |= EV_EOF;
 	return (1);
 }
diff --git a/sys/sys/aio.h b/sys/sys/aio.h
--- a/sys/sys/aio.h
+++ b/sys/sys/aio.h
@@ -112,6 +112,13 @@
 #define	aio_iov		aio_buf		/* I/O scatter/gather list */
 #define	aio_iovcnt	aio_nbytes	/* Length of aio_iov */
 
+/*
+ * A value used in aio_sigevent.sigev_notify_kevent_flags to activate
+ * asynchronous reaping of completion events, with the result stored in the
+ * kevent's data member and EV_ERROR flag.
+ */
+#define	AIO_KEVENT_FLAG_REAP	0x4000	/* == EV_FLAG2 */
+
 #ifdef _KERNEL
 
 typedef void aio_cancel_fn_t(struct kaiocb *);
@@ -150,6 +157,7 @@
 	uint64_t seqno;			/* (*) job number */
 	aio_cancel_fn_t *cancel_fn;	/* (a) backend cancel function */
 	aio_handle_fn_t *handle_fn;	/* (c) backend handle function */
+	struct aiocb_ops *ops;		/* (a) ops for writing to user space */
 	union {				/* Backend-specific data fields */
 		struct {		/* BIO backend */
 			int nbio;	/* Number of remaining bios */
diff --git a/tests/sys/aio/aio_test.c b/tests/sys/aio/aio_test.c
--- a/tests/sys/aio/aio_test.c
+++ b/tests/sys/aio/aio_test.c
@@ -1980,6 +1980,142 @@
 	close(fd);
 }
 
+ATF_TC_WITHOUT_HEAD(aio_kqueue_noreap);
+ATF_TC_BODY(aio_kqueue_noreap, tc)
+{
+	int kq;
+	int pipe_fds[2];
+	struct aiocb iocb;
+	struct aiocb *piocb;
+	char buffer[] = "hello world";
+	struct kevent kev;
+	struct timespec timeout = {0, 0};
+
+	ATF_REQUIRE_KERNEL_MODULE("aio");
+	ATF_REQUIRE_UNSAFE_AIO();
+
+	kq = kqueue();
+	ATF_REQUIRE(kq >= 0);
+	ATF_REQUIRE_EQ(0, pipe(pipe_fds));
+
+	/* Submit a write that will succeed. */
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+	iocb.aio_sigevent.sigev_notify_kqueue = kq;
+	iocb.aio_fildes = pipe_fds[1];
+	iocb.aio_buf = buffer;
+	iocb.aio_nbytes = sizeof(buffer);
+	ATF_REQUIRE_EQ(0, aio_write(&iocb));
+
+	/* Wait for completion. */
+	ATF_REQUIRE_EQ(1, kevent(kq, NULL, 0, &kev, 1, NULL));
+	ATF_REQUIRE_EQ((uintptr_t)&iocb, kev.ident);
+	ATF_REQUIRE_EQ(0, kev.flags & EV_ERROR);
+	ATF_REQUIRE_EQ(0, kev.data);		/* undocumented */
+	ATF_REQUIRE_EQ(0, aio_error(&iocb));
+
+	/* Reap the I/O explicitly. */
+	ATF_REQUIRE_EQ(sizeof(buffer), aio_waitcomplete(&piocb, &timeout));
+	ATF_REQUIRE_EQ(&iocb, piocb);
+
+	/* Nothing left in the kernel's queue. */
+	ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+	ATF_REQUIRE_EQ(NULL, piocb);
+
+	/* Submit a write that will fail. */
+	close(pipe_fds[0]);
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+	iocb.aio_sigevent.sigev_notify_kqueue = kq;
+	iocb.aio_fildes = pipe_fds[1];
+	iocb.aio_buf = buffer;
+	iocb.aio_nbytes = sizeof(buffer);
+	ATF_REQUIRE_EQ(0, aio_write(&iocb));
+
+	/* Wait for completion. */
+	ATF_REQUIRE_EQ(1, kevent(kq, NULL, 0, &kev, 1, NULL));
+	ATF_REQUIRE_EQ((uintptr_t)&iocb, kev.ident);
+	ATF_REQUIRE_EQ(0, kev.flags & EV_ERROR);
+	ATF_REQUIRE_EQ(EPIPE, kev.data);	/* undocumented */
+	ATF_REQUIRE_EQ(EPIPE, aio_error(&iocb));
+
+	/* Reap the I/O explicitly. */
+	ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+	ATF_REQUIRE_EQ(&iocb, piocb);
+	ATF_REQUIRE_EQ(EPIPE, errno);
+
+	/* Nothing left in the kernel's queue. */
+	ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+	ATF_REQUIRE_EQ(NULL, piocb);
+
+	close(pipe_fds[1]);
+	close(kq);
+}
+
+ATF_TC_WITHOUT_HEAD(aio_kqueue_reap);
+ATF_TC_BODY(aio_kqueue_reap, tc)
+{
+	int kq;
+	int pipe_fds[2];
+	struct aiocb iocb;
+	struct aiocb *piocb;
+	char buffer[] = "hello world";
+	struct kevent kev;
+	struct timespec timeout = {0, 0};
+
+	ATF_REQUIRE_KERNEL_MODULE("aio");
+	ATF_REQUIRE_UNSAFE_AIO();
+
+	kq = kqueue();
+	ATF_REQUIRE(kq >= 0);
+	ATF_REQUIRE_EQ(0, pipe(pipe_fds));
+
+	/* Submit a write that will succeed. */
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+	iocb.aio_sigevent.sigev_notify_kqueue = kq;
+	iocb.aio_sigevent.sigev_notify_kevent_flags = AIO_KEVENT_FLAG_REAP;
+	iocb.aio_fildes = pipe_fds[1];
+	iocb.aio_buf = buffer;
+	iocb.aio_nbytes = sizeof(buffer);
+	ATF_REQUIRE_EQ(0, aio_write(&iocb));
+
+	/* Wait for completion. */
+	ATF_REQUIRE_EQ(1, kevent(kq, NULL, 0, &kev, 1, NULL));
+	ATF_REQUIRE_EQ((uintptr_t)&iocb, kev.ident);
+	ATF_REQUIRE_EQ(0, kev.flags & EV_ERROR);
+	ATF_REQUIRE_EQ(sizeof(buffer), kev.data);
+	ATF_REQUIRE_EQ(0, aio_error(&iocb));
+
+	/* Nothing left in the kernel's queue. */
+	ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+	ATF_REQUIRE_EQ(NULL, piocb);
+
+	/* Submit a write that will fail. */
+	close(pipe_fds[0]);
+	memset(&iocb, 0, sizeof(iocb));
+	iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
+	iocb.aio_sigevent.sigev_notify_kqueue = kq;
+	iocb.aio_sigevent.sigev_notify_kevent_flags = AIO_KEVENT_FLAG_REAP;
+	iocb.aio_fildes = pipe_fds[1];
+	iocb.aio_buf = buffer;
+	iocb.aio_nbytes = sizeof(buffer);
+	ATF_REQUIRE_EQ(0, aio_write(&iocb));
+
+	/* Wait for completion. */
+	ATF_REQUIRE_EQ(1, kevent(kq, NULL, 0, &kev, 1, NULL));
+	ATF_REQUIRE_EQ((uintptr_t)&iocb, kev.ident);
+	ATF_REQUIRE_EQ(EV_ERROR, kev.flags & EV_ERROR);
+	ATF_REQUIRE_EQ(EPIPE, kev.data);
+	ATF_REQUIRE_EQ(EPIPE, aio_error(&iocb));
+
+	/* Nothing left in the kernel's queue. */
+	ATF_REQUIRE_EQ(-1, aio_waitcomplete(&piocb, &timeout));
+	ATF_REQUIRE_EQ(NULL, piocb);
+
+	close(pipe_fds[1]);
+	close(kq);
+}
+
 ATF_TP_ADD_TCS(tp)
 {
@@ -2038,6 +2174,8 @@
 	ATF_TP_ADD_TC(tp, vectored_socket_poll);
 	ATF_TP_ADD_TC(tp, vectored_thread);
 	ATF_TP_ADD_TC(tp, aio_threadexit);
+	ATF_TP_ADD_TC(tp, aio_kqueue_noreap);
+	ATF_TP_ADD_TC(tp, aio_kqueue_reap);
 
 	return (atf_no_error());
 }
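For illustration, a minimal user space consumer of the new flag might look like
the following sketch. It is not part of the patch: it assumes a kernel and
headers built with this change (AIO_KEVENT_FLAG_REAP), the file path is
arbitrary, and error handling is abbreviated.

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <aio.h>
#include <err.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	struct aiocb iocb;
	struct kevent kev;
	char buffer[] = "hello world";
	int fd, kq;

	kq = kqueue();
	if (kq < 0)
		err(1, "kqueue");
	/* Any file descriptor that supports aio will do; the path is illustrative. */
	fd = open("/tmp/aio_reap_demo", O_CREAT | O_TRUNC | O_WRONLY, 0600);
	if (fd < 0)
		err(1, "open");

	memset(&iocb, 0, sizeof(iocb));
	iocb.aio_fildes = fd;
	iocb.aio_buf = buffer;
	iocb.aio_nbytes = sizeof(buffer);
	iocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
	iocb.aio_sigevent.sigev_notify_kqueue = kq;
	/* Ask the kernel to reap the job as soon as it completes. */
	iocb.aio_sigevent.sigev_notify_kevent_flags = AIO_KEVENT_FLAG_REAP;

	if (aio_write(&iocb) != 0)
		err(1, "aio_write");

	/* Wait for the completion event. */
	if (kevent(kq, NULL, 0, &kev, 1, NULL) != 1)
		err(1, "kevent");

	/*
	 * With AIO_KEVENT_FLAG_REAP there is no aio_return() step:
	 * kev.data carries the error number when EV_ERROR is set, and
	 * the number of bytes transferred otherwise.
	 */
	if (kev.flags & EV_ERROR)
		printf("write failed: %s\n", strerror((int)kev.data));
	else
		printf("wrote %jd bytes\n", (intmax_t)kev.data);

	close(fd);
	close(kq);
	return (0);
}

Because the kernel frees the job when the event is delivered (the flag implies
EV_ONESHOT), the aiocb may be reused for a new submission as soon as the event
has been read; no aio_error()/aio_return() pair is required in this path.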