Merge branch 'unstable' of github.com:/antirez/redis into unstable
This commit is contained in:
commit
28dd6356e1
48
.github/workflows/daily.yml
vendored
Normal file
48
.github/workflows/daily.yml
vendored
Normal file
@ -0,0 +1,48 @@
|
||||
name: Daily
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: '0 7 * * *'
|
||||
|
||||
jobs:
|
||||
test-jemalloc:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 1200
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- name: make
|
||||
run: make
|
||||
- name: test
|
||||
run: |
|
||||
sudo apt-get install tcl8.5
|
||||
./runtest --accurate --verbose
|
||||
- name: module api test
|
||||
run: ./runtest-moduleapi --verbose
|
||||
|
||||
test-libc-malloc:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 1200
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- name: make
|
||||
run: make MALLOC=libc
|
||||
- name: test
|
||||
run: |
|
||||
sudo apt-get install tcl8.5
|
||||
./runtest --accurate --verbose
|
||||
- name: module api test
|
||||
run: ./runtest-moduleapi --verbose
|
||||
|
||||
test-valgrind:
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 14400
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- name: make
|
||||
run: make valgrind
|
||||
- name: test
|
||||
run: |
|
||||
sudo apt-get install tcl8.5 valgrind -y
|
||||
./runtest --valgrind --verbose --clients 1
|
||||
- name: module api test
|
||||
run: ./runtest-moduleapi --valgrind --verbose --clients 1
|
15
redis.conf
15
redis.conf
@ -1780,3 +1780,18 @@ rdb-save-incremental-fsync yes
|
||||
# Maximum number of set/hash/zset/list fields that will be processed from
|
||||
# the main dictionary scan
|
||||
# active-defrag-max-scan-fields 1000
|
||||
|
||||
# Redis server/IO threads, bio threads, aof rewrite child process, and bgsave
|
||||
# child process cpu affinity list config. syntax of cpu list looks like taskset
|
||||
# command. serveral examples:
|
||||
# set redis server/io threads to cpu affinity 0,2,4,6
|
||||
# server_cpulist 0-7:2
|
||||
#
|
||||
# set bio threads to cpu affinity 1,3
|
||||
# bio_cpulist 1,3
|
||||
#
|
||||
# set aof rewrite child process to cpu affinity 8,9,10,11
|
||||
# aof_rewrite_cpulist 8-11
|
||||
#
|
||||
# set bgsave child process to cpu affinity 1,10,11
|
||||
# bgsave_cpulist 1,10-11
|
||||
|
@ -206,7 +206,7 @@ endif
|
||||
|
||||
REDIS_SERVER_NAME=redis-server
|
||||
REDIS_SENTINEL_NAME=redis-sentinel
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o
|
||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o
|
||||
REDIS_CLI_NAME=redis-cli
|
||||
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crcspeed.o crc64.o siphash.o crc16.o
|
||||
REDIS_BENCHMARK_NAME=redis-benchmark
|
||||
|
@ -1596,6 +1596,7 @@ int rewriteAppendOnlyFileBackground(void) {
|
||||
|
||||
/* Child */
|
||||
redisSetProcTitle("redis-aof-rewrite");
|
||||
redisSetCpuAffinity(server.aof_rewrite_cpulist);
|
||||
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
|
||||
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
|
||||
sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite");
|
||||
|
@ -166,6 +166,8 @@ void *bioProcessBackgroundJobs(void *arg) {
|
||||
break;
|
||||
}
|
||||
|
||||
redisSetCpuAffinity(server.bio_cpulist);
|
||||
|
||||
/* Make the thread killable at any time, so that bioKillThreads()
|
||||
* can work reliably. */
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
|
||||
|
@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
||||
int noack = 0;
|
||||
|
||||
if (group) {
|
||||
consumer = streamLookupConsumer(group,
|
||||
consumer =
|
||||
streamLookupConsumer(group,
|
||||
receiver->bpop.xread_consumer->ptr,
|
||||
1);
|
||||
SLC_NONE);
|
||||
noack = receiver->bpop.xread_group_noack;
|
||||
}
|
||||
|
||||
|
@ -2133,6 +2133,10 @@ standardConfig configs[] = {
|
||||
createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL),
|
||||
createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL),
|
||||
createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_filename, "appendonly.aof", isValidAOFfilename, NULL),
|
||||
createStringConfig("server_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.server_cpulist, NULL, NULL, NULL),
|
||||
createStringConfig("bio_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bio_cpulist, NULL, NULL, NULL),
|
||||
createStringConfig("aof_rewrite_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.aof_rewrite_cpulist, NULL, NULL, NULL),
|
||||
createStringConfig("bgsave_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bgsave_cpulist, NULL, NULL, NULL),
|
||||
|
||||
/* Enum Configs */
|
||||
createEnumConfig("supervised", NULL, IMMUTABLE_CONFIG, supervised_mode_enum, server.supervised_mode, SUPERVISED_NONE, NULL, NULL),
|
||||
|
@ -244,4 +244,10 @@ int pthread_setname_np(const char *name);
|
||||
#endif
|
||||
#endif
|
||||
|
||||
/* Check if we can use setcpuaffinity(). */
|
||||
#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
|
||||
#define USE_SETCPUAFFINITY
|
||||
void setcpuaffinity(const char *cpulist);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
@ -1636,7 +1636,7 @@ void enableWatchdog(int period) {
|
||||
/* Watchdog was actually disabled, so we have to setup the signal
|
||||
* handler. */
|
||||
sigemptyset(&act.sa_mask);
|
||||
act.sa_flags = SA_ONSTACK | SA_SIGINFO;
|
||||
act.sa_flags = SA_SIGINFO;
|
||||
act.sa_sigaction = watchdogSignalHandler;
|
||||
sigaction(SIGALRM, &act, NULL);
|
||||
}
|
||||
|
@ -436,6 +436,34 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) {
|
||||
sdsfree(s);
|
||||
}
|
||||
|
||||
/* Sometimes we are forced to create a new reply node, and we can't append to
|
||||
* the previous one, when that happens, we wanna try to trim the unused space
|
||||
* at the end of the last reply node which we won't use anymore. */
|
||||
void trimReplyUnusedTailSpace(client *c) {
|
||||
listNode *ln = listLast(c->reply);
|
||||
clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
|
||||
|
||||
/* Note that 'tail' may be NULL even if we have a tail node, becuase when
|
||||
* addDeferredMultiBulkLength() is used */
|
||||
if (!tail) return;
|
||||
|
||||
/* We only try to trim the space is relatively high (more than a 1/4 of the
|
||||
* allocation), otherwise there's a high chance realloc will NOP.
|
||||
* Also, to avoid large memmove which happens as part of realloc, we only do
|
||||
* that if the used part is small. */
|
||||
if (tail->size - tail->used > tail->size / 4 &&
|
||||
tail->used < PROTO_REPLY_CHUNK_BYTES)
|
||||
{
|
||||
size_t old_size = tail->size;
|
||||
tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock));
|
||||
/* take over the allocation's internal fragmentation (at least for
|
||||
* memory usage tracking) */
|
||||
tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
|
||||
c->reply_bytes += tail->size - old_size;
|
||||
listNodeValue(ln) = tail;
|
||||
}
|
||||
}
|
||||
|
||||
/* Adds an empty object to the reply list that will contain the multi bulk
|
||||
* length, which is not known when this function is called. */
|
||||
void *addReplyDeferredLen(client *c) {
|
||||
@ -443,6 +471,7 @@ void *addReplyDeferredLen(client *c) {
|
||||
* ready to be sent, since we are sure that before returning to the
|
||||
* event loop setDeferredAggregateLen() will be called. */
|
||||
if (prepareClientToWrite(c) != C_OK) return NULL;
|
||||
trimReplyUnusedTailSpace(c);
|
||||
listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */
|
||||
return listLast(c->reply);
|
||||
}
|
||||
@ -2843,6 +2872,7 @@ void *IOThreadMain(void *myid) {
|
||||
|
||||
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
|
||||
redis_set_thread_title(thdname);
|
||||
redisSetCpuAffinity(server.server_cpulist);
|
||||
|
||||
while(1) {
|
||||
/* Wait for start */
|
||||
|
66
src/rdb.c
66
src/rdb.c
@ -1351,6 +1351,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
|
||||
|
||||
/* Child */
|
||||
redisSetProcTitle("redis-rdb-bgsave");
|
||||
redisSetCpuAffinity(server.bgsave_cpulist);
|
||||
retval = rdbSave(filename,rsi);
|
||||
if (retval == C_OK) {
|
||||
sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
|
||||
@ -1441,7 +1442,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
||||
|
||||
/* Load every single element of the list */
|
||||
while(len--) {
|
||||
if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
|
||||
if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) {
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
dec = getDecodedObject(ele);
|
||||
size_t len = sdslen(dec->ptr);
|
||||
quicklistPushTail(o->ptr, dec->ptr, len);
|
||||
@ -1468,8 +1472,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
||||
long long llval;
|
||||
sds sdsele;
|
||||
|
||||
if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
|
||||
== NULL) return NULL;
|
||||
if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (o->encoding == OBJ_ENCODING_INTSET) {
|
||||
/* Fetch integer value from element. */
|
||||
@ -1508,13 +1514,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
||||
double score;
|
||||
zskiplistNode *znode;
|
||||
|
||||
if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
|
||||
== NULL) return NULL;
|
||||
if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (rdbtype == RDB_TYPE_ZSET_2) {
|
||||
if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL;
|
||||
if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) {
|
||||
decrRefCount(o);
|
||||
sdsfree(sdsele);
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
|
||||
if (rdbLoadDoubleValue(rdb,&score) == -1) {
|
||||
decrRefCount(o);
|
||||
sdsfree(sdsele);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* Don't care about integer-encoded strings. */
|
||||
@ -1546,10 +1562,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
||||
while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {
|
||||
len--;
|
||||
/* Load raw strings */
|
||||
if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
|
||||
== NULL) return NULL;
|
||||
if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
|
||||
== NULL) return NULL;
|
||||
if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
|
||||
sdsfree(field);
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Add pair to ziplist */
|
||||
o->ptr = ziplistPush(o->ptr, (unsigned char*)field,
|
||||
@ -1577,10 +1598,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
||||
while (o->encoding == OBJ_ENCODING_HT && len > 0) {
|
||||
len--;
|
||||
/* Load encoded strings */
|
||||
if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
|
||||
== NULL) return NULL;
|
||||
if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))
|
||||
== NULL) return NULL;
|
||||
if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) {
|
||||
sdsfree(field);
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Add pair to hash table */
|
||||
ret = dictAdd((dict*)o->ptr, field, value);
|
||||
@ -1600,7 +1626,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
||||
while (len--) {
|
||||
unsigned char *zl =
|
||||
rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL);
|
||||
if (zl == NULL) return NULL;
|
||||
if (zl == NULL) {
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
quicklistAppendZiplist(o->ptr, zl);
|
||||
}
|
||||
} else if (rdbtype == RDB_TYPE_HASH_ZIPMAP ||
|
||||
@ -1823,8 +1852,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
|
||||
1);
|
||||
streamConsumer *consumer =
|
||||
streamLookupConsumer(cgroup,cname,SLC_NONE);
|
||||
sdsfree(cname);
|
||||
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
|
||||
if (rioGetReadError(rdb)) {
|
||||
@ -2460,6 +2489,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
||||
rioInitWithFd(&rdb,server.rdb_pipe_write);
|
||||
|
||||
redisSetProcTitle("redis-rdb-to-slaves");
|
||||
redisSetCpuAffinity(server.bgsave_cpulist);
|
||||
|
||||
retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi);
|
||||
if (retval == C_OK && rioFlush(&rdb) == 0)
|
||||
|
@ -4850,6 +4850,14 @@ void redisSetProcTitle(char *title) {
|
||||
#endif
|
||||
}
|
||||
|
||||
void redisSetCpuAffinity(const char *cpulist) {
|
||||
#ifdef USE_SETCPUAFFINITY
|
||||
setcpuaffinity(cpulist);
|
||||
#else
|
||||
UNUSED(cpulist);
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Check whether systemd or upstart have been used to start redis.
|
||||
*/
|
||||
@ -5118,6 +5126,7 @@ int main(int argc, char **argv) {
|
||||
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
|
||||
}
|
||||
|
||||
redisSetCpuAffinity(server.server_cpulist);
|
||||
aeSetBeforeSleepProc(server.el,beforeSleep);
|
||||
aeSetAfterSleepProc(server.el,afterSleep);
|
||||
aeMain(server.el);
|
||||
|
@ -1433,6 +1433,11 @@ struct redisServer {
|
||||
int tls_replication;
|
||||
int tls_auth_clients;
|
||||
redisTLSContextConfig tls_ctx_config;
|
||||
/* cpu affinity */
|
||||
char *server_cpulist; /* cpu affinity list of redis server main/io thread. */
|
||||
char *bio_cpulist; /* cpu affinity list of bio thread. */
|
||||
char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
|
||||
char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
|
||||
};
|
||||
|
||||
typedef struct pubsubPattern {
|
||||
@ -1585,6 +1590,7 @@ void exitFromChild(int retcode);
|
||||
size_t redisPopcount(void *s, long count);
|
||||
void redisSetProcTitle(char *title);
|
||||
int redisCommunicateSystemd(const char *sd_notify_msg);
|
||||
void redisSetCpuAffinity(const char *cpulist);
|
||||
|
||||
/* networking.c -- Networking and Client related operations */
|
||||
client *createClient(connection *conn);
|
||||
|
129
src/setcpuaffinity.c
Normal file
129
src/setcpuaffinity.c
Normal file
@ -0,0 +1,129 @@
|
||||
/* ==========================================================================
|
||||
* setproctitle.c - Linux/BSD setcpuaffinity.
|
||||
* --------------------------------------------------------------------------
|
||||
* Copyright (C) 2020 zhenwei pi
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a
|
||||
* copy of this software and associated documentation files (the
|
||||
* "Software"), to deal in the Software without restriction, including
|
||||
* without limitation the rights to use, copy, modify, merge, publish,
|
||||
* distribute, sublicense, and/or sell copies of the Software, and to permit
|
||||
* persons to whom the Software is furnished to do so, subject to the
|
||||
* following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included
|
||||
* in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
|
||||
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
|
||||
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
|
||||
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
|
||||
* USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
* ==========================================================================
|
||||
*/
|
||||
#ifndef _GNU_SOURCE
|
||||
#define _GNU_SOURCE
|
||||
#endif
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
#ifdef __linux__
|
||||
#include <sched.h>
|
||||
#endif
|
||||
#ifdef __FreeBSD__
|
||||
#include <sys/param.h>
|
||||
#include <sys/cpuset.h>
|
||||
#endif
|
||||
#include "config.h"
|
||||
|
||||
#ifdef USE_SETCPUAFFINITY
|
||||
static const char *next_token(const char *q, int sep) {
|
||||
if (q)
|
||||
q = strchr(q, sep);
|
||||
if (q)
|
||||
q++;
|
||||
|
||||
return q;
|
||||
}
|
||||
|
||||
static int next_num(const char *str, char **end, int *result) {
|
||||
if (!str || *str == '\0' || !isdigit(*str))
|
||||
return -1;
|
||||
|
||||
*result = strtoul(str, end, 10);
|
||||
if (str == *end)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* set current thread cpu affinity to cpu list, this function works like
|
||||
* taskset command (actually cpulist parsing logic reference to util-linux).
|
||||
* example of this function: "0,2,3", "0,2-3", "0-20:2". */
|
||||
void setcpuaffinity(const char *cpulist) {
|
||||
const char *p, *q;
|
||||
char *end = NULL;
|
||||
#ifdef __linux__
|
||||
cpu_set_t cpuset;
|
||||
#endif
|
||||
#ifdef __FreeBSD__
|
||||
cpuset_t cpuset;
|
||||
#endif
|
||||
|
||||
if (!cpulist)
|
||||
return;
|
||||
|
||||
CPU_ZERO(&cpuset);
|
||||
|
||||
q = cpulist;
|
||||
while (p = q, q = next_token(q, ','), p) {
|
||||
int a, b, s;
|
||||
const char *c1, *c2;
|
||||
|
||||
if (next_num(p, &end, &a) != 0)
|
||||
return;
|
||||
|
||||
b = a;
|
||||
s = 1;
|
||||
p = end;
|
||||
|
||||
c1 = next_token(p, '-');
|
||||
c2 = next_token(p, ',');
|
||||
|
||||
if (c1 != NULL && (c2 == NULL || c1 < c2)) {
|
||||
if (next_num(c1, &end, &b) != 0)
|
||||
return;
|
||||
|
||||
c1 = end && *end ? next_token(end, ':') : NULL;
|
||||
if (c1 != NULL && (c2 == NULL || c1 < c2)) {
|
||||
if (next_num(c1, &end, &s) != 0)
|
||||
return;
|
||||
|
||||
if (s == 0)
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if ((a > b))
|
||||
return;
|
||||
|
||||
while (a <= b) {
|
||||
CPU_SET(a, &cpuset);
|
||||
a += s;
|
||||
}
|
||||
}
|
||||
|
||||
if (end && *end)
|
||||
return;
|
||||
|
||||
#ifdef __linux__
|
||||
sched_setaffinity(0, sizeof(cpuset), &cpuset);
|
||||
#endif
|
||||
#ifdef __FreeBSD__
|
||||
cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, sizeof(cpuset), &cpuset);
|
||||
#endif
|
||||
}
|
||||
|
||||
#endif /* USE_SETCPUAFFINITY */
|
@ -96,6 +96,11 @@ typedef struct streamPropInfo {
|
||||
/* Prototypes of exported APIs. */
|
||||
struct client;
|
||||
|
||||
/* Flags for streamLookupConsumer */
|
||||
#define SLC_NONE 0
|
||||
#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */
|
||||
#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */
|
||||
|
||||
stream *streamNew(void);
|
||||
void freeStream(stream *s);
|
||||
unsigned long streamLength(const robj *subject);
|
||||
@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
||||
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
||||
void streamIteratorStop(streamIterator *si);
|
||||
streamCG *streamLookupCG(stream *s, sds groupname);
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
|
||||
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
|
||||
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
||||
void streamDecodeID(void *buf, streamID *id);
|
||||
|
@ -1570,7 +1570,8 @@ void xreadCommand(client *c) {
|
||||
addReplyBulk(c,c->argv[streams_arg+i]);
|
||||
streamConsumer *consumer = NULL;
|
||||
if (groups) consumer = streamLookupConsumer(groups[i],
|
||||
consumername->ptr,1);
|
||||
consumername->ptr,
|
||||
SLC_NONE);
|
||||
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
|
||||
int flags = 0;
|
||||
if (noack) flags |= STREAM_RWR_NOACK;
|
||||
@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
|
||||
* consumer does not exist it is automatically created as a side effect
|
||||
* of calling this function, otherwise its last seen time is updated and
|
||||
* the existing consumer reference returned. */
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
|
||||
int create = !(flags & SLC_NOCREAT);
|
||||
int refresh = !(flags & SLC_NOREFRESH);
|
||||
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
|
||||
sdslen(name));
|
||||
if (consumer == raxNotFound) {
|
||||
@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
|
||||
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
|
||||
consumer,NULL);
|
||||
}
|
||||
consumer->seen_time = mstime();
|
||||
if (refresh) consumer->seen_time = mstime();
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
|
||||
* may have pending messages: they are removed from the PEL, and the number
|
||||
* of pending messages "lost" is returned. */
|
||||
uint64_t streamDelConsumer(streamCG *cg, sds name) {
|
||||
streamConsumer *consumer = streamLookupConsumer(cg,name,0);
|
||||
streamConsumer *consumer =
|
||||
streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);
|
||||
if (consumer == NULL) return 0;
|
||||
|
||||
uint64_t retval = raxSize(consumer->pel);
|
||||
@ -2068,16 +2072,19 @@ void xpendingCommand(client *c) {
|
||||
}
|
||||
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
|
||||
else {
|
||||
streamConsumer *consumer = consumername ?
|
||||
streamLookupConsumer(group,consumername->ptr,0):
|
||||
NULL;
|
||||
streamConsumer *consumer = NULL;
|
||||
if (consumername) {
|
||||
consumer = streamLookupConsumer(group,
|
||||
consumername->ptr,
|
||||
SLC_NOCREAT|SLC_NOREFRESH);
|
||||
|
||||
/* If a consumer name was mentioned but it does not exist, we can
|
||||
* just return an empty array. */
|
||||
if (consumername && consumer == NULL) {
|
||||
if (consumer == NULL) {
|
||||
addReplyArrayLen(c,0);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
rax *pel = consumer ? consumer->pel : group->pel;
|
||||
unsigned char startkey[sizeof(streamID)];
|
||||
@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) {
|
||||
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
|
||||
/* Update the consumer and idle time. */
|
||||
if (consumer == NULL)
|
||||
consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
|
||||
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
|
||||
nack->consumer = consumer;
|
||||
nack->delivery_time = deliverytime;
|
||||
/* Set the delivery attempts counter if given, otherwise
|
||||
|
@ -54,6 +54,12 @@ tags {"aof"} {
|
||||
|
||||
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[catch {$client ping} e] == 0
|
||||
} else {
|
||||
fail "Loading DB is taking too much time."
|
||||
}
|
||||
|
||||
test "Truncated AOF loaded: we expect foo to be equal to 5" {
|
||||
assert {[$client get foo] eq "5"}
|
||||
}
|
||||
@ -71,6 +77,12 @@ tags {"aof"} {
|
||||
|
||||
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[catch {$client ping} e] == 0
|
||||
} else {
|
||||
fail "Loading DB is taking too much time."
|
||||
}
|
||||
|
||||
test "Truncated AOF loaded: we expect foo to be equal to 6 now" {
|
||||
assert {[$client get foo] eq "6"}
|
||||
}
|
||||
|
@ -118,7 +118,7 @@ start_server {tags {"repl"}} {
|
||||
# correctly the RDB file: such file will contain "lua" AUX
|
||||
# sections with scripts already in the memory of the master.
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
wait_for_condition 500 100 {
|
||||
[s -1 master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
|
@ -94,6 +94,10 @@ start_server {tags {"introspection"}} {
|
||||
slaveof
|
||||
bind
|
||||
requirepass
|
||||
server_cpulist
|
||||
bio_cpulist
|
||||
aof_rewrite_cpulist
|
||||
bgsave_cpulist
|
||||
}
|
||||
|
||||
set configs {}
|
||||
|
Loading…
x
Reference in New Issue
Block a user