Save and restore master info in rdb to allow active replica partial sync (#371)
* save replid for all masters in rdb * expanded rdbSaveInfo to hold multiple master structs * parse repl-masters from rdb * recover replid info from rdb in active replica mode, attempt partial sync * save offset from rdb into correct variable * don't change replid based on master in active rep * save and load psync info from correct fields
This commit is contained in:
parent
05bfd007f3
commit
c81cc4ee2b
@ -884,7 +884,7 @@ int loadAppendOnlyFile(char *filename) {
|
|||||||
} else {
|
} else {
|
||||||
/* RDB preamble. Pass loading the RDB functions. */
|
/* RDB preamble. Pass loading the RDB functions. */
|
||||||
rio rdb;
|
rio rdb;
|
||||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
rdbSaveInfo rsi;
|
||||||
|
|
||||||
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
|
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
|
||||||
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
|
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
|
||||||
|
61
src/rdb.cpp
61
src/rdb.cpp
@ -1218,6 +1218,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
== -1) return -1;
|
== -1) return -1;
|
||||||
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset)
|
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset)
|
||||||
== -1) return -1;
|
== -1) return -1;
|
||||||
|
if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) {
|
||||||
|
sdsstring val = sdsstring(sdsempty());
|
||||||
|
listNode *ln;
|
||||||
|
listIter li;
|
||||||
|
redisMaster* mi;
|
||||||
|
listRewind(g_pserver->masters,&li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
mi = (redisMaster*)listNodeValue(ln);
|
||||||
|
if (!mi->master) {
|
||||||
|
// If master client is not available, use info from master struct - better than nothing
|
||||||
|
serverLog(LL_NOTICE, "saving master %s", mi->master_replid);
|
||||||
|
if (mi->master_replid[0] == 0) {
|
||||||
|
// if replid is null, there's no reason to save it
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
val = val.catfmt("%s:%I:%s:%i;", mi->master_replid,
|
||||||
|
mi->master_initial_offset,
|
||||||
|
mi->masterhost,
|
||||||
|
mi->masterport);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
serverLog(LL_NOTICE, "saving master %s", mi->master->replid);
|
||||||
|
if (mi->master->replid[0] == 0) {
|
||||||
|
// if replid is null, there's no reason to save it
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
val = val.catfmt("%s:%I:%s:%i;", mi->master->replid,
|
||||||
|
mi->master->reploff,
|
||||||
|
mi->masterhost,
|
||||||
|
mi->masterport);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
|
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
|
||||||
return 1;
|
return 1;
|
||||||
@ -1628,7 +1662,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
|
|||||||
} else
|
} else
|
||||||
{
|
{
|
||||||
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
|
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
|
||||||
rdbSaveInfo rsiT = RDB_SAVE_INFO_INIT;
|
rdbSaveInfo rsiT;
|
||||||
if (rsi == nullptr)
|
if (rsi == nullptr)
|
||||||
rsi = &rsiT;
|
rsi = &rsiT;
|
||||||
memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo));
|
memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo));
|
||||||
@ -2888,11 +2922,11 @@ public:
|
|||||||
* snapshot taken by the master may not be reflected on the replica. */
|
* snapshot taken by the master may not be reflected on the replica. */
|
||||||
bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now;
|
bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now;
|
||||||
if (fStaleMvccKey || fExpiredKey) {
|
if (fStaleMvccKey || fExpiredKey) {
|
||||||
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
|
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
|
||||||
// We have a key that we've already deleted and is not back in our database.
|
// We have a key that we've already deleted and is not back in our database.
|
||||||
// We'll need to inform the sending master of the delete if it is also a replica of us
|
// We'll need to inform the sending master of the delete if it is also a replica of us
|
||||||
robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key)));
|
robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key)));
|
||||||
this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
|
this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
|
||||||
}
|
}
|
||||||
sdsfree(job.key);
|
sdsfree(job.key);
|
||||||
job.key = nullptr;
|
job.key = nullptr;
|
||||||
@ -3206,6 +3240,23 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
memcpy(rsi->repl_id,ptrFromObj(auxval),CONFIG_RUN_ID_SIZE+1);
|
memcpy(rsi->repl_id,ptrFromObj(auxval),CONFIG_RUN_ID_SIZE+1);
|
||||||
rsi->repl_id_is_set = 1;
|
rsi->repl_id_is_set = 1;
|
||||||
}
|
}
|
||||||
|
} else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) {
|
||||||
|
if (rsi) {
|
||||||
|
struct redisMaster mi;
|
||||||
|
char *masters = szFromObj(auxval);
|
||||||
|
char *entry = strtok(masters, ":");
|
||||||
|
while (entry != NULL) {
|
||||||
|
memcpy(mi.master_replid, entry, sizeof(mi.master_replid));
|
||||||
|
entry = strtok(NULL, ":");
|
||||||
|
mi.master_initial_offset = atoi(entry);
|
||||||
|
entry = strtok(NULL, ":");
|
||||||
|
mi.masterhost = entry;
|
||||||
|
entry = strtok(NULL, ";");
|
||||||
|
mi.masterport = atoi(entry);
|
||||||
|
entry = strtok(NULL, ":");
|
||||||
|
rsi->addMaster(mi);
|
||||||
|
}
|
||||||
|
}
|
||||||
} else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) {
|
} else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) {
|
||||||
if (rsi) rsi->repl_offset = strtoll(szFromObj(auxval),NULL,10);
|
if (rsi) rsi->repl_offset = strtoll(szFromObj(auxval),NULL,10);
|
||||||
} else if (!strcasecmp(szFromObj(auxkey),"lua")) {
|
} else if (!strcasecmp(szFromObj(auxkey),"lua")) {
|
||||||
@ -3861,8 +3912,8 @@ void bgsaveCommand(client *c) {
|
|||||||
* is returned, and the RDB saving will not persist any replication related
|
* is returned, and the RDB saving will not persist any replication related
|
||||||
* information. */
|
* information. */
|
||||||
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
|
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
|
||||||
rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT;
|
rdbSaveInfo rsi_init;
|
||||||
*rsi = rsi_init;
|
*rsi = std::move(rsi_init);
|
||||||
|
|
||||||
memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid));
|
memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid));
|
||||||
rsi->master_repl_offset = g_pserver->master_repl_offset;
|
rsi->master_repl_offset = g_pserver->master_repl_offset;
|
||||||
|
@ -2954,7 +2954,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi,
|
|||||||
mi->staleKeyMap->clear();
|
mi->staleKeyMap->clear();
|
||||||
else
|
else
|
||||||
mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>();
|
mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>();
|
||||||
rsi.mi = mi;
|
rsi.addMaster(*mi);
|
||||||
}
|
}
|
||||||
if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
|
if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
@ -2994,7 +2994,7 @@ error:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void readSyncBulkPayload(connection *conn) {
|
void readSyncBulkPayload(connection *conn) {
|
||||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
rdbSaveInfo rsi;
|
||||||
redisMaster *mi = (redisMaster*)connGetPrivateData(conn);
|
redisMaster *mi = (redisMaster*)connGetPrivateData(conn);
|
||||||
static int usemark = 0;
|
static int usemark = 0;
|
||||||
if (mi == nullptr) {
|
if (mi == nullptr) {
|
||||||
@ -3043,7 +3043,7 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
{
|
{
|
||||||
mergeReplicationId(mi->master->replid);
|
mergeReplicationId(mi->master->replid);
|
||||||
}
|
}
|
||||||
else
|
else if (!g_pserver->fActiveReplica)
|
||||||
{
|
{
|
||||||
/* After a full resynchroniziation we use the replication ID and
|
/* After a full resynchroniziation we use the replication ID and
|
||||||
* offset of the master. The secondary ID / offset are cleared since
|
* offset of the master. The secondary ID / offset are cleared since
|
||||||
@ -3238,7 +3238,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
|
|||||||
* client structure representing the master into g_pserver->master. */
|
* client structure representing the master into g_pserver->master. */
|
||||||
mi->master_initial_offset = -1;
|
mi->master_initial_offset = -1;
|
||||||
|
|
||||||
if (mi->cached_master && !g_pserver->fActiveReplica) {
|
if (mi->cached_master) {
|
||||||
psync_replid = mi->cached_master->replid;
|
psync_replid = mi->cached_master->replid;
|
||||||
snprintf(psync_offset,sizeof(psync_offset),"%lld", mi->cached_master->reploff+1);
|
snprintf(psync_offset,sizeof(psync_offset),"%lld", mi->cached_master->reploff+1);
|
||||||
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
|
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
|
||||||
@ -3337,16 +3337,17 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
|
|||||||
sizeof(g_pserver->replid2));
|
sizeof(g_pserver->replid2));
|
||||||
g_pserver->second_replid_offset = g_pserver->master_repl_offset+1;
|
g_pserver->second_replid_offset = g_pserver->master_repl_offset+1;
|
||||||
|
|
||||||
|
if (!g_pserver->fActiveReplica) {
|
||||||
/* Update the cached master ID and our own primary ID to the
|
/* Update the cached master ID and our own primary ID to the
|
||||||
* new one. */
|
* new one. */
|
||||||
memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid));
|
memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid));
|
||||||
memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid));
|
memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid));
|
||||||
|
|
||||||
/* Disconnect all the sub-slaves: they need to be notified. */
|
/* Disconnect all the sub-slaves: they need to be notified. */
|
||||||
if (!g_pserver->fActiveReplica)
|
|
||||||
disconnectSlaves();
|
disconnectSlaves();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Setup the replication to continue. */
|
/* Setup the replication to continue. */
|
||||||
sdsfree(reply);
|
sdsfree(reply);
|
||||||
@ -3725,18 +3726,6 @@ retry_connect:
|
|||||||
disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */
|
disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */
|
||||||
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
|
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
if (listLength(g_pserver->slaves))
|
|
||||||
{
|
|
||||||
changeReplicationId();
|
|
||||||
clearReplicationId2();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
|
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
|
||||||
* and the g_pserver->master_replid and master_initial_offset are
|
* and the g_pserver->master_replid and master_initial_offset are
|
||||||
@ -4405,6 +4394,26 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) {
|
|||||||
mi->master = NULL;
|
mi->master = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function is called when reloading master info from an RDB in Active Replica mode.
|
||||||
|
* It creates a cached master client using the info contained in the redisMaster struct.
|
||||||
|
*
|
||||||
|
* Assumes that the passed struct contains valid master info. */
|
||||||
|
void replicationCacheMasterUsingMaster(redisMaster *mi) {
|
||||||
|
if (mi->cached_master) {
|
||||||
|
freeClient(mi->cached_master);
|
||||||
|
}
|
||||||
|
|
||||||
|
replicationCreateMasterClient(mi, NULL, -1);
|
||||||
|
std::lock_guard<decltype(mi->master->lock)> lock(mi->master->lock);
|
||||||
|
|
||||||
|
memcpy(mi->master->replid, mi->master_replid, sizeof(mi->master_replid));
|
||||||
|
mi->master->reploff = mi->master_initial_offset;
|
||||||
|
|
||||||
|
unlinkClient(mi->master);
|
||||||
|
mi->cached_master = mi->master;
|
||||||
|
mi->master = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Free a cached master, called when there are no longer the conditions for
|
/* Free a cached master, called when there are no longer the conditions for
|
||||||
* a partial resync on reconnection. */
|
* a partial resync on reconnection. */
|
||||||
void replicationDiscardCachedMaster(redisMaster *mi) {
|
void replicationDiscardCachedMaster(redisMaster *mi) {
|
||||||
|
15
src/sds.h
15
src/sds.h
@ -429,6 +429,21 @@ public:
|
|||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sdsstring &operator=(sdsstring &&other)
|
||||||
|
{
|
||||||
|
sds tmp = m_str;
|
||||||
|
m_str = other.m_str;
|
||||||
|
other.m_str = tmp;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename... Args>
|
||||||
|
sdsstring catfmt(const char *fmt, Args... args)
|
||||||
|
{
|
||||||
|
m_str = sdscatfmt(m_str, fmt, args...);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
sds release() {
|
sds release() {
|
||||||
sds sdsT = m_str;
|
sds sdsT = m_str;
|
||||||
m_str = nullptr;
|
m_str = nullptr;
|
||||||
|
@ -6940,7 +6940,7 @@ void loadDataFromDisk(void) {
|
|||||||
if (loadAppendOnlyFile(g_pserver->aof_filename) == C_OK)
|
if (loadAppendOnlyFile(g_pserver->aof_filename) == C_OK)
|
||||||
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
|
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
|
||||||
} else if (g_pserver->rdb_filename != NULL || g_pserver->rdb_s3bucketpath != NULL) {
|
} else if (g_pserver->rdb_filename != NULL || g_pserver->rdb_s3bucketpath != NULL) {
|
||||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
rdbSaveInfo rsi;
|
||||||
rsi.fForceSetKey = false;
|
rsi.fForceSetKey = false;
|
||||||
errno = 0; /* Prevent a stale value from affecting error checking */
|
errno = 0; /* Prevent a stale value from affecting error checking */
|
||||||
if (rdbLoad(&rsi,RDBFLAGS_NONE) == C_OK) {
|
if (rdbLoad(&rsi,RDBFLAGS_NONE) == C_OK) {
|
||||||
@ -6969,6 +6969,17 @@ void loadDataFromDisk(void) {
|
|||||||
while ((ln = listNext(&li)))
|
while ((ln = listNext(&li)))
|
||||||
{
|
{
|
||||||
redisMaster *mi = (redisMaster*)listNodeValue(ln);
|
redisMaster *mi = (redisMaster*)listNodeValue(ln);
|
||||||
|
if (g_pserver->fActiveReplica) {
|
||||||
|
for (size_t i = 0; i < rsi.numMasters(); i++) {
|
||||||
|
if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) {
|
||||||
|
memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid));
|
||||||
|
mi->master_initial_offset = rsi.masters[i].master_initial_offset;
|
||||||
|
replicationCacheMasterUsingMaster(mi);
|
||||||
|
serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
/* If we are a replica, create a cached master from this
|
/* If we are a replica, create a cached master from this
|
||||||
* information, in order to allow partial resynchronizations
|
* information, in order to allow partial resynchronizations
|
||||||
* with masters. */
|
* with masters. */
|
||||||
@ -6976,6 +6987,7 @@ void loadDataFromDisk(void) {
|
|||||||
selectDb(mi->cached_master,rsi.repl_stream_db);
|
selectDb(mi->cached_master,rsi.repl_stream_db);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else if (errno != ENOENT) {
|
} else if (errno != ENOENT) {
|
||||||
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
|
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
|
||||||
exit(1);
|
exit(1);
|
||||||
@ -7596,7 +7608,7 @@ int main(int argc, char **argv) {
|
|||||||
__AFL_INIT();
|
__AFL_INIT();
|
||||||
#endif
|
#endif
|
||||||
rio rdb;
|
rio rdb;
|
||||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
rdbSaveInfo rsi;
|
||||||
startLoadingFile(stdin, (char*)"stdin", 0);
|
startLoadingFile(stdin, (char*)"stdin", 0);
|
||||||
rioInitWithFile(&rdb,stdin);
|
rioInitWithFile(&rdb,stdin);
|
||||||
rdbLoadRio(&rdb,0,&rsi);
|
rdbLoadRio(&rdb,0,&rsi);
|
||||||
|
142
src/server.h
142
src/server.h
@ -1850,6 +1850,43 @@ struct redisMemOverhead {
|
|||||||
} *db;
|
} *db;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
struct redisMaster {
|
||||||
|
char *masteruser; /* AUTH with this user and masterauth with master */
|
||||||
|
char *masterauth; /* AUTH with this password with master */
|
||||||
|
char *masterhost; /* Hostname of master */
|
||||||
|
int masterport; /* Port of master */
|
||||||
|
client *cached_master; /* Cached master to be reused for PSYNC. */
|
||||||
|
client *master;
|
||||||
|
/* The following two fields is where we store master PSYNC replid/offset
|
||||||
|
* while the PSYNC is in progress. At the end we'll copy the fields into
|
||||||
|
* the server->master client structure. */
|
||||||
|
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
|
||||||
|
long long master_initial_offset; /* Master PSYNC offset. */
|
||||||
|
|
||||||
|
bool isActive = false;
|
||||||
|
bool isRocksdbSnapshotRepl = false;
|
||||||
|
int repl_state; /* Replication status if the instance is a replica */
|
||||||
|
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
|
||||||
|
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
|
||||||
|
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
|
||||||
|
connection *repl_transfer_s; /* Slave -> Master SYNC socket */
|
||||||
|
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
|
||||||
|
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
|
||||||
|
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
|
||||||
|
time_t repl_down_since; /* Unix time at which link with master went down */
|
||||||
|
|
||||||
|
class SnapshotPayloadParseState *parseState;
|
||||||
|
sds bulkreadBuffer = nullptr;
|
||||||
|
|
||||||
|
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
|
||||||
|
/* After we've connected with our master use the UUID in g_pserver->master */
|
||||||
|
uint64_t mvccLastSync;
|
||||||
|
/* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */
|
||||||
|
std::map<int, std::vector<robj_sharedptr>> *staleKeyMap;
|
||||||
|
int ielReplTransfer = -1;
|
||||||
|
};
|
||||||
|
|
||||||
/* This structure can be optionally passed to RDB save/load functions in
|
/* This structure can be optionally passed to RDB save/load functions in
|
||||||
* order to implement additional functionalities, by storing and loading
|
* order to implement additional functionalities, by storing and loading
|
||||||
* metadata to the RDB file.
|
* metadata to the RDB file.
|
||||||
@ -1858,7 +1895,66 @@ struct redisMemOverhead {
|
|||||||
* replication in order to make sure that chained slaves (slaves of slaves)
|
* replication in order to make sure that chained slaves (slaves of slaves)
|
||||||
* select the correct DB and are able to accept the stream coming from the
|
* select the correct DB and are able to accept the stream coming from the
|
||||||
* top-level master. */
|
* top-level master. */
|
||||||
typedef struct rdbSaveInfo {
|
class rdbSaveInfo {
|
||||||
|
public:
|
||||||
|
rdbSaveInfo() {
|
||||||
|
repl_stream_db = -1;
|
||||||
|
repl_id_is_set = 0;
|
||||||
|
memcpy(repl_id, "0000000000000000000000000000000000000000", sizeof(repl_id));
|
||||||
|
repl_offset = -1;
|
||||||
|
fForceSetKey = TRUE;
|
||||||
|
mvccMinThreshold = 0;
|
||||||
|
masters = nullptr;
|
||||||
|
masterCount = 0;
|
||||||
|
}
|
||||||
|
rdbSaveInfo(const rdbSaveInfo &other) {
|
||||||
|
repl_stream_db = other.repl_stream_db;
|
||||||
|
repl_id_is_set = other.repl_id_is_set;
|
||||||
|
memcpy(repl_id, other.repl_id, sizeof(repl_id));
|
||||||
|
repl_offset = other.repl_offset;
|
||||||
|
fForceSetKey = other.fForceSetKey;
|
||||||
|
mvccMinThreshold = other.mvccMinThreshold;
|
||||||
|
masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount);
|
||||||
|
memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount);
|
||||||
|
masterCount = other.masterCount;
|
||||||
|
}
|
||||||
|
rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() {
|
||||||
|
swap(*this, other);
|
||||||
|
}
|
||||||
|
rdbSaveInfo &operator=(rdbSaveInfo other) {
|
||||||
|
swap(*this, other);
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
~rdbSaveInfo() {
|
||||||
|
free(masters);
|
||||||
|
}
|
||||||
|
friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) {
|
||||||
|
std::swap(first.repl_stream_db, second.repl_stream_db);
|
||||||
|
std::swap(first.repl_id_is_set, second.repl_id_is_set);
|
||||||
|
std::swap(first.repl_id, second.repl_id);
|
||||||
|
std::swap(first.repl_offset, second.repl_offset);
|
||||||
|
std::swap(first.fForceSetKey, second.fForceSetKey);
|
||||||
|
std::swap(first.mvccMinThreshold, second.mvccMinThreshold);
|
||||||
|
std::swap(first.masters, second.masters);
|
||||||
|
std::swap(first.masterCount, second.masterCount);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void addMaster(const struct redisMaster &mi) {
|
||||||
|
masterCount++;
|
||||||
|
if (masters == nullptr) {
|
||||||
|
masters = (struct redisMaster*)malloc(sizeof(struct redisMaster));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount);
|
||||||
|
}
|
||||||
|
memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster));
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t numMasters() {
|
||||||
|
return masterCount;
|
||||||
|
}
|
||||||
|
|
||||||
/* Used saving and loading. */
|
/* Used saving and loading. */
|
||||||
int repl_stream_db; /* DB to select in g_pserver->master client. */
|
int repl_stream_db; /* DB to select in g_pserver->master client. */
|
||||||
|
|
||||||
@ -1872,10 +1968,11 @@ typedef struct rdbSaveInfo {
|
|||||||
long long master_repl_offset;
|
long long master_repl_offset;
|
||||||
|
|
||||||
uint64_t mvccMinThreshold;
|
uint64_t mvccMinThreshold;
|
||||||
struct redisMaster *mi;
|
struct redisMaster *masters;
|
||||||
} rdbSaveInfo;
|
|
||||||
|
|
||||||
#define RDB_SAVE_INFO_INIT {-1,0,"0000000000000000000000000000000000000000",-1, TRUE, 0, 0, nullptr}
|
private:
|
||||||
|
size_t masterCount;
|
||||||
|
};
|
||||||
|
|
||||||
struct malloc_stats {
|
struct malloc_stats {
|
||||||
size_t zmalloc_used;
|
size_t zmalloc_used;
|
||||||
@ -2081,42 +2178,6 @@ private:
|
|||||||
int rdb_key_save_delay = -1; // thread local cache
|
int rdb_key_save_delay = -1; // thread local cache
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisMaster {
|
|
||||||
char *masteruser; /* AUTH with this user and masterauth with master */
|
|
||||||
char *masterauth; /* AUTH with this password with master */
|
|
||||||
char *masterhost; /* Hostname of master */
|
|
||||||
int masterport; /* Port of master */
|
|
||||||
client *cached_master; /* Cached master to be reused for PSYNC. */
|
|
||||||
client *master;
|
|
||||||
/* The following two fields is where we store master PSYNC replid/offset
|
|
||||||
* while the PSYNC is in progress. At the end we'll copy the fields into
|
|
||||||
* the server->master client structure. */
|
|
||||||
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
|
|
||||||
long long master_initial_offset; /* Master PSYNC offset. */
|
|
||||||
|
|
||||||
bool isActive = false;
|
|
||||||
bool isRocksdbSnapshotRepl = false;
|
|
||||||
int repl_state; /* Replication status if the instance is a replica */
|
|
||||||
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
|
|
||||||
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
|
|
||||||
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
|
|
||||||
connection *repl_transfer_s; /* Slave -> Master SYNC socket */
|
|
||||||
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
|
|
||||||
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
|
|
||||||
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
|
|
||||||
time_t repl_down_since; /* Unix time at which link with master went down */
|
|
||||||
|
|
||||||
class SnapshotPayloadParseState *parseState;
|
|
||||||
sds bulkreadBuffer = nullptr;
|
|
||||||
|
|
||||||
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
|
|
||||||
/* After we've connected with our master use the UUID in g_pserver->master */
|
|
||||||
uint64_t mvccLastSync;
|
|
||||||
/* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */
|
|
||||||
std::map<int, std::vector<robj_sharedptr>> *staleKeyMap;
|
|
||||||
int ielReplTransfer = -1;
|
|
||||||
};
|
|
||||||
|
|
||||||
// Const vars are not changed after worker threads are launched
|
// Const vars are not changed after worker threads are launched
|
||||||
struct redisServerConst {
|
struct redisServerConst {
|
||||||
pid_t pid; /* Main process pid. */
|
pid_t pid; /* Main process pid. */
|
||||||
@ -3101,6 +3162,7 @@ void clearReplicationId2(void);
|
|||||||
void mergeReplicationId(const char *);
|
void mergeReplicationId(const char *);
|
||||||
void chopReplicationBacklog(void);
|
void chopReplicationBacklog(void);
|
||||||
void replicationCacheMasterUsingMyself(struct redisMaster *mi);
|
void replicationCacheMasterUsingMyself(struct redisMaster *mi);
|
||||||
|
void replicationCacheMasterUsingMaster(struct redisMaster *mi);
|
||||||
void feedReplicationBacklog(const void *ptr, size_t len);
|
void feedReplicationBacklog(const void *ptr, size_t len);
|
||||||
void updateMasterAuth();
|
void updateMasterAuth();
|
||||||
void showLatestBacklog();
|
void showLatestBacklog();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user