Fix module / script call CLUSTER SLOTS / SHARDS fake client check crash (#1063)
The reason is that VM_Call uses a fake client without a connection, so we also need to check whether c->conn is NULL. The same issue affects scripts: if these commands are called from a script, the server crashes. Injecting the commands into the AOF also causes a startup failure. Fixes #1054.

Signed-off-by: Binbin <binloveplay1314@qq.com>
parent 3e3b955f8f
commit b6744f2b1e
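Before the diffs, a minimal sketch of the failure mode. This is illustrative only: the server plumbing is stubbed out, and only conn, flag.fake, and current_client mirror names from the patch.

/* Sketch of the pre-fix crash path, with stand-in types. */
#include <stdio.h>

typedef struct connection { const char *type; } connection;
typedef struct client {
    connection *conn;                  /* NULL for fake clients (createClient(NULL)) */
    struct { unsigned fake : 1; } flag;
} client;

static client *current_client;         /* stands in for server.current_client */

/* Pre-fix check: dereferences c->conn, so a fake client (conn == NULL)
 * crashes with a NULL-pointer read. */
int is_ipv6_old(client *c) {
    if (c->conn->type == NULL && current_client) c = current_client;
    return 0;                           /* peer-id lookup elided */
}

/* Post-fix check: tests the new fake flag instead of touching c->conn. */
int is_ipv6_fixed(client *c) {
    if (c->flag.fake && current_client) c = current_client;
    return 0;                           /* peer-id lookup elided */
}

int main(void) {
    client fake = {.conn = NULL, .flag = {.fake = 1}};
    is_ipv6_fixed(&fake);               /* fine */
    /* is_ipv6_old(&fake); */           /* would segfault: fake.conn is NULL */
    puts("ok");
    return 0;
}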
@@ -1371,6 +1371,7 @@ struct client *createAOFClient(void) {
     c->raw_flag = 0;
     c->flag.deny_blocking = 1;
+    c->flag.fake = 1;
 
     /* We set the fake client as a replica waiting for the synchronization
      * so that the server will not try to send replies to this client. */

@@ -260,6 +260,7 @@ void scriptingInit(int setup) {
     if (lctx.lua_client == NULL) {
         lctx.lua_client = createClient(NULL);
         lctx.lua_client->flag.script = 1;
+        lctx.lua_client->flag.fake = 1;
 
         /* We do not want to allow blocking commands inside Lua */
         lctx.lua_client->flag.deny_blocking = 1;

@@ -408,6 +408,7 @@ int functionsRegisterEngine(const char *engine_name, engine *engine) {
     client *c = createClient(NULL);
     c->flag.deny_blocking = 1;
     c->flag.script = 1;
+    c->flag.fake = 1;
     engineInfo *ei = zmalloc(sizeof(*ei));
     *ei = (engineInfo){
         .name = engine_name_sds,

@@ -656,6 +656,7 @@ client *moduleAllocTempClient(void) {
     } else {
         c = createClient(NULL);
         c->flag.module = 1;
+        c->flag.fake = 1;
         c->user = NULL; /* Root user */
     }
     return c;

@@ -894,8 +895,10 @@ void moduleCreateContext(ValkeyModuleCtx *out_ctx, ValkeyModule *module, int ctx
     out_ctx->flags = ctx_flags;
     if (ctx_flags & VALKEYMODULE_CTX_TEMP_CLIENT)
         out_ctx->client = moduleAllocTempClient();
-    else if (ctx_flags & VALKEYMODULE_CTX_NEW_CLIENT)
+    else if (ctx_flags & VALKEYMODULE_CTX_NEW_CLIENT) {
         out_ctx->client = createClient(NULL);
+        out_ctx->client->flag.fake = 1;
+    }
 
     /* Calculate the initial yield time for long blocked contexts.
      * in loading we depend on the server hz, but in other cases we also wait

@@ -346,6 +346,7 @@ client *createCachedResponseClient(int resp) {
     /* Allocating the `conn` allows to prepare the caching client before adding
      * data to the clients output buffer by `prepareClientToWrite`. */
     recording_client->conn = zcalloc(sizeof(connection));
+    recording_client->flag.fake = 1;
     return recording_client;
 }
 
@@ -3246,8 +3247,10 @@ char *getClientSockname(client *c) {
 int isClientConnIpV6(client *c) {
     /* The cached client peer id is on the form "[IPv6]:port" for IPv6
      * addresses, so we just check for '[' here. */
-    if (c->conn->type == NULL && server.current_client) {
-        /* Fake client? Use current client instead. */
+    if (c->flag.fake && server.current_client) {
+        /* Fake client? Use current client instead.
+         * Note that here we are assuming server.current_client is set
+         * and real (AOF has already violated this in loadSingleAppendOnlyFile). */
         c = server.current_client;
     }
     return getClientPeerId(c)[0] == '[';

src/server.h (40 lines changed)
@@ -1223,26 +1223,26 @@ typedef struct ClientFlags {
     uint64_t reprocessing_command : 1;  /* The client is re-processing the command. */
     uint64_t replication_done : 1;      /* Indicate that replication has been done on the client */
     uint64_t authenticated : 1;         /* Indicate a client has successfully authenticated */
-    uint64_t
-        protected_rdb_channel : 1;      /* Dual channel replication sync: Protects the RDB client from premature \
-                                         * release during full sync. This flag is used to ensure that the RDB client, which \
-                                         * references the first replication data block required by the replica, is not \
-                                         * released prematurely. Protecting the client is crucial for prevention of \
-                                         * synchronization failures: \
-                                         * If the RDB client is released before the replica initiates PSYNC, the primary \
-                                         * will reduce the reference count (o->refcount) of the block needed by the replica.
-                                         * \
-                                         * This could potentially lead to the removal of the required data block, resulting \
-                                         * in synchronization failures. Such failures could occur even in scenarios where \
-                                         * the replica only needs an additional 4KB beyond the minimum size of the
-                                         * repl_backlog.
-                                         * By using this flag, we ensure that the RDB client remains intact until the replica
-                                         * \ has successfully initiated PSYNC. */
-    uint64_t repl_rdb_channel : 1;      /* Dual channel replication sync: track a connection which is used for rdb snapshot */
-    uint64_t dont_cache_primary : 1;    /* In some cases we don't want to cache the primary. For example, the replica
-                                         * knows that it does not need the cache and required a full sync. With this
-                                         * flag, we won't cache the primary in freeClient. */
-    uint64_t reserved : 6;              /* Reserved for future use */
+    uint64_t protected_rdb_channel : 1; /* Dual channel replication sync: Protects the RDB client from premature \
+                                         * release during full sync. This flag is used to ensure that the RDB client, which \
+                                         * references the first replication data block required by the replica, is not \
+                                         * released prematurely. Protecting the client is crucial for prevention of \
+                                         * synchronization failures: \
+                                         * If the RDB client is released before the replica initiates PSYNC, the primary \
+                                         * will reduce the reference count (o->refcount) of the block needed by the replica.
+                                         * \
+                                         * This could potentially lead to the removal of the required data block, resulting \
+                                         * in synchronization failures. Such failures could occur even in scenarios where \
+                                         * the replica only needs an additional 4KB beyond the minimum size of the
+                                         * repl_backlog.
+                                         * By using this flag, we ensure that the RDB client remains intact until the replica
+                                         * \ has successfully initiated PSYNC. */
+    uint64_t repl_rdb_channel : 1;      /* Dual channel replication sync: track a connection which is used for rdb snapshot */
+    uint64_t dont_cache_primary : 1;    /* In some cases we don't want to cache the primary. For example, the replica
+                                         * knows that it does not need the cache and required a full sync. With this
+                                         * flag, we won't cache the primary in freeClient. */
+    uint64_t fake : 1;                  /* This is a fake client without a real connection. */
+    uint64_t reserved : 5;              /* Reserved for future use */
 } ClientFlags;
 
 typedef struct client {

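A side note on the bookkeeping in this hunk: the new fake bit is paid for by shrinking reserved from 6 to 5, so the flag set still packs into a single uint64_t. A hypothetical compile-time guard (not part of this patch; field widths here are stand-ins) could make that invariant explicit:

/* Sketch only: abbreviated stand-in for the ClientFlags layout above. */
#include <assert.h>   /* static_assert (C11) */
#include <stdint.h>

typedef struct {
    uint64_t other_flags : 58;  /* stand-in for the pre-existing flag bits */
    uint64_t fake : 1;          /* new in this patch */
    uint64_t reserved : 5;      /* shrunk from 6 to pay for the fake bit */
} ClientFlagsSketch;

static_assert(sizeof(ClientFlagsSketch) == sizeof(uint64_t),
              "client flags must pack into one 64-bit word");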
@@ -673,3 +673,26 @@ tags {"aof external:skip"} {
         }
     }
 }
+
+# make sure the test infra won't use SELECT
+set old_singledb $::singledb
+set ::singledb 1
+
+tags {"aof cluster external:skip"} {
+    test {Test cluster slots / cluster shards in aof won't crash} {
+        create_aof $aof_dirpath $aof_file {
+            append_to_aof [formatCommand cluster slots]
+            append_to_aof [formatCommand cluster shards]
+        }
+
+        create_aof_manifest $aof_dirpath $aof_manifest_file {
+            append_to_manifest "file appendonly.aof.1.incr.aof seq 1 type i\n"
+        }
+
+        start_server_aof [list dir $server_path cluster-enabled yes] {
+            assert_equal [r ping] {PONG}
+        }
+    }
+}
+
+set ::singledb $old_singledb

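The AOF case is the nastiest of the three: during loading, commands are replayed through the fake client from createAOFClient while server.current_client is not a real client (the loadSingleAppendOnlyFile caveat noted in the isClientConnIpV6 hunk above), so a CLUSTER SLOTS or CLUSTER SHARDS entry in the AOF used to abort startup. The test above reproduces exactly that: it writes the two commands into a fresh AOF and asserts the server still starts and answers PING.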
@@ -63,7 +63,8 @@ TEST_MODULES = \
     postnotifications.so \
     moduleauthtwo.so \
     rdbloadsave.so \
-    crash.so
+    crash.so \
+    cluster.so
 
 .PHONY: all
 
|
51
tests/modules/cluster.c
Normal file
51
tests/modules/cluster.c
Normal file
@@ -0,0 +1,51 @@
+#include "valkeymodule.h"
+
+#define UNUSED(x) (void)(x)
+
+int test_cluster_slots(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
+    UNUSED(argv);
+
+    if (argc != 1) return ValkeyModule_WrongArity(ctx);
+
+    ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SLOTS");
+    if (!rep) {
+        ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned");
+    } else {
+        ValkeyModule_ReplyWithCallReply(ctx, rep);
+        ValkeyModule_FreeCallReply(rep);
+    }
+
+    return VALKEYMODULE_OK;
+}
+
+int test_cluster_shards(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
+    UNUSED(argv);
+
+    if (argc != 1) return ValkeyModule_WrongArity(ctx);
+
+    ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SHARDS");
+    if (!rep) {
+        ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned");
+    } else {
+        ValkeyModule_ReplyWithCallReply(ctx, rep);
+        ValkeyModule_FreeCallReply(rep);
+    }
+
+    return VALKEYMODULE_OK;
+}
+
+int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
+    VALKEYMODULE_NOT_USED(argv);
+    VALKEYMODULE_NOT_USED(argc);
+
+    if (ValkeyModule_Init(ctx, "cluster", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR)
+        return VALKEYMODULE_ERR;
+
+    if (ValkeyModule_CreateCommand(ctx, "test.cluster_slots", test_cluster_slots, "", 0, 0, 0) == VALKEYMODULE_ERR)
+        return VALKEYMODULE_ERR;
+
+    if (ValkeyModule_CreateCommand(ctx, "test.cluster_shards", test_cluster_shards, "", 0, 0, 0) == VALKEYMODULE_ERR)
+        return VALKEYMODULE_ERR;
+
+    return VALKEYMODULE_OK;
+}

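This test module exercises the same fake-client path from the module side: ValkeyModule_Call (VM_Call internally) runs CLUSTER SLOTS / CLUSTER SHARDS on a temp module client, which is precisely the client now tagged with flag.fake in moduleAllocTempClient above. The cluster tests below assert that its replies match those returned over a real connection.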
@@ -88,4 +88,12 @@ start_cluster 1 0 {tags {external:skip cluster}} {
 
         assert_match {*Can not run script on cluster, 'no-cluster' flag is set*} $e
     }
+
+    test "Calling cluster slots in scripts is OK" {
+        assert_equal [lsort [r 0 cluster slots]] [lsort [r 0 eval "return redis.call('cluster', 'slots')" 0]]
+    }
+
+    test "Calling cluster shards in scripts is OK" {
+        assert_equal [lsort [r 0 cluster shards]] [lsort [r 0 eval "return redis.call('cluster', 'shards')" 0]]
+    }
 }

@@ -219,8 +219,6 @@ start_cluster 2 2 [list config_lines $modules] {
         }
     }
-
 }
-
 set testmodule [file normalize tests/modules/basics.so]
 set modules [list loadmodule $testmodule]
 start_cluster 3 0 [list config_lines $modules] {

@@ -234,3 +232,25 @@ start_cluster 3 0 [list config_lines $modules] {
         assert_equal {PONG} [$node3 PING]
     }
 }
+
+set testmodule [file normalize tests/modules/cluster.so]
+set modules [list loadmodule $testmodule]
+start_cluster 3 0 [list config_lines $modules] {
+    set node1 [srv 0 client]
+    set node2 [srv -1 client]
+    set node3 [srv -2 client]
+
+    test "VM_CALL with cluster slots" {
+        assert_equal [lsort [$node1 cluster slots]] [lsort [$node1 test.cluster_slots]]
+        assert_equal [lsort [$node2 cluster slots]] [lsort [$node2 test.cluster_slots]]
+        assert_equal [lsort [$node3 cluster slots]] [lsort [$node3 test.cluster_slots]]
+    }
+
+    test "VM_CALL with cluster shards" {
+        assert_equal [lsort [$node1 cluster shards]] [lsort [$node1 test.cluster_shards]]
+        assert_equal [lsort [$node2 cluster shards]] [lsort [$node2 test.cluster_shards]]
+        assert_equal [lsort [$node3 cluster shards]] [lsort [$node3 test.cluster_shards]]
+    }
+}
+
+} ;# end tag