Merge branch 'unstable' into keydbpro
Former-commit-id: 53a062a6b796ccbd43560fed1b05367ba759da96
This commit is contained in:
commit
07a52ceb67
@ -379,6 +379,11 @@ int aeGetSetSize(aeEventLoop *eventLoop) {
|
|||||||
return eventLoop->setsize;
|
return eventLoop->setsize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Return the current EventLoop. */
|
||||||
|
aeEventLoop *aeGetCurrentEventLoop(){
|
||||||
|
return g_eventLoopThisThread;
|
||||||
|
}
|
||||||
|
|
||||||
/* Tells the next iteration/s of the event processing to set timeout of 0. */
|
/* Tells the next iteration/s of the event processing to set timeout of 0. */
|
||||||
void aeSetDontWait(aeEventLoop *eventLoop, int noWait) {
|
void aeSetDontWait(aeEventLoop *eventLoop, int noWait) {
|
||||||
if (noWait)
|
if (noWait)
|
||||||
|
1
src/ae.h
1
src/ae.h
@ -160,6 +160,7 @@ const char *aeGetApiName(void);
|
|||||||
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags);
|
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags);
|
||||||
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags);
|
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags);
|
||||||
int aeGetSetSize(aeEventLoop *eventLoop);
|
int aeGetSetSize(aeEventLoop *eventLoop);
|
||||||
|
aeEventLoop *aeGetCurrentEventLoop();
|
||||||
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
|
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
|
||||||
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
|
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
|
||||||
|
|
||||||
|
@ -2349,17 +2349,42 @@ static int updateMaxclients(long long val, long long prev, const char **err) {
|
|||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
/* Change the SetSize for the current thread first. If any error, return the error message to the client,
|
||||||
{
|
* otherwise, continue to do the same for other threads */
|
||||||
if ((unsigned int) aeGetSetSize(g_pserver->rgthreadvar[iel].el) <
|
if ((unsigned int) aeGetSetSize(aeGetCurrentEventLoop()) <
|
||||||
g_pserver->maxclients + CONFIG_FDSET_INCR)
|
g_pserver->maxclients + CONFIG_FDSET_INCR)
|
||||||
{
|
{
|
||||||
if (aeResizeSetSize(g_pserver->rgthreadvar[iel].el,
|
if (aeResizeSetSize(aeGetCurrentEventLoop(),
|
||||||
g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR)
|
g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR)
|
||||||
{
|
{
|
||||||
*err = "The event loop API used by Redis is not able to handle the specified number of clients";
|
*err = "The event loop API used by Redis is not able to handle the specified number of clients";
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
serverLog(LL_DEBUG,"Successfully changed the setsize for current thread %d", ielFromEventLoop(aeGetCurrentEventLoop()));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||||
|
{
|
||||||
|
if (g_pserver->rgthreadvar[iel].el == aeGetCurrentEventLoop()){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((unsigned int) aeGetSetSize(g_pserver->rgthreadvar[iel].el) <
|
||||||
|
g_pserver->maxclients + CONFIG_FDSET_INCR)
|
||||||
|
{
|
||||||
|
int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [iel] {
|
||||||
|
if (aeResizeSetSize(g_pserver->rgthreadvar[iel].el, g_pserver->maxclients + CONFIG_FDSET_INCR) == AE_ERR) {
|
||||||
|
serverLog(LL_WARNING,"Failed to change the setsize for Thread %d", iel);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
if (res != AE_OK){
|
||||||
|
static char msg[128];
|
||||||
|
sprintf(msg, "Failed to post the request to change setsize for Thread %d", iel);
|
||||||
|
*err = msg;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
serverLog(LL_DEBUG,"Successfully post the request to change the setsize for thread %d", iel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1548,7 +1548,7 @@ int freeClientsInAsyncFreeQueue(int iel) {
|
|||||||
while((ln = listNext(&li)))
|
while((ln = listNext(&li)))
|
||||||
{
|
{
|
||||||
client *c = (client*)listNodeValue(ln);
|
client *c = (client*)listNodeValue(ln);
|
||||||
if (c->iel == iel && !(c->flags & CLIENT_PROTECTED))
|
if (c->iel == iel && !(c->flags & CLIENT_PROTECTED) && !c->casyncOpsPending)
|
||||||
{
|
{
|
||||||
vecclientsFree.push_back(c);
|
vecclientsFree.push_back(c);
|
||||||
listDelNode(g_pserver->clients_to_close, ln);
|
listDelNode(g_pserver->clients_to_close, ln);
|
||||||
|
@ -4298,8 +4298,8 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
|
|||||||
this->casyncOpsPending++;
|
this->casyncOpsPending++;
|
||||||
return aePostFunction(g_pserver->rgthreadvar[this->iel].el, [this, fn]{
|
return aePostFunction(g_pserver->rgthreadvar[this->iel].el, [this, fn]{
|
||||||
std::lock_guard<decltype(this->lock)> lock(this->lock);
|
std::lock_guard<decltype(this->lock)> lock(this->lock);
|
||||||
--casyncOpsPending;
|
|
||||||
fn(this);
|
fn(this);
|
||||||
|
--casyncOpsPending;
|
||||||
}, false, fLock) == AE_OK;
|
}, false, fLock) == AE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -6078,7 +6078,7 @@ int main(int argc, char **argv) {
|
|||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||||
{
|
{
|
||||||
initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN);
|
initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN);
|
||||||
}
|
}
|
||||||
@ -6171,9 +6171,13 @@ int main(int argc, char **argv) {
|
|||||||
setOOMScoreAdj(-1);
|
setOOMScoreAdj(-1);
|
||||||
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
|
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
|
||||||
pthread_t rgthread[MAX_EVENT_LOOPS];
|
pthread_t rgthread[MAX_EVENT_LOOPS];
|
||||||
|
|
||||||
|
pthread_attr_t tattr;
|
||||||
|
pthread_attr_init(&tattr);
|
||||||
|
pthread_attr_setstacksize(&tattr, 1 << 23); // 8 MB
|
||||||
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||||
{
|
{
|
||||||
pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel));
|
pthread_create(rgthread + iel, &tattr, workerThreadMain, (void*)((int64_t)iel));
|
||||||
if (cserver.fThreadAffinity)
|
if (cserver.fThreadAffinity)
|
||||||
{
|
{
|
||||||
#ifdef __linux__
|
#ifdef __linux__
|
||||||
|
Loading…
x
Reference in New Issue
Block a user