diff --git a/src/cluster.c b/src/cluster.c index dd053de70..3439dab0a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -744,3 +744,177 @@ int isValidAuxString(char *s, unsigned int length) { } return 1; } + +void clusterCommandMyId(client *c) { + char *name = clusterNodeGetName(getMyClusterNode()); + if (name) { + addReplyBulkCBuffer(c,name, CLUSTER_NAMELEN); + } else { + addReplyError(c, "No ID yet"); + } +} + +void clusterCommandMyShardId(client *c) { + char *sid = clusterNodeGetShardId(getMyClusterNode()); + if (sid) { + addReplyBulkCBuffer(c,sid, CLUSTER_NAMELEN); + } else { + addReplyError(c, "No shard ID yet"); + } +} + +/* When a cluster command is called, we need to decide whether to return TLS info or + * non-TLS info by the client's connection type. However if the command is called by + * a Lua script or RM_call, there is no connection in the fake client, so we use + * server.current_client here to get the real client if available. And if it is not + * available (modules may call commands without a real client), we return the default + * info, which is determined by server.tls_cluster. */ +static int shouldReturnTlsInfo(void) { + if (server.current_client && server.current_client->conn) { + return connIsTLS(server.current_client->conn); + } else { + return server.tls_cluster; + } +} + +unsigned int countKeysInSlot(unsigned int slot) { + return dictSize(server.db->dict[slot]); +} + +void clusterCommandHelp(client *c) { + const char *help[] = { + "COUNTKEYSINSLOT ", + " Return the number of keys in .", + "GETKEYSINSLOT ", + " Return key names stored by current node in a slot.", + "INFO", + " Return information about the cluster.", + "KEYSLOT ", + " Return the hash slot for .", + "MYID", + " Return the node id.", + "MYSHARDID", + " Return the node's shard id.", + "NODES", + " Return cluster configuration seen by node. Output format:", + " ...", + "REPLICAS ", + " Return replicas.", + "SLOTS", + " Return information about slots range mappings. Each range is made of:", + " start, end, master and replicas IP addresses, ports and ids", + "SHARDS", + " Return information about slot range mappings and the nodes associated with them.", + NULL + }; + + addExtendedReplyHelp(c, help, clusterCommandSpecialHelp()); +} + +void clusterCommand(client *c) { + if (server.cluster_enabled == 0) { + addReplyError(c,"This instance has cluster support disabled"); + return; + } + + if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { + clusterCommandHelp(c); + } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) { + /* CLUSTER NODES */ + /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ + sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo()); + addReplyVerbatim(c,nodes,sdslen(nodes),"txt"); + sdsfree(nodes); + } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) { + /* CLUSTER MYID */ + clusterCommandMyId(c); + } else if (!strcasecmp(c->argv[1]->ptr,"myshardid") && c->argc == 2) { + /* CLUSTER MYSHARDID */ + clusterCommandMyShardId(c); + } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) { + /* CLUSTER SLOTS */ + clusterCommandSlots(c); + } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) { + /* CLUSTER SHARDS */ + clusterCommandShards(c); + } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { + /* CLUSTER INFO */ + + sds info = genClusterInfoString(); + + /* Produce the reply protocol. */ + addReplyVerbatim(c,info,sdslen(info),"txt"); + sdsfree(info); + } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) { + /* CLUSTER KEYSLOT */ + sds key = c->argv[2]->ptr; + + addReplyLongLong(c,keyHashSlot(key,sdslen(key))); + } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) { + /* CLUSTER COUNTKEYSINSLOT */ + long long slot; + + if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK) + return; + if (slot < 0 || slot >= CLUSTER_SLOTS) { + addReplyError(c,"Invalid slot"); + return; + } + addReplyLongLong(c,countKeysInSlot(slot)); + } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) { + /* CLUSTER GETKEYSINSLOT */ + long long maxkeys, slot; + + if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK) + return; + if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) + != C_OK) + return; + if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) { + addReplyError(c,"Invalid slot or number of keys"); + return; + } + + unsigned int keys_in_slot = countKeysInSlot(slot); + unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; + addReplyArrayLen(c,numkeys); + dictIterator *iter = NULL; + dictEntry *de = NULL; + iter = dictGetIterator(server.db->dict[slot]); + for (unsigned int i = 0; i < numkeys; i++) { + de = dictNext(iter); + serverAssert(de != NULL); + sds sdskey = dictGetKey(de); + addReplyBulkCBuffer(c, sdskey, sdslen(sdskey)); + } + dictReleaseIterator(iter); + } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") || + !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) { + /* CLUSTER SLAVES */ + /* CLUSTER REPLICAS */ + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); + int j; + + /* Lookup the specified node in our table. */ + if (!n) { + addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); + return; + } + + if (clusterNodeIsSlave(n)) { + addReplyError(c,"The specified node is not a master"); + return; + } + + /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ + addReplyArrayLen(c, getNumSlaves(n)); + for (j = 0; j < getNumSlaves(n); j++) { + sds ni = clusterGenNodeDescription(c, getSlave(n, j), shouldReturnTlsInfo()); + addReplyBulkCString(c,ni); + sdsfree(ni); + } + } else if(!clusterCommandSpecial(c)) { + addReplySubcommandSyntaxError(c); + return; + } +} diff --git a/src/cluster.h b/src/cluster.h index 7cf412351..5160582b3 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -68,7 +68,7 @@ int getClusterSize(void); char** getClusterNodesList(size_t *numnodes); int nodeIsMaster(clusterNode *n); int handleDebugClusterCommand(client *c); -int clusterNodeConfirmedReachable(clusterNode *node); +int clusterNodePending(clusterNode *node); char* clusterNodeIp(clusterNode *node); int clusterNodeIsSlave(clusterNode *node); clusterNode *clusterNodeGetSlaveof(clusterNode *node); @@ -76,6 +76,17 @@ char* clusterNodeGetName(clusterNode *node); int clusterNodeTimedOut(clusterNode *node); int clusterNodeIsFailing(clusterNode *node); int clusterNodeIsNoFailover(clusterNode *node); +void clusterCommand(client *c); +int clusterCommandSpecial(client *c); +const char** clusterCommandSpecialHelp(void); +char* clusterNodeGetShardId(clusterNode *node); +void clusterCommandSlots(client * c); +void clusterCommandMyId(client *c); +void clusterCommandMyShardId(client *c); +void clusterCommandShards(client *c); +sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary); +int getNumSlaves(clusterNode *node); +clusterNode *getSlave(clusterNode *node, int slave_idx); char **clusterDebugCommandHelp(void); ConnectionType *connTypeOfCluster(void); diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index bba4395f3..2b080ef89 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -120,20 +120,6 @@ static inline int defaultClientPort(void) { return server.tls_cluster ? server.tls_port : server.port; } -/* When a cluster command is called, we need to decide whether to return TLS info or - * non-TLS info by the client's connection type. However if the command is called by - * a Lua script or RM_call, there is no connection in the fake client, so we use - * server.current_client here to get the real client if available. And if it is not - * available (modules may call commands without a real client), we return the default - * info, which is determined by server.tls_cluster. */ -static int shouldReturnTlsInfo(void) { - if (server.current_client && server.current_client->conn) { - return connIsTLS(server.current_client->conn); - } else { - return server.tls_cluster; - } -} - #define isSlotUnclaimed(slot) \ (server.cluster->slots[slot] == NULL || \ bitmapTestBit(server.cluster->owner_not_claiming_slot, slot)) @@ -5678,7 +5664,7 @@ void addShardReplyForClusterShards(client *c, list *nodes) { /* Add to the output buffer of the given client, an array of slot (start, end) * pair owned by the shard, also the primary and set of replica(s) along with * information about each node. */ -void clusterReplyShards(client *c) { +void clusterCommandShards(client *c) { addReplyArrayLen(c, dictSize(server.cluster->shards)); /* This call will add slot_info_pairs to all nodes */ clusterGenNodesSlotsInfo(0); @@ -5689,7 +5675,7 @@ void clusterReplyShards(client *c) { dictReleaseIterator(di); } -void clusterReplyMultiBulkSlots(client * c) { +void clusterCommandSlots(client * c) { /* Format: 1) 1) start slot * 2) end slot * 3) 1) master IP @@ -5804,614 +5790,6 @@ sds genClusterInfoString(void) { return info; } -void clusterCommand(client *c) { - if (server.cluster_enabled == 0) { - addReplyError(c,"This instance has cluster support disabled"); - return; - } - - if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { - const char *help[] = { -"ADDSLOTS [ ...]", -" Assign slots to current node.", -"ADDSLOTSRANGE [ ...]", -" Assign slots which are between and to current node.", -"BUMPEPOCH", -" Advance the cluster config epoch.", -"COUNT-FAILURE-REPORTS ", -" Return number of failure reports for .", -"COUNTKEYSINSLOT ", -" Return the number of keys in .", -"DELSLOTS [ ...]", -" Delete slots information from current node.", -"DELSLOTSRANGE [ ...]", -" Delete slots information which are between and from current node.", -"FAILOVER [FORCE|TAKEOVER]", -" Promote current replica node to being a master.", -"FORGET ", -" Remove a node from the cluster.", -"GETKEYSINSLOT ", -" Return key names stored by current node in a slot.", -"FLUSHSLOTS", -" Delete current node own slots information.", -"INFO", -" Return information about the cluster.", -"KEYSLOT ", -" Return the hash slot for .", -"MEET []", -" Connect nodes into a working cluster.", -"MYID", -" Return the node id.", -"MYSHARDID", -" Return the node's shard id.", -"NODES", -" Return cluster configuration seen by node. Output format:", -" ...", -"REPLICATE ", -" Configure current node as replica to .", -"RESET [HARD|SOFT]", -" Reset current node (default: soft).", -"SET-CONFIG-EPOCH ", -" Set config epoch of current node.", -"SETSLOT (IMPORTING |MIGRATING |STABLE|NODE )", -" Set slot state.", -"REPLICAS ", -" Return replicas.", -"SAVECONFIG", -" Force saving cluster configuration on disk.", -"SLOTS", -" Return information about slots range mappings. Each range is made of:", -" start, end, master and replicas IP addresses, ports and ids", -"SHARDS", -" Return information about slot range mappings and the nodes associated with them.", -"LINKS", -" Return information about all network links between this node and its peers.", -" Output format is an array where each array element is a map containing attributes of a link", -NULL - }; - addReplyHelp(c, help); - } else if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) { - /* CLUSTER MEET [cport] */ - long long port, cport; - - if (getLongLongFromObject(c->argv[3], &port) != C_OK) { - addReplyErrorFormat(c,"Invalid base port specified: %s", - (char*)c->argv[3]->ptr); - return; - } - - if (c->argc == 5) { - if (getLongLongFromObject(c->argv[4], &cport) != C_OK) { - addReplyErrorFormat(c,"Invalid bus port specified: %s", - (char*)c->argv[4]->ptr); - return; - } - } else { - cport = port + CLUSTER_PORT_INCR; - } - - if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 && - errno == EINVAL) - { - addReplyErrorFormat(c,"Invalid node address specified: %s:%s", - (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr); - } else { - addReply(c,shared.ok); - } - } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) { - /* CLUSTER NODES */ - /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ - sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo()); - addReplyVerbatim(c,nodes,sdslen(nodes),"txt"); - sdsfree(nodes); - } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) { - /* CLUSTER MYID */ - addReplyBulkCBuffer(c,myself->name, CLUSTER_NAMELEN); - } else if (!strcasecmp(c->argv[1]->ptr,"myshardid") && c->argc == 2) { - /* CLUSTER MYSHARDID */ - addReplyBulkCBuffer(c,myself->shard_id, CLUSTER_NAMELEN); - } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) { - /* CLUSTER SLOTS */ - clusterReplyMultiBulkSlots(c); - } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) { - /* CLUSTER SHARDS */ - clusterReplyShards(c); - } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) { - /* CLUSTER FLUSHSLOTS */ - if (dbSize(&server.db[0], DB_MAIN) != 0) { - addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS."); - return; - } - clusterDelNodeSlots(myself); - clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); - addReply(c,shared.ok); - } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || - !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) - { - /* CLUSTER ADDSLOTS [slot] ... */ - /* CLUSTER DELSLOTS [slot] ... */ - int j, slot; - unsigned char *slots = zmalloc(CLUSTER_SLOTS); - int del = !strcasecmp(c->argv[1]->ptr,"delslots"); - - memset(slots,0,CLUSTER_SLOTS); - /* Check that all the arguments are parseable.*/ - for (j = 2; j < c->argc; j++) { - if ((slot = getSlotOrReply(c,c->argv[j])) == C_ERR) { - zfree(slots); - return; - } - } - /* Check that the slots are not already busy. */ - for (j = 2; j < c->argc; j++) { - slot = getSlotOrReply(c,c->argv[j]); - if (checkSlotAssignmentsOrReply(c, slots, del, slot, slot) == C_ERR) { - zfree(slots); - return; - } - } - clusterUpdateSlots(c, slots, del); - zfree(slots); - clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); - addReply(c,shared.ok); - } else if ((!strcasecmp(c->argv[1]->ptr,"addslotsrange") || - !strcasecmp(c->argv[1]->ptr,"delslotsrange")) && c->argc >= 4) { - if (c->argc % 2 == 1) { - addReplyErrorArity(c); - return; - } - /* CLUSTER ADDSLOTSRANGE [ ...] */ - /* CLUSTER DELSLOTSRANGE [ ...] */ - int j, startslot, endslot; - unsigned char *slots = zmalloc(CLUSTER_SLOTS); - int del = !strcasecmp(c->argv[1]->ptr,"delslotsrange"); - - memset(slots,0,CLUSTER_SLOTS); - /* Check that all the arguments are parseable and that all the - * slots are not already busy. */ - for (j = 2; j < c->argc; j += 2) { - if ((startslot = getSlotOrReply(c,c->argv[j])) == C_ERR) { - zfree(slots); - return; - } - if ((endslot = getSlotOrReply(c,c->argv[j+1])) == C_ERR) { - zfree(slots); - return; - } - if (startslot > endslot) { - addReplyErrorFormat(c,"start slot number %d is greater than end slot number %d", startslot, endslot); - zfree(slots); - return; - } - - if (checkSlotAssignmentsOrReply(c, slots, del, startslot, endslot) == C_ERR) { - zfree(slots); - return; - } - } - clusterUpdateSlots(c, slots, del); - zfree(slots); - clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { - /* SETSLOT 10 MIGRATING */ - /* SETSLOT 10 IMPORTING */ - /* SETSLOT 10 STABLE */ - /* SETSLOT 10 NODE */ - int slot; - clusterNode *n; - - if (nodeIsSlave(myself)) { - addReplyError(c,"Please use SETSLOT only with masters."); - return; - } - - if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return; - - if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) { - if (server.cluster->slots[slot] != myself) { - addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); - return; - } - n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); - if (n == NULL) { - addReplyErrorFormat(c,"I don't know about node %s", - (char*)c->argv[4]->ptr); - return; - } - if (nodeIsSlave(n)) { - addReplyError(c,"Target node is not a master"); - return; - } - server.cluster->migrating_slots_to[slot] = n; - } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) { - if (server.cluster->slots[slot] == myself) { - addReplyErrorFormat(c, - "I'm already the owner of hash slot %u",slot); - return; - } - n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); - if (n == NULL) { - addReplyErrorFormat(c,"I don't know about node %s", - (char*)c->argv[4]->ptr); - return; - } - if (nodeIsSlave(n)) { - addReplyError(c,"Target node is not a master"); - return; - } - server.cluster->importing_slots_from[slot] = n; - } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) { - /* CLUSTER SETSLOT STABLE */ - server.cluster->importing_slots_from[slot] = NULL; - server.cluster->migrating_slots_to[slot] = NULL; - } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) { - /* CLUSTER SETSLOT NODE */ - n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); - if (!n) { - addReplyErrorFormat(c,"Unknown node %s", - (char*)c->argv[4]->ptr); - return; - } - if (nodeIsSlave(n)) { - addReplyError(c,"Target node is not a master"); - return; - } - /* If this hash slot was served by 'myself' before to switch - * make sure there are no longer local keys for this hash slot. */ - if (server.cluster->slots[slot] == myself && n != myself) { - if (countKeysInSlot(slot) != 0) { - addReplyErrorFormat(c, - "Can't assign hashslot %d to a different node " - "while I still hold keys for this hash slot.", slot); - return; - } - } - /* If this slot is in migrating status but we have no keys - * for it assigning the slot to another node will clear - * the migrating status. */ - if (countKeysInSlot(slot) == 0 && - server.cluster->migrating_slots_to[slot]) - server.cluster->migrating_slots_to[slot] = NULL; - - int slot_was_mine = server.cluster->slots[slot] == myself; - clusterDelSlot(slot); - clusterAddSlot(n,slot); - - /* If we are a master left without slots, we should turn into a - * replica of the new master. */ - if (slot_was_mine && - n != myself && - myself->numslots == 0 && - server.cluster_allow_replica_migration) - { - serverLog(LL_NOTICE, - "Configuration change detected. Reconfiguring myself " - "as a replica of %.40s (%s)", n->name, n->human_nodename); - clusterSetMaster(n); - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | - CLUSTER_TODO_UPDATE_STATE | - CLUSTER_TODO_FSYNC_CONFIG); - } - - /* If this node was importing this slot, assigning the slot to - * itself also clears the importing status. */ - if (n == myself && - server.cluster->importing_slots_from[slot]) - { - /* This slot was manually migrated, set this node configEpoch - * to a new epoch so that the new version can be propagated - * by the cluster. - * - * Note that if this ever results in a collision with another - * node getting the same configEpoch, for example because a - * failover happens at the same time we close the slot, the - * configEpoch collision resolution will fix it assigning - * a different epoch to each node. */ - if (clusterBumpConfigEpochWithoutConsensus() == C_OK) { - serverLog(LL_NOTICE, - "configEpoch updated after importing slot %d", slot); - } - server.cluster->importing_slots_from[slot] = NULL; - /* After importing this slot, let the other nodes know as - * soon as possible. */ - clusterBroadcastPong(CLUSTER_BROADCAST_ALL); - } - } else { - addReplyError(c, - "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP"); - return; - } - clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) { - /* CLUSTER BUMPEPOCH */ - int retval = clusterBumpConfigEpochWithoutConsensus(); - sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n", - (retval == C_OK) ? "BUMPED" : "STILL", - (unsigned long long) myself->configEpoch); - addReplySds(c,reply); - } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { - /* CLUSTER INFO */ - - sds info = genClusterInfoString(); - - /* Produce the reply protocol. */ - addReplyVerbatim(c,info,sdslen(info),"txt"); - sdsfree(info); - } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) { - int retval = clusterSaveConfig(1); - - if (retval == 0) - addReply(c,shared.ok); - else - addReplyErrorFormat(c,"error saving the cluster node config: %s", - strerror(errno)); - } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) { - /* CLUSTER KEYSLOT */ - sds key = c->argv[2]->ptr; - - addReplyLongLong(c,keyHashSlot(key,sdslen(key))); - } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) { - /* CLUSTER COUNTKEYSINSLOT */ - long long slot; - - if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK) - return; - if (slot < 0 || slot >= CLUSTER_SLOTS) { - addReplyError(c,"Invalid slot"); - return; - } - addReplyLongLong(c,countKeysInSlot(slot)); - } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) { - /* CLUSTER GETKEYSINSLOT */ - long long maxkeys, slot; - - if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK) - return; - if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) - != C_OK) - return; - if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) { - addReplyError(c,"Invalid slot or number of keys"); - return; - } - - unsigned int keys_in_slot = countKeysInSlot(slot); - unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys; - addReplyArrayLen(c,numkeys); - dictIterator *iter = NULL; - dictEntry *de = NULL; - iter = dictGetIterator(server.db->dict[slot]); - for (unsigned int i = 0; i < numkeys; i++) { - de = dictNext(iter); - serverAssert(de != NULL); - sds sdskey = dictGetKey(de); - addReplyBulkCBuffer(c, sdskey, sdslen(sdskey)); - } - dictReleaseIterator(iter); - } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) { - /* CLUSTER FORGET */ - clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); - if (!n) { - if (clusterBlacklistExists((char*)c->argv[2]->ptr)) - /* Already forgotten. The deletion may have been gossipped by - * another node, so we pretend it succeeded. */ - addReply(c,shared.ok); - else - addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); - return; - } else if (n == myself) { - addReplyError(c,"I tried hard but I can't forget myself..."); - return; - } else if (nodeIsSlave(myself) && myself->slaveof == n) { - addReplyError(c,"Can't forget my master!"); - return; - } - clusterBlacklistAddNode(n); - clusterDelNode(n); - clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE| - CLUSTER_TODO_SAVE_CONFIG); - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) { - /* CLUSTER REPLICATE */ - /* Lookup the specified node in our table. */ - clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); - if (!n) { - addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); - return; - } - - /* I can't replicate myself. */ - if (n == myself) { - addReplyError(c,"Can't replicate myself"); - return; - } - - /* Can't replicate a slave. */ - if (nodeIsSlave(n)) { - addReplyError(c,"I can only replicate a master, not a replica."); - return; - } - - /* If the instance is currently a master, it should have no assigned - * slots nor keys to accept to replicate some other node. - * Slaves can switch to another master without issues. */ - if (nodeIsMaster(myself) && - (myself->numslots != 0 || dbSize(&server.db[0], DB_MAIN) != 0)) { - addReplyError(c, - "To set a master the node must be empty and " - "without assigned slots."); - return; - } - - /* Set the master. */ - clusterSetMaster(n); - clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); - addReply(c,shared.ok); - } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") || - !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) { - /* CLUSTER SLAVES */ - /* CLUSTER REPLICAS */ - clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); - int j; - - /* Lookup the specified node in our table. */ - if (!n) { - addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); - return; - } - - if (nodeIsSlave(n)) { - addReplyError(c,"The specified node is not a master"); - return; - } - - /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ - addReplyArrayLen(c,n->numslaves); - for (j = 0; j < n->numslaves; j++) { - sds ni = clusterGenNodeDescription(c, n->slaves[j], shouldReturnTlsInfo()); - addReplyBulkCString(c,ni); - sdsfree(ni); - } - } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") && - c->argc == 3) - { - /* CLUSTER COUNT-FAILURE-REPORTS */ - clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); - - if (!n) { - addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); - return; - } else { - addReplyLongLong(c,clusterNodeFailureReportsCount(n)); - } - } else if (!strcasecmp(c->argv[1]->ptr,"failover") && - (c->argc == 2 || c->argc == 3)) - { - /* CLUSTER FAILOVER [FORCE|TAKEOVER] */ - int force = 0, takeover = 0; - - if (c->argc == 3) { - if (!strcasecmp(c->argv[2]->ptr,"force")) { - force = 1; - } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) { - takeover = 1; - force = 1; /* Takeover also implies force. */ - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - /* Check preconditions. */ - if (nodeIsMaster(myself)) { - addReplyError(c,"You should send CLUSTER FAILOVER to a replica"); - return; - } else if (myself->slaveof == NULL) { - addReplyError(c,"I'm a replica but my master is unknown to me"); - return; - } else if (!force && - (nodeFailed(myself->slaveof) || - myself->slaveof->link == NULL)) - { - addReplyError(c,"Master is down or failed, " - "please use CLUSTER FAILOVER FORCE"); - return; - } - resetManualFailover(); - server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; - - if (takeover) { - /* A takeover does not perform any initial check. It just - * generates a new configuration epoch for this node without - * consensus, claims the master's slots, and broadcast the new - * configuration. */ - serverLog(LL_NOTICE,"Taking over the master (user request)."); - clusterBumpConfigEpochWithoutConsensus(); - clusterFailoverReplaceYourMaster(); - } else if (force) { - /* If this is a forced failover, we don't need to talk with our - * master to agree about the offset. We just failover taking over - * it without coordination. */ - serverLog(LL_NOTICE,"Forced failover user request accepted."); - server.cluster->mf_can_start = 1; - } else { - serverLog(LL_NOTICE,"Manual failover user request accepted."); - clusterSendMFStart(myself->slaveof); - } - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3) - { - /* CLUSTER SET-CONFIG-EPOCH - * - * The user is allowed to set the config epoch only when a node is - * totally fresh: no config epoch, no other known node, and so forth. - * This happens at cluster creation time to start with a cluster where - * every node has a different node ID, without to rely on the conflicts - * resolution system which is too slow when a big cluster is created. */ - long long epoch; - - if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK) - return; - - if (epoch < 0) { - addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch); - } else if (dictSize(server.cluster->nodes) > 1) { - addReplyError(c,"The user can assign a config epoch only when the " - "node does not know any other node."); - } else if (myself->configEpoch != 0) { - addReplyError(c,"Node config epoch is already non-zero"); - } else { - myself->configEpoch = epoch; - serverLog(LL_NOTICE, - "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH", - (unsigned long long) myself->configEpoch); - - if (server.cluster->currentEpoch < (uint64_t)epoch) - server.cluster->currentEpoch = epoch; - /* No need to fsync the config here since in the unlucky event - * of a failure to persist the config, the conflict resolution code - * will assign a unique config to this node. */ - clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE| - CLUSTER_TODO_SAVE_CONFIG); - addReply(c,shared.ok); - } - } else if (!strcasecmp(c->argv[1]->ptr,"reset") && - (c->argc == 2 || c->argc == 3)) - { - /* CLUSTER RESET [SOFT|HARD] */ - int hard = 0; - - /* Parse soft/hard argument. Default is soft. */ - if (c->argc == 3) { - if (!strcasecmp(c->argv[2]->ptr,"hard")) { - hard = 1; - } else if (!strcasecmp(c->argv[2]->ptr,"soft")) { - hard = 0; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - /* Slaves can be reset while containing data, but not master nodes - * that must be empty. */ - if (nodeIsMaster(myself) && dbSize(c->db, DB_MAIN) != 0) { - addReplyError(c,"CLUSTER RESET can't be called with " - "master nodes containing keys"); - return; - } - clusterReset(hard); - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) { - /* CLUSTER LINKS */ - addReplyClusterLinksDescription(c); - } else { - addReplySubcommandSyntaxError(c); - return; - } -} void removeChannelsInSlot(unsigned int slot) { unsigned int channelcount = countChannelsInSlot(slot); @@ -6857,10 +6235,6 @@ unsigned int delKeysInSlot(unsigned int hashslot) { return j; } -unsigned int countKeysInSlot(unsigned int slot) { - return dictSize(server.db->dict[slot]); -} - /* ----------------------------------------------------------------------------- * Operation(s) on channel rax tree. * -------------------------------------------------------------------------- */ @@ -6988,8 +6362,8 @@ int handleDebugClusterCommand(client *c) { return 1; } -int clusterNodeConfirmedReachable(clusterNode *node) { - return !(node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)); +int clusterNodePending(clusterNode *node) { + return node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE); } char* clusterNodeIp(clusterNode *node) { @@ -6997,7 +6371,7 @@ char* clusterNodeIp(clusterNode *node) { } int clusterNodeIsSlave(clusterNode *node) { - return !nodeIsMaster(node); + return node->flags & CLUSTER_NODE_SLAVE; } clusterNode *clusterNodeGetSlaveof(clusterNode *node) { @@ -7020,12 +6394,513 @@ int clusterNodeIsNoFailover(clusterNode *node) { return node->flags & CLUSTER_NODE_NOFAILOVER; } -char **clusterDebugCommandHelp(void) { - const char *help[] = { +const char **clusterDebugCommandHelp(void) { + static const char *help[] = { "CLUSTERLINK KILL ", - " Kills the link based on the direction to/from (both) with the provided node." , + " Kills the link based on the direction to/from (both) with the provided node.", NULL }; return help; } + +char* clusterNodeGetShardId(clusterNode *node) { + return node->shard_id; +} + +int clusterCommandSpecial(client *c) { + if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) { + /* CLUSTER MEET [cport] */ + long long port, cport; + + if (getLongLongFromObject(c->argv[3], &port) != C_OK) { + addReplyErrorFormat(c,"Invalid base port specified: %s", + (char*)c->argv[3]->ptr); + return 1; + } + + if (c->argc == 5) { + if (getLongLongFromObject(c->argv[4], &cport) != C_OK) { + addReplyErrorFormat(c,"Invalid bus port specified: %s", + (char*)c->argv[4]->ptr); + return 1; + } + } else { + cport = port + CLUSTER_PORT_INCR; + } + + if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 && + errno == EINVAL) + { + addReplyErrorFormat(c,"Invalid node address specified: %s:%s", + (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr); + } else { + addReply(c,shared.ok); + } + } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) { + /* CLUSTER FLUSHSLOTS */ + if (dbSize(&server.db[0], DB_MAIN) != 0) { + addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS."); + return 1; + } + clusterDelNodeSlots(myself); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); + addReply(c,shared.ok); + } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || + !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) { + /* CLUSTER ADDSLOTS [slot] ... */ + /* CLUSTER DELSLOTS [slot] ... */ + int j, slot; + unsigned char *slots = zmalloc(CLUSTER_SLOTS); + int del = !strcasecmp(c->argv[1]->ptr,"delslots"); + + memset(slots,0,CLUSTER_SLOTS); + /* Check that all the arguments are parseable.*/ + for (j = 2; j < c->argc; j++) { + if ((slot = getSlotOrReply(c,c->argv[j])) == C_ERR) { + zfree(slots); + return 1; + } + } + /* Check that the slots are not already busy. */ + for (j = 2; j < c->argc; j++) { + slot = getSlotOrReply(c,c->argv[j]); + if (checkSlotAssignmentsOrReply(c, slots, del, slot, slot) == C_ERR) { + zfree(slots); + return 1; + } + } + clusterUpdateSlots(c, slots, del); + zfree(slots); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); + addReply(c,shared.ok); + } else if ((!strcasecmp(c->argv[1]->ptr,"addslotsrange") || + !strcasecmp(c->argv[1]->ptr,"delslotsrange")) && c->argc >= 4) { + if (c->argc % 2 == 1) { + addReplyErrorArity(c); + return 1; + } + /* CLUSTER ADDSLOTSRANGE [ ...] */ + /* CLUSTER DELSLOTSRANGE [ ...] */ + int j, startslot, endslot; + unsigned char *slots = zmalloc(CLUSTER_SLOTS); + int del = !strcasecmp(c->argv[1]->ptr,"delslotsrange"); + + memset(slots,0,CLUSTER_SLOTS); + /* Check that all the arguments are parseable and that all the + * slots are not already busy. */ + for (j = 2; j < c->argc; j += 2) { + if ((startslot = getSlotOrReply(c,c->argv[j])) == C_ERR) { + zfree(slots); + return 1; + } + if ((endslot = getSlotOrReply(c,c->argv[j+1])) == C_ERR) { + zfree(slots); + return 1; + } + if (startslot > endslot) { + addReplyErrorFormat(c,"start slot number %d is greater than end slot number %d", startslot, endslot); + zfree(slots); + return 1; + } + + if (checkSlotAssignmentsOrReply(c, slots, del, startslot, endslot) == C_ERR) { + zfree(slots); + return 1; + } + } + clusterUpdateSlots(c, slots, del); + zfree(slots); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { + /* SETSLOT 10 MIGRATING */ + /* SETSLOT 10 IMPORTING */ + /* SETSLOT 10 STABLE */ + /* SETSLOT 10 NODE */ + int slot; + clusterNode *n; + + if (nodeIsSlave(myself)) { + addReplyError(c,"Please use SETSLOT only with masters."); + return 1; + } + + if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 1; + + if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) { + if (server.cluster->slots[slot] != myself) { + addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); + return 1; + } + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (n == NULL) { + addReplyErrorFormat(c,"I don't know about node %s", + (char*)c->argv[4]->ptr); + return 1; + } + if (nodeIsSlave(n)) { + addReplyError(c,"Target node is not a master"); + return 1; + } + server.cluster->migrating_slots_to[slot] = n; + } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) { + if (server.cluster->slots[slot] == myself) { + addReplyErrorFormat(c, + "I'm already the owner of hash slot %u",slot); + return 1; + } + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (n == NULL) { + addReplyErrorFormat(c,"I don't know about node %s", + (char*)c->argv[4]->ptr); + return 1; + } + if (nodeIsSlave(n)) { + addReplyError(c,"Target node is not a master"); + return 1; + } + server.cluster->importing_slots_from[slot] = n; + } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) { + /* CLUSTER SETSLOT STABLE */ + server.cluster->importing_slots_from[slot] = NULL; + server.cluster->migrating_slots_to[slot] = NULL; + } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) { + /* CLUSTER SETSLOT NODE */ + n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (!n) { + addReplyErrorFormat(c,"Unknown node %s", + (char*)c->argv[4]->ptr); + return 1; + } + if (nodeIsSlave(n)) { + addReplyError(c,"Target node is not a master"); + return 1; + } + /* If this hash slot was served by 'myself' before to switch + * make sure there are no longer local keys for this hash slot. */ + if (server.cluster->slots[slot] == myself && n != myself) { + if (countKeysInSlot(slot) != 0) { + addReplyErrorFormat(c, + "Can't assign hashslot %d to a different node " + "while I still hold keys for this hash slot.", slot); + return 1; + } + } + /* If this slot is in migrating status but we have no keys + * for it assigning the slot to another node will clear + * the migrating status. */ + if (countKeysInSlot(slot) == 0 && + server.cluster->migrating_slots_to[slot]) + server.cluster->migrating_slots_to[slot] = NULL; + + int slot_was_mine = server.cluster->slots[slot] == myself; + clusterDelSlot(slot); + clusterAddSlot(n,slot); + + /* If we are a master left without slots, we should turn into a + * replica of the new master. */ + if (slot_was_mine && + n != myself && + myself->numslots == 0 && + server.cluster_allow_replica_migration) { + serverLog(LL_NOTICE, + "Configuration change detected. Reconfiguring myself " + "as a replica of %.40s (%s)", n->name, n->human_nodename); + clusterSetMaster(n); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | + CLUSTER_TODO_UPDATE_STATE | + CLUSTER_TODO_FSYNC_CONFIG); + } + + /* If this node was importing this slot, assigning the slot to + * itself also clears the importing status. */ + if (n == myself && + server.cluster->importing_slots_from[slot]) { + /* This slot was manually migrated, set this node configEpoch + * to a new epoch so that the new version can be propagated + * by the cluster. + * + * Note that if this ever results in a collision with another + * node getting the same configEpoch, for example because a + * failover happens at the same time we close the slot, the + * configEpoch collision resolution will fix it assigning + * a different epoch to each node. */ + if (clusterBumpConfigEpochWithoutConsensus() == C_OK) { + serverLog(LL_NOTICE, + "configEpoch updated after importing slot %d", slot); + } + server.cluster->importing_slots_from[slot] = NULL; + /* After importing this slot, let the other nodes know as + * soon as possible. */ + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); + } + } else { + addReplyError(c, + "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP"); + return 1; + } + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) { + /* CLUSTER BUMPEPOCH */ + int retval = clusterBumpConfigEpochWithoutConsensus(); + sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n", + (retval == C_OK) ? "BUMPED" : "STILL", + (unsigned long long) myself->configEpoch); + addReplySds(c,reply); + } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) { + int retval = clusterSaveConfig(1); + + if (retval == 0) + addReply(c,shared.ok); + else + addReplyErrorFormat(c,"error saving the cluster node config: %s", + strerror(errno)); + } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) { + /* CLUSTER FORGET */ + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); + if (!n) { + if (clusterBlacklistExists((char*)c->argv[2]->ptr)) + /* Already forgotten. The deletion may have been gossipped by + * another node, so we pretend it succeeded. */ + addReply(c,shared.ok); + else + addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); + return 1; + } else if (n == myself) { + addReplyError(c,"I tried hard but I can't forget myself..."); + return 1; + } else if (nodeIsSlave(myself) && myself->slaveof == n) { + addReplyError(c,"Can't forget my master!"); + return 1; + } + clusterBlacklistAddNode(n); + clusterDelNode(n); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_SAVE_CONFIG); + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) { + /* CLUSTER REPLICATE */ + /* Lookup the specified node in our table. */ + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); + if (!n) { + addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); + return 1; + } + + /* I can't replicate myself. */ + if (n == myself) { + addReplyError(c,"Can't replicate myself"); + return 1; + } + + /* Can't replicate a slave. */ + if (nodeIsSlave(n)) { + addReplyError(c,"I can only replicate a master, not a replica."); + return 1; + } + + /* If the instance is currently a master, it should have no assigned + * slots nor keys to accept to replicate some other node. + * Slaves can switch to another master without issues. */ + if (nodeIsMaster(myself) && + (myself->numslots != 0 || dbSize(&server.db[0], DB_MAIN) != 0)) { + addReplyError(c, + "To set a master the node must be empty and " + "without assigned slots."); + return 1; + } + + /* Set the master. */ + clusterSetMaster(n); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") && + c->argc == 3) + { + /* CLUSTER COUNT-FAILURE-REPORTS */ + clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr)); + + if (!n) { + addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr); + return 1; + } else { + addReplyLongLong(c,clusterNodeFailureReportsCount(n)); + } + } else if (!strcasecmp(c->argv[1]->ptr,"failover") && + (c->argc == 2 || c->argc == 3)) + { + /* CLUSTER FAILOVER [FORCE|TAKEOVER] */ + int force = 0, takeover = 0; + + if (c->argc == 3) { + if (!strcasecmp(c->argv[2]->ptr,"force")) { + force = 1; + } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) { + takeover = 1; + force = 1; /* Takeover also implies force. */ + } else { + addReplyErrorObject(c,shared.syntaxerr); + return 1; + } + } + + /* Check preconditions. */ + if (nodeIsMaster(myself)) { + addReplyError(c,"You should send CLUSTER FAILOVER to a replica"); + return 1; + } else if (myself->slaveof == NULL) { + addReplyError(c,"I'm a replica but my master is unknown to me"); + return 1; + } else if (!force && + (nodeFailed(myself->slaveof) || + myself->slaveof->link == NULL)) + { + addReplyError(c,"Master is down or failed, " + "please use CLUSTER FAILOVER FORCE"); + return 1; + } + resetManualFailover(); + server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; + + if (takeover) { + /* A takeover does not perform any initial check. It just + * generates a new configuration epoch for this node without + * consensus, claims the master's slots, and broadcast the new + * configuration. */ + serverLog(LL_NOTICE,"Taking over the master (user request)."); + clusterBumpConfigEpochWithoutConsensus(); + clusterFailoverReplaceYourMaster(); + } else if (force) { + /* If this is a forced failover, we don't need to talk with our + * master to agree about the offset. We just failover taking over + * it without coordination. */ + serverLog(LL_NOTICE,"Forced failover user request accepted."); + server.cluster->mf_can_start = 1; + } else { + serverLog(LL_NOTICE,"Manual failover user request accepted."); + clusterSendMFStart(myself->slaveof); + } + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3) + { + /* CLUSTER SET-CONFIG-EPOCH + * + * The user is allowed to set the config epoch only when a node is + * totally fresh: no config epoch, no other known node, and so forth. + * This happens at cluster creation time to start with a cluster where + * every node has a different node ID, without to rely on the conflicts + * resolution system which is too slow when a big cluster is created. */ + long long epoch; + + if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK) + return 1; + + if (epoch < 0) { + addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch); + } else if (dictSize(server.cluster->nodes) > 1) { + addReplyError(c,"The user can assign a config epoch only when the " + "node does not know any other node."); + } else if (myself->configEpoch != 0) { + addReplyError(c,"Node config epoch is already non-zero"); + } else { + myself->configEpoch = epoch; + serverLog(LL_NOTICE, + "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH", + (unsigned long long) myself->configEpoch); + + if (server.cluster->currentEpoch < (uint64_t)epoch) + server.cluster->currentEpoch = epoch; + /* No need to fsync the config here since in the unlucky event + * of a failure to persist the config, the conflict resolution code + * will assign a unique config to this node. */ + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_SAVE_CONFIG); + addReply(c,shared.ok); + } + } else if (!strcasecmp(c->argv[1]->ptr,"reset") && + (c->argc == 2 || c->argc == 3)) + { + /* CLUSTER RESET [SOFT|HARD] */ + int hard = 0; + + /* Parse soft/hard argument. Default is soft. */ + if (c->argc == 3) { + if (!strcasecmp(c->argv[2]->ptr,"hard")) { + hard = 1; + } else if (!strcasecmp(c->argv[2]->ptr,"soft")) { + hard = 0; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return 1; + } + } + + /* Slaves can be reset while containing data, but not master nodes + * that must be empty. */ + if (nodeIsMaster(myself) && dbSize(c->db, DB_MAIN) != 0) { + addReplyError(c,"CLUSTER RESET can't be called with " + "master nodes containing keys"); + return 1; + } + clusterReset(hard); + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) { + /* CLUSTER LINKS */ + addReplyClusterLinksDescription(c); + } else { + return 0; + } + + return 1; +} + +const char** clusterCommandSpecialHelp(void) { + static const char *help[] = { + "ADDSLOTS [ ...]", + " Assign slots to current node.", + "ADDSLOTSRANGE [ ...]", + " Assign slots which are between and to current node.", + "BUMPEPOCH", + " Advance the cluster config epoch.", + "COUNT-FAILURE-REPORTS ", + " Return number of failure reports for .", + "DELSLOTS [ ...]", + " Delete slots information from current node.", + "DELSLOTSRANGE [ ...]", + " Delete slots information which are between and from current node.", + "FAILOVER [FORCE|TAKEOVER]", + " Promote current replica node to being a master.", + "FORGET ", + " Remove a node from the cluster.", + "FLUSHSLOTS", + " Delete current node own slots information.", + "MEET []", + " Connect nodes into a working cluster.", + "REPLICATE ", + " Configure current node as replica to .", + "RESET [HARD|SOFT]", + " Reset current node (default: soft).", + "SET-CONFIG-EPOCH ", + " Set config epoch of current node.", + "SETSLOT (IMPORTING |MIGRATING |STABLE|NODE )", + " Set slot state.", + "SAVECONFIG", + " Force saving cluster configuration on disk.", + "LINKS", + " Return information about all network links between this node and its peers.", + " Output format is an array where each array element is a map containing attributes of a link", + NULL + }; + + return help; +} + +int getNumSlaves(clusterNode *node) { + return node->numslaves; +} + +clusterNode *getSlave(clusterNode *node, int slave_idx) { + return node->slaves[slave_idx]; +} diff --git a/src/module.c b/src/module.c index 115a7cbc4..5a813fb5c 100644 --- a/src/module.c +++ b/src/module.c @@ -8967,7 +8967,7 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m UNUSED(ctx); clusterNode *node = clusterLookupNode(id, strlen(id)); - if (node == NULL || !clusterNodeConfirmedReachable(node)) + if (node == NULL || clusterNodePending(node)) { return REDISMODULE_ERR; }