Initial implementation of snapshot fast replication. There are still a few TODOs in progress

Former-commit-id: 0febdcdab8693af443f350968ed3d8c80106675d
This commit is contained in:
John Sully 2021-11-09 19:36:07 +00:00
parent 60f08d5f93
commit 1ebf62da26
20 changed files with 1072 additions and 103 deletions

View File

@ -14,3 +14,4 @@ then
exit 1
fi
# First pass: run the suite with the caller's arguments unchanged.
$TCLSH tests/test_helper.tcl "${@}"
# Second pass: same arguments plus a separate --flash argument.
# ("${@}--flash" would append the text to the last caller argument instead
# of passing the flag on its own.)
$TCLSH tests/test_helper.tcl "${@}" --flash

View File

@ -30,10 +30,10 @@ public:
virtual bool enumerate(callback fn) const = 0;
virtual size_t count() const = 0;
virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) {
virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
beginWriteBatch();
for (size_t ielem = 0; ielem < celem; ++ielem) {
insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false);
insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false);
}
endWriteBatch();
}

View File

@ -354,6 +354,7 @@ endif
REDIS_SERVER_NAME=keydb-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=keydb-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o t_nhash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd_server.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
KEYDB_SERVER_OBJ=SnapshotPayloadParseState.o
REDIS_CLI_NAME=keydb-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd_client.o monotonic.o cli_common.o mt19937-64.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark$(PROG_SUFFIX)
@ -410,7 +411,7 @@ endif
@touch $@
# keydb-server
$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ)
$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(KEYDB_SERVER_OBJ)
$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS)
# keydb-sentinel
@ -433,7 +434,7 @@ $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ)
$(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ)
$(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/hdr_histogram/hdr_histogram.o $(FINAL_LIBS)
DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d)
DEP = $(REDIS_SERVER_OBJ:%.o=%.d) $(KEYDB_SERVER_OBJ:%.o=%.d) $(REDIS_CLI_OBJ:%.o=%.d) $(REDIS_BENCHMARK_OBJ:%.o=%.d)
-include $(DEP)
# Because the jemalloc.h header is generated as a part of the jemalloc build,

View File

@ -0,0 +1,287 @@
#include "server.h"
#include "SnapshotPayloadParseState.h"
#include <sys/mman.h>
static const size_t SLAB_SIZE = 2*1024*1024; // 2 MB per slab. NOTE(review): the original note mentioned 64 MB, which matches queuedBatchLimit rather than this value -- presumably slabs are meant to be smaller than a queued batch; confirm
/* Bump allocator that carves buffers out of large zmalloc'd slabs.
 * Individual allocations are never released on their own; the memory is
 * returned when the SlabAllocator (and with it every Slab) is destroyed. */
class SlabAllocator
{
    /* One contiguous zmalloc'd region that is handed out front to back. */
    class Slab {
        char *m_pv = nullptr;       // start of the region (owned by this Slab)
        size_t m_cb = 0;            // capacity of the region in bytes
        size_t m_cbAllocated = 0;   // bytes already handed out

    public:
        explicit Slab(size_t cbAllocate)
            : m_pv((char*)zmalloc(cbAllocate)), m_cb(cbAllocate)
        {}

        ~Slab() {
            zfree(m_pv);
        }

        // The slab owns m_pv, so a copy would free the region twice.
        Slab(const Slab &) = delete;
        Slab &operator=(const Slab &) = delete;
        Slab &operator=(Slab &&) = delete;

        // noexcept so std::vector can relocate slabs by move when it grows.
        Slab(Slab &&other) noexcept
            : m_pv(other.m_pv), m_cb(other.m_cb), m_cbAllocated(other.m_cbAllocated)
        {
            other.m_pv = nullptr;
            other.m_cb = 0;
            other.m_cbAllocated = 0;
        }

        /* True when cb more bytes fit in this slab.  Written as a
         * subtraction so a huge cb cannot wrap the sum around. */
        bool canAllocate(size_t cb) const {
            return cb <= (m_cb - m_cbAllocated);
        }

        /* Returns cb bytes from the slab, or nullptr when they do not fit. */
        void *allocate(size_t cb) {
            if (!canAllocate(cb))
                return nullptr;
            void *pvret = m_pv + m_cbAllocated;
            m_cbAllocated += cb;
            return pvret;
        }
    };

    std::vector<Slab> m_vecslabs;

public:
    /* Returns cb bytes, opening a new slab when the current one is full.
     * A request larger than SLAB_SIZE gets a slab of exactly its own size. */
    void *allocate(size_t cb) {
        if (m_vecslabs.empty() || !m_vecslabs.back().canAllocate(cb)) {
            m_vecslabs.emplace_back(std::max(cb, SLAB_SIZE));
        }
        return m_vecslabs.back().allocate(cb);
    }
};
/* dict hash callback: hashes a NUL-terminated key over its strlen() bytes. */
static uint64_t dictCStringHash(const void *key) {
    const char *sz = (const char*)key;
    const size_t cch = strlen(sz);
    return dictGenHashFunction((unsigned char*)sz, cch);
}
static void dictKeyDestructor(void *privdata, void *key)
{
DICT_NOTUSED(privdata);
sdsfree((sds)key);
}
/* dict key-compare callback for NUL-terminated keys.
 * Returns 1 when both keys hold the same C string and 0 otherwise.
 * Uses strcmp so the keys are walked once and no length is narrowed to int. */
static int dictCStringCompare(void *, const void *key1, const void *key2)
{
    return strcmp(static_cast<const char*>(key1), static_cast<const char*>(key2)) == 0;
}
/* dictType for the metadata table: keys are compared and hashed as C strings
 * and released with sdsfree; values are stored inline in the entry, so no
 * value dup/destructor is installed. */
dictType metadataLongLongDictType = {
dictCStringHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictCStringCompare, /* key compare */
dictKeyDestructor, /* key destructor */
nullptr, /* val destructor */
nullptr, /* allow to expand */
nullptr /* async free destructor */
};
/* Decides which stage an array opened right now belongs to, based on the
 * stage on top of the stack and how many of its elements were consumed.
 * Throws a C string when the current position cannot contain an array. */
SnapshotPayloadParseState::ParseStageName SnapshotPayloadParseState::getNextStage() {
    if (stackParse.empty())
        return ParseStageName::Global;

    const ParseStage &parent = stackParse.top();

    if (parent.name == ParseStageName::None)
        return ParseStageName::Global;

    if (parent.name == ParseStageName::MetaData || parent.name == ParseStageName::Dataset)
        return ParseStageName::KeyValuePair;

    if (parent.name == ParseStageName::Databases)
        return ParseStageName::Dataset;

    if (parent.name == ParseStageName::Global) {
        // The global array holds the metadata section first, then the databases
        if (parent.arraycur == 0)
            return ParseStageName::MetaData;
        if (parent.arraycur == 1)
            return ParseStageName::Databases;
    }

    throw "Bad protocol: corrupt state";
}
/* Hands the queued key/value batch to the async work queue for insertion into
 * database current_database, then starts a fresh batch.
 * The key/value vectors and the slab allocator backing their buffers are
 * moved into the work item, which deletes the allocator after the insert. */
