diff --git a/src/blocked.c b/src/blocked.c index 6ad4667db..3108cb677 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -239,7 +239,7 @@ void replyToBlockedClientTimedOut(client *c) { addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset); addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset)); } else if (c->bstate.btype == BLOCKED_MODULE) { - moduleBlockedClientTimedOut(c); + moduleBlockedClientTimedOut(c, 0); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } diff --git a/src/module.c b/src/module.c index a60a345ae..c54d164b7 100644 --- a/src/module.c +++ b/src/module.c @@ -306,7 +306,6 @@ static size_t moduleTempClientMinCount = 0; /* Min client count in pool since * allow thread safe contexts to execute commands at a safe moment. */ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; - /* Function pointer type for keyspace event notification subscriptions from modules. */ typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); @@ -2338,7 +2337,10 @@ ustime_t RM_CachedMicroseconds(void) { * Within the same command, you can call multiple times * RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() * to accumulate independent time intervals to the background duration. - * This method always return REDISMODULE_OK. */ + * This method always return REDISMODULE_OK. + * + * This function is not thread safe, If used in module thread and blocked callback (possibly main thread) + * simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { elapsedStart(&(bc->background_timer)); return REDISMODULE_OK; @@ -2348,7 +2350,10 @@ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { * to calculate the elapsed execution time. * On success REDISMODULE_OK is returned. 
* This method only returns REDISMODULE_ERR if no start time was - * previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */ + * previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). + * + * This function is not thread safe, If used in module thread and blocked callback (possibly main thread) + * simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) { // If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart if (!bc->background_timer) @@ -2717,7 +2722,10 @@ RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisM * pass ctx as NULL when releasing the string (but passing a context will not * create any issue). Strings created with a context should be freed also passing * the context, so if you want to free a string out of context later, make sure - * to create it using a NULL context. */ + * to create it using a NULL context. + * + * This API is not thread safe, access to these retained strings (if they originated + * from a client command arguments) must be done with GIL locked. */ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) { decrRefCount(str); if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str); @@ -2754,7 +2762,10 @@ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) { * * Threaded modules that reference retained strings from other threads *must* * explicitly trim the allocation as soon as the string is retained. Not doing - * so may result with automatic trimming which is not thread safe. */ + * so may result with automatic trimming which is not thread safe. + * + * This API is not thread safe, access to these retained strings (if they originated + * from a client command arguments) must be done with GIL locked. 
*/ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) { /* Increment the string reference counting only if we can't @@ -2796,7 +2807,10 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { * * Threaded modules that reference held strings from other threads *must* * explicitly trim the allocation as soon as the string is held. Not doing - * so may result with automatic trimming which is not thread safe. */ + * so may result with automatic trimming which is not thread safe. + * + * This API is not thread safe, access to these retained strings (if they originated + * from a client command arguments) must be done with GIL locked. */ RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) { if (str->refcount == OBJ_STATIC_REFCOUNT) { return RM_CreateStringFromString(ctx, str); @@ -8228,7 +8242,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { * argument, but better to be safe than sorry. */ if (bc->timeout_callback == NULL) return REDISMODULE_ERR; if (bc->unblocked) return REDISMODULE_OK; - if (bc->client) moduleBlockedClientTimedOut(bc->client); + if (bc->client) moduleBlockedClientTimedOut(bc->client, 1); } moduleUnblockClientByHandle(bc,privdata); return REDISMODULE_OK; @@ -8327,8 +8341,10 @@ void moduleHandleBlockedClients(void) { * This needs to be out of the reply callback above given that a * module might not define any callback and still do blocking ops. */ - if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) { - updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies); + if (c && !clientHasModuleAuthInProgress(c)) { + int had_errors = c->deferred_reply_errors ? 
!!listLength(c->deferred_reply_errors) : + (server.stat_total_error_replies != prev_error_replies); + updateStatsOnUnblock(c, bc->background_duration, reply_us, had_errors); } if (c != NULL) { @@ -8346,7 +8362,7 @@ void moduleHandleBlockedClients(void) { * if there are pending replies here. This is needed since * during a non blocking command the client may receive output. */ if (!clientHasModuleAuthInProgress(c) && clientHasPendingReplies(c) && - !(c->flags & CLIENT_PENDING_WRITE)) + !(c->flags & CLIENT_PENDING_WRITE) && c->conn) { c->flags |= CLIENT_PENDING_WRITE; listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node); @@ -8381,8 +8397,15 @@ int moduleBlockedClientMayTimeout(client *c) { /* Called when our client timed out. After this function unblockClient() * is called, and it will invalidate the blocked client. So this function * does not need to do any cleanup. Eventually the module will call the - * API to unblock the client and the memory will be released. */ -void moduleBlockedClientTimedOut(client *c) { + * API to unblock the client and the memory will be released. + * + * If this function is called from a module, we handle the timeout callback + * and the update of the unblock status in a thread-safe manner to avoid race + * conditions with the main thread. + * If this function is called from the main thread, we must handle the unblocking + * of the client synchronously. This ensures that we can reply to the client before + * resetClient() is called. 
*/ +void moduleBlockedClientTimedOut(client *c, int from_module) { RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; /* Protect against re-processing: don't serve clients that are already @@ -8391,14 +8414,22 @@ void moduleBlockedClientTimedOut(client *c) { if (bc->unblocked) return; RedisModuleCtx ctx; - moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT); + int flags = REDISMODULE_CTX_BLOCKED_TIMEOUT; + if (from_module) flags |= REDISMODULE_CTX_THREAD_SAFE; + moduleCreateContext(&ctx, bc->module, flags); ctx.client = bc->client; ctx.blocked_client = bc; ctx.blocked_privdata = bc->privdata; - long long prev_error_replies = server.stat_total_error_replies; + + long long prev_error_replies; + if (!from_module) + prev_error_replies = server.stat_total_error_replies; + bc->timeout_callback(&ctx,(void**)c->argv,c->argc); moduleFreeContext(&ctx); - updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); + + if (!from_module) + updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); /* For timeout events, we do not want to call the disconnect callback, * because the blocked client will be automatically disconnected in diff --git a/src/networking.c b/src/networking.c index c020faf89..0390092e4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -414,8 +414,9 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { * to a channel which we are subscribed to, then we wanna postpone that message to be added * after the command's reply (specifically important during multi-exec). the exception is * the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply. - * The check for executing_client also avoids affecting push messages that are part of eviction. 
*/ - if (c == server.current_client && (c->flags & CLIENT_PUSHING) && + * The check for executing_client also avoids affecting push messages that are part of eviction. + * Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */ + if ((c->flags & CLIENT_PUSHING) && c == server.current_client && server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd)) { _addReplyProtoToList(c,server.pending_push_messages,s,len); @@ -1450,7 +1451,7 @@ void unlinkClient(client *c) { listNode *ln; /* If this is marked as current client unset it. */ - if (server.current_client == c) server.current_client = NULL; + if (c->conn && server.current_client == c) server.current_client = NULL; /* Certain operations must be done only if the client has an active connection. * If the client was already unlinked or if it's a "fake client" the @@ -1494,7 +1495,7 @@ void unlinkClient(client *c) { } /* Remove from the list of pending reads if needed. */ - serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + serverAssert(!c->conn || io_threads_op == IO_THREADS_OP_IDLE); if (c->pending_read_list_node != NULL) { listDelNode(server.clients_pending_read,c->pending_read_list_node); c->pending_read_list_node = NULL; @@ -1649,6 +1650,12 @@ void freeClient(client *c) { reqresReset(c, 1); #endif + /* Remove the contribution that this client gave to our + * incrementally computed memory usage. */ + if (c->conn) + server.stat_clients_type_memory[c->last_memory_type] -= + c->last_memory_usage; + /* Unlink the client: this will close the socket, remove the I/O * handlers, and remove references of the client from different * places where active clients may be referenced. */ @@ -1697,10 +1704,6 @@ void freeClient(client *c) { * we lost the connection with the master. */ if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); - /* Remove the contribution that this client gave to our - * incrementally computed memory usage. 
*/ - server.stat_clients_type_memory[c->last_memory_type] -= - c->last_memory_usage; /* Remove client from memory usage buckets */ if (c->mem_usage_bucket) { c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; @@ -2487,7 +2490,7 @@ int processCommandAndResetClient(client *c) { commandProcessed(c); /* Update the client's memory to include output buffer growth following the * processed command. */ - updateClientMemUsageAndBucket(c); + if (c->conn) updateClientMemUsageAndBucket(c); } if (server.current_client == NULL) deadclient = 1; diff --git a/src/server.c b/src/server.c index d7707bb5a..4209db42d 100644 --- a/src/server.c +++ b/src/server.c @@ -994,6 +994,7 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) { * usage bucket. */ void updateClientMemoryUsage(client *c) { + serverAssert(c->conn); size_t mem = getClientMemoryUsage(c, NULL); int type = getClientType(c); /* Now that we have the memory used by the client, remove the old @@ -1006,7 +1007,7 @@ void updateClientMemoryUsage(client *c) { } int clientEvictionAllowed(client *c) { - if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) { + if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT || !c->conn) { return 0; } int type = getClientType(c); @@ -1046,7 +1047,7 @@ void removeClientFromMemUsageBucket(client *c, int allow_eviction) { * returns 1 if client eviction for this client is allowed, 0 otherwise. 
*/ int updateClientMemUsageAndBucket(client *c) { - serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + serverAssert(io_threads_op == IO_THREADS_OP_IDLE && c->conn); int allow_eviction = clientEvictionAllowed(c); removeClientFromMemUsageBucket(c, allow_eviction); diff --git a/src/server.h b/src/server.h index a913a1c8b..ba55e3dee 100644 --- a/src/server.h +++ b/src/server.h @@ -2532,7 +2532,7 @@ void moduleFreeContext(struct RedisModuleCtx *ctx); void moduleCallCommandUnblockedHandler(client *c); void unblockClientFromModule(client *c); void moduleHandleBlockedClients(void); -void moduleBlockedClientTimedOut(client *c); +void moduleBlockedClientTimedOut(client *c, int from_module); void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); size_t moduleCount(void); void moduleAcquireGIL(void); diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 92060fd33..23030cef4 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -102,6 +102,7 @@ typedef struct { void *bg_call_worker(void *arg) { bg_call_data *bg = arg; + RedisModuleBlockedClient *bc = bg->bc; // Get Redis module context RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc); @@ -135,6 +136,12 @@ void *bg_call_worker(void *arg) { RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + cmd_pos + 1, bg->argc - cmd_pos - 1); RedisModule_FreeString(NULL, format_redis_str); + /* Free the arguments within GIL to prevent simultaneous freeing in main thread. 
*/ + for (int i=0; i<bg->argc; i++) + RedisModule_FreeString(ctx, bg->argv[i]); + RedisModule_Free(bg->argv); + RedisModule_Free(bg); + // Release GIL RedisModule_ThreadSafeContextUnlock(ctx); @@ -147,13 +154,7 @@ void *bg_call_worker(void *arg) { } // Unblock client - RedisModule_UnblockClient(bg->bc, NULL); - - /* Free the arguments */ - for (int i=0; i<bg->argc; i++) - RedisModule_FreeString(ctx, bg->argv[i]); - RedisModule_Free(bg->argv); - RedisModule_Free(bg); + RedisModule_UnblockClient(bc, NULL); // Free the Redis module context RedisModule_FreeThreadSafeContext(ctx); diff --git a/tests/modules/blockonbackground.c b/tests/modules/blockonbackground.c index 2e3b1a557..e068e20d9 100644 --- a/tests/modules/blockonbackground.c +++ b/tests/modules/blockonbackground.c @@ -7,12 +7,41 @@ #define UNUSED(x) (void)(x) +typedef struct { + /* Mutex for protecting RedisModule_BlockedClientMeasureTime*() API from race + * conditions due to timeout callback triggered in the main thread. */ + pthread_mutex_t measuretime_mutex; + int measuretime_completed; /* Indicates that time measure has ended and will not continue further */ + int myint; /* Used for replying */ +} BlockPrivdata; + +void blockClientPrivdataInit(RedisModuleBlockedClient *bc) { + BlockPrivdata *block_privdata = RedisModule_Calloc(1, sizeof(*block_privdata)); + block_privdata->measuretime_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; + RedisModule_BlockClientSetPrivateData(bc, block_privdata); +} + +void blockClientMeasureTimeStart(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata) { + pthread_mutex_lock(&block_privdata->measuretime_mutex); + RedisModule_BlockedClientMeasureTimeStart(bc); + pthread_mutex_unlock(&block_privdata->measuretime_mutex); +} + +void blockClientMeasureTimeEnd(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata, int completed) { + pthread_mutex_lock(&block_privdata->measuretime_mutex); + if (!block_privdata->measuretime_completed) { +
RedisModule_BlockedClientMeasureTimeEnd(bc); + if (completed) block_privdata->measuretime_completed = 1; + } + pthread_mutex_unlock(&block_privdata->measuretime_mutex); +} + /* Reply callback for blocking command BLOCK.DEBUG */ int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { UNUSED(argv); UNUSED(argc); - int *myint = RedisModule_GetBlockedClientPrivateData(ctx); - return RedisModule_ReplyWithLongLong(ctx,*myint); + BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx); + return RedisModule_ReplyWithLongLong(ctx,block_privdata->myint); } /* Timeout callback for blocking command BLOCK.DEBUG */ @@ -20,13 +49,16 @@ int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) UNUSED(argv); UNUSED(argc); RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx); - RedisModule_BlockedClientMeasureTimeEnd(bc); + BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx); + blockClientMeasureTimeEnd(bc, block_privdata, 1); return RedisModule_ReplyWithSimpleString(ctx,"Request timedout"); } /* Private data freeing callback for BLOCK.DEBUG command. 
*/ void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) { UNUSED(ctx); + BlockPrivdata *block_privdata = privdata; + pthread_mutex_destroy(&block_privdata->measuretime_mutex); RedisModule_Free(privdata); } @@ -42,19 +74,20 @@ void *BlockDebug_ThreadMain(void *arg) { RedisModuleBlockedClient *bc = targ[0]; long long delay = (unsigned long)targ[1]; long long enable_time_track = (unsigned long)targ[2]; + BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc); + if (enable_time_track) - RedisModule_BlockedClientMeasureTimeStart(bc); + blockClientMeasureTimeStart(bc, block_privdata); RedisModule_Free(targ); struct timespec ts; ts.tv_sec = delay / 1000; ts.tv_nsec = (delay % 1000) * 1000000; nanosleep(&ts, NULL); - int *r = RedisModule_Alloc(sizeof(int)); - *r = rand(); if (enable_time_track) - RedisModule_BlockedClientMeasureTimeEnd(bc); - RedisModule_UnblockClient(bc,r); + blockClientMeasureTimeEnd(bc, block_privdata, 0); + block_privdata->myint = rand(); + RedisModule_UnblockClient(bc,block_privdata); return NULL; } @@ -64,23 +97,22 @@ void *DoubleBlock_ThreadMain(void *arg) { void **targ = arg; RedisModuleBlockedClient *bc = targ[0]; long long delay = (unsigned long)targ[1]; - RedisModule_BlockedClientMeasureTimeStart(bc); + BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc); + blockClientMeasureTimeStart(bc, block_privdata); RedisModule_Free(targ); struct timespec ts; ts.tv_sec = delay / 1000; ts.tv_nsec = (delay % 1000) * 1000000; nanosleep(&ts, NULL); - int *r = RedisModule_Alloc(sizeof(int)); - *r = rand(); - RedisModule_BlockedClientMeasureTimeEnd(bc); + blockClientMeasureTimeEnd(bc, block_privdata, 0); /* call again RedisModule_BlockedClientMeasureTimeStart() and * RedisModule_BlockedClientMeasureTimeEnd and ensure that the * total execution time is 2x the delay. 
*/ - RedisModule_BlockedClientMeasureTimeStart(bc); + blockClientMeasureTimeStart(bc, block_privdata); nanosleep(&ts, NULL); - RedisModule_BlockedClientMeasureTimeEnd(bc); - - RedisModule_UnblockClient(bc,r); + blockClientMeasureTimeEnd(bc, block_privdata, 0); + block_privdata->myint = rand(); + RedisModule_UnblockClient(bc,block_privdata); return NULL; } @@ -107,6 +139,7 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a pthread_t tid; RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout); + blockClientPrivdataInit(bc); /* Here we set a disconnection handler, however since this module will * block in sleep() in a thread, there is not much we can do in the @@ -148,6 +181,7 @@ int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **a pthread_t tid; RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout); + blockClientPrivdataInit(bc); /* Here we set a disconnection handler, however since this module will * block in sleep() in a thread, there is not much we can do in the @@ -184,6 +218,7 @@ int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, pthread_t tid; RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0); + blockClientPrivdataInit(bc); /* Now that we setup a blocking client, we need to pass the control * to the thread. 
However we need to pass arguments to the thread: diff --git a/tests/modules/usercall.c b/tests/modules/usercall.c index 6b23974d4..316de1eea 100644 --- a/tests/modules/usercall.c +++ b/tests/modules/usercall.c @@ -115,6 +115,7 @@ typedef struct { void *bg_call_worker(void *arg) { bg_call_data *bg = arg; + RedisModuleBlockedClient *bc = bg->bc; // Get Redis module context RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc); @@ -136,6 +137,12 @@ void *bg_call_worker(void *arg) { RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + 3, bg->argc - 3); RedisModule_FreeString(NULL, format_redis_str); + /* Free the arguments within GIL to prevent simultaneous freeing in main thread. */ + for (int i=0; i<bg->argc; i++) + RedisModule_FreeString(ctx, bg->argv[i]); + RedisModule_Free(bg->argv); + RedisModule_Free(bg); + // Release GIL RedisModule_ThreadSafeContextUnlock(ctx); @@ -148,13 +155,7 @@ void *bg_call_worker(void *arg) { } // Unblock client - RedisModule_UnblockClient(bg->bc, NULL); - - /* Free the arguments */ - for (int i=0; i<bg->argc; i++) - RedisModule_FreeString(ctx, bg->argv[i]); - RedisModule_Free(bg->argv); - RedisModule_Free(bg); + RedisModule_UnblockClient(bc, NULL); // Free the Redis module context RedisModule_FreeThreadSafeContext(ctx);