Index: head/sys/kern/vfs_aio.c
===================================================================
--- head/sys/kern/vfs_aio.c
+++ head/sys/kern/vfs_aio.c
@@ -311,6 +311,7 @@
 static int aio_qphysio(struct proc *p, struct kaiocb *job);
 static void aio_daemon(void *param);
 static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job);
+static bool aio_clear_cancel_function_locked(struct kaiocb *job);
 static int aio_kick(struct proc *userp);
 static void aio_kick_nowait(struct proc *userp);
 static void aio_kick_helper(void *context, int pending);
@@ -919,7 +920,7 @@
 			if (--sjob->pending > 0)
 				continue;
 			TAILQ_REMOVE(&ki->kaio_syncqueue, sjob, list);
-			if (!aio_clear_cancel_function(sjob))
+			if (!aio_clear_cancel_function_locked(sjob))
 				continue;
 			TAILQ_INSERT_TAIL(&ki->kaio_syncready, sjob, list);
 			schedule_fsync = true;
@@ -967,40 +968,57 @@
 	return ((job->jobflags & KAIOCB_CLEARED) != 0);
 }
 
-bool
-aio_clear_cancel_function(struct kaiocb *job)
+static bool
+aio_clear_cancel_function_locked(struct kaiocb *job)
 {
-	struct kaioinfo *ki;
 
-	ki = job->userproc->p_aioinfo;
-	AIO_LOCK(ki);
+	AIO_LOCK_ASSERT(job->userproc->p_aioinfo, MA_OWNED);
 	MPASS(job->cancel_fn != NULL);
 	if (job->jobflags & KAIOCB_CANCELLING) {
 		job->jobflags |= KAIOCB_CLEARED;
-		AIO_UNLOCK(ki);
 		return (false);
 	}
 	job->cancel_fn = NULL;
-	AIO_UNLOCK(ki);
 	return (true);
 }
 
 bool
-aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func)
+aio_clear_cancel_function(struct kaiocb *job)
 {
 	struct kaioinfo *ki;
+	bool ret;
 
 	ki = job->userproc->p_aioinfo;
 	AIO_LOCK(ki);
-	if (job->jobflags & KAIOCB_CANCELLED) {
-		AIO_UNLOCK(ki);
+	ret = aio_clear_cancel_function_locked(job);
+	AIO_UNLOCK(ki);
+	return (ret);
+}
+
+static bool
+aio_set_cancel_function_locked(struct kaiocb *job, aio_cancel_fn_t *func)
+{
+
+	AIO_LOCK_ASSERT(job->userproc->p_aioinfo, MA_OWNED);
+	if (job->jobflags & KAIOCB_CANCELLED)
 		return (false);
-	}
 	job->cancel_fn = func;
-	AIO_UNLOCK(ki);
 	return (true);
 }
 
+bool
+aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func)
+{
+	struct kaioinfo *ki;
+	bool ret;
+
+	ki = job->userproc->p_aioinfo;
+	AIO_LOCK(ki);
+	ret = aio_set_cancel_function_locked(job, func);
+	AIO_UNLOCK(ki);
+	return (ret);
+}
+
 void
 aio_complete(struct kaiocb *job, long status, int error)
 {
@@ -1655,10 +1673,10 @@
 	struct kaioinfo *ki;
 
 	ki = job->userproc->p_aioinfo;
-	mtx_lock(&aio_job_mtx);
+	AIO_LOCK(ki);
 	if (!aio_cancel_cleared(job))
 		TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
-	mtx_unlock(&aio_job_mtx);
+	AIO_UNLOCK(ki);
 	aio_cancel(job);
 }
 
