Fix race condition issues between the main thread and module threads (#12817)

Fix #12785 and other race condition issues.
See the following isolated comments.

The following report was obtained using SANITIZER thread.
```sh
make SANITIZER=thread
./runtest-moduleapi --config io-threads 4 --config io-threads-do-reads yes --accurate
```

1. Fixed thread-safe issue in RM_UnblockClient()
Related discussion:
https://github.com/redis/redis/pull/12817#issuecomment-1831181220
* When blocking a client in a module using `RM_BlockClientOnKeys()` or
`RM_BlockClientOnKeysWithFlags()`
with a timeout_callback, calling RM_UnblockClient() in module threads
can lead to race conditions
     in `updateStatsOnUnblock()`.

     - Introduced: 
        Version: 6.2
        PR: #7491

     - Touch:
`server.stat_numcommands`, `cmd->latency_histogram`, `server.slowlog`,
and `server.latency_events`
     
     - Harm Level: High
Potentially corrupts the memory data of `cmd->latency_histogram`,
`server.slowlog`, and `server.latency_events`

     - Solution:
Differentiate whether the call to moduleBlockedClientTimedOut() comes
from the module or the main thread.
Since we can't know if RM_UnblockClient() comes from module threads, we
always assume it does and
let `updateStatsOnUnblock()` asynchronously update the unblock status.
     
* When error reply is called in timeout_callback(), ctx is not
thread-safe, eventually lead to race conditions in `afterErrorReply`.

     - Introduced: 
        Version: 6.2
        PR: #8217

     - Touch
       `server.stat_total_error_replies`, `server.errors`, 

     - Harm Level: High
       Potentially corrupts the memory data of `server.errors`
   
      - Solution: 
Make the ctx in `timeout_callback()` with `REDISMODULE_CTX_THREAD_SAFE`,
and asynchronously reply errors to the client.

2. Made RM_Reply*() family API thread-safe
Related discussion:
https://github.com/redis/redis/pull/12817#discussion_r1408707239
Call chain: `RM_Reply*()` -> `_addReplyToBufferOrList()` -> touch
server.current_client

    - Introduced: 
       Version: 7.2.0
       PR: #12326

   - Harm Level: None
Since the module fake client won't have the `CLIENT_PUSHING` flag, even
if we touch server.current_client,
     we can still exit after `c->flags & CLIENT_PUSHING`.

   - Solution
      Checking `c->flags & CLIENT_PUSHING` earlier.

3. Made freeClient() thread-safe
    Fix #12785

    - Introduced: 
       Version: 4.0
Commit:
3fcf959e60

    - Harm Level: Moderate
       * Trigger assertion
It happens when the module thread calls freeClient while the io-thread
is in progress,
which just triggers an assertion, and doesn't make any race condiaions.

* Touch `server.current_client`, `server.stat_clients_type_memory`, and
`clientMemUsageBucket->clients`.
It happens between the main thread and the module threads, may cause
data corruption.
1. Error reset `server.current_client` to NULL, but theoretically this
won't happen,
because the module has already reset `server.current_client` to old
value before entering freeClient.
2. corrupts `clientMemUsageBucket->clients` in
updateClientMemUsageAndBucket().
3. Causes server.stat_clients_type_memory memory statistics to be
inaccurate.
    
    - Solution:
* No longer counts memory usage on fake clients, to avoid updating
`server.stat_clients_type_memory` in freeClient.
* No longer resetting `server.current_client` in unlinkClient, because
the fake client won't be evicted or disconnected in the mid of the
process.
* Judgment assertion `io_threads_op == IO_THREADS_OP_IDLE` only if c is
not a fake client.

4. Fixed free client args without GIL
Related discussion:
https://github.com/redis/redis/pull/12817#discussion_r1408706695
When freeing retained strings in the module thread (refcount decr), or
using them in some way (refcount incr), we should do so while holding
the GIL,
otherwise, they might be simultaneously freed while the main thread is
processing the unblock client state.

    - Introduced: 
       Version: 6.2.0
       PR: #8141

   - Harm Level: Low
     Trigger assertion or double free or memory leak. 

   - Solution:
Documenting that module API users need to ensure any access to these
retained strings is done with the GIL locked

5. Fix adding fake client to server.clients_pending_write
    It will incorrectly log the memory usage for the fake client.
Related discussion:
https://github.com/redis/redis/pull/12817#issuecomment-1851899163

    - Introduced: 
       Version: 4.0
Commit:
9b01b64430

    - Harm Level: None
      Only result in NOP

    - Solution:
       * Don't add fake client into server.clients_pending_write
* Add c->conn assertion for updateClientMemUsageAndBucket() and
updateClientMemoryUsage() to avoid same
         issue in the future.
So now it will be the responsibility of the caller of both of them to
avoid passing in fake client.

6. Fix calling RM_BlockedClientMeasureTimeStart() and
RM_BlockedClientMeasureTimeEnd() without GIL
    - Introduced: 
       Version: 6.2
       PR: #7491

   - Harm Level: Low
Causes inaccuracies in command latency histogram and slow logs, but does
not corrupt memory.

   - Solution:
Module API users, if know that non-thread-safe APIs will be used in
multi-threading, need to take responsibility for protecting them with
their own locks instead of the GIL, as using the GIL is too expensive.

### Other issue
1. RM_Yield is not thread-safe, fixed via #12905.

### Summarize
1. Fix thread-safe issues for `RM_UnblockClient()`, `freeClient()` and
`RM_Yield`, potentially preventing memory corruption, data disorder, or
assertion.
2. Updated docs and module test to clarify module API users'
responsibility for locking non-thread-safe APIs in multi-threading, such
as RM_BlockedClientMeasureTimeStart/End(), RM_FreeString(),
RM_RetainString(), and RM_HoldString().

### About backpot to 7.2
1. The implement of (1) is not too satisfying, would like to get more
eyes.
2. (2), (3) can be safely for backport
3. (4), (6) just modifying the module tests and updating the
documentation, no need for a backpot.
4. (5) is harmless, no need for a backpot.

---------

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
debing.sun 2024-01-19 21:12:49 +08:00 committed by GitHub
parent f81c3fd89e
commit d0640029dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 130 additions and 58 deletions

View File

@ -239,7 +239,7 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c);
moduleBlockedClientTimedOut(c, 0);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}

