Fix module locking issues

Former-commit-id: bf26959b722285f9b8caedb853e712d5b4ce6b3f
This commit is contained in:
John Sully 2019-05-09 19:00:27 -04:00
parent 7b87123505
commit e69b51a6d4
4 changed files with 96 additions and 53 deletions

View File

@ -30,6 +30,8 @@
#include "server.h" #include "server.h"
#include "cluster.h" #include "cluster.h"
#include <dlfcn.h> #include <dlfcn.h>
#include <mutex>
#include <condition_variable>
#define REDISMODULE_CORE 1 #define REDISMODULE_CORE 1
#include "redismodule.h" #include "redismodule.h"
@ -235,7 +237,6 @@ static list *moduleUnblockedClients;
/* We need a mutex that is unlocked / relocked in beforeSleep() in order to /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
* allow thread safe contexts to execute commands at a safe moment. */ * allow thread safe contexts to execute commands at a safe moment. */
static pthread_rwlock_t moduleGIL = PTHREAD_RWLOCK_INITIALIZER;
int fModuleGILWlocked = FALSE; int fModuleGILWlocked = FALSE;
/* Function pointer type for keyspace event notification subscriptions from modules. */ /* Function pointer type for keyspace event notification subscriptions from modules. */
@ -293,6 +294,12 @@ typedef struct RedisModuleCommandFilter {
/* Registered filters */ /* Registered filters */
static list *moduleCommandFilters; static list *moduleCommandFilters;
/* Module GIL Variables */
static int s_cAcquisitionsServer = 0;
static int s_cAcquisitionsModule = 0;
static std::mutex s_mutex;
static std::condition_variable s_cv;
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
* Prototypes * Prototypes
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
@ -3663,9 +3670,22 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex); pthread_mutex_lock(&moduleUnblockedClientsMutex);
bc->privdata = privdata; bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc); listAddNodeTail(moduleUnblockedClients,bc);
if (write(g_pserver->module_blocked_pipe[1],"A",1) != 1) { if (bc->client != nullptr)
{
if (write(g_pserver->rgthreadvar[bc->client->iel].module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */ /* Ignore the error, this is best-effort. */
} }
}
else
{
for (int iel = 0; iel < cserver.cthreads; ++iel)
{
if (write(g_pserver->rgthreadvar[iel].module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
}
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex); pthread_mutex_unlock(&moduleUnblockedClientsMutex);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -3706,8 +3726,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
* blocked client, it was terminated by Redis (for timeout or other reasons). * blocked client, it was terminated by Redis (for timeout or other reasons).
* When this happens the RedisModuleBlockedClient structure in the queue * When this happens the RedisModuleBlockedClient structure in the queue
* will have the 'client' field set to NULL. */ * will have the 'client' field set to NULL. */
void moduleHandleBlockedClients(void) { void moduleHandleBlockedClients(int iel) {
listNode *ln;
RedisModuleBlockedClient *bc; RedisModuleBlockedClient *bc;
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
@ -3715,12 +3734,16 @@ void moduleHandleBlockedClients(void) {
/* Here we unblock all the pending clients blocked in modules operations /* Here we unblock all the pending clients blocked in modules operations
* so we can read every pending "awake byte" in the pipe. */ * so we can read every pending "awake byte" in the pipe. */
char buf[1]; char buf[1];
while (read(g_pserver->module_blocked_pipe[0],buf,1) == 1); while (read(serverTL->module_blocked_pipe[0],buf,1) == 1);
while (listLength(moduleUnblockedClients)) { listIter li;
ln = listFirst(moduleUnblockedClients); listNode *ln;
listRewind(moduleUnblockedClients, &li);
while ((ln = listNext(&li))) {
bc = (RedisModuleBlockedClient*)ln->value; bc = (RedisModuleBlockedClient*)ln->value;
client *c = bc->client; client *c = bc->client;
serverAssert(c->iel == IDX_EVENT_LOOP_MAIN); if ((c != nullptr) && (iel != c->iel))
continue;
listDelNode(moduleUnblockedClients,ln); listDelNode(moduleUnblockedClients,ln);
pthread_mutex_unlock(&moduleUnblockedClientsMutex); pthread_mutex_unlock(&moduleUnblockedClientsMutex);
@ -3919,23 +3942,36 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
} }
void moduleAcquireGIL(int fServerThread) { void moduleAcquireGIL(int fServerThread) {
std::unique_lock<std::mutex> lock(s_mutex);
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
while (*pcheck > 0)
s_cv.wait(lock);
if (fServerThread) if (fServerThread)
{ {
pthread_rwlock_rdlock(&moduleGIL); ++s_cAcquisitionsServer;
} }
else else
{ {
pthread_rwlock_wrlock(&moduleGIL); ++s_cAcquisitionsModule;
fModuleGILWlocked = TRUE; fModuleGILWlocked++;
} }
} }
void moduleReleaseGIL(int fServerThread) { void moduleReleaseGIL(int fServerThread) {
pthread_rwlock_unlock(&moduleGIL); std::unique_lock<std::mutex> lock(s_mutex);
if (!fServerThread)
if (fServerThread)
{ {
fModuleGILWlocked = FALSE; --s_cAcquisitionsServer;
} }
else
{
--s_cAcquisitionsModule;
fModuleGILWlocked--;
}
s_cv.notify_all();
} }
int moduleGILAcquiredByModule(void) { int moduleGILAcquiredByModule(void) {
@ -5102,24 +5138,13 @@ void moduleInitModulesSystem(void) {
moduleCommandFilters = listCreate(); moduleCommandFilters = listCreate();
moduleRegisterCoreAPI(); moduleRegisterCoreAPI();
if (pipe(g_pserver->module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
/* Make the pipe non-blocking. This is just a best-effort wake-up mechanism,
* and we do not want to block on either the read or the write half. */
anetNonBlock(NULL,g_pserver->module_blocked_pipe[0]);
anetNonBlock(NULL,g_pserver->module_blocked_pipe[1]);
/* Create the timers radix tree. */ /* Create the timers radix tree. */
Timers = raxNew(); Timers = raxNew();
/* Our thread-safe contexts GIL must start already locked: /* Our thread-safe contexts GIL must start already locked:
* it is just unlocked when it's safe. */ * it is just unlocked when it's safe. */
pthread_rwlock_init(&moduleGIL, NULL); moduleAcquireGIL(true);
pthread_rwlock_rdlock(&moduleGIL);
} }
/* Load all the modules in the g_pserver->loadmodule_queue list, which is /* Load all the modules in the g_pserver->loadmodule_queue list, which is

View File

@ -1200,7 +1200,7 @@ void unlinkClient(client *c) {
serverAssert(c->fd == -1 || c->lock.fOwnLock()); serverAssert(c->fd == -1 || c->lock.fOwnLock());
/* If this is marked as current client unset it. */ /* If this is marked as current client unset it. */
if (serverTL->current_client == c) serverTL->current_client = NULL; if (serverTL && serverTL->current_client == c) serverTL->current_client = NULL;
/* Certain operations must be done only if the client has an active socket. /* Certain operations must be done only if the client has an active socket.
* If the client was already unlinked or if it's a "fake client" the * If the client was already unlinked or if it's a "fake client" the

View File

@ -2103,7 +2103,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Check if there are clients unblocked by modules that implement /* Check if there are clients unblocked by modules that implement
* blocking commands. */ * blocking commands. */
moduleHandleBlockedClients(); moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
/* Try to process pending commands for clients that were just unblocked. */ /* Try to process pending commands for clients that were just unblocked. */
if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients))
@ -2134,6 +2134,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) { if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) {
processUnblockedClients(iel); processUnblockedClients(iel);
} }
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
aeReleaseLock(); aeReleaseLock();
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
@ -2872,6 +2876,37 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
exit(1); exit(1);
} }
} }
if (pipe(pvar->module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
/* Make the pipe non-blocking. This is just a best-effort wake-up mechanism,
* and we do not want to block on either the read or the write half. */
anetNonBlock(NULL,pvar->module_blocked_pipe[0]);
anetNonBlock(NULL,pvar->module_blocked_pipe[1]);
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
} }
void initServer(void) { void initServer(void) {
@ -2946,25 +2981,6 @@ void initServer(void) {
exit(1); exit(1);
} }
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Open the AOF file if needed. */ /* Open the AOF file if needed. */
if (g_pserver->aof_state == AOF_ON) { if (g_pserver->aof_state == AOF_ON) {
g_pserver->aof_fd = open(g_pserver->aof_filename, g_pserver->aof_fd = open(g_pserver->aof_filename,
@ -4907,6 +4923,7 @@ void *workerThreadMain(void *parg)
serverLog(LOG_INFO, "Thread %d alive.", iel); serverLog(LOG_INFO, "Thread %d alive.", iel);
serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
@ -5143,6 +5160,7 @@ int main(int argc, char **argv) {
} }
aeReleaseLock(); //Finally we can dump the lock aeReleaseLock(); //Finally we can dump the lock
moduleReleaseGIL(true);
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_t rgthread[MAX_EVENT_LOOPS]; pthread_t rgthread[MAX_EVENT_LOOPS];

View File

@ -1145,6 +1145,9 @@ struct redisServerThreadVars {
list *clients_pending_asyncwrite; list *clients_pending_asyncwrite;
int cclients; int cclients;
client *current_client; /* Current client */ client *current_client; /* Current client */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
struct fastlock lockPendingWrite; struct fastlock lockPendingWrite;
}; };
@ -1247,9 +1250,6 @@ struct redisServer {
dict *sharedapi; /* Like moduleapi but containing the APIs that dict *sharedapi; /* Like moduleapi but containing the APIs that
modules share with each other. */ modules share with each other. */
list *loadmodule_queue; /* List of modules to load at startup. */ list *loadmodule_queue; /* List of modules to load at startup. */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
/* Networking */ /* Networking */
int port; /* TCP listening port */ int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */ int tcp_backlog; /* TCP listen() backlog */
@ -1665,7 +1665,7 @@ moduleType *moduleTypeLookupModuleByID(uint64_t id);
void moduleTypeNameByID(char *name, uint64_t moduleid); void moduleTypeNameByID(char *name, uint64_t moduleid);
void moduleFreeContext(struct RedisModuleCtx *ctx); void moduleFreeContext(struct RedisModuleCtx *ctx);
void unblockClientFromModule(client *c); void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void); void moduleHandleBlockedClients(int iel);
void moduleBlockedClientTimedOut(client *c); void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void); size_t moduleCount(void);