/*
 * syslogd: queue output for slow F_PIPE destinations instead of giving up
 * on the child as soon as one writev() fails.
 *
 *  - struct chunk_list is one queued chunk (size, read pointer, inline
 *    buffer).  Each F_PIPE filed gets a STAILQ of chunks plus queue_time,
 *    which is refreshed when a queue is started in pipe_write() and after
 *    every write() in pipe_queue_write() that did not fail.
 *  - pipe_write() tries writev() first and passes whatever was not written
 *    to pipe_add_queue(); while a queue exists, new messages are appended
 *    to it so ordering is kept.
 *  - main() passes a second fd_set to select() and calls
 *    pipe_queue_write() for pipes reported writable.  A pipe that still has
 *    queued data and whose queue_time is DEAD_QUEUE_TIME seconds old is
 *    handed to deadq_enter() and closed with close_filed().
 *  - pipes_size counts queued bytes over all pipes and is compared against
 *    pipes_limit: 16 MiB initially, hw.usermem / 128 (at least 128 KiB)
 *    when the sysctl succeeds, or the value of the new -g option.  All
 *    accounting is skipped when pipes_limit is 0.
 *  - addfile() is removed; cfline() results are linked into fhead directly.
 *
 * Review changes in this revision of the patch:
 *  - pipe_add_queue() now checks the malloc() result.  On failure the bytes
 *    just reserved in pipes_size are released and the pipe is failed with
 *    ENOMEM instead of dereferencing NULL.  This adds five lines, so the
 *    new-side count of that hunk and the new-side start of every later
 *    hunk are adjusted by five.
 *  - pipe_failed() prints the uint64_t pipes_limit with %ju and a
 *    (uintmax_t) cast; %lu is wrong wherever long is 32 bits wide.
 *
 * NOTE(review): the patch text below is whitespace-collapsed and the
 * #include targets are missing; it has to be restored from the original
 * change before it can be applied.
 */
