From b12b48ab272dbf1e6fdd8b758c175cacc10eb449 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 9 Nov 2021 19:36:07 +0000 Subject: [PATCH 01/20] Initial implementation of snapshot fast replication. There are still a few TODOs in progress Former-commit-id: 0febdcdab8693af443f350968ed3d8c80106675d --- runtest | 1 + src/IStorage.h | 4 +- src/Makefile | 5 +- src/SnapshotPayloadParseState.cpp | 287 +++++++++++++++ src/SnapshotPayloadParseState.h | 64 ++++ src/StorageCache.cpp | 46 ++- src/StorageCache.h | 3 +- src/db.cpp | 17 +- src/dict.cpp | 4 +- src/networking.cpp | 31 +- src/replication.cpp | 469 +++++++++++++++++++++++-- src/sds.h | 9 +- src/server.h | 23 +- src/storage/rocksdbfactor_internal.h | 28 ++ tests/integration/replication-fast.tcl | 34 ++ tests/support/server.tcl | 4 + tests/test_helper.tcl | 12 + 17 files changed, 981 insertions(+), 60 deletions(-) create mode 100644 src/SnapshotPayloadParseState.cpp create mode 100644 src/SnapshotPayloadParseState.h create mode 100644 src/storage/rocksdbfactor_internal.h create mode 100644 tests/integration/replication-fast.tcl diff --git a/runtest b/runtest index c6349d118..6c9ea5ae4 100755 --- a/runtest +++ b/runtest @@ -14,3 +14,4 @@ then exit 1 fi $TCLSH tests/test_helper.tcl "${@}" +$TCLSH tests/test_helper.tcl "${@}--flash" \ No newline at end of file diff --git a/src/IStorage.h b/src/IStorage.h index d1f316022..e060d849f 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -30,10 +30,10 @@ public: virtual bool enumerate(callback fn) const = 0; virtual size_t count() const = 0; - virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) { + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { beginWriteBatch(); for (size_t ielem = 0; ielem < celem; ++ielem) { - insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false); + insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false); } endWriteBatch(); } diff --git a/src/Makefile b/src/Makefile index 13f5209f7..d275cfea1 100644 --- a/src/Makefile +++ b/src/Makefile @@ -354,6 +354,7 @@ endif REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX) REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) +KEYDB_SERVER_OBJ=SnapshotPayloadParseState.o REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) 
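The new IStorage::bulkInsert default above replaces the sds-based signature with parallel arrays of raw key/value pointers plus explicit length arrays, so callers no longer have to materialize sds strings before a bulk write. A minimal standalone sketch of that contract (ToyStore and its names are illustrative stand-ins, not part of the patch):

#include <cstddef>
#include <cstring>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Hypothetical store mirroring the default bulkInsert loop: one insert per
// (key, keylen, value, valuelen) tuple, bracketed by a single write batch.
struct ToyStore {
    std::vector<std::pair<std::string, std::string>> rows;
    void bulkInsert(char **rgkeys, size_t *rgcbkeys,
                    char **rgvals, size_t *rgcbvals, size_t celem) {
        // beginWriteBatch() would run here in the real interface
        for (size_t i = 0; i < celem; ++i)
            rows.emplace_back(std::string(rgkeys[i], rgcbkeys[i]),
                              std::string(rgvals[i], rgcbvals[i]));
        // endWriteBatch() would run here in the real interface
    }
};

int main() {
    char key[] = "key1", val[] = "value1";
    char *keys[] = {key};
    char *vals[] = {val};
    size_t keyLens[] = {std::strlen(key)};
    size_t valLens[] = {std::strlen(val)};
    ToyStore store;
    store.bulkInsert(keys, keyLens, vals, valLens, 1);
    std::cout << store.rows[0].first << " -> " << store.rows[0].second << "\n";
    return 0;
}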
REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX) @@ -410,7 +411,7 @@ endif @touch $@ # keydb-server -$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) +$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(KEYDB_SERVER_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS) # keydb-sentinel @@ -433,7 +434,7 @@ $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ) $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/hdr_histogram.o $(FINAL_LIBS) -DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) +DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(KEYDB_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) -include $(DEP) # Because the jemalloc.h header is generated as a part of the jemalloc build, diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp new file mode 100644 index 000000000..4bf89c718 --- /dev/null +++ b/src/SnapshotPayloadParseState.cpp @@ -0,0 +1,287 @@ +#include "server.h" +#include "SnapshotPayloadParseState.h" +#include + +static const size_t SLAB_SIZE = 2*1024*1024; // Note 64 MB because we try to give 64MB to a block +class SlabAllocator +{ + class Slab { + char *m_pv = nullptr; + size_t m_cb = 0; + size_t m_cbAllocated = 0; + + public: + Slab(size_t cbAllocate) { + m_pv = (char*)zmalloc(cbAllocate); + m_cb = cbAllocate; + } + + ~Slab() { + zfree(m_pv); + } + + Slab(Slab &&other) { + m_pv = other.m_pv; + m_cb = other.m_cb; + m_cbAllocated = other.m_cbAllocated; + other.m_pv = nullptr; + other.m_cb = 0; + other.m_cbAllocated = 0; + } + + bool canAllocate(size_t cb) const { + return (m_cbAllocated + cb) <= m_cb; + } + + void *allocate(size_t cb) { + if ((m_cbAllocated + cb) > m_cb) + return nullptr; + void *pvret = m_pv + m_cbAllocated; + m_cbAllocated += cb; + return pvret; + } + }; + std::vector m_vecslabs; +public: + + void *allocate(size_t cb) { + if (m_vecslabs.empty() || !m_vecslabs.back().canAllocate(cb)) { + m_vecslabs.emplace_back(std::max(cb, SLAB_SIZE)); + } + return m_vecslabs.back().allocate(cb); + } +}; + +static uint64_t dictCStringHash(const void *key) { + return dictGenHashFunction((unsigned char*)key, strlen((char*)key)); +} + + +static void dictKeyDestructor(void *privdata, void *key) +{ + DICT_NOTUSED(privdata); + sdsfree((sds)key); +} + +static int dictCStringCompare(void *, const void *key1, const void *key2) +{ + int l1,l2; + + l1 = strlen((sds)key1); + l2 = strlen((sds)key2); + if (l1 != l2) return 0; + return memcmp(key1, key2, l1) == 0; +} + +dictType metadataLongLongDictType = { + dictCStringHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictCStringCompare, /* key compare */ + dictKeyDestructor, /* key destructor */ + nullptr, /* val destructor */ + nullptr, /* allow to expand */ + nullptr /* async free destructor */ +}; + +SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStage() { + if (stackParse.empty()) + return ParseStageName::Global; + + switch (stackParse.top().name) + { + case ParseStageName::None: + return ParseStageName::Global; + + case ParseStageName::Global: + if (stackParse.top().arraycur == 0) + return ParseStageName::MetaData; + else if (stackParse.top().arraycur == 1) + return ParseStageName::Databases; + break; + + case ParseStageName::MetaData: + return ParseStageName::KeyValuePair; + + case ParseStageName::Databases: + return ParseStageName::Dataset; + + case ParseStageName::Dataset: + return 
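The SlabAllocator above is a simple bump allocator: each Slab is one zmalloc'd block, allocate() hands out consecutive ranges from the current slab, and a fresh slab of at least SLAB_SIZE (2 MB) is appended whenever the current one cannot satisfy a request; individual allocations are never freed. A minimal standalone sketch of the same idea, assuming new[]/delete[] in place of zmalloc/zfree:

#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

class BumpSlabAllocator {
    struct Slab {
        std::unique_ptr<char[]> mem;
        size_t cap;
        size_t used = 0;
        explicit Slab(size_t cb) : mem(new char[cb]), cap(cb) {}
    };
    std::vector<Slab> slabs;
    static constexpr size_t kSlabSize = 2 * 1024 * 1024;  // 2 MB per slab
public:
    void *allocate(size_t cb) {
        if (slabs.empty() || slabs.back().used + cb > slabs.back().cap)
            slabs.emplace_back(cb > kSlabSize ? cb : kSlabSize);
        Slab &s = slabs.back();
        void *p = s.mem.get() + s.used;
        s.used += cb;
        return p;
    }
    // Nothing is freed until the allocator itself is destroyed, which matches
    // the lifetime of one queued batch of keys and values in the parse state.
};

int main() {
    BumpSlabAllocator alloc;
    char *a = static_cast<char *>(alloc.allocate(16));
    char *b = static_cast<char *>(alloc.allocate(16));
    assert(b == a + 16);  // consecutive requests come out of the same slab
    return 0;
}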
ParseStageName::KeyValuePair; + + default: + break; + } + throw "Bad protocol: corrupt state"; +} + +void SnapshotPayloadParseState::flushQueuedKeys() { + if (vecqueuedKeys.empty()) + return; + + // TODO: We can't finish parse until all the work functions finish + int idb = current_database; + serverAssert(vecqueuedKeys.size() == vecqueuedVals.size()); + auto sizePrev = vecqueuedKeys.size(); + ++insertsInFlight; + auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous + g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { + g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); + --insertsInFlightTmp; + delete pallocator; + }); + m_spallocator = std::make_unique(); + cbQueued = 0; + vecqueuedKeys.reserve(sizePrev); + vecqueuedKeysCb.reserve(sizePrev); + vecqueuedVals.reserve(sizePrev); + vecqueuedValsCb.reserve(sizePrev); + serverAssert(vecqueuedKeys.empty()); + serverAssert(vecqueuedVals.empty()); +} + + +SnapshotPayloadParseState::SnapshotPayloadParseState() { + // This is to represent the first array the client is intended to send us + ParseStage stage; + stage.name = ParseStageName::None; + stage.arraylen = 1; + stackParse.push(stage); + + dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr); + insertsInFlight = 0; + m_spallocator = std::make_unique(); +} + +SnapshotPayloadParseState::~SnapshotPayloadParseState() { + dictRelease(dictLongLongMetaData); +} + +const char *SnapshotPayloadParseState::getStateDebugName() { + switch (stackParse.top().name) { + case ParseStageName::None: + return "None"; + + case ParseStageName::Global: + return "Global"; + + case ParseStageName::MetaData: + return "MetaData"; + + case ParseStageName::Databases: + return "Databases"; + + case ParseStageName::KeyValuePair: + return "KeyValuePair"; + + case ParseStageName::Dataset: + return "Dataset"; + + default: + return "Unknown"; + } +} + +size_t SnapshotPayloadParseState::depth() const { return stackParse.size(); } + +void SnapshotPayloadParseState::trimState() { + while (!stackParse.empty() && (stackParse.top().arraycur == stackParse.top().arraylen)) + stackParse.pop(); + + if (stackParse.empty()) { + flushQueuedKeys(); + while (insertsInFlight > 0) { + // TODO: ProcessEventsWhileBlocked + aeReleaseLock(); + aeAcquireLock(); + } + } +} + +void SnapshotPayloadParseState::pushArray(long long size) { + if (stackParse.empty()) + throw "Bad Protocol: unexpected trailing data"; + + if (size == 0) { + stackParse.top().arraycur++; + return; + } + + if (stackParse.top().name == ParseStageName::Databases) { + flushQueuedKeys(); + current_database = stackParse.top().arraycur; + } + + ParseStage stage; + stage.name = getNextStage(); + stage.arraylen = size; + + if (stage.name == ParseStageName::Dataset) { + g_pserver->db[current_database]->expand(stage.arraylen); + } + + // Note: This affects getNextStage so ensure its after + stackParse.top().arraycur++; + stackParse.push(stage); +} + +void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) { + if (stackParse.empty()) + throw "Bad Protocol: unexpected trailing bulk string"; + + if 
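flushQueuedKeys() above packages the queued key/value pointer arrays into one bulkStorageInsert call, posts it to the async work queue, and resets the queue; pushValue() later in the file appends to the queue and forces a flush once cbQueued reaches queuedBatchLimit. A small standalone sketch of that batch-and-flush pattern, with the worker handoff reduced to a direct call (all names here are illustrative):

#include <cstddef>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Stand-in for bulkStorageInsert(): consumes one batch of key/value pairs.
static void bulkInsert(const std::vector<std::string> &keys,
                       const std::vector<std::string> &vals) {
    (void)vals;
    std::cout << "flushing a batch of " << keys.size() << " pairs\n";
}

class BatchQueue {
    std::vector<std::string> keys, vals;
    size_t bytesQueued = 0;
    static constexpr size_t kBatchLimit = 64 * 1024 * 1024;  // 64 MB, as in the patch
public:
    void push(std::string key, std::string val) {
        bytesQueued += key.size() + val.size();
        keys.push_back(std::move(key));
        vals.push_back(std::move(val));
        if (bytesQueued >= kBatchLimit)
            flush();
    }
    void flush() {
        if (keys.empty()) return;
        bulkInsert(keys, vals);  // the patch posts this to the async work queue instead
        keys.clear();
        vals.clear();
        bytesQueued = 0;
    }
};

int main() {
    BatchQueue q;
    q.push("key", "value");
    q.flush();  // final flush once the payload is fully parsed, as trimState() does
    return 0;
}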
(stackParse.top().arraycur >= static_cast(stackParse.top().arrvalues.size())) + throw "Bad protocol: Unexpected value"; + + auto &stage = stackParse.top(); + stage.arrvalues[stackParse.top().arraycur].first = (char*)m_spallocator->allocate(cch); + stage.arrvalues[stackParse.top().arraycur].second = cch; + memcpy(stage.arrvalues[stackParse.top().arraycur].first, rgch, cch); + stage.arraycur++; + switch (stage.name) { + case ParseStageName::KeyValuePair: + if (stage.arraycur == 2) { + // We loaded both pairs + if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr) + throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small + vecqueuedKeys.push_back(stage.arrvalues[0].first); + vecqueuedKeysCb.push_back(stage.arrvalues[0].second); + vecqueuedVals.push_back(stage.arrvalues[1].first); + vecqueuedValsCb.push_back(stage.arrvalues[1].second); + stage.arrvalues[0].first = nullptr; + stage.arrvalues[1].first = nullptr; + cbQueued += vecqueuedKeysCb.back(); + cbQueued += vecqueuedValsCb.back(); + if (cbQueued >= queuedBatchLimit) + flushQueuedKeys(); + } + break; + + default: + throw "Bad Protocol: unexpected bulk string"; + } +} + +void SnapshotPayloadParseState::pushValue(long long value) { + if (stackParse.empty()) + throw "Bad Protocol: unexpected integer value"; + + stackParse.top().arraycur++; + + if (stackParse.top().arraycur != 2 || stackParse.top().arrvalues[0].first == nullptr) + throw "Bad Protocol: unexpected integer value"; + + dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.top().arrvalues[0].first, stackParse.top().arrvalues[0].second), nullptr); + if (de == nullptr) + throw "Bad Protocol: metadata field sent twice"; + de->v.s64 = value; +} + +long long SnapshotPayloadParseState::getMetaDataLongLong(const char *szField) const { + dictEntry *de = dictFind(dictLongLongMetaData, szField); + + if (de == nullptr) { + serverLog(LL_WARNING, "Master did not send field: %s", szField); + throw false; + } + return de->v.s64; +} \ No newline at end of file diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h new file mode 100644 index 000000000..fee42178a --- /dev/null +++ b/src/SnapshotPayloadParseState.h @@ -0,0 +1,64 @@ +#pragma once +#include +#include + +class SlabAllocator; + +class SnapshotPayloadParseState { + enum class ParseStageName { + None, + Global, + MetaData, + Databases, + Dataset, + KeyValuePair, + }; + struct ParseStage { + ParseStageName name; + long long arraylen = 0; + long long arraycur = 0; + std::array, 2> arrvalues; + + ParseStage() { + for (auto &pair : arrvalues) { + pair.first = nullptr; + pair.second = 0; + } + } + }; + + std::stack stackParse; + + + std::vector vecqueuedKeys; + std::vector vecqueuedKeysCb; + std::vector vecqueuedVals; + std::vector vecqueuedValsCb; + + + std::atomic insertsInFlight; + std::unique_ptr m_spallocator; + dict *dictLongLongMetaData = nullptr; + size_t cbQueued = 0; + static const size_t queuedBatchLimit = 64*1024*1024; // 64 MB + int current_database = -1; + + ParseStageName getNextStage(); + void flushQueuedKeys(); + +public: + SnapshotPayloadParseState(); + ~SnapshotPayloadParseState(); + + long long getMetaDataLongLong(const char *field) const; + + const char *getStateDebugName(); + + size_t depth() const; + + void trimState(); + void pushArray(long long size); + void pushValue(const char *rgch, long long cch); + void pushValue(long long value); + bool shouldThrottle() const { return insertsInFlight > 
(cserver.cthreads*4); } +}; \ No newline at end of file diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index af2ac12b7..cdc380105 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -97,17 +97,47 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite); } -void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) +long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing); +void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { + std::vector vechashes; + if (m_pdict != nullptr) { + vechashes.reserve(celem); + + for (size_t ielem = 0; ielem < celem; ++ielem) { + dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry)); + de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]); + de->v.u64 = 1; + vechashes.push_back(de); + } + } + std::unique_lock ul(m_lock); bulkInsertsInProgress++; if (m_pdict != nullptr) { - for (size_t ielem = 0; ielem < celem; ++ielem) { - cacheKey(rgkeys[ielem]); + for (dictEntry *de : vechashes) { + if (dictIsRehashing(m_pdict)) dictRehash(m_pdict,1); + /* Get the index of the new element, or -1 if + * the element already exists. */ + long index; + if ((index = _dictKeyIndex(m_pdict, de->key, (uint64_t)de->key, nullptr)) == -1) { + dictEntry *de = dictFind(m_pdict, de->key); + serverAssert(de != nullptr); + de->v.s64++; + m_collisionCount++; + zfree(de); + } else { + int iht = dictIsRehashing(m_pdict) ? 1 : 0; + de->next = m_pdict->ht[iht].table[index]; + m_pdict->ht[iht].table[index] = de; + m_pdict->ht[iht].used++; + } } } ul.unlock(); - m_spstorage->bulkInsert(rgkeys, rgvals, celem); + + m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem); + bulkInsertsInProgress--; } @@ -119,6 +149,14 @@ const StorageCache *StorageCache::clone() return cacheNew; } +void StorageCache::expand(uint64_t slots) +{ + std::unique_lock ul(m_lock); + if (m_pdict) { + dictExpand(m_pdict, slots); + } +} + void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const { std::unique_lock ul(m_lock); diff --git a/src/StorageCache.h b/src/StorageCache.h index 184eb60bd..879b512dc 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -40,11 +40,12 @@ public: void clear(); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); - void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem); + void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem); void retrieve(sds key, IStorage::callbackSingle fn) const; bool erase(sds key); void emergencyFreeCache(); bool keycacheIsEnabled() const { return m_pdict != nullptr; } + void expand(uint64_t slots); bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } diff --git a/src/db.cpp b/src/db.cpp index cc6092831..8417a008a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2903,8 +2903,10 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) { if (m_fAllChanged) { - m_spstorage->clear(); - storeDatabase(); + if (dictSize(m_pdict) > 0 || m_spstorage->count() > 0) { // in some cases we may have pre-sized the StorageCache's dict, and we don't want clear to ruin it + m_spstorage->clear(); + storeDatabase(); + } m_fAllChanged = 0; } else @@ -2936,15 +2938,19 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) dictIterator *di = dictGetIterator(dictNew); dictEntry *de; std::vector veckeys; + std::vector 
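StorageCache::bulkInsert above hashes every key and pre-allocates a dictEntry before taking the cache lock, so the critical section is reduced to linking the prepared entries into the key-cache table (or bumping a collision count and freeing the spare entry). A compact sketch of that prepare-outside-the-lock pattern, with std::mutex and std::hash standing in for the fastlock and dictGenHashFunction:

#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

std::mutex cacheLock;                         // stands in for StorageCache::m_lock
std::unordered_map<uint64_t, int> keycache;   // hash -> count, like the key cache

void bulkCache(const std::vector<std::string> &keys) {
    // Phase 1: hash every key without holding the lock.
    std::vector<uint64_t> hashes;
    hashes.reserve(keys.size());
    for (const auto &k : keys)
        hashes.push_back(std::hash<std::string>{}(k));

    // Phase 2: a short critical section that only inserts precomputed values.
    std::lock_guard<std::mutex> guard(cacheLock);
    for (uint64_t h : hashes)
        ++keycache[h];  // a collision just bumps the count, as in the patch
}

int main() {
    bulkCache({"key1", "key2"});
    return 0;
}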
veccbkeys; std::vector vecvals; + std::vector veccbvals; while ((de = dictNext(di)) != nullptr) { robj *o = (robj*)dictGetVal(de); sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); veckeys.push_back((sds)dictGetKey(de)); + veccbkeys.push_back(sdslen((sds)dictGetKey(de))); vecvals.push_back(temp); + veccbvals.push_back(sdslen(temp)); } - m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size()); + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size()); for (auto val : vecvals) sdsfree(val); dictReleaseIterator(di); @@ -2953,6 +2959,11 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) }); } +void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem) +{ + m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem); +} + void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) { if (m_pdbSnapshotStorageFlush) diff --git a/src/dict.cpp b/src/dict.cpp index e10567c8d..0268c19d2 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -63,7 +63,7 @@ static unsigned int dict_force_resize_ratio = 5; static int _dictExpandIfNeeded(dict *ht); static unsigned long _dictNextPower(unsigned long size); -static long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing); +long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing); static int _dictInit(dict *ht, dictType *type, void *privDataPtr); /* -------------------------- hash functions -------------------------------- */ @@ -1366,7 +1366,7 @@ static unsigned long _dictNextPower(unsigned long size) * * Note that if we are in the process of rehashing the hash table, the * index is always returned in the context of the second (new) hash table. */ -static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) +long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) { unsigned long idx, table; dictEntry *he; diff --git a/src/networking.cpp b/src/networking.cpp index 29c4ac206..4d4eb32c0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -237,7 +237,7 @@ void clientInstallWriteHandler(client *c) { * writes at this stage. */ if (!(c->flags & CLIENT_PENDING_WRITE) && - (c->replstate == REPL_STATE_NONE || + (c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || c->replstate == SLAVE_STATE_FASTSYNC_DONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { AssertCorrectThread(c); @@ -1566,7 +1566,7 @@ bool freeClient(client *c) { /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ - if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending) { + if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending || c->replstate == SLAVE_STATE_FASTSYNC_TX) { freeClientAsync(c); return false; } @@ -1791,7 +1791,7 @@ int writeToClient(client *c, int handler_installed) { /* We can only directly read from the replication backlog if the client is a replica, so only attempt to do so if that's the case. 
*/ - if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { + if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR) && c->replstate == SLAVE_STATE_ONLINE) { std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); while (clientHasPendingReplies(c)) { @@ -1833,9 +1833,11 @@ int writeToClient(client *c, int handler_installed) { } } else { while(clientHasPendingReplies(c)) { - serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR); if (c->bufpos > 0) { - nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + auto bufpos = c->bufpos; + lock.unlock(); + nwritten = connWrite(c->conn,c->buf+c->sentlen,bufpos-c->sentlen); + lock.lock(); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; @@ -1854,7 +1856,10 @@ int writeToClient(client *c, int handler_installed) { continue; } - nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen); + auto used = o->used; + lock.unlock(); + nwritten = connWrite(c->conn, o->buf() + c->sentlen, used - c->sentlen); + lock.lock(); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; @@ -1993,7 +1998,7 @@ void ProcessPendingAsyncWrites() ae_flags |= AE_BARRIER; } - if (!((c->replstate == REPL_STATE_NONE || + if (!((c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) continue; @@ -2063,9 +2068,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If a client is protected, don't do anything, * that may trigger write error or recreate handler. */ - if (flags & CLIENT_PROTECTED) continue; + if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue; - std::unique_locklock)> lock(c->lock); + //std::unique_locklock)> lock(c->lock); /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; @@ -2075,11 +2080,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { { if (c->flags & CLIENT_CLOSE_ASAP) { - lock.release(); // still locked AeLocker ae; - ae.arm(c); - if (!freeClient(c)) // writeToClient will only async close, but there's no need to wait - c->lock.unlock(); // if we just got put on the async close list, then we need to remove the lock + ae.arm(nullptr); + freeClient(c); // writeToClient will only async close, but there's no need to wait } continue; } @@ -3829,7 +3832,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) { if (!c->conn) return; /* It is unsafe to free fake clients. 
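The writeToClient() change above snapshots the buffer length, releases the client lock around the potentially blocking connWrite() call, and re-acquires it before touching client state again, so a slow replica socket does not hold the lock for the duration of the write. A small standalone sketch of that unlock-around-blocking-I/O pattern (fakeWrite is a placeholder for connWrite):

#include <cstddef>
#include <mutex>

std::mutex clientLock;   // stands in for client::lock
size_t sentlen = 0;

// Stand-in for connWrite(): pretend everything was written.
static long fakeWrite(const char *, size_t len) { return static_cast<long>(len); }

void writeBuffered(const char *buf, size_t bufpos, std::unique_lock<std::mutex> &lock) {
    size_t snapshot = bufpos;      // capture the length while still locked
    lock.unlock();                 // don't hold the client lock across a blocking write
    long n = fakeWrite(buf + sentlen, snapshot - sentlen);
    lock.lock();                   // re-acquire before touching shared client state
    if (n > 0) sentlen += static_cast<size_t>(n);
}

int main() {
    std::unique_lock<std::mutex> lock(clientLock);
    writeBuffered("hello", 5, lock);
    return 0;
}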
*/ serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return; - if (checkClientOutputBufferLimits(c)) { + if (checkClientOutputBufferLimits(c) && c->replstate != SLAVE_STATE_FASTSYNC_TX) { sds client = catClientInfoString(sdsempty(),c); freeClientAsync(c); diff --git a/src/replication.cpp b/src/replication.cpp index 91036723a..d8ffcca99 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -34,6 +34,7 @@ #include "cluster.h" #include "bio.h" #include "aelocker.h" +#include "SnapshotPayloadParseState.h" #include #include @@ -929,6 +930,224 @@ need_full_resync: return C_ERR; } +int checkClientOutputBufferLimits(client *c); +class replicationBuffer { + std::vector replicas; + clientReplyBlock *reply = nullptr; + size_t writtenBytesTracker = 0; + +public: + replicationBuffer() { + reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + (PROTO_REPLY_CHUNK_BYTES*2)); + reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); + reply->used = 0; + } + + ~replicationBuffer() { + zfree(reply); + } + + void addReplica(client *replica) { + replicas.push_back(replica); + replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset()); + // Optimize the socket for bulk transfer + //connDisableTcpNoDelay(replica->conn); + } + + bool isActive() const { return !replicas.empty(); } + + void flushData() { + if (reply == nullptr) + return; + size_t written = reply->used; + aeAcquireLock(); + for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) { + auto replica = replicas[ireplica]; + if (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) { + replica->replstate = REPL_STATE_NONE; + continue; + } + + while (checkClientOutputBufferLimits(replica) + && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { + aeReleaseLock(); + usleep(10); + aeAcquireLock(); + } + + if (ireplica == replicas.size()-1 && replica->replyAsync == nullptr) { + if (prepareClientToWrite(replica) == C_OK) { + replica->replyAsync = reply; + reply = nullptr; + } + } else { + addReplyProto(replica, reply->buf(), reply->size); + } + } + ProcessPendingAsyncWrites(); + replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end()); + aeReleaseLock(); + if (reply != nullptr) { + reply->used = 0; + } + writtenBytesTracker += written; + } + + void addData(const char *data, unsigned long size) { + if (reply != nullptr && (size + reply->used) > reply->size) + flushData(); + + if (reply != nullptr && reply->size < size) { + serverAssert(reply->used == 0); // flush should have happened + zfree(reply); + reply = nullptr; + } + + if (reply == nullptr) { + reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2))); + reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); + reply->used = 0; + } + + serverAssert((reply->size - reply->used) >= size); + memcpy(reply->buf() + reply->used, data, size); + reply->used += size; + } + + void addLongWithPrefix(long val, char prefix) { + char buf[128]; + int len; + + buf[0] = prefix; + len = ll2string(buf+1,sizeof(buf)-1,val); + buf[len+1] = '\r'; + buf[len+2] = '\n'; + addData(buf, len+3); + } + + void addArrayLen(int len) { + addLongWithPrefix(len, '*'); + } + + void addLong(long val) { + addLongWithPrefix(val, ':'); + } + + void addString(const char *s, unsigned long len) { + 
addLongWithPrefix(len, '$'); + addData(s, len); + addData("\r\n", 2); + } + + size_t cbWritten() const { return writtenBytesTracker; } + + void end() { + flushData(); + for (auto replica : replicas) { + // Return to original settings + //if (!g_pserver->repl_disable_tcp_nodelay) + // connEnableTcpNoDelay(replica->conn); + } + } + + void putSlavesOnline() { + for (auto replica : replicas) { + replica->replstate = SLAVE_STATE_FASTSYNC_DONE; + replica->repl_put_online_on_ack = 1; + } + } +}; + +int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { + // TODO: This needs to be on a background thread + int retval = C_OK; + serverAssert(GlobalLocksAcquired()); + serverLog(LL_NOTICE, "Starting storage provider fast full sync with target: %s", "disk"); + + std::shared_ptr spreplBuf = std::make_shared(); + listNode *ln; + listIter li; + client *replica = nullptr; + listRewind(g_pserver->slaves, &li); + while (replica == nullptr && (ln = listNext(&li))) { + client *replicaCur = (client*)listNodeValue(ln); + if ((replicaCur->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) { + replica = replicaCur; + spreplBuf->addReplica(replica); + replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX; + } + } + serverAssert(replica != nullptr); + + g_pserver->asyncworkqueue->AddWorkFunction([repl_stream_db = rsi->repl_stream_db, spreplBuf = std::move(spreplBuf)]{ + int retval = C_OK; + auto timeStart = ustime(); + auto lastLogTime = timeStart; + size_t cbData = 0; + size_t cbLastUpdate = 0; + auto &replBuf = *spreplBuf; + + replBuf.addArrayLen(2); // Two sections: Metadata and databases + + // MetaData + replBuf.addArrayLen(1); + replBuf.addArrayLen(2); + replBuf.addString("repl-stream-db", 14); + replBuf.addLong(repl_stream_db); + + // Databases + replBuf.addArrayLen(cserver.dbnum); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + std::unique_ptr spsnapshot = g_pserver->db[idb]->CloneStorageCache(); + size_t snapshotDeclaredCount = spsnapshot->count(); + replBuf.addArrayLen(snapshotDeclaredCount); + size_t count = 0; + bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, timeStart, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{ + replBuf.addArrayLen(2); + + replBuf.addString(rgchKey, cchKey); + replBuf.addString((const char *)rgchVal, cchVal); + ++count; + if ((count % 8092) == 0) { + auto curTime = ustime(); + if ((curTime - lastLogTime) > 60000000) { + auto usec = curTime - lastLogTime; + serverLog(LL_NOTICE, "Replication status: Transferred %zuMB (%.2fGbit/s)", replBuf.cbWritten()/1024/1024, ((replBuf.cbWritten()-cbLastUpdate)*8.0)/(usec/1000000.0)/1000000000.0); + cbLastUpdate = replBuf.cbWritten(); + lastLogTime = ustime(); + } + } + cbData += cchKey + cchVal; + return replBuf.isActive(); + }); + + if (!result) { + retval = C_ERR; + break; + } + serverAssert(count == snapshotDeclaredCount); + } + + replBuf.end(); + + if (!replBuf.isActive()) { + retval = C_ERR; + } + + serverLog(LL_NOTICE, "Snapshot replication done: %s", (retval == C_OK) ? "success" : "failed"); + auto usec = ustime() - timeStart; + serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. 
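rdbSaveSnapshotForReplication() above streams a RESP-style payload: a two-element outer array (metadata, then databases), metadata as an array of [name, value] pairs (values may be bulk strings or integers), and one inner array per database whose elements are [key, serialized value] pairs. A standalone sketch that emits the same shape for one metadata field and a single key in a single database and prints the raw bytes; the helper names echo the replicationBuffer methods but this is illustrative, not the patch's class:

#include <iostream>
#include <string>

static std::string out;

static void addLongWithPrefix(long val, char prefix) {
    out += prefix;
    out += std::to_string(val);
    out += "\r\n";
}
static void addArrayLen(long len) { addLongWithPrefix(len, '*'); }
static void addLong(long val)     { addLongWithPrefix(val, ':'); }
static void addString(const std::string &s) {
    addLongWithPrefix(static_cast<long>(s.size()), '$');
    out += s;
    out += "\r\n";
}

int main() {
    addArrayLen(2);                   // two sections: metadata and databases
    addArrayLen(1);                   // metadata: one field in this sketch
    addArrayLen(2);                   //   each field is a [name, value] pair
    addString("repl-stream-db");
    addLong(0);
    addArrayLen(1);                   // databases: a single db in this sketch
    addArrayLen(1);                   //   db 0 holds one key
    addArrayLen(2);                   //     [key, serialized value]
    addString("key1");
    addString("value1");
    std::cout << out;                 // prints the RESP-encoded payload
    return 0;
}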
(%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0); + if (retval == C_OK) { + aeAcquireLock(); + replicationScriptCacheFlush(); + replBuf.putSlavesOnline(); + aeReleaseLock(); + } + }); + + return retval; +} + /* Start a BGSAVE for replication goals, which is, selecting the disk or * socket target depending on the configuration, and making sure that * the script cache is flushed before to start. @@ -962,7 +1181,9 @@ int startBgsaveForReplication(int mincapa) { /* Only do rdbSave* when rsiptr is not NULL, * otherwise replica will miss repl-stream-db. */ if (rsiptr) { - if (socket_target) + if (mincapa & SLAVE_CAPA_ROCKSDB_SNAPSHOT && g_pserver->m_pstorageFactory) + retval = rdbSaveSnapshotForReplication(rsiptr); + else if (socket_target) retval = rdbSaveToSlavesSockets(rsiptr); else retval = rdbSaveBackground(rsiptr); @@ -1334,6 +1555,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_PSYNC2; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; + else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "rocksdb-snapshot-load")) + c->slave_capa |= SLAVE_CAPA_ROCKSDB_SNAPSHOT; fCapaCommand = true; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { @@ -1358,7 +1581,7 @@ void replconfCommand(client *c) { * quick check first (instead of waiting for the next ACK. */ if (g_pserver->child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END) checkChildrenDone(); - if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) + if (c->repl_put_online_on_ack && (c->replstate == SLAVE_STATE_ONLINE || c->replstate == SLAVE_STATE_FASTSYNC_DONE)) putSlaveOnline(c); /* Note: this command does not reply anything! */ return; @@ -1400,6 +1623,8 @@ void replconfCommand(client *c) { sds reply = sdsnew("+OK"); if (g_pserver->fActiveReplica) reply = sdscat(reply, " active-replica"); + if (g_pserver->m_pstorageFactory && (c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && !g_pserver->fActiveReplica) + reply = sdscat(reply, " rocksdb-snapshot-save"); reply = sdscat(reply, "\r\n"); addReplySds(c, reply); } else { @@ -2074,25 +2299,163 @@ void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) { discardDbBackup(buckup, flag, replicationEmptyDbCallback); } +size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { + size_t cchNumeral = 0; + + while ((cch > (cchNumeral+1)) && isdigit(rgch[1 + cchNumeral])) ++cchNumeral; + + if (cch < (cchNumeral+1+2)) { // +2 is for the \r\n we expect + throw true; // continuable + } + + if (rgch[cchNumeral+1] != '\r' || rgch[cchNumeral+2] != '\n') { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch); + throw false; + } + + if (!string2ll(rgch+1, cchNumeral, pvalue) || *pvalue < 0) { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch); + throw false; + } + + return cchNumeral + 3; +} + +bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) { + int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; + serverAssert(GlobalLocksAcquired()); + serverAssert(mi->master == nullptr); + bool fFinished = false; + + if (mi->bulkreadBuffer == nullptr) { + mi->bulkreadBuffer = sdsempty(); + mi->parseState = new SnapshotPayloadParseState(); + if (!fUpdate) { + int empty_db_flags = g_pserver->repl_slave_lazy_flush ? 
EMPTYDB_ASYNC : + EMPTYDB_NO_FLAGS; + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); + } + } + + for (int iter = 0; iter < 10; ++iter) { + if (mi->parseState->shouldThrottle()) + return false; + + auto readlen = PROTO_IOBUF_LEN; + auto qblen = sdslen(mi->bulkreadBuffer); + mi->bulkreadBuffer = sdsMakeRoomFor(mi->bulkreadBuffer, readlen); + + auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen); + if (nread <= 0) { + if (connGetState(conn) != CONN_STATE_CONNECTED) + cancelReplicationHandshake(mi, true); + return false; + } + mi->repl_transfer_lastio = g_pserver->unixtime; + sdsIncrLen(mi->bulkreadBuffer,nread); + + size_t offset = 0; + + try { + if (sdslen(mi->bulkreadBuffer) > cserver.client_max_querybuf_len) { + throw "Full Sync Streaming Buffer Exceeded (increase client_max_querybuf_len)"; + } + + while (sdslen(mi->bulkreadBuffer) > offset) { + // Pop completed items + mi->parseState->trimState(); + + if (mi->bulkreadBuffer[offset] == '*') { + // Starting an array + long long arraySize; + + // Lets read the array length + offset += parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &arraySize); + + mi->parseState->pushArray(arraySize); + } else if (mi->bulkreadBuffer[offset] == '$') { + // Loading in a string + long long payloadsize = 0; + + // Lets read the string length + size_t offsetCount = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &payloadsize); + + // OK we know how long the string is, now lets make sure the payload is here. + if (sdslen(mi->bulkreadBuffer) < (offset + offsetCount + payloadsize + 2)) { + goto LContinue; // wait for more data (note: we could throw true here, but throw is way more expensive) + } + + mi->parseState->pushValue(mi->bulkreadBuffer + offset + offsetCount, payloadsize); + + // On to the next one + offset += offsetCount + payloadsize + 2; + } else if (mi->bulkreadBuffer[offset] == ':') { + // Numeral + long long value; + + size_t offsetValue = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &value); + + mi->parseState->pushValue(value); + offset += offsetValue; + } else { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", mi->bulkreadBuffer+offset); + cancelReplicationHandshake(mi, true); + return false; + } + } + + sdsrange(mi->bulkreadBuffer, offset, -1); + offset = 0; + + // Cleanup the remaining stack + mi->parseState->trimState(); + + if (mi->parseState->depth() != 0) + return false; + + rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db"); + fFinished = true; + break; // We're done!!! 
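The read loop above caps itself at ten iterations and checks shouldThrottle() before each read, so the replica stops pulling bytes off the socket while too many bulk inserts are still in flight on the worker threads. A tiny sketch of that counter-based backpressure, assuming a fixed worker-thread count in place of cserver.cthreads:

#include <atomic>

static const int kWorkerThreads = 4;            // placeholder for cserver.cthreads
static std::atomic<int> insertsInFlight{0};

bool shouldThrottle() {
    // Stop reading from the master once the workers are more than four batches behind.
    return insertsInFlight.load() > kWorkerThreads * 4;
}

void submitBatch() {
    ++insertsInFlight;
    // ...post the bulk insert to a worker; the worker decrements insertsInFlight when done
}

int main() {
    if (!shouldThrottle())
        submitBatch();
    return 0;
}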
+ } + catch (const char *sz) { + serverLog(LL_WARNING, "%s", sz); + cancelReplicationHandshake(mi, true); + return false; + } catch (bool fContinue) { + if (!fContinue) { + cancelReplicationHandshake(mi, true); + return false; + } + } + LContinue: + sdsrange(mi->bulkreadBuffer, offset, -1); + } + + if (!fFinished) + return false; + + serverLog(LL_NOTICE, "Fast sync complete"); + sdsfree(mi->bulkreadBuffer); + mi->bulkreadBuffer = nullptr; + delete mi->parseState; + mi->parseState = nullptr; + return true; +} + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ -void readSyncBulkPayload(connection *conn) { +bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, int &usemark) { char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); const dbBackup *diskless_load_backup = NULL; - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rsi.fForceSetKey = !!g_pserver->fActiveReplica; int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; off_t left; // Should we update our database, or create from scratch? int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; - redisMaster *mi = (redisMaster*)connGetPrivateData(conn); - if (mi == nullptr) { - // We're about to be free'd so bail out - return; - } serverAssert(GlobalLocksAcquired()); serverAssert(mi->master == nullptr); @@ -2101,7 +2464,6 @@ void readSyncBulkPayload(connection *conn) { * from the server: when they match, we reached the end of the transfer. */ static char eofmark[CONFIG_RUN_ID_SIZE]; static char lastbytes[CONFIG_RUN_ID_SIZE]; - static int usemark = 0; /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ @@ -2123,7 +2485,7 @@ void readSyncBulkPayload(connection *conn) { * the connection live. So we refresh our last interaction * timestamp. */ mi->repl_transfer_lastio = g_pserver->unixtime; - return; + return false; } else if (buf[0] != '$') { serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; @@ -2157,7 +2519,7 @@ void readSyncBulkPayload(connection *conn) { (long long) mi->repl_transfer_size, use_diskless_load? "to parser":"to disk"); } - return; + return false; } if (!use_diskless_load) { @@ -2174,12 +2536,12 @@ void readSyncBulkPayload(connection *conn) { if (nread <= 0) { if (connGetState(conn) == CONN_STATE_CONNECTED) { /* equivalent to EAGAIN */ - return; + return false; } serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); cancelReplicationHandshake(mi, true); - return; + return false; } g_pserver->stat_net_input_bytes += nread; @@ -2248,7 +2610,7 @@ void readSyncBulkPayload(connection *conn) { /* If the transfer is yet not complete, we need to read more, so * return ASAP and wait for the handler to be called again. */ - if (!eof_reached) return; + if (!eof_reached) return false; } /* We reach this point in one of the following cases: @@ -2320,7 +2682,7 @@ void readSyncBulkPayload(connection *conn) { /* Note that there's no point in restarting the AOF on SYNC * failure, it'll be restarted when sync succeeds or the replica * gets promoted. */ - return; + return false; } /* RDB loading succeeded if we reach this point. 
*/ @@ -2340,7 +2702,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_WARNING,"Replication stream EOF marker is broken"); cancelReplicationHandshake(mi,true); rioFreeConn(&rdb, NULL); - return; + return false; } } @@ -2372,7 +2734,7 @@ void readSyncBulkPayload(connection *conn) { "MASTER <-> REPLICA synchronization: %s", strerror(errno)); cancelReplicationHandshake(mi,true); - return; + return false; } /* Rename rdb like renaming rewrite aof asynchronously. */ @@ -2385,7 +2747,7 @@ void readSyncBulkPayload(connection *conn) { g_pserver->rdb_filename, strerror(errno)); cancelReplicationHandshake(mi,true); if (old_rdb_fd != -1) close(old_rdb_fd); - return; + return false; } rdb_filename = g_pserver->rdb_filename; @@ -2415,7 +2777,7 @@ void readSyncBulkPayload(connection *conn) { } /* Note that there's no point in restarting the AOF on sync failure, it'll be restarted when sync succeeds or replica promoted. */ - return; + return false; } /* Cleanup. */ @@ -2433,6 +2795,32 @@ void readSyncBulkPayload(connection *conn) { mi->repl_transfer_tmpfile = NULL; } + return true; +error: + cancelReplicationHandshake(mi,true); + return false; +} + +void readSyncBulkPayload(connection *conn) { + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + redisMaster *mi = (redisMaster*)connGetPrivateData(conn); + static int usemark = 0; + if (mi == nullptr) { + // We're about to be free'd so bail out + return; + } + + if (mi->isRocksdbSnapshotRepl) { + if (!readSnapshotBulkPayload(conn, mi, rsi)) + return; + } else { + if (!readSyncBulkPayloadRdb(conn, mi, rsi, usemark)) + return; + } + + // Should we update our database, or create from scratch? + int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; + /* Final setup of the connected slave <- master link */ replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db); mi->repl_transfer_s = nullptr; @@ -2475,17 +2863,13 @@ void readSyncBulkPayload(connection *conn) { } /* Send the initial ACK immediately to put this replica in online state. */ - if (usemark) replicationSendAck(mi); + if (usemark || mi->isRocksdbSnapshotRepl) replicationSendAck(mi); /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ if (g_pserver->aof_enabled) restartAOFAfterSYNC(); return; - -error: - cancelReplicationHandshake(mi,true); - return; } char *receiveSynchronousResponse(redisMaster *mi, connection *conn) { @@ -2808,12 +3192,15 @@ void parseMasterCapa(redisMaster *mi, sds strcapa) char *pchEnd = szStart; mi->isActive = false; + mi->isRocksdbSnapshotRepl = false; for (;;) { if (*pchEnd == ' ' || *pchEnd == '\0') { // Parse the word if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) { mi->isActive = true; + } else if (strncmp(szStart, "rocksdb-snapshot-save", pchEnd - szStart) == 0) { + mi->isRocksdbSnapshotRepl = true; } szStart = pchEnd + 1; } @@ -2945,8 +3332,20 @@ void syncWithMaster(connection *conn) { * PSYNC2: supports PSYNC v2, so understands +CONTINUE . * * The master will ignore capabilities it does not understand. 
*/ - err = sendCommand(conn,"REPLCONF", - "capa","eof","capa","psync2","capa","activeExpire",NULL); + + + std::vector veccapabilities = { + "REPLCONF", + "capa","eof", + "capa","psync2", + "capa","activeExpire", + }; + if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica) { + veccapabilities.push_back("capa"); + veccapabilities.push_back("rocksdb-snapshot-load"); + } + + err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr); if (err) goto write_error; /* Send UUID */ @@ -3290,6 +3689,15 @@ void replicationAbortSyncTransfer(redisMaster *mi) { * * Otherwise zero is returned and no operation is performed at all. */ int cancelReplicationHandshake(redisMaster *mi, int reconnect) { + if (mi->bulkreadBuffer != nullptr) { + sdsfree(mi->bulkreadBuffer); + mi->bulkreadBuffer = nullptr; + } + if (mi->parseState) { + delete mi->parseState; + mi->parseState = nullptr; + } + if (mi->repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(mi); mi->repl_state = REPL_STATE_CONNECT; @@ -4283,6 +4691,13 @@ void replicationCron(void) { client *replica = (client*)ln->value; std::unique_lock ul(replica->lock); + if (replica->replstate == SLAVE_STATE_FASTSYNC_DONE && !clientHasPendingReplies(replica)) { + serverLog(LL_WARNING, "Putting replica online"); + replica->postFunction([](client *c){ + putSlaveOnline(c); + }); + } + if (replica->replstate == SLAVE_STATE_ONLINE) { if (replica->flags & CLIENT_PRE_PSYNC) continue; diff --git a/src/sds.h b/src/sds.h index d16906f30..0d77f2772 100644 --- a/src/sds.h +++ b/src/sds.h @@ -369,6 +369,8 @@ public: bool operator==(const char *other) const { + if (other == nullptr || m_str == nullptr) + return other == m_str; return sdscmp(m_str, other) == 0; } @@ -396,8 +398,11 @@ public: {} sdsstring(const sdsstring &other) - : sdsview(sdsdup(other.m_str)) - {} + : sdsview() + { + if (other.m_str != nullptr) + m_str = sdsdup(other.m_str); + } sdsstring(const char *rgch, size_t cch) : sdsview(sdsnewlen(rgch, cch)) diff --git a/src/server.h b/src/server.h index 5a067090b..656560868 100644 --- a/src/server.h +++ b/src/server.h @@ -607,12 +607,15 @@ typedef enum { #define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */ #define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to replica. */ #define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */ +#define SLAVE_STATE_FASTSYNC_TX 10 +#define SLAVE_STATE_FASTSYNC_DONE 11 /* Slave capabilities. */ #define SLAVE_CAPA_NONE 0 #define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ #define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ #define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? 
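syncWithMaster() above now assembles the REPLCONF arguments in a vector so the extra "capa rocksdb-snapshot-load" pair is only appended when a storage provider is configured and active replication is off, then sends them with sendCommandArgv. A tiny sketch of that conditional argument assembly (the two booleans are placeholders for the server config checks):

#include <iostream>
#include <string>
#include <vector>

int main() {
    bool haveStorageProvider = true;   // placeholder for g_pserver->m_pstorageFactory
    bool activeReplica = false;        // placeholder for g_pserver->fActiveReplica

    std::vector<std::string> args = {
        "REPLCONF",
        "capa", "eof",
        "capa", "psync2",
        "capa", "activeExpire",
    };
    if (haveStorageProvider && !activeReplica) {
        args.push_back("capa");
        args.push_back("rocksdb-snapshot-load");
    }

    for (const auto &a : args) std::cout << a << ' ';
    std::cout << '\n';
    return 0;
}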
(Don't send delete) */ +#define SLAVE_CAPA_ROCKSDB_SNAPSHOT (1<<3) /* Synchronous read timeout - replica side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 @@ -1102,7 +1105,12 @@ public: size_t slots() const { return dictSlots(m_pdict); } size_t size(bool fCachedOnly = false) const; - void expand(uint64_t slots) { dictExpand(m_pdict, slots); } + void expand(uint64_t slots) { + if (m_spstorage) + m_spstorage->expand(slots); + else + dictExpand(m_pdict, slots); + } void trackkey(robj_roptr o, bool fUpdate) { @@ -1193,6 +1201,9 @@ public: bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } + std::unique_ptr CloneStorageCache() { return std::unique_ptr(m_spstorage->clone()); } + void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem); + dict_iter find_cached_threadsafe(const char *key) const; protected: @@ -1352,6 +1363,8 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::prepOverwriteForSnapshot; using redisDbPersistentData::FRehashing; using redisDbPersistentData::FTrackingChanges; + using redisDbPersistentData::CloneStorageCache; + using redisDbPersistentData::bulkStorageInsert; public: expireset::setiter expireitr; @@ -1673,7 +1686,7 @@ struct client { size_t argv_len_sumActive = 0; bool FPendingReplicaWrite() const { - return repl_curr_off != repl_end_off; + return repl_curr_off != repl_end_off && replstate == SLAVE_STATE_ONLINE; } // post a function from a non-client thread to run on its client thread @@ -2058,6 +2071,7 @@ struct redisMaster { long long master_initial_offset; /* Master PSYNC offset. */ bool isActive = false; + bool isRocksdbSnapshotRepl = false; int repl_state; /* Replication status if the instance is a replica */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. 
*/ @@ -2068,6 +2082,9 @@ struct redisMaster { time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ time_t repl_down_since; /* Unix time at which link with master went down */ + class SnapshotPayloadParseState *parseState; + sds bulkreadBuffer = nullptr; + unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ /* After we've connected with our master use the UUID in g_pserver->master */ uint64_t mvccLastSync; @@ -2829,7 +2846,7 @@ void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); void addReplyLongLong(client *c, long long ll); #ifdef __cplusplus -void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix); +void addReplyLongLongWithPrefix(client *c, long long ll, char prefix); #endif void addReplyArrayLen(client *c, long length); void addReplyMapLen(client *c, long length); diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h new file mode 100644 index 000000000..beb71d87e --- /dev/null +++ b/src/storage/rocksdbfactor_internal.h @@ -0,0 +1,28 @@ +#pragma once +#include "rocksdb.h" + +class RocksDBStorageFactory : public IStorageFactory +{ + std::shared_ptr m_spdb; // Note: This must be first so it is deleted last + std::vector> m_vecspcols; + std::shared_ptr m_pfilemanager; + std::string m_path; + bool m_fCreatedTempFolder = false; + +public: + RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig); + ~RocksDBStorageFactory(); + + virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; + virtual const char *name() const override; + + virtual size_t totalDiskspaceUsed() const override; + + virtual bool FSlow() const override { return true; } + + virtual size_t filedsRequired() const override; + std::string getTempFolder(); + +private: + void setVersion(rocksdb::ColumnFamilyHandle*); +}; \ No newline at end of file diff --git a/tests/integration/replication-fast.tcl b/tests/integration/replication-fast.tcl new file mode 100644 index 000000000..f190524c1 --- /dev/null +++ b/tests/integration/replication-fast.tcl @@ -0,0 +1,34 @@ +proc prepare_value {size} { + set _v "c" + for {set i 1} {$i < $size} {incr i} { + append _v 0 + } + return $_v +} + +start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master}}} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + start_server {tags {} overrides {storage-provider {flash ./rocks.db.replica}}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + test "fast replication with large value" { + set _v [prepare_value [expr 64*1024*1024]] + # $master set key $_v + + $slave replicaof $master_host $master_port + wait_for_condition 50 300 { + [lindex [$slave role] 0] eq {slave} && + [string match {*master_link_status:up*} [$slave info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + assert_equal [$slave debug digest] [$master debug digest] + $slave replicaof no one + } + } +} diff --git a/tests/support/server.tcl b/tests/support/server.tcl index c06dd0561..8b2aab137 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -1,4 +1,5 @@ set ::global_overrides {} +set ::global_storage_provider {} set ::tags {} set ::valgrind_errors {} @@ -392,6 +393,9 @@ proc start_server {options {code undefined}} { foreach {directive arguments} [concat $::global_overrides $overrides] 
{ dict set config $directive $arguments } + foreach {directive argument1 argument2} $::global_storage_provider { + dict set config $directive $argument1 $argument2 + } # remove directives that are marked to be omitted foreach directive $omit { diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 66f039511..f9ca7107b 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -59,6 +59,7 @@ set ::all_tests { integration/failover integration/redis-cli integration/redis-benchmark + integration/replication-fast unit/pubsub unit/slowlog unit/scripting @@ -692,6 +693,17 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--help}} { print_help_screen exit 0 + } elseif {$opt eq {--flash}} { + lappend ::global_storage_provider storage-provider + lappend ::global_storage_provider flash + lappend ::global_storage_provider ./rocks.db + set ::all_tests { + integration/replication + integration/replication-2 + integration/replication-3 + integration/replication-4 + integration/replication-psync + } } else { puts "Wrong argument: $opt" exit 1 From fcad4dfc6b0c7c426f375fc92f25cad40564cb5e Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 10 Nov 2021 05:29:13 +0000 Subject: [PATCH 02/20] Fix snapshot count mismatch due to race Former-commit-id: b4175c9bf82cbec047b94403e2a3700f4544b4f1 From 24fbc46352fbcd4015f15fb28887b78da5c91a41 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 10 Nov 2021 06:40:43 +0000 Subject: [PATCH 03/20] Prevent track changes from clearing out the cache and misreporting the key count Former-commit-id: d7ceb59f11bc714e40f208cb8c774a5b762ad41e --- src/replication.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/replication.cpp b/src/replication.cpp index d8ffcca99..6cfe7d3ba 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2335,6 +2335,13 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi EMPTYDB_NO_FLAGS; serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + aeAcquireLock(); + g_pserver->db[idb]->processChanges(false); + aeReleaseLock(); + g_pserver->db[idb]->commitChanges(); + g_pserver->db[idb]->trackChanges(false); + } } } From 3ab09b9781c67cc92b4e994f0e805428e4b1eb13 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 10 Nov 2021 17:39:00 +0000 Subject: [PATCH 04/20] Fix compile error on some machines Former-commit-id: 5f491a38cb7eae63fd316c465f317005bbced385 From 0b607ecfc373123ebebac54e941e4a70f6476c5d Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 10 Nov 2021 18:47:36 +0000 Subject: [PATCH 05/20] Fix multiple test failures Former-commit-id: ba99418643e7b4a12e79bb08e4a7f152da28b861 --- runtest | 1 - src/SnapshotPayloadParseState.cpp | 139 +++++++++++++++++++++--------- src/SnapshotPayloadParseState.h | 6 +- src/replication.cpp | 65 +++++++++++--- 4 files changed, 154 insertions(+), 57 deletions(-) diff --git a/runtest b/runtest index 6c9ea5ae4..c6349d118 100755 --- a/runtest +++ b/runtest @@ -14,4 +14,3 @@ then exit 1 fi $TCLSH tests/test_helper.tcl "${@}" -$TCLSH tests/test_helper.tcl "${@}--flash" \ No newline at end of file diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp index 4bf89c718..8ba4b109b 100644 --- a/src/SnapshotPayloadParseState.cpp +++ b/src/SnapshotPayloadParseState.cpp @@ -84,19 +84,31 @@ dictType metadataLongLongDictType = { nullptr /* async free destructor */ }; +dictType metadataDictType = { + 
dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictSdsDestructor, /* val destructor */ + nullptr, /* allow to expand */ + nullptr /* async free destructor */ +}; + + SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStage() { if (stackParse.empty()) return ParseStageName::Global; - switch (stackParse.top().name) + switch (stackParse.back().name) { case ParseStageName::None: return ParseStageName::Global; case ParseStageName::Global: - if (stackParse.top().arraycur == 0) + if (stackParse.back().arraycur == 0) return ParseStageName::MetaData; - else if (stackParse.top().arraycur == 1) + else if (stackParse.back().arraycur == 1) return ParseStageName::Databases; break; @@ -118,6 +130,7 @@ SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStag void SnapshotPayloadParseState::flushQueuedKeys() { if (vecqueuedKeys.empty()) return; + serverAssert(current_database >= 0); // TODO: We can't finish parse until all the work functions finish int idb = current_database; @@ -125,11 +138,20 @@ void SnapshotPayloadParseState::flushQueuedKeys() { auto sizePrev = vecqueuedKeys.size(); ++insertsInFlight; auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous - g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { - g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); - --insertsInFlightTmp; - delete pallocator; - }); + if (current_database < cserver.dbnum) { + g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { + g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); + --insertsInFlightTmp; + delete pallocator; + }); + } else { + // else drop the data + vecqueuedKeys.clear(); + vecqueuedKeysCb.clear(); + vecqueuedVals.clear(); + vecqueuedValsCb.clear(); + // Note: m_spallocator will get free'd when overwritten below + } m_spallocator = std::make_unique(); cbQueued = 0; vecqueuedKeys.reserve(sizePrev); @@ -146,19 +168,21 @@ SnapshotPayloadParseState::SnapshotPayloadParseState() { ParseStage stage; stage.name = ParseStageName::None; stage.arraylen = 1; - stackParse.push(stage); + stackParse.push_back(stage); dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr); + dictMetaData = dictCreate(&metadataDictType, nullptr); insertsInFlight = 0; m_spallocator = std::make_unique(); } SnapshotPayloadParseState::~SnapshotPayloadParseState() { dictRelease(dictLongLongMetaData); + dictRelease(dictMetaData); } -const char *SnapshotPayloadParseState::getStateDebugName() { - switch (stackParse.top().name) { +const char *SnapshotPayloadParseState::getStateDebugName(ParseStage stage) { + switch (stage.name) { case ParseStageName::None: return "None"; @@ -185,8 
+209,8 @@ const char *SnapshotPayloadParseState::getStateDebugName() { size_t SnapshotPayloadParseState::depth() const { return stackParse.size(); } void SnapshotPayloadParseState::trimState() { - while (!stackParse.empty() && (stackParse.top().arraycur == stackParse.top().arraylen)) - stackParse.pop(); + while (!stackParse.empty() && (stackParse.back().arraycur == stackParse.back().arraylen)) + stackParse.pop_back(); if (stackParse.empty()) { flushQueuedKeys(); @@ -203,13 +227,13 @@ void SnapshotPayloadParseState::pushArray(long long size) { throw "Bad Protocol: unexpected trailing data"; if (size == 0) { - stackParse.top().arraycur++; + stackParse.back().arraycur++; return; } - if (stackParse.top().name == ParseStageName::Databases) { + if (stackParse.back().name == ParseStageName::Databases) { flushQueuedKeys(); - current_database = stackParse.top().arraycur; + current_database = stackParse.back().arraycur; } ParseStage stage; @@ -221,43 +245,65 @@ void SnapshotPayloadParseState::pushArray(long long size) { } // Note: This affects getNextStage so ensure its after - stackParse.top().arraycur++; - stackParse.push(stage); + stackParse.back().arraycur++; + stackParse.push_back(stage); } void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) { if (stackParse.empty()) throw "Bad Protocol: unexpected trailing bulk string"; - if (stackParse.top().arraycur >= static_cast(stackParse.top().arrvalues.size())) + if (stackParse.back().arraycur >= static_cast(stackParse.back().arrvalues.size())) throw "Bad protocol: Unexpected value"; - auto &stage = stackParse.top(); - stage.arrvalues[stackParse.top().arraycur].first = (char*)m_spallocator->allocate(cch); - stage.arrvalues[stackParse.top().arraycur].second = cch; - memcpy(stage.arrvalues[stackParse.top().arraycur].first, rgch, cch); + auto &stage = stackParse.back(); + stage.arrvalues[stackParse.back().arraycur].first = (char*)m_spallocator->allocate(cch); + stage.arrvalues[stackParse.back().arraycur].second = cch; + memcpy(stage.arrvalues[stackParse.back().arraycur].first, rgch, cch); stage.arraycur++; switch (stage.name) { case ParseStageName::KeyValuePair: - if (stage.arraycur == 2) { - // We loaded both pairs - if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr) - throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small - vecqueuedKeys.push_back(stage.arrvalues[0].first); - vecqueuedKeysCb.push_back(stage.arrvalues[0].second); - vecqueuedVals.push_back(stage.arrvalues[1].first); - vecqueuedValsCb.push_back(stage.arrvalues[1].second); - stage.arrvalues[0].first = nullptr; - stage.arrvalues[1].first = nullptr; - cbQueued += vecqueuedKeysCb.back(); - cbQueued += vecqueuedValsCb.back(); - if (cbQueued >= queuedBatchLimit) - flushQueuedKeys(); + if (stackParse.size() < 2) + throw "Bad Protocol: unexpected bulk string"; + if (stackParse[stackParse.size()-2].name == ParseStageName::MetaData) { + if (stage.arraycur == 2) { + // We loaded both pairs + if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr) + throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small + + if (!strcasecmp(stage.arrvalues[0].first, "lua")) { + /* Load the script back in memory. 
*/ + robj *auxval = createStringObject(stage.arrvalues[1].first, stage.arrvalues[1].second); + if (luaCreateFunction(NULL,g_pserver->lua,auxval) == NULL) { + throw "Can't load Lua script"; + } + } else { + dictAdd(dictMetaData, sdsnewlen(stage.arrvalues[0].first, stage.arrvalues[0].second), sdsnewlen(stage.arrvalues[1].first, stage.arrvalues[1].second)); + } + } + } else if (stackParse[stackParse.size()-2].name == ParseStageName::Dataset) { + if (stage.arraycur == 2) { + // We loaded both pairs + if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr) + throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small + vecqueuedKeys.push_back(stage.arrvalues[0].first); + vecqueuedKeysCb.push_back(stage.arrvalues[0].second); + vecqueuedVals.push_back(stage.arrvalues[1].first); + vecqueuedValsCb.push_back(stage.arrvalues[1].second); + stage.arrvalues[0].first = nullptr; + stage.arrvalues[1].first = nullptr; + cbQueued += vecqueuedKeysCb.back(); + cbQueued += vecqueuedValsCb.back(); + if (cbQueued >= queuedBatchLimit) + flushQueuedKeys(); + } + } else { + throw "Bad Protocol: unexpected bulk string"; } break; default: - throw "Bad Protocol: unexpected bulk string"; + throw "Bad Protocol: unexpected bulk string out of KV pair"; } } @@ -265,12 +311,12 @@ void SnapshotPayloadParseState::pushValue(long long value) { if (stackParse.empty()) throw "Bad Protocol: unexpected integer value"; - stackParse.top().arraycur++; + stackParse.back().arraycur++; - if (stackParse.top().arraycur != 2 || stackParse.top().arrvalues[0].first == nullptr) + if (stackParse.back().arraycur != 2 || stackParse.back().arrvalues[0].first == nullptr) throw "Bad Protocol: unexpected integer value"; - dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.top().arrvalues[0].first, stackParse.top().arrvalues[0].second), nullptr); + dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.back().arrvalues[0].first, stackParse.back().arrvalues[0].second), nullptr); if (de == nullptr) throw "Bad Protocol: metadata field sent twice"; de->v.s64 = value; @@ -284,4 +330,15 @@ long long SnapshotPayloadParseState::getMetaDataLongLong(const char *szField) co throw false; } return de->v.s64; +} + +sds SnapshotPayloadParseState::getMetaDataStr(const char *szField) const { + sdsstring str(szField, strlen(szField)); + + dictEntry *de = dictFind(dictMetaData, str.get()); + if (de == nullptr) { + serverLog(LL_WARNING, "Master did not send field: %s", szField); + throw false; + } + return (sds)de->v.val; } \ No newline at end of file diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h index fee42178a..cb1e0420a 100644 --- a/src/SnapshotPayloadParseState.h +++ b/src/SnapshotPayloadParseState.h @@ -27,7 +27,7 @@ class SnapshotPayloadParseState { } }; - std::stack stackParse; + std::vector stackParse; std::vector vecqueuedKeys; @@ -39,6 +39,7 @@ class SnapshotPayloadParseState { std::atomic insertsInFlight; std::unique_ptr m_spallocator; dict *dictLongLongMetaData = nullptr; + dict *dictMetaData = nullptr; size_t cbQueued = 0; static const size_t queuedBatchLimit = 64*1024*1024; // 64 MB int current_database = -1; @@ -51,8 +52,9 @@ public: ~SnapshotPayloadParseState(); long long getMetaDataLongLong(const char *field) const; + sds getMetaDataStr(const char *szField) const; - const char *getStateDebugName(); + static const char *getStateDebugName(ParseStage stage); size_t depth() const; diff --git 
a/src/replication.cpp b/src/replication.cpp index 6cfe7d3ba..3651b0113 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1079,7 +1079,42 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { } serverAssert(replica != nullptr); - g_pserver->asyncworkqueue->AddWorkFunction([repl_stream_db = rsi->repl_stream_db, spreplBuf = std::move(spreplBuf)]{ + spreplBuf->addArrayLen(2); // Two sections: Metadata and databases + + // MetaData + aeAcquireLock(); + spreplBuf->addArrayLen(3 + dictSize(g_pserver->lua_scripts)); + spreplBuf->addArrayLen(2); + spreplBuf->addString("repl-stream-db", 14); + spreplBuf->addLong(rsi->repl_stream_db); + spreplBuf->addArrayLen(2); + spreplBuf->addString("repl-id", 7); + spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE+1); + spreplBuf->addArrayLen(2); + spreplBuf->addString("repl-offset", 11); + spreplBuf->addLong(rsi->master_repl_offset); + + if (dictSize(g_pserver->lua_scripts)) { + dictEntry *de; + auto di = dictGetIterator(g_pserver->lua_scripts); + while((de = dictNext(di)) != NULL) { + robj *body = (robj*)dictGetVal(de); + + spreplBuf->addArrayLen(2); + spreplBuf->addString("lua", 3); + spreplBuf->addString(szFromObj(body), sdslen(szFromObj(body))); + } + dictReleaseIterator(di); + di = NULL; /* So that we don't release it again on error. */ + } + + std::shared_ptr>> spvecspsnapshot = std::make_shared>>(); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + spvecspsnapshot->emplace_back(g_pserver->db[idb]->CloneStorageCache()); + } + aeReleaseLock(); + + g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecspsnapshot = std::move(spvecspsnapshot)]{ int retval = C_OK; auto timeStart = ustime(); auto lastLogTime = timeStart; @@ -1087,18 +1122,10 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { size_t cbLastUpdate = 0; auto &replBuf = *spreplBuf; - replBuf.addArrayLen(2); // Two sections: Metadata and databases - - // MetaData - replBuf.addArrayLen(1); - replBuf.addArrayLen(2); - replBuf.addString("repl-stream-db", 14); - replBuf.addLong(repl_stream_db); - // Databases replBuf.addArrayLen(cserver.dbnum); for (int idb = 0; idb < cserver.dbnum; ++idb) { - std::unique_ptr spsnapshot = g_pserver->db[idb]->CloneStorageCache(); + auto &spsnapshot = (*spvecspsnapshot)[idb]; size_t snapshotDeclaredCount = spsnapshot->count(); replBuf.addArrayLen(snapshotDeclaredCount); size_t count = 0; @@ -1139,7 +1166,6 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. 
(%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0); if (retval == C_OK) { aeAcquireLock(); - replicationScriptCacheFlush(); replBuf.putSlavesOnline(); aeReleaseLock(); } @@ -2302,6 +2328,7 @@ void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) { size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { size_t cchNumeral = 0; + if (cch > cchNumeral+1 && rgch[cchNumeral+1] == '-') ++cchNumeral; while ((cch > (cchNumeral+1)) && isdigit(rgch[1 + cchNumeral])) ++cchNumeral; if (cch < (cchNumeral+1+2)) { // +2 is for the \r\n we expect @@ -2313,7 +2340,7 @@ size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { throw false; } - if (!string2ll(rgch+1, cchNumeral, pvalue) || *pvalue < 0) { + if (!string2ll(rgch+1, cchNumeral, pvalue)) { serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch); throw false; } @@ -2330,6 +2357,7 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi if (mi->bulkreadBuffer == nullptr) { mi->bulkreadBuffer = sdsempty(); mi->parseState = new SnapshotPayloadParseState(); + if (g_pserver->aof_state != AOF_OFF) stopAppendOnly(); if (!fUpdate) { int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; @@ -2379,6 +2407,8 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi // Lets read the array length offset += parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &arraySize); + if (arraySize < 0) + throw "Invalid array size"; mi->parseState->pushArray(arraySize); } else if (mi->bulkreadBuffer[offset] == '$') { @@ -2387,6 +2417,8 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi // Lets read the string length size_t offsetCount = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &payloadsize); + if (payloadsize < 0) + throw "Invalid array size"; // OK we know how long the string is, now lets make sure the payload is here. if (sdslen(mi->bulkreadBuffer) < (offset + offsetCount + payloadsize + 2)) { @@ -2421,7 +2453,14 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi if (mi->parseState->depth() != 0) return false; + static_assert(sizeof(long) == sizeof(long long),""); rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db"); + rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset"); + sds str = mi->parseState->getMetaDataStr("repl-id"); + if (sdslen(str) == CONFIG_RUN_ID_SIZE+1) { + memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1); + } + fFinished = true; break; // We're done!!! 
} @@ -3347,7 +3386,7 @@ void syncWithMaster(connection *conn) { "capa","psync2", "capa","activeExpire", }; - if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica) { + if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) { veccapabilities.push_back("capa"); veccapabilities.push_back("rocksdb-snapshot-load"); } From d443babbea470d84ea8dced9c6a5b3cf0dcec669 Mon Sep 17 00:00:00 2001 From: malavan Date: Wed, 10 Nov 2021 20:46:09 +0000 Subject: [PATCH 06/20] ignore rdb specific tests when running in --flash mode Former-commit-id: b44ba3efb21c390c3d199ea1cdd238a57611fd85 --- tests/integration/rdb-repl-tests | 4 ++++ tests/test_helper.tcl | 4 ++++ 2 files changed, 8 insertions(+) create mode 100644 tests/integration/rdb-repl-tests diff --git a/tests/integration/rdb-repl-tests b/tests/integration/rdb-repl-tests new file mode 100644 index 000000000..257d107f6 --- /dev/null +++ b/tests/integration/rdb-repl-tests @@ -0,0 +1,4 @@ +diskless no replicas drop during rdb pipe +diskless slow replicas drop during rdb pipe +diskless timeout replicas drop during rdb pipe +Kill rdb child process if its dumping RDB is not useful \ No newline at end of file diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index f9ca7107b..2e200c35a 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -704,6 +704,10 @@ for {set j 0} {$j < [llength $argv]} {incr j} { integration/replication-4 integration/replication-psync } + set fp [open {./tests/integration/rdb-repl-tests} r] + set file_data [read $fp] + close $fp + set ::skiptests [split $file_data "\n"] } else { puts "Wrong argument: $opt" exit 1 From aa048e2b2f1ba17dd5f2e7fd7ed2bf4d1e4d8f17 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Mon, 1 Nov 2021 03:17:13 +0000 Subject: [PATCH 07/20] added createMetadataDb to IStorageFactory + implementations Former-commit-id: a2acf75484d2af93aad9d03a20bd402893044860 --- src/IStorage.h | 1 + src/server.cpp | 4 ++++ src/server.h | 1 + src/storage/rocksdbfactor_internal.h | 1 + src/storage/teststorageprovider.cpp | 5 +++++ src/storage/teststorageprovider.h | 1 + 6 files changed, 13 insertions(+) diff --git a/src/IStorage.h b/src/IStorage.h index e060d849f..196ae8cef 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -9,6 +9,7 @@ public: virtual ~IStorageFactory() {} virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0; + virtual class IStorage *createMetadataDb() = 0; virtual const char *name() const = 0; virtual size_t totalDiskspaceUsed() const = 0; virtual bool FSlow() const = 0; diff --git a/src/server.cpp b/src/server.cpp index 0dcae55b0..64bfeb86e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3984,6 +3984,10 @@ void initServer(void) { slowlogInit(); latencyMonitorInit(); + if (g_pserver->m_pstorageFactory) { + g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); + } + /* We have to initialize storage providers after the cluster has been initialized */ for (int idb = 0; idb < cserver.dbnum; ++idb) { diff --git a/src/server.h b/src/server.h index 656560868..6b00b27a3 100644 --- a/src/server.h +++ b/src/server.h @@ -2170,6 +2170,7 @@ struct redisServer { mode_t umask; /* The umask value of the process on startup */ std::atomic hz; /* serverCron() calls frequency in hertz */ int in_fork_child; /* indication that this is a fork child */ + IStorage *metadataDb = nullptr; redisDb **db = nullptr; dict *commands; /* Command table */ dict *orig_commands; /* Command table before 
command renaming. */ diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h index beb71d87e..8a7df2cf5 100644 --- a/src/storage/rocksdbfactor_internal.h +++ b/src/storage/rocksdbfactor_internal.h @@ -14,6 +14,7 @@ public: ~RocksDBStorageFactory(); virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; + virtual IStorage *createMetadataDb() override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override; diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 188a339f3..3739c403d 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -6,6 +6,11 @@ IStorage *TestStorageFactory::create(int, key_load_iterator, void *) return new (MALLOC_LOCAL) TestStorageProvider(); } +IStorage *TestStorageFactory::createMetadataDb() +{ + return new (MALLOC_LOCAL) TestStorageProvider(); +} + const char *TestStorageFactory::name() const { return "TEST Storage Provider"; diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index d5e956c7e..2b2b1a38d 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -5,6 +5,7 @@ class TestStorageFactory : public IStorageFactory { virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override; + virtual class IStorage *createMetadataDb() override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } virtual bool FSlow() const { return false; } From 39db170c4e950a223ab11bd6a506cf207c5a43ca Mon Sep 17 00:00:00 2001 From: christianEQ Date: Tue, 2 Nov 2021 15:37:29 +0000 Subject: [PATCH 08/20] add KEYDB_METADATA_ID to metadata tables when created Former-commit-id: 7fe8a184db62eab171935c20498bdb4f30ee6b1d --- src/IStorage.h | 2 ++ src/storage/teststorageprovider.cpp | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/IStorage.h b/src/IStorage.h index 196ae8cef..ad956beb6 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -2,6 +2,8 @@ #include #include "sds.h" +#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f" + class IStorageFactory { public: diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 3739c403d..a287397c7 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -8,7 +8,9 @@ IStorage *TestStorageFactory::create(int, key_load_iterator, void *) IStorage *TestStorageFactory::createMetadataDb() { - return new (MALLOC_LOCAL) TestStorageProvider(); + IStorage *metadataDb = new (MALLOC_LOCAL) TestStorageProvider(); + metadataDb->insert("KEYDB_METADATA_ID", strlen("KEYDB_METADATA_ID"), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false); + return metadataDb; } const char *TestStorageFactory::name() const From 5219a90483e166c8be94fe65372954c7b8759c52 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 3 Nov 2021 00:59:17 +0000 Subject: [PATCH 09/20] save and recognize metadata table identifier Former-commit-id: f06ef757c24ecc50df0e7abf5201a5499ff28c53 From ebb0e08d94a066c435dea457b36a87c8ae6e8493 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 3 Nov 2021 02:25:57 +0000 Subject: [PATCH 10/20] save master status to storage when masters change Former-commit-id: 4989926a0028aed7d7700fd1d1f4ed27c20277cc --- src/IStorage.h | 1 + src/replication.cpp | 44 +++++++++++++++++++++++++++++ src/server.cpp | 4 +++ 
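
The METADATA_DB_IDENTIFIER above is what distinguishes a metadata table from an ordinary key/value table: createMetadataDb() writes a well-known key when the table is created, and the value can be checked when the table is opened again later (patch 09, "save and recognize metadata table identifier"). A toy sketch of that tag-and-verify idea, using a plain std::map in place of the real IStorage interface:

    #include <cassert>
    #include <map>
    #include <string>

    static const std::string kMetadataId = "c299fde0-6d42-4ec4-b939-34f680ffe39f";

    // Stand-in for createMetadataDb(): tag the fresh table with the identifier.
    std::map<std::string, std::string> createMetadataTable() {
        std::map<std::string, std::string> table;
        table["KEYDB_METADATA_ID"] = kMetadataId;
        return table;
    }

    // Stand-in for the "recognize" half: only accept tables carrying the tag.
    bool isMetadataTable(const std::map<std::string, std::string> &table) {
        auto it = table.find("KEYDB_METADATA_ID");
        return it != table.end() && it->second == kMetadataId;
    }

    int main() {
        auto table = createMetadataTable();
        assert(isMetadataTable(table));
        return 0;
    }
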
src/storage/teststorageprovider.cpp | 5 ++++ src/storage/teststorageprovider.h | 1 + 5 files changed, 55 insertions(+) diff --git a/src/IStorage.h b/src/IStorage.h index ad956beb6..cd5215b6d 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -12,6 +12,7 @@ public: virtual ~IStorageFactory() {} virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0; virtual class IStorage *createMetadataDb() = 0; + virtual std::string getMetadata() const = 0; virtual const char *name() const = 0; virtual size_t totalDiskspaceUsed() const = 0; virtual bool FSlow() const = 0; diff --git a/src/replication.cpp b/src/replication.cpp index 3651b0113..4a6e8d5c8 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3783,6 +3783,45 @@ void disconnectMaster(redisMaster *mi) } } +void saveMasterStatusToStorage() +{ + if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; + if (listLength(g_pserver->masters) == 0) { + g_pserver->metadataDb->insert("repl_masters", 12, (void*)"", 0, true); + return; + } + sds val = sds(sdsempty()); + listNode *ln; + listIter li; + redisMaster *mi; + listRewind(g_pserver->masters,&li); + while((ln = listNext(&li)) != NULL) { + mi = (redisMaster*)listNodeValue(ln); + if (!mi->master) { + // If master client is not available, use info from master struct - better than nothing + if (mi->master_replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, + mi->master_initial_offset, + mi->masterhost, + mi->masterport); + } + else { + if (mi->master->replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, + mi->master->reploff, + mi->masterhost, + mi->masterport); + } + } + g_pserver->metadataDb->insert("repl_masters", 12, (void*)val, sdslen(val), true); +} + /* Set replication to the specified master address and port. */ struct redisMaster *replicationAddMaster(char *ip, int port) { // pre-reqs: We must not already have a replica in the list with the same tuple @@ -3855,6 +3894,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) { mi->masterhost, mi->masterport); connectWithMaster(mi); } + saveMasterStatusToStorage(); return mi; } @@ -3938,6 +3978,8 @@ void replicationUnsetMaster(redisMaster *mi) { /* Restart the AOF subsystem in case we shut it down during a sync when * we were still a slave. 
*/ if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC(); + + saveMasterStatusToStorage(); } /* This function is called when the replica lose the connection with the @@ -3969,6 +4011,8 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->masterhost, mi->masterport); connectWithMaster(mi); } + + saveMasterStatusToStorage(); } } diff --git a/src/server.cpp b/src/server.cpp index 64bfeb86e..d807f4d04 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3985,6 +3985,10 @@ void initServer(void) { latencyMonitorInit(); if (g_pserver->m_pstorageFactory) { + std::string repl_masters = g_pserver->m_pstorageFactory->getMetadata(); + if (!repl_masters.empty()) { + serverLog(LL_NOTICE, "Loaded repl-masters from storage provider: %s", repl_masters); + } g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); } diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index a287397c7..73a7ef01d 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -13,6 +13,11 @@ IStorage *TestStorageFactory::createMetadataDb() return metadataDb; } +std::string TestStorageFactory::getMetadata() const +{ + return ""; +} + const char *TestStorageFactory::name() const { return "TEST Storage Provider"; diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index 2b2b1a38d..40f4a0f9d 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -6,6 +6,7 @@ class TestStorageFactory : public IStorageFactory { virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override; virtual class IStorage *createMetadataDb() override; + virtual std::string getMetadata() const override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } virtual bool FSlow() const { return false; } From 7f9ab83d2476576d8d756ddc4f9cab9f6857f38b Mon Sep 17 00:00:00 2001 From: christianEQ Date: Sat, 6 Nov 2021 01:05:47 +0000 Subject: [PATCH 11/20] retrieve replid from IStorage rather than at load time Former-commit-id: c25323439ce400ca91b2193aa2f464e7b09978fd --- src/IStorage.h | 1 - src/replication.cpp | 20 ++++++++++++++++++-- src/server.cpp | 12 ++++++++---- src/storage/teststorageprovider.cpp | 5 ----- src/storage/teststorageprovider.h | 1 - 5 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index cd5215b6d..ad956beb6 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -12,7 +12,6 @@ public: virtual ~IStorageFactory() {} virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0; virtual class IStorage *createMetadataDb() = 0; - virtual std::string getMetadata() const = 0; virtual const char *name() const = 0; virtual size_t totalDiskspaceUsed() const = 0; virtual bool FSlow() const = 0; diff --git a/src/replication.cpp b/src/replication.cpp index 4a6e8d5c8..b6607235c 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3786,8 +3786,24 @@ void disconnectMaster(redisMaster *mi) void saveMasterStatusToStorage() { if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; + + g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); + if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { + g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 
0 : &g_pserver->replicaseldb, + g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); + } + + struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL); + + if (miFirst && miFirst->master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); + } + else if (miFirst && miFirst->cached_master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); + } + if (listLength(g_pserver->masters) == 0) { - g_pserver->metadataDb->insert("repl_masters", 12, (void*)"", 0, true); + g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); return; } sds val = sds(sdsempty()); @@ -3819,7 +3835,7 @@ void saveMasterStatusToStorage() mi->masterport); } } - g_pserver->metadataDb->insert("repl_masters", 12, (void*)val, sdslen(val), true); + g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); } /* Set replication to the specified master address and port. */ diff --git a/src/server.cpp b/src/server.cpp index d807f4d04..a3fe4773a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3985,11 +3985,15 @@ void initServer(void) { latencyMonitorInit(); if (g_pserver->m_pstorageFactory) { - std::string repl_masters = g_pserver->m_pstorageFactory->getMetadata(); - if (!repl_masters.empty()) { - serverLog(LL_NOTICE, "Loaded repl-masters from storage provider: %s", repl_masters); - } g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); + if (g_pserver->metadataDb) { + g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){ + if (cb == sizeof(g_pserver->replid)) { + serverLog(LL_NOTICE, "Retrieved repl-id: %s", (const char*)data); + memcpy(g_pserver->replid, data, cb); + } + }); + } } /* We have to initialize storage providers after the cluster has been initialized */ diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 73a7ef01d..a287397c7 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -13,11 +13,6 @@ IStorage *TestStorageFactory::createMetadataDb() return metadataDb; } -std::string TestStorageFactory::getMetadata() const -{ - return ""; -} - const char *TestStorageFactory::name() const { return "TEST Storage Provider"; diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index 40f4a0f9d..2b2b1a38d 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -6,7 +6,6 @@ class TestStorageFactory : public IStorageFactory { virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override; virtual class IStorage *createMetadataDb() override; - virtual std::string getMetadata() const override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } virtual bool FSlow() const { return false; } From a59b192174086b0f6f614f70ff01b11db2a7f162 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Mon, 8 Nov 2021 19:34:30 +0000 Subject: [PATCH 12/20] save replid from storage on load Former-commit-id: 8e5d0cb7035db30f35ead36aab52df07ab3c9bee --- src/replication.cpp | 115 +++++++++++++++++++++++--------------------- src/server.cpp | 22 ++++++++- 2 files changed, 81 insertions(+), 56 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index b6607235c..4756801e4 
100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2096,6 +2096,64 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } } +/* Save the replid of yourself and any connected masters to storage. + * Returns if no storage provider is used. */ +void saveMasterStatusToStorage() +{ + if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; + + g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); + g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true); + if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { + g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, + g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); + } + + struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL); + + if (miFirst && miFirst->master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); + } + else if (miFirst && miFirst->cached_master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); + } + + if (listLength(g_pserver->masters) == 0) { + g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); + return; + } + sds val = sds(sdsempty()); + listNode *ln; + listIter li; + redisMaster *mi; + listRewind(g_pserver->masters,&li); + while((ln = listNext(&li)) != NULL) { + mi = (redisMaster*)listNodeValue(ln); + if (!mi->master) { + // If master client is not available, use info from master struct - better than nothing + if (mi->master_replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, + mi->master_initial_offset, + mi->masterhost, + mi->masterport); + } + else { + if (mi->master->replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, + mi->master->reploff, + mi->masterhost, + mi->masterport); + } + } + g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); +} + /* Change the current instance replication ID with a new, random one. 
* This will prevent successful PSYNCs between this master and other * slaves, so the command should be called when something happens that @@ -2103,6 +2161,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) void changeReplicationId(void) { getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE); g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0'; + saveMasterStatusToStorage(); } @@ -2894,6 +2953,7 @@ void readSyncBulkPayload(connection *conn) { g_pserver->master_repl_offset = mi->master->reploff; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + saveMasterStatusToStorage(); } clearReplicationId2(); @@ -3783,61 +3843,6 @@ void disconnectMaster(redisMaster *mi) } } -void saveMasterStatusToStorage() -{ - if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; - - g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); - if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { - g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, - g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); - } - - struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL); - - if (miFirst && miFirst->master) { - g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); - } - else if (miFirst && miFirst->cached_master) { - g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); - } - - if (listLength(g_pserver->masters) == 0) { - g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); - return; - } - sds val = sds(sdsempty()); - listNode *ln; - listIter li; - redisMaster *mi; - listRewind(g_pserver->masters,&li); - while((ln = listNext(&li)) != NULL) { - mi = (redisMaster*)listNodeValue(ln); - if (!mi->master) { - // If master client is not available, use info from master struct - better than nothing - if (mi->master_replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, - mi->master_initial_offset, - mi->masterhost, - mi->masterport); - } - else { - if (mi->master->replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, - mi->master->reploff, - mi->masterhost, - mi->masterport); - } - } - g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); -} - /* Set replication to the specified master address and port. 
*/ struct redisMaster *replicationAddMaster(char *ip, int port) { // pre-reqs: We must not already have a replica in the list with the same tuple diff --git a/src/server.cpp b/src/server.cpp index a3fe4773a..c74cc179b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3989,10 +3989,30 @@ void initServer(void) { if (g_pserver->metadataDb) { g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){ if (cb == sizeof(g_pserver->replid)) { - serverLog(LL_NOTICE, "Retrieved repl-id: %s", (const char*)data); memcpy(g_pserver->replid, data, cb); } }); + g_pserver->metadataDb->retrieve("repl-offset", 11, [&](const char *, size_t, const void *data, size_t cb){ + if (cb == sizeof(g_pserver->replid)) { + g_pserver->master_repl_offset = *(long long*)data; + } + }); + + listIter li; + listNode *ln; + + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t cb){ + selectDb(mi->cached_master, *(int*)data); + }); + } } } From 0dacd68163bd5454e0108e7dd02826624b775fb7 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Thu, 11 Nov 2021 01:41:28 +0000 Subject: [PATCH 13/20] added tests for flash psync Former-commit-id: 79f7376a4fea2de9bd06c06d96f3f98ee7308874 --- tests/integration/replication-psync-flash.tcl | 135 ++++++++++++++++++ tests/test_helper.tcl | 1 + 2 files changed, 136 insertions(+) create mode 100644 tests/integration/replication-psync-flash.tcl diff --git a/tests/integration/replication-psync-flash.tcl b/tests/integration/replication-psync-flash.tcl new file mode 100644 index 000000000..5cad127f3 --- /dev/null +++ b/tests/integration/replication-psync-flash.tcl @@ -0,0 +1,135 @@ +# Creates a master-slave pair and breaks the link continuously to force +# partial resyncs attempts, all this while flooding the master with +# write queries. +# +# You can specify backlog size, ttl, delay before reconnection, test duration +# in seconds, and an additional condition to verify at the end. +# +# If reconnect is > 0, the test actually try to break the connection and +# reconnect with the master, otherwise just the initial synchronization is +# checked for consistency. +proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { + start_server {tags {"repl"}} { + start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] { + + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set slave [srv 0 client] + + set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000] + set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000] + set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000] + + test {Slave should be able to synchronize with the master} { + $slave slaveof $master_host $master_port + wait_for_condition 50 100 { + [lindex [r role] 0] eq {slave} && + [lindex [r role] 3] eq {connected} + } else { + fail "Replication not started." + } + } + + # Check that the background clients are actually writing. 
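
Referring back to the initServer() changes earlier in this series: repl-id, repl-offset and repl-stream-db are read back through metadataDb->retrieve() calls that hand the stored bytes to a callback. The toy sketch below shows that lookup-with-callback pattern; std::map and std::function stand in for the real IStorage interface, whose exact signature is not reproduced here.

    #include <cstddef>
    #include <cstring>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>

    using RetrieveCallback = std::function<void(const void *data, size_t cb)>;

    struct ToyMetadataDb {
        std::map<std::string, std::string> kv;

        void insert(const std::string &key, const void *data, size_t cb) {
            kv[key] = std::string(static_cast<const char *>(data), cb);
        }

        // Invoke the callback only when the key exists, as the retrieve() calls do.
        void retrieve(const std::string &key, const RetrieveCallback &fn) const {
            auto it = kv.find(key);
            if (it != kv.end()) fn(it->second.data(), it->second.size());
        }
    };

    int main() {
        ToyMetadataDb db;
        long long offset = 12345;
        db.insert("repl-offset", &offset, sizeof(offset));

        long long loaded = 0;
        db.retrieve("repl-offset", [&](const void *data, size_t cb) {
            if (cb == sizeof(loaded)) std::memcpy(&loaded, data, cb); // size check before copying
        });
        std::cout << "repl-offset restored: " << loaded << "\n";
        return 0;
    }
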
+ test {Detect write load to master} { + wait_for_condition 50 1000 { + [$master dbsize] > 100 + } else { + fail "Can't detect write load from background clients." + } + } + + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { + # Now while the clients are writing data, break the maste-slave + # link multiple times. + if ($reconnect) { + for {set j 0} {$j < $duration*10} {incr j} { + after 100 + # catch {puts "MASTER [$master dbsize] keys, REPLICA [$slave dbsize] keys"} + + if {($j % 20) == 0} { + catch { + $slave debug restart + } + } + } + } + stop_bg_complex_data $load_handle0 + stop_bg_complex_data $load_handle1 + stop_bg_complex_data $load_handle2 + + # Wait for the slave to reach the "online" + # state from the POV of the master. + set retry 5000 + while {$retry} { + set info [$master info] + if {[string match {*slave0:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slave not correctly synchronized" + } + + # Wait that slave acknowledge it is online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. (-LOADING error) + wait_for_condition 5000 100 { + [lindex [$slave role] 3] eq {connected} + } else { + fail "Slave still not connected after some time" + } + + set retry 10 + while {$retry && ([$master debug digest] ne [$slave debug digest])}\ + { + after 1000 + incr retry -1 + } + assert {[$master dbsize] > 0} + + if {[$master debug digest] ne [$slave debug digest]} { + set csv1 [csvdump r] + set csv2 [csvdump {r -1}] + set fd [open /tmp/repldump1.txt w] + puts -nonewline $fd $csv1 + close $fd + set fd [open /tmp/repldump2.txt w] + puts -nonewline $fd $csv2 + close $fd + puts "Master - Replica inconsistency" + puts "Run diff -u against /tmp/repldump*.txt for more info" + } + assert_equal [r debug digest] [r -1 debug digest] + eval $cond + } + } + } +} + +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl 0 + + test_psync {ok psync} 6 100000000 3600 0 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {no backlog} 6 100 3600 0.5 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + + test_psync {ok after delay} 3 100000000 3600 3 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {backlog expired} 3 100000000 1 3 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + } +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 2e200c35a..2ac904d20 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -47,6 +47,7 @@ set ::all_tests { integration/replication-3 integration/replication-4 integration/replication-psync + integration/replication-psync-flash integration/replication-active integration/replication-multimaster integration/replication-multimaster-connect From 9c75fc3e8fb7a4be49cc8b52a985eff16cfebb5f Mon Sep 17 00:00:00 2001 From: christianEQ Date: Thu, 11 Nov 2021 13:12:46 +0000 Subject: [PATCH 14/20] removed unnecessary code to check for metadata Former-commit-id: 69ffc89d7c27a4eef04aa5cb59ffd1c2b9b8eb20 From 3819331e2e4e19bec475c3af2966a5054f3cfb4a Mon Sep 17 00:00:00 2001 From: christianEQ Date: Thu, 11 Nov 2021 13:15:21 +0000 Subject: [PATCH 15/20] removed unused vars Former-commit-id: e2bb6242e8be53cdad6def6acdd7e90e0f7d9852 --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index c74cc179b..d13bc6f00 100644 
--- a/src/server.cpp +++ b/src/server.cpp @@ -4009,7 +4009,7 @@ void initServer(void) { * information, in order to allow partial resynchronizations * with masters. */ replicationCacheMasterUsingMyself(mi); - g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t cb){ + g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t){ selectDb(mi->cached_master, *(int*)data); }); } From fa223f4b5a1d20403ae00cbc147168fd999bdaf8 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 12 Nov 2021 00:18:32 +0000 Subject: [PATCH 16/20] Allow fast sync even if a background save is in progress Former-commit-id: b98aadfb6875f366a63a9d27d47fb5ca60abaa03 --- src/replication.cpp | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/replication.cpp b/src/replication.cpp index 3651b0113..2618a605f 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1385,8 +1385,11 @@ void syncCommand(client *c) { g_pserver->replid, g_pserver->replid2); } + /* CASE 0: Fast Sync */ + if ((c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && g_pserver->m_pstorageFactory) { + startBgsaveForReplication(c->slave_capa); /* CASE 1: BGSAVE is in progress, with disk target. */ - if (g_pserver->FRdbSaveInProgress() && + } else if (g_pserver->FRdbSaveInProgress() && g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK) { /* Ok a background save is in progress. Let's check if it is a good From ef9c1bd07439f74a274da6a3e66c879ab69da15f Mon Sep 17 00:00:00 2001 From: malavan Date: Tue, 16 Nov 2021 02:13:36 +0000 Subject: [PATCH 17/20] fix dataloss on aof load with storage provider Former-commit-id: 447c8601b5203346fdd4b956ad1ac87c4a6073c8 --- src/aof.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 326c30ba2..f99a90b2e 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -870,6 +870,11 @@ int loadAppendOnlyFile(char *filename) { fakeClient = createAOFClient(); startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE); + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + g_pserver->db[idb]->trackChanges(true); + } + /* Check if this AOF file has an RDB preamble. In that case we need to * load the RDB file and later continue loading the AOF tail. */ char sig[5]; /* "REDIS" */ @@ -892,11 +897,6 @@ int loadAppendOnlyFile(char *filename) { } } - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - g_pserver->db[idb]->trackChanges(true); - } - /* Read the actual AOF file, in REPL format, command by command. 
*/ while(1) { int argc, j; From 2ae7247ae7c3adaa358aacaba00284fa9c84834b Mon Sep 17 00:00:00 2001 From: malavan Date: Wed, 24 Nov 2021 21:59:01 +0000 Subject: [PATCH 18/20] remove async write from fast sync repl buffer and fix some bugs Former-commit-id: 6c45706f75d8322281296d9c73a3fac4f7f383a1 --- src/replication.cpp | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 3651b0113..06160bc8c 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -975,14 +975,7 @@ public: aeAcquireLock(); } - if (ireplica == replicas.size()-1 && replica->replyAsync == nullptr) { - if (prepareClientToWrite(replica) == C_OK) { - replica->replyAsync = reply; - reply = nullptr; - } - } else { - addReplyProto(replica, reply->buf(), reply->size); - } + addReplyProto(replica, reply->buf(), reply->used); } ProcessPendingAsyncWrites(); replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end()); @@ -1089,7 +1082,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { spreplBuf->addLong(rsi->repl_stream_db); spreplBuf->addArrayLen(2); spreplBuf->addString("repl-id", 7); - spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE+1); + spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE); spreplBuf->addArrayLen(2); spreplBuf->addString("repl-offset", 11); spreplBuf->addLong(rsi->master_repl_offset); From 55cc6f797a9179c393b7e6651eadfc8b457d1c9b Mon Sep 17 00:00:00 2001 From: malavan Date: Fri, 26 Nov 2021 17:46:41 +0000 Subject: [PATCH 19/20] client lock for fast sync replbuffer, delay fast sync for next replication cron Former-commit-id: 9fe7f8328d66f9ec57060934462ad85ef60c36aa --- src/replication.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/replication.cpp b/src/replication.cpp index c55617cba..d99346a57 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -975,6 +975,7 @@ public: aeAcquireLock(); } + std::unique_lock lock(replica->lock); addReplyProto(replica, reply->buf(), reply->used); } ProcessPendingAsyncWrites(); @@ -1380,7 +1381,7 @@ void syncCommand(client *c) { /* CASE 0: Fast Sync */ if ((c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && g_pserver->m_pstorageFactory) { - startBgsaveForReplication(c->slave_capa); + serverLog(LL_NOTICE,"Fast SYNC on next replication cycle"); /* CASE 1: BGSAVE is in progress, with disk target. */ } else if (g_pserver->FRdbSaveInProgress() && g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK) From e5b5099eef2f071acd588684ea647b0a830dc8f8 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Fri, 26 Nov 2021 20:36:46 +0000 Subject: [PATCH 20/20] don't save master if host is null Former-commit-id: 8238d8da82c483c093f5248b9dac983bc542e20f --- src/replication.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/replication.cpp b/src/replication.cpp index e1f34f2be..87f3be643 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2126,6 +2126,10 @@ void saveMasterStatusToStorage() listRewind(g_pserver->masters,&li); while((ln = listNext(&li)) != NULL) { mi = (redisMaster*)listNodeValue(ln); + if (mi->masterhost == NULL) { + // if we don't know the host, no reason to save + continue; + } if (!mi->master) { // If master client is not available, use info from master struct - better than nothing if (mi->master_replid[0] == 0) {
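
For reference, saveMasterStatusToStorage() above concatenates one "<replid>:<offset>:<host>:<port>;" record per master into the "repl-masters" key (the sdscatfmt "%s:%I:%s:%i;" call). The loading side is not part of this excerpt, so the reader below is only an illustrative sketch of how such records could be split apart again; it is not code from the patch, and because the fields are ':'-separated it assumes hostnames or IPv4 addresses without embedded colons.

    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    struct MasterRecord {
        std::string replid;
        long long offset = 0;
        std::string host;
        int port = 0;
    };

    std::vector<MasterRecord> parseMasterRecords(const std::string &blob) {
        std::vector<MasterRecord> out;
        std::stringstream records(blob);
        std::string record;
        while (std::getline(records, record, ';')) {          // records end with ';'
            if (record.empty()) continue;
            std::stringstream fields(record);
            MasterRecord m;
            std::string offset, port;
            std::getline(fields, m.replid, ':');
            std::getline(fields, offset, ':');
            std::getline(fields, m.host, ':');
            std::getline(fields, port, ':');
            // mirror the "no reason to save" guards: skip incomplete records
            if (m.replid.empty() || offset.empty() || m.host.empty() || port.empty()) continue;
            m.offset = std::stoll(offset);
            m.port = std::stoi(port);
            out.push_back(m);
        }
        return out;
    }

    int main() {
        auto v = parseMasterRecords(
            "0123456789abcdef0123456789abcdef01234567:1024:127.0.0.1:6379;");
        std::cout << v.size() << " master(s), first offset " << v[0].offset << "\n";
        return 0;
    }
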