Fix wrong replication on cluster slotmap changes with module KSN propagation (#11377)

As discussed on #11084, `propagatePendingCommands` should happened after the del
notification is fired so that the notification effect and the `del` will be replicated inside MULTI EXEC.

Test was added to verify the fix.
This commit is contained in:
Meir Shpilraien (Spielrein) 2022-10-16 08:30:01 +03:00 committed by GitHub
parent 871cc200a0
commit 56f97bfa5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 92 additions and 5 deletions

View File

@ -7092,6 +7092,10 @@ void slotToKeyDestroy(redisDb *db) {
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int j = 0;
server.core_propagates = 1;
server.in_nested_call++;
dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head;
while (de != NULL) {
sds sdskey = dictGetKey(de);
@ -7099,13 +7103,17 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
robj *key = createStringObject(sdskey, sdslen(sdskey));
dbDelete(&server.db[0], key);
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
propagatePendingCommands();
signalModifiedKey(NULL, &server.db[0], key);
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
decrRefCount(key);
propagatePendingCommands();
j++;
server.dirty++;
}
serverAssert(server.core_propagates); /* This function should not be re-entrant */
server.core_propagates = 0;
server.in_nested_call--;
return j;
}

View File

@ -45,6 +45,9 @@ RedisModuleDict *loaded_event_log = NULL;
/** stores all the keys on which we got 'module' keyspace notification **/
RedisModuleDict *module_event_log = NULL;
/** Counts how many deleted KSN we got on keys with a prefix of "count_dels_" **/
static size_t dels = 0;
static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
@ -63,7 +66,14 @@ static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char
static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "count_dels_", 11) == 0 && strcmp(event, "del") == 0) {
if (RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_MASTER) {
dels++;
RedisModule_Replicate(ctx, "keyspace.incr_dels", "");
}
return REDISMODULE_OK;
}
if (cached_time) {
RedisModule_Assert(cached_time == RedisModule_CachedMicroseconds());
usleep(1);
@ -249,6 +259,18 @@ static int cmdIncrCase3(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
static int cmdIncrDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
dels++;
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
static int cmdGetDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
return RedisModule_ReplyWithLongLong(ctx, dels);
}
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
@ -322,6 +344,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "keyspace.incr_dels", cmdIncrDels,
"write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "keyspace.get_dels", cmdGetDels,
"readonly", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
return REDISMODULE_OK;
}

View File

@ -797,12 +797,12 @@ if {[llength $filtered_tests] < [llength $::all_tests]} {
set ::all_tests $filtered_tests
}
proc attach_to_replication_stream {} {
proc attach_to_replication_stream_on_connection {conn} {
r config set repl-ping-replica-period 3600
if {$::tls} {
set s [::tls::socket [srv 0 "host"] [srv 0 "port"]]
set s [::tls::socket [srv $conn "host"] [srv $conn "port"]]
} else {
set s [socket [srv 0 "host"] [srv 0 "port"]]
set s [socket [srv $conn "host"] [srv $conn "port"]]
}
fconfigure $s -translation binary
puts -nonewline $s "SYNC\r\n"
@ -827,6 +827,10 @@ proc attach_to_replication_stream {} {
return $s
}
proc attach_to_replication_stream {} {
return [attach_to_replication_stream_on_connection 0]
}
proc read_from_replication_stream {s} {
fconfigure $s -blocking 0
set attempt 0

View File

@ -162,6 +162,49 @@ start_cluster 3 0 [list config_lines $modules] {
$node1_rd close
$node2_rd close
}
set modules [list loadmodule [file normalize tests/modules/keyspace_events.so]]
start_cluster 2 2 [list config_lines $modules] {
set master1 [srv 0 client]
set master2 [srv -1 client]
set replica1 [srv -2 client]
set replica2 [srv -3 client]
test "Verify keys deletion and notification effects happened on cluster slots change are replicated inside multi exec" {
$master2 set count_dels_{4oi} 1
$master2 del count_dels_{4oi}
assert_equal 1 [$master2 keyspace.get_dels]
assert_equal 1 [$replica2 keyspace.get_dels]
$master2 set count_dels_{4oi} 1
set repl [attach_to_replication_stream_on_connection -3]
$master1 cluster bumpepoch
$master1 cluster setslot 16382 node [$master1 cluster myid]
wait_for_cluster_propagation
wait_for_condition 50 100 {
[$master2 keyspace.get_dels] eq 2
} else {
fail "master did not delete the key"
}
wait_for_condition 50 100 {
[$replica2 keyspace.get_dels] eq 2
} else {
fail "replica did not increase del counter"
}
assert_replication_stream $repl {
{multi}
{del count_dels_{4oi}}
{keyspace.incr_dels}
{exec}
}
close_replication_stream $repl
}
}
}
set testmodule [file normalize tests/modules/basics.so]