Cluster refactor: Make clustering functions common

Move primary functions used to implement datapath
clustering into cluster.c, making them shared. This
required adding "accessor" and other functions to
abstract access to node details and cluster state.

Signed-off-by: Josh Hershberg <yehoshua@redis.com>
This commit is contained in:
Josh Hershberg 2023-11-02 11:38:31 +02:00
parent 4afc54ad9b
commit c6157b3510
3 changed files with 569 additions and 532 deletions

View File

@ -10,7 +10,7 @@
/* We have 16384 hash slots. The hash slot of a given key is obtained
* as the least significant 14 bits of the crc16 of the key.
*
* However if the key contains the {...} pattern, only the part between
* However, if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
@ -754,6 +754,10 @@ void clusterCommandMyId(client *c) {
}
}
char* getMyClusterId(void) {
return clusterNodeGetName(getMyClusterNode());
}
void clusterCommandMyShardId(client *c) {
char *sid = clusterNodeGetShardId(getMyClusterNode());
if (sid) {
@ -918,3 +922,523 @@ void clusterCommand(client *c) {
return;
}
}
/* Return the pointer to the cluster node that is able to serve the command.
* For the function to succeed the command should only target either:
*
* 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
* 2) Multiple keys in the same hash slot, while the slot is stable (no
* resharding in progress).
*
* On success the function returns the node that is able to serve the request.
* If the node is not 'myself' a redirection must be performed. The kind of
* redirection is specified setting the integer passed by reference
* 'error_code', which will be set to CLUSTER_REDIR_ASK or
* CLUSTER_REDIR_MOVED.
*
* When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
*
* If the command fails NULL is returned, and the reason of the failure is
* provided via 'error_code', which will be set to:
*
* CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
* don't belong to the same hash slot.
*
* CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
* belonging to the same slot, but the slot is not stable (in migration or
* importing state, likely because a resharding is in progress).
*
* CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
* not bound to any node. In this case the cluster global state should be
* already "down" but it is fragile to rely on the update of the global state,
* so we also handle it here.
*
* CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
* down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
clusterNode *myself = getMyClusterNode();
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
/* Allow any key to be set if a module disabled cluster redirections. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return myself;
/* Set error code optimistically for the base case. */
if (error_code) *error_code = CLUSTER_REDIR_NONE;
/* Modules can turn off Redis Cluster redirection: this is useful
* when writing a module that implements a completely different
* distributed system. */
/* We handle all the cases as if they were EXEC commands, so we have
* a common code path for everything */
if (cmd->proc == execCommand) {
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!(c->flags & CLIENT_MULTI)) return myself;
ms = &c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
}
int is_pubsubshard = cmd->proc == ssubscribeCommand ||
cmd->proc == sunsubscribeCommand ||
cmd->proc == spublishCommand;
/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, numkeys, j;
keyReference *keyindex;
mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
getKeysResult result = GETKEYS_RESULT_INIT;
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;
for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j].pos];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
if (firstkey == NULL) {
/* This is the first key we see. Check what is the slot
* and node. */
firstkey = thiskey;
slot = thisslot;
n = getNodeBySlot(slot);
/* Error: If a slot is not served, we are in "cluster down"
* state. However the state is yet to be updated, so this was
* not trapped earlier in processCommand(). Report the same
* error to the client. */
if (n == NULL) {
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}
/* If we are migrating or importing this slot, we need to check
* if we have all the keys in the request (the only way we
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (n == myself &&
getMigratingSlotDest(slot) != NULL)
{
migrating_slot = 1;
} else if (getImportingSlotSource(slot) != NULL) {
importing_slot = 1;
}
} else {
/* If it is not the first key/channel, make sure it is exactly
* the same key/channel as the first we saw. */
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
}
if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) {
/* Flag this request as one with multiple different
* keys/channels when the slot is in importing state. */
multiple_keys = 1;
}
}
/* Migrating / Importing slot? Count keys we don't have.
* If it is pubsubshard command, it isn't required to check
* the channel being present or not in the node during the
* slot migration, the channel will be served from the source
* node until the migration completes with CLUSTER SETSLOT <slot>
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !is_pubsubshard)
{
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
else existing_keys++;
}
}
getKeysFreeResult(&result);
}
/* No key at all in command? then we can serve the request
* without redirections or errors in all the cases. */
if (n == NULL) return myself;
uint64_t cmd_flags = getCommandFlags(c);
/* Cluster is globally down but we got keys? We only serve the request
* if it is a read command and when allow_reads_when_down is enabled. */
if (!isClusterHealthy()) {
if (is_pubsubshard) {
if (!server.cluster_allow_pubsubshard_when_down) {
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
}
} else if (!server.cluster_allow_reads_when_down) {
/* The cluster is configured to block commands when the
* cluster is down. */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
} else if (cmd_flags & CMD_WRITE) {
/* The cluster is configured to allow read only commands */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
return NULL;
} else {
/* Fall through and allow the command to be executed:
* this happens when server.cluster_allow_reads_when_down is
* true and the command is not a write command */
}
}
/* Return the hashslot by reference. */
if (hashslot) *hashslot = slot;
/* MIGRATE always works in the context of the local node if the slot
* is open (migrating or importing state). We need to be able to freely
* move keys among instances in this case. */
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
return myself;
/* If we don't have all the keys and we are migrating the slot, send
* an ASK redirection or TRYAGAIN. */
if (migrating_slot && missing_keys) {
/* If we have keys but we don't have all keys, we return TRYAGAIN */
if (existing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
if (error_code) *error_code = CLUSTER_REDIR_ASK;
return getMigratingSlotDest(slot);
}
}
/* If we are receiving the slot, and the client correctly flagged the
* request as "ASKING", we can serve the request. However if the request
* involves multiple keys and we don't have them all, the only option is
* to send a TRYAGAIN error. */
if (importing_slot &&
(c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING))
{
if (multiple_keys && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}
/* Handle the read-only client case reading from a slave: if this
* node is a slave and the request is about a hash slot our master
* is serving, we can reply without redirection. */
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
!is_write_command &&
clusterNodeIsSlave(myself) &&
clusterNodeGetSlaveof(myself) == n)
{
return myself;
}
/* Base case: just return the right node. However, if this node is not
* myself, set error_code to MOVED since we need to issue a redirection. */
if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
return n;
}
/* Send the client the right redirection code, according to error_code
* that should be set to one of CLUSTER_REDIR_* macros.
*
* If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
* are used, then the node 'n' should not be NULL, but should be the
* node we want to mention in the redirection. Moreover hashslot should
* be set to the hash slot that caused the redirection. */
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
} else if (error_code == CLUSTER_REDIR_UNSTABLE) {
/* The request spawns multiple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
addReplyError(c,"-CLUSTERDOWN The cluster is down");
} else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
} else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
addReplyError(c,"-CLUSTERDOWN Hash slot not served");
} else if (error_code == CLUSTER_REDIR_MOVED ||
error_code == CLUSTER_REDIR_ASK)
{
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
int port = getNodeClientPort(n, shouldReturnTlsInfo());
addReplyErrorSds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d",
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot, getPreferredEndpoint(n), port));
} else {
serverPanic("getNodeByQuery() unknown error.");
}
}
/* This function is called by the function processing clients incrementally
* to detect timeouts, in order to handle the following case:
*
* 1) A client blocks with BLPOP or similar blocking operation.
* 2) The master migrates the hash slot elsewhere or turns into a slave.
* 3) The client may remain blocked forever (or up to the max timeout time)
* waiting for a key change that will never happen.
*
* If the client is found to be blocked into a hash slot this node no
* longer handles, the client is sent a redirection error, and the function
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
clusterNode *myself = getMyClusterNode();
if (c->flags & CLIENT_BLOCKED &&
(c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM ||
c->bstate.btype == BLOCKED_MODULE))
{
dictEntry *de;
dictIterator *di;
/* If the cluster is down, unblock the client with the right error.
* If the cluster is configured to allow reads on cluster down, we
* still want to emit this error since a write will be required
* to unblock them which may never come. */
if (!isClusterHealthy()) {
clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
return 1;
}
/* If the client is blocked on module, but not on a specific key,
* don't unblock it (except for the CLUSTER_FAIL case above). */
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
return 0;
/* All keys must belong to the same slot, so check first key only. */
di = dictGetIterator(c->bstate.keys);
if ((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
clusterNode *node = getNodeBySlot(slot);
/* if the client is read-only and attempting to access key that our
* replica can handle, allow it. */
if ((c->flags & CLIENT_READONLY) &&
!(c->lastcmd->flags & CMD_WRITE) &&
clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == node)
{
node = myself;
}
/* We send an error and unblock the client if:
* 1) The slot is unassigned, emitting a cluster down error.
* 2) The slot is not handled by this node, nor being imported. */
if (node != myself && getImportingSlotSource(slot) == NULL)
{
if (node == NULL) {
clusterRedirectClient(c,NULL,0,
CLUSTER_REDIR_DOWN_UNBOUND);
} else {
clusterRedirectClient(c,node,slot,
CLUSTER_REDIR_MOVED);
}
dictReleaseIterator(di);
return 1;
}
}
dictReleaseIterator(di);
}
return 0;
}
/* Returns an indication if the replica node is fully available
* and should be listed in CLUSTER SLOTS response.
* Returns 1 for available nodes, 0 for nodes that have
* not finished their initial sync, in failed state, or are
* otherwise considered not available to serve read commands. */
static int isReplicaAvailable(clusterNode *node) {
if (clusterNodeIsFailing(node)) {
return 0;
}
long long repl_offset = getReplOffset(node);
if (clusterNodeIsMyself(node)) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = replicationGetSlaveOffset();
}
return (repl_offset != 0);
}
void addNodeToNodeReply(client *c, clusterNode *node) {
char* hostname = clusterNodeHostname(node);
addReplyArrayLen(c, 4);
if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) {
addReplyBulkCString(c, clusterNodeIp(node));
} else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
if (hostname != NULL && hostname[0] != '\0') {
addReplyBulkCString(c, hostname);
} else {
addReplyBulkCString(c, "?");
}
} else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) {
addReplyNull(c);
} else {
serverPanic("Unrecognized preferred endpoint type");
}
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
addReplyLongLong(c, getNodeClientPort(node, shouldReturnTlsInfo()));
addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);
/* Add the additional endpoint information, this is all the known networking information
* that is not the preferred endpoint. Note the logic is evaluated twice so we can
* correctly report the number of additional network arguments without using a deferred
* map, an assertion is made at the end to check we set the right length. */
int length = 0;
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
length++;
}
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
&& hostname != NULL && hostname[0] != '\0')
{
length++;
}
addReplyMapLen(c, length);
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
addReplyBulkCString(c, "ip");
addReplyBulkCString(c, clusterNodeIp(node));
length--;
}
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
&& hostname != NULL && hostname[0] != '\0')
{
addReplyBulkCString(c, "hostname");
addReplyBulkCString(c, hostname);
length--;
}
serverAssert(length == 0);
}
void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
for (i = 0; i < getNumSlaves(node); i++) {
if (!isReplicaAvailable(getSlave(node, i))) continue;
nested_elements++;
}
addReplyArrayLen(c, nested_elements);
addReplyLongLong(c, start_slot);
addReplyLongLong(c, end_slot);
addNodeToNodeReply(c, node);
/* Remaining nodes in reply are replicas for slot range */
for (i = 0; i < getNumSlaves(node); i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isReplicaAvailable(getSlave(node, i))) continue;
addNodeToNodeReply(c, getSlave(node, i));
nested_elements--;
}
serverAssert(nested_elements == 3); /* Original 3 elements */
}
void clusterCommandSlots(client * c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
clusterNode *n = NULL;
int num_masters = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(c);
for (int i = 0; i <= CLUSTER_SLOTS; i++) {
/* Find start node and slot id. */
if (n == NULL) {
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
continue;
}
/* Add cluster slots info when occur different node with start
* or end of slot. */
if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
addNodeReplyForClusterSlot(c, n, start, i-1);
num_masters++;
if (i == CLUSTER_SLOTS) break;
n = getNodeBySlot(i);
start = i;
}
}
setDeferredArrayLen(c, slot_replylen, num_masters);
}
/* -----------------------------------------------------------------------------
* Cluster functions related to serving / redirecting clients
* -------------------------------------------------------------------------- */
/* The ASKING command is required after a -ASK redirection.
* The client should issue ASKING before to actually send the command to
* the target instance. See the Redis Cluster specification for more
* information. */
void askingCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
c->flags |= CLIENT_ASKING;
addReply(c,shared.ok);
}
/* The READONLY command is used by clients to enter the read-only mode.
* In this mode slaves will not redirect clients as long as clients access
* with read-only commands to keys that are served by the slave's master. */
void readonlyCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
c->flags |= CLIENT_READONLY;
addReply(c,shared.ok);
}
/* The READWRITE command just clears the READONLY command state. */
void readwriteCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
c->flags &= ~CLIENT_READONLY;
addReply(c,shared.ok);
}

