Index: sys/compat/linux/linux_event.h =================================================================== --- sys/compat/linux/linux_event.h +++ sys/compat/linux/linux_event.h @@ -40,15 +40,17 @@ #define LINUX_EPOLLERR 0x008 #define LINUX_EPOLLHUP 0x010 #define LINUX_EPOLLRDHUP 0x2000 -#define LINUX_EPOLLWAKEUP 1u<<29 -#define LINUX_EPOLLONESHOT 1u<<30 -#define LINUX_EPOLLET 1u<<31 +#define LINUX_EPOLLEXCLUSIVE (1u<<28) +#define LINUX_EPOLLWAKEUP (1u<<29) +#define LINUX_EPOLLONESHOT (1u<<30) +#define LINUX_EPOLLET (1u<<31) #define LINUX_EPOLL_EVRD (LINUX_EPOLLIN|LINUX_EPOLLRDNORM) #define LINUX_EPOLL_EVWR (LINUX_EPOLLOUT|LINUX_EPOLLWRNORM) #define LINUX_EPOLL_EVSUP (LINUX_EPOLLET|LINUX_EPOLLONESHOT \ |LINUX_EPOLLHUP|LINUX_EPOLLERR|LINUX_EPOLLPRI \ - |LINUX_EPOLL_EVRD|LINUX_EPOLL_EVWR|LINUX_EPOLLRDHUP) + |LINUX_EPOLL_EVRD|LINUX_EPOLL_EVWR|LINUX_EPOLLRDHUP \ + |LINUX_EPOLLEXCLUSIVE) #define LINUX_EPOLL_CTL_ADD 1 #define LINUX_EPOLL_CTL_DEL 2 Index: sys/compat/linux/linux_event.c =================================================================== --- sys/compat/linux/linux_event.c +++ sys/compat/linux/linux_event.c @@ -214,6 +214,8 @@ kev_flags |= EV_ERROR; if ((levents & LINUX_EPOLLRDHUP) != 0) kev_flags |= EV_EOF; + if ((levents & LINUX_EPOLLEXCLUSIVE) != 0) + kev_flags |= EV_EXCLUSIVE; /* flags related to what event is registered */ if ((levents & LINUX_EPOLL_EVRD) != 0) { Index: sys/kern/kern_event.c =================================================================== --- sys/kern/kern_event.c +++ sys/kern/kern_event.c @@ -115,7 +115,7 @@ struct kevent_copyops *k_ops, const struct timespec *timeout, struct kevent *keva, struct thread *td); -static void kqueue_wakeup(struct kqueue *kq); +static void kqueue_wakeup(struct kqueue *kq, bool one); static struct filterops *kqueue_fo_find(int filt); static void kqueue_fo_release(int filt); struct g_kevent_args; @@ -223,6 +223,18 @@ if (!(islock)) \ KQ_UNLOCK((kn)->kn_kq); \ } while (0) +#define KNOTE_ACTIVATE2(kn, islock, 
isbreak) do { \ + if ((islock)) \ + mtx_assert(&(kn)->kn_kq->kq_lock, MA_OWNED); \ + else \ + KQ_LOCK((kn)->kn_kq); \ + (kn)->kn_status |= KN_ACTIVE; \ + (isbreak) = (((kn)->kn_flags & EV_EXCLUSIVE) != 0); \ + if (((kn)->kn_status & (KN_QUEUED | KN_DISABLED)) == 0) \ + knote_enqueue((kn)); \ + if (!(islock)) \ + KQ_UNLOCK((kn)->kn_kq); \ +} while (0) #define KQ_LOCK(kq) do { \ mtx_lock(&(kq)->kq_lock); \ } while (0) @@ -2330,13 +2342,13 @@ } static void -kqueue_wakeup(struct kqueue *kq) +kqueue_wakeup(struct kqueue *kq, bool one) { KQ_OWNED(kq); if ((kq->kq_state & KQ_SLEEP) == KQ_SLEEP) { kq->kq_state &= ~KQ_SLEEP; - wakeup(kq); + if (one) wakeup_one(kq); else wakeup(kq); } if ((kq->kq_state & KQ_SEL) == KQ_SEL) { selwakeuppri(&kq->kq_sel, PSOCK); @@ -2363,7 +2375,7 @@ { struct kqueue *kq; struct knote *kn, *tkn; - int error; + int error, stop = 0; if (list == NULL) return; @@ -2400,13 +2412,15 @@ KQ_LOCK(kq); kn_leave_flux(kn); if (error) - KNOTE_ACTIVATE(kn, 1); + KNOTE_ACTIVATE2(kn, 1, stop); KQ_UNLOCK_FLUX(kq); } else { if (kn->kn_fop->f_event(kn, hint)) - KNOTE_ACTIVATE(kn, 1); + KNOTE_ACTIVATE2(kn, 1, stop); KQ_UNLOCK(kq); } + if (stop) + break; } if ((lockflags & KNF_LISTLOCKED) == 0) list->kl_unlock(list->kl_lockarg); @@ -2780,6 +2794,7 @@ knote_enqueue(struct knote *kn) { struct kqueue *kq = kn->kn_kq; + bool one; KQ_OWNED(kn->kn_kq); KASSERT((kn->kn_status & KN_QUEUED) == 0, ("knote already queued")); @@ -2787,7 +2802,8 @@ TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe); kn->kn_status |= KN_QUEUED; kq->kq_count++; - kqueue_wakeup(kq); + one = (kn->kn_flags & EV_EXCLUSIVE) != 0; + kqueue_wakeup(kq, one); } static void Index: sys/sys/event.h =================================================================== --- sys/sys/event.h +++ sys/sys/event.h @@ -145,6 +145,7 @@ #define EV_CLEAR 0x0020 /* clear event state after reporting */ #define EV_RECEIPT 0x0040 /* force EV_ERROR on success, data=0 */ #define EV_DISPATCH 0x0080 /* disable event after reporting */ +#define
EV_EXCLUSIVE 0x0400 /* wakeup only one waiter */ #define EV_SYSFLAGS 0xF000 /* reserved by system */ #define EV_DROP 0x1000 /* note should be dropped */ Index: tests/sys/kqueue/Makefile =================================================================== --- tests/sys/kqueue/Makefile +++ tests/sys/kqueue/Makefile @@ -5,6 +5,7 @@ TESTSDIR= ${TESTSBASE}/sys/kqueue BINDIR= ${TESTSDIR} +ATF_TESTS_C+= kevent_exclusive ATF_TESTS_C+= kqueue_peek_signal NETBSD_ATF_TESTS_C= proc1_test @@ -14,6 +15,8 @@ NETBSD_ATF_TESTS_C+= sig_test NETBSD_ATF_TESTS_C+= vnode_test +LIBADD.kevent_exclusive+= pthread + WARNS?= 3 TESTS_SUBDIRS+= libkqueue Index: tests/sys/kqueue/kevent_exclusive.c =================================================================== --- /dev/null +++ tests/sys/kqueue/kevent_exclusive.c @@ -0,0 +1,204 @@ +/*- + * Copyright 2022 Dmitry Chagin + * + * SPDX-License-Identifier: BSD-2-Clause + */ +#include <sys/types.h> +#include <sys/event.h> +#include <sys/socket.h> +#include <sys/un.h> + +#include <errno.h> +#include <pthread.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include <atf-c.h> + +static int kq; +static int evagain; +static int evfired; +static int evwblock; +static int evaccpt; + +#define SERVER_PATH "server_kevexcl" + +#ifndef EV_EXCLUSIVE +#define EV_EXCLUSIVE 0x0400 +#endif + +static inline struct timespec +make_timespec(time_t s, long int ns) +{ + struct timespec rts; + + rts.tv_sec = s; + rts.tv_nsec = ns; + return (rts); +} + +static void * +waiter(void *arg) +{ + struct kevent tevent; + struct timespec timeout; + int afd; + ssize_t ec; + + timeout = make_timespec(0, 5000000); + + ec = kevent(kq, NULL, 0, &tevent, 1, &timeout); + if (ec == 0) { + evwblock++; + return (NULL); + } + ATF_REQUIRE_EQ(1, ec); + evfired++; + + afd = accept((int)tevent.ident, NULL, NULL); + if (afd == -1 && errno == EAGAIN) + evagain++; + else if (afd > 0) { + evaccpt++; + ATF_REQUIRE_EQ(0, close(afd)); + } + return (NULL); +} + +ATF_TC_WITH_CLEANUP(exclusiveon); +ATF_TC_BODY(exclusiveon, tc) +{ + struct sockaddr_un server; + socklen_t len; + struct kevent event; + pthread_t w0, w1; + int srvfd,
clfd; + + evagain = evfired = evwblock = evaccpt = 0; + + /* server */ + len = sizeof(struct sockaddr_un); + memset(&server, 0, len); + server.sun_family = AF_UNIX; + strcpy(server.sun_path, SERVER_PATH); + + srvfd = socket(PF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); + ATF_REQUIRE(srvfd > 0); + ATF_REQUIRE_EQ(0, bind(srvfd, (struct sockaddr *)&server, len)); + ATF_REQUIRE_EQ(0, listen(srvfd, 5)); + + kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + EV_SET(&event, srvfd, EVFILT_READ, EV_ADD | EV_EXCLUSIVE, + 0, 0, NULL); + + ATF_REQUIRE_EQ(0, kevent(kq, &event, 1, NULL, 0, NULL)); + + ATF_REQUIRE_EQ(0, pthread_create(&w0, NULL, waiter, + (void *) (uintptr_t) 1)); + ATF_REQUIRE_EQ(0, pthread_create(&w1, NULL, waiter, + (void *) (uintptr_t) 2)); + + clfd = socket(PF_UNIX, SOCK_STREAM, 0); + ATF_REQUIRE(clfd > 0); + + ATF_REQUIRE_EQ(0, connect(clfd, (struct sockaddr *)&server, len)); + + ATF_REQUIRE_EQ(0, pthread_join(w0, NULL)); + ATF_REQUIRE_EQ(0, pthread_join(w1, NULL)); + + ATF_REQUIRE_EQ(0, evagain); + ATF_REQUIRE_EQ(1, evfired); + ATF_REQUIRE_EQ(1, evwblock); + ATF_REQUIRE_EQ(1, evaccpt); + + ATF_REQUIRE_EQ(0, close(clfd)); + ATF_REQUIRE_EQ(0, close(srvfd)); + ATF_REQUIRE_EQ(0, close(kq)); + ATF_REQUIRE_EQ(0, unlink(SERVER_PATH)); +} + +ATF_TC_HEAD(exclusiveon, tc) +{ + + atf_tc_set_md_var(tc, "descr", "Check wakeups if EV_EXCLUSIVE is set"); +} + +ATF_TC_CLEANUP(exclusiveon, tc) +{ + + unlink(SERVER_PATH); +} + +ATF_TC_WITH_CLEANUP(exclusiveoff); +ATF_TC_BODY(exclusiveoff, tc) +{ + struct sockaddr_un server; + socklen_t len; + struct kevent event; + pthread_t w0, w1; + int srvfd, clfd; + + evagain = evfired = evwblock = evaccpt = 0; + + /* server */ + len = sizeof(struct sockaddr_un); + memset(&server, 0, len); + server.sun_family = AF_UNIX; + strcpy(server.sun_path, SERVER_PATH); + + srvfd = socket(PF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0); + ATF_REQUIRE(srvfd > 0); + ATF_REQUIRE_EQ(0, bind(srvfd, (struct sockaddr *)&server, len)); + ATF_REQUIRE_EQ(0, listen(srvfd, 5));
+ + kq = kqueue(); + ATF_REQUIRE(kq >= 0); + + EV_SET(&event, srvfd, EVFILT_READ, EV_ADD, 0, 0, NULL); + + ATF_REQUIRE_EQ(0, kevent(kq, &event, 1, NULL, 0, NULL)); + + ATF_REQUIRE_EQ(0, pthread_create(&w0, NULL, waiter, NULL)); + ATF_REQUIRE_EQ(0, pthread_create(&w1, NULL, waiter, NULL)); + + clfd = socket(PF_UNIX, SOCK_STREAM, 0); + ATF_REQUIRE(clfd > 0); + + ATF_REQUIRE_EQ(0, connect(clfd, (struct sockaddr *)&server, len)); + + ATF_REQUIRE_EQ(0, pthread_join(w0, NULL)); + ATF_REQUIRE_EQ(0, pthread_join(w1, NULL)); + + ATF_REQUIRE_EQ(1, evagain); + ATF_REQUIRE_EQ(2, evfired); + ATF_REQUIRE_EQ(0, evwblock); + ATF_REQUIRE_EQ(1, evaccpt); + + ATF_REQUIRE_EQ(0, close(clfd)); + ATF_REQUIRE_EQ(0, close(srvfd)); + ATF_REQUIRE_EQ(0, close(kq)); + ATF_REQUIRE_EQ(0, unlink(SERVER_PATH)); +} + +ATF_TC_HEAD(exclusiveoff, tc) +{ + + atf_tc_set_md_var(tc, "descr", "Check wakeups if EV_EXCLUSIVE is not set"); +} + +ATF_TC_CLEANUP(exclusiveoff, tc) +{ + + unlink(SERVER_PATH); +} + +ATF_TP_ADD_TCS(tp) +{ + /* + * Thundering herd & EV_EXCLUSIVE + */ + ATF_TP_ADD_TC(tp, exclusiveon); + ATF_TP_ADD_TC(tp, exclusiveoff); + + return (atf_no_error()); +}