removed synchronous calls to aePostFunction and changed scope of g_fModuleThread in order to prevent module related deadlocks, issue #214

Former-commit-id: 3b8d1f7076e4ab2082cd0768abc7b0b6ed4f951a
This commit is contained in:
VivekSainiEQ 2021-01-08 17:33:14 +00:00 committed by John Sully
parent 84576e9b39
commit dd0b8af2c5
5 changed files with 13 additions and 31 deletions

View File

@ -286,7 +286,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
return AE_OK; return AE_OK;
} }
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock, bool fForceQueue) int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock, bool fForceQueue)
{ {
if (eventLoop == g_eventLoopThisThread && !fForceQueue) if (eventLoop == g_eventLoopThisThread && !fForceQueue)
{ {
@ -299,11 +299,6 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn); cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn);
cmd.pctl = nullptr; cmd.pctl = nullptr;
cmd.fLock = fLock; cmd.fLock = fLock;
if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl;
cmd.pctl->mutexcv.lock();
}
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (!(!size || size == sizeof(cmd))) { if (!(!size || size == sizeof(cmd))) {
@ -314,17 +309,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
if (size == 0) if (size == 0)
return AE_ERR; return AE_ERR;
int ret = AE_OK; return AE_OK;
if (fSynchronous)
{
{
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval;
}
delete cmd.pctl;
}
return ret;
} }
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *aeCreateEventLoop(int setsize) {

View File

@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
#ifdef __cplusplus #ifdef __cplusplus
} // EXTERN C } // EXTERN C
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false); int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock = true, bool fForceQueue = false);
extern "C" { extern "C" {
#endif #endif
void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeDeleteEventLoop(aeEventLoop *eventLoop);

View File

@ -136,7 +136,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
executeCronJobExpireHook(keyCopy, val); executeCronJobExpireHook(keyCopy, val);
sdsfree(keyCopy); sdsfree(keyCopy);
decrRefCount(val); decrRefCount(val);
}, false, true /*fLock*/, true /*fForceQueue*/); }, true /*fLock*/, true /*fForceQueue*/);
} }
return; return;

View File

@ -5045,7 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
zfree(ctx); zfree(ctx);
} }
static bool g_fModuleThread = false; __thread bool g_fModuleThread = false;
/* Acquire the server lock before executing a thread safe API call. /* Acquire the server lock before executing a thread safe API call.
* This is not needed for `RedisModule_Reply*` calls when there is * This is not needed for `RedisModule_Reply*` calls when there is
* a blocked client connected to the thread safe context. */ * a blocked client connected to the thread safe context. */
@ -5673,20 +5673,17 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
if (memcmp(ri.key,&key,sizeof(key)) == 0) { if (memcmp(ri.key,&key,sizeof(key)) == 0) {
/* This is the first key, we need to re-install the timer according /* This is the first key, we need to re-install the timer according
* to the just added event. */ * to the just added event. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period]{
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
}, true /* synchronous */, false /* fLock */); aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
aeTimer = -1; });
} }
raxStop(&ri); raxStop(&ri);
} } else {
/* If we have no main timer because this is the first module timer we have, install one. */
/* If we have no main timer (the old one was invalidated, or this is the aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [period]{
* first module timer we have), install one. */
if (aeTimer == -1) {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
}, true /* synchronous */, false /* fLock */); });
} }
return key; return key;

View File

@ -4087,7 +4087,7 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
std::lock_guard<decltype(this->lock)> lock(this->lock); std::lock_guard<decltype(this->lock)> lock(this->lock);
fn(this); fn(this);
--casyncOpsPending; --casyncOpsPending;
}, false, fLock) == AE_OK; }, fLock) == AE_OK;
} }
/*================================== Shutdown =============================== */ /*================================== Shutdown =============================== */