From 20abf506ae31994493e5a1cdf5ba65f6b1d1656a Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 18 Apr 2022 23:05:03 +0000 Subject: [PATCH 1/9] Fix bug in dockerfile script --- docker-internal/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-internal/Dockerfile b/docker-internal/Dockerfile index 17703fbee..33993eea7 100644 --- a/docker-internal/Dockerfile +++ b/docker-internal/Dockerfile @@ -91,7 +91,7 @@ RUN \ sed -i 's/^\(logfile .*\)$/# \1/' /etc/keydb/keydb.conf; \ sed -i 's/protected-mode yes/protected-mode no/g' /etc/keydb/keydb.conf; \ sed -i 's/^\(bind .*\)$/# \1/' /etc/keydb/keydb.conf; \ - echo "loadmodule /usr/local/lib/modstatsd.so" >> /etc/keydb/keydb.conf; \ + echo -e "\nloadmodule /usr/local/lib/modstatsd.so" >> /etc/keydb/keydb.conf; \ ln -s keydb-cli redis-cli; \ cd /etc/keydb; \ ln -s keydb.conf redis.conf; \ From 63e78ab7f3bf801fdc74e069b581bc8c82a8283c Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 18 Apr 2022 23:07:15 +0000 Subject: [PATCH 2/9] Make active client balancing a configurable option --- keydb.conf | 9 +++++++++ src/config.cpp | 1 + src/networking.cpp | 4 ++-- src/server.h | 1 + 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/keydb.conf b/keydb.conf index 04f66f766..55594ef56 100644 --- a/keydb.conf +++ b/keydb.conf @@ -2070,3 +2070,12 @@ server-threads 2 # # By default KeyDB sets this to 2. replica-weighting-factor 2 + +# Should KeyDB make active attempts at balancing clients across threads? This can impact +# performance accepting new clients. By default this is enabled. If disabled there is still +# a best effort from the kernel to distribute across threads with SO_REUSEPORT but it will not +# be as fair. 
+# +# By default this is enabled +# +active-client-balancing yes \ No newline at end of file diff --git a/src/config.cpp b/src/config.cpp index efd7928bd..eaa1e2dd9 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2779,6 +2779,7 @@ standardConfig configs[] = { createBoolConfig("replica-announced", NULL, MODIFIABLE_CONFIG, g_pserver->replica_announced, 1, NULL, NULL), createBoolConfig("enable-async-commands", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_commands, 1, NULL, NULL), createBoolConfig("multithread-load-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->multithread_load_enabled, 0, NULL, NULL), + createBoolConfig("active-client-balancing", NULL, MODIFIABLE_CONFIG, g_pserver->active_client_balancing, 1, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), diff --git a/src/networking.cpp b/src/networking.cpp index a7920b692..4346c7479 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1319,7 +1319,7 @@ void acceptOnThread(connection *conn, int flags, char *cip) int ielCur = ielFromEventLoop(serverTL->el); bool fBootLoad = (g_pserver->loading == LOADING_BOOT); - int ielTarget = 0; + int ielTarget = ielCur; if (fBootLoad) { ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active @@ -1330,7 +1330,7 @@ void acceptOnThread(connection *conn, int flags, char *cip) while (cserver.cthreads > 1 && ielTarget == IDX_EVENT_LOOP_MAIN) ielTarget = rand() % cserver.cthreads; } - else + else if (g_pserver->active_client_balancing) { // Cluster connections are more transient, so its not worth the cost to balance // we can trust that SO_REUSEPORT is doing its job of distributing connections diff --git a/src/server.h b/src/server.h index 974661ad8..305177651 100644 --- a/src/server.h +++ b/src/server.h @@ -2635,6 +2635,7 @@ struct redisServer { int enable_async_commands; int multithread_load_enabled = 0; + int active_client_balancing = 1; long 
long repl_batch_offStart = -1; long long repl_batch_idxStart = -1; From 05dc5a470ed5c59d9bbc228607c989d62204e553 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 18 Apr 2022 23:07:36 +0000 Subject: [PATCH 3/9] With TLS throttle accepts if server is under heavy load - do not change non TLS behavior --- src/networking.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index 4346c7479..0f529125e 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1405,6 +1405,8 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptOnThread(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients), 0, cip); + if (aeLockContention() >= 2) + break; } } From f9b0cb0d553e8f5aad1d7231df0f8d8507ebd205 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 21 Apr 2022 19:53:16 +0000 Subject: [PATCH 4/9] acceptTLS is threadsafe like the non TLS version --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index 99cbec3f0..eeea18c9d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3773,7 +3773,7 @@ static void initNetworkingThread(int iel, int fReusePort) makeThreadKillable(); for (int j = 0; j < g_pserver->rgthreadvar[iel].tlsfd.count; j++) { - if (aeCreateFileEvent(g_pserver->rgthreadvar[iel].el, g_pserver->rgthreadvar[iel].tlsfd.fd[j], AE_READABLE, + if (aeCreateFileEvent(g_pserver->rgthreadvar[iel].el, g_pserver->rgthreadvar[iel].tlsfd.fd[j], AE_READABLE|AE_READ_THREADSAFE, acceptTLSHandler,NULL) == AE_ERR) { serverPanic( From 8a7ace0a346811a4b65013685fc310912a396a95 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 22 Apr 2022 22:43:20 +0000 Subject: [PATCH 5/9] PSYNC production fixes --- src/rdb.cpp | 120 +++++++++++++++++++++++++------------------- src/replication.cpp | 4 +- src/server.cpp | 40 +++++---------- src/server.h | 95 +++++++++++++++++++++-------------- src/t_stream.cpp | 1 - 
src/tls.cpp | 7 +++ 6 files changed, 149 insertions(+), 118 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 5d158eec5..320ab918b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1218,37 +1218,15 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset) == -1) return -1; - if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) { + if (g_pserver->fActiveReplica) { sdsstring val = sdsstring(sdsempty()); - listNode *ln; - listIter li; - redisMaster* mi; - listRewind(g_pserver->masters,&li); - while ((ln = listNext(&li)) != NULL) { - mi = (redisMaster*)listNodeValue(ln); - if (!mi->master) { - // If master client is not available, use info from master struct - better than nothing - serverLog(LL_NOTICE, "saving master %s", mi->master_replid); - if (mi->master_replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = val.catfmt("%s:%I:%s:%i;", mi->master_replid, - mi->master_initial_offset, - mi->masterhost, - mi->masterport); - } - else { - serverLog(LL_NOTICE, "saving master %s", mi->master->replid); - if (mi->master->replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = val.catfmt("%s:%I:%s:%i;", mi->master->replid, - mi->master->reploff, - mi->masterhost, - mi->masterport); - } + + for (auto &msi : rsi->vecmastersaveinfo) { + val = val.catfmt("%s:%I:%s:%i:%i;", msi.master_replid, + msi.master_initial_offset, + msi.masterhost.get(), + msi.masterport, + msi.selected_db); } if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1; } @@ -1661,11 +1639,12 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) return rdbSaveBackgroundFork(rsi); } else { - rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); + rdbSaveThreadArgs *args = 
(rdbSaveThreadArgs*)zcalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); + // Placement new rdbSaveInfo rsiT; if (rsi == nullptr) rsi = &rsiT; - args->rsi = *(new (args) rdbSaveInfo(*rsi)); + args->rsi = *rsi; memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); args->rsi.master_repl_offset = g_pserver->master_repl_offset; @@ -2922,11 +2901,11 @@ public: * snapshot taken by the master may not be reflected on the replica. */ bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now; if (fStaleMvccKey || fExpiredKey) { - if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { + if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { // We have a key that we've already deleted and is not back in our database. 
// We'll need to inform the sending master of the delete if it is also a replica of us robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key))); - this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); + this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); } sdsfree(job.key); job.key = nullptr; @@ -3242,19 +3221,26 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) { if (rsi) { - struct redisMaster mi; + MasterSaveInfo msi; char *masters = szFromObj(auxval); - char *entry = strtok(masters, ":"); + char *saveptr; + char *entry = strtok_r(masters, ":", &saveptr); while (entry != NULL) { - memcpy(mi.master_replid, entry, sizeof(mi.master_replid)); - entry = strtok(NULL, ":"); - mi.master_initial_offset = atoi(entry); - entry = strtok(NULL, ":"); - mi.masterhost = entry; - entry = strtok(NULL, ";"); - mi.masterport = atoi(entry); - entry = strtok(NULL, ":"); - rsi->addMaster(mi); + memcpy(msi.master_replid, entry, sizeof(msi.master_replid)); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.master_initial_offset = atoll(entry); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.masterhost = sdsstring(sdsnew(entry)); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.masterport = atoi(entry); + entry = strtok_r(NULL, ";", &saveptr); + if (entry == nullptr) break; + msi.selected_db = atoi(entry); + entry = strtok_r(NULL, ":", &saveptr); + rsi->addMaster(msi); } } } else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) { @@ -3533,6 +3519,32 @@ eoferr: return C_ERR; } +void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi) { + if (rsi != nullptr && g_pserver->fActiveReplica) { + serverLog(LL_NOTICE, "RDB contains information on %d masters", (int)rsi->numMasters()); + listIter li; + listNode *ln; + + listRewind(g_pserver->masters, &li); + while ((ln = 
listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->master != nullptr) { + continue; //ignore connected masters + } + for (size_t i = 0; i < rsi->numMasters(); i++) { + if (!sdscmp(mi->masterhost, (sds)rsi->vecmastersaveinfo[i].masterhost.get()) && mi->masterport == rsi->vecmastersaveinfo[i].masterport) { + memcpy(mi->master_replid, rsi->vecmastersaveinfo[i].master_replid, sizeof(mi->master_replid)); + mi->master_initial_offset = rsi->vecmastersaveinfo[i].master_initial_offset; + replicationCacheMasterUsingMaster(mi); + serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); + break; + } + } + } + } +} + int rdbLoad(rdbSaveInfo *rsi, int rdbflags) { int err = C_ERR; @@ -3913,11 +3925,22 @@ void bgsaveCommand(client *c) { * information. */ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { rdbSaveInfo rsi_init; - *rsi = std::move(rsi_init); + *rsi = rsi_init; memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid)); rsi->master_repl_offset = g_pserver->master_repl_offset; + if (g_pserver->fActiveReplica) { + listIter li; + listNode *ln = nullptr; + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + MasterSaveInfo msi(*mi); + rsi->addMaster(msi); + } + } + /* If the instance is a master, we can populate the replication info * only when repl_backlog is not NULL. If the repl_backlog is NULL, * it means that the instance isn't in any replication chains. In this @@ -3935,11 +3958,6 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { return rsi; } - if (listLength(g_pserver->masters) > 1) - { - // BUGBUG, warn user about this incomplete implementation - serverLog(LL_WARNING, "Warning: Only backing up first master's information in RDB"); - } struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? 
listNodeValue(listFirst(g_pserver->masters)) : NULL); /* If the instance is a replica we need a connected master diff --git a/src/replication.cpp b/src/replication.cpp index 7ca2a647e..fd713e55b 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2856,6 +2856,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, * gets promoted. */ return false; } + if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi); /* RDB loading succeeded if we reach this point. */ if (g_pserver->repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -2934,7 +2935,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, mi->staleKeyMap->clear(); else mi->staleKeyMap = new (MALLOC_LOCAL) std::map>(); - rsi.addMaster(*mi); + rsi.mi = mi; } if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, @@ -2951,6 +2952,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, it'll be restarted when sync succeeds or replica promoted. */ return false; } + if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi); /* Cleanup. 
*/ if (g_pserver->rdb_del_sync_files && allPersistenceDisabled()) { diff --git a/src/server.cpp b/src/server.cpp index 3cac24e97..373b20337 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1141,7 +1141,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"rreplay",replicaReplayCommand,-3, - "read-only fast noprop", + "read-only fast noprop ok-stale", 0,NULL,0,0,0,0,0,0}, {"keydb.cron",cronCommand,-5, @@ -4938,7 +4938,8 @@ int processCommand(client *c, int callFlags) { if (FBrokenLinkToMaster() && g_pserver->repl_serve_stale_data == 0 && is_denystale_command && - !(g_pserver->fActiveReplica && c->cmd->proc == syncCommand)) + !(g_pserver->fActiveReplica && c->cmd->proc == syncCommand) + && !FInReplicaReplay()) { rejectCommand(c, shared.masterdownerr); return C_OK; @@ -6962,31 +6963,15 @@ void loadDataFromDisk(void) { g_pserver->master_repl_offset = rsi.repl_offset; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - listIter li; - listNode *ln; - - listRewind(g_pserver->masters, &li); - while ((ln = listNext(&li))) - { - redisMaster *mi = (redisMaster*)listNodeValue(ln); - if (g_pserver->fActiveReplica) { - for (size_t i = 0; i < rsi.numMasters(); i++) { - if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) { - memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid)); - mi->master_initial_offset = rsi.masters[i].master_initial_offset; - replicationCacheMasterUsingMaster(mi); - serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); - } - } - } - else { - /* If we are a replica, create a cached master from this - * information, in order to allow partial resynchronizations - * with masters. 
*/ - replicationCacheMasterUsingMyself(mi); - selectDb(mi->cached_master,rsi.repl_stream_db); - } - } + } + updateActiveReplicaMastersFromRsi(&rsi); + if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + selectDb(mi->cached_master,rsi.repl_stream_db); } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); @@ -7293,6 +7278,7 @@ void *workerThreadMain(void *parg) serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); + tlsCleanupThread(); return NULL; } diff --git a/src/server.h b/src/server.h index 0a7d7c1e6..f96482612 100644 --- a/src/server.h +++ b/src/server.h @@ -1887,6 +1887,40 @@ struct redisMaster { int ielReplTransfer = -1; }; +struct MasterSaveInfo { + MasterSaveInfo() = default; + MasterSaveInfo(const redisMaster &mi) { + memcpy(master_replid, mi.master_replid, sizeof(mi.master_replid)); + if (mi.master) { + master_initial_offset = mi.master->reploff; + selected_db = mi.master->db->id; + } else if (mi.cached_master) { + master_initial_offset = mi.cached_master->reploff; + selected_db = mi.cached_master->db->id; + } else { + master_initial_offset = -1; + selected_db = 0; + } + masterport = mi.masterport; + masterhost = sdsstring(sdsdup(mi.masterhost)); + masterport = mi.masterport; + } + + MasterSaveInfo &operator=(const MasterSaveInfo &other) { + masterhost = other.masterhost; + masterport = other.masterport; + memcpy(master_replid, other.master_replid, sizeof(master_replid)); + master_initial_offset = other.master_initial_offset; + return *this; + } + + sdsstring masterhost; + int masterport; + char master_replid[CONFIG_RUN_ID_SIZE+1]; + long long master_initial_offset; + int selected_db; +}; + /* This structure can 
be optionally passed to RDB save/load functions in * order to implement additional functionalities, by storing and loading * metadata to the RDB file. @@ -1904,8 +1938,6 @@ public: repl_offset = -1; fForceSetKey = TRUE; mvccMinThreshold = 0; - masters = nullptr; - masterCount = 0; } rdbSaveInfo(const rdbSaveInfo &other) { repl_stream_db = other.repl_stream_db; @@ -1914,45 +1946,31 @@ public: repl_offset = other.repl_offset; fForceSetKey = other.fForceSetKey; mvccMinThreshold = other.mvccMinThreshold; - masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount); - memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount); - masterCount = other.masterCount; + vecmastersaveinfo = other.vecmastersaveinfo; + master_repl_offset = other.master_repl_offset; + mi = other.mi; } - rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() { - swap(*this, other); - } - rdbSaveInfo &operator=(rdbSaveInfo other) { - swap(*this, other); + + rdbSaveInfo &operator=(const rdbSaveInfo &other) { + repl_stream_db = other.repl_stream_db; + repl_id_is_set = other.repl_id_is_set; + memcpy(repl_id, other.repl_id, sizeof(repl_id)); + repl_offset = other.repl_offset; + fForceSetKey = other.fForceSetKey; + mvccMinThreshold = other.mvccMinThreshold; + vecmastersaveinfo = other.vecmastersaveinfo; + master_repl_offset = other.master_repl_offset; + mi = other.mi; + return *this; } - ~rdbSaveInfo() { - free(masters); - } - friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) { - std::swap(first.repl_stream_db, second.repl_stream_db); - std::swap(first.repl_id_is_set, second.repl_id_is_set); - std::swap(first.repl_id, second.repl_id); - std::swap(first.repl_offset, second.repl_offset); - std::swap(first.fForceSetKey, second.fForceSetKey); - std::swap(first.mvccMinThreshold, second.mvccMinThreshold); - std::swap(first.masters, second.masters); - std::swap(first.masterCount, second.masterCount); - } - - void addMaster(const struct redisMaster &mi) { - 
masterCount++; - if (masters == nullptr) { - masters = (struct redisMaster*)malloc(sizeof(struct redisMaster)); - } - else { - masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount); - } - memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster)); + void addMaster(const MasterSaveInfo &si) { + vecmastersaveinfo.push_back(si); } size_t numMasters() { - return masterCount; + return vecmastersaveinfo.size(); } /* Used saving and loading. */ @@ -1968,10 +1986,8 @@ public: long long master_repl_offset; uint64_t mvccMinThreshold; - struct redisMaster *masters; - -private: - size_t masterCount; + std::vector vecmastersaveinfo; + struct redisMaster *mi = nullptr; }; struct malloc_stats { @@ -3853,6 +3869,8 @@ void lfenceCommand(client *c); int FBrokenLinkToMaster(int *pconnectMasters = nullptr); int FActiveMaster(client *c); struct redisMaster *MasterInfoFromClient(client *c); +bool FInReplicaReplay(); +void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi); /* MVCC */ uint64_t getMvccTstamp(); @@ -3950,6 +3968,7 @@ void makeThreadKillable(void); /* TLS stuff */ void tlsInit(void); void tlsInitThread(); +void tlsCleanupThread(); void tlsCleanup(void); int tlsConfigure(redisTLSContextConfig *ctx_config); void tlsReload(void); diff --git a/src/t_stream.cpp b/src/t_stream.cpp index 010e9b65b..b005cf600 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -56,7 +56,6 @@ void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); -bool FInReplicaReplay(); int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); diff --git a/src/tls.cpp b/src/tls.cpp index 9b3a0415c..68651bfbb 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -180,6 +180,12 @@ void tlsInitThread(void) 
pending_list = listCreate(); } +void tlsCleanupThread(void) +{ + if (pending_list) + listRelease(pending_list); +} + void tlsCleanup(void) { if (redis_tls_ctx) { SSL_CTX_free(redis_tls_ctx); @@ -1260,6 +1266,7 @@ int tlsProcessPendingData() { } void tlsInitThread() {} +void tlsCleanupThread(void) {} sds connTLSGetPeerCert(connection *conn_) { (void) conn_; From 20c34a91da474c05d4e7aca50f7571bc6482d0c5 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 28 Apr 2022 21:30:40 +0000 Subject: [PATCH 6/9] Converted some existing PSYNC tests for multimaster --- tests/integration/psync2-reg-multimaster.tcl | 102 ++++++++++++ .../replication-psync-multimaster.tcl | 153 ++++++++++++++++++ 2 files changed, 255 insertions(+) create mode 100644 tests/integration/psync2-reg-multimaster.tcl create mode 100644 tests/integration/replication-psync-multimaster.tcl diff --git a/tests/integration/psync2-reg-multimaster.tcl b/tests/integration/psync2-reg-multimaster.tcl new file mode 100644 index 000000000..1f15d75bb --- /dev/null +++ b/tests/integration/psync2-reg-multimaster.tcl @@ -0,0 +1,102 @@ +# Issue 3899 regression test. +# We create a chain of three instances: master -> slave -> slave2 +# and continuously break the link while traffic is generated by +# keydb-benchmark. At the end we check that the data is the same +# everywhere. 
+ +start_server {tags {"psync2"} overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } { +start_server {overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } { +start_server {overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } { + # Config + set debug_msg 0 ; # Enable additional debug messages + + set no_exit 0 ; # Do not exit at end of the test + + set duration 20 ; # Total test seconds + + for {set j 0} {$j < 3} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + set R_unixsocket($j) [srv [expr 0-$j] unixsocket] + if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"} + } + + # Setup the replication and backlog parameters + test "PSYNC2 #3899 regression: setup" { + $R(0) slaveof $R_host(1) $R_port(1) + $R(0) slaveof $R_host(2) $R_port(2) + $R(1) slaveof $R_host(0) $R_port(0) + $R(1) slaveof $R_host(2) $R_port(2) + $R(2) slaveof $R_host(0) $R_port(0) + $R(2) slaveof $R_host(1) $R_port(1) + + $R(0) set foo bar + wait_for_condition 50 1000 { + [status $R(1) master_link_status] == "up" && + [status $R(2) master_link_status] == "up" && + [$R(1) dbsize] == 1 && + [$R(2) dbsize] == 1 + } else { + fail "Replicas not replicating from master" + } + + $R(0) config set repl-backlog-size 200mb + $R(1) config set repl-backlog-size 200mb + $R(2) config set repl-backlog-size 200mb + + } + + set cycle_start_time [clock milliseconds] + set bench_pid [exec src/keydb-benchmark -s $R_unixsocket(0) -n 10000000 -r 1000 incr __rand_int__ > /dev/null &] + while 1 { + set elapsed [expr {[clock milliseconds]-$cycle_start_time}] + if {$elapsed > $duration*1000} break + if {rand() < .05} { + test "PSYNC2 #3899 regression: kill first replica" { + $R(1) client kill type master + } + } + if {rand() < .05} { + test "PSYNC2 #3899 regression: kill 
chained replica" { + $R(2) client kill type master + } + } + after 100 + } + exec kill -9 $bench_pid + + if {$debug_msg} { + for {set j 0} {$j < 100} {incr j} { + if { + [$R(0) debug digest] == [$R(1) debug digest] && + [$R(1) debug digest] == [$R(2) debug digest] + } break + puts [$R(0) debug digest] + puts [$R(1) debug digest] + puts [$R(2) debug digest] + after 1000 + } + } + + test "PSYNC2 #3899 regression: verify consistency" { + wait_for_condition 50 1000 { + ([$R(0) debug digest] eq [$R(1) debug digest]) && + ([$R(1) debug digest] eq [$R(2) debug digest]) + } else { + set csv3 [csvdump {r -2}] + set csv2 [csvdump {r -1}] + set csv1 [csvdump r] + set fd [open /tmp/repldump1.txt w] + puts -nonewline $fd $csv1 + close $fd + set fd [open /tmp/repldump2.txt w] + puts -nonewline $fd $csv2 + close $fd + set fd [open /tmp/repldump3.txt w] + puts -nonewline $fd $csv3 + close $fd + fail [format "The three instances have different data sets:\n\tnode 1: %s\n\tnode 2: %s\n\tnode 3: %s\nRun diff -u against /tmp/repldump*.txt for more info" [$R(0) debug digest] [$R(1) debug digest] [$R(2) debug digest] ] + } + } +}}} diff --git a/tests/integration/replication-psync-multimaster.tcl b/tests/integration/replication-psync-multimaster.tcl new file mode 100644 index 000000000..2665adf9e --- /dev/null +++ b/tests/integration/replication-psync-multimaster.tcl @@ -0,0 +1,153 @@ +# Creates a master-replica pair and breaks the link continuously to force +# partial resync attempts, all this while flooding the master with +# write queries. +# +# You can specify backlog size, ttl, delay before reconnection, test duration +# in seconds, and an additional condition to verify at the end. +# +# If reconnect is > 0, the test actually tries to break the connection and +# reconnect with the master, otherwise just the initial synchronization is +# checked for consistency. 
+proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { + start_server [list tags [list "repl"] overrides [list active-replica yes client-output-buffer-limit [list replica $backlog_size $backlog_size 9999999] ] ] { + start_server [list overrides [list client-output-buffer-limit [list replica $backlog_size $backlog_size 9999999] active-replica yes ] ] { + + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + + $master config set repl-backlog-size $backlog_size + $master config set repl-backlog-ttl $backlog_ttl + $master config set repl-diskless-sync $mdl + $master config set repl-diskless-sync-delay 1 + $replica config set repl-diskless-load $sdl + + test {Replica should be able to synchronize with the master} { + $replica replicaof $master_host $master_port + } + + after 1000 + + test {Master should be able to synchronize with the replica} { + $master replicaof $replica_host $replica_port + } + + set load_handle0 [start_climbing_load $master_host $master_port 9 100000] + set load_handle1 [start_climbing_load $master_host $master_port 11 100000] + set load_handle2 [start_climbing_load $master_host $master_port 12 100000] + + # Check that the background clients are actually writing. + test {Detect write load to master} { + wait_for_condition 50 1000 { + [$master dbsize] > 100 + } else { + fail "Can't detect write load from background clients." + } + } + + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { + # Now while the clients are writing data, break the master-replica + # link multiple times. 
+ if ($reconnect) { + for {set j 0} {$j < $duration*10} {incr j} { + after 100 + # catch {puts "MASTER [$master dbsize] keys, REPLICA [$replica dbsize] keys"} + + if {($j % 20) == 0} { + catch { + if {$delay} { + $replica multi + $replica client kill $master_host:$master_port + $replica debug sleep $delay + $replica exec + } else { + $replica client kill $master_host:$master_port + } + } + } + } + } + stop_bg_complex_data $load_handle0 + stop_bg_complex_data $load_handle1 + stop_bg_complex_data $load_handle2 + + # Wait for the replica to reach the "online" + # state from the POV of the master. + set retry 5000 + while {$retry} { + set info [$master info] + if {[string match {*slave0:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:replica not correctly synchronized" + } + + # Wait until the replica acknowledges it is online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. (-LOADING error) + wait_for_condition 5000 100 { + [lindex [$replica role] 3] eq {connected} + } else { + fail "replica still not connected after some time" + } + + wait_for_condition 100 100 { + [$master debug digest] == [$replica debug digest] + } else { + set csv1 [csvdump r] + set csv2 [csvdump {r -1}] + set fd [open /tmp/repldump1.txt w] + puts -nonewline $fd $csv1 + close $fd + set fd [open /tmp/repldump2.txt w] + puts -nonewline $fd $csv2 + close $fd + fail "Master - Replica inconsistency, Run diff -u against /tmp/repldump*.txt for more info" + } + assert {[$master dbsize] > 0} + eval $cond + } + } + } +} + + +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl 0 + + test_psync {ok psync} 6 100000000 3600 0 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {no backlog} 6 100 3600 0.5 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + + test_psync {ok after delay} 3 100000000 
3600 3 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {backlog expired} 3 100000000 1 3 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + } +} + +foreach mdl {no} { + foreach sdl {swapdb} { + test_psync {backlog expired} 3 100000000 1 3 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + } +} + From 1995023a0c1b00485307b5a70713e5c066dfc58d Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 28 Apr 2022 21:34:10 +0000 Subject: [PATCH 7/9] Inclusive language fix --- tests/integration/psync2-reg-multimaster.tcl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/psync2-reg-multimaster.tcl b/tests/integration/psync2-reg-multimaster.tcl index 1f15d75bb..2e9b1187b 100644 --- a/tests/integration/psync2-reg-multimaster.tcl +++ b/tests/integration/psync2-reg-multimaster.tcl @@ -1,5 +1,5 @@ # Issue 3899 regression test. -# We create a chain of three instances: master -> slave -> slave2 +# We create a chain of three instances: master -> replica -> replica2 # and continuously break the link while traffic is generated by # keydb-benchmark. At the end we check that the data is the same # everywhere. 
@@ -24,12 +24,12 @@ start_server {overrides {active-replica {yes} multi-master {yes} client-output-b # Setup the replication and backlog parameters test "PSYNC2 #3899 regression: setup" { - $R(0) slaveof $R_host(1) $R_port(1) - $R(0) slaveof $R_host(2) $R_port(2) - $R(1) slaveof $R_host(0) $R_port(0) - $R(1) slaveof $R_host(2) $R_port(2) - $R(2) slaveof $R_host(0) $R_port(0) - $R(2) slaveof $R_host(1) $R_port(1) + $R(0) replicaof $R_host(1) $R_port(1) + $R(0) replicaof $R_host(2) $R_port(2) + $R(1) replicaof $R_host(0) $R_port(0) + $R(1) replicaof $R_host(2) $R_port(2) + $R(2) replicaof $R_host(0) $R_port(0) + $R(2) replicaof $R_host(1) $R_port(1) $R(0) set foo bar wait_for_condition 50 1000 { From d0386cadc614f395401339497ed3166f5a847303 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 28 Apr 2022 21:36:01 +0000 Subject: [PATCH 8/9] Cleanup test suite --- tests/integration/replication-psync-multimaster.tcl | 9 --------- 1 file changed, 9 deletions(-) diff --git a/tests/integration/replication-psync-multimaster.tcl b/tests/integration/replication-psync-multimaster.tcl index 2665adf9e..65f718763 100644 --- a/tests/integration/replication-psync-multimaster.tcl +++ b/tests/integration/replication-psync-multimaster.tcl @@ -142,12 +142,3 @@ foreach mdl {no yes} { } $mdl $sdl 1 } } - -foreach mdl {no} { - foreach sdl {swapdb} { - test_psync {backlog expired} 3 100000000 1 3 { - assert {[s -1 sync_partial_err] > 0} - } $mdl $sdl 1 - } -} - From 5162693e1bb694c10c1657c85fd00483e688ced9 Mon Sep 17 00:00:00 2001 From: Vivek Saini Date: Thu, 28 Apr 2022 21:58:35 +0000 Subject: [PATCH 9/9] Updated test replica configs so tests make sense --- tests/integration/replication-psync-multimaster.tcl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/integration/replication-psync-multimaster.tcl b/tests/integration/replication-psync-multimaster.tcl index 65f718763..4d52f13a1 100644 --- a/tests/integration/replication-psync-multimaster.tcl +++ 
b/tests/integration/replication-psync-multimaster.tcl @@ -24,6 +24,12 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco $master config set repl-diskless-sync $mdl $master config set repl-diskless-sync-delay 1 $replica config set repl-diskless-load $sdl + + $replica config set repl-backlog-size $backlog_size + $replica config set repl-backlog-ttl $backlog_ttl + $replica config set repl-diskless-sync $mdl + $replica config set repl-diskless-sync-delay 1 + $master config set repl-diskless-load $sdl test {Replica should be able to synchronize with the master} { $replica replicaof $master_host $master_port