Modules TSC: GIL and cooperative multi tasking setup.
This commit is contained in:
parent
c180bc7d98
commit
59b06b14c9
28
src/module.c
28
src/module.c
@ -105,7 +105,7 @@ struct RedisModuleCtx {
|
|||||||
int flags; /* REDISMODULE_CTX_... flags. */
|
int flags; /* REDISMODULE_CTX_... flags. */
|
||||||
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
|
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
|
||||||
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
|
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
|
||||||
void *blocked_privdata; /* Privdata set when unblocking a clinet. */
|
void *blocked_privdata; /* Privdata set when unblocking a client. */
|
||||||
|
|
||||||
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
|
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
|
||||||
int *keys_pos;
|
int *keys_pos;
|
||||||
@ -203,6 +203,10 @@ typedef struct RedisModuleBlockedClient {
|
|||||||
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
static list *moduleUnblockedClients;
|
static list *moduleUnblockedClients;
|
||||||
|
|
||||||
|
/* We need a mutex that is unlocked / relocked in beforeSleep() in order to
|
||||||
|
* allow thread safe contexts to execute commands at a safe moment. */
|
||||||
|
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Prototypes
|
* Prototypes
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -3278,6 +3282,24 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
|
|||||||
return ctx->blocked_privdata;
|
return ctx->blocked_privdata;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* --------------------------------------------------------------------------
|
||||||
|
* Thread Safe Contexts
|
||||||
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/* Operations executed in thread safe contexts use a global lock in order to
|
||||||
|
* be ran at a safe time. This function unlocks and re-acquire the locks:
|
||||||
|
* hopefully with *any* sane implementation of pthreads, this will allow the
|
||||||
|
* modules to make progresses.
|
||||||
|
*
|
||||||
|
* This function is called in beforeSleep(). */
|
||||||
|
void moduleCooperativeMultiTaskingCycle(void) {
|
||||||
|
if (dictSize(modules) == 0) return; /* No modules, no async ops. */
|
||||||
|
pthread_mutex_unlock(&moduleGIL);
|
||||||
|
/* Here hopefully thread modules waiting to be executed at a safe time
|
||||||
|
* should be able to acquire the lock. */
|
||||||
|
pthread_mutex_lock(&moduleGIL);
|
||||||
|
}
|
||||||
|
|
||||||
/* --------------------------------------------------------------------------
|
/* --------------------------------------------------------------------------
|
||||||
* Modules API internals
|
* Modules API internals
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
@ -3329,6 +3351,10 @@ void moduleInitModulesSystem(void) {
|
|||||||
* and we do not want to block not in the read nor in the write half. */
|
* and we do not want to block not in the read nor in the write half. */
|
||||||
anetNonBlock(NULL,server.module_blocked_pipe[0]);
|
anetNonBlock(NULL,server.module_blocked_pipe[0]);
|
||||||
anetNonBlock(NULL,server.module_blocked_pipe[1]);
|
anetNonBlock(NULL,server.module_blocked_pipe[1]);
|
||||||
|
|
||||||
|
/* Our thread-safe contexts GIL must start with already locked:
|
||||||
|
* it is just unlocked when it's safe. */
|
||||||
|
pthread_mutex_lock(&moduleGIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Load all the modules in the server.loadmodule_queue list, which is
|
/* Load all the modules in the server.loadmodule_queue list, which is
|
||||||
|
@ -1172,6 +1172,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
void beforeSleep(struct aeEventLoop *eventLoop) {
|
void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||||
UNUSED(eventLoop);
|
UNUSED(eventLoop);
|
||||||
|
|
||||||
|
/* Give some run time to modules threads using thread safe contexts. */
|
||||||
|
moduleCooperativeMultiTaskingCycle();
|
||||||
|
|
||||||
/* Call the Redis Cluster before sleep function. Note that this function
|
/* Call the Redis Cluster before sleep function. Note that this function
|
||||||
* may change the state of Redis Cluster (from ok to fail or vice versa),
|
* may change the state of Redis Cluster (from ok to fail or vice versa),
|
||||||
* so it's a good idea to call it before serving the unblocked clients
|
* so it's a good idea to call it before serving the unblocked clients
|
||||||
|
@ -1294,6 +1294,7 @@ void unblockClientFromModule(client *c);
|
|||||||
void moduleHandleBlockedClients(void);
|
void moduleHandleBlockedClients(void);
|
||||||
void moduleBlockedClientTimedOut(client *c);
|
void moduleBlockedClientTimedOut(client *c);
|
||||||
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||||
|
void moduleCooperativeMultiTaskingCycle(void);
|
||||||
|
|
||||||
/* Utils */
|
/* Utils */
|
||||||
long long ustime(void);
|
long long ustime(void);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user