Handle output buffer limits for Module blocked clients (#8141)
Module blocked clients cache the response in a temporary client, the reply list in this client would be affected by the recent fix in #7202, but when the reply is later copied into the real client, it would have bypassed all the checks for output buffer limit, which would have resulted in both: responding with a partial response to the client, and also not disconnecting it at all. (cherry picked from commit 48efc25f749c3620f9245786582ac76cb40e9bf4)
This commit is contained in:
parent
785851a736
commit
ec74ae7ec1
@ -260,6 +260,9 @@ int prepareClientToWrite(client *c) {
|
|||||||
* Low level functions to add more data to output buffers.
|
* Low level functions to add more data to output buffers.
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
|
||||||
|
/* Attempts to add the reply to the static buffer in the client struct.
|
||||||
|
* Returns C_ERR if the buffer is full, or the reply list is not empty,
|
||||||
|
* in which case the reply must be added to the reply list. */
|
||||||
int _addReplyToBuffer(client *c, const char *s, size_t len) {
|
int _addReplyToBuffer(client *c, const char *s, size_t len) {
|
||||||
size_t available = sizeof(c->buf)-c->bufpos;
|
size_t available = sizeof(c->buf)-c->bufpos;
|
||||||
|
|
||||||
@ -277,6 +280,8 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Adds the reply to the reply linked list.
|
||||||
|
* Note: some edits to this function need to be relayed to AddReplyFromClient. */
|
||||||
void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
void _addReplyProtoToList(client *c, const char *s, size_t len) {
|
||||||
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
||||||
|
|
||||||
@ -837,14 +842,40 @@ void addReplySubcommandSyntaxError(client *c) {
|
|||||||
/* Append 'src' client output buffers into 'dst' client output buffers.
|
/* Append 'src' client output buffers into 'dst' client output buffers.
|
||||||
* This function clears the output buffers of 'src' */
|
* This function clears the output buffers of 'src' */
|
||||||
void AddReplyFromClient(client *dst, client *src) {
|
void AddReplyFromClient(client *dst, client *src) {
|
||||||
|
/* If the source client contains a partial response due to client output
|
||||||
|
* buffer limits, propagate that to the dest rather than copy a partial
|
||||||
|
* reply. We don't wanna run the risk of copying partial response in case
|
||||||
|
* for some reason the output limits don't reach the same decision (maybe
|
||||||
|
* they changed) */
|
||||||
|
if (src->flags & CLIENT_CLOSE_ASAP) {
|
||||||
|
sds client = catClientInfoString(sdsempty(),dst);
|
||||||
|
freeClientAsync(dst);
|
||||||
|
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
|
||||||
|
sdsfree(client);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* First add the static buffer (either into the static buffer or reply list) */
|
||||||
|
addReplyProto(dst,src->buf, src->bufpos);
|
||||||
|
|
||||||
|
/* We need to check with prepareClientToWrite again (after addReplyProto)
|
||||||
|
* since addReplyProto may have changed something (like CLIENT_CLOSE_ASAP) */
|
||||||
if (prepareClientToWrite(dst) != C_OK)
|
if (prepareClientToWrite(dst) != C_OK)
|
||||||
return;
|
return;
|
||||||
addReplyProto(dst,src->buf, src->bufpos);
|
|
||||||
|
/* We're bypassing _addReplyProtoToList, so we need to add the pre/post
|
||||||
|
* checks in it. */
|
||||||
|
if (dst->flags & CLIENT_CLOSE_AFTER_REPLY) return;
|
||||||
|
|
||||||
|
/* Concatenate the reply list into the dest */
|
||||||
if (listLength(src->reply))
|
if (listLength(src->reply))
|
||||||
listJoin(dst->reply,src->reply);
|
listJoin(dst->reply,src->reply);
|
||||||
dst->reply_bytes += src->reply_bytes;
|
dst->reply_bytes += src->reply_bytes;
|
||||||
src->reply_bytes = 0;
|
src->reply_bytes = 0;
|
||||||
src->bufpos = 0;
|
src->bufpos = 0;
|
||||||
|
|
||||||
|
/* Check output buffer limits */
|
||||||
|
asyncCloseClientOnOutputBufferLimitReached(dst);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Copy 'src' client output buffers into 'dst' client output buffers.
|
/* Copy 'src' client output buffers into 'dst' client output buffers.
|
||||||
|
@ -79,6 +79,105 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
RedisModuleString **argv;
|
||||||
|
int argc;
|
||||||
|
RedisModuleBlockedClient *bc;
|
||||||
|
} bg_call_data;
|
||||||
|
|
||||||
|
void *bg_call_worker(void *arg) {
|
||||||
|
bg_call_data *bg = arg;
|
||||||
|
|
||||||
|
// Get Redis module context
|
||||||
|
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
|
||||||
|
|
||||||
|
// Acquire GIL
|
||||||
|
RedisModule_ThreadSafeContextLock(ctx);
|
||||||
|
|
||||||
|
// Call the command
|
||||||
|
const char* cmd = RedisModule_StringPtrLen(bg->argv[1], NULL);
|
||||||
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", bg->argv + 2, bg->argc - 2);
|
||||||
|
|
||||||
|
// Release GIL
|
||||||
|
RedisModule_ThreadSafeContextUnlock(ctx);
|
||||||
|
|
||||||
|
// Reply to client
|
||||||
|
if (!rep) {
|
||||||
|
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
||||||
|
} else {
|
||||||
|
RedisModule_ReplyWithCallReply(ctx, rep);
|
||||||
|
RedisModule_FreeCallReply(rep);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unblock client
|
||||||
|
RedisModule_UnblockClient(bg->bc, NULL);
|
||||||
|
|
||||||
|
/* Free the arguments */
|
||||||
|
for (int i=0; i<bg->argc; i++)
|
||||||
|
RedisModule_FreeString(ctx, bg->argv[i]);
|
||||||
|
RedisModule_Free(bg->argv);
|
||||||
|
RedisModule_Free(bg);
|
||||||
|
|
||||||
|
// Free the Redis module context
|
||||||
|
RedisModule_FreeThreadSafeContext(ctx);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||||
|
{
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
|
||||||
|
/* Make sure we're not trying to block a client when we shouldn't */
|
||||||
|
int flags = RedisModule_GetContextFlags(ctx);
|
||||||
|
int allFlags = RedisModule_GetContextFlagsAll();
|
||||||
|
if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
|
||||||
|
(flags & REDISMODULE_CTX_FLAGS_MULTI)) {
|
||||||
|
RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Make a copy of the arguments and pass them to the thread. */
|
||||||
|
bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));
|
||||||
|
bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);
|
||||||
|
bg->argc = argc;
|
||||||
|
for (int i=0; i<argc; i++)
|
||||||
|
bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);
|
||||||
|
|
||||||
|
/* Block the client */
|
||||||
|
bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
||||||
|
|
||||||
|
/* Start a thread to handle the request */
|
||||||
|
pthread_t tid;
|
||||||
|
int res = pthread_create(&tid, NULL, bg_call_worker, bg);
|
||||||
|
assert(res == 0);
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
|
||||||
|
if(argc < 2){
|
||||||
|
return RedisModule_WrongArity(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
|
||||||
|
|
||||||
|
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2);
|
||||||
|
if(!rep){
|
||||||
|
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
||||||
|
}else{
|
||||||
|
RedisModule_ReplyWithCallReply(ctx, rep);
|
||||||
|
RedisModule_FreeCallReply(rep);
|
||||||
|
}
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
REDISMODULE_NOT_USED(argv);
|
REDISMODULE_NOT_USED(argv);
|
||||||
REDISMODULE_NOT_USED(argc);
|
REDISMODULE_NOT_USED(argc);
|
||||||
@ -89,5 +188,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||||||
if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
|
if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
@ -14,4 +14,17 @@ start_server {tags {"modules"}} {
|
|||||||
r acquire_gil
|
r acquire_gil
|
||||||
assert_equal {{Blocked client is not supported inside multi}} [r exec]
|
assert_equal {{Blocked client is not supported inside multi}} [r exec]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {blocked client reaches client output buffer limit} {
|
||||||
|
r hset hash big [string repeat x 50000]
|
||||||
|
r hset hash bada [string repeat x 50000]
|
||||||
|
r hset hash boom [string repeat x 50000]
|
||||||
|
r config set client-output-buffer-limit {normal 100000 0 0}
|
||||||
|
r client setname myclient
|
||||||
|
catch {r do_bg_rm_call hgetall hash} e
|
||||||
|
assert_match "*I/O error*" $e
|
||||||
|
reconnect
|
||||||
|
set clients [r client list]
|
||||||
|
assert_no_match "*name=myclient*" $clients
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user