Fix issue where active replication doesn't replicate RDB data

Former-commit-id: 527b7eb0742567302e0343e3acbed9814c0cbb95
This commit is contained in:
John Sully 2020-11-23 02:01:40 +00:00
parent 3366f914d4
commit 50c2a8524c
9 changed files with 228 additions and 9 deletions

View File

@ -642,7 +642,7 @@ sds catAppendOnlyExpireMemberAtCommand(sds buf, struct redisCommand *cmd, robj *
when += mstime(); when += mstime();
robj *argvNew[4]; robj *argvNew[4];
argvNew[0] = createStringObject("PEXPIREMEMBERAT",15); argvNew[0] = shared.pexpirememberat;
argvNew[1] = argv[1]; argvNew[1] = argv[1];
argvNew[2] = argv[2]; argvNew[2] = argv[2];
argvNew[3] = createStringObjectFromLongLong(when); argvNew[3] = createStringObjectFromLongLong(when);

View File

@ -5019,6 +5019,48 @@ void dumpCommand(client *c) {
return; return;
} }
/* KEYDB.MVCCRESTORE key mvcc expire serialized-value */
void mvccrestoreCommand(client *c) {
long long mvcc, expire;
robj *key = c->argv[1], *obj = nullptr;
int type;
if (getLongLongFromObjectOrReply(c, c->argv[2], &mvcc, "Invalid MVCC Tstamp") != C_OK)
return;
if (getLongLongFromObjectOrReply(c, c->argv[3], &expire, "Invalid expire") != C_OK)
return;
/* Verify RDB version and data checksum unles the client is already a replica or master */
if (!(c->flags & (CLIENT_SLAVE | CLIENT_MASTER))) {
if (verifyDumpPayload((unsigned char*)ptrFromObj(c->argv[4]),sdslen(szFromObj(c->argv[4]))) == C_ERR)
{
addReplyError(c,"DUMP payload version or checksum are wrong");
return;
}
}
rio payload;
rioInitWithBuffer(&payload,szFromObj(c->argv[4]));
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,szFromObj(key), OBJ_MVCC_INVALID)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}
setMvccTstamp(obj, mvcc);
/* Create the key and set the TTL if any */
dbMerge(c->db,key,obj,true);
if (expire >= 0) {
setExpire(c,c->db,key,nullptr,expire);
}
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
addReply(c,shared.ok);
g_pserver->dirty++;
}
/* RESTORE key ttl serialized-value [REPLACE] */ /* RESTORE key ttl serialized-value [REPLACE] */
void restoreCommand(client *c) { void restoreCommand(client *c) {
long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1; long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;

View File

@ -202,13 +202,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
return o; return o;
} }
int dbAddCore(redisDb *db, robj *key, robj *val) { int dbAddCore(redisDb *db, robj *key, robj *val, bool fUpdateMvcc) {
serverAssert(!val->FExpires()); serverAssert(!val->FExpires());
sds copy = sdsdup(szFromObj(key)); sds copy = sdsdup(szFromObj(key));
int retval = dictAdd(db->pdict, copy, val); int retval = dictAdd(db->pdict, copy, val);
uint64_t mvcc = getMvccTstamp(); uint64_t mvcc = getMvccTstamp();
if (fUpdateMvcc) {
setMvccTstamp(key, mvcc); setMvccTstamp(key, mvcc);
setMvccTstamp(val, mvcc); setMvccTstamp(val, mvcc);
}
if (retval == DICT_OK) if (retval == DICT_OK)
{ {
@ -232,7 +234,7 @@ int dbAddCore(redisDb *db, robj *key, robj *val) {
* The program is aborted if the key already exists. */ * The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) void dbAdd(redisDb *db, robj *key, robj *val)
{ {
int retval = dbAddCore(db, key, val); int retval = dbAddCore(db, key, val, true /* fUpdateMvcc */);
serverAssertWithInfo(NULL,key,retval == DICT_OK); serverAssertWithInfo(NULL,key,retval == DICT_OK);
} }
@ -290,7 +292,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
{ {
dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); dictEntry *de = dictFind(db->pdict, ptrFromObj(key));
if (de == nullptr) if (de == nullptr)
return (dbAddCore(db, key, val) == DICT_OK); return (dbAddCore(db, key, val, false /* fUpdateMvcc */) == DICT_OK);
robj *old = (robj*)dictGetVal(de); robj *old = (robj*)dictGetVal(de);
if (mvccFromObj(old) <= mvccFromObj(val)) if (mvccFromObj(old) <= mvccFromObj(val))
@ -303,7 +305,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
} }
else else
{ {
return (dbAddCore(db, key, val) == DICT_OK); return (dbAddCore(db, key, val, true /* fUpdateMvcc */) == DICT_OK);
} }
} }

View File

@ -2353,7 +2353,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
else { else {
redisObjectStack keyobj; redisObjectStack keyobj;
initStaticStringObject(keyobj,key); initStaticStringObject(keyobj,key);
setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); long long expireT = strtoll(szFromObj(auxval), nullptr, 10);
setExpire(NULL, db, &keyobj, subexpireKey, expireT);
replicateSubkeyExpire(db, &keyobj, subexpireKey, expireT);
decrRefCount(subexpireKey); decrRefCount(subexpireKey);
subexpireKey = nullptr; subexpireKey = nullptr;
} }
@ -2475,6 +2477,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
/* call key space notification on key loaded for modules only */ /* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id); moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id);
replicationNotifyLoadedKey(db, &keyobj, val, expiretime);
} }
else else
{ {

View File

@ -4367,3 +4367,39 @@ static void propagateMasterStaleKeys()
decrRefCount(rgobj[0]); decrRefCount(rgobj[0]);
} }
void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire) {
if (!g_pserver->fActiveReplica || listLength(g_pserver->slaves) == 0)
return;
// Send a digest over to the replicas
rio r;
createDumpPayload(&r, val, key.unsafe_robjcast());
redisObjectStack objPayload;
initStaticStringObject(objPayload, r.io.buffer.ptr);
redisObjectStack objTtl;
initStaticStringObject(objTtl, sdscatprintf(sdsempty(), "%lld", expire));
redisObjectStack objMvcc;
initStaticStringObject(objMvcc, sdscatprintf(sdsempty(), "%lu", mvccFromObj(val)));
redisObject *argv[5] = {shared.mvccrestore, key.unsafe_robjcast(), &objMvcc, &objTtl, &objPayload};
replicationFeedSlaves(g_pserver->slaves, db - g_pserver->db, argv, 5);
sdsfree(szFromObj(&objTtl));
sdsfree(szFromObj(&objMvcc));
sdsfree(r.io.buffer.ptr);
}
void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire) {
if (!g_pserver->fActiveReplica || listLength(g_pserver->slaves) == 0)
return;
redisObjectStack objTtl;
initStaticStringObject(objTtl, sdscatprintf(sdsempty(), "%lld", expire));
redisObject *argv[4] = {shared.pexpirememberat, key.unsafe_robjcast(), subkey.unsafe_robjcast(), &objTtl};
replicationFeedSlaves(g_pserver->slaves, db - g_pserver->db, argv, 4);
sdsfree(szFromObj(&objTtl));
}

View File

@ -1070,6 +1070,10 @@ struct redisCommand redisCommandTable[] = {
{"keydb.nhset",nhsetCommand,-3, {"keydb.nhset",nhsetCommand,-3,
"read-only fast @hash", "read-only fast @hash",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
{"KEYDB.MVCCRESTORE",mvccrestoreCommand, 5,
"write use-memory @keyspace @dangerous",
0,NULL,1,1,1,0,0,0},
}; };
/*============================ Utility functions ============================ */ /*============================ Utility functions ============================ */
@ -2438,6 +2442,8 @@ void createSharedObjects(void) {
shared.hdel = makeObjectShared(createStringObject("HDEL", 4)); shared.hdel = makeObjectShared(createStringObject("HDEL", 4));
shared.zrem = makeObjectShared(createStringObject("ZREM", 4)); shared.zrem = makeObjectShared(createStringObject("ZREM", 4));
shared.srem = makeObjectShared(createStringObject("SREM", 4)); shared.srem = makeObjectShared(createStringObject("SREM", 4));
shared.mvccrestore = makeObjectShared(createStringObject("KEYDB.MVCCRESTORE", 17));
shared.pexpirememberat = makeObjectShared(createStringObject("PEXPIREMEMBERAT",15));
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] = shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));

