D5289.id13428.diff

Index: share/man/man4/aio.4
===================================================================
--- share/man/man4/aio.4
+++ share/man/man4/aio.4
@@ -27,24 +27,116 @@
.\"
.\" $FreeBSD$
.\"
-.Dd October 24, 2002
+.Dd February 15, 2016
.Dt AIO 4
.Os
.Sh NAME
.Nm aio
.Nd asynchronous I/O
-.Sh SYNOPSIS
-To link into the kernel:
-.Cd "options VFS_AIO"
-.Pp
-To load as a kernel loadable module:
-.Dl kldload aio
.Sh DESCRIPTION
The
.Nm
facility provides system calls for asynchronous I/O.
-It is available both as a kernel option for static inclusion and as a
-dynamic kernel module.
+However, asynchronous I/O operations are only enabled for certain file
+types by default.
+Asynchronous I/O operations for other file types may block an AIO daemon
+indefinitely, resulting in process and/or system hangs.
+Asynchronous I/O operations can be enabled for all file types by setting
+the
+.Va vfs.aio.enable_unsafe
+sysctl node to a non-zero value.
+.Pp
+Asynchronous I/O operations on sockets and raw disk devices do not block
+indefinitely and are enabled by default.
+.Pp
+The
+.Nm
+facility uses kernel processes
+(also known as AIO daemons)
+to service most asynchronous I/O requests.
+These processes are grouped into pools containing a variable number of
+processes.
+Each pool will add or remove processes based on load.
+Pools can be configured by sysctl nodes that define the minimum
+and maximum number of processes as well as the amount of time an idle
+process will wait before exiting.
+.Pp
+One pool of AIO daemons is used to service asynchronous I/O requests for
+sockets.
+These processes are named
+.Dq soaiod<N> .
+The following sysctl nodes are used with this pool:
+.Bl -tag -width indent
+.It Va kern.ipc.aio.num_procs
+The current number of processes in the pool.
+.It Va kern.ipc.aio.target_procs
+The minimum number of processes that should be present in the pool.
+.It Va kern.ipc.aio.max_procs
+The maximum number of processes permitted in the pool.
+.It Va kern.ipc.aio.lifetime
+The amount of time, in clock ticks, that a process is permitted to idle.
+If a process is idle for this amount of time and there are more processes
+in the pool than the target minimum,
+the process will exit.
+.El
+.Pp
+A second pool of AIO daemons is used to service all other asynchronous I/O
+requests except for I/O requests to raw disks.
+These processes are named
+.Dq aiod<N> .
+The following sysctl nodes are used with this pool:
+.Bl -tag -width indent
+.It Va vfs.aio.num_aio_procs
+The current number of processes in the pool.
+.It Va vfs.aio.target_aio_procs
+The minimum number of processes that should be present in the pool.
+.It Va vfs.aio.max_aio_procs
+The maximum number of processes permitted in the pool.
+.It Va vfs.aio.aiod_lifetime
+The amount of time, in clock ticks, that a process is permitted to idle.
+If a process is idle for this amount of time and there are more processes
+in the pool than the target minimum,
+the process will exit.
+.El
+.Pp
+Asynchronous I/O requests for raw disks are queued directly to the disk
+device layer after temporarily wiring the user pages associated with the
+request.
+These requests are not serviced by any of the AIO daemon pools.
+.Pp
+Several limits on the number of asynchronous I/O requests are imposed both
+system-wide and per-process.
+These limits are configured via the following sysctls:
+.Bl -tag -width indent
+.It Va vfs.aio.max_buf_aio
+The maximum number of queued asynchronous I/O requests for raw disks permitted
+for a single process.
+Asynchronous I/O requests that have completed but whose status has not been
+retrieved via
+.Xr aio_return 2
+or
+.Xr aio_waitcomplete 2
+are not counted against this limit.
+.It Va vfs.aio.num_buf_aio
+The number of queued asynchronous I/O requests for raw disks system-wide.
+.It Va vfs.aio.max_aio_queue_per_proc
+The maximum number of asynchronous I/O requests for a single process
+serviced concurrently by the default AIO daemon pool.
+.It Va vfs.aio.max_aio_per_proc
+The maximum number of outstanding asynchronous I/O requests permitted for a
+single process.
+This includes requests that have not been serviced,
+requests currently being serviced,
+and requests that have completed but whose status has not been retrieved via
+.Xr aio_return 2
+or
+.Xr aio_waitcomplete 2 .
+.It Va vfs.aio.num_queue_count
+The number of outstanding asynchronous I/O requests system-wide.
+.It Va vfs.aio.max_aio_queue
+The maximum number of outstanding asynchronous I/O requests permitted
+system-wide.
+.El
.Sh SEE ALSO
.Xr aio_cancel 2 ,
.Xr aio_error 2 ,
@@ -54,9 +146,7 @@
.Xr aio_waitcomplete 2 ,
.Xr aio_write 2 ,
.Xr lio_listio 2 ,
-.Xr config 8 ,
-.Xr kldload 8 ,
-.Xr kldunload 8
+.Xr sysctl 8
.Sh HISTORY
The
.Nm
@@ -66,3 +156,7 @@
.Nm
kernel module appeared in
.Fx 5.0 .
+The
+.Nm
+facility was integrated into all kernels in
+.Fx 11.0 .
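
[Example (not part of the diff): a minimal userland sketch of the socket AIO
path that the aio.4 text above documents as enabled by default and serviced
by the soaiod pool. The socketpair setup, buffer size, and use of
aio_waitcomplete(2) are illustrative assumptions, not code from this change.]

#include <sys/types.h>
#include <sys/socket.h>

#include <aio.h>
#include <err.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	struct aiocb iocb, *done;
	char buf[16];
	int sv[2];

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
		err(1, "socketpair");

	memset(&iocb, 0, sizeof(iocb));
	iocb.aio_fildes = sv[0];
	iocb.aio_buf = buf;
	iocb.aio_nbytes = sizeof(buf);

	/* Queue the read; a soaiod kernel process services it once data arrives. */
	if (aio_read(&iocb) != 0)
		err(1, "aio_read");

	(void)write(sv[1], "hello", 5);

	/* Block until the request completes and collect its status. */
	if (aio_waitcomplete(&done, NULL) < 0)
		err(1, "aio_waitcomplete");
	return (0);
}
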
Index: sys/conf/NOTES
===================================================================
--- sys/conf/NOTES
+++ sys/conf/NOTES
@@ -1131,11 +1131,6 @@
#
options REISERFS
-# Use real implementations of the aio_* system calls. There are numerous
-# stability and security issues in the current aio code that make it
-# unsuitable for inclusion on machines with untrusted local users.
-options VFS_AIO
-
# Cryptographically secure random number generator; /dev/random
device random
Index: sys/conf/files
===================================================================
--- sys/conf/files
+++ sys/conf/files
@@ -3330,7 +3330,7 @@
kern/uipc_syscalls.c standard
kern/uipc_usrreq.c standard
kern/vfs_acl.c standard
-kern/vfs_aio.c optional vfs_aio
+kern/vfs_aio.c standard
kern/vfs_bio.c standard
kern/vfs_cache.c standard
kern/vfs_cluster.c standard
Index: sys/conf/options
===================================================================
--- sys/conf/options
+++ sys/conf/options
@@ -214,7 +214,6 @@
SW_WATCHDOG opt_watchdog.h
TURNSTILE_PROFILING
UMTX_PROFILING
-VFS_AIO
VERBOSE_SYSINIT
WLCACHE opt_wavelan.h
WLDEBUG opt_wavelan.h
Index: sys/kern/sys_socket.c
===================================================================
--- sys/kern/sys_socket.c
+++ sys/kern/sys_socket.c
@@ -34,9 +34,12 @@
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/aio.h>
#include <sys/domain.h>
#include <sys/file.h>
#include <sys/filedesc.h>
+#include <sys/kernel.h>
+#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/proc.h>
#include <sys/protosw.h>
@@ -48,6 +51,9 @@
#include <sys/filio.h> /* XXX */
#include <sys/sockio.h>
#include <sys/stat.h>
+#include <sys/sysctl.h>
+#include <sys/sysproto.h>
+#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/ucred.h>
#include <sys/un.h>
@@ -64,6 +70,22 @@
#include <security/mac/mac_framework.h>
+#include <vm/vm.h>
+#include <vm/pmap.h>
+#include <vm/vm_extern.h>
+#include <vm/vm_map.h>
+
+static SYSCTL_NODE(_kern_ipc, OID_AUTO, aio, CTLFLAG_RD, NULL,
+ "socket AIO stats");
+
+static int empty_results;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_results, CTLFLAG_RD, &empty_results,
+ 0, "socket operation returned EAGAIN");
+
+static int empty_retries;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_retries, CTLFLAG_RD, &empty_retries,
+ 0, "socket operation retries");
+
static fo_rdwr_t soo_read;
static fo_rdwr_t soo_write;
static fo_ioctl_t soo_ioctl;
@@ -72,6 +94,9 @@
static fo_stat_t soo_stat;
static fo_close_t soo_close;
static fo_fill_kinfo_t soo_fill_kinfo;
+static fo_aio_queue_t soo_aio_queue;
+
+static void soo_aio_cancel(struct kaiocb *job);
struct fileops socketops = {
.fo_read = soo_read,
@@ -86,6 +111,7 @@
.fo_chown = invfo_chown,
.fo_sendfile = invfo_sendfile,
.fo_fill_kinfo = soo_fill_kinfo,
+ .fo_aio_queue = soo_aio_queue,
.fo_flags = DFLAG_PASSABLE
};
@@ -363,3 +389,374 @@
sizeof(kif->kf_path));
return (0);
}
+
+static STAILQ_HEAD(, task) soaio_jobs;
+static struct mtx soaio_jobs_lock;
+static struct task soaio_kproc_task;
+static int soaio_starting, soaio_idle, soaio_queued;
+static struct unrhdr *soaio_kproc_unr;
+
+static int soaio_max_procs = MAX_AIO_PROCS;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, max_procs, CTLFLAG_RW, &soaio_max_procs, 0,
+ "Maximum number of kernel processes to use for async socket IO");
+
+static int soaio_num_procs;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, num_procs, CTLFLAG_RD, &soaio_num_procs, 0,
+ "Number of active kernel processes for async socket IO");
+
+static int soaio_target_procs = TARGET_AIO_PROCS;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, target_procs, CTLFLAG_RD,
+ &soaio_target_procs, 0,
+ "Preferred number of ready kernel processes for async socket IO");
+
+static int soaio_lifetime;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, lifetime, CTLFLAG_RW, &soaio_lifetime, 0,
+ "Maximum lifetime for idle aiod");
+
+static void
+soaio_kproc_loop(void *arg)
+{
+ struct proc *p;
+ struct vmspace *myvm;
+ struct task *task;
+ int error, id, pending;
+
+ id = (intptr_t)arg;
+
+ /*
+ * Grab an extra reference on the daemon's vmspace so that it
+ * doesn't get freed by jobs that switch to a different
+ * vmspace.
+ */
+ p = curproc;
+ myvm = vmspace_acquire_ref(p);
+
+ mtx_lock(&soaio_jobs_lock);
+ MPASS(soaio_starting > 0);
+ soaio_starting--;
+ for (;;) {
+ while (!STAILQ_EMPTY(&soaio_jobs)) {
+ task = STAILQ_FIRST(&soaio_jobs);
+ STAILQ_REMOVE_HEAD(&soaio_jobs, ta_link);
+ soaio_queued--;
+ pending = task->ta_pending;
+ task->ta_pending = 0;
+ mtx_unlock(&soaio_jobs_lock);
+
+ task->ta_func(task->ta_context, pending);
+
+ mtx_lock(&soaio_jobs_lock);
+ }
+ MPASS(soaio_queued == 0);
+
+ if (p->p_vmspace != myvm) {
+ mtx_unlock(&soaio_jobs_lock);
+ vmspace_switch_aio(myvm);
+ mtx_lock(&soaio_jobs_lock);
+ continue;
+ }
+
+ soaio_idle++;
+ error = mtx_sleep(&soaio_idle, &soaio_jobs_lock, 0, "-",
+ soaio_lifetime);
+ soaio_idle--;
+ if (error == EWOULDBLOCK && STAILQ_EMPTY(&soaio_jobs) &&
+ soaio_num_procs > soaio_target_procs)
+ break;
+ }
+ soaio_num_procs--;
+ mtx_unlock(&soaio_jobs_lock);
+ free_unr(soaio_kproc_unr, id);
+ kproc_exit(0);
+}
+
+static void
+soaio_kproc_create(void *context, int pending)
+{
+ struct proc *p;
+ int error, id;
+
+ mtx_lock(&soaio_jobs_lock);
+ for (;;) {
+ if (soaio_num_procs < soaio_target_procs) {
+ /* Must create */
+ } else if (soaio_num_procs >= soaio_max_procs) {
+ /*
+ * Hit the limit on kernel processes, don't
+ * create another one.
+ */
+ break;
+ } else if (soaio_queued <= soaio_idle + soaio_starting) {
+ /*
+ * No more AIO jobs waiting for a process to be
+ * created, so stop.
+ */
+ break;
+ }
+ soaio_starting++;
+ mtx_unlock(&soaio_jobs_lock);
+
+ id = alloc_unr(soaio_kproc_unr);
+ error = kproc_create(soaio_kproc_loop, (void *)(intptr_t)id,
+ &p, 0, 0, "soaiod%d", id);
+ if (error != 0) {
+ free_unr(soaio_kproc_unr, id);
+ mtx_lock(&soaio_jobs_lock);
+ soaio_starting--;
+ break;
+ }
+
+ mtx_lock(&soaio_jobs_lock);
+ soaio_num_procs++;
+ }
+ mtx_unlock(&soaio_jobs_lock);
+}
+
+static void
+soaio_enqueue(struct task *task)
+{
+
+ mtx_lock(&soaio_jobs_lock);
+ MPASS(task->ta_pending == 0);
+ task->ta_pending++;
+ STAILQ_INSERT_TAIL(&soaio_jobs, task, ta_link);
+ soaio_queued++;
+ if (soaio_queued <= soaio_idle)
+ wakeup_one(&soaio_idle);
+ else if (soaio_num_procs < soaio_max_procs)
+ taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
+ mtx_unlock(&soaio_jobs_lock);
+}
+
+static void
+soaio_init(void)
+{
+
+ soaio_lifetime = AIOD_LIFETIME_DEFAULT;
+ STAILQ_INIT(&soaio_jobs);
+ mtx_init(&soaio_jobs_lock, "soaio jobs", NULL, MTX_DEF);
+ soaio_kproc_unr = new_unrhdr(1, INT_MAX, NULL);
+ TASK_INIT(&soaio_kproc_task, 0, soaio_kproc_create, NULL);
+ if (soaio_target_procs > 0)
+ taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
+}
+SYSINIT(soaio, SI_SUB_VFS, SI_ORDER_ANY, soaio_init, NULL);
+
+static __inline int
+soaio_ready(struct socket *so, struct sockbuf *sb)
+{
+ return (sb == &so->so_rcv ? soreadable(so) : sowriteable(so));
+}
+
+static void
+soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job)
+{
+ struct ucred *td_savedcred;
+ struct thread *td;
+ struct file *fp;
+ struct uio uio;
+ struct iovec iov;
+ size_t cnt;
+ int error, flags;
+
+ SOCKBUF_UNLOCK(sb);
+ aio_switch_vmspace(job);
+ td = curthread;
+ fp = job->fd_file;
+retry:
+ td_savedcred = td->td_ucred;
+ td->td_ucred = job->cred;
+
+ cnt = job->uaiocb.aio_nbytes;
+ iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf;
+ iov.iov_len = cnt;
+ uio.uio_iov = &iov;
+ uio.uio_iovcnt = 1;
+ uio.uio_offset = 0;
+ uio.uio_resid = cnt;
+ uio.uio_segflg = UIO_USERSPACE;
+ uio.uio_td = td;
+ flags = MSG_NBIO;
+
+ /* TODO: Charge ru_msg* to job. */
+
+ if (sb == &so->so_rcv) {
+ uio.uio_rw = UIO_READ;
+#ifdef MAC
+ error = mac_socket_check_receive(fp->f_cred, so);
+ if (error == 0)
+
+#endif
+ error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
+ } else {
+ uio.uio_rw = UIO_WRITE;
+#ifdef MAC
+ error = mac_socket_check_send(fp->f_cred, so);
+ if (error == 0)
+#endif
+ error = sosend(so, NULL, &uio, NULL, NULL, flags, td);
+ if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
+ PROC_LOCK(job->userproc);
+ kern_psignal(job->userproc, SIGPIPE);
+ PROC_UNLOCK(job->userproc);
+ }
+ }
+
+ cnt -= uio.uio_resid;
+ td->td_ucred = td_savedcred;
+
+ /* XXX: Not sure if this is needed? */
+ if (cnt != 0 && (error == ERESTART || error == EINTR ||
+ error == EWOULDBLOCK))
+ error = 0;
+ if (error == EWOULDBLOCK) {
+ /*
+ * A read() or write() on the socket raced with this
+ * request. If the socket is now ready, try again.
+ * If it is not, place this request at the head of the
+ * queue to try again when the socket is ready.
+ */
+ SOCKBUF_LOCK(sb);
+ empty_results++;
+ if (soaio_ready(so, sb)) {
+ empty_retries++;
+ SOCKBUF_UNLOCK(sb);
+ goto retry;
+ }
+
+ if (!aio_set_cancel_function(job, soo_aio_cancel)) {
+ MPASS(cnt == 0);
+ SOCKBUF_UNLOCK(sb);
+ aio_cancel(job);
+ SOCKBUF_LOCK(sb);
+ } else {
+ TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
+ }
+ } else {
+ aio_complete(job, cnt, error);
+ SOCKBUF_LOCK(sb);
+ }
+}
+
+static void
+soaio_process_sb(struct socket *so, struct sockbuf *sb)
+{
+ struct kaiocb *job;
+
+ SOCKBUF_LOCK(sb);
+ while (!TAILQ_EMPTY(&sb->sb_aiojobq) && soaio_ready(so, sb)) {
+ job = TAILQ_FIRST(&sb->sb_aiojobq);
+ TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
+ if (!aio_clear_cancel_function(job))
+ continue;
+
+ soaio_process_job(so, sb, job);
+ }
+
+ /*
+ * If there are still pending requests, the socket must not be
+ * ready so set SB_AIO to request a wakeup when the socket
+ * becomes ready.
+ */
+ if (!TAILQ_EMPTY(&sb->sb_aiojobq))
+ sb->sb_flags |= SB_AIO;
+ sb->sb_flags &= ~SB_AIO_RUNNING;
+ SOCKBUF_UNLOCK(sb);
+
+ ACCEPT_LOCK();
+ SOCK_LOCK(so);
+ sorele(so);
+}
+
+void
+soaio_rcv(void *context, int pending)
+{
+ struct socket *so;
+
+ so = context;
+ soaio_process_sb(so, &so->so_rcv);
+}
+
+void
+soaio_snd(void *context, int pending)
+{
+ struct socket *so;
+
+ so = context;
+ soaio_process_sb(so, &so->so_snd);
+}
+
+void
+sowakeup_aio(struct socket *so, struct sockbuf *sb)
+{
+
+ SOCKBUF_LOCK_ASSERT(sb);
+ sb->sb_flags &= ~SB_AIO;
+ if (sb->sb_flags & SB_AIO_RUNNING)
+ return;
+ sb->sb_flags |= SB_AIO_RUNNING;
+ if (sb == &so->so_snd)
+ SOCK_LOCK(so);
+ soref(so);
+ if (sb == &so->so_snd)
+ SOCK_UNLOCK(so);
+ soaio_enqueue(&sb->sb_aiotask);
+}
+
+static void
+soo_aio_cancel(struct kaiocb *job)
+{
+ struct socket *so;
+ struct sockbuf *sb;
+ int opcode;
+
+ so = job->fd_file->f_data;
+ opcode = job->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_READ)
+ sb = &so->so_rcv;
+ else {
+ MPASS(opcode == LIO_WRITE);
+ sb = &so->so_snd;
+ }
+
+ SOCKBUF_LOCK(sb);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
+ if (TAILQ_EMPTY(&sb->sb_aiojobq))
+ sb->sb_flags &= ~SB_AIO;
+ SOCKBUF_UNLOCK(sb);
+
+ aio_cancel(job);
+}
+
+static int
+soo_aio_queue(struct file *fp, struct kaiocb *job)
+{
+ struct socket *so;
+ struct sockbuf *sb;
+
+ so = fp->f_data;
+ switch (job->uaiocb.aio_lio_opcode) {
+ case LIO_READ:
+ sb = &so->so_rcv;
+ break;
+ case LIO_WRITE:
+ sb = &so->so_snd;
+ break;
+ default:
+ return (EINVAL);
+ }
+
+ SOCKBUF_LOCK(sb);
+ if (!aio_set_cancel_function(job, soo_aio_cancel))
+ panic("new job was cancelled");
+ TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list);
+ if (!(sb->sb_flags & SB_AIO_RUNNING)) {
+ if (soaio_ready(so, sb))
+ sowakeup_aio(so, sb);
+ else
+ sb->sb_flags |= SB_AIO;
+ }
+ SOCKBUF_UNLOCK(sb);
+ return (0);
+}
Index: sys/kern/uipc_debug.c
===================================================================
--- sys/kern/uipc_debug.c
+++ sys/kern/uipc_debug.c
@@ -416,6 +416,9 @@
db_printf("sb_flags: 0x%x (", sb->sb_flags);
db_print_sbflags(sb->sb_flags);
db_printf(")\n");
+
+ db_print_indent(indent);
+ db_printf("sb_aiojobq first: %p\n", TAILQ_FIRST(&sb->sb_aiojobq));
}
static void
@@ -470,7 +473,6 @@
db_print_indent(indent);
db_printf("so_sigio: %p ", so->so_sigio);
db_printf("so_oobmark: %lu ", so->so_oobmark);
- db_printf("so_aiojobq first: %p\n", TAILQ_FIRST(&so->so_aiojobq));
db_print_sockbuf(&so->so_rcv, "so_rcv", indent);
db_print_sockbuf(&so->so_snd, "so_snd", indent);
Index: sys/kern/uipc_sockbuf.c
===================================================================
--- sys/kern/uipc_sockbuf.c
+++ sys/kern/uipc_sockbuf.c
@@ -332,7 +332,7 @@
} else
ret = SU_OK;
if (sb->sb_flags & SB_AIO)
- aio_swake(so, sb);
+ sowakeup_aio(so, sb);
SOCKBUF_UNLOCK(sb);
if (ret == SU_ISCONNECTED)
soisconnected(so);
Index: sys/kern/uipc_socket.c
===================================================================
--- sys/kern/uipc_socket.c
+++ sys/kern/uipc_socket.c
@@ -134,6 +134,7 @@
#include <sys/stat.h>
#include <sys/sx.h>
#include <sys/sysctl.h>
+#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/jail.h>
#include <sys/syslog.h>
@@ -396,7 +397,10 @@
SOCKBUF_LOCK_INIT(&so->so_rcv, "so_rcv");
sx_init(&so->so_snd.sb_sx, "so_snd_sx");
sx_init(&so->so_rcv.sb_sx, "so_rcv_sx");
- TAILQ_INIT(&so->so_aiojobq);
+ TAILQ_INIT(&so->so_snd.sb_aiojobq);
+ TAILQ_INIT(&so->so_rcv.sb_aiojobq);
+ TASK_INIT(&so->so_snd.sb_aiotask, 0, soaio_snd, so);
+ TASK_INIT(&so->so_rcv.sb_aiotask, 0, soaio_rcv, so);
#ifdef VIMAGE
VNET_ASSERT(vnet != NULL, ("%s:%d vnet is NULL, so=%p",
__func__, __LINE__, so));
Index: sys/kern/vfs_aio.c
===================================================================
--- sys/kern/vfs_aio.c
+++ sys/kern/vfs_aio.c
@@ -72,8 +72,6 @@
#include <vm/uma.h>
#include <sys/aio.h>
-#include "opt_vfs_aio.h"
-
/*
* Counter for allocating reference ids to new jobs. Wrapped to 1 on
* overflow. (XXX will be removed soon.)
@@ -85,14 +83,6 @@
*/
static uint64_t jobseqno;
-#define JOBST_NULL 0
-#define JOBST_JOBQSOCK 1
-#define JOBST_JOBQGLOBAL 2
-#define JOBST_JOBRUNNING 3
-#define JOBST_JOBFINISHED 4
-#define JOBST_JOBQBUF 5
-#define JOBST_JOBQSYNC 6
-
#ifndef MAX_AIO_PER_PROC
#define MAX_AIO_PER_PROC 32
#endif
@@ -101,26 +91,14 @@
#define MAX_AIO_QUEUE_PER_PROC 256 /* Bigger than AIO_LISTIO_MAX */
#endif
-#ifndef MAX_AIO_PROCS
-#define MAX_AIO_PROCS 32
-#endif
-
#ifndef MAX_AIO_QUEUE
#define MAX_AIO_QUEUE 1024 /* Bigger than AIO_LISTIO_MAX */
#endif
-#ifndef TARGET_AIO_PROCS
-#define TARGET_AIO_PROCS 4
-#endif
-
#ifndef MAX_BUF_AIO
#define MAX_BUF_AIO 16
#endif
-#ifndef AIOD_LIFETIME_DEFAULT
-#define AIOD_LIFETIME_DEFAULT (30 * hz)
-#endif
-
FEATURE(aio, "Asynchronous I/O");
static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list");
@@ -128,6 +106,10 @@
static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0,
"Async IO management");
+static int enable_aio_unsafe = 0;
+SYSCTL_INT(_vfs_aio, OID_AUTO, enable_unsafe, CTLFLAG_RW, &enable_aio_unsafe, 0,
+ "Permit asynchronous IO on all file types, not just known-safe types");
+
static int max_aio_procs = MAX_AIO_PROCS;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs, CTLFLAG_RW, &max_aio_procs, 0,
"Maximum number of kernel processes to use for handling async IO ");
@@ -165,11 +147,6 @@
SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
"Maximum lifetime for idle aiod");
-static int unloadable = 0;
-SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
- "Allow unload of aio (not recommended)");
-
-
static int max_aio_per_proc = MAX_AIO_PER_PROC;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
0,
@@ -208,46 +185,27 @@
*/
/*
- * Current, there is only two backends: BIO and generic file I/O.
- * socket I/O is served by generic file I/O, this is not a good idea, since
- * disk file I/O and any other types without O_NONBLOCK flag can block daemon
- * processes, if there is no thread to serve socket I/O, the socket I/O will be
- * delayed too long or starved, we should create some processes dedicated to
- * sockets to do non-blocking I/O, same for pipe and fifo, for these I/O
- * systems we really need non-blocking interface, fiddling O_NONBLOCK in file
- * structure is not safe because there is race between userland and aio
- * daemons.
+ * If the routine that services an AIO request blocks while running in an
+ * AIO kernel process it can starve other I/O requests. BIO requests
+ * queued via aio_qphysio() complete in GEOM and do not use AIO kernel
+ * processes at all. Socket I/O requests use a separate pool of
+ * kprocs and also force non-blocking I/O. Other file I/O requests
+ * use the generic fo_read/fo_write operations which can block. The
+ * fsync and mlock operations can also block while executing. Ideally
+ * none of these requests would block while executing.
+ *
+ * Note that the service routines cannot toggle O_NONBLOCK in the file
+ * structure directly while handling a request due to races with
+ * userland threads.
*/
-struct kaiocb {
- TAILQ_ENTRY(kaiocb) list; /* (b) internal list of for backend */
- TAILQ_ENTRY(kaiocb) plist; /* (a) list of jobs for each backend */
- TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */
- int jobflags; /* (a) job flags */
- int jobstate; /* (b) job state */
- int inputcharge; /* (*) input blockes */
- int outputcharge; /* (*) output blockes */
- struct bio *bp; /* (*) BIO backend BIO pointer */
- struct buf *pbuf; /* (*) BIO backend buffer pointer */
- struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */
- int npages; /* BIO backend number of pages */
- struct proc *userproc; /* (*) user process */
- struct ucred *cred; /* (*) active credential when created */
- struct file *fd_file; /* (*) pointer to file structure */
- struct aioliojob *lio; /* (*) optional lio job */
- struct aiocb *ujob; /* (*) pointer in userspace of aiocb */
- struct knlist klist; /* (a) list of knotes */
- struct aiocb uaiocb; /* (*) kernel I/O control block */
- ksiginfo_t ksi; /* (a) realtime signal info */
- uint64_t seqno; /* (*) job number */
- int pending; /* (a) number of pending I/O, aio_fsync only */
-};
-
/* jobflags */
-#define KAIOCB_DONE 0x01
-#define KAIOCB_BUFDONE 0x02
-#define KAIOCB_RUNDOWN 0x04
+#define KAIOCB_QUEUEING 0x01
+#define KAIOCB_CANCELLED 0x02
+#define KAIOCB_CANCELLING 0x04
#define KAIOCB_CHECKSYNC 0x08
+#define KAIOCB_CLEARED 0x10
+#define KAIOCB_FINISHED 0x20
/*
* AIO process info
@@ -293,9 +251,10 @@
TAILQ_HEAD(,kaiocb) kaio_done; /* (a) done queue for process */
TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */
TAILQ_HEAD(,kaiocb) kaio_jobqueue; /* (a) job queue for process */
- TAILQ_HEAD(,kaiocb) kaio_bufqueue; /* (a) buffer job queue */
TAILQ_HEAD(,kaiocb) kaio_syncqueue; /* (a) queue for aio_fsync */
+ TAILQ_HEAD(,kaiocb) kaio_syncready; /* (a) second q for aio_fsync */
struct task kaio_task; /* (*) task to kick aio processes */
+ struct task kaio_sync_task; /* (*) task to schedule fsync jobs */
};
#define AIO_LOCK(ki) mtx_lock(&(ki)->kaio_mtx)
@@ -332,21 +291,18 @@
static void aio_process_rw(struct kaiocb *job);
static void aio_process_sync(struct kaiocb *job);
static void aio_process_mlock(struct kaiocb *job);
+static void aio_schedule_fsync(void *context, int pending);
static int aio_newproc(int *);
int aio_aqueue(struct thread *td, struct aiocb *ujob,
struct aioliojob *lio, int type, struct aiocb_ops *ops);
+static int aio_queue_file(struct file *fp, struct kaiocb *job);
static void aio_physwakeup(struct bio *bp);
static void aio_proc_rundown(void *arg, struct proc *p);
static void aio_proc_rundown_exec(void *arg, struct proc *p,
struct image_params *imgp);
static int aio_qphysio(struct proc *p, struct kaiocb *job);
static void aio_daemon(void *param);
-static void aio_swake_cb(struct socket *, struct sockbuf *);
-static int aio_unload(void);
-static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job,
- int type);
-#define DONE_BUF 1
-#define DONE_QUEUE 2
+static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job);
static int aio_kick(struct proc *userp);
static void aio_kick_nowait(struct proc *userp);
static void aio_kick_helper(void *context, int pending);
@@ -397,13 +353,10 @@
case MOD_LOAD:
aio_onceonly();
break;
- case MOD_UNLOAD:
- error = aio_unload();
- break;
case MOD_SHUTDOWN:
break;
default:
- error = EINVAL;
+ error = EOPNOTSUPP;
break;
}
return (error);
@@ -471,8 +424,6 @@
{
int error;
- /* XXX: should probably just use so->callback */
- aio_swake = &aio_swake_cb;
exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
EVENTHANDLER_PRI_ANY);
exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown_exec,
@@ -513,55 +464,6 @@
}
/*
- * Callback for unload of AIO when used as a module.
- */
-static int
-aio_unload(void)
-{
- int error;
-
- /*
- * XXX: no unloads by default, it's too dangerous.
- * perhaps we could do it if locked out callers and then
- * did an aio_proc_rundown() on each process.
- *
- * jhb: aio_proc_rundown() needs to run on curproc though,
- * so I don't think that would fly.
- */
- if (!unloadable)
- return (EOPNOTSUPP);
-
-#ifdef COMPAT_FREEBSD32
- syscall32_helper_unregister(aio32_syscalls);
-#endif
- syscall_helper_unregister(aio_syscalls);
-
- error = kqueue_del_filteropts(EVFILT_AIO);
- if (error)
- return error;
- error = kqueue_del_filteropts(EVFILT_LIO);
- if (error)
- return error;
- async_io_version = 0;
- aio_swake = NULL;
- taskqueue_free(taskqueue_aiod_kick);
- delete_unrhdr(aiod_unr);
- uma_zdestroy(kaio_zone);
- uma_zdestroy(aiop_zone);
- uma_zdestroy(aiocb_zone);
- uma_zdestroy(aiol_zone);
- uma_zdestroy(aiolio_zone);
- EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
- EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
- mtx_destroy(&aio_job_mtx);
- sema_destroy(&aio_newproc_sem);
- p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
- p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
- p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
- return (0);
-}
-
-/*
* Init the per-process aioinfo structure. The aioinfo limits are set
* per-process for user limit (resource) management.
*/
@@ -582,10 +484,11 @@
TAILQ_INIT(&ki->kaio_all);
TAILQ_INIT(&ki->kaio_done);
TAILQ_INIT(&ki->kaio_jobqueue);
- TAILQ_INIT(&ki->kaio_bufqueue);
TAILQ_INIT(&ki->kaio_liojoblist);
TAILQ_INIT(&ki->kaio_syncqueue);
+ TAILQ_INIT(&ki->kaio_syncready);
TASK_INIT(&ki->kaio_task, 0, aio_kick_helper, p);
+ TASK_INIT(&ki->kaio_sync_task, 0, aio_schedule_fsync, ki);
PROC_LOCK(p);
if (p->p_aioinfo == NULL) {
p->p_aioinfo = ki;
@@ -637,7 +540,7 @@
MPASS(ki != NULL);
AIO_LOCK_ASSERT(ki, MA_OWNED);
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
atomic_subtract_int(&num_queue_count, 1);
@@ -670,7 +573,6 @@
PROC_UNLOCK(p);
MPASS(job->bp == NULL);
- job->jobstate = JOBST_NULL;
AIO_UNLOCK(ki);
/*
@@ -709,6 +611,57 @@
aio_proc_rundown(arg, p);
}
+static int
+aio_cancel_job(struct proc *p, struct kaioinfo *ki, struct kaiocb *job)
+{
+ aio_cancel_fn_t *func;
+ int cancelled;
+
+ AIO_LOCK_ASSERT(ki, MA_OWNED);
+ if (job->jobflags & (KAIOCB_CANCELLED | KAIOCB_FINISHED))
+ return (0);
+ MPASS((job->jobflags & KAIOCB_CANCELLING) == 0);
+ job->jobflags |= KAIOCB_CANCELLED;
+
+ func = job->cancel_fn;
+
+ /*
+ * If there is no cancel routine, just leave the job marked as
+ * cancelled. The job should be in active use by a caller who
+ * should complete it normally or when it fails to install a
+ * cancel routine.
+ */
+ if (func == NULL)
+ return (0);
+
+ /*
+ * Set the CANCELLING flag so that aio_complete() will defer
+ * completions of this job. This prevents the job from being
+ * freed out from under the cancel callback. After the
+ * callback any deferred completion (whether from the callback
+ * or any other source) will be completed.
+ */
+ job->jobflags |= KAIOCB_CANCELLING;
+ AIO_UNLOCK(ki);
+ func(job);
+ AIO_LOCK(ki);
+ job->jobflags &= ~KAIOCB_CANCELLING;
+ if (job->jobflags & KAIOCB_FINISHED) {
+ cancelled = job->uaiocb._aiocb_private.error == ECANCELED;
+ TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
+ aio_bio_done_notify(p, job);
+ } else {
+ /*
+ * The cancel callback might have scheduled an
+ * operation to cancel this request, but it is
+ * only counted as cancelled if the request is
+ * cancelled when the callback returns.
+ */
+ cancelled = 0;
+ }
+ return (cancelled);
+}
+
/*
* Rundown the jobs for a given process.
*/
@@ -718,9 +671,6 @@
struct kaioinfo *ki;
struct aioliojob *lj;
struct kaiocb *job, *jobn;
- struct file *fp;
- struct socket *so;
- int remove;
KASSERT(curthread->td_proc == p,
("%s: called on non-curproc", __func__));
@@ -738,35 +688,11 @@
* aio_cancel on all pending I/O requests.
*/
TAILQ_FOREACH_SAFE(job, &ki->kaio_jobqueue, plist, jobn) {
- remove = 0;
- mtx_lock(&aio_job_mtx);
- if (job->jobstate == JOBST_JOBQGLOBAL) {
- TAILQ_REMOVE(&aio_jobs, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSOCK) {
- fp = job->fd_file;
- MPASS(fp->f_type == DTYPE_SOCKET);
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSYNC) {
- TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
- remove = 1;
- }
- mtx_unlock(&aio_job_mtx);
-
- if (remove) {
- job->jobstate = JOBST_JOBFINISHED;
- job->uaiocb._aiocb_private.status = -1;
- job->uaiocb._aiocb_private.error = ECANCELED;
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- aio_bio_done_notify(p, job, DONE_QUEUE);
- }
+ aio_cancel_job(p, ki, job);
}
/* Wait for all running I/O to be finished */
- if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
- TAILQ_FIRST(&ki->kaio_jobqueue)) {
+ if (TAILQ_FIRST(&ki->kaio_jobqueue) || ki->kaio_active_count != 0) {
ki->kaio_flags |= KAIO_WAKEUP;
msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aioprn", hz);
goto restart;
@@ -791,6 +717,7 @@
}
AIO_UNLOCK(ki);
taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_task);
+ taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_sync_task);
mtx_destroy(&ki->kaio_mtx);
uma_zfree(kaio_zone, ki);
p->p_aioinfo = NULL;
@@ -807,15 +734,18 @@
struct proc *userp;
mtx_assert(&aio_job_mtx, MA_OWNED);
+restart:
TAILQ_FOREACH(job, &aio_jobs, list) {
userp = job->userproc;
ki = userp->p_aioinfo;
if (ki->kaio_active_count < ki->kaio_maxactive_count) {
TAILQ_REMOVE(&aio_jobs, job, list);
+ if (!aio_clear_cancel_function(job))
+ goto restart;
+
/* Account for currently active jobs. */
ki->kaio_active_count++;
- job->jobstate = JOBST_JOBRUNNING;
break;
}
}
@@ -863,7 +793,6 @@
struct thread *td;
struct aiocb *cb;
struct file *fp;
- struct socket *so;
struct uio auio;
struct iovec aiov;
int cnt;
@@ -875,6 +804,7 @@
job->uaiocb.aio_lio_opcode == LIO_WRITE,
("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode));
+ aio_switch_vmspace(job);
td = curthread;
td_savedcred = td->td_ucred;
td->td_ucred = job->cred;
@@ -920,24 +850,15 @@
if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
error = 0;
if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
- int sigpipe = 1;
- if (fp->f_type == DTYPE_SOCKET) {
- so = fp->f_data;
- if (so->so_options & SO_NOSIGPIPE)
- sigpipe = 0;
- }
- if (sigpipe) {
- PROC_LOCK(job->userproc);
- kern_psignal(job->userproc, SIGPIPE);
- PROC_UNLOCK(job->userproc);
- }
+ PROC_LOCK(job->userproc);
+ kern_psignal(job->userproc, SIGPIPE);
+ PROC_UNLOCK(job->userproc);
}
}
cnt -= auio.uio_resid;
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = cnt;
td->td_ucred = td_savedcred;
+ aio_complete(job, cnt, error);
}
static void
@@ -945,7 +866,6 @@
{
struct thread *td = curthread;
struct ucred *td_savedcred = td->td_ucred;
- struct aiocb *cb = &job->uaiocb;
struct file *fp = job->fd_file;
int error = 0;
@@ -955,9 +875,8 @@
td->td_ucred = job->cred;
if (fp->f_vnode != NULL)
error = aio_fsync_vnode(td, fp->f_vnode);
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = 0;
td->td_ucred = td_savedcred;
+ aio_complete(job, 0, error);
}
static void
@@ -969,19 +888,20 @@
KASSERT(job->uaiocb.aio_lio_opcode == LIO_MLOCK,
("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode));
+ aio_switch_vmspace(job);
error = vm_mlock(job->userproc, job->cred,
__DEVOLATILE(void *, cb->aio_buf), cb->aio_nbytes);
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = 0;
+ aio_complete(job, 0, error);
}
static void
-aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type)
+aio_bio_done_notify(struct proc *userp, struct kaiocb *job)
{
struct aioliojob *lj;
struct kaioinfo *ki;
struct kaiocb *sjob, *sjobn;
int lj_done;
+ bool schedule_fsync;
ki = userp->p_aioinfo;
AIO_LOCK_ASSERT(ki, MA_OWNED);
@@ -992,13 +912,8 @@
if (lj->lioj_count == lj->lioj_finished_count)
lj_done = 1;
}
- if (type == DONE_QUEUE) {
- job->jobflags |= KAIOCB_DONE;
- } else {
- job->jobflags |= KAIOCB_BUFDONE;
- }
TAILQ_INSERT_TAIL(&ki->kaio_done, job, plist);
- job->jobstate = JOBST_JOBFINISHED;
+ MPASS(job->jobflags & KAIOCB_FINISHED);
if (ki->kaio_flags & KAIO_RUNDOWN)
goto notification_done;
@@ -1025,20 +940,24 @@
notification_done:
if (job->jobflags & KAIOCB_CHECKSYNC) {
+ schedule_fsync = false;
TAILQ_FOREACH_SAFE(sjob, &ki->kaio_syncqueue, list, sjobn) {
if (job->fd_file == sjob->fd_file &&
job->seqno < sjob->seqno) {
if (--sjob->pending == 0) {
- mtx_lock(&aio_job_mtx);
- sjob->jobstate = JOBST_JOBQGLOBAL;
TAILQ_REMOVE(&ki->kaio_syncqueue, sjob,
list);
- TAILQ_INSERT_TAIL(&aio_jobs, sjob, list);
- aio_kick_nowait(userp);
- mtx_unlock(&aio_job_mtx);
+ if (!aio_clear_cancel_function(sjob))
+ continue;
+ TAILQ_INSERT_TAIL(&ki->kaio_syncready,
+ sjob, list);
+ schedule_fsync = true;
}
}
}
+ if (schedule_fsync)
+ taskqueue_enqueue(taskqueue_aiod_kick,
+ &ki->kaio_sync_task);
}
if (ki->kaio_flags & KAIO_WAKEUP) {
ki->kaio_flags &= ~KAIO_WAKEUP;
@@ -1047,6 +966,103 @@
}
static void
+aio_schedule_fsync(void *context, int pending)
+{
+ struct kaioinfo *ki;
+ struct kaiocb *job;
+
+ ki = context;
+ AIO_LOCK(ki);
+ while (!TAILQ_EMPTY(&ki->kaio_syncready)) {
+ job = TAILQ_FIRST(&ki->kaio_syncready);
+ TAILQ_REMOVE(&ki->kaio_syncready, job, list);
+ AIO_UNLOCK(ki);
+ aio_schedule(job, aio_process_sync);
+ AIO_LOCK(ki);
+ }
+ AIO_UNLOCK(ki);
+}
+
+bool
+aio_cancel_cleared(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ /*
+ * The caller should hold the same queue lock held when
+ * aio_clear_cancel_function() was called and set this flag
+ * ensuring this check sees an up-to-date value. However,
+ * there is no way to assert that.
+ */
+ ki = job->userproc->p_aioinfo;
+ return ((job->jobflags & KAIOCB_CLEARED) != 0);
+}
+
+bool
+aio_clear_cancel_function(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ AIO_LOCK(ki);
+ MPASS(job->cancel_fn != NULL);
+ if (job->jobflags & KAIOCB_CANCELLING) {
+ job->jobflags |= KAIOCB_CLEARED;
+ AIO_UNLOCK(ki);
+ return (false);
+ }
+ job->cancel_fn = NULL;
+ AIO_UNLOCK(ki);
+ return (true);
+}
+
+bool
+aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ AIO_LOCK(ki);
+ if (job->jobflags & KAIOCB_CANCELLED) {
+ AIO_UNLOCK(ki);
+ return (false);
+ }
+ job->cancel_fn = func;
+ AIO_UNLOCK(ki);
+ return (true);
+}
+
+void
+aio_complete(struct kaiocb *job, long status, int error)
+{
+ struct kaioinfo *ki;
+ struct proc *userp;
+
+ job->uaiocb._aiocb_private.error = error;
+ job->uaiocb._aiocb_private.status = status;
+
+ userp = job->userproc;
+ ki = userp->p_aioinfo;
+
+ AIO_LOCK(ki);
+ KASSERT(!(job->jobflags & KAIOCB_FINISHED),
+ ("duplicate aio_complete"));
+ job->jobflags |= KAIOCB_FINISHED;
+ if ((job->jobflags & (KAIOCB_QUEUEING | KAIOCB_CANCELLING)) == 0) {
+ TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
+ aio_bio_done_notify(userp, job);
+ }
+ AIO_UNLOCK(ki);
+}
+
+void
+aio_cancel(struct kaiocb *job)
+{
+
+ aio_complete(job, -1, ECANCELED);
+}
+
+void
aio_switch_vmspace(struct kaiocb *job)
{
@@ -1063,7 +1079,7 @@
struct kaiocb *job;
struct aioproc *aiop;
struct kaioinfo *ki;
- struct proc *p, *userp;
+ struct proc *p;
struct vmspace *myvm;
struct thread *td = curthread;
int id = (intptr_t)_id;
@@ -1107,40 +1123,13 @@
*/
while ((job = aio_selectjob(aiop)) != NULL) {
mtx_unlock(&aio_job_mtx);
- userp = job->userproc;
-
- /*
- * Connect to process address space for user program.
- */
- aio_switch_vmspace(job);
- ki = userp->p_aioinfo;
-
- /* Do the I/O function. */
- switch(job->uaiocb.aio_lio_opcode) {
- case LIO_READ:
- case LIO_WRITE:
- aio_process_rw(job);
- break;
- case LIO_SYNC:
- aio_process_sync(job);
- break;
- case LIO_MLOCK:
- aio_process_mlock(job);
- break;
- }
+ ki = job->userproc->p_aioinfo;
+ job->handle_fn(job);
mtx_lock(&aio_job_mtx);
/* Decrement the active job count. */
ki->kaio_active_count--;
- mtx_unlock(&aio_job_mtx);
-
- AIO_LOCK(ki);
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- aio_bio_done_notify(userp, job, DONE_QUEUE);
- AIO_UNLOCK(ki);
-
- mtx_lock(&aio_job_mtx);
}
/*
@@ -1236,7 +1225,6 @@
struct cdevsw *csw;
struct cdev *dev;
struct kaioinfo *ki;
- struct aioliojob *lj;
int error, ref, unmap, poff;
vm_prot_t prot;
@@ -1293,16 +1281,8 @@
}
AIO_LOCK(ki);
- ki->kaio_count++;
if (!unmap)
ki->kaio_buffer_count++;
- lj = job->lio;
- if (lj)
- lj->lioj_count++;
- TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, job, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
- job->jobstate = JOBST_JOBQBUF;
- cb->_aiocb_private.status = cb->aio_nbytes;
AIO_UNLOCK(ki);
bp->bio_length = cb->aio_nbytes;
@@ -1336,7 +1316,6 @@
bp->bio_flags |= BIO_UNMAPPED;
}
- atomic_add_int(&num_queue_count, 1);
if (!unmap)
atomic_add_int(&num_buf_aio, 1);
@@ -1347,14 +1326,8 @@
doerror:
AIO_LOCK(ki);
- job->jobstate = JOBST_NULL;
- TAILQ_REMOVE(&ki->kaio_bufqueue, job, plist);
- TAILQ_REMOVE(&ki->kaio_all, job, allist);
- ki->kaio_count--;
if (!unmap)
ki->kaio_buffer_count--;
- if (lj)
- lj->lioj_count--;
AIO_UNLOCK(ki);
if (pbuf) {
relpbuf(pbuf, NULL);
@@ -1367,40 +1340,6 @@
return (error);
}
-/*
- * Wake up aio requests that may be serviceable now.
- */
-static void
-aio_swake_cb(struct socket *so, struct sockbuf *sb)
-{
- struct kaiocb *job, *jobn;
- int opcode;
-
- SOCKBUF_LOCK_ASSERT(sb);
- if (sb == &so->so_snd)
- opcode = LIO_WRITE;
- else
- opcode = LIO_READ;
-
- sb->sb_flags &= ~SB_AIO;
- mtx_lock(&aio_job_mtx);
- TAILQ_FOREACH_SAFE(job, &so->so_aiojobq, list, jobn) {
- if (opcode == job->uaiocb.aio_lio_opcode) {
- if (job->jobstate != JOBST_JOBQSOCK)
- panic("invalid queue value");
- /* XXX
- * We don't have actual sockets backend yet,
- * so we simply move the requests to the generic
- * file I/O backend.
- */
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- TAILQ_INSERT_TAIL(&aio_jobs, job, list);
- aio_kick_nowait(job->userproc);
- }
- }
- mtx_unlock(&aio_job_mtx);
-}
-
static int
convert_old_sigevent(struct osigevent *osig, struct sigevent *nsig)
{
@@ -1521,11 +1460,9 @@
struct proc *p = td->td_proc;
cap_rights_t rights;
struct file *fp;
- struct socket *so;
- struct kaiocb *job, *job2;
+ struct kaiocb *job;
struct kaioinfo *ki;
struct kevent kev;
- struct sockbuf *sb;
int opcode;
int error;
int fd, kqfd;
@@ -1668,86 +1605,128 @@
kev.data = (intptr_t)job;
kev.udata = job->uaiocb.aio_sigevent.sigev_value.sival_ptr;
error = kqfd_register(kqfd, &kev, td, 1);
-aqueue_fail:
- if (error) {
- if (fp)
- fdrop(fp, td);
- uma_zfree(aiocb_zone, job);
- ops->store_error(ujob, error);
- goto done;
- }
+ if (error)
+ goto aqueue_fail;
+
no_kqueue:
ops->store_error(ujob, EINPROGRESS);
job->uaiocb._aiocb_private.error = EINPROGRESS;
job->userproc = p;
job->cred = crhold(td->td_ucred);
- job->jobflags = 0;
+ job->jobflags = KAIOCB_QUEUEING;
job->lio = lj;
- if (opcode == LIO_SYNC)
- goto queueit;
+ if (opcode == LIO_MLOCK) {
+ aio_schedule(job, aio_process_mlock);
+ error = 0;
+ } else if (fp->f_ops->fo_aio_queue == NULL)
+ error = aio_queue_file(fp, job);
+ else
+ error = fo_aio_queue(fp, job);
+ if (error)
+ goto aqueue_fail;
- if (fp && fp->f_type == DTYPE_SOCKET) {
+ AIO_LOCK(ki);
+ job->jobflags &= ~KAIOCB_QUEUEING;
+ TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
+ ki->kaio_count++;
+ if (lj)
+ lj->lioj_count++;
+ atomic_add_int(&num_queue_count, 1);
+ if (job->jobflags & KAIOCB_FINISHED) {
/*
- * Alternate queueing for socket ops: Reach down into the
- * descriptor to get the socket data. Then check to see if the
- * socket is ready to be read or written (based on the requested
- * operation).
- *
- * If it is not ready for io, then queue the job on the
- * socket, and set the flags so we get a call when sbnotify()
- * happens.
- *
- * Note if opcode is neither LIO_WRITE nor LIO_READ we lock
- * and unlock the snd sockbuf for no reason.
+ * The queue callback completed the request synchronously.
+ * The bulk of the completion is deferred in that case
+ * until this point.
*/
- so = fp->f_data;
- sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
- SOCKBUF_LOCK(sb);
- if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
- LIO_WRITE) && (!sowriteable(so)))) {
- sb->sb_flags |= SB_AIO;
+ aio_bio_done_notify(p, job);
+ } else
+ TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
+ AIO_UNLOCK(ki);
+ return (0);
- mtx_lock(&aio_job_mtx);
- TAILQ_INSERT_TAIL(&so->so_aiojobq, job, list);
- mtx_unlock(&aio_job_mtx);
+aqueue_fail:
+ knlist_delete(&job->klist, curthread, 0);
+ if (fp)
+ fdrop(fp, td);
+ uma_zfree(aiocb_zone, job);
+ ops->store_error(ujob, error);
+ return (error);
+}
- AIO_LOCK(ki);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
- job->jobstate = JOBST_JOBQSOCK;
- ki->kaio_count++;
- if (lj)
- lj->lioj_count++;
- AIO_UNLOCK(ki);
- SOCKBUF_UNLOCK(sb);
- atomic_add_int(&num_queue_count, 1);
- error = 0;
- goto done;
- }
- SOCKBUF_UNLOCK(sb);
+static void
+aio_cancel_daemon_job(struct kaiocb *job)
+{
+
+ mtx_lock(&aio_job_mtx);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&aio_jobs, job, list);
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+}
+
+void
+aio_schedule(struct kaiocb *job, aio_handle_fn_t *func)
+{
+
+ mtx_lock(&aio_job_mtx);
+ if (!aio_set_cancel_function(job, aio_cancel_daemon_job)) {
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+ return;
}
+ job->handle_fn = func;
+ TAILQ_INSERT_TAIL(&aio_jobs, job, list);
+ aio_kick_nowait(job->userproc);
+ mtx_unlock(&aio_job_mtx);
+}
+
+static void
+aio_cancel_sync(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ mtx_lock(&aio_job_mtx);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+}
+
+int
+aio_queue_file(struct file *fp, struct kaiocb *job)
+{
+ struct aioliojob *lj;
+ struct kaioinfo *ki;
+ struct kaiocb *job2;
+ int error, opcode;
+
+ lj = job->lio;
+ ki = job->userproc->p_aioinfo;
+ opcode = job->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_SYNC)
+ goto queueit;
- if ((error = aio_qphysio(p, job)) == 0)
+ if ((error = aio_qphysio(job->userproc, job)) == 0)
goto done;
#if 0
- if (error > 0) {
- job->uaiocb._aiocb_private.error = error;
- ops->store_error(ujob, error);
+ /*
+ * XXX: This means qphysio() failed with EFAULT. The current
+ * behavior is to retry the operation via fo_read/fo_write.
+ * Wouldn't it be better to just complete the request with an
+ * error here?
+ */
+ if (error > 0)
goto done;
- }
#endif
queueit:
- atomic_add_int(&num_queue_count, 1);
+ if (!enable_aio_unsafe)
+ return (EOPNOTSUPP);
- AIO_LOCK(ki);
- ki->kaio_count++;
- if (lj)
- lj->lioj_count++;
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
if (opcode == LIO_SYNC) {
+ AIO_LOCK(ki);
TAILQ_FOREACH(job2, &ki->kaio_jobqueue, plist) {
if (job2->fd_file == job->fd_file &&
job2->uaiocb.aio_lio_opcode != LIO_SYNC &&
@@ -1756,28 +1735,32 @@
job->pending++;
}
}
- TAILQ_FOREACH(job2, &ki->kaio_bufqueue, plist) {
- if (job2->fd_file == job->fd_file &&
- job2->uaiocb.aio_lio_opcode != LIO_SYNC &&
- job2->seqno < job->seqno) {
- job2->jobflags |= KAIOCB_CHECKSYNC;
- job->pending++;
- }
- }
if (job->pending != 0) {
+ if (!aio_set_cancel_function(job, aio_cancel_sync)) {
+ AIO_UNLOCK(ki);
+ aio_cancel(job);
+ return (0);
+ }
TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, job, list);
- job->jobstate = JOBST_JOBQSYNC;
AIO_UNLOCK(ki);
- goto done;
+ return (0);
}
+ AIO_UNLOCK(ki);
+ }
+
+ switch (opcode) {
+ case LIO_READ:
+ case LIO_WRITE:
+ aio_schedule(job, aio_process_rw);
+ error = 0;
+ break;
+ case LIO_SYNC:
+ aio_schedule(job, aio_process_sync);
+ error = 0;
+ break;
+ default:
+ error = EINVAL;
}
- mtx_lock(&aio_job_mtx);
- TAILQ_INSERT_TAIL(&aio_jobs, job, list);
- job->jobstate = JOBST_JOBQGLOBAL;
- aio_kick_nowait(p);
- mtx_unlock(&aio_job_mtx);
- AIO_UNLOCK(ki);
- error = 0;
done:
return (error);
}
@@ -1864,7 +1847,7 @@
break;
}
if (job != NULL) {
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
td->td_retval[0] = status;
@@ -1933,7 +1916,7 @@
if (job->ujob == ujoblist[i]) {
if (firstjob == NULL)
firstjob = job;
- if (job->jobstate == JOBST_JOBFINISHED)
+ if (job->jobflags & KAIOCB_FINISHED)
goto RETURN;
}
}
@@ -1992,10 +1975,8 @@
struct kaioinfo *ki;
struct kaiocb *job, *jobn;
struct file *fp;
- struct socket *so;
cap_rights_t rights;
int error;
- int remove;
int cancelled = 0;
int notcancelled = 0;
struct vnode *vp;
@@ -2023,28 +2004,7 @@
if ((uap->fd == job->uaiocb.aio_fildes) &&
((uap->aiocbp == NULL) ||
(uap->aiocbp == job->ujob))) {
- remove = 0;
-
- mtx_lock(&aio_job_mtx);
- if (job->jobstate == JOBST_JOBQGLOBAL) {
- TAILQ_REMOVE(&aio_jobs, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSOCK) {
- MPASS(fp->f_type == DTYPE_SOCKET);
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSYNC) {
- TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
- remove = 1;
- }
- mtx_unlock(&aio_job_mtx);
-
- if (remove) {
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- job->uaiocb._aiocb_private.status = -1;
- job->uaiocb._aiocb_private.error = ECANCELED;
- aio_bio_done_notify(p, job, DONE_QUEUE);
+ if (aio_cancel_job(p, ki, job)) {
cancelled++;
} else {
notcancelled++;
@@ -2102,7 +2062,7 @@
AIO_LOCK(ki);
TAILQ_FOREACH(job, &ki->kaio_all, allist) {
if (job->ujob == ujob) {
- if (job->jobstate == JOBST_JOBFINISHED)
+ if (job->jobflags & KAIOCB_FINISHED)
td->td_retval[0] =
job->uaiocb._aiocb_private.error;
else
@@ -2382,35 +2342,36 @@
struct kaiocb *job = (struct kaiocb *)bp->bio_caller1;
struct proc *userp;
struct kaioinfo *ki;
- int nblks;
+ size_t nbytes;
+ int error, nblks;
/* Release mapping into kernel space. */
+ userp = job->userproc;
+ ki = userp->p_aioinfo;
if (job->pbuf) {
pmap_qremove((vm_offset_t)job->pbuf->b_data, job->npages);
relpbuf(job->pbuf, NULL);
job->pbuf = NULL;
atomic_subtract_int(&num_buf_aio, 1);
+ AIO_LOCK(ki);
+ ki->kaio_buffer_count--;
+ AIO_UNLOCK(ki);
}
vm_page_unhold_pages(job->pages, job->npages);
bp = job->bp;
job->bp = NULL;
- userp = job->userproc;
- ki = userp->p_aioinfo;
- AIO_LOCK(ki);
- job->uaiocb._aiocb_private.status -= bp->bio_resid;
- job->uaiocb._aiocb_private.error = 0;
+ nbytes = job->uaiocb.aio_nbytes - bp->bio_resid;
+ error = 0;
if (bp->bio_flags & BIO_ERROR)
- job->uaiocb._aiocb_private.error = bp->bio_error;
- nblks = btodb(job->uaiocb.aio_nbytes);
+ error = bp->bio_error;
+ nblks = btodb(nbytes);
if (job->uaiocb.aio_lio_opcode == LIO_WRITE)
job->outputcharge += nblks;
else
job->inputcharge += nblks;
- TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, job, plist);
- ki->kaio_buffer_count--;
- aio_bio_done_notify(userp, job, DONE_BUF);
- AIO_UNLOCK(ki);
+
+ aio_complete(job, nbytes, error);
g_destroy_bio(bp);
}
@@ -2465,7 +2426,7 @@
}
if (job != NULL) {
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
ujob = job->ujob;
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
@@ -2570,7 +2531,7 @@
struct kaiocb *job = kn->kn_ptr.p_aio;
kn->kn_data = job->uaiocb._aiocb_private.error;
- if (job->jobstate != JOBST_JOBFINISHED)
+ if (!(job->jobflags & KAIOCB_FINISHED))
return (0);
kn->kn_flags |= EV_EOF;
return (1);
Index: sys/modules/Makefile
===================================================================
--- sys/modules/Makefile
+++ sys/modules/Makefile
@@ -31,7 +31,6 @@
ahci \
${_aic} \
aic7xxx \
- aio \
alc \
ale \
alq \
Index: sys/modules/aio/Makefile
===================================================================
--- sys/modules/aio/Makefile
+++ /dev/null
@@ -1,10 +0,0 @@
-# $FreeBSD$
-
-.PATH: ${.CURDIR}/../../kern
-
-KMOD= aio
-SRCS= vfs_aio.c opt_vfs_aio.h vnode_if.h opt_compat.h
-
-EXPORT_SYMS= aio_init_aioinfo aio_aqueue
-
-.include <bsd.kmod.mk>
Index: sys/sys/aio.h
===================================================================
--- sys/sys/aio.h
+++ sys/sys/aio.h
@@ -21,6 +21,11 @@
#include <sys/types.h>
#include <sys/signal.h>
+#ifdef _KERNEL
+#include <sys/queue.h>
+#include <sys/event.h>
+#include <sys/signalvar.h>
+#endif
/*
* Returned by aio_cancel:
@@ -51,6 +56,24 @@
*/
#define AIO_LISTIO_MAX 16
+#ifdef _KERNEL
+
+/* Default values of tunables for the AIO worker pool. */
+
+#ifndef MAX_AIO_PROCS
+#define MAX_AIO_PROCS 32
+#endif
+
+#ifndef TARGET_AIO_PROCS
+#define TARGET_AIO_PROCS 4
+#endif
+
+#ifndef AIOD_LIFETIME_DEFAULT
+#define AIOD_LIFETIME_DEFAULT (30 * hz)
+#endif
+
+#endif
+
/*
* Private members for aiocb -- don't access
* directly.
@@ -77,7 +100,91 @@
struct sigevent aio_sigevent; /* Signal to deliver */
} aiocb_t;
-#ifndef _KERNEL
+#ifdef _KERNEL
+
+typedef void aio_cancel_fn_t(struct kaiocb *);
+typedef void aio_handle_fn_t(struct kaiocb *);
+
+/*
+ * Kernel version of an I/O control block.
+ *
+ * Locking key:
+ * * - need not be protected
+ * a - locked by kaioinfo lock
+ * b - locked by backend lock
+ * c - locked by aio_job_mtx
+ */
+struct kaiocb {
+ TAILQ_ENTRY(kaiocb) list; /* (b) backend-specific list of jobs */
+ TAILQ_ENTRY(kaiocb) plist; /* (a) lists of pending / done jobs */
+ TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */
+ int jobflags; /* (a) job flags */
+ int inputcharge; /* (*) input blocks */
+ int outputcharge; /* (*) output blocks */
+ struct bio *bp; /* (*) BIO backend BIO pointer */
+ struct buf *pbuf; /* (*) BIO backend buffer pointer */
+ struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */
+ int npages; /* BIO backend number of pages */
+ struct proc *userproc; /* (*) user process */
+ struct ucred *cred; /* (*) active credential when created */
+ struct file *fd_file; /* (*) pointer to file structure */
+ struct aioliojob *lio; /* (*) optional lio job */
+ struct aiocb *ujob; /* (*) pointer in userspace of aiocb */
+ struct knlist klist; /* (a) list of knotes */
+ struct aiocb uaiocb; /* (*) copy of user I/O control block */
+ ksiginfo_t ksi; /* (a) realtime signal info */
+ uint64_t seqno; /* (*) job number */
+ int pending; /* (a) number of pending I/O, aio_fsync only */
+ aio_cancel_fn_t *cancel_fn; /* (a) backend cancel function */
+ aio_handle_fn_t *handle_fn; /* (c) backend handle function */
+};
+
+struct socket;
+struct sockbuf;
+
+/*
+ * AIO backends should permit cancellation of queued requests waiting to
+ * be serviced by installing a cancel routine while the request is
+ * queued. The cancellation routine should dequeue the request if
+ * necessary and cancel it. Care must be used to handle races between
+ * queueing and dequeueing requests and cancellation.
+ *
+ * When queueing a request somewhere such that it can be cancelled, the
+ * caller should:
+ *
+ * 1) Acquire lock that protects the associated queue.
+ * 2) Call aio_set_cancel_function() to install the cancel routine.
+ * 3) If that fails, the request has a pending cancel and should be
+ * cancelled via aio_cancel().
+ * 4) Queue the request.
+ *
+ * When dequeueing a request to service it or hand it off to somewhere else,
+ * the caller should:
+ *
+ * 1) Acquire the lock that protects the associated queue.
+ * 2) Dequeue the request.
+ * 3) Call aio_clear_cancel_function() to clear the cancel routine.
+ * 4) If that fails, the cancel routine is about to be called. The
+ * caller should ignore the request.
+ *
+ * The cancel routine should:
+ *
+ * 1) Acquire the lock that protects the associated queue.
+ * 2) Call aio_cancel_cleared() to determine if the request is already
+ * dequeued due to a race with dequeueing thread.
+ * 3) If that fails, dequeue the request.
+ * 4) Cancel the request via aio_cancel().
+ */
+
+bool aio_cancel_cleared(struct kaiocb *job);
+void aio_cancel(struct kaiocb *job);
+bool aio_clear_cancel_function(struct kaiocb *job);
+void aio_complete(struct kaiocb *job, long status, int error);
+void aio_schedule(struct kaiocb *job, aio_handle_fn_t *func);
+bool aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func);
+void aio_switch_vmspace(struct kaiocb *job);
+
+#else /* !_KERNEL */
struct timespec;
@@ -137,14 +244,6 @@
int aio_fsync(int op, struct aiocb *aiocbp);
__END_DECLS
-#else
-
-/* Forward declarations for prototypes below. */
-struct socket;
-struct sockbuf;
-
-extern void (*aio_swake)(struct socket *, struct sockbuf *);
+#endif /* !_KERNEL */
-#endif
-
-#endif
+#endif /* !_SYS_AIO_H_ */
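
[Example (not part of the diff): a hedged sketch of a backend following the
queue/dequeue/cancel protocol documented in the comment added to aio.h above.
The foo_* names, lock, and job list are illustrative stand-ins; the aio_*
calls are the interfaces declared in this change.]

#include <sys/param.h>
#include <sys/lock.h>
#include <sys/mutex.h>
#include <sys/queue.h>
#include <sys/aio.h>

/* Assumed initialized elsewhere with mtx_init(). */
static struct mtx foo_lock;
static TAILQ_HEAD(, kaiocb) foo_jobs = TAILQ_HEAD_INITIALIZER(foo_jobs);

static void
foo_aio_cancel(struct kaiocb *job)
{

	mtx_lock(&foo_lock);
	/* A racing dequeue may already have removed the job. */
	if (!aio_cancel_cleared(job))
		TAILQ_REMOVE(&foo_jobs, job, list);
	mtx_unlock(&foo_lock);
	aio_cancel(job);
}

/* Queue side: install the cancel routine before making the job visible. */
static int
foo_queue(struct kaiocb *job)
{

	mtx_lock(&foo_lock);
	if (!aio_set_cancel_function(job, foo_aio_cancel)) {
		/* A cancel was requested before the job could be queued. */
		mtx_unlock(&foo_lock);
		aio_cancel(job);
		return (0);
	}
	TAILQ_INSERT_TAIL(&foo_jobs, job, list);
	mtx_unlock(&foo_lock);
	return (0);
}

/* Dequeue side: clear the cancel routine before servicing a job. */
static void
foo_drain(void)
{
	struct kaiocb *job;

	mtx_lock(&foo_lock);
	while ((job = TAILQ_FIRST(&foo_jobs)) != NULL) {
		TAILQ_REMOVE(&foo_jobs, job, list);
		/* If this fails, the cancel routine owns the job; skip it. */
		if (!aio_clear_cancel_function(job))
			continue;
		mtx_unlock(&foo_lock);
		/* Pretend the whole transfer succeeded. */
		aio_complete(job, job->uaiocb.aio_nbytes, 0);
		mtx_lock(&foo_lock);
	}
	mtx_unlock(&foo_lock);
}
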
Index: sys/sys/file.h
===================================================================
--- sys/sys/file.h
+++ sys/sys/file.h
@@ -73,6 +73,7 @@
struct file;
struct filecaps;
+struct kaiocb;
struct kinfo_file;
struct ucred;
@@ -119,6 +120,7 @@
typedef int fo_mmap_t(struct file *fp, vm_map_t map, vm_offset_t *addr,
vm_size_t size, vm_prot_t prot, vm_prot_t cap_maxprot,
int flags, vm_ooffset_t foff, struct thread *td);
+typedef int fo_aio_queue_t(struct file *fp, struct kaiocb *job);
typedef int fo_flags_t;
struct fileops {
@@ -136,6 +138,7 @@
fo_seek_t *fo_seek;
fo_fill_kinfo_t *fo_fill_kinfo;
fo_mmap_t *fo_mmap;
+ fo_aio_queue_t *fo_aio_queue;
fo_flags_t fo_flags; /* DFLAG_* below */
};
@@ -406,6 +409,13 @@
flags, foff, td));
}
+static __inline int
+fo_aio_queue(struct file *fp, struct kaiocb *job)
+{
+
+ return ((*fp->f_ops->fo_aio_queue)(fp, job));
+}
+
#endif /* _KERNEL */
#endif /* !SYS_FILE_H */
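
[Example (not part of the diff): a hedged sketch of how a file type could opt
into AIO through the new fo_aio_queue hook by handing requests to the default
aiod pool with aio_schedule(); the foo_* names are illustrative. File types
that leave fo_aio_queue NULL fall back to aio_queue_file() in vfs_aio.c,
which is gated by vfs.aio.enable_unsafe except for raw-disk requests
satisfied by aio_qphysio().]

/* Runs in an aiod kernel process; must finish the job with aio_complete(). */
static void
foo_aio_handler(struct kaiocb *job)
{

	/* Perform the transfer described by job->uaiocb here. */
	aio_complete(job, job->uaiocb.aio_nbytes, 0);
}

static int
foo_aio_queue(struct file *fp, struct kaiocb *job)
{

	switch (job->uaiocb.aio_lio_opcode) {
	case LIO_READ:
	case LIO_WRITE:
		/* aio_schedule() installs its own cancel routine. */
		aio_schedule(job, foo_aio_handler);
		return (0);
	default:
		return (EINVAL);
	}
}
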
Index: sys/sys/sockbuf.h
===================================================================
--- sys/sys/sockbuf.h
+++ sys/sys/sockbuf.h
@@ -36,6 +36,7 @@
#include <sys/_lock.h>
#include <sys/_mutex.h>
#include <sys/_sx.h>
+#include <sys/_task.h>
#define SB_MAX (2*1024*1024) /* default for max chars in sockbuf */
@@ -53,6 +54,7 @@
#define SB_IN_TOE 0x400 /* socket buffer is in the middle of an operation */
#define SB_AUTOSIZE 0x800 /* automatically size socket buffer */
#define SB_STOP 0x1000 /* backpressure indicator */
+#define SB_AIO_RUNNING 0x2000 /* AIO operation running */
#define SBS_CANTSENDMORE 0x0010 /* can't send more data to peer */
#define SBS_CANTRCVMORE 0x0020 /* can't receive more data from peer */
@@ -77,33 +79,38 @@
/*
* Variables for socket buffering.
+ *
+ * Locking key to struct sockbuf:
+ * (a) locked by SOCKBUF_LOCK().
*/
struct sockbuf {
struct selinfo sb_sel; /* process selecting read/write */
struct mtx sb_mtx; /* sockbuf lock */
struct sx sb_sx; /* prevent I/O interlacing */
- short sb_state; /* (c/d) socket state on sockbuf */
+ short sb_state; /* (a) socket state on sockbuf */
#define sb_startzero sb_mb
- struct mbuf *sb_mb; /* (c/d) the mbuf chain */
- struct mbuf *sb_mbtail; /* (c/d) the last mbuf in the chain */
- struct mbuf *sb_lastrecord; /* (c/d) first mbuf of last
+ struct mbuf *sb_mb; /* (a) the mbuf chain */
+ struct mbuf *sb_mbtail; /* (a) the last mbuf in the chain */
+ struct mbuf *sb_lastrecord; /* (a) first mbuf of last
* record in socket buffer */
- struct mbuf *sb_sndptr; /* (c/d) pointer into mbuf chain */
- struct mbuf *sb_fnrdy; /* (c/d) pointer to first not ready buffer */
- u_int sb_sndptroff; /* (c/d) byte offset of ptr into chain */
- u_int sb_acc; /* (c/d) available chars in buffer */
- u_int sb_ccc; /* (c/d) claimed chars in buffer */
- u_int sb_hiwat; /* (c/d) max actual char count */
- u_int sb_mbcnt; /* (c/d) chars of mbufs used */
- u_int sb_mcnt; /* (c/d) number of mbufs in buffer */
- u_int sb_ccnt; /* (c/d) number of clusters in buffer */
- u_int sb_mbmax; /* (c/d) max chars of mbufs to use */
- u_int sb_ctl; /* (c/d) non-data chars in buffer */
- int sb_lowat; /* (c/d) low water mark */
- sbintime_t sb_timeo; /* (c/d) timeout for read/write */
- short sb_flags; /* (c/d) flags, see below */
- int (*sb_upcall)(struct socket *, void *, int); /* (c/d) */
- void *sb_upcallarg; /* (c/d) */
+ struct mbuf *sb_sndptr; /* (a) pointer into mbuf chain */
+ struct mbuf *sb_fnrdy; /* (a) pointer to first not ready buffer */
+ u_int sb_sndptroff; /* (a) byte offset of ptr into chain */
+ u_int sb_acc; /* (a) available chars in buffer */
+ u_int sb_ccc; /* (a) claimed chars in buffer */
+ u_int sb_hiwat; /* (a) max actual char count */
+ u_int sb_mbcnt; /* (a) chars of mbufs used */
+ u_int sb_mcnt; /* (a) number of mbufs in buffer */
+ u_int sb_ccnt; /* (a) number of clusters in buffer */
+ u_int sb_mbmax; /* (a) max chars of mbufs to use */
+ u_int sb_ctl; /* (a) non-data chars in buffer */
+ int sb_lowat; /* (a) low water mark */
+ sbintime_t sb_timeo; /* (a) timeout for read/write */
+ short sb_flags; /* (a) flags, see below */
+ int (*sb_upcall)(struct socket *, void *, int); /* (a) */
+ void *sb_upcallarg; /* (a) */
+ TAILQ_HEAD(, kaiocb) sb_aiojobq; /* (a) pending AIO ops */
+ struct task sb_aiotask; /* AIO task */
};
#ifdef _KERNEL
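
A plausible initialization of the two new sockbuf members, shown only as an
assumption for illustration (the patch's own setup code is not in this hunk): when a
socket is created, each sockbuf's AIO job queue and task could be initialized along
these lines, using the soaio_rcv()/soaio_snd() handlers declared in socketvar.h
below as the task functions.

static void
foo_socket_aio_init(struct socket *so)
{

	TAILQ_INIT(&so->so_rcv.sb_aiojobq);
	TAILQ_INIT(&so->so_snd.sb_aiojobq);
	TASK_INIT(&so->so_rcv.sb_aiotask, 0, soaio_rcv, so);
	TASK_INIT(&so->so_snd.sb_aiotask, 0, soaio_snd, so);
}
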
Index: sys/sys/socketvar.h
===================================================================
--- sys/sys/socketvar.h
+++ sys/sys/socketvar.h
@@ -64,7 +64,6 @@
* (a) constant after allocation, no locking required.
* (b) locked by SOCK_LOCK(so).
* (c) locked by SOCKBUF_LOCK(&so->so_rcv).
- * (d) locked by SOCKBUF_LOCK(&so->so_snd).
* (e) locked by ACCEPT_LOCK().
* (f) not locked since integer reads/writes are atomic.
* (g) used only as a sleep/wakeup address, no value.
@@ -104,7 +103,6 @@
struct sigio *so_sigio; /* [sg] information for async I/O or
out of band data (SIGURG) */
u_long so_oobmark; /* (c) chars to oob mark */
- TAILQ_HEAD(, kaiocb) so_aiojobq; /* AIO ops waiting on socket */
struct sockbuf so_rcv, so_snd;
@@ -343,6 +341,8 @@
struct file **fpp, u_int *fflagp);
void soabort(struct socket *so);
int soaccept(struct socket *so, struct sockaddr **nam);
+void soaio_rcv(void *context, int pending);
+void soaio_snd(void *context, int pending);
int socheckuid(struct socket *so, uid_t uid);
int sobind(struct socket *so, struct sockaddr *nam, struct thread *td);
int sobindat(int fd, struct socket *so, struct sockaddr *nam,
@@ -397,6 +397,7 @@
void soupcall_set(struct socket *so, int which,
int (*func)(struct socket *, void *, int), void *arg);
void sowakeup(struct socket *so, struct sockbuf *sb);
+void sowakeup_aio(struct socket *so, struct sockbuf *sb);
int selsocket(struct socket *so, int events, struct timeval *tv,
struct thread *td);
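
The sowakeup_aio() declaration above pairs with the SB_AIO_RUNNING flag added to
sockbuf.h.  A rough, hypothetical sketch of that pattern follows; it is not the
patch's implementation, and handing the task to taskqueue_thread is an assumption
(the socket AIO daemons may drain a dedicated queue instead).

static void
foo_sowakeup_aio(struct socket *so, struct sockbuf *sb)
{

	SOCKBUF_LOCK_ASSERT(sb);
	if (sb->sb_flags & SB_AIO_RUNNING)
		return;
	/* Mark the task as running and let an AIO daemon drain sb_aiojobq. */
	sb->sb_flags |= SB_AIO_RUNNING;
	taskqueue_enqueue(taskqueue_thread, &sb->sb_aiotask);
}
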
Index: tests/sys/aio/aio_kqueue_test.c
===================================================================
--- tests/sys/aio/aio_kqueue_test.c
+++ tests/sys/aio/aio_kqueue_test.c
@@ -47,6 +47,7 @@
#include <unistd.h>
#include "freebsd_test_suite/macros.h"
+#include "local.h"
#define PATH_TEMPLATE "aio.XXXXXXXXXX"
@@ -70,6 +71,7 @@
unsigned i, j;
PLAIN_REQUIRE_KERNEL_MODULE("aio", 0);
+ PLAIN_REQUIRE_UNSAFE_AIO(0);
kq = kqueue();
if (kq < 0) {
Index: tests/sys/aio/aio_test.c
===================================================================
--- tests/sys/aio/aio_test.c
+++ tests/sys/aio/aio_test.c
@@ -60,6 +60,7 @@
#include <atf-c.h>
#include "freebsd_test_suite/macros.h"
+#include "local.h"
#define PATH_TEMPLATE "aio.XXXXXXXXXX"
@@ -340,6 +341,7 @@
int fd;
ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
strcpy(pathname, PATH_TEMPLATE);
fd = mkstemp(pathname);
@@ -386,6 +388,7 @@
struct aio_context ac;
ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
/*
* In theory, mkstemp() can return a name that is then collided with.
@@ -497,6 +500,7 @@
int error;
ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
ATF_REQUIRE_MSG(openpty(&read_fd, &write_fd, NULL, NULL, NULL) == 0,
"openpty failed: %s", strerror(errno));
@@ -544,6 +548,7 @@
int pipes[2];
ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
ATF_REQUIRE_MSG(pipe(pipes) != -1,
"pipe failed: %s", strerror(errno));
Index: tests/sys/aio/lio_kqueue_test.c
===================================================================
--- tests/sys/aio/lio_kqueue_test.c
+++ tests/sys/aio/lio_kqueue_test.c
@@ -50,6 +50,7 @@
#include <unistd.h>
#include "freebsd_test_suite/macros.h"
+#include "local.h"
#define PATH_TEMPLATE "aio.XXXXXXXXXX"
@@ -74,6 +75,7 @@
int tmp_file = 0, failed = 0;
PLAIN_REQUIRE_KERNEL_MODULE("aio", 0);
+ PLAIN_REQUIRE_UNSAFE_AIO(0);
kq = kqueue();
if (kq < 0)
Index: tests/sys/aio/local.h
===================================================================
--- /dev/null
+++ tests/sys/aio/local.h
@@ -0,0 +1,74 @@
+/*-
+ * Copyright (c) 2016 Chelsio Communications, Inc.
+ * All rights reserved.
+ * Written by: John Baldwin <jhb@FreeBSD.org>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+
+#ifndef _AIO_TEST_LOCAL_H_
+#define _AIO_TEST_LOCAL_H_
+
+#include <sys/types.h>
+#include <sys/sysctl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <atf-c.h>
+
+#define ATF_REQUIRE_UNSAFE_AIO() do { \
+ size_t _len; \
+ int _unsafe; \
+ \
+ _len = sizeof(_unsafe); \
+ if (sysctlbyname("vfs.aio.enable_unsafe", &_unsafe, &_len, NULL,\
+ 0) < 0) { \
+ if (errno != ENOENT) \
+ atf_libc_error(errno, \
+ "Failed to read vfs.aio.enable_unsafe"); \
+ } else if (_unsafe == 0) \
+ atf_tc_skip("Unsafe AIO is disabled"); \
+} while (0)
+
+#define PLAIN_REQUIRE_UNSAFE_AIO(_exit_code) do { \
+ size_t _len; \
+ int _unsafe; \
+ \
+ _len = sizeof(_unsafe); \
+ if (sysctlbyname("vfs.aio.enable_unsafe", &_unsafe, &_len, NULL,\
+ 0) < 0) { \
+ if (errno != ENOENT) { \
+ printf("Failed to read vfs.aio.enable_unsafe: %s\n",\
+ strerror(errno)); \
+ _exit(1); \
+ } \
+ } else if (_unsafe == 0) { \
+		printf("Unsafe AIO is disabled\n");		\
+ _exit(_exit_code); \
+ } \
+} while (0)
+
+#endif /* !_AIO_TEST_LOCAL_H_ */
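
For completeness, a minimal usage sketch (a hypothetical test case, not part of this
diff) showing how a test body guards itself with the macro above; it assumes the
usual atf-c.h, freebsd_test_suite/macros.h, and local.h includes seen in the test
files earlier in the patch.

ATF_TC_WITHOUT_HEAD(example_unsafe_aio);
ATF_TC_BODY(example_unsafe_aio, tc)
{

	ATF_REQUIRE_KERNEL_MODULE("aio");
	ATF_REQUIRE_UNSAFE_AIO();	/* skipped unless vfs.aio.enable_unsafe != 0 */
	/* ... exercise aio_read()/aio_write() on a pipe, fifo, or pty ... */
}
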
