Merge commit 'cb683a84f7a9d530bec629f2b656e7d0842a0f75' into unstable

Former-commit-id: 817a11ff110772893eda0675912bbb0cfc1fca74
This commit is contained in:
John Sully 2020-05-22 15:56:35 -04:00
commit 67b7d512cb
17 changed files with 262 additions and 22 deletions

View File

@ -1782,6 +1782,32 @@ rdb-save-incremental-fsync yes
# the main dictionary scan
# active-defrag-max-scan-fields 1000
# It is possible to pin different threads and processes of Redis to specific
# CPUs in your system, in order to maximize the performances of the server.
# This is useful both in order to pin different Redis threads in different
# CPUs, but also in order to make sure that multiple Redis instances running
# in the same host will be pinned to different CPUs.
#
# Normally you can do this using the "taskset" command, however it is also
# possible to this via Redis configuration directly, both in Linux and FreeBSD.
#
# You can pin the server/IO threads, bio threads, aof rewrite child process, and
# the bgsave child process. The syntax to specify the cpu list is the same as
# the taskset command:
#
# Set redis server/io threads to cpu affinity 0,2,4,6:
# server_cpulist 0-7:2
#
# Set bio threads to cpu affinity 1,3:
# bio_cpulist 1,3
#
# Set aof rewrite child process to cpu affinity 8,9,10,11:
# aof_rewrite_cpulist 8-11
#
# Set bgsave child process to cpu affinity 1,10,11
# bgsave_cpulist 1,10-11
# Path to directory for file backed scratchpad. The file backed scratchpad
# reduces memory requirements by storing rarely accessed data on disk
# instead of RAM. A temporary file will be created in this directory.

View File

@ -252,7 +252,7 @@ endif
REDIS_SERVER_NAME=keydb-server
REDIS_SENTINEL_NAME=keydb-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o $(ASM_OBJ)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o $(ASM_OBJ)
REDIS_CLI_NAME=keydb-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o motd.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark

View File

