diff --git a/src/IStorage.h b/src/IStorage.h index d1f316022..ad956beb6 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -2,6 +2,8 @@ #include #include "sds.h" +#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f" + class IStorageFactory { public: @@ -9,6 +11,7 @@ public: virtual ~IStorageFactory() {} virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0; + virtual class IStorage *createMetadataDb() = 0; virtual const char *name() const = 0; virtual size_t totalDiskspaceUsed() const = 0; virtual bool FSlow() const = 0; @@ -30,10 +33,10 @@ public: virtual bool enumerate(callback fn) const = 0; virtual size_t count() const = 0; - virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) { + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { beginWriteBatch(); for (size_t ielem = 0; ielem < celem; ++ielem) { - insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false); + insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false); } endWriteBatch(); } diff --git a/src/Makefile b/src/Makefile index 13f5209f7..d275cfea1 100644 --- a/src/Makefile +++ b/src/Makefile @@ -354,6 +354,7 @@ endif REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX) REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o 
rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) +KEYDB_SERVER_OBJ=SnapshotPayloadParseState.o REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX) @@ -410,7 +411,7 @@ endif @touch $@ # keydb-server -$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) +$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(KEYDB_SERVER_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS) # keydb-sentinel @@ -433,7 +434,7 @@ $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ) $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/hdr_histogram.o $(FINAL_LIBS) -DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) +DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(KEYDB_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d) -include $(DEP) # Because the jemalloc.h header is generated as a part of the jemalloc build, diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp new file mode 100644 index 000000000..8ba4b109b --- /dev/null +++ b/src/SnapshotPayloadParseState.cpp @@ -0,0 +1,344 @@ +#include "server.h" +#include "SnapshotPayloadParseState.h" +#include + +static const size_t SLAB_SIZE = 2*1024*1024; // Note 64 MB because we try to give 64MB to a block +class SlabAllocator +{ + class Slab { + char *m_pv = nullptr; + size_t m_cb = 0; + size_t m_cbAllocated = 0; 
+ + public: + Slab(size_t cbAllocate) { + m_pv = (char*)zmalloc(cbAllocate); + m_cb = cbAllocate; + } + + ~Slab() { + zfree(m_pv); + } + + Slab(Slab &&other) { + m_pv = other.m_pv; + m_cb = other.m_cb; + m_cbAllocated = other.m_cbAllocated; + other.m_pv = nullptr; + other.m_cb = 0; + other.m_cbAllocated = 0; + } + + bool canAllocate(size_t cb) const { + return (m_cbAllocated + cb) <= m_cb; + } + + void *allocate(size_t cb) { + if ((m_cbAllocated + cb) > m_cb) + return nullptr; + void *pvret = m_pv + m_cbAllocated; + m_cbAllocated += cb; + return pvret; + } + }; + std::vector m_vecslabs; +public: + + void *allocate(size_t cb) { + if (m_vecslabs.empty() || !m_vecslabs.back().canAllocate(cb)) { + m_vecslabs.emplace_back(std::max(cb, SLAB_SIZE)); + } + return m_vecslabs.back().allocate(cb); + } +}; + +static uint64_t dictCStringHash(const void *key) { + return dictGenHashFunction((unsigned char*)key, strlen((char*)key)); +} + + +static void dictKeyDestructor(void *privdata, void *key) +{ + DICT_NOTUSED(privdata); + sdsfree((sds)key); +} + +static int dictCStringCompare(void *, const void *key1, const void *key2) +{ + int l1,l2; + + l1 = strlen((sds)key1); + l2 = strlen((sds)key2); + if (l1 != l2) return 0; + return memcmp(key1, key2, l1) == 0; +} + +dictType metadataLongLongDictType = { + dictCStringHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictCStringCompare, /* key compare */ + dictKeyDestructor, /* key destructor */ + nullptr, /* val destructor */ + nullptr, /* allow to expand */ + nullptr /* async free destructor */ +}; + +dictType metadataDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictSdsDestructor, /* val destructor */ + nullptr, /* allow to expand */ + nullptr /* async free destructor */ +}; + + +SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStage() { + if 
(stackParse.empty()) + return ParseStageName::Global; + + switch (stackParse.back().name) + { + case ParseStageName::None: + return ParseStageName::Global; + + case ParseStageName::Global: + if (stackParse.back().arraycur == 0) + return ParseStageName::MetaData; + else if (stackParse.back().arraycur == 1) + return ParseStageName::Databases; + break; + + case ParseStageName::MetaData: + return ParseStageName::KeyValuePair; + + case ParseStageName::Databases: + return ParseStageName::Dataset; + + case ParseStageName::Dataset: + return ParseStageName::KeyValuePair; + + default: + break; + } + throw "Bad protocol: corrupt state"; +} + +void SnapshotPayloadParseState::flushQueuedKeys() { + if (vecqueuedKeys.empty()) + return; + serverAssert(current_database >= 0); + + // TODO: We can't finish parse until all the work functions finish + int idb = current_database; + serverAssert(vecqueuedKeys.size() == vecqueuedVals.size()); + auto sizePrev = vecqueuedKeys.size(); + ++insertsInFlight; + auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous + if (current_database < cserver.dbnum) { + g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { + g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); + --insertsInFlightTmp; + delete pallocator; + }); + } else { + // else drop the data + vecqueuedKeys.clear(); + vecqueuedKeysCb.clear(); + vecqueuedVals.clear(); + vecqueuedValsCb.clear(); + // Note: m_spallocator will get free'd when overwritten below + } + m_spallocator = std::make_unique(); + cbQueued = 0; + vecqueuedKeys.reserve(sizePrev); + 
vecqueuedKeysCb.reserve(sizePrev); + vecqueuedVals.reserve(sizePrev); + vecqueuedValsCb.reserve(sizePrev); + serverAssert(vecqueuedKeys.empty()); + serverAssert(vecqueuedVals.empty()); +} + + +SnapshotPayloadParseState::SnapshotPayloadParseState() { + // This is to represent the first array the client is intended to send us + ParseStage stage; + stage.name = ParseStageName::None; + stage.arraylen = 1; + stackParse.push_back(stage); + + dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr); + dictMetaData = dictCreate(&metadataDictType, nullptr); + insertsInFlight = 0; + m_spallocator = std::make_unique(); +} + +SnapshotPayloadParseState::~SnapshotPayloadParseState() { + dictRelease(dictLongLongMetaData); + dictRelease(dictMetaData); +} + +const char *SnapshotPayloadParseState::getStateDebugName(ParseStage stage) { + switch (stage.name) { + case ParseStageName::None: + return "None"; + + case ParseStageName::Global: + return "Global"; + + case ParseStageName::MetaData: + return "MetaData"; + + case ParseStageName::Databases: + return "Databases"; + + case ParseStageName::KeyValuePair: + return "KeyValuePair"; + + case ParseStageName::Dataset: + return "Dataset"; + + default: + return "Unknown"; + } +} + +size_t SnapshotPayloadParseState::depth() const { return stackParse.size(); } + +void SnapshotPayloadParseState::trimState() { + while (!stackParse.empty() && (stackParse.back().arraycur == stackParse.back().arraylen)) + stackParse.pop_back(); + + if (stackParse.empty()) { + flushQueuedKeys(); + while (insertsInFlight > 0) { + // TODO: ProcessEventsWhileBlocked + aeReleaseLock(); + aeAcquireLock(); + } + } +} + +void SnapshotPayloadParseState::pushArray(long long size) { + if (stackParse.empty()) + throw "Bad Protocol: unexpected trailing data"; + + if (size == 0) { + stackParse.back().arraycur++; + return; + } + + if (stackParse.back().name == ParseStageName::Databases) { + flushQueuedKeys(); + current_database = stackParse.back().arraycur; + } + + 
ParseStage stage;
+    stage.name = getNextStage();
+    stage.arraylen = size;
+
+    // current_database is taken from the sender's array index. flushQueuedKeys() already
+    // drops data for indexes >= cserver.dbnum, so only pre-size databases that exist
+    // instead of indexing past the end of g_pserver->db.
+    if (stage.name == ParseStageName::Dataset && current_database >= 0 && current_database < cserver.dbnum) {
+        g_pserver->db[current_database]->expand(stage.arraylen);
+    }
+
+    // Note: This affects getNextStage so ensure its after
+    stackParse.back().arraycur++;
+    stackParse.push_back(stage);
+}
+
+/* Consume one bulk string from the snapshot payload.
+ *
+ * The bytes are copied into the current slab allocator and stored as the next element of
+ * the pair on top of the parse stack. Once both elements of a KeyValuePair are present:
+ *   - under MetaData: a "lua" field is loaded as a script, anything else is stored in
+ *     dictMetaData;
+ *   - under Dataset: the key/value pointers are queued and flushed once cbQueued reaches
+ *     queuedBatchLimit.
+ *
+ * rgch/cch: the raw bytes and their length (not NUL terminated).
+ * Throws a string literal describing the problem on malformed input. */
+void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) {
+    if (stackParse.empty())
+        throw "Bad Protocol: unexpected trailing bulk string";
+
+    if (stackParse.back().arraycur >= static_cast(stackParse.back().arrvalues.size()))
+        throw "Bad protocol: Unexpected value";
+
+    auto &stage = stackParse.back();
+    stage.arrvalues[stackParse.back().arraycur].first = (char*)m_spallocator->allocate(cch);
+    stage.arrvalues[stackParse.back().arraycur].second = cch;
+    memcpy(stage.arrvalues[stackParse.back().arraycur].first, rgch, cch);
+    stage.arraycur++;
+    switch (stage.name) {
+        case ParseStageName::KeyValuePair:
+            if (stackParse.size() < 2)
+                throw "Bad Protocol: unexpected bulk string";
+            if (stackParse[stackParse.size()-2].name == ParseStageName::MetaData) {
+                if (stage.arraycur == 2) {
+                    // We loaded both pairs
+                    if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr)
+                        throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small
+
+                    // The slab copy holds exactly `second` bytes and no NUL terminator, so the
+                    // field name must be matched by length rather than with strcasecmp().
+                    if (stage.arrvalues[0].second == 3 && !strncasecmp(stage.arrvalues[0].first, "lua", 3)) {
+                        /* Load the script back in memory. */
+                        robj *auxval = createStringObject(stage.arrvalues[1].first, stage.arrvalues[1].second);
+                        bool fLoaded = (luaCreateFunction(NULL,g_pserver->lua,auxval) != NULL);
+                        // Release the reference createStringObject() gave us on both paths.
+                        // NOTE(review): presumably luaCreateFunction retains its own reference
+                        // when it caches the body - confirm against scripting.cpp.
+                        decrRefCount(auxval);
+                        if (!fLoaded) {
+                            throw "Can't load Lua script";
+                        }
+                    } else {
+                        sds sdsField = sdsnewlen(stage.arrvalues[0].first, stage.arrvalues[0].second);
+                        sds sdsFieldVal = sdsnewlen(stage.arrvalues[1].first, stage.arrvalues[1].second);
+                        if (dictAdd(dictMetaData, sdsField, sdsFieldVal) != DICT_OK) {
+                            // A failed insert leaves both strings with us; free them and reject
+                            // the duplicate the same way the integer overload does.
+                            sdsfree(sdsField);
+                            sdsfree(sdsFieldVal);
+                            throw "Bad Protocol: metadata field sent twice";
+                        }
+                    }
+                }
+            } else if (stackParse[stackParse.size()-2].name == ParseStageName::Dataset) {
+                if (stage.arraycur == 2) {
+                    // We loaded both pairs
+                    if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr)
+                        throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small
+                    vecqueuedKeys.push_back(stage.arrvalues[0].first);
+                    vecqueuedKeysCb.push_back(stage.arrvalues[0].second);
+                    vecqueuedVals.push_back(stage.arrvalues[1].first);
+                    vecqueuedValsCb.push_back(stage.arrvalues[1].second);
+                    stage.arrvalues[0].first = nullptr;
+                    stage.arrvalues[1].first = nullptr;
+                    cbQueued += vecqueuedKeysCb.back();
+                    cbQueued += vecqueuedValsCb.back();
+                    if (cbQueued >= queuedBatchLimit)
+                        flushQueuedKeys();
+                }
+            } else {
+                throw "Bad Protocol: unexpected bulk string";
+            }
+            break;
+
+        default:
+            throw "Bad Protocol: unexpected bulk string out of KV pair";
+    }
+}
+
+/* Consume one integer from the snapshot payload.
+ *
+ * The integer must be the second element of a pair whose first element (the field name)
+ * arrived as a bulk string; the pair is recorded in dictLongLongMetaData.
+ * Throws a string literal if the integer is out of place or the field was already sent. */
+void SnapshotPayloadParseState::pushValue(long long value) {
+    if (stackParse.empty())
+        throw "Bad Protocol: unexpected integer value";
+
+    stackParse.back().arraycur++;
+
+    if (stackParse.back().arraycur != 2 || stackParse.back().arrvalues[0].first == nullptr)
+        throw "Bad Protocol: unexpected integer value";
+
+    sds sdsField = sdsnewlen(stackParse.back().arrvalues[0].first, stackParse.back().arrvalues[0].second);
+    dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsField, nullptr);
+    if (de == nullptr) {
+        // No entry was created, so the key string is still ours to free.
+        sdsfree(sdsField);
+        throw "Bad Protocol: metadata field sent twice";
+    }
+    de->v.s64 = value;
+}
+
+long long SnapshotPayloadParseState::getMetaDataLongLong(const char *szField) const {
+    dictEntry *de = dictFind(dictLongLongMetaData,
szField); + + if (de == nullptr) { + serverLog(LL_WARNING, "Master did not send field: %s", szField); + throw false; + } + return de->v.s64; +} + +sds SnapshotPayloadParseState::getMetaDataStr(const char *szField) const { + sdsstring str(szField, strlen(szField)); + + dictEntry *de = dictFind(dictMetaData, str.get()); + if (de == nullptr) { + serverLog(LL_WARNING, "Master did not send field: %s", szField); + throw false; + } + return (sds)de->v.val; +} \ No newline at end of file diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h new file mode 100644 index 000000000..cb1e0420a --- /dev/null +++ b/src/SnapshotPayloadParseState.h @@ -0,0 +1,66 @@ +#pragma once +#include +#include + +class SlabAllocator; + +class SnapshotPayloadParseState { + enum class ParseStageName { + None, + Global, + MetaData, + Databases, + Dataset, + KeyValuePair, + }; + struct ParseStage { + ParseStageName name; + long long arraylen = 0; + long long arraycur = 0; + std::array, 2> arrvalues; + + ParseStage() { + for (auto &pair : arrvalues) { + pair.first = nullptr; + pair.second = 0; + } + } + }; + + std::vector stackParse; + + + std::vector vecqueuedKeys; + std::vector vecqueuedKeysCb; + std::vector vecqueuedVals; + std::vector vecqueuedValsCb; + + + std::atomic insertsInFlight; + std::unique_ptr m_spallocator; + dict *dictLongLongMetaData = nullptr; + dict *dictMetaData = nullptr; + size_t cbQueued = 0; + static const size_t queuedBatchLimit = 64*1024*1024; // 64 MB + int current_database = -1; + + ParseStageName getNextStage(); + void flushQueuedKeys(); + +public: + SnapshotPayloadParseState(); + ~SnapshotPayloadParseState(); + + long long getMetaDataLongLong(const char *field) const; + sds getMetaDataStr(const char *szField) const; + + static const char *getStateDebugName(ParseStage stage); + + size_t depth() const; + + void trimState(); + void pushArray(long long size); + void pushValue(const char *rgch, long long cch); + void pushValue(long long value); + 
bool shouldThrottle() const { return insertsInFlight > (cserver.cthreads*4); }
+};
\ No newline at end of file
diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp
index af2ac12b7..cdc380105 100644
--- a/src/StorageCache.cpp
+++ b/src/StorageCache.cpp
@@ -97,17 +97,47 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
     m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
 }
