diff --git a/usr.sbin/syslogd/syslogd.c b/usr.sbin/syslogd/syslogd.c
--- a/usr.sbin/syslogd/syslogd.c
+++ b/usr.sbin/syslogd/syslogd.c
@@ -117,6 +117,7 @@
 #include
 #include
 #include
+#include
 
 #if defined(INET) || defined(INET6)
 #include
@@ -268,6 +269,13 @@
 	F_PIPE,			/* pipe to program */
 };
 
+struct chunk_list {
+	STAILQ_ENTRY(chunk_list) next;
+	ssize_t size;		/* bytes of buf still to be written */
+	char *ptr;		/* first unwritten byte inside buf */
+	char buf[];		/* copy of the queued message data */
+};
+
 /*
  * This structure represents the files that will have log
  * copies printed.
@@ -302,6 +310,9 @@
 		struct {
 			char	f_pname[MAXPATHLEN];
 			int	f_procdesc;
+#define	DEAD_QUEUE_TIME	10	/* seconds a queue may stay stalled */
+			time_t	queue_time;	/* last write progress */
+			STAILQ_HEAD(chunks_head, chunk_list) chunks;
 		} f_pipe;			/* F_PIPE */
 	} f_un;
 #define	fu_uname	f_un.f_uname
@@ -434,6 +445,8 @@
 static bool	needdofsync = true; /* Are any file(s) waiting to be fsynced? */
 static struct pidfh *pfh;
 static bool	RFC3164OutputFormat = true; /* Use legacy format by default. */
+static uint64_t pipes_limit = 16*1024*1024, pipes_size;
+static int	kq; /* must be accessed from close_filed() and other functions */
 
 struct iovlist;
 
@@ -473,6 +486,8 @@
 static void	wallmsg(struct filed *, struct iovec *, const int iovlen);
 static int	waitdaemon(int);
 static void	increase_rcvbuf(int);
+static void	pipe_queue_write(struct filed *);
+static void	pipe_failed(struct filed *f, int e);
 
 static void
 close_filed(struct filed *f)
@@ -495,6 +510,24 @@
 		break;
 	case F_PIPE:
 		if (f->fu_pipe_pd >= 0) {
+			struct chunk_list *tmp;
+			struct chunk_list *cl;
+			struct kevent ev;
+
+			STAILQ_FOREACH_SAFE(cl, &f->f_un.f_pipe.chunks, next, tmp) {
+				STAILQ_REMOVE(&f->f_un.f_pipe.chunks, cl, chunk_list, next);
+				if(pipes_limit) pipes_size -= cl->size;
+				free(cl);
+			}
+			/*
+			 * The write filter is added with EV_ONESHOT, so it
+			 * may already be gone; ENOENT is not an error here.
+			 */
+			EV_SET(&ev, f->f_file, EVFILT_WRITE, EV_DELETE, 0, 0, f);
+			if (kevent(kq, &ev, 1, NULL, 0, NULL) == -1 &&
+			    errno != ENOENT) {
+				warn("failed to remove kevent from kqueue");
+			}
 			deadq_enter(f->fu_pipe_pd);
 			f->fu_pipe_pd = -1;
 		}
@@ -600,14 +633,26 @@
 	struct kevent ev;
 	struct socklist *sl;
 	pid_t spid;
-	int ch, kq, ppipe_w = -1, s;
+	int ch, ppipe_w = -1, s;
 	char *p;
 	bool bflag = false, pflag = false, Sflag = false;
+	struct filed *f;
+	unsigned long usermem;
+	size_t usermem_len = sizeof(usermem);
 
+	/*
+	 * Default the global pipe queue limit to 1/128 of user memory, but
+	 * at least 128K.  hw.usermem is read as an unsigned long because
+	 * FreeBSD exports it as CTLTYPE_ULONG (NOTE(review): confirm).
+	 */
+	if (sysctlbyname("hw.usermem", &usermem, &usermem_len, NULL, 0) >= 0) {
+		pipes_limit = usermem / 128;
+		pipes_limit = MAX(pipes_limit, 128*1024);
+	}
 	if (madvise(NULL, 0, MADV_PROTECT) != 0)
 		dprintf("madvise() failed: %s\n", strerror(errno));
 