void SnapshotPayloadParseState::flushQueuedKeys() {
if (vecqueuedKeys.empty())
return;
// TODO: We can't finish parse until all the work functions finish
int idb = current_database;
serverAssert(vecqueuedKeys.size() == vecqueuedVals.size());
auto sizePrev = vecqueuedKeys.size();
// Counted before the work is queued; trimState() waits for this to reach zero
++insertsInFlight;
auto &insertsInFlightTmp = insertsInFlight; // We don't want to capture "this" because that's dangerous, so capture a reference to the counter instead
// NOTE(review): the counter is still a member of this object, so the object must outlive every queued work item -- confirm callers guarantee that
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
--insertsInFlightTmp;
delete pallocator;
});
// The previous allocator was released into the work item above; start a new one
m_spallocator = std::make_unique<SlabAllocator>();
cbQueued = 0;
// Pre-size the new batch to the size of the one just handed off
vecqueuedKeys.reserve(sizePrev);
vecqueuedKeysCb.reserve(sizePrev);
vecqueuedVals.reserve(sizePrev);
vecqueuedValsCb.reserve(sizePrev);
serverAssert(vecqueuedKeys.empty());
serverAssert(vecqueuedVals.empty());
}
/* Sets up an empty parser: no inserts in flight, a fresh slab allocator, an
 * empty metadata dict and a single placeholder stage on the stack. */
SnapshotPayloadParseState::SnapshotPayloadParseState()
    : insertsInFlight(0),
      m_spallocator(std::make_unique<SlabAllocator>())
{
    // This is to represent the first array the client is intended to send us
    ParseStage root;
    root.name = ParseStageName::None;
    root.arraylen = 1;
    stackParse.push(root);

    dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr);
}
/* Releases the metadata dict.
 * NOTE(review): work items queued by flushQueuedKeys() hold a reference to
 * insertsInFlight; this destructor does not wait for them -- confirm that
 * callers only destroy the object once no inserts are in flight. */
SnapshotPayloadParseState::~SnapshotPayloadParseState() {
dictRelease(dictLongLongMetaData);
}
/* Returns a printable name for the stage on top of the parse stack.
 * Returns "Unknown" when the stack is empty (trimState() can pop every
 * stage), since calling top() on an empty stack is undefined behaviour. */
const char *SnapshotPayloadParseState::getStateDebugName() {
    if (stackParse.empty())
        return "Unknown";

    switch (stackParse.top().name) {
        case ParseStageName::None:
            return "None";

        case ParseStageName::Global:
            return "Global";

        case ParseStageName::MetaData:
            return "MetaData";

        case ParseStageName::Databases:
            return "Databases";

        case ParseStageName::KeyValuePair:
            return "KeyValuePair";

        case ParseStageName::Dataset:
            return "Dataset";

        default:
            return "Unknown";
    }
}
/* Number of stages currently on the parse stack. */
size_t SnapshotPayloadParseState::depth() const {
    const size_t cstages = stackParse.size();
    return cstages;
}
void SnapshotPayloadParseState::trimState() {
while (!stackParse.empty() && (stackParse.top().arraycur == stackParse.top().arraylen))
stackParse.pop();
if (stackParse.empty()) {
flushQueuedKeys();
while (insertsInFlight > 0) {
// TODO: ProcessEventsWhileBlocked
aeReleaseLock();
aeAcquireLock();
}
}
}
void SnapshotPayloadParseState::pushArray(long long size) {
if (stackParse.empty())
throw "Bad Protocol: unexpected trailing data";
if (size == 0) {
stackParse.top().arraycur++;
return;
}
if (stackParse.top().name == ParseStageName::Databases) {
flushQueuedKeys();
current_database = stackParse.top().arraycur;
}
ParseStage stage;
stage.name = getNextStage();
stage.arraylen = size;
if (stage.name == ParseStageName::Dataset) {
g_pserver->db[current_database]->expand(stage.arraylen);
}
// Note: This affects getNextStage so ensure its after
stackParse.top().arraycur++;
stackParse.push(stage);
}
void SnapshotPayloadParseState::pushValue(const char *rgch, long long cch) {
if (stackParse.empty())
throw "Bad Protocol: unexpected trailing bulk string";
if (stackParse.top().arraycur >= static_cast<int>(stackParse.top().arrvalues.size()))
throw "Bad protocol: Unexpected value";
auto &stage = stackParse.top();
stage.arrvalues[stackParse.top().arraycur].first = (char*)m_spallocator->allocate(cch);
stage.arrvalues[stackParse.top().arraycur].second = cch;
memcpy(stage.arrvalues[stackParse.top().arraycur].first, rgch, cch);
stage.arraycur++;
switch (stage.name) {
case ParseStageName::KeyValuePair:
if (stage.arraycur == 2) {
// We loaded both pairs
if (stage.arrvalues[0].first == nullptr || stage.arrvalues[1].first == nullptr)
throw "Bad Protocol: Got array when expecing a string"; // A baddy could make us derefence the vector when its too small
vecqueuedKeys.push_back(stage.arrvalues[0].first);
vecqueuedKeysCb.push_back(stage.arrvalues[0].second);
vecqueuedVals.push_back(stage.arrvalues[1].first);
vecqueuedValsCb.push_back(stage.arrvalues[1].second);
stage.arrvalues[0].first = nullptr;
stage.arrvalues[1].first = nullptr;
cbQueued += vecqueuedKeysCb.back();
cbQueued += vecqueuedValsCb.back();
if (cbQueued >= queuedBatchLimit)
flushQueuedKeys();
}
break;
default:
throw "Bad Protocol: unexpected bulk string";
}
}
void SnapshotPayloadParseState::pushValue(long long value) {
if (stackParse.empty())
throw "Bad Protocol: unexpected integer value";
stackParse.top().arraycur++;
if (stackParse.top().arraycur != 2 || stackParse.top().arrvalues[0].first == nullptr)
throw "Bad Protocol: unexpected integer value";
dictEntry *de = dictAddRaw(dictLongLongMetaData, sdsnewlen(stackParse.top().arrvalues[0].first, stackParse.top().arrvalues[0].second), nullptr);
if (de == nullptr)
throw "Bad Protocol: metadata field sent twice";
de->v.s64 = value;
}
long long SnapshotPayloadParseState::getMetaDataLongLong(const char *szField) const {
dictEntry *de = dictFind(dictLongLongMetaData, szField);
if (de == nullptr) {
serverLog(LL_WARNING, "Master did not send field: %s", szField);
throw false;
}
return de->v.s64;
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <array>
#include <atomic>
#include <cstddef>
#include <memory>
#include <stack>
#include <utility>
#include <vector>
class SlabAllocator;
/* Incremental parser state for the fast-sync snapshot payload.
 * The payload is a tree of arrays; each open array is a ParseStage on
 * stackParse.  Completed key/value pairs are queued and inserted in batches
 * on the async work queue. */
class SnapshotPayloadParseState {
    enum class ParseStageName {
        None,
        Global,
        MetaData,
        Databases,
        Dataset,
        KeyValuePair,
    };

    /* One open array: its kind, declared length, how many elements were
     * consumed so far, and up to two bulk strings (pointer, length). */
    struct ParseStage {
        ParseStageName name = ParseStageName::None;
        long long arraylen = 0;
        long long arraycur = 0;
        std::array<std::pair<char*, size_t>, 2> arrvalues{};   // all {nullptr, 0}
    };

    std::stack<ParseStage> stackParse;

    // Parallel arrays describing the batch waiting to be inserted
    std::vector<char*> vecqueuedKeys;
    std::vector<size_t> vecqueuedKeysCb;
    std::vector<char*> vecqueuedVals;
    std::vector<size_t> vecqueuedValsCb;

    std::atomic<int> insertsInFlight{0};           // batches handed to the work queue and not yet finished
    std::unique_ptr<SlabAllocator> m_spallocator;  // backs the buffers of the current batch
    dict *dictLongLongMetaData = nullptr;          // integer metadata fields, keyed by name
    size_t cbQueued = 0;                           // key + value bytes in the current batch
    static const size_t queuedBatchLimit = 64*1024*1024; // 64 MB
    int current_database = -1;                     // -1 until the first dataset is opened

    ParseStageName getNextStage();
    void flushQueuedKeys();

public:
    SnapshotPayloadParseState();
    ~SnapshotPayloadParseState();

    long long getMetaDataLongLong(const char *field) const;

    const char *getStateDebugName();

    size_t depth() const;

    void trimState();
    void pushArray(long long size);
    void pushValue(const char *rgch, long long cch);
    void pushValue(long long value);
    // True when more than four batches per server thread are still in flight
    bool shouldThrottle() const { return insertsInFlight > (cserver.cthreads*4); }
};

View File

@ -97,17 +97,47 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
}
void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing);
void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem)
{
std::vector<dictEntry*> vechashes;
if (m_pdict != nullptr) {
vechashes.reserve(celem);
for (size_t ielem = 0; ielem < celem; ++ielem) {
dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry));
de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]);
de->v.u64 = 1;
vechashes.push_back(de);
}
}
std::unique_lock<fastlock> ul(m_lock);
bulkInsertsInProgress++;
if (m_pdict != nullptr) {
for (size_t ielem = 0; ielem < celem; ++ielem) {
cacheKey(rgkeys[ielem]);
// Link each pre-allocated entry (de->key holds the key's hash) into the dict
for (dictEntry *de : vechashes) {
    if (dictIsRehashing(m_pdict)) dictRehash(m_pdict,1);
    /* Get the index of the new element, or -1 if
     * the element already exists. */
    long index = _dictKeyIndex(m_pdict, de->key, (uint64_t)de->key, nullptr);
    if (index == -1) {
        // The hash is already tracked: bump the existing entry's count and
        // release the entry that was pre-allocated for it.  The existing
        // entry needs its own name; reusing `de` would shadow the loop
        // variable and free the dict's live entry instead.
        dictEntry *deExisting = dictFind(m_pdict, de->key);
        serverAssert(deExisting != nullptr);
        deExisting->v.s64++;
        m_collisionCount++;
        zfree(de);
    } else {
        int iht = dictIsRehashing(m_pdict) ? 1 : 0;
        de->next = m_pdict->ht[iht].table[index];
        m_pdict->ht[iht].table[index] = de;
        m_pdict->ht[iht].used++;
    }
}
}
ul.unlock();
m_spstorage->bulkInsert(rgkeys, rgvals, celem);
m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem);
bulkInsertsInProgress--;
}
@ -119,6 +149,14 @@ const StorageCache *StorageCache::clone()
return cacheNew;
}
/* Pre-sizes the key cache dict to `slots` entries; does nothing when the key
 * cache is not in use.  Runs under m_lock. */
