diff --git a/src/ae.cpp b/src/ae.cpp index 5eba26b8c..ab5eeb163 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -122,6 +122,7 @@ struct aeCommand AE_ASYNC_OP op; int fd; int mask; + bool fLock = true; union { aePostFunctionProc *proc; aeFileProc *fproc; @@ -166,7 +167,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostFunction: { - std::unique_lock ulock(g_lock); + std::unique_lock ulock(g_lock, std::defer_lock); + if (cmd.fLock) + ulock.lock(); ((aePostFunctionProc*)cmd.proc)(cmd.clientData); break; } @@ -176,7 +179,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) if (cmd.pctl != nullptr) cmd.pctl->mutexcv.lock(); - std::unique_lock ulock(g_lock); + std::unique_lock ulock(g_lock, std::defer_lock); + if (cmd.fLock) + ulock.lock(); (*cmd.pfn)(); if (cmd.pctl != nullptr) @@ -236,6 +241,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, cmd.fproc = proc; cmd.clientData = clientData; cmd.pctl = nullptr; + cmd.fLock = true; if (fSynchronous) { cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); @@ -272,13 +278,14 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) cmd.op = AE_ASYNC_OP::PostFunction; cmd.proc = proc; cmd.clientData = arg; + cmd.fLock = true; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) return AE_ERR; return AE_OK; } -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous) +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock) { if (eventLoop == g_eventLoopThisThread) { @@ -290,6 +297,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.pfn = new (MALLOC_LOCAL) std::function(fn); cmd.pctl = nullptr; + cmd.fLock = fLock; if (fSynchronous) { cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); @@ -449,6 +457,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask) cmd.op = AE_ASYNC_OP::DeleteFileEvent; cmd.fd = fd; cmd.mask = mask; + cmd.fLock = true; auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); AE_ASSERT(cb == sizeof(cmd)); } diff --git a/src/ae.h b/src/ae.h index 156c219ef..3f1ddbf06 100644 --- a/src/ae.h +++ b/src/ae.h @@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false); +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); diff --git a/src/module.cpp b/src/module.cpp index b4427630c..859236e72 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5439,7 +5439,9 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod if (memcmp(ri.key,&key,sizeof(key)) == 0) { /* This is the first key, we need to re-install the timer according * to the just added event. */ - aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ + aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); + }, true /* synchronous */, false /* fLock */); aeTimer = -1; } raxStop(&ri); @@ -5447,8 +5449,11 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod /* If we have no main timer (the old one was invalidated, or this is the * first module timer we have), install one. */ - if (aeTimer == -1) - aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); + if (aeTimer == -1) { + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ + aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); + }, true /* synchronous */, false /* fLock */); + } return key; } diff --git a/src/networking.cpp b/src/networking.cpp index ac248d41f..619db0786 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2822,7 +2822,7 @@ NULL if (target && target->flags & CLIENT_BLOCKED) { std::unique_lock ul(target->lock); if (unblock_error) - addReplyError(target, + addReplyErrorAsync(target, "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); else replyToBlockedClientTimedOut(target);