Module threading fixes

Former-commit-id: 2785a8b4d40b09caea5e209ab49fc5f1484981a8
This commit is contained in:
John Sully 2019-03-07 19:13:01 -05:00
parent d487df2caf
commit 02b030bc8c
17 changed files with 109 additions and 77 deletions

View File

@ -725,7 +725,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
/* After sleep callback. */ /* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock); std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE))
ulock.lock(); ulock.lock();
eventLoop->aftersleep(eventLoop); eventLoop->aftersleep(eventLoop);
} }

View File

@ -96,7 +96,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln; listNode *ln;
aofrwblock *block; aofrwblock *block;
ssize_t nwritten; ssize_t nwritten;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
UNUSED(el); UNUSED(el);
UNUSED(fd); UNUSED(fd);

View File

@ -100,7 +100,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
* flag is set client query buffer is not longer processed, but accumulated, * flag is set client query buffer is not longer processed, but accumulated,
* and will be processed when the client is unblocked. */ * and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) { void blockClient(client *c, int btype) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
c->flags |= CLIENT_BLOCKED; c->flags |= CLIENT_BLOCKED;
c->btype = btype; c->btype = btype;
server.blocked_clients++; server.blocked_clients++;
@ -111,7 +111,7 @@ void blockClient(client *c, int btype) {
* in order to process the pending input buffer of clients that were * in order to process the pending input buffer of clients that were
* unblocked after a blocking operation. */ * unblocked after a blocking operation. */
void processUnblockedClients(int iel) { void processUnblockedClients(int iel) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
listNode *ln; listNode *ln;
client *c; client *c;
@ -160,7 +160,7 @@ void processUnblockedClients(int iel) {
void queueClientForReprocessing(client *c) { void queueClientForReprocessing(client *c) {
/* The client may already be into the unblocked list because of a previous /* The client may already be into the unblocked list because of a previous
* blocking operation, don't add back it into the list multiple times. */ * blocking operation, don't add back it into the list multiple times. */
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
fastlock_lock(&c->lock); fastlock_lock(&c->lock);
if (!(c->flags & CLIENT_UNBLOCKED)) { if (!(c->flags & CLIENT_UNBLOCKED)) {
c->flags |= CLIENT_UNBLOCKED; c->flags |= CLIENT_UNBLOCKED;
@ -172,7 +172,7 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind /* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */ * of operation the client is blocking for. */
void unblockClient(client *c) { void unblockClient(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (c->btype == BLOCKED_LIST || if (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET || c->btype == BLOCKED_ZSET ||
c->btype == BLOCKED_STREAM) { c->btype == BLOCKED_STREAM) {
@ -218,7 +218,7 @@ void replyToBlockedClientTimedOut(client *c) {
* The semantics is to send an -UNBLOCKED error to the client, disconnecting * The semantics is to send an -UNBLOCKED error to the client, disconnecting
* it at the same time. */ * it at the same time. */
void disconnectAllBlockedClients(void) { void disconnectAllBlockedClients(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
listNode *ln; listNode *ln;
listIter li; listIter li;
@ -260,7 +260,7 @@ void disconnectAllBlockedClients(void) {
* be used only for a single type, like virtually any Redis application will * be used only for a single type, like virtually any Redis application will
* do, the function is already fair. */ * do, the function is already fair. */
void handleClientsBlockedOnKeys(void) { void handleClientsBlockedOnKeys(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
while(listLength(server.ready_keys) != 0) { while(listLength(server.ready_keys) != 0) {
list *l; list *l;

View File

@ -5390,7 +5390,7 @@ socket_err:
* the target instance. See the Redis Cluster specification for more * the target instance. See the Redis Cluster specification for more
* information. */ * information. */
void askingCommand(client *c) { void askingCommand(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (server.cluster_enabled == 0) { if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled"); addReplyError(c,"This instance has cluster support disabled");
return; return;
@ -5403,7 +5403,7 @@ void askingCommand(client *c) {
* In this mode slaves will not redirect clients as long as clients access * In this mode slaves will not redirect clients as long as clients access
* with read-only commands to keys that are served by the slave's master. */ * with read-only commands to keys that are served by the slave's master. */
void readonlyCommand(client *c) { void readonlyCommand(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (server.cluster_enabled == 0) { if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled"); addReplyError(c,"This instance has cluster support disabled");
return; return;
@ -5414,7 +5414,7 @@ void readonlyCommand(client *c) {
/* The READWRITE command just clears the READONLY command state. */ /* The READWRITE command just clears the READONLY command state. */
void readwriteCommand(client *c) { void readwriteCommand(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
c->flags &= ~CLIENT_READONLY; c->flags &= ~CLIENT_READONLY;
addReply(c,shared.ok); addReply(c,shared.ok);
} }
@ -5458,7 +5458,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
multiState *ms, _ms; multiState *ms, _ms;
multiCmd mc; multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* Allow any key to be set if a module disabled cluster redirections. */ /* Allow any key to be set if a module disabled cluster redirections. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
@ -5671,7 +5671,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
* longer handles, the client is sent a redirection error, and the function * longer handles, the client is sent a redirection error, and the function
* returns 1. Otherwise 0 is returned and no operation is performed. */ * returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) { int clusterRedirectBlockedClientIfNeeded(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (c->flags & CLIENT_BLOCKED && if (c->flags & CLIENT_BLOCKED &&
(c->btype == BLOCKED_LIST || (c->btype == BLOCKED_LIST ||
c->btype == BLOCKED_ZSET || c->btype == BLOCKED_ZSET ||

View File

@ -99,7 +99,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
* expiring our key via DELs in the replication link. */ * expiring our key via DELs in the replication link. */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val; robj *val;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (expireIfNeeded(db,key) == 1) { if (expireIfNeeded(db,key) == 1) {
/* Key expired. If we are in the context of a master, expireIfNeeded() /* Key expired. If we are in the context of a master, expireIfNeeded()
@ -1073,7 +1073,7 @@ int removeExpire(redisDb *db, robj *key) {
* after which the key will no longer be considered valid. */ * after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) { void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de; dictEntry *kde, *de;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* Reuse the sds from the main dict in the expire dict */ /* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->pdict,ptrFromObj(key)); kde = dictFind(db->pdict,ptrFromObj(key));
@ -1110,7 +1110,7 @@ long long getExpire(redisDb *db, robj *key) {
* will be consistent even if we allow write operations against expiring * will be consistent even if we allow write operations against expiring
* keys. */ * keys. */
void propagateExpire(redisDb *db, robj *key, int lazy) { void propagateExpire(redisDb *db, robj *key, int lazy) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
robj *argv[2]; robj *argv[2];
argv[0] = lazy ? shared.unlink : shared.del; argv[0] = lazy ? shared.unlink : shared.del;

View File

@ -350,7 +350,7 @@ unsigned long LFUDecrAndReturn(robj *o) {
* used memory: the eviction should use mostly data size. This function * used memory: the eviction should use mostly data size. This function
* returns the sum of AOF and slaves buffer. */ * returns the sum of AOF and slaves buffer. */
size_t freeMemoryGetNotCountedMemory(void) { size_t freeMemoryGetNotCountedMemory(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
size_t overhead = 0; size_t overhead = 0;
int slaves = listLength(server.slaves); int slaves = listLength(server.slaves);
@ -445,7 +445,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
* Otehrwise if we are over the memory limit, but not enough memory * Otehrwise if we are over the memory limit, but not enough memory
* was freed to return back under the limit, the function returns C_ERR. */ * was freed to return back under the limit, the function returns C_ERR. */
int freeMemoryIfNeeded(void) { int freeMemoryIfNeeded(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* By default replicas should ignore maxmemory /* By default replicas should ignore maxmemory
* and just be masters exact copies. */ * and just be masters exact copies. */
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;

View File

@ -219,8 +219,8 @@ static list *moduleUnblockedClients;
/* We need a mutex that is unlocked / relocked in beforeSleep() in order to /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
* allow thread safe contexts to execute commands at a safe moment. */ * allow thread safe contexts to execute commands at a safe moment. */
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; static pthread_rwlock_t moduleGIL = PTHREAD_RWLOCK_INITIALIZER;
int fModuleGILWlocked = FALSE;
/* Function pointer type for keyspace event notification subscriptions from modules. */ /* Function pointer type for keyspace event notification subscriptions from modules. */
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
@ -484,7 +484,7 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
* details needed to correctly replicate commands. */ * details needed to correctly replicate commands. */
void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
client *c = ctx->client; client *c = ctx->client;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (c->flags & CLIENT_LUA) return; if (c->flags & CLIENT_LUA) return;
@ -3624,7 +3624,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
void moduleHandleBlockedClients(void) { void moduleHandleBlockedClients(void) {
listNode *ln; listNode *ln;
RedisModuleBlockedClient *bc; RedisModuleBlockedClient *bc;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
pthread_mutex_lock(&moduleUnblockedClientsMutex); pthread_mutex_lock(&moduleUnblockedClientsMutex);
/* Here we unblock all the pending clients blocked in modules operations /* Here we unblock all the pending clients blocked in modules operations
@ -3824,21 +3824,37 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
* a blocked client connected to the thread safe context. */ * a blocked client connected to the thread safe context. */
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
UNUSED(ctx); UNUSED(ctx);
moduleAcquireGIL(); moduleAcquireGIL(FALSE /*fServerThread*/);
} }
/* Release the server lock after a thread safe API call was executed. */ /* Release the server lock after a thread safe API call was executed. */
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
UNUSED(ctx); UNUSED(ctx);
moduleReleaseGIL(); moduleReleaseGIL(FALSE /*fServerThread*/);
} }
void moduleAcquireGIL(void) { void moduleAcquireGIL(int fServerThread) {
pthread_mutex_lock(&moduleGIL); if (fServerThread)
{
pthread_rwlock_rdlock(&moduleGIL);
}
else
{
pthread_rwlock_wrlock(&moduleGIL);
fModuleGILWlocked = TRUE;
}
} }
void moduleReleaseGIL(void) { void moduleReleaseGIL(int fServerThread) {
pthread_mutex_unlock(&moduleGIL); pthread_rwlock_unlock(&moduleGIL);
if (!fServerThread)
{
fModuleGILWlocked = FALSE;
}
}
int moduleGILAcquiredByModule(void) {
return fModuleGILWlocked;
} }
@ -4694,7 +4710,8 @@ void moduleInitModulesSystem(void) {
/* Our thread-safe contexts GIL must start with already locked: /* Our thread-safe contexts GIL must start with already locked:
* it is just unlocked when it's safe. */ * it is just unlocked when it's safe. */
pthread_mutex_lock(&moduleGIL); pthread_rwlock_init(&moduleGIL, NULL);
pthread_rwlock_rdlock(&moduleGIL);
} }
/* Load all the modules in the server.loadmodule_queue list, which is /* Load all the modules in the server.loadmodule_queue list, which is

View File

@ -72,7 +72,7 @@ void queueMultiCommand(client *c) {
} }
void discardTransaction(client *c) { void discardTransaction(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
freeClientMultiState(c); freeClientMultiState(c);
initClientMultiState(c); initClientMultiState(c);
c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
@ -82,13 +82,13 @@ void discardTransaction(client *c) {
/* Flag the transacation as DIRTY_EXEC so that EXEC will fail. /* Flag the transacation as DIRTY_EXEC so that EXEC will fail.
* Should be called every time there is an error while queueing a command. */ * Should be called every time there is an error while queueing a command. */
void flagTransaction(client *c) { void flagTransaction(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (c->flags & CLIENT_MULTI) if (c->flags & CLIENT_MULTI)
c->flags |= CLIENT_DIRTY_EXEC; c->flags |= CLIENT_DIRTY_EXEC;
} }
void multiCommand(client *c) { void multiCommand(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (c->flags & CLIENT_MULTI) { if (c->flags & CLIENT_MULTI) {
addReplyError(c,"MULTI calls can not be nested"); addReplyError(c,"MULTI calls can not be nested");
return; return;
@ -294,7 +294,7 @@ void unwatchAllKeys(client *c) {
/* "Touch" a key, so that if this key is being WATCHed by some client the /* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */ * next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) { void touchWatchedKey(redisDb *db, robj *key) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
list *clients; list *clients;
listIter li; listIter li;
listNode *ln; listNode *ln;
@ -320,7 +320,7 @@ void touchWatchedKey(redisDb *db, robj *key) {
void touchWatchedKeysOnFlush(int dbid) { void touchWatchedKeysOnFlush(int dbid) {
listIter li1, li2; listIter li1, li2;
listNode *ln; listNode *ln;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* For every client, check all the waited keys */ /* For every client, check all the waited keys */
listRewind(server.clients,&li1); listRewind(server.clients,&li1);
@ -355,7 +355,7 @@ void watchCommand(client *c) {
void unwatchCommand(client *c) { void unwatchCommand(client *c) {
unwatchAllKeys(c); unwatchAllKeys(c);
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
c->flags &= (~CLIENT_DIRTY_CAS); c->flags &= (~CLIENT_DIRTY_CAS);
addReply(c,shared.ok); addReply(c,shared.ok);
} }

View File

@ -264,7 +264,7 @@ void clientInstallWriteHandler(client *c) {
} }
void clientInstallAsyncWriteHandler(client *c) { void clientInstallAsyncWriteHandler(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (!(c->fPendingAsyncWrite)) { if (!(c->fPendingAsyncWrite)) {
c->fPendingAsyncWrite = TRUE; c->fPendingAsyncWrite = TRUE;
listAddNodeHead(serverTL->clients_pending_asyncwrite,c); listAddNodeHead(serverTL->clients_pending_asyncwrite,c);
@ -295,8 +295,8 @@ void clientInstallAsyncWriteHandler(client *c) {
* data should be appended to the output buffers. */ * data should be appended to the output buffers. */
int prepareClientToWrite(client *c, bool fAsync) { int prepareClientToWrite(client *c, bool fAsync) {
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
serverAssert(!fAsync || aeThreadOwnsLock()); serverAssert(!fAsync || GlobalLocksAcquired());
serverAssert(c->lock.fOwnLock()); serverAssert(c->fd <= 0 || c->lock.fOwnLock());
/* If it's the Lua client we always return ok without installing any /* If it's the Lua client we always return ok without installing any
* handler since there is no socket at all. */ * handler since there is no socket at all. */
@ -331,7 +331,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) {
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
if (fAsync) if (fAsync)
{ {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if ((c->buflenAsync - c->bufposAsync) < (int)len) if ((c->buflenAsync - c->bufposAsync) < (int)len)
{ {
int minsize = len + c->bufposAsync; int minsize = len + c->bufposAsync;
@ -1166,7 +1166,7 @@ static void freeClientArgv(client *c) {
* when we resync with our own master and want to force all our slaves to * when we resync with our own master and want to force all our slaves to
* resync with us as well. */ * resync with us as well. */
void disconnectSlaves(void) { void disconnectSlaves(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
listIter li; listIter li;
listNode *ln; listNode *ln;
@ -1183,8 +1183,8 @@ void disconnectSlaves(void) {
void unlinkClient(client *c) { void unlinkClient(client *c) {
listNode *ln; listNode *ln;
AssertCorrectThread(c); AssertCorrectThread(c);
serverAssert(aeThreadOwnsLock()); serverAssert(c->fd == -1 || GlobalLocksAcquired());
serverAssert(c->lock.fOwnLock()); serverAssert(c->fd == -1 || c->lock.fOwnLock());
/* If this is marked as current client unset it. */ /* If this is marked as current client unset it. */
if (server.current_client == c) server.current_client = NULL; if (server.current_client == c) server.current_client = NULL;
@ -1245,7 +1245,7 @@ void unlinkClient(client *c) {
void freeClient(client *c) { void freeClient(client *c) {
listNode *ln; listNode *ln;
serverAssert(aeThreadOwnsLock()); serverAssert(c->fd == -1 || GlobalLocksAcquired());
AssertCorrectThread(c); AssertCorrectThread(c);
std::unique_lock<decltype(c->lock)> ulock(c->lock); std::unique_lock<decltype(c->lock)> ulock(c->lock);
@ -1517,7 +1517,10 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
void ProcessPendingAsyncWrites() void ProcessPendingAsyncWrites()
{ {
serverAssert(aeThreadOwnsLock()); if (serverTL == nullptr)
return; // module fake call
serverAssert(GlobalLocksAcquired());
while(listLength(serverTL->clients_pending_asyncwrite)) { while(listLength(serverTL->clients_pending_asyncwrite)) {
client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite)); client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite));
@ -2770,7 +2773,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) {
* This is also called by SHUTDOWN for a best-effort attempt to send * This is also called by SHUTDOWN for a best-effort attempt to send
* slaves the latest writes. */ * slaves the latest writes. */
void flushSlavesOutputBuffers(void) { void flushSlavesOutputBuffers(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
listIter li; listIter li;
listNode *ln; listNode *ln;

View File

@ -943,7 +943,7 @@ void freeMemoryOverheadData(struct redisMemOverhead *mh) {
* information used for the MEMORY OVERHEAD and INFO command. The returned * information used for the MEMORY OVERHEAD and INFO command. The returned
* structure pointer should be freed calling freeMemoryOverheadData(). */ * structure pointer should be freed calling freeMemoryOverheadData(). */
struct redisMemOverhead *getMemoryOverheadData(void) { struct redisMemOverhead *getMemoryOverheadData(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
int j; int j;
size_t mem_total = 0; size_t mem_total = 0;
size_t mem = 0; size_t mem = 0;
@ -1083,7 +1083,7 @@ void inputCatSds(void *result, const char *str) {
/* This implements MEMORY DOCTOR. An human readable analysis of the Redis /* This implements MEMORY DOCTOR. An human readable analysis of the Redis
* memory condition. */ * memory condition. */
sds getMemoryDoctorReport(void) { sds getMemoryDoctorReport(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
int empty = 0; /* Instance is empty or almost empty. */ int empty = 0; /* Instance is empty or almost empty. */
int big_peak = 0; /* Memory peak is much larger than used mem. */ int big_peak = 0; /* Memory peak is much larger than used mem. */
int high_frag = 0; /* High fragmentation. */ int high_frag = 0; /* High fragmentation. */

View File

@ -329,7 +329,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
void subscribeCommand(client *c) { void subscribeCommand(client *c) {
int j; int j;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]); pubsubSubscribeChannel(c,c->argv[j]);
@ -350,7 +350,7 @@ void unsubscribeCommand(client *c) {
void psubscribeCommand(client *c) { void psubscribeCommand(client *c) {
int j; int j;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]); pubsubSubscribePattern(c,c->argv[j]);

View File

@ -2157,7 +2157,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
* This function covers the case of RDB -> Salves socket transfers for * This function covers the case of RDB -> Salves socket transfers for
* diskless replication. */ * diskless replication. */
void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
uint64_t *ok_slaves; uint64_t *ok_slaves;
if (!bysignal && exitcode == 0) { if (!bysignal && exitcode == 0) {
@ -2277,7 +2277,7 @@ void killRDBChild(void) {
/* Spawn an RDB child that writes the RDB to the sockets of the slaves /* Spawn an RDB child that writes the RDB to the sockets of the slaves
* that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
int *fds; int *fds;
uint64_t *clientids; uint64_t *clientids;
int numfds; int numfds;

View File

@ -116,7 +116,7 @@ void resizeReplicationBacklog(long long newsize) {
} }
void freeReplicationBacklog(void) { void freeReplicationBacklog(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
serverAssert(listLength(server.slaves) == 0); serverAssert(listLength(server.slaves) == 0);
zfree(server.repl_backlog); zfree(server.repl_backlog);
server.repl_backlog = NULL; server.repl_backlog = NULL;
@ -127,7 +127,7 @@ void freeReplicationBacklog(void) {
* server.master_repl_offset, because there is no case where we want to feed * server.master_repl_offset, because there is no case where we want to feed
* the backlog without incrementing the offset. */ * the backlog without incrementing the offset. */
void feedReplicationBacklog(void *ptr, size_t len) { void feedReplicationBacklog(void *ptr, size_t len) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
unsigned char *p = (unsigned char*)ptr; unsigned char *p = (unsigned char*)ptr;
server.master_repl_offset += len; server.master_repl_offset += len;
@ -179,7 +179,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listIter li; listIter li;
int j, len; int j, len;
char llstr[LONG_STR_SIZE]; char llstr[LONG_STR_SIZE];
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* If the instance is not a top level master, return ASAP: we'll just proxy /* If the instance is not a top level master, return ASAP: we'll just proxy
* the stream of data we receive from our master instead, in order to * the stream of data we receive from our master instead, in order to
@ -328,7 +328,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
sds cmdrepr = sdsnew("+"); sds cmdrepr = sdsnew("+");
robj *cmdobj; robj *cmdobj;
struct timeval tv; struct timeval tv;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
gettimeofday(&tv,NULL); gettimeofday(&tv,NULL);
cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
@ -468,7 +468,7 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
* On success return C_OK, otherwise C_ERR is returned and we proceed * On success return C_OK, otherwise C_ERR is returned and we proceed
* with the usual full resync. */ * with the usual full resync. */
int masterTryPartialResynchronization(client *c) { int masterTryPartialResynchronization(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
long long psync_offset, psync_len; long long psync_offset, psync_len;
char *master_replid = (char*)ptrFromObj(c->argv[1]); char *master_replid = (char*)ptrFromObj(c->argv[1]);
char buf[128]; char buf[128];
@ -588,7 +588,7 @@ need_full_resync:
* *
* Returns C_OK on success or C_ERR otherwise. */ * Returns C_OK on success or C_ERR otherwise. */
int startBgsaveForReplication(int mincapa) { int startBgsaveForReplication(int mincapa) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
int retval; int retval;
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
listIter li; listIter li;
@ -975,7 +975,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
listIter li; listIter li;
int startbgsave = 0; int startbgsave = 0;
int mincapa = -1; int mincapa = -1;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
listRewind(server.slaves,&li); listRewind(server.slaves,&li);
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
@ -1165,7 +1165,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(privdata); UNUSED(privdata);
UNUSED(mask); UNUSED(mask);
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* Static vars used to hold the EOF mark, and the last bytes received /* Static vars used to hold the EOF mark, and the last bytes received
* form the server: when they match, we reached the end of the transfer. */ * form the server: when they match, we reached the end of the transfer. */
@ -1651,7 +1651,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) {
/* This handler fires when the non blocking connect was able to /* This handler fires when the non blocking connect was able to
* establish a connection with the master. */ * establish a connection with the master. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
char tmpfile[256], *err = NULL; char tmpfile[256], *err = NULL;
int dfd = -1, maxtries = 5; int dfd = -1, maxtries = 5;
int sockerr = 0, psync_result; int sockerr = 0, psync_result;
@ -2603,7 +2603,7 @@ long long replicationGetSlaveOffset(void) {
/* Replication cron function, called 1 time per second. */ /* Replication cron function, called 1 time per second. */
void replicationCron(void) { void replicationCron(void) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
static long long replication_cron_loops = 0; static long long replication_cron_loops = 0;
std::unique_lock<decltype(server.master->lock)> ulock; std::unique_lock<decltype(server.master->lock)> ulock;
if (server.master != nullptr) if (server.master != nullptr)

View File

@ -377,7 +377,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
// Ensure our client is on the right thread // Ensure our client is on the right thread
serverAssert(!(c->flags & CLIENT_PENDING_WRITE)); serverAssert(!(c->flags & CLIENT_PENDING_WRITE));
serverAssert(!(c->flags & CLIENT_UNBLOCKED)); serverAssert(!(c->flags & CLIENT_UNBLOCKED));
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
c->iel = serverTL - server.rgthreadvar; c->iel = serverTL - server.rgthreadvar;
/* Cached across calls. */ /* Cached across calls. */

View File

@ -3981,7 +3981,7 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {
/* Setup the master state to start a failover. */ /* Setup the master state to start a failover. */
void sentinelStartFailover(sentinelRedisInstance *master) { void sentinelStartFailover(sentinelRedisInstance *master) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
serverAssert(master->flags & SRI_MASTER); serverAssert(master->flags & SRI_MASTER);
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
@ -4174,7 +4174,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
} }
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) { void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
sentinelRedisInstance *slave = sentinelSelectSlave(ri); sentinelRedisInstance *slave = sentinelSelectSlave(ri);
/* We don't handle the timeout in this state as the function aborts /* We don't handle the timeout in this state as the function aborts
@ -4299,7 +4299,7 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
dictIterator *di; dictIterator *di;
dictEntry *de; dictEntry *de;
int in_progress = 0; int in_progress = 0;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
di = dictGetIterator(master->slaves); di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {

View File

@ -2116,7 +2116,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Before we are going to sleep, let the threads access the dataset by /* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this * releasing the GIL. Redis main thread will not touch anything at this
* time. */ * time. */
if (moduleCount()) moduleReleaseGIL(); if (moduleCount()) moduleReleaseGIL(TRUE /*fServerThread*/);
} }
void beforeSleepLite(struct aeEventLoop *eventLoop) void beforeSleepLite(struct aeEventLoop *eventLoop)
@ -2132,6 +2132,11 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
handleClientsWithPendingWrites(iel); handleClientsWithPendingWrites(iel);
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
* time. */
if (moduleCount()) moduleReleaseGIL(TRUE /*fServerThread*/);
} }
/* This function is called immadiately after the event loop multiplexing /* This function is called immadiately after the event loop multiplexing
@ -2139,7 +2144,7 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
* the different events callbacks. */ * the different events callbacks. */
void afterSleep(struct aeEventLoop *eventLoop) { void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop); UNUSED(eventLoop);
if (moduleCount()) moduleAcquireGIL(); if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/);
} }
/* =========================== Server initialization ======================== */ /* =========================== Server initialization ======================== */
@ -3142,7 +3147,7 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags) int flags)
{ {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc); feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL) if (flags & PROPAGATE_REPL)
@ -3243,7 +3248,7 @@ void call(client *c, int flags) {
long long dirty, start, duration; long long dirty, start, duration;
int client_old_flags = c->flags; int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd; struct redisCommand *real_cmd = c->cmd;
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* Sent the command to clients in MONITOR mode, only if the commands are /* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */ * not generated from reading an AOF. */
@ -3373,7 +3378,7 @@ void call(client *c, int flags) {
* other operations can be performed by the caller. Otherwise * other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */ * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) { int processCommand(client *c) {
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* The QUIT command is handled separately. Normal command procs will /* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble * go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in * when FORCE_REPLICATION is enabled and would be implemented in
@ -3385,7 +3390,7 @@ int processCommand(client *c) {
} }
AssertCorrectThread(c); AssertCorrectThread(c);
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
/* Now lookup the command and check ASAP about trivial error conditions /* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */ * such as wrong arity, bad command name and so forth. */
@ -4462,7 +4467,7 @@ void infoCommand(client *c) {
void monitorCommand(client *c) { void monitorCommand(client *c) {
/* ignore MONITOR if already slave or in monitor mode */ /* ignore MONITOR if already slave or in monitor mode */
serverAssert(aeThreadOwnsLock()); serverAssert(GlobalLocksAcquired());
if (c->flags & CLIENT_SLAVE) return; if (c->flags & CLIENT_SLAVE) return;
c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR); c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
@ -4819,7 +4824,7 @@ void *workerThreadMain(void *parg)
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
aeEventLoop *el = server.rgthreadvar[iel].el; aeEventLoop *el = server.rgthreadvar[iel].el;
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(el, isMainThread ? afterSleep : NULL, 0); aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE);
aeMain(el); aeMain(el);
aeDeleteEventLoop(el); aeDeleteEventLoop(el);
return NULL; return NULL;

View File

@ -1546,11 +1546,10 @@ void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c); void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void); size_t moduleCount(void);
void moduleAcquireGIL(void); void moduleAcquireGIL(int fServerThread);
void moduleReleaseGIL(void); void moduleReleaseGIL(int fServerThread);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
/* Utils */ /* Utils */
long long ustime(void); long long ustime(void);
long long mstime(void); long long mstime(void);
@ -2352,6 +2351,12 @@ void mixDigest(unsigned char *digest, void *ptr, size_t len);
void xorDigest(unsigned char *digest, void *ptr, size_t len); void xorDigest(unsigned char *digest, void *ptr, size_t len);
int populateCommandTableParseFlags(struct redisCommand *c, char *strflags); int populateCommandTableParseFlags(struct redisCommand *c, char *strflags);
int moduleGILAcquiredByModule(void);
static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate
{
return aeThreadOwnsLock() || moduleGILAcquiredByModule();
}
inline int ielFromEventLoop(const aeEventLoop *eventLoop) inline int ielFromEventLoop(const aeEventLoop *eventLoop)
{ {
int iel = 0; int iel = 0;
@ -2366,7 +2371,9 @@ inline int ielFromEventLoop(const aeEventLoop *eventLoop)
inline int FCorrectThread(client *c) inline int FCorrectThread(client *c)
{ {
return server.rgthreadvar[c->iel].el == serverTL->el; return (serverTL != NULL && (server.rgthreadvar[c->iel].el == serverTL->el))
|| (c->iel == IDX_EVENT_LOOP_MAIN && moduleGILAcquiredByModule())
|| (c->fd == -1);
} }
#define AssertCorrectThread(c) serverAssert(FCorrectThread(c)) #define AssertCorrectThread(c) serverAssert(FCorrectThread(c))