-	while ((ch = getopt(argc, argv, "468Aa:b:cCdf:FHkl:M:m:nNoO:p:P:sS:Tuv"))
+	while ((ch = getopt(argc, argv, "468Aa:b:cCdf:Fg:Hkl:M:m:nNoO:p:P:sS:Tuv"))
 	    != -1)
 		switch (ch) {
 #ifdef INET
@@ -665,6 +710,10 @@
 		case 'F':		/* run in foreground instead of daemon */
 			Foreground = true;
 			break;
+		case 'g':		/* global limit for queued pipe data */
+			if (expand_number(optarg, &pipes_limit) == -1)
+				errx(EX_USAGE, "invalid global pipe limit: %s, exiting", optarg);
+			break;
 		case 'H':
 			RemoteHostname = true;
 			break;
@@ -886,6 +935,18 @@
 			if (sl->sl_socket != -1 && sl->sl_recv != NULL)
 				sl->sl_recv(sl);
 			break;
+		case EVFILT_WRITE:
+			f = ev.udata;
+			if (f->f_type == F_PIPE && f->fu_pipe_pd >= 0)
+				pipe_queue_write(f);
+			else {
+				fprintf(stderr, "delete illegal EVFILT_WRITE for fd %d\n", f->f_file);
+				EV_SET(&ev, f->f_file, EVFILT_WRITE, EV_DELETE, 0, 0, f);
+				if (kevent(kq, &ev, 1, NULL, 0, NULL) == -1 && errno != ENOENT)
+					warn("failed to remove kevent from kqueue");
+				errno = 0;
+			}
+			break;
 		case EVFILT_SIGNAL:
 			switch (ev.ident) {
 			case SIGHUP:
@@ -903,6 +964,16 @@
 			}
 			break;
 		}
+		/* Kill pipes stalled for DEAD_QUEUE_TIME seconds. */
+		STAILQ_FOREACH(f, &fhead, next) {
+			if (f->f_type != F_PIPE || f->fu_pipe_pd < 0)
+				continue;
+			if (STAILQ_EMPTY(&f->f_un.f_pipe.chunks))
+				continue;
+			if (time(NULL) - f->f_un.f_pipe.queue_time < DEAD_QUEUE_TIME)
+				continue;
+			pipe_failed(f, 0);
+		}
 	}
 }
 
@@ -1842,6 +1913,160 @@
 }
 #endif
 
