Changeset View
Changeset View
Standalone View
Standalone View
sys/netlink/netlink_io.c
- This file was added.
/*- | |||||
* SPDX-License-Identifier: BSD-2-Clause-FreeBSD | |||||
* | |||||
* Copyright (c) 2021 Ng Peng Nam Sean | |||||
Lint: Possible Spelling Mistake: Possible spelling error. You wrote 'nam', but did you mean 'name'? | |||||
* Copyright (c) 2022 Alexander V. Chernikov | |||||
* | |||||
* Redistribution and use in source and binary forms, with or without | |||||
* modification, are permitted provided that the following conditions | |||||
* are met: | |||||
* 1. Redistributions of source code must retain the above copyright | |||||
* notice, this list of conditions and the following disclaimer. | |||||
* 2. Redistributions in binary form must reproduce the above copyright | |||||
* notice, this list of conditions and the following disclaimer in the | |||||
* documentation and/or other materials provided with the distribution. | |||||
* | |||||
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND | |||||
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |||||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |||||
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE | |||||
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | |||||
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | |||||
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | |||||
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | |||||
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | |||||
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | |||||
* SUCH DAMAGE. | |||||
*/ | |||||
#include <sys/cdefs.h> | |||||
__FBSDID("$FreeBSD$"); | |||||
#include <sys/param.h> | |||||
#include <sys/malloc.h> | |||||
#include <sys/lock.h> | |||||
#include <sys/mbuf.h> | |||||
#include <sys/ck.h> | |||||
#include <sys/socket.h> | |||||
#include <sys/socketvar.h> | |||||
#include <sys/syslog.h> | |||||
#include <netlink/netlink.h> | |||||
#include <netlink/netlink_ctl.h> | |||||
#include <netlink/netlink_linux.h> | |||||
#include <netlink/netlink_var.h> | |||||
#define DEBUG_MOD_NAME nl_io | |||||
#define DEBUG_MAX_LEVEL LOG_DEBUG3 | |||||
#include <netlink/netlink_debug.h> | |||||
_DECLARE_DEBUG(LOG_DEBUG); | |||||
/* | |||||
 * The logic below provides a p2p interface for receiving and
* sending netlink data between the kernel and userland. | |||||
*/ | |||||
static struct sockaddr_nl _nl_empty_src = { | |||||
.nl_len = sizeof(struct sockaddr_nl), | |||||
.nl_family = PF_NETLINK, | |||||
.nl_pid = 0 /* comes from the kernel */ | |||||
}; | |||||
static struct sockaddr *nl_empty_src = (struct sockaddr *)&_nl_empty_src; | |||||
/* Review note (glebius, marked done): can nl_empty_src be const? */
static struct mbuf *nl_process_mbuf(struct mbuf *m, struct nlpcb *nlp); | |||||
/* | |||||
struct nl_io_queue { | |||||
struct mbuf *head; | |||||
struct mbuf *last; | |||||
int length; | |||||
}; | |||||
*/ | |||||
static void | |||||
queue_push(struct nl_io_queue *q, struct mbuf *m) | |||||
{ | |||||
struct mbuf *m_last; | |||||
for (m_last = m; m_last->m_nextpkt != NULL; m_last = m_last->m_nextpkt) | |||||
q->length += m_length(m_last, NULL); | |||||
q->length += m_length(m_last, NULL); | |||||
if (q->last == NULL) { | |||||
q->head = m; | |||||
q->last = m_last; | |||||
} else { | |||||
q->last->m_nextpkt = m; | |||||
q->last = m_last; | |||||
} | |||||
} | |||||
static void | |||||
queue_push_head(struct nl_io_queue *q, struct mbuf *m) | |||||
{ | |||||
MPASS(m->m_nextpkt == NULL); | |||||
q->length += m_length(m, NULL); | |||||
if (q->last == NULL) { | |||||
q->head = m; | |||||
q->last = m; | |||||
} else { | |||||
m->m_nextpkt = q->head; | |||||
q->head = m; | |||||
} | |||||
} | |||||
static struct mbuf * | |||||
queue_pop(struct nl_io_queue *q) | |||||
{ | |||||
if (q->head != NULL) { | |||||
struct mbuf *m = q->head; | |||||
q->head = m->m_nextpkt; | |||||
m->m_nextpkt = NULL; | |||||
if (q->head == NULL) | |||||
q->last = NULL; | |||||
q->length -= m_length(m, NULL); | |||||
return (m); | |||||
} | |||||
return (NULL); | |||||
} | |||||
static struct mbuf * | |||||
queue_head(const struct nl_io_queue *q) | |||||
{ | |||||
return (q->head); | |||||
} | |||||
static inline bool | |||||
queue_empty(const struct nl_io_queue *q) | |||||
{ | |||||
return (q->length == 0); | |||||
} | |||||
static void | |||||
queue_free(struct nl_io_queue *q) | |||||
{ | |||||
struct mbuf *m = q->head; | |||||
while (m != NULL) { | |||||
struct mbuf *m_next = m->m_nextpkt; | |||||
m->m_nextpkt = NULL; | |||||
m_freem(m); | |||||
m = m_next; | |||||
} | |||||
q->head = NULL; | |||||
q->last = NULL; | |||||
q->length = 0; | |||||
} | |||||
static void | |||||
nl_schedule_taskqueue(struct nlpcb *nlp) | |||||
{ | |||||
if (!nlp->nl_task_pending) { | |||||
nlp->nl_task_pending = true; | |||||
taskqueue_enqueue(nlp->nl_taskqueue, &nlp->nl_task); | |||||
RT_LOG(LOG_DEBUG3, "taskqueue scheduled"); | |||||
} else { | |||||
RT_LOG(LOG_DEBUG3, "taskqueue schedule skipped"); | |||||
} | |||||
} | |||||
int | |||||
nl_receive_async(struct mbuf *m, struct socket *so) | |||||
{ | |||||
struct nlpcb *nlp = sotonlpcb(so); | |||||
int error = 0; | |||||
m->m_nextpkt = NULL; | |||||
NLP_LOCK(nlp); | |||||
if ((__predict_true(nlp->nl_active))) { | |||||
sbappend(&so->so_snd, m, 0); | |||||
RT_LOG(LOG_DEBUG3, "enqueue %u bytes", m_length(m, NULL)); | |||||
nl_schedule_taskqueue(nlp); | |||||
} else { | |||||
RT_LOG(LOG_DEBUG, "ignoring %u bytes on non-active socket", | |||||
m_length(m, NULL)); | |||||
m_free(m); | |||||
error = EINVAL; | |||||
} | |||||
NLP_UNLOCK(nlp); | |||||
return (error); | |||||
} | |||||
static bool | |||||
tx_check_locked(struct nlpcb *nlp) | |||||
{ | |||||
if (queue_empty(&nlp->tx_queue)) | |||||
return (true); | |||||
/* | |||||
* Check if something can be moved from the internal TX queue | |||||
* to the socket queue. | |||||
*/ | |||||
bool appended = false; | |||||
struct sockbuf *sb = &nlp->nl_socket->so_rcv; | |||||
SOCKBUF_LOCK(sb); | |||||
while (true) { | |||||
struct mbuf *m = queue_head(&nlp->tx_queue); | |||||
if (m && sbappendaddr_locked(sb, nl_empty_src, m, NULL) != 0) { | |||||
/* appended successfully */ | |||||
queue_pop(&nlp->tx_queue); | |||||
appended = true; | |||||
} else | |||||
break; | |||||
} | |||||
SOCKBUF_UNLOCK(sb); | |||||
if (appended) | |||||
sorwakeup(nlp->nl_socket); | |||||
return (queue_empty(&nlp->tx_queue)); | |||||
} | |||||
static bool | |||||
nl_process_received_one(struct nlpcb *nlp) | |||||
{ | |||||
bool reschedule = false; | |||||
NLP_LOCK(nlp); | |||||
nlp->nl_task_pending = false; | |||||
if (!tx_check_locked(nlp)) { | |||||
/* TX overflow queue still not empty, ignore RX */ | |||||
NLP_UNLOCK(nlp); | |||||
return (false); | |||||
} | |||||
if (queue_empty(&nlp->rx_queue)) { | |||||
/* | |||||
* Grab all data we have from the socket TX queue | |||||
* and store it the internal queue, so it can be worked on | |||||
* w/o holding socket lock. | |||||
*/ | |||||
struct sockbuf *sb = &nlp->nl_socket->so_snd; | |||||
SOCKBUF_LOCK(sb); | |||||
unsigned int avail = sbavail(sb); | |||||
if (avail > 0) { | |||||
RT_LOG(LOG_DEBUG3, "grabbed %u bytes", avail); | |||||
queue_push(&nlp->rx_queue, sbcut_locked(sb, avail)); | |||||
} | |||||
SOCKBUF_UNLOCK(sb); | |||||
} else { | |||||
/* Schedule another pass to read from the socket queue */ | |||||
reschedule = true; | |||||
} | |||||
int prev_hiwat = nlp->tx_queue.hiwat; | |||||
NLP_UNLOCK(nlp); | |||||
while (!queue_empty(&nlp->rx_queue)) { | |||||
struct mbuf *m = queue_pop(&nlp->rx_queue); | |||||
m = nl_process_mbuf(m, nlp); | |||||
if (m != NULL) { | |||||
queue_push_head(&nlp->rx_queue, m); | |||||
reschedule = false; | |||||
break; | |||||
} | |||||
} | |||||
if (nlp->tx_queue.hiwat > prev_hiwat) { | |||||
NLP_LOG(LOG_DEBUG, nlp, "TX override peaked to %d", nlp->tx_queue.hiwat); | |||||
} | |||||
return (reschedule); | |||||
} | |||||
static void | |||||
nl_process_received(struct nlpcb *nlp) | |||||
{ | |||||
RT_LOG(LOG_DEBUG3, "taskqueue called"); | |||||
while (nl_process_received_one(nlp)) | |||||
; | |||||
} | |||||
void | |||||
nl_free_io(struct nlpcb *nlp) | |||||
{ | |||||
queue_free(&nlp->rx_queue); | |||||
queue_free(&nlp->tx_queue); | |||||
} | |||||
/* | |||||
* Called after some data have been read from the socket. | |||||
*/ | |||||
void | |||||
nl_on_transmit(struct nlpcb *nlp) | |||||
{ | |||||
NLP_LOCK(nlp); | |||||
struct socket *so = nlp->nl_socket; | |||||
if (__predict_false(nlp->nl_dropped_bytes > 0 && so != NULL)) { | |||||
uint64_t dropped_bytes = nlp->nl_dropped_bytes; | |||||
uint64_t dropped_messages = nlp->nl_dropped_messages; | |||||
nlp->nl_dropped_bytes = 0; | |||||
nlp->nl_dropped_messages = 0; | |||||
struct sockbuf *sb = &so->so_rcv; | |||||
NLP_LOG(LOG_DEBUG, nlp, | |||||
"socket RX overflowed, %lu messages (%lu bytes) dropped. " | |||||
"bytes: [%u/%u] mbufs: [%u/%u]", dropped_messages, dropped_bytes, | |||||
sb->sb_ccc, sb->sb_hiwat, sb->sb_mbcnt, sb->sb_mbmax); | |||||
/* TODO: send netlink message */ | |||||
} | |||||
nl_schedule_taskqueue(nlp); | |||||
NLP_UNLOCK(nlp); | |||||
} | |||||
void | |||||
nl_taskqueue_handler(void *_arg, int pending) | |||||
{ | |||||
struct nlpcb *nlp = (struct nlpcb *)_arg; | |||||
CURVNET_SET(nlp->nl_socket->so_vnet); | |||||
nl_process_received(nlp); | |||||
CURVNET_RESTORE(); | |||||
} | |||||
static __noinline void | |||||
queue_push_tx(struct nlpcb *nlp, struct mbuf *m) | |||||
{ | |||||
queue_push(&nlp->tx_queue, m); | |||||
nlp->nl_tx_blocked = true; | |||||
if (nlp->tx_queue.length > nlp->tx_queue.hiwat) | |||||
nlp->tx_queue.hiwat = nlp->tx_queue.length; | |||||
} | |||||
/* | |||||
* Tries to send @m to the socket @nlp. | |||||
* | |||||
* @m: mbuf(s) to send to. Consumed in any case. | |||||
* @nlp: socket to send to | |||||
* @cnt: number of messages in @m | |||||
* @io_flags: combination of NL_IOF_* flags | |||||
* | |||||
* Returns true on success. | |||||
* If no queue overrunes happened, wakes up socket owner. | |||||
*/ | |||||
bool | |||||
nl_send_one(struct mbuf *m, struct nlpcb *nlp, int num_messages, int io_flags) | |||||
{ | |||||
#if DEBUG_MAX_LEVEL >= LOG_DEBUG2 | |||||
struct nlmsghdr *hdr = mtod(m, struct nlmsghdr *); | |||||
NLP_LOG(LOG_DEBUG2, nlp, "TX mbuf len %u msgs %u msg type %d first hdrlen %u io_flags %X", | |||||
m_length(m, NULL), num_messages, hdr->nlmsg_type, hdr->nlmsg_len, io_flags); | |||||
#endif | |||||
bool untranslated = io_flags & NL_IOF_UNTRANSLATED; | |||||
bool ignore_limits = io_flags & NL_IOF_IGNORE_LIMIT; | |||||
bool result = true; | |||||
if (__predict_false(nlp->nl_linux && linux_netlink_p != NULL && untranslated)) { | |||||
m = linux_netlink_p->mbufs_to_linux(nlp->nl_proto, m, nlp); | |||||
if (m == NULL) | |||||
return (false); | |||||
} | |||||
NLP_LOCK(nlp); | |||||
if (__predict_false(nlp->nl_socket == NULL)) { | |||||
NLP_UNLOCK(nlp); | |||||
m_freem(m); | |||||
return (false); | |||||
} | |||||
if (!queue_empty(&nlp->tx_queue)) { | |||||
if (ignore_limits) { | |||||
queue_push_tx(nlp, m); | |||||
} else { | |||||
m_free(m); | |||||
result = false; | |||||
} | |||||
NLP_UNLOCK(nlp); | |||||
return (result); | |||||
} | |||||
struct socket *so = nlp->nl_socket; | |||||
if (sbappendaddr(&so->so_rcv, nl_empty_src, m, NULL) != 0) { | |||||
sorwakeup(so); | |||||
NLP_LOG(LOG_DEBUG3, nlp, "appended data & woken up"); | |||||
} else { | |||||
if (ignore_limits) { | |||||
queue_push_tx(nlp, m); | |||||
} else { | |||||
/* | |||||
* Store dropped data so it can be reported | |||||
* on the next read | |||||
*/ | |||||
nlp->nl_dropped_bytes += m_length(m, NULL); | |||||
nlp->nl_dropped_messages += num_messages; | |||||
NLP_LOG(LOG_DEBUG2, nlp, "RX oveflow: %lu m (+%d), %lu b (+%d)", | |||||
nlp->nl_dropped_messages, num_messages, | |||||
nlp->nl_dropped_bytes, m_length(m, NULL)); | |||||
soroverflow(so); | |||||
m_freem(m); | |||||
result = false; | |||||
} | |||||
} | |||||
NLP_UNLOCK(nlp); | |||||
return (result); | |||||
} | |||||
static int | |||||
nl_receive_message(struct nlmsghdr *hdr, int remaining_length, | |||||
struct nlpcb *nlp, struct netlink_parse_tracker *npt) | |||||
{ | |||||
nl_handler_f handler = nl_handlers[nlp->nl_proto].cb; | |||||
int error = 0; | |||||
RT_LOG(LOG_DEBUG2, "msg len: %d type: %d", hdr->nlmsg_len, hdr->nlmsg_type); | |||||
if (__predict_false(hdr->nlmsg_len > remaining_length)) { | |||||
RT_LOG(LOG_DEBUG, "invalid message"); | |||||
return (EINVAL); | |||||
} else if (__predict_false(hdr->nlmsg_len < sizeof(*hdr))) { | |||||
RT_LOG(LOG_DEBUG, "message too short: %d", hdr->nlmsg_len); | |||||
return (EINVAL); | |||||
} | |||||
/* Stamp each message with sender pid */ | |||||
hdr->nlmsg_pid = nlp->nl_port; | |||||
if (hdr->nlmsg_flags & NLM_F_REQUEST && hdr->nlmsg_type >= NLMSG_MIN_TYPE) { | |||||
RT_LOG(LOG_DEBUG2, "handling message with msg type: %d", | |||||
hdr->nlmsg_type); | |||||
struct nlmsghdr *thdr = hdr; | |||||
if (nlp->nl_linux && linux_netlink_p != NULL) { | |||||
thdr = linux_netlink_p->msg_from_linux(nlp->nl_proto, hdr, npt); | |||||
} | |||||
error = handler(thdr, npt); | |||||
RT_LOG(LOG_DEBUG2, "retcode: %d", error); | |||||
} | |||||
if ((hdr->nlmsg_flags & NLM_F_ACK) || (error != 0 && error != EINTR)) { | |||||
RT_LOG(LOG_DEBUG3, "ack"); | |||||
nlmsg_ack(nlp, error, hdr); | |||||
RT_LOG(LOG_DEBUG3, "done"); | |||||
} | |||||
return (0); | |||||
} | |||||
/* | |||||
* Processes an incoming packet, which can contain multiple netlink messages | |||||
*/ | |||||
static struct mbuf * | |||||
nl_process_mbuf(struct mbuf *m, struct nlpcb *nlp) | |||||
{ | |||||
int offset, buffer_length; | |||||
struct nlmsghdr *hdr; | |||||
char *buffer; | |||||
int error; | |||||
RT_LOG(LOG_DEBUG3, "RX netlink mbuf %p on %p", m, nlp->nl_socket); | |||||
/* TODO: alloc this buf once for nlp */ | |||||
int data_length = m_length(m, NULL); | |||||
buffer_length = roundup2(data_length, 8) + SCRATCH_BUFFER_SIZE; | |||||
if (nlp->nl_linux) | |||||
buffer_length += roundup2(data_length, 8); | |||||
buffer = malloc(buffer_length, M_NETLINK, M_NOWAIT | M_ZERO); | |||||
if (buffer == NULL) { | |||||
m_freem(m); | |||||
RT_LOG(LOG_DEBUG, "Unable to allocate %d bytes of memory", | |||||
buffer_length); | |||||
return (NULL); | |||||
} | |||||
m_copydata(m, 0, data_length, buffer); | |||||
struct netlink_parse_tracker npt = { | |||||
.nlp = nlp, | |||||
.lb.base = &buffer[roundup2(data_length, 8)], | |||||
.lb.size = buffer_length - roundup2(data_length, 8), | |||||
}; | |||||
for (offset = 0; offset + sizeof(struct nlmsghdr) <= data_length;) { | |||||
hdr = (struct nlmsghdr *)&buffer[offset]; | |||||
/* Save length prior to calling handler */ | |||||
int msglen = NLMSG_ALIGN(hdr->nlmsg_len); | |||||
RT_LOG(LOG_DEBUG3, "parsing offset %d/%d", offset, data_length); | |||||
/* Update parse state */ | |||||
lb_clear(&npt.lb); | |||||
error = nl_receive_message(hdr, data_length - offset, nlp, &npt); | |||||
offset += msglen; | |||||
if (__predict_false(error != 0 || nlp->nl_tx_blocked)) | |||||
break; | |||||
} | |||||
RT_LOG(LOG_DEBUG3, "packet parsing done"); | |||||
free(buffer, M_NETLINK); | |||||
if (nlp->nl_tx_blocked) { | |||||
NLP_LOCK(nlp); | |||||
nlp->nl_tx_blocked = false; | |||||
NLP_UNLOCK(nlp); | |||||
m_adj(m, offset); | |||||
return (m); | |||||
} else { | |||||
m_freem(m); | |||||
return (NULL); | |||||
} | |||||
} |
Possible spelling error. You wrote 'nam', but did you mean 'name'?