Initialized serverTL in more places in module.cpp

Former-commit-id: 8d81592bec0a62b8f3eae6b8c924887839909e2c
This commit is contained in:
VivekSainiEQ 2021-05-25 16:55:47 +00:00
parent 61e054f826
commit 68cdb2f1ad

View File

@ -568,6 +568,19 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) {
} }
} }
/* This function is used to set the thread local variables (serverTL) for
* arbitrary module threads. All incoming module threads share the same set of
* thread local variables (modulethreadvar).
*
* This is needed as some KeyDB functions use thread local variables to do things,
* and we don't want to share the thread local variables of existing server threads */
void moduleSetThreadVariablesIfNeeded(void) {
if (serverTL == nullptr) {
serverTL = &g_pserver->modulethreadvar;
g_fModuleThread = true;
}
}
/* -------------------------------------------------------------------------- /* --------------------------------------------------------------------------
* Service API exported to modules * Service API exported to modules
* *
@ -2113,6 +2126,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
* periodically in timer callbacks or other periodic callbacks. * periodically in timer callbacks or other periodic callbacks.
*/ */
int RM_AvoidReplicaTraffic() { int RM_AvoidReplicaTraffic() {
moduleSetThreadVariablesIfNeeded();
return clientsArePaused(); return clientsArePaused();
} }
@ -2181,9 +2195,11 @@ void *RM_OpenKey(RedisModuleCtx *ctx, robj *keyname, int mode) {
/* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */ /* Destroy a RedisModuleKey struct (freeing is the responsibility of the caller). */
static void moduleCloseKey(RedisModuleKey *key) { static void moduleCloseKey(RedisModuleKey *key) {
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
moduleAcquireGIL(false);
if ((key->mode & REDISMODULE_WRITE) && signal) if ((key->mode & REDISMODULE_WRITE) && signal)
signalModifiedKey(key->ctx->client,key->db,key->key); signalModifiedKey(key->ctx->client,key->db,key->key);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
moduleReleaseGIL(false);
RM_ZsetRangeStop(key); RM_ZsetRangeStop(key);
decrRefCount(key->key); decrRefCount(key->key);
} }
@ -4773,10 +4789,7 @@ int moduleClientIsBlockedOnKeys(client *c) {
* RedisModule_BlockClientOnKeys() is accessible from the timeout * RedisModule_BlockClientOnKeys() is accessible from the timeout
* callback via RM_GetBlockedClientPrivateData). */ * callback via RM_GetBlockedClientPrivateData). */
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
if (serverTL == nullptr) { moduleSetThreadVariablesIfNeeded();
serverTL = &g_pserver->modulethreadvar;
g_fModuleThread = true;
}
if (bc->blocked_on_keys) { if (bc->blocked_on_keys) {
/* In theory the user should always pass the timeout handler as an /* In theory the user should always pass the timeout handler as an
* argument, but better to be safe than sorry. */ * argument, but better to be safe than sorry. */
@ -5056,10 +5069,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
* a blocked client connected to the thread safe context. */ * a blocked client connected to the thread safe context. */
void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) { void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
UNUSED(ctx); UNUSED(ctx);
if (serverTL == nullptr) { moduleSetThreadVariablesIfNeeded();
serverTL = &g_pserver->modulethreadvar;
g_fModuleThread = true;
}
moduleAcquireGIL(FALSE /*fServerThread*/, true /*fExclusive*/); moduleAcquireGIL(FALSE /*fServerThread*/, true /*fExclusive*/);
} }