From b82cf7f034e6ebad1aabb46c809f2db98f26f260 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 13 Jul 2019 16:44:11 -0400 Subject: [PATCH] Blocked clients can stall when under low load Former-commit-id: 7468c691ad04829c1fd3ae69f206946e8f38254a --- src/blocked.cpp | 3 +++ src/server.cpp | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/blocked.cpp b/src/blocked.cpp index 1f807dac3..c09488b1f 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -64,6 +64,7 @@ */ #include "server.h" +#include int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); @@ -180,6 +181,7 @@ void queueClientForReprocessing(client *c) { * of operation the client is blocking for. */ void unblockClient(client *c) { serverAssert(GlobalLocksAcquired()); + serverAssert(c->lock.fOwnLock()); if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || c->btype == BLOCKED_STREAM) { @@ -301,6 +303,7 @@ void handleClientsBlockedOnKeys(void) { while(numclients--) { listNode *clientnode = listFirst(clients); client *receiver = (client*)clientnode->value; + std::unique_lock lock(receiver->lock); if (receiver->btype != BLOCKED_LIST) { /* Put at the tail, so that at the next call diff --git a/src/server.cpp b/src/server.cpp index fa03b6efc..8afe5641c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1785,6 +1785,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { UNUSED(id); UNUSED(clientData); + /* If another threads unblocked one of our clients, and this thread has been idle + then beforeSleep won't have a chance to process the unblocking. So we also + process them here in the cron job to ensure they don't starve. + */ + if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) + { + processUnblockedClients(IDX_EVENT_LOOP_MAIN); + } + ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up /* Software watchdog: deliver the SIGALRM that will reach the signal @@ -2051,6 +2060,15 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData int iel = ielFromEventLoop(eventLoop); serverAssert(iel != IDX_EVENT_LOOP_MAIN); + + /* If another threads unblocked one of our clients, and this thread has been idle + then beforeSleep won't have a chance to process the unblocking. So we also + process them here in the cron job to ensure they don't starve. + */ + if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) + { + processUnblockedClients(iel); + } ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves clientsCron(iel);