D33271.diff
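
This change makes the AIO worker daemons charge I/O accounting back to the thread that submitted each job. Completion now transfers a job's inblock/oublock/msgsnd/msgrcv deltas to the submitter's td_ru (previously this happened only when the submitter later called aio_return() or aio_waitcomplete()), every updater of those four struct rusage fields switches from plain increments to the new RU_ATOMIC_INC()/RU_ATOMIC_ADD() wrappers defined in sys/sys/resource.h, and a new per-thread reference count, td_aio_count, makes an exiting thread wait in aio_thread_exit() until its outstanding jobs have drained. A regression test submits writes from threads that exit immediately.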

diff --git a/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c b/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
--- a/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
+++ b/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
@@ -29,7 +29,7 @@
void
zfs_racct_read(uint64_t size, uint64_t iops)
{
- curthread->td_ru.ru_inblock += iops;
+ RU_ATOMIC_ADD(curthread->td_ru.ru_inblock, iops);
#ifdef RACCT
if (racct_enable) {
PROC_LOCK(curproc);
@@ -43,7 +43,7 @@
void
zfs_racct_write(uint64_t size, uint64_t iops)
{
- curthread->td_ru.ru_oublock += iops;
+ RU_ATOMIC_ADD(curthread->td_ru.ru_oublock, iops);
#ifdef RACCT
if (racct_enable) {
PROC_LOCK(curproc);
diff --git a/sys/fs/ext2fs/ext2_bmap.c b/sys/fs/ext2fs/ext2_bmap.c
--- a/sys/fs/ext2fs/ext2_bmap.c
+++ b/sys/fs/ext2fs/ext2_bmap.c
@@ -184,7 +184,7 @@
PROC_UNLOCK(curproc);
}
#endif
- curthread->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_inblock);
error = bufwait(bp);
if (error != 0) {
brelse(bp);
diff --git a/sys/fs/nfsclient/nfs_clvnops.c b/sys/fs/nfsclient/nfs_clvnops.c
--- a/sys/fs/nfsclient/nfs_clvnops.c
+++ b/sys/fs/nfsclient/nfs_clvnops.c
@@ -3506,7 +3506,7 @@
bp->b_iocmd = BIO_WRITE;
bufobj_wref(bp->b_bufobj);
- curthread->td_ru.ru_oublock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
/*
* Note: to avoid loopback deadlocks, we do not
diff --git a/sys/kern/kern_physio.c b/sys/kern/kern_physio.c
--- a/sys/kern/kern_physio.c
+++ b/sys/kern/kern_physio.c
@@ -138,10 +138,10 @@
g_reset_bio(bp);
if (uio->uio_rw == UIO_READ) {
bp->bio_cmd = BIO_READ;
- curthread->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_inblock);
} else {
bp->bio_cmd = BIO_WRITE;
- curthread->td_ru.ru_oublock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
}
bp->bio_offset = uio->uio_offset;
base = uio->uio_iov[i].iov_base;
diff --git a/sys/kern/kern_thread.c b/sys/kern/kern_thread.c
--- a/sys/kern/kern_thread.c
+++ b/sys/kern/kern_thread.c
@@ -64,6 +64,7 @@
#include <sys/pmckern.h>
#endif
#include <sys/priv.h>
+#include <sys/aio.h>
#include <security/audit/audit.h>
@@ -365,6 +366,9 @@
#endif
umtx_thread_alloc(td);
MPASS(td->td_sel == NULL);
+
+ atomic_store_int(&td->td_aio_count, 0);
+
return (0);
}
@@ -406,6 +410,7 @@
#endif
/* Free all OSD associated to this thread. */
osd_thread_exit(td);
+ aio_thread_exit(td);
td_softdep_cleanup(td);
MPASS(td->td_su == NULL);
seltdfini(td);
diff --git a/sys/kern/sys_pipe.c b/sys/kern/sys_pipe.c
--- a/sys/kern/sys_pipe.c
+++ b/sys/kern/sys_pipe.c
@@ -883,7 +883,7 @@
PIPE_UNLOCK(rpipe);
if (nread > 0)
- td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgrcv);
return (error);
}
@@ -1342,7 +1342,7 @@
pipeunlock(wpipe);
PIPE_UNLOCK(rpipe);
if (uio->uio_resid != orig_resid)
- td->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
return (error);
}
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -1464,7 +1464,7 @@
dontroute =
(flags & MSG_DONTROUTE) && (so->so_options & SO_DONTROUTE) == 0;
if (td != NULL)
- td->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
if (control != NULL)
clen = control->m_len;
@@ -1640,7 +1640,7 @@
(flags & MSG_DONTROUTE) && (so->so_options & SO_DONTROUTE) == 0 &&
(so->so_proto->pr_flags & PR_ATOMIC);
if (td != NULL)
- td->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
if (control != NULL)
clen = control->m_len;
@@ -2081,7 +2081,7 @@
*/
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
if (uio->uio_td)
- uio->uio_td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
KASSERT(m == so->so_rcv.sb_mb, ("soreceive: m != so->so_rcv.sb_mb"));
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
@@ -2573,7 +2573,7 @@
/* Statistics. */
if (uio->uio_td)
- uio->uio_td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
/* Fill uio until full or current end of socket buffer is reached. */
len = min(uio->uio_resid, sbavail(sb));
@@ -2742,7 +2742,7 @@
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
if (uio->uio_td)
- uio->uio_td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
nextrecord = m->m_nextpkt;
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -568,6 +568,37 @@
return (0);
}
+void
+aio_thread_exit(struct thread *td)
+{
+ struct kaioinfo *ki;
+ struct proc *p;
+
+ /*
+ * Common case. It's OK if we see a slightly out of date version as
+ * we'll recheck under lock if it's non-zero.
+ */
+ if (atomic_load_int(&td->td_aio_count) == 0)
+ return;
+
+ p = td->td_proc;
+ ki = p->p_aioinfo;
+
+ /* If we've ever submitted an IO, p_aioinfo must be set. */
+ MPASS(ki != NULL);
+
+ /*
+ * Wait until there are no unfinished jobs that were submitted by this
+ * thread.
+ */
+ AIO_LOCK(ki);
+ while (atomic_load_int(&td->td_aio_count) > 0) {
+ ki->kaio_flags |= KAIO_WAKEUP;
+ msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aiotde", hz);
+ }
+ AIO_UNLOCK(ki);
+}
+
static void
aio_proc_rundown_exec(void *arg, struct proc *p,
struct image_params *imgp __unused)
@@ -938,6 +969,14 @@
taskqueue_enqueue(taskqueue_aiod_kick,
&ki->kaio_sync_task);
}
+
+ /*
+ * Drop our reference to the submitting thread. This allows it to
+ * exit.
+ */
+ atomic_subtract_int(&job->td->td_aio_count, 1);
+ job->td = NULL;
+
if (ki->kaio_flags & KAIO_WAKEUP) {
ki->kaio_flags &= ~KAIO_WAKEUP;
wakeup(&userp->p_aioinfo);
@@ -1035,6 +1074,19 @@
job->uaiocb._aiocb_private.error = error;
job->uaiocb._aiocb_private.status = status;
+ /*
+ * Transfer the resource usage delta to the submitting thread's
+ * counters.
+ */
+ if (job->outblock)
+ RU_ATOMIC_ADD(job->td->td_ru.ru_oublock, job->outblock);
+ if (job->inblock)
+ RU_ATOMIC_ADD(job->td->td_ru.ru_inblock, job->inblock);
+ if (job->msgsnd)
+ RU_ATOMIC_ADD(job->td->td_ru.ru_msgsnd, job->msgsnd);
+ if (job->msgrcv)
+ RU_ATOMIC_ADD(job->td->td_ru.ru_msgrcv, job->msgrcv);
+
userp = job->userproc;
ki = userp->p_aioinfo;
@@ -1703,6 +1755,13 @@
job->uiop->uio_offset = job->uaiocb.aio_offset;
job->uiop->uio_td = td;
+ /*
+ * Take a reference to the submitting thread, so that worker daemons
+ * can update this thread's resource usage counters.
+ */
+ job->td = td;
+ atomic_add_int(&td->td_aio_count, 1);
+
if (opcode == LIO_MLOCK) {
aio_schedule(job, aio_process_mlock);
error = 0;
@@ -1933,10 +1992,6 @@
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
td->td_retval[0] = status;
- td->td_ru.ru_oublock += job->outblock;
- td->td_ru.ru_inblock += job->inblock;
- td->td_ru.ru_msgsnd += job->msgsnd;
- td->td_ru.ru_msgrcv += job->msgrcv;
aio_free_entry(job);
AIO_UNLOCK(ki);
ops->store_error(ujob, error);
@@ -2564,10 +2619,6 @@
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
td->td_retval[0] = status;
- td->td_ru.ru_oublock += job->outblock;
- td->td_ru.ru_inblock += job->inblock;
- td->td_ru.ru_msgsnd += job->msgsnd;
- td->td_ru.ru_msgrcv += job->msgrcv;
aio_free_entry(job);
AIO_UNLOCK(ki);
ops->store_aiocb(ujobp, ujob);
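
Taken together, the vfs_aio.c hunks above form a small quiescence protocol: the submission path pins the submitting thread (td_aio_count++), the completion path unpins it under AIO_LOCK and wakes any waiter through the existing KAIO_WAKEUP mechanism, and aio_thread_exit() sleeps until the count drains. Note the ordering: the rusage deltas are transferred to job->td before the reference is dropped, so the submitting thread cannot exit while its counters are still being written. The unlocked fast path is safe because only the thread itself increments its own count, and a stale nonzero value is simply rechecked under the lock. A compact userspace model of the same shape, with a mutex/condvar standing in for AIO_LOCK/msleep/wakeup (all names below are illustrative, not kernel code):

#include <pthread.h>
#include <stdatomic.h>

static pthread_mutex_t aio_lock = PTHREAD_MUTEX_INITIALIZER; /* AIO_LOCK */
static pthread_cond_t aio_cv = PTHREAD_COND_INITIALIZER;     /* KAIO_WAKEUP */
static atomic_int td_aio_count;

/* Submission: pin the submitting thread for the job's lifetime. */
static void
submit_job(void)
{
	atomic_fetch_add(&td_aio_count, 1);
	/* ... queue the job for a worker ... */
}

/* Completion, run by a worker: charge the submitter, then unpin it. */
static void
complete_job(void)
{
	/* ... transfer the job's rusage deltas first ... */
	pthread_mutex_lock(&aio_lock);
	atomic_fetch_sub(&td_aio_count, 1);
	pthread_cond_broadcast(&aio_cv);	/* models wakeup() */
	pthread_mutex_unlock(&aio_lock);
}

/* Thread exit: wait for every job this thread submitted. */
static void
thread_exit_wait(void)
{
	/* Unlocked fast path, as in aio_thread_exit(). */
	if (atomic_load(&td_aio_count) == 0)
		return;
	pthread_mutex_lock(&aio_lock);
	while (atomic_load(&td_aio_count) > 0)
		pthread_cond_wait(&aio_cv, &aio_lock);
	pthread_mutex_unlock(&aio_lock);
}
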
diff --git a/sys/kern/vfs_bio.c b/sys/kern/vfs_bio.c
--- a/sys/kern/vfs_bio.c
+++ b/sys/kern/vfs_bio.c
@@ -2132,7 +2132,7 @@
PROC_UNLOCK(curproc);
}
#endif /* RACCT */
- td->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(td->td_ru.ru_inblock);
rabp->b_flags |= B_ASYNC;
rabp->b_flags &= ~B_INVAL;
if ((flags & GB_CKHASH) != 0) {
@@ -2205,7 +2205,7 @@
PROC_UNLOCK(td->td_proc);
}
#endif /* RACCT */
- td->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(td->td_ru.ru_inblock);
bp->b_iocmd = BIO_READ;
bp->b_flags &= ~B_INVAL;
if ((flags & GB_CKHASH) != 0) {
@@ -2313,7 +2313,7 @@
PROC_UNLOCK(curproc);
}
#endif /* RACCT */
- curthread->td_ru.ru_oublock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
if (oldflags & B_ASYNC)
BUF_KERNPROC(bp);
bp->b_iooffset = dbtob(bp->b_blkno);
diff --git a/sys/kern/vfs_cluster.c b/sys/kern/vfs_cluster.c
--- a/sys/kern/vfs_cluster.c
+++ b/sys/kern/vfs_cluster.c
@@ -257,7 +257,7 @@
PROC_UNLOCK(td->td_proc);
}
#endif /* RACCT */
- td->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(td->td_ru.ru_inblock);
}
/*
@@ -317,7 +317,7 @@
PROC_UNLOCK(td->td_proc);
}
#endif /* RACCT */
- td->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(td->td_ru.ru_inblock);
}
if (reqbp) {
diff --git a/sys/netinet/sctp_output.c b/sys/netinet/sctp_output.c
--- a/sys/netinet/sctp_output.c
+++ b/sys/netinet/sctp_output.c
@@ -12900,7 +12900,7 @@
}
/* Ok, we will attempt a msgsnd :> */
if (p) {
- p->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(p->td_ru.ru_msgsnd);
}
/* Are we aborting? */
if (sinfo_flags & SCTP_ABORT) {
diff --git a/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c b/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
--- a/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -1100,7 +1100,7 @@
goto out;
}
if (td != NULL)
- td->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
ssk = sdp_sk(so);
error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags));
@@ -1348,7 +1348,7 @@
/* Statistics. */
if (uio->uio_td)
- uio->uio_td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
/* Fill uio until full or current end of socket buffer is reached. */
len = min(uio->uio_resid, sbavail(sb));
diff --git a/sys/sys/aio.h b/sys/sys/aio.h
--- a/sys/sys/aio.h
+++ b/sys/sys/aio.h
@@ -136,6 +136,7 @@
int msgsnd; /* (*) messages sent */
int msgrcv; /* (*) messages received */
struct proc *userproc; /* (*) user process */
+ struct thread *td; /* (a) submitting thread */
struct ucred *cred; /* (*) active credential when created */
struct file *fd_file; /* (*) pointer to file structure */
struct aioliojob *lio; /* (*) optional lio job */
@@ -210,6 +211,7 @@
void aio_schedule(struct kaiocb *job, aio_handle_fn_t *func);
bool aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func);
void aio_switch_vmspace(struct kaiocb *job);
+void aio_thread_exit(struct thread *thread);
#else /* !_KERNEL */
diff --git a/sys/sys/proc.h b/sys/sys/proc.h
--- a/sys/sys/proc.h
+++ b/sys/sys/proc.h
@@ -170,6 +170,7 @@
* y - created at first aio, doesn't change until exit or exec at which
* point we are single-threaded and only curthread changes it
* z - zombie threads lock
+ * A - atomic ops
*
* If the locking key specifies two identifiers (for example, p_pptr) then
* either lock is sufficient for read access, but both locks must be held
@@ -383,6 +384,7 @@
#ifdef EPOCH_TRACE
SLIST_HEAD(, epoch_tracker) td_epochs;
#endif
+ int td_aio_count; /* (A) number of AIOs in flight */
};
struct thread0_storage {
diff --git a/sys/sys/resource.h b/sys/sys/resource.h
--- a/sys/sys/resource.h
+++ b/sys/sys/resource.h
@@ -63,7 +63,8 @@
* Resource utilization information.
*
* All fields are only modified by curthread and
- * no locks are required to read.
+ * no locks are required to read, except those marked
+ * (a), which are accessed with atomics.
*/
#define RUSAGE_SELF 0
@@ -81,10 +82,10 @@
long ru_minflt; /* page reclaims */
long ru_majflt; /* page faults */
long ru_nswap; /* swaps */
- long ru_inblock; /* block input operations */
- long ru_oublock; /* block output operations */
- long ru_msgsnd; /* messages sent */
- long ru_msgrcv; /* messages received */
+ long ru_inblock; /* (a) block input operations */
+ long ru_oublock; /* (a) block output operations */
+ long ru_msgsnd; /* (a) messages sent */
+ long ru_msgrcv; /* (a) messages received */
long ru_nsignals; /* signals received */
long ru_nvcsw; /* voluntary context switches */
long ru_nivcsw; /* involuntary " */
@@ -176,6 +177,9 @@
#ifdef _KERNEL
+#define RU_ATOMIC_INC(v) atomic_add_long(&(v), 1)
+#define RU_ATOMIC_ADD(v, d) atomic_add_long(&(v), (d))
+
extern struct loadavg averunnable;
void read_cpu_time(long *cp_time); /* Writes array of CPUSTATES */
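
The wrappers are intentionally thin: atomic_add_long() from atomic(9) provides atomicity without any ordering guarantees, which is all a statistics counter needs, and call sites keep the shape of the increments they replace. The bug class being closed is the classic lost-update race on an unsynchronized counter; a freestanding userspace demonstration using C11 atomics in place of atomic(9) (illustrative names, not kernel code):

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NITER	1000000

/* Stand-in for an (a)-marked rusage field. */
static atomic_long ru_inblock;

static void *
charge(void *arg)
{
	(void)arg;
	for (int i = 0; i < NITER; i++)
		/* Relaxed is enough: nothing is ordered against the count. */
		atomic_fetch_add_explicit(&ru_inblock, 1,
		    memory_order_relaxed);
	return (NULL);
}

int
main(void)
{
	pthread_t t1, t2;

	pthread_create(&t1, NULL, charge, NULL);
	pthread_create(&t2, NULL, charge, NULL);
	pthread_join(t1, NULL);
	pthread_join(t2, NULL);
	printf("%ld\n", atomic_load(&ru_inblock));	/* always 2000000 */
	return (0);
}

With a plain long and "ru_inblock++" in charge(), the two read-modify-write sequences can interleave and the final total falls short of 2000000, which is exactly the hazard once AIO daemons update another thread's td_ru concurrently.
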
diff --git a/tests/sys/aio/Makefile b/tests/sys/aio/Makefile
--- a/tests/sys/aio/Makefile
+++ b/tests/sys/aio/Makefile
@@ -12,7 +12,7 @@
PLAIN_TESTS_C+= aio_kqueue_test
PLAIN_TESTS_C+= lio_kqueue_test
-LIBADD.aio_test+= util rt
+LIBADD.aio_test+= util rt pthread
LIBADD.lio_test+= rt
CFLAGS+= -I${.CURDIR:H:H}
diff --git a/tests/sys/aio/aio_test.c b/tests/sys/aio/aio_test.c
--- a/tests/sys/aio/aio_test.c
+++ b/tests/sys/aio/aio_test.c
@@ -39,6 +39,7 @@
* types.
*/
+#include <sys/event.h>
#include <sys/param.h>
#include <sys/mdioctl.h>
#include <sys/module.h>
@@ -53,6 +54,7 @@
#include <fcntl.h>
#include <libutil.h>
#include <limits.h>
+#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdint.h>
@@ -1914,6 +1916,71 @@
aio_zvol_cleanup();
}
+static void *
+aio_threadexit_run(void *arg)
+{
+ struct aiocb *iocb;
+ intptr_t rc;
+
+ iocb = arg;
+ rc = aio_write(iocb);
+
+ return ((void *)rc);
+}
+
+ATF_TC_WITHOUT_HEAD(aio_threadexit);
+ATF_TC_BODY(aio_threadexit, tc)
+{
+ char buffer[] = "hello";
+ struct timespec zero_timeout = {0, 0};
+ pthread_t threads[sizeof(buffer)];
+ struct aiocb iocbs[sizeof(buffer)];
+ struct aiocb *iocbp;
+ int fd;
+
+ ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
+
+ fd = open(FILE_PATHNAME, O_RDWR | O_CREAT, 0600);
+ ATF_REQUIRE(fd >= 0);
+
+ /* Start all the threads, one per byte in buffer. */
+ for (unsigned i = 0; i < nitems(threads); ++i) {
+ memset(&iocbs[i], 0, sizeof(iocbs[i]));
+ iocbs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
+ iocbs[i].aio_fildes = fd;
+ iocbs[i].aio_buf = &buffer[i];
+ iocbs[i].aio_nbytes = 1;
+ iocbs[i].aio_offset = i;
+ ATF_REQUIRE_EQ(0, pthread_create(&threads[i], NULL,
+ aio_threadexit_run, &iocbs[i]));
+ }
+
+ /* Wait for all the threads to exit. */
+ for (unsigned i = 0; i < nitems(threads); ++i) {
+ void *result;
+
+ ATF_REQUIRE_EQ(0, pthread_join(threads[i], &result));
+ ATF_REQUIRE_EQ(0, result);
+ }
+
+ /* Reap the completed IOs; a zero timeout should be sufficient. */
+ for (unsigned i = 0; i < nitems(threads); ++i) {
+ ATF_REQUIRE_EQ(1, aio_waitcomplete(&iocbp, &zero_timeout));
+ }
+
+ /* Nothing more in the kernel's queue. */
+ ATF_REQUIRE_EQ(-1, aio_waitcomplete(&iocbp, &zero_timeout));
+ ATF_REQUIRE_EQ(errno, EAGAIN);
+
+ /* Read the data back to see that our writes all worked. */
+ ATF_REQUIRE_EQ(sizeof(buffer), pread(fd, buffer, sizeof(buffer), 0));
+ ATF_REQUIRE_EQ(0, strcmp("hello", buffer));
+
+ close(fd);
+}
+
+
ATF_TP_ADD_TCS(tp)
{
@@ -1970,6 +2037,7 @@
ATF_TP_ADD_TC(tp, vectored_unaligned);
ATF_TP_ADD_TC(tp, vectored_socket_poll);
ATF_TP_ADD_TC(tp, vectored_thread);
+ ATF_TP_ADD_TC(tp, aio_threadexit);
return (atf_no_error());
}
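
The regression test exercises exactly the path the kernel changes protect: each byte of "hello" is written by a thread that calls aio_write() and exits immediately, so the daemons complete jobs whose submitters are already gone, and the main thread then reaps the results. Reaping uses FreeBSD's aio_waitcomplete(2) with a zero timeout; a minimal standalone sketch of that drain loop (illustrative only):

#include <aio.h>
#include <err.h>
#include <errno.h>
#include <stdio.h>
#include <time.h>

/* Drain whatever has completed without blocking. */
static void
drain_completions(void)
{
	struct timespec zero = {0, 0};
	struct aiocb *done;
	ssize_t n;

	/* Returns the job's byte count; a failed job also surfaces
	 * here as -1 with that job's errno. */
	while ((n = aio_waitcomplete(&done, &zero)) != -1)
		printf("aiocb %p: %zd bytes\n", (void *)done, n);
	if (errno != EAGAIN)
		err(1, "aio_waitcomplete");
}

The Makefile hunk adds pthread to LIBADD.aio_test because the new test links against pthread_create().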
