diff --git a/src/ae.cpp b/src/ae.cpp index 53f88bb82..7459ea373 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -379,6 +379,11 @@ int aeGetSetSize(aeEventLoop *eventLoop) { return eventLoop->setsize; } +/* Return the current EventLoop. */ +aeEventLoop *aeGetCurrentEventLoop(){ + return g_eventLoopThisThread; +} + /* Tells the next iteration/s of the event processing to set timeout of 0. */ void aeSetDontWait(aeEventLoop *eventLoop, int noWait) { if (noWait) diff --git a/src/ae.h b/src/ae.h index fdd444d3a..e77abb01f 100644 --- a/src/ae.h +++ b/src/ae.h @@ -160,6 +160,7 @@ const char *aeGetApiName(void); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags); int aeGetSetSize(aeEventLoop *eventLoop); +aeEventLoop *aeGetCurrentEventLoop(); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); void aeSetDontWait(aeEventLoop *eventLoop, int noWait); diff --git a/src/config.cpp b/src/config.cpp index 1681b12d2..f0dae226a 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2349,17 +2349,42 @@ static int updateMaxclients(long long val, long long prev, const char **err) { } return 0; } + /* Change the SetSize for the current thread first. If any error, return the error message to the client, + * otherwise, continue to do the same for other threads */ + if ((unsigned int) aeGetSetSize(aeGetCurrentEventLoop()) < + g_pserver->maxclients + CONFIG_FDSET_INCR) + { + if (aeResizeSetSize(aeGetCurrentEventLoop(), + g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR) + { + *err = "The event loop API used by Redis is not able to handle the specified number of clients"; + return 0; + } + serverLog(LL_DEBUG,"Successfully changed the setsize for current thread %d", ielFromEventLoop(aeGetCurrentEventLoop())); + } + for (int iel = 0; iel < cserver.cthreads; ++iel) { + if (g_pserver->rgthreadvar[iel].el == aeGetCurrentEventLoop()){ + continue; + } + if ((unsigned int) aeGetSetSize(g_pserver->rgthreadvar[iel].el) < g_pserver->maxclients + CONFIG_FDSET_INCR) { - if (aeResizeSetSize(g_pserver->rgthreadvar[iel].el, - g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR) - { - *err = "The event loop API used by Redis is not able to handle the specified number of clients"; + int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [iel] { + if (aeResizeSetSize(g_pserver->rgthreadvar[iel].el, g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR) { + serverLog(LL_WARNING,"Failed to change the setsize for Thread %d", iel); + } + }); + + if (res != AE_OK){ + static char msg[128]; + sprintf(msg, "Failed to post the request to change setsize for Thread %d", iel); + *err = msg; return 0; } + serverLog(LL_DEBUG,"Successfully post the request to change the setsize for thread %d", iel); } } } diff --git a/src/networking.cpp b/src/networking.cpp index 9c968da32..2c0fc3f15 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1548,7 +1548,7 @@ int freeClientsInAsyncFreeQueue(int iel) { while((ln = listNext(&li))) { client *c = (client*)listNodeValue(ln); - if (c->iel == iel && !(c->flags & CLIENT_PROTECTED)) + if (c->iel == iel && !(c->flags & CLIENT_PROTECTED) && !c->casyncOpsPending) { vecclientsFree.push_back(c); listDelNode(g_pserver->clients_to_close, ln); diff --git a/src/server.cpp b/src/server.cpp index 181f9defc..e205ab775 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4298,8 +4298,8 @@ bool client::postFunction(std::function fn, bool fLock) { this->casyncOpsPending++; return aePostFunction(g_pserver->rgthreadvar[this->iel].el, [this, fn]{ std::lock_guardlock)> lock(this->lock); - --casyncOpsPending; fn(this); + --casyncOpsPending; }, false, fLock) == AE_OK; } @@ -6078,7 +6078,7 @@ int main(int argc, char **argv) { exit(EXIT_FAILURE); } - for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) + for (int iel = 0; iel < cserver.cthreads; ++iel) { initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN); } @@ -6171,9 +6171,13 @@ int main(int argc, char **argv) { setOOMScoreAdj(-1); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); pthread_t rgthread[MAX_EVENT_LOOPS]; + + pthread_attr_t tattr; + pthread_attr_init(&tattr); + pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB for (int iel = 0; iel < cserver.cthreads; ++iel) { - pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel)); + pthread_create(rgthread + iel, &tattr, workerThreadMain, (void*)((int64_t)iel)); if (cserver.fThreadAffinity) { #ifdef __linux__