diff --git a/src/Makefile b/src/Makefile index 3af99b3b4..9e4070d4a 100644 --- a/src/Makefile +++ b/src/Makefile @@ -48,8 +48,8 @@ endif USEASM?=true ifneq ($(strip $(SANITIZE)),) - CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE - CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE + CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE -fno-omit-frame-pointer + CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE -fno-omit-frame-pointer LDFLAGS+= -fsanitize=$(SANITIZE) MALLOC=libc USEASM=false diff --git a/src/ae.cpp b/src/ae.cpp index 125179c89..89b36beaa 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -109,13 +109,6 @@ enum class AE_ASYNC_OP CreateFileEvent, }; -struct aeCommandControl -{ - std::condition_variable cv; - std::atomic rval; - std::mutex mutexcv; -}; - struct aeCommand { AE_ASYNC_OP op; @@ -128,7 +121,6 @@ struct aeCommand std::function *pfn; }; void *clientData; - aeCommandControl *pctl; }; void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) @@ -149,19 +141,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) break; case AE_ASYNC_OP::CreateFileEvent: - { - if (cmd.pctl != nullptr) - { - cmd.pctl->mutexcv.lock(); - std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData)); - cmd.pctl->cv.notify_all(); - cmd.pctl->mutexcv.unlock(); - } - else - { - aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData); - } - } + aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData); break; case AE_ASYNC_OP::PostFunction: @@ -175,19 +155,11 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostCppFunction: { - if (cmd.pctl != nullptr) - cmd.pctl->mutexcv.lock(); - std::unique_lock ulock(g_lock, std::defer_lock); if (cmd.fLock) ulock.lock(); (*cmd.pfn)(); - - if (cmd.pctl != nullptr) - { - cmd.pctl->cv.notify_all(); - cmd.pctl->mutexcv.unlock(); - } + delete cmd.pfn; } break; @@ -226,7 +198,7 @@ ssize_t safe_write(int fd, const void *pv, size_t cb) } int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData, int fSynchronous) + aeFileProc *proc, void *clientData) { if (eventLoop == g_eventLoopThisThread) return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData); @@ -239,13 +211,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, cmd.mask = mask; cmd.fproc = proc; cmd.clientData = clientData; - cmd.pctl = nullptr; cmd.fLock = true; - if (fSynchronous) - { - cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); - cmd.pctl->mutexcv.lock(); - } auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) @@ -254,16 +220,6 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, serverAssert(errno == EAGAIN); ret = AE_ERR; } - - if (fSynchronous) - { - { - std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); - cmd.pctl->cv.wait(ulock); - ret = cmd.pctl->rval; - } - delete cmd.pctl; - } return ret; } @@ -286,7 +242,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) return AE_OK; } -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock, bool fForceQueue) +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock, bool fForceQueue) { if (eventLoop == g_eventLoopThisThread && !fForceQueue) { @@ -297,13 +253,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch aeCommand cmd = {}; cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.pfn = new (MALLOC_LOCAL) std::function(fn); - cmd.pctl = nullptr; cmd.fLock = fLock; - if (fSynchronous) - { - cmd.pctl = new (MALLOC_LOCAL) aeCommandControl; - cmd.pctl->mutexcv.lock(); - } auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (!(!size || size == sizeof(cmd))) { @@ -314,17 +264,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch if (size == 0) return AE_ERR; - int ret = AE_OK; - if (fSynchronous) - { - { - std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); - cmd.pctl->cv.wait(ulock); - ret = cmd.pctl->rval; - } - delete cmd.pctl; - } - return ret; + return AE_OK; } aeEventLoop *aeCreateEventLoop(int setsize) { diff --git a/src/ae.h b/src/ae.h index e77abb01f..ab7127f8b 100644 --- a/src/ae.h +++ b/src/ae.h @@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false); +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock = true, bool fForceQueue = false); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); @@ -144,7 +144,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData, int fSynchronous); + aeFileProc *proc, void *clientData); void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask); diff --git a/src/cluster.cpp b/src/cluster.cpp index e60807180..8556f1d28 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -5066,12 +5066,15 @@ void mvccrestoreCommand(client *c) { setMvccTstamp(obj, mvcc); /* Create the key and set the TTL if any */ - dbMerge(c->db,key,obj,true); - if (expire >= 0) { - setExpire(c,c->db,key,nullptr,expire); + if (dbMerge(c->db,key,obj,true)) { + if (expire >= 0) { + setExpire(c,c->db,key,nullptr,expire); + } + signalModifiedKey(c,c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); + } else { + decrRefCount(obj); } - signalModifiedKey(c,c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); addReply(c,shared.ok); g_pserver->dirty++; } diff --git a/src/expire.cpp b/src/expire.cpp index 46f52c19c..1d14f7152 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -136,7 +136,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { executeCronJobExpireHook(keyCopy, val); sdsfree(keyCopy); decrRefCount(val); - }, false, true /*fLock*/, true /*fForceQueue*/); + }, true /*fLock*/, true /*fForceQueue*/); } return; diff --git a/src/module.cpp b/src/module.cpp index c99ab4685..ed57e7ef4 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5045,7 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { zfree(ctx); } -static bool g_fModuleThread = false; +thread_local bool g_fModuleThread = false; /* Acquire the server lock before executing a thread safe API call. * This is not needed for `RedisModule_Reply*` calls when there is * a blocked client connected to the thread safe context. */ @@ -5104,7 +5104,14 @@ void moduleAcquireGIL(int fServerThread) { } else { - s_mutexModule.lock(); + // It is possible that another module thread holds the GIL (and s_mutexModule as a result). + // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns. + // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns. + // As a result, a deadlock has occured. + // We release the lock on s_mutex and wait until we are able to safely acquire the GIL + // in order to prevent this deadlock from occuring. + while (!s_mutexModule.try_lock()) + s_cv.wait(lock); ++s_cAcquisitionsModule; fModuleGILWlocked++; } @@ -5643,6 +5650,9 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client * (If the time it takes to execute 'callback' is negligible the two * statements above mean the same) */ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) { + static uint64_t pending_key; + static mstime_t pending_period = -1; + RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL); timer->module = ctx->module; timer->callback = callback; @@ -5661,32 +5671,40 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod } } + bool fNeedPost = (pending_period < 0); // If pending_period is already set, then a PostFunction is in flight and we don't need to set a new one + if (pending_period < 0 || period < pending_period) { + pending_period = period; + pending_key = key; + } + /* We need to install the main event loop timer if it's not already * installed, or we may need to refresh its period if we just installed * a timer that will expire sooner than any other else (i.e. the timer * we just installed is the first timer in the Timers rax). */ - if (aeTimer != -1) { - raxIterator ri; - raxStart(&ri,Timers); - raxSeek(&ri,"^",NULL,0); - raxNext(&ri); - if (memcmp(ri.key,&key,sizeof(key)) == 0) { - /* This is the first key, we need to re-install the timer according - * to the just added event. */ - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ - aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); - }, true /* synchronous */, false /* fLock */); - aeTimer = -1; - } - raxStop(&ri); - } + if (fNeedPost) { + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ + if (aeTimer != -1) { + raxIterator ri; + raxStart(&ri,Timers); + raxSeek(&ri,"^",NULL,0); + raxNext(&ri); + if (memcmp(ri.key,&pending_key,sizeof(key)) == 0) { + /* This is the first key, we need to re-install the timer according + * to the just added event. */ + aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); + aeTimer = -1; + } + raxStop(&ri); + } - /* If we have no main timer (the old one was invalidated, or this is the - * first module timer we have), install one. */ - if (aeTimer == -1) { - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ - aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); - }, true /* synchronous */, false /* fLock */); + /* If we have no main timer (the old one was invalidated, or this is the + * first module timer we have), install one. */ + if (aeTimer == -1) { + aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,pending_period,moduleTimerHandler,NULL,NULL); + } + + pending_period = -1; + }); } return key; diff --git a/src/server.cpp b/src/server.cpp index aa142b527..848b28bcf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4087,7 +4087,7 @@ bool client::postFunction(std::function fn, bool fLock) { std::lock_guardlock)> lock(this->lock); fn(this); --casyncOpsPending; - }, false, fLock) == AE_OK; + }, fLock) == AE_OK; } /*================================== Shutdown =============================== */ diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl index c4af59bd2..a9387e757 100644 --- a/tests/unit/moduleapi/hooks.tcl +++ b/tests/unit/moduleapi/hooks.tcl @@ -132,6 +132,7 @@ tags "modules" { } $replica replicaof no one + after 300 test {Test role-master hook} { assert_equal [r hooks.event_count role-replica] 1