Merge branch 'fastsync_collab' into 'keydbpro'

Fastsync collab

See merge request external-collab/keydb-pro-6!9

Former-commit-id: 8f410e4b814ea6ac506ab540ee1394834fd1d8a8
commit aaf1a42a7b by Malavan Sotheeswaran, 2021-11-26 20:53:00 +00:00
22 changed files with 1346 additions and 66 deletions

View File: src/IStorage.h

@@ -2,6 +2,8 @@
 #include <functional>
 #include "sds.h"
+#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"
+
 class IStorageFactory
 {
 public:
@@ -9,6 +11,7 @@ public:
     virtual ~IStorageFactory() {}
     virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0;
+    virtual class IStorage *createMetadataDb() = 0;
     virtual const char *name() const = 0;
     virtual size_t totalDiskspaceUsed() const = 0;
     virtual bool FSlow() const = 0;
@@ -30,10 +33,10 @@ public:
     virtual bool enumerate(callback fn) const = 0;
     virtual size_t count() const = 0;
-    virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) {
+    virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
         beginWriteBatch();
         for (size_t ielem = 0; ielem < celem; ++ielem) {
-            insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false);
+            insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false);
         }
         endWriteBatch();
     }
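
The bulkInsert signature change above swaps sds arrays for raw pointer/length pairs, so callers can hand the storage provider buffers that were never sds-allocated (such as the slab-allocated parse buffers introduced later in this merge). A minimal calling sketch, assuming `store` is any concrete IStorage implementation (hypothetical name):

    // Sketch only: two keys inserted in one write batch via the new signature.
    char *keys[] = { (char*)"k1", (char*)"k2" };
    size_t cbkeys[] = { 2, 2 };
    char *vals[] = { (char*)"v1", (char*)"v2" };
    size_t cbvals[] = { 2, 2 };
    store->bulkInsert(keys, cbkeys, vals, cbvals, 2);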

View File: src/Makefile

@@ -354,6 +354,7 @@ endif
 REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX)
 REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX)
 REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
+KEYDB_SERVER_OBJ=SnapshotPayloadParseState.o
 REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX)
 REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
 REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX)
@@ -410,7 +411,7 @@ endif
 	@touch $@

 # keydb-server
-$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ)
+$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(KEYDB_SERVER_OBJ)
 	$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS)

 # keydb-sentinel
@@ -433,7 +434,7 @@ $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ)
 $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ)
 	$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/hdr_histogram.o $(FINAL_LIBS)

-DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d)
+DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(KEYDB_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d)
 -include $(DEP)

 # Because the jemalloc.h header is generated as a part of the jemalloc build,

View File: src/SnapshotPayloadParseState.cpp (new file)

@@ -0,0 +1,344 @@
#include "server.h"
#include "SnapshotPayloadParseState.h"
#include <sys/mman.h>
static const size_t SLAB_SIZE = 2*1024*1024; // 2 MB per slab; requests larger than this get a dedicated, exactly sized slab
class SlabAllocator
{
class Slab {
char *m_pv = nullptr;
size_t m_cb = 0;
size_t m_cbAllocated = 0;
public:
Slab(size_t cbAllocate) {
m_pv = (char*)zmalloc(cbAllocate);
m_cb = cbAllocate;
}
~Slab() {
zfree(m_pv);
}
Slab(Slab &&other) {
m_pv = other.m_pv;
m_cb = other.m_cb;
m_cbAllocated = other.m_cbAllocated;
other.m_pv = nullptr;
other.m_cb = 0;
other.m_cbAllocated = 0;
}
bool canAllocate(size_t cb) const {
return (m_cbAllocated + cb) <= m_cb;
}
void *allocate(size_t cb) {
if ((m_cbAllocated + cb) > m_cb)
return nullptr;
void *pvret = m_pv + m_cbAllocated;
m_cbAllocated += cb;
return pvret;
}
};
std::vector<Slab> m_vecslabs;
public:
void *allocate(size_t cb) {
if (m_vecslabs.empty() || !m_vecslabs.back().canAllocate(cb)) {
m_vecslabs.emplace_back(std::max(cb, SLAB_SIZE));
}
return m_vecslabs.back().allocate(cb);
}
};
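// Note (annotation, not in the commit): SlabAllocator is a bump allocator.
// Each slab hands out sequential chunks and individual allocations are never
// freed; everything is released at once when the allocator is destroyed, which
// matches the lifetime of parsed key/value buffers (they live exactly until
// flushQueuedKeys() hands them to the storage thread). Usage sketch:
//
//     SlabAllocator alloc;
//     char *key = (char*)alloc.allocate(16);    // carved from slab 0
//     char *val = (char*)alloc.allocate(4096);  // same slab, cursor bumped forward
//     // ~SlabAllocator frees every slab in one shot; no per-allocation free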
static uint64_t dictCStringHash(const void *key) {
return dictGenHashFunction((unsigned char*)key, strlen((char*)key));
}
static void dictKeyDestructor(void *privdata, void *key)
{
DICT_NOTUSED(privdata);
sdsfree((sds)key);
}
static int dictCStringCompare(void *, const void *key1, const void *key2)
{
int l1,l2;
l1 = strlen((sds)key1);
l2 = strlen((sds)key2);
if (l1 != l2) return 0;
return memcmp(key1, key2, l1) == 0;
}
dictType metadataLongLongDictType = {
dictCStringHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictCStringCompare, /* key compare */
dictKeyDestructor, /* key destructor */
nullptr, /* val destructor */
nullptr, /* allow to expand */
nullptr /* async free destructor */
};
dictType metadataDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictSdsDestructor, /* val destructor */
nullptr, /* allow to expand */
nullptr /* async free destructor */
};
SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStage() {
if (stackParse.empty())
return ParseStageName::Global;
switch (stackParse.back().name)
{
case ParseStageName::None:
return ParseStageName::Global;
case ParseStageName::Global:
if (stackParse.back().arraycur == 0)
return ParseStageName::MetaData;
else if (stackParse.back().arraycur == 1)
return ParseStageName::Databases;
break;
case ParseStageName::MetaData:
return ParseStageName::KeyValuePair;
case ParseStageName::Databases:
return ParseStageName::Dataset;
case ParseStageName::Dataset:
return ParseStageName::KeyValuePair;
default:
break;
}
throw "Bad protocol: corrupt state";
}
void SnapshotPayloadParseState::flushQueuedKeys() {
if (vecqueuedKeys.empty())
return;
serverAssert(current_database >= 0);
// TODO: We can't finish parse until all the work functions finish
int idb = current_database;
serverAssert(vecqueuedKeys.size() == vecqueuedVals.size());
auto sizePrev = vecqueuedKeys.size();
++insertsInFlight;
auto &insertsInFlightTmp = insertsInFlight; // capture a reference rather than "this": the parse state may be destroyed before the work function runs
if (current_database < cserver.dbnum) {
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
--insertsInFlightTmp;
delete pallocator;
});
} else {
// else drop the data
vecqueuedKeys.clear();
vecqueuedKeysCb.clear();
vecqueuedVals.clear();
vecqueuedValsCb.clear();
// Note: m_spallocator will get free'd when overwritten below
}
m_spallocator = std::make_unique<SlabAllocator>();
cbQueued = 0;
vecqueuedKeys.reserve(sizePrev);
vecqueuedKeysCb.reserve(sizePrev);
vecqueuedVals.reserve(sizePrev);
vecqueuedValsCb.reserve(sizePrev);
serverAssert(vecqueuedKeys.empty());
serverAssert(vecqueuedVals.empty());
}
SnapshotPayloadParseState::SnapshotPayloadParseState() {
// This is to represent the first array the client is intended to send us
ParseStage stage;
stage.name = ParseStageName::None;
stage.arraylen = 1;
stackParse.push_back(stage);
dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr);
dictMetaData = dictCreate(&metadataDictType, nullptr);
insertsInFlight = 0;
m_spallocator = std::make_unique<SlabAllocator>();
}
SnapshotPayloadParseState::~SnapshotPayloadParseState() {
dictRelease(dictLongLongMetaData);
dictRelease(dictMetaData);
}
const char *SnapshotPayloadParseState::getStateDebugName(ParseStage stage) {
switch (stage.name) {
case ParseStageName::None:
return "None";
case ParseStageName::Global:
return "Global";
case ParseStageName::MetaData:
return "MetaData";
case ParseStageName::Databases:
return "Databases";
case ParseStageName::KeyValuePair:
return "KeyValuePair";
case ParseStageName::Dataset:
return "Dataset";
default:
return "Unknown";
}
}
size_t SnapshotPayloadParseState::depth() const { return stackParse.size(); }
void SnapshotPayloadParseState::trimState() {
while (!stackParse.empty() && (stackParse.back().arraycur == stackParse.back().arraylen))
stackParse.pop_back();
if (stackParse.empty()) {
flushQueuedKeys();
while (insertsInFlight > 0) {
// TODO: ProcessEventsWhileBlocked
aeReleaseLock();
aeAcquireLock();
}
}
}
void SnapshotPayloadParseState::pushArray(long long size) {
if (stackParse.empty())
throw "Bad Protocol: unexpected trailing data";
if (size == 0) {
stackParse.back().arraycur++;
return;
}
if (stackParse.back().name == ParseStageName::Databases) {
flushQueuedKeys();
current_database = stackParse.back().arraycur;
}
ParseStage stage;
stage.name = getNextStage();
stage.arraylen = size;
if (stage.name == ParseStageName::Dataset) {
g_pserver->db[current_database]->expand(stage.arraylen);
}
// Note: This affects getNextStage so ensure its after
stackParse.back().arraycur++;
stackParse.push_back(stage);
}
void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) {
if (stackParse.empty())
throw "Bad Protocol: unexpected trailing bulk string";
if (stackParse.back().arraycur >= static_cast<int>(stackParse.back().arrvalues.size()))
throw "Bad protocol: Unexpected value";
auto &stage = stackParse.back();
stage.arrvalues[stackParse.back().arraycur].first = (char*)m_spallocator->allocate(cch);
stage.arrvalues[stackParse.back().arraycur].second = cch;
memcpy(stage.arrvalues[stackParse.back().arraycur].first, rgch, cch);
stage.arraycur++;
switch (stage.name) {
case ParseStageName::KeyValuePair:
if (stackParse.size() < 2)
throw "Bad Protocol: unexpected bulk string";
if (stackParse[stackParse.size()-2].name == ParseStageName::MetaData) {
if (stage.arraycur == 2) {
// We loaded both pairs
if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr)
throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small
if (!strcasecmp(stage.arrvalues[0].first, "lua")) {
/* Load the script back in memory. */
robj *auxval = createStringObject(stage.arrvalues[1].first, stage.arrvalues[1].second);
if (luaCreateFunction(NULL,g_pserver->lua,auxval) == NULL) {
throw "Can't load Lua script";
}
} else {
dictAdd(dictMetaData, sdsnewlen(stage.arrvalues[0].first, stage.arrvalues[0].second), sdsnewlen(stage.arrvalues[1].first, stage.arrvalues[1].second));
}
}
} else if (stackParse[stackParse.size()-2].name == ParseStageName::Dataset) {
if (stage.arraycur == 2) {
// We loaded both pairs
if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr)
throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small
vecqueuedKeys.push_back(stage.arrvalues[0].first);
vecqueuedKeysCb.push_back(stage.arrvalues[0].second);
vecqueuedVals.push_back(stage.arrvalues[1].first);
vecqueuedValsCb.push_back(stage.arrvalues[1].second);
stage.arrvalues[0].first = nullptr;
stage.arrvalues[1].first = nullptr;
cbQueued += vecqueuedKeysCb.back();
cbQueued += vecqueuedValsCb.back();
if (cbQueued >= queuedBatchLimit)
flushQueuedKeys();
}
} else {
throw "Bad Protocol: unexpected bulk string";
}
break;
default:
throw "Bad Protocol: unexpected bulk string out of KV pair";
}
}
void SnapshotPayloadParseState::pushValue(long long value) {
if (stackParse.empty())
throw "Bad Protocol: unexpected integer value";
stackParse.back().arraycur++;
if (stackParse.back().arraycur != 2 || stackParse.back().arrvalues[0].first == nullptr)
throw "Bad Protocol: unexpected integer value";
dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.back().arrvalues[0].first, stackParse.back().arrvalues[0].second), nullptr);
if (de == nullptr)
throw "Bad Protocol: metadata field sent twice";
de->v.s64 = value;
}
long long SnapshotPayloadParseState::getMetaDataLongLong(const char *szField) const {
dictEntry *de = dictFind(dictLongLongMetaData, szField);
if (de == nullptr) {
serverLog(LL_WARNING, "Master did not send field: %s", szField);
throw false;
}
return de->v.s64;
}
sds SnapshotPayloadParseState::getMetaDataStr(const char *szField) const {
sdsstring str(szField, strlen(szField));
dictEntry *de = dictFind(dictMetaData, str.get());
if (de == nullptr) {
serverLog(LL_WARNING, "Master did not send field: %s", szField);
throw false;
}
return (sds)de->v.val;
}
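
For orientation, the payload this parser consumes is plain RESP: an outer two-element array holding a metadata section and a databases section (see rdbSaveSnapshotForReplication in replication.cpp below). A sketch of a minimal payload on the wire, with illustrative values; indentation and annotations are added for readability, the wire format is contiguous:

    *2\r\n                                        outer array: [metadata, databases]
      *3\r\n                                      metadata: 3 pairs (+1 per Lua script)
        *2\r\n $14\r\nrepl-stream-db\r\n :0\r\n
        *2\r\n $7\r\nrepl-id\r\n $40\r\n<replid>\r\n
        *2\r\n $11\r\nrepl-offset\r\n :0\r\n
      *1\r\n                                      databases: one entry per configured db
        *1\r\n                                    dataset: key count as the array length
          *2\r\n $3\r\nfoo\r\n $3\r\nbar\r\n      one key/value pair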

View File: src/SnapshotPayloadParseState.h (new file)

@@ -0,0 +1,66 @@
#pragma once

#include <array>
#include <atomic>
#include <memory>
#include <stack>
#include <vector>
class SlabAllocator;
class SnapshotPayloadParseState {
enum class ParseStageName {
None,
Global,
MetaData,
Databases,
Dataset,
KeyValuePair,
};
struct ParseStage {
ParseStageName name;
long long arraylen = 0;
long long arraycur = 0;
std::array<std::pair<char*, size_t>, 2> arrvalues;
ParseStage() {
for (auto &pair : arrvalues) {
pair.first = nullptr;
pair.second = 0;
}
}
};
std::vector<ParseStage> stackParse;
std::vector<char*> vecqueuedKeys;
std::vector<size_t> vecqueuedKeysCb;
std::vector<char*> vecqueuedVals;
std::vector<size_t> vecqueuedValsCb;
std::atomic<int> insertsInFlight;
std::unique_ptr<SlabAllocator> m_spallocator;
dict *dictLongLongMetaData = nullptr;
dict *dictMetaData = nullptr;
size_t cbQueued = 0;
static const size_t queuedBatchLimit = 64*1024*1024; // 64 MB
int current_database = -1;
ParseStageName getNextStage();
void flushQueuedKeys();
public:
SnapshotPayloadParseState();
~SnapshotPayloadParseState();
long long getMetaDataLongLong(const char *field) const;
sds getMetaDataStr(const char *szField) const;
static const char *getStateDebugName(ParseStage stage);
size_t depth() const;
void trimState();
void pushArray(long long size);
void pushValue(const char *rgch, long long cch);
void pushValue(long long value);
bool shouldThrottle() const { return insertsInFlight > (cserver.cthreads*4); }
};
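
To see how this interface is driven, here is a condensed parse loop in the spirit of readSnapshotBulkPayload later in this merge (illustrative only; the real loop also buffers partial reads, throttles on shouldThrottle(), and handles protocol errors):

    SnapshotPayloadParseState parse;
    parse.pushArray(2);                  // outer array: [metadata, databases]
    parse.pushArray(1);                  //   metadata section with one pair
    parse.pushArray(2);                  //     the pair itself
    parse.pushValue("repl-offset", 11);  //     field name
    parse.pushValue(12345LL);            //     integer value, lands in the metadata dict
    parse.trimState();                   // pop completed stack frames
    // ...feed the databases section until parse.depth() == 0, then read back:
    long long off = parse.getMetaDataLongLong("repl-offset");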

View File: src/StorageCache.cpp

@@ -97,17 +97,47 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite)
     m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
 }

-void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
+long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing);
+
+void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem)
 {
+    std::vector<dictEntry*> vechashes;
+    if (m_pdict != nullptr) {
+        vechashes.reserve(celem);
+
+        for (size_t ielem = 0; ielem < celem; ++ielem) {
+            dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry));
+            de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]);
+            de->v.u64 = 1;
+            vechashes.push_back(de);
+        }
+    }
+
     std::unique_lock<fastlock> ul(m_lock);
     bulkInsertsInProgress++;
     if (m_pdict != nullptr) {
-        for (size_t ielem = 0; ielem < celem; ++ielem) {
-            cacheKey(rgkeys[ielem]);
+        for (dictEntry *de : vechashes) {
+            if (dictIsRehashing(m_pdict)) dictRehash(m_pdict,1);
+            /* Get the index of the new element, or -1 if
+             * the element already exists. */
+            long index;
+            if ((index = _dictKeyIndex(m_pdict, de->key, (uint64_t)de->key, nullptr)) == -1) {
+                dictEntry *deExisting = dictFind(m_pdict, de->key); // hash already cached; bump its refcount
+                serverAssert(deExisting != nullptr);
+                deExisting->v.s64++;
+                m_collisionCount++;
+                zfree(de);
+            } else {
+                int iht = dictIsRehashing(m_pdict) ? 1 : 0;
+                de->next = m_pdict->ht[iht].table[index];
+                m_pdict->ht[iht].table[index] = de;
+                m_pdict->ht[iht].used++;
+            }
         }
     }
     ul.unlock();
-    m_spstorage->bulkInsert(rgkeys, rgvals, celem);
+    m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem);
     bulkInsertsInProgress--;
 }

@@ -119,6 +149,14 @@ const StorageCache *StorageCache::clone()
     return cacheNew;
 }

+void StorageCache::expand(uint64_t slots)
+{
+    std::unique_lock<fastlock> ul(m_lock);
+    if (m_pdict) {
+        dictExpand(m_pdict, slots);
+    }
+}
+
 void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
 {
     std::unique_lock<fastlock> ul(m_lock);
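
One subtlety in the keycache path above: the dict stores the 64-bit hash of each key as the dictEntry key (with a refcount in the value), so bulkInsert can precompute entries outside the lock and splice them straight into the buckets. A rough model of that splice, not project API:

    // Head insertion into a chained hash table; equivalent to the loop above
    // minus rehashing, since keycache "keys" are the hashes themselves.
    void bucketInsert(dictEntry **table, unsigned long sizemask, dictEntry *de) {
        unsigned long idx = (uint64_t)de->key & sizemask;  // de->key holds the hash
        de->next = table[idx];                             // new entry becomes list head
        table[idx] = de;
    }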

View File: src/StorageCache.h

@@ -40,11 +40,12 @@ public:
     void clear();
     void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
-    void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
+    void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem);
     void retrieve(sds key, IStorage::callbackSingle fn) const;
     bool erase(sds key);
     void emergencyFreeCache();
     bool keycacheIsEnabled() const { return m_pdict != nullptr; }
+    void expand(uint64_t slots);

     bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }

View File: src/aof.cpp

@@ -870,6 +870,11 @@ int loadAppendOnlyFile(char *filename) {
     fakeClient = createAOFClient();
     startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);

+    for (int idb = 0; idb < cserver.dbnum; ++idb)
+    {
+        g_pserver->db[idb]->trackChanges(true);
+    }
+
     /* Check if this AOF file has an RDB preamble. In that case we need to
      * load the RDB file and later continue loading the AOF tail. */
     char sig[5]; /* "REDIS" */
@@ -892,11 +897,6 @@ int loadAppendOnlyFile(char *filename) {
         }
     }

-    for (int idb = 0; idb < cserver.dbnum; ++idb)
-    {
-        g_pserver->db[idb]->trackChanges(true);
-    }
-
     /* Read the actual AOF file, in REPL format, command by command. */
     while(1) {
         int argc, j;

View File: src/db.cpp

@@ -2903,8 +2903,10 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
     {
         if (m_fAllChanged)
         {
-            m_spstorage->clear();
-            storeDatabase();
+            if (dictSize(m_pdict) > 0 || m_spstorage->count() > 0) { // in some cases we may have pre-sized the StorageCache's dict, and we don't want clear to ruin it
+                m_spstorage->clear();
+                storeDatabase();
+            }
             m_fAllChanged = 0;
         }
         else
@@ -2936,15 +2938,19 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
         dictIterator *di = dictGetIterator(dictNew);
         dictEntry *de;
         std::vector<sds> veckeys;
+        std::vector<size_t> veccbkeys;
         std::vector<sds> vecvals;
+        std::vector<size_t> veccbvals;
         while ((de = dictNext(di)) != nullptr)
         {
             robj *o = (robj*)dictGetVal(de);
             sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o);
             veckeys.push_back((sds)dictGetKey(de));
+            veccbkeys.push_back(sdslen((sds)dictGetKey(de)));
             vecvals.push_back(temp);
+            veccbvals.push_back(sdslen(temp));
         }
-        m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size());
+        m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size());
         for (auto val : vecvals)
             sdsfree(val);
         dictReleaseIterator(di);
@@ -2953,6 +2959,11 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
     });
 }

+void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem)
+{
+    m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem);
+}
+
 void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
 {
     if (m_pdbSnapshotStorageFlush)

View File: src/dict.cpp

@@ -63,7 +63,7 @@ static unsigned int dict_force_resize_ratio = 5;
 static int _dictExpandIfNeeded(dict *ht);
 static unsigned long _dictNextPower(unsigned long size);
-static long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing);
+long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing);
 static int _dictInit(dict *ht, dictType *type, void *privDataPtr);

 /* -------------------------- hash functions -------------------------------- */
@@ -1366,7 +1366,7 @@ static unsigned long _dictNextPower(unsigned long size)
  *
  * Note that if we are in the process of rehashing the hash table, the
  * index is always returned in the context of the second (new) hash table. */
-static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
+long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
 {
     unsigned long idx, table;
     dictEntry *he;

View File: src/networking.cpp

@@ -237,7 +237,7 @@ void clientInstallWriteHandler(client *c) {
      * writes at this stage. */
     if (!(c->flags & CLIENT_PENDING_WRITE) &&
-        (c->replstate == REPL_STATE_NONE ||
+        (c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || c->replstate == SLAVE_STATE_FASTSYNC_DONE ||
          (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
     {
         AssertCorrectThread(c);
@@ -1566,7 +1566,7 @@ bool freeClient(client *c) {
     /* If a client is protected, yet we need to free it right now, make sure
      * to at least use asynchronous freeing. */
-    if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending) {
+    if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending || c->replstate == SLAVE_STATE_FASTSYNC_TX) {
         freeClientAsync(c);
         return false;
     }
@@ -1791,7 +1791,7 @@ int writeToClient(client *c, int handler_installed) {
     /* We can only directly read from the replication backlog if the client
        is a replica, so only attempt to do so if that's the case. */
-    if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
+    if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR) && c->replstate == SLAVE_STATE_ONLINE) {
         std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);

         while (clientHasPendingReplies(c)) {
@@ -1833,9 +1833,11 @@ int writeToClient(client *c, int handler_installed) {
         }
     } else {
         while(clientHasPendingReplies(c)) {
-            serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
             if (c->bufpos > 0) {
-                nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
+                auto bufpos = c->bufpos;
+                lock.unlock();
+                nwritten = connWrite(c->conn,c->buf+c->sentlen,bufpos-c->sentlen);
+                lock.lock();
                 if (nwritten <= 0) break;
                 c->sentlen += nwritten;
                 totwritten += nwritten;
@@ -1854,7 +1856,10 @@ int writeToClient(client *c, int handler_installed) {
                 continue;
             }
-            nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen);
+            auto used = o->used;
+            lock.unlock();
+            nwritten = connWrite(c->conn, o->buf() + c->sentlen, used - c->sentlen);
+            lock.lock();
             if (nwritten <= 0) break;
             c->sentlen += nwritten;
             totwritten += nwritten;
@@ -1993,7 +1998,7 @@ void ProcessPendingAsyncWrites()
             ae_flags |= AE_BARRIER;
         }
-        if (!((c->replstate == REPL_STATE_NONE ||
+        if (!((c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX ||
             (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))))
             continue;
@@ -2063,9 +2068,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
         /* If a client is protected, don't do anything,
          * that may trigger write error or recreate handler. */
-        if (flags & CLIENT_PROTECTED) continue;
+        if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue;
-        std::unique_lock<decltype(c->lock)> lock(c->lock);
+        //std::unique_lock<decltype(c->lock)> lock(c->lock);
         /* Don't write to clients that are going to be closed anyway. */
         if (c->flags & CLIENT_CLOSE_ASAP) continue;
@@ -2075,11 +2080,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
         {
             if (c->flags & CLIENT_CLOSE_ASAP)
             {
-                lock.release(); // still locked
                 AeLocker ae;
-                ae.arm(c);
-                if (!freeClient(c)) // writeToClient will only async close, but there's no need to wait
-                    c->lock.unlock(); // if we just got put on the async close list, then we need to remove the lock
+                ae.arm(nullptr);
+                freeClient(c); // writeToClient will only async close, but there's no need to wait
             }
             continue;
         }
@@ -3829,7 +3832,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) {
     if (!c->conn) return; /* It is unsafe to free fake clients. */
     serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
     if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
-    if (checkClientOutputBufferLimits(c)) {
+    if (checkClientOutputBufferLimits(c) && c->replstate != SLAVE_STATE_FASTSYNC_TX) {
         sds client = catClientInfoString(sdsempty(),c);
         freeClientAsync(c);

View File: src/replication.cpp

@@ -34,6 +34,7 @@
 #include "cluster.h"
 #include "bio.h"
 #include "aelocker.h"
+#include "SnapshotPayloadParseState.h"
 #include <sys/time.h>
 #include <unistd.h>
@@ -982,6 +983,244 @@ need_full_resync:
     return C_ERR;
 }
int checkClientOutputBufferLimits(client *c);
class replicationBuffer {
std::vector<client *> replicas;
clientReplyBlock *reply = nullptr;
size_t writtenBytesTracker = 0;
public:
replicationBuffer() {
reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + (PROTO_REPLY_CHUNK_BYTES*2));
reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock);
reply->used = 0;
}
~replicationBuffer() {
zfree(reply);
}
void addReplica(client *replica) {
replicas.push_back(replica);
replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset());
// Optimize the socket for bulk transfer
//connDisableTcpNoDelay(replica->conn);
}
bool isActive() const { return !replicas.empty(); }
void flushData() {
if (reply == nullptr)
return;
size_t written = reply->used;
aeAcquireLock();
for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) {
auto replica = replicas[ireplica];
if (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) {
replica->replstate = REPL_STATE_NONE;
continue;
}
while (checkClientOutputBufferLimits(replica)
&& (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) {
aeReleaseLock();
usleep(10);
aeAcquireLock();
}
std::unique_lock<fastlock> lock(replica->lock);
addReplyProto(replica, reply->buf(), reply->used);
}
ProcessPendingAsyncWrites();
replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end());
aeReleaseLock();
if (reply != nullptr) {
reply->used = 0;
}
writtenBytesTracker += written;
}
void addData(const char *data, unsigned long size) {
if (reply != nullptr && (size + reply->used) > reply->size)
flushData();
if (reply != nullptr && reply->size < size) {
serverAssert(reply->used == 0); // flush should have happened
zfree(reply);
reply = nullptr;
}
if (reply == nullptr) {
reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2)));
reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock);
reply->used = 0;
}
serverAssert((reply->size - reply->used) >= size);
memcpy(reply->buf() + reply->used, data, size);
reply->used += size;
}
void addLongWithPrefix(long val, char prefix) {
char buf[128];
int len;
buf[0] = prefix;
len = ll2string(buf+1,sizeof(buf)-1,val);
buf[len+1] = '\r';
buf[len+2] = '\n';
addData(buf, len+3);
}
void addArrayLen(int len) {
addLongWithPrefix(len, '*');
}
void addLong(long val) {
addLongWithPrefix(val, ':');
}
void addString(const char *s, unsigned long len) {
addLongWithPrefix(len, '$');
addData(s, len);
addData("\r\n", 2);
}
size_t cbWritten() const { return writtenBytesTracker; }
void end() {
flushData();
for (auto replica : replicas) {
// Return to original settings
//if (!g_pserver->repl_disable_tcp_nodelay)
// connEnableTcpNoDelay(replica->conn);
}
}
void putSlavesOnline() {
for (auto replica : replicas) {
replica->replstate = SLAVE_STATE_FASTSYNC_DONE;
replica->repl_put_online_on_ack = 1;
}
}
};
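// Note (annotation, not in the commit): the buffer is driven like an ordinary
// RESP encoder; a minimal sketch, assuming `replica` is a client* currently in
// SLAVE_STATE_WAIT_BGSAVE_START:
//
//     replicationBuffer buf;
//     buf.addReplica(replica);     // sends +FULLRESYNC and registers the client
//     buf.addArrayLen(2);          // "*2\r\n"
//     buf.addString("foo", 3);     // "$3\r\nfoo\r\n"
//     buf.addString("bar", 3);     // "$3\r\nbar\r\n"
//     buf.end();                   // flush the tail to every attached replica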
int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
// TODO: This needs to be on a background thread
int retval = C_OK;
serverAssert(GlobalLocksAcquired());
serverLog(LL_NOTICE, "Starting storage provider fast full sync with target: %s", "disk");
std::shared_ptr<replicationBuffer> spreplBuf = std::make_shared<replicationBuffer>();
listNode *ln;
listIter li;
client *replica = nullptr;
listRewind(g_pserver->slaves, &li);
while (replica == nullptr && (ln = listNext(&li))) {
client *replicaCur = (client*)listNodeValue(ln);
if ((replicaCur->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) {
replica = replicaCur;
spreplBuf->addReplica(replica);
replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX;
}
}
serverAssert(replica != nullptr);
spreplBuf->addArrayLen(2); // Two sections: Metadata and databases
// MetaData
aeAcquireLock();
spreplBuf->addArrayLen(3 + dictSize(g_pserver->lua_scripts));
spreplBuf->addArrayLen(2);
spreplBuf->addString("repl-stream-db", 14);
spreplBuf->addLong(rsi->repl_stream_db);
spreplBuf->addArrayLen(2);
spreplBuf->addString("repl-id", 7);
spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE);
spreplBuf->addArrayLen(2);
spreplBuf->addString("repl-offset", 11);
spreplBuf->addLong(rsi->master_repl_offset);
if (dictSize(g_pserver->lua_scripts)) {
dictEntry *de;
auto di = dictGetIterator(g_pserver->lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = (robj*)dictGetVal(de);
spreplBuf->addArrayLen(2);
spreplBuf->addString("lua", 3);
spreplBuf->addString(szFromObj(body), sdslen(szFromObj(body)));
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
std::shared_ptr<std::vector<std::unique_ptr<const StorageCache>>> spvecspsnapshot = std::make_shared<std::vector<std::unique_ptr<const StorageCache>>>();
for (int idb = 0; idb < cserver.dbnum; ++idb) {
spvecspsnapshot->emplace_back(g_pserver->db[idb]->CloneStorageCache());
}
aeReleaseLock();
g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecspsnapshot = std::move(spvecspsnapshot)]{
int retval = C_OK;
auto timeStart = ustime();
auto lastLogTime = timeStart;
size_t cbData = 0;
size_t cbLastUpdate = 0;
auto &replBuf = *spreplBuf;
// Databases
replBuf.addArrayLen(cserver.dbnum);
for (int idb = 0; idb < cserver.dbnum; ++idb) {
auto &spsnapshot = (*spvecspsnapshot)[idb];
size_t snapshotDeclaredCount = spsnapshot->count();
replBuf.addArrayLen(snapshotDeclaredCount);
size_t count = 0;
bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, timeStart, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{
replBuf.addArrayLen(2);
replBuf.addString(rgchKey, cchKey);
replBuf.addString((const char *)rgchVal, cchVal);
++count;
if ((count % 8092) == 0) {
auto curTime = ustime();
if ((curTime - lastLogTime) > 60000000) {
auto usec = curTime - lastLogTime;
serverLog(LL_NOTICE, "Replication status: Transferred %zuMB (%.2fGbit/s)", replBuf.cbWritten()/1024/1024, ((replBuf.cbWritten()-cbLastUpdate)*8.0)/(usec/1000000.0)/1000000000.0);
cbLastUpdate = replBuf.cbWritten();
lastLogTime = ustime();
}
}
cbData += cchKey + cchVal;
return replBuf.isActive();
});
if (!result) {
retval = C_ERR;
break;
}
serverAssert(count == snapshotDeclaredCount);
}
replBuf.end();
if (!replBuf.isActive()) {
retval = C_ERR;
}
serverLog(LL_NOTICE, "Snapshot replication done: %s", (retval == C_OK) ? "success" : "failed");
auto usec = ustime() - timeStart;
serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. (%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0);
if (retval == C_OK) {
aeAcquireLock();
replBuf.putSlavesOnline();
aeReleaseLock();
}
});
return retval;
}
 /* Start a BGSAVE for replication goals, which is, selecting the disk or
  * socket target depending on the configuration, and making sure that
  * the script cache is flushed before to start.
@@ -1015,7 +1254,9 @@ int startBgsaveForReplication(int mincapa) {
     /* Only do rdbSave* when rsiptr is not NULL,
      * otherwise replica will miss repl-stream-db. */
     if (rsiptr) {
-        if (socket_target)
+        if (mincapa & SLAVE_CAPA_ROCKSDB_SNAPSHOT && g_pserver->m_pstorageFactory)
+            retval = rdbSaveSnapshotForReplication(rsiptr);
+        else if (socket_target)
             retval = rdbSaveToSlavesSockets(rsiptr);
         else
             retval = rdbSaveBackground(rsiptr);
@@ -1191,8 +1432,11 @@ void syncCommand(client *c) {
             g_pserver->replid, g_pserver->replid2);
     }

+    /* CASE 0: Fast Sync */
+    if ((c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && g_pserver->m_pstorageFactory) {
+        serverLog(LL_NOTICE,"Fast SYNC on next replication cycle");
+
     /* CASE 1: BGSAVE is in progress, with disk target. */
-    if (g_pserver->FRdbSaveInProgress() &&
+    } else if (g_pserver->FRdbSaveInProgress() &&
         g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK)
     {
         /* Ok a background save is in progress. Let's check if it is a good
@@ -1387,6 +1631,8 @@ void replconfCommand(client *c) {
                 c->slave_capa |= SLAVE_CAPA_PSYNC2;
             else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
                 c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
+            else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "rocksdb-snapshot-load"))
+                c->slave_capa |= SLAVE_CAPA_ROCKSDB_SNAPSHOT;

             fCapaCommand = true;
         } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
@@ -1411,7 +1657,7 @@ void replconfCommand(client *c) {
          * quick check first (instead of waiting for the next ACK. */
         if (g_pserver->child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
             checkChildrenDone();
-        if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
+        if (c->repl_put_online_on_ack && (c->replstate == SLAVE_STATE_ONLINE || c->replstate == SLAVE_STATE_FASTSYNC_DONE))
             putSlaveOnline(c);
         /* Note: this command does not reply anything! */
         return;
@@ -1453,6 +1699,8 @@ void replconfCommand(client *c) {
         sds reply = sdsnew("+OK");
         if (g_pserver->fActiveReplica)
             reply = sdscat(reply, " active-replica");
+        if (g_pserver->m_pstorageFactory && (c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && !g_pserver->fActiveReplica)
+            reply = sdscat(reply, " rocksdb-snapshot-save");
         reply = sdscat(reply, "\r\n");
         addReplySds(c, reply);
     } else {
@@ -1898,6 +2146,68 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
     }
 }
/* Save the replid of this server and any connected masters to storage.
 * No-op if no storage provider is configured. */
void saveMasterStatusToStorage()
{
if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return;
g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true);
g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true);
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb,
g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true);
}
struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL);
if (miFirst && miFirst->master) {
g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true);
}
else if (miFirst && miFirst->cached_master) {
g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true);
}
if (listLength(g_pserver->masters) == 0) {
g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true);
return;
}
sds val = sds(sdsempty());
listNode *ln;
listIter li;
redisMaster *mi;
listRewind(g_pserver->masters,&li);
while((ln = listNext(&li)) != NULL) {
mi = (redisMaster*)listNodeValue(ln);
if (mi->masterhost == NULL) {
// if we don't know the host, no reason to save
continue;
}
if (!mi->master) {
// If master client is not available, use info from master struct - better than nothing
if (mi->master_replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid,
mi->master_initial_offset,
mi->masterhost,
mi->masterport);
}
else {
if (mi->master->replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid,
mi->master->reploff,
mi->masterhost,
mi->masterport);
}
}
g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true);
}
 /* Change the current instance replication ID with a new, random one.
  * This will prevent successful PSYNCs between this master and other
  * slaves, so the command should be called when something happens that
@@ -1905,6 +2215,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
 void changeReplicationId(void) {
     getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE);
     g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0';
+    saveMasterStatusToStorage();
 }

@@ -2127,25 +2438,183 @@ void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) {
     discardDbBackup(buckup, flag, replicationEmptyDbCallback);
 }
size_t parseCount(const char *rgch, size_t cch, long long *pvalue) {
size_t cchNumeral = 0;
if (cch > cchNumeral+1 && rgch[cchNumeral+1] == '-') ++cchNumeral;
while ((cch > (cchNumeral+1)) && isdigit(rgch[1 + cchNumeral])) ++cchNumeral;
if (cch < (cchNumeral+1+2)) { // +2 is for the \r\n we expect
throw true; // continuable
}
if (rgch[cchNumeral+1] != '\r' || rgch[cchNumeral+2] != '\n') {
serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch);
throw false;
}
if (!string2ll(rgch+1, cchNumeral, pvalue)) {
serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch);
throw false;
}
return cchNumeral + 3;
}
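// Note (annotation, not in the commit): worked example of parseCount on the
// header of a two-element array:
//
//     parseCount("*2\r\n", 4, &v)
//       - rgch[0] is the RESP type byte ('*', '$' or ':'); digits start at rgch[1]
//       - cchNumeral stops at 1 ("2"); rgch[2..3] must then be "\r\n" or we throw
//       - *pvalue = 2; returns 1 + 3 = 4 bytes consumed (type byte, digits, CRLF)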
bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) {
int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster;
serverAssert(GlobalLocksAcquired());
serverAssert(mi->master == nullptr);
bool fFinished = false;
if (mi->bulkreadBuffer == nullptr) {
mi->bulkreadBuffer = sdsempty();
mi->parseState = new SnapshotPayloadParseState();
if (g_pserver->aof_state != AOF_OFF) stopAppendOnly();
if (!fUpdate) {
int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
for (int idb = 0; idb < cserver.dbnum; ++idb) {
aeAcquireLock();
g_pserver->db[idb]->processChanges(false);
aeReleaseLock();
g_pserver->db[idb]->commitChanges();
g_pserver->db[idb]->trackChanges(false);
}
}
}
for (int iter = 0; iter < 10; ++iter) {
if (mi->parseState->shouldThrottle())
return false;
auto readlen = PROTO_IOBUF_LEN;
auto qblen = sdslen(mi->bulkreadBuffer);
mi->bulkreadBuffer = sdsMakeRoomFor(mi->bulkreadBuffer, readlen);
auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen);
if (nread <= 0) {
if (connGetState(conn) != CONN_STATE_CONNECTED)
cancelReplicationHandshake(mi, true);
return false;
}
mi->repl_transfer_lastio = g_pserver->unixtime;
sdsIncrLen(mi->bulkreadBuffer,nread);
size_t offset = 0;
try {
if (sdslen(mi->bulkreadBuffer) > cserver.client_max_querybuf_len) {
throw "Full Sync Streaming Buffer Exceeded (increase client_max_querybuf_len)";
}
while (sdslen(mi->bulkreadBuffer) > offset) {
// Pop completed items
mi->parseState->trimState();
if (mi->bulkreadBuffer[offset] == '*') {
// Starting an array
long long arraySize;
// Lets read the array length
offset += parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &arraySize);
if (arraySize < 0)
throw "Invalid array size";
mi->parseState->pushArray(arraySize);
} else if (mi->bulkreadBuffer[offset] == '$') {
// Loading in a string
long long payloadsize = 0;
// Lets read the string length
size_t offsetCount = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &payloadsize);
if (payloadsize < 0)
throw "Invalid array size";
// OK we know how long the string is, now lets make sure the payload is here.
if (sdslen(mi->bulkreadBuffer) < (offset + offsetCount + payloadsize + 2)) {
goto LContinue; // wait for more data (note: we could throw true here, but throw is way more expensive)
}
mi->parseState->pushValue(mi->bulkreadBuffer + offset + offsetCount, payloadsize);
// On to the next one
offset += offsetCount + payloadsize + 2;
} else if (mi->bulkreadBuffer[offset] == ':') {
// Numeral
long long value;
size_t offsetValue = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &value);
mi->parseState->pushValue(value);
offset += offsetValue;
} else {
serverLog(LL_WARNING, "Bad protocol from MASTER: %s", mi->bulkreadBuffer+offset);
cancelReplicationHandshake(mi, true);
return false;
}
}
sdsrange(mi->bulkreadBuffer, offset, -1);
offset = 0;
// Cleanup the remaining stack
mi->parseState->trimState();
if (mi->parseState->depth() != 0)
return false;
static_assert(sizeof(long) == sizeof(long long),"");
rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db");
rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset");
sds str = mi->parseState->getMetaDataStr("repl-id");
if (sdslen(str) == CONFIG_RUN_ID_SIZE+1) {
memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1);
}
fFinished = true;
break; // We're done!!!
}
catch (const char *sz) {
serverLog(LL_WARNING, "%s", sz);
cancelReplicationHandshake(mi, true);
return false;
} catch (bool fContinue) {
if (!fContinue) {
cancelReplicationHandshake(mi, true);
return false;
}
}
LContinue:
sdsrange(mi->bulkreadBuffer, offset, -1);
}
if (!fFinished)
return false;
serverLog(LL_NOTICE, "Fast sync complete");
sdsfree(mi->bulkreadBuffer);
mi->bulkreadBuffer = nullptr;
delete mi->parseState;
mi->parseState = nullptr;
return true;
}
 /* Asynchronously read the SYNC payload we receive from a master */
 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
-void readSyncBulkPayload(connection *conn) {
+bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, int &usemark) {
     char buf[PROTO_IOBUF_LEN];
     ssize_t nread, readlen, nwritten;
     int use_diskless_load = useDisklessLoad();
     const dbBackup *diskless_load_backup = NULL;
-    rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
     rsi.fForceSetKey = !!g_pserver->fActiveReplica;
     int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC :
                                                             EMPTYDB_NO_FLAGS;
     off_t left;
     // Should we update our database, or create from scratch?
     int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster;
-    redisMaster *mi = (redisMaster*)connGetPrivateData(conn);
-    if (mi == nullptr) {
-        // We're about to be free'd so bail out
-        return;
-    }

     serverAssert(GlobalLocksAcquired());
     serverAssert(mi->master == nullptr);
@@ -2154,7 +2623,6 @@ void readSyncBulkPayload(connection *conn) {
      * from the server: when they match, we reached the end of the transfer. */
     static char eofmark[CONFIG_RUN_ID_SIZE];
     static char lastbytes[CONFIG_RUN_ID_SIZE];
-    static int usemark = 0;

     /* If repl_transfer_size == -1 we still have to read the bulk length
      * from the master reply. */
@@ -2176,7 +2644,7 @@ void readSyncBulkPayload(connection *conn) {
          * the connection live. So we refresh our last interaction
          * timestamp. */
         mi->repl_transfer_lastio = g_pserver->unixtime;
-        return;
+        return false;
     } else if (buf[0] != '$') {
         serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
         goto error;
@@ -2210,7 +2678,7 @@ void readSyncBulkPayload(connection *conn) {
             (long long) mi->repl_transfer_size,
             use_diskless_load? "to parser":"to disk");
     }
-    return;
+    return false;
 }

 if (!use_diskless_load) {
@@ -2227,12 +2695,12 @@ void readSyncBulkPayload(connection *conn) {
         if (nread <= 0) {
             if (connGetState(conn) == CONN_STATE_CONNECTED) {
                 /* equivalent to EAGAIN */
-                return;
+                return false;
             }
             serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
                 (nread == -1) ? strerror(errno) : "connection lost");
             cancelReplicationHandshake(mi, true);
-            return;
+            return false;
         }
         g_pserver->stat_net_input_bytes += nread;
@@ -2301,7 +2769,7 @@ void readSyncBulkPayload(connection *conn) {
         /* If the transfer is yet not complete, we need to read more, so
          * return ASAP and wait for the handler to be called again. */
-        if (!eof_reached) return;
+        if (!eof_reached) return false;
     }

     /* We reach this point in one of the following cases:
@@ -2373,7 +2841,7 @@ void readSyncBulkPayload(connection *conn) {
             /* Note that there's no point in restarting the AOF on SYNC
              * failure, it'll be restarted when sync succeeds or the replica
              * gets promoted. */
-            return;
+            return false;
         }

         /* RDB loading succeeded if we reach this point. */
@@ -2393,7 +2861,7 @@ void readSyncBulkPayload(connection *conn) {
                 serverLog(LL_WARNING,"Replication stream EOF marker is broken");
                 cancelReplicationHandshake(mi,true);
                 rioFreeConn(&rdb, NULL);
-                return;
+                return false;
             }
         }
@@ -2425,7 +2893,7 @@ void readSyncBulkPayload(connection *conn) {
                 "MASTER <-> REPLICA synchronization: %s",
                 strerror(errno));
             cancelReplicationHandshake(mi,true);
-            return;
+            return false;
         }

         /* Rename rdb like renaming rewrite aof asynchronously. */
@@ -2438,7 +2906,7 @@ void readSyncBulkPayload(connection *conn) {
                 g_pserver->rdb_filename, strerror(errno));
             cancelReplicationHandshake(mi,true);
             if (old_rdb_fd != -1) close(old_rdb_fd);
-            return;
+            return false;
         }
         rdb_filename = g_pserver->rdb_filename;
@@ -2468,7 +2936,7 @@ void readSyncBulkPayload(connection *conn) {
         }
         /* Note that there's no point in restarting the AOF on sync failure,
            it'll be restarted when sync succeeds or replica promoted. */
-        return;
+        return false;
     }

     /* Cleanup. */
@@ -2486,6 +2954,32 @@ void readSyncBulkPayload(connection *conn) {
         mi->repl_transfer_tmpfile = NULL;
     }

+    return true;
+
+error:
+    cancelReplicationHandshake(mi,true);
+    return false;
+}
+
void readSyncBulkPayload(connection *conn) {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
redisMaster *mi = (redisMaster*)connGetPrivateData(conn);
static int usemark = 0;
if (mi == nullptr) {
// We're about to be free'd so bail out
return;
}
if (mi->isRocksdbSnapshotRepl) {
if (!readSnapshotBulkPayload(conn, mi, rsi))
return;
} else {
if (!readSyncBulkPayloadRdb(conn, mi, rsi, usemark))
return;
}
// Should we update our database, or create from scratch?
int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster;
     /* Final setup of the connected slave <- master link */
     replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db);
     mi->repl_transfer_s = nullptr;
@@ -2513,6 +3007,7 @@ void readSyncBulkPayload(connection *conn) {
         g_pserver->master_repl_offset = mi->master->reploff;
         if (g_pserver->repl_batch_offStart >= 0)
             g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
+        saveMasterStatusToStorage();
     }

     clearReplicationId2();
@ -2528,17 +3023,13 @@ void readSyncBulkPayload(connection *conn) {
} }
/* Send the initial ACK immediately to put this replica in online state. */ /* Send the initial ACK immediately to put this replica in online state. */
if (usemark) replicationSendAck(mi); if (usemark || mi->isRocksdbSnapshotRepl) replicationSendAck(mi);
/* Restart the AOF subsystem now that we finished the sync. This /* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending * will trigger an AOF rewrite, and when done will start appending
* to the new file. */ * to the new file. */
if (g_pserver->aof_enabled) restartAOFAfterSYNC(); if (g_pserver->aof_enabled) restartAOFAfterSYNC();
return; return;
error:
cancelReplicationHandshake(mi,true);
return;
} }
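readSnapshotBulkPayload() itself is not among the hunks shown in this view. A rough replica-side sketch of the contract implied by the dispatch above, using the parseState field this merge adds to redisMaster (the sketch name, body, and buffer size are hypothetical):

/* Sketch only -- not the shipped implementation. Contract inferred from the
 * caller above: return true once the snapshot is fully applied; return false
 * while more data is still expected or after an error. */
bool readSnapshotBulkPayloadSketch(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) {
    (void)rsi;
    char buf[16 * 1024];
    int nread = connRead(conn, buf, sizeof(buf));
    if (nread <= 0) {
        cancelReplicationHandshake(mi, true);   /* connection error: abort the sync */
        return false;
    }
    /* Feed the bytes to mi->parseState, which would batch parsed key/value
     * pairs and flush them via bulkStorageInsert() (declared in server.h
     * below). Parsing elided in this sketch. */
    return false;   /* still transferring */
}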
char *receiveSynchronousResponse(redisMaster *mi, connection *conn) {
@@ -2861,12 +3352,15 @@ void parseMasterCapa(redisMaster *mi, sds strcapa)
    char *pchEnd = szStart;
    mi->isActive = false;
+    mi->isRocksdbSnapshotRepl = false;
    for (;;)
    {
        if (*pchEnd == ' ' || *pchEnd == '\0') {
            // Parse the word
            if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) {
                mi->isActive = true;
+            } else if (strncmp(szStart, "rocksdb-snapshot-save", pchEnd - szStart) == 0) {
+                mi->isRocksdbSnapshotRepl = true;
            }
            szStart = pchEnd + 1;
        }
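Note the asymmetry in capability names: the replica requests "rocksdb-snapshot-load" (see the syncWithMaster() hunk below), while the master advertises "rocksdb-snapshot-save", which is what parseMasterCapa() matches here. A standalone sketch of the same tokenization, with an exact-length check added, since strncmp(szStart, word, pchEnd - szStart) alone also accepts tokens that are strict prefixes of the word:

/* Standalone sketch of the capa-string tokenization above; not the shipped code. */
#include <cstring>

static bool tokenMatches(const char *start, const char *end, const char *word) {
    size_t n = (size_t)(end - start);
    return n == strlen(word) && memcmp(start, word, n) == 0;
}

static void parseCapaSketch(const char *capa, bool &active, bool &rocksdbSnapshot) {
    active = rocksdbSnapshot = false;
    const char *start = capa;
    for (const char *p = capa;; ++p) {
        if (*p == ' ' || *p == '\0') {
            if (tokenMatches(start, p, "active-replica")) active = true;
            else if (tokenMatches(start, p, "rocksdb-snapshot-save")) rocksdbSnapshot = true;
            if (*p == '\0') break;
            start = p + 1;
        }
    }
}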
@@ -2998,8 +3492,20 @@ void syncWithMaster(connection *conn) {
     * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
     *
     * The master will ignore capabilities it does not understand. */
-    err = sendCommand(conn,"REPLCONF",
-            "capa","eof","capa","psync2","capa","activeExpire",NULL);
+    std::vector<const char*> veccapabilities = {
+        "REPLCONF",
+        "capa","eof",
+        "capa","psync2",
+        "capa","activeExpire",
+    };
+
+    if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) {
+        veccapabilities.push_back("capa");
+        veccapabilities.push_back("rocksdb-snapshot-load");
+    }
+
+    err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr);
    if (err) goto write_error;

    /* Send UUID */
@@ -3343,6 +3849,15 @@ void replicationAbortSyncTransfer(redisMaster *mi) {
 *
 * Otherwise zero is returned and no operation is performed at all. */
int cancelReplicationHandshake(redisMaster *mi, int reconnect) {
+    if (mi->bulkreadBuffer != nullptr) {
+        sdsfree(mi->bulkreadBuffer);
+        mi->bulkreadBuffer = nullptr;
+    }
+
+    if (mi->parseState) {
+        delete mi->parseState;
+        mi->parseState = nullptr;
+    }
+
    if (mi->repl_state == REPL_STATE_TRANSFER) {
        replicationAbortSyncTransfer(mi);
        mi->repl_state = REPL_STATE_CONNECT;
@@ -3454,6 +3969,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) {
            mi->masterhost, mi->masterport);
        connectWithMaster(mi);
    }
+    saveMasterStatusToStorage();
    return mi;
}
@@ -3537,6 +4053,8 @@ void replicationUnsetMaster(redisMaster *mi) {
    /* Restart the AOF subsystem in case we shut it down during a sync when
     * we were still a slave. */
    if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC();
+
+    saveMasterStatusToStorage();
}

/* This function is called when the replica lose the connection with the
@@ -3568,6 +4086,8 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
                mi->masterhost, mi->masterport);
            connectWithMaster(mi);
        }
+
+        saveMasterStatusToStorage();
    }
}
@@ -4336,6 +4856,13 @@ void replicationCron(void) {
        client *replica = (client*)ln->value;
        std::unique_lock<fastlock> ul(replica->lock);

+        if (replica->replstate == SLAVE_STATE_FASTSYNC_DONE && !clientHasPendingReplies(replica)) {
+            serverLog(LL_WARNING, "Putting replica online");
+            replica->postFunction([](client *c){
+                putSlaveOnline(c);
+            });
+        }
+
        if (replica->replstate == SLAVE_STATE_ONLINE) {
            if (replica->flags & CLIENT_PRE_PSYNC)
                continue;

View File

@@ -369,6 +369,8 @@ public:
    bool operator==(const char *other) const
    {
+        if (other == nullptr || m_str == nullptr)
+            return other == m_str;
        return sdscmp(m_str, other) == 0;
    }
@@ -396,8 +398,11 @@ public:
    {}

    sdsstring(const sdsstring &other)
-        : sdsview(sdsdup(other.m_str))
-    {}
+        : sdsview()
+    {
+        if (other.m_str != nullptr)
+            m_str = sdsdup(other.m_str);
+    }

    sdsstring(const char *rgch, size_t cch)
        : sdsview(sdsnewlen(rgch, cch))
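Both changes make the sds wrappers tolerate a null underlying buffer. A minimal usage sketch, assuming a default-constructed sdsstring leaves m_str null (as the sdsview() delegation above suggests):

/* Usage sketch only -- illustrates the null-safety added above. */
sdsstring unset;                                    // no underlying sds buffer (assumed default ctor)
sdsstring copied(unset);                            // previously called sdsdup(nullptr); now stays null
bool bothNull = (unset == (const char*)nullptr);    // true: null now compares equal to null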

View File

@@ -3984,6 +3984,38 @@ void initServer(void) {
    slowlogInit();
    latencyMonitorInit();

+    if (g_pserver->m_pstorageFactory) {
+        g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb();
+        if (g_pserver->metadataDb) {
+            g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){
+                if (cb == sizeof(g_pserver->replid)) {
+                    memcpy(g_pserver->replid, data, cb);
+                }
+            });
+            g_pserver->metadataDb->retrieve("repl-offset", 11, [&](const char *, size_t, const void *data, size_t cb){
+                if (cb == sizeof(g_pserver->master_repl_offset)) {
+                    g_pserver->master_repl_offset = *(long long*)data;
+                }
+            });
+
+            listIter li;
+            listNode *ln;
+            listRewind(g_pserver->masters, &li);
+            while ((ln = listNext(&li)))
+            {
+                redisMaster *mi = (redisMaster*)listNodeValue(ln);
+                /* If we are a replica, create a cached master from this
+                 * information, in order to allow partial resynchronizations
+                 * with masters. */
+                replicationCacheMasterUsingMyself(mi);
+                g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t){
+                    selectDb(mi->cached_master, *(int*)data);
+                });
+            }
+        }
+    }
+
    /* We have to initialize storage providers after the cluster has been initialized */
    for (int idb = 0; idb < cserver.dbnum; ++idb)
    {
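saveMasterStatusToStorage(), called throughout the replication hunks above, is defined outside this view. A plausible counterpart to the retrieves in initServer(), persisting the same three keys (sketch only; the function body, insert signature details, and final flag semantics are assumptions):

/* Hypothetical sketch -- mirrors the "repl-id" / "repl-offset" /
 * "repl-stream-db" retrieves in initServer() above. */
void saveMasterStatusToStorageSketch() {
    if (g_pserver->metadataDb == nullptr)
        return;
    g_pserver->metadataDb->insert("repl-id", 7,
        (void*)g_pserver->replid, sizeof(g_pserver->replid),
        false /* flag as in the other insert calls in this diff; exact semantics not shown */);
    g_pserver->metadataDb->insert("repl-offset", 11,
        (void*)&g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), false);
    /* "repl-stream-db" would come from the active master link; the details
     * depend on code outside this section. */
}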

View File

@@ -607,12 +607,15 @@ typedef enum {
#define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */
#define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to replica. */
#define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
+#define SLAVE_STATE_FASTSYNC_TX 10
+#define SLAVE_STATE_FASTSYNC_DONE 11

/* Slave capabilities. */
#define SLAVE_CAPA_NONE 0
#define SLAVE_CAPA_EOF (1<<0)    /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */
+#define SLAVE_CAPA_ROCKSDB_SNAPSHOT (1<<3)

/* Synchronous read timeout - replica side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
@@ -1102,7 +1105,12 @@ public:
    size_t slots() const { return dictSlots(m_pdict); }
    size_t size(bool fCachedOnly = false) const;
-    void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
+    void expand(uint64_t slots) {
+        if (m_spstorage)
+            m_spstorage->expand(slots);
+        else
+            dictExpand(m_pdict, slots);
+    }

    void trackkey(robj_roptr o, bool fUpdate)
    {
@@ -1193,6 +1201,9 @@ public:
    bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }

+    std::unique_ptr<const StorageCache> CloneStorageCache() { return std::unique_ptr<const StorageCache>(m_spstorage->clone()); }
+    void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem);
+
    dict_iter find_cached_threadsafe(const char *key) const;

protected:
@@ -1352,6 +1363,8 @@ struct redisDb : public redisDbPersistentDataSnapshot
    using redisDbPersistentData::prepOverwriteForSnapshot;
    using redisDbPersistentData::FRehashing;
    using redisDbPersistentData::FTrackingChanges;
+    using redisDbPersistentData::CloneStorageCache;
+    using redisDbPersistentData::bulkStorageInsert;

public:
    expireset::setiter expireitr;
@@ -1673,7 +1686,7 @@ struct client {
    size_t argv_len_sumActive = 0;

    bool FPendingReplicaWrite() const {
-        return repl_curr_off != repl_end_off;
+        return repl_curr_off != repl_end_off && replstate == SLAVE_STATE_ONLINE;
    }

    // post a function from a non-client thread to run on its client thread
@@ -2058,6 +2071,7 @@ struct redisMaster {
    long long master_initial_offset; /* Master PSYNC offset. */
    bool isActive = false;
+    bool isRocksdbSnapshotRepl = false;
    int repl_state; /* Replication status if the instance is a replica */
    off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
    off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
@@ -2068,6 +2082,9 @@ struct redisMaster {
    time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
    time_t repl_down_since; /* Unix time at which link with master went down */

+    class SnapshotPayloadParseState *parseState;
+    sds bulkreadBuffer = nullptr;
+
    unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
    /* After we've connected with our master use the UUID in g_pserver->master */
    uint64_t mvccLastSync;
@@ -2154,6 +2171,7 @@ struct redisServer {
    mode_t umask; /* The umask value of the process on startup */
    std::atomic<int> hz; /* serverCron() calls frequency in hertz */
    int in_fork_child; /* indication that this is a fork child */
+    IStorage *metadataDb = nullptr;
    redisDb **db = nullptr;
    dict *commands; /* Command table */
    dict *orig_commands; /* Command table before command renaming. */
@@ -2831,7 +2849,7 @@ void addReplyDouble(client *c, double d);
void addReplyHumanLongDouble(client *c, long double d);
void addReplyLongLong(client *c, long long ll);
#ifdef __cplusplus
-void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix);
+void addReplyLongLongWithPrefix(client *c, long long ll, char prefix);
#endif
void addReplyArrayLen(client *c, long length);
void addReplyMapLen(client *c, long length);
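The bulkStorageInsert() declaration above pairs with the batched IStorage::bulkInsert() added earlier in this merge. A plausible forwarding body, assuming StorageCache exposes the same bulkInsert signature (the real definition lives in a .cpp not shown here):

/* Sketch only -- assumes StorageCache forwards bulkInsert with the same
 * signature as IStorage::bulkInsert. */
void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys,
        char **rgVals, size_t *rgcbVals, size_t celem) {
    serverAssert(m_spstorage != nullptr);
    m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem);
}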

View File

@@ -0,0 +1,29 @@
#pragma once
#include "rocksdb.h"
class RocksDBStorageFactory : public IStorageFactory
{
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last
std::vector<std::unique_ptr<rocksdb::ColumnFamilyHandle>> m_vecspcols;
std::shared_ptr<rocksdb::SstFileManager> m_pfilemanager;
std::string m_path;
bool m_fCreatedTempFolder = false;
public:
RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig);
~RocksDBStorageFactory();
virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
virtual IStorage *createMetadataDb() override;
virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override;
virtual bool FSlow() const override { return true; }
virtual size_t filedsRequired() const override;
std::string getTempFolder();
private:
void setVersion(rocksdb::ColumnFamilyHandle*);
};
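A hypothetical usage of the factory declared above; the constructor arguments are invented, and the identifier check mirrors what TestStorageFactory::createMetadataDb() does below:

/* Hypothetical usage -- paths, dbnum, and config are invented for illustration. */
RocksDBStorageFactory factory("./rocks.db", /*dbnum*/ 16, /*rgchConfig*/ nullptr, /*cchConfig*/ 0);
IStorage *metadataDb = factory.createMetadataDb();
/* The metadata DB is expected to carry METADATA_DB_IDENTIFIER under a
 * well-known key, so a mismatched or stale database can be rejected at
 * startup rather than silently reused. */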

View File

@@ -6,6 +6,13 @@ IStorage *TestStorageFactory::create(int, key_load_iterator, void *)
    return new (MALLOC_LOCAL) TestStorageProvider();
}

+IStorage *TestStorageFactory::createMetadataDb()
+{
+    IStorage *metadataDb = new (MALLOC_LOCAL) TestStorageProvider();
+    metadataDb->insert("KEYDB_METADATA_ID", strlen("KEYDB_METADATA_ID"), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false);
+    return metadataDb;
+}
+
const char *TestStorageFactory::name() const
{
    return "TEST Storage Provider";

View File

@@ -5,6 +5,7 @@
class TestStorageFactory : public IStorageFactory
{
    virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override;
+    virtual class IStorage *createMetadataDb() override;
    virtual const char *name() const override;
    virtual size_t totalDiskspaceUsed() const override { return 0; }
    virtual bool FSlow() const { return false; }

View File

@@ -0,0 +1,4 @@
diskless no replicas drop during rdb pipe
diskless slow replicas drop during rdb pipe
diskless timeout replicas drop during rdb pipe
Kill rdb child process if its dumping RDB is not useful

View File

@@ -0,0 +1,34 @@
proc prepare_value {size} {
set _v "c"
for {set i 1} {$i < $size} {incr i} {
append _v 0
}
return $_v
}
start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master}}} {
set slave [srv 0 client]
set slave_host [srv 0 host]
set slave_port [srv 0 port]
start_server {tags {} overrides {storage-provider {flash ./rocks.db.master}}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
test "fast replication with large value" {
set _v [prepare_value [expr 64*1024*1024]]
$master set key $_v
$slave replicaof $master_host $master_port
wait_for_condition 50 300 {
[lindex [$slave role] 0] eq {slave} &&
[string match {*master_link_status:up*} [$slave info replication]]
} else {
fail "Can't turn the instance into a replica"
}
assert_equal [$slave debug digest] [$master debug digest]
$slave replicaof no one
}
}
}

View File

@@ -0,0 +1,135 @@
# Creates a master-slave pair and breaks the link continuously to force
# partial resyncs attempts, all this while flooding the master with
# write queries.
#
# You can specify backlog size, ttl, delay before reconnection, test duration
# in seconds, and an additional condition to verify at the end.
#
# If reconnect is > 0, the test actually tries to break the connection and
# reconnect with the master; otherwise just the initial synchronization is
# checked for consistency.
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
start_server {tags {"repl"}} {
start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] {
set master [srv -1 client]
set master_host [srv -1 host]
set master_port [srv -1 port]
set slave [srv 0 client]
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000]
set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000]
test {Slave should be able to synchronize with the master} {
$slave slaveof $master_host $master_port
wait_for_condition 50 100 {
[lindex [r role] 0] eq {slave} &&
[lindex [r role] 3] eq {connected}
} else {
fail "Replication not started."
}
}
# Check that the background clients are actually writing.
test {Detect write load to master} {
wait_for_condition 50 1000 {
[$master dbsize] > 100
} else {
fail "Can't detect write load from background clients."
}
}
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" {
# Now while the clients are writing data, break the master-slave
# link multiple times.
if ($reconnect) {
for {set j 0} {$j < $duration*10} {incr j} {
after 100
# catch {puts "MASTER [$master dbsize] keys, REPLICA [$slave dbsize] keys"}
if {($j % 20) == 0} {
catch {
$slave debug restart
}
}
}
}
stop_bg_complex_data $load_handle0
stop_bg_complex_data $load_handle1
stop_bg_complex_data $load_handle2
# Wait for the slave to reach the "online"
# state from the POV of the master.
set retry 5000
while {$retry} {
set info [$master info]
if {[string match {*slave0:*state=online*} $info]} {
break
} else {
incr retry -1
after 100
}
}
if {$retry == 0} {
error "assertion:Slave not correctly synchronized"
}
# Wait until the slave acknowledges it is online so
# we are sure that DBSIZE and DEBUG DIGEST will not
# fail because of timing issues. (-LOADING error)
wait_for_condition 5000 100 {
[lindex [$slave role] 3] eq {connected}
} else {
fail "Slave still not connected after some time"
}
set retry 10
while {$retry && ([$master debug digest] ne [$slave debug digest])}\
{
after 1000
incr retry -1
}
assert {[$master dbsize] > 0}
if {[$master debug digest] ne [$slave debug digest]} {
set csv1 [csvdump r]
set csv2 [csvdump {r -1}]
set fd [open /tmp/repldump1.txt w]
puts -nonewline $fd $csv1
close $fd
set fd [open /tmp/repldump2.txt w]
puts -nonewline $fd $csv2
close $fd
puts "Master - Replica inconsistency"
puts "Run diff -u against /tmp/repldump*.txt for more info"
}
assert_equal [r debug digest] [r -1 debug digest]
eval $cond
}
}
}
}
foreach mdl {no yes} {
foreach sdl {disabled swapdb} {
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
} $mdl $sdl 0
test_psync {ok psync} 6 100000000 3600 0 {
assert {[s -1 sync_partial_ok] > 0}
} $mdl $sdl 1
test_psync {no backlog} 6 100 3600 0.5 {
assert {[s -1 sync_partial_err] > 0}
} $mdl $sdl 1
test_psync {ok after delay} 3 100000000 3600 3 {
assert {[s -1 sync_partial_ok] > 0}
} $mdl $sdl 1
test_psync {backlog expired} 3 100000000 1 3 {
assert {[s -1 sync_partial_err] > 0}
} $mdl $sdl 1
}
}

View File

@@ -1,4 +1,5 @@
set ::global_overrides {}
+set ::global_storage_provider {}
set ::tags {}
set ::valgrind_errors {}
@@ -392,6 +393,9 @@ proc start_server {options {code undefined}} {
    foreach {directive arguments} [concat $::global_overrides $overrides] {
        dict set config $directive $arguments
    }
+    foreach {directive argument1 argument2} $::global_storage_provider {
+        dict set config $directive $argument1 $argument2
+    }

    # remove directives that are marked to be omitted
    foreach directive $omit {

View File

@@ -47,6 +47,7 @@ set ::all_tests {
    integration/replication-3
    integration/replication-4
    integration/replication-psync
+    integration/replication-psync-flash
    integration/replication-active
    integration/replication-multimaster
    integration/replication-multimaster-connect
@@ -59,6 +60,7 @@ set ::all_tests {
    integration/failover
    integration/redis-cli
    integration/redis-benchmark
+    integration/replication-fast
    unit/pubsub
    unit/slowlog
    unit/scripting
@@ -692,6 +694,21 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
    } elseif {$opt eq {--help}} {
        print_help_screen
        exit 0
+    } elseif {$opt eq {--flash}} {
+        lappend ::global_storage_provider storage-provider
+        lappend ::global_storage_provider flash
+        lappend ::global_storage_provider ./rocks.db
+        set ::all_tests {
+            integration/replication
+            integration/replication-2
+            integration/replication-3
+            integration/replication-4
+            integration/replication-psync
+        }
+        set fp [open {./tests/integration/rdb-repl-tests} r]
+        set file_data [read $fp]
+        close $fp
+        set ::skiptests [split $file_data "\n"]
    } else {
        puts "Wrong argument: $opt"
        exit 1