From 40eefe7da7406c8cd0b693981da035fb97ecbad3 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 11 Jul 2019 18:53:34 -0400 Subject: [PATCH 01/38] Release version 5 Former-commit-id: 25e18a7de91d4af651f8f22674846cb0238dd5d3 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index db32931f1..c83af2844 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "0.0.0" +#define KEYDB_REAL_VERSION "5.0.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 17e6131ca5941dce081dae8d4993e7a99ae095d0 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Jul 2019 22:31:02 -0400 Subject: [PATCH 02/38] Issue #64 RREPLAY isn't binary safe. Add fix and test. Former-commit-id: f1982ca63dc8dd85b62c1338d7be324595b6ad8e --- src/replication.cpp | 2 +- tests/integration/replication-active.tcl | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/replication.cpp b/src/replication.cpp index 14ee002aa..4118087c0 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3265,7 +3265,7 @@ void replicaReplayCommand(client *c) cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; - cFake->querybuf = sdscat(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); + cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); cFake->lock.unlock(); diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index dfb89f603..99e0dc006 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -49,6 +49,15 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { } } + test {Active replicas propogate binary} { + $master set binkey "\u0000foo" + wait_for_condition 50 500 { + [string match *foo* 
[$slave get binkey]] + } else { + fail "replication failed to propogate binary data" + } + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo From ce96c69719661a80babe3f6321e9116387f43c49 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Jul 2019 22:49:30 -0400 Subject: [PATCH 03/38] RREPLAY failures should be logged Former-commit-id: b0a0f03b96b44005e905f5d46985d76d52a712ec --- src/replication.cpp | 7 ++++++- src/server.cpp | 1 + src/server.h | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/replication.cpp b/src/replication.cpp index 4118087c0..461a2fd92 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3267,9 +3267,14 @@ void replicaReplayCommand(client *c) cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); + auto ccmdPrev = serverTL->commandsExecuted; processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); - addReply(c, shared.ok); + if (fExec) + addReply(c, shared.ok); + else + addReplyError(c, "command did not execute"); freeClient(cFake); serverTL->current_client = current_clientSave; diff --git a/src/server.cpp b/src/server.cpp index 4ee6922d2..1aca94717 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3348,6 +3348,7 @@ void call(client *c, int flags) { dirty = g_pserver->dirty; start = ustime(); c->cmd->proc(c); + serverTL->commandsExecuted++; duration = ustime()-start; dirty = g_pserver->dirty-dirty; if (dirty < 0) dirty = 0; diff --git a/src/server.h b/src/server.h index d057765f2..d2aa14637 100644 --- a/src/server.h +++ b/src/server.h @@ -1159,6 +1159,7 @@ struct redisServerThreadVars { client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ struct fastlock lockPendingWrite; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + long unsigned 
commandsExecuted = 0; }; struct redisMaster { From 8d36bab0e120cb2d184124f76b624e35aceb4a27 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 29 Jul 2019 15:08:41 -0400 Subject: [PATCH 04/38] Fix crash in RediSearch Former-commit-id: 6b6e6f6c1ef49f87f794de512489e5fbbfb67ca0 --- src/module.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/module.cpp b/src/module.cpp index 568205f90..8d935c0cc 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -3930,6 +3930,8 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { UNUSED(ctx); moduleAcquireGIL(FALSE /*fServerThread*/); + if (serverTL == nullptr) + serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context } /* Release the server lock after a thread safe API call was executed. */ From cb6abcf3fa141c9b1eab820daf49e723cd60b5bb Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 26 Aug 2019 20:18:52 -0400 Subject: [PATCH 05/38] Fix race condition in PUB/SUB and other async reply commands where the client can be freed before our handler is executed on the client thread. 
When this occurs the client pointer is dangling Former-commit-id: fad9483fc920e5b1fa67e56d4b8483138b565bd3 --- .vscode/settings.json | 56 ------------------------------ src/ae.cpp | 33 +++++++++++++++++- src/debug.cpp | 3 ++ src/networking.cpp | 79 +++++++++++++++++++++++++++++++++++-------- src/pubsub.cpp | 10 ++++++ src/server.h | 6 ++-- 6 files changed, 113 insertions(+), 74 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 56bf76d11..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "files.associations": { - "zmalloc.h": "c", - "stat.h": "c", - "array": "cpp", - "atomic": "cpp", - "*.tcc": "cpp", - "cctype": "cpp", - "chrono": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "condition_variable": "cpp", - "cstdarg": "cpp", - "cstddef": "cpp", - "cstdint": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "cwctype": "cpp", - "deque": "cpp", - "list": "cpp", - "unordered_map": "cpp", - "vector": "cpp", - "exception": "cpp", - "fstream": "cpp", - "functional": "cpp", - "future": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "limits": "cpp", - "memory": "cpp", - "mutex": "cpp", - "new": "cpp", - "numeric": "cpp", - "optional": "cpp", - "ostream": "cpp", - "ratio": "cpp", - "scoped_allocator": "cpp", - "sstream": "cpp", - "stdexcept": "cpp", - "streambuf": "cpp", - "string_view": "cpp", - "system_error": "cpp", - "thread": "cpp", - "cinttypes": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "typeinfo": "cpp", - "utility": "cpp" - } -} diff --git a/src/ae.cpp b/src/ae.cpp index 48d6107b7..f636078b1 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -191,6 +191,36 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) } } +// Unlike write() this is an all or nothing thing. 
We will block if a partial write is hit +ssize_t safe_write(int fd, const void *pv, size_t cb) +{ + const char *pcb = (const char*)pv; + ssize_t written = 0; + do + { + ssize_t rval = write(fd, pcb, cb); + if (rval > 0) + { + pcb += rval; + cb -= rval; + written += rval; + } + else if (errno == EAGAIN) + { + if (written == 0) + break; + // if we've already written something then we're committed so keep trying + } + else + { + if (rval == 0) + return written; + return rval; + } + } while (cb); + return written; +} + int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData, int fSynchronous) { @@ -212,9 +242,10 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); if (fSynchronous) cmd.pctl->mutexcv.lock(); - auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) { + AE_ASSERT(size == sizeof(cmd) || size <= 0); AE_ASSERT(errno == EAGAIN); ret = AE_ERR; } diff --git a/src/debug.cpp b/src/debug.cpp index 2fbbd9e2f..2ded21ea4 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -55,6 +55,8 @@ typedef ucontext_t sigcontext_t; #endif #endif +bool g_fInCrash = false; + /* ================================= Debugging ============================== */ /* Compute the sha1 of string at 's' with 'len' bytes long. 
@@ -1356,6 +1358,7 @@ void dumpX86Calls(void *addr, size_t len) { void sigsegvHandler(int sig, siginfo_t *info, void *secret) { ucontext_t *uc = (ucontext_t*) secret; + g_fInCrash = true; void *eip = getMcontextEip(uc); sds infostring, clients; struct sigaction act; diff --git a/src/networking.cpp b/src/networking.cpp index 174c21ff5..baaf796fd 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -174,6 +174,7 @@ client *createClient(int fd, int iel) { c->bufAsync = NULL; c->buflenAsync = 0; c->bufposAsync = 0; + c->casyncOpsPending = 0; memset(c->uuid, 0, UUID_BINARY_LEN); listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); @@ -1003,7 +1004,6 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { serverLog(LL_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); - close(fd); /* May be already closed, just ignore errors */ return; } @@ -1266,7 +1266,7 @@ void unlinkClient(client *c) { } } -void freeClient(client *c) { +bool freeClient(client *c) { listNode *ln; serverAssert(c->fd == -1 || GlobalLocksAcquired()); AssertCorrectThread(c); @@ -1274,9 +1274,9 @@ void freeClient(client *c) { /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ - if (c->flags & CLIENT_PROTECTED) { + if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending) { freeClientAsync(c); - return; + return false; } /* If it is our master that's beging disconnected we should make sure @@ -1291,7 +1291,7 @@ void freeClient(client *c) { CLIENT_BLOCKED))) { replicationCacheMaster(MasterInfoFromClient(c), c); - return; + return false; } } @@ -1370,6 +1370,7 @@ void freeClient(client *c) { ulock.unlock(); fastlock_free(&c->lock); zfree(c); + return true; } /* Schedule a client to free it at a safe time in the serverCron() function. @@ -1382,28 +1383,37 @@ void freeClientAsync(client *c) { * may access the list while Redis uses I/O threads. 
All the other accesses * are in the context of the main thread while the other threads are * idle. */ - if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; + if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; // check without the lock first std::lock_guardlock)> clientlock(c->lock); AeLocker lock; lock.arm(c); + if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; // race condition after we acquire the lock c->flags |= CLIENT_CLOSE_ASAP; listAddNodeTail(g_pserver->clients_to_close,c); } void freeClientsInAsyncFreeQueue(int iel) { + serverAssert(GlobalLocksAcquired()); listIter li; listNode *ln; listRewind(g_pserver->clients_to_close,&li); - while((ln = listNext(&li))) { + // Store the clients in a temp vector since freeClient will modify this list + std::vector vecclientsFree; + while((ln = listNext(&li))) + { client *c = (client*)listNodeValue(ln); - if (c->iel != iel) - continue; // wrong thread + if (c->iel == iel) + { + vecclientsFree.push_back(c); + listDelNode(g_pserver->clients_to_close, ln); + } + } + for (client *c : vecclientsFree) + { c->flags &= ~CLIENT_CLOSE_ASAP; freeClient(c); - listDelNode(g_pserver->clients_to_close,ln); - listRewind(g_pserver->clients_to_close,&li); } } @@ -1551,6 +1561,15 @@ void ProcessPendingAsyncWrites() std::lock_guardlock)> lock(c->lock); serverAssert(c->fPendingAsyncWrite); + if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) + { + c->bufposAsync = 0; + c->buflenAsync = 0; + zfree(c->bufAsync); + c->bufAsync = nullptr; + c->fPendingAsyncWrite = FALSE; + continue; + } // TODO: Append to end of reply block? 
@@ -1587,8 +1606,36 @@ void ProcessPendingAsyncWrites() continue; asyncCloseClientOnOutputBufferLimitReached(c); - if (aeCreateRemoteFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR) - continue; // We can retry later in the cron + if (c->flags & CLIENT_CLOSE_ASAP) + continue; // we will never write this so don't post an op + + std::atomic_thread_fence(std::memory_order_seq_cst); + + if (c->casyncOpsPending == 0) + { + if (FCorrectThread(c)) + { + prepareClientToWrite(c, false); // queue an event + } + else + { + // We need to start the write on the client's thread + if (aePostFunction(g_pserver->rgthreadvar[c->iel].el, [c]{ + // Install a write handler. Don't do the actual write here since we don't want + // to duplicate the throttling and safety mechanisms of the normal write code + std::lock_guardlock)> lock(c->lock); + serverAssert(c->casyncOpsPending > 0); + c->casyncOpsPending--; + aeCreateFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, AE_WRITABLE|AE_WRITE_THREADSAFE, sendReplyToClient, c); + }, false) == AE_ERR + ) + { + // Posting the function failed + continue; // We can retry later in the cron + } + ++c->casyncOpsPending; // race is handled by the client lock in the lambda + } + } } } @@ -1628,13 +1675,15 @@ int handleClientsWithPendingWrites(int iel) { std::unique_locklock)> lock(c->lock); /* Try to write buffers to the client socket. 
*/ - if (writeToClient(c->fd,c,0) == C_ERR) { + if (writeToClient(c->fd,c,0) == C_ERR) + { if (c->flags & CLIENT_CLOSE_ASAP) { lock.release(); // still locked AeLocker ae; ae.arm(c); - freeClient(c); // writeToClient will only async close, but there's no need to wait + if (!freeClient(c)) // writeToClient will only async close, but there's no need to wait + c->lock.unlock(); // if we just got put on the async close list, then we need to remove the lock } continue; } diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 6a9c2bdfc..46677487f 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -143,6 +143,8 @@ int clientSubscriptionsCount(client *c) { /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(client *c, robj *channel) { + serverAssert(GlobalLocksAcquired()); + serverAssert(c->lock.fOwnLock()); dictEntry *de; list *clients = NULL; int retval = 0; @@ -202,6 +204,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(client *c, robj *pattern) { + serverAssert(GlobalLocksAcquired()); int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { @@ -244,6 +247,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { /* Unsubscribe from all the channels. Return the number of channels the * client was subscribed to. */ int pubsubUnsubscribeAllChannels(client *c, int notify) { + serverAssert(GlobalLocksAcquired()); dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; int count = 0; @@ -262,6 +266,7 @@ int pubsubUnsubscribeAllChannels(client *c, int notify) { /* Unsubscribe from all the patterns. Return the number of patterns the * client was subscribed from. 
*/ int pubsubUnsubscribeAllPatterns(client *c, int notify) { + serverAssert(GlobalLocksAcquired()); listNode *ln; listIter li; int count = 0; @@ -278,6 +283,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { /* Publish a message */ int pubsubPublishMessage(robj *channel, robj *message) { + serverAssert(GlobalLocksAcquired()); int receivers = 0; dictEntry *de; listNode *ln; @@ -293,6 +299,8 @@ int pubsubPublishMessage(robj *channel, robj *message) { listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = reinterpret_cast(ln->value); + if (c->flags & CLIENT_CLOSE_ASAP) // avoid blocking if the write will be ignored + continue; fastlock_lock(&c->lock); addReplyPubsubMessage(c,channel,message); fastlock_unlock(&c->lock); @@ -311,6 +319,8 @@ int pubsubPublishMessage(robj *channel, robj *message) { (char*)ptrFromObj(channel), sdslen(szFromObj(channel)),0)) { + if (pat->pclient->flags & CLIENT_CLOSE_ASAP) + continue; fastlock_lock(&pat->pclient->lock); addReplyPubsubPatMessage(pat->pclient, pat->pattern,channel,message); diff --git a/src/server.h b/src/server.h index d2aa14637..23f0d7aa0 100644 --- a/src/server.h +++ b/src/server.h @@ -925,6 +925,7 @@ typedef struct client { time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; std::atomic flags; /* Client flags: CLIENT_* macros. */ + int casyncOpsPending; int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. 
*/ @@ -1694,7 +1695,7 @@ void redisSetProcTitle(const char *title); /* networking.c -- Networking and Client related operations */ client *createClient(int fd, int iel); void closeTimedoutClients(void); -void freeClient(client *c); +bool freeClient(client *c); void freeClientAsync(client *c); void resetClient(client *c); void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); @@ -2503,9 +2504,10 @@ void xorDigest(unsigned char *digest, const void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags); int moduleGILAcquiredByModule(void); +extern bool g_fInCrash; static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate { - return aeThreadOwnsLock() || moduleGILAcquiredByModule(); + return aeThreadOwnsLock() || moduleGILAcquiredByModule() || g_fInCrash; } inline int ielFromEventLoop(const aeEventLoop *eventLoop) From a6102185358b968d98fd7ac5eddda9162d72e70a Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 27 Aug 2019 15:32:24 -0400 Subject: [PATCH 06/38] Bump version Former-commit-id: bf905a78f0982cf4e13a4d9bbd9fa0166233b245 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index c83af2844..db7af9930 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.0.0" +#define KEYDB_REAL_VERSION "5.0.1" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From afa69ddcbf91d39d04afcdca0a767b4b70b826a8 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 30 Jul 2019 16:54:25 -0400 Subject: [PATCH 07/38] Acquire the lock for modules that don't acquire it before calling like they are supposed to Former-commit-id: f83a89f82a30d4edbd8068172bc54e0f1fe0cc25 --- src/module.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/module.cpp b/src/module.cpp index 8d935c0cc..f071ae4e1 
100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -3920,7 +3920,9 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { /* Release a thread safe context. */ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { + moduleAcquireGIL(false /*fServerThread*/); moduleFreeContext(ctx); + moduleReleaseGIL(false /*fServerThread*/); zfree(ctx); } From 216636c1341aa88a5ab68b3b82a0adcfdbac0e0b Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 6 Sep 2019 13:30:15 -0400 Subject: [PATCH 08/38] Fix issue where Active Replicas were committing data to the wrong database under load Former-commit-id: 50a6a3ca389aef3d8f970faef5336f7053cf4cc5 --- src/replication.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/replication.cpp b/src/replication.cpp index 461a2fd92..52ba568a6 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3272,9 +3272,14 @@ void replicaReplayCommand(client *c) bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); if (fExec) + { addReply(c, shared.ok); + selectDb(c, cFake->db->id); + } else + { addReplyError(c, "command did not execute"); + } freeClient(cFake); serverTL->current_client = current_clientSave; From 26c02f47fbf3208d8328a26633b3602bd90e54ec Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 6 Sep 2019 13:31:40 -0400 Subject: [PATCH 09/38] bump version Former-commit-id: 71d5a10621b762fccec10d38a72b0535f8f16502 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index db7af9930..4ea159eb1 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.0.1" +#define KEYDB_REAL_VERSION "5.0.2" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From e62e4ed6c3a9b0d6e2c83d72972173aa53d4d904 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Sep 2019 15:39:52 -0400 Subject: [PATCH 10/38] Fix issue where AOF events are posted to the wrong event loop
and not properly cleaned up Former-commit-id: 4589e861ab79992802cfa26ba06693996d75835c --- src/aof.cpp | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index c7160489b..69e32fc3b 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -97,6 +97,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { aofrwblock *block; ssize_t nwritten; serverAssert(GlobalLocksAcquired()); + serverAssert(el == g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el); // MUST run on main thread UNUSED(el); UNUSED(fd); @@ -164,10 +165,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (aeGetFileEvents(serverTL->el,g_pserver->aof_pipe_write_data_to_child) == 0) { - aeCreateFileEvent(serverTL->el, g_pserver->aof_pipe_write_data_to_child, - AE_WRITABLE, aofChildWriteDiffData, NULL); - } + aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL, FALSE); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -1508,7 +1506,7 @@ void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Remove the handler since this can be called only one time during a * rewrite. */ - aeDeleteFileEventAsync(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); + aeDeleteFileEvent(el,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); } /* Create the pipes used for parent - child process IPC during rewrite. 
@@ -1546,12 +1544,20 @@ error: } void aofClosePipes(void) { - aeDeleteFileEventAsync(g_pserver->el_alf_pip_read_ack_from_child,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); - aeDeleteFileEventAsync(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,g_pserver->aof_pipe_write_data_to_child,AE_WRITABLE); - close(g_pserver->aof_pipe_write_data_to_child); + int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; + aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); + close (fdAofAckPipe); + }); + + int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); + close(fdAofWritePipe); + }); + close(g_pserver->aof_pipe_read_data_from_parent); close(g_pserver->aof_pipe_write_ack_to_parent); - close(g_pserver->aof_pipe_read_ack_from_child); close(g_pserver->aof_pipe_write_ack_to_child); close(g_pserver->aof_pipe_read_ack_from_parent); } From acab05cf08dfe19df8cc1ba9aa8258afa6a2b3cc Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 1 Oct 2019 17:53:48 -0400 Subject: [PATCH 11/38] Bump version Former-commit-id: 0a0c2146c88417baf8f905aaab7f1b45f4d7e46d --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index 4ea159eb1..d0a152517 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.0.2" +#define KEYDB_REAL_VERSION "5.1.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 6de2f0779aa425252df44c63f0e56a5ff11a7615 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 21 Oct 2019 12:21:46 -0400 Subject: [PATCH 12/38] Disable multithreaded KEYS due to bugs Former-commit-id: 3fac516950e831129da856f32fa373a56a6268a1 --- src/db.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git 
a/src/db.cpp b/src/db.cpp index 0d5042bab..f34245964 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -643,7 +643,9 @@ void keysCommand(client *c) { unsigned long numkeys = 0; void *replylen = addReplyDeferredLen(c); +#ifdef MULTITHREADED_KEYS aeReleaseLock(); +#endif di = dictGetSafeIterator(c->db->pdict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); @@ -663,11 +665,13 @@ void keysCommand(client *c) { dictReleaseIterator(di); setDeferredArrayLen(c,replylen,numkeys); +#ifdef MULTITHREADED_KEYS fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks AeLocker lock; lock.arm(c); fastlock_lock(&c->db->lock); // we still need the DB lock lock.release(); +#endif } /* This callback is used by scanGenericCommand in order to collect elements From c276b86feafc288344b95f5561ec739b6f58e062 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Oct 2019 01:00:28 -0400 Subject: [PATCH 13/38] bump version Former-commit-id: 3013799664932ce95a22425d13ab7203add07b52 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index d0a152517..60b1425cd 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.1.0" +#define KEYDB_REAL_VERSION "5.1.1" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 1b3cbd056f1d450ce8bfcda289ac854b3fe7646c Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Oct 2019 02:44:14 -0400 Subject: [PATCH 14/38] Fix potential race in pubsub Former-commit-id: a91f58b8fd5d267760d504b024068d0132569ae9 --- src/pubsub.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 46677487f..0c657a588 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -301,9 +301,11 @@ int pubsubPublishMessage(robj *channel, robj *message) { client *c = reinterpret_cast(ln->value); if (c->flags & CLIENT_CLOSE_ASAP) // 
avoid blocking if the write will be ignored continue; - fastlock_lock(&c->lock); + if (FCorrectThread(c)) + fastlock_lock(&c->lock); addReplyPubsubMessage(c,channel,message); - fastlock_unlock(&c->lock); + if (FCorrectThread(c)) + fastlock_unlock(&c->lock); receivers++; } } @@ -321,10 +323,12 @@ int pubsubPublishMessage(robj *channel, robj *message) { { if (pat->pclient->flags & CLIENT_CLOSE_ASAP) continue; - fastlock_lock(&pat->pclient->lock); + if (FCorrectThread(pat->pclient)) + fastlock_lock(&pat->pclient->lock); addReplyPubsubPatMessage(pat->pclient, pat->pattern,channel,message); - fastlock_unlock(&pat->pclient->lock); + if (FCorrectThread(pat->pclient)) + fastlock_unlock(&pat->pclient->lock); receivers++; } } From 032e5e4dc5ba79e8bd6e863ba34c4c7686a40c70 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 21 Nov 2019 20:16:42 -0500 Subject: [PATCH 15/38] Bump version Former-commit-id: 7dd53182e071641ff32813e5ba4b0e3fabe72abc --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index 60b1425cd..b2bbc88d1 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.1.1" +#define KEYDB_REAL_VERSION "5.2.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 2d5197d30eedc6533c78678a9a7f200be157a490 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 6 Jan 2020 12:07:57 -0500 Subject: [PATCH 16/38] Bump version Former-commit-id: 39acb312efbb06f38e98c3ebbb98d17a556b050a --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index b2bbc88d1..79edddc49 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.2.0" +#define KEYDB_REAL_VERSION "5.3.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 6e096ca33e877c0f4fc4612371e257aa61892167 Mon Sep 17 00:00:00 2001 From: John 
Sully Date: Sat, 11 Jan 2020 16:34:09 -0500 Subject: [PATCH 17/38] Avoid crash due to excessive posted functions for AOF rewrite Former-commit-id: c575e7df9408ad7bd66ac7a104a38e841d525681 --- src/aof.cpp | 12 ++++++++---- src/server.h | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index d48b664d2..b82be9a34 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -165,10 +165,14 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + } } /* Write the buffer (possibly composed of multiple blocks) into the specified diff --git a/src/server.h b/src/server.h index 5733264e0..6b8903092 100644 --- a/src/server.h +++ b/src/server.h @@ -1733,6 +1733,7 @@ struct redisServer { int aof_stop_sending_diff; /* If true stop sending accumulated diffs to child process. */ sds aof_child_diff; /* AOF diff accumulator child side. */ + int aof_rewrite_pending = 0; /* is a call to aofChildWriteDiffData already queued? 
*/ /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ From 5f3fde673b96bb9b1cb426765dd6516359c2eecb Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 20 Jan 2020 19:20:50 -0500 Subject: [PATCH 18/38] Bump version Former-commit-id: f4319c12803f27a93150178f8b61f10aea09ce01 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index 79edddc49..b20005ade 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.3.0" +#define KEYDB_REAL_VERSION "5.3.1" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From b69a8826fa189c1e3614fdeec8acf9feafb5cd7f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 28 Feb 2020 23:53:54 -0500 Subject: [PATCH 19/38] Bump version Former-commit-id: 3ce6441a0c0f5c58f31f5fa28116cea09aebbf35 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index b20005ade..751337aaa 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.3.1" +#define KEYDB_REAL_VERSION "5.3.2" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 591cc30206965307c09f0bc1f8c1fbefe53eee4c Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 4 Mar 2020 17:23:40 -0500 Subject: [PATCH 20/38] Fix CLANG build break Former-commit-id: e523afa7410399697659106c88e9f65e2cffae79 --- src/server.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.h b/src/server.h index 1e6632576..94d8a0d11 100644 --- a/src/server.h +++ b/src/server.h @@ -55,6 +55,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include From 96be5f6a6c7a6787d4864a1d555ed98f09f6cad2 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 14:34:23 -0400 Subject: [PATCH 21/38] Prevent 
dangling lock when we can't free the client Former-commit-id: 3c373494d63b21744b264f0a47e6999bcdda6b2b --- src/networking.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 58bd79bcc..f6425af60 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1630,10 +1630,13 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { if (writeToClient(fd,c,1) == C_ERR) { AeLocker ae; - c->lock.lock(); + c->lock(); ae.arm(c); if (c->flags & CLIENT_CLOSE_ASAP) - freeClient(c); + { + if (!freeClient(c)) + c->unlock(); + } } } From 1b9ef774da21139112f878b7dd71848528835628 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 15:28:39 -0400 Subject: [PATCH 22/38] Fix lock inversion in processEventsWhileBlocked Former-commit-id: a9249d4a82a0f0355ac8ffa40b34b9c14cabf66b --- src/networking.cpp | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index f6425af60..94b202c17 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1630,12 +1630,12 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { if (writeToClient(fd,c,1) == C_ERR) { AeLocker ae; - c->lock(); + c->lock.lock(); ae.arm(c); if (c->flags & CLIENT_CLOSE_ASAP) { if (!freeClient(c)) - c->unlock(); + c->lock.unlock(); } } } @@ -3107,12 +3107,23 @@ int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. 
*/ int count = 0; - client *c = serverTL->current_client; - if (c != nullptr) + std::vector vecclients; + listIter li; + listNode *ln; + listRewind(g_pserver->clients, &li); + + // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks + // so unlock here, and save them for reacquisition later + while ((ln = listNext(&li)) != nullptr) { - serverAssert(c->flags & CLIENT_PROTECTED); - c->lock.unlock(); + client *c = (client*)listNodeValue(ln); + if (c->lock.fOwnLock()) { + serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop + c->lock.unlock(); + vecclients.push_back(c); + } } + aeReleaseLock(); try @@ -3129,18 +3140,18 @@ int processEventsWhileBlocked(int iel) { { // Caller expects us to be locked so fix and rethrow AeLocker locker; - if (c != nullptr) - c->lock.lock(); - locker.arm(c); + locker.arm(nullptr); locker.release(); + for (client *c : vecclients) + c->lock.lock(); throw; } AeLocker locker; - if (c != nullptr) - c->lock.lock(); - locker.arm(c); + locker.arm(nullptr); locker.release(); + for (client *c : vecclients) + c->lock.lock(); return count; } From 833f5b39f45aee3a23ad3d67000dc2013dbb9d9b Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 15:37:24 -0400 Subject: [PATCH 23/38] Log which thread a message came from Former-commit-id: bc1eccb66d3302d6c99588fb4a5a879e1ef243b1 --- src/server.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index d6fe1ba1f..22bca3963 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1037,6 +1037,7 @@ struct redisCommand redisCommandTable[] = { /* We use a private localtime implementation which is fork-safe. The logging * function of Redis may be called from other threads. */ extern "C" void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); +extern "C" pid_t gettid(); /* Low level logging. 
To use only for very big messages, otherwise * serverLog() is to prefer. */ @@ -1074,8 +1075,8 @@ void serverLogRaw(int level, const char *msg) { } else { role_char = (listLength(g_pserver->masters) ? 'S':'M'); /* Slave or Master. */ } - fprintf(fp,"%d:%c %s %c %s\n", - (int)getpid(),role_char, buf,c[level],msg); + fprintf(fp,"%d:%d:%c %s %c %s\n", + (int)getpid(),(int)gettid(),role_char, buf,c[level],msg); } fflush(fp); From 4f55afa9483577578f131393ac06705cc206f435 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 15:37:49 -0400 Subject: [PATCH 24/38] Print stack traces of other threads in the deadlock detector Former-commit-id: 6b0172b9bf124372f4c8688c91c380c6c1b607c7 --- src/fastlock.cpp | 92 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 26 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 49c0fb095..861bfb7ca 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -43,6 +43,7 @@ #include #include #include +#include "config.h" #ifdef __APPLE__ #include @@ -60,6 +61,11 @@ #define UNUSED(x) ((void)x) #endif +#ifdef HAVE_BACKTRACE +#include +__attribute__((weak)) void logStackTrace(ucontext_t *) {} +#endif + extern int g_fInCrash; /**************************************************** @@ -149,6 +155,43 @@ __attribute__((weak)) void serverLog(int , const char *fmt, ...) 
printf("\n"); } +extern "C" pid_t gettid() +{ + static thread_local int pidCache = -1; +#ifdef __linux__ + if (pidCache == -1) + pidCache = syscall(SYS_gettid); +#else + if (pidCache == -1) { + uint64_t tidT; + pthread_threadid_np(nullptr, &tidT); + assert(tidT < UINT_MAX); + pidCache = (int)tidT; + } +#endif + return pidCache; +} + +void printTrace() +{ +#ifdef HAVE_BACKTRACE + serverLog(3 /*LL_WARNING*/, "printing backtrace for thread %d", gettid()); + ucontext_t ctxt; + getcontext(&ctxt); + logStackTrace(&ctxt); +#endif +} + + +#ifdef __linux__ +static int futex(volatile unsigned *uaddr, int futex_op, int val, + const struct timespec *timeout, int val3) +{ + return syscall(SYS_futex, uaddr, futex_op, val, + timeout, uaddr, val3); +} +#endif + class DeadlockDetector { std::map m_mapwait; @@ -156,9 +199,19 @@ class DeadlockDetector public: void registerwait(fastlock *lock, pid_t thispid) { + static volatile bool fInDeadlock = false; + if (lock == &m_lock || g_fInCrash) return; fastlock_lock(&m_lock); + + if (fInDeadlock) + { + printTrace(); + fastlock_unlock(&m_lock); + return; + } + m_mapwait.insert(std::make_pair(thispid, lock)); // Detect cycles @@ -184,6 +237,19 @@ public: if (pidCheck == thispid) break; } + // Wake All sleeping threads so they can print their callstacks +#ifdef HAVE_BACKTRACE +#ifdef __linux__ + int mask = -1; + fInDeadlock = true; + fastlock_unlock(&m_lock); + futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask); + futex(&itr->second->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask); + sleep(2); + fastlock_lock(&m_lock); + printTrace(); +#endif +#endif serverLog(3 /*LL_WARNING*/, "!!! 
KeyDB Will Now Crash !!!"); _serverPanic(__FILE__, __LINE__, "Deadlock detected"); } @@ -222,32 +288,6 @@ uint64_t fastlock_getlongwaitcount() return rval; } -#ifdef __linux__ -static int futex(volatile unsigned *uaddr, int futex_op, int val, - const struct timespec *timeout, int val3) -{ - return syscall(SYS_futex, uaddr, futex_op, val, - timeout, uaddr, val3); -} -#endif - -extern "C" pid_t gettid() -{ - static thread_local int pidCache = -1; -#ifdef __linux__ - if (pidCache == -1) - pidCache = syscall(SYS_gettid); -#else - if (pidCache == -1) { - uint64_t tidT; - pthread_threadid_np(nullptr, &tidT); - assert(tidT < UINT_MAX); - pidCache = (int)tidT; - } -#endif - return pidCache; -} - extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask) { #ifdef __linux__ From 2b199b384dc68482a6c443a9e46b8d10befc0b83 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 15:52:24 -0400 Subject: [PATCH 25/38] Unstable branch should always be version 0.0.0 Former-commit-id: 448808529ab9e9e4705c2e7cbda5b3e6523b7c6f --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index 751337aaa..db32931f1 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.3.2" +#define KEYDB_REAL_VERSION "0.0.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From b695c0279ea2be26b4c913adaaf25342f2ae5cb5 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 23 Mar 2020 18:06:47 -0400 Subject: [PATCH 26/38] Prevent bookkeeping keys from leaking through Former-commit-id: 1a5af28e115fc123bf250b4a80ac69232bb3add0 From 28c632729ec6bc882bb7f4e9db6520aec2de6b88 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 23 Mar 2020 19:47:48 -0400 Subject: [PATCH 27/38] Fix incorrect count when loading FLASH Former-commit-id: 38ee9c0df144621f7ca750527e9efb16e754ef40 From f0e85993adb0ff69fbbca854c1e676847796db9d Mon Sep 17 00:00:00 
2001 From: John Sully Date: Mon, 23 Mar 2020 22:51:46 -0400 Subject: [PATCH 28/38] Fix incorrect prefix comparison Former-commit-id: 1ef167546be0678edd457d65a5368e8706fde0a3 From ad8a61697baf2b92c70b89f3677666e01aa15009 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 23 Mar 2020 23:12:10 -0400 Subject: [PATCH 29/38] Fix OOM errors during forkless bgsave Former-commit-id: c31c64b13409c741e8d52ad06add78300c39fce2 --- src/evict.cpp | 33 ++++++++++++++++++++++----------- src/server.h | 2 +- src/snapshot.cpp | 4 ++-- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/src/evict.cpp b/src/evict.cpp index ddf8e1818..060440d25 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -429,9 +429,12 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * to subtract the slaves output buffers. We can just return ASAP. */ mem_reported = zmalloc_used_memory(); if (total) *total = mem_reported; + size_t maxmemory = g_pserver->maxmemory; + if (g_pserver->FRdbSaveInProgress()) + maxmemory *= 2; /* We may return ASAP if there is no need to compute the level. */ - int return_ok_asap = !g_pserver->maxmemory || mem_reported <= g_pserver->maxmemory; + int return_ok_asap = !maxmemory || mem_reported <= maxmemory; if (return_ok_asap && !level) return C_OK; /* Remove the size of slaves output buffers and AOF buffer from the @@ -442,20 +445,20 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev /* Compute the ratio of memory usage. */ if (level) { - if (!g_pserver->maxmemory) { + if (!maxmemory) { *level = 0; } else { - *level = (float)mem_used / (float)g_pserver->maxmemory; + *level = (float)mem_used / (float)maxmemory; } } if (return_ok_asap) return C_OK; /* Check if we are still over the memory limit. */ - if (mem_used <= g_pserver->maxmemory) return C_OK; + if (mem_used <= maxmemory) return C_OK; /* Compute how much memory we need to free. 
*/ - mem_tofree = mem_used - g_pserver->maxmemory; + mem_tofree = mem_used - maxmemory; if (logical) *logical = mem_used; if (tofree) *tofree = mem_tofree; @@ -483,6 +486,8 @@ int freeMemoryIfNeeded(void) { mstime_t latency, eviction_latency; long long delta; int slaves = listLength(g_pserver->slaves); + const bool fEvictToStorage = !cserver.delete_on_evict && g_pserver->db[0]->FStorageProvider(); + /* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of @@ -503,6 +508,7 @@ int freeMemoryIfNeeded(void) { sds bestkey = NULL; int bestdbid; redisDb *db; + bool fFallback = false; if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) @@ -534,7 +540,9 @@ int freeMemoryIfNeeded(void) { /* Go backward from best to worst element to evict. */ for (k = EVPOOL_SIZE-1; k >= 0; k--) { - if (pool[k].key == NULL) continue; + if (pool[k].key == NULL) { + continue; + } bestdbid = pool[k].dbid; sds key = nullptr; @@ -558,11 +566,14 @@ int freeMemoryIfNeeded(void) { } } } + if (bestkey == nullptr && fEvictToStorage) + fFallback = true; } /* volatile-random and allkeys-random policy */ - else if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || - g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) + if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || + g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM + || fEvictToStorage) { /* When evicting a random key, we try to evict a key for * each DB, so we use the static 'next_db' variable to @@ -570,10 +581,10 @@ int freeMemoryIfNeeded(void) { for (i = 0; i < cserver.dbnum; i++) { j = (++next_db) % cserver.dbnum; db = g_pserver->db[j]; - if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) + if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || fFallback) { if (db->size() != 0) { - auto itr = db->random_cache_threadsafe(); + auto itr 
= db->random_cache_threadsafe(true /*fPrimaryOnly*/); // primary only because we can't evict a snapshot key bestkey = itr.key(); bestdbid = j; break; @@ -595,7 +606,7 @@ int freeMemoryIfNeeded(void) { if (bestkey) { db = g_pserver->db[bestdbid]; - if (!cserver.delete_on_evict && db->FStorageProvider()) + if (fEvictToStorage) { // This key is in the storage so we only need to free the object delta = (long long) zmalloc_used_memory(); diff --git a/src/server.h b/src/server.h index cb5ce5a3b..7fdee9aac 100644 --- a/src/server.h +++ b/src/server.h @@ -1380,7 +1380,7 @@ public: using redisDbPersistentData::endSnapshotAsync; using redisDbPersistentData::end; - dict_iter random_cache_threadsafe() const; + dict_iter random_cache_threadsafe(bool fPrimaryOnly = false) const; dict_iter find_cached_threadsafe(const char *key) const; expireEntry *getExpire(robj_roptr key) { return getExpire(szFromObj(key)); } diff --git a/src/snapshot.cpp b/src/snapshot.cpp index ba8875ddd..a5a6830a2 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -333,11 +333,11 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn serverAssert(sizeStart == size()); } -dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe() const +dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const { if (size() == 0) return dict_iter(nullptr); - if (m_pdbSnapshot != nullptr && m_pdbSnapshot->size() > 0) + if (!fPrimaryOnly && m_pdbSnapshot != nullptr && m_pdbSnapshot->size() > 0) { dict_iter iter(nullptr); double pctInSnapshot = (double)m_pdbSnapshot->size() / (size() + m_pdbSnapshot->size()); From aed3d33499bf7e5ce19d7c3b0163e95cdcbc2353 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 24 Mar 2020 00:21:12 -0400 Subject: [PATCH 30/38] Prevent issue where count can be out of sync temporarily, causing crashes where we expect the count to be perfect Former-commit-id: 77c9f36413c6f0cbb0b13a7ec746746c97faadcd --- src/aof.cpp | 4 ++-- 
src/db.cpp | 18 ++++++------------ src/rdb.cpp | 4 ++-- src/server.cpp | 17 ++++++++--------- src/server.h | 19 +++++++++---------- 5 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index b9e70734e..cb01f3978 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -899,8 +899,8 @@ int loadAppendOnlyFile(char *filename) { loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ for (int idb = 0; idb < cserver.dbnum; ++idb) { - auto vec = g_pserver->db[idb]->processChanges(); - g_pserver->db[idb]->commitChanges(vec); + g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(); } fclose(fp); freeFakeClient(fakeClient); diff --git a/src/db.cpp b/src/db.cpp index 9adf8cd11..bf256edc4 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2257,13 +2257,12 @@ void redisDbPersistentData::storeDatabase() dictReleaseIterator(di); } -redisDbPersistentData::changelist redisDbPersistentData::processChanges() +void redisDbPersistentData::processChanges() { serverAssert(GlobalLocksAcquired()); --m_fTrackingChanges; serverAssert(m_fTrackingChanges >= 0); - changelist vecRet; if (m_spstorage != nullptr) { @@ -2285,23 +2284,18 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() continue; robj *o = (robj*)dictGetVal(de); sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); - vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); + m_spstorage->insert(change.strkey.get(), sdslen(change.strkey.get()), temp, sdslen(temp), change.fUpdate); + sdsfree(temp); } } m_setchanged.clear(); m_cnewKeysPending = 0; } } - - return vecRet; } -void redisDbPersistentData::commitChanges(const changelist &vec) +void redisDbPersistentData::commitChanges() { - for (auto &pair : vec) - { - m_spstorage->insert(pair.first.strkey.get(), sdslen(pair.first.strkey.get()), pair.second.get(), sdslen(pair.second.get()), pair.first.fUpdate); - } if (m_spstorage != nullptr) m_spstorage->endWriteBatch(); } @@ 
-2379,8 +2373,8 @@ void redisDbPersistentData::removeAllCachedValues() // First we have to flush the tracked changes if (m_fTrackingChanges) { - auto vec = processChanges(); - commitChanges(vec); + processChanges(); + commitChanges(); trackChanges(false); } diff --git a/src/rdb.cpp b/src/rdb.cpp index e350adf15..05180c7ed 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2534,8 +2534,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { for (int idb = 0; idb < cserver.dbnum; ++idb) { - auto vec = g_pserver->db[idb]->processChanges(); - g_pserver->db[idb]->commitChanges(vec); + g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(); } return C_OK; diff --git a/src/server.cpp b/src/server.cpp index bb5f47a80..dbcb4f542 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2339,21 +2339,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) { static thread_local bool fFirstRun = true; // note: we also copy the DB pointer in case a DB swap is done while the lock is released - std::vector> vecchanges; + std::vector vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released if (!fFirstRun) { - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - auto vec = g_pserver->db[idb]->processChanges(); - vecchanges.emplace_back(g_pserver->db[idb], std::move(vec)); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + vecdb.push_back(g_pserver->db[idb]); + g_pserver->db[idb]->processChanges(); } - } - else { + } else { fFirstRun = false; } aeReleaseLock(); - for (auto &pair : vecchanges) - pair.first->commitChanges(pair.second); + for (redisDb *db : vecdb) + db->commitChanges(); + handleClientsWithPendingWrites(iel); if (serverTL->gcEpoch != 0) diff --git a/src/server.h b/src/server.h index 7fdee9aac..7f1953098 100644 --- a/src/server.h +++ b/src/server.h @@ -1293,16 +1293,8 @@ public: // to allow you to release the global lock before commiting. 
To prevent deadlocks you *must* // either release the global lock or keep the same global lock between the two functions as // a second look is kept to ensure writes to secondary storage are ordered - struct changedesc - { - sdsimmutablestring strkey; - bool fUpdate; - - changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} - }; - typedef std::vector> changelist; - changelist processChanges(); - void commitChanges(const changelist &vec); + void processChanges(); + void commitChanges(); // This should only be used if you look at the key, we do not fixup // objects stored elsewhere @@ -1326,6 +1318,13 @@ protected: uint64_t m_mvccCheckpoint = 0; private: + struct changedesc + { + sdsimmutablestring strkey; + bool fUpdate; + + changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} + }; struct changedescCmp { using is_transparent = void; // C++14 to allow comparisons with different types From 982175b5840f02d09b4fbf1db9e97d5f8faa72d2 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 24 Mar 2020 14:49:43 -0400 Subject: [PATCH 31/38] Evict on load if we have a storage provider Former-commit-id: bb091796c3da7282e040c7b72a28ec1c5f5ecfb7 --- src/rdb.cpp | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/rdb.cpp b/src/rdb.cpp index 05180c7ed..6554a10bb 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2231,6 +2231,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now; long long lru_clock = 0; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; + size_t ckeysLoaded = 0; robj *subexpireKey = nullptr; robj *key = nullptr; @@ -2476,11 +2477,29 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(val); val = nullptr; } else { + /* If we have a storage provider check if we need to evict some keys to stay under our memory limit, + do this every 16 keys to limit the perf impact */ + if (g_pserver->m_pstorageFactory && 
(ckeysLoaded % 16) == 0) + { + if (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK) + { + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(); + g_pserver->db[idb]->trackChanges(true); + } + freeMemoryIfNeeded(); + } + } + /* Add the new object in the hash table */ int fInserted = dbMerge(db, key, val, rsi && rsi->fForceSetKey); // Note: dbMerge will incrRef if (fInserted) { + ++ckeysLoaded; + /* Set the expire time if needed */ if (expiretime != -1) setExpire(NULL,db,key,nullptr,expiretime); From 9ff5d3f3c4de2659b761035de8c6e78137fa3751 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 25 Mar 2020 01:06:40 -0400 Subject: [PATCH 32/38] Expire entry needs to be resorted after a subkey expires Former-commit-id: b357803362728c26a1169e3cec279c693b86205b --- src/expire.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/expire.cpp b/src/expire.cpp index 66a05e4ef..1ac6ab415 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -135,6 +135,15 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { if (deleted) { + if (!pfat->FEmpty()) + { + // We need to resort the expire entry since it may no longer be in the correct position + auto itr = db->setexpire->find(e.key()); + expireEntry eT = std::move(e); + db->setexpire->erase(itr); + db->setexpire->insert(eT); + } + robj objT; switch (val->type) { From 41dfd175a8a7740557f1f9cf6e280cf1121afa07 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 25 Mar 2020 01:38:58 -0400 Subject: [PATCH 33/38] Fix failure to load subkey expires Former-commit-id: 528a43bd6c80f073d928dd18c4f67f37cfd0977a --- src/rdb.cpp | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 5b316a4a8..431ce9c5f 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2095,7 +2095,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { incrRefCount(subexpireKey); } else if 
(!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) { if (key == nullptr || subexpireKey == nullptr) { - serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping."); + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? szFromObj(key) : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)"); } else { setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); @@ -2187,8 +2187,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(val); val = nullptr; } - decrRefCount(key); - key = nullptr; } /* Reset the state that is key-specified and is populated by @@ -2199,7 +2197,10 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } if (key != nullptr) + { decrRefCount(key); + key = nullptr; + } if (subexpireKey != nullptr) { @@ -2226,6 +2227,17 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { return C_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ + if (key != nullptr) + { + decrRefCount(key); + key = nullptr; + } + if (subexpireKey != nullptr) + { + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } + serverLog(LL_WARNING,"Short read or OOM loading DB. 
Unrecoverable error, aborting now."); rdbExitReportCorruptRDB("Unexpected EOF reading RDB file"); return C_ERR; /* Just to avoid warning */ From 118bd49a4e52ccd381ad241eef24540309e54548 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 25 Mar 2020 15:34:30 -0400 Subject: [PATCH 34/38] Add roundtrip test for subkey expires Former-commit-id: 56fc6b7deb59cfb3219d65c01c96969d3983e84a --- tests/unit/expire.tcl | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index 477df0242..39ef28ea9 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -220,7 +220,7 @@ start_server {tags {"expire"}} { assert {$ttl <= 98 && $ttl > 90} } - test { EXPIREMEMBER works (set) } { + test {EXPIREMEMBER works (set)} { r flushall r sadd testkey foo bar baz r expiremember testkey foo 1 @@ -228,7 +228,7 @@ start_server {tags {"expire"}} { assert_equal {2} [r scard testkey] } - test { EXPIREMEMBER works (hash) } { + test {EXPIREMEMBER works (hash)} { r flushall r hset testkey foo bar r expiremember testkey foo 1 @@ -236,7 +236,7 @@ start_server {tags {"expire"}} { r exists testkey } {0} - test { EXPIREMEMBER works (zset) } { + test {EXPIREMEMBER works (zset)} { r flushall r zadd testkey 1 foo r zadd testkey 2 bar @@ -246,10 +246,19 @@ start_server {tags {"expire"}} { assert_equal {1} [r zcard testkey] } - test { TTL for subkey expires works } { + test {TTL for subkey expires works} { r flushall r sadd testkey foo bar baz r expiremember testkey foo 10000 assert [expr [r ttl testkey foo] > 0] } + + test {Roundtrip for subkey expires works} { + r flushall + r sadd testkey foo bar baz + r expiremember testkey foo 10000 + r save + r debug reload + assert [expr [r ttl testkey foo] > 0] + } } From 6c04367c8df66caa0cf77284117c5be9207eae8d Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 25 Mar 2020 16:22:32 -0400 Subject: [PATCH 35/38] Fix breaks from merge Former-commit-id: 
fa76d19bee9df21967c4d8554128eebdd19021fa --- src/db.cpp | 8 ++++++++ src/expire.cpp | 15 +++++++++------ src/server.h | 2 ++ 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index bf256edc4..1324c6e14 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1469,6 +1469,14 @@ int redisDbPersistentData::removeSubkeyExpire(robj *key, robj *subkey) { return found; } +void redisDbPersistentData::resortExpire(expireEntry &e) +{ + auto itr = m_setexpire->find(e.key()); + expireEntry eT = std::move(e); + m_setexpire->erase(itr); + m_setexpire->insert(eT); +} + /* Set an expire to the specified key. If the expire is set in the context * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds diff --git a/src/expire.cpp b/src/expire.cpp index b7cb49eaf..34051b521 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -132,15 +132,17 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { pfat->popfrontExpireEntry(); } + robj *keyobj = nullptr; + + if (deleted || pfat->FEmpty()) + keyobj = createStringObject(e.key(),sdslen(e.key())); + if (deleted) { if (!pfat->FEmpty()) { // We need to resort the expire entry since it may no longer be in the correct position - auto itr = db->setexpire->find(e.key()); - expireEntry eT = std::move(e); - db->setexpire->erase(itr); - db->setexpire->insert(eT); + db->resortExpire(e); } robj objT; @@ -156,10 +158,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { if (pfat->FEmpty()) { - robj *keyobj = createStringObject(e.key(),sdslen(e.key())); removeExpire(db, keyobj); - decrRefCount(keyobj); } + + if (keyobj) + decrRefCount(keyobj); } int parseUnitString(const char *sz) diff --git a/src/server.h b/src/server.h index 7f1953098..6651837b7 100644 --- a/src/server.h +++ b/src/server.h @@ -1277,6 +1277,7 @@ public: size_t expireSize() const { return m_setexpire->size(); } int 
removeExpire(robj *key, dict_iter itr); int removeSubkeyExpire(robj *key, robj *subkey); + void resortExpire(expireEntry &e); void clear(void(callback)(void*)); void emptyDbAsync(); // Note: If you do not need the obj then use the objless iterator version. It's faster @@ -1468,6 +1469,7 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::consolidate_snapshot; using redisDbPersistentData::removeAllCachedValues; using redisDbPersistentData::dictUnsafeKeyOnly; + using redisDbPersistentData::resortExpire; public: expireset::setiter expireitr; From b443553a3d5aec33c0bb05acd65c9d0b24834e42 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 25 Mar 2020 16:27:24 -0400 Subject: [PATCH 36/38] Give a better error when handling std::terminate Former-commit-id: 7b79ec360ba046da6d9dbf3cc731bbdee1458d34 --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index 2a3054973..ae0ec52f1 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -5036,7 +5036,7 @@ void OnTerminate() The easiest way to achieve that is to acutally segfault, so we assert here. 
*/ - serverAssert(false); + serverPanic("std::teminate() called"); } void *workerThreadMain(void *parg) From 71fe6f7ba906c3368965f8ca0030a96ba49feea7 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 25 Mar 2020 21:55:31 -0400 Subject: [PATCH 37/38] Fix issue #143 Former-commit-id: 6ec1641294b23e22a2a5dc5cc6098a02ce234df3 --- src/networking.cpp | 2 +- src/replication.cpp | 50 ++++++++++++++++++--------------------------- src/server.cpp | 11 +++++----- src/server.h | 2 -- 4 files changed, 27 insertions(+), 38 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 94b202c17..276095715 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -496,7 +496,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync) if (c->querybuf && sdslen(c->querybuf)) { std::string str = escapeString(c->querybuf); - serverLog(LL_WARNING, "\tquerybuf: %s", str.c_str()); + printf("\tquerybuf: %s\n", str.c_str()); } c->master_error = 1; } diff --git a/src/replication.cpp b/src/replication.cpp index 2bdc21bf0..6f38ee0bc 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2425,8 +2425,6 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); - if (mi->clientFake) - freeClient(mi->clientFake); delete mi->staleKeyMap; zfree(mi); } @@ -2484,11 +2482,6 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; - if (mi->clientFake) { - freeClient(mi->clientFake); - mi->clientFake = nullptr; - - } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. 
*/ @@ -3378,8 +3371,13 @@ bool FInReplicaReplay() return s_pstate != nullptr && s_pstate->nesting() > 0; } +struct RemoteMasterState +{ + uint64_t mvcc = 0; + client *cFake = nullptr; +}; -static std::unordered_map g_mapmvcc; +static std::unordered_map g_mapremote; void replicaReplayCommand(client *c) { @@ -3455,12 +3453,15 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; - redisMaster *mi = s_pstate->getMi(c); - client *cFake = mi->clientFake; - if (mi->clientFakeNesting != s_pstate->nesting()) - cFake = nullptr; - serverAssert(mi != nullptr); - if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc) + RemoteMasterState &remoteState = g_mapremote[uuid]; + if (remoteState.cFake == nullptr) + remoteState.cFake = createClient(-1, c->iel); + else + remoteState.cFake->iel = c->iel; + + client *cFake = remoteState.cFake; + + if (mvcc != 0 && remoteState.mvcc >= mvcc) { s_pstate->Cancel(); s_pstate->Pop(); @@ -3469,8 +3470,6 @@ void replicaReplayCommand(client *c) // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - if (cFake == nullptr) - cFake = createClient(-1, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; @@ -3483,13 +3482,15 @@ void replicaReplayCommand(client *c) bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); if (cFake->master_error) - addReplyError(c, "Error in rreplay command, please check logs"); + { + addReplyError(c, "Error in rreplay command, please check logs."); + } if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - if (mvcc > g_mapmvcc[uuid]) - g_mapmvcc[uuid] = mvcc; + if (mvcc > remoteState.mvcc) + remoteState.mvcc = mvcc; } else { @@ -3497,17 +3498,6 @@ void replicaReplayCommand(client *c) addReplyError(c, "command did not execute"); } serverAssert(sdslen(cFake->querybuf) == 0); - if (cFake->flags & CLIENT_MULTI) - { - mi->clientFake = cFake; - mi->clientFakeNesting = 
s_pstate->nesting(); - } - else - { - if (mi->clientFake == cFake) - mi->clientFake = nullptr; - freeClient(cFake); - } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.cpp b/src/server.cpp index ae0ec52f1..30d7eb9de 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3593,12 +3593,12 @@ int processCommand(client *c, int callFlags) { return C_OK; } } - - incrementMvccTstamp(); if (!locker.isArmed()) locker.arm(c); + incrementMvccTstamp(); + /* Handle the maxmemory directive. * * Note that we do not want to reclaim memory if we are here re-entering @@ -5018,14 +5018,15 @@ void incrementMvccTstamp() msPrev >>= MVCC_MS_SHIFT; // convert to milliseconds long long mst; - __atomic_load(&g_pserver->mstime, &mst, __ATOMIC_RELAXED); + __atomic_load(&g_pserver->mstime, &mst, __ATOMIC_ACQUIRE); if (msPrev >= (uint64_t)mst) // we can be greater if the count overflows { - atomicIncr(g_pserver->mvcc_tstamp, 1); + __atomic_fetch_add(&g_pserver->mvcc_tstamp, 1, __ATOMIC_RELEASE); } else { - atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT); + uint64_t val = ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT; + __atomic_store(&g_pserver->mvcc_tstamp, &val, __ATOMIC_RELEASE); } } diff --git a/src/server.h b/src/server.h index 94d8a0d11..218982bda 100644 --- a/src/server.h +++ b/src/server.h @@ -1541,8 +1541,6 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; - client *clientFake; - int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. 
*/ From f4caa849877619f8892a20a2b709a0e966f29a9f Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 25 Mar 2020 22:26:27 -0400 Subject: [PATCH 38/38] Fix merge issues Former-commit-id: b22d9cc27d0434578891c59825f1c8813a3a9b28 --- src/replication.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 615acb077..91ec6309a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3870,7 +3870,7 @@ void replicaReplayCommand(client *c) RemoteMasterState &remoteState = g_mapremote[uuid]; if (remoteState.cFake == nullptr) - remoteState.cFake = createClient(-1, c->iel); + remoteState.cFake = createClient(nullptr, c->iel); else remoteState.cFake->iel = c->iel; @@ -3885,11 +3885,6 @@ void replicaReplayCommand(client *c) // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; -<<<<<<< HEAD - if (cFake == nullptr) - cFake = createClient(nullptr, c->iel); -======= ->>>>>>> unstable cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser;