From dd0b8af2c569d76fbf03cb1b5b4266a82a66598e Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Fri, 8 Jan 2021 17:33:14 +0000 Subject: [PATCH 01/10] removed synchronous calls to aePostFunction and changed scope of g_fModuleThread in order to prevent module related deadlocks, issue #214 Former-commit-id: 3b8d1f7076e4ab2082cd0768abc7b0b6ed4f951a --- src/ae.cpp | 19 ++----------------- src/ae.h | 2 +- src/expire.cpp | 2 +- src/module.cpp | 19 ++++++++----------- src/server.cpp | 2 +- 5 files changed, 13 insertions(+), 31 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 125179c89..4f26df7b3 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -286,7 +286,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) return AE_OK; } -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock, bool fForceQueue) +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock, bool fForceQueue) { if (eventLoop == g_eventLoopThisThread && !fForceQueue) { @@ -299,11 +299,6 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch cmd.pfn = new (MALLOC_LOCAL) std::function(fn); cmd.pctl = nullptr; cmd.fLock = fLock; - if (fSynchronous) - { - cmd.pctl = new (MALLOC_LOCAL) aeCommandControl; - cmd.pctl->mutexcv.lock(); - } auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (!(!size || size == sizeof(cmd))) { @@ -314,17 +309,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch if (size == 0) return AE_ERR; - int ret = AE_OK; - if (fSynchronous) - { - { - std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); - cmd.pctl->cv.wait(ulock); - ret = cmd.pctl->rval; - } - delete cmd.pctl; - } - return ret; + return AE_OK; } aeEventLoop *aeCreateEventLoop(int setsize) { diff --git a/src/ae.h b/src/ae.h index e77abb01f..20ef22a5e 100644 --- a/src/ae.h +++ b/src/ae.h @@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize); int aePostFunction(aeEventLoop 
*eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false); +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock = true, bool fForceQueue = false); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); diff --git a/src/expire.cpp b/src/expire.cpp index 46f52c19c..1d14f7152 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -136,7 +136,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { executeCronJobExpireHook(keyCopy, val); sdsfree(keyCopy); decrRefCount(val); - }, false, true /*fLock*/, true /*fForceQueue*/); + }, true /*fLock*/, true /*fForceQueue*/); } return; diff --git a/src/module.cpp b/src/module.cpp index c99ab4685..594a172e4 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5045,7 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { zfree(ctx); } -static bool g_fModuleThread = false; +__thread bool g_fModuleThread = false; /* Acquire the server lock before executing a thread safe API call. * This is not needed for `RedisModule_Reply*` calls when there is * a blocked client connected to the thread safe context. */ @@ -5673,20 +5673,17 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod if (memcmp(ri.key,&key,sizeof(key)) == 0) { /* This is the first key, we need to re-install the timer according * to the just added event. 
*/ - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period]{ aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); - }, true /* synchronous */, false /* fLock */); - aeTimer = -1; + aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); + }); } raxStop(&ri); - } - - /* If we have no main timer (the old one was invalidated, or this is the - * first module timer we have), install one. */ - if (aeTimer == -1) { - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ + } else { + /* If we have no main timer because this is the first module timer we have, install one. */ + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period]{ aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); - }, true /* synchronous */, false /* fLock */); + }); } return key; diff --git a/src/server.cpp b/src/server.cpp index aa142b527..848b28bcf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4087,7 +4087,7 @@ bool client::postFunction(std::function fn, bool fLock) { std::lock_guardlock)> lock(this->lock); fn(this); --casyncOpsPending; - }, false, fLock) == AE_OK; + }, fLock) == AE_OK; } /*================================== Shutdown =============================== */ From 2addbe7e4a32cb3756f783693190edbdac16e127 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Fri, 8 Jan 2021 17:45:45 +0000 Subject: [PATCH 02/10] Added fix for scenario where module thread waiting for s_mutexModule in acquireGIL can deadlock with module thread waiting for s_mutex in releaseGIL Former-commit-id: 3205373bb378f895824cc1936a6bae663b1abdcc --- src/module.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/module.cpp b/src/module.cpp index 594a172e4..f128d10e3 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5104,7 +5104,14 @@ 
void moduleAcquireGIL(int fServerThread) { } else { - s_mutexModule.lock(); + // It is possible that another module thread holds the GIL (and s_mutexModule as a result). + // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns. + // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns. + // As a result, a deadlock has occurred. + // We release the lock on s_mutex and wait until we are able to safely acquire the GIL + // in order to prevent this deadlock from occurring. + while (!s_mutexModule.try_lock()) + s_cv.wait(lock); ++s_cAcquisitionsModule; fModuleGILWlocked++; } From 662037e3a33ff282b22cf398a73dd4f5f3f7d192 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Mon, 11 Jan 2021 19:09:51 +0000 Subject: [PATCH 03/10] Removed more uses of fSynchronous and the use of condition variable and mutex on the control struct. Former-commit-id: 6ab08cc3e1429178b26b55ed7aa8ba85240eb766 --- src/ae.cpp | 51 +++----------------------------------------------- src/module.cpp | 2 +- 2 files changed, 4 insertions(+), 49 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 4f26df7b3..89b36beaa 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -109,13 +109,6 @@ enum class AE_ASYNC_OP CreateFileEvent, }; -struct aeCommandControl -{ - std::condition_variable cv; - std::atomic rval; - std::mutex mutexcv; -}; - struct aeCommand { AE_ASYNC_OP op; @@ -128,7 +121,6 @@ struct aeCommand std::function *pfn; }; void *clientData; - aeCommandControl *pctl; }; void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) @@ -149,19 +141,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) break; case AE_ASYNC_OP::CreateFileEvent: - { - if (cmd.pctl != nullptr) - { - cmd.pctl->mutexcv.lock(); - std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData)); - cmd.pctl->cv.notify_all(); - cmd.pctl->mutexcv.unlock(); - } - else - { - aeCreateFileEvent(eventLoop, 
cmd.fd, cmd.mask, cmd.fproc, cmd.clientData); - } - } + aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData); break; case AE_ASYNC_OP::PostFunction: @@ -175,19 +155,11 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostCppFunction: { - if (cmd.pctl != nullptr) - cmd.pctl->mutexcv.lock(); - std::unique_lock ulock(g_lock, std::defer_lock); if (cmd.fLock) ulock.lock(); (*cmd.pfn)(); - - if (cmd.pctl != nullptr) - { - cmd.pctl->cv.notify_all(); - cmd.pctl->mutexcv.unlock(); - } + delete cmd.pfn; } break; @@ -226,7 +198,7 @@ ssize_t safe_write(int fd, const void *pv, size_t cb) } int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData, int fSynchronous) + aeFileProc *proc, void *clientData) { if (eventLoop == g_eventLoopThisThread) return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData); @@ -239,13 +211,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, cmd.mask = mask; cmd.fproc = proc; cmd.clientData = clientData; - cmd.pctl = nullptr; cmd.fLock = true; - if (fSynchronous) - { - cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); - cmd.pctl->mutexcv.lock(); - } auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) @@ -254,16 +220,6 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, serverAssert(errno == EAGAIN); ret = AE_ERR; } - - if (fSynchronous) - { - { - std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); - cmd.pctl->cv.wait(ulock); - ret = cmd.pctl->rval; - } - delete cmd.pctl; - } return ret; } @@ -297,7 +253,6 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock, aeCommand cmd = {}; cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.pfn = new (MALLOC_LOCAL) std::function(fn); - cmd.pctl = nullptr; cmd.fLock = fLock; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); diff --git a/src/module.cpp b/src/module.cpp index 
f128d10e3..0430124de 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5045,7 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { zfree(ctx); } -__thread bool g_fModuleThread = false; +thread_local bool g_fModuleThread = false; /* Acquire the server lock before executing a thread safe API call. * This is not needed for `RedisModule_Reply*` calls when there is * a blocked client connected to the thread safe context. */ From 49132539dd837e7292c3514bd8c1bcdf00312578 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Mon, 11 Jan 2021 19:17:28 +0000 Subject: [PATCH 04/10] Updated header file to remove fSynchronous flag Former-commit-id: e2552ff8a92ea5adf7cea070b48afc573003254d --- src/ae.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ae.h b/src/ae.h index 20ef22a5e..ab7127f8b 100644 --- a/src/ae.h +++ b/src/ae.h @@ -144,7 +144,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData, int fSynchronous); + aeFileProc *proc, void *clientData); void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask); From 84dc6401c741eb25e549b4988971de7dff240c0e Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Tue, 12 Jan 2021 23:22:36 +0000 Subject: [PATCH 05/10] Now post entire timer installation process as one function to make atomic with respect to global locks Former-commit-id: 53936661c88bd7eac88308afc75c510134a8e044 --- src/module.cpp | 44 +++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index 0430124de..a86241c43 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5668,30 +5668,32 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod } } - /* We need to install the main event loop timer if 
it's not already - * installed, or we may need to refresh its period if we just installed - * a timer that will expire sooner than any other else (i.e. the timer - * we just installed is the first timer in the Timers rax). */ - if (aeTimer != -1) { - raxIterator ri; - raxStart(&ri,Timers); - raxSeek(&ri,"^",NULL,0); - raxNext(&ri); - if (memcmp(ri.key,&key,sizeof(key)) == 0) { - /* This is the first key, we need to re-install the timer according - * to the just added event. */ - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period]{ + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period, key]{ + /* We need to install the main event loop timer if it's not already + * installed, or we may need to refresh its period if we just installed + * a timer that will expire sooner than any other else (i.e. the timer + * we just installed is the first timer in the Timers rax). */ + if (aeTimer != -1) { + raxIterator ri; + raxStart(&ri,Timers); + raxSeek(&ri,"^",NULL,0); + raxNext(&ri); + if (memcmp(ri.key,&key,sizeof(key)) == 0) { + /* This is the first key, we need to re-install the timer according + * to the just added event. */ aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); - aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); - }); + aeTimer = -1; + } + raxStop(&ri); } - raxStop(&ri); - } else { - /* If we have no main timer because this is the first module timer we have, install one. */ - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period]{ + + /* If we have no main timer (the old one was invalidated, or this is the + * first module timer we have), install one. 
*/ + if (aeTimer == -1) { aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); - }); - } + } + }); + return key; } From cf122aa54d67e83436e341e353db854a050cd0c8 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Thu, 14 Jan 2021 01:14:09 +0000 Subject: [PATCH 06/10] Changed RM_CreateTimer to only call aePostFunction if an existing aePostFunction isn't in flight Former-commit-id: 9954f5b4a48286d07fb876fd9579801365b6c237 --- src/module.cpp | 56 ++++++++++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index a86241c43..bc1e9f9e6 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5579,6 +5579,8 @@ void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) { static rax *Timers; /* The radix tree of all the timers sorted by expire. */ long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */ +bool aeTimerSet = false;/* Checks whether the main event loop timer is set + or if an aePostFunction is queued up that will set it */ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); @@ -5668,32 +5670,38 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod } } - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period, key]{ - /* We need to install the main event loop timer if it's not already - * installed, or we may need to refresh its period if we just installed - * a timer that will expire sooner than any other else (i.e. the timer - * we just installed is the first timer in the Timers rax). */ - if (aeTimer != -1) { - raxIterator ri; - raxStart(&ri,Timers); - raxSeek(&ri,"^",NULL,0); - raxNext(&ri); - if (memcmp(ri.key,&key,sizeof(key)) == 0) { - /* This is the first key, we need to re-install the timer according - * to the just added event. 
*/ + /* We need to install the main event loop timer if it's not already + * installed, or we may need to refresh its period if we just installed + * a timer that will expire sooner than any other else (i.e. the timer + * we just installed is the first timer in the Timers rax). */ + bool isFirstExpiry = false; + if (raxSize(Timers) > 0){ + raxIterator ri; + raxStart(&ri, Timers); + raxSeek(&ri,"^",NULL,0); + raxNext(&ri); + if (memcmp(ri.key,&key,sizeof(key)) == 0) + /* This is the first key, we need to re-install the timer according + * to the just added event. */ + isFirstExpiry = true; + raxStop(&ri); + } + + /* Now that either we know that we either need to refresh the period of the + * recently installed timer, or that there is no timer to begin with, we must post + * a function call to install the main event timer */ + if (isFirstExpiry || !aeTimerSet){ + aeTimerSet = true; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period, isFirstExpiry]{ + /* If we deemed that this timer required a reinstall, delete it before proceeding + * to the install */ + if (isFirstExpiry) aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); - aeTimer = -1; - } - raxStop(&ri); - } - - /* If we have no main timer (the old one was invalidated, or this is the - * first module timer we have), install one. */ - if (aeTimer == -1) { + /* If we have no main timer (the old one was invalidated, or this is the + * first module timer we have), install one. 
*/ aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); - } - }); - + }); + } return key; } From b0f94bd4b550b67e594ca8f2ced836f130f5bc05 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Thu, 14 Jan 2021 21:59:56 +0000 Subject: [PATCH 07/10] Modified RM_CreateTimer to prevent more than one function being posted at a time Former-commit-id: f66e5a9c3d6925c3c6d383fbc43fbfbd56721dad --- src/module.cpp | 44 ++++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index bc1e9f9e6..dfabebeae 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5579,8 +5579,10 @@ void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) { static rax *Timers; /* The radix tree of all the timers sorted by expire. */ long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. */ -bool aeTimerSet = false;/* Checks whether the main event loop timer is set - or if an aePostFunction is queued up that will set it */ +static bool aeTimerSet = false; /* Checks whether the main event loop timer is set */ +static mstime_t aeTimerPeriod; /* The period of the aeTimer */ +static bool aeTimerPending = false; /* Keeps track of if there is a aePostFunction in flight + * that would modify the current main event loop timer */ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); @@ -5669,13 +5671,12 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod expiretime++; } } - /* We need to install the main event loop timer if it's not already * installed, or we may need to refresh its period if we just installed * a timer that will expire sooner than any other else (i.e. the timer * we just installed is the first timer in the Timers rax). 
*/ bool isFirstExpiry = false; - if (raxSize(Timers) > 0){ + if (aeTimerSet){ raxIterator ri; raxStart(&ri, Timers); raxSeek(&ri,"^",NULL,0); @@ -5688,20 +5689,31 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod } /* Now that either we know that we either need to refresh the period of the - * recently installed timer, or that there is no timer to begin with, we must post - * a function call to install the main event timer */ + * recently installed timer, or that there is no timer to begin with, we must post + * a function call to install the main event timer. */ if (isFirstExpiry || !aeTimerSet){ + /* We set the period for the posted function in a global variable + * That is so if a function has been posted but not executed, and another + * function with a different period were to be posted, we can just update + * the period instead of posting a new function. */ + aeTimerPeriod = period; aeTimerSet = true; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period, isFirstExpiry]{ - /* If we deemed that this timer required a reinstall, delete it before proceeding - * to the install */ - if (isFirstExpiry) - aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); - /* If we have no main timer (the old one was invalidated, or this is the - * first module timer we have), install one. */ - aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); - }); - } + if (!aeTimerPending) { + /* We should only have one aePostFunction in flight at a time, aeTimerPending + * keeps track of that. 
*/ + aeTimerPending = true; + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [isFirstExpiry]{ + /* If we deemed that this timer required a reinstall, delete it before proceeding + * to the install */ + if (isFirstExpiry) + aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); + /* If we have no main timer (the old one was invalidated, or this is the + * first module timer we have), install one. */ + aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimerPeriod,moduleTimerHandler,NULL,NULL); + aeTimerPending = false; + }); + } + } return key; } From cb5119b756523dbad83ae98e3444e796d124180a Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 31 Jan 2021 06:31:51 +0000 Subject: [PATCH 08/10] Simplify the handling of CreateTimer Former-commit-id: 7b4e25008a352bd425582a3e60b26894826af626 --- src/module.cpp | 76 ++++++++++++++++++++++---------------------------- 1 file changed, 34 insertions(+), 42 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index dfabebeae..ed57e7ef4 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5579,10 +5579,6 @@ void RM_SetClusterFlags(RedisModuleCtx *ctx, uint64_t flags) { static rax *Timers; /* The radix tree of all the timers sorted by expire. */ long long aeTimer = -1; /* Main event loop (ae.c) timer identifier. 
*/ -static bool aeTimerSet = false; /* Checks whether the main event loop timer is set */ -static mstime_t aeTimerPeriod; /* The period of the aeTimer */ -static bool aeTimerPending = false; /* Keeps track of if there is a aePostFunction in flight - * that would modify the current main event loop timer */ typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data); @@ -5654,6 +5650,9 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client * (If the time it takes to execute 'callback' is negligible the two * statements above mean the same) */ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) { + static uint64_t pending_key; + static mstime_t pending_period = -1; + RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL); timer->module = ctx->module; timer->callback = callback; @@ -5671,49 +5670,42 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod expiretime++; } } + + bool fNeedPost = (pending_period < 0); // If pending_period is already set, then a PostFunction is in flight and we don't need to set a new one + if (pending_period < 0 || period < pending_period) { + pending_period = period; + pending_key = key; + } + /* We need to install the main event loop timer if it's not already * installed, or we may need to refresh its period if we just installed * a timer that will expire sooner than any other else (i.e. the timer * we just installed is the first timer in the Timers rax). */ - bool isFirstExpiry = false; - if (aeTimerSet){ - raxIterator ri; - raxStart(&ri, Timers); - raxSeek(&ri,"^",NULL,0); - raxNext(&ri); - if (memcmp(ri.key,&key,sizeof(key)) == 0) - /* This is the first key, we need to re-install the timer according - * to the just added event. 
*/ - isFirstExpiry = true; - raxStop(&ri); - } - - /* Now that either we know that we either need to refresh the period of the - * recently installed timer, or that there is no timer to begin with, we must post - * a function call to install the main event timer. */ - if (isFirstExpiry || !aeTimerSet){ - /* We set the period for the posted function in a global variable - * That is so if a function has been posted but not executed, and another - * function with a different period were to be posted, we can just update - * the period instead of posting a new function. */ - aeTimerPeriod = period; - aeTimerSet = true; - if (!aeTimerPending) { - /* We should only have one aePostFunction in flight at a time, aeTimerPending - * keeps track of that. */ - aeTimerPending = true; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [isFirstExpiry]{ - /* If we deemed that this timer required a reinstall, delete it before proceeding - * to the install */ - if (isFirstExpiry) + if (fNeedPost) { + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{ + if (aeTimer != -1) { + raxIterator ri; + raxStart(&ri,Timers); + raxSeek(&ri,"^",NULL,0); + raxNext(&ri); + if (memcmp(ri.key,&pending_key,sizeof(key)) == 0) { + /* This is the first key, we need to re-install the timer according + * to the just added event. */ aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); - /* If we have no main timer (the old one was invalidated, or this is the - * first module timer we have), install one. */ - aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimerPeriod,moduleTimerHandler,NULL,NULL); - aeTimerPending = false; - }); - } - } + aeTimer = -1; + } + raxStop(&ri); + } + + /* If we have no main timer (the old one was invalidated, or this is the + * first module timer we have), install one. 
*/ + if (aeTimer == -1) { + aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,pending_period,moduleTimerHandler,NULL,NULL); + } + + pending_period = -1; + }); + } return key; } From 651abfdca0e3d80965eb5578a7cd23d61d79e95e Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 1 Feb 2021 22:21:49 +0000 Subject: [PATCH 09/10] Module test reliability Former-commit-id: cc827e157a8cdd0224eaa76d6a70e0cccd95f544 --- tests/unit/moduleapi/hooks.tcl | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/moduleapi/hooks.tcl b/tests/unit/moduleapi/hooks.tcl index c4af59bd2..a9387e757 100644 --- a/tests/unit/moduleapi/hooks.tcl +++ b/tests/unit/moduleapi/hooks.tcl @@ -132,6 +132,7 @@ tags "modules" { } $replica replicaof no one + after 300 test {Test role-master hook} { assert_equal [r hooks.event_count role-replica] 1 From aa47e643b0df8db58e809388d9de26f6e79aacb6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 7 Feb 2021 23:38:09 +0000 Subject: [PATCH 10/10] Fix memory leak in mvccRestore Former-commit-id: 165333b0fc648c79e66f04d9c8c4a1d0059fe57a --- src/Makefile | 4 ++-- src/cluster.cpp | 13 ++++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/Makefile b/src/Makefile index 3af99b3b4..9e4070d4a 100644 --- a/src/Makefile +++ b/src/Makefile @@ -48,8 +48,8 @@ endif USEASM?=true ifneq ($(strip $(SANITIZE)),) - CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE - CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE + CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE -fno-omit-frame-pointer + CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE -fno-omit-frame-pointer LDFLAGS+= -fsanitize=$(SANITIZE) MALLOC=libc USEASM=false diff --git a/src/cluster.cpp b/src/cluster.cpp index e60807180..8556f1d28 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -5066,12 +5066,15 @@ void mvccrestoreCommand(client *c) { setMvccTstamp(obj, mvcc); /* Create the key and set the TTL if any */ - dbMerge(c->db,key,obj,true); - if (expire >= 0) { - 
setExpire(c,c->db,key,nullptr,expire); + if (dbMerge(c->db,key,obj,true)) { + if (expire >= 0) { + setExpire(c,c->db,key,nullptr,expire); + } + signalModifiedKey(c,c->db,key); + notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); + } else { + decrRefCount(obj); } - signalModifiedKey(c,c->db,key); - notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); addReply(c,shared.ok); g_pserver->dirty++; }