added lock releasing w/ hasModuleGIL, changed module serverTL, and moved module_blocking_pipe to global scope to fix issue #276
Former-commit-id: 7d9a2ce827a2f8d48e4682b3cc95460cc82f9778
This commit is contained in:
parent
bc161f684f
commit
189967e7af
@ -4731,7 +4731,7 @@ int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
|
|||||||
if (!bc->blocked_on_keys) bc->privdata = privdata;
|
if (!bc->blocked_on_keys) bc->privdata = privdata;
|
||||||
bc->unblocked = 1;
|
bc->unblocked = 1;
|
||||||
listAddNodeTail(moduleUnblockedClients,bc);
|
listAddNodeTail(moduleUnblockedClients,bc);
|
||||||
if (write(serverTL->module_blocked_pipe[1],"A",1) != 1) {
|
if (write(g_pserver->module_blocked_pipe[1],"A",1) != 1) {
|
||||||
/* Ignore the error, this is best-effort. */
|
/* Ignore the error, this is best-effort. */
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
||||||
@ -4827,7 +4827,7 @@ void moduleHandleBlockedClients(int iel) {
|
|||||||
/* Here we unblock all the pending clients blocked in modules operations
|
/* Here we unblock all the pending clients blocked in modules operations
|
||||||
* so we can read every pending "awake byte" in the pipe. */
|
* so we can read every pending "awake byte" in the pipe. */
|
||||||
char buf[1];
|
char buf[1];
|
||||||
while (read(serverTL->module_blocked_pipe[0],buf,1) == 1);
|
while (read(g_pserver->module_blocked_pipe[0],buf,1) == 1);
|
||||||
listIter li;
|
listIter li;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listRewind(moduleUnblockedClients, &li);
|
listRewind(moduleUnblockedClients, &li);
|
||||||
@ -5045,6 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
|
|||||||
zfree(ctx);
|
zfree(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static redisServerThreadVars vars;
|
||||||
thread_local bool g_fModuleThread = false;
|
thread_local bool g_fModuleThread = false;
|
||||||
/* Acquire the server lock before executing a thread safe API call.
|
/* Acquire the server lock before executing a thread safe API call.
|
||||||
* This is not needed for `RedisModule_Reply*` calls when there is
|
* This is not needed for `RedisModule_Reply*` calls when there is
|
||||||
@ -5052,10 +5053,10 @@ thread_local bool g_fModuleThread = false;
|
|||||||
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
||||||
UNUSED(ctx);
|
UNUSED(ctx);
|
||||||
if (serverTL == nullptr) {
|
if (serverTL == nullptr) {
|
||||||
serverTL = &g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN]; // arbitrary module threads get the main thread context
|
serverTL = &vars; // arbitrary module threads get the main thread context
|
||||||
g_fModuleThread = true;
|
g_fModuleThread = true;
|
||||||
}
|
}
|
||||||
moduleAcquireGIL(FALSE /*fServerThread*/);
|
moduleAcquireGIL(FALSE /*fServerThread*/, true /*fExclusive*/);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Similar to RM_ThreadSafeContextLock but this function
|
/* Similar to RM_ThreadSafeContextLock but this function
|
||||||
@ -5067,7 +5068,7 @@ void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
|||||||
int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
|
int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
|
||||||
UNUSED(ctx);
|
UNUSED(ctx);
|
||||||
|
|
||||||
int res = moduleTryAcquireGIL(false /*fServerThread*/);
|
int res = moduleTryAcquireGIL(false /*fServerThread*/, true /*fExclusive*/);
|
||||||
if(res != 0) {
|
if(res != 0) {
|
||||||
errno = res;
|
errno = res;
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
@ -5078,7 +5079,7 @@ int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
|
|||||||
/* Release the server lock after a thread safe API call was executed. */
|
/* Release the server lock after a thread safe API call was executed. */
|
||||||
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
||||||
UNUSED(ctx);
|
UNUSED(ctx);
|
||||||
moduleReleaseGIL(FALSE /*fServerThread*/);
|
moduleReleaseGIL(FALSE /*fServerThread*/, true /*fExclusive*/);
|
||||||
}
|
}
|
||||||
|
|
||||||
// A module may be triggered synchronously in a non-module context. In this scenario we don't lock again
|
// A module may be triggered synchronously in a non-module context. In this scenario we don't lock again
|
||||||
@ -5087,7 +5088,7 @@ static bool FModuleCallBackLock(bool fServerThread)
|
|||||||
{
|
{
|
||||||
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
|
return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
|
||||||
}
|
}
|
||||||
void moduleAcquireGIL(int fServerThread) {
|
void moduleAcquireGIL(int fServerThread, int fExclusive) {
|
||||||
std::unique_lock<std::mutex> lock(s_mutex);
|
std::unique_lock<std::mutex> lock(s_mutex);
|
||||||
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
|
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
|
||||||
|
|
||||||
@ -5101,23 +5102,27 @@ void moduleAcquireGIL(int fServerThread) {
|
|||||||
if (fServerThread)
|
if (fServerThread)
|
||||||
{
|
{
|
||||||
++s_cAcquisitionsServer;
|
++s_cAcquisitionsServer;
|
||||||
|
serverTL->hasModuleGIL = true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// It is possible that another module thread holds the GIL (and s_mutexModule as a result).
|
// only try to acquire the mutexModule in exclusive mode
|
||||||
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
|
if (fExclusive){
|
||||||
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
|
// It is possible that another module thread holds the GIL (and s_mutexModule as a result).
|
||||||
// As a result, a deadlock has occured.
|
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
|
||||||
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
|
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
|
||||||
// in order to prevent this deadlock from occuring.
|
// As a result, a deadlock has occured.
|
||||||
while (!s_mutexModule.try_lock())
|
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
|
||||||
s_cv.wait(lock);
|
// in order to prevent this deadlock from occuring.
|
||||||
|
while (!s_mutexModule.try_lock())
|
||||||
|
s_cv.wait(lock);
|
||||||
|
}
|
||||||
++s_cAcquisitionsModule;
|
++s_cAcquisitionsModule;
|
||||||
fModuleGILWlocked++;
|
fModuleGILWlocked++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int moduleTryAcquireGIL(bool fServerThread) {
|
int moduleTryAcquireGIL(bool fServerThread, int fExclusive) {
|
||||||
std::unique_lock<std::mutex> lock(s_mutex, std::defer_lock);
|
std::unique_lock<std::mutex> lock(s_mutex, std::defer_lock);
|
||||||
if (!lock.try_lock())
|
if (!lock.try_lock())
|
||||||
return 1;
|
return 1;
|
||||||
@ -5133,18 +5138,22 @@ int moduleTryAcquireGIL(bool fServerThread) {
|
|||||||
if (fServerThread)
|
if (fServerThread)
|
||||||
{
|
{
|
||||||
++s_cAcquisitionsServer;
|
++s_cAcquisitionsServer;
|
||||||
|
serverTL->hasModuleGIL = true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (!s_mutexModule.try_lock())
|
// only try to acquire the mutexModule in exclusive mode
|
||||||
return 1;
|
if (fExclusive){
|
||||||
|
if (!s_mutexModule.try_lock())
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
++s_cAcquisitionsModule;
|
++s_cAcquisitionsModule;
|
||||||
fModuleGILWlocked++;
|
fModuleGILWlocked++;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void moduleReleaseGIL(int fServerThread) {
|
void moduleReleaseGIL(int fServerThread, int fExclusive) {
|
||||||
std::unique_lock<std::mutex> lock(s_mutex);
|
std::unique_lock<std::mutex> lock(s_mutex);
|
||||||
|
|
||||||
if (FModuleCallBackLock(fServerThread)) {
|
if (FModuleCallBackLock(fServerThread)) {
|
||||||
@ -5154,10 +5163,13 @@ void moduleReleaseGIL(int fServerThread) {
|
|||||||
if (fServerThread)
|
if (fServerThread)
|
||||||
{
|
{
|
||||||
--s_cAcquisitionsServer;
|
--s_cAcquisitionsServer;
|
||||||
|
serverTL->hasModuleGIL = false;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
s_mutexModule.unlock();
|
// only try to release the mutexModule in exclusive mode
|
||||||
|
if (fExclusive)
|
||||||
|
s_mutexModule.unlock();
|
||||||
--s_cAcquisitionsModule;
|
--s_cAcquisitionsModule;
|
||||||
fModuleGILWlocked--;
|
fModuleGILWlocked--;
|
||||||
}
|
}
|
||||||
@ -5165,7 +5177,7 @@ void moduleReleaseGIL(int fServerThread) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int moduleGILAcquiredByModule(void) {
|
int moduleGILAcquiredByModule(void) {
|
||||||
return fModuleGILWlocked;
|
return fModuleGILWlocked > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -7735,7 +7747,18 @@ void moduleInitModulesSystem(void) {
|
|||||||
moduleCommandFilters = listCreate();
|
moduleCommandFilters = listCreate();
|
||||||
|
|
||||||
moduleRegisterCoreAPI();
|
moduleRegisterCoreAPI();
|
||||||
|
if (pipe(g_pserver->module_blocked_pipe) == -1) {
|
||||||
|
serverLog(LL_WARNING,
|
||||||
|
"Can't create the pipe for module blocking commands: %s",
|
||||||
|
strerror(errno));
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Make the pipe non blocking. This is just a best effort aware mechanism
|
||||||
|
* and we do not want to block not in the read nor in the write half. */
|
||||||
|
anetNonBlock(NULL,g_pserver->module_blocked_pipe[0]);
|
||||||
|
anetNonBlock(NULL,g_pserver->module_blocked_pipe[1]);
|
||||||
|
|
||||||
/* Create the timers radix tree. */
|
/* Create the timers radix tree. */
|
||||||
Timers = raxNew();
|
Timers = raxNew();
|
||||||
|
|
||||||
|
@ -3104,21 +3104,9 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pipe(pvar->module_blocked_pipe) == -1) {
|
|
||||||
serverLog(LL_WARNING,
|
|
||||||
"Can't create the pipe for module blocking commands: %s",
|
|
||||||
strerror(errno));
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Make the pipe non blocking. This is just a best effort aware mechanism
|
|
||||||
* and we do not want to block not in the read nor in the write half. */
|
|
||||||
anetNonBlock(NULL,pvar->module_blocked_pipe[0]);
|
|
||||||
anetNonBlock(NULL,pvar->module_blocked_pipe[1]);
|
|
||||||
|
|
||||||
/* Register a readable event for the pipe used to awake the event loop
|
/* Register a readable event for the pipe used to awake the event loop
|
||||||
* when a blocked client in a module needs attention. */
|
* when a blocked client in a module needs attention. */
|
||||||
if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE,
|
if (aeCreateFileEvent(pvar->el, g_pserver->module_blocked_pipe[0], AE_READABLE,
|
||||||
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
|
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
|
||||||
serverPanic(
|
serverPanic(
|
||||||
"Error registering the readable event for the module "
|
"Error registering the readable event for the module "
|
||||||
@ -5685,6 +5673,8 @@ void *workerThreadMain(void *parg)
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
serverAssert(!GlobalLocksAcquired());
|
serverAssert(!GlobalLocksAcquired());
|
||||||
|
if (serverTL->hasModuleGIL)
|
||||||
|
moduleReleaseGIL(true);
|
||||||
aeDeleteEventLoop(el);
|
aeDeleteEventLoop(el);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
14
src/server.h
14
src/server.h
@ -1384,15 +1384,13 @@ struct redisServerThreadVars {
|
|||||||
int cclients;
|
int cclients;
|
||||||
client *current_client; /* Current client */
|
client *current_client; /* Current client */
|
||||||
long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */
|
long fixed_time_expire = 0; /* If > 0, expire keys against server.mstime. */
|
||||||
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
|
|
||||||
client blocked on a module command needs
|
|
||||||
to be processed. */
|
|
||||||
client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
|
client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */
|
||||||
struct fastlock lockPendingWrite { "thread pending write" };
|
struct fastlock lockPendingWrite { "thread pending write" };
|
||||||
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
|
||||||
long unsigned commandsExecuted = 0;
|
long unsigned commandsExecuted = 0;
|
||||||
bool fRetrySetAofEvent = false;
|
bool fRetrySetAofEvent = false;
|
||||||
std::vector<client*> vecclientsProcess;
|
std::vector<client*> vecclientsProcess;
|
||||||
|
bool hasModuleGIL = false; /* Does this thread own the moduleGIL lock? */
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisMaster {
|
struct redisMaster {
|
||||||
@ -1857,6 +1855,10 @@ struct redisServer {
|
|||||||
|
|
||||||
long long repl_batch_offStart = -1;
|
long long repl_batch_offStart = -1;
|
||||||
long long repl_batch_idxStart = -1;
|
long long repl_batch_idxStart = -1;
|
||||||
|
|
||||||
|
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
|
||||||
|
client blocked on a module command needs
|
||||||
|
to be processed. */
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct pubsubPattern {
|
typedef struct pubsubPattern {
|
||||||
@ -1999,9 +2001,9 @@ void moduleHandleBlockedClients(int iel);
|
|||||||
void moduleBlockedClientTimedOut(client *c);
|
void moduleBlockedClientTimedOut(client *c);
|
||||||
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||||
size_t moduleCount(void);
|
size_t moduleCount(void);
|
||||||
void moduleAcquireGIL(int fServerThread);
|
void moduleAcquireGIL(int fServerThread, int fExclusive = FALSE);
|
||||||
int moduleTryAcquireGIL(bool fServerThread);
|
int moduleTryAcquireGIL(bool fServerThread, int fExclusive = FALSE);
|
||||||
void moduleReleaseGIL(int fServerThread);
|
void moduleReleaseGIL(int fServerThread, int fExclusive = FALSE);
|
||||||
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
||||||
void moduleCallCommandFilters(client *c);
|
void moduleCallCommandFilters(client *c);
|
||||||
int moduleHasCommandFilters();
|
int moduleHasCommandFilters();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user