-void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
+long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing);
+/* Insert celem key/value pairs into the backing storage, recording each key's hash in the
+ * key cache dict (m_pdict) when that cache is enabled.
+ *
+ * rgkeys/rgcbkeys: key pointers and their byte lengths.
+ * rgvals/rgcbvals: value pointers and their byte lengths.
+ *
+ * The dict entries are allocated and hashed before m_lock is taken so that only the
+ * table linking happens under the lock. */
+void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem)
 {
+    std::vector vechashes;
+    if (m_pdict != nullptr) {
+        vechashes.reserve(celem);
+
+        for (size_t ielem = 0; ielem < celem; ++ielem) {
+            dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry));
+            de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]);
+            de->v.u64 = 1;
+            vechashes.push_back(de);
+        }
+    }
+
     std::unique_lock ul(m_lock);
     bulkInsertsInProgress++;
     if (m_pdict != nullptr) {
-        for (size_t ielem = 0; ielem < celem; ++ielem) {
-            cacheKey(rgkeys[ielem]);
+        for (dictEntry *de : vechashes) {
+            if (dictIsRehashing(m_pdict)) dictRehash(m_pdict,1);
+            /* Get the index of the new element, or -1 if
+             * the element already exists. */
+            long index;
+            if ((index = _dictKeyIndex(m_pdict, de->key, (uint64_t)de->key, nullptr)) == -1) {
+                // The hash is already cached: bump the entry that lives in the dict and
+                // discard our pre-built one. The lookup result needs its own name -
+                // re-declaring `de` here would read the new, uninitialized pointer in its
+                // own initializer and then zfree() the live dict entry.
+                dictEntry *deExisting = dictFind(m_pdict, de->key);
+                serverAssert(deExisting != nullptr);
+                deExisting->v.s64++;
+                m_collisionCount++;
+                zfree(de);
+            } else {
+                int iht = dictIsRehashing(m_pdict) ? 1 : 0;
+                de->next = m_pdict->ht[iht].table[index];
+                m_pdict->ht[iht].table[index] = de;
+                m_pdict->ht[iht].used++;
+            }
         }
     }
+    // The entries were built before m_lock was held. If the key cache is gone by now
+    // (NOTE(review): presumably via emergencyFreeCache - confirm), nothing adopted them.
+    if (m_pdict == nullptr) {
+        for (dictEntry *deOrphan : vechashes)
+            zfree(deOrphan);
+    }
     ul.unlock();
-    m_spstorage->bulkInsert(rgkeys, rgvals, celem);
+
+    m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem);
+
     bulkInsertsInProgress--;
 }
