Former-commit-id: 6ec1641294b23e22a2a5dc5cc6098a02ce234df3
This commit is contained in:
John Sully 2020-03-25 21:55:31 -04:00
parent b443553a3d
commit 71fe6f7ba9
4 changed files with 27 additions and 38 deletions

View File

@ -496,7 +496,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
if (c->querybuf && sdslen(c->querybuf)) { if (c->querybuf && sdslen(c->querybuf)) {
std::string str = escapeString(c->querybuf); std::string str = escapeString(c->querybuf);
serverLog(LL_WARNING, "\tquerybuf: %s", str.c_str()); printf("\tquerybuf: %s\n", str.c_str());
} }
c->master_error = 1; c->master_error = 1;
} }

View File

@ -2425,8 +2425,6 @@ void freeMasterInfo(redisMaster *mi)
{ {
zfree(mi->masterauth); zfree(mi->masterauth);
zfree(mi->masteruser); zfree(mi->masteruser);
if (mi->clientFake)
freeClient(mi->clientFake);
delete mi->staleKeyMap; delete mi->staleKeyMap;
zfree(mi); zfree(mi);
} }
@ -2484,11 +2482,6 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
mi->master = NULL; mi->master = NULL;
mi->repl_state = REPL_STATE_CONNECT; mi->repl_state = REPL_STATE_CONNECT;
mi->repl_down_since = g_pserver->unixtime; mi->repl_down_since = g_pserver->unixtime;
if (mi->clientFake) {
freeClient(mi->clientFake);
mi->clientFake = nullptr;
}
/* We lost connection with our master, don't disconnect slaves yet, /* We lost connection with our master, don't disconnect slaves yet,
* maybe we'll be able to PSYNC with our master later. We'll disconnect * maybe we'll be able to PSYNC with our master later. We'll disconnect
* the slaves only if we'll have to do a full resync with our master. */ * the slaves only if we'll have to do a full resync with our master. */
@ -3378,8 +3371,13 @@ bool FInReplicaReplay()
return s_pstate != nullptr && s_pstate->nesting() > 0; return s_pstate != nullptr && s_pstate->nesting() > 0;
} }
struct RemoteMasterState
{
uint64_t mvcc = 0;
client *cFake = nullptr;
};
static std::unordered_map<std::string, uint64_t> g_mapmvcc; static std::unordered_map<std::string, RemoteMasterState> g_mapremote;
void replicaReplayCommand(client *c) void replicaReplayCommand(client *c)
{ {
@ -3455,12 +3453,15 @@ void replicaReplayCommand(client *c)
if (!s_pstate->FPush()) if (!s_pstate->FPush())
return; return;
redisMaster *mi = s_pstate->getMi(c); RemoteMasterState &remoteState = g_mapremote[uuid];
client *cFake = mi->clientFake; if (remoteState.cFake == nullptr)
if (mi->clientFakeNesting != s_pstate->nesting()) remoteState.cFake = createClient(-1, c->iel);
cFake = nullptr; else
serverAssert(mi != nullptr); remoteState.cFake->iel = c->iel;
if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc)
client *cFake = remoteState.cFake;
if (mvcc != 0 && remoteState.mvcc >= mvcc)
{ {
s_pstate->Cancel(); s_pstate->Cancel();
s_pstate->Pop(); s_pstate->Pop();
@ -3469,8 +3470,6 @@ void replicaReplayCommand(client *c)
// OK We've recieved a command lets execute // OK We've recieved a command lets execute
client *current_clientSave = serverTL->current_client; client *current_clientSave = serverTL->current_client;
if (cFake == nullptr)
cFake = createClient(-1, c->iel);
cFake->lock.lock(); cFake->lock.lock();
cFake->authenticated = c->authenticated; cFake->authenticated = c->authenticated;
cFake->puser = c->puser; cFake->puser = c->puser;
@ -3483,13 +3482,15 @@ void replicaReplayCommand(client *c)
bool fExec = ccmdPrev != serverTL->commandsExecuted; bool fExec = ccmdPrev != serverTL->commandsExecuted;
cFake->lock.unlock(); cFake->lock.unlock();
if (cFake->master_error) if (cFake->master_error)
addReplyError(c, "Error in rreplay command, please check logs"); {
addReplyError(c, "Error in rreplay command, please check logs.");
}
if (fExec || cFake->flags & CLIENT_MULTI) if (fExec || cFake->flags & CLIENT_MULTI)
{ {
addReply(c, shared.ok); addReply(c, shared.ok);
selectDb(c, cFake->db->id); selectDb(c, cFake->db->id);
if (mvcc > g_mapmvcc[uuid]) if (mvcc > remoteState.mvcc)
g_mapmvcc[uuid] = mvcc; remoteState.mvcc = mvcc;
} }
else else
{ {
@ -3497,17 +3498,6 @@ void replicaReplayCommand(client *c)
addReplyError(c, "command did not execute"); addReplyError(c, "command did not execute");
} }
serverAssert(sdslen(cFake->querybuf) == 0); serverAssert(sdslen(cFake->querybuf) == 0);
if (cFake->flags & CLIENT_MULTI)
{
mi->clientFake = cFake;
mi->clientFakeNesting = s_pstate->nesting();
}
else
{
if (mi->clientFake == cFake)
mi->clientFake = nullptr;
freeClient(cFake);
}
serverTL->current_client = current_clientSave; serverTL->current_client = current_clientSave;
// call() will not propogate this for us, so we do so here // call() will not propogate this for us, so we do so here

View File

@ -3593,12 +3593,12 @@ int processCommand(client *c, int callFlags) {
return C_OK; return C_OK;
} }
} }
incrementMvccTstamp();
if (!locker.isArmed()) if (!locker.isArmed())
locker.arm(c); locker.arm(c);
incrementMvccTstamp();
/* Handle the maxmemory directive. /* Handle the maxmemory directive.
* *
* Note that we do not want to reclaim memory if we are here re-entering * Note that we do not want to reclaim memory if we are here re-entering
@ -5018,14 +5018,15 @@ void incrementMvccTstamp()
msPrev >>= MVCC_MS_SHIFT; // convert to milliseconds msPrev >>= MVCC_MS_SHIFT; // convert to milliseconds
long long mst; long long mst;
__atomic_load(&g_pserver->mstime, &mst, __ATOMIC_RELAXED); __atomic_load(&g_pserver->mstime, &mst, __ATOMIC_ACQUIRE);
if (msPrev >= (uint64_t)mst) // we can be greater if the count overflows if (msPrev >= (uint64_t)mst) // we can be greater if the count overflows
{ {
atomicIncr(g_pserver->mvcc_tstamp, 1); __atomic_fetch_add(&g_pserver->mvcc_tstamp, 1, __ATOMIC_RELEASE);
} }
else else
{ {
atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT); uint64_t val = ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT;
__atomic_store(&g_pserver->mvcc_tstamp, &val, __ATOMIC_RELEASE);
} }
} }

View File

@ -1541,8 +1541,6 @@ struct redisMaster {
int masterport; /* Port of master */ int masterport; /* Port of master */
client *cached_master; /* Cached master to be reused for PSYNC. */ client *cached_master; /* Cached master to be reused for PSYNC. */
client *master; client *master;
client *clientFake;
int clientFakeNesting;
/* The following two fields is where we store master PSYNC replid/offset /* The following two fields is where we store master PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into * while the PSYNC is in progress. At the end we'll copy the fields into
* the server->master client structure. */ * the server->master client structure. */