Cluster refactor: Make clusterState private
Move clusterState into cluster_legacy.h. In order to achieve this some "accessor" methods needed to be added to the cluster API and some other minor refactors. Signed-off-by: Josh Hershberg <yehoshua@redis.com>
This commit is contained in:
parent
5292adb985
commit
98a6c44b75
@ -129,54 +129,8 @@ typedef struct clusterNode {
|
||||
list *fail_reports; /* List of nodes signaling this as failing */
|
||||
} clusterNode;
|
||||
|
||||
typedef struct clusterState {
|
||||
clusterNode *myself; /* This node */
|
||||
uint64_t currentEpoch;
|
||||
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
|
||||
int size; /* Num of master nodes with at least one slot */
|
||||
dict *nodes; /* Hash table of name -> clusterNode structures */
|
||||
dict *shards; /* Hash table of shard_id -> list (of nodes) structures */
|
||||
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
|
||||
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
|
||||
clusterNode *importing_slots_from[CLUSTER_SLOTS];
|
||||
clusterNode *slots[CLUSTER_SLOTS];
|
||||
rax *slots_to_channels;
|
||||
/* The following fields are used to take the slave state on elections. */
|
||||
mstime_t failover_auth_time; /* Time of previous or next election. */
|
||||
int failover_auth_count; /* Number of votes received so far. */
|
||||
int failover_auth_sent; /* True if we already asked for votes. */
|
||||
int failover_auth_rank; /* This slave rank for current auth request. */
|
||||
uint64_t failover_auth_epoch; /* Epoch of the current election. */
|
||||
int cant_failover_reason; /* Why a slave is currently not able to
|
||||
failover. See the CANT_FAILOVER_* macros. */
|
||||
/* Manual failover state in common. */
|
||||
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
|
||||
It is zero if there is no MF in progress. */
|
||||
/* Manual failover state of master. */
|
||||
clusterNode *mf_slave; /* Slave performing the manual failover. */
|
||||
/* Manual failover state of slave. */
|
||||
long long mf_master_offset; /* Master offset the slave needs to start MF
|
||||
or -1 if still not received. */
|
||||
int mf_can_start; /* If non-zero signal that the manual failover
|
||||
can start requesting masters vote. */
|
||||
/* The following fields are used by masters to take state on elections. */
|
||||
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
|
||||
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
|
||||
/* Stats */
|
||||
/* Messages received and sent by type. */
|
||||
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
|
||||
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
|
||||
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
|
||||
excluding nodes without address. */
|
||||
unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
|
||||
struct clusterState;
|
||||
|
||||
/* Bit map for slots that are no longer claimed by the owner in cluster PING
|
||||
* messages. During slot migration, the owner will stop claiming the slot after
|
||||
* the ownership transfer. Set the bit corresponding to the slot when a node
|
||||
* stops claiming the slot. This prevents spreading incorrect information (that
|
||||
* source still owns the slot) using UPDATE messages. */
|
||||
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
|
||||
} clusterState;
|
||||
|
||||
/* ---------------------- API exported outside cluster.c -------------------- */
|
||||
void clusterInit(void);
|
||||
@ -208,5 +162,11 @@ int clusterNodeGetSlotBit(clusterNode *n, int slot);
|
||||
void clusterUpdateMyselfHumanNodename(void);
|
||||
int isValidAuxString(char *s, unsigned int length);
|
||||
int getNodeDefaultClientPort(clusterNode *n);
|
||||
int clusterNodeIsMyself(clusterNode *n);
|
||||
clusterNode* getMyClusterNode(void);
|
||||
int clusterManualFailoverTimeLimit(void);
|
||||
char* getMyClusterId(void);
|
||||
int getClusterSize(void);
|
||||
char** getClusterNodesList(size_t *numnodes);
|
||||
|
||||
#endif /* __CLUSTER_H */
|
||||
|
@ -985,7 +985,7 @@ void clusterUpdateMyselfHumanNodename(void) {
|
||||
void clusterInit(void) {
|
||||
int saveconf = 0;
|
||||
|
||||
server.cluster = zmalloc(sizeof(clusterState));
|
||||
server.cluster = zmalloc(sizeof(struct clusterState));
|
||||
server.cluster->myself = NULL;
|
||||
server.cluster->currentEpoch = 0;
|
||||
server.cluster->state = CLUSTER_FAIL;
|
||||
@ -7656,3 +7656,43 @@ unsigned int countChannelsInSlot(unsigned int hashslot) {
|
||||
raxStop(&iter);
|
||||
return j;
|
||||
}
|
||||
|
||||
int clusterNodeIsMyself(clusterNode *n) {
|
||||
return n == server.cluster->myself;
|
||||
}
|
||||
|
||||
clusterNode* getMyClusterNode(void) {
|
||||
return server.cluster->myself;
|
||||
}
|
||||
|
||||
int clusterManualFailoverTimeLimit(void) {
|
||||
return server.cluster->mf_end;
|
||||
}
|
||||
|
||||
char* getMyClusterId(void) {
|
||||
return server.cluster->myself->name;
|
||||
}
|
||||
|
||||
int getClusterSize(void) {
|
||||
return dictSize(server.cluster->nodes);
|
||||
}
|
||||
|
||||
char** getClusterNodesList(size_t *numnodes) {
|
||||
size_t count = dictSize(server.cluster->nodes);
|
||||
char **ids = zmalloc((count+1)*CLUSTER_NAMELEN);
|
||||
dictIterator *di = dictGetIterator(server.cluster->nodes);
|
||||
dictEntry *de;
|
||||
int j = 0;
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
clusterNode *node = dictGetVal(de);
|
||||
if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue;
|
||||
ids[j] = zmalloc(CLUSTER_NAMELEN);
|
||||
memcpy(ids[j],node->name,CLUSTER_NAMELEN);
|
||||
j++;
|
||||
}
|
||||
*numnodes = j;
|
||||
ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need
|
||||
* to also get the count argument. */
|
||||
dictReleaseIterator(di);
|
||||
return ids;
|
||||
}
|
||||
|
@ -211,4 +211,54 @@ static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset");
|
||||
master is up. */
|
||||
#define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */
|
||||
|
||||
struct clusterState {
|
||||
clusterNode *myself; /* This node */
|
||||
uint64_t currentEpoch;
|
||||
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
|
||||
int size; /* Num of master nodes with at least one slot */
|
||||
dict *nodes; /* Hash table of name -> clusterNode structures */
|
||||
dict *shards; /* Hash table of shard_id -> list (of nodes) structures */
|
||||
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
|
||||
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
|
||||
clusterNode *importing_slots_from[CLUSTER_SLOTS];
|
||||
clusterNode *slots[CLUSTER_SLOTS];
|
||||
rax *slots_to_channels;
|
||||
/* The following fields are used to take the slave state on elections. */
|
||||
mstime_t failover_auth_time; /* Time of previous or next election. */
|
||||
int failover_auth_count; /* Number of votes received so far. */
|
||||
int failover_auth_sent; /* True if we already asked for votes. */
|
||||
int failover_auth_rank; /* This slave rank for current auth request. */
|
||||
uint64_t failover_auth_epoch; /* Epoch of the current election. */
|
||||
int cant_failover_reason; /* Why a slave is currently not able to
|
||||
failover. See the CANT_FAILOVER_* macros. */
|
||||
/* Manual failover state in common. */
|
||||
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
|
||||
It is zero if there is no MF in progress. */
|
||||
/* Manual failover state of master. */
|
||||
clusterNode *mf_slave; /* Slave performing the manual failover. */
|
||||
/* Manual failover state of slave. */
|
||||
long long mf_master_offset; /* Master offset the slave needs to start MF
|
||||
or -1 if still not received. */
|
||||
int mf_can_start; /* If non-zero signal that the manual failover
|
||||
can start requesting masters vote. */
|
||||
/* The following fields are used by masters to take state on elections. */
|
||||
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
|
||||
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
|
||||
/* Stats */
|
||||
/* Messages received and sent by type. */
|
||||
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
|
||||
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
|
||||
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
|
||||
excluding nodes without address. */
|
||||
unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
|
||||
|
||||
/* Bit map for slots that are no longer claimed by the owner in cluster PING
|
||||
* messages. During slot migration, the owner will stop claiming the slot after
|
||||
* the ownership transfer. Set the bit corresponding to the slot when a node
|
||||
* stops claiming the slot. This prevents spreading incorrect information (that
|
||||
* source still owns the slot) using UPDATE messages. */
|
||||
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
|
||||
};
|
||||
|
||||
|
||||
#endif //CLUSTER_LEGACY_H
|
||||
|
2
src/db.c
2
src/db.c
@ -2197,7 +2197,7 @@ int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_exp
|
||||
dict *d;
|
||||
if (server.cluster_enabled) {
|
||||
for (int i = 0; i < CLUSTER_SLOTS; i++) {
|
||||
if (clusterNodeGetSlotBit(server.cluster->myself, i)) {
|
||||
if (clusterNodeGetSlotBit(getMyClusterNode(), i)) {
|
||||
/* We don't know exact number of keys that would fall into each slot, but we can approximate it, assuming even distribution. */
|
||||
if (keyType == DB_MAIN) {
|
||||
d = db->dict[i];
|
||||
|
24
src/module.c
24
src/module.c
@ -6466,7 +6466,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
|
||||
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);
|
||||
c->flags |= ctx->client->flags & (CLIENT_READONLY|CLIENT_ASKING);
|
||||
if (getNodeByQuery(c,c->cmd,c->argv,c->argc,NULL,&error_code) !=
|
||||
server.cluster->myself)
|
||||
getMyClusterNode())
|
||||
{
|
||||
sds msg = NULL;
|
||||
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
|
||||
@ -8917,23 +8917,7 @@ char **RM_GetClusterNodesList(RedisModuleCtx *ctx, size_t *numnodes) {
|
||||
UNUSED(ctx);
|
||||
|
||||
if (!server.cluster_enabled) return NULL;
|
||||
size_t count = dictSize(server.cluster->nodes);
|
||||
char **ids = zmalloc((count+1)*REDISMODULE_NODE_ID_LEN);
|
||||
dictIterator *di = dictGetIterator(server.cluster->nodes);
|
||||
dictEntry *de;
|
||||
int j = 0;
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
clusterNode *node = dictGetVal(de);
|
||||
if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue;
|
||||
ids[j] = zmalloc(REDISMODULE_NODE_ID_LEN);
|
||||
memcpy(ids[j],node->name,REDISMODULE_NODE_ID_LEN);
|
||||
j++;
|
||||
}
|
||||
*numnodes = j;
|
||||
ids[j] = NULL; /* Null term so that FreeClusterNodesList does not need
|
||||
* to also get the count argument. */
|
||||
dictReleaseIterator(di);
|
||||
return ids;
|
||||
return getClusterNodesList(numnodes);
|
||||
}
|
||||
|
||||
/* Free the node list obtained with RedisModule_GetClusterNodesList. */
|
||||
@ -8947,7 +8931,7 @@ void RM_FreeClusterNodesList(char **ids) {
|
||||
* is disabled. */
|
||||
const char *RM_GetMyClusterID(void) {
|
||||
if (!server.cluster_enabled) return NULL;
|
||||
return server.cluster->myself->name;
|
||||
return getMyClusterId();
|
||||
}
|
||||
|
||||
/* Return the number of nodes in the cluster, regardless of their state
|
||||
@ -8956,7 +8940,7 @@ const char *RM_GetMyClusterID(void) {
|
||||
* cluster mode, zero is returned. */
|
||||
size_t RM_GetClusterSize(void) {
|
||||
if (!server.cluster_enabled) return 0;
|
||||
return dictSize(server.cluster->nodes);
|
||||
return getClusterSize();
|
||||
}
|
||||
|
||||
/* Populate the specified info for the node having as ID the specified 'id',
|
||||
|
@ -3774,7 +3774,7 @@ void replicationCron(void) {
|
||||
* match the one stored into 'mf_master_offset' state. */
|
||||
int manual_failover_in_progress =
|
||||
((server.cluster_enabled &&
|
||||
server.cluster->mf_end) ||
|
||||
clusterManualFailoverTimeLimit()) ||
|
||||
server.failover_end_time) &&
|
||||
isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA);
|
||||
|
||||
|
@ -429,7 +429,7 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or
|
||||
c->flags &= ~(CLIENT_READONLY | CLIENT_ASKING);
|
||||
c->flags |= original_c->flags & (CLIENT_READONLY | CLIENT_ASKING);
|
||||
int hashslot = -1;
|
||||
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != server.cluster->myself) {
|
||||
if (getNodeByQuery(c, c->cmd, c->argv, c->argc, &hashslot, &error_code) != getMyClusterNode()) {
|
||||
if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
|
||||
*err = sdsnew(
|
||||
"Script attempted to execute a write command while the "
|
||||
|
@ -4037,7 +4037,7 @@ int processCommand(client *c) {
|
||||
int error_code;
|
||||
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
|
||||
&c->slot,&error_code);
|
||||
if (n == NULL || n != server.cluster->myself) {
|
||||
if (n == NULL || !clusterNodeIsMyself(n)) {
|
||||
if (c->cmd->proc == execCommand) {
|
||||
discardTransaction(c);
|
||||
} else {
|
||||
@ -6838,7 +6838,7 @@ int redisIsSupervised(int mode) {
|
||||
|
||||
int iAmMaster(void) {
|
||||
return ((!server.cluster_enabled && server.masterhost == NULL) ||
|
||||
(server.cluster_enabled && nodeIsMaster(server.cluster->myself)));
|
||||
(server.cluster_enabled && nodeIsMaster(getMyClusterNode())));
|
||||
}
|
||||
|
||||
#ifdef REDIS_TEST
|
||||
|
@ -738,6 +738,7 @@ struct RedisModuleCtx;
|
||||
struct moduleLoadQueueEntry;
|
||||
struct RedisModuleKeyOptCtx;
|
||||
struct RedisModuleCommand;
|
||||
struct clusterState;
|
||||
|
||||
/* Each module type implementation should export a set of methods in order
|
||||
* to serialize and deserialize the value in the RDB file, rewrite the AOF
|
||||
|
Loading…
x
Reference in New Issue
Block a user