From ec74ae7ec1e758aa98c452296c2fa75b6ad9219f Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 8 Dec 2020 16:41:20 +0200 Subject: [PATCH] Handle output buffer limits for Module blocked clients (#8141) Module blocked clients cache the response in a temporary client, the reply list in this client would be affected by the recent fix in #7202, but when the reply is later copied into the real client, it would have bypassed all the checks for output buffer limit, which would have resulted in both: responding with a partial response to the client, and also not disconnecting it at all. (cherry picked from commit 48efc25f749c3620f9245786582ac76cb40e9bf4) --- src/networking.c | 33 +++++++- tests/modules/blockedclient.c | 105 +++++++++++++++++++++++++ tests/unit/moduleapi/blockedclient.tcl | 13 +++ 3 files changed, 150 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 54de8ac54..9150426a5 100644 --- a/src/networking.c +++ b/src/networking.c @@ -260,6 +260,9 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ +/* Attempts to add the reply to the static buffer in the client struct. + * Returns C_ERR if the buffer is full, or the reply list is not empty, + * in which case the reply must be added to the reply list. */ int _addReplyToBuffer(client *c, const char *s, size_t len) { size_t available = sizeof(c->buf)-c->bufpos; @@ -277,6 +280,8 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) { return C_OK; } +/* Adds the reply to the reply linked list. + * Note: some edits to this function need to be relayed to AddReplyFromClient. */ void _addReplyProtoToList(client *c, const char *s, size_t len) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; @@ -837,14 +842,40 @@ void addReplySubcommandSyntaxError(client *c) { /* Append 'src' client output buffers into 'dst' client output buffers. * This function clears the output buffers of 'src' */ void AddReplyFromClient(client *dst, client *src) { + /* If the source client contains a partial response due to client output + * buffer limits, propagate that to the dest rather than copy a partial + * reply. We don't wanna run the risk of copying partial response in case + * for some reason the output limits don't reach the same decision (maybe + * they changed) */ + if (src->flags & CLIENT_CLOSE_ASAP) { + sds client = catClientInfoString(sdsempty(),dst); + freeClientAsync(dst); + serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); + sdsfree(client); + return; + } + + /* First add the static buffer (either into the static buffer or reply list) */ + addReplyProto(dst,src->buf, src->bufpos); + + /* We need to check with prepareClientToWrite again (after addReplyProto) + * since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */ if (prepareClientToWrite(dst) != C_OK) return; - addReplyProto(dst,src->buf, src->bufpos); + + /* We're bypassing _addReplyProtoToList, so we need to add the pre/post + * checks in it. */ + if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return; + + /* Concatenate the reply list into the dest */ if (listLength(src->reply)) listJoin(dst->reply,src->reply); dst->reply_bytes += src->reply_bytes; src->reply_bytes = 0; src->bufpos = 0; + + /* Check output buffer limits */ + asyncCloseClientOnOutputBufferLimitReached(dst); } /* Copy 'src' client output buffers into 'dst' client output buffers. diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 558e06502..1c485a44a 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -79,6 +79,105 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +typedef struct { + RedisModuleString **argv; + int argc; + RedisModuleBlockedClient *bc; +} bg_call_data; + +void *bg_call_worker(void *arg) { + bg_call_data *bg = arg; + + // Get Redis module context + RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc); + + // Acquire GIL + RedisModule_ThreadSafeContextLock(ctx); + + // Call the command + const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL); + RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2); + + // Release GIL + RedisModule_ThreadSafeContextUnlock(ctx); + + // Reply to client + if (!rep) { + RedisModule_ReplyWithError(ctx, "NULL reply returned"); + } else { + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + } + + // Unblock client + RedisModule_UnblockClient(bg->bc, NULL); + + /* Free the arguments */ + for (int i=0; iargc; i++) + RedisModule_FreeString(ctx, bg->argv[i]); + RedisModule_Free(bg->argv); + RedisModule_Free(bg); + + // Free the Redis module context + RedisModule_FreeThreadSafeContext(ctx); + + return NULL; +} + +int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + UNUSED(argv); + UNUSED(argc); + + /* Make sure we're not trying to block a client when we shouldn't */ + int flags = RedisModule_GetContextFlags(ctx); + int allFlags = RedisModule_GetContextFlagsAll(); + if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) && + (flags & REDISMODULE_CTX_FLAGS_MULTI)) { + RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi"); + return REDISMODULE_OK; + } + + /* Make a copy of the arguments and pass them to the thread. */ + bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data)); + bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc); + bg->argc = argc; + for (int i=0; iargv[i] = RedisModule_HoldString(ctx, argv[i]); + + /* Block the client */ + bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0); + + /* Start a thread to handle the request */ + pthread_t tid; + int res = pthread_create(&tid, NULL, bg_call_worker, bg); + assert(res == 0); + + return REDISMODULE_OK; +} + +int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ + UNUSED(argv); + UNUSED(argc); + + if(argc < 2){ + return RedisModule_WrongArity(ctx); + } + + const char* cmd = RedisModule_StringPtrLen(argv[1], NULL); + + RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2); + if(!rep){ + RedisModule_ReplyWithError(ctx, "NULL reply returned"); + }else{ + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + } + + return REDISMODULE_OK; +} + + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -89,5 +188,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index 5a541d138..77d94c7bf 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -14,4 +14,17 @@ start_server {tags {"modules"}} { r acquire_gil assert_equal {{Blocked client is not supported inside multi}} [r exec] } + + test {blocked client reaches client output buffer limit} { + r hset hash big [string repeat x 50000] + r hset hash bada [string repeat x 50000] + r hset hash boom [string repeat x 50000] + r config set client-output-buffer-limit {normal 100000 0 0} + r client setname myclient + catch {r do_bg_rm_call hgetall hash} e + assert_match "*I/O error*" $e + reconnect + set clients [r client list] + assert_no_match "*name=myclient*" $clients + } }