@@ -1718,7 +1736,8 @@
 			}
 		}
 		if (job->pending != 0) {
-			if (!aio_set_cancel_function(job, aio_cancel_sync)) {
+			if (!aio_set_cancel_function_locked(job,
+				aio_cancel_sync)) {
 				AIO_UNLOCK(ki);
 				aio_cancel(job);
 				return (0);
Index: head/tests/sys/aio/aio_test.c
===================================================================
--- head/tests/sys/aio/aio_test.c
+++ head/tests/sys/aio/aio_test.c
@@ -924,6 +924,88 @@
 	close(s[0]);
 }
 
+/*
+ * This test just performs a basic test of aio_fsync().
+ */
+ATF_TC_WITHOUT_HEAD(aio_fsync_test);
+ATF_TC_BODY(aio_fsync_test, tc)
+{
+	struct aiocb synccb, *iocbp;
+	struct {
+		struct aiocb iocb;
+		bool done;
+		char *buffer;
+	} buffers[16];
+	struct stat sb;
+	char pathname[PATH_MAX];
+	ssize_t rval;
+	unsigned i;
+	int fd;
+
+	ATF_REQUIRE_KERNEL_MODULE("aio");
+	ATF_REQUIRE_UNSAFE_AIO();
+
+	strcpy(pathname, PATH_TEMPLATE);
+	fd = mkstemp(pathname);
+	ATF_REQUIRE_MSG(fd != -1, "mkstemp failed: %s", strerror(errno));
+	unlink(pathname);
+
+	ATF_REQUIRE(fstat(fd, &sb) == 0);
+	ATF_REQUIRE(sb.st_blksize != 0);
+	ATF_REQUIRE(ftruncate(fd, sb.st_blksize * nitems(buffers)) == 0);
+
+	/*
+	 * Queue several asynchronous write requests. Hopefully this
+	 * forces the aio_fsync() request to be deferred. There is no
+	 * reliable way to guarantee that however.
+	 */
+	srandomdev();
+	for (i = 0; i < nitems(buffers); i++) {
+		buffers[i].done = false;
+		memset(&buffers[i].iocb, 0, sizeof(buffers[i].iocb));
+		buffers[i].buffer = malloc(sb.st_blksize);
+		aio_fill_buffer(buffers[i].buffer, sb.st_blksize, random());
+		buffers[i].iocb.aio_fildes = fd;
+		buffers[i].iocb.aio_buf = buffers[i].buffer;
+		buffers[i].iocb.aio_nbytes = sb.st_blksize;
+		buffers[i].iocb.aio_offset = sb.st_blksize * i;
+		ATF_REQUIRE(aio_write(&buffers[i].iocb) == 0);
+	}
+
+	/* Queue the aio_fsync request. */
+	memset(&synccb, 0, sizeof(synccb));
+	synccb.aio_fildes = fd;
+	ATF_REQUIRE(aio_fsync(O_SYNC, &synccb) == 0);
+
+	/* Wait for requests to complete. */
+	for (;;) {
+	next:
+		rval = aio_waitcomplete(&iocbp, NULL);
+		ATF_REQUIRE(iocbp != NULL);
+		if (iocbp == &synccb) {
+			ATF_REQUIRE(rval == 0);
+			break;
+		}
+
+		for (i = 0; i < nitems(buffers); i++) {
+			if (iocbp == &buffers[i].iocb) {
+				ATF_REQUIRE(buffers[i].done == false);
+				ATF_REQUIRE(rval == sb.st_blksize);
+				buffers[i].done = true;
+				goto next;
+			}
+		}
+
+		ATF_REQUIRE_MSG(false, "unmatched AIO request");
+	}
+
+	for (i = 0; i < nitems(buffers); i++)
+		ATF_REQUIRE_MSG(buffers[i].done,
+		    "AIO request %u did not complete", i);
+
+	close(fd);
+}
+
 ATF_TP_ADD_TCS(tp)
 {
 
@@ -937,6 +1019,7 @@
 	ATF_TP_ADD_TC(tp, aio_socket_two_reads);
 	ATF_TP_ADD_TC(tp, aio_socket_blocking_short_write);
 	ATF_TP_ADD_TC(tp, aio_socket_short_write_cancel);
+	ATF_TP_ADD_TC(tp, aio_fsync_test);
 
 	return (atf_no_error());
 }