diff --git a/sbin/hastd/hast.conf.5 b/sbin/hastd/hast.conf.5 index f6368bcf913e..3d921e46c039 100644 --- a/sbin/hastd/hast.conf.5 +++ b/sbin/hastd/hast.conf.5 @@ -1,448 +1,449 @@ .\" Copyright (c) 2010 The FreeBSD Foundation .\" Copyright (c) 2010-2012 Pawel Jakub Dawidek .\" All rights reserved. .\" .\" This documentation was written by Pawel Jakub Dawidek under sponsorship from .\" the FreeBSD Foundation. .\" .\" Redistribution and use in source and binary forms, with or without .\" modification, are permitted provided that the following conditions .\" are met: .\" 1. Redistributions of source code must retain the above copyright .\" notice, this list of conditions and the following disclaimer. .\" 2. Redistributions in binary form must reproduce the above copyright .\" notice, this list of conditions and the following disclaimer in the .\" documentation and/or other materials provided with the distribution. .\" .\" THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND .\" ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE .\" IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE .\" ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE .\" FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL .\" DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS .\" OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) .\" HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT .\" LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY .\" OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF .\" SUCH DAMAGE. .\" .\" $FreeBSD$ .\" .Dd January 25, 2012 .Dt HAST.CONF 5 .Os .Sh NAME .Nm hast.conf .Nd configuration file for the .Xr hastd 8 daemon and the .Xr hastctl 8 utility .Sh DESCRIPTION The .Nm file is used by both .Xr hastd 8 daemon and .Xr hastctl 8 control utility. 
Configuration file is designed in a way that exactly the same file can be (and should be) used on both HAST nodes. Every line starting with # is treated as comment and ignored. .Sh CONFIGURATION FILE SYNTAX General syntax of the .Nm file is following: .Bd -literal -offset indent # Global section control listen replication checksum compression timeout exec metaflush on | off pidfile on { # Node section control listen pidfile } on { # Node section control listen pidfile } resource { # Resource section replication checksum compression name local timeout exec metaflush on | off on { # Resource-node section name # Required local metaflush on | off # Required remote source } on { # Resource-node section name # Required local metaflush on | off # Required remote source } } .Ed .Pp Most of the various available configuration parameters are optional. If parameter is not defined in the particular section, it will be inherited from the parent section. For example, if the .Ic listen parameter is not defined in the node section, it will be inherited from the global section. In case the global section does not define the .Ic listen parameter at all, the default value will be used. .Sh CONFIGURATION FILE DESCRIPTION The .Aq node argument can be replaced either by a full hostname as obtained by .Xr gethostname 3 , -only first part of the hostname, or by node's UUID as found in the +only first part of the hostname, by node's UUID as found in the .Va kern.hostuuid .Xr sysctl 8 +variable +or by node's hostid as found in the +.Va kern.hostid +.Xr sysctl 8 variable. .Pp The following statements are available: .Bl -tag -width ".Ic xxxx" .It Ic control Aq addr .Pp Address for communication with .Xr hastctl 8 . Each of the following examples defines the same control address: .Bd -literal -offset indent uds:///var/run/hastctl unix:///var/run/hastctl /var/run/hastctl .Ed .Pp The default value is .Pa uds:///var/run/hastctl . 
.It Ic pidfile Aq path .Pp File in which to store the process ID of the main .Xr hastd 8 process. .Pp The default value is .Pa /var/run/hastd.pid . .It Ic listen Aq addr .Pp Address to listen on in form of: .Bd -literal -offset indent protocol://protocol-specific-address .Ed .Pp Each of the following examples defines the same listen address: .Bd -literal -offset indent 0.0.0.0 0.0.0.0:8457 tcp://0.0.0.0 tcp://0.0.0.0:8457 tcp4://0.0.0.0 tcp4://0.0.0.0:8457 .Ed .Pp Multiple listen addresses can be specified. By default .Nm hastd listens on .Pa tcp4://0.0.0.0:8457 and .Pa tcp6://[::]:8457 if kernel supports IPv4 and IPv6 respectively. .It Ic replication Aq mode .Pp Replication mode should be one of the following: .Bl -tag -width ".Ic xxxx" .It Ic memsync .Pp Report the write operation as completed when local write completes and when the remote node acknowledges the data receipt, but before it actually stores the data. The data on remote node will be stored directly after sending acknowledgement. This mode is intended to reduce latency, but still provides a very good reliability. The only situation where some small amount of data could be lost is when the data is stored on primary node and sent to the secondary. Secondary node then acknowledges data receipt and primary reports success to an application. However, it may happen that the secondary goes down before the received data is really stored locally. Before secondary node returns, primary node dies entirely. When the secondary node comes back to life it becomes the new primary. Unfortunately some small amount of data which was confirmed to be stored to the application was lost. The risk of such a situation is very small. The .Ic memsync -replication mode is currently not implemented. +replication mode is the default. .It Ic fullsync .Pp Mark the write operation as completed when local as well as remote write completes. This is the safest and the slowest replication mode. 
-The -.Ic fullsync -replication mode is the default. .It Ic async .Pp The write operation is reported as complete right after the local write completes. This is the fastest and the most dangerous replication mode. This mode should be used when replicating to a distant node where latency is too high for other modes. .El .It Ic checksum Aq algorithm .Pp Checksum algorithm should be one of the following: .Bl -tag -width ".Ic sha256" .It Ic none No checksum will be calculated for the data being send over the network. This is the default setting. .It Ic crc32 CRC32 checksum will be calculated. .It Ic sha256 SHA256 checksum will be calculated. .El .It Ic compression Aq algorithm .Pp Compression algorithm should be one of the following: .Bl -tag -width ".Ic none" .It Ic none Data send over the network will not be compressed. .It Ic hole Only blocks that contain all zeros will be compressed. This is very useful for initial synchronization where potentially many blocks are still all zeros. There should be no measurable performance overhead when this algorithm is being used. This is the default setting. .It Ic lzf The LZF algorithm by Marc Alexander Lehmann will be used to compress the data send over the network. LZF is very fast, general purpose compression algorithm. .El .It Ic timeout Aq seconds .Pp Connection timeout in seconds. The default value is .Va 20 . .It Ic exec Aq path .Pp Execute the given program on various HAST events. Below is the list of currently implemented events and arguments the given program is executed with: .Bl -tag -width ".Ic xxxx" .It Ic " role " .Pp Executed on both primary and secondary nodes when resource role is changed. .Pp .It Ic " connect " .Pp Executed on both primary and secondary nodes when connection for the given resource between the nodes is established. .Pp .It Ic " disconnect " .Pp Executed on both primary and secondary nodes when connection for the given resource between the nodes is lost. 
.Pp .It Ic " syncstart " .Pp Executed on primary node when synchronization process of secondary node is started. .Pp .It Ic " syncdone " .Pp Executed on primary node when synchronization process of secondary node is completed successfully. .Pp .It Ic " syncintr " .Pp Executed on primary node when synchronization process of secondary node is interrupted, most likely due to secondary node outage or connection failure between the nodes. .Pp .It Ic " split-brain " .Pp Executed on both primary and secondary nodes when split-brain condition is detected. .Pp .El The .Aq path argument should contain full path to executable program. If the given program exits with code different than .Va 0 , .Nm hastd will log it as an error. .Pp The .Aq resource argument is resource name from the configuration file. .Pp The .Aq oldrole argument is previous resource role (before the change). It can be one of: .Ar init , .Ar secondary , .Ar primary . .Pp The .Aq newrole argument is current resource role (after the change). It can be one of: .Ar init , .Ar secondary , .Ar primary . .Pp .It Ic metaflush on | off .Pp When set to .Va on , flush write cache of the local provider after every metadata (activemap) update. Flushing write cache ensures that provider will not reorder writes and that metadata will be properly updated before real data is stored. If the local provider does not support flushing write cache (it returns .Er EOPNOTSUPP on the .Cm BIO_FLUSH request), .Nm hastd will disable .Ic metaflush automatically. The default value is .Va on . .Pp .It Ic name Aq name .Pp GEOM provider name that will appear as .Pa /dev/hast/ . If name is not defined, resource name will be used as provider name. .It Ic local Aq path .Pp Path to the local component which will be used as backend provider for the resource. This can be either GEOM provider or regular file. .It Ic remote Aq addr .Pp Address of the remote .Nm hastd daemon. Format is the same as for the .Ic listen statement. 
When operating as a primary node this address will be used to connect to the secondary node. When operating as a secondary node only connections from this address will be accepted. .Pp A special value of .Va none can be used when the remote address is not yet known (eg. the other node is not set up yet). .It Ic source Aq addr .Pp Local address to bind to before connecting to the remote .Nm hastd daemon. Format is the same as for the .Ic listen statement. .El .Sh FILES .Bl -tag -width ".Pa /var/run/hastctl" -compact .It Pa /etc/hast.conf The default .Xr hastctl 8 and .Xr hastd 8 configuration file. .It Pa /var/run/hastctl Control socket used by the .Xr hastctl 8 control utility to communicate with the .Xr hastd 8 daemon. .El .Sh EXAMPLES The example configuration file can look as follows: .Bd -literal -offset indent listen tcp://0.0.0.0 on hasta { listen tcp://2001:db8::1/64 } on hastb { listen tcp://2001:db8::2/64 } resource shared { local /dev/da0 on hasta { remote tcp://10.0.0.2 } on hastb { remote tcp://10.0.0.1 } } resource tank { on hasta { local /dev/mirror/tanka source tcp://10.0.0.1 remote tcp://10.0.0.2 } on hastb { local /dev/mirror/tankb source tcp://10.0.0.2 remote tcp://10.0.0.1 } } .Ed .Sh SEE ALSO .Xr gethostname 3 , .Xr geom 4 , .Xr hastctl 8 , .Xr hastd 8 .Sh AUTHORS The .Nm was written by .An Pawel Jakub Dawidek Aq pjd@FreeBSD.org under sponsorship of the FreeBSD Foundation. diff --git a/sbin/hastd/hast.h b/sbin/hastd/hast.h index 263e98410944..c7a6b49b292d 100644 --- a/sbin/hastd/hast.h +++ b/sbin/hastd/hast.h @@ -1,248 +1,253 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * Copyright (c) 2011 Pawel Jakub Dawidek * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. 
Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. 
* * $FreeBSD$ */ #ifndef _HAST_H_ #define _HAST_H_ #include #include #include #include #include #include #include #include #include #include "proto.h" /* * Version history: * 0 - initial version * 1 - HIO_KEEPALIVE added + * 2 - "memsync" and "received" attributes added for memsync mode */ -#define HAST_PROTO_VERSION 1 +#define HAST_PROTO_VERSION 2 #define EHAST_OK 0 #define EHAST_NOENTRY 1 #define EHAST_INVALID 2 #define EHAST_NOMEMORY 3 #define EHAST_UNIMPLEMENTED 4 #define HASTCTL_CMD_UNKNOWN 0 #define HASTCTL_CMD_SETROLE 1 #define HASTCTL_CMD_STATUS 2 #define HAST_ROLE_UNDEF 0 #define HAST_ROLE_INIT 1 #define HAST_ROLE_PRIMARY 2 #define HAST_ROLE_SECONDARY 3 #define HAST_SYNCSRC_UNDEF 0 #define HAST_SYNCSRC_PRIMARY 1 #define HAST_SYNCSRC_SECONDARY 2 #define HIO_UNDEF 0 #define HIO_READ 1 #define HIO_WRITE 2 #define HIO_DELETE 3 #define HIO_FLUSH 4 #define HIO_KEEPALIVE 5 #define HAST_USER "hast" #define HAST_TIMEOUT 20 #define HAST_CONFIG "/etc/hast.conf" #define HAST_CONTROL "/var/run/hastctl" #define HASTD_LISTEN_TCP4 "tcp4://0.0.0.0:8457" #define HASTD_LISTEN_TCP6 "tcp6://[::]:8457" #define HASTD_PIDFILE "/var/run/hastd.pid" /* Default extent size. */ #define HAST_EXTENTSIZE 2097152 /* Default maximum number of extents that are kept dirty. */ #define HAST_KEEPDIRTY 64 #define HAST_ADDRSIZE 1024 #define HAST_TOKEN_SIZE 16 /* Number of seconds to sleep between reconnect retries or keepalive packets. */ #define HAST_KEEPALIVE 10 struct hastd_listen { /* Address to listen on. */ char hl_addr[HAST_ADDRSIZE]; /* Protocol-specific data. */ struct proto_conn *hl_conn; TAILQ_ENTRY(hastd_listen) hl_next; }; struct hastd_config { /* Address to communicate with hastctl(8). */ char hc_controladdr[HAST_ADDRSIZE]; /* Protocol-specific data. */ struct proto_conn *hc_controlconn; /* Incoming control connection. */ struct proto_conn *hc_controlin; /* PID file path. */ char hc_pidfile[PATH_MAX]; /* List of addresses to listen on. 
*/ TAILQ_HEAD(, hastd_listen) hc_listen; /* List of resources. */ TAILQ_HEAD(, hast_resource) hc_resources; }; #define HAST_REPLICATION_FULLSYNC 0 #define HAST_REPLICATION_MEMSYNC 1 #define HAST_REPLICATION_ASYNC 2 #define HAST_COMPRESSION_NONE 0 #define HAST_COMPRESSION_HOLE 1 #define HAST_COMPRESSION_LZF 2 #define HAST_CHECKSUM_NONE 0 #define HAST_CHECKSUM_CRC32 1 #define HAST_CHECKSUM_SHA256 2 /* * Structure that describes single resource. */ struct hast_resource { /* Resource name. */ char hr_name[NAME_MAX]; - /* Replication mode (HAST_REPLICATION_*). */ + /* Negotiated replication mode (HAST_REPLICATION_*). */ int hr_replication; + /* Configured replication mode (HAST_REPLICATION_*). */ + int hr_original_replication; /* Provider name that will appear in /dev/hast/. */ char hr_provname[NAME_MAX]; /* Synchronization extent size. */ int hr_extentsize; /* Maximum number of extents that are kept dirty. */ int hr_keepdirty; /* Path to a program to execute on various events. */ char hr_exec[PATH_MAX]; /* Compression algorithm. */ int hr_compression; /* Checksum algorithm. */ int hr_checksum; + /* Protocol version. */ + int hr_version; /* Path to local component. */ char hr_localpath[PATH_MAX]; /* Descriptor to access local component. */ int hr_localfd; /* Offset into local component. */ off_t hr_localoff; /* Size of usable space. */ off_t hr_datasize; /* Size of entire local provider. */ off_t hr_local_mediasize; /* Sector size of local provider. */ unsigned int hr_local_sectorsize; /* Is flushing write cache supported by the local provider? */ bool hr_localflush; /* Flush write cache on metadata updates? */ int hr_metaflush; /* Descriptor for /dev/ggctl communication. */ int hr_ggatefd; /* Unit number for ggate communication. */ int hr_ggateunit; /* Address of the remote component. */ char hr_remoteaddr[HAST_ADDRSIZE]; /* Local address to bind to for outgoing connections. */ char hr_sourceaddr[HAST_ADDRSIZE]; /* Connection for incoming data. 
*/ struct proto_conn *hr_remotein; /* Connection for outgoing data. */ struct proto_conn *hr_remoteout; /* Token to verify both in and out connection are coming from the same node (not necessarily from the same address). */ unsigned char hr_token[HAST_TOKEN_SIZE]; /* Connection timeout. */ int hr_timeout; /* Resource unique identifier. */ uint64_t hr_resuid; /* Primary's local modification count. */ uint64_t hr_primary_localcnt; /* Primary's remote modification count. */ uint64_t hr_primary_remotecnt; /* Secondary's local modification count. */ uint64_t hr_secondary_localcnt; /* Secondary's remote modification count. */ uint64_t hr_secondary_remotecnt; /* Synchronization source. */ uint8_t hr_syncsrc; /* Resource role: HAST_ROLE_{INIT,PRIMARY,SECONDARY}. */ int hr_role; /* Previous resource role: HAST_ROLE_{INIT,PRIMARY,SECONDARY}. */ int hr_previous_role; /* PID of child worker process. 0 - no child. */ pid_t hr_workerpid; /* Control commands from parent to child. */ struct proto_conn *hr_ctrl; /* Events from child to parent. */ struct proto_conn *hr_event; /* Connection requests from child to parent. */ struct proto_conn *hr_conn; /* Activemap structure. */ struct activemap *hr_amp; /* Locked used to synchronize access to hr_amp. */ pthread_mutex_t hr_amp_lock; /* Number of BIO_READ requests. */ uint64_t hr_stat_read; /* Number of BIO_WRITE requests. */ uint64_t hr_stat_write; /* Number of BIO_DELETE requests. */ uint64_t hr_stat_delete; /* Number of BIO_FLUSH requests. */ uint64_t hr_stat_flush; /* Number of activemap updates. */ uint64_t hr_stat_activemap_update; /* Next resource. 
*/ TAILQ_ENTRY(hast_resource) hr_next; }; struct hastd_config *yy_config_parse(const char *config, bool exitonerror); void yy_config_free(struct hastd_config *config); void yyerror(const char *); int yylex(void); #endif /* !_HAST_H_ */ diff --git a/sbin/hastd/hast_proto.c b/sbin/hastd/hast_proto.c index 039e76715870..dd41fb1aaba4 100644 --- a/sbin/hastd/hast_proto.c +++ b/sbin/hastd/hast_proto.c @@ -1,222 +1,222 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * Copyright (c) 2011 Pawel Jakub Dawidek * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. 
*/ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include #include #ifdef HAVE_CRYPTO #include "hast_checksum.h" #endif #include "hast_compression.h" #include "hast_proto.h" struct hast_main_header { /* Protocol version. */ uint8_t version; /* Size of nv headers. */ uint32_t size; } __packed; typedef int hps_send_t(const struct hast_resource *, struct nv *nv, void **, size_t *, bool *); typedef int hps_recv_t(const struct hast_resource *, struct nv *nv, void **, size_t *, bool *); struct hast_pipe_stage { const char *hps_name; hps_send_t *hps_send; hps_recv_t *hps_recv; }; static struct hast_pipe_stage pipeline[] = { { "compression", compression_send, compression_recv }, #ifdef HAVE_CRYPTO { "checksum", checksum_send, checksum_recv } #endif }; /* * Send the given nv structure via conn. * We keep headers in nv structure and pass data in separate argument. * There can be no data at all (data is NULL then). */ int hast_proto_send(const struct hast_resource *res, struct proto_conn *conn, struct nv *nv, const void *data, size_t size) { struct hast_main_header hdr; struct ebuf *eb; bool freedata; void *dptr, *hptr; size_t hsize; int ret; dptr = (void *)(uintptr_t)data; freedata = false; ret = -1; if (data != NULL) { unsigned int ii; for (ii = 0; ii < sizeof(pipeline) / sizeof(pipeline[0]); ii++) { (void)pipeline[ii].hps_send(res, nv, &dptr, &size, &freedata); } nv_add_uint32(nv, size, "size"); if (nv_error(nv) != 0) { errno = nv_error(nv); goto end; } } eb = nv_hton(nv); if (eb == NULL) goto end; - hdr.version = HAST_PROTO_VERSION; + hdr.version = res != NULL ? 
res->hr_version : HAST_PROTO_VERSION; hdr.size = htole32((uint32_t)ebuf_size(eb)); if (ebuf_add_head(eb, &hdr, sizeof(hdr)) == -1) goto end; hptr = ebuf_data(eb, &hsize); if (proto_send(conn, hptr, hsize) == -1) goto end; if (data != NULL && proto_send(conn, dptr, size) == -1) goto end; ret = 0; end: if (freedata) free(dptr); return (ret); } int hast_proto_recv_hdr(const struct proto_conn *conn, struct nv **nvp) { struct hast_main_header hdr; struct nv *nv; struct ebuf *eb; void *hptr; eb = NULL; nv = NULL; if (proto_recv(conn, &hdr, sizeof(hdr)) == -1) goto fail; - if (hdr.version != HAST_PROTO_VERSION) { + if (hdr.version > HAST_PROTO_VERSION) { errno = ERPCMISMATCH; goto fail; } hdr.size = le32toh(hdr.size); eb = ebuf_alloc(hdr.size); if (eb == NULL) goto fail; if (ebuf_add_tail(eb, NULL, hdr.size) == -1) goto fail; hptr = ebuf_data(eb, NULL); PJDLOG_ASSERT(hptr != NULL); if (proto_recv(conn, hptr, hdr.size) == -1) goto fail; nv = nv_ntoh(eb); if (nv == NULL) goto fail; *nvp = nv; return (0); fail: if (eb != NULL) ebuf_free(eb); return (-1); } int hast_proto_recv_data(const struct hast_resource *res, struct proto_conn *conn, struct nv *nv, void *data, size_t size) { unsigned int ii; bool freedata; size_t dsize; void *dptr; int ret; PJDLOG_ASSERT(data != NULL); PJDLOG_ASSERT(size > 0); ret = -1; freedata = false; dptr = data; dsize = nv_get_uint32(nv, "size"); if (dsize > size) { errno = EINVAL; goto end; } else if (dsize == 0) { (void)nv_set_error(nv, 0); } else { if (proto_recv(conn, data, dsize) == -1) goto end; for (ii = sizeof(pipeline) / sizeof(pipeline[0]); ii > 0; ii--) { ret = pipeline[ii - 1].hps_recv(res, nv, &dptr, &dsize, &freedata); if (ret == -1) goto end; } ret = -1; if (dsize > size) { errno = EINVAL; goto end; } if (dptr != data) bcopy(dptr, data, dsize); } ret = 0; end: if (freedata) free(dptr); return (ret); } diff --git a/sbin/hastd/hastd.c b/sbin/hastd/hastd.c index ccce81daad7f..06b38e91331f 100644 --- a/sbin/hastd/hastd.c +++ 
b/sbin/hastd/hastd.c @@ -1,1320 +1,1337 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * Copyright (c) 2010-2011 Pawel Jakub Dawidek * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "control.h" #include "event.h" #include "hast.h" #include "hast_proto.h" #include "hastd.h" #include "hooks.h" #include "subr.h" /* Path to configuration file. */ const char *cfgpath = HAST_CONFIG; /* Hastd configuration. 
*/ static struct hastd_config *cfg; /* Was SIGINT or SIGTERM signal received? */ bool sigexit_received = false; /* Path to pidfile. */ static const char *pidfile; -/* PID file handle. */ +/* Pidfile handle. */ struct pidfh *pfh; /* Do we run in foreground? */ static bool foreground; /* How often check for hooks running for too long. */ #define REPORT_INTERVAL 5 static void usage(void) { errx(EX_USAGE, "[-dFh] [-c config] [-P pidfile]"); } static void g_gate_load(void) { if (modfind("g_gate") == -1) { /* Not present in kernel, try loading it. */ if (kldload("geom_gate") == -1 || modfind("g_gate") == -1) { if (errno != EEXIST) { pjdlog_exit(EX_OSERR, "Unable to load geom_gate module"); } } } } void descriptors_cleanup(struct hast_resource *res) { struct hast_resource *tres, *tmres; struct hastd_listen *lst; TAILQ_FOREACH_SAFE(tres, &cfg->hc_resources, hr_next, tmres) { if (tres == res) { PJDLOG_VERIFY(res->hr_role == HAST_ROLE_SECONDARY || (res->hr_remotein == NULL && res->hr_remoteout == NULL)); continue; } if (tres->hr_remotein != NULL) proto_close(tres->hr_remotein); if (tres->hr_remoteout != NULL) proto_close(tres->hr_remoteout); if (tres->hr_ctrl != NULL) proto_close(tres->hr_ctrl); if (tres->hr_event != NULL) proto_close(tres->hr_event); if (tres->hr_conn != NULL) proto_close(tres->hr_conn); TAILQ_REMOVE(&cfg->hc_resources, tres, hr_next); free(tres); } if (cfg->hc_controlin != NULL) proto_close(cfg->hc_controlin); proto_close(cfg->hc_controlconn); while ((lst = TAILQ_FIRST(&cfg->hc_listen)) != NULL) { TAILQ_REMOVE(&cfg->hc_listen, lst, hl_next); if (lst->hl_conn != NULL) proto_close(lst->hl_conn); free(lst); } (void)pidfile_close(pfh); hook_fini(); pjdlog_fini(); } static const char * dtype2str(mode_t mode) { if (S_ISBLK(mode)) return ("block device"); else if (S_ISCHR(mode)) return ("character device"); else if (S_ISDIR(mode)) return ("directory"); else if (S_ISFIFO(mode)) return ("pipe or FIFO"); else if (S_ISLNK(mode)) return ("symbolic link"); else if 
(S_ISREG(mode)) return ("regular file"); else if (S_ISSOCK(mode)) return ("socket"); else if (S_ISWHT(mode)) return ("whiteout"); else return ("unknown"); } void descriptors_assert(const struct hast_resource *res, int pjdlogmode) { char msg[256]; struct stat sb; long maxfd; bool isopen; mode_t mode; int fd; /* * At this point descriptor to syslog socket is closed, so if we want * to log assertion message, we have to first store it in 'msg' local * buffer and then open syslog socket and log it. */ msg[0] = '\0'; maxfd = sysconf(_SC_OPEN_MAX); if (maxfd == -1) { pjdlog_init(pjdlogmode); pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); pjdlog_errno(LOG_WARNING, "sysconf(_SC_OPEN_MAX) failed"); pjdlog_fini(); maxfd = 16384; } for (fd = 0; fd <= maxfd; fd++) { if (fstat(fd, &sb) == 0) { isopen = true; mode = sb.st_mode; } else if (errno == EBADF) { isopen = false; mode = 0; } else { (void)snprintf(msg, sizeof(msg), "Unable to fstat descriptor %d: %s", fd, strerror(errno)); break; } if (fd == STDIN_FILENO || fd == STDOUT_FILENO || fd == STDERR_FILENO) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (%s) is closed, but should be open.", fd, (fd == STDIN_FILENO ? "stdin" : (fd == STDOUT_FILENO ? 
"stdout" : "stderr"))); break; } } else if (fd == proto_descriptor(res->hr_event)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (event) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (event) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else if (fd == proto_descriptor(res->hr_ctrl)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (ctrl) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (ctrl) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else if (res->hr_role == HAST_ROLE_PRIMARY && fd == proto_descriptor(res->hr_conn)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (conn) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (conn) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else if (res->hr_role == HAST_ROLE_SECONDARY && res->hr_conn != NULL && fd == proto_descriptor(res->hr_conn)) { if (isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (conn) is open, but should be closed.", fd); break; } } else if (res->hr_role == HAST_ROLE_SECONDARY && fd == proto_descriptor(res->hr_remotein)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (remote in) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (remote in) is %s, but should be %s.", fd, dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else if (res->hr_role == HAST_ROLE_SECONDARY && fd == proto_descriptor(res->hr_remoteout)) { if (!isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (remote out) is closed, but should be open.", fd); break; } if (!S_ISSOCK(mode)) { (void)snprintf(msg, sizeof(msg), "Descriptor %d (remote out) is %s, but should be %s.", fd, 
dtype2str(mode), dtype2str(S_IFSOCK)); break; } } else { if (isopen) { (void)snprintf(msg, sizeof(msg), "Descriptor %d is open (%s), but should be closed.", fd, dtype2str(mode)); break; } } } if (msg[0] != '\0') { pjdlog_init(pjdlogmode); pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); PJDLOG_ABORT("%s", msg); } } static void child_exit_log(unsigned int pid, int status) { if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { pjdlog_debug(1, "Worker process exited gracefully (pid=%u).", pid); } else if (WIFSIGNALED(status)) { pjdlog_error("Worker process killed (pid=%u, signal=%d).", pid, WTERMSIG(status)); } else { pjdlog_error("Worker process exited ungracefully (pid=%u, exitcode=%d).", pid, WIFEXITED(status) ? WEXITSTATUS(status) : -1); } } static void child_exit(void) { struct hast_resource *res; int status; pid_t pid; while ((pid = wait3(&status, WNOHANG, NULL)) > 0) { /* Find resource related to the process that just exited. */ TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (pid == res->hr_workerpid) break; } if (res == NULL) { /* * This can happen when new connection arrives and we * cancel child responsible for the old one or if this * was hook which we executed. */ hook_check_one(pid, status); continue; } pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); child_exit_log(pid, status); child_cleanup(res); if (res->hr_role == HAST_ROLE_PRIMARY) { /* * Restart child process if it was killed by signal * or exited because of temporary problem. 
*/ if (WIFSIGNALED(status) || (WIFEXITED(status) && WEXITSTATUS(status) == EX_TEMPFAIL)) { sleep(1); pjdlog_info("Restarting worker process."); hastd_primary(res); } else { res->hr_role = HAST_ROLE_INIT; pjdlog_info("Changing resource role back to %s.", role2str(res->hr_role)); } } pjdlog_prefix_set("%s", ""); } } static bool resource_needs_restart(const struct hast_resource *res0, const struct hast_resource *res1) { PJDLOG_ASSERT(strcmp(res0->hr_name, res1->hr_name) == 0); if (strcmp(res0->hr_provname, res1->hr_provname) != 0) return (true); if (strcmp(res0->hr_localpath, res1->hr_localpath) != 0) return (true); if (res0->hr_role == HAST_ROLE_INIT || res0->hr_role == HAST_ROLE_SECONDARY) { if (strcmp(res0->hr_remoteaddr, res1->hr_remoteaddr) != 0) return (true); if (strcmp(res0->hr_sourceaddr, res1->hr_sourceaddr) != 0) return (true); if (res0->hr_replication != res1->hr_replication) return (true); if (res0->hr_checksum != res1->hr_checksum) return (true); if (res0->hr_compression != res1->hr_compression) return (true); if (res0->hr_timeout != res1->hr_timeout) return (true); if (strcmp(res0->hr_exec, res1->hr_exec) != 0) return (true); /* * When metaflush has changed we don't really need restart, * but it is just easier this way. 
*/ if (res0->hr_metaflush != res1->hr_metaflush) return (true); } return (false); } static bool resource_needs_reload(const struct hast_resource *res0, const struct hast_resource *res1) { PJDLOG_ASSERT(strcmp(res0->hr_name, res1->hr_name) == 0); PJDLOG_ASSERT(strcmp(res0->hr_provname, res1->hr_provname) == 0); PJDLOG_ASSERT(strcmp(res0->hr_localpath, res1->hr_localpath) == 0); if (res0->hr_role != HAST_ROLE_PRIMARY) return (false); if (strcmp(res0->hr_remoteaddr, res1->hr_remoteaddr) != 0) return (true); if (strcmp(res0->hr_sourceaddr, res1->hr_sourceaddr) != 0) return (true); if (res0->hr_replication != res1->hr_replication) return (true); if (res0->hr_checksum != res1->hr_checksum) return (true); if (res0->hr_compression != res1->hr_compression) return (true); if (res0->hr_timeout != res1->hr_timeout) return (true); if (strcmp(res0->hr_exec, res1->hr_exec) != 0) return (true); if (res0->hr_metaflush != res1->hr_metaflush) return (true); return (false); } static void resource_reload(const struct hast_resource *res) { struct nv *nvin, *nvout; int error; PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); nvout = nv_alloc(); nv_add_uint8(nvout, CONTROL_RELOAD, "cmd"); nv_add_string(nvout, res->hr_remoteaddr, "remoteaddr"); nv_add_string(nvout, res->hr_sourceaddr, "sourceaddr"); nv_add_int32(nvout, (int32_t)res->hr_replication, "replication"); nv_add_int32(nvout, (int32_t)res->hr_checksum, "checksum"); nv_add_int32(nvout, (int32_t)res->hr_compression, "compression"); nv_add_int32(nvout, (int32_t)res->hr_timeout, "timeout"); nv_add_string(nvout, res->hr_exec, "exec"); nv_add_int32(nvout, (int32_t)res->hr_metaflush, "metaflush"); if (nv_error(nvout) != 0) { nv_free(nvout); pjdlog_error("Unable to allocate header for reload message."); return; } if (hast_proto_send(res, res->hr_ctrl, nvout, NULL, 0) == -1) { pjdlog_errno(LOG_ERR, "Unable to send reload message"); nv_free(nvout); return; } nv_free(nvout); /* Receive response. 
*/ if (hast_proto_recv_hdr(res->hr_ctrl, &nvin) == -1) { pjdlog_errno(LOG_ERR, "Unable to receive reload reply"); return; } error = nv_get_int16(nvin, "error"); nv_free(nvin); if (error != 0) { pjdlog_common(LOG_ERR, 0, error, "Reload failed"); return; } } static void hastd_reload(void) { struct hastd_config *newcfg; struct hast_resource *nres, *cres, *tres; struct hastd_listen *nlst, *clst; struct pidfh *newpfh; unsigned int nlisten; uint8_t role; pid_t otherpid; pjdlog_info("Reloading configuration..."); newpfh = NULL; newcfg = yy_config_parse(cfgpath, false); if (newcfg == NULL) goto failed; /* * Check if control address has changed. */ if (strcmp(cfg->hc_controladdr, newcfg->hc_controladdr) != 0) { if (proto_server(newcfg->hc_controladdr, &newcfg->hc_controlconn) == -1) { pjdlog_errno(LOG_ERR, "Unable to listen on control address %s", newcfg->hc_controladdr); goto failed; } } /* * Check if any listen address has changed. */ nlisten = 0; TAILQ_FOREACH(nlst, &newcfg->hc_listen, hl_next) { TAILQ_FOREACH(clst, &cfg->hc_listen, hl_next) { if (strcmp(nlst->hl_addr, clst->hl_addr) == 0) break; } if (clst != NULL && clst->hl_conn != NULL) { pjdlog_info("Keep listening on address %s.", nlst->hl_addr); nlst->hl_conn = clst->hl_conn; nlisten++; } else if (proto_server(nlst->hl_addr, &nlst->hl_conn) == 0) { pjdlog_info("Listening on new address %s.", nlst->hl_addr); nlisten++; } else { pjdlog_errno(LOG_WARNING, "Unable to listen on address %s", nlst->hl_addr); } } if (nlisten == 0) { pjdlog_error("No addresses to listen on."); goto failed; } /* * Check if pidfile's path has changed. 
*/ if (!foreground && pidfile == NULL && strcmp(cfg->hc_pidfile, newcfg->hc_pidfile) != 0) { newpfh = pidfile_open(newcfg->hc_pidfile, 0600, &otherpid); if (newpfh == NULL) { if (errno == EEXIST) { pjdlog_errno(LOG_WARNING, "Another hastd is already running, pidfile: %s, pid: %jd.", newcfg->hc_pidfile, (intmax_t)otherpid); } else { pjdlog_errno(LOG_WARNING, "Unable to open or create pidfile %s", newcfg->hc_pidfile); } } else if (pidfile_write(newpfh) == -1) { /* Write PID to a file. */ pjdlog_errno(LOG_WARNING, "Unable to write PID to file %s", newcfg->hc_pidfile); } else { pjdlog_debug(1, "PID stored in %s.", newcfg->hc_pidfile); } } /* No failures from now on. */ /* * Switch to new control socket. */ if (newcfg->hc_controlconn != NULL) { pjdlog_info("Control socket changed from %s to %s.", cfg->hc_controladdr, newcfg->hc_controladdr); proto_close(cfg->hc_controlconn); cfg->hc_controlconn = newcfg->hc_controlconn; newcfg->hc_controlconn = NULL; strlcpy(cfg->hc_controladdr, newcfg->hc_controladdr, sizeof(cfg->hc_controladdr)); } /* * Switch to new pidfile. */ if (newpfh != NULL) { pjdlog_info("Pidfile changed from %s to %s.", cfg->hc_pidfile, newcfg->hc_pidfile); (void)pidfile_remove(pfh); pfh = newpfh; (void)strlcpy(cfg->hc_pidfile, newcfg->hc_pidfile, sizeof(cfg->hc_pidfile)); } /* * Switch to new listen addresses. Close all that were removed. */ while ((clst = TAILQ_FIRST(&cfg->hc_listen)) != NULL) { TAILQ_FOREACH(nlst, &newcfg->hc_listen, hl_next) { if (strcmp(nlst->hl_addr, clst->hl_addr) == 0) break; } if (nlst == NULL && clst->hl_conn != NULL) { proto_close(clst->hl_conn); pjdlog_info("No longer listening on address %s.", clst->hl_addr); } TAILQ_REMOVE(&cfg->hc_listen, clst, hl_next); free(clst); } TAILQ_CONCAT(&cfg->hc_listen, &newcfg->hc_listen, hl_next); /* * Stop and remove resources that were removed from the configuration. 
*/ TAILQ_FOREACH_SAFE(cres, &cfg->hc_resources, hr_next, tres) { TAILQ_FOREACH(nres, &newcfg->hc_resources, hr_next) { if (strcmp(cres->hr_name, nres->hr_name) == 0) break; } if (nres == NULL) { control_set_role(cres, HAST_ROLE_INIT); TAILQ_REMOVE(&cfg->hc_resources, cres, hr_next); pjdlog_info("Resource %s removed.", cres->hr_name); free(cres); } } /* * Move new resources to the current configuration. */ TAILQ_FOREACH_SAFE(nres, &newcfg->hc_resources, hr_next, tres) { TAILQ_FOREACH(cres, &cfg->hc_resources, hr_next) { if (strcmp(cres->hr_name, nres->hr_name) == 0) break; } if (cres == NULL) { TAILQ_REMOVE(&newcfg->hc_resources, nres, hr_next); TAILQ_INSERT_TAIL(&cfg->hc_resources, nres, hr_next); pjdlog_info("Resource %s added.", nres->hr_name); } } /* * Deal with modified resources. * Depending on what has changed exactly we might want to perform * different actions. * * We do full resource restart in the following situations: * Resource role is INIT or SECONDARY. * Resource role is PRIMARY and path to local component or provider * name has changed. * In case of PRIMARY, the worker process will be killed and restarted, * which also means removing /dev/hast/ provider and * recreating it. * * We do just reload (send SIGHUP to worker process) if we act as * PRIMARY, but only if remote address, source address, replication * mode, timeout, execution path or metaflush has changed. * For those, there is no need to restart worker process. * If PRIMARY receives SIGHUP, it will reconnect if remote address or * source address has changed or it will set new timeout if only timeout * has changed or it will update metaflush if only metaflush has * changed. 
*/ TAILQ_FOREACH_SAFE(nres, &newcfg->hc_resources, hr_next, tres) { TAILQ_FOREACH(cres, &cfg->hc_resources, hr_next) { if (strcmp(cres->hr_name, nres->hr_name) == 0) break; } PJDLOG_ASSERT(cres != NULL); if (resource_needs_restart(cres, nres)) { pjdlog_info("Resource %s configuration was modified, restarting it.", cres->hr_name); role = cres->hr_role; control_set_role(cres, HAST_ROLE_INIT); TAILQ_REMOVE(&cfg->hc_resources, cres, hr_next); free(cres); TAILQ_REMOVE(&newcfg->hc_resources, nres, hr_next); TAILQ_INSERT_TAIL(&cfg->hc_resources, nres, hr_next); control_set_role(nres, role); } else if (resource_needs_reload(cres, nres)) { pjdlog_info("Resource %s configuration was modified, reloading it.", cres->hr_name); strlcpy(cres->hr_remoteaddr, nres->hr_remoteaddr, sizeof(cres->hr_remoteaddr)); strlcpy(cres->hr_sourceaddr, nres->hr_sourceaddr, sizeof(cres->hr_sourceaddr)); cres->hr_replication = nres->hr_replication; cres->hr_checksum = nres->hr_checksum; cres->hr_compression = nres->hr_compression; cres->hr_timeout = nres->hr_timeout; strlcpy(cres->hr_exec, nres->hr_exec, sizeof(cres->hr_exec)); cres->hr_metaflush = nres->hr_metaflush; if (cres->hr_workerpid != 0) resource_reload(cres); } } yy_config_free(newcfg); pjdlog_info("Configuration reloaded successfully."); return; failed: if (newcfg != NULL) { if (newcfg->hc_controlconn != NULL) proto_close(newcfg->hc_controlconn); while ((nlst = TAILQ_FIRST(&newcfg->hc_listen)) != NULL) { if (nlst->hl_conn != NULL) { TAILQ_FOREACH(clst, &cfg->hc_listen, hl_next) { if (strcmp(nlst->hl_addr, clst->hl_addr) == 0) { break; } } if (clst == NULL || clst->hl_conn == NULL) proto_close(nlst->hl_conn); } TAILQ_REMOVE(&newcfg->hc_listen, nlst, hl_next); free(nlst); } yy_config_free(newcfg); } if (newpfh != NULL) (void)pidfile_remove(newpfh); pjdlog_warning("Configuration not reloaded."); } static void terminate_workers(void) { struct hast_resource *res; pjdlog_info("Termination signal received, exiting."); TAILQ_FOREACH(res, 
&cfg->hc_resources, hr_next) { if (res->hr_workerpid == 0) continue; pjdlog_info("Terminating worker process (resource=%s, role=%s, pid=%u).", res->hr_name, role2str(res->hr_role), res->hr_workerpid); if (kill(res->hr_workerpid, SIGTERM) == 0) continue; pjdlog_errno(LOG_WARNING, "Unable to send signal to worker process (resource=%s, role=%s, pid=%u).", res->hr_name, role2str(res->hr_role), res->hr_workerpid); } } static void listen_accept(struct hastd_listen *lst) { struct hast_resource *res; struct proto_conn *conn; struct nv *nvin, *nvout, *nverr; const char *resname; const unsigned char *token; char laddr[256], raddr[256]; + uint8_t version; size_t size; pid_t pid; int status; proto_local_address(lst->hl_conn, laddr, sizeof(laddr)); pjdlog_debug(1, "Accepting connection to %s.", laddr); if (proto_accept(lst->hl_conn, &conn) == -1) { pjdlog_errno(LOG_ERR, "Unable to accept connection %s", laddr); return; } proto_local_address(conn, laddr, sizeof(laddr)); proto_remote_address(conn, raddr, sizeof(raddr)); pjdlog_info("Connection from %s to %s.", raddr, laddr); /* Error in setting timeout is not critical, but why should it fail? */ if (proto_timeout(conn, HAST_TIMEOUT) == -1) pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); nvin = nvout = nverr = NULL; /* * Before receiving any data see if remote host have access to any * resource. */ TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (proto_address_match(conn, res->hr_remoteaddr)) break; } if (res == NULL) { pjdlog_error("Client %s isn't known.", raddr); goto close; } /* Ok, remote host can access at least one resource. 
*/ if (hast_proto_recv_hdr(conn, &nvin) == -1) { pjdlog_errno(LOG_ERR, "Unable to receive header from %s", raddr); goto close; } resname = nv_get_string(nvin, "resource"); if (resname == NULL) { pjdlog_error("No 'resource' field in the header received from %s.", raddr); goto close; } pjdlog_debug(2, "%s: resource=%s", raddr, resname); + version = nv_get_uint8(nvin, "version"); + pjdlog_debug(2, "%s: version=%hhu", raddr, version); + if (version == 0) { + /* + * If no version is sent, it means this is protocol version 1. + */ + version = 1; + } + if (version > HAST_PROTO_VERSION) { + pjdlog_info("Remote protocol version %hhu is not supported, falling back to version %hhu.", + version, (unsigned char)HAST_PROTO_VERSION); + version = HAST_PROTO_VERSION; + } + pjdlog_debug(1, "Negotiated protocol version %hhu.", version); token = nv_get_uint8_array(nvin, &size, "token"); /* * NULL token means that this is first connection. */ if (token != NULL && size != sizeof(res->hr_token)) { pjdlog_error("Received token of invalid size from %s (expected %zu, got %zu).", raddr, sizeof(res->hr_token), size); goto close; } /* * From now on we want to send errors to the remote node. */ nverr = nv_alloc(); /* Find resource related to this connection. */ TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (strcmp(resname, res->hr_name) == 0) break; } /* Have we found the resource? */ if (res == NULL) { pjdlog_error("No resource '%s' as requested by %s.", resname, raddr); nv_add_stringf(nverr, "errmsg", "Resource not configured."); goto fail; } /* Now that we know resource name setup log prefix. */ pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); /* Does the remote host have access to this resource? */ if (!proto_address_match(conn, res->hr_remoteaddr)) { pjdlog_error("Client %s has no access to the resource.", raddr); nv_add_stringf(nverr, "errmsg", "No access to the resource."); goto fail; } /* Is the resource marked as secondary? 
*/ if (res->hr_role != HAST_ROLE_SECONDARY) { pjdlog_warning("We act as %s for the resource and not as %s as requested by %s.", role2str(res->hr_role), role2str(HAST_ROLE_SECONDARY), raddr); nv_add_stringf(nverr, "errmsg", "Remote node acts as %s for the resource and not as %s.", role2str(res->hr_role), role2str(HAST_ROLE_SECONDARY)); if (res->hr_role == HAST_ROLE_PRIMARY) { /* * If we act as primary request the other side to wait * for us a bit, as we might be finishing cleanups. */ nv_add_uint8(nverr, 1, "wait"); } goto fail; } /* Does token (if exists) match? */ if (token != NULL && memcmp(token, res->hr_token, sizeof(res->hr_token)) != 0) { pjdlog_error("Token received from %s doesn't match.", raddr); nv_add_stringf(nverr, "errmsg", "Token doesn't match."); goto fail; } /* * If there is no token, but we have half-open connection * (only remotein) or full connection (worker process is running) * we have to cancel those and accept the new connection. */ if (token == NULL) { PJDLOG_ASSERT(res->hr_remoteout == NULL); pjdlog_debug(1, "Initial connection from %s.", raddr); if (res->hr_workerpid != 0) { PJDLOG_ASSERT(res->hr_remotein == NULL); pjdlog_debug(1, "Worker process exists (pid=%u), stopping it.", (unsigned int)res->hr_workerpid); /* Stop child process. */ if (kill(res->hr_workerpid, SIGINT) == -1) { pjdlog_errno(LOG_ERR, "Unable to stop worker process (pid=%u)", (unsigned int)res->hr_workerpid); /* * Other than logging the problem we * ignore it - nothing smart to do. */ } /* Wait for it to exit. */ else if ((pid = waitpid(res->hr_workerpid, &status, 0)) != res->hr_workerpid) { /* We can only log the problem. 
*/ pjdlog_errno(LOG_ERR, "Waiting for worker process (pid=%u) failed", (unsigned int)res->hr_workerpid); } else { child_exit_log(res->hr_workerpid, status); } child_cleanup(res); } else if (res->hr_remotein != NULL) { char oaddr[256]; proto_remote_address(res->hr_remotein, oaddr, sizeof(oaddr)); pjdlog_debug(1, "Canceling half-open connection from %s on connection from %s.", oaddr, raddr); proto_close(res->hr_remotein); res->hr_remotein = NULL; } } /* * Checks and cleanups are done. */ if (token == NULL) { + res->hr_version = version; arc4random_buf(res->hr_token, sizeof(res->hr_token)); nvout = nv_alloc(); + nv_add_uint8(nvout, version, "version"); nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), "token"); if (nv_error(nvout) != 0) { pjdlog_common(LOG_ERR, 0, nv_error(nvout), "Unable to prepare return header for %s", raddr); nv_add_stringf(nverr, "errmsg", "Remote node was unable to prepare return header: %s.", strerror(nv_error(nvout))); goto fail; } - if (hast_proto_send(NULL, conn, nvout, NULL, 0) == -1) { + if (hast_proto_send(res, conn, nvout, NULL, 0) == -1) { int error = errno; pjdlog_errno(LOG_ERR, "Unable to send response to %s", raddr); nv_add_stringf(nverr, "errmsg", "Remote node was unable to send response: %s.", strerror(error)); goto fail; } res->hr_remotein = conn; pjdlog_debug(1, "Incoming connection from %s configured.", raddr); } else { res->hr_remoteout = conn; pjdlog_debug(1, "Outgoing connection to %s configured.", raddr); hastd_secondary(res, nvin); } nv_free(nvin); nv_free(nvout); nv_free(nverr); pjdlog_prefix_set("%s", ""); return; fail: if (nv_error(nverr) != 0) { pjdlog_common(LOG_ERR, 0, nv_error(nverr), "Unable to prepare error header for %s", raddr); goto close; } if (hast_proto_send(NULL, conn, nverr, NULL, 0) == -1) { pjdlog_errno(LOG_ERR, "Unable to send error to %s", raddr); goto close; } close: if (nvin != NULL) nv_free(nvin); if (nvout != NULL) nv_free(nvout); if (nverr != NULL) nv_free(nverr); proto_close(conn); 
pjdlog_prefix_set("%s", ""); } static void connection_migrate(struct hast_resource *res) { struct proto_conn *conn; int16_t val = 0; pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to receive connection command"); return; } if (proto_client(res->hr_sourceaddr[0] != '\0' ? res->hr_sourceaddr : NULL, res->hr_remoteaddr, &conn) == -1) { val = errno; pjdlog_errno(LOG_WARNING, "Unable to create outgoing connection to %s", res->hr_remoteaddr); goto out; } if (proto_connect(conn, -1) == -1) { val = errno; pjdlog_errno(LOG_WARNING, "Unable to connect to %s", res->hr_remoteaddr); proto_close(conn); goto out; } val = 0; out: if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { pjdlog_errno(LOG_WARNING, "Unable to send reply to connection request"); } if (val == 0 && proto_connection_send(res->hr_conn, conn) == -1) pjdlog_errno(LOG_WARNING, "Unable to send connection"); pjdlog_prefix_set("%s", ""); } static void check_signals(void) { struct timespec sigtimeout; sigset_t mask; int signo; sigtimeout.tv_sec = 0; sigtimeout.tv_nsec = 0; PJDLOG_VERIFY(sigemptyset(&mask) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGCHLD) == 0); while ((signo = sigtimedwait(&mask, NULL, &sigtimeout)) != -1) { switch (signo) { case SIGINT: case SIGTERM: sigexit_received = true; terminate_workers(); proto_close(cfg->hc_controlconn); exit(EX_OK); break; case SIGCHLD: child_exit(); break; case SIGHUP: hastd_reload(); break; default: PJDLOG_ABORT("Unexpected signal (%d).", signo); } } } static void main_loop(void) { struct hast_resource *res; struct hastd_listen *lst; struct timeval seltimeout; int fd, maxfd, ret; time_t lastcheck, now; fd_set rfds; lastcheck = time(NULL); seltimeout.tv_sec = 
REPORT_INTERVAL; seltimeout.tv_usec = 0; for (;;) { check_signals(); /* Setup descriptors for select(2). */ FD_ZERO(&rfds); maxfd = fd = proto_descriptor(cfg->hc_controlconn); PJDLOG_ASSERT(fd >= 0); FD_SET(fd, &rfds); TAILQ_FOREACH(lst, &cfg->hc_listen, hl_next) { if (lst->hl_conn == NULL) continue; fd = proto_descriptor(lst->hl_conn); PJDLOG_ASSERT(fd >= 0); FD_SET(fd, &rfds); maxfd = fd > maxfd ? fd : maxfd; } TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (res->hr_event == NULL) continue; fd = proto_descriptor(res->hr_event); PJDLOG_ASSERT(fd >= 0); FD_SET(fd, &rfds); maxfd = fd > maxfd ? fd : maxfd; if (res->hr_role == HAST_ROLE_PRIMARY) { /* Only primary workers asks for connections. */ PJDLOG_ASSERT(res->hr_conn != NULL); fd = proto_descriptor(res->hr_conn); PJDLOG_ASSERT(fd >= 0); FD_SET(fd, &rfds); maxfd = fd > maxfd ? fd : maxfd; } else { PJDLOG_ASSERT(res->hr_conn == NULL); } } PJDLOG_ASSERT(maxfd + 1 <= (int)FD_SETSIZE); ret = select(maxfd + 1, &rfds, NULL, NULL, &seltimeout); now = time(NULL); if (lastcheck + REPORT_INTERVAL <= now) { hook_check(); lastcheck = now; } if (ret == 0) { /* * select(2) timed out, so there should be no * descriptors to check. */ continue; } else if (ret == -1) { if (errno == EINTR) continue; KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "select() failed"); } /* * Check for signals before we do anything to update our * info about terminated workers in the meantime. */ check_signals(); if (FD_ISSET(proto_descriptor(cfg->hc_controlconn), &rfds)) control_handle(cfg); TAILQ_FOREACH(lst, &cfg->hc_listen, hl_next) { if (lst->hl_conn == NULL) continue; if (FD_ISSET(proto_descriptor(lst->hl_conn), &rfds)) listen_accept(lst); } TAILQ_FOREACH(res, &cfg->hc_resources, hr_next) { if (res->hr_event == NULL) continue; if (FD_ISSET(proto_descriptor(res->hr_event), &rfds)) { if (event_recv(res) == 0) continue; /* The worker process exited? 
*/ proto_close(res->hr_event); res->hr_event = NULL; if (res->hr_conn != NULL) { proto_close(res->hr_conn); res->hr_conn = NULL; } continue; } if (res->hr_role == HAST_ROLE_PRIMARY) { PJDLOG_ASSERT(res->hr_conn != NULL); if (FD_ISSET(proto_descriptor(res->hr_conn), &rfds)) { connection_migrate(res); } } else { PJDLOG_ASSERT(res->hr_conn == NULL); } } } } static void dummy_sighandler(int sig __unused) { /* Nothing to do. */ } int main(int argc, char *argv[]) { struct hastd_listen *lst; pid_t otherpid; int debuglevel; sigset_t mask; foreground = false; debuglevel = 0; for (;;) { int ch; ch = getopt(argc, argv, "c:dFhP:"); if (ch == -1) break; switch (ch) { case 'c': cfgpath = optarg; break; case 'd': debuglevel++; break; case 'F': foreground = true; break; case 'P': pidfile = optarg; break; case 'h': default: usage(); } } argc -= optind; argv += optind; pjdlog_init(PJDLOG_MODE_STD); pjdlog_debug_set(debuglevel); g_gate_load(); /* * When path to the configuration file is relative, obtain full path, * so we can always find the file, even after daemonizing and changing * working directory to /. */ if (cfgpath[0] != '/') { const char *newcfgpath; newcfgpath = realpath(cfgpath, NULL); if (newcfgpath == NULL) { pjdlog_exit(EX_CONFIG, "Unable to obtain full path of %s", cfgpath); } cfgpath = newcfgpath; } cfg = yy_config_parse(cfgpath, true); PJDLOG_ASSERT(cfg != NULL); if (pidfile != NULL) { if (strlcpy(cfg->hc_pidfile, pidfile, sizeof(cfg->hc_pidfile)) >= sizeof(cfg->hc_pidfile)) { pjdlog_exitx(EX_CONFIG, "Pidfile path is too long."); } } if (pidfile != NULL || !foreground) { pfh = pidfile_open(cfg->hc_pidfile, 0600, &otherpid); if (pfh == NULL) { if (errno == EEXIST) { pjdlog_exitx(EX_TEMPFAIL, "Another hastd is already running, pidfile: %s, pid: %jd.", cfg->hc_pidfile, (intmax_t)otherpid); } /* * If we cannot create pidfile for other reasons, * only warn. 
*/ pjdlog_errno(LOG_WARNING, "Unable to open or create pidfile %s", cfg->hc_pidfile); } } /* * Restore default actions for interesting signals in case parent * process (like init(8)) decided to ignore some of them (like SIGHUP). */ PJDLOG_VERIFY(signal(SIGHUP, SIG_DFL) != SIG_ERR); PJDLOG_VERIFY(signal(SIGINT, SIG_DFL) != SIG_ERR); PJDLOG_VERIFY(signal(SIGTERM, SIG_DFL) != SIG_ERR); /* * Because SIGCHLD is ignored by default, setup dummy handler for it, * so we can mask it. */ PJDLOG_VERIFY(signal(SIGCHLD, dummy_sighandler) != SIG_ERR); PJDLOG_VERIFY(sigemptyset(&mask) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGHUP) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGCHLD) == 0); PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); /* Listen on control address. */ if (proto_server(cfg->hc_controladdr, &cfg->hc_controlconn) == -1) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to listen on control address %s", cfg->hc_controladdr); } /* Listen for remote connections. */ TAILQ_FOREACH(lst, &cfg->hc_listen, hl_next) { if (proto_server(lst->hl_addr, &lst->hl_conn) == -1) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to listen on address %s", lst->hl_addr); } } if (!foreground) { if (daemon(0, 0) == -1) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to daemonize"); } /* Start logging to syslog. */ pjdlog_mode_set(PJDLOG_MODE_SYSLOG); } if (pidfile != NULL || !foreground) { /* Write PID to a file. 
*/ if (pidfile_write(pfh) == -1) { pjdlog_errno(LOG_WARNING, "Unable to write PID to a file %s", cfg->hc_pidfile); } else { pjdlog_debug(1, "PID stored in %s.", cfg->hc_pidfile); } } pjdlog_info("Started successfully, running protocol version %d.", HAST_PROTO_VERSION); pjdlog_debug(1, "Listening on control address %s.", cfg->hc_controladdr); TAILQ_FOREACH(lst, &cfg->hc_listen, hl_next) pjdlog_info("Listening on address %s.", lst->hl_addr); hook_init(); main_loop(); exit(0); } diff --git a/sbin/hastd/parse.y b/sbin/hastd/parse.y index 04ea7ab2513e..bd0690a02abf 100644 --- a/sbin/hastd/parse.y +++ b/sbin/hastd/parse.y @@ -1,1012 +1,1035 @@ %{ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * Copyright (c) 2011 Pawel Jakub Dawidek * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. 
IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * $FreeBSD$ */ #include /* MAXHOSTNAMELEN */ #include #include #include #include #include #include #include #include #include #include #include #include "hast.h" extern int depth; extern int lineno; extern FILE *yyin; extern char *yytext; static struct hastd_config *lconfig; static struct hast_resource *curres; static bool mynode, hadmynode; static char depth0_control[HAST_ADDRSIZE]; static char depth0_pidfile[PATH_MAX]; static char depth0_listen_tcp4[HAST_ADDRSIZE]; static char depth0_listen_tcp6[HAST_ADDRSIZE]; static TAILQ_HEAD(, hastd_listen) depth0_listen; static int depth0_replication; static int depth0_checksum; static int depth0_compression; static int depth0_timeout; static char depth0_exec[PATH_MAX]; static int depth0_metaflush; static char depth1_provname[PATH_MAX]; static char depth1_localpath[PATH_MAX]; static int depth1_metaflush; extern void yyrestart(FILE *); static int isitme(const char *name); static bool family_supported(int family); static int node_names(char **namesp); %} %token CONTROL PIDFILE LISTEN REPLICATION CHECKSUM COMPRESSION METAFLUSH %token TIMEOUT EXEC RESOURCE NAME LOCAL REMOTE SOURCE ON OFF %token FULLSYNC MEMSYNC ASYNC NONE CRC32 SHA256 HOLE LZF %token NUM STR OB CB %type remote_str %type replication_type %type checksum_type %type compression_type %type boolean %union { int num; char *str; } %token NUM %token STR %% statements: | statements statement ; statement: control_statement | pidfile_statement | listen_statement 
| replication_statement | checksum_statement | compression_statement | timeout_statement | exec_statement | metaflush_statement | node_statement | resource_statement ; control_statement: CONTROL STR { switch (depth) { case 0: if (strlcpy(depth0_control, $2, sizeof(depth0_control)) >= sizeof(depth0_control)) { pjdlog_error("control argument is too long."); free($2); return (1); } break; case 1: if (!mynode) break; if (strlcpy(lconfig->hc_controladdr, $2, sizeof(lconfig->hc_controladdr)) >= sizeof(lconfig->hc_controladdr)) { pjdlog_error("control argument is too long."); free($2); return (1); } break; default: PJDLOG_ABORT("control at wrong depth level"); } free($2); } ; pidfile_statement: PIDFILE STR { switch (depth) { case 0: if (strlcpy(depth0_pidfile, $2, sizeof(depth0_pidfile)) >= sizeof(depth0_pidfile)) { pjdlog_error("pidfile argument is too long."); free($2); return (1); } break; case 1: if (!mynode) break; if (strlcpy(lconfig->hc_pidfile, $2, sizeof(lconfig->hc_pidfile)) >= sizeof(lconfig->hc_pidfile)) { pjdlog_error("pidfile argument is too long."); free($2); return (1); } break; default: PJDLOG_ABORT("pidfile at wrong depth level"); } free($2); } ; listen_statement: LISTEN STR { struct hastd_listen *lst; lst = calloc(1, sizeof(*lst)); if (lst == NULL) { pjdlog_error("Unable to allocate memory for listen address."); free($2); return (1); } if (strlcpy(lst->hl_addr, $2, sizeof(lst->hl_addr)) >= sizeof(lst->hl_addr)) { pjdlog_error("listen argument is too long."); free($2); free(lst); return (1); } switch (depth) { case 0: TAILQ_INSERT_TAIL(&depth0_listen, lst, hl_next); break; case 1: if (mynode) TAILQ_INSERT_TAIL(&depth0_listen, lst, hl_next); else free(lst); break; default: PJDLOG_ABORT("listen at wrong depth level"); } free($2); } ; replication_statement: REPLICATION replication_type { switch (depth) { case 0: depth0_replication = $2; break; case 1: PJDLOG_ASSERT(curres != NULL); curres->hr_replication = $2; + curres->hr_original_replication = $2; break; 
default: PJDLOG_ABORT("replication at wrong depth level"); } } ; replication_type: FULLSYNC { $$ = HAST_REPLICATION_FULLSYNC; } | MEMSYNC { $$ = HAST_REPLICATION_MEMSYNC; } | ASYNC { $$ = HAST_REPLICATION_ASYNC; } ; checksum_statement: CHECKSUM checksum_type { switch (depth) { case 0: depth0_checksum = $2; break; case 1: PJDLOG_ASSERT(curres != NULL); curres->hr_checksum = $2; break; default: PJDLOG_ABORT("checksum at wrong depth level"); } } ; checksum_type: NONE { $$ = HAST_CHECKSUM_NONE; } | CRC32 { $$ = HAST_CHECKSUM_CRC32; } | SHA256 { $$ = HAST_CHECKSUM_SHA256; } ; compression_statement: COMPRESSION compression_type { switch (depth) { case 0: depth0_compression = $2; break; case 1: PJDLOG_ASSERT(curres != NULL); curres->hr_compression = $2; break; default: PJDLOG_ABORT("compression at wrong depth level"); } } ; compression_type: NONE { $$ = HAST_COMPRESSION_NONE; } | HOLE { $$ = HAST_COMPRESSION_HOLE; } | LZF { $$ = HAST_COMPRESSION_LZF; } ; timeout_statement: TIMEOUT NUM { if ($2 <= 0) { pjdlog_error("Negative or zero timeout."); return (1); } switch (depth) { case 0: depth0_timeout = $2; break; case 1: PJDLOG_ASSERT(curres != NULL); curres->hr_timeout = $2; break; default: PJDLOG_ABORT("timeout at wrong depth level"); } } ; exec_statement: EXEC STR { switch (depth) { case 0: if (strlcpy(depth0_exec, $2, sizeof(depth0_exec)) >= sizeof(depth0_exec)) { pjdlog_error("Exec path is too long."); free($2); return (1); } break; case 1: PJDLOG_ASSERT(curres != NULL); if (strlcpy(curres->hr_exec, $2, sizeof(curres->hr_exec)) >= sizeof(curres->hr_exec)) { pjdlog_error("Exec path is too long."); free($2); return (1); } break; default: PJDLOG_ABORT("exec at wrong depth level"); } free($2); } ; metaflush_statement: METAFLUSH boolean { switch (depth) { case 0: depth0_metaflush = $2; break; case 1: PJDLOG_ASSERT(curres != NULL); depth1_metaflush = $2; break; case 2: if (!mynode) break; PJDLOG_ASSERT(curres != NULL); curres->hr_metaflush = $2; break; default: 
PJDLOG_ABORT("metaflush at wrong depth level"); } } ; boolean: ON { $$ = 1; } | OFF { $$ = 0; } ; node_statement: ON node_start OB node_entries CB { mynode = false; } ; node_start: STR { switch (isitme($1)) { case -1: free($1); return (1); case 0: break; case 1: mynode = true; break; default: PJDLOG_ABORT("invalid isitme() return value"); } free($1); } ; node_entries: | node_entries node_entry ; node_entry: control_statement | pidfile_statement | listen_statement ; resource_statement: RESOURCE resource_start OB resource_entries CB { if (curres != NULL) { /* * There must be section for this node, at least with * remote address configuration. */ if (!hadmynode) { char *names; if (node_names(&names) != 0) return (1); pjdlog_error("No resource %s configuration for this node (acceptable node names: %s).", curres->hr_name, names); return (1); } /* * Let's see if there are some resource-level settings * that we can use for node-level settings. */ if (curres->hr_provname[0] == '\0' && depth1_provname[0] != '\0') { /* * Provider name is not set at node-level, * but is set at resource-level, use it. */ strlcpy(curres->hr_provname, depth1_provname, sizeof(curres->hr_provname)); } if (curres->hr_localpath[0] == '\0' && depth1_localpath[0] != '\0') { /* * Path to local provider is not set at * node-level, but is set at resource-level, * use it. */ strlcpy(curres->hr_localpath, depth1_localpath, sizeof(curres->hr_localpath)); } if (curres->hr_metaflush == -1 && depth1_metaflush != -1) { /* * Metaflush is not set at node-level, * but is set at resource-level, use it. */ curres->hr_metaflush = depth1_metaflush; } /* * If provider name is not given, use resource name * as provider name. */ if (curres->hr_provname[0] == '\0') { strlcpy(curres->hr_provname, curres->hr_name, sizeof(curres->hr_provname)); } /* * Remote address has to be configured at this point. 
*/ if (curres->hr_remoteaddr[0] == '\0') { pjdlog_error("Remote address not configured for resource %s.", curres->hr_name); return (1); } /* * Path to local provider has to be configured at this * point. */ if (curres->hr_localpath[0] == '\0') { pjdlog_error("Path to local component not configured for resource %s.", curres->hr_name); return (1); } /* Put it onto resource list. */ TAILQ_INSERT_TAIL(&lconfig->hc_resources, curres, hr_next); curres = NULL; } } ; resource_start: STR { /* Check if there is no duplicate entry. */ TAILQ_FOREACH(curres, &lconfig->hc_resources, hr_next) { if (strcmp(curres->hr_name, $1) == 0) { pjdlog_error("Resource %s configured more than once.", curres->hr_name); free($1); return (1); } } /* * Clear those, so we can tell if they were set at * resource-level or not. */ depth1_provname[0] = '\0'; depth1_localpath[0] = '\0'; depth1_metaflush = -1; hadmynode = false; curres = calloc(1, sizeof(*curres)); if (curres == NULL) { pjdlog_error("Unable to allocate memory for resource."); free($1); return (1); } if (strlcpy(curres->hr_name, $1, sizeof(curres->hr_name)) >= sizeof(curres->hr_name)) { pjdlog_error("Resource name is too long."); free(curres); free($1); return (1); } free($1); curres->hr_role = HAST_ROLE_INIT; curres->hr_previous_role = HAST_ROLE_INIT; curres->hr_replication = -1; + curres->hr_original_replication = -1; curres->hr_checksum = -1; curres->hr_compression = -1; + curres->hr_version = 1; curres->hr_timeout = -1; curres->hr_exec[0] = '\0'; curres->hr_provname[0] = '\0'; curres->hr_localpath[0] = '\0'; curres->hr_localfd = -1; curres->hr_localflush = true; curres->hr_metaflush = -1; curres->hr_remoteaddr[0] = '\0'; curres->hr_sourceaddr[0] = '\0'; curres->hr_ggateunit = -1; } ; resource_entries: | resource_entries resource_entry ; resource_entry: replication_statement | checksum_statement | compression_statement | timeout_statement | exec_statement | metaflush_statement | name_statement | local_statement | 
resource_node_statement ; name_statement: NAME STR { switch (depth) { case 1: if (strlcpy(depth1_provname, $2, sizeof(depth1_provname)) >= sizeof(depth1_provname)) { pjdlog_error("name argument is too long."); free($2); return (1); } break; case 2: if (!mynode) break; PJDLOG_ASSERT(curres != NULL); if (strlcpy(curres->hr_provname, $2, sizeof(curres->hr_provname)) >= sizeof(curres->hr_provname)) { pjdlog_error("name argument is too long."); free($2); return (1); } break; default: PJDLOG_ABORT("name at wrong depth level"); } free($2); } ; local_statement: LOCAL STR { switch (depth) { case 1: if (strlcpy(depth1_localpath, $2, sizeof(depth1_localpath)) >= sizeof(depth1_localpath)) { pjdlog_error("local argument is too long."); free($2); return (1); } break; case 2: if (!mynode) break; PJDLOG_ASSERT(curres != NULL); if (strlcpy(curres->hr_localpath, $2, sizeof(curres->hr_localpath)) >= sizeof(curres->hr_localpath)) { pjdlog_error("local argument is too long."); free($2); return (1); } break; default: PJDLOG_ABORT("local at wrong depth level"); } free($2); } ; resource_node_statement:ON resource_node_start OB resource_node_entries CB { mynode = false; } ; resource_node_start: STR { if (curres != NULL) { switch (isitme($1)) { case -1: free($1); return (1); case 0: break; case 1: mynode = hadmynode = true; break; default: PJDLOG_ABORT("invalid isitme() return value"); } } free($1); } ; resource_node_entries: | resource_node_entries resource_node_entry ; resource_node_entry: name_statement | local_statement | remote_statement | source_statement | metaflush_statement ; remote_statement: REMOTE remote_str { PJDLOG_ASSERT(depth == 2); if (mynode) { PJDLOG_ASSERT(curres != NULL); if (strlcpy(curres->hr_remoteaddr, $2, sizeof(curres->hr_remoteaddr)) >= sizeof(curres->hr_remoteaddr)) { pjdlog_error("remote argument is too long."); free($2); return (1); } } free($2); } ; remote_str: NONE { $$ = strdup("none"); } | STR { } ; source_statement: SOURCE STR { PJDLOG_ASSERT(depth == 2); 
if (mynode) { PJDLOG_ASSERT(curres != NULL); if (strlcpy(curres->hr_sourceaddr, $2, sizeof(curres->hr_sourceaddr)) >= sizeof(curres->hr_sourceaddr)) { pjdlog_error("source argument is too long."); free($2); return (1); } } free($2); } ; %% static int isitme(const char *name) { char buf[MAXHOSTNAMELEN]; + unsigned long hostid; char *pos; size_t bufsize; /* * First check if the given name matches our full hostname. */ if (gethostname(buf, sizeof(buf)) < 0) { pjdlog_errno(LOG_ERR, "gethostname() failed"); return (-1); } if (strcmp(buf, name) == 0) return (1); /* - * Now check if it matches first part of the host name. + * Check if it matches first part of the host name. */ pos = strchr(buf, '.'); if (pos != NULL && (size_t)(pos - buf) == strlen(name) && strncmp(buf, name, pos - buf) == 0) { return (1); } /* - * At the end check if name is equal to our host's UUID. + * Check if it matches host UUID. */ bufsize = sizeof(buf); if (sysctlbyname("kern.hostuuid", buf, &bufsize, NULL, 0) < 0) { pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostuuid) failed"); return (-1); } if (strcasecmp(buf, name) == 0) return (1); + /* + * Check if it matches hostid. + */ + bufsize = sizeof(hostid); + if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) { + pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed"); + return (-1); + } + (void)snprintf(buf, sizeof(buf), "hostid%lu", hostid); + if (strcmp(buf, name) == 0) + return (1); + /* * Looks like this isn't about us. 
*/ return (0); } static bool family_supported(int family) { int sock; sock = socket(family, SOCK_STREAM, 0); - if (sock == -1 && errno == EAFNOSUPPORT) + if (sock == -1 && errno == EPROTONOSUPPORT) return (false); if (sock >= 0) (void)close(sock); return (true); } static int node_names(char **namesp) { static char names[MAXHOSTNAMELEN * 3]; char buf[MAXHOSTNAMELEN]; + unsigned long hostid; char *pos; size_t bufsize; if (gethostname(buf, sizeof(buf)) < 0) { pjdlog_errno(LOG_ERR, "gethostname() failed"); return (-1); } /* First component of the host name. */ pos = strchr(buf, '.'); if (pos != NULL && pos != buf) { (void)strlcpy(names, buf, MIN((size_t)(pos - buf + 1), sizeof(names))); (void)strlcat(names, ", ", sizeof(names)); } /* Full host name. */ (void)strlcat(names, buf, sizeof(names)); (void)strlcat(names, ", ", sizeof(names)); /* Host UUID. */ bufsize = sizeof(buf); if (sysctlbyname("kern.hostuuid", buf, &bufsize, NULL, 0) < 0) { pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostuuid) failed"); return (-1); } (void)strlcat(names, buf, sizeof(names)); + (void)strlcat(names, ", ", sizeof(names)); + + /* Host ID. 
*/ + bufsize = sizeof(hostid); + if (sysctlbyname("kern.hostid", &hostid, &bufsize, NULL, 0) < 0) { + pjdlog_errno(LOG_ERR, "sysctlbyname(kern.hostid) failed"); + return (-1); + } + (void)snprintf(buf, sizeof(buf), "hostid%lu", hostid); + (void)strlcat(names, buf, sizeof(names)); *namesp = names; return (0); } void yyerror(const char *str) { pjdlog_error("Unable to parse configuration file at line %d near '%s': %s", lineno, yytext, str); } struct hastd_config * yy_config_parse(const char *config, bool exitonerror) { int ret; curres = NULL; mynode = false; depth = 0; lineno = 0; depth0_timeout = HAST_TIMEOUT; - depth0_replication = HAST_REPLICATION_FULLSYNC; + depth0_replication = HAST_REPLICATION_MEMSYNC; depth0_checksum = HAST_CHECKSUM_NONE; depth0_compression = HAST_COMPRESSION_HOLE; strlcpy(depth0_control, HAST_CONTROL, sizeof(depth0_control)); strlcpy(depth0_pidfile, HASTD_PIDFILE, sizeof(depth0_pidfile)); TAILQ_INIT(&depth0_listen); strlcpy(depth0_listen_tcp4, HASTD_LISTEN_TCP4, sizeof(depth0_listen_tcp4)); strlcpy(depth0_listen_tcp6, HASTD_LISTEN_TCP6, sizeof(depth0_listen_tcp6)); depth0_exec[0] = '\0'; depth0_metaflush = 1; lconfig = calloc(1, sizeof(*lconfig)); if (lconfig == NULL) { pjdlog_error("Unable to allocate memory for configuration."); if (exitonerror) exit(EX_TEMPFAIL); return (NULL); } TAILQ_INIT(&lconfig->hc_listen); TAILQ_INIT(&lconfig->hc_resources); yyin = fopen(config, "r"); if (yyin == NULL) { pjdlog_errno(LOG_ERR, "Unable to open configuration file %s", config); yy_config_free(lconfig); if (exitonerror) exit(EX_OSFILE); return (NULL); } yyrestart(yyin); ret = yyparse(); fclose(yyin); if (ret != 0) { yy_config_free(lconfig); if (exitonerror) exit(EX_CONFIG); return (NULL); } /* * Let's see if everything is set up. 
*/ if (lconfig->hc_controladdr[0] == '\0') { strlcpy(lconfig->hc_controladdr, depth0_control, sizeof(lconfig->hc_controladdr)); } if (lconfig->hc_pidfile[0] == '\0') { strlcpy(lconfig->hc_pidfile, depth0_pidfile, sizeof(lconfig->hc_pidfile)); } if (!TAILQ_EMPTY(&depth0_listen)) TAILQ_CONCAT(&lconfig->hc_listen, &depth0_listen, hl_next); if (TAILQ_EMPTY(&lconfig->hc_listen)) { struct hastd_listen *lst; if (family_supported(AF_INET)) { lst = calloc(1, sizeof(*lst)); if (lst == NULL) { pjdlog_error("Unable to allocate memory for listen address."); yy_config_free(lconfig); if (exitonerror) exit(EX_TEMPFAIL); return (NULL); } (void)strlcpy(lst->hl_addr, depth0_listen_tcp4, sizeof(lst->hl_addr)); TAILQ_INSERT_TAIL(&lconfig->hc_listen, lst, hl_next); } else { pjdlog_debug(1, "No IPv4 support in the kernel, not listening on IPv4 address."); } if (family_supported(AF_INET6)) { lst = calloc(1, sizeof(*lst)); if (lst == NULL) { pjdlog_error("Unable to allocate memory for listen address."); yy_config_free(lconfig); if (exitonerror) exit(EX_TEMPFAIL); return (NULL); } (void)strlcpy(lst->hl_addr, depth0_listen_tcp6, sizeof(lst->hl_addr)); TAILQ_INSERT_TAIL(&lconfig->hc_listen, lst, hl_next); } else { pjdlog_debug(1, "No IPv6 support in the kernel, not listening on IPv6 address."); } if (TAILQ_EMPTY(&lconfig->hc_listen)) { pjdlog_error("No address to listen on."); yy_config_free(lconfig); if (exitonerror) exit(EX_TEMPFAIL); return (NULL); } } TAILQ_FOREACH(curres, &lconfig->hc_resources, hr_next) { PJDLOG_ASSERT(curres->hr_provname[0] != '\0'); PJDLOG_ASSERT(curres->hr_localpath[0] != '\0'); PJDLOG_ASSERT(curres->hr_remoteaddr[0] != '\0'); if (curres->hr_replication == -1) { /* * Replication is not set at resource-level. * Use global or default setting. 
*/ curres->hr_replication = depth0_replication; - } - if (curres->hr_replication == HAST_REPLICATION_MEMSYNC) { - pjdlog_warning("Replication mode \"%s\" is not implemented, falling back to \"%s\".", - "memsync", "fullsync"); - curres->hr_replication = HAST_REPLICATION_FULLSYNC; + curres->hr_original_replication = depth0_replication; } if (curres->hr_checksum == -1) { /* * Checksum is not set at resource-level. * Use global or default setting. */ curres->hr_checksum = depth0_checksum; } if (curres->hr_compression == -1) { /* * Compression is not set at resource-level. * Use global or default setting. */ curres->hr_compression = depth0_compression; } if (curres->hr_timeout == -1) { /* * Timeout is not set at resource-level. * Use global or default setting. */ curres->hr_timeout = depth0_timeout; } if (curres->hr_exec[0] == '\0') { /* * Exec is not set at resource-level. * Use global or default setting. */ strlcpy(curres->hr_exec, depth0_exec, sizeof(curres->hr_exec)); } if (curres->hr_metaflush == -1) { /* * Metaflush is not set at resource-level. * Use global or default setting. */ curres->hr_metaflush = depth0_metaflush; } } return (lconfig); } void yy_config_free(struct hastd_config *config) { struct hastd_listen *lst; struct hast_resource *res; while ((lst = TAILQ_FIRST(&depth0_listen)) != NULL) { TAILQ_REMOVE(&depth0_listen, lst, hl_next); free(lst); } while ((lst = TAILQ_FIRST(&config->hc_listen)) != NULL) { TAILQ_REMOVE(&config->hc_listen, lst, hl_next); free(lst); } while ((res = TAILQ_FIRST(&config->hc_resources)) != NULL) { TAILQ_REMOVE(&config->hc_resources, res, hr_next); free(res); } free(config); } diff --git a/sbin/hastd/primary.c b/sbin/hastd/primary.c index 88159cb6bec1..fb49ef61f054 100644 --- a/sbin/hastd/primary.c +++ b/sbin/hastd/primary.c @@ -1,2300 +1,2444 @@ /*- * Copyright (c) 2009 The FreeBSD Foundation * Copyright (c) 2010-2011 Pawel Jakub Dawidek * All rights reserved. 
* * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include __FBSDID("$FreeBSD$"); #include #include #include #include -#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "control.h" #include "event.h" #include "hast.h" #include "hast_proto.h" #include "hastd.h" #include "hooks.h" #include "metadata.h" #include "proto.h" #include "pjdlog.h" +#include "refcnt.h" #include "subr.h" #include "synch.h" /* The is only one remote component for now. */ #define ISREMOTE(no) ((no) == 1) struct hio { /* * Number of components we are still waiting for. 
* When this field goes to 0, we can send the request back to the * kernel. Each component has to decrease this counter by one * even on failure. */ unsigned int hio_countdown; /* * Each component has a place to store its own error. * Once the request is handled by all components we can decide if the * request overall is successful or not. */ int *hio_errors; /* * Structure used to communicate with GEOM Gate class. */ struct g_gate_ctl_io hio_ggio; /* * Request was already confirmed to GEOM Gate. */ bool hio_done; /* * Remember replication from the time the request was initiated, * so we won't get confused when replication changes on reload. */ int hio_replication; TAILQ_ENTRY(hio) *hio_next; }; #define hio_free_next hio_next[0] #define hio_done_next hio_next[0] /* * Free list holds unused structures. When free list is empty, we have to wait * until some in-progress requests are freed. */ static TAILQ_HEAD(, hio) hio_free_list; static pthread_mutex_t hio_free_list_lock; static pthread_cond_t hio_free_list_cond; /* * There is one send list for every component. One request is placed on all * send lists - each component gets the same request, but each component is * responsible for managing its own send list. */ static TAILQ_HEAD(, hio) *hio_send_list; static pthread_mutex_t *hio_send_list_lock; static pthread_cond_t *hio_send_list_cond; /* * There is one recv list for every component, although local components don't * use recv lists as local requests are done synchronously. */ static TAILQ_HEAD(, hio) *hio_recv_list; static pthread_mutex_t *hio_recv_list_lock; static pthread_cond_t *hio_recv_list_cond; /* * Request is placed on done list by the slowest component (the one that * decreased hio_countdown from 1 to 0). */ static TAILQ_HEAD(, hio) hio_done_list; static pthread_mutex_t hio_done_list_lock; static pthread_cond_t hio_done_list_cond; /* * Structures below are for interaction with sync thread. 
*/ static bool sync_inprogress; static pthread_mutex_t sync_lock; static pthread_cond_t sync_cond; /* * The lock below allows to synchronize access to remote connections. */ static pthread_rwlock_t *hio_remote_lock; /* * Lock to synchronize metadata updates. Also synchronize access to * hr_primary_localcnt and hr_primary_remotecnt fields. */ static pthread_mutex_t metadata_lock; /* * Maximum number of outstanding I/O requests. */ #define HAST_HIO_MAX 256 /* * Number of components. At this point there are only two components: local * and remote, but in the future it might be possible to use multiple local * and remote components. */ #define HAST_NCOMPONENTS 2 #define ISCONNECTED(res, no) \ ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL) #define QUEUE_INSERT1(hio, name, ncomp) do { \ bool _wakeup; \ \ mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ _wakeup = TAILQ_EMPTY(&hio_##name##_list[(ncomp)]); \ TAILQ_INSERT_TAIL(&hio_##name##_list[(ncomp)], (hio), \ hio_next[(ncomp)]); \ mtx_unlock(&hio_##name##_list_lock[ncomp]); \ if (_wakeup) \ cv_signal(&hio_##name##_list_cond[(ncomp)]); \ } while (0) #define QUEUE_INSERT2(hio, name) do { \ bool _wakeup; \ \ mtx_lock(&hio_##name##_list_lock); \ _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_##name##_next);\ mtx_unlock(&hio_##name##_list_lock); \ if (_wakeup) \ cv_signal(&hio_##name##_list_cond); \ } while (0) #define QUEUE_TAKE1(hio, name, ncomp, timeout) do { \ bool _last; \ \ mtx_lock(&hio_##name##_list_lock[(ncomp)]); \ _last = false; \ while (((hio) = TAILQ_FIRST(&hio_##name##_list[(ncomp)])) == NULL && !_last) { \ cv_timedwait(&hio_##name##_list_cond[(ncomp)], \ &hio_##name##_list_lock[(ncomp)], (timeout)); \ if ((timeout) != 0) \ _last = true; \ } \ if (hio != NULL) { \ TAILQ_REMOVE(&hio_##name##_list[(ncomp)], (hio), \ hio_next[(ncomp)]); \ } \ mtx_unlock(&hio_##name##_list_lock[(ncomp)]); \ } while (0) #define QUEUE_TAKE2(hio, name) do { \ 
mtx_lock(&hio_##name##_list_lock); \ while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ cv_wait(&hio_##name##_list_cond, \ &hio_##name##_list_lock); \ } \ TAILQ_REMOVE(&hio_##name##_list, (hio), hio_##name##_next); \ mtx_unlock(&hio_##name##_list_lock); \ } while (0) #define SYNCREQ(hio) do { \ (hio)->hio_ggio.gctl_unit = -1; \ (hio)->hio_ggio.gctl_seq = 1; \ } while (0) #define ISSYNCREQ(hio) ((hio)->hio_ggio.gctl_unit == -1) #define SYNCREQDONE(hio) do { (hio)->hio_ggio.gctl_unit = -2; } while (0) #define ISSYNCREQDONE(hio) ((hio)->hio_ggio.gctl_unit == -2) static struct hast_resource *gres; static pthread_mutex_t range_lock; static struct rangelocks *range_regular; static bool range_regular_wait; static pthread_cond_t range_regular_cond; static struct rangelocks *range_sync; static bool range_sync_wait; static pthread_cond_t range_sync_cond; static bool fullystarted; static void *ggate_recv_thread(void *arg); static void *local_send_thread(void *arg); static void *remote_send_thread(void *arg); static void *remote_recv_thread(void *arg); static void *ggate_send_thread(void *arg); static void *sync_thread(void *arg); static void *guard_thread(void *arg); static void cleanup(struct hast_resource *res) { int rerrno; /* Remember errno. */ rerrno = errno; /* Destroy ggate provider if we created one. */ if (res->hr_ggateunit >= 0) { struct g_gate_ctl_destroy ggiod; bzero(&ggiod, sizeof(ggiod)); ggiod.gctl_version = G_GATE_VERSION; ggiod.gctl_unit = res->hr_ggateunit; ggiod.gctl_force = 1; if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { pjdlog_errno(LOG_WARNING, "Unable to destroy hast/%s device", res->hr_provname); } res->hr_ggateunit = -1; } /* Restore errno. */ errno = rerrno; } static __dead2 void primary_exit(int exitcode, const char *fmt, ...) 
{ va_list ap; PJDLOG_ASSERT(exitcode != EX_OK); va_start(ap, fmt); pjdlogv_errno(LOG_ERR, fmt, ap); va_end(ap); cleanup(gres); exit(exitcode); } static __dead2 void primary_exitx(int exitcode, const char *fmt, ...) { va_list ap; va_start(ap, fmt); pjdlogv(exitcode == EX_OK ? LOG_INFO : LOG_ERR, fmt, ap); va_end(ap); cleanup(gres); exit(exitcode); } static int hast_activemap_flush(struct hast_resource *res) { const unsigned char *buf; size_t size; buf = activemap_bitmap(res->hr_amp, &size); PJDLOG_ASSERT(buf != NULL); PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != (ssize_t)size) { pjdlog_errno(LOG_ERR, "Unable to flush activemap to disk"); return (-1); } if (res->hr_metaflush == 1 && g_flush(res->hr_localfd) == -1) { if (errno == EOPNOTSUPP) { pjdlog_warning("The %s provider doesn't support flushing write cache. Disabling it.", res->hr_localpath); res->hr_metaflush = 0; } else { pjdlog_errno(LOG_ERR, "Unable to flush disk cache on activemap update"); return (-1); } } return (0); } static bool real_remote(const struct hast_resource *res) { return (strcmp(res->hr_remoteaddr, "none") != 0); } static void init_environment(struct hast_resource *res __unused) { struct hio *hio; unsigned int ii, ncomps; /* * In the future it might be per-resource value. */ ncomps = HAST_NCOMPONENTS; /* * Allocate memory needed by lists. 
*/ hio_send_list = malloc(sizeof(hio_send_list[0]) * ncomps); if (hio_send_list == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for send lists.", sizeof(hio_send_list[0]) * ncomps); } hio_send_list_lock = malloc(sizeof(hio_send_list_lock[0]) * ncomps); if (hio_send_list_lock == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for send list locks.", sizeof(hio_send_list_lock[0]) * ncomps); } hio_send_list_cond = malloc(sizeof(hio_send_list_cond[0]) * ncomps); if (hio_send_list_cond == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for send list condition variables.", sizeof(hio_send_list_cond[0]) * ncomps); } hio_recv_list = malloc(sizeof(hio_recv_list[0]) * ncomps); if (hio_recv_list == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for recv lists.", sizeof(hio_recv_list[0]) * ncomps); } hio_recv_list_lock = malloc(sizeof(hio_recv_list_lock[0]) * ncomps); if (hio_recv_list_lock == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for recv list locks.", sizeof(hio_recv_list_lock[0]) * ncomps); } hio_recv_list_cond = malloc(sizeof(hio_recv_list_cond[0]) * ncomps); if (hio_recv_list_cond == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for recv list condition variables.", sizeof(hio_recv_list_cond[0]) * ncomps); } hio_remote_lock = malloc(sizeof(hio_remote_lock[0]) * ncomps); if (hio_remote_lock == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for remote connections locks.", sizeof(hio_remote_lock[0]) * ncomps); } /* * Initialize lists, their locks and theirs condition variables. 
*/ TAILQ_INIT(&hio_free_list); mtx_init(&hio_free_list_lock); cv_init(&hio_free_list_cond); for (ii = 0; ii < HAST_NCOMPONENTS; ii++) { TAILQ_INIT(&hio_send_list[ii]); mtx_init(&hio_send_list_lock[ii]); cv_init(&hio_send_list_cond[ii]); TAILQ_INIT(&hio_recv_list[ii]); mtx_init(&hio_recv_list_lock[ii]); cv_init(&hio_recv_list_cond[ii]); rw_init(&hio_remote_lock[ii]); } TAILQ_INIT(&hio_done_list); mtx_init(&hio_done_list_lock); cv_init(&hio_done_list_cond); mtx_init(&metadata_lock); /* * Allocate requests pool and initialize requests. */ for (ii = 0; ii < HAST_HIO_MAX; ii++) { hio = malloc(sizeof(*hio)); if (hio == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for hio request.", sizeof(*hio)); } hio->hio_countdown = 0; hio->hio_errors = malloc(sizeof(hio->hio_errors[0]) * ncomps); if (hio->hio_errors == NULL) { primary_exitx(EX_TEMPFAIL, "Unable allocate %zu bytes of memory for hio errors.", sizeof(hio->hio_errors[0]) * ncomps); } hio->hio_next = malloc(sizeof(hio->hio_next[0]) * ncomps); if (hio->hio_next == NULL) { primary_exitx(EX_TEMPFAIL, "Unable allocate %zu bytes of memory for hio_next field.", sizeof(hio->hio_next[0]) * ncomps); } hio->hio_ggio.gctl_version = G_GATE_VERSION; hio->hio_ggio.gctl_data = malloc(MAXPHYS); if (hio->hio_ggio.gctl_data == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate %zu bytes of memory for gctl_data.", MAXPHYS); } hio->hio_ggio.gctl_length = MAXPHYS; hio->hio_ggio.gctl_error = 0; TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_free_next); } } static bool init_resuid(struct hast_resource *res) { mtx_lock(&metadata_lock); if (res->hr_resuid != 0) { mtx_unlock(&metadata_lock); return (false); } else { /* Initialize unique resource identifier. 
*/ arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); mtx_unlock(&metadata_lock); if (metadata_write(res) == -1) exit(EX_NOINPUT); return (true); } } static void init_local(struct hast_resource *res) { unsigned char *buf; size_t mapsize; if (metadata_read(res, true) == -1) exit(EX_NOINPUT); mtx_init(&res->hr_amp_lock); if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, res->hr_local_sectorsize, res->hr_keepdirty) == -1) { primary_exit(EX_TEMPFAIL, "Unable to create activemap"); } mtx_init(&range_lock); cv_init(&range_regular_cond); if (rangelock_init(&range_regular) == -1) primary_exit(EX_TEMPFAIL, "Unable to create regular range lock"); cv_init(&range_sync_cond); if (rangelock_init(&range_sync) == -1) primary_exit(EX_TEMPFAIL, "Unable to create sync range lock"); mapsize = activemap_ondisk_size(res->hr_amp); buf = calloc(1, mapsize); if (buf == NULL) { primary_exitx(EX_TEMPFAIL, "Unable to allocate buffer for activemap."); } if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != (ssize_t)mapsize) { primary_exit(EX_NOINPUT, "Unable to read activemap"); } activemap_copyin(res->hr_amp, buf, mapsize); free(buf); if (res->hr_resuid != 0) return; /* * We're using provider for the first time. Initialize local and remote * counters. We don't initialize resuid here, as we want to do it just * in time. The reason for this is that we want to inform secondary * that there were no writes yet, so there is no need to synchronize * anything. 
*/ res->hr_primary_localcnt = 0; res->hr_primary_remotecnt = 0; if (metadata_write(res) == -1) exit(EX_NOINPUT); } static int primary_connect(struct hast_resource *res, struct proto_conn **connp) { struct proto_conn *conn; int16_t val; val = 1; if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { primary_exit(EX_TEMPFAIL, "Unable to send connection request to parent"); } if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { primary_exit(EX_TEMPFAIL, "Unable to receive reply to connection request from parent"); } if (val != 0) { errno = val; pjdlog_errno(LOG_WARNING, "Unable to connect to %s", res->hr_remoteaddr); return (-1); } if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { primary_exit(EX_TEMPFAIL, "Unable to receive connection from parent"); } if (proto_connect_wait(conn, res->hr_timeout) == -1) { pjdlog_errno(LOG_WARNING, "Unable to connect to %s", res->hr_remoteaddr); proto_close(conn); return (-1); } /* Error in setting timeout is not critical, but why should it fail? */ if (proto_timeout(conn, res->hr_timeout) == -1) pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); *connp = conn; return (0); } - + /* * Function instructs GEOM_GATE to handle reads directly from within the kernel. 
*/ static void enable_direct_reads(struct hast_resource *res) { struct g_gate_ctl_modify ggiomodify; bzero(&ggiomodify, sizeof(ggiomodify)); ggiomodify.gctl_version = G_GATE_VERSION; ggiomodify.gctl_unit = res->hr_ggateunit; ggiomodify.gctl_modify = GG_MODIFY_READPROV | GG_MODIFY_READOFFSET; strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, sizeof(ggiomodify.gctl_readprov)); ggiomodify.gctl_readoffset = res->hr_localoff; if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) pjdlog_debug(1, "Direct reads enabled."); else pjdlog_errno(LOG_WARNING, "Failed to enable direct reads"); } static int init_remote(struct hast_resource *res, struct proto_conn **inp, struct proto_conn **outp) { struct proto_conn *in, *out; struct nv *nvout, *nvin; const unsigned char *token; unsigned char *map; const char *errmsg; int32_t extentsize; int64_t datasize; uint32_t mapsize; + uint8_t version; size_t size; int error; PJDLOG_ASSERT((inp == NULL && outp == NULL) || (inp != NULL && outp != NULL)); PJDLOG_ASSERT(real_remote(res)); in = out = NULL; errmsg = NULL; if (primary_connect(res, &out) == -1) return (ECONNREFUSED); error = ECONNABORTED; /* * First handshake step. * Setup outgoing connection with remote node. 
*/ nvout = nv_alloc(); nv_add_string(nvout, res->hr_name, "resource"); + nv_add_uint8(nvout, HAST_PROTO_VERSION, "version"); if (nv_error(nvout) != 0) { pjdlog_common(LOG_WARNING, 0, nv_error(nvout), "Unable to allocate header for connection with %s", res->hr_remoteaddr); nv_free(nvout); goto close; } if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { pjdlog_errno(LOG_WARNING, "Unable to send handshake header to %s", res->hr_remoteaddr); nv_free(nvout); goto close; } nv_free(nvout); if (hast_proto_recv_hdr(out, &nvin) == -1) { pjdlog_errno(LOG_WARNING, "Unable to receive handshake header from %s", res->hr_remoteaddr); goto close; } errmsg = nv_get_string(nvin, "errmsg"); if (errmsg != NULL) { pjdlog_warning("%s", errmsg); if (nv_exists(nvin, "wait")) error = EBUSY; nv_free(nvin); goto close; } + version = nv_get_uint8(nvin, "version"); + if (version == 0) { + /* + * If no version is sent, it means this is protocol version 1. + */ + version = 1; + } + if (version > HAST_PROTO_VERSION) { + pjdlog_warning("Invalid version received (%hhu).", version); + nv_free(nvin); + goto close; + } + res->hr_version = version; + pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); token = nv_get_uint8_array(nvin, &size, "token"); if (token == NULL) { pjdlog_warning("Handshake header from %s has no 'token' field.", res->hr_remoteaddr); nv_free(nvin); goto close; } if (size != sizeof(res->hr_token)) { pjdlog_warning("Handshake header from %s contains 'token' of wrong size (got %zu, expected %zu).", res->hr_remoteaddr, size, sizeof(res->hr_token)); nv_free(nvin); goto close; } bcopy(token, res->hr_token, sizeof(res->hr_token)); nv_free(nvin); /* * Second handshake step. * Setup incoming connection with remote node. 
*/ if (primary_connect(res, &in) == -1) goto close; nvout = nv_alloc(); nv_add_string(nvout, res->hr_name, "resource"); nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), "token"); if (res->hr_resuid == 0) { /* * The resuid field was not yet initialized. * Because we do synchronization inside init_resuid(), it is * possible that someone already initialized it, the function * will return false then, but if we successfully initialized * it, we will get true. True means that there were no writes * to this resource yet and we want to inform secondary that * synchronization is not needed by sending "virgin" argument. */ if (init_resuid(res)) nv_add_int8(nvout, 1, "virgin"); } nv_add_uint64(nvout, res->hr_resuid, "resuid"); nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); if (nv_error(nvout) != 0) { pjdlog_common(LOG_WARNING, 0, nv_error(nvout), "Unable to allocate header for connection with %s", res->hr_remoteaddr); nv_free(nvout); goto close; } if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { pjdlog_errno(LOG_WARNING, "Unable to send handshake header to %s", res->hr_remoteaddr); nv_free(nvout); goto close; } nv_free(nvout); if (hast_proto_recv_hdr(out, &nvin) == -1) { pjdlog_errno(LOG_WARNING, "Unable to receive handshake header from %s", res->hr_remoteaddr); goto close; } errmsg = nv_get_string(nvin, "errmsg"); if (errmsg != NULL) { pjdlog_warning("%s", errmsg); nv_free(nvin); goto close; } datasize = nv_get_int64(nvin, "datasize"); if (datasize != res->hr_datasize) { pjdlog_warning("Data size differs between nodes (local=%jd, remote=%jd).", (intmax_t)res->hr_datasize, (intmax_t)datasize); nv_free(nvin); goto close; } extentsize = nv_get_int32(nvin, "extentsize"); if (extentsize != res->hr_extentsize) { pjdlog_warning("Extent size differs between nodes (local=%zd, remote=%zd).", (ssize_t)res->hr_extentsize, (ssize_t)extentsize); nv_free(nvin); goto close; } 
res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) enable_direct_reads(res); if (nv_exists(nvin, "virgin")) { /* * Secondary was reinitialized, bump localcnt if it is 0 as * only we have the data. */ PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); if (res->hr_primary_localcnt == 0) { PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); mtx_lock(&metadata_lock); res->hr_primary_localcnt++; pjdlog_debug(1, "Increasing localcnt to %ju.", (uintmax_t)res->hr_primary_localcnt); (void)metadata_write(res); mtx_unlock(&metadata_lock); } } map = NULL; mapsize = nv_get_uint32(nvin, "mapsize"); if (mapsize > 0) { map = malloc(mapsize); if (map == NULL) { pjdlog_error("Unable to allocate memory for remote activemap (mapsize=%ju).", (uintmax_t)mapsize); nv_free(nvin); goto close; } /* * Remote node have some dirty extents on its own, lets * download its activemap. */ if (hast_proto_recv_data(res, out, nvin, map, mapsize) == -1) { pjdlog_errno(LOG_ERR, "Unable to receive remote activemap"); nv_free(nvin); free(map); goto close; } /* * Merge local and remote bitmaps. */ activemap_merge(res->hr_amp, map, mapsize); free(map); /* * Now that we merged bitmaps from both nodes, flush it to the * disk before we start to synchronize. */ (void)hast_activemap_flush(res); } nv_free(nvin); #ifdef notyet /* Setup directions. 
*/ if (proto_send(out, NULL, 0) == -1) pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); if (proto_recv(in, NULL, 0) == -1) pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); #endif pjdlog_info("Connected to %s.", res->hr_remoteaddr); + if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && + res->hr_version < 2) { + pjdlog_warning("The 'memsync' replication mode is not supported by the remote node, falling back to 'fullsync' mode."); + res->hr_replication = HAST_REPLICATION_FULLSYNC; + } else if (res->hr_replication != res->hr_original_replication) { + /* + * This is in case hastd disconnected and was upgraded. + */ + res->hr_replication = res->hr_original_replication; + } if (inp != NULL && outp != NULL) { *inp = in; *outp = out; } else { res->hr_remotein = in; res->hr_remoteout = out; } event_send(res, EVENT_CONNECT); return (0); close: if (errmsg != NULL && strcmp(errmsg, "Split-brain condition!") == 0) event_send(res, EVENT_SPLITBRAIN); proto_close(out); if (in != NULL) proto_close(in); return (error); } static void sync_start(void) { mtx_lock(&sync_lock); sync_inprogress = true; mtx_unlock(&sync_lock); cv_signal(&sync_cond); } static void sync_stop(void) { mtx_lock(&sync_lock); if (sync_inprogress) sync_inprogress = false; mtx_unlock(&sync_lock); } static void init_ggate(struct hast_resource *res) { struct g_gate_ctl_create ggiocreate; struct g_gate_ctl_cancel ggiocancel; /* * We communicate with ggate via /dev/ggctl. Open it. */ res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); if (res->hr_ggatefd == -1) primary_exit(EX_OSFILE, "Unable to open /dev/" G_GATE_CTL_NAME); /* * Create provider before trying to connect, as connection failure * is not critical, but may take some time. 
*/ bzero(&ggiocreate, sizeof(ggiocreate)); ggiocreate.gctl_version = G_GATE_VERSION; ggiocreate.gctl_mediasize = res->hr_datasize; ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; ggiocreate.gctl_flags = 0; ggiocreate.gctl_maxcount = 0; ggiocreate.gctl_timeout = 0; ggiocreate.gctl_unit = G_GATE_NAME_GIVEN; snprintf(ggiocreate.gctl_name, sizeof(ggiocreate.gctl_name), "hast/%s", res->hr_provname); if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { pjdlog_info("Device hast/%s created.", res->hr_provname); res->hr_ggateunit = ggiocreate.gctl_unit; return; } if (errno != EEXIST) { primary_exit(EX_OSERR, "Unable to create hast/%s device", res->hr_provname); } pjdlog_debug(1, "Device hast/%s already exists, we will try to take it over.", res->hr_provname); /* * If we received EEXIST, we assume that the process who created the * provider died and didn't clean up. In that case we will start from * where he left of. */ bzero(&ggiocancel, sizeof(ggiocancel)); ggiocancel.gctl_version = G_GATE_VERSION; ggiocancel.gctl_unit = G_GATE_NAME_GIVEN; snprintf(ggiocancel.gctl_name, sizeof(ggiocancel.gctl_name), "hast/%s", res->hr_provname); if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { pjdlog_info("Device hast/%s recovered.", res->hr_provname); res->hr_ggateunit = ggiocancel.gctl_unit; return; } primary_exit(EX_OSERR, "Unable to take over hast/%s device", res->hr_provname); } void hastd_primary(struct hast_resource *res) { pthread_t td; pid_t pid; int error, mode, debuglevel; /* * Create communication channel for sending control commands from * parent to child. */ if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { /* TODO: There's no need for this to be fatal error. */ KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to create control sockets between parent and child"); } /* * Create communication channel for sending events from child to parent. 
*/ if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { /* TODO: There's no need for this to be fatal error. */ KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to create event sockets between child and parent"); } /* * Create communication channel for sending connection requests from * child to parent. */ if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { /* TODO: There's no need for this to be fatal error. */ KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to create connection sockets between child and parent"); } pid = fork(); if (pid == -1) { /* TODO: There's no need for this to be fatal error. */ KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_TEMPFAIL, "Unable to fork"); } if (pid > 0) { /* This is parent. */ /* Declare that we are receiver. */ proto_recv(res->hr_event, NULL, 0); proto_recv(res->hr_conn, NULL, 0); /* Declare that we are sender. */ proto_send(res->hr_ctrl, NULL, 0); res->hr_workerpid = pid; return; } gres = res; mode = pjdlog_mode_get(); debuglevel = pjdlog_debug_get(); /* Declare that we are sender. */ proto_send(res->hr_event, NULL, 0); proto_send(res->hr_conn, NULL, 0); /* Declare that we are receiver. */ proto_recv(res->hr_ctrl, NULL, 0); descriptors_cleanup(res); descriptors_assert(res, mode); pjdlog_init(mode); pjdlog_debug_set(debuglevel); pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); init_local(res); init_ggate(res); init_environment(res); if (drop_privs(res) != 0) { cleanup(res); exit(EX_CONFIG); } pjdlog_info("Privileges successfully dropped."); /* * Create the guard thread first, so we can handle signals from the * very beginning. 
*/ error = pthread_create(&td, NULL, guard_thread, res); PJDLOG_ASSERT(error == 0); /* * Create the control thread before sending any event to the parent, * as we can deadlock when parent sends control request to worker, * but worker has no control thread started yet, so parent waits. * In the meantime worker sends an event to the parent, but parent * is unable to handle the event, because it waits for control * request response. */ error = pthread_create(&td, NULL, ctrl_thread, res); PJDLOG_ASSERT(error == 0); if (real_remote(res)) { error = init_remote(res, NULL, NULL); if (error == 0) { sync_start(); } else if (error == EBUSY) { time_t start = time(NULL); pjdlog_warning("Waiting for remote node to become %s for %ds.", role2str(HAST_ROLE_SECONDARY), res->hr_timeout); for (;;) { sleep(1); error = init_remote(res, NULL, NULL); if (error != EBUSY) break; if (time(NULL) > start + res->hr_timeout) break; } if (error == EBUSY) { pjdlog_warning("Remote node is still %s, starting anyway.", role2str(HAST_ROLE_PRIMARY)); } } } error = pthread_create(&td, NULL, ggate_recv_thread, res); PJDLOG_ASSERT(error == 0); error = pthread_create(&td, NULL, local_send_thread, res); PJDLOG_ASSERT(error == 0); error = pthread_create(&td, NULL, remote_send_thread, res); PJDLOG_ASSERT(error == 0); error = pthread_create(&td, NULL, remote_recv_thread, res); PJDLOG_ASSERT(error == 0); error = pthread_create(&td, NULL, ggate_send_thread, res); PJDLOG_ASSERT(error == 0); fullystarted = true; (void)sync_thread(res); } static void -reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, const char *fmt, ...) +reqlog(int loglevel, int debuglevel, struct g_gate_ctl_io *ggio, + const char *fmt, ...) 
{ char msg[1024]; va_list ap; va_start(ap, fmt); (void)vsnprintf(msg, sizeof(msg), fmt, ap); va_end(ap); switch (ggio->gctl_cmd) { case BIO_READ: (void)snprlcat(msg, sizeof(msg), "READ(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; case BIO_DELETE: (void)snprlcat(msg, sizeof(msg), "DELETE(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; case BIO_FLUSH: (void)snprlcat(msg, sizeof(msg), "FLUSH."); break; case BIO_WRITE: (void)snprlcat(msg, sizeof(msg), "WRITE(%ju, %ju).", - (uintmax_t)ggio->gctl_offset, - (uintmax_t)ggio->gctl_length); + (uintmax_t)ggio->gctl_offset, (uintmax_t)ggio->gctl_length); break; default: (void)snprlcat(msg, sizeof(msg), "UNKNOWN(%u).", (unsigned int)ggio->gctl_cmd); break; } pjdlog_common(loglevel, debuglevel, -1, "%s", msg); } static void remote_close(struct hast_resource *res, int ncomp) { rw_wlock(&hio_remote_lock[ncomp]); /* * Check for a race between dropping rlock and acquiring wlock - * another thread can close connection in-between. */ if (!ISCONNECTED(res, ncomp)) { PJDLOG_ASSERT(res->hr_remotein == NULL); PJDLOG_ASSERT(res->hr_remoteout == NULL); rw_unlock(&hio_remote_lock[ncomp]); return; } PJDLOG_ASSERT(res->hr_remotein != NULL); PJDLOG_ASSERT(res->hr_remoteout != NULL); pjdlog_debug(2, "Closing incoming connection to %s.", res->hr_remoteaddr); proto_close(res->hr_remotein); res->hr_remotein = NULL; pjdlog_debug(2, "Closing outgoing connection to %s.", res->hr_remoteaddr); proto_close(res->hr_remoteout); res->hr_remoteout = NULL; rw_unlock(&hio_remote_lock[ncomp]); pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); /* * Stop synchronization if in-progress. */ sync_stop(); event_send(res, EVENT_DISCONNECT); } /* * Acknowledge write completion to the kernel, but don't update activemap yet. 
*/ static void write_complete(struct hast_resource *res, struct hio *hio) { struct g_gate_ctl_io *ggio; unsigned int ncomp; PJDLOG_ASSERT(!hio->hio_done); ggio = &hio->hio_ggio; PJDLOG_ASSERT(ggio->gctl_cmd == BIO_WRITE); /* * Bump local count if this is first write after * connection failure with remote node. */ ncomp = 1; rw_rlock(&hio_remote_lock[ncomp]); if (!ISCONNECTED(res, ncomp)) { mtx_lock(&metadata_lock); if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { res->hr_primary_localcnt++; pjdlog_debug(1, "Increasing localcnt to %ju.", (uintmax_t)res->hr_primary_localcnt); (void)metadata_write(res); } mtx_unlock(&metadata_lock); } rw_unlock(&hio_remote_lock[ncomp]); if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); hio->hio_done = true; } /* * Thread receives ggate I/O requests from the kernel and passes them to * appropriate threads: * WRITE - always goes to both local_send and remote_send threads * READ (when the block is up-to-date on local component) - * only local_send thread * READ (when the block isn't up-to-date on local component) - * only remote_send thread * DELETE - always goes to both local_send and remote_send threads * FLUSH - always goes to both local_send and remote_send threads */ static void * ggate_recv_thread(void *arg) { struct hast_resource *res = arg; struct g_gate_ctl_io *ggio; struct hio *hio; unsigned int ii, ncomp, ncomps; int error; for (;;) { pjdlog_debug(2, "ggate_recv: Taking free request."); QUEUE_TAKE2(hio, free); pjdlog_debug(2, "ggate_recv: (%p) Got free request.", hio); ggio = &hio->hio_ggio; ggio->gctl_unit = res->hr_ggateunit; ggio->gctl_length = MAXPHYS; ggio->gctl_error = 0; hio->hio_done = false; hio->hio_replication = res->hr_replication; pjdlog_debug(2, "ggate_recv: (%p) Waiting for request from the kernel.", hio); if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { if (sigexit_received) pthread_exit(NULL); primary_exit(EX_OSERR, 
"G_GATE_CMD_START failed"); } error = ggio->gctl_error; switch (error) { case 0: break; case ECANCELED: /* Exit gracefully. */ if (!sigexit_received) { pjdlog_debug(2, "ggate_recv: (%p) Received cancel from the kernel.", hio); pjdlog_info("Received cancel from the kernel, exiting."); } pthread_exit(NULL); case ENOMEM: /* * Buffer too small? Impossible, we allocate MAXPHYS * bytes - request can't be bigger than that. */ /* FALLTHROUGH */ case ENXIO: default: primary_exitx(EX_OSERR, "G_GATE_CMD_START failed: %s.", strerror(error)); } ncomp = 0; ncomps = HAST_NCOMPONENTS; for (ii = 0; ii < ncomps; ii++) hio->hio_errors[ii] = EINVAL; reqlog(LOG_DEBUG, 2, ggio, "ggate_recv: (%p) Request received from the kernel: ", hio); /* * Inform all components about new write request. * For read request prefer local component unless the given * range is out-of-date, then use remote component. */ switch (ggio->gctl_cmd) { case BIO_READ: res->hr_stat_read++; ncomps = 1; mtx_lock(&metadata_lock); if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { /* * This range is up-to-date on local component, * so handle request locally. */ /* Local component is 0 for now. */ ncomp = 0; } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); /* * This range is out-of-date on local component, * so send request to the remote node. */ /* Remote component is 1 for now. */ ncomp = 1; } mtx_unlock(&metadata_lock); break; case BIO_WRITE: res->hr_stat_write++; if (res->hr_resuid == 0 && res->hr_primary_localcnt == 0) { /* This is first write. 
*/ res->hr_primary_localcnt = 1; } for (;;) { mtx_lock(&range_lock); if (rangelock_islocked(range_sync, ggio->gctl_offset, ggio->gctl_length)) { pjdlog_debug(2, "regular: Range offset=%jd length=%zu locked.", (intmax_t)ggio->gctl_offset, (size_t)ggio->gctl_length); range_regular_wait = true; cv_wait(&range_regular_cond, &range_lock); range_regular_wait = false; mtx_unlock(&range_lock); continue; } if (rangelock_add(range_regular, ggio->gctl_offset, ggio->gctl_length) == -1) { mtx_unlock(&range_lock); pjdlog_debug(2, "regular: Range offset=%jd length=%zu is already locked, waiting.", (intmax_t)ggio->gctl_offset, (size_t)ggio->gctl_length); sleep(1); continue; } mtx_unlock(&range_lock); break; } mtx_lock(&res->hr_amp_lock); if (activemap_write_start(res->hr_amp, ggio->gctl_offset, ggio->gctl_length)) { res->hr_stat_activemap_update++; (void)hast_activemap_flush(res); } mtx_unlock(&res->hr_amp_lock); break; case BIO_DELETE: res->hr_stat_delete++; break; case BIO_FLUSH: res->hr_stat_flush++; break; } pjdlog_debug(2, "ggate_recv: (%p) Moving request to the send queues.", hio); - refcount_init(&hio->hio_countdown, ncomps); - for (ii = ncomp; ii < ncomp + ncomps; ii++) + hio->hio_countdown = ncomps; + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && + ggio->gctl_cmd == BIO_WRITE) { + /* Each remote request needs two responses in memsync. */ + hio->hio_countdown++; + } + for (ii = ncomp; ii < ncomps; ii++) QUEUE_INSERT1(hio, send, ii); } /* NOTREACHED */ return (NULL); } /* * Thread reads from or writes to local component. * If local read fails, it redirects it to remote_send thread. */ static void * local_send_thread(void *arg) { struct hast_resource *res = arg; struct g_gate_ctl_io *ggio; struct hio *hio; unsigned int ncomp, rncomp; ssize_t ret; /* Local component is 0 for now. */ ncomp = 0; /* Remote component is 1 for now. 
*/ rncomp = 1; for (;;) { pjdlog_debug(2, "local_send: Taking request."); QUEUE_TAKE1(hio, send, ncomp, 0); pjdlog_debug(2, "local_send: (%p) Got request.", hio); ggio = &hio->hio_ggio; switch (ggio->gctl_cmd) { case BIO_READ: ret = pread(res->hr_localfd, ggio->gctl_data, ggio->gctl_length, ggio->gctl_offset + res->hr_localoff); if (ret == ggio->gctl_length) hio->hio_errors[ncomp] = 0; else if (!ISSYNCREQ(hio)) { /* * If READ failed, try to read from remote node. */ if (ret == -1) { reqlog(LOG_WARNING, 0, ggio, "Local request failed (%s), trying remote node. ", strerror(errno)); } else if (ret != ggio->gctl_length) { reqlog(LOG_WARNING, 0, ggio, "Local request failed (%zd != %jd), trying remote node. ", ret, (intmax_t)ggio->gctl_length); } QUEUE_INSERT1(hio, send, rncomp); continue; } break; case BIO_WRITE: ret = pwrite(res->hr_localfd, ggio->gctl_data, ggio->gctl_length, ggio->gctl_offset + res->hr_localoff); if (ret == -1) { hio->hio_errors[ncomp] = errno; reqlog(LOG_WARNING, 0, ggio, "Local request failed (%s): ", strerror(errno)); } else if (ret != ggio->gctl_length) { hio->hio_errors[ncomp] = EIO; reqlog(LOG_WARNING, 0, ggio, "Local request failed (%zd != %jd): ", ret, (intmax_t)ggio->gctl_length); } else { hio->hio_errors[ncomp] = 0; if (hio->hio_replication == - HAST_REPLICATION_ASYNC && - !ISSYNCREQ(hio)) { + HAST_REPLICATION_ASYNC) { ggio->gctl_error = 0; write_complete(res, hio); } } break; case BIO_DELETE: ret = g_delete(res->hr_localfd, ggio->gctl_offset + res->hr_localoff, ggio->gctl_length); if (ret == -1) { hio->hio_errors[ncomp] = errno; reqlog(LOG_WARNING, 0, ggio, "Local request failed (%s): ", strerror(errno)); } else { hio->hio_errors[ncomp] = 0; } break; case BIO_FLUSH: if (!res->hr_localflush) { ret = -1; errno = EOPNOTSUPP; break; } ret = g_flush(res->hr_localfd); if (ret == -1) { if (errno == EOPNOTSUPP) res->hr_localflush = false; hio->hio_errors[ncomp] = errno; reqlog(LOG_WARNING, 0, ggio, "Local request failed (%s): ", strerror(errno)); } 
else { hio->hio_errors[ncomp] = 0; } break; } - if (!refcount_release(&hio->hio_countdown)) - continue; + + if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || + ggio->gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { + if (refcnt_release(&hio->hio_countdown) > 0) + continue; + } else { + /* + * Depending on hio_countdown value, requests finished + * in the following order: + * 0: remote memsync, remote final, local write + * 1: remote memsync, local write, (remote final) + * 2: local write, (remote memsync), (remote final) + */ + switch (refcnt_release(&hio->hio_countdown)) { + case 0: + /* + * Local write finished as last. + */ + break; + case 1: + /* + * Local write finished after remote memsync + * reply arrived. We can complete the write now. + */ + if (hio->hio_errors[0] == 0) + write_complete(res, hio); + continue; + case 2: + /* + * Local write finished as first. + */ + continue; + default: + PJDLOG_ABORT("Invalid hio_countdown."); + } + } if (ISSYNCREQ(hio)) { mtx_lock(&sync_lock); SYNCREQDONE(hio); mtx_unlock(&sync_lock); cv_signal(&sync_cond); } else { pjdlog_debug(2, "local_send: (%p) Moving request to the done queue.", hio); QUEUE_INSERT2(hio, done); } } /* NOTREACHED */ return (NULL); } static void keepalive_send(struct hast_resource *res, unsigned int ncomp) { struct nv *nv; rw_rlock(&hio_remote_lock[ncomp]); if (!ISCONNECTED(res, ncomp)) { rw_unlock(&hio_remote_lock[ncomp]); return; } PJDLOG_ASSERT(res->hr_remotein != NULL); PJDLOG_ASSERT(res->hr_remoteout != NULL); nv = nv_alloc(); nv_add_uint8(nv, HIO_KEEPALIVE, "cmd"); if (nv_error(nv) != 0) { rw_unlock(&hio_remote_lock[ncomp]); nv_free(nv); pjdlog_debug(1, "keepalive_send: Unable to prepare header to send."); return; } if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { rw_unlock(&hio_remote_lock[ncomp]); pjdlog_common(LOG_DEBUG, 1, errno, "keepalive_send: Unable to send request"); nv_free(nv); remote_close(res, ncomp); return; } rw_unlock(&hio_remote_lock[ncomp]); nv_free(nv);
pjdlog_debug(2, "keepalive_send: Request sent."); } /* * Thread sends request to secondary node. */ static void * remote_send_thread(void *arg) { struct hast_resource *res = arg; struct g_gate_ctl_io *ggio; time_t lastcheck, now; struct hio *hio; struct nv *nv; unsigned int ncomp; bool wakeup; uint64_t offset, length; uint8_t cmd; void *data; /* Remote component is 1 for now. */ ncomp = 1; lastcheck = time(NULL); for (;;) { pjdlog_debug(2, "remote_send: Taking request."); QUEUE_TAKE1(hio, send, ncomp, HAST_KEEPALIVE); if (hio == NULL) { now = time(NULL); if (lastcheck + HAST_KEEPALIVE <= now) { keepalive_send(res, ncomp); lastcheck = now; } continue; } pjdlog_debug(2, "remote_send: (%p) Got request.", hio); ggio = &hio->hio_ggio; switch (ggio->gctl_cmd) { case BIO_READ: cmd = HIO_READ; data = NULL; offset = ggio->gctl_offset; length = ggio->gctl_length; break; case BIO_WRITE: cmd = HIO_WRITE; data = ggio->gctl_data; offset = ggio->gctl_offset; length = ggio->gctl_length; break; case BIO_DELETE: cmd = HIO_DELETE; data = NULL; offset = ggio->gctl_offset; length = ggio->gctl_length; break; case BIO_FLUSH: cmd = HIO_FLUSH; data = NULL; offset = 0; length = 0; break; default: PJDLOG_ABORT("invalid condition"); } nv = nv_alloc(); nv_add_uint8(nv, cmd, "cmd"); nv_add_uint64(nv, (uint64_t)ggio->gctl_seq, "seq"); nv_add_uint64(nv, offset, "offset"); nv_add_uint64(nv, length, "length"); + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC && + ggio->gctl_cmd == BIO_WRITE && !ISSYNCREQ(hio)) { + nv_add_uint8(nv, 1, "memsync"); + } if (nv_error(nv) != 0) { hio->hio_errors[ncomp] = nv_error(nv); pjdlog_debug(2, "remote_send: (%p) Unable to prepare header to send.", hio); reqlog(LOG_ERR, 0, ggio, "Unable to prepare header to send (%s): ", strerror(nv_error(nv))); /* Move failed request immediately to the done queue. */ goto done_queue; } /* * Protect connection from disappearing. 
*/ rw_rlock(&hio_remote_lock[ncomp]); if (!ISCONNECTED(res, ncomp)) { rw_unlock(&hio_remote_lock[ncomp]); hio->hio_errors[ncomp] = ENOTCONN; goto done_queue; } /* * Move the request to recv queue before sending it, because * in different order we can get reply before we move request * to recv queue. */ pjdlog_debug(2, "remote_send: (%p) Moving request to the recv queue.", hio); mtx_lock(&hio_recv_list_lock[ncomp]); wakeup = TAILQ_EMPTY(&hio_recv_list[ncomp]); TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, hio_next[ncomp]); mtx_unlock(&hio_recv_list_lock[ncomp]); if (hast_proto_send(res, res->hr_remoteout, nv, data, data != NULL ? length : 0) == -1) { hio->hio_errors[ncomp] = errno; rw_unlock(&hio_remote_lock[ncomp]); pjdlog_debug(2, "remote_send: (%p) Unable to send request.", hio); reqlog(LOG_ERR, 0, ggio, "Unable to send request (%s): ", strerror(hio->hio_errors[ncomp])); remote_close(res, ncomp); /* * Take request back from the receive queue and move * it immediately to the done queue. 
*/ mtx_lock(&hio_recv_list_lock[ncomp]); TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); mtx_unlock(&hio_recv_list_lock[ncomp]); goto done_queue; } rw_unlock(&hio_remote_lock[ncomp]); nv_free(nv); if (wakeup) cv_signal(&hio_recv_list_cond[ncomp]); continue; done_queue: nv_free(nv); if (ISSYNCREQ(hio)) { - if (!refcount_release(&hio->hio_countdown)) + if (refcnt_release(&hio->hio_countdown) > 0) continue; mtx_lock(&sync_lock); SYNCREQDONE(hio); mtx_unlock(&sync_lock); cv_signal(&sync_cond); continue; } if (ggio->gctl_cmd == BIO_WRITE) { mtx_lock(&res->hr_amp_lock); if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, ggio->gctl_length)) { (void)hast_activemap_flush(res); } mtx_unlock(&res->hr_amp_lock); + if (hio->hio_replication == HAST_REPLICATION_MEMSYNC) + (void)refcnt_release(&hio->hio_countdown); } - if (!refcount_release(&hio->hio_countdown)) + if (refcnt_release(&hio->hio_countdown) > 0) continue; pjdlog_debug(2, "remote_send: (%p) Moving request to the done queue.", hio); QUEUE_INSERT2(hio, done); } /* NOTREACHED */ return (NULL); } /* * Thread receives answer from secondary node and passes it to ggate_send * thread. */ static void * remote_recv_thread(void *arg) { struct hast_resource *res = arg; struct g_gate_ctl_io *ggio; struct hio *hio; struct nv *nv; unsigned int ncomp; uint64_t seq; + bool memsyncack; int error; /* Remote component is 1 for now. */ ncomp = 1; for (;;) { /* Wait until there is anything to receive. */ mtx_lock(&hio_recv_list_lock[ncomp]); while (TAILQ_EMPTY(&hio_recv_list[ncomp])) { pjdlog_debug(2, "remote_recv: No requests, waiting."); cv_wait(&hio_recv_list_cond[ncomp], &hio_recv_list_lock[ncomp]); } mtx_unlock(&hio_recv_list_lock[ncomp]); + memsyncack = false; + rw_rlock(&hio_remote_lock[ncomp]); if (!ISCONNECTED(res, ncomp)) { rw_unlock(&hio_remote_lock[ncomp]); /* * Connection is dead, so move all pending requests to * the done queue (one-by-one). 
*/ mtx_lock(&hio_recv_list_lock[ncomp]); hio = TAILQ_FIRST(&hio_recv_list[ncomp]); PJDLOG_ASSERT(hio != NULL); TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); mtx_unlock(&hio_recv_list_lock[ncomp]); goto done_queue; } if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { pjdlog_errno(LOG_ERR, "Unable to receive reply header"); rw_unlock(&hio_remote_lock[ncomp]); remote_close(res, ncomp); continue; } rw_unlock(&hio_remote_lock[ncomp]); seq = nv_get_uint64(nv, "seq"); if (seq == 0) { pjdlog_error("Header contains no 'seq' field."); nv_free(nv); continue; } + memsyncack = nv_exists(nv, "received"); mtx_lock(&hio_recv_list_lock[ncomp]); TAILQ_FOREACH(hio, &hio_recv_list[ncomp], hio_next[ncomp]) { if (hio->hio_ggio.gctl_seq == seq) { TAILQ_REMOVE(&hio_recv_list[ncomp], hio, hio_next[ncomp]); break; } } mtx_unlock(&hio_recv_list_lock[ncomp]); if (hio == NULL) { pjdlog_error("Found no request matching received 'seq' field (%ju).", (uintmax_t)seq); nv_free(nv); continue; } ggio = &hio->hio_ggio; error = nv_get_int16(nv, "error"); if (error != 0) { /* Request failed on remote side. 
*/ hio->hio_errors[ncomp] = error; reqlog(LOG_WARNING, 0, ggio, "Remote request failed (%s): ", strerror(error)); nv_free(nv); goto done_queue; } switch (ggio->gctl_cmd) { case BIO_READ: rw_rlock(&hio_remote_lock[ncomp]); if (!ISCONNECTED(res, ncomp)) { rw_unlock(&hio_remote_lock[ncomp]); nv_free(nv); goto done_queue; } if (hast_proto_recv_data(res, res->hr_remotein, nv, ggio->gctl_data, ggio->gctl_length) == -1) { hio->hio_errors[ncomp] = errno; pjdlog_errno(LOG_ERR, "Unable to receive reply data"); rw_unlock(&hio_remote_lock[ncomp]); nv_free(nv); remote_close(res, ncomp); goto done_queue; } rw_unlock(&hio_remote_lock[ncomp]); break; case BIO_WRITE: case BIO_DELETE: case BIO_FLUSH: break; default: PJDLOG_ABORT("invalid condition"); } hio->hio_errors[ncomp] = 0; nv_free(nv); done_queue: - if (!refcount_release(&hio->hio_countdown)) - continue; + if (hio->hio_replication != HAST_REPLICATION_MEMSYNC || + hio->hio_ggio.gctl_cmd != BIO_WRITE || ISSYNCREQ(hio)) { + if (refcnt_release(&hio->hio_countdown) > 0) + continue; + } else { + /* + * Depending on hio_countdown value, requests finished + * in the following order: + * + * 0: local write, remote memsync, remote final + * or + * 0: remote memsync, local write, remote final + * + * 1: local write, remote memsync, (remote final) + * or + * 1: remote memsync, remote final, (local write) + * + * 2: remote memsync, (local write), (remote final) + * or + * 2: remote memsync, (remote final), (local write) + */ + switch (refcnt_release(&hio->hio_countdown)) { + case 0: + /* + * Remote final reply arrived. + */ + PJDLOG_ASSERT(!memsyncack); + break; + case 1: + if (memsyncack) { + /* + * Local request already finished, so we + * can complete the write. + */ + if (hio->hio_errors[0] == 0) + write_complete(res, hio); + /* + * We still need to wait for final + * remote reply. 
+ */ + pjdlog_debug(2, + "remote_recv: (%p) Moving request back to the recv queue.", + hio); + mtx_lock(&hio_recv_list_lock[ncomp]); + TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], + hio, hio_next[ncomp]); + mtx_unlock(&hio_recv_list_lock[ncomp]); + } else { + /* + * Remote final reply arrived before + * local write finished. + * Nothing to do in such case. + */ + } + continue; + case 2: + /* + * We received remote memsync reply even before + * local write finished. + */ + PJDLOG_ASSERT(memsyncack); + + pjdlog_debug(2, + "remote_recv: (%p) Moving request back to the recv queue.", + hio); + mtx_lock(&hio_recv_list_lock[ncomp]); + TAILQ_INSERT_TAIL(&hio_recv_list[ncomp], hio, + hio_next[ncomp]); + mtx_unlock(&hio_recv_list_lock[ncomp]); + continue; + default: + PJDLOG_ABORT("Invalid hio_countdown."); + } + } if (ISSYNCREQ(hio)) { mtx_lock(&sync_lock); SYNCREQDONE(hio); mtx_unlock(&sync_lock); cv_signal(&sync_cond); } else { pjdlog_debug(2, "remote_recv: (%p) Moving request to the done queue.", hio); QUEUE_INSERT2(hio, done); } } /* NOTREACHED */ return (NULL); } /* * Thread sends answer to the kernel. */ static void * ggate_send_thread(void *arg) { struct hast_resource *res = arg; struct g_gate_ctl_io *ggio; struct hio *hio; unsigned int ii, ncomps; ncomps = HAST_NCOMPONENTS; for (;;) { pjdlog_debug(2, "ggate_send: Taking request."); QUEUE_TAKE2(hio, done); pjdlog_debug(2, "ggate_send: (%p) Got request.", hio); ggio = &hio->hio_ggio; for (ii = 0; ii < ncomps; ii++) { if (hio->hio_errors[ii] == 0) { /* * One successful request is enough to declare * success. */ ggio->gctl_error = 0; break; } } if (ii == ncomps) { /* * None of the requests were successful. * Use the error from local component except the * case when we did only remote request. 
*/ if (ggio->gctl_cmd == BIO_READ && res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) ggio->gctl_error = hio->hio_errors[1]; else ggio->gctl_error = hio->hio_errors[0]; } if (ggio->gctl_error == 0 && ggio->gctl_cmd == BIO_WRITE) { mtx_lock(&res->hr_amp_lock); if (activemap_write_complete(res->hr_amp, ggio->gctl_offset, ggio->gctl_length)) { res->hr_stat_activemap_update++; (void)hast_activemap_flush(res); } mtx_unlock(&res->hr_amp_lock); } if (ggio->gctl_cmd == BIO_WRITE) { /* * Unlock range we locked. */ mtx_lock(&range_lock); rangelock_del(range_regular, ggio->gctl_offset, ggio->gctl_length); if (range_sync_wait) cv_signal(&range_sync_cond); mtx_unlock(&range_lock); if (!hio->hio_done) write_complete(res, hio); } else { if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { primary_exit(EX_OSERR, "G_GATE_CMD_DONE failed"); } } pjdlog_debug(2, "ggate_send: (%p) Moving request to the free queue.", hio); QUEUE_INSERT2(hio, free); } /* NOTREACHED */ return (NULL); } /* * Thread synchronize local and remote components. */ static void * sync_thread(void *arg __unused) { struct hast_resource *res = arg; struct hio *hio; struct g_gate_ctl_io *ggio; struct timeval tstart, tend, tdiff; unsigned int ii, ncomp, ncomps; off_t offset, length, synced; bool dorewind, directreads; int syncext; ncomps = HAST_NCOMPONENTS; dorewind = true; synced = 0; offset = -1; directreads = false; for (;;) { mtx_lock(&sync_lock); if (offset >= 0 && !sync_inprogress) { gettimeofday(&tend, NULL); timersub(&tend, &tstart, &tdiff); pjdlog_info("Synchronization interrupted after %#.0T. " "%NB synchronized so far.", &tdiff, (intmax_t)synced); event_send(res, EVENT_SYNCINTR); } while (!sync_inprogress) { dorewind = true; synced = 0; cv_wait(&sync_cond, &sync_lock); } mtx_unlock(&sync_lock); /* * Obtain offset at which we should synchronize. * Rewind synchronization if needed. 
*/ mtx_lock(&res->hr_amp_lock); if (dorewind) activemap_sync_rewind(res->hr_amp); offset = activemap_sync_offset(res->hr_amp, &length, &syncext); if (syncext != -1) { /* * We synchronized entire syncext extent, we can mark * it as clean now. */ if (activemap_extent_complete(res->hr_amp, syncext)) (void)hast_activemap_flush(res); } mtx_unlock(&res->hr_amp_lock); if (dorewind) { dorewind = false; if (offset == -1) pjdlog_info("Nodes are in sync."); else { pjdlog_info("Synchronization started. %NB to go.", (intmax_t)(res->hr_extentsize * activemap_ndirty(res->hr_amp))); event_send(res, EVENT_SYNCSTART); gettimeofday(&tstart, NULL); } } if (offset == -1) { sync_stop(); pjdlog_debug(1, "Nothing to synchronize."); /* * Synchronization complete, make both localcnt and * remotecnt equal. */ ncomp = 1; rw_rlock(&hio_remote_lock[ncomp]); if (ISCONNECTED(res, ncomp)) { if (synced > 0) { int64_t bps; gettimeofday(&tend, NULL); timersub(&tend, &tstart, &tdiff); bps = (int64_t)((double)synced / ((double)tdiff.tv_sec + (double)tdiff.tv_usec / 1000000)); pjdlog_info("Synchronization complete. " "%NB synchronized in %#.0lT (%NB/sec).", (intmax_t)synced, &tdiff, (intmax_t)bps); event_send(res, EVENT_SYNCDONE); } mtx_lock(&metadata_lock); if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) directreads = true; res->hr_syncsrc = HAST_SYNCSRC_UNDEF; res->hr_primary_localcnt = res->hr_secondary_remotecnt; res->hr_primary_remotecnt = res->hr_secondary_localcnt; pjdlog_debug(1, "Setting localcnt to %ju and remotecnt to %ju.", (uintmax_t)res->hr_primary_localcnt, (uintmax_t)res->hr_primary_remotecnt); (void)metadata_write(res); mtx_unlock(&metadata_lock); } rw_unlock(&hio_remote_lock[ncomp]); if (directreads) { directreads = false; enable_direct_reads(res); } continue; } pjdlog_debug(2, "sync: Taking free request."); QUEUE_TAKE2(hio, free); pjdlog_debug(2, "sync: (%p) Got free request.", hio); /* * Lock the range we are going to synchronize. 
We don't want * race where someone writes between our read and write. */ for (;;) { mtx_lock(&range_lock); if (rangelock_islocked(range_regular, offset, length)) { pjdlog_debug(2, "sync: Range offset=%jd length=%jd locked.", (intmax_t)offset, (intmax_t)length); range_sync_wait = true; cv_wait(&range_sync_cond, &range_lock); range_sync_wait = false; mtx_unlock(&range_lock); continue; } if (rangelock_add(range_sync, offset, length) == -1) { mtx_unlock(&range_lock); pjdlog_debug(2, "sync: Range offset=%jd length=%jd is already locked, waiting.", (intmax_t)offset, (intmax_t)length); sleep(1); continue; } mtx_unlock(&range_lock); break; } /* * First read the data from synchronization source. */ SYNCREQ(hio); ggio = &hio->hio_ggio; ggio->gctl_cmd = BIO_READ; ggio->gctl_offset = offset; ggio->gctl_length = length; ggio->gctl_error = 0; hio->hio_done = false; hio->hio_replication = res->hr_replication; for (ii = 0; ii < ncomps; ii++) hio->hio_errors[ii] = EINVAL; reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", hio); pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", hio); mtx_lock(&metadata_lock); if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { /* * This range is up-to-date on local component, * so handle request locally. */ /* Local component is 0 for now. */ ncomp = 0; } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); /* * This range is out-of-date on local component, * so send request to the remote node. */ /* Remote component is 1 for now. */ ncomp = 1; } mtx_unlock(&metadata_lock); - refcount_init(&hio->hio_countdown, 1); + hio->hio_countdown = 1; QUEUE_INSERT1(hio, send, ncomp); /* * Let's wait for READ to finish. 
*/ mtx_lock(&sync_lock); while (!ISSYNCREQDONE(hio)) cv_wait(&sync_cond, &sync_lock); mtx_unlock(&sync_lock); if (hio->hio_errors[ncomp] != 0) { pjdlog_error("Unable to read synchronization data: %s.", strerror(hio->hio_errors[ncomp])); goto free_queue; } /* * We read the data from synchronization source, now write it * to synchronization target. */ SYNCREQ(hio); ggio->gctl_cmd = BIO_WRITE; for (ii = 0; ii < ncomps; ii++) hio->hio_errors[ii] = EINVAL; reqlog(LOG_DEBUG, 2, ggio, "sync: (%p) Sending sync request: ", hio); pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", hio); mtx_lock(&metadata_lock); if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { /* * This range is up-to-date on local component, * so we update remote component. */ /* Remote component is 1 for now. */ ncomp = 1; } else /* if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) */ { PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); /* * This range is out-of-date on local component, * so we update it. */ /* Local component is 0 for now. */ ncomp = 0; } mtx_unlock(&metadata_lock); pjdlog_debug(2, "sync: (%p) Moving request to the send queue.", hio); - refcount_init(&hio->hio_countdown, 1); + hio->hio_countdown = 1; QUEUE_INSERT1(hio, send, ncomp); /* * Let's wait for WRITE to finish. 
*/ mtx_lock(&sync_lock); while (!ISSYNCREQDONE(hio)) cv_wait(&sync_cond, &sync_lock); mtx_unlock(&sync_lock); if (hio->hio_errors[ncomp] != 0) { pjdlog_error("Unable to write synchronization data: %s.", strerror(hio->hio_errors[ncomp])); goto free_queue; } synced += length; free_queue: mtx_lock(&range_lock); rangelock_del(range_sync, offset, length); if (range_regular_wait) cv_signal(&range_regular_cond); mtx_unlock(&range_lock); pjdlog_debug(2, "sync: (%p) Moving request to the free queue.", hio); QUEUE_INSERT2(hio, free); } /* NOTREACHED */ return (NULL); } void primary_config_reload(struct hast_resource *res, struct nv *nv) { unsigned int ii, ncomps; int modified, vint; const char *vstr; pjdlog_info("Reloading configuration..."); PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); PJDLOG_ASSERT(gres == res); nv_assert(nv, "remoteaddr"); nv_assert(nv, "sourceaddr"); nv_assert(nv, "replication"); nv_assert(nv, "checksum"); nv_assert(nv, "compression"); nv_assert(nv, "timeout"); nv_assert(nv, "exec"); nv_assert(nv, "metaflush"); ncomps = HAST_NCOMPONENTS; #define MODIFIED_REMOTEADDR 0x01 #define MODIFIED_SOURCEADDR 0x02 #define MODIFIED_REPLICATION 0x04 #define MODIFIED_CHECKSUM 0x08 #define MODIFIED_COMPRESSION 0x10 #define MODIFIED_TIMEOUT 0x20 #define MODIFIED_EXEC 0x40 #define MODIFIED_METAFLUSH 0x80 modified = 0; vstr = nv_get_string(nv, "remoteaddr"); if (strcmp(gres->hr_remoteaddr, vstr) != 0) { /* * Don't copy res->hr_remoteaddr to gres just yet. * We want remote_close() to log disconnect from the old * addresses, not from the new ones. 
*/ modified |= MODIFIED_REMOTEADDR; } vstr = nv_get_string(nv, "sourceaddr"); if (strcmp(gres->hr_sourceaddr, vstr) != 0) { strlcpy(gres->hr_sourceaddr, vstr, sizeof(gres->hr_sourceaddr)); modified |= MODIFIED_SOURCEADDR; } vint = nv_get_int32(nv, "replication"); if (gres->hr_replication != vint) { gres->hr_replication = vint; modified |= MODIFIED_REPLICATION; } vint = nv_get_int32(nv, "checksum"); if (gres->hr_checksum != vint) { gres->hr_checksum = vint; modified |= MODIFIED_CHECKSUM; } vint = nv_get_int32(nv, "compression"); if (gres->hr_compression != vint) { gres->hr_compression = vint; modified |= MODIFIED_COMPRESSION; } vint = nv_get_int32(nv, "timeout"); if (gres->hr_timeout != vint) { gres->hr_timeout = vint; modified |= MODIFIED_TIMEOUT; } vstr = nv_get_string(nv, "exec"); if (strcmp(gres->hr_exec, vstr) != 0) { strlcpy(gres->hr_exec, vstr, sizeof(gres->hr_exec)); modified |= MODIFIED_EXEC; } vint = nv_get_int32(nv, "metaflush"); if (gres->hr_metaflush != vint) { gres->hr_metaflush = vint; modified |= MODIFIED_METAFLUSH; } /* * Change timeout for connected sockets. * Don't bother if we need to reconnect. 
*/ if ((modified & MODIFIED_TIMEOUT) != 0 && (modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) == 0) { for (ii = 0; ii < ncomps; ii++) { if (!ISREMOTE(ii)) continue; rw_rlock(&hio_remote_lock[ii]); if (!ISCONNECTED(gres, ii)) { rw_unlock(&hio_remote_lock[ii]); continue; } rw_unlock(&hio_remote_lock[ii]); if (proto_timeout(gres->hr_remotein, gres->hr_timeout) == -1) { pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); } if (proto_timeout(gres->hr_remoteout, gres->hr_timeout) == -1) { pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); } } } if ((modified & (MODIFIED_REMOTEADDR | MODIFIED_SOURCEADDR)) != 0) { for (ii = 0; ii < ncomps; ii++) { if (!ISREMOTE(ii)) continue; remote_close(gres, ii); } if (modified & MODIFIED_REMOTEADDR) { vstr = nv_get_string(nv, "remoteaddr"); strlcpy(gres->hr_remoteaddr, vstr, sizeof(gres->hr_remoteaddr)); } } #undef MODIFIED_REMOTEADDR #undef MODIFIED_SOURCEADDR #undef MODIFIED_REPLICATION #undef MODIFIED_CHECKSUM #undef MODIFIED_COMPRESSION #undef MODIFIED_TIMEOUT #undef MODIFIED_EXEC #undef MODIFIED_METAFLUSH pjdlog_info("Configuration reloaded successfully."); } static void guard_one(struct hast_resource *res, unsigned int ncomp) { struct proto_conn *in, *out; if (!ISREMOTE(ncomp)) return; rw_rlock(&hio_remote_lock[ncomp]); if (!real_remote(res)) { rw_unlock(&hio_remote_lock[ncomp]); return; } if (ISCONNECTED(res, ncomp)) { PJDLOG_ASSERT(res->hr_remotein != NULL); PJDLOG_ASSERT(res->hr_remoteout != NULL); rw_unlock(&hio_remote_lock[ncomp]); pjdlog_debug(2, "remote_guard: Connection to %s is ok.", res->hr_remoteaddr); return; } PJDLOG_ASSERT(res->hr_remotein == NULL); PJDLOG_ASSERT(res->hr_remoteout == NULL); /* * Upgrade the lock. It doesn't have to be atomic as no other thread * can change connection status from disconnected to connected. 
*/ rw_unlock(&hio_remote_lock[ncomp]); pjdlog_debug(2, "remote_guard: Reconnecting to %s.", res->hr_remoteaddr); in = out = NULL; if (init_remote(res, &in, &out) == 0) { rw_wlock(&hio_remote_lock[ncomp]); PJDLOG_ASSERT(res->hr_remotein == NULL); PJDLOG_ASSERT(res->hr_remoteout == NULL); PJDLOG_ASSERT(in != NULL && out != NULL); res->hr_remotein = in; res->hr_remoteout = out; rw_unlock(&hio_remote_lock[ncomp]); pjdlog_info("Successfully reconnected to %s.", res->hr_remoteaddr); sync_start(); } else { /* Both connections should be NULL. */ PJDLOG_ASSERT(res->hr_remotein == NULL); PJDLOG_ASSERT(res->hr_remoteout == NULL); PJDLOG_ASSERT(in == NULL && out == NULL); pjdlog_debug(2, "remote_guard: Reconnect to %s failed.", res->hr_remoteaddr); } } /* * Thread guards remote connections and reconnects when needed, handles * signals, etc. */ static void * guard_thread(void *arg) { struct hast_resource *res = arg; unsigned int ii, ncomps; struct timespec timeout; time_t lastcheck, now; sigset_t mask; int signo; ncomps = HAST_NCOMPONENTS; lastcheck = time(NULL); PJDLOG_VERIFY(sigemptyset(&mask) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGINT) == 0); PJDLOG_VERIFY(sigaddset(&mask, SIGTERM) == 0); timeout.tv_sec = HAST_KEEPALIVE; timeout.tv_nsec = 0; signo = -1; for (;;) { switch (signo) { case SIGINT: case SIGTERM: sigexit_received = true; primary_exitx(EX_OK, "Termination signal received, exiting."); break; default: break; } /* * Don't check connections until we fully started, * as we may still be looping, waiting for remote node * to switch from primary to secondary. 
*/ if (fullystarted) { pjdlog_debug(2, "remote_guard: Checking connections."); now = time(NULL); if (lastcheck + HAST_KEEPALIVE <= now) { for (ii = 0; ii < ncomps; ii++) guard_one(res, ii); lastcheck = now; } } signo = sigtimedwait(&mask, NULL, &timeout); } /* NOTREACHED */ return (NULL); } diff --git a/sbin/hastd/refcnt.h b/sbin/hastd/refcnt.h new file mode 100644 index 000000000000..a989df04885d --- /dev/null +++ b/sbin/hastd/refcnt.h @@ -0,0 +1,57 @@ +/*- + * Copyright (c) 2005 John Baldwin + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the author nor the names of any co-contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * $FreeBSD$ + */ + +#ifndef __REFCNT_H__ +#define __REFCNT_H__ + +#include + +#include "pjdlog.h" + +static __inline void +refcnt_acquire(volatile unsigned int *count) +{ + + atomic_add_acq_int(count, 1); +} + +static __inline unsigned int +refcnt_release(volatile unsigned int *count) +{ + unsigned int old; + + /* XXX: Should this have a rel membar? */ + old = atomic_fetchadd_int(count, -1); + PJDLOG_ASSERT(old > 0); + return (old - 1); +} + +#endif /* ! __REFCNT_H__ */ diff --git a/sbin/hastd/secondary.c b/sbin/hastd/secondary.c index 8ebcd48efb51..71524e90a267 100644 --- a/sbin/hastd/secondary.c +++ b/sbin/hastd/secondary.c @@ -1,856 +1,900 @@ /*- * Copyright (c) 2009-2010 The FreeBSD Foundation * Copyright (c) 2010 Pawel Jakub Dawidek * All rights reserved. * * This software was developed by Pawel Jakub Dawidek under sponsorship from * the FreeBSD Foundation. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. 
* * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include __FBSDID("$FreeBSD$"); #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "control.h" #include "event.h" #include "hast.h" #include "hast_proto.h" #include "hastd.h" #include "hooks.h" #include "metadata.h" #include "proto.h" #include "subr.h" #include "synch.h" struct hio { uint64_t hio_seq; int hio_error; void *hio_data; uint8_t hio_cmd; uint64_t hio_offset; uint64_t hio_length; + bool hio_memsync; TAILQ_ENTRY(hio) hio_next; }; static struct hast_resource *gres; /* * Free list holds unused structures. When free list is empty, we have to wait * until some in-progress requests are freed. */ static TAILQ_HEAD(, hio) hio_free_list; static pthread_mutex_t hio_free_list_lock; static pthread_cond_t hio_free_list_cond; /* * Disk thread (the one that do I/O requests) takes requests from this list. */ static TAILQ_HEAD(, hio) hio_disk_list; static pthread_mutex_t hio_disk_list_lock; static pthread_cond_t hio_disk_list_cond; /* * There is one recv list for every component, although local components don't * use recv lists as local requests are done synchronously. 
*/ static TAILQ_HEAD(, hio) hio_send_list; static pthread_mutex_t hio_send_list_lock; static pthread_cond_t hio_send_list_cond; /* * Maximum number of outstanding I/O requests. */ #define HAST_HIO_MAX 256 static void *recv_thread(void *arg); static void *disk_thread(void *arg); static void *send_thread(void *arg); #define QUEUE_INSERT(name, hio) do { \ bool _wakeup; \ \ mtx_lock(&hio_##name##_list_lock); \ _wakeup = TAILQ_EMPTY(&hio_##name##_list); \ TAILQ_INSERT_TAIL(&hio_##name##_list, (hio), hio_next); \ mtx_unlock(&hio_##name##_list_lock); \ if (_wakeup) \ cv_signal(&hio_##name##_list_cond); \ } while (0) #define QUEUE_TAKE(name, hio) do { \ mtx_lock(&hio_##name##_list_lock); \ while (((hio) = TAILQ_FIRST(&hio_##name##_list)) == NULL) { \ cv_wait(&hio_##name##_list_cond, \ &hio_##name##_list_lock); \ } \ TAILQ_REMOVE(&hio_##name##_list, (hio), hio_next); \ mtx_unlock(&hio_##name##_list_lock); \ } while (0) static void hio_clear(struct hio *hio) { hio->hio_seq = 0; hio->hio_error = 0; hio->hio_cmd = HIO_UNDEF; hio->hio_offset = 0; hio->hio_length = 0; + hio->hio_memsync = false; +} + +static void +hio_copy(const struct hio *srchio, struct hio *dsthio) +{ + + /* + * We don't copy hio_error, hio_data and hio_next fields. + */ + + dsthio->hio_seq = srchio->hio_seq; + dsthio->hio_cmd = srchio->hio_cmd; + dsthio->hio_offset = srchio->hio_offset; + dsthio->hio_length = srchio->hio_length; + dsthio->hio_memsync = srchio->hio_memsync; } static void init_environment(void) { struct hio *hio; unsigned int ii; /* * Initialize lists, their locks and theirs condition variables. */ TAILQ_INIT(&hio_free_list); mtx_init(&hio_free_list_lock); cv_init(&hio_free_list_cond); TAILQ_INIT(&hio_disk_list); mtx_init(&hio_disk_list_lock); cv_init(&hio_disk_list_cond); TAILQ_INIT(&hio_send_list); mtx_init(&hio_send_list_lock); cv_init(&hio_send_list_cond); /* * Allocate requests pool and initialize requests. 
*/ for (ii = 0; ii < HAST_HIO_MAX; ii++) { hio = malloc(sizeof(*hio)); if (hio == NULL) { pjdlog_exitx(EX_TEMPFAIL, "Unable to allocate memory (%zu bytes) for hio request.", sizeof(*hio)); } hio->hio_data = malloc(MAXPHYS); if (hio->hio_data == NULL) { pjdlog_exitx(EX_TEMPFAIL, "Unable to allocate memory (%zu bytes) for gctl_data.", (size_t)MAXPHYS); } hio_clear(hio); TAILQ_INSERT_HEAD(&hio_free_list, hio, hio_next); } } static void init_local(struct hast_resource *res) { if (metadata_read(res, true) == -1) exit(EX_NOINPUT); } static void init_remote(struct hast_resource *res, struct nv *nvin) { uint64_t resuid; struct nv *nvout; unsigned char *map; size_t mapsize; #ifdef notyet /* Setup direction. */ if (proto_send(res->hr_remoteout, NULL, 0) == -1) pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); #endif nvout = nv_alloc(); nv_add_int64(nvout, (int64_t)res->hr_datasize, "datasize"); nv_add_int32(nvout, (int32_t)res->hr_extentsize, "extentsize"); resuid = nv_get_uint64(nvin, "resuid"); res->hr_primary_localcnt = nv_get_uint64(nvin, "localcnt"); res->hr_primary_remotecnt = nv_get_uint64(nvin, "remotecnt"); nv_add_uint64(nvout, res->hr_secondary_localcnt, "localcnt"); nv_add_uint64(nvout, res->hr_secondary_remotecnt, "remotecnt"); mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); map = malloc(mapsize); if (map == NULL) { pjdlog_exitx(EX_TEMPFAIL, "Unable to allocate memory (%zu bytes) for activemap.", mapsize); } /* * When we work as primary and secondary is missing we will increase * localcnt in our metadata. When secondary is connected and synced * we make localcnt be equal to remotecnt, which means nodes are more * or less in sync. * Split-brain condition is when both nodes are not able to communicate * and are both configured as primary nodes. In turn, they can both * make incompatible changes to the data and we have to detect that. 
* Under split-brain condition we will increase our localcnt on first * write and remote node will increase its localcnt on first write. * When we connect we can see that primary's localcnt is greater than * our remotecnt (primary was modified while we weren't watching) and * our localcnt is greater than primary's remotecnt (we were modified * while primary wasn't watching). * There are many possible combinations which are all gathered below. * Don't pay too much attention to exact numbers, the more important * is to compare them. We compare secondary's local with primary's * remote and secondary's remote with primary's local. * Note that every case where primary's localcnt is smaller than * secondary's remotecnt and where secondary's localcnt is smaller than * primary's remotecnt should be impossible in practise. We will perform * full synchronization then. Those cases are marked with an asterisk. * Regular synchronization means that only extents marked as dirty are * synchronized (regular synchronization). * * SECONDARY METADATA PRIMARY METADATA * local=3 remote=3 local=2 remote=2* ?! Full sync from secondary. * local=3 remote=3 local=2 remote=3* ?! Full sync from primary. * local=3 remote=3 local=2 remote=4* ?! Full sync from primary. * local=3 remote=3 local=3 remote=2 Primary is out-of-date, * regular sync from secondary. * local=3 remote=3 local=3 remote=3 Regular sync just in case. * local=3 remote=3 local=3 remote=4* ?! Full sync from primary. * local=3 remote=3 local=4 remote=2 Split-brain condition. * local=3 remote=3 local=4 remote=3 Secondary out-of-date, * regular sync from primary. * local=3 remote=3 local=4 remote=4* ?! Full sync from primary. */ if (res->hr_resuid == 0) { /* * Provider is used for the first time. If primary node done no * writes yet as well (we will find "virgin" argument) then * there is no need to synchronize anything. If primary node * done any writes already we have to synchronize everything. 
*/ PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); res->hr_resuid = resuid; if (metadata_write(res) == -1) exit(EX_NOINPUT); if (nv_exists(nvin, "virgin")) { free(map); map = NULL; mapsize = 0; } else { memset(map, 0xff, mapsize); } nv_add_int8(nvout, 1, "virgin"); nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); } else if (res->hr_resuid != resuid) { char errmsg[256]; free(map); (void)snprintf(errmsg, sizeof(errmsg), "Resource unique ID mismatch (primary=%ju, secondary=%ju).", (uintmax_t)resuid, (uintmax_t)res->hr_resuid); pjdlog_error("%s", errmsg); nv_add_string(nvout, errmsg, "errmsg"); if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) == -1) { pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", res->hr_remoteaddr); } nv_free(nvout); exit(EX_CONFIG); } else if ( /* Is primary out-of-date? */ (res->hr_secondary_localcnt > res->hr_primary_remotecnt && res->hr_secondary_remotecnt == res->hr_primary_localcnt) || /* Are the nodes more or less in sync? */ (res->hr_secondary_localcnt == res->hr_primary_remotecnt && res->hr_secondary_remotecnt == res->hr_primary_localcnt) || /* Is secondary out-of-date? */ (res->hr_secondary_localcnt == res->hr_primary_remotecnt && res->hr_secondary_remotecnt < res->hr_primary_localcnt)) { /* * Nodes are more or less in sync or one of the nodes is * out-of-date. * It doesn't matter at this point which one, we just have to * send out local bitmap to the remote node. */ if (pread(res->hr_localfd, map, mapsize, METADATA_SIZE) != (ssize_t)mapsize) { pjdlog_exit(LOG_ERR, "Unable to read activemap"); } if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && res->hr_secondary_remotecnt == res->hr_primary_localcnt) { /* Primary is out-of-date, sync from secondary. */ nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); } else { /* * Secondary is out-of-date or counts match. * Sync from primary. 
*/ nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); } } else if (res->hr_secondary_localcnt > res->hr_primary_remotecnt && res->hr_primary_localcnt > res->hr_secondary_remotecnt) { /* * Not good, we have split-brain condition. */ free(map); pjdlog_error("Split-brain detected, exiting."); nv_add_string(nvout, "Split-brain condition!", "errmsg"); if (hast_proto_send(res, res->hr_remotein, nvout, NULL, 0) == -1) { pjdlog_exit(EX_TEMPFAIL, "Unable to send response to %s", res->hr_remoteaddr); } nv_free(nvout); /* Exit on split-brain. */ event_send(res, EVENT_SPLITBRAIN); exit(EX_CONFIG); } else /* if (res->hr_secondary_localcnt < res->hr_primary_remotecnt || res->hr_primary_localcnt < res->hr_secondary_remotecnt) */ { /* * This should never happen in practise, but we will perform * full synchronization. */ PJDLOG_ASSERT(res->hr_secondary_localcnt < res->hr_primary_remotecnt || res->hr_primary_localcnt < res->hr_secondary_remotecnt); mapsize = activemap_calc_ondisk_size(res->hr_local_mediasize - METADATA_SIZE, res->hr_extentsize, res->hr_local_sectorsize); memset(map, 0xff, mapsize); if (res->hr_secondary_localcnt > res->hr_primary_remotecnt) { /* In this one of five cases sync from secondary. */ nv_add_uint8(nvout, HAST_SYNCSRC_SECONDARY, "syncsrc"); } else { /* For the rest four cases sync from primary. */ nv_add_uint8(nvout, HAST_SYNCSRC_PRIMARY, "syncsrc"); } pjdlog_warning("This should never happen, asking for full synchronization (primary(local=%ju, remote=%ju), secondary(local=%ju, remote=%ju)).", (uintmax_t)res->hr_primary_localcnt, (uintmax_t)res->hr_primary_remotecnt, (uintmax_t)res->hr_secondary_localcnt, (uintmax_t)res->hr_secondary_remotecnt); } nv_add_uint32(nvout, (uint32_t)mapsize, "mapsize"); if (hast_proto_send(res, res->hr_remotein, nvout, map, mapsize) == -1) { pjdlog_exit(EX_TEMPFAIL, "Unable to send activemap to %s", res->hr_remoteaddr); } if (map != NULL) free(map); nv_free(nvout); #ifdef notyet /* Setup direction. 
*/ if (proto_recv(res->hr_remotein, NULL, 0) == -1) pjdlog_errno(LOG_WARNING, "Unable to set connection direction"); #endif } void hastd_secondary(struct hast_resource *res, struct nv *nvin) { sigset_t mask; pthread_t td; pid_t pid; int error, mode, debuglevel; /* * Create communication channel between parent and child. */ if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to create control sockets between parent and child"); } /* * Create communication channel between child and parent. */ if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to create event sockets between child and parent"); } pid = fork(); if (pid == -1) { KEEP_ERRNO((void)pidfile_remove(pfh)); pjdlog_exit(EX_OSERR, "Unable to fork"); } if (pid > 0) { /* This is parent. */ proto_close(res->hr_remotein); res->hr_remotein = NULL; proto_close(res->hr_remoteout); res->hr_remoteout = NULL; /* Declare that we are receiver. */ proto_recv(res->hr_event, NULL, 0); /* Declare that we are sender. */ proto_send(res->hr_ctrl, NULL, 0); res->hr_workerpid = pid; return; } gres = res; mode = pjdlog_mode_get(); debuglevel = pjdlog_debug_get(); /* Declare that we are sender. */ proto_send(res->hr_event, NULL, 0); /* Declare that we are receiver. */ proto_recv(res->hr_ctrl, NULL, 0); descriptors_cleanup(res); descriptors_assert(res, mode); pjdlog_init(mode); pjdlog_debug_set(debuglevel); pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); PJDLOG_VERIFY(sigemptyset(&mask) == 0); PJDLOG_VERIFY(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); /* Error in setting timeout is not critical, but why should it fail? 
*/ if (proto_timeout(res->hr_remotein, 2 * HAST_KEEPALIVE) == -1) pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); if (proto_timeout(res->hr_remoteout, res->hr_timeout) == -1) pjdlog_errno(LOG_WARNING, "Unable to set connection timeout"); init_local(res); init_environment(); if (drop_privs(res) != 0) exit(EX_CONFIG); pjdlog_info("Privileges successfully dropped."); /* * Create the control thread before sending any event to the parent, * as we can deadlock when parent sends control request to worker, * but worker has no control thread started yet, so parent waits. * In the meantime worker sends an event to the parent, but parent * is unable to handle the event, because it waits for control * request response. */ error = pthread_create(&td, NULL, ctrl_thread, res); PJDLOG_ASSERT(error == 0); init_remote(res, nvin); event_send(res, EVENT_CONNECT); error = pthread_create(&td, NULL, recv_thread, res); PJDLOG_ASSERT(error == 0); error = pthread_create(&td, NULL, disk_thread, res); PJDLOG_ASSERT(error == 0); (void)send_thread(res); } static void reqlog(int loglevel, int debuglevel, int error, struct hio *hio, const char *fmt, ...) 
{ char msg[1024]; va_list ap; int len; va_start(ap, fmt); len = vsnprintf(msg, sizeof(msg), fmt, ap); va_end(ap); if ((size_t)len < sizeof(msg)) { switch (hio->hio_cmd) { case HIO_READ: (void)snprintf(msg + len, sizeof(msg) - len, "READ(%ju, %ju).", (uintmax_t)hio->hio_offset, (uintmax_t)hio->hio_length); break; case HIO_DELETE: (void)snprintf(msg + len, sizeof(msg) - len, "DELETE(%ju, %ju).", (uintmax_t)hio->hio_offset, (uintmax_t)hio->hio_length); break; case HIO_FLUSH: (void)snprintf(msg + len, sizeof(msg) - len, "FLUSH."); break; case HIO_WRITE: (void)snprintf(msg + len, sizeof(msg) - len, "WRITE(%ju, %ju).", (uintmax_t)hio->hio_offset, (uintmax_t)hio->hio_length); break; case HIO_KEEPALIVE: (void)snprintf(msg + len, sizeof(msg) - len, "KEEPALIVE."); break; default: (void)snprintf(msg + len, sizeof(msg) - len, "UNKNOWN(%u).", (unsigned int)hio->hio_cmd); break; } } pjdlog_common(loglevel, debuglevel, error, "%s", msg); } static int requnpack(struct hast_resource *res, struct hio *hio, struct nv *nv) { hio->hio_cmd = nv_get_uint8(nv, "cmd"); if (hio->hio_cmd == 0) { pjdlog_error("Header contains no 'cmd' field."); hio->hio_error = EINVAL; goto end; } if (hio->hio_cmd != HIO_KEEPALIVE) { hio->hio_seq = nv_get_uint64(nv, "seq"); if (hio->hio_seq == 0) { pjdlog_error("Header contains no 'seq' field."); hio->hio_error = EINVAL; goto end; } } switch (hio->hio_cmd) { case HIO_FLUSH: case HIO_KEEPALIVE: break; - case HIO_READ: case HIO_WRITE: + hio->hio_memsync = nv_exists(nv, "memsync"); + /* FALLTHROUGH */ + case HIO_READ: case HIO_DELETE: hio->hio_offset = nv_get_uint64(nv, "offset"); if (nv_error(nv) != 0) { pjdlog_error("Header is missing 'offset' field."); hio->hio_error = EINVAL; goto end; } hio->hio_length = nv_get_uint64(nv, "length"); if (nv_error(nv) != 0) { pjdlog_error("Header is missing 'length' field."); hio->hio_error = EINVAL; goto end; } if (hio->hio_length == 0) { pjdlog_error("Data length is zero."); hio->hio_error = EINVAL; goto end; } if 
(hio->hio_length > MAXPHYS) { pjdlog_error("Data length is too large (%ju > %ju).", (uintmax_t)hio->hio_length, (uintmax_t)MAXPHYS); hio->hio_error = EINVAL; goto end; } if ((hio->hio_offset % res->hr_local_sectorsize) != 0) { pjdlog_error("Offset %ju is not multiple of sector size.", (uintmax_t)hio->hio_offset); hio->hio_error = EINVAL; goto end; } if ((hio->hio_length % res->hr_local_sectorsize) != 0) { pjdlog_error("Length %ju is not multiple of sector size.", (uintmax_t)hio->hio_length); hio->hio_error = EINVAL; goto end; } if (hio->hio_offset + hio->hio_length > (uint64_t)res->hr_datasize) { pjdlog_error("Data offset is too large (%ju > %ju).", (uintmax_t)(hio->hio_offset + hio->hio_length), (uintmax_t)res->hr_datasize); hio->hio_error = EINVAL; goto end; } break; default: pjdlog_error("Header contains invalid 'cmd' (%hhu).", hio->hio_cmd); hio->hio_error = EINVAL; goto end; } hio->hio_error = 0; end: return (hio->hio_error); } static __dead2 void secondary_exit(int exitcode, const char *fmt, ...) { va_list ap; PJDLOG_ASSERT(exitcode != EX_OK); va_start(ap, fmt); pjdlogv_errno(LOG_ERR, fmt, ap); va_end(ap); event_send(gres, EVENT_DISCONNECT); exit(exitcode); } /* * Thread receives requests from the primary node. 
 */
