diff --git a/src/ae.cpp b/src/ae.cpp index 60e22db83..244300fed 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -725,7 +725,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); - if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) + if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE)) ulock.lock(); eventLoop->aftersleep(eventLoop); } diff --git a/src/aof.c b/src/aof.c index f19affc64..e5153cd98 100644 --- a/src/aof.c +++ b/src/aof.c @@ -96,7 +96,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { listNode *ln; aofrwblock *block; ssize_t nwritten; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); UNUSED(el); UNUSED(fd); diff --git a/src/blocked.c b/src/blocked.c index ad7113d52..b0fb127dc 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -100,7 +100,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int * flag is set client query buffer is not longer processed, but accumulated, * and will be processed when the client is unblocked. */ void blockClient(client *c, int btype) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); c->flags |= CLIENT_BLOCKED; c->btype = btype; server.blocked_clients++; @@ -111,7 +111,7 @@ void blockClient(client *c, int btype) { * in order to process the pending input buffer of clients that were * unblocked after a blocking operation. */ void processUnblockedClients(int iel) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); listNode *ln; client *c; @@ -160,7 +160,7 @@ void processUnblockedClients(int iel) { void queueClientForReprocessing(client *c) { /* The client may already be into the unblocked list because of a previous * blocking operation, don't add back it into the list multiple times. */ - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); fastlock_lock(&c->lock); if (!(c->flags & CLIENT_UNBLOCKED)) { c->flags |= CLIENT_UNBLOCKED; @@ -172,7 +172,7 @@ void queueClientForReprocessing(client *c) { /* Unblock a client calling the right function depending on the kind * of operation the client is blocking for. */ void unblockClient(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || c->btype == BLOCKED_STREAM) { @@ -218,7 +218,7 @@ void replyToBlockedClientTimedOut(client *c) { * The semantics is to send an -UNBLOCKED error to the client, disconnecting * it at the same time. */ void disconnectAllBlockedClients(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); listNode *ln; listIter li; @@ -260,7 +260,7 @@ void disconnectAllBlockedClients(void) { * be used only for a single type, like virtually any Redis application will * do, the function is already fair. */ void handleClientsBlockedOnKeys(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); while(listLength(server.ready_keys) != 0) { list *l; diff --git a/src/cluster.c b/src/cluster.c index 946332fa1..13e303b3d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -5390,7 +5390,7 @@ socket_err: * the target instance. See the Redis Cluster specification for more * information. */ void askingCommand(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (server.cluster_enabled == 0) { addReplyError(c,"This instance has cluster support disabled"); return; @@ -5403,7 +5403,7 @@ void askingCommand(client *c) { * In this mode slaves will not redirect clients as long as clients access * with read-only commands to keys that are served by the slave's master. */ void readonlyCommand(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (server.cluster_enabled == 0) { addReplyError(c,"This instance has cluster support disabled"); return; @@ -5414,7 +5414,7 @@ void readonlyCommand(client *c) { /* The READWRITE command just clears the READONLY command state. */ void readwriteCommand(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); c->flags &= ~CLIENT_READONLY; addReply(c,shared.ok); } @@ -5458,7 +5458,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in multiState *ms, _ms; multiCmd mc; int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* Allow any key to be set if a module disabled cluster redirections. */ if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) @@ -5671,7 +5671,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co * longer handles, the client is sent a redirection error, and the function * returns 1. Otherwise 0 is returned and no operation is performed. */ int clusterRedirectBlockedClientIfNeeded(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (c->flags & CLIENT_BLOCKED && (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || diff --git a/src/db.c b/src/db.c index 67631a597..d2946b341 100644 --- a/src/db.c +++ b/src/db.c @@ -99,7 +99,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) { * expiring our key via DELs in the replication link. */ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { robj *val; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (expireIfNeeded(db,key) == 1) { /* Key expired. If we are in the context of a master, expireIfNeeded() @@ -1073,7 +1073,7 @@ int removeExpire(redisDb *db, robj *key) { * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, long long when) { dictEntry *kde, *de; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* Reuse the sds from the main dict in the expire dict */ kde = dictFind(db->pdict,ptrFromObj(key)); @@ -1110,7 +1110,7 @@ long long getExpire(redisDb *db, robj *key) { * will be consistent even if we allow write operations against expiring * keys. */ void propagateExpire(redisDb *db, robj *key, int lazy) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); robj *argv[2]; argv[0] = lazy ? shared.unlink : shared.del; diff --git a/src/evict.c b/src/evict.c index 48d6d0387..cd270c257 100644 --- a/src/evict.c +++ b/src/evict.c @@ -350,7 +350,7 @@ unsigned long LFUDecrAndReturn(robj *o) { * used memory: the eviction should use mostly data size. This function * returns the sum of AOF and slaves buffer. */ size_t freeMemoryGetNotCountedMemory(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); size_t overhead = 0; int slaves = listLength(server.slaves); @@ -445,7 +445,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * Otehrwise if we are over the memory limit, but not enough memory * was freed to return back under the limit, the function returns C_ERR. */ int freeMemoryIfNeeded(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* By default replicas should ignore maxmemory * and just be masters exact copies. */ if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK; diff --git a/src/module.c b/src/module.c index 7cb059b34..a0e61fa9e 100644 --- a/src/module.c +++ b/src/module.c @@ -219,8 +219,8 @@ static list *moduleUnblockedClients; /* We need a mutex that is unlocked / relocked in beforeSleep() in order to * allow thread safe contexts to execute commands at a safe moment. */ -static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; - +static pthread_rwlock_t moduleGIL = PTHREAD_RWLOCK_INITIALIZER; +int fModuleGILWlocked = FALSE; /* Function pointer type for keyspace event notification subscriptions from modules. */ typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); @@ -484,7 +484,7 @@ void moduleFreeContext(RedisModuleCtx *ctx) { * details needed to correctly replicate commands. */ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { client *c = ctx->client; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (c->flags & CLIENT_LUA) return; @@ -3624,7 +3624,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec void moduleHandleBlockedClients(void) { listNode *ln; RedisModuleBlockedClient *bc; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); pthread_mutex_lock(&moduleUnblockedClientsMutex); /* Here we unblock all the pending clients blocked in modules operations @@ -3824,21 +3824,37 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) { * a blocked client connected to the thread safe context. */ void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { UNUSED(ctx); - moduleAcquireGIL(); + moduleAcquireGIL(FALSE /*fServerThread*/); } /* Release the server lock after a thread safe API call was executed. */ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { UNUSED(ctx); - moduleReleaseGIL(); + moduleReleaseGIL(FALSE /*fServerThread*/); } -void moduleAcquireGIL(void) { - pthread_mutex_lock(&moduleGIL); +void moduleAcquireGIL(int fServerThread) { + if (fServerThread) + { + pthread_rwlock_rdlock(&moduleGIL); + } + else + { + pthread_rwlock_wrlock(&moduleGIL); + fModuleGILWlocked = TRUE; + } } -void moduleReleaseGIL(void) { - pthread_mutex_unlock(&moduleGIL); +void moduleReleaseGIL(int fServerThread) { + pthread_rwlock_unlock(&moduleGIL); + if (!fServerThread) + { + fModuleGILWlocked = FALSE; + } +} + +int moduleGILAcquiredByModule(void) { + return fModuleGILWlocked; } @@ -4694,7 +4710,8 @@ void moduleInitModulesSystem(void) { /* Our thread-safe contexts GIL must start with already locked: * it is just unlocked when it's safe. */ - pthread_mutex_lock(&moduleGIL); + pthread_rwlock_init(&moduleGIL, NULL); + pthread_rwlock_rdlock(&moduleGIL); } /* Load all the modules in the server.loadmodule_queue list, which is diff --git a/src/multi.c b/src/multi.c index 4f7711f6c..d92e515d4 100644 --- a/src/multi.c +++ b/src/multi.c @@ -72,7 +72,7 @@ void queueMultiCommand(client *c) { } void discardTransaction(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); freeClientMultiState(c); initClientMultiState(c); c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC); @@ -82,13 +82,13 @@ void discardTransaction(client *c) { /* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */ void flagTransaction(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (c->flags & CLIENT_MULTI) c->flags |= CLIENT_DIRTY_EXEC; } void multiCommand(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (c->flags & CLIENT_MULTI) { addReplyError(c,"MULTI calls can not be nested"); return; @@ -294,7 +294,7 @@ void unwatchAllKeys(client *c) { /* "Touch" a key, so that if this key is being WATCHed by some client the * next EXEC will fail. */ void touchWatchedKey(redisDb *db, robj *key) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); list *clients; listIter li; listNode *ln; @@ -320,7 +320,7 @@ void touchWatchedKey(redisDb *db, robj *key) { void touchWatchedKeysOnFlush(int dbid) { listIter li1, li2; listNode *ln; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* For every client, check all the waited keys */ listRewind(server.clients,&li1); @@ -355,7 +355,7 @@ void watchCommand(client *c) { void unwatchCommand(client *c) { unwatchAllKeys(c); - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); c->flags &= (~CLIENT_DIRTY_CAS); addReply(c,shared.ok); } diff --git a/src/networking.cpp b/src/networking.cpp index 857dfc52d..b608370fb 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -264,7 +264,7 @@ void clientInstallWriteHandler(client *c) { } void clientInstallAsyncWriteHandler(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (!(c->fPendingAsyncWrite)) { c->fPendingAsyncWrite = TRUE; listAddNodeHead(serverTL->clients_pending_asyncwrite,c); @@ -295,8 +295,8 @@ void clientInstallAsyncWriteHandler(client *c) { * data should be appended to the output buffers. */ int prepareClientToWrite(client *c, bool fAsync) { fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread - serverAssert(!fAsync || aeThreadOwnsLock()); - serverAssert(c->lock.fOwnLock()); + serverAssert(!fAsync || GlobalLocksAcquired()); + serverAssert(c->fd <= 0 || c->lock.fOwnLock()); /* If it's the Lua client we always return ok without installing any * handler since there is no socket at all. */ @@ -331,7 +331,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) { fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread if (fAsync) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if ((c->buflenAsync - c->bufposAsync) < (int)len) { int minsize = len + c->bufposAsync; @@ -1166,7 +1166,7 @@ static void freeClientArgv(client *c) { * when we resync with our own master and want to force all our slaves to * resync with us as well. */ void disconnectSlaves(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); listIter li; listNode *ln; @@ -1183,8 +1183,8 @@ void disconnectSlaves(void) { void unlinkClient(client *c) { listNode *ln; AssertCorrectThread(c); - serverAssert(aeThreadOwnsLock()); - serverAssert(c->lock.fOwnLock()); + serverAssert(c->fd == -1 || GlobalLocksAcquired()); + serverAssert(c->fd == -1 || c->lock.fOwnLock()); /* If this is marked as current client unset it. */ if (server.current_client == c) server.current_client = NULL; @@ -1245,7 +1245,7 @@ void unlinkClient(client *c) { void freeClient(client *c) { listNode *ln; - serverAssert(aeThreadOwnsLock()); + serverAssert(c->fd == -1 || GlobalLocksAcquired()); AssertCorrectThread(c); std::unique_locklock)> ulock(c->lock); @@ -1517,7 +1517,10 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { void ProcessPendingAsyncWrites() { - serverAssert(aeThreadOwnsLock()); + if (serverTL == nullptr) + return; // module fake call + + serverAssert(GlobalLocksAcquired()); while(listLength(serverTL->clients_pending_asyncwrite)) { client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite)); @@ -2770,7 +2773,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) { * This is also called by SHUTDOWN for a best-effort attempt to send * slaves the latest writes. */ void flushSlavesOutputBuffers(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); listIter li; listNode *ln; diff --git a/src/object.c b/src/object.c index 8d64be50c..600dbfbc9 100644 --- a/src/object.c +++ b/src/object.c @@ -943,7 +943,7 @@ void freeMemoryOverheadData(struct redisMemOverhead *mh) { * information used for the MEMORY OVERHEAD and INFO command. The returned * structure pointer should be freed calling freeMemoryOverheadData(). */ struct redisMemOverhead *getMemoryOverheadData(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); int j; size_t mem_total = 0; size_t mem = 0; @@ -1083,7 +1083,7 @@ void inputCatSds(void *result, const char *str) { /* This implements MEMORY DOCTOR. An human readable analysis of the Redis * memory condition. */ sds getMemoryDoctorReport(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); int empty = 0; /* Instance is empty or almost empty. */ int big_peak = 0; /* Memory peak is much larger than used mem. */ int high_frag = 0; /* High fragmentation. */ diff --git a/src/pubsub.c b/src/pubsub.c index af064d06a..e7815a2e8 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -329,7 +329,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { void subscribeCommand(client *c) { int j; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); @@ -350,7 +350,7 @@ void unsubscribeCommand(client *c) { void psubscribeCommand(client *c) { int j; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); diff --git a/src/rdb.c b/src/rdb.c index 64cf1b01f..4a504e815 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2157,7 +2157,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { * This function covers the case of RDB -> Salves socket transfers for * diskless replication. */ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); uint64_t *ok_slaves; if (!bysignal && exitcode == 0) { @@ -2277,7 +2277,7 @@ void killRDBChild(void) { /* Spawn an RDB child that writes the RDB to the sockets of the slaves * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); int *fds; uint64_t *clientids; int numfds; diff --git a/src/replication.cpp b/src/replication.cpp index b1abb285a..899fa3dcf 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -116,7 +116,7 @@ void resizeReplicationBacklog(long long newsize) { } void freeReplicationBacklog(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); serverAssert(listLength(server.slaves) == 0); zfree(server.repl_backlog); server.repl_backlog = NULL; @@ -127,7 +127,7 @@ void freeReplicationBacklog(void) { * server.master_repl_offset, because there is no case where we want to feed * the backlog without incrementing the offset. */ void feedReplicationBacklog(void *ptr, size_t len) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); unsigned char *p = (unsigned char*)ptr; server.master_repl_offset += len; @@ -179,7 +179,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listIter li; int j, len; char llstr[LONG_STR_SIZE]; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* If the instance is not a top level master, return ASAP: we'll just proxy * the stream of data we receive from our master instead, in order to @@ -328,7 +328,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, sds cmdrepr = sdsnew("+"); robj *cmdobj; struct timeval tv; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); gettimeofday(&tv,NULL); cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); @@ -468,7 +468,7 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) { * On success return C_OK, otherwise C_ERR is returned and we proceed * with the usual full resync. */ int masterTryPartialResynchronization(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); long long psync_offset, psync_len; char *master_replid = (char*)ptrFromObj(c->argv[1]); char buf[128]; @@ -588,7 +588,7 @@ need_full_resync: * * Returns C_OK on success or C_ERR otherwise. */ int startBgsaveForReplication(int mincapa) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); int retval; int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF); listIter li; @@ -975,7 +975,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) listIter li; int startbgsave = 0; int mincapa = -1; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); listRewind(server.slaves,&li); while((ln = listNext(&li))) { @@ -1165,7 +1165,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(privdata); UNUSED(mask); - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* Static vars used to hold the EOF mark, and the last bytes received * form the server: when they match, we reached the end of the transfer. */ @@ -1651,7 +1651,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) { /* This handler fires when the non blocking connect was able to * establish a connection with the master. */ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); char tmpfile[256], *err = NULL; int dfd = -1, maxtries = 5; int sockerr = 0, psync_result; @@ -2603,7 +2603,7 @@ long long replicationGetSlaveOffset(void) { /* Replication cron function, called 1 time per second. */ void replicationCron(void) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); static long long replication_cron_loops = 0; std::unique_locklock)> ulock; if (server.master != nullptr) diff --git a/src/scripting.cpp b/src/scripting.cpp index 71d1a2815..b23c5dc5a 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -377,7 +377,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { // Ensure our client is on the right thread serverAssert(!(c->flags & CLIENT_PENDING_WRITE)); serverAssert(!(c->flags & CLIENT_UNBLOCKED)); - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); c->iel = serverTL - server.rgthreadvar; /* Cached across calls. */ diff --git a/src/sentinel.c b/src/sentinel.c index 859f2121a..abaf7f3a9 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -3981,7 +3981,7 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) { /* Setup the master state to start a failover. */ void sentinelStartFailover(sentinelRedisInstance *master) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); serverAssert(master->flags & SRI_MASTER); master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; @@ -4174,7 +4174,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) { } void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); sentinelRedisInstance *slave = sentinelSelectSlave(ri); /* We don't handle the timeout in this state as the function aborts @@ -4299,7 +4299,7 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) { dictIterator *di; dictEntry *de; int in_progress = 0; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); di = dictGetIterator(master->slaves); while((de = dictNext(di)) != NULL) { diff --git a/src/server.c b/src/server.c index bdeff0e45..c23ce0674 100644 --- a/src/server.c +++ b/src/server.c @@ -2116,7 +2116,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this * time. */ - if (moduleCount()) moduleReleaseGIL(); + if (moduleCount()) moduleReleaseGIL(TRUE /*fServerThread*/); } void beforeSleepLite(struct aeEventLoop *eventLoop) @@ -2132,6 +2132,11 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(iel); + + /* Before we are going to sleep, let the threads access the dataset by + * releasing the GIL. Redis main thread will not touch anything at this + * time. */ + if (moduleCount()) moduleReleaseGIL(TRUE /*fServerThread*/); } /* This function is called immadiately after the event loop multiplexing @@ -2139,7 +2144,7 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) * the different events callbacks. */ void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - if (moduleCount()) moduleAcquireGIL(); + if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/); } /* =========================== Server initialization ======================== */ @@ -3142,7 +3147,7 @@ struct redisCommand *lookupCommandOrOriginal(sds name) { void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & PROPAGATE_REPL) @@ -3243,7 +3248,7 @@ void call(client *c, int flags) { long long dirty, start, duration; int client_old_flags = c->flags; struct redisCommand *real_cmd = c->cmd; - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* Sent the command to clients in MONITOR mode, only if the commands are * not generated from reading an AOF. */ @@ -3373,7 +3378,7 @@ void call(client *c, int flags) { * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ int processCommand(client *c) { - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble * when FORCE_REPLICATION is enabled and would be implemented in @@ -3385,7 +3390,7 @@ int processCommand(client *c) { } AssertCorrectThread(c); - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ @@ -4462,7 +4467,7 @@ void infoCommand(client *c) { void monitorCommand(client *c) { /* ignore MONITOR if already slave or in monitor mode */ - serverAssert(aeThreadOwnsLock()); + serverAssert(GlobalLocksAcquired()); if (c->flags & CLIENT_SLAVE) return; c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR); @@ -4819,7 +4824,7 @@ void *workerThreadMain(void *parg) int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); aeEventLoop *el = server.rgthreadvar[iel].el; aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); - aeSetAfterSleepProc(el, isMainThread ? afterSleep : NULL, 0); + aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE); aeMain(el); aeDeleteEventLoop(el); return NULL; diff --git a/src/server.h b/src/server.h index 9e64e8915..3e35a933b 100644 --- a/src/server.h +++ b/src/server.h @@ -1546,11 +1546,10 @@ void moduleHandleBlockedClients(void); void moduleBlockedClientTimedOut(client *c); void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); size_t moduleCount(void); -void moduleAcquireGIL(void); -void moduleReleaseGIL(void); +void moduleAcquireGIL(int fServerThread); +void moduleReleaseGIL(int fServerThread); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); - /* Utils */ long long ustime(void); long long mstime(void); @@ -2352,6 +2351,12 @@ void mixDigest(unsigned char *digest, void *ptr, size_t len); void xorDigest(unsigned char *digest, void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, char *strflags); +int moduleGILAcquiredByModule(void); +static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate +{ + return aeThreadOwnsLock() || moduleGILAcquiredByModule(); +} + inline int ielFromEventLoop(const aeEventLoop *eventLoop) { int iel = 0; @@ -2366,7 +2371,9 @@ inline int ielFromEventLoop(const aeEventLoop *eventLoop) inline int FCorrectThread(client *c) { - return server.rgthreadvar[c->iel].el == serverTL->el; + return (serverTL != NULL && (server.rgthreadvar[c->iel].el == serverTL->el)) + || (c->iel == IDX_EVENT_LOOP_MAIN && moduleGILAcquiredByModule()) + || (c->fd == -1); } #define AssertCorrectThread(c) serverAssert(FCorrectThread(c))