Dramatically improve perf by blocking commands
Former-commit-id: e47584b286c41cf0783fe014ac8b6ec187564ade
This commit is contained in:
parent
e87dee8dc7
commit
b6d8a5938d
@ -2460,14 +2460,26 @@ void readQueryFromClient(connection *conn) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* There is more data in the client input buffer, continue parsing it
|
serverTL->vecclientsProcess.push_back(c);
|
||||||
* in case to check if there is a full command to execute. */
|
}
|
||||||
processInputBuffer(c, CMD_CALL_FULL);
|
|
||||||
|
void processClients()
|
||||||
|
{
|
||||||
|
aeAcquireLock();
|
||||||
|
for (client *c : serverTL->vecclientsProcess) {
|
||||||
|
/* There is more data in the client input buffer, continue parsing it
|
||||||
|
* in case to check if there is a full command to execute. */
|
||||||
|
std::unique_lock<fastlock> ul(c->lock);
|
||||||
|
processInputBuffer(c, CMD_CALL_FULL);
|
||||||
|
}
|
||||||
|
|
||||||
if (listLength(serverTL->clients_pending_asyncwrite))
|
if (listLength(serverTL->clients_pending_asyncwrite))
|
||||||
{
|
{
|
||||||
aelock.arm(c);
|
|
||||||
ProcessPendingAsyncWrites();
|
ProcessPendingAsyncWrites();
|
||||||
}
|
}
|
||||||
|
aeReleaseLock();
|
||||||
|
|
||||||
|
serverTL->vecclientsProcess.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
void getClientsMaxBuffers(unsigned long *longest_output_list,
|
||||||
|
@ -2185,6 +2185,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
|
|||||||
}
|
}
|
||||||
|
|
||||||
extern int ProcessingEventsWhileBlocked;
|
extern int ProcessingEventsWhileBlocked;
|
||||||
|
void processClients();
|
||||||
|
|
||||||
/* This function gets called every time Redis is entering the
|
/* This function gets called every time Redis is entering the
|
||||||
* main loop of the event driven library, that is, before to sleep
|
* main loop of the event driven library, that is, before to sleep
|
||||||
@ -2203,6 +2204,7 @@ extern int ProcessingEventsWhileBlocked;
|
|||||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||||
UNUSED(eventLoop);
|
UNUSED(eventLoop);
|
||||||
int iel = ielFromEventLoop(eventLoop);
|
int iel = ielFromEventLoop(eventLoop);
|
||||||
|
processClients();
|
||||||
|
|
||||||
/* Handle precise timeouts of blocked clients. */
|
/* Handle precise timeouts of blocked clients. */
|
||||||
handleBlockedClientsTimeout();
|
handleBlockedClientsTimeout();
|
||||||
@ -2287,6 +2289,7 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
|
|||||||
|
|
||||||
/* Try to process pending commands for clients that were just unblocked. */
|
/* Try to process pending commands for clients that were just unblocked. */
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
|
processClients();
|
||||||
if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) {
|
if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) {
|
||||||
processUnblockedClients(iel);
|
processUnblockedClients(iel);
|
||||||
}
|
}
|
||||||
|
@ -1572,6 +1572,7 @@ struct redisServerThreadVars {
|
|||||||
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
||||||
long unsigned commandsExecuted = 0;
|
long unsigned commandsExecuted = 0;
|
||||||
bool fRetrySetAofEvent = false;
|
bool fRetrySetAofEvent = false;
|
||||||
|
std::vector<client*> vecclientsProcess;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisMaster {
|
struct redisMaster {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user