void StorageCache::expand(uint64_t slots)
{
    std::unique_lock<fastlock> ul(m_lock);
    if (m_pdict == nullptr)
        return;
    dictExpand(m_pdict, slots);
}
void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
{
std::unique_lock<fastlock> ul(m_lock);

View File

@ -40,11 +40,12 @@ public:
void clear();
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key);
void emergencyFreeCache();
bool keycacheIsEnabled() const { return m_pdict != nullptr; }
void expand(uint64_t slots);
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }

View File

@ -2903,8 +2903,10 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
{
if (m_fAllChanged)
{
m_spstorage->clear();
storeDatabase();
if (dictSize(m_pdict) > 0 || m_spstorage->count() > 0) { // in some cases we may have pre-sized the StorageCache's dict, and we don't want clear to ruin it
m_spstorage->clear();
storeDatabase();
}
m_fAllChanged = 0;
}
else
@ -2936,15 +2938,19 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
dictIterator *di = dictGetIterator(dictNew);
dictEntry *de;
std::vector<sds> veckeys;
std::vector<size_t> veccbkeys;
std::vector<sds> vecvals;
std::vector<size_t> veccbvals;
while ((de = dictNext(di)) != nullptr)
{
robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o);
veckeys.push_back((sds)dictGetKey(de));
veccbkeys.push_back(sdslen((sds)dictGetKey(de)));
vecvals.push_back(temp);
veccbvals.push_back(sdslen(temp));
}
m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size());
m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
@ -2953,6 +2959,11 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
});
}
/* Forwards a batch of celem raw key/value buffers, with their byte lengths,
 * straight to the storage provider's bulkInsert. */
void redisDbPersistentData::bulkStorageInsert(
    char **rgKeys, size_t *rgcbKeys,
    char **rgVals, size_t *rgcbVals,
    size_t celem)
{
    m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem);
}
void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
{
if (m_pdbSnapshotStorageFlush)

View File

@ -63,7 +63,7 @@ static unsigned int dict_force_resize_ratio = 5;
static int _dictExpandIfNeeded(dict *ht);
static unsigned long _dictNextPower(unsigned long size);
static long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing);
long _dictKeyIndex(dict *ht, const void *key, uint64_t hash, dictEntry **existing);
static int _dictInit(dict *ht, dictType *type, void *privDataPtr);
/* -------------------------- hash functions -------------------------------- */
@ -1366,7 +1366,7 @@ static unsigned long _dictNextPower(unsigned long size)
*
* Note that if we are in the process of rehashing the hash table, the
* index is always returned in the context of the second (new) hash table. */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;

View File

