added lock releasing w/ hasModuleGIL, changed module serverTL, and moved module_blocking_pipe to global scope to fix issue #276
Former-commit-id: 7d9a2ce827a2f8d48e4682b3cc95460cc82f9778
This commit is contained in:
parent
2fd3097269
commit
932fd2e79a
@ -4731,7 +4731,7 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
|
||||
if (!bc->blocked_on_keys) bc->privdata = privdata;
|
||||
bc->unblocked = 1;
|
||||
listAddNodeTail(moduleUnblockedClients,bc);
|
||||
if (write(serverTL->module_blocked_pipe[1],"A",1) != 1) {
|
||||
if (write(g_pserver->module_blocked_pipe[1],"A",1) != 1) {
|
||||
/* Ignore the error, this is best-effort. */
|
||||
}
|
||||
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
||||
@ -4827,7 +4827,7 @@ void moduleHandleBlockedClients(int iel) {
|
||||
/* Here we unblock all the pending clients blocked in modules operations
|
||||
* so we can read every pending "awake byte" in the pipe. */
|
||||
char buf[1];
|
||||
while (read(serverTL->module_blocked_pipe[0],buf,1) == 1);
|
||||
while (read(g_pserver->module_blocked_pipe[0],buf,1) == 1);
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(moduleUnblockedClients, &li);
|
||||
@ -5045,6 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
|
||||
zfree(ctx);
|
||||
}
|
||||
|
||||
static redisServerThreadVars vars;
|
||||
thread_local bool g_fModuleThread = false;
|
||||
/* Acquire the server lock before executing a thread safe API call.
|
||||
* This is not needed for `RedisModule_Reply*` calls when there is
|
||||
@ -5052,10 +5053,10 @@ thread_local bool g_fModuleThread = false;
|
||||
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
||||
UNUSED(ctx);
|
||||
if (serverTL == nullptr) {
|
||||
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context
|
||||
serverTL = &vars; // arbitrary module threads get the main thread context
|
||||
g_fModuleThread = true;
|
||||
}
|
||||
moduleAcquireGIL(FALSE /*fServerThread*/);
|
||||
moduleAcquireGIL(FALSE /*fServerThread*/, true /*fExclusive*/);
|
||||
}
|
||||
|
||||
/* Similar to RM_ThreadSafeContextLock but this function
|
||||
@ -5067,7 +5068,7 @@ void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
||||
int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
|
||||
UNUSED(ctx);
|
||||
|
||||
int res = moduleTryAcquireGIL(false /*fServerThread*/);
|
||||
int res = moduleTryAcquireGIL(false /*fServerThread*/, true /*fExclusive*/);
|
||||
if(res != 0) {
|
||||
errno = res;
|
||||
return REDISMODULE_ERR;
|
||||
@ -5078,7 +5079,7 @@ int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
|
||||
/* Release the server lock after a thread safe API call was executed. */
|
||||
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
||||
UNUSED(ctx);
|
||||
moduleReleaseGIL(FALSE /*fServerThread*/);
|
||||
moduleReleaseGIL(FALSE /*fServerThread*/, true /*fExclusive*/);
|
||||
}
|
||||
|
||||
// A module may be triggered synchronously in a non-module context. In this scenario we don't lock again
|
||||
@ -5087,7 +5088,7 @@ static bool FModuleCallBackLock(bool fServerThread)
|
||||
{
|
||||
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
|
||||
}
|
||||
void moduleAcquireGIL(int fServerThread) {
|
||||
void moduleAcquireGIL(int fServerThread, int fExclusive) {
|
||||
std::unique_lock<std::mutex> lock(s_mutex);
|
||||
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
|
||||
|
||||
@ -5101,23 +5102,27 @@ void moduleAcquireGIL(int fServerThread) {
|
||||
if (fServerThread)
|
||||
{
|
||||
++s_cAcquisitionsServer;
|
||||
serverTL->hasModuleGIL = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// It is possible that another module thread holds the GIL (and s_mutexModule as a result).
|
||||
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
|
||||
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
|
||||
// As a result, a deadlock has occured.
|
||||
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
|
||||
// in order to prevent this deadlock from occuring.
|
||||
while (!s_mutexModule.try_lock())
|
||||
s_cv.wait(lock);
|
||||
// only try to acquire the mutexModule in exclusive mode
|
||||
if (fExclusive){
|
||||
// It is possible that another module thread holds the GIL (and s_mutexModule as a result).
|
||||
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
|
||||
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
|
||||
// As a result, a deadlock has occured.
|
||||
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
|
||||
// in order to prevent this deadlock from occuring.
|
||||
while (!s_mutexModule.try_lock())
|
||||
s_cv.wait(lock);
|
||||
}
|
||||
++s_cAcquisitionsModule;
|
||||
fModuleGILWlocked++;
|
||||
}
|
||||
}
|
||||
|
||||
int moduleTryAcquireGIL(bool fServerThread) {
|
||||
int moduleTryAcquireGIL(bool fServerThread, int fExclusive) {
|
||||
std::unique_lock<std::mutex> lock(s_mutex, std::defer_lock);
|
||||
if (!lock.try_lock())
|
||||
return 1;
|
||||
@ -5133,18 +5138,22 @@ int moduleTryAcquireGIL(bool fServerThread) {
|
||||
if (fServerThread)
|
||||
{
|
||||
++s_cAcquisitionsServer;
|
||||
serverTL->hasModuleGIL = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!s_mutexModule.try_lock())
|
||||
return 1;
|
||||
// only try to acquire the mutexModule in exclusive mode
|
||||
if (fExclusive){
|
||||
if (!s_mutexModule.try_lock())
|
||||
return 1;
|
||||
}
|
||||
++s_cAcquisitionsModule;
|
||||
fModuleGILWlocked++;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void moduleReleaseGIL(int fServerThread) {
|
||||
void moduleReleaseGIL(int fServerThread, int fExclusive) {
|
||||
std::unique_lock<std::mutex> lock(s_mutex);
|
||||
|
||||
if (FModuleCallBackLock(fServerThread)) {
|
||||
@ -5154,10 +5163,13 @@ void moduleReleaseGIL(int fServerThread) {
|
||||
if (fServerThread)
|
||||
{
|
||||
--s_cAcquisitionsServer;
|
||||
serverTL->hasModuleGIL = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
s_mutexModule.unlock();
|
||||
// only try to release the mutexModule in exclusive mode
|
||||
if (fExclusive)
|
||||
s_mutexModule.unlock();
|
||||
--s_cAcquisitionsModule;
|
||||
fModuleGILWlocked--;
|
||||
}
|
||||
@ -5165,7 +5177,7 @@ void moduleReleaseGIL(int fServerThread) {
|
||||
}
|
||||
|
||||
int moduleGILAcquiredByModule(void) {
|
||||
return fModuleGILWlocked;
|
||||
return fModuleGILWlocked > 0;
|
||||
}
|
||||
|
||||
|
||||
@ -7735,7 +7747,18 @@ void moduleInitModulesSystem(void) {
|
||||
moduleCommandFilters = listCreate();
|
||||
|
||||
moduleRegisterCoreAPI();
|
||||
if (pipe(g_pserver->module_blocked_pipe) == -1) {
|
||||
serverLog(LL_WARNING,
|
||||
"Can't create the pipe for module blocking commands: %s",
|
||||
strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Make the pipe non blocking. This is just a best effort aware mechanism
|
||||
* and we do not want to block not in the read nor in the write half. */
|
||||
anetNonBlock(NULL,g_pserver->module_blocked_pipe[0]);
|
||||
anetNonBlock(NULL,g_pserver->module_blocked_pipe[1]);
|
||||
|
||||
/* Create the timers radix tree. */
|
||||
Timers = raxNew();
|
||||
|
||||
|
@ -3104,21 +3104,9 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
||||
}
|
||||
}
|
||||
|
||||
if (pipe(pvar->module_blocked_pipe) == -1) {
|
||||
serverLog(LL_WARNING,
|
||||
"Can't create the pipe for module blocking commands: %s",
|
||||
strerror(errno));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Make the pipe non blocking. This is just a best effort aware mechanism
|
||||
* and we do not want to block not in the read nor in the write half. */
|
||||
anetNonBlock(NULL,pvar->module_blocked_pipe[0]);
|
||||
anetNonBlock(NULL,pvar->module_blocked_pipe[1]);
|
||||
|
||||
/* Register a readable event for the pipe used to awake the event loop
|
||||
* when a blocked client in a module needs attention. */
|
||||
if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE,
|
||||
if (aeCreateFileEvent(pvar->el, g_pserver->module_blocked_pipe[0], AE_READABLE,
|
||||
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
|
||||
serverPanic(
|
||||
"Error registering the readable event for the module "
|
||||
@ -5685,6 +5673,8 @@ void *workerThreadMain(void *parg)
|
||||
{
|
||||
}
|
||||
serverAssert(!GlobalLocksAcquired());
|
||||
if (serverTL->hasModuleGIL)
|
||||
moduleReleaseGIL(true);
|
||||
aeDeleteEventLoop(el);
|
||||
|
||||
return NULL;
|
||||
|
14
src/server.h
14
src/server.h
@ -1384,15 +1384,13 @@ struct redisServerThreadVars {
|
||||
int cclients;
|
||||
client *current_client; /* Current client */
|
||||
long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */
|
||||
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
|
||||
client blocked on a module command needs
|
||||
to be processed. */
|
||||
client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
|
||||
struct fastlock lockPendingWrite { "thread pending write" };
|
||||
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
||||
long unsigned commandsExecuted = 0;
|
||||
bool fRetrySetAofEvent = false;
|
||||
std::vector<client*> vecclientsProcess;
|
||||
bool hasModuleGIL = false; /* Does this thread own the moduleGIL lock? */
|
||||
};
|
||||
|
||||
struct redisMaster {
|
||||
@ -1857,6 +1855,10 @@ struct redisServer {
|
||||
|
||||
long long repl_batch_offStart = -1;
|
||||
long long repl_batch_idxStart = -1;
|
||||
|
||||
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
|
||||
client blocked on a module command needs
|
||||
to be processed. */
|
||||
};
|
||||
|
||||
typedef struct pubsubPattern {
|
||||
@ -1999,9 +2001,9 @@ void moduleHandleBlockedClients(int iel);
|
||||
void moduleBlockedClientTimedOut(client *c);
|
||||
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
size_t moduleCount(void);
|
||||
void moduleAcquireGIL(int fServerThread);
|
||||
int moduleTryAcquireGIL(bool fServerThread);
|
||||
void moduleReleaseGIL(int fServerThread);
|
||||
void moduleAcquireGIL(int fServerThread, int fExclusive = FALSE);
|
||||
int moduleTryAcquireGIL(bool fServerThread, int fExclusive = FALSE);
|
||||
void moduleReleaseGIL(int fServerThread, int fExclusive = FALSE);
|
||||
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
||||
void moduleCallCommandFilters(client *c);
|
||||
int moduleHasCommandFilters();
|
||||
|
Loading…
x
Reference in New Issue
Block a user