Detect cycles in mesh networks with multi master

Former-commit-id: da51ac430b43628f5cb5c1f0f005d937ed32660b
This commit is contained in:
John Sully 2019-04-19 19:31:11 -04:00
parent a8d89641cf
commit daed37e6c4

View File

@ -339,7 +339,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
cchbuf += reply->used; cchbuf += reply->used;
} }
bool fSendRaw = !server.fActiveReplica || (argc >= 1 && lookupCommand((sds)ptrFromObj(argv[0])) == server.rreplayCommand); bool fSendRaw = !server.fActiveReplica;
serverAssert(argc > 0); serverAssert(argc > 0);
serverAssert(cchbuf > 0); serverAssert(cchbuf > 0);
@ -3141,8 +3141,62 @@ redisMaster *MasterInfoFromClient(client *c)
return nullptr; return nullptr;
} }
#define REPLAY_MAX_NESTING 64
class ReplicaNestState
{
public:
bool FPush(unsigned char *uuid)
{
if (m_cnesting == REPLAY_MAX_NESTING) {
m_fCancelled = true;
return false; // overflow
}
for (int i = 0; i < m_cnesting; ++i)
{
if (FUuidEqual(m_stackUUID[i], uuid)) {
m_fCancelled = true;
return false; // cycle detected
}
}
if (m_cnesting == 0)
m_fCancelled = false;
memcpy(m_stackUUID[m_cnesting], uuid, UUID_BINARY_LEN);
++m_cnesting;
return true;
}
void Pop()
{
--m_cnesting;
}
void Cancel()
{
m_fCancelled = true;
}
bool FCancelled() const
{
return m_fCancelled;
}
bool FFirst() const
{
return m_cnesting == 1;
}
private:
int m_cnesting = 0;
bool m_fCancelled = false;
unsigned char m_stackUUID[REPLAY_MAX_NESTING][UUID_BINARY_LEN];
};
void replicaReplayCommand(client *c) void replicaReplayCommand(client *c)
{ {
static thread_local ReplicaNestState *s_pstate = nullptr;
if (s_pstate == nullptr)
s_pstate = new ReplicaNestState;
// the replay command contains two arguments: // the replay command contains two arguments:
// 1: The UUID of the source // 1: The UUID of the source
// 2: The raw command buffer to be replayed // 2: The raw command buffer to be replayed
@ -3150,6 +3204,7 @@ void replicaReplayCommand(client *c)
if (!(c->flags & CLIENT_MASTER)) if (!(c->flags & CLIENT_MASTER))
{ {
addReplyError(c, "Command must be sent from a master"); addReplyError(c, "Command must be sent from a master");
s_pstate->Cancel();
return; return;
} }
@ -3157,6 +3212,7 @@ void replicaReplayCommand(client *c)
if (c->argc != 3) if (c->argc != 3)
{ {
addReplyError(c, "Invalid number of arguments"); addReplyError(c, "Invalid number of arguments");
s_pstate->Cancel();
return; return;
} }
@ -3165,21 +3221,27 @@ void replicaReplayCommand(client *c)
|| uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0)
{ {
addReplyError(c, "Expected UUID arg1"); addReplyError(c, "Expected UUID arg1");
s_pstate->Cancel();
return; return;
} }
if (c->argv[2]->type != OBJ_STRING) if (c->argv[2]->type != OBJ_STRING)
{ {
addReplyError(c, "Expected command buffer arg2"); addReplyError(c, "Expected command buffer arg2");
s_pstate->Cancel();
return; return;
} }
if (FSameUuidNoNil(uuid, server.uuid)) if (FSameUuidNoNil(uuid, server.uuid))
{ {
addReply(c, shared.ok); addReply(c, shared.ok);
s_pstate->Cancel();
return; // Our own commands have come back to us. Ignore them. return; // Our own commands have come back to us. Ignore them.
} }
if (!s_pstate->FPush(uuid))
return;
// OK We've recieved a command lets execute // OK We've recieved a command lets execute
client *cFake = createClient(-1, c->iel); client *cFake = createClient(-1, c->iel);
cFake->lock.lock(); cFake->lock.lock();
@ -3193,7 +3255,10 @@ void replicaReplayCommand(client *c)
freeClient(cFake); freeClient(cFake);
// call() will not propogate this for us, so we do so here // call() will not propogate this for us, so we do so here
alsoPropagate(server.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL); if (!s_pstate->FCancelled() && s_pstate->FFirst())
alsoPropagate(server.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL);
s_pstate->Pop();
return; return;
} }