Index: stable/8/sbin/hastctl/hastctl.c =================================================================== --- stable/8/sbin/hastctl/hastctl.c (revision 221506) +++ stable/8/sbin/hastctl/hastctl.c (revision 221507) @@ -1,520 +1,520 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "hast.h" #include "hast_proto.h" #include "metadata.h" #include "nv.h" #include "pjdlog.h" #include "proto.h" #include "subr.h" /* Path to configuration file. */ static const char *cfgpath = HAST_CONFIG; /* Hastd configuration. */ static struct hastd_config *cfg; /* Control connection. */ static struct proto_conn *controlconn; enum { CMD_INVALID, CMD_CREATE, CMD_ROLE, CMD_STATUS, CMD_DUMP }; static __dead2 void usage(void) { fprintf(stderr, "usage: %s create [-d] [-c config] [-e extentsize] [-k keepdirty]\n" "\t\t[-m mediasize] name ...\n", getprogname()); fprintf(stderr, " %s role [-d] [-c config] all | name ...\n", getprogname()); fprintf(stderr, " %s status [-d] [-c config] [all | name ...]\n", getprogname()); fprintf(stderr, " %s dump [-d] [-c config] [all | name ...]\n", getprogname()); exit(EX_USAGE); } static int create_one(struct hast_resource *res, intmax_t mediasize, intmax_t extentsize, intmax_t keepdirty) { unsigned char *buf; size_t mapsize; int ec; ec = 0; pjdlog_prefix_set("[%s] ", res->hr_name); if (provinfo(res, true) < 0) { ec = EX_NOINPUT; goto end; } if (mediasize == 0) mediasize = res->hr_local_mediasize; else if (mediasize > res->hr_local_mediasize) { pjdlog_error("Provided mediasize is larger than provider %s size.", res->hr_localpath); ec = EX_DATAERR; goto end; } if (!powerof2(res->hr_local_sectorsize)) { pjdlog_error("Sector size of provider %s is not power of 2 (%u).", res->hr_localpath, res->hr_local_sectorsize); ec = EX_DATAERR; goto end; } if (extentsize == 0) extentsize = HAST_EXTENTSIZE; if (extentsize < res->hr_local_sectorsize) { pjdlog_error("Extent size (%jd) is less than sector size (%u).", (intmax_t)extentsize, res->hr_local_sectorsize); ec = EX_DATAERR; goto end; } if ((extentsize % res->hr_local_sectorsize) != 0) { pjdlog_error("Extent size (%jd) is not multiple of sector size (%u).", (intmax_t)extentsize, res->hr_local_sectorsize); ec = EX_DATAERR; goto end; } mapsize = activemap_calc_ondisk_size(mediasize - METADATA_SIZE, extentsize, res->hr_local_sectorsize); if (keepdirty == 0) keepdirty = HAST_KEEPDIRTY; res->hr_datasize = mediasize - METADATA_SIZE - mapsize; res->hr_extentsize = extentsize; res->hr_keepdirty = keepdirty; res->hr_localoff = METADATA_SIZE + mapsize; if (metadata_write(res) < 0) { ec = EX_IOERR; goto end; } buf = calloc(1, mapsize); if (buf == NULL) { pjdlog_error("Unable to allocate %zu bytes of memory for initial bitmap.", mapsize); ec = EX_TEMPFAIL; goto end; } if (pwrite(res->hr_localfd, buf, mapsize, METADATA_SIZE) != (ssize_t)mapsize) { pjdlog_errno(LOG_ERR, "Unable to store initial bitmap on %s", res->hr_localpath); free(buf); ec = EX_IOERR; goto end; } free(buf); end: if (res->hr_localfd >= 0) close(res->hr_localfd); pjdlog_prefix_set("%s", ""); return (ec); } static void control_create(int argc, char *argv[], intmax_t mediasize, intmax_t extentsize, intmax_t keepdirty) { struct hast_resource *res; int ec, ii, ret; /* Initialize the given resources. */ if (argc < 1) usage(); ec = 0; for (ii = 0; ii < argc; ii++) { TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (strcmp(argv[ii], res->hr_name) == 0) break; } if (res == NULL) { pjdlog_error("Unknown resource %s.", argv[ii]); if (ec == 0) ec = EX_DATAERR; continue; } ret = create_one(res, mediasize, extentsize, keepdirty); if (ret != 0 && ec == 0) ec = ret; } exit(ec); } static int dump_one(struct hast_resource *res) { int ret; ret = metadata_read(res, false); if (ret != 0) return (ret); printf("resource: %s\n", res->hr_name); printf(" datasize: %ju (%NB)\n", (uintmax_t)res->hr_datasize, (intmax_t)res->hr_datasize); printf(" extentsize: %d (%NB)\n", res->hr_extentsize, (intmax_t)res->hr_extentsize); printf(" keepdirty: %d\n", res->hr_keepdirty); printf(" localoff: %ju\n", (uintmax_t)res->hr_localoff); printf(" resuid: %ju\n", (uintmax_t)res->hr_resuid); printf(" localcnt: %ju\n", (uintmax_t)res->hr_primary_localcnt); printf(" remotecnt: %ju\n", (uintmax_t)res->hr_primary_remotecnt); printf(" prevrole: %s\n", role2str(res->hr_previous_role)); return (0); } static void control_dump(int argc, char *argv[]) { struct hast_resource *res; int ec, ret; /* Dump metadata of the given resource(s). */ ec = 0; if (argc == 0 || (argc == 1 && strcmp(argv[0], "all") == 0)) { TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { ret = dump_one(res); if (ret != 0 && ec == 0) ec = ret; } } else { int ii; for (ii = 0; ii < argc; ii++) { TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (strcmp(argv[ii], res->hr_name) == 0) break; } if (res == NULL) { pjdlog_error("Unknown resource %s.", argv[ii]); if (ec == 0) ec = EX_DATAERR; continue; } ret = dump_one(res); if (ret != 0 && ec == 0) ec = ret; } } exit(ec); } static int control_set_role(struct nv *nv, const char *newrole) { const char *res, *oldrole; unsigned int ii; int error, ret; ret = 0; for (ii = 0; ; ii++) { res = nv_get_string(nv, "resource%u", ii); if (res == NULL) break; pjdlog_prefix_set("[%s] ", res); error = nv_get_int16(nv, "error%u", ii); if (error != 0) { if (ret == 0) ret = error; pjdlog_warning("Received error %d from hastd.", error); continue; } oldrole = nv_get_string(nv, "role%u", ii); if (strcmp(oldrole, newrole) == 0) pjdlog_debug(2, "Role unchanged (%s).", oldrole); else { pjdlog_debug(1, "Role changed from %s to %s.", oldrole, newrole); } } pjdlog_prefix_set("%s", ""); return (ret); } static int control_status(struct nv *nv) { unsigned int ii; const char *str; int error, ret; ret = 0; for (ii = 0; ; ii++) { str = nv_get_string(nv, "resource%u", ii); if (str == NULL) break; printf("%s:\n", str); error = nv_get_int16(nv, "error%u", ii); if (error != 0) { if (ret == 0) ret = error; printf(" error: %d\n", error); continue; } printf(" role: %s\n", nv_get_string(nv, "role%u", ii)); printf(" provname: %s\n", nv_get_string(nv, "provname%u", ii)); printf(" localpath: %s\n", nv_get_string(nv, "localpath%u", ii)); printf(" extentsize: %u (%NB)\n", (unsigned int)nv_get_uint32(nv, "extentsize%u", ii), (intmax_t)nv_get_uint32(nv, "extentsize%u", ii)); printf(" keepdirty: %u\n", (unsigned int)nv_get_uint32(nv, "keepdirty%u", ii)); printf(" remoteaddr: %s\n", nv_get_string(nv, "remoteaddr%u", ii)); str = nv_get_string(nv, "sourceaddr%u", ii); if (str != NULL) printf(" sourceaddr: %s\n", str); printf(" replication: %s\n", nv_get_string(nv, "replication%u", ii)); str = nv_get_string(nv, "status%u", ii); if (str != NULL) printf(" status: %s\n", str); printf(" dirty: %ju (%NB)\n", (uintmax_t)nv_get_uint64(nv, "dirty%u", ii), (intmax_t)nv_get_uint64(nv, "dirty%u", ii)); } return (ret); } int main(int argc, char *argv[]) { struct nv *nv; int64_t mediasize, extentsize, keepdirty; int cmd, debug, error, ii; const char *optstr; debug = 0; mediasize = extentsize = keepdirty = 0; if (argc == 1) usage(); if (strcmp(argv[1], "create") == 0) { cmd = CMD_CREATE; optstr = "c:de:k:m:h"; } else if (strcmp(argv[1], "role") == 0) { cmd = CMD_ROLE; optstr = "c:dh"; } else if (strcmp(argv[1], "status") == 0) { cmd = CMD_STATUS; optstr = "c:dh"; } else if (strcmp(argv[1], "dump") == 0) { cmd = CMD_DUMP; optstr = "c:dh"; } else usage(); argc--; argv++; for (;;) { int ch; ch = getopt(argc, argv, optstr); if (ch == -1) break; switch (ch) { case 'c': cfgpath = optarg; break; case 'd': debug++; break; case 'e': if (expand_number(optarg, &extentsize) < 0) err(1, "Invalid extentsize"); break; case 'k': if (expand_number(optarg, &keepdirty) < 0) err(1, "Invalid keepdirty"); break; case 'm': if (expand_number(optarg, &mediasize) < 0) err(1, "Invalid mediasize"); break; case 'h': default: usage(); } } argc -= optind; argv += optind; switch (cmd) { case CMD_CREATE: case CMD_ROLE: if (argc == 0) usage(); break; } pjdlog_init(PJDLOG_MODE_STD); pjdlog_debug_set(debug); cfg = yy_config_parse(cfgpath, true); assert(cfg != NULL); switch (cmd) { case CMD_CREATE: control_create(argc, argv, mediasize, extentsize, keepdirty); /* NOTREACHED */ assert(!"What are we doing here?!"); break; case CMD_DUMP: /* Dump metadata from local component of the given resource. */ control_dump(argc, argv); /* NOTREACHED */ assert(!"What are we doing here?!"); break; case CMD_ROLE: /* Change role for the given resources. */ if (argc < 2) usage(); nv = nv_alloc(); nv_add_uint8(nv, HASTCTL_CMD_SETROLE, "cmd"); if (strcmp(argv[0], "init") == 0) nv_add_uint8(nv, HAST_ROLE_INIT, "role"); else if (strcmp(argv[0], "primary") == 0) nv_add_uint8(nv, HAST_ROLE_PRIMARY, "role"); else if (strcmp(argv[0], "secondary") == 0) nv_add_uint8(nv, HAST_ROLE_SECONDARY, "role"); else usage(); for (ii = 0; ii < argc - 1; ii++) nv_add_string(nv, argv[ii + 1], "resource%d", ii); break; case CMD_STATUS: /* Obtain status of the given resources. */ nv = nv_alloc(); nv_add_uint8(nv, HASTCTL_CMD_STATUS, "cmd"); if (argc == 0) nv_add_string(nv, "all", "resource%d", 0); else { for (ii = 0; ii < argc; ii++) nv_add_string(nv, argv[ii], "resource%d", ii); } break; default: - assert(!"Impossible role!"); + assert(!"Impossible command!"); } /* Setup control connection... */ if (proto_client(NULL, cfg->hc_controladdr, &controlconn) < 0) { pjdlog_exit(EX_OSERR, "Unable to setup control connection to %s", cfg->hc_controladdr); } /* ...and connect to hastd. */ if (proto_connect(controlconn, HAST_TIMEOUT) < 0) { pjdlog_exit(EX_OSERR, "Unable to connect to hastd via %s", cfg->hc_controladdr); } if (drop_privs() != 0) exit(EX_CONFIG); pjdlog_debug(1, "Privileges successfully dropped."); /* Send the command to the server... */ if (hast_proto_send(NULL, controlconn, nv, NULL, 0) < 0) { pjdlog_exit(EX_UNAVAILABLE, "Unable to send command to hastd via %s", cfg->hc_controladdr); } nv_free(nv); /* ...and receive reply. */ if (hast_proto_recv_hdr(controlconn, &nv) < 0) { pjdlog_exit(EX_UNAVAILABLE, "cannot receive reply from hastd via %s", cfg->hc_controladdr); } error = nv_get_int16(nv, "error"); if (error != 0) { pjdlog_exitx(EX_SOFTWARE, "Error %d received from hastd.", error); } nv_set_error(nv, 0); switch (cmd) { case CMD_ROLE: error = control_set_role(nv, argv[0]); break; case CMD_STATUS: error = control_status(nv); break; default: - assert(!"Impossible role!"); + assert(!"Impossible command!"); } exit(error); } Index: stable/8/sbin/hastctl =================================================================== --- stable/8/sbin/hastctl (revision 221506) +++ stable/8/sbin/hastctl (revision 221507) Property changes on: stable/8/sbin/hastctl ___________________________________________________________________ Modified: svn:mergeinfo ## -0,0 +0,1 ## Merged /head/sbin/hastctl:r221074 Index: stable/8/sbin/hastd/control.c =================================================================== --- stable/8/sbin/hastd/control.c (revision 221506) +++ stable/8/sbin/hastd/control.c (revision 221507) @@ -1,479 +1,479 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include #include #include #include "hast.h" #include "hastd.h" #include "hast_checksum.h" #include "hast_compression.h" #include "hast_proto.h" #include "hooks.h" #include "nv.h" #include "pjdlog.h" #include "proto.h" #include "subr.h" #include "control.h" void child_cleanup(struct hast_resource *res) { proto_close(res->hr_ctrl); res->hr_ctrl = NULL; if (res->hr_event != NULL) { proto_close(res->hr_event); res->hr_event = NULL; } if (res->hr_conn != NULL) { proto_close(res->hr_conn); res->hr_conn = NULL; } res->hr_workerpid = 0; } static void control_set_role_common(struct hastd_config *cfg, struct nv *nvout, uint8_t role, struct hast_resource *res, const char *name, unsigned int no) { int oldrole; /* Name is always needed. */ if (name != NULL) nv_add_string(nvout, name, "resource%u", no); if (res == NULL) { assert(cfg != NULL); assert(name != NULL); TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (strcmp(res->hr_name, name) == 0) break; } if (res == NULL) { nv_add_int16(nvout, EHAST_NOENTRY, "error%u", no); return; } } assert(res != NULL); /* Send previous role back. */ nv_add_string(nvout, role2str(res->hr_role), "role%u", no); /* Nothing changed, return here. */ if (role == res->hr_role) return; pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); pjdlog_info("Role changed to %s.", role2str(role)); /* Change role to the new one. */ oldrole = res->hr_role; res->hr_role = role; pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); /* * If previous role was primary or secondary we have to kill process * doing that work. */ if (res->hr_workerpid != 0) { if (kill(res->hr_workerpid, SIGTERM) < 0) { pjdlog_errno(LOG_WARNING, "Unable to kill worker process %u", (unsigned int)res->hr_workerpid); } else if (waitpid(res->hr_workerpid, NULL, 0) != res->hr_workerpid) { pjdlog_errno(LOG_WARNING, "Error while waiting for worker process %u", (unsigned int)res->hr_workerpid); } else { pjdlog_debug(1, "Worker process %u stopped.", (unsigned int)res->hr_workerpid); } child_cleanup(res); } /* Start worker process if we are changing to primary. */ if (role == HAST_ROLE_PRIMARY) hastd_primary(res); pjdlog_prefix_set("%s", ""); hook_exec(res->hr_exec, "role", res->hr_name, role2str(oldrole), role2str(res->hr_role), NULL); } void control_set_role(struct hast_resource *res, uint8_t role) { control_set_role_common(NULL, NULL, role, res, NULL, 0); } static void control_status_worker(struct hast_resource *res, struct nv *nvout, unsigned int no) { struct nv *cnvin, *cnvout; const char *str; int error; cnvin = cnvout = NULL; error = 0; /* * Prepare and send command to worker process. */ cnvout = nv_alloc(); - nv_add_uint8(cnvout, HASTCTL_STATUS, "cmd"); + nv_add_uint8(cnvout, CONTROL_STATUS, "cmd"); error = nv_error(cnvout); if (error != 0) { pjdlog_common(LOG_ERR, 0, error, "Unable to prepare control header"); goto end; } if (hast_proto_send(res, res->hr_ctrl, cnvout, NULL, 0) < 0) { error = errno; pjdlog_errno(LOG_ERR, "Unable to send control header"); goto end; } /* * Receive response. */ if (hast_proto_recv_hdr(res->hr_ctrl, &cnvin) < 0) { error = errno; pjdlog_errno(LOG_ERR, "Unable to receive control header"); goto end; } error = nv_get_int16(cnvin, "error"); if (error != 0) goto end; if ((str = nv_get_string(cnvin, "status")) == NULL) { error = ENOENT; pjdlog_errno(LOG_ERR, "Field 'status' is missing."); goto end; } nv_add_string(nvout, str, "status%u", no); nv_add_uint64(nvout, nv_get_uint64(cnvin, "dirty"), "dirty%u", no); nv_add_uint32(nvout, nv_get_uint32(cnvin, "extentsize"), "extentsize%u", no); nv_add_uint32(nvout, nv_get_uint32(cnvin, "keepdirty"), "keepdirty%u", no); end: if (cnvin != NULL) nv_free(cnvin); if (cnvout != NULL) nv_free(cnvout); if (error != 0) nv_add_int16(nvout, error, "error"); } static void control_status(struct hastd_config *cfg, struct nv *nvout, struct hast_resource *res, const char *name, unsigned int no) { assert(cfg != NULL); assert(nvout != NULL); assert(name != NULL); /* Name is always needed. */ nv_add_string(nvout, name, "resource%u", no); if (res == NULL) { TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (strcmp(res->hr_name, name) == 0) break; } if (res == NULL) { nv_add_int16(nvout, EHAST_NOENTRY, "error%u", no); return; } } assert(res != NULL); nv_add_string(nvout, res->hr_provname, "provname%u", no); nv_add_string(nvout, res->hr_localpath, "localpath%u", no); nv_add_string(nvout, res->hr_remoteaddr, "remoteaddr%u", no); if (res->hr_sourceaddr[0] != '\0') nv_add_string(nvout, res->hr_sourceaddr, "sourceaddr%u", no); switch (res->hr_replication) { case HAST_REPLICATION_FULLSYNC: nv_add_string(nvout, "fullsync", "replication%u", no); break; case HAST_REPLICATION_MEMSYNC: nv_add_string(nvout, "memsync", "replication%u", no); break; case HAST_REPLICATION_ASYNC: nv_add_string(nvout, "async", "replication%u", no); break; default: nv_add_string(nvout, "unknown", "replication%u", no); break; } nv_add_string(nvout, checksum_name(res->hr_checksum), "checksum%u", no); nv_add_string(nvout, compression_name(res->hr_compression), "compression%u", no); nv_add_string(nvout, role2str(res->hr_role), "role%u", no); switch (res->hr_role) { case HAST_ROLE_PRIMARY: assert(res->hr_workerpid != 0); /* FALLTHROUGH */ case HAST_ROLE_SECONDARY: if (res->hr_workerpid != 0) break; /* FALLTHROUGH */ default: return; } /* * If we are here, it means that we have a worker process, which we * want to ask some questions. */ control_status_worker(res, nvout, no); } void control_handle(struct hastd_config *cfg) { struct proto_conn *conn; struct nv *nvin, *nvout; unsigned int ii; const char *str; uint8_t cmd, role; int error; if (proto_accept(cfg->hc_controlconn, &conn) < 0) { pjdlog_errno(LOG_ERR, "Unable to accept control connection"); return; } cfg->hc_controlin = conn; nvin = nvout = NULL; role = HAST_ROLE_UNDEF; if (hast_proto_recv_hdr(conn, &nvin) < 0) { pjdlog_errno(LOG_ERR, "Unable to receive control header"); nvin = NULL; goto close; } /* Obtain command code. 0 means that nv_get_uint8() failed. */ cmd = nv_get_uint8(nvin, "cmd"); if (cmd == 0) { pjdlog_error("Control header is missing 'cmd' field."); error = EHAST_INVALID; goto close; } /* Allocate outgoing nv structure. */ nvout = nv_alloc(); if (nvout == NULL) { pjdlog_error("Unable to allocate header for control response."); error = EHAST_NOMEMORY; goto close; } error = 0; str = nv_get_string(nvin, "resource0"); if (str == NULL) { pjdlog_error("Control header is missing 'resource0' field."); error = EHAST_INVALID; goto fail; } - if (cmd == HASTCTL_SET_ROLE) { + if (cmd == HASTCTL_CMD_SETROLE) { role = nv_get_uint8(nvin, "role"); switch (role) { case HAST_ROLE_INIT: case HAST_ROLE_PRIMARY: case HAST_ROLE_SECONDARY: break; default: pjdlog_error("Invalid role received (%hhu).", role); error = EHAST_INVALID; goto fail; } } if (strcmp(str, "all") == 0) { struct hast_resource *res; /* All configured resources. */ ii = 0; TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { switch (cmd) { - case HASTCTL_SET_ROLE: + case HASTCTL_CMD_SETROLE: control_set_role_common(cfg, nvout, role, res, res->hr_name, ii++); break; - case HASTCTL_STATUS: + case HASTCTL_CMD_STATUS: control_status(cfg, nvout, res, res->hr_name, ii++); break; default: pjdlog_error("Invalid command received (%hhu).", cmd); error = EHAST_UNIMPLEMENTED; goto fail; } } } else { /* Only selected resources. */ for (ii = 0; ; ii++) { str = nv_get_string(nvin, "resource%u", ii); if (str == NULL) break; switch (cmd) { - case HASTCTL_SET_ROLE: + case HASTCTL_CMD_SETROLE: control_set_role_common(cfg, nvout, role, NULL, str, ii); break; - case HASTCTL_STATUS: + case HASTCTL_CMD_STATUS: control_status(cfg, nvout, NULL, str, ii); break; default: pjdlog_error("Invalid command received (%hhu).", cmd); error = EHAST_UNIMPLEMENTED; goto fail; } } } if (nv_error(nvout) != 0) goto close; fail: if (error != 0) nv_add_int16(nvout, error, "error"); if (hast_proto_send(NULL, conn, nvout, NULL, 0) < 0) pjdlog_errno(LOG_ERR, "Unable to send control response"); close: if (nvin != NULL) nv_free(nvin); if (nvout != NULL) nv_free(nvout); proto_close(conn); cfg->hc_controlin = NULL; } /* * Thread handles control requests from the parent. */ void * ctrl_thread(void *arg) { struct hast_resource *res = arg; struct nv *nvin, *nvout; uint8_t cmd; for (;;) { if (hast_proto_recv_hdr(res->hr_ctrl, &nvin) < 0) { if (sigexit_received) pthread_exit(NULL); pjdlog_errno(LOG_ERR, "Unable to receive control message"); kill(getpid(), SIGTERM); pthread_exit(NULL); } cmd = nv_get_uint8(nvin, "cmd"); if (cmd == 0) { pjdlog_error("Control message is missing 'cmd' field."); nv_free(nvin); continue; } nvout = nv_alloc(); switch (cmd) { - case HASTCTL_STATUS: + case CONTROL_STATUS: if (res->hr_remotein != NULL && res->hr_remoteout != NULL) { nv_add_string(nvout, "complete", "status"); } else { nv_add_string(nvout, "degraded", "status"); } nv_add_uint32(nvout, (uint32_t)res->hr_extentsize, "extentsize"); if (res->hr_role == HAST_ROLE_PRIMARY) { nv_add_uint32(nvout, (uint32_t)res->hr_keepdirty, "keepdirty"); nv_add_uint64(nvout, (uint64_t)(activemap_ndirty(res->hr_amp) * res->hr_extentsize), "dirty"); } else { nv_add_uint32(nvout, (uint32_t)0, "keepdirty"); nv_add_uint64(nvout, (uint64_t)0, "dirty"); } nv_add_int16(nvout, 0, "error"); break; - case HASTCTL_RELOAD: + case CONTROL_RELOAD: /* * When parent receives SIGHUP and discovers that * something related to us has changes, it sends reload * message to us. */ assert(res->hr_role == HAST_ROLE_PRIMARY); primary_config_reload(res, nvin); nv_add_int16(nvout, 0, "error"); break; default: nv_add_int16(nvout, EINVAL, "error"); break; } nv_free(nvin); if (nv_error(nvout) != 0) { pjdlog_error("Unable to create answer on control message."); nv_free(nvout); continue; } if (hast_proto_send(NULL, res->hr_ctrl, nvout, NULL, 0) < 0) { pjdlog_errno(LOG_ERR, "Unable to send reply to control message"); } nv_free(nvout); } /* NOTREACHED */ return (NULL); } Index: stable/8/sbin/hastd/control.h =================================================================== --- stable/8/sbin/hastd/control.h (revision 221506) +++ stable/8/sbin/hastd/control.h (revision 221507) @@ -1,50 +1,49 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * $FreeBSD$ */ #ifndef _CONTROL_H_ #define _CONTROL_H_ -#define HASTCTL_SET_ROLE 1 -#define HASTCTL_STATUS 2 -#define HASTCTL_RELOAD 3 +#define CONTROL_STATUS 10 +#define CONTROL_RELOAD 11 struct hastd_config; struct hast_resource; void child_cleanup(struct hast_resource *res); void control_set_role(struct hast_resource *res, uint8_t role); void control_handle(struct hastd_config *cfg); void *ctrl_thread(void *arg); #endif /* !_CONTROL_H_ */ Index: stable/8/sbin/hastd/hast_proto.c =================================================================== --- stable/8/sbin/hastd/hast_proto.c (revision 221506) +++ stable/8/sbin/hastd/hast_proto.c (revision 221507) @@ -1,221 +1,223 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * Copyright (c) 2011 Pawel Jakub Dawidek * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include #include #include #ifdef HAVE_CRYPTO #include "hast_checksum.h" #endif #include "hast_compression.h" #include "hast_proto.h" struct hast_main_header { /* Protocol version. */ uint8_t version; /* Size of nv headers. */ uint32_t size; } __packed; typedef int hps_send_t(const struct hast_resource *, struct nv *nv, void **, size_t *, bool *); typedef int hps_recv_t(const struct hast_resource *, struct nv *nv, void **, size_t *, bool *); struct hast_pipe_stage { const char *hps_name; hps_send_t *hps_send; hps_recv_t *hps_recv; }; static struct hast_pipe_stage pipeline[] = { { "compression", compression_send, compression_recv }, +#ifdef HAVE_CRYPTO { "checksum", checksum_send, checksum_recv } +#endif }; /* * Send the given nv structure via conn. * We keep headers in nv structure and pass data in separate argument. * There can be no data at all (data is NULL then). */ int hast_proto_send(const struct hast_resource *res, struct proto_conn *conn, struct nv *nv, const void *data, size_t size) { struct hast_main_header hdr; struct ebuf *eb; bool freedata; void *dptr, *hptr; size_t hsize; int ret; dptr = (void *)(uintptr_t)data; freedata = false; ret = -1; if (data != NULL) { unsigned int ii; for (ii = 0; ii < sizeof(pipeline) / sizeof(pipeline[0]); ii++) { (void)pipeline[ii].hps_send(res, nv, &dptr, &size, &freedata); } nv_add_uint32(nv, size, "size"); if (nv_error(nv) != 0) { errno = nv_error(nv); goto end; } } eb = nv_hton(nv); if (eb == NULL) goto end; hdr.version = HAST_PROTO_VERSION; hdr.size = htole32((uint32_t)ebuf_size(eb)); if (ebuf_add_head(eb, &hdr, sizeof(hdr)) < 0) goto end; hptr = ebuf_data(eb, &hsize); if (proto_send(conn, hptr, hsize) < 0) goto end; if (data != NULL && proto_send(conn, dptr, size) < 0) goto end; ret = 0; end: if (freedata) free(dptr); return (ret); } int hast_proto_recv_hdr(const struct proto_conn *conn, struct nv **nvp) { struct hast_main_header hdr; struct nv *nv; struct ebuf *eb; void *hptr; eb = NULL; nv = NULL; if (proto_recv(conn, &hdr, sizeof(hdr)) < 0) goto fail; if (hdr.version != HAST_PROTO_VERSION) { errno = ERPCMISMATCH; goto fail; } hdr.size = le32toh(hdr.size); eb = ebuf_alloc(hdr.size); if (eb == NULL) goto fail; if (ebuf_add_tail(eb, NULL, hdr.size) < 0) goto fail; hptr = ebuf_data(eb, NULL); assert(hptr != NULL); if (proto_recv(conn, hptr, hdr.size) < 0) goto fail; nv = nv_ntoh(eb); if (nv == NULL) goto fail; *nvp = nv; return (0); fail: if (eb != NULL) ebuf_free(eb); return (-1); } int hast_proto_recv_data(const struct hast_resource *res, struct proto_conn *conn, struct nv *nv, void *data, size_t size) { unsigned int ii; bool freedata; size_t dsize; void *dptr; int ret; assert(data != NULL); assert(size > 0); ret = -1; freedata = false; dptr = data; dsize = nv_get_uint32(nv, "size"); if (dsize > size) { errno = EINVAL; goto end; } else if (dsize == 0) { (void)nv_set_error(nv, 0); } else { if (proto_recv(conn, data, dsize) < 0) goto end; for (ii = sizeof(pipeline) / sizeof(pipeline[0]); ii > 0; ii--) { ret = pipeline[ii - 1].hps_recv(res, nv, &dptr, &dsize, &freedata); if (ret == -1) goto end; } ret = -1; if (dsize > size) { errno = EINVAL; goto end; } if (dptr != data) bcopy(dptr, data, dsize); } ret = 0; end: if (freedata) free(dptr); return (ret); } Index: stable/8/sbin/hastd/hastd.c =================================================================== --- stable/8/sbin/hastd/hastd.c (revision 221506) +++ stable/8/sbin/hastd/hastd.c (revision 221507) @@ -1,1166 +1,1166 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * Copyright (c) 2010-2011 Pawel Jakub Dawidek * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "control.h" #include "event.h" #include "hast.h" #include "hast_proto.h" #include "hastd.h" #include "hooks.h" #include "subr.h" /* Path to configuration file. */ const char *cfgpath = HAST_CONFIG; /* Hastd configuration. */ static struct hastd_config *cfg; /* Was SIGINT or SIGTERM signal received? */ bool sigexit_received = false; /* PID file handle. */ struct pidfh *pfh; /* How often check for hooks running for too long. */ #define REPORT_INTERVAL 5 static void usage(void) { errx(EX_USAGE, "[-dFh] [-c config] [-P pidfile]"); } static void g_gate_load(void) { if (modfind("g_gate") == -1) { /* Not present in kernel, try loading it. */ if (kldload("geom_gate") == -1 || modfind("g_gate") == -1) { if (errno != EEXIST) { pjdlog_exit(EX_OSERR, "Unable to load geom_gate module"); } } } } void descriptors_cleanup(struct hast_resource *res) { struct hast_resource *tres; TAILQ_FOREACH(tres, &cfg->hc_resources, hr_next) { if (tres == res) { PJDLOG_VERIFY(res->hr_role == HAST_ROLE_SECONDARY || (res->hr_remotein == NULL && res->hr_remoteout == NULL)); continue; } if (tres->hr_remotein != NULL) proto_close(tres->hr_remotein); if (tres->hr_remoteout != NULL) proto_close(tres->hr_remoteout); if (tres->hr_ctrl != NULL) proto_close(tres->hr_ctrl); if (tres->hr_event != NULL) proto_close(tres->hr_event); if (tres->hr_conn != NULL) proto_close(tres->hr_conn); } if (cfg->hc_controlin != NULL) proto_close(cfg->hc_controlin); proto_close(cfg->hc_controlconn); proto_close(cfg->hc_listenconn); (void)pidfile_close(pfh); hook_fini(); pjdlog_fini(); } static const char * dtype2str(mode_t mode) { if (S_ISBLK(mode)) return ("block device"); else if (S_ISCHR(mode)) return ("character device"); else if (S_ISDIR(mode)) return ("directory"); else if (S_ISFIFO(mode)) return ("pipe or FIFO"); else if (S_ISLNK(mode)) return ("symbolic link"); else if (S_ISREG(mode)) return ("regular file"); else if (S_ISSOCK(mode)) return ("socket"); else if (S_ISWHT(mode)) return ("whiteout"); else return ("unknown"); } void descriptors_assert(const struct hast_resource *res, int pjdlogmode) { char msg[256]; struct stat sb; long maxfd; bool isopen; mode_t mode; int fd; /* * At this point descriptor to syslog socket is closed, so if we want * to log assertion message, we have to first store it in 'msg' local * buffer and then open syslog socket and log it. */ msg[0] = '\0'; maxfd = sysconf(_SC_OPEN_MAX); if (maxfd < 0) { pjdlog_init(pjdlogmode); pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); pjdlog_errno(LOG_WARNING, "sysconf(_SC_OPEN_MAX) failed"); pjdlog_fini(); maxfd = 16384; } for (fd = 0; fd <= maxfd; fd++) { if (fstat(fd, &sb) == 0) { isopen = true; mode = sb.st_mode; } else if (errno == EBADF) { isopen = false; mode = 0; } else { (void)snprintf(msg, sizeof(msg), "Unable to fstat descriptor %d: %s", fd, strerror(errno)); break; } if (fd == STDIN_FILENO || fd == STDOUT_FILENO || fd == STDERR_FILENO) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (%s) is closed, but should be open.", fd, (fd == STDIN_FILENO ? "stdin" : (fd == STDOUT_FILENO ? "stdout" : "stderr"))); break; } } else if (fd == proto_descriptor(res->hr_event)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (event) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (event) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else if (fd == proto_descriptor(res->hr_ctrl)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (ctrl) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (ctrl) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else if (res->hr_role == HAST_ROLE_PRIMARY && fd == proto_descriptor(res->hr_conn)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (conn) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (conn) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else if (res->hr_role == HAST_ROLE_SECONDARY && res->hr_conn != NULL && fd == proto_descriptor(res->hr_conn)) { if (isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (conn) is open, but should be closed.", fd); break; } } else if (res->hr_role == HAST_ROLE_SECONDARY && fd == proto_descriptor(res->hr_remotein)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (remote in) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (remote in) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else if (res->hr_role == HAST_ROLE_SECONDARY && fd == proto_descriptor(res->hr_remoteout)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (remote out) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (remote out) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else { if (isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d is open (%s), but should be closed.", fd, dtype2str(mode)); break; } } } if (msg[0] != '\0') { pjdlog_init(pjdlogmode); pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); PJDLOG_ABORT("%s", msg); } } static void child_exit_log(unsigned int pid, int status) { if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { pjdlog_debug(1, "Worker process exited gracefully (pid=%u).", pid); } else if (WIFSIGNALED(status)) { pjdlog_error("Worker process killed (pid=%u, signal=%d).", pid, WTERMSIG(status)); } else { pjdlog_error("Worker process exited ungracefully (pid=%u, exitcode=%d).", pid, WIFEXITED(status) ? WEXITSTATUS(status) : -1); } } static void child_exit(void) { struct hast_resource *res; int status; pid_t pid; while ((pid = wait3(&status, WNOHANG, NULL)) > 0) { /* Find resource related to the process that just exited. */ TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (pid == res->hr_workerpid) break; } if (res == NULL) { /* * This can happen when new connection arrives and we * cancel child responsible for the old one or if this * was hook which we executed. */ hook_check_one(pid, status); continue; } pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); child_exit_log(pid, status); child_cleanup(res); if (res->hr_role == HAST_ROLE_PRIMARY) { /* * Restart child process if it was killed by signal * or exited because of temporary problem. */ if (WIFSIGNALED(status) || (WIFEXITED(status) && WEXITSTATUS(status) == EX_TEMPFAIL)) { sleep(1); pjdlog_info("Restarting worker process."); hastd_primary(res); } else { res->hr_role = HAST_ROLE_INIT; pjdlog_info("Changing resource role back to %s.", role2str(res->hr_role)); } } pjdlog_prefix_set("%s", ""); } } static bool resource_needs_restart(const struct hast_resource *res0, const struct hast_resource *res1) { PJDLOG_ASSERT(strcmp(res0->hr_name, res1->hr_name) == 0); if (strcmp(res0->hr_provname, res1->hr_provname) != 0) return (true); if (strcmp(res0->hr_localpath, res1->hr_localpath) != 0) return (true); if (res0->hr_role == HAST_ROLE_INIT || res0->hr_role == HAST_ROLE_SECONDARY) { if (strcmp(res0->hr_remoteaddr, res1->hr_remoteaddr) != 0) return (true); if (strcmp(res0->hr_sourceaddr, res1->hr_sourceaddr) != 0) return (true); if (res0->hr_replication != res1->hr_replication) return (true); if (res0->hr_checksum != res1->hr_checksum) return (true); if (res0->hr_compression != res1->hr_compression) return (true); if (res0->hr_timeout != res1->hr_timeout) return (true); if (strcmp(res0->hr_exec, res1->hr_exec) != 0) return (true); } return (false); } static bool resource_needs_reload(const struct hast_resource *res0, const struct hast_resource *res1) { PJDLOG_ASSERT(strcmp(res0->hr_name, res1->hr_name) == 0); PJDLOG_ASSERT(strcmp(res0->hr_provname, res1->hr_provname) == 0); PJDLOG_ASSERT(strcmp(res0->hr_localpath, res1->hr_localpath) == 0); if (res0->hr_role != HAST_ROLE_PRIMARY) return (false); if (strcmp(res0->hr_remoteaddr, res1->hr_remoteaddr) != 0) return (true); if (strcmp(res0->hr_sourceaddr, res1->hr_sourceaddr) != 0) return (true); if (res0->hr_replication != res1->hr_replication) return (true); if (res0->hr_checksum != res1->hr_checksum) return (true); if (res0->hr_compression != res1->hr_compression) return (true); if (res0->hr_timeout != res1->hr_timeout) return (true); if (strcmp(res0->hr_exec, res1->hr_exec) != 0) return (true); return (false); } static void resource_reload(const struct hast_resource *res) { struct nv *nvin, *nvout; int error; PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); nvout = nv_alloc(); - nv_add_uint8(nvout, HASTCTL_RELOAD, "cmd"); + nv_add_uint8(nvout, CONTROL_RELOAD, "cmd"); nv_add_string(nvout, res->hr_remoteaddr, "remoteaddr"); nv_add_string(nvout, res->hr_sourceaddr, "sourceaddr"); nv_add_int32(nvout, (int32_t)res->hr_replication, "replication"); nv_add_int32(nvout, (int32_t)res->hr_checksum, "checksum"); nv_add_int32(nvout, (int32_t)res->hr_compression, "compression"); nv_add_int32(nvout, (int32_t)res->hr_timeout, "timeout"); nv_add_string(nvout, res->hr_exec, "exec"); if (nv_error(nvout) != 0) { nv_free(nvout); pjdlog_error("Unable to allocate header for reload message."); return; } if (hast_proto_send(res, res->hr_ctrl, nvout, NULL, 0) < 0) { pjdlog_errno(LOG_ERR, "Unable to send reload message"); nv_free(nvout); return; } nv_free(nvout); /* Receive response. */ if (hast_proto_recv_hdr(res->hr_ctrl, &nvin) < 0) { pjdlog_errno(LOG_ERR, "Unable to receive reload reply"); return; } error = nv_get_int16(nvin, "error"); nv_free(nvin); if (error != 0) { pjdlog_common(LOG_ERR, 0, error, "Reload failed"); return; } } static void hastd_reload(void) { struct hastd_config *newcfg; struct hast_resource *nres, *cres, *tres; uint8_t role; pjdlog_info("Reloading configuration..."); newcfg = yy_config_parse(cfgpath, false); if (newcfg == NULL) goto failed; /* * Check if control address has changed. */ if (strcmp(cfg->hc_controladdr, newcfg->hc_controladdr) != 0) { if (proto_server(newcfg->hc_controladdr, &newcfg->hc_controlconn) < 0) { pjdlog_errno(LOG_ERR, "Unable to listen on control address %s", newcfg->hc_controladdr); goto failed; } } /* * Check if listen address has changed. */ if (strcmp(cfg->hc_listenaddr, newcfg->hc_listenaddr) != 0) { if (proto_server(newcfg->hc_listenaddr, &newcfg->hc_listenconn) < 0) { pjdlog_errno(LOG_ERR, "Unable to listen on address %s", newcfg->hc_listenaddr); goto failed; } } /* * Only when both control and listen sockets are successfully * initialized switch them to new configuration. */ if (newcfg->hc_controlconn != NULL) { pjdlog_info("Control socket changed from %s to %s.", cfg->hc_controladdr, newcfg->hc_controladdr); proto_close(cfg->hc_controlconn); cfg->hc_controlconn = newcfg->hc_controlconn; newcfg->hc_controlconn = NULL; strlcpy(cfg->hc_controladdr, newcfg->hc_controladdr, sizeof(cfg->hc_controladdr)); } if (newcfg->hc_listenconn != NULL) { pjdlog_info("Listen socket changed from %s to %s.", cfg->hc_listenaddr, newcfg->hc_listenaddr); proto_close(cfg->hc_listenconn); cfg->hc_listenconn = newcfg->hc_listenconn; newcfg->hc_listenconn = NULL; strlcpy(cfg->hc_listenaddr, newcfg->hc_listenaddr, sizeof(cfg->hc_listenaddr)); } /* * Stop and remove resources that were removed from the configuration. */ TAILQ_FOREACH_SAFE(cres, &cfg->hc_resources, hr_next, tres) { TAILQ_FOREACH(nres, &newcfg->hc_resources, hr_next) { if (strcmp(cres->hr_name, nres->hr_name) == 0) break; } if (nres == NULL) { control_set_role(cres, HAST_ROLE_INIT); TAILQ_REMOVE(&cfg->hc_resources, cres, hr_next); pjdlog_info("Resource %s removed.", cres->hr_name); free(cres); } } /* * Move new resources to the current configuration. */ TAILQ_FOREACH_SAFE(nres, &newcfg->hc_resources, hr_next, tres) { TAILQ_FOREACH(cres, &cfg->hc_resources, hr_next) { if (strcmp(cres->hr_name, nres->hr_name) == 0) break; } if (cres == NULL) { TAILQ_REMOVE(&newcfg->hc_resources, nres, hr_next); TAILQ_INSERT_TAIL(&cfg->hc_resources, nres, hr_next); pjdlog_info("Resource %s added.", nres->hr_name); } } /* * Deal with modified resources. * Depending on what has changed exactly we might want to perform * different actions. * * We do full resource restart in the following situations: * Resource role is INIT or SECONDARY. * Resource role is PRIMARY and path to local component or provider * name has changed. * In case of PRIMARY, the worker process will be killed and restarted, * which also means removing /dev/hast/ provider and * recreating it. * * We do just reload (send SIGHUP to worker process) if we act as * PRIMARY, but only if remote address, replication mode, timeout or * execution path has changed. For those, there is no need to restart * worker process. * If PRIMARY receives SIGHUP, it will reconnect if remote address or * replication mode has changed or simply set new timeout if only * timeout has changed. */ TAILQ_FOREACH_SAFE(nres, &newcfg->hc_resources, hr_next, tres) { TAILQ_FOREACH(cres, &cfg->hc_resources, hr_next) { if (strcmp(cres->hr_name, nres->hr_name) == 0) break; } PJDLOG_ASSERT(cres != NULL); if (resource_needs_restart(cres, nres)) { pjdlog_info("Resource %s configuration was modified, restarting it.", cres->hr_name); role = cres->hr_role; control_set_role(cres, HAST_ROLE_INIT); TAILQ_REMOVE(&cfg->hc_resources, cres, hr_next); free(cres); TAILQ_REMOVE(&newcfg->hc_resources, nres, hr_next); TAILQ_INSERT_TAIL(&cfg->hc_resources, nres, hr_next); control_set_role(nres, role); } else if (resource_needs_reload(cres, nres)) { pjdlog_info("Resource %s configuration was modified, reloading it.", cres->hr_name); strlcpy(cres->hr_remoteaddr, nres->hr_remoteaddr, sizeof(cres->hr_remoteaddr)); strlcpy(cres->hr_sourceaddr, nres->hr_sourceaddr, sizeof(cres->hr_sourceaddr)); cres->hr_replication = nres->hr_replication; cres->hr_checksum = nres->hr_checksum; cres->hr_compression = nres->hr_compression; cres->hr_timeout = nres->hr_timeout; strlcpy(cres->hr_exec, nres->hr_exec, sizeof(cres->hr_exec)); if (cres->hr_workerpid != 0) resource_reload(cres); } } yy_config_free(newcfg); pjdlog_info("Configuration reloaded successfully."); return; failed: if (newcfg != NULL) { if (newcfg->hc_controlconn != NULL) proto_close(newcfg->hc_controlconn); if (newcfg->hc_listenconn != NULL) proto_close(newcfg->hc_listenconn); yy_config_free(newcfg); } pjdlog_warning("Configuration not reloaded."); } static void terminate_workers(void) { struct hast_resource *res; pjdlog_info("Termination signal received, exiting."); TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (res->hr_workerpid == 0) continue; pjdlog_info("Terminating worker process (resource=%s, role=%s, pid=%u).", res->hr_name, role2str(res->hr_role), res->hr_workerpid); if (kill(res->hr_workerpid, SIGTERM) == 0) continue; pjdlog_errno(LOG_WARNING, "Unable to send signal to worker process (resource=%s, role=%s, pid=%u).", res->hr_name, role2str(res->hr_role), res->hr_workerpid); } } static void listen_accept(void) { struct hast_resource *res; struct proto_conn *conn; struct nv *nvin, *nvout, *nverr; const char *resname; const unsigned char *token; char laddr[256], raddr[256]; size_t size; pid_t pid; int status; proto_local_address(cfg->hc_listenconn, laddr, sizeof(laddr)); pjdlog_debug(1, "Accepting connection to %s.", laddr); if (proto_accept(cfg->hc_listenconn, &conn) < 0) { pjdlog_errno(LOG_ERR, "Unable to accept connection %s", laddr); return; } proto_local_address(conn, laddr, sizeof(laddr)); proto_remote_address(conn, raddr, sizeof(raddr)); pjdlog_info("Connection from %s to %s.", raddr, laddr); /* Error in setting timeout is not critical, but why should it fail? */ if (proto_timeout(conn, HAST_TIMEOUT) < 0) pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); nvin = nvout = nverr = NULL; /* * Before receiving any data see if remote host have access to any * resource. */ TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (proto_address_match(conn, res->hr_remoteaddr)) break; } if (res == NULL) { pjdlog_error("Client %s isn't known.", raddr); goto close; } /* Ok, remote host can access at least one resource. */ if (hast_proto_recv_hdr(conn, &nvin) < 0) { pjdlog_errno(LOG_ERR, "Unable to receive header from %s", raddr); goto close; } resname = nv_get_string(nvin, "resource"); if (resname == NULL) { pjdlog_error("No 'resource' field in the header received from %s.", raddr); goto close; } pjdlog_debug(2, "%s: resource=%s", raddr, resname); token = nv_get_uint8_array(nvin, &size, "token"); /* * NULL token means that this is first conection. */ if (token != NULL && size != sizeof(res->hr_token)) { pjdlog_error("Received token of invalid size from %s (expected %zu, got %zu).", raddr, sizeof(res->hr_token), size); goto close; } /* * From now on we want to send errors to the remote node. */ nverr = nv_alloc(); /* Find resource related to this connection. */ TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (strcmp(resname, res->hr_name) == 0) break; } /* Have we found the resource? */ if (res == NULL) { pjdlog_error("No resource '%s' as requested by %s.", resname, raddr); nv_add_stringf(nverr, "errmsg", "Resource not configured."); goto fail; } /* Now that we know resource name setup log prefix. */ pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); /* Does the remote host have access to this resource? */ if (!proto_address_match(conn, res->hr_remoteaddr)) { pjdlog_error("Client %s has no access to the resource.", raddr); nv_add_stringf(nverr, "errmsg", "No access to the resource."); goto fail; } /* Is the resource marked as secondary? */ if (res->hr_role != HAST_ROLE_SECONDARY) { pjdlog_warning("We act as %s for the resource and not as %s as requested by %s.", role2str(res->hr_role), role2str(HAST_ROLE_SECONDARY), raddr); nv_add_stringf(nverr, "errmsg", "Remote node acts as %s for the resource and not as %s.", role2str(res->hr_role), role2str(HAST_ROLE_SECONDARY)); if (res->hr_role == HAST_ROLE_PRIMARY) { /* * If we act as primary request the other side to wait * for us a bit, as we might be finishing cleanups. */ nv_add_uint8(nverr, 1, "wait"); } goto fail; } /* Does token (if exists) match? */ if (token != NULL && memcmp(token, res->hr_token, sizeof(res->hr_token)) != 0) { pjdlog_error("Token received from %s doesn't match.", raddr); nv_add_stringf(nverr, "errmsg", "Token doesn't match."); goto fail; } /* * If there is no token, but we have half-open connection * (only remotein) or full connection (worker process is running) * we have to cancel those and accept the new connection. */ if (token == NULL) { PJDLOG_ASSERT(res->hr_remoteout == NULL); pjdlog_debug(1, "Initial connection from %s.", raddr); if (res->hr_workerpid != 0) { PJDLOG_ASSERT(res->hr_remotein == NULL); pjdlog_debug(1, "Worker process exists (pid=%u), stopping it.", (unsigned int)res->hr_workerpid); /* Stop child process. */ if (kill(res->hr_workerpid, SIGINT) < 0) { pjdlog_errno(LOG_ERR, "Unable to stop worker process (pid=%u)", (unsigned int)res->hr_workerpid); /* * Other than logging the problem we * ignore it - nothing smart to do. */ } /* Wait for it to exit. */ else if ((pid = waitpid(res->hr_workerpid, &status, 0)) != res->hr_workerpid) { /* We can only log the problem. */ pjdlog_errno(LOG_ERR, "Waiting for worker process (pid=%u) failed", (unsigned int)res->hr_workerpid); } else { child_exit_log(res->hr_workerpid, status); } child_cleanup(res); } else if (res->hr_remotein != NULL) { char oaddr[256]; proto_remote_address(res->hr_remotein, oaddr, sizeof(oaddr)); pjdlog_debug(1, "Canceling half-open connection from %s on connection from %s.", oaddr, raddr); proto_close(res->hr_remotein); res->hr_remotein = NULL; } } /* * Checks and cleanups are done. */ if (token == NULL) { arc4random_buf(res->hr_token, sizeof(res->hr_token)); nvout = nv_alloc(); nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), "token"); if (nv_error(nvout) != 0) { pjdlog_common(LOG_ERR, 0, nv_error(nvout), "Unable to prepare return header for %s", raddr); nv_add_stringf(nverr, "errmsg", "Remote node was unable to prepare return header: %s.", strerror(nv_error(nvout))); goto fail; } if (hast_proto_send(NULL, conn, nvout, NULL, 0) < 0) { int error = errno; pjdlog_errno(LOG_ERR, "Unable to send response to %s", raddr); nv_add_stringf(nverr, "errmsg", "Remote node was unable to send response: %s.", strerror(error)); goto fail; } res->hr_remotein = conn; pjdlog_debug(1, "Incoming connection from %s configured.", raddr); } else { res->hr_remoteout = conn; pjdlog_debug(1, "Outgoing connection to %s configured.", raddr); hastd_secondary(res, nvin); } nv_free(nvin); nv_free(nvout); nv_free(nverr); pjdlog_prefix_set("%s", ""); return; fail: if (nv_error(nverr) != 0) { pjdlog_common(LOG_ERR, 0, nv_error(nverr), "Unable to prepare error header for %s", raddr); goto close; } if (hast_proto_send(NULL, conn, nverr, NULL, 0) < 0) { pjdlog_errno(LOG_ERR, "Unable to send error to %s", raddr); goto close; } close: if (nvin != NULL) nv_free(nvin); if (nvout != NULL) nv_free(nvout); if (nverr != NULL) nv_free(nverr); proto_close(conn); pjdlog_prefix_set("%s", ""); } static void connection_migrate(struct hast_resource *res) { struct proto_conn *conn; int16_t val = 0; pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); if (proto_recv(res->hr_conn, &val, sizeof(val)) < 0) { pjdlog_errno(LOG_WARNING, "Unable to receive connection command"); return; } if (proto_client(res->hr_sourceaddr[0] != '\0' ? res->hr_sourceaddr : NULL, res->hr_remoteaddr, &conn) < 0) { val = errno; pjdlog_errno(LOG_WARNING, "Unable to create outgoing connection to %s", res->hr_remoteaddr); goto out; } if (proto_connect(conn, -1) < 0) { val = errno; pjdlog_errno(LOG_WARNING, "Unable to connect to %s", res->hr_remoteaddr); proto_close(conn); goto out; } val = 0; out: if (proto_send(res->hr_conn, &val, sizeof(val)) < 0) { pjdlog_errno(LOG_WARNING, "Unable to send reply to connection request"); } if (val == 0 && proto_connection_send(res->hr_conn, conn) < 0) pjdlog_errno(LOG_WARNING, "Unable to send connection"); pjdlog_prefix_set("%s", ""); } static void check_signals(void) { struct timespec sigtimeout; sigset_t mask; int signo; sigtimeout.tv_sec = 0; sigtimeout.tv_nsec = 0; PJDLOG_VERIFY(sigemptyset(&mask) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGCHLD) == 0); while ((signo = sigtimedwait(&mask, NULL, &sigtimeout)) != -1) { switch (signo) { case SIGINT: case SIGTERM: sigexit_received = true; terminate_workers(); proto_close(cfg->hc_controlconn); exit(EX_OK); break; case SIGCHLD: child_exit(); break; case SIGHUP: hastd_reload(); break; default: PJDLOG_ABORT("Unexpected signal (%d).", signo); } } } static void main_loop(void) { struct hast_resource *res; struct timeval seltimeout; int fd, maxfd, ret; time_t lastcheck, now; fd_set rfds; lastcheck = time(NULL); seltimeout.tv_sec = REPORT_INTERVAL; seltimeout.tv_usec = 0; pjdlog_info("Started successfully, running protocol version %d.", HAST_PROTO_VERSION); for (;;) { check_signals(); /* Setup descriptors for select(2). */ FD_ZERO(&rfds); maxfd = fd = proto_descriptor(cfg->hc_controlconn); PJDLOG_ASSERT(fd >= 0); FD_SET(fd, &rfds); fd = proto_descriptor(cfg->hc_listenconn); PJDLOG_ASSERT(fd >= 0); FD_SET(fd, &rfds); maxfd = fd > maxfd ? fd : maxfd; TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (res->hr_event == NULL) continue; fd = proto_descriptor(res->hr_event); PJDLOG_ASSERT(fd >= 0); FD_SET(fd, &rfds); maxfd = fd > maxfd ? fd : maxfd; if (res->hr_role == HAST_ROLE_PRIMARY) { /* Only primary workers asks for connections. */ PJDLOG_ASSERT(res->hr_conn != NULL); fd = proto_descriptor(res->hr_conn); PJDLOG_ASSERT(fd >= 0); FD_SET(fd, &rfds); maxfd = fd > maxfd ? fd : maxfd; } else { PJDLOG_ASSERT(res->hr_conn == NULL); } } PJDLOG_ASSERT(maxfd + 1 <= (int)FD_SETSIZE); ret = select(maxfd + 1, &rfds, NULL, NULL, &seltimeout); now = time(NULL); if (lastcheck + REPORT_INTERVAL <= now) { hook_check(); lastcheck = now; } if (ret == 0) { /* * select(2) timed out, so there should be no * descriptors to check. */ continue; } else if (ret == -1) { if (errno == EINTR) continue; KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "select() failed"); } /* * Check for signals before we do anything to update our * info about terminated workers in the meantime. */ check_signals(); if (FD_ISSET(proto_descriptor(cfg->hc_controlconn), &rfds)) control_handle(cfg); if (FD_ISSET(proto_descriptor(cfg->hc_listenconn), &rfds)) listen_accept(); TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (res->hr_event == NULL) continue; if (FD_ISSET(proto_descriptor(res->hr_event), &rfds)) { if (event_recv(res) == 0) continue; /* The worker process exited? */ proto_close(res->hr_event); res->hr_event = NULL; if (res->hr_conn != NULL) { proto_close(res->hr_conn); res->hr_conn = NULL; } continue; } if (res->hr_role == HAST_ROLE_PRIMARY) { PJDLOG_ASSERT(res->hr_conn != NULL); if (FD_ISSET(proto_descriptor(res->hr_conn), &rfds)) { connection_migrate(res); } } else { PJDLOG_ASSERT(res->hr_conn == NULL); } } } } static void dummy_sighandler(int sig __unused) { /* Nothing to do. */ } int main(int argc, char *argv[]) { const char *pidfile; pid_t otherpid; bool foreground; int debuglevel; sigset_t mask; foreground = false; debuglevel = 0; pidfile = HASTD_PIDFILE; for (;;) { int ch; ch = getopt(argc, argv, "c:dFhP:"); if (ch == -1) break; switch (ch) { case 'c': cfgpath = optarg; break; case 'd': debuglevel++; break; case 'F': foreground = true; break; case 'P': pidfile = optarg; break; case 'h': default: usage(); } } argc -= optind; argv += optind; pjdlog_init(PJDLOG_MODE_STD); pjdlog_debug_set(debuglevel); g_gate_load(); pfh = pidfile_open(pidfile, 0600, &otherpid); if (pfh == NULL) { if (errno == EEXIST) { pjdlog_exitx(EX_TEMPFAIL, "Another hastd is already running, pid: %jd.", (intmax_t)otherpid); } /* If we cannot create pidfile from other reasons, only warn. */ pjdlog_errno(LOG_WARNING, "Unable to open or create pidfile"); } cfg = yy_config_parse(cfgpath, true); PJDLOG_ASSERT(cfg != NULL); /* * Restore default actions for interesting signals in case parent * process (like init(8)) decided to ignore some of them (like SIGHUP). */ PJDLOG_VERIFY(signal(SIGHUP, SIG_DFL) != SIG_ERR); PJDLOG_VERIFY(signal(SIGINT, SIG_DFL) != SIG_ERR); PJDLOG_VERIFY(signal(SIGTERM, SIG_DFL) != SIG_ERR); /* * Because SIGCHLD is ignored by default, setup dummy handler for it, * so we can mask it. */ PJDLOG_VERIFY(signal(SIGCHLD, dummy_sighandler) != SIG_ERR); PJDLOG_VERIFY(sigemptyset(&mask) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGCHLD) == 0); PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); /* Listen on control address. */ if (proto_server(cfg->hc_controladdr, &cfg->hc_controlconn) < 0) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to listen on control address %s", cfg->hc_controladdr); } /* Listen for remote connections. */ if (proto_server(cfg->hc_listenaddr, &cfg->hc_listenconn) < 0) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to listen on address %s", cfg->hc_listenaddr); } if (!foreground) { if (daemon(0, 0) < 0) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to daemonize"); } /* Start logging to syslog. */ pjdlog_mode_set(PJDLOG_MODE_SYSLOG); /* Write PID to a file. */ if (pidfile_write(pfh) < 0) { pjdlog_errno(LOG_WARNING, "Unable to write PID to a file"); } } hook_init(); main_loop(); exit(0); } Index: stable/8/sbin/hastd =================================================================== --- stable/8/sbin/hastd (revision 221506) +++ stable/8/sbin/hastd (revision 221507) Property changes on: stable/8/sbin/hastd ___________________________________________________________________ Modified: svn:mergeinfo ## -0,0 +0,1 ## Merged /head/sbin/hastd:r221075-221076,221078