Merge branch 'unstable' into RELEASE_6

Former-commit-id: f7c2006bf69d6c22f6998327ce8e9746b3b9023c
This commit is contained in:
John Sully 2020-07-13 01:04:49 +00:00
commit 7f36b5541e
30 changed files with 383 additions and 132 deletions

View File

@ -18,5 +18,5 @@ jobs:
- name: test-multithread (5X) - name: test-multithread (5X)
run: | run: |
sudo apt-get install -y tcl8.5 sudo apt-get install -y tcl8.5
./runtest --loopn 5 --config server-threads 3 --clients 5 ./runtest --loopn 5 --config server-threads 3 --clients 5 --endurance

19
.gitignore vendored
View File

@ -4,8 +4,24 @@ core
*.xo *.xo
*.so *.so
*.d *.d
!**/bash_completion.d
!**/logrotate.d
!**/keydb.service.d
!**/keydb-sentinel.service.d
*.log *.log
dump.rdb dump.rdb
src/keydb-server
**/bin/keydb-server
**/app/keydb-server
*.deb
*.rpm
keydb-pro-server
src/keydb-cli
**/bin/keydb-cli
**/app/keydb-cli
src/keydb-sentinel
**/bin/keydb-sentinel
**/app/keydb-sentinel
redis-benchmark redis-benchmark
keydb-benchmark keydb-benchmark
redis-check-aof redis-check-aof
@ -15,11 +31,8 @@ keydb-check-rdb
redis-check-dump redis-check-dump
keydb-check-dump keydb-check-dump
redis-cli redis-cli
keydb-cli
redis-sentinel redis-sentinel
keydb-sentinel
redis-server redis-server
keydb-server
doc-tools doc-tools
release release
misc/* misc/*

View File

@ -11,6 +11,76 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
SECURITY: There are security fixes in the release. SECURITY: There are security fixes in the release.
-------------------------------------------------------------------------------- --------------------------------------------------------------------------------
================================================================================
Redis 6.0.5 Released Tue Jun 09 11:56:08 CEST 2020
================================================================================
Upgrade urgency MODERATE: several bugs with moderate impact are fixed here.
The most important issues are listed here:
* Fix handling of speical chars in ACL LOAD.
* Make Redis Cluster more robust about operation errors that may lead
to two clusters to mix together.
* Revert the sendfile() implementation of RDB transfer. It causes some delay.
* Fix TLS certificate loading for chained certificates.
* Fix AOF rewirting of KEEPTTL SET option.
* Fix MULTI/EXEC behavior during -BUSY script errors.
And this is the full list of commits:
antirez in commit ee8dd01bb:
Temporary fix for #7353 issue about EVAL during -BUSY.
1 file changed, 9 insertions(+)
xhe in commit a4a856d53:
return the correct proto version HELLO should return the current proto version, while the code hardcoded 3
1 file changed, 1 insertion(+), 1 deletion(-)
Oran Agra in commit e2046b300:
Don't queue commands in an already aborted MULTI state
1 file changed, 7 insertions(+)
Oran Agra in commit b35fdf1de:
Avoid rejecting WATCH / UNWATCH, like MULTI/EXEC/DISCARD
1 file changed, 4 insertions(+), 2 deletions(-)
zhaozhao.zz in commit 1d7bf208c:
AOF: append origin SET if no expire option
2 files changed, 23 insertions(+), 8 deletions(-)
Oran Agra in commit 676445ad9:
fix disconnectSlaves, to try to free each slave.
1 file changed, 1 deletion(-)
zhaozhao.zz in commit 4846c0c8a:
donot free protected client in freeClientsInAsyncFreeQueue
1 file changed, 9 insertions(+), 3 deletions(-)
Oran Agra in commit f33de403e:
fix pingoff test race
1 file changed, 1 insertion(+)
Kevin Fwu in commit 49af4d07e:
Fix TLS certificate loading for chained certificates.
1 file changed, 1 insertion(+), 1 deletion(-)
antirez in commit 329fddbda:
Revert "Implements sendfile for redis."
2 files changed, 2 insertions(+), 55 deletions(-)
antirez in commit 925a2cd5a:
Revert "avoid using sendfile if tls-replication is enabled"
1 file changed, 27 insertions(+), 34 deletions(-)
Liu Zhen in commit 84a7a9058:
fix clusters mixing accidentally by gossip
1 file changed, 10 insertions(+), 2 deletions(-)
antirez in commit cd63359a1:
Fix handling of special chars in ACL LOAD.
1 file changed, 8 insertions(+), 4 deletions(-)
================================================================================ ================================================================================
Redis 6.0.4 Released Thu May 28 11:36:45 CEST 2020 Redis 6.0.4 Released Thu May 28 11:36:45 CEST 2020
================================================================================ ================================================================================

View File

@ -1,10 +0,0 @@
FROM ubuntu:18.04
RUN apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -qqy \
build-essential nasm autotools-dev autoconf libcurl4-openssl-dev libjemalloc-dev tcl tcl-dev uuid-dev libcurl4-openssl-dev \
&& apt-get clean
CMD make

View File

@ -1811,6 +1811,15 @@ jemalloc-bg-thread yes
# Set bgsave child process to cpu affinity 1,10,11 # Set bgsave child process to cpu affinity 1,10,11
# bgsave_cpulist 1,10-11 # bgsave_cpulist 1,10-11
# The minimum number of clients on a thread before KeyDB assigns new connections to a different thread
# Tuning this parameter is a tradeoff between locking overhead and distributing the workload over multiple cores
# min-clients-per-thread 50
# Avoid forwarding RREPLAY messages to other masters?
# WARNING: This setting is dangerous! You must be certain all masters are connected to each
# other in a true mesh topology or data loss will occur!
# This command can be used to reduce multimaster bus traffic
# multi-master-no-forward no
# Path to directory for file backed scratchpad. The file backed scratchpad # Path to directory for file backed scratchpad. The file backed scratchpad
# reduces memory requirements by storing rarely accessed data on disk # reduces memory requirements by storing rarely accessed data on disk

View File

@ -402,13 +402,13 @@ bench: $(REDIS_BENCHMARK_NAME)
$(MAKE) CXXFLAGS="-m32" CFLAGS="-m32" LDFLAGS="-m32" $(MAKE) CXXFLAGS="-m32" CFLAGS="-m32" LDFLAGS="-m32"
gcov: gcov:
$(MAKE) REDIS_CFLAGS="-fprofile-arcs -ftest-coverage -DCOVERAGE_TEST" REDIS_LDFLAGS="-fprofile-arcs -ftest-coverage" $(MAKE) REDIS_CXXFLAGS="-fprofile-arcs -ftest-coverage -DCOVERAGE_TEST" REDIS_CFLAGS="-fprofile-arcs -ftest-coverage -DCOVERAGE_TEST" REDIS_LDFLAGS="-fprofile-arcs -ftest-coverage"
noopt: noopt:
$(MAKE) OPTIMIZATION="-O0" $(MAKE) OPTIMIZATION="-O0"
valgrind: valgrind:
$(MAKE) OPTIMIZATION="-O0" MALLOC="libc" $(MAKE) OPTIMIZATION="-O0" USEASM="false" MALLOC="libc" CFLAGS="-DSANITIZE" CXXFLAGS="-DSANITIZE"
helgrind: helgrind:
$(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS" $(MAKE) OPTIMIZATION="-O0" MALLOC="libc" CFLAGS="-D__ATOMIC_VAR_FORCE_SYNC_MACROS"

View File

@ -739,6 +739,7 @@ void ACLAddAllowedSubcommand(user *u, unsigned long id, const char *sub) {
*/ */
int ACLSetUser(user *u, const char *op, ssize_t oplen) { int ACLSetUser(user *u, const char *op, ssize_t oplen) {
if (oplen == -1) oplen = strlen(op); if (oplen == -1) oplen = strlen(op);
if (oplen == 0) return C_OK; /* Empty string is a no-operation. */
if (!strcasecmp(op,"on")) { if (!strcasecmp(op,"on")) {
u->flags |= USER_FLAG_ENABLED; u->flags |= USER_FLAG_ENABLED;
u->flags &= ~USER_FLAG_DISABLED; u->flags &= ~USER_FLAG_DISABLED;
@ -1300,7 +1301,7 @@ sds ACLLoadFromFile(const char *filename) {
if (lines[i][0] == '\0') continue; if (lines[i][0] == '\0') continue;
/* Split into arguments */ /* Split into arguments */
argv = sdssplitargs(lines[i],&argc); argv = sdssplitlen(lines[i],sdslen(lines[i])," ",1,&argc);
if (argv == NULL) { if (argv == NULL) {
errors = sdscatprintf(errors, errors = sdscatprintf(errors,
"%s:%d: unbalanced quotes in acl line. ", "%s:%d: unbalanced quotes in acl line. ",
@ -1332,11 +1333,14 @@ sds ACLLoadFromFile(const char *filename) {
continue; continue;
} }
/* Try to process the line using the fake user to validate iif /* Try to process the line using the fake user to validate if
* the rules are able to apply cleanly. */ * the rules are able to apply cleanly. At this stage we also
* trim trailing spaces, so that we don't have to handle that
* in ACLSetUser(). */
ACLSetUser(fakeuser,"reset",-1); ACLSetUser(fakeuser,"reset",-1);
int j; int j;
for (j = 2; j < argc; j++) { for (j = 2; j < argc; j++) {
argv[j] = sdstrim(argv[j],"\t\r\n");
if (ACLSetUser(fakeuser,argv[j],sdslen(argv[j])) != C_OK) { if (ACLSetUser(fakeuser,argv[j],sdslen(argv[j])) != C_OK) {
const char *errmsg = ACLSetUserStringError(); const char *errmsg = ACLSetUserStringError();
errors = sdscatprintf(errors, errors = sdscatprintf(errors,

View File

@ -234,7 +234,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
int ret = AE_OK; int ret = AE_OK;
aeCommand cmd = {}; aeCommand cmd;
cmd.op = AE_ASYNC_OP::CreateFileEvent; cmd.op = AE_ASYNC_OP::CreateFileEvent;
cmd.fd = fd; cmd.fd = fd;
cmd.mask = mask; cmd.mask = mask;
@ -258,7 +258,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
if (fSynchronous) if (fSynchronous)
{ {
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock); std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock); cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval; ret = cmd.pctl->rval;
delete cmd.pctl; delete cmd.pctl;
@ -311,7 +311,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
int ret = AE_OK; int ret = AE_OK;
if (fSynchronous) if (fSynchronous)
{ {
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock); std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock); cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval; ret = cmd.pctl->rval;
delete cmd.pctl; delete cmd.pctl;
@ -453,7 +453,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask)
{ {
if (eventLoop == g_eventLoopThisThread) if (eventLoop == g_eventLoopThisThread)
return aeDeleteFileEvent(eventLoop, fd, mask); return aeDeleteFileEvent(eventLoop, fd, mask);
aeCommand cmd; aeCommand cmd = {};
cmd.op = AE_ASYNC_OP::DeleteFileEvent; cmd.op = AE_ASYNC_OP::DeleteFileEvent;
cmd.fd = fd; cmd.fd = fd;
cmd.mask = mask; cmd.mask = mask;

View File

@ -671,19 +671,23 @@ sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj
} else if (cmd->proc == setCommand && argc > 3) { } else if (cmd->proc == setCommand && argc > 3) {
int i; int i;
robj *exarg = NULL, *pxarg = NULL; robj *exarg = NULL, *pxarg = NULL;
/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
buf = catAppendOnlyGenericCommand(buf,3,argv);
for (i = 3; i < argc; i ++) { for (i = 3; i < argc; i ++) {
if (!strcasecmp(szFromObj(argv[i]), "ex")) exarg = argv[i+1]; if (!strcasecmp(szFromObj(argv[i]), "ex")) exarg = argv[i+1];
if (!strcasecmp(szFromObj(argv[i]), "px")) pxarg = argv[i+1]; if (!strcasecmp(szFromObj(argv[i]), "px")) pxarg = argv[i+1];
} }
serverAssert(!(exarg && pxarg)); serverAssert(!(exarg && pxarg));
if (exarg) if (exarg || pxarg) {
buf = catAppendOnlyExpireAtCommand(buf,cserver.expireCommand,argv[1], /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
exarg); buf = catAppendOnlyGenericCommand(buf,3,argv);
if (pxarg) if (exarg)
buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1], buf = catAppendOnlyExpireAtCommand(buf,cserver.expireCommand,argv[1],
pxarg); exarg);
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1],
pxarg);
} else {
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
} else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand || } else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand ||
cmd->proc == pexpireMemberAtCommand) { cmd->proc == pexpireMemberAtCommand) {
/* Translate subkey expire commands to PEXPIREMEMBERAT */ /* Translate subkey expire commands to PEXPIREMEMBERAT */

View File

@ -1501,7 +1501,10 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
} }
} else { } else {
/* If it's not in NOADDR state and we don't have it, we /* If it's not in NOADDR state and we don't have it, we
* start a handshake process against this IP/PORT pairs. * add it to our trusted dict with exact nodeid and flag.
* Note that we cannot simply start a handshake against
* this IP/PORT pairs, since IP/PORT can be reused already,
* otherwise we risk joining another cluster.
* *
* Note that we require that the sender of this gossip message * Note that we require that the sender of this gossip message
* is a well known node in our cluster, otherwise we risk * is a well known node in our cluster, otherwise we risk
@ -1510,7 +1513,12 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
!(flags & CLUSTER_NODE_NOADDR) && !(flags & CLUSTER_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename)) !clusterBlacklistExists(g->nodename))
{ {
clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport)); clusterNode *node;
node = createClusterNode(g->nodename, flags);
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port);
node->cport = ntohs(g->cport);
clusterAddNode(node);
} }
} }

View File

@ -518,7 +518,7 @@ void loadServerConfigFromString(char *config) {
err = "KeyDB not compliled with scratch-file support."; err = "KeyDB not compliled with scratch-file support.";
goto loaderr; goto loaderr;
#endif #endif
} else if (!strcasecmp(argv[0],"server-threads") && argc == 2) { } else if ((!strcasecmp(argv[0],"server-threads") || !strcasecmp(argv[0],"io-threads")) && argc == 2) {
cserver.cthreads = atoi(argv[1]); cserver.cthreads = atoi(argv[1]);
if (cserver.cthreads <= 0 || cserver.cthreads > MAX_EVENT_LOOPS) { if (cserver.cthreads <= 0 || cserver.cthreads > MAX_EVENT_LOOPS) {
err = "Invalid number of threads specified"; err = "Invalid number of threads specified";
@ -2167,6 +2167,13 @@ static int updateMaxclients(long long val, long long prev, const char **err) {
return 1; return 1;
} }
static int validateMultiMasterNoForward(int val, const char **) {
if (val) {
serverLog(LL_WARNING, "WARNING: multi-master-no-forward is set, you *must* use a mesh topology or dataloss will occur");
}
return 1;
}
#ifdef USE_OPENSSL #ifdef USE_OPENSSL
static int updateTlsCfg(char *val, char *prev, const char **err) { static int updateTlsCfg(char *val, char *prev, const char **err) {
UNUSED(val); UNUSED(val);
@ -2185,6 +2192,7 @@ static int updateTlsCfgBool(int val, int prev, const char **err) {
} }
#endif /* USE_OPENSSL */ #endif /* USE_OPENSSL */
int fDummy = false;
standardConfig configs[] = { standardConfig configs[] = {
/* Bool configs */ /* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, g_pserver->rdb_checksum, 1, NULL, NULL), createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, g_pserver->rdb_checksum, 1, NULL, NULL),
@ -2200,6 +2208,7 @@ standardConfig configs[] = {
createBoolConfig("lazyfree-lazy-eviction", NULL, MODIFIABLE_CONFIG, g_pserver->lazyfree_lazy_eviction, 0, NULL, NULL), createBoolConfig("lazyfree-lazy-eviction", NULL, MODIFIABLE_CONFIG, g_pserver->lazyfree_lazy_eviction, 0, NULL, NULL),
createBoolConfig("lazyfree-lazy-expire", NULL, MODIFIABLE_CONFIG, g_pserver->lazyfree_lazy_expire, 0, NULL, NULL), createBoolConfig("lazyfree-lazy-expire", NULL, MODIFIABLE_CONFIG, g_pserver->lazyfree_lazy_expire, 0, NULL, NULL),
createBoolConfig("lazyfree-lazy-server-del", NULL, MODIFIABLE_CONFIG, g_pserver->lazyfree_lazy_server_del, 0, NULL, NULL), createBoolConfig("lazyfree-lazy-server-del", NULL, MODIFIABLE_CONFIG, g_pserver->lazyfree_lazy_server_del, 0, NULL, NULL),
createBoolConfig("lazyfree-lazy-user-del", NULL, MODIFIABLE_CONFIG, g_pserver->lazyfree_lazy_user_del , 0, NULL, NULL),
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, g_pserver->repl_disable_tcp_nodelay, 0, NULL, NULL), createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, g_pserver->repl_disable_tcp_nodelay, 0, NULL, NULL),
createBoolConfig("repl-diskless-sync", NULL, MODIFIABLE_CONFIG, g_pserver->repl_diskless_sync, 0, NULL, NULL), createBoolConfig("repl-diskless-sync", NULL, MODIFIABLE_CONFIG, g_pserver->repl_diskless_sync, 0, NULL, NULL),
createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, g_pserver->aof_rewrite_incremental_fsync, 1, NULL, NULL), createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, g_pserver->aof_rewrite_incremental_fsync, 1, NULL, NULL),
@ -2220,6 +2229,7 @@ standardConfig configs[] = {
createBoolConfig("cluster-enabled", NULL, IMMUTABLE_CONFIG, g_pserver->cluster_enabled, 0, NULL, NULL), createBoolConfig("cluster-enabled", NULL, IMMUTABLE_CONFIG, g_pserver->cluster_enabled, 0, NULL, NULL),
createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly), createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly),
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL),
createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL),
/* String Configs */ /* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL),

View File

@ -164,7 +164,7 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len)
int ret = write(conn->fd, data, data_len); int ret = write(conn->fd, data, data_len);
if (ret < 0 && errno != EAGAIN) { if (ret < 0 && errno != EAGAIN) {
conn->last_errno = errno; conn->last_errno = errno;
conn->state = CONN_STATE_ERROR; conn->state.store(CONN_STATE_ERROR, std::memory_order_relaxed);
} }
return ret; return ret;
@ -173,10 +173,10 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len)
static int connSocketRead(connection *conn, void *buf, size_t buf_len) { static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
int ret = read(conn->fd, buf, buf_len); int ret = read(conn->fd, buf, buf_len);
if (!ret) { if (!ret) {
conn->state = CONN_STATE_CLOSED; conn->state.store(CONN_STATE_CLOSED, std::memory_order_release);
} else if (ret < 0 && errno != EAGAIN) { } else if (ret < 0 && errno != EAGAIN) {
conn->last_errno = errno; conn->last_errno = errno;
conn->state = CONN_STATE_ERROR; conn->state.store(CONN_STATE_ERROR, std::memory_order_release);
} }
return ret; return ret;
@ -253,14 +253,14 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
UNUSED(fd); UNUSED(fd);
connection *conn = (connection*)clientData; connection *conn = (connection*)clientData;
if (conn->state == CONN_STATE_CONNECTING && if (conn->state.load(std::memory_order_relaxed) == CONN_STATE_CONNECTING &&
(mask & AE_WRITABLE) && conn->conn_handler) { (mask & AE_WRITABLE) && conn->conn_handler) {
if (connGetSocketError(conn)) { if (connGetSocketError(conn)) {
conn->last_errno = errno; conn->last_errno = errno;
conn->state = CONN_STATE_ERROR; conn->state.store(CONN_STATE_ERROR, std::memory_order_release);
} else { } else {
conn->state = CONN_STATE_CONNECTED; conn->state.store(CONN_STATE_CONNECTED, std::memory_order_release);
} }
if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE); if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE);
@ -407,7 +407,7 @@ int connRecvTimeout(connection *conn, long long ms) {
} }
int connGetState(connection *conn) { int connGetState(connection *conn) {
return conn->state; return conn->state.load(std::memory_order_relaxed);
} }
void connSetThreadAffinity(connection *conn, int cpu) { void connSetThreadAffinity(connection *conn, int cpu) {

View File

@ -683,6 +683,13 @@ void existsCommand(client *c) {
addReplyLongLong(c,count); addReplyLongLong(c,count);
} }
void mexistsCommand(client *c) {
addReplyArrayLen(c, c->argc - 1);
for (int j = 1; j < c->argc; ++j) {
addReplyBool(c, lookupKeyRead(c->db, c->argv[j]));
}
}
void selectCommand(client *c) { void selectCommand(client *c) {
long id; long id;
@ -1173,6 +1180,7 @@ void moveCommand(client *c) {
/* Return zero if the key already exists in the target DB */ /* Return zero if the key already exists in the target DB */
if (lookupKeyWrite(dst,c->argv[1]) != NULL) { if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
addReply(c,shared.czero); addReply(c,shared.czero);
decrRefCount(o);
return; return;
} }
dbAdd(dst,c->argv[1],o); dbAdd(dst,c->argv[1],o);
@ -1218,8 +1226,7 @@ int dbSwapDatabases(long id1, long id2) {
if (id1 < 0 || id1 >= cserver.dbnum || if (id1 < 0 || id1 >= cserver.dbnum ||
id2 < 0 || id2 >= cserver.dbnum) return C_ERR; id2 < 0 || id2 >= cserver.dbnum) return C_ERR;
if (id1 == id2) return C_OK; if (id1 == id2) return C_OK;
redisDb aux; redisDb aux(g_pserver->db[id1]);
memcpy(&aux, &g_pserver->db[id1], sizeof(redisDb));
redisDb *db1 = &g_pserver->db[id1], *db2 = &g_pserver->db[id2]; redisDb *db1 = &g_pserver->db[id1], *db2 = &g_pserver->db[id2];
/* Swap hash tables. Note that we don't swap blocking_keys, /* Swap hash tables. Note that we don't swap blocking_keys,

View File

@ -1325,7 +1325,12 @@ struct commandHelp {
"Rename a hash key, copying the value.", "Rename a hash key, copying the value.",
4, 4,
"6.5.3" "6.5.3"
} },
{ "KEYDB.MEXISTS",
"key [key ...]",
"Determine if a key exists",
0,
"6.5.12" },
}; };
#endif #endif

View File

@ -4895,14 +4895,17 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
zfree(ctx); zfree(ctx);
} }
static bool g_fModuleThread = false;
/* Acquire the server lock before executing a thread safe API call. /* Acquire the server lock before executing a thread safe API call.
* This is not needed for `RedisModule_Reply*` calls when there is * This is not needed for `RedisModule_Reply*` calls when there is
* a blocked client connected to the thread safe context. */ * a blocked client connected to the thread safe context. */
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
UNUSED(ctx); UNUSED(ctx);
moduleAcquireGIL(FALSE /*fServerThread*/); if (serverTL == nullptr) {
if (serverTL == nullptr)
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context
g_fModuleThread = true;
}
moduleAcquireGIL(FALSE /*fServerThread*/);
} }
/* Release the server lock after a thread safe API call was executed. */ /* Release the server lock after a thread safe API call was executed. */
@ -4911,10 +4914,20 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
moduleReleaseGIL(FALSE /*fServerThread*/); moduleReleaseGIL(FALSE /*fServerThread*/);
} }
// A module may be triggered synchronously in a non-module context. In this scenario we don't lock again
// as the server thread acquisition is sufficient. If we did try to lock we would deadlock
static bool FModuleCallBackLock(bool fServerThread)
{
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
}
void moduleAcquireGIL(int fServerThread) { void moduleAcquireGIL(int fServerThread) {
std::unique_lock<std::mutex> lock(s_mutex); std::unique_lock<std::mutex> lock(s_mutex);
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer; int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
if (FModuleCallBackLock(fServerThread)) {
return;
}
while (*pcheck > 0) while (*pcheck > 0)
s_cv.wait(lock); s_cv.wait(lock);
@ -4932,6 +4945,10 @@ void moduleAcquireGIL(int fServerThread) {
void moduleReleaseGIL(int fServerThread) { void moduleReleaseGIL(int fServerThread) {
std::unique_lock<std::mutex> lock(s_mutex); std::unique_lock<std::mutex> lock(s_mutex);
if (FModuleCallBackLock(fServerThread)) {
return;
}
if (fServerThread) if (fServerThread)
{ {
--s_cAcquisitionsServer; --s_cAcquisitionsServer;

View File

@ -59,6 +59,13 @@ void queueMultiCommand(client *c) {
multiCmd *mc; multiCmd *mc;
int j; int j;
/* No sense to waste memory if the transaction is already aborted.
* this is useful in case client sends these in a pipeline, or doesn't
* bother to read previous responses and didn't notice the multi was already
* aborted. */
if (c->flags & CLIENT_DIRTY_EXEC)
return;
c->mstate.commands = (multiCmd*)zrealloc(c->mstate.commands, c->mstate.commands = (multiCmd*)zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1), MALLOC_LOCAL); sizeof(multiCmd)*(c->mstate.count+1), MALLOC_LOCAL);
mc = c->mstate.commands+c->mstate.count; mc = c->mstate.commands+c->mstate.count;
@ -131,6 +138,15 @@ void execCommand(client *c) {
return; return;
} }
/* If we are in -BUSY state, flag the transaction and return the
* -BUSY error, like Redis <= 5. This is a temporary fix, may be changed
* ASAP, see issue #7353 on Github. */
if (g_pserver->lua_timedout) {
flagTransaction(c);
addReply(c, shared.slowscripterr);
return;
}
/* Check if we need to abort the EXEC because: /* Check if we need to abort the EXEC because:
* 1) Some WATCHed key was touched. * 1) Some WATCHed key was touched.
* 2) There was a previous error while queueing commands. * 2) There was a previous error while queueing commands.

View File

@ -1645,7 +1645,7 @@ int freeClientsInAsyncFreeQueue(int iel) {
while((ln = listNext(&li))) while((ln = listNext(&li)))
{ {
client *c = (client*)listNodeValue(ln); client *c = (client*)listNodeValue(ln);
if (c->iel == iel) if (c->iel == iel && !(c->flags & CLIENT_PROTECTED))
{ {
vecclientsFree.push_back(c); vecclientsFree.push_back(c);
listDelNode(g_pserver->clients_to_close, ln); listDelNode(g_pserver->clients_to_close, ln);
@ -1928,9 +1928,12 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
} }
} }
AeLocker locker; if (listLength(serverTL->clients_pending_asyncwrite))
locker.arm(nullptr); {
ProcessPendingAsyncWrites(); AeLocker locker;
locker.arm(nullptr);
ProcessPendingAsyncWrites();
}
return processed; return processed;
} }
@ -3057,7 +3060,7 @@ void helloCommand(client *c) {
addReplyBulkCString(c,KEYDB_SET_VERSION); addReplyBulkCString(c,KEYDB_SET_VERSION);
addReplyBulkCString(c,"proto"); addReplyBulkCString(c,"proto");
addReplyLongLong(c,3); addReplyLongLong(c,ver);
addReplyBulkCString(c,"id"); addReplyBulkCString(c,"id");
addReplyLongLong(c,c->id); addReplyLongLong(c,c->id);

View File

@ -2412,9 +2412,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) { if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database. // We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us // We'll need to inform the sending master of the delete if it is also a replica of us
robj *objKeyDup = createStringObject(key, sdslen(key)); robj_sharedptr objKeyDup(createStringObject(key, sdslen(key)));
rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(objKeyDup); rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(objKeyDup);
decrRefCount(objKeyDup);
} }
fLastKeyExpired = true; fLastKeyExpired = true;
sdsfree(key); sdsfree(key);

View File

@ -1370,30 +1370,28 @@ void sendBulkToSlave(connection *conn) {
* try to use sendfile system call if supported, unless tls is enabled. * try to use sendfile system call if supported, unless tls is enabled.
* fallback to normal read+write otherwise. */ * fallback to normal read+write otherwise. */
nwritten = 0; nwritten = 0;
if (!nwritten) { ssize_t buflen;
ssize_t buflen; char buf[PROTO_IOBUF_LEN];
char buf[PROTO_IOBUF_LEN];
lseek(replica->repldbfd,replica->repldboff,SEEK_SET); lseek(replica->repldbfd,replica->repldboff,SEEK_SET);
buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN); buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) { if (buflen <= 0) {
serverLog(LL_WARNING,"Read error sending DB to replica: %s", serverLog(LL_WARNING,"Read error sending DB to replica: %s",
(buflen == 0) ? "premature EOF" : strerror(errno)); (buflen == 0) ? "premature EOF" : strerror(errno));
ul.unlock();
aeLock.arm(nullptr);
freeClient(replica);
return;
}
if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
connGetLastError(conn));
ul.unlock(); ul.unlock();
aeLock.arm(nullptr); aeLock.arm(nullptr);
freeClient(replica); freeClient(replica);
return;
}
if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
connGetLastError(conn));
ul.unlock();
aeLock.arm(nullptr);
freeClient(replica);
}
return;
} }
return;
} }
replica->repldboff += nwritten; replica->repldboff += nwritten;
@ -2069,7 +2067,6 @@ void readSyncBulkPayload(connection *conn) {
* *
* 2. Or when we are done reading from the socket to the RDB file, in * 2. Or when we are done reading from the socket to the RDB file, in
* such case we want just to read the RDB file in memory. */ * such case we want just to read the RDB file in memory. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOF rewriting child before flusing and parsing /* We need to stop any AOF rewriting child before flusing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */ * the RDB, otherwise we'll create a copy-on-write disaster. */
@ -2089,7 +2086,10 @@ void readSyncBulkPayload(connection *conn) {
* (Where disklessLoadMakeBackups left server.db empty) because we * (Where disklessLoadMakeBackups left server.db empty) because we
* want to execute all the auxiliary logic of emptyDb (Namely, * want to execute all the auxiliary logic of emptyDb (Namely,
* fire module events) */ * fire module events) */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); if (!fUpdate) {
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Before loading the DB into memory we need to delete the readable /* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since * handler, otherwise it will get called recursively since
@ -2561,7 +2561,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
* establish a connection with the master. */ * establish a connection with the master. */
void syncWithMaster(connection *conn) { void syncWithMaster(connection *conn) {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
char tmpfile[256], *err = NULL; char tmpfile[256] = {0}, *err = NULL;
int dfd = -1, maxtries = 5; int dfd = -1, maxtries = 5;
int psync_result; int psync_result;
@ -3151,15 +3151,46 @@ void replicaofCommand(client *c) {
return; return;
} }
/* The special host/port combination "NO" "ONE" turns the instance if (c->argc > 3) {
* into a master. Otherwise the new master address is set. */ if (c->argc != 4) {
if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") && addReplyError(c, "Invalid arguments");
return;
}
if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"remove")) {
listIter li;
listNode *ln;
bool fRemoved = false;
long port;
string2l(szFromObj(c->argv[3]), sdslen(szFromObj(c->argv[3])), &port);
LRestart:
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li))) {
redisMaster *mi = (redisMaster*)listNodeValue(ln);
if (mi->masterport != port)
continue;
if (sdscmp(szFromObj(c->argv[2]), mi->masterhost) == 0) {
replicationUnsetMaster(mi);
fRemoved = true;
goto LRestart;
}
}
if (!fRemoved) {
addReplyError(c, "Master not found");
return;
} else if (listLength(g_pserver->masters) == 0) {
goto LLogNoMaster;
}
}
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") &&
!strcasecmp((const char*)ptrFromObj(c->argv[2]),"one")) { !strcasecmp((const char*)ptrFromObj(c->argv[2]),"one")) {
/* The special host/port combination "NO" "ONE" turns the instance
* into a master. Otherwise the new master address is set. */
if (listLength(g_pserver->masters)) { if (listLength(g_pserver->masters)) {
while (listLength(g_pserver->masters)) while (listLength(g_pserver->masters))
{ {
replicationUnsetMaster((redisMaster*)listNodeValue(listFirst(g_pserver->masters))); replicationUnsetMaster((redisMaster*)listNodeValue(listFirst(g_pserver->masters)));
} }
LLogNoMaster:
sds client = catClientInfoString(sdsempty(),c); sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
client); client);
@ -4218,7 +4249,7 @@ void replicaReplayCommand(client *c)
serverTL->current_client = current_clientSave; serverTL->current_client = current_clientSave;
// call() will not propogate this for us, so we do so here // call() will not propogate this for us, so we do so here
if (!s_pstate->FCancelled() && s_pstate->FFirst()) if (!s_pstate->FCancelled() && s_pstate->FFirst() && !cserver.multimaster_no_forward)
alsoPropagate(cserver.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL); alsoPropagate(cserver.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL);
s_pstate->Pop(); s_pstate->Pop();

View File

@ -247,6 +247,10 @@ struct redisCommand redisCommandTable[] = {
"read-only fast @keyspace", "read-only fast @keyspace",
0,NULL,1,-1,1,0,0,0}, 0,NULL,1,-1,1,0,0,0},
{"keydb.mexists",mexistsCommand,-2,
"read-only fast @keyspace",
0,NULL,1,-1,1,0,0,0},
{"setbit",setbitCommand,4, {"setbit",setbitCommand,4,
"write use-memory @bitmap", "write use-memory @bitmap",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
@ -768,7 +772,7 @@ struct redisCommand redisCommandTable[] = {
"admin no-script ok-stale", "admin no-script ok-stale",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"replicaof",replicaofCommand,3, {"replicaof",replicaofCommand,-3,
"admin no-script ok-stale", "admin no-script ok-stale",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
@ -809,11 +813,11 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"watch",watchCommand,-2, {"watch",watchCommand,-2,
"no-script fast @transaction", "no-script fast ok-loading ok-stale @transaction",
0,NULL,1,-1,1,0,0,0}, 0,NULL,1,-1,1,0,0,0},
{"unwatch",unwatchCommand,1, {"unwatch",unwatchCommand,1,
"no-script fast @transaction", "no-script fast ok-loading ok-stale @transaction",
0,NULL,0,0,0,0,0,0}, 0,NULL,0,0,0,0,0,0},
{"cluster",clusterCommand,-2, {"cluster",clusterCommand,-2,
@ -3866,6 +3870,8 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
c->cmd->proc != multiCommand && c->cmd->proc != multiCommand &&
c->cmd->proc != execCommand && c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
!(c->cmd->proc == shutdownCommand && !(c->cmd->proc == shutdownCommand &&
c->argc == 2 && c->argc == 2 &&
tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') && tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') &&

View File

@ -166,22 +166,24 @@ class robj_sharedptr
public: public:
robj_sharedptr() robj_sharedptr()
: m_ptr(nullptr) : m_ptr(nullptr)
{} {}
robj_sharedptr(redisObject *ptr) explicit robj_sharedptr(redisObject *ptr)
: m_ptr(ptr) : m_ptr(ptr)
{ {
if(m_ptr)
incrRefCount(ptr); incrRefCount(ptr);
} }
~robj_sharedptr() ~robj_sharedptr()
{ {
if (m_ptr) if (m_ptr)
decrRefCount(m_ptr); decrRefCount(m_ptr);
} }
robj_sharedptr(const robj_sharedptr& other) robj_sharedptr(const robj_sharedptr& other)
: m_ptr(other.m_ptr)
{ {
m_ptr = other.m_ptr; if(m_ptr)
incrRefCount(m_ptr); incrRefCount(m_ptr);
} }
robj_sharedptr(robj_sharedptr&& other) robj_sharedptr(robj_sharedptr&& other)
@ -192,41 +194,19 @@ public:
robj_sharedptr &operator=(const robj_sharedptr& other) robj_sharedptr &operator=(const robj_sharedptr& other)
{ {
if (m_ptr) robj_sharedptr tmp(other);
decrRefCount(m_ptr); using std::swap;
m_ptr = other.m_ptr; swap(m_ptr, tmp.m_ptr);
incrRefCount(m_ptr);
return *this; return *this;
} }
robj_sharedptr &operator=(redisObject *ptr) robj_sharedptr &operator=(redisObject *ptr)
{ {
if (m_ptr) robj_sharedptr tmp(ptr);
decrRefCount(m_ptr); using std::swap;
m_ptr = ptr; swap(m_ptr, tmp.m_ptr);
incrRefCount(m_ptr);
return *this; return *this;
} }
bool operator==(const robj_sharedptr &other) const
{
return m_ptr == other.m_ptr;
}
bool operator==(const void *p) const
{
return m_ptr == p;
}
bool operator!=(const robj_sharedptr &other) const
{
return m_ptr != other.m_ptr;
}
bool operator!=(const void *p) const
{
return m_ptr != p;
}
redisObject* operator->() const redisObject* operator->() const
{ {
return m_ptr; return m_ptr;
@ -237,7 +217,7 @@ public:
return !m_ptr; return !m_ptr;
} }
operator bool() const{ explicit operator bool() const{
return !!m_ptr; return !!m_ptr;
} }
@ -247,8 +227,39 @@ public:
} }
redisObject *get() { return m_ptr; } redisObject *get() { return m_ptr; }
const redisObject *get() const { return m_ptr; }
}; };
inline bool operator==(const robj_sharedptr &lhs, const robj_sharedptr &rhs)
{
return lhs.get() == rhs.get();
}
inline bool operator!=(const robj_sharedptr &lhs, const robj_sharedptr &rhs)
{
return !(lhs == rhs);
}
inline bool operator==(const robj_sharedptr &lhs, const void *p)
{
return lhs.get() == p;
}
inline bool operator==(const void *p, const robj_sharedptr &rhs)
{
return rhs == p;
}
inline bool operator!=(const robj_sharedptr &lhs, const void *p)
{
return !(lhs == p);
}
inline bool operator!=(const void *p, const robj_sharedptr &rhs)
{
return !(rhs == p);
}
/* Error codes */ /* Error codes */
#define C_OK 0 #define C_OK 0
#define C_ERR -1 #define C_ERR -1
@ -1637,6 +1648,7 @@ struct redisServerConst {
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */ unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
bool fUsePro = false; bool fUsePro = false;
int thread_min_client_threshold = 50; int thread_min_client_threshold = 50;
int multimaster_no_forward;
}; };
struct redisServer { struct redisServer {
@ -2810,6 +2822,7 @@ void getCommand(client *c);
void delCommand(client *c); void delCommand(client *c);
void unlinkCommand(client *c); void unlinkCommand(client *c);
void existsCommand(client *c); void existsCommand(client *c);
void mexistsCommand(client *c);
void setbitCommand(client *c); void setbitCommand(client *c);
void getbitCommand(client *c); void getbitCommand(client *c);
void bitfieldCommand(client *c); void bitfieldCommand(client *c);

View File

@ -226,7 +226,7 @@ int tlsConfigure(redisTLSContextConfig *ctx_config) {
SSL_CTX_set_ecdh_auto(ctx, 1); SSL_CTX_set_ecdh_auto(ctx, 1);
#endif #endif
if (SSL_CTX_use_certificate_file(ctx, ctx_config->cert_file, SSL_FILETYPE_PEM) <= 0) { if (SSL_CTX_use_certificate_chain_file(ctx, ctx_config->cert_file) <= 0) {
ERR_error_string_n(ERR_get_error(), errbuf, sizeof(errbuf)); ERR_error_string_n(ERR_get_error(), errbuf, sizeof(errbuf));
serverLog(LL_WARNING, "Failed to load certificate: %s: %s", ctx_config->cert_file, errbuf); serverLog(LL_WARNING, "Failed to load certificate: %s: %s", ctx_config->cert_file, errbuf);
goto error; goto error;

View File

@ -101,8 +101,8 @@ void disableTracking(client *c) {
/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is /* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
* already registered for the specified prefix, no operation is performed. */ * already registered for the specified prefix, no operation is performed. */
void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) { static void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) {
bcastState *bs = (bcastState*)raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix)); bcastState *bs = (bcastState*)raxFind(PrefixTable,(unsigned char*)prefix,plen);
/* If this is the first client subscribing to such prefix, create /* If this is the first client subscribing to such prefix, create
* the prefix in the table. */ * the prefix in the table. */
if (bs == raxNotFound) { if (bs == raxNotFound) {

View File

@ -64,6 +64,7 @@ start_server {} {
# make sure replication is still alive and kicking # make sure replication is still alive and kicking
$R(1) incr x $R(1) incr x
wait_for_condition 50 1000 { wait_for_condition 50 1000 {
[status $R(0) loading] == 0 &&
[$R(0) get x] == 1 [$R(0) get x] == 1
} else { } else {
fail "replica didn't get incr" fail "replica didn't get incr"

View File

@ -237,3 +237,18 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
} }
} }
} }
start_server {tags {"active-repl"} overrides {active-replica yes}} {
set slave [srv 0 client]
set slave_host [srv 0 host]
set slave_port [srv 0 port]
start_server {tags {"active-repl"} overrides { active-replica yes}} {
r set testkeyB bar
test {Active Replica Merges Database On Sync} {
$slave set testkeyA foo
r replicaof $slave_host $slave_port
after 1000
assert_equal 2 [r dbsize]
}
}
}

View File

@ -1,4 +1,6 @@
foreach topology {mesh ring} { foreach topology {mesh ring} {
foreach noforward [expr {[string equal $topology "mesh"] ? {no yes} : {no}}] {
start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} { start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} {
start_server {overrides {hz 500 active-replica yes multi-master yes}} { start_server {overrides {hz 500 active-replica yes multi-master yes}} {
start_server {overrides {hz 500 active-replica yes multi-master yes}} { start_server {overrides {hz 500 active-replica yes multi-master yes}} {
@ -8,8 +10,12 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
set R($j) [srv [expr 0-$j] client] set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host] set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port] set R_port($j) [srv [expr 0-$j] port]
$R($j) config set multi-master-no-forward $noforward
} }
set topology_name "$topology[expr {[string equal $noforward "yes"] ? " no-forward" : ""}]"
# Initialize as mesh # Initialize as mesh
if [string equal $topology "mesh"] { if [string equal $topology "mesh"] {
for {set j 0} {$j < 4} {incr j} { for {set j 0} {$j < 4} {incr j} {
@ -31,7 +37,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
$R(3) replicaof $R_host(2) $R_port(2) $R(3) replicaof $R_host(2) $R_port(2)
} }
test "$topology all nodes up" { test "$topology_name all nodes up" {
for {set j 0} {$j < 4} {incr j} { for {set j 0} {$j < 4} {incr j} {
wait_for_condition 50 100 { wait_for_condition 50 100 {
[string match {*master_global_link_status:up*} [$R($j) info replication]] [string match {*master_global_link_status:up*} [$R($j) info replication]]
@ -41,7 +47,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
} }
} }
test "$topology replicates to all nodes" { test "$topology_name replicates to all nodes" {
$R(0) set testkey foo $R(0) set testkey foo
after 500 after 500
for {set n 0} {$n < 4} {incr n} { for {set n 0} {$n < 4} {incr n} {
@ -53,7 +59,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
} }
} }
test "$topology replicates only once" { test "$topology_name replicates only once" {
$R(0) set testkey 1 $R(0) set testkey 1
after 500 after 500
#wait_for_condition 50 100 { #wait_for_condition 50 100 {
@ -74,7 +80,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
} }
} }
test "$topology transaction replicates only once" { test "$topology_name transaction replicates only once" {
for {set j 0} {$j < 1000} {incr j} { for {set j 0} {$j < 1000} {incr j} {
$R(0) set testkey 1 $R(0) set testkey 1
$R(0) multi $R(0) multi
@ -95,3 +101,4 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
} }
} }
} }
}

View File

@ -30,6 +30,7 @@
* POSSIBILITY OF SUCH DAMAGE. * POSSIBILITY OF SUCH DAMAGE.
*/ */
#define REDISMODULE_EXPERIMENTAL_API 1
#include "redismodule.h" #include "redismodule.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
@ -143,10 +144,12 @@ void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void
{ {
REDISMODULE_NOT_USED(e); REDISMODULE_NOT_USED(e);
RedisModule_ThreadSafeContextLock(ctx);
RedisModuleFlushInfo *fi = data; RedisModuleFlushInfo *fi = data;
char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ? char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ?
"flush-start" : "flush-end"; "flush-start" : "flush-end";
LogNumericEvent(ctx, keyname, fi->dbnum); LogNumericEvent(ctx, keyname, fi->dbnum);
RedisModule_ThreadSafeContextUnlock(ctx);
} }
void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)

View File

@ -104,6 +104,7 @@ set ::dont_clean 0
set ::wait_server 0 set ::wait_server 0
set ::stop_on_failure 0 set ::stop_on_failure 0
set ::loop 0 set ::loop 0
set ::endurance 0
set ::tlsdir "tests/tls" set ::tlsdir "tests/tls"
# Set to 1 when we are running in client mode. The Redis test uses a # Set to 1 when we are running in client mode. The Redis test uses a
@ -215,9 +216,13 @@ proc s {args} {
# test server, so that the test server will send them again to # test server, so that the test server will send them again to
# clients once the clients are idle. # clients once the clients are idle.
proc run_solo {name code} { proc run_solo {name code} {
if {$::numclients == 1 || $::loop < 0 || $::external} { if {$::numclients == 1 || $::loop < 0 || $::external || $::endurance} {
# run_solo is not supported in these scenarios, just run the code. # run_solo is not supported in these scenarios, just run the code.
eval $code if {$::endurance} {
puts "Skipping solo tests because endurance mode is enabled"
} else {
eval $code
}
return return
} }
send_data_packet $::test_server_fd run_solo [list $name $code] send_data_packet $::test_server_fd run_solo [list $name $code]
@ -620,6 +625,8 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--timeout}} { } elseif {$opt eq {--timeout}} {
set ::timeout $arg set ::timeout $arg
incr j incr j
} elseif {$opt eq {--endurance}} {
set ::endurance 1
} elseif {$opt eq {--help}} { } elseif {$opt eq {--help}} {
print_help_screen print_help_screen
exit 0 exit 0

View File

@ -313,4 +313,14 @@ start_server {tags {"expire"}} {
after 3000 after 3000
assert_equal [r dbsize] 0 assert_equal [r dbsize] 0
} }
test {SET - use KEEPTTL option, TTL should not be removed after loadaof} {
r config set appendonly yes
r set foo bar EX 100
r set foo bar2 KEEPTTL
after 2000
r debug loadaof
set ttl [r ttl foo]
assert {$ttl <= 98 && $ttl > 90}
}
} }

View File

@ -363,6 +363,9 @@ start_server {tags {"multi"}} {
set xx [r get xx] set xx [r get xx]
# make sure that either the whole transcation passed or none of it (we actually expect none) # make sure that either the whole transcation passed or none of it (we actually expect none)
assert { $xx == 1 || $xx == 3} assert { $xx == 1 || $xx == 3}
# Discard the transaction since EXEC likely got -BUSY error
# so the client is still in MULTI state.
catch { $rd2 discard ;$rd2 read } e
# check that the connection is no longer in multi state # check that the connection is no longer in multi state
$rd2 ping asdf $rd2 ping asdf
set pong [$rd2 read] set pong [$rd2 read]