Cluster Manager: setting new slot owner is now handled atomically

in 'fix' command.
This commit is contained in:
artix 2018-11-28 16:59:16 +01:00
parent 0c7a108dc9
commit b5d9d24d3b

View File

@ -1929,6 +1929,7 @@ static dictType clusterManagerDictType = {
}; };
typedef int clusterManagerCommandProc(int argc, char **argv); typedef int clusterManagerCommandProc(int argc, char **argv);
typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx);
/* Cluster Manager helper functions */ /* Cluster Manager helper functions */
@ -2188,6 +2189,38 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
return 1; return 1;
} }
/* Execute MULTI command on a cluster node. */
static int clusterManagerStartTransaction(clusterManagerNode *node) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
int success = clusterManagerCheckRedisReply(node, reply, NULL);
if (reply) freeReplyObject(reply);
return success;
}
/* Execute EXEC command on a cluster node. */
static int clusterManagerExecTransaction(clusterManagerNode *node,
clusterManagerOnReplyError onerror)
{
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC");
int success = clusterManagerCheckRedisReply(node, reply, NULL);
if (success) {
if (reply->type != REDIS_REPLY_ARRAY) {
success = 0;
goto cleanup;
}
size_t i;
for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i];
success = clusterManagerCheckRedisReply(node, r, NULL);
if (!success && onerror) success = onerror(r, i);
if (!success) break;
}
}
cleanup:
if (reply) freeReplyObject(reply);
return success;
}
static int clusterManagerNodeConnect(clusterManagerNode *node) { static int clusterManagerNodeConnect(clusterManagerNode *node) {
if (node->context) redisFree(node->context); if (node->context) redisFree(node->context);
node->context = redisConnect(node->ip, node->port); node->context = redisConnect(node->ip, node->port);
@ -2794,6 +2827,31 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) {
return success; return success;
} }
static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) {
if (bulk_idx == 0 && reply) {
if (reply->type == REDIS_REPLY_ERROR)
return strstr(reply->str, "already unassigned") != NULL;
}
return 0;
}
static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
int slot,
int do_clear)
{
int success = clusterManagerStartTransaction(owner);
if (!success) return 0;
/* Ensure the slot is not already assigned. */
clusterManagerDelSlot(owner, slot, 0);
/* Add the slot and bump epoch. */
clusterManagerAddSlot(owner, slot);
if (do_clear) clusterManagerClearSlotStatus(owner, slot);
clusterManagerBumpEpoch(owner);
success = clusterManagerExecTransaction(owner,
clusterManagerIgnoreUnassignedErr);
return success;
}
/* Migrate keys taken from reply->elements. It returns the reply from the /* Migrate keys taken from reply->elements. It returns the reply from the
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots' * MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
* is not NULL, a dot will be printed for every migrated key. */ * is not NULL, a dot will be printed for every migrated key. */
@ -3675,11 +3733,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerNode *n = clusterManagerNodeMasterRandom();
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port); slot, n->ip, n->port);
/* Ensure the slot is not already assigned. */ if (!clusterManagerSetSlotOwner(n, s, 0)) {
clusterManagerDelSlot(n, s, 1); fixed = -1;
if (!clusterManagerAddSlot(n, s)) fixed = -1; goto cleanup;
if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; }
if (fixed < 0) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */ * info into the node struct, in order to keep it synced */
n->slots[s] = 1; n->slots[s] = 1;
@ -3707,11 +3764,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerNode *n = fn->value; clusterManagerNode *n = fn->value;
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port); slot, n->ip, n->port);
/* Ensure the slot is not already assigned. */ if (!clusterManagerSetSlotOwner(n, s, 0)) {
clusterManagerDelSlot(n, s, 1); fixed = -1;
if (!clusterManagerAddSlot(n, s)) fixed = -1; goto cleanup;
if (fixed >= 0 && !clusterManagerBumpEpoch(n)) fixed = -1; }
if (fixed < 0) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */ * info into the node struct, in order to keep it synced */
n->slots[atoi(slot)] = 1; n->slots[atoi(slot)] = 1;
@ -3744,14 +3800,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerLogInfo(">>> Covering slot %s moving keys " clusterManagerLogInfo(">>> Covering slot %s moving keys "
"to %s:%d\n", slot, "to %s:%d\n", slot,
target->ip, target->port); target->ip, target->port);
/* Ensure the slot is not already assigned. */ if (!clusterManagerSetSlotOwner(target, s, 1)) {
clusterManagerDelSlot(target, s, 1);
if (!clusterManagerAddSlot(target, s)) fixed = -1;
if (fixed < 0) goto cleanup;
if (!clusterManagerClearSlotStatus(target, s))
fixed = -1; fixed = -1;
if (fixed >= 0 && !clusterManagerBumpEpoch(target)) fixed = -1; goto cleanup;
if (fixed < 0) goto cleanup; }
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */ * info into the node struct, in order to keep it synced */
target->slots[atoi(slot)] = 1; target->slots[atoi(slot)] = 1;
@ -3905,11 +3957,7 @@ static int clusterManagerFixOpenSlot(int slot) {
owner->ip, owner->port); owner->ip, owner->port);
success = clusterManagerClearSlotStatus(owner, slot); success = clusterManagerClearSlotStatus(owner, slot);
if (!success) goto cleanup; if (!success) goto cleanup;
/* Ensure that the slot is unassigned before assigning it to the success = clusterManagerSetSlotOwner(owner, slot, 0);
* owner. */
success = clusterManagerDelSlot(owner, slot, 1);
if (!success) goto cleanup;
success = clusterManagerAddSlot(owner, slot);
if (!success) goto cleanup; if (!success) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot /* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */ * info into the node struct, in order to keep it synced */
@ -4052,15 +4100,8 @@ static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
if (!owner) owner = listFirst(owners)->value; if (!owner) owner = listFirst(owners)->value;
clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d", clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d",
slot, owner->ip, owner->port); slot, owner->ip, owner->port);
/* Set the owner node by calling DELSLOTS in order to unassign the slot /* Set the slot owner. */
* in case it's already assigned to another node and by finally calling if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0;
* ADDSLOTS and BUMPEPOCH. The call to DELSLOTS is not checked since it
* could reply with an "already unassigned" error and if it should fail
* for other reasons, it would lead to a failure in the follwing ADDSLOTS
* command. */
clusterManagerDelSlot(owner, slot, 1);
if (!clusterManagerAddSlot(owner, slot)) return 0;
if (!clusterManagerBumpEpoch(owner)) return 0;
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(cluster_manager.nodes, &li); listRewind(cluster_manager.nodes, &li);