Fix module locking issues

Former-commit-id: bf26959b722285f9b8caedb853e712d5b4ce6b3f
This commit is contained in:
John Sully 2019-05-09 19:00:27 -04:00
parent 6ae796cb91
commit 3d2b9e3ab8
4 changed files with 96 additions and 53 deletions

View File

@ -30,6 +30,8 @@
#include "server.h"
#include "cluster.h"
#include <dlfcn.h>
#include <mutex>
#include <condition_variable>
#define REDISMODULE_CORE 1
#include "redismodule.h"
@ -235,7 +237,6 @@ static list *moduleUnblockedClients;
/* We need a mutex that is unlocked / relocked in beforeSleep() in order to
* allow thread safe contexts to execute commands at a safe moment. */
static pthread_rwlock_t moduleGIL = PTHREAD_RWLOCK_INITIALIZER;
int fModuleGILWlocked = FALSE;
/* Function pointer type for keyspace event notification subscriptions from modules. */
@ -293,6 +294,12 @@ typedef struct RedisModuleCommandFilter {
/* Registered filters */
static list *moduleCommandFilters;
/* Module GIL Variables */
static int s_cAcquisitionsServer = 0;
static int s_cAcquisitionsModule = 0;
static std::mutex s_mutex;
static std::condition_variable s_cv;
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
@ -2750,7 +2757,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
int replicate = 0; /* Replicate this command? */
int call_flags;
sds proto = nullptr;
/* Create the client and dispatch the command. */
va_start(ap, fmt);
c = createClient(-1, IDX_EVENT_LOOP_MAIN);
@ -3663,9 +3670,22 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc);
if (write(g_pserver->module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
if (bc->client != nullptr)
{
if (write(g_pserver->rgthreadvar[bc->client->iel].module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
}
else
{
for (int iel = 0; iel < cserver.cthreads; ++iel)
{
if (write(g_pserver->rgthreadvar[iel].module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
}
}
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
return REDISMODULE_OK;
}
@ -3706,8 +3726,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
* blocked client, it was terminated by Redis (for timeout or other reasons).
* When this happens the RedisModuleBlockedClient structure in the queue
* will have the 'client' field set to NULL. */
void moduleHandleBlockedClients(void) {
listNode *ln;
void moduleHandleBlockedClients(int iel) {
RedisModuleBlockedClient *bc;
serverAssert(GlobalLocksAcquired());
@ -3715,12 +3734,16 @@ void moduleHandleBlockedClients(void) {
/* Here we unblock all the pending clients blocked in modules operations
* so we can read every pending "awake byte" in the pipe. */
char buf[1];
while (read(g_pserver->module_blocked_pipe[0],buf,1) == 1);
while (listLength(moduleUnblockedClients)) {
ln = listFirst(moduleUnblockedClients);
while (read(serverTL->module_blocked_pipe[0],buf,1) == 1);
listIter li;
listNode *ln;
listRewind(moduleUnblockedClients, &li);
while ((ln = listNext(&li))) {
bc = (RedisModuleBlockedClient*)ln->value;
client *c = bc->client;
serverAssert(c->iel == IDX_EVENT_LOOP_MAIN);
if ((c != nullptr) && (iel != c->iel))
continue;
listDelNode(moduleUnblockedClients,ln);
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
@ -3919,23 +3942,36 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
}
void moduleAcquireGIL(int fServerThread) {
std::unique_lock<std::mutex> lock(s_mutex);
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
while (*pcheck > 0)
s_cv.wait(lock);
if (fServerThread)
{
pthread_rwlock_rdlock(&moduleGIL);
++s_cAcquisitionsServer;
}
else
{
pthread_rwlock_wrlock(&moduleGIL);
fModuleGILWlocked = TRUE;
++s_cAcquisitionsModule;
fModuleGILWlocked++;
}
}
void moduleReleaseGIL(int fServerThread) {
pthread_rwlock_unlock(&moduleGIL);
if (!fServerThread)
std::unique_lock<std::mutex> lock(s_mutex);
if (fServerThread)
{
fModuleGILWlocked = FALSE;
--s_cAcquisitionsServer;
}
else
{
--s_cAcquisitionsModule;
fModuleGILWlocked--;
}
s_cv.notify_all();
}
int moduleGILAcquiredByModule(void) {
@ -5102,24 +5138,13 @@ void moduleInitModulesSystem(void) {
moduleCommandFilters = listCreate();
moduleRegisterCoreAPI();
if (pipe(g_pserver->module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
/* Make the pipe non blocking. This is just a best effort aware mechanism
* and we do not want to block not in the read nor in the write half. */
anetNonBlock(NULL,g_pserver->module_blocked_pipe[0]);
anetNonBlock(NULL,g_pserver->module_blocked_pipe[1]);
/* Create the timers radix tree. */
Timers = raxNew();
/* Our thread-safe contexts GIL must start with already locked:
* it is just unlocked when it's safe. */
pthread_rwlock_init(&moduleGIL, NULL);
pthread_rwlock_rdlock(&moduleGIL);
moduleAcquireGIL(true);
}
/* Load all the modules in the g_pserver->loadmodule_queue list, which is

View File

@ -1200,7 +1200,7 @@ void unlinkClient(client *c) {
serverAssert(c->fd == -1 || c->lock.fOwnLock());
/* If this is marked as current client unset it. */
if (serverTL->current_client == c) serverTL->current_client = NULL;
if (serverTL && serverTL->current_client == c) serverTL->current_client = NULL;
/* Certain operations must be done only if the client has an active socket.
* If the client was already unlinked or if it's a "fake client" the

View File

@ -2103,7 +2103,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
moduleHandleBlockedClients();
moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients))
@ -2134,6 +2134,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) {
processUnblockedClients(iel);
}
/* Check if there are clients unblocked by modules that implement
* blocking commands. */
moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
aeReleaseLock();
/* Handle writes with pending output buffers. */
@ -2872,6 +2876,37 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
exit(1);
}
}
if (pipe(pvar->module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
/* Make the pipe non blocking. This is just a best effort aware mechanism
* and we do not want to block not in the read nor in the write half. */
anetNonBlock(NULL,pvar->module_blocked_pipe[0]);
anetNonBlock(NULL,pvar->module_blocked_pipe[1]);
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
}
void initServer(void) {
@ -2946,25 +2981,6 @@ void initServer(void) {
exit(1);
}
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Open the AOF file if needed. */
if (g_pserver->aof_state == AOF_ON) {
g_pserver->aof_fd = open(g_pserver->aof_filename,
@ -4907,6 +4923,7 @@ void *workerThreadMain(void *parg)
serverLog(LOG_INFO, "Thread %d alive.", iel);
serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
@ -5143,6 +5160,7 @@ int main(int argc, char **argv) {
}
aeReleaseLock(); //Finally we can dump the lock
moduleReleaseGIL(true);
serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_t rgthread[MAX_EVENT_LOOPS];

View File

@ -1145,6 +1145,9 @@ struct redisServerThreadVars {
list *clients_pending_asyncwrite;
int cclients;
client *current_client; /* Current client */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
struct fastlock lockPendingWrite;
};
@ -1247,9 +1250,6 @@ struct redisServer {
dict *sharedapi; /* Like moduleapi but containing the APIs that
modules share with each other. */
list *loadmodule_queue; /* List of modules to load at startup. */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
/* Networking */
int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */
@ -1665,7 +1665,7 @@ moduleType *moduleTypeLookupModuleByID(uint64_t id);
void moduleTypeNameByID(char *name, uint64_t moduleid);
void moduleFreeContext(struct RedisModuleCtx *ctx);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleHandleBlockedClients(int iel);
void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void);