@ -237,7 +237,7 @@ void clientInstallWriteHandler(client *c) {
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || c->replstate == SLAVE_STATE_FASTSYNC_DONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
AssertCorrectThread(c);
@ -1566,7 +1566,7 @@ bool freeClient(client *c) {
/* If a client is protected, yet we need to free it right now, make sure
* to at least use asynchronous freeing. */
if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending) {
if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending || c->replstate == SLAVE_STATE_FASTSYNC_TX) {
freeClientAsync(c);
return false;
}
@ -1791,7 +1791,7 @@ int writeToClient(client *c, int handler_installed) {
/* We can only directly read from the replication backlog if the client
is a replica, so only attempt to do so if that's the case. */
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) {
if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR) && c->replstate == SLAVE_STATE_ONLINE) {
std::unique_lock<fastlock> repl_backlog_lock (g_pserver->repl_backlog_lock);
while (clientHasPendingReplies(c)) {
@ -1833,9 +1833,11 @@ int writeToClient(client *c, int handler_installed) {
}
} else {
while(clientHasPendingReplies(c)) {
serverAssert(!(c->flags & CLIENT_SLAVE) || c->flags & CLIENT_MONITOR);
if (c->bufpos > 0) {
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
auto bufpos = c->bufpos;
lock.unlock();
nwritten = connWrite(c->conn,c->buf+c->sentlen,bufpos-c->sentlen);
lock.lock();
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
@ -1854,7 +1856,10 @@ int writeToClient(client *c, int handler_installed) {
continue;
}
nwritten = connWrite(c->conn, o->buf() + c->sentlen, o->used - c->sentlen);
auto used = o->used;
lock.unlock();
nwritten = connWrite(c->conn, o->buf() + c->sentlen, used - c->sentlen);
lock.lock();
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
@ -1993,7 +1998,7 @@ void ProcessPendingAsyncWrites()
ae_flags |= AE_BARRIER;
}
if (!((c->replstate == REPL_STATE_NONE ||
if (!((c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))))
continue;
@ -2063,9 +2068,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (flags & CLIENT_PROTECTED) continue;
if ((flags & CLIENT_PROTECTED) && !(flags & CLIENT_SLAVE)) continue;
std::unique_lock<decltype(c->lock)> lock(c->lock);
//std::unique_lock<decltype(c->lock)> lock(c->lock);
/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;
@ -2075,11 +2080,9 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
{
if (c->flags & CLIENT_CLOSE_ASAP)
{
lock.release(); // still locked
AeLocker ae;
ae.arm(c);
if (!freeClient(c)) // writeToClient will only async close, but there's no need to wait
c->lock.unlock(); // if we just got put on the async close list, then we need to remove the lock
ae.arm(nullptr);
freeClient(c); // writeToClient will only async close, but there's no need to wait
}
continue;
}
@ -3829,7 +3832,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) {
if (!c->conn) return; /* It is unsafe to free fake clients. */
serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
if (checkClientOutputBufferLimits(c)) {
if (checkClientOutputBufferLimits(c) && c->replstate != SLAVE_STATE_FASTSYNC_TX) {
sds client = catClientInfoString(sdsempty(),c);
freeClientAsync(c);

View File

@ -34,6 +34,7 @@
#include "cluster.h"
#include "bio.h"
#include "aelocker.h"
#include "SnapshotPayloadParseState.h"
#include <sys/time.h>
#include <unistd.h>
@ -929,6 +930,224 @@ need_full_resync:
return C_ERR;
}
int checkClientOutputBufferLimits(client *c);
class replicationBuffer {
std::vector<client *> replicas;
clientReplyBlock *reply = nullptr;
size_t writtenBytesTracker = 0;
public:
replicationBuffer() {
reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + (PROTO_REPLY_CHUNK_BYTES*2));
reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock);
reply->used = 0;
}
~replicationBuffer() {
zfree(reply);
}
void addReplica(client *replica) {
replicas.push_back(replica);
replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset());
// Optimize the socket for bulk transfer
//connDisableTcpNoDelay(replica->conn);
}
bool isActive() const { return !replicas.empty(); }
void flushData() {
if (reply == nullptr)
return;
size_t written = reply->used;
aeAcquireLock();
for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) {
auto replica = replicas[ireplica];
if (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) {
replica->replstate = REPL_STATE_NONE;
continue;
}
while (checkClientOutputBufferLimits(replica)
&& (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) {
aeReleaseLock();
usleep(10);
aeAcquireLock();
}
if (ireplica == replicas.size()-1 && replica->replyAsync == nullptr) {
if (prepareClientToWrite(replica) == C_OK) {
replica->replyAsync = reply;
reply = nullptr;
}
} else {
addReplyProto(replica, reply->buf(), reply->size);
}
}
ProcessPendingAsyncWrites();
replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end());
aeReleaseLock();
if (reply != nullptr) {
reply->used = 0;
}
writtenBytesTracker += written;
}
void addData(const char *data, unsigned long size) {
if (reply != nullptr && (size + reply->used) > reply->size)
flushData();
if (reply != nullptr && reply->size < size) {
serverAssert(reply->used == 0); // flush should have happened
zfree(reply);
reply = nullptr;
}
if (reply == nullptr) {
reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2)));
reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock);
reply->used = 0;
}
serverAssert((reply->size - reply->used) >= size);
memcpy(reply->buf() + reply->used, data, size);
reply->used += size;
}
void addLongWithPrefix(long val, char prefix) {
char buf[128];
int len;
buf[0] = prefix;
len = ll2string(buf+1,sizeof(buf)-1,val);
buf[len+1] = '\r';
buf[len+2] = '\n';
addData(buf, len+3);
}
void addArrayLen(int len) {
addLongWithPrefix(len, '*');
}
void addLong(long val) {
addLongWithPrefix(val, ':');
}
void addString(const char *s, unsigned long len) {
addLongWithPrefix(len, '$');
addData(s, len);
addData("\r\n", 2);
}
size_t cbWritten() const { return writtenBytesTracker; }
void end() {
flushData();
for (auto replica : replicas) {
// Return to original settings
//if (!g_pserver->repl_disable_tcp_nodelay)
// connEnableTcpNoDelay(replica->conn);
}
}
void putSlavesOnline() {
for (auto replica : replicas) {
replica->replstate = SLAVE_STATE_FASTSYNC_DONE;
replica->repl_put_online_on_ack = 1;
}
}
};
/* Starts a fast full sync: picks the first replica that advertised
 * SLAVE_CAPA_ROCKSDB_SNAPSHOT and is waiting for a BGSAVE to start, then
 * queues a work item that streams every database to it.
 *
 * Payload layout written by the work item:
 *   *2
 *     *1  *2  "repl-stream-db" :<db>          (metadata)
 *     *<dbnum>                                 (databases)
 *        *<key count>  then per key: *2 <key> <value>
 *
 * Must be called with the global lock held.  The value returned here is
 * always C_OK; the outcome of the transfer itself is only logged. */
