diff --git a/00-RELEASENOTES b/00-RELEASENOTES index 7ce88c556..c6ee44246 100644 --- a/00-RELEASENOTES +++ b/00-RELEASENOTES @@ -11,6 +11,76 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP. SECURITY: There are security fixes in the release. -------------------------------------------------------------------------------- +================================================================================ +Redis 6.0.5 Released Tue Jun 09 11:56:08 CEST 2020 +================================================================================ + +Upgrade urgency MODERATE: several bugs with moderate impact are fixed here. + +The most important issues are listed here: + +* Fix handling of speical chars in ACL LOAD. +* Make Redis Cluster more robust about operation errors that may lead + to two clusters to mix together. +* Revert the sendfile() implementation of RDB transfer. It causes some delay. +* Fix TLS certificate loading for chained certificates. +* Fix AOF rewirting of KEEPTTL SET option. +* Fix MULTI/EXEC behavior during -BUSY script errors. + +And this is the full list of commits: + +antirez in commit ee8dd01bb: + Temporary fix for #7353 issue about EVAL during -BUSY. + 1 file changed, 9 insertions(+) + +xhe in commit a4a856d53: + return the correct proto version HELLO should return the current proto version, while the code hardcoded 3 + 1 file changed, 1 insertion(+), 1 deletion(-) + +Oran Agra in commit e2046b300: + Don't queue commands in an already aborted MULTI state + 1 file changed, 7 insertions(+) + +Oran Agra in commit b35fdf1de: + Avoid rejecting WATCH / UNWATCH, like MULTI/EXEC/DISCARD + 1 file changed, 4 insertions(+), 2 deletions(-) + +zhaozhao.zz in commit 1d7bf208c: + AOF: append origin SET if no expire option + 2 files changed, 23 insertions(+), 8 deletions(-) + +Oran Agra in commit 676445ad9: + fix disconnectSlaves, to try to free each slave. + 1 file changed, 1 deletion(-) + +zhaozhao.zz in commit 4846c0c8a: + donot free protected client in freeClientsInAsyncFreeQueue + 1 file changed, 9 insertions(+), 3 deletions(-) + +Oran Agra in commit f33de403e: + fix pingoff test race + 1 file changed, 1 insertion(+) + +Kevin Fwu in commit 49af4d07e: + Fix TLS certificate loading for chained certificates. + 1 file changed, 1 insertion(+), 1 deletion(-) + +antirez in commit 329fddbda: + Revert "Implements sendfile for redis." + 2 files changed, 2 insertions(+), 55 deletions(-) + +antirez in commit 925a2cd5a: + Revert "avoid using sendfile if tls-replication is enabled" + 1 file changed, 27 insertions(+), 34 deletions(-) + +Liu Zhen in commit 84a7a9058: + fix clusters mixing accidentally by gossip + 1 file changed, 10 insertions(+), 2 deletions(-) + +antirez in commit cd63359a1: + Fix handling of special chars in ACL LOAD. + 1 file changed, 8 insertions(+), 4 deletions(-) + ================================================================================ Redis 6.0.4 Released Thu May 28 11:36:45 CEST 2020 ================================================================================ diff --git a/keydb.conf b/keydb.conf index 7ac2099da..bc7f90093 100644 --- a/keydb.conf +++ b/keydb.conf @@ -1811,6 +1811,15 @@ jemalloc-bg-thread yes # Set bgsave child process to cpu affinity 1,10,11 # bgsave_cpulist 1,10-11 +# The minimum number of clients on a thread before KeyDB assigns new connections to a different thread +# Tuning this parameter is a tradeoff between locking overhead and distributing the workload over multiple cores +# min-clients-per-thread 50 + +# Avoid forwarding RREPLAY messages to other masters? +# WARNING: This setting is dangerous! You must be certain all masters are connected to each +# other in a true mesh topology or data loss will occur! +# This command can be used to reduce multimaster bus traffic +# multi-master-no-forward no # Path to directory for file backed scratchpad. The file backed scratchpad # reduces memory requirements by storing rarely accessed data on disk diff --git a/pkg/rpm/keydb_build/keydb_rpm/etc/keydb/keydb-sentinel.conf b/pkg/rpm/keydb_build/keydb_rpm/etc/keydb/keydb-sentinel.conf old mode 100755 new mode 100644 diff --git a/pkg/rpm/keydb_build/keydb_rpm/etc/keydb/keydb.conf b/pkg/rpm/keydb_build/keydb_rpm/etc/keydb/keydb.conf old mode 100755 new mode 100644 diff --git a/pkg/rpm/keydb_build/keydb_rpm/etc/logrotate.d/keydb b/pkg/rpm/keydb_build/keydb_rpm/etc/logrotate.d/keydb old mode 100755 new mode 100644 diff --git a/pkg/rpm/keydb_build/keydb_rpm/etc/systemd/system/keydb-sentinel.service.d/limit.conf b/pkg/rpm/keydb_build/keydb_rpm/etc/systemd/system/keydb-sentinel.service.d/limit.conf old mode 100755 new mode 100644 diff --git a/pkg/rpm/keydb_build/keydb_rpm/etc/systemd/system/keydb.service.d/limit.conf b/pkg/rpm/keydb_build/keydb_rpm/etc/systemd/system/keydb.service.d/limit.conf old mode 100755 new mode 100644 diff --git a/pkg/rpm/keydb_build/keydb_rpm/usr/lib/systemd/system/keydb-sentinel.service b/pkg/rpm/keydb_build/keydb_rpm/usr/lib/systemd/system/keydb-sentinel.service old mode 100755 new mode 100644 diff --git a/pkg/rpm/keydb_build/keydb_rpm/usr/lib/systemd/system/keydb.service b/pkg/rpm/keydb_build/keydb_rpm/usr/lib/systemd/system/keydb.service old mode 100755 new mode 100644 diff --git a/src/acl.cpp b/src/acl.cpp index b51dcc295..509dd0776 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -735,10 +735,11 @@ void ACLAddAllowedSubcommand(user *u, unsigned long id, const char *sub) { * EEXIST: You are adding a key pattern after "*" was already added. This is * almost surely an error on the user side. * ENODEV: The password you are trying to remove from the user does not exist. - * EBADMSG: The hash you are trying to add is not a valid hash. + * EBADMSG: The hash you are trying to add is not a valid hash. */ int ACLSetUser(user *u, const char *op, ssize_t oplen) { if (oplen == -1) oplen = strlen(op); + if (oplen == 0) return C_OK; /* Empty string is a no-operation. */ if (!strcasecmp(op,"on")) { u->flags |= USER_FLAG_ENABLED; u->flags &= ~USER_FLAG_DISABLED; @@ -1300,7 +1301,7 @@ sds ACLLoadFromFile(const char *filename) { if (lines[i][0] == '\0') continue; /* Split into arguments */ - argv = sdssplitargs(lines[i],&argc); + argv = sdssplitlen(lines[i],sdslen(lines[i])," ",1,&argc); if (argv == NULL) { errors = sdscatprintf(errors, "%s:%d: unbalanced quotes in acl line. ", @@ -1332,11 +1333,14 @@ sds ACLLoadFromFile(const char *filename) { continue; } - /* Try to process the line using the fake user to validate iif - * the rules are able to apply cleanly. */ + /* Try to process the line using the fake user to validate if + * the rules are able to apply cleanly. At this stage we also + * trim trailing spaces, so that we don't have to handle that + * in ACLSetUser(). */ ACLSetUser(fakeuser,"reset",-1); int j; for (j = 2; j < argc; j++) { + argv[j] = sdstrim(argv[j],"\t\r\n"); if (ACLSetUser(fakeuser,argv[j],sdslen(argv[j])) != C_OK) { const char *errmsg = ACLSetUserStringError(); errors = sdscatprintf(errors, diff --git a/src/ae.cpp b/src/ae.cpp index 6d87a85c7..cccf2a130 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -237,7 +237,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, int ret = AE_OK; - aeCommand cmd = {}; + aeCommand cmd; cmd.op = AE_ASYNC_OP::CreateFileEvent; cmd.fd = fd; cmd.mask = mask; @@ -261,7 +261,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, if (fSynchronous) { - std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); + std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; delete cmd.pctl; @@ -297,7 +297,6 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch } aeCommand cmd = {}; - memset(&cmd, 0, sizeof(aeCommand)); cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.pfn = new std::function(fn); cmd.pctl = nullptr; @@ -316,7 +315,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch int ret = AE_OK; if (fSynchronous) { - std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); + std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; delete cmd.pctl; @@ -458,7 +457,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask) { if (eventLoop == g_eventLoopThisThread) return aeDeleteFileEvent(eventLoop, fd, mask); - aeCommand cmd; + aeCommand cmd = {}; cmd.op = AE_ASYNC_OP::DeleteFileEvent; cmd.fd = fd; cmd.mask = mask; diff --git a/src/aof.cpp b/src/aof.cpp index 0a15204d0..8055007f1 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -671,19 +671,23 @@ sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj } else if (cmd->proc == setCommand && argc > 3) { int i; robj *exarg = NULL, *pxarg = NULL; - /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */ - buf = catAppendOnlyGenericCommand(buf,3,argv); for (i = 3; i < argc; i ++) { if (!strcasecmp(szFromObj(argv[i]), "ex")) exarg = argv[i+1]; if (!strcasecmp(szFromObj(argv[i]), "px")) pxarg = argv[i+1]; } serverAssert(!(exarg && pxarg)); - if (exarg) - buf = catAppendOnlyExpireAtCommand(buf,cserver.expireCommand,argv[1], - exarg); - if (pxarg) - buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1], - pxarg); + if (exarg || pxarg) { + /* Translate SET [EX seconds][PX milliseconds] to SET and PEXPIREAT */ + buf = catAppendOnlyGenericCommand(buf,3,argv); + if (exarg) + buf = catAppendOnlyExpireAtCommand(buf,cserver.expireCommand,argv[1], + exarg); + if (pxarg) + buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1], + pxarg); + } else { + buf = catAppendOnlyGenericCommand(buf,argc,argv); + } } else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand || cmd->proc == pexpireMemberAtCommand) { /* Translate subkey expire commands to PEXPIREMEMBERAT */ diff --git a/src/cluster.cpp b/src/cluster.cpp index 389c6dd98..a13386cba 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -1501,7 +1501,10 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { } } else { /* If it's not in NOADDR state and we don't have it, we - * start a handshake process against this IP/PORT pairs. + * add it to our trusted dict with exact nodeid and flag. + * Note that we cannot simply start a handshake against + * this IP/PORT pairs, since IP/PORT can be reused already, + * otherwise we risk joining another cluster. * * Note that we require that the sender of this gossip message * is a well known node in our cluster, otherwise we risk @@ -1510,7 +1513,12 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { !(flags & CLUSTER_NODE_NOADDR) && !clusterBlacklistExists(g->nodename)) { - clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport)); + clusterNode *node; + node = createClusterNode(g->nodename, flags); + memcpy(node->ip,g->ip,NET_IP_STR_LEN); + node->port = ntohs(g->port); + node->cport = ntohs(g->cport); + clusterAddNode(node); } } diff --git a/src/config.cpp b/src/config.cpp index 64fda91b6..35d6386cb 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2238,6 +2238,13 @@ static int updateMaxclients(long long val, long long prev, const char **err) { return 1; } +static int validateMultiMasterNoForward(int val, const char **) { + if (val) { + serverLog(LL_WARNING, "WARNING: multi-master-no-forward is set, you *must* use a mesh topology or dataloss will occur"); + } + return 1; +} + #ifdef USE_OPENSSL static int updateTlsCfg(char *val, char *prev, const char **err) { UNUSED(val); @@ -2295,6 +2302,7 @@ standardConfig configs[] = { createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("delete-on-evict", NULL, MODIFIABLE_CONFIG, cserver.delete_on_evict, 0, NULL, NULL), createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0,NULL, NULL), // Not applicable to KeyDB, just there for compatibility + createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), diff --git a/src/connection.cpp b/src/connection.cpp index 01bbabd65..b92d096aa 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -165,7 +165,7 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len) int ret = write(conn->fd, data, data_len); if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; - conn->state = CONN_STATE_ERROR; + conn->state.store(CONN_STATE_ERROR, std::memory_order_relaxed); } return ret; @@ -174,10 +174,10 @@ static int connSocketWrite(connection *conn, const void *data, size_t data_len) static int connSocketRead(connection *conn, void *buf, size_t buf_len) { int ret = read(conn->fd, buf, buf_len); if (!ret) { - conn->state = CONN_STATE_CLOSED; + conn->state.store(CONN_STATE_CLOSED, std::memory_order_release); } else if (ret < 0 && errno != EAGAIN) { conn->last_errno = errno; - conn->state = CONN_STATE_ERROR; + conn->state.store(CONN_STATE_ERROR, std::memory_order_release); } return ret; @@ -256,14 +256,14 @@ void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, in UNUSED(fd); connection *conn = (connection*)clientData; - if (conn->state == CONN_STATE_CONNECTING && + if (conn->state.load(std::memory_order_relaxed) == CONN_STATE_CONNECTING && (mask & AE_WRITABLE) && conn->conn_handler) { if (connGetSocketError(conn)) { conn->last_errno = errno; - conn->state = CONN_STATE_ERROR; + conn->state.store(CONN_STATE_ERROR, std::memory_order_release); } else { - conn->state = CONN_STATE_CONNECTED; + conn->state.store(CONN_STATE_CONNECTED, std::memory_order_release); } if (!conn->write_handler) aeDeleteFileEvent(serverTL->el,conn->fd,AE_WRITABLE); @@ -426,7 +426,7 @@ int connRecvTimeout(connection *conn, long long ms) { } int connGetState(connection *conn) { - return conn->state; + return conn->state.load(std::memory_order_relaxed); } void connSetThreadAffinity(connection *conn, int cpu) { diff --git a/src/db.cpp b/src/db.cpp index 39ba16cb9..0c1cb0350 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -741,6 +741,13 @@ void existsCommand(client *c) { addReplyLongLong(c,count); } +void mexistsCommand(client *c) { + addReplyArrayLen(c, c->argc - 1); + for (int j = 1; j < c->argc; ++j) { + addReplyBool(c, lookupKeyRead(c->db, c->argv[j])); + } +} + void selectCommand(client *c) { long id; @@ -2737,4 +2744,4 @@ int dbnumFromDb(redisDb *db) return i; } serverPanic("invalid database pointer"); -} \ No newline at end of file +} diff --git a/src/expire.cpp b/src/expire.cpp index 14c65449d..5784690ee 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -152,7 +152,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { fTtlChanged = true; } - if (!pfat->FEmpty() && fTtlChanged) + if (pfat->FEmpty()) + { + removeExpire(db, &objKey); + } + else if (!pfat->FEmpty() && fTtlChanged) { // We need to resort the expire entry since it may no longer be in the correct position db->resortExpire(e); @@ -168,11 +172,6 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { break; } } - - if (pfat->FEmpty()) - { - removeExpire(db, &objKey); - } } int parseUnitString(const char *sz) diff --git a/src/help.h b/src/help.h index 2f692dfc2..c6b614475 100644 --- a/src/help.h +++ b/src/help.h @@ -1325,7 +1325,12 @@ struct commandHelp { "Rename a hash key, copying the value.", 4, "6.5.3" - } + }, + { "KEYDB.MEXISTS", + "key [key ...]", + "Determine if a key exists", + 0, + "6.5.12" }, }; #endif diff --git a/src/module.cpp b/src/module.cpp index 4f79b9f95..27055e595 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -4895,14 +4895,17 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { zfree(ctx); } +static bool g_fModuleThread = false; /* Acquire the server lock before executing a thread safe API call. * This is not needed for `RedisModule_Reply*` calls when there is * a blocked client connected to the thread safe context. */ void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { UNUSED(ctx); - moduleAcquireGIL(FALSE /*fServerThread*/); - if (serverTL == nullptr) + if (serverTL == nullptr) { serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context + g_fModuleThread = true; + } + moduleAcquireGIL(FALSE /*fServerThread*/); } /* Release the server lock after a thread safe API call was executed. */ @@ -4911,10 +4914,20 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { moduleReleaseGIL(FALSE /*fServerThread*/); } +// A module may be triggered synchronously in a non-module context. In this scenario we don't lock again +// as the server thread acquisition is sufficient. If we did try to lock we would deadlock +static bool FModuleCallBackLock(bool fServerThread) +{ + return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0; +} void moduleAcquireGIL(int fServerThread) { std::unique_lock lock(s_mutex); int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer; + if (FModuleCallBackLock(fServerThread)) { + return; + } + while (*pcheck > 0) s_cv.wait(lock); @@ -4932,6 +4945,10 @@ void moduleAcquireGIL(int fServerThread) { void moduleReleaseGIL(int fServerThread) { std::unique_lock lock(s_mutex); + if (FModuleCallBackLock(fServerThread)) { + return; + } + if (fServerThread) { --s_cAcquisitionsServer; diff --git a/src/multi.cpp b/src/multi.cpp index 7faad7cb5..8f4462569 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -59,6 +59,13 @@ void queueMultiCommand(client *c) { multiCmd *mc; int j; + /* No sense to waste memory if the transaction is already aborted. + * this is useful in case client sends these in a pipeline, or doesn't + * bother to read previous responses and didn't notice the multi was already + * aborted. */ + if (c->flags & CLIENT_DIRTY_EXEC) + return; + c->mstate.commands = (multiCmd*)zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count+1), MALLOC_LOCAL); mc = c->mstate.commands+c->mstate.count; @@ -131,6 +138,15 @@ void execCommand(client *c) { return; } + /* If we are in -BUSY state, flag the transaction and return the + * -BUSY error, like Redis <= 5. This is a temporary fix, may be changed + * ASAP, see issue #7353 on Github. */ + if (g_pserver->lua_timedout) { + flagTransaction(c); + addReply(c, shared.slowscripterr); + return; + } + /* Check if we need to abort the EXEC because: * 1) Some WATCHed key was touched. * 2) There was a previous error while queueing commands. diff --git a/src/networking.cpp b/src/networking.cpp index 89efceb8d..328be5c73 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1652,7 +1652,7 @@ int freeClientsInAsyncFreeQueue(int iel) { while((ln = listNext(&li))) { client *c = (client*)listNodeValue(ln); - if (c->iel == iel) + if (c->iel == iel && !(c->flags & CLIENT_PROTECTED)) { vecclientsFree.push_back(c); listDelNode(g_pserver->clients_to_close, ln); @@ -1935,9 +1935,12 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { } } - AeLocker locker; - locker.arm(nullptr); - ProcessPendingAsyncWrites(); + if (listLength(serverTL->clients_pending_asyncwrite)) + { + AeLocker locker; + locker.arm(nullptr); + ProcessPendingAsyncWrites(); + } return processed; } @@ -3065,7 +3068,7 @@ void helloCommand(client *c) { addReplyBulkCString(c,KEYDB_SET_VERSION); addReplyBulkCString(c,"proto"); - addReplyLongLong(c,3); + addReplyLongLong(c,ver); addReplyBulkCString(c,"id"); addReplyLongLong(c,c->id); diff --git a/src/rdb.cpp b/src/rdb.cpp index befb43430..278c657e4 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2497,9 +2497,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) { // We have a key that we've already deleted and is not back in our database. // We'll need to inform the sending master of the delete if it is also a replica of us - robj *objKeyDup = createStringObject(key, sdslen(key)); - rsi->mi->staleKeyMap->operator[](dbid).push_back(objKeyDup); - decrRefCount(objKeyDup); + robj_sharedptr objKeyDup(createStringObject(key, sdslen(key))); + rsi->mi->staleKeyMap->operator[](db->id).push_back(objKeyDup); } fLastKeyExpired = true; sdsfree(key); diff --git a/src/replication.cpp b/src/replication.cpp index ea290363e..a905fe0fe 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1388,30 +1388,28 @@ void sendBulkToSlave(connection *conn) { * try to use sendfile system call if supported, unless tls is enabled. * fallback to normal read+write otherwise. */ nwritten = 0; - if (!nwritten) { - ssize_t buflen; - char buf[PROTO_IOBUF_LEN]; + ssize_t buflen; + char buf[PROTO_IOBUF_LEN]; - lseek(replica->repldbfd,replica->repldboff,SEEK_SET); - buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN); - if (buflen <= 0) { - serverLog(LL_WARNING,"Read error sending DB to replica: %s", - (buflen == 0) ? "premature EOF" : strerror(errno)); + lseek(replica->repldbfd,replica->repldboff,SEEK_SET); + buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN); + if (buflen <= 0) { + serverLog(LL_WARNING,"Read error sending DB to replica: %s", + (buflen == 0) ? "premature EOF" : strerror(errno)); + ul.unlock(); + aeLock.arm(nullptr); + freeClient(replica); + return; + } + if ((nwritten = connWrite(conn,buf,buflen)) == -1) { + if (connGetState(conn) != CONN_STATE_CONNECTED) { + serverLog(LL_WARNING,"Write error sending DB to replica: %s", + connGetLastError(conn)); ul.unlock(); aeLock.arm(nullptr); freeClient(replica); - return; - } - if ((nwritten = connWrite(conn,buf,buflen)) == -1) { - if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_WARNING,"Write error sending DB to replica: %s", - connGetLastError(conn)); - ul.unlock(); - aeLock.arm(nullptr); - freeClient(replica); - } - return; } + return; } replica->repldboff += nwritten; @@ -2086,7 +2084,6 @@ void readSyncBulkPayload(connection *conn) { * * 2. Or when we are done reading from the socket to the RDB file, in * such case we want just to read the RDB file in memory. */ - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); /* We need to stop any AOF rewriting child before flusing and parsing * the RDB, otherwise we'll create a copy-on-write disaster. */ @@ -2099,13 +2096,13 @@ void readSyncBulkPayload(connection *conn) { * dictionaries */ diskless_load_backup = disklessLoadMakeBackups(); } - - if (!fUpdate) - { - /* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB - * (Where disklessLoadMakeBackups left server.db empty) because we - * want to execute all the auxiliary logic of emptyDb (Namely, - * fire module events) */ + + /* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB + * (Where disklessLoadMakeBackups left server.db empty) because we + * want to execute all the auxiliary logic of emptyDb (Namely, + * fire module events) */ + if (!fUpdate) { + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); emptyDb(-1,empty_db_flags,replicationEmptyDbCallback); } @@ -3203,15 +3200,46 @@ void replicaofCommand(client *c) { return; } - /* The special host/port combination "NO" "ONE" turns the instance - * into a master. Otherwise the new master address is set. */ - if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") && + if (c->argc > 3) { + if (c->argc != 4) { + addReplyError(c, "Invalid arguments"); + return; + } + if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"remove")) { + listIter li; + listNode *ln; + bool fRemoved = false; + long port; + string2l(szFromObj(c->argv[3]), sdslen(szFromObj(c->argv[3])), &port); + LRestart: + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->masterport != port) + continue; + if (sdscmp(szFromObj(c->argv[2]), mi->masterhost) == 0) { + replicationUnsetMaster(mi); + fRemoved = true; + goto LRestart; + } + } + if (!fRemoved) { + addReplyError(c, "Master not found"); + return; + } else if (listLength(g_pserver->masters) == 0) { + goto LLogNoMaster; + } + } + } else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") && !strcasecmp((const char*)ptrFromObj(c->argv[2]),"one")) { + /* The special host/port combination "NO" "ONE" turns the instance + * into a master. Otherwise the new master address is set. */ if (listLength(g_pserver->masters)) { while (listLength(g_pserver->masters)) { replicationUnsetMaster((redisMaster*)listNodeValue(listFirst(g_pserver->masters))); } + LLogNoMaster: sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); @@ -4270,7 +4298,7 @@ void replicaReplayCommand(client *c) serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here - if (!s_pstate->FCancelled() && s_pstate->FFirst()) + if (!s_pstate->FCancelled() && s_pstate->FFirst() && !cserver.multimaster_no_forward) alsoPropagate(cserver.rreplayCommand,c->db->id,c->argv,c->argc,PROPAGATE_AOF|PROPAGATE_REPL); s_pstate->Pop(); diff --git a/src/server.cpp b/src/server.cpp index 4fae6e2af..e6e7df3c2 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -248,6 +248,10 @@ struct redisCommand redisCommandTable[] = { "read-only fast @keyspace", 0,NULL,1,-1,1,0,0,0}, + {"keydb.mexists",mexistsCommand,-2, + "read-only fast @keyspace", + 0,NULL,1,-1,1,0,0,0}, + {"setbit",setbitCommand,4, "write use-memory @bitmap", 0,NULL,1,1,1,0,0,0}, @@ -769,7 +773,7 @@ struct redisCommand redisCommandTable[] = { "admin no-script ok-stale", 0,NULL,0,0,0,0,0,0}, - {"replicaof",replicaofCommand,3, + {"replicaof",replicaofCommand,-3, "admin no-script ok-stale", 0,NULL,0,0,0,0,0,0}, @@ -810,11 +814,11 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"watch",watchCommand,-2, - "no-script fast @transaction", + "no-script fast ok-loading ok-stale @transaction", 0,NULL,1,-1,1,0,0,0}, {"unwatch",unwatchCommand,1, - "no-script fast @transaction", + "no-script fast ok-loading ok-stale @transaction", 0,NULL,0,0,0,0,0,0}, {"cluster",clusterCommand,-2, @@ -4078,6 +4082,8 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { c->cmd->proc != multiCommand && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && + c->cmd->proc != watchCommand && + c->cmd->proc != unwatchCommand && !(c->cmd->proc == shutdownCommand && c->argc == 2 && tolower(((char*)ptrFromObj(c->argv[1]))[0]) == 'n') && diff --git a/src/server.h b/src/server.h index 81f47304c..cbff46156 100644 --- a/src/server.h +++ b/src/server.h @@ -228,22 +228,24 @@ class robj_sharedptr public: robj_sharedptr() - : m_ptr(nullptr) - {} - robj_sharedptr(redisObject *ptr) - : m_ptr(ptr) - { + : m_ptr(nullptr) + {} + explicit robj_sharedptr(redisObject *ptr) + : m_ptr(ptr) + { + if(m_ptr) incrRefCount(ptr); - } + } ~robj_sharedptr() { if (m_ptr) decrRefCount(m_ptr); } robj_sharedptr(const robj_sharedptr& other) - { - m_ptr = other.m_ptr; - incrRefCount(m_ptr); + : m_ptr(other.m_ptr) + { + if(m_ptr) + incrRefCount(m_ptr); } robj_sharedptr(robj_sharedptr&& other) @@ -254,41 +256,19 @@ public: robj_sharedptr &operator=(const robj_sharedptr& other) { - if (m_ptr) - decrRefCount(m_ptr); - m_ptr = other.m_ptr; - incrRefCount(m_ptr); + robj_sharedptr tmp(other); + using std::swap; + swap(m_ptr, tmp.m_ptr); return *this; } robj_sharedptr &operator=(redisObject *ptr) { - if (m_ptr) - decrRefCount(m_ptr); - m_ptr = ptr; - incrRefCount(m_ptr); + robj_sharedptr tmp(ptr); + using std::swap; + swap(m_ptr, tmp.m_ptr); return *this; } - - bool operator==(const robj_sharedptr &other) const - { - return m_ptr == other.m_ptr; - } - - bool operator==(const void *p) const - { - return m_ptr == p; - } - - bool operator!=(const robj_sharedptr &other) const - { - return m_ptr != other.m_ptr; - } - - bool operator!=(const void *p) const - { - return m_ptr != p; - } - + redisObject* operator->() const { return m_ptr; @@ -299,7 +279,7 @@ public: return !m_ptr; } - operator bool() const{ + explicit operator bool() const{ return !!m_ptr; } @@ -309,8 +289,39 @@ public: } redisObject *get() { return m_ptr; } + const redisObject *get() const { return m_ptr; } }; +inline bool operator==(const robj_sharedptr &lhs, const robj_sharedptr &rhs) +{ + return lhs.get() == rhs.get(); +} + +inline bool operator!=(const robj_sharedptr &lhs, const robj_sharedptr &rhs) +{ + return !(lhs == rhs); +} + +inline bool operator==(const robj_sharedptr &lhs, const void *p) +{ + return lhs.get() == p; +} + +inline bool operator==(const void *p, const robj_sharedptr &rhs) +{ + return rhs == p; +} + +inline bool operator!=(const robj_sharedptr &lhs, const void *p) +{ + return !(lhs == p); +} + +inline bool operator!=(const void *p, const robj_sharedptr &rhs) +{ + return !(rhs == p); +} + /* Error codes */ #define C_OK 0 #define C_ERR -1 @@ -2033,6 +2044,7 @@ struct redisServerConst { int trial_timeout = 120; int delete_on_evict = false; // Only valid when a storage provider is set int thread_min_client_threshold = 50; + int multimaster_no_forward; }; struct redisServer { @@ -3236,6 +3248,7 @@ void getCommand(client *c); void delCommand(client *c); void unlinkCommand(client *c); void existsCommand(client *c); +void mexistsCommand(client *c); void setbitCommand(client *c); void getbitCommand(client *c); void bitfieldCommand(client *c); diff --git a/src/tls.cpp b/src/tls.cpp index 4284a27bf..25ca0bd31 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -226,7 +226,7 @@ int tlsConfigure(redisTLSContextConfig *ctx_config) { SSL_CTX_set_ecdh_auto(ctx, 1); #endif - if (SSL_CTX_use_certificate_file(ctx, ctx_config->cert_file, SSL_FILETYPE_PEM) <= 0) { + if (SSL_CTX_use_certificate_chain_file(ctx, ctx_config->cert_file) <= 0) { ERR_error_string_n(ERR_get_error(), errbuf, sizeof(errbuf)); serverLog(LL_WARNING, "Failed to load certificate: %s: %s", ctx_config->cert_file, errbuf); goto error; diff --git a/src/tracking.cpp b/src/tracking.cpp index 2bb53e615..2a5afcd15 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -101,8 +101,8 @@ void disableTracking(client *c) { /* Set the client 'c' to track the prefix 'prefix'. If the client 'c' is * already registered for the specified prefix, no operation is performed. */ -void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) { - bcastState *bs = (bcastState*)raxFind(PrefixTable,(unsigned char*)prefix,sdslen(prefix)); +static void enableBcastTrackingForPrefix(client *c, const char *prefix, size_t plen) { + bcastState *bs = (bcastState*)raxFind(PrefixTable,(unsigned char*)prefix,plen); /* If this is the first client subscribing to such prefix, create * the prefix in the table. */ if (bs == raxNotFound) { diff --git a/tests/integration/psync2-pingoff.tcl b/tests/integration/psync2-pingoff.tcl index 5a9a46d16..cdecfc5c6 100644 --- a/tests/integration/psync2-pingoff.tcl +++ b/tests/integration/psync2-pingoff.tcl @@ -64,6 +64,7 @@ start_server {} { # make sure replication is still alive and kicking $R(1) incr x wait_for_condition 50 1000 { + [status $R(0) loading] == 0 && [$R(0) get x] == 1 } else { fail "replica didn't get incr" diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index fb49e5dc4..c8e7ef367 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -265,3 +265,18 @@ foreach mdl {no yes} { } } } + +start_server {tags {"active-repl"} overrides {active-replica yes}} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + start_server {tags {"active-repl"} overrides { active-replica yes}} { + r set testkeyB bar + test {Active Replica Merges Database On Sync} { + $slave set testkeyA foo + r replicaof $slave_host $slave_port + after 1000 + assert_equal 2 [r dbsize] + } + } +} diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl index 858cff26a..0e753c147 100644 --- a/tests/integration/replication-multimaster.tcl +++ b/tests/integration/replication-multimaster.tcl @@ -1,4 +1,6 @@ foreach topology {mesh ring} { + +foreach noforward [expr {[string equal $topology "mesh"] ? {no yes} : {no}}] { start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} { start_server {overrides {hz 500 active-replica yes multi-master yes}} { start_server {overrides {hz 500 active-replica yes multi-master yes}} { @@ -8,8 +10,12 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { set R($j) [srv [expr 0-$j] client] set R_host($j) [srv [expr 0-$j] host] set R_port($j) [srv [expr 0-$j] port] + + $R($j) config set multi-master-no-forward $noforward } + set topology_name "$topology[expr {[string equal $noforward "yes"] ? " no-forward" : ""}]" + # Initialize as mesh if [string equal $topology "mesh"] { for {set j 0} {$j < 4} {incr j} { @@ -31,7 +37,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { $R(3) replicaof $R_host(2) $R_port(2) } - test "$topology all nodes up" { + test "$topology_name all nodes up" { for {set j 0} {$j < 4} {incr j} { wait_for_condition 50 100 { [string match {*master_global_link_status:up*} [$R($j) info replication]] @@ -41,7 +47,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { } } - test "$topology replicates to all nodes" { + test "$topology_name replicates to all nodes" { $R(0) set testkey foo after 500 for {set n 0} {$n < 4} {incr n} { @@ -53,7 +59,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { } } - test "$topology replicates only once" { + test "$topology_name replicates only once" { $R(0) set testkey 1 after 500 #wait_for_condition 50 100 { @@ -74,7 +80,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { } } - test "$topology transaction replicates only once" { + test "$topology_name transaction replicates only once" { for {set j 0} {$j < 1000} {incr j} { $R(0) set testkey 1 $R(0) multi @@ -95,3 +101,4 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { } } } +} diff --git a/tests/modules/hooks.c b/tests/modules/hooks.c index 665a20481..ff08c3d36 100644 --- a/tests/modules/hooks.c +++ b/tests/modules/hooks.c @@ -30,6 +30,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#define REDISMODULE_EXPERIMENTAL_API 1 #include "redismodule.h" #include #include @@ -143,10 +144,12 @@ void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void { REDISMODULE_NOT_USED(e); + RedisModule_ThreadSafeContextLock(ctx); RedisModuleFlushInfo *fi = data; char *keyname = (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) ? "flush-start" : "flush-end"; LogNumericEvent(ctx, keyname, fi->dbnum); + RedisModule_ThreadSafeContextUnlock(ctx); } void roleChangeCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index 16006d7da..362c7d8d4 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -313,4 +313,14 @@ start_server {tags {"expire"}} { after 3000 assert_equal [r dbsize] 0 } + + test {SET - use KEEPTTL option, TTL should not be removed after loadaof} { + r config set appendonly yes + r set foo bar EX 100 + r set foo bar2 KEEPTTL + after 2000 + r debug loadaof + set ttl [r ttl foo] + assert {$ttl <= 98 && $ttl > 90} + } } diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index 55f18bec8..0c70fbde7 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -363,6 +363,9 @@ start_server {tags {"multi"}} { set xx [r get xx] # make sure that either the whole transcation passed or none of it (we actually expect none) assert { $xx == 1 || $xx == 3} + # Discard the transaction since EXEC likely got -BUSY error + # so the client is still in MULTI state. + catch { $rd2 discard ;$rd2 read } e # check that the connection is no longer in multi state $rd2 ping asdf set pong [$rd2 read]