From 56f97bfa5fb082f812c711676d5a77d29af56940 Mon Sep 17 00:00:00 2001 From: "Meir Shpilraien (Spielrein)" Date: Sun, 16 Oct 2022 08:30:01 +0300 Subject: [PATCH] Fix wrong replication on cluster slotmap changes with module KSN propagation (#11377) As discussed on #11084, `propagatePendingCommands` should happen after the del notification is fired so that the notification effect and the `del` will be replicated inside MULTI EXEC. A test was added to verify the fix. --- src/cluster.c | 10 +++++++- tests/modules/keyspace_events.c | 34 ++++++++++++++++++++++++- tests/test_helper.tcl | 10 +++++--- tests/unit/moduleapi/cluster.tcl | 43 ++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 5 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index f7603611f..feb8f8d21 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -7092,6 +7092,10 @@ void slotToKeyDestroy(redisDb *db) { * The number of removed items is returned. */ unsigned int delKeysInSlot(unsigned int hashslot) { unsigned int j = 0; + + server.core_propagates = 1; + server.in_nested_call++; + dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head; while (de != NULL) { sds sdskey = dictGetKey(de); @@ -7099,13 +7103,17 @@ unsigned int delKeysInSlot(unsigned int hashslot) { robj *key = createStringObject(sdskey, sdslen(sdskey)); dbDelete(&server.db[0], key); propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del); - propagatePendingCommands(); signalModifiedKey(NULL, &server.db[0], key); moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); decrRefCount(key); + propagatePendingCommands(); j++; server.dirty++; } + serverAssert(server.core_propagates); /* This function should not be re-entrant */ + + server.core_propagates = 0; + server.in_nested_call--; return j; } diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c index 44de09979..bb9987883 100644 --- a/tests/modules/keyspace_events.c +++ b/tests/modules/keyspace_events.c @@
-45,6 +45,9 @@ RedisModuleDict *loaded_event_log = NULL; /** stores all the keys on which we got 'module' keyspace notification **/ RedisModuleDict *module_event_log = NULL; +/** Counts how many deleted KSN we got on keys with a prefix of "count_dels_" **/ +static size_t dels = 0; + static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(type); @@ -63,7 +66,14 @@ static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { REDISMODULE_NOT_USED(type); - + const char *key_str = RedisModule_StringPtrLen(key, NULL); + if (strncmp(key_str, "count_dels_", 11) == 0 && strcmp(event, "del") == 0) { + if (RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_MASTER) { + dels++; + RedisModule_Replicate(ctx, "keyspace.incr_dels", ""); + } + return REDISMODULE_OK; + } if (cached_time) { RedisModule_Assert(cached_time == RedisModule_CachedMicroseconds()); usleep(1); @@ -249,6 +259,18 @@ static int cmdIncrCase3(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +static int cmdIncrDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + dels++; + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + +static int cmdGetDels(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithLongLong(ctx, dels); +} /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. 
*/ @@ -322,6 +344,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if (RedisModule_CreateCommand(ctx, "keyspace.incr_dels", cmdIncrDels, + "write", 0, 0, 0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx, "keyspace.get_dels", cmdGetDels, + "readonly", 0, 0, 0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + return REDISMODULE_OK; } diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 44a4f2e15..0662b6681 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -797,12 +797,12 @@ if {[llength $filtered_tests] < [llength $::all_tests]} { set ::all_tests $filtered_tests } -proc attach_to_replication_stream {} { +proc attach_to_replication_stream_on_connection {conn} { r config set repl-ping-replica-period 3600 if {$::tls} { - set s [::tls::socket [srv 0 "host"] [srv 0 "port"]] + set s [::tls::socket [srv $conn "host"] [srv $conn "port"]] } else { - set s [socket [srv 0 "host"] [srv 0 "port"]] + set s [socket [srv $conn "host"] [srv $conn "port"]] } fconfigure $s -translation binary puts -nonewline $s "SYNC\r\n" @@ -827,6 +827,10 @@ proc attach_to_replication_stream {} { return $s } +proc attach_to_replication_stream {} { + return [attach_to_replication_stream_on_connection 0] +} + proc read_from_replication_stream {s} { fconfigure $s -blocking 0 set attempt 0 diff --git a/tests/unit/moduleapi/cluster.tcl b/tests/unit/moduleapi/cluster.tcl index c7ec45a71..8851b0301 100644 --- a/tests/unit/moduleapi/cluster.tcl +++ b/tests/unit/moduleapi/cluster.tcl @@ -162,6 +162,49 @@ start_cluster 3 0 [list config_lines $modules] { $node1_rd close $node2_rd close } + +set modules [list loadmodule [file normalize tests/modules/keyspace_events.so]] +start_cluster 2 2 [list config_lines $modules] { + + set master1 [srv 0 client] + set master2 [srv -1 client] + set replica1 [srv -2 client] + set replica2 [srv -3 client] + + test "Verify keys deletion and 
notification effects happened on cluster slots change are replicated inside multi exec" { + $master2 set count_dels_{4oi} 1 + $master2 del count_dels_{4oi} + assert_equal 1 [$master2 keyspace.get_dels] + assert_equal 1 [$replica2 keyspace.get_dels] + $master2 set count_dels_{4oi} 1 + + set repl [attach_to_replication_stream_on_connection -3] + + $master1 cluster bumpepoch + $master1 cluster setslot 16382 node [$master1 cluster myid] + + wait_for_cluster_propagation + wait_for_condition 50 100 { + [$master2 keyspace.get_dels] eq 2 + } else { + fail "master did not delete the key" + } + wait_for_condition 50 100 { + [$replica2 keyspace.get_dels] eq 2 + } else { + fail "replica did not increase del counter" + } + + assert_replication_stream $repl { + {multi} + {del count_dels_{4oi}} + {keyspace.incr_dels} + {exec} + } + close_replication_stream $repl + } +} + } set testmodule [file normalize tests/modules/basics.so]