View File

@ -306,7 +306,6 @@ static size_t moduleTempClientMinCount = 0; /* Min client count in pool since
* allow thread safe contexts to execute commands at a safe moment. */
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
/* Function pointer type for keyspace event notification subscriptions from modules. */
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
@ -2338,7 +2337,10 @@ ustime_t RM_CachedMicroseconds(void) {
* Within the same command, you can call multiple times
* RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd()
* to accumulate independent time intervals to the background duration.
* This method always return REDISMODULE_OK. */
* This method always return REDISMODULE_OK.
*
* This function is not thread safe, If used in module thread and blocked callback (possibly main thread)
* simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */
int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
elapsedStart(&(bc->background_timer));
return REDISMODULE_OK;
@ -2348,7 +2350,10 @@ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
* to calculate the elapsed execution time.
* On success REDISMODULE_OK is returned.
* This method only returns REDISMODULE_ERR if no start time was
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ).
*
* This function is not thread safe, If used in module thread and blocked callback (possibly main thread)
* simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */
int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
// If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart
if (!bc->background_timer)
@ -2717,7 +2722,10 @@ RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisM
* pass ctx as NULL when releasing the string (but passing a context will not
* create any issue). Strings created with a context should be freed also passing
* the context, so if you want to free a string out of context later, make sure
* to create it using a NULL context. */
* to create it using a NULL context.
*
* This API is not thread safe, access to these retained strings (if they originated
* from a client command arguments) must be done with GIL locked. */
void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
decrRefCount(str);
if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str);
@ -2754,7 +2762,10 @@ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
*
* Threaded modules that reference retained strings from other threads *must*
* explicitly trim the allocation as soon as the string is retained. Not doing
* so may result with automatic trimming which is not thread safe. */
* so may result with automatic trimming which is not thread safe.
*
* This API is not thread safe, access to these retained strings (if they originated
* from a client command arguments) must be done with GIL locked. */
void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) {
/* Increment the string reference counting only if we can't
@ -2796,7 +2807,10 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
*
* Threaded modules that reference held strings from other threads *must*
* explicitly trim the allocation as soon as the string is held. Not doing
* so may result with automatic trimming which is not thread safe. */
* so may result with automatic trimming which is not thread safe.
*
* This API is not thread safe, access to these retained strings (if they originated
* from a client command arguments) must be done with GIL locked. */
RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) {
if (str->refcount == OBJ_STATIC_REFCOUNT) {
return RM_CreateStringFromString(ctx, str);
@ -8228,7 +8242,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
* argument, but better to be safe than sorry. */
if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
if (bc->unblocked) return REDISMODULE_OK;
if (bc->client) moduleBlockedClientTimedOut(bc->client);
if (bc->client) moduleBlockedClientTimedOut(bc->client, 1);
}
moduleUnblockClientByHandle(bc,privdata);
return REDISMODULE_OK;
@ -8327,8 +8341,10 @@ void moduleHandleBlockedClients(void) {
* This needs to be out of the reply callback above given that a
* module might not define any callback and still do blocking ops.
*/
if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) {
updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies);
if (c && !clientHasModuleAuthInProgress(c)) {
int had_errors = c->deferred_reply_errors ? !!listLength(c->deferred_reply_errors) :
(server.stat_total_error_replies != prev_error_replies);
updateStatsOnUnblock(c, bc->background_duration, reply_us, had_errors);
}
if (c != NULL) {
@ -8346,7 +8362,7 @@ void moduleHandleBlockedClients(void) {
* if there are pending replies here. This is needed since
* during a non blocking command the client may receive output. */
if (!clientHasModuleAuthInProgress(c) && clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE))
!(c->flags & CLIENT_PENDING_WRITE) && c->conn)
{
c->flags |= CLIENT_PENDING_WRITE;
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
@ -8381,8 +8397,15 @@ int moduleBlockedClientMayTimeout(client *c) {
/* Called when our client timed out. After this function unblockClient()
* is called, and it will invalidate the blocked client. So this function
* does not need to do any cleanup. Eventually the module will call the
* API to unblock the client and the memory will be released. */
void moduleBlockedClientTimedOut(client *c) {
* API to unblock the client and the memory will be released.
*
* If this function is called from a module, we handle the timeout callback
* and the update of the unblock status in a thread-safe manner to avoid race
* conditions with the main thread.
* If this function is called from the main thread, we must handle the unblocking
* of the client synchronously. This ensures that we can reply to the client before
* resetClient() is called. */
void moduleBlockedClientTimedOut(client *c, int from_module) {
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
/* Protect against re-processing: don't serve clients that are already
@ -8391,14 +8414,22 @@ void moduleBlockedClientTimedOut(client *c) {
if (bc->unblocked) return;
RedisModuleCtx ctx;
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT);
int flags = REDISMODULE_CTX_BLOCKED_TIMEOUT;
if (from_module) flags |= REDISMODULE_CTX_THREAD_SAFE;
moduleCreateContext(&ctx, bc->module, flags);
ctx.client = bc->client;
ctx.blocked_client = bc;
ctx.blocked_privdata = bc->privdata;
long long prev_error_replies = server.stat_total_error_replies;
long long prev_error_replies;
if (!from_module)
prev_error_replies = server.stat_total_error_replies;
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
moduleFreeContext(&ctx);
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);
if (!from_module)
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);
/* For timeout events, we do not want to call the disconnect callback,
* because the blocked client will be automatically disconnected in

View File

@ -414,8 +414,9 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
* to a channel which we are subscribed to, then we wanna postpone that message to be added
* after the command's reply (specifically important during multi-exec). the exception is
* the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
* The check for executing_client also avoids affecting push messages that are part of eviction. */
if (c == server.current_client && (c->flags & CLIENT_PUSHING) &&
* The check for executing_client also avoids affecting push messages that are part of eviction.
* Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */
if ((c->flags & CLIENT_PUSHING) && c == server.current_client &&
server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd))
{
_addReplyProtoToList(c,server.pending_push_messages,s,len);
@ -1450,7 +1451,7 @@ void unlinkClient(client *c) {
listNode *ln;
/* If this is marked as current client unset it. */
if (server.current_client == c) server.current_client = NULL;
if (c->conn && server.current_client == c) server.current_client = NULL;
/* Certain operations must be done only if the client has an active connection.
* If the client was already unlinked or if it's a "fake client" the
@ -1494,7 +1495,7 @@ void unlinkClient(client *c) {
}
/* Remove from the list of pending reads if needed. */
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
serverAssert(!c->conn || io_threads_op == IO_THREADS_OP_IDLE);
if (c->pending_read_list_node != NULL) {
listDelNode(server.clients_pending_read,c->pending_read_list_node);
c->pending_read_list_node = NULL;
@ -1649,6 +1650,12 @@ void freeClient(client *c) {
reqresReset(c, 1);
#endif
/* Remove the contribution that this client gave to our
* incrementally computed memory usage. */
if (c->conn)
server.stat_clients_type_memory[c->last_memory_type] -=
c->last_memory_usage;
/* Unlink the client: this will close the socket, remove the I/O
* handlers, and remove references of the client from different
* places where active clients may be referenced. */
@ -1697,10 +1704,6 @@ void freeClient(client *c) {
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
/* Remove the contribution that this client gave to our
* incrementally computed memory usage. */
server.stat_clients_type_memory[c->last_memory_type] -=
c->last_memory_usage;
/* Remove client from memory usage buckets */
if (c->mem_usage_bucket) {
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
@ -2487,7 +2490,7 @@ int processCommandAndResetClient(client *c) {
commandProcessed(c);
/* Update the client's memory to include output buffer growth following the
* processed command. */
updateClientMemUsageAndBucket(c);
if (c->conn) updateClientMemUsageAndBucket(c);
}
if (server.current_client == NULL) deadclient = 1;

View File

@ -994,6 +994,7 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) {
* usage bucket.
*/
void updateClientMemoryUsage(client *c) {
serverAssert(c->conn);
size_t mem = getClientMemoryUsage(c, NULL);
int type = getClientType(c);
/* Now that we have the memory used by the client, remove the old
@ -1006,7 +1007,7 @@ void updateClientMemoryUsage(client *c) {
}
int clientEvictionAllowed(client *c) {
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT || !c->conn) {
return 0;
}
int type = getClientType(c);
@ -1046,7 +1047,7 @@ void removeClientFromMemUsageBucket(client *c, int allow_eviction) {
* returns 1 if client eviction for this client is allowed, 0 otherwise.
*/
int updateClientMemUsageAndBucket(client *c) {
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
serverAssert(io_threads_op == IO_THREADS_OP_IDLE && c->conn);
int allow_eviction = clientEvictionAllowed(c);
removeClientFromMemUsageBucket(c, allow_eviction);

View File

@ -2532,7 +2532,7 @@ void moduleFreeContext(struct RedisModuleCtx *ctx);
void moduleCallCommandUnblockedHandler(client *c);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientTimedOut(client *c, int from_module);
void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void);
void moduleAcquireGIL(void);

View File

@ -102,6 +102,7 @@ typedef struct {
void *bg_call_worker(void *arg) {
bg_call_data *bg = arg;
RedisModuleBlockedClient *bc = bg->bc;
// Get Redis module context
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
@ -135,6 +136,12 @@ void *bg_call_worker(void *arg) {
RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + cmd_pos + 1, bg->argc - cmd_pos - 1);
RedisModule_FreeString(NULL, format_redis_str);
/* Free the arguments within GIL to prevent simultaneous freeing in main thread. */
for (int i=0; i<bg->argc; i++)
RedisModule_FreeString(ctx, bg->argv[i]);
RedisModule_Free(bg->argv);
RedisModule_Free(bg);
// Release GIL
RedisModule_ThreadSafeContextUnlock(ctx);
@ -147,13 +154,7 @@ void *bg_call_worker(void *arg) {
}
// Unblock client
RedisModule_UnblockClient(bg->bc, NULL);
/* Free the arguments */
for (int i=0; i<bg->argc; i++)
RedisModule_FreeString(ctx, bg->argv[i]);
RedisModule_Free(bg->argv);
RedisModule_Free(bg);
RedisModule_UnblockClient(bc, NULL);
// Free the Redis module context
RedisModule_FreeThreadSafeContext(ctx);

View File

@ -7,12 +7,41 @@
#define UNUSED(x) (void)(x)
typedef struct {
/* Mutex for protecting RedisModule_BlockedClientMeasureTime*() API from race
* conditions due to timeout callback triggered in the main thread. */
pthread_mutex_t measuretime_mutex;
int measuretime_completed; /* Indicates that time measure has ended and will not continue further */
int myint; /* Used for replying */
} BlockPrivdata;
void blockClientPrivdataInit(RedisModuleBlockedClient *bc) {
BlockPrivdata *block_privdata = RedisModule_Calloc(1, sizeof(*block_privdata));
block_privdata->measuretime_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
RedisModule_BlockClientSetPrivateData(bc, block_privdata);
}
void blockClientMeasureTimeStart(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata) {
pthread_mutex_lock(&block_privdata->measuretime_mutex);
RedisModule_BlockedClientMeasureTimeStart(bc);
pthread_mutex_unlock(&block_privdata->measuretime_mutex);
}
void blockClientMeasureTimeEnd(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata, int completed) {
pthread_mutex_lock(&block_privdata->measuretime_mutex);
if (!block_privdata->measuretime_completed) {
RedisModule_BlockedClientMeasureTimeEnd(bc);
if (completed) block_privdata->measuretime_completed = 1;
}
pthread_mutex_unlock(&block_privdata->measuretime_mutex);
}
/* Reply callback for blocking command BLOCK.DEBUG */
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
return RedisModule_ReplyWithLongLong(ctx,*myint);
BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx);
return RedisModule_ReplyWithLongLong(ctx,block_privdata->myint);
}
/* Timeout callback for blocking command BLOCK.DEBUG */
@ -20,13 +49,16 @@ int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
UNUSED(argv);
UNUSED(argc);
RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
RedisModule_BlockedClientMeasureTimeEnd(bc);
BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx);
blockClientMeasureTimeEnd(bc, block_privdata, 1);
return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
}
/* Private data freeing callback for BLOCK.DEBUG command. */
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
UNUSED(ctx);
BlockPrivdata *block_privdata = privdata;
pthread_mutex_destroy(&block_privdata->measuretime_mutex);
RedisModule_Free(privdata);
}
@ -42,19 +74,20 @@ void *BlockDebug_ThreadMain(void *arg) {
RedisModuleBlockedClient *bc = targ[0];
long long delay = (unsigned long)targ[1];
long long enable_time_track = (unsigned long)targ[2];
BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc);
if (enable_time_track)
RedisModule_BlockedClientMeasureTimeStart(bc);
blockClientMeasureTimeStart(bc, block_privdata);
RedisModule_Free(targ);
struct timespec ts;
ts.tv_sec = delay / 1000;
ts.tv_nsec = (delay % 1000) * 1000000;
nanosleep(&ts, NULL);
int *r = RedisModule_Alloc(sizeof(int));
*r = rand();
if (enable_time_track)
RedisModule_BlockedClientMeasureTimeEnd(bc);
RedisModule_UnblockClient(bc,r);
blockClientMeasureTimeEnd(bc, block_privdata, 0);
block_privdata->myint = rand();
RedisModule_UnblockClient(bc,block_privdata);
return NULL;
}
@ -64,23 +97,22 @@ void *DoubleBlock_ThreadMain(void *arg) {
void **targ = arg;
RedisModuleBlockedClient *bc = targ[0];
long long delay = (unsigned long)targ[1];
RedisModule_BlockedClientMeasureTimeStart(bc);
BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc);
blockClientMeasureTimeStart(bc, block_privdata);
RedisModule_Free(targ);
struct timespec ts;
ts.tv_sec = delay / 1000;
ts.tv_nsec = (delay % 1000) * 1000000;
nanosleep(&ts, NULL);
int *r = RedisModule_Alloc(sizeof(int));
*r = rand();
RedisModule_BlockedClientMeasureTimeEnd(bc);
blockClientMeasureTimeEnd(bc, block_privdata, 0);
/* call again RedisModule_BlockedClientMeasureTimeStart() and
* RedisModule_BlockedClientMeasureTimeEnd and ensure that the
* total execution time is 2x the delay. */
RedisModule_BlockedClientMeasureTimeStart(bc);
blockClientMeasureTimeStart(bc, block_privdata);
nanosleep(&ts, NULL);
RedisModule_BlockedClientMeasureTimeEnd(bc);
RedisModule_UnblockClient(bc,r);
blockClientMeasureTimeEnd(bc, block_privdata, 0);
block_privdata->myint = rand();
RedisModule_UnblockClient(bc,block_privdata);
return NULL;
}
@ -107,6 +139,7 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
blockClientPrivdataInit(bc);
/* Here we set a disconnection handler, however since this module will
* block in sleep() in a thread, there is not much we can do in the
@ -148,6 +181,7 @@ int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **a
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
blockClientPrivdataInit(bc);
/* Here we set a disconnection handler, however since this module will
* block in sleep() in a thread, there is not much we can do in the
@ -184,6 +218,7 @@ int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);
blockClientPrivdataInit(bc);
/* Now that we setup a blocking client, we need to pass the control
* to the thread. However we need to pass arguments to the thread:

View File

@ -115,6 +115,7 @@ typedef struct {
void *bg_call_worker(void *arg) {
bg_call_data *bg = arg;
RedisModuleBlockedClient *bc = bg->bc;
// Get Redis module context
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
@ -136,6 +137,12 @@ void *bg_call_worker(void *arg) {
RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + 3, bg->argc - 3);
RedisModule_FreeString(NULL, format_redis_str);
/* Free the arguments within GIL to prevent simultaneous freeing in main thread. */
for (int i=0; i<bg->argc; i++)
RedisModule_FreeString(ctx, bg->argv[i]);
RedisModule_Free(bg->argv);
RedisModule_Free(bg);
// Release GIL
RedisModule_ThreadSafeContextUnlock(ctx);
@ -148,13 +155,7 @@ void *bg_call_worker(void *arg) {
}
// Unblock client
RedisModule_UnblockClient(bg->bc, NULL);
/* Free the arguments */
for (int i=0; i<bg->argc; i++)
RedisModule_FreeString(ctx, bg->argv[i]);
RedisModule_Free(bg->argv);
RedisModule_Free(bg);
RedisModule_UnblockClient(bc, NULL);
// Free the Redis module context
RedisModule_FreeThreadSafeContext(ctx);