View File

@ -1176,7 +1176,7 @@ struct sharedObjectsStruct {
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan,
*multi, *exec, *srem, *hdel, *zrem, *multi, *exec, *srem, *hdel, *zrem, *mvccrestore, *pexpirememberat,
*select[PROTO_SHARED_SELECT_CMDS], *select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS], *integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */ *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@ -2217,6 +2217,8 @@ void updateMasterAuth();
void showLatestBacklog(); void showLatestBacklog();
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn); void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire);
void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire);
/* Generic persistence functions */ /* Generic persistence functions */
void startLoadingFile(FILE* fp, const char * filename, int rdbflags); void startLoadingFile(FILE* fp, const char * filename, int rdbflags);
@ -2547,6 +2549,7 @@ void clusterPropagatePublish(robj *channel, robj *message);
void migrateCloseTimedoutSockets(void); void migrateCloseTimedoutSockets(void);
void clusterBeforeSleep(void); void clusterBeforeSleep(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len);
void createDumpPayload(rio *payload, robj_roptr o, robj *key);
/* Sentinel */ /* Sentinel */
void initSentinelConfig(void); void initSentinelConfig(void);
@ -2764,6 +2767,7 @@ void watchCommand(client *c);
void unwatchCommand(client *c); void unwatchCommand(client *c);
void clusterCommand(client *c); void clusterCommand(client *c);
void restoreCommand(client *c); void restoreCommand(client *c);
void mvccrestoreCommand(client *c);
void migrateCommand(client *c); void migrateCommand(client *c);
void askingCommand(client *c); void askingCommand(client *c);
void readonlyCommand(client *c); void readonlyCommand(client *c);

