Fix delKeysInSlot server events are not executed inside an execution unit (#12745)

This is a follow-up fix to #12733. We need to apply the same changes to
delKeysInSlot. Refer to #12733 for more details.

This PR contains some other minor cleanups / improvements to the test
suite and docs.
It uses the postnotifications test module in a cluster mode test which
revealed a leak in the test module (fixed).
This commit is contained in:
Binbin 2023-12-12 02:15:19 +08:00 committed by GitHub
parent 62419c01db
commit c85a9b7896
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 22 additions and 10 deletions

View File

@ -51,6 +51,7 @@
# #
# loadmodule /path/to/my_module.so # loadmodule /path/to/my_module.so
# loadmodule /path/to/other_module.so # loadmodule /path/to/other_module.so
# loadmodule /path/to/args_module.so [arg [arg ...]]
################################## NETWORK ##################################### ################################## NETWORK #####################################

View File

@ -5698,6 +5698,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
dictEntry *de = NULL; dictEntry *de = NULL;
iter = dictGetSafeIterator(server.db->dict[hashslot]); iter = dictGetSafeIterator(server.db->dict[hashslot]);
while((de = dictNext(iter)) != NULL) { while((de = dictNext(iter)) != NULL) {
enterExecutionUnit(1, 0);
sds sdskey = dictGetKey(de); sds sdskey = dictGetKey(de);
robj *key = createStringObject(sdskey, sdslen(sdskey)); robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key); dbDelete(&server.db[0], key);
@ -5707,6 +5708,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
* The modules needs to know that these keys are no longer available locally, so just send the * The modules needs to know that these keys are no longer available locally, so just send the
* keyspace notification to the modules, but not to clients. */ * keyspace notification to the modules, but not to clients. */
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
exitExecutionUnit();
postExecutionUnitOperations(); postExecutionUnitOperations();
decrRefCount(key); decrRefCount(key);
j++; j++;

View File

@ -75,7 +75,8 @@ static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const cha
REDISMODULE_NOT_USED(key); REDISMODULE_NOT_USED(key);
RedisModuleString *new_key = RedisModule_CreateString(NULL, "expired", 7); RedisModuleString *new_key = RedisModule_CreateString(NULL, "expired", 7);
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -95,7 +96,8 @@ static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const cha
} }
RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7); RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7);
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -121,7 +123,8 @@ static int KeySpace_NotificationString(RedisModuleCtx *ctx, int type, const char
new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str); new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str);
} }
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD); int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -137,7 +140,8 @@ static int KeySpace_LazyExpireInsidePostNotificationJob(RedisModuleCtx *ctx, int
} }
RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);; RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);;
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD); int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD);
if (res == REDISMODULE_ERR) KeySpace_PostNotificationStringFreePD(new_key);
return REDISMODULE_OK; return REDISMODULE_OK;
} }
@ -236,7 +240,8 @@ static void KeySpace_ServerEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e
KeySpace_EventPostNotificationCtx *pn_ctx = RedisModule_Alloc(sizeof(*pn_ctx)); KeySpace_EventPostNotificationCtx *pn_ctx = RedisModule_Alloc(sizeof(*pn_ctx));
pn_ctx->triggered_on = RedisModule_HoldString(NULL, (RedisModuleString*)key_name); pn_ctx->triggered_on = RedisModule_HoldString(NULL, (RedisModuleString*)key_name);
pn_ctx->new_key = RedisModule_CreateString(NULL, events[subevent], strlen(events[subevent])); pn_ctx->new_key = RedisModule_CreateString(NULL, events[subevent], strlen(events[subevent]));
RedisModule_AddPostNotificationJob(ctx, KeySpace_ServerEventPostNotification, pn_ctx, KeySpace_ServerEventPostNotificationFree); int res = RedisModule_AddPostNotificationJob(ctx, KeySpace_ServerEventPostNotification, pn_ctx, KeySpace_ServerEventPostNotificationFree);
if (res == REDISMODULE_ERR) KeySpace_ServerEventPostNotificationFree(pn_ctx);
} }
/* This function must be present on each Redis module. It is used in order to /* This function must be present on each Redis module. It is used in order to

View File

@ -5,9 +5,9 @@ set ::valgrind_errors {}
proc start_server_error {config_file error} { proc start_server_error {config_file error} {
set err {} set err {}
append err "Can't start the Redis server\n" append err "Can't start the Redis server\n"
append err "CONFIGURATION:" append err "CONFIGURATION:\n"
append err [exec cat $config_file] append err [exec cat $config_file]
append err "\nERROR:" append err "\nERROR:\n"
append err [string trim $error] append err [string trim $error]
send_data_packet $::test_server_fd err $err send_data_packet $::test_server_fd err $err
} }

View File

@ -163,7 +163,9 @@ start_cluster 3 0 [list config_lines $modules] {
$node2_rd close $node2_rd close
} }
set modules [list loadmodule [file normalize tests/modules/keyspace_events.so]] set testmodule_keyspace_events [file normalize tests/modules/keyspace_events.so]
set testmodule_postnotifications "[file normalize tests/modules/postnotifications.so] with_key_events"
set modules [list loadmodule $testmodule_keyspace_events loadmodule $testmodule_postnotifications]
start_cluster 2 2 [list config_lines $modules] { start_cluster 2 2 [list config_lines $modules] {
set master1 [srv 0 client] set master1 [srv 0 client]
@ -195,10 +197,12 @@ start_cluster 2 2 [list config_lines $modules] {
fail "replica did not increase del counter" fail "replica did not increase del counter"
} }
# the {lpush before_deleted count_dels_{4oi}} is a post notification job registered when 'count_dels_{4oi}' was removed
assert_replication_stream $repl { assert_replication_stream $repl {
{multi} {multi}
{del count_dels_{4oi}} {del count_dels_{4oi}}
{keyspace.incr_dels} {keyspace.incr_dels}
{lpush before_deleted count_dels_{4oi}}
{exec} {exec}
} }
close_replication_stream $repl close_replication_stream $repl