static void *
recv_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct hio *hio, *mshio;
	struct nv *nv;

	for (;;) {
		/* Block until a free request structure is available. */
		pjdlog_debug(2, "recv: Taking free request.");
		QUEUE_TAKE(free, hio);
		pjdlog_debug(2, "recv: (%p) Got request.", hio);
		if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) {
			/* secondary_exit() does not return. */
			secondary_exit(EX_TEMPFAIL,
			    "Unable to receive request header");
		}
		if (requnpack(res, hio, nv) != 0) {
			/*
			 * Invalid header: hio_error is already set, so the
			 * send thread will reply with the error to primary.
			 */
			nv_free(nv);
			pjdlog_debug(2,
			    "recv: (%p) Moving request to the send queue.",
			    hio);
			QUEUE_INSERT(send, hio);
			continue;
		}
		/* Per-resource statistics counters. */
		switch (hio->hio_cmd) {
		case HIO_READ:
			res->hr_stat_read++;
			break;
		case HIO_WRITE:
			res->hr_stat_write++;
			break;
		case HIO_DELETE:
			res->hr_stat_delete++;
			break;
		case HIO_FLUSH:
			res->hr_stat_flush++;
			break;
		case HIO_KEEPALIVE:
			break;
		default:
			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
			    hio->hio_cmd);
		}
		reqlog(LOG_DEBUG, 2, -1, hio,
		    "recv: (%p) Got request header: ", hio);
		if (hio->hio_cmd == HIO_KEEPALIVE) {
			/* Keepalives need no reply; recycle the request. */
			nv_free(nv);
			pjdlog_debug(2,
			    "recv: (%p) Moving request to the free queue.",
			    hio);
			hio_clear(hio);
			QUEUE_INSERT(free, hio);
			continue;
		} else if (hio->hio_cmd == HIO_WRITE) {
			if (hast_proto_recv_data(res, res->hr_remotein, nv,
			    hio->hio_data, MAXPHYS) == -1) {
				secondary_exit(EX_TEMPFAIL,
				    "Unable to receive request data");
			}
			if (hio->hio_memsync) {
				/*
				 * For memsync requests we expect two replies.
				 * Clone the hio so we can handle both of them.
				 */
				pjdlog_debug(2, "recv: Taking free request.");
				QUEUE_TAKE(free, mshio);
				pjdlog_debug(2, "recv: (%p) Got request.",
				    mshio);
				hio_copy(hio, mshio);
				mshio->hio_error = 0;
				/*
				 * We want to keep 'memsync' tag only on the
				 * request going onto send queue (mshio).
				 */
				hio->hio_memsync = false;
				pjdlog_debug(2,
				    "recv: (%p) Moving memsync request to the send queue.",
				    mshio);
				QUEUE_INSERT(send, mshio);
			}
		}
		nv_free(nv);
		/* Hand the request over to disk_thread() for the actual I/O. */
		pjdlog_debug(2, "recv: (%p) Moving request to the disk queue.",
		    hio);
		QUEUE_INSERT(disk, hio);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread reads from or writes to local component and also handles DELETE and
 * FLUSH requests.
 */
