Fix module / script call CLUSTER SLOTS / SHARDS fake client check crash (#1063)
The reason is that VM_Call uses a fake client without a connection, so we also need to check whether c->conn is NULL. This also affects scripts: if CLUSTER SLOTS / SHARDS is called from a script, the server will crash. Injecting these commands into the AOF will also cause a startup failure.

Fixes #1054.

Signed-off-by: Binbin <binloveplay1314@qq.com>
This commit is contained in: parent 6e0216471d, commit 80fcbd3fec
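For context, a minimal standalone sketch of the failure mode and the fix pattern. The simplified client/connection/ClientFlags types below are hypothetical stand-ins for the real structures patched in server.h; this is illustrative code, not part of the patch.

/* Fake clients (AOF loading, Lua, functions, VM_Call temp clients) are built
 * with createClient(NULL), so c->conn is NULL.  Code that dereferences
 * c->conn, like the old isClientConnIpV6() check, crashes on them. */
#include <stddef.h>
#include <stdio.h>

typedef struct connection { const char *type; } connection;

typedef struct ClientFlags {
    unsigned fake : 1; /* mirrors the new bit added in server.h below */
} ClientFlags;

typedef struct client {
    connection *conn;  /* NULL for fake clients created with createClient(NULL) */
    ClientFlags flag;
} client;

/* Old-style check: assumes c->conn is always valid; dereferences NULL for
 * fake clients and segfaults. */
static int isFakeClientOld(client *c) {
    return c->conn->type == NULL;
}

/* New-style check: a dedicated flag, no need to touch c->conn at all. */
static int isFakeClientNew(client *c) {
    return c->flag.fake;
}

int main(void) {
    client fake = {.conn = NULL, .flag = {.fake = 1}};
    printf("new check: %d\n", isFakeClientNew(&fake)); /* prints 1, no crash */
    (void)isFakeClientOld; /* calling it on &fake would dereference NULL */
    return 0;
}

The patch below applies exactly this pattern: every place that builds a connection-less client sets flag.fake, and isClientConnIpV6() tests the flag instead of poking at c->conn.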
@@ -1371,6 +1371,7 @@ struct client *createAOFClient(void) {
      */
     c->raw_flag = 0;
     c->flag.deny_blocking = 1;
+    c->flag.fake = 1;
 
     /* We set the fake client as a replica waiting for the synchronization
      * so that the server will not try to send replies to this client. */
@@ -260,6 +260,7 @@ void scriptingInit(int setup) {
     if (lctx.lua_client == NULL) {
         lctx.lua_client = createClient(NULL);
         lctx.lua_client->flag.script = 1;
+        lctx.lua_client->flag.fake = 1;
 
         /* We do not want to allow blocking commands inside Lua */
         lctx.lua_client->flag.deny_blocking = 1;
@@ -412,6 +412,7 @@ int functionsRegisterEngine(const char *engine_name, engine *engine) {
     client *c = createClient(NULL);
     c->flag.deny_blocking = 1;
     c->flag.script = 1;
+    c->flag.fake = 1;
     engineInfo *ei = zmalloc(sizeof(*ei));
     *ei = (engineInfo){
         .name = engine_name_sds,
@@ -652,6 +652,7 @@ client *moduleAllocTempClient(void) {
     } else {
         c = createClient(NULL);
         c->flag.module = 1;
+        c->flag.fake = 1;
         c->user = NULL; /* Root user */
     }
     return c;
@@ -890,8 +891,10 @@ void moduleCreateContext(ValkeyModuleCtx *out_ctx, ValkeyModule *module, int ctx
     out_ctx->flags = ctx_flags;
     if (ctx_flags & VALKEYMODULE_CTX_TEMP_CLIENT)
         out_ctx->client = moduleAllocTempClient();
-    else if (ctx_flags & VALKEYMODULE_CTX_NEW_CLIENT)
+    else if (ctx_flags & VALKEYMODULE_CTX_NEW_CLIENT) {
         out_ctx->client = createClient(NULL);
+        out_ctx->client->flag.fake = 1;
+    }
 
     /* Calculate the initial yield time for long blocked contexts.
      * in loading we depend on the server hz, but in other cases we also wait
@@ -350,6 +350,7 @@ client *createCachedResponseClient(int resp) {
     /* Allocating the `conn` allows to prepare the caching client before adding
      * data to the clients output buffer by `prepareClientToWrite`. */
     recording_client->conn = zcalloc(sizeof(connection));
+    recording_client->flag.fake = 1;
     return recording_client;
 }
 
@@ -3250,8 +3251,10 @@ char *getClientSockname(client *c) {
 int isClientConnIpV6(client *c) {
     /* The cached client peer id is on the form "[IPv6]:port" for IPv6
      * addresses, so we just check for '[' here. */
-    if (c->conn->type == NULL && server.current_client) {
-        /* Fake client? Use current client instead. */
+    if (c->flag.fake && server.current_client) {
+        /* Fake client? Use current client instead.
+         * Noted that in here we are assuming server.current_client is set
+         * and real (aof has already violated this in loadSingleAppendOnlyFil). */
         c = server.current_client;
     }
     return getClientPeerId(c)[0] == '[';
@@ -1222,7 +1222,8 @@ typedef struct ClientFlags {
     uint64_t dont_cache_primary : 1; /* In some cases we don't want to cache the primary. For example, the replica
                                       * knows that it does not need the cache and required a full sync. With this
                                       * flag, we won't cache the primary in freeClient. */
-    uint64_t reserved : 6;           /* Reserved for future use */
+    uint64_t fake : 1;               /* This is a fake client without a real connection. */
+    uint64_t reserved : 5;           /* Reserved for future use */
 } ClientFlags;
 
 typedef struct client {
@@ -673,3 +673,26 @@ tags {"aof external:skip"} {
         }
     }
 }
+
+# make sure the test infra won't use SELECT
+set old_singledb $::singledb
+set ::singledb 1
+
+tags {"aof cluster external:skip"} {
+    test {Test cluster slots / cluster shards in aof won't crash} {
+        create_aof $aof_dirpath $aof_file {
+            append_to_aof [formatCommand cluster slots]
+            append_to_aof [formatCommand cluster shards]
+        }
+
+        create_aof_manifest $aof_dirpath $aof_manifest_file {
+            append_to_manifest "file appendonly.aof.1.incr.aof seq 1 type i\n"
+        }
+
+        start_server_aof [list dir $server_path cluster-enabled yes] {
+            assert_equal [r ping] {PONG}
+        }
+    }
+}
+
+set ::singledb $old_singledb
@@ -63,7 +63,8 @@ TEST_MODULES = \
     postnotifications.so \
    moduleauthtwo.so \
    rdbloadsave.so \
-    crash.so
+    crash.so \
+    cluster.so
 
 .PHONY: all
 
tests/modules/cluster.c (new file, 51 lines)
@@ -0,0 +1,51 @@
+#include "valkeymodule.h"
+
+#define UNUSED(x) (void)(x)
+
+int test_cluster_slots(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
+    UNUSED(argv);
+
+    if (argc != 1) return ValkeyModule_WrongArity(ctx);
+
+    ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SLOTS");
+    if (!rep) {
+        ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned");
+    } else {
+        ValkeyModule_ReplyWithCallReply(ctx, rep);
+        ValkeyModule_FreeCallReply(rep);
+    }
+
+    return VALKEYMODULE_OK;
+}
+
+int test_cluster_shards(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
+    UNUSED(argv);
+
+    if (argc != 1) return ValkeyModule_WrongArity(ctx);
+
+    ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SHARDS");
+    if (!rep) {
+        ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned");
+    } else {
+        ValkeyModule_ReplyWithCallReply(ctx, rep);
+        ValkeyModule_FreeCallReply(rep);
+    }
+
+    return VALKEYMODULE_OK;
+}
+
+int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
+    VALKEYMODULE_NOT_USED(argv);
+    VALKEYMODULE_NOT_USED(argc);
+
+    if (ValkeyModule_Init(ctx, "cluster", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR)
+        return VALKEYMODULE_ERR;
+
+    if (ValkeyModule_CreateCommand(ctx, "test.cluster_slots", test_cluster_slots, "", 0, 0, 0) == VALKEYMODULE_ERR)
+        return VALKEYMODULE_ERR;
+
+    if (ValkeyModule_CreateCommand(ctx, "test.cluster_shards", test_cluster_shards, "", 0, 0, 0) == VALKEYMODULE_ERR)
+        return VALKEYMODULE_ERR;
+
+    return VALKEYMODULE_OK;
+}
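A possible follow-up, shown only as a hedged sketch and not part of this patch: the test command could bail out early when cluster mode is disabled instead of relying on the reply from ValkeyModule_Call. This assumes ValkeyModule_GetContextFlags and VALKEYMODULE_CTX_FLAGS_CLUSTER behave like their Redis module API counterparts and reuses the include and UNUSED macro from tests/modules/cluster.c above; verify the names against valkeymodule.h before relying on them.

/* Hypothetical variant of test_cluster_slots with an explicit cluster-mode guard. */
int test_cluster_slots_guarded(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
    UNUSED(argv);
    if (argc != 1) return ValkeyModule_WrongArity(ctx);

    /* Refuse to forward the call when the server is not in cluster mode. */
    if (!(ValkeyModule_GetContextFlags(ctx) & VALKEYMODULE_CTX_FLAGS_CLUSTER))
        return ValkeyModule_ReplyWithError(ctx, "ERR cluster mode is not enabled");

    ValkeyModuleCallReply *rep = ValkeyModule_Call(ctx, "CLUSTER", "c", "SLOTS");
    if (!rep) return ValkeyModule_ReplyWithError(ctx, "ERR NULL reply returned");
    ValkeyModule_ReplyWithCallReply(ctx, rep);
    ValkeyModule_FreeCallReply(rep);
    return VALKEYMODULE_OK;
}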
@@ -88,4 +88,12 @@ start_cluster 1 0 {tags {external:skip cluster}} {
 
         assert_match {*Can not run script on cluster, 'no-cluster' flag is set*} $e
     }
+
+    test "Calling cluster slots in scripts is OK" {
+        assert_equal [lsort [r 0 cluster slots]] [lsort [r 0 eval "return redis.call('cluster', 'slots')" 0]]
+    }
+
+    test "Calling cluster shards in scripts is OK" {
+        assert_equal [lsort [r 0 cluster shards]] [lsort [r 0 eval "return redis.call('cluster', 'shards')" 0]]
+    }
 }
@@ -219,8 +219,6 @@ start_cluster 2 2 [list config_lines $modules] {
     }
 }
 
-}
-
 set testmodule [file normalize tests/modules/basics.so]
 set modules [list loadmodule $testmodule]
 start_cluster 3 0 [list config_lines $modules] {
@@ -234,3 +232,25 @@ start_cluster 3 0 [list config_lines $modules] {
         assert_equal {PONG} [$node3 PING]
     }
 }
+
+set testmodule [file normalize tests/modules/cluster.so]
+set modules [list loadmodule $testmodule]
+start_cluster 3 0 [list config_lines $modules] {
+    set node1 [srv 0 client]
+    set node2 [srv -1 client]
+    set node3 [srv -2 client]
+
+    test "VM_CALL with cluster slots" {
+        assert_equal [lsort [$node1 cluster slots]] [lsort [$node1 test.cluster_slots]]
+        assert_equal [lsort [$node2 cluster slots]] [lsort [$node2 test.cluster_slots]]
+        assert_equal [lsort [$node3 cluster slots]] [lsort [$node3 test.cluster_slots]]
+    }
+
+    test "VM_CALL with cluster shards" {
+        assert_equal [lsort [$node1 cluster shards]] [lsort [$node1 test.cluster_shards]]
+        assert_equal [lsort [$node2 cluster shards]] [lsort [$node2 test.cluster_shards]]
+        assert_equal [lsort [$node3 cluster shards]] [lsort [$node3 test.cluster_shards]]
+    }
+}
+
+} ;# end tag