Implement database merging for Active Replicas

Former-commit-id: 91e6368de0f0ecb7e4db497ce286a15336d4ec34
This commit is contained in:
John Sully 2019-03-28 15:12:43 -04:00
parent bbc073c07c
commit 7e62ed49af
7 changed files with 97 additions and 43 deletions

View File

@ -727,7 +727,8 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr; if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fileno(fp)); rioInitWithFile(&rdb,fileno(fp));
if (rdbLoadRio(&rdb,NULL,1) != C_OK) { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (rdbLoadRio(&rdb,&rsi,1) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr; goto readerr;
} else { } else {

View File

@ -167,19 +167,47 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
return o; return o;
} }
/* Attempt to insert 'key' -> 'val' into the database dictionary without
 * aborting when the insertion is refused.
 *
 * A private sds duplicate of the key name is handed to dictAdd(). If the
 * dictionary rejects the entry (presumably because the key is already
 * present), the duplicate is released and the dictAdd() status is returned
 * with no other side effects.
 *
 * On success, keys holding a list or sorted set are reported through
 * signalKeyAsReady(), and when cluster mode is enabled the key is recorded
 * with slotToKeyAdd().
 *
 * Returns the dictAdd() result: DICT_OK when the entry was added. The
 * reference count of 'val' is not modified here. */
