Index: sys/kern/uipc_usrreq.c
===================================================================
--- sys/kern/uipc_usrreq.c
+++ sys/kern/uipc_usrreq.c
@@ -532,8 +532,9 @@
 
 	case SOCK_DGRAM:
 		STAILQ_INIT(&so->so_rcv.uxdg_mb);
-		sendspace = unpdg_maxdgram;
-		recvspace = unpdg_recvspace;
+		STAILQ_INIT(&so->so_snd.uxdg_mb);
+		TAILQ_INIT(&so->so_rcv.uxdg_conns);
+		sendspace = recvspace = unpdg_recvspace;
 		break;
 
 	case SOCK_SEQPACKET:
@@ -858,9 +859,13 @@
 	switch (so->so_type) {
 	case SOCK_DGRAM:
 		/*
-		 * Everything should have been unlinked/freed by unp_dispose().
+		 * Everything should have been unlinked/freed by unp_dispose()
+		 * and/or unp_disconnect().
 		 */
+		MPASS(so->so_rcv.uxdg_peeked == NULL);
 		MPASS(STAILQ_EMPTY(&so->so_rcv.uxdg_mb));
+		MPASS(TAILQ_EMPTY(&so->so_rcv.uxdg_conns));
+		MPASS(STAILQ_EMPTY(&so->so_snd.uxdg_mb));
 	}
 }
 
@@ -1130,6 +1135,18 @@
 	return (error);
 }
 
+/* PF_UNIX/SOCK_DGRAM version of sbspace(), the result may be negative. */
+static inline long
+uipc_dgram_sbspace(struct sockbuf *sb)
+{
+	int bleft, mleft;
+
+	bleft = sb->sb_hiwat - sb->uxdg_cc;
+	mleft = sb->sb_mbmax - sb->uxdg_mbcnt;
+
+	return ((bleft < mleft) ? bleft : mleft);
+}
+
 /*
  * PF_UNIX/SOCK_DGRAM send
  *
@@ -1310,15 +1327,40 @@
 	f->m_pkthdr.memlen = mbcnt;
 	f->m_pkthdr.ctllen = ctl;
 
+	/*
+	 * Destination socket buffer selection.
+	 *
+	 * Unconnected sends, identified by (addr != NULL) and better called
+	 * "temporarily connected" in the scope of this function, use the
+	 * receive buffer of the receiving socket "so2".
+	 *
+	 * Connected sends land on the send buffer of the sender's
+	 * socket "so". Then, if we just added the very first datagram
+	 * on this send buffer, we need to link the send buffer onto the
+	 * receive buffer's list. We put ourselves on top of the list. Such
+	 * logic gives infrequent senders priority over frequent senders.
+	 *
+	 * Note on byte count management. As long as event methods kevent(2),
+	 * select(2) are not protocol specific (yet), we need to maintain
+	 * meaningful values on the receive buffer. So, the receive buffer
+	 * accumulates counters from all connected buffers, potentially
+	 * ending up with sb_ccc > sb_hiwat or sb_mbcnt > sb_mbmax.
+	 */
 	so2 = unp2->unp_socket;
-	sb = &so2->so_rcv;
+	sb = (addr == NULL) ? &so->so_snd : &so2->so_rcv;
 	SOCK_RECVBUF_LOCK(so2);
