From 8a7ace0a346811a4b65013685fc310912a396a95 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 22 Apr 2022 22:43:20 +0000 Subject: [PATCH 01/35] PSYNC production fixes --- src/rdb.cpp | 120 +++++++++++++++++++++++++------------------- src/replication.cpp | 4 +- src/server.cpp | 40 +++++---------- src/server.h | 95 +++++++++++++++++++++-------------- src/t_stream.cpp | 1 - src/tls.cpp | 7 +++ 6 files changed, 149 insertions(+), 118 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 5d158eec5..320ab918b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1218,37 +1218,15 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset) == -1) return -1; - if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) { + if (g_pserver->fActiveReplica) { sdsstring val = sdsstring(sdsempty()); - listNode *ln; - listIter li; - redisMaster* mi; - listRewind(g_pserver->masters,&li); - while ((ln = listNext(&li)) != NULL) { - mi = (redisMaster*)listNodeValue(ln); - if (!mi->master) { - // If master client is not available, use info from master struct - better than nothing - serverLog(LL_NOTICE, "saving master %s", mi->master_replid); - if (mi->master_replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = val.catfmt("%s:%I:%s:%i;", mi->master_replid, - mi->master_initial_offset, - mi->masterhost, - mi->masterport); - } - else { - serverLog(LL_NOTICE, "saving master %s", mi->master->replid); - if (mi->master->replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = val.catfmt("%s:%I:%s:%i;", mi->master->replid, - mi->master->reploff, - mi->masterhost, - mi->masterport); - } + + for (auto &msi : rsi->vecmastersaveinfo) { + val = val.catfmt("%s:%I:%s:%i:%i;", msi.master_replid, + msi.master_initial_offset, + msi.masterhost.get(), + msi.masterport, + msi.selected_db); } if 
(rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1; } @@ -1661,11 +1639,12 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) return rdbSaveBackgroundFork(rsi); } else { - rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); + rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zcalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); + // Placement new rdbSaveInfo rsiT; if (rsi == nullptr) rsi = &rsiT; - args->rsi = *(new (args) rdbSaveInfo(*rsi)); + args->rsi = *rsi; memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); args->rsi.master_repl_offset = g_pserver->master_repl_offset; @@ -2922,11 +2901,11 @@ public: * snapshot taken by the master may not be reflected on the replica. */ bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now; if (fStaleMvccKey || fExpiredKey) { - if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { + if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { // We have a key that we've already deleted and is not back in our database. 
// We'll need to inform the sending master of the delete if it is also a replica of us robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key))); - this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); + this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); } sdsfree(job.key); job.key = nullptr; @@ -3242,19 +3221,26 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) { if (rsi) { - struct redisMaster mi; + MasterSaveInfo msi; char *masters = szFromObj(auxval); - char *entry = strtok(masters, ":"); + char *saveptr; + char *entry = strtok_r(masters, ":", &saveptr); while (entry != NULL) { - memcpy(mi.master_replid, entry, sizeof(mi.master_replid)); - entry = strtok(NULL, ":"); - mi.master_initial_offset = atoi(entry); - entry = strtok(NULL, ":"); - mi.masterhost = entry; - entry = strtok(NULL, ";"); - mi.masterport = atoi(entry); - entry = strtok(NULL, ":"); - rsi->addMaster(mi); + memcpy(msi.master_replid, entry, sizeof(msi.master_replid)); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.master_initial_offset = atoll(entry); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.masterhost = sdsstring(sdsnew(entry)); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.masterport = atoi(entry); + entry = strtok_r(NULL, ";", &saveptr); + if (entry == nullptr) break; + msi.selected_db = atoi(entry); + entry = strtok_r(NULL, ":", &saveptr); + rsi->addMaster(msi); } } } else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) { @@ -3533,6 +3519,32 @@ eoferr: return C_ERR; } +void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi) { + if (rsi != nullptr && g_pserver->fActiveReplica) { + serverLog(LL_NOTICE, "RDB contains information on %d masters", (int)rsi->numMasters()); + listIter li; + listNode *ln; + + listRewind(g_pserver->masters, &li); + while ((ln = 
listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->master != nullptr) { + continue; //ignore connected masters + } + for (size_t i = 0; i < rsi->numMasters(); i++) { + if (!sdscmp(mi->masterhost, (sds)rsi->vecmastersaveinfo[i].masterhost.get()) && mi->masterport == rsi->vecmastersaveinfo[i].masterport) { + memcpy(mi->master_replid, rsi->vecmastersaveinfo[i].master_replid, sizeof(mi->master_replid)); + mi->master_initial_offset = rsi->vecmastersaveinfo[i].master_initial_offset; + replicationCacheMasterUsingMaster(mi); + serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); + break; + } + } + } + } +} + int rdbLoad(rdbSaveInfo *rsi, int rdbflags) { int err = C_ERR; @@ -3913,11 +3925,22 @@ void bgsaveCommand(client *c) { * information. */ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { rdbSaveInfo rsi_init; - *rsi = std::move(rsi_init); + *rsi = rsi_init; memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid)); rsi->master_repl_offset = g_pserver->master_repl_offset; + if (g_pserver->fActiveReplica) { + listIter li; + listNode *ln = nullptr; + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + MasterSaveInfo msi(*mi); + rsi->addMaster(msi); + } + } + /* If the instance is a master, we can populate the replication info * only when repl_backlog is not NULL. If the repl_backlog is NULL, * it means that the instance isn't in any replication chains. In this @@ -3935,11 +3958,6 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { return rsi; } - if (listLength(g_pserver->masters) > 1) - { - // BUGBUG, warn user about this incomplete implementation - serverLog(LL_WARNING, "Warning: Only backing up first master's information in RDB"); - } struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? 
listNodeValue(listFirst(g_pserver->masters)) : NULL); /* If the instance is a replica we need a connected master diff --git a/src/replication.cpp b/src/replication.cpp index 7ca2a647e..fd713e55b 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2856,6 +2856,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, * gets promoted. */ return false; } + if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi); /* RDB loading succeeded if we reach this point. */ if (g_pserver->repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -2934,7 +2935,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, mi->staleKeyMap->clear(); else mi->staleKeyMap = new (MALLOC_LOCAL) std::map>(); - rsi.addMaster(*mi); + rsi.mi = mi; } if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, @@ -2951,6 +2952,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, it'll be restarted when sync succeeds or replica promoted. */ return false; } + if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi); /* Cleanup. 
*/ if (g_pserver->rdb_del_sync_files && allPersistenceDisabled()) { diff --git a/src/server.cpp b/src/server.cpp index 3cac24e97..373b20337 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1141,7 +1141,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"rreplay",replicaReplayCommand,-3, - "read-only fast noprop", + "read-only fast noprop ok-stale", 0,NULL,0,0,0,0,0,0}, {"keydb.cron",cronCommand,-5, @@ -4938,7 +4938,8 @@ int processCommand(client *c, int callFlags) { if (FBrokenLinkToMaster() && g_pserver->repl_serve_stale_data == 0 && is_denystale_command && - !(g_pserver->fActiveReplica && c->cmd->proc == syncCommand)) + !(g_pserver->fActiveReplica && c->cmd->proc == syncCommand) + && !FInReplicaReplay()) { rejectCommand(c, shared.masterdownerr); return C_OK; @@ -6962,31 +6963,15 @@ void loadDataFromDisk(void) { g_pserver->master_repl_offset = rsi.repl_offset; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - listIter li; - listNode *ln; - - listRewind(g_pserver->masters, &li); - while ((ln = listNext(&li))) - { - redisMaster *mi = (redisMaster*)listNodeValue(ln); - if (g_pserver->fActiveReplica) { - for (size_t i = 0; i < rsi.numMasters(); i++) { - if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) { - memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid)); - mi->master_initial_offset = rsi.masters[i].master_initial_offset; - replicationCacheMasterUsingMaster(mi); - serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); - } - } - } - else { - /* If we are a replica, create a cached master from this - * information, in order to allow partial resynchronizations - * with masters. 
*/ - replicationCacheMasterUsingMyself(mi); - selectDb(mi->cached_master,rsi.repl_stream_db); - } - } + } + updateActiveReplicaMastersFromRsi(&rsi); + if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + selectDb(mi->cached_master,rsi.repl_stream_db); } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); @@ -7293,6 +7278,7 @@ void *workerThreadMain(void *parg) serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); + tlsCleanupThread(); return NULL; } diff --git a/src/server.h b/src/server.h index 0a7d7c1e6..f96482612 100644 --- a/src/server.h +++ b/src/server.h @@ -1887,6 +1887,40 @@ struct redisMaster { int ielReplTransfer = -1; }; +struct MasterSaveInfo { + MasterSaveInfo() = default; + MasterSaveInfo(const redisMaster &mi) { + memcpy(master_replid, mi.master_replid, sizeof(mi.master_replid)); + if (mi.master) { + master_initial_offset = mi.master->reploff; + selected_db = mi.master->db->id; + } else if (mi.cached_master) { + master_initial_offset = mi.cached_master->reploff; + selected_db = mi.cached_master->db->id; + } else { + master_initial_offset = -1; + selected_db = 0; + } + masterport = mi.masterport; + masterhost = sdsstring(sdsdup(mi.masterhost)); + masterport = mi.masterport; + } + + MasterSaveInfo &operator=(const MasterSaveInfo &other) { + masterhost = other.masterhost; + masterport = other.masterport; + memcpy(master_replid, other.master_replid, sizeof(master_replid)); + master_initial_offset = other.master_initial_offset; + return *this; + } + + sdsstring masterhost; + int masterport; + char master_replid[CONFIG_RUN_ID_SIZE+1]; + long long master_initial_offset; + int selected_db; +}; + /* This structure can 
be optionally passed to RDB save/load functions in * order to implement additional functionalities, by storing and loading * metadata to the RDB file. @@ -1904,8 +1938,6 @@ public: repl_offset = -1; fForceSetKey = TRUE; mvccMinThreshold = 0; - masters = nullptr; - masterCount = 0; } rdbSaveInfo(const rdbSaveInfo &other) { repl_stream_db = other.repl_stream_db; @@ -1914,45 +1946,31 @@ public: repl_offset = other.repl_offset; fForceSetKey = other.fForceSetKey; mvccMinThreshold = other.mvccMinThreshold; - masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount); - memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount); - masterCount = other.masterCount; + vecmastersaveinfo = other.vecmastersaveinfo; + master_repl_offset = other.master_repl_offset; + mi = other.mi; } - rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() { - swap(*this, other); - } - rdbSaveInfo &operator=(rdbSaveInfo other) { - swap(*this, other); + + rdbSaveInfo &operator=(const rdbSaveInfo &other) { + repl_stream_db = other.repl_stream_db; + repl_id_is_set = other.repl_id_is_set; + memcpy(repl_id, other.repl_id, sizeof(repl_id)); + repl_offset = other.repl_offset; + fForceSetKey = other.fForceSetKey; + mvccMinThreshold = other.mvccMinThreshold; + vecmastersaveinfo = other.vecmastersaveinfo; + master_repl_offset = other.master_repl_offset; + mi = other.mi; + return *this; } - ~rdbSaveInfo() { - free(masters); - } - friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) { - std::swap(first.repl_stream_db, second.repl_stream_db); - std::swap(first.repl_id_is_set, second.repl_id_is_set); - std::swap(first.repl_id, second.repl_id); - std::swap(first.repl_offset, second.repl_offset); - std::swap(first.fForceSetKey, second.fForceSetKey); - std::swap(first.mvccMinThreshold, second.mvccMinThreshold); - std::swap(first.masters, second.masters); - std::swap(first.masterCount, second.masterCount); - } - - void addMaster(const struct redisMaster &mi) { - 
masterCount++; - if (masters == nullptr) { - masters = (struct redisMaster*)malloc(sizeof(struct redisMaster)); - } - else { - masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount); - } - memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster)); + void addMaster(const MasterSaveInfo &si) { + vecmastersaveinfo.push_back(si); } size_t numMasters() { - return masterCount; + return vecmastersaveinfo.size(); } /* Used saving and loading. */ @@ -1968,10 +1986,8 @@ public: long long master_repl_offset; uint64_t mvccMinThreshold; - struct redisMaster *masters; - -private: - size_t masterCount; + std::vector vecmastersaveinfo; + struct redisMaster *mi = nullptr; }; struct malloc_stats { @@ -3853,6 +3869,8 @@ void lfenceCommand(client *c); int FBrokenLinkToMaster(int *pconnectMasters = nullptr); int FActiveMaster(client *c); struct redisMaster *MasterInfoFromClient(client *c); +bool FInReplicaReplay(); +void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi); /* MVCC */ uint64_t getMvccTstamp(); @@ -3950,6 +3968,7 @@ void makeThreadKillable(void); /* TLS stuff */ void tlsInit(void); void tlsInitThread(); +void tlsCleanupThread(); void tlsCleanup(void); int tlsConfigure(redisTLSContextConfig *ctx_config); void tlsReload(void); diff --git a/src/t_stream.cpp b/src/t_stream.cpp index 010e9b65b..b005cf600 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -56,7 +56,6 @@ void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); -bool FInReplicaReplay(); int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); diff --git a/src/tls.cpp b/src/tls.cpp index 9b3a0415c..68651bfbb 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -180,6 +180,12 @@ void tlsInitThread(void) 
pending_list = listCreate(); } +void tlsCleanupThread(void) +{ + if (pending_list) + listRelease(pending_list); +} + void tlsCleanup(void) { if (redis_tls_ctx) { SSL_CTX_free(redis_tls_ctx); @@ -1260,6 +1266,7 @@ int tlsProcessPendingData() { } void tlsInitThread() {} +void tlsCleanupThread(void) {} sds connTLSGetPeerCert(connection *conn_) { (void) conn_; From 96f7d482d4e898c170636b3599799d04b74bfecc Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 31 Jan 2022 18:43:54 -0500 Subject: [PATCH 02/35] Update ci.yml Change min tested version to 18.04 --- .github/workflows/ci.yml | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b50d41e6f..c0b60dd89 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,3 +34,26 @@ jobs: - name: rotation test run: | ./runtest-rotation + + build-ubuntu-old: + runs-on: ubuntu-18.04 + steps: + - uses: actions/checkout@v1 + - name: make -j2 + run: | + sudo apt-get update + sudo apt-get -y install uuid-dev libcurl4-openssl-dev + make -j2 + + build-macos-latest: + runs-on: macos-latest + steps: + - uses: actions/checkout@v2 + - name: make + run: make KEYDB_CFLAGS='-Werror' KEYDB_CXXFLAGS='-Werror' -j2 + + build-libc-malloc: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: make \ No newline at end of file From a3527311785027fb999b9d9246850e4f502ad689 Mon Sep 17 00:00:00 2001 From: malavan Date: Fri, 14 Jan 2022 18:50:20 +0000 Subject: [PATCH 03/35] fork lock for all threads, use fastlock for readwritelock --- src/ae.cpp | 42 ++++++++++++++++++++++++++++++++-------- src/aelocker.h | 4 ++++ src/module.cpp | 2 +- src/networking.cpp | 2 ++ src/readwritelock.h | 47 +++++++++++++++------------------------------ src/replication.cpp | 2 ++ src/server.cpp | 16 ++++++++------- 7 files changed, 67 insertions(+), 48 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 9c1155a2f..4643b3999 100644 --- a/src/ae.cpp +++ 
b/src/ae.cpp @@ -53,6 +53,7 @@ #include "zmalloc.h" #include "config.h" #include "serverassert.h" +#include "server.h" #ifdef USE_MUTEX thread_local int cOwnLock = 0; @@ -87,6 +88,8 @@ mutex_wrapper g_lock; #else fastlock g_lock("AE (global)"); #endif +readWriteLock forkLock("Fork (global)"); +readWriteLock *g_forkLock = &forkLock; thread_local aeEventLoop *g_eventLoopThisThread = NULL; /* Include the best multiplexing layer supported by this system. @@ -154,16 +157,22 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostFunction: { - if (cmd.fLock && !ulock.owns_lock()) + if (cmd.fLock && !ulock.owns_lock()) { + g_forkLock->releaseRead(); ulock.lock(); + g_forkLock->acquireRead(); + } ((aePostFunctionProc*)cmd.proc)(cmd.clientData); break; } case AE_ASYNC_OP::PostCppFunction: { - if (cmd.fLock && !ulock.owns_lock()) + if (cmd.fLock && !ulock.owns_lock()) { + g_forkLock->releaseRead(); ulock.lock(); + g_forkLock->acquireRead(); + } (*cmd.pfn)(); delete cmd.pfn; @@ -547,7 +556,11 @@ static int processTimeEvents(aeEventLoop *eventLoop) { if (te->next) te->next->prev = te->prev; if (te->finalizerProc) { - if (!ulock.owns_lock()) ulock.lock(); + if (!ulock.owns_lock()) { + g_forkLock->releaseRead(); + ulock.lock(); + g_forkLock->acquireRead(); + } te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); } @@ -567,7 +580,11 @@ static int processTimeEvents(aeEventLoop *eventLoop) { } if (te->when <= now) { - if (!ulock.owns_lock()) ulock.lock(); + if (!ulock.owns_lock()) { + g_forkLock->releaseRead(); + ulock.lock(); + g_forkLock->acquireRead(); + } int retval; id = te->id; @@ -591,8 +608,11 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma { #define LOCK_IF_NECESSARY(fe, tsmask) \ std::unique_lock ulock(g_lock, std::defer_lock); \ - if (!(fe->mask & tsmask)) \ - ulock.lock() + if (!(fe->mask & tsmask)) { \ + g_forkLock->releaseRead(); \ + ulock.lock(); \ + g_forkLock->acquireRead(); \ + } 
int fired = 0; /* Number of events fired for current fd. */ @@ -704,8 +724,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); - if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) + if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) { + g_forkLock->releaseRead(); ulock.lock(); + g_forkLock->acquireRead(); + } eventLoop->beforesleep(eventLoop); } @@ -716,8 +739,11 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); - if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) + if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) { + g_forkLock->releaseRead(); ulock.lock(); + g_forkLock->acquireRead(); + } eventLoop->aftersleep(eventLoop); } diff --git a/src/aelocker.h b/src/aelocker.h index 75a3cef53..dda2b3142 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -34,7 +34,9 @@ public: clientNesting = c->lock.unlock_recursive(); fOwnClientLock = false; } + g_forkLock->releaseRead(); aeAcquireLock(); + g_forkLock->acquireRead(); if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive { aeReleaseLock(); @@ -52,7 +54,9 @@ public: else if (!m_fArmed) { m_fArmed = true; + g_forkLock->releaseRead(); aeAcquireLock(); + g_forkLock->acquireRead(); } } diff --git a/src/module.cpp b/src/module.cpp index 6f4315c32..cf3c81c1a 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -365,7 +365,7 @@ typedef struct RedisModuleCommandFilter { static list *moduleCommandFilters; /* Module GIL Variables */ -static readWriteLock s_moduleGIL; +static readWriteLock s_moduleGIL("Module GIL"); thread_local bool g_fModuleThread = false; typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); diff --git a/src/networking.cpp 
b/src/networking.cpp index a7920b692..73fd4332c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1359,7 +1359,9 @@ void acceptOnThread(connection *conn, int flags, char *cip) } rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); + g_forkLock->releaseRead(); aeAcquireLock(); + g_forkLock->acquireRead(); acceptCommonHandler(conn,flags,cip,ielCur); aeReleaseLock(); } diff --git a/src/readwritelock.h b/src/readwritelock.h index d03e1c82b..a7318a29f 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -2,22 +2,24 @@ #include class readWriteLock { - std::mutex m_readLock; - std::recursive_mutex m_writeLock; - std::condition_variable m_cv; + fastlock m_readLock; + fastlock m_writeLock; + std::condition_variable_any m_cv; int m_readCount = 0; int m_writeCount = 0; bool m_writeWaiting = false; public: + readWriteLock(const char *name) : m_readLock(name), m_writeLock(name) {} + void acquireRead() { - std::unique_lock rm(m_readLock); + std::unique_lock rm(m_readLock); while (m_writeCount > 0 || m_writeWaiting) m_cv.wait(rm); m_readCount++; } bool tryAcquireRead() { - std::unique_lock rm(m_readLock, std::defer_lock); + std::unique_lock rm(m_readLock, std::defer_lock); if (!rm.try_lock()) return false; if (m_writeCount > 0 || m_writeWaiting) @@ -27,7 +29,7 @@ public: } void acquireWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock); + std::unique_lock rm(m_readLock); m_writeWaiting = true; while (m_readCount > 0) m_cv.wait(rm); @@ -43,24 +45,12 @@ public: } void upgradeWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock); - m_writeWaiting = true; - while (m_readCount > 1) - m_cv.wait(rm); - if (exclusive) { - /* Another thread might have the write lock while we have the read lock - but won't be able to release it until they can acquire the read lock - so release the read lock and try again instead of waiting to avoid deadlock */ - while(!m_writeLock.try_lock()) - m_cv.wait(rm); - } - m_writeCount++; - 
m_readCount--; - m_writeWaiting = false; + releaseRead(); + acquireWrite(exclusive); } bool tryAcquireWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock, std::defer_lock); + std::unique_lock rm(m_readLock, std::defer_lock); if (!rm.try_lock()) return false; if (m_readCount > 0) @@ -73,14 +63,13 @@ public: } void releaseRead() { - std::unique_lock rm(m_readLock); - serverAssert(m_readCount > 0); + std::unique_lock rm(m_readLock); m_readCount--; m_cv.notify_all(); } void releaseWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock); + std::unique_lock rm(m_readLock); serverAssert(m_writeCount > 0); if (exclusive) m_writeLock.unlock(); @@ -89,14 +78,8 @@ public: } void downgradeWrite(bool exclusive = true) { - std::unique_lock rm(m_readLock); - serverAssert(m_writeCount > 0); - if (exclusive) - m_writeLock.unlock(); - m_writeCount--; - while (m_writeCount > 0 || m_writeWaiting) - m_cv.wait(rm); - m_readCount++; + releaseWrite(exclusive); + acquireRead(); } bool hasReader() { diff --git a/src/replication.cpp b/src/replication.cpp index 74e77c307..8347b8228 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -5334,7 +5334,9 @@ struct RemoteMasterState ~RemoteMasterState() { + g_forkLock->releaseRead(); aeAcquireLock(); + g_forkLock->acquireRead(); freeClient(cFake); aeReleaseLock(); } diff --git a/src/server.cpp b/src/server.cpp index 99cbec3f0..1e6052417 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -92,10 +92,8 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan; /* Global vars */ namespace GlobalHidden { struct redisServer server; /* Server global state */ -readWriteLock forkLock; } redisServer *g_pserver = &GlobalHidden::server; -readWriteLock *g_forkLock = &GlobalHidden::forkLock; struct redisServerConst cserver; thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars std::mutex time_thread_mutex; @@ -2975,7 +2973,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { 
g_pserver->garbageCollector.endEpoch(epoch); }, true /*fHiPri*/); } - + g_forkLock->releaseRead(); /* Determine whether the modules are enabled before sleeping, and use that result both here, and after wakeup to avoid double acquire or release of the GIL */ serverTL->modulesEnabledThisAeLoop = !!moduleCount(); @@ -2995,8 +2993,9 @@ void afterSleep(struct aeEventLoop *eventLoop) { Don't check here that modules are enabled, rather use the result from beforeSleep Otherwise you may double acquire the GIL and cause deadlocks in the module */ if (!ProcessingEventsWhileBlocked) { - wakeTimeThread(); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); + g_forkLock->acquireRead(); + wakeTimeThread(); serverAssert(serverTL->gcEpoch.isReset()); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); @@ -6852,7 +6851,7 @@ int redisFork(int purpose) { openChildInfoPipe(); } long long startWriteLock = ustime(); - g_forkLock->acquireWrite(); + g_forkLock->upgradeWrite(); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ @@ -6862,7 +6861,7 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ - g_forkLock->releaseWrite(); + g_forkLock->downgradeWrite(); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. 
*/ @@ -7261,6 +7260,7 @@ void *workerThreadMain(void *parg) } moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run + g_forkLock->acquireRead(); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; try { @@ -7269,6 +7269,7 @@ void *workerThreadMain(void *parg) catch (ShutdownException) { } + g_forkLock->releaseRead(); moduleReleaseGIL(true); serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); @@ -7418,7 +7419,7 @@ int main(int argc, char **argv) { initServerConfig(); serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; aeAcquireLock(); // We own the lock on boot - + g_forkLock->acquireRead(); ACLInit(); /* The ACL subsystem must be initialized ASAP because the basic networking code and client creation depends on it. */ moduleInitModulesSystem(); @@ -7652,6 +7653,7 @@ int main(int argc, char **argv) { } redisSetCpuAffinity(g_pserver->server_cpulist); + g_forkLock->releaseRead(); aeReleaseLock(); //Finally we can dump the lock moduleReleaseGIL(true); From f35baf8e7df9738a1314d6f61fad7b65fce2707c Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Tue, 18 Jan 2022 11:36:59 -0800 Subject: [PATCH 04/35] hide forklock object in ae --- src/ae.cpp | 53 +++++++++++++++++++++++++++++++-------------- src/ae.h | 4 ++++ src/aelocker.h | 4 ---- src/networking.cpp | 2 -- src/replication.cpp | 2 -- src/server.cpp | 28 ++++++++++++------------ src/server.h | 1 - 7 files changed, 55 insertions(+), 39 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 4643b3999..7c4af1652 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -88,8 +88,7 @@ mutex_wrapper g_lock; #else fastlock g_lock("AE (global)"); #endif -readWriteLock forkLock("Fork (global)"); -readWriteLock *g_forkLock = &forkLock; +readWriteLock g_forkLock("Fork (global)"); thread_local aeEventLoop *g_eventLoopThisThread = NULL; /* Include the best multiplexing layer supported by this system. 
@@ -158,9 +157,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostFunction: { if (cmd.fLock && !ulock.owns_lock()) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } ((aePostFunctionProc*)cmd.proc)(cmd.clientData); break; @@ -169,9 +168,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostCppFunction: { if (cmd.fLock && !ulock.owns_lock()) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } (*cmd.pfn)(); @@ -557,9 +556,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) { te->next->prev = te->prev; if (te->finalizerProc) { if (!ulock.owns_lock()) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); @@ -581,9 +580,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) { if (te->when <= now) { if (!ulock.owns_lock()) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } int retval; @@ -609,9 +608,9 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma #define LOCK_IF_NECESSARY(fe, tsmask) \ std::unique_lock ulock(g_lock, std::defer_lock); \ if (!(fe->mask & tsmask)) { \ - g_forkLock->releaseRead(); \ + g_forkLock.releaseRead(); \ ulock.lock(); \ - g_forkLock->acquireRead(); \ + g_forkLock.acquireRead(); \ } int fired = 0; /* Number of events fired for current fd. 
*/ @@ -725,9 +724,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } eventLoop->beforesleep(eventLoop); } @@ -740,9 +739,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) { - g_forkLock->releaseRead(); + g_forkLock.releaseRead(); ulock.lock(); - g_forkLock->acquireRead(); + g_forkLock.acquireRead(); } eventLoop->aftersleep(eventLoop); } @@ -818,9 +817,21 @@ void setAeLockSetThreadSpinWorker(spin_worker worker) tl_worker = worker; } +void aeThreadOnline() +{ + g_forkLock.acquireRead(); +} + void aeAcquireLock() { + g_forkLock.releaseRead(); g_lock.lock(tl_worker); + g_forkLock.acquireRead(); +} + +void aeAcquireForkLock() +{ + g_forkLock.upgradeWrite(); } int aeTryAcquireLock(int fWeak) @@ -828,6 +839,11 @@ int aeTryAcquireLock(int fWeak) return g_lock.try_lock(!!fWeak); } +void aeThreadOffline() +{ + g_forkLock.releaseRead(); +} + void aeReleaseLock() { g_lock.unlock(); @@ -838,6 +854,11 @@ void aeSetThreadOwnsLockOverride(int fOverride) fOwnLockOverride = fOverride; } +void aeReleaseForkLock() +{ + g_forkLock.downgradeWrite(); +} + int aeThreadOwnsLock() { return fOwnLockOverride || g_lock.fOwnLock(); diff --git a/src/ae.h b/src/ae.h index c22624ad6..cd513f652 100644 --- a/src/ae.h +++ b/src/ae.h @@ -164,9 +164,13 @@ void aeSetDontWait(aeEventLoop *eventLoop, int noWait); void aeClosePipesForForkChild(aeEventLoop *eventLoop); void setAeLockSetThreadSpinWorker(spin_worker worker); +void aeThreadOnline(); void aeAcquireLock(); +void aeAcquireForkLock(); int 
aeTryAcquireLock(int fWeak); +void aeThreadOffline(); void aeReleaseLock(); +void aeReleaseForkLock(); int aeThreadOwnsLock(); void aeSetThreadOwnsLockOverride(int fOverride); int aeLockContested(int threshold); diff --git a/src/aelocker.h b/src/aelocker.h index dda2b3142..75a3cef53 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -34,9 +34,7 @@ public: clientNesting = c->lock.unlock_recursive(); fOwnClientLock = false; } - g_forkLock->releaseRead(); aeAcquireLock(); - g_forkLock->acquireRead(); if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive { aeReleaseLock(); @@ -54,9 +52,7 @@ public: else if (!m_fArmed) { m_fArmed = true; - g_forkLock->releaseRead(); aeAcquireLock(); - g_forkLock->acquireRead(); } } diff --git a/src/networking.cpp b/src/networking.cpp index 73fd4332c..a7920b692 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1359,9 +1359,7 @@ void acceptOnThread(connection *conn, int flags, char *cip) } rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); - g_forkLock->releaseRead(); aeAcquireLock(); - g_forkLock->acquireRead(); acceptCommonHandler(conn,flags,cip,ielCur); aeReleaseLock(); } diff --git a/src/replication.cpp b/src/replication.cpp index 8347b8228..74e77c307 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -5334,9 +5334,7 @@ struct RemoteMasterState ~RemoteMasterState() { - g_forkLock->releaseRead(); aeAcquireLock(); - g_forkLock->acquireRead(); freeClient(cFake); aeReleaseLock(); } diff --git a/src/server.cpp b/src/server.cpp index 1e6052417..7a91fb97d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2973,7 +2973,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { g_pserver->garbageCollector.endEpoch(epoch); }, true /*fHiPri*/); } - g_forkLock->releaseRead(); + aeThreadOffline(); /* Determine whether the modules are enabled before sleeping, and use that result both here, and after wakeup to avoid double acquire or release of the GIL */ 
serverTL->modulesEnabledThisAeLoop = !!moduleCount(); @@ -2994,7 +2994,7 @@ void afterSleep(struct aeEventLoop *eventLoop) { Otherwise you may double acquire the GIL and cause deadlocks in the module */ if (!ProcessingEventsWhileBlocked) { if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); - g_forkLock->acquireRead(); + aeThreadOnline(); wakeTimeThread(); serverAssert(serverTL->gcEpoch.isReset()); @@ -6851,7 +6851,7 @@ int redisFork(int purpose) { openChildInfoPipe(); } long long startWriteLock = ustime(); - g_forkLock->upgradeWrite(); + aeAcquireForkLock(); latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ @@ -6861,7 +6861,7 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ - g_forkLock->downgradeWrite(); + aeReleaseForkLock(); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. 
*/ @@ -7218,21 +7218,21 @@ void *timeThreadMain(void*) { delay.tv_sec = 0; delay.tv_nsec = 100; int cycle_count = 0; - g_forkLock->acquireRead(); + aeThreadOnline(); while (true) { { std::unique_lock lock(time_thread_mutex); if (sleeping_threads >= cserver.cthreads) { - g_forkLock->releaseRead(); + aeThreadOffline(); time_thread_cv.wait(lock); - g_forkLock->acquireRead(); + aeThreadOnline(); cycle_count = 0; } } updateCachedTime(); if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) { - g_forkLock->releaseRead(); - g_forkLock->acquireRead(); + aeThreadOffline(); + aeThreadOnline(); cycle_count = 0; } #if defined(__APPLE__) @@ -7242,7 +7242,7 @@ void *timeThreadMain(void*) { #endif cycle_count++; } - g_forkLock->releaseRead(); + aeThreadOffline(); } void *workerThreadMain(void *parg) @@ -7260,7 +7260,7 @@ void *workerThreadMain(void *parg) } moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run - g_forkLock->acquireRead(); + aeThreadOnline(); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; try { @@ -7269,7 +7269,7 @@ void *workerThreadMain(void *parg) catch (ShutdownException) { } - g_forkLock->releaseRead(); + aeThreadOffline(); moduleReleaseGIL(true); serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); @@ -7418,8 +7418,8 @@ int main(int argc, char **argv) { g_pserver->sentinel_mode = checkForSentinelMode(argc,argv); initServerConfig(); serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; + aeThreadOnline(); aeAcquireLock(); // We own the lock on boot - g_forkLock->acquireRead(); ACLInit(); /* The ACL subsystem must be initialized ASAP because the basic networking code and client creation depends on it. 
*/ moduleInitModulesSystem(); @@ -7653,8 +7653,8 @@ int main(int argc, char **argv) { } redisSetCpuAffinity(g_pserver->server_cpulist); - g_forkLock->releaseRead(); aeReleaseLock(); //Finally we can dump the lock + aeThreadOffline(); moduleReleaseGIL(true); setOOMScoreAdj(-1); diff --git a/src/server.h b/src/server.h index 974661ad8..e3d7e90ae 100644 --- a/src/server.h +++ b/src/server.h @@ -2763,7 +2763,6 @@ typedef struct { *----------------------------------------------------------------------------*/ //extern struct redisServer server; -extern readWriteLock *g_forkLock; extern struct redisServerConst cserver; extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars extern struct sharedObjectsStruct shared; From fa0a60240be40df2c418e19172a6cc89eab23db1 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Tue, 18 Jan 2022 11:40:47 -0800 Subject: [PATCH 05/35] only need to include readwritelock in ae --- src/ae.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ae.cpp b/src/ae.cpp index 7c4af1652..702c465d4 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -53,7 +53,7 @@ #include "zmalloc.h" #include "config.h" #include "serverassert.h" -#include "server.h" +#include "readwritelock.h" #ifdef USE_MUTEX thread_local int cOwnLock = 0; From 6d1cd00f153915b8c3a8117fd89fd8991b15bbac Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 26 Jan 2022 14:02:39 -0800 Subject: [PATCH 06/35] time thread lock uses fastlock instead of std::mutex --- src/server.cpp | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 7a91fb97d..9df3764df 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -62,7 +62,6 @@ #include #include #include -#include #include #include "aelocker.h" #include "motd.h" @@ -96,8 +95,8 @@ struct redisServer server; /* Server global state */ redisServer *g_pserver = &GlobalHidden::server; struct redisServerConst cserver; thread_local struct 
redisServerThreadVars *serverTL = NULL; // thread local server vars -std::mutex time_thread_mutex; -std::condition_variable time_thread_cv; +fastlock time_thread_lock("Time thread lock"); +std::condition_variable_any time_thread_cv; int sleeping_threads = 0; void wakeTimeThread(); @@ -2959,7 +2958,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { // Scope lock_guard { - std::lock_guard lock(time_thread_mutex); + std::unique_lock lock(time_thread_lock); sleeping_threads++; serverAssert(sleeping_threads <= cserver.cthreads); } @@ -7206,7 +7205,7 @@ void OnTerminate() void wakeTimeThread() { updateCachedTime(); - std::lock_guard lock(time_thread_mutex); + std::unique_lock lock(time_thread_lock); if (sleeping_threads >= cserver.cthreads) time_thread_cv.notify_one(); sleeping_threads--; @@ -7221,7 +7220,7 @@ void *timeThreadMain(void*) { aeThreadOnline(); while (true) { { - std::unique_lock lock(time_thread_mutex); + std::unique_lock lock(time_thread_lock); if (sleeping_threads >= cserver.cthreads) { aeThreadOffline(); time_thread_cv.wait(lock); From 5e2d3fafae93b03f1f58a7ba13d7f6b47fcace14 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Thu, 27 Jan 2022 10:03:05 -0800 Subject: [PATCH 07/35] set thread as offline when waiting for time thread lock --- src/server.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index 9df3764df..eea2032d8 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2956,6 +2956,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (!fSentReplies) handleClientsWithPendingWrites(iel, aof_state); + aeThreadOffline(); // Scope lock_guard { std::unique_lock lock(time_thread_lock); @@ -2972,7 +2973,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { g_pserver->garbageCollector.endEpoch(epoch); }, true /*fHiPri*/); } - aeThreadOffline(); + /* Determine whether the modules are enabled before sleeping, and use that result both here, and after wakeup to avoid double acquire or 
release of the GIL */ serverTL->modulesEnabledThisAeLoop = !!moduleCount(); @@ -7205,7 +7206,9 @@ void OnTerminate() void wakeTimeThread() { updateCachedTime(); + aeThreadOffline(); std::unique_lock lock(time_thread_lock); + aeThreadOnline(); if (sleeping_threads >= cserver.cthreads) time_thread_cv.notify_one(); sleeping_threads--; @@ -7220,7 +7223,9 @@ void *timeThreadMain(void*) { aeThreadOnline(); while (true) { { + aeThreadOffline(); std::unique_lock lock(time_thread_lock); + aeThreadOnline(); if (sleeping_threads >= cserver.cthreads) { aeThreadOffline(); time_thread_cv.wait(lock); From b5a7e4bcac8ac261b8fe58f4b41c704337daae1f Mon Sep 17 00:00:00 2001 From: benschermel Date: Sun, 6 Mar 2022 15:42:48 -0500 Subject: [PATCH 08/35] update README resource links --- README.md | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 832f50085..b00867b41 100644 --- a/README.md +++ b/README.md @@ -33,19 +33,13 @@ Because of this difference of opinion features which are right for KeyDB may not Additional Resources -------------------- -Try our docker container: https://hub.docker.com/r/eqalpha/keydb +Check out KeyDB's [Docker Image](https://hub.docker.com/r/eqalpha/keydb) -Talk on Gitter: https://gitter.im/KeyDB +Join us on [Slack](https://docs.keydb.dev/slack/) -Visit our Website: https://keydb.dev +Post to the [Community Forum](https://community.keydb.dev) -See options for channel partners and support contracts: https://keydb.dev/support.html - -Learn with KeyDB’s official documentation site: https://docs.keydb.dev - -[Subscribe to the KeyDB mailing list](https://eqalpha.us20.list-manage.com/subscribe/post?u=978f486c2f95589b24591a9cc&id=4ab9220500) - -Management GUI: We recommend [FastoNoSQL](https://fastonosql.com/) which has official KeyDB support. 
+Learn more through KeyDB's [Documentation & Learning Center](https://docs.keydb.dev) Benchmarking KeyDB From 57ccb8864135c4685c49a447fd8a1e5bc172d067 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 6 Mar 2022 20:38:00 -0500 Subject: [PATCH 09/35] Fix MALLOC=memkind build issues --- runtest-moduleapi | 1 + src/object.cpp | 2 +- src/storage.cpp | 1 + src/zmalloc.cpp | 2 +- tests/modules/Makefile | 1 + 5 files changed, 5 insertions(+), 2 deletions(-) diff --git a/runtest-moduleapi b/runtest-moduleapi index 8adf2171d..05ca161ab 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -32,6 +32,7 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/blockedclient \ --single unit/moduleapi/moduleloadsave \ --single unit/moduleapi/getkeys \ +--single unit/moduleapi/timers \ --single unit/moduleapi/test_lazyfree \ --single unit/moduleapi/defrag \ --single unit/moduleapi/hash \ diff --git a/src/object.cpp b/src/object.cpp index 4f38b1768..eb8118d11 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -138,7 +138,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { * * The current limit of 52 is chosen so that the biggest string object * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. 
*/ -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 52 +size_t OBJ_ENCODING_EMBSTR_SIZE_LIMIT = 52; robj *createStringObject(const char *ptr, size_t len) { if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) diff --git a/src/storage.cpp b/src/storage.cpp index f36c5474e..6a42b509f 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -118,6 +118,7 @@ void pool_free(struct alloc_pool *ppool, void *pv) return; } +extern size_t OBJ_ENCODING_EMBSTR_SIZE_LIMIT; #define EMBSTR_ROBJ_SIZE (sizeof(robj)+sizeof(struct sdshdr8)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT+1) struct alloc_pool poolobj; struct alloc_pool poolembstrobj; diff --git a/src/zmalloc.cpp b/src/zmalloc.cpp index 3e20cc652..dac6f6631 100644 --- a/src/zmalloc.cpp +++ b/src/zmalloc.cpp @@ -243,7 +243,7 @@ void *ztryrealloc_usable(void *ptr, size_t size, size_t *usable) { #else realptr = (char*)ptr-PREFIX_SIZE; oldsize = *((size_t*)realptr); - newptr = realloc(realptr,size+PREFIX_SIZE); + newptr = realloc(realptr,size+PREFIX_SIZE, MALLOC_LOCAL); if (newptr == NULL) { if (usable) *usable = 0; return NULL; diff --git a/tests/modules/Makefile b/tests/modules/Makefile index ae611de86..1beb217b8 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -32,6 +32,7 @@ TEST_MODULES = \ auth.so \ keyspace_events.so \ blockedclient.so \ + timers.so \ getkeys.so \ test_lazyfree.so \ timer.so \ From b90b753328bb2d060d79dc8b39ac8f331dd4730b Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 7 Mar 2022 20:20:47 -0500 Subject: [PATCH 10/35] Fix module test break --- runtest-moduleapi | 1 - 1 file changed, 1 deletion(-) diff --git a/runtest-moduleapi b/runtest-moduleapi index 05ca161ab..8adf2171d 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -32,7 +32,6 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/blockedclient \ --single unit/moduleapi/moduleloadsave \ --single unit/moduleapi/getkeys \ ---single unit/moduleapi/timers \ --single unit/moduleapi/test_lazyfree \ --single unit/moduleapi/defrag \ --single 
unit/moduleapi/hash \ From 2928806ff07f5ead040557d6fd11f969653c0d08 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 8 Mar 2022 18:20:03 -0500 Subject: [PATCH 11/35] Eliminate firewall dialogs on mac for regular and cluster tests. There are still issues with the sentinel tests but attempting to bind only to localhost causes failures --- tests/assets/minimal.conf | 1 + tests/cluster/run.tcl | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/assets/minimal.conf b/tests/assets/minimal.conf index ed49223c9..5e0b033da 100644 --- a/tests/assets/minimal.conf +++ b/tests/assets/minimal.conf @@ -1,4 +1,5 @@ # Minimal configuration for testing. +bind 127.0.0.1 always-show-logo yes daemonize no pidfile /var/run/keydb.pid diff --git a/tests/cluster/run.tcl b/tests/cluster/run.tcl index 2c72ad307..7e1e91081 100644 --- a/tests/cluster/run.tcl +++ b/tests/cluster/run.tcl @@ -13,8 +13,9 @@ set ::tlsdir "../../tls" proc main {} { parse_options spawn_instance redis $::redis_base_port $::instances_count { - "cluster-enabled yes" - "appendonly yes" + "bind 127.0.0.1" + "cluster-enabled yes" + "appendonly yes" "testmode yes" "server-threads 3" } From 203e4e228feac8a75bb68360c8894ebfc4212200 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Thu, 31 Mar 2022 18:40:17 -0700 Subject: [PATCH 12/35] remove unused var in networking.cpp --- src/networking.cpp | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index a7920b692..8b1fb7e36 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2000,18 +2000,6 @@ void ProcessPendingAsyncWrites() } c->fPendingAsyncWrite = FALSE; - // Now install the write event handler - int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; - /* For the fsync=always policy, we want that a given FD is never - * served for reading and writing in the same event loop iteration, - * so that in the middle of receiving the query, and serving it - * to the client, we'll call 
beforeSleep() that will do the - * actual fsync of AOF to disk. AE_BARRIER ensures that. */ - if (g_pserver->aof_state == AOF_ON && - g_pserver->aof_fsync == AOF_FSYNC_ALWAYS) - { - ae_flags |= AE_BARRIER; - } if (!((c->replstate == REPL_STATE_NONE || c->replstate == SLAVE_STATE_FASTSYNC_TX || (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))) From 8f3f12720311e73a4e5a708d3fc9cf3c10b058e5 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Tue, 5 Apr 2022 03:17:41 -0700 Subject: [PATCH 13/35] check ziplist len to avoid crash on empty ziplist convert --- src/t_zset.cpp | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/t_zset.cpp b/src/t_zset.cpp index 73e9316c8..ad4ea2bd4 100644 --- a/src/t_zset.cpp +++ b/src/t_zset.cpp @@ -1191,24 +1191,26 @@ void zsetConvert(robj *zobj, int encoding) { zs->dict = dictCreate(&zsetDictType,NULL); zs->zsl = zslCreate(); - eptr = ziplistIndex(zl,0); - serverAssertWithInfo(NULL,zobj,eptr != NULL); - sptr = ziplistNext(zl,eptr); - serverAssertWithInfo(NULL,zobj,sptr != NULL); + if (ziplistLen(zl) > 0) { + eptr = ziplistIndex(zl,0); + serverAssertWithInfo(NULL,zobj,eptr != NULL); + sptr = ziplistNext(zl,eptr); + serverAssertWithInfo(NULL,zobj,sptr != NULL); - while (eptr != NULL) { - score = zzlGetScore(sptr); - serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); - if (vstr == NULL) - ele = sdsfromlonglong(vlong); - else - ele = sdsnewlen((char*)vstr,vlen); + while (eptr != NULL) { + score = zzlGetScore(sptr); + serverAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen((char*)vstr,vlen); - node = zslInsert(zs->zsl,score,ele); - serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); - zzlNext(zl,&eptr,&sptr); + node = zslInsert(zs->zsl,score,ele); + serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); + zzlNext(zl,&eptr,&sptr); + } } - + 
zfree(zobj->m_ptr); zobj->m_ptr = zs; zobj->encoding = OBJ_ENCODING_SKIPLIST; From d63c5acb208622dc5877063aa65ecd2812d5fd80 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Thu, 31 Mar 2022 18:51:40 -0700 Subject: [PATCH 14/35] remove nullptr subtraction --- src/pqsort.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pqsort.c b/src/pqsort.c index 508c09f92..6e79097c8 100644 --- a/src/pqsort.c +++ b/src/pqsort.c @@ -62,7 +62,7 @@ static inline void swapfunc (char *, char *, size_t, int); } while (--i > 0); \ } -#define SWAPINIT(a, es) swaptype = ((char *)a - (char *)0) % sizeof(long) || \ +#define SWAPINIT(a, es) swaptype = (char *)a % sizeof(long) || \ es % sizeof(long) ? 2 : es == sizeof(long)? 0 : 1; static inline void From 0f187c74659016f5876283b9f33778e9e7500a96 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Thu, 31 Mar 2022 18:55:12 -0700 Subject: [PATCH 15/35] cannot mod a pointer --- src/pqsort.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pqsort.c b/src/pqsort.c index 6e79097c8..974553504 100644 --- a/src/pqsort.c +++ b/src/pqsort.c @@ -62,7 +62,7 @@ static inline void swapfunc (char *, char *, size_t, int); } while (--i > 0); \ } -#define SWAPINIT(a, es) swaptype = (char *)a % sizeof(long) || \ +#define SWAPINIT(a, es) swaptype = (uintptr_t)a % sizeof(long) || \ es % sizeof(long) ? 2 : es == sizeof(long)? 
0 : 1; static inline void From eb46be3685d56e43914c735e1d46827698e26820 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Thu, 31 Mar 2022 19:47:13 -0700 Subject: [PATCH 16/35] need to include stdint for uintptr_t --- src/pqsort.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pqsort.c b/src/pqsort.c index 974553504..26bb6a65c 100644 --- a/src/pqsort.c +++ b/src/pqsort.c @@ -41,6 +41,7 @@ #include #include +#include static inline char *med3 (char *, char *, char *, int (*)(const void *, const void *)); From 4af996e32e6442477790618ddd1c964ab5d62572 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Tue, 12 Apr 2022 13:12:34 -0700 Subject: [PATCH 17/35] use atomic_load for g_pserver->mstime --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index eea2032d8..87213de04 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2694,7 +2694,7 @@ extern "C" void asyncFreeDictTable(dictEntry **de) void blockingOperationStarts() { if(!g_pserver->blocking_op_nesting++){ - g_pserver->blocked_last_cron = g_pserver->mstime; + __atomic_load(&g_pserver->mstime, &g_pserver->blocked_last_cron, __ATOMIC_ACQUIRE); } } From c529f0e1ed9f12225350d63a4b4dc31d330598f0 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 14 Apr 2022 17:15:19 +0000 Subject: [PATCH 18/35] Integrate readwritelock with Pro Code --- src/AsyncWorkQueue.cpp | 2 ++ src/db.cpp | 2 ++ src/rdb.cpp | 6 ++++-- src/redis-benchmark.cpp | 11 +++++++++-- src/replication.cpp | 3 ++- src/server.cpp | 3 +++ 6 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index fe02e5212..fddfa840b 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -43,9 +43,11 @@ void AsyncWorkQueue::WorkerThreadMain() lock.unlock(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); if (listLength(serverTL->clients_pending_asyncwrite)) { + aeThreadOnline(); aeAcquireLock(); 
ProcessPendingAsyncWrites(); aeReleaseLock(); + aeThreadOffline(); } g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); serverTL->gcEpoch.reset(); diff --git a/src/db.cpp b/src/db.cpp index a115c887b..2b1321bcf 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1068,7 +1068,9 @@ void keysCommand(client *c) { blockClient(c, BLOCKED_ASYNC); redisDb *db = c->db; g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{ + aeThreadOnline(); keysCommandCore(c, snapshot, patternCopy); + aeThreadOffline(); sdsfree(patternCopy); aePostFunction(el, [c, db, snapshot]{ aeReleaseLock(); // we need to lock with coordination of the client diff --git a/src/rdb.cpp b/src/rdb.cpp index 2f5f73cd5..8e0bca922 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1552,6 +1552,7 @@ struct rdbSaveThreadArgs void *rdbSaveThread(void *vargs) { + aeThreadOnline(); serverAssert(!g_pserver->rdbThreadVars.fDone); rdbSaveThreadArgs *args = reinterpret_cast(vargs); serverAssert(serverTL == nullptr); @@ -1577,7 +1578,7 @@ void *rdbSaveThread(void *vargs) "%s: %zd MB of memory used by copy-on-write", "RDB",cbDiff/(1024*1024)); } - + aeThreadOffline(); g_pserver->rdbThreadVars.fDone = true; return (retval == C_OK) ? 
(void*)0 : (void*)1; } @@ -3659,6 +3660,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) serverTL = &vars; vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); + aeThreadOnline(); rioInitWithFd(&rdb,args->rdb_pipe_write); retval = rdbSaveRioWithEOFMark(&rdb,args->rgpdb,NULL,&args->rsi); @@ -3684,7 +3686,7 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]); g_pserver->garbageCollector.endEpoch(vars.gcEpoch); - + aeThreadOffline(); close(args->safe_to_exit_pipe); zfree(args); diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 397b557c6..34e19c372 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -1017,7 +1017,11 @@ static void benchmark(const char *title, const char *cmd, int len) { createMissingClients(c); config.start = mstime(); - if (!config.num_threads) aeMain(config.el); + if (!config.num_threads) { + aeThreadOnline(); + aeMain(config.el); + aeThreadOffline(); + } else startBenchmarkThreads(); config.totlatency = mstime()-config.start; @@ -1057,7 +1061,9 @@ static void freeBenchmarkThreads() { static void *execBenchmarkThread(void *ptr) { benchmarkThread *thread = (benchmarkThread *) ptr; + aeThreadOnline(); aeMain(thread->el); + aeThreadOffline(); return NULL; } @@ -1696,7 +1702,7 @@ int main(int argc, const char **argv) { int len; client c; - + aeThreadOnline(); storage_init(NULL, 0); srandom(time(NULL) ^ getpid()); @@ -1749,6 +1755,7 @@ int main(int argc, const char **argv) { cliSecureInit(); } #endif + aeThreadOffline(); if (config.cluster_mode) { // We only include the slot placeholder {tag} if cluster mode is enabled diff --git a/src/replication.cpp b/src/replication.cpp index 74e77c307..c68e3f09c 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1196,7 +1196,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { size_t cbData = 0; size_t cbLastUpdate = 0; auto &replBuf = *spreplBuf; - + aeThreadOnline(); // Databases 
replBuf.addArrayLen(cserver.dbnum); for (int idb = 0; idb < cserver.dbnum; ++idb) { @@ -1244,6 +1244,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { replBuf.putSlavesOnline(); aeReleaseLock(); } + aeThreadOffline(); }); return retval; diff --git a/src/server.cpp b/src/server.cpp index 87213de04..1f94566b4 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6855,6 +6855,7 @@ int redisFork(int purpose) { latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ + aeReleaseForkLock(); g_pserver->in_fork_child = purpose; setOOMScoreAdj(CONFIG_OOM_BGCHILD); setupChildSignalHandlers(); @@ -7258,9 +7259,11 @@ void *workerThreadMain(void *parg) if (iel != IDX_EVENT_LOOP_MAIN) { + aeThreadOnline(); aeAcquireLock(); initNetworkingThread(iel, cserver.cthreads > 1); aeReleaseLock(); + aeThreadOffline(); } moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run From 6a89ac329ca4b1c526f171f11c0eba1cc5343a50 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 14 Apr 2022 17:21:50 +0000 Subject: [PATCH 19/35] Defensive asserts for RWLock --- src/readwritelock.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/readwritelock.h b/src/readwritelock.h index a7318a29f..9efed8da4 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -65,6 +65,7 @@ public: void releaseRead() { std::unique_lock rm(m_readLock); m_readCount--; + serverAssert(m_readCount >= 0); m_cv.notify_all(); } @@ -74,6 +75,7 @@ public: if (exclusive) m_writeLock.unlock(); m_writeCount--; + serverAssert(m_writeCount >= 0); m_cv.notify_all(); } From 0ed0745d909996c75a37a8bbb93373e384959715 Mon Sep 17 00:00:00 2001 From: Christian Legge Date: Wed, 12 Jan 2022 19:49:15 -0500 Subject: [PATCH 20/35] Save and restore master info in rdb to allow active replica partial sync (#371) * save replid for all masters in rdb * expanded rdbSaveInfo to hold multiple master structs * parse repl-masters 
from rdb * recover replid info from rdb in active replica mode, attempt partial sync * save offset from rdb into correct variable * don't change replid based on master in active rep * save and load psync info from correct fields --- src/aof.cpp | 2 +- src/rdb.cpp | 61 +++++++++++++++++-- src/replication.cpp | 53 ++++++++++------- src/sds.h | 15 +++++ src/server.cpp | 26 +++++--- src/server.h | 142 +++++++++++++++++++++++++++++++------------- 6 files changed, 224 insertions(+), 75 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 5ee11877b..9fa9290bd 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -884,7 +884,7 @@ int loadAppendOnlyFile(char *filename) { } else { /* RDB preamble. Pass loading the RDB functions. */ rio rdb; - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsi; serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; diff --git a/src/rdb.cpp b/src/rdb.cpp index 8e0bca922..702ed9097 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1218,6 +1218,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset) == -1) return -1; + if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) { + sdsstring val = sdsstring(sdsempty()); + listNode *ln; + listIter li; + redisMaster* mi; + listRewind(g_pserver->masters,&li); + while ((ln = listNext(&li)) != NULL) { + mi = (redisMaster*)listNodeValue(ln); + if (!mi->master) { + // If master client is not available, use info from master struct - better than nothing + serverLog(LL_NOTICE, "saving master %s", mi->master_replid); + if (mi->master_replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = val.catfmt("%s:%I:%s:%i;", mi->master_replid, + mi->master_initial_offset, + mi->masterhost, + mi->masterport); + } + else { + serverLog(LL_NOTICE, "saving master %s", mi->master->replid); + if (mi->master->replid[0] == 
0) { + // if replid is null, there's no reason to save it + continue; + } + val = val.catfmt("%s:%I:%s:%i;", mi->master->replid, + mi->master->reploff, + mi->masterhost, + mi->masterport); + } + } + if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1; + } } if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; return 1; @@ -1628,7 +1662,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) } else { rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); - rdbSaveInfo rsiT = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsiT; if (rsi == nullptr) rsi = &rsiT; memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo)); @@ -2888,11 +2922,11 @@ public: * snapshot taken by the master may not be reflected on the replica. */ bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now; if (fStaleMvccKey || fExpiredKey) { - if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { + if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { // We have a key that we've already deleted and is not back in our database. 
// We'll need to inform the sending master of the delete if it is also a replica of us robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key))); - this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); + this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); } sdsfree(job.key); job.key = nullptr; @@ -3206,6 +3240,23 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { memcpy(rsi->repl_id,ptrFromObj(auxval),CONFIG_RUN_ID_SIZE+1); rsi->repl_id_is_set = 1; } + } else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) { + if (rsi) { + struct redisMaster mi; + char *masters = szFromObj(auxval); + char *entry = strtok(masters, ":"); + while (entry != NULL) { + memcpy(mi.master_replid, entry, sizeof(mi.master_replid)); + entry = strtok(NULL, ":"); + mi.master_initial_offset = atoi(entry); + entry = strtok(NULL, ":"); + mi.masterhost = entry; + entry = strtok(NULL, ";"); + mi.masterport = atoi(entry); + entry = strtok(NULL, ":"); + rsi->addMaster(mi); + } + } } else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) { if (rsi) rsi->repl_offset = strtoll(szFromObj(auxval),NULL,10); } else if (!strcasecmp(szFromObj(auxkey),"lua")) { @@ -3861,8 +3912,8 @@ void bgsaveCommand(client *c) { * is returned, and the RDB saving will not persist any replication related * information. 
*/ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { - rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT; - *rsi = rsi_init; + rdbSaveInfo rsi_init; + *rsi = std::move(rsi_init); memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid)); rsi->master_repl_offset = g_pserver->master_repl_offset; diff --git a/src/replication.cpp b/src/replication.cpp index c68e3f09c..858e9d55d 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2954,7 +2954,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, mi->staleKeyMap->clear(); else mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>(); - rsi.mi = mi; + rsi.addMaster(*mi); } if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, @@ -2994,7 +2994,7 @@ error: } void readSyncBulkPayload(connection *conn) { - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsi; redisMaster *mi = (redisMaster*)connGetPrivateData(conn); static int usemark = 0; if (mi == nullptr) { @@ -3043,7 +3043,7 @@ void readSyncBulkPayload(connection *conn) { { mergeReplicationId(mi->master->replid); } - else + else if (!g_pserver->fActiveReplica) { /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since @@ -3238,7 +3238,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read * client structure representing the master into g_pserver->master.
*/ mi->master_initial_offset = -1; - if (mi->cached_master && !g_pserver->fActiveReplica) { + if (mi->cached_master) { psync_replid = mi->cached_master->replid; snprintf(psync_offset,sizeof(psync_offset),"%lld", mi->cached_master->reploff+1); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); @@ -3337,14 +3337,15 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read sizeof(g_pserver->replid2)); g_pserver->second_replid_offset = g_pserver->master_repl_offset+1; - /* Update the cached master ID and our own primary ID to the - * new one. */ - memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid)); - memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid)); + if (!g_pserver->fActiveReplica) { + /* Update the cached master ID and our own primary ID to the + * new one. */ + memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid)); + memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid)); - /* Disconnect all the sub-slaves: they need to be notified. */ - if (!g_pserver->fActiveReplica) + /* Disconnect all the sub-slaves: they need to be notified. */ disconnectSlaves(); + } } } @@ -3725,18 +3726,6 @@ retry_connect: disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ } - else - { - if (listLength(g_pserver->slaves)) - { - changeReplicationId(); - clearReplicationId2(); - } - else - { - freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ - } - } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC * and the g_pserver->master_replid and master_initial_offset are @@ -4405,6 +4394,26 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) { mi->master = NULL; } +/* This function is called when reloading master info from an RDB in Active Replica mode. 
+ * It creates a cached master client using the info contained in the redisMaster struct. + * + * Assumes that the passed struct contains valid master info. */ +void replicationCacheMasterUsingMaster(redisMaster *mi) { + if (mi->cached_master) { + freeClient(mi->cached_master); + } + + replicationCreateMasterClient(mi, NULL, -1); + std::lock_guard<decltype(mi->master->lock)> lock(mi->master->lock); + + memcpy(mi->master->replid, mi->master_replid, sizeof(mi->master_replid)); + mi->master->reploff = mi->master_initial_offset; + + unlinkClient(mi->master); + mi->cached_master = mi->master; + mi->master = NULL; +} + /* Free a cached master, called when there are no longer the conditions for * a partial resync on reconnection. */ void replicationDiscardCachedMaster(redisMaster *mi) { diff --git a/src/sds.h b/src/sds.h index 26fe16225..1c983cdc1 100644 --- a/src/sds.h +++ b/src/sds.h @@ -429,6 +429,21 @@ public: return *this; } + sdsstring &operator=(sdsstring &&other) + { + sds tmp = m_str; + m_str = other.m_str; + other.m_str = tmp; + return *this; + } + + template<typename... Args> + sdsstring catfmt(const char *fmt, Args...
args) + { + m_str = sdscatfmt(m_str, fmt, args...); + return *this; + } + sds release() { sds sdsT = m_str; m_str = nullptr; diff --git a/src/server.cpp b/src/server.cpp index 1f94566b4..e3ed33beb 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6940,7 +6940,7 @@ void loadDataFromDisk(void) { if (loadAppendOnlyFile(g_pserver->aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else if (g_pserver->rdb_filename != NULL || g_pserver->rdb_s3bucketpath != NULL) { - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsi; rsi.fForceSetKey = false; errno = 0; /* Prevent a stale value from affecting error checking */ if (rdbLoad(&rsi,RDBFLAGS_NONE) == C_OK) { @@ -6969,11 +6969,23 @@ void loadDataFromDisk(void) { while ((ln = listNext(&li))) { redisMaster *mi = (redisMaster*)listNodeValue(ln); - /* If we are a replica, create a cached master from this - * information, in order to allow partial resynchronizations - * with masters. */ - replicationCacheMasterUsingMyself(mi); - selectDb(mi->cached_master,rsi.repl_stream_db); + if (g_pserver->fActiveReplica) { + for (size_t i = 0; i < rsi.numMasters(); i++) { + if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) { + memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid)); + mi->master_initial_offset = rsi.masters[i].master_initial_offset; + replicationCacheMasterUsingMaster(mi); + serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); + } + } + } + else { + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. 
*/ + replicationCacheMasterUsingMyself(mi); + selectDb(mi->cached_master,rsi.repl_stream_db); + } } } } else if (errno != ENOENT) { @@ -7596,7 +7608,7 @@ int main(int argc, char **argv) { __AFL_INIT(); #endif rio rdb; - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsi; startLoadingFile(stdin, (char*)"stdin", 0); rioInitWithFile(&rdb,stdin); rdbLoadRio(&rdb,0,&rsi); diff --git a/src/server.h b/src/server.h index e3d7e90ae..900c42f94 100644 --- a/src/server.h +++ b/src/server.h @@ -1850,6 +1850,43 @@ struct redisMemOverhead { } *db; }; + +struct redisMaster { + char *masteruser; /* AUTH with this user and masterauth with master */ + char *masterauth; /* AUTH with this password with master */ + char *masterhost; /* Hostname of master */ + int masterport; /* Port of master */ + client *cached_master; /* Cached master to be reused for PSYNC. */ + client *master; + /* The following two fields is where we store master PSYNC replid/offset + * while the PSYNC is in progress. At the end we'll copy the fields into + * the server->master client structure. */ + char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ + long long master_initial_offset; /* Master PSYNC offset. */ + + bool isActive = false; + bool isRocksdbSnapshotRepl = false; + int repl_state; /* Replication status if the instance is a replica */ + off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ + off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ + off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. 
*/ + connection *repl_transfer_s; /* Slave -> Master SYNC socket */ + int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ + char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ + time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ + time_t repl_down_since; /* Unix time at which link with master went down */ + + class SnapshotPayloadParseState *parseState; + sds bulkreadBuffer = nullptr; + + unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ + /* After we've connected with our master use the UUID in g_pserver->master */ + uint64_t mvccLastSync; + /* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */ + std::map<int, std::vector<robj_sharedptr>> *staleKeyMap; + int ielReplTransfer = -1; +}; + /* This structure can be optionally passed to RDB save/load functions in * order to implement additional functionalities, by storing and loading * metadata to the RDB file. @@ -1858,7 +1895,66 @@ struct redisMemOverhead { * replication in order to make sure that chained slaves (slaves of slaves) * select the correct DB and are able to accept the stream coming from the * top-level master.
*/ -typedef struct rdbSaveInfo { +class rdbSaveInfo { +public: + rdbSaveInfo() { + repl_stream_db = -1; + repl_id_is_set = 0; + memcpy(repl_id, "0000000000000000000000000000000000000000", sizeof(repl_id)); + repl_offset = -1; + fForceSetKey = TRUE; + mvccMinThreshold = 0; + masters = nullptr; + masterCount = 0; + } + rdbSaveInfo(const rdbSaveInfo &other) { + repl_stream_db = other.repl_stream_db; + repl_id_is_set = other.repl_id_is_set; + memcpy(repl_id, other.repl_id, sizeof(repl_id)); + repl_offset = other.repl_offset; + fForceSetKey = other.fForceSetKey; + mvccMinThreshold = other.mvccMinThreshold; + masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount); + memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount); + masterCount = other.masterCount; + } + rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() { + swap(*this, other); + } + rdbSaveInfo &operator=(rdbSaveInfo other) { + swap(*this, other); + return *this; + } + ~rdbSaveInfo() { + free(masters); + } + friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) { + std::swap(first.repl_stream_db, second.repl_stream_db); + std::swap(first.repl_id_is_set, second.repl_id_is_set); + std::swap(first.repl_id, second.repl_id); + std::swap(first.repl_offset, second.repl_offset); + std::swap(first.fForceSetKey, second.fForceSetKey); + std::swap(first.mvccMinThreshold, second.mvccMinThreshold); + std::swap(first.masters, second.masters); + std::swap(first.masterCount, second.masterCount); + + } + + void addMaster(const struct redisMaster &mi) { + masterCount++; + if (masters == nullptr) { + masters = (struct redisMaster*)malloc(sizeof(struct redisMaster)); + } + else { + masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount); + } + memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster)); + } + + size_t numMasters() { + return masterCount; + } + /* Used saving and loading. 
*/ int repl_stream_db; /* DB to select in g_pserver->master client. */ @@ -1872,10 +1968,11 @@ typedef struct rdbSaveInfo { long long master_repl_offset; uint64_t mvccMinThreshold; - struct redisMaster *mi; -} rdbSaveInfo; + struct redisMaster *masters; -#define RDB_SAVE_INFO_INIT {-1,0,"0000000000000000000000000000000000000000",-1, TRUE, 0, 0, nullptr} +private: + size_t masterCount; +}; struct malloc_stats { size_t zmalloc_used; @@ -2081,42 +2178,6 @@ private: int rdb_key_save_delay = -1; // thread local cache }; -struct redisMaster { - char *masteruser; /* AUTH with this user and masterauth with master */ - char *masterauth; /* AUTH with this password with master */ - char *masterhost; /* Hostname of master */ - int masterport; /* Port of master */ - client *cached_master; /* Cached master to be reused for PSYNC. */ - client *master; - /* The following two fields is where we store master PSYNC replid/offset - * while the PSYNC is in progress. At the end we'll copy the fields into - * the server->master client structure. */ - char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ - long long master_initial_offset; /* Master PSYNC offset. */ - - bool isActive = false; - bool isRocksdbSnapshotRepl = false; - int repl_state; /* Replication status if the instance is a replica */ - off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ - off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ - off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. 
*/ - connection *repl_transfer_s; /* Slave -> Master SYNC socket */ - int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ - char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ - time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ - time_t repl_down_since; /* Unix time at which link with master went down */ - - class SnapshotPayloadParseState *parseState; - sds bulkreadBuffer = nullptr; - - unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ - /* After we've connected with our master use the UUID in g_pserver->master */ - uint64_t mvccLastSync; - /* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */ - std::map<int, std::vector<robj_sharedptr>> *staleKeyMap; - int ielReplTransfer = -1; -}; - // Const vars are not changed after worker threads are launched struct redisServerConst { pid_t pid; /* Main process pid. */ @@ -3101,6 +3162,7 @@ void clearReplicationId2(void); void mergeReplicationId(const char *); void chopReplicationBacklog(void); void replicationCacheMasterUsingMyself(struct redisMaster *mi); +void replicationCacheMasterUsingMaster(struct redisMaster *mi); void feedReplicationBacklog(const void *ptr, size_t len); void updateMasterAuth(); void showLatestBacklog(); From 09067046f6a377ac961f565b085114ceddf5de78 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 14 Apr 2022 21:05:10 +0000 Subject: [PATCH 21/35] placement new instead of memcpy --- src/rdb.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 702ed9097..567ea0388 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1665,7 +1665,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) rdbSaveInfo rsiT; if (rsi == nullptr) rsi = &rsiT; - memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo)); + args->rsi = *(new (args) rdbSaveInfo(*rsi)); memcpy(&args->rsi.repl_id, g_pserver->replid,
sizeof(g_pserver->replid)); args->rsi.master_repl_offset = g_pserver->master_repl_offset; @@ -3772,7 +3772,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { args->rdb_pipe_write = pipefds[1]; /* write end */ anetNonBlock(NULL, g_pserver->rdb_pipe_read); - memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo)); + args->rsi = *(new (args) rdbSaveInfo(*rsi)); memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); args->rsi.master_repl_offset = g_pserver->master_repl_offset; From 738f4d44bd452a3aedb52709c49b5b962e7f7cbf Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 14 Apr 2022 22:34:02 +0000 Subject: [PATCH 22/35] Remove asserts, RW lock can go below zero in cases of aeAcquireLock --- src/readwritelock.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/readwritelock.h b/src/readwritelock.h index 9efed8da4..a7318a29f 100644 --- a/src/readwritelock.h +++ b/src/readwritelock.h @@ -65,7 +65,6 @@ public: void releaseRead() { std::unique_lock rm(m_readLock); m_readCount--; - serverAssert(m_readCount >= 0); m_cv.notify_all(); } @@ -75,7 +74,6 @@ public: if (exclusive) m_writeLock.unlock(); m_writeCount--; - serverAssert(m_writeCount >= 0); m_cv.notify_all(); } From 4d053b1aa165fb9f38ba735c4037b9e406aae20a Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 14 Apr 2022 23:54:05 +0000 Subject: [PATCH 23/35] Inclusive language --- src/replication.cpp | 2 +- src/server.h | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 858e9d55d..e57cf184f 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3343,7 +3343,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid)); memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid)); - /* Disconnect all the sub-slaves: they need to be notified. */ + /* Disconnect all the replicas: they need to be notified. 
*/ disconnectSlaves(); } } diff --git a/src/server.h b/src/server.h index 900c42f94..74ac1372a 100644 --- a/src/server.h +++ b/src/server.h @@ -1870,9 +1870,9 @@ struct redisMaster { off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ - connection *repl_transfer_s; /* Slave -> Master SYNC socket */ - int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ - char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ + connection *repl_transfer_s; /* Replica -> Master SYNC socket */ + int repl_transfer_fd; /* Replica -> Master SYNC temp file descriptor */ + char *repl_transfer_tmpfile; /* Replica-> master SYNC temp file name */ time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ time_t repl_down_since; /* Unix time at which link with master went down */ From d7b4f1e492f8303b1ad998149216599d8683e733 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Wed, 20 Apr 2022 20:46:48 +0000 Subject: [PATCH 24/35] call aeThreadOnline() earlier --- src/AsyncWorkQueue.cpp | 4 ++-- src/db.cpp | 2 -- src/rdb.cpp | 2 +- src/replication.cpp | 2 -- 4 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index fddfa840b..8d04b742a 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -28,6 +28,7 @@ void AsyncWorkQueue::WorkerThreadMain() if (m_workqueue.empty()) m_cvWakeup.wait(lock); + aeThreadOnline(); while (!m_workqueue.empty()) { WorkItem task = std::move(m_workqueue.front()); @@ -43,14 +44,13 @@ void AsyncWorkQueue::WorkerThreadMain() lock.unlock(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); if (listLength(serverTL->clients_pending_asyncwrite)) { - aeThreadOnline(); aeAcquireLock(); ProcessPendingAsyncWrites(); aeReleaseLock(); - aeThreadOffline(); } 
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); serverTL->gcEpoch.reset(); + aeThreadOffline(); } listRelease(vars.clients_pending_asyncwrite); diff --git a/src/db.cpp b/src/db.cpp index 2b1321bcf..a115c887b 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1068,9 +1068,7 @@ void keysCommand(client *c) { blockClient(c, BLOCKED_ASYNC); redisDb *db = c->db; g_pserver->asyncworkqueue->AddWorkFunction([el, c, db, patternCopy, snapshot]{ - aeThreadOnline(); keysCommandCore(c, snapshot, patternCopy); - aeThreadOffline(); sdsfree(patternCopy); aePostFunction(el, [c, db, snapshot]{ aeReleaseLock(); // we need to lock with coordination of the client diff --git a/src/rdb.cpp b/src/rdb.cpp index 567ea0388..5d158eec5 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -3706,12 +3706,12 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) int retval; rio rdb; + aeThreadOnline(); serverAssert(serverTL == nullptr); redisServerThreadVars vars; serverTL = &vars; vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); - aeThreadOnline(); rioInitWithFd(&rdb,args->rdb_pipe_write); retval = rdbSaveRioWithEOFMark(&rdb,args->rgpdb,NULL,&args->rsi); diff --git a/src/replication.cpp b/src/replication.cpp index e57cf184f..1f20f225d 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1196,7 +1196,6 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { size_t cbData = 0; size_t cbLastUpdate = 0; auto &replBuf = *spreplBuf; - aeThreadOnline(); // Databases replBuf.addArrayLen(cserver.dbnum); for (int idb = 0; idb < cserver.dbnum; ++idb) { @@ -1244,7 +1243,6 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) { replBuf.putSlavesOnline(); aeReleaseLock(); } - aeThreadOffline(); }); return retval; From a0208b730116d8677766b3a3b526d305a12ba240 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 21 Apr 2022 01:18:12 +0000 Subject: [PATCH 25/35] Removed mergeReplicationId --- src/replication.cpp | 27 +-------------------------- src/server.h | 1 - 2 files changed, 1 
insertion(+), 27 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 1f20f225d..7ca2a647e 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2249,24 +2249,6 @@ void changeReplicationId(void) { saveMasterStatusToStorage(false); } - -int hexchToInt(char ch) -{ - if (ch >= '0' && ch <= '9') - return ch - '0'; - if (ch >= 'a' && ch <= 'f') - return (ch - 'a') + 10; - return (ch - 'A') + 10; -} -void mergeReplicationId(const char *id) -{ - for (int i = 0; i < CONFIG_RUN_ID_SIZE; ++i) - { - const char *charset = "0123456789abcdef"; - g_pserver->replid[i] = charset[hexchToInt(g_pserver->replid[i]) ^ hexchToInt(id[i])]; - } -} - /* Clear (invalidate) the secondary replication ID. This happens, for * example, after a full resynchronization, when we start a new replication * history. */ @@ -3008,9 +2990,6 @@ void readSyncBulkPayload(connection *conn) { return; } - // Should we update our database, or create from scratch? - int fUpdate = g_pserver->fActiveReplica || g_pserver->enable_multimaster; - /* Final setup of the connected slave <- master link */ replicationCreateMasterClient(mi,mi->repl_transfer_s,rsi.repl_stream_db); if (mi->isRocksdbSnapshotRepl) { @@ -3037,11 +3016,7 @@ void readSyncBulkPayload(connection *conn) { /* After a full resynchronization we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */ - if (fUpdate) - { - mergeReplicationId(mi->master->replid); - } - else if (!g_pserver->fActiveReplica) + if (!g_pserver->fActiveReplica) { /* After a full resynchroniziation we use the replication ID and * offset of the master. 
The secondary ID / offset are cleared since diff --git a/src/server.h b/src/server.h index 74ac1372a..e174d6902 100644 --- a/src/server.h +++ b/src/server.h @@ -3159,7 +3159,6 @@ long long getPsyncInitialOffset(void); int replicationSetupSlaveForFullResync(client *replica, long long offset); void changeReplicationId(void); void clearReplicationId2(void); -void mergeReplicationId(const char *); void chopReplicationBacklog(void); void replicationCacheMasterUsingMyself(struct redisMaster *mi); void replicationCacheMasterUsingMaster(struct redisMaster *mi); From 0a8cbc19b31397bfc3208820d82e492728a28a62 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 18 Apr 2022 23:05:03 +0000 Subject: [PATCH 26/35] Fix bug in dockerfile script --- docker-internal/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-internal/Dockerfile b/docker-internal/Dockerfile index 17703fbee..33993eea7 100644 --- a/docker-internal/Dockerfile +++ b/docker-internal/Dockerfile @@ -91,7 +91,7 @@ RUN \ sed -i 's/^\(logfile .*\)$/# \1/' /etc/keydb/keydb.conf; \ sed -i 's/protected-mode yes/protected-mode no/g' /etc/keydb/keydb.conf; \ sed -i 's/^\(bind .*\)$/# \1/' /etc/keydb/keydb.conf; \ - echo "loadmodule /usr/local/lib/modstatsd.so" >> /etc/keydb/keydb.conf; \ + echo -e "\nloadmodule /usr/local/lib/modstatsd.so" >> /etc/keydb/keydb.conf; \ ln -s keydb-cli redis-cli; \ cd /etc/keydb; \ ln -s keydb.conf redis.conf; \ From 33a4d78a903ec2020e8c63f9bef949918e35de34 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 18 Apr 2022 23:07:15 +0000 Subject: [PATCH 27/35] Make active client balancing a configurable option --- keydb.conf | 9 +++++++++ src/config.cpp | 1 + src/networking.cpp | 4 ++-- src/server.h | 1 + 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/keydb.conf b/keydb.conf index d543af03f..355271150 100644 --- a/keydb.conf +++ b/keydb.conf @@ -2063,3 +2063,12 @@ server-threads 2 # # By default KeyDB sets this to 2. 
replica-weighting-factor 2 + +# Should KeyDB make active attempts at balancing clients across threads? This can impact +# performance accepting new clients. By default this is enabled. If disabled there is still +# a best effort from the kernel to distribute across threads with SO_REUSEPORT but it will not +# be as fair. +# +# By default this is enabled +# +active-client-balancing yes \ No newline at end of file diff --git a/src/config.cpp b/src/config.cpp index efd7928bd..eaa1e2dd9 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2779,6 +2779,7 @@ standardConfig configs[] = { createBoolConfig("replica-announced", NULL, MODIFIABLE_CONFIG, g_pserver->replica_announced, 1, NULL, NULL), createBoolConfig("enable-async-commands", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_commands, 1, NULL, NULL), createBoolConfig("multithread-load-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->multithread_load_enabled, 0, NULL, NULL), + createBoolConfig("active-client-balancing", NULL, MODIFIABLE_CONFIG, g_pserver->active_client_balancing, 1, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), diff --git a/src/networking.cpp b/src/networking.cpp index 8b1fb7e36..078e03f82 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1319,7 +1319,7 @@ void acceptOnThread(connection *conn, int flags, char *cip) int ielCur = ielFromEventLoop(serverTL->el); bool fBootLoad = (g_pserver->loading == LOADING_BOOT); - int ielTarget = 0; + int ielTarget = ielCur; if (fBootLoad) { ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active @@ -1330,7 +1330,7 @@ void acceptOnThread(connection *conn, int flags, char *cip) while (cserver.cthreads > 1 && ielTarget == IDX_EVENT_LOOP_MAIN) ielTarget = rand() % cserver.cthreads; } - else + else if (g_pserver->active_client_balancing) { // Cluster connections are more transient, so its not worth the cost to balance // we can trust 
that SO_REUSEPORT is doing its job of distributing connections diff --git a/src/server.h b/src/server.h index e174d6902..0a7d7c1e6 100644 --- a/src/server.h +++ b/src/server.h @@ -2696,6 +2696,7 @@ struct redisServer { int enable_async_commands; int multithread_load_enabled = 0; + int active_client_balancing = 1; long long repl_batch_offStart = -1; long long repl_batch_idxStart = -1; From 68957b279a371a6bad0601b4f608f28647a93305 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 18 Apr 2022 23:07:36 +0000 Subject: [PATCH 28/35] With TLS throttle accepts if server is under heavy load - do not change non TLS behavior --- src/networking.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index 078e03f82..e8e929a20 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1405,6 +1405,8 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptOnThread(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients), 0, cip); + if (aeLockContention() >= 2) + break; } } From 3d2a25fa3322cd80b061321131d0707f9e7f1acb Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 21 Apr 2022 19:53:16 +0000 Subject: [PATCH 29/35] acceptTLS is threadsafe like the non TLS version --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index e3ed33beb..3cac24e97 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3772,7 +3772,7 @@ static void initNetworkingThread(int iel, int fReusePort) makeThreadKillable(); for (int j = 0; j < g_pserver->rgthreadvar[iel].tlsfd.count; j++) { - if (aeCreateFileEvent(g_pserver->rgthreadvar[iel].el, g_pserver->rgthreadvar[iel].tlsfd.fd[j], AE_READABLE, + if (aeCreateFileEvent(g_pserver->rgthreadvar[iel].el, g_pserver->rgthreadvar[iel].tlsfd.fd[j], AE_READABLE|AE_READ_THREADSAFE, acceptTLSHandler,NULL) == AE_ERR) { serverPanic( From c7108ac57e0be8c5a1252d42caf6bad4e165b81f Mon 
Sep 17 00:00:00 2001 From: John Sully Date: Fri, 22 Apr 2022 22:43:20 +0000 Subject: [PATCH 30/35] PSYNC production fixes --- src/rdb.cpp | 120 +++++++++++++++++++++++++------------------- src/replication.cpp | 4 +- src/server.cpp | 40 +++++---------- src/server.h | 95 +++++++++++++++++++++-------------- src/t_stream.cpp | 1 - src/tls.cpp | 7 +++ 6 files changed, 149 insertions(+), 118 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 5d158eec5..320ab918b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1218,37 +1218,15 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset) == -1) return -1; - if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) { + if (g_pserver->fActiveReplica) { sdsstring val = sdsstring(sdsempty()); - listNode *ln; - listIter li; - redisMaster* mi; - listRewind(g_pserver->masters,&li); - while ((ln = listNext(&li)) != NULL) { - mi = (redisMaster*)listNodeValue(ln); - if (!mi->master) { - // If master client is not available, use info from master struct - better than nothing - serverLog(LL_NOTICE, "saving master %s", mi->master_replid); - if (mi->master_replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = val.catfmt("%s:%I:%s:%i;", mi->master_replid, - mi->master_initial_offset, - mi->masterhost, - mi->masterport); - } - else { - serverLog(LL_NOTICE, "saving master %s", mi->master->replid); - if (mi->master->replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = val.catfmt("%s:%I:%s:%i;", mi->master->replid, - mi->master->reploff, - mi->masterhost, - mi->masterport); - } + + for (auto &msi : rsi->vecmastersaveinfo) { + val = val.catfmt("%s:%I:%s:%i:%i;", msi.master_replid, + msi.master_initial_offset, + msi.masterhost.get(), + msi.masterport, + msi.selected_db); } if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1; } 
@@ -1661,11 +1639,12 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) return rdbSaveBackgroundFork(rsi); } else { - rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); + rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zcalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); + // Placement new rdbSaveInfo rsiT; if (rsi == nullptr) rsi = &rsiT; - args->rsi = *(new (args) rdbSaveInfo(*rsi)); + args->rsi = *rsi; memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); args->rsi.master_repl_offset = g_pserver->master_repl_offset; @@ -2922,11 +2901,11 @@ public: * snapshot taken by the master may not be reflected on the replica. */ bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now; if (fStaleMvccKey || fExpiredKey) { - if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { + if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { // We have a key that we've already deleted and is not back in our database. 
// We'll need to inform the sending master of the delete if it is also a replica of us robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key))); - this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); + this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); } sdsfree(job.key); job.key = nullptr; @@ -3242,19 +3221,26 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) { if (rsi) { - struct redisMaster mi; + MasterSaveInfo msi; char *masters = szFromObj(auxval); - char *entry = strtok(masters, ":"); + char *saveptr; + char *entry = strtok_r(masters, ":", &saveptr); while (entry != NULL) { - memcpy(mi.master_replid, entry, sizeof(mi.master_replid)); - entry = strtok(NULL, ":"); - mi.master_initial_offset = atoi(entry); - entry = strtok(NULL, ":"); - mi.masterhost = entry; - entry = strtok(NULL, ";"); - mi.masterport = atoi(entry); - entry = strtok(NULL, ":"); - rsi->addMaster(mi); + memcpy(msi.master_replid, entry, sizeof(msi.master_replid)); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.master_initial_offset = atoll(entry); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.masterhost = sdsstring(sdsnew(entry)); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.masterport = atoi(entry); + entry = strtok_r(NULL, ";", &saveptr); + if (entry == nullptr) break; + msi.selected_db = atoi(entry); + entry = strtok_r(NULL, ":", &saveptr); + rsi->addMaster(msi); } } } else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) { @@ -3533,6 +3519,32 @@ eoferr: return C_ERR; } +void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi) { + if (rsi != nullptr && g_pserver->fActiveReplica) { + serverLog(LL_NOTICE, "RDB contains information on %d masters", (int)rsi->numMasters()); + listIter li; + listNode *ln; + + listRewind(g_pserver->masters, &li); + while ((ln = 
listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->master != nullptr) { + continue; //ignore connected masters + } + for (size_t i = 0; i < rsi->numMasters(); i++) { + if (!sdscmp(mi->masterhost, (sds)rsi->vecmastersaveinfo[i].masterhost.get()) && mi->masterport == rsi->vecmastersaveinfo[i].masterport) { + memcpy(mi->master_replid, rsi->vecmastersaveinfo[i].master_replid, sizeof(mi->master_replid)); + mi->master_initial_offset = rsi->vecmastersaveinfo[i].master_initial_offset; + replicationCacheMasterUsingMaster(mi); + serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); + break; + } + } + } + } +} + int rdbLoad(rdbSaveInfo *rsi, int rdbflags) { int err = C_ERR; @@ -3913,11 +3925,22 @@ void bgsaveCommand(client *c) { * information. */ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { rdbSaveInfo rsi_init; - *rsi = std::move(rsi_init); + *rsi = rsi_init; memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid)); rsi->master_repl_offset = g_pserver->master_repl_offset; + if (g_pserver->fActiveReplica) { + listIter li; + listNode *ln = nullptr; + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + MasterSaveInfo msi(*mi); + rsi->addMaster(msi); + } + } + /* If the instance is a master, we can populate the replication info * only when repl_backlog is not NULL. If the repl_backlog is NULL, * it means that the instance isn't in any replication chains. In this @@ -3935,11 +3958,6 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { return rsi; } - if (listLength(g_pserver->masters) > 1) - { - // BUGBUG, warn user about this incomplete implementation - serverLog(LL_WARNING, "Warning: Only backing up first master's information in RDB"); - } struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? 
listNodeValue(listFirst(g_pserver->masters)) : NULL); /* If the instance is a replica we need a connected master diff --git a/src/replication.cpp b/src/replication.cpp index 7ca2a647e..fd713e55b 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2856,6 +2856,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, * gets promoted. */ return false; } + if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi); /* RDB loading succeeded if we reach this point. */ if (g_pserver->repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -2934,7 +2935,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, mi->staleKeyMap->clear(); else mi->staleKeyMap = new (MALLOC_LOCAL) std::map>(); - rsi.addMaster(*mi); + rsi.mi = mi; } if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, @@ -2951,6 +2952,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, it'll be restarted when sync succeeds or replica promoted. */ return false; } + if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi); /* Cleanup. 
*/ if (g_pserver->rdb_del_sync_files && allPersistenceDisabled()) { diff --git a/src/server.cpp b/src/server.cpp index 3cac24e97..373b20337 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1141,7 +1141,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"rreplay",replicaReplayCommand,-3, - "read-only fast noprop", + "read-only fast noprop ok-stale", 0,NULL,0,0,0,0,0,0}, {"keydb.cron",cronCommand,-5, @@ -4938,7 +4938,8 @@ int processCommand(client *c, int callFlags) { if (FBrokenLinkToMaster() && g_pserver->repl_serve_stale_data == 0 && is_denystale_command && - !(g_pserver->fActiveReplica && c->cmd->proc == syncCommand)) + !(g_pserver->fActiveReplica && c->cmd->proc == syncCommand) + && !FInReplicaReplay()) { rejectCommand(c, shared.masterdownerr); return C_OK; @@ -6962,31 +6963,15 @@ void loadDataFromDisk(void) { g_pserver->master_repl_offset = rsi.repl_offset; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - listIter li; - listNode *ln; - - listRewind(g_pserver->masters, &li); - while ((ln = listNext(&li))) - { - redisMaster *mi = (redisMaster*)listNodeValue(ln); - if (g_pserver->fActiveReplica) { - for (size_t i = 0; i < rsi.numMasters(); i++) { - if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) { - memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid)); - mi->master_initial_offset = rsi.masters[i].master_initial_offset; - replicationCacheMasterUsingMaster(mi); - serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); - } - } - } - else { - /* If we are a replica, create a cached master from this - * information, in order to allow partial resynchronizations - * with masters. 
*/ - replicationCacheMasterUsingMyself(mi); - selectDb(mi->cached_master,rsi.repl_stream_db); - } - } + } + updateActiveReplicaMastersFromRsi(&rsi); + if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + selectDb(mi->cached_master,rsi.repl_stream_db); } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); @@ -7293,6 +7278,7 @@ void *workerThreadMain(void *parg) serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); + tlsCleanupThread(); return NULL; } diff --git a/src/server.h b/src/server.h index 0a7d7c1e6..f96482612 100644 --- a/src/server.h +++ b/src/server.h @@ -1887,6 +1887,40 @@ struct redisMaster { int ielReplTransfer = -1; }; +struct MasterSaveInfo { + MasterSaveInfo() = default; + MasterSaveInfo(const redisMaster &mi) { + memcpy(master_replid, mi.master_replid, sizeof(mi.master_replid)); + if (mi.master) { + master_initial_offset = mi.master->reploff; + selected_db = mi.master->db->id; + } else if (mi.cached_master) { + master_initial_offset = mi.cached_master->reploff; + selected_db = mi.cached_master->db->id; + } else { + master_initial_offset = -1; + selected_db = 0; + } + masterport = mi.masterport; + masterhost = sdsstring(sdsdup(mi.masterhost)); + masterport = mi.masterport; + } + + MasterSaveInfo &operator=(const MasterSaveInfo &other) { + masterhost = other.masterhost; + masterport = other.masterport; + memcpy(master_replid, other.master_replid, sizeof(master_replid)); + master_initial_offset = other.master_initial_offset; + return *this; + } + + sdsstring masterhost; + int masterport; + char master_replid[CONFIG_RUN_ID_SIZE+1]; + long long master_initial_offset; + int selected_db; +}; + /* This structure can 
be optionally passed to RDB save/load functions in * order to implement additional functionalities, by storing and loading * metadata to the RDB file. @@ -1904,8 +1938,6 @@ public: repl_offset = -1; fForceSetKey = TRUE; mvccMinThreshold = 0; - masters = nullptr; - masterCount = 0; } rdbSaveInfo(const rdbSaveInfo &other) { repl_stream_db = other.repl_stream_db; @@ -1914,45 +1946,31 @@ public: repl_offset = other.repl_offset; fForceSetKey = other.fForceSetKey; mvccMinThreshold = other.mvccMinThreshold; - masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount); - memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount); - masterCount = other.masterCount; + vecmastersaveinfo = other.vecmastersaveinfo; + master_repl_offset = other.master_repl_offset; + mi = other.mi; } - rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() { - swap(*this, other); - } - rdbSaveInfo &operator=(rdbSaveInfo other) { - swap(*this, other); + + rdbSaveInfo &operator=(const rdbSaveInfo &other) { + repl_stream_db = other.repl_stream_db; + repl_id_is_set = other.repl_id_is_set; + memcpy(repl_id, other.repl_id, sizeof(repl_id)); + repl_offset = other.repl_offset; + fForceSetKey = other.fForceSetKey; + mvccMinThreshold = other.mvccMinThreshold; + vecmastersaveinfo = other.vecmastersaveinfo; + master_repl_offset = other.master_repl_offset; + mi = other.mi; + return *this; } - ~rdbSaveInfo() { - free(masters); - } - friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) { - std::swap(first.repl_stream_db, second.repl_stream_db); - std::swap(first.repl_id_is_set, second.repl_id_is_set); - std::swap(first.repl_id, second.repl_id); - std::swap(first.repl_offset, second.repl_offset); - std::swap(first.fForceSetKey, second.fForceSetKey); - std::swap(first.mvccMinThreshold, second.mvccMinThreshold); - std::swap(first.masters, second.masters); - std::swap(first.masterCount, second.masterCount); - } - - void addMaster(const struct redisMaster &mi) { - 
masterCount++; - if (masters == nullptr) { - masters = (struct redisMaster*)malloc(sizeof(struct redisMaster)); - } - else { - masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount); - } - memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster)); + void addMaster(const MasterSaveInfo &si) { + vecmastersaveinfo.push_back(si); } size_t numMasters() { - return masterCount; + return vecmastersaveinfo.size(); } /* Used saving and loading. */ @@ -1968,10 +1986,8 @@ public: long long master_repl_offset; uint64_t mvccMinThreshold; - struct redisMaster *masters; - -private: - size_t masterCount; + std::vector vecmastersaveinfo; + struct redisMaster *mi = nullptr; }; struct malloc_stats { @@ -3853,6 +3869,8 @@ void lfenceCommand(client *c); int FBrokenLinkToMaster(int *pconnectMasters = nullptr); int FActiveMaster(client *c); struct redisMaster *MasterInfoFromClient(client *c); +bool FInReplicaReplay(); +void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi); /* MVCC */ uint64_t getMvccTstamp(); @@ -3950,6 +3968,7 @@ void makeThreadKillable(void); /* TLS stuff */ void tlsInit(void); void tlsInitThread(); +void tlsCleanupThread(); void tlsCleanup(void); int tlsConfigure(redisTLSContextConfig *ctx_config); void tlsReload(void); diff --git a/src/t_stream.cpp b/src/t_stream.cpp index 010e9b65b..b005cf600 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -56,7 +56,6 @@ void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); -bool FInReplicaReplay(); int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); diff --git a/src/tls.cpp b/src/tls.cpp index 9b3a0415c..68651bfbb 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -180,6 +180,12 @@ void tlsInitThread(void) 
pending_list = listCreate(); } +void tlsCleanupThread(void) +{ + if (pending_list) + listRelease(pending_list); +} + void tlsCleanup(void) { if (redis_tls_ctx) { SSL_CTX_free(redis_tls_ctx); @@ -1260,6 +1266,7 @@ int tlsProcessPendingData() { } void tlsInitThread() {} +void tlsCleanupThread(void) {} sds connTLSGetPeerCert(connection *conn_) { (void) conn_; From 20c34a91da474c05d4e7aca50f7571bc6482d0c5 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 28 Apr 2022 21:30:40 +0000 Subject: [PATCH 31/35] Converted some existing PSYNC tests for multimaster --- tests/integration/psync2-reg-multimaster.tcl | 102 ++++++++++++ .../replication-psync-multimaster.tcl | 153 ++++++++++++++++++ 2 files changed, 255 insertions(+) create mode 100644 tests/integration/psync2-reg-multimaster.tcl create mode 100644 tests/integration/replication-psync-multimaster.tcl diff --git a/tests/integration/psync2-reg-multimaster.tcl b/tests/integration/psync2-reg-multimaster.tcl new file mode 100644 index 000000000..1f15d75bb --- /dev/null +++ b/tests/integration/psync2-reg-multimaster.tcl @@ -0,0 +1,102 @@ +# Issue 3899 regression test. +# We create a chain of three instances: master -> slave -> slave2 +# and continuously break the link while traffic is generated by +# keydb-benchmark. At the end we check that the data is the same +# everywhere. 
+ +start_server {tags {"psync2"} overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } { +start_server {overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } { +start_server {overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } { + # Config + set debug_msg 0 ; # Enable additional debug messages + + set no_exit 0 ; # Do not exit at end of the test + + set duration 20 ; # Total test seconds + + for {set j 0} {$j < 3} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + set R_unixsocket($j) [srv [expr 0-$j] unixsocket] + if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"} + } + + # Setup the replication and backlog parameters + test "PSYNC2 #3899 regression: setup" { + $R(0) slaveof $R_host(1) $R_port(1) + $R(0) slaveof $R_host(2) $R_port(2) + $R(1) slaveof $R_host(0) $R_port(0) + $R(1) slaveof $R_host(2) $R_port(2) + $R(2) slaveof $R_host(0) $R_port(0) + $R(2) slaveof $R_host(1) $R_port(1) + + $R(0) set foo bar + wait_for_condition 50 1000 { + [status $R(1) master_link_status] == "up" && + [status $R(2) master_link_status] == "up" && + [$R(1) dbsize] == 1 && + [$R(2) dbsize] == 1 + } else { + fail "Replicas not replicating from master" + } + + $R(0) config set repl-backlog-size 200mb + $R(1) config set repl-backlog-size 200mb + $R(2) config set repl-backlog-size 200mb + + } + + set cycle_start_time [clock milliseconds] + set bench_pid [exec src/keydb-benchmark -s $R_unixsocket(0) -n 10000000 -r 1000 incr __rand_int__ > /dev/null &] + while 1 { + set elapsed [expr {[clock milliseconds]-$cycle_start_time}] + if {$elapsed > $duration*1000} break + if {rand() < .05} { + test "PSYNC2 #3899 regression: kill first replica" { + $R(1) client kill type master + } + } + if {rand() < .05} { + test "PSYNC2 #3899 regression: kill 
chained replica" { + $R(2) client kill type master + } + } + after 100 + } + exec kill -9 $bench_pid + + if {$debug_msg} { + for {set j 0} {$j < 100} {incr j} { + if { + [$R(0) debug digest] == [$R(1) debug digest] && + [$R(1) debug digest] == [$R(2) debug digest] + } break + puts [$R(0) debug digest] + puts [$R(1) debug digest] + puts [$R(2) debug digest] + after 1000 + } + } + + test "PSYNC2 #3899 regression: verify consistency" { + wait_for_condition 50 1000 { + ([$R(0) debug digest] eq [$R(1) debug digest]) && + ([$R(1) debug digest] eq [$R(2) debug digest]) + } else { + set csv3 [csvdump {r -2}] + set csv2 [csvdump {r -1}] + set csv1 [csvdump r] + set fd [open /tmp/repldump1.txt w] + puts -nonewline $fd $csv1 + close $fd + set fd [open /tmp/repldump2.txt w] + puts -nonewline $fd $csv2 + close $fd + set fd [open /tmp/repldump3.txt w] + puts -nonewline $fd $csv3 + close $fd + fail [format "The three instances have different data sets:\n\tnode 1: %s\n\tnode 2: %s\n\tnode 3: %s\nRun diff -u against /tmp/repldump*.txt for more info" [$R(0) debug digest] [$R(1) debug digest] [$R(2) debug digest] ] + } + } +}}} diff --git a/tests/integration/replication-psync-multimaster.tcl b/tests/integration/replication-psync-multimaster.tcl new file mode 100644 index 000000000..2665adf9e --- /dev/null +++ b/tests/integration/replication-psync-multimaster.tcl @@ -0,0 +1,153 @@ +# Creates a master-replica pair and breaks the link continuously to force +# partial resyncs attempts, all this while flooding the master with +# write queries. +# +# You can specify backlog size, ttl, delay before reconnection, test duration +# in seconds, and an additional condition to verify at the end. +# +# If reconnect is > 0, the test actually try to break the connection and +# reconnect with the master, otherwise just the initial synchronization is +# checked for consistency. 
+proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { + start_server [list tags [list "repl"] overrides [list active-replica yes client-output-buffer-limit [list replica $backlog_size $backlog_size 9999999] ] ] { + start_server [list overrides [list client-output-buffer-limit [list replica $backlog_size $backlog_size 9999999] active-replica yes ] ] { + + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + + $master config set repl-backlog-size $backlog_size + $master config set repl-backlog-ttl $backlog_ttl + $master config set repl-diskless-sync $mdl + $master config set repl-diskless-sync-delay 1 + $replica config set repl-diskless-load $sdl + + test {Replica should be able to synchronize with the master} { + $replica replicaof $master_host $master_port + } + + after 1000 + + test {Master should be able to synchronize with the replica} { + $master replicaof $replica_host $replica_port + } + + set load_handle0 [start_climbing_load $master_host $master_port 9 100000] + set load_handle1 [start_climbing_load $master_host $master_port 11 100000] + set load_handle2 [start_climbing_load $master_host $master_port 12 100000] + + # Check that the background clients are actually writing. + test {Detect write load to master} { + wait_for_condition 50 1000 { + [$master dbsize] > 100 + } else { + fail "Can't detect write load from background clients." + } + } + + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { + # Now while the clients are writing data, break the maste-replica + # link multiple times. 
+ if ($reconnect) { + for {set j 0} {$j < $duration*10} {incr j} { + after 100 + # catch {puts "MASTER [$master dbsize] keys, REPLICA [$replica dbsize] keys"} + + if {($j % 20) == 0} { + catch { + if {$delay} { + $replica multi + $replica client kill $master_host:$master_port + $replica debug sleep $delay + $replica exec + } else { + $replica client kill $master_host:$master_port + } + } + } + } + } + stop_bg_complex_data $load_handle0 + stop_bg_complex_data $load_handle1 + stop_bg_complex_data $load_handle2 + + # Wait for the replica to reach the "online" + # state from the POV of the master. + set retry 5000 + while {$retry} { + set info [$master info] + if {[string match {*slave0:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:replica not correctly synchronized" + } + + # Wait that replica acknowledge it is online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. (-LOADING error) + wait_for_condition 5000 100 { + [lindex [$replica role] 3] eq {connected} + } else { + fail "replica still not connected after some time" + } + + wait_for_condition 100 100 { + [$master debug digest] == [$replica debug digest] + } else { + set csv1 [csvdump r] + set csv2 [csvdump {r -1}] + set fd [open /tmp/repldump1.txt w] + puts -nonewline $fd $csv1 + close $fd + set fd [open /tmp/repldump2.txt w] + puts -nonewline $fd $csv2 + close $fd + fail "Master - Replica inconsistency, Run diff -u against /tmp/repldump*.txt for more info" + } + assert {[$master dbsize] > 0} + eval $cond + } + } + } +} + + +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl 0 + + test_psync {ok psync} 6 100000000 3600 0 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {no backlog} 6 100 3600 0.5 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + + test_psync {ok after delay} 3 100000000 
3600 3 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {backlog expired} 3 100000000 1 3 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + } +} + +foreach mdl {no} { + foreach sdl {swapdb} { + test_psync {backlog expired} 3 100000000 1 3 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + } +} + From 1995023a0c1b00485307b5a70713e5c066dfc58d Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 28 Apr 2022 21:34:10 +0000 Subject: [PATCH 32/35] Inclusive language fix --- tests/integration/psync2-reg-multimaster.tcl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/psync2-reg-multimaster.tcl b/tests/integration/psync2-reg-multimaster.tcl index 1f15d75bb..2e9b1187b 100644 --- a/tests/integration/psync2-reg-multimaster.tcl +++ b/tests/integration/psync2-reg-multimaster.tcl @@ -1,5 +1,5 @@ # Issue 3899 regression test. -# We create a chain of three instances: master -> slave -> slave2 +# We create a chain of three instances: master -> replica -> replica2 # and continuously break the link while traffic is generated by # keydb-benchmark. At the end we check that the data is the same # everywhere. 
@@ -24,12 +24,12 @@ start_server {overrides {active-replica {yes} multi-master {yes} client-output-b # Setup the replication and backlog parameters test "PSYNC2 #3899 regression: setup" { - $R(0) slaveof $R_host(1) $R_port(1) - $R(0) slaveof $R_host(2) $R_port(2) - $R(1) slaveof $R_host(0) $R_port(0) - $R(1) slaveof $R_host(2) $R_port(2) - $R(2) slaveof $R_host(0) $R_port(0) - $R(2) slaveof $R_host(1) $R_port(1) + $R(0) replicaof $R_host(1) $R_port(1) + $R(0) replicaof $R_host(2) $R_port(2) + $R(1) replicaof $R_host(0) $R_port(0) + $R(1) replicaof $R_host(2) $R_port(2) + $R(2) replicaof $R_host(0) $R_port(0) + $R(2) replicaof $R_host(1) $R_port(1) $R(0) set foo bar wait_for_condition 50 1000 { From d0386cadc614f395401339497ed3166f5a847303 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 28 Apr 2022 21:36:01 +0000 Subject: [PATCH 33/35] Cleanup test suite --- tests/integration/replication-psync-multimaster.tcl | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/integration/replication-psync-multimaster.tcl b/tests/integration/replication-psync-multimaster.tcl index 2665adf9e..65f718763 100644 --- a/tests/integration/replication-psync-multimaster.tcl +++ b/tests/integration/replication-psync-multimaster.tcl @@ -142,12 +142,3 @@ foreach mdl {no yes} { } $mdl $sdl 1 } } - -foreach mdl {no} { - foreach sdl {swapdb} { - test_psync {backlog expired} 3 100000000 1 3 { - assert {[s -1 sync_partial_err] > 0} - } $mdl $sdl 1 - } -} - From 5162693e1bb694c10c1657c85fd00483e688ced9 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 28 Apr 2022 21:58:35 +0000 Subject: [PATCH 34/35] Updated test replica configs so tests make sense --- tests/integration/replication-psync-multimaster.tcl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/integration/replication-psync-multimaster.tcl b/tests/integration/replication-psync-multimaster.tcl index 65f718763..4d52f13a1 100644 --- a/tests/integration/replication-psync-multimaster.tcl +++ 
b/tests/integration/replication-psync-multimaster.tcl @@ -24,6 +24,12 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco $master config set repl-diskless-sync $mdl $master config set repl-diskless-sync-delay 1 $replica config set repl-diskless-load $sdl + + $replica config set repl-backlog-size $backlog_size + $replica config set repl-backlog-ttl $backlog_ttl + $replica config set repl-diskless-sync $mdl + $replica config set repl-diskless-sync-delay 1 + $master config set repl-diskless-load $sdl test {Replica should be able to synchronize with the master} { $replica replicaof $master_host $master_port From d521bfc6027b40e20bec90f723d420ff1055779c Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 3 May 2022 00:02:22 +0000 Subject: [PATCH 35/35] active-rep test reliability --- tests/integration/replication-active.tcl | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 5adf97647..957e04919 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -241,10 +241,18 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { $slave replicaof $master_host $master_port after 1000 $master replicaof $slave_host $slave_port - after 1000 - assert_equal {bar} [$slave get testkey] {replica is correct} - assert_equal {bar} [$master get testkey] {master is correct} + wait_for_condition 50 100 { + [string match bar [$slave get testkey]] + } else { + fail "Replica is not correct" + } + + wait_for_condition 50 100 { + [string match bar [$master get testkey]] + } else { + fail "Master is not correct" + } } test {Active replica merge works with client blocked} {