diff --git a/lib/libsys/getsockopt.2 b/lib/libsys/getsockopt.2 --- a/lib/libsys/getsockopt.2 +++ b/lib/libsys/getsockopt.2 @@ -25,7 +25,7 @@ .\" OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF .\" SUCH DAMAGE. .\" -.Dd February 8, 2021 +.Dd July 8, 2024 .Dt GETSOCKOPT 2 .Os .Sh NAME @@ -191,6 +191,7 @@ .It Dv SO_MAX_PACING_RATE Ta "set the maximum transmit rate in bytes per second for the socket" .It Dv SO_NO_OFFLOAD Ta "disables protocol offloads" .It Dv SO_NO_DDP Ta "disables direct data placement offload" +.It Dv SO_SPLICE Ta "splice two sockets together" .El .Pp .Dv SO_DEBUG @@ -551,6 +552,56 @@ reassembled TCP data streams to be received via zero-copy in user-supplied buffers using .Xr aio_read 2 . +.Pp +.Dv SO_SPLICE , +when passed to +.Fn setsockopt , +splices two sockets together using the following +.Fa optval : +.Bd -literal +struct splice { + int sp_fd; + off_t sp_max; + struct timeval sp_idle; +}; +.Ed +.Pp +Data received on +.Fa s +will automatically be transmitted from the socket specified in +.Fa sp_fd +without any intervention by userspace. +Splicing is a one-way operation; a given pair of sockets may be +spliced in one or both directions. +Currently only connected +.Xr tcp 4 +sockets may be spliced together. +If +.Fa sp_max +is greater than zero, the socket pair will automatically be unspliced +once that number of bytes has been transmitted. +If +.Fa sp_idle +is non-zero, the socket pair will automatically be unspliced once the +specified amount of time has elapsed since the initial call to +.Fn setsockopt . +If +.Fa sp_fd +is -1, the socket will be unspliced immediately. +.Pp +When passed to +.Fn getsockopt , +the +.Dv SO_SPLICE +option returns a 64-bit integer containing the number of bytes transmitted by +the most recent splice. +That is, while the socket is spliced, the value returned will be the number +of bytes spliced so far.
+When unsplicing, this value is saved and is returned until the socket is closed +or spliced again. +For example, if a splice transmits 100 bytes and is then unspliced, a subsequent +.Fn getsockopt +call will return 100 until the socket is spliced again. .Sh RETURN VALUES .Rv -std .Sh ERRORS @@ -618,5 +669,12 @@ .Fn setsockopt system calls appeared in .Bx 4.2 . +The +.Dv SO_SPLICE +option originated in +.Ox . +The +.Fx +implementation aims to be compatible. .Sh BUGS Several of the socket options should be handled at lower levels of the system. diff --git a/sys/kern/uipc_sockbuf.c b/sys/kern/uipc_sockbuf.c --- a/sys/kern/uipc_sockbuf.c +++ b/sys/kern/uipc_sockbuf.c @@ -508,6 +508,32 @@ SOCK_BUF_UNLOCK_ASSERT(so, which); } +static void +splice_push(struct socket *so) /* rcvbuf wakeup on a spliced source */ +{ + struct so_splice *sp; + + SOCK_RECVBUF_LOCK_ASSERT(so); + + sp = so->so_splice; + mtx_lock(&sp->mtx); /* lock sp before dropping the sockbuf lock */ + SOCK_RECVBUF_UNLOCK(so); + so_splice_dispatch(sp); /* queues work if idle; releases sp->mtx */ +} + +static void +splice_pull(struct socket *so) /* sndbuf wakeup on a spliced sink */ +{ + struct so_splice *sp; + + SOCK_SENDBUF_LOCK_ASSERT(so); + + sp = so->so_splice_back; + mtx_lock(&sp->mtx); /* lock sp before dropping the sockbuf lock */ + SOCK_SENDBUF_UNLOCK(so); + so_splice_dispatch(sp); /* queues work if idle; releases sp->mtx */ +} + /* * Do we need to notify the other side when I/O is possible?
*/ @@ -522,7 +548,9 @@ sorwakeup_locked(struct socket *so) { SOCK_RECVBUF_LOCK_ASSERT(so); - if (sb_notify(&so->so_rcv)) + if (so->so_rcv.sb_flags & SB_SPLICED) /* spliced source: feed the splice, don't wake readers */ + splice_push(so); + else if (sb_notify(&so->so_rcv)) sowakeup(so, SO_RCV); else SOCK_RECVBUF_UNLOCK(so); @@ -532,7 +560,9 @@ sowwakeup_locked(struct socket *so) { SOCK_SENDBUF_LOCK_ASSERT(so); - if (sb_notify(&so->so_snd)) + if (so->so_snd.sb_flags & SB_SPLICED) /* spliced sink: kick the splice, don't wake writers */ + splice_pull(so); + else if (sb_notify(&so->so_snd)) sowakeup(so, SO_SND); else SOCK_SENDBUF_UNLOCK(so); diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c --- a/sys/kern/uipc_socket.c +++ b/sys/kern/uipc_socket.c @@ -122,6 +122,7 @@ #include #include #include +#include #include #include #include @@ -134,6 +135,7 @@ #include #include #include +#include #include #include #include @@ -159,8 +161,17 @@ #include #endif +static int soreceive_generic_locked(struct socket *so, + struct sockaddr **psa, struct uio *uio, struct mbuf **mp, + struct mbuf **controlp, int *flagsp); static int soreceive_rcvoob(struct socket *so, struct uio *uio, int flags); +static int soreceive_stream_locked(struct socket *so, struct sockbuf *sb, + struct sockaddr **psa, struct uio *uio, struct mbuf **mp, + struct mbuf **controlp, int flags); +static int sosend_generic_locked(struct socket *so, struct sockaddr *addr, + struct uio *uio, struct mbuf *top, struct mbuf *control, + int flags, struct thread *td); static void so_rdknl_lock(void *); static void so_rdknl_unlock(void *); static void so_rdknl_assert_lock(void *, int); @@ -278,6 +289,393 @@ maxsockets = uma_zone_set_max(socket_zone, maxsockets); } +static int splice_init_state; /* 0: not initialized, 1: ready, -1: splice_init() failed */ +static struct sx splice_init_lock; +SX_SYSINIT(splice_init_lock, &splice_init_lock, "splice_init"); + +static SYSCTL_NODE(_kern_ipc, OID_AUTO, splice, CTLFLAG_RW, 0, + "Settings relating to the SO_SPLICE socket option"); + +static bool splice_receive_stream = true; +SYSCTL_BOOL(_kern_ipc_splice, OID_AUTO, receive_stream, CTLFLAG_RWTUN, +
&splice_receive_stream, 0, + "Use soreceive_stream() for stream splices"); + +static uma_zone_t splice_zone; /* allocator for struct so_splice */ +static struct proc *splice_proc; /* kproc hosting the worker threads */ +struct splice_wq { /* one work queue, and one worker thread, per CPU */ + struct mtx mtx; + STAILQ_HEAD(, so_splice) head; + bool running; /* worker is awake, so no wakeup() is needed */ +} __aligned(CACHE_LINE_SIZE); +static struct splice_wq *splice_wq; +struct splice_domain_info { + int count; + int cpu[MAXCPU]; +}; +static int splice_bind_threads = 1; /* domain tables are only filled when > 1 */ +static uint16_t splice_cpuid_lookup[MAXCPU]; +static struct splice_domain_info splice_domains[MAXMEMDOM]; +static uint32_t splice_index = 0; /* round-robin counter used to pick wq_index */ + +static void so_splice_timeout(void *arg, int pending); +static void so_splice_xfer(struct so_splice *s); +static int so_unsplice(struct socket *so, bool timeout); + +static void +splice_work_thread(void *ctx) +{ + struct splice_wq *wq = ctx; + struct so_splice *s, *s_temp; + STAILQ_HEAD(, so_splice) local_head; + int cpu; + + cpu = wq - splice_wq; + if (bootverbose) + printf("starting so_splice worker thread for CPU %d\n", cpu); + + for (;;) { + mtx_lock(&wq->mtx); + while (STAILQ_EMPTY(&wq->head)) { + wq->running = false; + mtx_sleep(wq, &wq->mtx, 0, "-", 0); + wq->running = true; + } + STAILQ_INIT(&local_head); + STAILQ_CONCAT(&local_head, &wq->head); /* take the whole batch */ + STAILQ_INIT(&wq->head); + mtx_unlock(&wq->mtx); + STAILQ_FOREACH_SAFE(s, &local_head, next, s_temp) { /* s may be freed or requeued */ + mtx_lock(&s->mtx); /* so_splice_xfer() expects it held and releases it */ + CURVNET_SET(s->src->so_vnet); + so_splice_xfer(s); + CURVNET_RESTORE(); + } + } +} + +static void +so_splice_dispatch_async(struct so_splice *sp) +{ + struct splice_wq *wq; + bool running; + + wq = &splice_wq[sp->wq_index]; + mtx_lock(&wq->mtx); + STAILQ_INSERT_TAIL(&wq->head, sp, next); + running = wq->running; + mtx_unlock(&wq->mtx); + if (!running) /* a running worker rechecks the queue before sleeping */ + wakeup(wq); +} + +void +so_splice_dispatch(struct so_splice *sp) +{ + mtx_assert(&sp->mtx, MA_OWNED); + + if (sp->state != SPLICE_IDLE) { /* already queued, running or closing */ + mtx_unlock(&sp->mtx); + } else { + sp->state = SPLICE_QUEUED; + mtx_unlock(&sp->mtx); + so_splice_dispatch_async(sp); + } +} + +static int +splice_zinit(void *mem, int
size __unused, int flags __unused) +{ + struct so_splice *s; + + s = (struct so_splice *)mem; + mtx_init(&s->mtx, "so_splice", NULL, MTX_DEF); + return (0); +} + +static void +splice_zfini(void *mem, int size __unused) +{ + struct so_splice *s; + + s = (struct so_splice *)mem; + mtx_destroy(&s->mtx); +} + +static int +splice_init(void) /* 0 once the splice zone and workers exist, else an errno */ +{ + struct thread *td; + struct pcpu *pc; + int count, domain, error, i, nthr, state; + + state = atomic_load_acq_int(&splice_init_state); /* 0: not yet, 1: done, -1: failed */ + if (__predict_true(state > 0)) + return (0); + if (state < 0) + return (ENXIO); + sx_xlock(&splice_init_lock); + state = splice_init_state; /* recheck: another thread may have finished first */ + if (state != 0) { + sx_xunlock(&splice_init_lock); + return (state > 0 ? 0 : ENXIO); + } + + splice_zone = uma_zcreate("splice", sizeof(struct so_splice), NULL, + NULL, splice_zinit, splice_zfini, UMA_ALIGN_CACHE, 0); + + splice_wq = mallocarray(mp_maxid + 1, sizeof(*splice_wq), M_TEMP, + M_WAITOK | M_ZERO); + + /* + * Initialize the workqueues to run the splice work. We create a + * work queue for each CPU. + */ + nthr = 0; + CPU_FOREACH(i) { + STAILQ_INIT(&splice_wq[i].head); + mtx_init(&splice_wq[i].mtx, "splice work queue", NULL, MTX_DEF); + if (splice_bind_threads > 1) { + pc = pcpu_find(i); + domain = pc->pc_domain; + count = splice_domains[domain].count; + splice_domains[domain].cpu[count] = i; + splice_domains[domain].count++; + } + splice_cpuid_lookup[nthr++] = i; + } + + /* + * If we somehow have an empty domain, fall back to choosing + * among all SPLICE threads. + */ + if (splice_bind_threads > 1) { + for (i = 0; i < vm_ndomains; i++) { + if (splice_domains[i].count == 0) { + splice_bind_threads = 1; + break; + } + } + } + + /* Start kthreads for each workqueue. */ + error = 0; + CPU_FOREACH(i) { + error = kproc_kthread_add(splice_work_thread, &splice_wq[i], + &splice_proc, &td, 0, 0, "so_splice", "thr_%d", i); + if (error) { + printf("Can't add so_splice thread %d error %d\n", + i, error); + break; + } + } + + atomic_store_rel_int(&splice_init_state, error != 0 ?
-1 : 1); /* release: pairs with atomic_load_acq_int() above */ + sx_xunlock(&splice_init_lock); + return (error); +} + +/* + * Lock a pair of socket's I/O locks for splicing. Avoid blocking while holding + * one lock in order to avoid potential deadlocks in case there is some other + * code path which acquires more than one I/O lock at a time. + */ +static void +splice_lock_pair(struct socket *so_src, struct socket *so_dst) +{ + int error; + + for (;;) { + error = SOCK_IO_SEND_LOCK(so_dst, SBL_WAIT | SBL_NOINTR); + KASSERT(error == 0, + ("%s: failed to lock send I/O lock: %d", __func__, error)); + error = SOCK_IO_RECV_LOCK(so_src, 0); + KASSERT(error == 0 || error == EWOULDBLOCK, + ("%s: failed to lock recv I/O lock: %d", __func__, error)); + if (error == 0) + break; + SOCK_IO_SEND_UNLOCK(so_dst); + + error = SOCK_IO_RECV_LOCK(so_src, SBL_WAIT | SBL_NOINTR); + KASSERT(error == 0, + ("%s: failed to lock recv I/O lock: %d", __func__, error)); + error = SOCK_IO_SEND_LOCK(so_dst, 0); + KASSERT(error == 0 || error == EWOULDBLOCK, + ("%s: failed to lock send I/O lock: %d", __func__, error)); + if (error == 0) + break; + SOCK_IO_RECV_UNLOCK(so_src); + } +} + +static void +splice_unlock_pair(struct socket *so_src, struct socket *so_dst) +{ + SOCK_IO_RECV_UNLOCK(so_src); + SOCK_IO_SEND_UNLOCK(so_dst); +} + +/* + * Move data from the source to the sink. Assumes that both of the relevant + * socket I/O locks are held.
+ */ +static int +so_splice_xfer_data(struct socket *so_src, struct socket *so_dst, off_t max, + ssize_t *lenp) /* on success, *lenp is set to the number of bytes moved */ +{ + struct uio uio; + struct mbuf *m; + struct sockbuf *sb_src, *sb_dst; + ssize_t len; + long space; + int error, flags; + + SOCK_IO_RECV_ASSERT_LOCKED(so_src); + SOCK_IO_SEND_ASSERT_LOCKED(so_dst); + + error = 0; + m = NULL; + memset(&uio, 0, sizeof(uio)); + + sb_src = &so_src->so_rcv; + sb_dst = &so_dst->so_snd; + + space = sbspace(sb_dst); + if (space < 0) + space = 0; + len = MIN(max, MIN(space, sbavail(sb_src))); + if (len == 0) { + SOCK_RECVBUF_LOCK(so_src); + if ((sb_src->sb_state & SBS_CANTRCVMORE) != 0) + error = EPIPE; + SOCK_RECVBUF_UNLOCK(so_src); + } else { + flags = MSG_DONTWAIT; + uio.uio_resid = len; + if (splice_receive_stream && sb_src->sb_tls_info == NULL) { + error = soreceive_stream_locked(so_src, sb_src, NULL, + &uio, &m, NULL, flags); + } else { + error = soreceive_generic_locked(so_src, NULL, + &uio, &m, NULL, &flags); + } + if (error != 0 && m != NULL) { + m_freem(m); + m = NULL; + } + } + if (m != NULL) { + len -= uio.uio_resid; + error = sosend_generic_locked(so_dst, NULL, NULL, m, NULL, + MSG_DONTWAIT, curthread); + } else if (error == 0) { + len = 0; + SOCK_SENDBUF_LOCK(so_dst); + if ((sb_dst->sb_state & SBS_CANTSENDMORE) != 0) + error = EPIPE; + SOCK_SENDBUF_UNLOCK(so_dst); + } + if (error == 0) + *lenp = len; + return (error); +} + +/* + * Transfer data from the source to the sink. + * + * Called with sp->mtx held and the splice QUEUED or CLOSING, either from a + * splice worker thread or from so_splice(). The mutex is dropped while data + * is transferred and is not held on return. On error, or once sp->max bytes + * have been sent, the sockets are unspliced.
+ */ +static void +so_splice_xfer(struct so_splice *sp) +{ + struct socket *so_src, *so_dst; + off_t max; + ssize_t len; + int error; + + mtx_assert(&sp->mtx, MA_OWNED); + KASSERT(sp->state == SPLICE_QUEUED || sp->state == SPLICE_CLOSING, + ("so_splice_xfer: invalid state %d", sp->state)); + KASSERT(sp->max != 0, ("so_splice_xfer: max == 0")); + + if (sp->state == SPLICE_CLOSING) { + /* so_unsplice() is waiting for us to close the splice. */ + goto closing; + } + + sp->state = SPLICE_RUNNING; + so_src = sp->src; + so_dst = sp->dst; + max = sp->max > 0 ? sp->max - so_src->so_splice_sent : OFF_MAX; /* remaining byte budget */ + if (max < 0) + max = 0; + + /* + * Lock the sockets in order to block userspace from doing anything + * sneaky. If an error occurs or one of the sockets can no longer + * transfer data, we will automatically unsplice. + */ + mtx_unlock(&sp->mtx); + splice_lock_pair(so_src, so_dst); + + error = so_splice_xfer_data(so_src, so_dst, max, &len); + + mtx_lock(&sp->mtx); /* so_unsplice() may have set CLOSING meanwhile */ + + /* + * Update our stats while still holding the socket locks. This + * synchronizes with getsockopt(SO_SPLICE), see the comment there. + */ + if (error == 0) { + KASSERT(len >= 0, ("%s: len %zd < 0", __func__, len)); + so_src->so_splice_sent += len; + } + splice_unlock_pair(so_src, so_dst); + + switch (sp->state) { + case SPLICE_CLOSING: +closing: + sp->state = SPLICE_CLOSED; + wakeup(sp); /* so_unsplice() sleeps while the state is CLOSING */ + mtx_unlock(&sp->mtx); + break; + case SPLICE_RUNNING: + if (error != 0 || + (sp->max > 0 && so_src->so_splice_sent >= sp->max)) { + sp->state = SPLICE_EXCEPTION; + soref(so_src); /* keep so_src alive across so_unsplice() */ + mtx_unlock(&sp->mtx); + (void)so_unsplice(so_src, false); + sorele(so_src); + } else { + /* + * Locklessly check for additional bytes in the source's + * receive buffer and queue more work if possible. We + * may end up queuing needless work, but that's ok, and + * if we race with a thread inserting more data into the + * buffer and observe sbavail() == 0, the splice mutex + * ensures that splice_push() will queue more work for + * us.
+ */ + if (sbavail(&so_src->so_rcv) > 0 && + sbspace(&so_dst->so_snd) > 0) { + sp->state = SPLICE_QUEUED; + mtx_unlock(&sp->mtx); + so_splice_dispatch_async(sp); + } else { + sp->state = SPLICE_IDLE; + mtx_unlock(&sp->mtx); + } + } + break; + default: + __assert_unreachable(); + } +} + static void socket_init(void *tag) { @@ -1213,6 +1611,221 @@ return (0); } +static struct so_splice * +so_splice_alloc(off_t max) +{ + struct so_splice *sp; + + sp = uma_zalloc(splice_zone, M_WAITOK); + sp->src = NULL; + sp->dst = NULL; + sp->max = max > 0 ? max : -1; + do { + sp->wq_index = atomic_fetchadd_32(&splice_index, 1) % + (mp_maxid + 1); + } while (CPU_ABSENT(sp->wq_index)); + sp->state = SPLICE_IDLE; + TIMEOUT_TASK_INIT(taskqueue_thread, &sp->timeout, 0, so_splice_timeout, + sp); + return (sp); +} + +static void +so_splice_free(struct so_splice *sp) +{ + KASSERT(sp->state == SPLICE_CLOSED, + ("so_splice_free: sp %p not closed", sp)); + uma_zfree(splice_zone, sp); +} + +static void +so_splice_timeout(void *arg, int pending __unused) +{ + struct so_splice *sp; + + sp = arg; + (void)so_unsplice(sp->src, true); +} + +/* + * Splice so to so2: data received on so is transmitted on so2. + */ +static int +so_splice(struct socket *so, struct socket *so2, struct splice *splice) +{ + struct so_splice *sp; + int error; + + if (splice->sp_max < 0 || splice->sp_idle.tv_sec < 0 || + splice->sp_idle.tv_usec < 0 || splice->sp_idle.tv_usec >= 1000000) + return (EINVAL); + /* Handle only TCP for now; TODO: other streaming protos */ + if (so->so_proto->pr_protocol != IPPROTO_TCP || + so2->so_proto->pr_protocol != IPPROTO_TCP) + return (EPROTONOSUPPORT); + if (so->so_vnet != so2->so_vnet) + return (EINVAL); + + /* so_splice_xfer() assumes that we're using these implementations.
*/ + KASSERT(so->so_proto->pr_soreceive == soreceive_generic || + so->so_proto->pr_soreceive == soreceive_stream, + ("so_splice: soreceive not soreceive_generic/stream")); + KASSERT(so2->so_proto->pr_sosend == sosend_generic, + ("so_splice: sosend not sosend_generic")); + + sp = so_splice_alloc(splice->sp_max); + sp->state = SPLICE_QUEUED; /* wakeups must not queue work before setup completes */ + sp->src = so; + sp->dst = so2; + + error = 0; + SOCK_LOCK(so); + if (SOLISTENING(so)) + error = EINVAL; + else if ((so->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0) + error = ENOTCONN; + else if (so->so_splice != NULL) + error = EBUSY; + if (error != 0) { + SOCK_UNLOCK(so); + uma_zfree(splice_zone, sp); + return (error); + } + soref(so); + so->so_splice = sp; + so->so_splice_sent = 0; /* after the EBUSY check, so an active splice is untouched */ + SOCK_RECVBUF_LOCK(so); + so->so_rcv.sb_flags |= SB_SPLICED; + SOCK_RECVBUF_UNLOCK(so); + SOCK_UNLOCK(so); + + error = 0; + SOCK_LOCK(so2); + if (SOLISTENING(so2)) + error = EINVAL; + else if ((so2->so_state & (SS_ISCONNECTED | SS_ISCONNECTING)) == 0) + error = ENOTCONN; + else if (so2->so_splice_back != NULL) + error = EBUSY; + if (error != 0) { + SOCK_UNLOCK(so2); + SOCK_LOCK(so); + so->so_splice = NULL; + SOCK_RECVBUF_LOCK(so); + so->so_rcv.sb_flags &= ~SB_SPLICED; + SOCK_RECVBUF_UNLOCK(so); + SOCK_UNLOCK(so); + sorele(so); + uma_zfree(splice_zone, sp); /* not IDLE, so no wakeup can have queued sp */ + return (error); + } + soref(so2); + so2->so_splice_back = sp; + SOCK_SENDBUF_LOCK(so2); + so2->so_snd.sb_flags |= SB_SPLICED; + mtx_lock(&sp->mtx); + SOCK_SENDBUF_UNLOCK(so2); + SOCK_UNLOCK(so2); + + if (splice->sp_idle.tv_sec != 0 || splice->sp_idle.tv_usec != 0) { + taskqueue_enqueue_timeout_sbt(taskqueue_thread, &sp->timeout, + tvtosbt(splice->sp_idle), 0, C_PREL(4)); + } + + /* + * Transfer any data already present in the socket buffer.
+ */ + sp->state = SPLICE_QUEUED; + so_splice_xfer(sp); /* releases sp->mtx */ + return (0); +} + +static int +so_unsplice(struct socket *so, bool timeout) /* timeout: called from the sp_idle task */ +{ + struct socket *so2; + struct so_splice *sp; + bool drain; + + /* + * First unset SB_SPLICED and hide the splice structure so that + * wakeup routines will stop enqueuing work. This also ensures that + * only a single thread will proceed with the unsplice. + */ + SOCK_LOCK(so); + if (SOLISTENING(so)) { + SOCK_UNLOCK(so); + return (EINVAL); + } + SOCK_RECVBUF_LOCK(so); + if ((so->so_rcv.sb_flags & SB_SPLICED) == 0) { + SOCK_RECVBUF_UNLOCK(so); + SOCK_UNLOCK(so); + return (ENOTCONN); + } + so->so_rcv.sb_flags &= ~SB_SPLICED; + sp = so->so_splice; + so->so_splice = NULL; + SOCK_RECVBUF_UNLOCK(so); + SOCK_UNLOCK(so); + + so2 = sp->dst; + SOCK_LOCK(so2); + KASSERT(!SOLISTENING(so2), ("%s: so2 is listening", __func__)); + SOCK_SENDBUF_LOCK(so2); + KASSERT((so2->so_snd.sb_flags & SB_SPLICED) != 0, + ("%s: so2 is not spliced", __func__)); + KASSERT(so2->so_splice_back == sp, + ("%s: so_splice_back != sp", __func__)); + so2->so_snd.sb_flags &= ~SB_SPLICED; + so2->so_splice_back = NULL; + SOCK_SENDBUF_UNLOCK(so2); + SOCK_UNLOCK(so2); + + /* + * No new work is being enqueued. The worker thread might be + * splicing data right now, in which case we want to wait for it to + * finish before proceeding. + */ + mtx_lock(&sp->mtx); + switch (sp->state) { + case SPLICE_QUEUED: + case SPLICE_RUNNING: + sp->state = SPLICE_CLOSING; + while (sp->state == SPLICE_CLOSING) + msleep(sp, &sp->mtx, PSOCK, "unsplice", 0); + break; + case SPLICE_IDLE: + case SPLICE_EXCEPTION: + sp->state = SPLICE_CLOSED; + break; + default: + __assert_unreachable(); + } + if (!timeout) { + drain = taskqueue_cancel_timeout(taskqueue_thread, &sp->timeout, + NULL) != 0; + } else { + drain = false; + } + mtx_unlock(&sp->mtx); + if (drain) + taskqueue_drain_timeout(taskqueue_thread, &sp->timeout); + + /* + * Now we hold the sole reference to the splice structure.
+ * Clean up: signal userspace and release socket references. + */ + sorwakeup(so); + CURVNET_SET(so->so_vnet); + sorele(so); + sowwakeup(so2); + sorele(so2); + CURVNET_RESTORE(); + so_splice_free(sp); + return (0); +} + /* * Free socket upon release of the very last reference. */ @@ -1226,6 +1837,12 @@ ("%s: so %p has references", __func__, so)); KASSERT(SOLISTENING(so) || so->so_qstate == SQ_NONE, ("%s: so %p is on listen queue", __func__, so)); + KASSERT(SOLISTENING(so) || (so->so_rcv.sb_flags & SB_SPLICED) == 0, + ("%s: so %p rcvbuf is spliced", __func__, so)); + KASSERT(SOLISTENING(so) || (so->so_snd.sb_flags & SB_SPLICED) == 0, + ("%s: so %p sndbuf is spliced", __func__, so)); + KASSERT(so->so_splice == NULL && so->so_splice_back == NULL, + ("%s: so %p has spliced data", __func__, so)); SOCK_UNLOCK(so); @@ -3318,6 +3937,40 @@ so->so_max_pacing_rate = val32; break; + case SO_SPLICE: { + struct splice splice; + + error = sooptcopyin(sopt, &splice, sizeof(splice), + sizeof(splice)); + if (error) + goto bad; + + error = splice_init(); + if (error != 0) + goto bad; + + if (splice.sp_fd >= 0) { + struct file *fp; + struct socket *so2; + + if (!cap_rights_contains(sopt->sopt_rights, + &cap_recv_rights)) { + error = ENOTCAPABLE; + goto bad; + } + error = getsock(sopt->sopt_td, splice.sp_fd, + &cap_send_rights, &fp); + if (error != 0) + goto bad; + so2 = fp->f_data; + + error = so_splice(so, so2, &splice); + fdrop(fp, sopt->sopt_td); + } else { + error = so_unsplice(so, false); /* negative sp_fd: unsplice */ + } + break; + } default: #ifdef SOCKET_HHOOK if (V_socket_hhh[HHOOK_SOCKET_OPT]->hhh_nhooks > 0) @@ -3537,6 +4190,33 @@ optval = so->so_max_pacing_rate; goto integer; + case SO_SPLICE: { + off_t n; + + /* + * Acquire the I/O lock to serialize with + * so_splice_xfer().
This is not required for + * correctness, but makes testing simpler: once a byte + * has been transmitted to the sink and observed (e.g., + * by reading from the socket to which the sink is + * connected), a subsequent getsockopt(SO_SPLICE) will + * return an up-to-date value. + */ + error = SOCK_IO_RECV_LOCK(so, SBL_WAIT); + if (error != 0) + goto bad; + SOCK_LOCK(so); + if (SOLISTENING(so)) { + n = 0; + } else { + n = so->so_splice_sent; /* bytes sent by the current or most recent splice */ + } + SOCK_UNLOCK(so); + SOCK_IO_RECV_UNLOCK(so); + error = sooptcopyout(sopt, &n, sizeof(n)); + break; + } + default: #ifdef SOCKET_HHOOK if (V_socket_hhh[HHOOK_SOCKET_OPT]->hhh_nhooks > 0) @@ -3548,9 +4228,7 @@ break; } } -#ifdef MAC bad: -#endif CURVNET_RESTORE(); return (error); } @@ -3713,10 +4391,10 @@ SOCK_SENDBUF_LOCK(so); SOCK_RECVBUF_LOCK(so); if (events & (POLLIN | POLLRDNORM)) - if (soreadabledata(so)) + if (soreadabledata(so) && !isspliced(so)) /* not readable while spliced */ revents |= events & (POLLIN | POLLRDNORM); if (events & (POLLOUT | POLLWRNORM)) - if (sowriteable(so)) + if (sowriteable(so) && !issplicedback(so)) /* not writable while a splice sink */ revents |= events & (POLLOUT | POLLWRNORM); if (events & (POLLPRI | POLLRDBAND)) if (so->so_oobmark || @@ -3824,6 +4502,9 @@ return (!TAILQ_EMPTY(&so->sol_comp)); } + if ((so->so_rcv.sb_flags & SB_SPLICED) != 0) /* spliced data goes to the sink */ + return (0); + SOCK_RECVBUF_LOCK_ASSERT(so); kn->kn_data = sbavail(&so->so_rcv) - so->so_rcv.sb_ctl; @@ -4330,6 +5011,8 @@ xso->so_oobmark = so->so_oobmark; sbtoxsockbuf(&so->so_snd, &xso->so_snd); sbtoxsockbuf(&so->so_rcv, &xso->so_rcv); + if ((so->so_rcv.sb_flags & SB_SPLICED) != 0) + xso->so_splice_so = (uintptr_t)so->so_splice->dst; /* export the sink socket */ } SOCK_UNLOCK(so); } diff --git a/sys/sys/sockbuf.h b/sys/sys/sockbuf.h --- a/sys/sys/sockbuf.h +++ b/sys/sys/sockbuf.h @@ -48,7 +48,8 @@ #define SB_AUTOSIZE 0x800 /* automatically size socket buffer */ #define SB_STOP 0x1000 /* backpressure indicator */ #define SB_AIO_RUNNING 0x2000 /* AIO operation running */ -#define SB_UNUSED 0x4000 /* previously used for SB_TLS_IFNET */ +#define
SB_SPLICED 0x4000 /* socket buffer is spliced; + previously used for SB_TLS_IFNET */ #define SB_TLS_RX_RESYNC 0x8000 /* KTLS RX lost HW sync */ #define SBS_CANTSENDMORE 0x0010 /* can't send more data to peer */ diff --git a/sys/sys/socket.h b/sys/sys/socket.h --- a/sys/sys/socket.h +++ b/sys/sys/socket.h @@ -35,6 +35,7 @@ #include #include #include +#include #include /* @@ -173,6 +174,7 @@ #endif #if __BSD_VISIBLE +#define SO_SPLICE 0x1023 /* splice data to other socket */ #define SO_TS_REALTIME_MICRO 0 /* microsecond resolution, realtime */ #define SO_TS_BINTIME 1 /* sub-nanosecond resolution, realtime */ #define SO_TS_REALTIME 2 /* nanosecond resolution, realtime */ @@ -668,6 +670,16 @@ struct msghdr msg_hdr; /* message header */ ssize_t msg_len; /* message length */ }; + +/* + * Argument structure for setsockopt(SO_SPLICE). + */ +struct splice { + int sp_fd; /* sink socket descriptor, or negative to unsplice */ + off_t sp_max; /* if > 0, maximum bytes to splice; 0 = no limit */ + struct timeval sp_idle; /* if non-zero, unsplice this long after splicing */ +}; + #endif /* __BSD_VISIBLE */ #if defined(_FORTIFY_SOURCE) && _FORTIFY_SOURCE > 0 diff --git a/sys/sys/socketvar.h b/sys/sys/socketvar.h --- a/sys/sys/socketvar.h +++ b/sys/sys/socketvar.h @@ -45,9 +45,12 @@ #include #include #include +#include #ifdef _KERNEL #include #include +#else +#include #endif struct vnet; @@ -69,6 +72,25 @@ SQ_COMP = 0x1000, /* on sol_comp */ }; + +struct so_splice { + struct socket *src; /* source: data is read from its receive buffer */ + struct socket *dst; /* sink: data is written to its send buffer */ + off_t max; /* maximum bytes to splice, or -1 */ + struct mtx mtx; /* serializes state transitions */ + unsigned int wq_index; /* which per-CPU splice work queue runs us */ + enum so_splice_state { + SPLICE_IDLE, /* waiting for work to arrive */ + SPLICE_QUEUED, /* a wakeup has queued some work */ + SPLICE_RUNNING, /* currently transferring data */ + SPLICE_CLOSING, /* waiting for work to drain */ + SPLICE_CLOSED, /* unsplicing, terminal state */ + SPLICE_EXCEPTION, /* I/O error or limit, implicit unsplice */ + } state; + struct timeout_task timeout; /* runs so_unsplice() when sp_idle expires */ + STAILQ_ENTRY(so_splice) next; /* work queue linkage */ +}; + /*- * Locking key to struct
socket: * (a) constant after allocation, no locking required. @@ -79,6 +101,7 @@ * (f) not locked since integer reads/writes are atomic. * (g) used only as a sleep/wakeup address, no value. * (h) locked by global mutex so_global_mtx. + * (ir,is) locked by recv or send I/O locks. * (k) locked by KTLS workqueue mutex */ TAILQ_HEAD(accept_queue, socket); @@ -117,6 +140,9 @@ int so_ts_clock; /* type of the clock used for timestamps */ uint32_t so_max_pacing_rate; /* (f) TX rate limit in bytes/s */ + struct so_splice *so_splice; /* (b) splice with this socket as source */ + struct so_splice *so_splice_back; /* (b) splice with this socket as sink */ + off_t so_splice_sent; /* (ir) splice bytes sent so far */ /* * Mutexes to prevent interleaving of socket I/O. These have to be @@ -297,6 +323,11 @@ * Macros for sockets and socket buffering. */ +/* Is so the source (isspliced) or the sink (issplicedback) of a splice? */ +#define isspliced(so) ((so)->so_splice != NULL && \ + (so)->so_splice->src != NULL) +#define issplicedback(so) ((so)->so_splice_back != NULL && \ + (so)->so_splice_back->dst != NULL) /* * Flags to soiolock(). */ @@ -327,9 +358,17 @@ #define soreadabledata(so) \ (sbavail(&(so)->so_rcv) >= (so)->so_rcv.sb_lowat || \ (so)->so_error || (so)->so_rerror) -#define soreadable(so) \ +#define _soreadable(so) \ (soreadabledata(so) || ((so)->so_rcv.sb_state & SBS_CANTRCVMORE)) +static inline bool +soreadable(struct socket *so) +{ + if (isspliced(so)) + return (false); + return (_soreadable(so)); +} + /* can we write something to so? */ #define sowriteable(so) \ ((sbspace(&(so)->so_snd) >= (so)->so_snd.sb_lowat && \ @@ -539,6 +578,11 @@ int soiolock(struct socket *so, struct sx *sx, int flags); void soiounlock(struct sx *sx); +/* + * Socket splicing routines. + */ +void so_splice_dispatch(struct so_splice *sp); + /* * Accept filter functions (duh).
*/ @@ -562,7 +606,8 @@ kvaddr_t xso_so; /* kernel address of struct socket */ kvaddr_t so_pcb; /* kernel address of struct inpcb */ uint64_t so_oobmark; - int64_t so_spare64[8]; + kvaddr_t so_splice_so; /* kernel address of the sink socket, if spliced */ + int64_t so_spare64[7]; /* one former spare slot now holds so_splice_so */ int32_t xso_protocol; int32_t xso_family; uint32_t so_qlen;