From 189967e7af829a551804b792edd755dae5414da8 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Fri, 12 Feb 2021 21:48:08 +0000 Subject: [PATCH] added lock releasing w/ hasModuleGIL, changed module serverTL, and moved module_blocking_pipe to global scope to fix issue #276 Former-commit-id: 7d9a2ce827a2f8d48e4682b3cc95460cc82f9778 --- src/module.cpp | 65 ++++++++++++++++++++++++++++++++++---------------- src/server.cpp | 16 +++---------- src/server.h | 14 ++++++----- 3 files changed, 55 insertions(+), 40 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index ed57e7ef4..d0f719566 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -4731,7 +4731,7 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) { if (!bc->blocked_on_keys) bc->privdata = privdata; bc->unblocked = 1; listAddNodeTail(moduleUnblockedClients,bc); - if (write(serverTL->module_blocked_pipe[1],"A",1) != 1) { + if (write(g_pserver->module_blocked_pipe[1],"A",1) != 1) { /* Ignore the error, this is best-effort. */ } pthread_mutex_unlock(&moduleUnblockedClientsMutex); @@ -4827,7 +4827,7 @@ void moduleHandleBlockedClients(int iel) { /* Here we unblock all the pending clients blocked in modules operations * so we can read every pending "awake byte" in the pipe. */ char buf[1]; - while (read(serverTL->module_blocked_pipe[0],buf,1) == 1); + while (read(g_pserver->module_blocked_pipe[0],buf,1) == 1); listIter li; listNode *ln; listRewind(moduleUnblockedClients, &li); @@ -5045,6 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { zfree(ctx); } +static redisServerThreadVars vars; thread_local bool g_fModuleThread = false; /* Acquire the server lock before executing a thread safe API call. * This is not needed for `RedisModule_Reply*` calls when there is @@ -5052,10 +5053,10 @@ thread_local bool g_fModuleThread = false; void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { UNUSED(ctx); if (serverTL == nullptr) { - serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context + serverTL = &vars; // arbitrary module threads get the main thread context g_fModuleThread = true; } - moduleAcquireGIL(FALSE /*fServerThread*/); + moduleAcquireGIL(FALSE /*fServerThread*/, true /*fExclusive*/); } /* Similar to RM_ThreadSafeContextLock but this function @@ -5067,7 +5068,7 @@ void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) { UNUSED(ctx); - int res = moduleTryAcquireGIL(false /*fServerThread*/); + int res = moduleTryAcquireGIL(false /*fServerThread*/, true /*fExclusive*/); if(res != 0) { errno = res; return REDISMODULE_ERR; @@ -5078,7 +5079,7 @@ int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) { /* Release the server lock after a thread safe API call was executed. */ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { UNUSED(ctx); - moduleReleaseGIL(FALSE /*fServerThread*/); + moduleReleaseGIL(FALSE /*fServerThread*/, true /*fExclusive*/); } // A module may be triggered synchronously in a non-module context. In this scenario we don't lock again @@ -5087,7 +5088,7 @@ static bool FModuleCallBackLock(bool fServerThread) { return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0; } -void moduleAcquireGIL(int fServerThread) { +void moduleAcquireGIL(int fServerThread, int fExclusive) { std::unique_lock lock(s_mutex); int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer; @@ -5101,23 +5102,27 @@ void moduleAcquireGIL(int fServerThread) { if (fServerThread) { ++s_cAcquisitionsServer; + serverTL->hasModuleGIL = true; } else { - // It is possible that another module thread holds the GIL (and s_mutexModule as a result). - // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns. - // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns. - // As a result, a deadlock has occured. - // We release the lock on s_mutex and wait until we are able to safely acquire the GIL - // in order to prevent this deadlock from occuring. - while (!s_mutexModule.try_lock()) - s_cv.wait(lock); + // only try to acquire the mutexModule in exclusive mode + if (fExclusive){ + // It is possible that another module thread holds the GIL (and s_mutexModule as a result). + // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns. + // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns. + // As a result, a deadlock has occured. + // We release the lock on s_mutex and wait until we are able to safely acquire the GIL + // in order to prevent this deadlock from occuring. + while (!s_mutexModule.try_lock()) + s_cv.wait(lock); + } ++s_cAcquisitionsModule; fModuleGILWlocked++; } } -int moduleTryAcquireGIL(bool fServerThread) { +int moduleTryAcquireGIL(bool fServerThread, int fExclusive) { std::unique_lock lock(s_mutex, std::defer_lock); if (!lock.try_lock()) return 1; @@ -5133,18 +5138,22 @@ int moduleTryAcquireGIL(bool fServerThread) { if (fServerThread) { ++s_cAcquisitionsServer; + serverTL->hasModuleGIL = true; } else { - if (!s_mutexModule.try_lock()) - return 1; + // only try to acquire the mutexModule in exclusive mode + if (fExclusive){ + if (!s_mutexModule.try_lock()) + return 1; + } ++s_cAcquisitionsModule; fModuleGILWlocked++; } return 0; } -void moduleReleaseGIL(int fServerThread) { +void moduleReleaseGIL(int fServerThread, int fExclusive) { std::unique_lock lock(s_mutex); if (FModuleCallBackLock(fServerThread)) { @@ -5154,10 +5163,13 @@ void moduleReleaseGIL(int fServerThread) { if (fServerThread) { --s_cAcquisitionsServer; + serverTL->hasModuleGIL = false; } else { - s_mutexModule.unlock(); + // only try to release the mutexModule in exclusive mode + if (fExclusive) + s_mutexModule.unlock(); --s_cAcquisitionsModule; fModuleGILWlocked--; } @@ -5165,7 +5177,7 @@ void moduleReleaseGIL(int fServerThread) { } int moduleGILAcquiredByModule(void) { - return fModuleGILWlocked; + return fModuleGILWlocked > 0; } @@ -7735,7 +7747,18 @@ void moduleInitModulesSystem(void) { moduleCommandFilters = listCreate(); moduleRegisterCoreAPI(); + if (pipe(g_pserver->module_blocked_pipe) == -1) { + serverLog(LL_WARNING, + "Can't create the pipe for module blocking commands: %s", + strerror(errno)); + exit(1); + } + /* Make the pipe non blocking. This is just a best effort aware mechanism + * and we do not want to block not in the read nor in the write half. */ + anetNonBlock(NULL,g_pserver->module_blocked_pipe[0]); + anetNonBlock(NULL,g_pserver->module_blocked_pipe[1]); + /* Create the timers radix tree. */ Timers = raxNew(); diff --git a/src/server.cpp b/src/server.cpp index 848b28bcf..8ee76b4e5 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3104,21 +3104,9 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) } } - if (pipe(pvar->module_blocked_pipe) == -1) { - serverLog(LL_WARNING, - "Can't create the pipe for module blocking commands: %s", - strerror(errno)); - exit(1); - } - - /* Make the pipe non blocking. This is just a best effort aware mechanism - * and we do not want to block not in the read nor in the write half. */ - anetNonBlock(NULL,pvar->module_blocked_pipe[0]); - anetNonBlock(NULL,pvar->module_blocked_pipe[1]); - /* Register a readable event for the pipe used to awake the event loop * when a blocked client in a module needs attention. */ - if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE, + if (aeCreateFileEvent(pvar->el, g_pserver->module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR) { serverPanic( "Error registering the readable event for the module " @@ -5685,6 +5673,8 @@ void *workerThreadMain(void *parg) { } serverAssert(!GlobalLocksAcquired()); + if (serverTL->hasModuleGIL) + moduleReleaseGIL(true); aeDeleteEventLoop(el); return NULL; diff --git a/src/server.h b/src/server.h index 0da8b84f3..fa43ae160 100644 --- a/src/server.h +++ b/src/server.h @@ -1384,15 +1384,13 @@ struct redisServerThreadVars { int cclients; client *current_client; /* Current client */ long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */ - int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a - client blocked on a module command needs - to be processed. */ client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; bool fRetrySetAofEvent = false; std::vector vecclientsProcess; + bool hasModuleGIL = false; /* Does this thread own the moduleGIL lock? */ }; struct redisMaster { @@ -1857,6 +1855,10 @@ struct redisServer { long long repl_batch_offStart = -1; long long repl_batch_idxStart = -1; + + int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a + client blocked on a module command needs + to be processed. */ }; typedef struct pubsubPattern { @@ -1999,9 +2001,9 @@ void moduleHandleBlockedClients(int iel); void moduleBlockedClientTimedOut(client *c); void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); size_t moduleCount(void); -void moduleAcquireGIL(int fServerThread); -int moduleTryAcquireGIL(bool fServerThread); -void moduleReleaseGIL(int fServerThread); +void moduleAcquireGIL(int fServerThread, int fExclusive = FALSE); +int moduleTryAcquireGIL(bool fServerThread, int fExclusive = FALSE); +void moduleReleaseGIL(int fServerThread, int fExclusive = FALSE); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); int moduleHasCommandFilters();