From 98a6c44b751df2a79d55703a8ff7caa740a8f064 Mon Sep 17 00:00:00 2001 From: Josh Hershberg Date: Mon, 30 Oct 2023 12:38:43 +0200 Subject: [PATCH] Cluster refactor: Make clusterState private Move clusterState into cluster_legacy.h. In order to achieve this, some "accessor" methods needed to be added to the cluster API, along with some other minor refactors. Signed-off-by: Josh Hershberg --- src/cluster.h | 54 ++++++-------------------------------------- src/cluster_legacy.c | 42 +++++++++++++++++++++++++++++++++- src/cluster_legacy.h | 50 ++++++++++++++++++++++++++++++++++++++++ src/db.c | 2 +- src/module.c | 24 ++++---------------- src/replication.c | 2 +- src/script.c | 2 +- src/server.c | 4 ++-- src/server.h | 1 + 9 files changed, 108 insertions(+), 73 deletions(-) diff --git a/src/cluster.h b/src/cluster.h index 7f6687ae7..757861588 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -129,54 +129,8 @@ typedef struct clusterNode { list *fail_reports; /* List of nodes signaling this as failing */ } clusterNode; -typedef struct clusterState { - clusterNode *myself; /* This node */ - uint64_t currentEpoch; - int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ - int size; /* Num of master nodes with at least one slot */ - dict *nodes; /* Hash table of name -> clusterNode structures */ - dict *shards; /* Hash table of shard_id -> list (of nodes) structures */ - dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ - clusterNode *migrating_slots_to[CLUSTER_SLOTS]; - clusterNode *importing_slots_from[CLUSTER_SLOTS]; - clusterNode *slots[CLUSTER_SLOTS]; - rax *slots_to_channels; - /* The following fields are used to take the slave state on elections. */ - mstime_t failover_auth_time; /* Time of previous or next election. */ - int failover_auth_count; /* Number of votes received so far. */ - int failover_auth_sent; /* True if we already asked for votes. */ - int failover_auth_rank; /* This slave rank for current auth request. 
*/ - uint64_t failover_auth_epoch; /* Epoch of the current election. */ - int cant_failover_reason; /* Why a slave is currently not able to - failover. See the CANT_FAILOVER_* macros. */ - /* Manual failover state in common. */ - mstime_t mf_end; /* Manual failover time limit (ms unixtime). - It is zero if there is no MF in progress. */ - /* Manual failover state of master. */ - clusterNode *mf_slave; /* Slave performing the manual failover. */ - /* Manual failover state of slave. */ - long long mf_master_offset; /* Master offset the slave needs to start MF - or -1 if still not received. */ - int mf_can_start; /* If non-zero signal that the manual failover - can start requesting masters vote. */ - /* The following fields are used by masters to take state on elections. */ - uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ - int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ - /* Stats */ - /* Messages received and sent by type. */ - long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; - long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; - long long stats_pfail_nodes; /* Number of nodes in PFAIL status, - excluding nodes without address. */ - unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */ +struct clusterState; - /* Bit map for slots that are no longer claimed by the owner in cluster PING - * messages. During slot migration, the owner will stop claiming the slot after - * the ownership transfer. Set the bit corresponding to the slot when a node - * stops claiming the slot. This prevents spreading incorrect information (that - * source still owns the slot) using UPDATE messages. 
*/ - unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8]; -} clusterState; /* ---------------------- API exported outside cluster.c -------------------- */ void clusterInit(void); @@ -208,5 +162,11 @@ int clusterNodeGetSlotBit(clusterNode *n, int slot); void clusterUpdateMyselfHumanNodename(void); int isValidAuxString(char *s, unsigned int length); int getNodeDefaultClientPort(clusterNode *n); +int clusterNodeIsMyself(clusterNode *n); +clusterNode* getMyClusterNode(void); +int clusterManualFailoverTimeLimit(void); +char* getMyClusterId(void); +int getClusterSize(void); +char** getClusterNodesList(size_t *numnodes); #endif /* __CLUSTER_H */ diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index aa1dc20c0..a5cfcf7c5 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -985,7 +985,7 @@ void clusterUpdateMyselfHumanNodename(void) { void clusterInit(void) { int saveconf = 0; - server.cluster = zmalloc(sizeof(clusterState)); + server.cluster = zmalloc(sizeof(struct clusterState)); server.cluster->myself = NULL; server.cluster->currentEpoch = 0; server.cluster->state = CLUSTER_FAIL; @@ -7656,3 +7656,43 @@ unsigned int countChannelsInSlot(unsigned int hashslot) { raxStop(&iter); return j; } + +int clusterNodeIsMyself(clusterNode *n) { + return n == server.cluster->myself; +} + +clusterNode* getMyClusterNode(void) { + return server.cluster->myself; +} + +int clusterManualFailoverTimeLimit(void) { + return server.cluster->mf_end; +} + +char* getMyClusterId(void) { + return server.cluster->myself->name; +} + +int getClusterSize(void) { + return dictSize(server.cluster->nodes); +} + +char** getClusterNodesList(size_t *numnodes) { + size_t count = dictSize(server.cluster->nodes); + char **ids = zmalloc((count+1)*CLUSTER_NAMELEN); + dictIterator *di = dictGetIterator(server.cluster->nodes); + dictEntry *de; + int j = 0; + while((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node->flags & 
(CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue; + ids[j] = zmalloc(CLUSTER_NAMELEN); + memcpy(ids[j],node->name,CLUSTER_NAMELEN); + j++; + } + *numnodes = j; + ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need + * to also get the count argument. */ + dictReleaseIterator(di); + return ids; +} diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 43234d889..3c2e148fb 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -211,4 +211,54 @@ static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset"); master is up. */ #define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */ +struct clusterState { + clusterNode *myself; /* This node */ + uint64_t currentEpoch; + int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */ + int size; /* Num of master nodes with at least one slot */ + dict *nodes; /* Hash table of name -> clusterNode structures */ + dict *shards; /* Hash table of shard_id -> list (of nodes) structures */ + dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */ + clusterNode *migrating_slots_to[CLUSTER_SLOTS]; + clusterNode *importing_slots_from[CLUSTER_SLOTS]; + clusterNode *slots[CLUSTER_SLOTS]; + rax *slots_to_channels; + /* The following fields are used to take the slave state on elections. */ + mstime_t failover_auth_time; /* Time of previous or next election. */ + int failover_auth_count; /* Number of votes received so far. */ + int failover_auth_sent; /* True if we already asked for votes. */ + int failover_auth_rank; /* This slave rank for current auth request. */ + uint64_t failover_auth_epoch; /* Epoch of the current election. */ + int cant_failover_reason; /* Why a slave is currently not able to + failover. See the CANT_FAILOVER_* macros. */ + /* Manual failover state in common. */ + mstime_t mf_end; /* Manual failover time limit (ms unixtime). + It is zero if there is no MF in progress. */ + /* Manual failover state of master. 
*/ + clusterNode *mf_slave; /* Slave performing the manual failover. */ + /* Manual failover state of slave. */ + long long mf_master_offset; /* Master offset the slave needs to start MF + or -1 if still not received. */ + int mf_can_start; /* If non-zero signal that the manual failover + can start requesting masters vote. */ + /* The following fields are used by masters to take state on elections. */ + uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ + int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ + /* Stats */ + /* Messages received and sent by type. */ + long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; + long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; + long long stats_pfail_nodes; /* Number of nodes in PFAIL status, + excluding nodes without address. */ + unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */ + + /* Bit map for slots that are no longer claimed by the owner in cluster PING + * messages. During slot migration, the owner will stop claiming the slot after + * the ownership transfer. Set the bit corresponding to the slot when a node + * stops claiming the slot. This prevents spreading incorrect information (that + * source still owns the slot) using UPDATE messages. */ + unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8]; +}; + + #endif //CLUSTER_LEGACY_H diff --git a/src/db.c b/src/db.c index c4c22907e..f77db3f88 100644 --- a/src/db.c +++ b/src/db.c @@ -2197,7 +2197,7 @@ int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_exp dict *d; if (server.cluster_enabled) { for (int i = 0; i < CLUSTER_SLOTS; i++) { - if (clusterNodeGetSlotBit(server.cluster->myself, i)) { + if (clusterNodeGetSlotBit(getMyClusterNode(), i)) { /* We don't know exact number of keys that would fall into each slot, but we can approximate it, assuming even distribution. 
*/ if (keyType == DB_MAIN) { d = db->dict[i]; diff --git a/src/module.c b/src/module.c index 0428ac59c..b24527fc1 100644 --- a/src/module.c +++ b/src/module.c @@ -6466,7 +6466,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING); c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING); if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,&error_code) != - server.cluster->myself) + getMyClusterNode()) { sds msg = NULL; if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { @@ -8917,23 +8917,7 @@ char **RM_GetClusterNodesList(RedisModuleCtx *ctx, size_t *numnodes) { UNUSED(ctx); if (!server.cluster_enabled) return NULL; - size_t count = dictSize(server.cluster->nodes); - char **ids = zmalloc((count+1)*REDISMODULE_NODE_ID_LEN); - dictIterator *di = dictGetIterator(server.cluster->nodes); - dictEntry *de; - int j = 0; - while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue; - ids[j] = zmalloc(REDISMODULE_NODE_ID_LEN); - memcpy(ids[j],node->name,REDISMODULE_NODE_ID_LEN); - j++; - } - *numnodes = j; - ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need - * to also get the count argument. */ - dictReleaseIterator(di); - return ids; + return getClusterNodesList(numnodes); } /* Free the node list obtained with RedisModule_GetClusterNodesList. */ @@ -8947,7 +8931,7 @@ void RM_FreeClusterNodesList(char **ids) { * is disabled. */ const char *RM_GetMyClusterID(void) { if (!server.cluster_enabled) return NULL; - return server.cluster->myself->name; + return getMyClusterId(); } /* Return the number of nodes in the cluster, regardless of their state @@ -8956,7 +8940,7 @@ const char *RM_GetMyClusterID(void) { * cluster mode, zero is returned. 
*/ size_t RM_GetClusterSize(void) { if (!server.cluster_enabled) return 0; - return dictSize(server.cluster->nodes); + return getClusterSize(); } /* Populate the specified info for the node having as ID the specified 'id', diff --git a/src/replication.c b/src/replication.c index 313f69152..e64251663 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3774,7 +3774,7 @@ void replicationCron(void) { * match the one stored into 'mf_master_offset' state. */ int manual_failover_in_progress = ((server.cluster_enabled && - server.cluster->mf_end) || + clusterManualFailoverTimeLimit()) || server.failover_end_time) && isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA); diff --git a/src/script.c b/src/script.c index d0b9b9635..678773d96 100644 --- a/src/script.c +++ b/src/script.c @@ -429,7 +429,7 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING); c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING); int hashslot = -1; - if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != server.cluster->myself) { + if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != getMyClusterNode()) { if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { *err = sdsnew( "Script attempted to execute a write command while the " diff --git a/src/server.c b/src/server.c index e63a2ffff..327675690 100644 --- a/src/server.c +++ b/src/server.c @@ -4037,7 +4037,7 @@ int processCommand(client *c) { int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, &c->slot,&error_code); - if (n == NULL || n != server.cluster->myself) { + if (n == NULL || !clusterNodeIsMyself(n)) { if (c->cmd->proc == execCommand) { discardTransaction(c); } else { @@ -6838,7 +6838,7 @@ int redisIsSupervised(int mode) { int iAmMaster(void) { return ((!server.cluster_enabled && server.masterhost == NULL) || - (server.cluster_enabled && nodeIsMaster(server.cluster->myself))); + 
(server.cluster_enabled && nodeIsMaster(getMyClusterNode()))); } #ifdef REDIS_TEST diff --git a/src/server.h b/src/server.h index 902050889..13486a543 100644 --- a/src/server.h +++ b/src/server.h @@ -738,6 +738,7 @@ struct RedisModuleCtx; struct moduleLoadQueueEntry; struct RedisModuleKeyOptCtx; struct RedisModuleCommand; +struct clusterState; /* Each module type implementation should export a set of methods in order * to serialize and deserialize the value in the RDB file, rewrite the AOF