int dbAddCore(redisDb *db, robj *key, robj *val) {
    sds keyname = sdsdup(ptrFromObj(key));
    int status = dictAdd(db->pdict, keyname, val);

    if (status != DICT_OK) {
        /* The dictionary did not accept the duplicated name, so it is still
         * ours to free. */
        sdsfree(keyname);
        return status;
    }

    switch (val->type) {
    case OBJ_LIST:
    case OBJ_ZSET:
        signalKeyAsReady(db, key);
        break;
    default:
        break;
    }

    if (server.cluster_enabled) {
        slotToKeyAdd(key);
    }
    return status;
}
/* Add the key to the DB. It's up to the caller to increment the reference /* Add the key to the DB. It's up to the caller to increment the reference
* counter of the value if needed. * counter of the value if needed.
* *
* The program is aborted if the key already exists. */ * The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) { void dbAdd(redisDb *db, robj *key, robj *val)
sds copy = sdsdup(ptrFromObj(key)); {
int retval = dictAdd(db->pdict, copy, val); int retval = dbAddCore(db, key, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK); serverAssertWithInfo(NULL,key,retval == DICT_OK);
if (val->type == OBJ_LIST || }
val->type == OBJ_ZSET)
signalKeyAsReady(db, key); /* Insert a key, handling duplicate keys according to fReplace */
if (server.cluster_enabled) slotToKeyAdd(key); int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
{
if (fReplace)
{
setKey(db, key, val);
return TRUE;
}
else
{
return (dbAddCore(db, key, val) == DICT_OK);
}
} }
/* Overwrite an existing key with a new value. Incrementing the reference /* Overwrite an existing key with a new value. Incrementing the reference

View File

@ -362,7 +362,8 @@ NULL
} }
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
protectClient(c); protectClient(c);
int ret = rdbLoad(NULL); rdbSaveInfo rsiDft = RDB_SAVE_INFO_INIT;
int ret = rdbLoad(&rsiDft);
unprotectClient(c); unprotectClient(c);
if (ret != C_OK) { if (ret != C_OK) {
addReplyError(c,"Error trying to load the RDB dump"); addReplyError(c,"Error trying to load the RDB dump");

View File

@ -1166,10 +1166,8 @@ static void freeClientArgv(client *c) {
c->cmd = NULL; c->cmd = NULL;
} }
/* Close all the slaves connections. This is useful in chained replication void disconnectSlavesExcept(unsigned char *uuid)
* when we resync with our own master and want to force all our slaves to {
* resync with us as well. */
void disconnectSlaves(void) {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
listIter li; listIter li;
listNode *ln; listNode *ln;
@ -1177,8 +1175,16 @@ void disconnectSlaves(void) {
listRewind(server.slaves, &li); listRewind(server.slaves, &li);
while ((ln = listNext(&li))) { while ((ln = listNext(&li))) {
client *c = (client*)listNodeValue(ln); client *c = (client*)listNodeValue(ln);
freeClientAsync(c); if (uuid == nullptr || !FUuidEqual(c->uuid, uuid))
} freeClientAsync(c);
}
}
/* Close all the slaves connections. This is useful in chained replication
* when we resync with our own master and want to force all our slaves to
* resync with us as well. */
void disconnectSlaves(void) {
disconnectSlavesExcept(nullptr);
} }
/* Remove the specified client from global lists where the client could /* Remove the specified client from global lists where the client could

View File

@ -2055,17 +2055,25 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
decrRefCount(val); decrRefCount(val);
} else { } else {
/* Add the new object in the hash table */ /* Add the new object in the hash table */
dbAdd(db,key,val); int fInserted = dbMerge(db, key, val, rsi->fForceSetKey);
/* Set the expire time if needed */ if (fInserted)
if (expiretime != -1) setExpire(NULL,db,key,expiretime); {
/* Set the expire time if needed */
if (expiretime != -1) setExpire(NULL,db,key,expiretime);
/* Set usage information (for eviction). */ /* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
/* Decrement the key refcount since dbAdd() will take its /* Decrement the key refcount since dbMerge() will take its
* own reference. */ * own reference. */
decrRefCount(key); decrRefCount(key);
}
else
{
decrRefCount(key);
decrRefCount(val);
}
} }
/* Reset the state that is key-specified and is populated by /* Reset the state that is key-specified and is populated by

View File

@ -77,6 +77,9 @@ char *replicationGetSlaveName(client *c) {
static bool FSameHost(client *clientA, client *clientB) static bool FSameHost(client *clientA, client *clientB)
{ {
if (clientA == nullptr || clientB == nullptr)
return false;
const unsigned char *a = clientA->uuid; const unsigned char *a = clientA->uuid;
const unsigned char *b = clientB->uuid; const unsigned char *b = clientB->uuid;
@ -138,7 +141,8 @@ void freeReplicationBacklog(void) {
listRewind(server.slaves, &li); listRewind(server.slaves, &li);
while ((ln = listNext(&li))) { while ((ln = listNext(&li))) {
// server.slaves should be empty, or filled with clients pending close // server.slaves should be empty, or filled with clients pending close
serverAssert(((client*)listNodeValue(ln))->flags & CLIENT_CLOSE_ASAP); client *c = (client*)listNodeValue(ln);
serverAssert(c->flags & CLIENT_CLOSE_ASAP || FUuidEqual(server.master_uuid, c->uuid));
} }
zfree(server.repl_backlog); zfree(server.repl_backlog);
server.repl_backlog = NULL; server.repl_backlog = NULL;
@ -1234,8 +1238,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
ssize_t nread, readlen, nwritten; ssize_t nread, readlen, nwritten;
off_t left; off_t left;
UNUSED(el); UNUSED(el);
UNUSED(privdata);
UNUSED(mask); UNUSED(mask);
int fUpdate = (int)((ptrdiff_t)privdata); // Should we update our database, or create from scratch?
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
@ -1390,15 +1394,19 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
cancelReplicationHandshake(); cancelReplicationHandshake();
return; return;
} }
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: %s", fUpdate ? "Keeping old data" : "Flushing old data");
/* We need to stop any AOFRW fork before flushing and parsing /* We need to stop any AOFRW fork before flushing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */ * RDB, otherwise we'll create a copy-on-write disaster. */
if(aof_is_enabled) stopAppendOnly(); if(aof_is_enabled) stopAppendOnly();
signalFlushedDb(-1); if (!fUpdate)
emptyDb( {
-1, signalFlushedDb(-1);
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, emptyDb(
replicationEmptyDbCallback); -1,
server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
replicationEmptyDbCallback);
}
/* Before loading the DB into memory we need to delete the readable /* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since * handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to * rdbLoad() will call the event loop to process events from time to
@ -1576,7 +1584,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) {
* client structure representing the master into server.master. */ * client structure representing the master into server.master. */
server.master_initial_offset = -1; server.master_initial_offset = -1;
if (server.cached_master) { if (server.cached_master && !server.fActiveReplica) {
psync_replid = server.cached_master->replid; psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
@ -1726,7 +1734,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
char tmpfile[256], *err = NULL; char tmpfile[256], *err = NULL;
int dfd = -1, maxtries = 5; int dfd = -1, maxtries = 5;
int sockerr = 0, psync_result; int sockerr = 0, psync_result = PSYNC_FULLRESYNC;
socklen_t errlen = sizeof(sockerr); socklen_t errlen = sizeof(sockerr);
UNUSED(el); UNUSED(el);
UNUSED(privdata); UNUSED(privdata);
@ -1960,13 +1968,13 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
/* If the master is in a transient error, we should try to PSYNC /* If the master is in a transient error, we should try to PSYNC
* from scratch later, so go to the error path. This happens when * from scratch later, so go to the error path. This happens when
* the server is loading the dataset or is not connected with its * the server is loading the dataset or is not connected with its
* master and so forth. */ * master and so forth. */
if (psync_result == PSYNC_TRY_LATER) goto error; if (psync_result == PSYNC_TRY_LATER) goto error;
/* Note: if PSYNC does not return WAIT_REPLY, it will take care of /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
* uninstalling the read handler from the file descriptor. */ * uninstalling the read handler from the file descriptor. */
if (psync_result == PSYNC_CONTINUE) { if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
@ -1977,7 +1985,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
* as well, if we have any sub-slaves. The master may transfer us an * as well, if we have any sub-slaves. The master may transfer us an
* entirely different data set and we have no way to incrementally feed * entirely different data set and we have no way to incrementally feed
* our slaves after that. */ * our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */ disconnectSlavesExcept(server.master_uuid); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
@ -2006,7 +2014,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
} }
/* Setup the non blocking download of the bulk file. */ /* Setup the non blocking download of the bulk file. */
if (aeCreateFileEvent(el,fd, AE_READABLE,readSyncBulkPayload,NULL) if (aeCreateFileEvent(el,fd, AE_READABLE,readSyncBulkPayload,(void*)((ptrdiff_t)server.fActiveReplica))
== AE_ERR) == AE_ERR)
{ {
serverLog(LL_WARNING, serverLog(LL_WARNING,

View File

@ -76,6 +76,7 @@ typedef long long mstime_t; /* millisecond time type. */
#include "quicklist.h" /* Lists are encoded as linked lists of #include "quicklist.h" /* Lists are encoded as linked lists of
N-elements flat arrays */ N-elements flat arrays */
#include "rax.h" /* Radix tree */ #include "rax.h" /* Radix tree */
#include "uuid.h"
/* Following includes allow test functions to be called from Redis main() */ /* Following includes allow test functions to be called from Redis main() */
#include "zipmap.h" #include "zipmap.h"
@ -87,8 +88,6 @@ typedef long long mstime_t; /* millisecond time type. */
extern "C" { extern "C" {
#endif #endif
#define UUID_BINARY_LEN 16
/* Error codes */ /* Error codes */
#define C_OK 0 #define C_OK 0
#define C_ERR -1 #define C_ERR -1
@ -1027,9 +1026,10 @@ typedef struct rdbSaveInfo {
int repl_id_is_set; /* True if repl_id field is set. */ int repl_id_is_set; /* True if repl_id field is set. */
char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */ char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */
long long repl_offset; /* Replication offset. */ long long repl_offset; /* Replication offset. */
int fForceSetKey;
} rdbSaveInfo; } rdbSaveInfo;
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1} #define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE}
struct malloc_stats { struct malloc_stats {
size_t zmalloc_used; size_t zmalloc_used;
@ -1647,6 +1647,7 @@ int getClientTypeByName(const char *name);
const char *getClientTypeName(int cclass); const char *getClientTypeName(int cclass);
void flushSlavesOutputBuffers(void); void flushSlavesOutputBuffers(void);
void disconnectSlaves(void); void disconnectSlaves(void);
void disconnectSlavesExcept(unsigned char *uuid);
int listenToPort(int port, int *fds, int *count, int fReusePort); int listenToPort(int port, int *fds, int *count, int fReusePort);
void pauseClients(mstime_t duration); void pauseClients(mstime_t duration);
int clientsArePaused(void); int clientsArePaused(void);
@ -2046,6 +2047,7 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
#define LOOKUP_NOTOUCH (1<<0) #define LOOKUP_NOTOUCH (1<<0)
void dbAdd(redisDb *db, robj *key, robj *val); void dbAdd(redisDb *db, robj *key, robj *val);
void dbOverwrite(redisDb *db, robj *key, robj *val); void dbOverwrite(redisDb *db, robj *key, robj *val);
int dbMerge(redisDb *db, robj *key, robj *val, int fReplace);
void setKey(redisDb *db, robj *key, robj *val); void setKey(redisDb *db, robj *key, robj *val);
int dbExists(redisDb *db, robj *key); int dbExists(redisDb *db, robj *key);
robj *dbRandomKey(redisDb *db); robj *dbRandomKey(redisDb *db);