From 5d3c28a90213dafb8eae5bc5f3d49631e490fea3 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 2 Apr 2019 16:47:05 -0400 Subject: [PATCH] It compiles and doesn't crash immediately! Former-commit-id: efaeca588717ca7cd44aa3502672d158acd94a6d --- src/cluster.c | 46 ++- src/config.c | 79 ++-- src/db.c | 10 +- src/evict.c | 2 +- src/expire.c | 2 +- src/module.c | 2 +- src/multi.c | 6 +- src/networking.cpp | 12 +- src/rdb.c | 30 +- src/replication.cpp | 858 +++++++++++++++++++++++++------------------- src/scripting.cpp | 15 +- src/server.cpp | 137 ++++--- src/server.h | 73 ++-- 13 files changed, 744 insertions(+), 528 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index ec92bc4e3..e4b5464e1 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -77,6 +77,12 @@ uint64_t clusterGetMaxEpoch(void); int clusterBumpConfigEpochWithoutConsensus(void); void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len); +struct redisMaster *getFirstMaster() +{ + serverAssert(listLength(server.masters) == 1); + return listFirst(server.masters)->value; +} + /* ----------------------------------------------------------------------------- * Initialization * -------------------------------------------------------------------------- */ @@ -536,7 +542,11 @@ void clusterReset(int hard) { /* Turn into master. */ if (nodeIsSlave(myself)) { clusterSetNodeAsMaster(myself); - replicationUnsetMaster(); + if (listLength(server.masters) > 0) + { + serverAssert(listLength(server.masters) == 1); + replicationUnsetMaster(listFirst(server.masters)->value); + } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); } @@ -623,7 +633,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { /* If the server is starting up, don't accept cluster connections: * UPDATE messages may interact with the database content. */ - if (server.masterhost == NULL && server.loading) return; + if (listLength(server.masters) == 0 && server.loading) return; while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); @@ -1493,7 +1503,12 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, /* Check if this is our master and we have to change the * replication target as well. */ if (nodeIsSlave(myself) && myself->slaveof == node) - replicationSetMaster(node->ip, node->port); + { + serverAssert(listLength(server.masters) == 1); + replicationUnsetMaster(listFirst(server.masters)->value); + replicationAddMaster(node->ip, node->port); + } + return 1; } @@ -2296,7 +2311,7 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { /* Set the replication offset. */ if (nodeIsSlave(myself)) - offset = replicationGetSlaveOffset(); + offset = replicationGetSlaveOffset(getFirstMaster()); else offset = server.master_repl_offset; hdr->offset = htonu64(offset); @@ -2827,7 +2842,7 @@ int clusterGetSlaveRank(void) { master = myself->slaveof; if (master == NULL) return 0; /* Never called by slaves without master. */ - myoffset = replicationGetSlaveOffset(); + myoffset = replicationGetSlaveOffset(getFirstMaster()); for (j = 0; j < master->numslaves; j++) if (master->slaves[j] != myself && !nodeCantFailover(master->slaves[j]) && @@ -2913,7 +2928,7 @@ void clusterFailoverReplaceYourMaster(void) { /* 1) Turn this node into a master. */ clusterSetNodeAsMaster(myself); - replicationUnsetMaster(); + replicationUnsetMaster(getFirstMaster()); /* 2) Claim all the slots assigned to our master. */ for (j = 0; j < CLUSTER_SLOTS; j++) { @@ -2985,11 +3000,11 @@ void clusterHandleSlaveFailover(void) { /* Set data_age to the number of seconds we are disconnected from * the master. */ - if (server.repl_state == REPL_STATE_CONNECTED) { - data_age = (mstime_t)(server.unixtime - server.master->lastinteraction) + if (getFirstMaster()->repl_state == REPL_STATE_CONNECTED) { + data_age = (mstime_t)(server.unixtime - getFirstMaster()->master->lastinteraction) * 1000; } else { - data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000; + data_age = (mstime_t)(server.unixtime - getFirstMaster()->repl_down_since) * 1000; } /* Remove the node timeout from the data age as it is fine that we are @@ -3038,7 +3053,7 @@ void clusterHandleSlaveFailover(void) { "(rank #%d, offset %lld).", server.cluster->failover_auth_time - mstime(), server.cluster->failover_auth_rank, - replicationGetSlaveOffset()); + replicationGetSlaveOffset(getFirstMaster())); /* Now that we have a scheduled election, broadcast our offset * to all the other slaves so that they'll updated their offsets * if our offset is better. */ @@ -3291,7 +3306,7 @@ void clusterHandleManualFailover(void) { if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */ - if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) { + if (server.cluster->mf_master_offset == replicationGetSlaveOffset(getFirstMaster())) { /* Our replication offset matches the master replication offset * announced after clients were paused. We can start the failover. */ server.cluster->mf_can_start = 1; @@ -3559,11 +3574,12 @@ void clusterCron(void) { * enable it if we know the address of our master and it appears to * be up. */ if (nodeIsSlave(myself) && - server.masterhost == NULL && + listLength(server.masters) == 0 && myself->slaveof && nodeHasAddr(myself->slaveof)) { - replicationSetMaster(myself->slaveof->ip, myself->slaveof->port); + replicationUnsetMaster(getFirstMaster()); + replicationAddMaster(myself->slaveof->ip, myself->slaveof->port); } /* Abourt a manual failover if the timeout is reached. */ @@ -3943,7 +3959,9 @@ void clusterSetMaster(clusterNode *n) { } myself->slaveof = n; clusterNodeAddSlave(n,myself); - replicationSetMaster(n->ip, n->port); + if (listLength(server.masters)) + replicationUnsetMaster(getFirstMaster()); + replicationAddMaster(n->ip, n->port); resetManualFailover(); } diff --git a/src/config.c b/src/config.c index dd8bdfdf8..ffafb14eb 100644 --- a/src/config.c +++ b/src/config.c @@ -354,9 +354,7 @@ void loadServerConfigFromString(char *config) { } else if ((!strcasecmp(argv[0],"slaveof") || !strcasecmp(argv[0],"replicaof")) && argc == 3) { slaveof_linenum = linenum; - server.masterhost = sdsnew(argv[1]); - server.masterport = atoi(argv[2]); - server.repl_state = REPL_STATE_CONNECT; + replicationAddMaster(sdsnew(argv[1]), atoi(argv[2])); } else if ((!strcasecmp(argv[0],"repl-ping-slave-period") || !strcasecmp(argv[0],"repl-ping-replica-period")) && argc == 2) @@ -400,11 +398,11 @@ void loadServerConfigFromString(char *config) { goto loaderr; } } else if (!strcasecmp(argv[0],"masteruser") && argc == 2) { - zfree(server.masteruser); - server.masteruser = argv[1][0] ? zstrdup(argv[1]) : NULL; + zfree(server.default_masteruser); + server.default_masteruser = argv[1][0] ? zstrdup(argv[1]) : NULL; } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) { - zfree(server.masterauth); - server.masterauth = argv[1][0] ? zstrdup(argv[1]) : NULL; + zfree(server.default_masterauth); + server.default_masterauth = argv[1][0] ? zstrdup(argv[1]) : NULL; } else if ((!strcasecmp(argv[0],"slave-serve-stale-data") || !strcasecmp(argv[0],"replica-serve-stale-data")) && argc == 2) @@ -866,7 +864,7 @@ void loadServerConfigFromString(char *config) { } /* Sanity checks. */ - if (server.cluster_enabled && server.masterhost) { + if (server.cluster_enabled && listLength(server.masters)) { linenum = slaveof_linenum; i = linenum-1; err = "replicaof directive not allowed in cluster mode"; @@ -986,11 +984,11 @@ void configSetCommand(client *c) { ACLSetUser(DefaultUser,aclop,sdslen(aclop)); sdsfree(aclop); } config_set_special_field("masteruser") { - zfree(server.masteruser); - server.masteruser = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL; + zfree(server.default_masteruser); + server.default_masteruser = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL; } config_set_special_field("masterauth") { - zfree(server.masterauth); - server.masterauth = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL; + zfree(server.default_masterauth); + server.default_masterauth = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL; } config_set_special_field("cluster-announce-ip") { zfree(server.cluster_announce_ip); server.cluster_announce_ip = ((char*)ptrFromObj(o))[0] ? zstrdup(ptrFromObj(o)) : NULL; @@ -1405,8 +1403,8 @@ void configGetCommand(client *c) { /* String values */ config_get_string_field("dbfilename",server.rdb_filename); - config_get_string_field("masteruser",server.masteruser); - config_get_string_field("masterauth",server.masterauth); + config_get_string_field("masteruser",server.default_masteruser); + config_get_string_field("masterauth",server.default_masterauth); config_get_string_field("cluster-announce-ip",server.cluster_announce_ip); config_get_string_field("unixsocket",server.unixsocket); config_get_string_field("logfile",server.logfile); @@ -1619,14 +1617,25 @@ void configGetCommand(client *c) { char *optname = stringmatch(pattern,"slaveof",1) ? "slaveof" : "replicaof"; char buf[256]; - - addReplyBulkCString(c,optname); - if (server.masterhost) - snprintf(buf,sizeof(buf),"%s %d", - server.masterhost, server.masterport); - else + if (listLength(server.masters) == 0) + { buf[0] = '\0'; - addReplyBulkCString(c,buf); + addReplyBulkCString(c,optname); + addReplyBulkCString(c,buf); + } + else + { + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); + snprintf(buf,sizeof(buf),"%s %d", + mi->masterhost, mi->masterport); + addReplyBulkCString(c,buf); + } + } matches++; } if (stringmatch(pattern,"notify-keyspace-events",1)) { @@ -2019,18 +2028,26 @@ void rewriteConfigDirOption(struct rewriteConfigState *state) { /* Rewrite the slaveof option. */ void rewriteConfigSlaveofOption(struct rewriteConfigState *state, char *option) { - sds line; - /* If this is a master, we want all the slaveof config options - * in the file to be removed. Note that if this is a cluster instance - * we don't want a slaveof directive inside redis.conf. */ - if (server.cluster_enabled || server.masterhost == NULL) { + * in the file to be removed. Note that if this is a cluster instance + * we don't want a slaveof directive inside redis.conf. */ + if (server.cluster_enabled || listLength(server.masters) == 0) { rewriteConfigMarkAsProcessed(state,option); return; } - line = sdscatprintf(sdsempty(),"%s %s %d", option, - server.masterhost, server.masterport); - rewriteConfigRewriteLine(state,option,line,1); + + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); + sds line; + + line = sdscatprintf(sdsempty(),"%s %s %d", option, + mi->masterhost, mi->masterport); + rewriteConfigRewriteLine(state,option,line,1); + } } /* Rewrite the notify-keyspace-events option. */ @@ -2285,8 +2302,8 @@ int rewriteConfig(char *path) { rewriteConfigDirOption(state); rewriteConfigSlaveofOption(state,"replicaof"); rewriteConfigStringOption(state,"replica-announce-ip",server.slave_announce_ip,CONFIG_DEFAULT_SLAVE_ANNOUNCE_IP); - rewriteConfigStringOption(state,"masteruser",server.masteruser,NULL); - rewriteConfigStringOption(state,"masterauth",server.masterauth,NULL); + rewriteConfigStringOption(state,"masteruser",server.default_masteruser,NULL); + rewriteConfigStringOption(state,"masterauth",server.default_masterauth,NULL); rewriteConfigStringOption(state,"cluster-announce-ip",server.cluster_announce_ip,NULL); rewriteConfigYesNoOption(state,"replica-serve-stale-data",server.repl_serve_stale_data,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA); rewriteConfigYesNoOption(state,"replica-read-only",server.repl_slave_ro,CONFIG_DEFAULT_SLAVE_READ_ONLY); diff --git a/src/db.c b/src/db.c index c46b6de18..38b75feb6 100644 --- a/src/db.c +++ b/src/db.c @@ -105,7 +105,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { /* Key expired. If we are in the context of a master, expireIfNeeded() * returns 0 only when the key does not exist at all, so it's safe * to return NULL ASAP. */ - if (server.masterhost == NULL) { + if (listLength(server.masters) == 0) { server.stat_keyspace_misses++; return NULL; } @@ -123,7 +123,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { * * Notably this covers GETs when slaves are used to scale reads. */ if (server.current_client && - server.current_client != server.master && + !FActiveMaster(server.current_client) && server.current_client->cmd && server.current_client->cmd->flags & CMD_READONLY) { @@ -276,7 +276,7 @@ robj *dbRandomKey(redisDb *db) { key = dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); if (dictFind(db->expires,key)) { - if (allvolatile && server.masterhost && --maxtries == 0) { + if (allvolatile && listLength(server.masters) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically * expired in the slave, so the function cannot stop because @@ -1109,7 +1109,7 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { de = dictAddOrFind(db->expires,dictGetKey(kde)); dictSetSignedIntegerVal(de,when); - int writable_slave = server.masterhost && server.repl_slave_ro == 0; + int writable_slave = listLength(server.masters) && server.repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) rememberSlaveKeyWithExpire(db,key); } @@ -1203,7 +1203,7 @@ int expireIfNeeded(redisDb *db, robj *key) { * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ - if (server.masterhost != NULL) return 1; + if (listLength(server.masters)) return 1; /* Delete the key */ server.stat_expiredkeys++; diff --git a/src/evict.c b/src/evict.c index cd270c257..f0bb94b7f 100644 --- a/src/evict.c +++ b/src/evict.c @@ -448,7 +448,7 @@ int freeMemoryIfNeeded(void) { serverAssert(GlobalLocksAcquired()); /* By default replicas should ignore maxmemory * and just be masters exact copies. */ - if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; + if (listLength(server.masters) && server.repl_slave_ignore_maxmemory) return C_OK; size_t mem_reported, mem_tofree, mem_freed; mstime_t latency, eviction_latency; diff --git a/src/expire.c b/src/expire.c index 20acd7f45..780968d54 100644 --- a/src/expire.c +++ b/src/expire.c @@ -424,7 +424,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { * * Instead we take the other branch of the IF statement setting an expire * (possibly in the past) and wait for an explicit DEL from the master. */ - if (when <= mstime() && !server.loading && !server.masterhost) { + if (when <= mstime() && !server.loading && !listLength(server.masters)) { robj *aux; int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) : diff --git a/src/module.c b/src/module.c index 4e4a584af..28dc8f131 100644 --- a/src/module.c +++ b/src/module.c @@ -1448,7 +1448,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { flags |= REDISMODULE_CTX_FLAGS_RDB; /* Replication flags */ - if (server.masterhost == NULL) { + if (listLength(server.masters) == 0) { flags |= REDISMODULE_CTX_FLAGS_MASTER; } else { flags |= REDISMODULE_CTX_FLAGS_SLAVE; diff --git a/src/multi.c b/src/multi.c index d92e515d4..65a400614 100644 --- a/src/multi.c +++ b/src/multi.c @@ -122,7 +122,7 @@ void execCommand(client *c) { int orig_argc; struct redisCommand *orig_cmd; int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ - int was_master = server.masterhost == NULL; + int was_master = listLength(server.masters) == 0; if (!(c->flags & CLIENT_MULTI)) { addReplyError(c,"EXEC without MULTI"); @@ -147,7 +147,7 @@ void execCommand(client *c) { * was initiated when the instance was a master or a writable replica and * then the configuration changed (for example instance was turned into * a replica). */ - if (!server.loading && server.masterhost && server.repl_slave_ro && + if (!server.loading && listLength(server.masters) && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) { addReplyError(c, @@ -193,7 +193,7 @@ void execCommand(client *c) { /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ if (must_propagate) { - int is_master = server.masterhost == NULL; + int is_master = listLength(server.masters) == 0; server.dirty++; /* If inside the MULTI/EXEC block this instance was suddenly * switched from master to slave (using the SLAVEOF command), the diff --git a/src/networking.cpp b/src/networking.cpp index 726fc7ee2..d32f8a1cf 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1273,13 +1273,13 @@ void freeClient(client *c) { * * Note that before doing this we make sure that the client is not in * some unexpected state, by checking its flags. */ - if (server.master && c->flags & CLIENT_MASTER) { + if (FActiveMaster(c)) { serverLog(LL_WARNING,"Connection with master lost."); if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY| - CLIENT_CLOSE_ASAP| - CLIENT_BLOCKED))) + CLIENT_CLOSE_ASAP| + CLIENT_BLOCKED))) { - replicationCacheMaster(c); + replicationCacheMaster(MasterInfoFromClient(c), c); return; } } @@ -1339,7 +1339,7 @@ void freeClient(client *c) { /* Master/slave cleanup Case 2: * we lost the connection with the master. */ - if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); + if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(MasterInfoFromClient(c)); /* If this client was scheduled for async freeing we need to remove it * from the queue. */ @@ -2564,7 +2564,7 @@ void helloCommand(client *c) { if (!server.sentinel_mode) { addReplyBulkCString(c,"role"); - addReplyBulkCString(c,server.masterhost ? "replica" : "master"); + addReplyBulkCString(c,listLength(server.masters) ? "replica" : "master"); } addReplyBulkCString(c,"modules"); diff --git a/src/rdb.c b/src/rdb.c index 8ab2fee82..dbf314021 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1859,8 +1859,15 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { * our cached time since it is used to create and update the last * interaction time with clients and for other important things. */ updateCachedTime(); - if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) - replicationSendNewlineToMaster(); + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); + if (mi->repl_state == REPL_STATE_TRANSFER) + replicationSendNewlineToMaster(mi); + } loadingProgress(r->processed_bytes); processEventsWhileBlocked(serverTL - server.rgthreadvar); } @@ -2050,7 +2057,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the slave. */ - if (server.masterhost == NULL && !loading_aof && expiretime != -1 && expiretime < now) { + if (listLength(server.masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); } else { @@ -2528,7 +2535,7 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { * connects to us, the NULL repl_backlog will trigger a full * synchronization, at the same time we will use a new replid and clear * replid2. */ - if (!server.masterhost && server.repl_backlog) { + if (!listLength(server.masters) && server.repl_backlog) { /* Note that when server.slaveseldb is -1, it means that this master * didn't apply any write commands after a full synchronization. * So we can let repl_stream_db be 0, this allows a restarted slave @@ -2538,10 +2545,17 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { return rsi; } + if (listLength(server.masters) > 1) + { + // BUGBUG, warn user about this incomplete implementation + serverLog(LL_WARNING, "Warning: Only backing up first master's information in RDB"); + } + struct redisMaster *miFirst = listLength(server.masters) ? listNodeValue(listFirst(server.masters)) : NULL; + /* If the instance is a slave we need a connected master * in order to fetch the currently selected DB. */ - if (server.master) { - rsi->repl_stream_db = server.master->db->id; + if (miFirst && miFirst->master) { + rsi->repl_stream_db = miFirst->master->db->id; return rsi; } @@ -2550,8 +2564,8 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { * increment the master_repl_offset only from data arriving from the * master, so if we are disconnected the offset in the cached master * is valid. */ - if (server.cached_master) { - rsi->repl_stream_db = server.cached_master->db->id; + if (miFirst && miFirst->cached_master) { + rsi->repl_stream_db = miFirst->cached_master->db->id; return rsi; } return NULL; diff --git a/src/replication.cpp b/src/replication.cpp index fd1c6301c..0e31889f9 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -40,11 +40,11 @@ #include #include -void replicationDiscardCachedMaster(void); -void replicationResurrectCachedMaster(int newfd); -void replicationSendAck(void); +void replicationDiscardCachedMaster(redisMaster *mi); +void replicationResurrectCachedMaster(redisMaster *mi, int newfd); +void replicationSendAck(redisMaster *mi); void putSlaveOnline(client *slave); -int cancelReplicationHandshake(void); +int cancelReplicationHandshake(redisMaster *mi); /* --------------------------- Utility functions ---------------------------- */ @@ -93,6 +93,34 @@ static bool FSameHost(client *clientA, client *clientB) return (zeroCheck != 0); // if the UUID is nil then it is never equal } +static bool FMasterHost(client *c) +{ + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (FSameHost(mi->master, c)) + return true; + } + return false; +} + +static bool FAnyDisconnectedMasters() +{ + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->repl_state != REPL_STATE_CONNECTED) + return true; + } + return false; +} + /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { @@ -142,7 +170,7 @@ void freeReplicationBacklog(void) { while ((ln = listNext(&li))) { // server.slaves should be empty, or filled with clients pending close client *c = (client*)listNodeValue(ln); - serverAssert(c->flags & CLIENT_CLOSE_ASAP || FUuidEqual(server.master_uuid, c->uuid)); + serverAssert(c->flags & CLIENT_CLOSE_ASAP || FMasterHost(c)); } zfree(server.repl_backlog); server.repl_backlog = NULL; @@ -212,7 +240,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * propagate *identical* replication stream. In this way this slave can * advertise the same replication ID as the master (since it shares the * master replication history and has the same backlog and offsets). */ - if (!server.fActiveReplica && server.masterhost != NULL) return; + if (!server.fActiveReplica && listLength(server.masters)) return; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. */ @@ -339,7 +367,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *slave = (client*)ln->value; std::lock_guardlock)> ulock(slave->lock); - if (FSameHost(slave, server.master)) + if (FMasterHost(slave)) continue; // Active Active case, don't feed back /* Don't feed slaves that are still waiting for BGSAVE to start */ @@ -689,7 +717,7 @@ void syncCommand(client *c) { /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... */ if (!server.fActiveReplica) { - if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { + if (FAnyDisconnectedMasters()) { addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); return; } @@ -917,7 +945,13 @@ void replconfCommand(client *c) { } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"getack")) { /* REPLCONF GETACK is used in order to request an ACK ASAP * to the slave. */ - if (server.masterhost && server.master) replicationSendAck(); + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + replicationSendAck((redisMaster*)listNodeValue(ln)); + } return; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"uuid")) { /* REPLCONF uuid is used to set and send the UUID of each host */ @@ -1157,9 +1191,9 @@ void shiftReplicationId(void) { /* Returns 1 if the given replication state is a handshake state, * 0 otherwise. */ -int slaveIsInHandshakeState(void) { - return server.repl_state >= REPL_STATE_RECEIVE_PONG && - server.repl_state <= REPL_STATE_RECEIVE_PSYNC; +int slaveIsInHandshakeState(redisMaster *mi) { + return mi->repl_state >= REPL_STATE_RECEIVE_PONG && + mi->repl_state <= REPL_STATE_RECEIVE_PSYNC; } /* Avoid the master to detect the slave is timing out while loading the @@ -1170,11 +1204,11 @@ int slaveIsInHandshakeState(void) { * The function is called in two contexts: while we flush the current * data with emptyDb(), and while we load the new data received as an * RDB file from the master. */ -void replicationSendNewlineToMaster(void) { +void replicationSendNewlineToMaster(redisMaster *mi) { static time_t newline_sent; if (time(NULL) != newline_sent) { newline_sent = time(NULL); - if (write(server.repl_transfer_s,"\n",1) == -1) { + if (write(mi->repl_transfer_s,"\n",1) == -1) { /* Pinging back in this stage is best-effort. */ } } @@ -1184,30 +1218,38 @@ void replicationSendNewlineToMaster(void) { * the new dataset received by the master. */ void replicationEmptyDbCallback(void *privdata) { UNUSED(privdata); - replicationSendNewlineToMaster(); + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + replicationSendNewlineToMaster((redisMaster*)listNodeValue(ln)); + } } /* Once we have a link with the master and the synchroniziation was * performed, this function materializes the master client we store * at server.master, starting from the specified file descriptor. */ -void replicationCreateMasterClient(int fd, int dbid) { - server.master = createClient(fd, serverTL - server.rgthreadvar); - server.master->flags |= CLIENT_MASTER; - server.master->authenticated = 1; - server.master->reploff = server.master_initial_offset; - server.master->read_reploff = server.master->reploff; - server.master->puser = NULL; /* This client can do everything. */ +void replicationCreateMasterClient(redisMaster *mi, int fd, int dbid) { + mi->master = createClient(fd, serverTL - server.rgthreadvar); + mi->master->flags |= CLIENT_MASTER; + mi->master->authenticated = 1; + mi->master->reploff = mi->master_initial_offset; + mi->master->read_reploff = mi->master->reploff; + mi->master->puser = NULL; /* This client can do everything. */ - memcpy(server.master->uuid, server.master_uuid, UUID_BINARY_LEN); - memset(server.master_uuid, 0, UUID_BINARY_LEN); // make sure people don't use this temp storage buffer + memcpy(mi->master->uuid, mi->master_uuid, UUID_BINARY_LEN); + memset(mi->master_uuid, 0, UUID_BINARY_LEN); // make sure people don't use this temp storage buffer - memcpy(server.master->replid, server.master_replid, - sizeof(server.master_replid)); + memcpy(mi->master->replid, mi->master_replid, + sizeof(mi->master_replid)); /* If master offset is set to -1, this master is old and is not * PSYNC capable, so we flag it accordingly. */ - if (server.master->reploff == -1) - server.master->flags |= CLIENT_PRE_PSYNC; - if (dbid != -1) selectDb(server.master,dbid); + if (mi->master->reploff == -1) + mi->master->flags |= CLIENT_PRE_PSYNC; + if (dbid != -1) selectDb(mi->master,dbid); + + listAddNodeTail(server.masters, mi); } /* This function will try to re-enable the AOF file after the @@ -1239,7 +1281,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { off_t left; UNUSED(el); UNUSED(mask); - int fUpdate = (int)((ptrdiff_t)privdata); // Should we update our database, or create from scratch? + int fUpdate = server.fActiveReplica; // Should we update our database, or create from scratch? + redisMaster *mi = (redisMaster*)privdata; serverAssert(GlobalLocksAcquired()); @@ -1255,7 +1298,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ - if (server.repl_transfer_size == -1) { + if (mi->repl_transfer_size == -1) { if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) { serverLog(LL_WARNING, "I/O error reading bulk count from MASTER: %s", @@ -1272,7 +1315,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { /* At this stage just a newline works as a PING in order to take * the connection live. So we refresh our last interaction * timestamp. */ - server.repl_transfer_lastio = server.unixtime; + mi->repl_transfer_lastio = server.unixtime; return; } else if (buf[0] != '$') { serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); @@ -1295,15 +1338,15 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { memset(lastbytes,0,CONFIG_RUN_ID_SIZE); /* Set any repl_transfer_size to avoid entering this code path * at the next call. */ - server.repl_transfer_size = 0; + mi->repl_transfer_size = 0; serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving streamed RDB from master"); } else { usemark = 0; - server.repl_transfer_size = strtol(buf+1,NULL,10); + mi->repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + (long long) mi->repl_transfer_size); } return; } @@ -1312,7 +1355,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (usemark) { readlen = sizeof(buf); } else { - left = server.repl_transfer_size - server.repl_transfer_read; + left = mi->repl_transfer_size - mi->repl_transfer_read; readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); } @@ -1320,7 +1363,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (nread <= 0) { serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); - cancelReplicationHandshake(); + cancelReplicationHandshake(mi); return; } server.stat_net_input_bytes += nread; @@ -1337,18 +1380,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; } - server.repl_transfer_lastio = server.unixtime; - if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { + mi->repl_transfer_lastio = server.unixtime; + if ((nwritten = write(mi->repl_transfer_fd,buf,nread)) != nread) { serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", (nwritten == -1) ? strerror(errno) : "short write"); goto error; } - server.repl_transfer_read += nread; + mi->repl_transfer_read += nread; /* Delete the last 40 bytes from the file if we reached EOF. */ if (usemark && eof_reached) { - if (ftruncate(server.repl_transfer_fd, - server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) + if (ftruncate(mi->repl_transfer_fd, + mi->repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) { serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); goto error; @@ -1358,19 +1401,19 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { /* Sync data on disk from time to time, otherwise at the end of the transfer * we may suffer a big delay as the memory buffers are copied into the * actual disk. */ - if (server.repl_transfer_read >= - server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) + if (mi->repl_transfer_read >= + mi->repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { - off_t sync_size = server.repl_transfer_read - - server.repl_transfer_last_fsync_off; - rdb_fsync_range(server.repl_transfer_fd, - server.repl_transfer_last_fsync_off, sync_size); - server.repl_transfer_last_fsync_off += sync_size; + off_t sync_size = mi->repl_transfer_read - + mi->repl_transfer_last_fsync_off; + rdb_fsync_range(mi->repl_transfer_fd, + mi->repl_transfer_last_fsync_off, sync_size); + mi->repl_transfer_last_fsync_off += sync_size; } /* Check if the transfer is now complete */ if (!usemark) { - if (server.repl_transfer_read == server.repl_transfer_size) + if (mi->repl_transfer_read == mi->repl_transfer_size) eof_reached = 1; } @@ -1388,10 +1431,10 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { killRDBChild(); } - if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { + if (rename(mi->repl_transfer_tmpfile,server.rdb_filename) == -1) { serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s", server.rdb_filename, strerror(errno)); - cancelReplicationHandshake(); + cancelReplicationHandshake(mi); return; } serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: %s", fUpdate ? "Keeping old data" : "Flushing old data"); @@ -1411,28 +1454,28 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */ - aeDeleteFileEvent(el,server.repl_transfer_s,AE_READABLE); + aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (rdbLoad(&rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); - cancelReplicationHandshake(); + cancelReplicationHandshake(mi); /* Re-enable the AOF if we disabled it earlier, in order to restore * the original configuration. */ if (aof_is_enabled) restartAOFAfterSYNC(); return; } /* Final setup of the connected slave <- master link */ - zfree(server.repl_transfer_tmpfile); - close(server.repl_transfer_fd); - replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); - server.repl_state = REPL_STATE_CONNECTED; - server.repl_down_since = 0; + zfree(mi->repl_transfer_tmpfile); + close(mi->repl_transfer_fd); + replicationCreateMasterClient(mi, mi->repl_transfer_s,rsi.repl_stream_db); + mi->repl_state = REPL_STATE_CONNECTED; + mi->repl_down_since = 0; /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */ - memcpy(server.replid,server.master->replid,sizeof(server.replid)); - server.master_repl_offset = server.master->reploff; + memcpy(server.replid,mi->master->replid,sizeof(server.replid)); + server.master_repl_offset = mi->master->reploff; clearReplicationId2(); /* Let's create the replication backlog if needed. Slaves need to * accumulate the backlog regardless of the fact they have sub-slaves @@ -1449,7 +1492,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { return; error: - cancelReplicationHandshake(); + cancelReplicationHandshake(mi); return; } @@ -1462,7 +1505,7 @@ error: #define SYNC_CMD_READ (1<<0) #define SYNC_CMD_WRITE (1<<1) #define SYNC_CMD_FULL (SYNC_CMD_READ|SYNC_CMD_WRITE) -char *sendSynchronousCommand(int flags, int fd, ...) { +char *sendSynchronousCommand(redisMaster *mi, int flags, int fd, ...) { /* Create the command to send to the master, we use redis binary * protocol to make sure correct arguments are sent. This function @@ -1510,7 +1553,7 @@ char *sendSynchronousCommand(int flags, int fd, ...) { return sdscatprintf(sdsempty(),"-Reading from master: %s", strerror(errno)); } - server.repl_transfer_lastio = server.unixtime; + mi->repl_transfer_lastio = server.unixtime; return sdsnew(buf); } return NULL; @@ -1570,7 +1613,7 @@ char *sendSynchronousCommand(int flags, int fd, ...) { #define PSYNC_FULLRESYNC 3 #define PSYNC_NOT_SUPPORTED 4 #define PSYNC_TRY_LATER 5 -int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { +int slaveTryPartialResynchronization(redisMaster *mi, aeEventLoop *el, int fd, int read_reply) { const char *psync_replid; char psync_offset[32]; sds reply; @@ -1582,11 +1625,11 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { * a FULL resync using the PSYNC command we'll set the offset at the * right value, so that this information will be propagated to the * client structure representing the master into server.master. */ - server.master_initial_offset = -1; + mi->master_initial_offset = -1; - if (server.cached_master && !server.fActiveReplica) { - psync_replid = server.cached_master->replid; - snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); + if (mi->cached_master && !server.fActiveReplica) { + psync_replid = mi->cached_master->replid; + snprintf(psync_offset,sizeof(psync_offset),"%lld", mi->cached_master->reploff+1); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); } else { serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); @@ -1595,7 +1638,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { } /* Issue the PSYNC command */ - reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL); + reply = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL); if (reply != NULL) { serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); sdsfree(reply); @@ -1606,7 +1649,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { } /* Reading half */ - reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); + reply = sendSynchronousCommand(mi, SYNC_CMD_READ,fd,NULL); if (sdslen(reply) == 0) { /* The master may send empty newlines after it receives PSYNC * and before to reply, just to keep the connection alive. */ @@ -1634,17 +1677,17 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { * reply means that the master supports PSYNC, but the reply * format seems wrong. To stay safe we blank the master * replid to make sure next PSYNCs will fail. */ - memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1); + memset(mi->master_replid,0,CONFIG_RUN_ID_SIZE+1); } else { - memcpy(server.master_replid, replid, offset-replid-1); - server.master_replid[CONFIG_RUN_ID_SIZE] = '\0'; - server.master_initial_offset = strtoll(offset,NULL,10); + memcpy(mi->master_replid, replid, offset-replid-1); + mi->master_replid[CONFIG_RUN_ID_SIZE] = '\0'; + mi->master_initial_offset = strtoll(offset,NULL,10); serverLog(LL_NOTICE,"Full resync from master: %s:%lld", - server.master_replid, - server.master_initial_offset); + mi->master_replid, + mi->master_initial_offset); } /* We are going to full resync, discard the cached master structure. */ - replicationDiscardCachedMaster(); + replicationDiscardCachedMaster(mi); sdsfree(reply); return PSYNC_FULLRESYNC; } @@ -1667,19 +1710,19 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { memcpy(sznew,start,CONFIG_RUN_ID_SIZE); sznew[CONFIG_RUN_ID_SIZE] = '\0'; - if (strcmp(sznew,server.cached_master->replid)) { + if (strcmp(sznew,mi->cached_master->replid)) { /* Master ID changed. */ serverLog(LL_WARNING,"Master replication ID changed to %s",sznew); /* Set the old ID as our ID2, up to the current offset+1. */ - memcpy(server.replid2,server.cached_master->replid, + memcpy(server.replid2,mi->cached_master->replid, sizeof(server.replid2)); server.second_replid_offset = server.master_repl_offset+1; /* Update the cached master ID and our own primary ID to the * new one. */ memcpy(server.replid,sznew,sizeof(server.replid)); - memcpy(server.cached_master->replid,sznew,sizeof(server.replid)); + memcpy(mi->cached_master->replid,sznew,sizeof(server.replid)); /* Disconnect all the sub-slaves: they need to be notified. */ disconnectSlaves(); @@ -1688,7 +1731,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { /* Setup the replication to continue. */ sdsfree(reply); - replicationResurrectCachedMaster(fd); + replicationResurrectCachedMaster(mi, fd); /* If this instance was restarted and we read the metadata to * PSYNC from the persistence file, our replication backlog could @@ -1724,7 +1767,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { "error state (reply: %s)", reply); } sdsfree(reply); - replicationDiscardCachedMaster(); + replicationDiscardCachedMaster(mi); return PSYNC_NOT_SUPPORTED; } @@ -1737,12 +1780,13 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { int sockerr = 0, psync_result = PSYNC_FULLRESYNC; socklen_t errlen = sizeof(sockerr); UNUSED(el); - UNUSED(privdata); UNUSED(mask); + redisMaster *mi = (redisMaster*)privdata; + /* If this event fired after the user turned the instance into a master * with SLAVEOF NO ONE we must just return ASAP. */ - if (server.repl_state == REPL_STATE_NONE) { + if (mi->repl_state == REPL_STATE_NONE) { close(fd); return; } @@ -1758,22 +1802,22 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Send a PING to check the master is able to reply without errors. */ - if (server.repl_state == REPL_STATE_CONNECTING) { + if (mi->repl_state == REPL_STATE_CONNECTING) { serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ aeDeleteFileEvent(el,fd,AE_WRITABLE); - server.repl_state = REPL_STATE_RECEIVE_PONG; + mi->repl_state = REPL_STATE_RECEIVE_PONG; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ - err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL); + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"PING",NULL); if (err) goto write_error; return; } /* Receive the PONG command. */ - if (server.repl_state == REPL_STATE_RECEIVE_PONG) { - err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); + if (mi->repl_state == REPL_STATE_RECEIVE_PONG) { + err = sendSynchronousCommand(mi, SYNC_CMD_READ,fd,NULL); /* We accept only two replies as valid, a positive +PONG reply * (we just check for "+") or an authentication error. @@ -1792,59 +1836,59 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { "Master replied to PING, replication can continue..."); } sdsfree(err); - server.repl_state = REPL_STATE_SEND_AUTH; + mi->repl_state = REPL_STATE_SEND_AUTH; } /* AUTH with the master if required. */ - if (server.repl_state == REPL_STATE_SEND_AUTH) { - if (server.masteruser && server.masterauth) { - err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH", - server.masteruser,server.masterauth,NULL); + if (mi->repl_state == REPL_STATE_SEND_AUTH) { + if (mi->masteruser && mi->masterauth) { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"AUTH", + mi->masteruser,mi->masterauth,NULL); if (err) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_AUTH; + mi->repl_state = REPL_STATE_RECEIVE_AUTH; return; - } else if (server.masterauth) { - err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL); + } else if (mi->masterauth) { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"AUTH",mi->masterauth,NULL); if (err) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_AUTH; + mi->repl_state = REPL_STATE_RECEIVE_AUTH; return; } else { - server.repl_state = REPL_STATE_SEND_UUID; + mi->repl_state = REPL_STATE_SEND_UUID; } } /* Receive AUTH reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { - err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); + if (mi->repl_state == REPL_STATE_RECEIVE_AUTH) { + err = sendSynchronousCommand(mi, SYNC_CMD_READ,fd,NULL); if (err[0] == '-') { serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); sdsfree(err); goto error; } sdsfree(err); - server.repl_state = REPL_STATE_SEND_UUID; + mi->repl_state = REPL_STATE_SEND_UUID; } /* Send UUID */ - if (server.repl_state == REPL_STATE_SEND_UUID) { + if (mi->repl_state == REPL_STATE_SEND_UUID) { char szUUID[37] = {0}; - memset(server.master_uuid, 0, UUID_BINARY_LEN); + memset(mi->master_uuid, 0, UUID_BINARY_LEN); uuid_unparse((unsigned char*)server.uuid, szUUID); - err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF","uuid",szUUID,NULL); + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF","uuid",szUUID,NULL); if (err) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_UUID; + mi->repl_state = REPL_STATE_RECEIVE_UUID; return; } /* Receive UUID */ - if (server.repl_state == REPL_STATE_RECEIVE_UUID) { - err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); + if (mi->repl_state == REPL_STATE_RECEIVE_UUID) { + err = sendSynchronousCommand(mi, SYNC_CMD_READ,fd,NULL); if (err[0] == '-') { serverLog(LL_WARNING, "non-fatal: Master doesn't understand REPLCONF uuid"); } else { if (strlen(err) != 37 // 36-byte UUID string and the leading '+' - || uuid_parse(err+1, server.master_uuid) != 0) + || uuid_parse(err+1, mi->master_uuid) != 0) { serverLog(LL_WARNING, "Master replied with a UUID we don't understand"); sdsfree(err); @@ -1852,27 +1896,27 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } } sdsfree(err); - server.repl_state = REPL_STATE_SEND_PORT; + mi->repl_state = REPL_STATE_SEND_PORT; // fallthrough } /* Set the slave port, so that Master's INFO command can list the * slave listening port correctly. */ - if (server.repl_state == REPL_STATE_SEND_PORT) { + if (mi->repl_state == REPL_STATE_SEND_PORT) { sds port = sdsfromlonglong(server.slave_announce_port ? server.slave_announce_port : server.port); - err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", "listening-port",port, NULL); sdsfree(port); if (err) goto write_error; sdsfree(err); - server.repl_state = REPL_STATE_RECEIVE_PORT; + mi->repl_state = REPL_STATE_RECEIVE_PORT; return; } /* Receive REPLCONF listening-port reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_PORT) { - err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); + if (mi->repl_state == REPL_STATE_RECEIVE_PORT) { + err = sendSynchronousCommand(mi, SYNC_CMD_READ,fd,NULL); /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ if (err[0] == '-') { @@ -1880,30 +1924,30 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { "REPLCONF listening-port: %s", err); } sdsfree(err); - server.repl_state = REPL_STATE_SEND_IP; + mi->repl_state = REPL_STATE_SEND_IP; } /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */ - if (server.repl_state == REPL_STATE_SEND_IP && + if (mi->repl_state == REPL_STATE_SEND_IP && server.slave_announce_ip == NULL) { - server.repl_state = REPL_STATE_SEND_CAPA; + mi->repl_state = REPL_STATE_SEND_CAPA; } /* Set the slave ip, so that Master's INFO command can list the * slave IP address port correctly in case of port forwarding or NAT. */ - if (server.repl_state == REPL_STATE_SEND_IP) { - err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", + if (mi->repl_state == REPL_STATE_SEND_IP) { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", "ip-address",server.slave_announce_ip, NULL); if (err) goto write_error; sdsfree(err); - server.repl_state = REPL_STATE_RECEIVE_IP; + mi->repl_state = REPL_STATE_RECEIVE_IP; return; } /* Receive REPLCONF ip-address reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_IP) { - err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); + if (mi->repl_state == REPL_STATE_RECEIVE_IP) { + err = sendSynchronousCommand(mi, SYNC_CMD_READ,fd,NULL); /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ if (err[0] == '-') { @@ -1911,7 +1955,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { "REPLCONF ip-address: %s", err); } sdsfree(err); - server.repl_state = REPL_STATE_SEND_CAPA; + mi->repl_state = REPL_STATE_SEND_CAPA; } /* Inform the master of our (slave) capabilities. @@ -1920,18 +1964,18 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { * PSYNC2: supports PSYNC v2, so understands +CONTINUE . * * The master will ignore capabilities it does not understand. */ - if (server.repl_state == REPL_STATE_SEND_CAPA) { - err = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF", + if (mi->repl_state == REPL_STATE_SEND_CAPA) { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", "capa","eof","capa","psync2",NULL); if (err) goto write_error; sdsfree(err); - server.repl_state = REPL_STATE_RECEIVE_CAPA; + mi->repl_state = REPL_STATE_RECEIVE_CAPA; return; } /* Receive CAPA reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { - err = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL); + if (mi->repl_state == REPL_STATE_RECEIVE_CAPA) { + err = sendSynchronousCommand(mi, SYNC_CMD_READ,fd,NULL); /* Ignore the error if any, not all the Redis versions support * REPLCONF capa. */ if (err[0] == '-') { @@ -1939,7 +1983,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { "REPLCONF capa: %s", err); } sdsfree(err); - server.repl_state = REPL_STATE_SEND_PSYNC; + mi->repl_state = REPL_STATE_SEND_PSYNC; } /* Try a partial resynchonization. If we don't have a cached master @@ -1947,24 +1991,24 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { * to start a full resynchronization so that we get the master run id * and the global offset, to try a partial resync at the next * reconnection attempt. */ - if (server.repl_state == REPL_STATE_SEND_PSYNC) { - if (slaveTryPartialResynchronization(el,fd,0) == PSYNC_WRITE_ERROR) { + if (mi->repl_state == REPL_STATE_SEND_PSYNC) { + if (slaveTryPartialResynchronization(mi,el,fd,0) == PSYNC_WRITE_ERROR) { err = sdsnew("Write error sending the PSYNC command."); goto write_error; } - server.repl_state = REPL_STATE_RECEIVE_PSYNC; + mi->repl_state = REPL_STATE_RECEIVE_PSYNC; return; } /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ - if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { + if (mi->repl_state != REPL_STATE_RECEIVE_PSYNC) { serverLog(LL_WARNING,"syncWithMaster(): state machine error, " "state should be RECEIVE_PSYNC but is %d", - server.repl_state); + mi->repl_state); goto error; } - psync_result = slaveTryPartialResynchronization(el,fd,1); + psync_result = slaveTryPartialResynchronization(mi,el,fd,1); if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* If the master is in an transient error, we should try to PSYNC @@ -1985,7 +2029,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { * as well, if we have any sub-slaves. The master may transfer us an * entirely different data set and we have no way to incrementally feed * our slaves after that. */ - disconnectSlavesExcept(server.master_uuid); /* Force our slaves to resync with us as well. */ + disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC @@ -2014,7 +2058,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Setup the non blocking download of the bulk file. */ - if (aeCreateFileEvent(el,fd, AE_READABLE,readSyncBulkPayload,(void*)((ptrdiff_t)server.fActiveReplica)) + if (aeCreateFileEvent(el,fd, AE_READABLE,readSyncBulkPayload,mi) == AE_ERR) { serverLog(LL_WARNING, @@ -2023,21 +2067,21 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { goto error; } - server.repl_state = REPL_STATE_TRANSFER; - server.repl_transfer_size = -1; - server.repl_transfer_read = 0; - server.repl_transfer_last_fsync_off = 0; - server.repl_transfer_fd = dfd; - server.repl_transfer_lastio = server.unixtime; - server.repl_transfer_tmpfile = zstrdup(tmpfile); + mi->repl_state = REPL_STATE_TRANSFER; + mi->repl_transfer_size = -1; + mi->repl_transfer_read = 0; + mi->repl_transfer_last_fsync_off = 0; + mi->repl_transfer_fd = dfd; + mi->repl_transfer_lastio = server.unixtime; + mi->repl_transfer_tmpfile = zstrdup(tmpfile); return; error: aeDeleteFileEvent(el,fd,AE_READABLE|AE_WRITABLE); if (dfd != -1) close(dfd); close(fd); - server.repl_transfer_s = -1; - server.repl_state = REPL_STATE_CONNECT; + mi->repl_transfer_s = -1; + mi->repl_state = REPL_STATE_CONNECT; return; write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */ @@ -2046,18 +2090,18 @@ write_error: /* Handle sendSynchronousCommand(SYNC_CMD_WRITE) errors. */ goto error; } -int connectWithMaster(void) { +int connectWithMaster(redisMaster *mi) { int fd; fd = anetTcpNonBlockBestEffortBindConnect(NULL, - server.masterhost,server.masterport,NET_FIRST_BIND_ADDR); + mi->masterhost,mi->masterport,NET_FIRST_BIND_ADDR); if (fd == -1) { serverLog(LL_WARNING,"Unable to connect to MASTER: %s", strerror(errno)); return C_ERR; } - if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == + if (aeCreateFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,mi) == AE_ERR) { close(fd); @@ -2065,9 +2109,9 @@ int connectWithMaster(void) { return C_ERR; } - server.repl_transfer_lastio = server.unixtime; - server.repl_transfer_s = fd; - server.repl_state = REPL_STATE_CONNECTING; + mi->repl_transfer_lastio = server.unixtime; + mi->repl_transfer_s = fd; + mi->repl_state = REPL_STATE_CONNECTING; return C_OK; } @@ -2075,23 +2119,23 @@ int connectWithMaster(void) { * in progress to undo it. * Never call this function directly, use cancelReplicationHandshake() instead. */ -void undoConnectWithMaster(void) { - int fd = server.repl_transfer_s; +void undoConnectWithMaster(redisMaster *mi) { + int fd = mi->repl_transfer_s; aeDeleteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE); close(fd); - server.repl_transfer_s = -1; + mi->repl_transfer_s = -1; } /* Abort the async download of the bulk dataset while SYNC-ing with master. * Never call this function directly, use cancelReplicationHandshake() instead. */ -void replicationAbortSyncTransfer(void) { - serverAssert(server.repl_state == REPL_STATE_TRANSFER); - undoConnectWithMaster(); - close(server.repl_transfer_fd); - unlink(server.repl_transfer_tmpfile); - zfree(server.repl_transfer_tmpfile); +void replicationAbortSyncTransfer(redisMaster *mi) { + serverAssert(mi->repl_state == REPL_STATE_TRANSFER); + undoConnectWithMaster(mi); + close(mi->repl_transfer_fd); + unlink(mi->repl_transfer_tmpfile); + zfree(mi->repl_transfer_tmpfile); } /* This function aborts a non blocking replication attempt if there is one @@ -2102,15 +2146,15 @@ void replicationAbortSyncTransfer(void) { * the replication state (server.repl_state) set to REPL_STATE_CONNECT. * * Otherwise zero is returned and no operation is perforemd at all. */ -int cancelReplicationHandshake(void) { - if (server.repl_state == REPL_STATE_TRANSFER) { - replicationAbortSyncTransfer(); - server.repl_state = REPL_STATE_CONNECT; - } else if (server.repl_state == REPL_STATE_CONNECTING || - slaveIsInHandshakeState()) +int cancelReplicationHandshake(redisMaster *mi) { + if (mi->repl_state == REPL_STATE_TRANSFER) { + replicationAbortSyncTransfer(mi); + mi->repl_state = REPL_STATE_CONNECT; + } else if (mi->repl_state == REPL_STATE_CONNECTING || + slaveIsInHandshakeState(mi)) { - undoConnectWithMaster(); - server.repl_state = REPL_STATE_CONNECT; + undoConnectWithMaster(mi); + mi->repl_state = REPL_STATE_CONNECT; } else { return 0; } @@ -2118,54 +2162,58 @@ int cancelReplicationHandshake(void) { } /* Set replication to the specified master address and port. */ -void replicationSetMaster(char *ip, int port) { - int was_master = server.masterhost == NULL; +struct redisMaster *replicationAddMaster(char *ip, int port) { + redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL); + initMasterInfo(mi); + int was_master = mi->masterhost == NULL; - sdsfree(server.masterhost); - server.masterhost = sdsnew(ip); - server.masterport = port; - if (server.master) { - if (FCorrectThread(server.master)) - freeClient(server.master); + sdsfree(mi->masterhost); + mi->masterhost = sdsnew(ip); + mi->masterport = port; + if (mi->master) { + if (FCorrectThread(mi->master)) + freeClient(mi->master); else - freeClientAsync(server.master); + freeClientAsync(mi->master); } disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ /* Force our slaves to resync with us as well. They may hopefully be able * to partially resync with us, but we can notify the replid change. */ disconnectSlaves(); - cancelReplicationHandshake(); + cancelReplicationHandshake(mi); /* Before destroying our master state, create a cached master using * our own parameters, to later PSYNC with the new master. */ - if (was_master) replicationCacheMasterUsingMyself(); - server.repl_state = REPL_STATE_CONNECT; + if (was_master) replicationCacheMasterUsingMyself(mi); + mi->repl_state = REPL_STATE_CONNECT; + listAddNodeTail(server.masters, mi); + return mi; } /* Cancel replication, setting the instance as a master itself. */ -void replicationUnsetMaster(void) { - if (server.masterhost == NULL) return; /* Nothing to do. */ - sdsfree(server.masterhost); - server.masterhost = NULL; +void replicationUnsetMaster(redisMaster *mi) { + if (mi->masterhost == NULL) return; /* Nothing to do. */ + sdsfree(mi->masterhost); + mi->masterhost = NULL; /* When a slave is turned into a master, the current replication ID * (that was inherited from the master at synchronization time) is * used as secondary ID up to the current offset, and a new replication * ID is created to continue with a new replication history. */ shiftReplicationId(); - if (server.master) { - if (FCorrectThread(server.master)) - freeClient(server.master); + if (mi->master) { + if (FCorrectThread(mi->master)) + freeClient(mi->master); else - freeClientAsync(server.master); + freeClientAsync(mi->master); } - replicationDiscardCachedMaster(); - cancelReplicationHandshake(); + replicationDiscardCachedMaster(mi); + cancelReplicationHandshake(mi); /* Disconnecting all the slaves is required: we need to inform slaves * of the replication ID change (see shiftReplicationId() call). However * the slaves will be able to partially resync with us, so it will be * a very fast reconnection. */ disconnectSlaves(); - server.repl_state = REPL_STATE_NONE; + mi->repl_state = REPL_STATE_NONE; /* We need to make sure the new master will start the replication stream * with a SELECT statement. This is forced after a full resync, but @@ -2182,10 +2230,10 @@ void replicationUnsetMaster(void) { /* This function is called when the slave lose the connection with the * master into an unexpected way. */ -void replicationHandleMasterDisconnection(void) { - server.master = NULL; - server.repl_state = REPL_STATE_CONNECT; - server.repl_down_since = server.unixtime; +void replicationHandleMasterDisconnection(redisMaster *mi) { + mi->master = NULL; + mi->repl_state = REPL_STATE_CONNECT; + mi->repl_down_since = server.unixtime; /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -2203,8 +2251,14 @@ void replicaofCommand(client *c) { * into a master. Otherwise the new master address is set. */ if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") && !strcasecmp((const char*)ptrFromObj(c->argv[2]),"one")) { - if (server.masterhost) { - replicationUnsetMaster(); + if (listLength(server.masters)) { + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + replicationUnsetMaster((redisMaster*)listNodeValue(ln)); + } sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); @@ -2226,21 +2280,28 @@ void replicaofCommand(client *c) { return; /* Check if we are already attached to the specified slave */ - if (server.masterhost && !strcasecmp(server.masterhost,(const char*)ptrFromObj(c->argv[1])) - && server.masterport == port) { - serverLog(LL_NOTICE,"REPLICAOF would result into synchronization " - "with the master we are already connected " - "with. No operation performed."); - addReplySds(c,sdsnew("+OK Already connected to specified " - "master\r\n")); - return; + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (!strcasecmp(mi->masterhost,(const char*)ptrFromObj(c->argv[1])) + && mi->masterport == port) { + serverLog(LL_NOTICE,"REPLICAOF would result into synchronization " + "with the master we are already connected " + "with. No operation performed."); + addReplySds(c,sdsnew("+OK Already connected to specified " + "master\r\n")); + return; + } } /* There was no previous master or the user specified a different one, * we can continue. */ - replicationSetMaster((char*)ptrFromObj(c->argv[1]), port); + redisMaster *miNew = replicationAddMaster((char*)ptrFromObj(c->argv[1]), port); sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')", - server.masterhost, server.masterport, client); + miNew->masterhost, miNew->masterport, client); sdsfree(client); } addReplyAsync(c,shared.ok); @@ -2250,7 +2311,7 @@ void replicaofCommand(client *c) { * (master or slave) and additional information related to replication * in an easy to process format. */ void roleCommand(client *c) { - if (server.masterhost == NULL) { + if (listLength(server.masters) == 0) { listIter li; listNode *ln; void *mbcount; @@ -2279,35 +2340,42 @@ void roleCommand(client *c) { } setDeferredArrayLen(c,mbcount,slaves); } else { - const char *slavestate = NULL; + listIter li; + listNode *ln; + listRewind(server.masters, &li); - addReplyArrayLen(c,5); - addReplyBulkCBuffer(c,"slave",5); - addReplyBulkCString(c,server.masterhost); - addReplyLongLong(c,server.masterport); - if (slaveIsInHandshakeState()) { - slavestate = "handshake"; - } else { - switch(server.repl_state) { - case REPL_STATE_NONE: slavestate = "none"; break; - case REPL_STATE_CONNECT: slavestate = "connect"; break; - case REPL_STATE_CONNECTING: slavestate = "connecting"; break; - case REPL_STATE_TRANSFER: slavestate = "sync"; break; - case REPL_STATE_CONNECTED: slavestate = "connected"; break; - default: slavestate = "unknown"; break; + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + const char *slavestate = NULL; + addReplyArrayLen(c,5); + addReplyBulkCBuffer(c,"slave",5); + addReplyBulkCString(c,mi->masterhost); + addReplyLongLong(c,mi->masterport); + if (slaveIsInHandshakeState(mi)) { + slavestate = "handshake"; + } else { + switch(mi->repl_state) { + case REPL_STATE_NONE: slavestate = "none"; break; + case REPL_STATE_CONNECT: slavestate = "connect"; break; + case REPL_STATE_CONNECTING: slavestate = "connecting"; break; + case REPL_STATE_TRANSFER: slavestate = "sync"; break; + case REPL_STATE_CONNECTED: slavestate = "connected"; break; + default: slavestate = "unknown"; break; + } } + addReplyBulkCString(c,slavestate); + addReplyLongLong(c,mi->master ? mi->master->reploff : -1); } - addReplyBulkCString(c,slavestate); - addReplyLongLong(c,server.master ? server.master->reploff : -1); } } /* Send a REPLCONF ACK command to the master to inform it about the current * processed offset. If we are not connected with a master, the command has * no effects. */ -void replicationSendAck(void) +void replicationSendAck(redisMaster *mi) { - client *c = server.master; + client *c = mi->master; if (c != NULL) { c->flags |= CLIENT_MASTER_FORCE_REPLY; @@ -2339,8 +2407,8 @@ void replicationSendAck(void) * replicationResurrectCachedMaster() that is used after a successful PSYNC * handshake in order to reactivate the cached master. */ -void replicationCacheMaster(client *c) { - serverAssert(server.master != NULL && server.cached_master == NULL); +void replicationCacheMaster(redisMaster *mi, client *c) { + serverAssert(mi->master != NULL && mi->cached_master == NULL); serverLog(LL_NOTICE,"Caching the disconnected master state."); AssertCorrectThread(c); std::lock_guardlock)> clientlock(c->lock); @@ -2352,9 +2420,9 @@ void replicationCacheMaster(client *c) { * we want to discard te non processed query buffers and non processed * offsets, including pending transactions, already populated arguments, * pending outputs to the master. */ - sdsclear(server.master->querybuf); - sdsclear(server.master->pending_querybuf); - server.master->read_reploff = server.master->reploff; + sdsclear(mi->master->querybuf); + sdsclear(mi->master->pending_querybuf); + mi->master->read_reploff = mi->master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); c->sentlen = 0; @@ -2365,7 +2433,7 @@ void replicationCacheMaster(client *c) { /* Save the master. Server.master will be set to null later by * replicationHandleMasterDisconnection(). */ - server.cached_master = server.master; + mi->cached_master = mi->master; /* Invalidate the Peer ID cache. */ if (c->peerid) { @@ -2376,7 +2444,7 @@ void replicationCacheMaster(client *c) { /* Caching the master happens instead of the actual freeClient() call, * so make sure to adjust the replication state. This function will * also set server.master to NULL. */ - replicationHandleMasterDisconnection(); + replicationHandleMasterDisconnection(mi); } /* This function is called when a master is turend into a slave, in order to @@ -2388,35 +2456,35 @@ void replicationCacheMaster(client *c) { * the new master will accept its replication ID, and potentiall also the * current offset if no data was lost during the failover. So we use our * current replication ID and offset in order to synthesize a cached master. */ -void replicationCacheMasterUsingMyself(void) { +void replicationCacheMasterUsingMyself(redisMaster *mi) { /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ - server.master_initial_offset = server.master_repl_offset; - replicationCreateMasterClient(-1,-1); - std::lock_guardlock)> lock(server.master->lock); + mi->master_initial_offset = server.master_repl_offset; + replicationCreateMasterClient(mi, -1,-1); + std::lock_guardmaster->lock)> lock(mi->master->lock); /* Use our own ID / offset. */ - memcpy(server.master->replid, server.replid, sizeof(server.replid)); + memcpy(mi->master->replid, server.replid, sizeof(server.replid)); /* Set as cached master. */ - unlinkClient(server.master); - server.cached_master = server.master; - server.master = NULL; + unlinkClient(mi->master); + mi->cached_master = mi->master; + mi->master = NULL; serverLog(LL_NOTICE,"Before turning into a replica, using my master parameters to synthesize a cached master: I may be able to synchronize with the new master with just a partial transfer."); } /* Free a cached master, called when there are no longer the conditions for * a partial resync on reconnection. */ -void replicationDiscardCachedMaster(void) { - if (server.cached_master == NULL) return; +void replicationDiscardCachedMaster(redisMaster *mi) { + if (mi->cached_master == NULL) return; serverLog(LL_NOTICE,"Discarding previously cached master state."); - server.cached_master->flags &= ~CLIENT_MASTER; - if (FCorrectThread(server.cached_master)) - freeClient(server.cached_master); + mi->cached_master->flags &= ~CLIENT_MASTER; + if (FCorrectThread(mi->cached_master)) + freeClient(mi->cached_master); else - freeClientAsync(server.cached_master); - server.cached_master = NULL; + freeClientAsync(mi->cached_master); + mi->cached_master = NULL; } /* Turn the cached master into the current master, using the file descriptor @@ -2425,35 +2493,35 @@ void replicationDiscardCachedMaster(void) { * This function is called when successfully setup a partial resynchronization * so the stream of data that we'll receive will start from were this * master left. */ -void replicationResurrectCachedMaster(int newfd) { - server.master = server.cached_master; - server.cached_master = NULL; - server.master->fd = newfd; - server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); - server.master->authenticated = 1; - server.master->lastinteraction = server.unixtime; - server.repl_state = REPL_STATE_CONNECTED; - server.repl_down_since = 0; +void replicationResurrectCachedMaster(redisMaster *mi, int newfd) { + mi->master = mi->cached_master; + mi->cached_master = NULL; + mi->master->fd = newfd; + mi->master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP); + mi->master->authenticated = 1; + mi->master->lastinteraction = server.unixtime; + mi->repl_state = REPL_STATE_CONNECTED; + mi->repl_down_since = 0; /* Normally changing the thread of a client is a BIG NONO, but this client was unlinked so its OK here */ - server.master->iel = serverTL - server.rgthreadvar; // martial to this thread + mi->master->iel = serverTL - server.rgthreadvar; // martial to this thread /* Re-add to the list of clients. */ - linkClient(server.master); - if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_READABLE|AE_READ_THREADSAFE, - readQueryFromClient, server.master)) { + linkClient(mi->master); + if (aeCreateFileEvent(server.rgthreadvar[mi->master->iel].el, newfd, AE_READABLE|AE_READ_THREADSAFE, + readQueryFromClient, mi->master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); - freeClientAsync(server.master); /* Close ASAP. */ + freeClientAsync(mi->master); /* Close ASAP. */ } /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ - if (clientHasPendingReplies(server.master)) { - if (aeCreateFileEvent(server.rgthreadvar[server.master->iel].el, newfd, AE_WRITABLE|AE_WRITE_THREADSAFE, - sendReplyToClient, server.master)) { + if (clientHasPendingReplies(mi->master)) { + if (aeCreateFileEvent(server.rgthreadvar[mi->master->iel].el, newfd, AE_WRITABLE|AE_WRITE_THREADSAFE, + sendReplyToClient, mi->master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); - freeClientAsync(server.master); /* Close ASAP. */ + freeClientAsync(mi->master); /* Close ASAP. */ } } } @@ -2624,7 +2692,7 @@ void waitCommand(client *c) { long numreplicas, ackreplicas; long long offset = c->woff; - if (server.masterhost) { + if (listLength(server.masters)) { addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated."); return; } @@ -2704,14 +2772,14 @@ void processClientsWaitingReplicas(void) { /* Return the slave replication offset for this instance, that is * the offset for which we already processed the master replication stream. */ -long long replicationGetSlaveOffset(void) { +long long replicationGetSlaveOffset(redisMaster *mi) { long long offset = 0; - if (server.masterhost != NULL) { - if (server.master) { - offset = server.master->reploff; - } else if (server.cached_master) { - offset = server.cached_master->reploff; + if (mi->masterhost != NULL) { + if (mi->master) { + offset = mi->master->reploff; + } else if (mi->cached_master) { + offset = mi->cached_master->reploff; } } /* offset may be -1 when the master does not support it at all, however @@ -2726,61 +2794,69 @@ long long replicationGetSlaveOffset(void) { /* Replication cron function, called 1 time per second. */ void replicationCron(void) { - serverAssert(GlobalLocksAcquired()); static long long replication_cron_loops = 0; - std::unique_locklock)> ulock; - if (server.master != nullptr) - ulock = decltype(ulock)(server.master->lock); - - /* Non blocking connection timeout? */ - if (server.masterhost && - (server.repl_state == REPL_STATE_CONNECTING || - slaveIsInHandshakeState()) && - (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) + serverAssert(GlobalLocksAcquired()); + listIter liMaster; + listNode *lnMaster; + listRewind(server.masters, &liMaster); + while ((lnMaster = listNext(&liMaster))) { - serverLog(LL_WARNING,"Timeout connecting to the MASTER..."); - cancelReplicationHandshake(); - } + redisMaster *mi = (redisMaster*)listNodeValue(lnMaster); - /* Bulk transfer I/O timeout? */ - if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER && - (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) - { - serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); - cancelReplicationHandshake(); - } + std::unique_lockmaster->lock)> ulock; + if (mi->master != nullptr) + ulock = decltype(ulock)(mi->master->lock); - /* Timed out master when we are an already connected slave? */ - if (server.masterhost && server.repl_state == REPL_STATE_CONNECTED && - (time(NULL)-server.master->lastinteraction) > server.repl_timeout) - { - serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); - if (FCorrectThread(server.master)) - freeClient(server.master); - else - freeClientAsync(server.master); - } - - /* Check if we should connect to a MASTER */ - if (server.repl_state == REPL_STATE_CONNECT) { - serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", - server.masterhost, server.masterport); - if (connectWithMaster() == C_OK) { - serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started"); + /* Non blocking connection timeout? */ + if (mi->masterhost && + (mi->repl_state == REPL_STATE_CONNECTING || + slaveIsInHandshakeState(mi)) && + (time(NULL)-mi->repl_transfer_lastio) > server.repl_timeout) + { + serverLog(LL_WARNING,"Timeout connecting to the MASTER..."); + cancelReplicationHandshake(mi); } - } - /* Send ACK to master from time to time. - * Note that we do not send periodic acks to masters that don't - * support PSYNC and replication offsets. */ - if (server.masterhost && server.master && - !(server.master->flags & CLIENT_PRE_PSYNC)) - replicationSendAck(); + /* Bulk transfer I/O timeout? */ + if (mi->masterhost && mi->repl_state == REPL_STATE_TRANSFER && + (time(NULL)-mi->repl_transfer_lastio) > server.repl_timeout) + { + serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); + cancelReplicationHandshake(mi); + } + + /* Timed out master when we are an already connected slave? */ + if (mi->masterhost && mi->repl_state == REPL_STATE_CONNECTED && + (time(NULL)-mi->master->lastinteraction) > server.repl_timeout) + { + serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); + if (FCorrectThread(mi->master)) + freeClient(mi->master); + else + freeClientAsync(mi->master); + } + + /* Check if we should connect to a MASTER */ + if (mi->repl_state == REPL_STATE_CONNECT) { + serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", + mi->masterhost, mi->masterport); + if (connectWithMaster(mi) == C_OK) { + serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started"); + } + } + + /* Send ACK to master from time to time. + * Note that we do not send periodic acks to masters that don't + * support PSYNC and replication offsets. */ + if (mi->masterhost && mi->master && + !(mi->master->flags & CLIENT_PRE_PSYNC)) + replicationSendAck(mi); + } /* If we have attached slaves, PING them from time to time. - * So slaves can implement an explicit timeout to masters, and will - * be able to detect a link disconnection even if the TCP connection - * will not actually go down. */ + * So slaves can implement an explicit timeout to masters, and will + * be able to detect a link disconnection even if the TCP connection + * will not actually go down. */ listIter li; listNode *ln; robj *ping_argv[1]; @@ -2796,19 +2872,19 @@ void replicationCron(void) { } /* Second, send a newline to all the slaves in pre-synchronization - * stage, that is, slaves waiting for the master to create the RDB file. - * - * Also send the a newline to all the chained slaves we have, if we lost - * connection from our master, to keep the slaves aware that their - * master is online. This is needed since sub-slaves only receive proxied - * data from top-level masters, so there is no explicit pinging in order - * to avoid altering the replication offsets. This special out of band - * pings (newlines) can be sent, they will have no effect in the offset. - * - * The newline will be ignored by the slave but will refresh the - * last interaction timer preventing a timeout. In this case we ignore the - * ping period and refresh the connection once per second since certain - * timeouts are set at a few seconds (example: PSYNC response). */ + * stage, that is, slaves waiting for the master to create the RDB file. + * + * Also send the a newline to all the chained slaves we have, if we lost + * connection from our master, to keep the slaves aware that their + * master is online. This is needed since sub-slaves only receive proxied + * data from top-level masters, so there is no explicit pinging in order + * to avoid altering the replication offsets. This special out of band + * pings (newlines) can be sent, they will have no effect in the offset. + * + * The newline will be ignored by the slave but will refresh the + * last interaction timer preventing a timeout. In this case we ignore the + * ping period and refresh the connection once per second since certain + * timeouts are set at a few seconds (example: PSYNC response). */ listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = (client*)ln->value; @@ -2816,7 +2892,7 @@ void replicationCron(void) { int is_presync = (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START || (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && - server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)); + server.rdb_child_type != RDB_CHILD_TYPE_SOCKET)); if (is_presync) { if (write(slave->fd, "\n", 1) == -1) { @@ -2849,32 +2925,32 @@ void replicationCron(void) { } /* If this is a master without attached slaves and there is a replication - * backlog active, in order to reclaim memory we can free it after some - * (configured) time. Note that this cannot be done for slaves: slaves - * without sub-slaves attached should still accumulate data into the - * backlog, in order to reply to PSYNC queries if they are turned into - * masters after a failover. */ + * backlog active, in order to reclaim memory we can free it after some + * (configured) time. Note that this cannot be done for slaves: slaves + * without sub-slaves attached should still accumulate data into the + * backlog, in order to reply to PSYNC queries if they are turned into + * masters after a failover. */ if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit && - server.repl_backlog && server.masterhost == NULL) + server.repl_backlog && listLength(server.masters) == 0) { time_t idle = server.unixtime - server.repl_no_slaves_since; if (idle > server.repl_backlog_time_limit) { /* When we free the backlog, we always use a new - * replication ID and clear the ID2. This is needed - * because when there is no backlog, the master_repl_offset - * is not updated, but we would still retain our replication - * ID, leading to the following problem: - * - * 1. We are a master instance. - * 2. Our slave is promoted to master. It's repl-id-2 will - * be the same as our repl-id. - * 3. We, yet as master, receive some updates, that will not - * increment the master_repl_offset. - * 4. Later we are turned into a slave, connect to the new - * master that will accept our PSYNC request by second - * replication ID, but there will be data inconsistency - * because we received writes. */ + * replication ID and clear the ID2. This is needed + * because when there is no backlog, the master_repl_offset + * is not updated, but we would still retain our replication + * ID, leading to the following problem: + * + * 1. We are a master instance. + * 2. Our slave is promoted to master. It's repl-id-2 will + * be the same as our repl-id. + * 3. We, yet as master, receive some updates, that will not + * increment the master_repl_offset. + * 4. Later we are turned into a slave, connect to the new + * master that will accept our PSYNC request by second + * replication ID, but there will be data inconsistency + * because we received writes. */ changeReplicationId(); clearReplicationId2(); freeReplicationBacklog(); @@ -2886,8 +2962,8 @@ void replicationCron(void) { } /* If AOF is disabled and we no longer have attached slaves, we can - * free our Replication Script Cache as there is no need to propagate - * EVALSHA at all. */ + * free our Replication Script Cache as there is no need to propagate + * EVALSHA at all. */ if (listLength(server.slaves) == 0 && server.aof_state == AOF_OFF && listLength(server.repl_scriptcache_fifo) != 0) @@ -2896,11 +2972,11 @@ void replicationCron(void) { } /* Start a BGSAVE good for replication if we have slaves in - * WAIT_BGSAVE_START state. - * - * In case of diskless replication, we make sure to wait the specified - * number of seconds (according to configuration) so that other slaves - * have the time to arrive before we start streaming. */ + * WAIT_BGSAVE_START state. + * + * In case of diskless replication, we make sure to wait the specified + * number of seconds (according to configuration) so that other slaves + * have the time to arrive before we start streaming. */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) { time_t idle, max_idle = 0; int slaves_waiting = 0; @@ -2922,11 +2998,11 @@ void replicationCron(void) { if (slaves_waiting && (!server.repl_diskless_sync || - max_idle > server.repl_diskless_sync_delay)) + max_idle > server.repl_diskless_sync_delay)) { /* Start the BGSAVE. The called function may start a - * BGSAVE with socket target or disk target depending on the - * configuration and slaves capabilities. */ + * BGSAVE with socket target or disk target depending on the + * configuration and slaves capabilities. */ startBgsaveForReplication(mincapa); } } @@ -2935,3 +3011,49 @@ void replicationCron(void) { refreshGoodSlavesCount(); replication_cron_loops++; /* Incremented with frequency 1 HZ. */ } + +int FBrokenLinkToMaster() +{ + listIter li; + listNode *ln; + listRewind(server.masters, &li); + + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->repl_state != REPL_STATE_CONNECTED) + return true; + } + return false; +} + +int FActiveMaster(client *c) +{ + if (!(c->flags & CLIENT_MASTER)) + return false; + + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->master == c) + return true; + } + return false; +} + +redisMaster *MasterInfoFromClient(client *c) +{ + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->master == c || mi->cached_master == c) + return mi; + } + return nullptr; +} \ No newline at end of file diff --git a/src/scripting.cpp b/src/scripting.cpp index b23c5dc5a..442287df0 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -532,7 +532,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { luaPushError(lua, "Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode."); goto cleanup; - } else if (server.masterhost && server.repl_slave_ro && + } else if (listLength(server.masters) && server.repl_slave_ro && !server.loading && !(server.lua_caller->flags & CLIENT_MASTER)) { @@ -558,7 +558,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { * in the middle. */ if (server.maxmemory && /* Maxmemory is actually enabled. */ !server.loading && /* Don't care about mem if loading. */ - !server.masterhost && /* Slave must execute the script. */ + !listLength(server.masters) && /* Slave must execute the script. */ server.lua_write_dirty == 0 && /* Script had no side effects so far. */ (cmd->flags & CMD_DENYOOM)) { @@ -1420,8 +1420,15 @@ void evalGenericCommand(client *c, int evalsha) { /* Restore the client that was protected when the script timeout * was detected. */ unprotectClient(c); - if (server.masterhost && server.master) - queueClientForReprocessing(server.master); + listIter li; + listNode *ln; + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + struct redisMaster *mi = (struct redisMaster*)listNodeValue(ln); + if (mi->master) + queueClientForReprocessing(mi->master); + } } server.lua_caller = NULL; diff --git a/src/server.cpp b/src/server.cpp index 14f4d0832..445039dfe 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1046,7 +1046,7 @@ void serverLogRaw(int level, const char *msg) { } else if (pid != server.pid) { role_char = 'C'; /* RDB / AOF writing child. */ } else { - role_char = (server.masterhost ? 'S':'M'); /* Slave or Master. */ + role_char = (listLength(server.masters) ? 'S':'M'); /* Slave or Master. */ } fprintf(fp,"%d:%c %s %c %s\n", (int)getpid(),role_char, buf,c[level],msg); @@ -1687,9 +1687,9 @@ void clientsCron(int iel) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (server.active_expire_enabled && server.masterhost == NULL) { + if (server.active_expire_enabled && listLength(server.masters) == 0) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else if (server.masterhost != NULL) { + } else if (listLength(server.masters)) { expireSlaveKeys(); } @@ -2074,7 +2074,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && server.masterhost == NULL) + if (server.active_expire_enabled && listLength(server.masters) == 0) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Send all the slaves an ACK request if at least one client blocked @@ -2254,6 +2254,19 @@ extern "C" void createSharedObjects(void) { shared.maxstring = sdsnew("maxstring"); } +void initMasterInfo(redisMaster *master) +{ + master->masterauth = NULL; + master->masterhost = NULL; + master->masterport = 6379; + master->master = NULL; + master->cached_master = NULL; + master->master_initial_offset = -1; + + master->repl_state = REPL_STATE_NONE; + master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ +} + void initServerConfig(void) { int j; @@ -2383,19 +2396,12 @@ void initServerConfig(void) { appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */ /* Replication related */ - server.masterauth = NULL; - server.masterhost = NULL; - server.masterport = 6379; - server.master = NULL; - server.cached_master = NULL; - server.master_initial_offset = -1; - server.repl_state = REPL_STATE_NONE; + server.masters = listCreate(); server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; server.repl_slave_ignore_maxmemory = CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY; server.repl_slave_lazy_flush = CONFIG_DEFAULT_SLAVE_LAZY_FLUSH; - server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; @@ -3504,7 +3510,7 @@ int processCommand(client *c) { * and if this is a master instance. */ int deny_write_type = writeCommandsDeniedByDiskError(); if (deny_write_type != DISK_ERROR_TYPE_NONE && - server.masterhost == NULL && + listLength(server.masters) == 0 && (c->cmd->flags & CMD_WRITE || c->cmd->proc == pingCommand)) { @@ -3521,7 +3527,7 @@ int processCommand(client *c) { /* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ - if (server.masterhost == NULL && + if (listLength(server.masters) == 0 && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && c->cmd->flags & CMD_WRITE && @@ -3534,7 +3540,7 @@ int processCommand(client *c) { /* Don't accept write commands if this is a read only slave. But * accept write commands if this is our master. */ - if (server.masterhost && server.repl_slave_ro && + if (listLength(server.masters) && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && c->cmd->flags & CMD_WRITE) { @@ -3557,7 +3563,7 @@ int processCommand(client *c) { /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on, * when slave-serve-stale-data is no and we are a slave with a broken * link with master. */ - if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && + if (FBrokenLinkToMaster() && server.repl_serve_stale_data == 0 && !(c->cmd->flags & CMD_STALE)) { @@ -4284,46 +4290,57 @@ extern "C" sds genRedisInfoString(const char *section) { info = sdscatprintf(info, "# Replication\r\n" "role:%s\r\n", - server.masterhost == NULL ? "master" : "slave"); - if (server.masterhost) { - long long slave_repl_offset = 1; + listLength(server.masters) == 0 ? "master" : "slave"); + if (listLength(server.masters)) { + listIter li; + listNode *ln; + listRewind(server.masters, &li); - if (server.master) - slave_repl_offset = server.master->reploff; - else if (server.cached_master) - slave_repl_offset = server.cached_master->reploff; + int cmasters = 0; + while ((ln = listNext(&li))) + { + long long slave_repl_offset = 1; + redisMaster *mi = (redisMaster*)listNodeValue(ln); + info = sdscatprintf(info, "Master %d: \r\n", cmasters); + ++cmasters; - info = sdscatprintf(info, - "master_host:%s\r\n" - "master_port:%d\r\n" - "master_link_status:%s\r\n" - "master_last_io_seconds_ago:%d\r\n" - "master_sync_in_progress:%d\r\n" - "slave_repl_offset:%lld\r\n" - ,server.masterhost, - server.masterport, - (server.repl_state == REPL_STATE_CONNECTED) ? - "up" : "down", - server.master ? - ((int)(server.unixtime-server.master->lastinteraction)) : -1, - server.repl_state == REPL_STATE_TRANSFER, - slave_repl_offset - ); + if (mi->master) + slave_repl_offset = mi->master->reploff; + else if (mi->cached_master) + slave_repl_offset = mi->cached_master->reploff; - if (server.repl_state == REPL_STATE_TRANSFER) { info = sdscatprintf(info, - "master_sync_left_bytes:%lld\r\n" - "master_sync_last_io_seconds_ago:%d\r\n" - , (long long) - (server.repl_transfer_size - server.repl_transfer_read), - (int)(server.unixtime-server.repl_transfer_lastio) + "\tmaster_host:%s\r\n" + "\tmaster_port:%d\r\n" + "\tmaster_link_status:%s\r\n" + "\tmaster_last_io_seconds_ago:%d\r\n" + "\tmaster_sync_in_progress:%d\r\n" + "\tslave_repl_offset:%lld\r\n" + ,mi->masterhost, + mi->masterport, + (mi->repl_state == REPL_STATE_CONNECTED) ? + "up" : "down", + mi->master ? + ((int)(server.unixtime-mi->master->lastinteraction)) : -1, + mi->repl_state == REPL_STATE_TRANSFER, + slave_repl_offset ); - } - if (server.repl_state != REPL_STATE_CONNECTED) { - info = sdscatprintf(info, - "master_link_down_since_seconds:%jd\r\n", - (intmax_t)server.unixtime-server.repl_down_since); + if (mi->repl_state == REPL_STATE_TRANSFER) { + info = sdscatprintf(info, + "\tmaster_sync_left_bytes:%lld\r\n" + "\tmaster_sync_last_io_seconds_ago:%d\r\n" + , (long long) + (mi->repl_transfer_size - mi->repl_transfer_read), + (int)(server.unixtime-mi->repl_transfer_lastio) + ); + } + + if (mi->repl_state != REPL_STATE_CONNECTED) { + info = sdscatprintf(info, + "\tmaster_link_down_since_seconds:%jd\r\n", + (intmax_t)server.unixtime-mi->repl_down_since); + } } info = sdscatprintf(info, "slave_priority:%d\r\n" @@ -4694,7 +4711,7 @@ void loadDataFromDisk(void) { (float)(ustime()-start)/1000000); /* Restore the replication ID / offset from the RDB file. */ - if ((server.masterhost || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&& + if ((listLength(server.masters) || (server.cluster_enabled && nodeIsSlave(server.cluster->myself)))&& rsi.repl_id_is_set && rsi.repl_offset != -1 && /* Note that older implementations may save a repl_stream_db @@ -4704,11 +4721,19 @@ void loadDataFromDisk(void) { { memcpy(server.replid,rsi.repl_id,sizeof(server.replid)); server.master_repl_offset = rsi.repl_offset; - /* If we are a slave, create a cached master from this - * information, in order to allow partial resynchronizations - * with masters. */ - replicationCacheMasterUsingMyself(); - selectDb(server.cached_master,rsi.repl_stream_db); + listIter li; + listNode *ln; + + listRewind(server.masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + /* If we are a slave, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + selectDb(mi->cached_master,rsi.repl_stream_db); + } } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); diff --git a/src/server.h b/src/server.h index 26a9dac00..f9101ba5f 100644 --- a/src/server.h +++ b/src/server.h @@ -1070,6 +1070,33 @@ struct redisServerThreadVars { struct fastlock lockPendingWrite; }; +struct redisMaster { + char *masteruser; /* AUTH with this user and masterauth with master */ + char *masterauth; /* AUTH with this password with master */ + char *masterhost; /* Hostname of master */ + int masterport; /* Port of master */ + client *cached_master; /* Cached master to be reused for PSYNC. */ + client *master; + /* The following two fields is where we store master PSYNC replid/offset + * while the PSYNC is in progress. At the end we'll copy the fields into + * the server->master client structure. */ + char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ + long long master_initial_offset; /* Master PSYNC offset. */ + + int repl_state; /* Replication status if the instance is a slave */ + off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ + off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ + off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ + int repl_transfer_s; /* Slave -> Master SYNC socket */ + int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ + char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ + time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ + time_t repl_down_since; /* Unix time at which link with master went down */ + + unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ + /* After we've connected with our master use the UUID in server.master */ +}; + struct redisServer { /* General */ pid_t pid; /* Main process pid. */ @@ -1183,6 +1210,8 @@ struct redisServer { int idx; } inst_metric[STATS_METRIC_COUNT]; /* Configuration */ + char *default_masteruser; /* AUTH with this user and masterauth with master */ + char *default_masterauth; /* AUTH with this password with master */ int verbosity; /* Loglevel in redis.conf */ int maxidletime; /* Client timeout in seconds */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ @@ -1296,35 +1325,16 @@ struct redisServer { int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (slave) */ - char *masteruser; /* AUTH with this user and masterauth with master */ - char *masterauth; /* AUTH with this password with master */ - char *masterhost; /* Hostname of master */ - int masterport; /* Port of master */ + list *masters; int repl_timeout; /* Timeout after N seconds of master idle */ - client *master; /* Client that is master for this slave */ - client *cached_master; /* Cached master to be reused for PSYNC. */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ - int repl_state; /* Replication status if the instance is a slave */ - off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ - off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ - off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ - int repl_transfer_s; /* Slave -> Master SYNC socket */ - int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ - char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ - time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ + int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ int repl_serve_stale_data; /* Serve stale data when link is down? */ int repl_slave_ro; /* Slave is read only? */ int repl_slave_ignore_maxmemory; /* If true slaves do not evict. */ - time_t repl_down_since; /* Unix time at which link with master went down */ - int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ int slave_priority; /* Reported in INFO and used by Sentinel. */ int slave_announce_port; /* Give the master this listening port. */ char *slave_announce_ip; /* Give the master this ip address. */ - /* The following two fields is where we store master PSYNC replid/offset - * while the PSYNC is in progress. At the end we'll copy the fields into - * the server->master client structure. */ - char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ - long long master_initial_offset; /* Master PSYNC offset. */ int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ /* Replication script cache. */ dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ @@ -1436,8 +1446,6 @@ struct redisServer { int fActiveReplica; /* Can this replica also be a master? */ unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */ - unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ - /* After we've connected with our master use the UUID in server.master */ struct fastlock flock; }; @@ -1771,16 +1779,17 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout); ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ +void initMasterInfo(struct redisMaster *master); void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); -void replicationHandleMasterDisconnection(void); -void replicationCacheMaster(client *c); +void replicationHandleMasterDisconnection(struct redisMaster *mi); +void replicationCacheMaster(struct redisMaster *mi, client *c); void resizeReplicationBacklog(long long newsize); -void replicationSetMaster(char *ip, int port); -void replicationUnsetMaster(void); +struct redisMaster *replicationAddMaster(char *ip, int port); +void replicationUnsetMaster(struct redisMaster *mi); void refreshGoodSlavesCount(void); void replicationScriptCacheInit(void); void replicationScriptCacheFlush(void); @@ -1789,15 +1798,15 @@ int replicationScriptCacheExists(sds sha1); void processClientsWaitingReplicas(void); void unblockClientWaitingReplicas(client *c); int replicationCountAcksByOffset(long long offset); -void replicationSendNewlineToMaster(void); -long long replicationGetSlaveOffset(void); +void replicationSendNewlineToMaster(struct redisMaster *mi); +long long replicationGetSlaveOffset(struct redisMaster *mi); char *replicationGetSlaveName(client *c); long long getPsyncInitialOffset(void); int replicationSetupSlaveForFullResync(client *slave, long long offset); void changeReplicationId(void); void clearReplicationId2(void); void chopReplicationBacklog(void); -void replicationCacheMasterUsingMyself(void); +void replicationCacheMasterUsingMyself(struct redisMaster *mi); void feedReplicationBacklog(void *ptr, size_t len); /* Generic persistence functions */ @@ -2350,6 +2359,10 @@ void xtrimCommand(client *c); void lolwutCommand(client *c); void aclCommand(client *c); +int FBrokenLinkToMaster(); +int FActiveMaster(client *c); +struct redisMaster *MasterInfoFromClient(client *c); + #if defined(__GNUC__) #ifndef __cplusplus void *calloc(size_t count, size_t size) __attribute__ ((deprecated));