Cluster refactor: break up clusterCommand

Divide up clusterCommand into clusterCommand for shared
sub-commands and clusterCommandSpecial for implementation
specific sub-commands. So to, the cluster command help
sub-command has been divided into two implementations,
clusterCommandHelp and clusterCommandHelpSpecial. Some
common sub-subcommand implementations have been extracted
and their implemenations either made shared or else
implementation specific.

Signed-off-by: Josh Hershberg <yehoshua@redis.com>
This commit is contained in:
Josh Hershberg 2023-11-01 14:51:49 +02:00
parent 33ef6a3003
commit 4afc54ad9b
4 changed files with 696 additions and 636 deletions

View File

@ -744,3 +744,177 @@ int isValidAuxString(char *s, unsigned int length) {
}
return 1;
}
void clusterCommandMyId(client *c) {
char *name = clusterNodeGetName(getMyClusterNode());
if (name) {
addReplyBulkCBuffer(c,name, CLUSTER_NAMELEN);
} else {
addReplyError(c, "No ID yet");
}
}
void clusterCommandMyShardId(client *c) {
char *sid = clusterNodeGetShardId(getMyClusterNode());
if (sid) {
addReplyBulkCBuffer(c,sid, CLUSTER_NAMELEN);
} else {
addReplyError(c, "No shard ID yet");
}
}
/* When a cluster command is called, we need to decide whether to return TLS info or
* non-TLS info by the client's connection type. However if the command is called by
* a Lua script or RM_call, there is no connection in the fake client, so we use
* server.current_client here to get the real client if available. And if it is not
* available (modules may call commands without a real client), we return the default
* info, which is determined by server.tls_cluster. */
static int shouldReturnTlsInfo(void) {
if (server.current_client && server.current_client->conn) {
return connIsTLS(server.current_client->conn);
} else {
return server.tls_cluster;
}
}
unsigned int countKeysInSlot(unsigned int slot) {
return dictSize(server.db->dict[slot]);
}
void clusterCommandHelp(client *c) {
const char *help[] = {
"COUNTKEYSINSLOT <slot>",
" Return the number of keys in <slot>.",
"GETKEYSINSLOT <slot> <count>",
" Return key names stored by current node in a slot.",
"INFO",
" Return information about the cluster.",
"KEYSLOT <key>",
" Return the hash slot for <key>.",
"MYID",
" Return the node id.",
"MYSHARDID",
" Return the node's shard id.",
"NODES",
" Return cluster configuration seen by node. Output format:",
" <id> <ip:port@bus-port[,hostname]> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
"REPLICAS <node-id>",
" Return <node-id> replicas.",
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids",
"SHARDS",
" Return information about slot range mappings and the nodes associated with them.",
NULL
};
addExtendedReplyHelp(c, help, clusterCommandSpecialHelp());
}
void clusterCommand(client *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
return;
}
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
clusterCommandHelp(c);
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
/* CLUSTER NODES */
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo());
addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
sdsfree(nodes);
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
/* CLUSTER MYID */
clusterCommandMyId(c);
} else if (!strcasecmp(c->argv[1]->ptr,"myshardid") && c->argc == 2) {
/* CLUSTER MYSHARDID */
clusterCommandMyShardId(c);
} else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
/* CLUSTER SLOTS */
clusterCommandSlots(c);
} else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) {
/* CLUSTER SHARDS */
clusterCommandShards(c);
} else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
/* CLUSTER INFO */
sds info = genClusterInfoString();
/* Produce the reply protocol. */
addReplyVerbatim(c,info,sdslen(info),"txt");
sdsfree(info);
} else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
/* CLUSTER KEYSLOT <key> */
sds key = c->argv[2]->ptr;
addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
} else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
/* CLUSTER COUNTKEYSINSLOT <slot> */
long long slot;
if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
return;
if (slot < 0 || slot >= CLUSTER_SLOTS) {
addReplyError(c,"Invalid slot");
return;
}
addReplyLongLong(c,countKeysInSlot(slot));
} else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
/* CLUSTER GETKEYSINSLOT <slot> <count> */
long long maxkeys, slot;
if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
return;
if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
!= C_OK)
return;
if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
addReplyError(c,"Invalid slot or number of keys");
return;
}
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c,numkeys);
dictIterator *iter = NULL;
dictEntry *de = NULL;
iter = dictGetIterator(server.db->dict[slot]);
for (unsigned int i = 0; i < numkeys; i++) {
de = dictNext(iter);
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
}
dictReleaseIterator(iter);
} else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
!strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
/* CLUSTER SLAVES <NODE ID> */
/* CLUSTER REPLICAS <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
int j;
/* Lookup the specified node in our table. */
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
}
if (clusterNodeIsSlave(n)) {
addReplyError(c,"The specified node is not a master");
return;
}
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
addReplyArrayLen(c, getNumSlaves(n));
for (j = 0; j < getNumSlaves(n); j++) {
sds ni = clusterGenNodeDescription(c, getSlave(n, j), shouldReturnTlsInfo());
addReplyBulkCString(c,ni);
sdsfree(ni);
}
} else if(!clusterCommandSpecial(c)) {
addReplySubcommandSyntaxError(c);
return;
}
}

View File

@ -68,7 +68,7 @@ int getClusterSize(void);
char** getClusterNodesList(size_t *numnodes);
int nodeIsMaster(clusterNode *n);
int handleDebugClusterCommand(client *c);
int clusterNodeConfirmedReachable(clusterNode *node);
int clusterNodePending(clusterNode *node);
char* clusterNodeIp(clusterNode *node);
int clusterNodeIsSlave(clusterNode *node);
clusterNode *clusterNodeGetSlaveof(clusterNode *node);
@ -76,6 +76,17 @@ char* clusterNodeGetName(clusterNode *node);
int clusterNodeTimedOut(clusterNode *node);
int clusterNodeIsFailing(clusterNode *node);
int clusterNodeIsNoFailover(clusterNode *node);
void clusterCommand(client *c);
int clusterCommandSpecial(client *c);
const char** clusterCommandSpecialHelp(void);
char* clusterNodeGetShardId(clusterNode *node);
void clusterCommandSlots(client * c);
void clusterCommandMyId(client *c);
void clusterCommandMyShardId(client *c);
void clusterCommandShards(client *c);
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);
int getNumSlaves(clusterNode *node);
clusterNode *getSlave(clusterNode *node, int slave_idx);
char **clusterDebugCommandHelp(void);
ConnectionType *connTypeOfCluster(void);

File diff suppressed because it is too large Load Diff

View File

@ -8967,7 +8967,7 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
UNUSED(ctx);
clusterNode *node = clusterLookupNode(id, strlen(id));
if (node == NULL || !clusterNodeConfirmedReachable(node))
if (node == NULL || clusterNodePending(node))
{
return REDISMODULE_ERR;
}