diff --git a/src/cluster.c b/src/cluster.c index 3439dab0a..9b8b3b3b8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -10,7 +10,7 @@ /* We have 16384 hash slots. The hash slot of a given key is obtained * as the least significant 14 bits of the crc16 of the key. * - * However if the key contains the {...} pattern, only the part between + * However, if the key contains the {...} pattern, only the part between * { and } is hashed. This may be useful in the future to force certain * keys to be in the same node (assuming no resharding is in progress). */ unsigned int keyHashSlot(char *key, int keylen) { @@ -754,6 +754,10 @@ void clusterCommandMyId(client *c) { } } +char* getMyClusterId(void) { + return clusterNodeGetName(getMyClusterNode()); +} + void clusterCommandMyShardId(client *c) { char *sid = clusterNodeGetShardId(getMyClusterNode()); if (sid) { @@ -918,3 +922,523 @@ void clusterCommand(client *c) { return; } } + +/* Return the pointer to the cluster node that is able to serve the command. + * For the function to succeed the command should only target either: + * + * 1) A single key (even multiple times like RPOPLPUSH mylist mylist). + * 2) Multiple keys in the same hash slot, while the slot is stable (no + * resharding in progress). + * + * On success the function returns the node that is able to serve the request. + * If the node is not 'myself' a redirection must be performed. The kind of + * redirection is specified setting the integer passed by reference + * 'error_code', which will be set to CLUSTER_REDIR_ASK or + * CLUSTER_REDIR_MOVED. + * + * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE. + * + * If the command fails NULL is returned, and the reason of the failure is + * provided via 'error_code', which will be set to: + * + * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that + * don't belong to the same hash slot. + * + * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys + * belonging to the same slot, but the slot is not stable (in migration or + * importing state, likely because a resharding is in progress). + * + * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is + * not bound to any node. In this case the cluster global state should be + * already "down" but it is fragile to rely on the update of the global state, + * so we also handle it here. + * + * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is + * down but the user attempts to execute a command that addresses one or more keys. */ +clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) { + clusterNode *myself = getMyClusterNode(); + clusterNode *n = NULL; + robj *firstkey = NULL; + int multiple_keys = 0; + multiState *ms, _ms; + multiCmd mc; + int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0, + existing_keys = 0; + + /* Allow any key to be set if a module disabled cluster redirections. */ + if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) + return myself; + + /* Set error code optimistically for the base case. */ + if (error_code) *error_code = CLUSTER_REDIR_NONE; + + /* Modules can turn off Redis Cluster redirection: this is useful + * when writing a module that implements a completely different + * distributed system. */ + + /* We handle all the cases as if they were EXEC commands, so we have + * a common code path for everything */ + if (cmd->proc == execCommand) { + /* If CLIENT_MULTI flag is not set EXEC is just going to return an + * error. */ + if (!(c->flags & CLIENT_MULTI)) return myself; + ms = &c->mstate; + } else { + /* In order to have a single codepath create a fake Multi State + * structure if the client is not in MULTI/EXEC state, this way + * we have a single codepath below. */ + ms = &_ms; + _ms.commands = &mc; + _ms.count = 1; + mc.argv = argv; + mc.argc = argc; + mc.cmd = cmd; + } + + int is_pubsubshard = cmd->proc == ssubscribeCommand || + cmd->proc == sunsubscribeCommand || + cmd->proc == spublishCommand; + + /* Check that all the keys are in the same hash slot, and obtain this + * slot and the node associated. */ + for (i = 0; i < ms->count; i++) { + struct redisCommand *mcmd; + robj **margv; + int margc, numkeys, j; + keyReference *keyindex; + + mcmd = ms->commands[i].cmd; + margc = ms->commands[i].argc; + margv = ms->commands[i].argv; + + getKeysResult result = GETKEYS_RESULT_INIT; + numkeys = getKeysFromCommand(mcmd,margv,margc,&result); + keyindex = result.keys; + + for (j = 0; j < numkeys; j++) { + robj *thiskey = margv[keyindex[j].pos]; + int thisslot = keyHashSlot((char*)thiskey->ptr, + sdslen(thiskey->ptr)); + + if (firstkey == NULL) { + /* This is the first key we see. Check what is the slot + * and node. */ + firstkey = thiskey; + slot = thisslot; + n = getNodeBySlot(slot); + + /* Error: If a slot is not served, we are in "cluster down" + * state. However the state is yet to be updated, so this was + * not trapped earlier in processCommand(). Report the same + * error to the client. */ + if (n == NULL) { + getKeysFreeResult(&result); + if (error_code) + *error_code = CLUSTER_REDIR_DOWN_UNBOUND; + return NULL; + } + + /* If we are migrating or importing this slot, we need to check + * if we have all the keys in the request (the only way we + * can safely serve the request, otherwise we return a TRYAGAIN + * error). To do so we set the importing/migrating state and + * increment a counter for every missing key. */ + if (n == myself && + getMigratingSlotDest(slot) != NULL) + { + migrating_slot = 1; + } else if (getImportingSlotSource(slot) != NULL) { + importing_slot = 1; + } + } else { + /* If it is not the first key/channel, make sure it is exactly + * the same key/channel as the first we saw. */ + if (slot != thisslot) { + /* Error: multiple keys from different slots. */ + getKeysFreeResult(&result); + if (error_code) + *error_code = CLUSTER_REDIR_CROSS_SLOT; + return NULL; + } + if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) { + /* Flag this request as one with multiple different + * keys/channels when the slot is in importing state. */ + multiple_keys = 1; + } + } + + /* Migrating / Importing slot? Count keys we don't have. + * If it is pubsubshard command, it isn't required to check + * the channel being present or not in the node during the + * slot migration, the channel will be served from the source + * node until the migration completes with CLUSTER SETSLOT + * NODE . */ + int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE; + if ((migrating_slot || importing_slot) && !is_pubsubshard) + { + if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++; + else existing_keys++; + } + } + getKeysFreeResult(&result); + } + + /* No key at all in command? then we can serve the request + * without redirections or errors in all the cases. */ + if (n == NULL) return myself; + + uint64_t cmd_flags = getCommandFlags(c); + /* Cluster is globally down but we got keys? We only serve the request + * if it is a read command and when allow_reads_when_down is enabled. */ + if (!isClusterHealthy()) { + if (is_pubsubshard) { + if (!server.cluster_allow_pubsubshard_when_down) { + if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; + return NULL; + } + } else if (!server.cluster_allow_reads_when_down) { + /* The cluster is configured to block commands when the + * cluster is down. */ + if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; + return NULL; + } else if (cmd_flags & CMD_WRITE) { + /* The cluster is configured to allow read only commands */ + if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE; + return NULL; + } else { + /* Fall through and allow the command to be executed: + * this happens when server.cluster_allow_reads_when_down is + * true and the command is not a write command */ + } + } + + /* Return the hashslot by reference. */ + if (hashslot) *hashslot = slot; + + /* MIGRATE always works in the context of the local node if the slot + * is open (migrating or importing state). We need to be able to freely + * move keys among instances in this case. */ + if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand) + return myself; + + /* If we don't have all the keys and we are migrating the slot, send + * an ASK redirection or TRYAGAIN. */ + if (migrating_slot && missing_keys) { + /* If we have keys but we don't have all keys, we return TRYAGAIN */ + if (existing_keys) { + if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE; + return NULL; + } else { + if (error_code) *error_code = CLUSTER_REDIR_ASK; + return getMigratingSlotDest(slot); + } + } + + /* If we are receiving the slot, and the client correctly flagged the + * request as "ASKING", we can serve the request. However if the request + * involves multiple keys and we don't have them all, the only option is + * to send a TRYAGAIN error. */ + if (importing_slot && + (c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING)) + { + if (multiple_keys && missing_keys) { + if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE; + return NULL; + } else { + return myself; + } + } + + /* Handle the read-only client case reading from a slave: if this + * node is a slave and the request is about a hash slot our master + * is serving, we can reply without redirection. */ + int is_write_command = (cmd_flags & CMD_WRITE) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); + if (((c->flags & CLIENT_READONLY) || is_pubsubshard) && + !is_write_command && + clusterNodeIsSlave(myself) && + clusterNodeGetSlaveof(myself) == n) + { + return myself; + } + + /* Base case: just return the right node. However, if this node is not + * myself, set error_code to MOVED since we need to issue a redirection. */ + if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED; + return n; +} + +/* Send the client the right redirection code, according to error_code + * that should be set to one of CLUSTER_REDIR_* macros. + * + * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes + * are used, then the node 'n' should not be NULL, but should be the + * node we want to mention in the redirection. Moreover hashslot should + * be set to the hash slot that caused the redirection. */ +void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) { + if (error_code == CLUSTER_REDIR_CROSS_SLOT) { + addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot"); + } else if (error_code == CLUSTER_REDIR_UNSTABLE) { + /* The request spawns multiple keys in the same slot, + * but the slot is not "stable" currently as there is + * a migration or import in progress. */ + addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot"); + } else if (error_code == CLUSTER_REDIR_DOWN_STATE) { + addReplyError(c,"-CLUSTERDOWN The cluster is down"); + } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { + addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands"); + } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) { + addReplyError(c,"-CLUSTERDOWN Hash slot not served"); + } else if (error_code == CLUSTER_REDIR_MOVED || + error_code == CLUSTER_REDIR_ASK) + { + /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ + int port = getNodeClientPort(n, shouldReturnTlsInfo()); + addReplyErrorSds(c,sdscatprintf(sdsempty(), + "-%s %d %s:%d", + (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", + hashslot, getPreferredEndpoint(n), port)); + } else { + serverPanic("getNodeByQuery() unknown error."); + } +} + +/* This function is called by the function processing clients incrementally + * to detect timeouts, in order to handle the following case: + * + * 1) A client blocks with BLPOP or similar blocking operation. + * 2) The master migrates the hash slot elsewhere or turns into a slave. + * 3) The client may remain blocked forever (or up to the max timeout time) + * waiting for a key change that will never happen. + * + * If the client is found to be blocked into a hash slot this node no + * longer handles, the client is sent a redirection error, and the function + * returns 1. Otherwise 0 is returned and no operation is performed. */ +int clusterRedirectBlockedClientIfNeeded(client *c) { + clusterNode *myself = getMyClusterNode(); + if (c->flags & CLIENT_BLOCKED && + (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM || + c->bstate.btype == BLOCKED_MODULE)) + { + dictEntry *de; + dictIterator *di; + + /* If the cluster is down, unblock the client with the right error. + * If the cluster is configured to allow reads on cluster down, we + * still want to emit this error since a write will be required + * to unblock them which may never come. */ + if (!isClusterHealthy()) { + clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE); + return 1; + } + + /* If the client is blocked on module, but not on a specific key, + * don't unblock it (except for the CLUSTER_FAIL case above). */ + if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) + return 0; + + /* All keys must belong to the same slot, so check first key only. */ + di = dictGetIterator(c->bstate.keys); + if ((de = dictNext(di)) != NULL) { + robj *key = dictGetKey(de); + int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr)); + clusterNode *node = getNodeBySlot(slot); + + /* if the client is read-only and attempting to access key that our + * replica can handle, allow it. */ + if ((c->flags & CLIENT_READONLY) && + !(c->lastcmd->flags & CMD_WRITE) && + clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == node) + { + node = myself; + } + + /* We send an error and unblock the client if: + * 1) The slot is unassigned, emitting a cluster down error. + * 2) The slot is not handled by this node, nor being imported. */ + if (node != myself && getImportingSlotSource(slot) == NULL) + { + if (node == NULL) { + clusterRedirectClient(c,NULL,0, + CLUSTER_REDIR_DOWN_UNBOUND); + } else { + clusterRedirectClient(c,node,slot, + CLUSTER_REDIR_MOVED); + } + dictReleaseIterator(di); + return 1; + } + } + dictReleaseIterator(di); + } + return 0; +} + +/* Returns an indication if the replica node is fully available + * and should be listed in CLUSTER SLOTS response. + * Returns 1 for available nodes, 0 for nodes that have + * not finished their initial sync, in failed state, or are + * otherwise considered not available to serve read commands. */ +static int isReplicaAvailable(clusterNode *node) { + if (clusterNodeIsFailing(node)) { + return 0; + } + long long repl_offset = getReplOffset(node); + if (clusterNodeIsMyself(node)) { + /* Nodes do not update their own information + * in the cluster node list. */ + repl_offset = replicationGetSlaveOffset(); + } + return (repl_offset != 0); +} + +void addNodeToNodeReply(client *c, clusterNode *node) { + char* hostname = clusterNodeHostname(node); + addReplyArrayLen(c, 4); + if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) { + addReplyBulkCString(c, clusterNodeIp(node)); + } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) { + if (hostname != NULL && hostname[0] != '\0') { + addReplyBulkCString(c, hostname); + } else { + addReplyBulkCString(c, "?"); + } + } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) { + addReplyNull(c); + } else { + serverPanic("Unrecognized preferred endpoint type"); + } + + /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ + addReplyLongLong(c, getNodeClientPort(node, shouldReturnTlsInfo())); + addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN); + + /* Add the additional endpoint information, this is all the known networking information + * that is not the preferred endpoint. Note the logic is evaluated twice so we can + * correctly report the number of additional network arguments without using a deferred + * map, an assertion is made at the end to check we set the right length. */ + int length = 0; + if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) { + length++; + } + if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME + && hostname != NULL && hostname[0] != '\0') + { + length++; + } + addReplyMapLen(c, length); + + if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) { + addReplyBulkCString(c, "ip"); + addReplyBulkCString(c, clusterNodeIp(node)); + length--; + } + if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME + && hostname != NULL && hostname[0] != '\0') + { + addReplyBulkCString(c, "hostname"); + addReplyBulkCString(c, hostname); + length--; + } + serverAssert(length == 0); +} + +void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) { + int i, nested_elements = 3; /* slots (2) + master addr (1) */ + for (i = 0; i < getNumSlaves(node); i++) { + if (!isReplicaAvailable(getSlave(node, i))) continue; + nested_elements++; + } + addReplyArrayLen(c, nested_elements); + addReplyLongLong(c, start_slot); + addReplyLongLong(c, end_slot); + addNodeToNodeReply(c, node); + + /* Remaining nodes in reply are replicas for slot range */ + for (i = 0; i < getNumSlaves(node); i++) { + /* This loop is copy/pasted from clusterGenNodeDescription() + * with modifications for per-slot node aggregation. */ + if (!isReplicaAvailable(getSlave(node, i))) continue; + addNodeToNodeReply(c, getSlave(node, i)); + nested_elements--; + } + serverAssert(nested_elements == 3); /* Original 3 elements */ +} + +void clusterCommandSlots(client * c) { + /* Format: 1) 1) start slot + * 2) end slot + * 3) 1) master IP + * 2) master port + * 3) node ID + * 4) 1) replica IP + * 2) replica port + * 3) node ID + * ... continued until done + */ + clusterNode *n = NULL; + int num_masters = 0, start = -1; + void *slot_replylen = addReplyDeferredLen(c); + + for (int i = 0; i <= CLUSTER_SLOTS; i++) { + /* Find start node and slot id. */ + if (n == NULL) { + if (i == CLUSTER_SLOTS) break; + n = getNodeBySlot(i); + start = i; + continue; + } + + /* Add cluster slots info when occur different node with start + * or end of slot. */ + if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) { + addNodeReplyForClusterSlot(c, n, start, i-1); + num_masters++; + if (i == CLUSTER_SLOTS) break; + n = getNodeBySlot(i); + start = i; + } + } + setDeferredArrayLen(c, slot_replylen, num_masters); +} + +/* ----------------------------------------------------------------------------- + * Cluster functions related to serving / redirecting clients + * -------------------------------------------------------------------------- */ + +/* The ASKING command is required after a -ASK redirection. + * The client should issue ASKING before to actually send the command to + * the target instance. See the Redis Cluster specification for more + * information. */ +void askingCommand(client *c) { + if (server.cluster_enabled == 0) { + addReplyError(c,"This instance has cluster support disabled"); + return; + } + c->flags |= CLIENT_ASKING; + addReply(c,shared.ok); +} + +/* The READONLY command is used by clients to enter the read-only mode. + * In this mode slaves will not redirect clients as long as clients access + * with read-only commands to keys that are served by the slave's master. */ +void readonlyCommand(client *c) { + if (server.cluster_enabled == 0) { + addReplyError(c,"This instance has cluster support disabled"); + return; + } + c->flags |= CLIENT_READONLY; + addReply(c,shared.ok); +} + +/* The READWRITE command just clears the READONLY command state. */ +void readwriteCommand(client *c) { + if (server.cluster_enabled == 0) { + addReplyError(c,"This instance has cluster support disabled"); + return; + } + c->flags &= ~CLIENT_READONLY; + addReply(c,shared.ok); +} diff --git a/src/cluster.h b/src/cluster.h index 5160582b3..9f6e482f4 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -87,6 +87,15 @@ void clusterCommandShards(client *c); sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary); int getNumSlaves(clusterNode *node); clusterNode *getSlave(clusterNode *node, int slave_idx); +clusterNode *getMigratingSlotDest(int slot); +clusterNode *getImportingSlotSource(int slot); +int isClusterHealthy(void); +clusterNode *getNodeBySlot(int slot); +int getNodeClientPort(clusterNode *n, int use_tls); +char* clusterNodeHostname(clusterNode *node); +const char *getPreferredEndpoint(clusterNode *n); +void migrateCommand(client *c); +long long getReplOffset(clusterNode *node); char **clusterDebugCommandHelp(void); ConnectionType *connTypeOfCluster(void); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 2b080ef89..0fd8b0a20 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -112,7 +112,7 @@ static inline int getNodeDefaultReplicationPort(clusterNode *n) { return server.tls_replication ? n->tls_port : n->tcp_port; } -static inline int getNodeClientPort(clusterNode *n, int use_tls) { +int getNodeClientPort(clusterNode *n, int use_tls) { return use_tls ? n->tls_port : n->tcp_port; } @@ -5402,15 +5402,6 @@ void addReplyClusterLinksDescription(client *c) { * CLUSTER command * -------------------------------------------------------------------------- */ -const char *getPreferredEndpoint(clusterNode *n) { - switch(server.cluster_preferred_endpoint_type) { - case CLUSTER_ENDPOINT_TYPE_IP: return n->ip; - case CLUSTER_ENDPOINT_TYPE_HOSTNAME: return (sdslen(n->hostname) != 0) ? n->hostname : "?"; - case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT: return ""; - } - return "unknown"; -} - const char *clusterGetMessageTypeString(int type) { switch(type) { case CLUSTERMSG_TYPE_PING: return "ping"; @@ -5440,24 +5431,6 @@ int getSlotOrReply(client *c, robj *o) { return (int) slot; } -/* Returns an indication if the replica node is fully available - * and should be listed in CLUSTER SLOTS response. - * Returns 1 for available nodes, 0 for nodes that have - * not finished their initial sync, in failed state, or are - * otherwise considered not available to serve read commands. */ -static int isReplicaAvailable(clusterNode *node) { - if (nodeFailed(node)) { - return 0; - } - long long repl_offset = node->repl_offset; - if (node->flags & CLUSTER_NODE_MYSELF) { - /* Nodes do not update their own information - * in the cluster node list. */ - repl_offset = replicationGetSlaveOffset(); - } - return (repl_offset != 0); -} - int checkSlotAssignmentsOrReply(client *c, unsigned char *slots, int del, int start_slot, int end_slot) { int slot; for (slot = start_slot; slot <= end_slot; slot++) { @@ -5494,78 +5467,6 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) { } } -void addNodeToNodeReply(client *c, clusterNode *node) { - addReplyArrayLen(c, 4); - if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) { - addReplyBulkCString(c, node->ip); - } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) { - if (sdslen(node->hostname) != 0) { - addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname)); - } else { - addReplyBulkCString(c, "?"); - } - } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) { - addReplyNull(c); - } else { - serverPanic("Unrecognized preferred endpoint type"); - } - - /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ - addReplyLongLong(c, getNodeClientPort(node, shouldReturnTlsInfo())); - addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); - - /* Add the additional endpoint information, this is all the known networking information - * that is not the preferred endpoint. Note the logic is evaluated twice so we can - * correctly report the number of additional network arguments without using a deferred - * map, an assertion is made at the end to check we set the right length. */ - int length = 0; - if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) { - length++; - } - if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME - && sdslen(node->hostname) != 0) - { - length++; - } - addReplyMapLen(c, length); - - if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) { - addReplyBulkCString(c, "ip"); - addReplyBulkCString(c, node->ip); - length--; - } - if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME - && sdslen(node->hostname) != 0) - { - addReplyBulkCString(c, "hostname"); - addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname)); - length--; - } - serverAssert(length == 0); -} - -void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) { - int i, nested_elements = 3; /* slots (2) + master addr (1) */ - for (i = 0; i < node->numslaves; i++) { - if (!isReplicaAvailable(node->slaves[i])) continue; - nested_elements++; - } - addReplyArrayLen(c, nested_elements); - addReplyLongLong(c, start_slot); - addReplyLongLong(c, end_slot); - addNodeToNodeReply(c, node); - - /* Remaining nodes in reply are replicas for slot range */ - for (i = 0; i < node->numslaves; i++) { - /* This loop is copy/pasted from clusterGenNodeDescription() - * with modifications for per-slot node aggregation. */ - if (!isReplicaAvailable(node->slaves[i])) continue; - addNodeToNodeReply(c, node->slaves[i]); - nested_elements--; - } - serverAssert(nested_elements == 3); /* Original 3 elements */ -} - /* Add detailed information of a node to the output buffer of the given client. */ void addNodeDetailsToShardReply(client *c, clusterNode *node) { int reply_count = 0; @@ -5675,43 +5576,6 @@ void clusterCommandShards(client *c) { dictReleaseIterator(di); } -void clusterCommandSlots(client * c) { - /* Format: 1) 1) start slot - * 2) end slot - * 3) 1) master IP - * 2) master port - * 3) node ID - * 4) 1) replica IP - * 2) replica port - * 3) node ID - * ... continued until done - */ - clusterNode *n = NULL; - int num_masters = 0, start = -1; - void *slot_replylen = addReplyDeferredLen(c); - - for (int i = 0; i <= CLUSTER_SLOTS; i++) { - /* Find start node and slot id. */ - if (n == NULL) { - if (i == CLUSTER_SLOTS) break; - n = server.cluster->slots[i]; - start = i; - continue; - } - - /* Add cluster slots info when occur different node with start - * or end of slot. */ - if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) { - addNodeReplyForClusterSlot(c, n, start, i-1); - num_masters++; - if (i == CLUSTER_SLOTS) break; - n = server.cluster->slots[i]; - start = i; - } - } - setDeferredArrayLen(c, slot_replylen, num_masters); -} - sds genClusterInfoString(void) { sds info = sdsempty(); char *statestr[] = {"ok","fail"}; @@ -5816,396 +5680,6 @@ void removeChannelsInSlot(unsigned int slot) { } -/* ----------------------------------------------------------------------------- - * Cluster functions related to serving / redirecting clients - * -------------------------------------------------------------------------- */ - -/* The ASKING command is required after a -ASK redirection. - * The client should issue ASKING before to actually send the command to - * the target instance. See the Redis Cluster specification for more - * information. */ -void askingCommand(client *c) { - if (server.cluster_enabled == 0) { - addReplyError(c,"This instance has cluster support disabled"); - return; - } - c->flags |= CLIENT_ASKING; - addReply(c,shared.ok); -} - -/* The READONLY command is used by clients to enter the read-only mode. - * In this mode slaves will not redirect clients as long as clients access - * with read-only commands to keys that are served by the slave's master. */ -void readonlyCommand(client *c) { - if (server.cluster_enabled == 0) { - addReplyError(c,"This instance has cluster support disabled"); - return; - } - c->flags |= CLIENT_READONLY; - addReply(c,shared.ok); -} - -/* The READWRITE command just clears the READONLY command state. */ -void readwriteCommand(client *c) { - if (server.cluster_enabled == 0) { - addReplyError(c,"This instance has cluster support disabled"); - return; - } - c->flags &= ~CLIENT_READONLY; - addReply(c,shared.ok); -} - -/* Return the pointer to the cluster node that is able to serve the command. - * For the function to succeed the command should only target either: - * - * 1) A single key (even multiple times like RPOPLPUSH mylist mylist). - * 2) Multiple keys in the same hash slot, while the slot is stable (no - * resharding in progress). - * - * On success the function returns the node that is able to serve the request. - * If the node is not 'myself' a redirection must be performed. The kind of - * redirection is specified setting the integer passed by reference - * 'error_code', which will be set to CLUSTER_REDIR_ASK or - * CLUSTER_REDIR_MOVED. - * - * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE. - * - * If the command fails NULL is returned, and the reason of the failure is - * provided via 'error_code', which will be set to: - * - * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that - * don't belong to the same hash slot. - * - * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys - * belonging to the same slot, but the slot is not stable (in migration or - * importing state, likely because a resharding is in progress). - * - * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is - * not bound to any node. In this case the cluster global state should be - * already "down" but it is fragile to rely on the update of the global state, - * so we also handle it here. - * - * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is - * down but the user attempts to execute a command that addresses one or more keys. */ -clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) { - clusterNode *n = NULL; - robj *firstkey = NULL; - int multiple_keys = 0; - multiState *ms, _ms; - multiCmd mc; - int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0, - existing_keys = 0; - - /* Allow any key to be set if a module disabled cluster redirections. */ - if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) - return myself; - - /* Set error code optimistically for the base case. */ - if (error_code) *error_code = CLUSTER_REDIR_NONE; - - /* Modules can turn off Redis Cluster redirection: this is useful - * when writing a module that implements a completely different - * distributed system. */ - - /* We handle all the cases as if they were EXEC commands, so we have - * a common code path for everything */ - if (cmd->proc == execCommand) { - /* If CLIENT_MULTI flag is not set EXEC is just going to return an - * error. */ - if (!(c->flags & CLIENT_MULTI)) return myself; - ms = &c->mstate; - } else { - /* In order to have a single codepath create a fake Multi State - * structure if the client is not in MULTI/EXEC state, this way - * we have a single codepath below. */ - ms = &_ms; - _ms.commands = &mc; - _ms.count = 1; - mc.argv = argv; - mc.argc = argc; - mc.cmd = cmd; - } - - int is_pubsubshard = cmd->proc == ssubscribeCommand || - cmd->proc == sunsubscribeCommand || - cmd->proc == spublishCommand; - - /* Check that all the keys are in the same hash slot, and obtain this - * slot and the node associated. */ - for (i = 0; i < ms->count; i++) { - struct redisCommand *mcmd; - robj **margv; - int margc, numkeys, j; - keyReference *keyindex; - - mcmd = ms->commands[i].cmd; - margc = ms->commands[i].argc; - margv = ms->commands[i].argv; - - getKeysResult result = GETKEYS_RESULT_INIT; - numkeys = getKeysFromCommand(mcmd,margv,margc,&result); - keyindex = result.keys; - - for (j = 0; j < numkeys; j++) { - robj *thiskey = margv[keyindex[j].pos]; - int thisslot = keyHashSlot((char*)thiskey->ptr, - sdslen(thiskey->ptr)); - - if (firstkey == NULL) { - /* This is the first key we see. Check what is the slot - * and node. */ - firstkey = thiskey; - slot = thisslot; - n = server.cluster->slots[slot]; - - /* Error: If a slot is not served, we are in "cluster down" - * state. However the state is yet to be updated, so this was - * not trapped earlier in processCommand(). Report the same - * error to the client. */ - if (n == NULL) { - getKeysFreeResult(&result); - if (error_code) - *error_code = CLUSTER_REDIR_DOWN_UNBOUND; - return NULL; - } - - /* If we are migrating or importing this slot, we need to check - * if we have all the keys in the request (the only way we - * can safely serve the request, otherwise we return a TRYAGAIN - * error). To do so we set the importing/migrating state and - * increment a counter for every missing key. */ - if (n == myself && - server.cluster->migrating_slots_to[slot] != NULL) - { - migrating_slot = 1; - } else if (server.cluster->importing_slots_from[slot] != NULL) { - importing_slot = 1; - } - } else { - /* If it is not the first key/channel, make sure it is exactly - * the same key/channel as the first we saw. */ - if (slot != thisslot) { - /* Error: multiple keys from different slots. */ - getKeysFreeResult(&result); - if (error_code) - *error_code = CLUSTER_REDIR_CROSS_SLOT; - return NULL; - } - if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) { - /* Flag this request as one with multiple different - * keys/channels when the slot is in importing state. */ - multiple_keys = 1; - } - } - - /* Migrating / Importing slot? Count keys we don't have. - * If it is pubsubshard command, it isn't required to check - * the channel being present or not in the node during the - * slot migration, the channel will be served from the source - * node until the migration completes with CLUSTER SETSLOT - * NODE . */ - int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE; - if ((migrating_slot || importing_slot) && !is_pubsubshard) - { - if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++; - else existing_keys++; - } - } - getKeysFreeResult(&result); - } - - /* No key at all in command? then we can serve the request - * without redirections or errors in all the cases. */ - if (n == NULL) return myself; - - uint64_t cmd_flags = getCommandFlags(c); - /* Cluster is globally down but we got keys? We only serve the request - * if it is a read command and when allow_reads_when_down is enabled. */ - if (server.cluster->state != CLUSTER_OK) { - if (is_pubsubshard) { - if (!server.cluster_allow_pubsubshard_when_down) { - if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; - return NULL; - } - } else if (!server.cluster_allow_reads_when_down) { - /* The cluster is configured to block commands when the - * cluster is down. */ - if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; - return NULL; - } else if (cmd_flags & CMD_WRITE) { - /* The cluster is configured to allow read only commands */ - if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE; - return NULL; - } else { - /* Fall through and allow the command to be executed: - * this happens when server.cluster_allow_reads_when_down is - * true and the command is not a write command */ - } - } - - /* Return the hashslot by reference. */ - if (hashslot) *hashslot = slot; - - /* MIGRATE always works in the context of the local node if the slot - * is open (migrating or importing state). We need to be able to freely - * move keys among instances in this case. */ - if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand) - return myself; - - /* If we don't have all the keys and we are migrating the slot, send - * an ASK redirection or TRYAGAIN. */ - if (migrating_slot && missing_keys) { - /* If we have keys but we don't have all keys, we return TRYAGAIN */ - if (existing_keys) { - if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE; - return NULL; - } else { - if (error_code) *error_code = CLUSTER_REDIR_ASK; - return server.cluster->migrating_slots_to[slot]; - } - } - - /* If we are receiving the slot, and the client correctly flagged the - * request as "ASKING", we can serve the request. However if the request - * involves multiple keys and we don't have them all, the only option is - * to send a TRYAGAIN error. */ - if (importing_slot && - (c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING)) - { - if (multiple_keys && missing_keys) { - if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE; - return NULL; - } else { - return myself; - } - } - - /* Handle the read-only client case reading from a slave: if this - * node is a slave and the request is about a hash slot our master - * is serving, we can reply without redirection. */ - int is_write_command = (cmd_flags & CMD_WRITE) || - (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); - if (((c->flags & CLIENT_READONLY) || is_pubsubshard) && - !is_write_command && - nodeIsSlave(myself) && - myself->slaveof == n) - { - return myself; - } - - /* Base case: just return the right node. However if this node is not - * myself, set error_code to MOVED since we need to issue a redirection. */ - if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED; - return n; -} - -/* Send the client the right redirection code, according to error_code - * that should be set to one of CLUSTER_REDIR_* macros. - * - * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes - * are used, then the node 'n' should not be NULL, but should be the - * node we want to mention in the redirection. Moreover hashslot should - * be set to the hash slot that caused the redirection. */ -void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) { - if (error_code == CLUSTER_REDIR_CROSS_SLOT) { - addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot"); - } else if (error_code == CLUSTER_REDIR_UNSTABLE) { - /* The request spawns multiple keys in the same slot, - * but the slot is not "stable" currently as there is - * a migration or import in progress. */ - addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot"); - } else if (error_code == CLUSTER_REDIR_DOWN_STATE) { - addReplyError(c,"-CLUSTERDOWN The cluster is down"); - } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { - addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands"); - } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) { - addReplyError(c,"-CLUSTERDOWN Hash slot not served"); - } else if (error_code == CLUSTER_REDIR_MOVED || - error_code == CLUSTER_REDIR_ASK) - { - /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ - int port = getNodeClientPort(n, shouldReturnTlsInfo()); - addReplyErrorSds(c,sdscatprintf(sdsempty(), - "-%s %d %s:%d", - (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", - hashslot, getPreferredEndpoint(n), port)); - } else { - serverPanic("getNodeByQuery() unknown error."); - } -} - -/* This function is called by the function processing clients incrementally - * to detect timeouts, in order to handle the following case: - * - * 1) A client blocks with BLPOP or similar blocking operation. - * 2) The master migrates the hash slot elsewhere or turns into a slave. - * 3) The client may remain blocked forever (or up to the max timeout time) - * waiting for a key change that will never happen. - * - * If the client is found to be blocked into a hash slot this node no - * longer handles, the client is sent a redirection error, and the function - * returns 1. Otherwise 0 is returned and no operation is performed. */ -int clusterRedirectBlockedClientIfNeeded(client *c) { - if (c->flags & CLIENT_BLOCKED && - (c->bstate.btype == BLOCKED_LIST || - c->bstate.btype == BLOCKED_ZSET || - c->bstate.btype == BLOCKED_STREAM || - c->bstate.btype == BLOCKED_MODULE)) - { - dictEntry *de; - dictIterator *di; - - /* If the cluster is down, unblock the client with the right error. - * If the cluster is configured to allow reads on cluster down, we - * still want to emit this error since a write will be required - * to unblock them which may never come. */ - if (server.cluster->state == CLUSTER_FAIL) { - clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE); - return 1; - } - - /* If the client is blocked on module, but not on a specific key, - * don't unblock it (except for the CLUSTER_FAIL case above). */ - if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) - return 0; - - /* All keys must belong to the same slot, so check first key only. */ - di = dictGetIterator(c->bstate.keys); - if ((de = dictNext(di)) != NULL) { - robj *key = dictGetKey(de); - int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr)); - clusterNode *node = server.cluster->slots[slot]; - - /* if the client is read-only and attempting to access key that our - * replica can handle, allow it. */ - if ((c->flags & CLIENT_READONLY) && - !(c->lastcmd->flags & CMD_WRITE) && - nodeIsSlave(myself) && myself->slaveof == node) - { - node = myself; - } - - /* We send an error and unblock the client if: - * 1) The slot is unassigned, emitting a cluster down error. - * 2) The slot is not handled by this node, nor being imported. */ - if (node != myself && - server.cluster->importing_slots_from[slot] == NULL) - { - if (node == NULL) { - clusterRedirectClient(c,NULL,0, - CLUSTER_REDIR_DOWN_UNBOUND); - } else { - clusterRedirectClient(c,node,slot, - CLUSTER_REDIR_MOVED); - } - dictReleaseIterator(di); - return 1; - } - } - dictReleaseIterator(di); - } - return 0; -} /* Remove all the keys in the specified hash slot. * The number of removed items is returned. */ @@ -6295,10 +5769,6 @@ int clusterManualFailoverTimeLimit(void) { return server.cluster->mf_end; } -char* getMyClusterId(void) { - return server.cluster->myself->name; -} - int getClusterSize(void) { return dictSize(server.cluster->nodes); } @@ -6904,3 +6374,37 @@ int getNumSlaves(clusterNode *node) { clusterNode *getSlave(clusterNode *node, int slave_idx) { return node->slaves[slave_idx]; } + +clusterNode *getMigratingSlotDest(int slot) { + return server.cluster->migrating_slots_to[slot]; +} + +clusterNode *getImportingSlotSource(int slot) { + return server.cluster->importing_slots_from[slot]; +} + +int isClusterHealthy(void) { + return server.cluster->state == CLUSTER_OK; +} + +clusterNode *getNodeBySlot(int slot) { + return server.cluster->slots[slot]; +} + +char* clusterNodeHostname(clusterNode *node) { + return node->hostname; +} + +long long getReplOffset(clusterNode *node) { + return node->repl_offset; +} + +const char *getPreferredEndpoint(clusterNode *n) { + char* hostname = clusterNodeHostname(n); + switch(server.cluster_preferred_endpoint_type) { + case CLUSTER_ENDPOINT_TYPE_IP: return clusterNodeIp(n); + case CLUSTER_ENDPOINT_TYPE_HOSTNAME: return (hostname != NULL && hostname[0] != '\0') ? hostname : "?"; + case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT: return ""; + } + return "unknown"; +}