From 2684a266c875961cd60407a33be18a7973a2abea Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 4 Apr 2020 21:52:27 -0400 Subject: [PATCH 1/7] Fix subkey expires not replicating correctly, and AOF issues Former-commit-id: bd183cdee13081a02efef5df75edf2292b872a16 --- src/aof.cpp | 89 +++++++++++--- src/cron.cpp | 12 ++ src/db.cpp | 43 +++++++ src/debug.cpp | 5 + src/expire.cpp | 30 +++-- src/replication.cpp | 24 +++- src/server.cpp | 9 ++ src/server.h | 8 +- tests/integration/aof.tcl | 22 ++++ tests/integration/replication-4.tcl | 88 ++++++++++++++ tests/integration/replication-active.tcl | 145 ++++++++++++++++++----- 11 files changed, 417 insertions(+), 58 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index de8a8260e..4a7eb26ed 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -597,21 +597,59 @@ sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, r return buf; } -void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { - sds buf = sdsempty(); - robj *tmpargv[3]; - - /* The DB this command was targeting is not the same as the last command - * we appended. To issue a SELECT command is needed. */ - if (dictid != g_pserver->aof_selected_db) { - char seldb[64]; - - snprintf(seldb,sizeof(seldb),"%d",dictid); - buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", - (unsigned long)strlen(seldb),seldb); - g_pserver->aof_selected_db = dictid; +sds catAppendOnlyExpireMemberAtCommand(sds buf, struct redisCommand *cmd, robj **argv, const size_t argc) { + long long when = 0; + int unit = UNIT_SECONDS; + bool fAbsolute = false; + + if (cmd->proc == expireMemberCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid EXPIREMEMBER command"); + + if (argc == 5) { + unit = parseUnitString(szFromObj(argv[4])); + } + } else if (cmd->proc == expireMemberAtCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid EXPIREMEMBERAT command"); + fAbsolute = true; + } else if (cmd->proc == pexpireMemberAtCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid PEXPIREMEMBERAT command"); + fAbsolute = true; + unit = UNIT_MILLISECONDS; + } else { + serverPanic("Unknown expiremember command"); } + switch (unit) + { + case UNIT_SECONDS: + when *= 1000; + break; + + case UNIT_MILLISECONDS: + break; + } + + if (!fAbsolute) + when += mstime(); + + robj *argvNew[4]; + argvNew[0] = createStringObject("PEXPIREMEMBERAT",15); + argvNew[1] = argv[1]; + argvNew[2] = argv[2]; + argvNew[3] = createStringObjectFromLongLong(when); + buf = catAppendOnlyGenericCommand(buf, 4, argvNew); + decrRefCount(argvNew[0]); + decrRefCount(argvNew[3]); + return buf; +} + +sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc) +{ + robj *tmpargv[3]; + if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ @@ -640,6 +678,10 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a if (pxarg) buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1], pxarg); + } else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand || + cmd->proc == pexpireMemberAtCommand) { + /* Translate subkey expire commands to PEXPIREMEMBERAT */ + buf = catAppendOnlyExpireMemberAtCommand(buf, cmd, argv, argc); } else { /* All the other commands don't need translation or need the * same translation already operated in the command vector @@ -647,6 +689,25 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a buf = catAppendOnlyGenericCommand(buf,argc,argv); } + return buf; +} + +void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { + sds buf = sdsempty(); + + /* The DB this command was targeting is not the same as the last command + * we appended. To issue a SELECT command is needed. */ + if (dictid != g_pserver->aof_selected_db) { + char seldb[64]; + + snprintf(seldb,sizeof(seldb),"%d",dictid); + buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", + (unsigned long)strlen(seldb),seldb); + g_pserver->aof_selected_db = dictid; + } + + buf = catCommandForAofAndActiveReplication(buf, cmd, argv, argc); + /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ @@ -1378,7 +1439,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { } else { - char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n"; + char cmd[]="*4\r\n$12\r\nPEXPIREMEMBERAT\r\n"; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(aof,&key) == 0) goto werr; if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr; diff --git a/src/cron.cpp b/src/cron.cpp index 230cf4ed4..9ef8bbfb4 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -70,6 +70,7 @@ void cronCommand(client *c) decrRefCount(o); // use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it. setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval); + ++g_pserver->dirty; addReply(c, shared.ok); } @@ -86,6 +87,7 @@ void executeCronJobExpireHook(const char *key, robj *o) serverAssert(cFake->argc == 0); // Setup the args for the EVAL command + cFake->cmd = lookupCommandByCString("EVAL"); cFake->argc = 3 + job->veckeys.size() + job->vecargs.size(); cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL); cFake->argv[0] = createStringObject("EVAL", 4); @@ -96,7 +98,17 @@ void executeCronJobExpireHook(const char *key, robj *o) for (size_t i = 0; i < job->vecargs.size(); ++i) cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size()); + int lua_replicate_backup = g_pserver->lua_always_replicate_commands; + g_pserver->lua_always_replicate_commands = 0; evalCommand(cFake); + g_pserver->lua_always_replicate_commands = lua_replicate_backup; + + if (g_pserver->aof_state != AOF_OFF) + feedAppendOnlyFile(cFake->cmd,cFake->db->id,cFake->argv,cFake->argc); + // Active replicas do their own expiries, do not propogate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,cFake->db->id,cFake->argv,cFake->argc); + resetClient(cFake); robj *keyobj = createStringObject(key,sdslen(key)); diff --git a/src/db.cpp b/src/db.cpp index 562485941..5ecd3253c 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1372,6 +1372,7 @@ expireEntry *getExpire(redisDb *db, robj_roptr key) { * keys. */ void propagateExpire(redisDb *db, robj *key, int lazy) { serverAssert(GlobalLocksAcquired()); + robj *argv[2]; argv[0] = lazy ? shared.unlink : shared.del; @@ -1389,6 +1390,48 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { decrRefCount(argv[1]); } +void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey) +{ + robj *argv[3]; + robj objT; + redisCommand *cmd = nullptr; + switch (type) + { + case OBJ_SET: + argv[0] = shared.srem; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.sremCommand; + break; + + case OBJ_HASH: + argv[0] = shared.hdel; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.hdelCommand; + break; + + case OBJ_ZSET: + argv[0] = shared.zrem; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.zremCommand; + break; + + case OBJ_CRON: + return; // CRON jobs replicate in their own handler + + default: + serverPanic("Unknown subkey type"); + } + + if (g_pserver->aof_state != AOF_OFF) + feedAppendOnlyFile(cmd,db->id,argv,3); + // Active replicas do their own expiries, do not propogate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,db->id,argv,3); +} + /* Check if the key is expired. Note, this does not check subexpires */ int keyIsExpired(redisDb *db, robj *key) { expireEntry *pexpire = getExpire(db,key); diff --git a/src/debug.cpp b/src/debug.cpp index 1916dc79b..ca4da2d0a 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -31,6 +31,7 @@ #include "server.h" #include "sha1.h" /* SHA1 is used for DEBUG DIGEST */ #include "crc64.h" +#include "cron.h" #include #include @@ -251,6 +252,10 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj mt->digest(&md,mv->value); xorDigest(digest,md.x,sizeof(md.x)); } + } else if (o->type == OBJ_CRON) { + cronjob *job = (cronjob*)ptrFromObj(o); + mixDigest(digest, &job->interval, sizeof(job->interval)); + mixDigest(digest, job->script.get(), job->script.size()); } else { serverPanic("Unknown object type"); } diff --git a/src/expire.cpp b/src/expire.cpp index 1ac6ab415..ae9c75e8a 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -78,6 +78,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { dictEntry *de = dictFind(db->pdict, e.key()); robj *val = (robj*)dictGetVal(de); int deleted = 0; + + robj objKey; + initStaticStringObject(objKey, (char*)e.key()); + while (!pfat->FEmpty()) { if (pfat->nextExpireEntry().when > now) @@ -130,6 +134,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { default: serverAssert(false); } + + robj objSubkey; + initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get()); + propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); + pfat->popfrontExpireEntry(); } @@ -144,22 +153,18 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { db->setexpire->insert(eT); } - robj objT; switch (val->type) { case OBJ_SET: - initStaticStringObject(objT, (char*)e.key()); - signalModifiedKey(db,&objT); - notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id); + signalModifiedKey(db,&objKey); + notifyKeyspaceEvent(NOTIFY_SET,"srem",&objKey,db->id); break; } } if (pfat->FEmpty()) { - robj *keyobj = createStringObject(e.key(),sdslen(e.key())); - removeExpire(db, keyobj); - decrRefCount(keyobj); + removeExpire(db, &objKey); } } @@ -224,7 +229,8 @@ void expireMemberCore(client *c, robj *key, robj *subkey, long long basetime, lo } setExpire(c, c->db, key, subkey, when); - + signalModifiedKey(c->db, key); + g_pserver->dirty++; addReply(c, shared.cone); } @@ -256,6 +262,14 @@ void expireMemberAtCommand(client *c) expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_SECONDS); } +void pexpireMemberAtCommand(client *c) +{ + long long when; + if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK) + return; + + expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_MILLISECONDS); +} /* Try to expire a few timed out keys. The algorithm used is adaptive and * will use few CPU cycles if there are few expiring keys, otherwise diff --git a/src/replication.cpp b/src/replication.cpp index dcd6a915b..a0a69da26 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -251,6 +251,8 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } +sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc); + void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bool fSendRaw) { char llstr[LONG_STR_SIZE]; @@ -289,13 +291,23 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ - /* Add the multi bulk length. */ - addReplyArrayLenAsync(replica,argc); + if (fSendRaw) + { + /* Add the multi bulk length. */ + addReplyArrayLenAsync(replica,argc); - /* Finally any additional argument that was not stored inside the - * static buffer if any (from j to argc). */ - for (int j = 0; j < argc; j++) - addReplyBulkAsync(replica,argv[j]); + /* Finally any additional argument that was not stored inside the + * static buffer if any (from j to argc). */ + for (int j = 0; j < argc; j++) + addReplyBulkAsync(replica,argv[j]); + } + else + { + struct redisCommand *cmd = lookupCommand(szFromObj(argv[0])); + sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc); + addReplyProtoAsync(replica, buf, sdslen(buf)); + sdsfree(buf); + } } /* Propagate write commands to slaves, and populate the replication backlog diff --git a/src/server.cpp b/src/server.cpp index 88f0b539b..4efe103ab 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -634,6 +634,10 @@ struct redisCommand redisCommandTable[] = { {"expirememberat", expireMemberAtCommand, 4, "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, + + {"pexpirememberat", pexpireMemberAtCommand, 4, + "write fast @keyspace", + 0,NULL,1,1,1,0,0,0}, {"pexpire",pexpireCommand,3, "write fast @keyspace", @@ -2289,6 +2293,9 @@ void createSharedObjects(void) { shared.rpoplpush = makeObjectShared(createStringObject("RPOPLPUSH",9)); shared.zpopmin = makeObjectShared(createStringObject("ZPOPMIN",7)); shared.zpopmax = makeObjectShared(createStringObject("ZPOPMAX",7)); + shared.hdel = makeObjectShared(createStringObject("HDEL", 4)); + shared.zrem = makeObjectShared(createStringObject("ZREM", 4)); + shared.srem = makeObjectShared(createStringObject("SREM", 4)); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -2514,6 +2521,8 @@ void initServerConfig(void) { cserver.xclaimCommand = lookupCommandByCString("xclaim"); cserver.xgroupCommand = lookupCommandByCString("xgroup"); cserver.rreplayCommand = lookupCommandByCString("rreplay"); + cserver.hdelCommand = lookupCommandByCString("hdel"); + cserver.zremCommand = lookupCommandByCString("zrem"); /* Slow log */ g_pserver->slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; diff --git a/src/server.h b/src/server.h index 218982bda..307321bab 100644 --- a/src/server.h +++ b/src/server.h @@ -1360,7 +1360,7 @@ struct sharedObjectsStruct { *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, - *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, + *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, *srem, *hdel, *zrem, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -1581,7 +1581,7 @@ struct redisServerConst { *lpopCommand, *rpopCommand, *zpopminCommand, *zpopmaxCommand, *sremCommand, *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand, - *xgroupCommand, *rreplayCommand; + *xgroupCommand, *rreplayCommand, *hdelCommand, *zremCommand; /* Configuration */ char *default_masteruser; /* AUTH with this user and masterauth with master */ @@ -2536,6 +2536,7 @@ int removeExpire(redisDb *db, robj *key); int removeExpireCore(redisDb *db, robj *key, dictEntry *de); int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey); void propagateExpire(redisDb *db, robj *key, int lazy); +void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey); int expireIfNeeded(redisDb *db, robj *key); expireEntry *getExpire(redisDb *db, robj_roptr key); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); @@ -2659,6 +2660,8 @@ extern "C" char *redisGitSHA1(void); extern "C" char *redisGitDirty(void); extern "C" uint64_t redisBuildId(void); +int parseUnitString(const char *sz); + /* Commands prototypes */ void authCommand(client *c); void pingCommand(client *c); @@ -2736,6 +2739,7 @@ void expireCommand(client *c); void expireatCommand(client *c); void expireMemberCommand(client *c); void expireMemberAtCommand(client *c); +void pexpireMemberAtCommand(client *c); void pexpireCommand(client *c); void pexpireatCommand(client *c); void getsetCommand(client *c); diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 277b0d3df..0a1a21fdd 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -257,4 +257,26 @@ tags {"aof"} { r expire x -1 } } + + ## Test that PEXPIREMEMBERAT is loaded correctly + create_aof { + append_to_aof [formatCommand sadd testkey a b c d] + append_to_aof [formatCommand pexpirememberat testkey a 1000] + } + + start_server_aof [list dir $server_path aof-load-truncated no] { + test "AOF+EXPIREMEMBER: Server shuold have been started" { + assert_equal 1 [is_alive $srv] + } + + test "AOF+PEXPIREMEMBERAT: set should have 3 values" { + set client [redis [dict get $srv host] [dict get $srv port]] + wait_for_condition 50 100 { + [catch {$client ping} e] == 0 + } else { + fail "Loading DB is taking too much time." + } + assert_equal 3 [$client scard testkey] + } + } } diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl index 3c6df52a8..4065de786 100644 --- a/tests/integration/replication-4.tcl +++ b/tests/integration/replication-4.tcl @@ -151,5 +151,93 @@ start_server {tags {"repl"}} { fail "SPOP replication inconsistency" } } + + test {Replication of EXPIREMEMBER (set) command} { + $master sadd testkey a b c d + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master scard testkey] eq 3 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave scard testkey] eq 3 + } else { + assert_equal [$slave scard testkey] 3 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (hash) command} { + $master hset testkey a value + $master hset testkey b value + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master hlen testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave hlen testkey] eq 1 + } else { + assert_equal [$slave hlen testkey] 1 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (zset) command} { + $master zadd testkey 1 a + $master zadd testkey 2 b + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master zcard testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave zcard testkey] eq 1 + } else { + assert_equal [$slave zcard testkey] 1 + } + } + + test {keydb.cron replicates} { + $master del testkey + $master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [$master get testkey] + assert_equal 1 [$master exists testjob] + + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "KEYDB.CRON failed to replicate" + } + $master del testjob + $master del testkey + wait_for_condition 50 1000 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "cron delete failed to propogate" + } + } } } diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 0c28eb85d..b1d1c5217 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -77,6 +77,94 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { $master flushall } + test {Replication of EXPIREMEMBER (set) command (Active)} { + $master sadd testkey a b c d + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master scard testkey] eq 3 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave scard testkey] eq 3 + } else { + assert_equal [$slave scard testkey] 3 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (hash) command (Active)} { + $master hset testkey a value + $master hset testkey b value + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master hlen testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave hlen testkey] eq 1 + } else { + assert_equal [$slave hlen testkey] 1 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (zset) command (Active)} { + $master zadd testkey 1 a + $master zadd testkey 2 b + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master zcard testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave zcard testkey] eq 1 + } else { + assert_equal [$slave zcard testkey] 1 + } + } + + test {keydb.cron replicates (Active) } { + $master del testkey + $master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [$master get testkey] + assert_equal 1 [$master exists testjob] + + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "KEYDB.CRON failed to replicate" + } + $master del testjob + $master del testkey + wait_for_condition 50 1000 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "cron delete failed to propogate" + } + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo @@ -113,37 +201,38 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { assert_equal {0} [$slave del testkey1] } - test {Active replica expire propogates when source is down} { - $slave flushall - $slave set testkey2 foo - $slave set testkey1 foo - wait_for_condition 50 1000 { - [string match *foo* [$master get testkey1]] - } else { - fail "Replication failed to propogate" - } - $slave expire testkey1 2 - assert_equal {1} [$slave wait 1 500] { "value should propogate - within 0.5 seconds" } - exec kill -SIGSTOP $slave_pid + test {Active replica expire propogates when source is down} { + $slave flushall + $slave set testkey2 foo + $slave set testkey1 foo + wait_for_condition 50 1000 { + [string match *foo* [$master get testkey1]] + } else { + fail "Replication failed to propogate" + } + $slave expire testkey1 2 + assert_equal {1} [$slave wait 1 500] { "value should propogate + within 0.5 seconds" } + exec kill -SIGSTOP $slave_pid after 3000 - # Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us - # about what is actually in the dict. The only way to know is with a count from info + # Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us + # about what is actually in the dict. The only way to know is with a count from info assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"} - } - exec kill -SIGCONT $slave_pid + } - test {Active replica different databases} { - $master select 3 - $master set testkey abcd - $master select 2 - $master del testkey - $slave select 3 - wait_for_condition 50 1000 { - [string match abcd [$slave get testkey]] - } else { - fail "Replication failed to propogate DB 3" - } + exec kill -SIGCONT $slave_pid + + test {Active replica different databases} { + $master select 3 + $master set testkey abcd + $master select 2 + $master del testkey + $slave select 3 + wait_for_condition 50 1000 { + [string match abcd [$slave get testkey]] + } else { + fail "Replication failed to propogate DB 3" } } } +} From bf93e32e7e76dce4cae155adf99538eb15479340 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 4 Apr 2020 22:32:15 -0400 Subject: [PATCH 2/7] Role command protocol corruption with multiple masters Former-commit-id: 888d69a87a0076caa5b381d2531a6a638aa69051 --- src/replication.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/replication.cpp b/src/replication.cpp index a0a69da26..4c5e4a26c 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2594,6 +2594,8 @@ void roleCommand(client *c) { listNode *ln; listRewind(g_pserver->masters, &li); + if (listLength(g_pserver->masters) > 1) + addReplyArrayLen(c,listLength(g_pserver->masters)); while ((ln = listNext(&li))) { redisMaster *mi = (redisMaster*)listNodeValue(ln); From a6444c8ce980c27176d37900359b79b12d9511e6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 4 Apr 2020 22:45:12 -0400 Subject: [PATCH 3/7] Fix issue #164 Former-commit-id: f112c77fcc3a60277ce344478bc37adb0fe4a99d --- src/evict.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/evict.cpp b/src/evict.cpp index 04fd7a7d6..f21ee0ea7 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -462,7 +462,7 @@ int freeMemoryIfNeeded(void) { serverAssert(GlobalLocksAcquired()); /* By default replicas should ignore maxmemory * and just be masters exact copies. */ - if (listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory) return C_OK; + if (listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory && !g_pserver->fActiveReplica) return C_OK; size_t mem_reported, mem_tofree, mem_freed; mstime_t latency, eviction_latency; From 9b2392107c8c9f5b419bd579029cab23df3fa926 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 4 Apr 2020 22:58:17 -0400 Subject: [PATCH 4/7] Add the ability to set a starting core # when setting thread affinity Former-commit-id: 9e2e2067c6df5919f1c6b8b9e6e3457c7edc0755 --- src/config.cpp | 10 ++++++++-- src/networking.cpp | 2 +- src/server.cpp | 5 +++-- src/server.h | 1 + 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/config.cpp b/src/config.cpp index 4035ca376..70448a916 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -782,8 +782,14 @@ void loadServerConfigFromString(char *config) { } else if (strcasecmp(argv[1], "false") == 0) { cserver.fThreadAffinity = FALSE; } else { - err = "Unknown argument: server-thread-affinity expects either true or false"; - goto loaderr; + int offset = atoi(argv[1]); + if (offset > 0) { + cserver.fThreadAffinity = TRUE; + cserver.threadAffinityOffset = offset-1; + } else { + err = "Unknown argument: server-thread-affinity expects either true or false"; + goto loaderr; + } } } else if (!strcasecmp(argv[0], "active-replica") && argc == 2) { g_pserver->fActiveReplica = yesnotoi(argv[1]); diff --git a/src/networking.cpp b/src/networking.cpp index 276095715..fb7d286c6 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1073,7 +1073,7 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { // Set thread affinity if (cserver.fThreadAffinity) { - int cpu = iel; + int cpu = iel + cserver.threadAffinityOffset; if (setsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, sizeof(iel)) != 0) { serverLog(LL_WARNING, "Failed to set socket affinity"); diff --git a/src/server.cpp b/src/server.cpp index 4efe103ab..0c4b3116c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2547,6 +2547,7 @@ void initServerConfig(void) { /* Multithreading */ cserver.cthreads = CONFIG_DEFAULT_THREADS; cserver.fThreadAffinity = CONFIG_DEFAULT_THREAD_AFFINITY; + cserver.threadAffinityOffset = 0; } extern char **environ; @@ -5350,10 +5351,10 @@ int main(int argc, char **argv) { #ifdef __linux__ cpu_set_t cpuset; CPU_ZERO(&cpuset); - CPU_SET(iel, &cpuset); + CPU_SET(iel + cserver.threadAffinityOffset, &cpuset); if (pthread_setaffinity_np(rgthread[iel], sizeof(cpu_set_t), &cpuset) == 0) { - serverLog(LOG_INFO, "Binding thread %d to cpu %d", iel, iel); + serverLog(LOG_INFO, "Binding thread %d to cpu %d", iel, iel + cserver.threadAffinityOffset + 1); } #else serverLog(LL_WARNING, "CPU pinning not available on this platform"); diff --git a/src/server.h b/src/server.h index 307321bab..5d5f4b3e0 100644 --- a/src/server.h +++ b/src/server.h @@ -1574,6 +1574,7 @@ struct redisServerConst { int cthreads; /* Number of main worker threads */ int fThreadAffinity; /* Should we pin threads to cores? */ + int threadAffinityOffset = 0; /* Where should we start pinning them? */ char *pidfile; /* PID file path */ /* Fast pointers to often looked up command */ From dfdc0cefdd153b9b48f9cc2a219594f2ecdf3717 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 13 Apr 2020 22:11:34 -0400 Subject: [PATCH 5/7] Optimize replicaFeedSlaves by removing use of snprintf Former-commit-id: 32561a99124542461de283d5035f869b5fc9bc2f --- src/replication.cpp | 46 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 4c5e4a26c..7a5fce456 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -310,6 +310,26 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo } } +static int writeProtoNum(char *dst, const size_t cchdst, long long num) +{ + if (cchdst < 1) + return 0; + dst[0] = '$'; + int cch = 1; + cch += ll2string(dst + cch, cchdst - cch, digits10(num)); + int chCpyT = std::min(cchdst - cch, 2); + memcpy(dst + cch, "\r\n", chCpyT); + cch += chCpyT; + cch += ll2string(dst + cch, cchdst-cch, num); + chCpyT = std::min(cchdst - cch, 3); + memcpy(dst + cch, "\r\n", chCpyT); + if (chCpyT == 3) + cch += 2; + else + cch += chCpyT; + return cch; +} + /* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication @@ -354,24 +374,28 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { serverAssert(argc > 0); serverAssert(cchbuf > 0); - char uuid[40] = {'\0'}; + char uuid[37]; uuid_unparse(cserver.uuid, uuid); - char proto[1024]; - int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); - cchProto = std::min((int)sizeof(proto), cchProto); + + + // The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); + // but that was much too slow + char proto[1024] = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; + int cchProto = strlen(proto); + memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want + cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf); + memcpy(proto + cchProto, "\r\n", 3); + cchProto += 2; + long long master_repl_offset_start = g_pserver->master_repl_offset; char szDbNum[128]; - int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid); - int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid); - cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that + int cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid); + char szMvcc[128]; incrementMvccTstamp(); - uint64_t mvccTstamp = getMvccTstamp(); - int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%" PRIu64, mvccTstamp); - int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%" PRIu64 "\r\n", cchMvccNum, mvccTstamp); - cchMvcc = std::min(cchMvcc, sizeof(szMvcc)); // tricky snprintf + int cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) From 4ed8f38a1338ba11ca0341ef7c0c7c2ae7aba359 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 13 Apr 2020 22:35:06 -0400 Subject: [PATCH 6/7] Don't do active replica work if we're not an active replica Former-commit-id: 63dd1fb599cfe959c0298825ed56ab06335b3fd7 --- src/replication.cpp | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 7a5fce456..8380862ca 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -380,22 +380,34 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { // The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); // but that was much too slow - char proto[1024] = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; - int cchProto = strlen(proto); - memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want - cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf); - memcpy(proto + cchProto, "\r\n", 3); - cchProto += 2; + static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; + char proto[1024]; + int cchProto = 0; + if (!fSendRaw) + { + cchProto = strlen(protoRREPLAY); + memcpy(proto, protoRREPLAY, strlen(protoRREPLAY)); + memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want + cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf); + memcpy(proto + cchProto, "\r\n", 3); + cchProto += 2; + } long long master_repl_offset_start = g_pserver->master_repl_offset; char szDbNum[128]; - int cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid); + int cchDbNum = 0; + if (!fSendRaw) + cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid); char szMvcc[128]; - incrementMvccTstamp(); - int cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); + int cchMvcc = 0; + incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication + if (!fSendRaw) + { + cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); + } /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) From 8fb3be9ce18336efe8b80d4e044a07a894058c59 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 13 Apr 2020 22:45:15 -0400 Subject: [PATCH 7/7] Cache fake client in replicaFeedSlaves Former-commit-id: 8e81e5f29e718395b32a60ff263808305d0b5818 --- src/replication.cpp | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 8380862ca..179fca5a4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -340,6 +340,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listIter li, liReply; int j, len; serverAssert(GlobalLocksAcquired()); + static client *fake = nullptr; + if (dictid < 0) dictid = 0; // this can happen if we send a PING before any real operation @@ -357,8 +359,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); - client *fake = createClient(-1, serverTL - g_pserver->rgthreadvar); - fake->flags |= CLIENT_FORCE_REPLY; + if (fake == nullptr) + { + fake = createClient(-1, serverTL - g_pserver->rgthreadvar); + fake->flags |= CLIENT_FORCE_REPLY; + } + bool fSendRaw = !g_pserver->fActiveReplica; replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below @@ -374,10 +380,6 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { serverAssert(argc > 0); serverAssert(cchbuf > 0); - char uuid[37]; - uuid_unparse(cserver.uuid, uuid); - - // The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); // but that was much too slow static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; @@ -385,8 +387,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { int cchProto = 0; if (!fSendRaw) { + char uuid[37]; + uuid_unparse(cserver.uuid, uuid); + cchProto = strlen(protoRREPLAY); - memcpy(proto, protoRREPLAY, strlen(protoRREPLAY)); + memcpy(proto, protoRREPLAY, strlen(protoRREPLAY)); memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf); memcpy(proto + cchProto, "\r\n", 3); @@ -405,9 +410,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { int cchMvcc = 0; incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication if (!fSendRaw) - { cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); - } /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) @@ -492,7 +495,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } } - freeClient(fake); + // Cleanup cached fake client output buffers + fake->bufpos = 0; + fake->sentlen = 0; + fake->reply_bytes = 0; + listEmpty(fake->reply); } /* This function is used in order to proxy what we receive from our master