View File

@ -0,0 +1,124 @@
start_server {tags {"multi-master"} overrides {active-replica yes multi-master yes}} {
start_server {overrides {active-replica yes multi-master yes}} {
start_server {overrides {active-replica yes multi-master yes}} {
start_server {overrides {active-replica yes multi-master yes}} {
for {set j 0} {$j < 4} {incr j} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
}
set keysPerServer 100
# Initialize Dataset
for {set j 1} {$j < 4} {incr j} {
for {set key 0} { $key < $keysPerServer} { incr key } {
$R($j) set "key_$j\_$key" asdjaoijioasdjiod ex 100000
}
set hash($j) [$R($j) debug digest]
}
$R(1) replicaof $R_host(0) $R_port(0)
$R(2) replicaof $R_host(0) $R_port(0)
$R(3) replicaof $R_host(0) $R_port(0)
test "all nodes up" {
for {set j 1} {$j < 4} {incr j} {
wait_for_condition 50 100 {
[string match {*master_global_link_status:up*} [$R($j) info replication]]
} else {
fail "Multimaster group didn't connect up in a reasonable period of time"
}
}
}
test "nodes retain their data" {
for {set j 1} { $j < 4 } { incr j } {
assert_equal [$R($j) debug digest] $hash($j) $j
}
}
# Set all servers with an overlapping key - the last one should win
$R(0) set isvalid no
$R(1) set isvalid no
$R(2) set isvalid no
# Note: Sleep is due to mvcc slip
after 2
$R(3) set isvalid yes
for {set n 1} {$n < 4} {incr n} {
test "Node $n reciprocal rep works" {
$R(0) replicaof $R_host($n) $R_port($n)
after 2000
for {set key 0} { $key < $keysPerServer } { incr key } {
assert_equal [$R(0) get "key_$n\_$key"] asdjaoijioasdjiod $key
}
}
}
test "All data transferred between nodes" {
for {set server 0} {$server < 4} {incr server} {
set hash($j) [$R($server) debug digest]
for {set n 1} {$n < 4} {incr n} {
for {set key 0} {$key < $keysPerServer} {incr key} {
assert_equal [$R($server) get "key_$n\_$key"] asdjaoijioasdjiod "server: $n key: $key"
}
}
}
}
test "MVCC Updates Correctly" {
assert_equal [$R(0) get isvalid] yes
assert_equal [$R(1) get isvalid] yes
assert_equal [$R(2) get isvalid] yes
assert_equal [$R(3) get isvalid] yes
}
unset hash
test "All servers same debug digest" {
set hash [$R(0) debug digest]
for {set j 1} {$j < 4} {incr j} {
assert_equal $hash [$R($j) debug digest] $j
}
}
}}}}
# The tests below validate features replicated via RDB
start_server {tags {"multi-master"} overrides {active-replica yes multi-master yes}} {
start_server {overrides {active-replica yes multi-master yes}} {
start_server {overrides {active-replica yes multi-master yes}} {
for {set j 0} {$j < 3} {incr j} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
}
# Set replicated features here
$R(0) sadd testhash subkey
$R(0) expiremember testhash subkey 10000
test "node 2 up" {
$R(2) replicaof $R_host(1) $R_port(1)
wait_for_condition 50 100 {
[string match {*master_global_link_status:up*} [$R(2) info replication]]
} else {
fail "didn't connect up in a reasonable period of time"
}
}
# While node 1 loads from 0, it will relay to 2
test "node 1 up" {
$R(1) replicaof $R_host(0) $R_port(0)
wait_for_condition 50 100 {
[string match {*master_global_link_status:up*} [$R(1) info replication]]
} else {
fail "didn't connect up in a reasonable period of time"
}
}
#Tests that validate replication made it to node 2
test "subkey expire replicates via RDB" {
assert [expr [$R(2) ttl testhash subkey] > 0]
}
}}}

View File

@ -48,6 +48,7 @@ set ::all_tests {
integration/replication-psync integration/replication-psync
integration/replication-active integration/replication-active
integration/replication-multimaster integration/replication-multimaster
integration/replication-multimaster-connect
integration/aof integration/aof
integration/rdb integration/rdb
integration/convert-zipmap-hash-on-load integration/convert-zipmap-hash-on-load