From b9b8272724bed931fa86b72c2c1ecd6386be3a99 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 11 Jan 2020 16:34:09 -0500 Subject: [PATCH 01/28] Avoid crash due to excessive posted functions for AOF rewrite Former-commit-id: aa6409f2e8a37288eb4953fbcf3a82e02545348b --- src/aof.cpp | 12 ++++++++---- src/server.h | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index d48b664d2..b82be9a34 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -165,10 +165,14 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + } } /* Write the buffer (possibly composed of multiple blocks) into the specified diff --git a/src/server.h b/src/server.h index 5733264e0..6b8903092 100644 --- a/src/server.h +++ b/src/server.h @@ -1733,6 +1733,7 @@ struct redisServer { int aof_stop_sending_diff; /* If true stop sending accumulated diffs to child process. */ sds aof_child_diff; /* AOF diff accumulator child side. */ + int aof_rewrite_pending = 0; /* is a call to aofChildWriteDiffData already queued? 
*/ /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ From 1116b63a0e7d675f0a9b46a15eb12eda17d88011 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 21 Jan 2020 19:50:28 -0500 Subject: [PATCH 02/28] Initial implementation of the CRON command Former-commit-id: 3204a39ada15ec33ac7926dc8b8f0e1875b99acb --- src/Makefile | 2 +- src/expire.cpp | 5 +++++ src/object.cpp | 2 ++ src/rdb.cpp | 31 ++++++++++++++++++++++++++++ src/rdb.h | 5 ++++- src/sds.h | 14 +++++++++++++ src/server.cpp | 7 ++++++- src/server.h | 2 ++ tests/test_helper.tcl | 1 + tests/unit/cron.tcl | 47 +++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 tests/unit/cron.tcl diff --git a/src/Makefile b/src/Makefile index cd411f9cd..a0e48831c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -213,7 +213,7 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o 
aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark diff --git a/src/expire.cpp b/src/expire.cpp index b311a5673..66a05e4ef 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -31,6 +31,7 @@ */ #include "server.h" +#include "cron.h" void activeExpireCycleExpireFullKey(redisDb *db, const char *key) { robj *keyobj = createStringObject(key,sdslen(key)); @@ -121,6 +122,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { } break; + case OBJ_CRON: + executeCronJobExpireHook(e.key(), val); + return; + case OBJ_LIST: default: serverAssert(false); diff --git a/src/object.cpp b/src/object.cpp index 2a9c3f215..14abfcfbc 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -29,6 +29,7 @@ */ #include "server.h" +#include "cron.h" #include #include @@ -369,6 +370,7 @@ void decrRefCount(robj_roptr o) { case OBJ_HASH: freeHashObject(o); break; case OBJ_MODULE: freeModuleObject(o); break; case OBJ_STREAM: freeStreamObject(o); break; + case OBJ_CRON: freeCronObject(o); break; default: serverPanic("Unknown object type"); break; } zfree(o.unsafe_robjcast()); diff --git a/src/rdb.cpp b/src/rdb.cpp index b76afdfd7..48205f430 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -33,6 +33,7 @@ #include "endianconv.h" #include "stream.h" #include "storage.h" +#include "cron.h" #include #include @@ -523,6 +524,11 @@ 
void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { } } +sdsstring rdbLoadString(rio *rdb){ + sds str = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); + return sdsstring(str); +} + robj *rdbLoadStringObject(rio *rdb) { return (robj*)rdbGenericLoadStringObject(rdb,RDB_LOAD_NONE,NULL); } @@ -657,6 +663,8 @@ int rdbSaveObjectType(rio *rdb, robj_roptr o) { return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS); case OBJ_MODULE: return rdbSaveType(rdb,RDB_TYPE_MODULE_2); + case OBJ_CRON: + return rdbSaveType(rdb,RDB_TYPE_CRON); default: serverPanic("Unknown object type"); } @@ -986,6 +994,17 @@ ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key) { zfree(io.ctx); } return io.error ? -1 : (ssize_t)io.bytes; + } else if (o->type == OBJ_CRON) { + cronjob *job = (cronjob*)ptrFromObj(o); + rdbSaveRawString(rdb, (const unsigned char*)job->script.get(), job->script.size()); + rdbSaveMillisecondTime(rdb, job->startTime); + rdbSaveMillisecondTime(rdb, job->interval); + rdbSaveLen(rdb, job->veckeys.size()); + for (auto &key : job->veckeys) + rdbSaveRawString(rdb, (const unsigned char*)key.get(), key.size()); + rdbSaveLen(rdb, job->vecargs.size()); + for (auto &arg : job->vecargs) + rdbSaveRawString(rdb, (const unsigned char*)arg.get(), arg.size()); } else { serverPanic("Unknown object type"); } @@ -1848,6 +1867,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { exit(1); } o = createModuleObject(mt,ptr); + } else if (rdbtype == RDB_TYPE_CRON) { + std::unique_ptr spjob = std::make_unique(); + spjob->script = rdbLoadString(rdb); + spjob->startTime = rdbLoadMillisecondTime(rdb,RDB_VERSION); + spjob->interval = rdbLoadMillisecondTime(rdb,RDB_VERSION); + auto ckeys = rdbLoadLen(rdb,NULL); + for (uint64_t i = 0; i < ckeys; ++i) + spjob->veckeys.push_back(rdbLoadString(rdb)); + auto cargs = rdbLoadLen(rdb,NULL); + for (uint64_t i = 0; i < cargs; ++i) + spjob->vecargs.push_back(rdbLoadString(rdb)); + o = createObject(OBJ_CRON, 
spjob.release()); } else { rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); } diff --git a/src/rdb.h b/src/rdb.h index edf43d422..b7fc0b71d 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -91,10 +91,13 @@ #define RDB_TYPE_HASH_ZIPLIST 13 #define RDB_TYPE_LIST_QUICKLIST 14 #define RDB_TYPE_STREAM_LISTPACKS 15 + +/* KeyDB Specific Object Types */ +#define RDB_TYPE_CRON 64 /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ /* Test if a type is an object type. */ -#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15)) +#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 15) || (t == RDB_TYPE_CRON)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */ diff --git a/src/sds.h b/src/sds.h index 23a11afa4..3315ed9b1 100644 --- a/src/sds.h +++ b/src/sds.h @@ -396,6 +396,20 @@ public: other.m_str = nullptr; } + sdsstring &operator=(const sdsstring &other) + { + sdsfree(m_str); + m_str = sdsdup(other.m_str); + return *this; + } + + sdsstring &operator=(sds other) + { + sdsfree(m_str); + m_str = sdsdup(other); + return *this; + } + ~sdsstring() { sdsfree(m_str); diff --git a/src/server.cpp b/src/server.cpp index 12dff6a85..c6f360905 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -35,6 +35,7 @@ #include "latency.h" #include "atomicvar.h" #include "storage.h" +#include "cron.h" #include #include #include @@ -1023,7 +1024,11 @@ struct redisCommand redisCommandTable[] = { {"rreplay",replicaReplayCommand,-3, "read-only fast noprop", - 0,NULL,0,0,0,0,0,0} + 0,NULL,0,0,0,0,0,0}, + + {"cron",cronCommand,-5, + "write use-memory", + 0,NULL,1,1,1,0,0,0}, }; /*============================ Utility functions ============================ */ diff --git a/src/server.h b/src/server.h index 6b8903092..781f043f0 100644 --- a/src/server.h +++ b/src/server.h @@ -683,6 +683,8 @@ public: * encoding version. */ #define OBJ_MODULE 5 /* Module object. 
*/ #define OBJ_STREAM 6 /* Stream object. */ +#define OBJ_CRON 7 /* CRON job */ + /* Extract encver / signature from a module type ID. */ #define REDISMODULE_TYPE_ENCVER_BITS 10 diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index a06afca3e..ec1d7cd31 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -36,6 +36,7 @@ set ::all_tests { unit/aofrw unit/acl unit/rreplay + unit/cron integration/block-repl integration/replication integration/replication-2 diff --git a/tests/unit/cron.tcl b/tests/unit/cron.tcl new file mode 100644 index 000000000..554df9328 --- /dev/null +++ b/tests/unit/cron.tcl @@ -0,0 +1,47 @@ +start_server {tags {"CRON"}} { + test {cron singleshot past tense} { + r flushall + r cron testjob single 0 1 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [r get testkey] + assert_equal 0 [r exists testjob] + } + + test {cron repeat past tense next exec is in the future} { + r flushall + r cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [r get testkey] + assert_equal 1 [r exists testjob] + r del testjob + } + + test {cron repeat works} { + r flushall + r cron testjob repeat 0 600 {redis.call("incr","testkey")} + after 1000 + assert_equal 2 [r get testkey] + } + + test {cron overwrite works} { + r flushall + r cron testjob single 500 {redis.call("set","testkey","a")} 1 testkey + r cron testjob single 500 {redis.call("set","anotherkey","b")} 1 anotherkey + after 1000 + assert_equal 0 [r exists testkey] + assert_equal b [r get anotherkey] + } + + test {cron delete key stops job} { + r flushall + r cron testjob single 500 {redis.call("set","testkey","a")} + r del testjob + after 1000 + assert_equal 0 [r exists testkey] + } + + test {cron zero interval rejected} { + catch {r cron testjob single 0 0 {redis.call("incr","testkey")} 1 testkey} e + assert_match {ERR*} $e + } +} From 3be0eac7b66ea7256d719ad197a8769bcd40fb82 Mon Sep 17 00:00:00 2001 From: John Sully 
Date: Tue, 21 Jan 2020 19:51:16 -0500 Subject: [PATCH 03/28] Missing cron files Former-commit-id: c983d57f739cc9b4b8271a6d069fad1b5a9d3dad --- src/cron.cpp | 126 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/cron.h | 16 +++++++ 2 files changed, 142 insertions(+) create mode 100644 src/cron.cpp create mode 100644 src/cron.h diff --git a/src/cron.cpp b/src/cron.cpp new file mode 100644 index 000000000..72d26f5c3 --- /dev/null +++ b/src/cron.cpp @@ -0,0 +1,126 @@ +#include "server.h" +#include "cron.h" + +void freeCronObject(robj_roptr o) +{ + delete reinterpret_cast(ptrFromObj(o)); +} + +// CRON [name] [single shot] [optional: start] [delay] [script] [numkeys] [key N] [arg N] +void cronCommand(client *c) +{ + int arg_offset = 0; + static const int ARG_NAME = 1; + static const int ARG_SINGLESHOT = 2; + static const int ARG_EXPIRE = 3; + #define ARG_SCRIPT (4+arg_offset) + #define ARG_NUMKEYS (5+arg_offset) + #define ARG_KEYSTART (6+arg_offset) + + bool fSingleShot = false; + if (strcasecmp("single", szFromObj(c->argv[ARG_SINGLESHOT])) == 0) { + fSingleShot = true; + } else { + if (strcasecmp("repeat", szFromObj(c->argv[ARG_SINGLESHOT])) != 0) { + addReply(c, shared.syntaxerr); + return; + } + } + + long long interval; + if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK) + return; + + long long base = g_pserver->mstime; + if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) { + arg_offset++; + std::swap(base, interval); + } + + if (interval <= 0) + { + addReplyError(c, "interval must be positive"); + return; + } + + long numkeys = 0; + if (c->argc > ARG_NUMKEYS) + { + if (getLongFromObjectOrReply(c, c->argv[ARG_NUMKEYS], &numkeys, NULL) != C_OK) + return; + + if (c->argc < (6 + numkeys)) { + addReplyError(c, "Missing arguments or numkeys is too big"); + } + } + + std::unique_ptr spjob = std::make_unique(); + spjob->script = sdsstring(sdsdup(szFromObj(c->argv[ARG_SCRIPT]))); + spjob->interval = 
(uint64_t)interval; + spjob->startTime = (uint64_t)base; + spjob->fSingleShot = fSingleShot; + spjob->dbNum = c->db - g_pserver->db; + for (long i = 0; i < numkeys; ++i) + spjob->veckeys.emplace_back(sdsdup(szFromObj(c->argv[ARG_KEYSTART+i]))); + for (long i = ARG_KEYSTART + numkeys; i < c->argc; ++i) + spjob->vecargs.emplace_back(sdsdup(szFromObj(c->argv[i]))); + + robj *o = createObject(OBJ_CRON, spjob.release()); + setKey(c->db, c->argv[ARG_NAME], o); + // use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it. + setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval); + addReply(c, shared.ok); +} + +void executeCronJobExpireHook(const char *key, robj *o) +{ + serverAssert(o->type == OBJ_CRON); + cronjob *job = (cronjob*)ptrFromObj(o); + + client *cFake = createClient(-1, IDX_EVENT_LOOP_MAIN); + cFake->lock.lock(); + cFake->authenticated = 1; + cFake->puser = nullptr; + selectDb(cFake, job->dbNum); + serverAssert(cFake->argc == 0); + + // Setup the args for the EVAL command + cFake->argc = 3 + job->veckeys.size() + job->vecargs.size(); + cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL); + cFake->argv[0] = createStringObject("EVAL", 4); + cFake->argv[1] = createStringObject(job->script.get(), job->script.size()); + cFake->argv[2] = createStringObjectFromLongLong(job->veckeys.size()); + for (size_t i = 0; i < job->veckeys.size(); ++i) + cFake->argv[3+i] = createStringObject(job->veckeys[i].get(), job->veckeys[i].size()); + for (size_t i = 0; i < job->vecargs.size(); ++i) + cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size()); + + evalCommand(cFake); + resetClient(cFake); + + robj *keyobj = createStringObject(key,sdslen(key)); + int dbId = job->dbNum; + if (job->fSingleShot) + { + dbSyncDelete(cFake->db, keyobj); + } + else + { + job->startTime += job->interval; + if (job->startTime < (uint64_t)g_pserver->mstime) + { + // 
If we are more than one interval in the past then fast forward to + // the first interval still in the future + auto delta = g_pserver->mstime - job->startTime; + auto multiple = (delta / job->interval)+1; + job->startTime += job->interval * multiple; + } + setExpire(cFake, cFake->db, keyobj, keyobj, job->startTime + job->interval); + } + + notifyKeyspaceEvent(NOTIFY_KEYEVENT, "CRON Executed", keyobj, dbId); + decrRefCount(keyobj); + + // o is invalid at this point + freeClient(cFake); +} \ No newline at end of file diff --git a/src/cron.h b/src/cron.h new file mode 100644 index 000000000..8bac7f604 --- /dev/null +++ b/src/cron.h @@ -0,0 +1,16 @@ +#pragma once + +struct cronjob +{ + sdsstring script; + uint64_t interval; + uint64_t startTime; + std::vector veckeys; + std::vector vecargs; + int dbNum = 0; + bool fSingleShot = false; +}; + +void freeCronObject(robj_roptr o); +void executeCronJobExpireHook(const char *key, robj *o); +void cronCommand(client *c); \ No newline at end of file From ae344960f16393a01d0625a5e32e36b77aa3dce5 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 28 Jan 2020 21:39:56 -0500 Subject: [PATCH 04/28] CRON jobs were running half as often as required Former-commit-id: 1069d0b4bf39818cbd72d0bcbd168769244cf18f --- src/cron.cpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/cron.cpp b/src/cron.cpp index 72d26f5c3..fd777d43d 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -110,12 +110,20 @@ void executeCronJobExpireHook(const char *key, robj *o) if (job->startTime < (uint64_t)g_pserver->mstime) { // If we are more than one interval in the past then fast forward to - // the first interval still in the future - auto delta = g_pserver->mstime - job->startTime; - auto multiple = (delta / job->interval)+1; - job->startTime += job->interval * multiple; + // the first interval still in the future. 
If startTime wasn't zero align + // this to the original startTime, if it was zero align to now + if (job->startTime == job->interval) + { // startTime was 0 + job->startTime = g_pserver->mstime + job->interval; + } + else + { + auto delta = g_pserver->mstime - job->startTime; + auto multiple = (delta / job->interval)+1; + job->startTime += job->interval * multiple; + } } - setExpire(cFake, cFake->db, keyobj, keyobj, job->startTime + job->interval); + setExpire(cFake, cFake->db, keyobj, keyobj, job->startTime); } notifyKeyspaceEvent(NOTIFY_KEYEVENT, "CRON Executed", keyobj, dbId); From 8800a516d152190c46e547bd69da3aca6515944c Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 Feb 2020 23:31:12 -0500 Subject: [PATCH 05/28] Add test to detect issue #137 and #132 Former-commit-id: 49d86746edef497a568c6f3a64695d420305cca8 --- src/debug.cpp | 20 +++++++++++++++++++- tests/integration/replication.tcl | 2 ++ tests/test_helper.tcl | 1 + tests/unit/rreplay.tcl | 6 +++--- 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/debug.cpp b/src/debug.cpp index 234f197be..1916dc79b 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -685,8 +685,26 @@ NULL } else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) { stringmatchlen_fuzz_test(); addReplyStatus(c,"Apparently Redis did not crash: test passed"); - } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 2) { + } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 3) { c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY; + if (!strcasecmp(szFromObj(c->argv[2]), "yes")) + { + redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL); + mi->master = c; + listAddNodeHead(g_pserver->masters, mi); + } + else if (strcasecmp(szFromObj(c->argv[2]), "flagonly")) // if we didn't set flagonly assume its an unset + { + serverAssert(c->flags & CLIENT_MASTER); + if (listLength(g_pserver->masters)) + { + redisMaster *mi = 
(redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + serverAssert(mi->master == c); + listDelNode(g_pserver->masters, listFirst(g_pserver->masters)); + zfree(mi); + } + c->flags &= ~(CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY); + } addReply(c, shared.ok); } else { addReplySubcommandSyntaxError(c); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0e50c20a9..5ed1ffc52 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -309,3 +309,5 @@ start_server {tags {"repl"}} { } } } + + diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index ec1d7cd31..0711d4cc2 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -37,6 +37,7 @@ set ::all_tests { unit/acl unit/rreplay unit/cron + unit/replication integration/block-repl integration/replication integration/replication-2 diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2029f521d..2fd1d3714 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ -start_server {tags {"rreplay"}} { +start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master + r debug force-master flagonly r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"}} { reconnect test {RREPLAY db different} { - r debug force-master + r debug force-master flagonly r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2 From eabc436814c58c112d56bda275d70b0622843168 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 Feb 2020 23:31:31 -0500 Subject: [PATCH 06/28] Fix issue #137 and #132 Former-commit-id: 050d58007f84e4f71b0ae8b053ae4d6fd5bb4ec7 --- src/db.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 
605baaf40..9190d22f1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -177,9 +177,8 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) { * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ robj *lookupKeyWrite(redisDb *db, robj *key) { + expireIfNeeded(db,key); robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC); - if (expireIfNeeded(db,key)) - o = NULL; return o; } From 6c2cef768707fa662e5ce152c3ec556ef5a70794 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 8 Feb 2020 16:49:41 -0500 Subject: [PATCH 07/28] Addmissing test file Former-commit-id: fb2bdf7d05e27b15dcb53b09d6820416a99a3ba7 --- tests/unit/replication.tcl | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 tests/unit/replication.tcl diff --git a/tests/unit/replication.tcl b/tests/unit/replication.tcl new file mode 100644 index 000000000..ada6af1e5 --- /dev/null +++ b/tests/unit/replication.tcl @@ -0,0 +1,12 @@ + +start_server {tags {"repl"}} { + test "incr of expired key on replica doesn't cause a crash" { + r debug force-master yes + r set testkey 1 + r pexpire testkey 1 + after 500 + r incr testkey + r incr testkey + r debug force-master no + } +} From 25ef65463e2c08191009c656883d6602d46ae342 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 00:25:03 -0500 Subject: [PATCH 08/28] Ensure multi-master works for ring topologies Former-commit-id: a7cc3aac28ccec4dadb80aa2cc7279c53982bc28 --- src/multi.cpp | 8 ++- src/replication.cpp | 77 ++++++++++++++++++++---- src/server.h | 2 + tests/integration/replication-active.tcl | 18 ++++++ tests/test_helper.tcl | 1 + tests/unit/rreplay.tcl | 4 +- 6 files changed, 94 insertions(+), 16 deletions(-) diff --git a/src/multi.cpp b/src/multi.cpp index 2018e08d9..c9485b0ae 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -28,6 +28,7 @@ */ #include "server.h" +bool FInReplicaReplay(); /* ================================ MULTI/EXEC ============================== */ @@ -172,12 +173,15 @@ 
void execCommand(client *c) { * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { + if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) { execCommandPropagateMulti(c); must_propagate = 1; } - call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL); + int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL; + if (FInReplicaReplay()) + flags &= ~CMD_CALL_PROPAGATE; + call(c,flags); /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; diff --git a/src/replication.cpp b/src/replication.cpp index b8236420d..e42872cd4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -42,6 +42,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, int newfd); @@ -353,6 +354,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that char szMvcc[128]; + incrementMvccTstamp(); uint64_t mvccTstamp = getMvccTstamp(); int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); @@ -432,6 +434,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); addReplyProtoAsync(replica, reply->buf(), reply->used); } + if (!fSendRaw) { addReplyAsync(replica,shared.crlf); @@ -2420,6 +2423,8 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); + if (mi->clientFake) + freeClient(mi->clientFake); delete mi->staleKeyMap; zfree(mi); } @@ -2477,6 +2482,11 @@ void 
replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; + if (mi->clientFake) { + freeClient(mi->clientFake); + mi->clientFake = nullptr; + + } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -3344,14 +3354,33 @@ public: return m_cnesting == 1; } + redisMaster *getMi(client *c) + { + if (m_mi == nullptr) + m_mi = MasterInfoFromClient(c); + return m_mi; + } + + int nesting() const { return m_cnesting; } + private: int m_cnesting = 0; bool m_fCancelled = false; + redisMaster *m_mi = nullptr; }; +static thread_local ReplicaNestState *s_pstate = nullptr; + +bool FInReplicaReplay() +{ + return s_pstate != nullptr && s_pstate->nesting() > 0; +} + + +static std::unordered_map g_mapmvcc; + void replicaReplayCommand(client *c) { - static thread_local ReplicaNestState *s_pstate = nullptr; if (s_pstate == nullptr) s_pstate = new (MALLOC_LOCAL) ReplicaNestState; @@ -3375,9 +3404,10 @@ void replicaReplayCommand(client *c) return; } - unsigned char uuid[UUID_BINARY_LEN]; + std::string uuid; + uuid.resize(UUID_BINARY_LEN); if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36 - || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) + || uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0) { addReplyError(c, "Expected UUID arg1"); s_pstate->Cancel(); @@ -3413,7 +3443,7 @@ void replicaReplayCommand(client *c) } } - if (FSameUuidNoNil(uuid, cserver.uuid)) + if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid)) { addReply(c, shared.ok); s_pstate->Cancel(); @@ -3423,33 +3453,56 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; + redisMaster *mi = s_pstate->getMi(c); + client *cFake = mi->clientFake; + if (mi->clientFakeNesting != 
s_pstate->nesting()) + cFake = nullptr; + serverAssert(mi != nullptr); + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr)) + { + s_pstate->Cancel(); + s_pstate->Pop(); + return; + } + // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - client *cFake = createClient(-1, c->iel); + if (cFake == nullptr) + cFake = createClient(-1, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); auto ccmdPrev = serverTL->commandsExecuted; + cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP; processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP); bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); - if (fExec) + if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - redisMaster *mi = MasterInfoFromClient(c); - if (mi != nullptr) // this should never be null but I'd prefer not to crash - { - mi->mvccLastSync = mvcc; - } + g_mapmvcc[uuid] = mvcc; } else { + serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); addReplyError(c, "command did not execute"); } - freeClient(cFake); + serverAssert(sdslen(cFake->querybuf) == 0); + if (cFake->flags & CLIENT_MULTI) + { + mi->clientFake = cFake; + mi->clientFakeNesting = s_pstate->nesting(); + } + else + { + if (mi->clientFake == cFake) + mi->clientFake = nullptr; + freeClient(cFake); + } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.h b/src/server.h index 781f043f0..22d983b0f 100644 --- a/src/server.h +++ b/src/server.h @@ -1535,6 +1535,8 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. 
*/ client *master; + client *clientFake; + int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */ diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 4f37f2adf..0c28eb85d 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { } } + test {Active replicas propogate transaction} { + $master set testkey 0 + $master multi + $master incr testkey + $master incr testkey + after 5000 + $master get testkey + $master exec + assert_equal 2 [$master get testkey] + after 500 + wait_for_condition 50 500 { + [string match "2" [$slave get testkey]] + } else { + fail "Transaction failed to replicate" + } + $master flushall + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 0711d4cc2..376004041 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -45,6 +45,7 @@ set ::all_tests { integration/replication-4 integration/replication-psync integration/replication-active + integration/replication-multimaster integration/aof integration/rdb integration/convert-zipmap-hash-on-load diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2fd1d3714..e11030f95 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master flagonly + r debug force-master yes r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { 
reconnect test {RREPLAY db different} { - r debug force-master flagonly + r debug force-master yes r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2 From eac3cffe416623af74a4d7b88fc39c5092d111a6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 00:29:26 -0500 Subject: [PATCH 09/28] CLANG build fix Former-commit-id: dc78bf1ccbd3dfd2de582d2a0d0be3223de3c7c3 --- src/replication.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/replication.cpp b/src/replication.cpp index e42872cd4..fc632a6cd 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -43,6 +43,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, int newfd); From d346ad77347901d519f87514b817fdbe1db3e895 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 18:15:29 -0500 Subject: [PATCH 10/28] Add missing test file Former-commit-id: 0c101dccc825668cb7ff07c23e82db0f5642b786 --- tests/integration/replication-multimaster.tcl | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 tests/integration/replication-multimaster.tcl diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl new file mode 100644 index 000000000..e5e77fdad --- /dev/null +++ b/tests/integration/replication-multimaster.tcl @@ -0,0 +1,74 @@ +foreach topology {mesh ring} { +start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { + + for {set j 0} {$j < 4} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + } + + # 
Initialize as mesh + if [string equal $topology "mesh"] { + for {set j 0} {$j < 4} {incr j} { + for {set k 0} {$k < 4} {incr k} { + if $j!=$k { + $R($j) replicaof $R_host($k) $R_port($k) + after 100 + } + } + }} + #Else Ring + if [string equal $topology "ring"] { + $R(0) replicaof $R_host(3) $R_port(3) + after 100 + $R(1) replicaof $R_host(0) $R_port(0) + after 100 + $R(2) replicaof $R_host(1) $R_port(1) + after 100 + $R(3) replicaof $R_host(2) $R_port(2) + } + + after 2000 + + test "$topology replicates to all nodes" { + $R(0) set testkey foo + after 500 + assert_equal foo [$R(1) get testkey] "replicates to 1" + assert_equal foo [$R(2) get testkey] "replicates to 2" + } + + test "$topology replicates only once" { + $R(0) set testkey 1 + after 500 + $R(1) incr testkey + after 500 + $R(2) incr testkey + after 500 + assert_equal 3 [$R(0) get testkey] + assert_equal 3 [$R(1) get testkey] + assert_equal 3 [$R(2) get testkey] + assert_equal 3 [$R(3) get testkey] + } + + test "$topology transaction replicates only once" { + for {set j 0} {$j < 1000} {incr j} { + $R(0) set testkey 1 + $R(0) multi + $R(0) incr testkey + $R(0) incr testkey + $R(0) exec + after 1 + assert_equal 3 [$R(0) get testkey] "node 0" + assert_equal 3 [$R(1) get testkey] "node 1" + assert_equal 3 [$R(2) get testkey] "node 2" + assert_equal 3 [$R(3) get testkey] "node 3" + } + } +} +} +} +} +} From 68235881e99a41b53cd2f0cc365edc9d0476d1ce Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 30 Jan 2020 17:55:48 -0500 Subject: [PATCH 11/28] Fix memory leak in cron Former-commit-id: f1748f8c7611ad96d7ba4fed66439cd1f043e6f3 --- src/cron.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cron.cpp b/src/cron.cpp index fd777d43d..230cf4ed4 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -67,6 +67,7 @@ void cronCommand(client *c) robj *o = createObject(OBJ_CRON, spjob.release()); setKey(c->db, c->argv[ARG_NAME], o); + decrRefCount(o); // use an expire to trigger execution. 
Note: We use a subkey expire here so legacy clients don't delete it. setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval); addReply(c, shared.ok); From 30ece138d5e4faa414d98f585791538eb3c93455 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 19:52:57 -0500 Subject: [PATCH 12/28] Fix issue #119 Former-commit-id: 46224721237616c345f6726b721a354d7bda71df --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index c6f360905..2d6027348 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1690,7 +1690,7 @@ void clientsCron(int iel) { /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was * terminated. */ - if (clientsCronHandleTimeout(c,now)) goto LContinue; + if (clientsCronHandleTimeout(c,now)) continue; // Client free'd so don't release the lock if (clientsCronResizeQueryBuffer(c)) goto LContinue; if (clientsCronTrackExpansiveClients(c)) goto LContinue; LContinue: From e4d74b993f928dce069996e32a90f4b98e17a4ca Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 00:59:07 -0500 Subject: [PATCH 13/28] Fix cases where duplicate RREPLAY is applied Former-commit-id: c3317686f8b8d94a3b2295def899ae30e208f327 --- src/replication.cpp | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index fc632a6cd..9b8570c49 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1259,19 +1259,19 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { // Because the client could have been closed while the lambda waited to run we need to - // verify the replica is still connected + // verify the replica is still connected listIter li; - listNode *ln; - listRewind(g_pserver->slaves,&li); - bool fFound = false; - while ((ln = 
listNext(&li))) { - if (listNodeValue(ln) == replica) { - fFound = true; - break; - } - } - if (!fFound) - return; + listNode *ln; + listRewind(g_pserver->slaves,&li); + bool fFound = false; + while ((ln = listNext(&li))) { + if (listNodeValue(ln) == replica) { + fFound = true; + break; + } + } + if (!fFound) + return; aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE); if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) { freeClient(replica); @@ -2329,10 +2329,11 @@ int connectWithMaster(redisMaster *mi) { void undoConnectWithMaster(redisMaster *mi) { int fd = mi->repl_transfer_s; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{ aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE); close(fd); }); + serverAssert(res == AE_OK); mi->repl_transfer_s = -1; } @@ -3459,7 +3460,7 @@ void replicaReplayCommand(client *c) if (mi->clientFakeNesting != s_pstate->nesting()) cFake = nullptr; serverAssert(mi != nullptr); - if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr)) + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc) { s_pstate->Cancel(); s_pstate->Pop(); @@ -3485,7 +3486,8 @@ void replicaReplayCommand(client *c) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - g_mapmvcc[uuid] = mvcc; + if (mvcc > g_mapmvcc[uuid]) + g_mapmvcc[uuid] = mvcc; } else { From fef9925b7f90e622cfded283d8cd637c68def82a Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 01:00:21 -0500 Subject: [PATCH 14/28] Fix higher latency at low load by grouping clients to threads. 
This fixes slow perf in cluster benchmarks mentioned in issue #102 Former-commit-id: 1a4c3224c9848f02fbdb49674045b593cfc41d31 --- src/ae.cpp | 5 +++- src/aof.cpp | 9 ++++--- src/cluster.cpp | 12 +++++++++- src/config.cpp | 5 ++++ src/networking.cpp | 58 ++++++++++++++++++++++++++++++++++++++++++---- src/server.h | 1 + 6 files changed, 81 insertions(+), 9 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 90c148510..b92cd4a67 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -273,7 +273,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) cmd.proc = proc; cmd.clientData = arg; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); - AE_ASSERT(size == sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; return AE_OK; } @@ -296,6 +297,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch } auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; AE_ASSERT(size == sizeof(cmd)); int ret = AE_OK; if (fSynchronous) diff --git a/src/aof.cpp b/src/aof.cpp index b82be9a34..65647bbba 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -167,11 +167,12 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { * not one already. 
*/ if (!g_pserver->aof_rewrite_pending) { g_pserver->aof_rewrite_pending = true; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { g_pserver->aof_rewrite_pending = false; if (g_pserver->aof_pipe_write_data_to_child >= 0) aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); }); + serverAssert(res == AE_OK); // we can't handle an error here } } @@ -1563,16 +1564,18 @@ error: void aofClosePipes(void) { int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; - aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); close (fdAofAckPipe); }); + serverAssert(res == AE_OK); int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); close(fdAofWritePipe); }); + serverAssert(res == AE_OK); g_pserver->aof_pipe_write_data_to_child = -1; close(g_pserver->aof_pipe_read_data_from_parent); diff --git a/src/cluster.cpp b/src/cluster.cpp index f6a6e03dc..c20b0f4c4 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -295,6 +295,15 @@ int clusterLoadConfig(char *filename) { if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) { g_pserver->cluster->currentEpoch = clusterGetMaxEpoch(); } + + if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100) + { + // Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server) + // we can increase the grouping of clients on a single thread within reason 
+ cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes); + cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200); + serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold); + } return C_OK; fmterr: @@ -623,9 +632,10 @@ void freeClusterLink(clusterLink *link) { if (link->node) link->node->link = NULL; link->node = nullptr; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ freeClusterLink(link); }); + serverAssert(res == AE_OK); return; } if (link->fd != -1) { diff --git a/src/config.cpp b/src/config.cpp index c056b98dc..2aaad825e 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -805,6 +805,11 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"enable-pro")) { cserver.fUsePro = true; break; + } else if (!strcasecmp(argv[0],"min-clients-per-thread") && argc == 2) { + cserver.thread_min_client_threshold = atoi(argv[1]); + if (cserver.thread_min_client_threshold < 0 || cserver.thread_min_client_threshold > 400) { + err = "min-thread-client must be between 0 and 400"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } diff --git a/src/networking.cpp b/src/networking.cpp index 97744c410..097df0f87 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,6 +1003,33 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +int chooseBestThreadForAccept(int ielCur) +{ + listIter li; + listNode *ln; + int rgcclients[MAX_EVENT_LOOPS] = {0}; + + listRewind(g_pserver->clients, &li); + while ((ln = listNext(&li)) != nullptr) + { + client *c = (client*)listNodeValue(ln); + if (c->iel < 0) + continue; + + rgcclients[c->iel]++; + } + + int ielMinLoad = 0; + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + if 
(rgcclients[iel] < cserver.thread_min_client_threshold) + return iel; + if (rgcclients[iel] < rgcclients[ielMinLoad]) + ielMinLoad = iel; + } + return ielMinLoad; +} + #define MAX_ACCEPTS_PER_CALL 1000 static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { client *c; @@ -1105,7 +1132,22 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { - // We always accept on the same thread + { + int ielTarget = chooseBestThreadForAccept(ielCur); + if (ielTarget != ielCur) + { + char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ + acceptCommonHandler(cfd,0,szT, ielTarget); + zfree(szT); + }); + + if (res == AE_OK) + continue; + } + } + LLocalThread: aeAcquireLock(); acceptCommonHandler(cfd,0,cip, ielCur); @@ -1122,10 +1164,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { goto LLocalThread; char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); - aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ + int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ acceptCommonHandler(cfd,0,szT, iel); zfree(szT); }); + if (res != AE_OK) + { + zfree(szT); + goto LLocalThread; + } } } } @@ -1151,13 +1198,16 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int ielTarget = rand() % cserver.cthreads; if (ielTarget == ielCur) { + LLocalThread: acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur); } else { - aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielTarget); }); + if (res != AE_OK) + goto LLocalThread; } aeReleaseLock(); @@ -2529,7 +2579,7 @@ NULL { int iel = client->iel; freeClientAsync(client); - 
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { + aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK freeClientsInAsyncFreeQueue(iel); }); } diff --git a/src/server.h b/src/server.h index 22d983b0f..3ff023677 100644 --- a/src/server.h +++ b/src/server.h @@ -1604,6 +1604,7 @@ struct redisServerConst { unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */ bool fUsePro = false; + int thread_min_client_threshold = 50; }; struct redisServer { From d4c1e981247cb8ac2c33d64dcef9937ee422f451 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 01:41:00 -0500 Subject: [PATCH 15/28] Implement an error handler so bug #125 can't happen Former-commit-id: 16a019dba053fd0654116ff98a2ad0b66a9ed4e6 --- src/aof.cpp | 28 +++++++++++++++++++--------- src/networking.cpp | 4 ++-- src/server.cpp | 1 + src/server.h | 1 + 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 65647bbba..de8a8260e 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { } } +void installAofRewriteEvent() +{ + serverTL->fRetrySetAofEvent = false; + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + if (res != AE_OK) + serverTL->fRetrySetAofEvent = true; + } +} + /* Append data to the AOF rewrite buffer, allocating new blocks if needed. 
*/ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks); @@ -165,15 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (!g_pserver->aof_rewrite_pending) { - g_pserver->aof_rewrite_pending = true; - int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { - g_pserver->aof_rewrite_pending = false; - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); - serverAssert(res == AE_OK); // we can't handle an error here - } + installAofRewriteEvent(); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -349,6 +356,9 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; + if (serverTL->fRetrySetAofEvent) + installAofRewriteEvent(); + if (sdslen(g_pserver->aof_buf) == 0) { /* Check if we need to do fsync even the aof buffer is empty, * because previously in AOF_FSYNC_EVERYSEC mode, fsync is diff --git a/src/networking.cpp b/src/networking.cpp index 097df0f87..54e04406f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,7 +1003,7 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } -int chooseBestThreadForAccept(int ielCur) +int chooseBestThreadForAccept() { listIter li; listNode *ln; @@ -1133,7 +1133,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { { - int ielTarget = chooseBestThreadForAccept(ielCur); + int ielTarget = chooseBestThreadForAccept(); if (ielTarget != ielCur) { char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); diff --git a/src/server.cpp b/src/server.cpp index 2d6027348..15f52ab52 100644 --- 
a/src/server.cpp +++ b/src/server.cpp @@ -2884,6 +2884,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->current_client = nullptr; pvar->clients_paused = 0; + pvar->fRetrySetAofEvent = false; if (pvar->el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", diff --git a/src/server.h b/src/server.h index 3ff023677..3ec2e8948 100644 --- a/src/server.h +++ b/src/server.h @@ -1526,6 +1526,7 @@ struct redisServerThreadVars { struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; + bool fRetrySetAofEvent = false; }; struct redisMaster { From 47480943560ab4d110ac4d80f26356c93e733096 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 03:44:28 -0500 Subject: [PATCH 16/28] Fix race condition in allocating connections to threads Former-commit-id: 52434a583aa7114ff5658226441ab82ed3110a57 --- src/networking.cpp | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 54e04406f..9d94f5171 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,29 +1003,23 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +static std::atomic rgacceptsInFlight[MAX_EVENT_LOOPS]; int chooseBestThreadForAccept() { - listIter li; - listNode *ln; - int rgcclients[MAX_EVENT_LOOPS] = {0}; - - listRewind(g_pserver->clients, &li); - while ((ln = listNext(&li)) != nullptr) - { - client *c = (client*)listNodeValue(ln); - if (c->iel < 0) - continue; - - rgcclients[c->iel]++; - } - int ielMinLoad = 0; + int cclientsMin = INT_MAX; for (int iel = 0; iel < cserver.cthreads; ++iel) { - if (rgcclients[iel] < cserver.thread_min_client_threshold) + int cclientsThread; + 
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); + cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); + if (cclientsThread < cserver.thread_min_client_threshold) return iel; - if (rgcclients[iel] < rgcclients[ielMinLoad]) + if (cclientsThread < cclientsMin) + { + cclientsMin = cclientsThread; ielMinLoad = iel; + } } return ielMinLoad; } @@ -1134,18 +1128,21 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { { { int ielTarget = chooseBestThreadForAccept(); + rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); if (ielTarget != ielCur) { char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ acceptCommonHandler(cfd,0,szT, ielTarget); + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); zfree(szT); }); if (res == AE_OK) continue; } + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); } LLocalThread: From 4bf9beb484ecc83330a99c76509dba47733ebcbf Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 03:33:45 -0500 Subject: [PATCH 17/28] aeDeleteEventLoop use after free and leak fixes Former-commit-id: 2fd93c5789a4e81455d51b2a4786f708e8d6a2d7 --- src/ae.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/ae.cpp b/src/ae.cpp index b92cd4a67..16ac3ebba 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -390,10 +390,18 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) { aeApiFree(eventLoop); zfree(eventLoop->events); zfree(eventLoop->fired); - zfree(eventLoop); fastlock_free(&eventLoop->flock); close(eventLoop->fdCmdRead); close(eventLoop->fdCmdWrite); + + auto *te = eventLoop->timeEventHead; + while (te) + { + auto *teNext = te->next; + zfree(te); + te = teNext; + } + zfree(eventLoop); } extern "C" void aeStop(aeEventLoop *eventLoop) { From 41c75234bd592de8cb531188767a2964596a829c Mon 
Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 03:43:29 -0500 Subject: [PATCH 18/28] Fix memory leak of ReplicaNestState on shutdown Former-commit-id: 4781eda7225c2640e25387663c33ef74cd98b0c4 --- src/replication.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 9b8570c49..ba2008c0a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3371,7 +3371,7 @@ private: redisMaster *m_mi = nullptr; }; -static thread_local ReplicaNestState *s_pstate = nullptr; +static thread_local std::unique_ptr s_pstate; bool FInReplicaReplay() { @@ -3384,7 +3384,7 @@ static std::unordered_map g_mapmvcc; void replicaReplayCommand(client *c) { if (s_pstate == nullptr) - s_pstate = new (MALLOC_LOCAL) ReplicaNestState; + s_pstate = std::make_unique(); // the replay command contains two arguments: // 1: The UUID of the source From 8f6f496c7e64fa50f72777d9ec290da85e77e722 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 17:08:00 -0500 Subject: [PATCH 19/28] Memory leak fix on config, and redisDb dtor Former-commit-id: b92bbf4de8ffc3edc965e2f9da4dd82ed7071559 --- src/config.cpp | 2 +- src/db.cpp | 8 ++++++++ src/server.h | 7 +++++-- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/config.cpp b/src/config.cpp index 2aaad825e..2c9a9d518 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -405,7 +405,7 @@ void loadServerConfigFromString(char *config) { } else if ((!strcasecmp(argv[0],"slaveof") || !strcasecmp(argv[0],"replicaof")) && argc == 3) { slaveof_linenum = linenum; - replicationAddMaster(sdsnew(argv[1]), atoi(argv[2])); + replicationAddMaster(argv[1], atoi(argv[2])); } else if ((!strcasecmp(argv[0],"repl-ping-slave-period") || !strcasecmp(argv[0],"repl-ping-replica-period")) && argc == 2) diff --git a/src/db.cpp b/src/db.cpp index 9190d22f1..e94e0cdb1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1305,6 +1305,14 @@ void setExpire(client *c, redisDb *db, robj *key, 
robj *subkey, long long when) rememberSlaveKeyWithExpire(db,key); } +redisDb::~redisDb() +{ + dictRelease(watched_keys); + dictRelease(ready_keys); + dictRelease(blocking_keys); + listRelease(defrag_later); +} + void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) { dictEntry *kde; diff --git a/src/server.h b/src/server.h index 3ec2e8948..135a216ae 100644 --- a/src/server.h +++ b/src/server.h @@ -1127,10 +1127,13 @@ typedef struct clientReplyBlock { /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ -typedef struct redisDb { +struct redisDb { redisDb() : expireitr(nullptr) {}; + + ~redisDb(); + dict *pdict; /* The keyspace for this DB */ expireset *setexpire; expireset::setiter expireitr; @@ -1142,7 +1145,7 @@ typedef struct redisDb { long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. 
*/ -} redisDb; +}; /* Client MULTI/EXEC state */ typedef struct multiCmd { From d3a69998e4e513b799fcf72a2d13bb8ec883c30f Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 18:41:29 -0500 Subject: [PATCH 20/28] Fix memory leak in RDB load Former-commit-id: 06ad1c15d719a34fed36244b12a593f749bbb8a6 --- src/rdb.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 48205f430..5b316a4a8 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1545,9 +1545,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { == NULL) return NULL; if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } /* Don't care about integer-encoded strings. */ @@ -2181,6 +2187,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(val); val = nullptr; } + decrRefCount(key); + key = nullptr; } /* Reset the state that is key-specified and is populated by From b6db2d32ad14e24f968d780f975b8fd08c08df44 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 18:53:46 -0500 Subject: [PATCH 21/28] Graceful shutdown of server threads when quit is requested Former-commit-id: b9db899f6ccea62222170c6eec264d403a7a911d --- src/db.cpp | 2 +- src/server.cpp | 25 ++++++++++++++++++++++--- src/server.h | 3 +++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index e94e0cdb1..562485941 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -977,7 +977,7 @@ void shutdownCommand(client *c) { * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. 
*/ if (g_pserver->loading || g_pserver->sentinel_mode) flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE; - if (prepareForShutdown(flags) == C_OK) exit(0); + if (prepareForShutdown(flags) == C_OK) throw ShutdownException(); addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); } diff --git a/src/server.cpp b/src/server.cpp index 15f52ab52..6fee88732 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1894,7 +1894,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ if (g_pserver->shutdown_asap) { - if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); + if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) throw ShutdownException(); serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); g_pserver->shutdown_asap = 0; } @@ -3812,8 +3812,17 @@ int prepareForShutdown(int flags) { /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); + + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + aePostFunction(g_pserver->rgthreadvar[iel].el, [iel]{ + g_pserver->rgthreadvar[iel].el->stop = 1; + }); + } + serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", g_pserver->sentinel_mode ? "Sentinel" : "KeyDB"); + return C_OK; } @@ -5033,8 +5042,16 @@ void *workerThreadMain(void *parg) aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE); - aeMain(el); + try + { + aeMain(el); + } + catch (ShutdownException) + { + } + serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); + return NULL; } @@ -5334,7 +5351,9 @@ int main(int argc, char **argv) { /* The main thread sleeps until all the workers are done. 
this is so that all worker threads are orthogonal in their startup/shutdown */ void *pvRet; - pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pvRet); + for (int iel = 0; iel < cserver.cthreads; ++iel) + pthread_join(rgthread[iel], &pvRet); + return 0; } diff --git a/src/server.h b/src/server.h index 135a216ae..58ea3e620 100644 --- a/src/server.h +++ b/src/server.h @@ -2922,6 +2922,9 @@ inline int FCorrectThread(client *c) } #define AssertCorrectThread(c) serverAssert(FCorrectThread(c)) +class ShutdownException +{}; + #define redisDebug(fmt, ...) \ printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) #define redisDebugMark() \ From c810823abed5967e17c1e12b45329c20248c5821 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 18:57:13 -0500 Subject: [PATCH 22/28] processEventsWhileBlocked not exception safe Former-commit-id: 1ef187533c26bfa0c084a815b8b80de92ba1cf0b --- src/networking.cpp | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 9d94f5171..803edb5ac 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3065,6 +3065,7 @@ void unpauseClientsIfNecessary() * * The function returns the total number of events processed. */ int processEventsWhileBlocked(int iel) { + serverAssert(GlobalLocksAcquired()); int iterations = 4; /* See the function top-comment. 
*/ int count = 0; @@ -3074,14 +3075,30 @@ int processEventsWhileBlocked(int iel) { serverAssert(c->flags & CLIENT_PROTECTED); c->lock.unlock(); } + aeReleaseLock(); - while (iterations--) { - int events = 0; - events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(iel); - if (!events) break; - count += events; + serverAssertDebug(!GlobalLocksAcquired()); + try + { + while (iterations--) { + int events = 0; + events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); + events += handleClientsWithPendingWrites(iel); + if (!events) break; + count += events; + } } + catch (...) + { + // Caller expects us to be locked so fix and rethrow + AeLocker locker; + if (c != nullptr) + c->lock.lock(); + locker.arm(c); + locker.release(); + throw; + } + AeLocker locker; if (c != nullptr) c->lock.lock(); From 1d368edc10610851e56e8bb5b6e239d1ca5a7fcf Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 19:54:05 -0500 Subject: [PATCH 23/28] Add double unlock detection and improve fastlock_unlock assembly Former-commit-id: 98aefac09b6b59371e6c1c77d1ef2794bfc5ae62 --- src/fastlock.cpp | 56 ++++++++++++++++++++++++-------------------- src/fastlock_x64.asm | 32 +++++++------------------ src/networking.cpp | 1 - 3 files changed, 39 insertions(+), 50 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 19375cd0e..4f8b2e6dc 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -74,6 +74,10 @@ extern int g_fInCrash; #define __has_feature(x) 0 #endif +#ifdef __linux__ +extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex); +#endif + #if __has_feature(thread_sanitizer) /* Report that a lock has been created at address "lock". 
*/ @@ -206,6 +210,11 @@ DeadlockDetector g_dlock; static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough"); uint64_t g_longwaits = 0; +extern "C" void fastlock_panic(struct fastlock *lock) +{ + _serverPanic(__FILE__, __LINE__, "fastlock lock/unlock mismatch for: %s", lock->szName); +} + uint64_t fastlock_getlongwaitcount() { uint64_t rval; @@ -337,31 +346,6 @@ extern "C" int fastlock_trylock(struct fastlock *lock, int fWeak) return false; } -#ifdef __linux__ -#define ROL32(v, shift) ((v << shift) | (v >> (32-shift))) -void unlock_futex(struct fastlock *lock, uint16_t ifutex) -{ - unsigned mask = (1U << (ifutex % 32)); - unsigned futexT; - __atomic_load(&lock->futex, &futexT, __ATOMIC_RELAXED); - futexT &= mask; - - if (futexT == 0) - return; - - for (;;) - { - __atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE); - futexT &= mask; - if (!futexT) - break; - - if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1) - break; - } -} -#endif - extern "C" void fastlock_unlock(struct fastlock *lock) { --lock->m_depth; @@ -384,6 +368,26 @@ extern "C" void fastlock_unlock(struct fastlock *lock) } #endif +#ifdef __linux__ +#define ROL32(v, shift) ((v << shift) | (v >> (32-shift))) +extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex) +{ + unsigned mask = (1U << (ifutex % 32)); + unsigned futexT; + + for (;;) + { + __atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE); + futexT &= mask; + if (!futexT) + break; + + if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1) + break; + } +} +#endif + extern "C" void fastlock_free(struct fastlock *lock) { // NOP @@ -413,4 +417,4 @@ void fastlock_lock_recursive(struct fastlock *lock, int nesting) { fastlock_lock(lock); lock->m_depth = nesting; -} \ No newline at end of file +} diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index 7c9990a6d..bcea0e095 100644 --- a/src/fastlock_x64.asm +++ 
b/src/fastlock_x64.asm @@ -126,6 +126,7 @@ fastlock_trylock: .ALIGN 16 .global fastlock_unlock +.type fastlock_unlock,@function fastlock_unlock: # RDI points to the struct: # int32_t m_pidOwner @@ -133,34 +134,19 @@ fastlock_unlock: # [rdi+64] ... # uint16_t active # uint16_t avail - push r11 sub dword ptr [rdi+4], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it mov dword ptr [rdi], -1 # pidOwner = -1 (we don't own it anymore) - mov ecx, [rdi+64] # get current active (this one) - inc ecx # bump it to the next thread - mov [rdi+64], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + mov esi, [rdi+64] # get current active (this one) + inc esi # bump it to the next thread + mov word ptr [rdi+64], si # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) mfence # sync other threads # At this point the lock is removed, however we must wake up any pending futexs - mov r9d, 1 # eax is the bitmask for 2 threads - rol r9d, cl # place the mask in the right spot for the next 2 threads - add rdi, 64 # rdi now points to the token + mov edx, [rdi+64+4] # load the futex mask + bt edx, esi # is the next thread waiting on a futex? + jc unlock_futex # unlock the futex if necessary + ret # if not we're done. .ALIGN 16 -.LRetryWake: - mov r11d, [rdi+4] # load the futex mask - and r11d, r9d # are any threads waiting on a futex? - jz .LDone # if not we're done. - # we have to wake the futexs - # rdi ARG1 futex (already in rdi) - mov esi, (10 | 128) # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE - mov edx, 0x7fffffff # rdx ARG3 INT_MAX (number of threads to wake) - xor r10d, r10d # r10 ARG4 NULL - mov r8, rdi # r8 ARG5 dup rdi - # r9 ARG6 mask (already set above) - mov eax, 202 # sys_futex - syscall - cmp eax, 1 # did we wake as many as we expected? 
- jnz .LRetryWake .LDone: - pop r11 + js fastlock_panic # panic if we made m_depth negative ret diff --git a/src/networking.cpp b/src/networking.cpp index 803edb5ac..00322df72 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3077,7 +3077,6 @@ int processEventsWhileBlocked(int iel) { } aeReleaseLock(); - serverAssertDebug(!GlobalLocksAcquired()); try { while (iterations--) { From 67eccf74bacb370425ac159313d30a942a9048d9 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 23:45:40 -0500 Subject: [PATCH 24/28] Change Redis to KeyDB Former-commit-id: 6ad6c1d780f26a0785f39586b074ac3bb3132e09 --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index 6fee88732..8da8e650b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4699,7 +4699,7 @@ void daemonize(void) { } void version(void) { - printf("Redis server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n", + printf("KeyDB server v=%s sha=%s:%d malloc=%s bits=%d build=%llx\n", KEYDB_REAL_VERSION, redisGitSHA1(), atoi(redisGitDirty()) > 0, From e8b92588902eb6ce1495b5c1272845ce55423de1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 26 Feb 2020 21:49:40 -0500 Subject: [PATCH 25/28] Fix crash propagating stale keys Former-commit-id: d95bead3837edeca11f27f6f344eca8174ca53e3 --- src/replication.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index ba2008c0a..d7e92d308 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -278,7 +278,7 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); /* Send it to slaves */ - addReply(replica,selectcmd); + addReplyAsync(replica,selectcmd); if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd); @@ -290,12 +290,12 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv,
int argc, bo * or are already in sync with the master. */ /* Add the multi bulk length. */ - addReplyArrayLen(replica,argc); + addReplyArrayLenAsync(replica,argc); /* Finally any additional argument that was not stored inside the * static buffer if any (from j to argc). */ for (int j = 0; j < argc; j++) - addReplyBulk(replica,argv[j]); + addReplyBulkAsync(replica,argv[j]); } /* Propagate write commands to slaves, and populate the replication backlog From 67d78e7808021693d99d22932d26704a5f877467 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 28 Feb 2020 21:21:05 -0500 Subject: [PATCH 26/28] Fix issue #146 Former-commit-id: e5a2d594464a965f36e4cbf924f02929265186e6 --- src/fastlock.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 4f8b2e6dc..788d84c97 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -335,7 +335,7 @@ extern "C" int fastlock_trylock(struct fastlock *lock, int fWeak) struct ticket ticket_expect { { { active, active } } }; struct ticket ticket_setiflocked { { { active, next } } }; - if (__atomic_compare_exchange(&lock->m_ticket, &ticket_expect, &ticket_setiflocked, fWeak /*weak*/, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) + if (__atomic_compare_exchange(&lock->m_ticket.u, &ticket_expect.u, &ticket_setiflocked.u, fWeak /*weak*/, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { lock->m_depth = 1; tid = gettid(); From f2c68909dcbbc90ae06e86edfda38db615e2dc1f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 28 Feb 2020 22:04:42 -0500 Subject: [PATCH 27/28] Fix not respecting max clients config, issue #147 Former-commit-id: 54c96d19954fbddcfd2a307e436ca3290c76e0fb --- src/server.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index 8da8e650b..837b1c38e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -5121,10 +5121,6 @@ int main(int argc, char **argv) { dictSetHashFunctionSeed((uint8_t*)hashseed); g_pserver->sentinel_mode = 
checkForSentinelMode(argc,argv); initServerConfig(); - for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) - { - initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN); - } serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; aeAcquireLock(); // We own the lock on boot @@ -5251,6 +5247,10 @@ int main(int argc, char **argv) { int background = cserver.daemonize && !cserver.supervised; if (background) daemonize(); + for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) + { + initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN); + } initServer(); initNetworking(cserver.cthreads > 1 /* fReusePort */); From 1d804ef499251b47c9e7740b46da397aca636c83 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 28 Feb 2020 23:49:17 -0500 Subject: [PATCH 28/28] Fix compile failure on raspberry pi machines, issue #141 Former-commit-id: aaca32a5cd1bdb0314b4a57847938854b8a4fef2 --- src/fastlock.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 788d84c97..49c0fb095 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -299,7 +299,7 @@ extern "C" void fastlock_lock(struct fastlock *lock) #if defined(__i386__) || defined(__amd64__) __asm__ __volatile__ ("pause"); -#elif defined(__arm__) +#elif defined(__aarch64__) __asm__ __volatile__ ("yield"); #endif if ((++cloops % 0x100000) == 0)