Blocked clients can stall when under low load
Former-commit-id: 7468c691ad04829c1fd3ae69f206946e8f38254a
This commit is contained in:
parent
0f5b59241f
commit
b82cf7f034
@ -64,6 +64,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
|
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where);
|
||||||
|
|
||||||
@ -180,6 +181,7 @@ void queueClientForReprocessing(client *c) {
|
|||||||
* of operation the client is blocking for. */
|
* of operation the client is blocking for. */
|
||||||
void unblockClient(client *c) {
|
void unblockClient(client *c) {
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
serverAssert(c->lock.fOwnLock());
|
||||||
if (c->btype == BLOCKED_LIST ||
|
if (c->btype == BLOCKED_LIST ||
|
||||||
c->btype == BLOCKED_ZSET ||
|
c->btype == BLOCKED_ZSET ||
|
||||||
c->btype == BLOCKED_STREAM) {
|
c->btype == BLOCKED_STREAM) {
|
||||||
@ -301,6 +303,7 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
while(numclients--) {
|
while(numclients--) {
|
||||||
listNode *clientnode = listFirst(clients);
|
listNode *clientnode = listFirst(clients);
|
||||||
client *receiver = (client*)clientnode->value;
|
client *receiver = (client*)clientnode->value;
|
||||||
|
std::unique_lock<decltype(client::lock)> lock(receiver->lock);
|
||||||
|
|
||||||
if (receiver->btype != BLOCKED_LIST) {
|
if (receiver->btype != BLOCKED_LIST) {
|
||||||
/* Put at the tail, so that at the next call
|
/* Put at the tail, so that at the next call
|
||||||
|
@ -1785,6 +1785,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
UNUSED(id);
|
UNUSED(id);
|
||||||
UNUSED(clientData);
|
UNUSED(clientData);
|
||||||
|
|
||||||
|
/* If another threads unblocked one of our clients, and this thread has been idle
|
||||||
|
then beforeSleep won't have a chance to process the unblocking. So we also
|
||||||
|
process them here in the cron job to ensure they don't starve.
|
||||||
|
*/
|
||||||
|
if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients))
|
||||||
|
{
|
||||||
|
processUnblockedClients(IDX_EVENT_LOOP_MAIN);
|
||||||
|
}
|
||||||
|
|
||||||
ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up
|
ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up
|
||||||
|
|
||||||
/* Software watchdog: deliver the SIGALRM that will reach the signal
|
/* Software watchdog: deliver the SIGALRM that will reach the signal
|
||||||
@ -2052,6 +2061,15 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
|
|||||||
int iel = ielFromEventLoop(eventLoop);
|
int iel = ielFromEventLoop(eventLoop);
|
||||||
serverAssert(iel != IDX_EVENT_LOOP_MAIN);
|
serverAssert(iel != IDX_EVENT_LOOP_MAIN);
|
||||||
|
|
||||||
|
/* If another threads unblocked one of our clients, and this thread has been idle
|
||||||
|
then beforeSleep won't have a chance to process the unblocking. So we also
|
||||||
|
process them here in the cron job to ensure they don't starve.
|
||||||
|
*/
|
||||||
|
if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients))
|
||||||
|
{
|
||||||
|
processUnblockedClients(iel);
|
||||||
|
}
|
||||||
|
|
||||||
ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves
|
ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves
|
||||||
clientsCron(iel);
|
clientsCron(iel);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user