diff --git a/sys/kern/uipc_usrreq.c b/sys/kern/uipc_usrreq.c --- a/sys/kern/uipc_usrreq.c +++ b/sys/kern/uipc_usrreq.c @@ -5,7 +5,7 @@ * The Regents of the University of California. All Rights Reserved. * Copyright (c) 2004-2009 Robert N. M. Watson All Rights Reserved. * Copyright (c) 2018 Matthew Macy - * Copyright (c) 2022 Gleb Smirnoff + * Copyright (c) 2022-2024 Gleb Smirnoff * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -141,11 +141,14 @@ static struct task unp_defer_task; /* - * Both send and receive buffers are allocated PIPSIZ bytes of buffering for - * stream sockets, although the total for sender and receiver is actually - * only PIPSIZ. + * SOCK_STREAM and SOCK_SEQPACKET unix(4) sockets fully bypass the send buffer, + * but the notion of a send buffer still makes sense for them. Its size is + * the amount of space that a send(2) syscall may copyin(9) before checking + * with the receive buffer of a peer. Although it is never linked anywhere and + * is only pointed to by a stack variable, it effectively is a buffer that + * needs to be sized. * - * Datagram sockets really use the sendspace as the maximum datagram size, + * SOCK_DGRAM sockets really use the sendspace as the maximum datagram size, * and don't really want to reserve the sendspace. Their recvspace should be * large enough for at least one max-size datagram plus address. */ @@ -156,7 +159,7 @@ static u_long unpst_recvspace = PIPSIZ; static u_long unpdg_maxdgram = 8*1024; /* support 8KB syslog msgs */ static u_long unpdg_recvspace = 16*1024; -static u_long unpsp_sendspace = PIPSIZ; /* really max datagram size */ +static u_long unpsp_sendspace = PIPSIZ; static u_long unpsp_recvspace = PIPSIZ; static SYSCTL_NODE(_net, PF_LOCAL, local, CTLFLAG_RW | CTLFLAG_MPSAFE, 0, @@ -300,13 +303,12 @@ static void unp_scan(struct mbuf *, void (*)(struct filedescent **, int)); static void unp_discard(struct file *); static void unp_freerights(struct filedescent **, int); -static int unp_internalize(struct mbuf **, struct thread *, - struct mbuf **, u_int *, u_int *); +static int unp_internalize(struct mbuf *, struct mchain *, + struct thread *); static void unp_internalize_fp(struct file *); static int unp_externalize(struct mbuf *, struct mbuf **, int); static int unp_externalize_fp(struct file *); -static struct mbuf *unp_addsockcred(struct thread *, struct mbuf *, - int, struct mbuf **, u_int *, u_int *); +static void unp_addsockcred(struct thread *, struct mchain *, int); static void unp_process_defers(void * __unused, int); static void @@ -449,6 +451,7 @@ case SOCK_STREAM: sendspace = unpst_sendspace; recvspace = unpst_recvspace; + STAILQ_INIT(&so->so_rcv.sb_mbq); break; case SOCK_DGRAM: @@ -466,6 +469,7 @@ case SOCK_SEQPACKET: sendspace = unpsp_sendspace; recvspace = unpsp_recvspace; + STAILQ_INIT(&so->so_rcv.sb_mbq); break; default: @@ -797,6 +801,10 @@ taskqueue_enqueue_timeout(taskqueue_thread, &unp_gc_task, -1); switch (so->so_type) { + case SOCK_STREAM: + case SOCK_SEQPACKET: + MPASS(STAILQ_EMPTY(&so->so_rcv.sb_mbq)); + break; case SOCK_DGRAM: /* * Everything should have been unlinked/freed by unp_dispose() @@ -852,6 +860,10 @@ error = solisten_proto_check(so); if (error == 0) { cru2xt(td, &unp->unp_peercred); + (void)chgsbsize(so->so_cred->cr_uidinfo, &so->so_snd.sb_hiwat, + 0, RLIM_INFINITY); + (void)chgsbsize(so->so_cred->cr_uidinfo, &so->so_rcv.sb_hiwat, + 0, RLIM_INFINITY); solisten_proto(so, backlog); } SOCK_UNLOCK(so);
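An aside on the virtual send buffer described in the comment above: SO_SNDBUF on a unix(4) stream socket remains visible and tunable from userland, it just no longer reserves buffer memory; it only bounds how much a single send(2) may copyin(9) before the peer's receive buffer is consulted. A minimal userland sketch to observe the knob (illustrative only, not part of the patch):

#include <sys/socket.h>
#include <stdio.h>

int
main(void)
{
	int fds[2], sndbuf;
	socklen_t len = sizeof(sndbuf);

	/* A connected pair of PF_UNIX/SOCK_STREAM sockets. */
	if (socketpair(PF_UNIX, SOCK_STREAM, 0, fds) != 0)
		return (1);
	/*
	 * SO_SNDBUF reports the virtual send buffer: the amount of data
	 * one send(2) may stage before checking the peer's receive buffer.
	 */
	if (getsockopt(fds[0], SOL_SOCKET, SO_SNDBUF, &sndbuf, &len) == 0)
		printf("virtual send buffer: %d bytes\n", sndbuf);
	return (0);
}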
@@ -885,187 +897,551 @@ return (0); } -static int -uipc_rcvd(struct socket *so, int flags) +/* + * A pr_sosend() call with an mbuf instead of a uio comes from a kernel + * thread. NFS, netgraph(4) and other subsystems can call into socket code. + * This function conditions the mbuf so that it can be safely put onto a + * socket buffer, and calculates its char count and mbuf count. + * + * Note: we don't support receiving control data from a kernel thread. Our + * pr_sosend methods have MPASS() to check that. This may change. + */ +static void +uipc_reset_kernel_mbuf(struct mbuf *m, struct mchain *mc) { - struct unpcb *unp, *unp2; - struct socket *so2; - u_int mbcnt, sbcc; - unp = sotounpcb(so); - KASSERT(unp != NULL, ("%s: unp == NULL", __func__)); - KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET, - ("%s: socktype %d", __func__, so->so_type)); + M_ASSERTPKTHDR(m); - /* - * Adjust backpressure on sender and wakeup any waiting to write. - * - * The unp lock is acquired to maintain the validity of the unp_conn - * pointer; no lock on unp2 is required as unp2->unp_socket will be - * static as long as we don't permit unp2 to disconnect from unp, - * which is prevented by the lock on unp. We cache values from - * so_rcv to avoid holding the so_rcv lock over the entire - * transaction on the remote so_snd. - */ - SOCKBUF_LOCK(&so->so_rcv); - mbcnt = so->so_rcv.sb_mbcnt; - sbcc = sbavail(&so->so_rcv); - SOCKBUF_UNLOCK(&so->so_rcv); - /* - * There is a benign race condition at this point. If we're planning to - * clear SB_STOP, but uipc_send is called on the connected socket at - * this instant, it might add data to the sockbuf and set SB_STOP. Then - * we would erroneously clear SB_STOP below, even though the sockbuf is - * full. The race is benign because the only ill effect is to allow the - * sockbuf to exceed its size limit, and the size limits are not - * strictly guaranteed anyway. - */ - UNP_PCB_LOCK(unp); - unp2 = unp->unp_conn; - if (unp2 == NULL) { - UNP_PCB_UNLOCK(unp); - return (0); + m_clrprotoflags(m); + m_tag_delete_chain(m, NULL); + m->m_pkthdr.rcvif = NULL; + m->m_pkthdr.flowid = 0; + m->m_pkthdr.csum_flags = 0; + m->m_pkthdr.fibnum = 0; + m->m_pkthdr.rsstype = 0; + + mc_init_m(mc, m); + MPASS(m->m_pkthdr.len == mc->mc_len); +} + +#ifdef INVARIANTS +static inline void +uipc_stream_sbcheck(struct sockbuf *sb) +{ + struct mbuf *d; + u_int dcc, dctl, dmbcnt; + + dcc = dctl = dmbcnt = 0; + STAILQ_FOREACH(d, &sb->sb_mbq, m_stailq) { + if (d->m_type == MT_CONTROL) + dctl += d->m_len; + else if (d->m_type == MT_DATA) + dcc += d->m_len; + else + MPASS(0); + dmbcnt += MSIZE; + if (d->m_flags & M_EXT) + dmbcnt += d->m_ext.ext_size; + if (d->m_stailq.stqe_next == NULL) + MPASS(sb->sb_mbq.stqh_last == &d->m_stailq.stqe_next); } - so2 = unp2->unp_socket; - SOCKBUF_LOCK(&so2->so_snd); - if (sbcc < so2->so_snd.sb_hiwat && mbcnt < so2->so_snd.sb_mbmax) - so2->so_snd.sb_flags &= ~SB_STOP; - sowwakeup_locked(so2); - UNP_PCB_UNLOCK(unp); - return (0); + MPASS(dcc == sb->sb_acc); + MPASS(dcc == sb->sb_ccc); + MPASS(dctl == sb->sb_ctl); + MPASS(dmbcnt == sb->sb_mbcnt); +} +#define UIPC_STREAM_SBCHECK(sb) uipc_stream_sbcheck(sb) +#else +#define UIPC_STREAM_SBCHECK(sb) do {} while (0) +#endif + +/* + * uipc_sbspace() returns how much a writer can send, limited by char count + * or mbuf memory use, whichever limit is hit first. + * + * XXXGL: sb_mbcnt may overcommit sb_mbmax if a previous write observed + * 'space < mbspace', but the mchain allocated to hold 'space' bytes of data + * ended up with 'mc_mlen > mbspace'. A typical scenario would be a full + * buffer with a writer trying to push in a large write, and a slow reader + * that reads just a few bytes at a time. In that case the writer will keep + * creating new mbufs with mc_split(). These mbufs will carry few chars, but + * will all point at the same cluster, thus each adding the cluster size to + * sb_mbcnt. This means we will count the same cluster many times, + * potentially underutilizing the socket buffer. We aren't optimizing for + * inefficient readers. The classic socket buffer had the same "feature". + */ +static inline u_int +uipc_sbspace(struct sockbuf *sb) +{ + u_int space, mbspace; + + MPASS(sb->sb_hiwat >= sb->sb_ccc + sb->sb_ctl); + space = sb->sb_hiwat - sb->sb_ccc - sb->sb_ctl; + if (__predict_true(sb->sb_mbmax >= sb->sb_mbcnt)) + mbspace = sb->sb_mbmax - sb->sb_mbcnt; + else + return (0); + + return (min(space, mbspace)); }
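Since a writer's budget is uipc_sbspace() evaluated on the peer's receive buffer, the backpressure is observable from userland: once the peer's buffer fills and nobody reads, a non-blocking writer gets EWOULDBLOCK (the same path that sets error = EWOULDBLOCK in the send loop below). A small self-contained sketch:

#include <sys/types.h>
#include <sys/socket.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>

int
main(void)
{
	char buf[4096] = { 0 };
	ssize_t n, total = 0;
	int fds[2];

	if (socketpair(PF_UNIX, SOCK_STREAM, 0, fds) != 0)
		return (1);
	(void)fcntl(fds[0], F_SETFL, O_NONBLOCK);

	/* Fill the peer's receive buffer; nobody is reading fds[1]. */
	while ((n = send(fds[0], buf, sizeof(buf), 0)) > 0)
		total += n;
	if (n == -1 && errno == EWOULDBLOCK)
		printf("peer receive buffer filled after %zd bytes\n", total);
	return (0);
}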
static int -uipc_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *nam, - struct mbuf *control, struct thread *td) +uipc_sosend_stream_or_seqpacket(struct socket *so, struct sockaddr *addr, + struct uio *uio, struct mbuf *m, struct mbuf *c, int flags, + struct thread *td) { struct unpcb *unp, *unp2; struct socket *so2; - u_int mbcnt, sbcc; + struct sockbuf *sb; + struct mchain mc, cmc; + ssize_t resid, sent; + bool nonblock, eor; int error; - unp = sotounpcb(so); - KASSERT(unp != NULL, ("%s: unp == NULL", __func__)); - KASSERT(so->so_type == SOCK_STREAM || so->so_type == SOCK_SEQPACKET, - ("%s: socktype %d", __func__, so->so_type)); + MPASS((uio != NULL && m == NULL) || (m != NULL && uio == NULL)); + MPASS(m == NULL || c == NULL); - error = 0; - if (flags & PRUS_OOB) { + if (__predict_false(flags & MSG_OOB)) { error = EOPNOTSUPP; - goto release; + goto out; } - if (control != NULL && - (error = unp_internalize(&control, td, NULL, NULL, NULL))) - goto release; - unp2 = NULL; - if ((so->so_state & SS_ISCONNECTED) == 0) { - if (nam != NULL) { - if ((error = unp_connect(so, nam, td)) != 0) - goto out; - } else { - error = ENOTCONN; + nonblock = (so->so_state & SS_NBIO) || + (flags & (MSG_DONTWAIT | MSG_NBIO)); + eor = flags & MSG_EOR; + + mc = MCHAIN_INITIALIZER(&mc); + cmc = MCHAIN_INITIALIZER(&cmc); + sent = 0; + + if (m == NULL) { + if (c != NULL && (error = unp_internalize(c, &cmc, td))) goto out; - } - } + /* + * Optimization for the case when our send fits into the receive + * buffer - do the copyin before taking any locks. + */ + resid = uio->uio_resid; + error = mc_uiotomc(&mc, uio, so->so_snd.sb_hiwat, 0, M_WAITOK, + eor ? M_EOR : 0);
+ if (__predict_false(error)) + goto out2; + } else + uipc_reset_kernel_mbuf(m, &mc); + error = SOCK_IO_SEND_LOCK(so, SBLOCKWAIT(flags)); + if (error) + goto out2; + + unp = sotounpcb(so); UNP_PCB_LOCK(unp); - if ((unp2 = unp_pcb_lock_peer(unp)) == NULL) { + if (__predict_false(so->so_error != 0)) { + error = so->so_error; + so->so_error = 0; UNP_PCB_UNLOCK(unp); - error = ENOTCONN; - goto out; - } else if (so->so_snd.sb_state & SBS_CANTSENDMORE) { - unp_pcb_unlock_pair(unp, unp2); - error = EPIPE; - goto out; + goto out3; } + unp2 = unp_pcb_lock_peer(unp); UNP_PCB_UNLOCK(unp); - if ((so2 = unp2->unp_socket) == NULL) { - UNP_PCB_UNLOCK(unp2); - error = ENOTCONN; - goto out; + if (unp2 == NULL) { + /* + * Different error code for a previously connected socket and + * a never connected one. SS_ISDISCONNECTED is set in + * unp_soisdisconnected() and is synchronized by the pcb lock. + */ + error = so->so_state & SS_ISDISCONNECTED ? EPIPE : ENOTCONN; + goto out3; } - SOCKBUF_LOCK(&so2->so_rcv); + if (unp2->unp_flags & UNP_WANTCRED_MASK) { /* * Credentials are passed only once on SOCK_STREAM and * SOCK_SEQPACKET (LOCAL_CREDS => WANTCRED_ONESHOT), or * forever (LOCAL_CREDS_PERSISTENT => WANTCRED_ALWAYS). */ - control = unp_addsockcred(td, control, unp2->unp_flags, NULL, - NULL, NULL); + unp_addsockcred(td, &cmc, unp2->unp_flags); unp2->unp_flags &= ~UNP_WANTCRED_ONESHOT; } /* - * Send to paired receive port and wake up readers. Don't - * check for space available in the receive buffer if we're - * attaching ancillary data; Unix domain sockets only check - * for space in the sending sockbuf, and that check is - * performed one level up the stack. At that level we cannot - * precisely account for the amount of buffer space used - * (e.g., because control messages are not yet internalized). + * Cycle through the data to send, checking for available space in the + * peer's receive buffer. Put a reference on the peer socket, so that + * it doesn't get freed while we sbwait(). If the peer goes away, we + * will observe SBS_CANTRCVMORE and our sorele() will finalize the + * peer's socket destruction. */ - switch (so->so_type) { - case SOCK_STREAM: - if (control != NULL) { - sbappendcontrol_locked(&so2->so_rcv, - m->m_len > 0 ? m : NULL, control, flags); - control = NULL; - } else - sbappend_locked(&so2->so_rcv, m, flags); - break; + so2 = unp2->unp_socket; + soref(so2); + UNP_PCB_UNLOCK(unp2); + sb = &so2->so_rcv; + while (mc.mc_len + cmc.mc_len > 0) { + struct mchain mcnext = MCHAIN_INITIALIZER(&mcnext); + u_int space; - case SOCK_SEQPACKET: - if (sbappendaddr_nospacecheck_locked(&so2->so_rcv, - &sun_noname, m, control)) - control = NULL; - break; + SOCK_RECVBUF_LOCK(so2); +restart: + UIPC_STREAM_SBCHECK(sb); + if (__predict_false(cmc.mc_len > sb->sb_hiwat)) { + SOCK_RECVBUF_UNLOCK(so2); + error = EMSGSIZE; + goto out4; + } + if (__predict_false(sb->sb_state & SBS_CANTRCVMORE)) { + SOCK_RECVBUF_UNLOCK(so2); + error = EPIPE; + goto out4; + } + /* + * Wait on the peer socket receive buffer until there is enough + * space to put at least the control. The data is a stream and + * can be put partially, but control is really a datagram.
+ */ + space = uipc_sbspace(sb); + if (space < sb->sb_lowat || space < cmc.mc_len) { + if (nonblock) { + SOCK_RECVBUF_UNLOCK(so2); + error = EWOULDBLOCK; + goto out4; + } + if ((error = sbwait(so2, SO_RCV)) != 0) { + SOCK_RECVBUF_UNLOCK(so2); + goto out4; + } else + goto restart; + } + MPASS(space >= cmc.mc_len); + space -= cmc.mc_len; + if (space == 0) { + /* There is space only to send control. */ + MPASS(!STAILQ_EMPTY(&cmc.mc_q)); + mcnext = mc; + mc = MCHAIN_INITIALIZER(&mc); + } else if (space < mc.mc_len) { + /* Not enough space. */ + if (__predict_false(mc_split(&mc, &mcnext, space, + M_NOWAIT) == ENOMEM)) { + /* + * If allocation failed use M_WAITOK and merge + * the chain back. Next time mc_split() will + * easily split at the same place. Only if we + * race with setsockopt(SO_RCVBUF) shrinking + * sb_hiwat can this happen more than once. + */ + SOCK_RECVBUF_UNLOCK(so2); + (void)mc_split(&mc, &mcnext, space, M_WAITOK); + mc_concat(&mc, &mcnext); + SOCK_RECVBUF_LOCK(so2); + goto restart; + } + MPASS(mc.mc_len == space); + } + if (!STAILQ_EMPTY(&cmc.mc_q)) { + STAILQ_CONCAT(&sb->sb_mbq, &cmc.mc_q); + sb->sb_ctl += cmc.mc_len; + sb->sb_mbcnt += cmc.mc_mlen; + cmc.mc_len = 0; + } + sent += mc.mc_len; + sb->sb_acc += mc.mc_len; + sb->sb_ccc += mc.mc_len; + sb->sb_mbcnt += mc.mc_mlen; + STAILQ_CONCAT(&sb->sb_mbq, &mc.mc_q); + UIPC_STREAM_SBCHECK(sb); + space = uipc_sbspace(sb); + sorwakeup_locked(so2); + mc = mcnext; + if (STAILQ_EMPTY(&mc.mc_q) && + uio != NULL && uio->uio_resid > 0) { + /* Receive buffer space + virtual send buffer size. */ + error = mc_uiotomc(&mc, uio, + space + so->so_snd.sb_hiwat, 0, M_WAITOK, + eor ? M_EOR : 0); + if (__predict_false(error)) + goto out4; + } } - mbcnt = so2->so_rcv.sb_mbcnt; - sbcc = sbavail(&so2->so_rcv); - if (sbcc) - sorwakeup_locked(so2); - else - SOCKBUF_UNLOCK(&so2->so_rcv); + MPASS(STAILQ_EMPTY(&mc.mc_q)); - /* - * The PCB lock on unp2 protects the SB_STOP flag. Without it, - * it would be possible for uipc_rcvd to be called at this - * point, drain the receiving sockbuf, clear SB_STOP, and then - * we would set SB_STOP below. That could lead to an empty - * sockbuf having SB_STOP set - */ - SOCKBUF_LOCK(&so->so_snd); - if (sbcc >= so->so_snd.sb_hiwat || mbcnt >= so->so_snd.sb_mbmax) - so->so_snd.sb_flags |= SB_STOP; - SOCKBUF_UNLOCK(&so->so_snd); - UNP_PCB_UNLOCK(unp2); - m = NULL; + td->td_ru.ru_msgsnd++; +out4: + sorele(so2); +out3: + SOCK_IO_SEND_UNLOCK(so); +out2: + if (!mc_empty(&cmc)) + unp_scan(mc_first(&cmc), unp_freerights); out: + mc_freem(&mc); + mc_freem(&cmc); + + if (uio != NULL) + uio->uio_resid = resid - sent; + + return (error); +} + +static int +uipc_soreceive_stream_or_seqpacket(struct socket *so, struct sockaddr **psa, + struct uio *uio, struct mbuf **mp0, struct mbuf **controlp, int *flagsp) +{ + struct sockbuf *sb = &so->so_rcv; + struct mbuf *control, *m, *first, *last, *next; + u_int ctl, space, datalen, mbcnt, lastlen; + int error, flags; + bool nonblock, waitall, peek; + + MPASS(mp0 == NULL); + + if (psa != NULL) + *psa = NULL; + if (controlp != NULL) + *controlp = NULL; + + flags = flagsp != NULL ? 
*flagsp : 0; + nonblock = (so->so_state & SS_NBIO) || + (flags & (MSG_DONTWAIT | MSG_NBIO)); + peek = flags & MSG_PEEK; + waitall = (flags & MSG_WAITALL) && !peek; + + if (__predict_false((so->so_state & + (SS_ISCONNECTED|SS_ISDISCONNECTED)) == 0)) + return (ENOTCONN); + + error = SOCK_IO_RECV_LOCK(so, SBLOCKWAIT(flags)); + if (__predict_false(error)) + return (error); + +restart: + SOCK_RECVBUF_LOCK(so); + UIPC_STREAM_SBCHECK(sb); + while (sb->sb_acc < sb->sb_lowat && + (sb->sb_ctl == 0 || controlp == NULL)) { + if (so->so_error) { + error = so->so_error; + if (!peek) + so->so_error = 0; + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (error); + } + if (sb->sb_state & SBS_CANTRCVMORE) { + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (0); + } + if (nonblock) { + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (EWOULDBLOCK); + } + error = sbwait(so, SO_RCV); + if (error) { + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (error); + } + } + + MPASS(STAILQ_FIRST(&sb->sb_mbq)); + MPASS(sb->sb_acc > 0 || sb->sb_ctl > 0); + + mbcnt = 0; + ctl = 0; + first = STAILQ_FIRST(&sb->sb_mbq); + if (first->m_type == MT_CONTROL) { + control = first; + STAILQ_FOREACH_FROM(first, &sb->sb_mbq, m_stailq) { + if (first->m_type != MT_CONTROL) + break; + ctl += first->m_len; + mbcnt += MSIZE; + if (first->m_flags & M_EXT) + mbcnt += first->m_ext.ext_size; + } + } else + control = NULL; + /* - * PRUS_EOF is equivalent to pr_send followed by pr_shutdown. + * Find the split point for the next copyout. On exit from the loop: + * last == NULL - socket to be flushed + * last != NULL + * 0 < lastlen < last->m_len - uio to be filled, last to be adjusted + * lastlen == 0 - MT_CONTROL or M_EOR encountered */ - if (flags & PRUS_EOF) { - UNP_PCB_LOCK(unp); - socantsendmore(so); - unp_shutdown(unp); - UNP_PCB_UNLOCK(unp); + space = uio->uio_resid; + datalen = 0; + for (m = first, last = NULL; m != NULL; m = STAILQ_NEXT(m, m_stailq)) { + if (m->m_type != MT_DATA) { + last = m; + lastlen = 0; + break; + } + if (space >= m->m_len) { + space -= m->m_len; + datalen += m->m_len; + mbcnt += MSIZE; + if (m->m_flags & M_EXT) + mbcnt += m->m_ext.ext_size; + if (m->m_flags & M_EOR) { + last = STAILQ_NEXT(m, m_stailq); + lastlen = 0; + flags |= MSG_EOR; + break; + } + } else { + datalen += space; + last = m; + lastlen = space; + break; + } } - if (control != NULL && error != 0) - unp_scan(control, unp_freerights); -release: - if (control != NULL) - m_freem(control); - /* - * In case of PRUS_NOTREADY, uipc_ready() is responsible - * for freeing memory. - */ - if (m != NULL && (flags & PRUS_NOTREADY) == 0) - m_freem(m); - return (error); + UIPC_STREAM_SBCHECK(sb); + if (!peek) { + if (last == NULL) + STAILQ_INIT(&sb->sb_mbq); + else { + STAILQ_FIRST(&sb->sb_mbq) = last; + MPASS(last->m_len > lastlen); + last->m_len -= lastlen; + last->m_data += lastlen; + } + MPASS(sb->sb_acc >= datalen); + sb->sb_acc -= datalen; + sb->sb_ccc -= datalen; + MPASS(sb->sb_ctl >= ctl); + sb->sb_ctl -= ctl; + MPASS(sb->sb_mbcnt >= mbcnt); + sb->sb_mbcnt -= mbcnt; + UIPC_STREAM_SBCHECK(sb); + /* Mind the name: we are waking the writer here, not the reader. */ + sorwakeup_locked(so); + } else + SOCK_RECVBUF_UNLOCK(so); + + while (control != NULL && control->m_type == MT_CONTROL) { + if (!peek) { + struct mbuf *c; + + /* + * unp_externalize() failure must abort the entire read(2).
+ * Such a failure should also free the problematic + * control, so that the socket is not left in a state + * where it can't make progress with reading. + * The probability of such a failure is really low, so + * it is fine that we need to perform a pretty complex + * operation here to reconstruct the buffer. This + * should be safe as we own the top of the queue due to + * the sx(9) lock, but we need to check whether we + * raced with shutdown(2). + * XXXGL: unp_externalize() used to be the + * dom_externalize() KBI, and it frees the whole chain, + * so we need to feed it mbufs one by one. + */ + c = control; + control = STAILQ_NEXT(c, m_stailq); + STAILQ_NEXT(c, m_stailq) = NULL; + error = unp_externalize(c, controlp, flags); + if (__predict_false(error)) { + SOCK_RECVBUF_LOCK(so); + UIPC_STREAM_SBCHECK(sb); + if (__predict_false(sb->sb_state & + SBS_CANTRCVMORE)) { + unp_scan(control, unp_freerights); + m_freem(control); + } else { + /* XXXGL: STAILQ_PREPEND */ + if (STAILQ_EMPTY(&sb->sb_mbq)) + STAILQ_INSERT_HEAD(&sb->sb_mbq, + control, m_stailq); + else + STAILQ_FIRST(&sb->sb_mbq) = + control; + sb->sb_ctl = sb->sb_acc = sb->sb_ccc = + sb->sb_mbcnt = 0; + STAILQ_FOREACH(m, &sb->sb_mbq, + m_stailq) { + if (m->m_type == MT_DATA) { + sb->sb_acc += m->m_len; + sb->sb_ccc += m->m_len; + } else { + sb->sb_ctl += m->m_len; + } + sb->sb_mbcnt += MSIZE; + if (m->m_flags & M_EXT) + sb->sb_mbcnt += + m->m_ext.ext_size; + } + } + UIPC_STREAM_SBCHECK(sb); + SOCK_RECVBUF_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + return (error); + } + if (controlp != NULL) { + while (*controlp != NULL) + controlp = &(*controlp)->m_next; + } + } else { + /* + * XXXGL + * + * In the MSG_PEEK case control is not externalized. + * This means we are leaking some kernel pointers to + * userland. They are useless to a law-abiding + * application, but may be useful to malware. This + * is what the historical implementation in + * soreceive_generic() did. To be improved? + */ + if (controlp != NULL) { + *controlp = m_copym(control, 0, control->m_len, + M_WAITOK); + controlp = &(*controlp)->m_next; + } + control = STAILQ_NEXT(control, m_stailq); + } + } + + for (m = first; m != last; m = next) { + next = STAILQ_NEXT(m, m_stailq); + error = uiomove(mtod(m, char *), m->m_len, uio); + if (__predict_false(error)) { + SOCK_IO_RECV_UNLOCK(so); + if (!peek) + for (; m != last; m = next) { + next = STAILQ_NEXT(m, m_stailq); + m_free(m); + } + return (error); + } + if (!peek) + m_free(m); + } + if (last != NULL && lastlen > 0) { + if (!peek) { + MPASS(!(m->m_flags & M_PKTHDR)); + MPASS(last->m_data - (last->m_flags & M_EXT ? + last->m_ext.ext_buf : last->m_dat) >= lastlen);
+ error = uiomove(mtod(last, char *) - lastlen, + lastlen, uio); + } else + error = uiomove(mtod(last, char *), lastlen, uio); + if (__predict_false(error)) { + SOCK_IO_RECV_UNLOCK(so); + return (error); + } + } + if (waitall && !(flags & MSG_EOR) && uio->uio_resid > 0) + goto restart; + SOCK_IO_RECV_UNLOCK(so); + + if (flagsp != NULL) + *flagsp |= flags; + + uio->uio_td->td_ru.ru_msgrcv++; + + return (0); }
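The MT_CONTROL externalization above is what a userland recvmsg(2) carrying SCM_RIGHTS ends up exercising: each control mbuf becomes a cmsghdr whose payload is freshly installed file descriptors. A minimal receiver sketch (the helper name is invented; error handling trimmed):

#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <string.h>

/* Receive one byte of data plus a single descriptor over a unix(4) socket. */
int
recv_fd(int sock)
{
	union {
		struct cmsghdr hdr;
		char buf[CMSG_SPACE(sizeof(int))];
	} cmsg;
	struct cmsghdr *cm;
	struct msghdr msg;
	struct iovec iov;
	char byte;
	int fd = -1;

	iov.iov_base = &byte;
	iov.iov_len = sizeof(byte);
	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = &cmsg;
	msg.msg_controllen = sizeof(cmsg);

	if (recvmsg(sock, &msg, 0) == -1)
		return (-1);
	for (cm = CMSG_FIRSTHDR(&msg); cm != NULL; cm = CMSG_NXTHDR(&msg, cm))
		if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS)
			memcpy(&fd, CMSG_DATA(cm), sizeof(fd));
	return (fd);
}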
/* PF_UNIX/SOCK_DGRAM version of sbspace() */ @@ -1111,7 +1487,8 @@ const struct sockaddr *from; struct socket *so2; struct sockbuf *sb; - struct mbuf *f, *clast; + struct mchain cmc = MCHAIN_INITIALIZER(&cmc); + struct mbuf *f; u_int cc, ctl, mbcnt; u_int dcc __diagused, dctl __diagused, dmbcnt __diagused; int error; @@ -1120,7 +1497,6 @@ error = 0; f = NULL; - ctl = 0; if (__predict_false(flags & MSG_OOB)) { error = EOPNOTSUPP; @@ -1139,16 +1515,14 @@ f = m_gethdr(M_WAITOK, MT_SONAME); cc = m->m_pkthdr.len; mbcnt = MSIZE + m->m_pkthdr.memlen; - if (c != NULL && - (error = unp_internalize(&c, td, &clast, &ctl, &mbcnt))) + if (c != NULL && (error = unp_internalize(c, &cmc, td))) goto out; } else { - /* pr_sosend() with mbuf usually is a kernel thread. */ - - M_ASSERTPKTHDR(m); - if (__predict_false(c != NULL)) - panic("%s: control from a kernel thread", __func__); + struct mchain mc; + uipc_reset_kernel_mbuf(m, &mc); + cc = mc.mc_len; + mbcnt = mc.mc_mlen; if (__predict_false(m->m_pkthdr.len > unpdg_maxdgram)) { error = EMSGSIZE; goto out; @@ -1157,22 +1531,6 @@ error = ENOBUFS; goto out; } - /* Condition the foreign mbuf to our standards. */ - m_clrprotoflags(m); - m_tag_delete_chain(m, NULL); - m->m_pkthdr.rcvif = NULL; - m->m_pkthdr.flowid = 0; - m->m_pkthdr.csum_flags = 0; - m->m_pkthdr.fibnum = 0; - m->m_pkthdr.rsstype = 0; - - cc = m->m_pkthdr.len; - mbcnt = MSIZE; - for (struct mbuf *mb = m; mb != NULL; mb = mb->m_next) { - mbcnt += MSIZE; - if (mb->m_flags & M_EXT) - mbcnt += mb->m_ext.ext_size; - } } unp = sotounpcb(so); @@ -1224,8 +1582,7 @@ } if (unp2->unp_flags & UNP_WANTCRED_MASK) - c = unp_addsockcred(td, c, unp2->unp_flags, &clast, &ctl, - &mbcnt); + unp_addsockcred(td, &cmc, unp2->unp_flags); if (unp->unp_addr != NULL) from = (struct sockaddr *)unp->unp_addr; else @@ -1233,25 +1590,21 @@ f->m_len = from->sa_len; MPASS(from->sa_len <= MLEN); bcopy(from, mtod(f, void *), from->sa_len); - ctl += f->m_len; /* * Concatenate mbufs: from -> control -> data. * Save overall cc and mbcnt in "from" mbuf. */ - if (c != NULL) { -#ifdef INVARIANTS - struct mbuf *mc; - - for (mc = c; mc->m_next != NULL; mc = mc->m_next); - MPASS(mc == clast); -#endif - f->m_next = c; - clast->m_next = m; - c = NULL; + if (!STAILQ_EMPTY(&cmc.mc_q)) { + f->m_next = mc_first(&cmc); + mc_last(&cmc)->m_next = m; + /* XXXGL: This is dirty, as is the rollback after ENOBUFS. */ + STAILQ_INIT(&cmc.mc_q); } else f->m_next = m; m = NULL; + ctl = f->m_len + cmc.mc_len; + mbcnt += cmc.mc_mlen; #ifdef INVARIANTS dcc = dctl = dmbcnt = 0; for (struct mbuf *mb = f; mb != NULL; mb = mb->m_next) { @@ -1317,7 +1670,7 @@ soroverflow_locked(so2); error = ENOBUFS; if (f->m_next->m_type == MT_CONTROL) { - c = f->m_next; + STAILQ_FIRST(&cmc.mc_q) = f->m_next; f->m_next = NULL; } } @@ -1332,13 +1685,12 @@ out3: SOCK_IO_SEND_UNLOCK(so); out2: - if (c) - unp_scan(c, unp_freerights); + if (!mc_empty(&cmc)) + unp_scan(mc_first(&cmc), unp_freerights); out: if (f) m_freem(f); - if (c) - m_freem(c); + mc_freem(&cmc); if (m) m_freem(m); @@ -1579,6 +1931,7 @@ return (0); } +#if 0 /* No sendfile support. */ static bool uipc_ready_scan(struct socket *so, struct mbuf *m, int count, int *errorp) { @@ -1658,6 +2011,7 @@ } return (error); } +#endif static int uipc_sense(struct socket *so, struct stat *sb) @@ -2096,6 +2450,19 @@ } } +static void +unp_soisdisconnected(struct socket *so) +{ + SOCK_LOCK(so); + MPASS(!SOLISTENING(so)); + so->so_state |= SS_ISDISCONNECTED; + so->so_state &= ~SS_ISCONNECTED; + SOCK_RECVBUF_LOCK(so); + socantrcvmore_locked(so); + SOCK_UNLOCK(so); + wakeup(&so->so_timeo); /* XXXGL: is this needed? */ +} + static void unp_disconnect(struct unpcb *unp, struct unpcb *unp2) { @@ -2168,12 +2535,10 @@ case SOCK_STREAM: case SOCK_SEQPACKET: - if (so) - soisdisconnected(so); + unp_soisdisconnected(so); MPASS(unp2->unp_conn == unp); unp2->unp_conn = NULL; - if (so2) - soisdisconnected(so2); + unp_soisdisconnected(so2); break; } @@ -2378,13 +2743,12 @@ /* * Regardless of whether the socket's peer dropped the connection * with this socket by aborting or disconnecting, POSIX requires - * that ECONNRESET is returned. + * that ECONNRESET is returned on the next send(2) on a connected + * SOCK_DGRAM socket, and EPIPE for SOCK_STREAM. */ - UNP_PCB_LOCK(unp); so = unp->unp_socket; - if (so) - so->so_error = ECONNRESET; + so->so_error = so->so_proto->pr_type == SOCK_DGRAM ? ECONNRESET : EPIPE; if ((unp2 = unp_pcb_lock_peer(unp)) != NULL) { /* Last reference dropped in unp_disconnect().
*/ unp_pcb_rele_notlast(unp); @@ -2584,15 +2948,14 @@ } static int -unp_internalize(struct mbuf **controlp, struct thread *td, - struct mbuf **clast, u_int *space, u_int *mbcnt) +unp_internalize(struct mbuf *control, struct mchain *mc, struct thread *td) { - struct mbuf *control, **initial_controlp; struct proc *p; struct filedesc *fdesc; struct bintime *bt; struct cmsghdr *cm; struct cmsgcred *cmcred; + struct mbuf *m; struct filedescent *fde, **fdep, *fdev; struct file *fp; struct timeval *tv; @@ -2602,15 +2965,13 @@ int i, j, error, *fdp, oldfds; u_int newlen; - MPASS((*controlp)->m_next == NULL); /* COMPAT_OLDSOCK may violate */ + MPASS(control->m_next == NULL); /* COMPAT_OLDSOCK may violate */ UNP_LINK_UNLOCK_ASSERT(); p = td->td_proc; fdesc = p->p_fd; error = 0; - control = *controlp; - *controlp = NULL; - initial_controlp = controlp; + *mc = MCHAIN_INITIALIZER(mc); for (clen = control->m_len, cm = mtod(control, struct cmsghdr *), data = CMSG_DATA(cm); @@ -2624,10 +2985,10 @@ datalen = (char *)cm + cm->cmsg_len - (char *)data; switch (cm->cmsg_type) { case SCM_CREDS: - *controlp = sbcreatecontrol(NULL, sizeof(*cmcred), - SCM_CREDS, SOL_SOCKET, M_WAITOK); + m = sbcreatecontrol(NULL, sizeof(*cmcred), SCM_CREDS, + SOL_SOCKET, M_WAITOK); cmcred = (struct cmsgcred *) - CMSG_DATA(mtod(*controlp, struct cmsghdr *)); + CMSG_DATA(mtod(m, struct cmsghdr *)); cmcred->cmcred_pid = p->p_pid; cmcred->cmcred_uid = td->td_ucred->cr_ruid; cmcred->cmcred_gid = td->td_ucred->cr_rgid; @@ -2680,8 +3041,8 @@ * Now replace the integer FDs with pointers to the * file structure and capability rights. */ - *controlp = sbcreatecontrol(NULL, newlen, - SCM_RIGHTS, SOL_SOCKET, M_WAITOK); + m = sbcreatecontrol(NULL, newlen, SCM_RIGHTS, + SOL_SOCKET, M_WAITOK); fdp = data; for (i = 0; i < oldfds; i++, fdp++) { if (!fhold(fdesc->fd_ofiles[*fdp].fde_file)) { @@ -2697,7 +3058,7 @@ } fdp = data; fdep = (struct filedescent **) - CMSG_DATA(mtod(*controlp, struct cmsghdr *)); + CMSG_DATA(mtod(m, struct cmsghdr *)); fdev = malloc(sizeof(*fdev) * oldfds, M_FILECAPS, M_WAITOK); for (i = 0; i < oldfds; i++, fdev++, fdp++) { @@ -2712,34 +3073,34 @@ break; case SCM_TIMESTAMP: - *controlp = sbcreatecontrol(NULL, sizeof(*tv), - SCM_TIMESTAMP, SOL_SOCKET, M_WAITOK); + m = sbcreatecontrol(NULL, sizeof(*tv), SCM_TIMESTAMP, + SOL_SOCKET, M_WAITOK); tv = (struct timeval *) - CMSG_DATA(mtod(*controlp, struct cmsghdr *)); + CMSG_DATA(mtod(m, struct cmsghdr *)); microtime(tv); break; case SCM_BINTIME: - *controlp = sbcreatecontrol(NULL, sizeof(*bt), - SCM_BINTIME, SOL_SOCKET, M_WAITOK); + m = sbcreatecontrol(NULL, sizeof(*bt), SCM_BINTIME, + SOL_SOCKET, M_WAITOK); bt = (struct bintime *) - CMSG_DATA(mtod(*controlp, struct cmsghdr *)); + CMSG_DATA(mtod(m, struct cmsghdr *)); bintime(bt); break; case SCM_REALTIME: - *controlp = sbcreatecontrol(NULL, sizeof(*ts), - SCM_REALTIME, SOL_SOCKET, M_WAITOK); + m = sbcreatecontrol(NULL, sizeof(*ts), SCM_REALTIME, + SOL_SOCKET, M_WAITOK); ts = (struct timespec *) - CMSG_DATA(mtod(*controlp, struct cmsghdr *)); + CMSG_DATA(mtod(m, struct cmsghdr *)); nanotime(ts); break; case SCM_MONOTONIC: - *controlp = sbcreatecontrol(NULL, sizeof(*ts), - SCM_MONOTONIC, SOL_SOCKET, M_WAITOK); + m = sbcreatecontrol(NULL, sizeof(*ts), SCM_MONOTONIC, + SOL_SOCKET, M_WAITOK); ts = (struct timespec *) - CMSG_DATA(mtod(*controlp, struct cmsghdr *)); + CMSG_DATA(mtod(m, struct cmsghdr *)); nanouptime(ts); break; @@ -2748,28 +3109,20 @@ goto out; } - if (space != NULL) { - *space += (*controlp)->m_len; - *mbcnt += MSIZE; - 
if ((*controlp)->m_flags & M_EXT) - *mbcnt += (*controlp)->m_ext.ext_size; - *clast = *controlp; - } - controlp = &(*controlp)->m_next; + mc_append(mc, m); } if (clen > 0) error = EINVAL; out: - if (error != 0 && initial_controlp != NULL) - unp_internalize_cleanup_rights(*initial_controlp); + if (error != 0) + unp_internalize_cleanup_rights(mc_first(mc)); m_freem(control); return (error); } -static struct mbuf * -unp_addsockcred(struct thread *td, struct mbuf *control, int mode, - struct mbuf **clast, u_int *space, u_int *mbcnt) +static void +unp_addsockcred(struct thread *td, struct mchain *mc, int mode) { struct mbuf *m, *n, *n_prev; const struct cmsghdr *cm; @@ -2785,9 +3138,10 @@ cmsgtype = SCM_CREDS; } + /* XXXGL: uipc_sosend_*() need to be improved so that we can M_WAITOK */ m = sbcreatecontrol(NULL, ctrlsz, cmsgtype, SOL_SOCKET, M_NOWAIT); if (m == NULL) - return (control); + return; MPASS((m->m_flags & M_EXT) == 0 && m->m_next == NULL); if (mode & UNP_WANTCRED_ALWAYS) { @@ -2821,50 +3175,18 @@ * created SCM_CREDS control message (struct sockcred) has another * format. */ - if (control != NULL && cmsgtype == SCM_CREDS) - for (n = control, n_prev = NULL; n != NULL;) { + if (!STAILQ_EMPTY(&mc->mc_q) && cmsgtype == SCM_CREDS) + STAILQ_FOREACH_SAFE(n, &mc->mc_q, m_stailq, n_prev) { cm = mtod(n, struct cmsghdr *); if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_CREDS) { - if (n_prev == NULL) - control = n->m_next; - else - n_prev->m_next = n->m_next; - if (space != NULL) { - MPASS(*space >= n->m_len); - *space -= n->m_len; - MPASS(*mbcnt >= MSIZE); - *mbcnt -= MSIZE; - if (n->m_flags & M_EXT) { - MPASS(*mbcnt >= - n->m_ext.ext_size); - *mbcnt -= n->m_ext.ext_size; - } - MPASS(clast); - if (*clast == n) { - MPASS(n->m_next == NULL); - if (n_prev == NULL) - *clast = m; - else - *clast = n_prev; - } - } - n = m_free(n); - } else { - n_prev = n; - n = n->m_next; + mc_remove(mc, n); + m_free(n); } } /* Prepend it to the head. */ - m->m_next = control; - if (space != NULL) { - *space += m->m_len; - *mbcnt += MSIZE; - if (control == NULL) - *clast = m; - } - return (m); + mc_prepend(mc, m); } static struct unpcb * @@ -3032,7 +3354,7 @@ break; case SOCK_STREAM: case SOCK_SEQPACKET: - unp_scan(so->so_rcv.sb_mb, op); + unp_scan(STAILQ_FIRST(&so->so_rcv.sb_mbq), op); break; } SOCK_RECVBUF_UNLOCK(so); @@ -3242,39 +3564,33 @@ } m = STAILQ_FIRST(&sb->uxdg_mb); STAILQ_INIT(&sb->uxdg_mb); - /* XXX: our shortened sbrelease() */ - (void)chgsbsize(so->so_cred->cr_uidinfo, &sb->sb_hiwat, 0, - RLIM_INFINITY); - /* - * XXXGL Mark sb with SBS_CANTRCVMORE. This is needed to - * prevent uipc_sosend_dgram() or unp_disconnect() adding more - * data to the socket. - * We came here either through shutdown(2) or from the final - * sofree(). The sofree() case is simple as it guarantees - * that no more sends will happen, however we can race with - * unp_disconnect() from our peer. The shutdown(2) case is - * more exotic. It would call into unp_dispose() only if - * socket is SS_ISCONNECTED. This is possible if we did - * connect(2) on this socket and we also had it bound with - * bind(2) and receive connections from other sockets. - * Because uipc_shutdown() violates POSIX (see comment - * there) we will end up here shutting down our receive side. - * Of course this will have affect not only on the peer we - * connect(2)ed to, but also on all of the peers who had - * connect(2)ed to us. Their sends would end up with ENOBUFS. 
- */ - sb->sb_state |= SBS_CANTRCVMORE; break; case SOCK_STREAM: case SOCK_SEQPACKET: sb = &so->so_rcv; - m = sbcut_locked(sb, sb->sb_ccc); - KASSERT(sb->sb_ccc == 0 && sb->sb_mb == 0 && sb->sb_mbcnt == 0, - ("%s: ccc %u mb %p mbcnt %u", __func__, - sb->sb_ccc, (void *)sb->sb_mb, sb->sb_mbcnt)); - sbrelease_locked(so, SO_RCV); + m = STAILQ_FIRST(&sb->sb_mbq); + STAILQ_FIRST(&sb->sb_mbq) = NULL; break; } + /* + * Mark sb with SBS_CANTRCVMORE. This is needed to prevent + * uipc_sosend_*() or unp_disconnect() from adding more data to the + * socket. We came here either through shutdown(2) or from the final + * sofree(). The sofree() case is simple, as it guarantees that no + * more sends will happen; however, we can race with unp_disconnect() + * from our peer. The shutdown(2) case is more exotic. It would call + * into unp_dispose() only if the socket is SS_ISCONNECTED. This is + * possible if we did connect(2) on this socket and also had it bound + * with bind(2), receiving connections from other sockets. Because + * uipc_shutdown() violates POSIX (see comment there), this applies to + * SOCK_DGRAM as well. For SOCK_DGRAM this SBS_CANTRCVMORE will affect + * not only the peer we connect(2)ed to, but also all of the peers who + * had connect(2)ed to us. Their sends would end up with ENOBUFS. + */ + sb->sb_state |= SBS_CANTRCVMORE; + (void)chgsbsize(so->so_cred->cr_uidinfo, &sb->sb_hiwat, 0, + RLIM_INFINITY); SOCK_RECVBUF_UNLOCK(so); SOCK_IO_RECV_UNLOCK(so); @@ -3333,7 +3649,7 @@ */ static struct protosw streamproto = { .pr_type = SOCK_STREAM, - .pr_flags = PR_CONNREQUIRED | PR_WANTRCVD | PR_CAPATTACH, + .pr_flags = PR_CONNREQUIRED | PR_CAPATTACH | PR_SOCKBUF, .pr_ctloutput = &uipc_ctloutput, .pr_abort = uipc_abort, .pr_accept = uipc_peeraddr, @@ -3347,13 +3663,11 @@ .pr_disconnect = uipc_disconnect, .pr_listen = uipc_listen, .pr_peeraddr = uipc_peeraddr, - .pr_rcvd = uipc_rcvd, - .pr_send = uipc_send, - .pr_ready = uipc_ready, .pr_sense = uipc_sense, .pr_shutdown = uipc_shutdown, .pr_sockaddr = uipc_sockaddr, - .pr_soreceive = soreceive_generic, + .pr_sosend = uipc_sosend_stream_or_seqpacket, + .pr_soreceive = uipc_soreceive_stream_or_seqpacket, .pr_close = uipc_close, }; @@ -3382,13 +3696,7 @@ static struct protosw seqpacketproto = { .pr_type = SOCK_SEQPACKET, - /* - * XXXRW: For now, PR_ADDR because soreceive will bump into them - * due to our use of sbappendaddr. A new sbappend variants is needed - * that supports both atomic record writes and control data. - */ - .pr_flags = PR_ADDR | PR_ATOMIC | PR_CONNREQUIRED | - PR_WANTRCVD | PR_CAPATTACH, + .pr_flags = PR_CONNREQUIRED | PR_CAPATTACH | PR_SOCKBUF, .pr_ctloutput = &uipc_ctloutput, .pr_abort = uipc_abort, .pr_accept = uipc_peeraddr, @@ -3402,12 +3710,11 @@ .pr_disconnect = uipc_disconnect, .pr_listen = uipc_listen, .pr_peeraddr = uipc_peeraddr, - .pr_rcvd = uipc_rcvd, - .pr_send = uipc_send, .pr_sense = uipc_sense, .pr_shutdown = uipc_shutdown, .pr_sockaddr = uipc_sockaddr, - .pr_soreceive = soreceive_generic, /* XXX: or...? */ + .pr_sosend = uipc_sosend_stream_or_seqpacket, + .pr_soreceive = uipc_soreceive_stream_or_seqpacket, .pr_close = uipc_close, }; diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h --- a/sys/sys/sockbuf.h +++ b/sys/sys/sockbuf.h @@ -130,6 +130,13 @@ uint64_t sb_tls_seqno; /* TLS seqno */ struct ktls_session *sb_tls_info; /* TLS state */ }; + /* + * PF_UNIX/SOCK_STREAM and PF_UNIX/SOCK_SEQPACKET + * A very simple stream buffer. + */ + struct { + STAILQ_HEAD(, mbuf) sb_mbq; + }; /* * PF_UNIX/SOCK_DGRAM *
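For reference, the new sb_mbq is a plain singly-linked tail queue of mbufs: the send path appends whole chains with STAILQ_CONCAT() while the receive path detaches from the head, with the sb_acc/sb_ctl/sb_mbcnt accounting maintained by hand as in the functions above. A toy model of that queue discipline, with invented names:

#include <sys/queue.h>
#include <stddef.h>

struct blk {
	STAILQ_ENTRY(blk) link;
	size_t len;
};
STAILQ_HEAD(blkq, blk);

/* Producer: append a whole chain at once, as the send path does. */
static void
q_append(struct blkq *q, struct blkq *chain, size_t *acc, size_t chainlen)
{
	STAILQ_CONCAT(q, chain);	/* chain becomes empty */
	*acc += chainlen;
}

/* Consumer: detach blocks from the head, as the receive path does. */
static struct blk *
q_take(struct blkq *q, size_t *acc)
{
	struct blk *b = STAILQ_FIRST(q);

	if (b != NULL) {
		STAILQ_REMOVE_HEAD(q, link);
		*acc -= b->len;
	}
	return (b);
}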
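The unp_drop() change earlier in the patch is likewise observable from userland: once the peer of a connected pair is gone, send(2) on a SOCK_STREAM socket fails with EPIPE, while a connected SOCK_DGRAM socket reports ECONNRESET. A sketch of the stream half, assuming only the standard sockets API (SIGPIPE ignored so the error shows up as errno):

#include <sys/socket.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char byte = 0;

	(void)signal(SIGPIPE, SIG_IGN);
	if (socketpair(PF_UNIX, SOCK_STREAM, 0, fds) != 0)
		return (1);
	close(fds[1]);		/* the peer goes away */
	if (send(fds[0], &byte, 1, 0) == -1 && errno == EPIPE)
		printf("stream send after peer close: EPIPE\n");
	return (0);
}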