int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
// TODO: This needs to be on a background thread
int retval = C_OK;
serverAssert(GlobalLocksAcquired());
serverLog(LL_NOTICE, "Starting storage provider fast full sync with target: %s", "disk");
std::shared_ptr<replicationBuffer> spreplBuf = std::make_shared<replicationBuffer>();
listNode *ln;
listIter li;
client *replica = nullptr;
listRewind(g_pserver->slaves, &li);
// The loop stops at the first eligible replica
while (replica == nullptr && (ln = listNext(&li))) {
client *replicaCur = (client*)listNodeValue(ln);
if ((replicaCur->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) {
replica = replicaCur;
spreplBuf->addReplica(replica);
replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX;
}
}
serverAssert(replica != nullptr);
// The buffer is moved into the work item, which keeps it alive for the transfer
g_pserver->asyncworkqueue->AddWorkFunction([repl_stream_db = rsi->repl_stream_db, spreplBuf = std::move(spreplBuf)]{
int retval = C_OK;
auto timeStart = ustime();
auto lastLogTime = timeStart;
size_t cbData = 0;
size_t cbLastUpdate = 0;
auto &replBuf = *spreplBuf;
replBuf.addArrayLen(2); // Two sections: Metadata and databases
// MetaData
replBuf.addArrayLen(1);
replBuf.addArrayLen(2);
replBuf.addString("repl-stream-db", 14);
replBuf.addLong(repl_stream_db);
// Databases
replBuf.addArrayLen(cserver.dbnum);
for (int idb = 0; idb < cserver.dbnum; ++idb) {
std::unique_ptr<const StorageCache> spsnapshot = g_pserver->db[idb]->CloneStorageCache();
size_t snapshotDeclaredCount = spsnapshot->count();
replBuf.addArrayLen(snapshotDeclaredCount);
size_t count = 0;
// The callback returns false once no replica is left, which stops the enumeration
bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, timeStart, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{
replBuf.addArrayLen(2);
replBuf.addString(rgchKey, cchKey);
replBuf.addString((const char *)rgchVal, cchVal);
++count;
// Check the clock only every 8092 keys; log progress at most once per 60 seconds
if ((count % 8092) == 0) {
auto curTime = ustime();
if ((curTime - lastLogTime) > 60000000) {
auto usec = curTime - lastLogTime;
serverLog(LL_NOTICE, "Replication status: Transferred %zuMB (%.2fGbit/s)", replBuf.cbWritten()/1024/1024, ((replBuf.cbWritten()-cbLastUpdate)*8.0)/(usec/1000000.0)/1000000000.0);
cbLastUpdate = replBuf.cbWritten();
lastLogTime = ustime();
}
}
cbData += cchKey + cchVal;
return replBuf.isActive();
});
if (!result) {
retval = C_ERR;
break;
}
// The array length was announced up front, so the enumeration must match it
serverAssert(count == snapshotDeclaredCount);
}
replBuf.end();
if (!replBuf.isActive()) {
retval = C_ERR;
}
serverLog(LL_NOTICE, "Snapshot replication done: %s", (retval == C_OK) ? "success" : "failed");
auto usec = ustime() - timeStart;
serverLog(LL_NOTICE, "Transferred %zuMB total (%zuMB data) in %.2f seconds. (%.2fGbit/s)", spreplBuf->cbWritten()/1024/1024, cbData/1024/1024, usec/1000000.0, (spreplBuf->cbWritten()*8.0)/(usec/1000000.0)/1000000000.0);
if (retval == C_OK) {
aeAcquireLock();
replicationScriptCacheFlush();
replBuf.putSlavesOnline();
aeReleaseLock();
}
});
return retval;
}
/* Start a BGSAVE for replication goals, which is, selecting the disk or
* socket target depending on the configuration, and making sure that
* the script cache is flushed before to start.
@ -962,7 +1181,9 @@ int startBgsaveForReplication(int mincapa) {
/* Only do rdbSave* when rsiptr is not NULL,
* otherwise replica will miss repl-stream-db. */
if (rsiptr) {
if (socket_target)
if (mincapa & SLAVE_CAPA_ROCKSDB_SNAPSHOT && g_pserver->m_pstorageFactory)
retval = rdbSaveSnapshotForReplication(rsiptr);
else if (socket_target)
retval = rdbSaveToSlavesSockets(rsiptr);
else
retval = rdbSaveBackground(rsiptr);
@ -1334,6 +1555,8 @@ void replconfCommand(client *c) {
c->slave_capa |= SLAVE_CAPA_PSYNC2;
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "rocksdb-snapshot-load"))
c->slave_capa |= SLAVE_CAPA_ROCKSDB_SNAPSHOT;
fCapaCommand = true;
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
@ -1358,7 +1581,7 @@ void replconfCommand(client *c) {
* quick check first (instead of waiting for the next ACK. */
if (g_pserver->child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
checkChildrenDone();
if (c->repl_put_online_on_ack && c->replstate == SLAVE_STATE_ONLINE)
if (c->repl_put_online_on_ack && (c->replstate == SLAVE_STATE_ONLINE || c->replstate == SLAVE_STATE_FASTSYNC_DONE))
putSlaveOnline(c);
/* Note: this command does not reply anything! */
return;
@ -1400,6 +1623,8 @@ void replconfCommand(client *c) {
sds reply = sdsnew("+OK");
if (g_pserver->fActiveReplica)
reply = sdscat(reply, " active-replica");
if (g_pserver->m_pstorageFactory && (c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && !g_pserver->fActiveReplica)
reply = sdscat(reply, " rocksdb-snapshot-save");
reply = sdscat(reply, "\r\n");
addReplySds(c, reply);
} else {
@ -2074,25 +2299,163 @@ void disklessLoadDiscardBackup(const dbBackup *buckup, int flag) {
discardDbBackup(buckup, flag, replicationEmptyDbCallback);
}
/* Parses "<prefix><digits>\r\n" at the start of rgch (cch bytes available).
 * The prefix byte itself is not inspected.  Stores the non-negative number in
 * *pvalue and returns the number of bytes consumed (prefix, digits and CRLF).
 * Throws `true` when the line is not complete yet (caller should read more)
 * and `false`, after logging, when the line is malformed. */
size_t parseCount(const char *rgch, size_t cch, long long *pvalue) {
    size_t cchNumeral = 0;

    // isdigit() is only defined for unsigned char values and EOF, so the
    // byte must not be passed as a possibly negative plain char
    while ((cch > (cchNumeral+1)) && isdigit(static_cast<unsigned char>(rgch[1 + cchNumeral]))) ++cchNumeral;

    if (cch < (cchNumeral+1+2)) { // +2 is for the \r\n we expect
        throw true; // continuable
    }

    if (rgch[cchNumeral+1] != '\r' || rgch[cchNumeral+2] != '\n') {
        serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch);
        throw false;
    }

    if (!string2ll(rgch+1, cchNumeral, pvalue) || *pvalue < 0) {
        serverLog(LL_WARNING, "Bad protocol from MASTER: %s", rgch);
        throw false;
    }

    return cchNumeral + 3;
}
/* Reads and parses the fast-sync payload from the master connection.
 * Up to 10 reads of PROTO_IOBUF_LEN bytes are attempted per call; parsed
 * items are fed to mi->parseState and unconsumed bytes stay in
 * mi->bulkreadBuffer for the next call.
 * Returns true once the whole payload was parsed (rsi.repl_stream_db is then
 * set and the buffer and parse state are released).  Returns false when more
 * data is needed, when throttled, or after the handshake was cancelled
 * because of an error. */
bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi) {
int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster;
serverAssert(GlobalLocksAcquired());
serverAssert(mi->master == nullptr);
bool fFinished = false;
// First call for this transfer: create the buffer and parse state
if (mi->bulkreadBuffer == nullptr) {
mi->bulkreadBuffer = sdsempty();
mi->parseState = new SnapshotPayloadParseState();
if (!fUpdate) {
int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
}
for (int iter = 0; iter < 10; ++iter) {
// Stop reading while too many insert batches are still in flight
if (mi->parseState->shouldThrottle())
return false;
auto readlen = PROTO_IOBUF_LEN;
auto qblen = sdslen(mi->bulkreadBuffer);
mi->bulkreadBuffer = sdsMakeRoomFor(mi->bulkreadBuffer, readlen);
auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen);
if (nread <= 0) {
if (connGetState(conn) != CONN_STATE_CONNECTED)
cancelReplicationHandshake(mi, true);
return false;
}
mi->repl_transfer_lastio = g_pserver->unixtime;
sdsIncrLen(mi->bulkreadBuffer,nread);
// offset = number of bytes at the front of the buffer that were fully consumed
size_t offset = 0;
try {
if (sdslen(mi->bulkreadBuffer) > cserver.client_max_querybuf_len) {
throw "Full Sync Streaming Buffer Exceeded (increase client_max_querybuf_len)";
}
while (sdslen(mi->bulkreadBuffer) > offset) {
// Pop completed items
mi->parseState->trimState();
if (mi->bulkreadBuffer[offset] == '*') {
// Starting an array
long long arraySize;
// Lets read the array length
offset += parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &arraySize);
mi->parseState->pushArray(arraySize);
} else if (mi->bulkreadBuffer[offset] == '$') {
// Loading in a string
long long payloadsize = 0;
// Lets read the string length
size_t offsetCount = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &payloadsize);
// OK we know how long the string is, now lets make sure the payload is here.
if (sdslen(mi->bulkreadBuffer) < (offset + offsetCount + payloadsize + 2)) {
goto LContinue; // wait for more data (note: we could throw true here, but throw is way more expensive)
}
mi->parseState->pushValue(mi->bulkreadBuffer + offset + offsetCount, payloadsize);
// On to the next one
offset += offsetCount + payloadsize + 2;
} else if (mi->bulkreadBuffer[offset] == ':') {
// Numeral
long long value;
size_t offsetValue = parseCount(mi->bulkreadBuffer + offset, sdslen(mi->bulkreadBuffer) - offset, &value);
mi->parseState->pushValue(value);
offset += offsetValue;
} else {
serverLog(LL_WARNING, "Bad protocol from MASTER: %s", mi->bulkreadBuffer+offset);
cancelReplicationHandshake(mi, true);
return false;
}
}
// Everything in the buffer was consumed
sdsrange(mi->bulkreadBuffer, offset, -1);
offset = 0;
// Cleanup the remaining stack
mi->parseState->trimState();
// Stages are still open: the payload is not complete yet
if (mi->parseState->depth() != 0)
return false;
rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db");
fFinished = true;
break; // We're done!!!
}
// A C string is a fatal protocol error raised by the parser
catch (const char *sz) {
serverLog(LL_WARNING, "%s", sz);
cancelReplicationHandshake(mi, true);
return false;
// A bool comes from parseCount/getMetaDataLongLong: true means "need more data", false is fatal
} catch (bool fContinue) {
if (!fContinue) {
cancelReplicationHandshake(mi, true);
return false;
}
}
LContinue:
// Drop the consumed prefix and keep the partial item for the next read
sdsrange(mi->bulkreadBuffer, offset, -1);
}
if (!fFinished)
return false;
serverLog(LL_NOTICE, "Fast sync complete");
sdsfree(mi->bulkreadBuffer);
mi->bulkreadBuffer = nullptr;
delete mi->parseState;
mi->parseState = nullptr;
return true;
}
/* Asynchronously read the SYNC payload we receive from a master */
#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(connection *conn) {
bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, int &usemark) {
char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
const dbBackup *diskless_load_backup = NULL;
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
rsi.fForceSetKey = !!g_pserver->fActiveReplica;
int empty_db_flags = g_pserver->repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
off_t left;
// Should we update our database, or create from scratch?
int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster;
redisMaster *mi = (redisMaster*)connGetPrivateData(conn);
if (mi == nullptr) {
// We're about to be free'd so bail out
return;
}
serverAssert(GlobalLocksAcquired());
serverAssert(mi->master == nullptr);
@ -2101,7 +2464,6 @@ void readSyncBulkPayload(connection *conn) {
* from the server: when they match, we reached the end of the transfer. */
static char eofmark[CONFIG_RUN_ID_SIZE];
static char lastbytes[CONFIG_RUN_ID_SIZE];
static int usemark = 0;
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
@ -2123,7 +2485,7 @@ void readSyncBulkPayload(connection *conn) {
* the connection live. So we refresh our last interaction
* timestamp. */
mi->repl_transfer_lastio = g_pserver->unixtime;
return;
return false;
} else if (buf[0] != '$') {
serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
goto error;
@ -2157,7 +2519,7 @@ void readSyncBulkPayload(connection *conn) {
(long long) mi->repl_transfer_size,
use_diskless_load? "to parser":"to disk");
}
return;
return false;
}
if (!use_diskless_load) {
@ -2174,12 +2536,12 @@ void readSyncBulkPayload(connection *conn) {
if (nread <= 0) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
/* equivalent to EAGAIN */
return;
return false;
}
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake(mi, true);
return;
return false;
}
g_pserver->stat_net_input_bytes += nread;
@ -2248,7 +2610,7 @@ void readSyncBulkPayload(connection *conn) {
/* If the transfer is yet not complete, we need to read more, so
* return ASAP and wait for the handler to be called again. */
if (!eof_reached) return;
if (!eof_reached) return false;
}
/* We reach this point in one of the following cases:
@ -2320,7 +2682,7 @@ void readSyncBulkPayload(connection *conn) {
/* Note that there's no point in restarting the AOF on SYNC
* failure, it'll be restarted when sync succeeds or the replica
* gets promoted. */
return;
return false;
}
/* RDB loading succeeded if we reach this point. */
@ -2340,7 +2702,7 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_WARNING,"Replication stream EOF marker is broken");
cancelReplicationHandshake(mi,true);
rioFreeConn(&rdb, NULL);
return;
return false;
}
}
@ -2372,7 +2734,7 @@ void readSyncBulkPayload(connection *conn) {
"MASTER <-> REPLICA synchronization: %s",
strerror(errno));
cancelReplicationHandshake(mi,true);
return;
return false;
}
/* Rename rdb like renaming rewrite aof asynchronously. */
@ -2385,7 +2747,7 @@ void readSyncBulkPayload(connection *conn) {
g_pserver->rdb_filename, strerror(errno));
cancelReplicationHandshake(mi,true);
if (old_rdb_fd != -1) close(old_rdb_fd);
return;
return false;
}
rdb_filename = g_pserver->rdb_filename;
@ -2415,7 +2777,7 @@ void readSyncBulkPayload(connection *conn) {
}
/* Note that there's no point in restarting the AOF on sync failure,
it'll be restarted when sync succeeds or replica promoted. */
return;
return false;
}
/* Cleanup. */
@ -2433,6 +2795,32 @@ void readSyncBulkPayload(connection *conn) {
mi->repl_transfer_tmpfile = NULL;
}
return true;
error:
cancelReplicationHandshake(mi,true);
return false;
}
void readSyncBulkPayload(connection *conn) {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
redisMaster *mi = (redisMaster*)connGetPrivateData(conn);
static int usemark = 0;
if (mi == nullptr) {
// We're about to be free'd so bail out
return;
}
if (mi->isRocksdbSnapshotRepl) {
if (!readSnapshotBulkPayload(conn, mi, rsi))
return;
} else {
if (!readSyncBulkPayloadRdb(conn, mi, rsi, usemark))
return;
}
// Should we update our database, or create from scratch?
int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster;
/* Final setup of the connected slave <- master link */
replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db);
mi->repl_transfer_s = nullptr;
@ -2475,17 +2863,13 @@ void readSyncBulkPayload(connection *conn) {
}
/* Send the initial ACK immediately to put this replica in online state. */
if (usemark) replicationSendAck(mi);
if (usemark || mi->isRocksdbSnapshotRepl) replicationSendAck(mi);
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (g_pserver->aof_enabled) restartAOFAfterSYNC();
return;
error:
cancelReplicationHandshake(mi,true);
return;
}
char *receiveSynchronousResponse(redisMaster *mi, connection *conn) {
@ -2808,12 +3192,15 @@ void parseMasterCapa(redisMaster *mi, sds strcapa)
char *pchEnd = szStart;
mi->isActive = false;
mi->isRocksdbSnapshotRepl = false;
for (;;)
{
if (*pchEnd == ' ' || *pchEnd == '\0') {
// Parse the word
if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) {
mi->isActive = true;
} else if (strncmp(szStart, "rocksdb-snapshot-save", pchEnd - szStart) == 0) {
mi->isRocksdbSnapshotRepl = true;
}
szStart = pchEnd + 1;
}
@ -2945,8 +3332,20 @@ void syncWithMaster(connection *conn) {
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
err = sendCommand(conn,"REPLCONF",
"capa","eof","capa","psync2","capa","activeExpire",NULL);
std::vector<const char*> veccapabilities = {
"REPLCONF",
"capa","eof",
"capa","psync2",
"capa","activeExpire",
};
if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica) {
veccapabilities.push_back("capa");
veccapabilities.push_back("rocksdb-snapshot-load");
}
err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr);
if (err) goto write_error;
/* Send UUID */
@ -3290,6 +3689,15 @@ void replicationAbortSyncTransfer(redisMaster *mi) {
*
* Otherwise zero is returned and no operation is performed at all. */
int cancelReplicationHandshake(redisMaster *mi, int reconnect) {
if (mi->bulkreadBuffer != nullptr) {
sdsfree(mi->bulkreadBuffer);
mi->bulkreadBuffer = nullptr;
}
if (mi->parseState) {
delete mi->parseState;
mi->parseState = nullptr;
}
if (mi->repl_state == REPL_STATE_TRANSFER) {
replicationAbortSyncTransfer(mi);
mi->repl_state = REPL_STATE_CONNECT;
@ -4283,6 +4691,13 @@ void replicationCron(void) {
client *replica = (client*)ln->value;
std::unique_lock<fastlock> ul(replica->lock);
if (replica->replstate == SLAVE_STATE_FASTSYNC_DONE && !clientHasPendingReplies(replica)) {
serverLog(LL_WARNING, "Putting replica online");
replica->postFunction([](client *c){
putSlaveOnline(c);
});
}
if (replica->replstate == SLAVE_STATE_ONLINE) {
if (replica->flags & CLIENT_PRE_PSYNC)
continue;

View File

@ -369,6 +369,8 @@ public:
// Compare against another string. A null pointer on either side is only
// equal to another null pointer; otherwise the contents are compared.
bool operator==(const char *other) const
{
    const bool fEitherNull = (m_str == nullptr) || (other == nullptr);
    if (fEitherNull)
        return other == m_str;
    return !sdscmp(m_str, other);
}
@ -396,8 +398,11 @@ public:
{}
sdsstring(const sdsstring &other)
: sdsview(sdsdup(other.m_str))
{}
: sdsview()
{
if (other.m_str != nullptr)
m_str = sdsdup(other.m_str);
}
sdsstring(const char *rgch, size_t cch)
: sdsview(sdsnewlen(rgch, cch))

View File

@ -607,12 +607,15 @@ typedef enum {
#define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */
#define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to replica. */
#define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
#define SLAVE_STATE_FASTSYNC_TX 10 /* NOTE(review): presumably the snapshot fast-sync payload is being transmitted - confirm. */
#define SLAVE_STATE_FASTSYNC_DONE 11 /* replicationCron puts a replica in this state online once it has no pending replies. */
/* Slave capabilities. */
#define SLAVE_CAPA_NONE 0
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */
#define SLAVE_CAPA_ROCKSDB_SNAPSHOT (1<<3) /* NOTE(review): presumably set when the replica sends "capa rocksdb-snapshot-load" - confirm. */
/* Synchronous read timeout - replica side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
@ -1102,7 +1105,12 @@ public:
size_t slots() const { return dictSlots(m_pdict); }
size_t size(bool fCachedOnly = false) const;
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
// Pre-size for `slots` entries: delegate to the storage cache when one is
// attached, otherwise grow the in-memory dict directly.
void expand(uint64_t slots) {
    if (!m_spstorage) {
        dictExpand(m_pdict, slots);
        return;
    }
    m_spstorage->expand(slots);
}
void trackkey(robj_roptr o, bool fUpdate)
{
@ -1193,6 +1201,9 @@ public:
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
std::unique_ptr<const StorageCache> CloneStorageCache() { return std::unique_ptr<const StorageCache>(m_spstorage->clone()); }
void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem);
dict_iter find_cached_threadsafe(const char *key) const;
protected:
@ -1352,6 +1363,8 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::prepOverwriteForSnapshot;
using redisDbPersistentData::FRehashing;
using redisDbPersistentData::FTrackingChanges;
using redisDbPersistentData::CloneStorageCache;
using redisDbPersistentData::bulkStorageInsert;
public:
expireset::setiter expireitr;
@ -1673,7 +1686,7 @@ struct client {
size_t argv_len_sumActive = 0;
bool FPendingReplicaWrite() const {
return repl_curr_off != repl_end_off;
return repl_curr_off != repl_end_off && replstate == SLAVE_STATE_ONLINE;
}
// post a function from a non-client thread to run on its client thread
@ -2058,6 +2071,7 @@ struct redisMaster {
long long master_initial_offset; /* Master PSYNC offset. */
bool isActive = false;
bool isRocksdbSnapshotRepl = false;
int repl_state; /* Replication status if the instance is a replica */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
@ -2068,6 +2082,9 @@ struct redisMaster {
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
time_t repl_down_since; /* Unix time at which link with master went down */
class SnapshotPayloadParseState *parseState;
sds bulkreadBuffer = nullptr;
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
/* After we've connected with our master use the UUID in g_pserver->master */
uint64_t mvccLastSync;
@ -2829,7 +2846,7 @@ void addReplyDouble(client *c, double d);
void addReplyHumanLongDouble(client *c, long double d);
void addReplyLongLong(client *c, long long ll);
#ifdef __cplusplus
void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix);
void addReplyLongLongWithPrefix(client *c, long long ll, char prefix);
#endif
void addReplyArrayLen(client *c, long length);
void addReplyMapLen(client *c, long length);

View File

@ -2,9 +2,13 @@
#include <string>
#include <sstream>
#include <mutex>
#include <unistd.h>
#include "rocksdbfactor_internal.h"
static const char keyprefix[] = INTERNAL_KEY_PREFIX;
rocksdb::Options DefaultRocksDBOptions();
bool FInternalKey(const char *key, size_t cch)
{
if (cch >= sizeof(INTERNAL_KEY_PREFIX))
@ -15,8 +19,8 @@ bool FInternalKey(const char *key, size_t cch)
return false;
}
RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count)
: m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_count(count)
RocksDBStorageProvider::RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count)
: m_pfactory(pfactory), m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_count(count)
{
m_readOptionsTemplate = rocksdb::ReadOptions();
m_readOptionsTemplate.verify_checksums = false;
@ -38,13 +42,57 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
++m_count;
}
void RocksDBStorageProvider::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem)
{
auto spbatch = std::make_unique<rocksdb::WriteBatch>();
for (size_t ielem = 0; ielem < celem; ++ielem) {
spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(rgkeys[ielem], sdslen(rgkeys[ielem])), rocksdb::Slice(rgvals[ielem], sdslen(rgvals[ielem])));
if (celem >= 16384) {
rocksdb::Options options = DefaultRocksDBOptions();
rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), options, options.comparator);
std::string file_path = m_pfactory->getTempFolder() + "/tmpIngest.";
file_path += std::to_string(gettid());
file_path += ".sst";
rocksdb::Status s = sst_file_writer.Open(file_path);
if (!s.ok())
goto LFallback;
// Insert rows into the SST file, note that inserted keys must be
// strictly increasing (based on options.comparator)
for (size_t ielem = 0; ielem < celem; ++ielem) {
s = sst_file_writer.Put(rocksdb::Slice(rgkeys[ielem], rgcbkeys[ielem]), rocksdb::Slice(rgvals[ielem], rgcbvals[ielem]));
if (!s.ok()) {
unlink(file_path.c_str());
goto LFallback;
}
}
s = sst_file_writer.Finish();
if (!s.ok()) {
unlink(file_path.c_str());
goto LFallback;
}
auto ingestOptions = rocksdb::IngestExternalFileOptions();
ingestOptions.move_files = true;
//ingestOptions.verify_file_checksum = false;
ingestOptions.write_global_seqno = false;
// Ingest the external SST file into the DB
s = m_spdb->IngestExternalFile(m_spcolfamily.get(), {file_path}, ingestOptions);
if (!s.ok()) {
unlink(file_path.c_str());
goto LFallback;
}
unlink(file_path.c_str());
} else {
LFallback:
auto spbatch = std::make_unique<rocksdb::WriteBatch>();
for (size_t ielem = 0; ielem < celem; ++ielem) {
spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(rgkeys[ielem], rgcbkeys[ielem]), rocksdb::Slice(rgvals[ielem], rgcbvals[ielem]));
}
m_spdb->Write(WriteOptions(), spbatch.get());
}
m_spdb->Write(WriteOptions(), spbatch.get());
std::unique_lock<fastlock> l(m_lock);
m_count += celem;
}
@ -123,7 +171,7 @@ bool RocksDBStorageProvider::enumerate(callback fn) const
const IStorage *RocksDBStorageProvider::clone() const
{
const rocksdb::Snapshot *psnapshot = const_cast<RocksDBStorageProvider*>(this)->m_spdb->GetSnapshot();
return new RocksDBStorageProvider(const_cast<RocksDBStorageProvider*>(this)->m_spdb, const_cast<RocksDBStorageProvider*>(this)->m_spcolfamily, psnapshot, m_count);
return new RocksDBStorageProvider(m_pfactory, const_cast<RocksDBStorageProvider*>(this)->m_spdb, const_cast<RocksDBStorageProvider*>(this)->m_spcolfamily, psnapshot, m_count);
}
RocksDBStorageProvider::~RocksDBStorageProvider()

