Merge 93acad9f71e3ed04c656739ca96cfb7f852505cd into 26c6f1af9b29d525831c7fa9840ab3e47ed7b700
This commit is contained in:
commit
21cea35bf9
11
src/module.c
11
src/module.c
@ -3721,6 +3721,16 @@ ValkeyModuleString *VM_GetClientUserNameById(ValkeyModuleCtx *ctx, uint64_t id)
|
||||
return str;
|
||||
}
|
||||
|
||||
/* Returns 1 if commands are arriving from the primary client or AOF client
|
||||
* and should never be rejected.
|
||||
* This check can be used in places such as skipping validation of commands
|
||||
* on replicas (to not diverge from primary) or from AOF files.
|
||||
* Returns 0 otherwise (and also if ctx or if the client is NULL). */
|
||||
int VM_MustObeyClient(ValkeyModuleCtx *ctx) {
|
||||
if (!ctx || !ctx->client) return 0;
|
||||
return mustObeyClient(ctx->client);
|
||||
}
|
||||
|
||||
/* This is a helper for VM_GetClientInfoById() and other functions: given
|
||||
* a client, it populates the client info structure with the appropriate
|
||||
* fields depending on the version provided. If the version is not valid
|
||||
@ -13841,6 +13851,7 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(ChannelAtPosWithFlags);
|
||||
REGISTER_API(GetClientId);
|
||||
REGISTER_API(GetClientUserNameById);
|
||||
REGISTER_API(MustObeyClient);
|
||||
REGISTER_API(GetContextFlags);
|
||||
REGISTER_API(AvoidReplicaTraffic);
|
||||
REGISTER_API(PoolAlloc);
|
||||
|
@ -1317,6 +1317,7 @@ VALKEYMODULE_API void (*ValkeyModule_ChannelAtPosWithFlags)(ValkeyModuleCtx *ctx
|
||||
VALKEYMODULE_API unsigned long long (*ValkeyModule_GetClientId)(ValkeyModuleCtx *ctx) VALKEYMODULE_ATTR;
|
||||
VALKEYMODULE_API ValkeyModuleString *(*ValkeyModule_GetClientUserNameById)(ValkeyModuleCtx *ctx,
|
||||
uint64_t id)VALKEYMODULE_ATTR;
|
||||
VALKEYMODULE_API int (*ValkeyModule_MustObeyClient)(ValkeyModuleCtx *ctx) VALKEYMODULE_ATTR;
|
||||
VALKEYMODULE_API int (*ValkeyModule_GetClientInfoById)(void *ci, uint64_t id) VALKEYMODULE_ATTR;
|
||||
VALKEYMODULE_API ValkeyModuleString *(*ValkeyModule_GetClientNameById)(ValkeyModuleCtx *ctx,
|
||||
uint64_t id)VALKEYMODULE_ATTR;
|
||||
@ -1942,6 +1943,7 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in
|
||||
VALKEYMODULE_GET_API(ChannelAtPosWithFlags);
|
||||
VALKEYMODULE_GET_API(GetClientId);
|
||||
VALKEYMODULE_GET_API(GetClientUserNameById);
|
||||
VALKEYMODULE_GET_API(MustObeyClient);
|
||||
VALKEYMODULE_GET_API(GetContextFlags);
|
||||
VALKEYMODULE_GET_API(AvoidReplicaTraffic);
|
||||
VALKEYMODULE_GET_API(PoolAlloc);
|
||||
|
@ -310,11 +310,26 @@ int propagateTestNestedCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
|
||||
return VALKEYMODULE_OK;
|
||||
}
|
||||
|
||||
/* Counter to track "propagate-test.incr" commands which were obeyed (due to being replicated or processed from AOF). */
|
||||
static long long obeyed_cmds = 0;
|
||||
|
||||
/* Handles the "propagate-test.obeyed" command to return the `obeyed_cmds` count. */
|
||||
int propagateTestObeyed(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
VALKEYMODULE_NOT_USED(argc);
|
||||
ValkeyModule_ReplyWithLongLong(ctx, obeyed_cmds);
|
||||
return VALKEYMODULE_OK;
|
||||
}
|
||||
|
||||
int propagateTestIncr(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc)
|
||||
{
|
||||
VALKEYMODULE_NOT_USED(argc);
|
||||
ValkeyModuleCallReply *reply;
|
||||
|
||||
/* Track the number of commands which are "obeyed". */
|
||||
if (ValkeyModule_MustObeyClient(ctx)) {
|
||||
obeyed_cmds += 1;
|
||||
}
|
||||
/* This test propagates the module command, not the INCR it executes. */
|
||||
reply = ValkeyModule_Call(ctx, "INCR", "s", argv[1]);
|
||||
ValkeyModule_ReplyWithCallReply(ctx,reply);
|
||||
@ -390,6 +405,11 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
|
||||
"write",1,1,1) == VALKEYMODULE_ERR)
|
||||
return VALKEYMODULE_ERR;
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx,"propagate-test.obeyed",
|
||||
propagateTestObeyed,
|
||||
"",1,1,1) == VALKEYMODULE_ERR)
|
||||
return VALKEYMODULE_ERR;
|
||||
|
||||
return VALKEYMODULE_OK;
|
||||
}
|
||||
|
||||
|
@ -583,6 +583,7 @@ tags "modules" {
|
||||
after 110
|
||||
|
||||
set repl [attach_to_replication_stream]
|
||||
assert_equal [$replica propagate-test.obeyed] 0
|
||||
$master propagate-test.incr k1
|
||||
|
||||
assert_replication_stream $repl {
|
||||
@ -598,6 +599,10 @@ tags "modules" {
|
||||
assert_equal [$master ttl k1] -1
|
||||
assert_equal [$replica get k1] 1
|
||||
assert_equal [$replica ttl k1] -1
|
||||
# Validate that replicated commands were "obeyed" from primary.
|
||||
assert_equal [$replica propagate-test.obeyed] 1
|
||||
$master propagate-test.incr k1
|
||||
assert_equal [$replica propagate-test.obeyed] 2
|
||||
}
|
||||
|
||||
test {module notification on set} {
|
||||
|
Loading…
x
Reference in New Issue
Block a user