From 50c2a8524cae1a22d8667f9570022eaf0e55b6c1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 23 Nov 2020 02:01:40 +0000 Subject: [PATCH] Fix issue where active replication doesn't replicate RDB data Former-commit-id: 527b7eb0742567302e0343e3acbed9814c0cbb95 --- src/aof.cpp | 2 +- src/cluster.cpp | 42 ++++++ src/db.cpp | 14 +- src/rdb.cpp | 6 +- src/replication.cpp | 36 +++++ src/server.cpp | 6 + src/server.h | 6 +- .../replication-multimaster-connect.tcl | 124 ++++++++++++++++++ tests/test_helper.tcl | 1 + 9 files changed, 228 insertions(+), 9 deletions(-) create mode 100644 tests/integration/replication-multimaster-connect.tcl diff --git a/src/aof.cpp b/src/aof.cpp index 476789a65..34feea237 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -642,7 +642,7 @@ sds catAppendOnlyExpireMemberAtCommand(sds buf, struct redisCommand *cmd, robj * when += mstime(); robj *argvNew[4]; - argvNew[0] = createStringObject("PEXPIREMEMBERAT",15); + argvNew[0] = shared.pexpirememberat; argvNew[1] = argv[1]; argvNew[2] = argv[2]; argvNew[3] = createStringObjectFromLongLong(when); diff --git a/src/cluster.cpp b/src/cluster.cpp index a3074b564..b146b1011 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -5019,6 +5019,48 @@ void dumpCommand(client *c) { return; } +/* KEYDB.MVCCRESTORE key mvcc expire serialized-value */ +void mvccrestoreCommand(client *c) { + long long mvcc, expire; + robj *key = c->argv[1], *obj = nullptr; + int type; + + if (getLongLongFromObjectOrReply(c, c->argv[2], &mvcc, "Invalid MVCC Tstamp") != C_OK) + return; + + if (getLongLongFromObjectOrReply(c, c->argv[3], &expire, "Invalid expire") != C_OK) + return; + + /* Verify RDB version and data checksum unles the client is already a replica or master */ + if (!(c->flags & (CLIENT_SLAVE | CLIENT_MASTER))) { + if (verifyDumpPayload((unsigned char*)ptrFromObj(c->argv[4]),sdslen(szFromObj(c->argv[4]))) == C_ERR) + { + addReplyError(c,"DUMP payload version or checksum are wrong"); + return; + } + } + + rio payload; + rioInitWithBuffer(&payload,szFromObj(c->argv[4])); + if (((type = rdbLoadObjectType(&payload)) == -1) || + ((obj = rdbLoadObject(type,&payload,szFromObj(key), OBJ_MVCC_INVALID)) == NULL)) + { + addReplyError(c,"Bad data format"); + return; + } + setMvccTstamp(obj, mvcc); + + /* Create the key and set the TTL if any */ + dbMerge(c->db,key,obj,true); + if (expire >= 0) { + setExpire(c,c->db,key,nullptr,expire); + } + signalModifiedKey(c,c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); + addReply(c,shared.ok); + g_pserver->dirty++; +} + /* RESTORE key ttl serialized-value [REPLACE] */ void restoreCommand(client *c) { long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1; diff --git a/src/db.cpp b/src/db.cpp index 2edad7341..97bcc86df 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -202,13 +202,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { return o; } -int dbAddCore(redisDb *db, robj *key, robj *val) { +int dbAddCore(redisDb *db, robj *key, robj *val, bool fUpdateMvcc) { serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); int retval = dictAdd(db->pdict, copy, val); uint64_t mvcc = getMvccTstamp(); - setMvccTstamp(key, mvcc); - setMvccTstamp(val, mvcc); + if (fUpdateMvcc) { + setMvccTstamp(key, mvcc); + setMvccTstamp(val, mvcc); + } if (retval == DICT_OK) { @@ -232,7 +234,7 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { * The program is aborted if the key already exists. */ void dbAdd(redisDb *db, robj *key, robj *val) { - int retval = dbAddCore(db, key, val); + int retval = dbAddCore(db, key, val, true /* fUpdateMvcc */); serverAssertWithInfo(NULL,key,retval == DICT_OK); } @@ -290,7 +292,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) { dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); if (de == nullptr) - return (dbAddCore(db, key, val) == DICT_OK); + return (dbAddCore(db, key, val, false /* fUpdateMvcc */) == DICT_OK); robj *old = (robj*)dictGetVal(de); if (mvccFromObj(old) <= mvccFromObj(val)) @@ -303,7 +305,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) } else { - return (dbAddCore(db, key, val) == DICT_OK); + return (dbAddCore(db, key, val, true /* fUpdateMvcc */) == DICT_OK); } } diff --git a/src/rdb.cpp b/src/rdb.cpp index 1cfd6a423..367d8ad13 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2353,7 +2353,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { else { redisObjectStack keyobj; initStaticStringObject(keyobj,key); - setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); + long long expireT = strtoll(szFromObj(auxval), nullptr, 10); + setExpire(NULL, db, &keyobj, subexpireKey, expireT); + replicateSubkeyExpire(db, &keyobj, subexpireKey, expireT); decrRefCount(subexpireKey); subexpireKey = nullptr; } @@ -2475,6 +2477,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { /* call key space notification on key loaded for modules only */ moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id); + + replicationNotifyLoadedKey(db, &keyobj, val, expiretime); } else { diff --git a/src/replication.cpp b/src/replication.cpp index f8db2f552..46ade807d 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4367,3 +4367,39 @@ static void propagateMasterStaleKeys() decrRefCount(rgobj[0]); } + +void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire) { + if (!g_pserver->fActiveReplica || listLength(g_pserver->slaves) == 0) + return; + + // Send a digest over to the replicas + rio r; + + createDumpPayload(&r, val, key.unsafe_robjcast()); + + redisObjectStack objPayload; + initStaticStringObject(objPayload, r.io.buffer.ptr); + redisObjectStack objTtl; + initStaticStringObject(objTtl, sdscatprintf(sdsempty(), "%lld", expire)); + redisObjectStack objMvcc; + initStaticStringObject(objMvcc, sdscatprintf(sdsempty(), "%lu", mvccFromObj(val))); + redisObject *argv[5] = {shared.mvccrestore, key.unsafe_robjcast(), &objMvcc, &objTtl, &objPayload}; + + replicationFeedSlaves(g_pserver->slaves, db - g_pserver->db, argv, 5); + + sdsfree(szFromObj(&objTtl)); + sdsfree(szFromObj(&objMvcc)); + sdsfree(r.io.buffer.ptr); +} + +void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire) { + if (!g_pserver->fActiveReplica || listLength(g_pserver->slaves) == 0) + return; + + redisObjectStack objTtl; + initStaticStringObject(objTtl, sdscatprintf(sdsempty(), "%lld", expire)); + redisObject *argv[4] = {shared.pexpirememberat, key.unsafe_robjcast(), subkey.unsafe_robjcast(), &objTtl}; + replicationFeedSlaves(g_pserver->slaves, db - g_pserver->db, argv, 4); + + sdsfree(szFromObj(&objTtl)); +} \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 9d7dc2ef3..277c9e39a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1070,6 +1070,10 @@ struct redisCommand redisCommandTable[] = { {"keydb.nhset",nhsetCommand,-3, "read-only fast @hash", 0,NULL,1,1,1,0,0,0}, + + {"KEYDB.MVCCRESTORE",mvccrestoreCommand, 5, + "write use-memory @keyspace @dangerous", + 0,NULL,1,1,1,0,0,0}, }; /*============================ Utility functions ============================ */ @@ -2438,6 +2442,8 @@ void createSharedObjects(void) { shared.hdel = makeObjectShared(createStringObject("HDEL", 4)); shared.zrem = makeObjectShared(createStringObject("ZREM", 4)); shared.srem = makeObjectShared(createStringObject("SREM", 4)); + shared.mvccrestore = makeObjectShared(createStringObject("KEYDB.MVCCRESTORE", 17)); + shared.pexpirememberat = makeObjectShared(createStringObject("PEXPIREMEMBERAT",15)); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); diff --git a/src/server.h b/src/server.h index d8285e5f4..0fc874b6a 100644 --- a/src/server.h +++ b/src/server.h @@ -1176,7 +1176,7 @@ struct sharedObjectsStruct { *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, - *multi, *exec, *srem, *hdel, *zrem, + *multi, *exec, *srem, *hdel, *zrem, *mvccrestore, *pexpirememberat, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -2217,6 +2217,8 @@ void updateMasterAuth(); void showLatestBacklog(); void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); void rdbPipeWriteHandlerConnRemoved(struct connection *conn); +void replicationNotifyLoadedKey(redisDb *db, robj_roptr key, robj_roptr val, long long expire); +void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long long expire); /* Generic persistence functions */ void startLoadingFile(FILE* fp, const char * filename, int rdbflags); @@ -2547,6 +2549,7 @@ void clusterPropagatePublish(robj *channel, robj *message); void migrateCloseTimedoutSockets(void); void clusterBeforeSleep(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len); +void createDumpPayload(rio *payload, robj_roptr o, robj *key); /* Sentinel */ void initSentinelConfig(void); @@ -2764,6 +2767,7 @@ void watchCommand(client *c); void unwatchCommand(client *c); void clusterCommand(client *c); void restoreCommand(client *c); +void mvccrestoreCommand(client *c); void migrateCommand(client *c); void askingCommand(client *c); void readonlyCommand(client *c); diff --git a/tests/integration/replication-multimaster-connect.tcl b/tests/integration/replication-multimaster-connect.tcl new file mode 100644 index 000000000..229d9fe96 --- /dev/null +++ b/tests/integration/replication-multimaster-connect.tcl @@ -0,0 +1,124 @@ +start_server {tags {"multi-master"} overrides {active-replica yes multi-master yes}} { +start_server {overrides {active-replica yes multi-master yes}} { +start_server {overrides {active-replica yes multi-master yes}} { +start_server {overrides {active-replica yes multi-master yes}} { + for {set j 0} {$j < 4} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + } + + set keysPerServer 100 + + # Initialize Dataset + for {set j 1} {$j < 4} {incr j} { + for {set key 0} { $key < $keysPerServer} { incr key } { + $R($j) set "key_$j\_$key" asdjaoijioasdjiod ex 100000 + } + set hash($j) [$R($j) debug digest] + } + + $R(1) replicaof $R_host(0) $R_port(0) + $R(2) replicaof $R_host(0) $R_port(0) + $R(3) replicaof $R_host(0) $R_port(0) + + test "all nodes up" { + for {set j 1} {$j < 4} {incr j} { + wait_for_condition 50 100 { + [string match {*master_global_link_status:up*} [$R($j) info replication]] + } else { + fail "Multimaster group didn't connect up in a reasonable period of time" + } + } + } + + test "nodes retain their data" { + for {set j 1} { $j < 4 } { incr j } { + assert_equal [$R($j) debug digest] $hash($j) $j + } + } + + # Set all servers with an overlapping key - the last one should win + $R(0) set isvalid no + $R(1) set isvalid no + $R(2) set isvalid no + # Note: Sleep is due to mvcc slip + after 2 + $R(3) set isvalid yes + + for {set n 1} {$n < 4} {incr n} { + test "Node $n reciprocal rep works" { + $R(0) replicaof $R_host($n) $R_port($n) + after 2000 + for {set key 0} { $key < $keysPerServer } { incr key } { + assert_equal [$R(0) get "key_$n\_$key"] asdjaoijioasdjiod $key + } + } + } + + test "All data transferred between nodes" { + for {set server 0} {$server < 4} {incr server} { + set hash($j) [$R($server) debug digest] + for {set n 1} {$n < 4} {incr n} { + for {set key 0} {$key < $keysPerServer} {incr key} { + assert_equal [$R($server) get "key_$n\_$key"] asdjaoijioasdjiod "server: $n key: $key" + } + } + } + } + + test "MVCC Updates Correctly" { + assert_equal [$R(0) get isvalid] yes + assert_equal [$R(1) get isvalid] yes + assert_equal [$R(2) get isvalid] yes + assert_equal [$R(3) get isvalid] yes + } + + unset hash + test "All servers same debug digest" { + set hash [$R(0) debug digest] + for {set j 1} {$j < 4} {incr j} { + assert_equal $hash [$R($j) debug digest] $j + } + } +}}}} + +# The tests below validate features replicated via RDB +start_server {tags {"multi-master"} overrides {active-replica yes multi-master yes}} { +start_server {overrides {active-replica yes multi-master yes}} { +start_server {overrides {active-replica yes multi-master yes}} { + for {set j 0} {$j < 3} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + } + + # Set replicated features here + $R(0) sadd testhash subkey + $R(0) expiremember testhash subkey 10000 + + + test "node 2 up" { + $R(2) replicaof $R_host(1) $R_port(1) + wait_for_condition 50 100 { + [string match {*master_global_link_status:up*} [$R(2) info replication]] + } else { + fail "didn't connect up in a reasonable period of time" + } + } + + # While node 1 loads from 0, it will relay to 2 + test "node 1 up" { + $R(1) replicaof $R_host(0) $R_port(0) + wait_for_condition 50 100 { + [string match {*master_global_link_status:up*} [$R(1) info replication]] + } else { + fail "didn't connect up in a reasonable period of time" + } + } + + #Tests that validate replication made it to node 2 + test "subkey expire replicates via RDB" { + assert [expr [$R(2) ttl testhash subkey] > 0] + } +}}} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index b70d89968..6ee5572e1 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -48,6 +48,7 @@ set ::all_tests { integration/replication-psync integration/replication-active integration/replication-multimaster + integration/replication-multimaster-connect integration/aof integration/rdb integration/convert-zipmap-hash-on-load