-	if (cc <= sbspace(sb)) {
+	if (cc <= uipc_dgram_sbspace(sb)) {
+		if (addr == NULL && STAILQ_EMPTY(&sb->uxdg_mb))
+			TAILQ_INSERT_HEAD(&so2->so_rcv.uxdg_conns, &so->so_snd,
+			    uxdg_clist);
 		STAILQ_INSERT_TAIL(&sb->uxdg_mb, f, m_stailqpkt);
-		sb->sb_acc += cc + ctl;
-		sb->sb_ccc += cc + ctl;
-		sb->sb_ctl += ctl;
-		sb->sb_mbcnt += mbcnt;
+		sb->uxdg_cc += cc + ctl;
+		sb->uxdg_ctl += ctl;
+		sb->uxdg_mbcnt += mbcnt;
+		so2->so_rcv.sb_acc += cc + ctl;
+		so2->so_rcv.sb_ccc += cc + ctl;
+		so2->so_rcv.sb_ctl += ctl;
+		so2->so_rcv.sb_mbcnt += mbcnt;
 		sorwakeup_locked(so2);
 		f = NULL;
 	} else {
@@ -1350,19 +1392,23 @@
 }
 
 /*
- * PF_UNIX/SOCK_DGRAM receive with MSG_PEEK
+ * PF_UNIX/SOCK_DGRAM receive with MSG_PEEK.
+ * The mbuf has already been unlinked from the uxdg_mb of its socket buffer
+ * and is linked here onto uxdg_peeked of the receive socket buffer.
  */
 static int
-uipc_peek_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
-    struct mbuf **controlp, int *flagsp)
+uipc_peek_dgram(struct socket *so, struct mbuf *m, struct sockaddr **psa,
+    struct uio *uio, struct mbuf **controlp, int *flagsp)
 {
-	struct mbuf *m;
 	ssize_t len;
 	int error;
 
+	so->so_rcv.uxdg_peeked = m;
+	so->so_rcv.uxdg_cc += m->m_pkthdr.len;
+	so->so_rcv.uxdg_ctl += m->m_pkthdr.ctllen;
+	so->so_rcv.uxdg_mbcnt += m->m_pkthdr.memlen;
 	SOCK_RECVBUF_UNLOCK(so);
 
-	m = STAILQ_FIRST(&so->so_rcv.uxdg_mb);
 	KASSERT(m->m_type == MT_SONAME, ("m->m_type == %d", m->m_type));
 	if (psa != NULL)
 		*psa = sodupsockaddr(mtod(m, struct sockaddr *), M_WAITOK);
@@ -1409,6 +1455,7 @@
 uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
     struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
 {
+	struct sockbuf *sb = NULL;
 	struct mbuf *m;
 	int flags, error;
 	ssize_t len;
@@ -1430,13 +1477,15 @@
 		return (error);
 
 	/*
-	 * Loop blocking while waiting for a datagram.
+	 * Loop blocking while waiting for a datagram. A datagram that had
+	 * already been peeked at has top priority, then connected peers,
+	 * then unconnected sends. On exit sb is non-NULL only if a connected
+	 * peer's send buffer was selected; it is resolved after the loop.
 	 */
 	SOCK_RECVBUF_LOCK(so);
-	while ((m = STAILQ_FIRST(&so->so_rcv.uxdg_mb)) == NULL) {
-		KASSERT(sbavail(&so->so_rcv) == 0,
-		    ("soreceive_dgram: sb_mb NULL but sbavail %u",
-		    sbavail(&so->so_rcv)));
+	while ((m = so->so_rcv.uxdg_peeked) == NULL &&
+	    (sb = TAILQ_FIRST(&so->so_rcv.uxdg_conns)) == NULL &&
+	    (m = STAILQ_FIRST(&so->so_rcv.uxdg_mb)) == NULL) {
 		if (so->so_error) {
 			error = so->so_error;
 			so->so_error = 0;
@@ -1463,16 +1512,34 @@
 		}
 	}
 
+	if (sb == NULL)
+		sb = &so->so_rcv;
+	else if (m == NULL)
+		m = STAILQ_FIRST(&sb->uxdg_mb);
+	else
+		MPASS(m == so->so_rcv.uxdg_peeked);
+
+	MPASS(sb->uxdg_cc > 0);
 	M_ASSERTPKTHDR(m);
 	KASSERT(m->m_type == MT_SONAME, ("m->m_type == %d", m->m_type));
 
 	if (uio->uio_td)
 		uio->uio_td->td_ru.ru_msgrcv++;
 
+	if (__predict_true(m != so->so_rcv.uxdg_peeked)) {
+		STAILQ_REMOVE_HEAD(&sb->uxdg_mb, m_stailqpkt);
+		if (STAILQ_EMPTY(&sb->uxdg_mb) && sb != &so->so_rcv)
+			TAILQ_REMOVE(&so->so_rcv.uxdg_conns, sb, uxdg_clist);
+	} else
+		so->so_rcv.uxdg_peeked = NULL;
+
+	sb->uxdg_cc -= m->m_pkthdr.len;
+	sb->uxdg_ctl -= m->m_pkthdr.ctllen;
+	sb->uxdg_mbcnt -= m->m_pkthdr.memlen;
+
 	if (__predict_false(flags & MSG_PEEK))
-		return (uipc_peek_dgram(so, psa, uio, controlp, flagsp));
+		return (uipc_peek_dgram(so, m, psa, uio, controlp, flagsp));
 
-	STAILQ_REMOVE_HEAD(&so->so_rcv.uxdg_mb, m_stailqpkt);
 	so->so_rcv.sb_acc -= m->m_pkthdr.len;
 	so->so_rcv.sb_ccc -= m->m_pkthdr.len;
 	so->so_rcv.sb_ctl -= m->m_pkthdr.ctllen;
@@ -2103,6 +2170,7 @@
 unp_disconnect(struct unpcb *unp, struct unpcb *unp2)
 {
 	struct socket *so, *so2;
+	struct mbuf *m = NULL;
 #ifdef INVARIANTS
 	struct unpcb *unptmp;
 #endif
@@ -2117,6 +2185,40 @@
 	so2 = unp2->unp_socket;
 	switch (unp->unp_socket->so_type) {
 	case SOCK_DGRAM:
+		/*
+		 * Remove our send socket buffer from the peer's receive buffer.
+		 * Move the data to the receive buffer only if it is empty.
+		 * This protects against a scenario when a peer connects,
+		 * floods and disconnects, effectively blocking sendto() from
+		 * unconnected sockets.
+		 *
+		 * The whole queue is moved with STAILQ_CONCAT(), so every
+		 * datagram is preserved, not just the first one.  Our own
+		 * counters are reset in either case, as "so" may reconnect.
+		 */
+		SOCK_RECVBUF_LOCK(so2);
+		if (!STAILQ_EMPTY(&so->so_snd.uxdg_mb)) {
+			TAILQ_REMOVE(&so2->so_rcv.uxdg_conns, &so->so_snd,
+			    uxdg_clist);
+			if (STAILQ_EMPTY(&so2->so_rcv.uxdg_mb)) {
+				STAILQ_CONCAT(&so2->so_rcv.uxdg_mb,
+				    &so->so_snd.uxdg_mb);
+				so2->so_rcv.uxdg_cc += so->so_snd.uxdg_cc;
+				so2->so_rcv.uxdg_ctl += so->so_snd.uxdg_ctl;
+				so2->so_rcv.uxdg_mbcnt += so->so_snd.uxdg_mbcnt;
+			} else {
+				m = STAILQ_FIRST(&so->so_snd.uxdg_mb);
+				STAILQ_INIT(&so->so_snd.uxdg_mb);
+				so2->so_rcv.sb_acc -= so->so_snd.uxdg_cc;
+				so2->so_rcv.sb_ccc -= so->so_snd.uxdg_cc;
+				so2->so_rcv.sb_ctl -= so->so_snd.uxdg_ctl;
+				so2->so_rcv.sb_mbcnt -= so->so_snd.uxdg_mbcnt;
+			}
+			so->so_snd.uxdg_cc = 0;
+			so->so_snd.uxdg_ctl = 0;
+			so->so_snd.uxdg_mbcnt = 0;
+		}
+		SOCK_RECVBUF_UNLOCK(so2);
 		UNP_REF_LIST_LOCK();
 #ifdef INVARIANTS
 		LIST_FOREACH(unptmp, &unp2->unp_refs, unp_reflink) {
@@ -2156,6 +2258,21 @@
 		if (!unp_pcb_rele(unp2))
 			UNP_PCB_UNLOCK(unp2);
 	}
+
+	/*
+	 * Free the datagrams discarded above.  They are queued through
+	 * m_stailqpkt while m_freem() releases a single mbuf chain only,
+	 * so walk the queue.  unp_scan() is applied once to the head.
+	 */
+	if (m != NULL) {
+		unp_scan(m, unp_freerights);
+		do {
+			struct mbuf *n = STAILQ_NEXT(m, m_stailqpkt);
+
+			m_freem(m);
+			m = n;
+		} while (m != NULL);
+	}
 }
 
 /*
@@ -3153,7 +3270,7 @@
 static void
 unp_dispose(struct socket *so)
 {
-	struct sockbuf *sb = &so->so_rcv;
+	struct sockbuf *sb;
 	struct unpcb *unp;
 	struct mbuf *m;
 
@@ -3170,6 +3287,18 @@
 	SOCK_RECVBUF_LOCK(so);
 	switch (so->so_type) {
 	case SOCK_DGRAM:
+		while ((sb = TAILQ_FIRST(&so->so_rcv.uxdg_conns)) != NULL) {
+			STAILQ_CONCAT(&so->so_rcv.uxdg_mb, &sb->uxdg_mb);
+			TAILQ_REMOVE(&so->so_rcv.uxdg_conns, sb, uxdg_clist);
+			/* Our peer may reconnect: reset its counters. */
+			sb->uxdg_cc = sb->uxdg_ctl = sb->uxdg_mbcnt = 0;
+		}
+		sb = &so->so_rcv;
+		if (sb->uxdg_peeked != NULL) {
+			STAILQ_INSERT_HEAD(&sb->uxdg_mb, sb->uxdg_peeked,
+			    m_stailqpkt);
+			sb->uxdg_peeked = NULL;
+		}
 		m = STAILQ_FIRST(&sb->uxdg_mb);
 		STAILQ_INIT(&sb->uxdg_mb);
 		/* XXX: our shortened sbrelease() */
@@ -3178,6 +3307,7 @@
 		break;
 	case SOCK_STREAM:
 	case SOCK_SEQPACKET:
+		sb = &so->so_rcv;
 		m = sbcut_locked(sb, sb->sb_ccc);
 		KASSERT(sb->sb_ccc == 0 && sb->sb_mb == 0 && sb->sb_mbcnt == 0,
 		    ("%s: ccc %u mb %p mbcnt %u", __func__,
Index:
sys/sys/sockbuf.h
===================================================================
--- sys/sys/sockbuf.h
+++ sys/sys/sockbuf.h
@@ -135,10 +135,35 @@
 		/*
 		 * PF_UNIX/SOCK_DGRAM
 		 *
-		 * Local protocol, thus any socket buffer is a receive buffer.
+		 * Local protocol, thus we should buffer on the receive side
+		 * only. However, in a one-to-many configuration we don't want
+		 * a single receive buffer to be shared. So we link the send
+		 * buffers onto the receive buffer. All the fields are locked
+		 * by the receive buffer lock.
 		 */
 		struct {
+			/*
+			 * For receive buffer: own queue of this buffer for
+			 * unconnected sends. For send buffer: queue lent
+			 * to the peer's receive buffer, to isolate ourselves
+			 * from other senders.
+			 */
 			STAILQ_HEAD(, mbuf) uxdg_mb;
+			/* For receive buffer: datagram seen via MSG_PEEK. */
+			struct mbuf *uxdg_peeked;
+			/*
+			 * For receive buffer: queue of send buffers of
+			 * connected peers. For send buffer: linkage on
+			 * the connected peer's receive buffer queue.
+			 */
+			union {
+				TAILQ_HEAD(, sockbuf) uxdg_conns;
+				TAILQ_ENTRY(sockbuf) uxdg_clist;
+			};
+			/* Counters for this buffer uxdg_mb chain + peeked. */
+			u_int uxdg_cc;
+			u_int uxdg_ctl;
+			u_int uxdg_mbcnt;
 		};
 	};
 };
Index: tests/sys/kern/unix_dgram.c
===================================================================
--- tests/sys/kern/unix_dgram.c
+++ tests/sys/kern/unix_dgram.c
@@ -170,15 +170,16 @@
 ATF_TC_WITHOUT_HEAD(one2many);
 ATF_TC_BODY(one2many, tc)
 {
-	int one, many[2], two;
-	char buf[1024];
+	int one, many[3], two;
+#define BUFSIZE 1024
+	char buf[BUFSIZE], goodboy[BUFSIZE], flooder[BUFSIZE], notconn[BUFSIZE];
 
 	/* Establish one to many connection. */
 	ATF_REQUIRE((one = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
 	ATF_REQUIRE(bind(one, (struct sockaddr *)&sun, sizeof(sun)) == 0);
 	/* listen(2) shall fail. */
 	ATF_REQUIRE(listen(one, -1) != 0);
-	for (int i = 0; i < 2; i++) {
+	for (int i = 0; i < 3; i++) {
 		ATF_REQUIRE((many[i] = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
 		ATF_REQUIRE(connect(many[i], (struct sockaddr *)&sun,
 		    sizeof(sun)) == 0);
@@ -198,22 +199,78 @@
 	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == 42);
 
 	/*
-	 * Sending from an unconnected socket to a bound socket. Connection is
-	 * created for the duration of the syscall.
+	 * Interaction between concurrent senders. New feature in FreeBSD 14.
+	 *
+	 * One sender cannot fill the receive side. Other senders can
+	 * continue operation. Senders who don't fill their buffers are
+	 * prioritized over flooders. Connected senders are prioritized over
+	 * unconnected ones.
+	 *
+	 * Disconnecting a sender that has queued data optionally preserves
+	 * the data. Allow the data to migrate to the peer's buffer only if
+	 * the latter is empty. Otherwise discard it, to protect against a
+	 * connect-fill-close attack.
 	 */
+#define FLOODER 13 /* for connected flooder on many[0] */
+#define GOODBOY 42 /* for a good boy on many[1] */
+#define NOTCONN 66 /* for sendto(2) via two */
+	goodboy[0] = GOODBOY;
+	flooder[0] = FLOODER;
+	notconn[0] = NOTCONN;
+
+	/* Connected senders have priority over sendto(2). */
 	ATF_REQUIRE((two = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
-	ATF_REQUIRE(sendto(two, buf, 43, 0, (struct sockaddr *)&sun,
-	    sizeof(sun)) == 43);
-	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == 43);
+	ATF_REQUIRE(sendto(two, notconn, BUFSIZE, 0, (struct sockaddr *)&sun,
+	    sizeof(sun)) == BUFSIZE);
+	ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == GOODBOY); /* message from good boy comes first */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == NOTCONN); /* only then message from sendto(2) */
 
-	/* One sender can fill the receive side.
-	 * Current behavior which needs improvement.
-	 */
-	fill(many[0], buf, sizeof(buf));
-	ATF_REQUIRE(send(many[1], buf, sizeof(buf), 0) == -1);
+	/* A casual sender has priority over a flooder. */
+	fill(many[0], flooder, sizeof(flooder));
+	ATF_REQUIRE(send(many[0], flooder, BUFSIZE, 0) == -1);
 	ATF_REQUIRE(errno == ENOBUFS);
+	ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == GOODBOY); /* message from good boy comes first */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == FLOODER); /* only then message from flooder */
+
+	/* Once seen, a message can't be deprioritized by any other message. */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == FLOODER); /* message from the flooder seen */
+	ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == FLOODER); /* should be the same message */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == FLOODER); /* now we read it out... */
 	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
-	ATF_REQUIRE(send(many[1], buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == GOODBOY); /* ... and next one is the good boy */
+
+	/* Disconnect while data from an unconnected sender is queued. */
+	ATF_REQUIRE(sendto(two, notconn, BUFSIZE, 0, (struct sockaddr *)&sun,
+	    sizeof(sun)) == BUFSIZE);
+	close(many[0]);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == NOTCONN); /* message from sendto() */
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_DONTWAIT) == -1);
+	ATF_REQUIRE(errno == EAGAIN); /* data from many[0] discarded */
+
+	/* Disconnect with no data from unconnected senders queued. */
+	ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+	close(many[1]);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+	ATF_REQUIRE(buf[0] == GOODBOY); /* message from many[1] preserved */
+
+	/* Check that nothing leaks on close(2). */
+	ATF_REQUIRE(send(many[2], buf, 42, 0) == 42);
+	ATF_REQUIRE(send(many[2], buf, 42, 0) == 42);
+	ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == 42);
+	ATF_REQUIRE(sendto(two, notconn, 42, 0, (struct sockaddr *)&sun,
+	    sizeof(sun)) == 42);
+	close(one);
 }
 
 /*