D5289.id13428.diff
Index: share/man/man4/aio.4
===================================================================
--- share/man/man4/aio.4
+++ share/man/man4/aio.4
@@ -27,24 +27,116 @@
.\"
.\" $FreeBSD$
.\"
-.Dd October 24, 2002
+.Dd February 15, 2016
.Dt AIO 4
.Os
.Sh NAME
.Nm aio
.Nd asynchronous I/O
-.Sh SYNOPSIS
-To link into the kernel:
-.Cd "options VFS_AIO"
-.Pp
-To load as a kernel loadable module:
-.Dl kldload aio
.Sh DESCRIPTION
The
.Nm
facility provides system calls for asynchronous I/O.
-It is available both as a kernel option for static inclusion and as a
-dynamic kernel module.
+However, asynchronous I/O operations are only enabled for certain file
+types by default.
+Asynchronous I/O operations on other file types may block an AIO daemon
+indefinitely, resulting in process and/or system hangs.
+Asynchronous I/O operations can be enabled for all file types by setting
+the
+.Va vfs.aio.enable_unsafe
+sysctl node to a non-zero value.
+.Pp
+Asynchronous I/O operations on sockets and raw disk devices do not block
+indefinitely and are enabled by default.
+.Pp
+The
+.Nm
+facility uses kernel processes
+(also known as AIO daemons)
+to service most asynchronous I/O requests.
+These processes are grouped into pools containing a variable number of
+processes.
+Processes are added to or removed from each pool based on load.
+Pools can be configured by sysctl nodes that define the minimum
+and maximum number of processes as well as the amount of time an idle
+process will wait before exiting.
+.Pp
+One pool of AIO daemons is used to service asynchronous I/O requests for
+sockets.
+These processes are named
+.Dq soaiod<N> .
+The following sysctl nodes are used with this pool:
+.Bl -tag -width indent
+.It Va kern.ipc.aio.num_procs
+The current number of processes in the pool.
+.It Va kern.ipc.aio.target_procs
+The minimum number of processes that should be present in the pool.
+.It Va kern.ipc.aio.max_procs
+The maximum number of processes permitted in the pool.
+.It Va kern.ipc.aio.lifetime
+The amount of time, in clock ticks, that a process is permitted to idle.
+If a process is idle for this amount of time and there are more processes
+in the pool than the target minimum,
+the process will exit.
+.El
+.Pp
+A second pool of AIO daemons is used to service all other asynchronous I/O
+requests except for I/O requests to raw disks.
+These processes are named
+.Dq aiod<N> .
+The following sysctl nodes are used with this pool:
+.Bl -tag -width indent
+.It Va vfs.aio.num_aio_procs
+The current number of processes in the pool.
+.It Va vfs.aio.target_aio_procs
+The minimum number of processes that should be present in the pool.
+.It Va vfs.aio.max_aio_procs
+The maximum number of processes permitted in the pool.
+.It Va vfs.aio.aiod_lifetime
+The amount of time, in clock ticks, that a process is permitted to idle.
+If a process is idle for this amount of time and there are more processes
+in the pool than the target minimum,
+the process will exit.
+.El
+.Pp
+Asynchronous I/O requests for raw disks are queued directly to the disk
+device layer after temporarily wiring the user pages associated with the
+request.
+These requests are not serviced by any of the AIO daemon pools.
+.Pp
+Several limits on the number of asynchronous I/O requests are imposed both
+system-wide and per-process.
+These limits are configured via the following sysctls:
+.Bl -tag -width indent
+.It Va vfs.aio.max_buf_aio
+The maximum number of queued asynchronous I/O requests for raw disks permitted
+for a single process.
+Asynchronous I/O requests that have completed but whose status has not been
+retrieved via
+.Xr aio_return 2
+or
+.Xr aio_waitcomplete 2
+are not counted against this limit.
+.It Va vfs.aio.num_buf_aio
+The number of queued asynchronous I/O requests for raw disks system-wide.
+.It Va vfs.aio.max_aio_queue_per_proc
+The maximum number of asynchronous I/O requests for a single process
+serviced concurrently by the default AIO daemon pool.
+.It Va vfs.aio.max_aio_per_proc
+The maximum number of outstanding asynchronous I/O requests permitted for a
+single process.
+This includes requests that have not been serviced,
+requests currently being serviced,
+and requests that have completed but whose status has not been retrieved via
+.Xr aio_return 2
+or
+.Xr aio_waitcomplete 2 .
+.It Va vfs.aio.num_queue_count
+The number of outstanding asynchronous I/O requests system-wide.
+.It Va vfs.aio.max_aio_queue
+The maximum number of outstanding asynchronous I/O requests permitted
+system-wide.
+.El
.Sh SEE ALSO
.Xr aio_cancel 2 ,
.Xr aio_error 2 ,
@@ -54,9 +146,7 @@
.Xr aio_waitcomplete 2 ,
.Xr aio_write 2 ,
.Xr lio_listio 2 ,
-.Xr config 8 ,
-.Xr kldload 8 ,
-.Xr kldunload 8
+.Xr sysctl 8
.Sh HISTORY
The
.Nm
@@ -66,3 +156,7 @@
.Nm
kernel module appeared in
.Fx 5.0 .
+The
+.Nm
+facility was integrated into all kernels in
+.Fx 11.0 .
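
That completes the aio.4 changes. As a hedged aside (not part of this diff), the vfs.aio.enable_unsafe knob the new man page text documents can be probed and set from userland with sysctlbyname(3); the sketch below assumes a kernel with this change applied, and root privilege for the set (it is equivalent to running "sysctl vfs.aio.enable_unsafe=1"):

#include <sys/types.h>
#include <sys/sysctl.h>

#include <stdio.h>

int
main(void)
{
	int enabled, on = 1;
	size_t len = sizeof(enabled);

	if (sysctlbyname("vfs.aio.enable_unsafe", &enabled, &len, NULL,
	    0) == -1) {
		perror("vfs.aio.enable_unsafe");	/* older kernel? */
		return (1);
	}
	printf("unsafe AIO is %s\n", enabled ? "on" : "off");

	/* Setting the knob requires root; failure here is non-fatal. */
	if (!enabled && sysctlbyname("vfs.aio.enable_unsafe", NULL, NULL,
	    &on, sizeof(on)) == -1)
		perror("set vfs.aio.enable_unsafe");
	return (0);
}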
Index: sys/conf/NOTES
===================================================================
--- sys/conf/NOTES
+++ sys/conf/NOTES
@@ -1131,11 +1131,6 @@
#
options REISERFS
-# Use real implementations of the aio_* system calls. There are numerous
-# stability and security issues in the current aio code that make it
-# unsuitable for inclusion on machines with untrusted local users.
-options VFS_AIO
-
# Cryptographically secure random number generator; /dev/random
device random
Index: sys/conf/files
===================================================================
--- sys/conf/files
+++ sys/conf/files
@@ -3330,7 +3330,7 @@
kern/uipc_syscalls.c standard
kern/uipc_usrreq.c standard
kern/vfs_acl.c standard
-kern/vfs_aio.c optional vfs_aio
+kern/vfs_aio.c standard
kern/vfs_bio.c standard
kern/vfs_cache.c standard
kern/vfs_cluster.c standard
Index: sys/conf/options
===================================================================
--- sys/conf/options
+++ sys/conf/options
@@ -214,7 +214,6 @@
SW_WATCHDOG opt_watchdog.h
TURNSTILE_PROFILING
UMTX_PROFILING
-VFS_AIO
VERBOSE_SYSINIT
WLCACHE opt_wavelan.h
WLDEBUG opt_wavelan.h
Index: sys/kern/sys_socket.c
===================================================================
--- sys/kern/sys_socket.c
+++ sys/kern/sys_socket.c
@@ -34,9 +34,12 @@
#include <sys/param.h>
#include <sys/systm.h>
+#include <sys/aio.h>
#include <sys/domain.h>
#include <sys/file.h>
#include <sys/filedesc.h>
+#include <sys/kernel.h>
+#include <sys/kthread.h>
#include <sys/malloc.h>
#include <sys/proc.h>
#include <sys/protosw.h>
@@ -48,6 +51,9 @@
#include <sys/filio.h> /* XXX */
#include <sys/sockio.h>
#include <sys/stat.h>
+#include <sys/sysctl.h>
+#include <sys/sysproto.h>
+#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/ucred.h>
#include <sys/un.h>
@@ -64,6 +70,22 @@
#include <security/mac/mac_framework.h>
+#include <vm/vm.h>
+#include <vm/pmap.h>
+#include <vm/vm_extern.h>
+#include <vm/vm_map.h>
+
+static SYSCTL_NODE(_kern_ipc, OID_AUTO, aio, CTLFLAG_RD, NULL,
+ "socket AIO stats");
+
+static int empty_results;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_results, CTLFLAG_RD, &empty_results,
+ 0, "socket operation returned EAGAIN");
+
+static int empty_retries;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, empty_retries, CTLFLAG_RD, &empty_retries,
+ 0, "socket operation retries");
+
static fo_rdwr_t soo_read;
static fo_rdwr_t soo_write;
static fo_ioctl_t soo_ioctl;
@@ -72,6 +94,9 @@
static fo_stat_t soo_stat;
static fo_close_t soo_close;
static fo_fill_kinfo_t soo_fill_kinfo;
+static fo_aio_queue_t soo_aio_queue;
+
+static void soo_aio_cancel(struct kaiocb *job);
struct fileops socketops = {
.fo_read = soo_read,
@@ -86,6 +111,7 @@
.fo_chown = invfo_chown,
.fo_sendfile = invfo_sendfile,
.fo_fill_kinfo = soo_fill_kinfo,
+ .fo_aio_queue = soo_aio_queue,
.fo_flags = DFLAG_PASSABLE
};
@@ -363,3 +389,374 @@
sizeof(kif->kf_path));
return (0);
}
+
+static STAILQ_HEAD(, task) soaio_jobs;
+static struct mtx soaio_jobs_lock;
+static struct task soaio_kproc_task;
+static int soaio_starting, soaio_idle, soaio_queued;
+static struct unrhdr *soaio_kproc_unr;
+
+static int soaio_max_procs = MAX_AIO_PROCS;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, max_procs, CTLFLAG_RW, &soaio_max_procs, 0,
+ "Maximum number of kernel processes to use for async socket IO");
+
+static int soaio_num_procs;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, num_procs, CTLFLAG_RD, &soaio_num_procs, 0,
+ "Number of active kernel processes for async socket IO");
+
+static int soaio_target_procs = TARGET_AIO_PROCS;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, target_procs, CTLFLAG_RD,
+ &soaio_target_procs, 0,
+ "Preferred number of ready kernel processes for async socket IO");
+
+static int soaio_lifetime;
+SYSCTL_INT(_kern_ipc_aio, OID_AUTO, lifetime, CTLFLAG_RW, &soaio_lifetime, 0,
+ "Maximum lifetime for idle aiod");
+
+static void
+soaio_kproc_loop(void *arg)
+{
+ struct proc *p;
+ struct vmspace *myvm;
+ struct task *task;
+ int error, id, pending;
+
+ id = (intptr_t)arg;
+
+ /*
+ * Grab an extra reference on the daemon's vmspace so that it
+ * doesn't get freed by jobs that switch to a different
+ * vmspace.
+ */
+ p = curproc;
+ myvm = vmspace_acquire_ref(p);
+
+ mtx_lock(&soaio_jobs_lock);
+ MPASS(soaio_starting > 0);
+ soaio_starting--;
+ for (;;) {
+ while (!STAILQ_EMPTY(&soaio_jobs)) {
+ task = STAILQ_FIRST(&soaio_jobs);
+ STAILQ_REMOVE_HEAD(&soaio_jobs, ta_link);
+ soaio_queued--;
+ pending = task->ta_pending;
+ task->ta_pending = 0;
+ mtx_unlock(&soaio_jobs_lock);
+
+ task->ta_func(task->ta_context, pending);
+
+ mtx_lock(&soaio_jobs_lock);
+ }
+ MPASS(soaio_queued == 0);
+
+ if (p->p_vmspace != myvm) {
+ mtx_unlock(&soaio_jobs_lock);
+ vmspace_switch_aio(myvm);
+ mtx_lock(&soaio_jobs_lock);
+ continue;
+ }
+
+ soaio_idle++;
+ error = mtx_sleep(&soaio_idle, &soaio_jobs_lock, 0, "-",
+ soaio_lifetime);
+ soaio_idle--;
+ if (error == EWOULDBLOCK && STAILQ_EMPTY(&soaio_jobs) &&
+ soaio_num_procs > soaio_target_procs)
+ break;
+ }
+ soaio_num_procs--;
+ mtx_unlock(&soaio_jobs_lock);
+ free_unr(soaio_kproc_unr, id);
+ kproc_exit(0);
+}
+
+static void
+soaio_kproc_create(void *context, int pending)
+{
+ struct proc *p;
+ int error, id;
+
+ mtx_lock(&soaio_jobs_lock);
+ for (;;) {
+ if (soaio_num_procs < soaio_target_procs) {
+ /* Must create */
+ } else if (soaio_num_procs >= soaio_max_procs) {
+ /*
+ * Hit the limit on kernel processes, don't
+ * create another one.
+ */
+ break;
+ } else if (soaio_queued <= soaio_idle + soaio_starting) {
+ /*
+ * No more AIO jobs waiting for a process to be
+ * created, so stop.
+ */
+ break;
+ }
+ soaio_starting++;
+ mtx_unlock(&soaio_jobs_lock);
+
+ id = alloc_unr(soaio_kproc_unr);
+ error = kproc_create(soaio_kproc_loop, (void *)(intptr_t)id,
+ &p, 0, 0, "soaiod%d", id);
+ if (error != 0) {
+ free_unr(soaio_kproc_unr, id);
+ mtx_lock(&soaio_jobs_lock);
+ soaio_starting--;
+ break;
+ }
+
+ mtx_lock(&soaio_jobs_lock);
+ soaio_num_procs++;
+ }
+ mtx_unlock(&soaio_jobs_lock);
+}
+
+static void
+soaio_enqueue(struct task *task)
+{
+
+ mtx_lock(&soaio_jobs_lock);
+ MPASS(task->ta_pending == 0);
+ task->ta_pending++;
+ STAILQ_INSERT_TAIL(&soaio_jobs, task, ta_link);
+ soaio_queued++;
+ if (soaio_queued <= soaio_idle)
+ wakeup_one(&soaio_idle);
+ else if (soaio_num_procs < soaio_max_procs)
+ taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
+ mtx_unlock(&soaio_jobs_lock);
+}
+
+static void
+soaio_init(void)
+{
+
+ soaio_lifetime = AIOD_LIFETIME_DEFAULT;
+ STAILQ_INIT(&soaio_jobs);
+ mtx_init(&soaio_jobs_lock, "soaio jobs", NULL, MTX_DEF);
+ soaio_kproc_unr = new_unrhdr(1, INT_MAX, NULL);
+ TASK_INIT(&soaio_kproc_task, 0, soaio_kproc_create, NULL);
+ if (soaio_target_procs > 0)
+ taskqueue_enqueue(taskqueue_thread, &soaio_kproc_task);
+}
+SYSINIT(soaio, SI_SUB_VFS, SI_ORDER_ANY, soaio_init, NULL);
+
+static __inline int
+soaio_ready(struct socket *so, struct sockbuf *sb)
+{
+ return (sb == &so->so_rcv ? soreadable(so) : sowriteable(so));
+}
+
+static void
+soaio_process_job(struct socket *so, struct sockbuf *sb, struct kaiocb *job)
+{
+ struct ucred *td_savedcred;
+ struct thread *td;
+ struct file *fp;
+ struct uio uio;
+ struct iovec iov;
+ size_t cnt;
+ int error, flags;
+
+ SOCKBUF_UNLOCK(sb);
+ aio_switch_vmspace(job);
+ td = curthread;
+ fp = job->fd_file;
+retry:
+ td_savedcred = td->td_ucred;
+ td->td_ucred = job->cred;
+
+ cnt = job->uaiocb.aio_nbytes;
+ iov.iov_base = (void *)(uintptr_t)job->uaiocb.aio_buf;
+ iov.iov_len = cnt;
+ uio.uio_iov = &iov;
+ uio.uio_iovcnt = 1;
+ uio.uio_offset = 0;
+ uio.uio_resid = cnt;
+ uio.uio_segflg = UIO_USERSPACE;
+ uio.uio_td = td;
+ flags = MSG_NBIO;
+
+ /* TODO: Charge ru_msg* to job. */
+
+ if (sb == &so->so_rcv) {
+ uio.uio_rw = UIO_READ;
+#ifdef MAC
+ error = mac_socket_check_receive(fp->f_cred, so);
+ if (error == 0)
+#endif
+ error = soreceive(so, NULL, &uio, NULL, NULL, &flags);
+ } else {
+ uio.uio_rw = UIO_WRITE;
+#ifdef MAC
+ error = mac_socket_check_send(fp->f_cred, so);
+ if (error == 0)
+#endif
+ error = sosend(so, NULL, &uio, NULL, NULL, flags, td);
+ if (error == EPIPE && (so->so_options & SO_NOSIGPIPE) == 0) {
+ PROC_LOCK(job->userproc);
+ kern_psignal(job->userproc, SIGPIPE);
+ PROC_UNLOCK(job->userproc);
+ }
+ }
+
+ cnt -= uio.uio_resid;
+ td->td_ucred = td_savedcred;
+
+ /* XXX: Not sure if this is needed? */
+ if (cnt != 0 && (error == ERESTART || error == EINTR ||
+ error == EWOULDBLOCK))
+ error = 0;
+ if (error == EWOULDBLOCK) {
+ /*
+ * A read() or write() on the socket raced with this
+ * request. If the socket is now ready, try again.
+ * If it is not, place this request at the head of the
+ * queue to try again when the socket is ready.
+ */
+ SOCKBUF_LOCK(sb);
+ empty_results++;
+ if (soaio_ready(so, sb)) {
+ empty_retries++;
+ SOCKBUF_UNLOCK(sb);
+ goto retry;
+ }
+
+ if (!aio_set_cancel_function(job, soo_aio_cancel)) {
+ MPASS(cnt == 0);
+ SOCKBUF_UNLOCK(sb);
+ aio_cancel(job);
+ SOCKBUF_LOCK(sb);
+ } else {
+ TAILQ_INSERT_HEAD(&sb->sb_aiojobq, job, list);
+ }
+ } else {
+ aio_complete(job, cnt, error);
+ SOCKBUF_LOCK(sb);
+ }
+}
+
+static void
+soaio_process_sb(struct socket *so, struct sockbuf *sb)
+{
+ struct kaiocb *job;
+
+ SOCKBUF_LOCK(sb);
+ while (!TAILQ_EMPTY(&sb->sb_aiojobq) && soaio_ready(so, sb)) {
+ job = TAILQ_FIRST(&sb->sb_aiojobq);
+ TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
+ if (!aio_clear_cancel_function(job))
+ continue;
+
+ soaio_process_job(so, sb, job);
+ }
+
+ /*
+ * If there are still pending requests, the socket must not be
+ * ready so set SB_AIO to request a wakeup when the socket
+ * becomes ready.
+ */
+ if (!TAILQ_EMPTY(&sb->sb_aiojobq))
+ sb->sb_flags |= SB_AIO;
+ sb->sb_flags &= ~SB_AIO_RUNNING;
+ SOCKBUF_UNLOCK(sb);
+
+ ACCEPT_LOCK();
+ SOCK_LOCK(so);
+ sorele(so);
+}
+
+void
+soaio_rcv(void *context, int pending)
+{
+ struct socket *so;
+
+ so = context;
+ soaio_process_sb(so, &so->so_rcv);
+}
+
+void
+soaio_snd(void *context, int pending)
+{
+ struct socket *so;
+
+ so = context;
+ soaio_process_sb(so, &so->so_snd);
+}
+
+void
+sowakeup_aio(struct socket *so, struct sockbuf *sb)
+{
+
+ SOCKBUF_LOCK_ASSERT(sb);
+ sb->sb_flags &= ~SB_AIO;
+ if (sb->sb_flags & SB_AIO_RUNNING)
+ return;
+ sb->sb_flags |= SB_AIO_RUNNING;
+ if (sb == &so->so_snd)
+ SOCK_LOCK(so);
+ soref(so);
+ if (sb == &so->so_snd)
+ SOCK_UNLOCK(so);
+ soaio_enqueue(&sb->sb_aiotask);
+}
+
+static void
+soo_aio_cancel(struct kaiocb *job)
+{
+ struct socket *so;
+ struct sockbuf *sb;
+ int opcode;
+
+ so = job->fd_file->f_data;
+ opcode = job->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_READ)
+ sb = &so->so_rcv;
+ else {
+ MPASS(opcode == LIO_WRITE);
+ sb = &so->so_snd;
+ }
+
+ SOCKBUF_LOCK(sb);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&sb->sb_aiojobq, job, list);
+ if (TAILQ_EMPTY(&sb->sb_aiojobq))
+ sb->sb_flags &= ~SB_AIO;
+ SOCKBUF_UNLOCK(sb);
+
+ aio_cancel(job);
+}
+
+static int
+soo_aio_queue(struct file *fp, struct kaiocb *job)
+{
+ struct socket *so;
+ struct sockbuf *sb;
+
+ so = fp->f_data;
+ switch (job->uaiocb.aio_lio_opcode) {
+ case LIO_READ:
+ sb = &so->so_rcv;
+ break;
+ case LIO_WRITE:
+ sb = &so->so_snd;
+ break;
+ default:
+ return (EINVAL);
+ }
+
+ SOCKBUF_LOCK(sb);
+ if (!aio_set_cancel_function(job, soo_aio_cancel))
+ panic("new job was cancelled");
+ TAILQ_INSERT_TAIL(&sb->sb_aiojobq, job, list);
+ if (!(sb->sb_flags & SB_AIO_RUNNING)) {
+ if (soaio_ready(so, sb))
+ sowakeup_aio(so, sb);
+ else
+ sb->sb_flags |= SB_AIO;
+ }
+ SOCKBUF_UNLOCK(sb);
+ return (0);
+}
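
That closes out the new socket backend. For context (a sketch, not part of the diff): with soo_aio_queue() in place, an aio_read(2) on a socket is parked on sb_aiojobq and serviced by an soaiod kernel process once the socket becomes readable, so a plain userland program can drive the new path end to end. aio_waitcomplete(2) is the FreeBSD-native way to reap the job:

#include <sys/socket.h>

#include <aio.h>
#include <err.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	struct aiocb cb, *done;
	char buf[64];
	ssize_t n;
	int sv[2];

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1)
		err(1, "socketpair");

	memset(&cb, 0, sizeof(cb));
	cb.aio_fildes = sv[0];
	cb.aio_buf = buf;
	cb.aio_nbytes = sizeof(buf);
	if (aio_read(&cb) == -1)	/* queued via soo_aio_queue() */
		err(1, "aio_read");

	/* Nothing is readable yet, so the job waits on sb_aiojobq. */
	if (write(sv[1], "hello", 5) != 5)
		err(1, "write");

	/* An soaiod kproc completes the read; reap it. */
	if ((n = aio_waitcomplete(&done, NULL)) == -1)
		err(1, "aio_waitcomplete");
	printf("read %zd bytes: %.*s\n", n, (int)n, buf);
	return (0);
}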
Index: sys/kern/uipc_debug.c
===================================================================
--- sys/kern/uipc_debug.c
+++ sys/kern/uipc_debug.c
@@ -416,6 +416,9 @@
db_printf("sb_flags: 0x%x (", sb->sb_flags);
db_print_sbflags(sb->sb_flags);
db_printf(")\n");
+
+ db_print_indent(indent);
+ db_printf("sb_aiojobq first: %p\n", TAILQ_FIRST(&sb->sb_aiojobq));
}
static void
@@ -470,7 +473,6 @@
db_print_indent(indent);
db_printf("so_sigio: %p ", so->so_sigio);
db_printf("so_oobmark: %lu ", so->so_oobmark);
- db_printf("so_aiojobq first: %p\n", TAILQ_FIRST(&so->so_aiojobq));
db_print_sockbuf(&so->so_rcv, "so_rcv", indent);
db_print_sockbuf(&so->so_snd, "so_snd", indent);
Index: sys/kern/uipc_sockbuf.c
===================================================================
--- sys/kern/uipc_sockbuf.c
+++ sys/kern/uipc_sockbuf.c
@@ -332,7 +332,7 @@
} else
ret = SU_OK;
if (sb->sb_flags & SB_AIO)
- aio_swake(so, sb);
+ sowakeup_aio(so, sb);
SOCKBUF_UNLOCK(sb);
if (ret == SU_ISCONNECTED)
soisconnected(so);
Index: sys/kern/uipc_socket.c
===================================================================
--- sys/kern/uipc_socket.c
+++ sys/kern/uipc_socket.c
@@ -134,6 +134,7 @@
#include <sys/stat.h>
#include <sys/sx.h>
#include <sys/sysctl.h>
+#include <sys/taskqueue.h>
#include <sys/uio.h>
#include <sys/jail.h>
#include <sys/syslog.h>
@@ -396,7 +397,10 @@
SOCKBUF_LOCK_INIT(&so->so_rcv, "so_rcv");
sx_init(&so->so_snd.sb_sx, "so_snd_sx");
sx_init(&so->so_rcv.sb_sx, "so_rcv_sx");
- TAILQ_INIT(&so->so_aiojobq);
+ TAILQ_INIT(&so->so_snd.sb_aiojobq);
+ TAILQ_INIT(&so->so_rcv.sb_aiojobq);
+ TASK_INIT(&so->so_snd.sb_aiotask, 0, soaio_snd, so);
+ TASK_INIT(&so->so_rcv.sb_aiotask, 0, soaio_rcv, so);
#ifdef VIMAGE
VNET_ASSERT(vnet != NULL, ("%s:%d vnet is NULL, so=%p",
__func__, __LINE__, so));
Index: sys/kern/vfs_aio.c
===================================================================
--- sys/kern/vfs_aio.c
+++ sys/kern/vfs_aio.c
@@ -72,8 +72,6 @@
#include <vm/uma.h>
#include <sys/aio.h>
-#include "opt_vfs_aio.h"
-
/*
* Counter for allocating reference ids to new jobs. Wrapped to 1 on
* overflow. (XXX will be removed soon.)
@@ -85,14 +83,6 @@
*/
static uint64_t jobseqno;
-#define JOBST_NULL 0
-#define JOBST_JOBQSOCK 1
-#define JOBST_JOBQGLOBAL 2
-#define JOBST_JOBRUNNING 3
-#define JOBST_JOBFINISHED 4
-#define JOBST_JOBQBUF 5
-#define JOBST_JOBQSYNC 6
-
#ifndef MAX_AIO_PER_PROC
#define MAX_AIO_PER_PROC 32
#endif
@@ -101,26 +91,14 @@
#define MAX_AIO_QUEUE_PER_PROC 256 /* Bigger than AIO_LISTIO_MAX */
#endif
-#ifndef MAX_AIO_PROCS
-#define MAX_AIO_PROCS 32
-#endif
-
#ifndef MAX_AIO_QUEUE
#define MAX_AIO_QUEUE 1024 /* Bigger than AIO_LISTIO_MAX */
#endif
-#ifndef TARGET_AIO_PROCS
-#define TARGET_AIO_PROCS 4
-#endif
-
#ifndef MAX_BUF_AIO
#define MAX_BUF_AIO 16
#endif
-#ifndef AIOD_LIFETIME_DEFAULT
-#define AIOD_LIFETIME_DEFAULT (30 * hz)
-#endif
-
FEATURE(aio, "Asynchronous I/O");
static MALLOC_DEFINE(M_LIO, "lio", "listio aio control block list");
@@ -128,6 +106,10 @@
static SYSCTL_NODE(_vfs, OID_AUTO, aio, CTLFLAG_RW, 0,
"Async IO management");
+static int enable_aio_unsafe = 0;
+SYSCTL_INT(_vfs_aio, OID_AUTO, enable_unsafe, CTLFLAG_RW, &enable_aio_unsafe, 0,
+ "Permit asynchronous IO on all file types, not just known-safe types");
+
static int max_aio_procs = MAX_AIO_PROCS;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_procs, CTLFLAG_RW, &max_aio_procs, 0,
"Maximum number of kernel processes to use for handling async IO ");
@@ -165,11 +147,6 @@
SYSCTL_INT(_vfs_aio, OID_AUTO, aiod_lifetime, CTLFLAG_RW, &aiod_lifetime, 0,
"Maximum lifetime for idle aiod");
-static int unloadable = 0;
-SYSCTL_INT(_vfs_aio, OID_AUTO, unloadable, CTLFLAG_RW, &unloadable, 0,
- "Allow unload of aio (not recommended)");
-
-
static int max_aio_per_proc = MAX_AIO_PER_PROC;
SYSCTL_INT(_vfs_aio, OID_AUTO, max_aio_per_proc, CTLFLAG_RW, &max_aio_per_proc,
0,
@@ -208,46 +185,27 @@
*/
/*
- * Current, there is only two backends: BIO and generic file I/O.
- * socket I/O is served by generic file I/O, this is not a good idea, since
- * disk file I/O and any other types without O_NONBLOCK flag can block daemon
- * processes, if there is no thread to serve socket I/O, the socket I/O will be
- * delayed too long or starved, we should create some processes dedicated to
- * sockets to do non-blocking I/O, same for pipe and fifo, for these I/O
- * systems we really need non-blocking interface, fiddling O_NONBLOCK in file
- * structure is not safe because there is race between userland and aio
- * daemons.
+ * If the routine that services an AIO request blocks while running in an
+ * AIO kernel process it can starve other I/O requests. BIO requests
+ * queued via aio_qphysio() complete in GEOM and do not use AIO kernel
+ * processes at all. Socket I/O requests use a separate pool of
+ * kprocs and also force non-blocking I/O. Other file I/O requests
+ * use the generic fo_read/fo_write operations which can block. The
+ * fsync and mlock operations can also block while executing. Ideally
+ * none of these requests would block while executing.
+ *
+ * Note that the service routines cannot toggle O_NONBLOCK in the file
+ * structure directly while handling a request due to races with
+ * userland threads.
*/
-struct kaiocb {
- TAILQ_ENTRY(kaiocb) list; /* (b) internal list of for backend */
- TAILQ_ENTRY(kaiocb) plist; /* (a) list of jobs for each backend */
- TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */
- int jobflags; /* (a) job flags */
- int jobstate; /* (b) job state */
- int inputcharge; /* (*) input blockes */
- int outputcharge; /* (*) output blockes */
- struct bio *bp; /* (*) BIO backend BIO pointer */
- struct buf *pbuf; /* (*) BIO backend buffer pointer */
- struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */
- int npages; /* BIO backend number of pages */
- struct proc *userproc; /* (*) user process */
- struct ucred *cred; /* (*) active credential when created */
- struct file *fd_file; /* (*) pointer to file structure */
- struct aioliojob *lio; /* (*) optional lio job */
- struct aiocb *ujob; /* (*) pointer in userspace of aiocb */
- struct knlist klist; /* (a) list of knotes */
- struct aiocb uaiocb; /* (*) kernel I/O control block */
- ksiginfo_t ksi; /* (a) realtime signal info */
- uint64_t seqno; /* (*) job number */
- int pending; /* (a) number of pending I/O, aio_fsync only */
-};
-
/* jobflags */
-#define KAIOCB_DONE 0x01
-#define KAIOCB_BUFDONE 0x02
-#define KAIOCB_RUNDOWN 0x04
+#define KAIOCB_QUEUEING 0x01
+#define KAIOCB_CANCELLED 0x02
+#define KAIOCB_CANCELLING 0x04
#define KAIOCB_CHECKSYNC 0x08
+#define KAIOCB_CLEARED 0x10
+#define KAIOCB_FINISHED 0x20
/*
* AIO process info
@@ -293,9 +251,10 @@
TAILQ_HEAD(,kaiocb) kaio_done; /* (a) done queue for process */
TAILQ_HEAD(,aioliojob) kaio_liojoblist; /* (a) list of lio jobs */
TAILQ_HEAD(,kaiocb) kaio_jobqueue; /* (a) job queue for process */
- TAILQ_HEAD(,kaiocb) kaio_bufqueue; /* (a) buffer job queue */
TAILQ_HEAD(,kaiocb) kaio_syncqueue; /* (a) queue for aio_fsync */
+ TAILQ_HEAD(,kaiocb) kaio_syncready; /* (a) second q for aio_fsync */
struct task kaio_task; /* (*) task to kick aio processes */
+ struct task kaio_sync_task; /* (*) task to schedule fsync jobs */
};
#define AIO_LOCK(ki) mtx_lock(&(ki)->kaio_mtx)
@@ -332,21 +291,18 @@
static void aio_process_rw(struct kaiocb *job);
static void aio_process_sync(struct kaiocb *job);
static void aio_process_mlock(struct kaiocb *job);
+static void aio_schedule_fsync(void *context, int pending);
static int aio_newproc(int *);
int aio_aqueue(struct thread *td, struct aiocb *ujob,
struct aioliojob *lio, int type, struct aiocb_ops *ops);
+static int aio_queue_file(struct file *fp, struct kaiocb *job);
static void aio_physwakeup(struct bio *bp);
static void aio_proc_rundown(void *arg, struct proc *p);
static void aio_proc_rundown_exec(void *arg, struct proc *p,
struct image_params *imgp);
static int aio_qphysio(struct proc *p, struct kaiocb *job);
static void aio_daemon(void *param);
-static void aio_swake_cb(struct socket *, struct sockbuf *);
-static int aio_unload(void);
-static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job,
- int type);
-#define DONE_BUF 1
-#define DONE_QUEUE 2
+static void aio_bio_done_notify(struct proc *userp, struct kaiocb *job);
static int aio_kick(struct proc *userp);
static void aio_kick_nowait(struct proc *userp);
static void aio_kick_helper(void *context, int pending);
@@ -397,13 +353,10 @@
case MOD_LOAD:
aio_onceonly();
break;
- case MOD_UNLOAD:
- error = aio_unload();
- break;
case MOD_SHUTDOWN:
break;
default:
- error = EINVAL;
+ error = EOPNOTSUPP;
break;
}
return (error);
@@ -471,8 +424,6 @@
{
int error;
- /* XXX: should probably just use so->callback */
- aio_swake = &aio_swake_cb;
exit_tag = EVENTHANDLER_REGISTER(process_exit, aio_proc_rundown, NULL,
EVENTHANDLER_PRI_ANY);
exec_tag = EVENTHANDLER_REGISTER(process_exec, aio_proc_rundown_exec,
@@ -513,55 +464,6 @@
}
/*
- * Callback for unload of AIO when used as a module.
- */
-static int
-aio_unload(void)
-{
- int error;
-
- /*
- * XXX: no unloads by default, it's too dangerous.
- * perhaps we could do it if locked out callers and then
- * did an aio_proc_rundown() on each process.
- *
- * jhb: aio_proc_rundown() needs to run on curproc though,
- * so I don't think that would fly.
- */
- if (!unloadable)
- return (EOPNOTSUPP);
-
-#ifdef COMPAT_FREEBSD32
- syscall32_helper_unregister(aio32_syscalls);
-#endif
- syscall_helper_unregister(aio_syscalls);
-
- error = kqueue_del_filteropts(EVFILT_AIO);
- if (error)
- return error;
- error = kqueue_del_filteropts(EVFILT_LIO);
- if (error)
- return error;
- async_io_version = 0;
- aio_swake = NULL;
- taskqueue_free(taskqueue_aiod_kick);
- delete_unrhdr(aiod_unr);
- uma_zdestroy(kaio_zone);
- uma_zdestroy(aiop_zone);
- uma_zdestroy(aiocb_zone);
- uma_zdestroy(aiol_zone);
- uma_zdestroy(aiolio_zone);
- EVENTHANDLER_DEREGISTER(process_exit, exit_tag);
- EVENTHANDLER_DEREGISTER(process_exec, exec_tag);
- mtx_destroy(&aio_job_mtx);
- sema_destroy(&aio_newproc_sem);
- p31b_setcfg(CTL_P1003_1B_AIO_LISTIO_MAX, -1);
- p31b_setcfg(CTL_P1003_1B_AIO_MAX, -1);
- p31b_setcfg(CTL_P1003_1B_AIO_PRIO_DELTA_MAX, -1);
- return (0);
-}
-
-/*
* Init the per-process aioinfo structure. The aioinfo limits are set
* per-process for user limit (resource) management.
*/
@@ -582,10 +484,11 @@
TAILQ_INIT(&ki->kaio_all);
TAILQ_INIT(&ki->kaio_done);
TAILQ_INIT(&ki->kaio_jobqueue);
- TAILQ_INIT(&ki->kaio_bufqueue);
TAILQ_INIT(&ki->kaio_liojoblist);
TAILQ_INIT(&ki->kaio_syncqueue);
+ TAILQ_INIT(&ki->kaio_syncready);
TASK_INIT(&ki->kaio_task, 0, aio_kick_helper, p);
+ TASK_INIT(&ki->kaio_sync_task, 0, aio_schedule_fsync, ki);
PROC_LOCK(p);
if (p->p_aioinfo == NULL) {
p->p_aioinfo = ki;
@@ -637,7 +540,7 @@
MPASS(ki != NULL);
AIO_LOCK_ASSERT(ki, MA_OWNED);
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
atomic_subtract_int(&num_queue_count, 1);
@@ -670,7 +573,6 @@
PROC_UNLOCK(p);
MPASS(job->bp == NULL);
- job->jobstate = JOBST_NULL;
AIO_UNLOCK(ki);
/*
@@ -709,6 +611,57 @@
aio_proc_rundown(arg, p);
}
+static int
+aio_cancel_job(struct proc *p, struct kaioinfo *ki, struct kaiocb *job)
+{
+ aio_cancel_fn_t *func;
+ int cancelled;
+
+ AIO_LOCK_ASSERT(ki, MA_OWNED);
+ if (job->jobflags & (KAIOCB_CANCELLED | KAIOCB_FINISHED))
+ return (0);
+ MPASS((job->jobflags & KAIOCB_CANCELLING) == 0);
+ job->jobflags |= KAIOCB_CANCELLED;
+
+ func = job->cancel_fn;
+
+ /*
+ * If there is no cancel routine, just leave the job marked as
+ * cancelled. The job should be in active use by a caller who
+ * should complete it normally or when it fails to install a
+ * cancel routine.
+ */
+ if (func == NULL)
+ return (0);
+
+ /*
+ * Set the CANCELLING flag so that aio_complete() will defer
+ * completions of this job. This prevents the job from being
+ * freed out from under the cancel callback. After the
+ * callback any deferred completion (whether from the callback
+ * or any other source) will be completed.
+ */
+ job->jobflags |= KAIOCB_CANCELLING;
+ AIO_UNLOCK(ki);
+ func(job);
+ AIO_LOCK(ki);
+ job->jobflags &= ~KAIOCB_CANCELLING;
+ if (job->jobflags & KAIOCB_FINISHED) {
+ cancelled = job->uaiocb._aiocb_private.error == ECANCELED;
+ TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
+ aio_bio_done_notify(p, job);
+ } else {
+ /*
+ * The cancel callback might have scheduled an
+ * operation to cancel this request, but it is
+ * only counted as cancelled if the request is
+ * cancelled when the callback returns.
+ */
+ cancelled = 0;
+ }
+ return (cancelled);
+}
+
/*
* Rundown the jobs for a given process.
*/
@@ -718,9 +671,6 @@
struct kaioinfo *ki;
struct aioliojob *lj;
struct kaiocb *job, *jobn;
- struct file *fp;
- struct socket *so;
- int remove;
KASSERT(curthread->td_proc == p,
("%s: called on non-curproc", __func__));
@@ -738,35 +688,11 @@
* aio_cancel on all pending I/O requests.
*/
TAILQ_FOREACH_SAFE(job, &ki->kaio_jobqueue, plist, jobn) {
- remove = 0;
- mtx_lock(&aio_job_mtx);
- if (job->jobstate == JOBST_JOBQGLOBAL) {
- TAILQ_REMOVE(&aio_jobs, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSOCK) {
- fp = job->fd_file;
- MPASS(fp->f_type == DTYPE_SOCKET);
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSYNC) {
- TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
- remove = 1;
- }
- mtx_unlock(&aio_job_mtx);
-
- if (remove) {
- job->jobstate = JOBST_JOBFINISHED;
- job->uaiocb._aiocb_private.status = -1;
- job->uaiocb._aiocb_private.error = ECANCELED;
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- aio_bio_done_notify(p, job, DONE_QUEUE);
- }
+ aio_cancel_job(p, ki, job);
}
/* Wait for all running I/O to be finished */
- if (TAILQ_FIRST(&ki->kaio_bufqueue) ||
- TAILQ_FIRST(&ki->kaio_jobqueue)) {
+ if (TAILQ_FIRST(&ki->kaio_jobqueue) || ki->kaio_active_count != 0) {
ki->kaio_flags |= KAIO_WAKEUP;
msleep(&p->p_aioinfo, AIO_MTX(ki), PRIBIO, "aioprn", hz);
goto restart;
@@ -791,6 +717,7 @@
}
AIO_UNLOCK(ki);
taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_task);
+ taskqueue_drain(taskqueue_aiod_kick, &ki->kaio_sync_task);
mtx_destroy(&ki->kaio_mtx);
uma_zfree(kaio_zone, ki);
p->p_aioinfo = NULL;
@@ -807,15 +734,18 @@
struct proc *userp;
mtx_assert(&aio_job_mtx, MA_OWNED);
+restart:
TAILQ_FOREACH(job, &aio_jobs, list) {
userp = job->userproc;
ki = userp->p_aioinfo;
if (ki->kaio_active_count < ki->kaio_maxactive_count) {
TAILQ_REMOVE(&aio_jobs, job, list);
+ if (!aio_clear_cancel_function(job))
+ goto restart;
+
/* Account for currently active jobs. */
ki->kaio_active_count++;
- job->jobstate = JOBST_JOBRUNNING;
break;
}
}
@@ -863,7 +793,6 @@
struct thread *td;
struct aiocb *cb;
struct file *fp;
- struct socket *so;
struct uio auio;
struct iovec aiov;
int cnt;
@@ -875,6 +804,7 @@
job->uaiocb.aio_lio_opcode == LIO_WRITE,
("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode));
+ aio_switch_vmspace(job);
td = curthread;
td_savedcred = td->td_ucred;
td->td_ucred = job->cred;
@@ -920,24 +850,15 @@
if (error == ERESTART || error == EINTR || error == EWOULDBLOCK)
error = 0;
if ((error == EPIPE) && (cb->aio_lio_opcode == LIO_WRITE)) {
- int sigpipe = 1;
- if (fp->f_type == DTYPE_SOCKET) {
- so = fp->f_data;
- if (so->so_options & SO_NOSIGPIPE)
- sigpipe = 0;
- }
- if (sigpipe) {
- PROC_LOCK(job->userproc);
- kern_psignal(job->userproc, SIGPIPE);
- PROC_UNLOCK(job->userproc);
- }
+ PROC_LOCK(job->userproc);
+ kern_psignal(job->userproc, SIGPIPE);
+ PROC_UNLOCK(job->userproc);
}
}
cnt -= auio.uio_resid;
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = cnt;
td->td_ucred = td_savedcred;
+ aio_complete(job, cnt, error);
}
static void
@@ -945,7 +866,6 @@
{
struct thread *td = curthread;
struct ucred *td_savedcred = td->td_ucred;
- struct aiocb *cb = &job->uaiocb;
struct file *fp = job->fd_file;
int error = 0;
@@ -955,9 +875,8 @@
td->td_ucred = job->cred;
if (fp->f_vnode != NULL)
error = aio_fsync_vnode(td, fp->f_vnode);
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = 0;
td->td_ucred = td_savedcred;
+ aio_complete(job, 0, error);
}
static void
@@ -969,19 +888,20 @@
KASSERT(job->uaiocb.aio_lio_opcode == LIO_MLOCK,
("%s: opcode %d", __func__, job->uaiocb.aio_lio_opcode));
+ aio_switch_vmspace(job);
error = vm_mlock(job->userproc, job->cred,
__DEVOLATILE(void *, cb->aio_buf), cb->aio_nbytes);
- cb->_aiocb_private.error = error;
- cb->_aiocb_private.status = 0;
+ aio_complete(job, 0, error);
}
static void
-aio_bio_done_notify(struct proc *userp, struct kaiocb *job, int type)
+aio_bio_done_notify(struct proc *userp, struct kaiocb *job)
{
struct aioliojob *lj;
struct kaioinfo *ki;
struct kaiocb *sjob, *sjobn;
int lj_done;
+ bool schedule_fsync;
ki = userp->p_aioinfo;
AIO_LOCK_ASSERT(ki, MA_OWNED);
@@ -992,13 +912,8 @@
if (lj->lioj_count == lj->lioj_finished_count)
lj_done = 1;
}
- if (type == DONE_QUEUE) {
- job->jobflags |= KAIOCB_DONE;
- } else {
- job->jobflags |= KAIOCB_BUFDONE;
- }
TAILQ_INSERT_TAIL(&ki->kaio_done, job, plist);
- job->jobstate = JOBST_JOBFINISHED;
+ MPASS(job->jobflags & KAIOCB_FINISHED);
if (ki->kaio_flags & KAIO_RUNDOWN)
goto notification_done;
@@ -1025,20 +940,24 @@
notification_done:
if (job->jobflags & KAIOCB_CHECKSYNC) {
+ schedule_fsync = false;
TAILQ_FOREACH_SAFE(sjob, &ki->kaio_syncqueue, list, sjobn) {
if (job->fd_file == sjob->fd_file &&
job->seqno < sjob->seqno) {
if (--sjob->pending == 0) {
- mtx_lock(&aio_job_mtx);
- sjob->jobstate = JOBST_JOBQGLOBAL;
TAILQ_REMOVE(&ki->kaio_syncqueue, sjob,
list);
- TAILQ_INSERT_TAIL(&aio_jobs, sjob, list);
- aio_kick_nowait(userp);
- mtx_unlock(&aio_job_mtx);
+ if (!aio_clear_cancel_function(sjob))
+ continue;
+ TAILQ_INSERT_TAIL(&ki->kaio_syncready,
+ sjob, list);
+ schedule_fsync = true;
}
}
}
+ if (schedule_fsync)
+ taskqueue_enqueue(taskqueue_aiod_kick,
+ &ki->kaio_sync_task);
}
if (ki->kaio_flags & KAIO_WAKEUP) {
ki->kaio_flags &= ~KAIO_WAKEUP;
@@ -1047,6 +966,103 @@
}
static void
+aio_schedule_fsync(void *context, int pending)
+{
+ struct kaioinfo *ki;
+ struct kaiocb *job;
+
+ ki = context;
+ AIO_LOCK(ki);
+ while (!TAILQ_EMPTY(&ki->kaio_syncready)) {
+ job = TAILQ_FIRST(&ki->kaio_syncready);
+ TAILQ_REMOVE(&ki->kaio_syncready, job, list);
+ AIO_UNLOCK(ki);
+ aio_schedule(job, aio_process_sync);
+ AIO_LOCK(ki);
+ }
+ AIO_UNLOCK(ki);
+}
+
+bool
+aio_cancel_cleared(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ /*
+ * The caller should hold the same queue lock held when
+ * aio_clear_cancel_function() was called and set this flag
+ * ensuring this check sees an up-to-date value. However,
+ * there is no way to assert that.
+ */
+ ki = job->userproc->p_aioinfo;
+ return ((job->jobflags & KAIOCB_CLEARED) != 0);
+}
+
+bool
+aio_clear_cancel_function(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ AIO_LOCK(ki);
+ MPASS(job->cancel_fn != NULL);
+ if (job->jobflags & KAIOCB_CANCELLING) {
+ job->jobflags |= KAIOCB_CLEARED;
+ AIO_UNLOCK(ki);
+ return (false);
+ }
+ job->cancel_fn = NULL;
+ AIO_UNLOCK(ki);
+ return (true);
+}
+
+bool
+aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ AIO_LOCK(ki);
+ if (job->jobflags & KAIOCB_CANCELLED) {
+ AIO_UNLOCK(ki);
+ return (false);
+ }
+ job->cancel_fn = func;
+ AIO_UNLOCK(ki);
+ return (true);
+}
+
+void
+aio_complete(struct kaiocb *job, long status, int error)
+{
+ struct kaioinfo *ki;
+ struct proc *userp;
+
+ job->uaiocb._aiocb_private.error = error;
+ job->uaiocb._aiocb_private.status = status;
+
+ userp = job->userproc;
+ ki = userp->p_aioinfo;
+
+ AIO_LOCK(ki);
+ KASSERT(!(job->jobflags & KAIOCB_FINISHED),
+ ("duplicate aio_complete"));
+ job->jobflags |= KAIOCB_FINISHED;
+ if ((job->jobflags & (KAIOCB_QUEUEING | KAIOCB_CANCELLING)) == 0) {
+ TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
+ aio_bio_done_notify(userp, job);
+ }
+ AIO_UNLOCK(ki);
+}
+
+void
+aio_cancel(struct kaiocb *job)
+{
+
+ aio_complete(job, -1, ECANCELED);
+}
+
+void
aio_switch_vmspace(struct kaiocb *job)
{
@@ -1063,7 +1079,7 @@
struct kaiocb *job;
struct aioproc *aiop;
struct kaioinfo *ki;
- struct proc *p, *userp;
+ struct proc *p;
struct vmspace *myvm;
struct thread *td = curthread;
int id = (intptr_t)_id;
@@ -1107,40 +1123,13 @@
*/
while ((job = aio_selectjob(aiop)) != NULL) {
mtx_unlock(&aio_job_mtx);
- userp = job->userproc;
-
- /*
- * Connect to process address space for user program.
- */
- aio_switch_vmspace(job);
- ki = userp->p_aioinfo;
-
- /* Do the I/O function. */
- switch(job->uaiocb.aio_lio_opcode) {
- case LIO_READ:
- case LIO_WRITE:
- aio_process_rw(job);
- break;
- case LIO_SYNC:
- aio_process_sync(job);
- break;
- case LIO_MLOCK:
- aio_process_mlock(job);
- break;
- }
+ ki = job->userproc->p_aioinfo;
+ job->handle_fn(job);
mtx_lock(&aio_job_mtx);
/* Decrement the active job count. */
ki->kaio_active_count--;
- mtx_unlock(&aio_job_mtx);
-
- AIO_LOCK(ki);
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- aio_bio_done_notify(userp, job, DONE_QUEUE);
- AIO_UNLOCK(ki);
-
- mtx_lock(&aio_job_mtx);
}
/*
@@ -1236,7 +1225,6 @@
struct cdevsw *csw;
struct cdev *dev;
struct kaioinfo *ki;
- struct aioliojob *lj;
int error, ref, unmap, poff;
vm_prot_t prot;
@@ -1293,16 +1281,8 @@
}
AIO_LOCK(ki);
- ki->kaio_count++;
if (!unmap)
ki->kaio_buffer_count++;
- lj = job->lio;
- if (lj)
- lj->lioj_count++;
- TAILQ_INSERT_TAIL(&ki->kaio_bufqueue, job, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
- job->jobstate = JOBST_JOBQBUF;
- cb->_aiocb_private.status = cb->aio_nbytes;
AIO_UNLOCK(ki);
bp->bio_length = cb->aio_nbytes;
@@ -1336,7 +1316,6 @@
bp->bio_flags |= BIO_UNMAPPED;
}
- atomic_add_int(&num_queue_count, 1);
if (!unmap)
atomic_add_int(&num_buf_aio, 1);
@@ -1347,14 +1326,8 @@
doerror:
AIO_LOCK(ki);
- job->jobstate = JOBST_NULL;
- TAILQ_REMOVE(&ki->kaio_bufqueue, job, plist);
- TAILQ_REMOVE(&ki->kaio_all, job, allist);
- ki->kaio_count--;
if (!unmap)
ki->kaio_buffer_count--;
- if (lj)
- lj->lioj_count--;
AIO_UNLOCK(ki);
if (pbuf) {
relpbuf(pbuf, NULL);
@@ -1367,40 +1340,6 @@
return (error);
}
-/*
- * Wake up aio requests that may be serviceable now.
- */
-static void
-aio_swake_cb(struct socket *so, struct sockbuf *sb)
-{
- struct kaiocb *job, *jobn;
- int opcode;
-
- SOCKBUF_LOCK_ASSERT(sb);
- if (sb == &so->so_snd)
- opcode = LIO_WRITE;
- else
- opcode = LIO_READ;
-
- sb->sb_flags &= ~SB_AIO;
- mtx_lock(&aio_job_mtx);
- TAILQ_FOREACH_SAFE(job, &so->so_aiojobq, list, jobn) {
- if (opcode == job->uaiocb.aio_lio_opcode) {
- if (job->jobstate != JOBST_JOBQSOCK)
- panic("invalid queue value");
- /* XXX
- * We don't have actual sockets backend yet,
- * so we simply move the requests to the generic
- * file I/O backend.
- */
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- TAILQ_INSERT_TAIL(&aio_jobs, job, list);
- aio_kick_nowait(job->userproc);
- }
- }
- mtx_unlock(&aio_job_mtx);
-}
-
static int
convert_old_sigevent(struct osigevent *osig, struct sigevent *nsig)
{
@@ -1521,11 +1460,9 @@
struct proc *p = td->td_proc;
cap_rights_t rights;
struct file *fp;
- struct socket *so;
- struct kaiocb *job, *job2;
+ struct kaiocb *job;
struct kaioinfo *ki;
struct kevent kev;
- struct sockbuf *sb;
int opcode;
int error;
int fd, kqfd;
@@ -1668,86 +1605,128 @@
kev.data = (intptr_t)job;
kev.udata = job->uaiocb.aio_sigevent.sigev_value.sival_ptr;
error = kqfd_register(kqfd, &kev, td, 1);
-aqueue_fail:
- if (error) {
- if (fp)
- fdrop(fp, td);
- uma_zfree(aiocb_zone, job);
- ops->store_error(ujob, error);
- goto done;
- }
+ if (error)
+ goto aqueue_fail;
+
no_kqueue:
ops->store_error(ujob, EINPROGRESS);
job->uaiocb._aiocb_private.error = EINPROGRESS;
job->userproc = p;
job->cred = crhold(td->td_ucred);
- job->jobflags = 0;
+ job->jobflags = KAIOCB_QUEUEING;
job->lio = lj;
- if (opcode == LIO_SYNC)
- goto queueit;
+ if (opcode == LIO_MLOCK) {
+ aio_schedule(job, aio_process_mlock);
+ error = 0;
+ } else if (fp->f_ops->fo_aio_queue == NULL)
+ error = aio_queue_file(fp, job);
+ else
+ error = fo_aio_queue(fp, job);
+ if (error)
+ goto aqueue_fail;
- if (fp && fp->f_type == DTYPE_SOCKET) {
+ AIO_LOCK(ki);
+ job->jobflags &= ~KAIOCB_QUEUEING;
+ TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
+ ki->kaio_count++;
+ if (lj)
+ lj->lioj_count++;
+ atomic_add_int(&num_queue_count, 1);
+ if (job->jobflags & KAIOCB_FINISHED) {
/*
- * Alternate queueing for socket ops: Reach down into the
- * descriptor to get the socket data. Then check to see if the
- * socket is ready to be read or written (based on the requested
- * operation).
- *
- * If it is not ready for io, then queue the job on the
- * socket, and set the flags so we get a call when sbnotify()
- * happens.
- *
- * Note if opcode is neither LIO_WRITE nor LIO_READ we lock
- * and unlock the snd sockbuf for no reason.
+ * The queue callback completed the request synchronously.
+ * The bulk of the completion is deferred in that case
+ * until this point.
*/
- so = fp->f_data;
- sb = (opcode == LIO_READ) ? &so->so_rcv : &so->so_snd;
- SOCKBUF_LOCK(sb);
- if (((opcode == LIO_READ) && (!soreadable(so))) || ((opcode ==
- LIO_WRITE) && (!sowriteable(so)))) {
- sb->sb_flags |= SB_AIO;
+ aio_bio_done_notify(p, job);
+ } else
+ TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
+ AIO_UNLOCK(ki);
+ return (0);
- mtx_lock(&aio_job_mtx);
- TAILQ_INSERT_TAIL(&so->so_aiojobq, job, list);
- mtx_unlock(&aio_job_mtx);
+aqueue_fail:
+ knlist_delete(&job->klist, curthread, 0);
+ if (fp)
+ fdrop(fp, td);
+ uma_zfree(aiocb_zone, job);
+ ops->store_error(ujob, error);
+ return (error);
+}
- AIO_LOCK(ki);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
- job->jobstate = JOBST_JOBQSOCK;
- ki->kaio_count++;
- if (lj)
- lj->lioj_count++;
- AIO_UNLOCK(ki);
- SOCKBUF_UNLOCK(sb);
- atomic_add_int(&num_queue_count, 1);
- error = 0;
- goto done;
- }
- SOCKBUF_UNLOCK(sb);
+static void
+aio_cancel_daemon_job(struct kaiocb *job)
+{
+
+ mtx_lock(&aio_job_mtx);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&aio_jobs, job, list);
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+}
+
+void
+aio_schedule(struct kaiocb *job, aio_handle_fn_t *func)
+{
+
+ mtx_lock(&aio_job_mtx);
+ if (!aio_set_cancel_function(job, aio_cancel_daemon_job)) {
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+ return;
}
+ job->handle_fn = func;
+ TAILQ_INSERT_TAIL(&aio_jobs, job, list);
+ aio_kick_nowait(job->userproc);
+ mtx_unlock(&aio_job_mtx);
+}
+
+static void
+aio_cancel_sync(struct kaiocb *job)
+{
+ struct kaioinfo *ki;
+
+ ki = job->userproc->p_aioinfo;
+ mtx_lock(&aio_job_mtx);
+ if (!aio_cancel_cleared(job))
+ TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
+ mtx_unlock(&aio_job_mtx);
+ aio_cancel(job);
+}
+
+int
+aio_queue_file(struct file *fp, struct kaiocb *job)
+{
+ struct aioliojob *lj;
+ struct kaioinfo *ki;
+ struct kaiocb *job2;
+ int error, opcode;
+
+ lj = job->lio;
+ ki = job->userproc->p_aioinfo;
+ opcode = job->uaiocb.aio_lio_opcode;
+ if (opcode == LIO_SYNC)
+ goto queueit;
- if ((error = aio_qphysio(p, job)) == 0)
+ if ((error = aio_qphysio(job->userproc, job)) == 0)
goto done;
#if 0
- if (error > 0) {
- job->uaiocb._aiocb_private.error = error;
- ops->store_error(ujob, error);
+ /*
+ * XXX: This means qphysio() failed with EFAULT. The current
+ * behavior is to retry the operation via fo_read/fo_write.
+ * Wouldn't it be better to just complete the request with an
+ * error here?
+ */
+ if (error > 0)
goto done;
- }
#endif
queueit:
- atomic_add_int(&num_queue_count, 1);
+ if (!enable_aio_unsafe)
+ return (EOPNOTSUPP);
- AIO_LOCK(ki);
- ki->kaio_count++;
- if (lj)
- lj->lioj_count++;
- TAILQ_INSERT_TAIL(&ki->kaio_jobqueue, job, plist);
- TAILQ_INSERT_TAIL(&ki->kaio_all, job, allist);
if (opcode == LIO_SYNC) {
+ AIO_LOCK(ki);
TAILQ_FOREACH(job2, &ki->kaio_jobqueue, plist) {
if (job2->fd_file == job->fd_file &&
job2->uaiocb.aio_lio_opcode != LIO_SYNC &&
@@ -1756,28 +1735,32 @@
job->pending++;
}
}
- TAILQ_FOREACH(job2, &ki->kaio_bufqueue, plist) {
- if (job2->fd_file == job->fd_file &&
- job2->uaiocb.aio_lio_opcode != LIO_SYNC &&
- job2->seqno < job->seqno) {
- job2->jobflags |= KAIOCB_CHECKSYNC;
- job->pending++;
- }
- }
if (job->pending != 0) {
+ if (!aio_set_cancel_function(job, aio_cancel_sync)) {
+ AIO_UNLOCK(ki);
+ aio_cancel(job);
+ return (0);
+ }
TAILQ_INSERT_TAIL(&ki->kaio_syncqueue, job, list);
- job->jobstate = JOBST_JOBQSYNC;
AIO_UNLOCK(ki);
- goto done;
+ return (0);
}
+ AIO_UNLOCK(ki);
+ }
+
+ switch (opcode) {
+ case LIO_READ:
+ case LIO_WRITE:
+ aio_schedule(job, aio_process_rw);
+ error = 0;
+ break;
+ case LIO_SYNC:
+ aio_schedule(job, aio_process_sync);
+ error = 0;
+ break;
+ default:
+ error = EINVAL;
}
- mtx_lock(&aio_job_mtx);
- TAILQ_INSERT_TAIL(&aio_jobs, job, list);
- job->jobstate = JOBST_JOBQGLOBAL;
- aio_kick_nowait(p);
- mtx_unlock(&aio_job_mtx);
- AIO_UNLOCK(ki);
- error = 0;
done:
return (error);
}
@@ -1864,7 +1847,7 @@
break;
}
if (job != NULL) {
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
td->td_retval[0] = status;
@@ -1933,7 +1916,7 @@
if (job->ujob == ujoblist[i]) {
if (firstjob == NULL)
firstjob = job;
- if (job->jobstate == JOBST_JOBFINISHED)
+ if (job->jobflags & KAIOCB_FINISHED)
goto RETURN;
}
}
@@ -1992,10 +1975,8 @@
struct kaioinfo *ki;
struct kaiocb *job, *jobn;
struct file *fp;
- struct socket *so;
cap_rights_t rights;
int error;
- int remove;
int cancelled = 0;
int notcancelled = 0;
struct vnode *vp;
@@ -2023,28 +2004,7 @@
if ((uap->fd == job->uaiocb.aio_fildes) &&
((uap->aiocbp == NULL) ||
(uap->aiocbp == job->ujob))) {
- remove = 0;
-
- mtx_lock(&aio_job_mtx);
- if (job->jobstate == JOBST_JOBQGLOBAL) {
- TAILQ_REMOVE(&aio_jobs, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSOCK) {
- MPASS(fp->f_type == DTYPE_SOCKET);
- so = fp->f_data;
- TAILQ_REMOVE(&so->so_aiojobq, job, list);
- remove = 1;
- } else if (job->jobstate == JOBST_JOBQSYNC) {
- TAILQ_REMOVE(&ki->kaio_syncqueue, job, list);
- remove = 1;
- }
- mtx_unlock(&aio_job_mtx);
-
- if (remove) {
- TAILQ_REMOVE(&ki->kaio_jobqueue, job, plist);
- job->uaiocb._aiocb_private.status = -1;
- job->uaiocb._aiocb_private.error = ECANCELED;
- aio_bio_done_notify(p, job, DONE_QUEUE);
+ if (aio_cancel_job(p, ki, job)) {
cancelled++;
} else {
notcancelled++;
@@ -2102,7 +2062,7 @@
AIO_LOCK(ki);
TAILQ_FOREACH(job, &ki->kaio_all, allist) {
if (job->ujob == ujob) {
- if (job->jobstate == JOBST_JOBFINISHED)
+ if (job->jobflags & KAIOCB_FINISHED)
td->td_retval[0] =
job->uaiocb._aiocb_private.error;
else
@@ -2382,35 +2342,36 @@
struct kaiocb *job = (struct kaiocb *)bp->bio_caller1;
struct proc *userp;
struct kaioinfo *ki;
- int nblks;
+ size_t nbytes;
+ int error, nblks;
/* Release mapping into kernel space. */
+ userp = job->userproc;
+ ki = userp->p_aioinfo;
if (job->pbuf) {
pmap_qremove((vm_offset_t)job->pbuf->b_data, job->npages);
relpbuf(job->pbuf, NULL);
job->pbuf = NULL;
atomic_subtract_int(&num_buf_aio, 1);
+ AIO_LOCK(ki);
+ ki->kaio_buffer_count--;
+ AIO_UNLOCK(ki);
}
vm_page_unhold_pages(job->pages, job->npages);
bp = job->bp;
job->bp = NULL;
- userp = job->userproc;
- ki = userp->p_aioinfo;
- AIO_LOCK(ki);
- job->uaiocb._aiocb_private.status -= bp->bio_resid;
- job->uaiocb._aiocb_private.error = 0;
+ nbytes = job->uaiocb.aio_nbytes - bp->bio_resid;
+ error = 0;
if (bp->bio_flags & BIO_ERROR)
- job->uaiocb._aiocb_private.error = bp->bio_error;
- nblks = btodb(job->uaiocb.aio_nbytes);
+ error = bp->bio_error;
+ nblks = btodb(nbytes);
if (job->uaiocb.aio_lio_opcode == LIO_WRITE)
job->outputcharge += nblks;
else
job->inputcharge += nblks;
- TAILQ_REMOVE(&userp->p_aioinfo->kaio_bufqueue, job, plist);
- ki->kaio_buffer_count--;
- aio_bio_done_notify(userp, job, DONE_BUF);
- AIO_UNLOCK(ki);
+
+ aio_complete(job, nbytes, error);
g_destroy_bio(bp);
}
@@ -2465,7 +2426,7 @@
}
if (job != NULL) {
- MPASS(job->jobstate == JOBST_JOBFINISHED);
+ MPASS(job->jobflags & KAIOCB_FINISHED);
ujob = job->ujob;
status = job->uaiocb._aiocb_private.status;
error = job->uaiocb._aiocb_private.error;
@@ -2570,7 +2531,7 @@
struct kaiocb *job = kn->kn_ptr.p_aio;
kn->kn_data = job->uaiocb._aiocb_private.error;
- if (job->jobstate != JOBST_JOBFINISHED)
+ if (!(job->jobflags & KAIOCB_FINISHED))
return (0);
kn->kn_flags |= EV_EOF;
return (1);
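
End of the vfs_aio.c rework. One behavior worth illustrating (a sketch only, not part of the diff): the kaio_syncqueue/kaio_syncready machinery above holds an aio_fsync(2) back until every earlier write on the same file has completed. Note that with this change, AIO on regular files fails with EOPNOTSUPP from aio_queue_file() unless vfs.aio.enable_unsafe is set, so the example assumes that knob is on:

#include <aio.h>
#include <err.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

int
main(void)
{
	struct aiocb wr, fs, *done;
	static char msg[] = "ordered write\n";
	int fd, i;

	if ((fd = open("aio_demo.dat", O_RDWR | O_CREAT, 0644)) == -1)
		err(1, "open");

	memset(&wr, 0, sizeof(wr));
	wr.aio_fildes = fd;
	wr.aio_buf = msg;
	wr.aio_nbytes = sizeof(msg) - 1;
	if (aio_write(&wr) == -1)
		err(1, "aio_write");

	/* Queued behind the write via kaio_syncqueue (KAIOCB_CHECKSYNC). */
	memset(&fs, 0, sizeof(fs));
	fs.aio_fildes = fd;
	if (aio_fsync(O_SYNC, &fs) == -1)
		err(1, "aio_fsync");

	/* Reap both jobs; the fsync cannot finish before the write. */
	for (i = 0; i < 2; i++)
		if (aio_waitcomplete(&done, NULL) == -1)
			err(1, "aio_waitcomplete");
	close(fd);
	return (0);
}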
Index: sys/modules/Makefile
===================================================================
--- sys/modules/Makefile
+++ sys/modules/Makefile
@@ -31,7 +31,6 @@
ahci \
${_aic} \
aic7xxx \
- aio \
alc \
ale \
alq \
Index: sys/modules/aio/Makefile
===================================================================
--- sys/modules/aio/Makefile
+++ /dev/null
@@ -1,10 +0,0 @@
-# $FreeBSD$
-
-.PATH: ${.CURDIR}/../../kern
-
-KMOD= aio
-SRCS= vfs_aio.c opt_vfs_aio.h vnode_if.h opt_compat.h
-
-EXPORT_SYMS= aio_init_aioinfo aio_aqueue
-
-.include <bsd.kmod.mk>
Index: sys/sys/aio.h
===================================================================
--- sys/sys/aio.h
+++ sys/sys/aio.h
@@ -21,6 +21,11 @@
#include <sys/types.h>
#include <sys/signal.h>
+#ifdef _KERNEL
+#include <sys/queue.h>
+#include <sys/event.h>
+#include <sys/signalvar.h>
+#endif
/*
* Returned by aio_cancel:
@@ -51,6 +56,24 @@
*/
#define AIO_LISTIO_MAX 16
+#ifdef _KERNEL
+
+/* Default values of tunables for the AIO worker pool. */
+
+#ifndef MAX_AIO_PROCS
+#define MAX_AIO_PROCS 32
+#endif
+
+#ifndef TARGET_AIO_PROCS
+#define TARGET_AIO_PROCS 4
+#endif
+
+#ifndef AIOD_LIFETIME_DEFAULT
+#define AIOD_LIFETIME_DEFAULT (30 * hz)
+#endif
+
+#endif
+
/*
* Private members for aiocb -- don't access
* directly.
@@ -77,7 +100,91 @@
struct sigevent aio_sigevent; /* Signal to deliver */
} aiocb_t;
-#ifndef _KERNEL
+#ifdef _KERNEL
+
+typedef void aio_cancel_fn_t(struct kaiocb *);
+typedef void aio_handle_fn_t(struct kaiocb *);
+
+/*
+ * Kernel version of an I/O control block.
+ *
+ * Locking key:
+ * * - need not be protected
+ * a - locked by kaioinfo lock
+ * b - locked by backend lock
+ * c - locked by aio_job_mtx
+ */
+struct kaiocb {
+ TAILQ_ENTRY(kaiocb) list; /* (b) backend-specific list of jobs */
+ TAILQ_ENTRY(kaiocb) plist; /* (a) lists of pending / done jobs */
+ TAILQ_ENTRY(kaiocb) allist; /* (a) list of all jobs in proc */
+ int jobflags; /* (a) job flags */
+ int inputcharge; /* (*) input blocks */
+ int outputcharge; /* (*) output blocks */
+ struct bio *bp; /* (*) BIO backend BIO pointer */
+ struct buf *pbuf; /* (*) BIO backend buffer pointer */
+ struct vm_page *pages[btoc(MAXPHYS)+1]; /* BIO backend pages */
+ int npages; /* BIO backend number of pages */
+ struct proc *userproc; /* (*) user process */
+ struct ucred *cred; /* (*) active credential when created */
+ struct file *fd_file; /* (*) pointer to file structure */
+ struct aioliojob *lio; /* (*) optional lio job */
+ struct aiocb *ujob; /* (*) pointer in userspace of aiocb */
+ struct knlist klist; /* (a) list of knotes */
+ struct aiocb uaiocb; /* (*) copy of user I/O control block */
+ ksiginfo_t ksi; /* (a) realtime signal info */
+ uint64_t seqno; /* (*) job number */
+ int pending; /* (a) number of pending I/O, aio_fsync only */
+ aio_cancel_fn_t *cancel_fn; /* (a) backend cancel function */
+ aio_handle_fn_t *handle_fn; /* (c) backend handle function */
+};
+
+struct socket;
+struct sockbuf;
+
+/*
+ * AIO backends should permit cancellation of queued requests waiting to
+ * be serviced by installing a cancel routine while the request is
+ * queued. The cancellation routine should dequeue the request if
+ * necessary and cancel it. Care must be used to handle races between
+ * queueing and dequeueing requests and cancellation.
+ *
+ * When queueing a request somewhere such that it can be cancelled, the
+ * caller should:
+ *
+ * 1) Acquire lock that protects the associated queue.
+ * 2) Call aio_set_cancel_function() to install the cancel routine.
+ * 3) If that fails, the request has a pending cancel and should be
+ * cancelled via aio_cancel().
+ * 4) Queue the request.
+ *
+ * When dequeueing a request to service it or hand it off to somewhere else,
+ * the caller should:
+ *
+ * 1) Acquire the lock that protects the associated queue.
+ * 2) Dequeue the request.
+ * 3) Call aio_clear_cancel_function() to clear the cancel routine.
+ * 4) If that fails, the cancel routine is about to be called. The
+ * caller should ignore the request.
+ *
+ * The cancel routine should:
+ *
+ * 1) Acquire the lock that protects the associated queue.
+ * 2) Call aio_cancel_cleared() to determine if the request is already
+ * dequeued due to a race with dequeueing thread.
+ * 3) If that fails, dequeue the request.
+ * 4) Cancel the request via aio_cancel().
+ */
+
+bool aio_cancel_cleared(struct kaiocb *job);
+void aio_cancel(struct kaiocb *job);
+bool aio_clear_cancel_function(struct kaiocb *job);
+void aio_complete(struct kaiocb *job, long status, int error);
+void aio_schedule(struct kaiocb *job, aio_handle_fn_t *func);
+bool aio_set_cancel_function(struct kaiocb *job, aio_cancel_fn_t *func);
+void aio_switch_vmspace(struct kaiocb *job);
+
+#else /* !_KERNEL */
struct timespec;
@@ -137,14 +244,6 @@
int aio_fsync(int op, struct aiocb *aiocbp);
__END_DECLS
-#else
-
-/* Forward declarations for prototypes below. */
-struct socket;
-struct sockbuf;
-
-extern void (*aio_swake)(struct socket *, struct sockbuf *);
+#endif /* !_KERNEL */
-#endif
-
-#endif
+#endif /* !_SYS_AIO_H_ */
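
The comment block added above is the contract every backend must follow. A compressed sketch of a hypothetical backend (all mybe_* names are invented here; only the aio_* calls come from this header) shows the three recipes side by side, mirroring what soo_aio_queue(), soo_aio_cancel(), and soaio_process_sb() do in the sys_socket.c hunk:

static TAILQ_HEAD(, kaiocb) mybe_jobs = TAILQ_HEAD_INITIALIZER(mybe_jobs);
static struct mtx mybe_lock;	/* mtx_init() at attach time, not shown */

static void
mybe_cancel(struct kaiocb *job)
{

	mtx_lock(&mybe_lock);
	if (!aio_cancel_cleared(job))	/* raced with a dequeue? */
		TAILQ_REMOVE(&mybe_jobs, job, list);
	mtx_unlock(&mybe_lock);
	aio_cancel(job);
}

static int
mybe_queue(struct file *fp, struct kaiocb *job)
{

	mtx_lock(&mybe_lock);
	if (!aio_set_cancel_function(job, mybe_cancel)) {
		/* A cancel is already pending for this job. */
		mtx_unlock(&mybe_lock);
		aio_cancel(job);
		return (0);
	}
	TAILQ_INSERT_TAIL(&mybe_jobs, job, list);
	mtx_unlock(&mybe_lock);
	return (0);
}

static void
mybe_service_one(void)
{
	struct kaiocb *job;

	mtx_lock(&mybe_lock);
	while ((job = TAILQ_FIRST(&mybe_jobs)) != NULL) {
		TAILQ_REMOVE(&mybe_jobs, job, list);
		if (!aio_clear_cancel_function(job))
			continue;	/* the cancel routine owns it */
		mtx_unlock(&mybe_lock);
		aio_complete(job, 0, 0);	/* real I/O goes here */
		return;
	}
	mtx_unlock(&mybe_lock);
}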
Index: sys/sys/file.h
===================================================================
--- sys/sys/file.h
+++ sys/sys/file.h
@@ -73,6 +73,7 @@
struct file;
struct filecaps;
+struct kaiocb;
struct kinfo_file;
struct ucred;
@@ -119,6 +120,7 @@
typedef int fo_mmap_t(struct file *fp, vm_map_t map, vm_offset_t *addr,
vm_size_t size, vm_prot_t prot, vm_prot_t cap_maxprot,
int flags, vm_ooffset_t foff, struct thread *td);
+typedef int fo_aio_queue_t(struct file *fp, struct kaiocb *job);
typedef int fo_flags_t;
struct fileops {
@@ -136,6 +138,7 @@
fo_seek_t *fo_seek;
fo_fill_kinfo_t *fo_fill_kinfo;
fo_mmap_t *fo_mmap;
+ fo_aio_queue_t *fo_aio_queue;
fo_flags_t fo_flags; /* DFLAG_* below */
};
@@ -406,6 +409,13 @@
flags, foff, td));
}
+static __inline int
+fo_aio_queue(struct file *fp, struct kaiocb *job)
+{
+
+ return ((*fp->f_ops->fo_aio_queue)(fp, job));
+}
+
#endif /* _KERNEL */
#endif /* !SYS_FILE_H */
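
As a small follow-on fragment (again hypothetical, modeled on how this diff extends socketops in sys_socket.c): a backend publishes its handler through the new slot, while aio_aqueue() routes any file whose fileops leave fo_aio_queue NULL to the generic aio_queue_file() path instead:

static struct fileops mybe_fileops = {
	.fo_read = invfo_rdwr,
	.fo_write = invfo_rdwr,
	.fo_truncate = invfo_truncate,
	.fo_ioctl = invfo_ioctl,
	.fo_poll = invfo_poll,
	.fo_kqfilter = invfo_kqfilter,
	.fo_stat = mybe_stat,		/* backend-specific, not shown */
	.fo_close = mybe_close,		/* backend-specific, not shown */
	.fo_chmod = invfo_chmod,
	.fo_chown = invfo_chown,
	.fo_sendfile = invfo_sendfile,
	.fo_aio_queue = mybe_queue,	/* sketched after sys/sys/aio.h */
	.fo_flags = DFLAG_PASSABLE,
};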
Index: sys/sys/sockbuf.h
===================================================================
--- sys/sys/sockbuf.h
+++ sys/sys/sockbuf.h
@@ -36,6 +36,7 @@
#include <sys/_lock.h>
#include <sys/_mutex.h>
#include <sys/_sx.h>
+#include <sys/_task.h>
#define SB_MAX (2*1024*1024) /* default for max chars in sockbuf */
@@ -53,6 +54,7 @@
#define SB_IN_TOE 0x400 /* socket buffer is in the middle of an operation */
#define SB_AUTOSIZE 0x800 /* automatically size socket buffer */
#define SB_STOP 0x1000 /* backpressure indicator */
+#define SB_AIO_RUNNING 0x2000 /* AIO operation running */
#define SBS_CANTSENDMORE 0x0010 /* can't send more data to peer */
#define SBS_CANTRCVMORE 0x0020 /* can't receive more data from peer */
@@ -77,33 +79,38 @@
/*
* Variables for socket buffering.
+ *
+ * Locking key to struct sockbuf:
+ * (a) locked by SOCKBUF_LOCK().
*/
struct sockbuf {
struct selinfo sb_sel; /* process selecting read/write */
struct mtx sb_mtx; /* sockbuf lock */
struct sx sb_sx; /* prevent I/O interlacing */
- short sb_state; /* (c/d) socket state on sockbuf */
+ short sb_state; /* (a) socket state on sockbuf */
#define sb_startzero sb_mb
- struct mbuf *sb_mb; /* (c/d) the mbuf chain */
- struct mbuf *sb_mbtail; /* (c/d) the last mbuf in the chain */
- struct mbuf *sb_lastrecord; /* (c/d) first mbuf of last
+ struct mbuf *sb_mb; /* (a) the mbuf chain */
+ struct mbuf *sb_mbtail; /* (a) the last mbuf in the chain */
+ struct mbuf *sb_lastrecord; /* (a) first mbuf of last
* record in socket buffer */
- struct mbuf *sb_sndptr; /* (c/d) pointer into mbuf chain */
- struct mbuf *sb_fnrdy; /* (c/d) pointer to first not ready buffer */
- u_int sb_sndptroff; /* (c/d) byte offset of ptr into chain */
- u_int sb_acc; /* (c/d) available chars in buffer */
- u_int sb_ccc; /* (c/d) claimed chars in buffer */
- u_int sb_hiwat; /* (c/d) max actual char count */
- u_int sb_mbcnt; /* (c/d) chars of mbufs used */
- u_int sb_mcnt; /* (c/d) number of mbufs in buffer */
- u_int sb_ccnt; /* (c/d) number of clusters in buffer */
- u_int sb_mbmax; /* (c/d) max chars of mbufs to use */
- u_int sb_ctl; /* (c/d) non-data chars in buffer */
- int sb_lowat; /* (c/d) low water mark */
- sbintime_t sb_timeo; /* (c/d) timeout for read/write */
- short sb_flags; /* (c/d) flags, see below */
- int (*sb_upcall)(struct socket *, void *, int); /* (c/d) */
- void *sb_upcallarg; /* (c/d) */
+ struct mbuf *sb_sndptr; /* (a) pointer into mbuf chain */
+ struct mbuf *sb_fnrdy; /* (a) pointer to first not ready buffer */
+ u_int sb_sndptroff; /* (a) byte offset of ptr into chain */
+ u_int sb_acc; /* (a) available chars in buffer */
+ u_int sb_ccc; /* (a) claimed chars in buffer */
+ u_int sb_hiwat; /* (a) max actual char count */
+ u_int sb_mbcnt; /* (a) chars of mbufs used */
+ u_int sb_mcnt; /* (a) number of mbufs in buffer */
+ u_int sb_ccnt; /* (a) number of clusters in buffer */
+ u_int sb_mbmax; /* (a) max chars of mbufs to use */
+ u_int sb_ctl; /* (a) non-data chars in buffer */
+ int sb_lowat; /* (a) low water mark */
+ sbintime_t sb_timeo; /* (a) timeout for read/write */
+ short sb_flags; /* (a) flags, see below */
+ int (*sb_upcall)(struct socket *, void *, int); /* (a) */
+ void *sb_upcallarg; /* (a) */
+ TAILQ_HEAD(, kaiocb) sb_aiojobq; /* (a) pending AIO ops */
+ struct task sb_aiotask; /* AIO task */
};
#ifdef _KERNEL
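The new sb_aiojobq/sb_aiotask pair gives each socket buffer a pending-job list and a deferred task to drain it, while SB_AIO_RUNNING keeps a second task from being enqueued while one is in flight. A rough sketch of the wakeup side; the helper name is illustrative and taskqueue_thread merely stands in for the patch's dedicated "soaiod" pool:

#include <sys/param.h>
#include <sys/aio.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/taskqueue.h>

/* Sketch: on readiness, kick the per-sockbuf AIO task exactly once. */
static void
kick_sb_aio(struct sockbuf *sb)
{

	SOCKBUF_LOCK_ASSERT(sb);
	if (TAILQ_EMPTY(&sb->sb_aiojobq) ||
	    (sb->sb_flags & SB_AIO_RUNNING) != 0)
		return;
	sb->sb_flags |= SB_AIO_RUNNING;
	taskqueue_enqueue(taskqueue_thread, &sb->sb_aiotask);
}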
Index: sys/sys/socketvar.h
===================================================================
--- sys/sys/socketvar.h
+++ sys/sys/socketvar.h
@@ -64,7 +64,6 @@
* (a) constant after allocation, no locking required.
* (b) locked by SOCK_LOCK(so).
* (c) locked by SOCKBUF_LOCK(&so->so_rcv).
- * (d) locked by SOCKBUF_LOCK(&so->so_snd).
* (e) locked by ACCEPT_LOCK().
* (f) not locked since integer reads/writes are atomic.
* (g) used only as a sleep/wakeup address, no value.
@@ -104,7 +103,6 @@
struct sigio *so_sigio; /* [sg] information for async I/O or
out of band data (SIGURG) */
u_long so_oobmark; /* (c) chars to oob mark */
- TAILQ_HEAD(, kaiocb) so_aiojobq; /* AIO ops waiting on socket */
struct sockbuf so_rcv, so_snd;
@@ -343,6 +341,8 @@
struct file **fpp, u_int *fflagp);
void soabort(struct socket *so);
int soaccept(struct socket *so, struct sockaddr **nam);
+void soaio_rcv(void *context, int pending);
+void soaio_snd(void *context, int pending);
int socheckuid(struct socket *so, uid_t uid);
int sobind(struct socket *so, struct sockaddr *nam, struct thread *td);
int sobindat(int fd, struct socket *so, struct sockaddr *nam,
@@ -397,6 +397,7 @@
void soupcall_set(struct socket *so, int which,
int (*func)(struct socket *, void *, int), void *arg);
void sowakeup(struct socket *so, struct sockbuf *sb);
+void sowakeup_aio(struct socket *so, struct sockbuf *sb);
int selsocket(struct socket *so, int events, struct timeval *tv,
struct thread *td);
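soaio_rcv() and soaio_snd() have the standard task-handler signature (void *context, int pending), so each sockbuf's AIO task can be bound to its handler when the socket is set up. A sketch consistent with the declarations above; the helper name is illustrative:

#include <sys/param.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/taskqueue.h>

/* Sketch: initialize the per-sockbuf AIO queues and tasks. */
static void
socket_aio_init(struct socket *so)
{

	TAILQ_INIT(&so->so_rcv.sb_aiojobq);
	TAILQ_INIT(&so->so_snd.sb_aiojobq);
	TASK_INIT(&so->so_rcv.sb_aiotask, 0, soaio_rcv, so);
	TASK_INIT(&so->so_snd.sb_aiotask, 0, soaio_snd, so);
}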
Index: tests/sys/aio/aio_kqueue_test.c
===================================================================
--- tests/sys/aio/aio_kqueue_test.c
+++ tests/sys/aio/aio_kqueue_test.c
@@ -47,6 +47,7 @@
#include <unistd.h>
#include "freebsd_test_suite/macros.h"
+#include "local.h"
#define PATH_TEMPLATE "aio.XXXXXXXXXX"
@@ -70,6 +71,7 @@
unsigned i, j;
PLAIN_REQUIRE_KERNEL_MODULE("aio", 0);
+ PLAIN_REQUIRE_UNSAFE_AIO(0);
kq = kqueue();
if (kq < 0) {
Index: tests/sys/aio/aio_test.c
===================================================================
--- tests/sys/aio/aio_test.c
+++ tests/sys/aio/aio_test.c
@@ -60,6 +60,7 @@
#include <atf-c.h>
#include "freebsd_test_suite/macros.h"
+#include "local.h"
#define PATH_TEMPLATE "aio.XXXXXXXXXX"
@@ -340,6 +341,7 @@
int fd;
ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
strcpy(pathname, PATH_TEMPLATE);
fd = mkstemp(pathname);
@@ -386,6 +388,7 @@
struct aio_context ac;
ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
/*
* In theory, mkstemp() can return a name that is then collided with.
@@ -497,6 +500,7 @@
int error;
ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
ATF_REQUIRE_MSG(openpty(&read_fd, &write_fd, NULL, NULL, NULL) == 0,
"openpty failed: %s", strerror(errno));
@@ -544,6 +548,7 @@
int pipes[2];
ATF_REQUIRE_KERNEL_MODULE("aio");
+ ATF_REQUIRE_UNSAFE_AIO();
ATF_REQUIRE_MSG(pipe(pipes) != -1,
"pipe failed: %s", strerror(errno));
Index: tests/sys/aio/lio_kqueue_test.c
===================================================================
--- tests/sys/aio/lio_kqueue_test.c
+++ tests/sys/aio/lio_kqueue_test.c
@@ -50,6 +50,7 @@
#include <unistd.h>
#include "freebsd_test_suite/macros.h"
+#include "local.h"
#define PATH_TEMPLATE "aio.XXXXXXXXXX"
@@ -74,6 +75,7 @@
int tmp_file = 0, failed = 0;
PLAIN_REQUIRE_KERNEL_MODULE("aio", 0);
+ PLAIN_REQUIRE_UNSAFE_AIO(0);
kq = kqueue();
if (kq < 0)
Index: tests/sys/aio/local.h
===================================================================
--- /dev/null
+++ tests/sys/aio/local.h
@@ -0,0 +1,74 @@
+/*-
+ * Copyright (c) 2016 Chelsio Communications, Inc.
+ * All rights reserved.
+ * Written by: John Baldwin <jhb@FreeBSD.org>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $FreeBSD$
+ */
+
+#ifndef _AIO_TEST_LOCAL_H_
+#define _AIO_TEST_LOCAL_H_
+
+#include <sys/types.h>
+#include <sys/sysctl.h>
+#include <errno.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <atf-c.h>
+
+#define ATF_REQUIRE_UNSAFE_AIO() do { \
+ size_t _len; \
+ int _unsafe; \
+ \
+ _len = sizeof(_unsafe); \
+ if (sysctlbyname("vfs.aio.enable_unsafe", &_unsafe, &_len, NULL,\
+ 0) < 0) { \
+ if (errno != ENOENT) \
+ atf_libc_error(errno, \
+ "Failed to read vfs.aio.enable_unsafe"); \
+ } else if (_unsafe == 0) \
+ atf_tc_skip("Unsafe AIO is disabled"); \
+} while (0)
+
+#define PLAIN_REQUIRE_UNSAFE_AIO(_exit_code) do { \
+ size_t _len; \
+ int _unsafe; \
+ \
+ _len = sizeof(_unsafe); \
+ if (sysctlbyname("vfs.aio.enable_unsafe", &_unsafe, &_len, NULL,\
+ 0) < 0) { \
+ if (errno != ENOENT) { \
+ printf("Failed to read vfs.aio.enable_unsafe: %s\n",\
+ strerror(errno)); \
+ _exit(1); \
+ } \
+ } else if (_unsafe == 0) { \
+ printf("Unsafe AIO is disabled"); \
+ _exit(_exit_code); \
+ } \
+} while (0)
+
+#endif /* !_AIO_TEST_LOCAL_H_ */
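The two guards mirror the suite's two test styles, as the hunks above show. For completeness, a hypothetical ATF skeleton (the test name is illustrative):

#include <atf-c.h>

#include "freebsd_test_suite/macros.h"
#include "local.h"

/* ATF-style test: skipped cleanly when unsafe AIO is disabled. */
ATF_TC_WITHOUT_HEAD(aio_vnode_example);
ATF_TC_BODY(aio_vnode_example, tc)
{

	ATF_REQUIRE_KERNEL_MODULE("aio");
	ATF_REQUIRE_UNSAFE_AIO();
	/* ... exercise aio_read()/aio_write() against a vnode ... */
}

ATF_TP_ADD_TCS(tp)
{

	ATF_TP_ADD_TC(tp, aio_vnode_example);
	return (atf_no_error());
}

Plain (non-ATF) programs such as aio_kqueue_test instead call PLAIN_REQUIRE_UNSAFE_AIO(exit_code) before any setup, exiting with the given code rather than recording a skip.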