Merge branch 'unstable' into redis_6_merge
Former-commit-id: 908cf5042ebcd7870166bd1a0bb450f37e5f3b4d
This commit is contained in:
commit
9c4b66b9a4
@ -495,7 +495,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
|
|||||||
|
|
||||||
if (c->querybuf && sdslen(c->querybuf)) {
|
if (c->querybuf && sdslen(c->querybuf)) {
|
||||||
std::string str = escapeString(c->querybuf);
|
std::string str = escapeString(c->querybuf);
|
||||||
serverLog(LL_WARNING, "\tquerybuf: %s", str.c_str());
|
printf("\tquerybuf: %s\n", str.c_str());
|
||||||
}
|
}
|
||||||
c->master_error = 1;
|
c->master_error = 1;
|
||||||
}
|
}
|
||||||
|
@ -2812,8 +2812,6 @@ void freeMasterInfo(redisMaster *mi)
|
|||||||
zfree(mi->masteruser);
|
zfree(mi->masteruser);
|
||||||
if (mi->repl_transfer_tmpfile)
|
if (mi->repl_transfer_tmpfile)
|
||||||
zfree(mi->repl_transfer_tmpfile);
|
zfree(mi->repl_transfer_tmpfile);
|
||||||
if (mi->clientFake)
|
|
||||||
freeClient(mi->clientFake);
|
|
||||||
delete mi->staleKeyMap;
|
delete mi->staleKeyMap;
|
||||||
if (mi->cached_master != nullptr)
|
if (mi->cached_master != nullptr)
|
||||||
freeClientAsync(mi->cached_master);
|
freeClientAsync(mi->cached_master);
|
||||||
@ -2892,11 +2890,6 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
|
|||||||
mi->master = NULL;
|
mi->master = NULL;
|
||||||
mi->repl_state = REPL_STATE_CONNECT;
|
mi->repl_state = REPL_STATE_CONNECT;
|
||||||
mi->repl_down_since = g_pserver->unixtime;
|
mi->repl_down_since = g_pserver->unixtime;
|
||||||
if (mi->clientFake) {
|
|
||||||
freeClient(mi->clientFake);
|
|
||||||
mi->clientFake = nullptr;
|
|
||||||
|
|
||||||
}
|
|
||||||
/* We lost connection with our master, don't disconnect slaves yet,
|
/* We lost connection with our master, don't disconnect slaves yet,
|
||||||
* maybe we'll be able to PSYNC with our master later. We'll disconnect
|
* maybe we'll be able to PSYNC with our master later. We'll disconnect
|
||||||
* the slaves only if we'll have to do a full resync with our master. */
|
* the slaves only if we'll have to do a full resync with our master. */
|
||||||
@ -3793,8 +3786,13 @@ bool FInReplicaReplay()
|
|||||||
return s_pstate != nullptr && s_pstate->nesting() > 0;
|
return s_pstate != nullptr && s_pstate->nesting() > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct RemoteMasterState
|
||||||
|
{
|
||||||
|
uint64_t mvcc = 0;
|
||||||
|
client *cFake = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
static std::unordered_map<std::string, uint64_t> g_mapmvcc;
|
static std::unordered_map<std::string, RemoteMasterState> g_mapremote;
|
||||||
|
|
||||||
void replicaReplayCommand(client *c)
|
void replicaReplayCommand(client *c)
|
||||||
{
|
{
|
||||||
@ -3870,12 +3868,15 @@ void replicaReplayCommand(client *c)
|
|||||||
if (!s_pstate->FPush())
|
if (!s_pstate->FPush())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
redisMaster *mi = s_pstate->getMi(c);
|
RemoteMasterState &remoteState = g_mapremote[uuid];
|
||||||
client *cFake = mi->clientFake;
|
if (remoteState.cFake == nullptr)
|
||||||
if (mi->clientFakeNesting != s_pstate->nesting())
|
remoteState.cFake = createClient(-1, c->iel);
|
||||||
cFake = nullptr;
|
else
|
||||||
serverAssert(mi != nullptr);
|
remoteState.cFake->iel = c->iel;
|
||||||
if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc)
|
|
||||||
|
client *cFake = remoteState.cFake;
|
||||||
|
|
||||||
|
if (mvcc != 0 && remoteState.mvcc >= mvcc)
|
||||||
{
|
{
|
||||||
s_pstate->Cancel();
|
s_pstate->Cancel();
|
||||||
s_pstate->Pop();
|
s_pstate->Pop();
|
||||||
@ -3884,8 +3885,11 @@ void replicaReplayCommand(client *c)
|
|||||||
|
|
||||||
// OK We've recieved a command lets execute
|
// OK We've recieved a command lets execute
|
||||||
client *current_clientSave = serverTL->current_client;
|
client *current_clientSave = serverTL->current_client;
|
||||||
|
<<<<<<< HEAD
|
||||||
if (cFake == nullptr)
|
if (cFake == nullptr)
|
||||||
cFake = createClient(nullptr, c->iel);
|
cFake = createClient(nullptr, c->iel);
|
||||||
|
=======
|
||||||
|
>>>>>>> unstable
|
||||||
cFake->lock.lock();
|
cFake->lock.lock();
|
||||||
cFake->authenticated = c->authenticated;
|
cFake->authenticated = c->authenticated;
|
||||||
cFake->puser = c->puser;
|
cFake->puser = c->puser;
|
||||||
@ -3898,13 +3902,15 @@ void replicaReplayCommand(client *c)
|
|||||||
bool fExec = ccmdPrev != serverTL->commandsExecuted;
|
bool fExec = ccmdPrev != serverTL->commandsExecuted;
|
||||||
cFake->lock.unlock();
|
cFake->lock.unlock();
|
||||||
if (cFake->master_error)
|
if (cFake->master_error)
|
||||||
addReplyError(c, "Error in rreplay command, please check logs");
|
{
|
||||||
|
addReplyError(c, "Error in rreplay command, please check logs.");
|
||||||
|
}
|
||||||
if (fExec || cFake->flags & CLIENT_MULTI)
|
if (fExec || cFake->flags & CLIENT_MULTI)
|
||||||
{
|
{
|
||||||
addReply(c, shared.ok);
|
addReply(c, shared.ok);
|
||||||
selectDb(c, cFake->db->id);
|
selectDb(c, cFake->db->id);
|
||||||
if (mvcc > g_mapmvcc[uuid])
|
if (mvcc > remoteState.mvcc)
|
||||||
g_mapmvcc[uuid] = mvcc;
|
remoteState.mvcc = mvcc;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -3912,17 +3918,6 @@ void replicaReplayCommand(client *c)
|
|||||||
addReplyError(c, "command did not execute");
|
addReplyError(c, "command did not execute");
|
||||||
}
|
}
|
||||||
serverAssert(sdslen(cFake->querybuf) == 0);
|
serverAssert(sdslen(cFake->querybuf) == 0);
|
||||||
if (cFake->flags & CLIENT_MULTI)
|
|
||||||
{
|
|
||||||
mi->clientFake = cFake;
|
|
||||||
mi->clientFakeNesting = s_pstate->nesting();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (mi->clientFake == cFake)
|
|
||||||
mi->clientFake = nullptr;
|
|
||||||
freeClient(cFake);
|
|
||||||
}
|
|
||||||
serverTL->current_client = current_clientSave;
|
serverTL->current_client = current_clientSave;
|
||||||
|
|
||||||
// call() will not propogate this for us, so we do so here
|
// call() will not propogate this for us, so we do so here
|
||||||
|
@ -3634,11 +3634,11 @@ int processCommand(client *c, int callFlags) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
incrementMvccTstamp();
|
|
||||||
|
|
||||||
if (!locker.isArmed())
|
if (!locker.isArmed())
|
||||||
locker.arm(c);
|
locker.arm(c);
|
||||||
|
|
||||||
|
incrementMvccTstamp();
|
||||||
|
|
||||||
/* Handle the maxmemory directive.
|
/* Handle the maxmemory directive.
|
||||||
*
|
*
|
||||||
* Note that we do not want to reclaim memory if we are here re-entering
|
* Note that we do not want to reclaim memory if we are here re-entering
|
||||||
@ -5139,14 +5139,15 @@ void incrementMvccTstamp()
|
|||||||
msPrev >>= MVCC_MS_SHIFT; // convert to milliseconds
|
msPrev >>= MVCC_MS_SHIFT; // convert to milliseconds
|
||||||
|
|
||||||
long long mst;
|
long long mst;
|
||||||
__atomic_load(&g_pserver->mstime, &mst, __ATOMIC_RELAXED);
|
__atomic_load(&g_pserver->mstime, &mst, __ATOMIC_ACQUIRE);
|
||||||
if (msPrev >= (uint64_t)mst) // we can be greater if the count overflows
|
if (msPrev >= (uint64_t)mst) // we can be greater if the count overflows
|
||||||
{
|
{
|
||||||
atomicIncr(g_pserver->mvcc_tstamp, 1);
|
__atomic_fetch_add(&g_pserver->mvcc_tstamp, 1, __ATOMIC_RELEASE);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT);
|
uint64_t val = ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT;
|
||||||
|
__atomic_store(&g_pserver->mvcc_tstamp, &val, __ATOMIC_RELEASE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -5157,7 +5158,7 @@ void OnTerminate()
|
|||||||
The easiest way to achieve that is to acutally segfault, so we assert
|
The easiest way to achieve that is to acutally segfault, so we assert
|
||||||
here.
|
here.
|
||||||
*/
|
*/
|
||||||
serverAssert(false);
|
serverPanic("std::teminate() called");
|
||||||
}
|
}
|
||||||
|
|
||||||
void *workerThreadMain(void *parg)
|
void *workerThreadMain(void *parg)
|
||||||
|
@ -1534,8 +1534,6 @@ struct redisMaster {
|
|||||||
int masterport; /* Port of master */
|
int masterport; /* Port of master */
|
||||||
client *cached_master; /* Cached master to be reused for PSYNC. */
|
client *cached_master; /* Cached master to be reused for PSYNC. */
|
||||||
client *master;
|
client *master;
|
||||||
client *clientFake;
|
|
||||||
int clientFakeNesting;
|
|
||||||
/* The following two fields is where we store master PSYNC replid/offset
|
/* The following two fields is where we store master PSYNC replid/offset
|
||||||
* while the PSYNC is in progress. At the end we'll copy the fields into
|
* while the PSYNC is in progress. At the end we'll copy the fields into
|
||||||
* the server->master client structure. */
|
* the server->master client structure. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user