Merge pull request #60 from Snapchat/merge_os_apr_13

Merge in changes from Open Source.

For the most part, this encompasses two changes:

- Adding partial sync capabilities to multi-master
- Adding ReadWrite locks to prevent global lock issues when forking threads, specifically with regard to the time thread.
This commit is contained in:
Vivek Saini 2022-05-02 13:26:21 -04:00 committed by GitHub Enterprise
commit 3724cf97d9
24 changed files with 442 additions and 212 deletions

View File

@ -34,3 +34,26 @@ jobs:
- name: rotation test - name: rotation test
run: | run: |
./runtest-rotation ./runtest-rotation
build-ubuntu-old:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v1
- name: make -j2
run: |
sudo apt-get update
sudo apt-get -y install uuid-dev libcurl4-openssl-dev
make -j2
build-macos-latest:
runs-on: macos-latest
steps:
- uses: actions/checkout@v2
- name: make
run: make KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror' -j2
build-libc-malloc:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: make

View File

@ -33,19 +33,13 @@ Because of this difference of opinion features which are right for KeyDB may not
Additional Resources Additional Resources
-------------------- --------------------
Try our docker container: https://hub.docker.com/r/eqalpha/keydb Check out KeyDB's [Docker Image](https://hub.docker.com/r/eqalpha/keydb)
Talk on Gitter: https://gitter.im/KeyDB Join us on [Slack](https://docs.keydb.dev/slack/)
Visit our Website: https://keydb.dev Post to the [Community Forum](https://community.keydb.dev)
See options for channel partners and support contracts: https://keydb.dev/support.html Learn more through KeyDB's [Documentation & Learning Center](https://docs.keydb.dev)
Learn with KeyDB's official documentation site: https://docs.keydb.dev
[Subscribe to the KeyDB mailing list](https://eqalpha.us20.list-manage.com/subscribe/post?u=978f486c2f95589b24591a9cc&id=4ab9220500)
Management GUI: We recommend [FastoNoSQL](https://fastonosql.com/) which has official KeyDB support.
Benchmarking KeyDB Benchmarking KeyDB

View File

@ -28,6 +28,7 @@ void AsyncWorkQueue::WorkerThreadMain()
if (m_workqueue.empty()) if (m_workqueue.empty())
m_cvWakeup.wait(lock); m_cvWakeup.wait(lock);
aeThreadOnline();
while (!m_workqueue.empty()) while (!m_workqueue.empty())
{ {
WorkItem task = std::move(m_workqueue.front()); WorkItem task = std::move(m_workqueue.front());
@ -49,6 +50,7 @@ void AsyncWorkQueue::WorkerThreadMain()
} }
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch.reset(); serverTL->gcEpoch.reset();
aeThreadOffline();
} }
listRelease(vars.clients_pending_asyncwrite); listRelease(vars.clients_pending_asyncwrite);

View File

@ -53,6 +53,7 @@
#include "zmalloc.h" #include "zmalloc.h"
#include "config.h" #include "config.h"
#include "serverassert.h" #include "serverassert.h"
#include "readwritelock.h"
#ifdef USE_MUTEX #ifdef USE_MUTEX
thread_local int cOwnLock = 0; thread_local int cOwnLock = 0;
@ -87,6 +88,7 @@ mutex_wrapper g_lock;
#else #else
fastlock g_lock("AE (global)"); fastlock g_lock("AE (global)");
#endif #endif
readWriteLock g_forkLock("Fork (global)");
thread_local aeEventLoop *g_eventLoopThisThread = NULL; thread_local aeEventLoop *g_eventLoopThisThread = NULL;
/* Include the best multiplexing layer supported by this system. /* Include the best multiplexing layer supported by this system.
@ -154,16 +156,22 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
case AE_ASYNC_OP::PostFunction: case AE_ASYNC_OP::PostFunction:
{ {
if (cmd.fLock && !ulock.owns_lock()) if (cmd.fLock && !ulock.owns_lock()) {
g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock.acquireRead();
}
((aePostFunctionProc*)cmd.proc)(cmd.clientData); ((aePostFunctionProc*)cmd.proc)(cmd.clientData);
break; break;
} }
case AE_ASYNC_OP::PostCppFunction: case AE_ASYNC_OP::PostCppFunction:
{ {
if (cmd.fLock && !ulock.owns_lock()) if (cmd.fLock && !ulock.owns_lock()) {
g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock.acquireRead();
}
(*cmd.pfn)(); (*cmd.pfn)();
delete cmd.pfn; delete cmd.pfn;
@ -547,7 +555,11 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
if (te->next) if (te->next)
te->next->prev = te->prev; te->next->prev = te->prev;
if (te->finalizerProc) { if (te->finalizerProc) {
if (!ulock.owns_lock()) ulock.lock(); if (!ulock.owns_lock()) {
g_forkLock.releaseRead();
ulock.lock();
g_forkLock.acquireRead();
}
te->finalizerProc(eventLoop, te->clientData); te->finalizerProc(eventLoop, te->clientData);
now = getMonotonicUs(); now = getMonotonicUs();
} }
@ -567,7 +579,11 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
} }
if (te->when <= now) { if (te->when <= now) {
if (!ulock.owns_lock()) ulock.lock(); if (!ulock.owns_lock()) {
g_forkLock.releaseRead();
ulock.lock();
g_forkLock.acquireRead();
}
int retval; int retval;
id = te->id; id = te->id;
@ -591,8 +607,11 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
{ {
#define LOCK_IF_NECESSARY(fe, tsmask) \ #define LOCK_IF_NECESSARY(fe, tsmask) \
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \ std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); \
if (!(fe->mask & tsmask)) \ if (!(fe->mask & tsmask)) { \
ulock.lock() g_forkLock.releaseRead(); \
ulock.lock(); \
g_forkLock.acquireRead(); \
}
int fired = 0; /* Number of events fired for current fd. */ int fired = 0; /* Number of events fired for current fd. */
@ -704,8 +723,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) { if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) {
g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock.acquireRead();
}
eventLoop->beforesleep(eventLoop); eventLoop->beforesleep(eventLoop);
} }
@ -716,8 +738,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
/* After sleep callback. */ /* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) {
g_forkLock.releaseRead();
ulock.lock(); ulock.lock();
g_forkLock.acquireRead();
}
eventLoop->aftersleep(eventLoop); eventLoop->aftersleep(eventLoop);
} }
@ -792,9 +817,21 @@ void setAeLockSetThreadSpinWorker(spin_worker worker)
tl_worker = worker; tl_worker = worker;
} }
/* Mark the calling thread as "online" by taking the read side of the
 * global fork lock (g_forkLock).
 *
 * acquireRead() waits while a writer holds, or is waiting for, the lock and
 * then increments its reader count, so this call may block. Every call is
 * expected to be balanced by aeThreadOffline(), which drops the read hold. */
