Modules: Unblock from within a timer coverage (#12337)
Apart from adding the missing coverage, this PR also adds `blockedBeforeSleep` that gathers all block-related functions from `beforeSleep` The order inside `blockedBeforeSleep` is different: now `handleClientsBlockedOnKeys` (which may unblock clients) is called before `processUnblockedClients` (which handles unblocked clients). It makes sense to have this order. There are no visible effects of the wrong ordering, except some cleanups of the now-unblocked client would have happen in the next `beforeSleep` (will now happen in the current one) The reason we even got into it is because i triggers an assertion in logresreq.c (breaking the assumption that `unblockClient` is called **before** actually flushing the reply to the socket): `handleClientsBlockedOnKeys` is called, then it calls `moduleUnblockClientOnKey`, which calls `moduleUnblockClient`, which adds the client to `moduleUnblockedClients` back to `beforeSleep`, we call `handleClientsWithPendingWritesUsingThreads`, it writes the data of buf to the client, so `client->bufpos` became 0 On the next `beforeSleep`, we call `moduleHandleBlockedClients`, which calls `unblockClient`, which calls `reqresAppendResponse`, triggering the assert. (because the `bufpos` is 0) - see https://github.com/redis/redis/pull/12301#discussion_r1226386716
This commit is contained in:
parent
9e5f45f230
commit
3230199920
@ -734,3 +734,30 @@ void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *blok
|
||||
if (bloking_keys_on_nokey)
|
||||
*bloking_keys_on_nokey = bkeys_on_nokey;
|
||||
}
|
||||
|
||||
void blockedBeforeSleep(void) {
|
||||
/* Handle precise timeouts of blocked clients. */
|
||||
handleBlockedClientsTimeout();
|
||||
|
||||
/* Unblock all the clients blocked for synchronous replication
|
||||
* in WAIT or WAITAOF. */
|
||||
if (listLength(server.clients_waiting_acks))
|
||||
processClientsWaitingReplicas();
|
||||
|
||||
/* Try to process blocked clients every once in while.
|
||||
*
|
||||
* Example: A module calls RM_SignalKeyAsReady from within a timer callback
|
||||
* (So we don't visit processCommand() at all).
|
||||
*
|
||||
* This may unblock clients, so must be done before processUnblockedClients */
|
||||
handleClientsBlockedOnKeys();
|
||||
|
||||
/* Check if there are clients unblocked by modules that implement
|
||||
* blocking commands. */
|
||||
if (moduleCount())
|
||||
moduleHandleBlockedClients();
|
||||
|
||||
/* Try to process pending commands for clients that were just unblocked. */
|
||||
if (listLength(server.unblocked_clients))
|
||||
processUnblockedClients();
|
||||
}
|
||||
|
29
src/server.c
29
src/server.c
@ -1648,9 +1648,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* Handle precise timeouts of blocked clients. */
|
||||
handleBlockedClientsTimeout();
|
||||
|
||||
/* We should handle pending reads clients ASAP after event loop. */
|
||||
handleClientsWithPendingReadsUsingThreads();
|
||||
|
||||
@ -1660,6 +1657,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
/* If any connection type(typical TLS) still has pending unread data don't sleep at all. */
|
||||
aeSetDontWait(server.el, connTypeHasPendingData());
|
||||
|
||||
/* Handle blocked clients.
|
||||
* must be done before flushAppendOnlyFile, in case of appendfsync=always,
|
||||
* since the unblocked clients may write data. */
|
||||
blockedBeforeSleep();
|
||||
|
||||
/* Record cron time in beforeSleep, which is the sum of active-expire, active-defrag and all other
|
||||
* tasks done by cron and beforeSleep, but excluding read, write and AOF, that are counted by other
|
||||
* sets of metrics. */
|
||||
@ -1676,24 +1678,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
if (server.active_expire_enabled && iAmMaster())
|
||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
|
||||
|
||||
/* Unblock all the clients blocked for synchronous replication
|
||||
* in WAIT or WAITAOF. */
|
||||
if (listLength(server.clients_waiting_acks))
|
||||
processClientsWaitingReplicas();
|
||||
|
||||
/* Check if there are clients unblocked by modules that implement
|
||||
* blocking commands. */
|
||||
if (moduleCount()) {
|
||||
moduleFireServerEvent(REDISMODULE_EVENT_EVENTLOOP,
|
||||
REDISMODULE_SUBEVENT_EVENTLOOP_BEFORE_SLEEP,
|
||||
NULL);
|
||||
moduleHandleBlockedClients();
|
||||
}
|
||||
|
||||
/* Try to process pending commands for clients that were just unblocked. */
|
||||
if (listLength(server.unblocked_clients))
|
||||
processUnblockedClients();
|
||||
|
||||
/* Send all the slaves an ACK request if at least one client blocked
|
||||
* during the previous event loop iteration. Note that we do this after
|
||||
* processUnblockedClients(), so if there are multiple pipelined WAITs
|
||||
@ -1723,15 +1713,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
||||
* client side caching protocol in broadcasting (BCAST) mode. */
|
||||
trackingBroadcastInvalidationMessages();
|
||||
|
||||
/* Try to process blocked clients every once in while.
|
||||
*
|
||||
* Example: A module calls RM_SignalKeyAsReady from within a timer callback
|
||||
* (So we don't visit processCommand() at all).
|
||||
*
|
||||
* must be done before flushAppendOnlyFile, in case of appendfsync=always,
|
||||
* since the unblocked clients may write data. */
|
||||
handleClientsBlockedOnKeys();
|
||||
|
||||
/* Record time consumption of AOF writing. */
|
||||
monotime aof_start_time = getMonotonicUs();
|
||||
/* Record cron time in beforeSleep. This does not include the time consumed by AOF writing and IO writing below. */
|
||||
|
@ -3383,7 +3383,7 @@ void signalDeletedKeyAsReady(redisDb *db, robj *key, int type);
|
||||
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
|
||||
void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with);
|
||||
void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey);
|
||||
|
||||
void blockedBeforeSleep(void);
|
||||
|
||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||
void addClientToTimeoutTable(client *c);
|
||||
|
@ -553,6 +553,39 @@ static int is_in_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
static void timer_callback(RedisModuleCtx *ctx, void *data)
|
||||
{
|
||||
UNUSED(ctx);
|
||||
|
||||
RedisModuleBlockedClient *bc = data;
|
||||
|
||||
// Get Redis module context
|
||||
RedisModuleCtx *reply_ctx = RedisModule_GetThreadSafeContext(bc);
|
||||
|
||||
// Reply to client
|
||||
RedisModule_ReplyWithSimpleString(reply_ctx, "OK");
|
||||
|
||||
// Unblock client
|
||||
RedisModule_UnblockClient(bc, NULL);
|
||||
|
||||
// Free the Redis module context
|
||||
RedisModule_FreeThreadSafeContext(reply_ctx);
|
||||
}
|
||||
|
||||
int unblock_by_timer(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
if (argc != 2)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
long long period;
|
||||
if (RedisModule_StringToLongLong(argv[1],&period) != REDISMODULE_OK)
|
||||
return RedisModule_ReplyWithError(ctx,"ERR invalid period");
|
||||
|
||||
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
||||
RedisModule_CreateTimer(ctx, period, timer_callback, bc);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@ -612,5 +645,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (RedisModule_CreateCommand(ctx, "is_in_slow_bg_operation", is_in_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "unblock_by_timer", unblock_by_timer, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -136,6 +136,67 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
RedisModuleString *keyname;
|
||||
long long ele;
|
||||
} timer_data_t;
|
||||
|
||||
static void timer_callback(RedisModuleCtx *ctx, void *data)
|
||||
{
|
||||
timer_data_t *td = data;
|
||||
|
||||
fsl_t *fsl;
|
||||
if (!get_fsl(ctx, td->keyname, REDISMODULE_WRITE, 1, &fsl, 1))
|
||||
return;
|
||||
|
||||
if (fsl->length == LIST_SIZE)
|
||||
return; /* list is full */
|
||||
|
||||
if (fsl->length != 0 && fsl->list[fsl->length-1] >= td->ele)
|
||||
return; /* new element has to be greater than the head element */
|
||||
|
||||
fsl->list[fsl->length++] = td->ele;
|
||||
RedisModule_SignalKeyAsReady(ctx, td->keyname);
|
||||
|
||||
RedisModule_Replicate(ctx, "FSL.PUSH", "sl", td->keyname, td->ele);
|
||||
|
||||
RedisModule_FreeString(ctx, td->keyname);
|
||||
RedisModule_Free(td);
|
||||
}
|
||||
|
||||
/* FSL.PUSHTIMER <key> <int> <period-in-ms> - Push the number 9000 to the fixed-size list (to the right).
|
||||
* It must be greater than the element in the head of the list. */
|
||||
int fsl_pushtimer(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
{
|
||||
if (argc != 4)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
long long ele;
|
||||
if (RedisModule_StringToLongLong(argv[2],&ele) != REDISMODULE_OK)
|
||||
return RedisModule_ReplyWithError(ctx,"ERR invalid integer");
|
||||
|
||||
long long period;
|
||||
if (RedisModule_StringToLongLong(argv[3],&period) != REDISMODULE_OK)
|
||||
return RedisModule_ReplyWithError(ctx,"ERR invalid period");
|
||||
|
||||
fsl_t *fsl;
|
||||
if (!get_fsl(ctx, argv[1], REDISMODULE_WRITE, 1, &fsl, 1))
|
||||
return REDISMODULE_OK;
|
||||
|
||||
if (fsl->length == LIST_SIZE)
|
||||
return RedisModule_ReplyWithError(ctx,"ERR list is full");
|
||||
|
||||
timer_data_t *td = RedisModule_Alloc(sizeof(*td));
|
||||
td->keyname = argv[1];
|
||||
RedisModule_RetainString(ctx, td->keyname);
|
||||
td->ele = ele;
|
||||
|
||||
RedisModuleTimerID id = RedisModule_CreateTimer(ctx, period, timer_callback, td);
|
||||
RedisModule_ReplyWithLongLong(ctx, id);
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
@ -546,6 +607,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"write",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.pushtimer",fsl_pushtimer,"write",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"write",1,1,1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
|
@ -276,6 +276,10 @@ foreach call_type {nested normal} {
|
||||
assert_equal [$replica hget bk1 foo] bar
|
||||
}
|
||||
}
|
||||
|
||||
test {Unblock by timer} {
|
||||
assert_match "OK" [r unblock_by_timer 100]
|
||||
}
|
||||
|
||||
test "Unload the module - blockedclient" {
|
||||
assert_equal {OK} [r module unload blockedclient]
|
||||
|
@ -34,6 +34,31 @@ start_server {tags {"modules"}} {
|
||||
assert_equal {42} [r fsl.getall src]
|
||||
}
|
||||
|
||||
test "Module client blocked on keys: BPOPPUSH unblocked by timer" {
|
||||
set rd1 [redis_deferring_client]
|
||||
|
||||
r del src dst
|
||||
|
||||
set repl [attach_to_replication_stream]
|
||||
|
||||
$rd1 fsl.bpoppush src dst 0
|
||||
wait_for_blocked_clients_count 1
|
||||
|
||||
r fsl.pushtimer src 9000 10
|
||||
wait_for_blocked_clients_count 0
|
||||
|
||||
assert_equal {9000} [r fsl.getall dst]
|
||||
assert_equal {} [r fsl.getall src]
|
||||
|
||||
assert_replication_stream $repl {
|
||||
{select *}
|
||||
{fsl.push src 9000}
|
||||
{fsl.bpoppush src dst 0}
|
||||
}
|
||||
|
||||
close_replication_stream $repl
|
||||
} {} {needs:repl}
|
||||
|
||||
test {Module client blocked on keys (no metadata): No block} {
|
||||
r del k
|
||||
r fsl.push k 33
|
||||
|
Loading…
x
Reference in New Issue
Block a user