convert the pending write list to a vector for better performance and less cache thrashing
Former-commit-id: ae33ff3a6ef6d47bab63cf8362055c1ed210dbad
This commit is contained in:
parent
924509f52f
commit
9725fadaba
@ -3806,7 +3806,7 @@ void moduleHandleBlockedClients(int iel) {
|
||||
AssertCorrectThread(c);
|
||||
|
||||
fastlock_lock(&g_pserver->rgthreadvar[c->iel].lockPendingWrite);
|
||||
listAddNodeHead(g_pserver->rgthreadvar[c->iel].clients_pending_write,c);
|
||||
g_pserver->rgthreadvar[c->iel].clients_pending_write.push_back(c);
|
||||
fastlock_unlock(&g_pserver->rgthreadvar[c->iel].lockPendingWrite);
|
||||
}
|
||||
}
|
||||
|
@ -208,7 +208,7 @@ void clientInstallWriteHandler(client *c) {
|
||||
* we'll not be able to write the whole reply at once. */
|
||||
c->flags |= CLIENT_PENDING_WRITE;
|
||||
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[c->iel].lockPendingWrite);
|
||||
listAddNodeHead(g_pserver->rgthreadvar[c->iel].clients_pending_write,c);
|
||||
g_pserver->rgthreadvar[c->iel].clients_pending_write.push_back(c);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1212,9 +1212,10 @@ void unlinkClient(client *c) {
|
||||
/* Remove from the list of pending writes if needed. */
|
||||
if (c->flags & CLIENT_PENDING_WRITE) {
|
||||
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[c->iel].lockPendingWrite);
|
||||
ln = listSearchKey(g_pserver->rgthreadvar[c->iel].clients_pending_write,c);
|
||||
serverAssert(ln != NULL);
|
||||
listDelNode(g_pserver->rgthreadvar[c->iel].clients_pending_write,ln);
|
||||
auto itr = std::find(g_pserver->rgthreadvar[c->iel].clients_pending_write.begin(),
|
||||
g_pserver->rgthreadvar[c->iel].clients_pending_write.end(), c);
|
||||
serverAssert(itr != g_pserver->rgthreadvar[c->iel].clients_pending_write.end());
|
||||
g_pserver->rgthreadvar[c->iel].clients_pending_write.erase(itr);
|
||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||
}
|
||||
|
||||
@ -1584,21 +1585,17 @@ void ProcessPendingAsyncWrites()
|
||||
* need to use a syscall in order to install the writable event handler,
|
||||
* get it called, and so forth. */
|
||||
int handleClientsWithPendingWrites(int iel) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
|
||||
std::unique_lock<fastlock> lockf(g_pserver->rgthreadvar[iel].lockPendingWrite);
|
||||
list *list = g_pserver->rgthreadvar[iel].clients_pending_write;
|
||||
int processed = listLength(list);
|
||||
auto &vec = g_pserver->rgthreadvar[iel].clients_pending_write;
|
||||
int processed = (int)vec.size();
|
||||
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
|
||||
|
||||
listRewind(list,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *c = (client*)listNodeValue(ln);
|
||||
while(!vec.empty()) {
|
||||
client *c = vec.back();
|
||||
std::unique_lock<decltype(c->lock)> lock(c->lock);
|
||||
|
||||
c->flags &= ~CLIENT_PENDING_WRITE;
|
||||
listDelNode(list,ln);
|
||||
vec.pop_back();
|
||||
AssertCorrectThread(c);
|
||||
|
||||
/* If a client is protected, don't do anything,
|
||||
|
@ -2859,7 +2859,6 @@ static void initNetworking(int fReusePort)
|
||||
|
||||
static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
||||
{
|
||||
pvar->clients_pending_write = listCreate();
|
||||
pvar->unblocked_clients = listCreate();
|
||||
pvar->clients_pending_asyncwrite = listCreate();
|
||||
pvar->ipfd_count = 0;
|
||||
@ -5135,7 +5134,7 @@ int main(int argc, char **argv) {
|
||||
if (background) daemonize();
|
||||
|
||||
initServer();
|
||||
initNetworking(cserver.cthreads > 1 /* fReusePort */);
|
||||
initNetworking(/*cserver.cthreads >*/ 1 /* fReusePort */);
|
||||
|
||||
if (background || cserver.pidfile) createPidFile();
|
||||
redisSetProcTitle(argv[0]);
|
||||
|
@ -50,6 +50,8 @@
|
||||
#include <syslog.h>
|
||||
#include <netinet/in.h>
|
||||
#include <atomic>
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#include <lua.h>
|
||||
@ -1141,7 +1143,7 @@ struct redisServerThreadVars {
|
||||
aeEventLoop *el;
|
||||
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
||||
int ipfd_count; /* Used slots in ipfd[] */
|
||||
list *clients_pending_write; /* There is to write or install handler. */
|
||||
std::vector<client*> clients_pending_write; /* There is to write or install handler. */
|
||||
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
|
||||
list *clients_pending_asyncwrite;
|
||||
int cclients;
|
||||
|
Loading…
x
Reference in New Issue
Block a user