Fix cases where duplicate RREPLAY is applied

Former-commit-id: c3317686f8b8d94a3b2295def899ae30e208f327
This commit is contained in:
John Sully 2020-02-11 00:59:07 -05:00
parent 30ece138d5
commit e4d74b993f

View File

@ -1259,19 +1259,19 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
{ {
aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] {
// Because the client could have been closed while the lambda waited to run we need to // Because the client could have been closed while the lambda waited to run we need to
// verify the replica is still connected // verify the replica is still connected
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(g_pserver->slaves,&li); listRewind(g_pserver->slaves,&li);
bool fFound = false; bool fFound = false;
while ((ln = listNext(&li))) { while ((ln = listNext(&li))) {
if (listNodeValue(ln) == replica) { if (listNodeValue(ln) == replica) {
fFound = true; fFound = true;
break; break;
} }
} }
if (!fFound) if (!fFound)
return; return;
aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE); aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE);
if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) { if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) {
freeClient(replica); freeClient(replica);
@ -2329,10 +2329,11 @@ int connectWithMaster(redisMaster *mi) {
void undoConnectWithMaster(redisMaster *mi) { void undoConnectWithMaster(redisMaster *mi) {
int fd = mi->repl_transfer_s; int fd = mi->repl_transfer_s;
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{ int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{
aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE); aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE);
close(fd); close(fd);
}); });
serverAssert(res == AE_OK);
mi->repl_transfer_s = -1; mi->repl_transfer_s = -1;
} }
@ -3459,7 +3460,7 @@ void replicaReplayCommand(client *c)
if (mi->clientFakeNesting != s_pstate->nesting()) if (mi->clientFakeNesting != s_pstate->nesting())
cFake = nullptr; cFake = nullptr;
serverAssert(mi != nullptr); serverAssert(mi != nullptr);
if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr)) if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc)
{ {
s_pstate->Cancel(); s_pstate->Cancel();
s_pstate->Pop(); s_pstate->Pop();
@ -3485,7 +3486,8 @@ void replicaReplayCommand(client *c)
{ {
addReply(c, shared.ok); addReply(c, shared.ok);
selectDb(c, cFake->db->id); selectDb(c, cFake->db->id);
g_mapmvcc[uuid] = mvcc; if (mvcc > g_mapmvcc[uuid])
g_mapmvcc[uuid] = mvcc;
} }
else else
{ {