Ensure multi-master works for ring topologies
Former-commit-id: a7cc3aac28ccec4dadb80aa2cc7279c53982bc28
This commit is contained in:
parent
6d25bd9f47
commit
9cd3b8253d
@ -28,6 +28,7 @@
|
||||
*/
|
||||
|
||||
#include "server.h"
|
||||
bool FInReplicaReplay();
|
||||
|
||||
/* ================================ MULTI/EXEC ============================== */
|
||||
|
||||
@ -172,12 +173,15 @@ void execCommand(client *c) {
|
||||
* This way we'll deliver the MULTI/..../EXEC block as a whole and
|
||||
* both the AOF and the replication link will have the same consistency
|
||||
* and atomicity guarantees. */
|
||||
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
|
||||
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) {
|
||||
execCommandPropagateMulti(c);
|
||||
must_propagate = 1;
|
||||
}
|
||||
|
||||
call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL);
|
||||
int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL;
|
||||
if (FInReplicaReplay())
|
||||
flags &= ~CMD_CALL_PROPAGATE;
|
||||
call(c,flags);
|
||||
|
||||
/* Commands may alter argc/argv, restore mstate. */
|
||||
c->mstate.commands[j].argc = c->argc;
|
||||
|
@ -42,6 +42,7 @@
|
||||
#include <algorithm>
|
||||
#include <uuid/uuid.h>
|
||||
#include <chrono>
|
||||
#include <unordered_map>
|
||||
|
||||
void replicationDiscardCachedMaster(redisMaster *mi);
|
||||
void replicationResurrectCachedMaster(redisMaster *mi, int newfd);
|
||||
@ -353,6 +354,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
cchDbNum = std::min<int>(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that
|
||||
|
||||
char szMvcc[128];
|
||||
incrementMvccTstamp();
|
||||
uint64_t mvccTstamp = getMvccTstamp();
|
||||
int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp);
|
||||
int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp);
|
||||
@ -432,6 +434,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
|
||||
addReplyProtoAsync(replica, reply->buf(), reply->used);
|
||||
}
|
||||
|
||||
if (!fSendRaw)
|
||||
{
|
||||
addReplyAsync(replica,shared.crlf);
|
||||
@ -2420,6 +2423,8 @@ void freeMasterInfo(redisMaster *mi)
|
||||
{
|
||||
zfree(mi->masterauth);
|
||||
zfree(mi->masteruser);
|
||||
if (mi->clientFake)
|
||||
freeClient(mi->clientFake);
|
||||
delete mi->staleKeyMap;
|
||||
zfree(mi);
|
||||
}
|
||||
@ -2477,6 +2482,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
|
||||
mi->master = NULL;
|
||||
mi->repl_state = REPL_STATE_CONNECT;
|
||||
mi->repl_down_since = g_pserver->unixtime;
|
||||
if (mi->clientFake) {
|
||||
freeClient(mi->clientFake);
|
||||
mi->clientFake = nullptr;
|
||||
|
||||
}
|
||||
/* We lost connection with our master, don't disconnect slaves yet,
|
||||
* maybe we'll be able to PSYNC with our master later. We'll disconnect
|
||||
* the slaves only if we'll have to do a full resync with our master. */
|
||||
@ -3344,14 +3354,33 @@ public:
|
||||
return m_cnesting == 1;
|
||||
}
|
||||
|
||||
/* Returns the redisMaster associated with client c.
 * MasterInfoFromClient() is consulted only while m_mi is still null; whatever
 * it returns is stored in m_mi and handed back on subsequent calls.
 * NOTE(review): nothing in this method clears m_mi again - confirm it is
 * reset elsewhere if a different client can reach the same state object. */
