From e69b51a6d4306c0d25aa8683c9e54688984a7bdf Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 9 May 2019 19:00:27 -0400 Subject: [PATCH] Fix module locking issues Former-commit-id: bf26959b722285f9b8caedb853e712d5b4ce6b3f --- src/module.cpp | 81 ++++++++++++++++++++++++++++++---------------- src/networking.cpp | 2 +- src/server.cpp | 58 +++++++++++++++++++++------------ src/server.h | 8 ++--- 4 files changed, 96 insertions(+), 53 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index 26de7853f..04ca21a97 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -30,6 +30,8 @@ #include "server.h" #include "cluster.h" #include +#include +#include #define REDISMODULE_CORE 1 #include "redismodule.h" @@ -235,7 +237,6 @@ static list *moduleUnblockedClients; /* We need a mutex that is unlocked / relocked in beforeSleep() in order to * allow thread safe contexts to execute commands at a safe moment. */ -static pthread_rwlock_t moduleGIL = PTHREAD_RWLOCK_INITIALIZER; int fModuleGILWlocked = FALSE; /* Function pointer type for keyspace event notification subscriptions from modules. */ @@ -293,6 +294,12 @@ typedef struct RedisModuleCommandFilter { /* Registered filters */ static list *moduleCommandFilters; +/* Module GIL Variables */ +static int s_cAcquisitionsServer = 0; +static int s_cAcquisitionsModule = 0; +static std::mutex s_mutex; +static std::condition_variable s_cv; + /* -------------------------------------------------------------------------- * Prototypes * -------------------------------------------------------------------------- */ @@ -2750,7 +2757,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch int replicate = 0; /* Replicate this command? */ int call_flags; sds proto = nullptr; - + /* Create the client and dispatch the command. */ va_start(ap, fmt); c = createClient(-1, IDX_EVENT_LOOP_MAIN); @@ -3663,9 +3670,22 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { pthread_mutex_lock(&moduleUnblockedClientsMutex); bc->privdata = privdata; listAddNodeTail(moduleUnblockedClients,bc); - if (write(g_pserver->module_blocked_pipe[1],"A",1) != 1) { - /* Ignore the error, this is best-effort. */ + if (bc->client != nullptr) + { + if (write(g_pserver->rgthreadvar[bc->client->iel].module_blocked_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ + } } + else + { + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + if (write(g_pserver->rgthreadvar[iel].module_blocked_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ + } + } + } + pthread_mutex_unlock(&moduleUnblockedClientsMutex); return REDISMODULE_OK; } @@ -3706,8 +3726,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec * blocked client, it was terminated by Redis (for timeout or other reasons). * When this happens the RedisModuleBlockedClient structure in the queue * will have the 'client' field set to NULL. */ -void moduleHandleBlockedClients(void) { - listNode *ln; +void moduleHandleBlockedClients(int iel) { RedisModuleBlockedClient *bc; serverAssert(GlobalLocksAcquired()); @@ -3715,12 +3734,16 @@ void moduleHandleBlockedClients(void) { /* Here we unblock all the pending clients blocked in modules operations * so we can read every pending "awake byte" in the pipe. */ char buf[1]; - while (read(g_pserver->module_blocked_pipe[0],buf,1) == 1); - while (listLength(moduleUnblockedClients)) { - ln = listFirst(moduleUnblockedClients); + while (read(serverTL->module_blocked_pipe[0],buf,1) == 1); + listIter li; + listNode *ln; + listRewind(moduleUnblockedClients, &li); + while ((ln = listNext(&li))) { bc = (RedisModuleBlockedClient*)ln->value; client *c = bc->client; - serverAssert(c->iel == IDX_EVENT_LOOP_MAIN); + if ((c != nullptr) && (iel != c->iel)) + continue; + listDelNode(moduleUnblockedClients,ln); pthread_mutex_unlock(&moduleUnblockedClientsMutex); @@ -3919,23 +3942,36 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { } void moduleAcquireGIL(int fServerThread) { + std::unique_lock lock(s_mutex); + int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer; + + while (*pcheck > 0) + s_cv.wait(lock); + if (fServerThread) { - pthread_rwlock_rdlock(&moduleGIL); + ++s_cAcquisitionsServer; } else { - pthread_rwlock_wrlock(&moduleGIL); - fModuleGILWlocked = TRUE; + ++s_cAcquisitionsModule; + fModuleGILWlocked++; } } void moduleReleaseGIL(int fServerThread) { - pthread_rwlock_unlock(&moduleGIL); - if (!fServerThread) + std::unique_lock lock(s_mutex); + + if (fServerThread) { - fModuleGILWlocked = FALSE; + --s_cAcquisitionsServer; } + else + { + --s_cAcquisitionsModule; + fModuleGILWlocked--; + } + s_cv.notify_all(); } int moduleGILAcquiredByModule(void) { @@ -5102,24 +5138,13 @@ void moduleInitModulesSystem(void) { moduleCommandFilters = listCreate(); moduleRegisterCoreAPI(); - if (pipe(g_pserver->module_blocked_pipe) == -1) { - serverLog(LL_WARNING, - "Can't create the pipe for module blocking commands: %s", - strerror(errno)); - exit(1); - } - /* Make the pipe non blocking. This is just a best effort aware mechanism - * and we do not want to block not in the read nor in the write half. */ - anetNonBlock(NULL,g_pserver->module_blocked_pipe[0]); - anetNonBlock(NULL,g_pserver->module_blocked_pipe[1]); /* Create the timers radix tree. */ Timers = raxNew(); /* Our thread-safe contexts GIL must start with already locked: * it is just unlocked when it's safe. */ - pthread_rwlock_init(&moduleGIL, NULL); - pthread_rwlock_rdlock(&moduleGIL); + moduleAcquireGIL(true); } /* Load all the modules in the g_pserver->loadmodule_queue list, which is diff --git a/src/networking.cpp b/src/networking.cpp index c6fb9cbf6..29d17c8a3 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1200,7 +1200,7 @@ void unlinkClient(client *c) { serverAssert(c->fd == -1 || c->lock.fOwnLock()); /* If this is marked as current client unset it. */ - if (serverTL->current_client == c) serverTL->current_client = NULL; + if (serverTL && serverTL->current_client == c) serverTL->current_client = NULL; /* Certain operations must be done only if the client has an active socket. * If the client was already unlinked or if it's a "fake client" the diff --git a/src/server.cpp b/src/server.cpp index eb3163942..7a308fea3 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2103,7 +2103,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Check if there are clients unblocked by modules that implement * blocking commands. */ - moduleHandleBlockedClients(); + moduleHandleBlockedClients(ielFromEventLoop(eventLoop)); /* Try to process pending commands for clients that were just unblocked. */ if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) @@ -2134,6 +2134,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) { processUnblockedClients(iel); } + + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + moduleHandleBlockedClients(ielFromEventLoop(eventLoop)); aeReleaseLock(); /* Handle writes with pending output buffers. */ @@ -2872,6 +2876,37 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) exit(1); } } + + if (pipe(pvar->module_blocked_pipe) == -1) { + serverLog(LL_WARNING, + "Can't create the pipe for module blocking commands: %s", + strerror(errno)); + exit(1); + } + + /* Make the pipe non blocking. This is just a best effort aware mechanism + * and we do not want to block not in the read nor in the write half. */ + anetNonBlock(NULL,pvar->module_blocked_pipe[0]); + anetNonBlock(NULL,pvar->module_blocked_pipe[1]); + + /* Register a readable event for the pipe used to awake the event loop + * when a blocked client in a module needs attention. */ + if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE, + moduleBlockedClientPipeReadable,NULL) == AE_ERR) { + serverPanic( + "Error registering the readable event for the module " + "blocked clients subsystem."); + } + + + /* Register a readable event for the pipe used to awake the event loop + * when a blocked client in a module needs attention. */ + if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE, + moduleBlockedClientPipeReadable,NULL) == AE_ERR) { + serverPanic( + "Error registering the readable event for the module " + "blocked clients subsystem."); + } } void initServer(void) { @@ -2946,25 +2981,6 @@ void initServer(void) { exit(1); } - /* Register a readable event for the pipe used to awake the event loop - * when a blocked client in a module needs attention. */ - if (aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->module_blocked_pipe[0], AE_READABLE, - moduleBlockedClientPipeReadable,NULL) == AE_ERR) { - serverPanic( - "Error registering the readable event for the module " - "blocked clients subsystem."); - } - - - /* Register a readable event for the pipe used to awake the event loop - * when a blocked client in a module needs attention. */ - if (aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->module_blocked_pipe[0], AE_READABLE, - moduleBlockedClientPipeReadable,NULL) == AE_ERR) { - serverPanic( - "Error registering the readable event for the module " - "blocked clients subsystem."); - } - /* Open the AOF file if needed. */ if (g_pserver->aof_state == AOF_ON) { g_pserver->aof_fd = open(g_pserver->aof_filename, @@ -4907,6 +4923,7 @@ void *workerThreadMain(void *parg) serverLog(LOG_INFO, "Thread %d alive.", iel); serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global + moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); @@ -5143,6 +5160,7 @@ int main(int argc, char **argv) { } aeReleaseLock(); //Finally we can dump the lock + moduleReleaseGIL(true); serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS); pthread_t rgthread[MAX_EVENT_LOOPS]; diff --git a/src/server.h b/src/server.h index 7134a02bd..76e7183d3 100644 --- a/src/server.h +++ b/src/server.h @@ -1145,6 +1145,9 @@ struct redisServerThreadVars { list *clients_pending_asyncwrite; int cclients; client *current_client; /* Current client */ + int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a + client blocked on a module command needs + to be processed. */ struct fastlock lockPendingWrite; }; @@ -1247,9 +1250,6 @@ struct redisServer { dict *sharedapi; /* Like moduleapi but containing the APIs that modules share with each other. */ list *loadmodule_queue; /* List of modules to load at startup. */ - int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a - client blocked on a module command needs - to be processed. */ /* Networking */ int port; /* TCP listening port */ int tcp_backlog; /* TCP listen() backlog */ @@ -1665,7 +1665,7 @@ moduleType *moduleTypeLookupModuleByID(uint64_t id); void moduleTypeNameByID(char *name, uint64_t moduleid); void moduleFreeContext(struct RedisModuleCtx *ctx); void unblockClientFromModule(client *c); -void moduleHandleBlockedClients(void); +void moduleHandleBlockedClients(int iel); void moduleBlockedClientTimedOut(client *c); void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); size_t moduleCount(void);