diff --git a/src/aof.c b/src/aof.c index f48c8bd1b..e71229512 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1371,6 +1371,7 @@ struct client *createAOFClient(void) { */ c->raw_flag = 0; c->flag.deny_blocking = 1; + c->flag.fake = 1; /* We set the fake client as a replica waiting for the synchronization * so that the server will not try to send replies to this client. */ diff --git a/src/eval.c b/src/eval.c index 580c35bdc..3b8390af7 100644 --- a/src/eval.c +++ b/src/eval.c @@ -260,6 +260,7 @@ void scriptingInit(int setup) { if (lctx.lua_client == NULL) { lctx.lua_client = createClient(NULL); lctx.lua_client->flag.script = 1; + lctx.lua_client->flag.fake = 1; /* We do not want to allow blocking commands inside Lua */ lctx.lua_client->flag.deny_blocking = 1; diff --git a/src/functions.c b/src/functions.c index 38d063492..985d8b3e1 100644 --- a/src/functions.c +++ b/src/functions.c @@ -408,6 +408,7 @@ int functionsRegisterEngine(const char *engine_name, engine *engine) { client *c = createClient(NULL); c->flag.deny_blocking = 1; c->flag.script = 1; + c->flag.fake = 1; engineInfo *ei = zmalloc(sizeof(*ei)); *ei = (engineInfo){ .name = engine_name_sds, diff --git a/src/module.c b/src/module.c index 24cc6a42e..938ae6ba7 100644 --- a/src/module.c +++ b/src/module.c @@ -656,6 +656,7 @@ client *moduleAllocTempClient(void) { } else { c = createClient(NULL); c->flag.module = 1; + c->flag.fake = 1; c->user = NULL; /* Root user */ } return c; @@ -894,8 +895,10 @@ void moduleCreateContext(ValkeyModuleCtx *out_ctx, ValkeyModule *module, int ctx out_ctx->flags = ctx_flags; if (ctx_flags & VALKEYMODULE_CTX_TEMP_CLIENT) out_ctx->client = moduleAllocTempClient(); - else if (ctx_flags & VALKEYMODULE_CTX_NEW_CLIENT) + else if (ctx_flags & VALKEYMODULE_CTX_NEW_CLIENT) { out_ctx->client = createClient(NULL); + out_ctx->client->flag.fake = 1; + } /* Calculate the initial yield time for long blocked contexts. 
* in loading we depend on the server hz, but in other cases we also wait diff --git a/src/networking.c b/src/networking.c index 44a94087c..eac102269 100644 --- a/src/networking.c +++ b/src/networking.c @@ -346,6 +346,7 @@ client *createCachedResponseClient(int resp) { /* Allocating the `conn` allows to prepare the caching client before adding * data to the clients output buffer by `prepareClientToWrite`. */ recording_client->conn = zcalloc(sizeof(connection)); + recording_client->flag.fake = 1; return recording_client; } @@ -3246,8 +3247,10 @@ char *getClientSockname(client *c) { int isClientConnIpV6(client *c) { /* The cached client peer id is on the form "[IPv6]:port" for IPv6 * addresses, so we just check for '[' here. */ - if (c->conn->type == NULL && server.current_client) { - /* Fake client? Use current client instead. */ + if (c->flag.fake && server.current_client) { + /* Fake client? Use current client instead. + * Note that this assumes server.current_client is set and refers to + * a real client (AOF loading already violates this in loadSingleAppendOnlyFile). */ c = server.current_client; } return getClientPeerId(c)[0] == '['; diff --git a/src/server.h b/src/server.h index a5cee0305..a7cf8635a 100644 --- a/src/server.h +++ b/src/server.h @@ -1223,26 +1223,26 @@ typedef struct ClientFlags { uint64_t reprocessing_command : 1; /* The client is re-processing the command. */ uint64_t replication_done : 1; /* Indicate that replication has been done on the client */ uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */ - uint64_t - protected_rdb_channel : 1; /* Dual channel replication sync: Protects the RDB client from premature \ - * release during full sync. This flag is used to ensure that the RDB client, which \ - * references the first replication data block required by the replica, is not \ - * released prematurely. 
Protecting the client is crucial for prevention of \ - * synchronization failures: \ - * If the RDB client is released before the replica initiates PSYNC, the primary \ - * will reduce the reference count (o->refcount) of the block needed by the replica. - * \ - * This could potentially lead to the removal of the required data block, resulting \ - * in synchronization failures. Such failures could occur even in scenarios where \ - * the replica only needs an additional 4KB beyond the minimum size of the - * repl_backlog. - * By using this flag, we ensure that the RDB client remains intact until the replica - * \ has successfully initiated PSYNC. */ - uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */ - uint64_t dont_cache_primary : 1; /* In some cases we don't want to cache the primary. For example, the replica - * knows that it does not need the cache and required a full sync. With this - * flag, we won't cache the primary in freeClient. */ - uint64_t reserved : 6; /* Reserved for future use */ + uint64_t protected_rdb_channel : 1; /* Dual channel replication sync: Protects the RDB client from premature \ + * release during full sync. This flag is used to ensure that the RDB client, which \ + * references the first replication data block required by the replica, is not \ + * released prematurely. Protecting the client is crucial for prevention of \ + * synchronization failures: \ + * If the RDB client is released before the replica initiates PSYNC, the primary \ + * will reduce the reference count (o->refcount) of the block needed by the replica. + * \ + * This could potentially lead to the removal of the required data block, resulting \ + * in synchronization failures. Such failures could occur even in scenarios where \ + * the replica only needs an additional 4KB beyond the minimum size of the + * repl_backlog. 
+ * By using this flag, we ensure that the RDB client remains intact until the replica + * \ has successfully initiated PSYNC. */ + uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */ + uint64_t dont_cache_primary : 1; /* In some cases we don't want to cache the primary. For example, the replica + * knows that it does not need the cache and required a full sync. With this + * flag, we won't cache the primary in freeClient. */ + uint64_t fake : 1; /* This is a fake client without a real connection. */ + uint64_t reserved : 5; /* Reserved for future use */ } ClientFlags; typedef struct client { diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 72fae9915..33c7c12d4 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -673,3 +673,26 @@ tags {"aof external:skip"} { } } } + +# make sure the test infra won't use SELECT +set old_singledb $::singledb +set ::singledb 1 + +tags {"aof cluster external:skip"} { + test {Test cluster slots / cluster shards in aof won't crash} { + create_aof $aof_dirpath $aof_file { + append_to_aof [formatCommand cluster slots] + append_to_aof [formatCommand cluster shards] + } + + create_aof_manifest $aof_dirpath $aof_manifest_file { + append_to_manifest "file appendonly.aof.1.incr.aof seq 1 type i\n" + } + + start_server_aof [list dir $server_path cluster-enabled yes] { + assert_equal [r ping] {PONG} + } + } +} + +set ::singledb $old_singledb diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 9966b8840..1690b9b62 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -63,7 +63,8 @@ TEST_MODULES = \ postnotifications.so \ moduleauthtwo.so \ rdbloadsave.so \ - crash.so + crash.so \ + cluster.so .PHONY: all diff --git a/tests/modules/cluster.c b/tests/modules/cluster.c new file mode 100644 index 000000000..b3b53d5d9 --- /dev/null +++ b/tests/modules/cluster.c @@ -0,0 +1,51 @@ +#include "valkeymodule.h" + 
+#define UNUSED(x) (void)(x) + +int test_cluster_slots(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { /* Handler registered as "test.cluster_slots": runs CLUSTER SLOTS through ValkeyModule_Call and relays the reply to the caller. */ + UNUSED(argv); + + if (argc != 1) return ValkeyModule_WrongArity(ctx); /* The command takes no arguments besides its own name. */ + + ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SLOTS"); + if (!rep) { + ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned"); + } else { + ValkeyModule_ReplyWithCallReply(ctx, rep); + ValkeyModule_FreeCallReply(rep); /* The call reply is released here once it has been forwarded. */ + } + + return VALKEYMODULE_OK; +} + +int test_cluster_shards(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { /* Handler registered as "test.cluster_shards": same flow as test_cluster_slots, but issues CLUSTER SHARDS. */ + UNUSED(argv); + + if (argc != 1) return ValkeyModule_WrongArity(ctx); /* The command takes no arguments besides its own name. */ + + ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SHARDS"); + if (!rep) { + ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned"); + } else { + ValkeyModule_ReplyWithCallReply(ctx, rep); + ValkeyModule_FreeCallReply(rep); /* The call reply is released here once it has been forwarded. */ + } + + return VALKEYMODULE_OK; +} + +int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { /* Module entry point: initializes the module under the name "cluster" and registers the two test commands; returns VALKEYMODULE_ERR as soon as any step fails. */ + VALKEYMODULE_NOT_USED(argv); + VALKEYMODULE_NOT_USED(argc); + + if (ValkeyModule_Init(ctx, "cluster", 1, VALKEYMODULE_APIVER_1)== VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "test.cluster_slots", test_cluster_slots, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + if (ValkeyModule_CreateCommand(ctx, "test.cluster_shards", test_cluster_shards, "", 0, 0, 0) == VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; + + return VALKEYMODULE_OK; +} diff --git a/tests/unit/cluster/scripting.tcl b/tests/unit/cluster/scripting.tcl index 1cf142107..88e158afc 100644 --- a/tests/unit/cluster/scripting.tcl +++ b/tests/unit/cluster/scripting.tcl @@ -88,4 +88,12 @@ start_cluster 1 0 {tags {external:skip cluster}} { assert_match {*Can not run script on cluster, 'no-cluster' flag is set*} $e } + + test "Calling cluster slots in scripts is OK" { + assert_equal [lsort [r 0 cluster slots]] [lsort [r 0 eval "return redis.call('cluster', 'slots')" 0]] 
+ } + + test "Calling cluster shards in scripts is OK" { + assert_equal [lsort [r 0 cluster shards]] [lsort [r 0 eval "return redis.call('cluster', 'shards')" 0]] + } } diff --git a/tests/unit/moduleapi/cluster.tcl b/tests/unit/moduleapi/cluster.tcl index 5570f980f..5e4244d68 100644 --- a/tests/unit/moduleapi/cluster.tcl +++ b/tests/unit/moduleapi/cluster.tcl @@ -219,8 +219,6 @@ start_cluster 2 2 [list config_lines $modules] { } } -} - set testmodule [file normalize tests/modules/basics.so] set modules [list loadmodule $testmodule] start_cluster 3 0 [list config_lines $modules] { @@ -234,3 +232,25 @@ start_cluster 3 0 [list config_lines $modules] { assert_equal {PONG} [$node3 PING] } } + +set testmodule [file normalize tests/modules/cluster.so] +set modules [list loadmodule $testmodule] +start_cluster 3 0 [list config_lines $modules] { + set node1 [srv 0 client] + set node2 [srv -1 client] + set node3 [srv -2 client] + + test "VM_CALL with cluster slots" { + assert_equal [lsort [$node1 cluster slots]] [lsort [$node1 test.cluster_slots]] + assert_equal [lsort [$node2 cluster slots]] [lsort [$node2 test.cluster_slots]] + assert_equal [lsort [$node3 cluster slots]] [lsort [$node3 test.cluster_slots]] + } + + test "VM_CALL with cluster shards" { + assert_equal [lsort [$node1 cluster shards]] [lsort [$node1 test.cluster_shards]] + assert_equal [lsort [$node2 cluster shards]] [lsort [$node2 test.cluster_shards]] + assert_equal [lsort [$node3 cluster shards]] [lsort [$node3 test.cluster_shards]] + } +} + +} ;# end tag