RM_ThreadSafeContextTryLock a non-blocking method for acquiring GIL (#7738)
Co-authored-by: Yossi Gottlieb <yossigo@gmail.com> Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
a8b7268911
commit
042189fd87
@ -26,4 +26,5 @@ $TCLSH tests/test_helper.tcl \
|
|||||||
--single unit/moduleapi/datatype \
|
--single unit/moduleapi/datatype \
|
||||||
--single unit/moduleapi/auth \
|
--single unit/moduleapi/auth \
|
||||||
--single unit/moduleapi/keyspace_events \
|
--single unit/moduleapi/keyspace_events \
|
||||||
|
--single unit/moduleapi/blockedclient \
|
||||||
"${@}"
|
"${@}"
|
||||||
|
22
src/module.c
22
src/module.c
@ -4906,6 +4906,23 @@ void RM_ThreadSafeContextLock(RedisModuleCtx *ctx) {
|
|||||||
moduleAcquireGIL();
|
moduleAcquireGIL();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Similar to RM_ThreadSafeContextLock but this function
|
||||||
|
* would not block if the server lock is already acquired.
|
||||||
|
*
|
||||||
|
* If successful (lock acquired) REDISMODULE_OK is returned,
|
||||||
|
* otherwise REDISMODULE_ERR is returned and errno is set
|
||||||
|
* accordingly. */
|
||||||
|
int RM_ThreadSafeContextTryLock(RedisModuleCtx *ctx) {
|
||||||
|
UNUSED(ctx);
|
||||||
|
|
||||||
|
int res = moduleTryAcquireGIL();
|
||||||
|
if(res != 0) {
|
||||||
|
errno = res;
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
}
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* Release the server lock after a thread safe API call was executed. */
|
/* Release the server lock after a thread safe API call was executed. */
|
||||||
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
|
||||||
UNUSED(ctx);
|
UNUSED(ctx);
|
||||||
@ -4916,6 +4933,10 @@ void moduleAcquireGIL(void) {
|
|||||||
pthread_mutex_lock(&moduleGIL);
|
pthread_mutex_lock(&moduleGIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int moduleTryAcquireGIL(void) {
|
||||||
|
return pthread_mutex_trylock(&moduleGIL);
|
||||||
|
}
|
||||||
|
|
||||||
void moduleReleaseGIL(void) {
|
void moduleReleaseGIL(void) {
|
||||||
pthread_mutex_unlock(&moduleGIL);
|
pthread_mutex_unlock(&moduleGIL);
|
||||||
}
|
}
|
||||||
@ -7936,6 +7957,7 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(GetThreadSafeContext);
|
REGISTER_API(GetThreadSafeContext);
|
||||||
REGISTER_API(FreeThreadSafeContext);
|
REGISTER_API(FreeThreadSafeContext);
|
||||||
REGISTER_API(ThreadSafeContextLock);
|
REGISTER_API(ThreadSafeContextLock);
|
||||||
|
REGISTER_API(ThreadSafeContextTryLock);
|
||||||
REGISTER_API(ThreadSafeContextUnlock);
|
REGISTER_API(ThreadSafeContextUnlock);
|
||||||
REGISTER_API(DigestAddStringBuffer);
|
REGISTER_API(DigestAddStringBuffer);
|
||||||
REGISTER_API(DigestAddLongLong);
|
REGISTER_API(DigestAddLongLong);
|
||||||
|
@ -666,6 +666,7 @@ REDISMODULE_API int (*RedisModule_AbortBlock)(RedisModuleBlockedClient *bc) REDI
|
|||||||
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
|
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API void (*RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
REDISMODULE_API void (*RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||||
|
REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
|
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
|
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
|
||||||
@ -899,6 +900,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(GetThreadSafeContext);
|
REDISMODULE_GET_API(GetThreadSafeContext);
|
||||||
REDISMODULE_GET_API(FreeThreadSafeContext);
|
REDISMODULE_GET_API(FreeThreadSafeContext);
|
||||||
REDISMODULE_GET_API(ThreadSafeContextLock);
|
REDISMODULE_GET_API(ThreadSafeContextLock);
|
||||||
|
REDISMODULE_GET_API(ThreadSafeContextTryLock);
|
||||||
REDISMODULE_GET_API(ThreadSafeContextUnlock);
|
REDISMODULE_GET_API(ThreadSafeContextUnlock);
|
||||||
REDISMODULE_GET_API(BlockClient);
|
REDISMODULE_GET_API(BlockClient);
|
||||||
REDISMODULE_GET_API(UnblockClient);
|
REDISMODULE_GET_API(UnblockClient);
|
||||||
|
@ -1601,6 +1601,7 @@ void moduleBlockedClientTimedOut(client *c);
|
|||||||
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||||
size_t moduleCount(void);
|
size_t moduleCount(void);
|
||||||
void moduleAcquireGIL(void);
|
void moduleAcquireGIL(void);
|
||||||
|
int moduleTryAcquireGIL(void);
|
||||||
void moduleReleaseGIL(void);
|
void moduleReleaseGIL(void);
|
||||||
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
|
||||||
void moduleCallCommandFilters(client *c);
|
void moduleCallCommandFilters(client *c);
|
||||||
|
@ -23,7 +23,9 @@ TEST_MODULES = \
|
|||||||
scan.so \
|
scan.so \
|
||||||
datatype.so \
|
datatype.so \
|
||||||
auth.so \
|
auth.so \
|
||||||
keyspace_events.so
|
keyspace_events.so \
|
||||||
|
blockedclient.so
|
||||||
|
|
||||||
|
|
||||||
.PHONY: all
|
.PHONY: all
|
||||||
|
|
||||||
|
82
tests/modules/blockedclient.c
Normal file
82
tests/modules/blockedclient.c
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
#define REDISMODULE_EXPERIMENTAL_API
|
||||||
|
#include "redismodule.h"
|
||||||
|
#include <assert.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
#define UNUSED(V) ((void) V)
|
||||||
|
|
||||||
|
void *sub_worker(void *arg) {
|
||||||
|
// Get Redis module context
|
||||||
|
RedisModuleCtx *ctx = (RedisModuleCtx *)arg;
|
||||||
|
|
||||||
|
// Try acquiring GIL
|
||||||
|
int res = RedisModule_ThreadSafeContextTryLock(ctx);
|
||||||
|
|
||||||
|
// GIL is already taken by the calling thread expecting to fail.
|
||||||
|
assert(res != REDISMODULE_OK);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *worker(void *arg) {
|
||||||
|
// Retrieve blocked client
|
||||||
|
RedisModuleBlockedClient *bc = (RedisModuleBlockedClient *)arg;
|
||||||
|
|
||||||
|
// Get Redis module context
|
||||||
|
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
|
||||||
|
|
||||||
|
// Acquire GIL
|
||||||
|
RedisModule_ThreadSafeContextLock(ctx);
|
||||||
|
|
||||||
|
// Create another thread which will try to acquire the GIL
|
||||||
|
pthread_t tid;
|
||||||
|
int res = pthread_create(&tid, NULL, sub_worker, ctx);
|
||||||
|
assert(res == 0);
|
||||||
|
|
||||||
|
// Wait for thread
|
||||||
|
pthread_join(tid, NULL);
|
||||||
|
|
||||||
|
// Release GIL
|
||||||
|
RedisModule_ThreadSafeContextUnlock(ctx);
|
||||||
|
|
||||||
|
// Reply to client
|
||||||
|
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||||
|
|
||||||
|
// Unblock client
|
||||||
|
RedisModule_UnblockClient(bc, NULL);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
|
{
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
|
||||||
|
/* This command handler tries to acquire the GIL twice
|
||||||
|
* once in the worker thread using "RedisModule_ThreadSafeContextLock"
|
||||||
|
* second in the sub-worker thread
|
||||||
|
* using "RedisModule_ThreadSafeContextTryLock"
|
||||||
|
* as the GIL is already locked. */
|
||||||
|
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
||||||
|
|
||||||
|
pthread_t tid;
|
||||||
|
int res = pthread_create(&tid, NULL, worker, bc);
|
||||||
|
assert(res == 0);
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
|
||||||
|
if (RedisModule_Init(ctx, "blockedclient", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
11
tests/unit/moduleapi/blockedclient.tcl
Normal file
11
tests/unit/moduleapi/blockedclient.tcl
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
# source tests/support/util.tcl
|
||||||
|
|
||||||
|
set testmodule [file normalize tests/modules/blockedclient.so]
|
||||||
|
|
||||||
|
start_server {tags {"modules"}} {
|
||||||
|
r module load $testmodule
|
||||||
|
|
||||||
|
test {Locked GIL acquisition} {
|
||||||
|
assert_match "OK" [r acquire_gil]
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user