From efc6bb9e0cea75e33f1b5c29b15237626112e392 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 11 Jul 2019 18:53:34 -0400 Subject: [PATCH 01/24] Release version 5 Former-commit-id: 25e18a7de91d4af651f8f22674846cb0238dd5d3 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index db32931f1..c83af2844 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "0.0.0" +#define KEYDB_REAL_VERSION "5.0.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 5b2ea982924b0acd2e00512f3d2c647678ea6f5c Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Jul 2019 22:31:02 -0400 Subject: [PATCH 02/24] Issue #64 RREPLAY isn't binary safe. Add fix and test. Former-commit-id: f1982ca63dc8dd85b62c1338d7be324595b6ad8e --- src/replication.cpp | 2 +- tests/integration/replication-active.tcl | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/replication.cpp b/src/replication.cpp index 14ee002aa..4118087c0 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3265,7 +3265,7 @@ void replicaReplayCommand(client *c) cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; - cFake->querybuf = sdscat(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); + cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); cFake->lock.unlock(); diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index dfb89f603..99e0dc006 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -49,6 +49,15 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { } } + test {Active replicas propogate binary} { + $master set binkey "\u0000foo" + wait_for_condition 50 500 { + [string match *foo* 
[$slave get binkey]] + } else { + fail "replication failed to propogate binary data" + } + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo From e7ef6c23c82151d701a145453c54c7bb562854fe Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Jul 2019 22:49:30 -0400 Subject: [PATCH 03/24] RREPLAY failures should be logged Former-commit-id: b0a0f03b96b44005e905f5d46985d76d52a712ec --- src/replication.cpp | 7 ++++++- src/server.cpp | 1 + src/server.h | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/replication.cpp b/src/replication.cpp index 4118087c0..461a2fd92 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3267,9 +3267,14 @@ void replicaReplayCommand(client *c) cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); + auto ccmdPrev = serverTL->commandsExecuted; processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); - addReply(c, shared.ok); + if (fExec) + addReply(c, shared.ok); + else + addReplyError(c, "command did not execute"); freeClient(cFake); serverTL->current_client = current_clientSave; diff --git a/src/server.cpp b/src/server.cpp index 4ee6922d2..1aca94717 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3348,6 +3348,7 @@ void call(client *c, int flags) { dirty = g_pserver->dirty; start = ustime(); c->cmd->proc(c); + serverTL->commandsExecuted++; duration = ustime()-start; dirty = g_pserver->dirty-dirty; if (dirty < 0) dirty = 0; diff --git a/src/server.h b/src/server.h index d057765f2..d2aa14637 100644 --- a/src/server.h +++ b/src/server.h @@ -1159,6 +1159,7 @@ struct redisServerThreadVars { client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ struct fastlock lockPendingWrite; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + long unsigned 
commandsExecuted = 0; }; struct redisMaster { From caaf5d91416499cdbc70d8dd627008630ad28995 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 29 Jul 2019 15:08:41 -0400 Subject: [PATCH 04/24] Fix crash in RediSearch Former-commit-id: 6b6e6f6c1ef49f87f794de512489e5fbbfb67ca0 --- src/module.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/module.cpp b/src/module.cpp index 568205f90..8d935c0cc 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -3930,6 +3930,8 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { UNUSED(ctx); moduleAcquireGIL(FALSE /*fServerThread*/); + if (serverTL == nullptr) + serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context } /* Release the server lock after a thread safe API call was executed. */ From e7747b91819640d446856dfe2098acd029915bce Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 26 Aug 2019 20:18:52 -0400 Subject: [PATCH 05/24] Fix race condition in PUB/SUB and other async reply commands where the client can be freed before our handler is executed on the client thread. 
When this occurs the client pointer is dangling Former-commit-id: fad9483fc920e5b1fa67e56d4b8483138b565bd3 --- .vscode/settings.json | 56 ------------------------------ src/ae.cpp | 33 +++++++++++++++++- src/debug.cpp | 3 ++ src/networking.cpp | 79 +++++++++++++++++++++++++++++++++++-------- src/pubsub.cpp | 10 ++++++ src/server.h | 6 ++-- 6 files changed, 113 insertions(+), 74 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 56bf76d11..000000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "files.associations": { - "zmalloc.h": "c", - "stat.h": "c", - "array": "cpp", - "atomic": "cpp", - "*.tcc": "cpp", - "cctype": "cpp", - "chrono": "cpp", - "clocale": "cpp", - "cmath": "cpp", - "condition_variable": "cpp", - "cstdarg": "cpp", - "cstddef": "cpp", - "cstdint": "cpp", - "cstdio": "cpp", - "cstdlib": "cpp", - "cstring": "cpp", - "ctime": "cpp", - "cwchar": "cpp", - "cwctype": "cpp", - "deque": "cpp", - "list": "cpp", - "unordered_map": "cpp", - "vector": "cpp", - "exception": "cpp", - "fstream": "cpp", - "functional": "cpp", - "future": "cpp", - "initializer_list": "cpp", - "iomanip": "cpp", - "iosfwd": "cpp", - "iostream": "cpp", - "istream": "cpp", - "limits": "cpp", - "memory": "cpp", - "mutex": "cpp", - "new": "cpp", - "numeric": "cpp", - "optional": "cpp", - "ostream": "cpp", - "ratio": "cpp", - "scoped_allocator": "cpp", - "sstream": "cpp", - "stdexcept": "cpp", - "streambuf": "cpp", - "string_view": "cpp", - "system_error": "cpp", - "thread": "cpp", - "cinttypes": "cpp", - "tuple": "cpp", - "type_traits": "cpp", - "typeinfo": "cpp", - "utility": "cpp" - } -} diff --git a/src/ae.cpp b/src/ae.cpp index 48d6107b7..f636078b1 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -191,6 +191,36 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) } } +// Unlike write() this is an all or nothing thing. 
We will block if a partial write is hit +ssize_t safe_write(int fd, const void *pv, size_t cb) +{ + const char *pcb = (const char*)pv; + ssize_t written = 0; + do + { + ssize_t rval = write(fd, pcb, cb); + if (rval > 0) + { + pcb += rval; + cb -= rval; + written += rval; + } + else if (errno == EAGAIN) + { + if (written == 0) + break; + // if we've already written something then we're committed so keep trying + } + else + { + if (rval == 0) + return written; + return rval; + } + } while (cb); + return written; +} + int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData, int fSynchronous) { @@ -212,9 +242,10 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); if (fSynchronous) cmd.pctl->mutexcv.lock(); - auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) { + AE_ASSERT(size == sizeof(cmd) || size <= 0); AE_ASSERT(errno == EAGAIN); ret = AE_ERR; } diff --git a/src/debug.cpp b/src/debug.cpp index 2fbbd9e2f..2ded21ea4 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -55,6 +55,8 @@ typedef ucontext_t sigcontext_t; #endif #endif +bool g_fInCrash = false; + /* ================================= Debugging ============================== */ /* Compute the sha1 of string at 's' with 'len' bytes long. 
@@ -1356,6 +1358,7 @@ void dumpX86Calls(void *addr, size_t len) { void sigsegvHandler(int sig, siginfo_t *info, void *secret) { ucontext_t *uc = (ucontext_t*) secret; + g_fInCrash = true; void *eip = getMcontextEip(uc); sds infostring, clients; struct sigaction act; diff --git a/src/networking.cpp b/src/networking.cpp index 174c21ff5..baaf796fd 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -174,6 +174,7 @@ client *createClient(int fd, int iel) { c->bufAsync = NULL; c->buflenAsync = 0; c->bufposAsync = 0; + c->casyncOpsPending = 0; memset(c->uuid, 0, UUID_BINARY_LEN); listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); @@ -1003,7 +1004,6 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { serverLog(LL_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); - close(fd); /* May be already closed, just ignore errors */ return; } @@ -1266,7 +1266,7 @@ void unlinkClient(client *c) { } } -void freeClient(client *c) { +bool freeClient(client *c) { listNode *ln; serverAssert(c->fd == -1 || GlobalLocksAcquired()); AssertCorrectThread(c); @@ -1274,9 +1274,9 @@ void freeClient(client *c) { /* If a client is protected, yet we need to free it right now, make sure * to at least use asynchronous freeing. */ - if (c->flags & CLIENT_PROTECTED) { + if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending) { freeClientAsync(c); - return; + return false; } /* If it is our master that's beging disconnected we should make sure @@ -1291,7 +1291,7 @@ void freeClient(client *c) { CLIENT_BLOCKED))) { replicationCacheMaster(MasterInfoFromClient(c), c); - return; + return false; } } @@ -1370,6 +1370,7 @@ void freeClient(client *c) { ulock.unlock(); fastlock_free(&c->lock); zfree(c); + return true; } /* Schedule a client to free it at a safe time in the serverCron() function. @@ -1382,28 +1383,37 @@ void freeClientAsync(client *c) { * may access the list while Redis uses I/O threads. 
All the other accesses * are in the context of the main thread while the other threads are * idle. */ - if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; + if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; // check without the lock first std::lock_guardlock)> clientlock(c->lock); AeLocker lock; lock.arm(c); + if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; // race condition after we acquire the lock c->flags |= CLIENT_CLOSE_ASAP; listAddNodeTail(g_pserver->clients_to_close,c); } void freeClientsInAsyncFreeQueue(int iel) { + serverAssert(GlobalLocksAcquired()); listIter li; listNode *ln; listRewind(g_pserver->clients_to_close,&li); - while((ln = listNext(&li))) { + // Store the clients in a temp vector since freeClient will modify this list + std::vector vecclientsFree; + while((ln = listNext(&li))) + { client *c = (client*)listNodeValue(ln); - if (c->iel != iel) - continue; // wrong thread + if (c->iel == iel) + { + vecclientsFree.push_back(c); + listDelNode(g_pserver->clients_to_close, ln); + } + } + for (client *c : vecclientsFree) + { c->flags &= ~CLIENT_CLOSE_ASAP; freeClient(c); - listDelNode(g_pserver->clients_to_close,ln); - listRewind(g_pserver->clients_to_close,&li); } } @@ -1551,6 +1561,15 @@ void ProcessPendingAsyncWrites() std::lock_guardlock)> lock(c->lock); serverAssert(c->fPendingAsyncWrite); + if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) + { + c->bufposAsync = 0; + c->buflenAsync = 0; + zfree(c->bufAsync); + c->bufAsync = nullptr; + c->fPendingAsyncWrite = FALSE; + continue; + } // TODO: Append to end of reply block? 
@@ -1587,8 +1606,36 @@ void ProcessPendingAsyncWrites() continue; asyncCloseClientOnOutputBufferLimitReached(c); - if (aeCreateRemoteFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR) - continue; // We can retry later in the cron + if (c->flags & CLIENT_CLOSE_ASAP) + continue; // we will never write this so don't post an op + + std::atomic_thread_fence(std::memory_order_seq_cst); + + if (c->casyncOpsPending == 0) + { + if (FCorrectThread(c)) + { + prepareClientToWrite(c, false); // queue an event + } + else + { + // We need to start the write on the client's thread + if (aePostFunction(g_pserver->rgthreadvar[c->iel].el, [c]{ + // Install a write handler. Don't do the actual write here since we don't want + // to duplicate the throttling and safety mechanisms of the normal write code + std::lock_guardlock)> lock(c->lock); + serverAssert(c->casyncOpsPending > 0); + c->casyncOpsPending--; + aeCreateFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, AE_WRITABLE|AE_WRITE_THREADSAFE, sendReplyToClient, c); + }, false) == AE_ERR + ) + { + // Posting the function failed + continue; // We can retry later in the cron + } + ++c->casyncOpsPending; // race is handled by the client lock in the lambda + } + } } } @@ -1628,13 +1675,15 @@ int handleClientsWithPendingWrites(int iel) { std::unique_locklock)> lock(c->lock); /* Try to write buffers to the client socket. 
*/ - if (writeToClient(c->fd,c,0) == C_ERR) { + if (writeToClient(c->fd,c,0) == C_ERR) + { if (c->flags & CLIENT_CLOSE_ASAP) { lock.release(); // still locked AeLocker ae; ae.arm(c); - freeClient(c); // writeToClient will only async close, but there's no need to wait + if (!freeClient(c)) // writeToClient will only async close, but there's no need to wait + c->lock.unlock(); // if we just got put on the async close list, then we need to remove the lock } continue; } diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 6a9c2bdfc..46677487f 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -143,6 +143,8 @@ int clientSubscriptionsCount(client *c) { /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(client *c, robj *channel) { + serverAssert(GlobalLocksAcquired()); + serverAssert(c->lock.fOwnLock()); dictEntry *de; list *clients = NULL; int retval = 0; @@ -202,6 +204,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(client *c, robj *pattern) { + serverAssert(GlobalLocksAcquired()); int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { @@ -244,6 +247,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { /* Unsubscribe from all the channels. Return the number of channels the * client was subscribed to. */ int pubsubUnsubscribeAllChannels(client *c, int notify) { + serverAssert(GlobalLocksAcquired()); dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; int count = 0; @@ -262,6 +266,7 @@ int pubsubUnsubscribeAllChannels(client *c, int notify) { /* Unsubscribe from all the patterns. Return the number of patterns the * client was subscribed from. 
*/ int pubsubUnsubscribeAllPatterns(client *c, int notify) { + serverAssert(GlobalLocksAcquired()); listNode *ln; listIter li; int count = 0; @@ -278,6 +283,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { /* Publish a message */ int pubsubPublishMessage(robj *channel, robj *message) { + serverAssert(GlobalLocksAcquired()); int receivers = 0; dictEntry *de; listNode *ln; @@ -293,6 +299,8 @@ int pubsubPublishMessage(robj *channel, robj *message) { listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = reinterpret_cast(ln->value); + if (c->flags & CLIENT_CLOSE_ASAP) // avoid blocking if the write will be ignored + continue; fastlock_lock(&c->lock); addReplyPubsubMessage(c,channel,message); fastlock_unlock(&c->lock); @@ -311,6 +319,8 @@ int pubsubPublishMessage(robj *channel, robj *message) { (char*)ptrFromObj(channel), sdslen(szFromObj(channel)),0)) { + if (pat->pclient->flags & CLIENT_CLOSE_ASAP) + continue; fastlock_lock(&pat->pclient->lock); addReplyPubsubPatMessage(pat->pclient, pat->pattern,channel,message); diff --git a/src/server.h b/src/server.h index d2aa14637..23f0d7aa0 100644 --- a/src/server.h +++ b/src/server.h @@ -925,6 +925,7 @@ typedef struct client { time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; std::atomic flags; /* Client flags: CLIENT_* macros. */ + int casyncOpsPending; int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. 
*/ @@ -1694,7 +1695,7 @@ void redisSetProcTitle(const char *title); /* networking.c -- Networking and Client related operations */ client *createClient(int fd, int iel); void closeTimedoutClients(void); -void freeClient(client *c); +bool freeClient(client *c); void freeClientAsync(client *c); void resetClient(client *c); void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); @@ -2503,9 +2504,10 @@ void xorDigest(unsigned char *digest, const void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags); int moduleGILAcquiredByModule(void); +extern bool g_fInCrash; static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate { - return aeThreadOwnsLock() || moduleGILAcquiredByModule(); + return aeThreadOwnsLock() || moduleGILAcquiredByModule() || g_fInCrash; } inline int ielFromEventLoop(const aeEventLoop *eventLoop) From bffee1aa3f4e37b518b665650bd2da5ec2885f54 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 27 Aug 2019 15:32:24 -0400 Subject: [PATCH 06/24] Bump version Former-commit-id: bf905a78f0982cf4e13a4d9bbd9fa0166233b245 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index c83af2844..db7af9930 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.0.0" +#define KEYDB_REAL_VERSION "5.0.1" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 2491f2a23dff9f6bc8208d105364658779320192 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 30 Jul 2019 16:54:25 -0400 Subject: [PATCH 07/24] Acquire the lock for modules that don't acquire it before calling like they are supposed to Former-commit-id: f83a89f82a30d4edbd8068172bc54e0f1fe0cc25 --- src/module.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/module.cpp b/src/module.cpp index 8d935c0cc..f071ae4e1 
100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -3920,7 +3920,9 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { /* Release a thread safe context. */ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { + moduleAcquireGIL(false /*fServerThread*/); moduleFreeContext(ctx); + moduleReleaseGIL(false /*fServerThread*/); zfree(ctx); } From 8fa118873c3e73afb247aac97ba1a48314c5393c Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 6 Sep 2019 13:30:15 -0400 Subject: [PATCH 08/24] Fix issue where Active Replicas were committing data to the wrong database under load Former-commit-id: 50a6a3ca389aef3d8f970faef5336f7053cf4cc5 --- src/replication.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/replication.cpp b/src/replication.cpp index 461a2fd92..52ba568a6 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3272,9 +3272,14 @@ void replicaReplayCommand(client *c) bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); if (fExec) + { addReply(c, shared.ok); + selectDb(c, cFake->db->id); + } else + { addReplyError(c, "command did not execute"); + } freeClient(cFake); serverTL->current_client = current_clientSave; From c23c307a0d7cb9bf9d20e266f3a1050bf876bd0f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 6 Sep 2019 13:31:40 -0400 Subject: [PATCH 09/24] bump version Former-commit-id: 71d5a10621b762fccec10d38a72b0535f8f16502 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index db7af9930..4ea159eb1 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.0.1" +#define KEYDB_REAL_VERSION "5.0.2" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 2852a81a1343f243f931ccafb9862228b20c0d72 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Sep 2019 15:39:52 -0400 Subject: [PATCH 10/24] Fix issue where AOF events are posted to the wrong event loop
and not properly cleaned up Former-commit-id: 4589e861ab79992802cfa26ba06693996d75835c --- src/aof.cpp | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index c7160489b..69e32fc3b 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -97,6 +97,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { aofrwblock *block; ssize_t nwritten; serverAssert(GlobalLocksAcquired()); + serverAssert(el == g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el); // MUST run on main thread UNUSED(el); UNUSED(fd); @@ -164,10 +165,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (aeGetFileEvents(serverTL->el,g_pserver->aof_pipe_write_data_to_child) == 0) { - aeCreateFileEvent(serverTL->el, g_pserver->aof_pipe_write_data_to_child, - AE_WRITABLE, aofChildWriteDiffData, NULL); - } + aeCreateRemoteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL, FALSE); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -1508,7 +1506,7 @@ void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Remove the handler since this can be called only one time during a * rewrite. */ - aeDeleteFileEventAsync(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); + aeDeleteFileEvent(el,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); } /* Create the pipes used for parent - child process IPC during rewrite. 
@@ -1546,12 +1544,20 @@ error: } void aofClosePipes(void) { - aeDeleteFileEventAsync(g_pserver->el_alf_pip_read_ack_from_child,g_pserver->aof_pipe_read_ack_from_child,AE_READABLE); - aeDeleteFileEventAsync(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,g_pserver->aof_pipe_write_data_to_child,AE_WRITABLE); - close(g_pserver->aof_pipe_write_data_to_child); + int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; + aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); + close (fdAofAckPipe); + }); + + int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); + close(fdAofWritePipe); + }); + close(g_pserver->aof_pipe_read_data_from_parent); close(g_pserver->aof_pipe_write_ack_to_parent); - close(g_pserver->aof_pipe_read_ack_from_child); close(g_pserver->aof_pipe_write_ack_to_child); close(g_pserver->aof_pipe_read_ack_from_parent); } From d25883411b98dcfd950b2c12360aa67ab2560557 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 1 Oct 2019 17:53:48 -0400 Subject: [PATCH 11/24] Bump version Former-commit-id: 0a0c2146c88417baf8f905aaab7f1b45f4d7e46d --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index 4ea159eb1..d0a152517 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.0.2" +#define KEYDB_REAL_VERSION "5.1.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 6ec189bd1bb13a9e8f4014439cc12717625a4046 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 21 Oct 2019 12:21:46 -0400 Subject: [PATCH 12/24] Disable multithreaded KEYS due to bugs Former-commit-id: 3fac516950e831129da856f32fa373a56a6268a1 --- src/db.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git 
a/src/db.cpp b/src/db.cpp index 0d5042bab..f34245964 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -643,7 +643,9 @@ void keysCommand(client *c) { unsigned long numkeys = 0; void *replylen = addReplyDeferredLen(c); +#ifdef MULTITHREADED_KEYS aeReleaseLock(); +#endif di = dictGetSafeIterator(c->db->pdict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); @@ -663,11 +665,13 @@ void keysCommand(client *c) { dictReleaseIterator(di); setDeferredArrayLen(c,replylen,numkeys); +#ifdef MULTITHREADED_KEYS fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks AeLocker lock; lock.arm(c); fastlock_lock(&c->db->lock); // we still need the DB lock lock.release(); +#endif } /* This callback is used by scanGenericCommand in order to collect elements From 3c955d0a2316c3c1aeda5535360983340e705387 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Oct 2019 01:00:28 -0400 Subject: [PATCH 13/24] bump version Former-commit-id: 3013799664932ce95a22425d13ab7203add07b52 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index d0a152517..60b1425cd 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.1.0" +#define KEYDB_REAL_VERSION "5.1.1" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From b4c1b5df3d1f572cb2234a429ddd35dcf5450412 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 25 Oct 2019 02:44:14 -0400 Subject: [PATCH 14/24] Fix potential race in pubsub Former-commit-id: a91f58b8fd5d267760d504b024068d0132569ae9 --- src/pubsub.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 46677487f..0c657a588 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -301,9 +301,11 @@ int pubsubPublishMessage(robj *channel, robj *message) { client *c = reinterpret_cast(ln->value); if (c->flags & CLIENT_CLOSE_ASAP) // 
avoid blocking if the write will be ignored continue; - fastlock_lock(&c->lock); + if (FCorrectThread(c)) + fastlock_lock(&c->lock); addReplyPubsubMessage(c,channel,message); - fastlock_unlock(&c->lock); + if (FCorrectThread(c)) + fastlock_unlock(&c->lock); receivers++; } } @@ -321,10 +323,12 @@ int pubsubPublishMessage(robj *channel, robj *message) { { if (pat->pclient->flags & CLIENT_CLOSE_ASAP) continue; - fastlock_lock(&pat->pclient->lock); + if (FCorrectThread(pat->pclient)) + fastlock_lock(&pat->pclient->lock); addReplyPubsubPatMessage(pat->pclient, pat->pattern,channel,message); - fastlock_unlock(&pat->pclient->lock); + if (FCorrectThread(pat->pclient)) + fastlock_unlock(&pat->pclient->lock); receivers++; } } From 774cebc4db82f60528ec24a2a775e60c70e62396 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 21 Nov 2019 20:16:42 -0500 Subject: [PATCH 15/24] Bump version Former-commit-id: 7dd53182e071641ff32813e5ba4b0e3fabe72abc --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index 60b1425cd..b2bbc88d1 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.1.1" +#define KEYDB_REAL_VERSION "5.2.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 7e596c9703e2dd1270301171e6e1d34ab4cc0cae Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 6 Jan 2020 12:07:57 -0500 Subject: [PATCH 16/24] Bump version Former-commit-id: 39acb312efbb06f38e98c3ebbb98d17a556b050a --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index b2bbc88d1..79edddc49 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.2.0" +#define KEYDB_REAL_VERSION "5.3.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From e942999159f3ce319529502ec0ed04f427c3f755 Mon Sep 17 00:00:00 2001 From: John 
Sully Date: Sat, 11 Jan 2020 16:34:09 -0500 Subject: [PATCH 17/24] Avoid crash due to excessive posted functions for AOF rewrite Former-commit-id: c575e7df9408ad7bd66ac7a104a38e841d525681 --- src/aof.cpp | 12 ++++++++---- src/server.h | 1 + 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index d48b664d2..b82be9a34 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -165,10 +165,14 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + } } /* Write the buffer (possibly composed of multiple blocks) into the specified diff --git a/src/server.h b/src/server.h index 5733264e0..6b8903092 100644 --- a/src/server.h +++ b/src/server.h @@ -1733,6 +1733,7 @@ struct redisServer { int aof_stop_sending_diff; /* If true stop sending accumulated diffs to child process. */ sds aof_child_diff; /* AOF diff accumulator child side. */ + int aof_rewrite_pending = 0; /* is a call to aofChildWriteDiffData already queued? 
*/ /* RDB persistence */ long long dirty; /* Changes to DB from the last save */ long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */ From 7f8cb3600bf59d31ee26e9f6b8588465cb8fad3d Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 20 Jan 2020 19:20:50 -0500 Subject: [PATCH 18/24] Bump version Former-commit-id: f4319c12803f27a93150178f8b61f10aea09ce01 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index 79edddc49..b20005ade 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.3.0" +#define KEYDB_REAL_VERSION "5.3.1" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From ef291b246eaed8633a4c3bd0d09e7822189a30aa Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 28 Feb 2020 23:53:54 -0500 Subject: [PATCH 19/24] Bump version Former-commit-id: 3ce6441a0c0f5c58f31f5fa28116cea09aebbf35 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index b20005ade..751337aaa 100644 --- a/src/version.h +++ b/src/version.h @@ -1,3 +1,3 @@ -#define KEYDB_REAL_VERSION "5.3.1" +#define KEYDB_REAL_VERSION "5.3.2" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config From 09f08908e2a5e791845c4e57100bf9d140c2859c Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 4 Mar 2020 17:23:40 -0500 Subject: [PATCH 20/24] Fix CLANG build break Former-commit-id: e523afa7410399697659106c88e9f65e2cffae79 --- src/server.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/server.h b/src/server.h index 1e6632576..94d8a0d11 100644 --- a/src/server.h +++ b/src/server.h @@ -55,6 +55,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include From 87626299a6445715856141bef3eec44f11f2107f Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 14:34:23 -0400 Subject: [PATCH 21/24] Prevent 
dangling lock when we can't free the client Former-commit-id: 3c373494d63b21744b264f0a47e6999bcdda6b2b --- src/networking.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 58bd79bcc..f6425af60 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1630,10 +1630,13 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { if (writeToClient(fd,c,1) == C_ERR) { AeLocker ae; - c->lock.lock(); + c->lock(); ae.arm(c); if (c->flags & CLIENT_CLOSE_ASAP) - freeClient(c); + { + if (!freeClient(c)) + c->unlock(); + } } } From 4d5d7ed59f3da3292696991c96d6ec59b6db30e7 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 15:28:39 -0400 Subject: [PATCH 22/24] Fix lock inversion in processEventsWhileBlocked Former-commit-id: a9249d4a82a0f0355ac8ffa40b34b9c14cabf66b --- src/networking.cpp | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index f6425af60..94b202c17 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1630,12 +1630,12 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { if (writeToClient(fd,c,1) == C_ERR) { AeLocker ae; - c->lock(); + c->lock.lock(); ae.arm(c); if (c->flags & CLIENT_CLOSE_ASAP) { if (!freeClient(c)) - c->unlock(); + c->lock.unlock(); } } } @@ -3107,12 +3107,23 @@ int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. 
*/ int count = 0; - client *c = serverTL->current_client; - if (c != nullptr) + std::vector<client*> vecclients; + listIter li; + listNode *ln; + listRewind(g_pserver->clients, &li); + + // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks + // so unlock here, and save them for reacquisition later + while ((ln = listNext(&li)) != nullptr) { - serverAssert(c->flags & CLIENT_PROTECTED); - c->lock.unlock(); + client *c = (client*)listNodeValue(ln); + if (c->lock.fOwnLock()) { + serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no guarantee they won't be free'd in the event loop + c->lock.unlock(); + vecclients.push_back(c); + } } + aeReleaseLock(); try @@ -3129,18 +3140,18 @@ int processEventsWhileBlocked(int iel) { { // Caller expects us to be locked so fix and rethrow AeLocker locker; - if (c != nullptr) - c->lock.lock(); - locker.arm(c); + locker.arm(nullptr); locker.release(); + for (client *c : vecclients) + c->lock.lock(); throw; } AeLocker locker; - if (c != nullptr) - c->lock.lock(); - locker.arm(c); + locker.arm(nullptr); locker.release(); + for (client *c : vecclients) + c->lock.lock(); return count; } From 1bdccb3c424a6dd86119c848c8e35a217929fe87 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 15:37:24 -0400 Subject: [PATCH 23/24] Log which thread a message came from Former-commit-id: bc1eccb66d3302d6c99588fb4a5a879e1ef243b1 --- src/server.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index d6fe1ba1f..22bca3963 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1037,6 +1037,7 @@ struct redisCommand redisCommandTable[] = { /* We use a private localtime implementation which is fork-safe. The logging * function of Redis may be called from other threads. */ extern "C" void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst); +extern "C" pid_t gettid(); /* Low level logging.
To use only for very big messages, otherwise * serverLog() is to prefer. */ @@ -1074,8 +1075,8 @@ void serverLogRaw(int level, const char *msg) { } else { role_char = (listLength(g_pserver->masters) ? 'S':'M'); /* Slave or Master. */ } - fprintf(fp,"%d:%c %s %c %s\n", - (int)getpid(),role_char, buf,c[level],msg); + fprintf(fp,"%d:%d:%c %s %c %s\n", + (int)getpid(),(int)gettid(),role_char, buf,c[level],msg); } fflush(fp); From 69806aaa7bb5cb529bdfe311739b328ce7a9f61b Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Mar 2020 15:37:49 -0400 Subject: [PATCH 24/24] Print stack traces of other threads in the deadlock detector Former-commit-id: 6b0172b9bf124372f4c8688c91c380c6c1b607c7 --- src/fastlock.cpp | 92 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 26 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 49c0fb095..861bfb7ca 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -43,6 +43,7 @@ #include #include #include +#include "config.h" #ifdef __APPLE__ #include @@ -60,6 +61,11 @@ #define UNUSED(x) ((void)x) #endif +#ifdef HAVE_BACKTRACE +#include +__attribute__((weak)) void logStackTrace(ucontext_t *) {} +#endif + extern int g_fInCrash; /**************************************************** @@ -149,6 +155,43 @@ __attribute__((weak)) void serverLog(int , const char *fmt, ...) 
printf("\n"); } +extern "C" pid_t gettid() +{ + static thread_local int pidCache = -1; +#ifdef __linux__ + if (pidCache == -1) + pidCache = syscall(SYS_gettid); +#else + if (pidCache == -1) { + uint64_t tidT; + pthread_threadid_np(nullptr, &tidT); + assert(tidT < UINT_MAX); + pidCache = (int)tidT; + } +#endif + return pidCache; +} + +void printTrace() +{ +#ifdef HAVE_BACKTRACE + serverLog(3 /*LL_WARNING*/, "printing backtrace for thread %d", gettid()); + ucontext_t ctxt; + getcontext(&ctxt); + logStackTrace(&ctxt); +#endif +} + + +#ifdef __linux__ +static int futex(volatile unsigned *uaddr, int futex_op, int val, + const struct timespec *timeout, int val3) +{ + return syscall(SYS_futex, uaddr, futex_op, val, + timeout, uaddr, val3); +} +#endif + class DeadlockDetector { std::map m_mapwait; @@ -156,9 +199,19 @@ class DeadlockDetector public: void registerwait(fastlock *lock, pid_t thispid) { + static volatile bool fInDeadlock = false; + if (lock == &m_lock || g_fInCrash) return; fastlock_lock(&m_lock); + + if (fInDeadlock) + { + printTrace(); + fastlock_unlock(&m_lock); + return; + } + m_mapwait.insert(std::make_pair(thispid, lock)); // Detect cycles @@ -184,6 +237,19 @@ public: if (pidCheck == thispid) break; } + // Wake All sleeping threads so they can print their callstacks +#ifdef HAVE_BACKTRACE +#ifdef __linux__ + int mask = -1; + fInDeadlock = true; + fastlock_unlock(&m_lock); + futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask); + futex(&itr->second->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask); + sleep(2); + fastlock_lock(&m_lock); + printTrace(); +#endif +#endif serverLog(3 /*LL_WARNING*/, "!!! 
KeyDB Will Now Crash !!!"); _serverPanic(__FILE__, __LINE__, "Deadlock detected"); } @@ -222,32 +288,6 @@ uint64_t fastlock_getlongwaitcount() return rval; } -#ifdef __linux__ -static int futex(volatile unsigned *uaddr, int futex_op, int val, - const struct timespec *timeout, int val3) -{ - return syscall(SYS_futex, uaddr, futex_op, val, - timeout, uaddr, val3); -} -#endif - -extern "C" pid_t gettid() -{ - static thread_local int pidCache = -1; -#ifdef __linux__ - if (pidCache == -1) - pidCache = syscall(SYS_gettid); -#else - if (pidCache == -1) { - uint64_t tidT; - pthread_threadid_np(nullptr, &tidT); - assert(tidT < UINT_MAX); - pidCache = (int)tidT; - } -#endif - return pidCache; -} - extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask) { #ifdef __linux__