Merge branch 'unstable' into keydbpro
Former-commit-id: 0dafbc254a0efd5ee302d5c58fb2ca0a85110104
This commit is contained in:
commit
f17dab1f67
@ -11,6 +11,76 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
|
|||||||
SECURITY: There are security fixes in the release.
|
SECURITY: There are security fixes in the release.
|
||||||
--------------------------------------------------------------------------------
|
--------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
================================================================================
|
||||||
|
Redis 6.0.5 Released Tue Jun 09 11:56:08 CEST 2020
|
||||||
|
================================================================================
|
||||||
|
|
||||||
|
Upgrade urgency MODERATE: several bugs with moderate impact are fixed here.
|
||||||
|
|
||||||
|
The most important issues are listed here:
|
||||||
|
|
||||||
|
* Fix handling of speical chars in ACL LOAD.
|
||||||
|
* Make Redis Cluster more robust about operation errors that may lead
|
||||||
|
to two clusters to mix together.
|
||||||
|
* Revert the sendfile() implementation of RDB transfer. It causes some delay.
|
||||||
|
* Fix TLS certificate loading for chained certificates.
|
||||||
|
* Fix AOF rewirting of KEEPTTL SET option.
|
||||||
|
* Fix MULTI/EXEC behavior during -BUSY script errors.
|
||||||
|
|
||||||
|
And this is the full list of commits:
|
||||||
|
|
||||||
|
antirez in commit ee8dd01bb:
|
||||||
|
Temporary fix for #7353 issue about EVAL during -BUSY.
|
||||||
|
1 file changed, 9 insertions(+)
|
||||||
|
|
||||||
|
xhe in commit a4a856d53:
|
||||||
|
return the correct proto version HELLO should return the current proto version, while the code hardcoded 3
|
||||||
|
1 file changed, 1 insertion(+), 1 deletion(-)
|
||||||
|
|
||||||
|
Oran Agra in commit e2046b300:
|
||||||
|
Don't queue commands in an already aborted MULTI state
|
||||||
|
1 file changed, 7 insertions(+)
|
||||||
|
|
||||||
|
Oran Agra in commit b35fdf1de:
|
||||||
|
Avoid rejecting WATCH / UNWATCH, like MULTI/EXEC/DISCARD
|
||||||
|
1 file changed, 4 insertions(+), 2 deletions(-)
|
||||||
|
|
||||||
|
zhaozhao.zz in commit 1d7bf208c:
|
||||||
|
AOF: append origin SET if no expire option
|
||||||
|
2 files changed, 23 insertions(+), 8 deletions(-)
|
||||||
|
|
||||||
|
Oran Agra in commit 676445ad9:
|
||||||
|
fix disconnectSlaves, to try to free each slave.
|
||||||
|
1 file changed, 1 deletion(-)
|
||||||
|
|
||||||
|
zhaozhao.zz in commit 4846c0c8a:
|
||||||
|
donot free protected client in freeClientsInAsyncFreeQueue
|
||||||
|
1 file changed, 9 insertions(+), 3 deletions(-)
|
||||||
|
|
||||||
|
Oran Agra in commit f33de403e:
|
||||||
|
fix pingoff test race
|
||||||
|
1 file changed, 1 insertion(+)
|
||||||
|
|
||||||
|
Kevin Fwu in commit 49af4d07e:
|
||||||
|
Fix TLS certificate loading for chained certificates.
|
||||||
|
1 file changed, 1 insertion(+), 1 deletion(-)
|
||||||
|
|
||||||
|
antirez in commit 329fddbda:
|
||||||
|
Revert "Implements sendfile for redis."
|
||||||
|
2 files changed, 2 insertions(+), 55 deletions(-)
|
||||||
|
|
||||||
|
antirez in commit 925a2cd5a:
|
||||||
|
Revert "avoid using sendfile if tls-replication is enabled"
|
||||||
|
1 file changed, 27 insertions(+), 34 deletions(-)
|
||||||
|
|
||||||
|
Liu Zhen in commit 84a7a9058:
|
||||||
|
fix clusters mixing accidentally by gossip
|
||||||
|
1 file changed, 10 insertions(+), 2 deletions(-)
|
||||||
|
|
||||||
|
antirez in commit cd63359a1:
|
||||||
|
Fix handling of special chars in ACL LOAD.
|
||||||
|
1 file changed, 8 insertions(+), 4 deletions(-)
|
||||||
|
|
||||||
================================================================================
|
================================================================================
|
||||||
Redis 6.0.4 Released Thu May 28 11:36:45 CEST 2020
|
Redis 6.0.4 Released Thu May 28 11:36:45 CEST 2020
|
||||||
================================================================================
|
================================================================================
|
||||||
|
@ -1811,6 +1811,15 @@ jemalloc-bg-thread yes
|
|||||||
# Set bgsave child process to cpu affinity 1,10,11
|
# Set bgsave child process to cpu affinity 1,10,11
|
||||||
# bgsave_cpulist 1,10-11
|
# bgsave_cpulist 1,10-11
|
||||||
|
|
||||||
|
# The minimum number of clients on a thread before KeyDB assigns new connections to a different thread
|
||||||
|
# Tuning this parameter is a tradeoff between locking overhead and distributing the workload over multiple cores
|
||||||
|
# min-clients-per-thread 50
|
||||||
|
|
||||||
|
# Avoid forwarding RREPLAY messages to other masters?
|
||||||
|
# WARNING: This setting is dangerous! You must be certain all masters are connected to each
|
||||||
|
# other in a true mesh topology or data loss will occur!
|
||||||
|
# This command can be used to reduce multimaster bus traffic
|
||||||
|
# multi-master-no-forward no
|
||||||
|
|
||||||
# Path to directory for file backed scratchpad. The file backed scratchpad
|
# Path to directory for file backed scratchpad. The file backed scratchpad
|
||||||
# reduces memory requirements by storing rarely accessed data on disk
|
# reduces memory requirements by storing rarely accessed data on disk
|
||||||
|
0
pkg/rpm/keydb_build/keydb_rpm/etc/keydb/keydb-sentinel.conf
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/keydb/keydb-sentinel.conf
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/keydb/keydb.conf
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/keydb/keydb.conf
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/logrotate.d/keydb
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/logrotate.d/keydb
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/systemd/system/keydb-sentinel.service.d/limit.conf
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/systemd/system/keydb-sentinel.service.d/limit.conf
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/systemd/system/keydb.service.d/limit.conf
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/etc/systemd/system/keydb.service.d/limit.conf
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/usr/lib/systemd/system/keydb-sentinel.service
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/usr/lib/systemd/system/keydb-sentinel.service
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/usr/lib/systemd/system/keydb.service
Executable file → Normal file
0
pkg/rpm/keydb_build/keydb_rpm/usr/lib/systemd/system/keydb.service
Executable file → Normal file
12
src/acl.cpp
12
src/acl.cpp
@ -735,10 +735,11 @@ void ACLAddAllowedSubcommand(user *u, unsigned long id, const char *sub) {
|
|||||||
* EEXIST: You are adding a key pattern after "*" was already added. This is
|
* EEXIST: You are adding a key pattern after "*" was already added. This is
|
||||||
* almost surely an error on the user side.
|
* almost surely an error on the user side.
|
||||||
* ENODEV: The password you are trying to remove from the user does not exist.
|
* ENODEV: The password you are trying to remove from the user does not exist.
|
||||||
* EBADMSG: The hash you are trying to add is not a valid hash.
|
* EBADMSG: The hash you are trying to add is not a valid hash.
|
||||||
*/
|
*/
|
||||||
int ACLSetUser(user *u, const char *op, ssize_t oplen) {
|
int ACLSetUser(user *u, const char *op, ssize_t oplen) {
|
||||||
if (oplen == -1) oplen = strlen(op);
|
if (oplen == -1) oplen = strlen(op);
|
||||||
|
if (oplen == 0) return C_OK; /* Empty string is a no-operation. */
|
||||||
if (!strcasecmp(op,"on")) {
|
if (!strcasecmp(op,"on")) {
|
||||||
u->flags |= USER_FLAG_ENABLED;
|
u->flags |= USER_FLAG_ENABLED;
|
||||||
u->flags &= ~USER_FLAG_DISABLED;
|
u->flags &= ~USER_FLAG_DISABLED;
|
||||||
@ -1300,7 +1301,7 @@ sds ACLLoadFromFile(const char *filename) {
|
|||||||
if (lines[i][0] == '\0') continue;
|
if (lines[i][0] == '\0') continue;
|
||||||
|
|
||||||
/* Split into arguments */
|
/* Split into arguments */
|
||||||
argv = sdssplitargs(lines[i],&argc);
|
argv = sdssplitlen(lines[i],sdslen(lines[i])," ",1,&argc);
|
||||||
if (argv == NULL) {
|
if (argv == NULL) {
|
||||||
errors = sdscatprintf(errors,
|
errors = sdscatprintf(errors,
|
||||||
"%s:%d: unbalanced quotes in acl line. ",
|
"%s:%d: unbalanced quotes in acl line. ",
|
||||||
@ -1332,11 +1333,14 @@ sds ACLLoadFromFile(const char *filename) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Try to process the line using the fake user to validate iif
|
/* Try to process the line using the fake user to validate if
|
||||||
* the rules are able to apply cleanly. */
|
* the rules are able to apply cleanly. At this stage we also
|
||||||
|
* trim trailing spaces, so that we don't have to handle that
|
||||||
|
* in ACLSetUser(). */
|
||||||
ACLSetUser(fakeuser,"reset",-1);
|
ACLSetUser(fakeuser,"reset",-1);
|
||||||
int j;
|
int j;
|
||||||
for (j = 2; j < argc; j++) {
|
for (j = 2; j < argc; j++) {
|
||||||
|
argv[j] = sdstrim(argv[j],"\t\r\n");
|
||||||
if (ACLSetUser(fakeuser,argv[j],sdslen(argv[j])) != C_OK) {
|
if (ACLSetUser(fakeuser,argv[j],sdslen(argv[j])) != C_OK) {
|
||||||
const char *errmsg = ACLSetUserStringError();
|
const char *errmsg = ACLSetUserStringError();
|
||||||
errors = sdscatprintf(errors,
|
errors = sdscatprintf(errors,
|
||||||
|
@ -237,7 +237,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
|||||||
|
|
||||||
int ret = AE_OK;
|
int ret = AE_OK;
|
||||||
|
|
||||||
aeCommand cmd = {};
|
aeCommand cmd;
|
||||||
cmd.op = AE_ASYNC_OP::CreateFileEvent;
|
cmd.op = AE_ASYNC_OP::CreateFileEvent;
|
||||||
cmd.fd = fd;
|
cmd.fd = fd;
|
||||||
cmd.mask = mask;
|
cmd.mask = mask;
|
||||||
@ -261,7 +261,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
|
|||||||
|
|
||||||
if (fSynchronous)
|
if (fSynchronous)
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
|
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
|
||||||
cmd.pctl->cv.wait(ulock);
|
cmd.pctl->cv.wait(ulock);
|
||||||
ret = cmd.pctl->rval;
|
ret = cmd.pctl->rval;
|
||||||
delete cmd.pctl;
|
delete cmd.pctl;
|
||||||
@ -297,7 +297,6 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
|
|||||||
}
|
}
|
||||||
|
|
||||||
aeCommand cmd = {};
|
aeCommand cmd = {};
|
||||||
memset(&cmd, 0, sizeof(aeCommand));
|
|
||||||
cmd.op = AE_ASYNC_OP::PostCppFunction;
|
cmd.op = AE_ASYNC_OP::PostCppFunction;
|
||||||
cmd.pfn = new std::function<void()>(fn);
|
cmd.pfn = new std::function<void()>(fn);
|
||||||
cmd.pctl = nullptr;
|
cmd.pctl = nullptr;
|
||||||
@ -316,7 +315,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
|
|||||||
int ret = AE_OK;
|
int ret = AE_OK;
|
||||||
if (fSynchronous)
|
if (fSynchronous)
|
||||||
{
|
{
|
||||||
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
|
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
|
||||||
cmd.pctl->cv.wait(ulock);
|
cmd.pctl->cv.wait(ulock);
|
||||||
ret = cmd.pctl->rval;
|
ret = cmd.pctl->rval;
|
||||||
delete cmd.pctl;
|
delete cmd.pctl;
|
||||||
@ -458,7 +457,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask)
|
|||||||
{
|
{
|
||||||
if (eventLoop == g_eventLoopThisThread)
|
if (eventLoop == g_eventLoopThisThread)
|
||||||
return aeDeleteFileEvent(eventLoop, fd, mask);
|
return aeDeleteFileEvent(eventLoop, fd, mask);
|
||||||
aeCommand cmd;
|
aeCommand cmd = {};
|
||||||
cmd.op = AE_ASYNC_OP::DeleteFileEvent;
|
cmd.op = AE_ASYNC_OP::DeleteFileEvent;
|
||||||
cmd.fd = fd;
|
cmd.fd = fd;
|
||||||
cmd.mask = mask;
|
cmd.mask = mask;
|
||||||
|
20
src/aof.cpp
20
src/aof.cpp
@ -671,19 +671,23 @@ sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj
|
|||||||
} else if (cmd->proc == setCommand && argc > 3) {
|
} else if (cmd->proc == setCommand && argc > 3) {
|
||||||
int i;
|
int i;
|
||||||
robj *exarg = NULL, *pxarg = NULL;
|
robj *exarg = NULL, *pxarg = NULL;
|
||||||
/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
|
|
||||||
buf = catAppendOnlyGenericCommand(buf,3,argv);
|
|
||||||
for (i = 3; i < argc; i ++) {
|
for (i = 3; i < argc; i ++) {
|
||||||
if (!strcasecmp(szFromObj(argv[i]), "ex")) exarg = argv[i+1];
|
if (!strcasecmp(szFromObj(argv[i]), "ex")) exarg = argv[i+1];
|
||||||
if (!strcasecmp(szFromObj(argv[i]), "px")) pxarg = argv[i+1];
|
if (!strcasecmp(szFromObj(argv[i]), "px")) pxarg = argv[i+1];
|
||||||
}
|
}
|
||||||
serverAssert(!(exarg && pxarg));
|
serverAssert(!(exarg && pxarg));
|
||||||
if (exarg)
|
if (exarg || pxarg) {
|
||||||
buf = catAppendOnlyExpireAtCommand(buf,cserver.expireCommand,argv[1],
|
/* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */
|
||||||
exarg);
|
buf = catAppendOnlyGenericCommand(buf,3,argv);
|
||||||
if (pxarg)
|
if (exarg)
|
||||||
buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1],
|
buf = catAppendOnlyExpireAtCommand(buf,cserver.expireCommand,argv[1],
|
||||||
pxarg);
|
exarg);
|
||||||
|
if (pxarg)
|
||||||
|
buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1],
|
||||||
|
pxarg);
|
||||||
|
} else {
|
||||||
|
buf = catAppendOnlyGenericCommand(buf,argc,argv);
|
||||||
|
}
|
||||||
} else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand ||
|
} else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand ||
|
||||||
cmd->proc == pexpireMemberAtCommand) {
|
cmd->proc == pexpireMemberAtCommand) {
|
||||||
/* Translate subkey expire commands to PEXPIREMEMBERAT */
|
/* Translate subkey expire commands to PEXPIREMEMBERAT */
|
||||||
|
@ -1501,7 +1501,10 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* If it's not in NOADDR state and we don't have it, we
|
/* If it's not in NOADDR state and we don't have it, we
|
||||||
* start a handshake process against this IP/PORT pairs.
|
* add it to our trusted dict with exact nodeid and flag.
|
||||||
|
* Note that we cannot simply start a handshake against
|
||||||
|
* this IP/PORT pairs, since IP/PORT can be reused already,
|
||||||
|
* otherwise we risk joining another cluster.
|
||||||
*
|
*
|
||||||
* Note that we require that the sender of this gossip message
|
* Note that we require that the sender of this gossip message
|
||||||
* is a well known node in our cluster, otherwise we risk
|
* is a well known node in our cluster, otherwise we risk
|
||||||
@ -1510,7 +1513,12 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
|
|||||||
!(flags & CLUSTER_NODE_NOADDR) &&
|
!(flags & CLUSTER_NODE_NOADDR) &&
|
||||||
!clusterBlacklistExists(g->nodename))
|
!clusterBlacklistExists(g->nodename))
|
||||||
{
|
{
|
||||||
clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
|
clusterNode *node;
|
||||||
|
node = createClusterNode(g->nodename, flags);
|
||||||
|
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
|
||||||
|
node->port = ntohs(g->port);
|
||||||
|
node->cport = ntohs(g->cport);
|
||||||
|
clusterAddNode(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2238,6 +2238,13 @@ static int updateMaxclients(long long val, long long prev, const char **err) {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int validateMultiMasterNoForward(int val, const char **) {
|
||||||
|
if (val) {
|
||||||
|
serverLog(LL_WARNING, "WARNING: multi-master-no-forward is set, you *must* use a mesh topology or dataloss will occur");
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef USE_OPENSSL
|
#ifdef USE_OPENSSL
|
||||||
static int updateTlsCfg(char *val, char *prev, const char **err) {
|
static int updateTlsCfg(char *val, char *prev, const char **err) {
|
||||||
UNUSED(val);
|
UNUSED(val);
|
||||||
@ -2295,6 +2302,7 @@ standardConfig configs[] = {
|
|||||||
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL),
|
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL),
|
||||||
createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL),
|
createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL),
|
||||||
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0,NULL, NULL), // Not applicable to KeyDB, just there for compatibility
|
createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0,NULL, NULL), // Not applicable to KeyDB, just there for compatibility
|
||||||
|
createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL),
|
||||||
|
|
||||||
/* String Configs */
|
/* String Configs */
|
||||||
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL),
|
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL),
|
||||||
|
@ -165,7 +165,7 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len)
|
|||||||
int ret = write(conn->fd, data, data_len);
|
int ret = write(conn->fd, data, data_len);
|
||||||
if (ret < 0 && errno != EAGAIN) {
|
if (ret < 0 && errno != EAGAIN) {
|
||||||
conn->last_errno = errno;
|
conn->last_errno = errno;
|
||||||
conn->state = CONN_STATE_ERROR;
|
conn->state.store(CONN_STATE_ERROR, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -174,10 +174,10 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len)
|
|||||||
static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
|
static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
|
||||||
int ret = read(conn->fd, buf, buf_len);
|
int ret = read(conn->fd, buf, buf_len);
|
||||||
if (!ret) {
|
if (!ret) {
|
||||||
conn->state = CONN_STATE_CLOSED;
|
conn->state.store(CONN_STATE_CLOSED, std::memory_order_release);
|
||||||
} else if (ret < 0 && errno != EAGAIN) {
|
} else if (ret < 0 && errno != EAGAIN) {
|
||||||
conn->last_errno = errno;
|
conn->last_errno = errno;
|
||||||
conn->state = CONN_STATE_ERROR;
|
conn->state.store(CONN_STATE_ERROR, std::memory_order_release);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -256,14 +256,14 @@ void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, in
|
|||||||
UNUSED(fd);
|
UNUSED(fd);
|
||||||
connection *conn = (connection*)clientData;
|
connection *conn = (connection*)clientData;
|
||||||
|
|
||||||
if (conn->state == CONN_STATE_CONNECTING &&
|
if (conn->state.load(std::memory_order_relaxed) == CONN_STATE_CONNECTING &&
|
||||||
(mask & AE_WRITABLE) && conn->conn_handler) {
|
(mask & AE_WRITABLE) && conn->conn_handler) {
|
||||||
|
|
||||||
if (connGetSocketError(conn)) {
|
if (connGetSocketError(conn)) {
|
||||||
conn->last_errno = errno;
|
conn->last_errno = errno;
|
||||||
conn->state = CONN_STATE_ERROR;
|
conn->state.store(CONN_STATE_ERROR, std::memory_order_release);
|
||||||
} else {
|
} else {
|
||||||
conn->state = CONN_STATE_CONNECTED;
|
conn->state.store(CONN_STATE_CONNECTED, std::memory_order_release);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE);
|
if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE);
|
||||||
@ -426,7 +426,7 @@ int connRecvTimeout(connection *conn, long long ms) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int connGetState(connection *conn) {
|
int connGetState(connection *conn) {
|
||||||
return conn->state;
|
return conn->state.load(std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
void connSetThreadAffinity(connection *conn, int cpu) {
|
void connSetThreadAffinity(connection *conn, int cpu) {
|
||||||
|
@ -741,6 +741,13 @@ void existsCommand(client *c) {
|
|||||||
addReplyLongLong(c,count);
|
addReplyLongLong(c,count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mexistsCommand(client *c) {
|
||||||
|
addReplyArrayLen(c, c->argc - 1);
|
||||||
|
for (int j = 1; j < c->argc; ++j) {
|
||||||
|
addReplyBool(c, lookupKeyRead(c->db, c->argv[j]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void selectCommand(client *c) {
|
void selectCommand(client *c) {
|
||||||
long id;
|
long id;
|
||||||
|
|
||||||
@ -2737,4 +2744,4 @@ int dbnumFromDb(redisDb *db)
|
|||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
serverPanic("invalid database pointer");
|
serverPanic("invalid database pointer");
|
||||||
}
|
}
|
||||||
|
@ -152,7 +152,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
|||||||
fTtlChanged = true;
|
fTtlChanged = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pfat->FEmpty() && fTtlChanged)
|
if (pfat->FEmpty())
|
||||||
|
{
|
||||||
|
removeExpire(db, &objKey);
|
||||||
|
}
|
||||||
|
else if (!pfat->FEmpty() && fTtlChanged)
|
||||||
{
|
{
|
||||||
// We need to resort the expire entry since it may no longer be in the correct position
|
// We need to resort the expire entry since it may no longer be in the correct position
|
||||||
db->resortExpire(e);
|
db->resortExpire(e);
|
||||||
@ -168,11 +172,6 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pfat->FEmpty())
|
|
||||||
{
|
|
||||||
removeExpire(db, &objKey);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int parseUnitString(const char *sz)
|
int parseUnitString(const char *sz)
|
||||||
|
@ -1325,7 +1325,12 @@ struct commandHelp {
|
|||||||
"Rename a hash key, copying the value.",
|
"Rename a hash key, copying the value.",
|
||||||
4,
|
4,
|
||||||
"6.5.3"
|
"6.5.3"
|
||||||
}
|
},
|
||||||
|
{ "KEYDB.MEXISTS",
|
||||||
|
"key [key ...]",
|
||||||
|
"Determine if a key exists",
|
||||||
|
0,
|
||||||
|
"6.5.12" },
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -4895,14 +4895,17 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
|
|||||||
zfree(ctx);
|
zfree(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool g_fModuleThread = false;
|
||||||
/* Acquire the server lock before executing a thread safe API call.
|
/* Acquire the server lock before executing a thread safe API call.
|
||||||
* This is not needed for `RedisModule_Reply*` calls when there is
|
* This is not needed for `RedisModule_Reply*` calls when there is
|
||||||
* a blocked client connected to the thread safe context. */
|
* a blocked client connected to the thread safe context. */
|
||||||
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
||||||
UNUSED(ctx);
|
UNUSED(ctx);
|
||||||
moduleAcquireGIL(FALSE /*fServerThread*/);
|
if (serverTL == nullptr) {
|
||||||
if (serverTL == nullptr)
|
|
||||||
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context
|
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context
|
||||||
|
g_fModuleThread = true;
|
||||||
|
}
|
||||||
|
moduleAcquireGIL(FALSE /*fServerThread*/);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Release the server lock after a thread safe API call was executed. */
|
/* Release the server lock after a thread safe API call was executed. */
|
||||||
@ -4911,10 +4914,20 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
|||||||
moduleReleaseGIL(FALSE /*fServerThread*/);
|
moduleReleaseGIL(FALSE /*fServerThread*/);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// A module may be triggered synchronously in a non-module context. In this scenario we don't lock again
|
||||||
|
// as the server thread acquisition is sufficient. If we did try to lock we would deadlock
|
||||||
|
static bool FModuleCallBackLock(bool fServerThread)
|
||||||
|
{
|
||||||
|
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
|
||||||
|
}
|
||||||
void moduleAcquireGIL(int fServerThread) {
|
void moduleAcquireGIL(int fServerThread) {
|
||||||
std::unique_lock<std::mutex> lock(s_mutex);
|
std::unique_lock<std::mutex> lock(s_mutex);
|
||||||
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
|
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
|
||||||
|
|
||||||
|
if (FModuleCallBackLock(fServerThread)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
while (*pcheck > 0)
|
while (*pcheck > 0)
|
||||||
s_cv.wait(lock);
|
s_cv.wait(lock);
|
||||||
|
|
||||||
@ -4932,6 +4945,10 @@ void moduleAcquireGIL(int fServerThread) {
|
|||||||
void moduleReleaseGIL(int fServerThread) {
|
void moduleReleaseGIL(int fServerThread) {
|
||||||
std::unique_lock<std::mutex> lock(s_mutex);
|
std::unique_lock<std::mutex> lock(s_mutex);
|
||||||
|
|
||||||
|
if (FModuleCallBackLock(fServerThread)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (fServerThread)
|
if (fServerThread)
|
||||||
{
|
{
|
||||||
--s_cAcquisitionsServer;
|
--s_cAcquisitionsServer;
|
||||||
|
@ -59,6 +59,13 @@ void queueMultiCommand(client *c) {
|
|||||||
multiCmd *mc;
|
multiCmd *mc;
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
|
/* No sense to waste memory if the transaction is already aborted.
|
||||||
|
* this is useful in case client sends these in a pipeline, or doesn't
|
||||||
|
* bother to read previous responses and didn't notice the multi was already
|
||||||
|
* aborted. */
|
||||||
|
if (c->flags & CLIENT_DIRTY_EXEC)
|
||||||
|
return;
|
||||||
|
|
||||||
c->mstate.commands = (multiCmd*)zrealloc(c->mstate.commands,
|
c->mstate.commands = (multiCmd*)zrealloc(c->mstate.commands,
|
||||||
sizeof(multiCmd)*(c->mstate.count+1), MALLOC_LOCAL);
|
sizeof(multiCmd)*(c->mstate.count+1), MALLOC_LOCAL);
|
||||||
mc = c->mstate.commands+c->mstate.count;
|
mc = c->mstate.commands+c->mstate.count;
|
||||||
@ -131,6 +138,15 @@ void execCommand(client *c) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* If we are in -BUSY state, flag the transaction and return the
|
||||||
|
* -BUSY error, like Redis <= 5. This is a temporary fix, may be changed
|
||||||
|
* ASAP, see issue #7353 on Github. */
|
||||||
|
if (g_pserver->lua_timedout) {
|
||||||
|
flagTransaction(c);
|
||||||
|
addReply(c, shared.slowscripterr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
/* Check if we need to abort the EXEC because:
|
/* Check if we need to abort the EXEC because:
|
||||||
* 1) Some WATCHed key was touched.
|
* 1) Some WATCHed key was touched.
|
||||||
* 2) There was a previous error while queueing commands.
|
* 2) There was a previous error while queueing commands.
|
||||||
|
@ -1652,7 +1652,7 @@ int freeClientsInAsyncFreeQueue(int iel) {
|
|||||||
while((ln = listNext(&li)))
|
while((ln = listNext(&li)))
|
||||||
{
|
{
|
||||||
client *c = (client*)listNodeValue(ln);
|
client *c = (client*)listNodeValue(ln);
|
||||||
if (c->iel == iel)
|
if (c->iel == iel && !(c->flags & CLIENT_PROTECTED))
|
||||||
{
|
{
|
||||||
vecclientsFree.push_back(c);
|
vecclientsFree.push_back(c);
|
||||||
listDelNode(g_pserver->clients_to_close, ln);
|
listDelNode(g_pserver->clients_to_close, ln);
|
||||||
@ -1935,9 +1935,12 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
AeLocker locker;
|
if (listLength(serverTL->clients_pending_asyncwrite))
|
||||||
locker.arm(nullptr);
|
{
|
||||||
ProcessPendingAsyncWrites();
|
AeLocker locker;
|
||||||
|
locker.arm(nullptr);
|
||||||
|
ProcessPendingAsyncWrites();
|
||||||
|
}
|
||||||
|
|
||||||
return processed;
|
return processed;
|
||||||
}
|
}
|
||||||
@ -3065,7 +3068,7 @@ void helloCommand(client *c) {
|
|||||||
addReplyBulkCString(c,KEYDB_SET_VERSION);
|
addReplyBulkCString(c,KEYDB_SET_VERSION);
|
||||||
|
|
||||||
addReplyBulkCString(c,"proto");
|
addReplyBulkCString(c,"proto");
|
||||||
addReplyLongLong(c,3);
|
addReplyLongLong(c,ver);
|
||||||
|
|
||||||
addReplyBulkCString(c,"id");
|
addReplyBulkCString(c,"id");
|
||||||
addReplyLongLong(c,c->id);
|
addReplyLongLong(c,c->id);
|
||||||
|
@ -2497,9 +2497,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) {
|
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) {
|
||||||
// We have a key that we've already deleted and is not back in our database.
|
// We have a key that we've already deleted and is not back in our database.
|
||||||
// We'll need to inform the sending master of the delete if it is also a replica of us
|
// We'll need to inform the sending master of the delete if it is also a replica of us
|
||||||
robj *objKeyDup = createStringObject(key, sdslen(key));
|
robj_sharedptr objKeyDup(createStringObject(key, sdslen(key)));
|
||||||
rsi->mi->staleKeyMap->operator[](dbid).push_back(objKeyDup);
|
rsi->mi->staleKeyMap->operator[](db->id).push_back(objKeyDup);
|
||||||
decrRefCount(objKeyDup);
|
|
||||||
}
|
}
|
||||||
fLastKeyExpired = true;
|
fLastKeyExpired = true;
|
||||||
sdsfree(key);
|
sdsfree(key);
|
||||||
|
@ -1388,30 +1388,28 @@ void sendBulkToSlave(connection *conn) {
|
|||||||
* try to use sendfile system call if supported, unless tls is enabled.
|
* try to use sendfile system call if supported, unless tls is enabled.
|
||||||
* fallback to normal read+write otherwise. */
|
* fallback to normal read+write otherwise. */
|
||||||
nwritten = 0;
|
nwritten = 0;
|
||||||
if (!nwritten) {
|
ssize_t buflen;
|
||||||
ssize_t buflen;
|
char buf[PROTO_IOBUF_LEN];
|
||||||
char buf[PROTO_IOBUF_LEN];
|
|
||||||
|
|
||||||
lseek(replica->repldbfd,replica->repldboff,SEEK_SET);
|
lseek(replica->repldbfd,replica->repldboff,SEEK_SET);
|
||||||
buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN);
|
buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN);
|
||||||
if (buflen <= 0) {
|
if (buflen <= 0) {
|
||||||
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
|
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
|
||||||
(buflen == 0) ? "premature EOF" : strerror(errno));
|
(buflen == 0) ? "premature EOF" : strerror(errno));
|
||||||
|
ul.unlock();
|
||||||
|
aeLock.arm(nullptr);
|
||||||
|
freeClient(replica);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
|
||||||
|
if (connGetState(conn) != CONN_STATE_CONNECTED) {
|
||||||
|
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
|
||||||
|
connGetLastError(conn));
|
||||||
ul.unlock();
|
ul.unlock();
|
||||||
aeLock.arm(nullptr);
|
aeLock.arm(nullptr);
|
||||||
freeClient(replica);
|
freeClient(replica);
|
||||||
return;
|
|
||||||
}
|
|
||||||
if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
|
|
||||||
if (connGetState(conn) != CONN_STATE_CONNECTED) {
|
|
||||||
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
|
|
||||||
connGetLastError(conn));
|
|
||||||
ul.unlock();
|
|
||||||
aeLock.arm(nullptr);
|
|
||||||
freeClient(replica);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
replica->repldboff += nwritten;
|
replica->repldboff += nwritten;
|
||||||
@ -2086,7 +2084,6 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
*
|
*
|
||||||
* 2. Or when we are done reading from the socket to the RDB file, in
|
* 2. Or when we are done reading from the socket to the RDB file, in
|
||||||
* such case we want just to read the RDB file in memory. */
|
* such case we want just to read the RDB file in memory. */
|
||||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
|
|
||||||
|
|
||||||
/* We need to stop any AOF rewriting child before flusing and parsing
|
/* We need to stop any AOF rewriting child before flusing and parsing
|
||||||
* the RDB, otherwise we'll create a copy-on-write disaster. */
|
* the RDB, otherwise we'll create a copy-on-write disaster. */
|
||||||
@ -2099,13 +2096,13 @@ void readSyncBulkPayload(connection *conn) {
|
|||||||
* dictionaries */
|
* dictionaries */
|
||||||
diskless_load_backup = disklessLoadMakeBackups();
|
diskless_load_backup = disklessLoadMakeBackups();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fUpdate)
|
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
|
||||||
{
|
* (Where disklessLoadMakeBackups left server.db empty) because we
|
||||||
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
|
* want to execute all the auxiliary logic of emptyDb (Namely,
|
||||||
* (Where disklessLoadMakeBackups left server.db empty) because we
|
* fire module events) */
|
||||||
* want to execute all the auxiliary logic of emptyDb (Namely,
|
if (!fUpdate) {
|
||||||
* fire module events) */
|
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
|
||||||
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3203,15 +3200,46 @@ void replicaofCommand(client *c) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* The special host/port combination "NO" "ONE" turns the instance
|
if (c->argc > 3) {
|
||||||
* into a master. Otherwise the new master address is set. */
|
if (c->argc != 4) {
|
||||||
if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") &&
|
addReplyError(c, "Invalid arguments");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"remove")) {
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
bool fRemoved = false;
|
||||||
|
long port;
|
||||||
|
string2l(szFromObj(c->argv[3]), sdslen(szFromObj(c->argv[3])), &port);
|
||||||
|
LRestart:
|
||||||
|
listRewind(g_pserver->masters, &li);
|
||||||
|
while ((ln = listNext(&li))) {
|
||||||
|
redisMaster *mi = (redisMaster*)listNodeValue(ln);
|
||||||
|
if (mi->masterport != port)
|
||||||
|
continue;
|
||||||
|
if (sdscmp(szFromObj(c->argv[2]), mi->masterhost) == 0) {
|
||||||
|
replicationUnsetMaster(mi);
|
||||||
|
fRemoved = true;
|
||||||
|
goto LRestart;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!fRemoved) {
|
||||||
|
addReplyError(c, "Master not found");
|
||||||
|
return;
|
||||||
|
} else if (listLength(g_pserver->masters) == 0) {
|
||||||
|
goto LLogNoMaster;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") &&
|
||||||
!strcasecmp((const char*)ptrFromObj(c->argv[2]),"one")) {
|
!strcasecmp((const char*)ptrFromObj(c->argv[2]),"one")) {
|
||||||
|
/* The special host/port combination "NO" "ONE" turns the instance
|
||||||
|
* into a master. Otherwise the new master address is set. */
|
||||||
if (listLength(g_pserver->masters)) {
|
if (listLength(g_pserver->masters)) {
|
||||||
while (listLength(g_pserver->masters))
|
while (listLength(g_pserver->masters))
|
||||||
{
|
{
|
||||||
replicationUnsetMaster((redisMaster*)listNodeValue(listFirst(g_pserver->masters)));
|
replicationUnsetMaster((redisMaster*)listNodeValue(listFirst(g_pserver->masters)));
|
||||||
}
|
}
|
||||||
|
LLogNoMaster:
|
||||||
sds client = catClientInfoString(sdsempty(),c);
|
sds client = catClientInfoString(sdsempty(),c);
|
||||||
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
|
serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
|
||||||
client);
|
client);
|
||||||
@ -4270,7 +4298,7 @@ void replicaReplayCommand(client *c)
|
|||||||
serverTL->current_client = current_clientSave;
|
serverTL->current_client = current_clientSave;
|
||||||
|
|
||||||
// call() will not propogate this for us, so we do so here
|
// call() will not propogate this for us, so we do so here
|
||||||
if (!s_pstate->FCancelled() && s_pstate->FFirst())
|
if (!s_pstate->FCancelled() && s_pstate->FFirst() && !cserver.multimaster_no_forward)
|
||||||
alsoPropagate(cserver.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL);
|
alsoPropagate(cserver.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
|
|
||||||
s_pstate->Pop();
|
s_pstate->Pop();
|
||||||
|
@ -248,6 +248,10 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
"read-only fast @keyspace",
|
"read-only fast @keyspace",
|
||||||
0,NULL,1,-1,1,0,0,0},
|
0,NULL,1,-1,1,0,0,0},
|
||||||
|
|
||||||
|
{"keydb.mexists",mexistsCommand,-2,
|
||||||
|
"read-only fast @keyspace",
|
||||||
|
0,NULL,1,-1,1,0,0,0},
|
||||||
|
|
||||||
{"setbit",setbitCommand,4,
|
{"setbit",setbitCommand,4,
|
||||||
"write use-memory @bitmap",
|
"write use-memory @bitmap",
|
||||||
0,NULL,1,1,1,0,0,0},
|
0,NULL,1,1,1,0,0,0},
|
||||||
@ -769,7 +773,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
"admin no-script ok-stale",
|
"admin no-script ok-stale",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"replicaof",replicaofCommand,3,
|
{"replicaof",replicaofCommand,-3,
|
||||||
"admin no-script ok-stale",
|
"admin no-script ok-stale",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
@ -810,11 +814,11 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"watch",watchCommand,-2,
|
{"watch",watchCommand,-2,
|
||||||
"no-script fast @transaction",
|
"no-script fast ok-loading ok-stale @transaction",
|
||||||
0,NULL,1,-1,1,0,0,0},
|
0,NULL,1,-1,1,0,0,0},
|
||||||
|
|
||||||
{"unwatch",unwatchCommand,1,
|
{"unwatch",unwatchCommand,1,
|
||||||
"no-script fast @transaction",
|
"no-script fast ok-loading ok-stale @transaction",
|
||||||
0,NULL,0,0,0,0,0,0},
|
0,NULL,0,0,0,0,0,0},
|
||||||
|
|
||||||
{"cluster",clusterCommand,-2,
|
{"cluster",clusterCommand,-2,
|
||||||
@ -4078,6 +4082,8 @@ int processCommand(client *c, int callFlags, AeLocker &locker) {
|
|||||||
c->cmd->proc != multiCommand &&
|
c->cmd->proc != multiCommand &&
|
||||||
c->cmd->proc != execCommand &&
|
c->cmd->proc != execCommand &&
|
||||||
c->cmd->proc != discardCommand &&
|
c->cmd->proc != discardCommand &&
|
||||||
|
c->cmd->proc != watchCommand &&
|
||||||
|
c->cmd->proc != unwatchCommand &&
|
||||||
!(c->cmd->proc == shutdownCommand &&
|
!(c->cmd->proc == shutdownCommand &&
|
||||||
c->argc == 2 &&
|
c->argc == 2 &&
|
||||||
tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') &&
|
tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') &&
|
||||||
|
91
src/server.h
91
src/server.h
@ -228,22 +228,24 @@ class robj_sharedptr
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
robj_sharedptr()
|
robj_sharedptr()
|
||||||
: m_ptr(nullptr)
|
: m_ptr(nullptr)
|
||||||
{}
|
{}
|
||||||
robj_sharedptr(redisObject *ptr)
|
explicit robj_sharedptr(redisObject *ptr)
|
||||||
: m_ptr(ptr)
|
: m_ptr(ptr)
|
||||||
{
|
{
|
||||||
|
if(m_ptr)
|
||||||
incrRefCount(ptr);
|
incrRefCount(ptr);
|
||||||
}
|
}
|
||||||
~robj_sharedptr()
|
~robj_sharedptr()
|
||||||
{
|
{
|
||||||
if (m_ptr)
|
if (m_ptr)
|
||||||
decrRefCount(m_ptr);
|
decrRefCount(m_ptr);
|
||||||
}
|
}
|
||||||
robj_sharedptr(const robj_sharedptr& other)
|
robj_sharedptr(const robj_sharedptr& other)
|
||||||
{
|
: m_ptr(other.m_ptr)
|
||||||
m_ptr = other.m_ptr;
|
{
|
||||||
incrRefCount(m_ptr);
|
if(m_ptr)
|
||||||
|
incrRefCount(m_ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
robj_sharedptr(robj_sharedptr&& other)
|
robj_sharedptr(robj_sharedptr&& other)
|
||||||
@ -254,41 +256,19 @@ public:
|
|||||||
|
|
||||||
robj_sharedptr &operator=(const robj_sharedptr& other)
|
robj_sharedptr &operator=(const robj_sharedptr& other)
|
||||||
{
|
{
|
||||||
if (m_ptr)
|
robj_sharedptr tmp(other);
|
||||||
decrRefCount(m_ptr);
|
using std::swap;
|
||||||
m_ptr = other.m_ptr;
|
swap(m_ptr, tmp.m_ptr);
|
||||||
incrRefCount(m_ptr);
|
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
robj_sharedptr &operator=(redisObject *ptr)
|
robj_sharedptr &operator=(redisObject *ptr)
|
||||||
{
|
{
|
||||||
if (m_ptr)
|
robj_sharedptr tmp(ptr);
|
||||||
decrRefCount(m_ptr);
|
using std::swap;
|
||||||
m_ptr = ptr;
|
swap(m_ptr, tmp.m_ptr);
|
||||||
incrRefCount(m_ptr);
|
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool operator==(const robj_sharedptr &other) const
|
|
||||||
{
|
|
||||||
return m_ptr == other.m_ptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool operator==(const void *p) const
|
|
||||||
{
|
|
||||||
return m_ptr == p;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool operator!=(const robj_sharedptr &other) const
|
|
||||||
{
|
|
||||||
return m_ptr != other.m_ptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool operator!=(const void *p) const
|
|
||||||
{
|
|
||||||
return m_ptr != p;
|
|
||||||
}
|
|
||||||
|
|
||||||
redisObject* operator->() const
|
redisObject* operator->() const
|
||||||
{
|
{
|
||||||
return m_ptr;
|
return m_ptr;
|
||||||
@ -299,7 +279,7 @@ public:
|
|||||||
return !m_ptr;
|
return !m_ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
operator bool() const{
|
explicit operator bool() const{
|
||||||
return !!m_ptr;
|
return !!m_ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -309,8 +289,39 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
redisObject *get() { return m_ptr; }
|
redisObject *get() { return m_ptr; }
|
||||||
|
const redisObject *get() const { return m_ptr; }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
inline bool operator==(const robj_sharedptr &lhs, const robj_sharedptr &rhs)
|
||||||
|
{
|
||||||
|
return lhs.get() == rhs.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool operator!=(const robj_sharedptr &lhs, const robj_sharedptr &rhs)
|
||||||
|
{
|
||||||
|
return !(lhs == rhs);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool operator==(const robj_sharedptr &lhs, const void *p)
|
||||||
|
{
|
||||||
|
return lhs.get() == p;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool operator==(const void *p, const robj_sharedptr &rhs)
|
||||||
|
{
|
||||||
|
return rhs == p;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool operator!=(const robj_sharedptr &lhs, const void *p)
|
||||||
|
{
|
||||||
|
return !(lhs == p);
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool operator!=(const void *p, const robj_sharedptr &rhs)
|
||||||
|
{
|
||||||
|
return !(rhs == p);
|
||||||
|
}
|
||||||
|
|
||||||
/* Error codes */
|
/* Error codes */
|
||||||
#define C_OK 0
|
#define C_OK 0
|
||||||
#define C_ERR -1
|
#define C_ERR -1
|
||||||
@ -2033,6 +2044,7 @@ struct redisServerConst {
|
|||||||
int trial_timeout = 120;
|
int trial_timeout = 120;
|
||||||
int delete_on_evict = false; // Only valid when a storage provider is set
|
int delete_on_evict = false; // Only valid when a storage provider is set
|
||||||
int thread_min_client_threshold = 50;
|
int thread_min_client_threshold = 50;
|
||||||
|
int multimaster_no_forward;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisServer {
|
struct redisServer {
|
||||||
@ -3236,6 +3248,7 @@ void getCommand(client *c);
|
|||||||
void delCommand(client *c);
|
void delCommand(client *c);
|
||||||
void unlinkCommand(client *c);
|
void unlinkCommand(client *c);
|
||||||
void existsCommand(client *c);
|
void existsCommand(client *c);
|
||||||
|
void mexistsCommand(client *c);
|
||||||
void setbitCommand(client *c);
|
void setbitCommand(client *c);
|
||||||
void getbitCommand(client *c);
|
void getbitCommand(client *c);
|
||||||
void bitfieldCommand(client *c);
|
void bitfieldCommand(client *c);
|
||||||
|
@ -226,7 +226,7 @@ int tlsConfigure(redisTLSContextConfig *ctx_config) {
|
|||||||
SSL_CTX_set_ecdh_auto(ctx, 1);
|
SSL_CTX_set_ecdh_auto(ctx, 1);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (SSL_CTX_use_certificate_file(ctx, ctx_config->cert_file, SSL_FILETYPE_PEM) <= 0) {
|
if (SSL_CTX_use_certificate_chain_file(ctx, ctx_config->cert_file) <= 0) {
|
||||||
ERR_error_string_n(ERR_get_error(), errbuf, sizeof(errbuf));
|
ERR_error_string_n(ERR_get_error(), errbuf, sizeof(errbuf));
|
||||||
serverLog(LL_WARNING, "Failed to load certificate: %s: %s", ctx_config->cert_file, errbuf);
|
serverLog(LL_WARNING, "Failed to load certificate: %s: %s", ctx_config->cert_file, errbuf);
|
||||||
goto error;
|
goto error;
|
||||||
|
@ -101,8 +101,8 @@ void disableTracking(client *c) {
|
|||||||
|
|
||||||
/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
|
/* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is
|
||||||
* already registered for the specified prefix, no operation is performed. */
|
* already registered for the specified prefix, no operation is performed. */
|
||||||
void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) {
|
static void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) {
|
||||||
bcastState *bs = (bcastState*)raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix));
|
bcastState *bs = (bcastState*)raxFind(PrefixTable,(unsigned char*)prefix,plen);
|
||||||
/* If this is the first client subscribing to such prefix, create
|
/* If this is the first client subscribing to such prefix, create
|
||||||
* the prefix in the table. */
|
* the prefix in the table. */
|
||||||
if (bs == raxNotFound) {
|
if (bs == raxNotFound) {
|
||||||
|
@ -64,6 +64,7 @@ start_server {} {
|
|||||||
# make sure replication is still alive and kicking
|
# make sure replication is still alive and kicking
|
||||||
$R(1) incr x
|
$R(1) incr x
|
||||||
wait_for_condition 50 1000 {
|
wait_for_condition 50 1000 {
|
||||||
|
[status $R(0) loading] == 0 &&
|
||||||
[$R(0) get x] == 1
|
[$R(0) get x] == 1
|
||||||
} else {
|
} else {
|
||||||
fail "replica didn't get incr"
|
fail "replica didn't get incr"
|
||||||
|
@ -265,3 +265,18 @@ foreach mdl {no yes} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start_server {tags {"active-repl"} overrides {active-replica yes}} {
|
||||||
|
set slave [srv 0 client]
|
||||||
|
set slave_host [srv 0 host]
|
||||||
|
set slave_port [srv 0 port]
|
||||||
|
start_server {tags {"active-repl"} overrides { active-replica yes}} {
|
||||||
|
r set testkeyB bar
|
||||||
|
test {Active Replica Merges Database On Sync} {
|
||||||
|
$slave set testkeyA foo
|
||||||
|
r replicaof $slave_host $slave_port
|
||||||
|
after 1000
|
||||||
|
assert_equal 2 [r dbsize]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,4 +1,6 @@
|
|||||||
foreach topology {mesh ring} {
|
foreach topology {mesh ring} {
|
||||||
|
|
||||||
|
foreach noforward [expr {[string equal $topology "mesh"] ? {no yes} : {no}}] {
|
||||||
start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} {
|
start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} {
|
||||||
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
||||||
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
||||||
@ -8,8 +10,12 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
|||||||
set R($j) [srv [expr 0-$j] client]
|
set R($j) [srv [expr 0-$j] client]
|
||||||
set R_host($j) [srv [expr 0-$j] host]
|
set R_host($j) [srv [expr 0-$j] host]
|
||||||
set R_port($j) [srv [expr 0-$j] port]
|
set R_port($j) [srv [expr 0-$j] port]
|
||||||
|
|
||||||
|
$R($j) config set multi-master-no-forward $noforward
|
||||||
}
|
}
|
||||||
|
|
||||||
|
set topology_name "$topology[expr {[string equal $noforward "yes"] ? " no-forward" : ""}]"
|
||||||
|
|
||||||
# Initialize as mesh
|
# Initialize as mesh
|
||||||
if [string equal $topology "mesh"] {
|
if [string equal $topology "mesh"] {
|
||||||
for {set j 0} {$j < 4} {incr j} {
|
for {set j 0} {$j < 4} {incr j} {
|
||||||
@ -31,7 +37,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
|||||||
$R(3) replicaof $R_host(2) $R_port(2)
|
$R(3) replicaof $R_host(2) $R_port(2)
|
||||||
}
|
}
|
||||||
|
|
||||||
test "$topology all nodes up" {
|
test "$topology_name all nodes up" {
|
||||||
for {set j 0} {$j < 4} {incr j} {
|
for {set j 0} {$j < 4} {incr j} {
|
||||||
wait_for_condition 50 100 {
|
wait_for_condition 50 100 {
|
||||||
[string match {*master_global_link_status:up*} [$R($j) info replication]]
|
[string match {*master_global_link_status:up*} [$R($j) info replication]]
|
||||||
@ -41,7 +47,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test "$topology replicates to all nodes" {
|
test "$topology_name replicates to all nodes" {
|
||||||
$R(0) set testkey foo
|
$R(0) set testkey foo
|
||||||
after 500
|
after 500
|
||||||
for {set n 0} {$n < 4} {incr n} {
|
for {set n 0} {$n < 4} {incr n} {
|
||||||
@ -53,7 +59,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test "$topology replicates only once" {
|
test "$topology_name replicates only once" {
|
||||||
$R(0) set testkey 1
|
$R(0) set testkey 1
|
||||||
after 500
|
after 500
|
||||||
#wait_for_condition 50 100 {
|
#wait_for_condition 50 100 {
|
||||||
@ -74,7 +80,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test "$topology transaction replicates only once" {
|
test "$topology_name transaction replicates only once" {
|
||||||
for {set j 0} {$j < 1000} {incr j} {
|
for {set j 0} {$j < 1000} {incr j} {
|
||||||
$R(0) set testkey 1
|
$R(0) set testkey 1
|
||||||
$R(0) multi
|
$R(0) multi
|
||||||
@ -95,3 +101,4 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
* POSSIBILITY OF SUCH DAMAGE.
|
* POSSIBILITY OF SUCH DAMAGE.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define REDISMODULE_EXPERIMENTAL_API 1
|
||||||
#include "redismodule.h"
|
#include "redismodule.h"
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
@ -143,10 +144,12 @@ void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void
|
|||||||
{
|
{
|
||||||
REDISMODULE_NOT_USED(e);
|
REDISMODULE_NOT_USED(e);
|
||||||
|
|
||||||
|
RedisModule_ThreadSafeContextLock(ctx);
|
||||||
RedisModuleFlushInfo *fi = data;
|
RedisModuleFlushInfo *fi = data;
|
||||||
char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ?
|
char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ?
|
||||||
"flush-start" : "flush-end";
|
"flush-start" : "flush-end";
|
||||||
LogNumericEvent(ctx, keyname, fi->dbnum);
|
LogNumericEvent(ctx, keyname, fi->dbnum);
|
||||||
|
RedisModule_ThreadSafeContextUnlock(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
|
void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
|
||||||
|
@ -313,4 +313,14 @@ start_server {tags {"expire"}} {
|
|||||||
after 3000
|
after 3000
|
||||||
assert_equal [r dbsize] 0
|
assert_equal [r dbsize] 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {SET - use KEEPTTL option, TTL should not be removed after loadaof} {
|
||||||
|
r config set appendonly yes
|
||||||
|
r set foo bar EX 100
|
||||||
|
r set foo bar2 KEEPTTL
|
||||||
|
after 2000
|
||||||
|
r debug loadaof
|
||||||
|
set ttl [r ttl foo]
|
||||||
|
assert {$ttl <= 98 && $ttl > 90}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -363,6 +363,9 @@ start_server {tags {"multi"}} {
|
|||||||
set xx [r get xx]
|
set xx [r get xx]
|
||||||
# make sure that either the whole transcation passed or none of it (we actually expect none)
|
# make sure that either the whole transcation passed or none of it (we actually expect none)
|
||||||
assert { $xx == 1 || $xx == 3}
|
assert { $xx == 1 || $xx == 3}
|
||||||
|
# Discard the transaction since EXEC likely got -BUSY error
|
||||||
|
# so the client is still in MULTI state.
|
||||||
|
catch { $rd2 discard ;$rd2 read } e
|
||||||
# check that the connection is no longer in multi state
|
# check that the connection is no longer in multi state
|
||||||
$rd2 ping asdf
|
$rd2 ping asdf
|
||||||
set pong [$rd2 read]
|
set pong [$rd2 read]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user