void aeThreadOnline()
{
g_forkLock.acquireRead();
}
void aeAcquireLock() void aeAcquireLock()
{ {
g_forkLock.releaseRead();
g_lock.lock(tl_worker); g_lock.lock(tl_worker);
g_forkLock.acquireRead();
}
void aeAcquireForkLock()
{
g_forkLock.upgradeWrite();
} }
int aeTryAcquireLock(int fWeak) int aeTryAcquireLock(int fWeak)
@ -802,6 +839,11 @@ int aeTryAcquireLock(int fWeak)
return g_lock.try_lock(!!fWeak); return g_lock.try_lock(!!fWeak);
} }
/* Mark the calling thread as "offline" by dropping its read hold on the
 * global fork lock (g_forkLock).
 *
 * Counterpart of aeThreadOnline(): releaseRead() decrements the reader count
 * and notifies waiters, so the thread must currently hold the read side. */
void aeThreadOffline()
{
g_forkLock.releaseRead();
}
void aeReleaseLock() void aeReleaseLock()
{ {
g_lock.unlock(); g_lock.unlock();
@ -812,6 +854,11 @@ void aeSetThreadOwnsLockOverride(int fOverride)
fOwnLockOverride = fOverride; fOwnLockOverride = fOverride;
} }
/* Give up the write side of the global fork lock (g_forkLock) and return to
 * holding it as a reader, via downgradeWrite().
 *
 * Counterpart of aeAcquireForkLock(), which upgrades the read hold to a
 * write hold.
 * NOTE(review): downgradeWrite() appears to be a release followed by a fresh
 * read acquire rather than an atomic transition -- confirm against
 * readwritelock.h before relying on state being unchanged across the call. */
void aeReleaseForkLock()
{
g_forkLock.downgradeWrite();
}
int aeThreadOwnsLock() int aeThreadOwnsLock()
{ {
return fOwnLockOverride || g_lock.fOwnLock(); return fOwnLockOverride || g_lock.fOwnLock();

View File

@ -164,9 +164,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
void aeClosePipesForForkChild(aeEventLoop *eventLoop); void aeClosePipesForForkChild(aeEventLoop *eventLoop);
void setAeLockSetThreadSpinWorker(spin_worker worker); void setAeLockSetThreadSpinWorker(spin_worker worker);
void aeThreadOnline();
void aeAcquireLock(); void aeAcquireLock();
void aeAcquireForkLock();
int aeTryAcquireLock(int fWeak); int aeTryAcquireLock(int fWeak);
void aeThreadOffline();
void aeReleaseLock(); void aeReleaseLock();
void aeReleaseForkLock();
int aeThreadOwnsLock(); int aeThreadOwnsLock();
void aeSetThreadOwnsLockOverride(int fOverride); void aeSetThreadOwnsLockOverride(int fOverride);
int aeLockContested(int threshold); int aeLockContested(int threshold);

View File

@ -884,7 +884,7 @@ int loadAppendOnlyFile(char *filename) {
} else { } else {
/* RDB preamble. Pass loading the RDB functions. */ /* RDB preamble. Pass loading the RDB functions. */
rio rdb; rio rdb;
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rdbSaveInfo rsi;
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr; if (fseek(fp,0,SEEK_SET) == -1) goto readerr;

View File

@ -365,7 +365,7 @@ typedef struct RedisModuleCommandFilter {
static list *moduleCommandFilters; static list *moduleCommandFilters;
/* Module GIL Variables */ /* Module GIL Variables */
static readWriteLock s_moduleGIL; static readWriteLock s_moduleGIL("Module GIL");
thread_local bool g_fModuleThread = false; thread_local bool g_fModuleThread = false;
typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);

View File

@ -2002,18 +2002,6 @@ void ProcessPendingAsyncWrites()
} }
c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWrite = FALSE;
// Now install the write event handler
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
if (g_pserver->aof_state == AOF_ON &&
g_pserver->aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
if (!((c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || if (!((c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))))

View File

@ -138,7 +138,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
* *
* The current limit of 52 is chosen so that the biggest string object * The current limit of 52 is chosen so that the biggest string object
* we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 52 size_t OBJ_ENCODING_EMBSTR_SIZE_LIMIT = 52;
robj *createStringObject(const char *ptr, size_t len) { robj *createStringObject(const char *ptr, size_t len) {
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)

View File

@ -41,6 +41,7 @@
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h>
static inline char *med3 (char *, char *, char *, static inline char *med3 (char *, char *, char *,
int (*)(const void *, const void *)); int (*)(const void *, const void *));
@ -62,7 +63,7 @@ static inline void swapfunc (char *, char *, size_t, int);
} while (--i > 0); \ } while (--i > 0); \
} }
#define SWAPINIT(a, es) swaptype = ((char *)a - (char *)0) % sizeof(long) || \ #define SWAPINIT(a, es) swaptype = (uintptr_t)a % sizeof(long) || \
es % sizeof(long) ? 2 : es == sizeof(long)? 0 : 1; es % sizeof(long) ? 2 : es == sizeof(long)? 0 : 1;
static inline void static inline void

View File

@ -1218,6 +1218,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
== -1) return -1; == -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset) if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset)
== -1) return -1; == -1) return -1;
if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) {
sdsstring val = sdsstring(sdsempty());
listNode *ln;
listIter li;
redisMaster* mi;
listRewind(g_pserver->masters,&li);
while ((ln = listNext(&li)) != NULL) {
mi = (redisMaster*)listNodeValue(ln);
if (!mi->master) {
// If master client is not available, use info from master struct - better than nothing
serverLog(LL_NOTICE, "saving master %s", mi->master_replid);
if (mi->master_replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = val.catfmt("%s:%I:%s:%i;", mi->master_replid,
mi->master_initial_offset,
mi->masterhost,
mi->masterport);
}
else {
serverLog(LL_NOTICE, "saving master %s", mi->master->replid);
if (mi->master->replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = val.catfmt("%s:%I:%s:%i;", mi->master->replid,
mi->master->reploff,
mi->masterhost,
mi->masterport);
}
}
if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1;
}
} }
if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1;
return 1; return 1;
@ -1552,6 +1586,7 @@ struct rdbSaveThreadArgs
void *rdbSaveThread(void *vargs) void *rdbSaveThread(void *vargs)
{ {
aeThreadOnline();
serverAssert(!g_pserver->rdbThreadVars.fDone); serverAssert(!g_pserver->rdbThreadVars.fDone);
rdbSaveThreadArgs *args = reinterpret_cast<rdbSaveThreadArgs*>(vargs); rdbSaveThreadArgs *args = reinterpret_cast<rdbSaveThreadArgs*>(vargs);
serverAssert(serverTL == nullptr); serverAssert(serverTL == nullptr);
@ -1577,7 +1612,7 @@ void *rdbSaveThread(void *vargs)
"%s: %zd MB of memory used by copy-on-write", "%s: %zd MB of memory used by copy-on-write",
"RDB",cbDiff/(1024*1024)); "RDB",cbDiff/(1024*1024));
} }
aeThreadOffline();
g_pserver->rdbThreadVars.fDone = true; g_pserver->rdbThreadVars.fDone = true;
return (retval == C_OK) ? (void*)0 : (void*)1; return (retval == C_OK) ? (void*)0 : (void*)1;
} }
@ -1627,10 +1662,10 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
} else } else
{ {
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
rdbSaveInfo rsiT = RDB_SAVE_INFO_INIT; rdbSaveInfo rsiT;
if (rsi == nullptr) if (rsi == nullptr)
rsi = &rsiT; rsi = &rsiT;
memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo)); args->rsi = *(new (args) rdbSaveInfo(*rsi));
memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid));
args->rsi.master_repl_offset = g_pserver->master_repl_offset; args->rsi.master_repl_offset = g_pserver->master_repl_offset;
@ -2887,11 +2922,11 @@ public:
* snapshot taken by the master may not be reflected on the replica. */ * snapshot taken by the master may not be reflected on the replica. */
bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now; bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now;
if (fStaleMvccKey || fExpiredKey) { if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database. // We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us // We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key))); robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key)));
this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
} }
sdsfree(job.key); sdsfree(job.key);
job.key = nullptr; job.key = nullptr;
@ -3205,6 +3240,23 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
memcpy(rsi->repl_id,ptrFromObj(auxval),CONFIG_RUN_ID_SIZE+1); memcpy(rsi->repl_id,ptrFromObj(auxval),CONFIG_RUN_ID_SIZE+1);
rsi->repl_id_is_set = 1; rsi->repl_id_is_set = 1;
} }
} else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) {
if (rsi) {
struct redisMaster mi;
char *masters = szFromObj(auxval);
char *entry = strtok(masters, ":");
while (entry != NULL) {
memcpy(mi.master_replid, entry, sizeof(mi.master_replid));
entry = strtok(NULL, ":");
mi.master_initial_offset = atoi(entry);
entry = strtok(NULL, ":");
mi.masterhost = entry;
entry = strtok(NULL, ";");
mi.masterport = atoi(entry);
entry = strtok(NULL, ":");
rsi->addMaster(mi);
}
}
} else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) { } else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) {
if (rsi) rsi->repl_offset = strtoll(szFromObj(auxval),NULL,10); if (rsi) rsi->repl_offset = strtoll(szFromObj(auxval),NULL,10);
} else if (!strcasecmp(szFromObj(auxkey),"lua")) { } else if (!strcasecmp(szFromObj(auxkey),"lua")) {
@ -3654,6 +3706,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
int retval; int retval;
rio rdb; rio rdb;
aeThreadOnline();
serverAssert(serverTL == nullptr); serverAssert(serverTL == nullptr);
redisServerThreadVars vars; redisServerThreadVars vars;
serverTL = &vars; serverTL = &vars;
@ -3684,7 +3737,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]); g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]);
g_pserver->garbageCollector.endEpoch(vars.gcEpoch); g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
aeThreadOffline();
close(args->safe_to_exit_pipe); close(args->safe_to_exit_pipe);
zfree(args); zfree(args);
@ -3719,7 +3772,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
args->rdb_pipe_write = pipefds[1]; /* write end */ args->rdb_pipe_write = pipefds[1]; /* write end */
anetNonBlock(NULL, g_pserver->rdb_pipe_read); anetNonBlock(NULL, g_pserver->rdb_pipe_read);
memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo)); args->rsi = *(new (args) rdbSaveInfo(*rsi));
memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid));
args->rsi.master_repl_offset = g_pserver->master_repl_offset; args->rsi.master_repl_offset = g_pserver->master_repl_offset;
@ -3859,8 +3912,8 @@ void bgsaveCommand(client *c) {
* is returned, and the RDB saving will not persist any replication related * is returned, and the RDB saving will not persist any replication related
* information. */ * information. */
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT; rdbSaveInfo rsi_init;
*rsi = rsi_init; *rsi = std::move(rsi_init);
memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid)); memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid));
rsi->master_repl_offset = g_pserver->master_repl_offset; rsi->master_repl_offset = g_pserver->master_repl_offset;

