From 7e62ed49af58e611a497140621bd3dcab65d41fe Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 28 Mar 2019 15:12:43 -0400 Subject: [PATCH] Implement database merging for Active Replicas Former-commit-id: 91e6368de0f0ecb7e4db497ce286a15336d4ec34 --- src/aof.c | 3 ++- src/db.c | 44 ++++++++++++++++++++++++++++++++++++-------- src/debug.c | 3 ++- src/networking.cpp | 18 ++++++++++++------ src/rdb.c | 24 ++++++++++++++++-------- src/replication.cpp | 40 ++++++++++++++++++++++++---------------- src/server.h | 8 +++++--- 7 files changed, 97 insertions(+), 43 deletions(-) diff --git a/src/aof.c b/src/aof.c index 431ea8756..fb8360bf5 100644 --- a/src/aof.c +++ b/src/aof.c @@ -727,7 +727,8 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb,fileno(fp)); - if (rdbLoadRio(&rdb,NULL,1) != C_OK) { + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (rdbLoadRio(&rdb,&rsi,1) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); goto readerr; } else { diff --git a/src/db.c b/src/db.c index d2946b341..c46b6de18 100644 --- a/src/db.c +++ b/src/db.c @@ -167,19 +167,47 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { return o; } +int dbAddCore(redisDb *db, robj *key, robj *val) { + sds copy = sdsdup(ptrFromObj(key)); + int retval = dictAdd(db->pdict, copy, val); + + if (retval == DICT_OK) + { + if (val->type == OBJ_LIST || + val->type == OBJ_ZSET) + signalKeyAsReady(db, key); + if (server.cluster_enabled) slotToKeyAdd(key); + } + else + { + sdsfree(copy); + } + + return retval; +} + /* Add the key to the DB. It's up to the caller to increment the reference * counter of the value if needed. * * The program is aborted if the key already exists. */ -void dbAdd(redisDb *db, robj *key, robj *val) { - sds copy = sdsdup(ptrFromObj(key)); - int retval = dictAdd(db->pdict, copy, val); - +void dbAdd(redisDb *db, robj *key, robj *val) +{ + int retval = dbAddCore(db, key, val); serverAssertWithInfo(NULL,key,retval == DICT_OK); - if (val->type == OBJ_LIST || - val->type == OBJ_ZSET) - signalKeyAsReady(db, key); - if (server.cluster_enabled) slotToKeyAdd(key); +} + +/* Insert a key, handling duplicate keys according to fReplace */ +int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) +{ + if (fReplace) + { + setKey(db, key, val); + return TRUE; + } + else + { + return (dbAddCore(db, key, val) == DICT_OK); + } } /* Overwrite an existing key with a new value. Incrementing the reference diff --git a/src/debug.c b/src/debug.c index 34e0ab22c..1e4af59ca 100644 --- a/src/debug.c +++ b/src/debug.c @@ -362,7 +362,8 @@ NULL } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); protectClient(c); - int ret = rdbLoad(NULL); + rdbSaveInfo rsiDft = RDB_SAVE_INFO_INIT; + int ret = rdbLoad(&rsiDft); unprotectClient(c); if (ret != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); diff --git a/src/networking.cpp b/src/networking.cpp index 550f84b9f..726fc7ee2 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1166,10 +1166,8 @@ static void freeClientArgv(client *c) { c->cmd = NULL; } -/* Close all the slaves connections. This is useful in chained replication - * when we resync with our own master and want to force all our slaves to - * resync with us as well. */ -void disconnectSlaves(void) { +void disconnectSlavesExcept(unsigned char *uuid) +{ serverAssert(GlobalLocksAcquired()); listIter li; listNode *ln; @@ -1177,8 +1175,16 @@ void disconnectSlaves(void) { listRewind(server.slaves, &li); while ((ln = listNext(&li))) { client *c = (client*)listNodeValue(ln); - freeClientAsync(c); - } + if (uuid == nullptr || !FUuidEqual(c->uuid, uuid)) + freeClientAsync(c); + } +} + +/* Close all the slaves connections. This is useful in chained replication + * when we resync with our own master and want to force all our slaves to + * resync with us as well. */ +void disconnectSlaves(void) { + disconnectSlavesExcept(nullptr); } /* Remove the specified client from global lists where the client could diff --git a/src/rdb.c b/src/rdb.c index 6a06e779e..8ab2fee82 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2055,17 +2055,25 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(val); } else { /* Add the new object in the hash table */ - dbAdd(db,key,val); + int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); - /* Set the expire time if needed */ - if (expiretime != -1) setExpire(NULL,db,key,expiretime); + if (fInserted) + { + /* Set the expire time if needed */ + if (expiretime != -1) setExpire(NULL,db,key,expiretime); - /* Set usage information (for eviction). */ - objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); + /* Set usage information (for eviction). */ + objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); - /* Decrement the key refcount since dbAdd() will take its - * own reference. */ - decrRefCount(key); + /* Decrement the key refcount since dbMerge() will take its + * own reference. */ + decrRefCount(key); + } + else + { + decrRefCount(key); + decrRefCount(val); + } } /* Reset the state that is key-specified and is populated by diff --git a/src/replication.cpp b/src/replication.cpp index 651680127..fd1c6301c 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -77,6 +77,9 @@ char *replicationGetSlaveName(client *c) { static bool FSameHost(client *clientA, client *clientB) { + if (clientA == nullptr || clientB == nullptr) + return false; + const unsigned char *a = clientA->uuid; const unsigned char *b = clientB->uuid; @@ -138,7 +141,8 @@ void freeReplicationBacklog(void) { listRewind(server.slaves, &li); while ((ln = listNext(&li))) { // server.slaves should be empty, or filled with clients pending close - serverAssert(((client*)listNodeValue(ln))->flags & CLIENT_CLOSE_ASAP); + client *c = (client*)listNodeValue(ln); + serverAssert(c->flags & CLIENT_CLOSE_ASAP || FUuidEqual(server.master_uuid, c->uuid)); } zfree(server.repl_backlog); server.repl_backlog = NULL; @@ -1234,8 +1238,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { ssize_t nread, readlen, nwritten; off_t left; UNUSED(el); - UNUSED(privdata); UNUSED(mask); + int fUpdate = (int)((ptrdiff_t)privdata); // Should we update our database, or create from scratch? serverAssert(GlobalLocksAcquired()); @@ -1390,15 +1394,19 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { cancelReplicationHandshake(); return; } - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: %s", fUpdate ? "Keeping old data" : "Flushing old data"); /* We need to stop any AOFRW fork before flusing and parsing * RDB, otherwise we'll create a copy-on-write disaster. */ if(aof_is_enabled) stopAppendOnly(); - signalFlushedDb(-1); - emptyDb( - -1, - server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, - replicationEmptyDbCallback); + if (!fUpdate) + { + signalFlushedDb(-1); + emptyDb( + -1, + server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, + replicationEmptyDbCallback); + } + /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to @@ -1576,7 +1584,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { * client structure representing the master into server.master. */ server.master_initial_offset = -1; - if (server.cached_master) { + if (server.cached_master && !server.fActiveReplica) { psync_replid = server.cached_master->replid; snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); @@ -1726,7 +1734,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { serverAssert(GlobalLocksAcquired()); char tmpfile[256], *err = NULL; int dfd = -1, maxtries = 5; - int sockerr = 0, psync_result; + int sockerr = 0, psync_result = PSYNC_FULLRESYNC; socklen_t errlen = sizeof(sockerr); UNUSED(el); UNUSED(privdata); @@ -1960,13 +1968,13 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ /* If the master is in an transient error, we should try to PSYNC - * from scratch later, so go to the error path. This happens when - * the server is loading the dataset or is not connected with its - * master and so forth. */ + * from scratch later, so go to the error path. This happens when + * the server is loading the dataset or is not connected with its + * master and so forth. */ if (psync_result == PSYNC_TRY_LATER) goto error; /* Note: if PSYNC does not return WAIT_REPLY, it will take care of - * uninstalling the read handler from the file descriptor. */ + * uninstalling the read handler from the file descriptor. */ if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); @@ -1977,7 +1985,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { * as well, if we have any sub-slaves. The master may transfer us an * entirely different data set and we have no way to incrementally feed * our slaves after that. */ - disconnectSlaves(); /* Force our slaves to resync with us as well. */ + disconnectSlavesExcept(server.master_uuid); /* Force our slaves to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC @@ -2006,7 +2014,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Setup the non blocking download of the bulk file. */ - if (aeCreateFileEvent(el,fd, AE_READABLE,readSyncBulkPayload,NULL) + if (aeCreateFileEvent(el,fd, AE_READABLE,readSyncBulkPayload,(void*)((ptrdiff_t)server.fActiveReplica)) == AE_ERR) { serverLog(LL_WARNING, diff --git a/src/server.h b/src/server.h index 7b56f27ff..26a9dac00 100644 --- a/src/server.h +++ b/src/server.h @@ -76,6 +76,7 @@ typedef long long mstime_t; /* millisecond time type. */ #include "quicklist.h" /* Lists are encoded as linked lists of N-elements flat arrays */ #include "rax.h" /* Radix tree */ +#include "uuid.h" /* Following includes allow test functions to be called from Redis main() */ #include "zipmap.h" @@ -87,8 +88,6 @@ typedef long long mstime_t; /* millisecond time type. */ extern "C" { #endif -#define UUID_BINARY_LEN 16 - /* Error codes */ #define C_OK 0 #define C_ERR -1 @@ -1027,9 +1026,10 @@ typedef struct rdbSaveInfo { int repl_id_is_set; /* True if repl_id field is set. */ char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */ long long repl_offset; /* Replication offset. */ + int fForceSetKey; } rdbSaveInfo; -#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1} +#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE} struct malloc_stats { size_t zmalloc_used; @@ -1647,6 +1647,7 @@ int getClientTypeByName(const char *name); const char *getClientTypeName(int cclass); void flushSlavesOutputBuffers(void); void disconnectSlaves(void); +void disconnectSlavesExcept(unsigned char *uuid); int listenToPort(int port, int *fds, int *count, int fReusePort); void pauseClients(mstime_t duration); int clientsArePaused(void); @@ -2046,6 +2047,7 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, #define LOOKUP_NOTOUCH (1<<0) void dbAdd(redisDb *db, robj *key, robj *val); void dbOverwrite(redisDb *db, robj *key, robj *val); +int dbMerge(redisDb *db, robj *key, robj *val, int fReplace); void setKey(redisDb *db, robj *key, robj *val); int dbExists(redisDb *db, robj *key); robj *dbRandomKey(redisDb *db);