+/*
+ * Shut down the pipe destination f and log the reason:
+ *   e == 0  queued data did not drain within DEAD_QUEUE_TIME seconds,
+ *   e <  0  queueing more data would exceed the global pipes_limit,
+ *   e >  0  a system call failed; e is the errno value to report.
+ */
+static void
+pipe_failed(struct filed *f, int e)
+{
+	char buf[512];
+
+	close_filed(f);
+	if (e == 0) {	/* DEAD_QUEUE_TIME timeout */
+		errno = 0;
+		snprintf(buf, sizeof(buf), "pipe to %s stalled more than "
+		    __XSTRING(DEAD_QUEUE_TIME) " seconds, killed",
+		    f->fu_pipe_pname);
+		logerror(buf);
+	} else if (e < 0) {
+		errno = 0;
+		/* pipes_limit is a uint64_t: print it portably with %ju. */
+		snprintf(buf, sizeof(buf), "pipe to %s tried to queue more "
+		    "than the total limit (%ju bytes), killed",
+		    f->fu_pipe_pname, (uintmax_t)pipes_limit);
+		logerror(buf);
+	} else {
+		errno = e;
+		logerror(f->fu_pipe_pname);
+	}
+}
+
+/*
+ * EVFILT_WRITE handler: push queued chunks of f into the pipe until the
+ * queue is empty or the pipe stops accepting data.  The write filter is
+ * registered EV_ONESHOT, so it is re-armed whenever data is left over.
+ */
+static void
+pipe_queue_write(struct filed *f)
+{
+	struct chunk_list *cl;
+	struct kevent ev;
+
+	while ((cl = STAILQ_FIRST(&f->f_un.f_pipe.chunks)) != NULL) {
+		ssize_t cnt = write(f->f_file, cl->ptr, cl->size);
+
+		if (cnt < 0) {
+			if (errno != EAGAIN) {
+				pipe_failed(f, errno);
+				return;
+			}
+			/* Pipe is full again; wait for the next event. */
+			break;
+		}
+		f->f_un.f_pipe.queue_time = time(NULL);
+		if (pipes_limit)
+			pipes_size -= cnt;
+		if (cnt != cl->size) {
+			/* Short write: keep the unsent tail queued. */
+			cl->ptr += cnt;
+			cl->size -= cnt;
+			break;
+		}
+		STAILQ_REMOVE_HEAD(&f->f_un.f_pipe.chunks, next);
+		free(cl);
+	}
+	if (STAILQ_EMPTY(&f->f_un.f_pipe.chunks))
+		return;
+	EV_SET(&ev, f->f_file, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, f);
+	if (kevent(kq, &ev, 1, NULL, 0, NULL) == -1) {
+		warn("failed to enable kevent in kqueue");
+		pidfile_remove(pfh);
+		exit(1);
+	}
+}
+
+/*
+ * Copy the contents of il, minus its first skip bytes, into a new chunk
+ * at the tail of f's queue.  The pipe is shut down instead when it has
+ * stalled for DEAD_QUEUE_TIME seconds, when the global pipes_limit
+ * would be exceeded, or when no memory is available.
+ */
+static void
+pipe_add_queue(struct filed *f, struct iovlist *il, size_t skip)
+{
+	struct chunk_list *cl;
+	size_t len;
+	char *p;
+
+	if (time(NULL) - f->f_un.f_pipe.queue_time >= DEAD_QUEUE_TIME) {
+		pipe_failed(f, 0);
+		return;
+	}
+	len = il->totalsize - skip;
+	if (pipes_limit && pipes_size + len > pipes_limit) {
+		pipe_failed(f, -1);
+		return;
+	}
+	cl = malloc(sizeof(*cl) + len);
+	if (cl == NULL) {
+		pipe_failed(f, ENOMEM);
+		return;
+	}
+	/* Account for the chunk only once it really exists. */
+	if (pipes_limit)
+		pipes_size += len;
+	p = cl->ptr = cl->buf;
+	cl->size = len;
+	for (size_t i = 0; i < il->iovcnt; i++) {
+		if (skip < il->iov[i].iov_len) {
+			memcpy(p, (char *)il->iov[i].iov_base + skip,
+			    il->iov[i].iov_len - skip);
+			p += il->iov[i].iov_len - skip;
+			skip = 0;
+		} else
+			skip -= il->iov[i].iov_len;
+	}
+	STAILQ_INSERT_TAIL(&f->f_un.f_pipe.chunks, cl, next);
+}
+
+/*
+ * Write il to the pipe of f.  Data is written directly while nothing is
+ * queued; whatever the pipe does not accept at once is queued and sent
+ * later from pipe_queue_write() when the pipe becomes writable.
+ */
+static void
+pipe_write(struct filed *f, struct iovlist *il)
+{
+	struct kevent ev;
+	ssize_t cnt;
+
+	/* Data is already queued: append behind it instead of writing. */
+	if (!STAILQ_EMPTY(&f->f_un.f_pipe.chunks)) {
+		pipe_add_queue(f, il, 0);
+		return;
+	}
+	cnt = writev(f->f_file, il->iov, il->iovcnt);
+	if (cnt == (ssize_t)il->totalsize)
+		return;
+	if (cnt < 0 && errno != EAGAIN) {
+		pipe_failed(f, errno);
+		return;
+	}
+	if (cnt < 0)
+		cnt = 0;
+	f->f_un.f_pipe.queue_time = time(NULL);
+	EV_SET(&ev, f->f_file, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, f);
+	if (kevent(kq, &ev, 1, NULL, 0, NULL) == -1) {
+		warn("failed to enable write pipe kevent in kqueue");
+		pidfile_remove(pfh);
+		exit(1);
+	}
+	pipe_add_queue(f, il, cnt);
+}
+
 static void
 fprintlog_write(struct filed *f, struct iovlist *il, int flags)
 {
@@ -1957,10 +2182,7 @@
 				break;
 			}
 		}
-		if (writev(f->f_file, il->iov, il->iovcnt) < 0) {
-			logerror(f->fu_pipe_pname);
-			close_filed(f);
-		}
+		pipe_write(f, il);
 		break;
 
 	case F_CONSOLE:
@@ -3067,6 +3289,8 @@
 		(void)strlcpy(f->fu_pipe_pname, p + 1,
 		    sizeof(f->fu_pipe_pname));
 		f->f_type = F_PIPE;
+		f->f_un.f_pipe.queue_time = 0;
+		STAILQ_INIT(&f->f_un.f_pipe.chunks);
 		break;
 
 	case '*':