Merge branch 'unstable' into RELEASE_0_9
Former-commit-id: 0bc586933ff91fd07128d5419b06303f05d16f2e
commit 38fd0476d5
src/ae.c
@@ -725,7 +725,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
         /* After sleep callback. */
         if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) {
             std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
-            if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE))
+            if (!(eventLoop->aftersleepFlags & AE_SLEEP_THREADSAFE))
                 ulock.lock();
             eventLoop->aftersleep(eventLoop);
         }
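The one-line change above reads as a flag-check fix: the after-sleep path was consulting beforesleepFlags to decide whether the after-sleep callback needs the global lock. The surrounding idiom is a deferred lock that is only taken for callbacks not flagged AE_SLEEP_THREADSAFE. A minimal standalone sketch of that idiom, with g_lock reduced to a plain std::mutex and the flag collapsed to a bool:

#include <mutex>

std::mutex g_lock;  // stand-in for the global event-loop lock

// Invoke a sleep callback, taking the global lock only when the callback
// is not marked thread-safe. std::defer_lock constructs the guard unlocked.
void runSleepCallback(void (*cb)(), bool fThreadSafe)
{
    std::unique_lock<std::mutex> ulock(g_lock, std::defer_lock);
    if (!fThreadSafe)
        ulock.lock();   // thread-unsafe callback: serialize with other threads
    cb();
}                       // ulock releases on scope exit only if it was taken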
src/aof.c
@@ -96,7 +96,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
     listNode *ln;
     aofrwblock *block;
     ssize_t nwritten;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     UNUSED(el);
     UNUSED(fd);
src/blocked.c
@@ -100,7 +100,7 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
  * flag is set client query buffer is not longer processed, but accumulated,
  * and will be processed when the client is unblocked. */
 void blockClient(client *c, int btype) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     c->flags |= CLIENT_BLOCKED;
     c->btype = btype;
     server.blocked_clients++;
@@ -111,7 +111,7 @@ void blockClient(client *c, int btype) {
  * in order to process the pending input buffer of clients that were
  * unblocked after a blocking operation. */
 void processUnblockedClients(int iel) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     listNode *ln;
     client *c;
@@ -160,7 +160,7 @@ void processUnblockedClients(int iel) {
 void queueClientForReprocessing(client *c) {
     /* The client may already be into the unblocked list because of a previous
      * blocking operation, don't add back it into the list multiple times. */
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     fastlock_lock(&c->lock);
     if (!(c->flags & CLIENT_UNBLOCKED)) {
         c->flags |= CLIENT_UNBLOCKED;
@@ -172,7 +172,7 @@ void queueClientForReprocessing(client *c) {
 /* Unblock a client calling the right function depending on the kind
  * of operation the client is blocking for. */
 void unblockClient(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (c->btype == BLOCKED_LIST ||
         c->btype == BLOCKED_ZSET ||
         c->btype == BLOCKED_STREAM) {
@@ -218,7 +218,7 @@ void replyToBlockedClientTimedOut(client *c) {
  * The semantics is to send an -UNBLOCKED error to the client, disconnecting
  * it at the same time. */
 void disconnectAllBlockedClients(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     listNode *ln;
     listIter li;

@@ -260,7 +260,7 @@ void disconnectAllBlockedClients(void) {
  * be used only for a single type, like virtually any Redis application will
  * do, the function is already fair. */
 void handleClientsBlockedOnKeys(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     while(listLength(server.ready_keys) != 0) {
         list *l;

src/cluster.c
@@ -5390,7 +5390,7 @@ socket_err:
  * the target instance. See the Redis Cluster specification for more
  * information. */
 void askingCommand(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (server.cluster_enabled == 0) {
         addReplyError(c,"This instance has cluster support disabled");
         return;
@@ -5403,7 +5403,7 @@ void askingCommand(client *c) {
  * In this mode slaves will not redirect clients as long as clients access
  * with read-only commands to keys that are served by the slave's master. */
 void readonlyCommand(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (server.cluster_enabled == 0) {
         addReplyError(c,"This instance has cluster support disabled");
         return;
@@ -5414,7 +5414,7 @@ void readonlyCommand(client *c) {

 /* The READWRITE command just clears the READONLY command state. */
 void readwriteCommand(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     c->flags &= ~CLIENT_READONLY;
     addReply(c,shared.ok);
 }
@@ -5458,7 +5458,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
     multiState *ms, _ms;
     multiCmd mc;
     int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     /* Allow any key to be set if a module disabled cluster redirections. */
     if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
@@ -5671,7 +5671,7 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
  * longer handles, the client is sent a redirection error, and the function
  * returns 1. Otherwise 0 is returned and no operation is performed. */
 int clusterRedirectBlockedClientIfNeeded(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (c->flags & CLIENT_BLOCKED &&
         (c->btype == BLOCKED_LIST ||
          c->btype == BLOCKED_ZSET ||
src/db.c
@@ -99,7 +99,7 @@ robj *lookupKey(redisDb *db, robj *key, int flags) {
  * expiring our key via DELs in the replication link. */
 robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
     robj *val;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     if (expireIfNeeded(db,key) == 1) {
         /* Key expired. If we are in the context of a master, expireIfNeeded()
@@ -1073,7 +1073,7 @@ int removeExpire(redisDb *db, robj *key) {
  * after which the key will no longer be considered valid. */
 void setExpire(client *c, redisDb *db, robj *key, long long when) {
     dictEntry *kde, *de;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     /* Reuse the sds from the main dict in the expire dict */
     kde = dictFind(db->pdict,ptrFromObj(key));
@@ -1110,7 +1110,7 @@ long long getExpire(redisDb *db, robj *key) {
  * will be consistent even if we allow write operations against expiring
  * keys. */
 void propagateExpire(redisDb *db, robj *key, int lazy) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     robj *argv[2];

     argv[0] = lazy ? shared.unlink : shared.del;
src/evict.c
@@ -350,7 +350,7 @@ unsigned long LFUDecrAndReturn(robj *o) {
  * used memory: the eviction should use mostly data size. This function
  * returns the sum of AOF and slaves buffer. */
 size_t freeMemoryGetNotCountedMemory(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     size_t overhead = 0;
     int slaves = listLength(server.slaves);

@@ -445,7 +445,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
  * Otehrwise if we are over the memory limit, but not enough memory
  * was freed to return back under the limit, the function returns C_ERR. */
 int freeMemoryIfNeeded(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     /* By default replicas should ignore maxmemory
      * and just be masters exact copies. */
     if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
src/module.c
@@ -219,8 +219,8 @@ static list *moduleUnblockedClients;

 /* We need a mutex that is unlocked / relocked in beforeSleep() in order to
  * allow thread safe contexts to execute commands at a safe moment. */
-static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
-
+static pthread_rwlock_t moduleGIL = PTHREAD_RWLOCK_INITIALIZER;
+int fModuleGILWlocked = FALSE;

 /* Function pointer type for keyspace event notification subscriptions from modules. */
 typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
@@ -484,7 +484,7 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
  * details needed to correctly replicate commands. */
 void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
     client *c = ctx->client;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     if (c->flags & CLIENT_LUA) return;

@@ -3624,7 +3624,7 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
 void moduleHandleBlockedClients(void) {
     listNode *ln;
     RedisModuleBlockedClient *bc;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     pthread_mutex_lock(&moduleUnblockedClientsMutex);
     /* Here we unblock all the pending clients blocked in modules operations
@@ -3824,21 +3824,37 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
  * a blocked client connected to the thread safe context. */
 void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
     UNUSED(ctx);
-    moduleAcquireGIL();
+    moduleAcquireGIL(FALSE /*fServerThread*/);
 }

 /* Release the server lock after a thread safe API call was executed. */
 void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
     UNUSED(ctx);
-    moduleReleaseGIL();
+    moduleReleaseGIL(FALSE /*fServerThread*/);
 }

-void moduleAcquireGIL(void) {
-    pthread_mutex_lock(&moduleGIL);
+void moduleAcquireGIL(int fServerThread) {
+    if (fServerThread)
+    {
+        pthread_rwlock_rdlock(&moduleGIL);
+    }
+    else
+    {
+        pthread_rwlock_wrlock(&moduleGIL);
+        fModuleGILWlocked = TRUE;
+    }
 }

-void moduleReleaseGIL(void) {
-    pthread_mutex_unlock(&moduleGIL);
+void moduleReleaseGIL(int fServerThread) {
+    pthread_rwlock_unlock(&moduleGIL);
+    if (!fServerThread)
+    {
+        fModuleGILWlocked = FALSE;
+    }
 }

+int moduleGILAcquiredByModule(void) {
+    return fModuleGILWlocked;
+}
+
@@ -4694,7 +4710,8 @@ void moduleInitModulesSystem(void) {

     /* Our thread-safe contexts GIL must start with already locked:
      * it is just unlocked when it's safe. */
-    pthread_mutex_lock(&moduleGIL);
+    pthread_rwlock_init(&moduleGIL, NULL);
+    pthread_rwlock_rdlock(&moduleGIL);
 }

 /* Load all the modules in the server.loadmodule_queue list, which is
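The moduleGIL rework above is the core of this merge. The old pthread_mutex_t admitted exactly one thread at a time, which is too strict once the server itself runs multiple event-loop threads. As an rwlock, the GIL lets any number of server threads in concurrently (rdlock), while a module thread-safe context still gets exclusive access (wrlock); fModuleGILWlocked records the exclusive case so the assertions can recognize it. A standalone sketch of these semantics, assuming only pthreads, with names simplified from the diff:

#include <pthread.h>

static pthread_rwlock_t gil = PTHREAD_RWLOCK_INITIALIZER;
static int gilHeldExclusively = 0;  // plays the role of fModuleGILWlocked

// Server threads share the GIL: many may touch the dataset at once.
void serverThreadEnter(void) { pthread_rwlock_rdlock(&gil); }
void serverThreadLeave(void) { pthread_rwlock_unlock(&gil); }

// A module thread-safe context excludes every server thread.
void moduleLock(void)   { pthread_rwlock_wrlock(&gil); gilHeldExclusively = 1; }
void moduleUnlock(void) { gilHeldExclusively = 0; pthread_rwlock_unlock(&gil); }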
src/multi.c
@@ -72,7 +72,7 @@ void queueMultiCommand(client *c) {
 }

 void discardTransaction(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     freeClientMultiState(c);
     initClientMultiState(c);
     c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
@@ -82,13 +82,13 @@ void discardTransaction(client *c) {
 /* Flag the transacation as DIRTY_EXEC so that EXEC will fail.
  * Should be called every time there is an error while queueing a command. */
 void flagTransaction(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (c->flags & CLIENT_MULTI)
         c->flags |= CLIENT_DIRTY_EXEC;
 }

 void multiCommand(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (c->flags & CLIENT_MULTI) {
         addReplyError(c,"MULTI calls can not be nested");
         return;
@@ -294,7 +294,7 @@ void unwatchAllKeys(client *c) {
 /* "Touch" a key, so that if this key is being WATCHed by some client the
  * next EXEC will fail. */
 void touchWatchedKey(redisDb *db, robj *key) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     list *clients;
     listIter li;
     listNode *ln;
@@ -320,7 +320,7 @@ void touchWatchedKey(redisDb *db, robj *key) {
 void touchWatchedKeysOnFlush(int dbid) {
     listIter li1, li2;
     listNode *ln;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     /* For every client, check all the waited keys */
     listRewind(server.clients,&li1);
@@ -355,7 +355,7 @@ void watchCommand(client *c) {

 void unwatchCommand(client *c) {
     unwatchAllKeys(c);
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     c->flags &= (~CLIENT_DIRTY_CAS);
     addReply(c,shared.ok);
 }
src/networking.c
@@ -264,7 +264,7 @@ void clientInstallWriteHandler(client *c) {
 }

 void clientInstallAsyncWriteHandler(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (!(c->fPendingAsyncWrite)) {
         c->fPendingAsyncWrite = TRUE;
         listAddNodeHead(serverTL->clients_pending_asyncwrite,c);
@@ -295,8 +295,8 @@ void clientInstallAsyncWriteHandler(client *c) {
  * data should be appended to the output buffers. */
 int prepareClientToWrite(client *c, bool fAsync) {
     fAsync = fAsync && !FCorrectThread(c);  // Not async if we're on the right thread
-    serverAssert(!fAsync || aeThreadOwnsLock());
-    serverAssert(c->lock.fOwnLock());
+    serverAssert(!fAsync || GlobalLocksAcquired());
+    serverAssert(c->fd <= 0 || c->lock.fOwnLock());

     /* If it's the Lua client we always return ok without installing any
      * handler since there is no socket at all. */
@@ -331,7 +331,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) {
     fAsync = fAsync && !FCorrectThread(c);  // Not async if we're on the right thread
     if (fAsync)
     {
-        serverAssert(aeThreadOwnsLock());
+        serverAssert(GlobalLocksAcquired());
         if ((c->buflenAsync - c->bufposAsync) < (int)len)
         {
             int minsize = len + c->bufposAsync;
@@ -1166,7 +1166,7 @@ static void freeClientArgv(client *c) {
  * when we resync with our own master and want to force all our slaves to
  * resync with us as well. */
 void disconnectSlaves(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     listIter li;
     listNode *ln;

@@ -1183,8 +1183,8 @@ void disconnectSlaves(void) {
 void unlinkClient(client *c) {
     listNode *ln;
     AssertCorrectThread(c);
-    serverAssert(aeThreadOwnsLock());
-    serverAssert(c->lock.fOwnLock());
+    serverAssert(c->fd == -1 || GlobalLocksAcquired());
+    serverAssert(c->fd == -1 || c->lock.fOwnLock());

     /* If this is marked as current client unset it. */
     if (server.current_client == c) server.current_client = NULL;
@@ -1245,7 +1245,7 @@ void unlinkClient(client *c) {

 void freeClient(client *c) {
     listNode *ln;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(c->fd == -1 || GlobalLocksAcquired());
     AssertCorrectThread(c);
     std::unique_lock<decltype(c->lock)> ulock(c->lock);

@@ -1517,7 +1517,10 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {

 void ProcessPendingAsyncWrites()
 {
-    serverAssert(aeThreadOwnsLock());
+    if (serverTL == nullptr)
+        return; // module fake call
+
+    serverAssert(GlobalLocksAcquired());

     while(listLength(serverTL->clients_pending_asyncwrite)) {
         client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite));
@@ -2770,7 +2773,7 @@ void asyncCloseClientOnOutputBufferLimitReached(client *c) {
  * This is also called by SHUTDOWN for a best-effort attempt to send
  * slaves the latest writes. */
 void flushSlavesOutputBuffers(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     listIter li;
     listNode *ln;

src/object.c
@@ -943,7 +943,7 @@ void freeMemoryOverheadData(struct redisMemOverhead *mh) {
  * information used for the MEMORY OVERHEAD and INFO command. The returned
  * structure pointer should be freed calling freeMemoryOverheadData(). */
 struct redisMemOverhead *getMemoryOverheadData(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     int j;
     size_t mem_total = 0;
     size_t mem = 0;
@@ -1083,7 +1083,7 @@ void inputCatSds(void *result, const char *str) {
 /* This implements MEMORY DOCTOR. An human readable analysis of the Redis
  * memory condition. */
 sds getMemoryDoctorReport(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     int empty = 0;        /* Instance is empty or almost empty. */
     int big_peak = 0;     /* Memory peak is much larger than used mem. */
     int high_frag = 0;    /* High fragmentation. */
src/pubsub.c
@@ -329,7 +329,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {

 void subscribeCommand(client *c) {
     int j;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     for (j = 1; j < c->argc; j++)
         pubsubSubscribeChannel(c,c->argv[j]);
@@ -350,7 +350,7 @@ void unsubscribeCommand(client *c) {

 void psubscribeCommand(client *c) {
     int j;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     for (j = 1; j < c->argc; j++)
         pubsubSubscribePattern(c,c->argv[j]);
src/rdb.c
@@ -2157,7 +2157,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) {
  * This function covers the case of RDB -> Salves socket transfers for
  * diskless replication. */
 void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     uint64_t *ok_slaves;

     if (!bysignal && exitcode == 0) {
@@ -2277,7 +2277,7 @@ void killRDBChild(void) {
 /* Spawn an RDB child that writes the RDB to the sockets of the slaves
  * that are currently in SLAVE_STATE_WAIT_BGSAVE_START state. */
 int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     int *fds;
     uint64_t *clientids;
     int numfds;
src/replication.c
@@ -116,7 +116,7 @@ void resizeReplicationBacklog(long long newsize) {
 }

 void freeReplicationBacklog(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     serverAssert(listLength(server.slaves) == 0);
     zfree(server.repl_backlog);
     server.repl_backlog = NULL;
@@ -127,7 +127,7 @@ void freeReplicationBacklog(void) {
  * server.master_repl_offset, because there is no case where we want to feed
  * the backlog without incrementing the offset. */
 void feedReplicationBacklog(void *ptr, size_t len) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     unsigned char *p = (unsigned char*)ptr;

     server.master_repl_offset += len;
@@ -179,7 +179,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
     listIter li;
     int j, len;
     char llstr[LONG_STR_SIZE];
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     /* If the instance is not a top level master, return ASAP: we'll just proxy
      * the stream of data we receive from our master instead, in order to
@@ -328,7 +328,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
     sds cmdrepr = sdsnew("+");
     robj *cmdobj;
     struct timeval tv;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     gettimeofday(&tv,NULL);
     cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
@@ -468,7 +468,7 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset) {
  * On success return C_OK, otherwise C_ERR is returned and we proceed
  * with the usual full resync. */
 int masterTryPartialResynchronization(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     long long psync_offset, psync_len;
     char *master_replid = (char*)ptrFromObj(c->argv[1]);
     char buf[128];
@@ -588,7 +588,7 @@ need_full_resync:
 *
 * Returns C_OK on success or C_ERR otherwise. */
 int startBgsaveForReplication(int mincapa) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     int retval;
     int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
     listIter li;
@@ -975,7 +975,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
     listIter li;
     int startbgsave = 0;
     int mincapa = -1;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     listRewind(server.slaves,&li);
     while((ln = listNext(&li))) {
@@ -1165,7 +1165,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     UNUSED(privdata);
     UNUSED(mask);

-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     /* Static vars used to hold the EOF mark, and the last bytes received
      * form the server: when they match, we reached the end of the transfer. */
@@ -1651,7 +1651,7 @@ int slaveTryPartialResynchronization(aeEventLoop *el, int fd, int read_reply) {
 /* This handler fires when the non blocking connect was able to
  * establish a connection with the master. */
 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     char tmpfile[256], *err = NULL;
     int dfd = -1, maxtries = 5;
     int sockerr = 0, psync_result;
@@ -2603,7 +2603,7 @@ long long replicationGetSlaveOffset(void) {

 /* Replication cron function, called 1 time per second. */
 void replicationCron(void) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     static long long replication_cron_loops = 0;
     std::unique_lock<decltype(server.master->lock)> ulock;
     if (server.master != nullptr)
src/scripting.c
@@ -377,7 +377,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
     // Ensure our client is on the right thread
     serverAssert(!(c->flags & CLIENT_PENDING_WRITE));
     serverAssert(!(c->flags & CLIENT_UNBLOCKED));
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     c->iel = serverTL - server.rgthreadvar;

     /* Cached across calls. */
src/sentinel.c
@@ -3981,7 +3981,7 @@ int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port) {

 /* Setup the master state to start a failover. */
 void sentinelStartFailover(sentinelRedisInstance *master) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     serverAssert(master->flags & SRI_MASTER);

     master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
@@ -4174,7 +4174,7 @@ void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
 }

 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     sentinelRedisInstance *slave = sentinelSelectSlave(ri);

     /* We don't handle the timeout in this state as the function aborts
@@ -4299,7 +4299,7 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
     dictIterator *di;
     dictEntry *de;
     int in_progress = 0;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     di = dictGetIterator(master->slaves);
     while((de = dictNext(di)) != NULL) {
src/server.c
@@ -2116,7 +2116,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     /* Before we are going to sleep, let the threads access the dataset by
      * releasing the GIL. Redis main thread will not touch anything at this
      * time. */
-    if (moduleCount()) moduleReleaseGIL();
+    if (moduleCount()) moduleReleaseGIL(TRUE /*fServerThread*/);
 }

 void beforeSleepLite(struct aeEventLoop *eventLoop)
@@ -2132,6 +2132,11 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)

     /* Handle writes with pending output buffers. */
     handleClientsWithPendingWrites(iel);
+
+    /* Before we are going to sleep, let the threads access the dataset by
+     * releasing the GIL. Redis main thread will not touch anything at this
+     * time. */
+    if (moduleCount()) moduleReleaseGIL(TRUE /*fServerThread*/);
 }

 /* This function is called immadiately after the event loop multiplexing
@@ -2139,7 +2144,7 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
  * the different events callbacks. */
 void afterSleep(struct aeEventLoop *eventLoop) {
     UNUSED(eventLoop);
-    if (moduleCount()) moduleAcquireGIL();
+    if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/);
 }

 /* =========================== Server initialization ======================== */
@@ -3142,7 +3147,7 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                int flags)
 {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
         feedAppendOnlyFile(cmd,dbid,argv,argc);
     if (flags & PROPAGATE_REPL)
@@ -3243,7 +3248,7 @@ void call(client *c, int flags) {
     long long dirty, start, duration;
     int client_old_flags = c->flags;
     struct redisCommand *real_cmd = c->cmd;
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     /* Sent the command to clients in MONITOR mode, only if the commands are
      * not generated from reading an AOF. */
@@ -3373,7 +3378,7 @@ void call(client *c, int flags) {
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
 int processCommand(client *c) {
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     /* The QUIT command is handled separately. Normal command procs will
      * go through checking for replication and QUIT will cause trouble
      * when FORCE_REPLICATION is enabled and would be implemented in
@@ -3385,7 +3390,7 @@ int processCommand(client *c) {
     }

     AssertCorrectThread(c);
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());

     /* Now lookup the command and check ASAP about trivial error conditions
      * such as wrong arity, bad command name and so forth. */
@@ -4462,7 +4467,7 @@ void infoCommand(client *c) {

 void monitorCommand(client *c) {
     /* ignore MONITOR if already slave or in monitor mode */
-    serverAssert(aeThreadOwnsLock());
+    serverAssert(GlobalLocksAcquired());
     if (c->flags & CLIENT_SLAVE) return;

     c->flags |= (CLIENT_SLAVE|CLIENT_MONITOR);
@@ -4819,7 +4824,7 @@ void *workerThreadMain(void *parg)
     int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
     aeEventLoop *el = server.rgthreadvar[iel].el;
     aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
-    aeSetAfterSleepProc(el, isMainThread ? afterSleep : NULL, 0);
+    aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE);
     aeMain(el);
     aeDeleteEventLoop(el);
     return NULL;
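Together, the beforeSleep/beforeSleepLite/afterSleep changes make every worker thread, not just the main one, drop the module GIL while it blocks in the kernel and re-take it (shared, as a server thread) when it wakes. A compilable sketch of one loop iteration; everything except the moduleAcquireGIL/moduleReleaseGIL names is an illustrative stub rather than KeyDB's real event loop:

#include <pthread.h>

static pthread_rwlock_t moduleGIL = PTHREAD_RWLOCK_INITIALIZER;

static void moduleReleaseGIL(int /*fServerThread*/) { pthread_rwlock_unlock(&moduleGIL); }
static void moduleAcquireGIL(int /*fServerThread*/) { pthread_rwlock_rdlock(&moduleGIL); }

static void pollForEvents(void) { /* stand-in for epoll_wait()/kevent() */ }
static void processEvents(void) { /* stand-in for dispatching ready events */ }

void workerLoopIteration(void)
{
    moduleReleaseGIL(1 /*fServerThread*/);  // beforeSleep: modules may run while we block
    pollForEvents();                        // the thread sleeps in the kernel here
    moduleAcquireGIL(1 /*fServerThread*/);  // afterSleep: shared access restored
    processEvents();                        // safe to touch shared state again
}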
src/server.h
@@ -1546,11 +1546,10 @@ void moduleHandleBlockedClients(void);
 void moduleBlockedClientTimedOut(client *c);
 void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
 size_t moduleCount(void);
-void moduleAcquireGIL(void);
-void moduleReleaseGIL(void);
+void moduleAcquireGIL(int fServerThread);
+void moduleReleaseGIL(int fServerThread);
 void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
-

 /* Utils */
 long long ustime(void);
 long long mstime(void);
@@ -2352,6 +2351,12 @@ void mixDigest(unsigned char *digest, void *ptr, size_t len);
 void xorDigest(unsigned char *digest, void *ptr, size_t len);
 int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags);

+int moduleGILAcquiredByModule(void);
+static inline int GlobalLocksAcquired(void)  // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate
+{
+    return aeThreadOwnsLock() || moduleGILAcquiredByModule();
+}
+
 inline int ielFromEventLoop(const aeEventLoop *eventLoop)
 {
     int iel = 0;
@@ -2366,7 +2371,9 @@ inline int ielFromEventLoop(const aeEventLoop *eventLoop)

 inline int FCorrectThread(client *c)
 {
-    return server.rgthreadvar[c->iel].el == serverTL->el;
+    return (serverTL != NULL && (server.rgthreadvar[c->iel].el == serverTL->el))
+        || (c->iel == IDX_EVENT_LOOP_MAIN && moduleGILAcquiredByModule())
+        || (c->fd == -1);
 }
 #define AssertCorrectThread(c) serverAssert(FCorrectThread(c))
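GlobalLocksAcquired() is the invariant nearly every hunk in this merge asserts: a thread may touch global state either because it owns the event-loop lock itself or because a module holds the GIL exclusively on the main thread's behalf. A reduced, self-contained sketch of how the checks layer in a command path; the client struct and both lock predicates are stubs, and hypotheticalCommand is not a real KeyDB function:

#include <cassert>
#include <mutex>

struct client { int fd; std::mutex lock; };

static bool aeThreadOwnsLock()          { return true;  }  // stub: this thread holds the ae lock
static bool moduleGILAcquiredByModule() { return false; }  // stub: a module wrlocked the GIL

static inline bool GlobalLocksAcquired()
{
    return aeThreadOwnsLock() || moduleGILAcquiredByModule();
}

void hypotheticalCommand(client *c)
{
    assert(GlobalLocksAcquired());               // global locks first...
    std::lock_guard<std::mutex> guard(c->lock);  // ...then the per-client lock
    /* mutate the client and shared server state here */
}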