View File

@ -2,22 +2,24 @@
#include <condition_variable> #include <condition_variable>
class readWriteLock { class readWriteLock {
std::mutex m_readLock; fastlock m_readLock;
std::recursive_mutex m_writeLock; fastlock m_writeLock;
std::condition_variable m_cv; std::condition_variable_any m_cv;
int m_readCount = 0; int m_readCount = 0;
int m_writeCount = 0; int m_writeCount = 0;
bool m_writeWaiting = false; bool m_writeWaiting = false;
public: public:
readWriteLock(const char *name) : m_readLock(name), m_writeLock(name) {}
void acquireRead() { void acquireRead() {
std::unique_lock<std::mutex> rm(m_readLock); std::unique_lock<fastlock> rm(m_readLock);
while (m_writeCount > 0 || m_writeWaiting) while (m_writeCount > 0 || m_writeWaiting)
m_cv.wait(rm); m_cv.wait(rm);
m_readCount++; m_readCount++;
} }
bool tryAcquireRead() { bool tryAcquireRead() {
std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock); std::unique_lock<fastlock> rm(m_readLock, std::defer_lock);
if (!rm.try_lock()) if (!rm.try_lock())
return false; return false;
if (m_writeCount > 0 || m_writeWaiting) if (m_writeCount > 0 || m_writeWaiting)
@ -27,7 +29,7 @@ public:
} }
void acquireWrite(bool exclusive = true) { void acquireWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock); std::unique_lock<fastlock> rm(m_readLock);
m_writeWaiting = true; m_writeWaiting = true;
while (m_readCount > 0) while (m_readCount > 0)
m_cv.wait(rm); m_cv.wait(rm);
@ -43,24 +45,12 @@ public:
} }
void upgradeWrite(bool exclusive = true) { void upgradeWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock); releaseRead();
m_writeWaiting = true; acquireWrite(exclusive);
while (m_readCount > 1)
m_cv.wait(rm);
if (exclusive) {
/* Another thread might have the write lock while we have the read lock
but won't be able to release it until they can acquire the read lock
so release the read lock and try again instead of waiting to avoid deadlock */
while(!m_writeLock.try_lock())
m_cv.wait(rm);
}
m_writeCount++;
m_readCount--;
m_writeWaiting = false;
} }
bool tryAcquireWrite(bool exclusive = true) { bool tryAcquireWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock); std::unique_lock<fastlock> rm(m_readLock, std::defer_lock);
if (!rm.try_lock()) if (!rm.try_lock())
return false; return false;
if (m_readCount > 0) if (m_readCount > 0)
@ -73,14 +63,13 @@ public:
} }
void releaseRead() { void releaseRead() {
std::unique_lock<std::mutex> rm(m_readLock); std::unique_lock<fastlock> rm(m_readLock);
serverAssert(m_readCount > 0);
m_readCount--; m_readCount--;
m_cv.notify_all(); m_cv.notify_all();
} }
void releaseWrite(bool exclusive = true) { void releaseWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock); std::unique_lock<fastlock> rm(m_readLock);
serverAssert(m_writeCount > 0); serverAssert(m_writeCount > 0);
if (exclusive) if (exclusive)
m_writeLock.unlock(); m_writeLock.unlock();
@ -89,14 +78,8 @@ public:
} }
void downgradeWrite(bool exclusive = true) { void downgradeWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock); releaseWrite(exclusive);
serverAssert(m_writeCount > 0); acquireRead();
if (exclusive)
m_writeLock.unlock();
m_writeCount--;
while (m_writeCount > 0 || m_writeWaiting)
m_cv.wait(rm);
m_readCount++;
} }
bool hasReader() { bool hasReader() {

View File

@ -1017,7 +1017,11 @@ static void benchmark(const char *title, const char *cmd, int len) {
createMissingClients(c); createMissingClients(c);
config.start = mstime(); config.start = mstime();
if (!config.num_threads) aeMain(config.el); if (!config.num_threads) {
aeThreadOnline();
aeMain(config.el);
aeThreadOffline();
}
else startBenchmarkThreads(); else startBenchmarkThreads();
config.totlatency = mstime()-config.start; config.totlatency = mstime()-config.start;
@ -1057,7 +1061,9 @@ static void freeBenchmarkThreads() {
static void *execBenchmarkThread(void *ptr) { static void *execBenchmarkThread(void *ptr) {
benchmarkThread *thread = (benchmarkThread *) ptr; benchmarkThread *thread = (benchmarkThread *) ptr;
aeThreadOnline();
aeMain(thread->el); aeMain(thread->el);
aeThreadOffline();
return NULL; return NULL;
} }
@ -1696,7 +1702,7 @@ int main(int argc, const char **argv) {
int len; int len;
client c; client c;
aeThreadOnline();
storage_init(NULL, 0); storage_init(NULL, 0);
srandom(time(NULL) ^ getpid()); srandom(time(NULL) ^ getpid());
@ -1749,6 +1755,7 @@ int main(int argc, const char **argv) {
cliSecureInit(); cliSecureInit();
} }
#endif #endif
aeThreadOffline();
if (config.cluster_mode) { if (config.cluster_mode) {
// We only include the slot placeholder {tag} if cluster mode is enabled // We only include the slot placeholder {tag} if cluster mode is enabled

View File

@ -1196,7 +1196,6 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
size_t cbData = 0; size_t cbData = 0;
size_t cbLastUpdate = 0; size_t cbLastUpdate = 0;
auto &replBuf = *spreplBuf; auto &replBuf = *spreplBuf;
// Databases // Databases
replBuf.addArrayLen(cserver.dbnum); replBuf.addArrayLen(cserver.dbnum);
for (int idb = 0; idb < cserver.dbnum; ++idb) { for (int idb = 0; idb < cserver.dbnum; ++idb) {
@ -2250,24 +2249,6 @@ void changeReplicationId(void) {
saveMasterStatusToStorage(false); saveMasterStatusToStorage(false);
} }
/* Convert one hexadecimal digit character to its numeric value.
 *
 * '0'-'9' map to 0-9 and 'a'-'f' map to 10-15. Every other character is
 * treated as if it were an upper-case hex digit ('A' == 10) without any
 * range check, so callers are responsible for passing valid hex digits. */
int hexchToInt(char ch)
{
    const bool fDecimalDigit = (ch >= '0' && ch <= '9');
    if (fDecimalDigit)
        return ch - '0';

    /* Lower-case letters are offset from 'a'; anything else falls back to
     * the upper-case base, exactly as an unchecked 'A'-'F' conversion. */
    const bool fLowerHexDigit = (ch >= 'a' && ch <= 'f');
    const char chBase = fLowerHexDigit ? 'a' : 'A';
    return (ch - chBase) + 10;
}
/* Fold another replication ID into the server's own replication ID.
 *
 * Each of the first CONFIG_RUN_ID_SIZE characters of g_pserver->replid is
 * replaced by the lower-case hex digit obtained by XOR-ing its numeric value
 * with the value of the character at the same position in `id`.
 *
 * id: replication ID to merge in; at least CONFIG_RUN_ID_SIZE characters are
 *     read from it. */
void mergeReplicationId(const char *id)
{
    const char *const hexDigits = "0123456789abcdef";
    auto &replid = g_pserver->replid;

    for (int idx = 0; idx < CONFIG_RUN_ID_SIZE; ++idx)
    {
        const int ours = hexchToInt(replid[idx]);
        const int theirs = hexchToInt(id[idx]);
        replid[idx] = hexDigits[ours ^ theirs];
    }
}
/* Clear (invalidate) the secondary replication ID. This happens, for /* Clear (invalidate) the secondary replication ID. This happens, for
* example, after a full resynchronization, when we start a new replication * example, after a full resynchronization, when we start a new replication
* history. */ * history. */
@ -2953,7 +2934,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi,
mi->staleKeyMap->clear(); mi->staleKeyMap->clear();
else else
mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>(); mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>();
rsi.mi = mi; rsi.addMaster(*mi);
} }
if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING, serverLog(LL_WARNING,
@ -2993,7 +2974,7 @@ error:
} }
void readSyncBulkPayload(connection *conn) { void readSyncBulkPayload(connection *conn) {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rdbSaveInfo rsi;
redisMaster *mi = (redisMaster*)connGetPrivateData(conn); redisMaster *mi = (redisMaster*)connGetPrivateData(conn);
static int usemark = 0; static int usemark = 0;
if (mi == nullptr) { if (mi == nullptr) {
@ -3009,9 +2990,6 @@ void readSyncBulkPayload(connection *conn) {
return; return;
} }
// Should we update our database, or create from scratch?
int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster;
/* Final setup of the connected slave <- master link */ /* Final setup of the connected slave <- master link */
replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db); replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db);
if (mi->isRocksdbSnapshotRepl) { if (mi->isRocksdbSnapshotRepl) {
@ -3038,11 +3016,7 @@ void readSyncBulkPayload(connection *conn) {
/* After a full resynchronization we use the replication ID and /* After a full resynchronization we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since * offset of the master. The secondary ID / offset are cleared since
* we are starting a new history. */ * we are starting a new history. */
if (fUpdate) if (!g_pserver->fActiveReplica)
{
mergeReplicationId(mi->master->replid);
}
else
{ {
/* After a full resynchroniziation we use the replication ID and /* After a full resynchroniziation we use the replication ID and
* offset of the master. The secondary ID / offset are cleared since * offset of the master. The secondary ID / offset are cleared since
@ -3237,7 +3211,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
* client structure representing the master into g_pserver->master. */ * client structure representing the master into g_pserver->master. */
mi->master_initial_offset = -1; mi->master_initial_offset = -1;
if (mi->cached_master && !g_pserver->fActiveReplica) { if (mi->cached_master) {
psync_replid = mi->cached_master->replid; psync_replid = mi->cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", mi->cached_master->reploff+1); snprintf(psync_offset,sizeof(psync_offset),"%lld", mi->cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
@ -3336,16 +3310,17 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
sizeof(g_pserver->replid2)); sizeof(g_pserver->replid2));
g_pserver->second_replid_offset = g_pserver->master_repl_offset+1; g_pserver->second_replid_offset = g_pserver->master_repl_offset+1;
if (!g_pserver->fActiveReplica) {
/* Update the cached master ID and our own primary ID to the /* Update the cached master ID and our own primary ID to the
* new one. */ * new one. */
memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid)); memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid));
memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid)); memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid));
/* Disconnect all the sub-slaves: they need to be notified. */ /* Disconnect all the replicas: they need to be notified. */
if (!g_pserver->fActiveReplica)
disconnectSlaves(); disconnectSlaves();
} }
} }
}
/* Setup the replication to continue. */ /* Setup the replication to continue. */
sdsfree(reply); sdsfree(reply);
@ -3724,18 +3699,6 @@ retry_connect:
disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */ disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
} }
else
{
if (listLength(g_pserver->slaves))
{
changeReplicationId();
clearReplicationId2();
}
else
{
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
}
}
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the g_pserver->master_replid and master_initial_offset are * and the g_pserver->master_replid and master_initial_offset are
@ -4404,6 +4367,26 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) {
mi->master = NULL; mi->master = NULL;
} }
/* Build a cached master client from the replication info stored in a
 * redisMaster struct. This is called when master info is reloaded from an
 * RDB in Active Replica mode.
 *
 * Steps performed, in order:
 *   1. Free any cached master client already attached to `mi`.
 *   2. Create a new master client for `mi` with no connection (NULL) and
 *      db -1.
 *   3. Under that client's lock, stamp it with mi->master_replid and
 *      mi->master_initial_offset.
 *   4. Unlink the client and move it from mi->master to mi->cached_master.
 *
 * mi: master descriptor; assumed by the caller contract to contain valid
 *     master info (replid and initial offset). */