View File

@ -8,9 +8,11 @@
#define INTERNAL_KEY_PREFIX "\x00\x04\x03\x00\x05\x02\x04"
static const char count_key[] = INTERNAL_KEY_PREFIX "__keydb__count\1";
static const char version_key[] = INTERNAL_KEY_PREFIX "__keydb__version\1";
class RocksDBStorageFactory;
class RocksDBStorageProvider : public IStorage
{
RocksDBStorageFactory *m_pfactory;
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last
std::unique_ptr<rocksdb::WriteBatch> m_spbatch;
const rocksdb::Snapshot *m_psnapshot = nullptr;
@ -20,7 +22,7 @@ class RocksDBStorageProvider : public IStorage
mutable fastlock m_lock {"RocksDBStorageProvider"};
public:
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count);
RocksDBStorageProvider(RocksDBStorageFactory *pfactory, std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count);
~RocksDBStorageProvider();
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) override;
@ -34,7 +36,7 @@ public:
virtual void beginWriteBatch() override;
virtual void endWriteBatch() override;
virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) override;
virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override;
virtual void batch_lock() override;
virtual void batch_unlock() override;

View File

@ -0,0 +1,28 @@
#pragma once
#include "rocksdb.h"
// Storage factory backed by a RocksDB database. It is declared in this
// internal header so that RocksDBStorageProvider can call back into it
// (the provider uses getTempFolder() when building SST files for bulk ingest).
class RocksDBStorageFactory : public IStorageFactory
{
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last
// NOTE(review): presumably one column family handle per logical database - confirm
std::vector<std::unique_ptr<rocksdb::ColumnFamilyHandle>> m_vecspcols;
// SST file manager; the constructor installs it as options.sst_file_manager
std::shared_ptr<rocksdb::SstFileManager> m_pfilemanager;
// The dbfile path passed to the constructor
std::string m_path;
// Set once getTempFolder() has successfully created the temp directory
bool m_fCreatedTempFolder = false;
public:
RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig);
~RocksDBStorageFactory();
virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override;
virtual bool FSlow() const override { return true; }
virtual size_t filedsRequired() const override;
// Returns the path of the temp directory under m_path ("<m_path>/keydb_tmp/"),
// attempting to create it if this factory has not created it yet
std::string getTempFolder();
private:
void setVersion(rocksdb::ColumnFamilyHandle*);
};

