Thread affinity feature (default disabled)

Former-commit-id: 9b1e95809ce57fea84f30b8d5fb68ee00bd46ceb
This commit is contained in:
John Sully 2019-02-26 15:14:35 -05:00
parent a23fffc6d2
commit e0089b390b
5 changed files with 40 additions and 2 deletions

View File

@ -1503,3 +1503,7 @@ rdb-save-incremental-fsync yes
# of your network hardware, not the number of cores on your machine. We don't recommend going # of your network hardware, not the number of cores on your machine. We don't recommend going
# above 4 at this time. By default this is set 1. # above 4 at this time. By default this is set 1.
server-threads 2 server-threads 2
# Should KeyDB pin threads to CPUs? By default this is disabled, and KeyDB will not bind threads.
# When enabled threads are bount to cores sequentially starting at core 0.
# server-thread-affinity true

View File

@ -836,6 +836,15 @@ void loadServerConfigFromString(char *config) {
err = "Invalid number of threads specified"; err = "Invalid number of threads specified";
goto loaderr; goto loaderr;
} }
} else if (!strcasecmp(argv[0],"server-thread-affinity") && argc == 2) {
if (strcasecmp(argv[1], "true") == 0) {
server.fThreadAffinity = TRUE;
} else if (strcasecmp(argv[1], "false") == 0) {
server.fThreadAffinity = FALSE;
} else {
err = "Unknown argument: server-thread-affinity expects either true or false";
goto loaderr;
}
} else { } else {
err = "Bad directive or wrong number of arguments"; goto loaderr; err = "Bad directive or wrong number of arguments"; goto loaderr;
} }

View File

@ -1034,6 +1034,17 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) {
close(fd); /* May be already closed, just ignore errors */ close(fd); /* May be already closed, just ignore errors */
return; return;
} }
// Set thread affinity
if (server.fThreadAffinity)
{
int cpu = iel;
if (setsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, sizeof(iel)) != 0)
{
serverLog(LL_WARNING, "Failed to set socket affinity");
}
}
/* If maxclient directive is set and this is one client more... close the /* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before * connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in non-blocking * for this condition, since now the socket is already set in non-blocking

View File

@ -2460,6 +2460,7 @@ void initServerConfig(void) {
/* Multithreading */ /* Multithreading */
server.cthreads = CONFIG_DEFAULT_THREADS; server.cthreads = CONFIG_DEFAULT_THREADS;
server.fThreadAffinity = CONFIG_DEFAULT_THREAD_AFFINITY;
} }
extern char **environ; extern char **environ;
@ -5086,11 +5087,22 @@ int main(int argc, char **argv) {
serverAssert(server.cthreads > 0 && server.cthreads <= MAX_EVENT_LOOPS); serverAssert(server.cthreads > 0 && server.cthreads <= MAX_EVENT_LOOPS);
pthread_t rgthread[MAX_EVENT_LOOPS]; pthread_t rgthread[MAX_EVENT_LOOPS];
for (int iel = 1; iel < server.cthreads; ++iel) for (int iel = 0; iel < server.cthreads; ++iel)
{ {
pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel)); pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel));
if (server.fThreadAffinity)
{
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(iel, &cpuset);
if (pthread_setaffinity_np(rgthread[iel], sizeof(cpu_set_t), &cpuset) == 0)
{
serverLog(LOG_INFO, "Binding thread %d to cpu %d", iel, iel);
}
}
} }
workerThreadMain((void*)((int64_t)IDX_EVENT_LOOP_MAIN)); void *pvRet;
pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pvRet);
return 0; return 0;
} }

View File

@ -183,6 +183,7 @@ extern "C" {
#define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */
#define CONFIG_DEFAULT_THREADS 1 #define CONFIG_DEFAULT_THREADS 1
#define CONFIG_DEFAULT_THREAD_AFFINITY 0
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
@ -1070,6 +1071,7 @@ struct redisServer {
dict *orig_commands; /* Command table before command renaming. */ dict *orig_commands; /* Command table before command renaming. */
int cthreads; /* Number of main worker threads */ int cthreads; /* Number of main worker threads */
int fThreadAffinity; /* Should we pin threads to cores? */
struct redisServerThreadVars rgthreadvar[MAX_EVENT_LOOPS]; struct redisServerThreadVars rgthreadvar[MAX_EVENT_LOOPS];
unsigned int lruclock; /* Clock for LRU eviction */ unsigned int lruclock; /* Clock for LRU eviction */