void replicationCacheMasterUsingMaster(redisMaster *mi) {
/* Drop any previously cached client; it is replaced at the end of this function. */
if (mi->cached_master) {
freeClient(mi->cached_master);
}
/* Expected to populate mi->master, which is dereferenced immediately below. */
replicationCreateMasterClient(mi, NULL, -1);
/* Held until the function returns; the guard references the client's lock
 * object, not the mi->master pointer that is cleared further down. */
std::lock_guard<decltype(mi->master->lock)> lock(mi->master->lock);
/* Seed the new client with the replication ID and offset saved in `mi`. */
memcpy(mi->master->replid, mi->master_replid, sizeof(mi->master_replid));
mi->master->reploff = mi->master_initial_offset;
unlinkClient(mi->master);
/* Hand the client over: it now exists only as the cached master. */
mi->cached_master = mi->master;
mi->master = NULL;
}
/* Free a cached master, called when there are no longer the conditions for /* Free a cached master, called when there are no longer the conditions for
* a partial resync on reconnection. */ * a partial resync on reconnection. */
void replicationDiscardCachedMaster(redisMaster *mi) { void replicationDiscardCachedMaster(redisMaster *mi) {

View File

@ -429,6 +429,21 @@ public:
return *this; return *this;
} }
/* Move assignment: exchanges the underlying sds buffers, so `other` ends up
 * holding whatever this object held before the call. */
