Page Menu
Home
FreeBSD
Search
Configure Global Search
Log In
Files
F111561105
D35303.id106688.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
16 KB
Referenced Files
None
Subscribers
None
D35303.id106688.diff
View Options
Index: share/man/man4/unix.4
===================================================================
--- share/man/man4/unix.4
+++ share/man/man4/unix.4
@@ -393,6 +393,35 @@
.Dv LOCAL_CREDS
socket option.
.El
+.Sh BUFFERING
+Due to the local nature of the
+.Ux Ns -domain
+sockets, they do not implement send buffers.
+The
+.Xr send 2
+and
+.Xr write 2
+families of system calls attempt to write data to the receive buffer of the
+destination socket.
+.Pp
+The
+.Ux Ns -domain
+sockets of
+.Dv SOCK_DGRAM
+kind are unreliable and always non-blocking for write operations.
+A
+.Dv SOCK_DGRAM
+socket that has been bound with bind(2) can have multiple connections at the
+same time.
+The modern
+.Fx
+implementation creates space quotas in the receive buffer of the bound
+socket for every connected socket, preventing a situation where a single writer
+can exhaust buffer space.
+A side effect of this implementation is that it does not guarantee that
+writes from different senders will arrive at the receiver in the same
+chronological order in which they were sent.
+The order is preserved for writes coming through a particular connection.
.Sh SEE ALSO
.Xr connect 2 ,
.Xr dup 2 ,
Index: sys/kern/uipc_usrreq.c
===================================================================
--- sys/kern/uipc_usrreq.c
+++ sys/kern/uipc_usrreq.c
@@ -532,8 +532,14 @@
case SOCK_DGRAM:
STAILQ_INIT(&so->so_rcv.uxdg_mb);
- sendspace = unpdg_maxdgram;
- recvspace = unpdg_recvspace;
+ STAILQ_INIT(&so->so_snd.uxdg_mb);
+ TAILQ_INIT(&so->so_rcv.uxdg_conns);
+ /*
+ * Since the send buffer is either bypassed or is part
+ * of a one-to-many receive buffer, we set both space
+ * limits to unpdg_recvspace.
+ */
+ sendspace = recvspace = unpdg_recvspace;
break;
case SOCK_SEQPACKET:
@@ -858,9 +864,13 @@
switch (so->so_type) {
case SOCK_DGRAM:
/*
- * Everything should have been unlinked/freed by unp_dispose().
+ * Everything should have been unlinked/freed by unp_dispose()
+ * and/or unp_disconnect().
*/
+ MPASS(so->so_rcv.uxdg_peeked == NULL);
MPASS(STAILQ_EMPTY(&so->so_rcv.uxdg_mb));
+ MPASS(TAILQ_EMPTY(&so->so_rcv.uxdg_conns));
+ MPASS(STAILQ_EMPTY(&so->so_snd.uxdg_mb));
}
}
@@ -1130,6 +1140,21 @@
return (error);
}
+/* PF_UNIX/SOCK_DGRAM version of sbspace() */
+static inline long
+uipc_dgram_sbspace(struct sockbuf *sb)
+{
+ int bleft, mleft;
+
+ MPASS(sb->sb_hiwat >= sb->uxdg_cc);
+ MPASS(sb->sb_mbmax >= sb->uxdg_mbcnt);
+
+ bleft = sb->sb_hiwat - sb->uxdg_cc;
+ mleft = sb->sb_mbmax - sb->uxdg_mbcnt;
+
+ return ((bleft < mleft) ? bleft : mleft);
+}
+
/*
* PF_UNIX/SOCK_DGRAM send
*
@@ -1310,15 +1335,46 @@
f->m_pkthdr.memlen = mbcnt;
f->m_pkthdr.ctllen = ctl;
+ /*
+ * Destination socket buffer selection.
+ *
+ * Unconnected sends, when !(so->so_state & SS_ISCONNECTED) and the
+ * destination address is supplied, create a temporary connection for
+ * the run time of the function (see call to unp_connectat() above and
+ * to unp_disconnect() below). We distinguish them by condition of
+ * (addr != NULL). We intentionally avoid adding 'bool connected' for
+ * that condition, since, again, through the run time of this code we
+ * are always connected. For such "unconnected" sends, the destination
+ * buffer would be the receive buffer of destination socket so2.
+ *
+ * For connected sends, data lands on the send buffer of the sender's
+ * socket "so". Then, if we just added the very first datagram
+ * on this send buffer, we need to add the send buffer on to the
+ * receiving socket's buffer list. We put ourselves on top of the
+ * list. Such logic gives infrequent senders priority over frequent
+ * senders.
+ *
+ * Note on byte count management. As long as event methods kevent(2),
+ * select(2) are not protocol specific (yet), we need to maintain
+ * meaningful values on the receive buffer. So, the receive buffer
+ * would accumulate counters from all connected buffers potentially
+ * having sb_ccc > sb_hiwat or sb_mbcnt > sb_mbmax.
+ */
so2 = unp2->unp_socket;
- sb = &so2->so_rcv;
+ sb = (addr == NULL) ? &so->so_snd : &so2->so_rcv;
SOCK_RECVBUF_LOCK(so2);
- if (cc <= sbspace(sb)) {
+ if (cc <= uipc_dgram_sbspace(sb)) {
+ if (addr == NULL && STAILQ_EMPTY(&sb->uxdg_mb))
+ TAILQ_INSERT_HEAD(&so2->so_rcv.uxdg_conns, &so->so_snd,
+ uxdg_clist);
STAILQ_INSERT_TAIL(&sb->uxdg_mb, f, m_stailqpkt);
- sb->sb_acc += cc + ctl;
- sb->sb_ccc += cc + ctl;
- sb->sb_ctl += ctl;
- sb->sb_mbcnt += mbcnt;
+ sb->uxdg_cc += cc + ctl;
+ sb->uxdg_ctl += ctl;
+ sb->uxdg_mbcnt += mbcnt;
+ so2->so_rcv.sb_acc += cc + ctl;
+ so2->so_rcv.sb_ccc += cc + ctl;
+ so2->so_rcv.sb_ctl += ctl;
+ so2->so_rcv.sb_mbcnt += mbcnt;
sorwakeup_locked(so2);
f = NULL;
} else {
@@ -1350,19 +1406,23 @@
}
/*
- * PF_UNIX/SOCK_DGRAM receive with MSG_PEEK
+ * PF_UNIX/SOCK_DGRAM receive with MSG_PEEK.
+ * The mbuf has already been unlinked from the uxdg_mb of socket buffer
+ * and needs to be linked onto uxdg_peeked of receive socket buffer.
*/
static int
-uipc_peek_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
- struct mbuf **controlp, int *flagsp)
+uipc_peek_dgram(struct socket *so, struct mbuf *m, struct sockaddr **psa,
+ struct uio *uio, struct mbuf **controlp, int *flagsp)
{
- struct mbuf *m;
ssize_t len;
int error;
+ so->so_rcv.uxdg_peeked = m;
+ so->so_rcv.uxdg_cc += m->m_pkthdr.len;
+ so->so_rcv.uxdg_ctl += m->m_pkthdr.ctllen;
+ so->so_rcv.uxdg_mbcnt += m->m_pkthdr.memlen;
SOCK_RECVBUF_UNLOCK(so);
- m = STAILQ_FIRST(&so->so_rcv.uxdg_mb);
KASSERT(m->m_type == MT_SONAME, ("m->m_type == %d", m->m_type));
if (psa != NULL)
*psa = sodupsockaddr(mtod(m, struct sockaddr *), M_WAITOK);
@@ -1409,6 +1469,7 @@
uipc_soreceive_dgram(struct socket *so, struct sockaddr **psa, struct uio *uio,
struct mbuf **mp0, struct mbuf **controlp, int *flagsp)
{
+ struct sockbuf *sb = NULL;
struct mbuf *m;
int flags, error;
ssize_t len;
@@ -1430,13 +1491,15 @@
return (error);
/*
- * Loop blocking while waiting for a datagram.
+ * Loop blocking while waiting for a datagram. Prioritize connected
+ * peers over unconnected sends. Set sb to selected socket buffer
+ * containing an mbuf on exit from the wait loop. A datagram that
+ * had already been peeked at has top priority.
*/
SOCK_RECVBUF_LOCK(so);
- while ((m = STAILQ_FIRST(&so->so_rcv.uxdg_mb)) == NULL) {
- KASSERT(sbavail(&so->so_rcv) == 0,
- ("soreceive_dgram: sb_mb NULL but sbavail %u",
- sbavail(&so->so_rcv)));
+ while ((m = so->so_rcv.uxdg_peeked) == NULL &&
+ (sb = TAILQ_FIRST(&so->so_rcv.uxdg_conns)) == NULL &&
+ (m = STAILQ_FIRST(&so->so_rcv.uxdg_mb)) == NULL) {
if (so->so_error) {
error = so->so_error;
so->so_error = 0;
@@ -1463,16 +1526,34 @@
}
}
+ if (sb == NULL)
+ sb = &so->so_rcv;
+ else if (m == NULL)
+ m = STAILQ_FIRST(&sb->uxdg_mb);
+ else
+ MPASS(m == so->so_rcv.uxdg_peeked);
+
+ MPASS(sb->uxdg_cc > 0);
M_ASSERTPKTHDR(m);
KASSERT(m->m_type == MT_SONAME, ("m->m_type == %d", m->m_type));
if (uio->uio_td)
uio->uio_td->td_ru.ru_msgrcv++;
+ if (__predict_true(m != so->so_rcv.uxdg_peeked)) {
+ STAILQ_REMOVE_HEAD(&sb->uxdg_mb, m_stailqpkt);
+ if (STAILQ_EMPTY(&sb->uxdg_mb) && sb != &so->so_rcv)
+ TAILQ_REMOVE(&so->so_rcv.uxdg_conns, sb, uxdg_clist);
+ } else
+ so->so_rcv.uxdg_peeked = NULL;
+
+ sb->uxdg_cc -= m->m_pkthdr.len;
+ sb->uxdg_ctl -= m->m_pkthdr.ctllen;
+ sb->uxdg_mbcnt -= m->m_pkthdr.memlen;
+
if (__predict_false(flags & MSG_PEEK))
- return (uipc_peek_dgram(so, psa, uio, controlp, flagsp));
+ return (uipc_peek_dgram(so, m, psa, uio, controlp, flagsp));
- STAILQ_REMOVE_HEAD(&so->so_rcv.uxdg_mb, m_stailqpkt);
so->so_rcv.sb_acc -= m->m_pkthdr.len;
so->so_rcv.sb_ccc -= m->m_pkthdr.len;
so->so_rcv.sb_ctl -= m->m_pkthdr.ctllen;
@@ -2103,6 +2184,7 @@
unp_disconnect(struct unpcb *unp, struct unpcb *unp2)
{
struct socket *so, *so2;
+ struct mbuf *m = NULL;
#ifdef INVARIANTS
struct unpcb *unptmp;
#endif
@@ -2117,6 +2199,39 @@
so2 = unp2->unp_socket;
switch (unp->unp_socket->so_type) {
case SOCK_DGRAM:
+ /*
+ * Remove our send socket buffer from the peer's receive buffer.
+ * Move the data to the receive buffer only if it is empty.
+ * This is a protection against a scenario where a peer
+ * connects, floods and disconnects, effectively blocking
+ * sendto() from unconnected sockets.
+ */
+ SOCK_RECVBUF_LOCK(so2);
+ if (!STAILQ_EMPTY(&so->so_snd.uxdg_mb)) {
+ TAILQ_REMOVE(&so2->so_rcv.uxdg_conns, &so->so_snd,
+ uxdg_clist);
+ if (STAILQ_EMPTY(&so2->so_rcv.uxdg_mb)) {
+ /* Hand over the whole chain, not just its head. */
+ STAILQ_CONCAT(&so2->so_rcv.uxdg_mb,
+ &so->so_snd.uxdg_mb);
+ so2->so_rcv.uxdg_cc += so->so_snd.uxdg_cc;
+ so2->so_rcv.uxdg_ctl += so->so_snd.uxdg_ctl;
+ so2->so_rcv.uxdg_mbcnt += so->so_snd.uxdg_mbcnt;
+ } else {
+ /* Discard; the chain is freed at the end of this function. */
+ m = STAILQ_FIRST(&so->so_snd.uxdg_mb);
+ STAILQ_INIT(&so->so_snd.uxdg_mb);
+ so2->so_rcv.sb_acc -= so->so_snd.uxdg_cc;
+ so2->so_rcv.sb_ccc -= so->so_snd.uxdg_cc;
+ so2->so_rcv.sb_ctl -= so->so_snd.uxdg_ctl;
+ so2->so_rcv.sb_mbcnt -= so->so_snd.uxdg_mbcnt;
+ }
+ /* Our socket may reconnect: don't leave stale counters. */
+ so->so_snd.uxdg_cc = 0;
+ so->so_snd.uxdg_ctl = 0;
+ so->so_snd.uxdg_mbcnt = 0;
+ }
+ SOCK_RECVBUF_UNLOCK(so2);
UNP_REF_LIST_LOCK();
#ifdef INVARIANTS
LIST_FOREACH(unptmp, &unp2->unp_refs, unp_reflink) {
@@ -2156,6 +2271,11 @@
if (!unp_pcb_rele(unp2))
UNP_PCB_UNLOCK(unp2);
}
+
+ if (m != NULL) {
+ unp_scan(m, unp_freerights);
+ m_freemp(m);
+ }
}
/*
@@ -3153,7 +3268,7 @@
static void
unp_dispose(struct socket *so)
{
- struct sockbuf *sb = &so->so_rcv;
+ struct sockbuf *sb;
struct unpcb *unp;
struct mbuf *m;
@@ -3170,6 +3285,18 @@
SOCK_RECVBUF_LOCK(so);
switch (so->so_type) {
case SOCK_DGRAM:
+ while ((sb = TAILQ_FIRST(&so->so_rcv.uxdg_conns)) != NULL) {
+ STAILQ_CONCAT(&so->so_rcv.uxdg_mb, &sb->uxdg_mb);
+ TAILQ_REMOVE(&so->so_rcv.uxdg_conns, sb, uxdg_clist);
+ /* The sender's socket may reconnect: reset its counters. */
+ sb->uxdg_cc = sb->uxdg_ctl = sb->uxdg_mbcnt = 0;
+ }
+ sb = &so->so_rcv;
+ if (sb->uxdg_peeked != NULL) {
+ STAILQ_INSERT_HEAD(&sb->uxdg_mb, sb->uxdg_peeked,
+ m_stailqpkt);
+ sb->uxdg_peeked = NULL;
+ }
m = STAILQ_FIRST(&sb->uxdg_mb);
STAILQ_INIT(&sb->uxdg_mb);
/* XXX: our shortened sbrelease() */
@@ -3178,6 +3303,7 @@
break;
case SOCK_STREAM:
case SOCK_SEQPACKET:
+ sb = &so->so_rcv;
m = sbcut_locked(sb, sb->sb_ccc);
KASSERT(sb->sb_ccc == 0 && sb->sb_mb == 0 && sb->sb_mbcnt == 0,
("%s: ccc %u mb %p mbcnt %u", __func__,
Index: sys/sys/sockbuf.h
===================================================================
--- sys/sys/sockbuf.h
+++ sys/sys/sockbuf.h
@@ -135,10 +135,35 @@
/*
* PF_UNIX/SOCK_DGRAM
*
- * Local protocol, thus any socket buffer is a receive buffer.
+ * Local protocol, thus we should buffer on the receive side
+ * only. However, in one to many configuration we don't want
+ * a single receive buffer to be shared. So we would link
+ * send buffers onto receive buffer. All the fields are locked
+ * by the receive buffer lock.
*/
struct {
+ /*
+ * For receive buffer: own queue of this buffer for
+ * unconnected sends. For send buffer: queue lent
+ * to the peer receive buffer, to isolate ourselves
+ * from other senders.
+ */
STAILQ_HEAD(, mbuf) uxdg_mb;
+ /* For receive buffer: datagram seen via MSG_PEEK. */
+ struct mbuf *uxdg_peeked;
+ /*
+ * For receive buffer: queue of send buffers of
+ * connected peers. For send buffer: linkage on
+ * connected peer receive buffer queue.
+ */
+ union {
+ TAILQ_HEAD(, sockbuf) uxdg_conns;
+ TAILQ_ENTRY(sockbuf) uxdg_clist;
+ };
+ /* Counters for this buffer uxdg_mb chain + peeked. */
+ u_int uxdg_cc;
+ u_int uxdg_ctl;
+ u_int uxdg_mbcnt;
};
};
};
Index: tests/sys/kern/unix_dgram.c
===================================================================
--- tests/sys/kern/unix_dgram.c
+++ tests/sys/kern/unix_dgram.c
@@ -170,15 +170,16 @@
ATF_TC_WITHOUT_HEAD(one2many);
ATF_TC_BODY(one2many, tc)
{
- int one, many[2], two;
- char buf[1024];
+ int one, many[3], two;
+#define BUFSIZE 1024
+ char buf[BUFSIZE], goodboy[BUFSIZE], flooder[BUFSIZE], notconn[BUFSIZE];
/* Establish one to many connection. */
ATF_REQUIRE((one = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
ATF_REQUIRE(bind(one, (struct sockaddr *)&sun, sizeof(sun)) == 0);
/* listen(2) shall fail. */
ATF_REQUIRE(listen(one, -1) != 0);
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < 3; i++) {
ATF_REQUIRE((many[i] = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
ATF_REQUIRE(connect(many[i], (struct sockaddr *)&sun,
sizeof(sun)) == 0);
@@ -198,22 +199,78 @@
ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == 42);
/*
- * Sending from an unconnected socket to a bound socket. Connection is
- * created for the duration of the syscall.
+ * Interaction between concurrent senders. New feature in FreeBSD 14.
+ *
+ * One sender cannot fill the receive side. Other senders can
+ * continue operation. Senders who don't fill their buffers are
+ * prioritized over flooders. Connected senders are prioritized over
+ * unconnected.
+ *
+ * Disconnecting a sender that has queued data conditionally preserves
+ * the data: it migrates to the peer's buffer only if the latter is
+ * empty. Otherwise it is discarded, to protect against a
+ * connect-fill-close attack.
*/
+#define FLOODER 13 /* for connected flooder on many[0] */
+#define GOODBOY 42 /* for a good boy on many[1] */
+#define NOTCONN 66 /* for sendto(2) via two */
+ goodboy[0] = GOODBOY;
+ flooder[0] = FLOODER;
+ notconn[0] = NOTCONN;
+
+ /* Connected priority over sendto(2). */
ATF_REQUIRE((two = socket(PF_UNIX, SOCK_DGRAM, 0)) > 0);
- ATF_REQUIRE(sendto(two, buf, 43, 0, (struct sockaddr *)&sun,
- sizeof(sun)) == 43);
- ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == 43);
+ ATF_REQUIRE(sendto(two, notconn, BUFSIZE, 0, (struct sockaddr *)&sun,
+ sizeof(sun)) == BUFSIZE);
+ ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == GOODBOY); /* message from good boy comes first */
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == NOTCONN); /* only then message from sendto(2) */
- /* One sender can fill the receive side.
- * Current behavior which needs improvement.
- */
- fill(many[0], buf, sizeof(buf));
- ATF_REQUIRE(send(many[1], buf, sizeof(buf), 0) == -1);
+ /* Casual sender priority over a flooder. */
+ fill(many[0], flooder, sizeof(flooder));
+ ATF_REQUIRE(send(many[0], flooder, BUFSIZE, 0) == -1);
ATF_REQUIRE(errno == ENOBUFS);
+ ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == GOODBOY); /* message from good boy comes first */
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == FLOODER); /* only then message from flooder */
+
+ /* Once seen, a message can't be deprioritized by any other message. */
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == FLOODER); /* message from the flooder seen */
+ ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == FLOODER); /* should be the same message */
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == FLOODER); /* now we read it out... */
ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
- ATF_REQUIRE(send(many[1], buf, sizeof(buf), 0) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == GOODBOY); /* ... and next one is the good boy */
+
+ /* Disconnect in presence of data from not connected. */
+ ATF_REQUIRE(sendto(two, notconn, BUFSIZE, 0, (struct sockaddr *)&sun,
+ sizeof(sun)) == BUFSIZE);
+ close(many[0]);
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == NOTCONN); /* message from sendto() */
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_DONTWAIT) == -1);
+ ATF_REQUIRE(errno == EAGAIN); /* data from many[0] discarded */
+
+ /* Disconnect in absence of data from not connected. */
+ ATF_REQUIRE(send(many[1], goodboy, BUFSIZE, 0) == BUFSIZE);
+ close(many[1]);
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), 0) == sizeof(buf));
+ ATF_REQUIRE(buf[0] == GOODBOY); /* message from many[1] preserved */
+
+ /* Check that nothing leaks on close(2). */
+ ATF_REQUIRE(send(many[2], buf, 42, 0) == 42);
+ ATF_REQUIRE(send(many[2], buf, 42, 0) == 42);
+ ATF_REQUIRE(recv(one, buf, sizeof(buf), MSG_PEEK) == 42);
+ ATF_REQUIRE(sendto(two, notconn, 42, 0, (struct sockaddr *)&sun,
+ sizeof(sun)) == 42);
+ close(one);
}
/*
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Mar 6, 7:08 AM (4 h, 30 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
17009972
Default Alt Text
D35303.id106688.diff (16 KB)
Attached To
Mode
D35303: unix/dgram: smart socket buffers for one-to-many sockets
Attached
Detach File
Event Timeline
Log In to Comment