@@ -119,6 +149,14 @@ const StorageCache *StorageCache::clone()
     return cacheNew;
 }
+void StorageCache::expand(uint64_t slots)
+{
+    std::unique_lock ul(m_lock);
+    if (m_pdict) {
+        dictExpand(m_pdict, slots);
+    }
+}
+
 void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
 {
     std::unique_lock ul(m_lock);
diff --git a/src/StorageCache.h b/src/StorageCache.h
index 184eb60bd..879b512dc 100644
--- a/src/StorageCache.h
+++ b/src/StorageCache.h
@@ -40,11 +40,12 @@ public:
     void clear();
     void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
-    void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
+    void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem);
     void retrieve(sds key, IStorage::callbackSingle fn) const;
     bool erase(sds key);
     void emergencyFreeCache();
     bool keycacheIsEnabled() const { return m_pdict != nullptr; }
+    void expand(uint64_t slots);
     bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }
diff --git a/src/aof.cpp b/src/aof.cpp
index 326c30ba2..f99a90b2e 100644
--- a/src/aof.cpp
+++ b/src/aof.cpp
@@ -870,6 +870,11 @@ int loadAppendOnlyFile(char *filename) {
     fakeClient = createAOFClient();
     startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
+    for (int idb = 0; idb < cserver.dbnum; ++idb)
+    {
+        g_pserver->db[idb]->trackChanges(true);
+    }
+
     /* Check if this AOF file has an RDB preamble. In that case we need to
      * load the RDB file and later continue loading the AOF tail.
*/ char sig[5]; /* "REDIS" */ @@ -892,11 +897,6 @@ int loadAppendOnlyFile(char *filename) { } } - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - g_pserver->db[idb]->trackChanges(true); - } - /* Read the actual AOF file, in REPL format, command by command. */ while(1) { int argc, j; diff --git a/src/db.cpp b/src/db.cpp index cc6092831..8417a008a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2903,8 +2903,10 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) { if (m_fAllChanged) { - m_spstorage->clear(); - storeDatabase(); + if (dictSize(m_pdict) > 0 || m_spstorage->count() > 0) { // in some cases we may have pre-sized the StorageCache's dict, and we don't want clear to ruin it + m_spstorage->clear(); + storeDatabase(); + } m_fAllChanged = 0; } else @@ -2936,15 +2938,19 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) dictIterator *di = dictGetIterator(dictNew); dictEntry *de; std::vector veckeys; + std::vector veccbkeys; std::vector vecvals; + std::vector veccbvals; while ((de = dictNext(di)) != nullptr) { robj *o = (robj*)dictGetVal(de); sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); veckeys.push_back((sds)dictGetKey(de)); + veccbkeys.push_back(sdslen((sds)dictGetKey(de))); vecvals.push_back(temp); + veccbvals.push_back(sdslen(temp)); } - m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size()); + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size()); for (auto val : vecvals) sdsfree(val); dictReleaseIterator(di); @@ -2953,6 +2959,11 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) }); } +void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem) +{ + m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem); +} + void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) { if 
(m_pdbSnapshotStorageFlush) diff --git a/src/dict.cpp b/src/dict.cpp index e10567c8d..0268c19d2 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -63,7 +63,7 @@ static unsigned int dict_force_resize_ratio = 5; static int _dictExpandIfNeeded(dict *ht); static unsigned long _dictNextPower(unsigned long size); -static long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing); +long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing); static int _dictInit(dict *ht, dictType *type, void *privDataPtr); /* -------------------------- hash functions -------------------------------- */ @@ -1366,7 +1366,7 @@ static unsigned long _dictNextPower(unsigned long size) * * Note that if we are in the process of rehashing the hash table, the * index is always returned in the context of the second (new) hash table. */ -static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) +long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) { unsigned long idx, table; dictEntry *he; diff --git a/src/networking.cpp b/src/networking.cpp index 29c4ac206..4d4eb32c0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -237,7 +237,7 @@ void clientInstallWriteHandler(client *c) { * writes at this stage. */ if (!(c->flags & CLIENT_PENDING_WRITE) && - (c->replstate == REPL_STATE_NONE || + (c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || c->replstate == SLAVE_STATE_FASTSYNC_DONE || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))) { AssertCorrectThread(c); @@ -1566,7 +1566,7 @@ bool freeClient(client *c) { /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. 
*/ - if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending) { + if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending || c->replstate == SLAVE_STATE_FASTSYNC_TX) { freeClientAsync(c); return false; } @@ -1791,7 +1791,7 @@ int writeToClient(client *c, int handler_installed) { /* We can only directly read from the replication backlog if the client is a replica, so only attempt to do so if that's the case. */ - if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { + if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR) && c->replstate == SLAVE_STATE_ONLINE) { std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); while (clientHasPendingReplies(c)) { @@ -1833,9 +1833,11 @@ int writeToClient(client *c, int handler_installed) { } } else { while(clientHasPendingReplies(c)) { - serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR); if (c->bufpos > 0) { - nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); + auto bufpos = c->bufpos; + lock.unlock(); + nwritten = connWrite(c->conn,c->buf+c->sentlen,bufpos-c->sentlen); + lock.lock(); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; @@ -1854,7 +1856,10 @@ int writeToClient(client *c, int handler_installed) { continue; } - nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen); + auto used = o->used; + lock.unlock(); + nwritten = connWrite(c->conn, o->buf() + c->sentlen, used - c->sentlen); + lock.lock(); if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; @@ -1993,7 +1998,7 @@ void ProcessPendingAsyncWrites() ae_flags |= AE_BARRIER; } - if (!((c->replstate == REPL_STATE_NONE || + if (!((c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) continue; @@ -2063,9 +2068,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { /* If a client is protected, don't do anything, * that may trigger write 
error or recreate handler. */ - if (flags & CLIENT_PROTECTED) continue; + if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue; - std::unique_locklock)> lock(c->lock); + //std::unique_locklock)> lock(c->lock); /* Don't write to clients that are going to be closed anyway. */ if (c->flags & CLIENT_CLOSE_ASAP) continue; @@ -2075,11 +2080,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { { if (c->flags & CLIENT_CLOSE_ASAP) { - lock.release(); // still locked AeLocker ae; - ae.arm(c); - if (!freeClient(c)) // writeToClient will only async close, but there's no need to wait - c->lock.unlock(); // if we just got put on the async close list, then we need to remove the lock + ae.arm(nullptr); + freeClient(c); // writeToClient will only async close, but there's no need to wait } continue; } @@ -3829,7 +3832,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) { if (!c->conn) return; /* It is unsafe to free fake clients. */ serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return; - if (checkClientOutputBufferLimits(c)) { + if (checkClientOutputBufferLimits(c) && c->replstate != SLAVE_STATE_FASTSYNC_TX) { sds client = catClientInfoString(sdsempty(),c); freeClientAsync(c); diff --git a/src/replication.cpp b/src/replication.cpp index 1f2c9043a..64fd0171e 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -34,6 +34,7 @@ #include "cluster.h" #include "bio.h" #include "aelocker.h" +#include "SnapshotPayloadParseState.h" #include #include @@ -982,6 +983,244 @@ need_full_resync: return C_ERR; } +int checkClientOutputBufferLimits(client *c); +class replicationBuffer { + std::vector replicas; + clientReplyBlock *reply = nullptr; + size_t writtenBytesTracker = 0; + +public: + replicationBuffer() { + reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + (PROTO_REPLY_CHUNK_BYTES*2)); + reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); + reply->used = 
0; + } + + ~replicationBuffer() { + zfree(reply); + } + + void addReplica(client *replica) { + replicas.push_back(replica); + replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset()); + // Optimize the socket for bulk transfer + //connDisableTcpNoDelay(replica->conn); + } + + bool isActive() const { return !replicas.empty(); } + + void flushData() { + if (reply == nullptr) + return; + size_t written = reply->used; + aeAcquireLock(); + for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) { + auto replica = replicas[ireplica]; + if (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) { + replica->replstate = REPL_STATE_NONE; + continue; + } + + while (checkClientOutputBufferLimits(replica) + && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { + aeReleaseLock(); + usleep(10); + aeAcquireLock(); + } + + std::unique_lock lock(replica->lock); + addReplyProto(replica, reply->buf(), reply->used); + } + ProcessPendingAsyncWrites(); + replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end()); + aeReleaseLock(); + if (reply != nullptr) { + reply->used = 0; + } + writtenBytesTracker += written; + } + + void addData(const char *data, unsigned long size) { + if (reply != nullptr && (size + reply->used) > reply->size) + flushData(); + + if (reply != nullptr && reply->size < size) { + serverAssert(reply->used == 0); // flush should have happened + zfree(reply); + reply = nullptr; + } + + if (reply == nullptr) { + reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2))); + reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); + reply->used = 0; + } + + serverAssert((reply->size - reply->used) >= size); + memcpy(reply->buf() + reply->used, data, size); + reply->used += size; + } + + void addLongWithPrefix(long val, 
char prefix) { + char buf[128]; + int len; + + buf[0] = prefix; + len = ll2string(buf+1,sizeof(buf)-1,val); + buf[len+1] = '\r'; + buf[len+2] = '\n'; + addData(buf, len+3); + } + + void addArrayLen(int len) { + addLongWithPrefix(len, '*'); + } + + void addLong(long val) { + addLongWithPrefix(val, ':'); + } + + void addString(const char *s, unsigned long len) { + addLongWithPrefix(len, '$'); + addData(s, len); + addData("\r\n", 2); + } + + size_t cbWritten() const { return writtenBytesTracker; } + + void end() { + flushData(); + for (auto replica : replicas) { + // Return to original settings + //if (!g_pserver->repl_disable_tcp_nodelay) + // connEnableTcpNoDelay(replica->conn); + } + } + + void putSlavesOnline() { + for (auto replica : replicas) { + replica->replstate = SLAVE_STATE_FASTSYNC_DONE; + replica->repl_put_online_on_ack = 1; + } + } +}; + +int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { + // TODO: This needs to be on a background thread + int retval = C_OK; + serverAssert(GlobalLocksAcquired()); + serverLog(LL_NOTICE, "Starting storage provider fast full sync with target: %s", "disk"); + + std::shared_ptr spreplBuf = std::make_shared(); + listNode *ln; + listIter li; + client *replica = nullptr; + listRewind(g_pserver->slaves, &li); + while (replica == nullptr && (ln = listNext(&li))) { + client *replicaCur = (client*)listNodeValue(ln); + if ((replicaCur->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) { + replica = replicaCur; + spreplBuf->addReplica(replica); + replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX; + } + } + serverAssert(replica != nullptr); + + spreplBuf->addArrayLen(2); // Two sections: Metadata and databases + + // MetaData + aeAcquireLock(); + spreplBuf->addArrayLen(3 + dictSize(g_pserver->lua_scripts)); + spreplBuf->addArrayLen(2); + spreplBuf->addString("repl-stream-db", 14); + spreplBuf->addLong(rsi->repl_stream_db); + spreplBuf->addArrayLen(2); + 
spreplBuf->addString("repl-id", 7); + spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE); + spreplBuf->addArrayLen(2); + spreplBuf->addString("repl-offset", 11); + spreplBuf->addLong(rsi->master_repl_offset); + + if (dictSize(g_pserver->lua_scripts)) { + dictEntry *de; + auto di = dictGetIterator(g_pserver->lua_scripts); + while((de = dictNext(di)) != NULL) { + robj *body = (robj*)dictGetVal(de); + + spreplBuf->addArrayLen(2); + spreplBuf->addString("lua", 3); + spreplBuf->addString(szFromObj(body), sdslen(szFromObj(body))); + } + dictReleaseIterator(di); + di = NULL; /* So that we don't release it again on error. */ + } + + std::shared_ptr>> spvecspsnapshot = std::make_shared>>(); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + spvecspsnapshot->emplace_back(g_pserver->db[idb]->CloneStorageCache()); + } + aeReleaseLock(); + + g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecspsnapshot = std::move(spvecspsnapshot)]{ + int retval = C_OK; + auto timeStart = ustime(); + auto lastLogTime = timeStart; + size_t cbData = 0; + size_t cbLastUpdate = 0; + auto &replBuf = *spreplBuf; + + // Databases + replBuf.addArrayLen(cserver.dbnum); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + auto &spsnapshot = (*spvecspsnapshot)[idb]; + size_t snapshotDeclaredCount = spsnapshot->count(); + replBuf.addArrayLen(snapshotDeclaredCount); + size_t count = 0; + bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, timeStart, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{ + replBuf.addArrayLen(2); + + replBuf.addString(rgchKey, cchKey); + replBuf.addString((const char *)rgchVal, cchVal); + ++count; + if ((count % 8092) == 0) { + auto curTime = ustime(); + if ((curTime - lastLogTime) > 60000000) { + auto usec = curTime - lastLogTime; + serverLog(LL_NOTICE, "Replication status: Transferred %zuMB (%.2fGbit/s)", replBuf.cbWritten()/1024/1024, 
((replBuf.cbWritten()-cbLastUpdate)*8.0)/(usec/1000000.0)/1000000000.0); + cbLastUpdate = replBuf.cbWritten(); + lastLogTime = ustime(); + } + } + cbData += cchKey + cchVal; + return replBuf.isActive(); + }); + + if (!result) { + retval = C_ERR; + break; + } + serverAssert(count == snapshotDeclaredCount); + } + + replBuf.end(); + + if (!replBuf.isActive()) { + retval = C_ERR; + } + + serverLog(LL_NOTICE, "Snapshot replication done: %s", (retval == C_OK) ? "success" : "failed"); + auto usec = ustime() - timeStart; + serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. (%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0); + if (retval == C_OK) { + aeAcquireLock(); + replBuf.putSlavesOnline(); + aeReleaseLock(); + } + }); + + return retval; +} + /* Start a BGSAVE for replication goals, which is, selecting the disk or * socket target depending on the configuration, and making sure that * the script cache is flushed before to start. @@ -1015,7 +1254,9 @@ int startBgsaveForReplication(int mincapa) { /* Only do rdbSave* when rsiptr is not NULL, * otherwise replica will miss repl-stream-db. */ if (rsiptr) { - if (socket_target) + if (mincapa & SLAVE_CAPA_ROCKSDB_SNAPSHOT && g_pserver->m_pstorageFactory) + retval = rdbSaveSnapshotForReplication(rsiptr); + else if (socket_target) retval = rdbSaveToSlavesSockets(rsiptr); else retval = rdbSaveBackground(rsiptr); @@ -1191,8 +1432,11 @@ void syncCommand(client *c) { g_pserver->replid, g_pserver->replid2); } + /* CASE 0: Fast Sync */ + if ((c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && g_pserver->m_pstorageFactory) { + serverLog(LL_NOTICE,"Fast SYNC on next replication cycle"); /* CASE 1: BGSAVE is in progress, with disk target. */ - if (g_pserver->FRdbSaveInProgress() && + } else if (g_pserver->FRdbSaveInProgress() && g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK) { /* Ok a background save is in progress. 
Let's check if it is a good @@ -1387,6 +1631,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_PSYNC2; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; + else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "rocksdb-snapshot-load")) + c->slave_capa |= SLAVE_CAPA_ROCKSDB_SNAPSHOT; fCapaCommand = true; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { @@ -1411,7 +1657,7 @@ void replconfCommand(client *c) { * quick check first (instead of waiting for the next ACK. */ if (g_pserver->child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END) checkChildrenDone(); - if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE) + if (c->repl_put_online_on_ack && (c->replstate == SLAVE_STATE_ONLINE || c->replstate == SLAVE_STATE_FASTSYNC_DONE)) putSlaveOnline(c); /* Note: this command does not reply anything! */ return; @@ -1453,6 +1699,8 @@ void replconfCommand(client *c) { sds reply = sdsnew("+OK"); if (g_pserver->fActiveReplica) reply = sdscat(reply, " active-replica"); + if (g_pserver->m_pstorageFactory && (c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && !g_pserver->fActiveReplica) + reply = sdscat(reply, " rocksdb-snapshot-save"); reply = sdscat(reply, "\r\n"); addReplySds(c, reply); } else { @@ -1898,6 +2146,68 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } } +/* Save the replid of yourself and any connected masters to storage. + * Returns if no storage provider is used. 
*/ +void saveMasterStatusToStorage() +{ + if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; + + g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); + g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true); + if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { + g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, + g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); + } + + struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL); + + if (miFirst && miFirst->master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); + } + else if (miFirst && miFirst->cached_master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); + } + + if (listLength(g_pserver->masters) == 0) { + g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); + return; + } + sds val = sds(sdsempty()); + listNode *ln; + listIter li; + redisMaster *mi; + listRewind(g_pserver->masters,&li); + while((ln = listNext(&li)) != NULL) { + mi = (redisMaster*)listNodeValue(ln); + if (mi->masterhost == NULL) { + // if we don't know the host, no reason to save + continue; + } + if (!mi->master) { + // If master client is not available, use info from master struct - better than nothing + if (mi->master_replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, + mi->master_initial_offset, + mi->masterhost, + mi->masterport); + } + else { + if (mi->master->replid[0] == 0) { + // if replid is null, there's no reason to save it + 
continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, + mi->master->reploff, + mi->masterhost, + mi->masterport); + } + } + g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); sdsfree(val); /* insert() does not take ownership (other callers pass addresses of globals), so release the temporary sds here */ +} + /* Change the current instance replication ID with a new, random one. * This will prevent successful PSYNCs between this master and other * slaves, so the command should be called when something happens that @@ -1905,6 +2215,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) void changeReplicationId(void) { getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE); g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0'; + saveMasterStatusToStorage(); } @@ -2127,25 +2438,183 @@ void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) { discardDbBackup(buckup, flag, replicationEmptyDbCallback); } +size_t parseCount(const char *rgch, size_t cch, long long *pvalue) { + size_t cchNumeral = 0; + + if (cch > cchNumeral+1 && rgch[cchNumeral+1] == '-') ++cchNumeral; + while ((cch > (cchNumeral+1)) && isdigit(static_cast<unsigned char>(rgch[1 + cchNumeral]))) ++cchNumeral; /* cast: isdigit() on a negative char is undefined */ + + if (cch < (cchNumeral+1+2)) { // +2 is for the \r\n we expect + throw true; // continuable + } + + if (rgch[cchNumeral+1] != '\r' || rgch[cchNumeral+2] != '\n') { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch); + throw false; + } + + if (!string2ll(rgch+1, cchNumeral, pvalue)) { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch); + throw false; + } + + return cchNumeral + 3; +} + +bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) { + int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; + serverAssert(GlobalLocksAcquired()); + serverAssert(mi->master == nullptr); + bool fFinished = false; + + if (mi->bulkreadBuffer == nullptr) { + mi->bulkreadBuffer = sdsempty(); + mi->parseState = new SnapshotPayloadParseState(); + if (g_pserver->aof_state != AOF_OFF) stopAppendOnly(); + if (!fUpdate) { + int empty_db_flags = 
g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC : + EMPTYDB_NO_FLAGS; + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + aeAcquireLock(); + g_pserver->db[idb]->processChanges(false); + aeReleaseLock(); + g_pserver->db[idb]->commitChanges(); + g_pserver->db[idb]->trackChanges(false); + } + } + } + + for (int iter = 0; iter < 10; ++iter) { + if (mi->parseState->shouldThrottle()) + return false; + + auto readlen = PROTO_IOBUF_LEN; + auto qblen = sdslen(mi->bulkreadBuffer); + mi->bulkreadBuffer = sdsMakeRoomFor(mi->bulkreadBuffer, readlen); + + auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen); + if (nread <= 0) { + if (connGetState(conn) != CONN_STATE_CONNECTED) + cancelReplicationHandshake(mi, true); + return false; + } + mi->repl_transfer_lastio = g_pserver->unixtime; + sdsIncrLen(mi->bulkreadBuffer,nread); + + size_t offset = 0; + + try { + if (sdslen(mi->bulkreadBuffer) > cserver.client_max_querybuf_len) { + throw "Full Sync Streaming Buffer Exceeded (increase client_max_querybuf_len)"; + } + + while (sdslen(mi->bulkreadBuffer) > offset) { + // Pop completed items + mi->parseState->trimState(); + + if (mi->bulkreadBuffer[offset] == '*') { + // Starting an array + long long arraySize; + + // Lets read the array length + offset += parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &arraySize); + if (arraySize < 0) + throw "Invalid array size"; + + mi->parseState->pushArray(arraySize); + } else if (mi->bulkreadBuffer[offset] == '$') { + // Loading in a string + long long payloadsize = 0; + + // Lets read the string length + size_t offsetCount = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &payloadsize); + if (payloadsize < 0) + throw "Invalid array size"; + + // OK we know how long the string is, now lets make sure the payload is here. 
+ if (sdslen(mi->bulkreadBuffer) < (offset + offsetCount + payloadsize + 2)) { + goto LContinue; // wait for more data (note: we could throw true here, but throw is way more expensive) + } + + mi->parseState->pushValue(mi->bulkreadBuffer + offset + offsetCount, payloadsize); + + // On to the next one + offset += offsetCount + payloadsize + 2; + } else if (mi->bulkreadBuffer[offset] == ':') { + // Numeral + long long value; + + size_t offsetValue = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &value); + + mi->parseState->pushValue(value); + offset += offsetValue; + } else { + serverLog(LL_WARNING, "Bad protocol from MASTER: %s", mi->bulkreadBuffer+offset); + cancelReplicationHandshake(mi, true); + return false; + } + } + + sdsrange(mi->bulkreadBuffer, offset, -1); + offset = 0; + + // Cleanup the remaining stack + mi->parseState->trimState(); + + if (mi->parseState->depth() != 0) + return false; + + static_assert(sizeof(long) == sizeof(long long),""); + rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db"); + rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset"); + sds str = mi->parseState->getMetaDataStr("repl-id"); + if (sdslen(str) == CONFIG_RUN_ID_SIZE || sdslen(str) == CONFIG_RUN_ID_SIZE+1) { /* the master sends exactly CONFIG_RUN_ID_SIZE bytes; an sds keeps a trailing NUL so copying +1 stays in bounds. NOTE(review): confirm getMetaDataStr() returns the raw payload */ + memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1); + } + + fFinished = true; + break; // We're done!!! 
+ } + catch (const char *sz) { + serverLog(LL_WARNING, "%s", sz); + cancelReplicationHandshake(mi, true); + return false; + } catch (bool fContinue) { + if (!fContinue) { + cancelReplicationHandshake(mi, true); + return false; + } + } + LContinue: + sdsrange(mi->bulkreadBuffer, offset, -1); + } + + if (!fFinished) + return false; + + serverLog(LL_NOTICE, "Fast sync complete"); + sdsfree(mi->bulkreadBuffer); + mi->bulkreadBuffer = nullptr; + delete mi->parseState; + mi->parseState = nullptr; + return true; +} + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ -void readSyncBulkPayload(connection *conn) { +bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, int &usemark) { char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); const dbBackup *diskless_load_backup = NULL; - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rsi.fForceSetKey = !!g_pserver->fActiveReplica; int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; off_t left; // Should we update our database, or create from scratch? int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; - redisMaster *mi = (redisMaster*)connGetPrivateData(conn); - if (mi == nullptr) { - // We're about to be free'd so bail out - return; - } serverAssert(GlobalLocksAcquired()); serverAssert(mi->master == nullptr); @@ -2154,7 +2623,6 @@ void readSyncBulkPayload(connection *conn) { * from the server: when they match, we reached the end of the transfer. */ static char eofmark[CONFIG_RUN_ID_SIZE]; static char lastbytes[CONFIG_RUN_ID_SIZE]; - static int usemark = 0; /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ @@ -2176,7 +2644,7 @@ void readSyncBulkPayload(connection *conn) { * the connection live. So we refresh our last interaction * timestamp. 
*/ mi->repl_transfer_lastio = g_pserver->unixtime; - return; + return false; } else if (buf[0] != '$') { serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf); goto error; @@ -2210,7 +2678,7 @@ void readSyncBulkPayload(connection *conn) { (long long) mi->repl_transfer_size, use_diskless_load? "to parser":"to disk"); } - return; + return false; } if (!use_diskless_load) { @@ -2227,12 +2695,12 @@ void readSyncBulkPayload(connection *conn) { if (nread <= 0) { if (connGetState(conn) == CONN_STATE_CONNECTED) { /* equivalent to EAGAIN */ - return; + return false; } serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); cancelReplicationHandshake(mi, true); - return; + return false; } g_pserver->stat_net_input_bytes += nread; @@ -2301,7 +2769,7 @@ void readSyncBulkPayload(connection *conn) { /* If the transfer is yet not complete, we need to read more, so * return ASAP and wait for the handler to be called again. */ - if (!eof_reached) return; + if (!eof_reached) return false; } /* We reach this point in one of the following cases: @@ -2373,7 +2841,7 @@ void readSyncBulkPayload(connection *conn) { /* Note that there's no point in restarting the AOF on SYNC * failure, it'll be restarted when sync succeeds or the replica * gets promoted. */ - return; + return false; } /* RDB loading succeeded if we reach this point. */ @@ -2393,7 +2861,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_WARNING,"Replication stream EOF marker is broken"); cancelReplicationHandshake(mi,true); rioFreeConn(&rdb, NULL); - return; + return false; } } @@ -2425,7 +2893,7 @@ void readSyncBulkPayload(connection *conn) { "MASTER <-> REPLICA synchronization: %s", strerror(errno)); cancelReplicationHandshake(mi,true); - return; + return false; } /* Rename rdb like renaming rewrite aof asynchronously. 
*/ @@ -2438,7 +2906,7 @@ void readSyncBulkPayload(connection *conn) { g_pserver->rdb_filename, strerror(errno)); cancelReplicationHandshake(mi,true); if (old_rdb_fd != -1) close(old_rdb_fd); - return; + return false; } rdb_filename = g_pserver->rdb_filename; @@ -2468,7 +2936,7 @@ void readSyncBulkPayload(connection *conn) { } /* Note that there's no point in restarting the AOF on sync failure, it'll be restarted when sync succeeds or replica promoted. */ - return; + return false; } /* Cleanup. */ @@ -2486,6 +2954,32 @@ void readSyncBulkPayload(connection *conn) { mi->repl_transfer_tmpfile = NULL; } + return true; +error: + cancelReplicationHandshake(mi,true); + return false; +} + +void readSyncBulkPayload(connection *conn) { + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + redisMaster *mi = (redisMaster*)connGetPrivateData(conn); + static int usemark = 0; + if (mi == nullptr) { + // We're about to be free'd so bail out + return; + } + + if (mi->isRocksdbSnapshotRepl) { + if (!readSnapshotBulkPayload(conn, mi, rsi)) + return; + } else { + if (!readSyncBulkPayloadRdb(conn, mi, rsi, usemark)) + return; + } + + // Should we update our database, or create from scratch? + int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; + /* Final setup of the connected slave <- master link */ replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db); mi->repl_transfer_s = nullptr; @@ -2513,6 +3007,7 @@ void readSyncBulkPayload(connection *conn) { g_pserver->master_repl_offset = mi->master->reploff; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + saveMasterStatusToStorage(); } clearReplicationId2(); @@ -2528,17 +3023,13 @@ void readSyncBulkPayload(connection *conn) { } /* Send the initial ACK immediately to put this replica in online state. 
*/ - if (usemark) replicationSendAck(mi); + if (usemark || mi->isRocksdbSnapshotRepl) replicationSendAck(mi); /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ if (g_pserver->aof_enabled) restartAOFAfterSYNC(); return; - -error: - cancelReplicationHandshake(mi,true); - return; } char *receiveSynchronousResponse(redisMaster *mi, connection *conn) { @@ -2861,12 +3352,15 @@ void parseMasterCapa(redisMaster *mi, sds strcapa) char *pchEnd = szStart; mi->isActive = false; + mi->isRocksdbSnapshotRepl = false; for (;;) { if (*pchEnd == ' ' || *pchEnd == '\0') { // Parse the word if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) { mi->isActive = true; + } else if (strncmp(szStart, "rocksdb-snapshot-save", pchEnd - szStart) == 0) { + mi->isRocksdbSnapshotRepl = true; } szStart = pchEnd + 1; } @@ -2998,8 +3492,20 @@ void syncWithMaster(connection *conn) { * PSYNC2: supports PSYNC v2, so understands +CONTINUE . * * The master will ignore capabilities it does not understand. */ - err = sendCommand(conn,"REPLCONF", - "capa","eof","capa","psync2","capa","activeExpire",NULL); + + + std::vector veccapabilities = { + "REPLCONF", + "capa","eof", + "capa","psync2", + "capa","activeExpire", + }; + if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) { + veccapabilities.push_back("capa"); + veccapabilities.push_back("rocksdb-snapshot-load"); + } + + err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr); if (err) goto write_error; /* Send UUID */ @@ -3343,6 +3849,15 @@ void replicationAbortSyncTransfer(redisMaster *mi) { * * Otherwise zero is returned and no operation is performed at all. 
*/ int cancelReplicationHandshake(redisMaster *mi, int reconnect) { + if (mi->bulkreadBuffer != nullptr) { + sdsfree(mi->bulkreadBuffer); + mi->bulkreadBuffer = nullptr; + } + if (mi->parseState) { + delete mi->parseState; + mi->parseState = nullptr; + } + if (mi->repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(mi); mi->repl_state = REPL_STATE_CONNECT; @@ -3454,6 +3969,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) { mi->masterhost, mi->masterport); connectWithMaster(mi); } + saveMasterStatusToStorage(); return mi; } @@ -3537,6 +4053,8 @@ void replicationUnsetMaster(redisMaster *mi) { /* Restart the AOF subsystem in case we shut it down during a sync when * we were still a slave. */ if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC(); + + saveMasterStatusToStorage(); } /* This function is called when the replica lose the connection with the @@ -3568,6 +4086,8 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->masterhost, mi->masterport); connectWithMaster(mi); } + + saveMasterStatusToStorage(); } } @@ -4336,6 +4856,13 @@ void replicationCron(void) { client *replica = (client*)ln->value; std::unique_lock ul(replica->lock); + if (replica->replstate == SLAVE_STATE_FASTSYNC_DONE && !clientHasPendingReplies(replica)) { + serverLog(LL_WARNING, "Putting replica online"); + replica->postFunction([](client *c){ + putSlaveOnline(c); + }); + } + if (replica->replstate == SLAVE_STATE_ONLINE) { if (replica->flags & CLIENT_PRE_PSYNC) continue; diff --git a/src/sds.h b/src/sds.h index d16906f30..0d77f2772 100644 --- a/src/sds.h +++ b/src/sds.h @@ -369,6 +369,8 @@ public: bool operator==(const char *other) const { + if (other == nullptr || m_str == nullptr) + return other == m_str; return sdscmp(m_str, other) == 0; } @@ -396,8 +398,11 @@ public: {} sdsstring(const sdsstring &other) - : sdsview(sdsdup(other.m_str)) - {} + : sdsview() + { + if (other.m_str != nullptr) + m_str = 
sdsdup(other.m_str); + } sdsstring(const char *rgch, size_t cch) : sdsview(sdsnewlen(rgch, cch)) diff --git a/src/server.cpp b/src/server.cpp index 6827ad675..c697e5b84 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3984,6 +3984,38 @@ void initServer(void) { slowlogInit(); latencyMonitorInit(); + if (g_pserver->m_pstorageFactory) { + g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); + if (g_pserver->metadataDb) { + g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){ + if (cb == sizeof(g_pserver->replid)) { + memcpy(g_pserver->replid, data, cb); + } + }); + g_pserver->metadataDb->retrieve("repl-offset", 11, [&](const char *, size_t, const void *data, size_t cb){ + if (cb == sizeof(g_pserver->master_repl_offset)) { /* must match the size written by saveMasterStatusToStorage() */ + g_pserver->master_repl_offset = *(long long*)data; + } + }); + + listIter li; + listNode *ln; + + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t cb){ + if (cb == sizeof(int)) selectDb(mi->cached_master, *(int*)data); /* the key may be stored with an empty value when no db was selected */ + }); + } + } + } + /* We have to initialize storage providers after the cluster has been initialized */ for (int idb = 0; idb < cserver.dbnum; ++idb) { diff --git a/src/server.h b/src/server.h index a93d4008d..0d2e7790e 100644 --- a/src/server.h +++ b/src/server.h @@ -607,12 +607,15 @@ typedef enum { #define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */ #define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to replica. */ #define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */ +#define SLAVE_STATE_FASTSYNC_TX 10 +#define SLAVE_STATE_FASTSYNC_DONE 11 /* Slave capabilities. 
*/ #define SLAVE_CAPA_NONE 0 #define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ #define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ #define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */ +#define SLAVE_CAPA_ROCKSDB_SNAPSHOT (1<<3) /* Synchronous read timeout - replica side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 @@ -1102,7 +1105,12 @@ public: size_t slots() const { return dictSlots(m_pdict); } size_t size(bool fCachedOnly = false) const; - void expand(uint64_t slots) { dictExpand(m_pdict, slots); } + void expand(uint64_t slots) { + if (m_spstorage) + m_spstorage->expand(slots); + else + dictExpand(m_pdict, slots); + } void trackkey(robj_roptr o, bool fUpdate) { @@ -1193,6 +1201,9 @@ public: bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } + std::unique_ptr CloneStorageCache() { return std::unique_ptr(m_spstorage->clone()); } + void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem); + dict_iter find_cached_threadsafe(const char *key) const; protected: @@ -1352,6 +1363,8 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::prepOverwriteForSnapshot; using redisDbPersistentData::FRehashing; using redisDbPersistentData::FTrackingChanges; + using redisDbPersistentData::CloneStorageCache; + using redisDbPersistentData::bulkStorageInsert; public: expireset::setiter expireitr; @@ -1673,7 +1686,7 @@ struct client { size_t argv_len_sumActive = 0; bool FPendingReplicaWrite() const { - return repl_curr_off != repl_end_off; + return repl_curr_off != repl_end_off && replstate == SLAVE_STATE_ONLINE; } // post a function from a non-client thread to run on its client thread @@ -2058,6 +2071,7 @@ struct redisMaster { long long master_initial_offset; /* Master PSYNC offset. 
*/ bool isActive = false; + bool isRocksdbSnapshotRepl = false; int repl_state; /* Replication status if the instance is a replica */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ @@ -2068,6 +2082,9 @@ struct redisMaster { time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ time_t repl_down_since; /* Unix time at which link with master went down */ + class SnapshotPayloadParseState *parseState; + sds bulkreadBuffer = nullptr; + unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ /* After we've connected with our master use the UUID in g_pserver->master */ uint64_t mvccLastSync; @@ -2154,6 +2171,7 @@ struct redisServer { mode_t umask; /* The umask value of the process on startup */ std::atomic hz; /* serverCron() calls frequency in hertz */ int in_fork_child; /* indication that this is a fork child */ + IStorage *metadataDb = nullptr; redisDb **db = nullptr; dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. 
*/ @@ -2831,7 +2849,7 @@ void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); void addReplyLongLong(client *c, long long ll); #ifdef __cplusplus -void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix); +void addReplyLongLongWithPrefix(client *c, long long ll, char prefix); #endif void addReplyArrayLen(client *c, long length); void addReplyMapLen(client *c, long length); diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h new file mode 100644 index 000000000..8a7df2cf5 --- /dev/null +++ b/src/storage/rocksdbfactor_internal.h @@ -0,0 +1,29 @@ +#pragma once +#include "rocksdb.h" + +class RocksDBStorageFactory : public IStorageFactory +{ + std::shared_ptr m_spdb; // Note: This must be first so it is deleted last + std::vector> m_vecspcols; + std::shared_ptr m_pfilemanager; + std::string m_path; + bool m_fCreatedTempFolder = false; + +public: + RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig); + ~RocksDBStorageFactory(); + + virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; + virtual IStorage *createMetadataDb() override; + virtual const char *name() const override; + + virtual size_t totalDiskspaceUsed() const override; + + virtual bool FSlow() const override { return true; } + + virtual size_t filedsRequired() const override; + std::string getTempFolder(); + +private: + void setVersion(rocksdb::ColumnFamilyHandle*); +}; \ No newline at end of file diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 188a339f3..a287397c7 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -6,6 +6,13 @@ IStorage *TestStorageFactory::create(int, key_load_iterator, void *) return new (MALLOC_LOCAL) TestStorageProvider(); } +IStorage *TestStorageFactory::createMetadataDb() +{ + IStorage *metadataDb = new (MALLOC_LOCAL) 
TestStorageProvider(); + metadataDb->insert("KEYDB_METADATA_ID", strlen("KEYDB_METADATA_ID"), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false); + return metadataDb; +} + const char *TestStorageFactory::name() const { return "TEST Storage Provider"; diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index d5e956c7e..2b2b1a38d 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -5,6 +5,7 @@ class TestStorageFactory : public IStorageFactory { virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override; + virtual class IStorage *createMetadataDb() override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } virtual bool FSlow() const { return false; } diff --git a/tests/integration/rdb-repl-tests b/tests/integration/rdb-repl-tests new file mode 100644 index 000000000..257d107f6 --- /dev/null +++ b/tests/integration/rdb-repl-tests @@ -0,0 +1,4 @@ +diskless no replicas drop during rdb pipe +diskless slow replicas drop during rdb pipe +diskless timeout replicas drop during rdb pipe +Kill rdb child process if its dumping RDB is not useful \ No newline at end of file diff --git a/tests/integration/replication-fast.tcl b/tests/integration/replication-fast.tcl new file mode 100644 index 000000000..f190524c1 --- /dev/null +++ b/tests/integration/replication-fast.tcl @@ -0,0 +1,34 @@ +proc prepare_value {size} { + set _v "c" + for {set i 1} {$i < $size} {incr i} { + append _v 0 + } + return $_v +} + +start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master}}} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + start_server {tags {} overrides {storage-provider {flash ./rocks.db.replica}}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + test "fast replication with large value" { + set _v 
[prepare_value [expr 64*1024*1024]] + # $master set key $_v + + $slave replicaof $master_host $master_port + wait_for_condition 50 300 { + [lindex [$slave role] 0] eq {slave} && + [string match {*master_link_status:up*} [$slave info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + assert_equal [$slave debug digest] [$master debug digest] + $slave replicaof no one + } + } +} diff --git a/tests/integration/replication-psync-flash.tcl b/tests/integration/replication-psync-flash.tcl new file mode 100644 index 000000000..5cad127f3 --- /dev/null +++ b/tests/integration/replication-psync-flash.tcl @@ -0,0 +1,135 @@ +# Creates a master-slave pair and breaks the link continuously to force +# partial resync attempts, all this while flooding the master with +# write queries. +# +# You can specify backlog size, ttl, delay before reconnection, test duration +# in seconds, and an additional condition to verify at the end. +# +# If reconnect is > 0, the test actually tries to break the connection and +# reconnect with the master, otherwise just the initial synchronization is +# checked for consistency. 
+proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { + start_server {tags {"repl"}} { + start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] { + + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set slave [srv 0 client] + + set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000] + set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000] + set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000] + + test {Slave should be able to synchronize with the master} { + $slave slaveof $master_host $master_port + wait_for_condition 50 100 { + [lindex [r role] 0] eq {slave} && + [lindex [r role] 3] eq {connected} + } else { + fail "Replication not started." + } + } + + # Check that the background clients are actually writing. + test {Detect write load to master} { + wait_for_condition 50 1000 { + [$master dbsize] > 100 + } else { + fail "Can't detect write load from background clients." + } + } + + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { + # Now while the clients are writing data, break the master-slave + # link multiple times. + if ($reconnect) { + for {set j 0} {$j < $duration*10} {incr j} { + after 100 + # catch {puts "MASTER [$master dbsize] keys, REPLICA [$slave dbsize] keys"} + + if {($j % 20) == 0} { + catch { + $slave debug restart + } + } + } + } + stop_bg_complex_data $load_handle0 + stop_bg_complex_data $load_handle1 + stop_bg_complex_data $load_handle2 + + # Wait for the slave to reach the "online" + # state from the POV of the master. 
+ set retry 5000 + while {$retry} { + set info [$master info] + if {[string match {*slave0:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slave not correctly synchronized" + } + + # Wait for the slave to acknowledge that it is online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. (-LOADING error) + wait_for_condition 5000 100 { + [lindex [$slave role] 3] eq {connected} + } else { + fail "Slave still not connected after some time" + } + + set retry 10 + while {$retry && ([$master debug digest] ne [$slave debug digest])}\ + { + after 1000 + incr retry -1 + } + assert {[$master dbsize] > 0} + + if {[$master debug digest] ne [$slave debug digest]} { + set csv1 [csvdump r] + set csv2 [csvdump {r -1}] + set fd [open /tmp/repldump1.txt w] + puts -nonewline $fd $csv1 + close $fd + set fd [open /tmp/repldump2.txt w] + puts -nonewline $fd $csv2 + close $fd + puts "Master - Replica inconsistency" + puts "Run diff -u against /tmp/repldump*.txt for more info" + } + assert_equal [r debug digest] [r -1 debug digest] + eval $cond + } + } + } +} + +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl 0 + + test_psync {ok psync} 6 100000000 3600 0 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {no backlog} 6 100 3600 0.5 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + + test_psync {ok after delay} 3 100000000 3600 3 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {backlog expired} 3 100000000 1 3 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + } +} diff --git a/tests/support/server.tcl b/tests/support/server.tcl index c06dd0561..8b2aab137 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -1,4 +1,5 @@ set ::global_overrides {} +set ::global_storage_provider {} set ::tags {} set ::valgrind_errors {} @@ -392,6 
+393,9 @@ proc start_server {options {code undefined}} { foreach {directive arguments} [concat $::global_overrides $overrides] { dict set config $directive $arguments } + foreach {directive argument1 argument2} $::global_storage_provider { + dict set config $directive $argument1 $argument2 + } # remove directives that are marked to be omitted foreach directive $omit { diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 66f039511..2ac904d20 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -47,6 +47,7 @@ set ::all_tests { integration/replication-3 integration/replication-4 integration/replication-psync + integration/replication-psync-flash integration/replication-active integration/replication-multimaster integration/replication-multimaster-connect @@ -59,6 +60,7 @@ set ::all_tests { integration/failover integration/redis-cli integration/redis-benchmark + integration/replication-fast unit/pubsub unit/slowlog unit/scripting @@ -692,6 +694,21 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--help}} { print_help_screen exit 0 + } elseif {$opt eq {--flash}} { + lappend ::global_storage_provider storage-provider + lappend ::global_storage_provider flash + lappend ::global_storage_provider ./rocks.db + set ::all_tests { + integration/replication + integration/replication-2 + integration/replication-3 + integration/replication-4 + integration/replication-psync + } + set fp [open {./tests/integration/rdb-repl-tests} r] + set file_data [read $fp] + close $fp + set ::skiptests [split $file_data "\n"] } else { puts "Wrong argument: $opt" exit 1