diff --git a/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c b/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
--- a/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
+++ b/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
@@ -29,7 +29,7 @@
 void
 zfs_racct_read(uint64_t size, uint64_t iops)
 {
-	curthread->td_ru.ru_inblock += iops;
+	RU_ATOMIC_ADD(curthread->td_ru.ru_inblock, iops);
 #ifdef RACCT
 	if (racct_enable) {
 		PROC_LOCK(curproc);
@@ -43,7 +43,7 @@
 void
 zfs_racct_write(uint64_t size, uint64_t iops)
 {
-	curthread->td_ru.ru_oublock += iops;
+	RU_ATOMIC_ADD(curthread->td_ru.ru_oublock, iops);
 #ifdef RACCT
 	if (racct_enable) {
 		PROC_LOCK(curproc);
diff --git a/sys/fs/ext2fs/ext2_bmap.c b/sys/fs/ext2fs/ext2_bmap.c
--- a/sys/fs/ext2fs/ext2_bmap.c
+++ b/sys/fs/ext2fs/ext2_bmap.c
@@ -184,7 +184,7 @@
 			PROC_UNLOCK(curproc);
 		}
 #endif
-		curthread->td_ru.ru_inblock++;
+		RU_ATOMIC_INC(curthread->td_ru.ru_inblock);
 		error = bufwait(bp);
 		if (error != 0) {
 			brelse(bp);
diff --git a/sys/fs/nfsclient/nfs_clvnops.c b/sys/fs/nfsclient/nfs_clvnops.c
--- a/sys/fs/nfsclient/nfs_clvnops.c
+++ b/sys/fs/nfsclient/nfs_clvnops.c
@@ -3506,7 +3506,7 @@
 	bp->b_iocmd = BIO_WRITE;
 
 	bufobj_wref(bp->b_bufobj);
-	curthread->td_ru.ru_oublock++;
+	RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
 
 	/*
 	 * Note: to avoid loopback deadlocks, we do not
diff --git a/sys/kern/kern_physio.c b/sys/kern/kern_physio.c
--- a/sys/kern/kern_physio.c
+++ b/sys/kern/kern_physio.c
@@ -138,10 +138,10 @@
 		g_reset_bio(bp);
 		if (uio->uio_rw == UIO_READ) {
 			bp->bio_cmd = BIO_READ;
-			curthread->td_ru.ru_inblock++;
+			RU_ATOMIC_INC(curthread->td_ru.ru_inblock);
 		} else {
 			bp->bio_cmd = BIO_WRITE;
-			curthread->td_ru.ru_oublock++;
+			RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
 		}
 		bp->bio_offset = uio->uio_offset;
 		base = uio->uio_iov[i].iov_base;
diff --git a/sys/kern/kern_thread.c b/sys/kern/kern_thread.c
--- a/sys/kern/kern_thread.c
+++ b/sys/kern/kern_thread.c
@@ -64,6 +64,7 @@
 #include
 #endif
 #include
+#include <sys/aio.h>
 
 #include
@@ -365,6 +366,9 @@
 #endif
 	umtx_thread_alloc(td);
 	MPASS(td->td_sel == NULL);
+
+	atomic_store_int(&td->td_aio_count, 0);
+
 	return (0);
 }
@@ -406,6 +410,7 @@
 #endif
 	/* Free all OSD associated to this thread. */
 	osd_thread_exit(td);
+	aio_thread_exit(td);
 	td_softdep_cleanup(td);
 	MPASS(td->td_su == NULL);
 	seltdfini(td);
diff --git a/sys/kern/sys_pipe.c b/sys/kern/sys_pipe.c
--- a/sys/kern/sys_pipe.c
+++ b/sys/kern/sys_pipe.c
@@ -883,7 +883,7 @@
 	PIPE_UNLOCK(rpipe);
 
 	if (nread > 0)
-		td->td_ru.ru_msgrcv++;
+		RU_ATOMIC_INC(td->td_ru.ru_msgrcv);
 	return (error);
 }
@@ -1342,7 +1342,7 @@
 	pipeunlock(wpipe);
 	PIPE_UNLOCK(rpipe);
 	if (uio->uio_resid != orig_resid)
-		td->td_ru.ru_msgsnd++;
+		RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
 	return (error);
 }
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -1464,7 +1464,7 @@
 	dontroute =
 	    (flags & MSG_DONTROUTE) && (so->so_options & SO_DONTROUTE) == 0;
 	if (td != NULL)
-		td->td_ru.ru_msgsnd++;
+		RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
 	if (control != NULL)
 		clen = control->m_len;
@@ -1640,7 +1640,7 @@
 	    (flags & MSG_DONTROUTE) && (so->so_options & SO_DONTROUTE) == 0 &&
 	    (so->so_proto->pr_flags & PR_ATOMIC);
 	if (td != NULL)
-		td->td_ru.ru_msgsnd++;
+		RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
 	if (control != NULL)
 		clen = control->m_len;
@@ -2081,7 +2081,7 @@
 	 */
 	SOCKBUF_LOCK_ASSERT(&so->so_rcv);
 	if (uio->uio_td)
-		uio->uio_td->td_ru.ru_msgrcv++;
+		RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
 	KASSERT(m == so->so_rcv.sb_mb, ("soreceive: m != so->so_rcv.sb_mb"));
 	SBLASTRECORDCHK(&so->so_rcv);
 	SBLASTMBUFCHK(&so->so_rcv);
@@ -2573,7 +2573,7 @@
 
 	/* Statistics. */
 	if (uio->uio_td)
-		uio->uio_td->td_ru.ru_msgrcv++;
+		RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
 
 	/* Fill uio until full or current end of socket buffer is reached. */
 	len = min(uio->uio_resid, sbavail(sb));
@@ -2742,7 +2742,7 @@
 
 	SOCKBUF_LOCK_ASSERT(&so->so_rcv);
 	if (uio->uio_td)
-		uio->uio_td->td_ru.ru_msgrcv++;
+		RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
 	SBLASTRECORDCHK(&so->so_rcv);
 	SBLASTMBUFCHK(&so->so_rcv);
 	nextrecord = m->m_nextpkt;
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -568,6 +568,37 @@
 	return (0);
 }
 
+void
+aio_thread_exit(struct thread *td)
+{
+	struct kaioinfo *ki;
+	struct proc *p;
+
+	/*
+	 * Common case.  It's OK if we see a slightly out of date version as
+	 * we'll recheck under lock if it's non-zero.
+	 */
+	if (atomic_load_int(&td->td_aio_count) == 0)
+		return;
+
+	p = td->td_proc;
+	ki = p->p_aioinfo;
+
+	/* If we've ever submitted an IO, p_aioinfo must be set. */
+	MPASS(ki != NULL);
+
+	/*
+	 * Wait until there are no unfinished jobs that were submitted by this
+	 * thread.
+	 */
+	AIO_LOCK(ki);
+	while (atomic_load_int(&td->td_aio_count) > 0) {
+		ki->kaio_flags |= KAIO_WAKEUP;
+		msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aiotde", hz);
+	}
+	AIO_UNLOCK(ki);
+}
+
 static void
 aio_proc_rundown_exec(void *arg, struct proc *p,
     struct image_params *imgp __unused)
@@ -938,6 +969,14 @@
 		taskqueue_enqueue(taskqueue_aiod_kick, &ki->kaio_sync_task);
 	}
+
+	/*
+	 * Drop our reference to the submitting thread.  This allows it to
+	 * exit.
+	 */
+	atomic_subtract_int(&job->td->td_aio_count, 1);
+	job->td = NULL;
+
 	if (ki->kaio_flags & KAIO_WAKEUP) {
 		ki->kaio_flags &= ~KAIO_WAKEUP;
 		wakeup(&userp->p_aioinfo);
 	}
@@ -1035,6 +1074,19 @@
 	job->uaiocb._aiocb_private.error = error;
 	job->uaiocb._aiocb_private.status = status;
 
+	/*
+	 * Transfer the resource usage delta to the submitting thread's
+	 * counters.
+	 */
+	if (job->outblock)
+		RU_ATOMIC_ADD(job->td->td_ru.ru_oublock, job->outblock);
+	if (job->inblock)
+		RU_ATOMIC_ADD(job->td->td_ru.ru_inblock, job->inblock);
+	if (job->msgsnd)
+		RU_ATOMIC_ADD(job->td->td_ru.ru_msgsnd, job->msgsnd);
+	if (job->msgrcv)
+		RU_ATOMIC_ADD(job->td->td_ru.ru_msgrcv, job->msgrcv);
+
 	userp = job->userproc;
 	ki = userp->p_aioinfo;
@@ -1703,6 +1755,13 @@
 	job->uiop->uio_offset = job->uaiocb.aio_offset;
 	job->uiop->uio_td = td;
 
+	/*
+	 * Take a reference to the submitting thread, so that worker daemons
+	 * can update this thread's resource usage counters.
+	 */
+	job->td = td;
+	atomic_add_int(&td->td_aio_count, 1);
+
 	if (opcode == LIO_MLOCK) {
 		aio_schedule(job, aio_process_mlock);
 		error = 0;
@@ -1933,10 +1992,6 @@
 		status = job->uaiocb._aiocb_private.status;
 		error = job->uaiocb._aiocb_private.error;
 		td->td_retval[0] = status;
-		td->td_ru.ru_oublock += job->outblock;
-		td->td_ru.ru_inblock += job->inblock;
-		td->td_ru.ru_msgsnd += job->msgsnd;
-		td->td_ru.ru_msgrcv += job->msgrcv;
 		aio_free_entry(job);
 		AIO_UNLOCK(ki);
 		ops->store_error(ujob, error);
@@ -2564,10 +2619,6 @@
 		status = job->uaiocb._aiocb_private.status;
 		error = job->uaiocb._aiocb_private.error;
 		td->td_retval[0] = status;
-		td->td_ru.ru_oublock += job->outblock;
-		td->td_ru.ru_inblock += job->inblock;
-		td->td_ru.ru_msgsnd += job->msgsnd;
-		td->td_ru.ru_msgrcv += job->msgrcv;
 		aio_free_entry(job);
 		AIO_UNLOCK(ki);
 		ops->store_aiocb(ujobp, ujob);
diff --git a/sys/kern/vfs_bio.c b/sys/kern/vfs_bio.c
--- a/sys/kern/vfs_bio.c
+++ b/sys/kern/vfs_bio.c
@@ -2132,7 +2132,7 @@
 				PROC_UNLOCK(curproc);
 			}
 #endif /* RACCT */
-			td->td_ru.ru_inblock++;
+			RU_ATOMIC_INC(td->td_ru.ru_inblock);
 			rabp->b_flags |= B_ASYNC;
 			rabp->b_flags &= ~B_INVAL;
 			if ((flags & GB_CKHASH) != 0) {
@@ -2205,7 +2205,7 @@
 			PROC_UNLOCK(td->td_proc);
 		}
 #endif /* RACCT */
-		td->td_ru.ru_inblock++;
+		RU_ATOMIC_INC(td->td_ru.ru_inblock);
 		bp->b_iocmd = BIO_READ;
 		bp->b_flags &= ~B_INVAL;
 		if ((flags & GB_CKHASH) != 0) {
@@ -2313,7 +2313,7 @@
 		PROC_UNLOCK(curproc);
 	}
 #endif /* RACCT */
-	curthread->td_ru.ru_oublock++;
+	RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
 	if (oldflags & B_ASYNC)
 		BUF_KERNPROC(bp);
 	bp->b_iooffset = dbtob(bp->b_blkno);
diff --git a/sys/kern/vfs_cluster.c b/sys/kern/vfs_cluster.c
--- a/sys/kern/vfs_cluster.c
+++ b/sys/kern/vfs_cluster.c
@@ -257,7 +257,7 @@
 			PROC_UNLOCK(td->td_proc);
 		}
 #endif /* RACCT */
-		td->td_ru.ru_inblock++;
+		RU_ATOMIC_INC(td->td_ru.ru_inblock);
 	}
 
 	/*
@@ -317,7 +317,7 @@
 			PROC_UNLOCK(td->td_proc);
 		}
 #endif /* RACCT */
-		td->td_ru.ru_inblock++;
+		RU_ATOMIC_INC(td->td_ru.ru_inblock);
 	}
 
 	if (reqbp) {
diff --git a/sys/netinet/sctp_output.c b/sys/netinet/sctp_output.c
--- a/sys/netinet/sctp_output.c
+++ b/sys/netinet/sctp_output.c
@@ -12900,7 +12900,7 @@
 	}
 	/* Ok, we will attempt a msgsnd :> */
 	if (p) {
-		p->td_ru.ru_msgsnd++;
+		RU_ATOMIC_INC(p->td_ru.ru_msgsnd);
 	}
 	/* Are we aborting? */
 	if (sinfo_flags & SCTP_ABORT) {
diff --git a/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c b/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
--- a/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -1100,7 +1100,7 @@
 		goto out;
 	}
 	if (td != NULL)
-		td->td_ru.ru_msgsnd++;
+		RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
 	ssk = sdp_sk(so);
 	error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags));
@@ -1348,7 +1348,7 @@
 
 	/* Statistics. */
 	if (uio->uio_td)
-		uio->uio_td->td_ru.ru_msgrcv++;
+		RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
 
 	/* Fill uio until full or current end of socket buffer is reached. */
 	len = min(uio->uio_resid, sbavail(sb));
diff --git a/sys/sys/aio.h b/sys/sys/aio.h
--- a/sys/sys/aio.h
+++ b/sys/sys/aio.h
@@ -136,6 +136,7 @@
 	int	msgsnd;			/* (*) messages sent */
 	int	msgrcv;			/* (*) messages received */
 	struct	proc *userproc;		/* (*) user process */
+	struct	thread *td;		/* (a) submitting thread */
 	struct	ucred *cred;		/* (*) active credential when created */
 	struct	file *fd_file;		/* (*) pointer to file structure */
 	struct	aioliojob *lio;		/* (*) optional lio job */
@@ -210,6 +211,7 @@
 void	aio_schedule(struct kaiocb *job, aio_handle_fn_t *func);
 bool	aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func);
 void	aio_switch_vmspace(struct kaiocb *job);
+void	aio_thread_exit(struct thread *thread);
 
 #else /* !_KERNEL */
diff --git a/sys/sys/proc.h b/sys/sys/proc.h
--- a/sys/sys/proc.h
+++ b/sys/sys/proc.h
@@ -170,6 +170,7 @@
  * y - created at first aio, doesn't change until exit or exec at which
  *     point we are single-threaded and only curthread changes it
  * z - zombie threads lock
+ * A - atomic ops
  *
  * If the locking key specifies two identifiers (for example, p_pptr) then
  * either lock is sufficient for read access, but both locks must be held
@@ -383,6 +384,7 @@
 #ifdef EPOCH_TRACE
 	SLIST_HEAD(, epoch_tracker) td_epochs;
 #endif
+	int		td_aio_count;	/* (A) number of AIOs in flight */
 };
 
 struct thread0_storage {
diff --git a/sys/sys/resource.h b/sys/sys/resource.h
--- a/sys/sys/resource.h
+++ b/sys/sys/resource.h
@@ -63,7 +63,8 @@
  * Resource utilization information.
  *
 * All fields are only modified by curthread and
- * no locks are required to read.
+ * no locks are required to read, except those marked
+ * (a), which are accessed with atomics.
 */
 
 #define	RUSAGE_SELF	0
@@ -81,10 +82,10 @@
 	long	ru_minflt;		/* page reclaims */
 	long	ru_majflt;		/* page faults */
 	long	ru_nswap;		/* swaps */
-	long	ru_inblock;		/* block input operations */
-	long	ru_oublock;		/* block output operations */
-	long	ru_msgsnd;		/* messages sent */
-	long	ru_msgrcv;		/* messages received */
+	long	ru_inblock;		/* (a) block input operations */
+	long	ru_oublock;		/* (a) block output operations */
+	long	ru_msgsnd;		/* (a) messages sent */
+	long	ru_msgrcv;		/* (a) messages received */
 	long	ru_nsignals;		/* signals received */
 	long	ru_nvcsw;		/* voluntary context switches */
 	long	ru_nivcsw;		/* involuntary " */
@@ -176,6 +177,9 @@
 
 #ifdef _KERNEL
 
+#define	RU_ATOMIC_INC(v)	atomic_add_long(&(v), 1)
+#define	RU_ATOMIC_ADD(v, d)	atomic_add_long(&(v), (d))
+
 extern struct loadavg averunnable;
 void	read_cpu_time(long *cp_time);	/* Writes array of CPUSTATES */
diff --git a/tests/sys/aio/Makefile b/tests/sys/aio/Makefile
--- a/tests/sys/aio/Makefile
+++ b/tests/sys/aio/Makefile
@@ -12,7 +12,7 @@
 PLAIN_TESTS_C+=	aio_kqueue_test
 PLAIN_TESTS_C+=	lio_kqueue_test
 
-LIBADD.aio_test+=	util rt
+LIBADD.aio_test+=	util rt pthread
 LIBADD.lio_test+=	rt
 
 CFLAGS+=	-I${.CURDIR:H:H}
diff --git a/tests/sys/aio/aio_test.c b/tests/sys/aio/aio_test.c
--- a/tests/sys/aio/aio_test.c
+++ b/tests/sys/aio/aio_test.c
@@ -39,6 +39,7 @@
  * types.
  */
 
+#include <sys/param.h>
 #include
 #include
 #include
@@ -53,6 +54,7 @@
 #include
 #include
 #include
+#include <pthread.h>
 #include
 #include
 #include
@@ -1914,6 +1916,71 @@
 	aio_zvol_cleanup();
 }
 
+static void *
+aio_threadexit_run(void *arg)
+{
+	struct aiocb *iocb;
+	intptr_t rc;
+
+	iocb = arg;
+	rc = aio_write(iocb);
+
+	return ((void *)rc);
+}
+
+ATF_TC_WITHOUT_HEAD(aio_threadexit);
+ATF_TC_BODY(aio_threadexit, tc)
+{
+	char buffer[] = "hello";
+	struct timespec zero_timeout = {0, 0};
+	pthread_t threads[sizeof(buffer)];
+	struct aiocb iocbs[sizeof(buffer)];
+	struct aiocb *iocbp;
+	int fd;
+
+	ATF_REQUIRE_KERNEL_MODULE("aio");
+	ATF_REQUIRE_UNSAFE_AIO();
+
+	fd = open(FILE_PATHNAME, O_RDWR | O_CREAT, 0600);
+	ATF_REQUIRE(fd >= 0);
+
+	/* Start all the threads, one per byte in buffer. */
+	for (unsigned i = 0; i < nitems(threads); ++i) {
+		memset(&iocbs[i], 0, sizeof(iocbs[i]));
+		iocbs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
+		iocbs[i].aio_fildes = fd;
+		iocbs[i].aio_buf = &buffer[i];
+		iocbs[i].aio_nbytes = 1;
+		iocbs[i].aio_offset = i;
+		ATF_REQUIRE_EQ(0, pthread_create(&threads[i], NULL,
+		    aio_threadexit_run, &iocbs[i]));
+	}
+
+	/* Wait for all the threads to exit. */
+	for (unsigned i = 0; i < nitems(threads); ++i) {
+		void *result;
+
+		ATF_REQUIRE_EQ(0, pthread_join(threads[i], &result));
+		ATF_REQUIRE_EQ(0, result);
+	}
+
+	/* Reap the completed IOs; a zero timeout should be sufficient. */
+	for (unsigned i = 0; i < nitems(threads); ++i) {
+		ATF_REQUIRE_EQ(1, aio_waitcomplete(&iocbp, &zero_timeout));
+	}
+
+	/* Nothing more in the kernel's queue. */
+	ATF_REQUIRE_EQ(-1, aio_waitcomplete(&iocbp, &zero_timeout));
+	ATF_REQUIRE_EQ(errno, EAGAIN);
+
+	/* Read the data back to see that our writes all worked. */
+	ATF_REQUIRE_EQ(sizeof(buffer), pread(fd, buffer, sizeof(buffer), 0));
+	ATF_REQUIRE_EQ(0, strcmp("hello", buffer));
+
+	close(fd);
+}
+
 ATF_TP_ADD_TCS(tp)
 {
@@ -1970,6 +2037,7 @@
 	ATF_TP_ADD_TC(tp, vectored_unaligned);
 	ATF_TP_ADD_TC(tp, vectored_socket_poll);
 	ATF_TP_ADD_TC(tp, vectored_thread);
+	ATF_TP_ADD_TC(tp, aio_threadexit);
 
 	return (atf_no_error());
 }
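
For reference, a minimal userland sketch (not part of the patch) of the scenario that aio_thread_exit() and the new aio_threadexit test address: a thread queues an aio_write(2) and exits before the I/O completes, and a surviving thread reaps the completion. The scratch-file name and the omitted error handling are illustrative assumptions; the AIO calls themselves are the standard FreeBSD interfaces used by the test above.

#include <sys/types.h>

#include <aio.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>

static struct aiocb iocb;
static char byte = 'x';

/* Submitting thread: queue one write, then exit immediately. */
static void *
submit(void *arg)
{
	return ((void *)(intptr_t)aio_write(arg));
}

int
main(void)
{
	struct aiocb *done;
	pthread_t t;
	int fd;

	fd = open("scratch.tmp", O_RDWR | O_CREAT, 0600);	/* illustrative name */
	memset(&iocb, 0, sizeof(iocb));
	iocb.aio_fildes = fd;
	iocb.aio_buf = &byte;
	iocb.aio_nbytes = 1;
	iocb.aio_sigevent.sigev_notify = SIGEV_NONE;

	pthread_create(&t, NULL, submit, &iocb);
	pthread_join(t, NULL);	/* Submitter exits before the I/O is reaped. */

	/*
	 * With this change, the exiting submitter parks in aio_thread_exit()
	 * until its in-flight job completes and the job's rusage delta has
	 * been transferred, so the counters seen here are consistent.
	 * aio_waitcomplete() returns the byte count of the completed job.
	 */
	return (aio_waitcomplete(&done, NULL) == 1 ? 0 : 1);
}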