diff --git a/.gitignore b/.gitignore index c54671d59..a2d8033da 100644 --- a/.gitignore +++ b/.gitignore @@ -19,7 +19,7 @@ keydb-cli redis-sentinel keydb-sentinel redis-server -keydb-server +keydb-pro-server doc-tools release misc/* diff --git a/README.md b/README.md index c06a32d6c..e081ed024 100644 --- a/README.md +++ b/README.md @@ -158,19 +158,19 @@ Running KeyDB To run KeyDB with the default configuration just type: % cd src - % ./keydb-server + % ./keydb-pro-server If you want to provide your keydb.conf, you have to run it using an additional parameter (the path of the configuration file): % cd src - % ./keydb-server /path/to/keydb.conf + % ./keydb-pro-server /path/to/keydb.conf It is possible to alter the KeyDB configuration by passing parameters directly as options using the command line. Examples: - % ./keydb-server --port 9999 --replicaof 127.0.0.1 6379 - % ./keydb-server /etc/keydb/6379.conf --loglevel debug + % ./keydb-pro-server --port 9999 --replicaof 127.0.0.1 6379 + % ./keydb-pro-server /etc/keydb/6379.conf --loglevel debug All the options in keydb.conf are also supported as options using the command line, with exactly the same name. @@ -178,7 +178,7 @@ line, with exactly the same name. Playing with KeyDB ------------------ -You can use keydb-cli to play with KeyDB. Start a keydb-server instance, +You can use keydb-cli to play with KeyDB. Start a keydb-pro-server instance, then in another terminal try the following: % cd src @@ -243,7 +243,7 @@ Simply make a directory you would like to have the latest binaries dumped in, th ``` $ docker run -it --rm -v /path-to-dump-binaries:/keydb_bin eqalpha/keydb-build-bin ``` -You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-server +You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-pro-server If you are looking to enable flash support with the build (make MALLOC=memkind) then use the following command: ``` diff --git a/deps/hiredis/Makefile b/deps/hiredis/Makefile index 841990d5c..c231a9b93 100644 --- a/deps/hiredis/Makefile +++ b/deps/hiredis/Makefile @@ -29,9 +29,9 @@ INSTALL_INCLUDE_PATH= $(DESTDIR)$(PREFIX)/$(INCLUDE_PATH) INSTALL_LIBRARY_PATH= $(DESTDIR)$(PREFIX)/$(LIBRARY_PATH) INSTALL_PKGCONF_PATH= $(INSTALL_LIBRARY_PATH)/$(PKGCONF_PATH) -# keydb-server configuration used for testing +# keydb-pro-server configuration used for testing REDIS_PORT=56379 -REDIS_SERVER=keydb-server +REDIS_SERVER=keydb-pro-server define REDIS_TEST_CONFIG daemonize yes pidfile /tmp/hiredis-test-redis.pid diff --git a/keydb.conf b/keydb.conf index 896528241..32442744c 100644 --- a/keydb.conf +++ b/keydb.conf @@ -3,7 +3,7 @@ # Note that in order to read the configuration file, KeyDB must be # started with the file path as first argument: # -# ./keydb-server /path/to/keydb.conf +# ./keydb-pro-server /path/to/keydb.conf # Note on units: when memory size is needed, it is possible to specify # it in the usual form of 1k 5GB 4M and so forth: diff --git a/src/Makefile b/src/Makefile index 7548c6a1d..45c70f6d6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -319,7 +319,7 @@ else endif @touch $@ -# keydb-server +# keydb-pro-server $(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS) diff --git a/src/ae.cpp b/src/ae.cpp index d84a5b0d6..25898007e 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -276,7 +276,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) cmd.proc = proc; cmd.clientData = arg; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); - AE_ASSERT(size == sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; return AE_OK; } @@ -402,10 +403,18 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) { aeApiFree(eventLoop); zfree(eventLoop->events); zfree(eventLoop->fired); - zfree(eventLoop); fastlock_free(&eventLoop->flock); close(eventLoop->fdCmdRead); close(eventLoop->fdCmdWrite); + + auto *te = eventLoop->timeEventHead; + while (te) + { + auto *teNext = te->next; + zfree(te); + te = teNext; + } + zfree(eventLoop); } extern "C" void aeStop(aeEventLoop *eventLoop) { diff --git a/src/aof.cpp b/src/aof.cpp index 7b0b96f2d..b9e70734e 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { } } +void installAofRewriteEvent() +{ + serverTL->fRetrySetAofEvent = false; + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + if (res != AE_OK) + serverTL->fRetrySetAofEvent = true; + } +} + /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks); @@ -165,14 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (!g_pserver->aof_rewrite_pending) { - g_pserver->aof_rewrite_pending = true; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { - g_pserver->aof_rewrite_pending = false; - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); - } + installAofRewriteEvent(); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -346,6 +354,9 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; + if (serverTL->fRetrySetAofEvent) + installAofRewriteEvent(); + if (sdslen(g_pserver->aof_buf) == 0) { /* Check if we need to do fsync even the aof buffer is empty, * because previously in AOF_FSYNC_EVERYSEC mode, fsync is @@ -1595,16 +1606,18 @@ error: void aofClosePipes(void) { int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; - aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); close (fdAofAckPipe); }); + serverAssert(res == AE_OK); int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); close(fdAofWritePipe); }); + serverAssert(res == AE_OK); g_pserver->aof_pipe_write_data_to_child = -1; close(g_pserver->aof_pipe_read_data_from_parent); diff --git a/src/cluster.cpp b/src/cluster.cpp index e8eec9ee9..b663150a0 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -296,6 +296,15 @@ int clusterLoadConfig(char *filename) { if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) { g_pserver->cluster->currentEpoch = clusterGetMaxEpoch(); } + + if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100) + { + // Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server) + // we can increase the grouping of clients on a single thread within reason + cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes); + cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200); + serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold); + } return C_OK; fmterr: @@ -624,9 +633,10 @@ void freeClusterLink(clusterLink *link) { if (link->node) link->node->link = NULL; link->node = nullptr; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ freeClusterLink(link); }); + serverAssert(res == AE_OK); return; } if (link->conn) { diff --git a/src/db.cpp b/src/db.cpp index 29067c975..9adf8cd11 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -189,9 +189,8 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) { * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { + expireIfNeeded(db,key); robj *o = lookupKey(db,key,flags|LOOKUP_UPDATEMVCC); - if (expireIfNeeded(db,key)) - o = nullptr; return o; } @@ -412,7 +411,9 @@ bool redisDbPersistentData::syncDelete(robj *key) auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)); if (itr != nullptr) { - dictAdd(m_pdictTombstone, sdsdup(szFromObj(key)), nullptr); + sds keyTombstone = sdsdup(szFromObj(key)); + if (dictAdd(m_pdictTombstone, keyTombstone, nullptr) != DICT_OK) + sdsfree(keyTombstone); } } if (g_pserver->cluster_enabled) slotToKeyDel(key); @@ -1197,7 +1198,7 @@ void shutdownCommand(client *c) { * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */ if (g_pserver->loading || g_pserver->sentinel_mode) flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE; - if (prepareForShutdown(flags) == C_OK) exit(0); + if (prepareForShutdown(flags) == C_OK) throw ShutdownException(); addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); } @@ -1496,6 +1497,14 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) rememberSlaveKeyWithExpire(db,key); } +redisDb::~redisDb() +{ + dictRelease(watched_keys); + dictRelease(ready_keys); + dictRelease(blocking_keys); + listRelease(defrag_later); +} + void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) { serverAssert(GlobalLocksAcquired()); @@ -2256,7 +2265,6 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() serverAssert(m_fTrackingChanges >= 0); changelist vecRet; - fastlock_lock(&m_lockStorage); if (m_spstorage != nullptr) { m_spstorage->beginWriteBatch(); @@ -2296,7 +2304,6 @@ void redisDbPersistentData::commitChanges(const changelist &vec) } if (m_spstorage != nullptr) m_spstorage->endWriteBatch(); - fastlock_unlock(&m_lockStorage); } redisDbPersistentData::~redisDbPersistentData() @@ -2456,6 +2463,9 @@ std::unique_ptr deserializeExpire(sds key, const char *str, size_t spexpire = std::make_unique(key, subkey, when); else spexpire->update(subkey, when); + + if (subkey) + sdsfree(subkey); } *poffset = offset; diff --git a/src/debug.cpp b/src/debug.cpp index 99e7a0950..4d50f771b 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -736,8 +736,26 @@ NULL } else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) { stringmatchlen_fuzz_test(); addReplyStatus(c,"Apparently Redis did not crash: test passed"); - } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 2) { + } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 3) { c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY; + if (!strcasecmp(szFromObj(c->argv[2]), "yes")) + { + redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL); + mi->master = c; + listAddNodeHead(g_pserver->masters, mi); + } + else if (strcasecmp(szFromObj(c->argv[2]), "flagonly")) // if we didn't set flagonly assume its an unset + { + serverAssert(c->flags & CLIENT_MASTER); + if (listLength(g_pserver->masters)) + { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + serverAssert(mi->master == c); + listDelNode(g_pserver->masters, listFirst(g_pserver->masters)); + zfree(mi); + } + c->flags &= ~(CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY); + } addReply(c, shared.ok); #ifdef USE_JEMALLOC } else if(!strcasecmp(szFromObj(c->argv[1]),"mallctl") && c->argc >= 3) { @@ -1542,7 +1560,7 @@ void sigsegvHandler(int sig, siginfo_t *info, void *secret) { "\n=== KEYDB BUG REPORT END. Make sure to include from START to END. ===\n\n" " Please report the crash by opening an issue on github:\n\n" " https://github.com/JohnSully/KeyDB/issues\n\n" -" Suspect RAM error? Use keydb-server --test-memory to verify it.\n\n" +" Suspect RAM error? Use keydb-pro-server --test-memory to verify it.\n\n" ); /* free(messages); Don't call free() with possibly corrupted memory. */ diff --git a/src/fastlock.cpp b/src/fastlock.cpp index afa4e4b36..7dd6cda4d 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -74,6 +74,10 @@ extern int g_fInCrash; #define __has_feature(x) 0 #endif +#ifdef __linux__ +extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex); +#endif + #if __has_feature(thread_sanitizer) /* Report that a lock has been created at address "lock". */ @@ -206,6 +210,11 @@ DeadlockDetector g_dlock; static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough"); uint64_t g_longwaits = 0; +extern "C" void fastlock_panic(struct fastlock *lock) +{ + _serverPanic(__FILE__, __LINE__, "fastlock lock/unlock mismatch for: %s", lock->szName); +} + uint64_t fastlock_getlongwaitcount() { uint64_t rval; @@ -337,31 +346,6 @@ extern "C" int fastlock_trylock(struct fastlock *lock, int fWeak) return false; } -#ifdef __linux__ -#define ROL32(v, shift) ((v << shift) | (v >> (32-shift))) -void unlock_futex(struct fastlock *lock, uint16_t ifutex) -{ - unsigned mask = (1U << (ifutex % 32)); - unsigned futexT; - __atomic_load(&lock->futex, &futexT, __ATOMIC_RELAXED); - futexT &= mask; - - if (futexT == 0) - return; - - for (;;) - { - __atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE); - futexT &= mask; - if (!futexT) - break; - - if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1) - break; - } -} -#endif - extern "C" void fastlock_unlock(struct fastlock *lock) { --lock->m_depth; @@ -384,6 +368,26 @@ extern "C" void fastlock_unlock(struct fastlock *lock) } #endif +#ifdef __linux__ +#define ROL32(v, shift) ((v << shift) | (v >> (32-shift))) +extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex) +{ + unsigned mask = (1U << (ifutex % 32)); + unsigned futexT; + + for (;;) + { + __atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE); + futexT &= mask; + if (!futexT) + break; + + if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1) + break; + } +} +#endif + extern "C" void fastlock_free(struct fastlock *lock) { // NOP @@ -413,4 +417,4 @@ void fastlock_lock_recursive(struct fastlock *lock, int nesting) { fastlock_lock(lock); lock->m_depth = nesting; -} \ No newline at end of file +} diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index 7c9990a6d..bcea0e095 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -126,6 +126,7 @@ fastlock_trylock: .ALIGN 16 .global fastlock_unlock +.type fastlock_unlock,@function fastlock_unlock: # RDI points to the struct: # int32_t m_pidOwner @@ -133,34 +134,19 @@ fastlock_unlock: # [rdi+64] ... # uint16_t active # uint16_t avail - push r11 sub dword ptr [rdi+4], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it mov dword ptr [rdi], -1 # pidOwner = -1 (we don't own it anymore) - mov ecx, [rdi+64] # get current active (this one) - inc ecx # bump it to the next thread - mov [rdi+64], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + mov esi, [rdi+64] # get current active (this one) + inc esi # bump it to the next thread + mov word ptr [rdi+64], si # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) mfence # sync other threads # At this point the lock is removed, however we must wake up any pending futexs - mov r9d, 1 # eax is the bitmask for 2 threads - rol r9d, cl # place the mask in the right spot for the next 2 threads - add rdi, 64 # rdi now points to the token + mov edx, [rdi+64+4] # load the futex mask + bt edx, esi # is the next thread waiting on a futex? + jc unlock_futex # unlock the futex if necessary + ret # if not we're done. .ALIGN 16 -.LRetryWake: - mov r11d, [rdi+4] # load the futex mask - and r11d, r9d # are any threads waiting on a futex? - jz .LDone # if not we're done. - # we have to wake the futexs - # rdi ARG1 futex (already in rdi) - mov esi, (10 | 128) # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE - mov edx, 0x7fffffff # rdx ARG3 INT_MAX (number of threads to wake) - xor r10d, r10d # r10 ARG4 NULL - mov r8, rdi # r8 ARG5 dup rdi - # r9 ARG6 mask (already set above) - mov eax, 202 # sys_futex - syscall - cmp eax, 1 # did we wake as many as we expected? - jnz .LRetryWake .LDone: - pop r11 + js fastlock_panic # panic if we made m_depth negative ret diff --git a/src/gc.h b/src/gc.h index 6612b0afb..9717dcbee 100644 --- a/src/gc.h +++ b/src/gc.h @@ -30,6 +30,12 @@ public: return m_epochNext; } + void shutdown() + { + std::unique_lock lock(m_lock); + m_vecepochs.clear(); + } + void endEpoch(uint64_t epoch, bool fNoFree = false) { std::unique_lock lock(m_lock); diff --git a/src/module.cpp b/src/module.cpp index 234b7ecc4..0647b4a15 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -30,6 +30,7 @@ #include "server.h" #include "cluster.h" #include "rdb.h" +#include "aelocker.h" #include #include #include @@ -1276,7 +1277,10 @@ client *moduleGetReplyClient(RedisModuleCtx *ctx) { int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyLongLong(c,ll); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyLongLongAsync(c,ll); return REDISMODULE_OK; } @@ -1286,9 +1290,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyProto(c,prefix,strlen(prefix)); - addReplyProto(c,msg,strlen(msg)); - addReplyProto(c,"\r\n",2); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyProtoAsync(c,prefix,strlen(prefix)); + addReplyProtoAsync(c,msg,strlen(msg)); + addReplyProtoAsync(c,"\r\n",2); return REDISMODULE_OK; } @@ -1332,15 +1339,19 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) { * The function always returns REDISMODULE_OK. */ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { client *c = moduleGetReplyClient(ctx); + AeLocker locker; + if (c == NULL) return REDISMODULE_OK; + std::unique_lock lock(c->lock); + locker.arm(c); if (len == REDISMODULE_POSTPONED_ARRAY_LEN) { ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)* (ctx->postponed_arrays_count+1), MALLOC_LOCAL); ctx->postponed_arrays[ctx->postponed_arrays_count] = - addReplyDeferredLen(c); + addReplyDeferredLenAsync(c); ctx->postponed_arrays_count++; } else { - addReplyArrayLen(c,len); + addReplyArrayLenAsync(c,len); } return REDISMODULE_OK; } @@ -1352,7 +1363,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyNullArray(c); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyNullArrayAsync(c); return REDISMODULE_OK; } @@ -1362,7 +1376,10 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReply(c,shared.emptyarray); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyAsync(c,shared.emptyarray); return REDISMODULE_OK; } @@ -1395,6 +1412,9 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return; + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); if (ctx->postponed_arrays_count == 0) { serverLog(LL_WARNING, "API misuse detected in module %s: " @@ -1404,7 +1424,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { return; } ctx->postponed_arrays_count--; - setDeferredArrayLen(c, + setDeferredArrayLenAsync(c, ctx->postponed_arrays[ctx->postponed_arrays_count], len); if (ctx->postponed_arrays_count == 0) { @@ -1419,7 +1439,10 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCBuffer(c,(char*)buf,len); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkCBufferAsync(c,(char*)buf,len); return REDISMODULE_OK; } @@ -1430,7 +1453,10 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCString(c,(char*)buf); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkCStringAsync(c,(char*)buf); return REDISMODULE_OK; } @@ -1440,7 +1466,10 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulk(c,str); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkAsync(c,str); return REDISMODULE_OK; } @@ -1450,7 +1479,10 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReply(c,shared.emptybulk); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyAsync(c,shared.emptybulk); return REDISMODULE_OK; } @@ -1461,7 +1493,10 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyVerbatim(c, buf, len, "txt"); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyVerbatimAsync(c, buf, len, "txt"); return REDISMODULE_OK; } @@ -1471,7 +1506,10 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) int RM_ReplyWithNull(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyNull(c); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyNullAsync(c); return REDISMODULE_OK; } @@ -1484,8 +1522,11 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) { int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); sds proto = sdsnewlen(reply->proto, reply->protolen); - addReplySds(c,proto); + addReplySdsAsync(c,proto); return REDISMODULE_OK; } @@ -1498,7 +1539,10 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyDouble(c,d); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyDoubleAsync(c,d); return REDISMODULE_OK; } @@ -1513,7 +1557,10 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyHumanLongDouble(c, ld); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyHumanLongDoubleAsync(c, ld); return REDISMODULE_OK; } diff --git a/src/multi.cpp b/src/multi.cpp index af048b791..df2c85d19 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -28,6 +28,7 @@ */ #include "server.h" +bool FInReplicaReplay(); /* ================================ MULTI/EXEC ============================== */ @@ -174,7 +175,7 @@ void execCommand(client *c) { * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { + if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) { execCommandPropagateMulti(c); must_propagate = 1; } @@ -190,7 +191,10 @@ void execCommand(client *c) { "no permission to execute the command or subcommand" : "no permission to touch the specified keys"); } else { - call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL); + int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL; + if (FInReplicaReplay()) + flags &= ~CMD_CALL_PROPAGATE; + call(c,flags); } /* Commands may alter argc/argv, restore mstate. */ diff --git a/src/networking.cpp b/src/networking.cpp index 5186a3404..1f1dc3145 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -663,23 +663,33 @@ void addReplyDoubleAsync(client *c, double d) { addReplyDoubleCore(c, d, true); } +void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync); + /* Add a long double as a bulk reply, but uses a human readable formatting * of the double instead of exposing the crude behavior of doubles to the * dear user. */ -void addReplyHumanLongDouble(client *c, long double d) { +void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) { if (c->resp == 2) { robj *o = createStringObjectFromLongDouble(d,1); - addReplyBulk(c,o); + addReplyBulkCore(c,o,fAsync); decrRefCount(o); } else { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); - addReplyProto(c,",",1); - addReplyProto(c,buf,len); - addReplyProto(c,"\r\n",2); + addReplyProtoCore(c,",",1,fAsync); + addReplyProtoCore(c,buf,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); } } +void addReplyHumanLongDouble(client *c, long double d) { + addReplyHumanLongDoubleCore(c, d, false); +} + +void addReplyHumanLongDoubleAsync(client *c, long double d) { + addReplyHumanLongDoubleCore(c, d, true); +} + /* Add a long long as integer reply or bulk len / multi bulk count. * Basically this is used to output . */ void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) { @@ -914,6 +924,10 @@ void addReplyBulkCString(client *c, const char *s) { addReplyBulkCStringCore(c, s, false); } +void addReplyBulkCStringAsync(client *c, const char *s) { + addReplyBulkCStringCore(c, s, true); +} + /* Add a long long as a bulk reply */ void addReplyBulkLongLong(client *c, long long ll) { char buf[64]; @@ -932,9 +946,9 @@ void addReplyBulkLongLong(client *c, long long ll) { * three first characters of the extension are used, and if the * provided one is shorter than that, the remaining is filled with * spaces. */ -void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { +void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) { if (c->resp == 2) { - addReplyBulkCBuffer(c,s,len); + addReplyBulkCBufferCore(c,s,len,fAsync); } else { char buf[32]; size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); @@ -946,12 +960,20 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { p[i] = *ext++; } } - addReplyProto(c,buf,preflen); - addReplyProto(c,s,len); - addReplyProto(c,"\r\n",2); + addReplyProtoCore(c,buf,preflen,fAsync); + addReplyProtoCore(c,s,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); } } +void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { + addReplyVerbatimCore(c, s, len, ext, false); +} + +void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) { + addReplyVerbatimCore(c, s, len, ext, true); +} + /* Add an array of C strings as status replies with a heading. * This function is typically invoked by from commands that support * subcommands in response to the 'help' subcommand. The help array @@ -1015,6 +1037,27 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +static std::atomic rgacceptsInFlight[MAX_EVENT_LOOPS]; +int chooseBestThreadForAccept() +{ + int ielMinLoad = 0; + int cclientsMin = INT_MAX; + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + int cclientsThread; + atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); + cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); + if (cclientsThread < cserver.thread_min_client_threshold) + return iel; + if (cclientsThread < cclientsMin) + { + cclientsMin = cclientsThread; + ielMinLoad = iel; + } + } + return ielMinLoad; +} + void clientAcceptHandler(connection *conn) { client *c = (client*)connGetPrivateData(conn); @@ -1156,7 +1199,25 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { - // We always accept on the same thread + { + int ielTarget = chooseBestThreadForAccept(); + rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); + if (ielTarget != ielCur) + { + char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ + acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget); + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); + zfree(szT); + }); + + if (res == AE_OK) + continue; + } + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); + } + LLocalThread: aeAcquireLock(); acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur); @@ -1173,10 +1234,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { goto LLocalThread; char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); - aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ + int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel); zfree(szT); }); + if (res != AE_OK) + { + zfree(szT); + goto LLocalThread; + } } } } @@ -1220,7 +1286,24 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket); - acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel); + + aeAcquireLock(); + int ielTarget = rand() % cserver.cthreads; + if (ielTarget == iel) + { + LLocalThread: + acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel); + } + else + { + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ + acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget); + }); + if (res != AE_OK) + goto LLocalThread; + } + aeReleaseLock(); + } } @@ -2602,7 +2685,7 @@ NULL { int iel = client->iel; freeClientAsync(client); - aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { + aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK freeClientsInAsyncFreeQueue(iel); }); } @@ -3108,6 +3191,7 @@ void unpauseClientsIfNecessary() * * The function returns the total number of events processed. */ int processEventsWhileBlocked(int iel) { + serverAssert(GlobalLocksAcquired()); int iterations = 4; /* See the function top-comment. */ int count = 0; @@ -3117,15 +3201,30 @@ int processEventsWhileBlocked(int iel) { serverAssert(c->flags & CLIENT_PROTECTED); c->lock.unlock(); } + aeReleaseLock(); serverAssertDebug(!GlobalLocksAcquired()); - while (iterations--) { - int events = 0; - events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(iel); - if (!events) break; - count += events; + try + { + while (iterations--) { + int events = 0; + events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); + events += handleClientsWithPendingWrites(iel); + if (!events) break; + count += events; + } } + catch (...) + { + // Caller expects us to be locked so fix and rethrow + AeLocker locker; + if (c != nullptr) + c->lock.lock(); + locker.arm(c); + locker.release(); + throw; + } + AeLocker locker; if (c != nullptr) c->lock.lock(); diff --git a/src/rdb.cpp b/src/rdb.cpp index 3179e6e2d..e350adf15 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1676,9 +1676,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { == NULL) return NULL; if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } /* Don't care about integer-encoded strings. */ @@ -2487,6 +2493,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(val); val = nullptr; } + decrRefCount(key); + key = nullptr; } if (g_pserver->key_load_delay) usleep(g_pserver->key_load_delay); diff --git a/src/replication.cpp b/src/replication.cpp index b9815a9da..1d52e6d24 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -43,6 +43,8 @@ #include #include #include +#include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); @@ -354,6 +356,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that char szMvcc[128]; + incrementMvccTstamp(); uint64_t mvccTstamp = getMvccTstamp(); int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); @@ -437,6 +440,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); addReplyProtoAsync(replica, reply->buf(), reply->used); } + if (!fSendRaw) { addReplyAsync(replica,shared.crlf); @@ -1485,7 +1489,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) while ((ln = listNext(&li))) { if (listNodeValue(ln) == replica) { fFound = true; - break; + break; } } if (!fFound) @@ -2871,6 +2875,8 @@ void freeMasterInfo(redisMaster *mi) zfree(mi->masteruser); if (mi->repl_transfer_tmpfile) zfree(mi->repl_transfer_tmpfile); + if (mi->clientFake) + freeClient(mi->clientFake); delete mi->staleKeyMap; if (mi->cached_master != nullptr) freeClientAsync(mi->cached_master); @@ -2949,6 +2955,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; + if (mi->clientFake) { + freeClient(mi->clientFake); + mi->clientFake = nullptr; + + } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -3823,16 +3834,35 @@ public: return m_cnesting == 1; } + redisMaster *getMi(client *c) + { + if (m_mi == nullptr) + m_mi = MasterInfoFromClient(c); + return m_mi; + } + + int nesting() const { return m_cnesting; } + private: int m_cnesting = 0; bool m_fCancelled = false; + redisMaster *m_mi = nullptr; }; +static thread_local std::unique_ptr s_pstate; + +bool FInReplicaReplay() +{ + return s_pstate != nullptr && s_pstate->nesting() > 0; +} + + +static std::unordered_map g_mapmvcc; + void replicaReplayCommand(client *c) { - static thread_local ReplicaNestState *s_pstate = nullptr; if (s_pstate == nullptr) - s_pstate = new (MALLOC_LOCAL) ReplicaNestState; + s_pstate = std::make_unique(); // the replay command contains two arguments: // 1: The UUID of the source @@ -3854,9 +3884,10 @@ void replicaReplayCommand(client *c) return; } - unsigned char uuid[UUID_BINARY_LEN]; + std::string uuid; + uuid.resize(UUID_BINARY_LEN); if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36 - || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) + || uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0) { addReplyError(c, "Expected UUID arg1"); s_pstate->Cancel(); @@ -3892,7 +3923,7 @@ void replicaReplayCommand(client *c) } } - if (FSameUuidNoNil(uuid, cserver.uuid)) + if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid)) { addReply(c, shared.ok); s_pstate->Cancel(); @@ -3902,33 +3933,57 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; + redisMaster *mi = s_pstate->getMi(c); + client *cFake = mi->clientFake; + if (mi->clientFakeNesting != s_pstate->nesting()) + cFake = nullptr; + serverAssert(mi != nullptr); + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc) + { + s_pstate->Cancel(); + s_pstate->Pop(); + return; + } + // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - client *cFake = createClient(nullptr, c->iel); + if (cFake == nullptr) + cFake = createClient(nullptr, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); auto ccmdPrev = serverTL->commandsExecuted; + cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP; processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP); bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); - if (fExec) + if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - redisMaster *mi = MasterInfoFromClient(c); - if (mi != nullptr) // this should never be null but I'd prefer not to crash - { - mi->mvccLastSync = mvcc; - } + if (mvcc > g_mapmvcc[uuid]) + g_mapmvcc[uuid] = mvcc; } else { + serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); addReplyError(c, "command did not execute"); } - freeClient(cFake); + serverAssert(sdslen(cFake->querybuf) == 0); + if (cFake->flags & CLIENT_MULTI) + { + mi->clientFake = cFake; + mi->clientFakeNesting = s_pstate->nesting(); + } + else + { + if (mi->clientFake == cFake) + mi->clientFake = nullptr; + freeClient(cFake); + } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.cpp b/src/server.cpp index 5c6a2873f..d35656290 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1744,7 +1744,7 @@ void clientsCron(int iel) { /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was * terminated. */ - if (clientsCronHandleTimeout(c,now)) goto LContinue; + if (clientsCronHandleTimeout(c,now)) continue; // Client free'd so don't release the lock if (clientsCronResizeQueryBuffer(c)) goto LContinue; if (clientsCronTrackExpansiveClients(c)) goto LContinue; LContinue: @@ -1826,7 +1826,7 @@ void updateCachedTime(int update_daylight_info) { t /= 1000; __atomic_store(&g_pserver->mstime, &t, __ATOMIC_RELAXED); t /= 1000; - __atomic_store(&g_pserver->unixtime, &t, __ATOMIC_RELAXED); + g_pserver->unixtime = t; /* To get information about daylight saving time, we need to call * localtime_r and cache the result. However calling localtime_r in this @@ -2027,7 +2027,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ if (g_pserver->shutdown_asap) { - if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); + if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) throw ShutdownException(); serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); g_pserver->shutdown_asap = 0; } @@ -3018,6 +3018,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->current_client = nullptr; pvar->clients_paused = 0; + pvar->fRetrySetAofEvent = false; if (pvar->el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", @@ -4026,23 +4027,16 @@ int prepareForShutdown(int flags) { /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); - for (int ithread = 0; ithread < MAX_EVENT_LOOPS; ++ithread) { - for (int idb = 0; idb < cserver.dbnum; ++idb) { - if (g_pserver->rgthreadvar[ithread].rgdbSnapshot[idb] != nullptr) - g_pserver->db[idb]->endSnapshot(g_pserver->rgthreadvar[ithread].rgdbSnapshot[idb]); - } + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + aePostFunction(g_pserver->rgthreadvar[iel].el, [iel]{ + g_pserver->rgthreadvar[iel].el->stop = 1; + }); } - /* free our databases */ - for (int idb = 0; idb < cserver.dbnum; ++idb) { - delete g_pserver->db[idb]; - g_pserver->db[idb] = nullptr; - } - - delete g_pserver->m_pstorageFactory; - serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", g_pserver->sentinel_mode ? "Sentinel" : "KeyDB"); + return C_OK; } @@ -4969,19 +4963,19 @@ void version(void) { } void usage(void) { - fprintf(stderr,"Usage: ./keydb-server [/path/to/keydb.conf] [options]\n"); - fprintf(stderr," ./keydb-server - (read config from stdin)\n"); - fprintf(stderr," ./keydb-server -v or --version\n"); - fprintf(stderr," ./keydb-server -h or --help\n"); - fprintf(stderr," ./keydb-server --test-memory \n\n"); + fprintf(stderr,"Usage: ./keydb-pro-server [/path/to/keydb.conf] [options]\n"); + fprintf(stderr," ./keydb-pro-server - (read config from stdin)\n"); + fprintf(stderr," ./keydb-pro-server -v or --version\n"); + fprintf(stderr," ./keydb-pro-server -h or --help\n"); + fprintf(stderr," ./keydb-pro-server --test-memory \n\n"); fprintf(stderr,"Examples:\n"); - fprintf(stderr," ./keydb-server (run the server with default conf)\n"); - fprintf(stderr," ./keydb-server /etc/redis/6379.conf\n"); - fprintf(stderr," ./keydb-server --port 7777\n"); - fprintf(stderr," ./keydb-server --port 7777 --replicaof 127.0.0.1 8888\n"); - fprintf(stderr," ./keydb-server /etc/mykeydb.conf --loglevel verbose\n\n"); + fprintf(stderr," ./keydb-pro-server (run the server with default conf)\n"); + fprintf(stderr," ./keydb-pro-server /etc/redis/6379.conf\n"); + fprintf(stderr," ./keydb-pro-server --port 7777\n"); + fprintf(stderr," ./keydb-pro-server --port 7777 --replicaof 127.0.0.1 8888\n"); + fprintf(stderr," ./keydb-pro-server /etc/mykeydb.conf --loglevel verbose\n\n"); fprintf(stderr,"Sentinel mode:\n"); - fprintf(stderr," ./keydb-server /etc/sentinel.conf --sentinel\n"); + fprintf(stderr," ./keydb-pro-server /etc/sentinel.conf --sentinel\n"); exit(1); } @@ -5351,8 +5345,23 @@ void *workerThreadMain(void *parg) aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeSetBeforeSleepProc(el, beforeSleep, AE_SLEEP_THREADSAFE); aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE); - aeMain(el); + try + { + aeMain(el); + } + catch (ShutdownException) + { + } + serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); + + aeAcquireLock(); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + if (g_pserver->rgthreadvar[iel].rgdbSnapshot[idb] != nullptr) + g_pserver->db[idb]->endSnapshot(g_pserver->rgthreadvar[iel].rgdbSnapshot[idb]); + } + aeReleaseLock(); + return NULL; } @@ -5475,7 +5484,7 @@ int main(int argc, char **argv) { exit(0); } else { fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); - fprintf(stderr,"Example: ./keydb-server --test-memory 4096\n\n"); + fprintf(stderr,"Example: ./keydb-pro-server --test-memory 4096\n\n"); exit(1); } } @@ -5664,7 +5673,18 @@ int main(int argc, char **argv) { /* The main thread sleeps until all the workers are done. this is so that all worker threads are orthogonal in their startup/shutdown */ void *pvRet; - pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pvRet); + for (int iel = 0; iel < cserver.cthreads; ++iel) + pthread_join(rgthread[iel], &pvRet); + + /* free our databases */ + for (int idb = 0; idb < cserver.dbnum; ++idb) { + delete g_pserver->db[idb]; + g_pserver->db[idb] = nullptr; + } + + g_pserver->garbageCollector.shutdown(); + delete g_pserver->m_pstorageFactory; + return 0; } diff --git a/src/server.h b/src/server.h index d85a84235..8a4ef3d22 100644 --- a/src/server.h +++ b/src/server.h @@ -1226,7 +1226,7 @@ class redisDbPersistentData friend class redisDbPersistentDataSnapshot; public: - ~redisDbPersistentData(); + virtual ~redisDbPersistentData(); redisDbPersistentData() = default; redisDbPersistentData(redisDbPersistentData &&) = default; @@ -1358,7 +1358,6 @@ private: std::unique_ptr m_spdbSnapshotHOLDER; const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr; int m_refCount = 0; - fastlock m_lockStorage { "storage" }; }; class redisDbPersistentDataSnapshot : protected redisDbPersistentData @@ -1400,7 +1399,7 @@ public: /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ -typedef struct redisDb : public redisDbPersistentDataSnapshot +struct redisDb : public redisDbPersistentDataSnapshot { // Legacy C API, Do not add more friend void tryResizeHashTables(int); @@ -1424,7 +1423,9 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot redisDb() : expireitr(nullptr) {} + void initialize(int id); + virtual ~redisDb(); void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); @@ -1477,7 +1478,7 @@ public: long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ -} redisDb; +}; /* Client MULTI/EXEC state */ typedef struct multiCmd { @@ -1904,6 +1905,7 @@ struct redisServerThreadVars { long unsigned commandsExecuted = 0; uint64_t gcEpoch = 0; const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; + bool fRetrySetAofEvent = false; }; struct redisMaster { @@ -1913,6 +1915,8 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; + client *clientFake; + int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */ @@ -1986,6 +1990,7 @@ struct redisServerConst { sds license_key = nullptr; int trial_timeout = 120; int delete_on_evict = false; // Only valid when a storage provider is set + int thread_min_client_threshold = 50; }; struct redisServer { @@ -2528,10 +2533,12 @@ void addReplyNullArray(client *c); void addReplyNullArrayAsync(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); +void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj_roptr obj); void AddReplyFromClient(client *c, client *src); void addReplyBulkCString(client *c, const char *s); +void addReplyBulkCStringAsync(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkLongLong(client *c, long long ll); void addReply(client *c, robj_roptr obj); @@ -2541,6 +2548,7 @@ void addReplyError(client *c, const char *err); void addReplyStatus(client *c, const char *status); void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); +void addReplyHumanLongDoubleAsync(client *c, long double d); void addReplyLongLong(client *c, long long ll); #ifdef __cplusplus void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); @@ -3396,6 +3404,10 @@ void tlsInit(void); void tlsInitThread(); int tlsConfigure(redisTLSContextConfig *ctx_config); + +class ShutdownException +{}; + #define redisDebug(fmt, ...) \ printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) #define redisDebugMark() \ diff --git a/tests/cluster/tests/06-slave-stop-cond.tcl b/tests/cluster/tests/06-slave-stop-cond.tcl index f2e67050b..52110856d 100644 --- a/tests/cluster/tests/06-slave-stop-cond.tcl +++ b/tests/cluster/tests/06-slave-stop-cond.tcl @@ -65,7 +65,7 @@ test "Slave #5 is reachable and alive" { test "Slave #5 should not be able to failover" { after 10000 - assert {[RI 5 role] eq {slave}} + assert_equal {slave} [RI 5 role] } test "Cluster should be down" { diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index fee3a724c..847129e2c 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { } } + test {Active replicas propogate transaction} { + $master set testkey 0 + $master multi + $master incr testkey + $master incr testkey + after 5000 + $master get testkey + $master exec + assert_equal 2 [$master get testkey] + after 500 + wait_for_condition 50 500 { + [string match "2" [$slave get testkey]] + } else { + fail "Transaction failed to replicate" + } + $master flushall + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl new file mode 100644 index 000000000..e5e77fdad --- /dev/null +++ b/tests/integration/replication-multimaster.tcl @@ -0,0 +1,74 @@ +foreach topology {mesh ring} { +start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { + + for {set j 0} {$j < 4} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + } + + # Initialize as mesh + if [string equal $topology "mesh"] { + for {set j 0} {$j < 4} {incr j} { + for {set k 0} {$k < 4} {incr k} { + if $j!=$k { + $R($j) replicaof $R_host($k) $R_port($k) + after 100 + } + } + }} + #Else Ring + if [string equal $topology "ring"] { + $R(0) replicaof $R_host(3) $R_port(3) + after 100 + $R(1) replicaof $R_host(0) $R_port(0) + after 100 + $R(2) replicaof $R_host(1) $R_port(1) + after 100 + $R(3) replicaof $R_host(2) $R_port(2) + } + + after 2000 + + test "$topology replicates to all nodes" { + $R(0) set testkey foo + after 500 + assert_equal foo [$R(1) get testkey] "replicates to 1" + assert_equal foo [$R(2) get testkey] "replicates to 2" + } + + test "$topology replicates only once" { + $R(0) set testkey 1 + after 500 + $R(1) incr testkey + after 500 + $R(2) incr testkey + after 500 + assert_equal 3 [$R(0) get testkey] + assert_equal 3 [$R(1) get testkey] + assert_equal 3 [$R(2) get testkey] + assert_equal 3 [$R(3) get testkey] + } + + test "$topology transaction replicates only once" { + for {set j 0} {$j < 1000} {incr j} { + $R(0) set testkey 1 + $R(0) multi + $R(0) incr testkey + $R(0) incr testkey + $R(0) exec + after 1 + assert_equal 3 [$R(0) get testkey] "node 0" + assert_equal 3 [$R(1) get testkey] "node 1" + assert_equal 3 [$R(2) get testkey] "node 2" + assert_equal 3 [$R(3) get testkey] "node 3" + } + } +} +} +} +} +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index c6de0feed..1efc0cecf 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -37,6 +37,7 @@ set ::all_tests { unit/acl unit/rreplay unit/cron + unit/replication integration/block-repl integration/replication integration/replication-2 @@ -44,6 +45,7 @@ set ::all_tests { integration/replication-4 integration/replication-psync integration/replication-active + integration/replication-multimaster integration/aof integration/rdb integration/convert-zipmap-hash-on-load diff --git a/tests/unit/replication.tcl b/tests/unit/replication.tcl new file mode 100644 index 000000000..ada6af1e5 --- /dev/null +++ b/tests/unit/replication.tcl @@ -0,0 +1,12 @@ + +start_server {tags {"repl"}} { + test "incr of expired key on replica doesn't cause a crash" { + r debug force-master yes + r set testkey 1 + r pexpire testkey 1 + after 500 + r incr testkey + r incr testkey + r debug force-master no + } +} diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2029f521d..e11030f95 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ -start_server {tags {"rreplay"}} { +start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master + r debug force-master yes r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"}} { reconnect test {RREPLAY db different} { - r debug force-master + r debug force-master yes r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2 diff --git a/utils/install_server.sh b/utils/install_server.sh index a1574a2fa..2e3ee9d4a 100755 --- a/utils/install_server.sh +++ b/utils/install_server.sh @@ -37,7 +37,7 @@ # REDIS_CONFIG_FILE=/etc/redis/1234.conf \ # REDIS_LOG_FILE=/var/log/redis_1234.log \ # REDIS_DATA_DIR=/var/lib/redis/1234 \ -# REDIS_EXECUTABLE=`command -v keydb-server` ./utils/install_server.sh +# REDIS_EXECUTABLE=`command -v keydb-pro-server` ./utils/install_server.sh # # This generates a redis config file and an /etc/init.d script, and installs them. # @@ -129,7 +129,7 @@ fi if [ ! -x "$REDIS_EXECUTABLE" ] ; then _MANUAL_EXECUTION=true #get the redis executable path - _REDIS_EXECUTABLE=`command -v keydb-server` + _REDIS_EXECUTABLE=`command -v keydb-pro-server` read -p "Please select the redis executable path [$_REDIS_EXECUTABLE] " REDIS_EXECUTABLE if [ ! -x "$REDIS_EXECUTABLE" ] ; then REDIS_EXECUTABLE=$_REDIS_EXECUTABLE diff --git a/utils/redis_init_script b/utils/redis_init_script index bee5545ef..589792ee3 100755 --- a/utils/redis_init_script +++ b/utils/redis_init_script @@ -12,7 +12,7 @@ ### END INIT INFO REDISPORT=6379 -EXEC=/usr/local/bin/keydb-server +EXEC=/usr/local/bin/keydb-pro-server CLIEXEC=/usr/local/bin/keydb-cli PIDFILE=/var/run/redis_${REDISPORT}.pid diff --git a/utils/speed-regression.tcl b/utils/speed-regression.tcl index 8d5220c75..073f01a19 100755 --- a/utils/speed-regression.tcl +++ b/utils/speed-regression.tcl @@ -27,8 +27,8 @@ proc run-tests branches { } # Start the Redis server - puts " starting the server... [exec ./keydb-server -v]" - set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-server - > /dev/null 2> /dev/null &] + puts " starting the server... [exec ./keydb-pro-server -v]" + set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-pro-server - > /dev/null 2> /dev/null &] puts " pids: $pids" after 1000 puts " running the benchmark"