static void *
disk_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct hio *hio;
	ssize_t ret;
	bool clear_activemap, logerror;

	clear_activemap = true;

	for (;;) {
		pjdlog_debug(2, "disk: Taking request.");
		QUEUE_TAKE(disk, hio);
		/*
		 * One-shot block: runs (at most) until the first successful
		 * clear; the unconditional break at the bottom makes it act
		 * like an 'if' with early exits.
		 */
		while (clear_activemap) {
			unsigned char *map;
			size_t mapsize;

			/*
			 * When first request is received, it means that primary
			 * already received our activemap, merged it and stored
			 * locally. We can now safely clear our activemap.
			 */
			mapsize =
			    activemap_calc_ondisk_size(res->hr_local_mediasize -
			    METADATA_SIZE, res->hr_extentsize,
			    res->hr_local_sectorsize);
			map = calloc(1, mapsize);
			if (map == NULL) {
				/*
				 * Non-fatal: keep clear_activemap set so the
				 * clear is retried on the next request.
				 */
				pjdlog_warning("Unable to allocate memory to clear local activemap.");
				break;
			}
			if (pwrite(res->hr_localfd, map, mapsize,
			    METADATA_SIZE) != (ssize_t)mapsize) {
				pjdlog_errno(LOG_WARNING,
				    "Unable to store cleared activemap");
				free(map);
				break;
			}
			free(map);
			clear_activemap = false;
			pjdlog_debug(1, "Local activemap cleared.");
			break;
		}
		reqlog(LOG_DEBUG, 2, -1, hio, "disk: (%p) Got request: ", hio);
		logerror = true;
		/* Handle the actual request. */
		switch (hio->hio_cmd) {
		case HIO_READ:
			ret = pread(res->hr_localfd, hio->hio_data,
			    hio->hio_length,
			    hio->hio_offset + res->hr_localoff);
			if (ret == -1)
				hio->hio_error = errno;
			else if (ret != (int64_t)hio->hio_length)
				/* Short read is treated as an I/O error. */
				hio->hio_error = EIO;
			else
				hio->hio_error = 0;
			break;
		case HIO_WRITE:
			ret = pwrite(res->hr_localfd, hio->hio_data,
			    hio->hio_length,
			    hio->hio_offset + res->hr_localoff);
			if (ret == -1)
				hio->hio_error = errno;
			else if (ret != (int64_t)hio->hio_length)
				/* Short write is treated as an I/O error. */
				hio->hio_error = EIO;
			else
				hio->hio_error = 0;
			break;
		case HIO_DELETE:
			ret = g_delete(res->hr_localfd,
			    hio->hio_offset + res->hr_localoff,
			    hio->hio_length);
			if (ret == -1)
				hio->hio_error = errno;
			else
				hio->hio_error = 0;
			break;
		case HIO_FLUSH:
			if (!res->hr_localflush) {
				/*
				 * Provider does not support flushing; reply
				 * with EOPNOTSUPP but do not log it as an
				 * error on every request.
				 */
				ret = -1;
				hio->hio_error = EOPNOTSUPP;
				logerror = false;
				break;
			}
			ret = g_flush(res->hr_localfd);
			if (ret == -1) {
				/* Remember lack of flush support for later. */
				if (errno == EOPNOTSUPP)
					res->hr_localflush = false;
				hio->hio_error = errno;
			} else {
				hio->hio_error = 0;
			}
			break;
		default:
			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
			    hio->hio_cmd);
		}
		if (logerror && hio->hio_error != 0) {
			reqlog(LOG_ERR, 0, hio->hio_error, hio,
			    "Request failed: ");
		}
		/* Pass the completed request to send_thread() for the reply. */
		pjdlog_debug(2, "disk: (%p) Moving request to the send queue.",
		    hio);
		QUEUE_INSERT(send, hio);
	}
	/* NOTREACHED */
	return (NULL);
}

