Index: lib/libc/sys/kqueue.2 =================================================================== --- lib/libc/sys/kqueue.2 +++ lib/libc/sys/kqueue.2 @@ -318,8 +318,7 @@ .Dv EV_EOF in .Va flags . -This may be cleared by passing in -.Dv EV_CLEAR , +This will be cleared by the filter when a new writer connects, at which point the filter will resume waiting for data to become available before returning. @@ -338,9 +337,10 @@ and fifos, .Va data will contain the amount of space remaining in the write buffer. -The filter will set EV_EOF when the reader disconnects, and for -the fifo case, this may be cleared by use of -.Dv EV_CLEAR . +The filter will set +.Dv EV_EOF +when the reader disconnects, and for the fifo case, this will be cleared +when a new reader connects. Note that this filter is not supported for vnodes or BPF devices. .Pp For sockets, the low water mark and socket error handling is Index: sys/fs/fifofs/fifo_vnops.c =================================================================== --- sys/fs/fifofs/fifo_vnops.c +++ sys/fs/fifofs/fifo_vnops.c @@ -174,8 +174,10 @@ fip->fi_rgen++; if (fip->fi_readers == 1) { fpipe->pipe_state &= ~PIPE_EOF; - if (fip->fi_writers > 0) + if (fip->fi_writers > 0) { wakeup(&fip->fi_writers); + pipeselwakeup(fpipe); + } } fp->f_pipegen = fpipe->pipe_wgen - fip->fi_writers; } @@ -190,8 +192,10 @@ fip->fi_wgen++; if (fip->fi_writers == 1) { fpipe->pipe_state &= ~PIPE_EOF; - if (fip->fi_readers > 0) + if (fip->fi_readers > 0) { wakeup(&fip->fi_readers); + pipeselwakeup(fpipe); + } } } if ((ap->a_mode & O_NONBLOCK) == 0) { @@ -210,6 +214,7 @@ fpipe->pipe_state |= PIPE_EOF; if (fpipe->pipe_state & PIPE_WANTW) wakeup(fpipe); + pipeselwakeup(fpipe); PIPE_UNLOCK(fpipe); fifo_cleanup(vp); } @@ -238,6 +243,7 @@ if (fpipe->pipe_state & PIPE_WANTR) wakeup(fpipe); fpipe->pipe_wgen++; + pipeselwakeup(fpipe); PIPE_UNLOCK(fpipe); fifo_cleanup(vp); } Index: sys/kern/sys_pipe.c =================================================================== --- 
sys/kern/sys_pipe.c +++ sys/kern/sys_pipe.c @@ -824,7 +824,12 @@ } } - if ((rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) + /* + * Only wake up writers if there was actually something read. + * Otherwise, when calling read(2) at EOF, a spurious wakeup occurs. + */ + if (nread > 0 && + (rpipe->pipe_buffer.size - rpipe->pipe_buffer.cnt) >= PIPE_BUF) pipeselwakeup(rpipe); PIPE_UNLOCK(rpipe); @@ -1430,7 +1435,8 @@ if ((events & POLLINIGNEOF) == 0) { if (rpipe->pipe_state & PIPE_EOF) { - revents |= (events & (POLLIN | POLLRDNORM)); + if (fp->f_flag & FREAD) + revents |= (events & (POLLIN | POLLRDNORM)); if (wpipe->pipe_present != PIPE_ACTIVE || (wpipe->pipe_state & PIPE_EOF)) revents |= POLLHUP; @@ -1605,8 +1611,6 @@ pipelock(cpipe, 0); pp = cpipe->pipe_pair; - pipeselwakeup(cpipe); - /* * If the other side is blocked, wake it up saying that * we want to close it down. @@ -1620,16 +1624,16 @@ pipelock(cpipe, 0); } + pipeselwakeup(cpipe); + /* * Disconnect from peer, if any. */ ppipe = cpipe->pipe_peer; if (ppipe->pipe_present == PIPE_ACTIVE) { - pipeselwakeup(ppipe); - ppipe->pipe_state |= PIPE_EOF; wakeup(ppipe); - KNOTE_LOCKED(&ppipe->pipe_sel.si_note, 0); + pipeselwakeup(ppipe); } /* @@ -1727,48 +1731,52 @@ static int filt_piperead(struct knote *kn, long hint) { + struct file *fp = kn->kn_fp; struct pipe *rpipe = kn->kn_hook; - struct pipe *wpipe = rpipe->pipe_peer; - int ret; PIPE_LOCK_ASSERT(rpipe, MA_OWNED); kn->kn_data = rpipe->pipe_buffer.cnt; if (kn->kn_data == 0) kn->kn_data = rpipe->pipe_map.cnt; - if ((rpipe->pipe_state & PIPE_EOF) || - wpipe->pipe_present != PIPE_ACTIVE || - (wpipe->pipe_state & PIPE_EOF)) { + if ((rpipe->pipe_state & PIPE_EOF) != 0 && + ((rpipe->pipe_state & PIPE_NAMED) == 0 || + fp->f_pipegen != rpipe->pipe_wgen)) { kn->kn_flags |= EV_EOF; return (1); } - ret = kn->kn_data > 0; - return ret; + kn->kn_flags &= ~EV_EOF; + return (kn->kn_data > 0); } /*ARGSUSED*/ static int filt_pipewrite(struct knote *kn, long hint) { - 
struct pipe *wpipe; + struct pipe *wpipe = kn->kn_hook; /* * If this end of the pipe is closed, the knote was removed from the * knlist and the list lock (i.e., the pipe lock) is therefore not held. */ - wpipe = kn->kn_hook; + if (wpipe->pipe_present == PIPE_ACTIVE) { + PIPE_LOCK_ASSERT(wpipe, MA_OWNED); + } + + if (wpipe->pipe_state & PIPE_DIRECTW) { + kn->kn_data = 0; + } else if (wpipe->pipe_buffer.size > 0) { + kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt; + } else { + kn->kn_data = PIPE_BUF; + } + if (wpipe->pipe_present != PIPE_ACTIVE || (wpipe->pipe_state & PIPE_EOF)) { - kn->kn_data = 0; kn->kn_flags |= EV_EOF; return (1); } - PIPE_LOCK_ASSERT(wpipe, MA_OWNED); - kn->kn_data = (wpipe->pipe_buffer.size > 0) ? - (wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt) : PIPE_BUF; - if (wpipe->pipe_state & PIPE_DIRECTW) - kn->kn_data = 0; - + kn->kn_flags &= ~EV_EOF; return (kn->kn_data >= PIPE_BUF); } Index: tests/sys/fifo/Makefile =================================================================== --- tests/sys/fifo/Makefile +++ tests/sys/fifo/Makefile @@ -4,6 +4,7 @@ PLAIN_TESTS_C+= fifo_create PLAIN_TESTS_C+= fifo_io +ATF_TESTS_C+= fifo_kqueue PLAIN_TESTS_C+= fifo_misc PLAIN_TESTS_C+= fifo_open Index: tests/sys/fifo/fifo_kqueue.c =================================================================== --- /dev/null +++ tests/sys/fifo/fifo_kqueue.c @@ -0,0 +1,426 @@ +/* + * Copyright (c) 2020 Jan Kokemüller + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <sys/param.h> +#include <sys/event.h> +#include <sys/stat.h> + +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <poll.h> + +#include <stdio.h> +#include <stdlib.h> +#include <time.h> +#include <unistd.h> + +#include <atf-c.h> + +ATF_TC_WITHOUT_HEAD(fifo_kqueue__writes); +ATF_TC_BODY(fifo_kqueue__writes, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(mkfifo("testfifo", 0600) == 0); + + ATF_REQUIRE((p[0] = open("testfifo", + O_RDONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + ATF_REQUIRE((p[1] = open("testfifo", + O_WRONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + EV_SET(&kev[0], p[1], EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, 0); + EV_SET(&kev[1], p[1], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0); + + ATF_REQUIRE(kevent(kq, kev, 2, NULL, 0, NULL) == 0); + + /* A new writer should immediately get a EVFILT_WRITE event. */ + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 16384); + ATF_REQUIRE(kev[0].udata == 0); + + /* Filling up the pipe should make the EVFILT_WRITE disappear.
*/ + + char c = 0; + ssize_t r; + while ((r = write(p[1], &c, 1)) == 1) { + } + ATF_REQUIRE(r < 0); + ATF_REQUIRE(errno == EAGAIN || errno == EWOULDBLOCK); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* Reading (PIPE_BUF - 1) bytes will not trigger a EVFILT_WRITE yet. */ + + for (int i = 0; i < PIPE_BUF - 1; ++i) { + ATF_REQUIRE(read(p[0], &c, 1) == 1); + } + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* Reading one additional byte triggers the EVFILT_WRITE. */ + + ATF_REQUIRE(read(p[0], &c, 1) == 1); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == PIPE_BUF); + ATF_REQUIRE(kev[0].udata == 0); + + /* + * Reading another byte triggers the EVFILT_WRITE again with a changed + * 'data' field. + */ + + ATF_REQUIRE(read(p[0], &c, 1) == 1); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == PIPE_BUF + 1); + ATF_REQUIRE(kev[0].udata == 0); + + /* + * Closing the read end should make a EV_EOF appear but leave the 'data' + * field unchanged. 
+ */ + + ATF_REQUIRE(close(p[0]) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), NULL) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == (EV_CLEAR | EV_EOF)); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == PIPE_BUF + 1); + ATF_REQUIRE(kev[0].udata == 0); + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[1]) == 0); +} + +ATF_TC_WITHOUT_HEAD(fifo_kqueue__connecting_reader); +ATF_TC_BODY(fifo_kqueue__connecting_reader, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(mkfifo("testfifo", 0600) == 0); + + ATF_REQUIRE((p[0] = open("testfifo", + O_RDONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + ATF_REQUIRE((p[1] = open("testfifo", + O_WRONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + EV_SET(&kev[0], p[1], EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, 0); + EV_SET(&kev[1], p[1], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0); + + ATF_REQUIRE(kevent(kq, kev, 2, NULL, 0, NULL) == 0); + + /* A new writer should immediately get a EVFILT_WRITE event. */ + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* + * Filling the pipe, reading (PIPE_BUF + 1) bytes, then closing the + * read end leads to a EVFILT_WRITE with EV_EOF set. 
+ */ + + char c = 0; + ssize_t r; + while ((r = write(p[1], &c, 1)) == 1) { + } + ATF_REQUIRE(r < 0); + ATF_REQUIRE(errno == EAGAIN || errno == EWOULDBLOCK); + + for (int i = 0; i < PIPE_BUF + 1; ++i) { + ATF_REQUIRE(read(p[0], &c, 1) == 1); + } + + ATF_REQUIRE(close(p[0]) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), NULL) == 1); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE((kev[0].flags & EV_EOF) != 0); + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* Opening the reader again must trigger the EVFILT_WRITE. */ + + ATF_REQUIRE((p[0] = open("testfifo", + O_RDONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + + r = kevent(kq, NULL, 0, kev, nitems(kev), &(struct timespec) { 1, 0 }); + ATF_REQUIRE(r == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == PIPE_BUF + 1); + ATF_REQUIRE(kev[0].udata == 0); + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[0]) == 0); + ATF_REQUIRE(close(p[1]) == 0); +} + +ATF_TC_WITHOUT_HEAD(fifo_kqueue__reads); +ATF_TC_BODY(fifo_kqueue__reads, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(mkfifo("testfifo", 0600) == 0); + + ATF_REQUIRE((p[0] = open("testfifo", + O_RDONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + ATF_REQUIRE((p[1] = open("testfifo", + O_WRONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + + /* Check that EVFILT_READ behaves sensibly on a FIFO reader. 
*/ + + char c = 0; + ssize_t r; + while ((r = write(p[1], &c, 1)) == 1) { + } + ATF_REQUIRE(r < 0); + ATF_REQUIRE(errno == EAGAIN || errno == EWOULDBLOCK); + + for (int i = 0; i < PIPE_BUF + 1; ++i) { + ATF_REQUIRE(read(p[0], &c, 1) == 1); + } + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + EV_SET(&kev[0], p[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0); + + ATF_REQUIRE(kevent(kq, kev, 1, NULL, 0, NULL) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[0]); + ATF_REQUIRE(kev[0].filter == EVFILT_READ); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 65023); + ATF_REQUIRE(kev[0].udata == 0); + + while ((r = read(p[0], &c, 1)) == 1) { + } + ATF_REQUIRE(r < 0); + ATF_REQUIRE(errno == EAGAIN || errno == EWOULDBLOCK); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[0]) == 0); + ATF_REQUIRE(close(p[1]) == 0); +} + +ATF_TC_WITHOUT_HEAD(fifo_kqueue__read_eof_wakeups); +ATF_TC_BODY(fifo_kqueue__read_eof_wakeups, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(mkfifo("testfifo", 0600) == 0); + + ATF_REQUIRE((p[0] = open("testfifo", + O_RDONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + ATF_REQUIRE((p[1] = open("testfifo", + O_WRONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + + EV_SET(&kev[0], p[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0); + ATF_REQUIRE(kevent(kq, kev, 1, NULL, 0, NULL) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* + * Closing the writer must trigger a EVFILT_READ edge with EV_EOF set. 
+ */ + + ATF_REQUIRE(close(p[1]) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[0]); + ATF_REQUIRE(kev[0].filter == EVFILT_READ); + ATF_REQUIRE(kev[0].flags == (EV_EOF | EV_CLEAR)); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 0); + ATF_REQUIRE(kev[0].udata == 0); + + /* + * Trying to read from a closed pipe should not trigger EVFILT_READ + * edges. + */ + + char c; + ATF_REQUIRE(read(p[0], &c, 1) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[0]) == 0); +} + +ATF_TC_WITHOUT_HEAD(fifo_kqueue__read_eof_state_when_reconnecting); +ATF_TC_BODY(fifo_kqueue__read_eof_state_when_reconnecting, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(mkfifo("testfifo", 0600) == 0); + + ATF_REQUIRE((p[0] = open("testfifo", + O_RDONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + ATF_REQUIRE((p[1] = open("testfifo", + O_WRONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + + EV_SET(&kev[0], p[0], EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, 0); + ATF_REQUIRE(kevent(kq, kev, 1, NULL, 0, NULL) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* + * Closing the writer must trigger a EVFILT_READ edge with EV_EOF set. + */ + + ATF_REQUIRE(close(p[1]) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[0]); + ATF_REQUIRE(kev[0].filter == EVFILT_READ); + ATF_REQUIRE(kev[0].flags == (EV_EOF | EV_CLEAR)); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 0); + ATF_REQUIRE(kev[0].udata == 0); + + /* A new reader shouldn't see the EOF flag. 
*/ + + { + int new_reader; + ATF_REQUIRE((new_reader = open("testfifo", + O_RDONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + + int new_kq = kqueue(); + ATF_REQUIRE(new_kq >= 0); + + struct kevent new_kev[32]; + EV_SET(&new_kev[0], new_reader, EVFILT_READ, EV_ADD | EV_CLEAR, + 0, 0, 0); + ATF_REQUIRE(kevent(new_kq, new_kev, 1, NULL, 0, NULL) == 0); + + ATF_REQUIRE(kevent(new_kq, NULL, 0, new_kev, nitems(new_kev), + &(struct timespec) { 0, 0 }) == 0); + + ATF_REQUIRE(close(new_kq) == 0); + ATF_REQUIRE(close(new_reader) == 0); + } + + /* + * Simply reopening the writer does not trigger the EVFILT_READ again -- + * EV_EOF should be cleared, but there is no data yet so the filter + * does not trigger. + */ + + ATF_REQUIRE((p[1] = open("testfifo", + O_WRONLY | O_CLOEXEC | O_NONBLOCK)) >= 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* Writing a byte should trigger a EVFILT_READ. */ + + char c = 0; + ATF_REQUIRE(write(p[1], &c, 1) == 1); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[0]); + ATF_REQUIRE(kev[0].filter == EVFILT_READ); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 1); + ATF_REQUIRE(kev[0].udata == 0); + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[0]) == 0); + ATF_REQUIRE(close(p[1]) == 0); +} + +ATF_TP_ADD_TCS(tp) +{ + ATF_TP_ADD_TC(tp, fifo_kqueue__writes); + ATF_TP_ADD_TC(tp, fifo_kqueue__connecting_reader); + ATF_TP_ADD_TC(tp, fifo_kqueue__reads); + ATF_TP_ADD_TC(tp, fifo_kqueue__read_eof_wakeups); + ATF_TP_ADD_TC(tp, fifo_kqueue__read_eof_state_when_reconnecting); + + return atf_no_error(); +} Index: tests/sys/kern/pipe/Makefile =================================================================== --- tests/sys/kern/pipe/Makefile +++ tests/sys/kern/pipe/Makefile @@ -5,6 +5,7 @@ PLAIN_TESTS_C+= big_pipe_test PLAIN_TESTS_C+= pipe_fstat_bug_test 
PLAIN_TESTS_C+= pipe_ino_test +ATF_TESTS_C+= pipe_kqueue_test PLAIN_TESTS_C+= pipe_overcommit1_test PLAIN_TESTS_C+= pipe_overcommit2_test PLAIN_TESTS_C+= pipe_reverse2_test Index: tests/sys/kern/pipe/pipe_kqueue_test.c =================================================================== --- /dev/null +++ tests/sys/kern/pipe/pipe_kqueue_test.c @@ -0,0 +1,362 @@ +/* + * Copyright (c) 2020 Jan Kokemüller + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. 
+ */ + +#include <sys/param.h> +#include <sys/event.h> +#include <sys/stat.h> + +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <poll.h> + +#include <stdio.h> +#include <stdlib.h> +#include <time.h> +#include <unistd.h> + +#include <atf-c.h> + +ATF_TC_WITHOUT_HEAD(pipe_kqueue__write_end); +ATF_TC_BODY(pipe_kqueue__write_end, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(pipe2(p, O_CLOEXEC | O_NONBLOCK) == 0); + ATF_REQUIRE(p[0] >= 0); + ATF_REQUIRE(p[1] >= 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + EV_SET(&kev[0], p[1], EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, 0); + + ATF_REQUIRE(kevent(kq, kev, 1, NULL, 0, NULL) == 0); + + /* Test that EVFILT_WRITE behaves sensibly on the write end. */ + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 16384); + ATF_REQUIRE(kev[0].udata == 0); + + /* Filling up the pipe should make the EVFILT_WRITE disappear. */ + + char c = 0; + ssize_t r; + while ((r = write(p[1], &c, 1)) == 1) { + } + ATF_REQUIRE(r < 0); + ATF_REQUIRE(errno == EAGAIN || errno == EWOULDBLOCK); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* Reading (PIPE_BUF - 1) bytes will not trigger a EVFILT_WRITE yet. */ + + for (int i = 0; i < PIPE_BUF - 1; ++i) { + ATF_REQUIRE(read(p[0], &c, 1) == 1); + } + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 0); + + /* Reading one additional byte triggers the EVFILT_WRITE.
*/ + + ATF_REQUIRE(read(p[0], &c, 1) == 1); + + r = kevent(kq, NULL, 0, kev, nitems(kev), &(struct timespec) { 0, 0 }); + ATF_REQUIRE(r == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == PIPE_BUF); + ATF_REQUIRE(kev[0].udata == 0); + + /* + * Reading another byte triggers the EVFILT_WRITE again with a changed + * 'data' field. + */ + + ATF_REQUIRE(read(p[0], &c, 1) == 1); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == EV_CLEAR); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == PIPE_BUF + 1); + ATF_REQUIRE(kev[0].udata == 0); + + /* + * Closing the read end should make a EV_EOF appear but leave the 'data' + * field unchanged. + */ + + ATF_REQUIRE(close(p[0]) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == (EV_CLEAR | EV_EOF | EV_ONESHOT)); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == PIPE_BUF + 1); + ATF_REQUIRE(kev[0].udata == 0); + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[1]) == 0); +} + +ATF_TC_WITHOUT_HEAD(pipe_kqueue__closed_read_end); +ATF_TC_BODY(pipe_kqueue__closed_read_end, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(pipe2(p, O_CLOEXEC | O_NONBLOCK) == 0); + ATF_REQUIRE(p[0] >= 0); + ATF_REQUIRE(p[1] >= 0); + + ATF_REQUIRE(close(p[0]) == 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + EV_SET(&kev[0], p[1], EVFILT_READ, EV_ADD | EV_CLEAR | EV_RECEIPT, /**/ + 0, 0, 0); + EV_SET(&kev[1], p[1], EVFILT_WRITE, EV_ADD | EV_CLEAR | EV_RECEIPT, /**/ + 0, 0, 0); + + /* + * Trying to register 
EVFILT_WRITE when the pipe is closed leads to an + * EPIPE error. + */ + + ATF_REQUIRE(kevent(kq, kev, 2, kev, 2, NULL) == 2); + ATF_REQUIRE((kev[0].flags & EV_ERROR) != 0); + ATF_REQUIRE(kev[0].data == 0); + ATF_REQUIRE((kev[1].flags & EV_ERROR) != 0); + ATF_REQUIRE(kev[1].data == EPIPE); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_READ); + ATF_REQUIRE(kev[0].flags == (EV_EOF | EV_CLEAR | EV_RECEIPT)); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 0); + ATF_REQUIRE(kev[0].udata == 0); + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[1]) == 0); +} + +ATF_TC_WITHOUT_HEAD(pipe_kqueue__closed_read_end_register_before_close); +ATF_TC_BODY(pipe_kqueue__closed_read_end_register_before_close, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(pipe2(p, O_CLOEXEC | O_NONBLOCK) == 0); + ATF_REQUIRE(p[0] >= 0); + ATF_REQUIRE(p[1] >= 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + EV_SET(&kev[0], p[1], EVFILT_READ, EV_ADD | EV_CLEAR | EV_RECEIPT, /**/ + 0, 0, 0); + EV_SET(&kev[1], p[1], EVFILT_WRITE, EV_ADD | EV_CLEAR | EV_RECEIPT, /**/ + 0, 0, 0); + + /* + * Registering EVFILT_WRITE before the pipe is closed leads to a + * EVFILT_WRITE event with EV_EOF set. 
+ */ + + ATF_REQUIRE(kevent(kq, kev, 2, kev, 2, NULL) == 2); + ATF_REQUIRE((kev[0].flags & EV_ERROR) != 0); + ATF_REQUIRE(kev[0].data == 0); + ATF_REQUIRE((kev[1].flags & EV_ERROR) != 0); + ATF_REQUIRE(kev[1].data == 0); + + ATF_REQUIRE(close(p[0]) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 2); + { + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == + (EV_EOF | EV_CLEAR | EV_ONESHOT | EV_RECEIPT)); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 16384); + ATF_REQUIRE(kev[0].udata == 0); + } + { + ATF_REQUIRE(kev[1].ident == (uintptr_t)p[1]); + ATF_REQUIRE(kev[1].filter == EVFILT_READ); + ATF_REQUIRE(kev[1].flags == (EV_EOF | EV_CLEAR | EV_RECEIPT)); + ATF_REQUIRE(kev[1].fflags == 0); + ATF_REQUIRE(kev[1].data == 0); + ATF_REQUIRE(kev[1].udata == 0); + } + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[1]) == 0); +} + +ATF_TC_WITHOUT_HEAD(pipe_kqueue__closed_write_end); +ATF_TC_BODY(pipe_kqueue__closed_write_end, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(pipe2(p, O_CLOEXEC | O_NONBLOCK) == 0); + ATF_REQUIRE(p[0] >= 0); + ATF_REQUIRE(p[1] >= 0); + + char c = 0; + ssize_t r; + while ((r = write(p[1], &c, 1)) == 1) { + } + ATF_REQUIRE(r < 0); + ATF_REQUIRE(errno == EAGAIN || errno == EWOULDBLOCK); + + ATF_REQUIRE(close(p[1]) == 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + EV_SET(&kev[0], p[0], EVFILT_READ, EV_ADD | EV_CLEAR | EV_RECEIPT, /**/ + 0, 0, 0); + EV_SET(&kev[1], p[0], EVFILT_WRITE, EV_ADD | EV_CLEAR | EV_RECEIPT, /**/ + 0, 0, 0); + + /* + * Trying to register EVFILT_WRITE when the pipe is closed leads to an + * EPIPE error. 
+ */ + + ATF_REQUIRE(kevent(kq, kev, 2, kev, 2, NULL) == 2); + ATF_REQUIRE((kev[0].flags & EV_ERROR) != 0); + ATF_REQUIRE(kev[0].data == 0); + ATF_REQUIRE((kev[1].flags & EV_ERROR) != 0); + ATF_REQUIRE(kev[1].data == EPIPE); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 1); + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[0]); + ATF_REQUIRE(kev[0].filter == EVFILT_READ); + ATF_REQUIRE(kev[0].flags == (EV_EOF | EV_CLEAR | EV_RECEIPT)); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 65536); + ATF_REQUIRE(kev[0].udata == 0); + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[0]) == 0); +} + +ATF_TC_WITHOUT_HEAD(pipe_kqueue__closed_write_end_register_before_close); +ATF_TC_BODY(pipe_kqueue__closed_write_end_register_before_close, tc) +{ + int p[2] = { -1, -1 }; + + ATF_REQUIRE(pipe2(p, O_CLOEXEC | O_NONBLOCK) == 0); + ATF_REQUIRE(p[0] >= 0); + ATF_REQUIRE(p[1] >= 0); + + int kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + struct kevent kev[32]; + EV_SET(&kev[0], p[0], EVFILT_READ, EV_ADD | EV_CLEAR | EV_RECEIPT, /**/ + 0, 0, 0); + EV_SET(&kev[1], p[0], EVFILT_WRITE, EV_ADD | EV_CLEAR | EV_RECEIPT, /**/ + 0, 0, 0); + + /* + * Registering EVFILT_WRITE before the pipe is closed leads to a + * EVFILT_WRITE event with EV_EOF set. 
+ */ + + ATF_REQUIRE(kevent(kq, kev, 2, kev, 2, NULL) == 2); + ATF_REQUIRE((kev[0].flags & EV_ERROR) != 0); + ATF_REQUIRE(kev[0].data == 0); + ATF_REQUIRE((kev[1].flags & EV_ERROR) != 0); + ATF_REQUIRE(kev[1].data == 0); + + char c = 0; + ssize_t r; + while ((r = write(p[1], &c, 1)) == 1) { + } + ATF_REQUIRE(r < 0); + ATF_REQUIRE(errno == EAGAIN || errno == EWOULDBLOCK); + + ATF_REQUIRE(close(p[1]) == 0); + + ATF_REQUIRE(kevent(kq, NULL, 0, kev, nitems(kev), + &(struct timespec) { 0, 0 }) == 2); + { + ATF_REQUIRE(kev[0].ident == (uintptr_t)p[0]); + ATF_REQUIRE(kev[0].filter == EVFILT_WRITE); + ATF_REQUIRE(kev[0].flags == + (EV_EOF | EV_CLEAR | EV_ONESHOT | EV_RECEIPT)); + ATF_REQUIRE(kev[0].fflags == 0); + ATF_REQUIRE(kev[0].data == 4096 || + kev[0].data == 512 /* on FreeBSD 11.3 */); + ATF_REQUIRE(kev[0].udata == 0); + } + { + ATF_REQUIRE(kev[1].ident == (uintptr_t)p[0]); + ATF_REQUIRE(kev[1].filter == EVFILT_READ); + ATF_REQUIRE(kev[1].flags == (EV_EOF | EV_CLEAR | EV_RECEIPT)); + ATF_REQUIRE(kev[1].fflags == 0); + ATF_REQUIRE(kev[1].data == 65536); + ATF_REQUIRE(kev[1].udata == 0); + } + + ATF_REQUIRE(close(kq) == 0); + ATF_REQUIRE(close(p[0]) == 0); +} + +ATF_TP_ADD_TCS(tp) +{ + ATF_TP_ADD_TC(tp, pipe_kqueue__write_end); + ATF_TP_ADD_TC(tp, pipe_kqueue__closed_read_end); + ATF_TP_ADD_TC(tp, pipe_kqueue__closed_read_end_register_before_close); + ATF_TP_ADD_TC(tp, pipe_kqueue__closed_write_end); + ATF_TP_ADD_TC(tp, pipe_kqueue__closed_write_end_register_before_close); + + return atf_no_error(); +}