View File

@ -6,28 +6,23 @@
#include <rocksdb/sst_file_manager.h>
#include <rocksdb/utilities/convenience.h>
#include <rocksdb/slice_transform.h>
#include "rocksdbfactor_internal.h"
#include <sys/types.h>
#include <sys/stat.h>
class RocksDBStorageFactory : public IStorageFactory
{
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last
std::vector<std::unique_ptr<rocksdb::ColumnFamilyHandle>> m_vecspcols;
std::shared_ptr<rocksdb::SstFileManager> m_pfilemanager;
public:
RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig);
~RocksDBStorageFactory();
virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override;
virtual bool FSlow() const override { return true; }
virtual size_t filedsRequired() const override;
private:
void setVersion(rocksdb::ColumnFamilyHandle*);
};
// Returns the baseline RocksDB options used by the storage factory and by the
// SST file writer in the provider's bulk-insert path, so both agree on
// settings such as compression and the prefix extractor.
rocksdb::Options DefaultRocksDBOptions() {
    rocksdb::Options opts;

    // Background work limits
    opts.max_background_flushes = 2;
    opts.max_background_compactions = 4;

    // Compaction and on-disk format
    opts.compaction_pri = rocksdb::kMinOverlappingRatio;
    opts.compression = rocksdb::kNoCompression;
    opts.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(0));

    // I/O behaviour
    opts.bytes_per_sync = 1048576;
    opts.allow_mmap_reads = true;
    opts.enable_pipelined_write = true;
    opts.avoid_unnecessary_blocking_io = true;

    return opts;
}
IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const char *rgchConfig, size_t cchConfig)
{
@ -35,6 +30,7 @@ IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const
}
RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig)
: m_path(dbfile)
{
// Get the count of column families in the actual database
std::vector<std::string> vecT;
@ -49,21 +45,13 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
m_pfilemanager = std::shared_ptr<rocksdb::SstFileManager>(rocksdb::NewSstFileManager(rocksdb::Env::Default()));
rocksdb::Options options;
rocksdb::Options options = DefaultRocksDBOptions();
options.max_open_files = filedsRequired();
options.sst_file_manager = m_pfilemanager;
options.create_if_missing = true;
options.create_missing_column_families = true;
rocksdb::DB *db = nullptr;
options.max_background_compactions = 4;
options.max_background_flushes = 2;
options.max_open_files = filedsRequired();
options.bytes_per_sync = 1048576;
options.compaction_pri = rocksdb::kMinOverlappingRatio;
options.compression = rocksdb::kNoCompression;
options.enable_pipelined_write = true;
options.sst_file_manager = m_pfilemanager;
options.allow_mmap_reads = true;
options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(0));
rocksdb::BlockBasedTableOptions table_options;
table_options.block_size = 16 * 1024;
table_options.cache_index_and_filter_blocks = true;
@ -136,6 +124,16 @@ size_t RocksDBStorageFactory::filedsRequired() const {
return 256;
}
std::string RocksDBStorageFactory::getTempFolder()
{
auto path = m_path + "/keydb_tmp/";
if (!m_fCreatedTempFolder) {
if (!mkdir(path.c_str(), 0700))
m_fCreatedTempFolder = true;
}
return path;
}
IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *privdata)
{
++db; // skip default col family
@ -174,7 +172,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr
++count;
}
}
return new RocksDBStorageProvider(m_spdb, spcolfamily, nullptr, count);
return new RocksDBStorageProvider(this, m_spdb, spcolfamily, nullptr, count);
}
const char *RocksDBStorageFactory::name() const

