Index: sys/kern/vfs_aio.c =================================================================== --- sys/kern/vfs_aio.c +++ sys/kern/vfs_aio.c @@ -950,6 +950,49 @@ cb->_aiocb_private.error = error; cb->_aiocb_private.status = cnt; td->td_ucred = td_savedcred; + + /* + * If this request was for a socket, see if we need to queue + * another request. + */ + if (fp->f_type == DTYPE_SOCKET) { + struct aiocblist *cbnext; + struct sockbuf *sb; + so = fp->f_data; + + if (cb->aio_lio_opcode == LIO_WRITE) + sb = &so->so_snd; + else + sb = &so->so_rcv; + SOCKBUF_LOCK(sb); + mtx_lock(&aio_job_mtx); + TAILQ_FOREACH(cbnext, &so->so_aiojobq, list) { + MPASS(cbnext->jobstate == JOBST_JOBQSOCK); + if (cbnext->uaiocb.aio_lio_opcode != cb->aio_lio_opcode) + continue; + + /* + * If the socket buffer is ready, move this request + * to the run queue. Otherwise, set SB_AIO to + * trigger another wakeup. + * + * Note that since this request has just completed, + * there is no need to wake up another aio thread + * if we queue a request. 
+ */ + if ((cbnext->uaiocb.aio_lio_opcode == LIO_WRITE && + sowriteable(so)) || + (cbnext->uaiocb.aio_lio_opcode == LIO_READ && + soreadable(so))) { + TAILQ_REMOVE(&so->so_aiojobq, cbnext, list); + TAILQ_INSERT_TAIL(&aio_jobs, cbnext, list); + } else + sb->sb_flags |= SB_AIO; + break; + } + mtx_unlock(&aio_job_mtx); + SOCKBUF_UNLOCK(sb); + } } static void @@ -1438,7 +1481,7 @@ static void aio_swake_cb(struct socket *so, struct sockbuf *sb) { - struct aiocblist *cb, *cbn; + struct aiocblist *cb; int opcode; SOCKBUF_LOCK_ASSERT(sb); @@ -1449,18 +1492,23 @@ sb->sb_flags &= ~SB_AIO; mtx_lock(&aio_job_mtx); - TAILQ_FOREACH_SAFE(cb, &so->so_aiojobq, list, cbn) { + TAILQ_FOREACH(cb, &so->so_aiojobq, list) { if (opcode == cb->uaiocb.aio_lio_opcode) { if (cb->jobstate != JOBST_JOBQSOCK) panic("invalid queue value"); /* XXX * We don't have actual sockets backend yet, - * so we simply move the requests to the generic + * so we simply move the request to the generic * file I/O backend. + * + * Queue a single request for completion. + * Once it has completed it will queue the next + * request. */ TAILQ_REMOVE(&so->so_aiojobq, cb, list); TAILQ_INSERT_TAIL(&aio_jobs, cb, list); aio_kick_nowait(cb->userproc); + break; } } mtx_unlock(&aio_job_mtx); Index: tests/sys/aio/Makefile =================================================================== --- tests/sys/aio/Makefile +++ tests/sys/aio/Makefile @@ -4,7 +4,7 @@ PLAIN_TESTS_C+= aio_kqueue_test PLAIN_TESTS_C+= lio_kqueue_test -ATF_TESTS_C+= aio_test +ATF_TESTS_C+= aio_socket_test aio_test DPADD.aio_test+= ${LIBUTIL} LDADD.aio_test+= -lutil Index: tests/sys/aio/aio_socket_test.c =================================================================== --- /dev/null +++ tests/sys/aio/aio_socket_test.c @@ -0,0 +1,99 @@ +/*- + * Copyright (c) 2015 John H. Baldwin + * All rights reserved. 
 * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <sys/cdefs.h> +__FBSDID("$FreeBSD$"); + +#include <sys/param.h> +#include <sys/socket.h> +#include <aio.h> +#include <string.h> +#include <unistd.h> + +#include <atf-c.h> + +#include "freebsd_test_suite/macros.h" + +/* + * This tests for a bug where arriving socket data can wake up multiple + * AIO read requests resulting in an uncancellable request. + */ +ATF_TC_WITHOUT_HEAD(aio_socket_two_reads); +ATF_TC_BODY(aio_socket_two_reads, tc) +{ + struct ioreq { + struct aiocb iocb; + char buffer[1024]; + } ioreq[2]; + struct aiocb *iocb; + unsigned i; + int s[2]; + char c; + + ATF_REQUIRE_KERNEL_MODULE("aio"); + + ATF_REQUIRE(socketpair(PF_UNIX, SOCK_STREAM, 0, s) != -1); + + /* Queue two read requests. 
*/ + memset(&ioreq, 0, sizeof(ioreq)); + for (i = 0; i < nitems(ioreq); i++) { + ioreq[i].iocb.aio_nbytes = sizeof(ioreq[i].buffer); + ioreq[i].iocb.aio_fildes = s[0]; + ioreq[i].iocb.aio_buf = ioreq[i].buffer; + ATF_REQUIRE(aio_read(&ioreq[i].iocb) == 0); + } + + /* Send a single byte. This should complete one request. */ + c = 0xc3; + ATF_REQUIRE(write(s[1], &c, sizeof(c)) == 1); + + ATF_REQUIRE(aio_waitcomplete(&iocb, NULL) == 1); + + /* Determine which request completed and verify the data was read. */ + if (iocb == &ioreq[0].iocb) + i = 0; + else + i = 1; + ATF_REQUIRE(ioreq[i].buffer[0] == c); + + i ^= 1; + + /* + * Try to cancel the other request. On broken systems this + * will fail and the process will hang on exit. + */ + ATF_REQUIRE(aio_error(&ioreq[i].iocb) == EINPROGRESS); + ATF_REQUIRE(aio_cancel(s[0], &ioreq[i].iocb) == AIO_CANCELED); + + close(s[1]); + close(s[0]); +} + +ATF_TP_ADD_TCS(tp) +{ + + ATF_TP_ADD_TC(tp, aio_socket_two_reads); + + return (atf_no_error()); +}