From b12b48ab272dbf1e6fdd8b758c175cacc10eb449 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 9 Nov 2021 19:36:07 +0000 Subject: [PATCH] Initial implementation of snapshot fast replication. There are still a few TODOs in progress Former-commit-id: 0febdcdab8693af443f350968ed3d8c80106675d --- runtest | 1 + src/IStorage.h | 4 +- src/Makefile | 5 +- src/SnapshotPayloadParseState.cpp | 287 +++++++++++++++ src/SnapshotPayloadParseState.h | 64 ++++ src/StorageCache.cpp | 46 ++- src/StorageCache.h | 3 +- src/db.cpp | 17 +- src/dict.cpp | 4 +- src/networking.cpp | 31 +- src/replication.cpp | 469 +++++++++++++++++++++++-- src/sds.h | 9 +- src/server.h | 23 +- src/storage/rocksdbfactor_internal.h | 28 ++ tests/integration/replication-fast.tcl | 34 ++ tests/support/server.tcl | 4 + tests/test_helper.tcl | 12 + 17 files changed, 981 insertions(+), 60 deletions(-) create mode 100644 src/SnapshotPayloadParseState.cpp create mode 100644 src/SnapshotPayloadParseState.h create mode 100644 src/storage/rocksdbfactor_internal.h create mode 100644 tests/integration/replication-fast.tcl diff --git a/runtest b/runtest index c6349d118..6c9ea5ae4 100755 --- a/runtest +++ b/runtest @@ -14,3 +14,4 @@ then exit 1 fi $TCLSH tests/test_helper.tcl "${@}" +$TCLSH tests/test_helper.tcl "${@}--flash" \ No newline at end of file diff --git a/src/IStorage.h b/src/IStorage.h index d1f316022..e060d849f 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -30,10 +30,10 @@ public: virtual bool enumerate(callback fn) const = 0; virtual size_t count() const = 0; - virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) { + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { beginWriteBatch(); for (size_t ielem = 0; ielem < celem; ++ielem) { - insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false); + insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false); } endWriteBatch(); } diff --git a/src/Makefile b/src/Makefile index 13f5209f7..d275cfea1 100644 --- a/src/Makefile +++ b/src/Makefile @@ -354,6 +354,7 @@ endif REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX) REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) +KEYDB_SERVER_OBJ=SnapshotPayloadParseState.o REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) 
REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX) @@ -410,7 +411,7 @@ endif @touch $@ # keydb-server -$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) +$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(KEYDB_SERVER_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS) # keydb-sentinel @@ -433,7 +434,7 @@ $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ) $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/hdr_histogram.o $(FINAL_LIBS) -DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) +DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(KEYDB_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) -include $(DEP) # Because the jemalloc.h header is generated as a part of the jemalloc build, diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp new file mode 100644 index 000000000..4bf89c718 --- /dev/null +++ b/src/SnapshotPayloadParseState.cpp @@ -0,0 +1,287 @@ +#include "server.h" +#include "SnapshotPayloadParseState.h" +#include + +static const size_t SLAB_SIZE = 2*1024*1024; // Note 64 MB because we try to give 64MB to a block +class SlabAllocator +{ + class Slab { + char *m_pv = nullptr; + size_t m_cb = 0; + size_t m_cbAllocated = 0; + + public: + Slab(size_t cbAllocate) { + m_pv = (char*)zmalloc(cbAllocate); + m_cb = cbAllocate; + } + + ~Slab() { + zfree(m_pv); + } + + Slab(Slab &&other) { + m_pv = other.m_pv; + m_cb = other.m_cb; + m_cbAllocated = other.m_cbAllocated; + other.m_pv = nullptr; + other.m_cb = 0; + other.m_cbAllocated = 0; + } + + bool canAllocate(size_t cb) const { + return (m_cbAllocated + cb) <= m_cb; + } + + void *allocate(size_t cb) { + if ((m_cbAllocated + cb) > m_cb) + return nullptr; + void *pvret = m_pv + m_cbAllocated; + m_cbAllocated += cb; + return pvret; + } + }; + std::vector m_vecslabs; +public: + + void *allocate(size_t cb) { + if (m_vecslabs.empty() || !m_vecslabs.back().canAllocate(cb)) { + m_vecslabs.emplace_back(std::max(cb, SLAB_SIZE)); + } + return m_vecslabs.back().allocate(cb); + } +}; + +static uint64_t dictCStringHash(const void *key) { + return dictGenHashFunction((unsigned char*)key, strlen((char*)key)); +} + + +static void dictKeyDestructor(void *privdata, void *key) +{ + DICT_NOTUSED(privdata); + sdsfree((sds)key); +} + +static int dictCStringCompare(void *, const void *key1, const void *key2) +{ + int l1,l2; + + l1 = strlen((sds)key1); + l2 = strlen((sds)key2); + if (l1 != l2) return 0; + return memcmp(key1, key2, l1) == 0; +} + +dictType metadataLongLongDictType = { + dictCStringHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictCStringCompare, /* key compare */ + dictKeyDestructor, /* key destructor */ + nullptr, /* val destructor */ + nullptr, /* allow to expand */ + nullptr /* async free destructor */ +}; + +SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStage() { + if (stackParse.empty()) + return ParseStageName::Global; + + switch (stackParse.top().name) + { + case ParseStageName::None: + return ParseStageName::Global; + + case ParseStageName::Global: + if (stackParse.top().arraycur == 0) + return ParseStageName::MetaData; + else if (stackParse.top().arraycur == 1) + return ParseStageName::Databases; + break; + + case ParseStageName::MetaData: + return ParseStageName::KeyValuePair; + + case ParseStageName::Databases: + return ParseStageName::Dataset; + + case ParseStageName::Dataset: + return 
ParseStageName::KeyValuePair; + + default: + break; + } + throw "Bad protocol: corrupt state"; +} + +void SnapshotPayloadParseState::flushQueuedKeys() { + if (vecqueuedKeys.empty()) + return; + + // TODO: We can't finish parse until all the work functions finish + int idb = current_database; + serverAssert(vecqueuedKeys.size() == vecqueuedVals.size()); + auto sizePrev = vecqueuedKeys.size(); + ++insertsInFlight; + auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous + g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { + g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); + --insertsInFlightTmp; + delete pallocator; + }); + m_spallocator = std::make_unique(); + cbQueued = 0; + vecqueuedKeys.reserve(sizePrev); + vecqueuedKeysCb.reserve(sizePrev); + vecqueuedVals.reserve(sizePrev); + vecqueuedValsCb.reserve(sizePrev); + serverAssert(vecqueuedKeys.empty()); + serverAssert(vecqueuedVals.empty()); +} + + +SnapshotPayloadParseState::SnapshotPayloadParseState() { + // This is to represent the first array the client is intended to send us + ParseStage stage; + stage.name = ParseStageName::None; + stage.arraylen = 1; + stackParse.push(stage); + + dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr); + insertsInFlight = 0; + m_spallocator = std::make_unique(); +} + +SnapshotPayloadParseState::~SnapshotPayloadParseState() { + dictRelease(dictLongLongMetaData); +} + +const char *SnapshotPayloadParseState::getStateDebugName() { + switch (stackParse.top().name) { + case ParseStageName::None: + return "None"; + + case ParseStageName::Global: + return "Global"; + + case ParseStageName::MetaData: + return "MetaData"; + + case ParseStageName::Databases: + return "Databases"; + + case ParseStageName::KeyValuePair: + return "KeyValuePair"; + + case ParseStageName::Dataset: + return "Dataset"; + + default: + return "Unknown"; + } +} + +size_t SnapshotPayloadParseState::depth() const { return stackParse.size(); } + +void SnapshotPayloadParseState::trimState() { + while (!stackParse.empty() && (stackParse.top().arraycur == stackParse.top().arraylen)) + stackParse.pop(); + + if (stackParse.empty()) { + flushQueuedKeys(); + while (insertsInFlight > 0) { + // TODO: ProcessEventsWhileBlocked + aeReleaseLock(); + aeAcquireLock(); + } + } +} + +void SnapshotPayloadParseState::pushArray(long long size) { + if (stackParse.empty()) + throw "Bad Protocol: unexpected trailing data"; + + if (size == 0) { + stackParse.top().arraycur++; + return; + } + + if (stackParse.top().name == ParseStageName::Databases) { + flushQueuedKeys(); + current_database = stackParse.top().arraycur; + } + + ParseStage stage; + stage.name = getNextStage(); + stage.arraylen = size; + + if (stage.name == ParseStageName::Dataset) { + g_pserver->db[current_database]->expand(stage.arraylen); + } + + // Note: This affects getNextStage so ensure its after + stackParse.top().arraycur++; + stackParse.push(stage); +} + +void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) { + if (stackParse.empty()) + throw "Bad Protocol: unexpected trailing bulk string"; + + if 
(stackParse.top().arraycur >= static_cast(stackParse.top().arrvalues.size())) + throw "Bad protocol: Unexpected value"; + + auto &stage = stackParse.top(); + stage.arrvalues[stackParse.top().arraycur].first = (char*)m_spallocator->allocate(cch); + stage.arrvalues[stackParse.top().arraycur].second = cch; + memcpy(stage.arrvalues[stackParse.top().arraycur].first, rgch, cch); + stage.arraycur++; + switch (stage.name) { + case ParseStageName::KeyValuePair: + if (stage.arraycur == 2) { + // We loaded both pairs + if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr) + throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small + vecqueuedKeys.push_back(stage.arrvalues[0].first); + vecqueuedKeysCb.push_back(stage.arrvalues[0].second); + vecqueuedVals.push_back(stage.arrvalues[1].first); + vecqueuedValsCb.push_back(stage.arrvalues[1].second); + stage.arrvalues[0].first = nullptr; + stage.arrvalues[1].first = nullptr; + cbQueued += vecqueuedKeysCb.back(); + cbQueued += vecqueuedValsCb.back(); + if (cbQueued >= queuedBatchLimit) + flushQueuedKeys(); + } + break; + + default: + throw "Bad Protocol: unexpected bulk string"; + } +} + +void SnapshotPayloadParseState::pushValue(long long value) { + if (stackParse.empty()) + throw "Bad Protocol: unexpected integer value"; + + stackParse.top().arraycur++; + + if (stackParse.top().arraycur != 2 || stackParse.top().arrvalues[0].first == nullptr) + throw "Bad Protocol: unexpected integer value"; + + dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.top().arrvalues[0].first, stackParse.top().arrvalues[0].second), nullptr); + if (de == nullptr) + throw "Bad Protocol: metadata field sent twice"; + de->v.s64 = value; +} + +long long SnapshotPayloadParseState::getMetaDataLongLong(const char *szField) const { + dictEntry *de = dictFind(dictLongLongMetaData, szField); + + if (de == nullptr) { + serverLog(LL_WARNING, "Master did not send field: %s", szField); + throw false; + } + return de->v.s64; +} \ No newline at end of file diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h new file mode 100644 index 000000000..fee42178a --- /dev/null +++ b/src/SnapshotPayloadParseState.h @@ -0,0 +1,64 @@ +#pragma once +#include +#include + +class SlabAllocator; + +class SnapshotPayloadParseState { + enum class ParseStageName { + None, + Global, + MetaData, + Databases, + Dataset, + KeyValuePair, + }; + struct ParseStage { + ParseStageName name; + long long arraylen = 0; + long long arraycur = 0; + std::array, 2> arrvalues; + + ParseStage() { + for (auto &pair : arrvalues) { + pair.first = nullptr; + pair.second = 0; + } + } + }; + + std::stack stackParse; + + + std::vector vecqueuedKeys; + std::vector vecqueuedKeysCb; + std::vector vecqueuedVals; + std::vector vecqueuedValsCb; + + + std::atomic insertsInFlight; + std::unique_ptr m_spallocator; + dict *dictLongLongMetaData = nullptr; + size_t cbQueued = 0; + static const size_t queuedBatchLimit = 64*1024*1024; // 64 MB + int current_database = -1; + + ParseStageName getNextStage(); + void flushQueuedKeys(); + +public: + SnapshotPayloadParseState(); + ~SnapshotPayloadParseState(); + + long long getMetaDataLongLong(const char *field) const; + + const char *getStateDebugName(); + + size_t depth() const; + + void trimState(); + void pushArray(long long size); + void pushValue(const char *rgch, long long cch); + void pushValue(long long value); + bool shouldThrottle() const { return insertsInFlight > 
(cserver.cthreads*4); } +}; \ No newline at end of file diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index af2ac12b7..cdc380105 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -97,17 +97,47 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite); } -void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) +long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing); +void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { + std::vector vechashes; + if (m_pdict != nullptr) { + vechashes.reserve(celem); + + for (size_t ielem = 0; ielem < celem; ++ielem) { + dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry)); + de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]); + de->v.u64 = 1; + vechashes.push_back(de); + } + } + std::unique_lock ul(m_lock); bulkInsertsInProgress++; if (m_pdict != nullptr) { - for (size_t ielem = 0; ielem < celem; ++ielem) { - cacheKey(rgkeys[ielem]); + for (dictEntry *de : vechashes) { + if (dictIsRehashing(m_pdict)) dictRehash(m_pdict,1); + /* Get the index of the new element, or -1 if + * the element already exists. */ + long index; + if ((index = _dictKeyIndex(m_pdict, de->key, (uint64_t)de->key, nullptr)) == -1) { + dictEntry *de = dictFind(m_pdict, de->key); + serverAssert(de != nullptr); + de->v.s64++; + m_collisionCount++; + zfree(de); + } else { + int iht = dictIsRehashing(m_pdict) ? 1 : 0; + de->next = m_pdict->ht[iht].table[index]; + m_pdict->ht[iht].table[index] = de; + m_pdict->ht[iht].used++; + } } } ul.unlock(); - m_spstorage->bulkInsert(rgkeys, rgvals, celem); + + m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem); + bulkInsertsInProgress--; } @@ -119,6 +149,14 @@ const StorageCache *StorageCache::clone() return cacheNew; } +void StorageCache::expand(uint64_t slots) +{ + std::unique_lock ul(m_lock); + if (m_pdict) { + dictExpand(m_pdict, slots); + } +} + void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const { std::unique_lock ul(m_lock); diff --git a/src/StorageCache.h b/src/StorageCache.h index 184eb60bd..879b512dc 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -40,11 +40,12 @@ public: void clear(); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); - void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem); + void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem); void retrieve(sds key, IStorage::callbackSingle fn) const; bool erase(sds key); void emergencyFreeCache(); bool keycacheIsEnabled() const { return m_pdict != nullptr; } + void expand(uint64_t slots); bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } diff --git a/src/db.cpp b/src/db.cpp index cc6092831..8417a008a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2903,8 +2903,10 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) { if (m_fAllChanged) { - m_spstorage->clear(); - storeDatabase(); + if (dictSize(m_pdict) > 0 || m_spstorage->count() > 0) { // in some cases we may have pre-sized the StorageCache's dict, and we don't want clear to ruin it + m_spstorage->clear(); + storeDatabase(); + } m_fAllChanged = 0; } else @@ -2936,15 +2938,19 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) dictIterator *di = dictGetIterator(dictNew); dictEntry *de; std::vector veckeys; + std::vector 
veccbkeys; std::vector vecvals; + std::vector veccbvals; while ((de = dictNext(di)) != nullptr) { robj *o = (robj*)dictGetVal(de); sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); veckeys.push_back((sds)dictGetKey(de)); + veccbkeys.push_back(sdslen((sds)dictGetKey(de))); vecvals.push_back(temp); + veccbvals.push_back(sdslen(temp)); } - m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size()); + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size()); for (auto val : vecvals) sdsfree(val); dictReleaseIterator(di); @@ -2953,6 +2959,11 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) }); } +void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem) +{ + m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem); +} + void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) { if (m_pdbSnapshotStorageFlush) diff --git a/src/dict.cpp b/src/dict.cpp index e10567c8d..0268c19d2 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -63,7 +63,7 @@ static unsigned int dict_force_resize_ratio = 5; static int _dictExpandIfNeeded(dict *ht); static unsigned long _dictNextPower(unsigned long size); -static long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing); +long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing); static int _dictInit(dict *ht, dictType *type, void *privDataPtr); /* -------------------------- hash functions -------------------------------- */ @@ -1366,7 +1366,7 @@ static unsigned long _dictNextPower(unsigned long size) * * Note that if we are in the process of rehashing the hash table, the * index is always returned in the context of the second (new) hash table. */ -static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) +long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) { unsigned long idx, table; dictEntry *he; diff --git a/src/networking.cpp b/src/networking.cpp index 29c4ac206..4d4eb32c0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -237,7 +237,7 @@ void clientInstallWriteHandler(client *c) { * writes at this stage. */ if (!(c->flags & CLIENT_PENDING_WRITE) && - (c->replstate == REPL_STATE_NONE || + (c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || c->replstate == SLAVE_STATE_FASTSYNC_DONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { AssertCorrectThread(c); @@ -1566,7 +1566,7 @@ bool freeClient(client *c) { /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ - if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending) { + if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending || c->replstate == SLAVE_STATE_FASTSYNC_TX) { freeClientAsync(c); return false; } @@ -1791,7 +1791,7 @@ int writeToClient(client *c, int handler_installed) { /* We can only directly read from the replication backlog if the client is a replica, so only attempt to do so if that's the case. 
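     * (With snapshot fast sync, a replica still in SLAVE_STATE_FASTSYNC_TX is fed through the
     * regular reply buffers rather than the backlog, so this path now applies only to replicas
     * that are fully online.)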
*/ - if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { + if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR) && c->replstate == SLAVE_STATE_ONLINE) { std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); while (clientHasPendingReplies(c)) { @@ -1833,9 +1833,11 @@ int writeToClient(client *c, int handler_installed) { } } else { while(clientHasPendingReplies(c)) { - serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR); if (c->bufpos > 0) { - nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + auto bufpos = c->bufpos; + lock.unlock(); + nwritten = connWrite(c->conn,c->buf+c->sentlen,bufpos-c->sentlen); + lock.lock(); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; @@ -1854,7 +1856,10 @@ int writeToClient(client *c, int handler_installed) { continue; } - nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen); + auto used = o->used; + lock.unlock(); + nwritten = connWrite(c->conn, o->buf() + c->sentlen, used - c->sentlen); + lock.lock(); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; @@ -1993,7 +1998,7 @@ void ProcessPendingAsyncWrites() ae_flags |= AE_BARRIER; } - if (!((c->replstate == REPL_STATE_NONE || + if (!((c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) continue; @@ -2063,9 +2068,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If a client is protected, don't do anything, * that may trigger write error or recreate handler. */ - if (flags & CLIENT_PROTECTED) continue; + if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue; - std::unique_locklock)> lock(c->lock); + //std::unique_locklock)> lock(c->lock); /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; @@ -2075,11 +2080,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { { if (c->flags & CLIENT_CLOSE_ASAP) { - lock.release(); // still locked AeLocker ae; - ae.arm(c); - if (!freeClient(c)) // writeToClient will only async close, but there's no need to wait - c->lock.unlock(); // if we just got put on the async close list, then we need to remove the lock + ae.arm(nullptr); + freeClient(c); // writeToClient will only async close, but there's no need to wait } continue; } @@ -3829,7 +3832,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) { if (!c->conn) return; /* It is unsafe to free fake clients. 
*/ serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return; - if (checkClientOutputBufferLimits(c)) { + if (checkClientOutputBufferLimits(c) && c->replstate != SLAVE_STATE_FASTSYNC_TX) { sds client = catClientInfoString(sdsempty(),c); freeClientAsync(c); diff --git a/src/replication.cpp b/src/replication.cpp index 91036723a..d8ffcca99 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -34,6 +34,7 @@ #include "cluster.h" #include "bio.h" #include "aelocker.h" +#include "SnapshotPayloadParseState.h" #include #include @@ -929,6 +930,224 @@ need_full_resync: return C_ERR; } +int checkClientOutputBufferLimits(client *c); +class replicationBuffer { + std::vector replicas; + clientReplyBlock *reply = nullptr; + size_t writtenBytesTracker = 0; + +public: + replicationBuffer() { + reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + (PROTO_REPLY_CHUNK_BYTES*2)); + reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); + reply->used = 0; + } + + ~replicationBuffer() { + zfree(reply); + } + + void addReplica(client *replica) { + replicas.push_back(replica); + replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset()); + // Optimize the socket for bulk transfer + //connDisableTcpNoDelay(replica->conn); + } + + bool isActive() const { return !replicas.empty(); } + + void flushData() { + if (reply == nullptr) + return; + size_t written = reply->used; + aeAcquireLock(); + for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) { + auto replica = replicas[ireplica]; + if (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) { + replica->replstate = REPL_STATE_NONE; + continue; + } + + while (checkClientOutputBufferLimits(replica) + && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { + aeReleaseLock(); + usleep(10); + aeAcquireLock(); + } + + if (ireplica == replicas.size()-1 && replica->replyAsync == nullptr) { + if (prepareClientToWrite(replica) == C_OK) { + replica->replyAsync = reply; + reply = nullptr; + } + } else { + addReplyProto(replica, reply->buf(), reply->size); + } + } + ProcessPendingAsyncWrites(); + replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end()); + aeReleaseLock(); + if (reply != nullptr) { + reply->used = 0; + } + writtenBytesTracker += written; + } + + void addData(const char *data, unsigned long size) { + if (reply != nullptr && (size + reply->used) > reply->size) + flushData(); + + if (reply != nullptr && reply->size < size) { + serverAssert(reply->used == 0); // flush should have happened + zfree(reply); + reply = nullptr; + } + + if (reply == nullptr) { + reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2))); + reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); + reply->used = 0; + } + + serverAssert((reply->size - reply->used) >= size); + memcpy(reply->buf() + reply->used, data, size); + reply->used += size; + } + + void addLongWithPrefix(long val, char prefix) { + char buf[128]; + int len; + + buf[0] = prefix; + len = ll2string(buf+1,sizeof(buf)-1,val); + buf[len+1] = '\r'; + buf[len+2] = '\n'; + addData(buf, len+3); + } + + void addArrayLen(int len) { + addLongWithPrefix(len, '*'); + } + + void addLong(long val) { + addLongWithPrefix(val, ':'); + } + + void addString(const char *s, unsigned long len) { + 
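+        // Emits a RESP-style bulk string: a '$'-prefixed length header, the raw payload bytes,
+        // and a trailing CRLF.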
addLongWithPrefix(len, '$'); + addData(s, len); + addData("\r\n", 2); + } + + size_t cbWritten() const { return writtenBytesTracker; } + + void end() { + flushData(); + for (auto replica : replicas) { + // Return to original settings + //if (!g_pserver->repl_disable_tcp_nodelay) + // connEnableTcpNoDelay(replica->conn); + } + } + + void putSlavesOnline() { + for (auto replica : replicas) { + replica->replstate = SLAVE_STATE_FASTSYNC_DONE; + replica->repl_put_online_on_ack = 1; + } + } +}; + +int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { + // TODO: This needs to be on a background thread + int retval = C_OK; + serverAssert(GlobalLocksAcquired()); + serverLog(LL_NOTICE, "Starting storage provider fast full sync with target: %s", "disk"); + + std::shared_ptr spreplBuf = std::make_shared(); + listNode *ln; + listIter li; + client *replica = nullptr; + listRewind(g_pserver->slaves, &li); + while (replica == nullptr && (ln = listNext(&li))) { + client *replicaCur = (client*)listNodeValue(ln); + if ((replicaCur->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) { + replica = replicaCur; + spreplBuf->addReplica(replica); + replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX; + } + } + serverAssert(replica != nullptr); + + g_pserver->asyncworkqueue->AddWorkFunction([repl_stream_db = rsi->repl_stream_db, spreplBuf = std::move(spreplBuf)]{ + int retval = C_OK; + auto timeStart = ustime(); + auto lastLogTime = timeStart; + size_t cbData = 0; + size_t cbLastUpdate = 0; + auto &replBuf = *spreplBuf; + + replBuf.addArrayLen(2); // Two sections: Metadata and databases + + // MetaData + replBuf.addArrayLen(1); + replBuf.addArrayLen(2); + replBuf.addString("repl-stream-db", 14); + replBuf.addLong(repl_stream_db); + + // Databases + replBuf.addArrayLen(cserver.dbnum); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + std::unique_ptr spsnapshot = g_pserver->db[idb]->CloneStorageCache(); + size_t snapshotDeclaredCount = spsnapshot->count(); + replBuf.addArrayLen(snapshotDeclaredCount); + size_t count = 0; + bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, timeStart, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{ + replBuf.addArrayLen(2); + + replBuf.addString(rgchKey, cchKey); + replBuf.addString((const char *)rgchVal, cchVal); + ++count; + if ((count % 8092) == 0) { + auto curTime = ustime(); + if ((curTime - lastLogTime) > 60000000) { + auto usec = curTime - lastLogTime; + serverLog(LL_NOTICE, "Replication status: Transferred %zuMB (%.2fGbit/s)", replBuf.cbWritten()/1024/1024, ((replBuf.cbWritten()-cbLastUpdate)*8.0)/(usec/1000000.0)/1000000000.0); + cbLastUpdate = replBuf.cbWritten(); + lastLogTime = ustime(); + } + } + cbData += cchKey + cchVal; + return replBuf.isActive(); + }); + + if (!result) { + retval = C_ERR; + break; + } + serverAssert(count == snapshotDeclaredCount); + } + + replBuf.end(); + + if (!replBuf.isActive()) { + retval = C_ERR; + } + + serverLog(LL_NOTICE, "Snapshot replication done: %s", (retval == C_OK) ? "success" : "failed"); + auto usec = ustime() - timeStart; + serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. 
(%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0); + if (retval == C_OK) { + aeAcquireLock(); + replicationScriptCacheFlush(); + replBuf.putSlavesOnline(); + aeReleaseLock(); + } + }); + + return retval; +} + /* Start a BGSAVE for replication goals, which is, selecting the disk or * socket target depending on the configuration, and making sure that * the script cache is flushed before to start. @@ -962,7 +1181,9 @@ int startBgsaveForReplication(int mincapa) { /* Only do rdbSave* when rsiptr is not NULL, * otherwise replica will miss repl-stream-db. */ if (rsiptr) { - if (socket_target) + if (mincapa & SLAVE_CAPA_ROCKSDB_SNAPSHOT && g_pserver->m_pstorageFactory) + retval = rdbSaveSnapshotForReplication(rsiptr); + else if (socket_target) retval = rdbSaveToSlavesSockets(rsiptr); else retval = rdbSaveBackground(rsiptr); @@ -1334,6 +1555,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_PSYNC2; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; + else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "rocksdb-snapshot-load")) + c->slave_capa |= SLAVE_CAPA_ROCKSDB_SNAPSHOT; fCapaCommand = true; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { @@ -1358,7 +1581,7 @@ void replconfCommand(client *c) { * quick check first (instead of waiting for the next ACK. */ if (g_pserver->child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END) checkChildrenDone(); - if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) + if (c->repl_put_online_on_ack && (c->replstate == SLAVE_STATE_ONLINE || c->replstate == SLAVE_STATE_FASTSYNC_DONE)) putSlaveOnline(c); /* Note: this command does not reply anything! */ return; @@ -1400,6 +1623,8 @@ void replconfCommand(client *c) { sds reply = sdsnew("+OK"); if (g_pserver->fActiveReplica) reply = sdscat(reply, " active-replica"); + if (g_pserver->m_pstorageFactory && (c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && !g_pserver->fActiveReplica) + reply = sdscat(reply, " rocksdb-snapshot-save"); reply = sdscat(reply, "\r\n"); addReplySds(c, reply); } else { @@ -2074,25 +2299,163 @@ void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) { discardDbBackup(buckup, flag, replicationEmptyDbCallback); } +size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { + size_t cchNumeral = 0; + + while ((cch > (cchNumeral+1)) && isdigit(rgch[1 + cchNumeral])) ++cchNumeral; + + if (cch < (cchNumeral+1+2)) { // +2 is for the \r\n we expect + throw true; // continuable + } + + if (rgch[cchNumeral+1] != '\r' || rgch[cchNumeral+2] != '\n') { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch); + throw false; + } + + if (!string2ll(rgch+1, cchNumeral, pvalue) || *pvalue < 0) { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch); + throw false; + } + + return cchNumeral + 3; +} + +bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) { + int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; + serverAssert(GlobalLocksAcquired()); + serverAssert(mi->master == nullptr); + bool fFinished = false; + + if (mi->bulkreadBuffer == nullptr) { + mi->bulkreadBuffer = sdsempty(); + mi->parseState = new SnapshotPayloadParseState(); + if (!fUpdate) { + int empty_db_flags = g_pserver->repl_slave_lazy_flush ? 
EMPTYDB_ASYNC : + EMPTYDB_NO_FLAGS; + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); + } + } + + for (int iter = 0; iter < 10; ++iter) { + if (mi->parseState->shouldThrottle()) + return false; + + auto readlen = PROTO_IOBUF_LEN; + auto qblen = sdslen(mi->bulkreadBuffer); + mi->bulkreadBuffer = sdsMakeRoomFor(mi->bulkreadBuffer, readlen); + + auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen); + if (nread <= 0) { + if (connGetState(conn) != CONN_STATE_CONNECTED) + cancelReplicationHandshake(mi, true); + return false; + } + mi->repl_transfer_lastio = g_pserver->unixtime; + sdsIncrLen(mi->bulkreadBuffer,nread); + + size_t offset = 0; + + try { + if (sdslen(mi->bulkreadBuffer) > cserver.client_max_querybuf_len) { + throw "Full Sync Streaming Buffer Exceeded (increase client_max_querybuf_len)"; + } + + while (sdslen(mi->bulkreadBuffer) > offset) { + // Pop completed items + mi->parseState->trimState(); + + if (mi->bulkreadBuffer[offset] == '*') { + // Starting an array + long long arraySize; + + // Lets read the array length + offset += parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &arraySize); + + mi->parseState->pushArray(arraySize); + } else if (mi->bulkreadBuffer[offset] == '$') { + // Loading in a string + long long payloadsize = 0; + + // Lets read the string length + size_t offsetCount = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &payloadsize); + + // OK we know how long the string is, now lets make sure the payload is here. + if (sdslen(mi->bulkreadBuffer) < (offset + offsetCount + payloadsize + 2)) { + goto LContinue; // wait for more data (note: we could throw true here, but throw is way more expensive) + } + + mi->parseState->pushValue(mi->bulkreadBuffer + offset + offsetCount, payloadsize); + + // On to the next one + offset += offsetCount + payloadsize + 2; + } else if (mi->bulkreadBuffer[offset] == ':') { + // Numeral + long long value; + + size_t offsetValue = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &value); + + mi->parseState->pushValue(value); + offset += offsetValue; + } else { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", mi->bulkreadBuffer+offset); + cancelReplicationHandshake(mi, true); + return false; + } + } + + sdsrange(mi->bulkreadBuffer, offset, -1); + offset = 0; + + // Cleanup the remaining stack + mi->parseState->trimState(); + + if (mi->parseState->depth() != 0) + return false; + + rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db"); + fFinished = true; + break; // We're done!!! 
+ } + catch (const char *sz) { + serverLog(LL_WARNING, "%s", sz); + cancelReplicationHandshake(mi, true); + return false; + } catch (bool fContinue) { + if (!fContinue) { + cancelReplicationHandshake(mi, true); + return false; + } + } + LContinue: + sdsrange(mi->bulkreadBuffer, offset, -1); + } + + if (!fFinished) + return false; + + serverLog(LL_NOTICE, "Fast sync complete"); + sdsfree(mi->bulkreadBuffer); + mi->bulkreadBuffer = nullptr; + delete mi->parseState; + mi->parseState = nullptr; + return true; +} + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ -void readSyncBulkPayload(connection *conn) { +bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, int &usemark) { char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); const dbBackup *diskless_load_backup = NULL; - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rsi.fForceSetKey = !!g_pserver->fActiveReplica; int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; off_t left; // Should we update our database, or create from scratch? int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; - redisMaster *mi = (redisMaster*)connGetPrivateData(conn); - if (mi == nullptr) { - // We're about to be free'd so bail out - return; - } serverAssert(GlobalLocksAcquired()); serverAssert(mi->master == nullptr); @@ -2101,7 +2464,6 @@ void readSyncBulkPayload(connection *conn) { * from the server: when they match, we reached the end of the transfer. */ static char eofmark[CONFIG_RUN_ID_SIZE]; static char lastbytes[CONFIG_RUN_ID_SIZE]; - static int usemark = 0; /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ @@ -2123,7 +2485,7 @@ void readSyncBulkPayload(connection *conn) { * the connection live. So we refresh our last interaction * timestamp. */ mi->repl_transfer_lastio = g_pserver->unixtime; - return; + return false; } else if (buf[0] != '$') { serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; @@ -2157,7 +2519,7 @@ void readSyncBulkPayload(connection *conn) { (long long) mi->repl_transfer_size, use_diskless_load? "to parser":"to disk"); } - return; + return false; } if (!use_diskless_load) { @@ -2174,12 +2536,12 @@ void readSyncBulkPayload(connection *conn) { if (nread <= 0) { if (connGetState(conn) == CONN_STATE_CONNECTED) { /* equivalent to EAGAIN */ - return; + return false; } serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); cancelReplicationHandshake(mi, true); - return; + return false; } g_pserver->stat_net_input_bytes += nread; @@ -2248,7 +2610,7 @@ void readSyncBulkPayload(connection *conn) { /* If the transfer is yet not complete, we need to read more, so * return ASAP and wait for the handler to be called again. */ - if (!eof_reached) return; + if (!eof_reached) return false; } /* We reach this point in one of the following cases: @@ -2320,7 +2682,7 @@ void readSyncBulkPayload(connection *conn) { /* Note that there's no point in restarting the AOF on SYNC * failure, it'll be restarted when sync succeeds or the replica * gets promoted. */ - return; + return false; } /* RDB loading succeeded if we reach this point. 
*/ @@ -2340,7 +2702,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_WARNING,"Replication stream EOF marker is broken"); cancelReplicationHandshake(mi,true); rioFreeConn(&rdb, NULL); - return; + return false; } } @@ -2372,7 +2734,7 @@ void readSyncBulkPayload(connection *conn) { "MASTER <-> REPLICA synchronization: %s", strerror(errno)); cancelReplicationHandshake(mi,true); - return; + return false; } /* Rename rdb like renaming rewrite aof asynchronously. */ @@ -2385,7 +2747,7 @@ void readSyncBulkPayload(connection *conn) { g_pserver->rdb_filename, strerror(errno)); cancelReplicationHandshake(mi,true); if (old_rdb_fd != -1) close(old_rdb_fd); - return; + return false; } rdb_filename = g_pserver->rdb_filename; @@ -2415,7 +2777,7 @@ void readSyncBulkPayload(connection *conn) { } /* Note that there's no point in restarting the AOF on sync failure, it'll be restarted when sync succeeds or replica promoted. */ - return; + return false; } /* Cleanup. */ @@ -2433,6 +2795,32 @@ void readSyncBulkPayload(connection *conn) { mi->repl_transfer_tmpfile = NULL; } + return true; +error: + cancelReplicationHandshake(mi,true); + return false; +} + +void readSyncBulkPayload(connection *conn) { + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + redisMaster *mi = (redisMaster*)connGetPrivateData(conn); + static int usemark = 0; + if (mi == nullptr) { + // We're about to be free'd so bail out + return; + } + + if (mi->isRocksdbSnapshotRepl) { + if (!readSnapshotBulkPayload(conn, mi, rsi)) + return; + } else { + if (!readSyncBulkPayloadRdb(conn, mi, rsi, usemark)) + return; + } + + // Should we update our database, or create from scratch? + int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; + /* Final setup of the connected slave <- master link */ replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db); mi->repl_transfer_s = nullptr; @@ -2475,17 +2863,13 @@ void readSyncBulkPayload(connection *conn) { } /* Send the initial ACK immediately to put this replica in online state. */ - if (usemark) replicationSendAck(mi); + if (usemark || mi->isRocksdbSnapshotRepl) replicationSendAck(mi); /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ if (g_pserver->aof_enabled) restartAOFAfterSYNC(); return; - -error: - cancelReplicationHandshake(mi,true); - return; } char *receiveSynchronousResponse(redisMaster *mi, connection *conn) { @@ -2808,12 +3192,15 @@ void parseMasterCapa(redisMaster *mi, sds strcapa) char *pchEnd = szStart; mi->isActive = false; + mi->isRocksdbSnapshotRepl = false; for (;;) { if (*pchEnd == ' ' || *pchEnd == '\0') { // Parse the word if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) { mi->isActive = true; + } else if (strncmp(szStart, "rocksdb-snapshot-save", pchEnd - szStart) == 0) { + mi->isRocksdbSnapshotRepl = true; } szStart = pchEnd + 1; } @@ -2945,8 +3332,20 @@ void syncWithMaster(connection *conn) { * PSYNC2: supports PSYNC v2, so understands +CONTINUE . * * The master will ignore capabilities it does not understand. 
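     * When a storage provider is configured and active replication is disabled, the replica
     * below also advertises the rocksdb-snapshot-load capability so the master can choose the
     * storage snapshot fast sync path.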
*/ - err = sendCommand(conn,"REPLCONF", - "capa","eof","capa","psync2","capa","activeExpire",NULL); + + + std::vector veccapabilities = { + "REPLCONF", + "capa","eof", + "capa","psync2", + "capa","activeExpire", + }; + if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica) { + veccapabilities.push_back("capa"); + veccapabilities.push_back("rocksdb-snapshot-load"); + } + + err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr); if (err) goto write_error; /* Send UUID */ @@ -3290,6 +3689,15 @@ void replicationAbortSyncTransfer(redisMaster *mi) { * * Otherwise zero is returned and no operation is performed at all. */ int cancelReplicationHandshake(redisMaster *mi, int reconnect) { + if (mi->bulkreadBuffer != nullptr) { + sdsfree(mi->bulkreadBuffer); + mi->bulkreadBuffer = nullptr; + } + if (mi->parseState) { + delete mi->parseState; + mi->parseState = nullptr; + } + if (mi->repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(mi); mi->repl_state = REPL_STATE_CONNECT; @@ -4283,6 +4691,13 @@ void replicationCron(void) { client *replica = (client*)ln->value; std::unique_lock ul(replica->lock); + if (replica->replstate == SLAVE_STATE_FASTSYNC_DONE && !clientHasPendingReplies(replica)) { + serverLog(LL_WARNING, "Putting replica online"); + replica->postFunction([](client *c){ + putSlaveOnline(c); + }); + } + if (replica->replstate == SLAVE_STATE_ONLINE) { if (replica->flags & CLIENT_PRE_PSYNC) continue; diff --git a/src/sds.h b/src/sds.h index d16906f30..0d77f2772 100644 --- a/src/sds.h +++ b/src/sds.h @@ -369,6 +369,8 @@ public: bool operator==(const char *other) const { + if (other == nullptr || m_str == nullptr) + return other == m_str; return sdscmp(m_str, other) == 0; } @@ -396,8 +398,11 @@ public: {} sdsstring(const sdsstring &other) - : sdsview(sdsdup(other.m_str)) - {} + : sdsview() + { + if (other.m_str != nullptr) + m_str = sdsdup(other.m_str); + } sdsstring(const char *rgch, size_t cch) : sdsview(sdsnewlen(rgch, cch)) diff --git a/src/server.h b/src/server.h index 5a067090b..656560868 100644 --- a/src/server.h +++ b/src/server.h @@ -607,12 +607,15 @@ typedef enum { #define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */ #define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to replica. */ #define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */ +#define SLAVE_STATE_FASTSYNC_TX 10 +#define SLAVE_STATE_FASTSYNC_DONE 11 /* Slave capabilities. */ #define SLAVE_CAPA_NONE 0 #define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ #define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ #define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? 
(Don't send delete) */ +#define SLAVE_CAPA_ROCKSDB_SNAPSHOT (1<<3) /* Synchronous read timeout - replica side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 @@ -1102,7 +1105,12 @@ public: size_t slots() const { return dictSlots(m_pdict); } size_t size(bool fCachedOnly = false) const; - void expand(uint64_t slots) { dictExpand(m_pdict, slots); } + void expand(uint64_t slots) { + if (m_spstorage) + m_spstorage->expand(slots); + else + dictExpand(m_pdict, slots); + } void trackkey(robj_roptr o, bool fUpdate) { @@ -1193,6 +1201,9 @@ public: bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } + std::unique_ptr CloneStorageCache() { return std::unique_ptr(m_spstorage->clone()); } + void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem); + dict_iter find_cached_threadsafe(const char *key) const; protected: @@ -1352,6 +1363,8 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::prepOverwriteForSnapshot; using redisDbPersistentData::FRehashing; using redisDbPersistentData::FTrackingChanges; + using redisDbPersistentData::CloneStorageCache; + using redisDbPersistentData::bulkStorageInsert; public: expireset::setiter expireitr; @@ -1673,7 +1686,7 @@ struct client { size_t argv_len_sumActive = 0; bool FPendingReplicaWrite() const { - return repl_curr_off != repl_end_off; + return repl_curr_off != repl_end_off && replstate == SLAVE_STATE_ONLINE; } // post a function from a non-client thread to run on its client thread @@ -2058,6 +2071,7 @@ struct redisMaster { long long master_initial_offset; /* Master PSYNC offset. */ bool isActive = false; + bool isRocksdbSnapshotRepl = false; int repl_state; /* Replication status if the instance is a replica */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. 
*/ @@ -2068,6 +2082,9 @@ struct redisMaster { time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ time_t repl_down_since; /* Unix time at which link with master went down */ + class SnapshotPayloadParseState *parseState; + sds bulkreadBuffer = nullptr; + unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ /* After we've connected with our master use the UUID in g_pserver->master */ uint64_t mvccLastSync; @@ -2829,7 +2846,7 @@ void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); void addReplyLongLong(client *c, long long ll); #ifdef __cplusplus -void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix); +void addReplyLongLongWithPrefix(client *c, long long ll, char prefix); #endif void addReplyArrayLen(client *c, long length); void addReplyMapLen(client *c, long length); diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h new file mode 100644 index 000000000..beb71d87e --- /dev/null +++ b/src/storage/rocksdbfactor_internal.h @@ -0,0 +1,28 @@ +#pragma once +#include "rocksdb.h" + +class RocksDBStorageFactory : public IStorageFactory +{ + std::shared_ptr m_spdb; // Note: This must be first so it is deleted last + std::vector> m_vecspcols; + std::shared_ptr m_pfilemanager; + std::string m_path; + bool m_fCreatedTempFolder = false; + +public: + RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig); + ~RocksDBStorageFactory(); + + virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; + virtual const char *name() const override; + + virtual size_t totalDiskspaceUsed() const override; + + virtual bool FSlow() const override { return true; } + + virtual size_t filedsRequired() const override; + std::string getTempFolder(); + +private: + void setVersion(rocksdb::ColumnFamilyHandle*); +}; \ No newline at end of file diff --git a/tests/integration/replication-fast.tcl b/tests/integration/replication-fast.tcl new file mode 100644 index 000000000..f190524c1 --- /dev/null +++ b/tests/integration/replication-fast.tcl @@ -0,0 +1,34 @@ +proc prepare_value {size} { + set _v "c" + for {set i 1} {$i < $size} {incr i} { + append _v 0 + } + return $_v +} + +start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master}}} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + start_server {tags {} overrides {storage-provider {flash ./rocks.db.replica}}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + test "fast replication with large value" { + set _v [prepare_value [expr 64*1024*1024]] + # $master set key $_v + + $slave replicaof $master_host $master_port + wait_for_condition 50 300 { + [lindex [$slave role] 0] eq {slave} && + [string match {*master_link_status:up*} [$slave info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + assert_equal [$slave debug digest] [$master debug digest] + $slave replicaof no one + } + } +} diff --git a/tests/support/server.tcl b/tests/support/server.tcl index c06dd0561..8b2aab137 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -1,4 +1,5 @@ set ::global_overrides {} +set ::global_storage_provider {} set ::tags {} set ::valgrind_errors {} @@ -392,6 +393,9 @@ proc start_server {options {code undefined}} { foreach {directive arguments} [concat $::global_overrides $overrides] 
{
        dict set config $directive $arguments
    }
+    foreach {directive argument1 argument2} $::global_storage_provider {
+        dict set config $directive $argument1 $argument2
+    }
 
     # remove directives that are marked to be omitted
     foreach directive $omit {
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 66f039511..f9ca7107b 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -59,6 +59,7 @@ set ::all_tests {
     integration/failover
     integration/redis-cli
     integration/redis-benchmark
+    integration/replication-fast
     unit/pubsub
     unit/slowlog
     unit/scripting
@@ -692,6 +693,17 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
     } elseif {$opt eq {--help}} {
         print_help_screen
         exit 0
+    } elseif {$opt eq {--flash}} {
+        lappend ::global_storage_provider storage-provider
+        lappend ::global_storage_provider flash
+        lappend ::global_storage_provider ./rocks.db
+        set ::all_tests {
+            integration/replication
+            integration/replication-2
+            integration/replication-3
+            integration/replication-4
+            integration/replication-psync
+        }
     } else {
         puts "Wrong argument: $opt"
         exit 1
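
Note for reviewers (illustration only, not part of the patch): the fast sync payload written by
rdbSaveSnapshotForReplication and parsed by SnapshotPayloadParseState uses the RESP-like framing
of the replicationBuffer helpers above (addArrayLen emits '*', addString emits '$', addLong
emits ':'). Assuming 16 configured databases and a single key "foo" stored in database 0, the
stream would look roughly like:

    *2                              top level: metadata array, then databases array
      *1                            metadata: one field/value pair
        *2  $14 repl-stream-db  :0
      *16                           one array per configured database
        *1                          database 0 holds one key/value pair
          *2  $3 foo  $<n> <value>  the value is the serialized object blob, not the raw string
        *0                          databases 1..15 are empty

Every token is CRLF-terminated on the wire. On the replica, parsed key/value pairs are batched
and flushed to bulkStorageInsert on the async work queue once roughly 64MB (queuedBatchLimit)
has been accumulated.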