redisMaster *getMi(client *c)
{
    if (!m_mi) {
        m_mi = MasterInfoFromClient(c);
    }
    return m_mi;
}
|
||||
|
||||
/* Read-only accessor for the nesting counter m_cnesting. */
int nesting() const
{
    return m_cnesting;
}
|
||||
|
||||
private:
|
||||
int m_cnesting = 0;
|
||||
bool m_fCancelled = false;
|
||||
redisMaster *m_mi = nullptr;
|
||||
};
|
||||
|
||||
static thread_local ReplicaNestState *s_pstate = nullptr;
|
||||
|
||||
bool FInReplicaReplay()
|
||||
{
|
||||
return s_pstate != nullptr && s_pstate->nesting() > 0;
|
||||
}
|
||||
|
||||
|
||||
static std::unordered_map<std::string, uint64_t> g_mapmvcc;
|
||||
|
||||
void replicaReplayCommand(client *c)
|
||||
{
|
||||
static thread_local ReplicaNestState *s_pstate = nullptr;
|
||||
if (s_pstate == nullptr)
|
||||
s_pstate = new (MALLOC_LOCAL) ReplicaNestState;
|
||||
|
||||
@ -3375,9 +3404,10 @@ void replicaReplayCommand(client *c)
|
||||
return;
|
||||
}
|
||||
|
||||
unsigned char uuid[UUID_BINARY_LEN];
|
||||
std::string uuid;
|
||||
uuid.resize(UUID_BINARY_LEN);
|
||||
if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36
|
||||
|| uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0)
|
||||
|| uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0)
|
||||
{
|
||||
addReplyError(c, "Expected UUID arg1");
|
||||
s_pstate->Cancel();
|
||||
@ -3413,7 +3443,7 @@ void replicaReplayCommand(client *c)
|
||||
}
|
||||
}
|
||||
|
||||
if (FSameUuidNoNil(uuid, cserver.uuid))
|
||||
if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid))
|
||||
{
|
||||
addReply(c, shared.ok);
|
||||
s_pstate->Cancel();
|
||||
@ -3423,33 +3453,56 @@ void replicaReplayCommand(client *c)
|
||||
if (!s_pstate->FPush())
|
||||
return;
|
||||
|
||||
redisMaster *mi = s_pstate->getMi(c);
|
||||
client *cFake = mi->clientFake;
|
||||
if (mi->clientFakeNesting != s_pstate->nesting())
|
||||
cFake = nullptr;
|
||||
serverAssert(mi != nullptr);
|
||||
if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr))
|
||||
{
|
||||
s_pstate->Cancel();
|
||||
s_pstate->Pop();
|
||||
return;
|
||||
}
|
||||
|
||||
// OK, we've received a command; let's execute it
|
||||
client *current_clientSave = serverTL->current_client;
|
||||
client *cFake = createClient(-1, c->iel);
|
||||
if (cFake == nullptr)
|
||||
cFake = createClient(-1, c->iel);
|
||||
cFake->lock.lock();
|
||||
cFake->authenticated = c->authenticated;
|
||||
cFake->puser = c->puser;
|
||||
cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2]));
|
||||
selectDb(cFake, c->db->id);
|
||||
auto ccmdPrev = serverTL->commandsExecuted;
|
||||
cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP;
|
||||
processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE)));
|
||||
cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP);
|
||||
bool fExec = ccmdPrev != serverTL->commandsExecuted;
|
||||
cFake->lock.unlock();
|
||||
if (fExec)
|
||||
if (fExec || cFake->flags & CLIENT_MULTI)
|
||||
{
|
||||
addReply(c, shared.ok);
|
||||
selectDb(c, cFake->db->id);
|
||||
redisMaster *mi = MasterInfoFromClient(c);
|
||||
if (mi != nullptr) // this should never be null but I'd prefer not to crash
|
||||
{
|
||||
mi->mvccLastSync = mvcc;
|
||||
}
|
||||
g_mapmvcc[uuid] = mvcc;
|
||||
}
|
||||
else
|
||||
{
|
||||
serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf);
|
||||
addReplyError(c, "command did not execute");
|
||||
}
|
||||
freeClient(cFake);
|
||||
serverAssert(sdslen(cFake->querybuf) == 0);
|
||||
if (cFake->flags & CLIENT_MULTI)
|
||||
{
|
||||
mi->clientFake = cFake;
|
||||
mi->clientFakeNesting = s_pstate->nesting();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (mi->clientFake == cFake)
|
||||
mi->clientFake = nullptr;
|
||||
freeClient(cFake);
|
||||
}
|
||||
serverTL->current_client = current_clientSave;
|
||||
|
||||
// call() will not propagate this for us, so we do so here
|
||||
|
@ -1535,6 +1535,8 @@ struct redisMaster {
|
||||
int masterport; /* Port of master */
|
||||
client *cached_master; /* Cached master to be reused for PSYNC. */
|
||||
client *master;
|
||||
client *clientFake;
|
||||
int clientFakeNesting;
|
||||
/* The following two fields are where we store master PSYNC replid/offset
|
||||
* while the PSYNC is in progress. At the end we'll copy the fields into
|
||||
* the server->master client structure. */
|
||||
|
@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
||||
}
|
||||
}
|
||||
|
||||
# Runs a MULTI/EXEC transaction (two INCRs on testkey) against $master and
# then checks that the resulting value also shows up on $slave.
test {Active replicas propogate transaction} {
|
||||
# Start from a known value so the two INCRs must yield 2.
$master set testkey 0
|
||||
$master multi
|
||||
$master incr testkey
|
||||
$master incr testkey
|
||||
# Pause for 5000 ms while the transaction is still open, before EXEC is sent.
after 5000
|
||||
$master get testkey
|
||||
$master exec
|
||||
# Both INCRs must have been applied on the master.
assert_equal 2 [$master get testkey]
|
||||
after 500
|
||||
# Poll the replica until it reports the same value; fail the test otherwise.
# NOTE(review): the two numeric arguments are presumably retry count and
# delay in ms - confirm against the wait_for_condition helper.
wait_for_condition 50 500 {
|
||||
[string match "2" [$slave get testkey]]
|
||||
} else {
|
||||
fail "Transaction failed to replicate"
|
||||
}
|
||||
# Clear the data set before the next test runs.
$master flushall
|
||||
}
|
||||
|
||||
test {Active replicas WAIT} {
|
||||
# Test that wait succeeds since replicas should be synchronized
|
||||
$master set testkey foo
|
||||
|
@ -45,6 +45,7 @@ set ::all_tests {
|
||||
integration/replication-4
|
||||
integration/replication-psync
|
||||
integration/replication-active
|
||||
integration/replication-multimaster
|
||||
integration/aof
|
||||
integration/rdb
|
||||
integration/convert-zipmap-hash-on-load
|
||||
|
@ -1,7 +1,7 @@
|
||||
start_server {tags {"rreplay"} overrides {active-replica yes}} {
|
||||
|
||||
test {RREPLAY use current db} {
|
||||
r debug force-master flagonly
|
||||
r debug force-master yes
|
||||
r select 4
|
||||
r set dbnum invalid
|
||||
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n"
|
||||
@ -10,7 +10,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} {
|
||||
reconnect
|
||||
|
||||
test {RREPLAY db different} {
|
||||
r debug force-master flagonly
|
||||
r debug force-master yes
|
||||
r select 4
|
||||
r set testkey four
|
||||
r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2
|
||||
|
Loading…
x
Reference in New Issue
Block a user