Cluster Manager: avoid using reply error messages to check slot status.

Slot assignment status is now checked by using CLUSTER SLOTS.
Furthermore, one memory leak has been fixed.
This commit is contained in:
artix 2018-12-12 13:23:08 +01:00
parent 129f2d2746
commit d935cfcb89

View File

@ -1933,7 +1933,8 @@ static dictType clusterManagerDictType = {
}; };
typedef int clusterManagerCommandProc(int argc, char **argv); typedef int clusterManagerCommandProc(int argc, char **argv);
typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx); typedef int (*clusterManagerOnReplyError)(redisReply *reply,
clusterManagerNode *n, int bulk_idx);
/* Cluster Manager helper functions */ /* Cluster Manager helper functions */
@ -2196,7 +2197,7 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
return 1; return 1;
} }
/* Execute MULTI command on a cluster node. */ /* Call MULTI command on a cluster node. */
static int clusterManagerStartTransaction(clusterManagerNode *node) { static int clusterManagerStartTransaction(clusterManagerNode *node) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI"); redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
int success = clusterManagerCheckRedisReply(node, reply, NULL); int success = clusterManagerCheckRedisReply(node, reply, NULL);
@ -2204,7 +2205,7 @@ static int clusterManagerStartTransaction(clusterManagerNode *node) {
return success; return success;
} }
/* Execute EXEC command on a cluster node. */ /* Call EXEC command on a cluster node. */
static int clusterManagerExecTransaction(clusterManagerNode *node, static int clusterManagerExecTransaction(clusterManagerNode *node,
clusterManagerOnReplyError onerror) clusterManagerOnReplyError onerror)
{ {
@ -2220,7 +2221,7 @@ static int clusterManagerExecTransaction(clusterManagerNode *node,
redisReply *r = reply->element[i]; redisReply *r = reply->element[i];
char *err = NULL; char *err = NULL;
success = clusterManagerCheckRedisReply(node, r, &err); success = clusterManagerCheckRedisReply(node, r, &err);
if (!success && onerror) success = onerror(r, i); if (!success && onerror) success = onerror(r, node, i);
if (err) { if (err) {
if (!success) if (!success)
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
@ -2768,6 +2769,55 @@ cleanup:
return success; return success;
} }
/* Get the node the slot is assigned to from the point of view of node *n.
 *
 * The function sends CLUSTER SLOTS to *n and scans the returned ranges for
 * the ones containing `slot`. For each matching range the first node entry
 * (element 2 of the range) is resolved to one of the nodes known to the
 * cluster manager: first by node ID, when the entry carries one, and then
 * by ip:port against cluster_manager.nodes.
 *
 * Return value: the matching clusterManagerNode, or NULL if the slot is
 * unassigned, if its owner cannot be resolved to a known node, or if the
 * reply is an error.
 *
 * Use the **err argument in order to check whether the slot is unassigned
 * or the reply resulted in an error. This function never writes to *err
 * directly: it is only handed over to clusterManagerCheckRedisReply, so the
 * caller should initialize *err to NULL before the call and zfree() it when
 * it comes back set (as the visible callers do). */
static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n,
                                                      int slot, char **err)
{
    assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS);
    clusterManagerNode *owner = NULL;
    redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS");
    if (clusterManagerCheckRedisReply(n, reply, err)) {
        assert(reply->type == REDIS_REPLY_ARRAY);
        size_t i;
        for (i = 0; i < reply->elements; i++) {
            redisReply *r = reply->element[i];
            assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3);
            /* Elements 0 and 1 are the inclusive bounds of the range. The
             * reply integers are wider than int, so narrow explicitly. */
            int from = (int) r->element[0]->integer;
            int to = (int) r->element[1]->integer;
            if (slot < from || slot > to) continue;
            /* Element 2 describes a node as [ip, port, (optional) ID]. */
            redisReply *nr = r->element[2];
            assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2);
            char *name = NULL;
            if (nr->elements >= 3)
                name = nr->element[2]->str;
            if (name != NULL)
                owner = clusterManagerNodeByName(name);
            /* Fall back to an ip:port match not only when the reply has no
             * node ID, but also when the lookup by ID found nothing:
             * otherwise a slot that is actually served would be reported
             * to the caller as unassigned. */
            if (owner == NULL) {
                char *ip = nr->element[0]->str;
                assert(ip != NULL);
                int port = (int) nr->element[1]->integer;
                listIter li;
                listNode *ln;
                listRewind(cluster_manager.nodes, &li);
                while ((ln = listNext(&li)) != NULL) {
                    clusterManagerNode *nd = ln->value;
                    if (strcmp(nd->ip, ip) == 0 && port == nd->port) {
                        owner = nd;
                        break;
                    }
                }
            }
            if (owner) break;
        }
    }
    /* The reply may be NULL when the command could not be executed. */
    if (reply) freeReplyObject(reply);
    return owner;
}
/* Set slot status to "importing" or "migrating" */ /* Set slot status to "importing" or "migrating" */
static int clusterManagerSetSlot(clusterManagerNode *node1, static int clusterManagerSetSlot(clusterManagerNode *node1,
clusterManagerNode *node2, clusterManagerNode *node2,
@ -2808,8 +2858,19 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
char *err = NULL; char *err = NULL;
int success = clusterManagerCheckRedisReply(node, reply, &err); int success = clusterManagerCheckRedisReply(node, reply, &err);
if (!success && reply && reply->type == REDIS_REPLY_ERROR && if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
ignore_unassigned_err && ignore_unassigned_err)
strstr(reply->str, "already unassigned") != NULL) success = 1; {
char *get_owner_err = NULL;
clusterManagerNode *assigned_to =
clusterManagerGetSlotOwner(node, slot, &get_owner_err);
if (!assigned_to) {
if (get_owner_err == NULL) success = 1;
else {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err);
zfree(get_owner_err);
}
}
}
if (!success && err != NULL) { if (!success && err != NULL) {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err); CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
zfree(err); zfree(err);
@ -2845,12 +2906,16 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) {
return success; return success;
} }
static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) { /* Callback used by clusterManagerSetSlotOwner transaction. It should ignore
if (bulk_idx == 0 && reply) { * errors except for ADDSLOTS errors.
if (reply->type == REDIS_REPLY_ERROR) * Return 1 if the error should be ignored. */
return strstr(reply->str, "already unassigned") != NULL; static int clusterManagerOnSetOwnerErr(redisReply *reply,
} clusterManagerNode *n, int bulk_idx)
return 0; {
UNUSED(reply);
UNUSED(n);
/* Only raise error when ADDSLOTS fail (bulk_idx == 1). */
return (bulk_idx != 1);
} }
static int clusterManagerSetSlotOwner(clusterManagerNode *owner, static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
@ -2865,8 +2930,7 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
clusterManagerAddSlot(owner, slot); clusterManagerAddSlot(owner, slot);
if (do_clear) clusterManagerClearSlotStatus(owner, slot); if (do_clear) clusterManagerClearSlotStatus(owner, slot);
clusterManagerBumpEpoch(owner); clusterManagerBumpEpoch(owner);
success = clusterManagerExecTransaction(owner, success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr);
clusterManagerIgnoreUnassignedErr);
return success; return success;
} }
@ -2950,7 +3014,7 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
char **err) char **err)
{ {
int success = 1; int success = 1;
int replace_existing_keys = (config.cluster_manager_command.flags & int retry = (config.cluster_manager_command.flags &
(CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE)); (CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE));
while (1) { while (1) {
char *dots = NULL; char *dots = NULL;
@ -2983,16 +3047,35 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
if (migrate_reply == NULL) goto next; if (migrate_reply == NULL) goto next;
if (migrate_reply->type == REDIS_REPLY_ERROR) { if (migrate_reply->type == REDIS_REPLY_ERROR) {
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL; int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
int not_served = strstr(migrate_reply->str, "slot not served") != NULL; int not_served = 0;
if (replace_existing_keys && (is_busy || not_served)) { if (!is_busy) {
char *get_owner_err = NULL;
clusterManagerNode *served_by =
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
if (!served_by) {
if (get_owner_err == NULL) not_served = 1;
else {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
get_owner_err);
zfree(get_owner_err);
}
}
}
if (retry && (is_busy || not_served)) {
/* If the key already exists, try to migrate keys /* If the key already exists, try to migrate keys
* adding REPLACE option. * adding REPLACE option.
* If the key's slot is not served, try to assign slot * If the key's slot is not served, try to assign slot
* to the target node. */ * to the target node. */
if (not_served) if (not_served) {
clusterManagerLogWarn("*** Slot was not served, setting "
"owner to node %s:%d.\n",
target->ip, target->port);
clusterManagerSetSlot(source, target, slot, "node", NULL); clusterManagerSetSlot(source, target, slot, "node", NULL);
}
if (is_busy) {
clusterManagerLogWarn("*** Target key exists. " clusterManagerLogWarn("*** Target key exists. "
"Replacing it for FIX.\n"); "Replacing it for FIX.\n");
}
freeReplyObject(migrate_reply); freeReplyObject(migrate_reply);
migrate_reply = clusterManagerMigrateKeysInReply(source, migrate_reply = clusterManagerMigrateKeysInReply(source,
target, target,
@ -4252,7 +4335,7 @@ static int clusterManagerCheckCluster(int quiet) {
n->port); n->port);
for (i = 0; i < n->migrating_count; i += 2) { for (i = 0; i < n->migrating_count; i += 2) {
sds slot = n->migrating[i]; sds slot = n->migrating[i];
dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1])); dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1]));
char *fmt = (i > 0 ? ",%S" : "%S"); char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot); errstr = sdscatfmt(errstr, fmt, slot);
} }
@ -4270,7 +4353,7 @@ static int clusterManagerCheckCluster(int quiet) {
n->port); n->port);
for (i = 0; i < n->importing_count; i += 2) { for (i = 0; i < n->importing_count; i += 2) {
sds slot = n->importing[i]; sds slot = n->importing[i];
dictAdd(open_slots, slot, sdsdup(n->importing[i + 1])); dictReplace(open_slots, slot, sdsdup(n->importing[i + 1]));
char *fmt = (i > 0 ? ",%S" : "%S"); char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot); errstr = sdscatfmt(errstr, fmt, slot);
} }
@ -4333,7 +4416,7 @@ static int clusterManagerCheckCluster(int quiet) {
/* Check whether there are multiple owners, even when slots are /* Check whether there are multiple owners, even when slots are
* fully covered and there are no open slots. */ * fully covered and there are no open slots. */
clusterManagerLogInfo(">>> Check for multiple slot owners...\n"); clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
int slot = 0; int slot = 0, slots_with_multiple_owners = 0;
for (; slot < CLUSTER_MANAGER_SLOTS; slot++) { for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
listIter li; listIter li;
listNode *ln; listNode *ln;
@ -4359,6 +4442,7 @@ static int clusterManagerCheckCluster(int quiet) {
clusterManagerNode *n = ln->value; clusterManagerNode *n = ln->value;
clusterManagerLogErr(" %s:%d\n", n->ip, n->port); clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
} }
slots_with_multiple_owners++;
if (do_fix) { if (do_fix) {
result = clusterManagerFixMultipleSlotOwners(slot, owners); result = clusterManagerFixMultipleSlotOwners(slot, owners);
if (!result) { if (!result) {
@ -4366,11 +4450,13 @@ static int clusterManagerCheckCluster(int quiet) {
"for slot %d\n", slot); "for slot %d\n", slot);
listRelease(owners); listRelease(owners);
break; break;
} } else slots_with_multiple_owners--;
} }
} }
listRelease(owners); listRelease(owners);
} }
if (slots_with_multiple_owners == 0)
clusterManagerLogOk("[OK] No multiple owners found.\n");
} }
return result; return result;
} }