From 71fe6f7ba906c3368965f8ca0030a96ba49feea7 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 25 Mar 2020 21:55:31 -0400 Subject: [PATCH] Fix issue #143 Former-commit-id: 6ec1641294b23e22a2a5dc5cc6098a02ce234df3 --- src/networking.cpp | 2 +- src/replication.cpp | 50 ++++++++++++++++++--------------------------- src/server.cpp | 11 +++++----- src/server.h | 2 -- 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 94b202c17..276095715 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -496,7 +496,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync) if (c->querybuf && sdslen(c->querybuf)) { std::string str = escapeString(c->querybuf); - serverLog(LL_WARNING, "\tquerybuf: %s", str.c_str()); + printf("\tquerybuf: %s\n", str.c_str()); } c->master_error = 1; } diff --git a/src/replication.cpp b/src/replication.cpp index 2bdc21bf0..6f38ee0bc 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2425,8 +2425,6 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); - if (mi->clientFake) - freeClient(mi->clientFake); delete mi->staleKeyMap; zfree(mi); } @@ -2484,11 +2482,6 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; - if (mi->clientFake) { - freeClient(mi->clientFake); - mi->clientFake = nullptr; - - } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -3378,8 +3371,13 @@ bool FInReplicaReplay() return s_pstate != nullptr && s_pstate->nesting() > 0; } +struct RemoteMasterState +{ + uint64_t mvcc = 0; + client *cFake = nullptr; +}; -static std::unordered_map g_mapmvcc; +static std::unordered_map g_mapremote; void replicaReplayCommand(client *c) { @@ -3455,12 +3453,15 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; - redisMaster *mi = s_pstate->getMi(c); - client *cFake = mi->clientFake; - if (mi->clientFakeNesting != s_pstate->nesting()) - cFake = nullptr; - serverAssert(mi != nullptr); - if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc) + RemoteMasterState &remoteState = g_mapremote[uuid]; + if (remoteState.cFake == nullptr) + remoteState.cFake = createClient(-1, c->iel); + else + remoteState.cFake->iel = c->iel; + + client *cFake = remoteState.cFake; + + if (mvcc != 0 && remoteState.mvcc >= mvcc) { s_pstate->Cancel(); s_pstate->Pop(); @@ -3469,8 +3470,6 @@ void replicaReplayCommand(client *c) // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - if (cFake == nullptr) - cFake = createClient(-1, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; @@ -3483,13 +3482,15 @@ void replicaReplayCommand(client *c) bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); if (cFake->master_error) - addReplyError(c, "Error in rreplay command, please check logs"); + { + addReplyError(c, "Error in rreplay command, please check logs."); + } if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - if (mvcc > g_mapmvcc[uuid]) - g_mapmvcc[uuid] = mvcc; + if (mvcc > remoteState.mvcc) + remoteState.mvcc = mvcc; } else { @@ -3497,17 +3498,6 @@ void replicaReplayCommand(client *c) addReplyError(c, "command did not execute"); } serverAssert(sdslen(cFake->querybuf) == 0); - if (cFake->flags & CLIENT_MULTI) - { - mi->clientFake = cFake; - mi->clientFakeNesting = s_pstate->nesting(); - } - else - { - if (mi->clientFake == cFake) - mi->clientFake = nullptr; - freeClient(cFake); - } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.cpp b/src/server.cpp index ae0ec52f1..30d7eb9de 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3593,12 +3593,12 @@ int processCommand(client *c, int callFlags) { return C_OK; } } - - incrementMvccTstamp(); if (!locker.isArmed()) locker.arm(c); + incrementMvccTstamp(); + /* Handle the maxmemory directive. * * Note that we do not want to reclaim memory if we are here re-entering @@ -5018,14 +5018,15 @@ void incrementMvccTstamp() msPrev >>= MVCC_MS_SHIFT; // convert to milliseconds long long mst; - __atomic_load(&g_pserver->mstime, &mst, __ATOMIC_RELAXED); + __atomic_load(&g_pserver->mstime, &mst, __ATOMIC_ACQUIRE); if (msPrev >= (uint64_t)mst) // we can be greater if the count overflows { - atomicIncr(g_pserver->mvcc_tstamp, 1); + __atomic_fetch_add(&g_pserver->mvcc_tstamp, 1, __ATOMIC_RELEASE); } else { - atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT); + uint64_t val = ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT; + __atomic_store(&g_pserver->mvcc_tstamp, &val, __ATOMIC_RELEASE); } } diff --git a/src/server.h b/src/server.h index 94d8a0d11..218982bda 100644 --- a/src/server.h +++ b/src/server.h @@ -1541,8 +1541,6 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; - client *clientFake; - int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */