Merge branch 'unstable' of https://github.com/JohnSully/KeyDB into unstable

Former-commit-id: d03c2de9017b73b5d339fe76dd99292f27dfcd52
This commit is contained in:
John Sully 2020-12-11 03:59:50 +00:00
commit 248b6a52e3
4 changed files with 41 additions and 6 deletions

View File

@ -377,6 +377,11 @@ int aeGetSetSize(aeEventLoop *eventLoop) {
return eventLoop->setsize; return eventLoop->setsize;
} }
/* Return the current EventLoop. */
aeEventLoop *aeGetCurrentEventLoop(){
return g_eventLoopThisThread;
}
/* Tells the next iteration/s of the event processing to set timeout of 0. */ /* Tells the next iteration/s of the event processing to set timeout of 0. */
void aeSetDontWait(aeEventLoop *eventLoop, int noWait) { void aeSetDontWait(aeEventLoop *eventLoop, int noWait) {
if (noWait) if (noWait)

View File

@ -160,6 +160,7 @@ const char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags); void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags); void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags);
int aeGetSetSize(aeEventLoop *eventLoop); int aeGetSetSize(aeEventLoop *eventLoop);
aeEventLoop *aeGetCurrentEventLoop();
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait); void aeSetDontWait(aeEventLoop *eventLoop, int noWait);

View File

@ -2273,17 +2273,42 @@ static int updateMaxclients(long long val, long long prev, const char **err) {
} }
return 0; return 0;
} }
for (int iel = 0; iel < cserver.cthreads; ++iel) /* Change the SetSize for the current thread first. If any error, return the error message to the client,
{ * otherwise, continue to do the same for other threads */
if ((unsigned int) aeGetSetSize(g_pserver->rgthreadvar[iel].el) < if ((unsigned int) aeGetSetSize(aeGetCurrentEventLoop()) <
g_pserver->maxclients + CONFIG_FDSET_INCR) g_pserver->maxclients + CONFIG_FDSET_INCR)
{ {
if (aeResizeSetSize(g_pserver->rgthreadvar[iel].el, if (aeResizeSetSize(aeGetCurrentEventLoop(),
g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR) g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR)
{ {
*err = "The event loop API used by Redis is not able to handle the specified number of clients"; *err = "The event loop API used by Redis is not able to handle the specified number of clients";
return 0; return 0;
} }
serverLog(LL_DEBUG,"Successfully changed the setsize for current thread %d", ielFromEventLoop(aeGetCurrentEventLoop()));
}
for (int iel = 0; iel < cserver.cthreads; ++iel)
{
if (g_pserver->rgthreadvar[iel].el == aeGetCurrentEventLoop()){
continue;
}
if ((unsigned int) aeGetSetSize(g_pserver->rgthreadvar[iel].el) <
g_pserver->maxclients + CONFIG_FDSET_INCR)
{
int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [iel] {
if (aeResizeSetSize(g_pserver->rgthreadvar[iel].el, g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR) {
serverLog(LL_WARNING,"Failed to change the setsize for Thread %d", iel);
}
});
if (res != AE_OK){
static char msg[128];
sprintf(msg, "Failed to post the request to change setsize for Thread %d", iel);
*err = msg;
return 0;
}
serverLog(LL_DEBUG,"Successfully post the request to change the setsize for thread %d", iel);
} }
} }
} }

View File

@ -5695,7 +5695,7 @@ int main(int argc, char **argv) {
validateConfiguration(); validateConfiguration();
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) for (int iel = 0; iel < cserver.cthreads; ++iel)
{ {
initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN); initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN);
} }
@ -5788,9 +5788,13 @@ int main(int argc, char **argv) {
setOOMScoreAdj(-1); setOOMScoreAdj(-1);
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_t rgthread[MAX_EVENT_LOOPS]; pthread_t rgthread[MAX_EVENT_LOOPS];
pthread_attr_t tattr;
pthread_attr_init(&tattr);
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
for (int iel = 0; iel < cserver.cthreads; ++iel) for (int iel = 0; iel < cserver.cthreads; ++iel)
{ {
pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel)); pthread_create(rgthread + iel, &tattr, workerThreadMain, (void*)((int64_t)iel));
if (cserver.fThreadAffinity) if (cserver.fThreadAffinity)
{ {
#ifdef __linux__ #ifdef __linux__