From 25ef65463e2c08191009c656883d6602d46ae342 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 00:25:03 -0500 Subject: [PATCH 1/8] Ensure multi-master works for ring topologies Former-commit-id: a7cc3aac28ccec4dadb80aa2cc7279c53982bc28 --- src/multi.cpp | 8 ++- src/replication.cpp | 77 ++++++++++++++++++++---- src/server.h | 2 + tests/integration/replication-active.tcl | 18 ++++++ tests/test_helper.tcl | 1 + tests/unit/rreplay.tcl | 4 +- 6 files changed, 94 insertions(+), 16 deletions(-) diff --git a/src/multi.cpp b/src/multi.cpp index 2018e08d9..c9485b0ae 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -28,6 +28,7 @@ */ #include "server.h" +bool FInReplicaReplay(); /* ================================ MULTI/EXEC ============================== */ @@ -172,12 +173,15 @@ void execCommand(client *c) { * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { + if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) { execCommandPropagateMulti(c); must_propagate = 1; } - call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL); + int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL; + if (FInReplicaReplay()) + flags &= ~CMD_CALL_PROPAGATE; + call(c,flags); /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; diff --git a/src/replication.cpp b/src/replication.cpp index b8236420d..e42872cd4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -42,6 +42,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, int newfd); @@ -353,6 +354,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that char szMvcc[128]; + incrementMvccTstamp(); uint64_t mvccTstamp = getMvccTstamp(); int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); @@ -432,6 +434,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); addReplyProtoAsync(replica, reply->buf(), reply->used); } + if (!fSendRaw) { addReplyAsync(replica,shared.crlf); @@ -2420,6 +2423,8 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); + if (mi->clientFake) + freeClient(mi->clientFake); delete mi->staleKeyMap; zfree(mi); } @@ -2477,6 +2482,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; + if (mi->clientFake) { + freeClient(mi->clientFake); + mi->clientFake = nullptr; + + } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -3344,14 +3354,33 @@ public: return m_cnesting == 1; } + redisMaster *getMi(client *c) + { + if (m_mi == nullptr) + m_mi = MasterInfoFromClient(c); + return m_mi; + } + + int nesting() const { return m_cnesting; } + private: int m_cnesting = 0; bool m_fCancelled = false; + redisMaster *m_mi = nullptr; }; +static thread_local ReplicaNestState *s_pstate = nullptr; + +bool FInReplicaReplay() +{ + return s_pstate != nullptr && s_pstate->nesting() > 0; +} + + +static std::unordered_map g_mapmvcc; + void replicaReplayCommand(client *c) { - static thread_local ReplicaNestState *s_pstate = nullptr; if (s_pstate == nullptr) s_pstate = new (MALLOC_LOCAL) ReplicaNestState; @@ -3375,9 +3404,10 @@ void replicaReplayCommand(client *c) return; } - unsigned char uuid[UUID_BINARY_LEN]; + std::string uuid; + uuid.resize(UUID_BINARY_LEN); if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36 - || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) + || uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0) { addReplyError(c, "Expected UUID arg1"); s_pstate->Cancel(); @@ -3413,7 +3443,7 @@ void replicaReplayCommand(client *c) } } - if (FSameUuidNoNil(uuid, cserver.uuid)) + if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid)) { addReply(c, shared.ok); s_pstate->Cancel(); @@ -3423,33 +3453,56 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; + redisMaster *mi = s_pstate->getMi(c); + client *cFake = mi->clientFake; + if (mi->clientFakeNesting != s_pstate->nesting()) + cFake = nullptr; + serverAssert(mi != nullptr); + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr)) + { + s_pstate->Cancel(); + s_pstate->Pop(); + return; + } + // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - client *cFake = createClient(-1, c->iel); + if (cFake == nullptr) + cFake = createClient(-1, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); auto ccmdPrev = serverTL->commandsExecuted; + cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP; processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP); bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); - if (fExec) + if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - redisMaster *mi = MasterInfoFromClient(c); - if (mi != nullptr) // this should never be null but I'd prefer not to crash - { - mi->mvccLastSync = mvcc; - } + g_mapmvcc[uuid] = mvcc; } else { + serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); addReplyError(c, "command did not execute"); } - freeClient(cFake); + serverAssert(sdslen(cFake->querybuf) == 0); + if (cFake->flags & CLIENT_MULTI) + { + mi->clientFake = cFake; + mi->clientFakeNesting = s_pstate->nesting(); + } + else + { + if (mi->clientFake == cFake) + mi->clientFake = nullptr; + freeClient(cFake); + } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.h b/src/server.h index 781f043f0..22d983b0f 100644 --- a/src/server.h +++ b/src/server.h @@ -1535,6 +1535,8 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; + client *clientFake; + int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */ diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 4f37f2adf..0c28eb85d 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { } } + test {Active replicas propogate transaction} { + $master set testkey 0 + $master multi + $master incr testkey + $master incr testkey + after 5000 + $master get testkey + $master exec + assert_equal 2 [$master get testkey] + after 500 + wait_for_condition 50 500 { + [string match "2" [$slave get testkey]] + } else { + fail "Transaction failed to replicate" + } + $master flushall + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 0711d4cc2..376004041 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -45,6 +45,7 @@ set ::all_tests { integration/replication-4 integration/replication-psync integration/replication-active + integration/replication-multimaster integration/aof integration/rdb integration/convert-zipmap-hash-on-load diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2fd1d3714..e11030f95 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master flagonly + r debug force-master yes r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { reconnect test {RREPLAY db different} { - r debug force-master flagonly + r debug force-master yes r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2 From eac3cffe416623af74a4d7b88fc39c5092d111a6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 00:29:26 -0500 Subject: [PATCH 2/8] CLANG build fix Former-commit-id: dc78bf1ccbd3dfd2de582d2a0d0be3223de3c7c3 --- src/replication.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/replication.cpp b/src/replication.cpp index e42872cd4..fc632a6cd 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -43,6 +43,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, int newfd); From d346ad77347901d519f87514b817fdbe1db3e895 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 18:15:29 -0500 Subject: [PATCH 3/8] Add missing test file Former-commit-id: 0c101dccc825668cb7ff07c23e82db0f5642b786 --- tests/integration/replication-multimaster.tcl | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 tests/integration/replication-multimaster.tcl diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl new file mode 100644 index 000000000..e5e77fdad --- /dev/null +++ b/tests/integration/replication-multimaster.tcl @@ -0,0 +1,74 @@ +foreach topology {mesh ring} { +start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { + + for {set j 0} {$j < 4} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + } + + # Initialize as mesh + if [string equal $topology "mesh"] { + for {set j 0} {$j < 4} {incr j} { + for {set k 0} {$k < 4} {incr k} { + if $j!=$k { + $R($j) replicaof $R_host($k) $R_port($k) + after 100 + } + } + }} + #Else Ring + if [string equal $topology "ring"] { + $R(0) replicaof $R_host(3) $R_port(3) + after 100 + $R(1) replicaof $R_host(0) $R_port(0) + after 100 + $R(2) replicaof $R_host(1) $R_port(1) + after 100 + $R(3) replicaof $R_host(2) $R_port(2) + } + + after 2000 + + test "$topology replicates to all nodes" { + $R(0) set testkey foo + after 500 + assert_equal foo [$R(1) get testkey] "replicates to 1" + assert_equal foo [$R(2) get testkey] "replicates to 2" + } + + test "$topology replicates only once" { + $R(0) set testkey 1 + after 500 + $R(1) incr testkey + after 500 + $R(2) incr testkey + after 500 + assert_equal 3 [$R(0) get testkey] + assert_equal 3 [$R(1) get testkey] + assert_equal 3 [$R(2) get testkey] + assert_equal 3 [$R(3) get testkey] + } + + test "$topology transaction replicates only once" { + for {set j 0} {$j < 1000} {incr j} { + $R(0) set testkey 1 + $R(0) multi + $R(0) incr testkey + $R(0) incr testkey + $R(0) exec + after 1 + assert_equal 3 [$R(0) get testkey] "node 0" + assert_equal 3 [$R(1) get testkey] "node 1" + assert_equal 3 [$R(2) get testkey] "node 2" + assert_equal 3 [$R(3) get testkey] "node 3" + } + } +} +} +} +} +} From 68235881e99a41b53cd2f0cc365edc9d0476d1ce Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 30 Jan 2020 17:55:48 -0500 Subject: [PATCH 4/8] Fix memory leak in cron Former-commit-id: f1748f8c7611ad96d7ba4fed66439cd1f043e6f3 --- src/cron.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cron.cpp b/src/cron.cpp index fd777d43d..230cf4ed4 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -67,6 +67,7 @@ void cronCommand(client *c) robj *o = createObject(OBJ_CRON, spjob.release()); setKey(c->db, c->argv[ARG_NAME], o); + decrRefCount(o); // use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it. setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval); addReply(c, shared.ok); From 30ece138d5e4faa414d98f585791538eb3c93455 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 19:52:57 -0500 Subject: [PATCH 5/8] Fix issue #119 Former-commit-id: 46224721237616c345f6726b721a354d7bda71df --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index c6f360905..2d6027348 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1690,7 +1690,7 @@ void clientsCron(int iel) { /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was * terminated. */ - if (clientsCronHandleTimeout(c,now)) goto LContinue; + if (clientsCronHandleTimeout(c,now)) continue; // Client free'd so don't release the lock if (clientsCronResizeQueryBuffer(c)) goto LContinue; if (clientsCronTrackExpansiveClients(c)) goto LContinue; LContinue: From e4d74b993f928dce069996e32a90f4b98e17a4ca Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 00:59:07 -0500 Subject: [PATCH 6/8] Fix cases where duplicate RREPLAY is applied Former-commit-id: c3317686f8b8d94a3b2295def899ae30e208f327 --- src/replication.cpp | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index fc632a6cd..9b8570c49 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1259,19 +1259,19 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { // Because the client could have been closed while the lambda waited to run we need to - // verify the replica is still connected + // verify the replica is still connected listIter li; - listNode *ln; - listRewind(g_pserver->slaves,&li); - bool fFound = false; - while ((ln = listNext(&li))) { - if (listNodeValue(ln) == replica) { - fFound = true; - break; - } - } - if (!fFound) - return; + listNode *ln; + listRewind(g_pserver->slaves,&li); + bool fFound = false; + while ((ln = listNext(&li))) { + if (listNodeValue(ln) == replica) { + fFound = true; + break; + } + } + if (!fFound) + return; aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE); if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) { freeClient(replica); @@ -2329,10 +2329,11 @@ int connectWithMaster(redisMaster *mi) { void undoConnectWithMaster(redisMaster *mi) { int fd = mi->repl_transfer_s; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{ aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE); close(fd); }); + serverAssert(res == AE_OK); mi->repl_transfer_s = -1; } @@ -3459,7 +3460,7 @@ void replicaReplayCommand(client *c) if (mi->clientFakeNesting != s_pstate->nesting()) cFake = nullptr; serverAssert(mi != nullptr); - if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr)) + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc) { s_pstate->Cancel(); s_pstate->Pop(); @@ -3485,7 +3486,8 @@ void replicaReplayCommand(client *c) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - g_mapmvcc[uuid] = mvcc; + if (mvcc > g_mapmvcc[uuid]) + g_mapmvcc[uuid] = mvcc; } else { From fef9925b7f90e622cfded283d8cd637c68def82a Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 01:00:21 -0500 Subject: [PATCH 7/8] Fix higher latency at low load by grouping clients to threads. This fixes slow perf in cluster benchmarks mentioned in issue #102 Former-commit-id: 1a4c3224c9848f02fbdb49674045b593cfc41d31 --- src/ae.cpp | 5 +++- src/aof.cpp | 9 ++++--- src/cluster.cpp | 12 +++++++++- src/config.cpp | 5 ++++ src/networking.cpp | 58 ++++++++++++++++++++++++++++++++++++++++++---- src/server.h | 1 + 6 files changed, 81 insertions(+), 9 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 90c148510..b92cd4a67 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -273,7 +273,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) cmd.proc = proc; cmd.clientData = arg; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); - AE_ASSERT(size == sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; return AE_OK; } @@ -296,6 +297,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch } auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; AE_ASSERT(size == sizeof(cmd)); int ret = AE_OK; if (fSynchronous) diff --git a/src/aof.cpp b/src/aof.cpp index b82be9a34..65647bbba 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -167,11 +167,12 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { * not one already. */ if (!g_pserver->aof_rewrite_pending) { g_pserver->aof_rewrite_pending = true; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { g_pserver->aof_rewrite_pending = false; if (g_pserver->aof_pipe_write_data_to_child >= 0) aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); }); + serverAssert(res == AE_OK); // we can't handle an error here } } @@ -1563,16 +1564,18 @@ error: void aofClosePipes(void) { int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; - aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); close (fdAofAckPipe); }); + serverAssert(res == AE_OK); int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); close(fdAofWritePipe); }); + serverAssert(res == AE_OK); g_pserver->aof_pipe_write_data_to_child = -1; close(g_pserver->aof_pipe_read_data_from_parent); diff --git a/src/cluster.cpp b/src/cluster.cpp index f6a6e03dc..c20b0f4c4 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -295,6 +295,15 @@ int clusterLoadConfig(char *filename) { if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) { g_pserver->cluster->currentEpoch = clusterGetMaxEpoch(); } + + if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100) + { + // Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server) + // we can increase the grouping of clients on a single thread within reason + cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes); + cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200); + serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold); + } return C_OK; fmterr: @@ -623,9 +632,10 @@ void freeClusterLink(clusterLink *link) { if (link->node) link->node->link = NULL; link->node = nullptr; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ freeClusterLink(link); }); + serverAssert(res == AE_OK); return; } if (link->fd != -1) { diff --git a/src/config.cpp b/src/config.cpp index c056b98dc..2aaad825e 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -805,6 +805,11 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"enable-pro")) { cserver.fUsePro = true; break; + } else if (!strcasecmp(argv[0],"min-clients-per-thread") && argc == 2) { + cserver.thread_min_client_threshold = atoi(argv[1]); + if (cserver.thread_min_client_threshold < 0 || cserver.thread_min_client_threshold > 400) { + err = "min-thread-client must be between 0 and 400"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } diff --git a/src/networking.cpp b/src/networking.cpp index 97744c410..097df0f87 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,6 +1003,33 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +int chooseBestThreadForAccept(int ielCur) +{ + listIter li; + listNode *ln; + int rgcclients[MAX_EVENT_LOOPS] = {0}; + + listRewind(g_pserver->clients, &li); + while ((ln = listNext(&li)) != nullptr) + { + client *c = (client*)listNodeValue(ln); + if (c->iel < 0) + continue; + + rgcclients[c->iel]++; + } + + int ielMinLoad = 0; + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + if (rgcclients[iel] < cserver.thread_min_client_threshold) + return iel; + if (rgcclients[iel] < rgcclients[ielMinLoad]) + ielMinLoad = iel; + } + return ielMinLoad; +} + #define MAX_ACCEPTS_PER_CALL 1000 static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { client *c; @@ -1105,7 +1132,22 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { - // We always accept on the same thread + { + int ielTarget = chooseBestThreadForAccept(ielCur); + if (ielTarget != ielCur) + { + char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ + acceptCommonHandler(cfd,0,szT, ielTarget); + zfree(szT); + }); + + if (res == AE_OK) + continue; + } + } + LLocalThread: aeAcquireLock(); acceptCommonHandler(cfd,0,cip, ielCur); @@ -1122,10 +1164,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { goto LLocalThread; char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); - aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ + int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ acceptCommonHandler(cfd,0,szT, iel); zfree(szT); }); + if (res != AE_OK) + { + zfree(szT); + goto LLocalThread; + } } } } @@ -1151,13 +1198,16 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int ielTarget = rand() % cserver.cthreads; if (ielTarget == ielCur) { + LLocalThread: acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur); } else { - aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielTarget); }); + if (res != AE_OK) + goto LLocalThread; } aeReleaseLock(); @@ -2529,7 +2579,7 @@ NULL { int iel = client->iel; freeClientAsync(client); - aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { + aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK freeClientsInAsyncFreeQueue(iel); }); } diff --git a/src/server.h b/src/server.h index 22d983b0f..3ff023677 100644 --- a/src/server.h +++ b/src/server.h @@ -1604,6 +1604,7 @@ struct redisServerConst { unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */ bool fUsePro = false; + int thread_min_client_threshold = 50; }; struct redisServer { From d4c1e981247cb8ac2c33d64dcef9937ee422f451 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 01:41:00 -0500 Subject: [PATCH 8/8] Implement an error handler so bug #125 can't happen Former-commit-id: 16a019dba053fd0654116ff98a2ad0b66a9ed4e6 --- src/aof.cpp | 28 +++++++++++++++++++--------- src/networking.cpp | 4 ++-- src/server.cpp | 1 + src/server.h | 1 + 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 65647bbba..de8a8260e 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { } } +void installAofRewriteEvent() +{ + serverTL->fRetrySetAofEvent = false; + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + if (res != AE_OK) + serverTL->fRetrySetAofEvent = true; + } +} + /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks); @@ -165,15 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (!g_pserver->aof_rewrite_pending) { - g_pserver->aof_rewrite_pending = true; - int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { - g_pserver->aof_rewrite_pending = false; - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); - serverAssert(res == AE_OK); // we can't handle an error here - } + installAofRewriteEvent(); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -349,6 +356,9 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; + if (serverTL->fRetrySetAofEvent) + installAofRewriteEvent(); + if (sdslen(g_pserver->aof_buf) == 0) { /* Check if we need to do fsync even the aof buffer is empty, * because previously in AOF_FSYNC_EVERYSEC mode, fsync is diff --git a/src/networking.cpp b/src/networking.cpp index 097df0f87..54e04406f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,7 +1003,7 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } -int chooseBestThreadForAccept(int ielCur) +int chooseBestThreadForAccept() { listIter li; listNode *ln; @@ -1133,7 +1133,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { { - int ielTarget = chooseBestThreadForAccept(ielCur); + int ielTarget = chooseBestThreadForAccept(); if (ielTarget != ielCur) { char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); diff --git a/src/server.cpp b/src/server.cpp index 2d6027348..15f52ab52 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2884,6 +2884,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->current_client = nullptr; pvar->clients_paused = 0; + pvar->fRetrySetAofEvent = false; if (pvar->el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", diff --git a/src/server.h b/src/server.h index 3ff023677..3ec2e8948 100644 --- a/src/server.h +++ b/src/server.h @@ -1526,6 +1526,7 @@ struct redisServerThreadVars { struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; + bool fRetrySetAofEvent = false; }; struct redisMaster {