Enabled background and reply time tracking on blocked on keys/blocked on background work clients (#7491)
This commit enables tracking time of the background tasks and on replies, opening the door for properly tracking commands that rely on blocking / background work via the slowlog, latency history, and commandstats. Some notes: - The time spent blocked waiting for key changes, or blocked on synchronous replication is not accounted for. - **This commit does not affect latency tracking of commands that are non-blocking or do not have background work.** ( meaning that it all stays the same with exception to `BZPOPMIN`,`BZPOPMAX`,`BRPOP`,`BLPOP`, etc... and module's commands that rely on background threads ). - Specifically for latency history command we've added a new event class named `command-unblocking` that will enable latency monitoring on commands that spawn background threads to do the work. - For blocking commands we're now considering the total time of a command as the time spent on call() + the time spent on replying when unblocked. - For Modules commands that rely on background threads we're now considering the total time of a command as the time spent on call (main thread) + the time spent on the background thread ( if marked within `RedisModule_MeasureTimeStart()` and `RedisModule_MeasureTimeEnd()` ) + the time spent on replying (main thread) To test for this feature we've added a `unit/moduleapi/blockonbackground` test that relies on a module that blocks the client and sleeps on the background for a given time. - check blocked command that uses RedisModule_MeasureTimeStart() is tracking background time - check blocked command that uses RedisModule_MeasureTimeStart() is tracking background time even in timeout - check blocked command with multiple calls RedisModule_MeasureTimeStart() is tracking the total background time - check blocked command without calling RedisModule_MeasureTimeStart() is not reporting background time
This commit is contained in:
parent
b9a0500f16
commit
f0c5052aa8
@ -23,6 +23,7 @@ $TCLSH tests/test_helper.tcl \
|
|||||||
--single unit/moduleapi/hooks \
|
--single unit/moduleapi/hooks \
|
||||||
--single unit/moduleapi/misc \
|
--single unit/moduleapi/misc \
|
||||||
--single unit/moduleapi/blockonkeys \
|
--single unit/moduleapi/blockonkeys \
|
||||||
|
--single unit/moduleapi/blockonbackground \
|
||||||
--single unit/moduleapi/scan \
|
--single unit/moduleapi/scan \
|
||||||
--single unit/moduleapi/datatype \
|
--single unit/moduleapi/datatype \
|
||||||
--single unit/moduleapi/auth \
|
--single unit/moduleapi/auth \
|
||||||
|
@ -61,6 +61,9 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
#include "slowlog.h"
|
||||||
|
#include "latency.h"
|
||||||
|
#include "monotonic.h"
|
||||||
|
|
||||||
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto);
|
int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int wherefrom, int whereto);
|
||||||
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
|
int getListPositionFromObjectOrReply(client *c, robj *arg, int *position);
|
||||||
@ -97,6 +100,20 @@ void blockClient(client *c, int btype) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This function is called after a client has finished a blocking operation
|
||||||
|
* in order to update the total command duration, log the command into
|
||||||
|
* the Slow log if needed, and log the reply duration event if needed. */
|
||||||
|
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us){
|
||||||
|
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
|
||||||
|
c->lastcmd->microseconds += total_cmd_duration;
|
||||||
|
/* Log the command into the Slow log if needed. */
|
||||||
|
if (!(c->lastcmd->flags & CMD_SKIP_SLOWLOG)) {
|
||||||
|
slowlogPushEntryIfNeeded(c,c->argv,c->argc,total_cmd_duration);
|
||||||
|
/* Log the reply duration event. */
|
||||||
|
latencyAddSampleIfNeeded("command-unblocking",reply_us/1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* This function is called in the beforeSleep() function of the event loop
|
/* This function is called in the beforeSleep() function of the event loop
|
||||||
* in order to process the pending input buffer of clients that were
|
* in order to process the pending input buffer of clients that were
|
||||||
* unblocked after a blocking operation. */
|
* unblocked after a blocking operation. */
|
||||||
@ -264,6 +281,8 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
|||||||
if (dstkey) incrRefCount(dstkey);
|
if (dstkey) incrRefCount(dstkey);
|
||||||
unblockClient(receiver);
|
unblockClient(receiver);
|
||||||
|
|
||||||
|
monotime replyTimer;
|
||||||
|
elapsedStart(&replyTimer);
|
||||||
if (serveClientBlockedOnList(receiver,
|
if (serveClientBlockedOnList(receiver,
|
||||||
rl->key,dstkey,rl->db,value,
|
rl->key,dstkey,rl->db,value,
|
||||||
wherefrom, whereto) == C_ERR)
|
wherefrom, whereto) == C_ERR)
|
||||||
@ -272,6 +291,7 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
|
|||||||
* to also undo the POP operation. */
|
* to also undo the POP operation. */
|
||||||
listTypePush(o,value,wherefrom);
|
listTypePush(o,value,wherefrom);
|
||||||
}
|
}
|
||||||
|
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||||
|
|
||||||
if (dstkey) decrRefCount(dstkey);
|
if (dstkey) decrRefCount(dstkey);
|
||||||
decrRefCount(value);
|
decrRefCount(value);
|
||||||
@ -316,7 +336,10 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
|
|||||||
receiver->lastcmd->proc == bzpopminCommand)
|
receiver->lastcmd->proc == bzpopminCommand)
|
||||||
? ZSET_MIN : ZSET_MAX;
|
? ZSET_MIN : ZSET_MAX;
|
||||||
unblockClient(receiver);
|
unblockClient(receiver);
|
||||||
|
monotime replyTimer;
|
||||||
|
elapsedStart(&replyTimer);
|
||||||
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
|
genericZpopCommand(receiver,&rl->key,1,where,1,NULL);
|
||||||
|
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||||
zcard--;
|
zcard--;
|
||||||
|
|
||||||
/* Replicate the command. */
|
/* Replicate the command. */
|
||||||
@ -406,6 +429,8 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
monotime replyTimer;
|
||||||
|
elapsedStart(&replyTimer);
|
||||||
/* Emit the two elements sub-array consisting of
|
/* Emit the two elements sub-array consisting of
|
||||||
* the name of the stream and the data we
|
* the name of the stream and the data we
|
||||||
* extracted from it. Wrapped in a single-item
|
* extracted from it. Wrapped in a single-item
|
||||||
@ -425,6 +450,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
|||||||
streamReplyWithRange(receiver,s,&start,NULL,
|
streamReplyWithRange(receiver,s,&start,NULL,
|
||||||
receiver->bpop.xread_count,
|
receiver->bpop.xread_count,
|
||||||
0, group, consumer, noack, &pi);
|
0, group, consumer, noack, &pi);
|
||||||
|
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||||
|
|
||||||
/* Note that after we unblock the client, 'gt'
|
/* Note that after we unblock the client, 'gt'
|
||||||
* and other receiver->bpop stuff are no longer
|
* and other receiver->bpop stuff are no longer
|
||||||
@ -471,7 +497,10 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
|
|||||||
* different modules with different triggers to consider if a key
|
* different modules with different triggers to consider if a key
|
||||||
* is ready or not. This means we can't exit the loop but need
|
* is ready or not. This means we can't exit the loop but need
|
||||||
* to continue after the first failure. */
|
* to continue after the first failure. */
|
||||||
|
monotime replyTimer;
|
||||||
|
elapsedStart(&replyTimer);
|
||||||
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
|
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
|
||||||
|
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer));
|
||||||
|
|
||||||
moduleUnblockClient(receiver);
|
moduleUnblockClient(receiver);
|
||||||
}
|
}
|
||||||
|
51
src/module.c
51
src/module.c
@ -29,7 +29,9 @@
|
|||||||
|
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include "cluster.h"
|
#include "cluster.h"
|
||||||
|
#include "slowlog.h"
|
||||||
#include "rdb.h"
|
#include "rdb.h"
|
||||||
|
#include "monotonic.h"
|
||||||
#include <dlfcn.h>
|
#include <dlfcn.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/wait.h>
|
#include <sys/wait.h>
|
||||||
@ -262,6 +264,9 @@ typedef struct RedisModuleBlockedClient {
|
|||||||
int dbid; /* Database number selected by the original client. */
|
int dbid; /* Database number selected by the original client. */
|
||||||
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
|
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
|
||||||
int unblocked; /* Already on the moduleUnblocked list. */
|
int unblocked; /* Already on the moduleUnblocked list. */
|
||||||
|
monotime background_timer; /* Timer tracking the start of background work */
|
||||||
|
uint64_t background_duration; /* Current command background time duration.
|
||||||
|
Used for measuring latency of blocking cmds */
|
||||||
} RedisModuleBlockedClient;
|
} RedisModuleBlockedClient;
|
||||||
|
|
||||||
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
@ -915,6 +920,30 @@ long long RM_Milliseconds(void) {
|
|||||||
return mstime();
|
return mstime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Mark a point in time that will be used as the start time to calculate
|
||||||
|
* the elapsed execution time when RM_BlockedClientMeasureTimeEnd() is called.
|
||||||
|
* Within the same command, you can call multiple times
|
||||||
|
* RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd()
|
||||||
|
* to accummulate indepedent time intervals to the background duration.
|
||||||
|
* This method always return REDISMODULE_OK. */
|
||||||
|
int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
|
||||||
|
elapsedStart(&(bc->background_timer));
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Mark a point in time that will be used as the end time
|
||||||
|
* to calculate the elapsed execution time.
|
||||||
|
* On success REDISMODULE_OK is returned.
|
||||||
|
* This method only returns REDISMODULE_ERR if no start time was
|
||||||
|
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */
|
||||||
|
int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
|
||||||
|
// If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart
|
||||||
|
if (!bc->background_timer)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
bc->background_duration += elapsedUs(bc->background_timer);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* Set flags defining capabilities or behavior bit flags.
|
/* Set flags defining capabilities or behavior bit flags.
|
||||||
*
|
*
|
||||||
* REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
|
* REDISMODULE_OPTIONS_HANDLE_IO_ERRORS:
|
||||||
@ -5091,6 +5120,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
|||||||
bc->dbid = c->db->id;
|
bc->dbid = c->db->id;
|
||||||
bc->blocked_on_keys = keys != NULL;
|
bc->blocked_on_keys = keys != NULL;
|
||||||
bc->unblocked = 0;
|
bc->unblocked = 0;
|
||||||
|
bc->background_duration = 0;
|
||||||
c->bpop.timeout = timeout;
|
c->bpop.timeout = timeout;
|
||||||
|
|
||||||
if (islua || ismulti) {
|
if (islua || ismulti) {
|
||||||
@ -5164,6 +5194,11 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
|||||||
*
|
*
|
||||||
* In these cases, a call to RedisModule_BlockClient() will **not** block the
|
* In these cases, a call to RedisModule_BlockClient() will **not** block the
|
||||||
* client, but instead produce a specific error reply.
|
* client, but instead produce a specific error reply.
|
||||||
|
*
|
||||||
|
* Measuring background time: By default the time spent in the blocked command
|
||||||
|
* is not account for the total command duration. To include such time you should
|
||||||
|
* use RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() one,
|
||||||
|
* or multiple times within the blocking command background work.
|
||||||
*/
|
*/
|
||||||
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
||||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
|
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
|
||||||
@ -5358,6 +5393,7 @@ void moduleHandleBlockedClients(void) {
|
|||||||
* was blocked on keys (RM_BlockClientOnKeys()), because we already
|
* was blocked on keys (RM_BlockClientOnKeys()), because we already
|
||||||
* called such callback in moduleTryServeClientBlockedOnKey() when
|
* called such callback in moduleTryServeClientBlockedOnKey() when
|
||||||
* the key was signaled as ready. */
|
* the key was signaled as ready. */
|
||||||
|
uint64_t reply_us = 0;
|
||||||
if (c && !bc->blocked_on_keys && bc->reply_callback) {
|
if (c && !bc->blocked_on_keys && bc->reply_callback) {
|
||||||
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||||
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
|
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
|
||||||
@ -5366,9 +5402,19 @@ void moduleHandleBlockedClients(void) {
|
|||||||
ctx.module = bc->module;
|
ctx.module = bc->module;
|
||||||
ctx.client = bc->client;
|
ctx.client = bc->client;
|
||||||
ctx.blocked_client = bc;
|
ctx.blocked_client = bc;
|
||||||
|
monotime replyTimer;
|
||||||
|
elapsedStart(&replyTimer);
|
||||||
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
|
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
|
||||||
|
reply_us = elapsedUs(replyTimer);
|
||||||
moduleFreeContext(&ctx);
|
moduleFreeContext(&ctx);
|
||||||
}
|
}
|
||||||
|
/* Update stats now that we've finished the blocking operation.
|
||||||
|
* This needs to be out of the reply callback above given that a
|
||||||
|
* module might not define any callback and still do blocking ops.
|
||||||
|
*/
|
||||||
|
if (c && !bc->blocked_on_keys) {
|
||||||
|
updateStatsOnUnblock(c, bc->background_duration, reply_us);
|
||||||
|
}
|
||||||
|
|
||||||
/* Free privdata if any. */
|
/* Free privdata if any. */
|
||||||
if (bc->privdata && bc->free_privdata) {
|
if (bc->privdata && bc->free_privdata) {
|
||||||
@ -5432,6 +5478,9 @@ void moduleBlockedClientTimedOut(client *c) {
|
|||||||
ctx.blocked_privdata = bc->privdata;
|
ctx.blocked_privdata = bc->privdata;
|
||||||
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
|
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
|
||||||
moduleFreeContext(&ctx);
|
moduleFreeContext(&ctx);
|
||||||
|
if (!bc->blocked_on_keys) {
|
||||||
|
updateStatsOnUnblock(c, bc->background_duration, 0);
|
||||||
|
}
|
||||||
/* For timeout events, we do not want to call the disconnect callback,
|
/* For timeout events, we do not want to call the disconnect callback,
|
||||||
* because the blocked client will be automatically disconnected in
|
* because the blocked client will be automatically disconnected in
|
||||||
* this case, and the user can still hook using the timeout callback. */
|
* this case, and the user can still hook using the timeout callback. */
|
||||||
@ -9094,6 +9143,8 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(GetBlockedClientPrivateData);
|
REGISTER_API(GetBlockedClientPrivateData);
|
||||||
REGISTER_API(AbortBlock);
|
REGISTER_API(AbortBlock);
|
||||||
REGISTER_API(Milliseconds);
|
REGISTER_API(Milliseconds);
|
||||||
|
REGISTER_API(BlockedClientMeasureTimeStart);
|
||||||
|
REGISTER_API(BlockedClientMeasureTimeEnd);
|
||||||
REGISTER_API(GetThreadSafeContext);
|
REGISTER_API(GetThreadSafeContext);
|
||||||
REGISTER_API(GetDetachedThreadSafeContext);
|
REGISTER_API(GetDetachedThreadSafeContext);
|
||||||
REGISTER_API(FreeThreadSafeContext);
|
REGISTER_API(FreeThreadSafeContext);
|
||||||
|
@ -776,6 +776,8 @@ REDISMODULE_API int (*RedisModule_IsBlockedTimeoutRequest)(RedisModuleCtx *ctx)
|
|||||||
REDISMODULE_API void * (*RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
REDISMODULE_API void * (*RedisModule_GetBlockedClientPrivateData)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_GetBlockedClientHandle)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_GetBlockedClientHandle)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API int (*RedisModule_AbortBlock)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
|
REDISMODULE_API int (*RedisModule_AbortBlock)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
|
||||||
|
REDISMODULE_API int (*RedisModule_BlockedClientMeasureTimeStart)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
|
||||||
|
REDISMODULE_API int (*RedisModule_BlockedClientMeasureTimeEnd)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
|
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetThreadSafeContext)(RedisModuleBlockedClient *bc) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetDetachedThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetDetachedThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||||
REDISMODULE_API void (*RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
REDISMODULE_API void (*RedisModule_FreeThreadSafeContext)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||||
@ -1049,6 +1051,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(GetBlockedClientPrivateData);
|
REDISMODULE_GET_API(GetBlockedClientPrivateData);
|
||||||
REDISMODULE_GET_API(GetBlockedClientHandle);
|
REDISMODULE_GET_API(GetBlockedClientHandle);
|
||||||
REDISMODULE_GET_API(AbortBlock);
|
REDISMODULE_GET_API(AbortBlock);
|
||||||
|
REDISMODULE_GET_API(BlockedClientMeasureTimeStart);
|
||||||
|
REDISMODULE_GET_API(BlockedClientMeasureTimeEnd);
|
||||||
REDISMODULE_GET_API(SetDisconnectCallback);
|
REDISMODULE_GET_API(SetDisconnectCallback);
|
||||||
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
|
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
|
||||||
REDISMODULE_GET_API(NotifyKeyspaceEvent);
|
REDISMODULE_GET_API(NotifyKeyspaceEvent);
|
||||||
|
10
src/server.c
10
src/server.c
@ -3620,7 +3620,7 @@ void preventCommandReplication(client *c) {
|
|||||||
*/
|
*/
|
||||||
void call(client *c, int flags) {
|
void call(client *c, int flags) {
|
||||||
long long dirty;
|
long long dirty;
|
||||||
ustime_t start, duration;
|
monotime call_timer;
|
||||||
int client_old_flags = c->flags;
|
int client_old_flags = c->flags;
|
||||||
struct redisCommand *real_cmd = c->cmd;
|
struct redisCommand *real_cmd = c->cmd;
|
||||||
static long long prev_err_count;
|
static long long prev_err_count;
|
||||||
@ -3646,9 +3646,10 @@ void call(client *c, int flags) {
|
|||||||
dirty = server.dirty;
|
dirty = server.dirty;
|
||||||
prev_err_count = server.stat_total_error_replies;
|
prev_err_count = server.stat_total_error_replies;
|
||||||
updateCachedTime(0);
|
updateCachedTime(0);
|
||||||
start = server.ustime;
|
elapsedStart(&call_timer);
|
||||||
c->cmd->proc(c);
|
c->cmd->proc(c);
|
||||||
duration = ustime()-start;
|
const long duration = elapsedUs(call_timer);
|
||||||
|
c->duration = duration;
|
||||||
dirty = server.dirty-dirty;
|
dirty = server.dirty-dirty;
|
||||||
if (dirty < 0) dirty = 0;
|
if (dirty < 0) dirty = 0;
|
||||||
|
|
||||||
@ -3692,8 +3693,11 @@ void call(client *c, int flags) {
|
|||||||
* arguments. */
|
* arguments. */
|
||||||
robj **argv = c->original_argv ? c->original_argv : c->argv;
|
robj **argv = c->original_argv ? c->original_argv : c->argv;
|
||||||
int argc = c->original_argv ? c->original_argc : c->argc;
|
int argc = c->original_argv ? c->original_argc : c->argc;
|
||||||
|
/* If the client is blocked we will handle slowlog when it is unblocked . */
|
||||||
|
if (!(c->flags & CLIENT_BLOCKED)) {
|
||||||
slowlogPushEntryIfNeeded(c,argv,argc,duration);
|
slowlogPushEntryIfNeeded(c,argv,argc,duration);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
freeClientOriginalArgv(c);
|
freeClientOriginalArgv(c);
|
||||||
|
|
||||||
if (flags & CMD_CALL_STATS) {
|
if (flags & CMD_CALL_STATS) {
|
||||||
|
@ -881,6 +881,7 @@ typedef struct client {
|
|||||||
size_t sentlen; /* Amount of bytes already sent in the current
|
size_t sentlen; /* Amount of bytes already sent in the current
|
||||||
buffer or object being sent. */
|
buffer or object being sent. */
|
||||||
time_t ctime; /* Client creation time. */
|
time_t ctime; /* Client creation time. */
|
||||||
|
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
|
||||||
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
||||||
time_t obuf_soft_limit_reached_time;
|
time_t obuf_soft_limit_reached_time;
|
||||||
uint64_t flags; /* Client flags: CLIENT_* macros. */
|
uint64_t flags; /* Client flags: CLIENT_* macros. */
|
||||||
@ -2405,6 +2406,7 @@ void disconnectAllBlockedClients(void);
|
|||||||
void handleClientsBlockedOnKeys(void);
|
void handleClientsBlockedOnKeys(void);
|
||||||
void signalKeyAsReady(redisDb *db, robj *key, int type);
|
void signalKeyAsReady(redisDb *db, robj *key, int type);
|
||||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids);
|
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, robj *target, struct listPos *listpos, streamID *ids);
|
||||||
|
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us);
|
||||||
|
|
||||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||||
void addClientToTimeoutTable(client *c);
|
void addClientToTimeoutTable(client *c);
|
||||||
|
@ -19,6 +19,7 @@ TEST_MODULES = \
|
|||||||
misc.so \
|
misc.so \
|
||||||
hooks.so \
|
hooks.so \
|
||||||
blockonkeys.so \
|
blockonkeys.so \
|
||||||
|
blockonbackground.so \
|
||||||
scan.so \
|
scan.so \
|
||||||
datatype.so \
|
datatype.so \
|
||||||
auth.so \
|
auth.so \
|
||||||
|
220
tests/modules/blockonbackground.c
Normal file
220
tests/modules/blockonbackground.c
Normal file
@ -0,0 +1,220 @@
|
|||||||
|
#define REDISMODULE_EXPERIMENTAL_API
|
||||||
|
#include "redismodule.h"
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include "assert.h"
|
||||||
|
|
||||||
|
#define UNUSED(x) (void)(x)
|
||||||
|
|
||||||
|
/* Reply callback for blocking command BLOCK.DEBUG */
|
||||||
|
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
|
||||||
|
return RedisModule_ReplyWithLongLong(ctx,*myint);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Timeout callback for blocking command BLOCK.DEBUG */
|
||||||
|
int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
|
||||||
|
assert(RedisModule_BlockedClientMeasureTimeEnd(bc)==REDISMODULE_OK);
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Private data freeing callback for BLOCK.DEBUG command. */
|
||||||
|
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
|
||||||
|
UNUSED(ctx);
|
||||||
|
RedisModule_Free(privdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* The thread entry point that actually executes the blocking part
|
||||||
|
* of the command BLOCK.DEBUG. */
|
||||||
|
void *BlockDebug_ThreadMain(void *arg) {
|
||||||
|
void **targ = arg;
|
||||||
|
RedisModuleBlockedClient *bc = targ[0];
|
||||||
|
long long delay = (unsigned long)targ[1];
|
||||||
|
long long enable_time_track = (unsigned long)targ[2];
|
||||||
|
if (enable_time_track)
|
||||||
|
assert(RedisModule_BlockedClientMeasureTimeStart(bc)==REDISMODULE_OK);
|
||||||
|
RedisModule_Free(targ);
|
||||||
|
|
||||||
|
struct timespec ts;
|
||||||
|
ts.tv_sec = delay / 1000;
|
||||||
|
ts.tv_nsec = (delay % 1000) * 1000000;
|
||||||
|
nanosleep(&ts, NULL);
|
||||||
|
int *r = RedisModule_Alloc(sizeof(int));
|
||||||
|
*r = rand();
|
||||||
|
if (enable_time_track)
|
||||||
|
assert(RedisModule_BlockedClientMeasureTimeEnd(bc)==REDISMODULE_OK);
|
||||||
|
RedisModule_UnblockClient(bc,r);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* The thread entry point that actually executes the blocking part
|
||||||
|
* of the command BLOCK.DEBUG. */
|
||||||
|
void *DoubleBlock_ThreadMain(void *arg) {
|
||||||
|
void **targ = arg;
|
||||||
|
RedisModuleBlockedClient *bc = targ[0];
|
||||||
|
long long delay = (unsigned long)targ[1];
|
||||||
|
assert(RedisModule_BlockedClientMeasureTimeStart(bc)==REDISMODULE_OK);
|
||||||
|
RedisModule_Free(targ);
|
||||||
|
struct timespec ts;
|
||||||
|
ts.tv_sec = delay / 1000;
|
||||||
|
ts.tv_nsec = (delay % 1000) * 1000000;
|
||||||
|
nanosleep(&ts, NULL);
|
||||||
|
int *r = RedisModule_Alloc(sizeof(int));
|
||||||
|
*r = rand();
|
||||||
|
RedisModule_BlockedClientMeasureTimeEnd(bc);
|
||||||
|
/* call again RedisModule_BlockedClientMeasureTimeStart() and
|
||||||
|
* RedisModule_BlockedClientMeasureTimeEnd and ensure that the
|
||||||
|
* total execution time is 2x the delay. */
|
||||||
|
assert(RedisModule_BlockedClientMeasureTimeStart(bc)==REDISMODULE_OK);
|
||||||
|
nanosleep(&ts, NULL);
|
||||||
|
RedisModule_BlockedClientMeasureTimeEnd(bc);
|
||||||
|
|
||||||
|
RedisModule_UnblockClient(bc,r);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void HelloBlock_Disconnected(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
|
||||||
|
RedisModule_Log(ctx,"warning","Blocked client %p disconnected!",
|
||||||
|
(void*)bc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* BLOCK.DEBUG <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
|
||||||
|
* a random number. Timeout is the command timeout, so that you can test
|
||||||
|
* what happens when the delay is greater than the timeout. */
|
||||||
|
int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (argc != 3) return RedisModule_WrongArity(ctx);
|
||||||
|
long long delay;
|
||||||
|
long long timeout;
|
||||||
|
|
||||||
|
if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_t tid;
|
||||||
|
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
||||||
|
|
||||||
|
/* Here we set a disconnection handler, however since this module will
|
||||||
|
* block in sleep() in a thread, there is not much we can do in the
|
||||||
|
* callback, so this is just to show you the API. */
|
||||||
|
RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
|
||||||
|
|
||||||
|
/* Now that we setup a blocking client, we need to pass the control
|
||||||
|
* to the thread. However we need to pass arguments to the thread:
|
||||||
|
* the delay and a reference to the blocked client handle. */
|
||||||
|
void **targ = RedisModule_Alloc(sizeof(void*)*3);
|
||||||
|
targ[0] = bc;
|
||||||
|
targ[1] = (void*)(unsigned long) delay;
|
||||||
|
// pass 1 as flag to enable time tracking
|
||||||
|
targ[2] = (void*)(unsigned long) 1;
|
||||||
|
|
||||||
|
if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
|
||||||
|
RedisModule_AbortBlock(bc);
|
||||||
|
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
||||||
|
}
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* BLOCK.DEBUG_NOTRACKING <delay_ms> <timeout_ms> -- Block for <count> milliseconds, then reply with
|
||||||
|
* a random number. Timeout is the command timeout, so that you can test
|
||||||
|
* what happens when the delay is greater than the timeout.
|
||||||
|
* this command does not track background time so the background time should no appear in stats*/
|
||||||
|
int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (argc != 3) return RedisModule_WrongArity(ctx);
|
||||||
|
long long delay;
|
||||||
|
long long timeout;
|
||||||
|
|
||||||
|
if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_t tid;
|
||||||
|
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
||||||
|
|
||||||
|
/* Here we set a disconnection handler, however since this module will
|
||||||
|
* block in sleep() in a thread, there is not much we can do in the
|
||||||
|
* callback, so this is just to show you the API. */
|
||||||
|
RedisModule_SetDisconnectCallback(bc,HelloBlock_Disconnected);
|
||||||
|
|
||||||
|
/* Now that we setup a blocking client, we need to pass the control
|
||||||
|
* to the thread. However we need to pass arguments to the thread:
|
||||||
|
* the delay and a reference to the blocked client handle. */
|
||||||
|
void **targ = RedisModule_Alloc(sizeof(void*)*3);
|
||||||
|
targ[0] = bc;
|
||||||
|
targ[1] = (void*)(unsigned long) delay;
|
||||||
|
// pass 0 as flag to enable time tracking
|
||||||
|
targ[2] = (void*)(unsigned long) 0;
|
||||||
|
|
||||||
|
if (pthread_create(&tid,NULL,BlockDebug_ThreadMain,targ) != 0) {
|
||||||
|
RedisModule_AbortBlock(bc);
|
||||||
|
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
||||||
|
}
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* BLOCK.DOUBLE_DEBUG <delay_ms> -- Block for 2 x <count> milliseconds,
|
||||||
|
* then reply with a random number.
|
||||||
|
* This command is used to test multiple calls to RedisModule_BlockedClientMeasureTimeStart()
|
||||||
|
* and RedisModule_BlockedClientMeasureTimeEnd() within the same execution. */
|
||||||
|
int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (argc != 2) return RedisModule_WrongArity(ctx);
|
||||||
|
long long delay;
|
||||||
|
long long timeout;
|
||||||
|
|
||||||
|
if (RedisModule_StringToLongLong(argv[1],&delay) != REDISMODULE_OK) {
|
||||||
|
return RedisModule_ReplyWithError(ctx,"ERR invalid count");
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_t tid;
|
||||||
|
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
||||||
|
|
||||||
|
/* Now that we setup a blocking client, we need to pass the control
|
||||||
|
* to the thread. However we need to pass arguments to the thread:
|
||||||
|
* the delay and a reference to the blocked client handle. */
|
||||||
|
void **targ = RedisModule_Alloc(sizeof(void*)*2);
|
||||||
|
targ[0] = bc;
|
||||||
|
targ[1] = (void*)(unsigned long) delay;
|
||||||
|
|
||||||
|
if (pthread_create(&tid,NULL,DoubleBlock_ThreadMain,targ) != 0) {
|
||||||
|
RedisModule_AbortBlock(bc);
|
||||||
|
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
||||||
|
}
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
|
||||||
|
if (RedisModule_Init(ctx,"block",1,REDISMODULE_APIVER_1)
|
||||||
|
== REDISMODULE_ERR) return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"block.debug",
|
||||||
|
HelloBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"block.double_debug",
|
||||||
|
HelloDoubleBlock_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"block.debug_no_track",
|
||||||
|
HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
67
tests/unit/moduleapi/blockonbackground.tcl
Normal file
67
tests/unit/moduleapi/blockonbackground.tcl
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
set testmodule [file normalize tests/modules/blockonbackground.so]
|
||||||
|
|
||||||
|
source tests/support/util.tcl
|
||||||
|
|
||||||
|
start_server {tags {"modules"}} {
|
||||||
|
r module load $testmodule
|
||||||
|
|
||||||
|
test { blocked clients time tracking - check blocked command that uses RedisModule_BlockedClientMeasureTimeStart() is tracking background time} {
|
||||||
|
r slowlog reset
|
||||||
|
r config set slowlog-log-slower-than 200000
|
||||||
|
assert_equal [r slowlog len] 0
|
||||||
|
r block.debug 0 10000
|
||||||
|
assert_equal [r slowlog len] 0
|
||||||
|
r config resetstat
|
||||||
|
r block.debug 200 10000
|
||||||
|
assert_equal [r slowlog len] 1
|
||||||
|
|
||||||
|
set cmdstatline [cmdrstat block.debug r]
|
||||||
|
|
||||||
|
regexp "calls=1,usec=(.*?),usec_per_call=(.*?),rejected_calls=0,failed_calls=0" $cmdstatline usec usec_per_call
|
||||||
|
assert {$usec >= 100000}
|
||||||
|
assert {$usec_per_call >= 100000}
|
||||||
|
}
|
||||||
|
|
||||||
|
test { blocked clients time tracking - check blocked command that uses RedisModule_BlockedClientMeasureTimeStart() is tracking background time even in timeout } {
|
||||||
|
r slowlog reset
|
||||||
|
r config set slowlog-log-slower-than 200000
|
||||||
|
assert_equal [r slowlog len] 0
|
||||||
|
r block.debug 0 20000
|
||||||
|
assert_equal [r slowlog len] 0
|
||||||
|
r config resetstat
|
||||||
|
r block.debug 20000 200
|
||||||
|
assert_equal [r slowlog len] 1
|
||||||
|
|
||||||
|
set cmdstatline [cmdrstat block.debug r]
|
||||||
|
|
||||||
|
regexp "calls=1,usec=(.*?),usec_per_call=(.*?),rejected_calls=0,failed_calls=0" $cmdstatline usec usec_per_call
|
||||||
|
assert {$usec >= 100000}
|
||||||
|
assert {$usec_per_call >= 100000}
|
||||||
|
}
|
||||||
|
|
||||||
|
test { blocked clients time tracking - check blocked command with multiple calls RedisModule_BlockedClientMeasureTimeStart() is tracking the total background time } {
|
||||||
|
r slowlog reset
|
||||||
|
r config set slowlog-log-slower-than 200000
|
||||||
|
assert_equal [r slowlog len] 0
|
||||||
|
r block.double_debug 0
|
||||||
|
assert_equal [r slowlog len] 0
|
||||||
|
r config resetstat
|
||||||
|
r block.double_debug 100
|
||||||
|
assert_equal [r slowlog len] 1
|
||||||
|
|
||||||
|
set cmdstatline [cmdrstat block.double_debug r]
|
||||||
|
|
||||||
|
regexp "calls=1,usec=(.*?),usec_per_call=(.*?),rejected_calls=0,failed_calls=0" $cmdstatline usec usec_per_call
|
||||||
|
assert {$usec >= 60000}
|
||||||
|
assert {$usec_per_call >= 60000}
|
||||||
|
}
|
||||||
|
|
||||||
|
test { blocked clients time tracking - check blocked command without calling RedisModule_BlockedClientMeasureTimeStart() is not reporting background time } {
|
||||||
|
r slowlog reset
|
||||||
|
r config set slowlog-log-slower-than 200000
|
||||||
|
assert_equal [r slowlog len] 0
|
||||||
|
r block.debug_no_track 200 1000
|
||||||
|
# ensure slowlog is still empty
|
||||||
|
assert_equal [r slowlog len] 0
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user