It compiles and doesn't crash immediately!

Former-commit-id: efaeca588717ca7cd44aa3502672d158acd94a6d
This commit is contained in:
John Sully 2019-04-02 16:47:05 -04:00
parent ff9d1d20e6
commit 5d3c28a902
13 changed files with 744 additions and 528 deletions

View File

@ -77,6 +77,12 @@ uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
/* Return the one and only entry of server.masters.
 *
 * The function asserts that the list holds exactly one master, so it must
 * only be called on paths where a master is known to be configured: an empty
 * list (or a list with several masters) trips the assertion instead of
 * yielding NULL. Callers that may run without a master have to check
 * listLength(server.masters) themselves before calling. */
struct redisMaster *getFirstMaster(void)
{
    serverAssert(listLength(server.masters) == 1);
    return listFirst(server.masters)->value;
}
/* -----------------------------------------------------------------------------
* Initialization
* -------------------------------------------------------------------------- */
@ -536,7 +542,11 @@ void clusterReset(int hard) {
/* Turn into master. */
if (nodeIsSlave(myself)) {
clusterSetNodeAsMaster(myself);
replicationUnsetMaster();
if (listLength(server.masters) > 0)
{
serverAssert(listLength(server.masters) == 1);
replicationUnsetMaster(listFirst(server.masters)->value);
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
}
@ -623,7 +633,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
/* If the server is starting up, don't accept cluster connections:
* UPDATE messages may interact with the database content. */
if (server.masterhost == NULL && server.loading) return;
if (listLength(server.masters) == 0 && server.loading) return;
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
@ -1493,7 +1503,12 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
/* Check if this is our master and we have to change the
* replication target as well. */
if (nodeIsSlave(myself) && myself->slaveof == node)
replicationSetMaster(node->ip, node->port);
{
serverAssert(listLength(server.masters) == 1);
replicationUnsetMaster(listFirst(server.masters)->value);
replicationAddMaster(node->ip, node->port);
}
return 1;
}
@ -2296,7 +2311,7 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
/* Set the replication offset. */
if (nodeIsSlave(myself))
offset = replicationGetSlaveOffset();
offset = replicationGetSlaveOffset(getFirstMaster());
else
offset = server.master_repl_offset;
hdr->offset = htonu64(offset);
@ -2827,7 +2842,7 @@ int clusterGetSlaveRank(void) {
master = myself->slaveof;
if (master == NULL) return 0; /* Never called by slaves without master. */
myoffset = replicationGetSlaveOffset();
myoffset = replicationGetSlaveOffset(getFirstMaster());
for (j = 0; j < master->numslaves; j++)
if (master->slaves[j] != myself &&
!nodeCantFailover(master->slaves[j]) &&
@ -2913,7 +2928,7 @@ void clusterFailoverReplaceYourMaster(void) {
/* 1) Turn this node into a master. */
clusterSetNodeAsMaster(myself);
replicationUnsetMaster();
replicationUnsetMaster(getFirstMaster());
/* 2) Claim all the slots assigned to our master. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
@ -2985,11 +3000,11 @@ void clusterHandleSlaveFailover(void) {
/* Set data_age to the number of seconds we are disconnected from
* the master. */
if (server.repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
if (getFirstMaster()->repl_state == REPL_STATE_CONNECTED) {
data_age = (mstime_t)(server.unixtime - getFirstMaster()->master->lastinteraction)
* 1000;
} else {
data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
data_age = (mstime_t)(server.unixtime - getFirstMaster()->repl_down_since) * 1000;
}
/* Remove the node timeout from the data age as it is fine that we are
@ -3038,7 +3053,7 @@ void clusterHandleSlaveFailover(void) {
"(rank #%d, offset %lld).",
server.cluster->failover_auth_time - mstime(),
server.cluster->failover_auth_rank,
replicationGetSlaveOffset());
replicationGetSlaveOffset(getFirstMaster()));
/* Now that we have a scheduled election, broadcast our offset
* to all the other slaves so that they'll update their offsets
* if our offset is better. */
@ -3291,7 +3306,7 @@ void clusterHandleManualFailover(void) {
if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */
if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
if (server.cluster->mf_master_offset == replicationGetSlaveOffset(getFirstMaster())) {
/* Our replication offset matches the master replication offset
* announced after clients were paused. We can start the failover. */
server.cluster->mf_can_start = 1;
@ -3559,11 +3574,12 @@ void clusterCron(void) {
* enable it if we know the address of our master and it appears to
* be up. */
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
listLength(server.masters) == 0 &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
replicationUnsetMaster(getFirstMaster());
replicationAddMaster(myself->slaveof->ip, myself->slaveof->port);
}
/* Abort a manual failover if the timeout is reached. */
@ -3943,7 +3959,9 @@ void clusterSetMaster(clusterNode *n) {
}
myself->slaveof = n;
clusterNodeAddSlave(n,myself);
replicationSetMaster(n->ip, n->port);
if (listLength(server.masters))
replicationUnsetMaster(getFirstMaster());
replicationAddMaster(n->ip, n->port);
resetManualFailover();
}

View File

@ -354,9 +354,7 @@ void loadServerConfigFromString(char *config) {
} else if ((!strcasecmp(argv[0],"slaveof") ||
!strcasecmp(argv[0],"replicaof")) && argc == 3) {
slaveof_linenum = linenum;
server.masterhost = sdsnew(argv[1]);
server.masterport = atoi(argv[2]);
server.repl_state = REPL_STATE_CONNECT;
replicationAddMaster(sdsnew(argv[1]), atoi(argv[2]));
} else if ((!strcasecmp(argv[0],"repl-ping-slave-period") ||
!strcasecmp(argv[0],"repl-ping-replica-period")) &&
argc == 2)
@ -400,11 +398,11 @@ void loadServerConfigFromString(char *config) {
goto loaderr;
}
} else if (!strcasecmp(argv[0],"masteruser") && argc == 2) {
zfree(server.masteruser);
server.masteruser = argv[1][0] ? zstrdup(argv[1]) : NULL;
zfree(server.default_masteruser);
server.default_masteruser = argv[1][0] ? zstrdup(argv[1]) : NULL;
} else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
zfree(server.masterauth);
server.masterauth = argv[1][0] ? zstrdup(argv[1]) : NULL;
zfree(server.default_masterauth);
server.default_masterauth = argv[1][0] ? zstrdup(argv[1]) : NULL;
} else if ((!strcasecmp(argv[0],"slave-serve-stale-data") ||
!strcasecmp(argv[0],"replica-serve-stale-data"))
&& argc == 2)
@ -866,7 +864,7 @@ void loadServerConfigFromString(char *config) {
}
/* Sanity checks. */
if (server.cluster_enabled && server.masterhost) {
if (server.cluster_enabled && listLength(server.masters)) {
linenum = slaveof_linenum;
i = linenum-1;
err = "replicaof directive not allowed in cluster mode";
@ -986,11 +984,11 @@ void configSetCommand(client *c) {
ACLSetUser(DefaultUser,aclop,sdslen(aclop));
sdsfree(aclop);
} config_set_special_field("masteruser") {
zfree(server.masteruser);
server.masteruser = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL;
zfree(server.default_masteruser);
server.default_masteruser = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL;
} config_set_special_field("masterauth") {
zfree(server.masterauth);
server.masterauth = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL;
zfree(server.default_masterauth);
server.default_masterauth = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL;
} config_set_special_field("cluster-announce-ip") {
zfree(server.cluster_announce_ip);
server.cluster_announce_ip = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL;
@ -1405,8 +1403,8 @@ void configGetCommand(client *c) {
/* String values */
config_get_string_field("dbfilename",server.rdb_filename);
config_get_string_field("masteruser",server.masteruser);
config_get_string_field("masterauth",server.masterauth);
config_get_string_field("masteruser",server.default_masteruser);
config_get_string_field("masterauth",server.default_masterauth);
config_get_string_field("cluster-announce-ip",server.cluster_announce_ip);
config_get_string_field("unixsocket",server.unixsocket);
config_get_string_field("logfile",server.logfile);
@ -1619,14 +1617,25 @@ void configGetCommand(client *c) {
char *optname = stringmatch(pattern,"slaveof",1) ?
"slaveof" : "replicaof";
char buf[256];
addReplyBulkCString(c,optname);
if (server.masterhost)
snprintf(buf,sizeof(buf),"%s %d",
server.masterhost, server.masterport);
else
if (listLength(server.masters) == 0)
{
buf[0] = '\0';
addReplyBulkCString(c,optname);
addReplyBulkCString(c,buf);
}
else
{
listIter li;
listNode *ln;
listRewind(server.masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
snprintf(buf,sizeof(buf),"%s %d",
mi->masterhost, mi->masterport);
addReplyBulkCString(c,buf);
}
}
matches++;
}
if (stringmatch(pattern,"notify-keyspace-events",1)) {
@ -2019,18 +2028,26 @@ void rewriteConfigDirOption(struct rewriteConfigState *state) {
/* Rewrite the slaveof option.
 *
 * state  - rewrite state handed through to the rewriteConfig* helpers.
 * option - directive name to emit at the start of each rewritten line.
 *
 * When the early-exit condition below holds, the option is only marked as
 * processed and nothing is emitted. Otherwise server.masters is walked and
 * one "<option> <host> <port>" line is passed to rewriteConfigRewriteLine()
 * per list entry.
 *
 * NOTE(review): this span carries both the pre-change and the post-change
 * lines of the hunk (two `sds line` declarations, two `if` conditions, two
 * argument lists for sdscatprintf). The listLength(server.masters) /
 * mi->masterhost forms appear to be the current ones -- confirm against the
 * real file before relying on this text. */
void rewriteConfigSlaveofOption(struct rewriteConfigState *state, char *option) {
sds line;
/* If this is a master, we want all the slaveof config options
* in the file to be removed. Note that if this is a cluster instance
* we don't want a slaveof directive inside redis.conf. */
if (server.cluster_enabled || server.masterhost == NULL) {
if (server.cluster_enabled || listLength(server.masters) == 0) {
rewriteConfigMarkAsProcessed(state,option);
return;
}
/* Emit one directive per master in the list. */
listIter li;
listNode *ln;
listRewind(server.masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
sds line;
line = sdscatprintf(sdsempty(),"%s %s %d", option,
server.masterhost, server.masterport);
mi->masterhost, mi->masterport);
rewriteConfigRewriteLine(state,option,line,1);
}
}
/* Rewrite the notify-keyspace-events option. */
@ -2285,8 +2302,8 @@ int rewriteConfig(char *path) {
rewriteConfigDirOption(state);
rewriteConfigSlaveofOption(state,"replicaof");
rewriteConfigStringOption(state,"replica-announce-ip",server.slave_announce_ip,CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP);
rewriteConfigStringOption(state,"masteruser",server.masteruser,NULL);
rewriteConfigStringOption(state,"masterauth",server.masterauth,NULL);
rewriteConfigStringOption(state,"masteruser",server.default_masteruser,NULL);
rewriteConfigStringOption(state,"masterauth",server.default_masterauth,NULL);
rewriteConfigStringOption(state,"cluster-announce-ip",server.cluster_announce_ip,NULL);
rewriteConfigYesNoOption(state,"replica-serve-stale-data",server.repl_serve_stale_data,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA);
rewriteConfigYesNoOption(state,"replica-read-only",server.repl_slave_ro,CONFIG_DEFAULT_SLAVE_READ_ONLY);

View File

@ -105,7 +105,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
/* Key expired. If we are in the context of a master, expireIfNeeded()
* returns 0 only when the key does not exist at all, so it's safe
* to return NULL ASAP. */
if (server.masterhost == NULL) {
if (listLength(server.masters) == 0) {
server.stat_keyspace_misses++;
return NULL;
}
@ -123,7 +123,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
*
* Notably this covers GETs when slaves are used to scale reads. */
if (server.current_client &&
server.current_client != server.master &&
!FActiveMaster(server.current_client) &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
@ -276,7 +276,7 @@ robj *dbRandomKey(redisDb *db) {
key = dictGetKey(de);
keyobj = createStringObject(key,sdslen(key));
if (dictFind(db->expires,key)) {
if (allvolatile && server.masterhost && --maxtries == 0) {
if (allvolatile && listLength(server.masters) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
* expired in the slave, so the function cannot stop because
@ -1109,7 +1109,7 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) {
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
int writable_slave = listLength(server.masters) && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
@ -1203,7 +1203,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return 1;
if (listLength(server.masters)) return 1;
/* Delete the key */
server.stat_expiredkeys++;

View File

@ -448,7 +448,7 @@ int freeMemoryIfNeeded(void) {
serverAssert(GlobalLocksAcquired());
/* By default replicas should ignore maxmemory
* and just be masters exact copies. */
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
if (listLength(server.masters) && server.repl_slave_ignore_maxmemory) return C_OK;
size_t mem_reported, mem_tofree, mem_freed;
mstime_t latency, eviction_latency;

View File

@ -424,7 +424,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
*
* Instead we take the other branch of the IF statement setting an expire
* (possibly in the past) and wait for an explicit DEL from the master. */
if (when <= mstime() && !server.loading && !server.masterhost) {
if (when <= mstime() && !server.loading && !listLength(server.masters)) {
robj *aux;
int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :

View File

@ -1448,7 +1448,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
flags |= REDISMODULE_CTX_FLAGS_RDB;
/* Replication flags */
if (server.masterhost == NULL) {
if (listLength(server.masters) == 0) {
flags |= REDISMODULE_CTX_FLAGS_MASTER;
} else {
flags |= REDISMODULE_CTX_FLAGS_SLAVE;

View File

@ -122,7 +122,7 @@ void execCommand(client *c) {
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
int was_master = server.masterhost == NULL;
int was_master = listLength(server.masters) == 0;
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
@ -147,7 +147,7 @@ void execCommand(client *c) {
* was initiated when the instance was a master or a writable replica and
* then the configuration changed (for example instance was turned into
* a replica). */
if (!server.loading && server.masterhost && server.repl_slave_ro &&
if (!server.loading && listLength(server.masters) && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
{
addReplyError(c,
@ -193,7 +193,7 @@ void execCommand(client *c) {
/* Make sure the EXEC command will be propagated as well if MULTI
* was already propagated. */
if (must_propagate) {
int is_master = server.masterhost == NULL;
int is_master = listLength(server.masters) == 0;
server.dirty++;
/* If inside the MULTI/EXEC block this instance was suddenly
* switched from master to slave (using the SLAVEOF command), the

View File

@ -1273,13 +1273,13 @@ void freeClient(client *c) {
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.master && c->flags & CLIENT_MASTER) {
if (FActiveMaster(c)) {
serverLog(LL_WARNING,"Connection with master lost.");
if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
CLIENT_CLOSE_ASAP|
CLIENT_BLOCKED)))
{
replicationCacheMaster(c);
replicationCacheMaster(MasterInfoFromClient(c), c);
return;
}
}
@ -1339,7 +1339,7 @@ void freeClient(client *c) {
/* Master/slave cleanup Case 2:
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(MasterInfoFromClient(c));
/* If this client was scheduled for async freeing we need to remove it
* from the queue. */
@ -2564,7 +2564,7 @@ void helloCommand(client *c) {
if (!server.sentinel_mode) {
addReplyBulkCString(c,"role");
addReplyBulkCString(c,server.masterhost ? "replica" : "master");
addReplyBulkCString(c,listLength(server.masters) ? "replica" : "master");
}
addReplyBulkCString(c,"modules");

View File

@ -1859,8 +1859,15 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
* our cached time since it is used to create and update the last
* interaction time with clients and for other important things. */
updateCachedTime();
if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster();
listIter li;
listNode *ln;
listRewind(server.masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->repl_state == REPL_STATE_TRANSFER)
replicationSendNewlineToMaster(mi);
}
loadingProgress(r->processed_bytes);
processEventsWhileBlocked(serverTL - server.rgthreadvar);
}
@ -2050,7 +2057,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
* received from the master. In the latter case, the master is
* responsible for key expiry. If we would expire keys here, the
* snapshot taken by the master may not be reflected on the slave. */
if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) {
if (listLength(server.masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) {
decrRefCount(key);
decrRefCount(val);
} else {
@ -2528,7 +2535,7 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
* connects to us, the NULL repl_backlog will trigger a full
* synchronization, at the same time we will use a new replid and clear
* replid2. */
if (!server.masterhost && server.repl_backlog) {
if (!listLength(server.masters) && server.repl_backlog) {
/* Note that when server.slaveseldb is -1, it means that this master
* didn't apply any write commands after a full synchronization.
* So we can let repl_stream_db be 0, this allows a restarted slave
@ -2538,10 +2545,17 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
return rsi;
}
if (listLength(server.masters) > 1)
{
// BUGBUG, warn user about this incomplete implementation
serverLog(LL_WARNING, "Warning: Only backing up first master's information in RDB");
}
struct redisMaster *miFirst = listLength(server.masters) ? listNodeValue(listFirst(server.masters)) : NULL;
/* If the instance is a slave we need a connected master
* in order to fetch the currently selected DB. */
if (server.master) {
rsi->repl_stream_db = server.master->db->id;
if (miFirst && miFirst->master) {
rsi->repl_stream_db = miFirst->master->db->id;
return rsi;
}
@ -2550,8 +2564,8 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
* increment the master_repl_offset only from data arriving from the
* master, so if we are disconnected the offset in the cached master
* is valid. */
if (server.cached_master) {
rsi->repl_stream_db = server.cached_master->db->id;
if (miFirst && miFirst->cached_master) {
rsi->repl_stream_db = miFirst->cached_master->db->id;
return rsi;
}
return NULL;

File diff suppressed because it is too large Load Diff

View File

@ -532,7 +532,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
luaPushError(lua,
"Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
goto cleanup;
} else if (server.masterhost && server.repl_slave_ro &&
} else if (listLength(server.masters) && server.repl_slave_ro &&
!server.loading &&
!(server.lua_caller->flags & CLIENT_MASTER))
{
@ -558,7 +558,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
* in the middle. */
if (server.maxmemory && /* Maxmemory is actually enabled. */
!server.loading && /* Don't care about mem if loading. */
!server.masterhost && /* Slave must execute the script. */
!listLength(server.masters) && /* Slave must execute the script. */
server.lua_write_dirty == 0 && /* Script had no side effects so far. */
(cmd->flags & CMD_DENYOOM))
{
@ -1420,8 +1420,15 @@ void evalGenericCommand(client *c, int evalsha) {
/* Restore the client that was protected when the script timeout
* was detected. */
unprotectClient(c);
if (server.masterhost && server.master)
queueClientForReprocessing(server.master);
listIter li;
listNode *ln;
listRewind(server.masters, &li);
while ((ln = listNext(&li)))
{
struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln);
if (mi->master)
queueClientForReprocessing(mi->master);
}
}
server.lua_caller = NULL;

View File

@ -1046,7 +1046,7 @@ void serverLogRaw(int level, const char *msg) {
} else if (pid != server.pid) {
role_char = 'C'; /* RDB / AOF writing child. */
} else {
role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */
role_char = (listLength(server.masters) ? 'S':'M'); /* Slave or Master. */
}
fprintf(fp,"%d:%c %s %c %s\n",
(int)getpid(),role_char, buf,c[level],msg);
@ -1687,9 +1687,9 @@ void clientsCron(int iel) {
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled && server.masterhost == NULL) {
if (server.active_expire_enabled && listLength(server.masters) == 0) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else if (server.masterhost != NULL) {
} else if (listLength(server.masters)) {
expireSlaveKeys();
}
@ -2074,7 +2074,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && server.masterhost == NULL)
if (server.active_expire_enabled && listLength(server.masters) == 0)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
/* Send all the slaves an ACK request if at least one client blocked
@ -2254,6 +2254,19 @@ extern "C" void createSharedObjects(void) {
shared.maxstring = sdsnew("maxstring");
}
/* Put a redisMaster record into its "no master configured, never connected"
 * state.
 *
 * master - record to initialise; it is written in place and nothing is
 *          allocated or freed here, so any pointers it previously held are
 *          simply overwritten.
 *
 * Only the fields assigned below are touched. The PSYNC replid, the
 * repl_transfer_* bookkeeping and master_uuid are left as they were. */
void initMasterInfo(redisMaster *master)
{
    /* Credentials and address: nothing configured yet. masteruser is cleared
     * together with masterauth so that neither pointer is left indeterminate
     * when the record comes from uninitialised memory. */
    master->masteruser = NULL;
    master->masterauth = NULL;
    master->masterhost = NULL;
    master->masterport = 6379;

    /* No live or cached master client. */
    master->master = NULL;
    master->cached_master = NULL;

    /* Replication link state. */
    master->master_initial_offset = -1;
    master->repl_state = REPL_STATE_NONE;
    master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
}
void initServerConfig(void) {
int j;
@ -2383,19 +2396,12 @@ void initServerConfig(void) {
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
/* Replication related */
server.masterauth = NULL;
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.masters = listCreate();
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA;
server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY;
server.repl_slave_ignore_maxmemory = CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY;
server.repl_slave_lazy_flush = CONFIG_DEFAULT_SLAVE_LAZY_FLUSH;
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY;
server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC;
server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY;
@ -3504,7 +3510,7 @@ int processCommand(client *c) {
* and if this is a master instance. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
listLength(server.masters) == 0 &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand))
{
@ -3521,7 +3527,7 @@ int processCommand(client *c) {
/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
if (server.masterhost == NULL &&
if (listLength(server.masters) == 0 &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
@ -3534,7 +3540,7 @@ int processCommand(client *c) {
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
if (listLength(server.masters) && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
@ -3557,7 +3563,7 @@ int processCommand(client *c) {
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* link with master. */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
if (FBrokenLinkToMaster() &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
{
@ -4284,46 +4290,57 @@ extern "C" sds genRedisInfoString(const char *section) {
info = sdscatprintf(info,
"# Replication\r\n"
"role:%s\r\n",
server.masterhost == NULL ? "master" : "slave");
if (server.masterhost) {
long long slave_repl_offset = 1;
listLength(server.masters) == 0 ? "master" : "slave");
if (listLength(server.masters)) {
listIter li;
listNode *ln;
listRewind(server.masters, &li);
if (server.master)
slave_repl_offset = server.master->reploff;
else if (server.cached_master)
slave_repl_offset = server.cached_master->reploff;
int cmasters = 0;
while ((ln = listNext(&li)))
{
long long slave_repl_offset = 1;
redisMaster *mi = (redisMaster*)listNodeValue(ln);
info = sdscatprintf(info, "Master %d: \r\n", cmasters);
++cmasters;
if (mi->master)
slave_repl_offset = mi->master->reploff;
else if (mi->cached_master)
slave_repl_offset = mi->cached_master->reploff;
info = sdscatprintf(info,
"master_host:%s\r\n"
"master_port:%d\r\n"
"master_link_status:%s\r\n"
"master_last_io_seconds_ago:%d\r\n"
"master_sync_in_progress:%d\r\n"
"slave_repl_offset:%lld\r\n"
,server.masterhost,
server.masterport,
(server.repl_state == REPL_STATE_CONNECTED) ?
"\tmaster_host:%s\r\n"
"\tmaster_port:%d\r\n"
"\tmaster_link_status:%s\r\n"
"\tmaster_last_io_seconds_ago:%d\r\n"
"\tmaster_sync_in_progress:%d\r\n"
"\tslave_repl_offset:%lld\r\n"
,mi->masterhost,
mi->masterport,
(mi->repl_state == REPL_STATE_CONNECTED) ?
"up" : "down",
server.master ?
((int)(server.unixtime-server.master->lastinteraction)) : -1,
server.repl_state == REPL_STATE_TRANSFER,
mi->master ?
((int)(server.unixtime-mi->master->lastinteraction)) : -1,
mi->repl_state == REPL_STATE_TRANSFER,
slave_repl_offset
);
if (server.repl_state == REPL_STATE_TRANSFER) {
if (mi->repl_state == REPL_STATE_TRANSFER) {
info = sdscatprintf(info,
"master_sync_left_bytes:%lld\r\n"
"master_sync_last_io_seconds_ago:%d\r\n"
"\tmaster_sync_left_bytes:%lld\r\n"
"\tmaster_sync_last_io_seconds_ago:%d\r\n"
, (long long)
(server.repl_transfer_size - server.repl_transfer_read),
(int)(server.unixtime-server.repl_transfer_lastio)
(mi->repl_transfer_size - mi->repl_transfer_read),
(int)(server.unixtime-mi->repl_transfer_lastio)
);
}
if (server.repl_state != REPL_STATE_CONNECTED) {
if (mi->repl_state != REPL_STATE_CONNECTED) {
info = sdscatprintf(info,
"master_link_down_since_seconds:%jd\r\n",
(intmax_t)server.unixtime-server.repl_down_since);
"\tmaster_link_down_since_seconds:%jd\r\n",
(intmax_t)server.unixtime-mi->repl_down_since);
}
}
info = sdscatprintf(info,
"slave_priority:%d\r\n"
@ -4694,7 +4711,7 @@ void loadDataFromDisk(void) {
(float)(ustime()-start)/1000000);
/* Restore the replication ID / offset from the RDB file. */
if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&&
if ((listLength(server.masters) || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&&
rsi.repl_id_is_set &&
rsi.repl_offset != -1 &&
/* Note that older implementations may save a repl_stream_db
@ -4704,11 +4721,19 @@ void loadDataFromDisk(void) {
{
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
server.master_repl_offset = rsi.repl_offset;
listIter li;
listNode *ln;
listRewind(server.masters, &li);
while ((ln = listNext(&li)))
{
redisMaster *mi = (redisMaster*)listNodeValue(ln);
/* If we are a slave, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself();
selectDb(server.cached_master,rsi.repl_stream_db);
replicationCacheMasterUsingMyself(mi);
selectDb(mi->cached_master,rsi.repl_stream_db);
}
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));

View File

@ -1070,6 +1070,33 @@ struct redisServerThreadVars {
struct fastlock lockPendingWrite;
};
/* Replication state kept for one upstream master: its address and
 * credentials, the client objects representing the link, and the bookkeeping
 * for an in-progress synchronisation. Records of this type are the values
 * stored in the server.masters list. */
struct redisMaster {
char *masteruser; /* AUTH with this user and masterauth with master */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
client *cached_master; /* Cached master to be reused for PSYNC. */
client *master; /* Client that is master for this slave */
/* The following two fields are where we store the master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the master client structure.
* NOTE(review): the original wording referred to server->master, which this
* change removes; presumably the `master` client above is meant -- confirm. */
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */
int repl_state; /* Replication status if the instance is a slave */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
int repl_transfer_s; /* Slave -> Master SYNC socket */
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
time_t repl_down_since; /* Unix time at which link with master went down */
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
/* After we've connected with our master use the UUID in server.master
* NOTE(review): server.master no longer exists after this change; presumably
* the `master` client of this record is meant -- confirm. */
};
struct redisServer {
/* General */
pid_t pid; /* Main process pid. */
@ -1183,6 +1210,8 @@ struct redisServer {
int idx;
} inst_metric[STATS_METRIC_COUNT];
/* Configuration */
char *default_masteruser; /* AUTH with this user and masterauth with master */
char *default_masterauth; /* AUTH with this password with master */
int verbosity; /* Loglevel in redis.conf */
int maxidletime; /* Client timeout in seconds */
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
@ -1296,35 +1325,16 @@ struct redisServer {
int repl_diskless_sync; /* Send RDB to slaves sockets directly. */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
/* Replication (slave) */
char *masteruser; /* AUTH with this user and masterauth with master */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
list *masters;
int repl_timeout; /* Timeout after N seconds of master idle */
client *master; /* Client that is master for this slave */
client *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
int repl_transfer_s; /* Slave -> Master SYNC socket */
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
int repl_serve_stale_data; /* Serve stale data when link is down? */
int repl_slave_ro; /* Slave is read only? */
int repl_slave_ignore_maxmemory; /* If true slaves do not evict. */
time_t repl_down_since; /* Unix time at which link with master went down */
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
int slave_priority; /* Reported in INFO and used by Sentinel. */
int slave_announce_port; /* Give the master this listening port. */
char *slave_announce_ip; /* Give the master this ip address. */
/* The following two fields is where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->master client structure. */
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */
int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */
/* Replication script cache. */
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
@ -1436,8 +1446,6 @@ struct redisServer {
int fActiveReplica; /* Can this replica also be a master? */
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
/* After we've connected with our master use the UUID in server.master */
struct fastlock flock;
};
@ -1771,16 +1779,17 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void initMasterInfo(struct redisMaster *master);
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
void replicationHandleMasterDisconnection(void);
void replicationCacheMaster(client *c);
void replicationHandleMasterDisconnection(struct redisMaster *mi);
void replicationCacheMaster(struct redisMaster *mi, client *c);
void resizeReplicationBacklog(long long newsize);
void replicationSetMaster(char *ip, int port);
void replicationUnsetMaster(void);
struct redisMaster *replicationAddMaster(char *ip, int port);
void replicationUnsetMaster(struct redisMaster *mi);
void refreshGoodSlavesCount(void);
void replicationScriptCacheInit(void);
void replicationScriptCacheFlush(void);
@ -1789,15 +1798,15 @@ int replicationScriptCacheExists(sds sha1);
void processClientsWaitingReplicas(void);
void unblockClientWaitingReplicas(client *c);
int replicationCountAcksByOffset(long long offset);
void replicationSendNewlineToMaster(void);
long long replicationGetSlaveOffset(void);
void replicationSendNewlineToMaster(struct redisMaster *mi);
long long replicationGetSlaveOffset(struct redisMaster *mi);
char *replicationGetSlaveName(client *c);
long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *slave, long long offset);
void changeReplicationId(void);
void clearReplicationId2(void);
void chopReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
void replicationCacheMasterUsingMyself(struct redisMaster *mi);
void feedReplicationBacklog(void *ptr, size_t len);
/* Generic persistence functions */
@ -2350,6 +2359,10 @@ void xtrimCommand(client *c);
void lolwutCommand(client *c);
void aclCommand(client *c);
/* Helpers for the server.masters list.
 *
 * FBrokenLinkToMaster   - used by processCommand() where the code previously
 *                         tested "has a master and repl_state is not
 *                         REPL_STATE_CONNECTED". NOTE(review): how several
 *                         masters are combined (any vs. all) is not visible
 *                         here -- confirm in the definition.
 * FActiveMaster         - used where the code previously compared a client
 *                         against server.master / tested CLIENT_MASTER;
 *                         presumably non-zero when `c` is the live client of
 *                         one of the masters -- confirm.
 * MasterInfoFromClient  - maps a master client to the redisMaster record it
 *                         belongs to; the result is passed straight to
 *                         replicationCacheMaster() and
 *                         replicationHandleMasterDisconnection().
 *
 * FBrokenLinkToMaster is declared with (void) so that C translation units
 * get a real prototype rather than an unspecified parameter list. */
int FBrokenLinkToMaster(void);
int FActiveMaster(client *c);
struct redisMaster *MasterInfoFromClient(client *c);
#if defined(__GNUC__)
#ifndef __cplusplus
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));