Redis Cluster: support for multi-key operations.
This commit is contained in:
parent
bbf39b7a3a
commit
36676c2318
123
src/cluster.c
123
src/cluster.c
@ -3804,21 +3804,39 @@ void readwriteCommand(redisClient *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Return the pointer to the cluster node that is able to serve the command.
|
/* Return the pointer to the cluster node that is able to serve the command.
|
||||||
* For the function to succeed the command should only target a single
|
* For the function to succeed the command should only target either:
|
||||||
* key (or the same key multiple times).
|
|
||||||
*
|
*
|
||||||
* If the returned node should be used only for this request, the *ask
|
* 1) A single key (even multiple times like LPOPRPUSH mylist mylist).
|
||||||
* integer is set to '1', otherwise to '0'. This is used in order to
|
* 2) Multiple keys in the same hash slot, while the slot is stable (no
|
||||||
* let the caller know if we should reply with -MOVED or with -ASK.
|
* resharding in progress).
|
||||||
*
|
*
|
||||||
* If the command contains multiple keys, and as a consequence it is not
|
* On success the function returns the node that is able to serve the request.
|
||||||
* possible to handle the request in Redis Cluster, NULL is returned. */
|
* If the node is not 'myself' a redirection must be perfomed. The kind of
|
||||||
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) {
|
* redirection is specified setting the integer passed by reference
|
||||||
|
* 'error_code', which will be set to REDIS_CLUSTER_REDIR_ASK or
|
||||||
|
* REDIS_CLUSTER_REDIR_MOVED.
|
||||||
|
*
|
||||||
|
* When the node is 'myself' 'error_code' is set to REDIS_CLUSTER_REDIR_NONE.
|
||||||
|
*
|
||||||
|
* If the command fails NULL is returned, and the reason of the failure is
|
||||||
|
* provided via 'error_code', which will be set to:
|
||||||
|
*
|
||||||
|
* REDIS_CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
|
||||||
|
* don't belong to the same hash slot.
|
||||||
|
*
|
||||||
|
* REDIS_CLUSTER_REDIR_UNSTABLE if the request contains mutliple keys
|
||||||
|
* belonging to the same slot, but the slot is not stable (in migration or
|
||||||
|
* importing state, likely because a resharding is in progress). */
|
||||||
|
clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
|
||||||
clusterNode *n = NULL;
|
clusterNode *n = NULL;
|
||||||
robj *firstkey = NULL;
|
robj *firstkey = NULL;
|
||||||
|
int multiple_keys = 0;
|
||||||
multiState *ms, _ms;
|
multiState *ms, _ms;
|
||||||
multiCmd mc;
|
multiCmd mc;
|
||||||
int i, slot = 0;
|
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
|
||||||
|
|
||||||
|
/* Set error code optimistically for the base case. */
|
||||||
|
if (error_code) *error_code = REDIS_CLUSTER_REDIR_NONE;
|
||||||
|
|
||||||
/* We handle all the cases as if they were EXEC commands, so we have
|
/* We handle all the cases as if they were EXEC commands, so we have
|
||||||
* a common code path for everything */
|
* a common code path for everything */
|
||||||
@ -3839,8 +3857,8 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
|
|||||||
mc.cmd = cmd;
|
mc.cmd = cmd;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check that all the keys are the same key, and get the slot and
|
/* Check that all the keys are in the same hash slot, and obtain this
|
||||||
* node for this key. */
|
* slot and the node associated. */
|
||||||
for (i = 0; i < ms->count; i++) {
|
for (i = 0; i < ms->count; i++) {
|
||||||
struct redisCommand *mcmd;
|
struct redisCommand *mcmd;
|
||||||
robj **margv;
|
robj **margv;
|
||||||
@ -3853,48 +3871,88 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
|
|||||||
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
|
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys,
|
||||||
REDIS_GETKEYS_ALL);
|
REDIS_GETKEYS_ALL);
|
||||||
for (j = 0; j < numkeys; j++) {
|
for (j = 0; j < numkeys; j++) {
|
||||||
|
robj *thiskey = margv[keyindex[j]];
|
||||||
|
int thisslot = keyHashSlot((char*)thiskey->ptr,
|
||||||
|
sdslen(thiskey->ptr));
|
||||||
|
|
||||||
if (firstkey == NULL) {
|
if (firstkey == NULL) {
|
||||||
/* This is the first key we see. Check what is the slot
|
/* This is the first key we see. Check what is the slot
|
||||||
* and node. */
|
* and node. */
|
||||||
firstkey = margv[keyindex[j]];
|
firstkey = thiskey;
|
||||||
|
slot = thisslot;
|
||||||
slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr));
|
|
||||||
n = server.cluster->slots[slot];
|
n = server.cluster->slots[slot];
|
||||||
redisAssertWithInfo(c,firstkey,n != NULL);
|
redisAssertWithInfo(c,firstkey,n != NULL);
|
||||||
|
/* If we are migrating or importing this slot, we need to check
|
||||||
|
* if we have all the keys in the request (the only way we
|
||||||
|
* can safely serve the request, otherwise we return a TRYAGAIN
|
||||||
|
* error). To do so we set the importing/migrating state and
|
||||||
|
* increment a counter for every missing key. */
|
||||||
|
if (n == myself &&
|
||||||
|
server.cluster->migrating_slots_to[slot] != NULL)
|
||||||
|
{
|
||||||
|
migrating_slot = 1;
|
||||||
|
} else if (server.cluster->importing_slots_from[slot] != NULL) {
|
||||||
|
importing_slot = 1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
/* If it is not the first key, make sure it is exactly
|
/* If it is not the first key, make sure it is exactly
|
||||||
* the same key as the first we saw. */
|
* the same key as the first we saw. */
|
||||||
if (!equalStringObjects(firstkey,margv[keyindex[j]])) {
|
if (!equalStringObjects(firstkey,thiskey)) {
|
||||||
|
if (slot != thisslot) {
|
||||||
|
/* Error: multiple keys from different slots. */
|
||||||
getKeysFreeResult(keyindex);
|
getKeysFreeResult(keyindex);
|
||||||
|
if (error_code)
|
||||||
|
*error_code = REDIS_CLUSTER_REDIR_CROSS_SLOT;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
} else {
|
||||||
|
/* Flag this request as one with multiple different
|
||||||
|
* keys. */
|
||||||
|
multiple_keys = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Migarting / Improrting slot? Count keys we don't have. */
|
||||||
|
if ((migrating_slot || importing_slot) &&
|
||||||
|
lookupKeyRead(&server.db[0],thiskey) == NULL)
|
||||||
|
{
|
||||||
|
missing_keys++;
|
||||||
|
}
|
||||||
|
}
|
||||||
getKeysFreeResult(keyindex);
|
getKeysFreeResult(keyindex);
|
||||||
}
|
}
|
||||||
if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */
|
|
||||||
/* No key at all in command? then we can serve the request
|
/* No key at all in command? then we can serve the request
|
||||||
* without redirections. */
|
* without redirections or errors. */
|
||||||
if (n == NULL) return myself;
|
if (n == NULL) return myself;
|
||||||
|
|
||||||
|
/* Return the hashslot by reference. */
|
||||||
if (hashslot) *hashslot = slot;
|
if (hashslot) *hashslot = slot;
|
||||||
|
|
||||||
/* This request is about a slot we are migrating into another instance?
|
/* This request is about a slot we are migrating into another instance?
|
||||||
* Then we need to check if we have the key. If we have it we can reply.
|
* Then if we have all the keys. */
|
||||||
* If instead is a new key, we pass the request to the node that is
|
|
||||||
* receiving the slot. */
|
/* If we don't have all the keys and we are migrating the slot, send
|
||||||
if (n == myself && server.cluster->migrating_slots_to[slot] != NULL) {
|
* an ASK redirection. */
|
||||||
if (lookupKeyRead(&server.db[0],firstkey) == NULL) {
|
if (migrating_slot && missing_keys) {
|
||||||
if (ask) *ask = 1;
|
if (error_code) *error_code = REDIS_CLUSTER_REDIR_ASK;
|
||||||
return server.cluster->migrating_slots_to[slot];
|
return server.cluster->migrating_slots_to[slot];
|
||||||
}
|
}
|
||||||
}
|
|
||||||
/* Handle the case in which we are receiving this hash slot from
|
/* If we are receiving the slot, we have all the keys, and the client
|
||||||
* another instance, so we'll accept the query even if in the table
|
* correctly flagged the request as "ASKING", we can serve
|
||||||
* it is assigned to a different node, but only if the client
|
* the request, otherwise the only option is to send a TRYAGAIN error. */
|
||||||
* issued an ASKING command before. */
|
if (importing_slot &&
|
||||||
if (server.cluster->importing_slots_from[slot] != NULL &&
|
(c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING))
|
||||||
(c->flags & REDIS_ASKING || cmd->flags & REDIS_CMD_ASKING)) {
|
{
|
||||||
|
if (missing_keys) {
|
||||||
|
if (error_code) *error_code = REDIS_CLUSTER_REDIR_UNSTABLE;
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
return myself;
|
return myself;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Handle the read-only client case reading from a slave: if this
|
/* Handle the read-only client case reading from a slave: if this
|
||||||
* node is a slave and the request is about an hash slot our master
|
* node is a slave and the request is about an hash slot our master
|
||||||
* is serving, we can reply without redirection. */
|
* is serving, we can reply without redirection. */
|
||||||
@ -3905,6 +3963,9 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg
|
|||||||
{
|
{
|
||||||
return myself;
|
return myself;
|
||||||
}
|
}
|
||||||
/* It's not a -ASK case. Base case: just return the right node. */
|
|
||||||
|
/* Base case: just return the right node. However if this node is not
|
||||||
|
* myself, set error_code to MOVED since we need to issue a rediretion. */
|
||||||
|
if (n != myself && error_code) *error_code = REDIS_CLUSTER_REDIR_MOVED;
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,13 @@
|
|||||||
#define REDIS_CLUSTER_MF_TIMEOUT 5000 /* Milliseconds to do a manual failover. */
|
#define REDIS_CLUSTER_MF_TIMEOUT 5000 /* Milliseconds to do a manual failover. */
|
||||||
#define REDIS_CLUSTER_MF_PAUSE_MULT 2 /* Master pause manual failover mult. */
|
#define REDIS_CLUSTER_MF_PAUSE_MULT 2 /* Master pause manual failover mult. */
|
||||||
|
|
||||||
|
/* Redirection errors returned by getNodeByQuery(). */
|
||||||
|
#define REDIS_CLUSTER_REDIR_NONE 0 /* Node can serve the request. */
|
||||||
|
#define REDIS_CLUSTER_REDIR_CROSS_SLOT 1 /* Keys in different slots. */
|
||||||
|
#define REDIS_CLUSTER_REDIR_UNSTABLE 2 /* Keys in slot resharding. */
|
||||||
|
#define REDIS_CLUSTER_REDIR_ASK 3 /* -ASK redirection required. */
|
||||||
|
#define REDIS_CLUSTER_REDIR_MOVED 4 /* -MOVED redirection required. */
|
||||||
|
|
||||||
struct clusterNode;
|
struct clusterNode;
|
||||||
|
|
||||||
/* clusterLink encapsulates everything needed to talk with a remote node. */
|
/* clusterLink encapsulates everything needed to talk with a remote node. */
|
||||||
|
18
src/redis.c
18
src/redis.c
@ -2021,15 +2021,25 @@ int processCommand(redisClient *c) {
|
|||||||
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
|
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
} else {
|
} else {
|
||||||
int ask;
|
int error_code;
|
||||||
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask);
|
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
|
||||||
if (n == NULL) {
|
if (n == NULL) {
|
||||||
addReplyError(c,"Multi keys request invalid in cluster");
|
if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
|
||||||
|
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
|
||||||
|
} else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
|
||||||
|
/* The request spawns mutliple keys in the same slot,
|
||||||
|
* but the slot is not "stable" currently as there is
|
||||||
|
* a migration or import in progress. */
|
||||||
|
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
|
||||||
|
} else {
|
||||||
|
redisPanic("getNodeByQuery() unknown error.");
|
||||||
|
}
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
} else if (n != server.cluster->myself) {
|
} else if (n != server.cluster->myself) {
|
||||||
flagTransaction(c);
|
flagTransaction(c);
|
||||||
addReplySds(c,sdscatprintf(sdsempty(),
|
addReplySds(c,sdscatprintf(sdsempty(),
|
||||||
"-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED",
|
"-%s %d %s:%d\r\n",
|
||||||
|
(error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
|
||||||
hashslot,n->ip,n->port));
|
hashslot,n->ip,n->port));
|
||||||
return REDIS_OK;
|
return REDIS_OK;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user