Index: head/contrib/openbsm/bin/auditdistd/auditdistd.h =================================================================== --- head/contrib/openbsm/bin/auditdistd/auditdistd.h (revision 339176) +++ head/contrib/openbsm/bin/auditdistd/auditdistd.h (revision 339177) @@ -1,274 +1,289 @@ /*- * Copyright (c) 2012 The FreeBSD Foundation * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #ifndef _AUDITDISTD_H_ #define _AUDITDISTD_H_ #include #include #include #include #include #include #include #include #include #include #include #include "proto.h" /* * Version history: * 0 - initial version */ #define ADIST_VERSION 0 #define ADIST_ROLE_UNDEF 0 #define ADIST_ROLE_SENDER 1 #define ADIST_ROLE_RECEIVER 2 #define ADIST_USER "auditdistd" #define ADIST_TIMEOUT 20 #define ADIST_CONFIG "/etc/security/auditdistd.conf" #define ADIST_TCP_PORT "7878" #define ADIST_LISTEN_TLS_TCP4 "tls://0.0.0.0:" ADIST_TCP_PORT #define ADIST_LISTEN_TLS_TCP6 "tls://[::]:" ADIST_TCP_PORT #define ADIST_PIDFILE "/var/run/auditdistd.pid" #define ADIST_DIRECTORY_SENDER "/var/audit/dist" #define ADIST_DIRECTORY_RECEIVER "/var/audit/remote" #define ADIST_CERTFILE "/etc/security/auditdistd.cert.pem" #define ADIST_KEYFILE "/etc/security/auditdistd.key.pem" #define ADIST_ERROR_WRONG_ORDER 1 #define ADIST_ERROR_INVALID_NAME 2 #define ADIST_ERROR_OPEN_OLD 3 #define ADIST_ERROR_CREATE 4 #define ADIST_ERROR_OPEN 5 #define ADIST_ERROR_READ 6 #define ADIST_ERROR_WRITE 7 #define ADIST_ERROR_RENAME 8 #define ADIST_ADDRSIZE 1024 #define ADIST_HOSTSIZE 256 #define ADIST_PATHSIZE 256 #define ADIST_PASSWORDSIZE 128 #define ADIST_FINGERPRINTSIZE 256 /* Number of seconds to sleep between reconnect retries or keepalive packets. */ #define ADIST_KEEPALIVE 10 struct adist_listen { /* Address to listen on. */ char adl_addr[ADIST_ADDRSIZE]; /* Protocol-specific data. */ struct proto_conn *adl_conn; TAILQ_ENTRY(adist_listen) adl_next; }; struct adist_config { /* Our name. */ char adc_name[ADIST_HOSTSIZE]; /* PID file path. */ char adc_pidfile[PATH_MAX]; /* Connection timeout. */ int adc_timeout; /* Path to receiver's certificate file. */ char adc_certfile[PATH_MAX]; /* Path to receiver's private key file. */ char adc_keyfile[PATH_MAX]; /* List of addresses to listen on. */ TAILQ_HEAD(, adist_listen) adc_listen; /* List of hosts. */ TAILQ_HEAD(, adist_host) adc_hosts; }; #define ADIST_COMPRESSION_NONE 0 #define ADIST_COMPRESSION_LZF 1 #define ADIST_CHECKSUM_NONE 0 #define ADIST_CHECKSUM_CRC32 1 #define ADIST_CHECKSUM_SHA256 2 /* * Structure that describes single host (either sender or receiver). */ struct adist_host { /* Host name. */ char adh_name[ADIST_HOSTSIZE]; /* Host role: ADIST_ROLE_{SENDER,RECEIVER}. */ int adh_role; /* Protocol version negotiated. */ int adh_version; /* Local address to bind to. */ char adh_localaddr[ADIST_ADDRSIZE]; /* Address of the remote component. */ char adh_remoteaddr[ADIST_ADDRSIZE]; /* Connection with remote host. */ struct proto_conn *adh_remote; /* Connection was reestablished, reset the state. */ bool adh_reset; /* * Directory from which audit trail files should be send in * ADIST_ROLE_SENDER case or stored into in ADIST_ROLE_RECEIVER case. */ char adh_directory[PATH_MAX]; /* Compression algorithm. Currently unused. */ int adh_compression; /* Checksum algorithm. Currently unused. */ int adh_checksum; /* Sender's password. */ char adh_password[ADIST_PASSWORDSIZE]; /* Fingerprint of receiver's public key. */ char adh_fingerprint[ADIST_FINGERPRINTSIZE]; /* PID of child worker process. 0 - no child. */ pid_t adh_worker_pid; /* Connection requests from sender to main. */ struct proto_conn *adh_conn; /* Receiver-specific fields. */ char adh_trail_name[ADIST_PATHSIZE]; int adh_trail_fd; int adh_trail_dirfd; DIR *adh_trail_dirfp; /* Sender-specific fields. */ uint64_t adh_trail_offset; /* Next resource. */ TAILQ_ENTRY(adist_host) adh_next; }; #define ADIST_BYTEORDER_UNDEFINED 0 #define ADIST_BYTEORDER_LITTLE_ENDIAN 1 #define ADIST_BYTEORDER_BIG_ENDIAN 2 #if _BYTE_ORDER == _LITTLE_ENDIAN #define ADIST_BYTEORDER ADIST_BYTEORDER_LITTLE_ENDIAN #elif _BYTE_ORDER == _BIG_ENDIAN #define ADIST_BYTEORDER ADIST_BYTEORDER_BIG_ENDIAN #else #error Unknown byte order. #endif struct adpkt { uint8_t adp_byteorder; #define ADIST_CMD_UNDEFINED 0 #define ADIST_CMD_OPEN 1 #define ADIST_CMD_APPEND 2 #define ADIST_CMD_CLOSE 3 #define ADIST_CMD_KEEPALIVE 4 #define ADIST_CMD_ERROR 5 uint8_t adp_cmd; uint64_t adp_seq; uint32_t adp_datasize; unsigned char adp_data[0]; } __packed; struct adreq { int adr_error; TAILQ_ENTRY(adreq) adr_next; struct adpkt adr_packet; }; #define adr_byteorder adr_packet.adp_byteorder #define adr_cmd adr_packet.adp_cmd #define adr_seq adr_packet.adp_seq #define adr_datasize adr_packet.adp_datasize #define adr_data adr_packet.adp_data #define ADPKT_SIZE(adreq) (sizeof((adreq)->adr_packet) + (adreq)->adr_datasize) struct adrep { uint8_t adrp_byteorder; uint64_t adrp_seq; uint16_t adrp_error; } __packed; #define ADIST_QUEUE_SIZE 16 #define ADIST_BUF_SIZE 65536 #define QUEUE_TAKE(adreq, list, timeout) do { \ mtx_lock(list##_lock); \ if ((timeout) == 0) { \ while (((adreq) = TAILQ_FIRST(list)) == NULL) \ cv_wait(list##_cond, list##_lock); \ } else { \ (adreq) = TAILQ_FIRST(list); \ if ((adreq) == NULL) { \ cv_timedwait(list##_cond, list##_lock, \ (timeout)); \ (adreq) = TAILQ_FIRST(list); \ } \ } \ if ((adreq) != NULL) \ TAILQ_REMOVE((list), (adreq), adr_next); \ mtx_unlock(list##_lock); \ } while (0) #define QUEUE_INSERT(adreq, list) do { \ bool _wakeup; \ \ mtx_lock(list##_lock); \ _wakeup = TAILQ_EMPTY(list); \ TAILQ_INSERT_TAIL((list), (adreq), adr_next); \ mtx_unlock(list##_lock); \ if (_wakeup) \ cv_signal(list##_cond); \ } while (0) +#define QUEUE_CONCAT2(tolist, fromlist1, fromlist2) do { \ + bool _wakeup; \ + \ + mtx_lock(tolist##_lock); \ + _wakeup = TAILQ_EMPTY(tolist); \ + mtx_lock(fromlist1##_lock); \ + TAILQ_CONCAT((tolist), (fromlist1), adr_next); \ + mtx_unlock(fromlist1##_lock); \ + mtx_lock(fromlist2##_lock); \ + TAILQ_CONCAT((tolist), (fromlist2), adr_next); \ + mtx_unlock(fromlist2##_lock); \ + mtx_unlock(tolist##_lock); \ + if (_wakeup) \ + cv_signal(tolist##_cond); \ +} while (0) #define QUEUE_WAIT(list) do { \ mtx_lock(list##_lock); \ while (TAILQ_EMPTY(list)) \ cv_wait(list##_cond, list##_lock); \ mtx_unlock(list##_lock); \ } while (0) extern const char *cfgpath; extern bool sigexit_received; extern struct pidfh *pfh; void descriptors_cleanup(struct adist_host *adhost); void descriptors_assert(const struct adist_host *adhost, int pjdlogmode); void adist_sender(struct adist_config *config, struct adist_host *adhost); void adist_receiver(struct adist_config *config, struct adist_host *adhost); struct adist_config *yy_config_parse(const char *config, bool exitonerror); void yy_config_free(struct adist_config *config); void yyerror(const char *); int yylex(void); #endif /* !_AUDITDISTD_H_ */ Index: head/contrib/openbsm/bin/auditdistd/sender.c =================================================================== --- head/contrib/openbsm/bin/auditdistd/sender.c (revision 339176) +++ head/contrib/openbsm/bin/auditdistd/sender.c (revision 339177) @@ -1,845 +1,842 @@ /*- * Copyright (c) 2012 The FreeBSD Foundation * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include #include #if defined(HAVE_SYS_ENDIAN_H) && defined(HAVE_BSWAP) #include #else /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ #ifdef HAVE_MACHINE_ENDIAN_H #include #else /* !HAVE_MACHINE_ENDIAN_H */ #ifdef HAVE_ENDIAN_H #include #else /* !HAVE_ENDIAN_H */ #error "No supported endian.h" #endif /* !HAVE_ENDIAN_H */ #endif /* !HAVE_MACHINE_ENDIAN_H */ #include #endif /* !HAVE_SYS_ENDIAN_H || !HAVE_BSWAP */ #include #include #include #include #include #include #include #include #include #include #include #ifdef HAVE_LIBUTIL_H #include #endif #include #include #include #include #ifndef HAVE_SIGTIMEDWAIT #include "sigtimedwait.h" #endif #include "auditdistd.h" #include "pjdlog.h" #include "proto.h" #include "sandbox.h" #include "subr.h" #include "synch.h" #include "trail.h" static struct adist_config *adcfg; static struct adist_host *adhost; static pthread_rwlock_t adist_remote_lock; static pthread_mutex_t adist_remote_mtx; static pthread_cond_t adist_remote_cond; static struct trail *adist_trail; static TAILQ_HEAD(, adreq) adist_free_list; static pthread_mutex_t adist_free_list_lock; static pthread_cond_t adist_free_list_cond; static TAILQ_HEAD(, adreq) adist_send_list; static pthread_mutex_t adist_send_list_lock; static pthread_cond_t adist_send_list_cond; static TAILQ_HEAD(, adreq) adist_recv_list; static pthread_mutex_t adist_recv_list_lock; static pthread_cond_t adist_recv_list_cond; static void init_environment(void) { struct adreq *adreq; unsigned int ii; rw_init(&adist_remote_lock); mtx_init(&adist_remote_mtx); cv_init(&adist_remote_cond); TAILQ_INIT(&adist_free_list); mtx_init(&adist_free_list_lock); cv_init(&adist_free_list_cond); TAILQ_INIT(&adist_send_list); mtx_init(&adist_send_list_lock); cv_init(&adist_send_list_cond); TAILQ_INIT(&adist_recv_list); mtx_init(&adist_recv_list_lock); cv_init(&adist_recv_list_cond); for (ii = 0; ii < ADIST_QUEUE_SIZE; ii++) { adreq = malloc(sizeof(*adreq) + ADIST_BUF_SIZE); if (adreq == NULL) { pjdlog_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for adreq object.", sizeof(*adreq) + ADIST_BUF_SIZE); } adreq->adr_byteorder = ADIST_BYTEORDER; adreq->adr_cmd = ADIST_CMD_UNDEFINED; adreq->adr_seq = 0; adreq->adr_datasize = 0; TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); } } static int sender_connect(void) { unsigned char rnd[32], hash[32], resp[32]; struct proto_conn *conn; char welcome[8]; int16_t val; val = 1; if (proto_send(adhost->adh_conn, &val, sizeof(val)) < 0) { pjdlog_exit(EX_TEMPFAIL, "Unable to send connection request to parent"); } if (proto_recv(adhost->adh_conn, &val, sizeof(val)) < 0) { pjdlog_exit(EX_TEMPFAIL, "Unable to receive reply to connection request from parent"); } if (val != 0) { errno = val; pjdlog_errno(LOG_WARNING, "Unable to connect to %s", adhost->adh_remoteaddr); return (-1); } if (proto_connection_recv(adhost->adh_conn, true, &conn) < 0) { pjdlog_exit(EX_TEMPFAIL, "Unable to receive connection from parent"); } if (proto_connect_wait(conn, adcfg->adc_timeout) < 0) { pjdlog_errno(LOG_WARNING, "Unable to connect to %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Connected to %s.", adhost->adh_remoteaddr); /* Error in setting timeout is not critical, but why should it fail? */ if (proto_timeout(conn, adcfg->adc_timeout) < 0) pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); else pjdlog_debug(1, "Timeout set to %d.", adcfg->adc_timeout); /* Exchange welcome message, which includes version number. */ (void)snprintf(welcome, sizeof(welcome), "ADIST%02d", ADIST_VERSION); if (proto_send(conn, welcome, sizeof(welcome)) < 0) { pjdlog_errno(LOG_WARNING, "Unable to send welcome message to %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Welcome message sent (%s).", welcome); bzero(welcome, sizeof(welcome)); if (proto_recv(conn, welcome, sizeof(welcome)) < 0) { pjdlog_errno(LOG_WARNING, "Unable to receive welcome message from %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } if (strncmp(welcome, "ADIST", 5) != 0 || !isdigit(welcome[5]) || !isdigit(welcome[6]) || welcome[7] != '\0') { pjdlog_warning("Invalid welcome message from %s.", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Welcome message received (%s).", welcome); /* * Receiver can only reply with version number lower or equal to * the one we sent. */ adhost->adh_version = atoi(welcome + 5); if (adhost->adh_version > ADIST_VERSION) { pjdlog_warning("Invalid version number from %s (%d received, up to %d supported).", adhost->adh_remoteaddr, adhost->adh_version, ADIST_VERSION); proto_close(conn); return (-1); } pjdlog_debug(1, "Version %d negotiated with %s.", adhost->adh_version, adhost->adh_remoteaddr); if (proto_send(conn, adcfg->adc_name, sizeof(adcfg->adc_name)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to send name to %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Name (%s) sent.", adcfg->adc_name); if (proto_recv(conn, rnd, sizeof(rnd)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to receive challenge from %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Challenge received."); if (HMAC(EVP_sha256(), adhost->adh_password, (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, NULL) == NULL) { pjdlog_warning("Unable to generate response."); proto_close(conn); return (-1); } pjdlog_debug(1, "Response generated."); if (proto_send(conn, hash, sizeof(hash)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to send response to %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Response sent."); if (adist_random(rnd, sizeof(rnd)) == -1) { pjdlog_warning("Unable to generate challenge."); proto_close(conn); return (-1); } pjdlog_debug(1, "Challenge generated."); if (proto_send(conn, rnd, sizeof(rnd)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to send challenge to %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Challenge sent."); if (proto_recv(conn, resp, sizeof(resp)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to receive response from %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Response received."); if (HMAC(EVP_sha256(), adhost->adh_password, (int)strlen(adhost->adh_password), rnd, (int)sizeof(rnd), hash, NULL) == NULL) { pjdlog_warning("Unable to generate hash."); proto_close(conn); return (-1); } pjdlog_debug(1, "Hash generated."); if (memcmp(resp, hash, sizeof(hash)) != 0) { pjdlog_warning("Invalid response from %s (wrong password?).", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_info("Receiver authenticated."); if (proto_recv(conn, &adhost->adh_trail_offset, sizeof(adhost->adh_trail_offset)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to receive size of the most recent trail file from %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } adhost->adh_trail_offset = le64toh(adhost->adh_trail_offset); if (proto_recv(conn, &adhost->adh_trail_name, sizeof(adhost->adh_trail_name)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to receive name of the most recent trail file from %s", adhost->adh_remoteaddr); proto_close(conn); return (-1); } pjdlog_debug(1, "Trail name (%s) and offset (%ju) received.", adhost->adh_trail_name, (uintmax_t)adhost->adh_trail_offset); rw_wlock(&adist_remote_lock); mtx_lock(&adist_remote_mtx); PJDLOG_ASSERT(adhost->adh_remote == NULL); PJDLOG_ASSERT(conn != NULL); adhost->adh_remote = conn; mtx_unlock(&adist_remote_mtx); rw_unlock(&adist_remote_lock); cv_signal(&adist_remote_cond); return (0); } static void sender_disconnect(void) { rw_wlock(&adist_remote_lock); /* * Check for a race between dropping rlock and acquiring wlock - * another thread can close connection in-between. */ if (adhost->adh_remote == NULL) { rw_unlock(&adist_remote_lock); return; } pjdlog_debug(2, "Closing connection to %s.", adhost->adh_remoteaddr); proto_close(adhost->adh_remote); mtx_lock(&adist_remote_mtx); adhost->adh_remote = NULL; adhost->adh_reset = true; adhost->adh_trail_name[0] = '\0'; adhost->adh_trail_offset = 0; mtx_unlock(&adist_remote_mtx); rw_unlock(&adist_remote_lock); pjdlog_warning("Disconnected from %s.", adhost->adh_remoteaddr); /* Move all in-flight requests back onto free list. */ - mtx_lock(&adist_free_list_lock); - mtx_lock(&adist_send_list_lock); - TAILQ_CONCAT(&adist_free_list, &adist_send_list, adr_next); - mtx_unlock(&adist_send_list_lock); - mtx_lock(&adist_recv_list_lock); - TAILQ_CONCAT(&adist_free_list, &adist_recv_list, adr_next); - mtx_unlock(&adist_recv_list_lock); - mtx_unlock(&adist_free_list_lock); + QUEUE_CONCAT2(&adist_free_list, &adist_send_list, &adist_recv_list); } static void adreq_fill(struct adreq *adreq, uint8_t cmd, const unsigned char *data, size_t size) { static uint64_t seq = 1; PJDLOG_ASSERT(size <= ADIST_BUF_SIZE); switch (cmd) { case ADIST_CMD_OPEN: case ADIST_CMD_CLOSE: PJDLOG_ASSERT(data != NULL && size == 0); size = strlen(data) + 1; break; case ADIST_CMD_APPEND: PJDLOG_ASSERT(data != NULL && size > 0); break; case ADIST_CMD_KEEPALIVE: case ADIST_CMD_ERROR: PJDLOG_ASSERT(data == NULL && size == 0); break; default: PJDLOG_ABORT("Invalid command (%hhu).", cmd); } adreq->adr_cmd = cmd; adreq->adr_seq = seq++; adreq->adr_datasize = size; /* Don't copy if data is already in out buffer. */ if (data != NULL && data != adreq->adr_data) bcopy(data, adreq->adr_data, size); } static bool read_thread_wait(void) { bool newfile = false; mtx_lock(&adist_remote_mtx); if (adhost->adh_reset) { reset: adhost->adh_reset = false; if (trail_filefd(adist_trail) != -1) trail_close(adist_trail); trail_reset(adist_trail); while (adhost->adh_remote == NULL) cv_wait(&adist_remote_cond, &adist_remote_mtx); trail_start(adist_trail, adhost->adh_trail_name, adhost->adh_trail_offset); newfile = true; } mtx_unlock(&adist_remote_mtx); while (trail_filefd(adist_trail) == -1) { newfile = true; wait_for_dir(); /* * We may have been disconnected and reconnected in the * meantime, check if reset is set. */ mtx_lock(&adist_remote_mtx); if (adhost->adh_reset) goto reset; mtx_unlock(&adist_remote_mtx); if (trail_filefd(adist_trail) == -1) trail_next(adist_trail); } if (newfile) { pjdlog_debug(1, "Trail file \"%s/%s\" opened.", adhost->adh_directory, trail_filename(adist_trail)); (void)wait_for_file_init(trail_filefd(adist_trail)); } return (newfile); } static void * read_thread(void *arg __unused) { struct adreq *adreq; ssize_t done; bool newfile; pjdlog_debug(1, "%s started.", __func__); for (;;) { newfile = read_thread_wait(); QUEUE_TAKE(adreq, &adist_free_list, 0); if (newfile) { adreq_fill(adreq, ADIST_CMD_OPEN, trail_filename(adist_trail), 0); newfile = false; goto move; } done = read(trail_filefd(adist_trail), adreq->adr_data, ADIST_BUF_SIZE); if (done == -1) { off_t offset; int error; error = errno; offset = lseek(trail_filefd(adist_trail), 0, SEEK_CUR); errno = error; pjdlog_errno(LOG_ERR, "Error while reading \"%s/%s\" at offset %jd", adhost->adh_directory, trail_filename(adist_trail), offset); trail_close(adist_trail); adreq_fill(adreq, ADIST_CMD_ERROR, NULL, 0); goto move; } else if (done == 0) { /* End of file. */ pjdlog_debug(3, "End of \"%s/%s\".", adhost->adh_directory, trail_filename(adist_trail)); if (!trail_switch(adist_trail)) { /* More audit records can arrive. */ mtx_lock(&adist_free_list_lock); TAILQ_INSERT_TAIL(&adist_free_list, adreq, adr_next); mtx_unlock(&adist_free_list_lock); wait_for_file(); continue; } adreq_fill(adreq, ADIST_CMD_CLOSE, trail_filename(adist_trail), 0); trail_close(adist_trail); goto move; } adreq_fill(adreq, ADIST_CMD_APPEND, adreq->adr_data, done); move: pjdlog_debug(3, "read thread: Moving request %p to the send queue (%hhu).", adreq, adreq->adr_cmd); QUEUE_INSERT(adreq, &adist_send_list); } /* NOTREACHED */ return (NULL); } static void keepalive_send(void) { struct adreq *adreq; rw_rlock(&adist_remote_lock); if (adhost->adh_remote == NULL) { rw_unlock(&adist_remote_lock); return; } rw_unlock(&adist_remote_lock); mtx_lock(&adist_free_list_lock); adreq = TAILQ_FIRST(&adist_free_list); if (adreq != NULL) TAILQ_REMOVE(&adist_free_list, adreq, adr_next); mtx_unlock(&adist_free_list_lock); if (adreq == NULL) return; adreq_fill(adreq, ADIST_CMD_KEEPALIVE, NULL, 0); QUEUE_INSERT(adreq, &adist_send_list); pjdlog_debug(3, "keepalive_send: Request sent."); } /* * Thread sends request to secondary node. */ static void * send_thread(void *arg __unused) { time_t lastcheck, now; struct adreq *adreq; pjdlog_debug(1, "%s started.", __func__); lastcheck = time(NULL); for (;;) { pjdlog_debug(3, "send thread: Taking request."); for (;;) { QUEUE_TAKE(adreq, &adist_send_list, ADIST_KEEPALIVE); if (adreq != NULL) break; now = time(NULL); if (lastcheck + ADIST_KEEPALIVE <= now) { keepalive_send(); lastcheck = now; } } PJDLOG_ASSERT(adreq != NULL); pjdlog_debug(3, "send thread: (%p) Got request %hhu.", adreq, adreq->adr_cmd); /* * Protect connection from disappearing. */ rw_rlock(&adist_remote_lock); /* * Move the request to the recv queue first to avoid race * where the recv thread receives the reply before we move * the request to the recv queue. */ QUEUE_INSERT(adreq, &adist_recv_list); if (adhost->adh_remote == NULL || proto_send(adhost->adh_remote, &adreq->adr_packet, ADPKT_SIZE(adreq)) == -1) { rw_unlock(&adist_remote_lock); pjdlog_debug(1, "send thread: (%p) Unable to send request.", adreq); if (adhost->adh_remote != NULL) sender_disconnect(); continue; } else { pjdlog_debug(3, "Request %p sent successfully.", adreq); adreq_log(LOG_DEBUG, 2, -1, adreq, "send: (%p) Request sent: ", adreq); rw_unlock(&adist_remote_lock); } } /* NOTREACHED */ return (NULL); } static void adrep_decode_header(struct adrep *adrep) { /* Byte-swap only is the receiver is using different byte order. */ if (adrep->adrp_byteorder != ADIST_BYTEORDER) { adrep->adrp_byteorder = ADIST_BYTEORDER; adrep->adrp_seq = bswap64(adrep->adrp_seq); adrep->adrp_error = bswap16(adrep->adrp_error); } } /* * Thread receives answer from secondary node and passes it to ggate_send * thread. */ static void * recv_thread(void *arg __unused) { struct adrep adrep; struct adreq *adreq; pjdlog_debug(1, "%s started.", __func__); for (;;) { /* Wait until there is anything to receive. */ QUEUE_WAIT(&adist_recv_list); pjdlog_debug(3, "recv thread: Got something."); rw_rlock(&adist_remote_lock); if (adhost->adh_remote == NULL) { /* * Connection is dead. - * XXX: We shouldn't be here. + * There is a short race in sender_disconnect() between + * setting adh_remote to NULL and removing entries from + * the recv list, which can result in us being here. + * To avoid just spinning, wait for 0.1s. */ rw_unlock(&adist_remote_lock); + usleep(100000); continue; } if (proto_recv(adhost->adh_remote, &adrep, sizeof(adrep)) == -1) { rw_unlock(&adist_remote_lock); pjdlog_errno(LOG_ERR, "Unable to receive reply"); sender_disconnect(); continue; } rw_unlock(&adist_remote_lock); adrep_decode_header(&adrep); /* * Find the request that was just confirmed. */ mtx_lock(&adist_recv_list_lock); TAILQ_FOREACH(adreq, &adist_recv_list, adr_next) { if (adreq->adr_seq == adrep.adrp_seq) { TAILQ_REMOVE(&adist_recv_list, adreq, adr_next); break; } } if (adreq == NULL) { /* * If we disconnected in the meantime, just continue. * On disconnect sender_disconnect() clears the queue, * we can use that. */ if (TAILQ_EMPTY(&adist_recv_list)) { mtx_unlock(&adist_recv_list_lock); continue; } mtx_unlock(&adist_recv_list_lock); pjdlog_error("Found no request matching received 'seq' field (%ju).", (uintmax_t)adrep.adrp_seq); sender_disconnect(); continue; } mtx_unlock(&adist_recv_list_lock); adreq_log(LOG_DEBUG, 2, -1, adreq, "recv thread: (%p) Request confirmed: ", adreq); pjdlog_debug(3, "recv thread: (%p) Got request %hhu.", adreq, adreq->adr_cmd); if (adrep.adrp_error != 0) { pjdlog_error("Receiver returned error (%s), disconnecting.", adist_errstr((int)adrep.adrp_error)); sender_disconnect(); continue; } if (adreq->adr_cmd == ADIST_CMD_CLOSE) trail_unlink(adist_trail, adreq->adr_data); pjdlog_debug(3, "Request received successfully."); QUEUE_INSERT(adreq, &adist_free_list); } /* NOTREACHED */ return (NULL); } static void guard_check_connection(void) { PJDLOG_ASSERT(adhost->adh_role == ADIST_ROLE_SENDER); rw_rlock(&adist_remote_lock); if (adhost->adh_remote != NULL) { rw_unlock(&adist_remote_lock); pjdlog_debug(3, "remote_guard: Connection to %s is ok.", adhost->adh_remoteaddr); return; } /* * Upgrade the lock. It doesn't have to be atomic as no other thread * can change connection status from disconnected to connected. */ rw_unlock(&adist_remote_lock); pjdlog_debug(1, "remote_guard: Reconnecting to %s.", adhost->adh_remoteaddr); if (sender_connect() == 0) { pjdlog_info("Successfully reconnected to %s.", adhost->adh_remoteaddr); } else { pjdlog_debug(1, "remote_guard: Reconnect to %s failed.", adhost->adh_remoteaddr); } } /* * Thread guards remote connections and reconnects when needed, handles * signals, etc. */ static void * guard_thread(void *arg __unused) { struct timespec timeout; time_t lastcheck, now; sigset_t mask; int signo; lastcheck = time(NULL); PJDLOG_VERIFY(sigemptyset(&mask) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); timeout.tv_sec = ADIST_KEEPALIVE; timeout.tv_nsec = 0; signo = -1; for (;;) { switch (signo) { case SIGINT: case SIGTERM: sigexit_received = true; pjdlog_exitx(EX_OK, "Termination signal received, exiting."); break; default: break; } pjdlog_debug(3, "remote_guard: Checking connections."); now = time(NULL); if (lastcheck + ADIST_KEEPALIVE <= now) { guard_check_connection(); lastcheck = now; } signo = sigtimedwait(&mask, NULL, &timeout); } /* NOTREACHED */ return (NULL); } void adist_sender(struct adist_config *config, struct adist_host *adh) { pthread_t td; pid_t pid; int error, mode, debuglevel; /* * Create communication channel for sending connection requests from * child to parent. */ if (proto_connect(NULL, "socketpair://", -1, &adh->adh_conn) == -1) { pjdlog_errno(LOG_ERR, "Unable to create connection sockets between child and parent"); return; } pid = fork(); if (pid == -1) { pjdlog_errno(LOG_ERR, "Unable to fork"); proto_close(adh->adh_conn); adh->adh_conn = NULL; return; } if (pid > 0) { /* This is parent. */ adh->adh_worker_pid = pid; /* Declare that we are receiver. */ proto_recv(adh->adh_conn, NULL, 0); return; } adcfg = config; adhost = adh; mode = pjdlog_mode_get(); debuglevel = pjdlog_debug_get(); /* Declare that we are sender. */ proto_send(adhost->adh_conn, NULL, 0); descriptors_cleanup(adhost); #ifdef TODO descriptors_assert(adhost, mode); #endif pjdlog_init(mode); pjdlog_debug_set(debuglevel); pjdlog_prefix_set("[%s] (%s) ", adhost->adh_name, role2str(adhost->adh_role)); #ifdef HAVE_SETPROCTITLE setproctitle("[%s] (%s) ", adhost->adh_name, role2str(adhost->adh_role)); #endif /* * The sender process should be able to remove entries from its * trail directory, but it should not be able to write to the * trail files, only read from them. */ adist_trail = trail_new(adhost->adh_directory, false); if (adist_trail == NULL) exit(EX_OSFILE); if (sandbox(ADIST_USER, true, "auditdistd: %s (%s)", role2str(adhost->adh_role), adhost->adh_name) != 0) { exit(EX_CONFIG); } pjdlog_info("Privileges successfully dropped."); /* * We can ignore wait_for_dir_init() failures. It will fall back to * using sleep(3). */ (void)wait_for_dir_init(trail_dirfd(adist_trail)); init_environment(); if (sender_connect() == 0) { pjdlog_info("Successfully connected to %s.", adhost->adh_remoteaddr); } adhost->adh_reset = true; /* * Create the guard thread first, so we can handle signals from the * very begining. */ error = pthread_create(&td, NULL, guard_thread, NULL); PJDLOG_ASSERT(error == 0); error = pthread_create(&td, NULL, send_thread, NULL); PJDLOG_ASSERT(error == 0); error = pthread_create(&td, NULL, recv_thread, NULL); PJDLOG_ASSERT(error == 0); (void)read_thread(NULL); }