@ -1711,6 +1711,7 @@ int rewriteAppendOnlyFileBackground(void) {
/* Child */
redisSetProcTitle("keydb-aof-rewrite");
redisSetCpuAffinity(g_pserver->aof_rewrite_cpulist);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");

View File

@ -166,6 +166,8 @@ void *bioProcessBackgroundJobs(void *arg) {
break;
}
redisSetCpuAffinity(g_pserver->bio_cpulist);
/* Make the thread killable at any time, so that bioKillThreads()
* can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

View File

@ -2232,6 +2232,10 @@ standardConfig configs[] = {
createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->syslog_ident, "redis", NULL, NULL),
createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->rdb_filename, CONFIG_DEFAULT_RDB_FILENAME, isValidDBfilename, NULL),
createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->aof_filename, "appendonly.aof", isValidAOFfilename, NULL),
createStringConfig("server_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->server_cpulist, NULL, NULL, NULL),
createStringConfig("bio_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->bio_cpulist, NULL, NULL, NULL),
createStringConfig("aof_rewrite_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->aof_rewrite_cpulist, NULL, NULL, NULL),
createStringConfig("bgsave_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->bgsave_cpulist, NULL, NULL, NULL),
/* Enum Configs */
createEnumConfig("supervised", NULL, IMMUTABLE_CONFIG, supervised_mode_enum, cserver.supervised_mode, SUPERVISED_NONE, NULL, NULL),

View File

@ -236,9 +236,12 @@ void setproctitle(const char *fmt, ...);
#ifdef __linux__
#define redis_set_thread_title(name) pthread_setname_np(pthread_self(), name)
#else
#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
#if (defined __FreeBSD__ || defined __OpenBSD__)
#include <pthread_np.h>
#define redis_set_thread_title(name) pthread_set_name_np(pthread_self(), name)
#elif defined __NetBSD__
#include <pthread.h>
#define redis_set_thread_title(name) pthread_setname_np(pthread_self(), name, NULL)
#else
#if (defined __APPLE__ && defined(MAC_OS_X_VERSION_10_7))
#ifdef __cplusplus
@ -253,4 +256,13 @@ int pthread_setname_np(const char *name);
#endif
#endif
/* Check if we can use setcpuaffinity(). */
#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
#define USE_SETCPUAFFINITY
#ifdef __cplusplus
extern "C"
#endif
void setcpuaffinity(const char *cpulist);
#endif
#endif

View File

@ -766,8 +766,8 @@ dictEntry *dictGetFairRandomKey(dict *d) {
/* Function to reverse bits. Algorithm from:
* http://graphics.stanford.edu/~seander/bithacks.html#ReverseParallel */
static unsigned long rev(unsigned long v) {
unsigned long s = 8 * sizeof(v); // bit size; must be power of 2
unsigned long mask = ~0;
unsigned long s = CHAR_BIT * sizeof(v); // bit size; must be power of 2
unsigned long mask = ~0UL;
while ((s >>= 1) > 0) {
mask ^= (mask << s);
v = ((v >> s) & mask) | ((v << s) & ~mask);

View File

@ -175,7 +175,11 @@ void execCommand(client *c) {
* This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency
* and atomicity guarantees. */
if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) {
if (!must_propagate &&
!g_pserver->loading &&
!(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) &&
!(FInReplicaReplay()))
{
execCommandPropagateMulti(c);
must_propagate = 1;
}

View File

@ -508,12 +508,35 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its %s: '%s' after processing the command "
"'%s'", from, to, s, cmdname);
if (ctype == CLIENT_TYPE_MASTER && g_pserver->repl_backlog &&
g_pserver->repl_backlog_histlen > 0)
{
long long dumplen = 256;
if (g_pserver->repl_backlog_histlen < dumplen)
dumplen = g_pserver->repl_backlog_histlen;
if (c->querybuf && sdslen(c->querybuf)) {
std::string str = escapeString(c->querybuf);
printf("\tquerybuf: %s\n", str.c_str());
/* Identify the first byte to dump. */
long long idx =
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - dumplen)) %
g_pserver->repl_backlog_size;
/* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
while(dumplen) {
long long thislen =
((g_pserver->repl_backlog_size - idx) < dumplen) ?
(g_pserver->repl_backlog_size - idx) : dumplen;
dump = sdscatrepr(dump,g_pserver->repl_backlog+idx,thislen);
dumplen -= thislen;
idx = 0;
}
/* Finally log such bytes: this is vital debugging info to
* understand what happened. */
serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
sdsfree(dump);
}
g_pserver->stat_unexpected_error_replies++;
}
}
@ -1633,7 +1656,7 @@ fastlock lockasyncfree {"async free lock"};
* a context where calling freeClient() is not possible, because the client
* should be valid for the continuation of the flow of the program. */
void freeClientAsync(client *c) {
/* We need to handle concurrent access to the server.clients_to_close list
/* We need to handle concurrent access to the g_pserver->clients_to_close list
* only in the freeClientAsync() function, since it's the only function that
* may access the list while Redis uses I/O threads. All the other accesses
* are in the context of the main thread while the other threads are

View File

@ -1433,6 +1433,7 @@ int rdbSaveBackground(rdbSaveInfo *rsi) {
/* Child */
redisSetProcTitle("keydb-rdb-bgsave");
redisSetCpuAffinity(g_pserver->bgsave_cpulist);
retval = rdbSave(rsi);
if (retval == C_OK) {
sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
@ -2658,6 +2659,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
rioInitWithFd(&rdb,g_pserver->rdb_pipe_write);
redisSetProcTitle("keydb-rdb-to-slaves");
redisSetCpuAffinity(g_pserver->bgsave_cpulist);
retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
if (retval == C_OK && rioFlush(&rdb) == 0)

View File

@ -96,6 +96,7 @@ static struct config {
sds dbnumstr;
char *tests;
char *auth;
const char *user;
int precision;
int num_threads;
struct benchmarkThread **threads;
@ -264,7 +265,10 @@ static redisConfig *getRedisConfig(const char *ip, int port,
if(config.auth) {
void *authReply = NULL;
redisAppendCommand(c, "AUTH %s", config.auth);
if (config.user == NULL)
redisAppendCommand(c, "AUTH %s", config.auth);
else
redisAppendCommand(c, "AUTH %s %s", config.user, config.auth);
if (REDIS_OK != redisGetReply(c, &authReply)) goto fail;
if (reply) freeReplyObject(reply);
reply = ((redisReply *) authReply);
@ -633,7 +637,12 @@ static client createClient(const char *cmd, size_t len, client from, int thread_
c->prefix_pending = 0;
if (config.auth) {
char *buf = NULL;
int len = redisFormatCommand(&buf, "AUTH %s", config.auth);
int len;
if (config.user == NULL)
len = redisFormatCommand(&buf, "AUTH %s", config.auth);
else
len = redisFormatCommand(&buf, "AUTH %s %s",
config.user, config.auth);
c->obuf = sdscatlen(c->obuf, buf, len);
free(buf);
c->prefix_pending++;
@ -1305,6 +1314,9 @@ int parseOptions(int argc, const char **argv) {
} else if (!strcmp(argv[i],"-a") ) {
if (lastarg) goto invalid;
config.auth = strdup(argv[++i]);
} else if (!strcmp(argv[i],"--user")) {
if (lastarg) goto invalid;
config.user = argv[++i];
} else if (!strcmp(argv[i],"-d")) {
if (lastarg) goto invalid;
config.datasize = atoi(argv[++i]);
@ -1391,6 +1403,7 @@ usage:
" -p <port> Server port (default 6379)\n"
" -s <socket> Server socket (overrides host and port)\n"
" -a <password> Password for Redis Auth\n"
" --user <username> Used to send ACL style 'AUTH username pass'. Needs -a.\n"
" -c <clients> Number of parallel connections (default 50)\n"
" -n <requests> Total number of requests (default 100000)\n"
" -d <size> Data size of SET/GET value in bytes (default 3)\n"

View File

@ -3124,7 +3124,6 @@ void initServer(void) {
scriptingInit(1);
slowlogInit();
latencyMonitorInit();
crc64_init();
}
/* Some steps in server initialization need to be done last (after modules
@ -3433,8 +3432,8 @@ void call(client *c, int flags) {
serverTL->fixed_time_expire++;
/* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
/* Send the command to clients in MONITOR mode if applicable.
* Administrative commands are considered too dangerous to be shown. */
if (listLength(g_pserver->monitors) &&
!g_pserver->loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
@ -4556,7 +4555,8 @@ sds genRedisInfoString(const char *section) {
"active_defrag_key_misses:%lld\r\n"
"tracking_total_keys:%lld\r\n"
"tracking_total_items:%llu\r\n"
"unexpected_error_replies:%lld\r\n",
"tracking_total_prefixes:%lld\r\n"
"unexpected_error_replies:%lld\r\n",
g_pserver->stat_numconnections,
g_pserver->stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@ -4586,6 +4586,7 @@ sds genRedisInfoString(const char *section) {
g_pserver->stat_active_defrag_key_misses,
(unsigned long long) trackingGetTotalKeys(),
(unsigned long long) trackingGetTotalItems(),
(unsigned long long) trackingGetTotalPrefixes(),
g_pserver->stat_unexpected_error_replies);
}
@ -5167,6 +5168,14 @@ void redisSetProcTitle(const char *title) {
#endif
}
void redisSetCpuAffinity(const char *cpulist) {
#ifdef USE_SETCPUAFFINITY
setcpuaffinity(cpulist);
#else
UNUSED(cpulist);
#endif
}
/*
* Check whether systemd or upstart have been used to start redis.
*/
@ -5353,6 +5362,7 @@ int main(int argc, char **argv) {
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
crc64_init();
uint8_t hashseed[16];
getRandomHexChars((char*)hashseed,sizeof(hashseed));
@ -5567,6 +5577,7 @@ int main(int argc, char **argv) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", g_pserver->maxmemory);
}
redisSetCpuAffinity(g_pserver->server_cpulist);
aeReleaseLock(); //Finally we can dump the lock
moduleReleaseGIL(true);

View File

@ -1200,7 +1200,7 @@ typedef struct readyList {
no AUTH is needed, and every
connection is immediately
authenticated. */
typedef struct user {
typedef struct {
sds name; /* The username as an SDS string. */
uint64_t flags; /* See USER_FLAG_* */
@ -1984,6 +1984,11 @@ struct redisServer {
int tls_replication;
int tls_auth_clients;
redisTLSContextConfig tls_ctx_config;
/* cpu affinity */
char *server_cpulist; /* cpu affinity list of redis server main/io thread. */
char *bio_cpulist; /* cpu affinity list of bio thread. */
char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
};
typedef struct pubsubPattern {
@ -2140,6 +2145,7 @@ void exitFromChild(int retcode);
size_t redisPopcount(const void *s, long count);
void redisSetProcTitle(const char *title);
int redisCommunicateSystemd(const char *sd_notify_msg);
void redisSetCpuAffinity(const char *cpulist);
/* networking.c -- Networking and Client related operations */
client *createClient(connection *conn, int iel);
@ -2267,6 +2273,7 @@ void trackingInvalidateKeysOnFlush(int dbid);
void trackingLimitUsedSlots(void);
uint64_t trackingGetTotalItems(void);
uint64_t trackingGetTotalKeys(void);
uint64_t trackingGetTotalPrefixes(void);
void trackingBroadcastInvalidationMessages(void);
/* List data type */

129
src/setcpuaffinity.c Normal file
View File

@ -0,0 +1,129 @@
/* ==========================================================================
* setcpuaffinity.c - Linux/BSD setcpuaffinity.
* --------------------------------------------------------------------------
* Copyright (C) 2020 zhenwei pi
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to permit
* persons to whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
* ==========================================================================
*/
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#ifdef __linux__
#include <sched.h>
#endif
#ifdef __FreeBSD__
#include <sys/param.h>
#include <sys/cpuset.h>
#endif
#include "config.h"
#ifdef USE_SETCPUAFFINITY
static const char *next_token(const char *q, int sep) {
if (q)
q = strchr(q, sep);
if (q)
q++;
return q;
}
static int next_num(const char *str, char **end, int *result) {
if (!str || *str == '\0' || !isdigit(*str))
return -1;
*result = strtoul(str, end, 10);
if (str == *end)
return -1;
return 0;
}
/* set current thread cpu affinity to cpu list, this function works like
* taskset command (actually cpulist parsing logic reference to util-linux).
* example of this function: "0,2,3", "0,2-3", "0-20:2". */
void setcpuaffinity(const char *cpulist) {
const char *p, *q;
char *end = NULL;
#ifdef __linux__
cpu_set_t cpuset;
#endif
#ifdef __FreeBSD__
cpuset_t cpuset;
#endif
if (!cpulist)
return;
CPU_ZERO(&cpuset);
q = cpulist;
while (p = q, q = next_token(q, ','), p) {
int a, b, s;
const char *c1, *c2;
if (next_num(p, &end, &a) != 0)
return;
b = a;
s = 1;
p = end;
c1 = next_token(p, '-');
c2 = next_token(p, ',');
if (c1 != NULL && (c2 == NULL || c1 < c2)) {
if (next_num(c1, &end, &b) != 0)
return;
c1 = end && *end ? next_token(end, ':') : NULL;
if (c1 != NULL && (c2 == NULL || c1 < c2)) {
if (next_num(c1, &end, &s) != 0)
return;
if (s == 0)
return;
}
}
if ((a > b))
return;
while (a <= b) {
CPU_SET(a, &cpuset);
a += s;
}
}
if (end && *end)
return;
#ifdef __linux__
sched_setaffinity(0, sizeof(cpuset), &cpuset);
#endif
#ifdef __FreeBSD__
cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, sizeof(cpuset), &cpuset);
#endif
}
#endif /* USE_SETCPUAFFINITY */

View File

@ -518,3 +518,8 @@ uint64_t trackingGetTotalKeys(void) {
if (TrackingTable == NULL) return 0;
return raxSize(TrackingTable);
}
uint64_t trackingGetTotalPrefixes(void) {
if (PrefixTable == NULL) return 0;
return raxSize(PrefixTable);
}

View File

@ -221,9 +221,6 @@ proc run_solo {name code} {
}
proc cleanup {} {
if {$::dont_clean} {
return
}
if {!$::quiet} {puts -nonewline "Cleanup: may take some time... "}
flush stdout
catch {exec rm -rf {*}[glob tests/tmp/redis.conf.*]}
@ -461,11 +458,11 @@ proc the_end {} {
foreach failed $::failed_tests {
puts "*** $failed"
}
cleanup
if {!$::dont_clean} cleanup
exit 1
} else {
puts "\n[colorstr bold-white {\o/}] [colorstr bold-green {All tests passed without errors!}]\n"
cleanup
if {!$::dont_clean} cleanup
exit 0
}
}

View File

@ -95,6 +95,10 @@ start_server {tags {"introspection"}} {
bind
requirepass
multi-master
server_cpulist
bio_cpulist
aof_rewrite_cpulist
bgsave_cpulist
}
set configs {}