diff --git a/src/replication.cpp b/src/replication.cpp index 35a460bac..75cdca40d 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -339,7 +339,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { cchbuf += reply->used; } - bool fSendRaw = !server.fActiveReplica || (argc >= 1 && lookupCommand((sds)ptrFromObj(argv[0])) == server.rreplayCommand); + bool fSendRaw = !server.fActiveReplica; serverAssert(argc > 0); serverAssert(cchbuf > 0); @@ -3141,8 +3141,62 @@ redisMaster *MasterInfoFromClient(client *c) return nullptr; } +#define REPLAY_MAX_NESTING 64 +class ReplicaNestState +{ +public: + bool FPush(unsigned char *uuid) + { + if (m_cnesting == REPLAY_MAX_NESTING) { + m_fCancelled = true; + return false; // overflow + } + for (int i = 0; i < m_cnesting; ++i) + { + if (FUuidEqual(m_stackUUID[i], uuid)) { + m_fCancelled = true; + return false; // cycle detected + } + } + if (m_cnesting == 0) + m_fCancelled = false; + memcpy(m_stackUUID[m_cnesting], uuid, UUID_BINARY_LEN); + ++m_cnesting; + return true; + } + + void Pop() + { + --m_cnesting; + } + + void Cancel() + { + m_fCancelled = true; + } + + bool FCancelled() const + { + return m_fCancelled; + } + + bool FFirst() const + { + return m_cnesting == 1; + } + +private: + int m_cnesting = 0; + bool m_fCancelled = false; + unsigned char m_stackUUID[REPLAY_MAX_NESTING][UUID_BINARY_LEN]; +}; + void replicaReplayCommand(client *c) { + static thread_local ReplicaNestState *s_pstate = nullptr; + if (s_pstate == nullptr) + s_pstate = new ReplicaNestState; + // the replay command contains two arguments: // 1: The UUID of the source // 2: The raw command buffer to be replayed @@ -3150,6 +3204,7 @@ void replicaReplayCommand(client *c) if (!(c->flags & CLIENT_MASTER)) { addReplyError(c, "Command must be sent from a master"); + s_pstate->Cancel(); return; } @@ -3157,6 +3212,7 @@ void replicaReplayCommand(client *c) if (c->argc != 3) { addReplyError(c, "Invalid number of arguments"); + s_pstate->Cancel(); return; } @@ -3165,21 +3221,27 @@ void replicaReplayCommand(client *c) || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) { addReplyError(c, "Expected UUID arg1"); + s_pstate->Cancel(); return; } if (c->argv[2]->type != OBJ_STRING) { addReplyError(c, "Expected command buffer arg2"); + s_pstate->Cancel(); return; } if (FSameUuidNoNil(uuid, server.uuid)) { addReply(c, shared.ok); + s_pstate->Cancel(); return; // Our own commands have come back to us. Ignore them. } + if (!s_pstate->FPush(uuid)) + return; + // OK We've recieved a command lets execute client *cFake = createClient(-1, c->iel); cFake->lock.lock(); @@ -3193,7 +3255,10 @@ void replicaReplayCommand(client *c) freeClient(cFake); // call() will not propogate this for us, so we do so here - alsoPropagate(server.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL); + if (!s_pstate->FCancelled() && s_pstate->FFirst()) + alsoPropagate(server.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL); + + s_pstate->Pop(); return; }