From dd0b8af2c569d76fbf03cb1b5b4266a82a66598e Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Fri, 8 Jan 2021 17:33:14 +0000 Subject: [PATCH] removed synchronous calls to aePostFunction and changed scope of g_fModuleThread in order to prevent module related deadlocks, issue #214 Former-commit-id: 3b8d1f7076e4ab2082cd0768abc7b0b6ed4f951a --- src/ae.cpp | 19 ++----------------- src/ae.h | 2 +- src/expire.cpp | 2 +- src/module.cpp | 19 ++++++++----------- src/server.cpp | 2 +- 5 files changed, 13 insertions(+), 31 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 125179c89..4f26df7b3 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -286,7 +286,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) return AE_OK; } -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock, bool fForceQueue) +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock, bool fForceQueue) { if (eventLoop == g_eventLoopThisThread && !fForceQueue) { @@ -299,11 +299,6 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch cmd.pfn = new (MALLOC_LOCAL) std::function(fn); cmd.pctl = nullptr; cmd.fLock = fLock; - if (fSynchronous) - { - cmd.pctl = new (MALLOC_LOCAL) aeCommandControl; - cmd.pctl->mutexcv.lock(); - } auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (!(!size || size == sizeof(cmd))) { @@ -314,17 +309,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch if (size == 0) return AE_ERR; - int ret = AE_OK; - if (fSynchronous) - { - { - std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); - cmd.pctl->cv.wait(ulock); - ret = cmd.pctl->rval; - } - delete cmd.pctl; - } - return ret; + return AE_OK; } aeEventLoop *aeCreateEventLoop(int setsize) { diff --git a/src/ae.h b/src/ae.h index e77abb01f..20ef22a5e 100644 --- a/src/ae.h +++ b/src/ae.h @@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false); +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fLock = true, bool fForceQueue = false); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); diff --git a/src/expire.cpp b/src/expire.cpp index 46f52c19c..1d14f7152 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -136,7 +136,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { executeCronJobExpireHook(keyCopy, val); sdsfree(keyCopy); decrRefCount(val); - }, false, true /*fLock*/, true /*fForceQueue*/); + }, true /*fLock*/, true /*fForceQueue*/); } return; diff --git a/src/module.cpp b/src/module.cpp index c99ab4685..594a172e4 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5045,7 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { zfree(ctx); } -static bool g_fModuleThread = false; +__thread bool g_fModuleThread = false; /* Acquire the server lock before executing a thread safe API call. * This is not needed for `RedisModule_Reply*` calls when there is * a blocked client connected to the thread safe context. */ @@ -5673,20 +5673,17 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod if (memcmp(ri.key,&key,sizeof(key)) == 0) { /* This is the first key, we need to re-install the timer according * to the just added event. */ - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period]{ aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); - }, true /* synchronous */, false /* fLock */); - aeTimer = -1; + aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); + }); } raxStop(&ri); - } - - /* If we have no main timer (the old one was invalidated, or this is the - * first module timer we have), install one. */ - if (aeTimer == -1) { - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ + } else { + /* If we have no main timer because this is the first module timer we have, install one. */ + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period]{ aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); - }, true /* synchronous */, false /* fLock */); + }); } return key; diff --git a/src/server.cpp b/src/server.cpp index aa142b527..848b28bcf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4087,7 +4087,7 @@ bool client::postFunction(std::function fn, bool fLock) { std::lock_guardlock)> lock(this->lock); fn(this); --casyncOpsPending; - }, false, fLock) == AE_OK; + }, fLock) == AE_OK; } /*================================== Shutdown =============================== */