Fix higher latency at low load by grouping clients to threads. This fixes slow perf in cluster benchmarks mentioned in issue #102
Former-commit-id: 1a4c3224c9848f02fbdb49674045b593cfc41d31
This commit is contained in:
parent
e4d74b993f
commit
fef9925b7f
@ -273,7 +273,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
|
|||||||
cmd.proc = proc;
|
cmd.proc = proc;
|
||||||
cmd.clientData = arg;
|
cmd.clientData = arg;
|
||||||
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
AE_ASSERT(size == sizeof(cmd));
|
if (size != sizeof(cmd))
|
||||||
|
return AE_ERR;
|
||||||
return AE_OK;
|
return AE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -296,6 +297,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
|
||||||
|
if (size != sizeof(cmd))
|
||||||
|
return AE_ERR;
|
||||||
AE_ASSERT(size == sizeof(cmd));
|
AE_ASSERT(size == sizeof(cmd));
|
||||||
int ret = AE_OK;
|
int ret = AE_OK;
|
||||||
if (fSynchronous)
|
if (fSynchronous)
|
||||||
|
@ -167,11 +167,12 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
|
|||||||
* not one already. */
|
* not one already. */
|
||||||
if (!g_pserver->aof_rewrite_pending) {
|
if (!g_pserver->aof_rewrite_pending) {
|
||||||
g_pserver->aof_rewrite_pending = true;
|
g_pserver->aof_rewrite_pending = true;
|
||||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
|
int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] {
|
||||||
g_pserver->aof_rewrite_pending = false;
|
g_pserver->aof_rewrite_pending = false;
|
||||||
if (g_pserver->aof_pipe_write_data_to_child >= 0)
|
if (g_pserver->aof_pipe_write_data_to_child >= 0)
|
||||||
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
|
aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL);
|
||||||
});
|
});
|
||||||
|
serverAssert(res == AE_OK); // we can't handle an error here
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1563,16 +1564,18 @@ error:
|
|||||||
|
|
||||||
void aofClosePipes(void) {
|
void aofClosePipes(void) {
|
||||||
int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child;
|
int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child;
|
||||||
aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{
|
int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{
|
||||||
aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE);
|
aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE);
|
||||||
close (fdAofAckPipe);
|
close (fdAofAckPipe);
|
||||||
});
|
});
|
||||||
|
serverAssert(res == AE_OK);
|
||||||
|
|
||||||
int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child;
|
int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child;
|
||||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{
|
res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{
|
||||||
aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE);
|
aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE);
|
||||||
close(fdAofWritePipe);
|
close(fdAofWritePipe);
|
||||||
});
|
});
|
||||||
|
serverAssert(res == AE_OK);
|
||||||
g_pserver->aof_pipe_write_data_to_child = -1;
|
g_pserver->aof_pipe_write_data_to_child = -1;
|
||||||
|
|
||||||
close(g_pserver->aof_pipe_read_data_from_parent);
|
close(g_pserver->aof_pipe_read_data_from_parent);
|
||||||
|
@ -295,6 +295,15 @@ int clusterLoadConfig(char *filename) {
|
|||||||
if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) {
|
if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) {
|
||||||
g_pserver->cluster->currentEpoch = clusterGetMaxEpoch();
|
g_pserver->cluster->currentEpoch = clusterGetMaxEpoch();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100)
|
||||||
|
{
|
||||||
|
// Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server)
|
||||||
|
// we can increase the grouping of clients on a single thread within reason
|
||||||
|
cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes);
|
||||||
|
cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200);
|
||||||
|
serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold);
|
||||||
|
}
|
||||||
return C_OK;
|
return C_OK;
|
||||||
|
|
||||||
fmterr:
|
fmterr:
|
||||||
@ -623,9 +632,10 @@ void freeClusterLink(clusterLink *link) {
|
|||||||
if (link->node)
|
if (link->node)
|
||||||
link->node->link = NULL;
|
link->node->link = NULL;
|
||||||
link->node = nullptr;
|
link->node = nullptr;
|
||||||
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
|
int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{
|
||||||
freeClusterLink(link);
|
freeClusterLink(link);
|
||||||
});
|
});
|
||||||
|
serverAssert(res == AE_OK);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (link->fd != -1) {
|
if (link->fd != -1) {
|
||||||
|
@ -805,6 +805,11 @@ void loadServerConfigFromString(char *config) {
|
|||||||
} else if (!strcasecmp(argv[0],"enable-pro")) {
|
} else if (!strcasecmp(argv[0],"enable-pro")) {
|
||||||
cserver.fUsePro = true;
|
cserver.fUsePro = true;
|
||||||
break;
|
break;
|
||||||
|
} else if (!strcasecmp(argv[0],"min-clients-per-thread") && argc == 2) {
|
||||||
|
cserver.thread_min_client_threshold = atoi(argv[1]);
|
||||||
|
if (cserver.thread_min_client_threshold < 0 || cserver.thread_min_client_threshold > 400) {
|
||||||
|
err = "min-thread-client must be between 0 and 400"; goto loaderr;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
err = "Bad directive or wrong number of arguments"; goto loaderr;
|
err = "Bad directive or wrong number of arguments"; goto loaderr;
|
||||||
}
|
}
|
||||||
|
@ -1003,6 +1003,33 @@ int clientHasPendingReplies(client *c) {
|
|||||||
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
|
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int chooseBestThreadForAccept(int ielCur)
|
||||||
|
{
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
int rgcclients[MAX_EVENT_LOOPS] = {0};
|
||||||
|
|
||||||
|
listRewind(g_pserver->clients, &li);
|
||||||
|
while ((ln = listNext(&li)) != nullptr)
|
||||||
|
{
|
||||||
|
client *c = (client*)listNodeValue(ln);
|
||||||
|
if (c->iel < 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
rgcclients[c->iel]++;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ielMinLoad = 0;
|
||||||
|
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||||
|
{
|
||||||
|
if (rgcclients[iel] < cserver.thread_min_client_threshold)
|
||||||
|
return iel;
|
||||||
|
if (rgcclients[iel] < rgcclients[ielMinLoad])
|
||||||
|
ielMinLoad = iel;
|
||||||
|
}
|
||||||
|
return ielMinLoad;
|
||||||
|
}
|
||||||
|
|
||||||
#define MAX_ACCEPTS_PER_CALL 1000
|
#define MAX_ACCEPTS_PER_CALL 1000
|
||||||
static void acceptCommonHandler(int fd, int flags, char *ip, int iel) {
|
static void acceptCommonHandler(int fd, int flags, char *ip, int iel) {
|
||||||
client *c;
|
client *c;
|
||||||
@ -1105,7 +1132,22 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
|
|
||||||
if (!g_fTestMode)
|
if (!g_fTestMode)
|
||||||
{
|
{
|
||||||
// We always accept on the same thread
|
{
|
||||||
|
int ielTarget = chooseBestThreadForAccept(ielCur);
|
||||||
|
if (ielTarget != ielCur)
|
||||||
|
{
|
||||||
|
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
|
||||||
|
memcpy(szT, cip, NET_IP_STR_LEN);
|
||||||
|
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
|
||||||
|
acceptCommonHandler(cfd,0,szT, ielTarget);
|
||||||
|
zfree(szT);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (res == AE_OK)
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LLocalThread:
|
LLocalThread:
|
||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
acceptCommonHandler(cfd,0,cip, ielCur);
|
acceptCommonHandler(cfd,0,cip, ielCur);
|
||||||
@ -1122,10 +1164,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
goto LLocalThread;
|
goto LLocalThread;
|
||||||
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
|
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
|
||||||
memcpy(szT, cip, NET_IP_STR_LEN);
|
memcpy(szT, cip, NET_IP_STR_LEN);
|
||||||
aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
|
int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
|
||||||
acceptCommonHandler(cfd,0,szT, iel);
|
acceptCommonHandler(cfd,0,szT, iel);
|
||||||
zfree(szT);
|
zfree(szT);
|
||||||
});
|
});
|
||||||
|
if (res != AE_OK)
|
||||||
|
{
|
||||||
|
zfree(szT);
|
||||||
|
goto LLocalThread;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1151,13 +1198,16 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
int ielTarget = rand() % cserver.cthreads;
|
int ielTarget = rand() % cserver.cthreads;
|
||||||
if (ielTarget == ielCur)
|
if (ielTarget == ielCur)
|
||||||
{
|
{
|
||||||
|
LLocalThread:
|
||||||
acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur);
|
acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
|
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
|
||||||
acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielTarget);
|
acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielTarget);
|
||||||
});
|
});
|
||||||
|
if (res != AE_OK)
|
||||||
|
goto LLocalThread;
|
||||||
}
|
}
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
|
|
||||||
@ -2529,7 +2579,7 @@ NULL
|
|||||||
{
|
{
|
||||||
int iel = client->iel;
|
int iel = client->iel;
|
||||||
freeClientAsync(client);
|
freeClientAsync(client);
|
||||||
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] {
|
aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK
|
||||||
freeClientsInAsyncFreeQueue(iel);
|
freeClientsInAsyncFreeQueue(iel);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1604,6 +1604,7 @@ struct redisServerConst {
|
|||||||
|
|
||||||
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
|
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
|
||||||
bool fUsePro = false;
|
bool fUsePro = false;
|
||||||
|
int thread_min_client_threshold = 50;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisServer {
|
struct redisServer {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user