diff --git a/src/module.cpp b/src/module.cpp index 66c62b696..568205f90 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -3806,7 +3806,7 @@ void moduleHandleBlockedClients(int iel) { AssertCorrectThread(c); fastlock_lock(&g_pserver->rgthreadvar[c->iel].lockPendingWrite); - listAddNodeHead(g_pserver->rgthreadvar[c->iel].clients_pending_write,c); + g_pserver->rgthreadvar[c->iel].clients_pending_write.push_back(c); fastlock_unlock(&g_pserver->rgthreadvar[c->iel].lockPendingWrite); } } diff --git a/src/networking.cpp b/src/networking.cpp index fa6740048..0159ba9f9 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -208,7 +208,7 @@ void clientInstallWriteHandler(client *c) { * we'll not be able to write the whole reply at once. */ c->flags |= CLIENT_PENDING_WRITE; std::unique_lock lockf(g_pserver->rgthreadvar[c->iel].lockPendingWrite); - listAddNodeHead(g_pserver->rgthreadvar[c->iel].clients_pending_write,c); + g_pserver->rgthreadvar[c->iel].clients_pending_write.push_back(c); } } @@ -1212,9 +1212,10 @@ void unlinkClient(client *c) { /* Remove from the list of pending writes if needed. */ if (c->flags & CLIENT_PENDING_WRITE) { std::unique_lock lockf(g_pserver->rgthreadvar[c->iel].lockPendingWrite); - ln = listSearchKey(g_pserver->rgthreadvar[c->iel].clients_pending_write,c); - serverAssert(ln != NULL); - listDelNode(g_pserver->rgthreadvar[c->iel].clients_pending_write,ln); + auto itr = std::find(g_pserver->rgthreadvar[c->iel].clients_pending_write.begin(), + g_pserver->rgthreadvar[c->iel].clients_pending_write.end(), c); + serverAssert(itr != g_pserver->rgthreadvar[c->iel].clients_pending_write.end()); + g_pserver->rgthreadvar[c->iel].clients_pending_write.erase(itr); c->flags &= ~CLIENT_PENDING_WRITE; } @@ -1584,21 +1585,17 @@ void ProcessPendingAsyncWrites() * need to use a syscall in order to install the writable event handler, * get it called, and so forth. */ int handleClientsWithPendingWrites(int iel) { - listIter li; - listNode *ln; - std::unique_lock lockf(g_pserver->rgthreadvar[iel].lockPendingWrite); - list *list = g_pserver->rgthreadvar[iel].clients_pending_write; - int processed = listLength(list); + auto &vec = g_pserver->rgthreadvar[iel].clients_pending_write; + int processed = (int)vec.size(); serverAssert(iel == (serverTL - g_pserver->rgthreadvar)); - listRewind(list,&li); - while((ln = listNext(&li))) { - client *c = (client*)listNodeValue(ln); + while(!vec.empty()) { + client *c = vec.back(); std::unique_locklock)> lock(c->lock); c->flags &= ~CLIENT_PENDING_WRITE; - listDelNode(list,ln); + vec.pop_back(); AssertCorrectThread(c); /* If a client is protected, don't do anything, diff --git a/src/server.cpp b/src/server.cpp index 443ff53ee..fdd1de4d2 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2859,7 +2859,6 @@ static void initNetworking(int fReusePort) static void initServerThread(struct redisServerThreadVars *pvar, int fMain) { - pvar->clients_pending_write = listCreate(); pvar->unblocked_clients = listCreate(); pvar->clients_pending_asyncwrite = listCreate(); pvar->ipfd_count = 0; @@ -5135,7 +5134,7 @@ int main(int argc, char **argv) { if (background) daemonize(); initServer(); - initNetworking(cserver.cthreads > 1 /* fReusePort */); + initNetworking(/*cserver.cthreads >*/ 1 /* fReusePort */); if (background || cserver.pidfile) createPidFile(); redisSetProcTitle(argv[0]); diff --git a/src/server.h b/src/server.h index d3765a000..80281fac4 100644 --- a/src/server.h +++ b/src/server.h @@ -50,6 +50,8 @@ #include #include #include +#include +#include #ifdef __cplusplus extern "C" { #include @@ -1141,7 +1143,7 @@ struct redisServerThreadVars { aeEventLoop *el; int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ int ipfd_count; /* Used slots in ipfd[] */ - list *clients_pending_write; /* There is to write or install handler. */ + std::vector clients_pending_write; /* There is to write or install handler. */ list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; int cclients;