sdsstring &operator=(sdsstring &&other)
{
    sds previous = m_str;

    m_str = other.m_str;
    other.m_str = previous;

    return *this;
}
/* Appends text formatted by sdscatfmt() to this string, storing the buffer
 * sdscatfmt() hands back. The updated string is returned by value. */
template<typename... Args>
sdsstring catfmt(const char *fmt, Args... args)
{
    sds grown = sdscatfmt(m_str, fmt, args...);
    m_str = grown;
    return *this;
}
sds release() { sds release() {
sds sdsT = m_str; sds sdsT = m_str;
m_str = nullptr; m_str = nullptr;

View File

@ -62,7 +62,6 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <algorithm> #include <algorithm>
#include <uuid/uuid.h> #include <uuid/uuid.h>
#include <mutex>
#include <condition_variable> #include <condition_variable>
#include "aelocker.h" #include "aelocker.h"
#include "motd.h" #include "motd.h"
@ -92,14 +91,12 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan;
/* Global vars */ /* Global vars */
namespace GlobalHidden { namespace GlobalHidden {
struct redisServer server; /* Server global state */ struct redisServer server; /* Server global state */
readWriteLock forkLock;
} }
redisServer *g_pserver = &GlobalHidden::server; redisServer *g_pserver = &GlobalHidden::server;
readWriteLock *g_forkLock = &GlobalHidden::forkLock;
struct redisServerConst cserver; struct redisServerConst cserver;
thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars
std::mutex time_thread_mutex; fastlock time_thread_lock("Time thread lock");
std::condition_variable time_thread_cv; std::condition_variable_any time_thread_cv;
int sleeping_threads = 0; int sleeping_threads = 0;
void wakeTimeThread(); void wakeTimeThread();
@ -2697,7 +2694,7 @@ extern "C" void asyncFreeDictTable(dictEntry **de)
void blockingOperationStarts() { void blockingOperationStarts() {
if(!g_pserver->blocking_op_nesting++){ if(!g_pserver->blocking_op_nesting++){
g_pserver->blocked_last_cron = g_pserver->mstime; __atomic_load(&g_pserver->mstime, &g_pserver->blocked_last_cron, __ATOMIC_ACQUIRE);
} }
} }
@ -2959,9 +2956,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
if (!fSentReplies) if (!fSentReplies)
handleClientsWithPendingWrites(iel, aof_state); handleClientsWithPendingWrites(iel, aof_state);
aeThreadOffline();
// Scope lock_guard // Scope lock_guard
{ {
std::lock_guard<std::mutex> lock(time_thread_mutex); std::unique_lock<fastlock> lock(time_thread_lock);
sleeping_threads++; sleeping_threads++;
serverAssert(sleeping_threads <= cserver.cthreads); serverAssert(sleeping_threads <= cserver.cthreads);
} }
@ -2995,8 +2993,9 @@ void afterSleep(struct aeEventLoop *eventLoop) {
Don't check here that modules are enabled, rather use the result from beforeSleep Don't check here that modules are enabled, rather use the result from beforeSleep
Otherwise you may double acquire the GIL and cause deadlocks in the module */ Otherwise you may double acquire the GIL and cause deadlocks in the module */
if (!ProcessingEventsWhileBlocked) { if (!ProcessingEventsWhileBlocked) {
wakeTimeThread();
if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/);
aeThreadOnline();
wakeTimeThread();
serverAssert(serverTL->gcEpoch.isReset()); serverAssert(serverTL->gcEpoch.isReset());
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
@ -6852,17 +6851,18 @@ int redisFork(int purpose) {
openChildInfoPipe(); openChildInfoPipe();
} }
long long startWriteLock = ustime(); long long startWriteLock = ustime();
g_forkLock->acquireWrite(); aeAcquireForkLock();
latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000);
if ((childpid = fork()) == 0) { if ((childpid = fork()) == 0) {
/* Child */ /* Child */
aeReleaseForkLock();
g_pserver->in_fork_child = purpose; g_pserver->in_fork_child = purpose;
setOOMScoreAdj(CONFIG_OOM_BGCHILD); setOOMScoreAdj(CONFIG_OOM_BGCHILD);
setupChildSignalHandlers(); setupChildSignalHandlers();
closeChildUnusedResourceAfterFork(); closeChildUnusedResourceAfterFork();
} else { } else {
/* Parent */ /* Parent */
g_forkLock->releaseWrite(); aeReleaseForkLock();
g_pserver->stat_total_forks++; g_pserver->stat_total_forks++;
g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
@ -6940,7 +6940,7 @@ void loadDataFromDisk(void) {
if (loadAppendOnlyFile(g_pserver->aof_filename) == C_OK) if (loadAppendOnlyFile(g_pserver->aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
} else if (g_pserver->rdb_filename != NULL || g_pserver->rdb_s3bucketpath != NULL) { } else if (g_pserver->rdb_filename != NULL || g_pserver->rdb_s3bucketpath != NULL) {
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rdbSaveInfo rsi;
rsi.fForceSetKey = false; rsi.fForceSetKey = false;
errno = 0; /* Prevent a stale value from affecting error checking */ errno = 0; /* Prevent a stale value from affecting error checking */
if (rdbLoad(&rsi,RDBFLAGS_NONE) == C_OK) { if (rdbLoad(&rsi,RDBFLAGS_NONE) == C_OK) {
@ -6969,6 +6969,17 @@ void loadDataFromDisk(void) {
while ((ln = listNext(&li))) while ((ln = listNext(&li)))
{ {
redisMaster *mi = (redisMaster*)listNodeValue(ln); redisMaster *mi = (redisMaster*)listNodeValue(ln);
if (g_pserver->fActiveReplica) {
for (size_t i = 0; i < rsi.numMasters(); i++) {
if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) {
memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid));
mi->master_initial_offset = rsi.masters[i].master_initial_offset;
replicationCacheMasterUsingMaster(mi);
serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport);
}
}
}
else {
/* If we are a replica, create a cached master from this /* If we are a replica, create a cached master from this
* information, in order to allow partial resynchronizations * information, in order to allow partial resynchronizations
* with masters. */ * with masters. */
@ -6976,6 +6987,7 @@ void loadDataFromDisk(void) {
selectDb(mi->cached_master,rsi.repl_stream_db); selectDb(mi->cached_master,rsi.repl_stream_db);
} }
} }
}
} else if (errno != ENOENT) { } else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1); exit(1);
@ -7207,7 +7219,9 @@ void OnTerminate()
void wakeTimeThread() { void wakeTimeThread() {
updateCachedTime(); updateCachedTime();
std::lock_guard<std::mutex> lock(time_thread_mutex); aeThreadOffline();
std::unique_lock<fastlock> lock(time_thread_lock);
aeThreadOnline();
if (sleeping_threads >= cserver.cthreads) if (sleeping_threads >= cserver.cthreads)
time_thread_cv.notify_one(); time_thread_cv.notify_one();
sleeping_threads--; sleeping_threads--;
@ -7219,21 +7233,23 @@ void *timeThreadMain(void*) {
delay.tv_sec = 0; delay.tv_sec = 0;
delay.tv_nsec = 100; delay.tv_nsec = 100;
int cycle_count = 0; int cycle_count = 0;
g_forkLock->acquireRead(); aeThreadOnline();
while (true) { while (true) {
{ {
std::unique_lock<std::mutex> lock(time_thread_mutex); aeThreadOffline();
std::unique_lock<fastlock> lock(time_thread_lock);
aeThreadOnline();
if (sleeping_threads >= cserver.cthreads) { if (sleeping_threads >= cserver.cthreads) {
g_forkLock->releaseRead(); aeThreadOffline();
time_thread_cv.wait(lock); time_thread_cv.wait(lock);
g_forkLock->acquireRead(); aeThreadOnline();
cycle_count = 0; cycle_count = 0;
} }
} }
updateCachedTime(); updateCachedTime();
if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) { if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) {
g_forkLock->releaseRead(); aeThreadOffline();
g_forkLock->acquireRead(); aeThreadOnline();
cycle_count = 0; cycle_count = 0;
} }
#if defined(__APPLE__) #if defined(__APPLE__)
@ -7243,7 +7259,7 @@ void *timeThreadMain(void*) {
#endif #endif
cycle_count++; cycle_count++;
} }
g_forkLock->releaseRead(); aeThreadOffline();
} }
void *workerThreadMain(void *parg) void *workerThreadMain(void *parg)
@ -7255,12 +7271,15 @@ void *workerThreadMain(void *parg)
if (iel != IDX_EVENT_LOOP_MAIN) if (iel != IDX_EVENT_LOOP_MAIN)
{ {
aeThreadOnline();
aeAcquireLock(); aeAcquireLock();
initNetworkingThread(iel, cserver.cthreads > 1); initNetworkingThread(iel, cserver.cthreads > 1);
aeReleaseLock(); aeReleaseLock();
aeThreadOffline();
} }
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
aeThreadOnline();
aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
try try
{ {
@ -7269,6 +7288,7 @@ void *workerThreadMain(void *parg)
catch (ShutdownException) catch (ShutdownException)
{ {
} }
aeThreadOffline();
moduleReleaseGIL(true); moduleReleaseGIL(true);
serverAssert(!GlobalLocksAcquired()); serverAssert(!GlobalLocksAcquired());
aeDeleteEventLoop(el); aeDeleteEventLoop(el);
@ -7417,8 +7437,8 @@ int main(int argc, char **argv) {
g_pserver->sentinel_mode = checkForSentinelMode(argc,argv); g_pserver->sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig(); initServerConfig();
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN];
aeThreadOnline();
aeAcquireLock(); // We own the lock on boot aeAcquireLock(); // We own the lock on boot
ACLInit(); /* The ACL subsystem must be initialized ASAP because the ACLInit(); /* The ACL subsystem must be initialized ASAP because the
basic networking code and client creation depends on it. */ basic networking code and client creation depends on it. */
moduleInitModulesSystem(); moduleInitModulesSystem();
@ -7588,7 +7608,7 @@ int main(int argc, char **argv) {
__AFL_INIT(); __AFL_INIT();
#endif #endif
rio rdb; rio rdb;
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; rdbSaveInfo rsi;
startLoadingFile(stdin, (char*)"stdin", 0); startLoadingFile(stdin, (char*)"stdin", 0);
rioInitWithFile(&rdb,stdin); rioInitWithFile(&rdb,stdin);
rdbLoadRio(&rdb,0,&rsi); rdbLoadRio(&rdb,0,&rsi);
@ -7653,6 +7673,7 @@ int main(int argc, char **argv) {
redisSetCpuAffinity(g_pserver->server_cpulist); redisSetCpuAffinity(g_pserver->server_cpulist);
aeReleaseLock(); //Finally we can dump the lock aeReleaseLock(); //Finally we can dump the lock
aeThreadOffline();
moduleReleaseGIL(true); moduleReleaseGIL(true);
setOOMScoreAdj(-1); setOOMScoreAdj(-1);

View File

@ -1850,6 +1850,43 @@ struct redisMemOverhead {
} *db; } *db;
}; };
/* State kept for one master this instance connects to.
 *
 * NOTE: rdbSaveInfo stores an array of these and duplicates entries with
 * memcpy(), so copies are bytewise: pointer members (strings, clients,
 * connection, staleKeyMap, ...) are shared with the original rather than
 * duplicated. The struct must also be fully defined before rdbSaveInfo,
 * which takes sizeof(struct redisMaster). */
struct redisMaster {
char *masteruser; /* AUTH with this user and masterauth with master */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
client *cached_master; /* Cached master to be reused for PSYNC. */
client *master; /* NOTE(review): presumably the live master client - confirm. */
/* The following two fields are where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->master client structure. */
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */
bool isActive = false;
bool isRocksdbSnapshotRepl = false;
int repl_state; /* Replication status if the instance is a replica */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
connection *repl_transfer_s; /* Replica -> Master SYNC socket */
int repl_transfer_fd; /* Replica -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Replica-> master SYNC temp file name */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
time_t repl_down_since; /* Unix time at which link with master went down */
class SnapshotPayloadParseState *parseState;
sds bulkreadBuffer = nullptr;
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
/* After we've connected with our master use the UUID in g_pserver->master */
uint64_t mvccLastSync;
/* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */
std::map<int, std::vector<robj_sharedptr>> *staleKeyMap;
int ielReplTransfer = -1;
};
/* This structure can be optionally passed to RDB save/load functions in /* This structure can be optionally passed to RDB save/load functions in
* order to implement additional functionalities, by storing and loading * order to implement additional functionalities, by storing and loading
* metadata to the RDB file. * metadata to the RDB file.
@ -1858,7 +1895,66 @@ struct redisMemOverhead {
* replication in order to make sure that chained slaves (slaves of slaves) * replication in order to make sure that chained slaves (slaves of slaves)
* select the correct DB and are able to accept the stream coming from the * select the correct DB and are able to accept the stream coming from the
* top-level master. */ * top-level master. */
typedef struct rdbSaveInfo { class rdbSaveInfo {
public:
rdbSaveInfo() {
repl_stream_db = -1;
repl_id_is_set = 0;
memcpy(repl_id, "0000000000000000000000000000000000000000", sizeof(repl_id));
repl_offset = -1;
fForceSetKey = TRUE;
mvccMinThreshold = 0;
masters = nullptr;
masterCount = 0;
}
rdbSaveInfo(const rdbSaveInfo &other) {
repl_stream_db = other.repl_stream_db;
repl_id_is_set = other.repl_id_is_set;
memcpy(repl_id, other.repl_id, sizeof(repl_id));
repl_offset = other.repl_offset;
fForceSetKey = other.fForceSetKey;
mvccMinThreshold = other.mvccMinThreshold;
masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount);
memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount);
masterCount = other.masterCount;
}
rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() {
swap(*this, other);
}
rdbSaveInfo &operator=(rdbSaveInfo other) {
swap(*this, other);
return *this;
}
~rdbSaveInfo() {
free(masters);
}
friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) {
std::swap(first.repl_stream_db, second.repl_stream_db);
std::swap(first.repl_id_is_set, second.repl_id_is_set);
std::swap(first.repl_id, second.repl_id);
std::swap(first.repl_offset, second.repl_offset);
std::swap(first.fForceSetKey, second.fForceSetKey);
std::swap(first.mvccMinThreshold, second.mvccMinThreshold);
std::swap(first.masters, second.masters);
std::swap(first.masterCount, second.masterCount);
}
/* Appends a bytewise copy of `mi` to the masters array, growing the array
 * by one element. Pointer members of `mi` are shared, not duplicated. */
void addMaster(const struct redisMaster &mi) {
    const size_t slot = masterCount;
    masterCount = slot + 1;

    if (masters != nullptr) {
        masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount);
    } else {
        masters = (struct redisMaster*)malloc(sizeof(struct redisMaster));
    }

    memcpy(&masters[slot], &mi, sizeof(struct redisMaster));
}
/* Returns the number of entries currently stored in the masters array. */
size_t numMasters() {
return masterCount;
}
/* Used saving and loading. */ /* Used saving and loading. */
int repl_stream_db; /* DB to select in g_pserver->master client. */ int repl_stream_db; /* DB to select in g_pserver->master client. */
@ -1872,10 +1968,11 @@ typedef struct rdbSaveInfo {
long long master_repl_offset; long long master_repl_offset;
uint64_t mvccMinThreshold; uint64_t mvccMinThreshold;
struct redisMaster *mi; struct redisMaster *masters;
} rdbSaveInfo;
#define RDB_SAVE_INFO_INIT {-1,0,"0000000000000000000000000000000000000000",-1, TRUE, 0, 0, nullptr} private:
size_t masterCount;
};
struct malloc_stats { struct malloc_stats {
size_t zmalloc_used; size_t zmalloc_used;
@ -2081,42 +2178,6 @@ private:
int rdb_key_save_delay = -1; // thread local cache int rdb_key_save_delay = -1; // thread local cache
}; };
struct redisMaster {
char *masteruser; /* AUTH with this user and masterauth with master */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
client *cached_master; /* Cached master to be reused for PSYNC. */
client *master;
/* The following two fields is where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->master client structure. */
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
long long master_initial_offset; /* Master PSYNC offset. */
bool isActive = false;
bool isRocksdbSnapshotRepl = false;
int repl_state; /* Replication status if the instance is a replica */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
connection *repl_transfer_s; /* Slave -> Master SYNC socket */
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
time_t repl_down_since; /* Unix time at which link with master went down */
class SnapshotPayloadParseState *parseState;
sds bulkreadBuffer = nullptr;
unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */
/* After we've connected with our master use the UUID in g_pserver->master */
uint64_t mvccLastSync;
/* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */
std::map<int, std::vector<robj_sharedptr>> *staleKeyMap;
int ielReplTransfer = -1;
};
// Const vars are not changed after worker threads are launched // Const vars are not changed after worker threads are launched
struct redisServerConst { struct redisServerConst {
pid_t pid; /* Main process pid. */ pid_t pid; /* Main process pid. */
@ -2764,7 +2825,6 @@ typedef struct {
*----------------------------------------------------------------------------*/ *----------------------------------------------------------------------------*/
//extern struct redisServer server; //extern struct redisServer server;
extern readWriteLock *g_forkLock;
extern struct redisServerConst cserver; extern struct redisServerConst cserver;
extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars
extern struct sharedObjectsStruct shared; extern struct sharedObjectsStruct shared;
@ -3100,9 +3160,9 @@ long long getPsyncInitialOffset(void);
int replicationSetupSlaveForFullResync(client *replica, long long offset); int replicationSetupSlaveForFullResync(client *replica, long long offset);
void changeReplicationId(void); void changeReplicationId(void);
void clearReplicationId2(void); void clearReplicationId2(void);
void mergeReplicationId(const char *);
void chopReplicationBacklog(void); void chopReplicationBacklog(void);
void replicationCacheMasterUsingMyself(struct redisMaster *mi); void replicationCacheMasterUsingMyself(struct redisMaster *mi);
void replicationCacheMasterUsingMaster(struct redisMaster *mi);
void feedReplicationBacklog(const void *ptr, size_t len); void feedReplicationBacklog(const void *ptr, size_t len);
void updateMasterAuth(); void updateMasterAuth();
void showLatestBacklog(); void showLatestBacklog();

View File

@ -118,6 +118,7 @@ void pool_free(struct alloc_pool *ppool, void *pv)
return; return;
} }
extern size_t OBJ_ENCODING_EMBSTR_SIZE_LIMIT;
#define EMBSTR_ROBJ_SIZE (sizeof(robj)+sizeof(struct sdshdr8)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT+1) #define EMBSTR_ROBJ_SIZE (sizeof(robj)+sizeof(struct sdshdr8)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT+1)
struct alloc_pool poolobj; struct alloc_pool poolobj;
struct alloc_pool poolembstrobj; struct alloc_pool poolembstrobj;

View File

@ -1191,6 +1191,7 @@ void zsetConvert(robj *zobj, int encoding) {
zs->dict = dictCreate(&zsetDictType,NULL); zs->dict = dictCreate(&zsetDictType,NULL);
zs->zsl = zslCreate(); zs->zsl = zslCreate();
if (ziplistLen(zl) > 0) {
eptr = ziplistIndex(zl,0); eptr = ziplistIndex(zl,0);
serverAssertWithInfo(NULL,zobj,eptr != NULL); serverAssertWithInfo(NULL,zobj,eptr != NULL);
sptr = ziplistNext(zl,eptr); sptr = ziplistNext(zl,eptr);
@ -1208,6 +1209,7 @@ void zsetConvert(robj *zobj, int encoding) {
serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
zzlNext(zl,&eptr,&sptr); zzlNext(zl,&eptr,&sptr);
} }
}
zfree(zobj->m_ptr); zfree(zobj->m_ptr);
zobj->m_ptr = zs; zobj->m_ptr = zs;

View File

@ -243,7 +243,7 @@ void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable) {
#else #else
realptr = (char*)ptr-PREFIX_SIZE; realptr = (char*)ptr-PREFIX_SIZE;
oldsize = *((size_t*)realptr); oldsize = *((size_t*)realptr);
newptr = realloc(realptr,size+PREFIX_SIZE); newptr = realloc(realptr,size+PREFIX_SIZE, MALLOC_LOCAL);
if (newptr == NULL) { if (newptr == NULL) {
if (usable) *usable = 0; if (usable) *usable = 0;
return NULL; return NULL;

View File

@ -1,4 +1,5 @@
# Minimal configuration for testing. # Minimal configuration for testing.
bind 127.0.0.1
always-show-logo yes always-show-logo yes
daemonize no daemonize no
pidfile /var/run/keydb.pid pidfile /var/run/keydb.pid

View File

@ -13,6 +13,7 @@ set ::tlsdir "../../tls"
proc main {} { proc main {} {
parse_options parse_options
spawn_instance redis $::redis_base_port $::instances_count { spawn_instance redis $::redis_base_port $::instances_count {
"bind 127.0.0.1"
"cluster-enabled yes" "cluster-enabled yes"
"appendonly yes" "appendonly yes"
"testmode yes" "testmode yes"

View File

@ -0,0 +1,43 @@
# Two servers, both started with active-replica and multi-master enabled, are
# made replicas of each other. One of them is restarted while the other keeps
# receiving writes, and the test then checks that both hold the same values.
start_server {tags {"repl"} overrides {active-replica {yes} multi-master {yes}}} {
start_server {overrides {active-replica {yes} multi-master {yes}}} {
test {2 node cluster heals after multimaster psync} {
# srv -1 is the outer server and srv 0 the inner one; the names master and
# replica are only labels here since each node replicates from the other
set master [srv -1 client]
set master_host [srv -1 host]
set master_port [srv -1 port]
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
# connect two nodes in active-active
$replica replicaof $master_host $master_port
$master replicaof $replica_host $replica_port
# fixed one second pause to let the links come up (no readiness polling)
after 1000
# write to db7 in the master
$master select 7
$master set x 1
# restart the replica to break the connection and force a psync
restart_server 0 true false
# the old client handle is stale after the restart, so fetch a new one
set replica [srv 0 client]
# write again to db7
$master set y 2
# uncommenting the following delay causes test to pass
# after 1000
# reconnect the replica to the master
$replica replicaof $master_host $master_port
# verify results
after 1000
# x and y were only written in db 7; every other db is expected to return
# the same empty reply on both nodes
# NOTE(review): 16 presumably matches the configured database count - confirm
for {set j 0} {$j < 16} {incr j} {
$master select $j
$replica select $j
assert_equal [$replica get x] [$master get x]
assert_equal [$replica get y] [$master get y]
}
}
}
}

View File

@ -32,6 +32,7 @@ TEST_MODULES = \
auth.so \ auth.so \
keyspace_events.so \ keyspace_events.so \
blockedclient.so \ blockedclient.so \
timers.so \
getkeys.so \ getkeys.so \
test_lazyfree.so \ test_lazyfree.so \
timer.so \ timer.so \