diff --git a//home/admin/stable.13/usr.sbin/syslogd/syslogd.c b//usr/src/usr.sbin/syslogd/syslogd.c --- a//home/admin/stable.13/usr.sbin/syslogd/syslogd.c +++ b//usr/src/usr.sbin/syslogd/syslogd.c @@ -117,6 +117,7 @@ #include #include #include +#include #if defined(INET) || defined(INET6) #include @@ -256,6 +257,13 @@ * or if f_type is F_PIPE and f_pid > 0. */ +struct chunk_list { + STAILQ_ENTRY(chunk_list) next; + size_t size; + char *ptr; + char buf[]; +}; + struct filed { STAILQ_ENTRY(filed) next; /* next in linked list */ short f_type; /* entry type, see below */ @@ -280,6 +288,8 @@ struct { char f_pname[MAXPATHLEN]; pid_t f_pid; + time_t queue_time; + STAILQ_HEAD(chunks_head, chunk_list) chunks; } f_pipe; } f_un; #define fu_uname f_un.f_uname @@ -368,6 +378,9 @@ "FORW", "USERS", "WALL", "PIPE" }; +#define DEAD_QUEUE_TIME 10 + +static uint64_t pipes_limit = 16*1024*1024, pipes_size; static STAILQ_HEAD(, filed) fhead = STAILQ_HEAD_INITIALIZER(fhead); /* Log files that we write to */ static struct filed consfile; /* Console */ @@ -413,7 +426,6 @@ struct iovlist; static int allowaddr(char *); -static int addfile(struct filed *); static int addpeer(struct peer *); static int addsock(struct addrinfo *, struct socklist *); static struct filed *cfline(const char *, const char *, const char *, @@ -459,10 +471,12 @@ static int waitdaemon(int); static void timedout(int); static void increase_rcvbuf(int); +static void pipe_queue_write(struct filed *); static void close_filed(struct filed *f) { + struct chunk_list *cl; if (f == NULL || f->f_file == -1) return; @@ -482,6 +496,12 @@ break; case F_PIPE: f->fu_pipe_pid = 0; + struct chunk_list *tmp; + STAILQ_FOREACH_SAFE(cl, &f->f_un.f_pipe.chunks, next, tmp) { + STAILQ_REMOVE(&f->f_un.f_pipe.chunks, cl, chunk_list, next); + if(pipes_limit) pipes_size -= cl->size; + free(cl); + } break; } (void)close(f->f_file); @@ -489,20 +509,6 @@ } static int -addfile(struct filed *f0) -{ - struct filed *f; - - f = calloc(1, sizeof(*f)); - 
if (f == NULL) - err(1, "malloc failed"); - *f = *f0; - STAILQ_INSERT_TAIL(&fhead, f, next); - - return (0); -} - -static int addpeer(struct peer *pe0) { struct peer *pe; @@ -542,18 +548,26 @@ int main(int argc, char *argv[]) { - int ch, i, s, fdsrmax = 0, bflag = 0, pflag = 0, Sflag = 0; + int ch, i, s, fdsrmax = 0, fdswmax = 0, bflag = 0, pflag = 0, Sflag = 0; fd_set *fdsr = NULL; + fd_set *fdsw = NULL; struct timeval tv, *tvp; struct peer *pe; struct socklist *sl; pid_t ppid = 1, spid; char *p; - + struct filed *f; + uint64_t usermem; + size_t usermem_len = sizeof(usermem); + if (madvise(NULL, 0, MADV_PROTECT) != 0) dprintf("madvise() failed: %s\n", strerror(errno)); - while ((ch = getopt(argc, argv, "468Aa:b:cCdf:FHkl:M:m:nNoO:p:P:sS:Tuv")) + if (sysctlbyname("hw.usermem", &usermem, &usermem_len, NULL, 0) >= 0) { + pipes_limit = usermem / 128; + pipes_limit = MAX(pipes_limit, 128*1024); + } + while ((ch = getopt(argc, argv, "468Aa:b:cCdf:g:FHkl:M:m:nNoO:p:P:sS:Tuv")) != -1) switch (ch) { #ifdef INET @@ -617,6 +631,10 @@ case 'F': /* run in foreground instead of daemon */ Foreground++; break; + case 'g': + if (expand_number(optarg, &pipes_limit) == -1) + errx(EX_USAGE, "invalid global pipe limit: %s", optarg); + break; case 'H': RemoteHostname = 1; break; @@ -803,10 +821,18 @@ if (sl->sl_socket > fdsrmax) fdsrmax = sl->sl_socket; } + STAILQ_FOREACH(f, &fhead, next) { + if (f->f_type == F_PIPE && f->fu_pipe_pid && f->f_file > fdsrmax) + fdsrmax = f->f_file; + } fdsr = (fd_set *)calloc(howmany(fdsrmax+1, NFDBITS), sizeof(*fdsr)); if (fdsr == NULL) errx(1, "calloc fd_set"); + fdsw = (fd_set *)calloc(howmany(fdsrmax+1, NFDBITS), + sizeof(*fdsw)); + if (fdsw == NULL) + errx(1, "calloc fd_set (write)"); for (;;) { if (Initialized == 0) @@ -819,17 +845,43 @@ markit(); if (WantDie) { free(fdsr); + free(fdsw); die(WantDie); } + fdswmax = 0; + STAILQ_FOREACH(f, &fhead, next) { + if(f->f_type == F_PIPE && f->fu_pipe_pid && f->f_file > fdswmax) + fdswmax = f->f_file; + } + 
if(fdswmax > fdsrmax) { + fdsrmax = fdswmax; + free(fdsr); + fdsr = (fd_set *)calloc(howmany(fdsrmax+1, NFDBITS), + sizeof(*fdsr)); + if (fdsr == NULL) + errx(1, "calloc fd_set"); + free(fdsw); + fdsw = (fd_set *)calloc(howmany(fdsrmax+1, NFDBITS), + sizeof(*fdsw)); + if (fdsw == NULL) + errx(1, "calloc fd_set (write)"); + } bzero(fdsr, howmany(fdsrmax+1, NFDBITS) * sizeof(*fdsr)); + bzero(fdsw, howmany(fdsrmax+1, NFDBITS) * + sizeof(*fdsw)); STAILQ_FOREACH(sl, &shead, next) { if (sl->sl_socket != -1 && sl->sl_recv != NULL) FD_SET(sl->sl_socket, fdsr); } - i = select(fdsrmax + 1, fdsr, NULL, NULL, + + STAILQ_FOREACH(f, &fhead, next) { + if(f->f_type == F_PIPE && f->fu_pipe_pid && !STAILQ_EMPTY(&f->f_un.f_pipe.chunks)) + FD_SET(f->f_file, fdsw); + } + i = select(fdsrmax + 1, fdsr, fdsw, NULL, needdofsync ? &tv : tvp); switch (i) { case 0: @@ -846,12 +898,31 @@ logerror("select"); continue; } + STAILQ_FOREACH(f, &fhead, next) { + char buf[512]; + if (f->f_type != F_PIPE) + continue; + if (f->fu_pipe_pid == 0 || STAILQ_EMPTY(&f->f_un.f_pipe.chunks)) + continue; + if (FD_ISSET(f->f_file, fdsw)) { + pipe_queue_write(f); + continue; + } + if (time(NULL) - f->f_un.f_pipe.queue_time < DEAD_QUEUE_TIME) + continue; + deadq_enter(f->fu_pipe_pid, f->fu_pipe_pname); + close_filed(f); + errno = 0; + snprintf(buf, sizeof(buf), "pipe to %s stalled more then " __XSTRING(DEAD_QUEUE_TIME) " seconds, killed", f->fu_pipe_pname); + logerror(buf); + } STAILQ_FOREACH(sl, &shead, next) { if (FD_ISSET(sl->sl_socket, fdsr)) (*sl->sl_recv)(sl); } } free(fdsr); + free(fdsw); } static int @@ -1833,6 +1904,110 @@ #endif static void +pipe_failed(struct filed *f, int e) +{ + deadq_enter(f->fu_pipe_pid, f->fu_pipe_pname); + close_filed(f); + if (e == 0) { + char buf[512]; + + errno = 0; + snprintf(buf, sizeof(buf), "pipe to %s stalled more then " __XSTRING(DEAD_QUEUE_TIME) " seconds, killed", f->fu_pipe_pname); + logerror(buf); + } else if (e < 0) { + char buf[512]; + + errno = 0; + snprintf(buf, 
sizeof(buf), "pipes to %s try allocated more total limit bytes (%ju), killed", f->fu_pipe_pname, (uintmax_t)pipes_limit); + logerror(buf); + } else { + errno = e; + logerror(f->fu_pipe_pname); + } +} + +static void +pipe_queue_write(struct filed *f) +{ + struct chunk_list *cl, *tmp; + STAILQ_FOREACH_SAFE(cl, &f->f_un.f_pipe.chunks, next, tmp) { + ssize_t cnt = write(f->f_file, cl->ptr, cl->size); + if (cnt < 0) { + if (errno != EAGAIN) + pipe_failed(f, errno); + return; + } + f->f_un.f_pipe.queue_time = time(NULL); + if (pipes_limit) pipes_size -= cnt; + if (cnt != (ssize_t)cl->size) { + cl->ptr += cnt; + cl->size -= cnt; + return; + } + STAILQ_REMOVE(&f->f_un.f_pipe.chunks, cl, chunk_list, next); + free(cl); + } +} + +static void +pipe_add_queue(struct filed *f, struct iovlist *il, size_t skip) +{ + char *p; + struct chunk_list *cl; + + if (time(NULL) - f->f_un.f_pipe.queue_time >= DEAD_QUEUE_TIME) { + pipe_failed(f, 0); + return; + } + if (pipes_limit) { + if (pipes_size + il->totalsize - skip > pipes_limit) { + pipe_failed(f, -1); + return; + } else + pipes_size += il->totalsize - skip; + } + cl = malloc(sizeof(*cl) + il->totalsize - skip); + if (cl == NULL) { /* chunk was never queued, so close_filed() will not release its bytes */ + if (pipes_limit) pipes_size -= il->totalsize - skip; + pipe_failed(f, ENOMEM); + return; + } + p = cl->ptr = cl->buf; + cl->size = il->totalsize - skip; + for(size_t i = 0; i < il->iovcnt; i++) { + if (skip < il->iov[i].iov_len) { + memcpy(p, skip + (char*)il->iov[i].iov_base, il->iov[i].iov_len - skip); + p += il->iov[i].iov_len - skip; + skip = 0; + } else + skip -= il->iov[i].iov_len; + } + struct chunks_head *ch = &f->f_un.f_pipe.chunks; + STAILQ_INSERT_TAIL(ch, cl, next); +} + +static void +pipe_write(struct filed *f, struct iovlist *il) +{ + if (!STAILQ_EMPTY(&f->f_un.f_pipe.chunks)) { + pipe_add_queue(f, il, 0); + return; + } + ssize_t cnt = writev(f->f_file, il->iov, il->iovcnt); + if (cnt == (ssize_t)il->totalsize) + return; + if 
(cnt < 0 && errno != EAGAIN) { + pipe_failed(f, errno); + return; + } + if (cnt < 0) + cnt = 0; + f->f_un.f_pipe.queue_time = time(NULL); + pipe_add_queue(f, il, cnt); + return; +} + +static 
void fprintlog_write(struct filed *f, struct iovlist *il, int flags) { struct msghdr msghdr; @@ -1947,14 +2122,7 @@ break; } } - if (writev(f->f_file, il->iov, il->iovcnt) < 0) { - int e = errno; - - deadq_enter(f->fu_pipe_pid, f->fu_pipe_pname); - close_filed(f); - errno = e; - logerror(f->fu_pipe_pname); - } + pipe_write(f, il); break; case F_CONSOLE: @@ -2533,8 +2701,7 @@ cline[i] = '\0'; f = cfline(cline, prog, host, pfilter); if (f != NULL) - addfile(f); - free(f); + STAILQ_INSERT_TAIL(&fhead, f, next); } } @@ -2647,12 +2814,10 @@ dprintf("cannot open %s\n", ConfFile); f = cfline("*.ERR\t/dev/console", "*", "*", "*"); if (f != NULL) - addfile(f); - free(f); + STAILQ_INSERT_TAIL(&fhead, f, next); f = cfline("*.PANIC\t*", "*", "*", "*"); if (f != NULL) - addfile(f); - free(f); + STAILQ_INSERT_TAIL(&fhead, f, next); Initialized = 1; return; @@ -3161,6 +3326,8 @@ (void)strlcpy(f->fu_pipe_pname, p + 1, sizeof(f->fu_pipe_pname)); f->f_type = F_PIPE; + f->f_un.f_pipe.queue_time = 0; + STAILQ_INIT(&f->f_un.f_pipe.chunks); break; case '*':