View File

@ -87,6 +87,15 @@ void clusterCommandShards(client *c);
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);
int getNumSlaves(clusterNode *node);
clusterNode *getSlave(clusterNode *node, int slave_idx);
clusterNode *getMigratingSlotDest(int slot);
clusterNode *getImportingSlotSource(int slot);
int isClusterHealthy(void);
clusterNode *getNodeBySlot(int slot);
int getNodeClientPort(clusterNode *n, int use_tls);
char* clusterNodeHostname(clusterNode *node);
const char *getPreferredEndpoint(clusterNode *n);
void migrateCommand(client *c);
long long getReplOffset(clusterNode *node);
char **clusterDebugCommandHelp(void);
ConnectionType *connTypeOfCluster(void);

View File

@ -112,7 +112,7 @@ static inline int getNodeDefaultReplicationPort(clusterNode *n) {
return server.tls_replication ? n->tls_port : n->tcp_port;
}
static inline int getNodeClientPort(clusterNode *n, int use_tls) {
int getNodeClientPort(clusterNode *n, int use_tls) {
return use_tls ? n->tls_port : n->tcp_port;
}
@ -5402,15 +5402,6 @@ void addReplyClusterLinksDescription(client *c) {
* CLUSTER command
* -------------------------------------------------------------------------- */
const char *getPreferredEndpoint(clusterNode *n) {
switch(server.cluster_preferred_endpoint_type) {
case CLUSTER_ENDPOINT_TYPE_IP: return n->ip;
case CLUSTER_ENDPOINT_TYPE_HOSTNAME: return (sdslen(n->hostname) != 0) ? n->hostname : "?";
case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT: return "";
}
return "unknown";
}
const char *clusterGetMessageTypeString(int type) {
switch(type) {
case CLUSTERMSG_TYPE_PING: return "ping";
@ -5440,24 +5431,6 @@ int getSlotOrReply(client *c, robj *o) {
return (int) slot;
}
/* Returns an indication if the replica node is fully available
* and should be listed in CLUSTER SLOTS response.
* Returns 1 for available nodes, 0 for nodes that have
* not finished their initial sync, in failed state, or are
* otherwise considered not available to serve read commands. */
static int isReplicaAvailable(clusterNode *node) {
if (nodeFailed(node)) {
return 0;
}
long long repl_offset = node->repl_offset;
if (node->flags & CLUSTER_NODE_MYSELF) {
/* Nodes do not update their own information
* in the cluster node list. */
repl_offset = replicationGetSlaveOffset();
}
return (repl_offset != 0);
}
int checkSlotAssignmentsOrReply(client *c, unsigned char *slots, int del, int start_slot, int end_slot) {
int slot;
for (slot = start_slot; slot <= end_slot; slot++) {
@ -5494,78 +5467,6 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}
void addNodeToNodeReply(client *c, clusterNode *node) {
addReplyArrayLen(c, 4);
if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) {
addReplyBulkCString(c, node->ip);
} else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
if (sdslen(node->hostname) != 0) {
addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname));
} else {
addReplyBulkCString(c, "?");
}
} else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) {
addReplyNull(c);
} else {
serverPanic("Unrecognized preferred endpoint type");
}
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
addReplyLongLong(c, getNodeClientPort(node, shouldReturnTlsInfo()));
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
/* Add the additional endpoint information, this is all the known networking information
* that is not the preferred endpoint. Note the logic is evaluated twice so we can
* correctly report the number of additional network arguments without using a deferred
* map, an assertion is made at the end to check we set the right length. */
int length = 0;
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
length++;
}
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
&& sdslen(node->hostname) != 0)
{
length++;
}
addReplyMapLen(c, length);
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
addReplyBulkCString(c, "ip");
addReplyBulkCString(c, node->ip);
length--;
}
if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
&& sdslen(node->hostname) != 0)
{
addReplyBulkCString(c, "hostname");
addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname));
length--;
}
serverAssert(length == 0);
}
void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
for (i = 0; i < node->numslaves; i++) {
if (!isReplicaAvailable(node->slaves[i])) continue;
nested_elements++;
}
addReplyArrayLen(c, nested_elements);
addReplyLongLong(c, start_slot);
addReplyLongLong(c, end_slot);
addNodeToNodeReply(c, node);
/* Remaining nodes in reply are replicas for slot range */
for (i = 0; i < node->numslaves; i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isReplicaAvailable(node->slaves[i])) continue;
addNodeToNodeReply(c, node->slaves[i]);
nested_elements--;
}
serverAssert(nested_elements == 3); /* Original 3 elements */
}
/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0;
@ -5675,43 +5576,6 @@ void clusterCommandShards(client *c) {
dictReleaseIterator(di);
}
void clusterCommandSlots(client * c) {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP
* 2) replica port
* 3) node ID
* ... continued until done
*/
clusterNode *n = NULL;
int num_masters = 0, start = -1;
void *slot_replylen = addReplyDeferredLen(c);
for (int i = 0; i <= CLUSTER_SLOTS; i++) {
/* Find start node and slot id. */
if (n == NULL) {
if (i == CLUSTER_SLOTS) break;
n = server.cluster->slots[i];
start = i;
continue;
}
/* Add cluster slots info when occur different node with start
* or end of slot. */
if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
addNodeReplyForClusterSlot(c, n, start, i-1);
num_masters++;
if (i == CLUSTER_SLOTS) break;
n = server.cluster->slots[i];
start = i;
}
}
setDeferredArrayLen(c, slot_replylen, num_masters);
}
sds genClusterInfoString(void) {
sds info = sdsempty();
char *statestr[] = {"ok","fail"};
@ -5816,396 +5680,6 @@ void removeChannelsInSlot(unsigned int slot) {
}
/* -----------------------------------------------------------------------------
* Cluster functions related to serving / redirecting clients
* -------------------------------------------------------------------------- */
/* The ASKING command is required after a -ASK redirection.
* The client should issue ASKING before to actually send the command to
* the target instance. See the Redis Cluster specification for more
* information. */
void askingCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
c->flags |= CLIENT_ASKING;
addReply(c,shared.ok);
}
/* The READONLY command is used by clients to enter the read-only mode.
* In this mode slaves will not redirect clients as long as clients access
* with read-only commands to keys that are served by the slave's master. */
void readonlyCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
c->flags |= CLIENT_READONLY;
addReply(c,shared.ok);
}
/* The READWRITE command just clears the READONLY command state. */
void readwriteCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
c->flags &= ~CLIENT_READONLY;
addReply(c,shared.ok);
}
/* Return the pointer to the cluster node that is able to serve the command.
* For the function to succeed the command should only target either:
*
* 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
* 2) Multiple keys in the same hash slot, while the slot is stable (no
* resharding in progress).
*
* On success the function returns the node that is able to serve the request.
* If the node is not 'myself' a redirection must be performed. The kind of
* redirection is specified setting the integer passed by reference
* 'error_code', which will be set to CLUSTER_REDIR_ASK or
* CLUSTER_REDIR_MOVED.
*
* When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
*
* If the command fails NULL is returned, and the reason of the failure is
* provided via 'error_code', which will be set to:
*
* CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
* don't belong to the same hash slot.
*
* CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
* belonging to the same slot, but the slot is not stable (in migration or
* importing state, likely because a resharding is in progress).
*
* CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
* not bound to any node. In this case the cluster global state should be
* already "down" but it is fragile to rely on the update of the global state,
* so we also handle it here.
*
* CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
* down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
/* Allow any key to be set if a module disabled cluster redirections. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return myself;
/* Set error code optimistically for the base case. */
if (error_code) *error_code = CLUSTER_REDIR_NONE;
/* Modules can turn off Redis Cluster redirection: this is useful
* when writing a module that implements a completely different
* distributed system. */
/* We handle all the cases as if they were EXEC commands, so we have
* a common code path for everything */
if (cmd->proc == execCommand) {
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
* error. */
if (!(c->flags & CLIENT_MULTI)) return myself;
ms = &c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
* structure if the client is not in MULTI/EXEC state, this way
* we have a single codepath below. */
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
}
int is_pubsubshard = cmd->proc == ssubscribeCommand ||
cmd->proc == sunsubscribeCommand ||
cmd->proc == spublishCommand;
/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, numkeys, j;
keyReference *keyindex;
mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
getKeysResult result = GETKEYS_RESULT_INIT;
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;
for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j].pos];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
if (firstkey == NULL) {
/* This is the first key we see. Check what is the slot
* and node. */
firstkey = thiskey;
slot = thisslot;
n = server.cluster->slots[slot];
/* Error: If a slot is not served, we are in "cluster down"
* state. However the state is yet to be updated, so this was
* not trapped earlier in processCommand(). Report the same
* error to the client. */
if (n == NULL) {
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}
/* If we are migrating or importing this slot, we need to check
* if we have all the keys in the request (the only way we
* can safely serve the request, otherwise we return a TRYAGAIN
* error). To do so we set the importing/migrating state and
* increment a counter for every missing key. */
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {
/* If it is not the first key/channel, make sure it is exactly
* the same key/channel as the first we saw. */
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
}
if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) {
/* Flag this request as one with multiple different
* keys/channels when the slot is in importing state. */
multiple_keys = 1;
}
}
/* Migrating / Importing slot? Count keys we don't have.
* If it is pubsubshard command, it isn't required to check
* the channel being present or not in the node during the
* slot migration, the channel will be served from the source
* node until the migration completes with CLUSTER SETSLOT <slot>
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !is_pubsubshard)
{
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
else existing_keys++;
}
}
getKeysFreeResult(&result);
}
/* No key at all in command? then we can serve the request
* without redirections or errors in all the cases. */
if (n == NULL) return myself;
uint64_t cmd_flags = getCommandFlags(c);
/* Cluster is globally down but we got keys? We only serve the request
* if it is a read command and when allow_reads_when_down is enabled. */
if (server.cluster->state != CLUSTER_OK) {
if (is_pubsubshard) {
if (!server.cluster_allow_pubsubshard_when_down) {
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
}
} else if (!server.cluster_allow_reads_when_down) {
/* The cluster is configured to block commands when the
* cluster is down. */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
} else if (cmd_flags & CMD_WRITE) {
/* The cluster is configured to allow read only commands */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
return NULL;
} else {
/* Fall through and allow the command to be executed:
* this happens when server.cluster_allow_reads_when_down is
* true and the command is not a write command */
}
}
/* Return the hashslot by reference. */
if (hashslot) *hashslot = slot;
/* MIGRATE always works in the context of the local node if the slot
* is open (migrating or importing state). We need to be able to freely
* move keys among instances in this case. */
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
return myself;
/* If we don't have all the keys and we are migrating the slot, send
* an ASK redirection or TRYAGAIN. */
if (migrating_slot && missing_keys) {
/* If we have keys but we don't have all keys, we return TRYAGAIN */
if (existing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
if (error_code) *error_code = CLUSTER_REDIR_ASK;
return server.cluster->migrating_slots_to[slot];
}
}
/* If we are receiving the slot, and the client correctly flagged the
* request as "ASKING", we can serve the request. However if the request
* involves multiple keys and we don't have them all, the only option is
* to send a TRYAGAIN error. */
if (importing_slot &&
(c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING))
{
if (multiple_keys && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}
/* Handle the read-only client case reading from a slave: if this
* node is a slave and the request is about a hash slot our master
* is serving, we can reply without redirection. */
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
!is_write_command &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
return myself;
}
/* Base case: just return the right node. However if this node is not
* myself, set error_code to MOVED since we need to issue a redirection. */
if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
return n;
}
/* Send the client the right redirection code, according to error_code
* that should be set to one of CLUSTER_REDIR_* macros.
*
* If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
* are used, then the node 'n' should not be NULL, but should be the
* node we want to mention in the redirection. Moreover hashslot should
* be set to the hash slot that caused the redirection. */
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
} else if (error_code == CLUSTER_REDIR_UNSTABLE) {
/* The request spawns multiple keys in the same slot,
* but the slot is not "stable" currently as there is
* a migration or import in progress. */
addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
addReplyError(c,"-CLUSTERDOWN The cluster is down");
} else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
} else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
addReplyError(c,"-CLUSTERDOWN Hash slot not served");
} else if (error_code == CLUSTER_REDIR_MOVED ||
error_code == CLUSTER_REDIR_ASK)
{
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
int port = getNodeClientPort(n, shouldReturnTlsInfo());
addReplyErrorSds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d",
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot, getPreferredEndpoint(n), port));
} else {
serverPanic("getNodeByQuery() unknown error.");
}
}
/* This function is called by the function processing clients incrementally
* to detect timeouts, in order to handle the following case:
*
* 1) A client blocks with BLPOP or similar blocking operation.
* 2) The master migrates the hash slot elsewhere or turns into a slave.
* 3) The client may remain blocked forever (or up to the max timeout time)
* waiting for a key change that will never happen.
*
* If the client is found to be blocked into a hash slot this node no
* longer handles, the client is sent a redirection error, and the function
* returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
if (c->flags & CLIENT_BLOCKED &&
(c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM ||
c->bstate.btype == BLOCKED_MODULE))
{
dictEntry *de;
dictIterator *di;
/* If the cluster is down, unblock the client with the right error.
* If the cluster is configured to allow reads on cluster down, we
* still want to emit this error since a write will be required
* to unblock them which may never come. */
if (server.cluster->state == CLUSTER_FAIL) {
clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
return 1;
}
/* If the client is blocked on module, but not on a specific key,
* don't unblock it (except for the CLUSTER_FAIL case above). */
if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
return 0;
/* All keys must belong to the same slot, so check first key only. */
di = dictGetIterator(c->bstate.keys);
if ((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
clusterNode *node = server.cluster->slots[slot];
/* if the client is read-only and attempting to access key that our
* replica can handle, allow it. */
if ((c->flags & CLIENT_READONLY) &&
!(c->lastcmd->flags & CMD_WRITE) &&
nodeIsSlave(myself) && myself->slaveof == node)
{
node = myself;
}
/* We send an error and unblock the client if:
* 1) The slot is unassigned, emitting a cluster down error.
* 2) The slot is not handled by this node, nor being imported. */
if (node != myself &&
server.cluster->importing_slots_from[slot] == NULL)
{
if (node == NULL) {
clusterRedirectClient(c,NULL,0,
CLUSTER_REDIR_DOWN_UNBOUND);
} else {
clusterRedirectClient(c,node,slot,
CLUSTER_REDIR_MOVED);
}
dictReleaseIterator(di);
return 1;
}
}
dictReleaseIterator(di);
}
return 0;
}
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
@ -6295,10 +5769,6 @@ int clusterManualFailoverTimeLimit(void) {
return server.cluster->mf_end;
}
char* getMyClusterId(void) {
return server.cluster->myself->name;
}
int getClusterSize(void) {
return dictSize(server.cluster->nodes);
}
@ -6904,3 +6374,37 @@ int getNumSlaves(clusterNode *node) {
clusterNode *getSlave(clusterNode *node, int slave_idx) {
return node->slaves[slave_idx];
}
clusterNode *getMigratingSlotDest(int slot) {
return server.cluster->migrating_slots_to[slot];
}
clusterNode *getImportingSlotSource(int slot) {
return server.cluster->importing_slots_from[slot];
}
int isClusterHealthy(void) {
return server.cluster->state == CLUSTER_OK;
}
clusterNode *getNodeBySlot(int slot) {
return server.cluster->slots[slot];
}
char* clusterNodeHostname(clusterNode *node) {
return node->hostname;
}
long long getReplOffset(clusterNode *node) {
return node->repl_offset;
}
const char *getPreferredEndpoint(clusterNode *n) {
char* hostname = clusterNodeHostname(n);
switch(server.cluster_preferred_endpoint_type) {
case CLUSTER_ENDPOINT_TYPE_IP: return clusterNodeIp(n);
case CLUSTER_ENDPOINT_TYPE_HOSTNAME: return (hostname != NULL && hostname[0] != '\0') ? hostname : "?";
case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT: return "";
}
return "unknown";
}