Page Menu
Home
FreeBSD
Search
Configure Global Search
Log In
Files
F142191881
D33271.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
14 KB
Referenced Files
None
Subscribers
None
D33271.diff
View Options
diff --git a/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c b/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
--- a/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
+++ b/sys/contrib/openzfs/module/os/freebsd/zfs/zfs_racct.c
@@ -29,7 +29,7 @@
void
zfs_racct_read(uint64_t size, uint64_t iops)
{
- curthread->td_ru.ru_inblock += iops;
+ RU_ATOMIC_ADD(curthread->td_ru.ru_inblock, iops);
#ifdef RACCT
if (racct_enable) {
PROC_LOCK(curproc);
@@ -43,7 +43,7 @@
void
zfs_racct_write(uint64_t size, uint64_t iops)
{
- curthread->td_ru.ru_oublock += iops;
+ RU_ATOMIC_ADD(curthread->td_ru.ru_oublock, iops);
#ifdef RACCT
if (racct_enable) {
PROC_LOCK(curproc);
diff --git a/sys/fs/ext2fs/ext2_bmap.c b/sys/fs/ext2fs/ext2_bmap.c
--- a/sys/fs/ext2fs/ext2_bmap.c
+++ b/sys/fs/ext2fs/ext2_bmap.c
@@ -184,7 +184,7 @@
PROC_UNLOCK(curproc);
}
#endif
- curthread->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_inblock);
error = bufwait(bp);
if (error != 0) {
brelse(bp);
diff --git a/sys/fs/nfsclient/nfs_clvnops.c b/sys/fs/nfsclient/nfs_clvnops.c
--- a/sys/fs/nfsclient/nfs_clvnops.c
+++ b/sys/fs/nfsclient/nfs_clvnops.c
@@ -3506,7 +3506,7 @@
bp->b_iocmd = BIO_WRITE;
bufobj_wref(bp->b_bufobj);
- curthread->td_ru.ru_oublock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
/*
* Note: to avoid loopback deadlocks, we do not
diff --git a/sys/kern/kern_physio.c b/sys/kern/kern_physio.c
--- a/sys/kern/kern_physio.c
+++ b/sys/kern/kern_physio.c
@@ -138,10 +138,10 @@
g_reset_bio(bp);
if (uio->uio_rw == UIO_READ) {
bp->bio_cmd = BIO_READ;
- curthread->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_inblock);
} else {
bp->bio_cmd = BIO_WRITE;
- curthread->td_ru.ru_oublock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
}
bp->bio_offset = uio->uio_offset;
base = uio->uio_iov[i].iov_base;
diff --git a/sys/kern/kern_thread.c b/sys/kern/kern_thread.c
--- a/sys/kern/kern_thread.c
+++ b/sys/kern/kern_thread.c
@@ -64,6 +64,7 @@
#include <sys/pmckern.h>
#endif
#include <sys/priv.h>
+#include <sys/aio.h>
#include <security/audit/audit.h>
@@ -365,6 +366,9 @@
#endif
umtx_thread_alloc(td);
MPASS(td->td_sel == NULL);
+
+ atomic_store_int(&td->td_aio_count, 0);
+
return (0);
}
@@ -406,6 +410,7 @@
#endif
/* Free all OSD associated to this thread. */
osd_thread_exit(td);
+ aio_thread_exit(td);
td_softdep_cleanup(td);
MPASS(td->td_su == NULL);
seltdfini(td);
diff --git a/sys/kern/sys_pipe.c b/sys/kern/sys_pipe.c
--- a/sys/kern/sys_pipe.c
+++ b/sys/kern/sys_pipe.c
@@ -883,7 +883,7 @@
PIPE_UNLOCK(rpipe);
if (nread > 0)
- td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgrcv);
return (error);
}
@@ -1342,7 +1342,7 @@
pipeunlock(wpipe);
PIPE_UNLOCK(rpipe);
if (uio->uio_resid != orig_resid)
- td->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
return (error);
}
diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -1464,7 +1464,7 @@
dontroute =
(flags & MSG_DONTROUTE) && (so->so_options & SO_DONTROUTE) == 0;
if (td != NULL)
- td->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
if (control != NULL)
clen = control->m_len;
@@ -1640,7 +1640,7 @@
(flags & MSG_DONTROUTE) && (so->so_options & SO_DONTROUTE) == 0 &&
(so->so_proto->pr_flags & PR_ATOMIC);
if (td != NULL)
- td->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
if (control != NULL)
clen = control->m_len;
@@ -2081,7 +2081,7 @@
*/
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
if (uio->uio_td)
- uio->uio_td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
KASSERT(m == so->so_rcv.sb_mb, ("soreceive: m != so->so_rcv.sb_mb"));
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
@@ -2573,7 +2573,7 @@
/* Statistics. */
if (uio->uio_td)
- uio->uio_td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
/* Fill uio until full or current end of socket buffer is reached. */
len = min(uio->uio_resid, sbavail(sb));
@@ -2742,7 +2742,7 @@
SOCKBUF_LOCK_ASSERT(&so->so_rcv);
if (uio->uio_td)
- uio->uio_td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
SBLASTRECORDCHK(&so->so_rcv);
SBLASTMBUFCHK(&so->so_rcv);
nextrecord = m->m_nextpkt;
diff --git a/sys/kern/vfs_aio.c b/sys/kern/vfs_aio.c
--- a/sys/kern/vfs_aio.c
+++ b/sys/kern/vfs_aio.c
@@ -568,6 +568,37 @@
return (0);
}
+void
+aio_thread_exit(struct thread *td)
+{
+ struct kaioinfo *ki;
+ struct proc *p;
+
+ /*
+ * Common case. It's OK if we see a slightly out of date version as
+ * we'll recheck under lock if it's non-zero.
+ */
+ if (atomic_load_int(&td->td_aio_count) == 0)
+ return;
+
+ p = td->td_proc;
+ ki = p->p_aioinfo;
+
+ /* If we've ever submitted an IO, p_aioinfo must be set. */
+ MPASS(ki != NULL);
+
+ /*
+ * Wait until there are no unfinished jobs that were submitted by this
+ * thread.
+ */
+ AIO_LOCK(ki);
+ while (atomic_load_int(&td->td_aio_count) > 0) {
+ ki->kaio_flags |= KAIO_WAKEUP;
+ msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aiotde", hz);
+ }
+ AIO_UNLOCK(ki);
+}
+
static void
aio_proc_rundown_exec(void *arg, struct proc *p,
struct image_params *imgp __unused)
@@ -938,6 +969,14 @@
taskqueue_enqueue(taskqueue_aiod_kick,
&ki->kaio_sync_task);
}
+
+ /*
+ * Drop our reference to the submitting thread. This allows it to
+ * exit.
+ */
+ atomic_subtract_int(&job->td->td_aio_count, 1);
+ job->td = NULL;
+
if (ki->kaio_flags & KAIO_WAKEUP) {
ki->kaio_flags &= ~KAIO_WAKEUP;
wakeup(&userp->p_aioinfo);
@@ -1035,6 +1074,19 @@
job->uaiocb._aiocb_private.error = error;
job->uaiocb._aiocb_private.status = status;
+ /*
+ * Transfer the resource usage delta to the submitting thread's
+ * counters.
+ */
+ if (job->outblock)
+ RU_ATOMIC_ADD(job->td->td_ru.ru_oublock, job->outblock);
+ if (job->inblock)
+ RU_ATOMIC_ADD(job->td->td_ru.ru_inblock, job->inblock);
+ if (job->msgsnd)
+ RU_ATOMIC_ADD(job->td->td_ru.ru_msgsnd, job->msgsnd);
+ if (job->msgrcv)
+ RU_ATOMIC_ADD(job->td->td_ru.ru_msgrcv, job->msgrcv);
+
userp = job->userproc;
ki = userp->p_aioinfo;
@@ -1703,6 +1755,13 @@
job->uiop->uio_offset = job->uaiocb.aio_offset;
job->uiop->uio_td = td;
+ /*
+ * Take a reference to the submitting thread, so that worker daemons
+ * can update this thread's resource usage counters.
+ */
+ job->td = td;
+ atomic_add_int(&td->td_aio_count, 1);
+
if (opcode == LIO_MLOCK) {
aio_schedule(job, aio_process_mlock);
error = 0;
@@ -1933,10 +1992,6 @@
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
td->td_retval[0] = status;
- td->td_ru.ru_oublock += job->outblock;
- td->td_ru.ru_inblock += job->inblock;
- td->td_ru.ru_msgsnd += job->msgsnd;
- td->td_ru.ru_msgrcv += job->msgrcv;
aio_free_entry(job);
AIO_UNLOCK(ki);
ops->store_error(ujob, error);
@@ -2564,10 +2619,6 @@
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
td->td_retval[0] = status;
- td->td_ru.ru_oublock += job->outblock;
- td->td_ru.ru_inblock += job->inblock;
- td->td_ru.ru_msgsnd += job->msgsnd;
- td->td_ru.ru_msgrcv += job->msgrcv;
aio_free_entry(job);
AIO_UNLOCK(ki);
ops->store_aiocb(ujobp, ujob);
diff --git a/sys/kern/vfs_bio.c b/sys/kern/vfs_bio.c
--- a/sys/kern/vfs_bio.c
+++ b/sys/kern/vfs_bio.c
@@ -2132,7 +2132,7 @@
PROC_UNLOCK(curproc);
}
#endif /* RACCT */
- td->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(td->td_ru.ru_inblock);
rabp->b_flags |= B_ASYNC;
rabp->b_flags &= ~B_INVAL;
if ((flags & GB_CKHASH) != 0) {
@@ -2205,7 +2205,7 @@
PROC_UNLOCK(td->td_proc);
}
#endif /* RACCT */
- td->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(td->td_ru.ru_inblock);
bp->b_iocmd = BIO_READ;
bp->b_flags &= ~B_INVAL;
if ((flags & GB_CKHASH) != 0) {
@@ -2313,7 +2313,7 @@
PROC_UNLOCK(curproc);
}
#endif /* RACCT */
- curthread->td_ru.ru_oublock++;
+ RU_ATOMIC_INC(curthread->td_ru.ru_oublock);
if (oldflags & B_ASYNC)
BUF_KERNPROC(bp);
bp->b_iooffset = dbtob(bp->b_blkno);
diff --git a/sys/kern/vfs_cluster.c b/sys/kern/vfs_cluster.c
--- a/sys/kern/vfs_cluster.c
+++ b/sys/kern/vfs_cluster.c
@@ -257,7 +257,7 @@
PROC_UNLOCK(td->td_proc);
}
#endif /* RACCT */
- td->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(td->td_ru.ru_inblock);
}
/*
@@ -317,7 +317,7 @@
PROC_UNLOCK(td->td_proc);
}
#endif /* RACCT */
- td->td_ru.ru_inblock++;
+ RU_ATOMIC_INC(td->td_ru.ru_inblock);
}
if (reqbp) {
diff --git a/sys/netinet/sctp_output.c b/sys/netinet/sctp_output.c
--- a/sys/netinet/sctp_output.c
+++ b/sys/netinet/sctp_output.c
@@ -12900,7 +12900,7 @@
}
/* Ok, we will attempt a msgsnd :> */
if (p) {
- p->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(p->td_ru.ru_msgsnd);
}
/* Are we aborting? */
if (sinfo_flags & SCTP_ABORT) {
diff --git a/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c b/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
--- a/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/sys/ofed/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -1100,7 +1100,7 @@
goto out;
}
if (td != NULL)
- td->td_ru.ru_msgsnd++;
+ RU_ATOMIC_INC(td->td_ru.ru_msgsnd);
ssk = sdp_sk(so);
error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags));
@@ -1348,7 +1348,7 @@
/* Statistics. */
if (uio->uio_td)
- uio->uio_td->td_ru.ru_msgrcv++;
+ RU_ATOMIC_INC(uio->uio_td->td_ru.ru_msgrcv);
/* Fill uio until full or current end of socket buffer is reached. */
len = min(uio->uio_resid, sbavail(sb));
diff --git a/sys/sys/aio.h b/sys/sys/aio.h
--- a/sys/sys/aio.h
+++ b/sys/sys/aio.h
@@ -136,6 +136,7 @@
int msgsnd; /* (*) messages sent */
int msgrcv; /* (*) messages received */
struct proc *userproc; /* (*) user process */
+ struct thread *td; /* (a) submitting thread */
struct ucred *cred; /* (*) active credential when created */
struct file *fd_file; /* (*) pointer to file structure */
struct aioliojob *lio; /* (*) optional lio job */
@@ -210,6 +211,7 @@
void aio_schedule(struct kaiocb *job, aio_handle_fn_t *func);
bool aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func);
void aio_switch_vmspace(struct kaiocb *job);
+void aio_thread_exit(struct thread *thread);
#else /* !_KERNEL */
diff --git a/sys/sys/proc.h b/sys/sys/proc.h
--- a/sys/sys/proc.h
+++ b/sys/sys/proc.h
@@ -170,6 +170,7 @@
* y - created at first aio, doesn't change until exit or exec at which
* point we are single-threaded and only curthread changes it
* z - zombie threads lock
+ * A - atomic ops
*
* If the locking key specifies two identifiers (for example, p_pptr) then
* either lock is sufficient for read access, but both locks must be held
@@ -383,6 +384,7 @@
#ifdef EPOCH_TRACE
SLIST_HEAD(, epoch_tracker) td_epochs;
#endif
+ int td_aio_count; /* (A) number of AIOs in flight */
};
struct thread0_storage {
diff --git a/sys/sys/resource.h b/sys/sys/resource.h
--- a/sys/sys/resource.h
+++ b/sys/sys/resource.h
@@ -63,7 +63,8 @@
* Resource utilization information.
*
* All fields are only modified by curthread and
- * no locks are required to read.
+ * no locks are required to read, except those marked
+ * (a), which are accessed with atomics.
*/
#define RUSAGE_SELF 0
@@ -81,10 +82,10 @@
long ru_minflt; /* page reclaims */
long ru_majflt; /* page faults */
long ru_nswap; /* swaps */
- long ru_inblock; /* block input operations */
- long ru_oublock; /* block output operations */
- long ru_msgsnd; /* messages sent */
- long ru_msgrcv; /* messages received */
+ long ru_inblock; /* (a) block input operations */
+ long ru_oublock; /* (a) block output operations */
+ long ru_msgsnd; /* (a) messages sent */
+ long ru_msgrcv; /* (a) messages received */
long ru_nsignals; /* signals received */
long ru_nvcsw; /* voluntary context switches */
long ru_nivcsw; /* involuntary " */
@@ -176,6 +177,9 @@
#ifdef _KERNEL
+#define RU_ATOMIC_INC(v) atomic_add_long(&(v), 1)
+#define RU_ATOMIC_ADD(v, d) atomic_add_long(&(v), (d))
+
extern struct loadavg averunnable;
void read_cpu_time(long *cp_time); /* Writes array of CPUSTATES */
diff --git a/tests/sys/aio/Makefile b/tests/sys/aio/Makefile
--- a/tests/sys/aio/Makefile
+++ b/tests/sys/aio/Makefile
@@ -12,7 +12,7 @@
PLAIN_TESTS_C+= aio_kqueue_test
PLAIN_TESTS_C+= lio_kqueue_test
-LIBADD.aio_test+= util rt
+LIBADD.aio_test+= util rt pthread
LIBADD.lio_test+= rt
CFLAGS+= -I${.CURDIR:H:H}
diff --git a/tests/sys/aio/aio_test.c b/tests/sys/aio/aio_test.c
--- a/tests/sys/aio/aio_test.c
+++ b/tests/sys/aio/aio_test.c
@@ -39,6 +39,7 @@
* types.
*/
+#include <sys/event.h>
#include <sys/param.h>
#include <sys/mdioctl.h>
#include <sys/module.h>
@@ -53,6 +54,7 @@
#include <fcntl.h>
#include <libutil.h>
#include <limits.h>
+#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <stdint.h>
@@ -1914,6 +1916,71 @@
aio_zvol_cleanup();
}
+static void *
+aio_threadexit_run(void *arg)
+{
+ struct aiocb *iocb;
+ intptr_t rc;
+
+ iocb = arg;
+ rc = aio_write(iocb);
+
+ return ((void *)rc);
+}
+
+ATF_TC_WITHOUT_HEAD(aio_threadexit);
+ATF_TC_BODY(aio_threadexit, tc)
+{
+ char buffer[] = "hello";
+ struct timespec zero_timeout = {0, 0};
+ pthread_t threads[sizeof(buffer)];
+ struct aiocb iocbs[sizeof(buffer)];
+ struct aiocb *iocbp;
+ int fd;
+
+ ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
+
+ fd = open(FILE_PATHNAME, O_RDWR | O_CREAT, 0600);
+ ATF_REQUIRE(fd >= 0);
+
+ /* Start all the threads, one per byte in buffer. */
+ for (unsigned i = 0; i < nitems(threads); ++i) {
+ memset(&iocbs[i], 0, sizeof(iocbs[i]));
+ iocbs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
+ iocbs[i].aio_fildes = fd;
+ iocbs[i].aio_buf = &buffer[i];
+ iocbs[i].aio_nbytes = 1;
+ iocbs[i].aio_offset = i;
+ ATF_REQUIRE_EQ(0, pthread_create(&threads[i], NULL,
+ aio_threadexit_run, &iocbs[i]));
+ }
+
+ /* Wait for all the threads to exit. */
+ for (unsigned i = 0; i < nitems(threads); ++i) {
+ void *result;
+
+ ATF_REQUIRE_EQ(0, pthread_join(threads[i], &result));
+ ATF_REQUIRE_EQ(0, result);
+ }
+
+	/* Reap the completed IOs; a zero timeout should be sufficient. */
+ for (unsigned i = 0; i < nitems(threads); ++i) {
+ ATF_REQUIRE_EQ(1, aio_waitcomplete(&iocbp, &zero_timeout));
+ }
+
+ /* Nothing more in the kernel's queue. */
+ ATF_REQUIRE_EQ(-1, aio_waitcomplete(&iocbp, &zero_timeout));
+ ATF_REQUIRE_EQ(errno, EAGAIN);
+
+ /* Read the data back to see that our writes all worked. */
+ ATF_REQUIRE_EQ(sizeof(buffer), pread(fd, buffer, sizeof(buffer), 0));
+ ATF_REQUIRE_EQ(0, strcmp("hello", buffer));
+
+ close(fd);
+}
+
+
ATF_TP_ADD_TCS(tp)
{
@@ -1970,6 +2037,7 @@
ATF_TP_ADD_TC(tp, vectored_unaligned);
ATF_TP_ADD_TC(tp, vectored_socket_poll);
ATF_TP_ADD_TC(tp, vectored_thread);
+ ATF_TP_ADD_TC(tp, aio_threadexit);
return (atf_no_error());
}
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sun, Jan 18, 1:08 AM (17 h, 48 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
27691720
Default Alt Text
D33271.diff (14 KB)
Attached To
Mode
D33271: Update AIO rusage counters asynchronously.
Attached
Detach File
Event Timeline
Log In to Comment