diff --git a/src/multi.cpp b/src/multi.cpp index 2018e08d9..c9485b0ae 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -28,6 +28,7 @@ */ #include "server.h" +bool FInReplicaReplay(); /* ================================ MULTI/EXEC ============================== */ @@ -172,12 +173,15 @@ void execCommand(client *c) { * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { + if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) { execCommandPropagateMulti(c); must_propagate = 1; } - call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL); + int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL; + if (FInReplicaReplay()) + flags &= ~CMD_CALL_PROPAGATE; + call(c,flags); /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; diff --git a/src/replication.cpp b/src/replication.cpp index b8236420d..e42872cd4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -42,6 +42,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, int newfd); @@ -353,6 +354,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that char szMvcc[128]; + incrementMvccTstamp(); uint64_t mvccTstamp = getMvccTstamp(); int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); @@ -432,6 +434,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); addReplyProtoAsync(replica, reply->buf(), reply->used); } + if (!fSendRaw) { addReplyAsync(replica,shared.crlf); @@ -2420,6 +2423,8 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); + if (mi->clientFake) + freeClient(mi->clientFake); delete mi->staleKeyMap; zfree(mi); } @@ -2477,6 +2482,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; + if (mi->clientFake) { + freeClient(mi->clientFake); + mi->clientFake = nullptr; + + } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -3344,14 +3354,33 @@ public: return m_cnesting == 1; } + redisMaster *getMi(client *c) + { + if (m_mi == nullptr) + m_mi = MasterInfoFromClient(c); + return m_mi; + } + + int nesting() const { return m_cnesting; } + private: int m_cnesting = 0; bool m_fCancelled = false; + redisMaster *m_mi = nullptr; }; +static thread_local ReplicaNestState *s_pstate = nullptr; + +bool FInReplicaReplay() +{ + return s_pstate != nullptr && s_pstate->nesting() > 0; +} + + +static std::unordered_map g_mapmvcc; + void replicaReplayCommand(client *c) { - static thread_local ReplicaNestState *s_pstate = nullptr; if (s_pstate == nullptr) s_pstate = new (MALLOC_LOCAL) ReplicaNestState; @@ -3375,9 +3404,10 @@ void replicaReplayCommand(client *c) return; } - unsigned char uuid[UUID_BINARY_LEN]; + std::string uuid; + uuid.resize(UUID_BINARY_LEN); if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36 - || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) + || uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0) { addReplyError(c, "Expected UUID arg1"); s_pstate->Cancel(); @@ -3413,7 +3443,7 @@ void replicaReplayCommand(client *c) } } - if (FSameUuidNoNil(uuid, cserver.uuid)) + if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid)) { addReply(c, shared.ok); s_pstate->Cancel(); @@ -3423,33 +3453,56 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; + redisMaster *mi = s_pstate->getMi(c); + client *cFake = mi->clientFake; + if (mi->clientFakeNesting != s_pstate->nesting()) + cFake = nullptr; + serverAssert(mi != nullptr); + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr)) + { + s_pstate->Cancel(); + s_pstate->Pop(); + return; + } + // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - client *cFake = createClient(-1, c->iel); + if (cFake == nullptr) + cFake = createClient(-1, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); auto ccmdPrev = serverTL->commandsExecuted; + cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP; processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP); bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); - if (fExec) + if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - redisMaster *mi = MasterInfoFromClient(c); - if (mi != nullptr) // this should never be null but I'd prefer not to crash - { - mi->mvccLastSync = mvcc; - } + g_mapmvcc[uuid] = mvcc; } else { + serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); addReplyError(c, "command did not execute"); } - freeClient(cFake); + serverAssert(sdslen(cFake->querybuf) == 0); + if (cFake->flags & CLIENT_MULTI) + { + mi->clientFake = cFake; + mi->clientFakeNesting = s_pstate->nesting(); + } + else + { + if (mi->clientFake == cFake) + mi->clientFake = nullptr; + freeClient(cFake); + } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.h b/src/server.h index 781f043f0..22d983b0f 100644 --- a/src/server.h +++ b/src/server.h @@ -1535,6 +1535,8 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; + client *clientFake; + int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */ diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 4f37f2adf..0c28eb85d 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { } } + test {Active replicas propogate transaction} { + $master set testkey 0 + $master multi + $master incr testkey + $master incr testkey + after 5000 + $master get testkey + $master exec + assert_equal 2 [$master get testkey] + after 500 + wait_for_condition 50 500 { + [string match "2" [$slave get testkey]] + } else { + fail "Transaction failed to replicate" + } + $master flushall + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 0711d4cc2..376004041 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -45,6 +45,7 @@ set ::all_tests { integration/replication-4 integration/replication-psync integration/replication-active + integration/replication-multimaster integration/aof integration/rdb integration/convert-zipmap-hash-on-load diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2fd1d3714..e11030f95 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master flagonly + r debug force-master yes r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { reconnect test {RREPLAY db different} { - r debug force-master flagonly + r debug force-master yes r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2