diff --git a/src/networking.cpp b/src/networking.cpp index 3aab1e62c..b36eea49e 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2460,14 +2460,26 @@ void readQueryFromClient(connection *conn) { return; } - /* There is more data in the client input buffer, continue parsing it - * in case to check if there is a full command to execute. */ - processInputBuffer(c, CMD_CALL_FULL); + serverTL->vecclientsProcess.push_back(c); +} + +void processClients() +{ + aeAcquireLock(); + for (client *c : serverTL->vecclientsProcess) { + /* There is more data in the client input buffer, continue parsing it + * in case to check if there is a full command to execute. */ + std::unique_lock ul(c->lock); + processInputBuffer(c, CMD_CALL_FULL); + } + if (listLength(serverTL->clients_pending_asyncwrite)) { - aelock.arm(c); ProcessPendingAsyncWrites(); } + aeReleaseLock(); + + serverTL->vecclientsProcess.clear(); } void getClientsMaxBuffers(unsigned long *longest_output_list, diff --git a/src/server.cpp b/src/server.cpp index 95b805363..458a0f48f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2185,6 +2185,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData } extern int ProcessingEventsWhileBlocked; +void processClients(); /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep @@ -2203,6 +2204,7 @@ extern int ProcessingEventsWhileBlocked; void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); int iel = ielFromEventLoop(eventLoop); + processClients(); /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -2287,6 +2289,7 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) /* Try to process pending commands for clients that were just unblocked. */ aeAcquireLock(); + processClients(); if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) { processUnblockedClients(iel); } diff --git a/src/server.h b/src/server.h index 4ed40339f..d51d2354a 100644 --- a/src/server.h +++ b/src/server.h @@ -1572,6 +1572,7 @@ struct redisServerThreadVars { char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; bool fRetrySetAofEvent = false; + std::vector vecclientsProcess; }; struct redisMaster {