/*
 * Thread sends requests back to primary node.
 */
static void *
send_thread(void *arg)
{
	struct hast_resource *res = arg;
	struct nv *nvout;
	struct hio *hio;
	void *data;
	size_t length;

	for (;;) {
		pjdlog_debug(2, "send: Taking request.");
		QUEUE_TAKE(send, hio);
		reqlog(LOG_DEBUG, 2, -1, hio, "send: (%p) Got request: ", hio);
		nvout = nv_alloc();
		/* Copy sequence number. */
		nv_add_uint64(nvout, hio->hio_seq, "seq");
		if (hio->hio_memsync) {
			/*
			 * First (early) reply for a memsync write: tell the
			 * primary the data was received.  Only the clone made
			 * in recv_thread() carries the hio_memsync flag.
			 */
			PJDLOG_ASSERT(hio->hio_cmd == HIO_WRITE);
			nv_add_int8(nvout, 1, "received");
		}
		switch (hio->hio_cmd) {
		case HIO_READ:
			if (hio->hio_error == 0) {
				data = hio->hio_data;
				length = hio->hio_length;
				break;
			}
			/*
			 * We send no data in case of an error.
			 */
			/* FALLTHROUGH */
		case HIO_DELETE:
		case HIO_FLUSH:
		case HIO_WRITE:
			data = NULL;
			length = 0;
			break;
		default:
			PJDLOG_ABORT("Unexpected command (cmd=%hhu).",
			    hio->hio_cmd);
		}
		if (hio->hio_error != 0)
			nv_add_int16(nvout, hio->hio_error, "error");
		if (hast_proto_send(res, res->hr_remoteout, nvout, data,
		    length) == -1) {
			secondary_exit(EX_TEMPFAIL, "Unable to send reply");
		}
		nv_free(nvout);
		/* Reply sent; recycle the request structure. */
		pjdlog_debug(2, "send: (%p) Moving request to the free queue.",
		    hio);
		hio_clear(hio);
		QUEUE_INSERT(free, hio);
	}
	/* NOTREACHED */
	return (NULL);
}