Fix module multithreaded test failures

Former-commit-id: 1ef35cf466ea944c56974b3795d7d6b5e89f5a3d
This commit is contained in:
John Sully 2020-05-31 23:46:12 -04:00
parent 08fca5ef31
commit 0a9a32e5d7
4 changed files with 22 additions and 8 deletions

View File

@ -122,6 +122,7 @@ struct aeCommand
AE_ASYNC_OP op; AE_ASYNC_OP op;
int fd; int fd;
int mask; int mask;
bool fLock = true;
union { union {
aePostFunctionProc *proc; aePostFunctionProc *proc;
aeFileProc *fproc; aeFileProc *fproc;
@ -166,7 +167,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
case AE_ASYNC_OP::PostFunction: case AE_ASYNC_OP::PostFunction:
{ {
std::unique_lock<decltype(g_lock)> ulock(g_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (cmd.fLock)
ulock.lock();
((aePostFunctionProc*)cmd.proc)(cmd.clientData); ((aePostFunctionProc*)cmd.proc)(cmd.clientData);
break; break;
} }
@ -176,7 +179,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
if (cmd.pctl != nullptr) if (cmd.pctl != nullptr)
cmd.pctl->mutexcv.lock(); cmd.pctl->mutexcv.lock();
std::unique_lock<decltype(g_lock)> ulock(g_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (cmd.fLock)
ulock.lock();
(*cmd.pfn)(); (*cmd.pfn)();
if (cmd.pctl != nullptr) if (cmd.pctl != nullptr)
@ -236,6 +241,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
cmd.fproc = proc; cmd.fproc = proc;
cmd.clientData = clientData; cmd.clientData = clientData;
cmd.pctl = nullptr; cmd.pctl = nullptr;
cmd.fLock = true;
if (fSynchronous) if (fSynchronous)
{ {
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
@ -272,13 +278,14 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
cmd.op = AE_ASYNC_OP::PostFunction; cmd.op = AE_ASYNC_OP::PostFunction;
cmd.proc = proc; cmd.proc = proc;
cmd.clientData = arg; cmd.clientData = arg;
cmd.fLock = true;
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (size != sizeof(cmd)) if (size != sizeof(cmd))
return AE_ERR; return AE_ERR;
return AE_OK; return AE_OK;
} }
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous) int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock)
{ {
if (eventLoop == g_eventLoopThisThread) if (eventLoop == g_eventLoopThisThread)
{ {
@ -290,6 +297,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.op = AE_ASYNC_OP::PostCppFunction;
cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn); cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn);
cmd.pctl = nullptr; cmd.pctl = nullptr;
cmd.fLock = fLock;
if (fSynchronous) if (fSynchronous)
{ {
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
@ -449,6 +457,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask)
cmd.op = AE_ASYNC_OP::DeleteFileEvent; cmd.op = AE_ASYNC_OP::DeleteFileEvent;
cmd.fd = fd; cmd.fd = fd;
cmd.mask = mask; cmd.mask = mask;
cmd.fLock = true;
auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
AE_ASSERT(cb == sizeof(cmd)); AE_ASSERT(cb == sizeof(cmd));
} }

View File

@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
#ifdef __cplusplus #ifdef __cplusplus
} // EXTERN C } // EXTERN C
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false); int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true);
extern "C" { extern "C" {
#endif #endif
void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeDeleteEventLoop(aeEventLoop *eventLoop);

View File

@ -5439,7 +5439,9 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
if (memcmp(ri.key,&key,sizeof(key)) == 0) { if (memcmp(ri.key,&key,sizeof(key)) == 0) {
/* This is the first key, we need to re-install the timer according /* This is the first key, we need to re-install the timer according
* to the just added event. */ * to the just added event. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
}, true /* synchronous */, false /* fLock */);
aeTimer = -1; aeTimer = -1;
} }
raxStop(&ri); raxStop(&ri);
@ -5447,8 +5449,11 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
/* If we have no main timer (the old one was invalidated, or this is the /* If we have no main timer (the old one was invalidated, or this is the
* first module timer we have), install one. */ * first module timer we have), install one. */
if (aeTimer == -1) if (aeTimer == -1) {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
}, true /* synchronous */, false /* fLock */);
}
return key; return key;
} }

View File

@ -2822,7 +2822,7 @@ NULL
if (target && target->flags & CLIENT_BLOCKED) { if (target && target->flags & CLIENT_BLOCKED) {
std::unique_lock<fastlock> ul(target->lock); std::unique_lock<fastlock> ul(target->lock);
if (unblock_error) if (unblock_error)
addReplyError(target, addReplyErrorAsync(target,
"-UNBLOCKED client unblocked via CLIENT UNBLOCK"); "-UNBLOCKED client unblocked via CLIENT UNBLOCK");
else else
replyToBlockedClientTimedOut(target); replyToBlockedClientTimedOut(target);