View File

@ -0,0 +1,34 @@
# Build a test value that is $size characters long: a leading "c" followed by
# zeros. A size of 1 or less yields just "c".
#
# The padding is produced with a single [string repeat] call rather than by
# appending one character per loop iteration, which is far cheaper for the
# multi-megabyte values this test suite asks for.
proc prepare_value {size} {
    set padding [expr {$size > 1 ? $size - 1 : 0}]
    return "c[string repeat 0 $padding]"
}
# Fast replication smoke test: start two servers that both use the flash
# storage provider, attach one as a replica of the other, wait for the link to
# come up and compare the two datasets by digest.
# NOTE(review): the outer server is used as the replica but its storage
# directory is named rocks.db.master, while the inner server acts as the
# master with rocks.db.replica; the directory names look swapped - confirm.
start_server {tags {"replication-fast"} overrides {storage-provider {flash ./rocks.db.master}}} {
set slave [srv 0 client]
set slave_host [srv 0 host]
set slave_port [srv 0 port]
start_server {tags {} overrides {storage-provider {flash ./rocks.db.replica}}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
test "fast replication with large value" {
# Builds a 64MB value for the master.
# NOTE(review): the SET below is commented out, so the value is never
# stored and the digest check presumably compares datasets that do not
# contain it - confirm whether this is intentional.
set _v [prepare_value [expr 64*1024*1024]]
# $master set key $_v
$slave replicaof $master_host $master_port
# Poll up to 50 times, 300 ms apart, for the replica role and an up link
wait_for_condition 50 300 {
[lindex [$slave role] 0] eq {slave} &&
[string match {*master_link_status:up*} [$slave info replication]]
} else {
fail "Can't turn the instance into a replica"
}
assert_equal [$slave debug digest] [$master debug digest]
# Detach again so the replica stops following the master
$slave replicaof no one
}
}
}

View File

@ -1,4 +1,5 @@
set ::global_overrides {}
set ::global_storage_provider {}
set ::tags {}
set ::valgrind_errors {}
@ -392,6 +393,9 @@ proc start_server {options {code undefined}} {
foreach {directive arguments} [concat $::global_overrides $overrides] {
dict set config $directive $arguments
}
foreach {directive argument1 argument2} $::global_storage_provider {
dict set config $directive $argument1 $argument2
}
# remove directives that are marked to be omitted
foreach directive $omit {

View File

@ -59,6 +59,7 @@ set ::all_tests {
integration/failover
integration/redis-cli
integration/redis-benchmark
integration/replication-fast
unit/pubsub
unit/slowlog
unit/scripting
@ -692,6 +693,17 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--help}} {
print_help_screen
exit 0
} elseif {$opt eq {--flash}} {
lappend ::global_storage_provider storage-provider
lappend ::global_storage_provider flash
lappend ::global_storage_provider ./rocks.db
set ::all_tests {
integration/replication
integration/replication-2
integration/replication-3
integration/replication-4
integration/replication-psync
}
} else {
puts "Wrong argument: $opt"
exit 1